added additional files
This commit is contained in:
432
cropdetect.py
Normal file
432
cropdetect.py
Normal file
@@ -0,0 +1,432 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
from collections import Counter
|
||||||
|
import shutil
|
||||||
|
import multiprocessing
|
||||||
|
import json
|
||||||
|
|
||||||
|
# ANSI color codes
|
||||||
|
COLOR_GREEN = "\033[92m"
|
||||||
|
COLOR_RED = "\033[91m"
|
||||||
|
COLOR_YELLOW = "\033[93m"
|
||||||
|
COLOR_RESET = "\033[0m"
|
||||||
|
|
||||||
|
def check_prerequisites():
|
||||||
|
"""Checks if required tools are available."""
|
||||||
|
print("--- Prerequisite Check ---")
|
||||||
|
all_found = True
|
||||||
|
for tool in ['ffmpeg', 'ffprobe']:
|
||||||
|
if not shutil.which(tool):
|
||||||
|
print(f"Error: '{tool}' command not found. Is it installed and in your PATH?")
|
||||||
|
all_found = False
|
||||||
|
if not all_found:
|
||||||
|
sys.exit(1)
|
||||||
|
print("All required tools found.")
|
||||||
|
|
||||||
|
def analyze_segment(task_args):
|
||||||
|
"""Function to be run by each worker process. Analyzes one video segment."""
|
||||||
|
seek_time, input_file, width, height = task_args
|
||||||
|
|
||||||
|
ffmpeg_args = [
|
||||||
|
'ffmpeg', '-hide_banner',
|
||||||
|
'-ss', str(seek_time),
|
||||||
|
'-i', input_file, '-t', '1', '-vf', 'cropdetect',
|
||||||
|
'-f', 'null', '-'
|
||||||
|
]
|
||||||
|
|
||||||
|
result = subprocess.run(ffmpeg_args, capture_output=True, text=True, encoding='utf-8')
|
||||||
|
|
||||||
|
if result.returncode != 0:
|
||||||
|
return [] # Return empty list on error
|
||||||
|
|
||||||
|
crop_detections = re.findall(r'crop=(\d+):(\d+):(\d+):(\d+)', result.stderr)
|
||||||
|
|
||||||
|
significant_crops = []
|
||||||
|
for w_str, h_str, x_str, y_str in crop_detections:
|
||||||
|
w, h, x, y = map(int, [w_str, h_str, x_str, y_str])
|
||||||
|
|
||||||
|
# Return the crop string along with the timestamp it was found at
|
||||||
|
significant_crops.append((f"crop={w}:{h}:{x}:{y}", seek_time))
|
||||||
|
|
||||||
|
return significant_crops
|
||||||
|
|
||||||
|
def get_frame_luma(input_file, seek_time):
|
||||||
|
"""Analyzes a single frame at a given timestamp to get its average luma."""
|
||||||
|
ffmpeg_args = [
|
||||||
|
'ffmpeg', '-hide_banner',
|
||||||
|
'-ss', str(seek_time),
|
||||||
|
'-i', input_file,
|
||||||
|
'-t', '1',
|
||||||
|
'-vf', 'signalstats',
|
||||||
|
'-f', 'null', '-'
|
||||||
|
]
|
||||||
|
result = subprocess.run(ffmpeg_args, capture_output=True, text=True, encoding='utf-8')
|
||||||
|
|
||||||
|
if result.returncode != 0:
|
||||||
|
return None # Error during analysis
|
||||||
|
|
||||||
|
# Find the average luma (YAVG) for the frame
|
||||||
|
match = re.search(r'YAVG:([0-9.]+)', result.stderr)
|
||||||
|
if match:
|
||||||
|
return float(match.group(1))
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
def check_luma_for_group(task_args):
|
||||||
|
"""Worker function to check the luma for a single group."""
|
||||||
|
group_key, sample_ts, input_file, luma_threshold = task_args
|
||||||
|
luma = get_frame_luma(input_file, sample_ts)
|
||||||
|
is_bright = luma is not None and luma >= luma_threshold
|
||||||
|
return (group_key, is_bright)
|
||||||
|
|
||||||
|
KNOWN_ASPECT_RATIOS = [
|
||||||
|
{"name": "HDTV (16:9)", "ratio": 16/9},
|
||||||
|
{"name": "Widescreen (Scope)", "ratio": 2.39},
|
||||||
|
{"name": "Widescreen (Flat)", "ratio": 1.85},
|
||||||
|
{"name": "IMAX Digital (1.90:1)", "ratio": 1.90},
|
||||||
|
{"name": "Fullscreen (4:3)", "ratio": 4/3},
|
||||||
|
{"name": "IMAX 70mm (1.43:1)", "ratio": 1.43},
|
||||||
|
]
|
||||||
|
|
||||||
|
def snap_to_known_ar(w, h, x, y, video_w, video_h, tolerance=0.03):
|
||||||
|
"""Snaps a crop rectangle to the nearest standard aspect ratio if it's close enough."""
|
||||||
|
if h == 0: return f"crop={w}:{h}:{x}:{y}", None
|
||||||
|
detected_ratio = w / h
|
||||||
|
|
||||||
|
best_match = None
|
||||||
|
smallest_diff = float('inf')
|
||||||
|
|
||||||
|
for ar in KNOWN_ASPECT_RATIOS:
|
||||||
|
diff = abs(detected_ratio - ar['ratio'])
|
||||||
|
if diff < smallest_diff:
|
||||||
|
smallest_diff = diff
|
||||||
|
best_match = ar
|
||||||
|
|
||||||
|
# If the best match is not within the tolerance, return the original
|
||||||
|
if not best_match or (smallest_diff / best_match['ratio']) >= tolerance:
|
||||||
|
return f"crop={w}:{h}:{x}:{y}", None
|
||||||
|
|
||||||
|
# Match found, now snap the dimensions.
|
||||||
|
# Heuristic: if width is close to full video width, it's letterboxed.
|
||||||
|
if abs(w - video_w) < 16:
|
||||||
|
new_h = round(video_w / best_match['ratio'])
|
||||||
|
|
||||||
|
# Round height up to the nearest multiple of 8 for cleaner dimensions and less aggressive cropping.
|
||||||
|
if new_h % 8 != 0:
|
||||||
|
new_h = new_h + (8 - (new_h % 8))
|
||||||
|
|
||||||
|
new_y = round((video_h - new_h) / 2)
|
||||||
|
# Ensure y offset is an even number for compatibility.
|
||||||
|
if new_y % 2 != 0:
|
||||||
|
new_y -= 1
|
||||||
|
|
||||||
|
return f"crop={video_w}:{new_h}:0:{new_y}", best_match['name']
|
||||||
|
|
||||||
|
# Heuristic: if height is close to full video height, it's pillarboxed.
|
||||||
|
if abs(h - video_h) < 16:
|
||||||
|
new_w = round(video_h * best_match['ratio'])
|
||||||
|
|
||||||
|
# Round width up to the nearest multiple of 8.
|
||||||
|
if new_w % 8 != 0:
|
||||||
|
new_w = new_w + (8 - (new_w % 8))
|
||||||
|
|
||||||
|
new_x = round((video_w - new_w) / 2)
|
||||||
|
# Ensure x offset is an even number.
|
||||||
|
if new_x % 2 != 0:
|
||||||
|
new_x -= 1
|
||||||
|
|
||||||
|
return f"crop={new_w}:{video_h}:{new_x}:0", best_match['name']
|
||||||
|
|
||||||
|
# If not clearly letterboxed or pillarboxed, don't snap.
|
||||||
|
return f"crop={w}:{h}:{x}:{y}", None
|
||||||
|
|
||||||
|
def cluster_crop_values(crop_counts, tolerance=8):
|
||||||
|
"""Groups similar crop values into clusters based on the top-left corner."""
|
||||||
|
clusters = []
|
||||||
|
temp_counts = crop_counts.copy()
|
||||||
|
|
||||||
|
while temp_counts:
|
||||||
|
# Get the most frequent remaining crop as the new cluster center
|
||||||
|
center_str, _ = temp_counts.most_common(1)[0]
|
||||||
|
|
||||||
|
try:
|
||||||
|
_, values = center_str.split('=')
|
||||||
|
cw, ch, cx, cy = map(int, values.split(':'))
|
||||||
|
except (ValueError, IndexError):
|
||||||
|
del temp_counts[center_str] # Skip malformed strings
|
||||||
|
continue
|
||||||
|
|
||||||
|
cluster_total_count = 0
|
||||||
|
crops_to_remove = []
|
||||||
|
|
||||||
|
# Find all crops "close" to the center
|
||||||
|
for crop_str, count in temp_counts.items():
|
||||||
|
try:
|
||||||
|
_, values = crop_str.split('=')
|
||||||
|
w, h, x, y = map(int, values.split(':'))
|
||||||
|
if abs(x - cx) <= tolerance and abs(y - cy) <= tolerance:
|
||||||
|
cluster_total_count += count
|
||||||
|
crops_to_remove.append(crop_str)
|
||||||
|
except (ValueError, IndexError):
|
||||||
|
continue
|
||||||
|
|
||||||
|
if cluster_total_count > 0:
|
||||||
|
clusters.append({'center': center_str, 'count': cluster_total_count})
|
||||||
|
|
||||||
|
# Remove the clustered crops from the temporary counter
|
||||||
|
for crop_str in crops_to_remove:
|
||||||
|
del temp_counts[crop_str]
|
||||||
|
|
||||||
|
clusters.sort(key=lambda c: c['count'], reverse=True)
|
||||||
|
return clusters
|
||||||
|
|
||||||
|
def parse_crop_string(crop_str):
|
||||||
|
"""Parses a 'crop=w:h:x:y' string into a dictionary of integers."""
|
||||||
|
try:
|
||||||
|
_, values = crop_str.split('=')
|
||||||
|
w, h, x, y = map(int, values.split(':'))
|
||||||
|
return {'w': w, 'h': h, 'x': x, 'y': y}
|
||||||
|
except (ValueError, IndexError):
|
||||||
|
return None
|
||||||
|
|
||||||
|
def calculate_bounding_box(crop_keys):
|
||||||
|
"""Calculates a bounding box that contains all given crop rectangles."""
|
||||||
|
min_x = min_w = min_y = min_h = float('inf')
|
||||||
|
max_x = max_w = max_y = max_h = float('-inf')
|
||||||
|
|
||||||
|
for key in crop_keys:
|
||||||
|
parsed = parse_crop_string(key)
|
||||||
|
if not parsed:
|
||||||
|
continue
|
||||||
|
|
||||||
|
w, h, x, y = parsed['w'], parsed['h'], parsed['x'], parsed['y']
|
||||||
|
|
||||||
|
min_x = min(min_x, x)
|
||||||
|
min_y = min(min_y, y)
|
||||||
|
max_x = max(max_x, x + w)
|
||||||
|
max_y = max(max_y, y + h)
|
||||||
|
|
||||||
|
min_w = min(min_w, w)
|
||||||
|
min_h = min(min_h, h)
|
||||||
|
max_w = max(max_w, w)
|
||||||
|
max_h = max(max_h, h)
|
||||||
|
|
||||||
|
# Heuristic: if the bounding box is very close to the min/max, it means all crops were similar
|
||||||
|
if (max_x - min_x) <= 2 and (max_y - min_y) <= 2:
|
||||||
|
return None # Too uniform, don't create a bounding box
|
||||||
|
|
||||||
|
# Create a crop that spans the entire bounding box
|
||||||
|
bounding_crop = f"crop={max_x - min_x}:{max_y - min_y}:{min_x}:{min_y}"
|
||||||
|
|
||||||
|
return bounding_crop
|
||||||
|
|
||||||
|
def is_major_crop(crop_str, video_w, video_h, min_crop_size):
|
||||||
|
"""Checks if a crop is significant enough to be recommended by checking if any side is cropped by at least min_crop_size pixels."""
|
||||||
|
parsed = parse_crop_string(crop_str)
|
||||||
|
if not parsed:
|
||||||
|
return False
|
||||||
|
|
||||||
|
w, h, x, y = parsed['w'], parsed['h'], parsed['x'], parsed['y']
|
||||||
|
|
||||||
|
# Calculate how much is cropped from each side
|
||||||
|
crop_top = y
|
||||||
|
crop_bottom = video_h - (y + h)
|
||||||
|
crop_left = x
|
||||||
|
crop_right = video_w - (x + w)
|
||||||
|
|
||||||
|
# Return True if the largest crop on any single side meets the threshold
|
||||||
|
if max(crop_top, crop_bottom, crop_left, crop_right) >= min_crop_size:
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
def analyze_video(input_file, duration, width, height, num_workers, significant_crop_threshold, min_crop, debug=False):
|
||||||
|
"""Main analysis function for the video."""
|
||||||
|
print(f"\n--- Analyzing Video: {os.path.basename(input_file)} ---")
|
||||||
|
|
||||||
|
# Step 1: Analyze video in segments to detect crops
|
||||||
|
num_tasks = num_workers * 4
|
||||||
|
segment_duration = max(1, duration // num_tasks)
|
||||||
|
tasks = [(i * segment_duration, input_file, width, height) for i in range(num_tasks)]
|
||||||
|
|
||||||
|
print(f"Analyzing {len(tasks)} segments across {num_workers} worker(s)...")
|
||||||
|
|
||||||
|
crop_results = []
|
||||||
|
with multiprocessing.Pool(processes=num_workers) as pool:
|
||||||
|
total_tasks = len(tasks)
|
||||||
|
results_iterator = pool.imap_unordered(analyze_segment, tasks)
|
||||||
|
|
||||||
|
for i, result in enumerate(results_iterator, 1):
|
||||||
|
crop_results.append(result)
|
||||||
|
progress_message = f"Analyzing Segments: {i}/{total_tasks} completed..."
|
||||||
|
sys.stdout.write(f"\r{progress_message}")
|
||||||
|
sys.stdout.flush()
|
||||||
|
print()
|
||||||
|
|
||||||
|
all_crops_with_ts = [crop for sublist in crop_results for crop in sublist]
|
||||||
|
all_crop_strings = [item[0] for item in all_crops_with_ts]
|
||||||
|
if not all_crop_strings:
|
||||||
|
print(f"\n{COLOR_GREEN}Analysis complete. No black bars detected.{COLOR_RESET}")
|
||||||
|
return
|
||||||
|
|
||||||
|
crop_counts = Counter(all_crop_strings)
|
||||||
|
|
||||||
|
if debug:
|
||||||
|
print("\n--- Debug: Most Common Raw Detections ---")
|
||||||
|
for crop_str, count in crop_counts.most_common(10):
|
||||||
|
print(f" - {crop_str} (Count: {count})")
|
||||||
|
|
||||||
|
# Step 2: Cluster similar crop values
|
||||||
|
clusters = cluster_crop_values(crop_counts)
|
||||||
|
total_detections = sum(c['count'] for c in clusters)
|
||||||
|
|
||||||
|
if debug:
|
||||||
|
print("\n--- Debug: Detected Clusters ---")
|
||||||
|
for cluster in clusters:
|
||||||
|
percentage = (cluster['count'] / total_detections) * 100
|
||||||
|
print(f" - Center: {cluster['center']}, Count: {cluster['count']} ({percentage:.1f}%)")
|
||||||
|
|
||||||
|
# Step 3: Filter clusters that are below the significance threshold
|
||||||
|
significant_clusters = []
|
||||||
|
for cluster in clusters:
|
||||||
|
percentage = (cluster['count'] / total_detections) * 100
|
||||||
|
if percentage >= significant_crop_threshold:
|
||||||
|
significant_clusters.append(cluster)
|
||||||
|
|
||||||
|
# Step 4: Determine final recommendation based on significant clusters
|
||||||
|
print("\n--- Determining Final Crop Recommendation ---")
|
||||||
|
|
||||||
|
for cluster in significant_clusters:
|
||||||
|
parsed_crop = parse_crop_string(cluster['center'])
|
||||||
|
if parsed_crop:
|
||||||
|
_, ar_label = snap_to_known_ar(
|
||||||
|
parsed_crop['w'], parsed_crop['h'], parsed_crop['x'], parsed_crop['y'], width, height
|
||||||
|
)
|
||||||
|
cluster['ar_label'] = ar_label
|
||||||
|
else:
|
||||||
|
cluster['ar_label'] = None
|
||||||
|
|
||||||
|
if not significant_clusters:
|
||||||
|
print(f"{COLOR_RED}No single crop value meets the {significant_crop_threshold}% significance threshold.{COLOR_RESET}")
|
||||||
|
print("Recommendation: Do not crop. Try lowering the -sct threshold.")
|
||||||
|
|
||||||
|
elif len(significant_clusters) == 1:
|
||||||
|
dominant_cluster = significant_clusters[0]
|
||||||
|
parsed_crop = parse_crop_string(dominant_cluster['center'])
|
||||||
|
snapped_crop, ar_label = snap_to_known_ar(
|
||||||
|
parsed_crop['w'], parsed_crop['h'], parsed_crop['x'], parsed_crop['y'], width, height
|
||||||
|
)
|
||||||
|
|
||||||
|
print("A single dominant aspect ratio was found.")
|
||||||
|
if ar_label:
|
||||||
|
print(f"The detected crop snaps to the '{ar_label}' aspect ratio.")
|
||||||
|
|
||||||
|
# Check if the final crop is a no-op (i.e., matches source dimensions)
|
||||||
|
parsed_snapped = parse_crop_string(snapped_crop)
|
||||||
|
if parsed_snapped and parsed_snapped['w'] == width and parsed_snapped['h'] == height:
|
||||||
|
print(f"\n{COLOR_GREEN}The detected crop matches the source resolution. No crop is needed.{COLOR_RESET}")
|
||||||
|
else:
|
||||||
|
print(f"\n{COLOR_GREEN}Recommended crop filter: -vf {snapped_crop}{COLOR_RESET}")
|
||||||
|
|
||||||
|
else: # len > 1, mixed AR case
|
||||||
|
print(f"{COLOR_YELLOW}Mixed aspect ratios detected (e.g., IMAX scenes).{COLOR_RESET}")
|
||||||
|
print("Calculating a safe 'master' crop to contain all significant scenes.")
|
||||||
|
|
||||||
|
crop_keys = [c['center'] for c in significant_clusters]
|
||||||
|
bounding_box_crop = calculate_bounding_box(crop_keys)
|
||||||
|
|
||||||
|
if bounding_box_crop:
|
||||||
|
parsed_bb = parse_crop_string(bounding_box_crop)
|
||||||
|
snapped_crop, ar_label = snap_to_known_ar(
|
||||||
|
parsed_bb['w'], parsed_bb['h'], parsed_bb['x'], parsed_bb['y'], width, height
|
||||||
|
)
|
||||||
|
|
||||||
|
print("\n--- Detected Significant Ratios ---")
|
||||||
|
for cluster in significant_clusters:
|
||||||
|
percentage = (cluster['count'] / total_detections) * 100
|
||||||
|
label = f"'{cluster['ar_label']}'" if cluster['ar_label'] else "Custom AR"
|
||||||
|
print(f" - {label} ({cluster['center']}) was found in {percentage:.1f}% of samples.")
|
||||||
|
|
||||||
|
print(f"\n{COLOR_GREEN}Analysis complete.{COLOR_RESET}")
|
||||||
|
if ar_label:
|
||||||
|
print(f"The calculated master crop snaps to the '{ar_label}' aspect ratio.")
|
||||||
|
|
||||||
|
# Check if the final crop is a no-op
|
||||||
|
parsed_snapped = parse_crop_string(snapped_crop)
|
||||||
|
if parsed_snapped and parsed_snapped['w'] == width and parsed_snapped['h'] == height:
|
||||||
|
print(f"{COLOR_GREEN}The final calculated crop matches the source resolution. No crop is needed.{COLOR_RESET}")
|
||||||
|
else:
|
||||||
|
print(f"{COLOR_GREEN}Recommended safe crop filter: -vf {snapped_crop}{COLOR_RESET}")
|
||||||
|
else:
|
||||||
|
print(f"{COLOR_RED}Could not calculate a bounding box. Manual review is required.{COLOR_RESET}")
|
||||||
|
|
||||||
|
def main():
|
||||||
|
parser = argparse.ArgumentParser(
|
||||||
|
description="Analyzes a video file to detect black bars and recommend crop values. "
|
||||||
|
"Handles mixed aspect ratios by calculating a safe bounding box.",
|
||||||
|
formatter_class=argparse.RawTextHelpFormatter
|
||||||
|
)
|
||||||
|
parser.add_argument("input", help="Input video file")
|
||||||
|
parser.add_argument("-n", "--num_workers", type=int, default=max(1, multiprocessing.cpu_count() // 2), help="Number of worker threads. Defaults to half of available cores.")
|
||||||
|
parser.add_argument("-sct", "--significant_crop_threshold", type=float, default=5.0, help="Percentage a crop must be present to be considered 'significant'. Default is 5.0.")
|
||||||
|
parser.add_argument("-mc", "--min_crop", type=int, default=10, help="Minimum pixels to crop on any side for it to be considered a 'major' crop. Default is 10.")
|
||||||
|
parser.add_argument("--debug", action="store_true", help="Enable detailed debug logging.")
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
input_file = args.input
|
||||||
|
num_workers = args.num_workers
|
||||||
|
significant_crop_threshold = args.significant_crop_threshold
|
||||||
|
min_crop = args.min_crop
|
||||||
|
|
||||||
|
# Validate input file
|
||||||
|
if not os.path.isfile(input_file):
|
||||||
|
print(f"{COLOR_RED}Error: Input file does not exist.{COLOR_RESET}")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
# Always probe the video file for metadata
|
||||||
|
print("--- Probing video file for metadata ---")
|
||||||
|
|
||||||
|
try:
|
||||||
|
probe_duration_args = [
|
||||||
|
'ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1',
|
||||||
|
input_file
|
||||||
|
]
|
||||||
|
duration_str = subprocess.check_output(probe_duration_args, stderr=subprocess.STDOUT, text=True)
|
||||||
|
duration = int(float(duration_str))
|
||||||
|
print(f"Detected duration: {duration}s")
|
||||||
|
|
||||||
|
probe_res_args = [
|
||||||
|
'ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=width,height', '-of', 'csv=s=x:p=0',
|
||||||
|
input_file
|
||||||
|
]
|
||||||
|
resolution_str = subprocess.check_output(probe_res_args, stderr=subprocess.STDOUT, text=True)
|
||||||
|
width, height = map(int, resolution_str.strip().split('x'))
|
||||||
|
print(f"Detected resolution: {width}x{height}")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"{COLOR_RED}Error probing video file: {e}{COLOR_RESET}")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
print(f"\n--- Video Analysis Parameters ---")
|
||||||
|
print(f"Input File: {os.path.basename(input_file)}")
|
||||||
|
print(f"Duration: {duration}s")
|
||||||
|
print(f"Resolution: {width}x{height}")
|
||||||
|
print(f"Number of Workers: {num_workers}")
|
||||||
|
print(f"Significance Threshold: {significant_crop_threshold}%")
|
||||||
|
print(f"Minimum Crop Size: {min_crop}px")
|
||||||
|
|
||||||
|
# Check for required tools
|
||||||
|
check_prerequisites()
|
||||||
|
|
||||||
|
# Analyze the video
|
||||||
|
analyze_video(input_file, duration, width, height, num_workers, significant_crop_threshold, min_crop, args.debug)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
|
||||||
214
old/scene_cutter_old.py
Normal file
214
old/scene_cutter_old.py
Normal file
@@ -0,0 +1,214 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
import subprocess
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
def get_video_duration(video_path):
|
||||||
|
"""Gets the duration of a video file in seconds using ffprobe."""
|
||||||
|
command = [
|
||||||
|
'ffprobe',
|
||||||
|
'-v', 'error',
|
||||||
|
'-show_entries', 'format=duration',
|
||||||
|
'-of', 'default=noprint_wrappers=1:nokey=1',
|
||||||
|
video_path
|
||||||
|
]
|
||||||
|
try:
|
||||||
|
result = subprocess.run(command, capture_output=True, text=True, check=True)
|
||||||
|
return float(result.stdout.strip())
|
||||||
|
except FileNotFoundError:
|
||||||
|
print("\nError: 'ffprobe' command not found. Is it installed and in your system's PATH?")
|
||||||
|
sys.exit(1)
|
||||||
|
except (subprocess.CalledProcessError, ValueError) as e:
|
||||||
|
print(f"\nError getting video duration: {e}")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
def get_best_hwaccel():
|
||||||
|
"""
|
||||||
|
Checks for available FFmpeg hardware acceleration methods and returns the best one.
|
||||||
|
Priority is defined as: cuda > qsv > dxva2.
|
||||||
|
Returns the name of the best available method, or None if none are found.
|
||||||
|
"""
|
||||||
|
priority = ['cuda', 'qsv', 'dxva2']
|
||||||
|
print("Checking for available hardware acceleration...")
|
||||||
|
try:
|
||||||
|
# Run ffmpeg to list available hardware acceleration methods
|
||||||
|
result = subprocess.run(
|
||||||
|
['ffmpeg', '-hide_banner', '-hwaccels'],
|
||||||
|
capture_output=True, text=True, encoding='utf-8', check=False
|
||||||
|
)
|
||||||
|
|
||||||
|
if result.returncode != 0:
|
||||||
|
print("Warning: Could not query FFmpeg for hardware acceleration. Using software decoding.")
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Parse the output to get a list of available methods
|
||||||
|
available_methods = result.stdout.strip().split('\n')
|
||||||
|
if len(available_methods) > 1:
|
||||||
|
available_methods = available_methods[1:]
|
||||||
|
|
||||||
|
# Check for our preferred methods in order of priority
|
||||||
|
for method in priority:
|
||||||
|
if method in available_methods:
|
||||||
|
print(f"Found best available hardware acceleration: {method}")
|
||||||
|
return method
|
||||||
|
|
||||||
|
print("No high-priority hardware acceleration (cuda, qsv, dxva2) found. Using software decoding.")
|
||||||
|
return None
|
||||||
|
|
||||||
|
except FileNotFoundError:
|
||||||
|
# This will be handled by the main ffmpeg call later, so we just return None.
|
||||||
|
return None
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Warning: An error occurred while checking for hwaccel: {e}. Using software decoding.")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def cut_video_into_scenes(video_path, max_segment_length, hwaccel=None):
|
||||||
|
"""
|
||||||
|
Cuts a video into segments based on a .scenes.json file using FFmpeg's segment muxer,
|
||||||
|
ensuring no segment exceeds a maximum length.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
video_path (str): The path to the input video file.
|
||||||
|
max_segment_length (int): The maximum allowed length for any segment in seconds.
|
||||||
|
hwaccel (str, optional): The hardware acceleration method to use for decoding.
|
||||||
|
"""
|
||||||
|
print(f"\nProcessing video: {os.path.basename(video_path)}")
|
||||||
|
|
||||||
|
# 1. Derive the .scenes.json path and check for its existence
|
||||||
|
base_name = os.path.splitext(video_path)[0]
|
||||||
|
json_path = f"{base_name}.scenes.json"
|
||||||
|
|
||||||
|
if not os.path.isfile(json_path):
|
||||||
|
print(f"\nError: Scene file not found at '{json_path}'")
|
||||||
|
print("Please run scene_detector.py on the video first.")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
# 2. Load the scene timestamps from the JSON file
|
||||||
|
try:
|
||||||
|
with open(json_path, 'r') as f:
|
||||||
|
scene_timestamps = json.load(f)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
print(f"\nError: Could not parse the JSON file at '{json_path}'. It may be corrupted.")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
if not scene_timestamps:
|
||||||
|
print("Warning: The scene file is empty. No segments to cut.")
|
||||||
|
return
|
||||||
|
|
||||||
|
# Get total video duration to handle the last segment correctly
|
||||||
|
print("Getting video duration...")
|
||||||
|
video_duration = get_video_duration(video_path)
|
||||||
|
print(f"Video duration: {video_duration:.2f} seconds.")
|
||||||
|
|
||||||
|
# 3. Create the output directory
|
||||||
|
output_dir = "cuts"
|
||||||
|
os.makedirs(output_dir, exist_ok=True)
|
||||||
|
print(f"Output will be saved to the '{output_dir}' directory.")
|
||||||
|
|
||||||
|
if hwaccel:
|
||||||
|
print(f"Using hardware acceleration for decoding: {hwaccel}")
|
||||||
|
|
||||||
|
# 4. Process timestamps to enforce max segment length
|
||||||
|
print(f"Enforcing maximum segment length of {max_segment_length} seconds...")
|
||||||
|
|
||||||
|
# Define all segment boundaries, starting at 0 and ending at the video's duration
|
||||||
|
segment_boundaries = [0.0] + sorted(list(set(scene_timestamps))) + [video_duration]
|
||||||
|
|
||||||
|
final_cut_points = []
|
||||||
|
for i in range(len(segment_boundaries) - 1):
|
||||||
|
start = segment_boundaries[i]
|
||||||
|
end = segment_boundaries[i+1]
|
||||||
|
|
||||||
|
current_time = start
|
||||||
|
while (end - current_time) > max_segment_length:
|
||||||
|
current_time += max_segment_length
|
||||||
|
final_cut_points.append(current_time)
|
||||||
|
|
||||||
|
final_cut_points.append(end)
|
||||||
|
|
||||||
|
# Clean up the final list: sort, remove duplicates, and remove the final cut at the very end
|
||||||
|
final_cut_points = sorted(list(set(final_cut_points)))
|
||||||
|
if final_cut_points and final_cut_points[-1] >= video_duration - 0.1: # Use a small tolerance
|
||||||
|
final_cut_points.pop()
|
||||||
|
|
||||||
|
print(f"Original scenes: {len(scene_timestamps)}. Total segments after splitting: {len(final_cut_points) + 1}.")
|
||||||
|
|
||||||
|
# 5. Prepare arguments for the segment muxer
|
||||||
|
segment_times_str = ",".join(map(str, final_cut_points))
|
||||||
|
video_basename = os.path.basename(base_name)
|
||||||
|
output_pattern = os.path.join(output_dir, f"{video_basename}_segment%03d.mkv")
|
||||||
|
|
||||||
|
# 6. Build the single FFmpeg command
|
||||||
|
command = ['ffmpeg', '-hide_banner']
|
||||||
|
if hwaccel:
|
||||||
|
command.extend(['-hwaccel', hwaccel])
|
||||||
|
|
||||||
|
command.extend([
|
||||||
|
'-i', video_path,
|
||||||
|
'-c:v', 'utvideo',
|
||||||
|
'-an', '-sn', '-dn',
|
||||||
|
'-map_metadata', '-1',
|
||||||
|
'-map_chapters', '-1',
|
||||||
|
'-f', 'segment',
|
||||||
|
'-segment_times', segment_times_str,
|
||||||
|
'-segment_start_number', '1',
|
||||||
|
'-reset_timestamps', '1',
|
||||||
|
output_pattern
|
||||||
|
])
|
||||||
|
|
||||||
|
print("\nStarting FFmpeg to cut all segments in a single pass...")
|
||||||
|
try:
|
||||||
|
# Use Popen to show FFmpeg's progress in real-time
|
||||||
|
process = subprocess.Popen(command, stderr=subprocess.PIPE, text=True, encoding='utf-8')
|
||||||
|
|
||||||
|
for line in iter(process.stderr.readline, ''):
|
||||||
|
if line.strip().startswith('frame='):
|
||||||
|
sys.stderr.write(f'\r{line.strip()}')
|
||||||
|
sys.stderr.flush()
|
||||||
|
|
||||||
|
process.wait()
|
||||||
|
sys.stderr.write('\n')
|
||||||
|
sys.stderr.flush()
|
||||||
|
|
||||||
|
if process.returncode != 0:
|
||||||
|
print(f"\nWarning: FFmpeg may have encountered an error (exit code: {process.returncode}).")
|
||||||
|
else:
|
||||||
|
total_segments = len(final_cut_points) + 1
|
||||||
|
print(f"\nSuccessfully created {total_segments} segments in the '{output_dir}' directory.")
|
||||||
|
|
||||||
|
except FileNotFoundError:
|
||||||
|
print("\nError: 'ffmpeg' command not found. Is it installed and in your system's PATH?")
|
||||||
|
sys.exit(1)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"\nAn error occurred during cutting: {e}")
|
||||||
|
|
||||||
|
def main():
|
||||||
|
"""
|
||||||
|
Main function to handle command-line arguments.
|
||||||
|
"""
|
||||||
|
parser = argparse.ArgumentParser(
|
||||||
|
description="Automatically detects the best hardware decoder and cuts a video into scene-based segments using a corresponding '.scenes.json' file."
|
||||||
|
)
|
||||||
|
parser.add_argument("video_path", help="Path to the video file.")
|
||||||
|
parser.add_argument(
|
||||||
|
'--segment-length',
|
||||||
|
type=int,
|
||||||
|
default=10,
|
||||||
|
help='Maximum length of a segment in seconds. Default: 10'
|
||||||
|
)
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
if not os.path.isfile(args.video_path):
|
||||||
|
print(f"Error: Input file not found: '{args.video_path}'")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
# Automatically determine the best hardware acceleration method
|
||||||
|
hwaccel_method = get_best_hwaccel()
|
||||||
|
|
||||||
|
cut_video_into_scenes(args.video_path, args.segment_length, hwaccel_method)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
147
old/scene_detector_old.py
Normal file
147
old/scene_detector_old.py
Normal file
@@ -0,0 +1,147 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
import subprocess
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
def get_best_hwaccel():
|
||||||
|
"""
|
||||||
|
Checks for available FFmpeg hardware acceleration methods and returns the best one.
|
||||||
|
Priority is defined as: cuda > qsv > dxva2.
|
||||||
|
Returns the name of the best available method, or None if none are found.
|
||||||
|
"""
|
||||||
|
priority = ['cuda', 'qsv', 'dxva2']
|
||||||
|
print("Checking for available hardware acceleration...")
|
||||||
|
try:
|
||||||
|
# Run ffmpeg to list available hardware acceleration methods
|
||||||
|
result = subprocess.run(
|
||||||
|
['ffmpeg', '-hide_banner', '-hwaccels'],
|
||||||
|
capture_output=True, text=True, encoding='utf-8', check=False
|
||||||
|
)
|
||||||
|
|
||||||
|
if result.returncode != 0:
|
||||||
|
print("Warning: Could not query FFmpeg for hardware acceleration. Using software decoding.")
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Parse the output to get a list of available methods
|
||||||
|
available_methods = result.stdout.strip().split('\n')
|
||||||
|
if len(available_methods) > 1:
|
||||||
|
available_methods = available_methods[1:]
|
||||||
|
|
||||||
|
# Check for our preferred methods in order of priority
|
||||||
|
for method in priority:
|
||||||
|
if method in available_methods:
|
||||||
|
print(f"Found best available hardware acceleration: {method}")
|
||||||
|
return method
|
||||||
|
|
||||||
|
print("No high-priority hardware acceleration (cuda, qsv, dxva2) found. Using software decoding.")
|
||||||
|
return None
|
||||||
|
|
||||||
|
except FileNotFoundError:
|
||||||
|
# This will be handled by the main ffmpeg call later, so we just return None.
|
||||||
|
return None
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Warning: An error occurred while checking for hwaccel: {e}. Using software decoding.")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def detect_scenes(video_path, hwaccel=None, threshold=0.4):
|
||||||
|
"""
|
||||||
|
Uses FFmpeg to detect scene changes in a video and saves the timestamps to a JSON file.
|
||||||
|
"""
|
||||||
|
print(f"Starting scene detection for: {os.path.basename(video_path)}")
|
||||||
|
|
||||||
|
# Define the output JSON file path based on the input video name
|
||||||
|
base_name = os.path.splitext(video_path)[0]
|
||||||
|
json_output_path = f"{base_name}.scenes.json"
|
||||||
|
|
||||||
|
# Base FFmpeg command
|
||||||
|
command = ['ffmpeg', '-hide_banner']
|
||||||
|
|
||||||
|
# If hardware acceleration is specified, add the relevant flags
|
||||||
|
if hwaccel:
|
||||||
|
print(f"Attempting to use hardware acceleration: {hwaccel}")
|
||||||
|
command.extend(['-hwaccel', hwaccel])
|
||||||
|
|
||||||
|
# Add the rest of the command arguments
|
||||||
|
# The filter string is now built dynamically with the specified threshold
|
||||||
|
filter_string = f"select='gt(scene,{threshold})',showinfo"
|
||||||
|
command.extend([
|
||||||
|
'-i', video_path,
|
||||||
|
'-vf', filter_string,
|
||||||
|
'-f', 'null',
|
||||||
|
'-'
|
||||||
|
])
|
||||||
|
|
||||||
|
# Execute the FFmpeg command
|
||||||
|
try:
|
||||||
|
# Use Popen to start the process and read its output in real-time
|
||||||
|
process = subprocess.Popen(command, stderr=subprocess.PIPE, text=True, encoding='utf-8')
|
||||||
|
|
||||||
|
scene_timestamps = []
|
||||||
|
|
||||||
|
# Read stderr line by line as it's produced
|
||||||
|
for line in iter(process.stderr.readline, ''):
|
||||||
|
# Check for scene detection output (which contains pts_time)
|
||||||
|
if 'pts_time:' in line:
|
||||||
|
timestamp_str = line.split('pts_time:')[1].split()[0]
|
||||||
|
scene_timestamps.append(float(timestamp_str))
|
||||||
|
# Check for ffmpeg's progress line (which starts with 'frame=')
|
||||||
|
# and print it to the actual stderr to show progress
|
||||||
|
elif line.strip().startswith('frame='):
|
||||||
|
# Use sys.stderr to avoid interfering with stdout prints
|
||||||
|
# Use '\r' to have the line overwrite itself, mimicking ffmpeg's behavior
|
||||||
|
sys.stderr.write(f'\r{line.strip()}')
|
||||||
|
sys.stderr.flush()
|
||||||
|
|
||||||
|
# Wait for the process to finish and get the return code
|
||||||
|
process.wait()
|
||||||
|
|
||||||
|
# Print a newline to move past the progress bar
|
||||||
|
sys.stderr.write('\n')
|
||||||
|
sys.stderr.flush()
|
||||||
|
|
||||||
|
if process.returncode != 0:
|
||||||
|
print(f"\nWarning: FFmpeg may have encountered an error (exit code: {process.returncode}).")
|
||||||
|
|
||||||
|
if not scene_timestamps:
|
||||||
|
print("Warning: No scenes were detected. The output file will be empty.")
|
||||||
|
|
||||||
|
# Save the detected scenes to a JSON file
|
||||||
|
with open(json_output_path, 'w') as json_file:
|
||||||
|
json.dump(scene_timestamps, json_file, indent=4)
|
||||||
|
|
||||||
|
print(f"Scene detection completed. {len(scene_timestamps)} scenes found.")
|
||||||
|
print(f"Results saved to: {json_output_path}")
|
||||||
|
|
||||||
|
except FileNotFoundError:
|
||||||
|
print("Error: 'ffmpeg' command not found. Is it installed and in your system's PATH?")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"An error occurred: {e}")
|
||||||
|
|
||||||
|
def main():
|
||||||
|
"""
|
||||||
|
Main function to handle command-line arguments.
|
||||||
|
"""
|
||||||
|
parser = argparse.ArgumentParser(description="Detect scene changes in a video using FFmpeg.")
|
||||||
|
parser.add_argument("video_path", help="Path to the video file.")
|
||||||
|
parser.add_argument(
|
||||||
|
"-t", "--threshold",
|
||||||
|
type=float,
|
||||||
|
default=0.4,
|
||||||
|
help="Set the scene detection threshold (0.0 to 1.0). Lower is more sensitive. Default: 0.4"
|
||||||
|
)
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
if not os.path.isfile(args.video_path):
|
||||||
|
print(f"Error: Input file not found: '{args.video_path}'")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
# Automatically determine the best hardware acceleration method
|
||||||
|
hwaccel_method = get_best_hwaccel()
|
||||||
|
|
||||||
|
detect_scenes(args.video_path, hwaccel_method, args.threshold)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
Reference in New Issue
Block a user