"""Command-line entry point for the journal: vault, search, chat, devices, server."""

import argparse
from argparse import Namespace
import getpass
import subprocess
import sys
import os
import signal
import time
from pathlib import Path
from datetime import datetime, date

# Make the directory three levels above this file importable before the
# `journal.*` imports below run (presumably the project root -- confirm).
sys.path.append(str(Path(__file__).resolve().parent.parent.parent))

from journal.core.storage import rebuild_all_vaults, load_all_vaults
from journal.core.parser import parse_journal_file
from journal.core.models import JournalEntry
from journal.core.config import DATA_DIR, PID_FILE, PROJECT_ROOT


class Args(Namespace):
    """Typed namespace for command-line arguments.

    Every attribute defaults to an "unset" value so that handlers can read
    any field regardless of which sub-command was actually parsed.
    """

    command: str = ""

    # Vault, Server and Devices
    action: str | None = None

    # Search
    query: str | None = None
    tag: list[str] | None = None
    type: list[str] | None = None
    start_date: date | None = None
    end_date: date | None = None
    section: str | None = None
    checked: list[str] | None = None
    unchecked: list[str] | None = None

    # Chat
    prompt: str | None = None


def _parse_date(value: str) -> date:
    """Convert a ``YYYY-MM-DD`` command-line string into a ``date``.

    Raises:
        argparse.ArgumentTypeError: if *value* is not in that format, so that
            argparse reports a readable message instead of a traceback.
    """
    try:
        return datetime.strptime(value, "%Y-%m-%d").date()
    except ValueError:
        raise argparse.ArgumentTypeError(
            f"invalid date {value!r}; expected YYYY-MM-DD"
        ) from None


def _build_parser() -> argparse.ArgumentParser:
    """Build the argument parser with one sub-parser per command."""
    parser = argparse.ArgumentParser(
        description="A command-line interface for your journal."
    )
    subparsers = parser.add_subparsers(dest="command", required=True)

    # Vault commands
    vault_parser = subparsers.add_parser("vault", help="Manage the encrypted vault.")
    _ = vault_parser.add_argument(
        "action", choices=["save", "load"], help="The action to perform on the vault."
    )

    # Search command
    search_parser = subparsers.add_parser("search", help="Search your journal entries.")
    _ = search_parser.add_argument(
        "query", nargs="?", default=None, help="The text to search for."
    )
    _ = search_parser.add_argument(
        "--tag",
        "-t",
        action="append",
        help="Filter by tag (can be used multiple times).",
    )
    _ = search_parser.add_argument(
        "--type",
        "-y",
        action="append",
        help="Filter by fragment type (e.g., !FLASHBACK).",
    )
    _ = search_parser.add_argument(
        "--start-date",
        "-s",
        type=_parse_date,
        help="Filter by start date (YYYY-MM-DD).",
    )
    _ = search_parser.add_argument(
        "--end-date",
        "-e",
        type=_parse_date,
        help="Filter by end date (YYYY-MM-DD).",
    )
    _ = search_parser.add_argument(
        "--section", "-sec", help="Search for query within a specific section title."
    )
    _ = search_parser.add_argument(
        "--checked",
        "-chk",
        action="append",
        help="Filter by checked checkbox text (can be used multiple times).",
    )
    _ = search_parser.add_argument(
        "--unchecked",
        "-uchk",
        action="append",
        help="Filter by unchecked checkbox text (can be used multiple times).",
    )

    # Server commands
    server_parser = subparsers.add_parser("server", help="Manage the NiceGUI server.")
    _ = server_parser.add_argument(
        "action", choices=["start", "stop"], help="Start or stop the server."
    )

    # Chat command
    chat_parser = subparsers.add_parser("chat", help="Chat with the AI.")
    _ = chat_parser.add_argument("prompt", help="The prompt to send to the AI.")

    # Devices command
    devices_parser = subparsers.add_parser("devices", help="Manage hardware devices.")
    _ = devices_parser.add_argument(
        "action", choices=["list"], help="List available devices (e.g., microphones)."
    )

    return parser


def _run_vault(args: Args) -> None:
    """Prompt for the vault password and run the requested vault action."""
    password = getpass.getpass("Vault password: ")
    if args.action == "save":
        rebuild_all_vaults(password)
    elif args.action == "load":
        _ = load_all_vaults(password)
        print(f"Vault loaded. Decrypted files are in {DATA_DIR}")


def _content_matches(entry: JournalEntry, args: Args) -> bool:
    """Return True if the query text (case-insensitive) occurs in the entry.

    With ``--section`` only that section's content is searched; otherwise the
    entry's raw content is searched. No query means every entry matches.
    """
    if args.query is None:
        return True
    needle = args.query.lower()
    if args.section is not None:
        return needle in entry.get_section(args.section).lower()
    return needle in entry.raw_content.lower()


def _fragments_match(entry: JournalEntry, args: Args) -> bool:
    """Return True if some fragment satisfies the ``--type`` / ``--tag`` filters.

    A single fragment must satisfy both filters when both are given. With
    neither filter set, every entry matches.
    """
    if args.tag is None and args.type is None:
        return True
    for fragment in entry.fragments:
        if args.type is not None and fragment.type not in args.type:
            continue
        if args.tag is not None and not any(tag in args.tag for tag in fragment.tags):
            continue
        return True
    return False


def _checkboxes_match(entry: JournalEntry, args: Args) -> bool:
    """Return True if any checkbox satisfies ``--checked`` or ``--unchecked``.

    One matching checkbox in any section is enough (the two filters are
    OR-ed). With neither filter set, every entry matches.
    """
    if args.checked is None and args.unchecked is None:
        return True
    for parsed_section in entry.sections.values():
        for checkbox_text, is_checked in parsed_section.checkboxes.items():
            if args.checked is not None and checkbox_text in args.checked and is_checked:
                return True
            if (
                args.unchecked is not None
                and checkbox_text in args.unchecked
                and not is_checked
            ):
                return True
    return False


def _entry_matches(entry: JournalEntry, args: Args) -> bool:
    """Return True if *entry* passes the date, content, fragment and checkbox filters."""
    entry_date = datetime.strptime(entry.date, "%Y-%m-%d").date()
    if args.start_date and entry_date < args.start_date:
        return False
    if args.end_date and entry_date > args.end_date:
        return False
    return (
        _content_matches(entry, args)
        and _fragments_match(entry, args)
        and _checkboxes_match(entry, args)
    )


def _run_search(args: Args) -> None:
    """Parse every ``*.md`` file in DATA_DIR and print the entries that match."""
    # A missing data directory is treated like an empty one: iterdir() would
    # otherwise raise FileNotFoundError before the vault has ever been loaded.
    if not DATA_DIR.is_dir() or not any(DATA_DIR.iterdir()):
        print(
            "No decrypted journal entries found. Please load the vault first: journal vault load"
        )
        return

    found_entries: list[JournalEntry] = []
    for filepath in DATA_DIR.glob("*.md"):
        entry = parse_journal_file(str(filepath))
        if _entry_matches(entry, args):
            found_entries.append(entry)

    if not found_entries:
        print("No entries found matching the criteria.")
        return
    for entry in found_entries:
        print(f"--- {entry.date} ---")
        print(entry.raw_content)
        print("\n")


def _run_chat(args: Args) -> None:
    """Send the prompt to the AI backend and print its response."""
    # Imported lazily so the other commands do not need the AI module.
    from journal.ai.chat import get_cloud_ai_response

    if args.prompt:
        response = get_cloud_ai_response(args.prompt)
        print(response)


def _run_devices(args: Args) -> None:
    """List the microphone names reported by ``speech_recognition``."""
    if args.action == "list":
        # Best-effort: an import failure or a backend error is reported to the
        # user rather than raised.
        try:
            import speech_recognition as sr

            print("Available Microphones:")
            for index, name in enumerate(sr.Microphone.list_microphone_names()):
                print(f' Index {index}: "{name}"')
        except Exception as e:
            print(f"Could not list microphones. Error: {e}")


def _start_server() -> None:
    """Launch the UI script as a background process and record its PID.

    Exits with status 1 if the PID file already exists.
    """
    if PID_FILE.exists():
        print("Server already running (PID file exists).")
        sys.exit(1)

    # Use the same Python interpreter that is running this script.
    # This is more robust and portable than a hardcoded path.
    venv_python = sys.executable
    ui_app_path = PROJECT_ROOT / "journal" / "ui" / "main.py"
    command = [str(venv_python), str(ui_app_path)]

    print(f"Starting NiceGUI server in background: {' '.join(command)}")
    process = subprocess.Popen(
        command,
        cwd=PROJECT_ROOT,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    _ = PID_FILE.write_text(str(process.pid))
    print(f"Server started with PID {process.pid}")


def _stop_server() -> None:
    """Terminate the process named in the PID file, then remove the PID file.

    Exits with status 1 if the PID file does not exist. On Windows the process
    is killed with ``taskkill /F``; elsewhere SIGTERM is sent first and SIGKILL
    only if the process is still alive three seconds later.
    """
    if not PID_FILE.exists():
        print("Server not running (PID file not found).")
        sys.exit(1)

    try:
        pid = int(PID_FILE.read_text())
        if sys.platform == "win32":
            _ = subprocess.run(
                ["taskkill", "/F", "/PID", str(pid)],
                check=True,
                capture_output=True,
            )
            print(f"Terminated server process with PID {pid}.")
        else:
            # On Unix-like systems, try graceful shutdown first
            print(f"Sending SIGTERM to server with PID {pid}...")
            os.kill(pid, signal.SIGTERM)
            time.sleep(3)  # Give it a moment to shut down
            try:
                # Signal 0 only probes for existence; it raises
                # ProcessLookupError once the process is gone.
                os.kill(pid, 0)
                print(
                    f"Server with PID {pid} did not terminate gracefully. Sending SIGKILL."
                )
                os.kill(pid, signal.SIGKILL)
            except ProcessLookupError:
                # This is the expected outcome if SIGTERM worked
                print("Server terminated gracefully.")
    except (ValueError, FileNotFoundError):
        print("PID file is invalid or missing.")
    except ProcessLookupError as e:
        print(f"Process not found: {e}")
    except subprocess.CalledProcessError:
        print("Failed to terminate process with taskkill (it may already be gone).")
    except PermissionError as e:
        print(f"Permission denied to terminate process: {e}")
    finally:
        # NOTE(review): the PID file is removed even when termination failed
        # (e.g. PermissionError) -- confirm this is intended.
        PID_FILE.unlink(missing_ok=True)
        print("Server stop command finished.")


def _run_server(args: Args) -> None:
    """Dispatch the ``server`` sub-command to start or stop."""
    if args.action == "start":
        _start_server()
    elif args.action == "stop":
        _stop_server()


def main() -> None:
    """Parse the command line and run the selected sub-command."""
    args = _build_parser().parse_args(namespace=Args())

    handlers = {
        "vault": _run_vault,
        "search": _run_search,
        "chat": _run_chat,
        "devices": _run_devices,
        "server": _run_server,
    }
    handler = handlers.get(args.command)
    if handler is not None:
        handler(args)


if __name__ == "__main__":
    main()