command audio list is working as expected for windows

This commit is contained in:
2025-09-04 23:08:19 +02:00
parent 7aafaa41ed
commit 2ce6d22f89
6 changed files with 361 additions and 36 deletions

View File

@@ -3,7 +3,7 @@ import numpy as np
import os import os
from typing import Optional, List from typing import Optional, List
from ..devices import get_audio_devices_windows, install_audio_cmdlets from ..devices import get_audio_devices_windows, install_audio_cmdlets, get_audio_devices_windows_from_pnp_devices
from ..core.utils import * from ..core.utils import *
from ..config import AppConfig from ..config import AppConfig
@@ -34,20 +34,51 @@ def _detect_wsl() -> bool:
except Exception: except Exception:
return False return False
def _get_device_type(instance_id: str) -> str: def _get_device_type(instance_id: str) -> str:
"""Categorize device type based on InstanceId.""" """Categorize device type based on InstanceId."""
instance_lower = instance_id.lower() instance_lower = instance_id.lower()
if '0.0.1.00000000' in instance_lower: if '0.0.1.00000000' in instance_lower:
return "Input" return "Recording"
elif '0.0.0.00000000' in instance_lower: elif '0.0.0.00000000' in instance_lower:
return "Output" return "Playback"
else: else:
return "Unknown" return "Unknown"
def _get_short_device_id(instance_id: str) -> str:
"""Get device name based on InstanceId."""
"SWD\MMDEVAPI\{0.0.0.00000000}.{20434736-BDB3-4CE2-A56B-5DF61D4DD593}"
if len(instance_id) == 68:
return instance_id[31:-1]
elif len(instance_id) == 55:
return instance_id[18:-1]
else:
return instance_id
def _get_colored_bool(status: bool) -> str:
"""Get colored status string based on status."""
if status:
return f"[green]{status}[/green]"
else:
return str(status)
def _get_colored_status(status: str) -> str:
"""Get colored status string based on status."""
if status == "Unknown":
return f"[red]{status}[/red]"
else:
return str(status)
def list_devices( def list_devices(
device: Optional[int] = typer.Argument(default=None, help="Show details of a device, by its Id"), device: Optional[int] = typer.Argument(default=None, help="Show details of a device, by its Id"),
pulse: bool = typer.Option(False, "--pulse", help="Show PulseAudio sources"), pulse: bool = typer.Option(False, "--pulse", help="Show PulseAudio sources"),
native: bool = typer.Option(False, "--native", help="Force native Linux view (WSL only)"), native: bool = typer.Option(False, "--native", help="Force native Linux view (WSL only)"),
all_devices: bool = typer.Option(False, "--all", help="Display all devices, even those not connected"),
sort_info: Optional[str] = typer.Option(None, "--sort", help="Sort by column"), sort_info: Optional[str] = typer.Option(None, "--sort", help="Sort by column"),
desc_info: bool = typer.Option(False, "--desc", help="Set the sorting order to descending"), desc_info: bool = typer.Option(False, "--desc", help="Set the sorting order to descending"),
filter_info: Optional[str] = typer.Option(None, "--filter", help="Filter by a column value. Use column=value"), filter_info: Optional[str] = typer.Option(None, "--filter", help="Filter by a column value. Use column=value"),
@@ -60,26 +91,36 @@ def list_devices(
# Determine title based on environment and options # Determine title based on environment and options
is_wsl = _detect_wsl() is_wsl = _detect_wsl()
use_windows = is_wsl and not native if is_wsl and not native:
if use_windows:
typer.echo(typer.style("WSL detected, showing Windows audio devices (use --native for WSL view)", typer.echo(typer.style("WSL detected, showing Windows audio devices (use --native for WSL view)",
fg=typer.colors.BLUE)) fg=typer.colors.BLUE))
result = get_audio_devices_windows() result = get_audio_devices_windows_from_pnp_devices() if all_devices else get_audio_devices_windows()
if not check_result(result): if not check_result(result):
return return
windows_devices = get_results(result.stdout, {"Index": '', "ID": '', 'Name': '', 'Type': '', 'Default': ''}) windows_devices = get_results(result.stdout)
if all_devices:
select_conf = [
SelectConf(attr="Index", default=""),
SelectConf(attr="InstanceId", to="ID", formatter=lambda item: _get_short_device_id(item.InstanceId)),
SelectConf(attr="FriendlyName", to="Name"),
SelectConf(attr="Type", formatter=lambda item: _get_device_type(item.InstanceId)),
SelectConf(attr="Status", formatter=lambda item: _get_colored_status(item.Status)),
]
else:
select_conf = [
SelectConf(attr="Index"),
SelectConf(attr="ID", formatter=lambda item: _get_short_device_id(item.ID)),
SelectConf(attr="Name"),
SelectConf(attr="Type"),
SelectConf(attr="Default", formatter=lambda item: _get_colored_bool(item.Default)),
]
windows_devices = select(windows_devices, select_conf)
# apply sorting and filtering # apply sorting and filtering
windows_devices = filter_and_sort(windows_devices, filter_info, sort_info, desc_info) windows_devices = filter_and_sort(windows_devices, filter_info, sort_info, desc_info)
display_as_table(windows_devices, [ display_as_table(windows_devices)
{"name": "Index"},
{"name": "ID"},
{"name": "Name"},
{"name": "Type"},
{"name": "Default"},
])
else: else:
@@ -88,12 +129,8 @@ def list_devices(
# Show legend # Show legend
typer.echo(f"\nLegend:") typer.echo(f"\nLegend:")
if use_windows: typer.echo(typer.style(" * = Configured device", fg=typer.colors.YELLOW))
typer.echo(typer.style(" ✓ = Device working", fg=typer.colors.GREEN)) typer.echo(f" IN/OUT = Input/Output channel count")
typer.echo(typer.style(" ✗ = Device problem", fg=typer.colors.RED))
else:
typer.echo(typer.style(" * = Configured device", fg=typer.colors.YELLOW))
typer.echo(f" IN/OUT = Input/Output channel count")
def install(): def install():
@@ -104,8 +141,6 @@ def install():
typer.echo(result.stdout) typer.echo(result.stdout)
def get_linux_audio_devices() -> list: def get_linux_audio_devices() -> list:
"""Get Linux audio devices using sounddevice.""" """Get Linux audio devices using sounddevice."""
try: try:

View File

@@ -21,10 +21,11 @@ class Expando:
def __setitem__(self, key, value): def __setitem__(self, key, value):
self._props[key] = value self._props[key] = value
def get(self, path): def get(self, path, default=None):
""" """
returns the value, from a string with represents the path returns the value, from a string with represents the path
:param path: :param path:
:param default: value to return if path is not found
:return: :return:
""" """
current = self._props current = self._props
@@ -38,7 +39,7 @@ class Expando:
else: else:
if current is None or attr not in current: if current is None or attr not in current:
return None return default
current = current[attr] current = current[attr]
return current return current

View File

@@ -2,7 +2,7 @@ import json
import subprocess import subprocess
from dataclasses import dataclass from dataclasses import dataclass
from subprocess import CompletedProcess from subprocess import CompletedProcess
from typing import Any from typing import Any, Callable
import typer import typer
from rich.console import Console from rich.console import Console
@@ -38,6 +38,14 @@ class FilterConf:
return FilterConf(parts[0].strip(), parts[1].strip()) return FilterConf(parts[0].strip(), parts[1].strip())
@dataclass
class SelectConf:
attr: str # property nam to select
to: str = None # rename the property
default: str = None # value to use if property is missing
formatter: Callable[[Any], Any] = None # function to apply to the property using the whole item
@dataclass @dataclass
class ProcessResult: class ProcessResult:
result: Any result: Any
@@ -52,7 +60,6 @@ def sort_by(items: list[Expando], sort_conf: SortConf):
value = item.geti(property_name, "") value = item.geti(property_name, "")
# Convert None to empty string for consistent sorting # Convert None to empty string for consistent sorting
return "" if value is None else str(value) return "" if value is None else str(value)
if sort_conf is None: if sort_conf is None:
return items return items
@@ -86,6 +93,23 @@ def filter_by(items, filter_conf: FilterConf):
] ]
def select(items: list[Expando], settings: list[SelectConf]) -> list[Expando]:
res = []
for item in items:
new_item = {}
for setting in settings:
attr = setting.attr.strip()
key = setting.to or attr
value = setting.formatter(item) if setting.formatter else item.get(attr, setting.default)
new_item[key] = value
res.append(Expando(new_item))
return res
def run_ps_command(command: str) -> CompletedProcess[str]: def run_ps_command(command: str) -> CompletedProcess[str]:
"""Run a PowerShell command and return the output""" """Run a PowerShell command and return the output"""
completed = subprocess.run( completed = subprocess.run(
@@ -160,7 +184,7 @@ def run_ps_command_live(command: str) -> CompletedProcess[str]:
return completed return completed
def get_results(stdout: str, mappings) -> list[dict]: def get_results(stdout: str) -> list[Expando]:
stripped = stdout.strip() stripped = stdout.strip()
if not stripped: if not stripped:
typer.echo(typer.style("PowerShell returned empty output", fg=typer.colors.YELLOW)) typer.echo(typer.style("PowerShell returned empty output", fg=typer.colors.YELLOW))
@@ -169,15 +193,20 @@ def get_results(stdout: str, mappings) -> list[dict]:
data = json.loads(stripped) data = json.loads(stripped)
as_list = data if isinstance(data, list) else [data] as_list = data if isinstance(data, list) else [data]
res = [] return [Expando(item) for item in as_list]
for item in as_list:
mapped = {key: item.get(key, default_value) for key, default_value in mappings.items()}
res.append(Expando(mapped)) def display_as_table(result, columns_settings: list = None):
def _create_default_columns_settings():
if len(result) == 0:
return []
blue_print = result[0]
return [{"name": attr} for attr in blue_print.as_dict().keys()]
if columns_settings is None:
columns_settings = _create_default_columns_settings()
return res
def display_as_table(result, columns_settings: list):
formatters = {} formatters = {}
table = Table(show_header=True) table = Table(show_header=True)
for col in [c for c in columns_settings if "name" in c]: for col in [c for c in columns_settings if "name" in c]:

View File

@@ -50,7 +50,7 @@ def get_audio_devices_windows():
return run_ps_command(ps_script) return run_ps_command(ps_script)
def get_audio_devices_windows_from_pnp(): def get_audio_devices_windows_from_pnp_devices():
ps_script = "Get-PnpDevice -Class AudioEndpoint | Select-Object FriendlyName, Status, InstanceId | ConvertTo-Json" ps_script = "Get-PnpDevice -Class AudioEndpoint | Select-Object FriendlyName, Status, InstanceId | ConvertTo-Json"
return run_ps_command(ps_script) return run_ps_command(ps_script)

View File

@@ -29,6 +29,10 @@ def test_i_can_get():
assert dynamic.get("a") == 10 assert dynamic.get("a") == 10
assert dynamic.get("b.c") == "value" assert dynamic.get("b.c") == "value"
assert dynamic.get("unknown") is None assert dynamic.get("unknown") is None
assert dynamic.get("unknown", "default") == "default"
assert dynamic.get("b.x") is None
assert dynamic.get("b.x", "default") == "default"
assert dynamic.get("b.x", None) is None
def test_i_can_get_insensitive(): def test_i_can_get_insensitive():

256
tests/test_select.py Normal file
View File

@@ -0,0 +1,256 @@
import pytest
from core.utils import SelectConf, select
from core.Expando import Expando
def test_i_can_select_basic_attributes():
"""Test basic selection of existing attributes."""
items = [
Expando({"name": "alice", "age": 25, "city": "paris"}),
Expando({"name": "bob", "age": 30, "city": "london"})
]
settings = [
SelectConf(attr="name"),
SelectConf(attr="age")
]
result = select(items, settings)
assert len(result) == 2
assert result[0].name == "alice"
assert result[0].age == 25
assert not hasattr(result[0], "city") # city not selected
assert result[1].name == "bob"
assert result[1].age == 30
def test_i_can_select_with_renamed_attributes():
"""Test renaming attributes using SelectConf.to."""
items = [
Expando({"name": "alice", "age": 25}),
Expando({"name": "bob", "age": 30})
]
settings = [
SelectConf(attr="name", to="full_name"),
SelectConf(attr="age", to="years")
]
result = select(items, settings)
assert len(result) == 2
assert result[0].full_name == "alice"
assert result[0].years == 25
assert not hasattr(result[0], "name") # original name not present
assert not hasattr(result[0], "age") # original name not present
assert result[1].full_name == "bob"
assert result[1].years == 30
def test_i_can_select_with_formatter():
"""Test applying formatters to selected attributes."""
items = [
Expando({"name": "alice", "age": 25}),
Expando({"name": "bob", "age": 30})
]
settings = [
SelectConf(attr="name", formatter=lambda item: item.name.upper()),
SelectConf(attr="age", formatter=lambda item: item.age * 2)
]
result = select(items, settings)
assert len(result) == 2
assert result[0].name == "ALICE"
assert result[0].age == 50
assert result[1].name == "BOB"
assert result[1].age == 60
def test_i_can_select_with_formatter_and_rename():
"""Test combining formatter and renaming."""
items = [
Expando({"name": "alice", "age": 25}),
Expando({"name": "bob", "age": 30})
]
settings = [
SelectConf(attr="name", to="upper_name", formatter=lambda item: item.name.upper()),
SelectConf(attr="age", to="double_age", formatter=lambda item: item.age * 2)
]
result = select(items, settings)
assert len(result) == 2
assert result[0].upper_name == "ALICE"
assert result[0].double_age == 50
assert not hasattr(result[0], "name")
assert not hasattr(result[0], "age")
assert result[1].upper_name == "BOB"
assert result[1].double_age == 60
def test_i_can_handle_empty_items_list():
"""Test that empty items list returns empty list."""
items = []
settings = [SelectConf(attr="name")]
result = select(items, settings)
assert result == []
def test_i_can_handle_empty_settings_list():
"""Test that empty settings list returns items with empty properties."""
items = [
Expando({"name": "alice", "age": 25}),
Expando({"name": "bob", "age": 30})
]
settings = []
result = select(items, settings)
assert len(result) == 2
# Each result should be an Expando with empty properties
assert result[0]._props == {}
assert result[1]._props == {}
def test_i_can_handle_missing_attributes():
"""Test that missing attributes return None (normal behavior)."""
items = [
Expando({"name": "alice"}), # no age attribute
Expando({"name": "bob", "age": 30})
]
settings = [
SelectConf(attr="name"),
SelectConf(attr="age") # missing in first item
]
result = select(items, settings)
assert len(result) == 2
assert result[0].name == "alice"
assert result[0].age is None # missing attribute returns None
assert result[1].name == "bob"
assert result[1].age == 30
def test_i_can_handle_whitespace_in_attr_names():
"""Test that whitespace in attribute names is stripped."""
items = [
Expando({"name": "alice", "age": 25})
]
settings = [
SelectConf(attr=" name "), # whitespace around attr
SelectConf(attr="\tage\n") # tabs and newlines
]
result = select(items, settings)
assert len(result) == 1
assert result[0].name == "alice"
assert result[0].age == 25
def test_i_can_select_multiple_attributes():
"""Test selecting multiple attributes from multiple items."""
items = [
Expando({"name": "alice", "age": 25, "city": "paris", "country": "france"}),
Expando({"name": "bob", "age": 30, "city": "london", "country": "uk"}),
Expando({"name": "charlie", "age": 35, "city": "madrid", "country": "spain"})
]
settings = [
SelectConf(attr="name"),
SelectConf(attr="city", to="location"),
SelectConf(attr="country", formatter=lambda item: item.country.upper())
]
result = select(items, settings)
assert len(result) == 3
assert result[0].name == "alice"
assert result[0].location == "paris"
assert result[0].country == "FRANCE"
assert result[1].name == "bob"
assert result[1].location == "london"
assert result[1].country == "UK"
assert result[2].name == "charlie"
assert result[2].location == "madrid"
assert result[2].country == "SPAIN"
def test_i_can_handle_formatter_with_whole_item():
"""Test that formatter receives the whole item, not just the attribute value."""
items = [
Expando({"first_name": "alice", "last_name": "smith"}),
Expando({"first_name": "bob", "last_name": "jones"})
]
settings = [
SelectConf(
attr="first_name",
to="full_name",
formatter=lambda item: f"{item.get('first_name')} {item.get('last_name')}"
)
]
result = select(items, settings)
assert len(result) == 2
assert result[0].full_name == "alice smith"
assert result[1].full_name == "bob jones"
def test_i_cannot_select_when_formatter_raises_exception():
"""Test that formatter exceptions are propagated."""
items = [
Expando({"name": "alice", "value": None})
]
settings = [
SelectConf(
attr="value",
formatter=lambda item: item.get("value").upper() # will fail on None
)
]
with pytest.raises(AttributeError):
select(items, settings)
def test_i_can_handle_none_values_in_formatter():
"""Test formatter handling None values gracefully when designed to do so."""
items = [
Expando({"name": "alice", "value": None}),
Expando({"name": "bob", "value": "test"})
]
settings = [
SelectConf(
attr="value",
formatter=lambda item: item.get("value").upper() if item.get("value") else "NO_VALUE"
)
]
result = select(items, settings)
assert len(result) == 2
assert result[0].value == "NO_VALUE"
assert result[1].value == "TEST"
def test_i_can_select_nested_attributes():
"""Test selecting nested attributes using Expando.get() path notation."""
items = [
Expando({"user": {"profile": {"name": "alice"}}, "age": 25}),
Expando({"user": {"profile": {"name": "bob"}}, "age": 30})
]
settings = [
SelectConf(attr="user.profile.name", to="name"),
SelectConf(attr="age")
]
result = select(items, settings)
assert len(result) == 2
assert result[0].name == "alice"
assert result[0].age == 25
assert result[1].name == "bob"
assert result[1].age == 30