Compare commits

...

2 Commits

9 changed files with 548 additions and 132 deletions

23
requirements.txt Normal file
View File

@@ -0,0 +1,23 @@
cffi==1.17.1
click==8.2.1
comtypes==1.4.12
iniconfig==2.1.0
markdown-it-py==4.0.0
mdurl==0.1.2
numpy==2.3.2
packaging==25.0
pluggy==1.6.0
psutil==7.0.0
PyAudio==0.2.14
pycaw==20240210
pycparser==2.22
Pygments==2.19.2
pytest==8.4.1
PyYAML==6.0.2
rich==14.1.0
scipy==1.16.1
shellingham==1.5.4
sounddevice==0.5.2
typer==0.17.3
typing_extensions==4.15.0
wyoming==1.7.2

View File

@@ -9,15 +9,14 @@ audio_app = typer.Typer(help="Audio device operations")
# Add commands to subcommands
server_app.command("check")(server.check)
server_app.command("test")(server.test)
audio_app.command("list")(audio.list_devices)
audio_app.command("test")(audio.test_device)
audio_app.command("config")(audio.config_info)
audio_app.command("install")(audio.install)
# Register subcommands
app.add_typer(server_app, name="server")
app.add_typer(audio_app, name="audio")
if __name__ == "__main__":
app()
app()

View File

@@ -3,7 +3,8 @@ import numpy as np
import os
from typing import Optional, List
from ..devices import get_audio_devices_windows, install_audio_cmdlets
from ..devices import get_audio_devices_windows, install_audio_cmdlets, get_audio_devices_windows_from_pnp_devices, \
get_audio_devices_linux
from ..core.utils import *
from ..config import AppConfig
@@ -34,20 +35,49 @@ def _detect_wsl() -> bool:
except Exception:
return False
def _get_device_type(instance_id: str) -> str:
"""Categorize device type based on InstanceId."""
instance_lower = instance_id.lower()
if '0.0.1.00000000' in instance_lower:
return "Input"
return "Recording"
elif '0.0.0.00000000' in instance_lower:
return "Output"
return "Playback"
else:
return "Unknown"
def _get_short_device_id(instance_id: str) -> str:
"""Get device name based on InstanceId."""
if len(instance_id) == 68:
return instance_id[31:-1]
elif len(instance_id) == 55:
return instance_id[18:-1]
else:
return instance_id
def _get_colored_bool(status: bool) -> str:
"""Get colored status string based on status."""
if status:
return f"[green]{status}[/green]"
else:
return str(status)
def _get_colored_status(status: str) -> str:
"""Get colored status string based on status."""
if status == "Unknown":
return f"[red]{status}[/red]"
else:
return str(status)
def list_devices(
device: Optional[int] = typer.Argument(default=None, help="Show details of a device, by its Id"),
pulse: bool = typer.Option(False, "--pulse", help="Show PulseAudio sources"),
native: bool = typer.Option(False, "--native", help="Force native Linux view (WSL only)"),
all_devices: bool = typer.Option(False, "--all", help="Display all devices, even those not connected"),
sort_info: Optional[str] = typer.Option(None, "--sort", help="Sort by column"),
desc_info: bool = typer.Option(False, "--desc", help="Set the sorting order to descending"),
filter_info: Optional[str] = typer.Option(None, "--filter", help="Filter by a column value. Use column=value"),
@@ -60,40 +90,61 @@ def list_devices(
# Determine title based on environment and options
is_wsl = _detect_wsl()
use_windows = is_wsl and not native
if use_windows:
if is_wsl and not native:
typer.echo(typer.style("WSL detected, showing Windows audio devices (use --native for WSL view)",
fg=typer.colors.BLUE))
result = get_audio_devices_windows()
result = get_audio_devices_windows_from_pnp_devices() if all_devices else get_audio_devices_windows()
if not check_result(result):
return
windows_devices = get_results(result.stdout, {"Index": '', "ID": '', 'Name': '', 'Type': '', 'Default': ''})
windows_devices = get_results(result.stdout)
if all_devices:
select_conf = [
SelectConf(attr="Index", default=""),
SelectConf(attr="InstanceId", to="ID", formatter=lambda item: _get_short_device_id(item.InstanceId)),
SelectConf(attr="FriendlyName", to="Name"),
SelectConf(attr="Type", formatter=lambda item: _get_device_type(item.InstanceId)),
SelectConf(attr="Status", formatter=lambda item: _get_colored_status(item.Status)),
]
else:
select_conf = [
SelectConf(attr="Index"),
SelectConf(attr="ID", formatter=lambda item: _get_short_device_id(item.ID)),
SelectConf(attr="Name"),
SelectConf(attr="Type"),
SelectConf(attr="Default", formatter=lambda item: _get_colored_bool(item.Default)),
]
windows_devices = select(windows_devices, select_conf)
# apply sorting and filtering
windows_devices = filter_and_sort(windows_devices, filter_info, sort_info, desc_info)
display_as_table(windows_devices, [
{"name": "Index"},
{"name": "ID"},
{"name": "Name"},
{"name": "Type"},
{"name": "Default"},
])
display_as_table(windows_devices)
else:
linux_devices = get_linux_audio_devices()
display_linux_devices(linux_devices, config, pulse, sort_config)
if native and not is_wsl:
typer.echo(typer.style("Native is applicable only when WSL is detected.", fg=typer.colors.RED))
return
if all_devices and native:
typer.echo(typer.style("All devices is not applicable with native mode.", fg=typer.colors.RED))
return
result = get_audio_devices_linux()
if not check_result(result):
return
linux_devices = result.result
# apply sorting and filtering
linux_devices = filter_and_sort(linux_devices, filter_info, sort_info, desc_info)
display_as_table(linux_devices)
# Show legend
typer.echo(f"\nLegend:")
if use_windows:
typer.echo(typer.style(" ✓ = Device working", fg=typer.colors.GREEN))
typer.echo(typer.style(" ✗ = Device problem", fg=typer.colors.RED))
else:
typer.echo(typer.style(" * = Configured device", fg=typer.colors.YELLOW))
typer.echo(f" IN/OUT = Input/Output channel count")
typer.echo(typer.style(" * = Configured device", fg=typer.colors.YELLOW))
typer.echo(f" IN/OUT = Input/Output channel count")
def install():
@@ -104,32 +155,6 @@ def install():
typer.echo(result.stdout)
def get_linux_audio_devices() -> list:
"""Get Linux audio devices using sounddevice."""
try:
devices = sd.query_devices()
linux_devices = []
for i, device in enumerate(devices):
hostapi_name = sd.query_hostapis(device['hostapi'])['name']
linux_devices.append(LinuxAudioDevice(
device_id=i,
name=device['name'],
max_input_channels=device['max_input_channels'],
max_output_channels=device['max_output_channels'],
default_samplerate=device['default_samplerate'],
hostapi_name=hostapi_name
))
return linux_devices
except Exception as e:
typer.echo(typer.style(f"Error querying Linux audio devices: {e}", fg=typer.colors.RED, bold=True))
raise typer.Exit(1)
def display_linux_devices(devices: list,
config: AppConfig,
show_pulse: bool = False,
@@ -216,6 +241,7 @@ def test_device(
device: Optional[int] = typer.Option(None, "--device", "-d", help="Device ID to test (default: configured device)"),
duration: float = typer.Option(3.0, "--duration", help="Recording duration in seconds"),
save: bool = typer.Option(False, "--save", help="Save recording to WAV file"),
play: bool = typer.Option(False, "--play", help="Play recorded audio through default speakers"),
config_file: Optional[str] = typer.Option(None, "--config", "-c", help="Path to config file")
):
"""Test audio recording from a specific device."""
@@ -230,7 +256,11 @@ def test_device(
typer.echo("=" * 20)
# Get device info
devices_list = get_linux_audio_devices()
result = get_audio_devices_linux()
if not check_result(result):
return
devices_list = result.result
if test_device_id is not None:
if test_device_id >= len(devices_list):
@@ -284,72 +314,20 @@ def test_device(
else:
typer.echo(typer.style(" Status: Good signal level", fg=typer.colors.GREEN))
# Play recorded audio if requested
if play:
typer.echo(f"\n{typer.style('Playing recorded audio...', fg=typer.colors.CYAN, bold=True)}")
sd.play(audio_data, samplerate=config.audio.sample_rate)
sd.wait()
typer.echo(typer.style("Playback completed!", fg=typer.colors.CYAN))
# Save if requested
if save:
filename = f"test_device_{test_device_id or 'default'}_{int(duration)}s.wav"
# Note: WyomingAudioRecorder is not defined in the current code
# This would need to be implemented or the save functionality modified
typer.echo(f" Save functionality needs to be implemented")
from scipy.io import wavfile
wavfile.write(filename, config.audio.sample_rate, audio_int16)
typer.echo(typer.style(f"Audio saved to: {filename}", fg=typer.colors.MAGENTA, bold=True))
except Exception as e:
typer.echo(typer.style(f"Recording failed: {e}", fg=typer.colors.RED, bold=True))
raise typer.Exit(1)
def config_info(
config_file: Optional[str] = typer.Option(None, "--config", "-c", help="Path to config file")
):
"""Show current audio configuration."""
# Load configuration
config = AppConfig.load(config_file)
typer.echo(typer.style("Audio Configuration", fg=typer.colors.BLUE, bold=True))
typer.echo("=" * 21)
# Show current settings
typer.echo(f"\nCurrent settings:")
typer.echo(f" Sample rate: {typer.style(f'{config.audio.sample_rate} Hz', fg=typer.colors.CYAN)}")
typer.echo(f" Channels: {typer.style(str(config.audio.channels), fg=typer.colors.CYAN)}")
if config.audio.device is not None:
typer.echo(f" Device: {typer.style(str(config.audio.device), fg=typer.colors.CYAN)}")
# Show device details
try:
devices_list = get_linux_audio_devices()
if config.audio.device < len(devices_list):
device = devices_list[config.audio.device]
typer.echo(f"\nConfigured device details:")
typer.echo(f" Name: {device['name']}")
typer.echo(f" Input channels: {device['max_input_channels']}")
typer.echo(f" Default rate: {int(device['default_samplerate'])} Hz")
if device['max_input_channels'] == 0:
typer.echo(typer.style(" Warning: This device has no input channels!", fg=typer.colors.RED))
else:
typer.echo(typer.style(f" Warning: Configured device {config.audio.device} not found!",
fg=typer.colors.RED, bold=True))
except Exception as e:
typer.echo(typer.style(f" Error getting device info: {e}", fg=typer.colors.RED))
else:
typer.echo(f" Device: {typer.style('default', fg=typer.colors.CYAN)}")
# Show default device info
try:
default_device = sd.default.device
if hasattr(default_device, '__len__') and len(default_device) >= 2:
default_input = int(default_device[0])
else:
default_input = int(default_device)
devices_list = get_linux_audio_devices()
device = devices_list[default_input]
typer.echo(f"\nDefault input device:")
typer.echo(f" ID: {default_input}")
typer.echo(f" Name: {device['name']}")
typer.echo(f" Input channels: {device['max_input_channels']}")
except Exception as e:
typer.echo(typer.style(f" Error getting default device: {e}", fg=typer.colors.RED))
# Show configuration source info

View File

@@ -4,7 +4,61 @@ from contextlib import closing
import typer
from typing import Optional
from wyoming.audio import AudioStart, AudioChunk, AudioStop
from ..config import AppConfig
import io
import wave
import numpy as np
import sounddevice as sd
import asyncio
from wyoming.client import AsyncTcpClient
from wyoming.asr import Transcribe, Transcript
async def _async_transcribe(host: str, port: int, timeout: float, pcm_bytes: bytes, lang: str) -> Optional[str]:
"""Stream raw PCM data to Wyoming ASR and return transcript text."""
# Instantiate the async TCP client
client = AsyncTcpClient(host, port)
# Audio parameters
rate = 16000
width = 2 # 16-bit
channels = 1
# The client instance is an async context manager.
async with client:
# 1. Send transcription request
await client.write_event(Transcribe(language=lang).event())
# 2. Start the audio stream
await client.write_event(AudioStart(rate, width, channels).event())
# 3. Send audio chunks
chunk_size = 2048 # A reasonable chunk size
for i in range(0, len(pcm_bytes), chunk_size):
chunk_bytes = pcm_bytes[i:i + chunk_size]
await client.write_event(AudioChunk(audio=chunk_bytes, rate=rate, width=width, channels=channels).event())
# 4. Stop the audio stream
await client.write_event(AudioStop().event())
# 5. Read events until a transcript arrives
transcript_text = None
try:
while True:
event = await asyncio.wait_for(client.read_event(), timeout=timeout)
if event is None:
break
if Transcript.is_type(event):
tr = Transcript.from_event(event)
transcript_text = tr.text
break
except asyncio.TimeoutError:
typer.echo(typer.style("Connection timed out waiting for transcript.", fg=typer.colors.YELLOW))
return transcript_text
def check_wyoming_server(host: str, port: int, timeout: float = 3.0) -> tuple[bool, float | None, str | None]:
@@ -57,4 +111,67 @@ def check(
typer.echo(typer.style("Wyoming server unreachable!", fg=typer.colors.RED, bold=True))
typer.echo(f"Server: {final_host}:{final_port}")
typer.echo(typer.style(f"Error: {error}", fg=typer.colors.RED))
raise typer.Exit(1)
raise typer.Exit(1)
def test(
duration: float = typer.Option(3.0, "--duration", help="Recording duration in seconds"),
lang: str = typer.Option("fr", "--lang", help="Language code: 'fr' or 'en'"),
host: Optional[str] = typer.Option(None, "--host", "-h", help="Wyoming server host"),
port: Optional[int] = typer.Option(None, "--port", "-p", help="Wyoming server port"),
timeout: Optional[float] = typer.Option(None, "--timeout", "-t", help="Connection timeout in seconds"),
config_file: Optional[str] = typer.Option(None, "--config", "-c", help="Path to config file")
):
"""Record from default microphone, send to Wyoming ASR server, and print transcription."""
# Load configuration
config = AppConfig.load(config_file)
final_host = host or config.server.host
final_port = port or config.server.port
final_timeout = timeout or config.server.timeout
# Validate language (two-letter code)
lang = (lang or "fr").strip().lower()
if lang not in ("fr", "en"):
typer.echo(typer.style("Invalid --lang. Use 'fr' or 'en'.", fg=typer.colors.RED))
raise typer.Exit(2)
# Check server reachability first
reachable, latency, err = check_wyoming_server(final_host, final_port, final_timeout)
if not reachable:
typer.echo(typer.style(f"Cannot reach Wyoming server at {final_host}:{final_port}: {err}", fg=typer.colors.RED))
raise typer.Exit(1)
# Record audio (16 kHz mono float32)
sample_rate = 16000
channels = 1
typer.echo(typer.style("Recording...", fg=typer.colors.GREEN, bold=True))
try:
frames = int(duration * sample_rate)
audio = sd.rec(frames, samplerate=sample_rate, channels=channels, dtype="float32")
sd.wait()
except Exception as e:
typer.echo(typer.style(f"Audio recording failed: {e}", fg=typer.colors.RED))
raise typer.Exit(1)
# Convert to PCM16 bytes directly, no need for WAV wrapper
audio_int16 = np.clip(audio.flatten() * 32767.0, -32768, 32767).astype(np.int16)
pcm_bytes = audio_int16.tobytes()
# Send to Wyoming ASR (async)
try:
typer.echo(typer.style(f"Connecting to {final_host}:{final_port} (lang={lang})...", fg=typer.colors.CYAN))
# Run the async helper
transcript_text = asyncio.run(
_async_transcribe(final_host, final_port, final_timeout, pcm_bytes, lang)
)
if transcript_text:
typer.echo(typer.style("\nTranscription:", fg=typer.colors.GREEN, bold=True))
typer.echo(transcript_text)
else:
typer.echo(typer.style("No transcription received.", fg=typer.colors.YELLOW))
except Exception as e:
typer.echo(typer.style(f"ASR request failed: {e}", fg=typer.colors.RED))
raise typer.Exit(1)

View File

@@ -8,8 +8,10 @@ class Expando:
You can then access the property using dot '.' (ex. obj.prop1.prop2)
"""
def __init__(self, props):
self._props = props
def __init__(self, props=None, **kwargs):
self._props = props.copy() if props else {}
if kwargs:
self._props.update(kwargs)
def __getattr__(self, item):
if item not in self._props:
@@ -21,10 +23,11 @@ class Expando:
def __setitem__(self, key, value):
self._props[key] = value
def get(self, path):
def get(self, path, default=None):
"""
returns the value, from a string with represents the path
:param path:
:param default: value to return if path is not found
:return:
"""
current = self._props
@@ -38,7 +41,7 @@ class Expando:
else:
if current is None or attr not in current:
return None
return default
current = current[attr]
return current

View File

@@ -2,7 +2,7 @@ import json
import subprocess
from dataclasses import dataclass
from subprocess import CompletedProcess
from typing import Any
from typing import Any, Callable
import typer
from rich.console import Console
@@ -38,11 +38,26 @@ class FilterConf:
return FilterConf(parts[0].strip(), parts[1].strip())
@dataclass
class SelectConf:
attr: str # property nam to select
to: str = None # rename the property
default: str = None # value to use if property is missing
formatter: Callable[[Any], Any] = None # function to apply to the property using the whole item
@dataclass
class ProcessResult:
result: Any
error: str = None
@property
def stderr(self):
return self.error
@property
def returncode(self):
return 0 if self.result is not None else 1
def sort_by(items: list[Expando], sort_conf: SortConf):
"""Sort a list of items by a given property."""
@@ -52,7 +67,6 @@ def sort_by(items: list[Expando], sort_conf: SortConf):
value = item.geti(property_name, "")
# Convert None to empty string for consistent sorting
return "" if value is None else str(value)
if sort_conf is None:
return items
@@ -86,6 +100,23 @@ def filter_by(items, filter_conf: FilterConf):
]
def select(items: list[Expando], settings: list[SelectConf]) -> list[Expando]:
res = []
for item in items:
new_item = {}
for setting in settings:
attr = setting.attr.strip()
key = setting.to or attr
value = setting.formatter(item) if setting.formatter else item.get(attr, setting.default)
new_item[key] = value
res.append(Expando(new_item))
return res
def run_ps_command(command: str) -> CompletedProcess[str]:
"""Run a PowerShell command and return the output"""
completed = subprocess.run(
@@ -160,7 +191,7 @@ def run_ps_command_live(command: str) -> CompletedProcess[str]:
return completed
def get_results(stdout: str, mappings) -> list[dict]:
def get_results(stdout: str) -> list[Expando]:
stripped = stdout.strip()
if not stripped:
typer.echo(typer.style("PowerShell returned empty output", fg=typer.colors.YELLOW))
@@ -169,15 +200,20 @@ def get_results(stdout: str, mappings) -> list[dict]:
data = json.loads(stripped)
as_list = data if isinstance(data, list) else [data]
res = []
for item in as_list:
mapped = {key: item.get(key, default_value) for key, default_value in mappings.items()}
res.append(Expando(mapped))
return [Expando(item) for item in as_list]
def display_as_table(result, columns_settings: list = None):
def _create_default_columns_settings():
if len(result) == 0:
return []
blue_print = result[0]
return [{"name": attr} for attr in blue_print.as_dict().keys()]
if columns_settings is None:
columns_settings = _create_default_columns_settings()
return res
def display_as_table(result, columns_settings: list):
formatters = {}
table = Table(show_header=True)
for col in [c for c in columns_settings if "name" in c]:

View File

@@ -50,7 +50,7 @@ def get_audio_devices_windows():
return run_ps_command(ps_script)
def get_audio_devices_windows_from_pnp():
def get_audio_devices_windows_from_pnp_devices():
ps_script = "Get-PnpDevice -Class AudioEndpoint | Select-Object FriendlyName, Status, InstanceId | ConvertTo-Json"
return run_ps_command(ps_script)

View File

@@ -29,6 +29,10 @@ def test_i_can_get():
assert dynamic.get("a") == 10
assert dynamic.get("b.c") == "value"
assert dynamic.get("unknown") is None
assert dynamic.get("unknown", "default") == "default"
assert dynamic.get("b.x") is None
assert dynamic.get("b.x", "default") == "default"
assert dynamic.get("b.x", None) is None
def test_i_can_get_insensitive():

256
tests/test_select.py Normal file
View File

@@ -0,0 +1,256 @@
import pytest
from core.utils import SelectConf, select
from core.Expando import Expando
def test_i_can_select_basic_attributes():
"""Test basic selection of existing attributes."""
items = [
Expando({"name": "alice", "age": 25, "city": "paris"}),
Expando({"name": "bob", "age": 30, "city": "london"})
]
settings = [
SelectConf(attr="name"),
SelectConf(attr="age")
]
result = select(items, settings)
assert len(result) == 2
assert result[0].name == "alice"
assert result[0].age == 25
assert not hasattr(result[0], "city") # city not selected
assert result[1].name == "bob"
assert result[1].age == 30
def test_i_can_select_with_renamed_attributes():
"""Test renaming attributes using SelectConf.to."""
items = [
Expando({"name": "alice", "age": 25}),
Expando({"name": "bob", "age": 30})
]
settings = [
SelectConf(attr="name", to="full_name"),
SelectConf(attr="age", to="years")
]
result = select(items, settings)
assert len(result) == 2
assert result[0].full_name == "alice"
assert result[0].years == 25
assert not hasattr(result[0], "name") # original name not present
assert not hasattr(result[0], "age") # original name not present
assert result[1].full_name == "bob"
assert result[1].years == 30
def test_i_can_select_with_formatter():
"""Test applying formatters to selected attributes."""
items = [
Expando({"name": "alice", "age": 25}),
Expando({"name": "bob", "age": 30})
]
settings = [
SelectConf(attr="name", formatter=lambda item: item.name.upper()),
SelectConf(attr="age", formatter=lambda item: item.age * 2)
]
result = select(items, settings)
assert len(result) == 2
assert result[0].name == "ALICE"
assert result[0].age == 50
assert result[1].name == "BOB"
assert result[1].age == 60
def test_i_can_select_with_formatter_and_rename():
"""Test combining formatter and renaming."""
items = [
Expando({"name": "alice", "age": 25}),
Expando({"name": "bob", "age": 30})
]
settings = [
SelectConf(attr="name", to="upper_name", formatter=lambda item: item.name.upper()),
SelectConf(attr="age", to="double_age", formatter=lambda item: item.age * 2)
]
result = select(items, settings)
assert len(result) == 2
assert result[0].upper_name == "ALICE"
assert result[0].double_age == 50
assert not hasattr(result[0], "name")
assert not hasattr(result[0], "age")
assert result[1].upper_name == "BOB"
assert result[1].double_age == 60
def test_i_can_handle_empty_items_list():
"""Test that empty items list returns empty list."""
items = []
settings = [SelectConf(attr="name")]
result = select(items, settings)
assert result == []
def test_i_can_handle_empty_settings_list():
"""Test that empty settings list returns items with empty properties."""
items = [
Expando({"name": "alice", "age": 25}),
Expando({"name": "bob", "age": 30})
]
settings = []
result = select(items, settings)
assert len(result) == 2
# Each result should be an Expando with empty properties
assert result[0]._props == {}
assert result[1]._props == {}
def test_i_can_handle_missing_attributes():
"""Test that missing attributes return None (normal behavior)."""
items = [
Expando({"name": "alice"}), # no age attribute
Expando({"name": "bob", "age": 30})
]
settings = [
SelectConf(attr="name"),
SelectConf(attr="age") # missing in first item
]
result = select(items, settings)
assert len(result) == 2
assert result[0].name == "alice"
assert result[0].age is None # missing attribute returns None
assert result[1].name == "bob"
assert result[1].age == 30
def test_i_can_handle_whitespace_in_attr_names():
"""Test that whitespace in attribute names is stripped."""
items = [
Expando({"name": "alice", "age": 25})
]
settings = [
SelectConf(attr=" name "), # whitespace around attr
SelectConf(attr="\tage\n") # tabs and newlines
]
result = select(items, settings)
assert len(result) == 1
assert result[0].name == "alice"
assert result[0].age == 25
def test_i_can_select_multiple_attributes():
"""Test selecting multiple attributes from multiple items."""
items = [
Expando({"name": "alice", "age": 25, "city": "paris", "country": "france"}),
Expando({"name": "bob", "age": 30, "city": "london", "country": "uk"}),
Expando({"name": "charlie", "age": 35, "city": "madrid", "country": "spain"})
]
settings = [
SelectConf(attr="name"),
SelectConf(attr="city", to="location"),
SelectConf(attr="country", formatter=lambda item: item.country.upper())
]
result = select(items, settings)
assert len(result) == 3
assert result[0].name == "alice"
assert result[0].location == "paris"
assert result[0].country == "FRANCE"
assert result[1].name == "bob"
assert result[1].location == "london"
assert result[1].country == "UK"
assert result[2].name == "charlie"
assert result[2].location == "madrid"
assert result[2].country == "SPAIN"
def test_i_can_handle_formatter_with_whole_item():
"""Test that formatter receives the whole item, not just the attribute value."""
items = [
Expando({"first_name": "alice", "last_name": "smith"}),
Expando({"first_name": "bob", "last_name": "jones"})
]
settings = [
SelectConf(
attr="first_name",
to="full_name",
formatter=lambda item: f"{item.get('first_name')} {item.get('last_name')}"
)
]
result = select(items, settings)
assert len(result) == 2
assert result[0].full_name == "alice smith"
assert result[1].full_name == "bob jones"
def test_i_cannot_select_when_formatter_raises_exception():
"""Test that formatter exceptions are propagated."""
items = [
Expando({"name": "alice", "value": None})
]
settings = [
SelectConf(
attr="value",
formatter=lambda item: item.get("value").upper() # will fail on None
)
]
with pytest.raises(AttributeError):
select(items, settings)
def test_i_can_handle_none_values_in_formatter():
"""Test formatter handling None values gracefully when designed to do so."""
items = [
Expando({"name": "alice", "value": None}),
Expando({"name": "bob", "value": "test"})
]
settings = [
SelectConf(
attr="value",
formatter=lambda item: item.get("value").upper() if item.get("value") else "NO_VALUE"
)
]
result = select(items, settings)
assert len(result) == 2
assert result[0].value == "NO_VALUE"
assert result[1].value == "TEST"
def test_i_can_select_nested_attributes():
"""Test selecting nested attributes using Expando.get() path notation."""
items = [
Expando({"user": {"profile": {"name": "alice"}}, "age": 25}),
Expando({"user": {"profile": {"name": "bob"}}, "age": 30})
]
settings = [
SelectConf(attr="user.profile.name", to="name"),
SelectConf(attr="age")
]
result = select(items, settings)
assert len(result) == 2
assert result[0].name == "alice"
assert result[0].age == 25
assert result[1].name == "bob"
assert result[1].age == 30