conversion_project/core/encode_engine.py
2026-05-17 21:21:28 -04:00

413 lines
20 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# core/encode_engine.py
"""FFmpeg encoding engine with comprehensive logging."""
import re
import subprocess
from pathlib import Path

from core.audio_handler import get_audio_streams, choose_audio_bitrate, filter_audio_streams, prompt_user_audio_selection, prompt_for_title_stripping
from core.video_handler import calculate_crop_dimensions
from core.logger_helper import setup_logger
# Module-wide logger. setup_logger is handed the "logs" directory that sits
# beside the "core" package (two levels up from this file).
_log_dir = Path(__file__).parent.parent / "logs"
logger = setup_logger(_log_dir)
# Patterns for the fields shown in the live progress line. FFmpeg pads some
# values after "=" (e.g. "bitrate= 987.6kbits/s"), hence the optional whitespace.
_PROGRESS_TIME_RE = re.compile(r'time=\s*(\S+)')
_PROGRESS_BITRATE_RE = re.compile(r'bitrate=\s*(\S+)')
_PROGRESS_ELAPSED_RE = re.compile(r'elapsed=\s*(\S+)')

# Encoder profiles. NVENC preset "p7" is the slowest / highest-quality preset
# (p1 is the fastest).
_HEVC_SETTINGS = {
    "name": "HEVC NVENC",
    "codec": "hevc_nvenc",
    "preset": "p7",
    "pix_fmt": "p010le",
    "bit_depth": "10-bit",
}
_AV1_SETTINGS = {
    "name": "AV1 NVENC",
    "codec": "av1_nvenc",
    "preset": "p7",
    "pix_fmt": "yuv420p",
    "bit_depth": "8-bit",
}

# Channel layout name matching each supported output channel count.
_CHANNEL_LAYOUTS = {2: "stereo", 6: "5.1"}


def _select_audio_streams(input_file, streams, audio_filter_config):
    """Return the audio streams to keep according to *audio_filter_config*."""
    if not (audio_filter_config and audio_filter_config.get("enabled", False)):
        return streams

    preselected_str = audio_filter_config.get("preselected")
    if preselected_str:
        # Pre-selected stream indices (comma separated) skip any prompting.
        try:
            selected_indices = {int(part.strip()) for part in preselected_str.split(",")}
        except ValueError:
            logger.warning(f"Invalid audio_select format: {preselected_str}. Using all streams.")
            return streams
        streams = [s for s in streams if s[0] in selected_indices]
        logger.info(f"Pre-selected audio streams: {[s[0] for s in streams]}")
        return streams

    if audio_filter_config.get("interactive", False):
        # Interactive selection, followed by the title-stripping prompt.
        streams = prompt_user_audio_selection(streams)
        return prompt_for_title_stripping(streams)

    # Automatic filtering driven by configuration.
    return filter_audio_streams(input_file, streams)


def _resolve_encoder(encoder, color_bit, src_bit_depth):
    """Return the encoder settings dict (name, codec, preset, pix_fmt, bit_depth)."""
    settings = dict(_AV1_SETTINGS if encoder == "av1" else _HEVC_SETTINGS)

    # NOTE(review): this precedence is preserved from the original code.
    # color_bit is honoured only when encoder == "hevc" (it is ignored for the
    # default "nvenc"), and when color_bit is None a known src_bit_depth
    # overrides an explicit "hevc"/"av1" choice. Confirm this is intended.
    if color_bit is not None and encoder == "hevc":
        if color_bit == 8:
            settings["pix_fmt"] = "yuv420p"
            settings["bit_depth"] = "8-bit"
            logger.info(f"Using --color-bit {color_bit}: HEVC NVENC 8-bit (yuv420p)")
        elif color_bit == 10:
            settings["pix_fmt"] = "p010le"
            settings["bit_depth"] = "10-bit"
            logger.info(f"Using --color-bit {color_bit}: HEVC NVENC 10-bit (p010le)")
    elif src_bit_depth is not None and color_bit is None:
        if src_bit_depth >= 10:
            settings = dict(_HEVC_SETTINGS)
            logger.info(f"Auto-selected HEVC NVENC for detected {src_bit_depth}-bit source")
        else:
            settings = dict(_AV1_SETTINGS)
            logger.info(f"Auto-selected AV1 NVENC for detected {src_bit_depth}-bit source")
    return settings


def _plan_output_channels(index, channels, final_title, audio_channels, is_1080_class):
    """Return (is_commentary, output_channels, forced) for one audio stream."""
    # Truthy when the effective title mentions "commentary"; falsy otherwise.
    is_commentary = final_title and "commentary" in final_title.lower()
    if audio_channels and index in audio_channels:
        # An explicit per-stream override wins over every automatic rule.
        return is_commentary, audio_channels[index], True
    if is_commentary:
        return is_commentary, 2, False
    return is_commentary, (6 if is_1080_class and channels >= 6 else 2), False


def _format_progress(line):
    """Condense one FFmpeg progress line into the single-line console status."""
    def field(pattern, default):
        match = pattern.search(line)
        return match.group(1) if match else default

    time_str = field(_PROGRESS_TIME_RE, "00:00:00")
    bitrate_str = field(_PROGRESS_BITRATE_RE, "0kbps")
    elapsed_str = field(_PROGRESS_ELAPSED_RE, "0:00:00")
    # Leading "\r" rewinds to the start of the console line so it updates in place.
    return f"\r {time_str} | {bitrate_str} | elapsed={elapsed_str}"


def run_ffmpeg(input_file: Path, output_file: Path, cq: int, scale_width: int, scale_height: int,
               src_width: int, src_height: int, filter_flags: str, audio_config: dict,
               method: str, bitrate_config: dict, encoder: str = "nvenc", subtitle_files: list = None, audio_language: str = None,
               audio_filter_config: dict = None, test_mode: bool = False, strip_all_titles: bool = False, src_bit_depth: int = None, unforce_subs: bool = False, no_encode: bool = False, color_bit: int = None, crop_height: int = None, audio_titles: dict = None, audio_channels: dict = None) -> tuple:
    """
    Execute FFmpeg encoding/re-muxing with structured console output.

    Builds one FFmpeg command (inputs, video filters, stream maps, video codec,
    per-stream audio options, subtitle options), prints a VIDEO/AUDIO summary,
    runs FFmpeg while showing a condensed progress line, and reports the
    resulting file sizes.

    Args:
        input_file: Path to source video file
        output_file: Path for encoded output file
        cq: Quality value passed to ``-cq`` when method is "CQ"
        scale_width/height: Target resolution dimensions
        src_width/height: Source resolution dimensions
        filter_flags: Scaling filter algorithm (lanczos, bicubic, etc)
        audio_config: Audio bitrate configuration dict (passed to choose_audio_bitrate)
        method: Encoding method - "CQ"; any other value uses bitrate mode
        bitrate_config: Bitrate/maxrate/bufsize configuration dict with
            ``bitrate_<res>``, ``maxrate_<res>``, ``bufsize_<res>`` keys where
            ``<res>`` is "1080" or "720"
        encoder: "av1" selects AV1 NVENC; any other value selects HEVC NVENC
            (may be overridden by src_bit_depth auto-selection)
        subtitle_files: List of external subtitle file paths (if any)
        audio_language: Language code to tag on every audio track (e.g., "eng", "spa")
        audio_filter_config: Audio filtering/selection configuration
        test_mode: If True, only encode the first 15 minutes (``-t 900``)
        strip_all_titles: If True, strip title metadata from audio tracks, except
            tracks whose title contains "commentary" or "descriptive"
        src_bit_depth: Source bit depth for encoder auto-selection
            (>= 10 selects HEVC 10-bit, otherwise AV1 8-bit)
        unforce_subs: If True, remove forced flag from subtitle tracks
        no_encode: If True, copy video/audio (re-mux only, skip encoding)
        color_bit: 8 or 10; applied only when encoder == "hevc". 8-bit uses yuv420p, 10-bit uses p010le.
        crop_height: If specified, crop video to this height before scaling. E.g., 816 for 1920x816 from 1920x1080 source.
        audio_titles: Dict mapping stream index to custom title. E.g., {1: "Commentary"} sets stream 1 title to "Commentary".
        audio_channels: Dict mapping stream index to channel count. E.g., {0: 2, 1: 6} forces track 0 to stereo, track 1 to 5.1.

    Returns:
        tuple: (orig_size_bytes, output_size_bytes, reduction_ratio) where
        reduction_ratio is output size divided by original size.

    Raises:
        subprocess.CalledProcessError: If FFmpeg exits with a non-zero status.
            The captured FFmpeg output is logged and attached as ``output``.
    """
    streams = _select_audio_streams(input_file, get_audio_streams(input_file), audio_filter_config)
    enc = _resolve_encoder(encoder, color_bit, src_bit_depth)

    logger.debug(f"audio_language parameter: {audio_language}")

    # Resolution class drives both audio channel policy and the bitrate keys.
    is_1080_class = scale_height >= 1080 or scale_width >= 1920
    res_key = "1080" if is_1080_class else "720"

    # ---- Inputs -----------------------------------------------------------
    cmd = ["ffmpeg", "-y", "-i", str(input_file)]
    if subtitle_files:
        for sub_file in subtitle_files:
            cmd.extend(["-i", str(sub_file)])

    if test_mode:
        cmd.extend(["-t", "900"])  # 900 seconds = 15 minutes

    # ---- Video filters (crop first, then scale) ---------------------------
    video_filters = []
    if crop_height and not no_encode:
        crop_dims = calculate_crop_dimensions(src_height, crop_height)
        if crop_dims["ffmpeg_filter"]:
            video_filters.append(crop_dims["ffmpeg_filter"])
            print(f" Applying crop: {crop_dims['ffmpeg_filter']} ({src_height}p → {crop_height}p)")
    if not no_encode:
        video_filters.append(f"scale={scale_width}:{scale_height}:flags={filter_flags}:force_original_aspect_ratio=decrease")
    if video_filters:
        # Filters in one chain are comma separated.
        cmd.extend(["-vf", ",".join(video_filters)])

    # ---- Stream mapping ---------------------------------------------------
    cmd.extend(["-map", "0:v:0"])  # Only the first video stream of the source
    for stream in streams:
        cmd.extend(["-map", f"0:{stream[0]}"])
    if subtitle_files:
        # External subtitle files are inputs 1..N (input 0 is the video).
        for sub_idx, _ in enumerate(subtitle_files):
            cmd.extend(["-map", f"{sub_idx + 1}:s"])
    else:
        cmd.extend(["-map", "0:s?"])  # Embedded subtitles, if any

    # ---- Video codec ------------------------------------------------------
    if no_encode:
        cmd.extend(["-c:v", "copy"])
    else:
        cmd.extend(["-c:v", enc["codec"], "-preset", enc["preset"], "-pix_fmt", enc["pix_fmt"]])
        if method == "CQ":
            cmd += ["-cq", str(cq)]
        else:
            # Bitrate mode (fallback)
            vb = bitrate_config.get(f"bitrate_{res_key}", "900k")
            maxrate = bitrate_config.get(f"maxrate_{res_key}", "1250k")
            bufsize = bitrate_config.get(f"bufsize_{res_key}", "1800k")
            cmd += ["-b:v", vb, "-maxrate", maxrate, "-bufsize", bufsize]

    # ---- Audio: per-stream options and the matching console summary -------
    # Both are produced from the same decision so the summary always reflects
    # what the command will actually do (including no_encode).
    audio_summary_lines = []
    for i, (index, channels, avg_bitrate, src_lang, _meta_bitrate, title, codec_name) in enumerate(streams):
        # A custom title, when given for this stream, replaces the source title.
        final_title = audio_titles.get(index, title) if audio_titles else title
        if i == 0:  # Only log once per file
            logger.debug(f"audio_titles dict received: {audio_titles}")
        logger.debug(f"Stream {index}: original_title='{title}', final_title='{final_title}', audio_titles_present={audio_titles is not None}")

        is_commentary, output_channels, channels_forced = _plan_output_channels(
            index, channels, final_title, audio_channels, is_1080_class)
        if channels_forced:
            logger.info(f"Stream #{index}: Audio channels override applied: {channels}ch → {output_channels}ch")

        if no_encode:
            codec, br = "copy", avg_bitrate
        else:
            codec, br = choose_audio_bitrate(output_channels, avg_bitrate, audio_config, is_1080_class, is_commentary)

        # Titles containing "commentary"/"descriptive" survive strip_all_titles.
        is_special_audio = title and ("commentary" in title.lower() or "descriptive" in title.lower())
        should_strip = strip_all_titles and not is_special_audio
        logger.debug(f"Stream {index}: title='{final_title}', is_commentary={is_commentary}, is_special_audio={is_special_audio}, strip_all_titles={strip_all_titles}, should_strip={should_strip}")
        if is_commentary:
            logger.info(f"Stream #{index}: Commentary track detected (forcing 2ch stereo)")
        if strip_all_titles and is_special_audio:
            logger.debug(f"Stream {index}: ✓ Preserving title '{title}' (special audio track)")

        if codec == "copy":
            cmd += [f"-c:a:{i}", "copy"]
            action = "COPY"
            output_codec = codec_name
            output_bitrate = f"{avg_bitrate}kbps"
        else:
            out_codec_arg = "eac3" if codec == "eac3" else "aac"
            # Keep the layout consistent with the requested channel count; fall
            # back to the codec's usual layout for any other count.
            layout = _CHANNEL_LAYOUTS.get(output_channels, "5.1" if codec == "eac3" else "stereo")
            cmd += [
                f"-c:a:{i}", out_codec_arg,
                f"-b:a:{i}", str(br),
                # "a:{i}" addresses the i-th *audio* output stream; a bare index
                # would address the i-th output stream overall (0 is the video).
                f"-ac:a:{i}", str(output_channels),
                f"-channel_layout:a:{i}", layout,
            ]
            action = "ENC"
            output_codec = "EAC3" if codec == "eac3" else "AAC"
            output_bitrate = f"{br/1000:.0f}kbps"

        # Metadata is identical for copied and re-encoded tracks.
        if audio_language:
            cmd += [f"-metadata:s:a:{i}", f"language={audio_language}"]
        if audio_titles and index in audio_titles:
            # Custom title takes precedence over stripping.
            cmd += [f"-metadata:s:a:{i}", f"title={audio_titles[index]}"]
        elif should_strip:
            cmd += [f"-metadata:s:a:{i}", "title="]

        lang_info = f"{src_lang}→{audio_language}" if audio_language else src_lang
        title_info = f" [{final_title}]" if final_title else ""
        override_note = " [FORCED]" if channels_forced else ""
        audio_summary_lines.append(
            f" - Stream #{index}: {channels}ch→{output_channels}ch | {lang_info} | Detected: {codec_name} {avg_bitrate}kbps | Output: {output_codec} {output_bitrate} ({action}){title_info}{override_note}"
        )

    # ---- Subtitles --------------------------------------------------------
    if subtitle_files:
        cmd += ["-c:s", "srt"]
        for sub_idx, _ in enumerate(subtitle_files):
            cmd += ["-metadata:s:s:" + str(sub_idx), "language=eng"]
            if unforce_subs:
                cmd += ["-disposition:s:" + str(sub_idx), "-forced"]
    else:
        # Embedded subtitles are always converted to subrip.
        # NOTE(review): image-based subtitle formats presumably cannot be
        # converted this way - confirm sources only carry text subtitles.
        cmd += ["-c:s", "subrip"]
        if unforce_subs:
            # No index: applies to all embedded subtitle streams
            cmd += ["-disposition:s", "-forced"]

    cmd += [str(output_file)]

    # ---- Console summary --------------------------------------------------
    print(f"\n🎬 Encoding: {output_file.name}")
    print("📹 VIDEO")
    detected_bit = f" {src_bit_depth}-bit" if src_bit_depth else ""
    output_bit = f" {enc['bit_depth']}"
    res_info = f"Detected: {src_width}x{src_height}{detected_bit} | Output: {scale_width}x{scale_height}{output_bit}"
    if method == "CQ":
        cq_info = f"CQ {cq}"
    else:
        # Show the bitrate for the resolution class the command actually uses.
        shown_bitrate = bitrate_config.get(f"bitrate_{res_key}", "900k")
        cq_info = f"VBR {shown_bitrate}"
    test_str = " [TEST 15min]" if test_mode else ""
    print(f" {res_info} | {enc['name']} preset {enc['preset']} | {cq_info}{test_str}")
    print("🔊 AUDIO")
    for summary_line in audio_summary_lines:
        print(summary_line)

    logger.debug(f"Running {method} encode: {output_file.name}")

    # ---- Run FFmpeg -------------------------------------------------------
    # stderr is merged into stdout so everything is captured for the error log
    # while only the progress lines reach the console. errors="replace" keeps
    # undecodable bytes in FFmpeg's output from aborting the encode.
    ffmpeg_log = []
    with subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        errors="replace",
        bufsize=1,
    ) as process:
        print("\n⏳ PROGRESS")
        for line in process.stdout:
            ffmpeg_log.append(line.rstrip())
            # "frame=" marks an encoding progress line
            if "frame=" in line:
                print(_format_progress(line), end='', flush=True)
        print()  # Newline after encoding completes
        returncode = process.wait()

    if returncode != 0:
        # Log full FFmpeg output if there was an error
        logger.error("FFmpeg output (full):")
        for log_line in ffmpeg_log:
            logger.error(log_line)
        raise subprocess.CalledProcessError(returncode, cmd, output="\n".join(ffmpeg_log))

    # ---- Results ----------------------------------------------------------
    orig_size = input_file.stat().st_size
    out_size = output_file.stat().st_size
    reduction_ratio = out_size / orig_size

    logger.info("\n📊 ENCODE RESULTS:")
    logger.info(f" Original Size: {orig_size/1e6:.2f} MB")
    logger.info(f" Encoded Size: {out_size/1e6:.2f} MB")
    logger.info(f" Reduction: {reduction_ratio:.1%} of original ({(1-reduction_ratio):.1%} saved)")
    logger.info(f" Resolution: {src_width}x{src_height} → {scale_width}x{scale_height}")
    logger.info(f" Audio Streams: {len(streams)} streams processed")
    msg = f"📦 Original: {orig_size/1e6:.2f} MB → Encoded: {out_size/1e6:.2f} MB ({reduction_ratio:.1%} of original)"
    print(msg)
    return orig_size, out_size, reduction_ratio