Fix windows and fallback to vszip if turbo-metrics fails

This commit is contained in:
trixoniisama
2024-11-19 10:32:56 +01:00

View File

@@ -9,10 +9,14 @@ import re
import argparse
import psutil
import shutil
import platform
import vapoursynth as vs
core = vs.core
core.max_cache_size = 1024
IS_WINDOWS = platform.system() == 'Windows'
NULL_DEVICE = 'NUL' if IS_WINDOWS else '/dev/null'
if shutil.which("av1an") is None:
raise FileNotFoundError("av1an not found, exiting")
@@ -83,6 +87,11 @@ def fast_pass(
:type workers: int
"""
# Enclose paths in quotes if they contain spaces
input_file = f'"{input_file}"' if ' ' in input_file else input_file
output_file = f'"{output_file}"' if ' ' in output_file else output_file
tmp_dir = f'"{tmp_dir}"' if ' ' in tmp_dir else tmp_dir
fast_av1an_command = [
'av1an',
'-i', input_file,
@@ -147,61 +156,66 @@ def turbo_metrics(
)
def calculate_ssimu2(src_file, enc_file, ssimu2_txt_path, ranges, skip):
if ssimu2zig:
source_clip = core.lsmas.LWLibavSource(source=src_file, cache=0)
encoded_clip = core.lsmas.LWLibavSource(source=enc_file, cache=0)
#source_clip = source_clip.resize.Bicubic(format=vs.RGBS, matrix_in_s='709').fmtc.transfer(transs="srgb", transd="linear", bits=32)
#encoded_clip = encoded_clip.resize.Bicubic(format=vs.RGBS, matrix_in_s='709').fmtc.transfer(transs="srgb", transd="linear", bits=32)
print(f"source: {len(source_clip)} frames")
print(f"encode: {len(encoded_clip)} frames")
with open(ssimu2_txt_path, "w") as file:
file.write(f"skip: {skip}\n")
iter = 0
for i in range(len(ranges) - 1):
cut_source_clip = source_clip[ranges[i]:ranges[i+1]].std.SelectEvery(cycle=skip, offsets=1)
cut_encoded_clip = encoded_clip[ranges[i]:ranges[i+1]].std.SelectEvery(cycle=skip, offsets=1)
result = core.vszip.Metrics(cut_source_clip, cut_encoded_clip, mode=0)
for index, frame in enumerate(result.frames()):
iter += 1
score = frame.props['_SSIMULACRA2']
with open(ssimu2_txt_path, "a") as file:
file.write(f"{iter}: {score}\n")
else:
if not ssimu2zig: # Try turbo-metrics first if ssimu2zig is False
turbo_metrics_run = turbo_metrics(src_file, enc_file, skip)
if turbo_metrics_run.returncode != 0:
if turbo_metrics_run.returncode == 0: # If turbo-metrics succeeds
with open(ssimu2_txt_path, "w") as file:
file.write(f"skip: {skip}\n")
frame = 0
# for whatever reason, turbo-metrics in csv mode dumps the entire scores to stdout at the end even though it prints them live to stdout.
# so we need to see if we've seen ``ssimulacra2`` before and if we have, ignore anything after the second one.
ignore_end_barf = False
for line in turbo_metrics_run.stdout.splitlines():
# set ignore_end_barf to true as this is the first "ssimulacra2" line
if line == "ssimulacra2" and not ignore_end_barf:
ignore_end_barf = True
# break the loop as we've encountered the second "ssimulacra2" line so we don't get a dupe of the scores.
elif line == "ssimulacra2" and ignore_end_barf:
break
# assume everything not "ssimulacra2" is a score.
if line != "ssimulacra2":
frame += 1
with open(ssimu2_txt_path, "a") as file:
file.write(f"{frame}: {float(line)}\n")
return # Exit if turbo-metrics succeeded
else:
print(f"Turbo Metrics exited with code: {turbo_metrics_run.returncode}")
print(turbo_metrics_run.stdout)
print(turbo_metrics_run.stderr)
exit(1)
print("Falling back to vs-zip")
skip = args.skip if args.skip is not None else '3'
with open(ssimu2_txt_path, "w") as file:
file.write(f"skip: {skip}\n")
# If ssimu2zig is True or turbo-metrics failed, use vs-zip
source_clip = core.lsmas.LWLibavSource(source=src_file, cache=0)
encoded_clip = core.lsmas.LWLibavSource(source=enc_file, cache=0)
frame = 0
# for whatever reason, turbo-metrics in csv mode dumps the entire scores to stdout at the end even though it prints them live to stdout.
# so we need to see if we've seen ``ssimulacra2`` before and if we have, ignore anything after the second one.
ignore_end_barf = False
for line in turbo_metrics_run.stdout.splitlines():
# set ignore_end_barf to true as this is the first "ssimulacra2" line
if line == "ssimulacra2" and not ignore_end_barf:
ignore_end_barf = True
# break the loop as we've encountered the second "ssimulacra2" line so we don't get a dupe of the scores.
elif line == "ssimulacra2" and ignore_end_barf:
break
# assume everything not "ssimulacra2" is a score.
if line != "ssimulacra2":
frame += 1
with open(ssimu2_txt_path, "a") as file:
file.write(f"{frame}: {float(line)}\n")
#source_clip = source_clip.resize.Bicubic(format=vs.RGBS, matrix_in_s='709').fmtc.transfer(transs="srgb", transd="linear", bits=32)
#encoded_clip = encoded_clip.resize.Bicubic(format=vs.RGBS, matrix_in_s='709').fmtc.transfer(transs="srgb", transd="linear", bits=32)
print(f"source: {len(source_clip)} frames")
print(f"encode: {len(encoded_clip)} frames")
with open(ssimu2_txt_path, "w") as file:
file.write(f"skip: {skip}\n")
iter = 0
for i in range(len(ranges) - 1):
cut_source_clip = source_clip[ranges[i]:ranges[i+1]].std.SelectEvery(cycle=skip, offsets=1)
cut_encoded_clip = encoded_clip[ranges[i]:ranges[i+1]].std.SelectEvery(cycle=skip, offsets=1)
result = core.vszip.Metrics(cut_source_clip, cut_encoded_clip, mode=0)
for index, frame in enumerate(result.frames()):
iter += 1
score = frame.props['_SSIMULACRA2']
with open(ssimu2_txt_path, "a") as file:
file.write(f"{iter}: {score}\n")
def calculate_xpsnr(src_file, enc_path, xpsnr_txt_path):
xpsnr_command = f'ffmpeg -i {src_file} -i {enc_path} -lavfi xpsnr="stats_file={xpsnr_txt_path}" -f null /dev/null'
if IS_WINDOWS:
xpsnr_txt_path = xpsnr_txt_path.replace(':', r'\\:')
if IS_WINDOWS:
xpsnr_command = f'ffmpeg -i "{src_file}" -i "{enc_path}" -lavfi xpsnr="stats_file={xpsnr_txt_path}" -f null {NULL_DEVICE}'
else:
xpsnr_command = f'ffmpeg -i {src_file} -i {enc_path} -lavfi xpsnr="stats_file={xpsnr_txt_path}" -f null {NULL_DEVICE}'
p = subprocess.Popen(xpsnr_command, shell=True)
exit_code = p.wait()
@@ -481,4 +495,4 @@ match stage:
calculate_zones(tmp_dir, ranges, zones, crf)
case _:
print(f"Stage argument invalid, exiting.")
exit(-2)
exit(-2)