import difflib import hashlib import json import os import shutil import unittest from contextlib import contextmanager from datetime import datetime, timezone from pathlib import Path from typing import Any from uuid import uuid4 from journal.core import storage from journal.core.csharp_sidecar import call_sidecar_action from journal.core.parser import parse_journal_content, parse_journal_file PROJECT_ROOT = Path(__file__).resolve().parents[1] FIXTURES_ROOT = PROJECT_ROOT / "fixtures" ENTRY_FIXTURES = FIXTURES_ROOT / "entries" SEARCH_FIXTURES = FIXTURES_ROOT / "search" / "queries.json" VAULT_MANIFEST = FIXTURES_ROOT / "vaults" / "manifest.json" PARITY_REPORT: list[dict[str, Any]] = [] def _load_queries() -> list[dict[str, Any]]: return json.loads(SEARCH_FIXTURES.read_text(encoding="utf-8")) def _load_vault_manifest() -> dict[str, Any]: return json.loads(VAULT_MANIFEST.read_text(encoding="utf-8")) def _copy_entry_fixtures(target_dir: Path) -> None: target_dir.mkdir(parents=True, exist_ok=True) for source in sorted(ENTRY_FIXTURES.glob("*.md")): shutil.copy2(source, target_dir / source.name) def _copy_vault_fixtures(manifest: dict[str, Any], target_dir: Path) -> None: target_dir.mkdir(parents=True, exist_ok=True) for vault_row in manifest.get("vaults", []): if not isinstance(vault_row, dict): continue name = vault_row.get("vault_file") if not isinstance(name, str): continue source = FIXTURES_ROOT / "vaults" / name shutil.copy2(source, target_dir / name) def _sha256_file(path: Path) -> str: digest = hashlib.sha256() with path.open("rb") as handle: while True: chunk = handle.read(1024 * 1024) if not chunk: break digest.update(chunk) return digest.hexdigest() @contextmanager def _workspace(): root = PROJECT_ROOT / ".tmp" / "parity-tests" / uuid4().hex root.mkdir(parents=True, exist_ok=True) try: yield root finally: shutil.rmtree(root, ignore_errors=True) def _normalize_for_json(value: Any) -> Any: if isinstance(value, dict): return {str(k): _normalize_for_json(v) for k, v in sorted(value.items(), key=lambda item: str(item[0]))} if isinstance(value, list): return [_normalize_for_json(item) for item in value] if isinstance(value, tuple): return [_normalize_for_json(item) for item in value] return value def _record_parity(name: str, python_result: Any, csharp_result: Any) -> dict[str, Any]: normalized_python = _normalize_for_json(python_result) normalized_csharp = _normalize_for_json(csharp_result) python_json = json.dumps(normalized_python, indent=2, ensure_ascii=True, sort_keys=True) csharp_json = json.dumps(normalized_csharp, indent=2, ensure_ascii=True, sort_keys=True) match = python_json == csharp_json diff = "" if not match: diff = "\n".join( difflib.unified_diff( python_json.splitlines(), csharp_json.splitlines(), fromfile="python_result", tofile="csharp_result", lineterm="", ) ) row = { "name": name, "python_result": normalized_python, "csharp_result": normalized_csharp, "match": match, "diff": diff, } PARITY_REPORT.append(row) return row def _normalize_search_results(results: list[dict[str, Any]]) -> list[tuple[str, str]]: normalized: list[tuple[str, str]] = [] for item in results: date_value = item.get("Date") or item.get("date") file_name = item.get("FileName") or item.get("fileName") if isinstance(date_value, str) and isinstance(file_name, str): normalized.append((date_value, file_name)) return sorted(normalized, key=lambda row: row[1]) def _python_search(data_dir: Path, payload: dict[str, Any]) -> list[tuple[str, str]]: query = (payload.get("query") or "").strip() section = (payload.get("section") or "").strip() tags = {v.strip() for v in payload.get("tags", []) if isinstance(v, str) and v.strip()} types = {v.strip() for v in payload.get("types", []) if isinstance(v, str) and v.strip()} checked = {v.strip() for v in payload.get("checked", []) if isinstance(v, str) and v.strip()} unchecked = {v.strip() for v in payload.get("unchecked", []) if isinstance(v, str) and v.strip()} start_date = _parse_optional_date(payload.get("startDate")) end_date = _parse_optional_date(payload.get("endDate")) if start_date and end_date and start_date > end_date: raise ValueError("startDate cannot be after endDate.") results: list[tuple[str, str]] = [] for file_path in sorted(data_dir.glob("*.md"), key=lambda p: p.name): entry = parse_journal_file(str(file_path)) entry_date = _parse_optional_date(entry.date) if (start_date or end_date) and entry_date is None: continue if start_date and entry_date and entry_date < start_date: continue if end_date and entry_date and entry_date > end_date: continue if query: haystack = entry.get_section(section) if section else entry.raw_content if query.lower() not in haystack.lower(): continue if tags or types: matched_fragment = False for fragment in entry.fragments: type_ok = not types or fragment.type in types tag_ok = not tags or any(tag in tags for tag in fragment.tags) if type_ok and tag_ok: matched_fragment = True break if not matched_fragment: continue if checked or unchecked: matched_checkbox = False for parsed_section in entry.sections.values(): for checkbox_text, is_checked in parsed_section.checkboxes.items(): if checked and is_checked and checkbox_text in checked: matched_checkbox = True break if unchecked and (not is_checked) and checkbox_text in unchecked: matched_checkbox = True break if matched_checkbox: break if not matched_checkbox: continue results.append((entry.date, file_path.name)) return sorted(results, key=lambda row: row[1]) def _parse_optional_date(value: str | None): if not value or not isinstance(value, str): return None try: return datetime.strptime(value.strip(), "%Y-%m-%d").date() except ValueError: return None class ParityHarnessTests(unittest.TestCase): @classmethod def tearDownClass(cls) -> None: report_path = Path( os.environ.get( "PARITY_HARNESS_REPORT", str(PROJECT_ROOT / "logs" / "parity_harness_results.json"), ) ) report_path.parent.mkdir(parents=True, exist_ok=True) payload = { "generated_at_utc": datetime.now(timezone.utc).isoformat(), "total_cases": len(PARITY_REPORT), "passed_cases": sum(1 for case in PARITY_REPORT if case["match"]), "failed_cases": sum(1 for case in PARITY_REPORT if not case["match"]), "cases": PARITY_REPORT, } report_path.write_text(json.dumps(payload, indent=2, ensure_ascii=True) + "\n", encoding="utf-8") def test_entries_list_load_parity(self): with _workspace() as root: data_dir = root / "data" _copy_entry_fixtures(data_dir) csharp_list = call_sidecar_action( "entries.list", payload={"dataDirectory": str(data_dir)}, ) self.assertIsInstance(csharp_list, list) csharp_names = sorted( [ item.get("FileName") or item.get("fileName") for item in csharp_list if isinstance(item, dict) ] ) python_names = sorted([path.name for path in data_dir.glob("*.md")]) row = _record_parity("entries.list", python_names, csharp_names) self.assertTrue(row["match"], row["diff"]) for name in python_names: file_path = data_dir / name csharp_loaded = call_sidecar_action( "entries.load", payload={"filePath": str(file_path)}, ) self.assertIsInstance(csharp_loaded, dict) csharp_raw = csharp_loaded.get("RawContent") or csharp_loaded.get("rawContent") python_raw = storage._strip_rich_html(file_path.read_text(encoding="utf-8")) # pylint: disable=protected-access row = _record_parity(f"entries.load::{name}", python_raw, csharp_raw) self.assertTrue(row["match"], row["diff"]) def test_entries_save_merge_parity(self): with _workspace() as root: data_dir = root / "data" _copy_entry_fixtures(data_dir) target = data_dir / "2026-01-05.md" original = target.read_text(encoding="utf-8") new_content = ( "**Date:** 2026-01-05\n\n" "## Triggers\n" "Crowded grocery store caused severe panic.\n\n" "## Reflections\n" "Added one new thought after grounding.\n" ) python_existing = parse_journal_content(original, target.stem) python_incoming = parse_journal_content(new_content, target.stem) python_existing.merge_with(python_incoming) python_markdown = python_existing.to_markdown() _ = call_sidecar_action( "entries.save", payload={ "content": new_content, "filePath": str(target), "mode": "Daily", }, ) csharp_markdown = target.read_text(encoding="utf-8") python_entry = parse_journal_content(python_markdown, target.stem) csharp_entry = parse_journal_content(csharp_markdown, target.stem) row = _record_parity( "entries.save::merge", { "date": python_entry.date, "triggers": python_entry.get_section("Triggers").strip(), "reflections": python_entry.get_section("Reflections").strip(), }, { "date": csharp_entry.date, "triggers": csharp_entry.get_section("Triggers").strip(), "reflections": csharp_entry.get_section("Reflections").strip(), }, ) self.assertTrue(row["match"], row["diff"]) def test_search_parity_against_python_and_expected_ids(self): with _workspace() as root: data_dir = root / "data" _copy_entry_fixtures(data_dir) queries = _load_queries() for case in queries: case_name = str(case.get("name", "unnamed")) payload = dict(case.get("payload", {})) payload["dataDirectory"] = str(data_dir) python_result = _python_search(data_dir, payload) csharp_result = call_sidecar_action("search.entries", payload=payload) self.assertIsInstance(csharp_result, list) csharp_normalized = _normalize_search_results(csharp_result) parity_row = _record_parity(f"search.entries::{case_name}", python_result, csharp_normalized) self.assertTrue(parity_row["match"], parity_row["diff"]) expected_file_names = sorted(case.get("expected_file_names", [])) expected_row = _record_parity( f"search.expected::{case_name}", expected_file_names, [item[1] for item in csharp_normalized], ) self.assertTrue(expected_row["match"], expected_row["diff"]) def test_sanitizer_parity_for_html_heavy_input(self): with _workspace() as root: data_dir = root / "data" data_dir.mkdir(parents=True, exist_ok=True) target = data_dir / "2026-02-26.md" html_input = ( '

Hello World

' "" ) python_sanitized = storage._strip_rich_html(html_input) # pylint: disable=protected-access _ = call_sidecar_action( "entries.save", payload={ "content": html_input, "filePath": str(target), "mode": "Overwrite", }, ) csharp_saved = target.read_text(encoding="utf-8") row = _record_parity("sanitizer.rich_html", python_sanitized, csharp_saved) self.assertTrue(row["match"], row["diff"]) def test_vault_manifest_load_and_hash_integrity(self): manifest = _load_vault_manifest() fixture_password = manifest.get("password") self.assertIsInstance(fixture_password, str) self.assertTrue(fixture_password) with _workspace() as root: vault_dir = root / "vault" data_dir = root / "data" _copy_vault_fixtures(manifest, vault_dir) expected_hashes: dict[str, str] = {} for vault_row in manifest.get("vaults", []): for entry_row in vault_row.get("expected_entries", []): expected_hashes[str(entry_row["file_name"])] = str(entry_row["sha256"]) loaded = call_sidecar_action( "vault.load_all", payload={ "password": fixture_password, "vaultDirectory": str(vault_dir), "dataDirectory": str(data_dir), }, ) self.assertTrue(bool(loaded), "Expected fixture vaults to load with manifest password.") actual_hashes: dict[str, str] = {} for file_path in sorted(data_dir.glob("*.md"), key=lambda p: p.name): actual_hashes[file_path.name] = _sha256_file(file_path) row = _record_parity("vault.load_all::hashes", expected_hashes, actual_hashes) self.assertTrue(row["match"], row["diff"]) def test_vault_wrong_password_preserves_bytes(self): manifest = _load_vault_manifest() wrong_password = manifest.get("wrong_password") self.assertIsInstance(wrong_password, str) self.assertTrue(wrong_password) with _workspace() as root: vault_dir = root / "vault" data_dir = root / "data" _copy_vault_fixtures(manifest, vault_dir) before_hashes = {path.name: _sha256_file(path) for path in sorted(vault_dir.glob("*.vault"), key=lambda p: p.name)} loaded = call_sidecar_action( "vault.load_all", payload={ "password": wrong_password, "vaultDirectory": str(vault_dir), "dataDirectory": str(data_dir), }, ) self.assertFalse(bool(loaded), "Wrong password should fail vault.load_all.") after_hashes = {path.name: _sha256_file(path) for path in sorted(vault_dir.glob("*.vault"), key=lambda p: p.name)} row = _record_parity("vault.load_all::wrong_password_invariant", before_hashes, after_hashes) self.assertTrue(row["match"], row["diff"]) if __name__ == "__main__": unittest.main()