diff --git a/src/commands/audio.py b/src/commands/audio.py index 278d13a..f8fa405 100644 --- a/src/commands/audio.py +++ b/src/commands/audio.py @@ -3,7 +3,7 @@ import numpy as np import os from typing import Optional, List -from ..devices import get_audio_devices_windows, install_audio_cmdlets +from ..devices import get_audio_devices_windows, install_audio_cmdlets, get_audio_devices_windows_from_pnp_devices from ..core.utils import * from ..config import AppConfig @@ -34,20 +34,51 @@ def _detect_wsl() -> bool: except Exception: return False + def _get_device_type(instance_id: str) -> str: """Categorize device type based on InstanceId.""" instance_lower = instance_id.lower() if '0.0.1.00000000' in instance_lower: - return "Input" + return "Recording" elif '0.0.0.00000000' in instance_lower: - return "Output" + return "Playback" else: return "Unknown" + +def _get_short_device_id(instance_id: str) -> str: + """Get device name based on InstanceId.""" + "SWD\MMDEVAPI\{0.0.0.00000000}.{20434736-BDB3-4CE2-A56B-5DF61D4DD593}" + + if len(instance_id) == 68: + return instance_id[31:-1] + elif len(instance_id) == 55: + return instance_id[18:-1] + else: + return instance_id + + +def _get_colored_bool(status: bool) -> str: + """Get colored status string based on status.""" + if status: + return f"[green]{status}[/green]" + else: + return str(status) + + +def _get_colored_status(status: str) -> str: + """Get colored status string based on status.""" + if status == "Unknown": + return f"[red]{status}[/red]" + else: + return str(status) + + def list_devices( device: Optional[int] = typer.Argument(default=None, help="Show details of a device, by its Id"), pulse: bool = typer.Option(False, "--pulse", help="Show PulseAudio sources"), native: bool = typer.Option(False, "--native", help="Force native Linux view (WSL only)"), + all_devices: bool = typer.Option(False, "--all", help="Display all devices, even those not connected"), sort_info: Optional[str] = typer.Option(None, "--sort", help="Sort by column"), desc_info: bool = typer.Option(False, "--desc", help="Set the sorting order to descending"), filter_info: Optional[str] = typer.Option(None, "--filter", help="Filter by a column value. Use column=value"), @@ -60,26 +91,36 @@ def list_devices( # Determine title based on environment and options is_wsl = _detect_wsl() - use_windows = is_wsl and not native - if use_windows: + if is_wsl and not native: typer.echo(typer.style("WSL detected, showing Windows audio devices (use --native for WSL view)", fg=typer.colors.BLUE)) - result = get_audio_devices_windows() + result = get_audio_devices_windows_from_pnp_devices() if all_devices else get_audio_devices_windows() if not check_result(result): return - windows_devices = get_results(result.stdout, {"Index": '', "ID": '', 'Name': '', 'Type': '', 'Default': ''}) + windows_devices = get_results(result.stdout) + if all_devices: + select_conf = [ + SelectConf(attr="Index", default=""), + SelectConf(attr="InstanceId", to="ID", formatter=lambda item: _get_short_device_id(item.InstanceId)), + SelectConf(attr="FriendlyName", to="Name"), + SelectConf(attr="Type", formatter=lambda item: _get_device_type(item.InstanceId)), + SelectConf(attr="Status", formatter=lambda item: _get_colored_status(item.Status)), + ] + else: + select_conf = [ + SelectConf(attr="Index"), + SelectConf(attr="ID", formatter=lambda item: _get_short_device_id(item.ID)), + SelectConf(attr="Name"), + SelectConf(attr="Type"), + SelectConf(attr="Default", formatter=lambda item: _get_colored_bool(item.Default)), + ] + windows_devices = select(windows_devices, select_conf) # apply sorting and filtering windows_devices = filter_and_sort(windows_devices, filter_info, sort_info, desc_info) - display_as_table(windows_devices, [ - {"name": "Index"}, - {"name": "ID"}, - {"name": "Name"}, - {"name": "Type"}, - {"name": "Default"}, - ]) + display_as_table(windows_devices) else: @@ -88,12 +129,8 @@ def list_devices( # Show legend typer.echo(f"\nLegend:") - if use_windows: - typer.echo(typer.style(" ✓ = Device working", fg=typer.colors.GREEN)) - typer.echo(typer.style(" ✗ = Device problem", fg=typer.colors.RED)) - else: - typer.echo(typer.style(" * = Configured device", fg=typer.colors.YELLOW)) - typer.echo(f" IN/OUT = Input/Output channel count") + typer.echo(typer.style(" * = Configured device", fg=typer.colors.YELLOW)) + typer.echo(f" IN/OUT = Input/Output channel count") def install(): @@ -104,8 +141,6 @@ def install(): typer.echo(result.stdout) - - def get_linux_audio_devices() -> list: """Get Linux audio devices using sounddevice.""" try: diff --git a/src/core/Expando.py b/src/core/Expando.py index 1939c0a..e538810 100644 --- a/src/core/Expando.py +++ b/src/core/Expando.py @@ -21,10 +21,11 @@ class Expando: def __setitem__(self, key, value): self._props[key] = value - def get(self, path): + def get(self, path, default=None): """ returns the value, from a string with represents the path :param path: + :param default: value to return if path is not found :return: """ current = self._props @@ -38,7 +39,7 @@ class Expando: else: if current is None or attr not in current: - return None + return default current = current[attr] return current diff --git a/src/core/utils.py b/src/core/utils.py index 680c963..f1a1dd9 100644 --- a/src/core/utils.py +++ b/src/core/utils.py @@ -2,7 +2,7 @@ import json import subprocess from dataclasses import dataclass from subprocess import CompletedProcess -from typing import Any +from typing import Any, Callable import typer from rich.console import Console @@ -38,6 +38,14 @@ class FilterConf: return FilterConf(parts[0].strip(), parts[1].strip()) +@dataclass +class SelectConf: + attr: str # property nam to select + to: str = None # rename the property + default: str = None # value to use if property is missing + formatter: Callable[[Any], Any] = None # function to apply to the property using the whole item + + @dataclass class ProcessResult: result: Any @@ -52,7 +60,6 @@ def sort_by(items: list[Expando], sort_conf: SortConf): value = item.geti(property_name, "") # Convert None to empty string for consistent sorting return "" if value is None else str(value) - if sort_conf is None: return items @@ -86,6 +93,23 @@ def filter_by(items, filter_conf: FilterConf): ] +def select(items: list[Expando], settings: list[SelectConf]) -> list[Expando]: + res = [] + + for item in items: + new_item = {} + for setting in settings: + attr = setting.attr.strip() + + key = setting.to or attr + value = setting.formatter(item) if setting.formatter else item.get(attr, setting.default) + new_item[key] = value + + res.append(Expando(new_item)) + + return res + + def run_ps_command(command: str) -> CompletedProcess[str]: """Run a PowerShell command and return the output""" completed = subprocess.run( @@ -160,7 +184,7 @@ def run_ps_command_live(command: str) -> CompletedProcess[str]: return completed -def get_results(stdout: str, mappings) -> list[dict]: +def get_results(stdout: str) -> list[Expando]: stripped = stdout.strip() if not stripped: typer.echo(typer.style("PowerShell returned empty output", fg=typer.colors.YELLOW)) @@ -169,15 +193,20 @@ def get_results(stdout: str, mappings) -> list[dict]: data = json.loads(stripped) as_list = data if isinstance(data, list) else [data] - res = [] - for item in as_list: - mapped = {key: item.get(key, default_value) for key, default_value in mappings.items()} - res.append(Expando(mapped)) + return [Expando(item) for item in as_list] + + +def display_as_table(result, columns_settings: list = None): + def _create_default_columns_settings(): + if len(result) == 0: + return [] + + blue_print = result[0] + return [{"name": attr} for attr in blue_print.as_dict().keys()] + + if columns_settings is None: + columns_settings = _create_default_columns_settings() - return res - - -def display_as_table(result, columns_settings: list): formatters = {} table = Table(show_header=True) for col in [c for c in columns_settings if "name" in c]: diff --git a/src/devices.py b/src/devices.py index 38075cd..eeb65de 100644 --- a/src/devices.py +++ b/src/devices.py @@ -50,7 +50,7 @@ def get_audio_devices_windows(): return run_ps_command(ps_script) -def get_audio_devices_windows_from_pnp(): +def get_audio_devices_windows_from_pnp_devices(): ps_script = "Get-PnpDevice -Class AudioEndpoint | Select-Object FriendlyName, Status, InstanceId | ConvertTo-Json" return run_ps_command(ps_script) diff --git a/tests/test_expando.py b/tests/test_expando.py index 0f0ef29..63ca95d 100644 --- a/tests/test_expando.py +++ b/tests/test_expando.py @@ -29,6 +29,10 @@ def test_i_can_get(): assert dynamic.get("a") == 10 assert dynamic.get("b.c") == "value" assert dynamic.get("unknown") is None + assert dynamic.get("unknown", "default") == "default" + assert dynamic.get("b.x") is None + assert dynamic.get("b.x", "default") == "default" + assert dynamic.get("b.x", None) is None def test_i_can_get_insensitive(): diff --git a/tests/test_select.py b/tests/test_select.py new file mode 100644 index 0000000..b937a1c --- /dev/null +++ b/tests/test_select.py @@ -0,0 +1,256 @@ +import pytest + +from core.utils import SelectConf, select +from core.Expando import Expando + + +def test_i_can_select_basic_attributes(): + """Test basic selection of existing attributes.""" + items = [ + Expando({"name": "alice", "age": 25, "city": "paris"}), + Expando({"name": "bob", "age": 30, "city": "london"}) + ] + settings = [ + SelectConf(attr="name"), + SelectConf(attr="age") + ] + + result = select(items, settings) + + assert len(result) == 2 + assert result[0].name == "alice" + assert result[0].age == 25 + assert not hasattr(result[0], "city") # city not selected + assert result[1].name == "bob" + assert result[1].age == 30 + + +def test_i_can_select_with_renamed_attributes(): + """Test renaming attributes using SelectConf.to.""" + items = [ + Expando({"name": "alice", "age": 25}), + Expando({"name": "bob", "age": 30}) + ] + settings = [ + SelectConf(attr="name", to="full_name"), + SelectConf(attr="age", to="years") + ] + + result = select(items, settings) + + assert len(result) == 2 + assert result[0].full_name == "alice" + assert result[0].years == 25 + assert not hasattr(result[0], "name") # original name not present + assert not hasattr(result[0], "age") # original name not present + assert result[1].full_name == "bob" + assert result[1].years == 30 + + +def test_i_can_select_with_formatter(): + """Test applying formatters to selected attributes.""" + items = [ + Expando({"name": "alice", "age": 25}), + Expando({"name": "bob", "age": 30}) + ] + settings = [ + SelectConf(attr="name", formatter=lambda item: item.name.upper()), + SelectConf(attr="age", formatter=lambda item: item.age * 2) + ] + + result = select(items, settings) + + assert len(result) == 2 + assert result[0].name == "ALICE" + assert result[0].age == 50 + assert result[1].name == "BOB" + assert result[1].age == 60 + + +def test_i_can_select_with_formatter_and_rename(): + """Test combining formatter and renaming.""" + items = [ + Expando({"name": "alice", "age": 25}), + Expando({"name": "bob", "age": 30}) + ] + settings = [ + SelectConf(attr="name", to="upper_name", formatter=lambda item: item.name.upper()), + SelectConf(attr="age", to="double_age", formatter=lambda item: item.age * 2) + ] + + result = select(items, settings) + + assert len(result) == 2 + assert result[0].upper_name == "ALICE" + assert result[0].double_age == 50 + assert not hasattr(result[0], "name") + assert not hasattr(result[0], "age") + assert result[1].upper_name == "BOB" + assert result[1].double_age == 60 + + +def test_i_can_handle_empty_items_list(): + """Test that empty items list returns empty list.""" + items = [] + settings = [SelectConf(attr="name")] + + result = select(items, settings) + + assert result == [] + + +def test_i_can_handle_empty_settings_list(): + """Test that empty settings list returns items with empty properties.""" + items = [ + Expando({"name": "alice", "age": 25}), + Expando({"name": "bob", "age": 30}) + ] + settings = [] + + result = select(items, settings) + + assert len(result) == 2 + # Each result should be an Expando with empty properties + assert result[0]._props == {} + assert result[1]._props == {} + + +def test_i_can_handle_missing_attributes(): + """Test that missing attributes return None (normal behavior).""" + items = [ + Expando({"name": "alice"}), # no age attribute + Expando({"name": "bob", "age": 30}) + ] + settings = [ + SelectConf(attr="name"), + SelectConf(attr="age") # missing in first item + ] + + result = select(items, settings) + + assert len(result) == 2 + assert result[0].name == "alice" + assert result[0].age is None # missing attribute returns None + assert result[1].name == "bob" + assert result[1].age == 30 + + +def test_i_can_handle_whitespace_in_attr_names(): + """Test that whitespace in attribute names is stripped.""" + items = [ + Expando({"name": "alice", "age": 25}) + ] + settings = [ + SelectConf(attr=" name "), # whitespace around attr + SelectConf(attr="\tage\n") # tabs and newlines + ] + + result = select(items, settings) + + assert len(result) == 1 + assert result[0].name == "alice" + assert result[0].age == 25 + + +def test_i_can_select_multiple_attributes(): + """Test selecting multiple attributes from multiple items.""" + items = [ + Expando({"name": "alice", "age": 25, "city": "paris", "country": "france"}), + Expando({"name": "bob", "age": 30, "city": "london", "country": "uk"}), + Expando({"name": "charlie", "age": 35, "city": "madrid", "country": "spain"}) + ] + settings = [ + SelectConf(attr="name"), + SelectConf(attr="city", to="location"), + SelectConf(attr="country", formatter=lambda item: item.country.upper()) + ] + + result = select(items, settings) + + assert len(result) == 3 + assert result[0].name == "alice" + assert result[0].location == "paris" + assert result[0].country == "FRANCE" + assert result[1].name == "bob" + assert result[1].location == "london" + assert result[1].country == "UK" + assert result[2].name == "charlie" + assert result[2].location == "madrid" + assert result[2].country == "SPAIN" + + +def test_i_can_handle_formatter_with_whole_item(): + """Test that formatter receives the whole item, not just the attribute value.""" + items = [ + Expando({"first_name": "alice", "last_name": "smith"}), + Expando({"first_name": "bob", "last_name": "jones"}) + ] + settings = [ + SelectConf( + attr="first_name", + to="full_name", + formatter=lambda item: f"{item.get('first_name')} {item.get('last_name')}" + ) + ] + + result = select(items, settings) + + assert len(result) == 2 + assert result[0].full_name == "alice smith" + assert result[1].full_name == "bob jones" + + +def test_i_cannot_select_when_formatter_raises_exception(): + """Test that formatter exceptions are propagated.""" + items = [ + Expando({"name": "alice", "value": None}) + ] + settings = [ + SelectConf( + attr="value", + formatter=lambda item: item.get("value").upper() # will fail on None + ) + ] + + with pytest.raises(AttributeError): + select(items, settings) + + +def test_i_can_handle_none_values_in_formatter(): + """Test formatter handling None values gracefully when designed to do so.""" + items = [ + Expando({"name": "alice", "value": None}), + Expando({"name": "bob", "value": "test"}) + ] + settings = [ + SelectConf( + attr="value", + formatter=lambda item: item.get("value").upper() if item.get("value") else "NO_VALUE" + ) + ] + + result = select(items, settings) + + assert len(result) == 2 + assert result[0].value == "NO_VALUE" + assert result[1].value == "TEST" + + +def test_i_can_select_nested_attributes(): + """Test selecting nested attributes using Expando.get() path notation.""" + items = [ + Expando({"user": {"profile": {"name": "alice"}}, "age": 25}), + Expando({"user": {"profile": {"name": "bob"}}, "age": 30}) + ] + settings = [ + SelectConf(attr="user.profile.name", to="name"), + SelectConf(attr="age") + ] + + result = select(items, settings) + + assert len(result) == 2 + assert result[0].name == "alice" + assert result[0].age == 25 + assert result[1].name == "bob" + assert result[1].age == 30