Files
encoding-scripts/av1_opus_encoder.py

801 lines
36 KiB
Python

#!/usr/bin/env python3
import os
import sys
import subprocess
import shutil
import tempfile
import json
import re
import logging
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Dict, Any, Union
REQUIRED_TOOLS = [
"ffmpeg", "ffprobe", "mkvmerge", "mkvpropedit",
"opusenc", "mediainfo", "av1an", "HandBrakeCLI", "ffmsindex" # Added HandBrakeCLI and ffmsindex
]
DIR_COMPLETED = Path("completed")
DIR_ORIGINAL = Path("original")
DIR_CONV_LOGS = Path("conv_logs") # Directory for conversion logs
REMUX_CODECS = {"aac", "opus"} # Using a set for efficient lookups
SVT_AV1_PARAMS = {
"speed": "slower", # "slower", "slow", "medium", "fast", "faster"
"quality": "medium", # "higher", "high", "medium", "low", "lower"
"film-grain": 6,
"color-primaries": 1,
"transfer-characteristics": 1,
"matrix-coefficients": 1,
"scd": 0, # Scene change detection OFF for Av1an use
"keyint": 0, # Keyframe interval, 0 disables automatic keyframes placement at a constant interval
"lp": 2, # Level of parallelism
"auto-tiling": 1, # Auto tiling ON
"tune": 1, # 0 = VQ, 1 = PSNR, 2 = SSIM
"progress": 2, # Detailed progress output
}
def check_tools() -> None:
for tool in REQUIRED_TOOLS:
if shutil.which(tool) is None:
print(f"Required tool '{tool}' not found in PATH.")
sys.exit(1)
def setup_logging(log_file: Path) -> None:
"""Configures logging to write to both the console and a log file."""
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Clear existing handlers to avoid duplicates when processing multiple files
if logger.hasHandlers():
logger.handlers.clear()
# File Handler
fh = logging.FileHandler(log_file, mode='w', encoding='utf-8')
fh.setFormatter(logging.Formatter('%(message)s'))
logger.addHandler(fh)
# Console Handler
ch = logging.StreamHandler(sys.stdout)
ch.setFormatter(logging.Formatter('%(message)s'))
logger.addHandler(ch)
def run_cmd(cmd: List[str], capture_output: bool = False, check: bool = True) -> Optional[str]:
"""
Runs a command.
If capture_output is True, returns the stdout string.
If capture_output is False, streams output to the console (passthrough).
Always logs the command execution.
"""
cmd_str = " ".join(str(c) for c in cmd)
logging.info(f"Executing: {cmd_str}")
if capture_output:
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=check, text=True)
return result.stdout
else:
# Passthrough to console
subprocess.run(cmd, check=check)
return None
def convert_audio_track(index: int, ch: int, lang: str, audio_temp_dir: str, source_file: str, should_downmix: bool) -> Path:
audio_temp_path = Path(audio_temp_dir)
temp_extracted = audio_temp_path / f"track_{index}_extracted.flac"
temp_normalized = audio_temp_path / f"track_{index}_normalized.flac"
final_opus = audio_temp_path / f"track_{index}_final.opus"
logging.info(f" - Extracting Audio Track #{index} to FLAC...")
ffmpeg_args = [
"ffmpeg", "-v", "quiet", "-stats", "-y", "-i", str(source_file), "-map", f"0:{index}", "-map_metadata", "-1"
]
if should_downmix and ch >= 6:
if ch == 6:
ffmpeg_args += ["-af", "pan=stereo|c0=c2+0.30*c0+0.30*c4|c1=c2+0.30*c1+0.30*c5"]
elif ch == 8:
ffmpeg_args += ["-af", "pan=stereo|c0=c2+0.30*c0+0.30*c4+0.30*c6|c1=c2+0.30*c1+0.30*c5+0.30*c7"]
else: # Other multi-channel (e.g. 7ch, 10ch)
ffmpeg_args += ["-ac", "2"]
ffmpeg_args += ["-c:a", "flac", str(temp_extracted)]
run_cmd(ffmpeg_args)
logging.info(f" - Normalizing Audio Track #{index} with ffmpeg (loudnorm 2-pass)...")
# First pass: Analyze the audio to get loudnorm stats
# The stats are printed to stderr, so we must use subprocess.run directly to capture it.
logging.info(" - Pass 1: Analyzing...")
# Log the command for consistency
analyze_cmd = ["ffmpeg", "-v", "info", "-i", str(temp_extracted), "-af", "loudnorm=I=-23:LRA=7:tp=-1:print_format=json", "-f", "null", "-"]
logging.info(f"Executing: {' '.join(analyze_cmd)}")
result = subprocess.run(
analyze_cmd,
capture_output=True, text=True, check=True)
# Find the start of the JSON block in stderr and parse it.
stderr_output = result.stderr
json_start_index = stderr_output.find('{')
if json_start_index == -1:
raise ValueError("Could not find start of JSON block in ffmpeg output for loudnorm analysis.")
brace_level = 0
json_end_index = -1
for i, char in enumerate(stderr_output[json_start_index:]):
if char == '{':
brace_level += 1
elif char == '}':
brace_level -= 1
if brace_level == 0:
json_end_index = json_start_index + i + 1
break
stats = json.loads(stderr_output[json_start_index:json_end_index])
# Second pass: Apply the normalization using the stats from the first pass
logging.info(" - Pass 2: Applying normalization...")
run_cmd([
"ffmpeg", "-v", "quiet", "-stats", "-y", "-i", str(temp_extracted), "-af",
f"loudnorm=I=-23:LRA=7:tp=-1:measured_i={stats['input_i']}:measured_lra={stats['input_lra']}:measured_tp={stats['input_tp']}:measured_thresh={stats['input_thresh']}:offset={stats['target_offset']}",
"-c:a", "flac", str(temp_normalized)
])
# Set bitrate based on the final channel count of the Opus file.
is_being_downmixed = should_downmix and ch >= 6
if is_being_downmixed:
bitrate = "128k"
else:
if ch == 1: # Mono
bitrate = "64k"
elif ch == 2: # Stereo
bitrate = "128k"
elif ch == 6: # 5.1 Surround
bitrate = "256k"
elif ch == 8: # 7.1 Surround
bitrate = "384k"
else: # Other layouts
bitrate = "96k"
logging.info(f" - Encoding Audio Track #{index} to Opus at {bitrate}...")
run_cmd([
"opusenc", "--vbr", "--bitrate", bitrate, str(temp_normalized), str(final_opus)
])
return final_opus
def convert_video(source_file_base: str, source_file_full: str, is_vfr: bool, target_cfr_fps_for_handbrake: Optional[str], autocrop_filter: Optional[str] = None) -> tuple[Path, Optional[Path]]:
logging.info(" --- Starting Video Processing ---")
# source_file_base is file_path.stem (e.g., "my.anime.episode.01")
vpy_file = Path(f"{source_file_base}.vpy")
ut_video_file = Path(f"{source_file_base}.ut.mkv")
encoded_video_file = Path(f"temp-{source_file_base}.mkv")
handbrake_cfr_intermediate_file = None # To store path of HandBrake output if created
current_input_for_utvideo = Path(source_file_full)
if is_vfr and target_cfr_fps_for_handbrake:
logging.info(f" - Source is VFR. Converting to CFR ({target_cfr_fps_for_handbrake}) with HandBrakeCLI...")
handbrake_cfr_intermediate_file = Path(f"{source_file_base}.cfr_temp.mkv")
handbrake_args = [
"HandBrakeCLI",
"--input", str(source_file_full),
"--output", str(handbrake_cfr_intermediate_file),
"--cfr",
"--rate", str(target_cfr_fps_for_handbrake),
"--encoder", "x264_10bit", # Changed to x264_10bit for 10-bit CFR intermediate
"--quality", "0", # CRF 0 for x264 is often considered visually lossless, or near-lossless
"--encoder-preset", "superfast", # Use a fast preset for quicker processing
"--encoder-tune", "fastdecode", # Added tune for faster decoding
"--audio", "none",
"--subtitle", "none",
"--crop-mode", "none" # Disable auto-cropping
]
logging.info(f" - Running HandBrakeCLI: {' '.join(handbrake_args)}")
try:
run_cmd(handbrake_args)
if handbrake_cfr_intermediate_file.exists() and handbrake_cfr_intermediate_file.stat().st_size > 0:
logging.info(f" - HandBrake VFR to CFR conversion successful: {handbrake_cfr_intermediate_file}")
current_input_for_utvideo = handbrake_cfr_intermediate_file
else:
logging.info(f" - Warning: HandBrakeCLI VFR-to-CFR conversion failed or produced an empty file. Proceeding with original source for UTVideo.")
handbrake_cfr_intermediate_file = None # Ensure it's None if failed
except subprocess.CalledProcessError as e:
logging.info(f" - Error during HandBrakeCLI execution: {e}")
logging.info(f" - Proceeding with original source for UTVideo.")
handbrake_cfr_intermediate_file = None # Ensure it's None if failed
logging.info(" - Creating UTVideo intermediate file (overwriting if exists)...")
# Check if source is already UTVideo
ffprobe_cmd = [
"ffprobe", "-v", "error", "-select_streams", "v:0",
"-show_entries", "stream=codec_name", "-of", "default=noprint_wrappers=1:nokey=1",
str(current_input_for_utvideo) # Use current input, which might be HandBrake output
]
source_codec = run_cmd(ffprobe_cmd, capture_output=True, check=True)
if source_codec:
source_codec = source_codec.strip()
else:
source_codec = ""
video_codec_args = ["-c:v", "utvideo"]
if source_codec == "utvideo" and current_input_for_utvideo == Path(source_file_full): # Only copy if original was UTVideo
logging.info(" - Source is already UTVideo. Copying video stream...")
video_codec_args = ["-c:v", "copy"]
ffmpeg_args = [
"ffmpeg", "-hide_banner", "-v", "quiet", "-stats", "-y", "-i", str(current_input_for_utvideo),
"-map", "0:v:0", "-map_metadata", "-1", "-map_chapters", "-1", "-an", "-sn", "-dn",
]
if autocrop_filter:
ffmpeg_args += ["-vf", autocrop_filter]
ffmpeg_args += video_codec_args + [str(ut_video_file)]
run_cmd(ffmpeg_args)
logging.info(" - Indexing UTVideo file with ffmsindex for VapourSynth...")
ffmsindex_args = ["ffmsindex", "-f", str(ut_video_file)]
run_cmd(ffmsindex_args)
ut_video_full_path = os.path.abspath(ut_video_file)
vpy_script_content = f"""import vapoursynth as vs
core = vs.core
core.num_threads = 4
clip = core.ffms2.Source(source=r'''{ut_video_full_path}''')
clip = core.resize.Point(clip, format=vs.YUV420P10, matrix_in_s="709") # type: ignore
clip.set_output()
"""
with vpy_file.open("w", encoding="utf-8") as f:
f.write(vpy_script_content)
logging.info(" - Starting AV1 encode with av1an (this will take a long time)...")
total_cores = os.cpu_count() or 4 # Fallback if cpu_count is None
workers = max(1, (total_cores // 2) - 1) # Half the cores minus one, with a minimum of 1 worker.
logging.info(f" - Using {workers} workers for av1an (Total Cores: {total_cores}, Logic: (Cores/2)-1).")
# Create the parameter string for av1an's -v option, which expects a single string.
av1an_video_params_str = " ".join([f"--{key} {value}" for key, value in SVT_AV1_PARAMS.items()])
logging.info(f" - Using SVT-AV1 parameters: {av1an_video_params_str}")
av1an_enc_args = [
"av1an", "-i", str(vpy_file), "-o", str(encoded_video_file), "-n",
"-e", "svt-av1", "--resume", "--sc-pix-format", "yuv420p", "-c", "mkvmerge",
"--set-thread-affinity", "2", "--pix-format", "yuv420p10le", "--force", "--no-defaults",
"-w", str(workers),
"-v", av1an_video_params_str
]
run_cmd(av1an_enc_args)
logging.info(" --- Finished Video Processing ---")
return encoded_video_file, handbrake_cfr_intermediate_file
def is_ffmpeg_decodable(file_path):
"""Quickly check if ffmpeg can decode the input file."""
try:
# Try to decode a short segment of the first audio stream
subprocess.run([
"ffmpeg", "-v", "error", "-i", str(file_path), "-map", "0:a:0", "-t", "1", "-f", "null", "-"
], check=True)
return True
except subprocess.CalledProcessError:
return False
# --- CROPDETECT LOGIC FROM cropdetect.py ---
import argparse as _argparse_cropdetect
import multiprocessing as _multiprocessing_cropdetect
from collections import Counter as _Counter_cropdetect
COLOR_GREEN = "\033[92m"
COLOR_RED = "\033[91m"
COLOR_YELLOW = "\033[93m"
COLOR_RESET = "\033[0m"
KNOWN_ASPECT_RATIOS = [
{"name": "HDTV (16:9)", "ratio": 16/9},
{"name": "Widescreen (Scope)", "ratio": 2.39},
{"name": "Widescreen (Flat)", "ratio": 1.85},
{"name": "IMAX Digital (1.90:1)", "ratio": 1.90},
{"name": "Fullscreen (4:3)", "ratio": 4/3},
{"name": "IMAX 70mm (1.43:1)", "ratio": 1.43},
]
def _check_prerequisites_cropdetect():
for tool in ['ffmpeg', 'ffprobe']:
if not shutil.which(tool):
print(f"Error: '{tool}' command not found. Is it installed and in your PATH?")
return False
return True
def _analyze_segment_cropdetect(task_args):
seek_time, input_file, width, height = task_args
ffmpeg_args = [
'ffmpeg', '-hide_banner',
'-ss', str(seek_time),
'-i', input_file, '-t', '1', '-vf', 'cropdetect',
'-f', 'null', '-'
]
result = subprocess.run(ffmpeg_args, capture_output=True, text=True, encoding='utf-8')
if result.returncode != 0:
return []
crop_detections = re.findall(r'crop=(\d+):(\d+):(\d+):(\d+)', result.stderr)
significant_crops = []
for w_str, h_str, x_str, y_str in crop_detections:
w, h, x, y = map(int, [w_str, h_str, x_str, y_str])
significant_crops.append((f"crop={w}:{h}:{x}:{y}", seek_time))
return significant_crops
def _snap_to_known_ar_cropdetect(w, h, x, y, video_w, video_h, tolerance=0.03):
if h == 0: return f"crop={w}:{h}:{x}:{y}", None
detected_ratio = w / h
best_match = None
smallest_diff = float('inf')
for ar in KNOWN_ASPECT_RATIOS:
diff = abs(detected_ratio - ar['ratio'])
if diff < smallest_diff:
smallest_diff = diff
best_match = ar
if not best_match or (smallest_diff / best_match['ratio']) >= tolerance:
return f"crop={w}:{h}:{x}:{y}", None
if abs(w - video_w) < 16:
new_h = round(video_w / best_match['ratio'])
if new_h % 8 != 0:
new_h = new_h + (8 - (new_h % 8))
new_h = min(new_h, video_h)
new_y = round((video_h - new_h) / 2)
if new_y % 2 != 0:
new_y -= 1
new_y = max(0, new_y)
return f"crop={video_w}:{new_h}:0:{new_y}", best_match['name']
if abs(h - video_h) < 16:
new_w = round(video_h * best_match['ratio'])
if new_w % 8 != 0:
new_w = new_w + (8 - (new_w % 8))
new_w = min(new_w, video_w)
new_x = round((video_w - new_w) / 2)
if new_x % 2 != 0:
new_x -= 1
new_x = max(0, new_x)
return f"crop={new_w}:{video_h}:{new_x}:0", best_match['name']
return f"crop={w}:{h}:{x}:{y}", None
def _cluster_crop_values_cropdetect(crop_counts, tolerance=8):
clusters = []
temp_counts = crop_counts.copy()
while temp_counts:
center_str, _ = temp_counts.most_common(1)[0]
try:
_, values = center_str.split('=');
cw, ch, cx, cy = map(int, values.split(':'))
except (ValueError, IndexError):
del temp_counts[center_str]
continue
cluster_total_count = 0
crops_to_remove = []
for crop_str, count in temp_counts.items():
try:
_, values = crop_str.split('=');
w, h, x, y = map(int, values.split(':'))
if abs(x - cx) <= tolerance and abs(y - cy) <= tolerance:
cluster_total_count += count
crops_to_remove.append(crop_str)
except (ValueError, IndexError):
continue
if cluster_total_count > 0:
clusters.append({'center': center_str, 'count': cluster_total_count})
for crop_str in crops_to_remove:
del temp_counts[crop_str]
clusters.sort(key=lambda c: c['count'], reverse=True)
return clusters
def _parse_crop_string_cropdetect(crop_str):
try:
_, values = crop_str.split('=');
w, h, x, y = map(int, values.split(':'))
return {'w': w, 'h': h, 'x': x, 'y': y}
except (ValueError, IndexError):
return None
def _calculate_bounding_box_cropdetect(crop_keys):
min_x = min_w = min_y = min_h = float('inf')
max_x = max_w = max_y = max_h = float('-inf')
for key in crop_keys:
parsed = _parse_crop_string_cropdetect(key)
if not parsed:
continue
w, h, x, y = parsed['w'], parsed['h'], parsed['x'], parsed['y']
min_x = min(min_x, x)
min_y = min(min_y, y)
max_x = max(max_x, x + w)
max_y = max(max_y, y + h)
min_w = min(min_w, w)
min_h = min(min_h, h)
max_w = max(max_w, w)
max_h = max(max_h, h)
if (max_x - min_x) <= 2 and (max_y - min_y) <= 2:
return None
bounding_crop = f"crop={max_x - min_x}:{max_y - min_y}:{min_x}:{min_y}"
return bounding_crop
def _analyze_video_cropdetect(input_file, duration, width, height, num_workers, significant_crop_threshold, min_crop, debug=False):
num_tasks = num_workers * 4
segment_duration = max(1, duration // num_tasks)
tasks = [(i * segment_duration, input_file, width, height) for i in range(num_tasks)]
crop_results = []
with _multiprocessing_cropdetect.Pool(processes=num_workers) as pool:
results_iterator = pool.imap_unordered(_analyze_segment_cropdetect, tasks)
for result in results_iterator:
crop_results.append(result)
all_crops_with_ts = [crop for sublist in crop_results for crop in sublist]
all_crop_strings = [item[0] for item in all_crops_with_ts]
if not all_crop_strings:
return None
crop_counts = _Counter_cropdetect(all_crop_strings)
clusters = _cluster_crop_values_cropdetect(crop_counts)
total_detections = sum(c['count'] for c in clusters)
significant_clusters = []
for cluster in clusters:
percentage = (cluster['count'] / total_detections) * 100
if percentage >= significant_crop_threshold:
significant_clusters.append(cluster)
for cluster in significant_clusters:
parsed_crop = _parse_crop_string_cropdetect(cluster['center'])
if parsed_crop:
_, ar_label = _snap_to_known_ar_cropdetect(
parsed_crop['w'], parsed_crop['h'], parsed_crop['x'], parsed_crop['y'], width, height
)
cluster['ar_label'] = ar_label
else:
cluster['ar_label'] = None
if not significant_clusters:
return None
elif len(significant_clusters) == 1:
dominant_cluster = significant_clusters[0]
parsed_crop = _parse_crop_string_cropdetect(dominant_cluster['center'])
snapped_crop, ar_label = _snap_to_known_ar_cropdetect(
parsed_crop['w'], parsed_crop['h'], parsed_crop['x'], parsed_crop['y'], width, height
)
parsed_snapped = _parse_crop_string_cropdetect(snapped_crop)
if parsed_snapped and parsed_snapped['w'] == width and parsed_snapped['h'] == height:
return None
else:
return snapped_crop
else:
crop_keys = [c['center'] for c in significant_clusters]
bounding_box_crop = _calculate_bounding_box_cropdetect(crop_keys)
if bounding_box_crop:
parsed_bb = _parse_crop_string_cropdetect(bounding_box_crop)
snapped_crop, ar_label = _snap_to_known_ar_cropdetect(
parsed_bb['w'], parsed_bb['h'], parsed_bb['x'], parsed_bb['y'], width, height
)
parsed_snapped = _parse_crop_string_cropdetect(snapped_crop)
if parsed_snapped and parsed_snapped['w'] == width and parsed_snapped['h'] == height:
return None
else:
return snapped_crop
else:
return None
def detect_autocrop_filter(input_file, significant_crop_threshold=5.0, min_crop=10, debug=False):
if not _check_prerequisites_cropdetect():
return None
try:
probe_duration_args = [
'ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1',
input_file
]
duration_str = subprocess.check_output(probe_duration_args, stderr=subprocess.STDOUT, text=True)
duration = int(float(duration_str))
probe_res_args = [
'ffprobe', '-v', 'error',
'-select_streams', 'v',
'-show_entries', 'stream=width,height,disposition',
'-of', 'json',
input_file
]
probe_output = subprocess.check_output(probe_res_args, stderr=subprocess.STDOUT, text=True)
streams_data = json.loads(probe_output)
video_stream = None
for stream in streams_data.get('streams', []):
if stream.get('disposition', {}).get('attached_pic', 0) == 0:
video_stream = stream
break
if not video_stream or 'width' not in video_stream or 'height' not in video_stream:
return None
width = int(video_stream['width'])
height = int(video_stream['height'])
except Exception:
return None
return _analyze_video_cropdetect(input_file, duration, width, height, max(1, os.cpu_count() // 2), significant_crop_threshold, min_crop, debug)
def parse_arguments() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Batch-process MKV files with resumable video encoding, audio downmixing, per-file logging, and optional autocrop.")
parser.add_argument("--no-downmix", action="store_true", help="Preserve original audio channel layout.")
parser.add_argument("--autocrop", action="store_true", help="Automatically detect and crop black bars from video using cropdetect.")
parser.add_argument("--speed", type=str, help="Set the encoding speed. Possible values: slower, slow, medium, fast, faster.")
parser.add_argument("--quality", type=str, help="Set the encoding quality. Possible values: lowest, low, medium, high, higher.")
parser.add_argument("--grain", type=int, help="Set the film-grain value (number). Adjusts the film grain synthesis level.")
return parser.parse_args()
def process_file(file_path: Path, no_downmix: bool, autocrop: bool) -> None:
current_dir = Path(".")
log_file_name = f"{file_path.stem}.log"
log_file_path = DIR_CONV_LOGS / log_file_name
# Setup logging for this file
setup_logging(log_file_path)
logging.info(f"STARTING LOG FOR: {file_path.name}")
logging.info(f"Processing started at: {datetime.now()}")
logging.info(f"Full input file path: {file_path.resolve()}")
logging.info("-" * shutil.get_terminal_size(fallback=(80, 24)).columns)
input_file_abs = file_path.resolve()
intermediate_output_file = current_dir / f"output-{file_path.name}"
audio_temp_dir = None
handbrake_intermediate_for_cleanup = None
processing_error_occurred = False
start_time = datetime.now()
try:
audio_temp_dir = tempfile.mkdtemp(prefix="anime_audio_")
logging.info(f"Audio temporary directory created at: {audio_temp_dir}")
logging.info(f"Analyzing file: {input_file_abs}")
ffprobe_info_json = run_cmd([
"ffprobe", "-v", "quiet", "-print_format", "json", "-show_streams", "-show_format", str(input_file_abs)
], capture_output=True)
if not ffprobe_info_json:
raise RuntimeError("ffprobe failed to return info")
ffprobe_info = json.loads(ffprobe_info_json)
mkvmerge_info_json = run_cmd([
"mkvmerge", "-J", str(input_file_abs)
], capture_output=True)
if not mkvmerge_info_json:
raise RuntimeError("mkvmerge failed to return info")
mkv_info = json.loads(mkvmerge_info_json)
mediainfo_json = run_cmd([
"mediainfo", "--Output=JSON", "-f", str(input_file_abs)
], capture_output=True)
if not mediainfo_json:
raise RuntimeError("mediainfo failed to return info")
media_info = json.loads(mediainfo_json)
is_vfr = False
target_cfr_fps_for_handbrake = None
video_track_info = None
if media_info.get("media") and media_info["media"].get("track"):
for track in media_info["media"]["track"]:
if track.get("@type") == "Video":
video_track_info = track
break
if video_track_info:
frame_rate_mode = video_track_info.get("FrameRate_Mode")
if frame_rate_mode and frame_rate_mode.upper() in ["VFR", "VARIABLE"]:
is_vfr = True
logging.info(f" - Detected VFR based on MediaInfo FrameRate_Mode: {frame_rate_mode}")
original_fps_str = video_track_info.get("FrameRate_Original_String")
if original_fps_str:
match = re.search(r'\((\d+/\d+)\)', original_fps_str)
if match:
target_cfr_fps_for_handbrake = match.group(1)
else:
target_cfr_fps_for_handbrake = video_track_info.get("FrameRate_Original")
if not target_cfr_fps_for_handbrake:
target_cfr_fps_for_handbrake = video_track_info.get("FrameRate_Original")
if not target_cfr_fps_for_handbrake:
target_cfr_fps_for_handbrake = video_track_info.get("FrameRate")
if target_cfr_fps_for_handbrake:
logging.info(f" - Using MediaInfo FrameRate ({target_cfr_fps_for_handbrake}) as fallback for HandBrake target FPS.")
if target_cfr_fps_for_handbrake:
logging.info(f" - Target CFR for HandBrake: {target_cfr_fps_for_handbrake}")
if isinstance(target_cfr_fps_for_handbrake, str) and "/" in target_cfr_fps_for_handbrake:
try:
num, den = map(float, target_cfr_fps_for_handbrake.split('/'))
target_cfr_fps_for_handbrake = f"{num / den:.3f}"
logging.info(f" - Converted fractional FPS to decimal for HandBrake: {target_cfr_fps_for_handbrake}")
except ValueError:
logging.info(f" - Warning: Could not parse fractional FPS '{target_cfr_fps_for_handbrake}'. HandBrakeCLI might fail.")
is_vfr = False
else:
logging.info(" - Warning: VFR detected, but could not determine target CFR from MediaInfo. Will attempt standard UTVideo conversion without HandBrake.")
is_vfr = False
else:
logging.info(f" - Video appears to be CFR or FrameRate_Mode not specified as VFR/Variable by MediaInfo.")
autocrop_filter = None
if autocrop:
logging.info("--- Running autocrop detection ---")
autocrop_filter = detect_autocrop_filter(str(input_file_abs))
if autocrop_filter:
logging.info(f" - Autocrop filter detected: {autocrop_filter}")
else:
logging.info(" - No crop needed or detected.")
encoded_video_file, handbrake_intermediate_for_cleanup = convert_video(
file_path.stem, str(input_file_abs), is_vfr, target_cfr_fps_for_handbrake, autocrop_filter=autocrop_filter
)
logging.info("--- Starting Audio Processing ---")
processed_audio_files = []
audio_tracks_to_remux = []
audio_streams = [s for s in ffprobe_info.get("streams", []) if s.get("codec_type") == "audio"]
# Build mkvmerge track mapping by track ID
mkv_audio_tracks = {t["id"]: t for t in mkv_info.get("tracks", []) if t.get("type") == "audio"}
# Build mediainfo track mapping by StreamOrder
media_tracks_data = media_info.get("media", {}).get("track", [])
mediainfo_audio_tracks = {int(t.get("StreamOrder", -1)): t for t in media_tracks_data if t.get("@type") == "Audio"}
for stream in audio_streams:
stream_index = stream["index"]
codec = stream.get("codec_name")
channels = stream.get("channels", 2)
language = stream.get("tags", {}).get("language", "und")
# Find mkvmerge track by matching ffprobe stream index to mkvmerge track's 'properties'->'stream_id'
mkv_track = None
for t in mkv_info.get("tracks", []):
if t.get("type") == "audio" and t.get("properties", {}).get("stream_id") == stream_index:
mkv_track = t
break
if not mkv_track:
# Fallback: try by position
mkv_track = mkv_info.get("tracks", [])[stream_index] if stream_index < len(mkv_info.get("tracks", [])) else {}
track_id = mkv_track.get("id", -1)
track_title = mkv_track.get("properties", {}).get("track_name", "")
# Find mediainfo track by StreamOrder
audio_track_info = mediainfo_audio_tracks.get(stream_index)
track_delay = 0
delay_raw = audio_track_info.get("Video_Delay") if audio_track_info else None
if delay_raw is not None:
try:
delay_val = float(delay_raw)
# If the value is a float < 1, it's seconds, so convert to ms.
if delay_val < 1:
track_delay = int(round(delay_val * 1000))
else:
track_delay = int(round(delay_val))
except Exception:
track_delay = 0
logging.info(f"Processing Audio Stream #{stream_index} (TID: {track_id}, Codec: {codec}, Channels: {channels})")
if codec in REMUX_CODECS:
audio_tracks_to_remux.append(str(track_id))
else:
# Convert any codec that is not in REMUX_CODECS
opus_file = convert_audio_track(
stream_index, channels, language, audio_temp_dir, str(input_file_abs), not no_downmix
)
processed_audio_files.append({
"Path": opus_file,
"Language": language,
"Title": track_title,
"Delay": track_delay
})
logging.info("--- Finished Audio Processing ---")
# Final mux
logging.info("Assembling final file with mkvmerge...")
mkvmerge_args = ["mkvmerge", "-o", str(intermediate_output_file), str(encoded_video_file)]
for file_info in processed_audio_files:
sync_switch = ["--sync", f"0:{file_info['Delay']}"] if file_info["Delay"] else []
mkvmerge_args += [
"--language", f"0:{file_info['Language']}",
"--track-name", f"0:{file_info['Title']}"
] + sync_switch + [str(file_info["Path"])]
source_copy_args = ["--no-video"]
if audio_tracks_to_remux:
source_copy_args += ["--audio-tracks", ",".join(audio_tracks_to_remux)]
else:
source_copy_args += ["--no-audio"]
mkvmerge_args += source_copy_args + [str(input_file_abs)]
run_cmd(mkvmerge_args)
# Move files
logging.info("Moving files to final destinations...")
shutil.move(str(file_path), DIR_ORIGINAL / file_path.name)
shutil.move(str(intermediate_output_file), DIR_COMPLETED / file_path.name)
logging.info("Cleaning up persistent video temporary files (after successful processing)...")
video_temp_files_on_success = [
current_dir / f"{file_path.stem}.vpy",
current_dir / f"{file_path.stem}.ut.mkv",
current_dir / f"temp-{file_path.stem}.mkv", # This is encoded_video_file
current_dir / f"{file_path.stem}.ut.mkv.lwi",
current_dir / f"{file_path.stem}.ut.mkv.ffindex",
]
if handbrake_intermediate_for_cleanup and handbrake_intermediate_for_cleanup.exists():
video_temp_files_on_success.append(handbrake_intermediate_for_cleanup)
for temp_vid_file in video_temp_files_on_success:
if temp_vid_file.exists():
logging.info(f" Deleting: {temp_vid_file}")
temp_vid_file.unlink(missing_ok=True)
else:
logging.info(f" Skipping (not found): {temp_vid_file}")
except Exception as e:
logging.error(f"ERROR: An error occurred while processing '{file_path.name}': {e}")
processing_error_occurred = True
finally:
logging.info("--- Starting Universal Cleanup (for this file) ---")
logging.info(" - Cleaning up disposable audio temporary directory...")
if audio_temp_dir and Path(audio_temp_dir).exists():
shutil.rmtree(audio_temp_dir, ignore_errors=True)
logging.info(f" - Deleted audio temp dir: {audio_temp_dir}")
elif audio_temp_dir:
logging.info(f" - Audio temp dir not found or already cleaned: {audio_temp_dir}")
else:
logging.info(f" - Audio temp dir was not created.")
logging.info(" - Cleaning up intermediate output file (if it wasn't moved on success)...")
if intermediate_output_file.exists():
if processing_error_occurred:
logging.warning(f" - WARNING: Processing error occurred. Intermediate output file '{intermediate_output_file}' is being preserved at its original path for inspection.")
else:
logging.info(f" - INFO: Intermediate output file '{intermediate_output_file}' found at original path despite no errors (expected to be moved). Cleaning up.")
intermediate_output_file.unlink(missing_ok=True)
logging.info(f" - Deleted intermediate output file from original path: {intermediate_output_file}")
else:
if not processing_error_occurred:
logging.info(f" - Intermediate output file successfully moved (not found at original path, as expected): {intermediate_output_file}")
else:
logging.info(f" - Processing error occurred, and intermediate output file '{intermediate_output_file}' not found at original path (likely not created or cleaned by another step).")
runtime = datetime.now() - start_time
runtime_str = str(runtime).split('.')[0]
logging.info(f"\nTotal runtime for this file: {runtime_str}")
logging.info(f"FINISHED LOG FOR: {file_path.name}")
def main() -> None:
args = parse_arguments()
check_tools()
# Override default SVT-AV1 params if provided via command line
if args.speed:
SVT_AV1_PARAMS["speed"] = args.speed
if args.quality:
SVT_AV1_PARAMS["quality"] = args.quality
if args.grain is not None:
SVT_AV1_PARAMS["film-grain"] = args.grain
current_dir = Path(".")
DIR_COMPLETED.mkdir(exist_ok=True, parents=True)
DIR_ORIGINAL.mkdir(exist_ok=True, parents=True)
DIR_CONV_LOGS.mkdir(exist_ok=True, parents=True)
files_to_process = sorted(
f for f in current_dir.glob("*.mkv")
if not (f.name.endswith(".ut.mkv") or f.name.startswith("temp-") or f.name.startswith("output-") or f.name.endswith(".cfr_temp.mkv"))
)
if not files_to_process:
print("No MKV files found to process. Exiting.")
return
while True:
files_to_process = sorted(
f for f in current_dir.glob("*.mkv")
if not (f.name.endswith(".ut.mkv") or f.name.startswith("temp-") or f.name.startswith("output-") or f.name.endswith(".cfr_temp.mkv"))
)
if not files_to_process:
print("No more .mkv files found to process in the current directory. The script will now exit.")
break
file_path = files_to_process[0]
if not is_ffmpeg_decodable(file_path):
print(f"ERROR: ffmpeg cannot decode '{file_path.name}'. Skipping this file.", file=sys.stderr)
shutil.move(str(file_path), DIR_ORIGINAL / file_path.name)
continue
process_file(file_path, args.no_downmix, args.autocrop)
if __name__ == "__main__":
import argparse
main()