diff --git a/src/cli.py b/src/cli.py index b3e1f12..41437bf 100644 --- a/src/cli.py +++ b/src/cli.py @@ -10,6 +10,7 @@ audio_app = typer.Typer(help="Audio device operations") # Add commands to subcommands server_app.command("check")(server.check) server_app.command("test")(server.test) +server_app.command("satellite")(server.satellite) audio_app.command("list")(audio.list_devices) audio_app.command("test")(audio.test_device) audio_app.command("install")(audio.install) diff --git a/src/commands/server.py b/src/commands/server.py index 088c28d..d6797f2 100644 --- a/src/commands/server.py +++ b/src/commands/server.py @@ -5,15 +5,15 @@ import typer from typing import Optional from wyoming.audio import AudioStart, AudioChunk, AudioStop +from wyoming.client import AsyncTcpClient +from wyoming.asr import Transcribe, Transcript +from ..wyoming_client.satellite import SatelliteController +from ..wyoming_client.vad import AmplitudeVAD from ..config import AppConfig -import io -import wave import numpy as np import sounddevice as sd import asyncio -from wyoming.client import AsyncTcpClient -from wyoming.asr import Transcribe, Transcript async def _async_transcribe(host: str, port: int, timeout: float, pcm_bytes: bytes, lang: str) -> Optional[str]: @@ -175,3 +175,53 @@ def test( except Exception as e: typer.echo(typer.style(f"ASR request failed: {e}", fg=typer.colors.RED)) raise typer.Exit(1) + + +def satellite( + chunk_duration: float = typer.Option(0.03, "--chunk-duration", help="Audio chunk duration in seconds"), + vad_threshold: float = typer.Option(0.01, "--vad-threshold", help="Voice activity detection threshold"), + speech_timeout: float = typer.Option(1.5, "--speech-timeout", help="Silence duration to end speech (seconds)"), + lang: str = typer.Option("fr", "--lang", help="Language code: 'fr' or 'en'"), + host: Optional[str] = typer.Option(None, "--host", "-h", help="Wyoming server host"), + port: Optional[int] = typer.Option(None, "--port", "-p", help="Wyoming server port"), + timeout: Optional[float] = typer.Option(None, "--timeout", "-t", help="Connection timeout in seconds"), + config_file: Optional[str] = typer.Option(None, "--config", "-c", help="Path to config file") +): + """Run satellite mode with VAD-based audio streaming.""" + # Load configuration + config = AppConfig.load(config_file) + final_host = host or config.server.host + final_port = port or config.server.port + final_timeout = timeout or config.server.timeout + + # Validate language + lang = (lang or "fr").strip().lower() + if lang not in ("fr", "en"): + typer.echo(typer.style("Invalid --lang. Use 'fr' or 'en'.", fg=typer.colors.RED)) + raise typer.Exit(2) + + # Check server reachability first + reachable, latency, err = check_wyoming_server(final_host, final_port, final_timeout) + if not reachable: + typer.echo(typer.style(f"Cannot reach Wyoming server at {final_host}:{final_port}: {err}", fg=typer.colors.RED)) + raise typer.Exit(1) + + # Initialize VAD detector + vad_detector = AmplitudeVAD( + threshold=vad_threshold, + min_speech_duration=0.1, + min_silence_duration=speech_timeout, + sample_rate=16000 + ) + + # Initialize and run satellite + controller = SatelliteController( + host=final_host, + port=final_port, + lang=lang, + vad_detector=vad_detector, + chunk_duration=chunk_duration, + timeout=final_timeout + ) + + controller.run() diff --git a/src/wyoming_client/__init__.py b/src/wyoming_client/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/wyoming_client/audio_buffer.py b/src/wyoming_client/audio_buffer.py new file mode 100644 index 0000000..4ee8f5d --- /dev/null +++ b/src/wyoming_client/audio_buffer.py @@ -0,0 +1,67 @@ +import numpy as np +from collections import deque +from typing import List, Optional + + +class AudioBuffer: + """Circular audio buffer for pre-VAD audio storage.""" + + def __init__(self, max_duration: float = 1.0, sample_rate: int = 16000): + """ + Initialize audio buffer. + + Args: + max_duration: Maximum buffer duration in seconds + sample_rate: Audio sample rate + """ + self.sample_rate = sample_rate + max_samples = int(max_duration * sample_rate) + self.buffer: deque = deque(maxlen=max_samples) + self.is_recording = False + self.recorded_chunks: List[np.ndarray] = [] + + def add_chunk(self, audio_chunk: np.ndarray) -> None: + """Add audio chunk to buffer.""" + # Always add to circular buffer + for sample in audio_chunk: + self.buffer.append(sample) + + # If recording, also add to recorded chunks + if self.is_recording: + self.recorded_chunks.append(audio_chunk.copy()) + + def start_recording(self) -> np.ndarray: + """Start recording and return pre-buffer content.""" + self.is_recording = True + + # Return current buffer content as pre-buffer + pre_buffer = np.array(list(self.buffer), dtype=np.float32) + self.recorded_chunks = [pre_buffer] if len(pre_buffer) > 0 else [] + + return pre_buffer + + def stop_recording(self) -> np.ndarray: + """Stop recording and return all recorded audio.""" + self.is_recording = False + + if not self.recorded_chunks: + return np.array([], dtype=np.float32) + + # Concatenate all recorded chunks + full_recording = np.concatenate(self.recorded_chunks) + self.recorded_chunks = [] + + return full_recording + + def get_current_recording(self) -> Optional[np.ndarray]: + """Get current recording without stopping.""" + if not self.is_recording or not self.recorded_chunks: + return None + + return np.concatenate(self.recorded_chunks) + + def clear(self) -> None: + """Clear all buffers.""" + self.buffer.clear() + self.recorded_chunks.clear() + self.is_recording = False \ No newline at end of file diff --git a/src/wyoming_client/satellite.py b/src/wyoming_client/satellite.py new file mode 100644 index 0000000..78af37a --- /dev/null +++ b/src/wyoming_client/satellite.py @@ -0,0 +1,198 @@ +from typing import Optional + +from wyoming.asr import Transcript +from wyoming.audio import AudioStart, AudioChunk, AudioStop +from wyoming.asr import Transcribe +from wyoming.client import AsyncTcpClient + +from ..wyoming_client.audio_buffer import AudioBuffer +from ..wyoming_client.vad import VADDetector +from queue import Queue, Empty +import typer +import asyncio +import threading +import numpy as np +import sounddevice as sd +import time + + +class SatelliteController: + """Main satellite controller with VAD-based audio streaming.""" + + def __init__(self, host: str, port: int, lang: str, vad_detector: VADDetector, + chunk_duration: float = 0.03, timeout: float = 5.0): + self.host = host + self.port = port + self.lang = lang + self.vad_detector = vad_detector + self.timeout = timeout + + # Audio settings + self.sample_rate = 16000 + self.channels = 1 + self.chunk_size = int(chunk_duration * self.sample_rate) + + # Components + self.audio_buffer = AudioBuffer(max_duration=1.0, sample_rate=self.sample_rate) + + # State + self.is_running = False + self.is_speaking = False + self.audio_queue = Queue() + self.transcription_queue = Queue() + + def _audio_callback(self, indata, frames, time, status): + """Callback for sounddevice audio stream.""" + if status: + typer.echo(f"Audio callback status: {status}") + + audio_chunk = indata[:, 0].copy() # Extract mono channel + self.audio_queue.put(audio_chunk) + + async def _async_transcribe(self, pcm_bytes: bytes) -> Optional[str]: + """Stream raw PCM data to Wyoming ASR and return transcript text.""" + # Instantiate the async TCP client + client = AsyncTcpClient(self.host, self.port) + + # Audio parameters + rate = 16000 + width = 2 # 16-bit + channels = 1 + + # The client instance is an async context manager. + async with client: + # 1. Send transcription request + await client.write_event(Transcribe(language=self.lang).event()) + + # 2. Start the audio stream + await client.write_event(AudioStart(rate, width, channels).event()) + + # 3. Send audio chunks + chunk_size = 2048 # A reasonable chunk size + for i in range(0, len(pcm_bytes), chunk_size): + chunk_bytes = pcm_bytes[i:i + chunk_size] + await client.write_event(AudioChunk(audio=chunk_bytes, rate=rate, width=width, channels=channels).event()) + + # 4. Stop the audio stream + await client.write_event(AudioStop().event()) + + # 5. Read events until a transcript arrives + transcript_text = None + try: + while True: + event = await asyncio.wait_for(client.read_event(), timeout=self.timeout) + if event is None: + break + + if Transcript.is_type(event.type): + tr = Transcript.from_event(event) + transcript_text = tr.text + break + except asyncio.TimeoutError: + typer.echo(typer.style("Connection timed out waiting for transcript.", fg=typer.colors.YELLOW)) + + return transcript_text + + def _process_audio(self): + """Process audio chunks with VAD in separate thread.""" + while self.is_running: + try: + audio_chunk = self.audio_queue.get(timeout=0.1) + + # Add to buffer + self.audio_buffer.add_chunk(audio_chunk) + + # Check VAD + speech_detected = self.vad_detector.is_speech(audio_chunk) + + if speech_detected and not self.is_speaking: + # Start of speech + typer.echo(typer.style("šŸŽ¤ Speech detected", fg=typer.colors.GREEN)) + self.is_speaking = True + pre_buffer = self.audio_buffer.start_recording() + + # Start async transcription in background + threading.Thread( + target=self._start_transcription, + daemon=True + ).start() + + elif not speech_detected and self.is_speaking: + # End of speech + typer.echo(typer.style("šŸ”‡ Speech ended", fg=typer.colors.YELLOW)) + self.is_speaking = False + full_recording = self.audio_buffer.stop_recording() + + if len(full_recording) > 0: + # Queue for transcription + self.transcription_queue.put(full_recording) + + except Empty: + continue + except Exception as e: + typer.echo(typer.style(f"Audio processing error: {e}", fg=typer.colors.RED)) + + def _start_transcription(self): + """Handle transcription in background thread.""" + try: + # Wait for audio to be queued + recording = self.transcription_queue.get(timeout=2.0) + + # Convert to PCM16 + audio_int16 = np.clip(recording * 32767.0, -32768, 32767).astype(np.int16) + pcm_bytes = audio_int16.tobytes() + + # Send to Wyoming ASR + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + transcript_text = loop.run_until_complete( + self._async_transcribe(pcm_bytes) + ) + + if transcript_text: + typer.echo(typer.style(f"šŸ“ {transcript_text}", fg=typer.colors.CYAN, bold=True)) + else: + typer.echo(typer.style("āŒ No transcription received", fg=typer.colors.YELLOW)) + finally: + loop.close() + + except Empty: + typer.echo(typer.style("ā° Transcription timeout", fg=typer.colors.YELLOW)) + except Exception as e: + typer.echo(typer.style(f"āŒ Transcription error: {e}", fg=typer.colors.RED)) + + def run(self): + """Run the satellite.""" + typer.echo(typer.style("šŸ›°ļø Starting satellite mode...", fg=typer.colors.BLUE, bold=True)) + typer.echo(f"Listening on default microphone ({self.sample_rate} Hz, {self.channels} ch)") + typer.echo(f"Wyoming server: {self.host}:{self.port} (lang: {self.lang})") + typer.echo("Press Ctrl+C to stop") + typer.echo("=" * 50) + + self.is_running = True + + # Start audio processing thread + audio_thread = threading.Thread(target=self._process_audio, daemon=True) + audio_thread.start() + + try: + # Start audio stream + with sd.InputStream( + callback=self._audio_callback, + samplerate=self.sample_rate, + channels=self.channels, + blocksize=self.chunk_size, + dtype='float32' + ): + # Keep running until interrupted + while self.is_running: + time.sleep(0.1) + + except KeyboardInterrupt: + typer.echo(typer.style("\nšŸ›‘ Stopping satellite...", fg=typer.colors.YELLOW)) + except Exception as e: + typer.echo(typer.style(f"āŒ Satellite error: {e}", fg=typer.colors.RED)) + finally: + self.is_running = False diff --git a/src/wyoming_client/vad.py b/src/wyoming_client/vad.py new file mode 100644 index 0000000..80498e2 --- /dev/null +++ b/src/wyoming_client/vad.py @@ -0,0 +1,76 @@ +# VAD = Voice Activity Detection +from abc import ABC, abstractmethod +import numpy as np +from typing import Optional + + +class VADDetector(ABC): + """Abstract base class for Voice Activity Detection.""" + + @abstractmethod + def is_speech(self, audio_chunk: np.ndarray) -> bool: + """Return True if speech is detected in the audio chunk.""" + pass + + @abstractmethod + def reset(self) -> None: + """Reset internal state.""" + pass + + +class AmplitudeVAD(VADDetector): + """Simple VAD based on RMS amplitude threshold.""" + + def __init__(self, + threshold: float = 0.01, + min_speech_duration: float = 0.1, + min_silence_duration: float = 0.5, + sample_rate: int = 16000): + """ + Initialize amplitude-based VAD. + + Args: + threshold: RMS threshold for speech detection + min_speech_duration: Minimum duration to consider as speech (seconds) + min_silence_duration: Minimum silence to end speech (seconds) + sample_rate: Audio sample rate + """ + self.threshold = threshold + self.min_speech_frames = int(min_speech_duration * sample_rate) + self.min_silence_frames = int(min_silence_duration * sample_rate) + self.sample_rate = sample_rate + + self.speech_frames = 0 + self.silence_frames = 0 + self.is_speaking = False + + def is_speech(self, audio_chunk: np.ndarray) -> bool: + """Detect speech in audio chunk based on RMS amplitude.""" + # Calculate RMS + rms = np.sqrt(np.mean(audio_chunk.astype(np.float32) ** 2)) + + if rms > self.threshold: + # Potential speech detected + self.speech_frames += len(audio_chunk) + self.silence_frames = 0 + + # Start speaking if we have enough speech frames + if not self.is_speaking and self.speech_frames >= self.min_speech_frames: + self.is_speaking = True + + else: + # Silence detected + self.silence_frames += len(audio_chunk) + self.speech_frames = max(0, self.speech_frames - len(audio_chunk) // 2) # Decay + + # Stop speaking if we have enough silence + if self.is_speaking and self.silence_frames >= self.min_silence_frames: + self.is_speaking = False + + return self.is_speaking + + def reset(self) -> None: + """Reset VAD state.""" + self.speech_frames = 0 + self.silence_frames = 0 + self.is_speaking = False \ No newline at end of file