215 lines
8.6 KiB
Python
215 lines
8.6 KiB
Python
#!/usr/bin/env python3
|
|
import subprocess
|
|
import json
|
|
import os
|
|
import sys
|
|
import argparse
|
|
|
|
# --- Utility Functions (from previous scripts) ---
|
|
|
|
def get_best_hwaccel():
|
|
"""
|
|
Checks for available FFmpeg hardware acceleration methods and returns the best one
|
|
based on the current operating system.
|
|
"""
|
|
# Determine the priority list based on the operating system
|
|
if sys.platform == "win32":
|
|
# Windows: CUDA (Nvidia) > QSV (Intel) > D3D11VA (Modern, AMD/All) > DXVA2 (Legacy)
|
|
priority = ['cuda', 'qsv', 'd3d11va', 'dxva2']
|
|
elif sys.platform == "linux":
|
|
# Linux: CUDA (Nvidia) > VAAPI (Intel/AMD)
|
|
priority = ['cuda', 'vaapi']
|
|
elif sys.platform == "darwin":
|
|
# macOS: VideoToolbox is the native framework for Apple Silicon
|
|
priority = ['videotoolbox']
|
|
else:
|
|
# Fallback for other operating systems (e.g., BSD)
|
|
priority = []
|
|
|
|
if not priority:
|
|
print(f"No hardware acceleration priority list for this OS ({sys.platform}). Using software.")
|
|
return None
|
|
|
|
print(f"Checking for available hardware acceleration on {sys.platform}...")
|
|
try:
|
|
result = subprocess.run(
|
|
['ffmpeg', '-hide_banner', '-hwaccels'],
|
|
capture_output=True, text=True, encoding='utf-8', check=False
|
|
)
|
|
if result.returncode != 0:
|
|
print("Warning: Could not query FFmpeg. Using software decoding.")
|
|
return None
|
|
|
|
available_methods = result.stdout.strip().split('\n')
|
|
if len(available_methods) > 1:
|
|
available_methods = available_methods[1:]
|
|
|
|
for method in priority:
|
|
if method in available_methods:
|
|
print(f"Found best available hardware acceleration: {method}")
|
|
return method
|
|
|
|
print("No high-priority hardware acceleration found. Using software decoding.")
|
|
return None
|
|
except (FileNotFoundError, Exception):
|
|
return None
|
|
|
|
def get_video_duration(video_path):
|
|
"""Gets the duration of a video file in seconds using ffprobe."""
|
|
command = ['ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', video_path]
|
|
try:
|
|
result = subprocess.run(command, capture_output=True, text=True, check=True)
|
|
return float(result.stdout.strip())
|
|
except (FileNotFoundError, subprocess.CalledProcessError, ValueError) as e:
|
|
print(f"\nError getting video duration: {e}")
|
|
return None
|
|
|
|
# --- Core Logic Functions ---
|
|
|
|
def detect_scenes(video_path, json_output_path, hwaccel=None, threshold=0.4):
|
|
"""Uses FFmpeg to detect scene changes and saves timestamps to a JSON file."""
|
|
print(f"\nStarting scene detection for: {os.path.basename(video_path)}")
|
|
command = ['ffmpeg', '-hide_banner']
|
|
if hwaccel:
|
|
print(f"Attempting to use hardware acceleration: {hwaccel}")
|
|
command.extend(['-hwaccel', hwaccel])
|
|
|
|
filter_string = f"select='gt(scene,{threshold})',showinfo"
|
|
command.extend(['-i', video_path, '-vf', filter_string, '-f', 'null', '-'])
|
|
|
|
try:
|
|
process = subprocess.Popen(command, stderr=subprocess.PIPE, text=True, encoding='utf-8')
|
|
scene_timestamps = []
|
|
for line in iter(process.stderr.readline, ''):
|
|
if 'pts_time:' in line:
|
|
timestamp_str = line.split('pts_time:')[1].split()[0]
|
|
scene_timestamps.append(float(timestamp_str))
|
|
elif line.strip().startswith('frame='):
|
|
sys.stderr.write(f'\r{line.strip()}')
|
|
sys.stderr.flush()
|
|
|
|
process.wait()
|
|
sys.stderr.write('\n')
|
|
sys.stderr.flush()
|
|
|
|
if process.returncode != 0:
|
|
print(f"\nWarning: FFmpeg may have encountered an error (exit code: {process.returncode}).")
|
|
|
|
with open(json_output_path, 'w') as json_file:
|
|
json.dump(scene_timestamps, json_file, indent=4)
|
|
|
|
print(f"Scene detection completed. {len(scene_timestamps)} scenes found.")
|
|
return True
|
|
|
|
except (FileNotFoundError, Exception) as e:
|
|
print(f"\nAn error occurred during scene detection: {e}")
|
|
return False
|
|
|
|
def cut_video_into_scenes(video_path, json_path, max_segment_length, hwaccel=None):
|
|
"""Cuts a video into segments, ensuring no segment exceeds a maximum length."""
|
|
print(f"\nStarting segment cutting for: {os.path.basename(video_path)}")
|
|
try:
|
|
with open(json_path, 'r') as f:
|
|
scene_timestamps = json.load(f)
|
|
except (FileNotFoundError, json.JSONDecodeError) as e:
|
|
print(f"\nError reading scene file '{json_path}': {e}")
|
|
return
|
|
|
|
video_duration = get_video_duration(video_path)
|
|
if video_duration is None: return
|
|
|
|
output_dir = "cuts"
|
|
os.makedirs(output_dir, exist_ok=True)
|
|
print(f"Output will be saved to the '{output_dir}' directory.")
|
|
|
|
print(f"Enforcing maximum segment length of {max_segment_length} seconds...")
|
|
segment_boundaries = [0.0] + sorted(list(set(scene_timestamps))) + [video_duration]
|
|
final_cut_points = []
|
|
for i in range(len(segment_boundaries) - 1):
|
|
start, end = segment_boundaries[i], segment_boundaries[i+1]
|
|
current_time = start
|
|
while (end - current_time) > max_segment_length:
|
|
current_time += max_segment_length
|
|
final_cut_points.append(current_time)
|
|
final_cut_points.append(end)
|
|
|
|
final_cut_points = sorted(list(set(final_cut_points)))
|
|
if final_cut_points and final_cut_points[-1] >= video_duration - 0.1:
|
|
final_cut_points.pop()
|
|
|
|
print(f"Original scenes: {len(scene_timestamps)}. Total segments after splitting: {len(final_cut_points) + 1}.")
|
|
|
|
segment_times_str = ",".join(map(str, final_cut_points))
|
|
base_name = os.path.splitext(os.path.basename(video_path))[0]
|
|
output_pattern = os.path.join(output_dir, f"{base_name}_segment%03d.mkv")
|
|
|
|
# Add -loglevel error to hide info messages and -stats to show progress
|
|
command = ['ffmpeg', '-hide_banner', '-loglevel', 'error', '-stats']
|
|
if hwaccel:
|
|
command.extend(['-hwaccel', hwaccel])
|
|
|
|
command.extend(['-i', video_path, '-c:v', 'utvideo', '-an', '-sn', '-dn', '-map_metadata', '-1', '-map_chapters', '-1', '-f', 'segment', '-segment_times', segment_times_str, '-segment_start_number', '1', '-reset_timestamps', '1', output_pattern])
|
|
|
|
print("\nStarting FFmpeg to cut all segments in a single pass...")
|
|
try:
|
|
# -stats will print progress to stderr, which subprocess.run will display
|
|
subprocess.run(command, check=True)
|
|
total_segments = len(final_cut_points) + 1
|
|
print(f"Successfully created {total_segments} segments in the '{output_dir}' directory.")
|
|
except (FileNotFoundError, subprocess.CalledProcessError, Exception) as e:
|
|
print(f"\nAn error occurred during cutting: {e}")
|
|
|
|
# --- Main Orchestrator ---
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description="A comprehensive video processing script to detect scenes and cut segments.",
|
|
formatter_class=argparse.RawTextHelpFormatter
|
|
)
|
|
parser.add_argument("video_path", help="Path to the input video file.")
|
|
parser.add_argument(
|
|
"--so", "--sceneonly",
|
|
action='store_true',
|
|
dest='sceneonly', # Explicitly set the destination attribute name
|
|
help="Only run scene detection and create the .scenes.json file."
|
|
)
|
|
parser.add_argument(
|
|
"--segtime",
|
|
type=int,
|
|
default=10,
|
|
help="Maximum length of any cut segment in seconds. Default: 10"
|
|
)
|
|
parser.add_argument(
|
|
"-t", "--threshold",
|
|
type=float,
|
|
default=0.4,
|
|
help="Scene detection threshold (0.0 to 1.0). Lower is more sensitive. Default: 0.4"
|
|
)
|
|
args = parser.parse_args()
|
|
|
|
if not os.path.isfile(args.video_path):
|
|
print(f"Error: Input file not found: '{args.video_path}'")
|
|
sys.exit(1)
|
|
|
|
base_name = os.path.splitext(args.video_path)[0]
|
|
json_path = f"{base_name}.scenes.json"
|
|
|
|
hwaccel_method = get_best_hwaccel()
|
|
|
|
if args.sceneonly:
|
|
detect_scenes(args.video_path, json_path, hwaccel_method, args.threshold)
|
|
sys.exit(0)
|
|
|
|
# --- Full Workflow (Detect if needed, then Cut) ---
|
|
if not os.path.isfile(json_path):
|
|
print("--- Scene file not found, running detection first ---")
|
|
if not detect_scenes(args.video_path, json_path, hwaccel_method, args.threshold):
|
|
print("\nScene detection failed. Aborting process.")
|
|
sys.exit(1)
|
|
else:
|
|
print(f"--- Found existing scene file: {os.path.basename(json_path)} ---")
|
|
|
|
cut_video_into_scenes(args.video_path, json_path, args.segtime, hwaccel_method)
|
|
|
|
if __name__ == "__main__":
|
|
main() |