#!/usr/bin/env python3
"""Detect scene changes in a video with FFmpeg and cut it into segments.

Workflow: probe hardware acceleration, optionally auto-detect black-bar
cropping, detect scene-change timestamps (saved to a .scenes.json file),
then cut the video into lossless (utvideo) segments no longer than a
configurable maximum.
"""
import subprocess
import json
import os
import sys
import argparse
import re
from collections import Counter
import multiprocessing
import shutil


# --- Utility Functions (from previous scripts) ---

def get_best_hwaccel():
    """
    Checks for available FFmpeg hardware acceleration methods and returns
    the best one based on the current operating system.

    Returns the method name (str) or None to indicate software decoding.
    """
    # Determine the priority list based on the operating system
    if sys.platform == "win32":
        # Windows: CUDA (Nvidia) > QSV (Intel) > D3D11VA (Modern, AMD/All) > DXVA2 (Legacy)
        priority = ['cuda', 'qsv', 'd3d11va', 'dxva2']
    elif sys.platform == "linux":
        # Linux: CUDA (Nvidia) > VAAPI (Intel/AMD)
        priority = ['cuda', 'vaapi']
    elif sys.platform == "darwin":
        # macOS: VideoToolbox is the native framework for Apple Silicon
        priority = ['videotoolbox']
    else:
        # Fallback for other operating systems (e.g., BSD)
        priority = []

    if not priority:
        print(f"No hardware acceleration priority list for this OS ({sys.platform}). Using software.")
        return None

    print(f"Checking for available hardware acceleration on {sys.platform}...")
    try:
        result = subprocess.run(
            ['ffmpeg', '-hide_banner', '-hwaccels'],
            capture_output=True, text=True, encoding='utf-8', check=False
        )
        if result.returncode != 0:
            print("Warning: Could not query FFmpeg. Using software decoding.")
            return None

        available_methods = result.stdout.strip().split('\n')
        # The first line of `ffmpeg -hwaccels` output is a header; drop it.
        if len(available_methods) > 1:
            available_methods = available_methods[1:]

        for method in priority:
            if method in available_methods:
                print(f"Found best available hardware acceleration: {method}")
                return method

        print("No high-priority hardware acceleration found. Using software decoding.")
        return None
    except Exception as e:
        # FIX: was `except (FileNotFoundError, Exception)` — a redundant tuple
        # that silently swallowed every error. Log it; hwaccel is best-effort.
        print(f"Warning: hardware acceleration probe failed ({e}). Using software decoding.")
        return None


def get_video_duration(video_path):
    """Gets the duration of a video file in seconds using ffprobe.

    Returns the duration as a float, or None on any probe failure.
    """
    command = ['ffprobe', '-v', 'error', '-show_entries', 'format=duration',
               '-of', 'default=noprint_wrappers=1:nokey=1', video_path]
    try:
        result = subprocess.run(command, capture_output=True, text=True, check=True)
        return float(result.stdout.strip())
    except (FileNotFoundError, subprocess.CalledProcessError, ValueError) as e:
        print(f"\nError getting video duration: {e}")
        return None


def get_video_resolution(video_path):
    """Gets the resolution (width, height) of a video file using ffprobe's
    JSON output for robustness.

    Returns (width, height) as ints, or (None, None) on failure.
    """
    command = [
        'ffprobe', '-v', 'quiet', '-print_format', 'json',
        '-show_streams', video_path
    ]
    try:
        result = subprocess.run(command, capture_output=True, text=True,
                                check=True, encoding='utf-8')
        data = json.loads(result.stdout)
        # Pick the first stream that is video and actually carries dimensions
        # (attached cover art may appear as a video stream without them).
        for stream in data.get('streams', []):
            if stream.get('codec_type') == 'video' and 'width' in stream and 'height' in stream:
                return int(stream['width']), int(stream['height'])
        # If no video stream with resolution is found
        raise ValueError("Could not find video stream with resolution in ffprobe output.")
    except (FileNotFoundError, subprocess.CalledProcessError, json.JSONDecodeError, ValueError) as e:
        print(f"\nError getting video resolution: {e}")
        return None, None


# --- Core Logic Functions (Ported 1:1 from cropdetect.py) ---

KNOWN_ASPECT_RATIOS = [
    {"name": "HDTV (16:9)", "ratio": 16 / 9},
    {"name": "Widescreen (Scope)", "ratio": 2.39},
    {"name": "Widescreen (Flat)", "ratio": 1.85},
    {"name": "IMAX Digital (1.90:1)", "ratio": 1.90},
    {"name": "Fullscreen (4:3)", "ratio": 4 / 3},
    {"name": "IMAX 70mm (1.43:1)", "ratio": 1.43},
]


def parse_crop_string(crop_str):
    """Parses a 'crop=w:h:x:y' string into a dictionary of integers.

    Returns None if the string is malformed.
    """
    try:
        _, values = crop_str.split('=')
        w, h, x, y = map(int, values.split(':'))
        return {'w': w, 'h': h, 'x': x, 'y': y}
    except (ValueError, IndexError):
        return None


def snap_to_known_ar(w, h, x, y, video_w, video_h, tolerance=0.03):
    """Snaps a crop rectangle to the nearest standard aspect ratio if it's
    close enough.

    Returns a (crop_filter_string, aspect_ratio_name_or_None) tuple. The
    name is None when no known ratio is within `tolerance` (relative error)
    or when neither letterbox nor pillarbox heuristics apply.
    """
    if h == 0:
        return f"crop={w}:{h}:{x}:{y}", None

    detected_ratio = w / h
    best_match = None
    smallest_diff = float('inf')
    for ar in KNOWN_ASPECT_RATIOS:
        diff = abs(detected_ratio - ar['ratio'])
        if diff < smallest_diff:
            smallest_diff = diff
            best_match = ar

    if not best_match or (smallest_diff / best_match['ratio']) >= tolerance:
        return f"crop={w}:{h}:{x}:{y}", None

    # Heuristic: if width is close to full video width, it's letterboxed.
    if abs(w - video_w) < 16:
        new_h = round(video_w / best_match['ratio'])
        # Round height up to the nearest multiple of 8 for cleaner dimensions.
        if new_h % 8 != 0:
            new_h = new_h + (8 - (new_h % 8))
        new_y = round((video_h - new_h) / 2)
        if new_y % 2 != 0:
            new_y -= 1  # Ensure y offset is even
        return f"crop={video_w}:{new_h}:0:{new_y}", best_match['name']

    # Heuristic: if height is close to full video height, it's pillarboxed.
    if abs(h - video_h) < 16:
        new_w = round(video_h * best_match['ratio'])
        # Round width up to the nearest multiple of 8.
        if new_w % 8 != 0:
            new_w = new_w + (8 - (new_w % 8))
        new_x = round((video_w - new_w) / 2)
        if new_x % 2 != 0:
            new_x -= 1  # Ensure x offset is even
        return f"crop={new_w}:{video_h}:{new_x}:0", best_match['name']

    return f"crop={w}:{h}:{x}:{y}", None


def cluster_crop_values(crop_counts, tolerance=8):
    """Groups similar crop values into clusters based on the top-left corner.

    `crop_counts` is a Counter of 'crop=w:h:x:y' strings. Returns a list of
    {'center': crop_str, 'count': total} dicts sorted by count, descending.
    """
    clusters = []
    temp_counts = crop_counts.copy()
    while temp_counts:
        # Use the most frequent remaining crop as the cluster center.
        center_str, _ = temp_counts.most_common(1)[0]
        parsed_center = parse_crop_string(center_str)
        if not parsed_center:
            del temp_counts[center_str]
            continue

        cx, cy = parsed_center['x'], parsed_center['y']
        cluster_total_count = 0
        crops_to_remove = []
        for crop_str, count in temp_counts.items():
            parsed_crop = parse_crop_string(crop_str)
            if (parsed_crop and abs(parsed_crop['x'] - cx) <= tolerance
                    and abs(parsed_crop['y'] - cy) <= tolerance):
                cluster_total_count += count
                crops_to_remove.append(crop_str)

        if cluster_total_count > 0:
            clusters.append({'center': center_str, 'count': cluster_total_count})
        for crop_str in crops_to_remove:
            del temp_counts[crop_str]

    return sorted(clusters, key=lambda c: c['count'], reverse=True)


def calculate_bounding_box(crop_keys):
    """Calculates a bounding box that contains all given crop rectangles.

    Returns a 'crop=w:h:x:y' string; width/height are forced even.
    """
    min_x, max_x = float('inf'), float('-inf')
    min_y, max_y = float('inf'), float('-inf')
    for key in crop_keys:
        parsed = parse_crop_string(key)
        if parsed:
            x, y, w, h = parsed['x'], parsed['y'], parsed['w'], parsed['h']
            min_x = min(min_x, x)
            min_y = min(min_y, y)
            max_x = max(max_x, x + w)
            max_y = max(max_y, y + h)

    final_w, final_h = (max_x - min_x), (max_y - min_y)
    # Even dimensions keep downstream encoders happy.
    if final_w % 2 != 0:
        final_w -= 1
    if final_h % 2 != 0:
        final_h -= 1
    return f"crop={final_w}:{final_h}:{min_x}:{min_y}"


def analyze_segment_for_crop(task_args):
    """Worker process to analyze one video segment for crop values.

    `task_args` is a (seek_time, input_file) tuple; returns the list of
    'crop=w:h:x:y' strings cropdetect printed for that one-second sample.
    """
    seek_time, input_file = task_args
    ffmpeg_args = ['ffmpeg', '-hide_banner', '-ss', str(seek_time),
                   '-i', input_file, '-t', '1', '-vf', 'cropdetect',
                   '-f', 'null', '-']
    result = subprocess.run(ffmpeg_args, capture_output=True, text=True, encoding='utf-8')
    # cropdetect reports on stderr.
    return re.findall(r'crop=\d+:\d+:\d+:\d+', result.stderr)


def detect_crop(video_path, hwaccel=None):
    """
    Detects black bars using the full, robust logic from cropdetect.py,
    including multiprocess analysis, clustering, and aspect ratio snapping.

    `hwaccel` is accepted for interface symmetry but not used here (samples
    are decoded in worker processes in software). Returns a crop filter
    string, or None when no cropping should be applied.
    """
    print("\nStarting robust crop detection (1:1 logic from cropdetect.py)...")

    # --- Parameters from original script ---
    significant_crop_threshold = 5.0  # percent of detections a cluster needs
    num_workers = max(1, multiprocessing.cpu_count() // 2)

    # --- Probing ---
    duration = get_video_duration(video_path)
    width, height = get_video_resolution(video_path)
    if not all([duration, width, height]):
        print("Could not get video metadata. Aborting crop detection.")
        return None

    # --- Analysis ---
    num_tasks = num_workers * 4
    segment_duration = max(1, duration // num_tasks)
    tasks = [(i * segment_duration, video_path) for i in range(num_tasks)]
    print(f"Analyzing {len(tasks)} segments across {num_workers} worker(s)...")

    all_crops = []
    with multiprocessing.Pool(processes=num_workers) as pool:
        for i, result in enumerate(pool.imap_unordered(analyze_segment_for_crop, tasks), 1):
            all_crops.extend(result)
            sys.stdout.write(f"\rAnalyzing Segments: {i}/{len(tasks)} completed...")
            sys.stdout.flush()
    print("\nAnalysis complete.")

    if not all_crops:
        print("No black bars detected.")
        return None

    # --- Decision Logic ---
    crop_counts = Counter(all_crops)
    clusters = cluster_crop_values(crop_counts)
    total_detections = sum(c['count'] for c in clusters)
    if total_detections == 0:
        print("No valid crop detections found.")
        return None

    significant_clusters = [
        c for c in clusters
        if (c['count'] / total_detections * 100) >= significant_crop_threshold
    ]

    final_crop = None
    ar_label = None
    if not significant_clusters:
        print(f"No single crop value meets the {significant_crop_threshold}% significance threshold. No crop will be applied.")
        return None
    elif len(significant_clusters) == 1:
        print("A single dominant aspect ratio was found.")
        final_crop = significant_clusters[0]['center']
    else:  # Mixed AR
        print("Mixed aspect ratios detected. Calculating a safe 'master' crop.")
        crop_keys = [c['center'] for c in significant_clusters]
        final_crop = calculate_bounding_box(crop_keys)

    # --- Snapping ---
    parsed = parse_crop_string(final_crop)
    if not parsed:
        return None
    snapped_crop, ar_label = snap_to_known_ar(parsed['w'], parsed['h'],
                                              parsed['x'], parsed['y'],
                                              width, height)
    if ar_label:
        print(f"The detected crop snaps to the '{ar_label}' aspect ratio.")

    # --- Final Check ---
    parsed_snapped = parse_crop_string(snapped_crop)
    if parsed_snapped and parsed_snapped['w'] == width and parsed_snapped['h'] == height:
        print("Final crop matches source resolution. No cropping needed.")
        return None

    print(f"Robust crop detection finished. Recommended filter: {snapped_crop}")
    return snapped_crop


def detect_scenes(video_path, json_output_path, hwaccel=None, threshold=0.23, crop_filter=None):
    """Uses FFmpeg to detect scene changes and saves timestamps to a JSON file.

    Returns True on success, False on failure. `hwaccel` is accepted for
    interface symmetry; see the NOTE below for why it is not applied.
    """
    print(f"\nStarting scene detection for: {os.path.basename(video_path)}")

    # NOTE: Hardware acceleration is intentionally disabled for scene detection.
    # The scenedetect filter can be unreliable with hwaccel contexts as it
    # operates on CPU frames. The performance gain is negligible for this step.
    command = ['ffmpeg', '-hide_banner']

    filters = []
    if crop_filter:
        print(f"Applying crop filter during scene detection: {crop_filter}")
        filters.append(crop_filter)
    filters.append(f"select='gt(scene,{threshold})',showinfo")
    filter_string = ",".join(filters)

    # Add -map 0:v:0 to explicitly select the first video stream, ignoring cover art.
    command.extend(['-i', video_path, '-map', '0:v:0',
                    '-vf', filter_string, '-f', 'null', '-'])

    try:
        process = subprocess.Popen(command, stderr=subprocess.PIPE, text=True, encoding='utf-8')
        scene_timestamps = []
        # showinfo prints a pts_time for every frame that passed the select filter.
        for line in iter(process.stderr.readline, ''):
            if 'pts_time:' in line:
                timestamp_str = line.split('pts_time:')[1].split()[0]
                scene_timestamps.append(float(timestamp_str))
            elif line.strip().startswith('frame='):
                # Re-emit FFmpeg's own progress line in place.
                sys.stderr.write(f'\r{line.strip()}')
                sys.stderr.flush()
        process.wait()
        sys.stderr.write('\n')
        sys.stderr.flush()

        if process.returncode != 0:
            print(f"\nWarning: FFmpeg may have encountered an error (exit code: {process.returncode}).")

        with open(json_output_path, 'w') as json_file:
            json.dump(scene_timestamps, json_file, indent=4)
        print(f"Scene detection completed. {len(scene_timestamps)} scenes found.")
        return True
    except Exception as e:
        # FIX: was `except (FileNotFoundError, Exception)` — redundant tuple.
        print(f"\nAn error occurred during scene detection: {e}")
        return False


def cut_video_into_scenes(video_path, json_path, max_segment_length, hwaccel=None, crop_filter=None):
    """Cuts a video into segments, ensuring no segment exceeds a maximum length.

    Reads scene timestamps from `json_path`, splits any over-long scene at
    `max_segment_length`-second intervals, and writes lossless utvideo
    segments into a local 'cuts' directory.
    """
    print(f"\nStarting segment cutting for: {os.path.basename(video_path)}")
    try:
        with open(json_path, 'r') as f:
            scene_timestamps = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError) as e:
        print(f"\nError reading scene file '{json_path}': {e}")
        return

    video_duration = get_video_duration(video_path)
    if video_duration is None:
        return

    output_dir = "cuts"
    os.makedirs(output_dir, exist_ok=True)
    print(f"Output will be saved to the '{output_dir}' directory.")
    print(f"Enforcing maximum segment length of {max_segment_length} seconds...")

    # Build the full boundary list, then subdivide any span longer than the cap.
    segment_boundaries = [0.0] + sorted(list(set(scene_timestamps))) + [video_duration]
    final_cut_points = []
    for i in range(len(segment_boundaries) - 1):
        start, end = segment_boundaries[i], segment_boundaries[i + 1]
        current_time = start
        while (end - current_time) > max_segment_length:
            current_time += max_segment_length
            final_cut_points.append(current_time)
        final_cut_points.append(end)

    final_cut_points = sorted(list(set(final_cut_points)))
    # Drop a cut point at (or within 0.1s of) the very end of the file.
    if final_cut_points and final_cut_points[-1] >= video_duration - 0.1:
        final_cut_points.pop()

    print(f"Original scenes: {len(scene_timestamps)}. Total segments after splitting: {len(final_cut_points) + 1}.")

    segment_times_str = ",".join(map(str, final_cut_points))
    base_name = os.path.splitext(os.path.basename(video_path))[0]
    output_pattern = os.path.join(output_dir, f"{base_name}_segment%03d.mkv")

    # Add -loglevel error to hide info messages and -stats to show progress
    command = ['ffmpeg', '-hide_banner', '-loglevel', 'error', '-stats']
    if hwaccel:
        command.extend(['-hwaccel', hwaccel])
    command.extend(['-i', video_path])
    if crop_filter:
        print(f"Applying crop filter during cutting: {crop_filter}")
        command.extend(['-vf', crop_filter])

    # Add -map 0:v:0 to explicitly select the first video stream for cutting.
    # Combine with -an/-sn to ensure no other streams are processed.
    command.extend(['-map', '0:v:0', '-c:v', 'utvideo',
                    '-an', '-sn', '-dn',
                    '-map_metadata', '-1', '-map_chapters', '-1',
                    '-f', 'segment', '-segment_times', segment_times_str,
                    '-segment_start_number', '1', '-reset_timestamps', '1',
                    output_pattern])

    print("\nStarting FFmpeg to cut all segments in a single pass...")
    try:
        # -stats will print progress to stderr, which subprocess.run will display
        subprocess.run(command, check=True)
        total_segments = len(final_cut_points) + 1
        print(f"Successfully created {total_segments} segments in the '{output_dir}' directory.")
    except Exception as e:
        # FIX: was `except (FileNotFoundError, subprocess.CalledProcessError, Exception)`
        # — both named types are subclasses of Exception, so the tuple was redundant.
        print(f"\nAn error occurred during cutting: {e}")


# --- Main Orchestrator ---

def main():
    """Parse CLI arguments and run the detect/cut workflow."""
    parser = argparse.ArgumentParser(
        description="A comprehensive video processing script to detect scenes and cut segments.",
        formatter_class=argparse.RawTextHelpFormatter
    )
    parser.add_argument("video_path", help="Path to the input video file.")
    parser.add_argument(
        "--autocrop", action='store_true',
        help="Automatically detect and apply cropping to remove black bars. Default: False"
    )
    parser.add_argument(
        "--so", "--sceneonly", action='store_true',
        dest='sceneonly',  # Explicitly set the destination attribute name
        help="Only run scene detection and create the .scenes.json file."
    )
    parser.add_argument(
        "--segtime", type=int, default=10,
        help="Maximum length of any cut segment in seconds. Default: 10"
    )
    parser.add_argument(
        "-t", "--threshold", type=float, default=0.23,
        help="Scene detection threshold (0.0 to 1.0). Lower is more sensitive. Default: 0.23"
    )
    args = parser.parse_args()

    if not os.path.isfile(args.video_path):
        print(f"Error: Input file not found: '{args.video_path}'")
        sys.exit(1)

    base_name = os.path.splitext(args.video_path)[0]
    json_path = f"{base_name}.scenes.json"
    hwaccel_method = get_best_hwaccel()

    crop_filter = None
    if args.autocrop:
        crop_filter = detect_crop(args.video_path, hwaccel_method)

    if args.sceneonly:
        detect_scenes(args.video_path, json_path, hwaccel_method, args.threshold, crop_filter)
        sys.exit(0)

    # --- Full Workflow (Detect if needed, then Cut) ---
    if not os.path.isfile(json_path):
        print("--- Scene file not found, running detection first ---")
        if not detect_scenes(args.video_path, json_path, hwaccel_method, args.threshold, crop_filter):
            print("\nScene detection failed. Aborting process.")
            sys.exit(1)
    else:
        print(f"--- Found existing scene file: {os.path.basename(json_path)} ---")

    cut_video_into_scenes(args.video_path, json_path, args.segtime, hwaccel_method, crop_filter)


if __name__ == "__main__":
    main()