Working on audio commands
This commit is contained in:
797
src/commands/audio.py
Normal file
797
src/commands/audio.py
Normal file
@@ -0,0 +1,797 @@
|
||||
import sounddevice as sd
|
||||
import subprocess
|
||||
import typer
|
||||
import numpy as np
|
||||
import os
|
||||
import json
|
||||
from typing import Optional, List, Dict, Any, Tuple
|
||||
from dataclasses import dataclass
|
||||
|
||||
from rich.console import Console
|
||||
from rich.table import Table
|
||||
|
||||
from ..utils import SortConf, sort_by, FilterConf, filter_by
|
||||
from ..config import AppConfig
|
||||
|
||||
|
||||
@dataclass
|
||||
class WindowsAudioDevice:
|
||||
"""Windows audio device data structure."""
|
||||
name: str
|
||||
status: str
|
||||
device_type: str # "Input", "Output", "Unknown"
|
||||
instance_id: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class LinuxAudioDevice:
|
||||
"""Linux audio device data structure."""
|
||||
device_id: int
|
||||
name: str
|
||||
max_input_channels: int
|
||||
max_output_channels: int
|
||||
default_samplerate: float
|
||||
hostapi_name: str
|
||||
|
||||
|
||||
def list_devices(
|
||||
device: Optional[int] = typer.Argument(default=None, help="Show details of a device, by its Id"),
|
||||
pulse: bool = typer.Option(False, "--pulse", help="Show PulseAudio sources"),
|
||||
native: bool = typer.Option(False, "--native", help="Force native Linux view (WSL only)"),
|
||||
sort_by: Optional[str] = typer.Option(None, "--sort", help="Sort by column"),
|
||||
desc: bool = typer.Option(False, "--desc", help="Set the sorting order to descending"),
|
||||
filter_by: Optional[str] = typer.Option(None, "--filter", help="Filter by a column value. Use column=value"),
|
||||
config_file: Optional[str] = typer.Option(None, "--config", "-c", help="Path to config file")
|
||||
):
|
||||
"""List audio devices."""
|
||||
|
||||
# Load configuration for context
|
||||
config = AppConfig.load(config_file)
|
||||
|
||||
# Determine title based on environment and options
|
||||
is_wsl = detect_wsl()
|
||||
use_windows = is_wsl and not native
|
||||
sort_config = SortConf(sort_by, not desc) if sort_by else None
|
||||
filter_config = FilterConf.new(filter_by) if filter_by else None
|
||||
if use_windows:
|
||||
typer.echo(typer.style("WSL detected, showing Windows audio devices (use --native for WSL view)",
|
||||
fg=typer.colors.BLUE))
|
||||
windows_devices = get_windows_audio_devices()
|
||||
display_windows_devices(windows_devices, sort_config, filter_config)
|
||||
|
||||
else:
|
||||
linux_devices = get_linux_audio_devices()
|
||||
display_linux_devices(linux_devices, config, pulse, sort_config)
|
||||
|
||||
# Show legend
|
||||
typer.echo(f"\nLegend:")
|
||||
if use_windows:
|
||||
typer.echo(typer.style(" ✓ = Device working", fg=typer.colors.GREEN))
|
||||
typer.echo(typer.style(" ✗ = Device problem", fg=typer.colors.RED))
|
||||
else:
|
||||
typer.echo(typer.style(" * = Configured device", fg=typer.colors.YELLOW))
|
||||
typer.echo(f" IN/OUT = Input/Output channel count")
|
||||
|
||||
|
||||
def detect_wsl() -> bool:
|
||||
"""Detect if running under Windows Subsystem for Linux."""
|
||||
try:
|
||||
# Method 1: Check /proc/version for Microsoft
|
||||
if os.path.exists('/proc/version'):
|
||||
with open('/proc/version', 'r') as f:
|
||||
version_info = f.read().lower()
|
||||
if 'microsoft' in version_info or 'wsl' in version_info:
|
||||
return True
|
||||
|
||||
# Method 2: Check WSL environment variables
|
||||
if os.getenv('WSL_DISTRO_NAME') or os.getenv('WSLENV'):
|
||||
return True
|
||||
|
||||
# Method 3: Check if powershell.exe is available
|
||||
try:
|
||||
subprocess.run(['powershell.exe', '-Command', 'echo test'],
|
||||
capture_output=True, timeout=2, check=True)
|
||||
return True
|
||||
except (subprocess.SubprocessError, FileNotFoundError):
|
||||
pass
|
||||
|
||||
return False
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def get_windows_audio_devices_com() -> List[WindowsAudioDevice]:
|
||||
"""
|
||||
Retrieve Windows audio devices using Core Audio (COM) via an inline C# helper compiled by PowerShell.
|
||||
Returns a list of WindowsAudioDevice with accurate device_type ('Input' or 'Output') based on DataFlow.
|
||||
"""
|
||||
ps_script = r"""
|
||||
[Console]::OutputEncoding = [System.Text.Encoding]::UTF8;
|
||||
|
||||
$code = @"
|
||||
using System;
|
||||
using System.Runtime.InteropServices;
|
||||
using System.Collections.Generic;
|
||||
|
||||
[ComImport]
|
||||
[Guid("BCDE0395-E52F-467C-8E3D-C4579291692E")]
|
||||
class MMDeviceEnumerator {}
|
||||
|
||||
[Guid("A95664D2-9614-4F35-A746-DE8DB63617E6"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)]
|
||||
interface IMMDeviceEnumerator {
|
||||
int EnumAudioEndpoints(EDataFlow dataFlow, int dwStateMask, out IMMDeviceCollection ppDevices);
|
||||
// Other methods not used are omitted
|
||||
}
|
||||
|
||||
enum EDataFlow { eRender = 0, eCapture = 1, eAll = 2 }
|
||||
|
||||
[Flags]
|
||||
enum DEVICE_STATE : uint { ACTIVE = 0x1, DISABLED = 0x2, NOTPRESENT = 0x4, UNPLUGGED = 0x8, ALL = 0xF }
|
||||
|
||||
[Guid("0BD7A1BE-7A1A-44DB-8397-C0A794AF5630"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)]
|
||||
interface IMMDeviceCollection {
|
||||
int GetCount(out int pcDevices);
|
||||
int Item(int nDevice, out IMMDevice ppDevice);
|
||||
}
|
||||
|
||||
[Guid("D666063F-1587-4E43-81F1-B948E807363F"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)]
|
||||
interface IMMDevice {
|
||||
int Activate(ref Guid iid, int dwClsCtx, IntPtr pActivationParams, out IntPtr ppInterface);
|
||||
int OpenPropertyStore(int stgmAccess, out IPropertyStore ppProperties);
|
||||
int GetId(out string ppstrId);
|
||||
int GetState(out int pdwState);
|
||||
}
|
||||
|
||||
[Guid("886d8eeb-8cf2-4446-8d02-cdba1dbdcf99"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)]
|
||||
interface IPropertyStore {
|
||||
int GetCount(out int cProps);
|
||||
int GetAt(int iProp, out PROPERTYKEY pkey);
|
||||
int GetValue(ref PROPERTYKEY key, out PROPVARIANT pv);
|
||||
int SetValue(ref PROPERTYKEY key, ref PROPVARIANT propvar);
|
||||
int Commit();
|
||||
}
|
||||
|
||||
[StructLayout(LayoutKind.Sequential)]
|
||||
struct PROPERTYKEY {
|
||||
public Guid fmtid;
|
||||
public int pid;
|
||||
}
|
||||
|
||||
[StructLayout(LayoutKind.Explicit)]
|
||||
struct PROPVARIANT {
|
||||
[FieldOffset(0)] public ushort vt;
|
||||
[FieldOffset(8)] public IntPtr pointerValue;
|
||||
}
|
||||
|
||||
static class PKEY {
|
||||
// PKEY_Device_FriendlyName
|
||||
public static PROPERTYKEY Device_FriendlyName = new PROPERTYKEY {
|
||||
fmtid = new Guid("A45C254E-DF1C-4EFD-8020-67D146A850E0"), pid = 14
|
||||
};
|
||||
}
|
||||
|
||||
public class DeviceInfo {
|
||||
public string Name { get; set; }
|
||||
public string Id { get; set; }
|
||||
public string Type { get; set; } // "Input" or "Output"
|
||||
public string Status { get; set; } // "OK", "Disabled", "Unplugged", "NotPresent", "Unknown"
|
||||
}
|
||||
|
||||
public static class CoreAudioEnumerator {
|
||||
[DllImport("ole32.dll")]
|
||||
private static extern int PropVariantClear(ref PROPVARIANT pvar);
|
||||
|
||||
private static string ReadString(IPropertyStore store, PROPERTYKEY key) {
|
||||
PROPVARIANT pv = new PROPVARIANT();
|
||||
try {
|
||||
store.GetValue(ref key, out pv);
|
||||
// VT_LPWSTR = 31
|
||||
if (pv.vt == 31) {
|
||||
return Marshal.PtrToStringUni(pv.pointerValue);
|
||||
}
|
||||
return null;
|
||||
} finally {
|
||||
PropVariantClear(ref pv);
|
||||
}
|
||||
}
|
||||
|
||||
public static List<DeviceInfo> GetAll() {
|
||||
var result = new List<DeviceInfo>();
|
||||
var enumerator = (IMMDeviceEnumerator)new MMDeviceEnumerator();
|
||||
|
||||
foreach (EDataFlow flow in new[] { EDataFlow.eCapture, EDataFlow.eRender }) {
|
||||
IMMDeviceCollection coll;
|
||||
enumerator.EnumAudioEndpoints(flow, (int)DEVICE_STATE.ALL, out coll);
|
||||
|
||||
int count;
|
||||
coll.GetCount(out count);
|
||||
|
||||
for (int i = 0; i < count; i++) {
|
||||
IMMDevice dev;
|
||||
coll.Item(i, out dev);
|
||||
|
||||
string id;
|
||||
dev.GetId(out id);
|
||||
|
||||
int state;
|
||||
dev.GetState(out state);
|
||||
|
||||
IPropertyStore store;
|
||||
dev.OpenPropertyStore(0 /* STGM_READ */, out store);
|
||||
string name = ReadString(store, PKEY.Device_FriendlyName) ?? "(unknown)";
|
||||
|
||||
string type = flow == EDataFlow.eCapture ? "Input" : "Output";
|
||||
|
||||
string status =
|
||||
(state & (int)DEVICE_STATE.ACTIVE) != 0 ? "OK" :
|
||||
(state & (int)DEVICE_STATE.DISABLED) != 0 ? "Disabled" :
|
||||
(state & (int)DEVICE_STATE.UNPLUGGED) != 0 ? "Unplugged" :
|
||||
(state & (int)DEVICE_STATE.NOTPRESENT) != 0 ? "NotPresent" :
|
||||
"Unknown";
|
||||
|
||||
result.Add(new DeviceInfo { Name = name, Id = id, Type = type, Status = status });
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
}
|
||||
"@
|
||||
|
||||
Add-Type -TypeDefinition $code -Language CSharp -ErrorAction Stop
|
||||
|
||||
$devices = [CoreAudioEnumerator]::GetAll() | ForEach-Object {
|
||||
[PSCustomObject]@{
|
||||
FriendlyName = $_.Name
|
||||
InstanceId = $_.Id
|
||||
Type = $_.Type
|
||||
Status = $_.Status
|
||||
}
|
||||
}
|
||||
|
||||
$devices | ConvertTo-Json
|
||||
"""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
[
|
||||
"powershell.exe",
|
||||
"-NoProfile",
|
||||
"-NonInteractive",
|
||||
"-Command",
|
||||
ps_script,
|
||||
],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
encoding="utf-8",
|
||||
timeout=30,
|
||||
)
|
||||
|
||||
if result.returncode != 0:
|
||||
err = result.stderr.strip() if result.stderr else f"exit code {result.returncode}"
|
||||
typer.echo(typer.style(f"PowerShell (CoreAudio) command failed: {err}", fg=typer.colors.RED))
|
||||
return []
|
||||
|
||||
output = result.stdout.strip()
|
||||
if not output:
|
||||
typer.echo(typer.style("PowerShell (CoreAudio) returned empty output", fg=typer.colors.YELLOW))
|
||||
return []
|
||||
|
||||
data = json.loads(output)
|
||||
items = data if isinstance(data, list) else [data]
|
||||
|
||||
devices: List[WindowsAudioDevice] = []
|
||||
for it in items:
|
||||
name = (it.get("FriendlyName") or "").strip()
|
||||
instance_id = it.get("InstanceId") or ""
|
||||
dev_type = it.get("Type") or "Unknown"
|
||||
status = it.get("Status") or "Unknown"
|
||||
|
||||
if not name:
|
||||
continue
|
||||
|
||||
devices.append(
|
||||
WindowsAudioDevice(
|
||||
name=name,
|
||||
status=status,
|
||||
device_type=dev_type,
|
||||
instance_id=instance_id,
|
||||
)
|
||||
)
|
||||
|
||||
return devices
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
typer.echo(typer.style(f"Failed to parse CoreAudio JSON output: {e}", fg=typer.colors.RED))
|
||||
return []
|
||||
except subprocess.TimeoutExpired:
|
||||
typer.echo(typer.style("PowerShell (CoreAudio) command timed out after 30 seconds", fg=typer.colors.RED))
|
||||
return []
|
||||
except Exception as e:
|
||||
typer.echo(typer.style(f"Unexpected error calling CoreAudio via PowerShell: {type(e).__name__}: {e}",
|
||||
fg=typer.colors.RED))
|
||||
return []
|
||||
|
||||
|
||||
def get_windows_default_devices() -> Dict[str, str]:
|
||||
"""Get Windows default audio devices via PowerShell."""
|
||||
ps_script = r"""
|
||||
[Console]::OutputEncoding = [System.Text.Encoding]::UTF8;
|
||||
|
||||
$code = @"
|
||||
using System;
|
||||
using System.Runtime.InteropServices;
|
||||
|
||||
[ComImport]
|
||||
[Guid("BCDE0395-E52F-467C-8E3D-C4579291692E")]
|
||||
class MMDeviceEnumerator {}
|
||||
|
||||
[Guid("A95664D2-9614-4F35-A746-DE8DB63617E6"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)]
|
||||
interface IMMDeviceEnumerator {
|
||||
int GetDefaultAudioEndpoint(EDataFlow dataFlow, ERole role, out IMMDevice ppEndpoint);
|
||||
}
|
||||
|
||||
enum EDataFlow { eRender = 0, eCapture = 1 }
|
||||
enum ERole { eConsole = 0, eMultimedia = 1, eCommunications = 2 }
|
||||
|
||||
[Guid("D666063F-1587-4E43-81F1-B948E807363F"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)]
|
||||
interface IMMDevice {
|
||||
int GetId(out string ppstrId);
|
||||
}
|
||||
|
||||
public static class DefaultDeviceHelper {
|
||||
public static string GetDefaultInputDevice() {
|
||||
try {
|
||||
var enumerator = (IMMDeviceEnumerator)new MMDeviceEnumerator();
|
||||
IMMDevice device;
|
||||
enumerator.GetDefaultAudioEndpoint(EDataFlow.eCapture, ERole.eConsole, out device);
|
||||
string id;
|
||||
device.GetId(out id);
|
||||
return id;
|
||||
} catch { return null; }
|
||||
}
|
||||
|
||||
public static string GetDefaultOutputDevice() {
|
||||
try {
|
||||
var enumerator = (IMMDeviceEnumerator)new MMDeviceEnumerator();
|
||||
IMMDevice device;
|
||||
enumerator.GetDefaultAudioEndpoint(EDataFlow.eRender, ERole.eConsole, out device);
|
||||
string id;
|
||||
device.GetId(out id);
|
||||
return id;
|
||||
} catch { return null; }
|
||||
}
|
||||
}
|
||||
"@
|
||||
|
||||
Add-Type -TypeDefinition $code -Language CSharp -ErrorAction Stop
|
||||
|
||||
$defaultInput = [DefaultDeviceHelper]::GetDefaultInputDevice()
|
||||
$defaultOutput = [DefaultDeviceHelper]::GetDefaultOutputDevice()
|
||||
|
||||
[PSCustomObject]@{
|
||||
DefaultInput = $defaultInput
|
||||
DefaultOutput = $defaultOutput
|
||||
} | ConvertTo-Json
|
||||
"""
|
||||
|
||||
try:
|
||||
result = subprocess.run([
|
||||
"powershell.exe",
|
||||
"-NoProfile",
|
||||
"-NonInteractive",
|
||||
"-Command",
|
||||
ps_script,
|
||||
], capture_output=True, text=True, encoding="utf-8", timeout=15)
|
||||
|
||||
if result.returncode != 0:
|
||||
return {"default_input": None, "default_output": None}
|
||||
|
||||
if not result.stdout.strip():
|
||||
return {"default_input": None, "default_output": None}
|
||||
|
||||
data = json.loads(result.stdout)
|
||||
return {
|
||||
"default_input": data.get("DefaultInput"),
|
||||
"default_output": data.get("DefaultOutput")
|
||||
}
|
||||
|
||||
except Exception:
|
||||
return {"default_input": None, "default_output": None}
|
||||
|
||||
|
||||
def get_windows_audio_devices() -> List[WindowsAudioDevice]:
|
||||
"""Get detailed Windows audio devices via PowerShell from WSL."""
|
||||
try:
|
||||
result = subprocess.run([
|
||||
"powershell.exe",
|
||||
"-NoProfile",
|
||||
"-NonInteractive",
|
||||
"-Command",
|
||||
# Force UTF-8 output from PowerShell before running the command
|
||||
"[Console]::OutputEncoding = [System.Text.Encoding]::UTF8; "
|
||||
"Get-PnpDevice -Class AudioEndpoint | "
|
||||
"Select-Object FriendlyName, Status, InstanceId | ConvertTo-Json"
|
||||
], capture_output=True, text=True, encoding="utf-8", timeout=15)
|
||||
|
||||
if result.returncode != 0:
|
||||
error_msg = result.stderr.strip() if result.stderr else f"PowerShell exit code: {result.returncode}"
|
||||
typer.echo(typer.style(f"PowerShell command failed: {error_msg}", fg=typer.colors.RED))
|
||||
return []
|
||||
|
||||
if not result.stdout.strip():
|
||||
typer.echo(typer.style("PowerShell returned empty output", fg=typer.colors.YELLOW))
|
||||
return []
|
||||
|
||||
data = json.loads(result.stdout)
|
||||
devices = data if isinstance(data, list) else [data]
|
||||
|
||||
windows_devices = []
|
||||
for device in devices:
|
||||
name = device.get('FriendlyName', '').strip()
|
||||
status = device.get('Status', 'Unknown')
|
||||
instance_id = device.get('InstanceId', '')
|
||||
|
||||
if not name:
|
||||
continue
|
||||
|
||||
# Determine device type from InstanceId
|
||||
device_type = _categorize_device_type(instance_id)
|
||||
|
||||
windows_devices.append(WindowsAudioDevice(
|
||||
name=name,
|
||||
status=status,
|
||||
device_type=device_type,
|
||||
instance_id=instance_id
|
||||
))
|
||||
|
||||
return windows_devices
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
typer.echo(typer.style(f"Failed to parse PowerShell JSON output: {e}", fg=typer.colors.RED))
|
||||
return []
|
||||
except subprocess.TimeoutExpired:
|
||||
typer.echo(typer.style("PowerShell command timed out after 15 seconds", fg=typer.colors.RED))
|
||||
return []
|
||||
except Exception as e:
|
||||
typer.echo(typer.style(f"Unexpected error calling PowerShell: {type(e).__name__}: {e}", fg=typer.colors.RED))
|
||||
return []
|
||||
|
||||
|
||||
def _categorize_device_type(instance_id: str) -> str:
|
||||
"""Categorize device type based on InstanceId."""
|
||||
instance_lower = instance_id.lower()
|
||||
if '0.0.1.00000000' in instance_lower:
|
||||
return "Input"
|
||||
elif '0.0.0.00000000' in instance_lower:
|
||||
return "Output"
|
||||
else:
|
||||
return "Unknown"
|
||||
|
||||
|
||||
def get_linux_audio_devices() -> List[LinuxAudioDevice]:
|
||||
"""Get Linux audio devices using sounddevice."""
|
||||
try:
|
||||
devices = sd.query_devices()
|
||||
linux_devices = []
|
||||
|
||||
for i, device in enumerate(devices):
|
||||
hostapi_name = sd.query_hostapis(device['hostapi'])['name']
|
||||
|
||||
linux_devices.append(LinuxAudioDevice(
|
||||
device_id=i,
|
||||
name=device['name'],
|
||||
max_input_channels=device['max_input_channels'],
|
||||
max_output_channels=device['max_output_channels'],
|
||||
default_samplerate=device['default_samplerate'],
|
||||
hostapi_name=hostapi_name
|
||||
))
|
||||
|
||||
return linux_devices
|
||||
except Exception as e:
|
||||
typer.echo(typer.style(f"Error querying Linux audio devices: {e}", fg=typer.colors.RED, bold=True))
|
||||
raise typer.Exit(1)
|
||||
|
||||
|
||||
def display_windows_devices(devices: List[WindowsAudioDevice],
|
||||
sort_conf: Optional[SortConf] = None,
|
||||
filter_conf: Optional[FilterConf] = None
|
||||
) -> None:
|
||||
"""Display Windows audio devices with optimized formatting."""
|
||||
|
||||
def _get_id(_instance_id):
|
||||
return _instance_id[31:39]
|
||||
|
||||
if not devices:
|
||||
typer.echo(typer.style("No Windows audio devices found!", fg=typer.colors.RED))
|
||||
return
|
||||
|
||||
# Get default devices
|
||||
default_devices = get_windows_default_devices()
|
||||
default_input_id = default_devices.get("default_input")
|
||||
default_output_id = default_devices.get("default_output")
|
||||
|
||||
if not devices:
|
||||
typer.echo(typer.style("No audio devices found!", fg=typer.colors.RED))
|
||||
|
||||
# Create rich table
|
||||
table = Table(show_header=True)
|
||||
table.add_column("ID", width=8, no_wrap=True, overflow="ellipsis")
|
||||
table.add_column("Name", width=55, no_wrap=True, overflow="ellipsis")
|
||||
table.add_column("Type", width=8, justify="center")
|
||||
table.add_column("Status", width=3, justify="center")
|
||||
table.add_column("*", width=2, justify="center")
|
||||
|
||||
# Add devices to table
|
||||
rows = [] # list of dict
|
||||
for device in devices:
|
||||
# Determine if device is selected (default)
|
||||
is_selected = False
|
||||
if device.device_type == "Input" and device.instance_id == default_input_id:
|
||||
is_selected = True
|
||||
elif device.device_type == "Output" and device.instance_id == default_output_id:
|
||||
is_selected = True
|
||||
|
||||
selected_marker = "*" if is_selected else ""
|
||||
|
||||
# Color coding for status
|
||||
if device.status == "OK":
|
||||
status_text = f"[green]{device.status}[/green]"
|
||||
else:
|
||||
status_text = f"[red]{device.status}[/red]"
|
||||
|
||||
# Style selected row
|
||||
if is_selected:
|
||||
row_style = "bold"
|
||||
id_text = f"[bold]{_get_id(device.instance_id)}[/bold]"
|
||||
name_text = f"[bold]{device.name}[/bold]"
|
||||
type_text = f"[bold]{device.device_type}[/bold]"
|
||||
selected_text = f"[bold yellow]{selected_marker}[/bold yellow]"
|
||||
else:
|
||||
row_style = None
|
||||
id_text = _get_id(device.instance_id)
|
||||
name_text = device.name
|
||||
type_text = device.device_type
|
||||
selected_text = selected_marker
|
||||
|
||||
rows.append({
|
||||
"id": id_text,
|
||||
"name": name_text,
|
||||
"type": type_text,
|
||||
"status": status_text,
|
||||
"selected": selected_text
|
||||
})
|
||||
|
||||
rows = sort_by(rows, sort_conf)
|
||||
rows = filter_by(rows, filter_conf)
|
||||
|
||||
for row in rows:
|
||||
table.add_row(*row.values())
|
||||
|
||||
# Display table
|
||||
console = Console()
|
||||
console.print(table)
|
||||
|
||||
|
||||
def display_linux_devices(devices: List[LinuxAudioDevice],
|
||||
config: AppConfig,
|
||||
show_pulse: bool = False,
|
||||
sort_conf: Optional[SortConf] = None) -> None:
|
||||
"""Display Linux audio devices with existing formatting logic."""
|
||||
|
||||
if not devices:
|
||||
typer.echo(typer.style("No audio devices found!", fg=typer.colors.RED))
|
||||
return
|
||||
|
||||
# Display header
|
||||
typer.echo(f"\nFound {typer.style(str(len(devices)), fg=typer.colors.CYAN, bold=True)} devices:")
|
||||
typer.echo(f"{'ID':<3} {'Name':<35} {'IN':<3} {'OUT':<3} {'Rate':<8} {'Host API'}")
|
||||
typer.echo("-" * 70)
|
||||
|
||||
# Display devices
|
||||
for device in devices:
|
||||
rate = int(device.default_samplerate)
|
||||
is_configured = (config.audio.device == device.device_id)
|
||||
marker = " *" if is_configured else ""
|
||||
|
||||
device_line = (f"{device.device_id:<3} {device.name:<35} {device.max_input_channels:<3} "
|
||||
f"{device.max_output_channels:<3} {rate:<8} {device.hostapi_name}{marker}")
|
||||
typer.echo(typer.style(device_line, bold=is_configured))
|
||||
|
||||
# Show default devices info
|
||||
typer.echo(f"\nDefault devices:")
|
||||
try:
|
||||
default_device = sd.default.device
|
||||
default_input = int(default_device[0])
|
||||
default_output = int(default_device[1])
|
||||
|
||||
input_device = next((d for d in devices if d.device_id == default_input), None)
|
||||
if input_device:
|
||||
typer.echo(f" Input: [{default_input}] {input_device.name}")
|
||||
output_device = next((d for d in devices if d.device_id == default_output), None)
|
||||
if output_device:
|
||||
typer.echo(f" Output: [{default_output}] {output_device.name}")
|
||||
except Exception as e:
|
||||
typer.echo(typer.style(f" Error getting defaults: {e}", fg=typer.colors.RED))
|
||||
|
||||
# Show PulseAudio sources if requested
|
||||
if show_pulse:
|
||||
typer.echo(f"\nPulseAudio sources:")
|
||||
pulse_sources = get_pulseaudio_sources()
|
||||
if pulse_sources:
|
||||
for source in pulse_sources:
|
||||
typer.echo(f" {source['id']:2}: {source['name']}")
|
||||
else:
|
||||
typer.echo(typer.style(" Could not retrieve PulseAudio sources", fg=typer.colors.RED))
|
||||
|
||||
|
||||
# Legacy function for backwards compatibility
|
||||
def get_audio_devices():
|
||||
"""Get comprehensive information about audio devices."""
|
||||
try:
|
||||
devices = sd.query_devices()
|
||||
return devices
|
||||
except Exception as e:
|
||||
typer.echo(typer.style(f"Error querying audio devices: {e}", fg=typer.colors.RED, bold=True))
|
||||
raise typer.Exit(1)
|
||||
|
||||
|
||||
def get_pulseaudio_sources():
|
||||
"""Get PulseAudio sources for comparison."""
|
||||
try:
|
||||
result = subprocess.run(['pactl', 'list', 'sources', 'short'],
|
||||
capture_output=True, text=True, timeout=5)
|
||||
if result.returncode == 0:
|
||||
sources = []
|
||||
for line in result.stdout.strip().split('\n'):
|
||||
if line.strip():
|
||||
parts = line.split('\t')
|
||||
if len(parts) >= 2:
|
||||
sources.append({'id': parts[0], 'name': parts[1]})
|
||||
return sources
|
||||
else:
|
||||
return None
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def test_device(
|
||||
device: Optional[int] = typer.Option(None, "--device", "-d", help="Device ID to test (default: configured device)"),
|
||||
duration: float = typer.Option(3.0, "--duration", help="Recording duration in seconds"),
|
||||
save: bool = typer.Option(False, "--save", help="Save recording to WAV file"),
|
||||
config_file: Optional[str] = typer.Option(None, "--config", "-c", help="Path to config file")
|
||||
):
|
||||
"""Test audio recording from a specific device."""
|
||||
|
||||
# Load configuration
|
||||
config = AppConfig.load(config_file)
|
||||
|
||||
# Determine device to use
|
||||
test_device_id = device if device is not None else config.audio.device
|
||||
|
||||
typer.echo(typer.style("Audio Device Test", fg=typer.colors.BLUE, bold=True))
|
||||
typer.echo("=" * 20)
|
||||
|
||||
# Get device info
|
||||
devices_list = get_linux_audio_devices()
|
||||
|
||||
if test_device_id is not None:
|
||||
if test_device_id >= len(devices_list):
|
||||
typer.echo(typer.style(f"Error: Device {test_device_id} not found!", fg=typer.colors.RED))
|
||||
raise typer.Exit(1)
|
||||
|
||||
device_info = devices_list[test_device_id]
|
||||
if device_info['max_input_channels'] == 0:
|
||||
typer.echo(typer.style(f"Error: Device {test_device_id} has no input channels!", fg=typer.colors.RED))
|
||||
raise typer.Exit(1)
|
||||
|
||||
typer.echo(f"Testing device [{test_device_id}]: {device_info['name']}")
|
||||
else:
|
||||
typer.echo("Testing default input device")
|
||||
|
||||
typer.echo(f"Recording for {duration} seconds...")
|
||||
typer.echo("Starting in 3... 2... 1...")
|
||||
|
||||
try:
|
||||
# Record audio
|
||||
typer.echo(typer.style("Recording...", fg=typer.colors.GREEN, bold=True))
|
||||
|
||||
audio_data = sd.rec(
|
||||
int(duration * config.audio.sample_rate),
|
||||
samplerate=config.audio.sample_rate,
|
||||
channels=config.audio.channels,
|
||||
device=test_device_id,
|
||||
dtype='float32'
|
||||
)
|
||||
sd.wait()
|
||||
|
||||
# Analyze the recording
|
||||
audio_int16 = (audio_data.flatten() * 32767).astype(np.int16)
|
||||
max_amplitude = np.max(np.abs(audio_int16))
|
||||
rms_level = np.sqrt(np.mean(audio_int16.astype(np.float32) ** 2))
|
||||
|
||||
typer.echo(typer.style("Recording completed!", fg=typer.colors.GREEN))
|
||||
typer.echo(f"\nAnalysis:")
|
||||
typer.echo(f" Duration: {duration:.1f}s")
|
||||
typer.echo(f" Samples: {len(audio_int16)}")
|
||||
typer.echo(f" Max amplitude: {max_amplitude} / 32767 ({max_amplitude / 32767 * 100:.1f}%)")
|
||||
typer.echo(f" RMS level: {rms_level:.1f}")
|
||||
|
||||
# Signal quality assessment
|
||||
if max_amplitude < 100:
|
||||
typer.echo(typer.style(" Status: Very low signal - check microphone", fg=typer.colors.RED))
|
||||
elif max_amplitude < 1000:
|
||||
typer.echo(typer.style(" Status: Low signal - may need to speak louder", fg=typer.colors.YELLOW))
|
||||
elif max_amplitude > 30000:
|
||||
typer.echo(typer.style(" Status: Very high signal - may be clipping", fg=typer.colors.YELLOW))
|
||||
else:
|
||||
typer.echo(typer.style(" Status: Good signal level", fg=typer.colors.GREEN))
|
||||
|
||||
# Save if requested
|
||||
if save:
|
||||
filename = f"test_device_{test_device_id or 'default'}_{int(duration)}s.wav"
|
||||
# Note: WyomingAudioRecorder is not defined in the current code
|
||||
# This would need to be implemented or the save functionality modified
|
||||
typer.echo(f" Save functionality needs to be implemented")
|
||||
|
||||
except Exception as e:
|
||||
typer.echo(typer.style(f"Recording failed: {e}", fg=typer.colors.RED, bold=True))
|
||||
raise typer.Exit(1)
|
||||
|
||||
|
||||
def config_info(
|
||||
config_file: Optional[str] = typer.Option(None, "--config", "-c", help="Path to config file")
|
||||
):
|
||||
"""Show current audio configuration."""
|
||||
|
||||
# Load configuration
|
||||
config = AppConfig.load(config_file)
|
||||
|
||||
typer.echo(typer.style("Audio Configuration", fg=typer.colors.BLUE, bold=True))
|
||||
typer.echo("=" * 21)
|
||||
|
||||
# Show current settings
|
||||
typer.echo(f"\nCurrent settings:")
|
||||
typer.echo(f" Sample rate: {typer.style(f'{config.audio.sample_rate} Hz', fg=typer.colors.CYAN)}")
|
||||
typer.echo(f" Channels: {typer.style(str(config.audio.channels), fg=typer.colors.CYAN)}")
|
||||
|
||||
if config.audio.device is not None:
|
||||
typer.echo(f" Device: {typer.style(str(config.audio.device), fg=typer.colors.CYAN)}")
|
||||
|
||||
# Show device details
|
||||
try:
|
||||
devices_list = get_linux_audio_devices()
|
||||
if config.audio.device < len(devices_list):
|
||||
device = devices_list[config.audio.device]
|
||||
typer.echo(f"\nConfigured device details:")
|
||||
typer.echo(f" Name: {device['name']}")
|
||||
typer.echo(f" Input channels: {device['max_input_channels']}")
|
||||
typer.echo(f" Default rate: {int(device['default_samplerate'])} Hz")
|
||||
|
||||
if device['max_input_channels'] == 0:
|
||||
typer.echo(typer.style(" Warning: This device has no input channels!", fg=typer.colors.RED))
|
||||
else:
|
||||
typer.echo(typer.style(f" Warning: Configured device {config.audio.device} not found!",
|
||||
fg=typer.colors.RED, bold=True))
|
||||
except Exception as e:
|
||||
typer.echo(typer.style(f" Error getting device info: {e}", fg=typer.colors.RED))
|
||||
else:
|
||||
typer.echo(f" Device: {typer.style('default', fg=typer.colors.CYAN)}")
|
||||
|
||||
# Show default device info
|
||||
try:
|
||||
default_device = sd.default.device
|
||||
if hasattr(default_device, '__len__') and len(default_device) >= 2:
|
||||
default_input = int(default_device[0])
|
||||
else:
|
||||
default_input = int(default_device)
|
||||
|
||||
devices_list = get_linux_audio_devices()
|
||||
device = devices_list[default_input]
|
||||
typer.echo(f"\nDefault input device:")
|
||||
typer.echo(f" ID: {default_input}")
|
||||
typer.echo(f" Name: {device['name']}")
|
||||
typer.echo(f" Input channels: {device['max_input_channels']}")
|
||||
except Exception as e:
|
||||
typer.echo(typer.style(f" Error getting default device: {e}", fg=typer.colors.RED))
|
||||
|
||||
# Show configuration source info
|
||||
Reference in New Issue
Block a user