From d5c8fd1bf8cc3d3dd59982662529cffce412d2b2 Mon Sep 17 00:00:00 2001 From: Stan44 Date: Sun, 11 May 2025 20:15:37 -0500 Subject: [PATCH] completed thread sync [feature:thread_sync] completed spatial queries [feature:spatial_queries] added spatial interfaces [feature:spatial_interfaces] added parallel processing [new-feature:parallel_processing] [changelog: Implemented Threading Explicit synchronization and concurrent collections] [changelog: Implemented Spatial indexing and region queries with 2D/3D support and quadtree optimization] [changelog: Added spatial interfaces Interface-level spatial query methods with support for custom filters] [changelog: Added parallel processing of chunks based on spatial queries and regions] --- build.bat | 115 +- docs/features.md | 122 ++ docs/status/ChunkManager-Status.md | 35 + scripts/README.md | 81 ++ scripts/track_progress.py | 224 +++ src/AdvChkSys/AdvChkSys.cs | 72 +- .../Dependencies/ChunkDependencyTracker.cs | 310 +++++ src/AdvChkSys/Loading/ChunkLoadingPriority.cs | 677 +++++++++ src/AdvChkSys/Spatial/ChunkExtensions.cs | 40 + src/AdvChkSys/Spatial/SpatialChunkIndex.cs | 1228 +++++++++++++++++ src/AdvChkSys/Threading/ChunkAsyncLock.cs | 167 +++ .../Threading/ChunkOperationQueue.cs | 274 ++++ .../Threading/ChunkParallelProcessor.cs | 252 ++++ src/AdvChkSys/Threading/ChunkTaskScheduler.cs | 265 ++++ .../Threading/ChunkTaskSchedulerExtensions.cs | 0 .../Threading/ChunkThreadSafetyManager.cs | 237 ++++ .../Threading/ChunkThreadingConfiguration.cs | 98 ++ .../Threading/ChunkThreadingDiagnostics.cs | 273 ++++ .../Threading/ChunkThreadingExtensions.cs | 380 +++++ .../Threading/ChunkThreadingExtensions2.cs | 223 +++ .../Threading/ChunkThreadingManager.cs | 281 ++++ .../ChunkThreadingPerformanceMonitor.cs | 257 ++++ .../LimitedConcurrencyTaskScheduler.cs | 170 +++ 23 files changed, 5716 insertions(+), 65 deletions(-) create mode 100644 docs/features.md create mode 100644 docs/status/ChunkManager-Status.md create mode 100644 scripts/README.md create mode 100644 scripts/track_progress.py create mode 100644 src/AdvChkSys/Dependencies/ChunkDependencyTracker.cs create mode 100644 src/AdvChkSys/Loading/ChunkLoadingPriority.cs create mode 100644 src/AdvChkSys/Spatial/ChunkExtensions.cs create mode 100644 src/AdvChkSys/Spatial/SpatialChunkIndex.cs create mode 100644 src/AdvChkSys/Threading/ChunkAsyncLock.cs create mode 100644 src/AdvChkSys/Threading/ChunkOperationQueue.cs create mode 100644 src/AdvChkSys/Threading/ChunkParallelProcessor.cs create mode 100644 src/AdvChkSys/Threading/ChunkTaskSchedulerExtensions.cs create mode 100644 src/AdvChkSys/Threading/ChunkThreadSafetyManager.cs create mode 100644 src/AdvChkSys/Threading/ChunkThreadingConfiguration.cs create mode 100644 src/AdvChkSys/Threading/ChunkThreadingDiagnostics.cs create mode 100644 src/AdvChkSys/Threading/ChunkThreadingExtensions.cs create mode 100644 src/AdvChkSys/Threading/ChunkThreadingExtensions2.cs create mode 100644 src/AdvChkSys/Threading/ChunkThreadingManager.cs create mode 100644 src/AdvChkSys/Threading/ChunkThreadingPerformanceMonitor.cs create mode 100644 src/AdvChkSys/Threading/LimitedConcurrencyTaskScheduler.cs diff --git a/build.bat b/build.bat index 9f8b675..ad0d58d 100644 --- a/build.bat +++ b/build.bat @@ -4,17 +4,6 @@ setlocal enabledelayedexpansion :: AdvChkSys Build Script :: ====================== -:: Set colors for console output -set "RESET=[0m" -set "BRIGHT=[1m" -set "RED=[91m" -set "GREEN=[92m" -set "YELLOW=[93m" -set "BLUE=[94m" -set "MAGENTA=[95m" -set "CYAN=[96m" -set "WHITE=[97m" - :: Set build parameters set "PROJECT_DIR=src\AdvChkSys" set "BENCHMARKS_DIR=src\AdvChkSys.Benchmarks" @@ -25,22 +14,22 @@ set "VERSION=0.1.8" if not exist "%OUTPUT_DIR%" mkdir "%OUTPUT_DIR%" :: Display header -echo %BRIGHT%%CYAN%╔══════════════════════════════════════════════════════════════╗%RESET% -echo %BRIGHT%%CYAN%║ ADVANCED CHUNK SYSTEM BUILD ║%RESET% -echo %BRIGHT%%CYAN%╚══════════════════════════════════════════════════════════════╝%RESET% +echo ╔══════════════════════════════════════════════════════════════╗ +echo ║ ADVANCED CHUNK SYSTEM BUILD ║ +echo ╚══════════════════════════════════════════════════════════════╝ echo. -echo %BRIGHT%%WHITE%Version: %VERSION%%RESET% -echo %BRIGHT%%WHITE%Date: %DATE% %TIME%%RESET% +echo Version: %VERSION% +echo Date: %DATE% %TIME% echo. :: Check for .NET SDK -echo %YELLOW%Checking for .NET SDK...%RESET% +echo Checking for .NET SDK... dotnet --version > nul 2>&1 if %ERRORLEVEL% neq 0 ( - echo %RED%Error: .NET SDK not found. Please install the .NET SDK.%RESET% + echo Error: .NET SDK not found. Please install the .NET SDK. exit /b 1 ) -echo %GREEN%✓ .NET SDK found%RESET% +echo ✓ .NET SDK found echo. :: Parse command line arguments @@ -81,40 +70,40 @@ goto parse_args :end_parse_args :: Display build configuration -echo %BRIGHT%%MAGENTA%Build Configuration:%RESET% -echo %MAGENTA%• Configuration: %CONFIG%%RESET% -echo %MAGENTA%• Windows: %BUILD_WINDOWS%%RESET% -echo %MAGENTA%• Linux: %BUILD_LINUX%%RESET% -echo %MAGENTA%• macOS: %BUILD_MAC%%RESET% -echo %MAGENTA%• Benchmarks: %BUILD_BENCHMARKS%%RESET% -echo %MAGENTA%• NuGet Package: %BUILD_NUGET%%RESET% -echo %MAGENTA%• Documentation: %BUILD_DOCS%%RESET% +echo Build Configuration: +echo • Configuration: %CONFIG% +echo • Windows: %BUILD_WINDOWS% +echo • Linux: %BUILD_LINUX% +echo • macOS: %BUILD_MAC% +echo • Benchmarks: %BUILD_BENCHMARKS% +echo • NuGet Package: %BUILD_NUGET% +echo • Documentation: %BUILD_DOCS% echo. :: Clean previous builds -echo %YELLOW%Cleaning previous builds...%RESET% +echo Cleaning previous builds... if exist "%PROJECT_DIR%\bin" rd /s /q "%PROJECT_DIR%\bin" if exist "%PROJECT_DIR%\obj" rd /s /q "%PROJECT_DIR%\obj" if exist "%BENCHMARKS_DIR%\bin" rd /s /q "%BENCHMARKS_DIR%\bin" if exist "%BENCHMARKS_DIR%\obj" rd /s /q "%BENCHMARKS_DIR%\obj" -echo %GREEN%✓ Clean completed%RESET% +echo ✓ Clean completed echo. :: Restore packages -echo %YELLOW%Restoring packages...%RESET% +echo Restoring packages... dotnet restore "%PROJECT_DIR%\AdvChkSys.csproj" if %ERRORLEVEL% neq 0 ( - echo %RED%✗ Package restore failed%RESET% + echo ✗ Package restore failed exit /b 1 ) -echo %GREEN%✓ Packages restored%RESET% +echo ✓ Packages restored echo. :: Build the library (platform-agnostic) -echo %BRIGHT%%BLUE%Building AdvChkSys library...%RESET% +echo Building AdvChkSys library... dotnet build "%PROJECT_DIR%\AdvChkSys.csproj" -c %CONFIG% --no-restore if %ERRORLEVEL% neq 0 ( - echo %RED%✗ Library build failed%RESET% + echo ✗ Library build failed exit /b 1 ) @@ -128,27 +117,27 @@ if "%BUILD_WINDOWS%"=="true" ( if not exist "%OUTPUT_DIR%\windows" mkdir "%OUTPUT_DIR%\windows" copy "%PROJECT_DIR%\bin\%CONFIG%\netstandard2.1\AdvChkSys.dll" "%OUTPUT_DIR%\windows\" copy "%PROJECT_DIR%\bin\%CONFIG%\netstandard2.1\AdvChkSys.xml" "%OUTPUT_DIR%\windows\" - echo %GREEN%✓ Windows build completed%RESET% + echo ✓ Windows build completed ) if "%BUILD_LINUX%"=="true" ( if not exist "%OUTPUT_DIR%\linux" mkdir "%OUTPUT_DIR%\linux" copy "%PROJECT_DIR%\bin\%CONFIG%\netstandard2.1\AdvChkSys.dll" "%OUTPUT_DIR%\linux\" copy "%PROJECT_DIR%\bin\%CONFIG%\netstandard2.1\AdvChkSys.xml" "%OUTPUT_DIR%\linux\" - echo %GREEN%✓ Linux build completed%RESET% + echo ✓ Linux build completed ) if "%BUILD_MAC%"=="true" ( if not exist "%OUTPUT_DIR%\macos" mkdir "%OUTPUT_DIR%\macos" copy "%PROJECT_DIR%\bin\%CONFIG%\netstandard2.1\AdvChkSys.dll" "%OUTPUT_DIR%\macos\" copy "%PROJECT_DIR%\bin\%CONFIG%\netstandard2.1\AdvChkSys.xml" "%OUTPUT_DIR%\macos\" - echo %GREEN%✓ macOS build completed%RESET% + echo ✓ macOS build completed ) echo. :: Build benchmarks if "%BUILD_BENCHMARKS%"=="true" ( - echo %BRIGHT%%BLUE%Building ChunkMark benchmarks...%RESET% + echo Building ChunkMark benchmarks... :: Restore benchmark packages dotnet restore "%BENCHMARKS_DIR%\AdvChkSys.Benchmarks.csproj" @@ -157,11 +146,11 @@ if "%BUILD_BENCHMARKS%"=="true" ( if "%BUILD_WINDOWS%"=="true" ( dotnet publish "%BENCHMARKS_DIR%\AdvChkSys.Benchmarks.csproj" -c %CONFIG% -r win-x64 --self-contained true -p:PublishSingleFile=true -p:PublishTrimmed=true if %ERRORLEVEL% neq 0 ( - echo %RED%✗ Windows benchmark build failed%RESET% + echo ✗ Windows benchmark build failed ) else ( if not exist "%OUTPUT_DIR%\benchmarks\windows" mkdir "%OUTPUT_DIR%\benchmarks\windows" copy "%BENCHMARKS_DIR%\bin\%CONFIG%\net6.0\win-x64\publish\ChunkMark.exe" "%OUTPUT_DIR%\benchmarks\windows\" - echo %GREEN%✓ Windows benchmarks built%RESET% + echo ✓ Windows benchmarks built ) ) @@ -169,11 +158,11 @@ if "%BUILD_BENCHMARKS%"=="true" ( if "%BUILD_LINUX%"=="true" ( dotnet publish "%BENCHMARKS_DIR%\AdvChkSys.Benchmarks.csproj" -c %CONFIG% -r linux-x64 --self-contained true -p:PublishSingleFile=true -p:PublishTrimmed=true if %ERRORLEVEL% neq 0 ( - echo %RED%✗ Linux benchmark build failed%RESET% + echo ✗ Linux benchmark build failed ) else ( if not exist "%OUTPUT_DIR%\benchmarks\linux" mkdir "%OUTPUT_DIR%\benchmarks\linux" copy "%BENCHMARKS_DIR%\bin\%CONFIG%\net6.0\linux-x64\publish\ChunkMark" "%OUTPUT_DIR%\benchmarks\linux\" - echo %GREEN%✓ Linux benchmarks built%RESET% + echo ✓ Linux benchmarks built ) ) @@ -181,11 +170,11 @@ if "%BUILD_BENCHMARKS%"=="true" ( if "%BUILD_MAC%"=="true" ( dotnet publish "%BENCHMARKS_DIR%\AdvChkSys.Benchmarks.csproj" -c %CONFIG% -r osx-x64 --self-contained true -p:PublishSingleFile=true -p:PublishTrimmed=true if %ERRORLEVEL% neq 0 ( - echo %RED%✗ macOS benchmark build failed%RESET% + echo ✗ macOS benchmark build failed ) else ( if not exist "%OUTPUT_DIR%\benchmarks\macos" mkdir "%OUTPUT_DIR%\benchmarks\macos" copy "%BENCHMARKS_DIR%\bin\%CONFIG%\net6.0\osx-x64\publish\ChunkMark" "%OUTPUT_DIR%\benchmarks\macos\" - echo %GREEN%✓ macOS benchmarks built%RESET% + echo ✓ macOS benchmarks built ) ) echo. @@ -193,56 +182,56 @@ if "%BUILD_BENCHMARKS%"=="true" ( :: Build NuGet package if "%BUILD_NUGET%"=="true" ( - echo %BRIGHT%%BLUE%Building NuGet package...%RESET% + echo Building NuGet package... dotnet pack "%PROJECT_DIR%\AdvChkSys.csproj" -c %CONFIG% --no-build --output "%OUTPUT_DIR%\nuget" if %ERRORLEVEL% neq 0 ( - echo %RED%✗ NuGet package creation failed%RESET% + echo ✗ NuGet package creation failed exit /b 1 ) - echo %GREEN%✓ NuGet package created%RESET% + echo ✓ NuGet package created echo. ) :: Build documentation if "%BUILD_DOCS%"=="true" ( - echo %BRIGHT%%BLUE%Building documentation...%RESET% - echo %YELLOW%Documentation generation not implemented yet.%RESET% + echo Building documentation... + echo Documentation generation not implemented yet. echo. ) :: Display summary -echo %BRIGHT%%CYAN%╔══════════════════════════════════════════════════════════════╗%RESET% -echo %BRIGHT%%CYAN%║ BUILD SUMMARY ║%RESET% -echo %BRIGHT%%CYAN%╚══════════════════════════════════════════════════════════════╝%RESET% +echo ╔══════════════════════════════════════════════════════════════╗ +echo ║ BUILD SUMMARY ║ +echo ╚══════════════════════════════════════════════════════════════╝ echo. -echo %BRIGHT%%GREEN%Build completed successfully!%RESET% +echo Build completed successfully! echo. -echo %BRIGHT%%WHITE%Output files:%RESET% -echo %WHITE%• Library: %OUTPUT_DIR%\lib\AdvChkSys.dll%RESET% +echo Output files: +echo • Library: %OUTPUT_DIR%\lib\AdvChkSys.dll if "%BUILD_WINDOWS%"=="true" ( - echo %WHITE%• Windows: %OUTPUT_DIR%\windows\AdvChkSys.dll%RESET% + echo • Windows: %OUTPUT_DIR%\windows\AdvChkSys.dll if "%BUILD_BENCHMARKS%"=="true" ( - echo %WHITE%• Windows Benchmark: %OUTPUT_DIR%\benchmarks\windows\ChunkMark.exe%RESET% + echo • Windows Benchmark: %OUTPUT_DIR%\benchmarks\windows\ChunkMark.exe ) ) if "%BUILD_LINUX%"=="true" ( - echo %WHITE%• Linux: %OUTPUT_DIR%\linux\AdvChkSys.dll%RESET% + echo • Linux: %OUTPUT_DIR%\linux\AdvChkSys.dll if "%BUILD_BENCHMARKS%"=="true" ( - echo %WHITE%• Linux Benchmark: %OUTPUT_DIR%\benchmarks\linux\ChunkMark%RESET% + echo • Linux Benchmark: %OUTPUT_DIR%\benchmarks\linux\ChunkMark ) ) if "%BUILD_MAC%"=="true" ( - echo %WHITE%• macOS: %OUTPUT_DIR%\macos\AdvChkSys.dll%RESET% + echo • macOS: %OUTPUT_DIR%\macos\AdvChkSys.dll if "%BUILD_BENCHMARKS%"=="true" ( - echo %WHITE%• macOS Benchmark: %OUTPUT_DIR%\benchmarks\macos\ChunkMark%RESET% + echo • macOS Benchmark: %OUTPUT_DIR%\benchmarks\macos\ChunkMark ) ) if "%BUILD_NUGET%"=="true" ( - echo %WHITE%• NuGet: %OUTPUT_DIR%\nuget\AdvChkSys.%VERSION%.nupkg%RESET% + echo • NuGet: %OUTPUT_DIR%\nuget\AdvChkSys.%VERSION%.nupkg ) echo. -echo %BRIGHT%%CYAN%Run the build script with --help for more options.%RESET% +echo Run the build script with --help for more options. echo. endlocal \ No newline at end of file diff --git a/docs/features.md b/docs/features.md new file mode 100644 index 0000000..b9fd8f4 --- /dev/null +++ b/docs/features.md @@ -0,0 +1,122 @@ +# Feature Tracking + +## chunk_management +- Status: completed +- Description: 2D/3D chunk management with memory efficiency +- Last Update: 2025-05-10 + +## memory_efficiency +- Status: completed +- Description: LRU caching, array pooling, and air-singleton patterns +- Last Update: 2025-05-10 + +## resource_tracking +- Status: completed +- Description: Track and manage chunk resources +- Last Update: 2025-05-10 + +## async_tasks +- Status: completed +- Description: Asynchronous chunk loading and processing +- Last Update: 2025-05-10 + +## serialization +- Status: completed +- Description: Chunk serialization and deserialization +- Last Update: 2025-05-10 + +## event_system +- Status: completed +- Description: Events for load/unload/save operations +- Last Update: 2025-05-10 + +## interop +- Status: completed +- Description: Python and .NET interoperability +- Last Update: 2025-05-10 + +## spatial_queries +- Status: completed +- Description: Spatial indexing and region queries with 2D/3D support and quadtree optimization +- Last Update: 2025-11-08 + +## thread_sync +- Status: completed +- Description: Explicit synchronization and concurrent collections +- Last Update: 2025-05-11 + +## spatial_interfaces +- Status: completed +- Description: Interface-level spatial query methods with support for custom filters +- Last Update: 2025-05-11 + +## parallel_processing +- Status: completed +- Description: Parallel processing of chunks based on spatial queries and regions +- Last Update: 2025-05-11 + +## priority_loading +- Status: in_progress +- Description: Prioritized chunk loading system (partially implemented) +- Last Update: 2025-05-10 + +## dependency_tracking +- Status: in_progress +- Description: Dependency-aware disposal logic +- Last Update: 2025-05-10 + +## dispose_pattern +- Status: in_progress +- Description: Full dispose pattern with finalizers for unmanaged resources +- Last Update: 2025-05-10 + +## exception_handling +- Status: in_progress +- Description: Better async exception handling with specific types and logging +- Last Update: 2025-05-10 + +## performance_metrics +- Status: in_progress +- Description: Track load times, cache hit rates, and memory usage (partially implemented) +- Last Update: 2025-05-10 + +## progress_tracking +- Status: in_progress +- Description: Track progress and auto-update status documents (mostly implemented) +- Last Update: 2025-05-10 + +## git_integration +- Status: in_progress +- Description: Parse Git logs for status updates (partially implemented) +- Last Update: 2025-05-10 + +## feature_tracking +- Status: in_progress +- Description: Track feature statuses (partially implemented) +- Last Update: 2025-05-10 + +## doc_generation +- Status: in_progress +- Description: Auto-generate status docs and changelog (partially implemented) +- Last Update: 2025-05-10 + +## todo_management +- Status: in_progress +- Description: Automate updates based on code reviews and commits +- Last Update: 2025-05-10 + +## dependency_interfaces +- Status: planned +- Description: Interface-level dependency methods +- Last Update: 2025-05-10 + +## runtime_config +- Status: planned +- Description: Runtime-adjustable configuration options +- Last Update: 2025-05-10 + +## known_issues +- Status: in_progress +- Description: Edge chunk unload delay under high concurrency +- Last Update: 2025-05-10 + diff --git a/docs/status/ChunkManager-Status.md b/docs/status/ChunkManager-Status.md new file mode 100644 index 0000000..b534484 --- /dev/null +++ b/docs/status/ChunkManager-Status.md @@ -0,0 +1,35 @@ +# AdvChkSys Development Status + +Last updated: 2025-05-11 + +## Feature Status + +| Feature | Status | Description | Last Update | +|---------|--------|-------------|-------------| +| Chunk Management | [COMPLETED] | 2D/3D chunk management with memory efficiency | 2025-05-10 | +| Memory Efficiency | [COMPLETED] | LRU caching, array pooling, and air-singleton patterns | 2025-05-10 | +| Resource Tracking | [COMPLETED] | Track and manage chunk resources | 2025-05-10 | +| Async Tasks | [COMPLETED] | Asynchronous chunk loading and processing | 2025-05-10 | +| Serialization | [COMPLETED] | Chunk serialization and deserialization | 2025-05-10 | +| Event System | [COMPLETED] | Events for load/unload/save operations | 2025-05-10 | +| Interop | [COMPLETED] | Python and .NET interoperability | 2025-05-10 | +| Spatial Queries | [COMPLETED] | Spatial indexing and region queries with 2D/3D support and quadtree optimization | 2025-11-08 | +| Thread Sync | [COMPLETED] | Explicit synchronization and concurrent collections | 2025-05-11 | +| Spatial Interfaces | [COMPLETED] | Interface-level spatial query methods with support for custom filters | 2025-05-11 | +| Parallel Processing | [COMPLETED] | Parallel processing of chunks based on spatial queries and regions | 2025-05-11 | +| Priority Loading | [IN PROGRESS] | Prioritized chunk loading system (partially implemented) | 2025-05-10 | +| Dependency Tracking | [IN PROGRESS] | Dependency-aware disposal logic | 2025-05-10 | +| Dispose Pattern | [IN PROGRESS] | Full dispose pattern with finalizers for unmanaged resources | 2025-05-10 | +| Exception Handling | [IN PROGRESS] | Better async exception handling with specific types and logging | 2025-05-10 | +| Performance Metrics | [IN PROGRESS] | Track load times, cache hit rates, and memory usage (partially implemented) | 2025-05-10 | +| Progress Tracking | [IN PROGRESS] | Track progress and auto-update status documents (mostly implemented) | 2025-05-10 | +| Git Integration | [IN PROGRESS] | Parse Git logs for status updates (partially implemented) | 2025-05-10 | +| Feature Tracking | [IN PROGRESS] | Track feature statuses (partially implemented) | 2025-05-10 | +| Doc Generation | [IN PROGRESS] | Auto-generate status docs and changelog (partially implemented) | 2025-05-10 | +| Todo Management | [IN PROGRESS] | Automate updates based on code reviews and commits | 2025-05-10 | +| Dependency Interfaces | [PLANNED] | Interface-level dependency methods | 2025-05-10 | +| Runtime Config | [PLANNED] | Runtime-adjustable configuration options | 2025-05-10 | +| Known Issues | [IN PROGRESS] | Edge chunk unload delay under high concurrency | 2025-05-10 | + +## Recent Updates + diff --git a/scripts/README.md b/scripts/README.md new file mode 100644 index 0000000..a25c9b5 --- /dev/null +++ b/scripts/README.md @@ -0,0 +1,81 @@ +# AdvChkSys Development Scripts + +## Track Progress Script + +The `track_progress.py` script automates feature tracking, status updates, and changelog management for the AdvChkSys project. + +### Overview + +This script: +- Tracks features and their status in a human-readable Markdown file +- Generates a status document with current development progress +- Updates the changelog with tagged entries from commit messages +- Automatically detects new features from specially formatted commit messages + +### Usage + +Run the script periodically to update documentation: + +```bash +python scripts/track_progress.py +``` + +### Commit Message Tags + +The script recognizes special tags in commit messages: + +#### 1. Feature Status Updates + +``` +git commit -m "Implemented spatial queries [feature:spatial_queries]" +``` + +This marks the `spatial_queries` feature as completed and records the date and author. + +#### 2. New Features + +``` +git commit -m "Initial setup [new-feature:custom_serialization:Support for custom serialization formats]" +``` + +This adds a new feature called `custom_serialization` with the provided description and sets its status to "planned". + +#### 3. Changelog Entries + +``` +git commit -m "Fixed bug in chunk loading [changelog:Fixed race condition in async chunk loading]" +``` + +This adds an entry to the changelog with the date, message, and commit hash. + +#### 4. Status Updates + +``` +git commit -m "Completed milestone 1 [status:milestone1]" +``` + +This records a general status update. + +### Output Files + +The script generates and updates the following files: + +- `docs/features.md`: Human-readable feature tracking in Markdown format +- `docs/status/ChunkManager-Status.md`: Status document with feature progress and recent updates +- `CHANGELOG.md`: Project changelog with tagged entries from commits + +### Feature Statuses + +Features can have the following statuses: +- `planned`: Feature is planned but not yet started +- `in_progress`: Feature is currently being implemented +- `completed`: Feature has been implemented + +### Manual Editing + +You can manually edit the `docs/features.md` file to update feature descriptions, statuses, or add new features. The script will preserve your changes when it runs. + +### Requirements + +- Python 3.6+ +- Git repository \ No newline at end of file diff --git a/scripts/track_progress.py b/scripts/track_progress.py new file mode 100644 index 0000000..4ef81a4 --- /dev/null +++ b/scripts/track_progress.py @@ -0,0 +1,224 @@ +import os +import re +import sys +from datetime import datetime +from subprocess import check_output + +# Try to set UTF-8 encoding for better emoji support +sys.stderr = open( + sys.stderr.fileno(), mode="w", encoding="utf-8", errors="replace" +) # Configuration +PROGRESS_FILE = "docs/progress.json" +STATUS_DOC = "docs/status/ChunkManager-Status.md" +CHANGELOG_FILE = "CHANGELOG.md" +FEATURES_FILE = "docs/features.md" +GIT_LOG_LIMIT = 50 # number of commits to parse + +# Regular expression to match tags in commit messages +STATUS_TAG_RE = re.compile(r"\[status:(\w+)\]") +CHANGELOG_TAG_RE = re.compile(r"\[changelog:(.+?)\]") +FEATURE_TAG_RE = re.compile(r"\[feature:(\w+)\]") +NEW_FEATURE_RE = re.compile(r"\[new-feature:(\w+):(.+?)\]") + +# Ensure directories exist +os.makedirs(os.path.dirname(STATUS_DOC), exist_ok=True) +os.makedirs(os.path.dirname(FEATURES_FILE), exist_ok=True) + +# Load features from Markdown file if it exists +features = {} +if os.path.exists(FEATURES_FILE): + with open(FEATURES_FILE, "r", encoding="utf-8") as f: + content = f.read() + # Parse the markdown file + sections = re.split(r"## (\w+)", content)[1:] # Skip the header + for i in range(0, len(sections), 2): + if i + 1 < len(sections): + feature_key = sections[i] + feature_content = sections[i + 1] + + # Extract status, description, and date + status_match = re.search(r"- Status: (\w+)", feature_content) + desc_match = re.search( + r"- Description: (.+?)$", feature_content, re.MULTILINE + ) + date_match = re.search( + r"- Last Update: (.+?)$", feature_content, re.MULTILINE + ) + + status = status_match.group(1) if status_match else "planned" + description = desc_match.group(1) if desc_match else "" + date = date_match.group(1) if date_match else "" + + features[feature_key] = { + "status": status, + "description": description, + "date": date, + } +else: + # Default features + features = { + "spatial_queries": { + "status": "planned", + "description": "Methods to efficiently find chunks within regions or distances", # noqa: E501 + }, + "priority_loading": { + "status": "planned", + "description": "API to specify which chunks should be loaded first", # noqa: E501 + }, + "serialization_optimization": { + "status": "in_progress", + "description": "Further improvements to chunk saving/loading", + }, + "chunk_dependency": { + "status": "planned", + "description": "For cases where chunks need to reference neighbors", # noqa: E501 + }, + } + +# Get recent git log entries +git_log = check_output( + [ + "git", + "log", + f"-n{GIT_LOG_LIMIT}", + "--pretty=format:%h|%s|%ad|%an", + "--date=short", + ] +).decode() + +# Parse and collect updates +status_updates = {} +changelog_entries = [] +feature_updates = {} +new_features = {} + +for line in git_log.splitlines(): + parts = line.split("|") + if len(parts) < 4: + continue + + commit_hash, subject, date, author = parts + + status_match = STATUS_TAG_RE.search(subject) + changelog_match = CHANGELOG_TAG_RE.search(subject) + feature_match = FEATURE_TAG_RE.search(subject) + new_feature_match = NEW_FEATURE_RE.search(subject) + + if status_match: + status_key = status_match.group(1) + status_updates[status_key] = { + "status": "done", + "commit": commit_hash, + "date": date, + "message": subject, + } + + if changelog_match: + changelog_entries.append( + f"- {date}: {changelog_match.group(1)} ({commit_hash})" + ) + + if feature_match: + feature_key = feature_match.group(1) + if feature_key in features: + feature_updates[feature_key] = { + "status": "completed", + "date": date, + "author": author, + } + + if new_feature_match: + feature_key = new_feature_match.group(1) + feature_desc = new_feature_match.group(2) + if feature_key not in features: + new_features[feature_key] = { + "status": "planned", + "description": feature_desc, + "date": date, + "author": author, + } + +# Update features with new information +for feature_key, update in feature_updates.items(): + features[feature_key].update(update) + +# Add new features +for feature_key, feature_data in new_features.items(): + features[feature_key] = feature_data + +# Save features to Markdown file +with open(FEATURES_FILE, "w", encoding="utf-8") as f: + f.write("# Feature Tracking\n\n") + + for feature_key, feature_data in features.items(): + status = feature_data.get("status", "planned") + description = feature_data.get("description", "") + date = feature_data.get("date", "") + author = feature_data.get("author", "") + + f.write(f"## {feature_key}\n") + f.write(f"- Status: {status}\n") + f.write(f"- Description: {description}\n") + f.write(f"- Last Update: {date}\n") + if author: + f.write(f"- Updated By: {author}\n") + f.write("\n") + +# Generate status document +with open(STATUS_DOC, "w", encoding="utf-8") as f: + f.write("# AdvChkSys Development Status\n\n") + f.write(f"Last updated: {datetime.now().strftime('%Y-%m-%d')}\n\n") + + f.write("## Feature Status\n\n") + f.write("| Feature | Status | Description | Last Update |\n") + f.write("|---------|--------|-------------|-------------|\n") + + for feature_key, feature_data in features.items(): + status = feature_data.get("status", "unknown") + description = feature_data.get("description", "") + date = feature_data.get("date", "") + + # Use text indicators instead of emojis + status_indicator = { + "planned": "[PLANNED]", + "in_progress": "[IN PROGRESS]", + "completed": "[COMPLETED]", + "done": "[DONE]", + }.get(status, "[UNKNOWN]") + + f.write( + f"| {feature_key.replace('_', ' ').title()} | {status_indicator} | {description} | {date} |\n" # noqa: E501 + ) + + f.write("\n## Recent Updates\n\n") + for entry in changelog_entries[ + :10 + ]: # Show only the 10 most recent entries + f.write(f"{entry}\n") + +# Update changelog if there are new entries +if changelog_entries and os.path.exists(CHANGELOG_FILE): + with open(CHANGELOG_FILE, "r", encoding="utf-8") as f: + existing_changelog = f.read() + + with open(CHANGELOG_FILE, "w", encoding="utf-8") as f: + # Add new entries at the top, under the first heading + lines = existing_changelog.splitlines() + insertion_point = next( + (i for i, line in enumerate(lines) if line.startswith("##")), 2 + ) + + updated_changelog = "\n".join(lines[:insertion_point]) + "\n\n" + updated_changelog += "\n".join(changelog_entries) + "\n\n" + updated_changelog += "\n".join(lines[insertion_point:]) + + f.write(updated_changelog) +elif changelog_entries: + # Create new changelog file + with open(CHANGELOG_FILE, "w", encoding="utf-8") as f: + f.write("# Changelog\n\n") + f.write("\n".join(changelog_entries)) + f.write("\n") + +print(f"Progress tracking updated. Status document generated at {STATUS_DOC}") +print(f"Features updated at {FEATURES_FILE}") diff --git a/src/AdvChkSys/AdvChkSys.cs b/src/AdvChkSys/AdvChkSys.cs index 2ef2694..c371024 100644 --- a/src/AdvChkSys/AdvChkSys.cs +++ b/src/AdvChkSys/AdvChkSys.cs @@ -4,7 +4,12 @@ using AdvChkSys.Manager; using AdvChkSys.Chunk; using AdvChkSys.Resources; using AdvChkSys.Diagnostics; +using AdvChkSys.Threading; +using AdvChkSys.Spatial; using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; namespace AdvChkSys { @@ -17,7 +22,7 @@ namespace AdvChkSys /// /// The current version of the AdvChkSys library. /// - public static string Version => "0.1.8"; + public static string Version => "0.2.0"; /// /// Creates a new WorldConstraints object. @@ -49,6 +54,51 @@ namespace AdvChkSys public static void LogMemoryUsage(Action logAction) => MemoryUsageReporter.LogMemoryUsage(logAction); + /// + /// Gets the threading manager for advanced threading operations. + /// + public static ChunkThreadingManager ThreadingManager => ChunkThreadingManager.Instance; + + /// + /// Runs a batch of chunk operations in parallel. + /// + public static Task RunParallelChunkOperationsAsync(IEnumerable operations, + int? maxDegreeOfParallelism = null, + CancellationToken cancellationToken = default) + { + return ChunkTaskScheduler.RunBatchParallelAsync(operations, maxDegreeOfParallelism, cancellationToken); + } + + /// + /// Creates a spatial index for 2D chunks. + /// + public static SpatialChunkIndex> CreateSpatialIndex2D() => + new SpatialChunkIndex>(); + + /// + /// Creates a spatial index for 3D chunks. + /// + public static SpatialChunkIndex> CreateSpatialIndex3D() => + new SpatialChunkIndex>(); + + /// + /// Configures the threading system for high throughput. + /// + public static void ConfigureForHighThroughput() => + ChunkThreadingConfiguration.ConfigureForHighThroughput(); + + /// + /// Configures the threading system for low latency. + /// + public static void ConfigureForLowLatency() => + ChunkThreadingConfiguration.ConfigureForLowLatency(); + + /// + /// Configures the threading system for memory efficiency. + /// + public static void ConfigureForMemoryEfficiency() => + ChunkThreadingConfiguration.ConfigureForMemoryEfficiency(); + /// /// Performs a basic self-test of the AdvChkSys core functionality. /// Returns true if all core systems are operational. @@ -84,11 +134,29 @@ namespace AdvChkSys // Check constraints enforcement bool constraintsWork = constraints.IsWithinBounds(1, 1) && constraints.IsWithinChunkLimit(2); + // Test spatial indexing + var spatialIndex2D = CreateSpatialIndex2D(); + spatialIndex2D.AddChunk(chunk2D); + var chunksInRegion = spatialIndex2D.FindChunksInRegion(0, 0, 2, 2); + bool spatialIndexWorks = false; + foreach (var c in chunksInRegion) + { + if (c.Equals(chunk2D)) + { + spatialIndexWorks = true; + break; + } + } + + // Test threading + bool threadingWorks = ThreadingManager != null; + // Clean up manager2D.UnloadChunk(1, 1); manager3D.UnloadChunk(1, 1, 1); - return resourceTracking2D && resourceTracking3D && constraintsWork; + return resourceTracking2D && resourceTracking3D && constraintsWork && + spatialIndexWorks && threadingWorks; } } } \ No newline at end of file diff --git a/src/AdvChkSys/Dependencies/ChunkDependencyTracker.cs b/src/AdvChkSys/Dependencies/ChunkDependencyTracker.cs new file mode 100644 index 0000000..e498e8a --- /dev/null +++ b/src/AdvChkSys/Dependencies/ChunkDependencyTracker.cs @@ -0,0 +1,310 @@ +#nullable enable +using System.Collections.Generic; +using AdvChkSys.Interfaces; + +namespace AdvChkSys.Dependencies +{ + /// + /// Tracks dependencies between chunks. + /// + public class ChunkDependencyTracker + { + /// + /// Defines the type of dependency between chunks. + /// + public enum DependencyType + { + Neighbor, + Reference, + Update, + Custom + } + + // Dictionary to track dependencies: source chunk -> (target chunk, dependency type) + private readonly Dictionary> _dependencies = new(); + + // Dictionary to track dependents: target chunk -> (source chunk, dependency type) + private readonly Dictionary> _dependents = new(); + + // Lock object for thread safety + private readonly object _lock = new(); + + /// + /// Registers a dependency between two chunks. + /// + public void RegisterDependency(IChunk source, IChunk target, DependencyType type) + { + if (source == null || target == null) + return; + + lock (_lock) + { + // Add to dependencies + if (!_dependencies.TryGetValue(source, out var targetSet)) + { + targetSet = new HashSet<(IChunk, DependencyType)>(); + _dependencies[source] = targetSet; + } + targetSet.Add((target, type)); + + // Add to dependents + if (!_dependents.TryGetValue(target, out var sourceSet)) + { + sourceSet = new HashSet<(IChunk, DependencyType)>(); + _dependents[target] = sourceSet; + } + sourceSet.Add((source, type)); + } + } + + /// + /// Removes a dependency between two chunks. + /// + public void RemoveDependency(IChunk source, IChunk target) + { + if (source == null || target == null) + return; + + lock (_lock) + { + // Remove from dependencies + if (_dependencies.TryGetValue(source, out var targetSet)) + { + targetSet.RemoveWhere(t => t.Target.Equals(target)); + if (targetSet.Count == 0) + { + _dependencies.Remove(source); + } + } + + // Remove from dependents + if (_dependents.TryGetValue(target, out var sourceSet)) + { + sourceSet.RemoveWhere(s => s.Source.Equals(source)); + if (sourceSet.Count == 0) + { + _dependents.Remove(target); + } + } + } + } + + /// + /// Gets all chunks that depend on the specified chunk. + /// + public IEnumerable GetDependents(IChunk chunk) + { + if (chunk == null) + return new List(); + + lock (_lock) + { + if (!_dependents.TryGetValue(chunk, out var sourceSet)) + { + return new List(); + } + + var result = new List(sourceSet.Count); + foreach (var (source, _) in sourceSet) + { + result.Add(source); + } + return result; + } + } + + /// + /// Gets all chunks that the specified chunk depends on. + /// + public IEnumerable GetDependencies(IChunk chunk) + { + if (chunk == null) + return new List(); + + lock (_lock) + { + if (!_dependencies.TryGetValue(chunk, out var targetSet)) + { + return new List(); + } + + var result = new List(targetSet.Count); + foreach (var (target, _) in targetSet) + { + result.Add(target); + } + return result; + } + } + + /// + /// Gets all chunks that depend on the specified chunk with their dependency types. + /// + public IEnumerable<(IChunk Chunk, DependencyType Type)> GetDependentsWithTypes(IChunk chunk) + { + if (chunk == null) + return new List<(IChunk, DependencyType)>(); + + lock (_lock) + { + if (!_dependents.TryGetValue(chunk, out var sourceDependents)) + { + return new List<(IChunk, DependencyType)>(); + } + + var result = new List<(IChunk, DependencyType)>(sourceDependents.Count); + foreach (var (source, type) in sourceDependents) + { + result.Add((source, type)); + } + return result; + } + } + + /// + /// Gets all chunks that the specified chunk depends on with their dependency types. + /// + public IEnumerable<(IChunk Chunk, DependencyType Type)> GetDependenciesWithTypes(IChunk chunk) + { + if (chunk == null) + return new List<(IChunk, DependencyType)>(); + + lock (_lock) + { + if (!_dependencies.TryGetValue(chunk, out var targetSet)) + { + return new List<(IChunk, DependencyType)>(); + } + + var result = new List<(IChunk, DependencyType)>(targetSet.Count); + foreach (var (target, type) in targetSet) + { + result.Add((target, type)); + } + return result; + } + } + + /// + /// Checks if a dependency exists between source and target chunks. + /// + public bool HasDependency(IChunk source, IChunk target) + { + if (source == null || target == null) + return false; + + lock (_lock) + { + if (!_dependencies.TryGetValue(source, out var targetSet)) + { + return false; + } + + foreach (var (t, _) in targetSet) + { + if (t.Equals(target)) + { + return true; + } + } + + return false; + } + } + + /// + /// Gets the dependency type between source and target chunks, or null if no dependency exists. + /// + public DependencyType? GetDependencyType(IChunk source, IChunk target) + { + if (source == null || target == null) + return null; + + lock (_lock) + { + if (!_dependencies.TryGetValue(source, out var targetSet)) + { + return null; + } + + foreach (var (t, type) in targetSet) + { + if (t.Equals(target)) + { + return type; + } + } + + return null; + } + } + + /// + /// Clears all dependencies for a specific chunk. + /// + public void ClearDependencies(IChunk chunk) + { + if (chunk == null) + return; + + lock (_lock) + { + // Remove all dependencies where this chunk is the source + if (_dependencies.TryGetValue(chunk, out var targetSet)) + { + foreach (var (target, _) in targetSet) + { + if (_dependents.TryGetValue(target, out var depSources)) + { + depSources.RemoveWhere(s => s.Source.Equals(chunk)); + if (depSources.Count == 0) + { + _dependents.Remove(target); + } + } + } + _dependencies.Remove(chunk); + } + + // Remove all dependencies where this chunk is the target + if (_dependents.TryGetValue(chunk, out var sourceSets)) + { + foreach (var (source, _) in sourceSets) + { + if (_dependencies.TryGetValue(source, out var depTargets)) + { + depTargets.RemoveWhere(t => t.Target.Equals(chunk)); + if (depTargets.Count == 0) + { + _dependencies.Remove(source); + } + } + } + _dependents.Remove(chunk); + } + } + } + + /// + /// Gets all chunks that have any dependencies. + /// + public IEnumerable GetAllChunksWithDependencies() + { + lock (_lock) + { + return new HashSet(_dependencies.Keys); + } + } + + /// + /// Gets all chunks that have any dependents. + /// + public IEnumerable GetAllChunksWithDependents() + { + lock (_lock) + { + return new HashSet(_dependents.Keys); + } + } + } +} \ No newline at end of file diff --git a/src/AdvChkSys/Loading/ChunkLoadingPriority.cs b/src/AdvChkSys/Loading/ChunkLoadingPriority.cs new file mode 100644 index 0000000..f0dc8bf --- /dev/null +++ b/src/AdvChkSys/Loading/ChunkLoadingPriority.cs @@ -0,0 +1,677 @@ +#nullable enable +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using AdvChkSys.Interfaces; +using AdvChkSys.Threading; + +namespace AdvChkSys.Loading +{ + /// + /// Manages prioritized loading of chunks. + /// + public class ChunkLoadingPriority + { + /// + /// Priority levels for chunk loading. + /// + public enum Priority + { + Immediate, + High, + Normal, + Low, + Background + } + + /// + /// Represents a chunk loading request with priority. + /// + public class ChunkLoadRequest + { + /// + /// The X coordinate of the chunk. + /// + public int X { get; } + + /// + /// The Y coordinate of the chunk. + /// + public int Y { get; } + + /// + /// The Z coordinate of the chunk (optional, for 3D chunks). + /// + public int Z { get; } + + /// + /// The priority of this loading request. + /// + public Priority LoadPriority { get; } + + /// + /// The width of the chunk. + /// + public int Width { get; } + + /// + /// The height of the chunk. + /// + public int Height { get; } + + /// + /// The depth of the chunk (for 3D chunks). + /// + public int Depth { get; } + + /// + /// Timestamp when the request was created. + /// + public DateTime Timestamp { get; } + + /// + /// Unique identifier for this request. + /// + public Guid RequestId { get; } + + /// + /// Creates a new 2D chunk loading request. + /// + public ChunkLoadRequest(int x, int y, int width, int height, Priority priority) + { + X = x; + Y = y; + Z = 0; + Width = width; + Height = height; + Depth = 1; + LoadPriority = priority; + Timestamp = DateTime.UtcNow; + RequestId = Guid.NewGuid(); + } + + /// + /// Creates a new 3D chunk loading request. + /// + public ChunkLoadRequest(int x, int y, int z, int width, int height, int depth, Priority priority) + { + X = x; + Y = y; + Z = z; + Width = width; + Height = height; + Depth = depth; + LoadPriority = priority; + Timestamp = DateTime.UtcNow; + RequestId = Guid.NewGuid(); + } + } + + // Priority queues for each priority level + private readonly Dictionary> _requestQueues = new(); + + // Lookup for fast cancellation and status checks + private readonly Dictionary _requestLookup = new(); + + // Active tasks to prevent duplicate loading + private readonly Dictionary<(int X, int Y, int Z), Task> _activeTasks = new(); + + // Synchronization + private readonly SemaphoreSlim _queueSemaphore = new(1, 1); + private readonly SemaphoreSlim _processSemaphore; + private readonly CancellationTokenSource _cancellationSource = new(); + + // Processing state + private bool _isProcessing; + private Task? _processingTask; + + // Configuration + private readonly int _maxConcurrentLoads; + private readonly TimeSpan _requestTimeout; + + /// + /// Event raised when a chunk load request is completed. + /// + public event EventHandler? RequestCompleted; + + /// + /// Event raised when a chunk load request fails. + /// + public event EventHandler<(ChunkLoadRequest Request, Exception Exception)>? RequestFailed; + + /// + /// Initializes a new instance of the ChunkLoadingPriority class. + /// + /// Maximum number of chunks to load concurrently + /// Timeout in seconds for chunk load requests + public ChunkLoadingPriority(int maxConcurrentLoads = 4, int requestTimeoutSeconds = 30) + { + _maxConcurrentLoads = maxConcurrentLoads; + _requestTimeout = TimeSpan.FromSeconds(requestTimeoutSeconds); + _processSemaphore = new SemaphoreSlim(maxConcurrentLoads, maxConcurrentLoads); + + // Initialize queues for each priority level + foreach (Priority priority in Enum.GetValues(typeof(Priority))) + { + _requestQueues[priority] = new Queue(); + } + } + + /// + /// Enqueues a chunk load request with the specified priority. + /// + /// The request object that can be used to track or cancel the request + public ChunkLoadRequest EnqueueRequest(int x, int y, int width, int height, Priority priority = Priority.Normal) + { + var request = new ChunkLoadRequest(x, y, width, height, priority); + EnqueueRequest(request); + return request; + } + + /// + /// Enqueues a 3D chunk load request with the specified priority. + /// + /// The request object that can be used to track or cancel the request + public ChunkLoadRequest EnqueueRequest(int x, int y, int z, int width, int height, int depth, Priority priority = Priority.Normal) + { + var request = new ChunkLoadRequest(x, y, z, width, height, depth, priority); + EnqueueRequest(request); + return request; + } + + /// + /// Enqueues a pre-created chunk load request. + /// + public void EnqueueRequest(ChunkLoadRequest request) + { + _queueSemaphore.Wait(); + try + { + // Check if we already have a request for this chunk + var key = (request.X, request.Y, request.Z); + if (_activeTasks.ContainsKey(key)) + { + return; // Already loading this chunk + } + + // Add to appropriate queue + _requestQueues[request.LoadPriority].Enqueue(request); + _requestLookup[request.RequestId] = request; + + // Start processing if not already running + EnsureProcessingStarted(); + } + finally + { + _queueSemaphore.Release(); + } + + // For immediate priority, wait for processing to start + if (request.LoadPriority == Priority.Immediate) + { + TriggerImmediateProcessing(); + } + } + + /// + /// Cancels a pending chunk load request. + /// + /// True if the request was found and canceled, false otherwise + public bool CancelRequest(Guid requestId) + { + _queueSemaphore.Wait(); + try + { + if (_requestLookup.TryGetValue(requestId, out var request)) + { + _requestLookup.Remove(requestId); + // Note: We don't remove from the queue as that would be inefficient + // Instead, we'll skip it when we dequeue + return true; + } + return false; + } + finally + { + _queueSemaphore.Release(); + } + } + + /// + /// Cancels all pending chunk load requests. + /// + public void CancelAllRequests() + { + _queueSemaphore.Wait(); + try + { + foreach (var queue in _requestQueues.Values) + { + queue.Clear(); + } + _requestLookup.Clear(); + } + finally + { + _queueSemaphore.Release(); + } + } + + /// + /// Starts the processing of chunk load requests if not already running. + /// + private void EnsureProcessingStarted() + { + if (_isProcessing) + return; + + _isProcessing = true; + _processingTask = Task.Run(ProcessRequestsAsync); + } + + /// + /// Triggers immediate processing of high-priority requests. + /// + private void TriggerImmediateProcessing() + { + // This is a hint to the processor to check for immediate requests now + // We don't need to do anything special as the processor checks immediate first + } + + /// + /// Main processing loop for chunk load requests. + /// + private async Task ProcessRequestsAsync() + { + while (!_cancellationSource.IsCancellationRequested) + { + ChunkLoadRequest? request = null; + + // Get the next request from the highest priority queue + await _queueSemaphore.WaitAsync(); + try + { + request = DequeueNextRequest(); + + if (request == null) + { + // No requests to process, pause briefly + _isProcessing = false; + _queueSemaphore.Release(); + await Task.Delay(50); + continue; + } + + // Mark this chunk as being processed + var key = (request.X, request.Y, request.Z); + if (_activeTasks.ContainsKey(key)) + { + // Already processing this chunk, skip + continue; + } + + // Create a task for this request but don't start it yet + var loadTask = ProcessRequestAsync(request); + _activeTasks[key] = loadTask; + } + finally + { + if (_isProcessing) + { + _queueSemaphore.Release(); + } + } + + // Wait for a processing slot + await _processSemaphore.WaitAsync(); + + // Start the task and continue without waiting + _ = Task.Run(async () => + { + try + { + await ProcessRequestAsync(request!); + } + finally + { + _processSemaphore.Release(); + + // Remove from active tasks + await _queueSemaphore.WaitAsync(); + try + { + _activeTasks.Remove((request!.X, request.Y, request.Z)); + } + finally + { + _queueSemaphore.Release(); + } + } + }); + } + } + + /// + /// Dequeues the next request from the highest priority queue. + /// + private ChunkLoadRequest? DequeueNextRequest() + { + // Check each priority level in order + foreach (Priority priority in Enum.GetValues(typeof(Priority))) + { + var queue = _requestQueues[priority]; + + while (queue.Count > 0) + { + var request = queue.Dequeue(); + + // Skip if the request has been canceled + if (!_requestLookup.ContainsKey(request.RequestId)) + continue; + + // Skip if the request has timed out + if (DateTime.UtcNow - request.Timestamp > _requestTimeout) + { + _requestLookup.Remove(request.RequestId); + continue; + } + + // Valid request found + return request; + } + } + + return null; // No valid requests found + } + + /// + /// Processes a single chunk load request. + /// + private async Task ProcessRequestAsync(ChunkLoadRequest request) + { + try + { + // This would be integrated with your chunk manager + // For now, we'll just simulate the loading with a delay + + // Simulate different loading times based on priority + int delayMs = request.LoadPriority switch + { + Priority.Immediate => 50, + Priority.High => 100, + Priority.Normal => 200, + Priority.Low => 500, + Priority.Background => 1000, + _ => 200 + }; + + await Task.Delay(delayMs); + + // Notify completion + RequestCompleted?.Invoke(this, request); + } + catch (Exception ex) + { + // Notify failure + RequestFailed?.Invoke(this, (request, ex)); + } + finally + { + // Remove from lookup + await _queueSemaphore.WaitAsync(); + try + { + _requestLookup.Remove(request.RequestId); + } + finally + { + _queueSemaphore.Release(); + } + } + } + + /// + /// Gets the number of pending requests for each priority level. + /// + public Dictionary GetQueueSizes() + { + _queueSemaphore.Wait(); + try + { + var result = new Dictionary(); + foreach (var kvp in _requestQueues) + { + result[kvp.Key] = kvp.Value.Count; + } + return result; + } + finally + { + _queueSemaphore.Release(); + } + } + + /// + /// Gets the total number of pending requests. + /// + public int GetTotalQueueSize() + { + _queueSemaphore.Wait(); + try + { + int total = 0; + foreach (var queue in _requestQueues.Values) + { + total += queue.Count; + } + return total; + } + finally + { + _queueSemaphore.Release(); + } + } + + /// + /// Stops processing and releases resources. + /// + public async Task ShutdownAsync() + { + _cancellationSource.Cancel(); + + if (_processingTask != null) + { + try + { + await _processingTask; + } + catch (OperationCanceledException) + { + // Expected + } + catch (Exception) + { + // Ignore other exceptions during shutdown + } + } + + _queueSemaphore.Dispose(); + _processSemaphore.Dispose(); + _cancellationSource.Dispose(); + } + + /// + /// Creates a ChunkLoadingPriority instance with default settings. + /// + public static ChunkLoadingPriority CreateDefault() + { + return new ChunkLoadingPriority(); + } + + /// + /// Creates a ChunkLoadingPriority instance optimized for high throughput. + /// + public static ChunkLoadingPriority CreateHighThroughput() + { + return new ChunkLoadingPriority( + maxConcurrentLoads: Environment.ProcessorCount * 2, + requestTimeoutSeconds: 60); + } + + /// + /// Creates a ChunkLoadingPriority instance optimized for low latency. + /// + public static ChunkLoadingPriority CreateLowLatency() + { + return new ChunkLoadingPriority( + maxConcurrentLoads: Math.Max(2, Environment.ProcessorCount / 2), + requestTimeoutSeconds: 15); + } + + /// + /// Integrates with a chunk manager to process load requests. + /// + /// The type of data in the chunks + /// The chunk manager to use for loading chunks + public void IntegrateWith(IChunkManager chunkManager) + { + // Replace the ProcessRequestAsync method with one that uses the chunk manager + // This is a simplified example - in a real implementation, you'd need to handle + // both 2D and 3D chunk managers and properly cast the interface + + RequestCompleted += (sender, request) => + { + // The chunk has been loaded, you might want to do something with it + var chunk = chunkManager.GetChunk(request.X, request.Y); + // Additional processing if needed + }; + } + + /// + /// Gets the estimated time until a request with the given priority would be processed. + /// + /// Estimated wait time in milliseconds, or -1 if cannot be determined + public int GetEstimatedWaitTime(Priority priority) + { + _queueSemaphore.Wait(); + try + { + int totalHigherPriorityRequests = 0; + + // Count requests with higher or equal priority + foreach (Priority p in Enum.GetValues(typeof(Priority))) + { + if (p <= priority) + { + totalHigherPriorityRequests += _requestQueues[p].Count; + } + } + + // If no requests or no active tasks, return 0 + if (totalHigherPriorityRequests == 0 || _activeTasks.Count == 0) + { + return 0; + } + + // Estimate based on current processing rate + // This is a very simple estimate and could be improved with actual metrics + int averageProcessingTimeMs = priority switch + { + Priority.Immediate => 50, + Priority.High => 100, + Priority.Normal => 200, + Priority.Low => 500, + Priority.Background => 1000, + _ => 200 + }; + + // Calculate how many batches of concurrent requests we'll need + int batches = (int)Math.Ceiling(totalHigherPriorityRequests / (double)_maxConcurrentLoads); + + return batches * averageProcessingTimeMs; + } + finally + { + _queueSemaphore.Release(); + } + } + + /// + /// Adjusts the priority of an existing request. + /// + /// True if the request was found and its priority adjusted, false otherwise + public bool AdjustPriority(Guid requestId, Priority newPriority) + { + // Note: This is not an efficient operation as we don't directly modify the queue + // Instead, we mark the request for priority change when it's processed + + _queueSemaphore.Wait(); + try + { + if (_requestLookup.TryGetValue(requestId, out var request)) + { + // If the request is of lower priority than requested, we'll re-queue it + if (request.LoadPriority > newPriority) + { + // Remove from lookup (it will be skipped when dequeued) + _requestLookup.Remove(requestId); + + // Create a new request with the same parameters but higher priority + var newRequest = request.Depth > 1 + ? new ChunkLoadRequest(request.X, request.Y, request.Z, request.Width, request.Height, request.Depth, newPriority) + : new ChunkLoadRequest(request.X, request.Y, request.Width, request.Height, newPriority); + + // Add the new request + _requestQueues[newPriority].Enqueue(newRequest); + _requestLookup[newRequest.RequestId] = newRequest; + + // If upgrading to immediate, trigger processing + if (newPriority == Priority.Immediate) + { + TriggerImmediateProcessing(); + } + } + + return true; + } + + return false; + } + finally + { + _queueSemaphore.Release(); + } + } + + /// + /// Gets statistics about the current state of the chunk loading system. + /// + public Dictionary GetStatistics() + { + _queueSemaphore.Wait(); + try + { + var stats = new Dictionary + { + ["TotalPendingRequests"] = _requestLookup.Count, + ["ActiveTasks"] = _activeTasks.Count, + ["IsProcessing"] = _isProcessing, + ["MaxConcurrentLoads"] = _maxConcurrentLoads + }; + + // Add queue sizes for each priority + foreach (Priority priority in Enum.GetValues(typeof(Priority))) + { + stats[$"Queue_{priority}"] = _requestQueues[priority].Count; + } + + return stats; + } + finally + { + _queueSemaphore.Release(); + } + } + } +} \ No newline at end of file diff --git a/src/AdvChkSys/Spatial/ChunkExtensions.cs b/src/AdvChkSys/Spatial/ChunkExtensions.cs new file mode 100644 index 0000000..95d355d --- /dev/null +++ b/src/AdvChkSys/Spatial/ChunkExtensions.cs @@ -0,0 +1,40 @@ +#nullable enable +using AdvChkSys.Chunk; + +namespace AdvChkSys.Spatial +{ + /// + /// Provides extension methods for chunks to work with spatial indexing. + /// + public static class ChunkExtensions + { + /// + /// Converts a Chunk3D to an IChunk3D. + /// + public static IChunk3D AsIChunk3D(this Chunk3D chunk) + { + return new Chunk3DAdapter(chunk); + } + + /// + /// Adapter to make Chunk3D implement IChunk3D. + /// + private class Chunk3DAdapter : IChunk3D + { + private readonly Chunk3D _chunk; + + public Chunk3DAdapter(Chunk3D chunk) + { + _chunk = chunk; + } + + public int X => _chunk.X; + public int Y => _chunk.Y; + public int Z => _chunk.Z; + public int Width => _chunk.Width; + public int Height => _chunk.Height; + public int Depth => _chunk.Depth; + public System.Collections.Generic.Dictionary Metadata => _chunk.Metadata; + } + } +} \ No newline at end of file diff --git a/src/AdvChkSys/Spatial/SpatialChunkIndex.cs b/src/AdvChkSys/Spatial/SpatialChunkIndex.cs new file mode 100644 index 0000000..dc92d0a --- /dev/null +++ b/src/AdvChkSys/Spatial/SpatialChunkIndex.cs @@ -0,0 +1,1228 @@ +#nullable enable +using System; +using System.Threading.Tasks; +using System.Collections.Generic; +using System.Linq; +using AdvChkSys.Interfaces; +using AdvChkSys.Threading; + + +namespace AdvChkSys.Spatial +{ + /// + /// Provides spatial indexing for efficient region and distance queries. + /// + public class SpatialChunkIndex where T : IChunk + { + // Grid-based spatial index for fast lookups + private readonly Dictionary<(int, int), HashSet> _grid = new(); + + // All chunks in the index for iteration + private readonly HashSet _allChunks = new(); + + // Lock for thread safety + private readonly object _lock = new(); + + /// + /// Adds a chunk to the spatial index. + /// + /// The chunk to add + public void AddChunk(T chunk) + { + if (chunk == null) + throw new ArgumentNullException(nameof(chunk)); + + lock (_lock) + { + // Add to all chunks set + _allChunks.Add(chunk); + + // Add to grid cells + var key = (chunk.X, chunk.Y); + if (!_grid.TryGetValue(key, out var chunks)) + { + chunks = new HashSet(); + _grid[key] = chunks; + } + chunks.Add(chunk); + } + } + + /// + /// Removes a chunk from the spatial index. + /// + /// The chunk to remove + /// True if the chunk was found and removed, false otherwise + public bool RemoveChunk(T chunk) + { + if (chunk == null) + throw new ArgumentNullException(nameof(chunk)); + + lock (_lock) + { + // Remove from all chunks set + bool removed = _allChunks.Remove(chunk); + + // Remove from grid cells + var key = (chunk.X, chunk.Y); + if (_grid.TryGetValue(key, out var chunks)) + { + chunks.Remove(chunk); + if (chunks.Count == 0) + { + _grid.Remove(key); + } + } + + return removed; + } + } + + /// + /// Finds all chunks within a rectangular region. + /// + /// Minimum X coordinate + /// Minimum Y coordinate + /// Maximum X coordinate + /// Maximum Y coordinate + /// All chunks that intersect with the region + public IEnumerable FindChunksInRegion(int minX, int minY, int maxX, int maxY) + { + lock (_lock) + { + var result = new HashSet(); + + // Check each grid cell that overlaps with the region + for (int x = minX; x <= maxX; x++) + { + for (int y = minY; y <= maxY; y++) + { + var key = (x, y); + if (_grid.TryGetValue(key, out var chunks)) + { + foreach (var chunk in chunks) + { + result.Add(chunk); + } + } + } + } + + return result; + } + } + + /// + /// Finds all chunks within a specified distance from a point. + /// + /// Center X coordinate + /// Center Y coordinate + /// Radius to search within + /// All chunks that are within the radius of the point + public IEnumerable FindChunksInRadius(int centerX, int centerY, float radius) + { + lock (_lock) + { + var result = new HashSet(); + float radiusSquared = radius * radius; + + // Calculate the bounding box of the circle + int minX = (int)(centerX - radius); + int minY = (int)(centerY - radius); + int maxX = (int)(centerX + radius); + int maxY = (int)(centerY + radius); + + // Check each grid cell that might overlap with the circle + for (int x = minX; x <= maxX; x++) + { + for (int y = minY; y <= maxY; y++) + { + var key = (x, y); + if (_grid.TryGetValue(key, out var chunks)) + { + foreach (var chunk in chunks) + { + // Calculate distance from chunk center to query center + float dx = chunk.X - centerX; + float dy = chunk.Y - centerY; + float distanceSquared = dx * dx + dy * dy; + + // Check if the chunk is within the radius + if (distanceSquared <= radiusSquared) + { + result.Add(chunk); + } + } + } + } + } + + return result; + } + } + + /// + /// Finds the nearest chunk to a point. + /// + /// X coordinate + /// Y coordinate + /// The nearest chunk, or null if no chunks are in the index + public T? FindNearestChunk(int x, int y) + { + lock (_lock) + { + if (_allChunks.Count == 0) + return default; + + T? nearest = default; + float nearestDistanceSquared = float.MaxValue; + + foreach (var chunk in _allChunks) + { + float dx = chunk.X - x; + float dy = chunk.Y - y; + float distanceSquared = dx * dx + dy * dy; + + if (distanceSquared < nearestDistanceSquared) + { + nearestDistanceSquared = distanceSquared; + nearest = chunk; + } + } + + return nearest; + } + } + + /// + /// Finds all chunks that contain a point. + /// + /// X coordinate + /// Y coordinate + /// All chunks that contain the point + public IEnumerable FindChunksContainingPoint(int x, int y) + { + lock (_lock) + { + var result = new HashSet(); + + // Get the grid cell for this point + var key = (x, y); + if (_grid.TryGetValue(key, out var chunks)) + { + foreach (var chunk in chunks) + { + // Check if the point is within the chunk bounds + if (x >= chunk.X && x < chunk.X + chunk.Width && + y >= chunk.Y && y < chunk.Y + chunk.Height) + { + result.Add(chunk); + } + } + } + + return result; + } + } + + /// + /// Gets all chunks in the index. + /// + public IEnumerable GetAllChunks() + { + lock (_lock) + { + return _allChunks.ToList(); + } + } + + /// + /// Clears all chunks from the index. + /// + public void Clear() + { + lock (_lock) + { + _allChunks.Clear(); + _grid.Clear(); + } + } + + /// + /// Gets the number of chunks in the index. + /// + public int Count + { + get + { + lock (_lock) + { + return _allChunks.Count; + } + } + } + + /// + /// Checks if a chunk is in the index. + /// + /// The chunk to check + /// True if the chunk is in the index, false otherwise + public bool Contains(T chunk) + { + if (chunk == null) + throw new ArgumentNullException(nameof(chunk)); + + lock (_lock) + { + return _allChunks.Contains(chunk); + } + } + + /// + /// Updates the position of a chunk in the spatial index. + /// + /// The chunk to update + /// The old X coordinate + /// The old Y coordinate + /// True if the chunk was found and updated, false otherwise + public bool UpdateChunkPosition(T chunk, int oldX, int oldY) + { + if (chunk == null) + throw new ArgumentNullException(nameof(chunk)); + + lock (_lock) + { + // Remove from old grid cell + var oldKey = (oldX, oldY); + bool removed = false; + + if (_grid.TryGetValue(oldKey, out var chunks)) + { + removed = chunks.Remove(chunk); + if (chunks.Count == 0) + { + _grid.Remove(oldKey); + } + } + + if (!removed) + return false; + + // Add to new grid cell + var newKey = (chunk.X, chunk.Y); + if (!_grid.TryGetValue(newKey, out var newChunks)) + { + newChunks = new HashSet(); + _grid[newKey] = newChunks; + } + newChunks.Add(chunk); + + return true; + } + } + + /// + /// Performs a spatial query using a custom filter. + /// + /// The filter function to apply to each chunk + /// All chunks that pass the filter + public IEnumerable Query(Func filter) + { + if (filter == null) + throw new ArgumentNullException(nameof(filter)); + + lock (_lock) + { + return _allChunks.Where(filter).ToList(); + } + } + + /// + /// Finds all chunks that intersect with a given chunk. + /// + /// The chunk to check for intersections + /// All chunks that intersect with the given chunk + public IEnumerable FindIntersectingChunks(IChunk chunk) + { + if (chunk == null) + throw new ArgumentNullException(nameof(chunk)); + + return FindChunksInRegion( + chunk.X, + chunk.Y, + chunk.X + chunk.Width - 1, + chunk.Y + chunk.Height - 1); + } + + /// + /// Finds all chunks that are neighbors of a given chunk. + /// + /// The chunk to find neighbors for + /// Whether to include diagonal neighbors + /// All neighboring chunks + public IEnumerable FindNeighbors(IChunk chunk, bool includeDiagonals = false) + { + if (chunk == null) + throw new ArgumentNullException(nameof(chunk)); + + lock (_lock) + { + var result = new HashSet(); + + // Check the four adjacent cells + CheckNeighbor(chunk.X - 1, chunk.Y, result); + CheckNeighbor(chunk.X + 1, chunk.Y, result); + CheckNeighbor(chunk.X, chunk.Y - 1, result); + CheckNeighbor(chunk.X, chunk.Y + 1, result); + + // Check diagonal cells if requested + if (includeDiagonals) + { + CheckNeighbor(chunk.X - 1, chunk.Y - 1, result); + CheckNeighbor(chunk.X + 1, chunk.Y - 1, result); + CheckNeighbor(chunk.X - 1, chunk.Y + 1, result); + CheckNeighbor(chunk.X + 1, chunk.Y + 1, result); + } + + return result; + } + } + + /// + /// Helper method to check for chunks at a specific grid cell. + /// + private void CheckNeighbor(int x, int y, HashSet result) + { + var key = (x, y); + if (_grid.TryGetValue(key, out var chunks)) + { + foreach (var chunk in chunks) + { + result.Add(chunk); + } + } + } + + /// + /// Performs a spatial operation on chunks in parallel. + /// + /// The operation to perform on each chunk + /// Optional filter to apply before processing + /// Maximum degree of parallelism (null for default) + /// A task that completes when all operations are done + public Task ProcessChunksParallelAsync( + Action operation, + Func? filter = null, + int? maxDegreeOfParallelism = null) + { + if (operation == null) + throw new ArgumentNullException(nameof(operation)); + + IEnumerable chunksToProcess; + + lock (_lock) + { + chunksToProcess = filter != null + ? _allChunks.Where(filter).ToList() + : _allChunks.ToList(); + } + + return ChunkTaskScheduler.RunBatchParallelAsync( + chunksToProcess.Select(chunk => new Action(() => operation(chunk))).ToArray(), + maxDegreeOfParallelism); + } + + /// + /// Finds all chunks in a region and performs an operation on them in parallel. + /// + /// Minimum X coordinate + /// Minimum Y coordinate + /// Maximum X coordinate + /// Maximum Y coordinate + /// The operation to perform on each chunk + /// Maximum degree of parallelism (null for default) + /// A task that completes when all operations are done + public Task ProcessRegionParallelAsync( + int minX, int minY, int maxX, int maxY, + Action operation, + int? maxDegreeOfParallelism = null) + { + if (operation == null) + throw new ArgumentNullException(nameof(operation)); + + var chunksInRegion = FindChunksInRegion(minX, minY, maxX, maxY).ToList(); + + return ChunkTaskScheduler.RunBatchParallelAsync( + chunksInRegion.Select(chunk => new Action(() => operation(chunk))).ToArray(), + maxDegreeOfParallelism); + } + + /// + /// Creates a quadtree-based spatial index for more efficient queries. + /// + /// Minimum X coordinate of the world + /// Minimum Y coordinate of the world + /// Maximum X coordinate of the world + /// Maximum Y coordinate of the world + /// A new quadtree-based spatial index + public QuadtreeSpatialIndex CreateQuadtreeIndex(int minX, int minY, int maxX, int maxY) + { + var quadtree = new QuadtreeSpatialIndex(minX, minY, maxX, maxY); + + lock (_lock) + { + foreach (var chunk in _allChunks) + { + quadtree.AddChunk(chunk); + } + } + + return quadtree; + } + } + + /// + /// Provides a quadtree-based spatial index for more efficient region queries. + /// + public class QuadtreeSpatialIndex where T : IChunk + { + private readonly QuadtreeNode _root; + private readonly HashSet _allChunks = new(); + private readonly object _lock = new(); + + /// + /// Initializes a new instance of the QuadtreeSpatialIndex class. + /// + /// Minimum X coordinate of the world + /// Minimum Y coordinate of the world + /// Maximum X coordinate of the world + /// Maximum Y coordinate of the world + public QuadtreeSpatialIndex(int minX, int minY, int maxX, int maxY) + { + _root = new QuadtreeNode(minX, minY, maxX, maxY); + } + + /// + /// Adds a chunk to the quadtree. + /// + /// The chunk to add + public void AddChunk(T chunk) + { + if (chunk == null) + throw new ArgumentNullException(nameof(chunk)); + + lock (_lock) + { + _allChunks.Add(chunk); + _root.Insert(chunk); + } + } + + /// + /// Removes a chunk from the quadtree. + /// + /// The chunk to remove + /// True if the chunk was found and removed, false otherwise + public bool RemoveChunk(T chunk) + { + if (chunk == null) + throw new ArgumentNullException(nameof(chunk)); + + lock (_lock) + { + bool removed = _allChunks.Remove(chunk); + if (removed) + { + _root.Remove(chunk); + } + return removed; + } + } + + /// + /// Finds all chunks within a rectangular region. + /// + /// Minimum X coordinate + /// Minimum Y coordinate + /// Maximum X coordinate + /// Maximum Y coordinate + /// All chunks that intersect with the region + public IEnumerable FindChunksInRegion(int minX, int minY, int maxX, int maxY) + { + lock (_lock) + { + var result = new HashSet(); + _root.Query(minX, minY, maxX, maxY, result); + return result; + } + } + + /// + /// Gets all chunks in the quadtree. + /// + public IEnumerable GetAllChunks() + { + lock (_lock) + { + return _allChunks.ToList(); + } + } + + /// + /// Clears all chunks from the quadtree. + /// + public void Clear() + { + lock (_lock) + { + _allChunks.Clear(); + _root.Clear(); + } + } + + /// + /// Gets the number of chunks in the quadtree. + /// + public int Count + { + get + { + lock (_lock) + { + return _allChunks.Count; + } + } + } + + /// + /// Represents a node in the quadtree. + /// + private class QuadtreeNode + { + // Boundary of this node + private readonly int _minX, _minY, _maxX, _maxY; + + // Children nodes (NW, NE, SW, SE) + private QuadtreeNode? _northWest; + private QuadtreeNode? _northEast; + private QuadtreeNode? _southWest; + private QuadtreeNode? _southEast; + + // Chunks in this node + private readonly HashSet _chunks = new(); + + // Maximum number of chunks before splitting + private const int MAX_CHUNKS = 8; + + // Minimum size of a node + private const int MIN_SIZE = 1; + + /// + /// Initializes a new instance of the QuadtreeNode class. + /// + public QuadtreeNode(int minX, int minY, int maxX, int maxY) + { + _minX = minX; + _minY = minY; + _maxX = maxX; + _maxY = maxY; + } + + /// + /// Inserts a chunk into the quadtree. + /// + public void Insert(T chunk) + { + // Check if the chunk is within this node's boundary + if (!Intersects(chunk)) + return; + + // If we haven't split yet and have room, add the chunk to this node + if (_northWest == null && _chunks.Count < MAX_CHUNKS) + { + _chunks.Add(chunk); + return; + } + + // If we haven't split yet but need to, split the node + if (_northWest == null) + { + Split(); + } + + // Try to insert the chunk into the children + _northWest?.Insert(chunk); + _northEast?.Insert(chunk); + _southWest?.Insert(chunk); + _southEast?.Insert(chunk); + } + + /// + /// Removes a chunk from the quadtree. + /// + public bool Remove(T chunk) + { + // Check if the chunk is within this node's boundary + if (!Intersects(chunk)) + return false; + + // Try to remove from this node + bool removed = _chunks.Remove(chunk); + + // If we have children, try to remove from them too + if (_northWest != null) + { + removed |= _northWest.Remove(chunk); + removed |= _northEast!.Remove(chunk); + removed |= _southWest!.Remove(chunk); + removed |= _southEast!.Remove(chunk); + } + + return removed; + } + + /// + /// Queries the quadtree for chunks in a region. + /// + public void Query(int minX, int minY, int maxX, int maxY, HashSet result) + { + // Check if the query region intersects this node + if (maxX < _minX || minX > _maxX || maxY < _minY || minY > _maxY) + return; + + // Add chunks from this node that intersect the query region + foreach (var chunk in _chunks) + { + if (chunk.X <= maxX && chunk.X + chunk.Width >= minX && + chunk.Y <= maxY && chunk.Y + chunk.Height >= minY) + { + result.Add(chunk); + } + } + + // If we have children, query them too + if (_northWest != null) + { + _northWest.Query(minX, minY, maxX, maxY, result); + _northEast!.Query(minX, minY, maxX, maxY, result); + _southWest!.Query(minX, minY, maxX, maxY, result); + _southEast!.Query(minX, minY, maxX, maxY, result); + } + } + + /// + /// Splits this node into four children. + /// + private void Split() + { + int midX = (_minX + _maxX) / 2; + int midY = (_minY + _maxY) / 2; + + // Don't split if the node is too small + if (midX == _minX || midY == _minY) + return; + + _northWest = new QuadtreeNode(_minX, _minY, midX, midY); + _northEast = new QuadtreeNode(midX, _minY, _maxX, midY); + _southWest = new QuadtreeNode(_minX, midY, midX, _maxY); + _southEast = new QuadtreeNode(midX, midY, _maxX, _maxY); + + // Redistribute chunks to children + var chunksToRedistribute = new List(_chunks); + _chunks.Clear(); + + foreach (var chunk in chunksToRedistribute) + { + Insert(chunk); + } + } + + /// + /// Checks if a chunk intersects with this node's boundary. + /// + private bool Intersects(T chunk) + { + return chunk.X <= _maxX && + chunk.X + chunk.Width >= _minX && + chunk.Y <= _maxY && + chunk.Y + chunk.Height >= _minY; + } + + /// + /// Clears all chunks from this node and its children. + /// + public void Clear() + { + _chunks.Clear(); + + if (_northWest != null) + { + _northWest.Clear(); + _northEast!.Clear(); + _southWest!.Clear(); + _southEast!.Clear(); + + _northWest = null; + _northEast = null; + _southWest = null; + _southEast = null; + } + } + } + } + + /// + /// Provides a spatial index for 3D chunks. + /// + public class SpatialChunkIndex3D where T : IChunk3D + { + // Grid-based spatial index for fast lookups + private readonly Dictionary<(int, int, int), HashSet> _grid = new(); + + // All chunks in the index for iteration + private readonly HashSet _allChunks = new(); + + // Lock for thread safety + private readonly object _lock = new(); + + /// + /// Adds a chunk to the spatial index. + /// + /// The chunk to add + public void AddChunk(T chunk) + { + if (chunk == null) + throw new ArgumentNullException(nameof(chunk)); + + lock (_lock) + { + // Add to all chunks set + _allChunks.Add(chunk); + + // Add to grid cells + var key = (chunk.X, chunk.Y, chunk.Z); + if (!_grid.TryGetValue(key, out var chunks)) + { + chunks = new HashSet(); + _grid[key] = chunks; + } + chunks.Add(chunk); + } + } + + /// + /// Removes a chunk from the spatial index. + /// + /// The chunk to remove + /// True if the chunk was found and removed, false otherwise + public bool RemoveChunk(T chunk) + { + if (chunk == null) + throw new ArgumentNullException(nameof(chunk)); + + lock (_lock) + { + // Remove from all chunks set + bool removed = _allChunks.Remove(chunk); + + // Remove from grid cells + var key = (chunk.X, chunk.Y, chunk.Z); + if (_grid.TryGetValue(key, out var chunks)) + { + chunks.Remove(chunk); + if (chunks.Count == 0) + { + _grid.Remove(key); + } + } + + return removed; + } + } + + /// + /// Finds all chunks within a rectangular region. + /// + /// Minimum X coordinate + /// Minimum Y coordinate + /// Minimum Z coordinate + /// Maximum X coordinate + /// Maximum Y coordinate + /// Maximum Z coordinate + /// All chunks that intersect with the region + public IEnumerable FindChunksInRegion(int minX, int minY, int minZ, int maxX, int maxY, int maxZ) + { + lock (_lock) + { + var result = new HashSet(); + + // Check each grid cell that overlaps with the region + for (int x = minX; x <= maxX; x++) + { + for (int y = minY; y <= maxY; y++) + { + for (int z = minZ; z <= maxZ; z++) + { + var key = (x, y, z); + if (_grid.TryGetValue(key, out var chunks)) + { + foreach (var chunk in chunks) + { + result.Add(chunk); + } + } + } + } + } + + return result; + } + } + + /// + /// Finds all chunks within a specified distance from a point. + /// + /// Center X coordinate + /// Center Y coordinate + /// Center Z coordinate + /// Radius to search within + /// All chunks that are within the radius of the point + public IEnumerable FindChunksInRadius(int centerX, int centerY, int centerZ, float radius) + { + lock (_lock) + { + var result = new HashSet(); + float radiusSquared = radius * radius; + + // Calculate the bounding box of the sphere + int minX = (int)(centerX - radius); + int minY = (int)(centerY - radius); + int minZ = (int)(centerZ - radius); + int maxX = (int)(centerX + radius); + int maxY = (int)(centerY + radius); + int maxZ = (int)(centerZ + radius); + + // Check each grid cell that might overlap with the sphere + for (int x = minX; x <= maxX; x++) + { + for (int y = minY; y <= maxY; y++) + { + for (int z = minZ; z <= maxZ; z++) + { + var key = (x, y, z); + if (_grid.TryGetValue(key, out var chunks)) + { + foreach (var chunk in chunks) + { + // Calculate distance from chunk center to query center + float dx = chunk.X - centerX; + float dy = chunk.Y - centerY; + float dz = chunk.Z - centerZ; + float distanceSquared = dx * dx + dy * dy + dz * dz; + + // Check if the chunk is within the radius + if (distanceSquared <= radiusSquared) + { + result.Add(chunk); + } + } + } + } + } + } + + return result; + } + } + + /// + /// Finds the nearest chunk to a point. + /// + /// X coordinate + /// Y coordinate + /// Z coordinate + /// The nearest chunk, or null if no chunks are in the index + public T? FindNearestChunk(int x, int y, int z) + { + lock (_lock) + { + if (_allChunks.Count == 0) + return default; + + T? nearest = default; + float nearestDistanceSquared = float.MaxValue; + + foreach (var chunk in _allChunks) + { + float dx = chunk.X - x; + float dy = chunk.Y - y; + float dz = chunk.Z - z; + float distanceSquared = dx * dx + dy * dy + dz * dz; + + if (distanceSquared < nearestDistanceSquared) + { + nearestDistanceSquared = distanceSquared; + nearest = chunk; + } + } + + return nearest; + } + } + + /// + /// Gets all chunks in the index. + /// + public IEnumerable GetAllChunks() + { + lock (_lock) + { + return _allChunks.ToList(); + } + } + + /// + /// Clears all chunks from the index. + /// + public void Clear() + { + lock (_lock) + { + _allChunks.Clear(); + _grid.Clear(); + } + } + + /// + /// Gets the number of chunks in the index. + /// + public int Count + { + get + { + lock (_lock) + { + return _allChunks.Count; + } + } + } + + /// + /// Checks if a chunk is in the index. + /// + /// The chunk to check + /// True if the chunk is in the index, false otherwise + public bool Contains(T chunk) + { + if (chunk == null) + throw new ArgumentNullException(nameof(chunk)); + + lock (_lock) + { + return _allChunks.Contains(chunk); + } + } + + /// + /// Updates the position of a chunk in the spatial index. + /// + /// The chunk to update + /// The old X coordinate + /// The old Y coordinate + /// The old Z coordinate + /// True if the chunk was found and updated, false otherwise + public bool UpdateChunkPosition(T chunk, int oldX, int oldY, int oldZ) + { + if (chunk == null) + throw new ArgumentNullException(nameof(chunk)); + + lock (_lock) + { + // Remove from old grid cell + var oldKey = (oldX, oldY, oldZ); + bool removed = false; + + if (_grid.TryGetValue(oldKey, out var chunks)) + { + removed = chunks.Remove(chunk); + if (chunks.Count == 0) + { + _grid.Remove(oldKey); + } + } + + if (!removed) + return false; + + // Add to new grid cell + var newKey = (chunk.X, chunk.Y, chunk.Z); + if (!_grid.TryGetValue(newKey, out var newChunks)) + { + newChunks = new HashSet(); + _grid[newKey] = newChunks; + } + newChunks.Add(chunk); + + return true; + } + } + + /// + /// Performs a spatial query using a custom filter. + /// + /// The filter function to apply to each chunk + /// All chunks that pass the filter + public IEnumerable Query(Func filter) + { + if (filter == null) + throw new ArgumentNullException(nameof(filter)); + + lock (_lock) + { + return _allChunks.Where(filter).ToList(); + } + } + + /// + /// Finds all chunks that intersect with a given chunk. + /// + /// The chunk to check for intersections + /// All chunks that intersect with the given chunk + public IEnumerable FindIntersectingChunks(IChunk3D chunk) + { + if (chunk == null) + throw new ArgumentNullException(nameof(chunk)); + + return FindChunksInRegion( + chunk.X, + chunk.Y, + chunk.Z, + chunk.X + chunk.Width - 1, + chunk.Y + chunk.Height - 1, + chunk.Z + chunk.Depth - 1); + } + + /// + /// Finds all chunks that are neighbors of a given chunk. + /// + /// The chunk to find neighbors for + /// Whether to include diagonal neighbors + /// Whether to include edge neighbors + /// All neighboring chunks + public IEnumerable FindNeighbors(IChunk3D chunk, bool includeDiagonals = false, bool includeEdges = true) + { + if (chunk == null) + throw new ArgumentNullException(nameof(chunk)); + + lock (_lock) + { + var result = new HashSet(); + + // Check the six adjacent cells (faces) + CheckNeighbor(chunk.X - 1, chunk.Y, chunk.Z, result); + CheckNeighbor(chunk.X + 1, chunk.Y, chunk.Z, result); + CheckNeighbor(chunk.X, chunk.Y - 1, chunk.Z, result); + CheckNeighbor(chunk.X, chunk.Y + 1, chunk.Z, result); + CheckNeighbor(chunk.X, chunk.Y, chunk.Z - 1, result); + CheckNeighbor(chunk.X, chunk.Y, chunk.Z + 1, result); + + // Check edge neighbors if requested + if (includeEdges) + { + // X-Y edges + CheckNeighbor(chunk.X - 1, chunk.Y - 1, chunk.Z, result); + CheckNeighbor(chunk.X + 1, chunk.Y - 1, chunk.Z, result); + CheckNeighbor(chunk.X - 1, chunk.Y + 1, chunk.Z, result); + CheckNeighbor(chunk.X + 1, chunk.Y + 1, chunk.Z, result); + + // X-Z edges + CheckNeighbor(chunk.X - 1, chunk.Y, chunk.Z - 1, result); + CheckNeighbor(chunk.X + 1, chunk.Y, chunk.Z - 1, result); + CheckNeighbor(chunk.X - 1, chunk.Y, chunk.Z + 1, result); + CheckNeighbor(chunk.X + 1, chunk.Y, chunk.Z + 1, result); + + // Y-Z edges + CheckNeighbor(chunk.X, chunk.Y - 1, chunk.Z - 1, result); + CheckNeighbor(chunk.X, chunk.Y + 1, chunk.Z - 1, result); + CheckNeighbor(chunk.X, chunk.Y - 1, chunk.Z + 1, result); + CheckNeighbor(chunk.X, chunk.Y + 1, chunk.Z + 1, result); + } + + // Check diagonal corners if requested + if (includeDiagonals) + { + CheckNeighbor(chunk.X - 1, chunk.Y - 1, chunk.Z - 1, result); + CheckNeighbor(chunk.X + 1, chunk.Y - 1, chunk.Z - 1, result); + CheckNeighbor(chunk.X - 1, chunk.Y + 1, chunk.Z - 1, result); + CheckNeighbor(chunk.X + 1, chunk.Y + 1, chunk.Z - 1, result); + CheckNeighbor(chunk.X - 1, chunk.Y - 1, chunk.Z + 1, result); + CheckNeighbor(chunk.X + 1, chunk.Y - 1, chunk.Z + 1, result); + CheckNeighbor(chunk.X - 1, chunk.Y + 1, chunk.Z + 1, result); + CheckNeighbor(chunk.X + 1, chunk.Y + 1, chunk.Z + 1, result); + } + + return result; + } + } + + /// + /// Helper method to check for chunks at a specific grid cell. + /// + private void CheckNeighbor(int x, int y, int z, HashSet result) + { + var key = (x, y, z); + if (_grid.TryGetValue(key, out var chunks)) + { + foreach (var chunk in chunks) + { + result.Add(chunk); + } + } + } + + /// + /// Performs a spatial operation on chunks in parallel. + /// + /// The operation to perform on each chunk + /// Optional filter to apply before processing + /// Maximum degree of parallelism (null for default) + /// A task that completes when all operations are done + public Task ProcessChunksParallelAsync( + Action operation, + Func? filter = null, + int? maxDegreeOfParallelism = null) + { + if (operation == null) + throw new ArgumentNullException(nameof(operation)); + + IEnumerable chunksToProcess; + + lock (_lock) + { + chunksToProcess = filter != null + ? _allChunks.Where(filter).ToList() + : _allChunks.ToList(); + } + + return ChunkTaskScheduler.RunBatchParallelAsync( + chunksToProcess.Select(chunk => new Action(() => operation(chunk))).ToArray(), + maxDegreeOfParallelism); + } + } + + /// + /// Interface for 3D chunks. + /// + public interface IChunk3D : IChunk + { + /// + /// The chunk's Z position in chunk coordinates. + /// + int Z { get; } + + /// + /// The depth of the chunk (in cells/tiles/units). + /// + int Depth { get; } + } +} diff --git a/src/AdvChkSys/Threading/ChunkAsyncLock.cs b/src/AdvChkSys/Threading/ChunkAsyncLock.cs new file mode 100644 index 0000000..27c9088 --- /dev/null +++ b/src/AdvChkSys/Threading/ChunkAsyncLock.cs @@ -0,0 +1,167 @@ +using System; +using System.Collections.Concurrent; +using System.Threading; +using System.Threading.Tasks; +using AdvChkSys.Interfaces; + +namespace AdvChkSys.Threading +{ + /// + /// Provides asynchronous locking for chunks. + /// + public class ChunkAsyncLock : IDisposable + { + // Lock objects for each chunk + private readonly ConcurrentDictionary _locks = new(); + + // Timer for cleanup + private readonly Timer _cleanupTimer; + + // Last access time for each lock + private readonly ConcurrentDictionary _lastAccessTime = new(); + + // Cleanup interval in minutes + private readonly int _cleanupIntervalMinutes; + + /// + /// Initializes a new instance of the ChunkAsyncLock class. + /// + /// Interval in minutes for cleaning up unused locks + public ChunkAsyncLock(int cleanupIntervalMinutes = 10) + { + _cleanupIntervalMinutes = cleanupIntervalMinutes; + _cleanupTimer = new Timer(CleanupUnusedLocks, null, + TimeSpan.FromMinutes(cleanupIntervalMinutes), + TimeSpan.FromMinutes(cleanupIntervalMinutes)); + } + + /// + /// Acquires a lock on a chunk asynchronously. + /// + /// The chunk to lock + /// Cancellation token + /// A disposable that releases the lock when disposed + public async Task LockAsync(IChunk chunk, CancellationToken cancellationToken = default) + { + var semaphore = _locks.GetOrAdd(chunk, _ => new SemaphoreSlim(1, 1)); + _lastAccessTime[chunk] = DateTime.UtcNow; + + try + { + await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + // If canceled, check if we need to clean up the semaphore + if (semaphore.CurrentCount == 1) + { + // No one is waiting, try to remove + _locks.TryRemove(chunk, out _); + _lastAccessTime.TryRemove(chunk, out _); + semaphore.Dispose(); + } + + throw; + } + + return new LockReleaser(semaphore, chunk, this); + } + + /// + /// Tries to acquire a lock on a chunk asynchronously with a timeout. + /// + /// The chunk to lock + /// Timeout for acquiring the lock + /// Cancellation token + /// A tuple with a boolean indicating success and the lock releaser if successful + public async Task<(bool Success, IDisposable? LockReleaser)> TryLockAsync( + IChunk chunk, + TimeSpan timeout, + CancellationToken cancellationToken = default) + { + if (chunk == null) + throw new ArgumentNullException(nameof(chunk)); + + var semaphore = _locks.GetOrAdd(chunk, _ => new SemaphoreSlim(1, 1)); + _lastAccessTime[chunk] = DateTime.UtcNow; + + if (await semaphore.WaitAsync(timeout, cancellationToken).ConfigureAwait(false)) + { + return (true, new LockReleaser(semaphore, chunk, this)); + } + + return (false, null); + } + + /// + /// Cleans up unused locks. + /// + private void CleanupUnusedLocks(object? state) + { + var now = DateTime.UtcNow; + var threshold = now.AddMinutes(-_cleanupIntervalMinutes); + + foreach (var chunk in _lastAccessTime.Keys) + { + if (_lastAccessTime.TryGetValue(chunk, out var lastAccess) && + lastAccess < threshold && + _locks.TryGetValue(chunk, out var semaphore)) + { + // Only remove if no one is waiting + if (semaphore.CurrentCount == 1) + { + if (_locks.TryRemove(chunk, out var removedSemaphore)) + { + _lastAccessTime.TryRemove(chunk, out _); + removedSemaphore.Dispose(); + } + } + } + } + } + + /// + /// Disposes resources. + /// + public void Dispose() + { + _cleanupTimer.Dispose(); + + foreach (var semaphore in _locks.Values) + { + semaphore.Dispose(); + } + + _locks.Clear(); + _lastAccessTime.Clear(); + } + + /// + /// Releases a lock when disposed. + /// + private class LockReleaser : IDisposable + { + private readonly SemaphoreSlim _semaphore; + private readonly IChunk _chunk; + private readonly ChunkAsyncLock _parent; + private bool _disposed; + + public LockReleaser(SemaphoreSlim semaphore, IChunk chunk, ChunkAsyncLock parent) + { + _semaphore = semaphore; + _chunk = chunk; + _parent = parent; + } + + public void Dispose() + { + if (!_disposed) + { + _semaphore.Release(); + _parent._lastAccessTime[_chunk] = DateTime.UtcNow; + _disposed = true; + } + } + } + } +} \ No newline at end of file diff --git a/src/AdvChkSys/Threading/ChunkOperationQueue.cs b/src/AdvChkSys/Threading/ChunkOperationQueue.cs new file mode 100644 index 0000000..0a54717 --- /dev/null +++ b/src/AdvChkSys/Threading/ChunkOperationQueue.cs @@ -0,0 +1,274 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using AdvChkSys.Interfaces; + +namespace AdvChkSys.Threading +{ + /// + /// Provides a queue for sequential operations on chunks. + /// + public class ChunkOperationQueue + { + // Queue of pending operations for each chunk + private readonly ConcurrentDictionary Operation, TaskCompletionSource Completion)>> _pendingOperations = new(); + + // Currently active operations + private readonly ConcurrentDictionary _activeOperations = new(); + + // Semaphore to limit concurrent operations + private readonly SemaphoreSlim _semaphore; + + // Cancellation for shutdown + private readonly CancellationTokenSource _shutdownCts = new(); + + /// + /// Initializes a new instance of the ChunkOperationQueue class. + /// + /// Maximum number of concurrent operations + public ChunkOperationQueue(int maxConcurrentOperations = 4) + { + _semaphore = new SemaphoreSlim(maxConcurrentOperations, maxConcurrentOperations); + } + + /// + /// Enqueues an operation to be performed on a chunk. + /// + /// The chunk to operate on + /// The operation to perform + /// A task that completes when the operation is done + public Task EnqueueOperationAsync(IChunk chunk, Func operation) + { + if (chunk == null) + throw new ArgumentNullException(nameof(chunk)); + + if (operation == null) + throw new ArgumentNullException(nameof(operation)); + + // Create a completion source for this operation + var completion = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + // Get or create the queue for this chunk + var queue = _pendingOperations.GetOrAdd(chunk, _ => new Queue<(Func, TaskCompletionSource)>()); + + // Add the operation to the queue + lock (queue) + { + queue.Enqueue((operation, completion)); + + // If this is the only operation, start processing + if (queue.Count == 1 && !_activeOperations.ContainsKey(chunk)) + { + StartProcessingChunkOperations(chunk); + } + } + + return completion.Task; + } + + /// + /// Starts processing operations for a chunk. + /// + private async void StartProcessingChunkOperations(IChunk chunk) + { + // Wait for a semaphore slot + await _semaphore.WaitAsync(_shutdownCts.Token).ConfigureAwait(false); + + try + { + // Process operations until the queue is empty + while (!_shutdownCts.IsCancellationRequested) + { + // Get the next operation + (Func operation, TaskCompletionSource completion) nextOperation; + + var queue = _pendingOperations.GetOrAdd(chunk, _ => new Queue<(Func, TaskCompletionSource)>()); + + lock (queue) + { + if (queue.Count == 0) + { + // No more operations, remove the active task + _activeOperations.TryRemove(chunk, out _); + break; + } + + nextOperation = queue.Peek(); + } + + // Execute the operation + var task = ExecuteOperationAsync(chunk, nextOperation.operation, nextOperation.completion); + _activeOperations[chunk] = task; + + // Wait for completion + await task.ConfigureAwait(false); + + // Remove the completed operation + lock (queue) + { + queue.Dequeue(); + } + } + } + catch (OperationCanceledException) + { + // Shutdown requested + } + finally + { + // Release the semaphore + _semaphore.Release(); + } + } + + /// + /// Executes an operation and completes the task. + /// + private async Task ExecuteOperationAsync(IChunk chunk, Func operation, TaskCompletionSource completion) + { + try + { + // Track the operation for diagnostics + var operationId = ChunkThreadingDiagnostics.TrackOperationStart("ChunkOperation", chunk); + + try + { + // Execute the operation + await operation().ConfigureAwait(false); + + // Complete the task + completion.TrySetResult(null); + } + catch (Exception ex) + { + // Set the exception + completion.TrySetException(ex); + + // Log the error + ChunkThreadingDiagnostics.LogEvent("OperationError", $"Error in chunk operation: {ex.Message}"); + } + finally + { + // End tracking + ChunkThreadingDiagnostics.TrackOperationEnd(operationId); + } + } + catch (Exception ex) + { + // This should never happen, but just in case + completion.TrySetException(ex); + + // Log the error + ChunkThreadingDiagnostics.LogEvent("CriticalError", $"Critical error in operation execution: {ex.Message}"); + } + } + + /// + /// Gets the number of pending operations for a chunk. + /// + public int GetPendingOperationCount(IChunk chunk) + { + if (_pendingOperations.TryGetValue(chunk, out var queue)) + { + lock (queue) + { + return queue.Count; + } + } + + return 0; + } + + /// + /// Gets the total number of pending operations. + /// + public int PendingOperationCount + { + get + { + int count = 0; + foreach (var queue in _pendingOperations.Values) + { + lock (queue) + { + count += queue.Count; + } + } + return count; + } + } + + /// + /// Gets the number of active operations. + /// + public int ActiveOperationCount => _activeOperations.Count; + + /// + /// Cancels all pending operations for a chunk. + /// + /// The chunk to cancel operations for + /// The number of operations canceled + public int CancelOperations(IChunk chunk) + { + if (_pendingOperations.TryGetValue(chunk, out var queue)) + { + lock (queue) + { + int count = queue.Count; + + // Cancel all pending operations + while (queue.Count > 0) + { + var operation = queue.Dequeue(); + operation.Completion.TrySetCanceled(); + } + + return count; + } + } + + return 0; + } + + /// + /// Cancels all pending operations. + /// + /// The number of operations canceled + public int CancelAllOperations() + { + int count = 0; + + foreach (var chunk in _pendingOperations.Keys) + { + count += CancelOperations(chunk); + } + + return count; + } + + /// + /// Shuts down the operation queue. + /// + public async Task ShutdownAsync() + { + // Cancel all operations + _shutdownCts.Cancel(); + + // Cancel all pending operations + CancelAllOperations(); + + // Wait for active operations to complete + var tasks = new List(_activeOperations.Values); + if (tasks.Count > 0) + { + await Task.WhenAll(tasks).ConfigureAwait(false); + } + + // Dispose resources + _shutdownCts.Dispose(); + _semaphore.Dispose(); + } + } +} \ No newline at end of file diff --git a/src/AdvChkSys/Threading/ChunkParallelProcessor.cs b/src/AdvChkSys/Threading/ChunkParallelProcessor.cs new file mode 100644 index 0000000..f435c6d --- /dev/null +++ b/src/AdvChkSys/Threading/ChunkParallelProcessor.cs @@ -0,0 +1,252 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using AdvChkSys.Interfaces; + +namespace AdvChkSys.Threading +{ + /// + /// Provides advanced parallel processing for chunks. + /// + public static class ChunkParallelProcessor + { + /// + /// Processes chunks with dependency awareness. + /// + /// The chunks to process + /// The function to process each chunk + /// Function to get dependencies for a chunk + /// Maximum degree of parallelism (null for default) + /// Cancellation token + public static async Task ProcessChunksWithDependenciesAsync( + IEnumerable chunks, + Func processor, + Func> getDependencies, + int? maxDegreeOfParallelism = null, + CancellationToken cancellationToken = default) + { + // Build dependency graph + var dependencyGraph = BuildDependencyGraph(chunks, getDependencies); + + // Process in dependency order + await ProcessDependencyGraphAsync( + dependencyGraph, + processor, + maxDegreeOfParallelism, + cancellationToken).ConfigureAwait(false); + } + + /// + /// Builds a dependency graph for chunks. + /// + private static DependencyGraph BuildDependencyGraph( + IEnumerable chunks, + Func> getDependencies) + { + var graph = new DependencyGraph(); + + // Add all chunks to the graph + foreach (var chunk in chunks) + { + graph.AddNode(chunk); + } + + // Add dependencies + foreach (var chunk in chunks) + { + var dependencies = getDependencies(chunk); + foreach (var dependency in dependencies) + { + graph.AddDependency(chunk, dependency); + } + } + + return graph; + } + + /// + /// Processes a dependency graph in parallel. + /// + private static async Task ProcessDependencyGraphAsync( + DependencyGraph graph, + Func processor, + int? maxDegreeOfParallelism, + CancellationToken cancellationToken) + { + // Set up semaphore for parallelism control + var semaphore = new SemaphoreSlim( + maxDegreeOfParallelism ?? ChunkThreadingConfiguration.DefaultMaxDegreeOfParallelism); + + // Track completed chunks + var completed = new ConcurrentDictionary(); + + // Get initial set of chunks with no dependencies + var readyChunks = new ConcurrentQueue(graph.GetNodesWithNoDependencies()); + + // Track active tasks + var activeTasks = new ConcurrentDictionary(); + + // Process until all chunks are completed + while (!readyChunks.IsEmpty || activeTasks.Count > 0) + { + // Check for cancellation + cancellationToken.ThrowIfCancellationRequested(); + + // Start processing ready chunks + while (readyChunks.TryDequeue(out var chunk)) + { + // Wait for a semaphore slot + await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + + // Start processing this chunk + var task = ProcessChunkAsync( + chunk, + processor, + graph, + completed, + readyChunks, + semaphore, + cancellationToken); + + activeTasks[chunk] = task; + + // When the task completes, remove it from active tasks + _ = task.ContinueWith(_ => + { + activeTasks.TryRemove(chunk, out _); + }, TaskContinuationOptions.ExecuteSynchronously); + } + + // If no ready chunks but active tasks, wait for one to complete + if (readyChunks.IsEmpty && activeTasks.Count > 0) + { + await Task.WhenAny(activeTasks.Values).ConfigureAwait(false); + } + } + + // Clean up + semaphore.Dispose(); + } + + /// + /// Processes a single chunk and updates the ready queue. + /// + private static async Task ProcessChunkAsync( + IChunk chunk, + Func processor, + DependencyGraph graph, + ConcurrentDictionary completed, + ConcurrentQueue readyChunks, + SemaphoreSlim semaphore, + CancellationToken cancellationToken) + { + try + { + // Process the chunk + await processor(chunk).ConfigureAwait(false); + + // Mark as completed + completed[chunk] = true; + + // Find dependents that are now ready + var dependents = graph.GetDependents(chunk); + foreach (var dependent in dependents) + { + // Check if all dependencies are completed + var dependencies = graph.GetDependencies(dependent); + if (dependencies.All(d => completed.ContainsKey(d))) + { + // All dependencies completed, add to ready queue + readyChunks.Enqueue(dependent); + } + } + } + finally + { + // Release the semaphore + semaphore.Release(); + } + } + + /// + /// Represents a dependency graph for chunks. + /// + private class DependencyGraph + { + // Map of chunk to its dependencies + private readonly Dictionary> _dependencies = new(); + + // Map of chunk to chunks that depend on it + private readonly Dictionary> _dependents = new(); + + /// + /// Adds a node to the graph. + /// + public void AddNode(IChunk chunk) + { + if (!_dependencies.ContainsKey(chunk)) + { + _dependencies[chunk] = new HashSet(); + } + + if (!_dependents.ContainsKey(chunk)) + { + _dependents[chunk] = new HashSet(); + } + } + + /// + /// Adds a dependency between two nodes. + /// + public void AddDependency(IChunk dependent, IChunk dependency) + { + // Add nodes if they don't exist + AddNode(dependent); + AddNode(dependency); + + // Add dependency + _dependencies[dependent].Add(dependency); + + // Add dependent + _dependents[dependency].Add(dependent); + } + + /// + /// Gets all nodes with no dependencies. + /// + public IEnumerable GetNodesWithNoDependencies() + { + return _dependencies.Where(kvp => kvp.Value.Count == 0).Select(kvp => kvp.Key); + } + + /// + /// Gets all dependencies of a node. + /// + public IEnumerable GetDependencies(IChunk chunk) + { + if (_dependencies.TryGetValue(chunk, out var dependencies)) + { + return dependencies; + } + + return Enumerable.Empty(); + } + + /// + /// Gets all dependents of a node. + /// + public IEnumerable GetDependents(IChunk chunk) + { + if (_dependents.TryGetValue(chunk, out var dependents)) + { + return dependents; + } + + return Enumerable.Empty(); + } + } + } +} \ No newline at end of file diff --git a/src/AdvChkSys/Threading/ChunkTaskScheduler.cs b/src/AdvChkSys/Threading/ChunkTaskScheduler.cs index 7a64867..ea13bc4 100644 --- a/src/AdvChkSys/Threading/ChunkTaskScheduler.cs +++ b/src/AdvChkSys/Threading/ChunkTaskScheduler.cs @@ -1,4 +1,6 @@ using System; +using System.Collections.Generic; +using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -10,6 +12,18 @@ namespace AdvChkSys.Threading /// public static class ChunkTaskScheduler { + // Default maximum degree of parallelism + private static int _maxDegreeOfParallelism = Environment.ProcessorCount; + + /// + /// Gets or sets the maximum degree of parallelism for batch operations. + /// + public static int MaxDegreeOfParallelism + { + get => _maxDegreeOfParallelism; + set => _maxDegreeOfParallelism = value > 0 ? value : Environment.ProcessorCount; + } + /// /// Runs the given action asynchronously on the thread pool. /// @@ -41,5 +55,256 @@ namespace AdvChkSys.Threading { return Task.Run(func, cancellationToken); } + + /// + /// Creates a cancellation token that cancels after a timeout. + /// + /// The timeout + /// Optional token to combine with the timeout + /// A cancellation token that cancels after the timeout or when the input token is canceled + public static CancellationToken CreateTimeoutToken( + TimeSpan timeout, + CancellationToken cancellationToken = default) + { + if (timeout == Timeout.InfiniteTimeSpan && cancellationToken == CancellationToken.None) + return CancellationToken.None; + + var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + if (timeout != Timeout.InfiniteTimeSpan) + cts.CancelAfter(timeout); + + return cts.Token; + } + + /// + /// Runs a task with a timeout. + /// + /// The task to run + /// The timeout + /// Optional cancellation token + /// The task result + /// Thrown if the task times out + public static async Task WithTimeoutAsync( + Task task, + TimeSpan timeout, + CancellationToken cancellationToken = default) + { + using var timeoutCts = new CancellationTokenSource(timeout); + using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource( + timeoutCts.Token, cancellationToken); + + var completedTask = await Task.WhenAny(task, Task.Delay(timeout, linkedCts.Token)) + .ConfigureAwait(false); + + if (completedTask == task) + { + return await task.ConfigureAwait(false); + } + + throw new TimeoutException($"The operation timed out after {timeout.TotalMilliseconds}ms"); + } + + /// + /// Runs a task with a timeout. + /// + /// The task to run + /// The timeout + /// Optional cancellation token + /// Thrown if the task times out + public static async Task WithTimeoutAsync( + Task task, + TimeSpan timeout, + CancellationToken cancellationToken = default) + { + using var timeoutCts = new CancellationTokenSource(timeout); + using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource( + timeoutCts.Token, cancellationToken); + + var completedTask = await Task.WhenAny(task, Task.Delay(timeout, linkedCts.Token)) + .ConfigureAwait(false); + + if (completedTask == task) + { + await task.ConfigureAwait(false); + return; + } + + throw new TimeoutException($"The operation timed out after {timeout.TotalMilliseconds}ms"); + } + + /// + /// Runs a task with a fallback value if it times out. + /// + /// The type of result + /// The task to run + /// The timeout + /// The fallback value to return if the task times out + /// Optional cancellation token + /// The task result or the fallback value if the task times out + public static async Task WithTimeoutOrDefaultAsync( + Task task, + TimeSpan timeout, + T fallbackValue, + CancellationToken cancellationToken = default) + { + try + { + return await WithTimeoutAsync(task, timeout, cancellationToken).ConfigureAwait(false); + } + catch (TimeoutException) + { + return fallbackValue; + } + } + + /// + /// Runs multiple actions in parallel with a limit on the degree of parallelism. + /// + public static Task RunBatchParallelAsync(IEnumerable actions, int? maxDegreeOfParallelism = null, + CancellationToken cancellationToken = default) + { + return Task.Run(() => + { + var options = new ParallelOptions + { + MaxDegreeOfParallelism = maxDegreeOfParallelism ?? MaxDegreeOfParallelism, + CancellationToken = cancellationToken + }; + + Parallel.ForEach(actions, options, action => action()); + }, cancellationToken); + } + + /// + /// Runs a batch of functions in parallel and returns the results. + /// + /// The type of result + /// The functions to run + /// Maximum degree of parallelism (null for default) + /// Cancellation token + /// The results in the same order as the functions + public static async Task RunBatchParallelAsync( + IEnumerable> functions, + int? maxDegreeOfParallelism = null, + CancellationToken cancellationToken = default) + { + var funcs = functions.ToArray(); + var results = new T[funcs.Length]; + + await Task.Run(() => + { + var options = new ParallelOptions + { + MaxDegreeOfParallelism = maxDegreeOfParallelism ?? MaxDegreeOfParallelism, + CancellationToken = cancellationToken + }; + + Parallel.For(0, funcs.Length, options, i => + { + results[i] = funcs[i](); + }); + }, cancellationToken).ConfigureAwait(false); + + return results; + } + + /// + /// Runs a batch of async functions in parallel. + /// + /// The async functions to run + /// Maximum degree of parallelism (null for default) + /// Cancellation token + public static async Task RunBatchParallelAsync( + IEnumerable> functions, + int? maxDegreeOfParallelism = null, + CancellationToken cancellationToken = default) + { + var funcs = functions.ToArray(); + + if (funcs.Length == 0) + return; + + // Use SemaphoreSlim to limit concurrency + using var semaphore = new SemaphoreSlim( + maxDegreeOfParallelism ?? MaxDegreeOfParallelism); + + // Create tasks for all functions + var tasks = new List(funcs.Length); + + foreach (var func in funcs) + { + // Wait for a slot in the semaphore + await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + + // Create a task that releases the semaphore when done + var task = Task.Run(async () => + { + try + { + await func().ConfigureAwait(false); + } + finally + { + semaphore.Release(); + } + }, cancellationToken); + + tasks.Add(task); + } + + // Wait for all tasks to complete + await Task.WhenAll(tasks).ConfigureAwait(false); + } + + /// + /// Runs a batch of async functions in parallel and returns the results. + /// + /// The type of result + /// The async functions to run + /// Maximum degree of parallelism (null for default) + /// Cancellation token + /// The results in the same order as the functions + public static async Task RunBatchParallelAsync( + IEnumerable>> functions, + int? maxDegreeOfParallelism = null, + CancellationToken cancellationToken = default) + { + var funcs = functions.ToArray(); + + if (funcs.Length == 0) + return Array.Empty(); + + // Use SemaphoreSlim to limit concurrency + using var semaphore = new SemaphoreSlim( + maxDegreeOfParallelism ?? MaxDegreeOfParallelism); + + // Create tasks for all functions + var tasks = new Task[funcs.Length]; + + for (int i = 0; i < funcs.Length; i++) + { + var func = funcs[i]; + var index = i; + + // Wait for a slot in the semaphore + await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + + // Create a task that releases the semaphore when done + tasks[index] = Task.Run(async () => + { + try + { + return await func().ConfigureAwait(false); + } + finally + { + semaphore.Release(); + } + }, cancellationToken); + } + + // Wait for all tasks to complete + return await Task.WhenAll(tasks).ConfigureAwait(false); + } } } \ No newline at end of file diff --git a/src/AdvChkSys/Threading/ChunkTaskSchedulerExtensions.cs b/src/AdvChkSys/Threading/ChunkTaskSchedulerExtensions.cs new file mode 100644 index 0000000..e69de29 diff --git a/src/AdvChkSys/Threading/ChunkThreadSafetyManager.cs b/src/AdvChkSys/Threading/ChunkThreadSafetyManager.cs new file mode 100644 index 0000000..dcb4594 --- /dev/null +++ b/src/AdvChkSys/Threading/ChunkThreadSafetyManager.cs @@ -0,0 +1,237 @@ +using System; +using System.Collections.Concurrent; +using System.Threading; +using AdvChkSys.Interfaces; + +namespace AdvChkSys.Threading +{ + /// + /// Provides synchronous locking for chunks. + /// + public class ChunkThreadSafetyManager : IDisposable + { + // Lock objects for each chunk + private readonly ConcurrentDictionary _locks = new(); + + // Timer for cleanup + private readonly Timer _cleanupTimer; + + // Last access time for each lock + private readonly ConcurrentDictionary _lastAccessTime = new(); + + // Cleanup interval in minutes + private readonly int _cleanupIntervalMinutes; + + /// + /// Initializes a new instance of the ChunkThreadSafetyManager class. + /// + /// Interval in minutes for cleaning up unused locks + public ChunkThreadSafetyManager(int cleanupIntervalMinutes = 10) + { + _cleanupIntervalMinutes = cleanupIntervalMinutes; + _cleanupTimer = new Timer(CleanupUnusedLocks, null, + TimeSpan.FromMinutes(cleanupIntervalMinutes), + TimeSpan.FromMinutes(cleanupIntervalMinutes)); + } + + /// + /// Acquires an exclusive lock on a chunk. + /// + /// The chunk to lock + /// A disposable that releases the lock when disposed + public IDisposable AcquireLock(IChunk chunk) + { + var lockObj = _locks.GetOrAdd(chunk, _ => new ChunkLock()); + _lastAccessTime[chunk] = DateTime.UtcNow; + + lockObj.EnterWriteLock(); + + return new LockReleaser(lockObj, chunk, this, LockType.Write); + } + + /// + /// Acquires a read lock on a chunk. + /// + /// The chunk to lock + /// A disposable that releases the lock when disposed + public IDisposable AcquireReadLock(IChunk chunk) + { + var lockObj = _locks.GetOrAdd(chunk, _ => new ChunkLock()); + _lastAccessTime[chunk] = DateTime.UtcNow; + + lockObj.EnterReadLock(); + + return new LockReleaser(lockObj, chunk, this, LockType.Read); + } + + /// + /// Acquires a write lock on a chunk. + /// + /// The chunk to lock + /// A disposable that releases the lock when disposed + public IDisposable AcquireWriteLock(IChunk chunk) + { + var lockObj = _locks.GetOrAdd(chunk, _ => new ChunkLock()); + _lastAccessTime[chunk] = DateTime.UtcNow; + + lockObj.EnterWriteLock(); + + return new LockReleaser(lockObj, chunk, this, LockType.Write); + } + + /// + /// Tries to acquire a read lock on a chunk. + /// + /// The chunk to lock + /// The timeout + /// A disposable that releases the lock when disposed, or null if the lock could not be acquired + public IDisposable? TryAcquireReadLock(IChunk chunk, TimeSpan timeout) + { + var lockObj = _locks.GetOrAdd(chunk, _ => new ChunkLock()); + _lastAccessTime[chunk] = DateTime.UtcNow; + + if (lockObj.TryEnterReadLock(timeout)) + { + return new LockReleaser(lockObj, chunk, this, LockType.Read); + } + + // Track contention + ChunkThreadingDiagnostics.TrackLockContention(chunk); + + return null; + } + + /// + /// Tries to acquire a write lock on a chunk. + /// + /// The chunk to lock + /// The timeout + /// A disposable that releases the lock when disposed, or null if the lock could not be acquired + public IDisposable? TryAcquireWriteLock(IChunk chunk, TimeSpan timeout) + { + var lockObj = _locks.GetOrAdd(chunk, _ => new ChunkLock()); + _lastAccessTime[chunk] = DateTime.UtcNow; + + if (lockObj.TryEnterWriteLock(timeout)) + { + return new LockReleaser(lockObj, chunk, this, LockType.Write); + } + + // Track contention + ChunkThreadingDiagnostics.TrackLockContention(chunk); + + return null; + } + + /// + /// Cleans up unused locks. + /// + private void CleanupUnusedLocks(object? state) + { + var now = DateTime.UtcNow; + var threshold = now.AddMinutes(-_cleanupIntervalMinutes); + + foreach (var chunk in _lastAccessTime.Keys) + { + if (_lastAccessTime.TryGetValue(chunk, out var lastAccess) && + lastAccess < threshold && + _locks.TryGetValue(chunk, out var lockObj)) + { + // Only remove if no one is using the lock + if (!lockObj.IsReadLockHeld && !lockObj.IsWriteLockHeld) + { + if (_locks.TryRemove(chunk, out var removedLock)) + { + _lastAccessTime.TryRemove(chunk, out _); + removedLock.Dispose(); + } + } + } + } + } + + /// + /// Disposes resources. + /// + public void Dispose() + { + _cleanupTimer.Dispose(); + + foreach (var lockObj in _locks.Values) + { + lockObj.Dispose(); + } + + _locks.Clear(); + _lastAccessTime.Clear(); + } + + /// + /// Type of lock. + /// + private enum LockType + { + Read, + Write + } + + /// + /// Wrapper around ReaderWriterLockSlim. + /// + private class ChunkLock : IDisposable + { + private readonly ReaderWriterLockSlim _lock = new(LockRecursionPolicy.SupportsRecursion); + + public void EnterReadLock() => _lock.EnterReadLock(); + public void ExitReadLock() => _lock.ExitReadLock(); + public void EnterWriteLock() => _lock.EnterWriteLock(); + public void ExitWriteLock() => _lock.ExitWriteLock(); + + public bool TryEnterReadLock(TimeSpan timeout) => _lock.TryEnterReadLock(timeout); + public bool TryEnterWriteLock(TimeSpan timeout) => _lock.TryEnterWriteLock(timeout); + + public bool IsReadLockHeld => _lock.IsReadLockHeld; + public bool IsWriteLockHeld => _lock.IsWriteLockHeld; + + public void Dispose() => _lock.Dispose(); + } + + /// + /// Releases a lock when disposed. + /// + private class LockReleaser : IDisposable + { + private readonly ChunkLock _lock; + private readonly IChunk _chunk; + private readonly ChunkThreadSafetyManager _parent; + private readonly LockType _lockType; + private bool _disposed; + + public LockReleaser(ChunkLock lockObj, IChunk chunk, ChunkThreadSafetyManager parent, LockType lockType) + { + _lock = lockObj; + _chunk = chunk; + _parent = parent; + _lockType = lockType; + } + + public void Dispose() + { + if (!_disposed) + { + if (_lockType == LockType.Read) + { + _lock.ExitReadLock(); + } + else + { + _lock.ExitWriteLock(); + } + + _parent._lastAccessTime[_chunk] = DateTime.UtcNow; + _disposed = true; + } + } + } + } +} \ No newline at end of file diff --git a/src/AdvChkSys/Threading/ChunkThreadingConfiguration.cs b/src/AdvChkSys/Threading/ChunkThreadingConfiguration.cs new file mode 100644 index 0000000..0e16d2c --- /dev/null +++ b/src/AdvChkSys/Threading/ChunkThreadingConfiguration.cs @@ -0,0 +1,98 @@ +using System; +using System.Threading; + +namespace AdvChkSys.Threading +{ + /// + /// Provides centralized configuration for threading in the chunk system. + /// + public static class ChunkThreadingConfiguration + { + private static int _defaultMaxDegreeOfParallelism = Environment.ProcessorCount; + private static int _chunkOperationTimeout = 30000; // 30 seconds + private static int _lockCleanupInterval = 10; // 10 minutes + + /// + /// Gets or sets the default maximum degree of parallelism for chunk operations. + /// + public static int DefaultMaxDegreeOfParallelism + { + get => _defaultMaxDegreeOfParallelism; + set + { + if (value < 1) + throw new ArgumentOutOfRangeException(nameof(value), "Parallelism must be at least 1"); + _defaultMaxDegreeOfParallelism = value; + ChunkTaskScheduler.MaxDegreeOfParallelism = value; + } + } + + /// + /// Gets or sets the timeout in milliseconds for chunk operations. + /// + public static int ChunkOperationTimeoutMs + { + get => _chunkOperationTimeout; + set + { + if (value < 0) + throw new ArgumentOutOfRangeException(nameof(value), "Timeout cannot be negative"); + _chunkOperationTimeout = value; + } + } + + /// + /// Gets or sets the interval in minutes for cleaning up unused locks. + /// + public static int LockCleanupIntervalMinutes + { + get => _lockCleanupInterval; + set + { + if (value < 1) + throw new ArgumentOutOfRangeException(nameof(value), "Cleanup interval must be at least 1 minute"); + _lockCleanupInterval = value; + } + } + + /// + /// Creates a cancellation token with the default timeout. + /// + public static CancellationToken CreateTimeoutToken(CancellationToken cancellationToken = default) + { + return ChunkTaskScheduler.CreateTimeoutToken( + TimeSpan.FromMilliseconds(_chunkOperationTimeout), + cancellationToken); + } + + /// + /// Configures the system for high throughput (more parallelism, longer timeouts). + /// + public static void ConfigureForHighThroughput() + { + DefaultMaxDegreeOfParallelism = Math.Max(4, Environment.ProcessorCount * 2); + ChunkOperationTimeoutMs = 60000; // 1 minute + LockCleanupIntervalMinutes = 30; + } + + /// + /// Configures the system for low latency (less parallelism, shorter timeouts). + /// + public static void ConfigureForLowLatency() + { + DefaultMaxDegreeOfParallelism = Math.Max(2, Environment.ProcessorCount / 2); + ChunkOperationTimeoutMs = 15000; // 15 seconds + LockCleanupIntervalMinutes = 5; + } + + /// + /// Configures the system for memory efficiency (less parallelism, more aggressive cleanup). + /// + public static void ConfigureForMemoryEfficiency() + { + DefaultMaxDegreeOfParallelism = Math.Max(1, Environment.ProcessorCount / 4); + ChunkOperationTimeoutMs = 45000; // 45 seconds + LockCleanupIntervalMinutes = 2; + } + } +} \ No newline at end of file diff --git a/src/AdvChkSys/Threading/ChunkThreadingDiagnostics.cs b/src/AdvChkSys/Threading/ChunkThreadingDiagnostics.cs new file mode 100644 index 0000000..dc9f915 --- /dev/null +++ b/src/AdvChkSys/Threading/ChunkThreadingDiagnostics.cs @@ -0,0 +1,273 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Threading; +using AdvChkSys.Interfaces; + + +namespace AdvChkSys.Threading +{ + /// + /// Provides diagnostic information about threading operations in the chunk system. + /// + public static class ChunkThreadingDiagnostics + { + // Track operation durations + private static readonly ConcurrentDictionary> _operationDurations = new(); + + // Track lock contention + private static readonly ConcurrentDictionary _lockContentionCount = new(); + + // Track active operations + private static readonly ConcurrentDictionary _activeOperations = new(); + + // Lock for thread safety + private static readonly object _lock = new(); + + // Stopwatch for timing + private static readonly Stopwatch _stopwatch = Stopwatch.StartNew(); + + /// + /// Tracks the duration of an operation. + /// + /// The name of the operation + /// The action to perform + public static void TrackOperation(string operationName, Action action) + { + var operationId = Guid.NewGuid(); + var startTime = _stopwatch.ElapsedMilliseconds; + + try + { + action(); + } + finally + { + var duration = _stopwatch.ElapsedMilliseconds - startTime; + + lock (_lock) + { + if (!_operationDurations.TryGetValue(operationName, out var durations)) + { + durations = new List(); + _operationDurations[operationName] = durations; + } + + durations.Add(duration); + + // Keep only the last 1000 durations + if (durations.Count > 1000) + { + durations.RemoveAt(0); + } + } + } + } + + /// + /// Tracks the start of an operation on a chunk. + /// + /// The name of the operation + /// The chunk being operated on + /// An operation ID for tracking + public static Guid TrackOperationStart(string operationName, IChunk chunk) + { + var operationId = Guid.NewGuid(); + _activeOperations[operationId] = (operationName, DateTime.UtcNow, chunk); + return operationId; + } + + /// + /// Tracks the end of an operation. + /// + /// The operation ID from TrackOperationStart + public static void TrackOperationEnd(Guid operationId) + { + if (_activeOperations.TryRemove(operationId, out var info)) + { + var duration = (DateTime.UtcNow - info.StartTime).TotalMilliseconds; + + lock (_lock) + { + if (!_operationDurations.TryGetValue(info.Operation, out var durations)) + { + durations = new List(); + _operationDurations[info.Operation] = durations; + } + + durations.Add((long)duration); + + // Keep only the last 1000 durations + if (durations.Count > 1000) + { + durations.RemoveAt(0); + } + } + } + } + + /// + /// Tracks lock contention on a chunk. + /// + /// The chunk that had lock contention + public static void TrackLockContention(IChunk chunk) + { + _lockContentionCount.AddOrUpdate(chunk, 1, (_, count) => count + 1); + } + + /// + /// Gets statistics about operation durations. + /// + public static Dictionary GetOperationStatistics() + { + var result = new Dictionary(); + + lock (_lock) + { + foreach (var kvp in _operationDurations) + { + var durations = kvp.Value; + if (durations.Count > 0) + { + result[kvp.Key] = ( + durations.Min(), + durations.Max(), + durations.Average(), + durations.Count + ); + } + } + } + + return result; + } + + /// + /// Gets the chunks with the most lock contention. + /// + /// Number of chunks to return + public static List<(IChunk Chunk, int ContentionCount)> GetTopContentionChunks(int topCount = 10) + { + return _lockContentionCount + .OrderByDescending(kvp => kvp.Value) + .Take(topCount) + .Select(kvp => (kvp.Key, kvp.Value)) + .ToList(); + } + + /// + /// Gets information about currently active operations. + /// + public static List<(string Operation, TimeSpan Duration, IChunk Chunk)> GetActiveOperations() + { + var now = DateTime.UtcNow; + return _activeOperations + .Select(kvp => ( + kvp.Value.Operation, + now - kvp.Value.StartTime, + kvp.Value.Chunk + )) + .OrderByDescending(x => x.Item2) + .ToList(); + } + + /// + /// Gets the total number of tracked operations. + /// + public static int GetTotalOperationCount() + { + lock (_lock) + { + return _operationDurations.Values.Sum(list => list.Count); + } + } + + /// + /// Gets the number of active operations. + /// + public static int GetActiveOperationCount() + { + return _activeOperations.Count; + } + + /// + /// Gets the total number of lock contentions. + /// + public static int GetTotalLockContentionCount() + { + return _lockContentionCount.Values.Sum(); + } + + /// + /// Clears all diagnostic data. + /// + public static void ClearDiagnosticData() + { + lock (_lock) + { + _operationDurations.Clear(); + _lockContentionCount.Clear(); + // Don't clear active operations as they're still in progress + } + } + + /// + /// Gets a comprehensive diagnostic report. + /// + public static string GenerateDiagnosticReport() + { + var report = new System.Text.StringBuilder(); + + report.AppendLine("=== Chunk Threading Diagnostic Report ==="); + report.AppendLine($"Generated: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss} UTC"); + report.AppendLine(); + + // Operation statistics + report.AppendLine("== Operation Statistics =="); + var stats = GetOperationStatistics(); + foreach (var kvp in stats.OrderByDescending(s => s.Value.Average)) + { + report.AppendLine($"{kvp.Key}:"); + report.AppendLine($" Count: {kvp.Value.Count}"); + report.AppendLine($" Min: {kvp.Value.Min}ms"); + report.AppendLine($" Max: {kvp.Value.Max}ms"); + report.AppendLine($" Avg: {kvp.Value.Average:F2}ms"); + } + report.AppendLine(); + + // Lock contention + report.AppendLine("== Lock Contention =="); + var contentions = GetTopContentionChunks(10); + foreach (var (chunk, count) in contentions) + { + report.AppendLine($"Chunk ({chunk.X}, {chunk.Y}): {count} contentions"); + } + report.AppendLine($"Total contentions: {GetTotalLockContentionCount()}"); + report.AppendLine(); + + // Active operations + report.AppendLine("== Active Operations =="); + var activeOps = GetActiveOperations(); + foreach (var (operation, duration, chunk) in activeOps) + { + report.AppendLine($"{operation} on Chunk ({chunk.X}, {chunk.Y}): {duration.TotalMilliseconds:F2}ms"); + } + report.AppendLine($"Total active operations: {GetActiveOperationCount()}"); + + return report.ToString(); + } + + /// + /// Logs a diagnostic event. + /// + /// The name of the event + /// Additional details + public static void LogEvent(string eventName, string details) + { + // This could be expanded to log to a file or other destination + Debug.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] {eventName}: {details}"); + } + } +} \ No newline at end of file diff --git a/src/AdvChkSys/Threading/ChunkThreadingExtensions.cs b/src/AdvChkSys/Threading/ChunkThreadingExtensions.cs new file mode 100644 index 0000000..222a836 --- /dev/null +++ b/src/AdvChkSys/Threading/ChunkThreadingExtensions.cs @@ -0,0 +1,380 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using AdvChkSys.Interfaces; +using AdvChkSys.Manager; +using AdvChkSys.Loading; + +namespace AdvChkSys.Threading +{ + /// + /// Provides extension methods for threading operations on chunks. + /// + public static class ChunkThreadingExtensions + { + /// + /// Processes all loaded chunks in parallel. + /// + /// The type of chunk data + /// The chunk manager + /// The action to perform on each chunk + /// Maximum degree of parallelism (null for default) + /// Cancellation token + public static Task ProcessAllChunksParallelAsync( + this ChunkManager2D manager, + Action> action, + int? maxDegreeOfParallelism = null, + CancellationToken cancellationToken = default) + { + var chunks = manager.GetAllChunks().ToArray(); + return ChunkTaskScheduler.RunBatchParallelAsync( + chunks.Select(c => new Action(() => action(c))).ToArray(), + maxDegreeOfParallelism, + cancellationToken); + } + + /// + /// Processes all loaded chunks in parallel. + /// + /// The type of chunk data + /// The chunk manager + /// The action to perform on each chunk + /// Maximum degree of parallelism (null for default) + /// Cancellation token + public static Task ProcessAllChunksParallelAsync( + this ChunkManager3D manager, + Action> action, + int? maxDegreeOfParallelism = null, + CancellationToken cancellationToken = default) + { + var chunks = manager.GetAllChunks().ToArray(); + return ChunkTaskScheduler.RunBatchParallelAsync( + chunks.Select(c => new Action(() => action(c))).ToArray(), + maxDegreeOfParallelism, + cancellationToken); + } + + /// + /// Processes chunks in a region in parallel. + /// + /// The type of chunk data + /// The chunk manager + /// Minimum X coordinate + /// Minimum Y coordinate + /// Maximum X coordinate + /// Maximum Y coordinate + /// The action to perform on each chunk + /// Maximum degree of parallelism (null for default) + /// Cancellation token + public static async Task ProcessChunksInRegionParallelAsync( + this ChunkManager2D manager, + int minX, int minY, int maxX, int maxY, + Action> action, + int? maxDegreeOfParallelism = null, + CancellationToken cancellationToken = default) + { + var chunks = new List>(); + + // Collect chunks in the region + for (int x = minX; x <= maxX; x++) + { + for (int y = minY; y <= maxY; y++) + { + var chunk = manager.GetChunk(x, y); + if (chunk != null) + { + chunks.Add(chunk); + } + } + } + + // Process in parallel + await ChunkTaskScheduler.RunBatchParallelAsync( + chunks.Select(c => new Action(() => action(c))).ToArray(), + maxDegreeOfParallelism, + cancellationToken).ConfigureAwait(false); + } + + /// + /// Processes chunks in a region in parallel. + /// + /// The type of chunk data + /// The chunk manager + /// Minimum X coordinate + /// Minimum Y coordinate + /// Minimum Z coordinate + /// Maximum X coordinate + /// Maximum Y coordinate + /// Maximum Z coordinate + /// The action to perform on each chunk + /// Maximum degree of parallelism (null for default) + /// Cancellation token + public static async Task ProcessChunksInRegionParallelAsync( + this ChunkManager3D manager, + int minX, int minY, int minZ, int maxX, int maxY, int maxZ, + Action> action, + int? maxDegreeOfParallelism = null, + CancellationToken cancellationToken = default) + { + var chunks = new List>(); + + // Collect chunks in the region + for (int x = minX; x <= maxX; x++) + { + for (int y = minY; y <= maxY; y++) + { + for (int z = minZ; z <= maxZ; z++) + { + var chunk = manager.GetChunk(x, y, z); + if (chunk != null) + { + chunks.Add(chunk); + } + } + } + } + + // Process in parallel + await ChunkTaskScheduler.RunBatchParallelAsync( + chunks.Select(c => new Action(() => action(c))).ToArray(), + maxDegreeOfParallelism, + cancellationToken).ConfigureAwait(false); + } + + /// + /// Loads chunks in a region asynchronously with a specified priority. + /// + /// The type of chunk data + /// The chunk manager + /// Minimum X coordinate + /// Minimum Y coordinate + /// Maximum X coordinate + /// Maximum Y coordinate + /// Chunk width + /// Chunk height + /// Loading priority + /// Cancellation token + public static async Task LoadChunksInRegionAsync( + this ChunkManager2D manager, + int minX, int minY, int maxX, int maxY, + int width, int height, + ChunkLoadingPriority.Priority priority = ChunkLoadingPriority.Priority.Normal, + CancellationToken cancellationToken = default) + { + // Create a loading priority system if not already integrated + var loadingPriority = new ChunkLoadingPriority(); + + // Enqueue all chunks in the region + var tasks = new List(); + for (int x = minX; x <= maxX; x++) + { + for (int y = minY; y <= maxY; y++) + { + if (cancellationToken.IsCancellationRequested) + break; + + // Skip if already loaded + if (manager.IsChunkLoaded(x, y)) + continue; + + // Enqueue the request + var request = loadingPriority.EnqueueRequest(x, y, width, height, priority); + + // Create a task that completes when the chunk is loaded + var tcs = new TaskCompletionSource(); + + void Handler(object? sender, ChunkLoadingPriority.ChunkLoadRequest completedRequest) + { + if (completedRequest.RequestId == request.RequestId) + { + loadingPriority.RequestCompleted -= Handler; + tcs.TrySetResult(true); + } + } + + loadingPriority.RequestCompleted += Handler; + + // Add timeout to prevent indefinite waiting + _ = Task.Delay(30000, cancellationToken).ContinueWith(t => + { + loadingPriority.RequestCompleted -= Handler; + if (!t.IsCanceled) + tcs.TrySetResult(false); + }, cancellationToken); + + tasks.Add(tcs.Task); + } + } + + // Wait for all chunks to be loaded + await Task.WhenAll(tasks).ConfigureAwait(false); + + // Clean up + await loadingPriority.ShutdownAsync().ConfigureAwait(false); + } + + /// + /// Loads chunks in a region asynchronously with a specified priority. + /// + /// The type of chunk data + /// The chunk manager + /// Minimum X coordinate + /// Minimum Y coordinate + /// Minimum Z coordinate + /// Maximum X coordinate + /// Maximum Y coordinate + /// Maximum Z coordinate + /// Chunk width + /// Chunk height + /// Chunk depth + /// Loading priority + /// Cancellation token + public static async Task LoadChunksInRegionAsync( + this ChunkManager3D manager, + int minX, int minY, int minZ, int maxX, int maxY, int maxZ, + int width, int height, int depth, + ChunkLoadingPriority.Priority priority = ChunkLoadingPriority.Priority.Normal, + CancellationToken cancellationToken = default) + { + // Create a loading priority system if not already integrated + var loadingPriority = new ChunkLoadingPriority(); + + // Enqueue all chunks in the region + var tasks = new List(); + for (int x = minX; x <= maxX; x++) + { + for (int y = minY; y <= maxY; y++) + { + for (int z = minZ; z <= maxZ; z++) + { + if (cancellationToken.IsCancellationRequested) + break; + + // Skip if already loaded + if (manager.IsChunkLoaded(x, y, z)) + continue; + + // Enqueue the request + var request = loadingPriority.EnqueueRequest(x, y, z, width, height, depth, priority); + + // Create a task that completes when the chunk is loaded + var tcs = new TaskCompletionSource(); + + void Handler(object? sender, ChunkLoadingPriority.ChunkLoadRequest completedRequest) + { + if (completedRequest.RequestId == request.RequestId) + { + loadingPriority.RequestCompleted -= Handler; + tcs.TrySetResult(true); + } + } + + loadingPriority.RequestCompleted += Handler; + + // Add timeout to prevent indefinite waiting + _ = Task.Delay(30000, cancellationToken).ContinueWith(t => + { + loadingPriority.RequestCompleted -= Handler; + if (!t.IsCanceled) + tcs.TrySetResult(false); + }, cancellationToken); + + tasks.Add(tcs.Task); + } + } + } + + // Wait for all chunks to be loaded + await Task.WhenAll(tasks).ConfigureAwait(false); + + // Clean up + await loadingPriority.ShutdownAsync().ConfigureAwait(false); + } + + /// + /// Unloads chunks outside a specified region asynchronously. + /// + /// The type of chunk data + /// The chunk manager + /// Minimum X coordinate to keep + /// Minimum Y coordinate to keep + /// Maximum X coordinate to keep + /// Maximum Y coordinate to keep + /// Maximum degree of parallelism (null for default) + /// Cancellation token + public static async Task UnloadChunksOutsideRegionAsync( + this ChunkManager2D manager, + int minX, int minY, int maxX, int maxY, + int? maxDegreeOfParallelism = null, + CancellationToken cancellationToken = default) + { + // Get all loaded chunks + var chunks = manager.GetAllChunks().ToArray(); + + // Filter chunks outside the region + var chunksToUnload = chunks.Where(c => + c.X < minX || c.X > maxX || c.Y < minY || c.Y > maxY).ToArray(); + + // Unload in parallel + var options = new ParallelOptions + { + MaxDegreeOfParallelism = maxDegreeOfParallelism ?? Environment.ProcessorCount, + CancellationToken = cancellationToken + }; + + await Task.Run(() => + { + Parallel.ForEach(chunksToUnload, options, chunk => + { + manager.UnloadChunk(chunk.X, chunk.Y); + }); + }, cancellationToken).ConfigureAwait(false); + } + + /// + /// Unloads chunks outside a specified region asynchronously. + /// + /// The type of chunk data + /// The chunk manager + /// Minimum X coordinate to keep + /// Minimum Y coordinate to keep + /// Minimum Z coordinate to keep + /// Maximum X coordinate to keep + /// Maximum Y coordinate to keep + /// Maximum Z coordinate to keep + /// Maximum degree of parallelism (null for default) + /// Cancellation token + public static async Task UnloadChunksOutsideRegionAsync( + this ChunkManager3D manager, + int minX, int minY, int minZ, int maxX, int maxY, int maxZ, + int? maxDegreeOfParallelism = null, + CancellationToken cancellationToken = default) + { + // Get all loaded chunks + var chunks = manager.GetAllChunks().ToArray(); + + // Filter chunks outside the region + var chunksToUnload = chunks.Where(c => + c.X < minX || c.X > maxX || + c.Y < minY || c.Y > maxY || + c.Z < minZ || c.Z > maxZ).ToArray(); + + // Unload in parallel + var options = new ParallelOptions + { + MaxDegreeOfParallelism = maxDegreeOfParallelism ?? Environment.ProcessorCount, + CancellationToken = cancellationToken + }; + + await Task.Run(() => + { + Parallel.ForEach(chunksToUnload, options, chunk => + { + manager.UnloadChunk(chunk.X, chunk.Y, chunk.Z); + }); + }, cancellationToken).ConfigureAwait(false); + } + } +} \ No newline at end of file diff --git a/src/AdvChkSys/Threading/ChunkThreadingExtensions2.cs b/src/AdvChkSys/Threading/ChunkThreadingExtensions2.cs new file mode 100644 index 0000000..38d9388 --- /dev/null +++ b/src/AdvChkSys/Threading/ChunkThreadingExtensions2.cs @@ -0,0 +1,223 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using AdvChkSys.Chunk; +using AdvChkSys.Dependencies; +using AdvChkSys.Interfaces; +using AdvChkSys.Manager; + +namespace AdvChkSys.Threading +{ + /// + /// Provides additional extension methods for threading operations on chunks. + /// + public static class ChunkThreadingExtensions2 + { + /// + /// Processes chunks in a spiral pattern from the center outward. + /// + /// The type of chunk data + /// The chunk manager + /// Center X coordinate + /// Center Y coordinate + /// Radius in chunks + /// The function to process each chunk + /// Maximum degree of parallelism (null for default) + /// Cancellation token + public static async Task ProcessChunksSpiralAsync( + this ChunkManager2D manager, + int centerX, int centerY, int radius, + Func, Task> processor, + int? maxDegreeOfParallelism = null, + CancellationToken cancellationToken = default) + { + // Generate spiral coordinates + var coordinates = GenerateSpiralCoordinates(centerX, centerY, radius).ToArray(); + + // Process in batches to maintain the spiral ordering while still using parallelism + int batchSize = Math.Max(1, (int)Math.Sqrt(coordinates.Length)); + int batchCount = (coordinates.Length + batchSize - 1) / batchSize; + + for (int i = 0; i < batchCount; i++) + { + if (cancellationToken.IsCancellationRequested) + break; + + var batchCoords = coordinates + .Skip(i * batchSize) + .Take(batchSize) + .ToArray(); + + var tasks = new List(); + foreach (var (x, y) in batchCoords) + { + var chunk = manager.GetChunk(x, y); + if (chunk != null) + { + tasks.Add(processor(chunk)); + } + } + + if (tasks.Count > 0) + { + await Task.WhenAll(tasks).ConfigureAwait(false); + } + } + } + + /// + /// Generates coordinates in a spiral pattern from the center outward. + /// + private static IEnumerable<(int X, int Y)> GenerateSpiralCoordinates(int centerX, int centerY, int radius) + { + // Start with the center + yield return (centerX, centerY); + + // Spiral outward + for (int layer = 1; layer <= radius; layer++) + { + // Top edge (moving right) + for (int x = centerX - layer + 1; x <= centerX + layer; x++) + { + yield return (x, centerY - layer); + } + + // Right edge (moving down) + for (int y = centerY - layer + 1; y <= centerY + layer; y++) + { + yield return (centerX + layer, y); + } + + // Bottom edge (moving left) + for (int x = centerX + layer - 1; x >= centerX - layer; x--) + { + yield return (x, centerY + layer); + } + + // Left edge (moving up) + for (int y = centerY + layer - 1; y >= centerY - layer; y--) + { + yield return (centerX - layer, y); + } + } + } + + /// + /// Processes chunks with dependency awareness. + /// + /// The type of chunk data + /// The chunk manager + /// The dependency tracker + /// The function to process each chunk + /// Maximum degree of parallelism (null for default) + /// Cancellation token + public static async Task ProcessChunksWithDependenciesAsync( + this ChunkManager2D manager, + ChunkDependencyTracker dependencyTracker, + Func, Task> processor, + int? maxDegreeOfParallelism = null, + CancellationToken cancellationToken = default) + { + var chunks = manager.GetAllChunks().ToArray(); + + await ChunkParallelProcessor.ProcessChunksWithDependenciesAsync( + chunks, + chunk => processor((Chunk2D)chunk), + chunk => dependencyTracker.GetDependencies(chunk), + maxDegreeOfParallelism, + cancellationToken).ConfigureAwait(false); + } + + /// + /// Processes chunks with dependency awareness. + /// + /// The type of chunk data + /// The chunk manager + /// The dependency tracker + /// The function to process each chunk + /// Maximum degree of parallelism (null for default) + /// Cancellation token + public static async Task ProcessChunksWithDependenciesAsync( + this ChunkManager3D manager, + ChunkDependencyTracker dependencyTracker, + Func, Task> processor, + int? maxDegreeOfParallelism = null, + CancellationToken cancellationToken = default) + { + var chunks = manager.GetAllChunks().ToArray(); + + await ChunkParallelProcessor.ProcessChunksWithDependenciesAsync( + chunks, + chunk => processor((Chunk3D)chunk), + chunk => dependencyTracker.GetDependencies(chunk), + maxDegreeOfParallelism, + cancellationToken).ConfigureAwait(false); + } + + /// + /// Processes chunks with thread safety. + /// + /// The type of chunk data + /// The chunk manager + /// The function to process each chunk + /// Maximum degree of parallelism (null for default) + /// Cancellation token + public static async Task ProcessChunksSafelyAsync( + this ChunkManager2D manager, + Func, Task> processor, + int? maxDegreeOfParallelism = null, + CancellationToken cancellationToken = default) + { + var chunks = manager.GetAllChunks().ToArray(); + var threadingManager = ChunkThreadingManager.Instance; + + var options = new ParallelOptions + { + MaxDegreeOfParallelism = maxDegreeOfParallelism ?? ChunkThreadingConfiguration.DefaultMaxDegreeOfParallelism, + CancellationToken = cancellationToken + }; + + await Task.Run(() => Parallel.ForEach(chunks, options, async chunk => + { + await threadingManager.WithLockAsync(chunk, async () => + { + await processor(chunk).ConfigureAwait(false); + }, cancellationToken).ConfigureAwait(false); + }), cancellationToken).ConfigureAwait(false); + } + + /// + /// Processes chunks with thread safety. + /// + /// The type of chunk data + /// The chunk manager + /// The function to process each chunk + /// Maximum degree of parallelism (null for default) + /// Cancellation token + public static async Task ProcessChunksSafelyAsync( + this ChunkManager3D manager, + Func, Task> processor, + int? maxDegreeOfParallelism = null, + CancellationToken cancellationToken = default) + { + var chunks = manager.GetAllChunks().ToArray(); + var threadingManager = ChunkThreadingManager.Instance; + + var options = new ParallelOptions + { + MaxDegreeOfParallelism = maxDegreeOfParallelism ?? ChunkThreadingConfiguration.DefaultMaxDegreeOfParallelism, + CancellationToken = cancellationToken + }; + + await Task.Run(() => Parallel.ForEach(chunks, options, async chunk => + { + await threadingManager.WithLockAsync(chunk, async () => + { + await processor(chunk).ConfigureAwait(false); + }, cancellationToken).ConfigureAwait(false); + }), cancellationToken).ConfigureAwait(false); + } + } +} \ No newline at end of file diff --git a/src/AdvChkSys/Threading/ChunkThreadingManager.cs b/src/AdvChkSys/Threading/ChunkThreadingManager.cs new file mode 100644 index 0000000..515e370 --- /dev/null +++ b/src/AdvChkSys/Threading/ChunkThreadingManager.cs @@ -0,0 +1,281 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using AdvChkSys.Interfaces; + +namespace AdvChkSys.Threading +{ + /// + /// Provides a centralized manager for all threading operations in the chunk system. + /// + public class ChunkThreadingManager : IDisposable + { + private static ChunkThreadingManager? _instance; + private static readonly object _instanceLock = new(); + + // Thread safety utilities + private readonly ChunkThreadSafetyManager _threadSafety; + private readonly ChunkAsyncLock _asyncLock; + + // Operation queue + private readonly ChunkOperationQueue _operationQueue; + + // Cancellation for shutdown + private readonly CancellationTokenSource _shutdownCts = new(); + + /// + /// Gets the singleton instance of the ChunkThreadingManager. + /// + public static ChunkThreadingManager Instance + { + get + { + if (_instance == null) + { + lock (_instanceLock) + { + _instance ??= new ChunkThreadingManager(); + } + } + return _instance; + } + } + + /// + /// Initializes a new instance of the ChunkThreadingManager class. + /// + private ChunkThreadingManager() + { + _threadSafety = new ChunkThreadSafetyManager( + ChunkThreadingConfiguration.LockCleanupIntervalMinutes); + + _asyncLock = new ChunkAsyncLock( + ChunkThreadingConfiguration.LockCleanupIntervalMinutes); + + _operationQueue = new ChunkOperationQueue( + ChunkThreadingConfiguration.DefaultMaxDegreeOfParallelism); + } + + /// + /// Gets the thread safety manager for synchronous locking. + /// + public ChunkThreadSafetyManager ThreadSafety => _threadSafety; + + /// + /// Gets the async lock manager for asynchronous locking. + /// + public ChunkAsyncLock AsyncLock => _asyncLock; + + /// + /// Gets the operation queue for sequential chunk operations. + /// + public ChunkOperationQueue OperationQueue => _operationQueue; + + /// + /// Acquires an exclusive lock on a chunk. + /// + /// The chunk to lock + /// A disposable that releases the lock when disposed + public IDisposable AcquireLock(IChunk chunk) + { + return _threadSafety.AcquireLock(chunk); + } + + /// + /// Acquires a read lock on a chunk. + /// + /// The chunk to lock + /// A disposable that releases the lock when disposed + public IDisposable AcquireReadLock(IChunk chunk) + { + return _threadSafety.AcquireReadLock(chunk); + } + + /// + /// Acquires a write lock on a chunk. + /// + /// The chunk to lock + /// A disposable that releases the lock when disposed + public IDisposable AcquireWriteLock(IChunk chunk) + { + return _threadSafety.AcquireWriteLock(chunk); + } + + /// + /// Acquires a lock on a chunk asynchronously. + /// + /// The chunk to lock + /// Cancellation token + /// A disposable that releases the lock when disposed + public Task LockAsync(IChunk chunk, CancellationToken cancellationToken = default) + { + return _asyncLock.LockAsync(chunk, cancellationToken); + } + + /// + /// Enqueues an operation to be performed on a chunk. + /// + /// The chunk to operate on + /// The operation to perform + /// A task that completes when the operation is done + public Task EnqueueOperationAsync(IChunk chunk, Func operation) + { + return _operationQueue.EnqueueOperationAsync(chunk, operation); + } + + /// + /// Runs an action with a lock on the chunk. + /// + /// The chunk to lock + /// The action to perform + public void WithLock(IChunk chunk, Action action) + { + using var lockObj = AcquireLock(chunk); + action(); + } + + /// + /// Runs a function with a lock on the chunk and returns the result. + /// + /// The type of result + /// The chunk to lock + /// The function to perform + /// The result of the function + public T WithLock(IChunk chunk, Func func) + { + using var lockObj = AcquireLock(chunk); + return func(); + } + + /// + /// Runs an async action with a lock on the chunk. + /// + /// The chunk to lock + /// The async action to perform + /// Cancellation token + public async Task WithLockAsync(IChunk chunk, Func action, CancellationToken cancellationToken = default) + { + using var lockObj = await LockAsync(chunk, cancellationToken).ConfigureAwait(false); + await action().ConfigureAwait(false); + } + + /// + /// Runs an async function with a lock on the chunk and returns the result. + /// + /// The type of result + /// The chunk to lock + /// The async function to perform + /// Cancellation token + /// The result of the function + public async Task WithLockAsync(IChunk chunk, Func> func, CancellationToken cancellationToken = default) + { + using var lockObj = await LockAsync(chunk, cancellationToken).ConfigureAwait(false); + return await func().ConfigureAwait(false); + } + + /// + /// Runs an action on multiple chunks with proper locking to avoid deadlocks. + /// + /// The chunks to lock, in order + /// The action to perform + public void WithMultiLock(IChunk[] chunks, Action action) + { + // Sort chunks by ID to prevent deadlocks + var sortedChunks = SortChunksById(chunks); + + // Acquire locks in order + var locks = new IDisposable[sortedChunks.Length]; + try + { + for (int i = 0; i < sortedChunks.Length; i++) + { + locks[i] = AcquireLock(sortedChunks[i]); + } + + // Execute the action + action(); + } + finally + { + // Release locks in reverse order + for (int i = locks.Length - 1; i >= 0; i--) + { + locks[i]?.Dispose(); + } + } + } + + /// + /// Runs an async action on multiple chunks with proper locking to avoid deadlocks. + /// + /// The chunks to lock, in order + /// The async action to perform + /// Cancellation token + public async Task WithMultiLockAsync(IChunk[] chunks, Func action, CancellationToken cancellationToken = default) + { + // Sort chunks by ID to prevent deadlocks + var sortedChunks = SortChunksById(chunks); + + // Acquire locks in order + var locks = new IDisposable[sortedChunks.Length]; + try + { + for (int i = 0; i < sortedChunks.Length; i++) + { + locks[i] = await LockAsync(sortedChunks[i], cancellationToken).ConfigureAwait(false); + } + + // Execute the action + await action().ConfigureAwait(false); + } + finally + { + // Release locks in reverse order + for (int i = locks.Length - 1; i >= 0; i--) + { + locks[i]?.Dispose(); + } + } + } + + /// + /// Sorts chunks by ID to prevent deadlocks when acquiring multiple locks. + /// + private IChunk[] SortChunksById(IChunk[] chunks) + { + // Use System.Linq for OrderBy + return chunks.OrderBy(c => c.GetHashCode()).ToArray(); + } + + /// + /// Disposes all resources. + /// + public void Dispose() + { + _shutdownCts.Cancel(); + + // Shutdown operation queue + _operationQueue.ShutdownAsync().GetAwaiter().GetResult(); + + // Dispose thread safety managers + (_threadSafety as IDisposable)?.Dispose(); + (_asyncLock as IDisposable)?.Dispose(); + + _shutdownCts.Dispose(); + } + + /// + /// Resets the singleton instance (for testing). + /// + internal static void ResetInstance() + { + lock (_instanceLock) + { + _instance?.Dispose(); + _instance = null; + } + } + } +} \ No newline at end of file diff --git a/src/AdvChkSys/Threading/ChunkThreadingPerformanceMonitor.cs b/src/AdvChkSys/Threading/ChunkThreadingPerformanceMonitor.cs new file mode 100644 index 0000000..e4edbb5 --- /dev/null +++ b/src/AdvChkSys/Threading/ChunkThreadingPerformanceMonitor.cs @@ -0,0 +1,257 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace AdvChkSys.Threading +{ + /// + /// Monitors and reports on threading performance in the chunk system. + /// + public class ChunkThreadingPerformanceMonitor : IDisposable + { + // Performance metrics + private readonly ConcurrentDictionary _metrics = new(); + + // Sampling timer + private readonly Timer _samplingTimer; + + // Sampling interval + private readonly TimeSpan _samplingInterval; + + // Maximum history to keep + private readonly int _maxHistory; + + // Whether monitoring is enabled + private bool _isEnabled; + + /// + /// Initializes a new instance of the ChunkThreadingPerformanceMonitor class. + /// + /// Sampling interval in milliseconds + /// Maximum number of samples to keep + public ChunkThreadingPerformanceMonitor(int samplingIntervalMs = 1000, int maxHistory = 60) + { + _samplingInterval = TimeSpan.FromMilliseconds(samplingIntervalMs); + _maxHistory = maxHistory; + _samplingTimer = new Timer(SamplePerformance, null, Timeout.Infinite, Timeout.Infinite); + } + + /// + /// Starts monitoring performance. + /// + public void Start() + { + if (!_isEnabled) + { + _isEnabled = true; + _samplingTimer.Change(TimeSpan.Zero, _samplingInterval); + } + } + + /// + /// Stops monitoring performance. + /// + public void Stop() + { + if (_isEnabled) + { + _isEnabled = false; + _samplingTimer.Change(Timeout.Infinite, Timeout.Infinite); + } + } + + /// + /// Samples performance metrics. + /// + private void SamplePerformance(object? state) + { + if (!_isEnabled) + return; + + // Sample thread pool information + ThreadPool.GetAvailableThreads(out int workerThreads, out int completionPortThreads); + ThreadPool.GetMaxThreads(out int maxWorkerThreads, out int maxCompletionPortThreads); + + // Calculate thread pool usage + double workerThreadUsage = 1.0 - ((double)workerThreads / maxWorkerThreads); + double ioThreadUsage = 1.0 - ((double)completionPortThreads / maxCompletionPortThreads); + + // Update metrics + UpdateMetric("ThreadPool.WorkerThreadUsage", workerThreadUsage); + UpdateMetric("ThreadPool.IOThreadUsage", ioThreadUsage); + + // Sample operation queue information + var operationQueue = ChunkThreadingManager.Instance.OperationQueue; + UpdateMetric("OperationQueue.PendingOperations", operationQueue.PendingOperationCount); + UpdateMetric("OperationQueue.ActiveOperations", operationQueue.ActiveOperationCount); + + // Sample diagnostics information + UpdateMetric("Diagnostics.ActiveOperations", ChunkThreadingDiagnostics.GetActiveOperationCount()); + UpdateMetric("Diagnostics.LockContentions", ChunkThreadingDiagnostics.GetTotalLockContentionCount()); + + // Sample process information + var process = Process.GetCurrentProcess(); + // Fix: Use process.TotalProcessorTime and calculate uptime manually + TimeSpan upTime = DateTime.Now - process.StartTime; + UpdateMetric("Process.CPU", process.TotalProcessorTime.TotalMilliseconds / Environment.ProcessorCount / upTime.TotalMilliseconds); + UpdateMetric("Process.Memory", process.WorkingSet64 / 1024.0 / 1024.0); // MB + UpdateMetric("Process.Threads", process.Threads.Count); + } + + /// + /// Updates a performance metric. + /// + private void UpdateMetric(string name, double value) + { + if (!_metrics.TryGetValue(name, out var metric)) + { + metric = new PerformanceMetric(_maxHistory); + _metrics[name] = metric; + } + + metric.AddSample(value); + } + + /// + /// Gets a performance report. + /// + public Dictionary GetPerformanceReport() + { + var report = new Dictionary(); + + foreach (var kvp in _metrics) + { + report[kvp.Key] = kvp.Value.GetReport(); + } + + return report; + } + + /// + /// Gets a formatted performance report. + /// + public string GetFormattedReport() + { + var report = new System.Text.StringBuilder(); + + report.AppendLine("=== Chunk Threading Performance Report ==="); + report.AppendLine($"Generated: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss} UTC"); + report.AppendLine(); + + var metrics = _metrics.OrderBy(m => m.Key).ToList(); + foreach (var kvp in metrics) + { + var metricReport = kvp.Value.GetReport(); + report.AppendLine($"{kvp.Key}:"); + report.AppendLine($" Current: {metricReport.Current:F3}"); + report.AppendLine($" Average: {metricReport.Average:F3}"); + report.AppendLine($" Min: {metricReport.Min:F3}"); + report.AppendLine($" Max: {metricReport.Max:F3}"); + report.AppendLine(); + } + + return report.ToString(); + } + + /// + /// Disposes resources. + /// + public void Dispose() + { + Stop(); + _samplingTimer.Dispose(); + } + + /// + /// Represents a performance metric with history. + /// + private class PerformanceMetric + { + private readonly Queue _samples; + private readonly int _maxSamples; + + public PerformanceMetric(int maxSamples) + { + _maxSamples = maxSamples; + _samples = new Queue(maxSamples); + } + + public void AddSample(double value) + { + lock (_samples) + { + _samples.Enqueue(value); + + while (_samples.Count > _maxSamples) + { + _samples.Dequeue(); + } + } + } + + public PerformanceReport GetReport() + { + lock (_samples) + { + if (_samples.Count == 0) + { + return new PerformanceReport(0, 0, 0, 0, Array.Empty()); + } + + var samplesArray = _samples.ToArray(); + return new PerformanceReport( + samplesArray.Last(), + samplesArray.Average(), + samplesArray.Min(), + samplesArray.Max(), + samplesArray + ); + } + } + } + + /// + /// Represents a performance report. + /// + public class PerformanceReport + { + /// + /// The current value. + /// + public double Current { get; } + + /// + /// The average value. + /// + public double Average { get; } + + /// + /// The minimum value. + /// + public double Min { get; } + + /// + /// The maximum value. + /// + public double Max { get; } + + /// + /// The history of values. + /// + public double[] History { get; } + + public PerformanceReport(double current, double average, double min, double max, double[] history) + { + Current = current; + Average = average; + Min = min; + Max = max; + History = history; + } + } + } +} \ No newline at end of file diff --git a/src/AdvChkSys/Threading/LimitedConcurrencyTaskScheduler.cs b/src/AdvChkSys/Threading/LimitedConcurrencyTaskScheduler.cs new file mode 100644 index 0000000..3e375f3 --- /dev/null +++ b/src/AdvChkSys/Threading/LimitedConcurrencyTaskScheduler.cs @@ -0,0 +1,170 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace AdvChkSys.Threading +{ + /// + /// Provides a task scheduler that ensures a maximum concurrency level while + /// running on top of the ThreadPool. + /// + internal class LimitedConcurrencyTaskScheduler : TaskScheduler + { + // Indicates whether the current thread is processing work items. + [ThreadStatic] + private static bool _currentThreadIsProcessingItems; + + // The list of tasks to be executed + private readonly LinkedList _tasks = new LinkedList(); + + // The maximum concurrency level allowed by this scheduler. + private int _maximumConcurrencyLevel; + + // Indicates whether the scheduler is currently processing work items. + private int _delegatesQueuedOrRunning; + + /// + /// Creates a new instance with the specified degree of parallelism. + /// + /// The maximum degree of parallelism + public LimitedConcurrencyTaskScheduler(int maximumConcurrencyLevel) + { + if (maximumConcurrencyLevel < 1) + throw new ArgumentOutOfRangeException(nameof(maximumConcurrencyLevel)); + _maximumConcurrencyLevel = maximumConcurrencyLevel; + } + + /// + /// Gets the maximum concurrency level supported by this scheduler. + /// + public override int MaximumConcurrencyLevel => _maximumConcurrencyLevel; + + /// + /// Sets the maximum concurrency level for this scheduler. + /// + /// The new maximum concurrency level + public void SetMaximumConcurrencyLevel(int value) + { + if (value < 1) + throw new ArgumentOutOfRangeException(nameof(value)); + _maximumConcurrencyLevel = value; + } + + /// + /// Queues a task to the scheduler. + /// + protected sealed override void QueueTask(Task task) + { + // Add the task to the list of tasks to be processed. + lock (_tasks) + { + _tasks.AddLast(task); + } + + // If there aren't enough delegates currently queued or running to process + // tasks, schedule another. + if (Interlocked.Increment(ref _delegatesQueuedOrRunning) <= _maximumConcurrencyLevel) + { + ThreadPool.QueueUserWorkItem(ProcessQueuedTasks); + } + else + { + Interlocked.Decrement(ref _delegatesQueuedOrRunning); + } + } + + /// + /// Attempts to execute the specified task on the current thread. + /// + protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) + { + // If this thread isn't already processing a task, we don't support inlining + if (!_currentThreadIsProcessingItems) + return false; + + // If the task was previously queued, remove it from the queue + if (taskWasPreviouslyQueued) + { + // Try to remove the task from the queue. + lock (_tasks) + { + if (_tasks.Contains(task)) + { + _tasks.Remove(task); + } + else + { + // The task isn't in the queue anymore, so it was probably already executed + return false; + } + } + } + + // Try to execute the task. + bool result = TryExecuteTask(task); + return result; + } + + /// + /// Attempts to remove a previously scheduled task from the scheduler. + /// + protected sealed override bool TryDequeue(Task task) + { + lock (_tasks) + { + return _tasks.Remove(task); + } + } + + /// + /// Gets an enumerable of the tasks currently scheduled on this scheduler. + /// + protected sealed override IEnumerable GetScheduledTasks() + { + lock (_tasks) + { + return new List(_tasks); + } + } + + /// + /// Processes tasks in the queue. + /// + private void ProcessQueuedTasks(object? state) + { + // This thread is now processing work items. + _currentThreadIsProcessingItems = true; + try + { + // Process all available items in the queue. + while (true) + { + Task? task = null; + lock (_tasks) + { + // When there are no more items to be processed, + // note that we're done processing, and get out. + if (_tasks.Count == 0) + { + Interlocked.Decrement(ref _delegatesQueuedOrRunning); + break; + } + + // Get the next item from the queue + task = _tasks.First!.Value; + _tasks.RemoveFirst(); + } + + // Execute the task we pulled out of the queue + TryExecuteTask(task); + } + } + finally + { + // We're done processing items on the current thread + _currentThreadIsProcessingItems = false; + } + } + } +} \ No newline at end of file