From af1e81243a62a023626d4fdf827ddad5ff234510 Mon Sep 17 00:00:00 2001 From: Kodjo Sossouvi Date: Tue, 9 Sep 2025 22:35:33 +0200 Subject: [PATCH] Added audio commands + "server test" command --- main.py | 3 +- requirements.txt | 23 ++ src/cli.py | 6 +- src/commands/audio.py | 730 +++++++----------------------------- src/commands/server.py | 119 +++++- src/core/Expando.py | 107 ++++++ src/core/__init__.py | 0 src/core/utils.py | 257 +++++++++++++ src/devices.py | 78 ++++ src/utils.py | 66 ---- tests/test_audio_devices.py | 407 -------------------- tests/test_expando.py | 99 +++++ tests/test_filter_by.py | 165 ++++++++ tests/test_filter_conf.py | 85 +++++ tests/test_select.py | 256 +++++++++++++ tests/test_sort_by.py | 152 ++++++++ 16 files changed, 1477 insertions(+), 1076 deletions(-) create mode 100644 requirements.txt create mode 100644 src/core/Expando.py create mode 100644 src/core/__init__.py create mode 100644 src/core/utils.py create mode 100644 src/devices.py delete mode 100644 src/utils.py delete mode 100644 tests/test_audio_devices.py create mode 100644 tests/test_expando.py create mode 100644 tests/test_filter_by.py create mode 100644 tests/test_filter_conf.py create mode 100644 tests/test_select.py create mode 100644 tests/test_sort_by.py diff --git a/main.py b/main.py index 174747e..8a62332 100644 --- a/main.py +++ b/main.py @@ -8,8 +8,7 @@ Usage: python main.py audio test [--device ID] [--duration N] [--save] python main.py audio config """ - from src.cli import app if __name__ == "__main__": - app() \ No newline at end of file + app() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..172a82d --- /dev/null +++ b/requirements.txt @@ -0,0 +1,23 @@ +cffi==1.17.1 +click==8.2.1 +comtypes==1.4.12 +iniconfig==2.1.0 +markdown-it-py==4.0.0 +mdurl==0.1.2 +numpy==2.3.2 +packaging==25.0 +pluggy==1.6.0 +psutil==7.0.0 +PyAudio==0.2.14 +pycaw==20240210 +pycparser==2.22 +Pygments==2.19.2 +pytest==8.4.1 +PyYAML==6.0.2 +rich==14.1.0 +scipy==1.16.1 +shellingham==1.5.4 +sounddevice==0.5.2 +typer==0.17.3 +typing_extensions==4.15.0 +wyoming==1.7.2 diff --git a/src/cli.py b/src/cli.py index ade61d8..b3e1f12 100644 --- a/src/cli.py +++ b/src/cli.py @@ -9,14 +9,14 @@ audio_app = typer.Typer(help="Audio device operations") # Add commands to subcommands server_app.command("check")(server.check) +server_app.command("test")(server.test) audio_app.command("list")(audio.list_devices) audio_app.command("test")(audio.test_device) -audio_app.command("config")(audio.config_info) +audio_app.command("install")(audio.install) # Register subcommands app.add_typer(server_app, name="server") app.add_typer(audio_app, name="audio") - if __name__ == "__main__": - app() \ No newline at end of file + app() diff --git a/src/commands/audio.py b/src/commands/audio.py index 8dcd573..943f28a 100644 --- a/src/commands/audio.py +++ b/src/commands/audio.py @@ -1,79 +1,15 @@ import sounddevice as sd -import subprocess -import typer import numpy as np import os -import json -from typing import Optional, List, Dict, Any, Tuple -from dataclasses import dataclass +from typing import Optional, List -from rich.console import Console -from rich.table import Table - -from ..utils import SortConf, sort_by, FilterConf, filter_by +from ..devices import get_audio_devices_windows, install_audio_cmdlets, get_audio_devices_windows_from_pnp_devices, \ + get_audio_devices_linux +from ..core.utils import * from ..config import AppConfig -@dataclass -class WindowsAudioDevice: - """Windows audio device data structure.""" - name: str - status: str - device_type: str # "Input", "Output", "Unknown" - instance_id: str - - -@dataclass -class LinuxAudioDevice: - """Linux audio device data structure.""" - device_id: int - name: str - max_input_channels: int - max_output_channels: int - default_samplerate: float - hostapi_name: str - - -def list_devices( - device: Optional[int] = typer.Argument(default=None, help="Show details of a device, by its Id"), - pulse: bool = typer.Option(False, "--pulse", help="Show PulseAudio sources"), - native: bool = typer.Option(False, "--native", help="Force native Linux view (WSL only)"), - sort_by: Optional[str] = typer.Option(None, "--sort", help="Sort by column"), - desc: bool = typer.Option(False, "--desc", help="Set the sorting order to descending"), - filter_by: Optional[str] = typer.Option(None, "--filter", help="Filter by a column value. Use column=value"), - config_file: Optional[str] = typer.Option(None, "--config", "-c", help="Path to config file") -): - """List audio devices.""" - - # Load configuration for context - config = AppConfig.load(config_file) - - # Determine title based on environment and options - is_wsl = detect_wsl() - use_windows = is_wsl and not native - sort_config = SortConf(sort_by, not desc) if sort_by else None - filter_config = FilterConf.new(filter_by) if filter_by else None - if use_windows: - typer.echo(typer.style("WSL detected, showing Windows audio devices (use --native for WSL view)", - fg=typer.colors.BLUE)) - windows_devices = get_windows_audio_devices() - display_windows_devices(windows_devices, sort_config, filter_config) - - else: - linux_devices = get_linux_audio_devices() - display_linux_devices(linux_devices, config, pulse, sort_config) - - # Show legend - typer.echo(f"\nLegend:") - if use_windows: - typer.echo(typer.style(" ✓ = Device working", fg=typer.colors.GREEN)) - typer.echo(typer.style(" ✗ = Device problem", fg=typer.colors.RED)) - else: - typer.echo(typer.style(" * = Configured device", fg=typer.colors.YELLOW)) - typer.echo(f" IN/OUT = Input/Output channel count") - - -def detect_wsl() -> bool: +def _detect_wsl() -> bool: """Detect if running under Windows Subsystem for Linux.""" try: # Method 1: Check /proc/version for Microsoft @@ -100,479 +36,126 @@ def detect_wsl() -> bool: return False -def get_windows_audio_devices_com() -> List[WindowsAudioDevice]: - """ - Retrieve Windows audio devices using Core Audio (COM) via an inline C# helper compiled by PowerShell. - Returns a list of WindowsAudioDevice with accurate device_type ('Input' or 'Output') based on DataFlow. - """ - ps_script = r""" -[Console]::OutputEncoding = [System.Text.Encoding]::UTF8; - -$code = @" -using System; -using System.Runtime.InteropServices; -using System.Collections.Generic; - -[ComImport] -[Guid("BCDE0395-E52F-467C-8E3D-C4579291692E")] -class MMDeviceEnumerator {} - -[Guid("A95664D2-9614-4F35-A746-DE8DB63617E6"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)] -interface IMMDeviceEnumerator { - int EnumAudioEndpoints(EDataFlow dataFlow, int dwStateMask, out IMMDeviceCollection ppDevices); - // Other methods not used are omitted -} - -enum EDataFlow { eRender = 0, eCapture = 1, eAll = 2 } - -[Flags] -enum DEVICE_STATE : uint { ACTIVE = 0x1, DISABLED = 0x2, NOTPRESENT = 0x4, UNPLUGGED = 0x8, ALL = 0xF } - -[Guid("0BD7A1BE-7A1A-44DB-8397-C0A794AF5630"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)] -interface IMMDeviceCollection { - int GetCount(out int pcDevices); - int Item(int nDevice, out IMMDevice ppDevice); -} - -[Guid("D666063F-1587-4E43-81F1-B948E807363F"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)] -interface IMMDevice { - int Activate(ref Guid iid, int dwClsCtx, IntPtr pActivationParams, out IntPtr ppInterface); - int OpenPropertyStore(int stgmAccess, out IPropertyStore ppProperties); - int GetId(out string ppstrId); - int GetState(out int pdwState); -} - -[Guid("886d8eeb-8cf2-4446-8d02-cdba1dbdcf99"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)] -interface IPropertyStore { - int GetCount(out int cProps); - int GetAt(int iProp, out PROPERTYKEY pkey); - int GetValue(ref PROPERTYKEY key, out PROPVARIANT pv); - int SetValue(ref PROPERTYKEY key, ref PROPVARIANT propvar); - int Commit(); -} - -[StructLayout(LayoutKind.Sequential)] -struct PROPERTYKEY { - public Guid fmtid; - public int pid; -} - -[StructLayout(LayoutKind.Explicit)] -struct PROPVARIANT { - [FieldOffset(0)] public ushort vt; - [FieldOffset(8)] public IntPtr pointerValue; -} - -static class PKEY { - // PKEY_Device_FriendlyName - public static PROPERTYKEY Device_FriendlyName = new PROPERTYKEY { - fmtid = new Guid("A45C254E-DF1C-4EFD-8020-67D146A850E0"), pid = 14 - }; -} - -public class DeviceInfo { - public string Name { get; set; } - public string Id { get; set; } - public string Type { get; set; } // "Input" or "Output" - public string Status { get; set; } // "OK", "Disabled", "Unplugged", "NotPresent", "Unknown" -} - -public static class CoreAudioEnumerator { - [DllImport("ole32.dll")] - private static extern int PropVariantClear(ref PROPVARIANT pvar); - - private static string ReadString(IPropertyStore store, PROPERTYKEY key) { - PROPVARIANT pv = new PROPVARIANT(); - try { - store.GetValue(ref key, out pv); - // VT_LPWSTR = 31 - if (pv.vt == 31) { - return Marshal.PtrToStringUni(pv.pointerValue); - } - return null; - } finally { - PropVariantClear(ref pv); - } - } - - public static List GetAll() { - var result = new List(); - var enumerator = (IMMDeviceEnumerator)new MMDeviceEnumerator(); - - foreach (EDataFlow flow in new[] { EDataFlow.eCapture, EDataFlow.eRender }) { - IMMDeviceCollection coll; - enumerator.EnumAudioEndpoints(flow, (int)DEVICE_STATE.ALL, out coll); - - int count; - coll.GetCount(out count); - - for (int i = 0; i < count; i++) { - IMMDevice dev; - coll.Item(i, out dev); - - string id; - dev.GetId(out id); - - int state; - dev.GetState(out state); - - IPropertyStore store; - dev.OpenPropertyStore(0 /* STGM_READ */, out store); - string name = ReadString(store, PKEY.Device_FriendlyName) ?? "(unknown)"; - - string type = flow == EDataFlow.eCapture ? "Input" : "Output"; - - string status = - (state & (int)DEVICE_STATE.ACTIVE) != 0 ? "OK" : - (state & (int)DEVICE_STATE.DISABLED) != 0 ? "Disabled" : - (state & (int)DEVICE_STATE.UNPLUGGED) != 0 ? "Unplugged" : - (state & (int)DEVICE_STATE.NOTPRESENT) != 0 ? "NotPresent" : - "Unknown"; - - result.Add(new DeviceInfo { Name = name, Id = id, Type = type, Status = status }); - } - } - - return result; - } -} -"@ - -Add-Type -TypeDefinition $code -Language CSharp -ErrorAction Stop - -$devices = [CoreAudioEnumerator]::GetAll() | ForEach-Object { - [PSCustomObject]@{ - FriendlyName = $_.Name - InstanceId = $_.Id - Type = $_.Type - Status = $_.Status - } -} - -$devices | ConvertTo-Json -""" - try: - result = subprocess.run( - [ - "powershell.exe", - "-NoProfile", - "-NonInteractive", - "-Command", - ps_script, - ], - capture_output=True, - text=True, - encoding="utf-8", - timeout=30, - ) - - if result.returncode != 0: - err = result.stderr.strip() if result.stderr else f"exit code {result.returncode}" - typer.echo(typer.style(f"PowerShell (CoreAudio) command failed: {err}", fg=typer.colors.RED)) - return [] - - output = result.stdout.strip() - if not output: - typer.echo(typer.style("PowerShell (CoreAudio) returned empty output", fg=typer.colors.YELLOW)) - return [] - - data = json.loads(output) - items = data if isinstance(data, list) else [data] - - devices: List[WindowsAudioDevice] = [] - for it in items: - name = (it.get("FriendlyName") or "").strip() - instance_id = it.get("InstanceId") or "" - dev_type = it.get("Type") or "Unknown" - status = it.get("Status") or "Unknown" - - if not name: - continue - - devices.append( - WindowsAudioDevice( - name=name, - status=status, - device_type=dev_type, - instance_id=instance_id, - ) - ) - - return devices - - except json.JSONDecodeError as e: - typer.echo(typer.style(f"Failed to parse CoreAudio JSON output: {e}", fg=typer.colors.RED)) - return [] - except subprocess.TimeoutExpired: - typer.echo(typer.style("PowerShell (CoreAudio) command timed out after 30 seconds", fg=typer.colors.RED)) - return [] - except Exception as e: - typer.echo(typer.style(f"Unexpected error calling CoreAudio via PowerShell: {type(e).__name__}: {e}", - fg=typer.colors.RED)) - return [] - - -def get_windows_default_devices() -> Dict[str, str]: - """Get Windows default audio devices via PowerShell.""" - ps_script = r""" -[Console]::OutputEncoding = [System.Text.Encoding]::UTF8; - -$code = @" -using System; -using System.Runtime.InteropServices; - -[ComImport] -[Guid("BCDE0395-E52F-467C-8E3D-C4579291692E")] -class MMDeviceEnumerator {} - -[Guid("A95664D2-9614-4F35-A746-DE8DB63617E6"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)] -interface IMMDeviceEnumerator { - int GetDefaultAudioEndpoint(EDataFlow dataFlow, ERole role, out IMMDevice ppEndpoint); -} - -enum EDataFlow { eRender = 0, eCapture = 1 } -enum ERole { eConsole = 0, eMultimedia = 1, eCommunications = 2 } - -[Guid("D666063F-1587-4E43-81F1-B948E807363F"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)] -interface IMMDevice { - int GetId(out string ppstrId); -} - -public static class DefaultDeviceHelper { - public static string GetDefaultInputDevice() { - try { - var enumerator = (IMMDeviceEnumerator)new MMDeviceEnumerator(); - IMMDevice device; - enumerator.GetDefaultAudioEndpoint(EDataFlow.eCapture, ERole.eConsole, out device); - string id; - device.GetId(out id); - return id; - } catch { return null; } - } - - public static string GetDefaultOutputDevice() { - try { - var enumerator = (IMMDeviceEnumerator)new MMDeviceEnumerator(); - IMMDevice device; - enumerator.GetDefaultAudioEndpoint(EDataFlow.eRender, ERole.eConsole, out device); - string id; - device.GetId(out id); - return id; - } catch { return null; } - } -} -"@ - -Add-Type -TypeDefinition $code -Language CSharp -ErrorAction Stop - -$defaultInput = [DefaultDeviceHelper]::GetDefaultInputDevice() -$defaultOutput = [DefaultDeviceHelper]::GetDefaultOutputDevice() - -[PSCustomObject]@{ - DefaultInput = $defaultInput - DefaultOutput = $defaultOutput -} | ConvertTo-Json -""" - - try: - result = subprocess.run([ - "powershell.exe", - "-NoProfile", - "-NonInteractive", - "-Command", - ps_script, - ], capture_output=True, text=True, encoding="utf-8", timeout=15) - - if result.returncode != 0: - return {"default_input": None, "default_output": None} - - if not result.stdout.strip(): - return {"default_input": None, "default_output": None} - - data = json.loads(result.stdout) - return { - "default_input": data.get("DefaultInput"), - "default_output": data.get("DefaultOutput") - } - - except Exception: - return {"default_input": None, "default_output": None} - - -def get_windows_audio_devices() -> List[WindowsAudioDevice]: - """Get detailed Windows audio devices via PowerShell from WSL.""" - try: - result = subprocess.run([ - "powershell.exe", - "-NoProfile", - "-NonInteractive", - "-Command", - # Force UTF-8 output from PowerShell before running the command - "[Console]::OutputEncoding = [System.Text.Encoding]::UTF8; " - "Get-PnpDevice -Class AudioEndpoint | " - "Select-Object FriendlyName, Status, InstanceId | ConvertTo-Json" - ], capture_output=True, text=True, encoding="utf-8", timeout=15) - - if result.returncode != 0: - error_msg = result.stderr.strip() if result.stderr else f"PowerShell exit code: {result.returncode}" - typer.echo(typer.style(f"PowerShell command failed: {error_msg}", fg=typer.colors.RED)) - return [] - - if not result.stdout.strip(): - typer.echo(typer.style("PowerShell returned empty output", fg=typer.colors.YELLOW)) - return [] - - data = json.loads(result.stdout) - devices = data if isinstance(data, list) else [data] - - windows_devices = [] - for device in devices: - name = device.get('FriendlyName', '').strip() - status = device.get('Status', 'Unknown') - instance_id = device.get('InstanceId', '') - - if not name: - continue - - # Determine device type from InstanceId - device_type = _categorize_device_type(instance_id) - - windows_devices.append(WindowsAudioDevice( - name=name, - status=status, - device_type=device_type, - instance_id=instance_id - )) - - return windows_devices - - except json.JSONDecodeError as e: - typer.echo(typer.style(f"Failed to parse PowerShell JSON output: {e}", fg=typer.colors.RED)) - return [] - except subprocess.TimeoutExpired: - typer.echo(typer.style("PowerShell command timed out after 15 seconds", fg=typer.colors.RED)) - return [] - except Exception as e: - typer.echo(typer.style(f"Unexpected error calling PowerShell: {type(e).__name__}: {e}", fg=typer.colors.RED)) - return [] - - -def _categorize_device_type(instance_id: str) -> str: +def _get_device_type(instance_id: str) -> str: """Categorize device type based on InstanceId.""" instance_lower = instance_id.lower() if '0.0.1.00000000' in instance_lower: - return "Input" + return "Recording" elif '0.0.0.00000000' in instance_lower: - return "Output" + return "Playback" else: return "Unknown" -def get_linux_audio_devices() -> List[LinuxAudioDevice]: - """Get Linux audio devices using sounddevice.""" - try: - devices = sd.query_devices() - linux_devices = [] - - for i, device in enumerate(devices): - hostapi_name = sd.query_hostapis(device['hostapi'])['name'] - - linux_devices.append(LinuxAudioDevice( - device_id=i, - name=device['name'], - max_input_channels=device['max_input_channels'], - max_output_channels=device['max_output_channels'], - default_samplerate=device['default_samplerate'], - hostapi_name=hostapi_name - )) - - return linux_devices - except Exception as e: - typer.echo(typer.style(f"Error querying Linux audio devices: {e}", fg=typer.colors.RED, bold=True)) - raise typer.Exit(1) +def _get_short_device_id(instance_id: str) -> str: + """Get device name based on InstanceId.""" + if len(instance_id) == 68: + return instance_id[31:-1] + elif len(instance_id) == 55: + return instance_id[18:-1] + else: + return instance_id -def display_windows_devices(devices: List[WindowsAudioDevice], - sort_conf: Optional[SortConf] = None, - filter_conf: Optional[FilterConf] = None - ) -> None: - """Display Windows audio devices with optimized formatting.""" +def _get_colored_bool(status: bool) -> str: + """Get colored status string based on status.""" + if status: + return f"[green]{status}[/green]" + else: + return str(status) + + +def _get_colored_status(status: str) -> str: + """Get colored status string based on status.""" + if status == "Unknown": + return f"[red]{status}[/red]" + else: + return str(status) + + +def list_devices( + device: Optional[int] = typer.Argument(default=None, help="Show details of a device, by its Id"), + pulse: bool = typer.Option(False, "--pulse", help="Show PulseAudio sources"), + native: bool = typer.Option(False, "--native", help="Force native Linux view (WSL only)"), + all_devices: bool = typer.Option(False, "--all", help="Display all devices, even those not connected"), + sort_info: Optional[str] = typer.Option(None, "--sort", help="Sort by column"), + desc_info: bool = typer.Option(False, "--desc", help="Set the sorting order to descending"), + filter_info: Optional[str] = typer.Option(None, "--filter", help="Filter by a column value. Use column=value"), + config_file: Optional[str] = typer.Option(None, "--config", "-c", help="Path to config file") +): + """List audio devices.""" - def _get_id(_instance_id): - return _instance_id[31:39] + # Load configuration for context + config = AppConfig.load(config_file) - if not devices: - typer.echo(typer.style("No Windows audio devices found!", fg=typer.colors.RED)) + # Determine title based on environment and options + is_wsl = _detect_wsl() + if is_wsl and not native: + typer.echo(typer.style("WSL detected, showing Windows audio devices (use --native for WSL view)", + fg=typer.colors.BLUE)) + result = get_audio_devices_windows_from_pnp_devices() if all_devices else get_audio_devices_windows() + if not check_result(result): + return + + windows_devices = get_results(result.stdout) + if all_devices: + select_conf = [ + SelectConf(attr="Index", default=""), + SelectConf(attr="InstanceId", to="ID", formatter=lambda item: _get_short_device_id(item.InstanceId)), + SelectConf(attr="FriendlyName", to="Name"), + SelectConf(attr="Type", formatter=lambda item: _get_device_type(item.InstanceId)), + SelectConf(attr="Status", formatter=lambda item: _get_colored_status(item.Status)), + ] + else: + select_conf = [ + SelectConf(attr="Index"), + SelectConf(attr="ID", formatter=lambda item: _get_short_device_id(item.ID)), + SelectConf(attr="Name"), + SelectConf(attr="Type"), + SelectConf(attr="Default", formatter=lambda item: _get_colored_bool(item.Default)), + ] + windows_devices = select(windows_devices, select_conf) + + # apply sorting and filtering + windows_devices = filter_and_sort(windows_devices, filter_info, sort_info, desc_info) + + display_as_table(windows_devices) + + + else: + if native and not is_wsl: + typer.echo(typer.style("Native is applicable only when WSL is detected.", fg=typer.colors.RED)) + return + + if all_devices and native: + typer.echo(typer.style("All devices is not applicable with native mode.", fg=typer.colors.RED)) + return + + result = get_audio_devices_linux() + if not check_result(result): + return + + linux_devices = result.result + # apply sorting and filtering + linux_devices = filter_and_sort(linux_devices, filter_info, sort_info, desc_info) + + display_as_table(linux_devices) + + # Show legend + typer.echo(f"\nLegend:") + typer.echo(typer.style(" * = Configured device", fg=typer.colors.YELLOW)) + typer.echo(f" IN/OUT = Input/Output channel count") + + +def install(): + result = install_audio_cmdlets() + if not check_result(result): return - - # Get default devices - default_devices = get_windows_default_devices() - default_input_id = default_devices.get("default_input") - default_output_id = default_devices.get("default_output") - if not devices: - typer.echo(typer.style("No audio devices found!", fg=typer.colors.RED)) - - # Create rich table - table = Table(show_header=True) - table.add_column("ID", width=8, no_wrap=True, overflow="ellipsis") - table.add_column("Name", width=55, no_wrap=True, overflow="ellipsis") - table.add_column("Type", width=8, justify="center") - table.add_column("Status", width=3, justify="center") - table.add_column("*", width=2, justify="center") - - # Add devices to table - rows = [] # list of dict - for device in devices: - # Determine if device is selected (default) - is_selected = False - if device.device_type == "Input" and device.instance_id == default_input_id: - is_selected = True - elif device.device_type == "Output" and device.instance_id == default_output_id: - is_selected = True - - selected_marker = "*" if is_selected else "" - - # Color coding for status - if device.status == "OK": - status_text = f"[green]{device.status}[/green]" - else: - status_text = f"[red]{device.status}[/red]" - - # Style selected row - if is_selected: - row_style = "bold" - id_text = f"[bold]{_get_id(device.instance_id)}[/bold]" - name_text = f"[bold]{device.name}[/bold]" - type_text = f"[bold]{device.device_type}[/bold]" - selected_text = f"[bold yellow]{selected_marker}[/bold yellow]" - else: - row_style = None - id_text = _get_id(device.instance_id) - name_text = device.name - type_text = device.device_type - selected_text = selected_marker - - rows.append({ - "id": id_text, - "name": name_text, - "type": type_text, - "status": status_text, - "selected": selected_text - }) - - rows = sort_by(rows, sort_conf) - rows = filter_by(rows, filter_conf) - - for row in rows: - table.add_row(*row.values()) - - # Display table - console = Console() - console.print(table) + typer.echo(result.stdout) -def display_linux_devices(devices: List[LinuxAudioDevice], +def display_linux_devices(devices: list, config: AppConfig, show_pulse: bool = False, sort_conf: Optional[SortConf] = None) -> None: @@ -625,14 +208,14 @@ def display_linux_devices(devices: List[LinuxAudioDevice], # Legacy function for backwards compatibility -def get_audio_devices(): - """Get comprehensive information about audio devices.""" - try: - devices = sd.query_devices() - return devices - except Exception as e: - typer.echo(typer.style(f"Error querying audio devices: {e}", fg=typer.colors.RED, bold=True)) - raise typer.Exit(1) +# def get_audio_devices(): +# """Get comprehensive information about audio devices.""" +# try: +# devices = sd.query_devices() +# return devices +# except Exception as e: +# typer.echo(typer.style(f"Error querying audio devices: {e}", fg=typer.colors.RED, bold=True)) +# raise typer.Exit(1) def get_pulseaudio_sources(): @@ -658,6 +241,7 @@ def test_device( device: Optional[int] = typer.Option(None, "--device", "-d", help="Device ID to test (default: configured device)"), duration: float = typer.Option(3.0, "--duration", help="Recording duration in seconds"), save: bool = typer.Option(False, "--save", help="Save recording to WAV file"), + play: bool = typer.Option(False, "--play", help="Play recorded audio through default speakers"), config_file: Optional[str] = typer.Option(None, "--config", "-c", help="Path to config file") ): """Test audio recording from a specific device.""" @@ -672,7 +256,11 @@ def test_device( typer.echo("=" * 20) # Get device info - devices_list = get_linux_audio_devices() + result = get_audio_devices_linux() + if not check_result(result): + return + + devices_list = result.result if test_device_id is not None: if test_device_id >= len(devices_list): @@ -726,72 +314,20 @@ def test_device( else: typer.echo(typer.style(" Status: Good signal level", fg=typer.colors.GREEN)) + # Play recorded audio if requested + if play: + typer.echo(f"\n{typer.style('Playing recorded audio...', fg=typer.colors.CYAN, bold=True)}") + sd.play(audio_data, samplerate=config.audio.sample_rate) + sd.wait() + typer.echo(typer.style("Playback completed!", fg=typer.colors.CYAN)) + # Save if requested if save: filename = f"test_device_{test_device_id or 'default'}_{int(duration)}s.wav" - # Note: WyomingAudioRecorder is not defined in the current code - # This would need to be implemented or the save functionality modified - typer.echo(f" Save functionality needs to be implemented") + from scipy.io import wavfile + wavfile.write(filename, config.audio.sample_rate, audio_int16) + typer.echo(typer.style(f"Audio saved to: {filename}", fg=typer.colors.MAGENTA, bold=True)) except Exception as e: typer.echo(typer.style(f"Recording failed: {e}", fg=typer.colors.RED, bold=True)) raise typer.Exit(1) - - -def config_info( - config_file: Optional[str] = typer.Option(None, "--config", "-c", help="Path to config file") -): - """Show current audio configuration.""" - - # Load configuration - config = AppConfig.load(config_file) - - typer.echo(typer.style("Audio Configuration", fg=typer.colors.BLUE, bold=True)) - typer.echo("=" * 21) - - # Show current settings - typer.echo(f"\nCurrent settings:") - typer.echo(f" Sample rate: {typer.style(f'{config.audio.sample_rate} Hz', fg=typer.colors.CYAN)}") - typer.echo(f" Channels: {typer.style(str(config.audio.channels), fg=typer.colors.CYAN)}") - - if config.audio.device is not None: - typer.echo(f" Device: {typer.style(str(config.audio.device), fg=typer.colors.CYAN)}") - - # Show device details - try: - devices_list = get_linux_audio_devices() - if config.audio.device < len(devices_list): - device = devices_list[config.audio.device] - typer.echo(f"\nConfigured device details:") - typer.echo(f" Name: {device['name']}") - typer.echo(f" Input channels: {device['max_input_channels']}") - typer.echo(f" Default rate: {int(device['default_samplerate'])} Hz") - - if device['max_input_channels'] == 0: - typer.echo(typer.style(" Warning: This device has no input channels!", fg=typer.colors.RED)) - else: - typer.echo(typer.style(f" Warning: Configured device {config.audio.device} not found!", - fg=typer.colors.RED, bold=True)) - except Exception as e: - typer.echo(typer.style(f" Error getting device info: {e}", fg=typer.colors.RED)) - else: - typer.echo(f" Device: {typer.style('default', fg=typer.colors.CYAN)}") - - # Show default device info - try: - default_device = sd.default.device - if hasattr(default_device, '__len__') and len(default_device) >= 2: - default_input = int(default_device[0]) - else: - default_input = int(default_device) - - devices_list = get_linux_audio_devices() - device = devices_list[default_input] - typer.echo(f"\nDefault input device:") - typer.echo(f" ID: {default_input}") - typer.echo(f" Name: {device['name']}") - typer.echo(f" Input channels: {device['max_input_channels']}") - except Exception as e: - typer.echo(typer.style(f" Error getting default device: {e}", fg=typer.colors.RED)) - - # Show configuration source info diff --git a/src/commands/server.py b/src/commands/server.py index a99bd4e..088c28d 100644 --- a/src/commands/server.py +++ b/src/commands/server.py @@ -4,7 +4,61 @@ from contextlib import closing import typer from typing import Optional +from wyoming.audio import AudioStart, AudioChunk, AudioStop + from ..config import AppConfig +import io +import wave +import numpy as np +import sounddevice as sd +import asyncio +from wyoming.client import AsyncTcpClient +from wyoming.asr import Transcribe, Transcript + + +async def _async_transcribe(host: str, port: int, timeout: float, pcm_bytes: bytes, lang: str) -> Optional[str]: + """Stream raw PCM data to Wyoming ASR and return transcript text.""" + # Instantiate the async TCP client + client = AsyncTcpClient(host, port) + + # Audio parameters + rate = 16000 + width = 2 # 16-bit + channels = 1 + + # The client instance is an async context manager. + async with client: + # 1. Send transcription request + await client.write_event(Transcribe(language=lang).event()) + + # 2. Start the audio stream + await client.write_event(AudioStart(rate, width, channels).event()) + + # 3. Send audio chunks + chunk_size = 2048 # A reasonable chunk size + for i in range(0, len(pcm_bytes), chunk_size): + chunk_bytes = pcm_bytes[i:i + chunk_size] + await client.write_event(AudioChunk(audio=chunk_bytes, rate=rate, width=width, channels=channels).event()) + + # 4. Stop the audio stream + await client.write_event(AudioStop().event()) + + # 5. Read events until a transcript arrives + transcript_text = None + try: + while True: + event = await asyncio.wait_for(client.read_event(), timeout=timeout) + if event is None: + break + + if Transcript.is_type(event.type): + tr = Transcript.from_event(event) + transcript_text = tr.text + break + except asyncio.TimeoutError: + typer.echo(typer.style("Connection timed out waiting for transcript.", fg=typer.colors.YELLOW)) + + return transcript_text def check_wyoming_server(host: str, port: int, timeout: float = 3.0) -> tuple[bool, float | None, str | None]: @@ -57,4 +111,67 @@ def check( typer.echo(typer.style("Wyoming server unreachable!", fg=typer.colors.RED, bold=True)) typer.echo(f"Server: {final_host}:{final_port}") typer.echo(typer.style(f"Error: {error}", fg=typer.colors.RED)) - raise typer.Exit(1) \ No newline at end of file + raise typer.Exit(1) + + +def test( + duration: float = typer.Option(3.0, "--duration", help="Recording duration in seconds"), + lang: str = typer.Option("fr", "--lang", help="Language code: 'fr' or 'en'"), + host: Optional[str] = typer.Option(None, "--host", "-h", help="Wyoming server host"), + port: Optional[int] = typer.Option(None, "--port", "-p", help="Wyoming server port"), + timeout: Optional[float] = typer.Option(None, "--timeout", "-t", help="Connection timeout in seconds"), + config_file: Optional[str] = typer.Option(None, "--config", "-c", help="Path to config file") +): + """Record from default microphone, send to Wyoming ASR server, and print transcription.""" + # Load configuration + config = AppConfig.load(config_file) + final_host = host or config.server.host + final_port = port or config.server.port + final_timeout = timeout or config.server.timeout + + # Validate language (two-letter code) + lang = (lang or "fr").strip().lower() + if lang not in ("fr", "en"): + typer.echo(typer.style("Invalid --lang. Use 'fr' or 'en'.", fg=typer.colors.RED)) + raise typer.Exit(2) + + # Check server reachability first + reachable, latency, err = check_wyoming_server(final_host, final_port, final_timeout) + if not reachable: + typer.echo(typer.style(f"Cannot reach Wyoming server at {final_host}:{final_port}: {err}", fg=typer.colors.RED)) + raise typer.Exit(1) + + # Record audio (16 kHz mono float32) + sample_rate = 16000 + channels = 1 + typer.echo(typer.style("Recording...", fg=typer.colors.GREEN, bold=True)) + try: + frames = int(duration * sample_rate) + audio = sd.rec(frames, samplerate=sample_rate, channels=channels, dtype="float32") + sd.wait() + except Exception as e: + typer.echo(typer.style(f"Audio recording failed: {e}", fg=typer.colors.RED)) + raise typer.Exit(1) + + # Convert to PCM16 bytes directly, no need for WAV wrapper + audio_int16 = np.clip(audio.flatten() * 32767.0, -32768, 32767).astype(np.int16) + pcm_bytes = audio_int16.tobytes() + + # Send to Wyoming ASR (async) + try: + typer.echo(typer.style(f"Connecting to {final_host}:{final_port} (lang={lang})...", fg=typer.colors.CYAN)) + + # Run the async helper + transcript_text = asyncio.run( + _async_transcribe(final_host, final_port, final_timeout, pcm_bytes, lang) + ) + + if transcript_text: + typer.echo(typer.style("\nTranscription:", fg=typer.colors.GREEN, bold=True)) + typer.echo(transcript_text) + else: + typer.echo(typer.style("No transcription received.", fg=typer.colors.YELLOW)) + + except Exception as e: + typer.echo(typer.style(f"ASR request failed: {e}", fg=typer.colors.RED)) + raise typer.Exit(1) diff --git a/src/core/Expando.py b/src/core/Expando.py new file mode 100644 index 0000000..bcdc8aa --- /dev/null +++ b/src/core/Expando.py @@ -0,0 +1,107 @@ +NO_DEFAULT = object() + + +class Expando: + """ + Readonly dynamic class that eases the access to attributes and sub attributes + It is initialized with a dict + You can then access the property using dot '.' (ex. obj.prop1.prop2) + """ + + def __init__(self, props=None, **kwargs): + self._props = props.copy() if props else {} + if kwargs: + self._props.update(kwargs) + + def __getattr__(self, item): + if item not in self._props: + raise AttributeError(item) + + current = self._props[item] + return Expando(current) if isinstance(current, dict) else current + + def __setitem__(self, key, value): + self._props[key] = value + + def get(self, path, default=None): + """ + returns the value, from a string with represents the path + :param path: + :param default: value to return if path is not found + :return: + """ + current = self._props + for attr in path.split("."): + if isinstance(current, list): + temp = [] + for value in current: + if value and attr in value: + temp.append(value[attr]) + current = temp + + else: + if current is None or attr not in current: + return default + current = current[attr] + + return current + + def geti(self, item, default=NO_DEFAULT): + """ + insensitive get + :param item: + :param default: + :return: + """ + if item in self._props: + return self._props[item] + + for k, v in self._props.items(): + if k.lower() == item.lower(): + return v + + if default is not NO_DEFAULT: + return default + + raise AttributeError(f"Item '{item}' not found") + + def as_dict(self): + """ + Return the information as a dictionary + :return: + """ + return self._props.copy() + + def to_dict(self, mappings: dict) -> dict: + """ + Return the information as a dictionary, with the given mappings + """ + return {prop_name: self.get(path) for path, prop_name in mappings.items() if prop_name is not None} + + def __hasattr__(self, item): + return item in self._props + + def values(self): + return self._props.values() + + def copy(self): + return Expando(self._props.copy()) + + def __repr__(self): + if "key" in self._props: + return f"Expando(key={self._props["key"]})" + + props_as_str = str(self._props) + if len(props_as_str) > 50: + props_as_str = props_as_str[:50] + "..." + + return f"Expando({props_as_str})" + + def __eq__(self, other): + if not isinstance(other, Expando): + return False + + return self._props == other._props + + def __hash__(self): + return hash(tuple(sorted(self._props.items()))) diff --git a/src/core/__init__.py b/src/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/core/utils.py b/src/core/utils.py new file mode 100644 index 0000000..f2675f3 --- /dev/null +++ b/src/core/utils.py @@ -0,0 +1,257 @@ +import json +import subprocess +from dataclasses import dataclass +from subprocess import CompletedProcess +from typing import Any, Callable + +import typer +from rich.console import Console +from rich.table import Table + +from .Expando import Expando + + +@dataclass +class SortConf: + property: str + ascending: bool = True + + +@dataclass +class FilterConf: + property: str + value: str + + @staticmethod + def new(text): + """Initialize FilterConf from text.""" + if text is None: + return None + + parts = text.split("=") + if len(parts) != 2: + return None + + if not parts[0].strip() or not parts[1].strip(): + return None + + return FilterConf(parts[0].strip(), parts[1].strip()) + + +@dataclass +class SelectConf: + attr: str # property nam to select + to: str = None # rename the property + default: str = None # value to use if property is missing + formatter: Callable[[Any], Any] = None # function to apply to the property using the whole item + + +@dataclass +class ProcessResult: + result: Any + error: str = None + + @property + def stderr(self): + return self.error + + @property + def returncode(self): + return 0 if self.result is not None else 1 + +def sort_by(items: list[Expando], sort_conf: SortConf): + """Sort a list of items by a given property.""" + + def _safe_sort_key(item): + """Get sort key with safe handling of None values.""" + value = item.geti(property_name, "") + # Convert None to empty string for consistent sorting + return "" if value is None else str(value) + + if sort_conf is None: + return items + + if len(items) == 0: + return items + + if sort_conf.property is None: + return items + + property_name = sort_conf.property.strip() + return sorted(items, key=_safe_sort_key, reverse=not sort_conf.ascending) + + +def filter_by(items, filter_conf: FilterConf): + """Filter a list of items by a given property.""" + + def _contains(item, value): + return str(value).lower() in str(item).lower() + + if (filter_conf is None + or filter_conf.property is None + or len(items) == 0): + return items + + predicate = _contains + property_name = filter_conf.property.strip() + + return [ + item for item in items if predicate(item.geti(property_name), filter_conf.value) + ] + + +def select(items: list[Expando], settings: list[SelectConf]) -> list[Expando]: + res = [] + + for item in items: + new_item = {} + for setting in settings: + attr = setting.attr.strip() + + key = setting.to or attr + value = setting.formatter(item) if setting.formatter else item.get(attr, setting.default) + new_item[key] = value + + res.append(Expando(new_item)) + + return res + + +def run_ps_command(command: str) -> CompletedProcess[str]: + """Run a PowerShell command and return the output""" + completed = subprocess.run( + ["powershell.exe", + "-NoProfile", + "-ExecutionPolicy", + "Bypass", + "-Command", + "[Console]::OutputEncoding = [System.Text.Encoding]::UTF8;", + command], + capture_output=True, + text=True, + encoding="utf-8", timeout=5 + ) + + return completed + + +def run_ps_command_live(command: str) -> CompletedProcess[str]: + """Run a PowerShell command and display output in real-time""" + process = subprocess.Popen( + ["powershell.exe", + "-NoProfile", + "-ExecutionPolicy", + "Bypass", + "-Command", + "[Console]::OutputEncoding = [System.Text.Encoding]::UTF8;", + command], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + encoding="utf-8" + ) + + stdout_lines = [] + stderr_lines = [] + + # Read output in real-time + while True: + stdout_line = process.stdout.readline() + stderr_line = process.stderr.readline() + + if stdout_line: + print(stdout_line.rstrip()) + stdout_lines.append(stdout_line) + + if stderr_line: + print(f"ERROR: {stderr_line.rstrip()}") + stderr_lines.append(stderr_line) + + # Check if process is finished + if process.poll() is not None: + break + + # Read any remaining lines + remaining_stdout, remaining_stderr = process.communicate() + if remaining_stdout: + print(remaining_stdout.rstrip()) + stdout_lines.append(remaining_stdout) + if remaining_stderr: + print(f"ERROR: {remaining_stderr.rstrip()}") + stderr_lines.append(remaining_stderr) + + # Create CompletedProcess object to maintain compatibility + completed = CompletedProcess( + args=["powershell.exe", "-NoProfile", "-ExecutionPolicy", "Bypass", "-Command", command], + returncode=process.returncode, + stdout=''.join(stdout_lines), + stderr=''.join(stderr_lines) + ) + + return completed + + +def get_results(stdout: str) -> list[Expando]: + stripped = stdout.strip() + if not stripped: + typer.echo(typer.style("PowerShell returned empty output", fg=typer.colors.YELLOW)) + return [] + + data = json.loads(stripped) + as_list = data if isinstance(data, list) else [data] + + return [Expando(item) for item in as_list] + + +def display_as_table(result, columns_settings: list = None): + def _create_default_columns_settings(): + if len(result) == 0: + return [] + + blue_print = result[0] + return [{"name": attr} for attr in blue_print.as_dict().keys()] + + if columns_settings is None: + columns_settings = _create_default_columns_settings() + + formatters = {} + table = Table(show_header=True) + for col in [c for c in columns_settings if "name" in c]: + col_name = col["name"] + table.add_column(col_name, **col["attrs"] if "attrs" in col else {}) + formatter = col.pop("formatter", None) + if formatter: + formatters[col_name] = formatter + + for item in result: + cloned = item.copy() + # apply formatters + for col, formatter in formatters.items(): + cloned[col] = formatter(item) + + table.add_row(*[str(value) for value in cloned.values()]) + + # Display table + console = Console() + console.print(table) + + +def check_result(result): + if result.returncode != 0: + error_msg = result.stderr.strip() if result.stderr else f"PowerShell exit code: {result.returncode}" + typer.echo(typer.style(f"PowerShell command failed: {error_msg}", fg=typer.colors.RED)) + return False + + return True + + +def filter_and_sort(items, filter_info, sort_info, desc_info=False): + if filter_info: + filter_config = FilterConf.new(filter_info) if filter_by else None + items = filter_by(items, filter_config) + + if sort_info: + sort_config = SortConf(sort_info, not desc_info) if sort_by else None + items = sort_by(items, sort_config) + + return items diff --git a/src/devices.py b/src/devices.py new file mode 100644 index 0000000..eeb65de --- /dev/null +++ b/src/devices.py @@ -0,0 +1,78 @@ +from subprocess import CompletedProcess +import sounddevice as sd + +from .core.Expando import Expando +from .core.utils import run_ps_command, run_ps_command_live, ProcessResult + + +def _is_audio_device_cmdlets_installed(): + # PowerShell command to check if module is available + ps_command = "Get-Module -ListAvailable -Name AudioDeviceCmdlets" + + result = run_ps_command(ps_command) + + # If something is returned in stdout, the module is installed + return bool(result.stdout.strip()) + + +def install_audio_cmdlets(): + """Install AudioDevice cmdlet for better audio device management""" + if _is_audio_device_cmdlets_installed(): + return CompletedProcess( + args=["is_audio_device_cmdlets_installed()"], + returncode=0, + stdout="AudioDevice cmdlet is already installed", + stderr="") + + ps_script = r""" + # 1) Trust PSGallery + Set-PSRepository -Name "PSGallery" -InstallationPolicy Trusted -ErrorAction SilentlyContinue + + # 2) Ensure NuGet provider is available + if (-not (Get-PackageProvider -Name NuGet -ErrorAction SilentlyContinue)) { + Install-PackageProvider -Name NuGet -Force -Scope CurrentUser + } + + # 3) Install AudioDeviceCmdlets if missing + if (-not (Get-Module -ListAvailable -Name AudioDeviceCmdlets)) { + Install-Module -Name AudioDeviceCmdlets -Scope CurrentUser -Force -AllowClobber + } + + # 4) Import the module + Import-Module AudioDeviceCmdlets -ErrorAction SilentlyContinue + """ + return run_ps_command_live(ps_script) + + +def get_audio_devices_windows(): + """Get all audio devices using AudioDevice cmdlet""" + ps_script = "Get-AudioDevice -List | ConvertTo-Json" + return run_ps_command(ps_script) + + +def get_audio_devices_windows_from_pnp_devices(): + ps_script = "Get-PnpDevice -Class AudioEndpoint | Select-Object FriendlyName, Status, InstanceId | ConvertTo-Json" + return run_ps_command(ps_script) + + +def get_audio_devices_linux(): + """Get all audio devices using sounddevice module""" + try: + devices = sd.query_devices() + result = [] + + for i, device in enumerate(devices): + hostapi_name = sd.query_hostapis(device['hostapi'])['name'] + + result.append(Expando({ + "device_id": i, + "name": device['name'], + "max_input_channels": device['max_input_channels'], + "max_output_channels": device['max_output_channels'], + "default_samplerate": device['default_samplerate'], + "hostapi_name": hostapi_name} + )) + + return ProcessResult(result) + except Exception as e: + ProcessResult(None, f"Error querying Linux audio devices: {e}") diff --git a/src/utils.py b/src/utils.py deleted file mode 100644 index ccd15a9..0000000 --- a/src/utils.py +++ /dev/null @@ -1,66 +0,0 @@ -from dataclasses import dataclass - - -@dataclass -class SortConf: - property: str - ascending: bool = True - - -@dataclass -class FilterConf: - property: str - value: str - - @staticmethod - def new(text): - """Initialize FilterConf from text.""" - if text is None: - return None - - parts = text.split("=") - if len(parts) != 2: - return None - - return FilterConf(parts[0].strip(), parts[1].strip()) - - -def sort_by(items, sort_conf: SortConf): - """Sort a list of items by a given property.""" - if sort_conf is None: - return items - - if len(items) == 0: - return items - - if isinstance(items[0], dict): - property_name = sort_conf.property.lower().strip() - return sorted(items, key=lambda item: item.get(property_name), reverse=not sort_conf.ascending) - else: - return sorted(items, key=lambda item: getattr(item, sort_conf.property), reverse=not sort_conf.ascending) - - -def filter_by(items, filter_conf: FilterConf): - """Filter a list of items by a given property.""" - - def _contains(item, value): - return str(value).lower() in str(item).lower() - - if (filter_conf is None - or filter_conf.property is None - or len(items) == 0): - return items - - predicate = _contains - - if isinstance(items[0], dict): - property_name = filter_conf.property.lower().strip() - return [ - item for item in items if predicate(item.get(property_name), filter_conf.value) - ] - - else: - - return [ - item for item in items if getattr(item, filter_conf.property) == filter_conf.value - ] diff --git a/tests/test_audio_devices.py b/tests/test_audio_devices.py deleted file mode 100644 index 9fb2b80..0000000 --- a/tests/test_audio_devices.py +++ /dev/null @@ -1,407 +0,0 @@ -import pytest -from unittest.mock import Mock, patch, mock_open, MagicMock -import json -import subprocess -from pathlib import Path - -from src.commands.audio import ( - detect_wsl, - get_windows_audio_devices, - get_linux_audio_devices, - _categorize_device_type, - display_windows_devices, - display_linux_devices, - WindowsAudioDevice, - LinuxAudioDevice -) -from src.config import AppConfig, AudioConfig, ServerConfig - - -# WSL Detection Tests - -@patch('pathlib.Path.exists') -@patch('builtins.open', mock_open(read_data='Linux version 5.4.0-microsoft-standard')) -def test_detect_wsl_via_proc_version_microsoft(mock_exists): - """Test WSL detection via /proc/version containing 'microsoft'.""" - mock_exists.return_value = True - - result = detect_wsl() - - assert result is True - - -@patch('pathlib.Path.exists') -@patch('builtins.open', mock_open(read_data='Linux version 5.4.0-wsl2-standard')) -def test_detect_wsl_via_proc_version_wsl(mock_exists): - """Test WSL detection via /proc/version containing 'wsl'.""" - mock_exists.return_value = True - - result = detect_wsl() - - assert result is True - - -@patch('pathlib.Path.exists') -@patch('builtins.open', mock_open(read_data='Linux version 5.4.0-generic')) -@patch('os.getenv') -def test_detect_wsl_via_env_distro_name(mock_getenv, mock_exists): - """Test WSL detection via WSL_DISTRO_NAME environment variable.""" - mock_exists.return_value = True - mock_getenv.side_effect = lambda key: 'Ubuntu' if key == 'WSL_DISTRO_NAME' else None - - result = detect_wsl() - - assert result is True - - -@patch('pathlib.Path.exists') -@patch('builtins.open', mock_open(read_data='Linux version 5.4.0-generic')) -@patch('os.getenv') -def test_detect_wsl_via_env_wslenv(mock_getenv, mock_exists): - """Test WSL detection via WSLENV environment variable.""" - mock_exists.return_value = True - mock_getenv.side_effect = lambda key: 'PATH/l' if key == 'WSLENV' else None - - result = detect_wsl() - - assert result is True - - -@patch('pathlib.Path.exists') -@patch('builtins.open', mock_open(read_data='Linux version 5.4.0-generic')) -@patch('os.getenv', return_value=None) -def test_detect_wsl_false_native_linux(mock_getenv, mock_exists): - """Test WSL detection returns False on native Linux.""" - mock_exists.return_value = True - - result = detect_wsl() - - assert result is False - - -@patch('pathlib.Path.exists', return_value=False) -@patch('os.getenv', return_value=None) -def test_detect_wsl_false_no_proc_version(mock_getenv, mock_exists): - """Test WSL detection returns False when /proc/version doesn't exist.""" - result = detect_wsl() - - assert result is False - - -@patch('pathlib.Path.exists') -@patch('builtins.open', side_effect=Exception("Permission denied")) -@patch('os.getenv', return_value=None) -def test_detect_wsl_handles_proc_version_read_error(mock_getenv, mock_exists): - """Test WSL detection handles /proc/version read errors gracefully.""" - mock_exists.return_value = True - - result = detect_wsl() - - assert result is False - - -# Device Type Categorization Tests - -def test_categorize_input_device(): - """Test categorization of input device.""" - instance_id = "SWD\\MMDEVAPI\\{0.0.1.00000000}.{a1b2c3d4-e5f6-7890-abcd-ef1234567890}\\capture" - - result = _categorize_device_type(instance_id) - - assert result == "Input" - - -def test_categorize_output_device(): - """Test categorization of output device.""" - instance_id = "SWD\\MMDEVAPI\\{0.0.0.00000000}.{a1b2c3d4-e5f6-7890-abcd-ef1234567890}\\render" - - result = _categorize_device_type(instance_id) - - assert result == "Output" - - -def test_categorize_unknown_device(): - """Test categorization of unknown device type.""" - instance_id = "SWD\\MMDEVAPI\\{0.0.2.00000000}.{a1b2c3d4-e5f6-7890-abcd-ef1234567890}\\unknown" - - result = _categorize_device_type(instance_id) - - assert result == "Unknown" - - -def test_categorize_empty_instance_id(): - """Test categorization with empty instance ID.""" - result = _categorize_device_type("") - - assert result == "Unknown" - - -# Windows Audio Devices Tests - -@patch('subprocess.run') -@patch('typer.echo') -def test_get_windows_devices_success_single_device(mock_echo, mock_run): - """Test successful retrieval of single Windows device.""" - mock_result = Mock() - mock_result.returncode = 0 - mock_result.stdout = json.dumps({ - "FriendlyName": "Blue Yeti Microphone", - "Status": "OK", - "InstanceId": "USB\\VID_0B05&PID_1234\\capture" - }) - mock_result.stderr = "" - mock_run.return_value = mock_result - - result = get_windows_audio_devices() - - assert len(result) == 1 - assert result[0].name == "Blue Yeti Microphone" - assert result[0].status == "OK" - assert result[0].device_type == "Input" - assert "USB\\VID_0B05&PID_1234\\capture" in result[0].instance_id - - -@patch('subprocess.run') -@patch('typer.echo') -def test_get_windows_devices_success_multiple_devices(mock_echo, mock_run): - """Test successful retrieval of multiple Windows devices.""" - mock_result = Mock() - mock_result.returncode = 0 - mock_result.stdout = json.dumps([ - { - "FriendlyName": "Microphone", - "Status": "OK", - "InstanceId": "capture_device" - }, - { - "FriendlyName": "Speakers", - "Status": "OK", - "InstanceId": "render_device" - } - ]) - mock_result.stderr = "" - mock_run.return_value = mock_result - - result = get_windows_audio_devices() - - assert len(result) == 2 - assert result[0].device_type == "Input" - assert result[1].device_type == "Output" - - -@patch('subprocess.run') -@patch('typer.echo') -def test_get_windows_devices_powershell_error(mock_echo, mock_run): - """Test PowerShell command failure.""" - mock_result = Mock() - mock_result.returncode = 1 - mock_result.stdout = "" - mock_result.stderr = "Get-PnpDevice : Access denied" - mock_run.return_value = mock_result - - result = get_windows_audio_devices() - - assert result == [] - mock_echo.assert_called() - - -@patch('subprocess.run') -@patch('typer.echo') -def test_get_windows_devices_empty_output(mock_echo, mock_run): - """Test PowerShell returning empty output.""" - mock_result = Mock() - mock_result.returncode = 0 - mock_result.stdout = "" - mock_result.stderr = "" - mock_run.return_value = mock_result - - result = get_windows_audio_devices() - - assert result == [] - mock_echo.assert_called() - - -@patch('subprocess.run') -@patch('typer.echo') -def test_get_windows_devices_invalid_json(mock_echo, mock_run): - """Test invalid JSON response from PowerShell.""" - mock_result = Mock() - mock_result.returncode = 0 - mock_result.stdout = "Invalid JSON {" - mock_result.stderr = "" - mock_run.return_value = mock_result - - result = get_windows_audio_devices() - - assert result == [] - mock_echo.assert_called() - - -@patch('subprocess.run', side_effect=subprocess.TimeoutExpired("powershell.exe", 15)) -@patch('typer.echo') -def test_get_windows_devices_timeout(mock_echo, mock_run): - """Test PowerShell command timeout.""" - result = get_windows_audio_devices() - - assert result == [] - mock_echo.assert_called() - - -@patch('subprocess.run') -@patch('typer.echo') -def test_get_windows_devices_filters_empty_names(mock_echo, mock_run): - """Test filtering of devices with empty names.""" - mock_result = Mock() - mock_result.returncode = 0 - mock_result.stdout = json.dumps([ - { - "FriendlyName": "Valid Device", - "Status": "OK", - "InstanceId": "capture_device" - }, - { - "FriendlyName": "", - "Status": "OK", - "InstanceId": "empty_name_device" - }, - { - "FriendlyName": None, - "Status": "OK", - "InstanceId": "null_name_device" - } - ]) - mock_result.stderr = "" - mock_run.return_value = mock_result - - result = get_windows_audio_devices() - - assert len(result) == 1 - assert result[0].name == "Valid Device" - - -# Linux Audio Devices Tests - -@patch('sounddevice.query_devices') -@patch('sounddevice.query_hostapis') -@patch('typer.echo') -def test_get_linux_devices_success(mock_echo, mock_hostapis, mock_devices): - """Test successful retrieval of Linux devices.""" - mock_devices.return_value = [ - { - 'name': 'pulse', - 'max_input_channels': 32, - 'max_output_channels': 32, - 'default_samplerate': 44100.0, - 'hostapi': 0 - }, - { - 'name': 'default', - 'max_input_channels': 32, - 'max_output_channels': 32, - 'default_samplerate': 44100.0, - 'hostapi': 0 - } - ] - mock_hostapis.return_value = {'name': 'ALSA'} - - result = get_linux_audio_devices() - - assert len(result) == 2 - assert result[0].name == 'pulse' - assert result[0].device_id == 0 - assert result[0].hostapi_name == 'ALSA' - assert result[1].name == 'default' - assert result[1].device_id == 1 - - -@patch('sounddevice.query_devices', side_effect=Exception("ALSA error")) -@patch('typer.echo') -def test_get_linux_devices_sounddevice_error(mock_echo, mock_devices): - """Test sounddevice query failure.""" - with pytest.raises(SystemExit): - get_linux_audio_devices() - - -# Display Functions Tests - -@patch('typer.echo') -def test_display_windows_devices_empty_list(mock_echo): - """Test display with empty device list.""" - display_windows_devices([], True, True) - - mock_echo.assert_called() - - -@patch('typer.echo') -@patch('src.commands.audio._display_single_windows_device') -def test_display_windows_devices_input_only(mock_display_single, mock_echo): - """Test display showing only input devices.""" - devices = [ - WindowsAudioDevice('Microphone', 'OK', 'Input', 'mic1'), - WindowsAudioDevice('Speakers', 'OK', 'Output', 'spk1') - ] - - display_windows_devices(devices, show_inputs=True, show_outputs=False) - - mock_display_single.assert_called_once_with(devices[0]) - - -@patch('typer.echo') -@patch('src.commands.audio._display_single_windows_device') -def test_display_windows_devices_with_unknown(mock_display_single, mock_echo): - """Test display including unknown device types.""" - devices = [ - WindowsAudioDevice('Microphone', 'OK', 'Input', 'mic1'), - WindowsAudioDevice('Unknown Device', 'OK', 'Unknown', 'unk1') - ] - - display_windows_devices(devices, show_inputs=True, show_outputs=True) - - assert mock_display_single.call_count == 2 - - -@patch('typer.echo') -def test_display_linux_devices_no_matching_devices(mock_echo): - """Test display with no matching devices.""" - devices = [ - LinuxAudioDevice(0, 'pulse', 0, 32, 44100.0, 'ALSA') - ] - config = AppConfig(ServerConfig(), AudioConfig()) - - with pytest.raises(SystemExit): - display_linux_devices(devices, show_inputs=True, show_outputs=False, config=config) - - -@patch('typer.echo') -@patch('sounddevice.default') -def test_display_linux_devices_success(mock_default, mock_echo): - """Test successful display of Linux devices.""" - mock_default.device = [0, 1] - - devices = [ - LinuxAudioDevice(0, 'pulse', 32, 32, 44100.0, 'ALSA') - ] - config = AppConfig(ServerConfig(), AudioConfig()) - - display_linux_devices(devices, show_inputs=True, show_outputs=True, config=config) - - # Verify that echo was called with device information - assert mock_echo.call_count > 0 - - -@patch('typer.echo') -@patch('sounddevice.default') -def test_display_linux_devices_with_configured_device(mock_default, mock_echo): - """Test display with configured device marked.""" - mock_default.device = [0, 1] - - devices = [ - LinuxAudioDevice(0, 'pulse', 32, 32, 44100.0, 'ALSA'), - LinuxAudioDevice(1, 'default', 32, 32, 44100.0, 'ALSA') - ] - config = AppConfig(ServerConfig(), AudioConfig(device=1)) - - display_linux_devices(devices, show_inputs=True, show_outputs=True, config=config) - - # Verify that echo was called and device is marked as configured - assert mock_echo.call_count > 0 \ No newline at end of file diff --git a/tests/test_expando.py b/tests/test_expando.py new file mode 100644 index 0000000..63ca95d --- /dev/null +++ b/tests/test_expando.py @@ -0,0 +1,99 @@ +import pytest + +from core.Expando import Expando + + +def test_i_can_get_properties(): + props = {"a": 10, + "b": { + "c": "value", + "d": 20 + }} + dynamic = Expando(props) + + assert dynamic.a == 10 + assert dynamic.b.c == "value" + + with pytest.raises(AttributeError): + assert dynamic.unknown == "some_value" + + +def test_i_can_get(): + props = {"a": 10, + "b": { + "c": "value", + "d": 20 + }} + dynamic = Expando(props) + + assert dynamic.get("a") == 10 + assert dynamic.get("b.c") == "value" + assert dynamic.get("unknown") is None + assert dynamic.get("unknown", "default") == "default" + assert dynamic.get("b.x") is None + assert dynamic.get("b.x", "default") == "default" + assert dynamic.get("b.x", None) is None + + +def test_i_can_get_insensitive(): + props = {"A": 10, + "b": 20} + dynamic = Expando(props) + + assert dynamic.geti("a") == 10 + assert dynamic.geti("A") == 10 + assert dynamic.geti("a.c", None) is None + with pytest.raises(AttributeError): + assert dynamic.geti("unknown") == "some_value" + + +def test_i_can_get_insensitive_when_the_two_entries_exist(): + props = {"A": 10, + "a": 20} + dynamic = Expando(props) + assert dynamic.geti("A") == 10 + assert dynamic.geti("a") == 20 + + +def test_i_can_get_from_list(): + props = {"a": [{"c": "value1", "d": 1}, {"c": "value2", "d": 2}]} + dynamic = Expando(props) + + assert dynamic.get("a.c") == ["value1", "value2"] + + +def test_none_is_returned_when_get_from_list_and_property_does_not_exist(): + props = {"a": [{"c": "value1", "d": 1}, + {"a": "value2", "d": 2} # 'c' does not exist in the second row + ]} + dynamic = Expando(props) + + assert dynamic.get("a.c") == ["value1"] + + +def test_i_can_manage_none_values(): + props = {"a": 10, + "b": None} + dynamic = Expando(props) + + assert dynamic.get("b.c") is None + + +def test_i_can_manage_none_values_in_list(): + props = {"a": [{"b": {"c": "value"}}, + {"b": None} + ]} + dynamic = Expando(props) + + assert dynamic.get("a.b.c") == ["value"] + + +def test_i_can_add_new_properties(): + props = {"a": 10, + "b": 20} + dynamic = Expando(props) + dynamic["c"] = 30 + + assert dynamic.a == 10 + assert dynamic.b == 20 + assert dynamic.c == 30 diff --git a/tests/test_filter_by.py b/tests/test_filter_by.py new file mode 100644 index 0000000..79bb646 --- /dev/null +++ b/tests/test_filter_by.py @@ -0,0 +1,165 @@ +import pytest + +from src.core.utils import FilterConf, filter_by +from src.core.Expando import Expando + + +def test_i_can_filter_by_matching_string(): + """Test basic filtering with string matching.""" + items = [ + Expando({"name": "alice"}), + Expando({"name": "bob"}), + Expando({"name": "charlie"}) + ] + filter_conf = FilterConf(property="name", value="alice") + + result = filter_by(items, filter_conf) + + assert len(result) == 1 + assert result[0].name == "alice" + + +def test_i_can_filter_by_case_insensitive(): + """Test case-insensitive filtering.""" + items = [ + Expando({"name": "ALICE"}), + Expando({"name": "Bob"}), + Expando({"name": "charlie"}) + ] + filter_conf = FilterConf(property="name", value="alice") + + result = filter_by(items, filter_conf) + + assert len(result) == 1 + assert result[0].name == "ALICE" + + +def test_i_can_filter_by_partial_match(): + """Test partial substring matching.""" + items = [ + Expando({"name": "alice smith"}), + Expando({"name": "bob jones"}), + Expando({"name": "charlie brown"}) + ] + filter_conf = FilterConf(property="name", value="smith") + + result = filter_by(items, filter_conf) + + assert len(result) == 1 + assert result[0].name == "alice smith" + + +def test_i_can_handle_none_filter_conf(): + """Test that None filter configuration returns items unchanged.""" + items = [ + Expando({"name": "alice"}), + Expando({"name": "bob"}) + ] + + result = filter_by(items, None) + + assert result == items + assert len(result) == 2 + + +def test_i_can_handle_empty_list(): + """Test that empty list returns empty list.""" + items = [] + filter_conf = FilterConf(property="name", value="alice") + + result = filter_by(items, filter_conf) + + assert result == [] + + +def test_i_can_handle_none_property_in_filter_conf(): + """Test that FilterConf with None property returns items unchanged.""" + items = [ + Expando({"name": "alice"}), + Expando({"name": "bob"}) + ] + # Create FilterConf with None property (simulating invalid state) + filter_conf = FilterConf(property=None, value="alice") + + result = filter_by(items, filter_conf) + + assert result == items + assert len(result) == 2 + + +def test_i_can_filter_with_no_matches(): + """Test filtering when no items match the criteria.""" + items = [ + Expando({"name": "alice"}), + Expando({"name": "bob"}), + Expando({"name": "charlie"}) + ] + filter_conf = FilterConf(property="name", value="xyz") + + result = filter_by(items, filter_conf) + + assert result == [] + + +def test_i_cannot_filter_with_missing_properties(): + """Test that missing properties raise AttributeError.""" + items = [ + Expando({"name": "alice"}), + Expando({"age": 25}), # no name property + Expando({"name": "bob"}) + ] + filter_conf = FilterConf(property="name", value="alice") + + # Should raise AttributeError when trying to access missing property + with pytest.raises(AttributeError): + filter_by(items, filter_conf) + + +def test_i_can_filter_with_none_values(): + """Test filtering when properties have None values.""" + items = [ + Expando({"name": "alice"}), + Expando({"name": None}), + Expando({"name": "bob"}) + ] + filter_conf = FilterConf(property="name", value="alice") + + result = filter_by(items, filter_conf) + + # None gets converted to "none" string, shouldn't match "alice" + assert len(result) == 1 + assert result[0].name == "alice" + + +def test_i_can_filter_with_case_insensitive_property_names(): + """Test filtering using case-insensitive property names.""" + items = [ + Expando({"Name": "alice"}), # uppercase N + Expando({"Name": "bob"}), + Expando({"Name": "charlie"}) + ] + filter_conf = FilterConf(property="name", value="alice") # lowercase n + + result = filter_by(items, filter_conf) + + assert len(result) == 1 + assert result[0].Name == "alice" + + +def test_i_can_filter_with_numbers_and_mixed_types(): + """Test filtering with mixed types (numbers, strings).""" + items = [ + Expando({"value": "123"}), + Expando({"value": 123}), + Expando({"value": "hello123"}), + Expando({"value": "world"}) + ] + filter_conf = FilterConf(property="value", value="123") + + result = filter_by(items, filter_conf) + + # Should match "123", 456 (converted to "456"), and "hello123" + assert len(result) == 3 + expected_values = ["123", 123, "hello123"] + actual_values = [item.value for item in result] + assert all(val in expected_values for val in actual_values) \ No newline at end of file diff --git a/tests/test_filter_conf.py b/tests/test_filter_conf.py new file mode 100644 index 0000000..72a3a11 --- /dev/null +++ b/tests/test_filter_conf.py @@ -0,0 +1,85 @@ +from src.core.utils import FilterConf + + +def test_i_can_create_filter_conf_from_valid_text(): + """Test creating FilterConf from valid property=value format.""" + result = FilterConf.new("property=value") + + assert result is not None + assert isinstance(result, FilterConf) + assert result.property == "property" + assert result.value == "value" + + +def test_i_can_handle_none_input(): + """Test that None input returns None.""" + result = FilterConf.new(None) + + assert result is None + + +def test_i_can_handle_whitespace_around_equals(): + """Test that whitespace around property and value is stripped.""" + result = FilterConf.new(" property = value ") + + assert result is not None + assert result.property == "property" + assert result.value == "value" + + +def test_i_cannot_create_filter_conf_without_equals(): + """Test that text without equals sign returns None.""" + result = FilterConf.new("property") + + assert result is None + + +def test_i_cannot_create_filter_conf_with_multiple_equals(): + """Test that text with multiple equals signs returns None.""" + result = FilterConf.new("property=value=another") + + assert result is None + + +def test_i_cannot_create_filter_conf_from_empty_string(): + """Test that empty string returns None.""" + result = FilterConf.new("") + + assert result is None + + +def test_i_cannot_create_filter_conf_with_empty_property(): + """Test that empty property name returns None.""" + result = FilterConf.new("=value") + + assert result is None + + +def test_i_cannot_create_filter_conf_with_empty_value(): + """Test that empty value returns None.""" + result = FilterConf.new("property=") + + assert result is None + + +def test_i_cannot_create_filter_conf_with_both_empty(): + """Test that both empty property and value returns None.""" + result = FilterConf.new("=") + + assert result is None + + +def test_i_cannot_create_filter_conf_with_whitespace_only(): + """Test that whitespace-only parts return None.""" + result = FilterConf.new(" = ") + + assert result is None + + +def test_i_can_handle_special_characters_in_values(): + """Test that special characters in property names and values are handled correctly.""" + result = FilterConf.new("device_type=USB Audio Device (2.0)") + + assert result is not None + assert result.property == "device_type" + assert result.value == "USB Audio Device (2.0)" \ No newline at end of file diff --git a/tests/test_select.py b/tests/test_select.py new file mode 100644 index 0000000..b937a1c --- /dev/null +++ b/tests/test_select.py @@ -0,0 +1,256 @@ +import pytest + +from core.utils import SelectConf, select +from core.Expando import Expando + + +def test_i_can_select_basic_attributes(): + """Test basic selection of existing attributes.""" + items = [ + Expando({"name": "alice", "age": 25, "city": "paris"}), + Expando({"name": "bob", "age": 30, "city": "london"}) + ] + settings = [ + SelectConf(attr="name"), + SelectConf(attr="age") + ] + + result = select(items, settings) + + assert len(result) == 2 + assert result[0].name == "alice" + assert result[0].age == 25 + assert not hasattr(result[0], "city") # city not selected + assert result[1].name == "bob" + assert result[1].age == 30 + + +def test_i_can_select_with_renamed_attributes(): + """Test renaming attributes using SelectConf.to.""" + items = [ + Expando({"name": "alice", "age": 25}), + Expando({"name": "bob", "age": 30}) + ] + settings = [ + SelectConf(attr="name", to="full_name"), + SelectConf(attr="age", to="years") + ] + + result = select(items, settings) + + assert len(result) == 2 + assert result[0].full_name == "alice" + assert result[0].years == 25 + assert not hasattr(result[0], "name") # original name not present + assert not hasattr(result[0], "age") # original name not present + assert result[1].full_name == "bob" + assert result[1].years == 30 + + +def test_i_can_select_with_formatter(): + """Test applying formatters to selected attributes.""" + items = [ + Expando({"name": "alice", "age": 25}), + Expando({"name": "bob", "age": 30}) + ] + settings = [ + SelectConf(attr="name", formatter=lambda item: item.name.upper()), + SelectConf(attr="age", formatter=lambda item: item.age * 2) + ] + + result = select(items, settings) + + assert len(result) == 2 + assert result[0].name == "ALICE" + assert result[0].age == 50 + assert result[1].name == "BOB" + assert result[1].age == 60 + + +def test_i_can_select_with_formatter_and_rename(): + """Test combining formatter and renaming.""" + items = [ + Expando({"name": "alice", "age": 25}), + Expando({"name": "bob", "age": 30}) + ] + settings = [ + SelectConf(attr="name", to="upper_name", formatter=lambda item: item.name.upper()), + SelectConf(attr="age", to="double_age", formatter=lambda item: item.age * 2) + ] + + result = select(items, settings) + + assert len(result) == 2 + assert result[0].upper_name == "ALICE" + assert result[0].double_age == 50 + assert not hasattr(result[0], "name") + assert not hasattr(result[0], "age") + assert result[1].upper_name == "BOB" + assert result[1].double_age == 60 + + +def test_i_can_handle_empty_items_list(): + """Test that empty items list returns empty list.""" + items = [] + settings = [SelectConf(attr="name")] + + result = select(items, settings) + + assert result == [] + + +def test_i_can_handle_empty_settings_list(): + """Test that empty settings list returns items with empty properties.""" + items = [ + Expando({"name": "alice", "age": 25}), + Expando({"name": "bob", "age": 30}) + ] + settings = [] + + result = select(items, settings) + + assert len(result) == 2 + # Each result should be an Expando with empty properties + assert result[0]._props == {} + assert result[1]._props == {} + + +def test_i_can_handle_missing_attributes(): + """Test that missing attributes return None (normal behavior).""" + items = [ + Expando({"name": "alice"}), # no age attribute + Expando({"name": "bob", "age": 30}) + ] + settings = [ + SelectConf(attr="name"), + SelectConf(attr="age") # missing in first item + ] + + result = select(items, settings) + + assert len(result) == 2 + assert result[0].name == "alice" + assert result[0].age is None # missing attribute returns None + assert result[1].name == "bob" + assert result[1].age == 30 + + +def test_i_can_handle_whitespace_in_attr_names(): + """Test that whitespace in attribute names is stripped.""" + items = [ + Expando({"name": "alice", "age": 25}) + ] + settings = [ + SelectConf(attr=" name "), # whitespace around attr + SelectConf(attr="\tage\n") # tabs and newlines + ] + + result = select(items, settings) + + assert len(result) == 1 + assert result[0].name == "alice" + assert result[0].age == 25 + + +def test_i_can_select_multiple_attributes(): + """Test selecting multiple attributes from multiple items.""" + items = [ + Expando({"name": "alice", "age": 25, "city": "paris", "country": "france"}), + Expando({"name": "bob", "age": 30, "city": "london", "country": "uk"}), + Expando({"name": "charlie", "age": 35, "city": "madrid", "country": "spain"}) + ] + settings = [ + SelectConf(attr="name"), + SelectConf(attr="city", to="location"), + SelectConf(attr="country", formatter=lambda item: item.country.upper()) + ] + + result = select(items, settings) + + assert len(result) == 3 + assert result[0].name == "alice" + assert result[0].location == "paris" + assert result[0].country == "FRANCE" + assert result[1].name == "bob" + assert result[1].location == "london" + assert result[1].country == "UK" + assert result[2].name == "charlie" + assert result[2].location == "madrid" + assert result[2].country == "SPAIN" + + +def test_i_can_handle_formatter_with_whole_item(): + """Test that formatter receives the whole item, not just the attribute value.""" + items = [ + Expando({"first_name": "alice", "last_name": "smith"}), + Expando({"first_name": "bob", "last_name": "jones"}) + ] + settings = [ + SelectConf( + attr="first_name", + to="full_name", + formatter=lambda item: f"{item.get('first_name')} {item.get('last_name')}" + ) + ] + + result = select(items, settings) + + assert len(result) == 2 + assert result[0].full_name == "alice smith" + assert result[1].full_name == "bob jones" + + +def test_i_cannot_select_when_formatter_raises_exception(): + """Test that formatter exceptions are propagated.""" + items = [ + Expando({"name": "alice", "value": None}) + ] + settings = [ + SelectConf( + attr="value", + formatter=lambda item: item.get("value").upper() # will fail on None + ) + ] + + with pytest.raises(AttributeError): + select(items, settings) + + +def test_i_can_handle_none_values_in_formatter(): + """Test formatter handling None values gracefully when designed to do so.""" + items = [ + Expando({"name": "alice", "value": None}), + Expando({"name": "bob", "value": "test"}) + ] + settings = [ + SelectConf( + attr="value", + formatter=lambda item: item.get("value").upper() if item.get("value") else "NO_VALUE" + ) + ] + + result = select(items, settings) + + assert len(result) == 2 + assert result[0].value == "NO_VALUE" + assert result[1].value == "TEST" + + +def test_i_can_select_nested_attributes(): + """Test selecting nested attributes using Expando.get() path notation.""" + items = [ + Expando({"user": {"profile": {"name": "alice"}}, "age": 25}), + Expando({"user": {"profile": {"name": "bob"}}, "age": 30}) + ] + settings = [ + SelectConf(attr="user.profile.name", to="name"), + SelectConf(attr="age") + ] + + result = select(items, settings) + + assert len(result) == 2 + assert result[0].name == "alice" + assert result[0].age == 25 + assert result[1].name == "bob" + assert result[1].age == 30 diff --git a/tests/test_sort_by.py b/tests/test_sort_by.py new file mode 100644 index 0000000..559fd87 --- /dev/null +++ b/tests/test_sort_by.py @@ -0,0 +1,152 @@ +from src.core.utils import FilterConf, SortConf, sort_by +from src.core.Expando import Expando + + +def test_i_can_sort_by_ascending_strings(): + """Test sorting strings in ascending order.""" + items = [ + Expando({"name": "charlie"}), + Expando({"name": "alice"}), + Expando({"name": "bob"}) + ] + sort_conf = SortConf(property="name", ascending=True) + + result = sort_by(items, sort_conf) + + assert len(result) == 3 + assert result[0].name == "alice" + assert result[1].name == "bob" + assert result[2].name == "charlie" + + +def test_i_can_sort_by_descending_strings(): + """Test sorting strings in descending order.""" + items = [ + Expando({"name": "alice"}), + Expando({"name": "charlie"}), + Expando({"name": "bob"}) + ] + sort_conf = SortConf(property="name", ascending=False) + + result = sort_by(items, sort_conf) + + assert len(result) == 3 + assert result[0].name == "charlie" + assert result[1].name == "bob" + assert result[2].name == "alice" + + +def test_i_can_handle_none_sort_conf(): + """Test that None sort configuration returns items unchanged.""" + items = [ + Expando({"name": "charlie"}), + Expando({"name": "alice"}) + ] + + result = sort_by(items, None) + + assert result == items + assert result[0].name == "charlie" + assert result[1].name == "alice" + + +def test_i_can_handle_empty_list(): + """Test that empty list returns empty list.""" + items = [] + sort_conf = SortConf(property="name") + + result = sort_by(items, sort_conf) + + assert result == [] + + +def test_i_can_sort_with_missing_properties(): + """Test sorting when some objects don't have the property.""" + items = [ + Expando({"name": "bob"}), + Expando({"age": 25}), # no name property + Expando({"name": "alice"}) + ] + sort_conf = SortConf(property="name", ascending=True) + + result = sort_by(items, sort_conf) + + assert len(result) == 3 + # Items without the property should get empty string and sort first + assert result[0].geti("name", "") == "" # the one with missing property + assert result[1].name == "alice" + assert result[2].name == "bob" + + +def test_i_can_sort_with_none_values(): + """Test sorting when properties have None values.""" + items = [ + Expando({"name": "bob"}), + Expando({"name": None}), + Expando({"name": "alice"}) + ] + sort_conf = SortConf(property="name", ascending=True) + + result = sort_by(items, sort_conf) + + assert len(result) == 3 + # None values should be treated as empty strings and sort first + assert result[0].name is None + assert result[1].name == "alice" + assert result[2].name == "bob" + + +def test_i_can_sort_with_case_insensitive_property_names(): + """Test sorting using case-insensitive property names.""" + items = [ + Expando({"Name": "charlie"}), # uppercase N + Expando({"Name": "alice"}), + Expando({"Name": "bob"}) + ] + sort_conf = SortConf(property="name", ascending=True) # lowercase n + + result = sort_by(items, sort_conf) + + assert len(result) == 3 + assert result[0].Name == "alice" + assert result[1].Name == "bob" + assert result[2].Name == "charlie" + + + +def test_i_can_sort_mixed_types(): + """Test sorting with mixed types (numbers, strings, None).""" + items = [ + Expando({"value": "zebra"}), + Expando({"value": 10}), + Expando({"value": None}), + Expando({"value": "apple"}) + ] + sort_conf = SortConf(property="value", ascending=True) + + result = sort_by(items, sort_conf) + + assert len(result) == 4 + # Mixed types will be converted to strings for comparison + # None -> "", numbers -> str(number), strings stay strings + assert result[0].value is None # None becomes "" and sorts first + assert result[1].value == 10 # "10" sorts before "apple" + assert result[2].value == "apple" # "apple" sorts before "zebra" + assert result[3].value == "zebra" + + +def test_i_can_sort_with_whitespace_in_property_name(): + """Test that property names are stripped of whitespace.""" + items = [ + Expando({"name": "charlie"}), + Expando({"name": "alice"}), + Expando({"name": "bob"}) + ] + sort_conf = SortConf(property=" name ", ascending=True) # whitespace around property + + result = sort_by(items, sort_conf) + + assert len(result) == 3 + assert result[0].name == "alice" + assert result[1].name == "bob" + assert result[2].name == "charlie"