502 lines
18 KiB
Python

import argparse
from argparse import Namespace
import getpass
import json
import subprocess
import sys
import os
import signal
import time
from pathlib import Path
from datetime import datetime, date
sys.path.append(str(Path(__file__).resolve().parent.parent.parent))
from journal.core.storage import rebuild_all_vaults, load_all_vaults
from journal.core.parser import parse_journal_file
from journal.core.models import JournalEntry
from journal.core.fragments import (
list_fragments,
create_fragment,
get_fragment,
update_fragment,
delete_fragment,
search_fragments,
)
from journal.core.config import DATA_DIR, PID_FILE, PROJECT_ROOT, BACKEND_MODE
from journal.core.csharp_sidecar import call_sidecar_action
class Args(Namespace):
"""Typed namespace for command-line arguments."""
command: str = ""
# Vault and Server
action: str | None = None
# Search
query: str | None = None
tag: list[str] | None = None
type: list[str] | None = None
start_date: date | None = None
end_date: date | None = None
section: str | None = None
checked: list[str] | None = None
unchecked: list[str] | None = None
# Chat
prompt: str | None = None
# Fragments
fragments_action: str | None = None
fragment_id: str | None = None
fragment_type: str | None = None
fragment_tag: str | None = None
fragment_description: str | None = None
fragment_time: str | None = None
fragment_tags: list[str] | None = None
fragment_clear_tags: bool = False
def main() -> None:
    """Parse the command line and run the selected sub-command.

    The available sub-commands are ``vault``, ``search``, ``fragments``,
    ``server``, ``chat`` and ``devices``. Each one is implemented by a
    private ``_run_*`` helper that receives the parsed arguments.
    """
    args = _build_parser().parse_args(namespace=Args())
    handlers = {
        "vault": _run_vault,
        "search": _run_search,
        "fragments": _run_fragments,
        "chat": _run_chat,
        "devices": _run_devices,
        "server": _run_server,
    }
    # argparse rejects unknown sub-commands before we get here, so a missing
    # handler is treated as a no-op rather than an error.
    handler = handlers.get(args.command)
    if handler is not None:
        handler(args)


# Date format accepted on the command line and used to parse ``entry.date``.
_DATE_FORMAT = "%Y-%m-%d"
_NO_ENTRIES_MESSAGE = "No entries found matching the criteria."


def _parse_iso_date(value: str) -> date:
    """Convert a ``YYYY-MM-DD`` command-line value into a ``date``.

    Raises:
        argparse.ArgumentTypeError: If *value* does not parse with that
            format. argparse prints this message as a usage error, which is
            clearer than the "invalid <lambda> value" text an anonymous
            converter would produce.
    """
    try:
        return datetime.strptime(value, _DATE_FORMAT).date()
    except ValueError:
        raise argparse.ArgumentTypeError(
            f"invalid date {value!r}; expected YYYY-MM-DD"
        ) from None


def _build_parser() -> argparse.ArgumentParser:
    """Create the top-level parser with all sub-commands registered."""
    parser = argparse.ArgumentParser(
        description="A command-line interface for your journal."
    )
    subparsers = parser.add_subparsers(dest="command", required=True)

    vault_parser = subparsers.add_parser("vault", help="Manage the encrypted vault.")
    _ = vault_parser.add_argument(
        "action", choices=["save", "load"], help="The action to perform on the vault."
    )

    search_parser = subparsers.add_parser(
        "search", help="Search your journal entries."
    )
    _add_search_arguments(search_parser)

    fragments_parser = subparsers.add_parser(
        "fragments",
        help="Manage fragment records (hybrid C# backend mode).",
    )
    _add_fragment_subcommands(fragments_parser)

    server_parser = subparsers.add_parser("server", help="Manage the NiceGUI server.")
    _ = server_parser.add_argument(
        "action", choices=["start", "stop"], help="Start or stop the server."
    )

    chat_parser = subparsers.add_parser("chat", help="Chat with the AI.")
    _ = chat_parser.add_argument("prompt", help="The prompt to send to the AI.")

    devices_parser = subparsers.add_parser("devices", help="Manage hardware devices.")
    _ = devices_parser.add_argument(
        "action", choices=["list"], help="List available devices (e.g., microphones)."
    )
    return parser


def _add_search_arguments(search_parser: argparse.ArgumentParser) -> None:
    """Register the positional query and filter options of ``search``."""
    _ = search_parser.add_argument(
        "query", nargs="?", default=None, help="The text to search for."
    )
    _ = search_parser.add_argument(
        "--tag",
        "-t",
        action="append",
        help="Filter by tag (can be used multiple times).",
    )
    _ = search_parser.add_argument(
        "--type",
        "-y",
        action="append",
        help="Filter by fragment type (e.g., !FLASHBACK).",
    )
    _ = search_parser.add_argument(
        "--start-date",
        "-s",
        type=_parse_iso_date,
        help="Filter by start date (YYYY-MM-DD).",
    )
    _ = search_parser.add_argument(
        "--end-date",
        "-e",
        type=_parse_iso_date,
        help="Filter by end date (YYYY-MM-DD).",
    )
    _ = search_parser.add_argument(
        "--section", "-sec", help="Search for query within a specific section title."
    )
    _ = search_parser.add_argument(
        "--checked",
        "-chk",
        action="append",
        help="Filter by checked checkbox text (can be used multiple times).",
    )
    _ = search_parser.add_argument(
        "--unchecked",
        "-uchk",
        action="append",
        help="Filter by unchecked checkbox text (can be used multiple times).",
    )


def _add_fragment_subcommands(fragments_parser: argparse.ArgumentParser) -> None:
    """Register the nested list/create/get/update/delete/search commands."""
    fragment_subparsers = fragments_parser.add_subparsers(
        dest="fragments_action",
        required=True,
    )
    _ = fragment_subparsers.add_parser("list", help="List all fragments.")

    fragment_create = fragment_subparsers.add_parser(
        "create", help="Create a fragment."
    )
    _ = fragment_create.add_argument("--type", dest="fragment_type", required=True)
    _ = fragment_create.add_argument(
        "--description", dest="fragment_description", required=True
    )
    _ = fragment_create.add_argument("--time", dest="fragment_time")
    _ = fragment_create.add_argument(
        "--tag",
        dest="fragment_tags",
        action="append",
        help="Tag value (repeat for multiple tags).",
    )

    fragment_get = fragment_subparsers.add_parser(
        "get", help="Get one fragment by id."
    )
    _ = fragment_get.add_argument("--id", dest="fragment_id", required=True)

    fragment_update = fragment_subparsers.add_parser(
        "update", help="Update one fragment by id."
    )
    _ = fragment_update.add_argument("--id", dest="fragment_id", required=True)
    _ = fragment_update.add_argument("--type", dest="fragment_type")
    _ = fragment_update.add_argument("--description", dest="fragment_description")
    _ = fragment_update.add_argument("--time", dest="fragment_time")
    _ = fragment_update.add_argument(
        "--tag",
        dest="fragment_tags",
        action="append",
        help="Replace tags with provided values (repeat for multiple tags).",
    )
    _ = fragment_update.add_argument(
        "--clear-tags",
        dest="fragment_clear_tags",
        action="store_true",
        help="Clear all tags.",
    )

    fragment_delete = fragment_subparsers.add_parser(
        "delete", help="Delete one fragment by id."
    )
    _ = fragment_delete.add_argument("--id", dest="fragment_id", required=True)

    fragment_search = fragment_subparsers.add_parser(
        "search",
        help="Search fragments by type/tag.",
    )
    _ = fragment_search.add_argument("--type", dest="fragment_type")
    _ = fragment_search.add_argument("--tag", dest="fragment_tag")


def _run_vault(args: "Args") -> None:
    """Prompt for the vault password, then save or load the vaults."""
    password = getpass.getpass("Vault password: ")
    if args.action == "save":
        rebuild_all_vaults(password)
    elif args.action == "load":
        _ = load_all_vaults(password)
        print(f"Vault loaded. Decrypted files are in {DATA_DIR}")


def _run_search(args: "Args") -> None:
    """Search journal entries with the backend selected by ``BACKEND_MODE``."""
    # iterdir() raises if the directory does not exist, so a data directory
    # that was never created is treated the same as an empty one.
    if not DATA_DIR.is_dir() or not any(DATA_DIR.iterdir()):
        print(
            "No decrypted journal entries found. Please load the vault first: journal vault load"
        )
        return
    if BACKEND_MODE == "csharp-hybrid":
        _search_with_sidecar(args)
    else:
        _search_local_files(args)


def _print_entry(date_label: object, content: object) -> None:
    """Print one search hit: a dated header, the content, then a blank gap."""
    print(f"--- {date_label} ---")
    print(content)
    print("\n")


def _search_with_sidecar(args: "Args") -> None:
    """Send the search filters to the sidecar and print what it returns."""
    optional_fields: dict[str, object] = {
        "query": args.query,
        "section": args.section,
        "startDate": (
            args.start_date.strftime(_DATE_FORMAT) if args.start_date else None
        ),
        "endDate": args.end_date.strftime(_DATE_FORMAT) if args.end_date else None,
        "tags": args.tag,
        "types": args.type,
        "checked": args.checked,
        "unchecked": args.unchecked,
    }
    payload: dict[str, object] = {"dataDirectory": str(DATA_DIR)}
    # Only filters the user actually supplied are sent; empty values are omitted.
    payload.update({key: value for key, value in optional_fields.items() if value})

    try:
        results = call_sidecar_action(
            "search.entries",
            payload=payload,
            timeout_seconds=180,
        )
    except Exception as e:
        print(f"Hybrid search failed: {e}")
        return

    if not isinstance(results, list) or not results:
        print(_NO_ENTRIES_MESSAGE)
        return

    found_any = False
    for item in results:
        if not isinstance(item, dict):
            continue
        # Both PascalCase and camelCase keys are accepted in the response.
        date_value = item.get("Date") or item.get("date") or "unknown-date"
        raw_content = item.get("RawContent") or item.get("rawContent") or ""
        _print_entry(date_value, raw_content)
        found_any = True
    if not found_any:
        print(_NO_ENTRIES_MESSAGE)


def _search_local_files(args: "Args") -> None:
    """Parse every ``*.md`` file in ``DATA_DIR`` and print matching entries."""
    found_entries: list["JournalEntry"] = []
    for filepath in DATA_DIR.glob("*.md"):
        entry = parse_journal_file(str(filepath))
        if _entry_matches(entry, args):
            found_entries.append(entry)

    if not found_entries:
        print(_NO_ENTRIES_MESSAGE)
        return
    for entry in found_entries:
        _print_entry(entry.date, entry.raw_content)


def _entry_matches(entry: "JournalEntry", args: "Args") -> bool:
    """Return True when *entry* passes the date, content, fragment and checkbox filters."""
    entry_date = datetime.strptime(entry.date, _DATE_FORMAT).date()
    if args.start_date and entry_date < args.start_date:
        return False
    if args.end_date and entry_date > args.end_date:
        return False
    return (
        _content_matches(entry, args)
        and _fragments_match(entry, args)
        and _checkboxes_match(entry, args)
    )


def _content_matches(entry: "JournalEntry", args: "Args") -> bool:
    """Case-insensitive query match against one section or the whole entry.

    With no query every entry matches. With ``--section`` only that section's
    text is searched; otherwise the raw content is searched.
    """
    if args.query is None:
        return True
    needle = args.query.lower()
    if args.section is not None:
        return needle in entry.get_section(args.section).lower()
    return needle in entry.raw_content.lower()


def _fragments_match(entry: "JournalEntry", args: "Args") -> bool:
    """Return True when one fragment satisfies the type and tag filters.

    With neither filter given every entry matches. A single fragment has to
    satisfy both filters when both are given; for tags, sharing any one of
    the requested tags is enough.
    """
    if args.tag is None and args.type is None:
        return True
    for fragment in entry.fragments:
        if args.type is not None and fragment.type not in args.type:
            continue
        if args.tag is not None and not any(
            tag in args.tag for tag in fragment.tags
        ):
            continue
        return True
    return False


def _checkboxes_match(entry: "JournalEntry", args: "Args") -> bool:
    """Return True when any checkbox matches a ``--checked``/``--unchecked`` filter.

    With neither filter given every entry matches. Otherwise one checkbox is
    enough: a ticked one whose text was passed to ``--checked`` or an
    unticked one whose text was passed to ``--unchecked``.
    """
    if args.checked is None and args.unchecked is None:
        return True
    wanted_checked = args.checked or ()
    wanted_unchecked = args.unchecked or ()
    for parsed_section in entry.sections.values():
        for checkbox_text, is_checked in parsed_section.checkboxes.items():
            wanted = wanted_checked if is_checked else wanted_unchecked
            if checkbox_text in wanted:
                return True
    return False


def _run_fragments(args: "Args") -> None:
    """Run one fragments sub-command and print its outcome.

    Only available when ``BACKEND_MODE`` is ``"csharp-hybrid"``. Errors are
    reported on stdout instead of propagating: a ``ValueError`` prints its
    message as-is, any other exception is prefixed with a failure notice.
    """
    if BACKEND_MODE != "csharp-hybrid":
        print("Fragments CLI requires JOURNAL_BACKEND_MODE=csharp-hybrid.")
        return

    action = args.fragments_action
    try:
        if action == "list":
            _print_fragments(list_fragments())
        elif action == "create":
            created = create_fragment(
                type_=args.fragment_type or "",
                description=args.fragment_description or "",
                time=args.fragment_time,
                tags=args.fragment_tags,
            )
            print("Fragment created.")
            _print_json(created)
        elif action == "get":
            fragment = get_fragment(args.fragment_id or "")
            if fragment is None:
                print("Fragment not found.")
            else:
                _print_json(fragment)
        elif action == "update":
            updated = update_fragment(
                args.fragment_id or "",
                type_=args.fragment_type,
                description=args.fragment_description,
                time=args.fragment_time,
                tags=args.fragment_tags,
                clear_tags=args.fragment_clear_tags,
            )
            print("Fragment updated." if updated else "Fragment not found.")
        elif action == "delete":
            deleted = delete_fragment(args.fragment_id or "")
            print("Fragment deleted." if deleted else "Fragment not found.")
        elif action == "search":
            results = search_fragments(
                type_=args.fragment_type,
                tag=args.fragment_tag,
            )
            _print_fragments(results)
    except ValueError as e:
        print(str(e))
    except Exception as e:
        print(f"Fragment command failed: {e}")


def _run_chat(args: "Args") -> None:
    """Send the prompt to the AI backend and print its response."""
    # Imported lazily so the other sub-commands do not load the AI module.
    from journal.ai.chat import get_cloud_ai_response

    if args.prompt:
        response = get_cloud_ai_response(args.prompt)
        print(response)


def _run_devices(args: "Args") -> None:
    """List the microphones reported by ``speech_recognition``."""
    if args.action != "list":
        return
    try:
        # Imported inside the try so a failing import is reported like any
        # other listing failure instead of raising.
        import speech_recognition as sr

        print("Available Microphones:")
        for index, name in enumerate(sr.Microphone.list_microphone_names()):
            print(f' Index {index}: "{name}"')
    except Exception as e:
        print(f"Could not list microphones. Error: {e}")


def _run_server(args: "Args") -> None:
    """Start or stop the background NiceGUI server."""
    if args.action == "start":
        _start_server()
    elif args.action == "stop":
        _stop_server()


def _start_server() -> None:
    """Launch the UI in a background process and record its PID.

    Exits with status 1 when the PID file already exists.
    """
    if PID_FILE.exists():
        print("Server already running (PID file exists).")
        sys.exit(1)

    # Reuse the interpreter running this script rather than a hardcoded path.
    ui_app_path = PROJECT_ROOT / "journal" / "ui" / "main.py"
    command = [str(sys.executable), str(ui_app_path)]
    print(f"Starting NiceGUI server in background: {' '.join(command)}")
    process = subprocess.Popen(
        command,
        cwd=PROJECT_ROOT,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    _ = PID_FILE.write_text(str(process.pid))
    print(f"Server started with PID {process.pid}")


def _stop_server() -> None:
    """Terminate the process named in the PID file, then remove the file.

    Exits with status 1 when there is no PID file. Failures while
    terminating are reported on stdout, and the PID file is removed either
    way.
    """
    if not PID_FILE.exists():
        print("Server not running (PID file not found).")
        sys.exit(1)

    try:
        pid = int(PID_FILE.read_text())
        if sys.platform == "win32":
            _ = subprocess.run(
                ["taskkill", "/F", "/PID", str(pid)],
                check=True,
                capture_output=True,
            )
            print(f"Terminated server process with PID {pid}.")
        else:
            _terminate_unix_process(pid)
    except (ValueError, FileNotFoundError):
        print("PID file is invalid or missing.")
    except ProcessLookupError as e:
        print(f"Process not found: {e}")
    except subprocess.CalledProcessError:
        print(
            "Failed to terminate process with taskkill (it may already be gone)."
        )
    except PermissionError as e:
        print(f"Permission denied to terminate process: {e}")
    finally:
        if PID_FILE.exists():
            PID_FILE.unlink()
        print("Server stop command finished.")


def _terminate_unix_process(pid: int) -> None:
    """Send SIGTERM to *pid*, escalating to SIGKILL if it is still alive.

    Raises:
        ProcessLookupError: If the process does not exist when SIGTERM is sent.
        PermissionError: If the caller may not signal the process.
    """
    print(f"Sending SIGTERM to server with PID {pid}...")
    os.kill(pid, signal.SIGTERM)
    time.sleep(3)  # Give it a moment to shut down
    try:
        # Signal 0 delivers nothing; it only checks that the process exists.
        os.kill(pid, 0)
        print(
            f"Server with PID {pid} did not terminate gracefully. Sending SIGKILL."
        )
        os.kill(pid, signal.SIGKILL)
    except ProcessLookupError:
        # This is the expected outcome if SIGTERM worked
        print("Server terminated gracefully.")
def _print_json(value: object) -> None:
    """Write *value* to stdout as JSON indented by two spaces.

    Non-ASCII characters are emitted as-is rather than as ``\\uXXXX`` escapes.
    """
    rendered = json.dumps(value, ensure_ascii=False, indent=2)
    print(rendered)
def _print_fragments(value: object) -> None:
    """Print every fragment in *value* as JSON, or a notice when there are none.

    Anything other than a non-empty list produces "No fragments found.".
    """
    if isinstance(value, list) and value:
        for fragment in value:
            _print_json(fragment)
        return
    print("No fragments found.")
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()