Added audio commands + "server test" command
This commit is contained in:
3
main.py
3
main.py
@@ -8,8 +8,7 @@ Usage:
|
||||
python main.py audio test [--device ID] [--duration N] [--save]
|
||||
python main.py audio config
|
||||
"""
|
||||
|
||||
from src.cli import app
|
||||
|
||||
if __name__ == "__main__":
|
||||
app()
|
||||
app()
|
||||
|
||||
23
requirements.txt
Normal file
23
requirements.txt
Normal file
@@ -0,0 +1,23 @@
|
||||
cffi==1.17.1
|
||||
click==8.2.1
|
||||
comtypes==1.4.12
|
||||
iniconfig==2.1.0
|
||||
markdown-it-py==4.0.0
|
||||
mdurl==0.1.2
|
||||
numpy==2.3.2
|
||||
packaging==25.0
|
||||
pluggy==1.6.0
|
||||
psutil==7.0.0
|
||||
PyAudio==0.2.14
|
||||
pycaw==20240210
|
||||
pycparser==2.22
|
||||
Pygments==2.19.2
|
||||
pytest==8.4.1
|
||||
PyYAML==6.0.2
|
||||
rich==14.1.0
|
||||
scipy==1.16.1
|
||||
shellingham==1.5.4
|
||||
sounddevice==0.5.2
|
||||
typer==0.17.3
|
||||
typing_extensions==4.15.0
|
||||
wyoming==1.7.2
|
||||
@@ -9,14 +9,14 @@ audio_app = typer.Typer(help="Audio device operations")
|
||||
|
||||
# Add commands to subcommands
|
||||
server_app.command("check")(server.check)
|
||||
server_app.command("test")(server.test)
|
||||
audio_app.command("list")(audio.list_devices)
|
||||
audio_app.command("test")(audio.test_device)
|
||||
audio_app.command("config")(audio.config_info)
|
||||
audio_app.command("install")(audio.install)
|
||||
|
||||
# Register subcommands
|
||||
app.add_typer(server_app, name="server")
|
||||
app.add_typer(audio_app, name="audio")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
app()
|
||||
app()
|
||||
|
||||
@@ -1,79 +1,15 @@
|
||||
import sounddevice as sd
|
||||
import subprocess
|
||||
import typer
|
||||
import numpy as np
|
||||
import os
|
||||
import json
|
||||
from typing import Optional, List, Dict, Any, Tuple
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional, List
|
||||
|
||||
from rich.console import Console
|
||||
from rich.table import Table
|
||||
|
||||
from ..utils import SortConf, sort_by, FilterConf, filter_by
|
||||
from ..devices import get_audio_devices_windows, install_audio_cmdlets, get_audio_devices_windows_from_pnp_devices, \
|
||||
get_audio_devices_linux
|
||||
from ..core.utils import *
|
||||
from ..config import AppConfig
|
||||
|
||||
|
||||
@dataclass
|
||||
class WindowsAudioDevice:
|
||||
"""Windows audio device data structure."""
|
||||
name: str
|
||||
status: str
|
||||
device_type: str # "Input", "Output", "Unknown"
|
||||
instance_id: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class LinuxAudioDevice:
|
||||
"""Linux audio device data structure."""
|
||||
device_id: int
|
||||
name: str
|
||||
max_input_channels: int
|
||||
max_output_channels: int
|
||||
default_samplerate: float
|
||||
hostapi_name: str
|
||||
|
||||
|
||||
def list_devices(
|
||||
device: Optional[int] = typer.Argument(default=None, help="Show details of a device, by its Id"),
|
||||
pulse: bool = typer.Option(False, "--pulse", help="Show PulseAudio sources"),
|
||||
native: bool = typer.Option(False, "--native", help="Force native Linux view (WSL only)"),
|
||||
sort_by: Optional[str] = typer.Option(None, "--sort", help="Sort by column"),
|
||||
desc: bool = typer.Option(False, "--desc", help="Set the sorting order to descending"),
|
||||
filter_by: Optional[str] = typer.Option(None, "--filter", help="Filter by a column value. Use column=value"),
|
||||
config_file: Optional[str] = typer.Option(None, "--config", "-c", help="Path to config file")
|
||||
):
|
||||
"""List audio devices."""
|
||||
|
||||
# Load configuration for context
|
||||
config = AppConfig.load(config_file)
|
||||
|
||||
# Determine title based on environment and options
|
||||
is_wsl = detect_wsl()
|
||||
use_windows = is_wsl and not native
|
||||
sort_config = SortConf(sort_by, not desc) if sort_by else None
|
||||
filter_config = FilterConf.new(filter_by) if filter_by else None
|
||||
if use_windows:
|
||||
typer.echo(typer.style("WSL detected, showing Windows audio devices (use --native for WSL view)",
|
||||
fg=typer.colors.BLUE))
|
||||
windows_devices = get_windows_audio_devices()
|
||||
display_windows_devices(windows_devices, sort_config, filter_config)
|
||||
|
||||
else:
|
||||
linux_devices = get_linux_audio_devices()
|
||||
display_linux_devices(linux_devices, config, pulse, sort_config)
|
||||
|
||||
# Show legend
|
||||
typer.echo(f"\nLegend:")
|
||||
if use_windows:
|
||||
typer.echo(typer.style(" ✓ = Device working", fg=typer.colors.GREEN))
|
||||
typer.echo(typer.style(" ✗ = Device problem", fg=typer.colors.RED))
|
||||
else:
|
||||
typer.echo(typer.style(" * = Configured device", fg=typer.colors.YELLOW))
|
||||
typer.echo(f" IN/OUT = Input/Output channel count")
|
||||
|
||||
|
||||
def detect_wsl() -> bool:
|
||||
def _detect_wsl() -> bool:
|
||||
"""Detect if running under Windows Subsystem for Linux."""
|
||||
try:
|
||||
# Method 1: Check /proc/version for Microsoft
|
||||
@@ -100,479 +36,126 @@ def detect_wsl() -> bool:
|
||||
return False
|
||||
|
||||
|
||||
def get_windows_audio_devices_com() -> List[WindowsAudioDevice]:
|
||||
"""
|
||||
Retrieve Windows audio devices using Core Audio (COM) via an inline C# helper compiled by PowerShell.
|
||||
Returns a list of WindowsAudioDevice with accurate device_type ('Input' or 'Output') based on DataFlow.
|
||||
"""
|
||||
ps_script = r"""
|
||||
[Console]::OutputEncoding = [System.Text.Encoding]::UTF8;
|
||||
|
||||
$code = @"
|
||||
using System;
|
||||
using System.Runtime.InteropServices;
|
||||
using System.Collections.Generic;
|
||||
|
||||
[ComImport]
|
||||
[Guid("BCDE0395-E52F-467C-8E3D-C4579291692E")]
|
||||
class MMDeviceEnumerator {}
|
||||
|
||||
[Guid("A95664D2-9614-4F35-A746-DE8DB63617E6"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)]
|
||||
interface IMMDeviceEnumerator {
|
||||
int EnumAudioEndpoints(EDataFlow dataFlow, int dwStateMask, out IMMDeviceCollection ppDevices);
|
||||
// Other methods not used are omitted
|
||||
}
|
||||
|
||||
enum EDataFlow { eRender = 0, eCapture = 1, eAll = 2 }
|
||||
|
||||
[Flags]
|
||||
enum DEVICE_STATE : uint { ACTIVE = 0x1, DISABLED = 0x2, NOTPRESENT = 0x4, UNPLUGGED = 0x8, ALL = 0xF }
|
||||
|
||||
[Guid("0BD7A1BE-7A1A-44DB-8397-C0A794AF5630"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)]
|
||||
interface IMMDeviceCollection {
|
||||
int GetCount(out int pcDevices);
|
||||
int Item(int nDevice, out IMMDevice ppDevice);
|
||||
}
|
||||
|
||||
[Guid("D666063F-1587-4E43-81F1-B948E807363F"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)]
|
||||
interface IMMDevice {
|
||||
int Activate(ref Guid iid, int dwClsCtx, IntPtr pActivationParams, out IntPtr ppInterface);
|
||||
int OpenPropertyStore(int stgmAccess, out IPropertyStore ppProperties);
|
||||
int GetId(out string ppstrId);
|
||||
int GetState(out int pdwState);
|
||||
}
|
||||
|
||||
[Guid("886d8eeb-8cf2-4446-8d02-cdba1dbdcf99"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)]
|
||||
interface IPropertyStore {
|
||||
int GetCount(out int cProps);
|
||||
int GetAt(int iProp, out PROPERTYKEY pkey);
|
||||
int GetValue(ref PROPERTYKEY key, out PROPVARIANT pv);
|
||||
int SetValue(ref PROPERTYKEY key, ref PROPVARIANT propvar);
|
||||
int Commit();
|
||||
}
|
||||
|
||||
[StructLayout(LayoutKind.Sequential)]
|
||||
struct PROPERTYKEY {
|
||||
public Guid fmtid;
|
||||
public int pid;
|
||||
}
|
||||
|
||||
[StructLayout(LayoutKind.Explicit)]
|
||||
struct PROPVARIANT {
|
||||
[FieldOffset(0)] public ushort vt;
|
||||
[FieldOffset(8)] public IntPtr pointerValue;
|
||||
}
|
||||
|
||||
static class PKEY {
|
||||
// PKEY_Device_FriendlyName
|
||||
public static PROPERTYKEY Device_FriendlyName = new PROPERTYKEY {
|
||||
fmtid = new Guid("A45C254E-DF1C-4EFD-8020-67D146A850E0"), pid = 14
|
||||
};
|
||||
}
|
||||
|
||||
public class DeviceInfo {
|
||||
public string Name { get; set; }
|
||||
public string Id { get; set; }
|
||||
public string Type { get; set; } // "Input" or "Output"
|
||||
public string Status { get; set; } // "OK", "Disabled", "Unplugged", "NotPresent", "Unknown"
|
||||
}
|
||||
|
||||
public static class CoreAudioEnumerator {
|
||||
[DllImport("ole32.dll")]
|
||||
private static extern int PropVariantClear(ref PROPVARIANT pvar);
|
||||
|
||||
private static string ReadString(IPropertyStore store, PROPERTYKEY key) {
|
||||
PROPVARIANT pv = new PROPVARIANT();
|
||||
try {
|
||||
store.GetValue(ref key, out pv);
|
||||
// VT_LPWSTR = 31
|
||||
if (pv.vt == 31) {
|
||||
return Marshal.PtrToStringUni(pv.pointerValue);
|
||||
}
|
||||
return null;
|
||||
} finally {
|
||||
PropVariantClear(ref pv);
|
||||
}
|
||||
}
|
||||
|
||||
public static List<DeviceInfo> GetAll() {
|
||||
var result = new List<DeviceInfo>();
|
||||
var enumerator = (IMMDeviceEnumerator)new MMDeviceEnumerator();
|
||||
|
||||
foreach (EDataFlow flow in new[] { EDataFlow.eCapture, EDataFlow.eRender }) {
|
||||
IMMDeviceCollection coll;
|
||||
enumerator.EnumAudioEndpoints(flow, (int)DEVICE_STATE.ALL, out coll);
|
||||
|
||||
int count;
|
||||
coll.GetCount(out count);
|
||||
|
||||
for (int i = 0; i < count; i++) {
|
||||
IMMDevice dev;
|
||||
coll.Item(i, out dev);
|
||||
|
||||
string id;
|
||||
dev.GetId(out id);
|
||||
|
||||
int state;
|
||||
dev.GetState(out state);
|
||||
|
||||
IPropertyStore store;
|
||||
dev.OpenPropertyStore(0 /* STGM_READ */, out store);
|
||||
string name = ReadString(store, PKEY.Device_FriendlyName) ?? "(unknown)";
|
||||
|
||||
string type = flow == EDataFlow.eCapture ? "Input" : "Output";
|
||||
|
||||
string status =
|
||||
(state & (int)DEVICE_STATE.ACTIVE) != 0 ? "OK" :
|
||||
(state & (int)DEVICE_STATE.DISABLED) != 0 ? "Disabled" :
|
||||
(state & (int)DEVICE_STATE.UNPLUGGED) != 0 ? "Unplugged" :
|
||||
(state & (int)DEVICE_STATE.NOTPRESENT) != 0 ? "NotPresent" :
|
||||
"Unknown";
|
||||
|
||||
result.Add(new DeviceInfo { Name = name, Id = id, Type = type, Status = status });
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
}
|
||||
"@
|
||||
|
||||
Add-Type -TypeDefinition $code -Language CSharp -ErrorAction Stop
|
||||
|
||||
$devices = [CoreAudioEnumerator]::GetAll() | ForEach-Object {
|
||||
[PSCustomObject]@{
|
||||
FriendlyName = $_.Name
|
||||
InstanceId = $_.Id
|
||||
Type = $_.Type
|
||||
Status = $_.Status
|
||||
}
|
||||
}
|
||||
|
||||
$devices | ConvertTo-Json
|
||||
"""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
[
|
||||
"powershell.exe",
|
||||
"-NoProfile",
|
||||
"-NonInteractive",
|
||||
"-Command",
|
||||
ps_script,
|
||||
],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
encoding="utf-8",
|
||||
timeout=30,
|
||||
)
|
||||
|
||||
if result.returncode != 0:
|
||||
err = result.stderr.strip() if result.stderr else f"exit code {result.returncode}"
|
||||
typer.echo(typer.style(f"PowerShell (CoreAudio) command failed: {err}", fg=typer.colors.RED))
|
||||
return []
|
||||
|
||||
output = result.stdout.strip()
|
||||
if not output:
|
||||
typer.echo(typer.style("PowerShell (CoreAudio) returned empty output", fg=typer.colors.YELLOW))
|
||||
return []
|
||||
|
||||
data = json.loads(output)
|
||||
items = data if isinstance(data, list) else [data]
|
||||
|
||||
devices: List[WindowsAudioDevice] = []
|
||||
for it in items:
|
||||
name = (it.get("FriendlyName") or "").strip()
|
||||
instance_id = it.get("InstanceId") or ""
|
||||
dev_type = it.get("Type") or "Unknown"
|
||||
status = it.get("Status") or "Unknown"
|
||||
|
||||
if not name:
|
||||
continue
|
||||
|
||||
devices.append(
|
||||
WindowsAudioDevice(
|
||||
name=name,
|
||||
status=status,
|
||||
device_type=dev_type,
|
||||
instance_id=instance_id,
|
||||
)
|
||||
)
|
||||
|
||||
return devices
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
typer.echo(typer.style(f"Failed to parse CoreAudio JSON output: {e}", fg=typer.colors.RED))
|
||||
return []
|
||||
except subprocess.TimeoutExpired:
|
||||
typer.echo(typer.style("PowerShell (CoreAudio) command timed out after 30 seconds", fg=typer.colors.RED))
|
||||
return []
|
||||
except Exception as e:
|
||||
typer.echo(typer.style(f"Unexpected error calling CoreAudio via PowerShell: {type(e).__name__}: {e}",
|
||||
fg=typer.colors.RED))
|
||||
return []
|
||||
|
||||
|
||||
def get_windows_default_devices() -> Dict[str, str]:
|
||||
"""Get Windows default audio devices via PowerShell."""
|
||||
ps_script = r"""
|
||||
[Console]::OutputEncoding = [System.Text.Encoding]::UTF8;
|
||||
|
||||
$code = @"
|
||||
using System;
|
||||
using System.Runtime.InteropServices;
|
||||
|
||||
[ComImport]
|
||||
[Guid("BCDE0395-E52F-467C-8E3D-C4579291692E")]
|
||||
class MMDeviceEnumerator {}
|
||||
|
||||
[Guid("A95664D2-9614-4F35-A746-DE8DB63617E6"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)]
|
||||
interface IMMDeviceEnumerator {
|
||||
int GetDefaultAudioEndpoint(EDataFlow dataFlow, ERole role, out IMMDevice ppEndpoint);
|
||||
}
|
||||
|
||||
enum EDataFlow { eRender = 0, eCapture = 1 }
|
||||
enum ERole { eConsole = 0, eMultimedia = 1, eCommunications = 2 }
|
||||
|
||||
[Guid("D666063F-1587-4E43-81F1-B948E807363F"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)]
|
||||
interface IMMDevice {
|
||||
int GetId(out string ppstrId);
|
||||
}
|
||||
|
||||
public static class DefaultDeviceHelper {
|
||||
public static string GetDefaultInputDevice() {
|
||||
try {
|
||||
var enumerator = (IMMDeviceEnumerator)new MMDeviceEnumerator();
|
||||
IMMDevice device;
|
||||
enumerator.GetDefaultAudioEndpoint(EDataFlow.eCapture, ERole.eConsole, out device);
|
||||
string id;
|
||||
device.GetId(out id);
|
||||
return id;
|
||||
} catch { return null; }
|
||||
}
|
||||
|
||||
public static string GetDefaultOutputDevice() {
|
||||
try {
|
||||
var enumerator = (IMMDeviceEnumerator)new MMDeviceEnumerator();
|
||||
IMMDevice device;
|
||||
enumerator.GetDefaultAudioEndpoint(EDataFlow.eRender, ERole.eConsole, out device);
|
||||
string id;
|
||||
device.GetId(out id);
|
||||
return id;
|
||||
} catch { return null; }
|
||||
}
|
||||
}
|
||||
"@
|
||||
|
||||
Add-Type -TypeDefinition $code -Language CSharp -ErrorAction Stop
|
||||
|
||||
$defaultInput = [DefaultDeviceHelper]::GetDefaultInputDevice()
|
||||
$defaultOutput = [DefaultDeviceHelper]::GetDefaultOutputDevice()
|
||||
|
||||
[PSCustomObject]@{
|
||||
DefaultInput = $defaultInput
|
||||
DefaultOutput = $defaultOutput
|
||||
} | ConvertTo-Json
|
||||
"""
|
||||
|
||||
try:
|
||||
result = subprocess.run([
|
||||
"powershell.exe",
|
||||
"-NoProfile",
|
||||
"-NonInteractive",
|
||||
"-Command",
|
||||
ps_script,
|
||||
], capture_output=True, text=True, encoding="utf-8", timeout=15)
|
||||
|
||||
if result.returncode != 0:
|
||||
return {"default_input": None, "default_output": None}
|
||||
|
||||
if not result.stdout.strip():
|
||||
return {"default_input": None, "default_output": None}
|
||||
|
||||
data = json.loads(result.stdout)
|
||||
return {
|
||||
"default_input": data.get("DefaultInput"),
|
||||
"default_output": data.get("DefaultOutput")
|
||||
}
|
||||
|
||||
except Exception:
|
||||
return {"default_input": None, "default_output": None}
|
||||
|
||||
|
||||
def get_windows_audio_devices() -> List[WindowsAudioDevice]:
|
||||
"""Get detailed Windows audio devices via PowerShell from WSL."""
|
||||
try:
|
||||
result = subprocess.run([
|
||||
"powershell.exe",
|
||||
"-NoProfile",
|
||||
"-NonInteractive",
|
||||
"-Command",
|
||||
# Force UTF-8 output from PowerShell before running the command
|
||||
"[Console]::OutputEncoding = [System.Text.Encoding]::UTF8; "
|
||||
"Get-PnpDevice -Class AudioEndpoint | "
|
||||
"Select-Object FriendlyName, Status, InstanceId | ConvertTo-Json"
|
||||
], capture_output=True, text=True, encoding="utf-8", timeout=15)
|
||||
|
||||
if result.returncode != 0:
|
||||
error_msg = result.stderr.strip() if result.stderr else f"PowerShell exit code: {result.returncode}"
|
||||
typer.echo(typer.style(f"PowerShell command failed: {error_msg}", fg=typer.colors.RED))
|
||||
return []
|
||||
|
||||
if not result.stdout.strip():
|
||||
typer.echo(typer.style("PowerShell returned empty output", fg=typer.colors.YELLOW))
|
||||
return []
|
||||
|
||||
data = json.loads(result.stdout)
|
||||
devices = data if isinstance(data, list) else [data]
|
||||
|
||||
windows_devices = []
|
||||
for device in devices:
|
||||
name = device.get('FriendlyName', '').strip()
|
||||
status = device.get('Status', 'Unknown')
|
||||
instance_id = device.get('InstanceId', '')
|
||||
|
||||
if not name:
|
||||
continue
|
||||
|
||||
# Determine device type from InstanceId
|
||||
device_type = _categorize_device_type(instance_id)
|
||||
|
||||
windows_devices.append(WindowsAudioDevice(
|
||||
name=name,
|
||||
status=status,
|
||||
device_type=device_type,
|
||||
instance_id=instance_id
|
||||
))
|
||||
|
||||
return windows_devices
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
typer.echo(typer.style(f"Failed to parse PowerShell JSON output: {e}", fg=typer.colors.RED))
|
||||
return []
|
||||
except subprocess.TimeoutExpired:
|
||||
typer.echo(typer.style("PowerShell command timed out after 15 seconds", fg=typer.colors.RED))
|
||||
return []
|
||||
except Exception as e:
|
||||
typer.echo(typer.style(f"Unexpected error calling PowerShell: {type(e).__name__}: {e}", fg=typer.colors.RED))
|
||||
return []
|
||||
|
||||
|
||||
def _categorize_device_type(instance_id: str) -> str:
|
||||
def _get_device_type(instance_id: str) -> str:
|
||||
"""Categorize device type based on InstanceId."""
|
||||
instance_lower = instance_id.lower()
|
||||
if '0.0.1.00000000' in instance_lower:
|
||||
return "Input"
|
||||
return "Recording"
|
||||
elif '0.0.0.00000000' in instance_lower:
|
||||
return "Output"
|
||||
return "Playback"
|
||||
else:
|
||||
return "Unknown"
|
||||
|
||||
|
||||
def get_linux_audio_devices() -> List[LinuxAudioDevice]:
|
||||
"""Get Linux audio devices using sounddevice."""
|
||||
try:
|
||||
devices = sd.query_devices()
|
||||
linux_devices = []
|
||||
|
||||
for i, device in enumerate(devices):
|
||||
hostapi_name = sd.query_hostapis(device['hostapi'])['name']
|
||||
|
||||
linux_devices.append(LinuxAudioDevice(
|
||||
device_id=i,
|
||||
name=device['name'],
|
||||
max_input_channels=device['max_input_channels'],
|
||||
max_output_channels=device['max_output_channels'],
|
||||
default_samplerate=device['default_samplerate'],
|
||||
hostapi_name=hostapi_name
|
||||
))
|
||||
|
||||
return linux_devices
|
||||
except Exception as e:
|
||||
typer.echo(typer.style(f"Error querying Linux audio devices: {e}", fg=typer.colors.RED, bold=True))
|
||||
raise typer.Exit(1)
|
||||
def _get_short_device_id(instance_id: str) -> str:
|
||||
"""Get device name based on InstanceId."""
|
||||
if len(instance_id) == 68:
|
||||
return instance_id[31:-1]
|
||||
elif len(instance_id) == 55:
|
||||
return instance_id[18:-1]
|
||||
else:
|
||||
return instance_id
|
||||
|
||||
|
||||
def display_windows_devices(devices: List[WindowsAudioDevice],
|
||||
sort_conf: Optional[SortConf] = None,
|
||||
filter_conf: Optional[FilterConf] = None
|
||||
) -> None:
|
||||
"""Display Windows audio devices with optimized formatting."""
|
||||
def _get_colored_bool(status: bool) -> str:
|
||||
"""Get colored status string based on status."""
|
||||
if status:
|
||||
return f"[green]{status}[/green]"
|
||||
else:
|
||||
return str(status)
|
||||
|
||||
|
||||
def _get_colored_status(status: str) -> str:
|
||||
"""Get colored status string based on status."""
|
||||
if status == "Unknown":
|
||||
return f"[red]{status}[/red]"
|
||||
else:
|
||||
return str(status)
|
||||
|
||||
|
||||
def list_devices(
|
||||
device: Optional[int] = typer.Argument(default=None, help="Show details of a device, by its Id"),
|
||||
pulse: bool = typer.Option(False, "--pulse", help="Show PulseAudio sources"),
|
||||
native: bool = typer.Option(False, "--native", help="Force native Linux view (WSL only)"),
|
||||
all_devices: bool = typer.Option(False, "--all", help="Display all devices, even those not connected"),
|
||||
sort_info: Optional[str] = typer.Option(None, "--sort", help="Sort by column"),
|
||||
desc_info: bool = typer.Option(False, "--desc", help="Set the sorting order to descending"),
|
||||
filter_info: Optional[str] = typer.Option(None, "--filter", help="Filter by a column value. Use column=value"),
|
||||
config_file: Optional[str] = typer.Option(None, "--config", "-c", help="Path to config file")
|
||||
):
|
||||
"""List audio devices."""
|
||||
|
||||
def _get_id(_instance_id):
|
||||
return _instance_id[31:39]
|
||||
# Load configuration for context
|
||||
config = AppConfig.load(config_file)
|
||||
|
||||
if not devices:
|
||||
typer.echo(typer.style("No Windows audio devices found!", fg=typer.colors.RED))
|
||||
# Determine title based on environment and options
|
||||
is_wsl = _detect_wsl()
|
||||
if is_wsl and not native:
|
||||
typer.echo(typer.style("WSL detected, showing Windows audio devices (use --native for WSL view)",
|
||||
fg=typer.colors.BLUE))
|
||||
result = get_audio_devices_windows_from_pnp_devices() if all_devices else get_audio_devices_windows()
|
||||
if not check_result(result):
|
||||
return
|
||||
|
||||
windows_devices = get_results(result.stdout)
|
||||
if all_devices:
|
||||
select_conf = [
|
||||
SelectConf(attr="Index", default=""),
|
||||
SelectConf(attr="InstanceId", to="ID", formatter=lambda item: _get_short_device_id(item.InstanceId)),
|
||||
SelectConf(attr="FriendlyName", to="Name"),
|
||||
SelectConf(attr="Type", formatter=lambda item: _get_device_type(item.InstanceId)),
|
||||
SelectConf(attr="Status", formatter=lambda item: _get_colored_status(item.Status)),
|
||||
]
|
||||
else:
|
||||
select_conf = [
|
||||
SelectConf(attr="Index"),
|
||||
SelectConf(attr="ID", formatter=lambda item: _get_short_device_id(item.ID)),
|
||||
SelectConf(attr="Name"),
|
||||
SelectConf(attr="Type"),
|
||||
SelectConf(attr="Default", formatter=lambda item: _get_colored_bool(item.Default)),
|
||||
]
|
||||
windows_devices = select(windows_devices, select_conf)
|
||||
|
||||
# apply sorting and filtering
|
||||
windows_devices = filter_and_sort(windows_devices, filter_info, sort_info, desc_info)
|
||||
|
||||
display_as_table(windows_devices)
|
||||
|
||||
|
||||
else:
|
||||
if native and not is_wsl:
|
||||
typer.echo(typer.style("Native is applicable only when WSL is detected.", fg=typer.colors.RED))
|
||||
return
|
||||
|
||||
if all_devices and native:
|
||||
typer.echo(typer.style("All devices is not applicable with native mode.", fg=typer.colors.RED))
|
||||
return
|
||||
|
||||
result = get_audio_devices_linux()
|
||||
if not check_result(result):
|
||||
return
|
||||
|
||||
linux_devices = result.result
|
||||
# apply sorting and filtering
|
||||
linux_devices = filter_and_sort(linux_devices, filter_info, sort_info, desc_info)
|
||||
|
||||
display_as_table(linux_devices)
|
||||
|
||||
# Show legend
|
||||
typer.echo(f"\nLegend:")
|
||||
typer.echo(typer.style(" * = Configured device", fg=typer.colors.YELLOW))
|
||||
typer.echo(f" IN/OUT = Input/Output channel count")
|
||||
|
||||
|
||||
def install():
|
||||
result = install_audio_cmdlets()
|
||||
if not check_result(result):
|
||||
return
|
||||
|
||||
# Get default devices
|
||||
default_devices = get_windows_default_devices()
|
||||
default_input_id = default_devices.get("default_input")
|
||||
default_output_id = default_devices.get("default_output")
|
||||
|
||||
if not devices:
|
||||
typer.echo(typer.style("No audio devices found!", fg=typer.colors.RED))
|
||||
|
||||
# Create rich table
|
||||
table = Table(show_header=True)
|
||||
table.add_column("ID", width=8, no_wrap=True, overflow="ellipsis")
|
||||
table.add_column("Name", width=55, no_wrap=True, overflow="ellipsis")
|
||||
table.add_column("Type", width=8, justify="center")
|
||||
table.add_column("Status", width=3, justify="center")
|
||||
table.add_column("*", width=2, justify="center")
|
||||
|
||||
# Add devices to table
|
||||
rows = [] # list of dict
|
||||
for device in devices:
|
||||
# Determine if device is selected (default)
|
||||
is_selected = False
|
||||
if device.device_type == "Input" and device.instance_id == default_input_id:
|
||||
is_selected = True
|
||||
elif device.device_type == "Output" and device.instance_id == default_output_id:
|
||||
is_selected = True
|
||||
|
||||
selected_marker = "*" if is_selected else ""
|
||||
|
||||
# Color coding for status
|
||||
if device.status == "OK":
|
||||
status_text = f"[green]{device.status}[/green]"
|
||||
else:
|
||||
status_text = f"[red]{device.status}[/red]"
|
||||
|
||||
# Style selected row
|
||||
if is_selected:
|
||||
row_style = "bold"
|
||||
id_text = f"[bold]{_get_id(device.instance_id)}[/bold]"
|
||||
name_text = f"[bold]{device.name}[/bold]"
|
||||
type_text = f"[bold]{device.device_type}[/bold]"
|
||||
selected_text = f"[bold yellow]{selected_marker}[/bold yellow]"
|
||||
else:
|
||||
row_style = None
|
||||
id_text = _get_id(device.instance_id)
|
||||
name_text = device.name
|
||||
type_text = device.device_type
|
||||
selected_text = selected_marker
|
||||
|
||||
rows.append({
|
||||
"id": id_text,
|
||||
"name": name_text,
|
||||
"type": type_text,
|
||||
"status": status_text,
|
||||
"selected": selected_text
|
||||
})
|
||||
|
||||
rows = sort_by(rows, sort_conf)
|
||||
rows = filter_by(rows, filter_conf)
|
||||
|
||||
for row in rows:
|
||||
table.add_row(*row.values())
|
||||
|
||||
# Display table
|
||||
console = Console()
|
||||
console.print(table)
|
||||
typer.echo(result.stdout)
|
||||
|
||||
|
||||
def display_linux_devices(devices: List[LinuxAudioDevice],
|
||||
def display_linux_devices(devices: list,
|
||||
config: AppConfig,
|
||||
show_pulse: bool = False,
|
||||
sort_conf: Optional[SortConf] = None) -> None:
|
||||
@@ -625,14 +208,14 @@ def display_linux_devices(devices: List[LinuxAudioDevice],
|
||||
|
||||
|
||||
# Legacy function for backwards compatibility
|
||||
def get_audio_devices():
|
||||
"""Get comprehensive information about audio devices."""
|
||||
try:
|
||||
devices = sd.query_devices()
|
||||
return devices
|
||||
except Exception as e:
|
||||
typer.echo(typer.style(f"Error querying audio devices: {e}", fg=typer.colors.RED, bold=True))
|
||||
raise typer.Exit(1)
|
||||
# def get_audio_devices():
|
||||
# """Get comprehensive information about audio devices."""
|
||||
# try:
|
||||
# devices = sd.query_devices()
|
||||
# return devices
|
||||
# except Exception as e:
|
||||
# typer.echo(typer.style(f"Error querying audio devices: {e}", fg=typer.colors.RED, bold=True))
|
||||
# raise typer.Exit(1)
|
||||
|
||||
|
||||
def get_pulseaudio_sources():
|
||||
@@ -658,6 +241,7 @@ def test_device(
|
||||
device: Optional[int] = typer.Option(None, "--device", "-d", help="Device ID to test (default: configured device)"),
|
||||
duration: float = typer.Option(3.0, "--duration", help="Recording duration in seconds"),
|
||||
save: bool = typer.Option(False, "--save", help="Save recording to WAV file"),
|
||||
play: bool = typer.Option(False, "--play", help="Play recorded audio through default speakers"),
|
||||
config_file: Optional[str] = typer.Option(None, "--config", "-c", help="Path to config file")
|
||||
):
|
||||
"""Test audio recording from a specific device."""
|
||||
@@ -672,7 +256,11 @@ def test_device(
|
||||
typer.echo("=" * 20)
|
||||
|
||||
# Get device info
|
||||
devices_list = get_linux_audio_devices()
|
||||
result = get_audio_devices_linux()
|
||||
if not check_result(result):
|
||||
return
|
||||
|
||||
devices_list = result.result
|
||||
|
||||
if test_device_id is not None:
|
||||
if test_device_id >= len(devices_list):
|
||||
@@ -726,72 +314,20 @@ def test_device(
|
||||
else:
|
||||
typer.echo(typer.style(" Status: Good signal level", fg=typer.colors.GREEN))
|
||||
|
||||
# Play recorded audio if requested
|
||||
if play:
|
||||
typer.echo(f"\n{typer.style('Playing recorded audio...', fg=typer.colors.CYAN, bold=True)}")
|
||||
sd.play(audio_data, samplerate=config.audio.sample_rate)
|
||||
sd.wait()
|
||||
typer.echo(typer.style("Playback completed!", fg=typer.colors.CYAN))
|
||||
|
||||
# Save if requested
|
||||
if save:
|
||||
filename = f"test_device_{test_device_id or 'default'}_{int(duration)}s.wav"
|
||||
# Note: WyomingAudioRecorder is not defined in the current code
|
||||
# This would need to be implemented or the save functionality modified
|
||||
typer.echo(f" Save functionality needs to be implemented")
|
||||
from scipy.io import wavfile
|
||||
wavfile.write(filename, config.audio.sample_rate, audio_int16)
|
||||
typer.echo(typer.style(f"Audio saved to: {filename}", fg=typer.colors.MAGENTA, bold=True))
|
||||
|
||||
except Exception as e:
|
||||
typer.echo(typer.style(f"Recording failed: {e}", fg=typer.colors.RED, bold=True))
|
||||
raise typer.Exit(1)
|
||||
|
||||
|
||||
def config_info(
|
||||
config_file: Optional[str] = typer.Option(None, "--config", "-c", help="Path to config file")
|
||||
):
|
||||
"""Show current audio configuration."""
|
||||
|
||||
# Load configuration
|
||||
config = AppConfig.load(config_file)
|
||||
|
||||
typer.echo(typer.style("Audio Configuration", fg=typer.colors.BLUE, bold=True))
|
||||
typer.echo("=" * 21)
|
||||
|
||||
# Show current settings
|
||||
typer.echo(f"\nCurrent settings:")
|
||||
typer.echo(f" Sample rate: {typer.style(f'{config.audio.sample_rate} Hz', fg=typer.colors.CYAN)}")
|
||||
typer.echo(f" Channels: {typer.style(str(config.audio.channels), fg=typer.colors.CYAN)}")
|
||||
|
||||
if config.audio.device is not None:
|
||||
typer.echo(f" Device: {typer.style(str(config.audio.device), fg=typer.colors.CYAN)}")
|
||||
|
||||
# Show device details
|
||||
try:
|
||||
devices_list = get_linux_audio_devices()
|
||||
if config.audio.device < len(devices_list):
|
||||
device = devices_list[config.audio.device]
|
||||
typer.echo(f"\nConfigured device details:")
|
||||
typer.echo(f" Name: {device['name']}")
|
||||
typer.echo(f" Input channels: {device['max_input_channels']}")
|
||||
typer.echo(f" Default rate: {int(device['default_samplerate'])} Hz")
|
||||
|
||||
if device['max_input_channels'] == 0:
|
||||
typer.echo(typer.style(" Warning: This device has no input channels!", fg=typer.colors.RED))
|
||||
else:
|
||||
typer.echo(typer.style(f" Warning: Configured device {config.audio.device} not found!",
|
||||
fg=typer.colors.RED, bold=True))
|
||||
except Exception as e:
|
||||
typer.echo(typer.style(f" Error getting device info: {e}", fg=typer.colors.RED))
|
||||
else:
|
||||
typer.echo(f" Device: {typer.style('default', fg=typer.colors.CYAN)}")
|
||||
|
||||
# Show default device info
|
||||
try:
|
||||
default_device = sd.default.device
|
||||
if hasattr(default_device, '__len__') and len(default_device) >= 2:
|
||||
default_input = int(default_device[0])
|
||||
else:
|
||||
default_input = int(default_device)
|
||||
|
||||
devices_list = get_linux_audio_devices()
|
||||
device = devices_list[default_input]
|
||||
typer.echo(f"\nDefault input device:")
|
||||
typer.echo(f" ID: {default_input}")
|
||||
typer.echo(f" Name: {device['name']}")
|
||||
typer.echo(f" Input channels: {device['max_input_channels']}")
|
||||
except Exception as e:
|
||||
typer.echo(typer.style(f" Error getting default device: {e}", fg=typer.colors.RED))
|
||||
|
||||
# Show configuration source info
|
||||
|
||||
@@ -4,7 +4,61 @@ from contextlib import closing
|
||||
import typer
|
||||
from typing import Optional
|
||||
|
||||
from wyoming.audio import AudioStart, AudioChunk, AudioStop
|
||||
|
||||
from ..config import AppConfig
|
||||
import io
|
||||
import wave
|
||||
import numpy as np
|
||||
import sounddevice as sd
|
||||
import asyncio
|
||||
from wyoming.client import AsyncTcpClient
|
||||
from wyoming.asr import Transcribe, Transcript
|
||||
|
||||
|
||||
async def _async_transcribe(host: str, port: int, timeout: float, pcm_bytes: bytes, lang: str) -> Optional[str]:
|
||||
"""Stream raw PCM data to Wyoming ASR and return transcript text."""
|
||||
# Instantiate the async TCP client
|
||||
client = AsyncTcpClient(host, port)
|
||||
|
||||
# Audio parameters
|
||||
rate = 16000
|
||||
width = 2 # 16-bit
|
||||
channels = 1
|
||||
|
||||
# The client instance is an async context manager.
|
||||
async with client:
|
||||
# 1. Send transcription request
|
||||
await client.write_event(Transcribe(language=lang).event())
|
||||
|
||||
# 2. Start the audio stream
|
||||
await client.write_event(AudioStart(rate, width, channels).event())
|
||||
|
||||
# 3. Send audio chunks
|
||||
chunk_size = 2048 # A reasonable chunk size
|
||||
for i in range(0, len(pcm_bytes), chunk_size):
|
||||
chunk_bytes = pcm_bytes[i:i + chunk_size]
|
||||
await client.write_event(AudioChunk(audio=chunk_bytes, rate=rate, width=width, channels=channels).event())
|
||||
|
||||
# 4. Stop the audio stream
|
||||
await client.write_event(AudioStop().event())
|
||||
|
||||
# 5. Read events until a transcript arrives
|
||||
transcript_text = None
|
||||
try:
|
||||
while True:
|
||||
event = await asyncio.wait_for(client.read_event(), timeout=timeout)
|
||||
if event is None:
|
||||
break
|
||||
|
||||
if Transcript.is_type(event.type):
|
||||
tr = Transcript.from_event(event)
|
||||
transcript_text = tr.text
|
||||
break
|
||||
except asyncio.TimeoutError:
|
||||
typer.echo(typer.style("Connection timed out waiting for transcript.", fg=typer.colors.YELLOW))
|
||||
|
||||
return transcript_text
|
||||
|
||||
|
||||
def check_wyoming_server(host: str, port: int, timeout: float = 3.0) -> tuple[bool, float | None, str | None]:
|
||||
@@ -57,4 +111,67 @@ def check(
|
||||
typer.echo(typer.style("Wyoming server unreachable!", fg=typer.colors.RED, bold=True))
|
||||
typer.echo(f"Server: {final_host}:{final_port}")
|
||||
typer.echo(typer.style(f"Error: {error}", fg=typer.colors.RED))
|
||||
raise typer.Exit(1)
|
||||
raise typer.Exit(1)
|
||||
|
||||
|
||||
def test(
|
||||
duration: float = typer.Option(3.0, "--duration", help="Recording duration in seconds"),
|
||||
lang: str = typer.Option("fr", "--lang", help="Language code: 'fr' or 'en'"),
|
||||
host: Optional[str] = typer.Option(None, "--host", "-h", help="Wyoming server host"),
|
||||
port: Optional[int] = typer.Option(None, "--port", "-p", help="Wyoming server port"),
|
||||
timeout: Optional[float] = typer.Option(None, "--timeout", "-t", help="Connection timeout in seconds"),
|
||||
config_file: Optional[str] = typer.Option(None, "--config", "-c", help="Path to config file")
|
||||
):
|
||||
"""Record from default microphone, send to Wyoming ASR server, and print transcription."""
|
||||
# Load configuration
|
||||
config = AppConfig.load(config_file)
|
||||
final_host = host or config.server.host
|
||||
final_port = port or config.server.port
|
||||
final_timeout = timeout or config.server.timeout
|
||||
|
||||
# Validate language (two-letter code)
|
||||
lang = (lang or "fr").strip().lower()
|
||||
if lang not in ("fr", "en"):
|
||||
typer.echo(typer.style("Invalid --lang. Use 'fr' or 'en'.", fg=typer.colors.RED))
|
||||
raise typer.Exit(2)
|
||||
|
||||
# Check server reachability first
|
||||
reachable, latency, err = check_wyoming_server(final_host, final_port, final_timeout)
|
||||
if not reachable:
|
||||
typer.echo(typer.style(f"Cannot reach Wyoming server at {final_host}:{final_port}: {err}", fg=typer.colors.RED))
|
||||
raise typer.Exit(1)
|
||||
|
||||
# Record audio (16 kHz mono float32)
|
||||
sample_rate = 16000
|
||||
channels = 1
|
||||
typer.echo(typer.style("Recording...", fg=typer.colors.GREEN, bold=True))
|
||||
try:
|
||||
frames = int(duration * sample_rate)
|
||||
audio = sd.rec(frames, samplerate=sample_rate, channels=channels, dtype="float32")
|
||||
sd.wait()
|
||||
except Exception as e:
|
||||
typer.echo(typer.style(f"Audio recording failed: {e}", fg=typer.colors.RED))
|
||||
raise typer.Exit(1)
|
||||
|
||||
# Convert to PCM16 bytes directly, no need for WAV wrapper
|
||||
audio_int16 = np.clip(audio.flatten() * 32767.0, -32768, 32767).astype(np.int16)
|
||||
pcm_bytes = audio_int16.tobytes()
|
||||
|
||||
# Send to Wyoming ASR (async)
|
||||
try:
|
||||
typer.echo(typer.style(f"Connecting to {final_host}:{final_port} (lang={lang})...", fg=typer.colors.CYAN))
|
||||
|
||||
# Run the async helper
|
||||
transcript_text = asyncio.run(
|
||||
_async_transcribe(final_host, final_port, final_timeout, pcm_bytes, lang)
|
||||
)
|
||||
|
||||
if transcript_text:
|
||||
typer.echo(typer.style("\nTranscription:", fg=typer.colors.GREEN, bold=True))
|
||||
typer.echo(transcript_text)
|
||||
else:
|
||||
typer.echo(typer.style("No transcription received.", fg=typer.colors.YELLOW))
|
||||
|
||||
except Exception as e:
|
||||
typer.echo(typer.style(f"ASR request failed: {e}", fg=typer.colors.RED))
|
||||
raise typer.Exit(1)
|
||||
|
||||
107
src/core/Expando.py
Normal file
107
src/core/Expando.py
Normal file
@@ -0,0 +1,107 @@
|
||||
NO_DEFAULT = object()
|
||||
|
||||
|
||||
class Expando:
|
||||
"""
|
||||
Readonly dynamic class that eases the access to attributes and sub attributes
|
||||
It is initialized with a dict
|
||||
You can then access the property using dot '.' (ex. obj.prop1.prop2)
|
||||
"""
|
||||
|
||||
def __init__(self, props=None, **kwargs):
|
||||
self._props = props.copy() if props else {}
|
||||
if kwargs:
|
||||
self._props.update(kwargs)
|
||||
|
||||
def __getattr__(self, item):
|
||||
if item not in self._props:
|
||||
raise AttributeError(item)
|
||||
|
||||
current = self._props[item]
|
||||
return Expando(current) if isinstance(current, dict) else current
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
self._props[key] = value
|
||||
|
||||
def get(self, path, default=None):
|
||||
"""
|
||||
returns the value, from a string with represents the path
|
||||
:param path:
|
||||
:param default: value to return if path is not found
|
||||
:return:
|
||||
"""
|
||||
current = self._props
|
||||
for attr in path.split("."):
|
||||
if isinstance(current, list):
|
||||
temp = []
|
||||
for value in current:
|
||||
if value and attr in value:
|
||||
temp.append(value[attr])
|
||||
current = temp
|
||||
|
||||
else:
|
||||
if current is None or attr not in current:
|
||||
return default
|
||||
current = current[attr]
|
||||
|
||||
return current
|
||||
|
||||
def geti(self, item, default=NO_DEFAULT):
|
||||
"""
|
||||
insensitive get
|
||||
:param item:
|
||||
:param default:
|
||||
:return:
|
||||
"""
|
||||
if item in self._props:
|
||||
return self._props[item]
|
||||
|
||||
for k, v in self._props.items():
|
||||
if k.lower() == item.lower():
|
||||
return v
|
||||
|
||||
if default is not NO_DEFAULT:
|
||||
return default
|
||||
|
||||
raise AttributeError(f"Item '{item}' not found")
|
||||
|
||||
def as_dict(self):
|
||||
"""
|
||||
Return the information as a dictionary
|
||||
:return:
|
||||
"""
|
||||
return self._props.copy()
|
||||
|
||||
def to_dict(self, mappings: dict) -> dict:
|
||||
"""
|
||||
Return the information as a dictionary, with the given mappings
|
||||
"""
|
||||
return {prop_name: self.get(path) for path, prop_name in mappings.items() if prop_name is not None}
|
||||
|
||||
def __hasattr__(self, item):
|
||||
return item in self._props
|
||||
|
||||
def values(self):
|
||||
return self._props.values()
|
||||
|
||||
def copy(self):
|
||||
return Expando(self._props.copy())
|
||||
|
||||
def __repr__(self):
|
||||
if "key" in self._props:
|
||||
return f"Expando(key={self._props["key"]})"
|
||||
|
||||
props_as_str = str(self._props)
|
||||
if len(props_as_str) > 50:
|
||||
props_as_str = props_as_str[:50] + "..."
|
||||
|
||||
return f"Expando({props_as_str})"
|
||||
|
||||
def __eq__(self, other):
|
||||
if not isinstance(other, Expando):
|
||||
return False
|
||||
|
||||
return self._props == other._props
|
||||
|
||||
def __hash__(self):
|
||||
return hash(tuple(sorted(self._props.items())))
|
||||
0
src/core/__init__.py
Normal file
0
src/core/__init__.py
Normal file
257
src/core/utils.py
Normal file
257
src/core/utils.py
Normal file
@@ -0,0 +1,257 @@
|
||||
import json
|
||||
import subprocess
|
||||
from dataclasses import dataclass
|
||||
from subprocess import CompletedProcess
|
||||
from typing import Any, Callable
|
||||
|
||||
import typer
|
||||
from rich.console import Console
|
||||
from rich.table import Table
|
||||
|
||||
from .Expando import Expando
|
||||
|
||||
|
||||
@dataclass
|
||||
class SortConf:
|
||||
property: str
|
||||
ascending: bool = True
|
||||
|
||||
|
||||
@dataclass
|
||||
class FilterConf:
|
||||
property: str
|
||||
value: str
|
||||
|
||||
@staticmethod
|
||||
def new(text):
|
||||
"""Initialize FilterConf from text."""
|
||||
if text is None:
|
||||
return None
|
||||
|
||||
parts = text.split("=")
|
||||
if len(parts) != 2:
|
||||
return None
|
||||
|
||||
if not parts[0].strip() or not parts[1].strip():
|
||||
return None
|
||||
|
||||
return FilterConf(parts[0].strip(), parts[1].strip())
|
||||
|
||||
|
||||
@dataclass
|
||||
class SelectConf:
|
||||
attr: str # property nam to select
|
||||
to: str = None # rename the property
|
||||
default: str = None # value to use if property is missing
|
||||
formatter: Callable[[Any], Any] = None # function to apply to the property using the whole item
|
||||
|
||||
|
||||
@dataclass
|
||||
class ProcessResult:
|
||||
result: Any
|
||||
error: str = None
|
||||
|
||||
@property
|
||||
def stderr(self):
|
||||
return self.error
|
||||
|
||||
@property
|
||||
def returncode(self):
|
||||
return 0 if self.result is not None else 1
|
||||
|
||||
def sort_by(items: list[Expando], sort_conf: SortConf):
|
||||
"""Sort a list of items by a given property."""
|
||||
|
||||
def _safe_sort_key(item):
|
||||
"""Get sort key with safe handling of None values."""
|
||||
value = item.geti(property_name, "")
|
||||
# Convert None to empty string for consistent sorting
|
||||
return "" if value is None else str(value)
|
||||
|
||||
if sort_conf is None:
|
||||
return items
|
||||
|
||||
if len(items) == 0:
|
||||
return items
|
||||
|
||||
if sort_conf.property is None:
|
||||
return items
|
||||
|
||||
property_name = sort_conf.property.strip()
|
||||
return sorted(items, key=_safe_sort_key, reverse=not sort_conf.ascending)
|
||||
|
||||
|
||||
def filter_by(items, filter_conf: FilterConf):
|
||||
"""Filter a list of items by a given property."""
|
||||
|
||||
def _contains(item, value):
|
||||
return str(value).lower() in str(item).lower()
|
||||
|
||||
if (filter_conf is None
|
||||
or filter_conf.property is None
|
||||
or len(items) == 0):
|
||||
return items
|
||||
|
||||
predicate = _contains
|
||||
property_name = filter_conf.property.strip()
|
||||
|
||||
return [
|
||||
item for item in items if predicate(item.geti(property_name), filter_conf.value)
|
||||
]
|
||||
|
||||
|
||||
def select(items: list[Expando], settings: list[SelectConf]) -> list[Expando]:
|
||||
res = []
|
||||
|
||||
for item in items:
|
||||
new_item = {}
|
||||
for setting in settings:
|
||||
attr = setting.attr.strip()
|
||||
|
||||
key = setting.to or attr
|
||||
value = setting.formatter(item) if setting.formatter else item.get(attr, setting.default)
|
||||
new_item[key] = value
|
||||
|
||||
res.append(Expando(new_item))
|
||||
|
||||
return res
|
||||
|
||||
|
||||
def run_ps_command(command: str) -> CompletedProcess[str]:
|
||||
"""Run a PowerShell command and return the output"""
|
||||
completed = subprocess.run(
|
||||
["powershell.exe",
|
||||
"-NoProfile",
|
||||
"-ExecutionPolicy",
|
||||
"Bypass",
|
||||
"-Command",
|
||||
"[Console]::OutputEncoding = [System.Text.Encoding]::UTF8;",
|
||||
command],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
encoding="utf-8", timeout=5
|
||||
)
|
||||
|
||||
return completed
|
||||
|
||||
|
||||
def run_ps_command_live(command: str) -> CompletedProcess[str]:
|
||||
"""Run a PowerShell command and display output in real-time"""
|
||||
process = subprocess.Popen(
|
||||
["powershell.exe",
|
||||
"-NoProfile",
|
||||
"-ExecutionPolicy",
|
||||
"Bypass",
|
||||
"-Command",
|
||||
"[Console]::OutputEncoding = [System.Text.Encoding]::UTF8;",
|
||||
command],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
text=True,
|
||||
encoding="utf-8"
|
||||
)
|
||||
|
||||
stdout_lines = []
|
||||
stderr_lines = []
|
||||
|
||||
# Read output in real-time
|
||||
while True:
|
||||
stdout_line = process.stdout.readline()
|
||||
stderr_line = process.stderr.readline()
|
||||
|
||||
if stdout_line:
|
||||
print(stdout_line.rstrip())
|
||||
stdout_lines.append(stdout_line)
|
||||
|
||||
if stderr_line:
|
||||
print(f"ERROR: {stderr_line.rstrip()}")
|
||||
stderr_lines.append(stderr_line)
|
||||
|
||||
# Check if process is finished
|
||||
if process.poll() is not None:
|
||||
break
|
||||
|
||||
# Read any remaining lines
|
||||
remaining_stdout, remaining_stderr = process.communicate()
|
||||
if remaining_stdout:
|
||||
print(remaining_stdout.rstrip())
|
||||
stdout_lines.append(remaining_stdout)
|
||||
if remaining_stderr:
|
||||
print(f"ERROR: {remaining_stderr.rstrip()}")
|
||||
stderr_lines.append(remaining_stderr)
|
||||
|
||||
# Create CompletedProcess object to maintain compatibility
|
||||
completed = CompletedProcess(
|
||||
args=["powershell.exe", "-NoProfile", "-ExecutionPolicy", "Bypass", "-Command", command],
|
||||
returncode=process.returncode,
|
||||
stdout=''.join(stdout_lines),
|
||||
stderr=''.join(stderr_lines)
|
||||
)
|
||||
|
||||
return completed
|
||||
|
||||
|
||||
def get_results(stdout: str) -> list[Expando]:
|
||||
stripped = stdout.strip()
|
||||
if not stripped:
|
||||
typer.echo(typer.style("PowerShell returned empty output", fg=typer.colors.YELLOW))
|
||||
return []
|
||||
|
||||
data = json.loads(stripped)
|
||||
as_list = data if isinstance(data, list) else [data]
|
||||
|
||||
return [Expando(item) for item in as_list]
|
||||
|
||||
|
||||
def display_as_table(result, columns_settings: list = None):
|
||||
def _create_default_columns_settings():
|
||||
if len(result) == 0:
|
||||
return []
|
||||
|
||||
blue_print = result[0]
|
||||
return [{"name": attr} for attr in blue_print.as_dict().keys()]
|
||||
|
||||
if columns_settings is None:
|
||||
columns_settings = _create_default_columns_settings()
|
||||
|
||||
formatters = {}
|
||||
table = Table(show_header=True)
|
||||
for col in [c for c in columns_settings if "name" in c]:
|
||||
col_name = col["name"]
|
||||
table.add_column(col_name, **col["attrs"] if "attrs" in col else {})
|
||||
formatter = col.pop("formatter", None)
|
||||
if formatter:
|
||||
formatters[col_name] = formatter
|
||||
|
||||
for item in result:
|
||||
cloned = item.copy()
|
||||
# apply formatters
|
||||
for col, formatter in formatters.items():
|
||||
cloned[col] = formatter(item)
|
||||
|
||||
table.add_row(*[str(value) for value in cloned.values()])
|
||||
|
||||
# Display table
|
||||
console = Console()
|
||||
console.print(table)
|
||||
|
||||
|
||||
def check_result(result):
|
||||
if result.returncode != 0:
|
||||
error_msg = result.stderr.strip() if result.stderr else f"PowerShell exit code: {result.returncode}"
|
||||
typer.echo(typer.style(f"PowerShell command failed: {error_msg}", fg=typer.colors.RED))
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def filter_and_sort(items, filter_info, sort_info, desc_info=False):
|
||||
if filter_info:
|
||||
filter_config = FilterConf.new(filter_info) if filter_by else None
|
||||
items = filter_by(items, filter_config)
|
||||
|
||||
if sort_info:
|
||||
sort_config = SortConf(sort_info, not desc_info) if sort_by else None
|
||||
items = sort_by(items, sort_config)
|
||||
|
||||
return items
|
||||
78
src/devices.py
Normal file
78
src/devices.py
Normal file
@@ -0,0 +1,78 @@
|
||||
from subprocess import CompletedProcess
|
||||
import sounddevice as sd
|
||||
|
||||
from .core.Expando import Expando
|
||||
from .core.utils import run_ps_command, run_ps_command_live, ProcessResult
|
||||
|
||||
|
||||
def _is_audio_device_cmdlets_installed():
|
||||
# PowerShell command to check if module is available
|
||||
ps_command = "Get-Module -ListAvailable -Name AudioDeviceCmdlets"
|
||||
|
||||
result = run_ps_command(ps_command)
|
||||
|
||||
# If something is returned in stdout, the module is installed
|
||||
return bool(result.stdout.strip())
|
||||
|
||||
|
||||
def install_audio_cmdlets():
|
||||
"""Install AudioDevice cmdlet for better audio device management"""
|
||||
if _is_audio_device_cmdlets_installed():
|
||||
return CompletedProcess(
|
||||
args=["is_audio_device_cmdlets_installed()"],
|
||||
returncode=0,
|
||||
stdout="AudioDevice cmdlet is already installed",
|
||||
stderr="")
|
||||
|
||||
ps_script = r"""
|
||||
# 1) Trust PSGallery
|
||||
Set-PSRepository -Name "PSGallery" -InstallationPolicy Trusted -ErrorAction SilentlyContinue
|
||||
|
||||
# 2) Ensure NuGet provider is available
|
||||
if (-not (Get-PackageProvider -Name NuGet -ErrorAction SilentlyContinue)) {
|
||||
Install-PackageProvider -Name NuGet -Force -Scope CurrentUser
|
||||
}
|
||||
|
||||
# 3) Install AudioDeviceCmdlets if missing
|
||||
if (-not (Get-Module -ListAvailable -Name AudioDeviceCmdlets)) {
|
||||
Install-Module -Name AudioDeviceCmdlets -Scope CurrentUser -Force -AllowClobber
|
||||
}
|
||||
|
||||
# 4) Import the module
|
||||
Import-Module AudioDeviceCmdlets -ErrorAction SilentlyContinue
|
||||
"""
|
||||
return run_ps_command_live(ps_script)
|
||||
|
||||
|
||||
def get_audio_devices_windows():
|
||||
"""Get all audio devices using AudioDevice cmdlet"""
|
||||
ps_script = "Get-AudioDevice -List | ConvertTo-Json"
|
||||
return run_ps_command(ps_script)
|
||||
|
||||
|
||||
def get_audio_devices_windows_from_pnp_devices():
|
||||
ps_script = "Get-PnpDevice -Class AudioEndpoint | Select-Object FriendlyName, Status, InstanceId | ConvertTo-Json"
|
||||
return run_ps_command(ps_script)
|
||||
|
||||
|
||||
def get_audio_devices_linux():
|
||||
"""Get all audio devices using sounddevice module"""
|
||||
try:
|
||||
devices = sd.query_devices()
|
||||
result = []
|
||||
|
||||
for i, device in enumerate(devices):
|
||||
hostapi_name = sd.query_hostapis(device['hostapi'])['name']
|
||||
|
||||
result.append(Expando({
|
||||
"device_id": i,
|
||||
"name": device['name'],
|
||||
"max_input_channels": device['max_input_channels'],
|
||||
"max_output_channels": device['max_output_channels'],
|
||||
"default_samplerate": device['default_samplerate'],
|
||||
"hostapi_name": hostapi_name}
|
||||
))
|
||||
|
||||
return ProcessResult(result)
|
||||
except Exception as e:
|
||||
ProcessResult(None, f"Error querying Linux audio devices: {e}")
|
||||
66
src/utils.py
66
src/utils.py
@@ -1,66 +0,0 @@
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass
|
||||
class SortConf:
|
||||
property: str
|
||||
ascending: bool = True
|
||||
|
||||
|
||||
@dataclass
|
||||
class FilterConf:
|
||||
property: str
|
||||
value: str
|
||||
|
||||
@staticmethod
|
||||
def new(text):
|
||||
"""Initialize FilterConf from text."""
|
||||
if text is None:
|
||||
return None
|
||||
|
||||
parts = text.split("=")
|
||||
if len(parts) != 2:
|
||||
return None
|
||||
|
||||
return FilterConf(parts[0].strip(), parts[1].strip())
|
||||
|
||||
|
||||
def sort_by(items, sort_conf: SortConf):
|
||||
"""Sort a list of items by a given property."""
|
||||
if sort_conf is None:
|
||||
return items
|
||||
|
||||
if len(items) == 0:
|
||||
return items
|
||||
|
||||
if isinstance(items[0], dict):
|
||||
property_name = sort_conf.property.lower().strip()
|
||||
return sorted(items, key=lambda item: item.get(property_name), reverse=not sort_conf.ascending)
|
||||
else:
|
||||
return sorted(items, key=lambda item: getattr(item, sort_conf.property), reverse=not sort_conf.ascending)
|
||||
|
||||
|
||||
def filter_by(items, filter_conf: FilterConf):
|
||||
"""Filter a list of items by a given property."""
|
||||
|
||||
def _contains(item, value):
|
||||
return str(value).lower() in str(item).lower()
|
||||
|
||||
if (filter_conf is None
|
||||
or filter_conf.property is None
|
||||
or len(items) == 0):
|
||||
return items
|
||||
|
||||
predicate = _contains
|
||||
|
||||
if isinstance(items[0], dict):
|
||||
property_name = filter_conf.property.lower().strip()
|
||||
return [
|
||||
item for item in items if predicate(item.get(property_name), filter_conf.value)
|
||||
]
|
||||
|
||||
else:
|
||||
|
||||
return [
|
||||
item for item in items if getattr(item, filter_conf.property) == filter_conf.value
|
||||
]
|
||||
@@ -1,407 +0,0 @@
|
||||
import pytest
|
||||
from unittest.mock import Mock, patch, mock_open, MagicMock
|
||||
import json
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
from src.commands.audio import (
|
||||
detect_wsl,
|
||||
get_windows_audio_devices,
|
||||
get_linux_audio_devices,
|
||||
_categorize_device_type,
|
||||
display_windows_devices,
|
||||
display_linux_devices,
|
||||
WindowsAudioDevice,
|
||||
LinuxAudioDevice
|
||||
)
|
||||
from src.config import AppConfig, AudioConfig, ServerConfig
|
||||
|
||||
|
||||
# WSL Detection Tests
|
||||
|
||||
@patch('pathlib.Path.exists')
|
||||
@patch('builtins.open', mock_open(read_data='Linux version 5.4.0-microsoft-standard'))
|
||||
def test_detect_wsl_via_proc_version_microsoft(mock_exists):
|
||||
"""Test WSL detection via /proc/version containing 'microsoft'."""
|
||||
mock_exists.return_value = True
|
||||
|
||||
result = detect_wsl()
|
||||
|
||||
assert result is True
|
||||
|
||||
|
||||
@patch('pathlib.Path.exists')
|
||||
@patch('builtins.open', mock_open(read_data='Linux version 5.4.0-wsl2-standard'))
|
||||
def test_detect_wsl_via_proc_version_wsl(mock_exists):
|
||||
"""Test WSL detection via /proc/version containing 'wsl'."""
|
||||
mock_exists.return_value = True
|
||||
|
||||
result = detect_wsl()
|
||||
|
||||
assert result is True
|
||||
|
||||
|
||||
@patch('pathlib.Path.exists')
|
||||
@patch('builtins.open', mock_open(read_data='Linux version 5.4.0-generic'))
|
||||
@patch('os.getenv')
|
||||
def test_detect_wsl_via_env_distro_name(mock_getenv, mock_exists):
|
||||
"""Test WSL detection via WSL_DISTRO_NAME environment variable."""
|
||||
mock_exists.return_value = True
|
||||
mock_getenv.side_effect = lambda key: 'Ubuntu' if key == 'WSL_DISTRO_NAME' else None
|
||||
|
||||
result = detect_wsl()
|
||||
|
||||
assert result is True
|
||||
|
||||
|
||||
@patch('pathlib.Path.exists')
|
||||
@patch('builtins.open', mock_open(read_data='Linux version 5.4.0-generic'))
|
||||
@patch('os.getenv')
|
||||
def test_detect_wsl_via_env_wslenv(mock_getenv, mock_exists):
|
||||
"""Test WSL detection via WSLENV environment variable."""
|
||||
mock_exists.return_value = True
|
||||
mock_getenv.side_effect = lambda key: 'PATH/l' if key == 'WSLENV' else None
|
||||
|
||||
result = detect_wsl()
|
||||
|
||||
assert result is True
|
||||
|
||||
|
||||
@patch('pathlib.Path.exists')
|
||||
@patch('builtins.open', mock_open(read_data='Linux version 5.4.0-generic'))
|
||||
@patch('os.getenv', return_value=None)
|
||||
def test_detect_wsl_false_native_linux(mock_getenv, mock_exists):
|
||||
"""Test WSL detection returns False on native Linux."""
|
||||
mock_exists.return_value = True
|
||||
|
||||
result = detect_wsl()
|
||||
|
||||
assert result is False
|
||||
|
||||
|
||||
@patch('pathlib.Path.exists', return_value=False)
|
||||
@patch('os.getenv', return_value=None)
|
||||
def test_detect_wsl_false_no_proc_version(mock_getenv, mock_exists):
|
||||
"""Test WSL detection returns False when /proc/version doesn't exist."""
|
||||
result = detect_wsl()
|
||||
|
||||
assert result is False
|
||||
|
||||
|
||||
@patch('pathlib.Path.exists')
|
||||
@patch('builtins.open', side_effect=Exception("Permission denied"))
|
||||
@patch('os.getenv', return_value=None)
|
||||
def test_detect_wsl_handles_proc_version_read_error(mock_getenv, mock_exists):
|
||||
"""Test WSL detection handles /proc/version read errors gracefully."""
|
||||
mock_exists.return_value = True
|
||||
|
||||
result = detect_wsl()
|
||||
|
||||
assert result is False
|
||||
|
||||
|
||||
# Device Type Categorization Tests
|
||||
|
||||
def test_categorize_input_device():
|
||||
"""Test categorization of input device."""
|
||||
instance_id = "SWD\\MMDEVAPI\\{0.0.1.00000000}.{a1b2c3d4-e5f6-7890-abcd-ef1234567890}\\capture"
|
||||
|
||||
result = _categorize_device_type(instance_id)
|
||||
|
||||
assert result == "Input"
|
||||
|
||||
|
||||
def test_categorize_output_device():
|
||||
"""Test categorization of output device."""
|
||||
instance_id = "SWD\\MMDEVAPI\\{0.0.0.00000000}.{a1b2c3d4-e5f6-7890-abcd-ef1234567890}\\render"
|
||||
|
||||
result = _categorize_device_type(instance_id)
|
||||
|
||||
assert result == "Output"
|
||||
|
||||
|
||||
def test_categorize_unknown_device():
|
||||
"""Test categorization of unknown device type."""
|
||||
instance_id = "SWD\\MMDEVAPI\\{0.0.2.00000000}.{a1b2c3d4-e5f6-7890-abcd-ef1234567890}\\unknown"
|
||||
|
||||
result = _categorize_device_type(instance_id)
|
||||
|
||||
assert result == "Unknown"
|
||||
|
||||
|
||||
def test_categorize_empty_instance_id():
|
||||
"""Test categorization with empty instance ID."""
|
||||
result = _categorize_device_type("")
|
||||
|
||||
assert result == "Unknown"
|
||||
|
||||
|
||||
# Windows Audio Devices Tests
|
||||
|
||||
@patch('subprocess.run')
|
||||
@patch('typer.echo')
|
||||
def test_get_windows_devices_success_single_device(mock_echo, mock_run):
|
||||
"""Test successful retrieval of single Windows device."""
|
||||
mock_result = Mock()
|
||||
mock_result.returncode = 0
|
||||
mock_result.stdout = json.dumps({
|
||||
"FriendlyName": "Blue Yeti Microphone",
|
||||
"Status": "OK",
|
||||
"InstanceId": "USB\\VID_0B05&PID_1234\\capture"
|
||||
})
|
||||
mock_result.stderr = ""
|
||||
mock_run.return_value = mock_result
|
||||
|
||||
result = get_windows_audio_devices()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].name == "Blue Yeti Microphone"
|
||||
assert result[0].status == "OK"
|
||||
assert result[0].device_type == "Input"
|
||||
assert "USB\\VID_0B05&PID_1234\\capture" in result[0].instance_id
|
||||
|
||||
|
||||
@patch('subprocess.run')
|
||||
@patch('typer.echo')
|
||||
def test_get_windows_devices_success_multiple_devices(mock_echo, mock_run):
|
||||
"""Test successful retrieval of multiple Windows devices."""
|
||||
mock_result = Mock()
|
||||
mock_result.returncode = 0
|
||||
mock_result.stdout = json.dumps([
|
||||
{
|
||||
"FriendlyName": "Microphone",
|
||||
"Status": "OK",
|
||||
"InstanceId": "capture_device"
|
||||
},
|
||||
{
|
||||
"FriendlyName": "Speakers",
|
||||
"Status": "OK",
|
||||
"InstanceId": "render_device"
|
||||
}
|
||||
])
|
||||
mock_result.stderr = ""
|
||||
mock_run.return_value = mock_result
|
||||
|
||||
result = get_windows_audio_devices()
|
||||
|
||||
assert len(result) == 2
|
||||
assert result[0].device_type == "Input"
|
||||
assert result[1].device_type == "Output"
|
||||
|
||||
|
||||
@patch('subprocess.run')
|
||||
@patch('typer.echo')
|
||||
def test_get_windows_devices_powershell_error(mock_echo, mock_run):
|
||||
"""Test PowerShell command failure."""
|
||||
mock_result = Mock()
|
||||
mock_result.returncode = 1
|
||||
mock_result.stdout = ""
|
||||
mock_result.stderr = "Get-PnpDevice : Access denied"
|
||||
mock_run.return_value = mock_result
|
||||
|
||||
result = get_windows_audio_devices()
|
||||
|
||||
assert result == []
|
||||
mock_echo.assert_called()
|
||||
|
||||
|
||||
@patch('subprocess.run')
|
||||
@patch('typer.echo')
|
||||
def test_get_windows_devices_empty_output(mock_echo, mock_run):
|
||||
"""Test PowerShell returning empty output."""
|
||||
mock_result = Mock()
|
||||
mock_result.returncode = 0
|
||||
mock_result.stdout = ""
|
||||
mock_result.stderr = ""
|
||||
mock_run.return_value = mock_result
|
||||
|
||||
result = get_windows_audio_devices()
|
||||
|
||||
assert result == []
|
||||
mock_echo.assert_called()
|
||||
|
||||
|
||||
@patch('subprocess.run')
|
||||
@patch('typer.echo')
|
||||
def test_get_windows_devices_invalid_json(mock_echo, mock_run):
|
||||
"""Test invalid JSON response from PowerShell."""
|
||||
mock_result = Mock()
|
||||
mock_result.returncode = 0
|
||||
mock_result.stdout = "Invalid JSON {"
|
||||
mock_result.stderr = ""
|
||||
mock_run.return_value = mock_result
|
||||
|
||||
result = get_windows_audio_devices()
|
||||
|
||||
assert result == []
|
||||
mock_echo.assert_called()
|
||||
|
||||
|
||||
@patch('subprocess.run', side_effect=subprocess.TimeoutExpired("powershell.exe", 15))
|
||||
@patch('typer.echo')
|
||||
def test_get_windows_devices_timeout(mock_echo, mock_run):
|
||||
"""Test PowerShell command timeout."""
|
||||
result = get_windows_audio_devices()
|
||||
|
||||
assert result == []
|
||||
mock_echo.assert_called()
|
||||
|
||||
|
||||
@patch('subprocess.run')
|
||||
@patch('typer.echo')
|
||||
def test_get_windows_devices_filters_empty_names(mock_echo, mock_run):
|
||||
"""Test filtering of devices with empty names."""
|
||||
mock_result = Mock()
|
||||
mock_result.returncode = 0
|
||||
mock_result.stdout = json.dumps([
|
||||
{
|
||||
"FriendlyName": "Valid Device",
|
||||
"Status": "OK",
|
||||
"InstanceId": "capture_device"
|
||||
},
|
||||
{
|
||||
"FriendlyName": "",
|
||||
"Status": "OK",
|
||||
"InstanceId": "empty_name_device"
|
||||
},
|
||||
{
|
||||
"FriendlyName": None,
|
||||
"Status": "OK",
|
||||
"InstanceId": "null_name_device"
|
||||
}
|
||||
])
|
||||
mock_result.stderr = ""
|
||||
mock_run.return_value = mock_result
|
||||
|
||||
result = get_windows_audio_devices()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].name == "Valid Device"
|
||||
|
||||
|
||||
# Linux Audio Devices Tests
|
||||
|
||||
@patch('sounddevice.query_devices')
|
||||
@patch('sounddevice.query_hostapis')
|
||||
@patch('typer.echo')
|
||||
def test_get_linux_devices_success(mock_echo, mock_hostapis, mock_devices):
|
||||
"""Test successful retrieval of Linux devices."""
|
||||
mock_devices.return_value = [
|
||||
{
|
||||
'name': 'pulse',
|
||||
'max_input_channels': 32,
|
||||
'max_output_channels': 32,
|
||||
'default_samplerate': 44100.0,
|
||||
'hostapi': 0
|
||||
},
|
||||
{
|
||||
'name': 'default',
|
||||
'max_input_channels': 32,
|
||||
'max_output_channels': 32,
|
||||
'default_samplerate': 44100.0,
|
||||
'hostapi': 0
|
||||
}
|
||||
]
|
||||
mock_hostapis.return_value = {'name': 'ALSA'}
|
||||
|
||||
result = get_linux_audio_devices()
|
||||
|
||||
assert len(result) == 2
|
||||
assert result[0].name == 'pulse'
|
||||
assert result[0].device_id == 0
|
||||
assert result[0].hostapi_name == 'ALSA'
|
||||
assert result[1].name == 'default'
|
||||
assert result[1].device_id == 1
|
||||
|
||||
|
||||
@patch('sounddevice.query_devices', side_effect=Exception("ALSA error"))
|
||||
@patch('typer.echo')
|
||||
def test_get_linux_devices_sounddevice_error(mock_echo, mock_devices):
|
||||
"""Test sounddevice query failure."""
|
||||
with pytest.raises(SystemExit):
|
||||
get_linux_audio_devices()
|
||||
|
||||
|
||||
# Display Functions Tests
|
||||
|
||||
@patch('typer.echo')
|
||||
def test_display_windows_devices_empty_list(mock_echo):
|
||||
"""Test display with empty device list."""
|
||||
display_windows_devices([], True, True)
|
||||
|
||||
mock_echo.assert_called()
|
||||
|
||||
|
||||
@patch('typer.echo')
|
||||
@patch('src.commands.audio._display_single_windows_device')
|
||||
def test_display_windows_devices_input_only(mock_display_single, mock_echo):
|
||||
"""Test display showing only input devices."""
|
||||
devices = [
|
||||
WindowsAudioDevice('Microphone', 'OK', 'Input', 'mic1'),
|
||||
WindowsAudioDevice('Speakers', 'OK', 'Output', 'spk1')
|
||||
]
|
||||
|
||||
display_windows_devices(devices, show_inputs=True, show_outputs=False)
|
||||
|
||||
mock_display_single.assert_called_once_with(devices[0])
|
||||
|
||||
|
||||
@patch('typer.echo')
|
||||
@patch('src.commands.audio._display_single_windows_device')
|
||||
def test_display_windows_devices_with_unknown(mock_display_single, mock_echo):
|
||||
"""Test display including unknown device types."""
|
||||
devices = [
|
||||
WindowsAudioDevice('Microphone', 'OK', 'Input', 'mic1'),
|
||||
WindowsAudioDevice('Unknown Device', 'OK', 'Unknown', 'unk1')
|
||||
]
|
||||
|
||||
display_windows_devices(devices, show_inputs=True, show_outputs=True)
|
||||
|
||||
assert mock_display_single.call_count == 2
|
||||
|
||||
|
||||
@patch('typer.echo')
|
||||
def test_display_linux_devices_no_matching_devices(mock_echo):
|
||||
"""Test display with no matching devices."""
|
||||
devices = [
|
||||
LinuxAudioDevice(0, 'pulse', 0, 32, 44100.0, 'ALSA')
|
||||
]
|
||||
config = AppConfig(ServerConfig(), AudioConfig())
|
||||
|
||||
with pytest.raises(SystemExit):
|
||||
display_linux_devices(devices, show_inputs=True, show_outputs=False, config=config)
|
||||
|
||||
|
||||
@patch('typer.echo')
|
||||
@patch('sounddevice.default')
|
||||
def test_display_linux_devices_success(mock_default, mock_echo):
|
||||
"""Test successful display of Linux devices."""
|
||||
mock_default.device = [0, 1]
|
||||
|
||||
devices = [
|
||||
LinuxAudioDevice(0, 'pulse', 32, 32, 44100.0, 'ALSA')
|
||||
]
|
||||
config = AppConfig(ServerConfig(), AudioConfig())
|
||||
|
||||
display_linux_devices(devices, show_inputs=True, show_outputs=True, config=config)
|
||||
|
||||
# Verify that echo was called with device information
|
||||
assert mock_echo.call_count > 0
|
||||
|
||||
|
||||
@patch('typer.echo')
|
||||
@patch('sounddevice.default')
|
||||
def test_display_linux_devices_with_configured_device(mock_default, mock_echo):
|
||||
"""Test display with configured device marked."""
|
||||
mock_default.device = [0, 1]
|
||||
|
||||
devices = [
|
||||
LinuxAudioDevice(0, 'pulse', 32, 32, 44100.0, 'ALSA'),
|
||||
LinuxAudioDevice(1, 'default', 32, 32, 44100.0, 'ALSA')
|
||||
]
|
||||
config = AppConfig(ServerConfig(), AudioConfig(device=1))
|
||||
|
||||
display_linux_devices(devices, show_inputs=True, show_outputs=True, config=config)
|
||||
|
||||
# Verify that echo was called and device is marked as configured
|
||||
assert mock_echo.call_count > 0
|
||||
99
tests/test_expando.py
Normal file
99
tests/test_expando.py
Normal file
@@ -0,0 +1,99 @@
|
||||
import pytest
|
||||
|
||||
from core.Expando import Expando
|
||||
|
||||
|
||||
def test_i_can_get_properties():
|
||||
props = {"a": 10,
|
||||
"b": {
|
||||
"c": "value",
|
||||
"d": 20
|
||||
}}
|
||||
dynamic = Expando(props)
|
||||
|
||||
assert dynamic.a == 10
|
||||
assert dynamic.b.c == "value"
|
||||
|
||||
with pytest.raises(AttributeError):
|
||||
assert dynamic.unknown == "some_value"
|
||||
|
||||
|
||||
def test_i_can_get():
|
||||
props = {"a": 10,
|
||||
"b": {
|
||||
"c": "value",
|
||||
"d": 20
|
||||
}}
|
||||
dynamic = Expando(props)
|
||||
|
||||
assert dynamic.get("a") == 10
|
||||
assert dynamic.get("b.c") == "value"
|
||||
assert dynamic.get("unknown") is None
|
||||
assert dynamic.get("unknown", "default") == "default"
|
||||
assert dynamic.get("b.x") is None
|
||||
assert dynamic.get("b.x", "default") == "default"
|
||||
assert dynamic.get("b.x", None) is None
|
||||
|
||||
|
||||
def test_i_can_get_insensitive():
|
||||
props = {"A": 10,
|
||||
"b": 20}
|
||||
dynamic = Expando(props)
|
||||
|
||||
assert dynamic.geti("a") == 10
|
||||
assert dynamic.geti("A") == 10
|
||||
assert dynamic.geti("a.c", None) is None
|
||||
with pytest.raises(AttributeError):
|
||||
assert dynamic.geti("unknown") == "some_value"
|
||||
|
||||
|
||||
def test_i_can_get_insensitive_when_the_two_entries_exist():
|
||||
props = {"A": 10,
|
||||
"a": 20}
|
||||
dynamic = Expando(props)
|
||||
assert dynamic.geti("A") == 10
|
||||
assert dynamic.geti("a") == 20
|
||||
|
||||
|
||||
def test_i_can_get_from_list():
|
||||
props = {"a": [{"c": "value1", "d": 1}, {"c": "value2", "d": 2}]}
|
||||
dynamic = Expando(props)
|
||||
|
||||
assert dynamic.get("a.c") == ["value1", "value2"]
|
||||
|
||||
|
||||
def test_none_is_returned_when_get_from_list_and_property_does_not_exist():
|
||||
props = {"a": [{"c": "value1", "d": 1},
|
||||
{"a": "value2", "d": 2} # 'c' does not exist in the second row
|
||||
]}
|
||||
dynamic = Expando(props)
|
||||
|
||||
assert dynamic.get("a.c") == ["value1"]
|
||||
|
||||
|
||||
def test_i_can_manage_none_values():
|
||||
props = {"a": 10,
|
||||
"b": None}
|
||||
dynamic = Expando(props)
|
||||
|
||||
assert dynamic.get("b.c") is None
|
||||
|
||||
|
||||
def test_i_can_manage_none_values_in_list():
|
||||
props = {"a": [{"b": {"c": "value"}},
|
||||
{"b": None}
|
||||
]}
|
||||
dynamic = Expando(props)
|
||||
|
||||
assert dynamic.get("a.b.c") == ["value"]
|
||||
|
||||
|
||||
def test_i_can_add_new_properties():
|
||||
props = {"a": 10,
|
||||
"b": 20}
|
||||
dynamic = Expando(props)
|
||||
dynamic["c"] = 30
|
||||
|
||||
assert dynamic.a == 10
|
||||
assert dynamic.b == 20
|
||||
assert dynamic.c == 30
|
||||
165
tests/test_filter_by.py
Normal file
165
tests/test_filter_by.py
Normal file
@@ -0,0 +1,165 @@
|
||||
import pytest
|
||||
|
||||
from src.core.utils import FilterConf, filter_by
|
||||
from src.core.Expando import Expando
|
||||
|
||||
|
||||
def test_i_can_filter_by_matching_string():
|
||||
"""Test basic filtering with string matching."""
|
||||
items = [
|
||||
Expando({"name": "alice"}),
|
||||
Expando({"name": "bob"}),
|
||||
Expando({"name": "charlie"})
|
||||
]
|
||||
filter_conf = FilterConf(property="name", value="alice")
|
||||
|
||||
result = filter_by(items, filter_conf)
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].name == "alice"
|
||||
|
||||
|
||||
def test_i_can_filter_by_case_insensitive():
|
||||
"""Test case-insensitive filtering."""
|
||||
items = [
|
||||
Expando({"name": "ALICE"}),
|
||||
Expando({"name": "Bob"}),
|
||||
Expando({"name": "charlie"})
|
||||
]
|
||||
filter_conf = FilterConf(property="name", value="alice")
|
||||
|
||||
result = filter_by(items, filter_conf)
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].name == "ALICE"
|
||||
|
||||
|
||||
def test_i_can_filter_by_partial_match():
|
||||
"""Test partial substring matching."""
|
||||
items = [
|
||||
Expando({"name": "alice smith"}),
|
||||
Expando({"name": "bob jones"}),
|
||||
Expando({"name": "charlie brown"})
|
||||
]
|
||||
filter_conf = FilterConf(property="name", value="smith")
|
||||
|
||||
result = filter_by(items, filter_conf)
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].name == "alice smith"
|
||||
|
||||
|
||||
def test_i_can_handle_none_filter_conf():
|
||||
"""Test that None filter configuration returns items unchanged."""
|
||||
items = [
|
||||
Expando({"name": "alice"}),
|
||||
Expando({"name": "bob"})
|
||||
]
|
||||
|
||||
result = filter_by(items, None)
|
||||
|
||||
assert result == items
|
||||
assert len(result) == 2
|
||||
|
||||
|
||||
def test_i_can_handle_empty_list():
|
||||
"""Test that empty list returns empty list."""
|
||||
items = []
|
||||
filter_conf = FilterConf(property="name", value="alice")
|
||||
|
||||
result = filter_by(items, filter_conf)
|
||||
|
||||
assert result == []
|
||||
|
||||
|
||||
def test_i_can_handle_none_property_in_filter_conf():
|
||||
"""Test that FilterConf with None property returns items unchanged."""
|
||||
items = [
|
||||
Expando({"name": "alice"}),
|
||||
Expando({"name": "bob"})
|
||||
]
|
||||
# Create FilterConf with None property (simulating invalid state)
|
||||
filter_conf = FilterConf(property=None, value="alice")
|
||||
|
||||
result = filter_by(items, filter_conf)
|
||||
|
||||
assert result == items
|
||||
assert len(result) == 2
|
||||
|
||||
|
||||
def test_i_can_filter_with_no_matches():
|
||||
"""Test filtering when no items match the criteria."""
|
||||
items = [
|
||||
Expando({"name": "alice"}),
|
||||
Expando({"name": "bob"}),
|
||||
Expando({"name": "charlie"})
|
||||
]
|
||||
filter_conf = FilterConf(property="name", value="xyz")
|
||||
|
||||
result = filter_by(items, filter_conf)
|
||||
|
||||
assert result == []
|
||||
|
||||
|
||||
def test_i_cannot_filter_with_missing_properties():
|
||||
"""Test that missing properties raise AttributeError."""
|
||||
items = [
|
||||
Expando({"name": "alice"}),
|
||||
Expando({"age": 25}), # no name property
|
||||
Expando({"name": "bob"})
|
||||
]
|
||||
filter_conf = FilterConf(property="name", value="alice")
|
||||
|
||||
# Should raise AttributeError when trying to access missing property
|
||||
with pytest.raises(AttributeError):
|
||||
filter_by(items, filter_conf)
|
||||
|
||||
|
||||
def test_i_can_filter_with_none_values():
|
||||
"""Test filtering when properties have None values."""
|
||||
items = [
|
||||
Expando({"name": "alice"}),
|
||||
Expando({"name": None}),
|
||||
Expando({"name": "bob"})
|
||||
]
|
||||
filter_conf = FilterConf(property="name", value="alice")
|
||||
|
||||
result = filter_by(items, filter_conf)
|
||||
|
||||
# None gets converted to "none" string, shouldn't match "alice"
|
||||
assert len(result) == 1
|
||||
assert result[0].name == "alice"
|
||||
|
||||
|
||||
def test_i_can_filter_with_case_insensitive_property_names():
|
||||
"""Test filtering using case-insensitive property names."""
|
||||
items = [
|
||||
Expando({"Name": "alice"}), # uppercase N
|
||||
Expando({"Name": "bob"}),
|
||||
Expando({"Name": "charlie"})
|
||||
]
|
||||
filter_conf = FilterConf(property="name", value="alice") # lowercase n
|
||||
|
||||
result = filter_by(items, filter_conf)
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].Name == "alice"
|
||||
|
||||
|
||||
def test_i_can_filter_with_numbers_and_mixed_types():
|
||||
"""Test filtering with mixed types (numbers, strings)."""
|
||||
items = [
|
||||
Expando({"value": "123"}),
|
||||
Expando({"value": 123}),
|
||||
Expando({"value": "hello123"}),
|
||||
Expando({"value": "world"})
|
||||
]
|
||||
filter_conf = FilterConf(property="value", value="123")
|
||||
|
||||
result = filter_by(items, filter_conf)
|
||||
|
||||
# Should match "123", 456 (converted to "456"), and "hello123"
|
||||
assert len(result) == 3
|
||||
expected_values = ["123", 123, "hello123"]
|
||||
actual_values = [item.value for item in result]
|
||||
assert all(val in expected_values for val in actual_values)
|
||||
85
tests/test_filter_conf.py
Normal file
85
tests/test_filter_conf.py
Normal file
@@ -0,0 +1,85 @@
|
||||
from src.core.utils import FilterConf
|
||||
|
||||
|
||||
def test_i_can_create_filter_conf_from_valid_text():
|
||||
"""Test creating FilterConf from valid property=value format."""
|
||||
result = FilterConf.new("property=value")
|
||||
|
||||
assert result is not None
|
||||
assert isinstance(result, FilterConf)
|
||||
assert result.property == "property"
|
||||
assert result.value == "value"
|
||||
|
||||
|
||||
def test_i_can_handle_none_input():
|
||||
"""Test that None input returns None."""
|
||||
result = FilterConf.new(None)
|
||||
|
||||
assert result is None
|
||||
|
||||
|
||||
def test_i_can_handle_whitespace_around_equals():
|
||||
"""Test that whitespace around property and value is stripped."""
|
||||
result = FilterConf.new(" property = value ")
|
||||
|
||||
assert result is not None
|
||||
assert result.property == "property"
|
||||
assert result.value == "value"
|
||||
|
||||
|
||||
def test_i_cannot_create_filter_conf_without_equals():
|
||||
"""Test that text without equals sign returns None."""
|
||||
result = FilterConf.new("property")
|
||||
|
||||
assert result is None
|
||||
|
||||
|
||||
def test_i_cannot_create_filter_conf_with_multiple_equals():
|
||||
"""Test that text with multiple equals signs returns None."""
|
||||
result = FilterConf.new("property=value=another")
|
||||
|
||||
assert result is None
|
||||
|
||||
|
||||
def test_i_cannot_create_filter_conf_from_empty_string():
|
||||
"""Test that empty string returns None."""
|
||||
result = FilterConf.new("")
|
||||
|
||||
assert result is None
|
||||
|
||||
|
||||
def test_i_cannot_create_filter_conf_with_empty_property():
|
||||
"""Test that empty property name returns None."""
|
||||
result = FilterConf.new("=value")
|
||||
|
||||
assert result is None
|
||||
|
||||
|
||||
def test_i_cannot_create_filter_conf_with_empty_value():
|
||||
"""Test that empty value returns None."""
|
||||
result = FilterConf.new("property=")
|
||||
|
||||
assert result is None
|
||||
|
||||
|
||||
def test_i_cannot_create_filter_conf_with_both_empty():
|
||||
"""Test that both empty property and value returns None."""
|
||||
result = FilterConf.new("=")
|
||||
|
||||
assert result is None
|
||||
|
||||
|
||||
def test_i_cannot_create_filter_conf_with_whitespace_only():
|
||||
"""Test that whitespace-only parts return None."""
|
||||
result = FilterConf.new(" = ")
|
||||
|
||||
assert result is None
|
||||
|
||||
|
||||
def test_i_can_handle_special_characters_in_values():
|
||||
"""Test that special characters in property names and values are handled correctly."""
|
||||
result = FilterConf.new("device_type=USB Audio Device (2.0)")
|
||||
|
||||
assert result is not None
|
||||
assert result.property == "device_type"
|
||||
assert result.value == "USB Audio Device (2.0)"
|
||||
256
tests/test_select.py
Normal file
256
tests/test_select.py
Normal file
@@ -0,0 +1,256 @@
|
||||
import pytest
|
||||
|
||||
from core.utils import SelectConf, select
|
||||
from core.Expando import Expando
|
||||
|
||||
|
||||
def test_i_can_select_basic_attributes():
|
||||
"""Test basic selection of existing attributes."""
|
||||
items = [
|
||||
Expando({"name": "alice", "age": 25, "city": "paris"}),
|
||||
Expando({"name": "bob", "age": 30, "city": "london"})
|
||||
]
|
||||
settings = [
|
||||
SelectConf(attr="name"),
|
||||
SelectConf(attr="age")
|
||||
]
|
||||
|
||||
result = select(items, settings)
|
||||
|
||||
assert len(result) == 2
|
||||
assert result[0].name == "alice"
|
||||
assert result[0].age == 25
|
||||
assert not hasattr(result[0], "city") # city not selected
|
||||
assert result[1].name == "bob"
|
||||
assert result[1].age == 30
|
||||
|
||||
|
||||
def test_i_can_select_with_renamed_attributes():
|
||||
"""Test renaming attributes using SelectConf.to."""
|
||||
items = [
|
||||
Expando({"name": "alice", "age": 25}),
|
||||
Expando({"name": "bob", "age": 30})
|
||||
]
|
||||
settings = [
|
||||
SelectConf(attr="name", to="full_name"),
|
||||
SelectConf(attr="age", to="years")
|
||||
]
|
||||
|
||||
result = select(items, settings)
|
||||
|
||||
assert len(result) == 2
|
||||
assert result[0].full_name == "alice"
|
||||
assert result[0].years == 25
|
||||
assert not hasattr(result[0], "name") # original name not present
|
||||
assert not hasattr(result[0], "age") # original name not present
|
||||
assert result[1].full_name == "bob"
|
||||
assert result[1].years == 30
|
||||
|
||||
|
||||
def test_i_can_select_with_formatter():
|
||||
"""Test applying formatters to selected attributes."""
|
||||
items = [
|
||||
Expando({"name": "alice", "age": 25}),
|
||||
Expando({"name": "bob", "age": 30})
|
||||
]
|
||||
settings = [
|
||||
SelectConf(attr="name", formatter=lambda item: item.name.upper()),
|
||||
SelectConf(attr="age", formatter=lambda item: item.age * 2)
|
||||
]
|
||||
|
||||
result = select(items, settings)
|
||||
|
||||
assert len(result) == 2
|
||||
assert result[0].name == "ALICE"
|
||||
assert result[0].age == 50
|
||||
assert result[1].name == "BOB"
|
||||
assert result[1].age == 60
|
||||
|
||||
|
||||
def test_i_can_select_with_formatter_and_rename():
|
||||
"""Test combining formatter and renaming."""
|
||||
items = [
|
||||
Expando({"name": "alice", "age": 25}),
|
||||
Expando({"name": "bob", "age": 30})
|
||||
]
|
||||
settings = [
|
||||
SelectConf(attr="name", to="upper_name", formatter=lambda item: item.name.upper()),
|
||||
SelectConf(attr="age", to="double_age", formatter=lambda item: item.age * 2)
|
||||
]
|
||||
|
||||
result = select(items, settings)
|
||||
|
||||
assert len(result) == 2
|
||||
assert result[0].upper_name == "ALICE"
|
||||
assert result[0].double_age == 50
|
||||
assert not hasattr(result[0], "name")
|
||||
assert not hasattr(result[0], "age")
|
||||
assert result[1].upper_name == "BOB"
|
||||
assert result[1].double_age == 60
|
||||
|
||||
|
||||
def test_i_can_handle_empty_items_list():
|
||||
"""Test that empty items list returns empty list."""
|
||||
items = []
|
||||
settings = [SelectConf(attr="name")]
|
||||
|
||||
result = select(items, settings)
|
||||
|
||||
assert result == []
|
||||
|
||||
|
||||
def test_i_can_handle_empty_settings_list():
|
||||
"""Test that empty settings list returns items with empty properties."""
|
||||
items = [
|
||||
Expando({"name": "alice", "age": 25}),
|
||||
Expando({"name": "bob", "age": 30})
|
||||
]
|
||||
settings = []
|
||||
|
||||
result = select(items, settings)
|
||||
|
||||
assert len(result) == 2
|
||||
# Each result should be an Expando with empty properties
|
||||
assert result[0]._props == {}
|
||||
assert result[1]._props == {}
|
||||
|
||||
|
||||
def test_i_can_handle_missing_attributes():
|
||||
"""Test that missing attributes return None (normal behavior)."""
|
||||
items = [
|
||||
Expando({"name": "alice"}), # no age attribute
|
||||
Expando({"name": "bob", "age": 30})
|
||||
]
|
||||
settings = [
|
||||
SelectConf(attr="name"),
|
||||
SelectConf(attr="age") # missing in first item
|
||||
]
|
||||
|
||||
result = select(items, settings)
|
||||
|
||||
assert len(result) == 2
|
||||
assert result[0].name == "alice"
|
||||
assert result[0].age is None # missing attribute returns None
|
||||
assert result[1].name == "bob"
|
||||
assert result[1].age == 30
|
||||
|
||||
|
||||
def test_i_can_handle_whitespace_in_attr_names():
|
||||
"""Test that whitespace in attribute names is stripped."""
|
||||
items = [
|
||||
Expando({"name": "alice", "age": 25})
|
||||
]
|
||||
settings = [
|
||||
SelectConf(attr=" name "), # whitespace around attr
|
||||
SelectConf(attr="\tage\n") # tabs and newlines
|
||||
]
|
||||
|
||||
result = select(items, settings)
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].name == "alice"
|
||||
assert result[0].age == 25
|
||||
|
||||
|
||||
def test_i_can_select_multiple_attributes():
|
||||
"""Test selecting multiple attributes from multiple items."""
|
||||
items = [
|
||||
Expando({"name": "alice", "age": 25, "city": "paris", "country": "france"}),
|
||||
Expando({"name": "bob", "age": 30, "city": "london", "country": "uk"}),
|
||||
Expando({"name": "charlie", "age": 35, "city": "madrid", "country": "spain"})
|
||||
]
|
||||
settings = [
|
||||
SelectConf(attr="name"),
|
||||
SelectConf(attr="city", to="location"),
|
||||
SelectConf(attr="country", formatter=lambda item: item.country.upper())
|
||||
]
|
||||
|
||||
result = select(items, settings)
|
||||
|
||||
assert len(result) == 3
|
||||
assert result[0].name == "alice"
|
||||
assert result[0].location == "paris"
|
||||
assert result[0].country == "FRANCE"
|
||||
assert result[1].name == "bob"
|
||||
assert result[1].location == "london"
|
||||
assert result[1].country == "UK"
|
||||
assert result[2].name == "charlie"
|
||||
assert result[2].location == "madrid"
|
||||
assert result[2].country == "SPAIN"
|
||||
|
||||
|
||||
def test_i_can_handle_formatter_with_whole_item():
|
||||
"""Test that formatter receives the whole item, not just the attribute value."""
|
||||
items = [
|
||||
Expando({"first_name": "alice", "last_name": "smith"}),
|
||||
Expando({"first_name": "bob", "last_name": "jones"})
|
||||
]
|
||||
settings = [
|
||||
SelectConf(
|
||||
attr="first_name",
|
||||
to="full_name",
|
||||
formatter=lambda item: f"{item.get('first_name')} {item.get('last_name')}"
|
||||
)
|
||||
]
|
||||
|
||||
result = select(items, settings)
|
||||
|
||||
assert len(result) == 2
|
||||
assert result[0].full_name == "alice smith"
|
||||
assert result[1].full_name == "bob jones"
|
||||
|
||||
|
||||
def test_i_cannot_select_when_formatter_raises_exception():
|
||||
"""Test that formatter exceptions are propagated."""
|
||||
items = [
|
||||
Expando({"name": "alice", "value": None})
|
||||
]
|
||||
settings = [
|
||||
SelectConf(
|
||||
attr="value",
|
||||
formatter=lambda item: item.get("value").upper() # will fail on None
|
||||
)
|
||||
]
|
||||
|
||||
with pytest.raises(AttributeError):
|
||||
select(items, settings)
|
||||
|
||||
|
||||
def test_i_can_handle_none_values_in_formatter():
|
||||
"""Test formatter handling None values gracefully when designed to do so."""
|
||||
items = [
|
||||
Expando({"name": "alice", "value": None}),
|
||||
Expando({"name": "bob", "value": "test"})
|
||||
]
|
||||
settings = [
|
||||
SelectConf(
|
||||
attr="value",
|
||||
formatter=lambda item: item.get("value").upper() if item.get("value") else "NO_VALUE"
|
||||
)
|
||||
]
|
||||
|
||||
result = select(items, settings)
|
||||
|
||||
assert len(result) == 2
|
||||
assert result[0].value == "NO_VALUE"
|
||||
assert result[1].value == "TEST"
|
||||
|
||||
|
||||
def test_i_can_select_nested_attributes():
|
||||
"""Test selecting nested attributes using Expando.get() path notation."""
|
||||
items = [
|
||||
Expando({"user": {"profile": {"name": "alice"}}, "age": 25}),
|
||||
Expando({"user": {"profile": {"name": "bob"}}, "age": 30})
|
||||
]
|
||||
settings = [
|
||||
SelectConf(attr="user.profile.name", to="name"),
|
||||
SelectConf(attr="age")
|
||||
]
|
||||
|
||||
result = select(items, settings)
|
||||
|
||||
assert len(result) == 2
|
||||
assert result[0].name == "alice"
|
||||
assert result[0].age == 25
|
||||
assert result[1].name == "bob"
|
||||
assert result[1].age == 30
|
||||
152
tests/test_sort_by.py
Normal file
152
tests/test_sort_by.py
Normal file
@@ -0,0 +1,152 @@
|
||||
from src.core.utils import FilterConf, SortConf, sort_by
|
||||
from src.core.Expando import Expando
|
||||
|
||||
|
||||
def test_i_can_sort_by_ascending_strings():
|
||||
"""Test sorting strings in ascending order."""
|
||||
items = [
|
||||
Expando({"name": "charlie"}),
|
||||
Expando({"name": "alice"}),
|
||||
Expando({"name": "bob"})
|
||||
]
|
||||
sort_conf = SortConf(property="name", ascending=True)
|
||||
|
||||
result = sort_by(items, sort_conf)
|
||||
|
||||
assert len(result) == 3
|
||||
assert result[0].name == "alice"
|
||||
assert result[1].name == "bob"
|
||||
assert result[2].name == "charlie"
|
||||
|
||||
|
||||
def test_i_can_sort_by_descending_strings():
|
||||
"""Test sorting strings in descending order."""
|
||||
items = [
|
||||
Expando({"name": "alice"}),
|
||||
Expando({"name": "charlie"}),
|
||||
Expando({"name": "bob"})
|
||||
]
|
||||
sort_conf = SortConf(property="name", ascending=False)
|
||||
|
||||
result = sort_by(items, sort_conf)
|
||||
|
||||
assert len(result) == 3
|
||||
assert result[0].name == "charlie"
|
||||
assert result[1].name == "bob"
|
||||
assert result[2].name == "alice"
|
||||
|
||||
|
||||
def test_i_can_handle_none_sort_conf():
|
||||
"""Test that None sort configuration returns items unchanged."""
|
||||
items = [
|
||||
Expando({"name": "charlie"}),
|
||||
Expando({"name": "alice"})
|
||||
]
|
||||
|
||||
result = sort_by(items, None)
|
||||
|
||||
assert result == items
|
||||
assert result[0].name == "charlie"
|
||||
assert result[1].name == "alice"
|
||||
|
||||
|
||||
def test_i_can_handle_empty_list():
|
||||
"""Test that empty list returns empty list."""
|
||||
items = []
|
||||
sort_conf = SortConf(property="name")
|
||||
|
||||
result = sort_by(items, sort_conf)
|
||||
|
||||
assert result == []
|
||||
|
||||
|
||||
def test_i_can_sort_with_missing_properties():
|
||||
"""Test sorting when some objects don't have the property."""
|
||||
items = [
|
||||
Expando({"name": "bob"}),
|
||||
Expando({"age": 25}), # no name property
|
||||
Expando({"name": "alice"})
|
||||
]
|
||||
sort_conf = SortConf(property="name", ascending=True)
|
||||
|
||||
result = sort_by(items, sort_conf)
|
||||
|
||||
assert len(result) == 3
|
||||
# Items without the property should get empty string and sort first
|
||||
assert result[0].geti("name", "") == "" # the one with missing property
|
||||
assert result[1].name == "alice"
|
||||
assert result[2].name == "bob"
|
||||
|
||||
|
||||
def test_i_can_sort_with_none_values():
|
||||
"""Test sorting when properties have None values."""
|
||||
items = [
|
||||
Expando({"name": "bob"}),
|
||||
Expando({"name": None}),
|
||||
Expando({"name": "alice"})
|
||||
]
|
||||
sort_conf = SortConf(property="name", ascending=True)
|
||||
|
||||
result = sort_by(items, sort_conf)
|
||||
|
||||
assert len(result) == 3
|
||||
# None values should be treated as empty strings and sort first
|
||||
assert result[0].name is None
|
||||
assert result[1].name == "alice"
|
||||
assert result[2].name == "bob"
|
||||
|
||||
|
||||
def test_i_can_sort_with_case_insensitive_property_names():
|
||||
"""Test sorting using case-insensitive property names."""
|
||||
items = [
|
||||
Expando({"Name": "charlie"}), # uppercase N
|
||||
Expando({"Name": "alice"}),
|
||||
Expando({"Name": "bob"})
|
||||
]
|
||||
sort_conf = SortConf(property="name", ascending=True) # lowercase n
|
||||
|
||||
result = sort_by(items, sort_conf)
|
||||
|
||||
assert len(result) == 3
|
||||
assert result[0].Name == "alice"
|
||||
assert result[1].Name == "bob"
|
||||
assert result[2].Name == "charlie"
|
||||
|
||||
|
||||
|
||||
def test_i_can_sort_mixed_types():
|
||||
"""Test sorting with mixed types (numbers, strings, None)."""
|
||||
items = [
|
||||
Expando({"value": "zebra"}),
|
||||
Expando({"value": 10}),
|
||||
Expando({"value": None}),
|
||||
Expando({"value": "apple"})
|
||||
]
|
||||
sort_conf = SortConf(property="value", ascending=True)
|
||||
|
||||
result = sort_by(items, sort_conf)
|
||||
|
||||
assert len(result) == 4
|
||||
# Mixed types will be converted to strings for comparison
|
||||
# None -> "", numbers -> str(number), strings stay strings
|
||||
assert result[0].value is None # None becomes "" and sorts first
|
||||
assert result[1].value == 10 # "10" sorts before "apple"
|
||||
assert result[2].value == "apple" # "apple" sorts before "zebra"
|
||||
assert result[3].value == "zebra"
|
||||
|
||||
|
||||
def test_i_can_sort_with_whitespace_in_property_name():
|
||||
"""Test that property names are stripped of whitespace."""
|
||||
items = [
|
||||
Expando({"name": "charlie"}),
|
||||
Expando({"name": "alice"}),
|
||||
Expando({"name": "bob"})
|
||||
]
|
||||
sort_conf = SortConf(property=" name ", ascending=True) # whitespace around property
|
||||
|
||||
result = sort_by(items, sort_conf)
|
||||
|
||||
assert len(result) == 3
|
||||
assert result[0].name == "alice"
|
||||
assert result[1].name == "bob"
|
||||
assert result[2].name == "charlie"
|
||||
Reference in New Issue
Block a user