journal/Journal.Sidecar/LocalWhisperS2TService.cs
Jacob Schmidt 0d77300c22 feat: Project Journal backend monorepo
Monorepo with centralized build props, npm workspaces, LlamaSharp AI,
SQLite/SQLCipher storage, Svelte frontend, and unified smoke tests.

Co-Authored-By: Oz <oz-agent@warp.dev>
2026-03-02 20:56:26 -06:00

308 lines
9.4 KiB
C#

using System.Collections.Concurrent;
using Journal.Core.Dtos;
using Journal.Core.Services.Speech;
using NAudio.Wave;
using Whisper.net;
using Whisper.net.Ggml;
namespace Journal.Sidecar;
/// <summary>
/// Speech-to-text service that records 16 kHz mono PCM with NAudio (input device -1)
/// and transcribes it locally with the Whisper.net <c>ggml-base.en.bin</c> model stored
/// under <c>%LocalAppData%/ProjectJournal/speech-models</c>.
/// </summary>
/// <remarks>
/// The capture callback appends PCM to a segment buffer. A timer cuts that buffer into a
/// chunk every <see cref="ChunkMs"/> ms. One background worker per session transcribes the
/// chunks and buffers the text (at most <see cref="MaxBufferedItems"/> items) until
/// <see cref="PollAsync"/> drains it.
/// </remarks>
public sealed class LocalWhisperS2TService : IS2TService, IDisposable
{
    // Capture format fed to Whisper: 16 kHz, 16-bit, mono PCM.
    private const int SampleRate = 16000;
    private const int Bits = 16;
    private const int Channels = 1;

    // Interval at which buffered audio is cut into a chunk for transcription.
    private const int ChunkMs = 2000;

    // Maximum transcripts retained between polls; the oldest are dropped first.
    private const int MaxBufferedItems = 256;

    // RMS amplitude (16-bit sample units) below which a chunk is skipped as silence.
    private const int SilenceRmsThreshold = 150;

    // How long StopAsync waits for the worker before returning to the caller.
    private static readonly TimeSpan WorkerStopTimeout = TimeSpan.FromSeconds(1);

    // Guards session lifecycle state. Lock order: _sync is always taken before _segmentLock.
    private readonly Lock _sync = new();

    // Guards _segmentBuffer, which the capture callback and the flush timer both touch.
    private readonly Lock _segmentLock = new();

    private readonly ConcurrentQueue<string> _transcripts = new();
    private WaveInEvent? _waveIn;
    private System.Timers.Timer? _flushTimer;
    private MemoryStream? _segmentBuffer;
    private BlockingCollection<byte[]>? _chunkQueue;
    private CancellationTokenSource? _cts;
    private Task? _worker;

    // Workers that outlived WorkerStopTimeout; Dispose waits for them before releasing _factory.
    private Task? _pendingWorkers;

    private WhisperFactory? _factory;
    private volatile bool _running;

    // True while StartAsync is downloading the model / opening the capture device.
    private bool _starting;

    private string _status = "stopped";
    private string? _warning;

    /// <summary>
    /// Ensures the Whisper model is on disk, then starts capturing and transcribing audio.
    /// </summary>
    /// <param name="cancellationToken">Cancels the model download.</param>
    /// <returns>
    /// Success with status <c>"listening"</c>; the current status (with success) if a session
    /// is already running or starting; or failure with status <c>"error"</c> and the exception message.
    /// </returns>
    public async Task<S2TStartResultDto> StartAsync(CancellationToken cancellationToken = default)
    {
        lock (_sync)
        {
            // Without the _starting check, two overlapping calls would both open a device
            // and the first session's resources would be orphaned.
            if (_running || _starting)
                return new S2TStartResultDto(true, _status, _warning);
            _starting = true;
            _status = "starting";
            _warning = null;
        }

        try
        {
            var modelPath = await EnsureModelAsync(cancellationToken);
            lock (_sync)
            {
                _factory ??= WhisperFactory.FromPath(modelPath);
                var factory = _factory;
                var waveFormat = new WaveFormat(SampleRate, Bits, Channels);
                var queue = new BlockingCollection<byte[]>();
                var cts = new CancellationTokenSource();

                lock (_segmentLock)
                {
                    _segmentBuffer = new MemoryStream();
                }
                _chunkQueue = queue;
                _cts = cts;
                _waveIn = new WaveInEvent
                {
                    DeviceNumber = -1,
                    WaveFormat = waveFormat,
                    BufferMilliseconds = 100
                };
                _waveIn.DataAvailable += HandleDataAvailable;
                _flushTimer = new System.Timers.Timer(ChunkMs);
                _flushTimer.Elapsed += (_, _) => FlushChunk(waveFormat, queue);

                // The worker receives this session's queue, factory and token explicitly, so a
                // late-starting task can never attach to the fields of a later session.
                _worker = Task.Run(() => RunWorkerAsync(waveFormat, queue, factory, cts.Token), cts.Token);
                _waveIn.StartRecording();
                _flushTimer.Start();
                _running = true;
                _starting = false;
                _status = "listening";
            }
        }
        catch (Exception ex)
        {
            lock (_sync)
            {
                // E.g. no capture device: release what was already created, otherwise the worker
                // blocks forever on a queue that nobody will ever complete.
                TearDownFailedStartLocked();
                _running = false;
                _starting = false;
                _status = "error";
                _warning = ex.Message;
            }
            return new S2TStartResultDto(false, "error", ex.Message);
        }
        return new S2TStartResultDto(true, "listening");
    }

    /// <summary>
    /// Stops capture, cancels in-flight transcription and waits up to
    /// <see cref="WorkerStopTimeout"/> for the worker to finish. Audio still sitting in the
    /// segment buffer (less than one chunk) is discarded.
    /// </summary>
    /// <param name="cancellationToken">Cuts the wait for the worker short.</param>
    public async Task<S2TStopResultDto> StopAsync(CancellationToken cancellationToken = default)
    {
        WaveInEvent? waveIn;
        System.Timers.Timer? flushTimer;
        BlockingCollection<byte[]>? queue;
        CancellationTokenSource? cts;
        Task? worker;
        MemoryStream? segmentBuffer;
        lock (_sync)
        {
            if (!_running)
                return new S2TStopResultDto(false, "stopped", _warning);
            _running = false;
            _status = "stopped";
            waveIn = _waveIn;
            flushTimer = _flushTimer;
            queue = _chunkQueue;
            cts = _cts;
            worker = _worker;
            _waveIn = null;
            _flushTimer = null;
            _chunkQueue = null;
            _cts = null;
            _worker = null;

            // Detach the buffer under its own lock so late callbacks see null instead of a
            // disposed stream, and a session started after this point keeps its own buffer.
            lock (_segmentLock)
            {
                segmentBuffer = _segmentBuffer;
                _segmentBuffer = null;
            }
        }

        try
        {
            flushTimer?.Stop();
            if (waveIn is not null)
            {
                waveIn.DataAvailable -= HandleDataAvailable;
                waveIn.StopRecording();
                waveIn.Dispose();
            }

            // Completing the queue ends the worker's loop; cancelling aborts a chunk that is
            // mid-transcription so it stops using the shared factory.
            queue?.CompleteAdding();
            cts?.Cancel();
            if (worker is not null)
                await Task.WhenAny(worker, Task.Delay(WorkerStopTimeout, cancellationToken));
        }
        finally
        {
            flushTimer?.Dispose();
            if (worker is not null)
            {
                lock (_sync)
                {
                    TrackPendingWorkerLocked(worker);
                }
            }
            DisposeAfterWorker(cts, worker);
            segmentBuffer?.Dispose();
        }
        return new S2TStopResultDto(false, "stopped");
    }

    /// <summary>
    /// Dequeues up to <paramref name="maxItems"/> transcripts (clamped to 1..64) along with
    /// the current running flag, status and warning.
    /// </summary>
    public Task<S2TPollResultDto> PollAsync(int maxItems = 8, CancellationToken cancellationToken = default)
    {
        maxItems = Math.Clamp(maxItems, 1, 64);
        var items = new List<string>(maxItems);
        while (items.Count < maxItems && _transcripts.TryDequeue(out var text))
        {
            items.Add(text);
        }
        return Task.FromResult(new S2TPollResultDto(items, _running, _status, _warning));
    }

    // NAudio capture callback: appends the recorded bytes to the current segment buffer.
    private void HandleDataAvailable(object? sender, WaveInEventArgs e)
    {
        lock (_segmentLock)
        {
            _segmentBuffer?.Write(e.Buffer, 0, e.BytesRecorded);
        }
    }

    // Timer callback: moves the buffered audio (once at least half a second has accumulated)
    // into the given session's chunk queue.
    private void FlushChunk(WaveFormat waveFormat, BlockingCollection<byte[]> queue)
    {
        if (queue.IsAddingCompleted)
            return;
        byte[] chunk;
        lock (_segmentLock)
        {
            if (_segmentBuffer is null || _segmentBuffer.Length < waveFormat.AverageBytesPerSecond / 2)
                return;
            chunk = _segmentBuffer.ToArray();
            _segmentBuffer.SetLength(0);
        }
        if (chunk.Length == 0)
            return;
        try
        {
            queue.Add(chunk);
        }
        catch (InvalidOperationException)
        {
            // StopAsync completed the queue after the check above; the session is ending,
            // so the chunk is dropped exactly as if the timer had already stopped.
        }
    }

    // Per-session worker: transcribes queued chunks until the queue is completed or the
    // token is cancelled. Per-chunk failures are reported as warnings and the loop continues.
    private async Task RunWorkerAsync(
        WaveFormat waveFormat,
        BlockingCollection<byte[]> queue,
        WhisperFactory factory,
        CancellationToken cancellationToken)
    {
        try
        {
            using var processor = factory.CreateBuilder()
                .WithLanguage("en")
                .Build();
            foreach (var pcmChunk in queue.GetConsumingEnumerable(cancellationToken))
            {
                try
                {
                    await TranscribeChunkAsync(processor, pcmChunk, waveFormat, cancellationToken);
                }
                catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
                {
                    throw;
                }
                catch (Exception ex)
                {
                    ReportWorkerFailure(cancellationToken, $"Transcription error: {ex.Message}", setErrorStatus: false);
                }
            }
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Normal shutdown requested by StopAsync (or a failed start).
        }
        catch (Exception ex)
        {
            ReportWorkerFailure(cancellationToken, ex.Message, setErrorStatus: true);
        }
    }

    // Records a worker failure, but only while the worker's session is still live: a stopped
    // session is always cancelled, and its late errors must not overwrite "stopped" or the
    // state of a newer session.
    private void ReportWorkerFailure(CancellationToken sessionToken, string message, bool setErrorStatus)
    {
        lock (_sync)
        {
            if (sessionToken.IsCancellationRequested)
                return;
            if (setErrorStatus)
                _status = "error";
            _warning = message;
        }
    }

    // Wraps one raw PCM chunk in an in-memory WAV container and enqueues every segment
    // Whisper returns that is neither blank nor a placeholder such as "[BLANK_AUDIO]".
    private async Task TranscribeChunkAsync(
        WhisperProcessor processor,
        byte[] pcmChunk,
        WaveFormat waveFormat,
        CancellationToken cancellationToken)
    {
        if (IsLikelySilence(pcmChunk))
            return;
        using var pcmStream = new MemoryStream(pcmChunk, writable: false);
        using var raw = new RawSourceWaveStream(pcmStream, waveFormat);
        using var wavStream = new MemoryStream();
        WaveFileWriter.WriteWavFileToStream(wavStream, raw);
        wavStream.Position = 0;
        await foreach (var result in processor.ProcessAsync(wavStream, cancellationToken))
        {
            var text = result.Text?.Trim();
            if (string.IsNullOrWhiteSpace(text) || IsPlaceholderTranscript(text))
                continue;
            EnqueueTranscript(text);
        }
    }

    // Releases whatever a failed StartAsync created. Caller must hold _sync.
    private void TearDownFailedStartLocked()
    {
        _flushTimer?.Dispose();
        _flushTimer = null;
        if (_waveIn is not null)
        {
            _waveIn.DataAvailable -= HandleDataAvailable;
            _waveIn.Dispose();
            _waveIn = null;
        }
        _chunkQueue?.CompleteAdding();
        _chunkQueue = null;
        _cts?.Cancel();
        if (_worker is not null)
            TrackPendingWorkerLocked(_worker);
        DisposeAfterWorker(_cts, _worker);
        _cts = null;
        _worker = null;
        lock (_segmentLock)
        {
            _segmentBuffer?.Dispose();
            _segmentBuffer = null;
        }
    }

    // Remembers a worker that may still be running so Dispose defers releasing the factory.
    // Caller must hold _sync.
    private void TrackPendingWorkerLocked(Task worker)
    {
        if (worker.IsCompleted)
            return;
        _pendingWorkers = _pendingWorkers is null || _pendingWorkers.IsCompleted
            ? worker
            : Task.WhenAll(_pendingWorkers, worker);
    }

    // Disposes a session's CancellationTokenSource only once its worker can no longer use the token.
    private static void DisposeAfterWorker(CancellationTokenSource? cts, Task? worker)
    {
        if (cts is null)
            return;
        if (worker is null || worker.IsCompleted)
            cts.Dispose();
        else
            _ = worker.ContinueWith(_ => cts.Dispose(), TaskScheduler.Default);
    }

    // Returns the path of ggml-base.en.bin under LocalApplicationData/ProjectJournal/speech-models,
    // downloading it first (5-minute cap) when absent.
    private static async Task<string> EnsureModelAsync(CancellationToken cancellationToken)
    {
        var modelDirectory = Path.Combine(
            Environment.GetFolderPath(Environment.SpecialFolder.LocalApplicationData),
            "ProjectJournal",
            "speech-models");
        Directory.CreateDirectory(modelDirectory);
        var modelPath = Path.Combine(modelDirectory, "ggml-base.en.bin");
        if (File.Exists(modelPath))
            return modelPath;

        // Download to a side file and rename only on success: writing straight to modelPath
        // would leave a truncated model after cancellation that File.Exists accepts forever.
        var tempPath = modelPath + ".download";
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        cts.CancelAfter(TimeSpan.FromMinutes(5));
        try
        {
            await using (var modelStream = await WhisperGgmlDownloader.Default.GetGgmlModelAsync(
                GgmlType.BaseEn,
                cancellationToken: cts.Token))
            await using (var fileWriter = new FileStream(tempPath, FileMode.Create, FileAccess.Write, FileShare.None))
            {
                await modelStream.CopyToAsync(fileWriter, cts.Token);
            }
            File.Move(tempPath, modelPath, overwrite: true);
        }
        catch
        {
            try
            {
                File.Delete(tempPath);
            }
            catch (IOException)
            {
                // Best effort: a leftover side file is overwritten by the next attempt.
            }
            catch (UnauthorizedAccessException)
            {
                // Same as above.
            }
            throw;
        }
        return modelPath;
    }

    // True when the chunk's RMS amplitude is below SilenceRmsThreshold. Samples are decoded
    // as 16-bit little-endian signed PCM; a trailing odd byte is ignored.
    private static bool IsLikelySilence(byte[] pcmChunk)
    {
        int samples = pcmChunk.Length / 2;
        if (samples == 0)
            return true;
        long sumSquares = 0;
        for (int i = 0; i + 1 < pcmChunk.Length; i += 2)
        {
            short sample = (short)(pcmChunk[i] | (pcmChunk[i + 1] << 8));
            sumSquares += (long)sample * sample;
        }
        var rms = Math.Sqrt(sumSquares / (double)samples);
        return rms < SilenceRmsThreshold;
    }

    // True for bracketed non-speech markers such as "[BLANK_AUDIO]", "[NO AUDIO]" or "[SILENCE]".
    private static bool IsPlaceholderTranscript(string text)
    {
        var normalized = text.Trim();
        if (!(normalized.StartsWith('[') && normalized.EndsWith(']')))
            return false;
        return normalized.Equals("[BLANK_AUDIO]", StringComparison.OrdinalIgnoreCase)
            || normalized.Equals("[NO AUDIO]", StringComparison.OrdinalIgnoreCase)
            || normalized.Equals("[SILENCE]", StringComparison.OrdinalIgnoreCase);
    }

    // Appends a transcript and drops the oldest entries beyond MaxBufferedItems.
    private void EnqueueTranscript(string text)
    {
        _transcripts.Enqueue(text);
        while (_transcripts.Count > MaxBufferedItems && _transcripts.TryDequeue(out _))
        {
            // Discard oldest.
        }
    }

    /// <summary>
    /// Stops any active session (blocking on <see cref="StopAsync"/>) and releases the
    /// Whisper model. If a worker outlived the stop timeout, the model is released only
    /// after that worker finishes.
    /// </summary>
    public void Dispose()
    {
        StopAsync().GetAwaiter().GetResult();
        WhisperFactory? factory;
        Task? pending;
        lock (_sync)
        {
            factory = _factory;
            _factory = null;
            pending = _pendingWorkers;
            _pendingWorkers = null;
        }
        if (factory is null)
            return;
        if (pending is null || pending.IsCompleted)
            factory.Dispose();
        else
            _ = pending.ContinueWith(_ => factory.Dispose(), TaskScheduler.Default);
    }
}