300 lines
11 KiB
Python
300 lines
11 KiB
Python
import argparse
|
|
from argparse import Namespace
|
|
import getpass
|
|
import subprocess
|
|
import sys
|
|
import os
|
|
import signal
|
|
import time
|
|
from pathlib import Path
|
|
from datetime import datetime, date
|
|
|
|
sys.path.append(str(Path(__file__).resolve().parent.parent.parent))
|
|
|
|
from journal.core.storage import rebuild_all_vaults, load_all_vaults
|
|
from journal.core.parser import parse_journal_file
|
|
from journal.core.models import JournalEntry
|
|
from journal.core.config import DATA_DIR, PID_FILE, PROJECT_ROOT
|
|
|
|
|
|
class Args(Namespace):
|
|
"""Typed namespace for command-line arguments."""
|
|
|
|
command: str = ""
|
|
|
|
# Vault and Server
|
|
action: str | None = None
|
|
|
|
# Search
|
|
query: str | None = None
|
|
tag: list[str] | None = None
|
|
type: list[str] | None = None
|
|
start_date: date | None = None
|
|
end_date: date | None = None
|
|
section: str | None = None
|
|
checked: list[str] | None = None
|
|
unchecked: list[str] | None = None
|
|
|
|
# Chat
|
|
prompt: str | None = None
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description="A command-line interface for your journal."
|
|
)
|
|
subparsers = parser.add_subparsers(dest="command", required=True)
|
|
|
|
# Vault commands
|
|
vault_parser = subparsers.add_parser("vault", help="Manage the encrypted vault.")
|
|
_ = vault_parser.add_argument(
|
|
"action", choices=["save", "load"], help="The action to perform on the vault."
|
|
)
|
|
|
|
# Search command
|
|
search_parser = subparsers.add_parser("search", help="Search your journal entries.")
|
|
_ = search_parser.add_argument(
|
|
"query", nargs="?", default=None, help="The text to search for."
|
|
)
|
|
_ = search_parser.add_argument(
|
|
"--tag",
|
|
"-t",
|
|
action="append",
|
|
help="Filter by tag (can be used multiple times).",
|
|
)
|
|
_ = search_parser.add_argument(
|
|
"--type",
|
|
"-y",
|
|
action="append",
|
|
help="Filter by fragment type (e.g., !FLASHBACK).",
|
|
)
|
|
_ = search_parser.add_argument(
|
|
"--start-date",
|
|
"-s",
|
|
type=lambda s: datetime.strptime(s, "%Y-%m-%d").date(),
|
|
help="Filter by start date (YYYY-MM-DD).",
|
|
)
|
|
_ = search_parser.add_argument(
|
|
"--end-date",
|
|
"-e",
|
|
type=lambda s: datetime.strptime(s, "%Y-%m-%d").date(),
|
|
help="Filter by end date (YYYY-MM-DD).",
|
|
)
|
|
_ = search_parser.add_argument(
|
|
"--section", "-sec", help="Search for query within a specific section title."
|
|
)
|
|
_ = search_parser.add_argument(
|
|
"--checked",
|
|
"-chk",
|
|
action="append",
|
|
help="Filter by checked checkbox text (can be used multiple times).",
|
|
)
|
|
_ = search_parser.add_argument(
|
|
"--unchecked",
|
|
"-uchk",
|
|
action="append",
|
|
help="Filter by unchecked checkbox text (can be used multiple times).",
|
|
)
|
|
|
|
# Server commands
|
|
server_parser = subparsers.add_parser("server", help="Manage the NiceGUI server.")
|
|
_ = server_parser.add_argument(
|
|
"action", choices=["start", "stop"], help="Start or stop the server."
|
|
)
|
|
|
|
# Chat command
|
|
chat_parser = subparsers.add_parser("chat", help="Chat with the AI.")
|
|
_ = chat_parser.add_argument("prompt", help="The prompt to send to the AI.")
|
|
|
|
# Devices command
|
|
devices_parser = subparsers.add_parser("devices", help="Manage hardware devices.")
|
|
_ = devices_parser.add_argument(
|
|
"action", choices=["list"], help="List available devices (e.g., microphones)."
|
|
)
|
|
|
|
args = parser.parse_args(namespace=Args())
|
|
|
|
if args.command == "vault":
|
|
password = getpass.getpass("Vault password: ")
|
|
if args.action == "save":
|
|
rebuild_all_vaults(password)
|
|
elif args.action == "load":
|
|
_ = load_all_vaults(password)
|
|
print(f"Vault loaded. Decrypted files are in {DATA_DIR}")
|
|
|
|
elif args.command == "search":
|
|
if not any(DATA_DIR.iterdir()):
|
|
print(
|
|
"No decrypted journal entries found. Please load the vault first: journal vault load"
|
|
)
|
|
return
|
|
|
|
found_entries: list[JournalEntry] = []
|
|
for filepath in DATA_DIR.glob("*.md"):
|
|
entry = parse_journal_file(str(filepath))
|
|
|
|
# Date filtering
|
|
entry_date = datetime.strptime(entry.date, "%Y-%m-%d").date()
|
|
if args.start_date and entry_date < args.start_date:
|
|
continue
|
|
if args.end_date and entry_date > args.end_date:
|
|
continue
|
|
|
|
# Content filtering
|
|
content_match = False
|
|
if args.query is not None:
|
|
if args.section is not None:
|
|
section_content = entry.get_section(args.section)
|
|
if args.query.lower() in section_content.lower():
|
|
content_match = True
|
|
elif args.query.lower() in entry.raw_content.lower():
|
|
content_match = True
|
|
else:
|
|
content_match = True # If no query, content always matches
|
|
|
|
# Tag and Type filtering (for fragments)
|
|
fragment_match = False
|
|
if args.tag is not None or args.type is not None:
|
|
for fragment in entry.fragments:
|
|
if args.type is not None and fragment.type not in args.type:
|
|
continue
|
|
if args.tag is not None and not any(
|
|
tag in args.tag for tag in fragment.tags
|
|
):
|
|
continue
|
|
fragment_match = True
|
|
break
|
|
else:
|
|
fragment_match = True # If no tag/type filter, fragments always match
|
|
|
|
# Checkbox filtering
|
|
checkbox_match = True
|
|
if args.checked is not None or args.unchecked is not None:
|
|
checkbox_match = False # Assume no match until proven otherwise
|
|
for _, parsed_section in entry.sections.items():
|
|
for checkbox_text, is_checked in parsed_section.checkboxes.items():
|
|
if (
|
|
args.checked is not None
|
|
and checkbox_text in args.checked
|
|
and is_checked
|
|
):
|
|
checkbox_match = True
|
|
break
|
|
if (
|
|
args.unchecked is not None
|
|
and checkbox_text in args.unchecked
|
|
and not is_checked
|
|
):
|
|
checkbox_match = True
|
|
break
|
|
if (
|
|
checkbox_match
|
|
): # Found a match in this section, no need to check other sections
|
|
break
|
|
if (
|
|
args.checked is not None or args.unchecked is not None
|
|
) and not checkbox_match: # If filters were applied but no match found
|
|
continue # Skip this entry
|
|
|
|
# Combine filters
|
|
if content_match and fragment_match and checkbox_match:
|
|
found_entries.append(entry)
|
|
|
|
if found_entries:
|
|
for entry in found_entries:
|
|
print(f"--- {entry.date} ---")
|
|
print(entry.raw_content)
|
|
print("\n")
|
|
else:
|
|
print("No entries found matching the criteria.")
|
|
|
|
elif args.command == "chat":
|
|
from journal.ai.chat import get_cloud_ai_response
|
|
|
|
if args.prompt:
|
|
response = get_cloud_ai_response(args.prompt)
|
|
print(response)
|
|
|
|
elif args.command == "devices":
|
|
if args.action == "list":
|
|
try:
|
|
import speech_recognition as sr
|
|
|
|
print("Available Microphones:")
|
|
for index, name in enumerate(sr.Microphone.list_microphone_names()):
|
|
print(f' Index {index}: "{name}"')
|
|
except Exception as e:
|
|
print(f"Could not list microphones. Error: {e}")
|
|
|
|
elif args.command == "server":
|
|
if args.action == "start":
|
|
if PID_FILE.exists():
|
|
print("Server already running (PID file exists).")
|
|
sys.exit(1)
|
|
|
|
# Use the same Python interpreter that is running this script.
|
|
# This is more robust and portable than a hardcoded path.
|
|
venv_python = sys.executable
|
|
|
|
# Correct path to the UI application
|
|
ui_app_path = PROJECT_ROOT / "journal" / "ui" / "main.py"
|
|
command = [str(venv_python), str(ui_app_path)]
|
|
|
|
print(f"Starting NiceGUI server in background: {' '.join(command)}")
|
|
process = subprocess.Popen(
|
|
command,
|
|
cwd=PROJECT_ROOT,
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL,
|
|
)
|
|
_ = PID_FILE.write_text(str(process.pid))
|
|
print(f"Server started with PID {process.pid}")
|
|
|
|
elif args.action == "stop":
|
|
if not PID_FILE.exists():
|
|
print("Server not running (PID file not found).")
|
|
sys.exit(1)
|
|
|
|
try:
|
|
pid = int(PID_FILE.read_text())
|
|
if sys.platform == "win32":
|
|
_ = subprocess.run(
|
|
["taskkill", "/F", "/PID", str(pid)],
|
|
check=True,
|
|
capture_output=True,
|
|
)
|
|
print(f"Terminated server process with PID {pid}.")
|
|
else:
|
|
# On Unix-like systems, try graceful shutdown first
|
|
print(f"Sending SIGTERM to server with PID {pid}...")
|
|
os.kill(pid, signal.SIGTERM)
|
|
time.sleep(3) # Give it a moment to shut down
|
|
try:
|
|
# Check if it's still alive, then kill it forcefully
|
|
os.kill(pid, 0)
|
|
print(
|
|
f"Server with PID {pid} did not terminate gracefully. Sending SIGKILL."
|
|
)
|
|
os.kill(pid, signal.SIGKILL)
|
|
except ProcessLookupError:
|
|
# This is the expected outcome if SIGTERM worked
|
|
print("Server terminated gracefully.")
|
|
except (ValueError, FileNotFoundError):
|
|
print("PID file is invalid or missing.")
|
|
except ProcessLookupError as e:
|
|
print(f"Process not found: {e}")
|
|
except subprocess.CalledProcessError:
|
|
print(
|
|
"Failed to terminate process with taskkill (it may already be gone)."
|
|
)
|
|
except PermissionError as e:
|
|
print(f"Permission denied to terminate process: {e}")
|
|
finally:
|
|
if PID_FILE.exists():
|
|
PID_FILE.unlink()
|
|
print("Server stop command finished.")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|