Added helper functions alongside their unit tests

This commit is contained in:
2025-09-03 23:25:27 +02:00
parent 343d1d2f93
commit 7aafaa41ed
13 changed files with 978 additions and 993 deletions

View File

@@ -12,6 +12,7 @@ server_app.command("check")(server.check)
audio_app.command("list")(audio.list_devices)
audio_app.command("test")(audio.test_device)
audio_app.command("config")(audio.config_info)
audio_app.command("install")(audio.install)
# Register subcommands
app.add_typer(server_app, name="server")

View File

@@ -1,79 +1,14 @@
import sounddevice as sd
import subprocess
import typer
import numpy as np
import os
import json
from typing import Optional, List, Dict, Any, Tuple
from dataclasses import dataclass
from typing import Optional, List
from rich.console import Console
from rich.table import Table
from ..utils import SortConf, sort_by, FilterConf, filter_by
from ..devices import get_audio_devices_windows, install_audio_cmdlets
from ..core.utils import *
from ..config import AppConfig
@dataclass
class WindowsAudioDevice:
"""Windows audio device data structure."""
name: str
status: str
device_type: str # "Input", "Output", "Unknown"
instance_id: str
@dataclass
class LinuxAudioDevice:
"""Linux audio device data structure."""
device_id: int
name: str
max_input_channels: int
max_output_channels: int
default_samplerate: float
hostapi_name: str
def list_devices(
device: Optional[int] = typer.Argument(default=None, help="Show details of a device, by its Id"),
pulse: bool = typer.Option(False, "--pulse", help="Show PulseAudio sources"),
native: bool = typer.Option(False, "--native", help="Force native Linux view (WSL only)"),
sort_by: Optional[str] = typer.Option(None, "--sort", help="Sort by column"),
desc: bool = typer.Option(False, "--desc", help="Set the sorting order to descending"),
filter_by: Optional[str] = typer.Option(None, "--filter", help="Filter by a column value. Use column=value"),
config_file: Optional[str] = typer.Option(None, "--config", "-c", help="Path to config file")
):
"""List audio devices."""
# Load configuration for context
config = AppConfig.load(config_file)
# Determine title based on environment and options
is_wsl = detect_wsl()
use_windows = is_wsl and not native
sort_config = SortConf(sort_by, not desc) if sort_by else None
filter_config = FilterConf.new(filter_by) if filter_by else None
if use_windows:
typer.echo(typer.style("WSL detected, showing Windows audio devices (use --native for WSL view)",
fg=typer.colors.BLUE))
windows_devices = get_windows_audio_devices()
display_windows_devices(windows_devices, sort_config, filter_config)
else:
linux_devices = get_linux_audio_devices()
display_linux_devices(linux_devices, config, pulse, sort_config)
# Show legend
typer.echo(f"\nLegend:")
if use_windows:
typer.echo(typer.style(" ✓ = Device working", fg=typer.colors.GREEN))
typer.echo(typer.style(" ✗ = Device problem", fg=typer.colors.RED))
else:
typer.echo(typer.style(" * = Configured device", fg=typer.colors.YELLOW))
typer.echo(f" IN/OUT = Input/Output channel count")
def detect_wsl() -> bool:
def _detect_wsl() -> bool:
"""Detect if running under Windows Subsystem for Linux."""
try:
# Method 1: Check /proc/version for Microsoft
@@ -99,365 +34,7 @@ def detect_wsl() -> bool:
except Exception:
return False
def get_windows_audio_devices_com() -> List[WindowsAudioDevice]:
"""
Retrieve Windows audio devices using Core Audio (COM) via an inline C# helper compiled by PowerShell.
Returns a list of WindowsAudioDevice with accurate device_type ('Input' or 'Output') based on DataFlow.
"""
ps_script = r"""
[Console]::OutputEncoding = [System.Text.Encoding]::UTF8;
$code = @"
using System;
using System.Runtime.InteropServices;
using System.Collections.Generic;
[ComImport]
[Guid("BCDE0395-E52F-467C-8E3D-C4579291692E")]
class MMDeviceEnumerator {}
[Guid("A95664D2-9614-4F35-A746-DE8DB63617E6"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)]
interface IMMDeviceEnumerator {
int EnumAudioEndpoints(EDataFlow dataFlow, int dwStateMask, out IMMDeviceCollection ppDevices);
// Other methods not used are omitted
}
enum EDataFlow { eRender = 0, eCapture = 1, eAll = 2 }
[Flags]
enum DEVICE_STATE : uint { ACTIVE = 0x1, DISABLED = 0x2, NOTPRESENT = 0x4, UNPLUGGED = 0x8, ALL = 0xF }
[Guid("0BD7A1BE-7A1A-44DB-8397-C0A794AF5630"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)]
interface IMMDeviceCollection {
int GetCount(out int pcDevices);
int Item(int nDevice, out IMMDevice ppDevice);
}
[Guid("D666063F-1587-4E43-81F1-B948E807363F"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)]
interface IMMDevice {
int Activate(ref Guid iid, int dwClsCtx, IntPtr pActivationParams, out IntPtr ppInterface);
int OpenPropertyStore(int stgmAccess, out IPropertyStore ppProperties);
int GetId(out string ppstrId);
int GetState(out int pdwState);
}
[Guid("886d8eeb-8cf2-4446-8d02-cdba1dbdcf99"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)]
interface IPropertyStore {
int GetCount(out int cProps);
int GetAt(int iProp, out PROPERTYKEY pkey);
int GetValue(ref PROPERTYKEY key, out PROPVARIANT pv);
int SetValue(ref PROPERTYKEY key, ref PROPVARIANT propvar);
int Commit();
}
[StructLayout(LayoutKind.Sequential)]
struct PROPERTYKEY {
public Guid fmtid;
public int pid;
}
[StructLayout(LayoutKind.Explicit)]
struct PROPVARIANT {
[FieldOffset(0)] public ushort vt;
[FieldOffset(8)] public IntPtr pointerValue;
}
static class PKEY {
// PKEY_Device_FriendlyName
public static PROPERTYKEY Device_FriendlyName = new PROPERTYKEY {
fmtid = new Guid("A45C254E-DF1C-4EFD-8020-67D146A850E0"), pid = 14
};
}
public class DeviceInfo {
public string Name { get; set; }
public string Id { get; set; }
public string Type { get; set; } // "Input" or "Output"
public string Status { get; set; } // "OK", "Disabled", "Unplugged", "NotPresent", "Unknown"
}
public static class CoreAudioEnumerator {
[DllImport("ole32.dll")]
private static extern int PropVariantClear(ref PROPVARIANT pvar);
private static string ReadString(IPropertyStore store, PROPERTYKEY key) {
PROPVARIANT pv = new PROPVARIANT();
try {
store.GetValue(ref key, out pv);
// VT_LPWSTR = 31
if (pv.vt == 31) {
return Marshal.PtrToStringUni(pv.pointerValue);
}
return null;
} finally {
PropVariantClear(ref pv);
}
}
public static List<DeviceInfo> GetAll() {
var result = new List<DeviceInfo>();
var enumerator = (IMMDeviceEnumerator)new MMDeviceEnumerator();
foreach (EDataFlow flow in new[] { EDataFlow.eCapture, EDataFlow.eRender }) {
IMMDeviceCollection coll;
enumerator.EnumAudioEndpoints(flow, (int)DEVICE_STATE.ALL, out coll);
int count;
coll.GetCount(out count);
for (int i = 0; i < count; i++) {
IMMDevice dev;
coll.Item(i, out dev);
string id;
dev.GetId(out id);
int state;
dev.GetState(out state);
IPropertyStore store;
dev.OpenPropertyStore(0 /* STGM_READ */, out store);
string name = ReadString(store, PKEY.Device_FriendlyName) ?? "(unknown)";
string type = flow == EDataFlow.eCapture ? "Input" : "Output";
string status =
(state & (int)DEVICE_STATE.ACTIVE) != 0 ? "OK" :
(state & (int)DEVICE_STATE.DISABLED) != 0 ? "Disabled" :
(state & (int)DEVICE_STATE.UNPLUGGED) != 0 ? "Unplugged" :
(state & (int)DEVICE_STATE.NOTPRESENT) != 0 ? "NotPresent" :
"Unknown";
result.Add(new DeviceInfo { Name = name, Id = id, Type = type, Status = status });
}
}
return result;
}
}
"@
Add-Type -TypeDefinition $code -Language CSharp -ErrorAction Stop
$devices = [CoreAudioEnumerator]::GetAll() | ForEach-Object {
[PSCustomObject]@{
FriendlyName = $_.Name
InstanceId = $_.Id
Type = $_.Type
Status = $_.Status
}
}
$devices | ConvertTo-Json
"""
try:
result = subprocess.run(
[
"powershell.exe",
"-NoProfile",
"-NonInteractive",
"-Command",
ps_script,
],
capture_output=True,
text=True,
encoding="utf-8",
timeout=30,
)
if result.returncode != 0:
err = result.stderr.strip() if result.stderr else f"exit code {result.returncode}"
typer.echo(typer.style(f"PowerShell (CoreAudio) command failed: {err}", fg=typer.colors.RED))
return []
output = result.stdout.strip()
if not output:
typer.echo(typer.style("PowerShell (CoreAudio) returned empty output", fg=typer.colors.YELLOW))
return []
data = json.loads(output)
items = data if isinstance(data, list) else [data]
devices: List[WindowsAudioDevice] = []
for it in items:
name = (it.get("FriendlyName") or "").strip()
instance_id = it.get("InstanceId") or ""
dev_type = it.get("Type") or "Unknown"
status = it.get("Status") or "Unknown"
if not name:
continue
devices.append(
WindowsAudioDevice(
name=name,
status=status,
device_type=dev_type,
instance_id=instance_id,
)
)
return devices
except json.JSONDecodeError as e:
typer.echo(typer.style(f"Failed to parse CoreAudio JSON output: {e}", fg=typer.colors.RED))
return []
except subprocess.TimeoutExpired:
typer.echo(typer.style("PowerShell (CoreAudio) command timed out after 30 seconds", fg=typer.colors.RED))
return []
except Exception as e:
typer.echo(typer.style(f"Unexpected error calling CoreAudio via PowerShell: {type(e).__name__}: {e}",
fg=typer.colors.RED))
return []
def get_windows_default_devices() -> Dict[str, str]:
"""Get Windows default audio devices via PowerShell."""
ps_script = r"""
[Console]::OutputEncoding = [System.Text.Encoding]::UTF8;
$code = @"
using System;
using System.Runtime.InteropServices;
[ComImport]
[Guid("BCDE0395-E52F-467C-8E3D-C4579291692E")]
class MMDeviceEnumerator {}
[Guid("A95664D2-9614-4F35-A746-DE8DB63617E6"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)]
interface IMMDeviceEnumerator {
int GetDefaultAudioEndpoint(EDataFlow dataFlow, ERole role, out IMMDevice ppEndpoint);
}
enum EDataFlow { eRender = 0, eCapture = 1 }
enum ERole { eConsole = 0, eMultimedia = 1, eCommunications = 2 }
[Guid("D666063F-1587-4E43-81F1-B948E807363F"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)]
interface IMMDevice {
int GetId(out string ppstrId);
}
public static class DefaultDeviceHelper {
public static string GetDefaultInputDevice() {
try {
var enumerator = (IMMDeviceEnumerator)new MMDeviceEnumerator();
IMMDevice device;
enumerator.GetDefaultAudioEndpoint(EDataFlow.eCapture, ERole.eConsole, out device);
string id;
device.GetId(out id);
return id;
} catch { return null; }
}
public static string GetDefaultOutputDevice() {
try {
var enumerator = (IMMDeviceEnumerator)new MMDeviceEnumerator();
IMMDevice device;
enumerator.GetDefaultAudioEndpoint(EDataFlow.eRender, ERole.eConsole, out device);
string id;
device.GetId(out id);
return id;
} catch { return null; }
}
}
"@
Add-Type -TypeDefinition $code -Language CSharp -ErrorAction Stop
$defaultInput = [DefaultDeviceHelper]::GetDefaultInputDevice()
$defaultOutput = [DefaultDeviceHelper]::GetDefaultOutputDevice()
[PSCustomObject]@{
DefaultInput = $defaultInput
DefaultOutput = $defaultOutput
} | ConvertTo-Json
"""
try:
result = subprocess.run([
"powershell.exe",
"-NoProfile",
"-NonInteractive",
"-Command",
ps_script,
], capture_output=True, text=True, encoding="utf-8", timeout=15)
if result.returncode != 0:
return {"default_input": None, "default_output": None}
if not result.stdout.strip():
return {"default_input": None, "default_output": None}
data = json.loads(result.stdout)
return {
"default_input": data.get("DefaultInput"),
"default_output": data.get("DefaultOutput")
}
except Exception:
return {"default_input": None, "default_output": None}
def get_windows_audio_devices() -> List[WindowsAudioDevice]:
"""Get detailed Windows audio devices via PowerShell from WSL."""
try:
result = subprocess.run([
"powershell.exe",
"-NoProfile",
"-NonInteractive",
"-Command",
# Force UTF-8 output from PowerShell before running the command
"[Console]::OutputEncoding = [System.Text.Encoding]::UTF8; "
"Get-PnpDevice -Class AudioEndpoint | "
"Select-Object FriendlyName, Status, InstanceId | ConvertTo-Json"
], capture_output=True, text=True, encoding="utf-8", timeout=15)
if result.returncode != 0:
error_msg = result.stderr.strip() if result.stderr else f"PowerShell exit code: {result.returncode}"
typer.echo(typer.style(f"PowerShell command failed: {error_msg}", fg=typer.colors.RED))
return []
if not result.stdout.strip():
typer.echo(typer.style("PowerShell returned empty output", fg=typer.colors.YELLOW))
return []
data = json.loads(result.stdout)
devices = data if isinstance(data, list) else [data]
windows_devices = []
for device in devices:
name = device.get('FriendlyName', '').strip()
status = device.get('Status', 'Unknown')
instance_id = device.get('InstanceId', '')
if not name:
continue
# Determine device type from InstanceId
device_type = _categorize_device_type(instance_id)
windows_devices.append(WindowsAudioDevice(
name=name,
status=status,
device_type=device_type,
instance_id=instance_id
))
return windows_devices
except json.JSONDecodeError as e:
typer.echo(typer.style(f"Failed to parse PowerShell JSON output: {e}", fg=typer.colors.RED))
return []
except subprocess.TimeoutExpired:
typer.echo(typer.style("PowerShell command timed out after 15 seconds", fg=typer.colors.RED))
return []
except Exception as e:
typer.echo(typer.style(f"Unexpected error calling PowerShell: {type(e).__name__}: {e}", fg=typer.colors.RED))
return []
def _categorize_device_type(instance_id: str) -> str:
def _get_device_type(instance_id: str) -> str:
"""Categorize device type based on InstanceId."""
instance_lower = instance_id.lower()
if '0.0.1.00000000' in instance_lower:
@@ -467,8 +44,69 @@ def _categorize_device_type(instance_id: str) -> str:
else:
return "Unknown"
def list_devices(
device: Optional[int] = typer.Argument(default=None, help="Show details of a device, by its Id"),
pulse: bool = typer.Option(False, "--pulse", help="Show PulseAudio sources"),
native: bool = typer.Option(False, "--native", help="Force native Linux view (WSL only)"),
sort_info: Optional[str] = typer.Option(None, "--sort", help="Sort by column"),
desc_info: bool = typer.Option(False, "--desc", help="Set the sorting order to descending"),
filter_info: Optional[str] = typer.Option(None, "--filter", help="Filter by a column value. Use column=value"),
config_file: Optional[str] = typer.Option(None, "--config", "-c", help="Path to config file")
):
"""List audio devices."""
# Load configuration for context
config = AppConfig.load(config_file)
# Determine title based on environment and options
is_wsl = _detect_wsl()
use_windows = is_wsl and not native
if use_windows:
typer.echo(typer.style("WSL detected, showing Windows audio devices (use --native for WSL view)",
fg=typer.colors.BLUE))
result = get_audio_devices_windows()
if not check_result(result):
return
windows_devices = get_results(result.stdout, {"Index": '', "ID": '', 'Name': '', 'Type': '', 'Default': ''})
# apply sorting and filtering
windows_devices = filter_and_sort(windows_devices, filter_info, sort_info, desc_info)
display_as_table(windows_devices, [
{"name": "Index"},
{"name": "ID"},
{"name": "Name"},
{"name": "Type"},
{"name": "Default"},
])
else:
linux_devices = get_linux_audio_devices()
display_linux_devices(linux_devices, config, pulse, sort_config)
# Show legend
typer.echo(f"\nLegend:")
if use_windows:
typer.echo(typer.style(" ✓ = Device working", fg=typer.colors.GREEN))
typer.echo(typer.style(" ✗ = Device problem", fg=typer.colors.RED))
else:
typer.echo(typer.style(" * = Configured device", fg=typer.colors.YELLOW))
typer.echo(f" IN/OUT = Input/Output channel count")
def get_linux_audio_devices() -> List[LinuxAudioDevice]:
def install():
result = install_audio_cmdlets()
if not check_result(result):
return
typer.echo(result.stdout)
def get_linux_audio_devices() -> list:
"""Get Linux audio devices using sounddevice."""
try:
devices = sd.query_devices()
@@ -492,87 +130,7 @@ def get_linux_audio_devices() -> List[LinuxAudioDevice]:
raise typer.Exit(1)
def display_windows_devices(devices: List[WindowsAudioDevice],
sort_conf: Optional[SortConf] = None,
filter_conf: Optional[FilterConf] = None
) -> None:
"""Display Windows audio devices with optimized formatting."""
def _get_id(_instance_id):
return _instance_id[31:39]
if not devices:
typer.echo(typer.style("No Windows audio devices found!", fg=typer.colors.RED))
return
# Get default devices
default_devices = get_windows_default_devices()
default_input_id = default_devices.get("default_input")
default_output_id = default_devices.get("default_output")
if not devices:
typer.echo(typer.style("No audio devices found!", fg=typer.colors.RED))
# Create rich table
table = Table(show_header=True)
table.add_column("ID", width=8, no_wrap=True, overflow="ellipsis")
table.add_column("Name", width=55, no_wrap=True, overflow="ellipsis")
table.add_column("Type", width=8, justify="center")
table.add_column("Status", width=3, justify="center")
table.add_column("*", width=2, justify="center")
# Add devices to table
rows = [] # list of dict
for device in devices:
# Determine if device is selected (default)
is_selected = False
if device.device_type == "Input" and device.instance_id == default_input_id:
is_selected = True
elif device.device_type == "Output" and device.instance_id == default_output_id:
is_selected = True
selected_marker = "*" if is_selected else ""
# Color coding for status
if device.status == "OK":
status_text = f"[green]{device.status}[/green]"
else:
status_text = f"[red]{device.status}[/red]"
# Style selected row
if is_selected:
row_style = "bold"
id_text = f"[bold]{_get_id(device.instance_id)}[/bold]"
name_text = f"[bold]{device.name}[/bold]"
type_text = f"[bold]{device.device_type}[/bold]"
selected_text = f"[bold yellow]{selected_marker}[/bold yellow]"
else:
row_style = None
id_text = _get_id(device.instance_id)
name_text = device.name
type_text = device.device_type
selected_text = selected_marker
rows.append({
"id": id_text,
"name": name_text,
"type": type_text,
"status": status_text,
"selected": selected_text
})
rows = sort_by(rows, sort_conf)
rows = filter_by(rows, filter_conf)
for row in rows:
table.add_row(*row.values())
# Display table
console = Console()
console.print(table)
def display_linux_devices(devices: List[LinuxAudioDevice],
def display_linux_devices(devices: list,
config: AppConfig,
show_pulse: bool = False,
sort_conf: Optional[SortConf] = None) -> None:
@@ -625,14 +183,14 @@ def display_linux_devices(devices: List[LinuxAudioDevice],
# Legacy function for backwards compatibility
def get_audio_devices():
"""Get comprehensive information about audio devices."""
try:
devices = sd.query_devices()
return devices
except Exception as e:
typer.echo(typer.style(f"Error querying audio devices: {e}", fg=typer.colors.RED, bold=True))
raise typer.Exit(1)
# def get_audio_devices():
# """Get comprehensive information about audio devices."""
# try:
# devices = sd.query_devices()
# return devices
# except Exception as e:
# typer.echo(typer.style(f"Error querying audio devices: {e}", fg=typer.colors.RED, bold=True))
# raise typer.Exit(1)
def get_pulseaudio_sources():

104
src/core/Expando.py Normal file
View File

@@ -0,0 +1,104 @@
NO_DEFAULT = object()
class Expando:
"""
Readonly dynamic class that eases the access to attributes and sub attributes
It is initialized with a dict
You can then access the property using dot '.' (ex. obj.prop1.prop2)
"""
def __init__(self, props):
self._props = props
def __getattr__(self, item):
if item not in self._props:
raise AttributeError(item)
current = self._props[item]
return Expando(current) if isinstance(current, dict) else current
def __setitem__(self, key, value):
self._props[key] = value
def get(self, path):
"""
returns the value, from a string with represents the path
:param path:
:return:
"""
current = self._props
for attr in path.split("."):
if isinstance(current, list):
temp = []
for value in current:
if value and attr in value:
temp.append(value[attr])
current = temp
else:
if current is None or attr not in current:
return None
current = current[attr]
return current
def geti(self, item, default=NO_DEFAULT):
"""
insensitive get
:param item:
:param default:
:return:
"""
if item in self._props:
return self._props[item]
for k, v in self._props.items():
if k.lower() == item.lower():
return v
if default is not NO_DEFAULT:
return default
raise AttributeError(f"Item '{item}' not found")
def as_dict(self):
"""
Return the information as a dictionary
:return:
"""
return self._props.copy()
def to_dict(self, mappings: dict) -> dict:
"""
Return the information as a dictionary, with the given mappings
"""
return {prop_name: self.get(path) for path, prop_name in mappings.items() if prop_name is not None}
def __hasattr__(self, item):
return item in self._props
def values(self):
return self._props.values()
def copy(self):
return Expando(self._props.copy())
def __repr__(self):
if "key" in self._props:
return f"Expando(key={self._props["key"]})"
props_as_str = str(self._props)
if len(props_as_str) > 50:
props_as_str = props_as_str[:50] + "..."
return f"Expando({props_as_str})"
def __eq__(self, other):
if not isinstance(other, Expando):
return False
return self._props == other._props
def __hash__(self):
return hash(tuple(sorted(self._props.items())))

0
src/core/__init__.py Normal file
View File

221
src/core/utils.py Normal file
View File

@@ -0,0 +1,221 @@
import json
import subprocess
from dataclasses import dataclass
from subprocess import CompletedProcess
from typing import Any
import typer
from rich.console import Console
from rich.table import Table
from .Expando import Expando
@dataclass
class SortConf:
property: str
ascending: bool = True
@dataclass
class FilterConf:
property: str
value: str
@staticmethod
def new(text):
"""Initialize FilterConf from text."""
if text is None:
return None
parts = text.split("=")
if len(parts) != 2:
return None
if not parts[0].strip() or not parts[1].strip():
return None
return FilterConf(parts[0].strip(), parts[1].strip())
@dataclass
class ProcessResult:
result: Any
error: str = None
def sort_by(items: list[Expando], sort_conf: SortConf):
"""Sort a list of items by a given property."""
def _safe_sort_key(item):
"""Get sort key with safe handling of None values."""
value = item.geti(property_name, "")
# Convert None to empty string for consistent sorting
return "" if value is None else str(value)
if sort_conf is None:
return items
if len(items) == 0:
return items
if sort_conf.property is None:
return items
property_name = sort_conf.property.strip()
return sorted(items, key=_safe_sort_key, reverse=not sort_conf.ascending)
def filter_by(items, filter_conf: FilterConf):
"""Filter a list of items by a given property."""
def _contains(item, value):
return str(value).lower() in str(item).lower()
if (filter_conf is None
or filter_conf.property is None
or len(items) == 0):
return items
predicate = _contains
property_name = filter_conf.property.strip()
return [
item for item in items if predicate(item.geti(property_name), filter_conf.value)
]
def run_ps_command(command: str) -> CompletedProcess[str]:
"""Run a PowerShell command and return the output"""
completed = subprocess.run(
["powershell.exe",
"-NoProfile",
"-ExecutionPolicy",
"Bypass",
"-Command",
"[Console]::OutputEncoding = [System.Text.Encoding]::UTF8;",
command],
capture_output=True,
text=True,
encoding="utf-8", timeout=5
)
return completed
def run_ps_command_live(command: str) -> CompletedProcess[str]:
"""Run a PowerShell command and display output in real-time"""
process = subprocess.Popen(
["powershell.exe",
"-NoProfile",
"-ExecutionPolicy",
"Bypass",
"-Command",
"[Console]::OutputEncoding = [System.Text.Encoding]::UTF8;",
command],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
encoding="utf-8"
)
stdout_lines = []
stderr_lines = []
# Read output in real-time
while True:
stdout_line = process.stdout.readline()
stderr_line = process.stderr.readline()
if stdout_line:
print(stdout_line.rstrip())
stdout_lines.append(stdout_line)
if stderr_line:
print(f"ERROR: {stderr_line.rstrip()}")
stderr_lines.append(stderr_line)
# Check if process is finished
if process.poll() is not None:
break
# Read any remaining lines
remaining_stdout, remaining_stderr = process.communicate()
if remaining_stdout:
print(remaining_stdout.rstrip())
stdout_lines.append(remaining_stdout)
if remaining_stderr:
print(f"ERROR: {remaining_stderr.rstrip()}")
stderr_lines.append(remaining_stderr)
# Create CompletedProcess object to maintain compatibility
completed = CompletedProcess(
args=["powershell.exe", "-NoProfile", "-ExecutionPolicy", "Bypass", "-Command", command],
returncode=process.returncode,
stdout=''.join(stdout_lines),
stderr=''.join(stderr_lines)
)
return completed
def get_results(stdout: str, mappings) -> list[dict]:
stripped = stdout.strip()
if not stripped:
typer.echo(typer.style("PowerShell returned empty output", fg=typer.colors.YELLOW))
return []
data = json.loads(stripped)
as_list = data if isinstance(data, list) else [data]
res = []
for item in as_list:
mapped = {key: item.get(key, default_value) for key, default_value in mappings.items()}
res.append(Expando(mapped))
return res
def display_as_table(result, columns_settings: list):
formatters = {}
table = Table(show_header=True)
for col in [c for c in columns_settings if "name" in c]:
col_name = col["name"]
table.add_column(col_name, **col["attrs"] if "attrs" in col else {})
formatter = col.pop("formatter", None)
if formatter:
formatters[col_name] = formatter
for item in result:
cloned = item.copy()
# apply formatters
for col, formatter in formatters.items():
cloned[col] = formatter(item)
table.add_row(*[str(value) for value in cloned.values()])
# Display table
console = Console()
console.print(table)
def check_result(result):
if result.returncode != 0:
error_msg = result.stderr.strip() if result.stderr else f"PowerShell exit code: {result.returncode}"
typer.echo(typer.style(f"PowerShell command failed: {error_msg}", fg=typer.colors.RED))
return False
return True
def filter_and_sort(items, filter_info, sort_info, desc_info=False):
if filter_info:
filter_config = FilterConf.new(filter_info) if filter_by else None
items = filter_by(items, filter_config)
if sort_info:
sort_config = SortConf(sort_info, not desc_info) if sort_by else None
items = sort_by(items, sort_config)
return items

78
src/devices.py Normal file
View File

@@ -0,0 +1,78 @@
from subprocess import CompletedProcess
import sounddevice as sd
from .core.Expando import Expando
from .core.utils import run_ps_command, run_ps_command_live, ProcessResult
def _is_audio_device_cmdlets_installed():
# PowerShell command to check if module is available
ps_command = "Get-Module -ListAvailable -Name AudioDeviceCmdlets"
result = run_ps_command(ps_command)
# If something is returned in stdout, the module is installed
return bool(result.stdout.strip())
def install_audio_cmdlets():
"""Install AudioDevice cmdlet for better audio device management"""
if _is_audio_device_cmdlets_installed():
return CompletedProcess(
args=["is_audio_device_cmdlets_installed()"],
returncode=0,
stdout="AudioDevice cmdlet is already installed",
stderr="")
ps_script = r"""
# 1) Trust PSGallery
Set-PSRepository -Name "PSGallery" -InstallationPolicy Trusted -ErrorAction SilentlyContinue
# 2) Ensure NuGet provider is available
if (-not (Get-PackageProvider -Name NuGet -ErrorAction SilentlyContinue)) {
Install-PackageProvider -Name NuGet -Force -Scope CurrentUser
}
# 3) Install AudioDeviceCmdlets if missing
if (-not (Get-Module -ListAvailable -Name AudioDeviceCmdlets)) {
Install-Module -Name AudioDeviceCmdlets -Scope CurrentUser -Force -AllowClobber
}
# 4) Import the module
Import-Module AudioDeviceCmdlets -ErrorAction SilentlyContinue
"""
return run_ps_command_live(ps_script)
def get_audio_devices_windows():
"""Get all audio devices using AudioDevice cmdlet"""
ps_script = "Get-AudioDevice -List | ConvertTo-Json"
return run_ps_command(ps_script)
def get_audio_devices_windows_from_pnp():
ps_script = "Get-PnpDevice -Class AudioEndpoint | Select-Object FriendlyName, Status, InstanceId | ConvertTo-Json"
return run_ps_command(ps_script)
def get_audio_devices_linux():
"""Get all audio devices using sounddevice module"""
try:
devices = sd.query_devices()
result = []
for i, device in enumerate(devices):
hostapi_name = sd.query_hostapis(device['hostapi'])['name']
result.append(Expando({
"device_id": i,
"name": device['name'],
"max_input_channels": device['max_input_channels'],
"max_output_channels": device['max_output_channels'],
"default_samplerate": device['default_samplerate'],
"hostapi_name": hostapi_name}
))
return ProcessResult(result)
except Exception as e:
ProcessResult(None, f"Error querying Linux audio devices: {e}")

View File

@@ -1,66 +0,0 @@
from dataclasses import dataclass
@dataclass
class SortConf:
property: str
ascending: bool = True
@dataclass
class FilterConf:
property: str
value: str
@staticmethod
def new(text):
"""Initialize FilterConf from text."""
if text is None:
return None
parts = text.split("=")
if len(parts) != 2:
return None
return FilterConf(parts[0].strip(), parts[1].strip())
def sort_by(items, sort_conf: SortConf):
"""Sort a list of items by a given property."""
if sort_conf is None:
return items
if len(items) == 0:
return items
if isinstance(items[0], dict):
property_name = sort_conf.property.lower().strip()
return sorted(items, key=lambda item: item.get(property_name), reverse=not sort_conf.ascending)
else:
return sorted(items, key=lambda item: getattr(item, sort_conf.property), reverse=not sort_conf.ascending)
def filter_by(items, filter_conf: FilterConf):
"""Filter a list of items by a given property."""
def _contains(item, value):
return str(value).lower() in str(item).lower()
if (filter_conf is None
or filter_conf.property is None
or len(items) == 0):
return items
predicate = _contains
if isinstance(items[0], dict):
property_name = filter_conf.property.lower().strip()
return [
item for item in items if predicate(item.get(property_name), filter_conf.value)
]
else:
return [
item for item in items if getattr(item, filter_conf.property) == filter_conf.value
]