# Video scene-detection and segment-cutting tool (FFmpeg-based).
#!/usr/bin/env python3
import argparse
import json
import multiprocessing
import os
import re
import shutil
import subprocess
import sys
from collections import Counter


# --- Utility Functions (from previous scripts) ---
def get_best_hwaccel():
    """
    Return the best available FFmpeg hardware-acceleration method for this OS.

    Queries `ffmpeg -hwaccels` and picks the highest-priority method that is
    both suitable for the current platform and reported by the local FFmpeg
    build.

    Returns:
        str or None: The hwaccel name (e.g. 'cuda', 'vaapi'), or None when no
        suitable method is found or FFmpeg cannot be queried.
    """
    # Determine the priority list based on the operating system
    if sys.platform == "win32":
        # Windows: CUDA (Nvidia) > QSV (Intel) > D3D11VA (Modern, AMD/All) > DXVA2 (Legacy)
        priority = ['cuda', 'qsv', 'd3d11va', 'dxva2']
    elif sys.platform == "linux":
        # Linux: CUDA (Nvidia) > VAAPI (Intel/AMD)
        priority = ['cuda', 'vaapi']
    elif sys.platform == "darwin":
        # macOS: VideoToolbox is the native framework for Apple Silicon
        priority = ['videotoolbox']
    else:
        # Fallback for other operating systems (e.g., BSD)
        priority = []

    if not priority:
        print(f"No hardware acceleration priority list for this OS ({sys.platform}). Using software.")
        return None

    print(f"Checking for available hardware acceleration on {sys.platform}...")
    try:
        result = subprocess.run(
            ['ffmpeg', '-hide_banner', '-hwaccels'],
            capture_output=True, text=True, encoding='utf-8', check=False
        )
    # FIX: the original `except (FileNotFoundError, Exception)` was redundant
    # (Exception already covers FileNotFoundError) and silently swallowed
    # every failure. Catch the specific, expected launch failures and report
    # them; the fallback result (None -> software decoding) is unchanged.
    except FileNotFoundError:
        print("Warning: ffmpeg executable not found. Using software decoding.")
        return None
    except OSError as e:
        print(f"Warning: Could not launch FFmpeg ({e}). Using software decoding.")
        return None

    if result.returncode != 0:
        print("Warning: Could not query FFmpeg. Using software decoding.")
        return None

    # The first line of `-hwaccels` output is a header ("Hardware acceleration
    # methods:"), so drop it when present.
    available_methods = result.stdout.strip().split('\n')
    if len(available_methods) > 1:
        available_methods = available_methods[1:]

    for method in priority:
        if method in available_methods:
            print(f"Found best available hardware acceleration: {method}")
            return method

    print("No high-priority hardware acceleration found. Using software decoding.")
    return None
def get_video_duration(video_path):
    """Return the duration of *video_path* in seconds (via ffprobe), or None on error."""
    probe_cmd = [
        'ffprobe', '-v', 'error',
        '-show_entries', 'format=duration',
        '-of', 'default=noprint_wrappers=1:nokey=1',
        video_path,
    ]
    try:
        completed = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
        return float(completed.stdout.strip())
    except (FileNotFoundError, subprocess.CalledProcessError, ValueError) as err:
        # ffprobe missing, ffprobe failed, or non-numeric output.
        print(f"\nError getting video duration: {err}")
        return None
def get_video_resolution(video_path):
    """Return (width, height) of the first video stream, or (None, None) on failure.

    Uses ffprobe's JSON output for robustness against unusual stream layouts.
    """
    probe_cmd = [
        'ffprobe',
        '-v', 'quiet',
        '-print_format', 'json',
        '-show_streams',
        video_path,
    ]
    try:
        completed = subprocess.run(probe_cmd, capture_output=True, text=True, check=True, encoding='utf-8')
        for stream in json.loads(completed.stdout).get('streams', []):
            if stream.get('codec_type') != 'video':
                continue
            if 'width' in stream and 'height' in stream:
                return int(stream['width']), int(stream['height'])
        # No video stream carried usable dimensions.
        raise ValueError("Could not find video stream with resolution in ffprobe output.")
    except (FileNotFoundError, subprocess.CalledProcessError, json.JSONDecodeError, ValueError) as err:
        print(f"\nError getting video resolution: {err}")
        return None, None
# --- Core Logic Functions (Ported 1:1 from cropdetect.py) ---
# Standard display/cinema aspect ratios used to "snap" noisy cropdetect
# results to a clean, conventional ratio (see snap_to_known_ar).
KNOWN_ASPECT_RATIOS = [
    {"name": "HDTV (16:9)", "ratio": 16/9},
    {"name": "Widescreen (Scope)", "ratio": 2.39},
    {"name": "Widescreen (Flat)", "ratio": 1.85},
    {"name": "IMAX Digital (1.90:1)", "ratio": 1.90},
    {"name": "Fullscreen (4:3)", "ratio": 4/3},
    {"name": "IMAX 70mm (1.43:1)", "ratio": 1.43},
]
def parse_crop_string(crop_str):
    """Parse an FFmpeg 'crop=w:h:x:y' string into {'w','h','x','y'} ints, or None if malformed."""
    try:
        _, payload = crop_str.split('=')
        width, height, x_off, y_off = (int(part) for part in payload.split(':'))
    except (ValueError, IndexError):
        # Wrong number of fields, non-integer field, or no/extra '=' separator.
        return None
    return {'w': width, 'h': height, 'x': x_off, 'y': y_off}
def snap_to_known_ar(w, h, x, y, video_w, video_h, tolerance=0.03):
    """Snap a detected crop rectangle to the nearest standard aspect ratio.

    If w/h is within *tolerance* (relative) of a known ratio AND the rectangle
    spans (nearly) the full frame width (letterbox) or full frame height
    (pillarbox), return a cleaned-up, frame-centred crop; otherwise return the
    rectangle unchanged.

    Returns:
        tuple: ('crop=w:h:x:y' string, matched ratio name or None).
    """
    unchanged = f"crop={w}:{h}:{x}:{y}"
    if h == 0:
        return unchanged, None

    detected_ratio = w / h
    # Closest known ratio by absolute difference (first wins on ties, as in a
    # strict-less-than scan).
    best_match = min(KNOWN_ASPECT_RATIOS, key=lambda ar: abs(detected_ratio - ar['ratio']))
    smallest_diff = abs(detected_ratio - best_match['ratio'])

    if (smallest_diff / best_match['ratio']) >= tolerance:
        return unchanged, None

    # Heuristic: width close to the full video width means letterboxing.
    if abs(w - video_w) < 16:
        snapped_h = round(video_w / best_match['ratio'])
        remainder = snapped_h % 8
        if remainder:
            snapped_h += 8 - remainder  # round height up to a multiple of 8
        snapped_y = round((video_h - snapped_h) / 2)
        if snapped_y % 2:
            snapped_y -= 1  # keep the y offset even
        return f"crop={video_w}:{snapped_h}:0:{snapped_y}", best_match['name']

    # Heuristic: height close to the full video height means pillarboxing.
    if abs(h - video_h) < 16:
        snapped_w = round(video_h * best_match['ratio'])
        remainder = snapped_w % 8
        if remainder:
            snapped_w += 8 - remainder  # round width up to a multiple of 8
        snapped_x = round((video_w - snapped_w) / 2)
        if snapped_x % 2:
            snapped_x -= 1  # keep the x offset even
        return f"crop={snapped_w}:{video_h}:{snapped_x}:0", best_match['name']

    return unchanged, None
def cluster_crop_values(crop_counts, tolerance=8):
    """Group similar crop strings into clusters keyed by their top-left corner.

    Args:
        crop_counts: Counter mapping 'crop=w:h:x:y' strings to detection counts.
        tolerance: Max per-axis distance (pixels) between a crop's (x, y) and
            the cluster centre for the crop to join that cluster.

    Returns:
        list[dict]: [{'center': crop_str, 'count': total}], sorted by count
        descending.
    """
    remaining = crop_counts.copy()
    clusters = []
    while remaining:
        # The most frequent remaining crop seeds the next cluster.
        center_str, _ = remaining.most_common(1)[0]
        center = parse_crop_string(center_str)
        if center is None:
            del remaining[center_str]
            continue

        members = []
        member_total = 0
        for candidate_str, count in remaining.items():
            candidate = parse_crop_string(candidate_str)
            if candidate is None:
                continue
            near_x = abs(candidate['x'] - center['x']) <= tolerance
            near_y = abs(candidate['y'] - center['y']) <= tolerance
            if near_x and near_y:
                members.append(candidate_str)
                member_total += count

        if member_total > 0:
            clusters.append({'center': center_str, 'count': member_total})
        for member in members:
            del remaining[member]

    return sorted(clusters, key=lambda c: c['count'], reverse=True)
def calculate_bounding_box(crop_keys):
    """Return a 'crop=w:h:x:y' string covering all given crop rectangles.

    Args:
        crop_keys: Iterable of 'crop=w:h:x:y' strings.

    Returns:
        str: Bounding crop whose width/height are shrunk to even values
        (FFmpeg requires even dimensions for most pixel formats).

    Raises:
        ValueError: If no entry in *crop_keys* is a parsable crop string.
    """
    min_x, max_x = float('inf'), float('-inf')
    min_y, max_y = float('inf'), float('-inf')
    found_any = False
    for key in crop_keys:
        parsed = parse_crop_string(key)
        if parsed:
            found_any = True
            x, y, w, h = parsed['x'], parsed['y'], parsed['w'], parsed['h']
            min_x = min(min_x, x)
            min_y = min(min_y, y)
            max_x = max(max_x, x + w)
            max_y = max(max_y, y + h)

    # FIX: previously an empty/unparsable input let the +/-inf sentinels leak
    # into the returned string, producing an invalid crop filter. Fail loudly
    # instead.
    if not found_any:
        raise ValueError("calculate_bounding_box: no parsable crop strings supplied")

    final_w, final_h = (max_x - min_x), (max_y - min_y)
    # Shrink odd dimensions by one pixel to keep them even.
    if final_w % 2 != 0: final_w -= 1
    if final_h % 2 != 0: final_h -= 1
    return f"crop={final_w}:{final_h}:{min_x}:{min_y}"
def analyze_segment_for_crop(task_args):
    """Worker: run cropdetect over a 1-second slice and return matched crop strings.

    Args:
        task_args: (seek_time_seconds, input_file_path) tuple, packed so the
            function can be used with multiprocessing map-style APIs.

    Returns:
        list[str]: Every 'crop=w:h:x:y' occurrence found in FFmpeg's stderr.
    """
    seek_time, input_file = task_args
    command = [
        'ffmpeg', '-hide_banner',
        '-ss', str(seek_time),
        '-i', input_file,
        '-t', '1',
        '-vf', 'cropdetect',
        '-f', 'null', '-',
    ]
    # cropdetect reports on stderr; the decoded video itself is discarded.
    completed = subprocess.run(command, capture_output=True, text=True, encoding='utf-8')
    return re.findall(r'crop=\d+:\d+:\d+:\d+', completed.stderr)
def detect_crop(video_path, hwaccel=None):
    """
    Detects black bars using the full, robust logic from cropdetect.py, including
    multiprocess analysis, clustering, and aspect ratio snapping.

    Args:
        video_path: Path to the input video file.
        hwaccel: Accepted for signature parity with the other pipeline steps,
            but not used anywhere in this function's body.

    Returns:
        A 'crop=w:h:x:y' filter string, or None when no crop should be applied
        (metadata unavailable, nothing detected, nothing significant, or the
        final crop equals the full source resolution).
    """
    print("\nStarting robust crop detection (1:1 logic from cropdetect.py)...")

    # --- Parameters from original script ---
    # A crop value must account for at least this percentage of all
    # detections to be considered significant.
    significant_crop_threshold = 5.0
    # Use half the CPUs (at least one) to leave headroom for FFmpeg itself.
    num_workers = max(1, multiprocessing.cpu_count() // 2)

    # --- Probing ---
    duration = get_video_duration(video_path)
    width, height = get_video_resolution(video_path)
    if not all([duration, width, height]):
        print("Could not get video metadata. Aborting crop detection.")
        return None

    # --- Analysis ---
    # Sample 4 one-second segments per worker, spaced evenly through the file.
    num_tasks = num_workers * 4
    segment_duration = max(1, duration // num_tasks)
    tasks = [(i * segment_duration, video_path) for i in range(num_tasks)]

    print(f"Analyzing {len(tasks)} segments across {num_workers} worker(s)...")
    all_crops = []
    with multiprocessing.Pool(processes=num_workers) as pool:
        # imap_unordered lets the progress line advance as soon as any
        # worker finishes, regardless of task order.
        for i, result in enumerate(pool.imap_unordered(analyze_segment_for_crop, tasks), 1):
            all_crops.extend(result)
            sys.stdout.write(f"\rAnalyzing Segments: {i}/{len(tasks)} completed...")
            sys.stdout.flush()
    print("\nAnalysis complete.")

    if not all_crops:
        print("No black bars detected.")
        return None

    # --- Decision Logic ---
    # Tally identical crop strings, then merge near-identical ones into
    # clusters so jitter in cropdetect's output doesn't split the vote.
    crop_counts = Counter(all_crops)
    clusters = cluster_crop_values(crop_counts)
    total_detections = sum(c['count'] for c in clusters)

    if total_detections == 0:
        print("No valid crop detections found.")
        return None

    significant_clusters = [c for c in clusters if (c['count'] / total_detections * 100) >= significant_crop_threshold]

    final_crop = None
    ar_label = None

    if not significant_clusters:
        print(f"No single crop value meets the {significant_crop_threshold}% significance threshold. No crop will be applied.")
        return None

    elif len(significant_clusters) == 1:
        print("A single dominant aspect ratio was found.")
        final_crop = significant_clusters[0]['center']

    else:  # Mixed AR
        # Multiple significant aspect ratios: use a bounding box that keeps
        # every significant region visible rather than picking one.
        print("Mixed aspect ratios detected. Calculating a safe 'master' crop.")
        crop_keys = [c['center'] for c in significant_clusters]
        final_crop = calculate_bounding_box(crop_keys)

    # --- Snapping ---
    parsed = parse_crop_string(final_crop)
    if not parsed: return None

    snapped_crop, ar_label = snap_to_known_ar(parsed['w'], parsed['h'], parsed['x'], parsed['y'], width, height)
    if ar_label:
        print(f"The detected crop snaps to the '{ar_label}' aspect ratio.")

    # --- Final Check ---
    # If the snapped crop is the full frame, cropping would be a no-op.
    parsed_snapped = parse_crop_string(snapped_crop)
    if parsed_snapped and parsed_snapped['w'] == width and parsed_snapped['h'] == height:
        print("Final crop matches source resolution. No cropping needed.")
        return None

    print(f"Robust crop detection finished. Recommended filter: {snapped_crop}")
    return snapped_crop
def detect_scenes(video_path, json_output_path, hwaccel=None, threshold=0.23, crop_filter=None):
    """Use FFmpeg's scene filter to find scene-change timestamps.

    The timestamps (seconds, as floats) are written to *json_output_path* as a
    JSON list.

    Args:
        video_path: Input video file.
        json_output_path: Destination path for the JSON list of timestamps.
        hwaccel: Accepted for signature parity but intentionally unused here
            (see NOTE below).
        threshold: Scene-change score threshold; lower is more sensitive.
        crop_filter: Optional 'crop=...' filter applied before detection.

    Returns:
        bool: True when detection ran and the JSON file was written, else False.
    """
    print(f"\nStarting scene detection for: {os.path.basename(video_path)}")
    # NOTE: Hardware acceleration is intentionally disabled for scene detection.
    # The scenedetect filter can be unreliable with hwaccel contexts as it
    # operates on CPU frames. The performance gain is negligible for this step.
    command = ['ffmpeg', '-hide_banner']

    filters = []
    if crop_filter:
        print(f"Applying crop filter during scene detection: {crop_filter}")
        filters.append(crop_filter)
    filters.append(f"select='gt(scene,{threshold})',showinfo")
    filter_string = ",".join(filters)

    # Add -map 0:v:0 to explicitly select the first video stream, ignoring cover art.
    command.extend(['-i', video_path, '-map', '0:v:0', '-vf', filter_string, '-f', 'null', '-'])

    try:
        # FIX: run Popen as a context manager so the stderr pipe is always
        # closed, even if timestamp parsing raises partway through.
        with subprocess.Popen(command, stderr=subprocess.PIPE, text=True, encoding='utf-8') as process:
            scene_timestamps = []
            for line in iter(process.stderr.readline, ''):
                if 'pts_time:' in line:
                    # showinfo prints the selected frame's presentation time.
                    timestamp_str = line.split('pts_time:')[1].split()[0]
                    scene_timestamps.append(float(timestamp_str))
                elif line.strip().startswith('frame='):
                    # Relay FFmpeg's own progress line in place.
                    sys.stderr.write(f'\r{line.strip()}')
                    sys.stderr.flush()

            process.wait()
        sys.stderr.write('\n')
        sys.stderr.flush()

        if process.returncode != 0:
            print(f"\nWarning: FFmpeg may have encountered an error (exit code: {process.returncode}).")

        with open(json_output_path, 'w') as json_file:
            json.dump(scene_timestamps, json_file, indent=4)

        print(f"Scene detection completed. {len(scene_timestamps)} scenes found.")
        return True

    # FIX: the original `except (FileNotFoundError, Exception)` was redundant —
    # Exception already covers FileNotFoundError. This is a deliberate
    # best-effort boundary: report the failure and return False.
    except Exception as e:
        print(f"\nAn error occurred during scene detection: {e}")
        return False
def cut_video_into_scenes(video_path, json_path, max_segment_length, hwaccel=None, crop_filter=None):
    """Cuts a video into segments, ensuring no segment exceeds a maximum length.

    Reads scene timestamps from *json_path* (as written by detect_scenes),
    splits any over-long scene into max_segment_length chunks, then runs a
    single FFmpeg pass that writes lossless (utvideo) video-only segments into
    a local 'cuts' directory. Returns None; progress and errors are printed.
    """
    print(f"\nStarting segment cutting for: {os.path.basename(video_path)}")
    try:
        with open(json_path, 'r') as f:
            scene_timestamps = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError) as e:
        print(f"\nError reading scene file '{json_path}': {e}")
        return

    video_duration = get_video_duration(video_path)
    if video_duration is None: return

    output_dir = "cuts"
    os.makedirs(output_dir, exist_ok=True)
    print(f"Output will be saved to the '{output_dir}' directory.")

    print(f"Enforcing maximum segment length of {max_segment_length} seconds...")
    # Deduplicate and sort scene times, bracketed by the file start and end.
    segment_boundaries = [0.0] + sorted(list(set(scene_timestamps))) + [video_duration]
    final_cut_points = []
    for i in range(len(segment_boundaries) - 1):
        start, end = segment_boundaries[i], segment_boundaries[i+1]
        current_time = start
        # Insert extra cut points so no span exceeds max_segment_length.
        while (end - current_time) > max_segment_length:
            current_time += max_segment_length
            final_cut_points.append(current_time)
        final_cut_points.append(end)

    final_cut_points = sorted(list(set(final_cut_points)))
    # Drop a trailing cut point at (or within 0.1s of) the file end — FFmpeg's
    # segment muxer ends the last segment at EOF on its own.
    if final_cut_points and final_cut_points[-1] >= video_duration - 0.1:
        final_cut_points.pop()

    print(f"Original scenes: {len(scene_timestamps)}. Total segments after splitting: {len(final_cut_points) + 1}.")

    segment_times_str = ",".join(map(str, final_cut_points))
    base_name = os.path.splitext(os.path.basename(video_path))[0]
    output_pattern = os.path.join(output_dir, f"{base_name}_segment%03d.mkv")

    # Add -loglevel error to hide info messages and -stats to show progress
    command = ['ffmpeg', '-hide_banner', '-loglevel', 'error', '-stats']
    if hwaccel:
        command.extend(['-hwaccel', hwaccel])

    command.extend(['-i', video_path])

    if crop_filter:
        print(f"Applying crop filter during cutting: {crop_filter}")
        command.extend(['-vf', crop_filter])

    # Add -map 0:v:0 to explicitly select the first video stream for cutting.
    # Combine with -an/-sn to ensure no other streams are processed.
    command.extend(['-map', '0:v:0', '-c:v', 'utvideo', '-an', '-sn', '-dn', '-map_metadata', '-1', '-map_chapters', '-1', '-f', 'segment', '-segment_times', segment_times_str, '-segment_start_number', '1', '-reset_timestamps', '1', output_pattern])

    print("\nStarting FFmpeg to cut all segments in a single pass...")
    try:
        # -stats will print progress to stderr, which subprocess.run will display
        subprocess.run(command, check=True)
        total_segments = len(final_cut_points) + 1
        print(f"Successfully created {total_segments} segments in the '{output_dir}' directory.")
    # NOTE(review): Exception already covers the two specific types listed
    # before it; the tuple is redundant (kept as-is in this doc-only pass).
    except (FileNotFoundError, subprocess.CalledProcessError, Exception) as e:
        print(f"\nAn error occurred during cutting: {e}")
# --- Main Orchestrator ---
def main():
    """Command-line entry point: parse arguments, detect scenes, cut segments."""
    parser = argparse.ArgumentParser(
        description="A comprehensive video processing script to detect scenes and cut segments.",
        formatter_class=argparse.RawTextHelpFormatter
    )
    parser.add_argument("video_path", help="Path to the input video file.")
    parser.add_argument(
        "--autocrop",
        action='store_true',
        help="Automatically detect and apply cropping to remove black bars. Default: False"
    )
    parser.add_argument(
        "--so", "--sceneonly",
        action='store_true',
        dest='sceneonly',  # Explicitly set the destination attribute name
        help="Only run scene detection and create the .scenes.json file."
    )
    parser.add_argument(
        "--segtime",
        type=int,
        default=10,
        help="Maximum length of any cut segment in seconds. Default: 10"
    )
    parser.add_argument(
        "-t", "--threshold",
        type=float,
        default=0.23,
        help="Scene detection threshold (0.0 to 1.0). Lower is more sensitive. Default: 0.23"
    )
    args = parser.parse_args()

    if not os.path.isfile(args.video_path):
        print(f"Error: Input file not found: '{args.video_path}'")
        sys.exit(1)

    stem = os.path.splitext(args.video_path)[0]
    scene_json_path = f"{stem}.scenes.json"

    hwaccel_method = get_best_hwaccel()
    crop_filter = detect_crop(args.video_path, hwaccel_method) if args.autocrop else None

    if args.sceneonly:
        detect_scenes(args.video_path, scene_json_path, hwaccel_method, args.threshold, crop_filter)
        sys.exit(0)

    # --- Full Workflow (Detect if needed, then Cut) ---
    if os.path.isfile(scene_json_path):
        print(f"--- Found existing scene file: {os.path.basename(scene_json_path)} ---")
    else:
        print("--- Scene file not found, running detection first ---")
        if not detect_scenes(args.video_path, scene_json_path, hwaccel_method, args.threshold, crop_filter):
            print("\nScene detection failed. Aborting process.")
            sys.exit(1)

    cut_video_into_scenes(args.video_path, scene_json_path, args.segtime, hwaccel_method, crop_filter)


if __name__ == "__main__":
    main()