diff --git a/main.py b/main.py index 174747e..8a62332 100644 --- a/main.py +++ b/main.py @@ -8,8 +8,7 @@ Usage: python main.py audio test [--device ID] [--duration N] [--save] python main.py audio config """ - from src.cli import app if __name__ == "__main__": - app() \ No newline at end of file + app() diff --git a/src/cli.py b/src/cli.py index ade61d8..63af006 100644 --- a/src/cli.py +++ b/src/cli.py @@ -12,6 +12,7 @@ server_app.command("check")(server.check) audio_app.command("list")(audio.list_devices) audio_app.command("test")(audio.test_device) audio_app.command("config")(audio.config_info) +audio_app.command("install")(audio.install) # Register subcommands app.add_typer(server_app, name="server") diff --git a/src/commands/audio.py b/src/commands/audio.py index 8dcd573..278d13a 100644 --- a/src/commands/audio.py +++ b/src/commands/audio.py @@ -1,79 +1,14 @@ import sounddevice as sd -import subprocess -import typer import numpy as np import os -import json -from typing import Optional, List, Dict, Any, Tuple -from dataclasses import dataclass +from typing import Optional, List -from rich.console import Console -from rich.table import Table - -from ..utils import SortConf, sort_by, FilterConf, filter_by +from ..devices import get_audio_devices_windows, install_audio_cmdlets +from ..core.utils import * from ..config import AppConfig -@dataclass -class WindowsAudioDevice: - """Windows audio device data structure.""" - name: str - status: str - device_type: str # "Input", "Output", "Unknown" - instance_id: str - - -@dataclass -class LinuxAudioDevice: - """Linux audio device data structure.""" - device_id: int - name: str - max_input_channels: int - max_output_channels: int - default_samplerate: float - hostapi_name: str - - -def list_devices( - device: Optional[int] = typer.Argument(default=None, help="Show details of a device, by its Id"), - pulse: bool = typer.Option(False, "--pulse", help="Show PulseAudio sources"), - native: bool = typer.Option(False, "--native", help="Force native Linux view (WSL only)"), - sort_by: Optional[str] = typer.Option(None, "--sort", help="Sort by column"), - desc: bool = typer.Option(False, "--desc", help="Set the sorting order to descending"), - filter_by: Optional[str] = typer.Option(None, "--filter", help="Filter by a column value. Use column=value"), - config_file: Optional[str] = typer.Option(None, "--config", "-c", help="Path to config file") -): - """List audio devices.""" - - # Load configuration for context - config = AppConfig.load(config_file) - - # Determine title based on environment and options - is_wsl = detect_wsl() - use_windows = is_wsl and not native - sort_config = SortConf(sort_by, not desc) if sort_by else None - filter_config = FilterConf.new(filter_by) if filter_by else None - if use_windows: - typer.echo(typer.style("WSL detected, showing Windows audio devices (use --native for WSL view)", - fg=typer.colors.BLUE)) - windows_devices = get_windows_audio_devices() - display_windows_devices(windows_devices, sort_config, filter_config) - - else: - linux_devices = get_linux_audio_devices() - display_linux_devices(linux_devices, config, pulse, sort_config) - - # Show legend - typer.echo(f"\nLegend:") - if use_windows: - typer.echo(typer.style(" ✓ = Device working", fg=typer.colors.GREEN)) - typer.echo(typer.style(" ✗ = Device problem", fg=typer.colors.RED)) - else: - typer.echo(typer.style(" * = Configured device", fg=typer.colors.YELLOW)) - typer.echo(f" IN/OUT = Input/Output channel count") - - -def detect_wsl() -> bool: +def _detect_wsl() -> bool: """Detect if running under Windows Subsystem for Linux.""" try: # Method 1: Check /proc/version for Microsoft @@ -99,365 +34,7 @@ def detect_wsl() -> bool: except Exception: return False - -def get_windows_audio_devices_com() -> List[WindowsAudioDevice]: - """ - Retrieve Windows audio devices using Core Audio (COM) via an inline C# helper compiled by PowerShell. - Returns a list of WindowsAudioDevice with accurate device_type ('Input' or 'Output') based on DataFlow. - """ - ps_script = r""" -[Console]::OutputEncoding = [System.Text.Encoding]::UTF8; - -$code = @" -using System; -using System.Runtime.InteropServices; -using System.Collections.Generic; - -[ComImport] -[Guid("BCDE0395-E52F-467C-8E3D-C4579291692E")] -class MMDeviceEnumerator {} - -[Guid("A95664D2-9614-4F35-A746-DE8DB63617E6"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)] -interface IMMDeviceEnumerator { - int EnumAudioEndpoints(EDataFlow dataFlow, int dwStateMask, out IMMDeviceCollection ppDevices); - // Other methods not used are omitted -} - -enum EDataFlow { eRender = 0, eCapture = 1, eAll = 2 } - -[Flags] -enum DEVICE_STATE : uint { ACTIVE = 0x1, DISABLED = 0x2, NOTPRESENT = 0x4, UNPLUGGED = 0x8, ALL = 0xF } - -[Guid("0BD7A1BE-7A1A-44DB-8397-C0A794AF5630"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)] -interface IMMDeviceCollection { - int GetCount(out int pcDevices); - int Item(int nDevice, out IMMDevice ppDevice); -} - -[Guid("D666063F-1587-4E43-81F1-B948E807363F"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)] -interface IMMDevice { - int Activate(ref Guid iid, int dwClsCtx, IntPtr pActivationParams, out IntPtr ppInterface); - int OpenPropertyStore(int stgmAccess, out IPropertyStore ppProperties); - int GetId(out string ppstrId); - int GetState(out int pdwState); -} - -[Guid("886d8eeb-8cf2-4446-8d02-cdba1dbdcf99"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)] -interface IPropertyStore { - int GetCount(out int cProps); - int GetAt(int iProp, out PROPERTYKEY pkey); - int GetValue(ref PROPERTYKEY key, out PROPVARIANT pv); - int SetValue(ref PROPERTYKEY key, ref PROPVARIANT propvar); - int Commit(); -} - -[StructLayout(LayoutKind.Sequential)] -struct PROPERTYKEY { - public Guid fmtid; - public int pid; -} - -[StructLayout(LayoutKind.Explicit)] -struct PROPVARIANT { - [FieldOffset(0)] public ushort vt; - [FieldOffset(8)] public IntPtr pointerValue; -} - -static class PKEY { - // PKEY_Device_FriendlyName - public static PROPERTYKEY Device_FriendlyName = new PROPERTYKEY { - fmtid = new Guid("A45C254E-DF1C-4EFD-8020-67D146A850E0"), pid = 14 - }; -} - -public class DeviceInfo { - public string Name { get; set; } - public string Id { get; set; } - public string Type { get; set; } // "Input" or "Output" - public string Status { get; set; } // "OK", "Disabled", "Unplugged", "NotPresent", "Unknown" -} - -public static class CoreAudioEnumerator { - [DllImport("ole32.dll")] - private static extern int PropVariantClear(ref PROPVARIANT pvar); - - private static string ReadString(IPropertyStore store, PROPERTYKEY key) { - PROPVARIANT pv = new PROPVARIANT(); - try { - store.GetValue(ref key, out pv); - // VT_LPWSTR = 31 - if (pv.vt == 31) { - return Marshal.PtrToStringUni(pv.pointerValue); - } - return null; - } finally { - PropVariantClear(ref pv); - } - } - - public static List GetAll() { - var result = new List(); - var enumerator = (IMMDeviceEnumerator)new MMDeviceEnumerator(); - - foreach (EDataFlow flow in new[] { EDataFlow.eCapture, EDataFlow.eRender }) { - IMMDeviceCollection coll; - enumerator.EnumAudioEndpoints(flow, (int)DEVICE_STATE.ALL, out coll); - - int count; - coll.GetCount(out count); - - for (int i = 0; i < count; i++) { - IMMDevice dev; - coll.Item(i, out dev); - - string id; - dev.GetId(out id); - - int state; - dev.GetState(out state); - - IPropertyStore store; - dev.OpenPropertyStore(0 /* STGM_READ */, out store); - string name = ReadString(store, PKEY.Device_FriendlyName) ?? "(unknown)"; - - string type = flow == EDataFlow.eCapture ? "Input" : "Output"; - - string status = - (state & (int)DEVICE_STATE.ACTIVE) != 0 ? "OK" : - (state & (int)DEVICE_STATE.DISABLED) != 0 ? "Disabled" : - (state & (int)DEVICE_STATE.UNPLUGGED) != 0 ? "Unplugged" : - (state & (int)DEVICE_STATE.NOTPRESENT) != 0 ? "NotPresent" : - "Unknown"; - - result.Add(new DeviceInfo { Name = name, Id = id, Type = type, Status = status }); - } - } - - return result; - } -} -"@ - -Add-Type -TypeDefinition $code -Language CSharp -ErrorAction Stop - -$devices = [CoreAudioEnumerator]::GetAll() | ForEach-Object { - [PSCustomObject]@{ - FriendlyName = $_.Name - InstanceId = $_.Id - Type = $_.Type - Status = $_.Status - } -} - -$devices | ConvertTo-Json -""" - try: - result = subprocess.run( - [ - "powershell.exe", - "-NoProfile", - "-NonInteractive", - "-Command", - ps_script, - ], - capture_output=True, - text=True, - encoding="utf-8", - timeout=30, - ) - - if result.returncode != 0: - err = result.stderr.strip() if result.stderr else f"exit code {result.returncode}" - typer.echo(typer.style(f"PowerShell (CoreAudio) command failed: {err}", fg=typer.colors.RED)) - return [] - - output = result.stdout.strip() - if not output: - typer.echo(typer.style("PowerShell (CoreAudio) returned empty output", fg=typer.colors.YELLOW)) - return [] - - data = json.loads(output) - items = data if isinstance(data, list) else [data] - - devices: List[WindowsAudioDevice] = [] - for it in items: - name = (it.get("FriendlyName") or "").strip() - instance_id = it.get("InstanceId") or "" - dev_type = it.get("Type") or "Unknown" - status = it.get("Status") or "Unknown" - - if not name: - continue - - devices.append( - WindowsAudioDevice( - name=name, - status=status, - device_type=dev_type, - instance_id=instance_id, - ) - ) - - return devices - - except json.JSONDecodeError as e: - typer.echo(typer.style(f"Failed to parse CoreAudio JSON output: {e}", fg=typer.colors.RED)) - return [] - except subprocess.TimeoutExpired: - typer.echo(typer.style("PowerShell (CoreAudio) command timed out after 30 seconds", fg=typer.colors.RED)) - return [] - except Exception as e: - typer.echo(typer.style(f"Unexpected error calling CoreAudio via PowerShell: {type(e).__name__}: {e}", - fg=typer.colors.RED)) - return [] - - -def get_windows_default_devices() -> Dict[str, str]: - """Get Windows default audio devices via PowerShell.""" - ps_script = r""" -[Console]::OutputEncoding = [System.Text.Encoding]::UTF8; - -$code = @" -using System; -using System.Runtime.InteropServices; - -[ComImport] -[Guid("BCDE0395-E52F-467C-8E3D-C4579291692E")] -class MMDeviceEnumerator {} - -[Guid("A95664D2-9614-4F35-A746-DE8DB63617E6"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)] -interface IMMDeviceEnumerator { - int GetDefaultAudioEndpoint(EDataFlow dataFlow, ERole role, out IMMDevice ppEndpoint); -} - -enum EDataFlow { eRender = 0, eCapture = 1 } -enum ERole { eConsole = 0, eMultimedia = 1, eCommunications = 2 } - -[Guid("D666063F-1587-4E43-81F1-B948E807363F"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)] -interface IMMDevice { - int GetId(out string ppstrId); -} - -public static class DefaultDeviceHelper { - public static string GetDefaultInputDevice() { - try { - var enumerator = (IMMDeviceEnumerator)new MMDeviceEnumerator(); - IMMDevice device; - enumerator.GetDefaultAudioEndpoint(EDataFlow.eCapture, ERole.eConsole, out device); - string id; - device.GetId(out id); - return id; - } catch { return null; } - } - - public static string GetDefaultOutputDevice() { - try { - var enumerator = (IMMDeviceEnumerator)new MMDeviceEnumerator(); - IMMDevice device; - enumerator.GetDefaultAudioEndpoint(EDataFlow.eRender, ERole.eConsole, out device); - string id; - device.GetId(out id); - return id; - } catch { return null; } - } -} -"@ - -Add-Type -TypeDefinition $code -Language CSharp -ErrorAction Stop - -$defaultInput = [DefaultDeviceHelper]::GetDefaultInputDevice() -$defaultOutput = [DefaultDeviceHelper]::GetDefaultOutputDevice() - -[PSCustomObject]@{ - DefaultInput = $defaultInput - DefaultOutput = $defaultOutput -} | ConvertTo-Json -""" - - try: - result = subprocess.run([ - "powershell.exe", - "-NoProfile", - "-NonInteractive", - "-Command", - ps_script, - ], capture_output=True, text=True, encoding="utf-8", timeout=15) - - if result.returncode != 0: - return {"default_input": None, "default_output": None} - - if not result.stdout.strip(): - return {"default_input": None, "default_output": None} - - data = json.loads(result.stdout) - return { - "default_input": data.get("DefaultInput"), - "default_output": data.get("DefaultOutput") - } - - except Exception: - return {"default_input": None, "default_output": None} - - -def get_windows_audio_devices() -> List[WindowsAudioDevice]: - """Get detailed Windows audio devices via PowerShell from WSL.""" - try: - result = subprocess.run([ - "powershell.exe", - "-NoProfile", - "-NonInteractive", - "-Command", - # Force UTF-8 output from PowerShell before running the command - "[Console]::OutputEncoding = [System.Text.Encoding]::UTF8; " - "Get-PnpDevice -Class AudioEndpoint | " - "Select-Object FriendlyName, Status, InstanceId | ConvertTo-Json" - ], capture_output=True, text=True, encoding="utf-8", timeout=15) - - if result.returncode != 0: - error_msg = result.stderr.strip() if result.stderr else f"PowerShell exit code: {result.returncode}" - typer.echo(typer.style(f"PowerShell command failed: {error_msg}", fg=typer.colors.RED)) - return [] - - if not result.stdout.strip(): - typer.echo(typer.style("PowerShell returned empty output", fg=typer.colors.YELLOW)) - return [] - - data = json.loads(result.stdout) - devices = data if isinstance(data, list) else [data] - - windows_devices = [] - for device in devices: - name = device.get('FriendlyName', '').strip() - status = device.get('Status', 'Unknown') - instance_id = device.get('InstanceId', '') - - if not name: - continue - - # Determine device type from InstanceId - device_type = _categorize_device_type(instance_id) - - windows_devices.append(WindowsAudioDevice( - name=name, - status=status, - device_type=device_type, - instance_id=instance_id - )) - - return windows_devices - - except json.JSONDecodeError as e: - typer.echo(typer.style(f"Failed to parse PowerShell JSON output: {e}", fg=typer.colors.RED)) - return [] - except subprocess.TimeoutExpired: - typer.echo(typer.style("PowerShell command timed out after 15 seconds", fg=typer.colors.RED)) - return [] - except Exception as e: - typer.echo(typer.style(f"Unexpected error calling PowerShell: {type(e).__name__}: {e}", fg=typer.colors.RED)) - return [] - - -def _categorize_device_type(instance_id: str) -> str: +def _get_device_type(instance_id: str) -> str: """Categorize device type based on InstanceId.""" instance_lower = instance_id.lower() if '0.0.1.00000000' in instance_lower: @@ -467,8 +44,69 @@ def _categorize_device_type(instance_id: str) -> str: else: return "Unknown" +def list_devices( + device: Optional[int] = typer.Argument(default=None, help="Show details of a device, by its Id"), + pulse: bool = typer.Option(False, "--pulse", help="Show PulseAudio sources"), + native: bool = typer.Option(False, "--native", help="Force native Linux view (WSL only)"), + sort_info: Optional[str] = typer.Option(None, "--sort", help="Sort by column"), + desc_info: bool = typer.Option(False, "--desc", help="Set the sorting order to descending"), + filter_info: Optional[str] = typer.Option(None, "--filter", help="Filter by a column value. Use column=value"), + config_file: Optional[str] = typer.Option(None, "--config", "-c", help="Path to config file") +): + """List audio devices.""" + + # Load configuration for context + config = AppConfig.load(config_file) + + # Determine title based on environment and options + is_wsl = _detect_wsl() + use_windows = is_wsl and not native + if use_windows: + typer.echo(typer.style("WSL detected, showing Windows audio devices (use --native for WSL view)", + fg=typer.colors.BLUE)) + result = get_audio_devices_windows() + if not check_result(result): + return + + windows_devices = get_results(result.stdout, {"Index": '', "ID": '', 'Name': '', 'Type': '', 'Default': ''}) + + # apply sorting and filtering + windows_devices = filter_and_sort(windows_devices, filter_info, sort_info, desc_info) + + display_as_table(windows_devices, [ + {"name": "Index"}, + {"name": "ID"}, + {"name": "Name"}, + {"name": "Type"}, + {"name": "Default"}, + ]) + + + else: + linux_devices = get_linux_audio_devices() + display_linux_devices(linux_devices, config, pulse, sort_config) + + # Show legend + typer.echo(f"\nLegend:") + if use_windows: + typer.echo(typer.style(" ✓ = Device working", fg=typer.colors.GREEN)) + typer.echo(typer.style(" ✗ = Device problem", fg=typer.colors.RED)) + else: + typer.echo(typer.style(" * = Configured device", fg=typer.colors.YELLOW)) + typer.echo(f" IN/OUT = Input/Output channel count") -def get_linux_audio_devices() -> List[LinuxAudioDevice]: + +def install(): + result = install_audio_cmdlets() + if not check_result(result): + return + + typer.echo(result.stdout) + + + + +def get_linux_audio_devices() -> list: """Get Linux audio devices using sounddevice.""" try: devices = sd.query_devices() @@ -492,87 +130,7 @@ def get_linux_audio_devices() -> List[LinuxAudioDevice]: raise typer.Exit(1) -def display_windows_devices(devices: List[WindowsAudioDevice], - sort_conf: Optional[SortConf] = None, - filter_conf: Optional[FilterConf] = None - ) -> None: - """Display Windows audio devices with optimized formatting.""" - - def _get_id(_instance_id): - return _instance_id[31:39] - - if not devices: - typer.echo(typer.style("No Windows audio devices found!", fg=typer.colors.RED)) - return - - # Get default devices - default_devices = get_windows_default_devices() - default_input_id = default_devices.get("default_input") - default_output_id = default_devices.get("default_output") - - if not devices: - typer.echo(typer.style("No audio devices found!", fg=typer.colors.RED)) - - # Create rich table - table = Table(show_header=True) - table.add_column("ID", width=8, no_wrap=True, overflow="ellipsis") - table.add_column("Name", width=55, no_wrap=True, overflow="ellipsis") - table.add_column("Type", width=8, justify="center") - table.add_column("Status", width=3, justify="center") - table.add_column("*", width=2, justify="center") - - # Add devices to table - rows = [] # list of dict - for device in devices: - # Determine if device is selected (default) - is_selected = False - if device.device_type == "Input" and device.instance_id == default_input_id: - is_selected = True - elif device.device_type == "Output" and device.instance_id == default_output_id: - is_selected = True - - selected_marker = "*" if is_selected else "" - - # Color coding for status - if device.status == "OK": - status_text = f"[green]{device.status}[/green]" - else: - status_text = f"[red]{device.status}[/red]" - - # Style selected row - if is_selected: - row_style = "bold" - id_text = f"[bold]{_get_id(device.instance_id)}[/bold]" - name_text = f"[bold]{device.name}[/bold]" - type_text = f"[bold]{device.device_type}[/bold]" - selected_text = f"[bold yellow]{selected_marker}[/bold yellow]" - else: - row_style = None - id_text = _get_id(device.instance_id) - name_text = device.name - type_text = device.device_type - selected_text = selected_marker - - rows.append({ - "id": id_text, - "name": name_text, - "type": type_text, - "status": status_text, - "selected": selected_text - }) - - rows = sort_by(rows, sort_conf) - rows = filter_by(rows, filter_conf) - - for row in rows: - table.add_row(*row.values()) - - # Display table - console = Console() - console.print(table) - - -def display_linux_devices(devices: List[LinuxAudioDevice], +def display_linux_devices(devices: list, config: AppConfig, show_pulse: bool = False, sort_conf: Optional[SortConf] = None) -> None: @@ -625,14 +183,14 @@ def display_linux_devices(devices: List[LinuxAudioDevice], # Legacy function for backwards compatibility -def get_audio_devices(): - """Get comprehensive information about audio devices.""" - try: - devices = sd.query_devices() - return devices - except Exception as e: - typer.echo(typer.style(f"Error querying audio devices: {e}", fg=typer.colors.RED, bold=True)) - raise typer.Exit(1) +# def get_audio_devices(): +# """Get comprehensive information about audio devices.""" +# try: +# devices = sd.query_devices() +# return devices +# except Exception as e: +# typer.echo(typer.style(f"Error querying audio devices: {e}", fg=typer.colors.RED, bold=True)) +# raise typer.Exit(1) def get_pulseaudio_sources(): diff --git a/src/core/Expando.py b/src/core/Expando.py new file mode 100644 index 0000000..1939c0a --- /dev/null +++ b/src/core/Expando.py @@ -0,0 +1,104 @@ +NO_DEFAULT = object() + + +class Expando: + """ + Readonly dynamic class that eases the access to attributes and sub attributes + It is initialized with a dict + You can then access the property using dot '.' (ex. obj.prop1.prop2) + """ + + def __init__(self, props): + self._props = props + + def __getattr__(self, item): + if item not in self._props: + raise AttributeError(item) + + current = self._props[item] + return Expando(current) if isinstance(current, dict) else current + + def __setitem__(self, key, value): + self._props[key] = value + + def get(self, path): + """ + returns the value, from a string with represents the path + :param path: + :return: + """ + current = self._props + for attr in path.split("."): + if isinstance(current, list): + temp = [] + for value in current: + if value and attr in value: + temp.append(value[attr]) + current = temp + + else: + if current is None or attr not in current: + return None + current = current[attr] + + return current + + def geti(self, item, default=NO_DEFAULT): + """ + insensitive get + :param item: + :param default: + :return: + """ + if item in self._props: + return self._props[item] + + for k, v in self._props.items(): + if k.lower() == item.lower(): + return v + + if default is not NO_DEFAULT: + return default + + raise AttributeError(f"Item '{item}' not found") + + def as_dict(self): + """ + Return the information as a dictionary + :return: + """ + return self._props.copy() + + def to_dict(self, mappings: dict) -> dict: + """ + Return the information as a dictionary, with the given mappings + """ + return {prop_name: self.get(path) for path, prop_name in mappings.items() if prop_name is not None} + + def __hasattr__(self, item): + return item in self._props + + def values(self): + return self._props.values() + + def copy(self): + return Expando(self._props.copy()) + + def __repr__(self): + if "key" in self._props: + return f"Expando(key={self._props["key"]})" + + props_as_str = str(self._props) + if len(props_as_str) > 50: + props_as_str = props_as_str[:50] + "..." + + return f"Expando({props_as_str})" + + def __eq__(self, other): + if not isinstance(other, Expando): + return False + + return self._props == other._props + + def __hash__(self): + return hash(tuple(sorted(self._props.items()))) diff --git a/src/core/__init__.py b/src/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/core/utils.py b/src/core/utils.py new file mode 100644 index 0000000..680c963 --- /dev/null +++ b/src/core/utils.py @@ -0,0 +1,221 @@ +import json +import subprocess +from dataclasses import dataclass +from subprocess import CompletedProcess +from typing import Any + +import typer +from rich.console import Console +from rich.table import Table + +from .Expando import Expando + + +@dataclass +class SortConf: + property: str + ascending: bool = True + + +@dataclass +class FilterConf: + property: str + value: str + + @staticmethod + def new(text): + """Initialize FilterConf from text.""" + if text is None: + return None + + parts = text.split("=") + if len(parts) != 2: + return None + + if not parts[0].strip() or not parts[1].strip(): + return None + + return FilterConf(parts[0].strip(), parts[1].strip()) + + +@dataclass +class ProcessResult: + result: Any + error: str = None + + +def sort_by(items: list[Expando], sort_conf: SortConf): + """Sort a list of items by a given property.""" + + def _safe_sort_key(item): + """Get sort key with safe handling of None values.""" + value = item.geti(property_name, "") + # Convert None to empty string for consistent sorting + return "" if value is None else str(value) + + + if sort_conf is None: + return items + + if len(items) == 0: + return items + + if sort_conf.property is None: + return items + + property_name = sort_conf.property.strip() + return sorted(items, key=_safe_sort_key, reverse=not sort_conf.ascending) + + +def filter_by(items, filter_conf: FilterConf): + """Filter a list of items by a given property.""" + + def _contains(item, value): + return str(value).lower() in str(item).lower() + + if (filter_conf is None + or filter_conf.property is None + or len(items) == 0): + return items + + predicate = _contains + property_name = filter_conf.property.strip() + + return [ + item for item in items if predicate(item.geti(property_name), filter_conf.value) + ] + + +def run_ps_command(command: str) -> CompletedProcess[str]: + """Run a PowerShell command and return the output""" + completed = subprocess.run( + ["powershell.exe", + "-NoProfile", + "-ExecutionPolicy", + "Bypass", + "-Command", + "[Console]::OutputEncoding = [System.Text.Encoding]::UTF8;", + command], + capture_output=True, + text=True, + encoding="utf-8", timeout=5 + ) + + return completed + + +def run_ps_command_live(command: str) -> CompletedProcess[str]: + """Run a PowerShell command and display output in real-time""" + process = subprocess.Popen( + ["powershell.exe", + "-NoProfile", + "-ExecutionPolicy", + "Bypass", + "-Command", + "[Console]::OutputEncoding = [System.Text.Encoding]::UTF8;", + command], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + encoding="utf-8" + ) + + stdout_lines = [] + stderr_lines = [] + + # Read output in real-time + while True: + stdout_line = process.stdout.readline() + stderr_line = process.stderr.readline() + + if stdout_line: + print(stdout_line.rstrip()) + stdout_lines.append(stdout_line) + + if stderr_line: + print(f"ERROR: {stderr_line.rstrip()}") + stderr_lines.append(stderr_line) + + # Check if process is finished + if process.poll() is not None: + break + + # Read any remaining lines + remaining_stdout, remaining_stderr = process.communicate() + if remaining_stdout: + print(remaining_stdout.rstrip()) + stdout_lines.append(remaining_stdout) + if remaining_stderr: + print(f"ERROR: {remaining_stderr.rstrip()}") + stderr_lines.append(remaining_stderr) + + # Create CompletedProcess object to maintain compatibility + completed = CompletedProcess( + args=["powershell.exe", "-NoProfile", "-ExecutionPolicy", "Bypass", "-Command", command], + returncode=process.returncode, + stdout=''.join(stdout_lines), + stderr=''.join(stderr_lines) + ) + + return completed + + +def get_results(stdout: str, mappings) -> list[dict]: + stripped = stdout.strip() + if not stripped: + typer.echo(typer.style("PowerShell returned empty output", fg=typer.colors.YELLOW)) + return [] + + data = json.loads(stripped) + as_list = data if isinstance(data, list) else [data] + + res = [] + for item in as_list: + mapped = {key: item.get(key, default_value) for key, default_value in mappings.items()} + res.append(Expando(mapped)) + + return res + + +def display_as_table(result, columns_settings: list): + formatters = {} + table = Table(show_header=True) + for col in [c for c in columns_settings if "name" in c]: + col_name = col["name"] + table.add_column(col_name, **col["attrs"] if "attrs" in col else {}) + formatter = col.pop("formatter", None) + if formatter: + formatters[col_name] = formatter + + for item in result: + cloned = item.copy() + # apply formatters + for col, formatter in formatters.items(): + cloned[col] = formatter(item) + + table.add_row(*[str(value) for value in cloned.values()]) + + # Display table + console = Console() + console.print(table) + + +def check_result(result): + if result.returncode != 0: + error_msg = result.stderr.strip() if result.stderr else f"PowerShell exit code: {result.returncode}" + typer.echo(typer.style(f"PowerShell command failed: {error_msg}", fg=typer.colors.RED)) + return False + + return True + + +def filter_and_sort(items, filter_info, sort_info, desc_info=False): + if filter_info: + filter_config = FilterConf.new(filter_info) if filter_by else None + items = filter_by(items, filter_config) + + if sort_info: + sort_config = SortConf(sort_info, not desc_info) if sort_by else None + items = sort_by(items, sort_config) + + return items diff --git a/src/devices.py b/src/devices.py new file mode 100644 index 0000000..38075cd --- /dev/null +++ b/src/devices.py @@ -0,0 +1,78 @@ +from subprocess import CompletedProcess +import sounddevice as sd + +from .core.Expando import Expando +from .core.utils import run_ps_command, run_ps_command_live, ProcessResult + + +def _is_audio_device_cmdlets_installed(): + # PowerShell command to check if module is available + ps_command = "Get-Module -ListAvailable -Name AudioDeviceCmdlets" + + result = run_ps_command(ps_command) + + # If something is returned in stdout, the module is installed + return bool(result.stdout.strip()) + + +def install_audio_cmdlets(): + """Install AudioDevice cmdlet for better audio device management""" + if _is_audio_device_cmdlets_installed(): + return CompletedProcess( + args=["is_audio_device_cmdlets_installed()"], + returncode=0, + stdout="AudioDevice cmdlet is already installed", + stderr="") + + ps_script = r""" + # 1) Trust PSGallery + Set-PSRepository -Name "PSGallery" -InstallationPolicy Trusted -ErrorAction SilentlyContinue + + # 2) Ensure NuGet provider is available + if (-not (Get-PackageProvider -Name NuGet -ErrorAction SilentlyContinue)) { + Install-PackageProvider -Name NuGet -Force -Scope CurrentUser + } + + # 3) Install AudioDeviceCmdlets if missing + if (-not (Get-Module -ListAvailable -Name AudioDeviceCmdlets)) { + Install-Module -Name AudioDeviceCmdlets -Scope CurrentUser -Force -AllowClobber + } + + # 4) Import the module + Import-Module AudioDeviceCmdlets -ErrorAction SilentlyContinue + """ + return run_ps_command_live(ps_script) + + +def get_audio_devices_windows(): + """Get all audio devices using AudioDevice cmdlet""" + ps_script = "Get-AudioDevice -List | ConvertTo-Json" + return run_ps_command(ps_script) + + +def get_audio_devices_windows_from_pnp(): + ps_script = "Get-PnpDevice -Class AudioEndpoint | Select-Object FriendlyName, Status, InstanceId | ConvertTo-Json" + return run_ps_command(ps_script) + + +def get_audio_devices_linux(): + """Get all audio devices using sounddevice module""" + try: + devices = sd.query_devices() + result = [] + + for i, device in enumerate(devices): + hostapi_name = sd.query_hostapis(device['hostapi'])['name'] + + result.append(Expando({ + "device_id": i, + "name": device['name'], + "max_input_channels": device['max_input_channels'], + "max_output_channels": device['max_output_channels'], + "default_samplerate": device['default_samplerate'], + "hostapi_name": hostapi_name} + )) + + return ProcessResult(result) + except Exception as e: + ProcessResult(None, f"Error querying Linux audio devices: {e}") diff --git a/src/utils.py b/src/utils.py deleted file mode 100644 index ccd15a9..0000000 --- a/src/utils.py +++ /dev/null @@ -1,66 +0,0 @@ -from dataclasses import dataclass - - -@dataclass -class SortConf: - property: str - ascending: bool = True - - -@dataclass -class FilterConf: - property: str - value: str - - @staticmethod - def new(text): - """Initialize FilterConf from text.""" - if text is None: - return None - - parts = text.split("=") - if len(parts) != 2: - return None - - return FilterConf(parts[0].strip(), parts[1].strip()) - - -def sort_by(items, sort_conf: SortConf): - """Sort a list of items by a given property.""" - if sort_conf is None: - return items - - if len(items) == 0: - return items - - if isinstance(items[0], dict): - property_name = sort_conf.property.lower().strip() - return sorted(items, key=lambda item: item.get(property_name), reverse=not sort_conf.ascending) - else: - return sorted(items, key=lambda item: getattr(item, sort_conf.property), reverse=not sort_conf.ascending) - - -def filter_by(items, filter_conf: FilterConf): - """Filter a list of items by a given property.""" - - def _contains(item, value): - return str(value).lower() in str(item).lower() - - if (filter_conf is None - or filter_conf.property is None - or len(items) == 0): - return items - - predicate = _contains - - if isinstance(items[0], dict): - property_name = filter_conf.property.lower().strip() - return [ - item for item in items if predicate(item.get(property_name), filter_conf.value) - ] - - else: - - return [ - item for item in items if getattr(item, filter_conf.property) == filter_conf.value - ] diff --git a/tests/test_audio_devices.py b/tests/test_audio_devices.py deleted file mode 100644 index 9fb2b80..0000000 --- a/tests/test_audio_devices.py +++ /dev/null @@ -1,407 +0,0 @@ -import pytest -from unittest.mock import Mock, patch, mock_open, MagicMock -import json -import subprocess -from pathlib import Path - -from src.commands.audio import ( - detect_wsl, - get_windows_audio_devices, - get_linux_audio_devices, - _categorize_device_type, - display_windows_devices, - display_linux_devices, - WindowsAudioDevice, - LinuxAudioDevice -) -from src.config import AppConfig, AudioConfig, ServerConfig - - -# WSL Detection Tests - -@patch('pathlib.Path.exists') -@patch('builtins.open', mock_open(read_data='Linux version 5.4.0-microsoft-standard')) -def test_detect_wsl_via_proc_version_microsoft(mock_exists): - """Test WSL detection via /proc/version containing 'microsoft'.""" - mock_exists.return_value = True - - result = detect_wsl() - - assert result is True - - -@patch('pathlib.Path.exists') -@patch('builtins.open', mock_open(read_data='Linux version 5.4.0-wsl2-standard')) -def test_detect_wsl_via_proc_version_wsl(mock_exists): - """Test WSL detection via /proc/version containing 'wsl'.""" - mock_exists.return_value = True - - result = detect_wsl() - - assert result is True - - -@patch('pathlib.Path.exists') -@patch('builtins.open', mock_open(read_data='Linux version 5.4.0-generic')) -@patch('os.getenv') -def test_detect_wsl_via_env_distro_name(mock_getenv, mock_exists): - """Test WSL detection via WSL_DISTRO_NAME environment variable.""" - mock_exists.return_value = True - mock_getenv.side_effect = lambda key: 'Ubuntu' if key == 'WSL_DISTRO_NAME' else None - - result = detect_wsl() - - assert result is True - - -@patch('pathlib.Path.exists') -@patch('builtins.open', mock_open(read_data='Linux version 5.4.0-generic')) -@patch('os.getenv') -def test_detect_wsl_via_env_wslenv(mock_getenv, mock_exists): - """Test WSL detection via WSLENV environment variable.""" - mock_exists.return_value = True - mock_getenv.side_effect = lambda key: 'PATH/l' if key == 'WSLENV' else None - - result = detect_wsl() - - assert result is True - - -@patch('pathlib.Path.exists') -@patch('builtins.open', mock_open(read_data='Linux version 5.4.0-generic')) -@patch('os.getenv', return_value=None) -def test_detect_wsl_false_native_linux(mock_getenv, mock_exists): - """Test WSL detection returns False on native Linux.""" - mock_exists.return_value = True - - result = detect_wsl() - - assert result is False - - -@patch('pathlib.Path.exists', return_value=False) -@patch('os.getenv', return_value=None) -def test_detect_wsl_false_no_proc_version(mock_getenv, mock_exists): - """Test WSL detection returns False when /proc/version doesn't exist.""" - result = detect_wsl() - - assert result is False - - -@patch('pathlib.Path.exists') -@patch('builtins.open', side_effect=Exception("Permission denied")) -@patch('os.getenv', return_value=None) -def test_detect_wsl_handles_proc_version_read_error(mock_getenv, mock_exists): - """Test WSL detection handles /proc/version read errors gracefully.""" - mock_exists.return_value = True - - result = detect_wsl() - - assert result is False - - -# Device Type Categorization Tests - -def test_categorize_input_device(): - """Test categorization of input device.""" - instance_id = "SWD\\MMDEVAPI\\{0.0.1.00000000}.{a1b2c3d4-e5f6-7890-abcd-ef1234567890}\\capture" - - result = _categorize_device_type(instance_id) - - assert result == "Input" - - -def test_categorize_output_device(): - """Test categorization of output device.""" - instance_id = "SWD\\MMDEVAPI\\{0.0.0.00000000}.{a1b2c3d4-e5f6-7890-abcd-ef1234567890}\\render" - - result = _categorize_device_type(instance_id) - - assert result == "Output" - - -def test_categorize_unknown_device(): - """Test categorization of unknown device type.""" - instance_id = "SWD\\MMDEVAPI\\{0.0.2.00000000}.{a1b2c3d4-e5f6-7890-abcd-ef1234567890}\\unknown" - - result = _categorize_device_type(instance_id) - - assert result == "Unknown" - - -def test_categorize_empty_instance_id(): - """Test categorization with empty instance ID.""" - result = _categorize_device_type("") - - assert result == "Unknown" - - -# Windows Audio Devices Tests - -@patch('subprocess.run') -@patch('typer.echo') -def test_get_windows_devices_success_single_device(mock_echo, mock_run): - """Test successful retrieval of single Windows device.""" - mock_result = Mock() - mock_result.returncode = 0 - mock_result.stdout = json.dumps({ - "FriendlyName": "Blue Yeti Microphone", - "Status": "OK", - "InstanceId": "USB\\VID_0B05&PID_1234\\capture" - }) - mock_result.stderr = "" - mock_run.return_value = mock_result - - result = get_windows_audio_devices() - - assert len(result) == 1 - assert result[0].name == "Blue Yeti Microphone" - assert result[0].status == "OK" - assert result[0].device_type == "Input" - assert "USB\\VID_0B05&PID_1234\\capture" in result[0].instance_id - - -@patch('subprocess.run') -@patch('typer.echo') -def test_get_windows_devices_success_multiple_devices(mock_echo, mock_run): - """Test successful retrieval of multiple Windows devices.""" - mock_result = Mock() - mock_result.returncode = 0 - mock_result.stdout = json.dumps([ - { - "FriendlyName": "Microphone", - "Status": "OK", - "InstanceId": "capture_device" - }, - { - "FriendlyName": "Speakers", - "Status": "OK", - "InstanceId": "render_device" - } - ]) - mock_result.stderr = "" - mock_run.return_value = mock_result - - result = get_windows_audio_devices() - - assert len(result) == 2 - assert result[0].device_type == "Input" - assert result[1].device_type == "Output" - - -@patch('subprocess.run') -@patch('typer.echo') -def test_get_windows_devices_powershell_error(mock_echo, mock_run): - """Test PowerShell command failure.""" - mock_result = Mock() - mock_result.returncode = 1 - mock_result.stdout = "" - mock_result.stderr = "Get-PnpDevice : Access denied" - mock_run.return_value = mock_result - - result = get_windows_audio_devices() - - assert result == [] - mock_echo.assert_called() - - -@patch('subprocess.run') -@patch('typer.echo') -def test_get_windows_devices_empty_output(mock_echo, mock_run): - """Test PowerShell returning empty output.""" - mock_result = Mock() - mock_result.returncode = 0 - mock_result.stdout = "" - mock_result.stderr = "" - mock_run.return_value = mock_result - - result = get_windows_audio_devices() - - assert result == [] - mock_echo.assert_called() - - -@patch('subprocess.run') -@patch('typer.echo') -def test_get_windows_devices_invalid_json(mock_echo, mock_run): - """Test invalid JSON response from PowerShell.""" - mock_result = Mock() - mock_result.returncode = 0 - mock_result.stdout = "Invalid JSON {" - mock_result.stderr = "" - mock_run.return_value = mock_result - - result = get_windows_audio_devices() - - assert result == [] - mock_echo.assert_called() - - -@patch('subprocess.run', side_effect=subprocess.TimeoutExpired("powershell.exe", 15)) -@patch('typer.echo') -def test_get_windows_devices_timeout(mock_echo, mock_run): - """Test PowerShell command timeout.""" - result = get_windows_audio_devices() - - assert result == [] - mock_echo.assert_called() - - -@patch('subprocess.run') -@patch('typer.echo') -def test_get_windows_devices_filters_empty_names(mock_echo, mock_run): - """Test filtering of devices with empty names.""" - mock_result = Mock() - mock_result.returncode = 0 - mock_result.stdout = json.dumps([ - { - "FriendlyName": "Valid Device", - "Status": "OK", - "InstanceId": "capture_device" - }, - { - "FriendlyName": "", - "Status": "OK", - "InstanceId": "empty_name_device" - }, - { - "FriendlyName": None, - "Status": "OK", - "InstanceId": "null_name_device" - } - ]) - mock_result.stderr = "" - mock_run.return_value = mock_result - - result = get_windows_audio_devices() - - assert len(result) == 1 - assert result[0].name == "Valid Device" - - -# Linux Audio Devices Tests - -@patch('sounddevice.query_devices') -@patch('sounddevice.query_hostapis') -@patch('typer.echo') -def test_get_linux_devices_success(mock_echo, mock_hostapis, mock_devices): - """Test successful retrieval of Linux devices.""" - mock_devices.return_value = [ - { - 'name': 'pulse', - 'max_input_channels': 32, - 'max_output_channels': 32, - 'default_samplerate': 44100.0, - 'hostapi': 0 - }, - { - 'name': 'default', - 'max_input_channels': 32, - 'max_output_channels': 32, - 'default_samplerate': 44100.0, - 'hostapi': 0 - } - ] - mock_hostapis.return_value = {'name': 'ALSA'} - - result = get_linux_audio_devices() - - assert len(result) == 2 - assert result[0].name == 'pulse' - assert result[0].device_id == 0 - assert result[0].hostapi_name == 'ALSA' - assert result[1].name == 'default' - assert result[1].device_id == 1 - - -@patch('sounddevice.query_devices', side_effect=Exception("ALSA error")) -@patch('typer.echo') -def test_get_linux_devices_sounddevice_error(mock_echo, mock_devices): - """Test sounddevice query failure.""" - with pytest.raises(SystemExit): - get_linux_audio_devices() - - -# Display Functions Tests - -@patch('typer.echo') -def test_display_windows_devices_empty_list(mock_echo): - """Test display with empty device list.""" - display_windows_devices([], True, True) - - mock_echo.assert_called() - - -@patch('typer.echo') -@patch('src.commands.audio._display_single_windows_device') -def test_display_windows_devices_input_only(mock_display_single, mock_echo): - """Test display showing only input devices.""" - devices = [ - WindowsAudioDevice('Microphone', 'OK', 'Input', 'mic1'), - WindowsAudioDevice('Speakers', 'OK', 'Output', 'spk1') - ] - - display_windows_devices(devices, show_inputs=True, show_outputs=False) - - mock_display_single.assert_called_once_with(devices[0]) - - -@patch('typer.echo') -@patch('src.commands.audio._display_single_windows_device') -def test_display_windows_devices_with_unknown(mock_display_single, mock_echo): - """Test display including unknown device types.""" - devices = [ - WindowsAudioDevice('Microphone', 'OK', 'Input', 'mic1'), - WindowsAudioDevice('Unknown Device', 'OK', 'Unknown', 'unk1') - ] - - display_windows_devices(devices, show_inputs=True, show_outputs=True) - - assert mock_display_single.call_count == 2 - - -@patch('typer.echo') -def test_display_linux_devices_no_matching_devices(mock_echo): - """Test display with no matching devices.""" - devices = [ - LinuxAudioDevice(0, 'pulse', 0, 32, 44100.0, 'ALSA') - ] - config = AppConfig(ServerConfig(), AudioConfig()) - - with pytest.raises(SystemExit): - display_linux_devices(devices, show_inputs=True, show_outputs=False, config=config) - - -@patch('typer.echo') -@patch('sounddevice.default') -def test_display_linux_devices_success(mock_default, mock_echo): - """Test successful display of Linux devices.""" - mock_default.device = [0, 1] - - devices = [ - LinuxAudioDevice(0, 'pulse', 32, 32, 44100.0, 'ALSA') - ] - config = AppConfig(ServerConfig(), AudioConfig()) - - display_linux_devices(devices, show_inputs=True, show_outputs=True, config=config) - - # Verify that echo was called with device information - assert mock_echo.call_count > 0 - - -@patch('typer.echo') -@patch('sounddevice.default') -def test_display_linux_devices_with_configured_device(mock_default, mock_echo): - """Test display with configured device marked.""" - mock_default.device = [0, 1] - - devices = [ - LinuxAudioDevice(0, 'pulse', 32, 32, 44100.0, 'ALSA'), - LinuxAudioDevice(1, 'default', 32, 32, 44100.0, 'ALSA') - ] - config = AppConfig(ServerConfig(), AudioConfig(device=1)) - - display_linux_devices(devices, show_inputs=True, show_outputs=True, config=config) - - # Verify that echo was called and device is marked as configured - assert mock_echo.call_count > 0 \ No newline at end of file diff --git a/tests/test_expando.py b/tests/test_expando.py new file mode 100644 index 0000000..0f0ef29 --- /dev/null +++ b/tests/test_expando.py @@ -0,0 +1,95 @@ +import pytest + +from core.Expando import Expando + + +def test_i_can_get_properties(): + props = {"a": 10, + "b": { + "c": "value", + "d": 20 + }} + dynamic = Expando(props) + + assert dynamic.a == 10 + assert dynamic.b.c == "value" + + with pytest.raises(AttributeError): + assert dynamic.unknown == "some_value" + + +def test_i_can_get(): + props = {"a": 10, + "b": { + "c": "value", + "d": 20 + }} + dynamic = Expando(props) + + assert dynamic.get("a") == 10 + assert dynamic.get("b.c") == "value" + assert dynamic.get("unknown") is None + + +def test_i_can_get_insensitive(): + props = {"A": 10, + "b": 20} + dynamic = Expando(props) + + assert dynamic.geti("a") == 10 + assert dynamic.geti("A") == 10 + assert dynamic.geti("a.c", None) is None + with pytest.raises(AttributeError): + assert dynamic.geti("unknown") == "some_value" + + +def test_i_can_get_insensitive_when_the_two_entries_exist(): + props = {"A": 10, + "a": 20} + dynamic = Expando(props) + assert dynamic.geti("A") == 10 + assert dynamic.geti("a") == 20 + + +def test_i_can_get_from_list(): + props = {"a": [{"c": "value1", "d": 1}, {"c": "value2", "d": 2}]} + dynamic = Expando(props) + + assert dynamic.get("a.c") == ["value1", "value2"] + + +def test_none_is_returned_when_get_from_list_and_property_does_not_exist(): + props = {"a": [{"c": "value1", "d": 1}, + {"a": "value2", "d": 2} # 'c' does not exist in the second row + ]} + dynamic = Expando(props) + + assert dynamic.get("a.c") == ["value1"] + + +def test_i_can_manage_none_values(): + props = {"a": 10, + "b": None} + dynamic = Expando(props) + + assert dynamic.get("b.c") is None + + +def test_i_can_manage_none_values_in_list(): + props = {"a": [{"b": {"c": "value"}}, + {"b": None} + ]} + dynamic = Expando(props) + + assert dynamic.get("a.b.c") == ["value"] + + +def test_i_can_add_new_properties(): + props = {"a": 10, + "b": 20} + dynamic = Expando(props) + dynamic["c"] = 30 + + assert dynamic.a == 10 + assert dynamic.b == 20 + assert dynamic.c == 30 diff --git a/tests/test_filter_by.py b/tests/test_filter_by.py new file mode 100644 index 0000000..79bb646 --- /dev/null +++ b/tests/test_filter_by.py @@ -0,0 +1,165 @@ +import pytest + +from src.core.utils import FilterConf, filter_by +from src.core.Expando import Expando + + +def test_i_can_filter_by_matching_string(): + """Test basic filtering with string matching.""" + items = [ + Expando({"name": "alice"}), + Expando({"name": "bob"}), + Expando({"name": "charlie"}) + ] + filter_conf = FilterConf(property="name", value="alice") + + result = filter_by(items, filter_conf) + + assert len(result) == 1 + assert result[0].name == "alice" + + +def test_i_can_filter_by_case_insensitive(): + """Test case-insensitive filtering.""" + items = [ + Expando({"name": "ALICE"}), + Expando({"name": "Bob"}), + Expando({"name": "charlie"}) + ] + filter_conf = FilterConf(property="name", value="alice") + + result = filter_by(items, filter_conf) + + assert len(result) == 1 + assert result[0].name == "ALICE" + + +def test_i_can_filter_by_partial_match(): + """Test partial substring matching.""" + items = [ + Expando({"name": "alice smith"}), + Expando({"name": "bob jones"}), + Expando({"name": "charlie brown"}) + ] + filter_conf = FilterConf(property="name", value="smith") + + result = filter_by(items, filter_conf) + + assert len(result) == 1 + assert result[0].name == "alice smith" + + +def test_i_can_handle_none_filter_conf(): + """Test that None filter configuration returns items unchanged.""" + items = [ + Expando({"name": "alice"}), + Expando({"name": "bob"}) + ] + + result = filter_by(items, None) + + assert result == items + assert len(result) == 2 + + +def test_i_can_handle_empty_list(): + """Test that empty list returns empty list.""" + items = [] + filter_conf = FilterConf(property="name", value="alice") + + result = filter_by(items, filter_conf) + + assert result == [] + + +def test_i_can_handle_none_property_in_filter_conf(): + """Test that FilterConf with None property returns items unchanged.""" + items = [ + Expando({"name": "alice"}), + Expando({"name": "bob"}) + ] + # Create FilterConf with None property (simulating invalid state) + filter_conf = FilterConf(property=None, value="alice") + + result = filter_by(items, filter_conf) + + assert result == items + assert len(result) == 2 + + +def test_i_can_filter_with_no_matches(): + """Test filtering when no items match the criteria.""" + items = [ + Expando({"name": "alice"}), + Expando({"name": "bob"}), + Expando({"name": "charlie"}) + ] + filter_conf = FilterConf(property="name", value="xyz") + + result = filter_by(items, filter_conf) + + assert result == [] + + +def test_i_cannot_filter_with_missing_properties(): + """Test that missing properties raise AttributeError.""" + items = [ + Expando({"name": "alice"}), + Expando({"age": 25}), # no name property + Expando({"name": "bob"}) + ] + filter_conf = FilterConf(property="name", value="alice") + + # Should raise AttributeError when trying to access missing property + with pytest.raises(AttributeError): + filter_by(items, filter_conf) + + +def test_i_can_filter_with_none_values(): + """Test filtering when properties have None values.""" + items = [ + Expando({"name": "alice"}), + Expando({"name": None}), + Expando({"name": "bob"}) + ] + filter_conf = FilterConf(property="name", value="alice") + + result = filter_by(items, filter_conf) + + # None gets converted to "none" string, shouldn't match "alice" + assert len(result) == 1 + assert result[0].name == "alice" + + +def test_i_can_filter_with_case_insensitive_property_names(): + """Test filtering using case-insensitive property names.""" + items = [ + Expando({"Name": "alice"}), # uppercase N + Expando({"Name": "bob"}), + Expando({"Name": "charlie"}) + ] + filter_conf = FilterConf(property="name", value="alice") # lowercase n + + result = filter_by(items, filter_conf) + + assert len(result) == 1 + assert result[0].Name == "alice" + + +def test_i_can_filter_with_numbers_and_mixed_types(): + """Test filtering with mixed types (numbers, strings).""" + items = [ + Expando({"value": "123"}), + Expando({"value": 123}), + Expando({"value": "hello123"}), + Expando({"value": "world"}) + ] + filter_conf = FilterConf(property="value", value="123") + + result = filter_by(items, filter_conf) + + # Should match "123", 456 (converted to "456"), and "hello123" + assert len(result) == 3 + expected_values = ["123", 123, "hello123"] + actual_values = [item.value for item in result] + assert all(val in expected_values for val in actual_values) \ No newline at end of file diff --git a/tests/test_filter_conf.py b/tests/test_filter_conf.py new file mode 100644 index 0000000..72a3a11 --- /dev/null +++ b/tests/test_filter_conf.py @@ -0,0 +1,85 @@ +from src.core.utils import FilterConf + + +def test_i_can_create_filter_conf_from_valid_text(): + """Test creating FilterConf from valid property=value format.""" + result = FilterConf.new("property=value") + + assert result is not None + assert isinstance(result, FilterConf) + assert result.property == "property" + assert result.value == "value" + + +def test_i_can_handle_none_input(): + """Test that None input returns None.""" + result = FilterConf.new(None) + + assert result is None + + +def test_i_can_handle_whitespace_around_equals(): + """Test that whitespace around property and value is stripped.""" + result = FilterConf.new(" property = value ") + + assert result is not None + assert result.property == "property" + assert result.value == "value" + + +def test_i_cannot_create_filter_conf_without_equals(): + """Test that text without equals sign returns None.""" + result = FilterConf.new("property") + + assert result is None + + +def test_i_cannot_create_filter_conf_with_multiple_equals(): + """Test that text with multiple equals signs returns None.""" + result = FilterConf.new("property=value=another") + + assert result is None + + +def test_i_cannot_create_filter_conf_from_empty_string(): + """Test that empty string returns None.""" + result = FilterConf.new("") + + assert result is None + + +def test_i_cannot_create_filter_conf_with_empty_property(): + """Test that empty property name returns None.""" + result = FilterConf.new("=value") + + assert result is None + + +def test_i_cannot_create_filter_conf_with_empty_value(): + """Test that empty value returns None.""" + result = FilterConf.new("property=") + + assert result is None + + +def test_i_cannot_create_filter_conf_with_both_empty(): + """Test that both empty property and value returns None.""" + result = FilterConf.new("=") + + assert result is None + + +def test_i_cannot_create_filter_conf_with_whitespace_only(): + """Test that whitespace-only parts return None.""" + result = FilterConf.new(" = ") + + assert result is None + + +def test_i_can_handle_special_characters_in_values(): + """Test that special characters in property names and values are handled correctly.""" + result = FilterConf.new("device_type=USB Audio Device (2.0)") + + assert result is not None + assert result.property == "device_type" + assert result.value == "USB Audio Device (2.0)" \ No newline at end of file diff --git a/tests/test_sort_by.py b/tests/test_sort_by.py new file mode 100644 index 0000000..559fd87 --- /dev/null +++ b/tests/test_sort_by.py @@ -0,0 +1,152 @@ +from src.core.utils import FilterConf, SortConf, sort_by +from src.core.Expando import Expando + + +def test_i_can_sort_by_ascending_strings(): + """Test sorting strings in ascending order.""" + items = [ + Expando({"name": "charlie"}), + Expando({"name": "alice"}), + Expando({"name": "bob"}) + ] + sort_conf = SortConf(property="name", ascending=True) + + result = sort_by(items, sort_conf) + + assert len(result) == 3 + assert result[0].name == "alice" + assert result[1].name == "bob" + assert result[2].name == "charlie" + + +def test_i_can_sort_by_descending_strings(): + """Test sorting strings in descending order.""" + items = [ + Expando({"name": "alice"}), + Expando({"name": "charlie"}), + Expando({"name": "bob"}) + ] + sort_conf = SortConf(property="name", ascending=False) + + result = sort_by(items, sort_conf) + + assert len(result) == 3 + assert result[0].name == "charlie" + assert result[1].name == "bob" + assert result[2].name == "alice" + + +def test_i_can_handle_none_sort_conf(): + """Test that None sort configuration returns items unchanged.""" + items = [ + Expando({"name": "charlie"}), + Expando({"name": "alice"}) + ] + + result = sort_by(items, None) + + assert result == items + assert result[0].name == "charlie" + assert result[1].name == "alice" + + +def test_i_can_handle_empty_list(): + """Test that empty list returns empty list.""" + items = [] + sort_conf = SortConf(property="name") + + result = sort_by(items, sort_conf) + + assert result == [] + + +def test_i_can_sort_with_missing_properties(): + """Test sorting when some objects don't have the property.""" + items = [ + Expando({"name": "bob"}), + Expando({"age": 25}), # no name property + Expando({"name": "alice"}) + ] + sort_conf = SortConf(property="name", ascending=True) + + result = sort_by(items, sort_conf) + + assert len(result) == 3 + # Items without the property should get empty string and sort first + assert result[0].geti("name", "") == "" # the one with missing property + assert result[1].name == "alice" + assert result[2].name == "bob" + + +def test_i_can_sort_with_none_values(): + """Test sorting when properties have None values.""" + items = [ + Expando({"name": "bob"}), + Expando({"name": None}), + Expando({"name": "alice"}) + ] + sort_conf = SortConf(property="name", ascending=True) + + result = sort_by(items, sort_conf) + + assert len(result) == 3 + # None values should be treated as empty strings and sort first + assert result[0].name is None + assert result[1].name == "alice" + assert result[2].name == "bob" + + +def test_i_can_sort_with_case_insensitive_property_names(): + """Test sorting using case-insensitive property names.""" + items = [ + Expando({"Name": "charlie"}), # uppercase N + Expando({"Name": "alice"}), + Expando({"Name": "bob"}) + ] + sort_conf = SortConf(property="name", ascending=True) # lowercase n + + result = sort_by(items, sort_conf) + + assert len(result) == 3 + assert result[0].Name == "alice" + assert result[1].Name == "bob" + assert result[2].Name == "charlie" + + + +def test_i_can_sort_mixed_types(): + """Test sorting with mixed types (numbers, strings, None).""" + items = [ + Expando({"value": "zebra"}), + Expando({"value": 10}), + Expando({"value": None}), + Expando({"value": "apple"}) + ] + sort_conf = SortConf(property="value", ascending=True) + + result = sort_by(items, sort_conf) + + assert len(result) == 4 + # Mixed types will be converted to strings for comparison + # None -> "", numbers -> str(number), strings stay strings + assert result[0].value is None # None becomes "" and sorts first + assert result[1].value == 10 # "10" sorts before "apple" + assert result[2].value == "apple" # "apple" sorts before "zebra" + assert result[3].value == "zebra" + + +def test_i_can_sort_with_whitespace_in_property_name(): + """Test that property names are stripped of whitespace.""" + items = [ + Expando({"name": "charlie"}), + Expando({"name": "alice"}), + Expando({"name": "bob"}) + ] + sort_conf = SortConf(property=" name ", ascending=True) # whitespace around property + + result = sort_by(items, sort_conf) + + assert len(result) == 3 + assert result[0].name == "alice" + assert result[1].name == "bob" + assert result[2].name == "charlie"