conversion_project/path_manager/gui_path_manager.py
2026-01-01 15:37:38 -05:00

833 lines
32 KiB
Python

#!/usr/bin/env python3
"""
GUI Path Manager for Batch Video Transcoder
Allows easy browsing of folders and appending to paths.txt with encoding options.
"""
import tkinter as tk
from tkinter import ttk, messagebox, filedialog
from pathlib import Path
import os
import subprocess
import re
import json
from core.config_helper import load_config_xml
from core.logger_helper import setup_logger
logger = setup_logger(Path(__file__).parent / "logs")
class PathManagerGUI:
def __init__(self, root):
self.root = root
self.root.title("Batch Transcoder - Path Manager")
self.root.geometry("1100x700")
# Load config
config_path = Path(__file__).parent / "config.xml"
self.config = load_config_xml(config_path)
self.path_mappings = self.config.get("path_mappings", {})
# Paths file
self.paths_file = Path(__file__).parent / "paths.txt"
self.transcode_bat = Path(__file__).parent / "transcode.bat"
# Current selected folder
self.selected_folder = None
self.current_category = None
self.recently_added = None # Track recently added folder for highlighting
self.status_timer = None # Track status message timer
self.added_folders = set() # Folders that are in paths.txt
# Cache for folder data - split per category
self.cache_dir = Path(__file__).parent / ".cache"
self.cache_dir.mkdir(exist_ok=True)
self.folder_cache = {} # Only current category in memory: {folder_path: size}
self.scan_in_progress = False
self.scanned_categories = set() # Track which categories have been scanned
# Lazy loading
self.all_folders = [] # All folders for current category
self.loaded_items = 0 # How many items are currently loaded
self.items_per_batch = 100 # Load 100 items at a time
# Load existing paths
self._load_existing_paths()
# Build UI
self._build_ui()
# Handle window close
self.root.protocol("WM_DELETE_WINDOW", self._on_closing)
def _build_ui(self):
"""Build the GUI layout."""
# Top frame for category selection and transcode launcher
top_frame = ttk.Frame(self.root)
top_frame.pack(fill=tk.X, padx=10, pady=10)
left_top = ttk.Frame(top_frame)
left_top.pack(side=tk.LEFT, fill=tk.X, expand=True)
ttk.Label(left_top, text="Category:").pack(side=tk.LEFT, padx=5)
self.category_var = tk.StringVar(value="tv")
categories = ["tv", "anime", "movies"]
for cat in categories:
ttk.Radiobutton(
left_top, text=cat.upper(), variable=self.category_var,
value=cat, command=self._on_category_change
).pack(side=tk.LEFT, padx=5)
ttk.Button(left_top, text="Refresh", command=self._refresh_with_cache_clear).pack(side=tk.LEFT, padx=5)
# Right side of top frame - transcode launcher
right_top = ttk.Frame(top_frame)
right_top.pack(side=tk.RIGHT)
if self.transcode_bat.exists():
ttk.Button(
right_top, text="▶ Run transcode.bat", command=self._run_transcode
).pack(side=tk.RIGHT, padx=5)
# Main content frame
main_frame = ttk.Frame(self.root)
main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
# Left side - folder tree
left_frame = ttk.LabelFrame(main_frame, text="Folders (sorted by size)")
left_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(0, 5))
# Treeview for folders with add button column
self.tree = ttk.Treeview(left_frame, columns=("size", "add", "remove"), height=20)
self.tree.column("#0", width=180)
self.tree.column("size", width=80)
self.tree.column("add", width=50)
self.tree.column("remove", width=50)
self.tree.heading("#0", text="Folder Name")
self.tree.heading("size", text="Size")
self.tree.heading("add", text="Add")
self.tree.heading("remove", text="Remove")
scrollbar = ttk.Scrollbar(left_frame, orient=tk.VERTICAL, command=self._on_scrollbar)
self.tree.configure(yscroll=scrollbar.set)
self.tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
# Configure tags for folder status
self.tree.tag_configure("added", background="#90EE90") # Light green for added
self.tree.tag_configure("not_added", background="white") # White for not added
self.tree.tag_configure("recently_added", background="#FFD700") # Gold for recently added
self.tree.bind("<Double-1>", self._on_folder_expand)
self.tree.bind("<<TreeviewSelect>>", self._on_folder_select)
self.tree.bind("<Button-1>", self._on_tree_click)
# Right side - options and preview
right_frame = ttk.LabelFrame(main_frame, text="Encoding Options & Preview")
right_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True, padx=(5, 0))
# Mode selection
mode_frame = ttk.LabelFrame(right_frame, text="Mode (--m)")
mode_frame.pack(fill=tk.X, padx=5, pady=5)
self.mode_var = tk.StringVar(value="default")
for mode in ["default", "cq", "bitrate"]:
ttk.Radiobutton(mode_frame, text=mode, variable=self.mode_var, value=mode,
command=self._update_preview).pack(anchor=tk.W, padx=5)
# Resolution selection
res_frame = ttk.LabelFrame(right_frame, text="Resolution (--r)")
res_frame.pack(fill=tk.X, padx=5, pady=5)
self.resolution_var = tk.StringVar(value="none")
for res in ["none", "480", "720", "1080"]:
label = "Auto" if res == "none" else res + "p"
ttk.Radiobutton(res_frame, text=label, variable=self.resolution_var, value=res,
command=self._update_preview).pack(anchor=tk.W, padx=5)
# CQ value
cq_frame = ttk.LabelFrame(right_frame, text="CQ Value (--cq, optional)")
cq_frame.pack(fill=tk.X, padx=5, pady=5)
self.cq_var = tk.StringVar(value="")
cq_entry = ttk.Entry(cq_frame, textvariable=self.cq_var, width=10)
cq_entry.pack(anchor=tk.W, padx=5, pady=3)
cq_entry.bind("<KeyRelease>", lambda e: self._update_preview())
# Preview frame
preview_frame = ttk.LabelFrame(right_frame, text="Command Preview")
preview_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
self.preview_text = tk.Text(preview_frame, height=8, width=40, wrap=tk.WORD, bg="lightgray")
self.preview_text.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
self.preview_text.config(state=tk.DISABLED)
# Bottom frame - action buttons and status
bottom_frame = ttk.Frame(self.root)
bottom_frame.pack(fill=tk.X, padx=10, pady=10)
button_frame = ttk.Frame(bottom_frame)
button_frame.pack(side=tk.LEFT)
ttk.Button(button_frame, text="View paths.txt", command=self._view_paths_file).pack(side=tk.LEFT, padx=5)
ttk.Button(button_frame, text="Clear paths.txt", command=self._clear_paths_file).pack(side=tk.LEFT, padx=5)
# Status label (for silent feedback)
self.status_label = ttk.Label(bottom_frame, text="", foreground="green")
self.status_label.pack(side=tk.LEFT, padx=10)
# Load cache and populate initial category
self._load_cache()
self._refresh_folders(use_cache=True)
# Only scan once per category on first view
if self.current_category not in self.scanned_categories:
self.root.after(100, self._scan_folders_once)
def _on_category_change(self):
"""Handle category radio button change."""
self.current_category = self.category_var.get()
# Load cache for this category
self._load_cache()
# Show cached data first
self._refresh_folders(use_cache=True)
# Only scan once per category on first view
if self.current_category not in self.scanned_categories:
self.root.after(100, self._scan_folders_once)
def _load_cache(self):
"""Load folder cache for current category from disk (lazy)."""
category = self.category_var.get()
cache_file = self.cache_dir / f".cache_{category}.json"
self.folder_cache.clear()
# Don't fully load cache yet - just verify it exists
if not cache_file.exists():
logger.info(f"No cache file for {category}")
else:
logger.info(f"Cache file exists for {category}")
def _parse_cache_lazily(self, limit=None):
"""Parse cache file lazily and return folders."""
category = self.category_var.get()
cache_file = self.cache_dir / f".cache_{category}.json"
folders = []
if cache_file.exists():
try:
with open(cache_file, "r", encoding="utf-8") as f:
cache_dict = json.load(f)
# Convert to list and sort
for folder_path_str, size in cache_dict.items():
folder_path = Path(folder_path_str)
if folder_path.exists():
folders.append((folder_path.name, folder_path, size))
# Early exit if limit reached
if limit and len(folders) >= limit:
break
# Sort by size descending (only what we loaded)
folders.sort(key=lambda x: x[2], reverse=True)
except Exception as e:
logger.error(f"Failed to parse cache: {e}")
return folders
def _save_cache(self):
"""Save current category's folder cache to disk."""
category = self.category_var.get()
cache_file = self.cache_dir / f".cache_{category}.json"
try:
with open(cache_file, "w", encoding="utf-8") as f:
json.dump(self.folder_cache, f, indent=2)
except Exception as e:
logger.error(f"Failed to save {category} cache: {e}")
def _refresh_with_cache_clear(self):
"""Refresh and clear cache to force full scan."""
category = self.category_var.get()
cache_file = self.cache_dir / f".cache_{category}.json"
# Delete cache file for this category
if cache_file.exists():
cache_file.unlink()
self.folder_cache.clear()
self.scanned_categories.discard(category) # Reset so it will scan again
self._refresh_folders(use_cache=False)
def _scan_folders_once(self):
"""Scan folders once per category on first load."""
if self.scan_in_progress:
return
category = self.category_var.get()
if category in self.scanned_categories:
return # Already scanned this category
self.scan_in_progress = True
try:
category_mapping = {
"tv": "P:\\tv",
"anime": "P:\\anime",
"movies": "P:\\movies"
}
base_key = category_mapping.get(category)
if not base_key or base_key not in self.path_mappings:
return
base_path = Path(base_key)
if not base_path.exists():
return
# Scan folders and update cache
new_cache = {}
for entry in os.scandir(base_path):
if entry.is_dir(follow_symlinks=False):
size = self._get_folder_size(Path(entry))
new_cache[str(Path(entry))] = size
# Update cache and save
self.folder_cache = new_cache
self._save_cache()
self.scanned_categories.add(category)
# Update UI if still on same category
if self.category_var.get() == category:
self._refresh_folders(use_cache=True)
finally:
self.scan_in_progress = False
def _scan_folders_background(self):
"""Scan folders in background and update cache."""
if self.scan_in_progress:
return
self.scan_in_progress = True
try:
category = self.category_var.get()
category_mapping = {
"tv": "P:\\tv",
"anime": "P:\\anime",
"movies": "P:\\movies"
}
base_key = category_mapping.get(category)
if not base_key or base_key not in self.path_mappings:
return
base_path = Path(base_key)
if not base_path.exists():
return
# Scan folders and update cache
new_cache = {}
for entry in os.scandir(base_path):
if entry.is_dir(follow_symlinks=False):
size = self._get_folder_size(Path(entry))
new_cache[str(Path(entry))] = size
# Update cache and save
self.folder_cache[category] = new_cache
self._save_cache()
# Update UI if still on same category
if self.category_var.get() == category:
self._refresh_folders(use_cache=True)
finally:
self.scan_in_progress = False
# Schedule next continuous scan
self.background_scan_timer = self.root.after(
self.background_scan_interval,
self._continuous_background_scan
)
# Schedule next continuous scan
self.background_scan_timer = self.root.after(
self.background_scan_interval,
self._continuous_background_scan
)
def _load_existing_paths(self):
"""Load existing paths from paths.txt and extract folder paths."""
self.added_folders.clear()
if not self.paths_file.exists():
return
try:
with open(self.paths_file, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
# Extract the path (last argument in the command)
# Format: --m mode --r res --cq val "path" or just "path"
# Find all quoted strings
matches = re.findall(r'"([^"]*)"', line)
if matches:
# Last quoted string is the path
path = matches[-1]
self.added_folders.add(path)
except Exception as e:
logger.error(f"Failed to load existing paths: {e}")
def _get_folder_size(self, path: Path) -> int:
"""Calculate total size of folder in bytes."""
total = 0
try:
for entry in os.scandir(path):
if entry.is_file(follow_symlinks=False):
total += entry.stat().st_size
elif entry.is_dir(follow_symlinks=False):
total += self._get_folder_size(Path(entry))
except PermissionError:
pass
return total
def _format_size(self, bytes_size: int) -> str:
"""Format bytes to human readable size."""
for unit in ["B", "KB", "MB", "GB", "TB"]:
if bytes_size < 1024:
return f"{bytes_size:.1f} {unit}"
bytes_size /= 1024
return f"{bytes_size:.1f} PB"
def _refresh_folders(self, use_cache=False):
"""Refresh the folder tree from cache or disk."""
# Clear existing items
for item in self.tree.get_children():
self.tree.delete(item)
self.all_folders = []
self.loaded_items = 0
category = self.category_var.get()
# Map category to path mapping key
category_mapping = {
"tv": "P:\\tv",
"anime": "P:\\anime",
"movies": "P:\\movies"
}
base_key = category_mapping.get(category)
if not base_key or base_key not in self.path_mappings:
messagebox.showwarning("Info", f"No mapping found for {category}")
return
base_path = Path(base_key)
# Check if path exists
if not base_path.exists():
messagebox.showerror("Error", f"Path not found: {base_path}")
return
# Get folders from cache or disk
if use_cache:
# Parse cache lazily - only load what we need initially
folders = self._parse_cache_lazily(limit=None) # Get all but parse efficiently
else:
# Scan from disk
folders = []
try:
for entry in os.scandir(base_path):
if entry.is_dir(follow_symlinks=False):
size = self._get_folder_size(Path(entry))
folders.append((entry.name, Path(entry), size))
except PermissionError:
messagebox.showerror("Error", f"Permission denied accessing {base_path}")
return
# Update cache with fresh scan
cache_dict = {str(f[1]): f[2] for f in folders}
self.folder_cache = cache_dict
self._save_cache()
# Sort by size descending
folders.sort(key=lambda x: x[2], reverse=True)
# Store all folders and load first batch only
self.all_folders = folders
self._load_more_items()
def _on_folder_expand(self, event):
"""Handle folder double-click to show contents."""
selection = self.tree.selection()
if not selection:
return
item = selection[0]
tags = self.tree.item(item, "tags")
if not tags:
return
folder_path = Path(tags[0])
# Check if already expanded
if self.tree.get_children(item):
# Toggle: remove children
for child in self.tree.get_children(item):
self.tree.delete(child)
else:
# Add file/folder contents
try:
entries = []
for entry in os.scandir(folder_path):
if entry.is_file():
size = entry.stat().st_size
entries.append((entry.name, "File", size))
elif entry.is_dir():
size = self._get_folder_size(Path(entry))
entries.append((entry.name, "Folder", size))
# Sort by size descending
entries.sort(key=lambda x: x[2], reverse=True)
for name, type_str, size in entries:
size_str = self._format_size(size)
self.tree.insert(item, "end", text=f"[{type_str}] {name}", values=(size_str,))
except PermissionError:
messagebox.showerror("Error", f"Permission denied accessing {folder_path}")
def _on_folder_select(self, event):
"""Handle folder selection to update preview."""
selection = self.tree.selection()
if not selection:
return
item = selection[0]
tags = self.tree.item(item, "tags")
if tags:
self.selected_folder = tags[0]
self._update_preview()
def _on_tree_click(self, event):
"""Handle click on '+' or '-' button in add column."""
item = self.tree.identify("item", event.x, event.y)
column = self.tree.identify_column(event.x) # Only takes x coordinate
# Column #2 is the "add" column (columns are #0=name, #1=size, #2=add, #3=remove)
if item and column == "#2":
tags = self.tree.item(item, "tags")
if tags:
folder_path = tags[0]
values = self.tree.item(item, "values")
if len(values) > 1:
button_text = values[1] # Get button text
if "[+]" in button_text:
# Immediately update UI for snappy response
size_val = values[0]
self.tree.item(item, values=(size_val, "", "[-]"), tags=(folder_path, "added"))
# Add to paths.txt asynchronously
self.selected_folder = folder_path
self.root.after(0, self._add_to_paths_file_async, folder_path)
# Column #3 is the "remove" column
elif item and column == "#3":
tags = self.tree.item(item, "tags")
if tags:
folder_path = tags[0]
# Immediately update UI for snappy response
values = self.tree.item(item, "values")
size_val = values[0]
self.tree.item(item, values=(size_val, "[+]", ""), tags=(folder_path, "not_added"))
# Remove from paths.txt asynchronously
self.root.after(0, self._remove_from_paths_file_async, folder_path)
def _add_to_paths_file_async(self, folder_path):
"""Add to paths.txt without blocking UI."""
self.selected_folder = folder_path
self._add_to_paths_file()
# Silently reload in background
self._load_existing_paths()
def _remove_from_paths_file_async(self, folder_path):
"""Remove from paths.txt without blocking UI."""
self._remove_from_paths_file(folder_path)
def _update_preview(self):
"""Update the command preview."""
if not self.selected_folder:
preview_text = "No folder selected"
else:
folder_path = self.selected_folder
# Build command
cmd_parts = ['py main.py']
# Add mode if not default
mode = self.mode_var.get()
if mode != "default":
cmd_parts.append(f'--m {mode}')
# Add resolution if specified
resolution = self.resolution_var.get()
if resolution != "none":
cmd_parts.append(f'--r {resolution}')
# Add CQ if specified
cq = self.cq_var.get().strip()
if cq:
cmd_parts.append(f'--cq {cq}')
# Add path
cmd_parts.append(f'"{folder_path}"')
preview_text = " ".join(cmd_parts)
self.preview_text.config(state=tk.NORMAL)
self.preview_text.delete("1.0", tk.END)
self.preview_text.insert("1.0", preview_text)
self.preview_text.config(state=tk.DISABLED)
def _add_to_paths_file(self):
"""Append the current command to paths.txt."""
if not self.selected_folder:
messagebox.showwarning("Warning", "Please select a folder first")
return
folder_path = self.selected_folder
# Check if already in file
if folder_path in self.added_folders:
self._show_status(f"Already added: {Path(folder_path).name}")
return
# Build command line - start fresh
cmd_parts = []
# Add mode if not default
mode = self.mode_var.get()
if mode != "default":
cmd_parts.append(f'--m {mode}')
# Add resolution if specified
resolution = self.resolution_var.get()
if resolution != "none":
cmd_parts.append(f'--r {resolution}')
# Add CQ if specified
cq = self.cq_var.get().strip()
if cq:
cmd_parts.append(f'--cq {cq}')
# Add folder path
cmd_parts.append(f'"{folder_path}"')
line = " ".join(cmd_parts)
# Append to paths.txt
try:
# Check if file exists and has content
if self.paths_file.exists() and self.paths_file.stat().st_size > 0:
# Read last character to check if it ends with newline
with open(self.paths_file, "rb") as f:
f.seek(-1, 2) # Seek to last byte
last_char = f.read(1)
needs_newline = last_char != b'\n'
else:
needs_newline = False
# Write to file
with open(self.paths_file, "a", encoding="utf-8") as f:
if needs_newline:
f.write("\n")
f.write(line + "\n")
# Add to tracked set
self.added_folders.add(folder_path)
# Silent success - show status label instead of popup
self.recently_added = folder_path
self._show_status(f"✓ Added: {Path(folder_path).name}")
logger.info(f"Added to paths.txt: {line}")
# Clear timer if exists
if self.status_timer:
self.root.after_cancel(self.status_timer)
# Clear status after 3 seconds
self.status_timer = self.root.after(3000, self._clear_status)
except Exception as e:
messagebox.showerror("Error", f"Failed to write to paths.txt: {e}")
logger.error(f"Failed to write to paths.txt: {e}")
def _remove_from_paths_file(self, folder_path):
"""Remove a folder from paths.txt."""
if not self.paths_file.exists():
messagebox.showwarning("Warning", "paths.txt does not exist")
return
try:
with open(self.paths_file, "r", encoding="utf-8") as f:
lines = f.readlines()
# Filter out lines containing this folder path
new_lines = []
found = False
for line in lines:
if f'"{folder_path}"' in line or f"'{folder_path}'" in line:
found = True
else:
new_lines.append(line)
if not found:
messagebox.showwarning("Warning", "Path not found in paths.txt")
return
# Write back
with open(self.paths_file, "w", encoding="utf-8") as f:
f.writelines(new_lines)
# Remove from tracked set
self.added_folders.discard(folder_path)
self._show_status(f"✓ Removed: {Path(folder_path).name}")
logger.info(f"Removed from paths.txt: {folder_path}")
# Clear timer if exists
if self.status_timer:
self.root.after_cancel(self.status_timer)
# Clear status after 3 seconds
self.status_timer = self.root.after(3000, self._clear_status)
except Exception as e:
messagebox.showerror("Error", f"Failed to remove from paths.txt: {e}")
logger.error(f"Failed to remove from paths.txt: {e}")
def _show_status(self, message):
"""Show status message in label."""
self.status_label.config(text=message, foreground="green")
def _clear_status(self):
"""Clear status message after delay."""
self.status_label.config(text="")
self.status_timer = None
def _run_transcode(self):
"""Launch transcode.bat in a new command window."""
if not self.transcode_bat.exists():
messagebox.showerror("Error", f"transcode.bat not found at {self.transcode_bat}")
return
try:
# Launch in new cmd window
subprocess.Popen(
['cmd', '/c', f'"{self.transcode_bat}"'],
cwd=str(self.transcode_bat.parent),
creationflags=subprocess.CREATE_NEW_CONSOLE
)
logger.info("Launched transcode.bat")
except Exception as e:
messagebox.showerror("Error", f"Failed to launch transcode.bat: {e}")
logger.error(f"Failed to launch transcode.bat: {e}")
def _view_paths_file(self):
"""Open paths.txt in a new window."""
if not self.paths_file.exists():
messagebox.showinfo("Info", "paths.txt does not exist yet")
return
try:
with open(self.paths_file, "r", encoding="utf-8") as f:
content = f.read()
# Create new window
view_window = tk.Toplevel(self.root)
view_window.title("paths.txt")
view_window.geometry("800x400")
text_widget = tk.Text(view_window, wrap=tk.WORD)
text_widget.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
text_widget.insert("1.0", content)
# Add close button
ttk.Button(view_window, text="Close", command=view_window.destroy).pack(pady=5)
except Exception as e:
messagebox.showerror("Error", f"Failed to read paths.txt: {e}")
def _clear_paths_file(self):
"""Clear the paths.txt file."""
if not self.paths_file.exists():
messagebox.showinfo("Info", "paths.txt does not exist")
return
if messagebox.askyesno("Confirm", "Are you sure you want to clear paths.txt?"):
try:
self.paths_file.write_text("", encoding="utf-8")
messagebox.showinfo("Success", "paths.txt has been cleared")
logger.info("paths.txt cleared")
except Exception as e:
messagebox.showerror("Error", f"Failed to clear paths.txt: {e}")
def _on_closing(self):
"""Handle window closing - cleanup timers."""
self.root.destroy()
def _on_scrollbar(self, *args):
"""Handle scrollbar movement - load more items when scrolling."""
self.tree.yview(*args)
# Check if we need to load more items
if self.all_folders and self.loaded_items < len(self.all_folders):
# Get scroll position
first_visible = self.tree.yview()[0]
last_visible = self.tree.yview()[1]
# If we're past 70% scrolled, load more
if last_visible > 0.7:
self._load_more_items()
def _load_more_items(self):
"""Load next batch of items into tree."""
start = self.loaded_items
end = min(start + self.items_per_batch, len(self.all_folders))
for i in range(start, end):
folder_name, folder_path, size = self.all_folders[i]
size_str = self._format_size(size)
folder_path_str = str(folder_path)
# Determine button and tag
if folder_path_str in self.added_folders:
add_btn = ""
remove_btn = "[-]"
tag = "added"
else:
add_btn = "[+]"
remove_btn = ""
tag = "not_added"
self.tree.insert("", "end", text=folder_name, values=(size_str, add_btn, remove_btn),
tags=(folder_path_str, tag))
self.loaded_items = end
def main():
root = tk.Tk()
app = PathManagerGUI(root)
root.mainloop()
if __name__ == "__main__":
main()