using System.Collections.Concurrent;
using Journal.Core.Dtos;
using Journal.Core.Services.Speech;
using NAudio.Wave;
using Whisper.net;
using Whisper.net.Ggml;

namespace Journal.Sidecar;

/// <summary>
/// Speech-to-text service that captures microphone audio with NAudio, slices it into
/// fixed-duration PCM chunks, and transcribes each chunk on a background worker using a
/// locally stored Whisper model. Finished transcripts are buffered in memory and handed
/// out through <see cref="PollAsync"/>.
/// </summary>
/// <remarks>
/// Locking: <c>_sync</c> guards lifecycle state (status, warning, capture resources);
/// <c>_segmentLock</c> guards <c>_segmentBuffer</c>, which is touched by the audio callback
/// and the flush timer. When both are needed, <c>_sync</c> is always taken first.
/// </remarks>
public sealed class LocalWhisperS2TService : IS2TService, IDisposable
{
    private const int SampleRate = 16000;
    private const int Bits = 16;
    private const int Channels = 1;

    // Interval at which buffered audio is cut into a chunk and queued for transcription.
    private const int ChunkMs = 2000;

    // Upper bound on transcripts kept for polling; the oldest are dropped beyond this.
    private const int MaxBufferedItems = 256;

    // Chunks whose RMS amplitude (16-bit sample units) is below this are skipped.
    private const int SilenceRmsThreshold = 150;

    // How long StopAsync waits for the worker to drain before giving up.
    private const int WorkerShutdownTimeoutMs = 1000;

    private readonly Lock _sync = new();
    private readonly Lock _segmentLock = new();
    private readonly ConcurrentQueue<string> _transcripts = new();

    private WaveInEvent? _waveIn;
    private System.Timers.Timer? _flushTimer;
    private MemoryStream? _segmentBuffer;
    private BlockingCollection<byte[]>? _chunkQueue;
    private CancellationTokenSource? _cts;
    private Task? _worker;
    private WhisperFactory? _factory;
    private volatile bool _running;
    private bool _starting; // guarded by _sync
    private string _status = "stopped";
    private string? _warning;

    /// <summary>
    /// Ensures the Whisper model is available, then starts audio capture, the flush timer
    /// and the transcription worker.
    /// </summary>
    /// <param name="cancellationToken">Cancels the model download, if one is needed.</param>
    /// <returns>
    /// A result carrying the current status. If the service is already running or another
    /// start is in progress, the current state is returned without starting a second
    /// pipeline. Failures are reported in the result (status <c>"error"</c>) rather than thrown.
    /// </returns>
    public async Task<S2TStartResultDto> StartAsync(CancellationToken cancellationToken = default)
    {
        lock (_sync)
        {
            // A start that is still downloading the model has not set _running yet, so it
            // needs its own flag to stop a second caller from building a duplicate pipeline.
            if (_running || _starting)
            {
                return new S2TStartResultDto(true, _status, _warning);
            }

            _starting = true;
            _status = "starting";
            _warning = null;
        }

        try
        {
            var modelPath = await EnsureModelAsync(cancellationToken).ConfigureAwait(false);

            lock (_sync)
            {
                _factory ??= WhisperFactory.FromPath(modelPath);

                var waveFormat = new WaveFormat(SampleRate, Bits, Channels);
                var queue = new BlockingCollection<byte[]>();
                var cts = new CancellationTokenSource();
                WaveInEvent? waveIn = null;
                System.Timers.Timer? flushTimer = null;

                try
                {
                    lock (_segmentLock)
                    {
                        _segmentBuffer?.Dispose();
                        _segmentBuffer = new MemoryStream();
                    }

                    // DeviceNumber = -1: presumably NAudio's "default device" selector — confirm.
                    waveIn = new WaveInEvent
                    {
                        DeviceNumber = -1,
                        WaveFormat = waveFormat,
                        BufferMilliseconds = 100,
                    };
                    waveIn.DataAvailable += HandleDataAvailable;

                    flushTimer = new System.Timers.Timer(ChunkMs);
                    flushTimer.Elapsed += (_, _) => FlushChunk(waveFormat);

                    _chunkQueue = queue;
                    _cts = cts;
                    _waveIn = waveIn;
                    _flushTimer = flushTimer;
                    _worker = Task.Run(() => RunWorkerAsync(queue, waveFormat), cts.Token);

                    waveIn.StartRecording();
                    flushTimer.Start();

                    _running = true;
                    _starting = false;
                    _status = "listening";
                }
                catch
                {
                    // Roll back everything created above so a failed start (e.g. no capture
                    // device) does not leak the device handle, timer, or a parked worker.
                    _waveIn = null;
                    _flushTimer = null;
                    _chunkQueue = null;
                    _cts = null;
                    _worker = null;

                    flushTimer?.Dispose();
                    if (waveIn is not null)
                    {
                        waveIn.DataAvailable -= HandleDataAvailable;
                        waveIn.Dispose();
                    }

                    queue.CompleteAdding(); // lets an already-started worker leave its loop
                    cts.Cancel();
                    cts.Dispose();

                    lock (_segmentLock)
                    {
                        _segmentBuffer?.Dispose();
                        _segmentBuffer = null;
                    }

                    throw;
                }
            }
        }
        catch (Exception ex)
        {
            lock (_sync)
            {
                _running = false;
                _starting = false;
                _status = "error";
                _warning = ex.Message;
            }

            return new S2TStartResultDto(false, "error", ex.Message);
        }

        return new S2TStartResultDto(true, "listening");
    }

    /// <summary>
    /// Stops audio capture and the flush timer, completes the chunk queue, and waits briefly
    /// for the worker to finish.
    /// </summary>
    /// <param name="cancellationToken">Cuts short the wait for the worker to finish.</param>
    /// <returns>A result with status <c>"stopped"</c>; a no-op if the service was not running.</returns>
    public async Task<S2TStopResultDto> StopAsync(CancellationToken cancellationToken = default)
    {
        WaveInEvent? waveIn;
        System.Timers.Timer? flushTimer;
        BlockingCollection<byte[]>? queue;
        CancellationTokenSource? cts;
        Task? worker;
        MemoryStream? segmentBuffer;

        lock (_sync)
        {
            if (!_running)
            {
                return new S2TStopResultDto(false, "stopped", _warning);
            }

            _running = false;
            _status = "stopped";

            waveIn = _waveIn;
            flushTimer = _flushTimer;
            queue = _chunkQueue;
            cts = _cts;
            worker = _worker;

            _waveIn = null;
            _flushTimer = null;
            _chunkQueue = null;
            _cts = null;
            _worker = null;

            // Detach the buffer under the lock the callbacks use. After this, in-flight
            // callbacks see null, and a StartAsync that runs during the await below cannot
            // have its fresh buffer disposed by this call.
            lock (_segmentLock)
            {
                segmentBuffer = _segmentBuffer;
                _segmentBuffer = null;
            }
        }

        try
        {
            flushTimer?.Stop();

            if (waveIn is not null)
            {
                waveIn.DataAvailable -= HandleDataAvailable;
                waveIn.StopRecording();
                waveIn.Dispose();
            }

            queue?.CompleteAdding();
            cts?.Cancel();

            if (worker is not null)
            {
                // ConfigureAwait(false): Dispose() blocks on this method, so the continuation
                // must not need the caller's synchronization context.
                await Task.WhenAny(worker, Task.Delay(WorkerShutdownTimeoutMs, cancellationToken))
                    .ConfigureAwait(false);
            }
        }
        finally
        {
            flushTimer?.Dispose();
            cts?.Dispose();
            segmentBuffer?.Dispose();

            // Only dispose the queue once nothing is enumerating it; a worker that outlived
            // the timeout keeps its queue until it finishes on its own.
            if (queue is not null && (worker is null || worker.IsCompleted))
            {
                queue.Dispose();
            }
        }

        return new S2TStopResultDto(false, "stopped");
    }

    /// <summary>
    /// Dequeues up to <paramref name="maxItems"/> buffered transcripts along with the
    /// current running flag, status and warning.
    /// </summary>
    /// <param name="maxItems">Requested item count; clamped to the range 1..64.</param>
    /// <param name="cancellationToken">Unused; the call completes synchronously.</param>
    public Task<S2TPollResultDto> PollAsync(int maxItems = 8, CancellationToken cancellationToken = default)
    {
        maxItems = Math.Clamp(maxItems, 1, 64);

        var items = new List<string>(maxItems);
        while (items.Count < maxItems && _transcripts.TryDequeue(out var text))
        {
            items.Add(text);
        }

        string status;
        string? warning;
        lock (_sync)
        {
            status = _status;
            warning = _warning;
        }

        return Task.FromResult(new S2TPollResultDto(items, _running, status, warning));
    }

    /// <summary>Appends newly captured PCM bytes to the current segment buffer.</summary>
    private void HandleDataAvailable(object? sender, WaveInEventArgs e)
    {
        lock (_segmentLock)
        {
            _segmentBuffer?.Write(e.Buffer, 0, e.BytesRecorded);
        }
    }

    /// <summary>
    /// Timer callback: moves the buffered audio into the chunk queue, provided at least half
    /// a second of audio has accumulated.
    /// </summary>
    private void FlushChunk(WaveFormat waveFormat)
    {
        var queue = _chunkQueue;
        if (queue is null)
        {
            return;
        }

        try
        {
            if (queue.IsAddingCompleted)
            {
                return;
            }

            byte[] chunk;
            lock (_segmentLock)
            {
                if (_segmentBuffer is null)
                {
                    return;
                }

                if (_segmentBuffer.Length < waveFormat.AverageBytesPerSecond / 2)
                {
                    return;
                }

                chunk = _segmentBuffer.ToArray();
                _segmentBuffer.SetLength(0);
            }

            if (chunk.Length > 0)
            {
                queue.Add(chunk);
            }
        }
        catch (InvalidOperationException)
        {
            // A timer tick can race with StopAsync: the queue may be marked complete or
            // disposed between the check and the Add. ObjectDisposedException derives from
            // InvalidOperationException, so both cases land here; the chunk is dropped.
        }
    }

    /// <summary>
    /// Worker loop: consumes PCM chunks until the queue is completed, skips silent chunks,
    /// wraps each remaining chunk in a WAV container and feeds it to Whisper, enqueuing any
    /// non-placeholder text that comes back.
    /// </summary>
    /// <param name="queue">The chunk queue belonging to this capture session.</param>
    /// <param name="waveFormat">PCM format of the queued chunks.</param>
    private async Task RunWorkerAsync(BlockingCollection<byte[]> queue, WaveFormat waveFormat)
    {
        try
        {
            var factory = _factory;
            if (factory is null)
            {
                return;
            }

            using var processor = factory.CreateBuilder()
                .WithLanguage("en")
                .Build();

            foreach (var pcmChunk in queue.GetConsumingEnumerable())
            {
                try
                {
                    if (IsLikelySilence(pcmChunk))
                    {
                        continue;
                    }

                    using var pcmStream = new MemoryStream(pcmChunk, writable: false);
                    using var raw = new RawSourceWaveStream(pcmStream, waveFormat);
                    using var wavStream = new MemoryStream();
                    WaveFileWriter.WriteWavFileToStream(wavStream, raw);
                    wavStream.Position = 0;

                    await foreach (var result in processor.ProcessAsync(wavStream))
                    {
                        var text = result.Text?.Trim();
                        if (string.IsNullOrWhiteSpace(text))
                        {
                            continue;
                        }

                        if (IsPlaceholderTranscript(text))
                        {
                            continue;
                        }

                        EnqueueTranscript(text);
                    }
                }
                catch (Exception ex)
                {
                    // A single bad chunk should not end the session; record it and move on.
                    lock (_sync)
                    {
                        _warning = $"Transcription error: {ex.Message}";
                    }
                }
            }
        }
        catch (Exception ex)
        {
            lock (_sync)
            {
                _status = "error";
                _warning = ex.Message;
            }
        }
    }

    /// <summary>
    /// Returns the path of the local <c>ggml-base.en.bin</c> model, downloading it first if
    /// it is not already present under the user's local application data folder.
    /// </summary>
    /// <param name="cancellationToken">Cancels the download; a five-minute timeout also applies.</param>
    /// <returns>Full path to the model file.</returns>
    private static async Task<string> EnsureModelAsync(CancellationToken cancellationToken)
    {
        var modelDirectory = Path.Combine(
            Environment.GetFolderPath(Environment.SpecialFolder.LocalApplicationData),
            "ProjectJournal",
            "speech-models");
        Directory.CreateDirectory(modelDirectory);

        var modelPath = Path.Combine(modelDirectory, "ggml-base.en.bin");

        // A zero-length file can only be the residue of a failed download; treat it as missing.
        if (File.Exists(modelPath) && new FileInfo(modelPath).Length > 0)
        {
            return modelPath;
        }

        using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        cts.CancelAfter(TimeSpan.FromMinutes(5));

        // Download to a side file and move it into place only when complete. Writing directly
        // to modelPath would leave a truncated model behind on cancel/failure, and the
        // File.Exists check above would then accept it on every later start.
        var tempPath = modelPath + ".download";
        try
        {
            using (var modelStream = await WhisperGgmlDownloader.Default.GetGgmlModelAsync(
                       GgmlType.BaseEn,
                       cancellationToken: cts.Token).ConfigureAwait(false))
            using (var fileWriter = File.Create(tempPath))
            {
                await modelStream.CopyToAsync(fileWriter, cts.Token).ConfigureAwait(false);
            }

            File.Move(tempPath, modelPath, overwrite: true);
        }
        catch
        {
            try
            {
                File.Delete(tempPath);
            }
            catch (IOException)
            {
                // Best effort; the next attempt recreates the temp file from scratch.
            }
            catch (UnauthorizedAccessException)
            {
                // Best effort; see above.
            }

            throw;
        }

        return modelPath;
    }

    /// <summary>
    /// Treats the buffer as little-endian 16-bit samples and reports whether its RMS
    /// amplitude is below <see cref="SilenceRmsThreshold"/>. Buffers shorter than one sample
    /// count as silence.
    /// </summary>
    private static bool IsLikelySilence(byte[] pcmChunk)
    {
        if (pcmChunk.Length < 2)
        {
            return true;
        }

        long sumSquares = 0;
        int samples = pcmChunk.Length / 2;
        for (int i = 0; i + 1 < pcmChunk.Length; i += 2)
        {
            short sample = (short)(pcmChunk[i] | (pcmChunk[i + 1] << 8));
            sumSquares += (long)sample * sample;
        }

        if (samples <= 0)
        {
            return true;
        }

        var rms = Math.Sqrt(sumSquares / (double)samples);
        return rms < SilenceRmsThreshold;
    }

    /// <summary>
    /// Returns true when the text is one of the bracketed markers
    /// (<c>[BLANK_AUDIO]</c>, <c>[NO AUDIO]</c>, <c>[SILENCE]</c>), compared case-insensitively.
    /// </summary>
    private static bool IsPlaceholderTranscript(string text)
    {
        var normalized = text.Trim();
        if (!(normalized.StartsWith('[') && normalized.EndsWith(']')))
        {
            return false;
        }

        return normalized.Equals("[BLANK_AUDIO]", StringComparison.OrdinalIgnoreCase)
            || normalized.Equals("[NO AUDIO]", StringComparison.OrdinalIgnoreCase)
            || normalized.Equals("[SILENCE]", StringComparison.OrdinalIgnoreCase);
    }

    /// <summary>
    /// Adds a transcript to the poll queue, discarding the oldest entries once more than
    /// <see cref="MaxBufferedItems"/> are buffered.
    /// </summary>
    private void EnqueueTranscript(string text)
    {
        _transcripts.Enqueue(text);
        while (_transcripts.Count > MaxBufferedItems && _transcripts.TryDequeue(out _))
        {
        }
    }

    /// <summary>
    /// Stops the service if it is running and releases the Whisper factory. Does not throw;
    /// a failure while stopping is recorded as the current warning instead.
    /// </summary>
    public void Dispose()
    {
        try
        {
            StopAsync().GetAwaiter().GetResult();
        }
        catch (Exception ex)
        {
            // Dispose must not throw; keep the failure visible to a later poll instead.
            lock (_sync)
            {
                _warning = ex.Message;
            }
        }

        WhisperFactory? factory;
        lock (_sync)
        {
            factory = _factory;
            _factory = null;
        }

        factory?.Dispose();
    }
}