2026-04-22 21:01:43 -04:00

181 lines
5.8 KiB
Python

"""Core audio extraction logic using ffmpeg"""
import subprocess
import json
from pathlib import Path
from typing import List, Dict, Any
class AudioExtractor:
"""Handles audio extraction from video files using ffmpeg"""
# Common video file extensions
VIDEO_EXTENSIONS = {
".mp4", ".mkv", ".mov", ".avi", ".flv", ".wmv", ".webm",
".m4v", ".mpg", ".mpeg", ".3gp", ".ts", ".m2ts", ".mts"
}
def __init__(self):
self._verify_ffmpeg_installed()
def _verify_ffmpeg_installed(self) -> None:
"""Verify that ffmpeg is installed and accessible"""
try:
subprocess.run(
["ffmpeg", "-version"],
capture_output=True,
check=True
)
except (subprocess.CalledProcessError, FileNotFoundError):
raise RuntimeError(
"ffmpeg is not installed or not found in PATH. "
"Please install ffmpeg to use this tool."
)
def find_video_files(self, folder: Path) -> List[Path]:
"""
Find all video files in a folder.
Args:
folder: Path to folder to search
Returns:
List of video file paths
"""
video_files = []
for ext in self.VIDEO_EXTENSIONS:
video_files.extend(folder.glob(f"*{ext}"))
video_files.extend(folder.glob(f"*{ext.upper()}"))
return sorted(set(video_files)) # Remove duplicates and sort
def get_stream_info(self, video_file: Path) -> Dict[str, Any]:
"""
Get stream information from video file using ffprobe.
Args:
video_file: Path to video file
Returns:
Dictionary containing stream information
"""
try:
result = subprocess.run(
[
"ffprobe", "-v", "error",
"-show_entries", "stream=index,codec_type,codec_name",
"-of", "json",
str(video_file)
],
capture_output=True,
text=True,
check=True
)
return json.loads(result.stdout)
except subprocess.CalledProcessError as e:
raise RuntimeError(f"Failed to get stream info: {e.stderr}")
except json.JSONDecodeError:
raise RuntimeError("Failed to parse ffprobe output")
def extract_audio_from_file(self, video_file: Path, output_folder: Path) -> None:
"""
Extract all audio tracks from a video file.
Args:
video_file: Path to video file
output_folder: Path to output folder
"""
if not video_file.exists():
raise FileNotFoundError(f"Video file not found: {video_file}")
# Get stream information
try:
stream_info = self.get_stream_info(video_file)
except RuntimeError as e:
raise RuntimeError(f"Could not analyze {video_file.name}: {e}")
# Find audio streams
audio_streams = [
stream for stream in stream_info.get("streams", [])
if stream.get("codec_type") == "audio"
]
if not audio_streams:
print(f" No audio streams found in {video_file.name}")
return
# Extract each audio stream
file_stem = video_file.stem
for stream in audio_streams:
stream_index = stream.get("index")
codec_name = stream.get("codec_name", "aac")
# Determine output file extension based on codec
output_ext = self._get_audio_extension(codec_name)
# Handle multiple audio tracks
if len(audio_streams) > 1:
output_filename = f"{file_stem}_audio_{stream_index}.{output_ext}"
else:
output_filename = f"{file_stem}.{output_ext}"
output_path = output_folder / output_filename
self._extract_stream(video_file, output_path, stream_index)
def _get_audio_extension(self, codec_name: str) -> str:
"""
Get file extension based on audio codec.
Args:
codec_name: FFmpeg codec name
Returns:
File extension (without dot)
"""
extension_map = {
"aac": "aac",
"mp3": "mp3",
"libmp3lame": "mp3",
"flac": "flac",
"opus": "opus",
"vorbis": "ogg",
"libvorbis": "ogg",
"ac3": "ac3",
"eac3": "ec3",
"dts": "dts",
"truehd": "thd",
"alac": "m4a",
"pcm_s16le": "wav",
"pcm_s24le": "wav",
"pcm_s32le": "wav",
}
return extension_map.get(codec_name, "aac")
def _extract_stream(self, video_file: Path, output_path: Path, stream_index: int) -> None:
"""
Extract a single audio stream using ffmpeg.
Args:
video_file: Path to input video file
output_path: Path to output audio file
stream_index: Index of the audio stream to extract
"""
try:
# Use ffmpeg to copy the audio codec without re-encoding
# This preserves the original bitrate and codec
cmd = [
"ffmpeg", "-i", str(video_file),
"-map", f"0:a:{stream_index}",
"-c", "copy", # Copy codec without re-encoding
"-y", # Overwrite output file
str(output_path)
]
subprocess.run(cmd, capture_output=True, check=True)
print(f" ✓ Extracted: {output_path.name}")
except subprocess.CalledProcessError as e:
raise RuntimeError(
f"Failed to extract audio stream {stream_index}: {e.stderr.decode() if e.stderr else 'Unknown error'}"
)