# core/ffmpeg_helper.py import json import subprocess from pathlib import Path from typing import Tuple from core.logger_helper import setup_logger logger = setup_logger(Path(__file__).parent.parent / "logs") # ============================= # STREAM ANALYSIS # ============================= def get_audio_streams(input_file: Path): """Return a list of (index, channels, bitrate_kbps, lang)""" cmd = [ "ffprobe", "-v", "error", "-select_streams", "a", "-show_entries", "stream=index,channels,bit_rate,tags=language", "-of", "json", str(input_file) ] result = subprocess.run(cmd, capture_output=True, text=True) data = json.loads(result.stdout or "{}") streams = [] for s in data.get("streams", []): index = s["index"] channels = s.get("channels", 2) bitrate = int(int(s.get("bit_rate", 128000)) / 1000) lang = s.get("tags", {}).get("language", "und") streams.append((index, channels, bitrate, lang)) return streams # ============================= # AUDIO DECISION LOGIC # ============================= def choose_audio_settings(channels: int, bitrate_kbps: int, audio_config: dict) -> Tuple[str, int]: """ Return (codec, target_bitrate) Rules: - If 128 kbps or lower → use Opus - Otherwise → use AAC - Use audio_config to bucket bitrates. """ if channels == 2: if bitrate_kbps <= 80: target_br = audio_config["stereo"]["low"] elif bitrate_kbps <= 112: target_br = audio_config["stereo"]["medium"] else: target_br = audio_config["stereo"]["high"] else: if bitrate_kbps <= 176: target_br = audio_config["multi_channel"]["low"] else: target_br = audio_config["multi_channel"]["high"] # Opus threshold: <=128 kbps threshold = audio_config.get("use_opus_below_kbps", 128) codec = "libopus" if target_br <= threshold * 1000 else "aac" return codec, target_br # ============================= # FFMPEG COMMAND BUILDER # ============================= def build_ffmpeg_command(input_file: Path, output_file: Path, cq: int, width: int, height: int, filter_flags: str, audio_config: dict): """Builds FFmpeg command with smart audio logic.""" streams = get_audio_streams(input_file) logger.info(f"🎛 Detected {len(streams)} audio stream(s). Building command...") cmd = [ "ffmpeg", "-y", "-i", str(input_file), "-vf", f"scale={width}:{height}:flags={filter_flags}:force_original_aspect_ratio=decrease", "-map", "0:v", "-map", "0:a", "-map", "0:s?", "-c:v", "av1_nvenc", "-preset", "p1", "-cq", str(cq), "-pix_fmt", "p010le" ] for i, (index, channels, bitrate, lang) in enumerate(streams): codec, br = choose_audio_settings(channels, bitrate, audio_config) cmd += [ f"-c:a:{i}", codec, f"-b:a:{i}", str(br), f"-ac:{i}", str(channels), f"-metadata:s:a:{i}", f"language={lang}" ] cmd += ["-c:s", "copy", str(output_file)] return cmd, streams # ============================= # ENCODE RUNNER # ============================= def run_encode(input_file: Path, output_file: Path, cq: int, width: int, height: int, filter_flags: str, audio_config: dict): """Handles encode, fallback logic, and returns size stats.""" cmd, streams = build_ffmpeg_command(input_file, output_file, cq, width, height, filter_flags, audio_config) logger.info(f"🎬 Running FFmpeg CQ encode → {output_file.name}") subprocess.run(cmd, check=True) # Size check orig_size = input_file.stat().st_size out_size = output_file.stat().st_size ratio = out_size / orig_size logger.info(f"📦 Size: {orig_size/1e6:.2f}MB → {out_size/1e6:.2f}MB ({ratio:.1%})") # Fallback logic if ratio >= 0.5: logger.warning(f"⚠️ Reduction too low ({ratio:.0%}), retrying with bitrate mode...") output_file.unlink(missing_ok=True) vb, maxrate, bufsize = ( ("1500k", "1750k", "2250k") if height >= 1080 else ("900k", "1250k", "1600k") ) cmd = [ "ffmpeg", "-y", "-i", str(input_file), "-vf", f"scale={width}:{height}:flags={filter_flags}:force_original_aspect_ratio=decrease", "-map", "0:v", "-map", "0:a", "-map", "0:s?", "-c:v", "av1_nvenc", "-preset", "p1", "-b:v", vb, "-maxrate", maxrate, "-bufsize", bufsize, "-pix_fmt", "p010le" ] for i, (index, channels, bitrate, lang) in enumerate(streams): codec, br = choose_audio_settings(channels, bitrate, audio_config) cmd += [ f"-c:a:{i}", codec, f"-b:a:{i}", str(br), f"-ac:{i}", str(channels), f"-metadata:s:a:{i}", f"language={lang}" ] cmd += ["-c:s", "copy", str(output_file)] subprocess.run(cmd, check=True) return orig_size, out_size