#!/usr/bin/env python3
"""Detect scene changes in a video with FFmpeg and cut it into segments.

Workflow:
  1. Scene detection writes the scene-change timestamps (seconds) to
     ``<video>.scenes.json`` next to the input file.
  2. Cutting reads that file, splits any scene longer than the requested
     maximum length, and writes lossless UT Video segments to ``cuts/``.
"""
import subprocess
import json
import os
import sys
import argparse

# A cut point closer than this to the end of the video is dropped so that no
# near-empty trailing segment is produced.
END_TOLERANCE_SECONDS = 0.1

# Added to the video duration to build a "never reached" segment length when
# the video needs no cut at all (the segment muxer would otherwise fall back
# to its own default segment length).
SINGLE_SEGMENT_MARGIN_SECONDS = 3600.0


# --- Utility Functions (from previous scripts) ---

def get_best_hwaccel():
    """
    Checks for available FFmpeg hardware acceleration methods and returns the
    best one based on the current operating system.

    Returns:
        The name of the preferred method reported by ``ffmpeg -hwaccels``, or
        None when software decoding should be used (unknown OS, FFmpeg
        missing or failing, or no preferred method available).
    """
    # Determine the priority list based on the operating system
    if sys.platform == "win32":
        # Windows: CUDA (Nvidia) > QSV (Intel) > D3D11VA (Modern, AMD/All) > DXVA2 (Legacy)
        priority = ['cuda', 'qsv', 'd3d11va', 'dxva2']
    elif sys.platform == "linux":
        # Linux: CUDA (Nvidia) > VAAPI (Intel/AMD)
        priority = ['cuda', 'vaapi']
    elif sys.platform == "darwin":
        # macOS: VideoToolbox is the native framework for Apple Silicon
        priority = ['videotoolbox']
    else:
        # Fallback for other operating systems (e.g., BSD)
        priority = []

    if not priority:
        print(f"No hardware acceleration priority list for this OS ({sys.platform}). Using software.")
        return None

    print(f"Checking for available hardware acceleration on {sys.platform}...")
    try:
        result = subprocess.run(
            ['ffmpeg', '-hide_banner', '-hwaccels'],
            capture_output=True, text=True, encoding='utf-8', check=False
        )
    except (OSError, UnicodeError) as e:
        # FFmpeg is missing/not executable, or its output could not be decoded.
        print(f"Warning: Could not query FFmpeg ({e}). Using software decoding.")
        return None

    if result.returncode != 0:
        print("Warning: Could not query FFmpeg. Using software decoding.")
        return None

    # The first output line is a header; the rest are one method per line.
    available_methods = {
        line.strip() for line in result.stdout.strip().splitlines()[1:]
    }
    for method in priority:
        if method in available_methods:
            print(f"Found best available hardware acceleration: {method}")
            return method

    print("No high-priority hardware acceleration found. Using software decoding.")
    return None


def get_video_duration(video_path):
    """Gets the duration of a video file in seconds using ffprobe.

    Returns:
        The duration as a float, or None if ffprobe is missing, fails, or
        prints something that is not a number.
    """
    command = ['ffprobe', '-v', 'error', '-show_entries', 'format=duration',
               '-of', 'default=noprint_wrappers=1:nokey=1', video_path]
    try:
        result = subprocess.run(command, capture_output=True, text=True, check=True)
        return float(result.stdout.strip())
    except (FileNotFoundError, subprocess.CalledProcessError, ValueError) as e:
        print(f"\nError getting video duration: {e}")
        return None


# --- Core Logic Functions ---

def parse_pts_time(line):
    """Extracts the ``pts_time:`` value from one FFmpeg log line.

    Returns:
        The timestamp in seconds as a float, or None when the line has no
        ``pts_time:`` field or its value is not a number.
    """
    _, marker, remainder = line.partition('pts_time:')
    if not marker:
        return None
    fields = remainder.split()
    if not fields:
        return None
    try:
        return float(fields[0])
    except ValueError:
        return None


def detect_scenes(video_path, json_output_path, hwaccel=None, threshold=0.4):
    """Uses FFmpeg to detect scene changes and saves timestamps to a JSON file.

    Args:
        video_path: Path of the video to analyse.
        json_output_path: Where the JSON list of timestamps (seconds) is written.
        hwaccel: Optional FFmpeg ``-hwaccel`` method name; None for software.
        threshold: Scene score above which a frame counts as a scene change.

    Returns:
        True when the scene file was written, False when FFmpeg could not be
        started or the file could not be written. A non-zero FFmpeg exit code
        only produces a warning; whatever timestamps were seen are still saved.
    """
    print(f"\nStarting scene detection for: {os.path.basename(video_path)}")
    command = ['ffmpeg', '-hide_banner']
    if hwaccel:
        print(f"Attempting to use hardware acceleration: {hwaccel}")
        command.extend(['-hwaccel', hwaccel])
    filter_string = f"select='gt(scene,{threshold})',showinfo"
    command.extend(['-i', video_path, '-vf', filter_string, '-f', 'null', '-'])

    scene_timestamps = []
    try:
        # errors='replace' keeps a stray non-UTF-8 byte in FFmpeg's log from
        # aborting the whole run; the context manager closes the pipe and
        # waits for the process even if reading fails.
        with subprocess.Popen(command, stderr=subprocess.PIPE, text=True,
                              encoding='utf-8', errors='replace') as process:
            for line in process.stderr:
                timestamp = parse_pts_time(line)
                if timestamp is not None:
                    scene_timestamps.append(timestamp)
                elif line.strip().startswith('frame='):
                    # Redraw FFmpeg's progress line in place.
                    sys.stderr.write(f'\r{line.strip()}')
                    sys.stderr.flush()

        sys.stderr.write('\n')
        sys.stderr.flush()
        if process.returncode != 0:
            print(f"\nWarning: FFmpeg may have encountered an error (exit code: {process.returncode}).")

        with open(json_output_path, 'w', encoding='utf-8') as json_file:
            json.dump(scene_timestamps, json_file, indent=4)
    except OSError as e:
        print(f"\nAn error occurred during scene detection: {e}")
        return False

    print(f"Scene detection completed. {len(scene_timestamps)} scenes found.")
    return True


def compute_cut_points(scene_timestamps, video_duration, max_segment_length):
    """Computes the sorted timestamps at which the video must be cut.

    Scene boundaries are kept, and any stretch between two boundaries that is
    longer than ``max_segment_length`` is split into pieces of at most that
    length.

    Args:
        scene_timestamps: Scene-change times in seconds (any order, may
            contain duplicates). Values not strictly inside the video
            (``<= 0`` or ``>= video_duration``) are ignored.
        video_duration: Total length of the video in seconds.
        max_segment_length: Maximum allowed segment length in seconds.

    Returns:
        A sorted list of unique cut points, excluding the start of the video
        and any final point within END_TOLERANCE_SECONDS of its end.

    Raises:
        ValueError: If ``max_segment_length`` is not positive (the splitting
            loop could never terminate).
    """
    if max_segment_length <= 0:
        raise ValueError(
            f"max_segment_length must be positive, got {max_segment_length}"
        )

    inner = sorted({t for t in scene_timestamps if 0.0 < t < video_duration})
    boundaries = [0.0, *inner, video_duration]

    cut_points = []
    for start, end in zip(boundaries, boundaries[1:]):
        current_time = start
        while (end - current_time) > max_segment_length:
            current_time += max_segment_length
            cut_points.append(current_time)
        cut_points.append(end)

    cut_points = sorted(set(cut_points))
    # The last point is the end of the video itself (or within tolerance of
    # it); cutting there would only create an empty trailing segment.
    if cut_points and cut_points[-1] >= video_duration - END_TOLERANCE_SECONDS:
        cut_points.pop()
    return cut_points


def build_cut_command(video_path, cut_points, video_duration, output_pattern, hwaccel=None):
    """Builds the FFmpeg command line that writes all segments in one pass.

    Args:
        video_path: Path of the input video.
        cut_points: Sorted cut timestamps in seconds; may be empty.
        video_duration: Total length of the video in seconds.
        output_pattern: Segment file name pattern (with a ``%03d`` field).
        hwaccel: Optional FFmpeg ``-hwaccel`` method name.

    Returns:
        The command as a list of arguments for ``subprocess.run``.
    """
    # Add -loglevel error to hide info messages and -stats to show progress
    command = ['ffmpeg', '-hide_banner', '-loglevel', 'error', '-stats']
    if hwaccel:
        command.extend(['-hwaccel', hwaccel])
    command.extend(['-i', video_path, '-c:v', 'utvideo', '-an', '-sn', '-dn',
                    '-map_metadata', '-1', '-map_chapters', '-1', '-f', 'segment'])
    if cut_points:
        command.extend(['-segment_times', ",".join(map(str, cut_points))])
    else:
        # An empty -segment_times list is rejected by the segment muxer, and
        # omitting it would apply the muxer's default segment length. Ask for
        # a segment longer than the video so exactly one file is written.
        command.extend(['-segment_time',
                        str(video_duration + SINGLE_SEGMENT_MARGIN_SECONDS)])
    command.extend(['-segment_start_number', '1', '-reset_timestamps', '1',
                    output_pattern])
    return command


def cut_video_into_scenes(video_path, json_path, max_segment_length, hwaccel=None):
    """Cuts a video into segments, ensuring no segment exceeds a maximum length.

    Args:
        video_path: Path of the input video.
        json_path: Scene file (JSON list of timestamps in seconds).
        max_segment_length: Maximum segment length in seconds; must be > 0.
        hwaccel: Optional FFmpeg ``-hwaccel`` method name.

    Returns:
        True when FFmpeg finished successfully, False on any failure (invalid
        length, unreadable scene file, unknown duration, FFmpeg error).
    """
    print(f"\nStarting segment cutting for: {os.path.basename(video_path)}")
    if max_segment_length <= 0:
        print(f"\nError: maximum segment length must be positive (got {max_segment_length}).")
        return False

    try:
        with open(json_path, 'r', encoding='utf-8') as f:
            scene_timestamps = json.load(f)
    except (OSError, json.JSONDecodeError) as e:
        print(f"\nError reading scene file '{json_path}': {e}")
        return False
    if not isinstance(scene_timestamps, list) or not all(
            isinstance(t, (int, float)) for t in scene_timestamps):
        print(f"\nError reading scene file '{json_path}': expected a JSON list of numbers")
        return False

    video_duration = get_video_duration(video_path)
    if video_duration is None:
        return False

    output_dir = "cuts"
    os.makedirs(output_dir, exist_ok=True)
    print(f"Output will be saved to the '{output_dir}' directory.")
    print(f"Enforcing maximum segment length of {max_segment_length} seconds...")

    final_cut_points = compute_cut_points(scene_timestamps, video_duration, max_segment_length)
    total_segments = len(final_cut_points) + 1
    print(f"Original scenes: {len(scene_timestamps)}. Total segments after splitting: {total_segments}.")

    base_name = os.path.splitext(os.path.basename(video_path))[0]
    # The segment muxer expands '%' sequences in the pattern, so a literal
    # '%' coming from the input file name has to be doubled.
    safe_name = base_name.replace('%', '%%')
    output_pattern = os.path.join(output_dir, f"{safe_name}_segment%03d.mkv")
    command = build_cut_command(video_path, final_cut_points, video_duration,
                                output_pattern, hwaccel)

    print("\nStarting FFmpeg to cut all segments in a single pass...")
    try:
        # -stats will print progress to stderr, which subprocess.run will display
        subprocess.run(command, check=True)
    except (OSError, subprocess.CalledProcessError) as e:
        print(f"\nAn error occurred during cutting: {e}")
        return False

    print(f"Successfully created {total_segments} segments in the '{output_dir}' directory.")
    return True


# --- Main Orchestrator ---

def main():
    """Parses the command line and runs detection and/or cutting.

    Exits with status 1 when the input file is missing or a step fails.
    """
    parser = argparse.ArgumentParser(
        description="A comprehensive video processing script to detect scenes and cut segments.",
        formatter_class=argparse.RawTextHelpFormatter
    )
    parser.add_argument("video_path", help="Path to the input video file.")
    parser.add_argument(
        "--so", "--sceneonly",
        action='store_true',
        dest='sceneonly',  # Explicitly set the destination attribute name
        help="Only run scene detection and create the .scenes.json file."
    )
    parser.add_argument(
        "--segtime",
        type=int,
        default=10,
        help="Maximum length of any cut segment in seconds. Default: 10"
    )
    parser.add_argument(
        "-t", "--threshold",
        type=float,
        default=0.4,
        help="Scene detection threshold (0.0 to 1.0). Lower is more sensitive. Default: 0.4"
    )
    args = parser.parse_args()

    if args.segtime <= 0:
        # A non-positive length can never be satisfied by splitting.
        parser.error("--segtime must be a positive number of seconds")

    if not os.path.isfile(args.video_path):
        print(f"Error: Input file not found: '{args.video_path}'")
        sys.exit(1)

    base_name = os.path.splitext(args.video_path)[0]
    json_path = f"{base_name}.scenes.json"
    hwaccel_method = get_best_hwaccel()

    if args.sceneonly:
        detected = detect_scenes(args.video_path, json_path, hwaccel_method, args.threshold)
        sys.exit(0 if detected else 1)

    # --- Full Workflow (Detect if needed, then Cut) ---
    if not os.path.isfile(json_path):
        print("--- Scene file not found, running detection first ---")
        if not detect_scenes(args.video_path, json_path, hwaccel_method, args.threshold):
            print("\nScene detection failed. Aborting process.")
            sys.exit(1)
    else:
        print(f"--- Found existing scene file: {os.path.basename(json_path)} ---")

    if not cut_video_into_scenes(args.video_path, json_path, args.segtime, hwaccel_method):
        sys.exit(1)


if __name__ == "__main__":
    main()