commit 343d1d2f938c81aace20cdc7c7f3490998d169b7 Author: Kodjo Sossouvi Date: Mon Sep 1 23:01:08 2025 +0200 Working on audio commands diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..242b4a2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,204 @@ +__pycache__ +app.egg-info +*.pyc +.mypy_cache +.coverage +htmlcov +.cache +.venv +tests/settings_from_unit_testing.json +tests/TestDBEngineRoot +test-results +.sesskey +tools.db +.mytools_db +.idea/MyManagingTools.iml +.idea/misc.xml +.idea_bak +**/*.prof + +# Created by .ignore support plugin (hsz.mobi) +### Python template +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +env/ +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +*.egg-info/ +.installed.cfg +*.egg + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*,cover +.hypothesis/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# IPython Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# dotenv +.env + +# virtualenv +venv/ +ENV/ + +# Spyder project settings +.spyderproject + +# Rope project settings +.ropeproject +### VirtualEnv template +# Virtualenv +# http://iamzed.com/2009/05/07/a-primer-on-virtualenv/ +[Bb]in +[Ii]nclude +[Ll]ib +[Ll]ib64 +[Ll]ocal +[Ss]cripts +pyvenv.cfg +.venv +pip-selfcheck.json + +### JetBrains template +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/**/usage.statistics.xml +.idea/**/dictionaries +.idea/**/shelf + +# AWS User-specific +.idea/**/aws.xml + +# Generated files +.idea/**/contentModel.xml + +# Sensitive or high-churn files +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml +.idea/**/dbnavigator.xml + +# Gradle +.idea/**/gradle.xml +.idea/**/libraries + +# Gradle and Maven with auto-import +# When using Gradle or Maven with auto-import, you should exclude module files, +# since they will be recreated, and may cause churn. Uncomment if using +# auto-import. +# .idea/artifacts +# .idea/compiler.xml +# .idea/jarRepositories.xml +# .idea/modules.xml +# .idea/*.iml +# .idea/modules +# *.iml +# *.ipr + +# CMake +cmake-build-*/ + +# Mongo Explorer plugin +.idea/**/mongoSettings.xml + +# File-based project format +*.iws + +# IntelliJ +out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Cursive Clojure plugin +.idea/replstate.xml + +# SonarLint plugin +.idea/sonarlint/ + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties + +# Editor-based Rest Client +.idea/httpRequests + +# Android studio 3.1+ serialized cache file +.idea/caches/build_file_checksums.ser + +# idea folder, uncomment if you don't need it +# .idea diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..1c2fda5 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/MyTTSClient.iml b/.idea/MyTTSClient.iml new file mode 100644 index 0000000..308e609 --- /dev/null +++ b/.idea/MyTTSClient.iml @@ -0,0 +1,15 @@ + + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml new file mode 100644 index 0000000..4069cea --- /dev/null +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..6a56995 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..9661ac7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/Prerequisite.md b/Prerequisite.md new file mode 100644 index 0000000..5c0df43 --- /dev/null +++ b/Prerequisite.md @@ -0,0 +1,164 @@ +# WSL2 Audio Configuration for Microphone Access + +This guide explains how to configure Windows microphone access from WSL2 for the MyTTSClient project. + +## ๐ŸŽฏ Objective + +Enable a Python application in WSL2 to access the Windows microphone for audio recording and transmission to a Wyoming server. + +## ๐Ÿ“‹ Prerequisites + +- **Windows 10 (Build 19041+)** or **Windows 11** +- **WSL2** installed and configured +- **Linux Distribution** (Ubuntu 20.04+ recommended) +- **Microphone** connected and working in Windows + +## ๐Ÿ”ง Step-by-step Configuration + +### 1. Windows Permissions + +#### 1.1 Enable Microphone Access +1. Open **Windows Settings** (`Win + I`) +2. Navigate to **Privacy & security** โ†’ **Microphone** +3. Enable **"Let apps access your microphone"** +4. Enable **"Let desktop apps access your microphone"** + +### 2. WSL2 Configuration + +#### 2.1 Update WSL2 +```bash +# In PowerShell (Administrator) +wsl --update +wsl --shutdown +# Restart WSL2 +``` + +#### 2.2 Install Audio Dependencies in WSL2 +```bash +# In your WSL2 terminal +sudo apt update +sudo apt install pulseaudio pulseaudio-utils +``` + +#### 2.3 Verify PulseAudio Connection +```bash +# Check connection to Windows server +pulseaudio --check -v +``` + +**Expected Results:** +- โœ… `N: [pulseaudio] main.c: User-configured server at unix:/mnt/wslg/PulseServer, refusing to start/autospawn.` +- โŒ `E: [pulseaudio] core-util.c: Failed to create secure directory` (see Troubleshooting section) + +#### 2.4 Environment Variables (if needed) +```bash +# Add to ~/.bashrc if issues persist +export PULSE_SERVER=unix:/mnt/wslg/PulseServer +export PULSE_RUNTIME_PATH="/mnt/wslg/runtime-dir/pulse" + +# Reload +source ~/.bashrc +``` + +### 3. Test Audio Devices + +#### 3.1 List Audio Sources (microphones) +```bash +pactl list sources short +``` + +**Example Output:** +``` +1 RDPSink.monitor module-rdp-sink.c s16le 2ch 44100Hz SUSPENDED +2 RDPSource module-rdp-source.c s16le 1ch 44100Hz SUSPENDED +``` + +#### 3.2 Test with Python +```bash +# Install Python dependencies +pip install sounddevice numpy + +# Test devices +python -c "import sounddevice; print(sounddevice.query_devices())" +``` + +**Example Output:** +``` + 0 pulse, ALSA (32 in, 32 out) +* 1 default, ALSA (32 in, 32 out) +``` + +## ๐Ÿ” Diagnostics and Troubleshooting + +### Issue: "Permission denied" on /mnt/wslg/runtime-dir/pulse + +**Solution:** +```bash +sudo mkdir -p /mnt/wslg/runtime-dir/pulse +sudo chown $USER:$USER /mnt/wslg/runtime-dir/pulse +chmod 700 /mnt/wslg/runtime-dir/pulse +``` + +### Issue: "Error querying device -1" + +**Cause:** PulseAudio cannot find audio devices + +**Solutions:** +1. Check Windows permissions (step 1.1) +2. Restart WSL2: `wsl --shutdown` then relaunch +3. Verify environment variables (step 2.4) + +### Issue: No input devices detected + +**Solutions:** +1. Test microphone directly in Windows +2. Ensure microphone is not used by another application +3. Restart Windows and WSL2 + +### Issue: "RDPSource" instead of real microphone + +**Explanation:** WSL2 uses RDP (Remote Desktop Protocol) to tunnel audio. `RDPSource` **IS** your Windows microphone. + +## ๐Ÿงช Validation Tests + +### Test 1: PulseAudio Connection +```bash +pulseaudio --check -v +# Should display: "User-configured server at unix:/mnt/wslg/PulseServer" +``` + +### Test 2: Available Devices +```bash +pactl list sources short +# Should list at least RDPSource +``` + +### Test 3: Python Recording +```bash +python main.py +# Should display device list and allow recording +``` + +## ๐Ÿ“š Additional Resources + +- [Official WSL Audio Documentation](https://docs.microsoft.com/en-us/windows/wsl/) +- [PulseAudio Documentation](https://www.freedesktop.org/wiki/Software/PulseAudio/) +- [WSL Audio Issues on GitHub](https://github.com/microsoft/WSL/issues?q=audio) + +## โš ๏ธ Known Limitations + +1. **Audio Latency**: WSL2 may introduce additional latency +2. **Audio Quality**: RDP compression may affect quality +3. **Permissions**: Windows Updates may reset permissions +4. **Multi-user**: Per-Windows-user configuration + +## ๐Ÿ”„ Updates + +This guide is based on WSL2 version 2.0+ and Windows 11. For earlier versions, additional steps may be required. + +--- + +**Version:** 1.0 +**Last Updated:** 2025-08-31 +**Tested On:** Windows 11, WSL2 Ubuntu 22.04 +``` diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..1b3fac0 --- /dev/null +++ b/config.yaml @@ -0,0 +1,12 @@ +# MyTTSClient Configuration File +# This file provides default values. Environment variables will override these settings. + +server: + host: "127.0.0.1" + port: 10300 + timeout: 3.0 + +audio: + sample_rate: 16000 + channels: 1 + device: null # null = use default device \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..174747e --- /dev/null +++ b/main.py @@ -0,0 +1,15 @@ +#!/usr/bin/env python3 +""" +MyTTSClient - Wyoming audio recording and transcription client + +Usage: + python main.py server check + python main.py audio list [--inputs] [--outputs] [--pulse] + python main.py audio test [--device ID] [--duration N] [--save] + python main.py audio config +""" + +from src.cli import app + +if __name__ == "__main__": + app() \ No newline at end of file diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/cli.py b/src/cli.py new file mode 100644 index 0000000..ade61d8 --- /dev/null +++ b/src/cli.py @@ -0,0 +1,22 @@ +import typer +from .commands import server, audio + +app = typer.Typer(help="MyTTSClient - Wyoming audio recording and transcription client") + +# Create subcommands +server_app = typer.Typer(help="Wyoming server operations") +audio_app = typer.Typer(help="Audio device operations") + +# Add commands to subcommands +server_app.command("check")(server.check) +audio_app.command("list")(audio.list_devices) +audio_app.command("test")(audio.test_device) +audio_app.command("config")(audio.config_info) + +# Register subcommands +app.add_typer(server_app, name="server") +app.add_typer(audio_app, name="audio") + + +if __name__ == "__main__": + app() \ No newline at end of file diff --git a/src/commands/__init__.py b/src/commands/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/commands/audio.py b/src/commands/audio.py new file mode 100644 index 0000000..8dcd573 --- /dev/null +++ b/src/commands/audio.py @@ -0,0 +1,797 @@ +import sounddevice as sd +import subprocess +import typer +import numpy as np +import os +import json +from typing import Optional, List, Dict, Any, Tuple +from dataclasses import dataclass + +from rich.console import Console +from rich.table import Table + +from ..utils import SortConf, sort_by, FilterConf, filter_by +from ..config import AppConfig + + +@dataclass +class WindowsAudioDevice: + """Windows audio device data structure.""" + name: str + status: str + device_type: str # "Input", "Output", "Unknown" + instance_id: str + + +@dataclass +class LinuxAudioDevice: + """Linux audio device data structure.""" + device_id: int + name: str + max_input_channels: int + max_output_channels: int + default_samplerate: float + hostapi_name: str + + +def list_devices( + device: Optional[int] = typer.Argument(default=None, help="Show details of a device, by its Id"), + pulse: bool = typer.Option(False, "--pulse", help="Show PulseAudio sources"), + native: bool = typer.Option(False, "--native", help="Force native Linux view (WSL only)"), + sort_by: Optional[str] = typer.Option(None, "--sort", help="Sort by column"), + desc: bool = typer.Option(False, "--desc", help="Set the sorting order to descending"), + filter_by: Optional[str] = typer.Option(None, "--filter", help="Filter by a column value. Use column=value"), + config_file: Optional[str] = typer.Option(None, "--config", "-c", help="Path to config file") +): + """List audio devices.""" + + # Load configuration for context + config = AppConfig.load(config_file) + + # Determine title based on environment and options + is_wsl = detect_wsl() + use_windows = is_wsl and not native + sort_config = SortConf(sort_by, not desc) if sort_by else None + filter_config = FilterConf.new(filter_by) if filter_by else None + if use_windows: + typer.echo(typer.style("WSL detected, showing Windows audio devices (use --native for WSL view)", + fg=typer.colors.BLUE)) + windows_devices = get_windows_audio_devices() + display_windows_devices(windows_devices, sort_config, filter_config) + + else: + linux_devices = get_linux_audio_devices() + display_linux_devices(linux_devices, config, pulse, sort_config) + + # Show legend + typer.echo(f"\nLegend:") + if use_windows: + typer.echo(typer.style(" โœ“ = Device working", fg=typer.colors.GREEN)) + typer.echo(typer.style(" โœ— = Device problem", fg=typer.colors.RED)) + else: + typer.echo(typer.style(" * = Configured device", fg=typer.colors.YELLOW)) + typer.echo(f" IN/OUT = Input/Output channel count") + + +def detect_wsl() -> bool: + """Detect if running under Windows Subsystem for Linux.""" + try: + # Method 1: Check /proc/version for Microsoft + if os.path.exists('/proc/version'): + with open('/proc/version', 'r') as f: + version_info = f.read().lower() + if 'microsoft' in version_info or 'wsl' in version_info: + return True + + # Method 2: Check WSL environment variables + if os.getenv('WSL_DISTRO_NAME') or os.getenv('WSLENV'): + return True + + # Method 3: Check if powershell.exe is available + try: + subprocess.run(['powershell.exe', '-Command', 'echo test'], + capture_output=True, timeout=2, check=True) + return True + except (subprocess.SubprocessError, FileNotFoundError): + pass + + return False + except Exception: + return False + + +def get_windows_audio_devices_com() -> List[WindowsAudioDevice]: + """ + Retrieve Windows audio devices using Core Audio (COM) via an inline C# helper compiled by PowerShell. + Returns a list of WindowsAudioDevice with accurate device_type ('Input' or 'Output') based on DataFlow. + """ + ps_script = r""" +[Console]::OutputEncoding = [System.Text.Encoding]::UTF8; + +$code = @" +using System; +using System.Runtime.InteropServices; +using System.Collections.Generic; + +[ComImport] +[Guid("BCDE0395-E52F-467C-8E3D-C4579291692E")] +class MMDeviceEnumerator {} + +[Guid("A95664D2-9614-4F35-A746-DE8DB63617E6"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)] +interface IMMDeviceEnumerator { + int EnumAudioEndpoints(EDataFlow dataFlow, int dwStateMask, out IMMDeviceCollection ppDevices); + // Other methods not used are omitted +} + +enum EDataFlow { eRender = 0, eCapture = 1, eAll = 2 } + +[Flags] +enum DEVICE_STATE : uint { ACTIVE = 0x1, DISABLED = 0x2, NOTPRESENT = 0x4, UNPLUGGED = 0x8, ALL = 0xF } + +[Guid("0BD7A1BE-7A1A-44DB-8397-C0A794AF5630"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)] +interface IMMDeviceCollection { + int GetCount(out int pcDevices); + int Item(int nDevice, out IMMDevice ppDevice); +} + +[Guid("D666063F-1587-4E43-81F1-B948E807363F"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)] +interface IMMDevice { + int Activate(ref Guid iid, int dwClsCtx, IntPtr pActivationParams, out IntPtr ppInterface); + int OpenPropertyStore(int stgmAccess, out IPropertyStore ppProperties); + int GetId(out string ppstrId); + int GetState(out int pdwState); +} + +[Guid("886d8eeb-8cf2-4446-8d02-cdba1dbdcf99"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)] +interface IPropertyStore { + int GetCount(out int cProps); + int GetAt(int iProp, out PROPERTYKEY pkey); + int GetValue(ref PROPERTYKEY key, out PROPVARIANT pv); + int SetValue(ref PROPERTYKEY key, ref PROPVARIANT propvar); + int Commit(); +} + +[StructLayout(LayoutKind.Sequential)] +struct PROPERTYKEY { + public Guid fmtid; + public int pid; +} + +[StructLayout(LayoutKind.Explicit)] +struct PROPVARIANT { + [FieldOffset(0)] public ushort vt; + [FieldOffset(8)] public IntPtr pointerValue; +} + +static class PKEY { + // PKEY_Device_FriendlyName + public static PROPERTYKEY Device_FriendlyName = new PROPERTYKEY { + fmtid = new Guid("A45C254E-DF1C-4EFD-8020-67D146A850E0"), pid = 14 + }; +} + +public class DeviceInfo { + public string Name { get; set; } + public string Id { get; set; } + public string Type { get; set; } // "Input" or "Output" + public string Status { get; set; } // "OK", "Disabled", "Unplugged", "NotPresent", "Unknown" +} + +public static class CoreAudioEnumerator { + [DllImport("ole32.dll")] + private static extern int PropVariantClear(ref PROPVARIANT pvar); + + private static string ReadString(IPropertyStore store, PROPERTYKEY key) { + PROPVARIANT pv = new PROPVARIANT(); + try { + store.GetValue(ref key, out pv); + // VT_LPWSTR = 31 + if (pv.vt == 31) { + return Marshal.PtrToStringUni(pv.pointerValue); + } + return null; + } finally { + PropVariantClear(ref pv); + } + } + + public static List GetAll() { + var result = new List(); + var enumerator = (IMMDeviceEnumerator)new MMDeviceEnumerator(); + + foreach (EDataFlow flow in new[] { EDataFlow.eCapture, EDataFlow.eRender }) { + IMMDeviceCollection coll; + enumerator.EnumAudioEndpoints(flow, (int)DEVICE_STATE.ALL, out coll); + + int count; + coll.GetCount(out count); + + for (int i = 0; i < count; i++) { + IMMDevice dev; + coll.Item(i, out dev); + + string id; + dev.GetId(out id); + + int state; + dev.GetState(out state); + + IPropertyStore store; + dev.OpenPropertyStore(0 /* STGM_READ */, out store); + string name = ReadString(store, PKEY.Device_FriendlyName) ?? "(unknown)"; + + string type = flow == EDataFlow.eCapture ? "Input" : "Output"; + + string status = + (state & (int)DEVICE_STATE.ACTIVE) != 0 ? "OK" : + (state & (int)DEVICE_STATE.DISABLED) != 0 ? "Disabled" : + (state & (int)DEVICE_STATE.UNPLUGGED) != 0 ? "Unplugged" : + (state & (int)DEVICE_STATE.NOTPRESENT) != 0 ? "NotPresent" : + "Unknown"; + + result.Add(new DeviceInfo { Name = name, Id = id, Type = type, Status = status }); + } + } + + return result; + } +} +"@ + +Add-Type -TypeDefinition $code -Language CSharp -ErrorAction Stop + +$devices = [CoreAudioEnumerator]::GetAll() | ForEach-Object { + [PSCustomObject]@{ + FriendlyName = $_.Name + InstanceId = $_.Id + Type = $_.Type + Status = $_.Status + } +} + +$devices | ConvertTo-Json +""" + try: + result = subprocess.run( + [ + "powershell.exe", + "-NoProfile", + "-NonInteractive", + "-Command", + ps_script, + ], + capture_output=True, + text=True, + encoding="utf-8", + timeout=30, + ) + + if result.returncode != 0: + err = result.stderr.strip() if result.stderr else f"exit code {result.returncode}" + typer.echo(typer.style(f"PowerShell (CoreAudio) command failed: {err}", fg=typer.colors.RED)) + return [] + + output = result.stdout.strip() + if not output: + typer.echo(typer.style("PowerShell (CoreAudio) returned empty output", fg=typer.colors.YELLOW)) + return [] + + data = json.loads(output) + items = data if isinstance(data, list) else [data] + + devices: List[WindowsAudioDevice] = [] + for it in items: + name = (it.get("FriendlyName") or "").strip() + instance_id = it.get("InstanceId") or "" + dev_type = it.get("Type") or "Unknown" + status = it.get("Status") or "Unknown" + + if not name: + continue + + devices.append( + WindowsAudioDevice( + name=name, + status=status, + device_type=dev_type, + instance_id=instance_id, + ) + ) + + return devices + + except json.JSONDecodeError as e: + typer.echo(typer.style(f"Failed to parse CoreAudio JSON output: {e}", fg=typer.colors.RED)) + return [] + except subprocess.TimeoutExpired: + typer.echo(typer.style("PowerShell (CoreAudio) command timed out after 30 seconds", fg=typer.colors.RED)) + return [] + except Exception as e: + typer.echo(typer.style(f"Unexpected error calling CoreAudio via PowerShell: {type(e).__name__}: {e}", + fg=typer.colors.RED)) + return [] + + +def get_windows_default_devices() -> Dict[str, str]: + """Get Windows default audio devices via PowerShell.""" + ps_script = r""" +[Console]::OutputEncoding = [System.Text.Encoding]::UTF8; + +$code = @" +using System; +using System.Runtime.InteropServices; + +[ComImport] +[Guid("BCDE0395-E52F-467C-8E3D-C4579291692E")] +class MMDeviceEnumerator {} + +[Guid("A95664D2-9614-4F35-A746-DE8DB63617E6"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)] +interface IMMDeviceEnumerator { + int GetDefaultAudioEndpoint(EDataFlow dataFlow, ERole role, out IMMDevice ppEndpoint); +} + +enum EDataFlow { eRender = 0, eCapture = 1 } +enum ERole { eConsole = 0, eMultimedia = 1, eCommunications = 2 } + +[Guid("D666063F-1587-4E43-81F1-B948E807363F"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)] +interface IMMDevice { + int GetId(out string ppstrId); +} + +public static class DefaultDeviceHelper { + public static string GetDefaultInputDevice() { + try { + var enumerator = (IMMDeviceEnumerator)new MMDeviceEnumerator(); + IMMDevice device; + enumerator.GetDefaultAudioEndpoint(EDataFlow.eCapture, ERole.eConsole, out device); + string id; + device.GetId(out id); + return id; + } catch { return null; } + } + + public static string GetDefaultOutputDevice() { + try { + var enumerator = (IMMDeviceEnumerator)new MMDeviceEnumerator(); + IMMDevice device; + enumerator.GetDefaultAudioEndpoint(EDataFlow.eRender, ERole.eConsole, out device); + string id; + device.GetId(out id); + return id; + } catch { return null; } + } +} +"@ + +Add-Type -TypeDefinition $code -Language CSharp -ErrorAction Stop + +$defaultInput = [DefaultDeviceHelper]::GetDefaultInputDevice() +$defaultOutput = [DefaultDeviceHelper]::GetDefaultOutputDevice() + +[PSCustomObject]@{ + DefaultInput = $defaultInput + DefaultOutput = $defaultOutput +} | ConvertTo-Json +""" + + try: + result = subprocess.run([ + "powershell.exe", + "-NoProfile", + "-NonInteractive", + "-Command", + ps_script, + ], capture_output=True, text=True, encoding="utf-8", timeout=15) + + if result.returncode != 0: + return {"default_input": None, "default_output": None} + + if not result.stdout.strip(): + return {"default_input": None, "default_output": None} + + data = json.loads(result.stdout) + return { + "default_input": data.get("DefaultInput"), + "default_output": data.get("DefaultOutput") + } + + except Exception: + return {"default_input": None, "default_output": None} + + +def get_windows_audio_devices() -> List[WindowsAudioDevice]: + """Get detailed Windows audio devices via PowerShell from WSL.""" + try: + result = subprocess.run([ + "powershell.exe", + "-NoProfile", + "-NonInteractive", + "-Command", + # Force UTF-8 output from PowerShell before running the command + "[Console]::OutputEncoding = [System.Text.Encoding]::UTF8; " + "Get-PnpDevice -Class AudioEndpoint | " + "Select-Object FriendlyName, Status, InstanceId | ConvertTo-Json" + ], capture_output=True, text=True, encoding="utf-8", timeout=15) + + if result.returncode != 0: + error_msg = result.stderr.strip() if result.stderr else f"PowerShell exit code: {result.returncode}" + typer.echo(typer.style(f"PowerShell command failed: {error_msg}", fg=typer.colors.RED)) + return [] + + if not result.stdout.strip(): + typer.echo(typer.style("PowerShell returned empty output", fg=typer.colors.YELLOW)) + return [] + + data = json.loads(result.stdout) + devices = data if isinstance(data, list) else [data] + + windows_devices = [] + for device in devices: + name = device.get('FriendlyName', '').strip() + status = device.get('Status', 'Unknown') + instance_id = device.get('InstanceId', '') + + if not name: + continue + + # Determine device type from InstanceId + device_type = _categorize_device_type(instance_id) + + windows_devices.append(WindowsAudioDevice( + name=name, + status=status, + device_type=device_type, + instance_id=instance_id + )) + + return windows_devices + + except json.JSONDecodeError as e: + typer.echo(typer.style(f"Failed to parse PowerShell JSON output: {e}", fg=typer.colors.RED)) + return [] + except subprocess.TimeoutExpired: + typer.echo(typer.style("PowerShell command timed out after 15 seconds", fg=typer.colors.RED)) + return [] + except Exception as e: + typer.echo(typer.style(f"Unexpected error calling PowerShell: {type(e).__name__}: {e}", fg=typer.colors.RED)) + return [] + + +def _categorize_device_type(instance_id: str) -> str: + """Categorize device type based on InstanceId.""" + instance_lower = instance_id.lower() + if '0.0.1.00000000' in instance_lower: + return "Input" + elif '0.0.0.00000000' in instance_lower: + return "Output" + else: + return "Unknown" + + +def get_linux_audio_devices() -> List[LinuxAudioDevice]: + """Get Linux audio devices using sounddevice.""" + try: + devices = sd.query_devices() + linux_devices = [] + + for i, device in enumerate(devices): + hostapi_name = sd.query_hostapis(device['hostapi'])['name'] + + linux_devices.append(LinuxAudioDevice( + device_id=i, + name=device['name'], + max_input_channels=device['max_input_channels'], + max_output_channels=device['max_output_channels'], + default_samplerate=device['default_samplerate'], + hostapi_name=hostapi_name + )) + + return linux_devices + except Exception as e: + typer.echo(typer.style(f"Error querying Linux audio devices: {e}", fg=typer.colors.RED, bold=True)) + raise typer.Exit(1) + + +def display_windows_devices(devices: List[WindowsAudioDevice], + sort_conf: Optional[SortConf] = None, + filter_conf: Optional[FilterConf] = None + ) -> None: + """Display Windows audio devices with optimized formatting.""" + + def _get_id(_instance_id): + return _instance_id[31:39] + + if not devices: + typer.echo(typer.style("No Windows audio devices found!", fg=typer.colors.RED)) + return + + # Get default devices + default_devices = get_windows_default_devices() + default_input_id = default_devices.get("default_input") + default_output_id = default_devices.get("default_output") + + if not devices: + typer.echo(typer.style("No audio devices found!", fg=typer.colors.RED)) + + # Create rich table + table = Table(show_header=True) + table.add_column("ID", width=8, no_wrap=True, overflow="ellipsis") + table.add_column("Name", width=55, no_wrap=True, overflow="ellipsis") + table.add_column("Type", width=8, justify="center") + table.add_column("Status", width=3, justify="center") + table.add_column("*", width=2, justify="center") + + # Add devices to table + rows = [] # list of dict + for device in devices: + # Determine if device is selected (default) + is_selected = False + if device.device_type == "Input" and device.instance_id == default_input_id: + is_selected = True + elif device.device_type == "Output" and device.instance_id == default_output_id: + is_selected = True + + selected_marker = "*" if is_selected else "" + + # Color coding for status + if device.status == "OK": + status_text = f"[green]{device.status}[/green]" + else: + status_text = f"[red]{device.status}[/red]" + + # Style selected row + if is_selected: + row_style = "bold" + id_text = f"[bold]{_get_id(device.instance_id)}[/bold]" + name_text = f"[bold]{device.name}[/bold]" + type_text = f"[bold]{device.device_type}[/bold]" + selected_text = f"[bold yellow]{selected_marker}[/bold yellow]" + else: + row_style = None + id_text = _get_id(device.instance_id) + name_text = device.name + type_text = device.device_type + selected_text = selected_marker + + rows.append({ + "id": id_text, + "name": name_text, + "type": type_text, + "status": status_text, + "selected": selected_text + }) + + rows = sort_by(rows, sort_conf) + rows = filter_by(rows, filter_conf) + + for row in rows: + table.add_row(*row.values()) + + # Display table + console = Console() + console.print(table) + + +def display_linux_devices(devices: List[LinuxAudioDevice], + config: AppConfig, + show_pulse: bool = False, + sort_conf: Optional[SortConf] = None) -> None: + """Display Linux audio devices with existing formatting logic.""" + + if not devices: + typer.echo(typer.style("No audio devices found!", fg=typer.colors.RED)) + return + + # Display header + typer.echo(f"\nFound {typer.style(str(len(devices)), fg=typer.colors.CYAN, bold=True)} devices:") + typer.echo(f"{'ID':<3} {'Name':<35} {'IN':<3} {'OUT':<3} {'Rate':<8} {'Host API'}") + typer.echo("-" * 70) + + # Display devices + for device in devices: + rate = int(device.default_samplerate) + is_configured = (config.audio.device == device.device_id) + marker = " *" if is_configured else "" + + device_line = (f"{device.device_id:<3} {device.name:<35} {device.max_input_channels:<3} " + f"{device.max_output_channels:<3} {rate:<8} {device.hostapi_name}{marker}") + typer.echo(typer.style(device_line, bold=is_configured)) + + # Show default devices info + typer.echo(f"\nDefault devices:") + try: + default_device = sd.default.device + default_input = int(default_device[0]) + default_output = int(default_device[1]) + + input_device = next((d for d in devices if d.device_id == default_input), None) + if input_device: + typer.echo(f" Input: [{default_input}] {input_device.name}") + output_device = next((d for d in devices if d.device_id == default_output), None) + if output_device: + typer.echo(f" Output: [{default_output}] {output_device.name}") + except Exception as e: + typer.echo(typer.style(f" Error getting defaults: {e}", fg=typer.colors.RED)) + + # Show PulseAudio sources if requested + if show_pulse: + typer.echo(f"\nPulseAudio sources:") + pulse_sources = get_pulseaudio_sources() + if pulse_sources: + for source in pulse_sources: + typer.echo(f" {source['id']:2}: {source['name']}") + else: + typer.echo(typer.style(" Could not retrieve PulseAudio sources", fg=typer.colors.RED)) + + +# Legacy function for backwards compatibility +def get_audio_devices(): + """Get comprehensive information about audio devices.""" + try: + devices = sd.query_devices() + return devices + except Exception as e: + typer.echo(typer.style(f"Error querying audio devices: {e}", fg=typer.colors.RED, bold=True)) + raise typer.Exit(1) + + +def get_pulseaudio_sources(): + """Get PulseAudio sources for comparison.""" + try: + result = subprocess.run(['pactl', 'list', 'sources', 'short'], + capture_output=True, text=True, timeout=5) + if result.returncode == 0: + sources = [] + for line in result.stdout.strip().split('\n'): + if line.strip(): + parts = line.split('\t') + if len(parts) >= 2: + sources.append({'id': parts[0], 'name': parts[1]}) + return sources + else: + return None + except Exception: + return None + + +def test_device( + device: Optional[int] = typer.Option(None, "--device", "-d", help="Device ID to test (default: configured device)"), + duration: float = typer.Option(3.0, "--duration", help="Recording duration in seconds"), + save: bool = typer.Option(False, "--save", help="Save recording to WAV file"), + config_file: Optional[str] = typer.Option(None, "--config", "-c", help="Path to config file") +): + """Test audio recording from a specific device.""" + + # Load configuration + config = AppConfig.load(config_file) + + # Determine device to use + test_device_id = device if device is not None else config.audio.device + + typer.echo(typer.style("Audio Device Test", fg=typer.colors.BLUE, bold=True)) + typer.echo("=" * 20) + + # Get device info + devices_list = get_linux_audio_devices() + + if test_device_id is not None: + if test_device_id >= len(devices_list): + typer.echo(typer.style(f"Error: Device {test_device_id} not found!", fg=typer.colors.RED)) + raise typer.Exit(1) + + device_info = devices_list[test_device_id] + if device_info['max_input_channels'] == 0: + typer.echo(typer.style(f"Error: Device {test_device_id} has no input channels!", fg=typer.colors.RED)) + raise typer.Exit(1) + + typer.echo(f"Testing device [{test_device_id}]: {device_info['name']}") + else: + typer.echo("Testing default input device") + + typer.echo(f"Recording for {duration} seconds...") + typer.echo("Starting in 3... 2... 1...") + + try: + # Record audio + typer.echo(typer.style("Recording...", fg=typer.colors.GREEN, bold=True)) + + audio_data = sd.rec( + int(duration * config.audio.sample_rate), + samplerate=config.audio.sample_rate, + channels=config.audio.channels, + device=test_device_id, + dtype='float32' + ) + sd.wait() + + # Analyze the recording + audio_int16 = (audio_data.flatten() * 32767).astype(np.int16) + max_amplitude = np.max(np.abs(audio_int16)) + rms_level = np.sqrt(np.mean(audio_int16.astype(np.float32) ** 2)) + + typer.echo(typer.style("Recording completed!", fg=typer.colors.GREEN)) + typer.echo(f"\nAnalysis:") + typer.echo(f" Duration: {duration:.1f}s") + typer.echo(f" Samples: {len(audio_int16)}") + typer.echo(f" Max amplitude: {max_amplitude} / 32767 ({max_amplitude / 32767 * 100:.1f}%)") + typer.echo(f" RMS level: {rms_level:.1f}") + + # Signal quality assessment + if max_amplitude < 100: + typer.echo(typer.style(" Status: Very low signal - check microphone", fg=typer.colors.RED)) + elif max_amplitude < 1000: + typer.echo(typer.style(" Status: Low signal - may need to speak louder", fg=typer.colors.YELLOW)) + elif max_amplitude > 30000: + typer.echo(typer.style(" Status: Very high signal - may be clipping", fg=typer.colors.YELLOW)) + else: + typer.echo(typer.style(" Status: Good signal level", fg=typer.colors.GREEN)) + + # Save if requested + if save: + filename = f"test_device_{test_device_id or 'default'}_{int(duration)}s.wav" + # Note: WyomingAudioRecorder is not defined in the current code + # This would need to be implemented or the save functionality modified + typer.echo(f" Save functionality needs to be implemented") + + except Exception as e: + typer.echo(typer.style(f"Recording failed: {e}", fg=typer.colors.RED, bold=True)) + raise typer.Exit(1) + + +def config_info( + config_file: Optional[str] = typer.Option(None, "--config", "-c", help="Path to config file") +): + """Show current audio configuration.""" + + # Load configuration + config = AppConfig.load(config_file) + + typer.echo(typer.style("Audio Configuration", fg=typer.colors.BLUE, bold=True)) + typer.echo("=" * 21) + + # Show current settings + typer.echo(f"\nCurrent settings:") + typer.echo(f" Sample rate: {typer.style(f'{config.audio.sample_rate} Hz', fg=typer.colors.CYAN)}") + typer.echo(f" Channels: {typer.style(str(config.audio.channels), fg=typer.colors.CYAN)}") + + if config.audio.device is not None: + typer.echo(f" Device: {typer.style(str(config.audio.device), fg=typer.colors.CYAN)}") + + # Show device details + try: + devices_list = get_linux_audio_devices() + if config.audio.device < len(devices_list): + device = devices_list[config.audio.device] + typer.echo(f"\nConfigured device details:") + typer.echo(f" Name: {device['name']}") + typer.echo(f" Input channels: {device['max_input_channels']}") + typer.echo(f" Default rate: {int(device['default_samplerate'])} Hz") + + if device['max_input_channels'] == 0: + typer.echo(typer.style(" Warning: This device has no input channels!", fg=typer.colors.RED)) + else: + typer.echo(typer.style(f" Warning: Configured device {config.audio.device} not found!", + fg=typer.colors.RED, bold=True)) + except Exception as e: + typer.echo(typer.style(f" Error getting device info: {e}", fg=typer.colors.RED)) + else: + typer.echo(f" Device: {typer.style('default', fg=typer.colors.CYAN)}") + + # Show default device info + try: + default_device = sd.default.device + if hasattr(default_device, '__len__') and len(default_device) >= 2: + default_input = int(default_device[0]) + else: + default_input = int(default_device) + + devices_list = get_linux_audio_devices() + device = devices_list[default_input] + typer.echo(f"\nDefault input device:") + typer.echo(f" ID: {default_input}") + typer.echo(f" Name: {device['name']}") + typer.echo(f" Input channels: {device['max_input_channels']}") + except Exception as e: + typer.echo(typer.style(f" Error getting default device: {e}", fg=typer.colors.RED)) + + # Show configuration source info diff --git a/src/commands/server.py b/src/commands/server.py new file mode 100644 index 0000000..a99bd4e --- /dev/null +++ b/src/commands/server.py @@ -0,0 +1,60 @@ +import socket +import time +from contextlib import closing +import typer +from typing import Optional + +from ..config import AppConfig + + +def check_wyoming_server(host: str, port: int, timeout: float = 3.0) -> tuple[bool, float | None, str | None]: + """ + Attempts a plain TCP connection to the given host/port to check if a Wyoming server is reachable. + + Returns a tuple: (is_reachable, connect_latency_seconds, error_message) + - is_reachable: True if a TCP connection could be established, False otherwise + - connect_latency_seconds: time taken to connect (float) if successful, else None + - error_message: error details if connection failed, else None + """ + start = time.perf_counter() + try: + with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: + s.settimeout(timeout) + s.connect((host, port)) + latency = time.perf_counter() - start + return True, latency, None + except Exception as e: + return False, None, f"{type(e).__name__}: {e}" + + +def check( + host: Optional[str] = typer.Option(None, "--host", "-h", help="Wyoming server host"), + port: Optional[int] = typer.Option(None, "--port", "-p", help="Wyoming server port"), + timeout: Optional[float] = typer.Option(None, "--timeout", "-t", help="Connection timeout in seconds"), + config_file: Optional[str] = typer.Option(None, "--config", "-c", help="Path to config file") +): + """Check Wyoming server connectivity.""" + + # Load configuration + config = AppConfig.load(config_file) + + # Override with command line arguments + final_host = host or config.server.host + final_port = port or config.server.port + final_timeout = timeout or config.server.timeout + + typer.echo("Checking Wyoming server connectivity...") + typer.echo(f"Target: {final_host}:{final_port} (timeout: {final_timeout}s)") + typer.echo("=" * 50) + + is_reachable, latency, error = check_wyoming_server(final_host, final_port, final_timeout) + + if is_reachable: + typer.echo(typer.style("Wyoming server reachable!", fg=typer.colors.GREEN, bold=True)) + typer.echo(f"Connection latency: {typer.style(f'{latency * 1000:.1f}ms', fg=typer.colors.CYAN)}") + typer.echo(f"Server: {final_host}:{final_port}") + else: + typer.echo(typer.style("Wyoming server unreachable!", fg=typer.colors.RED, bold=True)) + typer.echo(f"Server: {final_host}:{final_port}") + typer.echo(typer.style(f"Error: {error}", fg=typer.colors.RED)) + raise typer.Exit(1) \ No newline at end of file diff --git a/src/config.py b/src/config.py new file mode 100644 index 0000000..1c0fad7 --- /dev/null +++ b/src/config.py @@ -0,0 +1,136 @@ +import os +import yaml +import typer +from pathlib import Path +from typing import Optional, Dict, Any +from dataclasses import dataclass + + +@dataclass +class ServerConfig: + """Wyoming server configuration.""" + host: str = "127.0.0.1" + port: int = 10300 + timeout: float = 3.0 + + +@dataclass +class AudioConfig: + """Audio recording configuration.""" + sample_rate: int = 16000 + channels: int = 1 + device: Optional[int] = None + + +@dataclass +class AppConfig: + """Main application configuration.""" + server: ServerConfig + audio: AudioConfig + + @classmethod + def load(cls, config_file: Optional[str] = None) -> 'AppConfig': + """ + Load configuration from file, environment variables, and defaults. + + Priority order: + 1. Environment variables (highest) + 2. Config file + 3. Defaults (lowest) + + Args: + config_file: Path to YAML config file. If None, looks for 'config.yaml' + + Returns: + AppConfig instance with merged configuration + """ + # Start with defaults + config_data = { + 'server': { + 'host': '127.0.0.1', + 'port': 10300, + 'timeout': 3.0 + }, + 'audio': { + 'sample_rate': 16000, + 'channels': 1, + 'device': None + } + } + + # Load from config file + if config_file is None: + config_file = 'config.yaml' + + config_path = Path(config_file) + if config_path.exists(): + try: + with open(config_path, 'r', encoding='utf-8') as f: + file_config = yaml.safe_load(f) or {} + + # Merge file config + cls._merge_config(config_data, file_config) + except Exception as e: + typer.echo(typer.style(f"Warning: Could not load config file {config_file}: {e}", + fg=typer.colors.YELLOW)) + + # Override with environment variables + env_overrides = { + 'server': { + 'host': os.getenv('WYOMING_HOST'), + 'port': cls._parse_int_env('WYOMING_PORT'), + 'timeout': cls._parse_float_env('WYOMING_TIMEOUT') + }, + 'audio': { + 'sample_rate': cls._parse_int_env('AUDIO_SAMPLE_RATE'), + 'channels': cls._parse_int_env('AUDIO_CHANNELS'), + 'device': cls._parse_int_env('AUDIO_DEVICE') + } + } + + # Apply env overrides + cls._merge_config(config_data, env_overrides, skip_none=True) + + # Create config objects + server_config = ServerConfig(**config_data['server']) + audio_config = AudioConfig(**config_data['audio']) + + return cls(server=server_config, audio=audio_config) + + @staticmethod + def _merge_config(base: Dict[str, Any], override: Dict[str, Any], skip_none: bool = False) -> None: + """Merge override config into base config.""" + for key, value in override.items(): + if skip_none and value is None: + continue + + if key in base and isinstance(base[key], dict) and isinstance(value, dict): + AppConfig._merge_config(base[key], value, skip_none) + else: + base[key] = value + + @staticmethod + def _parse_int_env(env_var: str) -> Optional[int]: + """Parse integer from environment variable.""" + value = os.getenv(env_var) + if value is None: + return None + try: + return int(value) + except ValueError: + typer.echo(typer.style(f"Warning: Invalid integer value for {env_var}: {value}", + fg=typer.colors.YELLOW)) + return None + + @staticmethod + def _parse_float_env(env_var: str) -> Optional[float]: + """Parse float from environment variable.""" + value = os.getenv(env_var) + if value is None: + return None + try: + return float(value) + except ValueError: + typer.echo(typer.style(f"Warning: Invalid float value for {env_var}: {value}", + fg=typer.colors.YELLOW)) + return None \ No newline at end of file diff --git a/src/utils.py b/src/utils.py new file mode 100644 index 0000000..ccd15a9 --- /dev/null +++ b/src/utils.py @@ -0,0 +1,66 @@ +from dataclasses import dataclass + + +@dataclass +class SortConf: + property: str + ascending: bool = True + + +@dataclass +class FilterConf: + property: str + value: str + + @staticmethod + def new(text): + """Initialize FilterConf from text.""" + if text is None: + return None + + parts = text.split("=") + if len(parts) != 2: + return None + + return FilterConf(parts[0].strip(), parts[1].strip()) + + +def sort_by(items, sort_conf: SortConf): + """Sort a list of items by a given property.""" + if sort_conf is None: + return items + + if len(items) == 0: + return items + + if isinstance(items[0], dict): + property_name = sort_conf.property.lower().strip() + return sorted(items, key=lambda item: item.get(property_name), reverse=not sort_conf.ascending) + else: + return sorted(items, key=lambda item: getattr(item, sort_conf.property), reverse=not sort_conf.ascending) + + +def filter_by(items, filter_conf: FilterConf): + """Filter a list of items by a given property.""" + + def _contains(item, value): + return str(value).lower() in str(item).lower() + + if (filter_conf is None + or filter_conf.property is None + or len(items) == 0): + return items + + predicate = _contains + + if isinstance(items[0], dict): + property_name = filter_conf.property.lower().strip() + return [ + item for item in items if predicate(item.get(property_name), filter_conf.value) + ] + + else: + + return [ + item for item in items if getattr(item, filter_conf.property) == filter_conf.value + ] diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_audio_devices.py b/tests/test_audio_devices.py new file mode 100644 index 0000000..9fb2b80 --- /dev/null +++ b/tests/test_audio_devices.py @@ -0,0 +1,407 @@ +import pytest +from unittest.mock import Mock, patch, mock_open, MagicMock +import json +import subprocess +from pathlib import Path + +from src.commands.audio import ( + detect_wsl, + get_windows_audio_devices, + get_linux_audio_devices, + _categorize_device_type, + display_windows_devices, + display_linux_devices, + WindowsAudioDevice, + LinuxAudioDevice +) +from src.config import AppConfig, AudioConfig, ServerConfig + + +# WSL Detection Tests + +@patch('pathlib.Path.exists') +@patch('builtins.open', mock_open(read_data='Linux version 5.4.0-microsoft-standard')) +def test_detect_wsl_via_proc_version_microsoft(mock_exists): + """Test WSL detection via /proc/version containing 'microsoft'.""" + mock_exists.return_value = True + + result = detect_wsl() + + assert result is True + + +@patch('pathlib.Path.exists') +@patch('builtins.open', mock_open(read_data='Linux version 5.4.0-wsl2-standard')) +def test_detect_wsl_via_proc_version_wsl(mock_exists): + """Test WSL detection via /proc/version containing 'wsl'.""" + mock_exists.return_value = True + + result = detect_wsl() + + assert result is True + + +@patch('pathlib.Path.exists') +@patch('builtins.open', mock_open(read_data='Linux version 5.4.0-generic')) +@patch('os.getenv') +def test_detect_wsl_via_env_distro_name(mock_getenv, mock_exists): + """Test WSL detection via WSL_DISTRO_NAME environment variable.""" + mock_exists.return_value = True + mock_getenv.side_effect = lambda key: 'Ubuntu' if key == 'WSL_DISTRO_NAME' else None + + result = detect_wsl() + + assert result is True + + +@patch('pathlib.Path.exists') +@patch('builtins.open', mock_open(read_data='Linux version 5.4.0-generic')) +@patch('os.getenv') +def test_detect_wsl_via_env_wslenv(mock_getenv, mock_exists): + """Test WSL detection via WSLENV environment variable.""" + mock_exists.return_value = True + mock_getenv.side_effect = lambda key: 'PATH/l' if key == 'WSLENV' else None + + result = detect_wsl() + + assert result is True + + +@patch('pathlib.Path.exists') +@patch('builtins.open', mock_open(read_data='Linux version 5.4.0-generic')) +@patch('os.getenv', return_value=None) +def test_detect_wsl_false_native_linux(mock_getenv, mock_exists): + """Test WSL detection returns False on native Linux.""" + mock_exists.return_value = True + + result = detect_wsl() + + assert result is False + + +@patch('pathlib.Path.exists', return_value=False) +@patch('os.getenv', return_value=None) +def test_detect_wsl_false_no_proc_version(mock_getenv, mock_exists): + """Test WSL detection returns False when /proc/version doesn't exist.""" + result = detect_wsl() + + assert result is False + + +@patch('pathlib.Path.exists') +@patch('builtins.open', side_effect=Exception("Permission denied")) +@patch('os.getenv', return_value=None) +def test_detect_wsl_handles_proc_version_read_error(mock_getenv, mock_exists): + """Test WSL detection handles /proc/version read errors gracefully.""" + mock_exists.return_value = True + + result = detect_wsl() + + assert result is False + + +# Device Type Categorization Tests + +def test_categorize_input_device(): + """Test categorization of input device.""" + instance_id = "SWD\\MMDEVAPI\\{0.0.1.00000000}.{a1b2c3d4-e5f6-7890-abcd-ef1234567890}\\capture" + + result = _categorize_device_type(instance_id) + + assert result == "Input" + + +def test_categorize_output_device(): + """Test categorization of output device.""" + instance_id = "SWD\\MMDEVAPI\\{0.0.0.00000000}.{a1b2c3d4-e5f6-7890-abcd-ef1234567890}\\render" + + result = _categorize_device_type(instance_id) + + assert result == "Output" + + +def test_categorize_unknown_device(): + """Test categorization of unknown device type.""" + instance_id = "SWD\\MMDEVAPI\\{0.0.2.00000000}.{a1b2c3d4-e5f6-7890-abcd-ef1234567890}\\unknown" + + result = _categorize_device_type(instance_id) + + assert result == "Unknown" + + +def test_categorize_empty_instance_id(): + """Test categorization with empty instance ID.""" + result = _categorize_device_type("") + + assert result == "Unknown" + + +# Windows Audio Devices Tests + +@patch('subprocess.run') +@patch('typer.echo') +def test_get_windows_devices_success_single_device(mock_echo, mock_run): + """Test successful retrieval of single Windows device.""" + mock_result = Mock() + mock_result.returncode = 0 + mock_result.stdout = json.dumps({ + "FriendlyName": "Blue Yeti Microphone", + "Status": "OK", + "InstanceId": "USB\\VID_0B05&PID_1234\\capture" + }) + mock_result.stderr = "" + mock_run.return_value = mock_result + + result = get_windows_audio_devices() + + assert len(result) == 1 + assert result[0].name == "Blue Yeti Microphone" + assert result[0].status == "OK" + assert result[0].device_type == "Input" + assert "USB\\VID_0B05&PID_1234\\capture" in result[0].instance_id + + +@patch('subprocess.run') +@patch('typer.echo') +def test_get_windows_devices_success_multiple_devices(mock_echo, mock_run): + """Test successful retrieval of multiple Windows devices.""" + mock_result = Mock() + mock_result.returncode = 0 + mock_result.stdout = json.dumps([ + { + "FriendlyName": "Microphone", + "Status": "OK", + "InstanceId": "capture_device" + }, + { + "FriendlyName": "Speakers", + "Status": "OK", + "InstanceId": "render_device" + } + ]) + mock_result.stderr = "" + mock_run.return_value = mock_result + + result = get_windows_audio_devices() + + assert len(result) == 2 + assert result[0].device_type == "Input" + assert result[1].device_type == "Output" + + +@patch('subprocess.run') +@patch('typer.echo') +def test_get_windows_devices_powershell_error(mock_echo, mock_run): + """Test PowerShell command failure.""" + mock_result = Mock() + mock_result.returncode = 1 + mock_result.stdout = "" + mock_result.stderr = "Get-PnpDevice : Access denied" + mock_run.return_value = mock_result + + result = get_windows_audio_devices() + + assert result == [] + mock_echo.assert_called() + + +@patch('subprocess.run') +@patch('typer.echo') +def test_get_windows_devices_empty_output(mock_echo, mock_run): + """Test PowerShell returning empty output.""" + mock_result = Mock() + mock_result.returncode = 0 + mock_result.stdout = "" + mock_result.stderr = "" + mock_run.return_value = mock_result + + result = get_windows_audio_devices() + + assert result == [] + mock_echo.assert_called() + + +@patch('subprocess.run') +@patch('typer.echo') +def test_get_windows_devices_invalid_json(mock_echo, mock_run): + """Test invalid JSON response from PowerShell.""" + mock_result = Mock() + mock_result.returncode = 0 + mock_result.stdout = "Invalid JSON {" + mock_result.stderr = "" + mock_run.return_value = mock_result + + result = get_windows_audio_devices() + + assert result == [] + mock_echo.assert_called() + + +@patch('subprocess.run', side_effect=subprocess.TimeoutExpired("powershell.exe", 15)) +@patch('typer.echo') +def test_get_windows_devices_timeout(mock_echo, mock_run): + """Test PowerShell command timeout.""" + result = get_windows_audio_devices() + + assert result == [] + mock_echo.assert_called() + + +@patch('subprocess.run') +@patch('typer.echo') +def test_get_windows_devices_filters_empty_names(mock_echo, mock_run): + """Test filtering of devices with empty names.""" + mock_result = Mock() + mock_result.returncode = 0 + mock_result.stdout = json.dumps([ + { + "FriendlyName": "Valid Device", + "Status": "OK", + "InstanceId": "capture_device" + }, + { + "FriendlyName": "", + "Status": "OK", + "InstanceId": "empty_name_device" + }, + { + "FriendlyName": None, + "Status": "OK", + "InstanceId": "null_name_device" + } + ]) + mock_result.stderr = "" + mock_run.return_value = mock_result + + result = get_windows_audio_devices() + + assert len(result) == 1 + assert result[0].name == "Valid Device" + + +# Linux Audio Devices Tests + +@patch('sounddevice.query_devices') +@patch('sounddevice.query_hostapis') +@patch('typer.echo') +def test_get_linux_devices_success(mock_echo, mock_hostapis, mock_devices): + """Test successful retrieval of Linux devices.""" + mock_devices.return_value = [ + { + 'name': 'pulse', + 'max_input_channels': 32, + 'max_output_channels': 32, + 'default_samplerate': 44100.0, + 'hostapi': 0 + }, + { + 'name': 'default', + 'max_input_channels': 32, + 'max_output_channels': 32, + 'default_samplerate': 44100.0, + 'hostapi': 0 + } + ] + mock_hostapis.return_value = {'name': 'ALSA'} + + result = get_linux_audio_devices() + + assert len(result) == 2 + assert result[0].name == 'pulse' + assert result[0].device_id == 0 + assert result[0].hostapi_name == 'ALSA' + assert result[1].name == 'default' + assert result[1].device_id == 1 + + +@patch('sounddevice.query_devices', side_effect=Exception("ALSA error")) +@patch('typer.echo') +def test_get_linux_devices_sounddevice_error(mock_echo, mock_devices): + """Test sounddevice query failure.""" + with pytest.raises(SystemExit): + get_linux_audio_devices() + + +# Display Functions Tests + +@patch('typer.echo') +def test_display_windows_devices_empty_list(mock_echo): + """Test display with empty device list.""" + display_windows_devices([], True, True) + + mock_echo.assert_called() + + +@patch('typer.echo') +@patch('src.commands.audio._display_single_windows_device') +def test_display_windows_devices_input_only(mock_display_single, mock_echo): + """Test display showing only input devices.""" + devices = [ + WindowsAudioDevice('Microphone', 'OK', 'Input', 'mic1'), + WindowsAudioDevice('Speakers', 'OK', 'Output', 'spk1') + ] + + display_windows_devices(devices, show_inputs=True, show_outputs=False) + + mock_display_single.assert_called_once_with(devices[0]) + + +@patch('typer.echo') +@patch('src.commands.audio._display_single_windows_device') +def test_display_windows_devices_with_unknown(mock_display_single, mock_echo): + """Test display including unknown device types.""" + devices = [ + WindowsAudioDevice('Microphone', 'OK', 'Input', 'mic1'), + WindowsAudioDevice('Unknown Device', 'OK', 'Unknown', 'unk1') + ] + + display_windows_devices(devices, show_inputs=True, show_outputs=True) + + assert mock_display_single.call_count == 2 + + +@patch('typer.echo') +def test_display_linux_devices_no_matching_devices(mock_echo): + """Test display with no matching devices.""" + devices = [ + LinuxAudioDevice(0, 'pulse', 0, 32, 44100.0, 'ALSA') + ] + config = AppConfig(ServerConfig(), AudioConfig()) + + with pytest.raises(SystemExit): + display_linux_devices(devices, show_inputs=True, show_outputs=False, config=config) + + +@patch('typer.echo') +@patch('sounddevice.default') +def test_display_linux_devices_success(mock_default, mock_echo): + """Test successful display of Linux devices.""" + mock_default.device = [0, 1] + + devices = [ + LinuxAudioDevice(0, 'pulse', 32, 32, 44100.0, 'ALSA') + ] + config = AppConfig(ServerConfig(), AudioConfig()) + + display_linux_devices(devices, show_inputs=True, show_outputs=True, config=config) + + # Verify that echo was called with device information + assert mock_echo.call_count > 0 + + +@patch('typer.echo') +@patch('sounddevice.default') +def test_display_linux_devices_with_configured_device(mock_default, mock_echo): + """Test display with configured device marked.""" + mock_default.device = [0, 1] + + devices = [ + LinuxAudioDevice(0, 'pulse', 32, 32, 44100.0, 'ALSA'), + LinuxAudioDevice(1, 'default', 32, 32, 44100.0, 'ALSA') + ] + config = AppConfig(ServerConfig(), AudioConfig(device=1)) + + display_linux_devices(devices, show_inputs=True, show_outputs=True, config=config) + + # Verify that echo was called and device is marked as configured + assert mock_echo.call_count > 0 \ No newline at end of file