334 lines
12 KiB
Python
334 lines
12 KiB
Python
import sounddevice as sd
|
|
import numpy as np
|
|
import os
|
|
from typing import Optional, List
|
|
|
|
from ..devices import get_audio_devices_windows, install_audio_cmdlets, get_audio_devices_windows_from_pnp_devices, \
|
|
get_audio_devices_linux
|
|
from ..core.utils import *
|
|
from ..config import AppConfig
|
|
|
|
|
|
def _detect_wsl() -> bool:
|
|
"""Detect if running under Windows Subsystem for Linux."""
|
|
try:
|
|
# Method 1: Check /proc/version for Microsoft
|
|
if os.path.exists('/proc/version'):
|
|
with open('/proc/version', 'r') as f:
|
|
version_info = f.read().lower()
|
|
if 'microsoft' in version_info or 'wsl' in version_info:
|
|
return True
|
|
|
|
# Method 2: Check WSL environment variables
|
|
if os.getenv('WSL_DISTRO_NAME') or os.getenv('WSLENV'):
|
|
return True
|
|
|
|
# Method 3: Check if powershell.exe is available
|
|
try:
|
|
subprocess.run(['powershell.exe', '-Command', 'echo test'],
|
|
capture_output=True, timeout=2, check=True)
|
|
return True
|
|
except (subprocess.SubprocessError, FileNotFoundError):
|
|
pass
|
|
|
|
return False
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def _get_device_type(instance_id: str) -> str:
|
|
"""Categorize device type based on InstanceId."""
|
|
instance_lower = instance_id.lower()
|
|
if '0.0.1.00000000' in instance_lower:
|
|
return "Recording"
|
|
elif '0.0.0.00000000' in instance_lower:
|
|
return "Playback"
|
|
else:
|
|
return "Unknown"
|
|
|
|
|
|
def _get_short_device_id(instance_id: str) -> str:
|
|
"""Get device name based on InstanceId."""
|
|
if len(instance_id) == 68:
|
|
return instance_id[31:-1]
|
|
elif len(instance_id) == 55:
|
|
return instance_id[18:-1]
|
|
else:
|
|
return instance_id
|
|
|
|
|
|
def _get_colored_bool(status: bool) -> str:
|
|
"""Get colored status string based on status."""
|
|
if status:
|
|
return f"[green]{status}[/green]"
|
|
else:
|
|
return str(status)
|
|
|
|
|
|
def _get_colored_status(status: str) -> str:
|
|
"""Get colored status string based on status."""
|
|
if status == "Unknown":
|
|
return f"[red]{status}[/red]"
|
|
else:
|
|
return str(status)
|
|
|
|
|
|
def list_devices(
|
|
device: Optional[int] = typer.Argument(default=None, help="Show details of a device, by its Id"),
|
|
pulse: bool = typer.Option(False, "--pulse", help="Show PulseAudio sources"),
|
|
native: bool = typer.Option(False, "--native", help="Force native Linux view (WSL only)"),
|
|
all_devices: bool = typer.Option(False, "--all", help="Display all devices, even those not connected"),
|
|
sort_info: Optional[str] = typer.Option(None, "--sort", help="Sort by column"),
|
|
desc_info: bool = typer.Option(False, "--desc", help="Set the sorting order to descending"),
|
|
filter_info: Optional[str] = typer.Option(None, "--filter", help="Filter by a column value. Use column=value"),
|
|
config_file: Optional[str] = typer.Option(None, "--config", "-c", help="Path to config file")
|
|
):
|
|
"""List audio devices."""
|
|
|
|
# Load configuration for context
|
|
config = AppConfig.load(config_file)
|
|
|
|
# Determine title based on environment and options
|
|
is_wsl = _detect_wsl()
|
|
if is_wsl and not native:
|
|
typer.echo(typer.style("WSL detected, showing Windows audio devices (use --native for WSL view)",
|
|
fg=typer.colors.BLUE))
|
|
result = get_audio_devices_windows_from_pnp_devices() if all_devices else get_audio_devices_windows()
|
|
if not check_result(result):
|
|
return
|
|
|
|
windows_devices = get_results(result.stdout)
|
|
if all_devices:
|
|
select_conf = [
|
|
SelectConf(attr="Index", default=""),
|
|
SelectConf(attr="InstanceId", to="ID", formatter=lambda item: _get_short_device_id(item.InstanceId)),
|
|
SelectConf(attr="FriendlyName", to="Name"),
|
|
SelectConf(attr="Type", formatter=lambda item: _get_device_type(item.InstanceId)),
|
|
SelectConf(attr="Status", formatter=lambda item: _get_colored_status(item.Status)),
|
|
]
|
|
else:
|
|
select_conf = [
|
|
SelectConf(attr="Index"),
|
|
SelectConf(attr="ID", formatter=lambda item: _get_short_device_id(item.ID)),
|
|
SelectConf(attr="Name"),
|
|
SelectConf(attr="Type"),
|
|
SelectConf(attr="Default", formatter=lambda item: _get_colored_bool(item.Default)),
|
|
]
|
|
windows_devices = select(windows_devices, select_conf)
|
|
|
|
# apply sorting and filtering
|
|
windows_devices = filter_and_sort(windows_devices, filter_info, sort_info, desc_info)
|
|
|
|
display_as_table(windows_devices)
|
|
|
|
|
|
else:
|
|
if native and not is_wsl:
|
|
typer.echo(typer.style("Native is applicable only when WSL is detected.", fg=typer.colors.RED))
|
|
return
|
|
|
|
if all_devices and native:
|
|
typer.echo(typer.style("All devices is not applicable with native mode.", fg=typer.colors.RED))
|
|
return
|
|
|
|
result = get_audio_devices_linux()
|
|
if not check_result(result):
|
|
return
|
|
|
|
linux_devices = result.result
|
|
# apply sorting and filtering
|
|
linux_devices = filter_and_sort(linux_devices, filter_info, sort_info, desc_info)
|
|
|
|
display_as_table(linux_devices)
|
|
|
|
# Show legend
|
|
typer.echo(f"\nLegend:")
|
|
typer.echo(typer.style(" * = Configured device", fg=typer.colors.YELLOW))
|
|
typer.echo(f" IN/OUT = Input/Output channel count")
|
|
|
|
|
|
def install():
|
|
result = install_audio_cmdlets()
|
|
if not check_result(result):
|
|
return
|
|
|
|
typer.echo(result.stdout)
|
|
|
|
|
|
def display_linux_devices(devices: list,
|
|
config: AppConfig,
|
|
show_pulse: bool = False,
|
|
sort_conf: Optional[SortConf] = None) -> None:
|
|
"""Display Linux audio devices with existing formatting logic."""
|
|
|
|
if not devices:
|
|
typer.echo(typer.style("No audio devices found!", fg=typer.colors.RED))
|
|
return
|
|
|
|
# Display header
|
|
typer.echo(f"\nFound {typer.style(str(len(devices)), fg=typer.colors.CYAN, bold=True)} devices:")
|
|
typer.echo(f"{'ID':<3} {'Name':<35} {'IN':<3} {'OUT':<3} {'Rate':<8} {'Host API'}")
|
|
typer.echo("-" * 70)
|
|
|
|
# Display devices
|
|
for device in devices:
|
|
rate = int(device.default_samplerate)
|
|
is_configured = (config.audio.device == device.device_id)
|
|
marker = " *" if is_configured else ""
|
|
|
|
device_line = (f"{device.device_id:<3} {device.name:<35} {device.max_input_channels:<3} "
|
|
f"{device.max_output_channels:<3} {rate:<8} {device.hostapi_name}{marker}")
|
|
typer.echo(typer.style(device_line, bold=is_configured))
|
|
|
|
# Show default devices info
|
|
typer.echo(f"\nDefault devices:")
|
|
try:
|
|
default_device = sd.default.device
|
|
default_input = int(default_device[0])
|
|
default_output = int(default_device[1])
|
|
|
|
input_device = next((d for d in devices if d.device_id == default_input), None)
|
|
if input_device:
|
|
typer.echo(f" Input: [{default_input}] {input_device.name}")
|
|
output_device = next((d for d in devices if d.device_id == default_output), None)
|
|
if output_device:
|
|
typer.echo(f" Output: [{default_output}] {output_device.name}")
|
|
except Exception as e:
|
|
typer.echo(typer.style(f" Error getting defaults: {e}", fg=typer.colors.RED))
|
|
|
|
# Show PulseAudio sources if requested
|
|
if show_pulse:
|
|
typer.echo(f"\nPulseAudio sources:")
|
|
pulse_sources = get_pulseaudio_sources()
|
|
if pulse_sources:
|
|
for source in pulse_sources:
|
|
typer.echo(f" {source['id']:2}: {source['name']}")
|
|
else:
|
|
typer.echo(typer.style(" Could not retrieve PulseAudio sources", fg=typer.colors.RED))
|
|
|
|
|
|
# Legacy function for backwards compatibility
|
|
# def get_audio_devices():
|
|
# """Get comprehensive information about audio devices."""
|
|
# try:
|
|
# devices = sd.query_devices()
|
|
# return devices
|
|
# except Exception as e:
|
|
# typer.echo(typer.style(f"Error querying audio devices: {e}", fg=typer.colors.RED, bold=True))
|
|
# raise typer.Exit(1)
|
|
|
|
|
|
def get_pulseaudio_sources():
|
|
"""Get PulseAudio sources for comparison."""
|
|
try:
|
|
result = subprocess.run(['pactl', 'list', 'sources', 'short'],
|
|
capture_output=True, text=True, timeout=5)
|
|
if result.returncode == 0:
|
|
sources = []
|
|
for line in result.stdout.strip().split('\n'):
|
|
if line.strip():
|
|
parts = line.split('\t')
|
|
if len(parts) >= 2:
|
|
sources.append({'id': parts[0], 'name': parts[1]})
|
|
return sources
|
|
else:
|
|
return None
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def test_device(
|
|
device: Optional[int] = typer.Option(None, "--device", "-d", help="Device ID to test (default: configured device)"),
|
|
duration: float = typer.Option(3.0, "--duration", help="Recording duration in seconds"),
|
|
save: bool = typer.Option(False, "--save", help="Save recording to WAV file"),
|
|
play: bool = typer.Option(False, "--play", help="Play recorded audio through default speakers"),
|
|
config_file: Optional[str] = typer.Option(None, "--config", "-c", help="Path to config file")
|
|
):
|
|
"""Test audio recording from a specific device."""
|
|
|
|
# Load configuration
|
|
config = AppConfig.load(config_file)
|
|
|
|
# Determine device to use
|
|
test_device_id = device if device is not None else config.audio.device
|
|
|
|
typer.echo(typer.style("Audio Device Test", fg=typer.colors.BLUE, bold=True))
|
|
typer.echo("=" * 20)
|
|
|
|
# Get device info
|
|
result = get_audio_devices_linux()
|
|
if not check_result(result):
|
|
return
|
|
|
|
devices_list = result.result
|
|
|
|
if test_device_id is not None:
|
|
if test_device_id >= len(devices_list):
|
|
typer.echo(typer.style(f"Error: Device {test_device_id} not found!", fg=typer.colors.RED))
|
|
raise typer.Exit(1)
|
|
|
|
device_info = devices_list[test_device_id]
|
|
if device_info['max_input_channels'] == 0:
|
|
typer.echo(typer.style(f"Error: Device {test_device_id} has no input channels!", fg=typer.colors.RED))
|
|
raise typer.Exit(1)
|
|
|
|
typer.echo(f"Testing device [{test_device_id}]: {device_info['name']}")
|
|
else:
|
|
typer.echo("Testing default input device")
|
|
|
|
typer.echo(f"Recording for {duration} seconds...")
|
|
typer.echo("Starting in 3... 2... 1...")
|
|
|
|
try:
|
|
# Record audio
|
|
typer.echo(typer.style("Recording...", fg=typer.colors.GREEN, bold=True))
|
|
|
|
audio_data = sd.rec(
|
|
int(duration * config.audio.sample_rate),
|
|
samplerate=config.audio.sample_rate,
|
|
channels=config.audio.channels,
|
|
device=test_device_id,
|
|
dtype='float32'
|
|
)
|
|
sd.wait()
|
|
|
|
# Analyze the recording
|
|
audio_int16 = (audio_data.flatten() * 32767).astype(np.int16)
|
|
max_amplitude = np.max(np.abs(audio_int16))
|
|
rms_level = np.sqrt(np.mean(audio_int16.astype(np.float32) ** 2))
|
|
|
|
typer.echo(typer.style("Recording completed!", fg=typer.colors.GREEN))
|
|
typer.echo(f"\nAnalysis:")
|
|
typer.echo(f" Duration: {duration:.1f}s")
|
|
typer.echo(f" Samples: {len(audio_int16)}")
|
|
typer.echo(f" Max amplitude: {max_amplitude} / 32767 ({max_amplitude / 32767 * 100:.1f}%)")
|
|
typer.echo(f" RMS level: {rms_level:.1f}")
|
|
|
|
# Signal quality assessment
|
|
if max_amplitude < 100:
|
|
typer.echo(typer.style(" Status: Very low signal - check microphone", fg=typer.colors.RED))
|
|
elif max_amplitude < 1000:
|
|
typer.echo(typer.style(" Status: Low signal - may need to speak louder", fg=typer.colors.YELLOW))
|
|
elif max_amplitude > 30000:
|
|
typer.echo(typer.style(" Status: Very high signal - may be clipping", fg=typer.colors.YELLOW))
|
|
else:
|
|
typer.echo(typer.style(" Status: Good signal level", fg=typer.colors.GREEN))
|
|
|
|
# Play recorded audio if requested
|
|
if play:
|
|
typer.echo(f"\n{typer.style('Playing recorded audio...', fg=typer.colors.CYAN, bold=True)}")
|
|
sd.play(audio_data, samplerate=config.audio.sample_rate)
|
|
sd.wait()
|
|
typer.echo(typer.style("Playback completed!", fg=typer.colors.CYAN))
|
|
|
|
# Save if requested
|
|
if save:
|
|
filename = f"test_device_{test_device_id or 'default'}_{int(duration)}s.wav"
|
|
from scipy.io import wavfile
|
|
wavfile.write(filename, config.audio.sample_rate, audio_int16)
|
|
typer.echo(typer.style(f"Audio saved to: {filename}", fg=typer.colors.MAGENTA, bold=True))
|
|
|
|
except Exception as e:
|
|
typer.echo(typer.style(f"Recording failed: {e}", fg=typer.colors.RED, bold=True))
|
|
raise typer.Exit(1)
|