#!/usr/bin/env python3
"""
GUI Path Manager for Batch Video Transcoder

Allows easy browsing of folders and appending to paths.txt with encoding
options.
"""

import tkinter as tk
from tkinter import ttk, messagebox, filedialog
from pathlib import Path
import os
import subprocess
import re
import json

from core.config_helper import load_config_xml
from core.logger_helper import setup_logger

logger = setup_logger(Path(__file__).parent / "logs")


class PathManagerGUI:
    """Tk window for browsing category folders and editing paths.txt.

    The left pane lists the top-level folders of the selected category
    (largest first) with clickable ``[+]`` / ``[-]`` cells that add the folder
    to, or remove it from, ``paths.txt``. The right pane holds the encoding
    options and a preview of the resulting command line.
    """

    # Category name -> key looked up in the ``path_mappings`` config section.
    # The key doubles as the directory that is scanned for that category.
    CATEGORY_ROOTS = {
        "tv": "P:\\tv",
        "anime": "P:\\anime",
        "movies": "P:\\movies",
    }

    # How long a status message stays visible, in milliseconds.
    STATUS_CLEAR_MS = 3000

    # Matches every double-quoted string on a paths.txt line.
    _QUOTED_RE = re.compile(r'"([^"]*)"')

    def __init__(self, root):
        """Load configuration and existing paths, then build the window.

        Args:
            root: The ``tk.Tk`` root window this GUI is attached to.
        """
        self.root = root
        self.root.title("Batch Transcoder - Path Manager")
        self.root.geometry("1100x700")

        # Load config
        config_path = Path(__file__).parent / "config.xml"
        self.config = load_config_xml(config_path)
        self.path_mappings = self.config.get("path_mappings", {})

        # Paths file
        self.paths_file = Path(__file__).parent / "paths.txt"
        self.transcode_bat = Path(__file__).parent / "transcode.bat"

        # Current selected folder
        self.selected_folder = None
        self.current_category = None
        self.recently_added = None  # Track recently added folder for highlighting
        self.status_timer = None  # Track status message timer
        self.added_folders = set()  # Folders that are in paths.txt

        # Cache for folder data - split per category
        self.cache_dir = Path(__file__).parent / ".cache"
        self.cache_dir.mkdir(exist_ok=True)
        self.folder_cache = {}  # Only current category in memory: {folder_path: size}
        self.scan_in_progress = False
        self.scanned_categories = set()  # Track which categories have been scanned

        # Lazy loading
        self.all_folders = []  # All folders for current category
        self.loaded_items = 0  # How many items are currently loaded
        self.items_per_batch = 100  # Load 100 items at a time

        # Load existing paths
        self._load_existing_paths()

        # Build UI
        self._build_ui()

        # Handle window close
        self.root.protocol("WM_DELETE_WINDOW", self._on_closing)

    # ------------------------------------------------------------------
    # UI construction
    # ------------------------------------------------------------------
    def _build_ui(self):
        """Build the GUI layout and show the initial category."""
        # Top frame for category selection and transcode launcher
        top_frame = ttk.Frame(self.root)
        top_frame.pack(fill=tk.X, padx=10, pady=10)

        left_top = ttk.Frame(top_frame)
        left_top.pack(side=tk.LEFT, fill=tk.X, expand=True)

        ttk.Label(left_top, text="Category:").pack(side=tk.LEFT, padx=5)
        self.category_var = tk.StringVar(value="tv")
        self.current_category = self.category_var.get()
        categories = ["tv", "anime", "movies"]
        for cat in categories:
            ttk.Radiobutton(
                left_top,
                text=cat.upper(),
                variable=self.category_var,
                value=cat,
                command=self._on_category_change,
            ).pack(side=tk.LEFT, padx=5)

        ttk.Button(
            left_top, text="Refresh", command=self._refresh_with_cache_clear
        ).pack(side=tk.LEFT, padx=5)

        # Right side of top frame - transcode launcher
        right_top = ttk.Frame(top_frame)
        right_top.pack(side=tk.RIGHT)

        if self.transcode_bat.exists():
            ttk.Button(
                right_top,
                text="▶ Run transcode.bat",
                command=self._run_transcode,
            ).pack(side=tk.RIGHT, padx=5)

        # Main content frame
        main_frame = ttk.Frame(self.root)
        main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)

        # Left side - folder tree
        left_frame = ttk.LabelFrame(main_frame, text="Folders (sorted by size)")
        left_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(0, 5))

        # Treeview for folders with add button column
        self.tree = ttk.Treeview(
            left_frame, columns=("size", "add", "remove"), height=20
        )
        self.tree.column("#0", width=180)
        self.tree.column("size", width=80)
        self.tree.column("add", width=50)
        self.tree.column("remove", width=50)
        self.tree.heading("#0", text="Folder Name")
        self.tree.heading("size", text="Size")
        self.tree.heading("add", text="Add")
        self.tree.heading("remove", text="Remove")

        scrollbar = ttk.Scrollbar(
            left_frame, orient=tk.VERTICAL, command=self._on_scrollbar
        )
        self.tree.configure(yscrollcommand=scrollbar.set)

        self.tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        scrollbar.pack(side=tk.RIGHT, fill=tk.Y)

        # Configure tags for folder status
        self.tree.tag_configure("added", background="#90EE90")  # Light green for added
        self.tree.tag_configure("not_added", background="white")  # White for not added
        self.tree.tag_configure("recently_added", background="#FFD700")  # Gold for recently added

        # NOTE(review): the event sequences below were empty/garbled in the
        # file as received; they are restored from what each handler's
        # docstring describes (double-click, selection, click) - confirm.
        self.tree.bind("<Double-1>", self._on_folder_expand)
        self.tree.bind("<<TreeviewSelect>>", self._on_folder_select)
        self.tree.bind("<Button-1>", self._on_tree_click)

        # Right side - options and preview
        right_frame = ttk.LabelFrame(main_frame, text="Encoding Options & Preview")
        right_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True, padx=(5, 0))

        # Mode selection
        mode_frame = ttk.LabelFrame(right_frame, text="Mode (--m)")
        mode_frame.pack(fill=tk.X, padx=5, pady=5)

        self.mode_var = tk.StringVar(value="default")
        for mode in ["default", "cq", "bitrate"]:
            ttk.Radiobutton(
                mode_frame,
                text=mode,
                variable=self.mode_var,
                value=mode,
                command=self._update_preview,
            ).pack(anchor=tk.W, padx=5)

        # Resolution selection
        res_frame = ttk.LabelFrame(right_frame, text="Resolution (--r)")
        res_frame.pack(fill=tk.X, padx=5, pady=5)

        self.resolution_var = tk.StringVar(value="none")
        for res in ["none", "480", "720", "1080"]:
            label = "Auto" if res == "none" else res + "p"
            ttk.Radiobutton(
                res_frame,
                text=label,
                variable=self.resolution_var,
                value=res,
                command=self._update_preview,
            ).pack(anchor=tk.W, padx=5)

        # CQ value
        cq_frame = ttk.LabelFrame(right_frame, text="CQ Value (--cq, optional)")
        cq_frame.pack(fill=tk.X, padx=5, pady=5)

        self.cq_var = tk.StringVar(value="")
        cq_entry = ttk.Entry(cq_frame, textvariable=self.cq_var, width=10)
        cq_entry.pack(anchor=tk.W, padx=5, pady=3)
        # Refresh the preview as the user types (sequence restored, see note above).
        cq_entry.bind("<KeyRelease>", lambda e: self._update_preview())

        # Preview frame
        preview_frame = ttk.LabelFrame(right_frame, text="Command Preview")
        preview_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)

        self.preview_text = tk.Text(
            preview_frame, height=8, width=40, wrap=tk.WORD, bg="lightgray"
        )
        self.preview_text.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
        self.preview_text.config(state=tk.DISABLED)

        # Bottom frame - action buttons and status
        bottom_frame = ttk.Frame(self.root)
        bottom_frame.pack(fill=tk.X, padx=10, pady=10)

        button_frame = ttk.Frame(bottom_frame)
        button_frame.pack(side=tk.LEFT)

        ttk.Button(
            button_frame, text="View paths.txt", command=self._view_paths_file
        ).pack(side=tk.LEFT, padx=5)
        ttk.Button(
            button_frame, text="Clear paths.txt", command=self._clear_paths_file
        ).pack(side=tk.LEFT, padx=5)

        # Status label (for silent feedback)
        self.status_label = ttk.Label(bottom_frame, text="", foreground="green")
        self.status_label.pack(side=tk.LEFT, padx=10)

        # Load cache and populate initial category
        self._load_cache()
        self._refresh_folders(use_cache=True)

        # Only scan once per category on first view
        if self.current_category not in self.scanned_categories:
            self.root.after(100, self._scan_folders_once)

    def _on_category_change(self):
        """Handle category radio button change."""
        self.current_category = self.category_var.get()

        # Load cache for this category
        self._load_cache()

        # Show cached data first
        self._refresh_folders(use_cache=True)

        # Only scan once per category on first view
        if self.current_category not in self.scanned_categories:
            self.root.after(100, self._scan_folders_once)

    # ------------------------------------------------------------------
    # Cache handling
    # ------------------------------------------------------------------
    def _cache_file_for(self, category):
        """Return the on-disk cache file used for ``category``."""
        return self.cache_dir / f".cache_{category}.json"

    def _resolve_base_key(self, category):
        """Return the root key for ``category``, or None if it is not mapped.

        A category is usable only when it is known to ``CATEGORY_ROOTS`` and
        its root key is present in the configured ``path_mappings``.
        """
        base_key = self.CATEGORY_ROOTS.get(category)
        if not base_key or base_key not in self.path_mappings:
            return None
        return base_key

    def _load_cache(self):
        """Reset the in-memory cache and log whether a cache file exists.

        The file itself is not read here; ``_parse_cache_lazily`` does that
        when the tree is populated.
        """
        category = self.category_var.get()
        cache_file = self._cache_file_for(category)

        self.folder_cache.clear()

        # Don't fully load cache yet - just verify it exists
        if not cache_file.exists():
            logger.info(f"No cache file for {category}")
        else:
            logger.info(f"Cache file exists for {category}")

    def _parse_cache_lazily(self, limit=None):
        """Read the current category's cache file and return its folders.

        Args:
            limit: Optional maximum number of folders to collect. Entries are
                taken in file order, so a limit does not guarantee the
                largest folders are the ones returned.

        Returns:
            A list of ``(name, Path, size)`` tuples for cached folders that
            still exist, sorted by size descending. Empty when there is no
            cache file; possibly partial when the file cannot be parsed.
        """
        category = self.category_var.get()
        cache_file = self._cache_file_for(category)

        folders = []
        if not cache_file.exists():
            return folders

        try:
            with open(cache_file, "r", encoding="utf-8") as f:
                cache_dict = json.load(f)

            # Convert to list and sort
            for folder_path_str, size in cache_dict.items():
                folder_path = Path(folder_path_str)
                if folder_path.exists():
                    folders.append((folder_path.name, folder_path, size))

                # Early exit if limit reached
                if limit and len(folders) >= limit:
                    break

            # Sort by size descending (only what we loaded)
            folders.sort(key=lambda x: x[2], reverse=True)
        except (OSError, ValueError, TypeError, AttributeError) as e:
            # ValueError covers malformed JSON / bad encoding; TypeError and
            # AttributeError cover a cache file with an unexpected shape.
            logger.error(f"Failed to parse cache: {e}")

        return folders

    def _save_cache(self):
        """Save current category's folder cache to disk."""
        category = self.category_var.get()
        cache_file = self._cache_file_for(category)

        try:
            with open(cache_file, "w", encoding="utf-8") as f:
                json.dump(self.folder_cache, f, indent=2)
        except Exception as e:
            logger.error(f"Failed to save {category} cache: {e}")

    def _refresh_with_cache_clear(self):
        """Delete the current category's cache and rescan it from disk."""
        category = self.category_var.get()
        cache_file = self._cache_file_for(category)

        # Delete cache file for this category
        if cache_file.exists():
            cache_file.unlink()

        self.folder_cache.clear()
        self.scanned_categories.discard(category)  # Reset so it will scan again
        self._refresh_folders(use_cache=False)

    # ------------------------------------------------------------------
    # Scanning
    # ------------------------------------------------------------------
    def _scan_category_root(self, base_path):
        """Return ``{folder_path_str: size_in_bytes}`` for dirs in ``base_path``.

        Symlinked directories are skipped. Errors opening ``base_path``
        propagate to the caller as ``OSError``.
        """
        sizes = {}
        with os.scandir(base_path) as entries:
            for entry in entries:
                if entry.is_dir(follow_symlinks=False):
                    folder = Path(entry)
                    sizes[str(folder)] = self._get_folder_size(folder)
        return sizes

    def _scan_folders_once(self):
        """Scan the current category from disk the first time it is viewed."""
        if self.scan_in_progress:
            return

        category = self.category_var.get()
        if category in self.scanned_categories:
            return  # Already scanned this category

        self.scan_in_progress = True
        try:
            base_key = self._resolve_base_key(category)
            if base_key is None:
                return

            base_path = Path(base_key)
            if not base_path.exists():
                return

            # Scan folders and update cache
            try:
                new_cache = self._scan_category_root(base_path)
            except OSError as e:
                # Leave the category unmarked so a later view retries.
                logger.error(f"Failed to scan {base_path}: {e}")
                return

            # Update cache and save
            self.folder_cache = new_cache
            self._save_cache()
            self.scanned_categories.add(category)

            # Update UI if still on same category
            if self.category_var.get() == category:
                self._refresh_folders(use_cache=True)
        finally:
            self.scan_in_progress = False

    def _scan_folders_background(self):
        """Rescan the current category and refresh the cache and tree.

        Nothing in this file calls this method or defines the attributes it
        reschedules with, so rescheduling is skipped unless both
        ``background_scan_interval`` and ``_continuous_background_scan`` exist
        on the instance.
        """
        if self.scan_in_progress:
            return

        self.scan_in_progress = True
        try:
            category = self.category_var.get()
            base_key = self._resolve_base_key(category)
            if base_key is None:
                return

            base_path = Path(base_key)
            if not base_path.exists():
                return

            # Scan folders and update cache
            try:
                new_cache = self._scan_category_root(base_path)
            except OSError as e:
                logger.error(f"Failed to scan {base_path}: {e}")
                return

            # The in-memory cache is flat ({path: size}) for the current
            # category only; nesting it under the category name would
            # corrupt the JSON that _parse_cache_lazily reads back.
            self.folder_cache = new_cache
            self._save_cache()

            # Update UI if still on same category
            if self.category_var.get() == category:
                self._refresh_folders(use_cache=True)
        finally:
            self.scan_in_progress = False
            self._schedule_next_background_scan()

    def _schedule_next_background_scan(self):
        """Schedule the next continuous scan, if one is configured."""
        interval = getattr(self, "background_scan_interval", None)
        callback = getattr(self, "_continuous_background_scan", None)
        if interval is None or callback is None:
            return
        self.background_scan_timer = self.root.after(interval, callback)

    def _load_existing_paths(self):
        """Rebuild ``added_folders`` from the quoted paths in paths.txt.

        Each non-empty line has the form ``--m mode --r res --cq val "path"``
        or just ``"path"``; the last double-quoted string on the line is
        taken as the folder path. Lines without a quoted string are ignored.
        """
        self.added_folders.clear()

        if not self.paths_file.exists():
            return

        try:
            with open(self.paths_file, "r", encoding="utf-8") as f:
                for raw_line in f:
                    line = raw_line.strip()
                    if not line:
                        continue

                    matches = self._QUOTED_RE.findall(line)
                    if matches:
                        # Last quoted string is the path
                        self.added_folders.add(matches[-1])
        except Exception as e:
            logger.error(f"Failed to load existing paths: {e}")

    def _get_folder_size(self, path: Path) -> int:
        """Return the total size in bytes of the files under ``path``.

        Symlinks are not followed. If a directory cannot be read (permission
        denied, removed mid-scan, ...), what was counted so far is returned
        instead of raising.
        """
        total = 0
        try:
            with os.scandir(path) as entries:
                for entry in entries:
                    if entry.is_file(follow_symlinks=False):
                        total += entry.stat().st_size
                    elif entry.is_dir(follow_symlinks=False):
                        total += self._get_folder_size(Path(entry))
        except OSError:
            # Best effort: an unreadable directory contributes a partial size.
            pass
        return total

    def _format_size(self, bytes_size: int) -> str:
        """Format a byte count as a human readable size, e.g. ``1.5 GB``."""
        size = float(bytes_size)
        for unit in ["B", "KB", "MB", "GB", "TB"]:
            if size < 1024:
                return f"{size:.1f} {unit}"
            size /= 1024
        return f"{size:.1f} PB"

    def _refresh_folders(self, use_cache=False):
        """Repopulate the folder tree for the current category.

        Args:
            use_cache: When true, read folders from the category's cache
                file; otherwise scan the category root on disk, then rewrite
                the cache and mark the category as scanned.
        """
        # Clear existing items
        for item in self.tree.get_children():
            self.tree.delete(item)

        self.all_folders = []
        self.loaded_items = 0

        category = self.category_var.get()

        base_key = self._resolve_base_key(category)
        if base_key is None:
            messagebox.showwarning("Info", f"No mapping found for {category}")
            return

        base_path = Path(base_key)

        # Check if path exists
        if not base_path.exists():
            messagebox.showerror("Error", f"Path not found: {base_path}")
            return

        # Get folders from cache or disk
        if use_cache:
            folders = self._parse_cache_lazily(limit=None)
        else:
            # Scan from disk
            try:
                sizes = self._scan_category_root(base_path)
            except PermissionError:
                messagebox.showerror(
                    "Error", f"Permission denied accessing {base_path}"
                )
                return
            except OSError as e:
                messagebox.showerror("Error", f"Failed to scan {base_path}: {e}")
                return

            folders = [
                (Path(path_str).name, Path(path_str), size)
                for path_str, size in sizes.items()
            ]

            # Update cache with fresh scan
            self.folder_cache = sizes
            self._save_cache()
            # A full scan just happened, so the once-per-category scan would
            # only repeat the same work.
            self.scanned_categories.add(category)

        # Sort by size descending
        folders.sort(key=lambda x: x[2], reverse=True)

        # Store all folders and load first batch only
        self.all_folders = folders
        self._load_more_items()

    # ------------------------------------------------------------------
    # Tree events
    # ------------------------------------------------------------------
    def _on_folder_expand(self, event):
        """Toggle the listing of a folder's contents under its tree row."""
        selection = self.tree.selection()
        if not selection:
            return

        item = selection[0]
        tags = self.tree.item(item, "tags")

        # Rows inserted as folder contents carry no tags and are not expandable.
        if not tags:
            return

        folder_path = Path(tags[0])

        children = self.tree.get_children(item)
        if children:
            # Toggle: remove children
            for child in children:
                self.tree.delete(child)
            return

        # Add file/folder contents
        try:
            entries = []
            with os.scandir(folder_path) as scan:
                for entry in scan:
                    if entry.is_file():
                        entries.append((entry.name, "File", entry.stat().st_size))
                    elif entry.is_dir():
                        size = self._get_folder_size(Path(entry))
                        entries.append((entry.name, "Folder", size))
        except PermissionError:
            messagebox.showerror(
                "Error", f"Permission denied accessing {folder_path}"
            )
            return
        except OSError as e:
            logger.error(f"Failed to read {folder_path}: {e}")
            messagebox.showerror("Error", f"Failed to read {folder_path}: {e}")
            return

        # Sort by size descending
        entries.sort(key=lambda x: x[2], reverse=True)

        for name, type_str, size in entries:
            self.tree.insert(
                item,
                "end",
                text=f"[{type_str}] {name}",
                values=(self._format_size(size),),
            )

    def _on_folder_select(self, event):
        """Remember the selected folder and update the command preview."""
        selection = self.tree.selection()
        if not selection:
            return

        tags = self.tree.item(selection[0], "tags")
        if tags:
            self.selected_folder = tags[0]
            self._update_preview()

    def _on_tree_click(self, event):
        """Handle a click on the ``[+]`` or ``[-]`` cell of a folder row.

        Columns are #0=name, #1=size, #2=add, #3=remove. A click only acts
        when the clicked cell actually shows its marker, so clicking the
        empty cell of the opposite column does nothing.
        """
        item = self.tree.identify_row(event.y)
        column = self.tree.identify_column(event.x)
        if not item or column not in ("#2", "#3"):
            return

        tags = self.tree.item(item, "tags")
        values = self.tree.item(item, "values")
        # Content rows have no tags and a single value; ignore them.
        if not tags or len(values) < 3:
            return

        folder_path = tags[0]
        size_val, add_text, remove_text = values[0], values[1], values[2]

        if column == "#2" and "[+]" in add_text:
            # Immediately update UI for snappy response
            self.tree.item(
                item, values=(size_val, "", "[-]"), tags=(folder_path, "added")
            )
            # Add to paths.txt asynchronously
            self.selected_folder = folder_path
            self.root.after(0, self._add_to_paths_file_async, folder_path)
        elif column == "#3" and "[-]" in remove_text:
            # Immediately update UI for snappy response
            self.tree.item(
                item,
                values=(size_val, "[+]", ""),
                tags=(folder_path, "not_added"),
            )
            # Remove from paths.txt asynchronously
            self.root.after(0, self._remove_from_paths_file_async, folder_path)

    def _add_to_paths_file_async(self, folder_path):
        """Add ``folder_path`` to paths.txt from a deferred Tk callback."""
        self.selected_folder = folder_path
        self._add_to_paths_file()
        # Silently reload in background
        self._load_existing_paths()

    def _remove_from_paths_file_async(self, folder_path):
        """Remove ``folder_path`` from paths.txt from a deferred Tk callback."""
        self._remove_from_paths_file(folder_path)

    # ------------------------------------------------------------------
    # Command building / paths.txt editing
    # ------------------------------------------------------------------
    def _build_option_parts(self):
        """Return the ``--m`` / ``--r`` / ``--cq`` parts for the current options."""
        parts = []

        # Add mode if not default
        mode = self.mode_var.get()
        if mode != "default":
            parts.append(f'--m {mode}')

        # Add resolution if specified
        resolution = self.resolution_var.get()
        if resolution != "none":
            parts.append(f'--r {resolution}')

        # Add CQ if specified
        cq = self.cq_var.get().strip()
        if cq:
            parts.append(f'--cq {cq}')

        return parts

    def _update_preview(self):
        """Update the command preview."""
        if not self.selected_folder:
            preview_text = "No folder selected"
        else:
            cmd_parts = ['py main.py', *self._build_option_parts()]
            cmd_parts.append(f'"{self.selected_folder}"')
            preview_text = " ".join(cmd_parts)

        # The widget is kept read-only; enable it just long enough to rewrite.
        self.preview_text.config(state=tk.NORMAL)
        self.preview_text.delete("1.0", tk.END)
        self.preview_text.insert("1.0", preview_text)
        self.preview_text.config(state=tk.DISABLED)

    def _add_to_paths_file(self):
        """Append the command line for the selected folder to paths.txt."""
        if not self.selected_folder:
            messagebox.showwarning("Warning", "Please select a folder first")
            return

        folder_path = self.selected_folder

        # Check if already in file
        if folder_path in self.added_folders:
            self._show_status(f"Already added: {Path(folder_path).name}")
            self._schedule_status_clear()
            return

        cmd_parts = self._build_option_parts()
        cmd_parts.append(f'"{folder_path}"')
        line = " ".join(cmd_parts)

        # Append to paths.txt
        try:
            needs_newline = False
            if self.paths_file.exists() and self.paths_file.stat().st_size > 0:
                # A file that does not end in a newline would otherwise get
                # the new entry glued onto its last line.
                with open(self.paths_file, "rb") as f:
                    f.seek(-1, 2)  # Seek to last byte
                    needs_newline = f.read(1) != b'\n'

            # Write to file
            with open(self.paths_file, "a", encoding="utf-8") as f:
                if needs_newline:
                    f.write("\n")
                f.write(line + "\n")

            # Add to tracked set
            self.added_folders.add(folder_path)

            # Silent success - show status label instead of popup
            self.recently_added = folder_path
            self._show_status(f"✓ Added: {Path(folder_path).name}")
            logger.info(f"Added to paths.txt: {line}")
            self._schedule_status_clear()
        except Exception as e:
            messagebox.showerror("Error", f"Failed to write to paths.txt: {e}")
            logger.error(f"Failed to write to paths.txt: {e}")

    def _remove_from_paths_file(self, folder_path):
        """Remove every paths.txt line that quotes ``folder_path``."""
        if not self.paths_file.exists():
            messagebox.showwarning("Warning", "paths.txt does not exist")
            return

        try:
            with open(self.paths_file, "r", encoding="utf-8") as f:
                lines = f.readlines()

            # Filter out lines containing this folder path
            needles = (f'"{folder_path}"', f"'{folder_path}'")
            new_lines = [
                line for line in lines
                if not any(needle in line for needle in needles)
            ]

            if len(new_lines) == len(lines):
                messagebox.showwarning("Warning", "Path not found in paths.txt")
                return

            # Write back
            with open(self.paths_file, "w", encoding="utf-8") as f:
                f.writelines(new_lines)

            # Remove from tracked set
            self.added_folders.discard(folder_path)

            self._show_status(f"✓ Removed: {Path(folder_path).name}")
            logger.info(f"Removed from paths.txt: {folder_path}")
            self._schedule_status_clear()
        except Exception as e:
            messagebox.showerror("Error", f"Failed to remove from paths.txt: {e}")
            logger.error(f"Failed to remove from paths.txt: {e}")

    # ------------------------------------------------------------------
    # Status label
    # ------------------------------------------------------------------
    def _show_status(self, message):
        """Show status message in label."""
        self.status_label.config(text=message, foreground="green")

    def _schedule_status_clear(self):
        """(Re)start the timer that blanks the status label."""
        if self.status_timer:
            self.root.after_cancel(self.status_timer)
        self.status_timer = self.root.after(
            self.STATUS_CLEAR_MS, self._clear_status
        )

    def _clear_status(self):
        """Clear status message after delay."""
        self.status_label.config(text="")
        self.status_timer = None

    # ------------------------------------------------------------------
    # Buttons
    # ------------------------------------------------------------------
    def _run_transcode(self):
        """Launch transcode.bat in a new command window."""
        if not self.transcode_bat.exists():
            messagebox.showerror(
                "Error", f"transcode.bat not found at {self.transcode_bat}"
            )
            return

        try:
            # Pass the bare path: subprocess quotes list arguments itself, and
            # pre-quoting makes it emit backslash-escaped quotes that cmd.exe
            # does not understand.
            subprocess.Popen(
                ['cmd', '/c', str(self.transcode_bat)],
                cwd=str(self.transcode_bat.parent),
                creationflags=subprocess.CREATE_NEW_CONSOLE,
            )
            logger.info("Launched transcode.bat")
        except Exception as e:
            messagebox.showerror("Error", f"Failed to launch transcode.bat: {e}")
            logger.error(f"Failed to launch transcode.bat: {e}")

    def _view_paths_file(self):
        """Open paths.txt in a new window."""
        if not self.paths_file.exists():
            messagebox.showinfo("Info", "paths.txt does not exist yet")
            return

        try:
            with open(self.paths_file, "r", encoding="utf-8") as f:
                content = f.read()

            # Create new window
            view_window = tk.Toplevel(self.root)
            view_window.title("paths.txt")
            view_window.geometry("800x400")

            text_widget = tk.Text(view_window, wrap=tk.WORD)
            text_widget.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
            text_widget.insert("1.0", content)

            # Add close button
            ttk.Button(
                view_window, text="Close", command=view_window.destroy
            ).pack(pady=5)
        except Exception as e:
            messagebox.showerror("Error", f"Failed to read paths.txt: {e}")

    def _clear_paths_file(self):
        """Empty paths.txt after confirmation and reset the tree markers."""
        if not self.paths_file.exists():
            messagebox.showinfo("Info", "paths.txt does not exist")
            return

        if not messagebox.askyesno(
            "Confirm", "Are you sure you want to clear paths.txt?"
        ):
            return

        try:
            self.paths_file.write_text("", encoding="utf-8")
        except Exception as e:
            messagebox.showerror("Error", f"Failed to clear paths.txt: {e}")
            return

        # Keep the in-memory state and the visible rows in step with the
        # now-empty file; otherwise rows stay green and re-adding a folder
        # is rejected as "Already added".
        self.added_folders.clear()
        self._mark_all_rows_not_added()

        messagebox.showinfo("Success", "paths.txt has been cleared")
        logger.info("paths.txt cleared")

    def _mark_all_rows_not_added(self):
        """Reset every top-level folder row to the not-added appearance."""
        for item in self.tree.get_children():
            tags = self.tree.item(item, "tags")
            values = self.tree.item(item, "values")
            if tags and values:
                self.tree.item(
                    item,
                    values=(values[0], "[+]", ""),
                    tags=(tags[0], "not_added"),
                )

    def _on_closing(self):
        """Handle window closing - cancel pending timers, then destroy."""
        pending = (
            self.status_timer,
            getattr(self, "background_scan_timer", None),
        )
        for timer_id in pending:
            if timer_id:
                try:
                    self.root.after_cancel(timer_id)
                except tk.TclError:
                    # The timer may already have fired; nothing to cancel.
                    pass
        self.status_timer = None
        self.root.destroy()

    # ------------------------------------------------------------------
    # Lazy loading
    # ------------------------------------------------------------------
    def _on_scrollbar(self, *args):
        """Scroll the tree and load the next batch when near the bottom."""
        self.tree.yview(*args)

        # Check if we need to load more items
        if self.all_folders and self.loaded_items < len(self.all_folders):
            last_visible = self.tree.yview()[1]

            # If we're past 70% scrolled, load more
            if last_visible > 0.7:
                self._load_more_items()

    def _load_more_items(self):
        """Insert the next ``items_per_batch`` folders into the tree."""
        start = self.loaded_items
        end = min(start + self.items_per_batch, len(self.all_folders))

        for folder_name, folder_path, size in self.all_folders[start:end]:
            folder_path_str = str(folder_path)

            # Determine button and tag
            if folder_path_str in self.added_folders:
                add_btn, remove_btn, tag = "", "[-]", "added"
            else:
                add_btn, remove_btn, tag = "[+]", "", "not_added"

            # The first tag carries the folder path so event handlers can
            # recover it from the row; the second selects the row colour.
            self.tree.insert(
                "",
                "end",
                text=folder_name,
                values=(self._format_size(size), add_btn, remove_btn),
                tags=(folder_path_str, tag),
            )

        self.loaded_items = end


def main():
    """Create the Tk root window and run the GUI until it is closed."""
    root = tk.Tk()
    app = PathManagerGUI(root)
    root.mainloop()


if __name__ == "__main__":
    main()