# core/video_handler.py
"""Video resolution detection and encoding logic."""
|
|
|
|
import subprocess
|
|
from pathlib import Path
|
|
|
|
from core.logger_helper import setup_logger
|
|
|
|
# Module-level logger shared by every function in this file. The path handed to
# setup_logger is the "logs" directory one level above this file's own directory.
logger = setup_logger(Path(__file__).parent.parent / "logs")
|
|
|
|
|
|
def get_source_resolution(input_file: Path) -> tuple:
    """
    Get source video resolution (width, height).

    Probes every video stream with ffprobe and returns the dimensions of the
    first stream that is not flagged as an attached picture (cover art).

    Args:
        input_file: Path of the media file to probe.

    Returns:
        tuple: (width, height) as ints. If no regular video stream yields
        usable dimensions, the first video stream (v:0) is queried instead;
        any value that cannot be read there is replaced by its 1920 / 1080
        default. (1920, 1080) is returned when ffprobe produces no output or
        probing raises.
    """
    import json

    run_kwargs = dict(capture_output=True, text=True, encoding='utf-8', errors='ignore', check=False)

    try:
        # JSON keeps each stream's disposition flags nested under that stream.
        # The default text writer emits them as separate "DISPOSITION:<flag>=<n>"
        # lines, which cannot be matched with a simple "disposition=" prefix.
        cmd = [
            "ffprobe", "-v", "error",
            "-select_streams", "v",
            "-show_entries", "stream=width,height:stream_disposition=attached_pic",
            "-of", "json",
            str(input_file),
        ]
        result = subprocess.run(cmd, **run_kwargs)

        if result.stdout:
            try:
                payload = json.loads(result.stdout)
            except json.JSONDecodeError:
                logger.debug(f"Could not parse ffprobe JSON for {input_file.name}, trying fallback method")
                payload = {}

            streams = payload.get("streams", []) if isinstance(payload, dict) else []
            for stream in streams:
                if not isinstance(stream, dict):
                    continue

                disposition = stream.get("disposition")
                if isinstance(disposition, dict) and disposition.get("attached_pic") == 1:
                    # Cover art / thumbnail: not the real picture stream.
                    continue

                width = stream.get("width")
                height = stream.get("height")
                # Streams with missing or non-numeric dimensions are skipped
                # instead of aborting the whole probe.
                if isinstance(width, int) and isinstance(height, int) and width > 0 and height > 0:
                    return (width, height)

            # Fallback: no usable regular stream, so ask for v:0 directly.
            logger.debug("No non-attached-pic video stream found, trying fallback method")
            cmd = [
                "ffprobe", "-v", "error",
                "-select_streams", "v:0",
                "-show_entries", "stream=width,height",
                "-of", "default=noprint_wrappers=1:nokey=1",
                str(input_file),
            ]
            result = subprocess.run(cmd, **run_kwargs)
            if result.stdout:
                lines = result.stdout.strip().split("\n")
                # isdigit() guards against placeholders such as "N/A".
                first = lines[0].strip()
                second = lines[1].strip() if len(lines) > 1 else ""
                width = int(first) if first.isdigit() else 1920
                height = int(second) if second.isdigit() else 1080
                return (width, height)

        logger.warning(f"ffprobe returned no output for {input_file.name}. Defaulting to 1920x1080")
        return (1920, 1080)
    except Exception as e:
        # Best-effort probe: a missing ffprobe binary or any other failure
        # must not stop the caller, so log and use the default.
        logger.warning(f"Failed to detect source resolution: {e}. Defaulting to 1920x1080")
        return (1920, 1080)
|
|
|
|
|
|
def get_source_bit_depth(input_file: Path) -> int:
    """
    Detect source video bit depth (8, 10, or 12).

    Probes every video stream with ffprobe and classifies the pixel format of
    the first stream that is not flagged as an attached picture (cover art).

    Args:
        input_file: Path of the media file to probe.

    Returns:
        int: 12 or 10 when the pixel format name carries a matching marker,
        otherwise 8. 8 is also returned when no stream reports a pixel format
        or when probing fails.
    """
    import json

    # Substrings of ffprobe pix_fmt names that identify each depth.
    twelve_bit_markers = ("12le", "12be")
    ten_bit_markers = ("10le", "10be", "p010", "yuv420p10")

    try:
        # JSON keeps each stream's disposition flags nested under that stream.
        # The default text writer emits them as separate "DISPOSITION:<flag>=<n>"
        # lines, which cannot be matched with a simple "disposition=" prefix.
        cmd = [
            "ffprobe", "-v", "error",
            "-select_streams", "v",
            "-show_entries", "stream=pix_fmt:stream_disposition=attached_pic",
            "-of", "json",
            str(input_file),
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8', errors='ignore', check=False)

        streams = []
        if result.stdout:
            try:
                payload = json.loads(result.stdout)
            except json.JSONDecodeError:
                logger.debug(f"Could not parse ffprobe JSON for {input_file.name}")
                payload = {}
            if isinstance(payload, dict):
                streams = payload.get("streams", [])

        for stream in streams:
            if not isinstance(stream, dict):
                continue

            disposition = stream.get("disposition")
            if isinstance(disposition, dict) and disposition.get("attached_pic") == 1:
                # Cover art is usually an 8-bit still image; it must not
                # decide the bit depth of the real video.
                continue

            pix_fmt = stream.get("pix_fmt")
            if not pix_fmt:
                continue

            pix_fmt_lower = str(pix_fmt).lower()
            # 12-bit is tested first, mirroring the highest-depth-first order.
            if any(marker in pix_fmt_lower for marker in twelve_bit_markers):
                return 12
            if any(marker in pix_fmt_lower for marker in ten_bit_markers):
                return 10
            return 8

        logger.debug(f"Could not detect bit depth for {input_file.name}. Defaulting to 8-bit")
        return 8
    except Exception as e:
        # Best-effort probe: a missing ffprobe binary or any other failure
        # must not stop the caller, so log and use the default.
        logger.warning(f"Failed to detect source bit depth: {e}. Defaulting to 8-bit")
        return 8
|
|
|
|
|
|
def determine_target_resolution(src_width: int, src_height: int, explicit_resolution: str = None) -> tuple:
    """
    Pick the output resolution for a source, optionally capped by the caller.

    Returns tuple: (res_width, res_height, target_resolution_label)

    Rules:
      * The cap is explicit_resolution (parsed with int()) when given,
        otherwise 1080. It is a ceiling only: sources are never upscaled.
      * Source taller than the cap: a fixed preset is returned.
        1080 gives 1920x1080, 720 gives 1280x720, and any other explicit
        value gives 854x480.
      * Source at or below the cap: the source dimensions are returned
        unchanged, labelled "720" when src_height <= 720 and "1080" otherwise.
    """
    # Fixed output sizes used when the source has to be shrunk.
    downscale_presets = {
        1080: (1920, 1080, "1080"),
        720: (1280, 720, "720"),
    }

    if explicit_resolution:
        ceiling = int(explicit_resolution)
        if src_height > ceiling:
            # Anything other than 1080 / 720 is treated as the 480 preset.
            return downscale_presets.get(ceiling, (854, 480, "480"))
    elif src_height > 1080:
        return (1920, 1080, "1080")

    # Source already fits: keep its size and attach the matching label.
    label = "720" if src_height <= 720 else "1080"
    return (src_width, src_height, label)
|
|
|
|
|
|
def has_forced_subtitles(input_file: Path) -> bool:
    """
    Report whether any subtitle stream of the input file is marked as forced.

    Two ffprobe passes are made. The first reads the subtitle streams'
    disposition as JSON and looks for forced == 1. The second runs ffprobe at
    info verbosity and searches its stderr text for "(forced)".

    Returns True as soon as either pass finds a forced stream, and False when
    neither does or when probing raises.
    """
    try:
        import json

        # Options shared by both ffprobe invocations.
        run_options = dict(capture_output=True, text=True, encoding='utf-8', errors='ignore', check=False)

        # Pass 1: structured JSON output.
        # NOTE(review): confirm that "stream=disposition" makes ffprobe emit the
        # nested disposition object; pass 2 covers the case where it does not.
        json_probe = subprocess.run(
            [
                "ffprobe", "-v", "error",
                "-select_streams", "s",
                "-show_entries", "stream=disposition",
                "-of", "json",
                str(input_file),
            ],
            **run_options,
        )
        if json_probe.stdout:
            try:
                parsed = json.loads(json_probe.stdout)
                for entry in parsed.get("streams", []):
                    flags = entry.get("disposition", {})
                    if not isinstance(flags, dict) or flags.get("forced") != 1:
                        continue
                    logger.debug(f"Found forced subtitle stream in {input_file.name}")
                    return True
            except json.JSONDecodeError:
                logger.debug(f"Failed to parse JSON from ffprobe for {input_file.name}, trying fallback method")

        # Pass 2: scan the human-readable stream listing that ffprobe writes
        # to stderr.
        text_probe = subprocess.run(
            [
                "ffprobe", "-v", "info",
                "-select_streams", "s",
                str(input_file),
            ],
            **run_options,
        )
        listing = text_probe.stderr
        if listing and "(forced)" in listing:
            logger.debug(f"Found (forced) in ffprobe output for {input_file.name}")
            return True

        return False
    except Exception as e:
        logger.warning(f"Failed to check forced subtitles for {input_file.name}: {e}")
        return False
|