#!/usr/bin/env python3
"""
GUI Path Manager for Batch Video Transcoder
Allows easy browsing of folders and appending to paths.txt with encoding options.
"""

import sys
from pathlib import Path

# Add parent directory to path so we can import core modules
sys.path.insert(0, str(Path(__file__).parent.parent))

import tkinter as tk
from tkinter import ttk, messagebox, filedialog
import os
import subprocess
import re
import json
from core.config_helper import load_config_xml
from core.logger_helper import setup_logger

logger = setup_logger(Path(__file__).parent.parent / "logs")


class PathManagerGUI:
    """Tk window for browsing category folders and queuing them in paths.txt.

    The left pane lists the top-level folders of the selected category sorted
    by size (sizes are cached per category as JSON under ``.cache``); the
    right pane holds the encoding options and a preview of the resulting
    command. Clicking ``[+]`` / ``[-]`` on a row appends the folder to, or
    removes it from, ``paths.txt``.

    Each top-level tree row stores its folder path as its first tag and its
    status (``added`` / ``not_added``) as its second tag.
    """

    # Category name -> key that must be present in the config's path_mappings.
    # The key itself is also used as the directory that gets scanned.
    CATEGORY_ROOTS = {
        "tv": "P:\\tv",
        "anime": "P:\\anime",
        "movies": "P:\\movies",
    }

    # Matches every double-quoted string on a paths.txt line.
    _QUOTED_RE = re.compile(r'"([^"]*)"')

    # How long a status message stays visible, in milliseconds.
    _STATUS_TIMEOUT_MS = 3000

    # Fraction of the list that must be scrolled past before loading more rows.
    _LOAD_MORE_THRESHOLD = 0.7

    def __init__(self, root):
        """Load configuration and existing paths, then build the window.

        Args:
            root: The ``tk.Tk`` (or ``Toplevel``) window to populate.
        """
        self.root = root
        self.root.title("Batch Transcoder - Path Manager")
        self.root.geometry("1100x700")

        project_root = Path(__file__).parent.parent

        # Load config (from parent directory)
        self.config = load_config_xml(project_root / "config.xml")

        # Convert path_mappings from list to dict for easier lookup
        path_mappings_list = self.config.get("path_mappings", [])
        if isinstance(path_mappings_list, list):
            self.path_mappings = {m["from"]: m["to"] for m in path_mappings_list}
        else:
            self.path_mappings = path_mappings_list

        # Paths file (in root directory)
        self.paths_file = project_root / "paths.txt"
        self.transcode_bat = project_root / "transcode.bat"

        # Current selection / status state
        self.selected_folder = None
        self.current_category = None
        self.recently_added = None  # Most recently added folder path
        self.status_timer = None  # Pending `after` id that clears the status label
        self.added_folders = set()  # Folder paths currently listed in paths.txt

        # Cache for folder data - one JSON file per category
        self.cache_dir = project_root / ".cache"
        self.cache_dir.mkdir(exist_ok=True)
        self.folder_cache = {}  # Only current category in memory: {folder_path: size}
        self.scan_in_progress = False
        self.scanned_categories = set()  # Categories already scanned this session

        # Lazy loading of tree rows
        self.all_folders = []  # (name, Path, size) for the current category
        self.loaded_items = 0  # How many of all_folders are in the tree
        self.items_per_batch = 100  # Rows inserted per batch

        self._load_existing_paths()
        self._build_ui()

        # Handle window close
        self.root.protocol("WM_DELETE_WINDOW", self._on_closing)

    # ------------------------------------------------------------------ UI

    def _build_ui(self):
        """Build the GUI layout and show the initial category."""
        # Top frame for category selection and transcode launcher
        top_frame = ttk.Frame(self.root)
        top_frame.pack(fill=tk.X, padx=10, pady=10)

        left_top = ttk.Frame(top_frame)
        left_top.pack(side=tk.LEFT, fill=tk.X, expand=True)

        ttk.Label(left_top, text="Category:").pack(side=tk.LEFT, padx=5)

        self.category_var = tk.StringVar(value="tv")
        for cat in self.CATEGORY_ROOTS:
            ttk.Radiobutton(
                left_top,
                text=cat.upper(),
                variable=self.category_var,
                value=cat,
                command=self._on_category_change,
            ).pack(side=tk.LEFT, padx=5)

        ttk.Button(
            left_top, text="Refresh", command=self._refresh_with_cache_clear
        ).pack(side=tk.LEFT, padx=5)

        # Right side of top frame - transcode launcher
        right_top = ttk.Frame(top_frame)
        right_top.pack(side=tk.RIGHT)

        if self.transcode_bat.exists():
            ttk.Button(
                right_top,
                text="▶ Run transcode.bat",
                command=self._run_transcode,
            ).pack(side=tk.RIGHT, padx=5)

        # Main content frame
        main_frame = ttk.Frame(self.root)
        main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)

        # Left side - folder tree
        left_frame = ttk.LabelFrame(main_frame, text="Folders (sorted by size)")
        left_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(0, 5))

        # Treeview for folders with add/remove button columns
        self.tree = ttk.Treeview(left_frame, columns=("size", "add", "remove"), height=20)
        self.tree.column("#0", width=180)
        self.tree.column("size", width=80)
        self.tree.column("add", width=50)
        self.tree.column("remove", width=50)
        self.tree.heading("#0", text="Folder Name")
        self.tree.heading("size", text="Size")
        self.tree.heading("add", text="Add")
        self.tree.heading("remove", text="Remove")

        self.scrollbar = ttk.Scrollbar(
            left_frame, orient=tk.VERTICAL, command=self._on_scrollbar
        )
        # Route view changes through _on_tree_yview so that mouse-wheel and
        # keyboard scrolling (not only scrollbar drags) load further batches.
        self.tree.configure(yscrollcommand=self._on_tree_yview)

        self.tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        self.scrollbar.pack(side=tk.RIGHT, fill=tk.Y)

        # Configure tags for folder status
        self.tree.tag_configure("added", background="#90EE90")  # Light green for added
        self.tree.tag_configure("not_added", background="white")  # White for not added
        self.tree.tag_configure("recently_added", background="#FFD700")  # Gold for recently added

        self.tree.bind("<Double-1>", self._on_folder_expand)
        self.tree.bind("<<TreeviewSelect>>", self._on_folder_select)
        self.tree.bind("<Button-1>", self._on_tree_click)

        # Right side - options and preview
        right_frame = ttk.LabelFrame(main_frame, text="Encoding Options & Preview")
        right_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True, padx=(5, 0))

        # Mode selection
        mode_frame = ttk.LabelFrame(right_frame, text="Mode (--m)")
        mode_frame.pack(fill=tk.X, padx=5, pady=5)

        self.mode_var = tk.StringVar(value="default")
        for mode in ["default", "cq", "bitrate"]:
            ttk.Radiobutton(
                mode_frame,
                text=mode,
                variable=self.mode_var,
                value=mode,
                command=self._update_preview,
            ).pack(anchor=tk.W, padx=5)

        # Resolution selection
        res_frame = ttk.LabelFrame(right_frame, text="Resolution (--r)")
        res_frame.pack(fill=tk.X, padx=5, pady=5)

        self.resolution_var = tk.StringVar(value="none")
        for res in ["none", "480", "720", "1080"]:
            label = "Auto" if res == "none" else res + "p"
            ttk.Radiobutton(
                res_frame,
                text=label,
                variable=self.resolution_var,
                value=res,
                command=self._update_preview,
            ).pack(anchor=tk.W, padx=5)

        # CQ value
        cq_frame = ttk.LabelFrame(right_frame, text="CQ Value (--cq, optional)")
        cq_frame.pack(fill=tk.X, padx=5, pady=5)

        self.cq_var = tk.StringVar(value="")
        cq_entry = ttk.Entry(cq_frame, textvariable=self.cq_var, width=10)
        cq_entry.pack(anchor=tk.W, padx=5, pady=3)
        cq_entry.bind("<KeyRelease>", lambda e: self._update_preview())

        # Preview frame
        preview_frame = ttk.LabelFrame(right_frame, text="Command Preview")
        preview_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)

        self.preview_text = tk.Text(
            preview_frame, height=8, width=40, wrap=tk.WORD, bg="lightgray"
        )
        self.preview_text.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
        self.preview_text.config(state=tk.DISABLED)

        # Bottom frame - action buttons and status
        bottom_frame = ttk.Frame(self.root)
        bottom_frame.pack(fill=tk.X, padx=10, pady=10)

        button_frame = ttk.Frame(bottom_frame)
        button_frame.pack(side=tk.LEFT)

        ttk.Button(
            button_frame, text="View paths.txt", command=self._view_paths_file
        ).pack(side=tk.LEFT, padx=5)
        ttk.Button(
            button_frame, text="Clear paths.txt", command=self._clear_paths_file
        ).pack(side=tk.LEFT, padx=5)

        # Status label (for silent feedback)
        self.status_label = ttk.Label(bottom_frame, text="", foreground="green")
        self.status_label.pack(side=tk.LEFT, padx=10)

        # Show the initial category exactly like a user-triggered change:
        # cached data first, then one scan if it was not scanned yet.
        self._on_category_change()

    def _on_category_change(self):
        """Handle category radio button change."""
        self.current_category = self.category_var.get()

        # Load cache for this category and show cached data first
        self._load_cache()
        self._refresh_folders(use_cache=True)

        # Only scan once per category on first view
        if self.current_category not in self.scanned_categories:
            self.root.after(100, self._scan_folders_once)

    # --------------------------------------------------------------- cache

    def _cache_file(self, category=None):
        """Return the cache file path for *category* (default: the selected one)."""
        if category is None:
            category = self.category_var.get()
        return self.cache_dir / f".cache_{category}.json"

    def _load_cache(self):
        """Reset the in-memory cache and log whether a cache file exists.

        The file itself is not read here; ``_parse_cache_lazily`` reads it
        when the folder list is built.
        """
        category = self.category_var.get()
        self.folder_cache.clear()

        if not self._cache_file(category).exists():
            logger.info(f"No cache file for {category}")
        else:
            logger.info(f"Cache file exists for {category}")

    def _parse_cache_lazily(self, limit=None):
        """Read the selected category's cache file and return its folders.

        Args:
            limit: Maximum number of folders to return; a falsy value means
                no limit.

        Returns:
            A list of ``(name, Path, size)`` tuples sorted by size
            descending, containing only folders that still exist. Empty when
            there is no cache file or it cannot be read.
        """
        cache_file = self._cache_file()
        if not cache_file.exists():
            return []

        folders = []
        try:
            with open(cache_file, "r", encoding="utf-8") as f:
                cache_dict = json.load(f)

            # Sort before applying the limit so a limited read yields the
            # largest folders rather than whichever come first in the file.
            by_size = sorted(cache_dict.items(), key=lambda kv: kv[1], reverse=True)
            for folder_path_str, size in by_size:
                folder_path = Path(folder_path_str)
                if folder_path.exists():
                    folders.append((folder_path.name, folder_path, size))
                    if limit and len(folders) >= limit:
                        break
        except (OSError, ValueError, TypeError, AttributeError) as e:
            # Unreadable, malformed, or unexpectedly shaped cache: treat it
            # as missing; the next scan rewrites it.
            logger.error(f"Failed to parse cache: {e}")
            return []

        return folders

    def _save_cache(self):
        """Save the in-memory folder cache to the selected category's file."""
        category = self.category_var.get()
        try:
            with open(self._cache_file(category), "w", encoding="utf-8") as f:
                json.dump(self.folder_cache, f, indent=2)
        except (OSError, TypeError, ValueError) as e:
            logger.error(f"Failed to save {category} cache: {e}")

    def _refresh_with_cache_clear(self):
        """Delete the selected category's cache and rescan it from disk."""
        category = self.category_var.get()
        cache_file = self._cache_file(category)

        if cache_file.exists():
            cache_file.unlink()

        self.folder_cache.clear()
        self.scanned_categories.discard(category)

        self._refresh_folders(use_cache=False)

    # ------------------------------------------------------------ scanning

    def _category_base_key(self, category):
        """Return the scan root for *category*, or None if it is not mapped.

        A category is usable only when its root key appears in the
        configured path mappings.
        """
        base_key = self.CATEGORY_ROOTS.get(category)
        if not base_key or base_key not in self.path_mappings:
            return None
        return base_key

    def _scan_top_level(self, base_path):
        """Return ``{folder_path_str: total_size}`` for each directory in *base_path*.

        Symlinked directories are skipped.

        Raises:
            OSError: If *base_path* itself cannot be listed.
        """
        sizes = {}
        with os.scandir(base_path) as entries:
            for entry in entries:
                if entry.is_dir(follow_symlinks=False):
                    folder = Path(entry)
                    sizes[str(folder)] = self._get_folder_size(folder)
        return sizes

    def _rescan_category(self, category):
        """Scan *category* from disk, save the cache and refresh the tree.

        Runs synchronously on the Tk thread, so the window does not respond
        until the scan finishes.

        Returns:
            True if the scan completed and the cache was updated, False if
            the category is unmapped, its root is missing, or listing failed.
        """
        self.scan_in_progress = True
        try:
            base_key = self._category_base_key(category)
            if base_key is None:
                return False

            base_path = Path(base_key)
            if not base_path.exists():
                return False

            try:
                new_cache = self._scan_top_level(base_path)
            except OSError as e:
                logger.error(f"Failed to scan {base_path}: {e}")
                return False

            # The cache is flat ({path: size}) and holds one category only.
            self.folder_cache = new_cache
            self._save_cache()

            # Update UI if still on same category
            if self.category_var.get() == category:
                self._refresh_folders(use_cache=True)
            return True
        finally:
            self.scan_in_progress = False

    def _scan_folders_once(self):
        """Scan the selected category unless it was already scanned."""
        if self.scan_in_progress:
            return

        category = self.category_var.get()
        if category in self.scanned_categories:
            return

        if self._rescan_category(category):
            self.scanned_categories.add(category)

    def _scan_folders_background(self):
        """Rescan the selected category and update its cache.

        If the instance defines both ``background_scan_interval`` and
        ``_continuous_background_scan`` (neither is defined by this class),
        the next scan is scheduled afterwards; otherwise this is a one-shot
        rescan.
        """
        if self.scan_in_progress:
            return

        try:
            self._rescan_category(self.category_var.get())
        finally:
            interval = getattr(self, "background_scan_interval", None)
            callback = getattr(self, "_continuous_background_scan", None)
            if interval is not None and callable(callback):
                self.background_scan_timer = self.root.after(interval, callback)

    def _get_folder_size(self, path: Path) -> int:
        """Return the total size in bytes of all regular files under *path*.

        Symlinks are not followed. Listing is best-effort: if a directory
        cannot be read (permission denied, removed during the scan, ...) the
        bytes counted so far for it are kept and the rest is skipped.
        """
        total = 0
        try:
            with os.scandir(path) as entries:
                for entry in entries:
                    if entry.is_file(follow_symlinks=False):
                        total += entry.stat().st_size
                    elif entry.is_dir(follow_symlinks=False):
                        total += self._get_folder_size(Path(entry))
        except OSError:
            # Deliberately ignored: an unreadable folder must not abort the scan.
            pass
        return total

    def _format_size(self, bytes_size: int) -> str:
        """Format a byte count as a human readable size (1024-based units)."""
        for unit in ["B", "KB", "MB", "GB", "TB"]:
            if bytes_size < 1024:
                return f"{bytes_size:.1f} {unit}"
            bytes_size /= 1024
        return f"{bytes_size:.1f} PB"

    def _refresh_folders(self, use_cache=False):
        """Rebuild the folder tree from the cache file or from disk.

        Args:
            use_cache: When True, list folders from the category's cache
                file; when False, scan the category root, then store and
                save the result as the new cache.
        """
        # Clear existing items
        for item in self.tree.get_children():
            self.tree.delete(item)

        self.all_folders = []
        self.loaded_items = 0

        category = self.category_var.get()
        base_key = self._category_base_key(category)
        if base_key is None:
            messagebox.showwarning("Info", f"No mapping found for {category}")
            return

        base_path = Path(base_key)
        if not base_path.exists():
            messagebox.showerror("Error", f"Path not found: {base_path}")
            return

        if use_cache:
            folders = self._parse_cache_lazily(limit=None)
        else:
            try:
                sizes = self._scan_top_level(base_path)
            except PermissionError:
                messagebox.showerror("Error", f"Permission denied accessing {base_path}")
                return
            except OSError as e:
                messagebox.showerror("Error", f"Failed to read {base_path}: {e}")
                return

            # Update cache with fresh scan
            self.folder_cache = sizes
            self._save_cache()
            # A full scan just happened; no need for the first-view scan too.
            self.scanned_categories.add(category)

            folders = [(Path(p).name, Path(p), size) for p, size in sizes.items()]

        # Sort by size descending
        folders.sort(key=lambda x: x[2], reverse=True)

        # Store all folders and load first batch only
        self.all_folders = folders
        self._load_more_items()

    # ------------------------------------------------------- tree handlers

    def _on_folder_expand(self, event):
        """Toggle the contents listing of the double-clicked folder row.

        Child rows carry no tags, so double-clicking one does nothing.
        """
        selection = self.tree.selection()
        if not selection:
            return

        item = selection[0]
        tags = self.tree.item(item, "tags")
        if not tags:
            return

        folder_path = Path(tags[0])

        children = self.tree.get_children(item)
        if children:
            # Already expanded: collapse by removing the child rows
            for child in children:
                self.tree.delete(child)
            return

        try:
            entries = []
            with os.scandir(folder_path) as scan:
                for entry in scan:
                    if entry.is_file():
                        entries.append((entry.name, "File", entry.stat().st_size))
                    elif entry.is_dir():
                        size = self._get_folder_size(Path(entry))
                        entries.append((entry.name, "Folder", size))
        except PermissionError:
            messagebox.showerror("Error", f"Permission denied accessing {folder_path}")
            return
        except OSError as e:
            messagebox.showerror("Error", f"Failed to read {folder_path}: {e}")
            return

        # Sort by size descending
        entries.sort(key=lambda x: x[2], reverse=True)

        for name, type_str, size in entries:
            self.tree.insert(
                item,
                "end",
                text=f"[{type_str}] {name}",
                values=(self._format_size(size),),
            )

    def _on_folder_select(self, event):
        """Remember the selected folder and update the command preview."""
        selection = self.tree.selection()
        if not selection:
            return

        tags = self.tree.item(selection[0], "tags")
        if tags:
            self.selected_folder = tags[0]
            self._update_preview()

    def _on_tree_click(self, event):
        """Handle a click on the '[+]' or '[-]' cell of a folder row.

        Columns are #0=name, #1=size, #2=add, #3=remove. A click only acts
        when the clicked cell actually shows its button, so clicking the
        empty "remove" cell of a folder that is not queued does nothing.
        """
        item = self.tree.identify("item", event.x, event.y)
        column = self.tree.identify_column(event.x)  # Only takes x coordinate
        if not item or column not in ("#2", "#3"):
            return

        tags = self.tree.item(item, "tags")
        values = self.tree.item(item, "values")
        # Child (content) rows have no tags and only a size value.
        if not tags or len(values) < 3:
            return

        folder_path = tags[0]
        size_val, add_btn, remove_btn = values[0], values[1], values[2]

        if column == "#2" and "[+]" in add_btn:
            # Update the row first for a snappy response, then write the file.
            self.tree.item(item, values=(size_val, "", "[-]"), tags=(folder_path, "added"))
            self.selected_folder = folder_path
            self.root.after(0, self._add_to_paths_file_async, folder_path)
        elif column == "#3" and "[-]" in remove_btn:
            self.tree.item(item, values=(size_val, "[+]", ""), tags=(folder_path, "not_added"))
            self.root.after(0, self._remove_from_paths_file_async, folder_path)

    def _add_to_paths_file_async(self, folder_path):
        """Deferred (``after``) handler that adds *folder_path* to paths.txt."""
        self.selected_folder = folder_path
        self._add_to_paths_file()
        self._load_existing_paths()
        # The row was updated optimistically; reconcile it with the file in
        # case the write failed.
        self._sync_row_states()

    def _remove_from_paths_file_async(self, folder_path):
        """Deferred (``after``) handler that removes *folder_path* from paths.txt."""
        self._remove_from_paths_file(folder_path)
        self._sync_row_states()

    def _row_state(self, folder_path_str):
        """Return ``(add_button, remove_button, status_tag)`` for a folder path."""
        if folder_path_str in self.added_folders:
            return "", "[-]", "added"
        return "[+]", "", "not_added"

    def _sync_row_states(self):
        """Make every top-level row's buttons and colour match ``added_folders``."""
        for item in self.tree.get_children():
            tags = self.tree.item(item, "tags")
            values = self.tree.item(item, "values")
            if not tags or not values:
                continue
            folder_path = tags[0]
            add_btn, remove_btn, tag = self._row_state(folder_path)
            self.tree.item(
                item,
                values=(values[0], add_btn, remove_btn),
                tags=(folder_path, tag),
            )

    # ------------------------------------------------- command / paths.txt

    def _build_option_args(self):
        """Return the option strings for the current mode, resolution and CQ.

        Options left at their default ("default" mode, "none" resolution,
        empty CQ) are omitted.
        """
        args = []

        mode = self.mode_var.get()
        if mode != "default":
            args.append(f'--m {mode}')

        resolution = self.resolution_var.get()
        if resolution != "none":
            args.append(f'--r {resolution}')

        cq = self.cq_var.get().strip()
        if cq:
            args.append(f'--cq {cq}')

        return args

    def _update_preview(self):
        """Update the command preview for the selected folder."""
        if not self.selected_folder:
            preview_text = "No folder selected"
        else:
            cmd_parts = ['py main.py', *self._build_option_args()]
            cmd_parts.append(f'"{self.selected_folder}"')
            preview_text = " ".join(cmd_parts)

        # The widget is kept read-only; enable it only while rewriting.
        self.preview_text.config(state=tk.NORMAL)
        self.preview_text.delete("1.0", tk.END)
        self.preview_text.insert("1.0", preview_text)
        self.preview_text.config(state=tk.DISABLED)

    def _load_existing_paths(self):
        """Reload ``added_folders`` from paths.txt.

        Each non-empty line has the form ``--m mode --r res --cq val "path"``
        or just ``"path"``; the last double-quoted string on the line is
        taken as the folder path.
        """
        self.added_folders.clear()

        if not self.paths_file.exists():
            return

        try:
            with open(self.paths_file, "r", encoding="utf-8") as f:
                for line in f:
                    matches = self._QUOTED_RE.findall(line.strip())
                    if matches:
                        self.added_folders.add(matches[-1])
        except (OSError, UnicodeError) as e:
            logger.error(f"Failed to load existing paths: {e}")

    def _add_to_paths_file(self):
        """Append the command line for the selected folder to paths.txt.

        Does nothing except show a status message if the folder is already
        listed. A missing trailing newline in the file is repaired before
        appending so the new entry always starts on its own line.
        """
        if not self.selected_folder:
            messagebox.showwarning("Warning", "Please select a folder first")
            return

        folder_path = self.selected_folder

        # Check if already in file
        if folder_path in self.added_folders:
            self._flash_status(f"Already added: {Path(folder_path).name}")
            return

        line = " ".join([*self._build_option_args(), f'"{folder_path}"'])

        try:
            needs_newline = False
            if self.paths_file.exists() and self.paths_file.stat().st_size > 0:
                # Inspect the last byte to see whether the file ends with a newline
                with open(self.paths_file, "rb") as f:
                    f.seek(-1, os.SEEK_END)
                    needs_newline = f.read(1) != b'\n'

            with open(self.paths_file, "a", encoding="utf-8") as f:
                if needs_newline:
                    f.write("\n")
                f.write(line + "\n")
        except OSError as e:
            messagebox.showerror("Error", f"Failed to write to paths.txt: {e}")
            logger.error(f"Failed to write to paths.txt: {e}")
            return

        self.added_folders.add(folder_path)
        self.recently_added = folder_path

        # Silent success - show status label instead of popup
        self._flash_status(f"✓ Added: {Path(folder_path).name}")
        logger.info(f"Added to paths.txt: {line}")

    def _remove_from_paths_file(self, folder_path):
        """Remove every paths.txt line that contains the quoted *folder_path*.

        Both double- and single-quoted occurrences are matched.
        """
        if not self.paths_file.exists():
            messagebox.showwarning("Warning", "paths.txt does not exist")
            return

        quoted_forms = (f'"{folder_path}"', f"'{folder_path}'")

        try:
            with open(self.paths_file, "r", encoding="utf-8") as f:
                lines = f.readlines()

            new_lines = [
                line for line in lines
                if not any(form in line for form in quoted_forms)
            ]

            if len(new_lines) == len(lines):
                messagebox.showwarning("Warning", "Path not found in paths.txt")
                return

            with open(self.paths_file, "w", encoding="utf-8") as f:
                f.writelines(new_lines)
        except (OSError, UnicodeError) as e:
            messagebox.showerror("Error", f"Failed to remove from paths.txt: {e}")
            logger.error(f"Failed to remove from paths.txt: {e}")
            return

        self.added_folders.discard(folder_path)

        self._flash_status(f"✓ Removed: {Path(folder_path).name}")
        logger.info(f"Removed from paths.txt: {folder_path}")

    # -------------------------------------------------------------- status

    def _show_status(self, message):
        """Show status message in label."""
        self.status_label.config(text=message, foreground="green")

    def _flash_status(self, message):
        """Show *message* and clear it after a few seconds.

        Any pending clear is cancelled first so an older timer cannot wipe
        the newer message early.
        """
        self._show_status(message)
        if self.status_timer:
            self.root.after_cancel(self.status_timer)
        self.status_timer = self.root.after(self._STATUS_TIMEOUT_MS, self._clear_status)

    def _clear_status(self):
        """Clear status message after delay."""
        self.status_label.config(text="")
        self.status_timer = None

    # ------------------------------------------------------------- actions

    def _run_transcode(self):
        """Launch transcode.bat in a new command window."""
        if not self.transcode_bat.exists():
            messagebox.showerror("Error", f"transcode.bat not found at {self.transcode_bat}")
            return

        try:
            # Pass the path as a plain list element: subprocess quotes it as
            # needed. Embedding literal quotes would get them backslash-
            # escaped, which cmd.exe does not understand.
            subprocess.Popen(
                ['cmd', '/c', str(self.transcode_bat)],
                cwd=str(self.transcode_bat.parent),
                creationflags=subprocess.CREATE_NEW_CONSOLE,
            )
            logger.info("Launched transcode.bat")
        except Exception as e:
            # UI boundary: report anything (including CREATE_NEW_CONSOLE
            # being unavailable off Windows) instead of crashing the callback.
            messagebox.showerror("Error", f"Failed to launch transcode.bat: {e}")
            logger.error(f"Failed to launch transcode.bat: {e}")

    def _view_paths_file(self):
        """Open paths.txt in a new window."""
        if not self.paths_file.exists():
            messagebox.showinfo("Info", "paths.txt does not exist yet")
            return

        try:
            content = self.paths_file.read_text(encoding="utf-8")
        except (OSError, UnicodeError) as e:
            messagebox.showerror("Error", f"Failed to read paths.txt: {e}")
            return

        view_window = tk.Toplevel(self.root)
        view_window.title("paths.txt")
        view_window.geometry("800x400")

        text_widget = tk.Text(view_window, wrap=tk.WORD)
        text_widget.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
        text_widget.insert("1.0", content)

        ttk.Button(view_window, text="Close", command=view_window.destroy).pack(pady=5)

    def _clear_paths_file(self):
        """Empty paths.txt after confirmation and reset the rows' added state."""
        if not self.paths_file.exists():
            messagebox.showinfo("Info", "paths.txt does not exist")
            return

        if not messagebox.askyesno("Confirm", "Are you sure you want to clear paths.txt?"):
            return

        try:
            self.paths_file.write_text("", encoding="utf-8")
        except OSError as e:
            messagebox.showerror("Error", f"Failed to clear paths.txt: {e}")
            return

        # Nothing is queued any more: forget the tracked paths and un-mark
        # the rows, otherwise they stay green and cannot be re-added.
        self.added_folders.clear()
        self.recently_added = None
        self._sync_row_states()

        messagebox.showinfo("Success", "paths.txt has been cleared")
        logger.info("paths.txt cleared")

    def _on_closing(self):
        """Handle window closing - cancel the pending status timer, then destroy."""
        if self.status_timer:
            self.root.after_cancel(self.status_timer)
            self.status_timer = None
        self.root.destroy()

    # -------------------------------------------------------- lazy loading

    def _maybe_load_more(self, last_visible):
        """Load the next batch when the view is scrolled past the threshold.

        Args:
            last_visible: Fraction (0..1) of the list at the bottom edge of
                the visible area.
        """
        if not self.all_folders or self.loaded_items >= len(self.all_folders):
            return
        if last_visible > self._LOAD_MORE_THRESHOLD:
            self._load_more_items()

    def _on_tree_yview(self, first, last):
        """Tree ``yscrollcommand``: update the scrollbar and load more rows if needed."""
        self.scrollbar.set(first, last)
        self._maybe_load_more(float(last))

    def _on_scrollbar(self, *args):
        """Handle scrollbar movement - load more items when scrolling."""
        self.tree.yview(*args)
        self._maybe_load_more(self.tree.yview()[1])

    def _load_more_items(self):
        """Insert the next ``items_per_batch`` folders into the tree."""
        start = self.loaded_items
        end = min(start + self.items_per_batch, len(self.all_folders))

        for folder_name, folder_path, size in self.all_folders[start:end]:
            folder_path_str = str(folder_path)
            add_btn, remove_btn, tag = self._row_state(folder_path_str)

            # First tag is the folder path (read back by the handlers),
            # second tag selects the row colour.
            self.tree.insert(
                "",
                "end",
                text=folder_name,
                values=(self._format_size(size), add_btn, remove_btn),
                tags=(folder_path_str, tag),
            )

        self.loaded_items = end


def main():
    """Create the main window and run the Tk event loop."""
    root = tk.Tk()
    app = PathManagerGUI(root)
    root.mainloop()


if __name__ == "__main__":
    main()