#!/usr/bin/env python3
# Reconstructed from patch 3a97ebbb ("added additional files", pat-e, 2025-07-20).
# cropdetect.py -- samples a video with FFmpeg's cropdetect filter and
# recommends a crop filter that removes letterbox/pillarbox bars.

import argparse
import subprocess
import sys
import os
import re
from collections import Counter
import shutil
import multiprocessing
import json

# ANSI color codes for terminal output.
COLOR_GREEN = "\033[92m"
COLOR_RED = "\033[91m"
COLOR_YELLOW = "\033[93m"
COLOR_RESET = "\033[0m"


def check_prerequisites():
    """Verify that ffmpeg and ffprobe are on PATH; exit(1) if any is missing."""
    print("--- Prerequisite Check ---")
    missing = [tool for tool in ('ffmpeg', 'ffprobe') if not shutil.which(tool)]
    for tool in missing:
        print(f"Error: '{tool}' command not found. Is it installed and in your PATH?")
    if missing:
        sys.exit(1)
    print("All required tools found.")


def analyze_segment(task_args):
    """Worker-process entry point: run cropdetect on a one-second segment.

    task_args is a (seek_time, input_file, width, height) tuple; width and
    height travel with the task for the pool API but are not used here.
    Returns a list of ('crop=w:h:x:y', seek_time) tuples, or [] on error.
    """
    seek_time, input_file, _width, _height = task_args

    cmd = [
        'ffmpeg', '-hide_banner',
        '-ss', str(seek_time),
        '-i', input_file, '-t', '1', '-vf', 'cropdetect',
        '-f', 'null', '-'
    ]
    proc = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8')
    if proc.returncode != 0:
        return []

    # cropdetect prints its suggestions on stderr as crop=w:h:x:y.
    detections = re.findall(r'crop=(\d+):(\d+):(\d+):(\d+)', proc.stderr)
    return [
        (f"crop={int(w)}:{int(h)}:{int(x)}:{int(y)}", seek_time)
        for w, h, x, y in detections
    ]


def get_frame_luma(input_file, seek_time):
    """Return the average luma (YAVG) of one second of video at seek_time.

    Returns None when ffmpeg fails or no YAVG value appears in its output.
    """
    cmd = [
        'ffmpeg', '-hide_banner',
        '-ss', str(seek_time),
        '-i', input_file,
        '-t', '1',
        '-vf', 'signalstats',
        '-f', 'null', '-'
    ]
    proc = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8')
    if proc.returncode != 0:
        return None

    # signalstats reports the per-frame average luma as YAVG on stderr.
    match = re.search(r'YAVG:([0-9.]+)', proc.stderr)
    return float(match.group(1)) if match else None


def check_luma_for_group(task_args):
    """Worker helper: decide whether the sample frame for a crop group is bright.

    task_args is (group_key, sample_ts, input_file, luma_threshold).
    Returns (group_key, is_bright); a failed luma probe counts as not bright.
    """
    group_key, sample_ts, input_file, luma_threshold = task_args
    luma = get_frame_luma(input_file, sample_ts)
    return (group_key, luma is not None and luma >= luma_threshold)


# Standard presentation aspect ratios that detected crops are snapped to.
KNOWN_ASPECT_RATIOS = [
    {"name": "HDTV (16:9)", "ratio": 16 / 9},
    {"name": "Widescreen (Scope)", "ratio": 2.39},
    {"name": "Widescreen (Flat)", "ratio": 1.85},
    {"name": "IMAX Digital (1.90:1)", "ratio": 1.90},
    {"name": "Fullscreen (4:3)", "ratio": 4 / 3},
    {"name": "IMAX 70mm (1.43:1)", "ratio": 1.43},
]


def snap_to_known_ar(w, h, x, y, video_w, video_h, tolerance=0.03):
    """Snap a detected crop rectangle onto the closest standard aspect ratio.

    Returns (crop_string, ar_name). ar_name is None when the rectangle is not
    within `tolerance` (relative) of any known ratio, or when it is neither
    letterboxed nor pillarboxed; in those cases the crop is returned unchanged.
    """
    if h == 0:
        return f"crop={w}:{h}:{x}:{y}", None

    detected = w / h
    best = min(KNOWN_ASPECT_RATIOS, key=lambda ar: abs(detected - ar['ratio']))

    # Too far from every standard ratio: keep the raw detection.
    if abs(detected - best['ratio']) / best['ratio'] >= tolerance:
        return f"crop={w}:{h}:{x}:{y}", None

    if abs(w - video_w) < 16:
        # Width is (almost) full frame -> letterboxed: recompute the height.
        new_h = round(video_w / best['ratio'])
        if new_h % 8:
            new_h += 8 - new_h % 8  # round up to a multiple of 8
        new_y = round((video_h - new_h) / 2)
        if new_y % 2:
            new_y -= 1  # keep the y offset even for encoder compatibility
        return f"crop={video_w}:{new_h}:0:{new_y}", best['name']

    if abs(h - video_h) < 16:
        # Height is (almost) full frame -> pillarboxed: recompute the width.
        new_w = round(video_h * best['ratio'])
        if new_w % 8:
            new_w += 8 - new_w % 8  # round up to a multiple of 8
        new_x = round((video_w - new_w) / 2)
        if new_x % 2:
            new_x -= 1  # keep the x offset even
        return f"crop={new_w}:{video_h}:{new_x}:0", best['name']

    # Neither letterboxed nor pillarboxed: leave the detection untouched.
    return f"crop={w}:{h}:{x}:{y}", None
def cluster_crop_values(crop_counts, tolerance=8):
    """Group similar crop strings into clusters keyed on the top-left corner.

    Args:
        crop_counts: Counter mapping 'crop=w:h:x:y' strings to detection counts.
        tolerance: max x/y offset difference (pixels) to share a cluster.

    Returns a list of {'center': crop_str, 'count': total} dicts sorted by
    count descending; each cluster center is its most frequent member.
    """
    clusters = []
    temp_counts = crop_counts.copy()

    while temp_counts:
        # The most frequent remaining crop seeds the next cluster.
        center_str, _ = temp_counts.most_common(1)[0]
        try:
            _, values = center_str.split('=')
            cw, ch, cx, cy = map(int, values.split(':'))
        except (ValueError, IndexError):
            del temp_counts[center_str]  # skip malformed strings
            continue

        cluster_total_count = 0
        crops_to_remove = []

        # Absorb every crop whose top-left corner is within tolerance
        # (the center itself always matches and is counted).
        for crop_str, count in temp_counts.items():
            try:
                _, values = crop_str.split('=')
                w, h, x, y = map(int, values.split(':'))
                if abs(x - cx) <= tolerance and abs(y - cy) <= tolerance:
                    cluster_total_count += count
                    crops_to_remove.append(crop_str)
            except (ValueError, IndexError):
                continue

        if cluster_total_count > 0:
            clusters.append({'center': center_str, 'count': cluster_total_count})

        # Remove after iterating; deleting mid-iteration would break .items().
        for crop_str in crops_to_remove:
            del temp_counts[crop_str]

    clusters.sort(key=lambda c: c['count'], reverse=True)
    return clusters


def parse_crop_string(crop_str):
    """Parse a 'crop=w:h:x:y' string into {'w','h','x','y'} ints, or None."""
    try:
        _, values = crop_str.split('=')
        w, h, x, y = map(int, values.split(':'))
        return {'w': w, 'h': h, 'x': x, 'y': y}
    except (ValueError, IndexError):
        return None


def calculate_bounding_box(crop_keys):
    """Compute a 'master' crop string covering all given crop rectangles.

    Returns None when no key parses, or when all rectangles are essentially
    identical (every edge within 2px of the others) — a bounding box adds
    nothing in that case.

    Fix over the previous revision: the uniformity test compared the box's
    right edge against its LEFT edge (i.e. measured the box width), so it
    could never trigger; it now compares the spread of each edge across the
    inputs. It also no longer emits a bogus 'crop=inf:...' string when every
    key is malformed.
    """
    rects = [parse_crop_string(key) for key in crop_keys]
    rects = [r for r in rects if r]
    if not rects:
        return None

    lefts = [r['x'] for r in rects]
    tops = [r['y'] for r in rects]
    rights = [r['x'] + r['w'] for r in rects]
    bottoms = [r['y'] + r['h'] for r in rects]

    # All rectangles nearly identical -> no useful bounding box.
    if (max(lefts) - min(lefts) <= 2 and max(tops) - min(tops) <= 2
            and max(rights) - min(rights) <= 2 and max(bottoms) - min(bottoms) <= 2):
        return None

    left, top = min(lefts), min(tops)
    return f"crop={max(rights) - left}:{max(bottoms) - top}:{left}:{top}"


def is_major_crop(crop_str, video_w, video_h, min_crop_size):
    """Return True when at least one side is cropped by >= min_crop_size px."""
    parsed = parse_crop_string(crop_str)
    if not parsed:
        return False

    w, h, x, y = parsed['w'], parsed['h'], parsed['x'], parsed['y']

    # Pixels removed from each side of the source frame.
    crop_top = y
    crop_bottom = video_h - (y + h)
    crop_left = x
    crop_right = video_w - (x + w)

    return max(crop_top, crop_bottom, crop_left, crop_right) >= min_crop_size


def analyze_video(input_file, duration, width, height, num_workers, significant_crop_threshold, min_crop, debug=False):
    """Sample the video in parallel, cluster crop detections, and print a recommendation.

    Args:
        input_file: path to the video.
        duration: video duration in whole seconds.
        width, height: source resolution in pixels.
        num_workers: worker processes for the sampling pool.
        significant_crop_threshold: percentage of samples a cluster needs to count.
        min_crop: minimum per-side crop in pixels.
        debug: print raw detections and clusters.

    NOTE(review): min_crop is accepted for CLI parity but not applied yet;
    is_major_crop() exists for a future filtering pass — confirm intent.
    """
    print(f"\n--- Analyzing Video: {os.path.basename(input_file)} ---")

    # Step 1: sample 4 one-second segments per worker, spread over the runtime.
    num_tasks = num_workers * 4
    segment_duration = max(1, duration // num_tasks)
    tasks = [(i * segment_duration, input_file, width, height) for i in range(num_tasks)]

    print(f"Analyzing {len(tasks)} segments across {num_workers} worker(s)...")

    crop_results = []
    with multiprocessing.Pool(processes=num_workers) as pool:
        total_tasks = len(tasks)
        results_iterator = pool.imap_unordered(analyze_segment, tasks)

        for i, result in enumerate(results_iterator, 1):
            crop_results.append(result)
            progress_message = f"Analyzing Segments: {i}/{total_tasks} completed..."
            sys.stdout.write(f"\r{progress_message}")
            sys.stdout.flush()
    print()

    all_crops_with_ts = [crop for sublist in crop_results for crop in sublist]
    all_crop_strings = [item[0] for item in all_crops_with_ts]
    if not all_crop_strings:
        print(f"\n{COLOR_GREEN}Analysis complete. No black bars detected.{COLOR_RESET}")
        return

    crop_counts = Counter(all_crop_strings)

    if debug:
        print("\n--- Debug: Most Common Raw Detections ---")
        for crop_str, count in crop_counts.most_common(10):
            print(f" - {crop_str} (Count: {count})")

    # Step 2: cluster similar crop values.
    clusters = cluster_crop_values(crop_counts)
    total_detections = sum(c['count'] for c in clusters)

    if debug:
        print("\n--- Debug: Detected Clusters ---")
        for cluster in clusters:
            percentage = (cluster['count'] / total_detections) * 100
            print(f" - Center: {cluster['center']}, Count: {cluster['count']} ({percentage:.1f}%)")

    # Step 3: keep only clusters above the significance threshold.
    significant_clusters = []
    for cluster in clusters:
        percentage = (cluster['count'] / total_detections) * 100
        if percentage >= significant_crop_threshold:
            significant_clusters.append(cluster)

    # Step 4: derive the final recommendation.
    print("\n--- Determining Final Crop Recommendation ---")

    # Attach the snapped aspect-ratio label (if any) to each cluster.
    for cluster in significant_clusters:
        parsed_crop = parse_crop_string(cluster['center'])
        if parsed_crop:
            _, ar_label = snap_to_known_ar(
                parsed_crop['w'], parsed_crop['h'], parsed_crop['x'], parsed_crop['y'], width, height
            )
            cluster['ar_label'] = ar_label
        else:
            cluster['ar_label'] = None

    if not significant_clusters:
        print(f"{COLOR_RED}No single crop value meets the {significant_crop_threshold}% significance threshold.{COLOR_RESET}")
        print("Recommendation: Do not crop. Try lowering the -sct threshold.")

    elif len(significant_clusters) == 1:
        dominant_cluster = significant_clusters[0]
        parsed_crop = parse_crop_string(dominant_cluster['center'])
        snapped_crop, ar_label = snap_to_known_ar(
            parsed_crop['w'], parsed_crop['h'], parsed_crop['x'], parsed_crop['y'], width, height
        )

        print("A single dominant aspect ratio was found.")
        if ar_label:
            print(f"The detected crop snaps to the '{ar_label}' aspect ratio.")

        # A crop equal to the source dimensions is a no-op.
        parsed_snapped = parse_crop_string(snapped_crop)
        if parsed_snapped and parsed_snapped['w'] == width and parsed_snapped['h'] == height:
            print(f"\n{COLOR_GREEN}The detected crop matches the source resolution. No crop is needed.{COLOR_RESET}")
        else:
            print(f"\n{COLOR_GREEN}Recommended crop filter: -vf {snapped_crop}{COLOR_RESET}")

    else:  # multiple significant clusters: mixed aspect ratios
        print(f"{COLOR_YELLOW}Mixed aspect ratios detected (e.g., IMAX scenes).{COLOR_RESET}")
        print("Calculating a safe 'master' crop to contain all significant scenes.")

        crop_keys = [c['center'] for c in significant_clusters]
        bounding_box_crop = calculate_bounding_box(crop_keys)

        if bounding_box_crop:
            parsed_bb = parse_crop_string(bounding_box_crop)
            snapped_crop, ar_label = snap_to_known_ar(
                parsed_bb['w'], parsed_bb['h'], parsed_bb['x'], parsed_bb['y'], width, height
            )

            print("\n--- Detected Significant Ratios ---")
            for cluster in significant_clusters:
                percentage = (cluster['count'] / total_detections) * 100
                label = f"'{cluster['ar_label']}'" if cluster['ar_label'] else "Custom AR"
                print(f" - {label} ({cluster['center']}) was found in {percentage:.1f}% of samples.")

            print(f"\n{COLOR_GREEN}Analysis complete.{COLOR_RESET}")
            if ar_label:
                print(f"The calculated master crop snaps to the '{ar_label}' aspect ratio.")

            # A crop equal to the source dimensions is a no-op.
            parsed_snapped = parse_crop_string(snapped_crop)
            if parsed_snapped and parsed_snapped['w'] == width and parsed_snapped['h'] == height:
                print(f"{COLOR_GREEN}The final calculated crop matches the source resolution. No crop is needed.{COLOR_RESET}")
            else:
                print(f"{COLOR_GREEN}Recommended safe crop filter: -vf {snapped_crop}{COLOR_RESET}")
        else:
            print(f"{COLOR_RED}Could not calculate a bounding box. Manual review is required.{COLOR_RESET}")


def main():
    """CLI entry point: parse arguments, probe the file, and run the analysis."""
    parser = argparse.ArgumentParser(
        description="Analyzes a video file to detect black bars and recommend crop values. "
                    "Handles mixed aspect ratios by calculating a safe bounding box.",
        formatter_class=argparse.RawTextHelpFormatter
    )
    parser.add_argument("input", help="Input video file")
    parser.add_argument("-n", "--num_workers", type=int, default=max(1, multiprocessing.cpu_count() // 2), help="Number of worker threads. Defaults to half of available cores.")
    parser.add_argument("-sct", "--significant_crop_threshold", type=float, default=5.0, help="Percentage a crop must be present to be considered 'significant'. Default is 5.0.")
    parser.add_argument("-mc", "--min_crop", type=int, default=10, help="Minimum pixels to crop on any side for it to be considered a 'major' crop. Default is 10.")
    parser.add_argument("--debug", action="store_true", help="Enable detailed debug logging.")

    args = parser.parse_args()

    input_file = args.input
    num_workers = args.num_workers
    significant_crop_threshold = args.significant_crop_threshold
    min_crop = args.min_crop

    if not os.path.isfile(input_file):
        print(f"{COLOR_RED}Error: Input file does not exist.{COLOR_RESET}")
        sys.exit(1)

    # Probe duration and resolution up front; both are required by the analysis.
    print("--- Probing video file for metadata ---")
    try:
        probe_duration_args = [
            'ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1',
            input_file
        ]
        duration_str = subprocess.check_output(probe_duration_args, stderr=subprocess.STDOUT, text=True)
        duration = int(float(duration_str))
        print(f"Detected duration: {duration}s")

        probe_res_args = [
            'ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=width,height', '-of', 'csv=s=x:p=0',
            input_file
        ]
        resolution_str = subprocess.check_output(probe_res_args, stderr=subprocess.STDOUT, text=True)
        width, height = map(int, resolution_str.strip().split('x'))
        print(f"Detected resolution: {width}x{height}")

    except Exception as e:
        print(f"{COLOR_RED}Error probing video file: {e}{COLOR_RESET}")
        sys.exit(1)

    print("\n--- Video Analysis Parameters ---")
    print(f"Input File: {os.path.basename(input_file)}")
    print(f"Duration: {duration}s")
    print(f"Resolution: {width}x{height}")
    print(f"Number of Workers: {num_workers}")
    print(f"Significance Threshold: {significant_crop_threshold}%")
    print(f"Minimum Crop Size: {min_crop}px")

    check_prerequisites()

    analyze_video(input_file, duration, width, height, num_workers, significant_crop_threshold, min_crop, args.debug)


if __name__ == "__main__":
    main()
# ===== old/scene_cutter_old.py (from patch 3a97ebbb) =====
#!/usr/bin/env python3
import subprocess
import json
import os
import sys
import argparse


def get_video_duration(video_path):
    """Return the duration of video_path in seconds (float) via ffprobe.

    Exits the process when ffprobe is missing or the probe fails.
    """
    probe_cmd = [
        'ffprobe',
        '-v', 'error',
        '-show_entries', 'format=duration',
        '-of', 'default=noprint_wrappers=1:nokey=1',
        video_path
    ]
    try:
        probe = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
        return float(probe.stdout.strip())
    except FileNotFoundError:
        print("\nError: 'ffprobe' command not found. Is it installed and in your system's PATH?")
        sys.exit(1)
    except (subprocess.CalledProcessError, ValueError) as e:
        print(f"\nError getting video duration: {e}")
        sys.exit(1)


def get_best_hwaccel():
    """Return the best available FFmpeg hwaccel method, or None.

    Priority: cuda > qsv > dxva2. Any failure to query falls back to None
    (software decoding).
    """
    priority = ['cuda', 'qsv', 'dxva2']
    print("Checking for available hardware acceleration...")
    try:
        query = subprocess.run(
            ['ffmpeg', '-hide_banner', '-hwaccels'],
            capture_output=True, text=True, encoding='utf-8', check=False
        )

        if query.returncode != 0:
            print("Warning: Could not query FFmpeg for hardware acceleration. Using software decoding.")
            return None

        # The first output line is a header; the method names follow.
        lines = query.stdout.strip().split('\n')
        available = lines[1:] if len(lines) > 1 else lines

        for method in priority:
            if method in available:
                print(f"Found best available hardware acceleration: {method}")
                return method

        print("No high-priority hardware acceleration (cuda, qsv, dxva2) found. Using software decoding.")
        return None

    except FileNotFoundError:
        # A missing ffmpeg is reported by the main ffmpeg call later.
        return None
    except Exception as e:
        print(f"Warning: An error occurred while checking for hwaccel: {e}. Using software decoding.")
        return None


def cut_video_into_scenes(video_path, max_segment_length, hwaccel=None):
    """Cut video_path into segments at the timestamps in its '.scenes.json'.

    No segment may exceed max_segment_length seconds; longer scenes are
    subdivided. Segments are written losslessly (utvideo) into ./cuts using
    FFmpeg's segment muxer in a single pass.

    Args:
        video_path (str): The path to the input video file.
        max_segment_length (int): Maximum allowed segment length in seconds.
        hwaccel (str, optional): FFmpeg hwaccel method used for decoding.
    """
    print(f"\nProcessing video: {os.path.basename(video_path)}")

    # The scene file sits next to the video: <basename>.scenes.json.
    base_name = os.path.splitext(video_path)[0]
    json_path = f"{base_name}.scenes.json"
    if not os.path.isfile(json_path):
        print(f"\nError: Scene file not found at '{json_path}'")
        print("Please run scene_detector.py on the video first.")
        sys.exit(1)

    try:
        with open(json_path, 'r') as f:
            scene_timestamps = json.load(f)
    except json.JSONDecodeError:
        print(f"\nError: Could not parse the JSON file at '{json_path}'. It may be corrupted.")
        sys.exit(1)

    if not scene_timestamps:
        print("Warning: The scene file is empty. No segments to cut.")
        return

    # The total duration bounds the final segment.
    print("Getting video duration...")
    video_duration = get_video_duration(video_path)
    print(f"Video duration: {video_duration:.2f} seconds.")

    output_dir = "cuts"
    os.makedirs(output_dir, exist_ok=True)
    print(f"Output will be saved to the '{output_dir}' directory.")

    if hwaccel:
        print(f"Using hardware acceleration for decoding: {hwaccel}")

    print(f"Enforcing maximum segment length of {max_segment_length} seconds...")

    # Boundaries: start of file, deduplicated sorted scene cuts, end of file.
    boundaries = [0.0] + sorted(set(scene_timestamps)) + [video_duration]

    cut_points = []
    for seg_start, seg_end in zip(boundaries, boundaries[1:]):
        # Subdivide any stretch longer than the allowed maximum.
        t = seg_start
        while (seg_end - t) > max_segment_length:
            t += max_segment_length
            cut_points.append(t)
        cut_points.append(seg_end)

    # Deduplicate, sort, and drop a cut landing on the very end of the file.
    cut_points = sorted(set(cut_points))
    if cut_points and cut_points[-1] >= video_duration - 0.1:  # small tolerance
        cut_points.pop()

    print(f"Original scenes: {len(scene_timestamps)}. Total segments after splitting: {len(cut_points) + 1}.")

    segment_times = ",".join(map(str, cut_points))
    output_pattern = os.path.join(output_dir, f"{os.path.basename(base_name)}_segment%03d.mkv")

    # One FFmpeg invocation cuts everything via the segment muxer.
    cmd = ['ffmpeg', '-hide_banner']
    if hwaccel:
        cmd.extend(['-hwaccel', hwaccel])
    cmd.extend([
        '-i', video_path,
        '-c:v', 'utvideo',      # lossless intermediate codec
        '-an', '-sn', '-dn',    # drop audio/subtitle/data streams
        '-map_metadata', '-1',
        '-map_chapters', '-1',
        '-f', 'segment',
        '-segment_times', segment_times,
        '-segment_start_number', '1',
        '-reset_timestamps', '1',
        output_pattern
    ])

    print("\nStarting FFmpeg to cut all segments in a single pass...")
    try:
        proc = subprocess.Popen(cmd, stderr=subprocess.PIPE, text=True, encoding='utf-8')

        # Mirror only ffmpeg's 'frame=' progress lines, overwriting in place.
        for line in iter(proc.stderr.readline, ''):
            if line.strip().startswith('frame='):
                sys.stderr.write(f'\r{line.strip()}')
                sys.stderr.flush()

        proc.wait()
        sys.stderr.write('\n')
        sys.stderr.flush()

        if proc.returncode != 0:
            print(f"\nWarning: FFmpeg may have encountered an error (exit code: {proc.returncode}).")
        else:
            print(f"\nSuccessfully created {len(cut_points) + 1} segments in the '{output_dir}' directory.")

    except FileNotFoundError:
        print("\nError: 'ffmpeg' command not found. Is it installed and in your system's PATH?")
        sys.exit(1)
    except Exception as e:
        print(f"\nAn error occurred during cutting: {e}")


def main():
    """CLI entry point: pick the best decoder and cut the given video."""
    parser = argparse.ArgumentParser(
        description="Automatically detects the best hardware decoder and cuts a video into scene-based segments using a corresponding '.scenes.json' file."
    )
    parser.add_argument("video_path", help="Path to the video file.")
    parser.add_argument(
        '--segment-length',
        type=int,
        default=10,
        help='Maximum length of a segment in seconds. Default: 10'
    )

    args = parser.parse_args()

    if not os.path.isfile(args.video_path):
        print(f"Error: Input file not found: '{args.video_path}'")
        sys.exit(1)

    cut_video_into_scenes(args.video_path, args.segment_length, get_best_hwaccel())


if __name__ == "__main__":
    main()


# ===== old/scene_detector_old.py (from patch 3a97ebbb) =====
#!/usr/bin/env python3
import subprocess
import json
import os
import sys
import argparse


# NOTE(review): intentional duplicate of the helper above — these were two
# separate files in the original patch and are kept self-contained.
def get_best_hwaccel():
    """Return the best available FFmpeg hwaccel method, or None.

    Priority: cuda > qsv > dxva2. Any failure to query falls back to None
    (software decoding).
    """
    priority = ['cuda', 'qsv', 'dxva2']
    print("Checking for available hardware acceleration...")
    try:
        query = subprocess.run(
            ['ffmpeg', '-hide_banner', '-hwaccels'],
            capture_output=True, text=True, encoding='utf-8', check=False
        )

        if query.returncode != 0:
            print("Warning: Could not query FFmpeg for hardware acceleration. Using software decoding.")
            return None

        # The first output line is a header; the method names follow.
        lines = query.stdout.strip().split('\n')
        available = lines[1:] if len(lines) > 1 else lines

        for method in priority:
            if method in available:
                print(f"Found best available hardware acceleration: {method}")
                return method

        print("No high-priority hardware acceleration (cuda, qsv, dxva2) found. Using software decoding.")
        return None

    except FileNotFoundError:
        # A missing ffmpeg is reported by the main ffmpeg call later.
        return None
    except Exception as e:
        print(f"Warning: An error occurred while checking for hwaccel: {e}. Using software decoding.")
        return None
def detect_scenes(video_path, hwaccel=None, threshold=0.4):
    """Run FFmpeg scene detection and write timestamps to <basename>.scenes.json.

    Args:
        video_path: video file to scan.
        hwaccel: optional FFmpeg hwaccel method for decoding.
        threshold: scene-change score (0.0-1.0); lower is more sensitive.
    """
    print(f"Starting scene detection for: {os.path.basename(video_path)}")

    # Output JSON sits next to the input video.
    base_name = os.path.splitext(video_path)[0]
    json_output_path = f"{base_name}.scenes.json"

    cmd = ['ffmpeg', '-hide_banner']
    if hwaccel:
        print(f"Attempting to use hardware acceleration: {hwaccel}")
        cmd.extend(['-hwaccel', hwaccel])

    # select passes only frames whose scene score exceeds the threshold;
    # showinfo prints their pts_time on stderr, which is parsed below.
    cmd.extend([
        '-i', video_path,
        '-vf', f"select='gt(scene,{threshold})',showinfo",
        '-f', 'null',
        '-'
    ])

    try:
        proc = subprocess.Popen(cmd, stderr=subprocess.PIPE, text=True, encoding='utf-8')
        scene_timestamps = []

        for line in iter(proc.stderr.readline, ''):
            if 'pts_time:' in line:
                # showinfo line for a selected (scene-change) frame.
                scene_timestamps.append(float(line.split('pts_time:')[1].split()[0]))
            elif line.strip().startswith('frame='):
                # ffmpeg progress line: mirror it on stderr, overwriting in
                # place so stdout prints stay clean.
                sys.stderr.write(f'\r{line.strip()}')
                sys.stderr.flush()

        proc.wait()

        # Move past the progress bar.
        sys.stderr.write('\n')
        sys.stderr.flush()

        if proc.returncode != 0:
            print(f"\nWarning: FFmpeg may have encountered an error (exit code: {proc.returncode}).")

        if not scene_timestamps:
            print("Warning: No scenes were detected. The output file will be empty.")

        with open(json_output_path, 'w') as json_file:
            json.dump(scene_timestamps, json_file, indent=4)

        print(f"Scene detection completed. {len(scene_timestamps)} scenes found.")
        print(f"Results saved to: {json_output_path}")

    except FileNotFoundError:
        print("Error: 'ffmpeg' command not found. Is it installed and in your system's PATH?")
    except Exception as e:
        print(f"An error occurred: {e}")


def main():
    """CLI entry point for scene detection."""
    parser = argparse.ArgumentParser(description="Detect scene changes in a video using FFmpeg.")
    parser.add_argument("video_path", help="Path to the video file.")
    parser.add_argument(
        "-t", "--threshold",
        type=float,
        default=0.4,
        help="Set the scene detection threshold (0.0 to 1.0). Lower is more sensitive. Default: 0.4"
    )

    args = parser.parse_args()

    if not os.path.isfile(args.video_path):
        print(f"Error: Input file not found: '{args.video_path}'")
        sys.exit(1)

    detect_scenes(args.video_path, get_best_hwaccel(), args.threshold)


if __name__ == "__main__":
    main()