# core/ffmpeg_helper.py
import json
import os
import subprocess
import tempfile
from pathlib import Path
from typing import Tuple

from core.logger_helper import setup_logger

logger = setup_logger(Path(__file__).parent.parent / "logs")


# =============================
# ROBUST BITRATE CALCULATION
# =============================
def calculate_stream_bitrate(input_file: Path, stream_index: int) -> int:
    """
    Estimate the bitrate of one audio stream from its extracted size.

    The stream is copied (``-c copy``) into a temporary file, the file's
    duration is read with ffprobe, and the bitrate is derived as:

        bitrate_kbps = (file_size_bytes * 8) / duration_seconds / 1000

    Args:
        input_file: Media file to read the audio stream from.
        stream_index: Position of the stream among the file's audio
            streams (used as ``0:a:<stream_index>``).

    Returns:
        The calculated bitrate in kbps, or 0 if any step fails (extraction
        error, unparsable or zero duration, ...). Callers treat 0 as
        "fall back to metadata".
    """
    # NOTE(review): the ".aac" suffix makes ffmpeg pick its muxer from the
    # extension; stream-copying other codecs into it presumably fails and
    # ends up in the fallback path below -- confirm this is intended.
    temp_fd, temp_audio_path = tempfile.mkstemp(suffix=".aac", dir=None)
    # Only the path is needed; ffmpeg reopens the file itself.
    os.close(temp_fd)

    try:
        # Step 1: Extract audio stream with -c copy (lossless extraction)
        extract_cmd = [
            "ffmpeg", "-y", "-i", str(input_file),
            "-map", f"0:a:{stream_index}",
            "-c", "copy",
            temp_audio_path,
        ]
        logger.debug(f"Extracting audio stream {stream_index} to temporary file...")
        subprocess.run(extract_cmd, capture_output=True, text=True, check=True)

        # Step 2: Get duration using ffprobe (bare value, no key/wrappers)
        duration_cmd = [
            "ffprobe", "-v", "error",
            "-show_entries", "format=duration",
            "-of", "default=noprint_wrappers=1:nokey=1",
            temp_audio_path,
        ]
        duration_result = subprocess.run(
            duration_cmd, capture_output=True, text=True, check=True
        )
        duration_seconds = float(duration_result.stdout.strip())

        # Step 3: Get file size and calculate bitrate.
        # A zero duration raises ZeroDivisionError, handled below.
        file_size_bytes = os.path.getsize(temp_audio_path)
        bitrate_kbps = int((file_size_bytes * 8) / duration_seconds / 1000)

        logger.debug(f"Stream {stream_index}: size={file_size_bytes} bytes, duration={duration_seconds:.2f}s, calculated_bitrate={bitrate_kbps} kbps")
        return bitrate_kbps

    except Exception as e:
        # Deliberate best-effort: any failure degrades to the metadata path.
        logger.warning(f"Failed to calculate bitrate for stream {stream_index}: {e}. Falling back to metadata.")
        return 0

    finally:
        # Clean up temporary audio file
        try:
            if os.path.exists(temp_audio_path):
                os.remove(temp_audio_path)
                logger.debug(f"Deleted temporary audio file: {temp_audio_path}")
        except Exception as e:
            logger.warning(f"Could not delete temporary file {temp_audio_path}: {e}")


def get_audio_streams(input_file: Path):
    """
    Return a list of (index, channels, bitrate_kbps, lang) per audio stream.

    Each stream's bitrate is computed by ``calculate_stream_bitrate``; when
    that returns 0 the ``bit_rate`` reported by ffprobe is used instead
    (defaulting to 128 kbps when it is missing or not numeric).

    Args:
        input_file: Media file to inspect.

    Returns:
        A list of tuples ``(index, channels, bitrate_kbps, lang)`` where
        ``index`` is the stream index reported by ffprobe, ``channels``
        defaults to 2 and ``lang`` defaults to "und". Empty when ffprobe
        reports no audio streams or produces no output.
    """
    cmd = [
        "ffprobe", "-v", "error",
        "-select_streams", "a",
        "-show_entries", "stream=index,channels,bit_rate,tags=language",
        "-of", "json", str(input_file),
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    data = json.loads(result.stdout or "{}")

    streams = []
    # `i` is the position among audio streams (for "0:a:i"), while `index`
    # is the stream index ffprobe reports for the whole file.
    for i, s in enumerate(data.get("streams", [])):
        index = s["index"]
        channels = s.get("channels", 2)
        lang = s.get("tags", {}).get("language", "und")

        # Calculate robust bitrate from extraction
        calculated_bitrate = calculate_stream_bitrate(input_file, i)

        # Fallback to metadata if calculation fails
        if calculated_bitrate == 0:
            try:
                bitrate = int(int(s.get("bit_rate", 128000)) / 1000)
            except (TypeError, ValueError):
                # Non-numeric metadata value: use the same 128 kbps default.
                bitrate = 128
            logger.info(f"Stream {index}: Using metadata bitrate {bitrate} kbps (calculation failed)")
        else:
            bitrate = calculated_bitrate
            logger.info(f"Stream {index}: Using calculated bitrate {bitrate} kbps")

        streams.append((index, channels, bitrate, lang))
    return streams


# =============================
# AUDIO DECISION LOGIC
# =============================
def choose_audio_settings(channels: int, bitrate_kbps: int, audio_config: dict) -> Tuple[str, int]:
    """
    Return (codec, target_bitrate) for one audio stream.

    Rules:
    - The source bitrate selects a bucket from audio_config:
      stereo -> "low" (<= 80 kbps), "medium" (<= 112 kbps), else "high";
      otherwise -> multi_channel "low" (<= 176 kbps), else "high".
    - If the bucket's target is at or below the Opus threshold
      (audio_config["use_opus_below_kbps"], default 128, times 1000)
      -> use Opus ("libopus"); otherwise -> use AAC ("aac").

    Args:
        channels: Channel count of the source stream.
        bitrate_kbps: Source bitrate in kbps.
        audio_config: Mapping with "stereo" and "multi_channel" buckets and
            an optional "use_opus_below_kbps" entry.

    Returns:
        Tuple of the ffmpeg encoder name and the bucket's target bitrate
        exactly as stored in audio_config.
    """
    # NOTE(review): any channel count other than 2 -- including mono --
    # uses the multi_channel buckets; confirm that is intended.
    if channels == 2:
        if bitrate_kbps <= 80:
            target_br = audio_config["stereo"]["low"]
        elif bitrate_kbps <= 112:
            target_br = audio_config["stereo"]["medium"]
        else:
            target_br = audio_config["stereo"]["high"]
    else:
        if bitrate_kbps <= 176:
            target_br = audio_config["multi_channel"]["low"]
        else:
            target_br = audio_config["multi_channel"]["high"]

    # Opus threshold: <=128 kbps (threshold is scaled by 1000 before the
    # comparison with the configured target).
    threshold = audio_config.get("use_opus_below_kbps", 128)
    codec = "libopus" if target_br <= threshold * 1000 else "aac"
    return codec, target_br


# =============================
# FFMPEG COMMAND BUILDER
# =============================
def _audio_stream_args(streams, audio_config: dict) -> list:
    """Return the per-audio-stream codec/bitrate/channel/language options."""
    args = []
    for i, (_index, channels, bitrate, lang) in enumerate(streams):
        codec, br = choose_audio_settings(channels, bitrate, audio_config)
        args += [
            f"-c:a:{i}", codec,
            f"-b:a:{i}", str(br),
            # "a:" is required: a bare "-ac:{i}" addresses output stream #i
            # by absolute index (stream 0 being the video), not audio #i.
            f"-ac:a:{i}", str(channels),
            f"-metadata:s:a:{i}", f"language={lang}",
        ]
    return args


def _assemble_command(input_file: Path, output_file: Path, width: int, height: int,
                      filter_flags: str, rate_control_args: list, streams,
                      audio_config: dict) -> list:
    """Return the full ffmpeg argv shared by the CQ and bitrate-mode encodes."""
    return [
        "ffmpeg", "-y", "-i", str(input_file),
        "-vf", f"scale={width}:{height}:flags={filter_flags}:force_original_aspect_ratio=decrease",
        # Audio and subtitle maps are optional ("?") so inputs lacking them
        # do not abort the encode.
        "-map", "0:v", "-map", "0:a?", "-map", "0:s?",
        "-c:v", "av1_nvenc", "-preset", "p1",
        *rate_control_args,
        "-pix_fmt", "p010le",
        *_audio_stream_args(streams, audio_config),
        "-c:s", "copy", str(output_file),
    ]


def build_ffmpeg_command(input_file: Path, output_file: Path, cq: int, width: int, height: int,
                         filter_flags: str, audio_config: dict):
    """
    Build the constant-quality FFmpeg command with smart audio logic.

    Args:
        input_file: Source media file.
        output_file: Destination file.
        cq: Value passed to the encoder's ``-cq`` option.
        width: Target width for the scale filter.
        height: Target height for the scale filter.
        filter_flags: Value for the scale filter's ``flags`` option.
        audio_config: Bucket configuration for ``choose_audio_settings``.

    Returns:
        Tuple ``(cmd, streams)``: the ffmpeg argument list and the audio
        stream tuples from ``get_audio_streams`` it was built from.
    """
    streams = get_audio_streams(input_file)
    logger.info(f"🎛 Detected {len(streams)} audio stream(s). Building command...")

    cmd = _assemble_command(
        input_file, output_file, width, height, filter_flags,
        ["-cq", str(cq)], streams, audio_config,
    )
    return cmd, streams


# =============================
# ENCODE RUNNER
# =============================
def run_encode(input_file: Path, output_file: Path, cq: int, width: int, height: int,
               filter_flags: str, audio_config: dict):
    """
    Run the encode, retry in bitrate mode if needed, and return size stats.

    A constant-quality encode runs first. If the output is at least 50% of
    the input size, it is deleted and the encode is repeated with fixed
    video bitrate limits chosen by ``height``.

    Args:
        input_file: Source media file.
        output_file: Destination file.
        cq: Value passed to the encoder's ``-cq`` option.
        width: Target width for the scale filter.
        height: Target height for the scale filter; also selects the
            fallback bitrate tier (>= 1080 uses the higher tier).
        filter_flags: Value for the scale filter's ``flags`` option.
        audio_config: Bucket configuration for ``choose_audio_settings``.

    Returns:
        Tuple ``(orig_size, out_size)`` in bytes, where ``out_size`` is the
        size of the file left on disk (the retry's output when a retry ran).

    Raises:
        subprocess.CalledProcessError: If an ffmpeg encode exits non-zero.
    """
    cmd, streams = build_ffmpeg_command(
        input_file, output_file, cq, width, height, filter_flags, audio_config
    )

    logger.info(f"🎬 Running FFmpeg CQ encode → {output_file.name}")
    subprocess.run(cmd, check=True)

    # Size check
    orig_size = input_file.stat().st_size
    out_size = output_file.stat().st_size
    ratio = out_size / orig_size
    logger.info(f"📦 Size: {orig_size/1e6:.2f}MB → {out_size/1e6:.2f}MB ({ratio:.1%})")

    # Fallback logic
    if ratio >= 0.5:
        logger.warning(f"⚠️ Reduction too low ({ratio:.0%}), retrying with bitrate mode...")
        output_file.unlink(missing_ok=True)

        vb, maxrate, bufsize = (
            ("1500k", "1750k", "2250k") if height >= 1080 else ("900k", "1250k", "1600k")
        )
        # Reuse the audio streams probed for the first attempt.
        cmd = _assemble_command(
            input_file, output_file, width, height, filter_flags,
            ["-b:v", vb, "-maxrate", maxrate, "-bufsize", bufsize],
            streams, audio_config,
        )
        subprocess.run(cmd, check=True)

        # Re-measure: the first attempt's file was deleted above, so its
        # size no longer describes what is on disk.
        out_size = output_file.stat().st_size
        logger.info(f"📦 Size after bitrate-mode retry: {orig_size/1e6:.2f}MB → {out_size/1e6:.2f}MB ({out_size / orig_size:.1%})")

    return orig_size, out_size