308 lines
10 KiB
Python
308 lines
10 KiB
Python
"""Core audio extraction logic using ffmpeg"""
|
|
|
|
import json
import shutil
import subprocess
from pathlib import Path
from typing import Any, Dict, List, Optional
|
|
|
|
|
|
class AudioExtractor:
    """Handles audio extraction from video files using ffmpeg.

    The class shells out to the ``ffmpeg`` and ``ffprobe`` command-line tools.
    ``ffmpeg`` must be on PATH when an instance is created; ``ffprobe`` is
    only needed by the methods that inspect streams.
    """

    # Common video file extensions (lower-case, including the leading dot).
    VIDEO_EXTENSIONS = {
        ".mp4", ".mkv", ".mov", ".avi", ".flv", ".wmv", ".webm",
        ".m4v", ".mpg", ".mpeg", ".3gp", ".ts", ".m2ts", ".mts",
    }

    # Audio file extensions for finding audio files. ".mka" is listed because
    # it is the fallback container chosen by _get_audio_extension().
    AUDIO_EXTENSIONS = {
        ".aac", ".mp3", ".flac", ".opus", ".ogg", ".ac3", ".ec3",
        ".dts", ".thd", ".m4a", ".wav", ".wma", ".ape", ".alac", ".mka",
    }

    # Extension used when a codec has no dedicated entry in the codec map.
    # Matroska audio accepts virtually any codec with "-c copy", whereas the
    # ADTS ".aac" muxer rejects everything except AAC.
    _FALLBACK_AUDIO_EXTENSION = "mka"

    def __init__(self) -> None:
        self._verify_ffmpeg_installed()

    def _verify_ffmpeg_installed(self) -> None:
        """Verify that ffmpeg is installed and accessible.

        Raises:
            RuntimeError: If ``ffmpeg -version`` cannot be run successfully.
        """
        try:
            subprocess.run(
                ["ffmpeg", "-version"],
                capture_output=True,
                check=True,
            )
        except (subprocess.CalledProcessError, FileNotFoundError) as exc:
            raise RuntimeError(
                "ffmpeg is not installed or not found in PATH. "
                "Please install ffmpeg to use this tool."
            ) from exc

    @staticmethod
    def _find_files_with_extensions(folder: Path, extensions) -> List[Path]:
        """Return the sorted regular files directly inside *folder* whose
        names end with one of *extensions*, compared case-insensitively."""
        if not folder.is_dir():
            # Mirrors globbing a missing folder: nothing found, no error.
            return []
        suffixes = tuple(ext.lower() for ext in extensions)
        return sorted(
            entry
            for entry in folder.iterdir()
            if entry.is_file() and entry.name.lower().endswith(suffixes)
        )

    @staticmethod
    def _stderr_text(error: subprocess.CalledProcessError) -> str:
        """Return the stderr of a failed subprocess as text, never raising."""
        stderr = error.stderr
        if not stderr:
            return "Unknown error"
        if isinstance(stderr, bytes):
            # ffmpeg may echo file names in arbitrary encodings; do not let a
            # decode failure mask the real error.
            return stderr.decode("utf-8", errors="replace")
        return stderr

    def find_video_files(self, folder: Path) -> List[Path]:
        """
        Find all video files in a folder.

        Matching is case-insensitive on the extension (``.mp4``, ``.MP4`` and
        ``.Mp4`` all match). Only regular files directly inside ``folder`` are
        returned; sub-folders are not searched.

        Args:
            folder: Path to folder to search

        Returns:
            Sorted list of video file paths (empty if the folder is missing)
        """
        return self._find_files_with_extensions(folder, self.VIDEO_EXTENSIONS)

    def copy_file(self, source: Path, destination: Path) -> None:
        """
        Copy a file from source to destination with verification.

        Args:
            source: Source file path
            destination: Destination file path, or an existing directory to
                copy the file into

        Raises:
            RuntimeError: If the copied file is missing or its size differs
                from the source.
        """
        # copy2 returns the path actually written, which differs from
        # `destination` when `destination` is a directory.
        copied = Path(shutil.copy2(source, destination))

        # Verify the copy completed
        if not copied.exists():
            raise RuntimeError(f"File copy failed: {copied} was not created")

        source_size = source.stat().st_size
        dest_size = copied.stat().st_size
        if source_size != dest_size:
            raise RuntimeError(
                f"File copy incomplete: source {source_size} bytes != destination {dest_size} bytes"
            )

    def get_stream_info(self, video_file: Path) -> Dict[str, Any]:
        """
        Get stream information from video file using ffprobe.

        Args:
            video_file: Path to video file

        Returns:
            Dictionary parsed from ffprobe's JSON output; the requested
            entries are each stream's index, codec_type and codec_name.

        Raises:
            RuntimeError: If ffprobe is missing, exits with an error, or
                produces output that is not valid JSON.
        """
        try:
            result = subprocess.run(
                [
                    "ffprobe", "-v", "error",
                    "-show_entries", "stream=index,codec_type,codec_name",
                    "-of", "json",
                    str(video_file),
                ],
                capture_output=True,
                text=True,
                # Decode as UTF-8 regardless of the locale default.
                encoding="utf-8",
                errors="replace",
                check=True,
            )
            return json.loads(result.stdout)
        except FileNotFoundError as exc:
            # ffprobe is not checked at construction time, so report its
            # absence with the same exception type as other probe failures.
            raise RuntimeError(
                "Failed to get stream info: ffprobe is not installed or not found in PATH"
            ) from exc
        except subprocess.CalledProcessError as exc:
            raise RuntimeError(f"Failed to get stream info: {exc.stderr}") from exc
        except json.JSONDecodeError as exc:
            raise RuntimeError("Failed to parse ffprobe output") from exc

    def extract_audio_from_file(self, video_file: Path, output_folder: Path) -> None:
        """
        Extract all audio tracks from a video file.

        A file with a single audio track produces ``<stem>.<ext>``; a file
        with several produces ``<stem>_audio_<n>.<ext>`` per track.

        Args:
            video_file: Path to video file
            output_folder: Path to output folder (created if missing)

        Raises:
            FileNotFoundError: If ``video_file`` does not exist.
            RuntimeError: If the file cannot be analyzed or a track cannot be
                extracted.
        """
        if not video_file.exists():
            raise FileNotFoundError(f"Video file not found: {video_file}")

        # Get stream information
        try:
            stream_info = self.get_stream_info(video_file)
        except RuntimeError as exc:
            raise RuntimeError(f"Could not analyze {video_file.name}: {exc}") from exc

        # Find audio streams
        audio_streams = [
            stream for stream in stream_info.get("streams", [])
            if stream.get("codec_type") == "audio"
        ]

        if not audio_streams:
            print(f" No audio streams found in {video_file.name}")
            return

        # ffmpeg does not create missing output directories itself.
        output_folder.mkdir(parents=True, exist_ok=True)

        # Extract each audio stream
        file_stem = video_file.stem
        multiple_tracks = len(audio_streams) > 1
        for audio_index, stream in enumerate(audio_streams):
            # Determine output file extension based on codec; a missing codec
            # name falls through to the generic fallback container.
            output_ext = self._get_audio_extension(stream.get("codec_name", ""))

            # Handle multiple audio tracks
            if multiple_tracks:
                output_filename = f"{file_stem}_audio_{audio_index}.{output_ext}"
            else:
                output_filename = f"{file_stem}.{output_ext}"

            self._extract_stream(video_file, output_folder / output_filename, audio_index)

    def _get_audio_extension(self, codec_name: str) -> str:
        """
        Get file extension based on audio codec.

        Args:
            codec_name: FFmpeg codec name

        Returns:
            File extension (without dot). Codecs without a dedicated entry
            map to "mka" so that a stream copy still succeeds.
        """
        extension_map = {
            "aac": "aac",
            "mp3": "mp3",
            "libmp3lame": "mp3",
            "flac": "flac",
            "opus": "opus",
            "vorbis": "ogg",
            "libvorbis": "ogg",
            "ac3": "ac3",
            "eac3": "ec3",
            "dts": "dts",
            "truehd": "thd",
            "alac": "m4a",
            "pcm_s16le": "wav",
            "pcm_s24le": "wav",
            "pcm_s32le": "wav",
        }
        return extension_map.get(codec_name, self._FALLBACK_AUDIO_EXTENSION)

    def _extract_stream(self, video_file: Path, output_path: Path, stream_index: int) -> None:
        """
        Extract a single audio stream using ffmpeg.

        Args:
            video_file: Path to input video file
            output_path: Path to output audio file (overwritten if present)
            stream_index: Index of the audio stream to extract (0, 1, 2... for audio streams only)

        Raises:
            RuntimeError: If ffmpeg exits with an error.
        """
        # Use ffmpeg to copy the audio codec without re-encoding
        # This preserves the original bitrate and codec
        cmd = [
            "ffmpeg", "-i", str(video_file),
            "-map", f"0:a:{stream_index}",
            "-c", "copy",  # Copy codec without re-encoding
            "-y",  # Overwrite output file
            str(output_path),
        ]

        try:
            subprocess.run(cmd, capture_output=True, check=True)
        except subprocess.CalledProcessError as exc:
            raise RuntimeError(
                f"Failed to extract audio stream {stream_index}: {self._stderr_text(exc)}"
            ) from exc

        print(f" ✓ Extracted: {output_path.name}")

    def find_audio_files(self, folder: Path) -> List[Path]:
        """
        Find all audio files in a folder.

        Matching is case-insensitive on the extension. Only regular files
        directly inside ``folder`` are returned.

        Args:
            folder: Path to folder to search

        Returns:
            Sorted list of audio file paths (empty if the folder is missing)
        """
        return self._find_files_with_extensions(folder, self.AUDIO_EXTENSIONS)

    def find_matching_video(self, audio_file: Path, video_folder: Path) -> Path:
        """
        Find a video file matching the audio file's base name.

        Extensions are tried in sorted order so the result is the same on
        every run when several videos share the base name.

        Args:
            audio_file: Path to audio file
            video_folder: Path to folder containing video files

        Returns:
            Path to matching video file

        Raises:
            FileNotFoundError: If no matching video file is found
        """
        audio_stem = audio_file.stem

        # Probe the lower- and upper-case spelling of each known extension.
        for video_ext in sorted(self.VIDEO_EXTENSIONS):
            for suffix in (video_ext, video_ext.upper()):
                video_path = video_folder / f"{audio_stem}{suffix}"
                if video_path.exists():
                    return video_path

        # Fall back to a directory scan to catch mixed-case suffixes such as
        # ".Mp4", which the direct probes miss on case-sensitive filesystems.
        if video_folder.is_dir():
            stem_length = len(audio_stem)
            candidates = sorted(
                entry
                for entry in video_folder.iterdir()
                if entry.name.startswith(audio_stem)
                and entry.name[stem_length:].lower() in self.VIDEO_EXTENSIONS
            )
            if candidates:
                return candidates[0]

        raise FileNotFoundError(
            f"No matching video found for audio file: {audio_file.name}"
        )

    def add_audio_to_video(self, video_file: Path, audio_file: Path, output_file: Path,
                           track_title: Optional[str] = None) -> None:
        """
        Add an audio track to a video file.

        All streams of the video are kept and the audio streams of
        ``audio_file`` are appended; video and audio are stream-copied.

        Args:
            video_file: Path to input video file
            audio_file: Path to audio file to add
            output_file: Path to output video file (overwritten if present)
            track_title: Title/name for the audio track (optional)

        Raises:
            FileNotFoundError: If the video or audio file does not exist.
            RuntimeError: If ffmpeg fails or any other error occurs while
                building or running the command.
        """
        if not video_file.exists():
            raise FileNotFoundError(f"Video file not found: {video_file}")
        if not audio_file.exists():
            raise FileNotFoundError(f"Audio file not found: {audio_file}")

        try:
            # Determine the index of the newly added audio stream by counting existing streams
            try:
                stream_info = self.get_stream_info(video_file)
            except RuntimeError:
                # If we can't get stream info, assume it's the first audio (index 0)
                new_audio_index = 0
            else:
                new_audio_index = sum(
                    1
                    for stream in stream_info.get("streams", [])
                    if stream.get("codec_type") == "audio"
                )

            # Build ffmpeg command to add audio track
            cmd = [
                "ffmpeg", "-i", str(video_file),
                "-i", str(audio_file),
                "-c:v", "copy",  # Copy video codec
                "-c:a", "copy",  # Copy audio codec
                "-map", "0",  # Include all streams from video
                "-map", "1:a",  # Add audio from audio file
                "-y",  # Overwrite output
            ]

            # Add metadata for track title if provided
            # Only apply to the newly added audio stream
            if track_title:
                cmd.extend([
                    f"-metadata:s:a:{new_audio_index}", f"title={track_title}"
                ])

            cmd.append(str(output_file))

            subprocess.run(cmd, capture_output=True, check=True)
            print(f" ✓ Added audio track from: {audio_file.name}")

        except subprocess.CalledProcessError as exc:
            raise RuntimeError(
                f"Failed to add audio track: {self._stderr_text(exc)}"
            ) from exc
        except Exception as exc:
            raise RuntimeError(f"Error during audio addition: {exc}") from exc
|