Files
chunk_encoder/static_encoder.py

220 lines
8.3 KiB
Python

#!/usr/bin/env python3
import subprocess
import os
import sys
import argparse
import shutil
import multiprocessing
import re
import time
def get_video_resolution(video_path):
"""Gets video resolution using ffprobe."""
command = [
'ffprobe', '-v', 'error', '-select_streams', 'v:0',
'-show_entries', 'stream=width,height', '-of', 'csv=s=x:p=0',
video_path
]
try:
result = subprocess.run(command, capture_output=True, text=True, check=True)
width, height = map(int, result.stdout.strip().split('x'))
return width, height
except (subprocess.CalledProcessError, ValueError):
return None, None
def get_frame_count(video_path):
"""Gets the total number of frames in a video file using ffprobe."""
command = [
'ffprobe', '-v', 'error', '-select_streams', 'v:0',
'-count_frames', '-show_entries', 'stream=nb_read_frames',
'-of', 'default=nokey=1:noprint_wrappers=1', video_path
]
try:
result = subprocess.run(command, capture_output=True, text=True, check=True)
return int(result.stdout.strip())
except (subprocess.CalledProcessError, ValueError):
return 0
def encode_segment_worker(task_args):
"""
Wrapper for the multiprocessing pool.
Reports progress to shared memory objects.
"""
segment_path, crf, worker_id, progress_dict, total_processed_frames, lock = task_args
progress_dict[worker_id] = {'fps': 0.0, 'status': 'Starting'}
success = encode_segment(segment_path, crf, worker_id, progress_dict, total_processed_frames, lock)
status = 'Finished' if success else 'FAILED'
progress_dict[worker_id] = {'fps': 0.0, 'status': status}
return success
def encode_segment(segment_path, crf, worker_id, progress_dict, total_processed_frames, lock):
"""
Encodes a single segment, reporting progress via shared objects.
Calculates FPS manually based on frame processing time.
"""
output_dir = "segments"
base_name = os.path.basename(segment_path)
final_output_path = os.path.join(output_dir, base_name)
width, height = get_video_resolution(segment_path)
if not width or not height:
return False
ffmpeg_command = [
'ffmpeg', '-hide_banner', '-loglevel', 'error', '-i', segment_path,
'-pix_fmt', 'yuv420p10le', '-f', 'yuv4mpegpipe', '-strict', '-1', '-'
]
svt_command = [
'SvtAv1EncApp.exe', '-i', 'stdin', '--width', str(width), '--height', str(height),
'--progress', '2', '--preset', '2', '--input-depth', '10',
'--crf', str(crf), '--film-grain', '8', '--tune', '2', '--keyint', '-1',
'--color-primaries', '1', '--transfer-characteristics', '1', '--matrix-coefficients', '1',
'-b', final_output_path
]
try:
ffmpeg_process = subprocess.Popen(ffmpeg_command, stdout=subprocess.PIPE)
svt_process = subprocess.Popen(svt_command, stdin=ffmpeg_process.stdout, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, text=True, encoding='utf-8')
if ffmpeg_process.stdout:
ffmpeg_process.stdout.close()
progress_regex = re.compile(r"Encoding frame\s+(\d+)")
last_frame = 0
frames_processed_this_segment = 0
last_update_time = time.time()
frames_at_last_update = 0
for line in iter(svt_process.stderr.readline, ''):
line = line.strip()
match = progress_regex.search(line)
if match:
current_frame = int(match.group(1))
current_time = time.time()
time_since_last_update = current_time - last_update_time
fps = 0.0
if time_since_last_update > 1.0:
frames_since_last_update = current_frame - frames_at_last_update
if frames_since_last_update > 0 and time_since_last_update > 0:
fps = frames_since_last_update / time_since_last_update
last_update_time = current_time
frames_at_last_update = current_frame
delta = current_frame - last_frame
if delta > 0:
with lock:
total_processed_frames.value += delta
frames_processed_this_segment += delta
last_frame = current_frame
if fps > 0.0:
progress_dict[worker_id] = {'fps': fps, 'status': 'Encoding'}
svt_process.communicate()
ffmpeg_process.wait()
if svt_process.returncode != 0:
raise subprocess.CalledProcessError(svt_process.returncode, svt_command)
total_segment_frames = get_frame_count(segment_path)
remaining_frames = total_segment_frames - frames_processed_this_segment
if remaining_frames > 0:
with lock:
total_processed_frames.value += remaining_frames
return True
except (subprocess.CalledProcessError, FileNotFoundError):
if os.path.exists(final_output_path):
os.remove(final_output_path)
return False
def draw_global_progress(processed_frames, total_frames, progress_dict):
"""Draws a single line global progress bar."""
bar_width = 50
percentage = processed_frames / total_frames if total_frames > 0 else 0
filled_len = int(round(bar_width * percentage))
bar = '' * filled_len + '-' * (bar_width - filled_len)
total_fps = sum(worker.get('fps', 0.0) for worker in progress_dict.values())
status_str = f"Progress: |{bar}| {processed_frames}/{total_frames} ({percentage:.1%}) @ {total_fps:.2f} FPS"
sys.stdout.write('\r' + status_str)
sys.stdout.flush()
def main():
parser = argparse.ArgumentParser(
description="Encodes video segments from the 'cuts' directory to AV1 using a static CRF value.",
formatter_class=argparse.RawTextHelpFormatter
)
parser.add_argument("--crf", type=int, default=27, help="The static CRF value to use for all segments. Default: 27")
parser.add_argument(
"--workers",
type=int,
default=4,
help="Number of segments to encode in parallel. Default: 4"
)
args = parser.parse_args()
for exe in ["SvtAv1EncApp.exe", "ffprobe"]:
if not shutil.which(exe):
print(f"Error: '{exe}' not found. Please ensure it is in your system's PATH.")
sys.exit(1)
input_dir = "cuts"
output_dir = "segments"
if not os.path.isdir(input_dir):
print(f"Error: Input directory '{input_dir}' not found. Please run the scene cutter script first.")
sys.exit(1)
os.makedirs(output_dir, exist_ok=True)
segments = sorted([os.path.join(input_dir, f) for f in os.listdir(input_dir) if f.endswith('.mkv')])
total_segments = len(segments)
if total_segments == 0:
print(f"No segments found in '{input_dir}'.")
sys.exit(0)
print(f"Found {total_segments} segments to process from '{input_dir}'.")
print(f"Encoding with static CRF {args.crf} using {args.workers} parallel worker(s).")
print("\nGathering segment information...")
grand_total_frames = sum(get_frame_count(s) for s in segments)
manager = multiprocessing.Manager()
progress_dict = manager.dict()
total_processed_frames = manager.Value('i', 0)
lock = manager.Lock()
tasks = [(segments[i], args.crf, i, progress_dict, total_processed_frames, lock) for i in range(total_segments)]
pool = multiprocessing.Pool(processes=args.workers)
future_results = pool.imap_unordered(encode_segment_worker, tasks)
while total_processed_frames.value < grand_total_frames:
draw_global_progress(total_processed_frames.value, grand_total_frames, progress_dict)
time.sleep(0.1)
if all(p.get('status') in ['Finished', 'FAILED'] for p in progress_dict.values()) and len(progress_dict) == total_segments:
break
pool.close()
pool.join()
draw_global_progress(grand_total_frames, grand_total_frames, progress_dict)
print()
results = list(future_results)
successful_encodes = sum(1 for r in results if r)
print(f"\n--- All segments processed. ---")
print(f"Successfully encoded {successful_encodes}/{total_segments} segments.")
if __name__ == "__main__":
main()