"""
|
|
Unit tests for the indexer module.
|
|
"""
|
|
|
|
import chromadb
|
|
import pytest
|
|
from chromadb.config import Settings
|
|
from sentence_transformers import SentenceTransformer
|
|
|
|
from indexer import (
|
|
index_vault,
|
|
_chunk_section,
|
|
_create_chunks_from_document,
|
|
_get_or_create_collection, EMBEDDING_MODEL,
|
|
)
|
|
from obsidian_rag.markdown_parser import ParsedDocument, MarkdownSection


# Fixtures
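# NOTE: the fixtures below load the real embedding model named by EMBEDDING_MODEL, so
# the first test run may download model weights if they are not already cached locally.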

@pytest.fixture
def tokenizer():
    """Provide sentence-transformers tokenizer."""
    model = SentenceTransformer(EMBEDDING_MODEL)
    return model.tokenizer


@pytest.fixture
def embedding_model():
    """Provide sentence-transformers model."""
    return SentenceTransformer(EMBEDDING_MODEL)


@pytest.fixture
def chroma_client(tmp_path):
    """Provide a ChromaDB client with temporary storage."""
    client = chromadb.PersistentClient(
        path=str(tmp_path / "chroma_test"),
        settings=Settings(anonymized_telemetry=False)
    )
    return client


@pytest.fixture
def test_vault(tmp_path):
    """Create an empty temporary vault directory; tests add markdown files as needed."""
    vault_path = tmp_path / "test_vault"
    vault_path.mkdir()
    return vault_path


# Tests for _chunk_section()

def test_i_can_chunk_short_section_into_single_chunk(tokenizer):
    """Test that a short section is not split."""
    # Create text with ~100 tokens
    short_text = " ".join(["word"] * 100)

    chunks = _chunk_section(
        section_text=short_text,
        tokenizer=tokenizer,
        max_chunk_tokens=200,
        overlap_tokens=30,
    )

    assert len(chunks) == 1
    assert chunks[0] == short_text


def test_i_can_chunk_long_section_with_overlap(tokenizer):
    """Test splitting a long section into overlapping chunks."""
    # Create text with ~500 tokens
    long_text = " ".join([f"word{i}" for i in range(500)])

    chunks = _chunk_section(
        section_text=long_text,
        tokenizer=tokenizer,
        max_chunk_tokens=200,
        overlap_tokens=30,
    )

    # Should create multiple chunks
    assert len(chunks) >= 2

    # Verify no chunk exceeds max tokens
    for chunk in chunks:
        tokens = tokenizer.encode(chunk, add_special_tokens=False)
        assert len(tokens) <= 200

    # Verify overlap exists between consecutive chunks
    for i in range(len(chunks) - 1):
        # Check that some words from the end of chunks[i] appear at the start of chunks[i + 1]
        words_chunk1 = chunks[i].split()[-10:]  # Last 10 words
        words_chunk2 = chunks[i + 1].split()[:10]  # First 10 words

        # At least some overlap should exist
        overlap_found = any(word in words_chunk2 for word in words_chunk1)
        assert overlap_found


def test_i_can_chunk_empty_section(tokenizer):
    """Test chunking an empty section."""
    empty_text = ""

    chunks = _chunk_section(
        section_text=empty_text,
        tokenizer=tokenizer,
        max_chunk_tokens=200,
        overlap_tokens=30,
    )

    assert len(chunks) == 0


# Tests for _create_chunks_from_document()

def test_i_can_create_chunks_from_document_with_short_sections(tmp_path, tokenizer):
    """Test creating chunks from a document with only short sections."""
    vault_path = tmp_path / "vault"
    vault_path.mkdir()
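
    # MarkdownSection is constructed positionally; the assumed argument order is
    # (level, title, content, parent_section_titles, line_start, line_end).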
    parsed_doc = ParsedDocument(
        file_path=vault_path / "test.md",
        title="test.md",
        sections=[
            MarkdownSection(1, "Section 1", "This is a short section with few words.", [], 1, 2),
            MarkdownSection(2, "Section 2", "Another short section here.", ["Section 1"], 3, 4),
            MarkdownSection(3, "Section 3", "Third short section.", ["Section 1", "Section 2"], 5, 6),
        ],
        raw_content=""  # not used in this test
    )

    chunks = _create_chunks_from_document(
        parsed_doc=parsed_doc,
        tokenizer=tokenizer,
        max_chunk_tokens=200,
        overlap_tokens=30,
        vault_path=vault_path,
    )

    # Should create 3 chunks (one per section)
    assert len(chunks) == 3

    # Verify metadata
    for i, chunk in enumerate(chunks):
        metadata = chunk.metadata
        assert metadata.file_path == "test.md"
        assert metadata.section_title == f"Section {i + 1}"
        assert isinstance(metadata.line_start, int)
        assert isinstance(metadata.line_end, int)

        # Verify ID format
        assert "test.md" in chunk.id
        assert f"Section {i + 1}" in chunk.id


def test_i_can_create_chunks_from_document_with_long_section(tmp_path, tokenizer):
    """Test creating chunks from a document with a long section that needs splitting."""
    vault_path = tmp_path / "vault"
    vault_path.mkdir()

    # Create long content (~500 tokens)
    long_content = " ".join([f"word{i}" for i in range(500)])

    parsed_doc = ParsedDocument(
        file_path=vault_path / "test.md",
        title="test.md",
        sections=[
            MarkdownSection(1, "Long Section", long_content, [], 1, 1)
        ],
        raw_content=long_content,
    )

    chunks = _create_chunks_from_document(
        parsed_doc=parsed_doc,
        tokenizer=tokenizer,
        max_chunk_tokens=200,
        overlap_tokens=30,
        vault_path=vault_path,
    )

    # Should create multiple chunks
    assert len(chunks) >= 2

    # All chunks should have the same section_title
    for chunk in chunks:
        assert chunk.metadata.section_title == "Long Section"
        assert chunk.metadata.line_start == 1
        assert chunk.metadata.line_end == 1

    # IDs should include chunk numbers
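    # (assumed ID format: "<relative file path>::<section title>::chunk<n>")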
    assert "::chunk0" in chunks[0].id
    assert "::chunk1" in chunks[1].id


def test_i_can_create_chunks_with_correct_relative_paths(tmp_path, tokenizer):
    """Test that relative paths are correctly computed."""
    vault_path = tmp_path / "vault"
    vault_path.mkdir()

    # Create subdirectory
    subdir = vault_path / "subfolder"
    subdir.mkdir()

    parsed_doc = ParsedDocument(
        file_path=subdir / "nested.md",
        title="nested.md",
        sections=[
            MarkdownSection(1, "Section", "Some content here.", [], 1, 2),
        ],
        raw_content="",
    )

    chunks = _create_chunks_from_document(
        parsed_doc=parsed_doc,
        tokenizer=tokenizer,
        max_chunk_tokens=200,
        overlap_tokens=30,
        vault_path=vault_path,
    )

    assert len(chunks) == 1
    assert chunks[0].metadata.file_path == "subfolder/nested.md"


# Tests for _get_or_create_collection()

def test_i_can_create_new_collection(chroma_client):
    """Test creating a new collection that doesn't exist."""
    collection_name = "test_collection"

    collection = _get_or_create_collection(chroma_client, collection_name)

    assert collection.name == collection_name
    assert collection.count() == 0  # Should be empty


def test_i_can_reset_existing_collection(chroma_client):
    """Test that an existing collection is deleted and recreated."""
    collection_name = "test_collection"

    # Create collection and add data
    first_collection = chroma_client.create_collection(collection_name)
    first_collection.add(
        documents=["test document"],
        ids=["test_id"],
    )
    assert first_collection.count() == 1

    # Reset collection
    new_collection = _get_or_create_collection(chroma_client, collection_name)

    assert new_collection.name == collection_name
    assert new_collection.count() == 0  # Should be empty after reset


# Tests for index_vault()

def test_i_can_index_single_markdown_file(test_vault, tmp_path, embedding_model):
    """Test indexing a single markdown file."""
    # Create test markdown file
    test_file = test_vault / "test.md"
    test_file.write_text(
        "# Title\n\nThis is a test document with some content.\n\n## Section\n\nMore content here."
    )

    chroma_path = tmp_path / "chroma_db"

    stats = index_vault(
        vault_path=str(test_vault),
        chroma_db_path=str(chroma_path),
        collection_name="test_collection",
    )

    assert stats["files_processed"] == 1
    assert stats["chunks_created"] > 0
    assert stats["errors"] == []
    assert stats["collection_name"] == "test_collection"

    # Verify collection contains data
    client = chromadb.PersistentClient(
        path=str(chroma_path),
        settings=Settings(anonymized_telemetry=False)
    )
    collection = client.get_collection("test_collection")
    assert collection.count() == stats["chunks_created"]


def test_i_can_index_multiple_markdown_files(test_vault, tmp_path):
    """Test indexing multiple markdown files."""
    # Create multiple test files
    for i in range(3):
        test_file = test_vault / f"test{i}.md"
        test_file.write_text(f"# Document {i}\n\nContent for document {i}.")

    chroma_path = tmp_path / "chroma_db"

    stats = index_vault(
        vault_path=str(test_vault),
        chroma_db_path=str(chroma_path),
    )

    assert stats["files_processed"] == 3
    assert stats["chunks_created"] >= 3  # At least one chunk per file
    assert stats["errors"] == []


def test_i_can_continue_indexing_after_file_error(test_vault, tmp_path, monkeypatch):
    """Test that indexing continues after encountering an error."""
    # Create two valid files and one that will be made to fail
    (test_vault / "valid1.md").write_text("# Valid 1\n\nContent here.")
    (test_vault / "valid2.md").write_text("# Valid 2\n\nMore content.")
    (test_vault / "problematic.md").write_text("# Problem\n\nThis will fail.")

    # Mock parse_markdown_file to fail for problematic.md
    from obsidian_rag import markdown_parser
    original_parse = markdown_parser.parse_markdown_file

    def mock_parse(file_path):
        if "problematic.md" in str(file_path):
            raise ValueError("Simulated parsing error")
        return original_parse(file_path)
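
    # Patch the name as the indexer module sees it so its internal lookups hit the
    # mock (assuming indexer imports parse_markdown_file into its own namespace).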
    monkeypatch.setattr("indexer.parse_markdown_file", mock_parse)

    chroma_path = tmp_path / "chroma_db"

    stats = index_vault(
        vault_path=str(test_vault),
        chroma_db_path=str(chroma_path),
    )

    # Should process 2 valid files
    assert stats["files_processed"] == 2
    assert len(stats["errors"]) == 1
    assert "problematic.md" in stats["errors"][0]["file"]
    assert "Simulated parsing error" in stats["errors"][0]["error"]


def test_i_cannot_index_nonexistent_vault(tmp_path):
    """Test that indexing a nonexistent vault raises an error."""
    nonexistent_path = tmp_path / "nonexistent_vault"
    chroma_path = tmp_path / "chroma_db"

    with pytest.raises(ValueError, match="Vault path does not exist"):
        index_vault(
            vault_path=str(nonexistent_path),
            chroma_db_path=str(chroma_path),
        )


def test_i_can_verify_embeddings_are_generated(test_vault, tmp_path):
    """Test that embeddings are properly generated and stored."""
    # Create test file
    test_file = test_vault / "test.md"
    test_file.write_text("# Test\n\nThis is test content for embedding generation.")

    chroma_path = tmp_path / "chroma_db"

    stats = index_vault(
        vault_path=str(test_vault),
        chroma_db_path=str(chroma_path),
    )

    # Verify embeddings in collection
    client = chromadb.PersistentClient(
        path=str(chroma_path),
        settings=Settings(anonymized_telemetry=False)
    )
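    # index_vault was called without collection_name above, so the data is looked up
    # under what appears to be the default collection name, "obsidian_vault".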
    collection = client.get_collection("obsidian_vault")

    # Get all items
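    # (ChromaDB's get() always returns ids, so only embeddings need to be requested here)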
    results = collection.get(include=["embeddings"])

    assert len(results["ids"]) == stats["chunks_created"]
    assert results["embeddings"] is not None

    # Verify embeddings are non-zero vectors of correct dimension
    for embedding in results["embeddings"]:
        assert len(embedding) == 384  # all-MiniLM-L6-v2 dimension
        assert any(val != 0 for val in embedding)  # Not all zeros