"""Command-line interface for the journal.

Sub-commands: ``vault``, ``search``, ``fragments``, ``server``, ``chat`` and
``devices``.  ``main()`` parses the arguments and dispatches to one private
handler per sub-command.
"""

import argparse
from argparse import Namespace
import getpass
import json
import subprocess
import sys
import os
import signal
import time
from pathlib import Path
from datetime import datetime, date

# The directory three levels above this file is added to sys.path before the
# ``journal.*`` imports below (presumably the project root, so that the
# package resolves when this file is executed directly -- confirm).
sys.path.append(str(Path(__file__).resolve().parent.parent.parent))

from journal.core.storage import rebuild_all_vaults, load_all_vaults
from journal.core.parser import parse_journal_file
from journal.core.models import JournalEntry
from journal.core.config import DATA_DIR, PID_FILE, PROJECT_ROOT, BACKEND_MODE
from journal.core.csharp_sidecar import call_sidecar_action

# Value of BACKEND_MODE that routes search/fragments through the sidecar.
_HYBRID_BACKEND = "csharp-hybrid"
# Date format accepted on the command line and used for entry dates.
_DATE_FORMAT = "%Y-%m-%d"
_NO_ENTRIES_MESSAGE = "No entries found matching the criteria."


class Args(Namespace):
    """Typed namespace for command-line arguments."""

    command: str = ""

    # Vault, server and devices
    action: str | None = None

    # Search
    query: str | None = None
    tag: list[str] | None = None
    type: list[str] | None = None
    start_date: date | None = None
    end_date: date | None = None
    section: str | None = None
    checked: list[str] | None = None
    unchecked: list[str] | None = None

    # Chat
    prompt: str | None = None

    # Fragments
    fragments_action: str | None = None
    fragment_id: str | None = None
    fragment_type: str | None = None
    fragment_tag: str | None = None
    fragment_description: str | None = None
    fragment_time: str | None = None
    fragment_tags: list[str] | None = None
    fragment_clear_tags: bool = False


def _parse_iso_date(value: str) -> date:
    """Convert a ``YYYY-MM-DD`` command-line value into a ``date``.

    Raises:
        argparse.ArgumentTypeError: if *value* does not match the format, so
            argparse reports a readable message instead of
            ``invalid <lambda> value``.
    """
    try:
        return datetime.strptime(value, _DATE_FORMAT).date()
    except ValueError:
        raise argparse.ArgumentTypeError(
            f"invalid date {value!r}; expected YYYY-MM-DD"
        ) from None


def _build_parser() -> argparse.ArgumentParser:
    """Build the argument parser with every sub-command and its options."""
    parser = argparse.ArgumentParser(
        description="A command-line interface for your journal."
    )
    subparsers = parser.add_subparsers(dest="command", required=True)

    # Vault commands
    vault_parser = subparsers.add_parser("vault", help="Manage the encrypted vault.")
    _ = vault_parser.add_argument(
        "action", choices=["save", "load"], help="The action to perform on the vault."
    )

    # Search command
    search_parser = subparsers.add_parser("search", help="Search your journal entries.")
    _ = search_parser.add_argument(
        "query", nargs="?", default=None, help="The text to search for."
    )
    _ = search_parser.add_argument(
        "--tag",
        "-t",
        action="append",
        help="Filter by tag (can be used multiple times).",
    )
    _ = search_parser.add_argument(
        "--type",
        "-y",
        action="append",
        help="Filter by fragment type (e.g., !FLASHBACK).",
    )
    _ = search_parser.add_argument(
        "--start-date",
        "-s",
        type=_parse_iso_date,
        help="Filter by start date (YYYY-MM-DD).",
    )
    _ = search_parser.add_argument(
        "--end-date",
        "-e",
        type=_parse_iso_date,
        help="Filter by end date (YYYY-MM-DD).",
    )
    _ = search_parser.add_argument(
        "--section", "-sec", help="Search for query within a specific section title."
    )
    _ = search_parser.add_argument(
        "--checked",
        "-chk",
        action="append",
        help="Filter by checked checkbox text (can be used multiple times).",
    )
    _ = search_parser.add_argument(
        "--unchecked",
        "-uchk",
        action="append",
        help="Filter by unchecked checkbox text (can be used multiple times).",
    )

    # Fragment commands
    fragments_parser = subparsers.add_parser(
        "fragments",
        help="Manage fragment records (hybrid C# backend mode).",
    )
    fragment_subparsers = fragments_parser.add_subparsers(
        dest="fragments_action",
        required=True,
    )

    _ = fragment_subparsers.add_parser("list", help="List all fragments.")

    fragment_create = fragment_subparsers.add_parser("create", help="Create a fragment.")
    _ = fragment_create.add_argument("--type", dest="fragment_type", required=True)
    _ = fragment_create.add_argument(
        "--description", dest="fragment_description", required=True
    )
    _ = fragment_create.add_argument("--time", dest="fragment_time")
    _ = fragment_create.add_argument(
        "--tag",
        dest="fragment_tags",
        action="append",
        help="Tag value (repeat for multiple tags).",
    )

    fragment_get = fragment_subparsers.add_parser("get", help="Get one fragment by id.")
    _ = fragment_get.add_argument("--id", dest="fragment_id", required=True)

    fragment_update = fragment_subparsers.add_parser(
        "update", help="Update one fragment by id."
    )
    _ = fragment_update.add_argument("--id", dest="fragment_id", required=True)
    _ = fragment_update.add_argument("--type", dest="fragment_type")
    _ = fragment_update.add_argument("--description", dest="fragment_description")
    _ = fragment_update.add_argument("--time", dest="fragment_time")
    _ = fragment_update.add_argument(
        "--tag",
        dest="fragment_tags",
        action="append",
        help="Replace tags with provided values (repeat for multiple tags).",
    )
    _ = fragment_update.add_argument(
        "--clear-tags",
        dest="fragment_clear_tags",
        action="store_true",
        help="Clear all tags.",
    )

    fragment_delete = fragment_subparsers.add_parser(
        "delete", help="Delete one fragment by id."
    )
    _ = fragment_delete.add_argument("--id", dest="fragment_id", required=True)

    fragment_search = fragment_subparsers.add_parser(
        "search",
        help="Search fragments by type/tag.",
    )
    _ = fragment_search.add_argument("--type", dest="fragment_type")
    _ = fragment_search.add_argument("--tag", dest="fragment_tag")

    # Server commands
    server_parser = subparsers.add_parser("server", help="Manage the NiceGUI server.")
    _ = server_parser.add_argument(
        "action", choices=["start", "stop"], help="Start or stop the server."
    )

    # Chat command
    chat_parser = subparsers.add_parser("chat", help="Chat with the AI.")
    _ = chat_parser.add_argument("prompt", help="The prompt to send to the AI.")

    # Devices command
    devices_parser = subparsers.add_parser("devices", help="Manage hardware devices.")
    _ = devices_parser.add_argument(
        "action", choices=["list"], help="List available devices (e.g., microphones)."
    )

    return parser


# --------------------------------------------------------------------------
# vault
# --------------------------------------------------------------------------


def _run_vault(args: Args) -> None:
    """Prompt for the vault password, then save or load the vaults."""
    password = getpass.getpass("Vault password: ")
    if args.action == "save":
        rebuild_all_vaults(password)
    elif args.action == "load":
        _ = load_all_vaults(password)
        print(f"Vault loaded. Decrypted files are in {DATA_DIR}")


# --------------------------------------------------------------------------
# search
# --------------------------------------------------------------------------


def _has_decrypted_entries() -> bool:
    """Return True when DATA_DIR exists and contains at least one item.

    A missing directory is treated the same as an empty one so the caller can
    print its "load the vault first" hint instead of crashing.
    """
    try:
        return any(DATA_DIR.iterdir())
    except (FileNotFoundError, NotADirectoryError):
        return False


def _search_hybrid(args: Args) -> None:
    """Run the search through the sidecar and print each returned entry."""
    payload: dict[str, object] = {"dataDirectory": str(DATA_DIR)}
    if args.query:
        payload["query"] = args.query
    if args.section:
        payload["section"] = args.section
    if args.start_date:
        payload["startDate"] = args.start_date.strftime(_DATE_FORMAT)
    if args.end_date:
        payload["endDate"] = args.end_date.strftime(_DATE_FORMAT)
    if args.tag:
        payload["tags"] = args.tag
    if args.type:
        payload["types"] = args.type
    if args.checked:
        payload["checked"] = args.checked
    if args.unchecked:
        payload["unchecked"] = args.unchecked

    try:
        results = call_sidecar_action(
            "search.entries",
            payload=payload,
            timeout_seconds=180,
        )
    except Exception as e:  # CLI boundary: report any sidecar failure
        print(f"Hybrid search failed: {e}")
        return

    if not isinstance(results, list) or not results:
        print(_NO_ENTRIES_MESSAGE)
        return

    found_any = False
    for item in results:
        if not isinstance(item, dict):
            continue
        # Both capitalised and camelCase keys are accepted.
        date_value = item.get("Date") or item.get("date") or "unknown-date"
        raw_content = item.get("RawContent") or item.get("rawContent") or ""
        print(f"--- {date_value} ---")
        print(raw_content)
        print("\n")
        found_any = True

    if not found_any:
        print(_NO_ENTRIES_MESSAGE)


def _content_matches(entry: JournalEntry, args: Args) -> bool:
    """Case-insensitive query match against a section or the whole entry."""
    if args.query is None:
        return True  # no query: content always matches
    needle = args.query.lower()
    if args.section is not None:
        return needle in entry.get_section(args.section).lower()
    return needle in entry.raw_content.lower()


def _fragments_match(entry: JournalEntry, args: Args) -> bool:
    """Return True if some fragment satisfies the --type and --tag filters."""
    if args.tag is None and args.type is None:
        return True  # no tag/type filter: fragments always match
    for fragment in entry.fragments:
        if args.type is not None and fragment.type not in args.type:
            continue
        if args.tag is not None and not any(
            tag in args.tag for tag in fragment.tags
        ):
            continue
        return True
    return False


def _checkboxes_match(entry: JournalEntry, args: Args) -> bool:
    """Return True if any checkbox satisfies the --checked/--unchecked filters.

    A single checkbox that is listed in --checked and ticked, or listed in
    --unchecked and not ticked, is enough for the entry to match.
    """
    if args.checked is None and args.unchecked is None:
        return True
    wanted_checked = args.checked or ()
    wanted_unchecked = args.unchecked or ()
    return any(
        (is_checked and text in wanted_checked)
        or (not is_checked and text in wanted_unchecked)
        for parsed_section in entry.sections.values()
        for text, is_checked in parsed_section.checkboxes.items()
    )


def _entry_matches(entry: JournalEntry, args: Args) -> bool:
    """Apply the date, content, fragment and checkbox filters to one entry."""
    entry_date = datetime.strptime(entry.date, _DATE_FORMAT).date()
    if args.start_date and entry_date < args.start_date:
        return False
    if args.end_date and entry_date > args.end_date:
        return False
    return (
        _content_matches(entry, args)
        and _fragments_match(entry, args)
        and _checkboxes_match(entry, args)
    )


def _search_local(args: Args) -> None:
    """Parse every ``*.md`` file in DATA_DIR and print the matching entries."""
    entries = (
        parse_journal_file(str(filepath)) for filepath in DATA_DIR.glob("*.md")
    )
    # Collect first, print afterwards: nothing is printed if parsing fails.
    found_entries: list[JournalEntry] = [
        entry for entry in entries if _entry_matches(entry, args)
    ]

    if not found_entries:
        print(_NO_ENTRIES_MESSAGE)
        return
    for entry in found_entries:
        print(f"--- {entry.date} ---")
        print(entry.raw_content)
        print("\n")


def _run_search(args: Args) -> None:
    """Search decrypted entries via the sidecar or the local parser."""
    if not _has_decrypted_entries():
        print(
            "No decrypted journal entries found. Please load the vault first: journal vault load"
        )
        return
    if BACKEND_MODE == _HYBRID_BACKEND:
        _search_hybrid(args)
    else:
        _search_local(args)


# --------------------------------------------------------------------------
# fragments
# --------------------------------------------------------------------------


def _fragments_list(args: Args) -> None:
    """Print every fragment returned by ``fragments.list``."""
    _print_fragments(call_sidecar_action("fragments.list"))


def _fragments_create(args: Args) -> None:
    """Create a fragment from the --type/--description/--time/--tag options."""
    payload: dict[str, object] = {
        "type": args.fragment_type or "",
        "description": args.fragment_description or "",
    }
    if args.fragment_time:
        payload["time"] = args.fragment_time
    if args.fragment_tags:
        payload["tags"] = args.fragment_tags
    created = call_sidecar_action("fragments.create", payload=payload)
    print("Fragment created.")
    _print_json(created)


def _fragments_get(args: Args) -> None:
    """Print one fragment looked up by --id."""
    fragment = call_sidecar_action(
        "fragments.get",
        command_fields={"id": args.fragment_id},
    )
    if fragment is None:
        print("Fragment not found.")
        return
    _print_json(fragment)


def _fragments_update(args: Args) -> None:
    """Send only the fields that were supplied; --clear-tags wins over --tag."""
    payload: dict[str, object] = {}
    if args.fragment_type is not None:
        payload["type"] = args.fragment_type
    if args.fragment_description is not None:
        payload["description"] = args.fragment_description
    if args.fragment_time is not None:
        payload["time"] = args.fragment_time
    if args.fragment_clear_tags:
        payload["tags"] = []
    elif args.fragment_tags is not None:
        payload["tags"] = args.fragment_tags

    if not payload:
        print("No update fields provided. Use --type/--description/--time/--tag/--clear-tags.")
        return

    updated = call_sidecar_action(
        "fragments.update",
        payload=payload,
        command_fields={"id": args.fragment_id},
    )
    print("Fragment updated." if updated else "Fragment not found.")


def _fragments_delete(args: Args) -> None:
    """Delete one fragment by --id and report whether it existed."""
    deleted = call_sidecar_action(
        "fragments.delete",
        command_fields={"id": args.fragment_id},
    )
    print("Fragment deleted." if deleted else "Fragment not found.")


def _fragments_search(args: Args) -> None:
    """Print fragments matching the optional --type and --tag filters."""
    results = call_sidecar_action(
        "fragments.search",
        command_fields={
            "type": args.fragment_type,
            "tag": args.fragment_tag,
        },
    )
    _print_fragments(results)


_FRAGMENT_HANDLERS = {
    "list": _fragments_list,
    "create": _fragments_create,
    "get": _fragments_get,
    "update": _fragments_update,
    "delete": _fragments_delete,
    "search": _fragments_search,
}


def _run_fragments(args: Args) -> None:
    """Dispatch a ``fragments`` sub-command; requires the hybrid backend."""
    if BACKEND_MODE != _HYBRID_BACKEND:
        print("Fragments CLI requires JOURNAL_BACKEND_MODE=csharp-hybrid.")
        return
    handler = _FRAGMENT_HANDLERS.get(args.fragments_action or "")
    if handler is None:
        return
    try:
        handler(args)
    except Exception as e:  # CLI boundary: report any sidecar failure
        print(f"Fragment command failed: {e}")


# --------------------------------------------------------------------------
# chat / devices
# --------------------------------------------------------------------------


def _run_chat(args: Args) -> None:
    """Send the prompt to the AI and print its response."""
    # Imported lazily so the other commands do not load the chat module.
    from journal.ai.chat import get_cloud_ai_response

    if args.prompt:
        print(get_cloud_ai_response(args.prompt))


def _run_devices(args: Args) -> None:
    """List the microphone names reported by ``speech_recognition``."""
    if args.action != "list":
        return
    try:
        # Imported inside the try so an import failure is reported below too.
        import speech_recognition as sr

        print("Available Microphones:")
        for index, name in enumerate(sr.Microphone.list_microphone_names()):
            print(f' Index {index}: "{name}"')
    except Exception as e:
        print(f"Could not list microphones. Error: {e}")


# --------------------------------------------------------------------------
# server
# --------------------------------------------------------------------------


def _start_server() -> None:
    """Launch the UI application in the background and record its PID.

    Exits with status 1 if the PID file already exists.
    """
    if PID_FILE.exists():
        print("Server already running (PID file exists).")
        sys.exit(1)

    # Use the same Python interpreter that is running this script.
    # This is more robust and portable than a hardcoded path.
    venv_python = sys.executable
    ui_app_path = PROJECT_ROOT / "journal" / "ui" / "main.py"
    command = [str(venv_python), str(ui_app_path)]

    print(f"Starting NiceGUI server in background: {' '.join(command)}")
    process = subprocess.Popen(
        command,
        cwd=PROJECT_ROOT,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    _ = PID_FILE.write_text(str(process.pid))
    print(f"Server started with PID {process.pid}")


def _stop_server() -> None:
    """Terminate the process recorded in the PID file and remove the file.

    Exits with status 1 if the PID file does not exist.  The PID file is
    removed whether or not termination succeeded.
    """
    if not PID_FILE.exists():
        print("Server not running (PID file not found).")
        sys.exit(1)

    try:
        pid = int(PID_FILE.read_text())
        if sys.platform == "win32":
            _ = subprocess.run(
                ["taskkill", "/F", "/PID", str(pid)],
                check=True,
                capture_output=True,
            )
            print(f"Terminated server process with PID {pid}.")
        else:
            # On Unix-like systems, try graceful shutdown first
            print(f"Sending SIGTERM to server with PID {pid}...")
            os.kill(pid, signal.SIGTERM)
            time.sleep(3)  # Give it a moment to shut down
            try:
                # Signal 0 only checks that the process still exists.
                os.kill(pid, 0)
                print(
                    f"Server with PID {pid} did not terminate gracefully. Sending SIGKILL."
                )
                os.kill(pid, signal.SIGKILL)
            except ProcessLookupError:
                # This is the expected outcome if SIGTERM worked
                print("Server terminated gracefully.")
    except (ValueError, FileNotFoundError):
        print("PID file is invalid or missing.")
    except ProcessLookupError as e:
        print(f"Process not found: {e}")
    except subprocess.CalledProcessError:
        print(
            "Failed to terminate process with taskkill (it may already be gone)."
        )
    except PermissionError as e:
        print(f"Permission denied to terminate process: {e}")
    finally:
        # missing_ok avoids the exists()/unlink() race.
        PID_FILE.unlink(missing_ok=True)
        print("Server stop command finished.")


def _run_server(args: Args) -> None:
    """Start or stop the background UI server."""
    if args.action == "start":
        _start_server()
    elif args.action == "stop":
        _stop_server()


_COMMAND_HANDLERS = {
    "vault": _run_vault,
    "search": _run_search,
    "fragments": _run_fragments,
    "chat": _run_chat,
    "devices": _run_devices,
    "server": _run_server,
}


def main():
    """Parse the command line and run the selected sub-command."""
    args = _build_parser().parse_args(namespace=Args())
    handler = _COMMAND_HANDLERS.get(args.command)
    if handler is not None:
        handler(args)


def _print_json(value: object) -> None:
    """Print *value* as indented JSON, keeping non-ASCII characters as-is."""
    print(json.dumps(value, indent=2, ensure_ascii=False))


def _print_fragments(value: object) -> None:
    """Print each item of a non-empty list as JSON, else a "none found" note."""
    if not isinstance(value, list) or not value:
        print("No fragments found.")
        return
    for item in value:
        _print_json(item)


if __name__ == "__main__":
    main()