2026-04-22 21:52:09 -04:00

308 lines
10 KiB
Python

"""Core audio extraction logic using ffmpeg"""
import json
import shutil
import subprocess
from pathlib import Path
from typing import Any, Dict, List, Optional
class AudioExtractor:
    """Extract audio tracks from, and add audio tracks to, video files via ffmpeg.

    Creating an instance verifies that the ``ffmpeg`` executable can be run.
    Stream inspection additionally relies on ``ffprobe``.
    """

    # Common video file extensions (lowercase, including the leading dot)
    VIDEO_EXTENSIONS = {
        ".mp4", ".mkv", ".mov", ".avi", ".flv", ".wmv", ".webm",
        ".m4v", ".mpg", ".mpeg", ".3gp", ".ts", ".m2ts", ".mts",
    }

    # Audio file extensions for finding audio files (lowercase, with the dot).
    # ".mka" is included because it is the fallback container used when a
    # codec has no dedicated extension (see _get_audio_extension).
    AUDIO_EXTENSIONS = {
        ".aac", ".mp3", ".flac", ".opus", ".ogg", ".ac3", ".ec3",
        ".dts", ".thd", ".m4a", ".wav", ".wma", ".ape", ".alac", ".mka",
    }

    # Matroska audio accepts almost any codec, so a stream copy into it works
    # where a codec-specific container (e.g. ADTS ".aac") would be rejected.
    _FALLBACK_AUDIO_EXTENSION = "mka"

    def __init__(self):
        self._verify_ffmpeg_installed()

    def _verify_ffmpeg_installed(self) -> None:
        """
        Verify that ffmpeg is installed and accessible.

        Raises:
            RuntimeError: If ``ffmpeg -version`` cannot be executed or fails
        """
        try:
            subprocess.run(
                ["ffmpeg", "-version"],
                capture_output=True,
                check=True,
            )
        except (subprocess.CalledProcessError, OSError) as e:
            raise RuntimeError(
                "ffmpeg is not installed or not found in PATH. "
                "Please install ffmpeg to use this tool."
            ) from e

    @staticmethod
    def _stderr_text(error: subprocess.CalledProcessError) -> str:
        """Return the captured stderr of a failed process as text."""
        stderr = error.stderr
        if not stderr:
            return "Unknown error"
        if isinstance(stderr, bytes):
            # Tool output is not guaranteed to be valid UTF-8 (e.g. file
            # names in a legacy encoding); never let decoding mask the error.
            return stderr.decode(errors="replace")
        return stderr

    @staticmethod
    def _find_files_by_extension(folder: Path, extensions: set) -> List[Path]:
        """Return the sorted regular files in folder whose suffix, compared
        case-insensitively, is one of extensions."""
        if not folder.is_dir():
            return []
        return sorted(
            entry for entry in folder.iterdir()
            if entry.is_file() and entry.suffix.lower() in extensions
        )

    def find_video_files(self, folder: Path) -> List[Path]:
        """
        Find all video files in a folder (non-recursive).

        Extensions are matched case-insensitively, so ``.mp4``, ``.MP4`` and
        ``.Mp4`` are all found. Directories are never returned.

        Args:
            folder: Path to folder to search

        Returns:
            Sorted list of video file paths; empty if the folder does not exist
        """
        return self._find_files_by_extension(folder, self.VIDEO_EXTENSIONS)

    def copy_file(self, source: Path, destination: Path) -> None:
        """
        Copy a file from source to destination with verification.

        Args:
            source: Source file path
            destination: Destination file path

        Raises:
            RuntimeError: If the destination is missing after the copy or its
                size differs from the source
        """
        shutil.copy2(source, destination)
        # Verify the copy completed
        if not destination.exists():
            raise RuntimeError(f"File copy failed: {destination} was not created")
        source_size = source.stat().st_size
        dest_size = destination.stat().st_size
        if source_size != dest_size:
            raise RuntimeError(
                f"File copy incomplete: source {source_size} bytes != destination {dest_size} bytes"
            )

    def get_stream_info(self, video_file: Path) -> Dict[str, Any]:
        """
        Get stream information from video file using ffprobe.

        Args:
            video_file: Path to video file

        Returns:
            Dictionary parsed from ffprobe's JSON output (index, codec_type
            and codec_name are requested for every stream)

        Raises:
            RuntimeError: If ffprobe cannot be run, exits with an error, or
                prints output that is not valid JSON
        """
        try:
            result = subprocess.run(
                [
                    "ffprobe", "-v", "error",
                    "-show_entries", "stream=index,codec_type,codec_name",
                    "-of", "json",
                    str(video_file),
                ],
                capture_output=True,
                text=True,
                check=True,
            )
            return json.loads(result.stdout)
        except subprocess.CalledProcessError as e:
            raise RuntimeError(f"Failed to get stream info: {e.stderr}") from e
        except json.JSONDecodeError as e:
            raise RuntimeError("Failed to parse ffprobe output") from e
        except OSError as e:
            # ffprobe missing or not executable: report it like every other
            # failure of this method instead of leaking a raw OSError.
            raise RuntimeError(f"Failed to run ffprobe: {e}") from e

    def extract_audio_from_file(self, video_file: Path, output_folder: Path) -> None:
        """
        Extract all audio tracks from a video file.

        A file with a single audio track produces ``<stem>.<ext>``; a file
        with several produces ``<stem>_audio_<n>.<ext>`` for each track.

        Args:
            video_file: Path to video file
            output_folder: Path to output folder (created if missing)

        Raises:
            FileNotFoundError: If the video file does not exist
            RuntimeError: If the file cannot be analyzed or extraction fails
        """
        if not video_file.exists():
            raise FileNotFoundError(f"Video file not found: {video_file}")

        # Get stream information
        try:
            stream_info = self.get_stream_info(video_file)
        except RuntimeError as e:
            raise RuntimeError(f"Could not analyze {video_file.name}: {e}") from e

        # Find audio streams
        audio_streams = [
            stream for stream in stream_info.get("streams", [])
            if stream.get("codec_type") == "audio"
        ]
        if not audio_streams:
            print(f" No audio streams found in {video_file.name}")
            return

        # ffmpeg does not create missing parent directories itself
        output_folder.mkdir(parents=True, exist_ok=True)

        # Extract each audio stream
        file_stem = video_file.stem
        multiple_tracks = len(audio_streams) > 1
        for audio_index, stream in enumerate(audio_streams):
            # A missing codec name is treated like an unknown codec
            output_ext = self._get_audio_extension(stream.get("codec_name", ""))
            # Handle multiple audio tracks
            if multiple_tracks:
                output_filename = f"{file_stem}_audio_{audio_index}.{output_ext}"
            else:
                output_filename = f"{file_stem}.{output_ext}"
            self._extract_stream(video_file, output_folder / output_filename, audio_index)

    def _get_audio_extension(self, codec_name: str) -> str:
        """
        Get file extension based on audio codec.

        Args:
            codec_name: FFmpeg codec name

        Returns:
            File extension (without dot). Codecs without a dedicated entry
            map to "mka", because a stream copy of an arbitrary codec into a
            codec-specific container such as ADTS (".aac") would fail.
        """
        extension_map = {
            "aac": "aac",
            "mp3": "mp3",
            "libmp3lame": "mp3",
            "flac": "flac",
            "opus": "opus",
            "vorbis": "ogg",
            "libvorbis": "ogg",
            "ac3": "ac3",
            "eac3": "ec3",
            "dts": "dts",
            "truehd": "thd",
            "alac": "m4a",
            "pcm_s16le": "wav",
            "pcm_s24le": "wav",
            "pcm_s32le": "wav",
        }
        return extension_map.get(codec_name, AudioExtractor._FALLBACK_AUDIO_EXTENSION)

    def _extract_stream(self, video_file: Path, output_path: Path, stream_index: int) -> None:
        """
        Extract a single audio stream using ffmpeg.

        Args:
            video_file: Path to input video file
            output_path: Path to output audio file
            stream_index: Index of the audio stream to extract (0, 1, 2... for audio streams only)

        Raises:
            RuntimeError: If ffmpeg exits with an error
        """
        # Use ffmpeg to copy the audio codec without re-encoding
        # This preserves the original bitrate and codec
        cmd = [
            "ffmpeg", "-i", str(video_file),
            "-map", f"0:a:{stream_index}",
            "-c", "copy",  # Copy codec without re-encoding
            "-y",  # Overwrite output file
            str(output_path),
        ]
        try:
            subprocess.run(cmd, capture_output=True, check=True)
        except subprocess.CalledProcessError as e:
            raise RuntimeError(
                f"Failed to extract audio stream {stream_index}: {self._stderr_text(e)}"
            ) from e
        print(f" ✓ Extracted: {output_path.name}")

    def find_audio_files(self, folder: Path) -> List[Path]:
        """
        Find all audio files in a folder (non-recursive).

        Extensions are matched case-insensitively. Directories are never
        returned.

        Args:
            folder: Path to folder to search

        Returns:
            Sorted list of audio file paths; empty if the folder does not exist
        """
        return self._find_files_by_extension(folder, self.AUDIO_EXTENSIONS)

    def find_matching_video(self, audio_file: Path, video_folder: Path) -> Path:
        """
        Find a video file matching the audio file's base name.

        Args:
            audio_file: Path to audio file
            video_folder: Path to folder containing video files

        Returns:
            Path to matching video file. When several videos share the base
            name, the one whose extension sorts first is returned.

        Raises:
            FileNotFoundError: If no matching video file is found
        """
        audio_stem = audio_file.stem

        # Probe the common spellings directly. Extensions are visited in
        # sorted order so the result is deterministic (set order is not).
        for video_ext in sorted(self.VIDEO_EXTENSIONS):
            for suffix in (video_ext, video_ext.upper()):
                video_path = video_folder / f"{audio_stem}{suffix}"
                if video_path.exists():
                    return video_path

        # Fall back to a scan so mixed-case extensions (".Mp4") also match
        for video_path in self.find_video_files(video_folder):
            if video_path.stem == audio_stem:
                return video_path

        raise FileNotFoundError(
            f"No matching video found for audio file: {audio_file.name}"
        )

    def add_audio_to_video(self, video_file: Path, audio_file: Path, output_file: Path,
                           track_title: Optional[str] = None) -> None:
        """
        Add an audio track to a video file.

        All streams of the video are kept and the audio streams of
        audio_file are appended; video and audio are copied, not re-encoded.

        Args:
            video_file: Path to input video file
            audio_file: Path to audio file to add
            output_file: Path to output video file (overwritten if present)
            track_title: Title/name for the audio track (optional)

        Raises:
            FileNotFoundError: If the video or audio file does not exist
            RuntimeError: If ffmpeg fails or cannot be run
        """
        if not video_file.exists():
            raise FileNotFoundError(f"Video file not found: {video_file}")
        if not audio_file.exists():
            raise FileNotFoundError(f"Audio file not found: {audio_file}")

        # Determine the index of the newly added audio stream by counting existing streams
        try:
            stream_info = self.get_stream_info(video_file)
            new_audio_index = sum(
                1 for s in stream_info.get("streams", [])
                if s.get("codec_type") == "audio"
            )
        except Exception:
            # Best effort: if we can't get stream info, assume it's the
            # first audio (index 0). Unlike a bare except, this does not
            # swallow KeyboardInterrupt or SystemExit.
            new_audio_index = 0

        # Build ffmpeg command to add audio track
        cmd = [
            "ffmpeg", "-i", str(video_file),
            "-i", str(audio_file),
            "-c:v", "copy",  # Copy video codec
            "-c:a", "copy",  # Copy audio codec
            "-map", "0",  # Include all streams from video
            "-map", "1:a",  # Add audio from audio file
            "-y",  # Overwrite output
        ]
        # Add metadata for track title if provided
        # Only apply to the newly added audio stream
        if track_title:
            cmd.extend([
                f"-metadata:s:a:{new_audio_index}", f"title={track_title}"
            ])
        cmd.append(str(output_file))

        try:
            subprocess.run(cmd, capture_output=True, check=True)
        except subprocess.CalledProcessError as e:
            raise RuntimeError(
                f"Failed to add audio track: {self._stderr_text(e)}"
            ) from e
        except Exception as e:
            raise RuntimeError(f"Error during audio addition: {e}") from e
        print(f" ✓ Added audio track from: {audio_file.name}")