Updates processes and scripts

This commit is contained in:
2025-07-20 15:44:17 +02:00
parent 58a9c8ce3b
commit 7a8a85d953
3 changed files with 344 additions and 109 deletions

View File

@@ -4,6 +4,10 @@ import json
import os
import sys
import argparse
import re
from collections import Counter
import multiprocessing
import shutil
# --- Utility Functions (from previous scripts) ---
@@ -64,57 +68,225 @@ def get_video_duration(video_path):
print(f"\nError getting video duration: {e}")
return None
# --- Core Logic Functions ---
def get_video_resolution(video_path):
    """Return (width, height) of the first video stream, or (None, None) on failure.

    Uses ffprobe's JSON output for robust parsing of the stream metadata.
    """
    probe_cmd = [
        'ffprobe',
        '-v', 'quiet',
        '-print_format', 'json',
        '-show_streams',
        video_path
    ]
    try:
        proc = subprocess.run(probe_cmd, capture_output=True, text=True, check=True, encoding='utf-8')
        streams = json.loads(proc.stdout).get('streams', [])
        # Pick the first stream that is a video stream and actually carries a resolution.
        video_stream = next(
            (s for s in streams if s.get('codec_type') == 'video' and 'width' in s and 'height' in s),
            None,
        )
        if video_stream is None:
            raise ValueError("Could not find video stream with resolution in ffprobe output.")
        return int(video_stream['width']), int(video_stream['height'])
    except (FileNotFoundError, subprocess.CalledProcessError, json.JSONDecodeError, ValueError) as e:
        print(f"\nError getting video resolution: {e}")
        return None, None
# --- Core Logic Functions (Ported 1:1 from cropdetect.py) ---
# Standard display/cinema aspect ratios that a detected crop may be snapped to
# (consumed by snap_to_known_ar below). List order matters: the search uses a
# strict '<' comparison, so on an exact tie the earlier entry wins.
KNOWN_ASPECT_RATIOS = [
    {"name": "HDTV (16:9)", "ratio": 16/9},
    {"name": "Widescreen (Scope)", "ratio": 2.39},
    {"name": "Widescreen (Flat)", "ratio": 1.85},
    {"name": "IMAX Digital (1.90:1)", "ratio": 1.90},
    {"name": "Fullscreen (4:3)", "ratio": 4/3},
    {"name": "IMAX 70mm (1.43:1)", "ratio": 1.43},
]
def parse_crop_string(crop_str):
    """Parse a 'crop=w:h:x:y' filter string into a {'w','h','x','y'} dict of ints.

    Returns None for any malformed input (missing '=', wrong field count,
    non-numeric fields).
    """
    try:
        _, values = crop_str.split('=')
        fields = [int(field) for field in values.split(':')]
        w, h, x, y = fields  # enforce exactly four numeric fields
        return dict(zip(('w', 'h', 'x', 'y'), fields))
    except (ValueError, IndexError):
        return None
def snap_to_known_ar(w, h, x, y, video_w, video_h, tolerance=0.03, known_ratios=None):
    """Snap a crop rectangle to the nearest standard aspect ratio if close enough.

    Args:
        w, h, x, y: Detected crop rectangle (width, height, offsets).
        video_w, video_h: Full source resolution.
        tolerance: Maximum relative ratio difference allowed for snapping.
        known_ratios: Optional list of {"name", "ratio"} dicts; defaults to the
            module-level KNOWN_ASPECT_RATIOS table (backward compatible).

    Returns:
        (crop_filter_string, matched_ar_name). matched_ar_name is None when no
        known ratio is close enough, in which case the input crop is returned
        unchanged.
    """
    # Degenerate crop: avoid division by zero, pass through unchanged.
    if h == 0:
        return f"crop={w}:{h}:{x}:{y}", None
    candidates = KNOWN_ASPECT_RATIOS if known_ratios is None else known_ratios
    detected_ratio = w / h

    # Find the closest known ratio (strict '<' resolves ties in list order).
    best_match = None
    smallest_diff = float('inf')
    for ar in candidates:
        diff = abs(detected_ratio - ar['ratio'])
        if diff < smallest_diff:
            smallest_diff = diff
            best_match = ar
    if not best_match or (smallest_diff / best_match['ratio']) >= tolerance:
        return f"crop={w}:{h}:{x}:{y}", None

    # Heuristic: width close to full video width means letterboxing (bars top/bottom).
    if abs(w - video_w) < 16:
        new_h = round(video_w / best_match['ratio'])
        # Round height up to the nearest multiple of 8 for cleaner dimensions.
        if new_h % 8 != 0:
            new_h += 8 - (new_h % 8)
        # BUGFIX: rounding up can exceed the source height, which would yield a
        # negative y offset and an invalid crop — clamp to the source height.
        new_h = min(new_h, video_h)
        new_y = round((video_h - new_h) / 2)
        if new_y % 2 != 0:
            new_y -= 1  # Ensure y offset is even
        return f"crop={video_w}:{new_h}:0:{new_y}", best_match['name']

    # Heuristic: height close to full video height means pillarboxing (bars left/right).
    if abs(h - video_h) < 16:
        new_w = round(video_h * best_match['ratio'])
        # Round width up to the nearest multiple of 8.
        if new_w % 8 != 0:
            new_w += 8 - (new_w % 8)
        # BUGFIX: same clamp on the width axis.
        new_w = min(new_w, video_w)
        new_x = round((video_w - new_w) / 2)
        if new_x % 2 != 0:
            new_x -= 1  # Ensure x offset is even
        return f"crop={new_w}:{video_h}:{new_x}:0", best_match['name']

    # Neither edge hugs the frame: keep the detected crop as-is.
    return f"crop={w}:{h}:{x}:{y}", None
def cluster_crop_values(crop_counts, tolerance=8):
    """Group near-identical crop strings into clusters keyed by top-left corner.

    The most frequent remaining crop acts as each cluster's center; every crop
    whose (x, y) is within `tolerance` pixels of it joins that cluster.
    Returns cluster dicts ({'center', 'count'}) sorted by count, descending.
    """
    remaining = crop_counts.copy()
    clusters = []
    while remaining:
        anchor, _ = remaining.most_common(1)[0]
        anchor_rect = parse_crop_string(anchor)
        if anchor_rect is None:
            # Unparseable entry can never anchor a cluster; drop and move on.
            del remaining[anchor]
            continue
        ax, ay = anchor_rect['x'], anchor_rect['y']
        members = []
        weight = 0
        for candidate, count in remaining.items():
            rect = parse_crop_string(candidate)
            if rect and abs(rect['x'] - ax) <= tolerance and abs(rect['y'] - ay) <= tolerance:
                weight += count
                members.append(candidate)
        if weight > 0:
            clusters.append({'center': anchor, 'count': weight})
        for candidate in members:
            del remaining[candidate]
    return sorted(clusters, key=lambda c: c['count'], reverse=True)
def calculate_bounding_box(crop_keys):
    """Return the smallest 'crop=w:h:x:y' string containing all given crop rectangles.

    Unparseable entries are skipped; width/height are shrunk by one pixel when
    odd so the result has even dimensions.
    NOTE(review): if no entry parses, the result contains inf values — callers
    appear to re-parse the string, which then fails; confirm that is intended.
    """
    left, top = float('inf'), float('inf')
    right, bottom = float('-inf'), float('-inf')
    for key in crop_keys:
        rect = parse_crop_string(key)
        if rect is None:
            continue
        left = min(left, rect['x'])
        top = min(top, rect['y'])
        right = max(right, rect['x'] + rect['w'])
        bottom = max(bottom, rect['y'] + rect['h'])
    box_w = right - left
    box_h = bottom - top
    # Force even dimensions for encoder friendliness.
    if box_w % 2 != 0:
        box_w -= 1
    if box_h % 2 != 0:
        box_h -= 1
    return f"crop={box_w}:{box_h}:{left}:{top}"
def analyze_segment_for_crop(task_args):
    """Worker: run ffmpeg cropdetect on a 1-second window and return crop strings.

    task_args is a (seek_time, input_file) tuple so the function can be mapped
    over a multiprocessing pool. Returns every 'crop=w:h:x:y' match found in
    ffmpeg's stderr output.
    """
    seek_time, input_file = task_args
    cmd = [
        'ffmpeg', '-hide_banner',
        '-ss', str(seek_time),
        '-i', input_file,
        '-t', '1',
        '-vf', 'cropdetect',
        '-f', 'null', '-',
    ]
    proc = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8')
    return re.findall(r'crop=\d+:\d+:\d+:\d+', proc.stderr)
def detect_crop(video_path, hwaccel=None):
    """
    Detects black bars using the full, robust logic from cropdetect.py, including
    multiprocess analysis, clustering, and aspect ratio snapping.

    Args:
        video_path: Path to the video file to analyze.
        hwaccel: Accepted for interface compatibility but unused here — the
            segment workers spawn their own plain ffmpeg processes.

    Returns:
        A 'crop=w:h:x:y' filter string, or None when metadata probing fails,
        nothing significant is detected, or the result matches the full frame.
    """
    print("\nStarting robust crop detection (1:1 logic from cropdetect.py)...")

    # --- Parameters from original script ---
    significant_crop_threshold = 5.0  # % of all detections a cluster needs to count
    num_workers = max(1, multiprocessing.cpu_count() // 2)

    # --- Probing ---
    duration = get_video_duration(video_path)
    width, height = get_video_resolution(video_path)
    if not all([duration, width, height]):
        print("Could not get video metadata. Aborting crop detection.")
        return None

    # --- Analysis: sample short segments spread across the runtime ---
    num_tasks = num_workers * 4
    segment_duration = max(1, duration // num_tasks)
    tasks = [(i * segment_duration, video_path) for i in range(num_tasks)]
    print(f"Analyzing {len(tasks)} segments across {num_workers} worker(s)...")
    all_crops = []
    with multiprocessing.Pool(processes=num_workers) as pool:
        for i, result in enumerate(pool.imap_unordered(analyze_segment_for_crop, tasks), 1):
            all_crops.extend(result)
            sys.stdout.write(f"\rAnalyzing Segments: {i}/{len(tasks)} completed...")
            sys.stdout.flush()
    print("\nAnalysis complete.")
    if not all_crops:
        print("No black bars detected.")
        return None

    # --- Decision Logic ---
    crop_counts = Counter(all_crops)
    clusters = cluster_crop_values(crop_counts)
    total_detections = sum(c['count'] for c in clusters)
    if total_detections == 0:
        print("No valid crop detections found.")
        return None
    significant_clusters = [c for c in clusters if (c['count'] / total_detections * 100) >= significant_crop_threshold]
    if not significant_clusters:
        print(f"No single crop value meets the {significant_crop_threshold}% significance threshold. No crop will be applied.")
        return None
    elif len(significant_clusters) == 1:
        print("A single dominant aspect ratio was found.")
        final_crop = significant_clusters[0]['center']
    else:  # Mixed AR
        print("Mixed aspect ratios detected. Calculating a safe 'master' crop.")
        crop_keys = [c['center'] for c in significant_clusters]
        final_crop = calculate_bounding_box(crop_keys)

    # --- Snapping ---
    parsed = parse_crop_string(final_crop)
    if not parsed:
        return None
    snapped_crop, ar_label = snap_to_known_ar(parsed['w'], parsed['h'], parsed['x'], parsed['y'], width, height)
    if ar_label:
        print(f"The detected crop snaps to the '{ar_label}' aspect ratio.")

    # --- Final Check: a full-frame crop means cropping is unnecessary ---
    parsed_snapped = parse_crop_string(snapped_crop)
    if parsed_snapped and parsed_snapped['w'] == width and parsed_snapped['h'] == height:
        print("Final crop matches source resolution. No cropping needed.")
        return None
    print(f"Robust crop detection finished. Recommended filter: {snapped_crop}")
    return snapped_crop
def detect_scenes(video_path, json_output_path, hwaccel=None, threshold=0.23, crop_filter=None):
"""Uses FFmpeg to detect scene changes and saves timestamps to a JSON file."""
print(f"\nStarting scene detection for: {os.path.basename(video_path)}")
# NOTE: Hardware acceleration is intentionally disabled for scene detection.
# The scenedetect filter can be unreliable with hwaccel contexts as it
# operates on CPU frames. The performance gain is negligible for this step.
command = ['ffmpeg', '-hide_banner']
if hwaccel:
print(f"Attempting to use hardware acceleration: {hwaccel}")
command.extend(['-hwaccel', hwaccel])
filters = []
if crop_filter:
@@ -123,7 +295,8 @@ def detect_scenes(video_path, json_output_path, hwaccel=None, threshold=0.4, cro
filters.append(f"select='gt(scene,{threshold})',showinfo")
filter_string = ",".join(filters)
command.extend(['-i', video_path, '-vf', filter_string, '-f', 'null', '-'])
# Add -map 0:v:0 to explicitly select the first video stream, ignoring cover art.
command.extend(['-i', video_path, '-map', '0:v:0', '-vf', filter_string, '-f', 'null', '-'])
try:
process = subprocess.Popen(command, stderr=subprocess.PIPE, text=True, encoding='utf-8')
@@ -202,7 +375,9 @@ def cut_video_into_scenes(video_path, json_path, max_segment_length, hwaccel=Non
print(f"Applying crop filter during cutting: {crop_filter}")
command.extend(['-vf', crop_filter])
command.extend(['-c:v', 'utvideo', '-an', '-sn', '-dn', '-map_metadata', '-1', '-map_chapters', '-1', '-f', 'segment', '-segment_times', segment_times_str, '-segment_start_number', '1', '-reset_timestamps', '1', output_pattern])
# Add -map 0:v:0 to explicitly select the first video stream for cutting.
# Combine with -an/-sn to ensure no other streams are processed.
command.extend(['-map', '0:v:0', '-c:v', 'utvideo', '-an', '-sn', '-dn', '-map_metadata', '-1', '-map_chapters', '-1', '-f', 'segment', '-segment_times', segment_times_str, '-segment_start_number', '1', '-reset_timestamps', '1', output_pattern])
print("\nStarting FFmpeg to cut all segments in a single pass...")
try:
@@ -241,8 +416,8 @@ def main():
parser.add_argument(
"-t", "--threshold",
type=float,
default=0.4,
help="Scene detection threshold (0.0 to 1.0). Lower is more sensitive. Default: 0.4"
default=0.23,
help="Scene detection threshold (0.0 to 1.0). Lower is more sensitive. Default: 0.23"
)
args = parser.parse_args()