Compare commits

..

2 Commits

9 changed files with 548 additions and 132 deletions

23
requirements.txt Normal file
View File

@@ -0,0 +1,23 @@
cffi==1.17.1
click==8.2.1
comtypes==1.4.12
iniconfig==2.1.0
markdown-it-py==4.0.0
mdurl==0.1.2
numpy==2.3.2
packaging==25.0
pluggy==1.6.0
psutil==7.0.0
PyAudio==0.2.14
pycaw==20240210
pycparser==2.22
Pygments==2.19.2
pytest==8.4.1
PyYAML==6.0.2
rich==14.1.0
scipy==1.16.1
shellingham==1.5.4
sounddevice==0.5.2
typer==0.17.3
typing_extensions==4.15.0
wyoming==1.7.2

View File

@@ -9,15 +9,14 @@ audio_app = typer.Typer(help="Audio device operations")
# Add commands to subcommands # Add commands to subcommands
server_app.command("check")(server.check) server_app.command("check")(server.check)
server_app.command("test")(server.test)
audio_app.command("list")(audio.list_devices) audio_app.command("list")(audio.list_devices)
audio_app.command("test")(audio.test_device) audio_app.command("test")(audio.test_device)
audio_app.command("config")(audio.config_info)
audio_app.command("install")(audio.install) audio_app.command("install")(audio.install)
# Register subcommands # Register subcommands
app.add_typer(server_app, name="server") app.add_typer(server_app, name="server")
app.add_typer(audio_app, name="audio") app.add_typer(audio_app, name="audio")
if __name__ == "__main__": if __name__ == "__main__":
app() app()

View File

@@ -3,7 +3,8 @@ import numpy as np
import os import os
from typing import Optional, List from typing import Optional, List
from ..devices import get_audio_devices_windows, install_audio_cmdlets from ..devices import get_audio_devices_windows, install_audio_cmdlets, get_audio_devices_windows_from_pnp_devices, \
get_audio_devices_linux
from ..core.utils import * from ..core.utils import *
from ..config import AppConfig from ..config import AppConfig
@@ -34,20 +35,49 @@ def _detect_wsl() -> bool:
except Exception: except Exception:
return False return False
def _get_device_type(instance_id: str) -> str: def _get_device_type(instance_id: str) -> str:
"""Categorize device type based on InstanceId.""" """Categorize device type based on InstanceId."""
instance_lower = instance_id.lower() instance_lower = instance_id.lower()
if '0.0.1.00000000' in instance_lower: if '0.0.1.00000000' in instance_lower:
return "Input" return "Recording"
elif '0.0.0.00000000' in instance_lower: elif '0.0.0.00000000' in instance_lower:
return "Output" return "Playback"
else: else:
return "Unknown" return "Unknown"
def _get_short_device_id(instance_id: str) -> str:
"""Get device name based on InstanceId."""
if len(instance_id) == 68:
return instance_id[31:-1]
elif len(instance_id) == 55:
return instance_id[18:-1]
else:
return instance_id
def _get_colored_bool(status: bool) -> str:
"""Get colored status string based on status."""
if status:
return f"[green]{status}[/green]"
else:
return str(status)
def _get_colored_status(status: str) -> str:
"""Get colored status string based on status."""
if status == "Unknown":
return f"[red]{status}[/red]"
else:
return str(status)
def list_devices( def list_devices(
device: Optional[int] = typer.Argument(default=None, help="Show details of a device, by its Id"), device: Optional[int] = typer.Argument(default=None, help="Show details of a device, by its Id"),
pulse: bool = typer.Option(False, "--pulse", help="Show PulseAudio sources"), pulse: bool = typer.Option(False, "--pulse", help="Show PulseAudio sources"),
native: bool = typer.Option(False, "--native", help="Force native Linux view (WSL only)"), native: bool = typer.Option(False, "--native", help="Force native Linux view (WSL only)"),
all_devices: bool = typer.Option(False, "--all", help="Display all devices, even those not connected"),
sort_info: Optional[str] = typer.Option(None, "--sort", help="Sort by column"), sort_info: Optional[str] = typer.Option(None, "--sort", help="Sort by column"),
desc_info: bool = typer.Option(False, "--desc", help="Set the sorting order to descending"), desc_info: bool = typer.Option(False, "--desc", help="Set the sorting order to descending"),
filter_info: Optional[str] = typer.Option(None, "--filter", help="Filter by a column value. Use column=value"), filter_info: Optional[str] = typer.Option(None, "--filter", help="Filter by a column value. Use column=value"),
@@ -60,40 +90,61 @@ def list_devices(
# Determine title based on environment and options # Determine title based on environment and options
is_wsl = _detect_wsl() is_wsl = _detect_wsl()
use_windows = is_wsl and not native if is_wsl and not native:
if use_windows:
typer.echo(typer.style("WSL detected, showing Windows audio devices (use --native for WSL view)", typer.echo(typer.style("WSL detected, showing Windows audio devices (use --native for WSL view)",
fg=typer.colors.BLUE)) fg=typer.colors.BLUE))
result = get_audio_devices_windows() result = get_audio_devices_windows_from_pnp_devices() if all_devices else get_audio_devices_windows()
if not check_result(result): if not check_result(result):
return return
windows_devices = get_results(result.stdout, {"Index": '', "ID": '', 'Name': '', 'Type': '', 'Default': ''}) windows_devices = get_results(result.stdout)
if all_devices:
select_conf = [
SelectConf(attr="Index", default=""),
SelectConf(attr="InstanceId", to="ID", formatter=lambda item: _get_short_device_id(item.InstanceId)),
SelectConf(attr="FriendlyName", to="Name"),
SelectConf(attr="Type", formatter=lambda item: _get_device_type(item.InstanceId)),
SelectConf(attr="Status", formatter=lambda item: _get_colored_status(item.Status)),
]
else:
select_conf = [
SelectConf(attr="Index"),
SelectConf(attr="ID", formatter=lambda item: _get_short_device_id(item.ID)),
SelectConf(attr="Name"),
SelectConf(attr="Type"),
SelectConf(attr="Default", formatter=lambda item: _get_colored_bool(item.Default)),
]
windows_devices = select(windows_devices, select_conf)
# apply sorting and filtering # apply sorting and filtering
windows_devices = filter_and_sort(windows_devices, filter_info, sort_info, desc_info) windows_devices = filter_and_sort(windows_devices, filter_info, sort_info, desc_info)
display_as_table(windows_devices, [ display_as_table(windows_devices)
{"name": "Index"},
{"name": "ID"},
{"name": "Name"},
{"name": "Type"},
{"name": "Default"},
])
else: else:
linux_devices = get_linux_audio_devices() if native and not is_wsl:
display_linux_devices(linux_devices, config, pulse, sort_config) typer.echo(typer.style("Native is applicable only when WSL is detected.", fg=typer.colors.RED))
return
if all_devices and native:
typer.echo(typer.style("All devices is not applicable with native mode.", fg=typer.colors.RED))
return
result = get_audio_devices_linux()
if not check_result(result):
return
linux_devices = result.result
# apply sorting and filtering
linux_devices = filter_and_sort(linux_devices, filter_info, sort_info, desc_info)
display_as_table(linux_devices)
# Show legend # Show legend
typer.echo(f"\nLegend:") typer.echo(f"\nLegend:")
if use_windows: typer.echo(typer.style(" * = Configured device", fg=typer.colors.YELLOW))
typer.echo(typer.style(" ✓ = Device working", fg=typer.colors.GREEN)) typer.echo(f" IN/OUT = Input/Output channel count")
typer.echo(typer.style(" ✗ = Device problem", fg=typer.colors.RED))
else:
typer.echo(typer.style(" * = Configured device", fg=typer.colors.YELLOW))
typer.echo(f" IN/OUT = Input/Output channel count")
def install(): def install():
@@ -104,32 +155,6 @@ def install():
typer.echo(result.stdout) typer.echo(result.stdout)
def get_linux_audio_devices() -> list:
"""Get Linux audio devices using sounddevice."""
try:
devices = sd.query_devices()
linux_devices = []
for i, device in enumerate(devices):
hostapi_name = sd.query_hostapis(device['hostapi'])['name']
linux_devices.append(LinuxAudioDevice(
device_id=i,
name=device['name'],
max_input_channels=device['max_input_channels'],
max_output_channels=device['max_output_channels'],
default_samplerate=device['default_samplerate'],
hostapi_name=hostapi_name
))
return linux_devices
except Exception as e:
typer.echo(typer.style(f"Error querying Linux audio devices: {e}", fg=typer.colors.RED, bold=True))
raise typer.Exit(1)
def display_linux_devices(devices: list, def display_linux_devices(devices: list,
config: AppConfig, config: AppConfig,
show_pulse: bool = False, show_pulse: bool = False,
@@ -216,6 +241,7 @@ def test_device(
device: Optional[int] = typer.Option(None, "--device", "-d", help="Device ID to test (default: configured device)"), device: Optional[int] = typer.Option(None, "--device", "-d", help="Device ID to test (default: configured device)"),
duration: float = typer.Option(3.0, "--duration", help="Recording duration in seconds"), duration: float = typer.Option(3.0, "--duration", help="Recording duration in seconds"),
save: bool = typer.Option(False, "--save", help="Save recording to WAV file"), save: bool = typer.Option(False, "--save", help="Save recording to WAV file"),
play: bool = typer.Option(False, "--play", help="Play recorded audio through default speakers"),
config_file: Optional[str] = typer.Option(None, "--config", "-c", help="Path to config file") config_file: Optional[str] = typer.Option(None, "--config", "-c", help="Path to config file")
): ):
"""Test audio recording from a specific device.""" """Test audio recording from a specific device."""
@@ -230,7 +256,11 @@ def test_device(
typer.echo("=" * 20) typer.echo("=" * 20)
# Get device info # Get device info
devices_list = get_linux_audio_devices() result = get_audio_devices_linux()
if not check_result(result):
return
devices_list = result.result
if test_device_id is not None: if test_device_id is not None:
if test_device_id >= len(devices_list): if test_device_id >= len(devices_list):
@@ -284,72 +314,20 @@ def test_device(
else: else:
typer.echo(typer.style(" Status: Good signal level", fg=typer.colors.GREEN)) typer.echo(typer.style(" Status: Good signal level", fg=typer.colors.GREEN))
# Play recorded audio if requested
if play:
typer.echo(f"\n{typer.style('Playing recorded audio...', fg=typer.colors.CYAN, bold=True)}")
sd.play(audio_data, samplerate=config.audio.sample_rate)
sd.wait()
typer.echo(typer.style("Playback completed!", fg=typer.colors.CYAN))
# Save if requested # Save if requested
if save: if save:
filename = f"test_device_{test_device_id or 'default'}_{int(duration)}s.wav" filename = f"test_device_{test_device_id or 'default'}_{int(duration)}s.wav"
# Note: WyomingAudioRecorder is not defined in the current code from scipy.io import wavfile
# This would need to be implemented or the save functionality modified wavfile.write(filename, config.audio.sample_rate, audio_int16)
typer.echo(f" Save functionality needs to be implemented") typer.echo(typer.style(f"Audio saved to: {filename}", fg=typer.colors.MAGENTA, bold=True))
except Exception as e: except Exception as e:
typer.echo(typer.style(f"Recording failed: {e}", fg=typer.colors.RED, bold=True)) typer.echo(typer.style(f"Recording failed: {e}", fg=typer.colors.RED, bold=True))
raise typer.Exit(1) raise typer.Exit(1)
def config_info(
config_file: Optional[str] = typer.Option(None, "--config", "-c", help="Path to config file")
):
"""Show current audio configuration."""
# Load configuration
config = AppConfig.load(config_file)
typer.echo(typer.style("Audio Configuration", fg=typer.colors.BLUE, bold=True))
typer.echo("=" * 21)
# Show current settings
typer.echo(f"\nCurrent settings:")
typer.echo(f" Sample rate: {typer.style(f'{config.audio.sample_rate} Hz', fg=typer.colors.CYAN)}")
typer.echo(f" Channels: {typer.style(str(config.audio.channels), fg=typer.colors.CYAN)}")
if config.audio.device is not None:
typer.echo(f" Device: {typer.style(str(config.audio.device), fg=typer.colors.CYAN)}")
# Show device details
try:
devices_list = get_linux_audio_devices()
if config.audio.device < len(devices_list):
device = devices_list[config.audio.device]
typer.echo(f"\nConfigured device details:")
typer.echo(f" Name: {device['name']}")
typer.echo(f" Input channels: {device['max_input_channels']}")
typer.echo(f" Default rate: {int(device['default_samplerate'])} Hz")
if device['max_input_channels'] == 0:
typer.echo(typer.style(" Warning: This device has no input channels!", fg=typer.colors.RED))
else:
typer.echo(typer.style(f" Warning: Configured device {config.audio.device} not found!",
fg=typer.colors.RED, bold=True))
except Exception as e:
typer.echo(typer.style(f" Error getting device info: {e}", fg=typer.colors.RED))
else:
typer.echo(f" Device: {typer.style('default', fg=typer.colors.CYAN)}")
# Show default device info
try:
default_device = sd.default.device
if hasattr(default_device, '__len__') and len(default_device) >= 2:
default_input = int(default_device[0])
else:
default_input = int(default_device)
devices_list = get_linux_audio_devices()
device = devices_list[default_input]
typer.echo(f"\nDefault input device:")
typer.echo(f" ID: {default_input}")
typer.echo(f" Name: {device['name']}")
typer.echo(f" Input channels: {device['max_input_channels']}")
except Exception as e:
typer.echo(typer.style(f" Error getting default device: {e}", fg=typer.colors.RED))
# Show configuration source info

View File

@@ -4,7 +4,61 @@ from contextlib import closing
import typer import typer
from typing import Optional from typing import Optional
from wyoming.audio import AudioStart, AudioChunk, AudioStop
from ..config import AppConfig from ..config import AppConfig
import io
import wave
import numpy as np
import sounddevice as sd
import asyncio
from wyoming.client import AsyncTcpClient
from wyoming.asr import Transcribe, Transcript
async def _async_transcribe(host: str, port: int, timeout: float, pcm_bytes: bytes, lang: str) -> Optional[str]:
"""Stream raw PCM data to Wyoming ASR and return transcript text."""
# Instantiate the async TCP client
client = AsyncTcpClient(host, port)
# Audio parameters
rate = 16000
width = 2 # 16-bit
channels = 1
# The client instance is an async context manager.
async with client:
# 1. Send transcription request
await client.write_event(Transcribe(language=lang).event())
# 2. Start the audio stream
await client.write_event(AudioStart(rate, width, channels).event())
# 3. Send audio chunks
chunk_size = 2048 # A reasonable chunk size
for i in range(0, len(pcm_bytes), chunk_size):
chunk_bytes = pcm_bytes[i:i + chunk_size]
await client.write_event(AudioChunk(audio=chunk_bytes, rate=rate, width=width, channels=channels).event())
# 4. Stop the audio stream
await client.write_event(AudioStop().event())
# 5. Read events until a transcript arrives
transcript_text = None
try:
while True:
event = await asyncio.wait_for(client.read_event(), timeout=timeout)
if event is None:
break
if Transcript.is_type(event):
tr = Transcript.from_event(event)
transcript_text = tr.text
break
except asyncio.TimeoutError:
typer.echo(typer.style("Connection timed out waiting for transcript.", fg=typer.colors.YELLOW))
return transcript_text
def check_wyoming_server(host: str, port: int, timeout: float = 3.0) -> tuple[bool, float | None, str | None]: def check_wyoming_server(host: str, port: int, timeout: float = 3.0) -> tuple[bool, float | None, str | None]:
@@ -58,3 +112,66 @@ def check(
typer.echo(f"Server: {final_host}:{final_port}") typer.echo(f"Server: {final_host}:{final_port}")
typer.echo(typer.style(f"Error: {error}", fg=typer.colors.RED)) typer.echo(typer.style(f"Error: {error}", fg=typer.colors.RED))
raise typer.Exit(1) raise typer.Exit(1)
def test(
duration: float = typer.Option(3.0, "--duration", help="Recording duration in seconds"),
lang: str = typer.Option("fr", "--lang", help="Language code: 'fr' or 'en'"),
host: Optional[str] = typer.Option(None, "--host", "-h", help="Wyoming server host"),
port: Optional[int] = typer.Option(None, "--port", "-p", help="Wyoming server port"),
timeout: Optional[float] = typer.Option(None, "--timeout", "-t", help="Connection timeout in seconds"),
config_file: Optional[str] = typer.Option(None, "--config", "-c", help="Path to config file")
):
"""Record from default microphone, send to Wyoming ASR server, and print transcription."""
# Load configuration
config = AppConfig.load(config_file)
final_host = host or config.server.host
final_port = port or config.server.port
final_timeout = timeout or config.server.timeout
# Validate language (two-letter code)
lang = (lang or "fr").strip().lower()
if lang not in ("fr", "en"):
typer.echo(typer.style("Invalid --lang. Use 'fr' or 'en'.", fg=typer.colors.RED))
raise typer.Exit(2)
# Check server reachability first
reachable, latency, err = check_wyoming_server(final_host, final_port, final_timeout)
if not reachable:
typer.echo(typer.style(f"Cannot reach Wyoming server at {final_host}:{final_port}: {err}", fg=typer.colors.RED))
raise typer.Exit(1)
# Record audio (16 kHz mono float32)
sample_rate = 16000
channels = 1
typer.echo(typer.style("Recording...", fg=typer.colors.GREEN, bold=True))
try:
frames = int(duration * sample_rate)
audio = sd.rec(frames, samplerate=sample_rate, channels=channels, dtype="float32")
sd.wait()
except Exception as e:
typer.echo(typer.style(f"Audio recording failed: {e}", fg=typer.colors.RED))
raise typer.Exit(1)
# Convert to PCM16 bytes directly, no need for WAV wrapper
audio_int16 = np.clip(audio.flatten() * 32767.0, -32768, 32767).astype(np.int16)
pcm_bytes = audio_int16.tobytes()
# Send to Wyoming ASR (async)
try:
typer.echo(typer.style(f"Connecting to {final_host}:{final_port} (lang={lang})...", fg=typer.colors.CYAN))
# Run the async helper
transcript_text = asyncio.run(
_async_transcribe(final_host, final_port, final_timeout, pcm_bytes, lang)
)
if transcript_text:
typer.echo(typer.style("\nTranscription:", fg=typer.colors.GREEN, bold=True))
typer.echo(transcript_text)
else:
typer.echo(typer.style("No transcription received.", fg=typer.colors.YELLOW))
except Exception as e:
typer.echo(typer.style(f"ASR request failed: {e}", fg=typer.colors.RED))
raise typer.Exit(1)

View File

@@ -8,8 +8,10 @@ class Expando:
You can then access the property using dot '.' (ex. obj.prop1.prop2) You can then access the property using dot '.' (ex. obj.prop1.prop2)
""" """
def __init__(self, props): def __init__(self, props=None, **kwargs):
self._props = props self._props = props.copy() if props else {}
if kwargs:
self._props.update(kwargs)
def __getattr__(self, item): def __getattr__(self, item):
if item not in self._props: if item not in self._props:
@@ -21,10 +23,11 @@ class Expando:
def __setitem__(self, key, value): def __setitem__(self, key, value):
self._props[key] = value self._props[key] = value
def get(self, path): def get(self, path, default=None):
""" """
returns the value, from a string with represents the path returns the value, from a string with represents the path
:param path: :param path:
:param default: value to return if path is not found
:return: :return:
""" """
current = self._props current = self._props
@@ -38,7 +41,7 @@ class Expando:
else: else:
if current is None or attr not in current: if current is None or attr not in current:
return None return default
current = current[attr] current = current[attr]
return current return current

View File

@@ -2,7 +2,7 @@ import json
import subprocess import subprocess
from dataclasses import dataclass from dataclasses import dataclass
from subprocess import CompletedProcess from subprocess import CompletedProcess
from typing import Any from typing import Any, Callable
import typer import typer
from rich.console import Console from rich.console import Console
@@ -38,11 +38,26 @@ class FilterConf:
return FilterConf(parts[0].strip(), parts[1].strip()) return FilterConf(parts[0].strip(), parts[1].strip())
@dataclass
class SelectConf:
attr: str # property nam to select
to: str = None # rename the property
default: str = None # value to use if property is missing
formatter: Callable[[Any], Any] = None # function to apply to the property using the whole item
@dataclass @dataclass
class ProcessResult: class ProcessResult:
result: Any result: Any
error: str = None error: str = None
@property
def stderr(self):
return self.error
@property
def returncode(self):
return 0 if self.result is not None else 1
def sort_by(items: list[Expando], sort_conf: SortConf): def sort_by(items: list[Expando], sort_conf: SortConf):
"""Sort a list of items by a given property.""" """Sort a list of items by a given property."""
@@ -53,7 +68,6 @@ def sort_by(items: list[Expando], sort_conf: SortConf):
# Convert None to empty string for consistent sorting # Convert None to empty string for consistent sorting
return "" if value is None else str(value) return "" if value is None else str(value)
if sort_conf is None: if sort_conf is None:
return items return items
@@ -86,6 +100,23 @@ def filter_by(items, filter_conf: FilterConf):
] ]
def select(items: list[Expando], settings: list[SelectConf]) -> list[Expando]:
res = []
for item in items:
new_item = {}
for setting in settings:
attr = setting.attr.strip()
key = setting.to or attr
value = setting.formatter(item) if setting.formatter else item.get(attr, setting.default)
new_item[key] = value
res.append(Expando(new_item))
return res
def run_ps_command(command: str) -> CompletedProcess[str]: def run_ps_command(command: str) -> CompletedProcess[str]:
"""Run a PowerShell command and return the output""" """Run a PowerShell command and return the output"""
completed = subprocess.run( completed = subprocess.run(
@@ -160,7 +191,7 @@ def run_ps_command_live(command: str) -> CompletedProcess[str]:
return completed return completed
def get_results(stdout: str, mappings) -> list[dict]: def get_results(stdout: str) -> list[Expando]:
stripped = stdout.strip() stripped = stdout.strip()
if not stripped: if not stripped:
typer.echo(typer.style("PowerShell returned empty output", fg=typer.colors.YELLOW)) typer.echo(typer.style("PowerShell returned empty output", fg=typer.colors.YELLOW))
@@ -169,15 +200,20 @@ def get_results(stdout: str, mappings) -> list[dict]:
data = json.loads(stripped) data = json.loads(stripped)
as_list = data if isinstance(data, list) else [data] as_list = data if isinstance(data, list) else [data]
res = [] return [Expando(item) for item in as_list]
for item in as_list:
mapped = {key: item.get(key, default_value) for key, default_value in mappings.items()}
res.append(Expando(mapped))
return res
def display_as_table(result, columns_settings: list): def display_as_table(result, columns_settings: list = None):
def _create_default_columns_settings():
if len(result) == 0:
return []
blue_print = result[0]
return [{"name": attr} for attr in blue_print.as_dict().keys()]
if columns_settings is None:
columns_settings = _create_default_columns_settings()
formatters = {} formatters = {}
table = Table(show_header=True) table = Table(show_header=True)
for col in [c for c in columns_settings if "name" in c]: for col in [c for c in columns_settings if "name" in c]:

View File

@@ -50,7 +50,7 @@ def get_audio_devices_windows():
return run_ps_command(ps_script) return run_ps_command(ps_script)
def get_audio_devices_windows_from_pnp(): def get_audio_devices_windows_from_pnp_devices():
ps_script = "Get-PnpDevice -Class AudioEndpoint | Select-Object FriendlyName, Status, InstanceId | ConvertTo-Json" ps_script = "Get-PnpDevice -Class AudioEndpoint | Select-Object FriendlyName, Status, InstanceId | ConvertTo-Json"
return run_ps_command(ps_script) return run_ps_command(ps_script)

View File

@@ -29,6 +29,10 @@ def test_i_can_get():
assert dynamic.get("a") == 10 assert dynamic.get("a") == 10
assert dynamic.get("b.c") == "value" assert dynamic.get("b.c") == "value"
assert dynamic.get("unknown") is None assert dynamic.get("unknown") is None
assert dynamic.get("unknown", "default") == "default"
assert dynamic.get("b.x") is None
assert dynamic.get("b.x", "default") == "default"
assert dynamic.get("b.x", None) is None
def test_i_can_get_insensitive(): def test_i_can_get_insensitive():

256
tests/test_select.py Normal file
View File

@@ -0,0 +1,256 @@
import pytest
from core.utils import SelectConf, select
from core.Expando import Expando
def test_i_can_select_basic_attributes():
"""Test basic selection of existing attributes."""
items = [
Expando({"name": "alice", "age": 25, "city": "paris"}),
Expando({"name": "bob", "age": 30, "city": "london"})
]
settings = [
SelectConf(attr="name"),
SelectConf(attr="age")
]
result = select(items, settings)
assert len(result) == 2
assert result[0].name == "alice"
assert result[0].age == 25
assert not hasattr(result[0], "city") # city not selected
assert result[1].name == "bob"
assert result[1].age == 30
def test_i_can_select_with_renamed_attributes():
"""Test renaming attributes using SelectConf.to."""
items = [
Expando({"name": "alice", "age": 25}),
Expando({"name": "bob", "age": 30})
]
settings = [
SelectConf(attr="name", to="full_name"),
SelectConf(attr="age", to="years")
]
result = select(items, settings)
assert len(result) == 2
assert result[0].full_name == "alice"
assert result[0].years == 25
assert not hasattr(result[0], "name") # original name not present
assert not hasattr(result[0], "age") # original name not present
assert result[1].full_name == "bob"
assert result[1].years == 30
def test_i_can_select_with_formatter():
"""Test applying formatters to selected attributes."""
items = [
Expando({"name": "alice", "age": 25}),
Expando({"name": "bob", "age": 30})
]
settings = [
SelectConf(attr="name", formatter=lambda item: item.name.upper()),
SelectConf(attr="age", formatter=lambda item: item.age * 2)
]
result = select(items, settings)
assert len(result) == 2
assert result[0].name == "ALICE"
assert result[0].age == 50
assert result[1].name == "BOB"
assert result[1].age == 60
def test_i_can_select_with_formatter_and_rename():
"""Test combining formatter and renaming."""
items = [
Expando({"name": "alice", "age": 25}),
Expando({"name": "bob", "age": 30})
]
settings = [
SelectConf(attr="name", to="upper_name", formatter=lambda item: item.name.upper()),
SelectConf(attr="age", to="double_age", formatter=lambda item: item.age * 2)
]
result = select(items, settings)
assert len(result) == 2
assert result[0].upper_name == "ALICE"
assert result[0].double_age == 50
assert not hasattr(result[0], "name")
assert not hasattr(result[0], "age")
assert result[1].upper_name == "BOB"
assert result[1].double_age == 60
def test_i_can_handle_empty_items_list():
"""Test that empty items list returns empty list."""
items = []
settings = [SelectConf(attr="name")]
result = select(items, settings)
assert result == []
def test_i_can_handle_empty_settings_list():
"""Test that empty settings list returns items with empty properties."""
items = [
Expando({"name": "alice", "age": 25}),
Expando({"name": "bob", "age": 30})
]
settings = []
result = select(items, settings)
assert len(result) == 2
# Each result should be an Expando with empty properties
assert result[0]._props == {}
assert result[1]._props == {}
def test_i_can_handle_missing_attributes():
"""Test that missing attributes return None (normal behavior)."""
items = [
Expando({"name": "alice"}), # no age attribute
Expando({"name": "bob", "age": 30})
]
settings = [
SelectConf(attr="name"),
SelectConf(attr="age") # missing in first item
]
result = select(items, settings)
assert len(result) == 2
assert result[0].name == "alice"
assert result[0].age is None # missing attribute returns None
assert result[1].name == "bob"
assert result[1].age == 30
def test_i_can_handle_whitespace_in_attr_names():
"""Test that whitespace in attribute names is stripped."""
items = [
Expando({"name": "alice", "age": 25})
]
settings = [
SelectConf(attr=" name "), # whitespace around attr
SelectConf(attr="\tage\n") # tabs and newlines
]
result = select(items, settings)
assert len(result) == 1
assert result[0].name == "alice"
assert result[0].age == 25
def test_i_can_select_multiple_attributes():
"""Test selecting multiple attributes from multiple items."""
items = [
Expando({"name": "alice", "age": 25, "city": "paris", "country": "france"}),
Expando({"name": "bob", "age": 30, "city": "london", "country": "uk"}),
Expando({"name": "charlie", "age": 35, "city": "madrid", "country": "spain"})
]
settings = [
SelectConf(attr="name"),
SelectConf(attr="city", to="location"),
SelectConf(attr="country", formatter=lambda item: item.country.upper())
]
result = select(items, settings)
assert len(result) == 3
assert result[0].name == "alice"
assert result[0].location == "paris"
assert result[0].country == "FRANCE"
assert result[1].name == "bob"
assert result[1].location == "london"
assert result[1].country == "UK"
assert result[2].name == "charlie"
assert result[2].location == "madrid"
assert result[2].country == "SPAIN"
def test_i_can_handle_formatter_with_whole_item():
"""Test that formatter receives the whole item, not just the attribute value."""
items = [
Expando({"first_name": "alice", "last_name": "smith"}),
Expando({"first_name": "bob", "last_name": "jones"})
]
settings = [
SelectConf(
attr="first_name",
to="full_name",
formatter=lambda item: f"{item.get('first_name')} {item.get('last_name')}"
)
]
result = select(items, settings)
assert len(result) == 2
assert result[0].full_name == "alice smith"
assert result[1].full_name == "bob jones"
def test_i_cannot_select_when_formatter_raises_exception():
"""Test that formatter exceptions are propagated."""
items = [
Expando({"name": "alice", "value": None})
]
settings = [
SelectConf(
attr="value",
formatter=lambda item: item.get("value").upper() # will fail on None
)
]
with pytest.raises(AttributeError):
select(items, settings)
def test_i_can_handle_none_values_in_formatter():
"""Test formatter handling None values gracefully when designed to do so."""
items = [
Expando({"name": "alice", "value": None}),
Expando({"name": "bob", "value": "test"})
]
settings = [
SelectConf(
attr="value",
formatter=lambda item: item.get("value").upper() if item.get("value") else "NO_VALUE"
)
]
result = select(items, settings)
assert len(result) == 2
assert result[0].value == "NO_VALUE"
assert result[1].value == "TEST"
def test_i_can_select_nested_attributes():
"""Test selecting nested attributes using Expando.get() path notation."""
items = [
Expando({"user": {"profile": {"name": "alice"}}, "age": 25}),
Expando({"user": {"profile": {"name": "bob"}}, "age": 30})
]
settings = [
SelectConf(attr="user.profile.name", to="name"),
SelectConf(attr="age")
]
result = select(items, settings)
assert len(result) == 2
assert result[0].name == "alice"
assert result[0].age == 25
assert result[1].name == "bob"
assert result[1].age == 30