completed thread sync [feature:thread_sync]

completed spatial queries [feature:spatial_queries]

added spatial interfaces [feature:spatial_interfaces]

added parallel processing [new-feature:parallel_processing]

[changelog: Implemented Threading Explicit synchronization and concurrent collections]
[changelog: Implemented Spatial indexing and region queries with 2D/3D support and quadtree optimization]
[changelog: Added spatial interfaces Interface-level spatial query methods with support for custom filters]
[changelog: Added parallel processing of chunks based on spatial queries and regions]
This commit is contained in:
Stan44 2025-05-11 20:15:37 -05:00
parent 09ee0d89df
commit d5c8fd1bf8
23 changed files with 5716 additions and 65 deletions

115
build.bat
View File

@ -4,17 +4,6 @@ setlocal enabledelayedexpansion
:: AdvChkSys Build Script
:: ======================
:: Set colors for console output
set "RESET=[0m"
set "BRIGHT=[1m"
set "RED=[91m"
set "GREEN=[92m"
set "YELLOW=[93m"
set "BLUE=[94m"
set "MAGENTA=[95m"
set "CYAN=[96m"
set "WHITE=[97m"
:: Set build parameters
set "PROJECT_DIR=src\AdvChkSys"
set "BENCHMARKS_DIR=src\AdvChkSys.Benchmarks"
@ -25,22 +14,22 @@ set "VERSION=0.1.8"
if not exist "%OUTPUT_DIR%" mkdir "%OUTPUT_DIR%"
:: Display header
echo %BRIGHT%%CYAN%╔══════════════════════════════════════════════════════════════╗%RESET%
echo %BRIGHT%%CYAN%║ ADVANCED CHUNK SYSTEM BUILD ║%RESET%
echo %BRIGHT%%CYAN%╚══════════════════════════════════════════════════════════════╝%RESET%
echo ╔══════════════════════════════════════════════════════════════╗
echo ║ ADVANCED CHUNK SYSTEM BUILD ║
echo ╚══════════════════════════════════════════════════════════════╝
echo.
echo %BRIGHT%%WHITE%Version: %VERSION%%RESET%
echo %BRIGHT%%WHITE%Date: %DATE% %TIME%%RESET%
echo Version: %VERSION%
echo Date: %DATE% %TIME%
echo.
:: Check for .NET SDK
echo %YELLOW%Checking for .NET SDK...%RESET%
echo Checking for .NET SDK...
dotnet --version > nul 2>&1
if %ERRORLEVEL% neq 0 (
echo %RED%Error: .NET SDK not found. Please install the .NET SDK.%RESET%
echo Error: .NET SDK not found. Please install the .NET SDK.
exit /b 1
)
echo %GREEN%✓ .NET SDK found%RESET%
echo ✓ .NET SDK found
echo.
:: Parse command line arguments
@ -81,40 +70,40 @@ goto parse_args
:end_parse_args
:: Display build configuration
echo %BRIGHT%%MAGENTA%Build Configuration:%RESET%
echo %MAGENTA%• Configuration: %CONFIG%%RESET%
echo %MAGENTA%• Windows: %BUILD_WINDOWS%%RESET%
echo %MAGENTA%• Linux: %BUILD_LINUX%%RESET%
echo %MAGENTA%• macOS: %BUILD_MAC%%RESET%
echo %MAGENTA%• Benchmarks: %BUILD_BENCHMARKS%%RESET%
echo %MAGENTA%• NuGet Package: %BUILD_NUGET%%RESET%
echo %MAGENTA%• Documentation: %BUILD_DOCS%%RESET%
echo Build Configuration:
echo • Configuration: %CONFIG%
echo • Windows: %BUILD_WINDOWS%
echo • Linux: %BUILD_LINUX%
echo • macOS: %BUILD_MAC%
echo • Benchmarks: %BUILD_BENCHMARKS%
echo • NuGet Package: %BUILD_NUGET%
echo • Documentation: %BUILD_DOCS%
echo.
:: Clean previous builds
echo %YELLOW%Cleaning previous builds...%RESET%
echo Cleaning previous builds...
if exist "%PROJECT_DIR%\bin" rd /s /q "%PROJECT_DIR%\bin"
if exist "%PROJECT_DIR%\obj" rd /s /q "%PROJECT_DIR%\obj"
if exist "%BENCHMARKS_DIR%\bin" rd /s /q "%BENCHMARKS_DIR%\bin"
if exist "%BENCHMARKS_DIR%\obj" rd /s /q "%BENCHMARKS_DIR%\obj"
echo %GREEN%✓ Clean completed%RESET%
echo ✓ Clean completed
echo.
:: Restore packages
echo %YELLOW%Restoring packages...%RESET%
echo Restoring packages...
dotnet restore "%PROJECT_DIR%\AdvChkSys.csproj"
if %ERRORLEVEL% neq 0 (
echo %RED%✗ Package restore failed%RESET%
echo ✗ Package restore failed
exit /b 1
)
echo %GREEN%✓ Packages restored%RESET%
echo ✓ Packages restored
echo.
:: Build the library (platform-agnostic)
echo %BRIGHT%%BLUE%Building AdvChkSys library...%RESET%
echo Building AdvChkSys library...
dotnet build "%PROJECT_DIR%\AdvChkSys.csproj" -c %CONFIG% --no-restore
if %ERRORLEVEL% neq 0 (
echo %RED%✗ Library build failed%RESET%
echo ✗ Library build failed
exit /b 1
)
@ -128,27 +117,27 @@ if "%BUILD_WINDOWS%"=="true" (
if not exist "%OUTPUT_DIR%\windows" mkdir "%OUTPUT_DIR%\windows"
copy "%PROJECT_DIR%\bin\%CONFIG%\netstandard2.1\AdvChkSys.dll" "%OUTPUT_DIR%\windows\"
copy "%PROJECT_DIR%\bin\%CONFIG%\netstandard2.1\AdvChkSys.xml" "%OUTPUT_DIR%\windows\"
echo %GREEN%✓ Windows build completed%RESET%
echo ✓ Windows build completed
)
if "%BUILD_LINUX%"=="true" (
if not exist "%OUTPUT_DIR%\linux" mkdir "%OUTPUT_DIR%\linux"
copy "%PROJECT_DIR%\bin\%CONFIG%\netstandard2.1\AdvChkSys.dll" "%OUTPUT_DIR%\linux\"
copy "%PROJECT_DIR%\bin\%CONFIG%\netstandard2.1\AdvChkSys.xml" "%OUTPUT_DIR%\linux\"
echo %GREEN%✓ Linux build completed%RESET%
echo ✓ Linux build completed
)
if "%BUILD_MAC%"=="true" (
if not exist "%OUTPUT_DIR%\macos" mkdir "%OUTPUT_DIR%\macos"
copy "%PROJECT_DIR%\bin\%CONFIG%\netstandard2.1\AdvChkSys.dll" "%OUTPUT_DIR%\macos\"
copy "%PROJECT_DIR%\bin\%CONFIG%\netstandard2.1\AdvChkSys.xml" "%OUTPUT_DIR%\macos\"
echo %GREEN%✓ macOS build completed%RESET%
echo ✓ macOS build completed
)
echo.
:: Build benchmarks
if "%BUILD_BENCHMARKS%"=="true" (
echo %BRIGHT%%BLUE%Building ChunkMark benchmarks...%RESET%
echo Building ChunkMark benchmarks...
:: Restore benchmark packages
dotnet restore "%BENCHMARKS_DIR%\AdvChkSys.Benchmarks.csproj"
@ -157,11 +146,11 @@ if "%BUILD_BENCHMARKS%"=="true" (
if "%BUILD_WINDOWS%"=="true" (
dotnet publish "%BENCHMARKS_DIR%\AdvChkSys.Benchmarks.csproj" -c %CONFIG% -r win-x64 --self-contained true -p:PublishSingleFile=true -p:PublishTrimmed=true
if %ERRORLEVEL% neq 0 (
echo %RED%✗ Windows benchmark build failed%RESET%
echo ✗ Windows benchmark build failed
) else (
if not exist "%OUTPUT_DIR%\benchmarks\windows" mkdir "%OUTPUT_DIR%\benchmarks\windows"
copy "%BENCHMARKS_DIR%\bin\%CONFIG%\net6.0\win-x64\publish\ChunkMark.exe" "%OUTPUT_DIR%\benchmarks\windows\"
echo %GREEN%✓ Windows benchmarks built%RESET%
echo ✓ Windows benchmarks built
)
)
@ -169,11 +158,11 @@ if "%BUILD_BENCHMARKS%"=="true" (
if "%BUILD_LINUX%"=="true" (
dotnet publish "%BENCHMARKS_DIR%\AdvChkSys.Benchmarks.csproj" -c %CONFIG% -r linux-x64 --self-contained true -p:PublishSingleFile=true -p:PublishTrimmed=true
if %ERRORLEVEL% neq 0 (
echo %RED%✗ Linux benchmark build failed%RESET%
echo ✗ Linux benchmark build failed
) else (
if not exist "%OUTPUT_DIR%\benchmarks\linux" mkdir "%OUTPUT_DIR%\benchmarks\linux"
copy "%BENCHMARKS_DIR%\bin\%CONFIG%\net6.0\linux-x64\publish\ChunkMark" "%OUTPUT_DIR%\benchmarks\linux\"
echo %GREEN%✓ Linux benchmarks built%RESET%
echo ✓ Linux benchmarks built
)
)
@ -181,11 +170,11 @@ if "%BUILD_BENCHMARKS%"=="true" (
if "%BUILD_MAC%"=="true" (
dotnet publish "%BENCHMARKS_DIR%\AdvChkSys.Benchmarks.csproj" -c %CONFIG% -r osx-x64 --self-contained true -p:PublishSingleFile=true -p:PublishTrimmed=true
if %ERRORLEVEL% neq 0 (
echo %RED%✗ macOS benchmark build failed%RESET%
echo ✗ macOS benchmark build failed
) else (
if not exist "%OUTPUT_DIR%\benchmarks\macos" mkdir "%OUTPUT_DIR%\benchmarks\macos"
copy "%BENCHMARKS_DIR%\bin\%CONFIG%\net6.0\osx-x64\publish\ChunkMark" "%OUTPUT_DIR%\benchmarks\macos\"
echo %GREEN%✓ macOS benchmarks built%RESET%
echo ✓ macOS benchmarks built
)
)
echo.
@ -193,56 +182,56 @@ if "%BUILD_BENCHMARKS%"=="true" (
:: Build NuGet package
if "%BUILD_NUGET%"=="true" (
echo %BRIGHT%%BLUE%Building NuGet package...%RESET%
echo Building NuGet package...
dotnet pack "%PROJECT_DIR%\AdvChkSys.csproj" -c %CONFIG% --no-build --output "%OUTPUT_DIR%\nuget"
if %ERRORLEVEL% neq 0 (
echo %RED%✗ NuGet package creation failed%RESET%
echo ✗ NuGet package creation failed
exit /b 1
)
echo %GREEN%✓ NuGet package created%RESET%
echo ✓ NuGet package created
echo.
)
:: Build documentation
if "%BUILD_DOCS%"=="true" (
echo %BRIGHT%%BLUE%Building documentation...%RESET%
echo %YELLOW%Documentation generation not implemented yet.%RESET%
echo Building documentation...
echo Documentation generation not implemented yet.
echo.
)
:: Display summary
echo %BRIGHT%%CYAN%╔══════════════════════════════════════════════════════════════╗%RESET%
echo %BRIGHT%%CYAN%║ BUILD SUMMARY ║%RESET%
echo %BRIGHT%%CYAN%╚══════════════════════════════════════════════════════════════╝%RESET%
echo ╔══════════════════════════════════════════════════════════════╗
echo ║ BUILD SUMMARY ║
echo ╚══════════════════════════════════════════════════════════════╝
echo.
echo %BRIGHT%%GREEN%Build completed successfully!%RESET%
echo Build completed successfully!
echo.
echo %BRIGHT%%WHITE%Output files:%RESET%
echo %WHITE%• Library: %OUTPUT_DIR%\lib\AdvChkSys.dll%RESET%
echo Output files:
echo • Library: %OUTPUT_DIR%\lib\AdvChkSys.dll
if "%BUILD_WINDOWS%"=="true" (
echo %WHITE%• Windows: %OUTPUT_DIR%\windows\AdvChkSys.dll%RESET%
echo • Windows: %OUTPUT_DIR%\windows\AdvChkSys.dll
if "%BUILD_BENCHMARKS%"=="true" (
echo %WHITE%• Windows Benchmark: %OUTPUT_DIR%\benchmarks\windows\ChunkMark.exe%RESET%
echo • Windows Benchmark: %OUTPUT_DIR%\benchmarks\windows\ChunkMark.exe
)
)
if "%BUILD_LINUX%"=="true" (
echo %WHITE%• Linux: %OUTPUT_DIR%\linux\AdvChkSys.dll%RESET%
echo • Linux: %OUTPUT_DIR%\linux\AdvChkSys.dll
if "%BUILD_BENCHMARKS%"=="true" (
echo %WHITE%• Linux Benchmark: %OUTPUT_DIR%\benchmarks\linux\ChunkMark%RESET%
echo • Linux Benchmark: %OUTPUT_DIR%\benchmarks\linux\ChunkMark
)
)
if "%BUILD_MAC%"=="true" (
echo %WHITE%• macOS: %OUTPUT_DIR%\macos\AdvChkSys.dll%RESET%
echo • macOS: %OUTPUT_DIR%\macos\AdvChkSys.dll
if "%BUILD_BENCHMARKS%"=="true" (
echo %WHITE%• macOS Benchmark: %OUTPUT_DIR%\benchmarks\macos\ChunkMark%RESET%
echo • macOS Benchmark: %OUTPUT_DIR%\benchmarks\macos\ChunkMark
)
)
if "%BUILD_NUGET%"=="true" (
echo %WHITE%• NuGet: %OUTPUT_DIR%\nuget\AdvChkSys.%VERSION%.nupkg%RESET%
echo • NuGet: %OUTPUT_DIR%\nuget\AdvChkSys.%VERSION%.nupkg
)
echo.
echo %BRIGHT%%CYAN%Run the build script with --help for more options.%RESET%
echo Run the build script with --help for more options.
echo.
endlocal

122
docs/features.md Normal file
View File

@ -0,0 +1,122 @@
# Feature Tracking
## chunk_management
- Status: completed
- Description: 2D/3D chunk management with memory efficiency
- Last Update: 2025-05-10
## memory_efficiency
- Status: completed
- Description: LRU caching, array pooling, and air-singleton patterns
- Last Update: 2025-05-10
## resource_tracking
- Status: completed
- Description: Track and manage chunk resources
- Last Update: 2025-05-10
## async_tasks
- Status: completed
- Description: Asynchronous chunk loading and processing
- Last Update: 2025-05-10
## serialization
- Status: completed
- Description: Chunk serialization and deserialization
- Last Update: 2025-05-10
## event_system
- Status: completed
- Description: Events for load/unload/save operations
- Last Update: 2025-05-10
## interop
- Status: completed
- Description: Python and .NET interoperability
- Last Update: 2025-05-10
## spatial_queries
- Status: completed
- Description: Spatial indexing and region queries with 2D/3D support and quadtree optimization
- Last Update: 2025-11-08
## thread_sync
- Status: completed
- Description: Explicit synchronization and concurrent collections
- Last Update: 2025-05-11
## spatial_interfaces
- Status: completed
- Description: Interface-level spatial query methods with support for custom filters
- Last Update: 2025-05-11
## parallel_processing
- Status: completed
- Description: Parallel processing of chunks based on spatial queries and regions
- Last Update: 2025-05-11
## priority_loading
- Status: in_progress
- Description: Prioritized chunk loading system (partially implemented)
- Last Update: 2025-05-10
## dependency_tracking
- Status: in_progress
- Description: Dependency-aware disposal logic
- Last Update: 2025-05-10
## dispose_pattern
- Status: in_progress
- Description: Full dispose pattern with finalizers for unmanaged resources
- Last Update: 2025-05-10
## exception_handling
- Status: in_progress
- Description: Better async exception handling with specific types and logging
- Last Update: 2025-05-10
## performance_metrics
- Status: in_progress
- Description: Track load times, cache hit rates, and memory usage (partially implemented)
- Last Update: 2025-05-10
## progress_tracking
- Status: in_progress
- Description: Track progress and auto-update status documents (mostly implemented)
- Last Update: 2025-05-10
## git_integration
- Status: in_progress
- Description: Parse Git logs for status updates (partially implemented)
- Last Update: 2025-05-10
## feature_tracking
- Status: in_progress
- Description: Track feature statuses (partially implemented)
- Last Update: 2025-05-10
## doc_generation
- Status: in_progress
- Description: Auto-generate status docs and changelog (partially implemented)
- Last Update: 2025-05-10
## todo_management
- Status: in_progress
- Description: Automate updates based on code reviews and commits
- Last Update: 2025-05-10
## dependency_interfaces
- Status: planned
- Description: Interface-level dependency methods
- Last Update: 2025-05-10
## runtime_config
- Status: planned
- Description: Runtime-adjustable configuration options
- Last Update: 2025-05-10
## known_issues
- Status: in_progress
- Description: Edge chunk unload delay under high concurrency
- Last Update: 2025-05-10

View File

@ -0,0 +1,35 @@
# AdvChkSys Development Status
Last updated: 2025-05-11
## Feature Status
| Feature | Status | Description | Last Update |
|---------|--------|-------------|-------------|
| Chunk Management | [COMPLETED] | 2D/3D chunk management with memory efficiency | 2025-05-10 |
| Memory Efficiency | [COMPLETED] | LRU caching, array pooling, and air-singleton patterns | 2025-05-10 |
| Resource Tracking | [COMPLETED] | Track and manage chunk resources | 2025-05-10 |
| Async Tasks | [COMPLETED] | Asynchronous chunk loading and processing | 2025-05-10 |
| Serialization | [COMPLETED] | Chunk serialization and deserialization | 2025-05-10 |
| Event System | [COMPLETED] | Events for load/unload/save operations | 2025-05-10 |
| Interop | [COMPLETED] | Python and .NET interoperability | 2025-05-10 |
| Spatial Queries | [COMPLETED] | Spatial indexing and region queries with 2D/3D support and quadtree optimization | 2025-11-08 |
| Thread Sync | [COMPLETED] | Explicit synchronization and concurrent collections | 2025-05-11 |
| Spatial Interfaces | [COMPLETED] | Interface-level spatial query methods with support for custom filters | 2025-05-11 |
| Parallel Processing | [COMPLETED] | Parallel processing of chunks based on spatial queries and regions | 2025-05-11 |
| Priority Loading | [IN PROGRESS] | Prioritized chunk loading system (partially implemented) | 2025-05-10 |
| Dependency Tracking | [IN PROGRESS] | Dependency-aware disposal logic | 2025-05-10 |
| Dispose Pattern | [IN PROGRESS] | Full dispose pattern with finalizers for unmanaged resources | 2025-05-10 |
| Exception Handling | [IN PROGRESS] | Better async exception handling with specific types and logging | 2025-05-10 |
| Performance Metrics | [IN PROGRESS] | Track load times, cache hit rates, and memory usage (partially implemented) | 2025-05-10 |
| Progress Tracking | [IN PROGRESS] | Track progress and auto-update status documents (mostly implemented) | 2025-05-10 |
| Git Integration | [IN PROGRESS] | Parse Git logs for status updates (partially implemented) | 2025-05-10 |
| Feature Tracking | [IN PROGRESS] | Track feature statuses (partially implemented) | 2025-05-10 |
| Doc Generation | [IN PROGRESS] | Auto-generate status docs and changelog (partially implemented) | 2025-05-10 |
| Todo Management | [IN PROGRESS] | Automate updates based on code reviews and commits | 2025-05-10 |
| Dependency Interfaces | [PLANNED] | Interface-level dependency methods | 2025-05-10 |
| Runtime Config | [PLANNED] | Runtime-adjustable configuration options | 2025-05-10 |
| Known Issues | [IN PROGRESS] | Edge chunk unload delay under high concurrency | 2025-05-10 |
## Recent Updates

81
scripts/README.md Normal file
View File

@ -0,0 +1,81 @@
# AdvChkSys Development Scripts
## Track Progress Script
The `track_progress.py` script automates feature tracking, status updates, and changelog management for the AdvChkSys project.
### Overview
This script:
- Tracks features and their status in a human-readable Markdown file
- Generates a status document with current development progress
- Updates the changelog with tagged entries from commit messages
- Automatically detects new features from specially formatted commit messages
### Usage
Run the script periodically to update documentation:
```bash
python scripts/track_progress.py
```
### Commit Message Tags
The script recognizes special tags in commit messages:
#### 1. Feature Status Updates
```
git commit -m "Implemented spatial queries [feature:spatial_queries]"
```
This marks the `spatial_queries` feature as completed and records the date and author.
#### 2. New Features
```
git commit -m "Initial setup [new-feature:custom_serialization:Support for custom serialization formats]"
```
This adds a new feature called `custom_serialization` with the provided description and sets its status to "planned".
#### 3. Changelog Entries
```
git commit -m "Fixed bug in chunk loading [changelog:Fixed race condition in async chunk loading]"
```
This adds an entry to the changelog with the date, message, and commit hash.
#### 4. Status Updates
```
git commit -m "Completed milestone 1 [status:milestone1]"
```
This records a general status update.
### Output Files
The script generates and updates the following files:
- `docs/features.md`: Human-readable feature tracking in Markdown format
- `docs/status/ChunkManager-Status.md`: Status document with feature progress and recent updates
- `CHANGELOG.md`: Project changelog with tagged entries from commits
### Feature Statuses
Features can have the following statuses:
- `planned`: Feature is planned but not yet started
- `in_progress`: Feature is currently being implemented
- `completed`: Feature has been implemented
### Manual Editing
You can manually edit the `docs/features.md` file to update feature descriptions, statuses, or add new features. The script will preserve your changes when it runs.
### Requirements
- Python 3.6+
- Git repository

224
scripts/track_progress.py Normal file
View File

@ -0,0 +1,224 @@
import os
import re
import sys
from datetime import datetime
from subprocess import check_output
# Try to set UTF-8 encoding for better emoji support
sys.stderr = open(
sys.stderr.fileno(), mode="w", encoding="utf-8", errors="replace"
) # Configuration
PROGRESS_FILE = "docs/progress.json"
STATUS_DOC = "docs/status/ChunkManager-Status.md"
CHANGELOG_FILE = "CHANGELOG.md"
FEATURES_FILE = "docs/features.md"
GIT_LOG_LIMIT = 50 # number of commits to parse
# Regular expression to match tags in commit messages
STATUS_TAG_RE = re.compile(r"\[status:(\w+)\]")
CHANGELOG_TAG_RE = re.compile(r"\[changelog:(.+?)\]")
FEATURE_TAG_RE = re.compile(r"\[feature:(\w+)\]")
NEW_FEATURE_RE = re.compile(r"\[new-feature:(\w+):(.+?)\]")
# Ensure directories exist
os.makedirs(os.path.dirname(STATUS_DOC), exist_ok=True)
os.makedirs(os.path.dirname(FEATURES_FILE), exist_ok=True)
# Load features from Markdown file if it exists
features = {}
if os.path.exists(FEATURES_FILE):
with open(FEATURES_FILE, "r", encoding="utf-8") as f:
content = f.read()
# Parse the markdown file
sections = re.split(r"## (\w+)", content)[1:] # Skip the header
for i in range(0, len(sections), 2):
if i + 1 < len(sections):
feature_key = sections[i]
feature_content = sections[i + 1]
# Extract status, description, and date
status_match = re.search(r"- Status: (\w+)", feature_content)
desc_match = re.search(
r"- Description: (.+?)$", feature_content, re.MULTILINE
)
date_match = re.search(
r"- Last Update: (.+?)$", feature_content, re.MULTILINE
)
status = status_match.group(1) if status_match else "planned"
description = desc_match.group(1) if desc_match else ""
date = date_match.group(1) if date_match else ""
features[feature_key] = {
"status": status,
"description": description,
"date": date,
}
else:
# Default features
features = {
"spatial_queries": {
"status": "planned",
"description": "Methods to efficiently find chunks within regions or distances", # noqa: E501
},
"priority_loading": {
"status": "planned",
"description": "API to specify which chunks should be loaded first", # noqa: E501
},
"serialization_optimization": {
"status": "in_progress",
"description": "Further improvements to chunk saving/loading",
},
"chunk_dependency": {
"status": "planned",
"description": "For cases where chunks need to reference neighbors", # noqa: E501
},
}
# Get recent git log entries
git_log = check_output(
[
"git",
"log",
f"-n{GIT_LOG_LIMIT}",
"--pretty=format:%h|%s|%ad|%an",
"--date=short",
]
).decode()
# Parse and collect updates
status_updates = {}
changelog_entries = []
feature_updates = {}
new_features = {}
for line in git_log.splitlines():
parts = line.split("|")
if len(parts) < 4:
continue
commit_hash, subject, date, author = parts
status_match = STATUS_TAG_RE.search(subject)
changelog_match = CHANGELOG_TAG_RE.search(subject)
feature_match = FEATURE_TAG_RE.search(subject)
new_feature_match = NEW_FEATURE_RE.search(subject)
if status_match:
status_key = status_match.group(1)
status_updates[status_key] = {
"status": "done",
"commit": commit_hash,
"date": date,
"message": subject,
}
if changelog_match:
changelog_entries.append(
f"- {date}: {changelog_match.group(1)} ({commit_hash})"
)
if feature_match:
feature_key = feature_match.group(1)
if feature_key in features:
feature_updates[feature_key] = {
"status": "completed",
"date": date,
"author": author,
}
if new_feature_match:
feature_key = new_feature_match.group(1)
feature_desc = new_feature_match.group(2)
if feature_key not in features:
new_features[feature_key] = {
"status": "planned",
"description": feature_desc,
"date": date,
"author": author,
}
# Update features with new information
for feature_key, update in feature_updates.items():
features[feature_key].update(update)
# Add new features
for feature_key, feature_data in new_features.items():
features[feature_key] = feature_data
# Save features to Markdown file
with open(FEATURES_FILE, "w", encoding="utf-8") as f:
f.write("# Feature Tracking\n\n")
for feature_key, feature_data in features.items():
status = feature_data.get("status", "planned")
description = feature_data.get("description", "")
date = feature_data.get("date", "")
author = feature_data.get("author", "")
f.write(f"## {feature_key}\n")
f.write(f"- Status: {status}\n")
f.write(f"- Description: {description}\n")
f.write(f"- Last Update: {date}\n")
if author:
f.write(f"- Updated By: {author}\n")
f.write("\n")
# Generate status document
with open(STATUS_DOC, "w", encoding="utf-8") as f:
f.write("# AdvChkSys Development Status\n\n")
f.write(f"Last updated: {datetime.now().strftime('%Y-%m-%d')}\n\n")
f.write("## Feature Status\n\n")
f.write("| Feature | Status | Description | Last Update |\n")
f.write("|---------|--------|-------------|-------------|\n")
for feature_key, feature_data in features.items():
status = feature_data.get("status", "unknown")
description = feature_data.get("description", "")
date = feature_data.get("date", "")
# Use text indicators instead of emojis
status_indicator = {
"planned": "[PLANNED]",
"in_progress": "[IN PROGRESS]",
"completed": "[COMPLETED]",
"done": "[DONE]",
}.get(status, "[UNKNOWN]")
f.write(
f"| {feature_key.replace('_', ' ').title()} | {status_indicator} | {description} | {date} |\n" # noqa: E501
)
f.write("\n## Recent Updates\n\n")
for entry in changelog_entries[
:10
]: # Show only the 10 most recent entries
f.write(f"{entry}\n")
# Update changelog if there are new entries
if changelog_entries and os.path.exists(CHANGELOG_FILE):
with open(CHANGELOG_FILE, "r", encoding="utf-8") as f:
existing_changelog = f.read()
with open(CHANGELOG_FILE, "w", encoding="utf-8") as f:
# Add new entries at the top, under the first heading
lines = existing_changelog.splitlines()
insertion_point = next(
(i for i, line in enumerate(lines) if line.startswith("##")), 2
)
updated_changelog = "\n".join(lines[:insertion_point]) + "\n\n"
updated_changelog += "\n".join(changelog_entries) + "\n\n"
updated_changelog += "\n".join(lines[insertion_point:])
f.write(updated_changelog)
elif changelog_entries:
# Create new changelog file
with open(CHANGELOG_FILE, "w", encoding="utf-8") as f:
f.write("# Changelog\n\n")
f.write("\n".join(changelog_entries))
f.write("\n")
print(f"Progress tracking updated. Status document generated at {STATUS_DOC}")
print(f"Features updated at {FEATURES_FILE}")

View File

@ -4,7 +4,12 @@ using AdvChkSys.Manager;
using AdvChkSys.Chunk;
using AdvChkSys.Resources;
using AdvChkSys.Diagnostics;
using AdvChkSys.Threading;
using AdvChkSys.Spatial;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace AdvChkSys
{
@ -17,7 +22,7 @@ namespace AdvChkSys
/// <summary>
/// The current version of the AdvChkSys library.
/// </summary>
public static string Version => "0.1.8";
public static string Version => "0.2.0";
/// <summary>
/// Creates a new WorldConstraints object.
@ -49,6 +54,51 @@ namespace AdvChkSys
public static void LogMemoryUsage(Action<string> logAction) =>
MemoryUsageReporter.LogMemoryUsage(logAction);
/// <summary>
/// Gets the threading manager for advanced threading operations.
/// </summary>
public static ChunkThreadingManager ThreadingManager => ChunkThreadingManager.Instance;
/// <summary>
/// Runs a batch of chunk operations in parallel.
/// </summary>
public static Task RunParallelChunkOperationsAsync(IEnumerable<Action> operations,
int? maxDegreeOfParallelism = null,
CancellationToken cancellationToken = default)
{
return ChunkTaskScheduler.RunBatchParallelAsync(operations, maxDegreeOfParallelism, cancellationToken);
}
/// <summary>
/// Creates a spatial index for 2D chunks.
/// </summary>
public static SpatialChunkIndex<Chunk2D<T>> CreateSpatialIndex2D<T>() =>
new SpatialChunkIndex<Chunk2D<T>>();
/// <summary>
/// Creates a spatial index for 3D chunks.
/// </summary>
public static SpatialChunkIndex<Chunk3D<T>> CreateSpatialIndex3D<T>() =>
new SpatialChunkIndex<Chunk3D<T>>();
/// <summary>
/// Configures the threading system for high throughput.
/// </summary>
public static void ConfigureForHighThroughput() =>
ChunkThreadingConfiguration.ConfigureForHighThroughput();
/// <summary>
/// Configures the threading system for low latency.
/// </summary>
public static void ConfigureForLowLatency() =>
ChunkThreadingConfiguration.ConfigureForLowLatency();
/// <summary>
/// Configures the threading system for memory efficiency.
/// </summary>
public static void ConfigureForMemoryEfficiency() =>
ChunkThreadingConfiguration.ConfigureForMemoryEfficiency();
/// <summary>
/// Performs a basic self-test of the AdvChkSys core functionality.
/// Returns true if all core systems are operational.
@ -84,11 +134,29 @@ namespace AdvChkSys
// Check constraints enforcement
bool constraintsWork = constraints.IsWithinBounds(1, 1) && constraints.IsWithinChunkLimit(2);
// Test spatial indexing
var spatialIndex2D = CreateSpatialIndex2D<byte>();
spatialIndex2D.AddChunk(chunk2D);
var chunksInRegion = spatialIndex2D.FindChunksInRegion(0, 0, 2, 2);
bool spatialIndexWorks = false;
foreach (var c in chunksInRegion)
{
if (c.Equals(chunk2D))
{
spatialIndexWorks = true;
break;
}
}
// Test threading
bool threadingWorks = ThreadingManager != null;
// Clean up
manager2D.UnloadChunk(1, 1);
manager3D.UnloadChunk(1, 1, 1);
return resourceTracking2D && resourceTracking3D && constraintsWork;
return resourceTracking2D && resourceTracking3D && constraintsWork &&
spatialIndexWorks && threadingWorks;
}
}
}

View File

@ -0,0 +1,310 @@
#nullable enable
using System.Collections.Generic;
using AdvChkSys.Interfaces;
namespace AdvChkSys.Dependencies
{
/// <summary>
/// Tracks dependencies between chunks.
/// </summary>
public class ChunkDependencyTracker
{
/// <summary>
/// Defines the type of dependency between chunks.
/// </summary>
public enum DependencyType
{
Neighbor,
Reference,
Update,
Custom
}
// Dictionary to track dependencies: source chunk -> (target chunk, dependency type)
private readonly Dictionary<IChunk, HashSet<(IChunk Target, DependencyType Type)>> _dependencies = new();
// Dictionary to track dependents: target chunk -> (source chunk, dependency type)
private readonly Dictionary<IChunk, HashSet<(IChunk Source, DependencyType Type)>> _dependents = new();
// Lock object for thread safety
private readonly object _lock = new();
/// <summary>
/// Registers a dependency between two chunks.
/// </summary>
public void RegisterDependency(IChunk source, IChunk target, DependencyType type)
{
if (source == null || target == null)
return;
lock (_lock)
{
// Add to dependencies
if (!_dependencies.TryGetValue(source, out var targetSet))
{
targetSet = new HashSet<(IChunk, DependencyType)>();
_dependencies[source] = targetSet;
}
targetSet.Add((target, type));
// Add to dependents
if (!_dependents.TryGetValue(target, out var sourceSet))
{
sourceSet = new HashSet<(IChunk, DependencyType)>();
_dependents[target] = sourceSet;
}
sourceSet.Add((source, type));
}
}
/// <summary>
/// Removes a dependency between two chunks.
/// </summary>
public void RemoveDependency(IChunk source, IChunk target)
{
if (source == null || target == null)
return;
lock (_lock)
{
// Remove from dependencies
if (_dependencies.TryGetValue(source, out var targetSet))
{
targetSet.RemoveWhere(t => t.Target.Equals(target));
if (targetSet.Count == 0)
{
_dependencies.Remove(source);
}
}
// Remove from dependents
if (_dependents.TryGetValue(target, out var sourceSet))
{
sourceSet.RemoveWhere(s => s.Source.Equals(source));
if (sourceSet.Count == 0)
{
_dependents.Remove(target);
}
}
}
}
/// <summary>
/// Gets all chunks that depend on the specified chunk.
/// </summary>
public IEnumerable<IChunk> GetDependents(IChunk chunk)
{
if (chunk == null)
return new List<IChunk>();
lock (_lock)
{
if (!_dependents.TryGetValue(chunk, out var sourceSet))
{
return new List<IChunk>();
}
var result = new List<IChunk>(sourceSet.Count);
foreach (var (source, _) in sourceSet)
{
result.Add(source);
}
return result;
}
}
/// <summary>
/// Gets all chunks that the specified chunk depends on.
/// </summary>
public IEnumerable<IChunk> GetDependencies(IChunk chunk)
{
if (chunk == null)
return new List<IChunk>();
lock (_lock)
{
if (!_dependencies.TryGetValue(chunk, out var targetSet))
{
return new List<IChunk>();
}
var result = new List<IChunk>(targetSet.Count);
foreach (var (target, _) in targetSet)
{
result.Add(target);
}
return result;
}
}
/// <summary>
/// Gets all chunks that depend on the specified chunk with their dependency types.
/// </summary>
public IEnumerable<(IChunk Chunk, DependencyType Type)> GetDependentsWithTypes(IChunk chunk)
{
if (chunk == null)
return new List<(IChunk, DependencyType)>();
lock (_lock)
{
if (!_dependents.TryGetValue(chunk, out var sourceDependents))
{
return new List<(IChunk, DependencyType)>();
}
var result = new List<(IChunk, DependencyType)>(sourceDependents.Count);
foreach (var (source, type) in sourceDependents)
{
result.Add((source, type));
}
return result;
}
}
/// <summary>
/// Gets all chunks that the specified chunk depends on with their dependency types.
/// </summary>
public IEnumerable<(IChunk Chunk, DependencyType Type)> GetDependenciesWithTypes(IChunk chunk)
{
if (chunk == null)
return new List<(IChunk, DependencyType)>();
lock (_lock)
{
if (!_dependencies.TryGetValue(chunk, out var targetSet))
{
return new List<(IChunk, DependencyType)>();
}
var result = new List<(IChunk, DependencyType)>(targetSet.Count);
foreach (var (target, type) in targetSet)
{
result.Add((target, type));
}
return result;
}
}
/// <summary>
/// Checks if a dependency exists between source and target chunks.
/// </summary>
public bool HasDependency(IChunk source, IChunk target)
{
if (source == null || target == null)
return false;
lock (_lock)
{
if (!_dependencies.TryGetValue(source, out var targetSet))
{
return false;
}
foreach (var (t, _) in targetSet)
{
if (t.Equals(target))
{
return true;
}
}
return false;
}
}
/// <summary>
/// Gets the dependency type between source and target chunks, or null if no dependency exists.
/// </summary>
public DependencyType? GetDependencyType(IChunk source, IChunk target)
{
if (source == null || target == null)
return null;
lock (_lock)
{
if (!_dependencies.TryGetValue(source, out var targetSet))
{
return null;
}
foreach (var (t, type) in targetSet)
{
if (t.Equals(target))
{
return type;
}
}
return null;
}
}
/// <summary>
/// Clears all dependencies for a specific chunk.
/// </summary>
public void ClearDependencies(IChunk chunk)
{
if (chunk == null)
return;
lock (_lock)
{
// Remove all dependencies where this chunk is the source
if (_dependencies.TryGetValue(chunk, out var targetSet))
{
foreach (var (target, _) in targetSet)
{
if (_dependents.TryGetValue(target, out var depSources))
{
depSources.RemoveWhere(s => s.Source.Equals(chunk));
if (depSources.Count == 0)
{
_dependents.Remove(target);
}
}
}
_dependencies.Remove(chunk);
}
// Remove all dependencies where this chunk is the target
if (_dependents.TryGetValue(chunk, out var sourceSets))
{
foreach (var (source, _) in sourceSets)
{
if (_dependencies.TryGetValue(source, out var depTargets))
{
depTargets.RemoveWhere(t => t.Target.Equals(chunk));
if (depTargets.Count == 0)
{
_dependencies.Remove(source);
}
}
}
_dependents.Remove(chunk);
}
}
}
/// <summary>
/// Gets all chunks that have any dependencies.
/// </summary>
public IEnumerable<IChunk> GetAllChunksWithDependencies()
{
lock (_lock)
{
return new HashSet<IChunk>(_dependencies.Keys);
}
}
/// <summary>
/// Gets all chunks that have any dependents.
/// </summary>
public IEnumerable<IChunk> GetAllChunksWithDependents()
{
lock (_lock)
{
return new HashSet<IChunk>(_dependents.Keys);
}
}
}
}

View File

@ -0,0 +1,677 @@
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using AdvChkSys.Interfaces;
using AdvChkSys.Threading;
namespace AdvChkSys.Loading
{
/// <summary>
/// Manages prioritized loading of chunks.
/// </summary>
public class ChunkLoadingPriority
{
/// <summary>
/// Priority levels for chunk loading.
/// </summary>
public enum Priority
{
Immediate,
High,
Normal,
Low,
Background
}
/// <summary>
/// Represents a chunk loading request with priority.
/// </summary>
public class ChunkLoadRequest
{
/// <summary>
/// The X coordinate of the chunk.
/// </summary>
public int X { get; }
/// <summary>
/// The Y coordinate of the chunk.
/// </summary>
public int Y { get; }
/// <summary>
/// The Z coordinate of the chunk (optional, for 3D chunks).
/// </summary>
public int Z { get; }
/// <summary>
/// The priority of this loading request.
/// </summary>
public Priority LoadPriority { get; }
/// <summary>
/// The width of the chunk.
/// </summary>
public int Width { get; }
/// <summary>
/// The height of the chunk.
/// </summary>
public int Height { get; }
/// <summary>
/// The depth of the chunk (for 3D chunks).
/// </summary>
public int Depth { get; }
/// <summary>
/// Timestamp when the request was created.
/// </summary>
public DateTime Timestamp { get; }
/// <summary>
/// Unique identifier for this request.
/// </summary>
public Guid RequestId { get; }
/// <summary>
/// Creates a new 2D chunk loading request.
/// </summary>
public ChunkLoadRequest(int x, int y, int width, int height, Priority priority)
{
X = x;
Y = y;
Z = 0;
Width = width;
Height = height;
Depth = 1;
LoadPriority = priority;
Timestamp = DateTime.UtcNow;
RequestId = Guid.NewGuid();
}
/// <summary>
/// Creates a new 3D chunk loading request.
/// </summary>
public ChunkLoadRequest(int x, int y, int z, int width, int height, int depth, Priority priority)
{
X = x;
Y = y;
Z = z;
Width = width;
Height = height;
Depth = depth;
LoadPriority = priority;
Timestamp = DateTime.UtcNow;
RequestId = Guid.NewGuid();
}
}
// Priority queues for each priority level
private readonly Dictionary<Priority, Queue<ChunkLoadRequest>> _requestQueues = new();
// Lookup for fast cancellation and status checks
private readonly Dictionary<Guid, ChunkLoadRequest> _requestLookup = new();
// Active tasks to prevent duplicate loading
private readonly Dictionary<(int X, int Y, int Z), Task> _activeTasks = new();
// Synchronization
private readonly SemaphoreSlim _queueSemaphore = new(1, 1);
private readonly SemaphoreSlim _processSemaphore;
private readonly CancellationTokenSource _cancellationSource = new();
// Processing state
private bool _isProcessing;
private Task? _processingTask;
// Configuration
private readonly int _maxConcurrentLoads;
private readonly TimeSpan _requestTimeout;
/// <summary>
/// Event raised when a chunk load request is completed.
/// </summary>
public event EventHandler<ChunkLoadRequest>? RequestCompleted;
/// <summary>
/// Event raised when a chunk load request fails.
/// </summary>
public event EventHandler<(ChunkLoadRequest Request, Exception Exception)>? RequestFailed;
/// <summary>
/// Initializes a new instance of the ChunkLoadingPriority class.
/// </summary>
/// <param name="maxConcurrentLoads">Maximum number of chunks to load concurrently</param>
/// <param name="requestTimeoutSeconds">Timeout in seconds for chunk load requests</param>
public ChunkLoadingPriority(int maxConcurrentLoads = 4, int requestTimeoutSeconds = 30)
{
_maxConcurrentLoads = maxConcurrentLoads;
_requestTimeout = TimeSpan.FromSeconds(requestTimeoutSeconds);
_processSemaphore = new SemaphoreSlim(maxConcurrentLoads, maxConcurrentLoads);
// Initialize queues for each priority level
foreach (Priority priority in Enum.GetValues(typeof(Priority)))
{
_requestQueues[priority] = new Queue<ChunkLoadRequest>();
}
}
/// <summary>
/// Enqueues a chunk load request with the specified priority.
/// </summary>
/// <returns>The request object that can be used to track or cancel the request</returns>
public ChunkLoadRequest EnqueueRequest(int x, int y, int width, int height, Priority priority = Priority.Normal)
{
var request = new ChunkLoadRequest(x, y, width, height, priority);
EnqueueRequest(request);
return request;
}
/// <summary>
/// Enqueues a 3D chunk load request with the specified priority.
/// </summary>
/// <returns>The request object that can be used to track or cancel the request</returns>
public ChunkLoadRequest EnqueueRequest(int x, int y, int z, int width, int height, int depth, Priority priority = Priority.Normal)
{
var request = new ChunkLoadRequest(x, y, z, width, height, depth, priority);
EnqueueRequest(request);
return request;
}
/// <summary>
/// Enqueues a pre-created chunk load request.
/// </summary>
public void EnqueueRequest(ChunkLoadRequest request)
{
_queueSemaphore.Wait();
try
{
// Check if we already have a request for this chunk
var key = (request.X, request.Y, request.Z);
if (_activeTasks.ContainsKey(key))
{
return; // Already loading this chunk
}
// Add to appropriate queue
_requestQueues[request.LoadPriority].Enqueue(request);
_requestLookup[request.RequestId] = request;
// Start processing if not already running
EnsureProcessingStarted();
}
finally
{
_queueSemaphore.Release();
}
// For immediate priority, wait for processing to start
if (request.LoadPriority == Priority.Immediate)
{
TriggerImmediateProcessing();
}
}
/// <summary>
/// Cancels a pending chunk load request.
/// </summary>
/// <returns>True if the request was found and canceled, false otherwise</returns>
public bool CancelRequest(Guid requestId)
{
_queueSemaphore.Wait();
try
{
if (_requestLookup.TryGetValue(requestId, out var request))
{
_requestLookup.Remove(requestId);
// Note: We don't remove from the queue as that would be inefficient
// Instead, we'll skip it when we dequeue
return true;
}
return false;
}
finally
{
_queueSemaphore.Release();
}
}
/// <summary>
/// Cancels all pending chunk load requests.
/// </summary>
public void CancelAllRequests()
{
_queueSemaphore.Wait();
try
{
foreach (var queue in _requestQueues.Values)
{
queue.Clear();
}
_requestLookup.Clear();
}
finally
{
_queueSemaphore.Release();
}
}
/// <summary>
/// Starts the processing of chunk load requests if not already running.
/// </summary>
private void EnsureProcessingStarted()
{
if (_isProcessing)
return;
_isProcessing = true;
_processingTask = Task.Run(ProcessRequestsAsync);
}
/// <summary>
/// Triggers immediate processing of high-priority requests.
/// </summary>
private void TriggerImmediateProcessing()
{
// This is a hint to the processor to check for immediate requests now
// We don't need to do anything special as the processor checks immediate first
}
/// <summary>
/// Main processing loop for chunk load requests.
/// </summary>
private async Task ProcessRequestsAsync()
{
while (!_cancellationSource.IsCancellationRequested)
{
ChunkLoadRequest? request = null;
// Get the next request from the highest priority queue
await _queueSemaphore.WaitAsync();
try
{
request = DequeueNextRequest();
if (request == null)
{
// No requests to process, pause briefly
_isProcessing = false;
_queueSemaphore.Release();
await Task.Delay(50);
continue;
}
// Mark this chunk as being processed
var key = (request.X, request.Y, request.Z);
if (_activeTasks.ContainsKey(key))
{
// Already processing this chunk, skip
continue;
}
// Create a task for this request but don't start it yet
var loadTask = ProcessRequestAsync(request);
_activeTasks[key] = loadTask;
}
finally
{
if (_isProcessing)
{
_queueSemaphore.Release();
}
}
// Wait for a processing slot
await _processSemaphore.WaitAsync();
// Start the task and continue without waiting
_ = Task.Run(async () =>
{
try
{
await ProcessRequestAsync(request!);
}
finally
{
_processSemaphore.Release();
// Remove from active tasks
await _queueSemaphore.WaitAsync();
try
{
_activeTasks.Remove((request!.X, request.Y, request.Z));
}
finally
{
_queueSemaphore.Release();
}
}
});
}
}
/// <summary>
/// Dequeues the next request from the highest priority queue.
/// </summary>
private ChunkLoadRequest? DequeueNextRequest()
{
// Check each priority level in order
foreach (Priority priority in Enum.GetValues(typeof(Priority)))
{
var queue = _requestQueues[priority];
while (queue.Count > 0)
{
var request = queue.Dequeue();
// Skip if the request has been canceled
if (!_requestLookup.ContainsKey(request.RequestId))
continue;
// Skip if the request has timed out
if (DateTime.UtcNow - request.Timestamp > _requestTimeout)
{
_requestLookup.Remove(request.RequestId);
continue;
}
// Valid request found
return request;
}
}
return null; // No valid requests found
}
/// <summary>
/// Processes a single chunk load request.
/// </summary>
private async Task ProcessRequestAsync(ChunkLoadRequest request)
{
try
{
// This would be integrated with your chunk manager
// For now, we'll just simulate the loading with a delay
// Simulate different loading times based on priority
int delayMs = request.LoadPriority switch
{
Priority.Immediate => 50,
Priority.High => 100,
Priority.Normal => 200,
Priority.Low => 500,
Priority.Background => 1000,
_ => 200
};
await Task.Delay(delayMs);
// Notify completion
RequestCompleted?.Invoke(this, request);
}
catch (Exception ex)
{
// Notify failure
RequestFailed?.Invoke(this, (request, ex));
}
finally
{
// Remove from lookup
await _queueSemaphore.WaitAsync();
try
{
_requestLookup.Remove(request.RequestId);
}
finally
{
_queueSemaphore.Release();
}
}
}
/// <summary>
/// Gets the number of pending requests for each priority level.
/// </summary>
public Dictionary<Priority, int> GetQueueSizes()
{
_queueSemaphore.Wait();
try
{
var result = new Dictionary<Priority, int>();
foreach (var kvp in _requestQueues)
{
result[kvp.Key] = kvp.Value.Count;
}
return result;
}
finally
{
_queueSemaphore.Release();
}
}
/// <summary>
/// Gets the total number of pending requests.
/// </summary>
public int GetTotalQueueSize()
{
_queueSemaphore.Wait();
try
{
int total = 0;
foreach (var queue in _requestQueues.Values)
{
total += queue.Count;
}
return total;
}
finally
{
_queueSemaphore.Release();
}
}
/// <summary>
/// Stops processing and releases resources.
/// </summary>
public async Task ShutdownAsync()
{
_cancellationSource.Cancel();
if (_processingTask != null)
{
try
{
await _processingTask;
}
catch (OperationCanceledException)
{
// Expected
}
catch (Exception)
{
// Ignore other exceptions during shutdown
}
}
_queueSemaphore.Dispose();
_processSemaphore.Dispose();
_cancellationSource.Dispose();
}
/// <summary>
/// Creates a ChunkLoadingPriority instance with default settings.
/// </summary>
public static ChunkLoadingPriority CreateDefault()
{
return new ChunkLoadingPriority();
}
/// <summary>
/// Creates a ChunkLoadingPriority instance optimized for high throughput.
/// </summary>
public static ChunkLoadingPriority CreateHighThroughput()
{
return new ChunkLoadingPriority(
maxConcurrentLoads: Environment.ProcessorCount * 2,
requestTimeoutSeconds: 60);
}
/// <summary>
/// Creates a ChunkLoadingPriority instance optimized for low latency.
/// </summary>
public static ChunkLoadingPriority CreateLowLatency()
{
return new ChunkLoadingPriority(
maxConcurrentLoads: Math.Max(2, Environment.ProcessorCount / 2),
requestTimeoutSeconds: 15);
}
/// <summary>
/// Integrates with a chunk manager to process load requests.
/// </summary>
/// <typeparam name="T">The type of data in the chunks</typeparam>
/// <param name="chunkManager">The chunk manager to use for loading chunks</param>
public void IntegrateWith<T>(IChunkManager chunkManager)
{
// Replace the ProcessRequestAsync method with one that uses the chunk manager
// This is a simplified example - in a real implementation, you'd need to handle
// both 2D and 3D chunk managers and properly cast the interface
RequestCompleted += (sender, request) =>
{
// The chunk has been loaded, you might want to do something with it
var chunk = chunkManager.GetChunk(request.X, request.Y);
// Additional processing if needed
};
}
/// <summary>
/// Gets the estimated time until a request with the given priority would be processed.
/// </summary>
/// <returns>Estimated wait time in milliseconds, or -1 if cannot be determined</returns>
public int GetEstimatedWaitTime(Priority priority)
{
_queueSemaphore.Wait();
try
{
int totalHigherPriorityRequests = 0;
// Count requests with higher or equal priority
foreach (Priority p in Enum.GetValues(typeof(Priority)))
{
if (p <= priority)
{
totalHigherPriorityRequests += _requestQueues[p].Count;
}
}
// If no requests or no active tasks, return 0
if (totalHigherPriorityRequests == 0 || _activeTasks.Count == 0)
{
return 0;
}
// Estimate based on current processing rate
// This is a very simple estimate and could be improved with actual metrics
int averageProcessingTimeMs = priority switch
{
Priority.Immediate => 50,
Priority.High => 100,
Priority.Normal => 200,
Priority.Low => 500,
Priority.Background => 1000,
_ => 200
};
// Calculate how many batches of concurrent requests we'll need
int batches = (int)Math.Ceiling(totalHigherPriorityRequests / (double)_maxConcurrentLoads);
return batches * averageProcessingTimeMs;
}
finally
{
_queueSemaphore.Release();
}
}
/// <summary>
/// Adjusts the priority of an existing request.
/// </summary>
/// <returns>True if the request was found and its priority adjusted, false otherwise</returns>
public bool AdjustPriority(Guid requestId, Priority newPriority)
{
// Note: This is not an efficient operation as we don't directly modify the queue
// Instead, we mark the request for priority change when it's processed
_queueSemaphore.Wait();
try
{
if (_requestLookup.TryGetValue(requestId, out var request))
{
// If the request is of lower priority than requested, we'll re-queue it
if (request.LoadPriority > newPriority)
{
// Remove from lookup (it will be skipped when dequeued)
_requestLookup.Remove(requestId);
// Create a new request with the same parameters but higher priority
var newRequest = request.Depth > 1
? new ChunkLoadRequest(request.X, request.Y, request.Z, request.Width, request.Height, request.Depth, newPriority)
: new ChunkLoadRequest(request.X, request.Y, request.Width, request.Height, newPriority);
// Add the new request
_requestQueues[newPriority].Enqueue(newRequest);
_requestLookup[newRequest.RequestId] = newRequest;
// If upgrading to immediate, trigger processing
if (newPriority == Priority.Immediate)
{
TriggerImmediateProcessing();
}
}
return true;
}
return false;
}
finally
{
_queueSemaphore.Release();
}
}
/// <summary>
/// Gets statistics about the current state of the chunk loading system.
/// </summary>
public Dictionary<string, object> GetStatistics()
{
_queueSemaphore.Wait();
try
{
var stats = new Dictionary<string, object>
{
["TotalPendingRequests"] = _requestLookup.Count,
["ActiveTasks"] = _activeTasks.Count,
["IsProcessing"] = _isProcessing,
["MaxConcurrentLoads"] = _maxConcurrentLoads
};
// Add queue sizes for each priority
foreach (Priority priority in Enum.GetValues(typeof(Priority)))
{
stats[$"Queue_{priority}"] = _requestQueues[priority].Count;
}
return stats;
}
finally
{
_queueSemaphore.Release();
}
}
}
}

View File

@ -0,0 +1,40 @@
#nullable enable
using AdvChkSys.Chunk;
namespace AdvChkSys.Spatial
{
/// <summary>
/// Provides extension methods for chunks to work with spatial indexing.
/// </summary>
public static class ChunkExtensions
{
/// <summary>
/// Converts a Chunk3D to an IChunk3D.
/// </summary>
public static IChunk3D AsIChunk3D<T>(this Chunk3D<T> chunk)
{
return new Chunk3DAdapter<T>(chunk);
}
/// <summary>
/// Adapter to make Chunk3D implement IChunk3D.
/// </summary>
private class Chunk3DAdapter<T> : IChunk3D
{
private readonly Chunk3D<T> _chunk;
public Chunk3DAdapter(Chunk3D<T> chunk)
{
_chunk = chunk;
}
public int X => _chunk.X;
public int Y => _chunk.Y;
public int Z => _chunk.Z;
public int Width => _chunk.Width;
public int Height => _chunk.Height;
public int Depth => _chunk.Depth;
public System.Collections.Generic.Dictionary<string, object> Metadata => _chunk.Metadata;
}
}
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,167 @@
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using AdvChkSys.Interfaces;
namespace AdvChkSys.Threading
{
/// <summary>
/// Provides asynchronous locking for chunks.
/// </summary>
public class ChunkAsyncLock : IDisposable
{
// Lock objects for each chunk
private readonly ConcurrentDictionary<IChunk, SemaphoreSlim> _locks = new();
// Timer for cleanup
private readonly Timer _cleanupTimer;
// Last access time for each lock
private readonly ConcurrentDictionary<IChunk, DateTime> _lastAccessTime = new();
// Cleanup interval in minutes
private readonly int _cleanupIntervalMinutes;
/// <summary>
/// Initializes a new instance of the ChunkAsyncLock class.
/// </summary>
/// <param name="cleanupIntervalMinutes">Interval in minutes for cleaning up unused locks</param>
public ChunkAsyncLock(int cleanupIntervalMinutes = 10)
{
_cleanupIntervalMinutes = cleanupIntervalMinutes;
_cleanupTimer = new Timer(CleanupUnusedLocks, null,
TimeSpan.FromMinutes(cleanupIntervalMinutes),
TimeSpan.FromMinutes(cleanupIntervalMinutes));
}
/// <summary>
/// Acquires a lock on a chunk asynchronously.
/// </summary>
/// <param name="chunk">The chunk to lock</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>A disposable that releases the lock when disposed</returns>
public async Task<IDisposable> LockAsync(IChunk chunk, CancellationToken cancellationToken = default)
{
var semaphore = _locks.GetOrAdd(chunk, _ => new SemaphoreSlim(1, 1));
_lastAccessTime[chunk] = DateTime.UtcNow;
try
{
await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
// If canceled, check if we need to clean up the semaphore
if (semaphore.CurrentCount == 1)
{
// No one is waiting, try to remove
_locks.TryRemove(chunk, out _);
_lastAccessTime.TryRemove(chunk, out _);
semaphore.Dispose();
}
throw;
}
return new LockReleaser(semaphore, chunk, this);
}
/// <summary>
/// Tries to acquire a lock on a chunk asynchronously with a timeout.
/// </summary>
/// <param name="chunk">The chunk to lock</param>
/// <param name="timeout">Timeout for acquiring the lock</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>A tuple with a boolean indicating success and the lock releaser if successful</returns>
public async Task<(bool Success, IDisposable? LockReleaser)> TryLockAsync(
IChunk chunk,
TimeSpan timeout,
CancellationToken cancellationToken = default)
{
if (chunk == null)
throw new ArgumentNullException(nameof(chunk));
var semaphore = _locks.GetOrAdd(chunk, _ => new SemaphoreSlim(1, 1));
_lastAccessTime[chunk] = DateTime.UtcNow;
if (await semaphore.WaitAsync(timeout, cancellationToken).ConfigureAwait(false))
{
return (true, new LockReleaser(semaphore, chunk, this));
}
return (false, null);
}
/// <summary>
/// Cleans up unused locks.
/// </summary>
private void CleanupUnusedLocks(object? state)
{
var now = DateTime.UtcNow;
var threshold = now.AddMinutes(-_cleanupIntervalMinutes);
foreach (var chunk in _lastAccessTime.Keys)
{
if (_lastAccessTime.TryGetValue(chunk, out var lastAccess) &&
lastAccess < threshold &&
_locks.TryGetValue(chunk, out var semaphore))
{
// Only remove if no one is waiting
if (semaphore.CurrentCount == 1)
{
if (_locks.TryRemove(chunk, out var removedSemaphore))
{
_lastAccessTime.TryRemove(chunk, out _);
removedSemaphore.Dispose();
}
}
}
}
}
/// <summary>
/// Disposes resources.
/// </summary>
public void Dispose()
{
_cleanupTimer.Dispose();
foreach (var semaphore in _locks.Values)
{
semaphore.Dispose();
}
_locks.Clear();
_lastAccessTime.Clear();
}
/// <summary>
/// Releases a lock when disposed.
/// </summary>
private class LockReleaser : IDisposable
{
private readonly SemaphoreSlim _semaphore;
private readonly IChunk _chunk;
private readonly ChunkAsyncLock _parent;
private bool _disposed;
public LockReleaser(SemaphoreSlim semaphore, IChunk chunk, ChunkAsyncLock parent)
{
_semaphore = semaphore;
_chunk = chunk;
_parent = parent;
}
public void Dispose()
{
if (!_disposed)
{
_semaphore.Release();
_parent._lastAccessTime[_chunk] = DateTime.UtcNow;
_disposed = true;
}
}
}
}
}

View File

@ -0,0 +1,274 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using AdvChkSys.Interfaces;
namespace AdvChkSys.Threading
{
/// <summary>
/// Provides a queue for sequential operations on chunks.
/// </summary>
public class ChunkOperationQueue
{
// Queue of pending operations for each chunk
private readonly ConcurrentDictionary<IChunk, Queue<(Func<Task> Operation, TaskCompletionSource<object?> Completion)>> _pendingOperations = new();
// Currently active operations
private readonly ConcurrentDictionary<IChunk, Task> _activeOperations = new();
// Semaphore to limit concurrent operations
private readonly SemaphoreSlim _semaphore;
// Cancellation for shutdown
private readonly CancellationTokenSource _shutdownCts = new();
/// <summary>
/// Initializes a new instance of the ChunkOperationQueue class.
/// </summary>
/// <param name="maxConcurrentOperations">Maximum number of concurrent operations</param>
public ChunkOperationQueue(int maxConcurrentOperations = 4)
{
_semaphore = new SemaphoreSlim(maxConcurrentOperations, maxConcurrentOperations);
}
/// <summary>
/// Enqueues an operation to be performed on a chunk.
/// </summary>
/// <param name="chunk">The chunk to operate on</param>
/// <param name="operation">The operation to perform</param>
/// <returns>A task that completes when the operation is done</returns>
public Task EnqueueOperationAsync(IChunk chunk, Func<Task> operation)
{
if (chunk == null)
throw new ArgumentNullException(nameof(chunk));
if (operation == null)
throw new ArgumentNullException(nameof(operation));
// Create a completion source for this operation
var completion = new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously);
// Get or create the queue for this chunk
var queue = _pendingOperations.GetOrAdd(chunk, _ => new Queue<(Func<Task>, TaskCompletionSource<object?>)>());
// Add the operation to the queue
lock (queue)
{
queue.Enqueue((operation, completion));
// If this is the only operation, start processing
if (queue.Count == 1 && !_activeOperations.ContainsKey(chunk))
{
StartProcessingChunkOperations(chunk);
}
}
return completion.Task;
}
/// <summary>
/// Starts processing operations for a chunk.
/// </summary>
private async void StartProcessingChunkOperations(IChunk chunk)
{
// Wait for a semaphore slot
await _semaphore.WaitAsync(_shutdownCts.Token).ConfigureAwait(false);
try
{
// Process operations until the queue is empty
while (!_shutdownCts.IsCancellationRequested)
{
// Get the next operation
(Func<Task> operation, TaskCompletionSource<object?> completion) nextOperation;
var queue = _pendingOperations.GetOrAdd(chunk, _ => new Queue<(Func<Task>, TaskCompletionSource<object?>)>());
lock (queue)
{
if (queue.Count == 0)
{
// No more operations, remove the active task
_activeOperations.TryRemove(chunk, out _);
break;
}
nextOperation = queue.Peek();
}
// Execute the operation
var task = ExecuteOperationAsync(chunk, nextOperation.operation, nextOperation.completion);
_activeOperations[chunk] = task;
// Wait for completion
await task.ConfigureAwait(false);
// Remove the completed operation
lock (queue)
{
queue.Dequeue();
}
}
}
catch (OperationCanceledException)
{
// Shutdown requested
}
finally
{
// Release the semaphore
_semaphore.Release();
}
}
/// <summary>
/// Executes an operation and completes the task.
/// </summary>
private async Task ExecuteOperationAsync(IChunk chunk, Func<Task> operation, TaskCompletionSource<object?> completion)
{
try
{
// Track the operation for diagnostics
var operationId = ChunkThreadingDiagnostics.TrackOperationStart("ChunkOperation", chunk);
try
{
// Execute the operation
await operation().ConfigureAwait(false);
// Complete the task
completion.TrySetResult(null);
}
catch (Exception ex)
{
// Set the exception
completion.TrySetException(ex);
// Log the error
ChunkThreadingDiagnostics.LogEvent("OperationError", $"Error in chunk operation: {ex.Message}");
}
finally
{
// End tracking
ChunkThreadingDiagnostics.TrackOperationEnd(operationId);
}
}
catch (Exception ex)
{
// This should never happen, but just in case
completion.TrySetException(ex);
// Log the error
ChunkThreadingDiagnostics.LogEvent("CriticalError", $"Critical error in operation execution: {ex.Message}");
}
}
/// <summary>
/// Gets the number of pending operations for a chunk.
/// </summary>
public int GetPendingOperationCount(IChunk chunk)
{
if (_pendingOperations.TryGetValue(chunk, out var queue))
{
lock (queue)
{
return queue.Count;
}
}
return 0;
}
/// <summary>
/// Gets the total number of pending operations.
/// </summary>
public int PendingOperationCount
{
get
{
int count = 0;
foreach (var queue in _pendingOperations.Values)
{
lock (queue)
{
count += queue.Count;
}
}
return count;
}
}
/// <summary>
/// Gets the number of active operations.
/// </summary>
public int ActiveOperationCount => _activeOperations.Count;
/// <summary>
/// Cancels all pending operations for a chunk.
/// </summary>
/// <param name="chunk">The chunk to cancel operations for</param>
/// <returns>The number of operations canceled</returns>
public int CancelOperations(IChunk chunk)
{
if (_pendingOperations.TryGetValue(chunk, out var queue))
{
lock (queue)
{
int count = queue.Count;
// Cancel all pending operations
while (queue.Count > 0)
{
var operation = queue.Dequeue();
operation.Completion.TrySetCanceled();
}
return count;
}
}
return 0;
}
/// <summary>
/// Cancels all pending operations.
/// </summary>
/// <returns>The number of operations canceled</returns>
public int CancelAllOperations()
{
int count = 0;
foreach (var chunk in _pendingOperations.Keys)
{
count += CancelOperations(chunk);
}
return count;
}
/// <summary>
/// Shuts down the operation queue.
/// </summary>
public async Task ShutdownAsync()
{
// Cancel all operations
_shutdownCts.Cancel();
// Cancel all pending operations
CancelAllOperations();
// Wait for active operations to complete
var tasks = new List<Task>(_activeOperations.Values);
if (tasks.Count > 0)
{
await Task.WhenAll(tasks).ConfigureAwait(false);
}
// Dispose resources
_shutdownCts.Dispose();
_semaphore.Dispose();
}
}
}

View File

@ -0,0 +1,252 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using AdvChkSys.Interfaces;
namespace AdvChkSys.Threading
{
/// <summary>
/// Provides advanced parallel processing for chunks.
/// </summary>
public static class ChunkParallelProcessor
{
/// <summary>
/// Processes chunks with dependency awareness.
/// </summary>
/// <param name="chunks">The chunks to process</param>
/// <param name="processor">The function to process each chunk</param>
/// <param name="getDependencies">Function to get dependencies for a chunk</param>
/// <param name="maxDegreeOfParallelism">Maximum degree of parallelism (null for default)</param>
/// <param name="cancellationToken">Cancellation token</param>
public static async Task ProcessChunksWithDependenciesAsync(
IEnumerable<IChunk> chunks,
Func<IChunk, Task> processor,
Func<IChunk, IEnumerable<IChunk>> getDependencies,
int? maxDegreeOfParallelism = null,
CancellationToken cancellationToken = default)
{
// Build dependency graph
var dependencyGraph = BuildDependencyGraph(chunks, getDependencies);
// Process in dependency order
await ProcessDependencyGraphAsync(
dependencyGraph,
processor,
maxDegreeOfParallelism,
cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Builds a dependency graph for chunks.
/// </summary>
private static DependencyGraph BuildDependencyGraph(
IEnumerable<IChunk> chunks,
Func<IChunk, IEnumerable<IChunk>> getDependencies)
{
var graph = new DependencyGraph();
// Add all chunks to the graph
foreach (var chunk in chunks)
{
graph.AddNode(chunk);
}
// Add dependencies
foreach (var chunk in chunks)
{
var dependencies = getDependencies(chunk);
foreach (var dependency in dependencies)
{
graph.AddDependency(chunk, dependency);
}
}
return graph;
}
/// <summary>
/// Processes a dependency graph in parallel.
/// </summary>
private static async Task ProcessDependencyGraphAsync(
DependencyGraph graph,
Func<IChunk, Task> processor,
int? maxDegreeOfParallelism,
CancellationToken cancellationToken)
{
// Set up semaphore for parallelism control
var semaphore = new SemaphoreSlim(
maxDegreeOfParallelism ?? ChunkThreadingConfiguration.DefaultMaxDegreeOfParallelism);
// Track completed chunks
var completed = new ConcurrentDictionary<IChunk, bool>();
// Get initial set of chunks with no dependencies
var readyChunks = new ConcurrentQueue<IChunk>(graph.GetNodesWithNoDependencies());
// Track active tasks
var activeTasks = new ConcurrentDictionary<IChunk, Task>();
// Process until all chunks are completed
while (!readyChunks.IsEmpty || activeTasks.Count > 0)
{
// Check for cancellation
cancellationToken.ThrowIfCancellationRequested();
// Start processing ready chunks
while (readyChunks.TryDequeue(out var chunk))
{
// Wait for a semaphore slot
await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
// Start processing this chunk
var task = ProcessChunkAsync(
chunk,
processor,
graph,
completed,
readyChunks,
semaphore,
cancellationToken);
activeTasks[chunk] = task;
// When the task completes, remove it from active tasks
_ = task.ContinueWith(_ =>
{
activeTasks.TryRemove(chunk, out _);
}, TaskContinuationOptions.ExecuteSynchronously);
}
// If no ready chunks but active tasks, wait for one to complete
if (readyChunks.IsEmpty && activeTasks.Count > 0)
{
await Task.WhenAny(activeTasks.Values).ConfigureAwait(false);
}
}
// Clean up
semaphore.Dispose();
}
/// <summary>
/// Processes a single chunk and updates the ready queue.
/// </summary>
private static async Task ProcessChunkAsync(
IChunk chunk,
Func<IChunk, Task> processor,
DependencyGraph graph,
ConcurrentDictionary<IChunk, bool> completed,
ConcurrentQueue<IChunk> readyChunks,
SemaphoreSlim semaphore,
CancellationToken cancellationToken)
{
try
{
// Process the chunk
await processor(chunk).ConfigureAwait(false);
// Mark as completed
completed[chunk] = true;
// Find dependents that are now ready
var dependents = graph.GetDependents(chunk);
foreach (var dependent in dependents)
{
// Check if all dependencies are completed
var dependencies = graph.GetDependencies(dependent);
if (dependencies.All(d => completed.ContainsKey(d)))
{
// All dependencies completed, add to ready queue
readyChunks.Enqueue(dependent);
}
}
}
finally
{
// Release the semaphore
semaphore.Release();
}
}
/// <summary>
/// Represents a dependency graph for chunks.
/// </summary>
private class DependencyGraph
{
// Map of chunk to its dependencies
private readonly Dictionary<IChunk, HashSet<IChunk>> _dependencies = new();
// Map of chunk to chunks that depend on it
private readonly Dictionary<IChunk, HashSet<IChunk>> _dependents = new();
/// <summary>
/// Adds a node to the graph.
/// </summary>
public void AddNode(IChunk chunk)
{
if (!_dependencies.ContainsKey(chunk))
{
_dependencies[chunk] = new HashSet<IChunk>();
}
if (!_dependents.ContainsKey(chunk))
{
_dependents[chunk] = new HashSet<IChunk>();
}
}
/// <summary>
/// Adds a dependency between two nodes.
/// </summary>
public void AddDependency(IChunk dependent, IChunk dependency)
{
// Add nodes if they don't exist
AddNode(dependent);
AddNode(dependency);
// Add dependency
_dependencies[dependent].Add(dependency);
// Add dependent
_dependents[dependency].Add(dependent);
}
/// <summary>
/// Gets all nodes with no dependencies.
/// </summary>
public IEnumerable<IChunk> GetNodesWithNoDependencies()
{
return _dependencies.Where(kvp => kvp.Value.Count == 0).Select(kvp => kvp.Key);
}
/// <summary>
/// Gets all dependencies of a node.
/// </summary>
public IEnumerable<IChunk> GetDependencies(IChunk chunk)
{
if (_dependencies.TryGetValue(chunk, out var dependencies))
{
return dependencies;
}
return Enumerable.Empty<IChunk>();
}
/// <summary>
/// Gets all dependents of a node.
/// </summary>
public IEnumerable<IChunk> GetDependents(IChunk chunk)
{
if (_dependents.TryGetValue(chunk, out var dependents))
{
return dependents;
}
return Enumerable.Empty<IChunk>();
}
}
}
}

View File

@ -1,4 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
@ -10,6 +12,18 @@ namespace AdvChkSys.Threading
/// </summary>
public static class ChunkTaskScheduler
{
// Default maximum degree of parallelism
private static int _maxDegreeOfParallelism = Environment.ProcessorCount;
/// <summary>
/// Gets or sets the maximum degree of parallelism for batch operations.
/// </summary>
public static int MaxDegreeOfParallelism
{
get => _maxDegreeOfParallelism;
set => _maxDegreeOfParallelism = value > 0 ? value : Environment.ProcessorCount;
}
/// <summary>
/// Runs the given action asynchronously on the thread pool.
/// </summary>
@ -41,5 +55,256 @@ namespace AdvChkSys.Threading
{
return Task.Run(func, cancellationToken);
}
/// <summary>
/// Creates a cancellation token that cancels after a timeout.
/// </summary>
/// <param name="timeout">The timeout</param>
/// <param name="cancellationToken">Optional token to combine with the timeout</param>
/// <returns>A cancellation token that cancels after the timeout or when the input token is canceled</returns>
public static CancellationToken CreateTimeoutToken(
TimeSpan timeout,
CancellationToken cancellationToken = default)
{
if (timeout == Timeout.InfiniteTimeSpan && cancellationToken == CancellationToken.None)
return CancellationToken.None;
var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
if (timeout != Timeout.InfiniteTimeSpan)
cts.CancelAfter(timeout);
return cts.Token;
}
/// <summary>
/// Runs a task with a timeout.
/// </summary>
/// <param name="task">The task to run</param>
/// <param name="timeout">The timeout</param>
/// <param name="cancellationToken">Optional cancellation token</param>
/// <returns>The task result</returns>
/// <exception cref="TimeoutException">Thrown if the task times out</exception>
public static async Task<T> WithTimeoutAsync<T>(
Task<T> task,
TimeSpan timeout,
CancellationToken cancellationToken = default)
{
using var timeoutCts = new CancellationTokenSource(timeout);
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(
timeoutCts.Token, cancellationToken);
var completedTask = await Task.WhenAny(task, Task.Delay(timeout, linkedCts.Token))
.ConfigureAwait(false);
if (completedTask == task)
{
return await task.ConfigureAwait(false);
}
throw new TimeoutException($"The operation timed out after {timeout.TotalMilliseconds}ms");
}
/// <summary>
/// Runs a task with a timeout.
/// </summary>
/// <param name="task">The task to run</param>
/// <param name="timeout">The timeout</param>
/// <param name="cancellationToken">Optional cancellation token</param>
/// <exception cref="TimeoutException">Thrown if the task times out</exception>
public static async Task WithTimeoutAsync(
Task task,
TimeSpan timeout,
CancellationToken cancellationToken = default)
{
using var timeoutCts = new CancellationTokenSource(timeout);
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(
timeoutCts.Token, cancellationToken);
var completedTask = await Task.WhenAny(task, Task.Delay(timeout, linkedCts.Token))
.ConfigureAwait(false);
if (completedTask == task)
{
await task.ConfigureAwait(false);
return;
}
throw new TimeoutException($"The operation timed out after {timeout.TotalMilliseconds}ms");
}
/// <summary>
/// Runs a task with a fallback value if it times out.
/// </summary>
/// <typeparam name="T">The type of result</typeparam>
/// <param name="task">The task to run</param>
/// <param name="timeout">The timeout</param>
/// <param name="fallbackValue">The fallback value to return if the task times out</param>
/// <param name="cancellationToken">Optional cancellation token</param>
/// <returns>The task result or the fallback value if the task times out</returns>
public static async Task<T> WithTimeoutOrDefaultAsync<T>(
Task<T> task,
TimeSpan timeout,
T fallbackValue,
CancellationToken cancellationToken = default)
{
try
{
return await WithTimeoutAsync(task, timeout, cancellationToken).ConfigureAwait(false);
}
catch (TimeoutException)
{
return fallbackValue;
}
}
/// <summary>
/// Runs multiple actions in parallel with a limit on the degree of parallelism.
/// </summary>
public static Task RunBatchParallelAsync(IEnumerable<Action> actions, int? maxDegreeOfParallelism = null,
CancellationToken cancellationToken = default)
{
return Task.Run(() =>
{
var options = new ParallelOptions
{
MaxDegreeOfParallelism = maxDegreeOfParallelism ?? MaxDegreeOfParallelism,
CancellationToken = cancellationToken
};
Parallel.ForEach(actions, options, action => action());
}, cancellationToken);
}
/// <summary>
/// Runs a batch of functions in parallel and returns the results.
/// </summary>
/// <typeparam name="T">The type of result</typeparam>
/// <param name="functions">The functions to run</param>
/// <param name="maxDegreeOfParallelism">Maximum degree of parallelism (null for default)</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>The results in the same order as the functions</returns>
public static async Task<T[]> RunBatchParallelAsync<T>(
IEnumerable<Func<T>> functions,
int? maxDegreeOfParallelism = null,
CancellationToken cancellationToken = default)
{
var funcs = functions.ToArray();
var results = new T[funcs.Length];
await Task.Run(() =>
{
var options = new ParallelOptions
{
MaxDegreeOfParallelism = maxDegreeOfParallelism ?? MaxDegreeOfParallelism,
CancellationToken = cancellationToken
};
Parallel.For(0, funcs.Length, options, i =>
{
results[i] = funcs[i]();
});
}, cancellationToken).ConfigureAwait(false);
return results;
}
/// <summary>
/// Runs a batch of async functions in parallel.
/// </summary>
/// <param name="functions">The async functions to run</param>
/// <param name="maxDegreeOfParallelism">Maximum degree of parallelism (null for default)</param>
/// <param name="cancellationToken">Cancellation token</param>
public static async Task RunBatchParallelAsync(
IEnumerable<Func<Task>> functions,
int? maxDegreeOfParallelism = null,
CancellationToken cancellationToken = default)
{
var funcs = functions.ToArray();
if (funcs.Length == 0)
return;
// Use SemaphoreSlim to limit concurrency
using var semaphore = new SemaphoreSlim(
maxDegreeOfParallelism ?? MaxDegreeOfParallelism);
// Create tasks for all functions
var tasks = new List<Task>(funcs.Length);
foreach (var func in funcs)
{
// Wait for a slot in the semaphore
await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
// Create a task that releases the semaphore when done
var task = Task.Run(async () =>
{
try
{
await func().ConfigureAwait(false);
}
finally
{
semaphore.Release();
}
}, cancellationToken);
tasks.Add(task);
}
// Wait for all tasks to complete
await Task.WhenAll(tasks).ConfigureAwait(false);
}
/// <summary>
/// Runs a batch of async functions in parallel and returns the results.
/// </summary>
/// <typeparam name="T">The type of result</typeparam>
/// <param name="functions">The async functions to run</param>
/// <param name="maxDegreeOfParallelism">Maximum degree of parallelism (null for default)</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>The results in the same order as the functions</returns>
public static async Task<T[]> RunBatchParallelAsync<T>(
IEnumerable<Func<Task<T>>> functions,
int? maxDegreeOfParallelism = null,
CancellationToken cancellationToken = default)
{
var funcs = functions.ToArray();
if (funcs.Length == 0)
return Array.Empty<T>();
// Use SemaphoreSlim to limit concurrency
using var semaphore = new SemaphoreSlim(
maxDegreeOfParallelism ?? MaxDegreeOfParallelism);
// Create tasks for all functions
var tasks = new Task<T>[funcs.Length];
for (int i = 0; i < funcs.Length; i++)
{
var func = funcs[i];
var index = i;
// Wait for a slot in the semaphore
await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
// Create a task that releases the semaphore when done
tasks[index] = Task.Run(async () =>
{
try
{
return await func().ConfigureAwait(false);
}
finally
{
semaphore.Release();
}
}, cancellationToken);
}
// Wait for all tasks to complete
return await Task.WhenAll(tasks).ConfigureAwait(false);
}
}
}

View File

@ -0,0 +1,237 @@
using System;
using System.Collections.Concurrent;
using System.Threading;
using AdvChkSys.Interfaces;
namespace AdvChkSys.Threading
{
/// <summary>
/// Provides synchronous locking for chunks.
/// </summary>
public class ChunkThreadSafetyManager : IDisposable
{
// Lock objects for each chunk
private readonly ConcurrentDictionary<IChunk, ChunkLock> _locks = new();
// Timer for cleanup
private readonly Timer _cleanupTimer;
// Last access time for each lock
private readonly ConcurrentDictionary<IChunk, DateTime> _lastAccessTime = new();
// Cleanup interval in minutes
private readonly int _cleanupIntervalMinutes;
/// <summary>
/// Initializes a new instance of the ChunkThreadSafetyManager class.
/// </summary>
/// <param name="cleanupIntervalMinutes">Interval in minutes for cleaning up unused locks</param>
public ChunkThreadSafetyManager(int cleanupIntervalMinutes = 10)
{
_cleanupIntervalMinutes = cleanupIntervalMinutes;
_cleanupTimer = new Timer(CleanupUnusedLocks, null,
TimeSpan.FromMinutes(cleanupIntervalMinutes),
TimeSpan.FromMinutes(cleanupIntervalMinutes));
}
/// <summary>
/// Acquires an exclusive lock on a chunk.
/// </summary>
/// <param name="chunk">The chunk to lock</param>
/// <returns>A disposable that releases the lock when disposed</returns>
public IDisposable AcquireLock(IChunk chunk)
{
var lockObj = _locks.GetOrAdd(chunk, _ => new ChunkLock());
_lastAccessTime[chunk] = DateTime.UtcNow;
lockObj.EnterWriteLock();
return new LockReleaser(lockObj, chunk, this, LockType.Write);
}
/// <summary>
/// Acquires a read lock on a chunk.
/// </summary>
/// <param name="chunk">The chunk to lock</param>
/// <returns>A disposable that releases the lock when disposed</returns>
public IDisposable AcquireReadLock(IChunk chunk)
{
var lockObj = _locks.GetOrAdd(chunk, _ => new ChunkLock());
_lastAccessTime[chunk] = DateTime.UtcNow;
lockObj.EnterReadLock();
return new LockReleaser(lockObj, chunk, this, LockType.Read);
}
/// <summary>
/// Acquires a write lock on a chunk.
/// </summary>
/// <param name="chunk">The chunk to lock</param>
/// <returns>A disposable that releases the lock when disposed</returns>
public IDisposable AcquireWriteLock(IChunk chunk)
{
var lockObj = _locks.GetOrAdd(chunk, _ => new ChunkLock());
_lastAccessTime[chunk] = DateTime.UtcNow;
lockObj.EnterWriteLock();
return new LockReleaser(lockObj, chunk, this, LockType.Write);
}
/// <summary>
/// Tries to acquire a read lock on a chunk.
/// </summary>
/// <param name="chunk">The chunk to lock</param>
/// <param name="timeout">The timeout</param>
/// <returns>A disposable that releases the lock when disposed, or null if the lock could not be acquired</returns>
public IDisposable? TryAcquireReadLock(IChunk chunk, TimeSpan timeout)
{
var lockObj = _locks.GetOrAdd(chunk, _ => new ChunkLock());
_lastAccessTime[chunk] = DateTime.UtcNow;
if (lockObj.TryEnterReadLock(timeout))
{
return new LockReleaser(lockObj, chunk, this, LockType.Read);
}
// Track contention
ChunkThreadingDiagnostics.TrackLockContention(chunk);
return null;
}
/// <summary>
/// Tries to acquire a write lock on a chunk.
/// </summary>
/// <param name="chunk">The chunk to lock</param>
/// <param name="timeout">The timeout</param>
/// <returns>A disposable that releases the lock when disposed, or null if the lock could not be acquired</returns>
public IDisposable? TryAcquireWriteLock(IChunk chunk, TimeSpan timeout)
{
var lockObj = _locks.GetOrAdd(chunk, _ => new ChunkLock());
_lastAccessTime[chunk] = DateTime.UtcNow;
if (lockObj.TryEnterWriteLock(timeout))
{
return new LockReleaser(lockObj, chunk, this, LockType.Write);
}
// Track contention
ChunkThreadingDiagnostics.TrackLockContention(chunk);
return null;
}
/// <summary>
/// Cleans up unused locks.
/// </summary>
private void CleanupUnusedLocks(object? state)
{
var now = DateTime.UtcNow;
var threshold = now.AddMinutes(-_cleanupIntervalMinutes);
foreach (var chunk in _lastAccessTime.Keys)
{
if (_lastAccessTime.TryGetValue(chunk, out var lastAccess) &&
lastAccess < threshold &&
_locks.TryGetValue(chunk, out var lockObj))
{
// Only remove if no one is using the lock
if (!lockObj.IsReadLockHeld && !lockObj.IsWriteLockHeld)
{
if (_locks.TryRemove(chunk, out var removedLock))
{
_lastAccessTime.TryRemove(chunk, out _);
removedLock.Dispose();
}
}
}
}
}
/// <summary>
/// Disposes resources.
/// </summary>
public void Dispose()
{
_cleanupTimer.Dispose();
foreach (var lockObj in _locks.Values)
{
lockObj.Dispose();
}
_locks.Clear();
_lastAccessTime.Clear();
}
/// <summary>
/// Type of lock.
/// </summary>
private enum LockType
{
Read,
Write
}
/// <summary>
/// Wrapper around ReaderWriterLockSlim.
/// </summary>
private class ChunkLock : IDisposable
{
private readonly ReaderWriterLockSlim _lock = new(LockRecursionPolicy.SupportsRecursion);
public void EnterReadLock() => _lock.EnterReadLock();
public void ExitReadLock() => _lock.ExitReadLock();
public void EnterWriteLock() => _lock.EnterWriteLock();
public void ExitWriteLock() => _lock.ExitWriteLock();
public bool TryEnterReadLock(TimeSpan timeout) => _lock.TryEnterReadLock(timeout);
public bool TryEnterWriteLock(TimeSpan timeout) => _lock.TryEnterWriteLock(timeout);
public bool IsReadLockHeld => _lock.IsReadLockHeld;
public bool IsWriteLockHeld => _lock.IsWriteLockHeld;
public void Dispose() => _lock.Dispose();
}
/// <summary>
/// Releases a lock when disposed.
/// </summary>
private class LockReleaser : IDisposable
{
private readonly ChunkLock _lock;
private readonly IChunk _chunk;
private readonly ChunkThreadSafetyManager _parent;
private readonly LockType _lockType;
private bool _disposed;
public LockReleaser(ChunkLock lockObj, IChunk chunk, ChunkThreadSafetyManager parent, LockType lockType)
{
_lock = lockObj;
_chunk = chunk;
_parent = parent;
_lockType = lockType;
}
public void Dispose()
{
if (!_disposed)
{
if (_lockType == LockType.Read)
{
_lock.ExitReadLock();
}
else
{
_lock.ExitWriteLock();
}
_parent._lastAccessTime[_chunk] = DateTime.UtcNow;
_disposed = true;
}
}
}
}
}

View File

@ -0,0 +1,98 @@
using System;
using System.Threading;
namespace AdvChkSys.Threading
{
/// <summary>
/// Provides centralized configuration for threading in the chunk system.
/// </summary>
public static class ChunkThreadingConfiguration
{
private static int _defaultMaxDegreeOfParallelism = Environment.ProcessorCount;
private static int _chunkOperationTimeout = 30000; // 30 seconds
private static int _lockCleanupInterval = 10; // 10 minutes
/// <summary>
/// Gets or sets the default maximum degree of parallelism for chunk operations.
/// </summary>
public static int DefaultMaxDegreeOfParallelism
{
get => _defaultMaxDegreeOfParallelism;
set
{
if (value < 1)
throw new ArgumentOutOfRangeException(nameof(value), "Parallelism must be at least 1");
_defaultMaxDegreeOfParallelism = value;
ChunkTaskScheduler.MaxDegreeOfParallelism = value;
}
}
/// <summary>
/// Gets or sets the timeout in milliseconds for chunk operations.
/// </summary>
public static int ChunkOperationTimeoutMs
{
get => _chunkOperationTimeout;
set
{
if (value < 0)
throw new ArgumentOutOfRangeException(nameof(value), "Timeout cannot be negative");
_chunkOperationTimeout = value;
}
}
/// <summary>
/// Gets or sets the interval in minutes for cleaning up unused locks.
/// </summary>
public static int LockCleanupIntervalMinutes
{
get => _lockCleanupInterval;
set
{
if (value < 1)
throw new ArgumentOutOfRangeException(nameof(value), "Cleanup interval must be at least 1 minute");
_lockCleanupInterval = value;
}
}
/// <summary>
/// Creates a cancellation token with the default timeout.
/// </summary>
public static CancellationToken CreateTimeoutToken(CancellationToken cancellationToken = default)
{
return ChunkTaskScheduler.CreateTimeoutToken(
TimeSpan.FromMilliseconds(_chunkOperationTimeout),
cancellationToken);
}
/// <summary>
/// Configures the system for high throughput (more parallelism, longer timeouts).
/// </summary>
public static void ConfigureForHighThroughput()
{
DefaultMaxDegreeOfParallelism = Math.Max(4, Environment.ProcessorCount * 2);
ChunkOperationTimeoutMs = 60000; // 1 minute
LockCleanupIntervalMinutes = 30;
}
/// <summary>
/// Configures the system for low latency (less parallelism, shorter timeouts).
/// </summary>
public static void ConfigureForLowLatency()
{
DefaultMaxDegreeOfParallelism = Math.Max(2, Environment.ProcessorCount / 2);
ChunkOperationTimeoutMs = 15000; // 15 seconds
LockCleanupIntervalMinutes = 5;
}
/// <summary>
/// Configures the system for memory efficiency (less parallelism, more aggressive cleanup).
/// </summary>
public static void ConfigureForMemoryEfficiency()
{
DefaultMaxDegreeOfParallelism = Math.Max(1, Environment.ProcessorCount / 4);
ChunkOperationTimeoutMs = 45000; // 45 seconds
LockCleanupIntervalMinutes = 2;
}
}
}

View File

@ -0,0 +1,273 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using AdvChkSys.Interfaces;
namespace AdvChkSys.Threading
{
/// <summary>
/// Provides diagnostic information about threading operations in the chunk system.
/// </summary>
public static class ChunkThreadingDiagnostics
{
// Track operation durations
private static readonly ConcurrentDictionary<string, List<long>> _operationDurations = new();
// Track lock contention
private static readonly ConcurrentDictionary<IChunk, int> _lockContentionCount = new();
// Track active operations
private static readonly ConcurrentDictionary<Guid, (string Operation, DateTime StartTime, IChunk Chunk)> _activeOperations = new();
// Lock for thread safety
private static readonly object _lock = new();
// Stopwatch for timing
private static readonly Stopwatch _stopwatch = Stopwatch.StartNew();
/// <summary>
/// Tracks the duration of an operation.
/// </summary>
/// <param name="operationName">The name of the operation</param>
/// <param name="action">The action to perform</param>
public static void TrackOperation(string operationName, Action action)
{
var operationId = Guid.NewGuid();
var startTime = _stopwatch.ElapsedMilliseconds;
try
{
action();
}
finally
{
var duration = _stopwatch.ElapsedMilliseconds - startTime;
lock (_lock)
{
if (!_operationDurations.TryGetValue(operationName, out var durations))
{
durations = new List<long>();
_operationDurations[operationName] = durations;
}
durations.Add(duration);
// Keep only the last 1000 durations
if (durations.Count > 1000)
{
durations.RemoveAt(0);
}
}
}
}
/// <summary>
/// Tracks the start of an operation on a chunk.
/// </summary>
/// <param name="operationName">The name of the operation</param>
/// <param name="chunk">The chunk being operated on</param>
/// <returns>An operation ID for tracking</returns>
public static Guid TrackOperationStart(string operationName, IChunk chunk)
{
var operationId = Guid.NewGuid();
_activeOperations[operationId] = (operationName, DateTime.UtcNow, chunk);
return operationId;
}
/// <summary>
/// Tracks the end of an operation.
/// </summary>
/// <param name="operationId">The operation ID from TrackOperationStart</param>
public static void TrackOperationEnd(Guid operationId)
{
if (_activeOperations.TryRemove(operationId, out var info))
{
var duration = (DateTime.UtcNow - info.StartTime).TotalMilliseconds;
lock (_lock)
{
if (!_operationDurations.TryGetValue(info.Operation, out var durations))
{
durations = new List<long>();
_operationDurations[info.Operation] = durations;
}
durations.Add((long)duration);
// Keep only the last 1000 durations
if (durations.Count > 1000)
{
durations.RemoveAt(0);
}
}
}
}
/// <summary>
/// Tracks lock contention on a chunk.
/// </summary>
/// <param name="chunk">The chunk that had lock contention</param>
public static void TrackLockContention(IChunk chunk)
{
_lockContentionCount.AddOrUpdate(chunk, 1, (_, count) => count + 1);
}
/// <summary>
/// Gets statistics about operation durations.
/// </summary>
public static Dictionary<string, (long Min, long Max, double Average, int Count)> GetOperationStatistics()
{
var result = new Dictionary<string, (long Min, long Max, double Average, int Count)>();
lock (_lock)
{
foreach (var kvp in _operationDurations)
{
var durations = kvp.Value;
if (durations.Count > 0)
{
result[kvp.Key] = (
durations.Min(),
durations.Max(),
durations.Average(),
durations.Count
);
}
}
}
return result;
}
/// <summary>
/// Gets the chunks with the most lock contention.
/// </summary>
/// <param name="topCount">Number of chunks to return</param>
public static List<(IChunk Chunk, int ContentionCount)> GetTopContentionChunks(int topCount = 10)
{
return _lockContentionCount
.OrderByDescending(kvp => kvp.Value)
.Take(topCount)
.Select(kvp => (kvp.Key, kvp.Value))
.ToList();
}
/// <summary>
/// Gets information about currently active operations.
/// </summary>
public static List<(string Operation, TimeSpan Duration, IChunk Chunk)> GetActiveOperations()
{
var now = DateTime.UtcNow;
return _activeOperations
.Select(kvp => (
kvp.Value.Operation,
now - kvp.Value.StartTime,
kvp.Value.Chunk
))
.OrderByDescending(x => x.Item2)
.ToList();
}
/// <summary>
/// Gets the total number of tracked operations.
/// </summary>
public static int GetTotalOperationCount()
{
lock (_lock)
{
return _operationDurations.Values.Sum(list => list.Count);
}
}
/// <summary>
/// Gets the number of active operations.
/// </summary>
public static int GetActiveOperationCount()
{
return _activeOperations.Count;
}
/// <summary>
/// Gets the total number of lock contentions.
/// </summary>
public static int GetTotalLockContentionCount()
{
return _lockContentionCount.Values.Sum();
}
/// <summary>
/// Clears all diagnostic data.
/// </summary>
public static void ClearDiagnosticData()
{
lock (_lock)
{
_operationDurations.Clear();
_lockContentionCount.Clear();
// Don't clear active operations as they're still in progress
}
}
/// <summary>
/// Gets a comprehensive diagnostic report.
/// </summary>
public static string GenerateDiagnosticReport()
{
var report = new System.Text.StringBuilder();
report.AppendLine("=== Chunk Threading Diagnostic Report ===");
report.AppendLine($"Generated: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss} UTC");
report.AppendLine();
// Operation statistics
report.AppendLine("== Operation Statistics ==");
var stats = GetOperationStatistics();
foreach (var kvp in stats.OrderByDescending(s => s.Value.Average))
{
report.AppendLine($"{kvp.Key}:");
report.AppendLine($" Count: {kvp.Value.Count}");
report.AppendLine($" Min: {kvp.Value.Min}ms");
report.AppendLine($" Max: {kvp.Value.Max}ms");
report.AppendLine($" Avg: {kvp.Value.Average:F2}ms");
}
report.AppendLine();
// Lock contention
report.AppendLine("== Lock Contention ==");
var contentions = GetTopContentionChunks(10);
foreach (var (chunk, count) in contentions)
{
report.AppendLine($"Chunk ({chunk.X}, {chunk.Y}): {count} contentions");
}
report.AppendLine($"Total contentions: {GetTotalLockContentionCount()}");
report.AppendLine();
// Active operations
report.AppendLine("== Active Operations ==");
var activeOps = GetActiveOperations();
foreach (var (operation, duration, chunk) in activeOps)
{
report.AppendLine($"{operation} on Chunk ({chunk.X}, {chunk.Y}): {duration.TotalMilliseconds:F2}ms");
}
report.AppendLine($"Total active operations: {GetActiveOperationCount()}");
return report.ToString();
}
/// <summary>
/// Logs a diagnostic event.
/// </summary>
/// <param name="eventName">The name of the event</param>
/// <param name="details">Additional details</param>
public static void LogEvent(string eventName, string details)
{
// This could be expanded to log to a file or other destination
Debug.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] {eventName}: {details}");
}
}
}

View File

@ -0,0 +1,380 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using AdvChkSys.Interfaces;
using AdvChkSys.Manager;
using AdvChkSys.Loading;
namespace AdvChkSys.Threading
{
/// <summary>
/// Provides extension methods for threading operations on chunks.
/// </summary>
public static class ChunkThreadingExtensions
{
/// <summary>
/// Processes all loaded chunks in parallel.
/// </summary>
/// <typeparam name="T">The type of chunk data</typeparam>
/// <param name="manager">The chunk manager</param>
/// <param name="action">The action to perform on each chunk</param>
/// <param name="maxDegreeOfParallelism">Maximum degree of parallelism (null for default)</param>
/// <param name="cancellationToken">Cancellation token</param>
public static Task ProcessAllChunksParallelAsync<T>(
this ChunkManager2D<T> manager,
Action<Chunk.Chunk2D<T>> action,
int? maxDegreeOfParallelism = null,
CancellationToken cancellationToken = default)
{
var chunks = manager.GetAllChunks().ToArray();
return ChunkTaskScheduler.RunBatchParallelAsync(
chunks.Select(c => new Action(() => action(c))).ToArray(),
maxDegreeOfParallelism,
cancellationToken);
}
/// <summary>
/// Processes all loaded chunks in parallel.
/// </summary>
/// <typeparam name="T">The type of chunk data</typeparam>
/// <param name="manager">The chunk manager</param>
/// <param name="action">The action to perform on each chunk</param>
/// <param name="maxDegreeOfParallelism">Maximum degree of parallelism (null for default)</param>
/// <param name="cancellationToken">Cancellation token</param>
public static Task ProcessAllChunksParallelAsync<T>(
this ChunkManager3D<T> manager,
Action<Chunk.Chunk3D<T>> action,
int? maxDegreeOfParallelism = null,
CancellationToken cancellationToken = default)
{
var chunks = manager.GetAllChunks().ToArray();
return ChunkTaskScheduler.RunBatchParallelAsync(
chunks.Select(c => new Action(() => action(c))).ToArray(),
maxDegreeOfParallelism,
cancellationToken);
}
/// <summary>
/// Processes chunks in a region in parallel.
/// </summary>
/// <typeparam name="T">The type of chunk data</typeparam>
/// <param name="manager">The chunk manager</param>
/// <param name="minX">Minimum X coordinate</param>
/// <param name="minY">Minimum Y coordinate</param>
/// <param name="maxX">Maximum X coordinate</param>
/// <param name="maxY">Maximum Y coordinate</param>
/// <param name="action">The action to perform on each chunk</param>
/// <param name="maxDegreeOfParallelism">Maximum degree of parallelism (null for default)</param>
/// <param name="cancellationToken">Cancellation token</param>
public static async Task ProcessChunksInRegionParallelAsync<T>(
this ChunkManager2D<T> manager,
int minX, int minY, int maxX, int maxY,
Action<Chunk.Chunk2D<T>> action,
int? maxDegreeOfParallelism = null,
CancellationToken cancellationToken = default)
{
var chunks = new List<Chunk.Chunk2D<T>>();
// Collect chunks in the region
for (int x = minX; x <= maxX; x++)
{
for (int y = minY; y <= maxY; y++)
{
var chunk = manager.GetChunk(x, y);
if (chunk != null)
{
chunks.Add(chunk);
}
}
}
// Process in parallel
await ChunkTaskScheduler.RunBatchParallelAsync(
chunks.Select(c => new Action(() => action(c))).ToArray(),
maxDegreeOfParallelism,
cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Processes chunks in a region in parallel.
/// </summary>
/// <typeparam name="T">The type of chunk data</typeparam>
/// <param name="manager">The chunk manager</param>
/// <param name="minX">Minimum X coordinate</param>
/// <param name="minY">Minimum Y coordinate</param>
/// <param name="minZ">Minimum Z coordinate</param>
/// <param name="maxX">Maximum X coordinate</param>
/// <param name="maxY">Maximum Y coordinate</param>
/// <param name="maxZ">Maximum Z coordinate</param>
/// <param name="action">The action to perform on each chunk</param>
/// <param name="maxDegreeOfParallelism">Maximum degree of parallelism (null for default)</param>
/// <param name="cancellationToken">Cancellation token</param>
public static async Task ProcessChunksInRegionParallelAsync<T>(
this ChunkManager3D<T> manager,
int minX, int minY, int minZ, int maxX, int maxY, int maxZ,
Action<Chunk.Chunk3D<T>> action,
int? maxDegreeOfParallelism = null,
CancellationToken cancellationToken = default)
{
var chunks = new List<Chunk.Chunk3D<T>>();
// Collect chunks in the region
for (int x = minX; x <= maxX; x++)
{
for (int y = minY; y <= maxY; y++)
{
for (int z = minZ; z <= maxZ; z++)
{
var chunk = manager.GetChunk(x, y, z);
if (chunk != null)
{
chunks.Add(chunk);
}
}
}
}
// Process in parallel
await ChunkTaskScheduler.RunBatchParallelAsync(
chunks.Select(c => new Action(() => action(c))).ToArray(),
maxDegreeOfParallelism,
cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Loads chunks in a region asynchronously with a specified priority.
/// </summary>
/// <typeparam name="T">The type of chunk data</typeparam>
/// <param name="manager">The chunk manager</param>
/// <param name="minX">Minimum X coordinate</param>
/// <param name="minY">Minimum Y coordinate</param>
/// <param name="maxX">Maximum X coordinate</param>
/// <param name="maxY">Maximum Y coordinate</param>
/// <param name="width">Chunk width</param>
/// <param name="height">Chunk height</param>
/// <param name="priority">Loading priority</param>
/// <param name="cancellationToken">Cancellation token</param>
public static async Task LoadChunksInRegionAsync<T>(
this ChunkManager2D<T> manager,
int minX, int minY, int maxX, int maxY,
int width, int height,
ChunkLoadingPriority.Priority priority = ChunkLoadingPriority.Priority.Normal,
CancellationToken cancellationToken = default)
{
// Create a loading priority system if not already integrated
var loadingPriority = new ChunkLoadingPriority();
// Enqueue all chunks in the region
var tasks = new List<Task>();
for (int x = minX; x <= maxX; x++)
{
for (int y = minY; y <= maxY; y++)
{
if (cancellationToken.IsCancellationRequested)
break;
// Skip if already loaded
if (manager.IsChunkLoaded(x, y))
continue;
// Enqueue the request
var request = loadingPriority.EnqueueRequest(x, y, width, height, priority);
// Create a task that completes when the chunk is loaded
var tcs = new TaskCompletionSource<bool>();
void Handler(object? sender, ChunkLoadingPriority.ChunkLoadRequest completedRequest)
{
if (completedRequest.RequestId == request.RequestId)
{
loadingPriority.RequestCompleted -= Handler;
tcs.TrySetResult(true);
}
}
loadingPriority.RequestCompleted += Handler;
// Add timeout to prevent indefinite waiting
_ = Task.Delay(30000, cancellationToken).ContinueWith(t =>
{
loadingPriority.RequestCompleted -= Handler;
if (!t.IsCanceled)
tcs.TrySetResult(false);
}, cancellationToken);
tasks.Add(tcs.Task);
}
}
// Wait for all chunks to be loaded
await Task.WhenAll(tasks).ConfigureAwait(false);
// Clean up
await loadingPriority.ShutdownAsync().ConfigureAwait(false);
}
/// <summary>
/// Loads chunks in a region asynchronously with a specified priority.
/// </summary>
/// <typeparam name="T">The type of chunk data</typeparam>
/// <param name="manager">The chunk manager</param>
/// <param name="minX">Minimum X coordinate</param>
/// <param name="minY">Minimum Y coordinate</param>
/// <param name="minZ">Minimum Z coordinate</param>
/// <param name="maxX">Maximum X coordinate</param>
/// <param name="maxY">Maximum Y coordinate</param>
/// <param name="maxZ">Maximum Z coordinate</param>
/// <param name="width">Chunk width</param>
/// <param name="height">Chunk height</param>
/// <param name="depth">Chunk depth</param>
/// <param name="priority">Loading priority</param>
/// <param name="cancellationToken">Cancellation token</param>
public static async Task LoadChunksInRegionAsync<T>(
this ChunkManager3D<T> manager,
int minX, int minY, int minZ, int maxX, int maxY, int maxZ,
int width, int height, int depth,
ChunkLoadingPriority.Priority priority = ChunkLoadingPriority.Priority.Normal,
CancellationToken cancellationToken = default)
{
// Create a loading priority system if not already integrated
var loadingPriority = new ChunkLoadingPriority();
// Enqueue all chunks in the region
var tasks = new List<Task>();
for (int x = minX; x <= maxX; x++)
{
for (int y = minY; y <= maxY; y++)
{
for (int z = minZ; z <= maxZ; z++)
{
if (cancellationToken.IsCancellationRequested)
break;
// Skip if already loaded
if (manager.IsChunkLoaded(x, y, z))
continue;
// Enqueue the request
var request = loadingPriority.EnqueueRequest(x, y, z, width, height, depth, priority);
// Create a task that completes when the chunk is loaded
var tcs = new TaskCompletionSource<bool>();
void Handler(object? sender, ChunkLoadingPriority.ChunkLoadRequest completedRequest)
{
if (completedRequest.RequestId == request.RequestId)
{
loadingPriority.RequestCompleted -= Handler;
tcs.TrySetResult(true);
}
}
loadingPriority.RequestCompleted += Handler;
// Add timeout to prevent indefinite waiting
_ = Task.Delay(30000, cancellationToken).ContinueWith(t =>
{
loadingPriority.RequestCompleted -= Handler;
if (!t.IsCanceled)
tcs.TrySetResult(false);
}, cancellationToken);
tasks.Add(tcs.Task);
}
}
}
// Wait for all chunks to be loaded
await Task.WhenAll(tasks).ConfigureAwait(false);
// Clean up
await loadingPriority.ShutdownAsync().ConfigureAwait(false);
}
/// <summary>
/// Unloads chunks outside a specified region asynchronously.
/// </summary>
/// <typeparam name="T">The type of chunk data</typeparam>
/// <param name="manager">The chunk manager</param>
/// <param name="minX">Minimum X coordinate to keep</param>
/// <param name="minY">Minimum Y coordinate to keep</param>
/// <param name="maxX">Maximum X coordinate to keep</param>
/// <param name="maxY">Maximum Y coordinate to keep</param>
/// <param name="maxDegreeOfParallelism">Maximum degree of parallelism (null for default)</param>
/// <param name="cancellationToken">Cancellation token</param>
public static async Task UnloadChunksOutsideRegionAsync<T>(
this ChunkManager2D<T> manager,
int minX, int minY, int maxX, int maxY,
int? maxDegreeOfParallelism = null,
CancellationToken cancellationToken = default)
{
// Get all loaded chunks
var chunks = manager.GetAllChunks().ToArray();
// Filter chunks outside the region
var chunksToUnload = chunks.Where(c =>
c.X < minX || c.X > maxX || c.Y < minY || c.Y > maxY).ToArray();
// Unload in parallel
var options = new ParallelOptions
{
MaxDegreeOfParallelism = maxDegreeOfParallelism ?? Environment.ProcessorCount,
CancellationToken = cancellationToken
};
await Task.Run(() =>
{
Parallel.ForEach(chunksToUnload, options, chunk =>
{
manager.UnloadChunk(chunk.X, chunk.Y);
});
}, cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Unloads chunks outside a specified region asynchronously.
/// </summary>
/// <typeparam name="T">The type of chunk data</typeparam>
/// <param name="manager">The chunk manager</param>
/// <param name="minX">Minimum X coordinate to keep</param>
/// <param name="minY">Minimum Y coordinate to keep</param>
/// <param name="minZ">Minimum Z coordinate to keep</param>
/// <param name="maxX">Maximum X coordinate to keep</param>
/// <param name="maxY">Maximum Y coordinate to keep</param>
/// <param name="maxZ">Maximum Z coordinate to keep</param>
/// <param name="maxDegreeOfParallelism">Maximum degree of parallelism (null for default)</param>
/// <param name="cancellationToken">Cancellation token</param>
public static async Task UnloadChunksOutsideRegionAsync<T>(
this ChunkManager3D<T> manager,
int minX, int minY, int minZ, int maxX, int maxY, int maxZ,
int? maxDegreeOfParallelism = null,
CancellationToken cancellationToken = default)
{
// Get all loaded chunks
var chunks = manager.GetAllChunks().ToArray();
// Filter chunks outside the region
var chunksToUnload = chunks.Where(c =>
c.X < minX || c.X > maxX ||
c.Y < minY || c.Y > maxY ||
c.Z < minZ || c.Z > maxZ).ToArray();
// Unload in parallel
var options = new ParallelOptions
{
MaxDegreeOfParallelism = maxDegreeOfParallelism ?? Environment.ProcessorCount,
CancellationToken = cancellationToken
};
await Task.Run(() =>
{
Parallel.ForEach(chunksToUnload, options, chunk =>
{
manager.UnloadChunk(chunk.X, chunk.Y, chunk.Z);
});
}, cancellationToken).ConfigureAwait(false);
}
}
}

View File

@ -0,0 +1,223 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using AdvChkSys.Chunk;
using AdvChkSys.Dependencies;
using AdvChkSys.Interfaces;
using AdvChkSys.Manager;
namespace AdvChkSys.Threading
{
/// <summary>
/// Provides additional extension methods for threading operations on chunks.
/// </summary>
public static class ChunkThreadingExtensions2
{
/// <summary>
/// Processes chunks in a spiral pattern from the center outward.
/// </summary>
/// <typeparam name="T">The type of chunk data</typeparam>
/// <param name="manager">The chunk manager</param>
/// <param name="centerX">Center X coordinate</param>
/// <param name="centerY">Center Y coordinate</param>
/// <param name="radius">Radius in chunks</param>
/// <param name="processor">The function to process each chunk</param>
/// <param name="maxDegreeOfParallelism">Maximum degree of parallelism (null for default)</param>
/// <param name="cancellationToken">Cancellation token</param>
public static async Task ProcessChunksSpiralAsync<T>(
this ChunkManager2D<T> manager,
int centerX, int centerY, int radius,
Func<Chunk2D<T>, Task> processor,
int? maxDegreeOfParallelism = null,
CancellationToken cancellationToken = default)
{
// Generate spiral coordinates
var coordinates = GenerateSpiralCoordinates(centerX, centerY, radius).ToArray();
// Process in batches to maintain the spiral ordering while still using parallelism
int batchSize = Math.Max(1, (int)Math.Sqrt(coordinates.Length));
int batchCount = (coordinates.Length + batchSize - 1) / batchSize;
for (int i = 0; i < batchCount; i++)
{
if (cancellationToken.IsCancellationRequested)
break;
var batchCoords = coordinates
.Skip(i * batchSize)
.Take(batchSize)
.ToArray();
var tasks = new List<Task>();
foreach (var (x, y) in batchCoords)
{
var chunk = manager.GetChunk(x, y);
if (chunk != null)
{
tasks.Add(processor(chunk));
}
}
if (tasks.Count > 0)
{
await Task.WhenAll(tasks).ConfigureAwait(false);
}
}
}
/// <summary>
/// Generates coordinates in a spiral pattern from the center outward.
/// </summary>
private static IEnumerable<(int X, int Y)> GenerateSpiralCoordinates(int centerX, int centerY, int radius)
{
// Start with the center
yield return (centerX, centerY);
// Spiral outward
for (int layer = 1; layer <= radius; layer++)
{
// Top edge (moving right)
for (int x = centerX - layer + 1; x <= centerX + layer; x++)
{
yield return (x, centerY - layer);
}
// Right edge (moving down)
for (int y = centerY - layer + 1; y <= centerY + layer; y++)
{
yield return (centerX + layer, y);
}
// Bottom edge (moving left)
for (int x = centerX + layer - 1; x >= centerX - layer; x--)
{
yield return (x, centerY + layer);
}
// Left edge (moving up)
for (int y = centerY + layer - 1; y >= centerY - layer; y--)
{
yield return (centerX - layer, y);
}
}
}
/// <summary>
/// Processes chunks with dependency awareness.
/// </summary>
/// <typeparam name="T">The type of chunk data</typeparam>
/// <param name="manager">The chunk manager</param>
/// <param name="dependencyTracker">The dependency tracker</param>
/// <param name="processor">The function to process each chunk</param>
/// <param name="maxDegreeOfParallelism">Maximum degree of parallelism (null for default)</param>
/// <param name="cancellationToken">Cancellation token</param>
public static async Task ProcessChunksWithDependenciesAsync<T>(
this ChunkManager2D<T> manager,
ChunkDependencyTracker dependencyTracker,
Func<Chunk2D<T>, Task> processor,
int? maxDegreeOfParallelism = null,
CancellationToken cancellationToken = default)
{
var chunks = manager.GetAllChunks().ToArray();
await ChunkParallelProcessor.ProcessChunksWithDependenciesAsync(
chunks,
chunk => processor((Chunk2D<T>)chunk),
chunk => dependencyTracker.GetDependencies(chunk),
maxDegreeOfParallelism,
cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Processes chunks with dependency awareness.
/// </summary>
/// <typeparam name="T">The type of chunk data</typeparam>
/// <param name="manager">The chunk manager</param>
/// <param name="dependencyTracker">The dependency tracker</param>
/// <param name="processor">The function to process each chunk</param>
/// <param name="maxDegreeOfParallelism">Maximum degree of parallelism (null for default)</param>
/// <param name="cancellationToken">Cancellation token</param>
public static async Task ProcessChunksWithDependenciesAsync<T>(
this ChunkManager3D<T> manager,
ChunkDependencyTracker dependencyTracker,
Func<Chunk3D<T>, Task> processor,
int? maxDegreeOfParallelism = null,
CancellationToken cancellationToken = default)
{
var chunks = manager.GetAllChunks().ToArray();
await ChunkParallelProcessor.ProcessChunksWithDependenciesAsync(
chunks,
chunk => processor((Chunk3D<T>)chunk),
chunk => dependencyTracker.GetDependencies(chunk),
maxDegreeOfParallelism,
cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Processes chunks with thread safety.
/// </summary>
/// <typeparam name="T">The type of chunk data</typeparam>
/// <param name="manager">The chunk manager</param>
/// <param name="processor">The function to process each chunk</param>
/// <param name="maxDegreeOfParallelism">Maximum degree of parallelism (null for default)</param>
/// <param name="cancellationToken">Cancellation token</param>
public static async Task ProcessChunksSafelyAsync<T>(
this ChunkManager2D<T> manager,
Func<Chunk2D<T>, Task> processor,
int? maxDegreeOfParallelism = null,
CancellationToken cancellationToken = default)
{
var chunks = manager.GetAllChunks().ToArray();
var threadingManager = ChunkThreadingManager.Instance;
var options = new ParallelOptions
{
MaxDegreeOfParallelism = maxDegreeOfParallelism ?? ChunkThreadingConfiguration.DefaultMaxDegreeOfParallelism,
CancellationToken = cancellationToken
};
await Task.Run(() => Parallel.ForEach(chunks, options, async chunk =>
{
await threadingManager.WithLockAsync(chunk, async () =>
{
await processor(chunk).ConfigureAwait(false);
}, cancellationToken).ConfigureAwait(false);
}), cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Processes chunks with thread safety.
/// </summary>
/// <typeparam name="T">The type of chunk data</typeparam>
/// <param name="manager">The chunk manager</param>
/// <param name="processor">The function to process each chunk</param>
/// <param name="maxDegreeOfParallelism">Maximum degree of parallelism (null for default)</param>
/// <param name="cancellationToken">Cancellation token</param>
public static async Task ProcessChunksSafelyAsync<T>(
this ChunkManager3D<T> manager,
Func<Chunk3D<T>, Task> processor,
int? maxDegreeOfParallelism = null,
CancellationToken cancellationToken = default)
{
var chunks = manager.GetAllChunks().ToArray();
var threadingManager = ChunkThreadingManager.Instance;
var options = new ParallelOptions
{
MaxDegreeOfParallelism = maxDegreeOfParallelism ?? ChunkThreadingConfiguration.DefaultMaxDegreeOfParallelism,
CancellationToken = cancellationToken
};
await Task.Run(() => Parallel.ForEach(chunks, options, async chunk =>
{
await threadingManager.WithLockAsync(chunk, async () =>
{
await processor(chunk).ConfigureAwait(false);
}, cancellationToken).ConfigureAwait(false);
}), cancellationToken).ConfigureAwait(false);
}
}
}

View File

@ -0,0 +1,281 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using AdvChkSys.Interfaces;
namespace AdvChkSys.Threading
{
/// <summary>
/// Provides a centralized manager for all threading operations in the chunk system.
/// </summary>
public class ChunkThreadingManager : IDisposable
{
private static ChunkThreadingManager? _instance;
private static readonly object _instanceLock = new();
// Thread safety utilities
private readonly ChunkThreadSafetyManager _threadSafety;
private readonly ChunkAsyncLock _asyncLock;
// Operation queue
private readonly ChunkOperationQueue _operationQueue;
// Cancellation for shutdown
private readonly CancellationTokenSource _shutdownCts = new();
/// <summary>
/// Gets the singleton instance of the ChunkThreadingManager.
/// </summary>
public static ChunkThreadingManager Instance
{
get
{
if (_instance == null)
{
lock (_instanceLock)
{
_instance ??= new ChunkThreadingManager();
}
}
return _instance;
}
}
/// <summary>
/// Initializes a new instance of the ChunkThreadingManager class.
/// </summary>
private ChunkThreadingManager()
{
_threadSafety = new ChunkThreadSafetyManager(
ChunkThreadingConfiguration.LockCleanupIntervalMinutes);
_asyncLock = new ChunkAsyncLock(
ChunkThreadingConfiguration.LockCleanupIntervalMinutes);
_operationQueue = new ChunkOperationQueue(
ChunkThreadingConfiguration.DefaultMaxDegreeOfParallelism);
}
/// <summary>
/// Gets the thread safety manager for synchronous locking.
/// </summary>
public ChunkThreadSafetyManager ThreadSafety => _threadSafety;
/// <summary>
/// Gets the async lock manager for asynchronous locking.
/// </summary>
public ChunkAsyncLock AsyncLock => _asyncLock;
/// <summary>
/// Gets the operation queue for sequential chunk operations.
/// </summary>
public ChunkOperationQueue OperationQueue => _operationQueue;
/// <summary>
/// Acquires an exclusive lock on a chunk.
/// </summary>
/// <param name="chunk">The chunk to lock</param>
/// <returns>A disposable that releases the lock when disposed</returns>
public IDisposable AcquireLock(IChunk chunk)
{
return _threadSafety.AcquireLock(chunk);
}
/// <summary>
/// Acquires a read lock on a chunk.
/// </summary>
/// <param name="chunk">The chunk to lock</param>
/// <returns>A disposable that releases the lock when disposed</returns>
public IDisposable AcquireReadLock(IChunk chunk)
{
return _threadSafety.AcquireReadLock(chunk);
}
/// <summary>
/// Acquires a write lock on a chunk.
/// </summary>
/// <param name="chunk">The chunk to lock</param>
/// <returns>A disposable that releases the lock when disposed</returns>
public IDisposable AcquireWriteLock(IChunk chunk)
{
return _threadSafety.AcquireWriteLock(chunk);
}
/// <summary>
/// Acquires a lock on a chunk asynchronously.
/// </summary>
/// <param name="chunk">The chunk to lock</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>A disposable that releases the lock when disposed</returns>
public Task<IDisposable> LockAsync(IChunk chunk, CancellationToken cancellationToken = default)
{
return _asyncLock.LockAsync(chunk, cancellationToken);
}
/// <summary>
/// Enqueues an operation to be performed on a chunk.
/// </summary>
/// <param name="chunk">The chunk to operate on</param>
/// <param name="operation">The operation to perform</param>
/// <returns>A task that completes when the operation is done</returns>
public Task EnqueueOperationAsync(IChunk chunk, Func<Task> operation)
{
return _operationQueue.EnqueueOperationAsync(chunk, operation);
}
/// <summary>
/// Runs an action with a lock on the chunk.
/// </summary>
/// <param name="chunk">The chunk to lock</param>
/// <param name="action">The action to perform</param>
public void WithLock(IChunk chunk, Action action)
{
using var lockObj = AcquireLock(chunk);
action();
}
/// <summary>
/// Runs a function with a lock on the chunk and returns the result.
/// </summary>
/// <typeparam name="T">The type of result</typeparam>
/// <param name="chunk">The chunk to lock</param>
/// <param name="func">The function to perform</param>
/// <returns>The result of the function</returns>
public T WithLock<T>(IChunk chunk, Func<T> func)
{
using var lockObj = AcquireLock(chunk);
return func();
}
/// <summary>
/// Runs an async action with a lock on the chunk.
/// </summary>
/// <param name="chunk">The chunk to lock</param>
/// <param name="action">The async action to perform</param>
/// <param name="cancellationToken">Cancellation token</param>
public async Task WithLockAsync(IChunk chunk, Func<Task> action, CancellationToken cancellationToken = default)
{
using var lockObj = await LockAsync(chunk, cancellationToken).ConfigureAwait(false);
await action().ConfigureAwait(false);
}
/// <summary>
/// Runs an async function with a lock on the chunk and returns the result.
/// </summary>
/// <typeparam name="T">The type of result</typeparam>
/// <param name="chunk">The chunk to lock</param>
/// <param name="func">The async function to perform</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>The result of the function</returns>
public async Task<T> WithLockAsync<T>(IChunk chunk, Func<Task<T>> func, CancellationToken cancellationToken = default)
{
using var lockObj = await LockAsync(chunk, cancellationToken).ConfigureAwait(false);
return await func().ConfigureAwait(false);
}
/// <summary>
/// Runs an action on multiple chunks with proper locking to avoid deadlocks.
/// </summary>
/// <param name="chunks">The chunks to lock, in order</param>
/// <param name="action">The action to perform</param>
public void WithMultiLock(IChunk[] chunks, Action action)
{
// Sort chunks by ID to prevent deadlocks
var sortedChunks = SortChunksById(chunks);
// Acquire locks in order
var locks = new IDisposable[sortedChunks.Length];
try
{
for (int i = 0; i < sortedChunks.Length; i++)
{
locks[i] = AcquireLock(sortedChunks[i]);
}
// Execute the action
action();
}
finally
{
// Release locks in reverse order
for (int i = locks.Length - 1; i >= 0; i--)
{
locks[i]?.Dispose();
}
}
}
/// <summary>
/// Runs an async action on multiple chunks with proper locking to avoid deadlocks.
/// </summary>
/// <param name="chunks">The chunks to lock, in order</param>
/// <param name="action">The async action to perform</param>
/// <param name="cancellationToken">Cancellation token</param>
public async Task WithMultiLockAsync(IChunk[] chunks, Func<Task> action, CancellationToken cancellationToken = default)
{
// Sort chunks by ID to prevent deadlocks
var sortedChunks = SortChunksById(chunks);
// Acquire locks in order
var locks = new IDisposable[sortedChunks.Length];
try
{
for (int i = 0; i < sortedChunks.Length; i++)
{
locks[i] = await LockAsync(sortedChunks[i], cancellationToken).ConfigureAwait(false);
}
// Execute the action
await action().ConfigureAwait(false);
}
finally
{
// Release locks in reverse order
for (int i = locks.Length - 1; i >= 0; i--)
{
locks[i]?.Dispose();
}
}
}
/// <summary>
/// Sorts chunks by ID to prevent deadlocks when acquiring multiple locks.
/// </summary>
private IChunk[] SortChunksById(IChunk[] chunks)
{
// Use System.Linq for OrderBy
return chunks.OrderBy(c => c.GetHashCode()).ToArray();
}
/// <summary>
/// Disposes all resources.
/// </summary>
public void Dispose()
{
_shutdownCts.Cancel();
// Shutdown operation queue
_operationQueue.ShutdownAsync().GetAwaiter().GetResult();
// Dispose thread safety managers
(_threadSafety as IDisposable)?.Dispose();
(_asyncLock as IDisposable)?.Dispose();
_shutdownCts.Dispose();
}
/// <summary>
/// Resets the singleton instance (for testing).
/// </summary>
internal static void ResetInstance()
{
lock (_instanceLock)
{
_instance?.Dispose();
_instance = null;
}
}
}
}

View File

@ -0,0 +1,257 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace AdvChkSys.Threading
{
/// <summary>
/// Monitors and reports on threading performance in the chunk system.
/// </summary>
public class ChunkThreadingPerformanceMonitor : IDisposable
{
// Performance metrics
private readonly ConcurrentDictionary<string, PerformanceMetric> _metrics = new();
// Sampling timer
private readonly Timer _samplingTimer;
// Sampling interval
private readonly TimeSpan _samplingInterval;
// Maximum history to keep
private readonly int _maxHistory;
// Whether monitoring is enabled
private bool _isEnabled;
/// <summary>
/// Initializes a new instance of the ChunkThreadingPerformanceMonitor class.
/// </summary>
/// <param name="samplingIntervalMs">Sampling interval in milliseconds</param>
/// <param name="maxHistory">Maximum number of samples to keep</param>
public ChunkThreadingPerformanceMonitor(int samplingIntervalMs = 1000, int maxHistory = 60)
{
_samplingInterval = TimeSpan.FromMilliseconds(samplingIntervalMs);
_maxHistory = maxHistory;
_samplingTimer = new Timer(SamplePerformance, null, Timeout.Infinite, Timeout.Infinite);
}
/// <summary>
/// Starts monitoring performance.
/// </summary>
public void Start()
{
if (!_isEnabled)
{
_isEnabled = true;
_samplingTimer.Change(TimeSpan.Zero, _samplingInterval);
}
}
/// <summary>
/// Stops monitoring performance.
/// </summary>
public void Stop()
{
if (_isEnabled)
{
_isEnabled = false;
_samplingTimer.Change(Timeout.Infinite, Timeout.Infinite);
}
}
/// <summary>
/// Samples performance metrics.
/// </summary>
private void SamplePerformance(object? state)
{
if (!_isEnabled)
return;
// Sample thread pool information
ThreadPool.GetAvailableThreads(out int workerThreads, out int completionPortThreads);
ThreadPool.GetMaxThreads(out int maxWorkerThreads, out int maxCompletionPortThreads);
// Calculate thread pool usage
double workerThreadUsage = 1.0 - ((double)workerThreads / maxWorkerThreads);
double ioThreadUsage = 1.0 - ((double)completionPortThreads / maxCompletionPortThreads);
// Update metrics
UpdateMetric("ThreadPool.WorkerThreadUsage", workerThreadUsage);
UpdateMetric("ThreadPool.IOThreadUsage", ioThreadUsage);
// Sample operation queue information
var operationQueue = ChunkThreadingManager.Instance.OperationQueue;
UpdateMetric("OperationQueue.PendingOperations", operationQueue.PendingOperationCount);
UpdateMetric("OperationQueue.ActiveOperations", operationQueue.ActiveOperationCount);
// Sample diagnostics information
UpdateMetric("Diagnostics.ActiveOperations", ChunkThreadingDiagnostics.GetActiveOperationCount());
UpdateMetric("Diagnostics.LockContentions", ChunkThreadingDiagnostics.GetTotalLockContentionCount());
// Sample process information
var process = Process.GetCurrentProcess();
// Fix: Use process.TotalProcessorTime and calculate uptime manually
TimeSpan upTime = DateTime.Now - process.StartTime;
UpdateMetric("Process.CPU", process.TotalProcessorTime.TotalMilliseconds / Environment.ProcessorCount / upTime.TotalMilliseconds);
UpdateMetric("Process.Memory", process.WorkingSet64 / 1024.0 / 1024.0); // MB
UpdateMetric("Process.Threads", process.Threads.Count);
}
/// <summary>
/// Updates a performance metric.
/// </summary>
private void UpdateMetric(string name, double value)
{
if (!_metrics.TryGetValue(name, out var metric))
{
metric = new PerformanceMetric(_maxHistory);
_metrics[name] = metric;
}
metric.AddSample(value);
}
/// <summary>
/// Gets a performance report.
/// </summary>
public Dictionary<string, PerformanceReport> GetPerformanceReport()
{
var report = new Dictionary<string, PerformanceReport>();
foreach (var kvp in _metrics)
{
report[kvp.Key] = kvp.Value.GetReport();
}
return report;
}
/// <summary>
/// Gets a formatted performance report.
/// </summary>
public string GetFormattedReport()
{
var report = new System.Text.StringBuilder();
report.AppendLine("=== Chunk Threading Performance Report ===");
report.AppendLine($"Generated: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss} UTC");
report.AppendLine();
var metrics = _metrics.OrderBy(m => m.Key).ToList();
foreach (var kvp in metrics)
{
var metricReport = kvp.Value.GetReport();
report.AppendLine($"{kvp.Key}:");
report.AppendLine($" Current: {metricReport.Current:F3}");
report.AppendLine($" Average: {metricReport.Average:F3}");
report.AppendLine($" Min: {metricReport.Min:F3}");
report.AppendLine($" Max: {metricReport.Max:F3}");
report.AppendLine();
}
return report.ToString();
}
/// <summary>
/// Disposes resources.
/// </summary>
public void Dispose()
{
Stop();
_samplingTimer.Dispose();
}
/// <summary>
/// Represents a performance metric with history.
/// </summary>
private class PerformanceMetric
{
private readonly Queue<double> _samples;
private readonly int _maxSamples;
public PerformanceMetric(int maxSamples)
{
_maxSamples = maxSamples;
_samples = new Queue<double>(maxSamples);
}
public void AddSample(double value)
{
lock (_samples)
{
_samples.Enqueue(value);
while (_samples.Count > _maxSamples)
{
_samples.Dequeue();
}
}
}
public PerformanceReport GetReport()
{
lock (_samples)
{
if (_samples.Count == 0)
{
return new PerformanceReport(0, 0, 0, 0, Array.Empty<double>());
}
var samplesArray = _samples.ToArray();
return new PerformanceReport(
samplesArray.Last(),
samplesArray.Average(),
samplesArray.Min(),
samplesArray.Max(),
samplesArray
);
}
}
}
/// <summary>
/// Represents a performance report.
/// </summary>
public class PerformanceReport
{
/// <summary>
/// The current value.
/// </summary>
public double Current { get; }
/// <summary>
/// The average value.
/// </summary>
public double Average { get; }
/// <summary>
/// The minimum value.
/// </summary>
public double Min { get; }
/// <summary>
/// The maximum value.
/// </summary>
public double Max { get; }
/// <summary>
/// The history of values.
/// </summary>
public double[] History { get; }
public PerformanceReport(double current, double average, double min, double max, double[] history)
{
Current = current;
Average = average;
Min = min;
Max = max;
History = history;
}
}
}
}

View File

@ -0,0 +1,170 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace AdvChkSys.Threading
{
/// <summary>
/// Provides a task scheduler that ensures a maximum concurrency level while
/// running on top of the ThreadPool.
/// </summary>
internal class LimitedConcurrencyTaskScheduler : TaskScheduler
{
// Indicates whether the current thread is processing work items.
[ThreadStatic]
private static bool _currentThreadIsProcessingItems;
// The list of tasks to be executed
private readonly LinkedList<Task> _tasks = new LinkedList<Task>();
// The maximum concurrency level allowed by this scheduler.
private int _maximumConcurrencyLevel;
// Indicates whether the scheduler is currently processing work items.
private int _delegatesQueuedOrRunning;
/// <summary>
/// Creates a new instance with the specified degree of parallelism.
/// </summary>
/// <param name="maximumConcurrencyLevel">The maximum degree of parallelism</param>
public LimitedConcurrencyTaskScheduler(int maximumConcurrencyLevel)
{
if (maximumConcurrencyLevel < 1)
throw new ArgumentOutOfRangeException(nameof(maximumConcurrencyLevel));
_maximumConcurrencyLevel = maximumConcurrencyLevel;
}
/// <summary>
/// Gets the maximum concurrency level supported by this scheduler.
/// </summary>
public override int MaximumConcurrencyLevel => _maximumConcurrencyLevel;
/// <summary>
/// Sets the maximum concurrency level for this scheduler.
/// </summary>
/// <param name="value">The new maximum concurrency level</param>
public void SetMaximumConcurrencyLevel(int value)
{
if (value < 1)
throw new ArgumentOutOfRangeException(nameof(value));
_maximumConcurrencyLevel = value;
}
/// <summary>
/// Queues a task to the scheduler.
/// </summary>
protected sealed override void QueueTask(Task task)
{
// Add the task to the list of tasks to be processed.
lock (_tasks)
{
_tasks.AddLast(task);
}
// If there aren't enough delegates currently queued or running to process
// tasks, schedule another.
if (Interlocked.Increment(ref _delegatesQueuedOrRunning) <= _maximumConcurrencyLevel)
{
ThreadPool.QueueUserWorkItem(ProcessQueuedTasks);
}
else
{
Interlocked.Decrement(ref _delegatesQueuedOrRunning);
}
}
/// <summary>
/// Attempts to execute the specified task on the current thread.
/// </summary>
protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
// If this thread isn't already processing a task, we don't support inlining
if (!_currentThreadIsProcessingItems)
return false;
// If the task was previously queued, remove it from the queue
if (taskWasPreviouslyQueued)
{
// Try to remove the task from the queue.
lock (_tasks)
{
if (_tasks.Contains(task))
{
_tasks.Remove(task);
}
else
{
// The task isn't in the queue anymore, so it was probably already executed
return false;
}
}
}
// Try to execute the task.
bool result = TryExecuteTask(task);
return result;
}
/// <summary>
/// Attempts to remove a previously scheduled task from the scheduler.
/// </summary>
protected sealed override bool TryDequeue(Task task)
{
lock (_tasks)
{
return _tasks.Remove(task);
}
}
/// <summary>
/// Gets an enumerable of the tasks currently scheduled on this scheduler.
/// </summary>
protected sealed override IEnumerable<Task> GetScheduledTasks()
{
lock (_tasks)
{
return new List<Task>(_tasks);
}
}
/// <summary>
/// Processes tasks in the queue.
/// </summary>
private void ProcessQueuedTasks(object? state)
{
// This thread is now processing work items.
_currentThreadIsProcessingItems = true;
try
{
// Process all available items in the queue.
while (true)
{
Task? task = null;
lock (_tasks)
{
// When there are no more items to be processed,
// note that we're done processing, and get out.
if (_tasks.Count == 0)
{
Interlocked.Decrement(ref _delegatesQueuedOrRunning);
break;
}
// Get the next item from the queue
task = _tasks.First!.Value;
_tasks.RemoveFirst();
}
// Execute the task we pulled out of the queue
TryExecuteTask(task);
}
}
finally
{
// We're done processing items on the current thread
_currentThreadIsProcessingItems = false;
}
}
}
}