# Listing metadata: 402 lines, 16 KiB, Python.
import difflib
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import shutil
|
|
import unittest
|
|
from contextlib import contextmanager
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any
|
|
from uuid import uuid4
|
|
|
|
from journal.core import storage
|
|
from journal.core.csharp_sidecar import call_sidecar_action
|
|
from journal.core.parser import parse_journal_content, parse_journal_file
|
|
|
|
|
|
# Project root: the parent of the directory that contains this module.
PROJECT_ROOT = Path(__file__).resolve().parents[1]

# Fixture locations used by the parity tests.
FIXTURES_ROOT = PROJECT_ROOT.joinpath("fixtures")
ENTRY_FIXTURES = FIXTURES_ROOT.joinpath("entries")
SEARCH_FIXTURES = FIXTURES_ROOT.joinpath("search", "queries.json")
VAULT_MANIFEST = FIXTURES_ROOT.joinpath("vaults", "manifest.json")

# One row is appended here for every recorded Python/C# comparison.
PARITY_REPORT: list[dict[str, Any]] = []
|
|
|
|
|
|
def _load_queries() -> list[dict[str, Any]]:
    """Read the search-query fixture file and return its decoded JSON content."""
    with SEARCH_FIXTURES.open(encoding="utf-8") as handle:
        return json.load(handle)
|
|
|
|
|
|
def _load_vault_manifest() -> dict[str, Any]:
    """Read the vault manifest fixture file and return its decoded JSON content."""
    with VAULT_MANIFEST.open(encoding="utf-8") as handle:
        return json.load(handle)
|
|
|
|
|
|
def _copy_entry_fixtures(target_dir: Path) -> None:
    """Copy every ``*.md`` entry fixture into ``target_dir``, creating it if needed."""
    target_dir.mkdir(parents=True, exist_ok=True)
    fixture_paths = list(ENTRY_FIXTURES.glob("*.md"))
    fixture_paths.sort()
    for fixture in fixture_paths:
        destination = target_dir / fixture.name
        shutil.copy2(fixture, destination)
|
|
|
|
|
|
def _copy_vault_fixtures(manifest: dict[str, Any], target_dir: Path) -> None:
    """Copy the vault files named in ``manifest`` into ``target_dir``.

    ``target_dir`` is created if needed. Rows under the manifest's ``"vaults"``
    key that are not dicts, or whose ``"vault_file"`` value is not a string,
    are skipped.
    """
    target_dir.mkdir(parents=True, exist_ok=True)
    for row in manifest.get("vaults", []):
        vault_name = row.get("vault_file") if isinstance(row, dict) else None
        if isinstance(vault_name, str):
            shutil.copy2(FIXTURES_ROOT / "vaults" / vault_name, target_dir / vault_name)
|
|
|
|
|
|
def _sha256_file(path: Path) -> str:
    """Return the hexadecimal SHA-256 digest of the file at ``path``.

    The file is read in 1 MiB chunks rather than all at once.
    """
    chunk_size = 1024 * 1024
    hasher = hashlib.sha256()
    with path.open("rb") as stream:
        # iter() with a sentinel stops as soon as read() returns b"" (end of file).
        for block in iter(lambda: stream.read(chunk_size), b""):
            hasher.update(block)
    return hasher.hexdigest()
|
|
|
|
|
|
@contextmanager
def _workspace():
    """Yield a fresh, uniquely named scratch directory and remove it on exit.

    The directory is created under ``PROJECT_ROOT/.tmp/parity-tests``. Removal
    happens even if the body raises, and errors during removal are ignored.
    """
    scratch_dir = PROJECT_ROOT.joinpath(".tmp", "parity-tests", uuid4().hex)
    scratch_dir.mkdir(parents=True, exist_ok=True)
    try:
        yield scratch_dir
    finally:
        shutil.rmtree(scratch_dir, ignore_errors=True)
|
|
|
|
|
|
def _normalize_for_json(value: Any) -> Any:
    """Recursively convert ``value`` into a canonical shape for comparison.

    Dict keys become strings and are ordered by their string form, tuples
    become lists, and any other value is returned unchanged.
    """
    if isinstance(value, (list, tuple)):
        return [_normalize_for_json(element) for element in value]
    if not isinstance(value, dict):
        return value
    ordered_pairs = sorted(value.items(), key=lambda pair: str(pair[0]))
    return {str(key): _normalize_for_json(child) for key, child in ordered_pairs}
|
|
|
|
|
|
def _record_parity(name: str, python_result: Any, csharp_result: Any) -> dict[str, Any]:
    """Compare two results, append the outcome to ``PARITY_REPORT`` and return it.

    Both results are normalized and serialized to JSON text; they match when
    the two texts are equal. On a mismatch the row's ``"diff"`` holds a unified
    diff of the two texts, otherwise it is an empty string.
    """
    python_side = _normalize_for_json(python_result)
    csharp_side = _normalize_for_json(csharp_result)
    python_text, csharp_text = (
        json.dumps(side, indent=2, ensure_ascii=True, sort_keys=True)
        for side in (python_side, csharp_side)
    )

    is_match = python_text == csharp_text
    if is_match:
        diff_text = ""
    else:
        diff_lines = difflib.unified_diff(
            python_text.splitlines(),
            csharp_text.splitlines(),
            fromfile="python_result",
            tofile="csharp_result",
            lineterm="",
        )
        diff_text = "\n".join(diff_lines)

    outcome = {
        "name": name,
        "python_result": python_side,
        "csharp_result": csharp_side,
        "match": is_match,
        "diff": diff_text,
    }
    PARITY_REPORT.append(outcome)
    return outcome
|
|
|
|
|
|
def _normalize_search_results(results: list[dict[str, Any]]) -> list[tuple[str, str]]:
    """Reduce sidecar search rows to ``(date, file_name)`` pairs sorted by file name.

    Each field is read from its PascalCase key (``"Date"``, ``"FileName"``) and
    falls back to the camelCase key (``"date"``, ``"fileName"``) only when the
    first is missing or ``None``. Rows that are not dicts, or whose date or
    file name is not a string, are skipped.
    """

    def _pick(row: dict[str, Any], *keys: str) -> Any:
        # Test against None rather than truthiness so an empty string is kept
        # as a real value instead of being treated as missing.
        for key in keys:
            candidate = row.get(key)
            if candidate is not None:
                return candidate
        return None

    pairs: list[tuple[str, str]] = []
    for item in results:
        if not isinstance(item, dict):
            # Malformed rows are skipped, like rows with non-string fields below.
            continue
        date_value = _pick(item, "Date", "date")
        file_name = _pick(item, "FileName", "fileName")
        if isinstance(date_value, str) and isinstance(file_name, str):
            pairs.append((date_value, file_name))
    return sorted(pairs, key=lambda pair: pair[1])
|
|
|
|
|
|
def _payload_string_set(payload: dict[str, Any], key: str) -> set[str]:
    """Return the stripped, non-empty string items stored under ``payload[key]``."""
    # ``or []`` covers both a missing key and an explicit JSON ``null``.
    raw_items = payload.get(key) or []
    return {item.strip() for item in raw_items if isinstance(item, str) and item.strip()}


def _has_matching_fragment(entry: Any, tags: set[str], types: set[str]) -> bool:
    """Return True when a single fragment passes both the type and the tag filter.

    An empty filter set places no restriction on that dimension.
    """
    for fragment in entry.fragments:
        type_ok = not types or fragment.type in types
        tag_ok = not tags or any(tag in tags for tag in fragment.tags)
        if type_ok and tag_ok:
            return True
    return False


def _has_matching_checkbox(entry: Any, checked: set[str], unchecked: set[str]) -> bool:
    """Return True when any checkbox text is listed in the filter for its state."""
    for parsed_section in entry.sections.values():
        for checkbox_text, is_checked in parsed_section.checkboxes.items():
            # A ticked box is looked up in ``checked``, an unticked one in
            # ``unchecked``; an empty filter set can never match.
            wanted = checked if is_checked else unchecked
            if checkbox_text in wanted:
                return True
    return False


def _python_search(data_dir: Path, payload: dict[str, Any]) -> list[tuple[str, str]]:
    """Search the ``*.md`` entries in ``data_dir`` using the filters in ``payload``.

    Filters read from ``payload`` (all optional; an entry must pass every
    filter that is present):

    * ``startDate`` / ``endDate`` -- inclusive ``YYYY-MM-DD`` bounds. A bound
      that is missing or does not parse is ignored. When any bound is active,
      entries whose own date does not parse are excluded.
    * ``query`` -- case-insensitive substring, searched in the section named
      by ``section`` when given, otherwise in the entry's raw content.
    * ``tags`` / ``types`` -- at least one fragment must satisfy both.
    * ``checked`` / ``unchecked`` -- at least one checkbox must be listed in
      the set that corresponds to its state.

    List-valued filters may be missing or ``None``; both mean "no filter".

    Returns:
        ``(entry.date, file_name)`` pairs sorted by file name.

    Raises:
        ValueError: if both bounds parse and ``startDate`` is after ``endDate``.
    """
    query = (payload.get("query") or "").strip()
    section = (payload.get("section") or "").strip()
    tags = _payload_string_set(payload, "tags")
    types = _payload_string_set(payload, "types")
    checked = _payload_string_set(payload, "checked")
    unchecked = _payload_string_set(payload, "unchecked")

    start_date = _parse_optional_date(payload.get("startDate"))
    end_date = _parse_optional_date(payload.get("endDate"))
    if start_date and end_date and start_date > end_date:
        raise ValueError("startDate cannot be after endDate.")

    # Lower-case the needle once instead of once per file.
    needle = query.lower()

    results: list[tuple[str, str]] = []
    for file_path in sorted(data_dir.glob("*.md"), key=lambda p: p.name):
        entry = parse_journal_file(str(file_path))

        if start_date or end_date:
            entry_date = _parse_optional_date(entry.date)
            if entry_date is None:
                continue
            if start_date and entry_date < start_date:
                continue
            if end_date and entry_date > end_date:
                continue

        if query:
            haystack = entry.get_section(section) if section else entry.raw_content
            if needle not in haystack.lower():
                continue

        if (tags or types) and not _has_matching_fragment(entry, tags, types):
            continue

        if (checked or unchecked) and not _has_matching_checkbox(entry, checked, unchecked):
            continue

        results.append((entry.date, file_path.name))

    return sorted(results, key=lambda row: row[1])
|
|
|
|
|
|
def _parse_optional_date(value: str | None):
|
|
if not value or not isinstance(value, str):
|
|
return None
|
|
try:
|
|
return datetime.strptime(value.strip(), "%Y-%m-%d").date()
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
class ParityHarnessTests(unittest.TestCase):
    """Parity checks between the Python reference code and the C# sidecar.

    Every comparison goes through ``_record_parity`` so that ``tearDownClass``
    can write all recorded rows to a JSON report.
    """

    @classmethod
    def tearDownClass(cls) -> None:
        """Write every recorded parity row to a JSON report file.

        The destination is taken from the ``PARITY_HARNESS_REPORT`` environment
        variable and defaults to ``logs/parity_harness_results.json`` under the
        project root.
        """
        default_report = PROJECT_ROOT / "logs" / "parity_harness_results.json"
        report_path = Path(os.environ.get("PARITY_HARNESS_REPORT", str(default_report)))
        report_path.parent.mkdir(parents=True, exist_ok=True)
        passed = sum(1 for case in PARITY_REPORT if case["match"])
        payload = {
            "generated_at_utc": datetime.now(timezone.utc).isoformat(),
            "total_cases": len(PARITY_REPORT),
            "passed_cases": passed,
            "failed_cases": len(PARITY_REPORT) - passed,
            "cases": PARITY_REPORT,
        }
        report_path.write_text(json.dumps(payload, indent=2, ensure_ascii=True) + "\n", encoding="utf-8")
        super().tearDownClass()

    @staticmethod
    def _first_value(mapping: dict[str, Any], *keys: str) -> Any:
        """Return the first value found under ``keys`` that is not ``None``.

        Sidecar fields are looked up under both a PascalCase and a camelCase
        key. Unlike chaining the lookups with ``or``, an empty string is kept
        as a real value, so an empty entry is not reported as ``None``.
        """
        for key in keys:
            value = mapping.get(key)
            if value is not None:
                return value
        return None

    def test_entries_list_load_parity(self):
        """``entries.list`` and ``entries.load`` agree with the fixture files on disk."""
        with _workspace() as root:
            data_dir = root / "data"
            _copy_entry_fixtures(data_dir)

            csharp_list = call_sidecar_action(
                "entries.list",
                payload={"dataDirectory": str(data_dir)},
            )
            self.assertIsInstance(csharp_list, list)

            csharp_names = sorted(
                self._first_value(item, "FileName", "fileName")
                for item in csharp_list
                if isinstance(item, dict)
            )
            python_names = sorted(path.name for path in data_dir.glob("*.md"))
            row = _record_parity("entries.list", python_names, csharp_names)
            self.assertTrue(row["match"], row["diff"])

            for name in python_names:
                # One subtest per file so a single mismatch does not stop the
                # remaining files from being checked and recorded.
                with self.subTest(file=name):
                    file_path = data_dir / name
                    csharp_loaded = call_sidecar_action(
                        "entries.load",
                        payload={"filePath": str(file_path)},
                    )
                    self.assertIsInstance(csharp_loaded, dict)
                    csharp_raw = self._first_value(csharp_loaded, "RawContent", "rawContent")
                    python_raw = storage._strip_rich_html(file_path.read_text(encoding="utf-8"))  # pylint: disable=protected-access
                    row = _record_parity(f"entries.load::{name}", python_raw, csharp_raw)
                    self.assertTrue(row["match"], row["diff"])

    def test_entries_save_merge_parity(self):
        """A ``Daily`` save through the sidecar merges like the Python ``merge_with``."""
        with _workspace() as root:
            data_dir = root / "data"
            _copy_entry_fixtures(data_dir)
            target = data_dir / "2026-01-05.md"
            original = target.read_text(encoding="utf-8")

            new_content = (
                "**Date:** 2026-01-05\n\n"
                "## Triggers\n"
                "Crowded grocery store caused severe panic.\n\n"
                "## Reflections\n"
                "Added one new thought after grounding.\n"
            )

            # Python side: merge in memory, without touching the file.
            python_existing = parse_journal_content(original, target.stem)
            python_incoming = parse_journal_content(new_content, target.stem)
            python_existing.merge_with(python_incoming)
            python_markdown = python_existing.to_markdown()

            # C# side: the sidecar writes its merge result to ``target``.
            call_sidecar_action(
                "entries.save",
                payload={
                    "content": new_content,
                    "filePath": str(target),
                    "mode": "Daily",
                },
            )
            csharp_markdown = target.read_text(encoding="utf-8")

            def summarize(markdown: str) -> dict[str, Any]:
                # Both outputs are re-parsed with the Python parser so only the
                # date and the two sections are compared, not the raw text.
                entry = parse_journal_content(markdown, target.stem)
                return {
                    "date": entry.date,
                    "triggers": entry.get_section("Triggers").strip(),
                    "reflections": entry.get_section("Reflections").strip(),
                }

            row = _record_parity(
                "entries.save::merge",
                summarize(python_markdown),
                summarize(csharp_markdown),
            )
            self.assertTrue(row["match"], row["diff"])

    def test_search_parity_against_python_and_expected_ids(self):
        """Sidecar search matches both ``_python_search`` and the fixture's expected files."""
        with _workspace() as root:
            data_dir = root / "data"
            _copy_entry_fixtures(data_dir)
            queries = _load_queries()

            for case in queries:
                case_name = str(case.get("name", "unnamed"))
                # One subtest per query so every fixture case is run and
                # recorded even when an earlier one fails.
                with self.subTest(case=case_name):
                    payload = dict(case.get("payload", {}))
                    payload["dataDirectory"] = str(data_dir)

                    python_result = _python_search(data_dir, payload)
                    csharp_result = call_sidecar_action("search.entries", payload=payload)
                    self.assertIsInstance(csharp_result, list)
                    csharp_normalized = _normalize_search_results(csharp_result)

                    parity_row = _record_parity(f"search.entries::{case_name}", python_result, csharp_normalized)
                    self.assertTrue(parity_row["match"], parity_row["diff"])

                    expected_file_names = sorted(case.get("expected_file_names", []))
                    expected_row = _record_parity(
                        f"search.expected::{case_name}",
                        expected_file_names,
                        [item[1] for item in csharp_normalized],
                    )
                    self.assertTrue(expected_row["match"], expected_row["diff"])

    def test_sanitizer_parity_for_html_heavy_input(self):
        """An ``Overwrite`` save of HTML yields the same text as ``storage._strip_rich_html``."""
        with _workspace() as root:
            data_dir = root / "data"
            data_dir.mkdir(parents=True, exist_ok=True)
            target = data_dir / "2026-02-26.md"
            html_input = (
                '<p style="font-family: Times New Roman;">Hello <b>World</b></p>'
                "<ul><li>alpha</li><li>beta</li></ul>"
            )
            python_sanitized = storage._strip_rich_html(html_input)  # pylint: disable=protected-access
            call_sidecar_action(
                "entries.save",
                payload={
                    "content": html_input,
                    "filePath": str(target),
                    "mode": "Overwrite",
                },
            )
            csharp_saved = target.read_text(encoding="utf-8")
            row = _record_parity("sanitizer.rich_html", python_sanitized, csharp_saved)
            self.assertTrue(row["match"], row["diff"])

    def test_vault_manifest_load_and_hash_integrity(self):
        """``vault.load_all`` with the manifest password produces the expected file hashes."""
        manifest = _load_vault_manifest()
        fixture_password = manifest.get("password")
        self.assertIsInstance(fixture_password, str)
        self.assertTrue(fixture_password)

        with _workspace() as root:
            vault_dir = root / "vault"
            data_dir = root / "data"
            _copy_vault_fixtures(manifest, vault_dir)

            expected_hashes: dict[str, str] = {
                str(entry_row["file_name"]): str(entry_row["sha256"])
                for vault_row in manifest.get("vaults", [])
                for entry_row in vault_row.get("expected_entries", [])
            }

            loaded = call_sidecar_action(
                "vault.load_all",
                payload={
                    "password": fixture_password,
                    "vaultDirectory": str(vault_dir),
                    "dataDirectory": str(data_dir),
                },
            )
            self.assertTrue(bool(loaded), "Expected fixture vaults to load with manifest password.")

            actual_hashes: dict[str, str] = {
                file_path.name: _sha256_file(file_path)
                for file_path in sorted(data_dir.glob("*.md"), key=lambda p: p.name)
            }

            row = _record_parity("vault.load_all::hashes", expected_hashes, actual_hashes)
            self.assertTrue(row["match"], row["diff"])

    def test_vault_wrong_password_preserves_bytes(self):
        """A failed ``vault.load_all`` leaves every ``*.vault`` file byte-identical."""
        manifest = _load_vault_manifest()
        wrong_password = manifest.get("wrong_password")
        self.assertIsInstance(wrong_password, str)
        self.assertTrue(wrong_password)

        with _workspace() as root:
            vault_dir = root / "vault"
            data_dir = root / "data"
            _copy_vault_fixtures(manifest, vault_dir)

            def snapshot() -> dict[str, str]:
                # Hash of every vault file currently in the workspace, by name.
                return {
                    path.name: _sha256_file(path)
                    for path in sorted(vault_dir.glob("*.vault"), key=lambda p: p.name)
                }

            before_hashes = snapshot()
            loaded = call_sidecar_action(
                "vault.load_all",
                payload={
                    "password": wrong_password,
                    "vaultDirectory": str(vault_dir),
                    "dataDirectory": str(data_dir),
                },
            )
            self.assertFalse(bool(loaded), "Wrong password should fail vault.load_all.")
            after_hashes = snapshot()

            row = _record_parity("vault.load_all::wrong_password_invariant", before_hashes, after_hashes)
            self.assertTrue(row["match"], row["diff"])
|
|
|
|
|
|
# Run the test suite through unittest when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
|