MyObsidianAI/obsidian_rag/indexer.py
"""
Indexer module for Obsidian RAG Backend.
This module handles the indexing of markdown files into a ChromaDB vector store
using local embeddings from sentence-transformers.
"""

from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Dict, List, Optional, Callable

import chromadb
from chromadb.config import Settings
from sentence_transformers import SentenceTransformer

from markdown_parser import ParsedDocument, parse_markdown_file

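# Note: both MiniLM variants embed text into 384-dimensional vectors; the
# 12-layer model encodes more slowly than the 6-layer one but generally gives
# slightly better retrieval quality.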
# EMBEDDING_MODEL = "all-MiniLM-L6-v2"
EMBEDDING_MODEL = "all-MiniLM-L12-v2"

@dataclass
class ChunkMetadata:
    file_path: str
    section_title: str
    line_start: int
    line_end: int


@dataclass
class Chunk:
    id: str
    text: str
    metadata: ChunkMetadata


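# Chunk ids are built as "<relative_path>::<section_title>::<line_start>-<line_end>",
# with an extra "::chunk<N>" suffix when a long section is split by the sliding window.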
def index_vault(
    vault_path: str,
    chroma_db_path: str,
    collection_name: str = "obsidian_vault",
    embedding_model: str = EMBEDDING_MODEL,
    max_chunk_tokens: int = 200,
    overlap_tokens: int = 30,
    progress_callback: Optional[Callable[[str, int, int], None]] = None,
) -> Dict:
    """
    Index all markdown files from the vault into ChromaDB.

    Args:
        vault_path: Path to the Obsidian vault directory
        chroma_db_path: Path where ChromaDB will store its data
        collection_name: Name of the ChromaDB collection
        embedding_model: Name of the sentence-transformers model to use
        max_chunk_tokens: Maximum tokens per chunk
        overlap_tokens: Number of overlapping tokens between chunks
        progress_callback: Optional callback function called for each file processed.
            Signature: callback(current_file: str, files_processed: int, total_files: int)

    Returns:
        Dictionary with indexing statistics:
        - files_processed: Number of files successfully processed
        - chunks_created: Total number of chunks created
        - errors: List of errors encountered (file path and error message)
        - collection_name: Name of the collection used
    """
    vault_path_obj = Path(vault_path)
    if not vault_path_obj.exists():
        raise ValueError(f"Vault path does not exist: {vault_path}")

    # Initialize embedding model and tokenizer
    model = SentenceTransformer(embedding_model)
    tokenizer = model.tokenizer

    # Initialize ChromaDB client and collection
    chroma_client = chromadb.PersistentClient(
        path=chroma_db_path,
        settings=Settings(anonymized_telemetry=False)
    )
    collection = _get_or_create_collection(chroma_client, collection_name)

    # Find all markdown files
    md_files = list(vault_path_obj.rglob("*.md"))
    total_files = len(md_files)

    # Statistics tracking
    stats = {
        "files_processed": 0,
        "chunks_created": 0,
        "errors": [],
        "collection_name": collection_name,
    }

    # Process each file
    for md_file in md_files:
        # Get relative path for display
        relative_path = md_file.relative_to(vault_path_obj)

        # Notify callback that we're starting this file
        if progress_callback:
            progress_callback(str(relative_path), stats["files_processed"], total_files)

        try:
            # Parse markdown file
            parsed_doc = parse_markdown_file(md_file)

            # Create chunks from document
            chunks = _create_chunks_from_document(
                parsed_doc,
                tokenizer,
                max_chunk_tokens,
                overlap_tokens,
                vault_path_obj,
            )

            if chunks:
                # Extract data for ChromaDB
                documents = [chunk.text for chunk in chunks]
                metadatas = [asdict(chunk.metadata) for chunk in chunks]
                ids = [chunk.id for chunk in chunks]

                # Generate embeddings and add to collection
                embeddings = model.encode(documents, show_progress_bar=False)
                collection.add(
                    documents=documents,
                    metadatas=metadatas,
                    ids=ids,
                    embeddings=embeddings.tolist(),
                )
                stats["chunks_created"] += len(chunks)

            stats["files_processed"] += 1
        except Exception as e:
            # Log error but continue processing
            stats["errors"].append({
                "file": str(relative_path),
                "error": str(e),
            })

    return stats


def _get_or_create_collection(
    chroma_client: chromadb.PersistentClient,
    collection_name: str,
) -> chromadb.Collection:
    """
    Get or create a ChromaDB collection, resetting it if it already exists.

    Args:
        chroma_client: ChromaDB client instance
        collection_name: Name of the collection

    Returns:
        ChromaDB collection instance
    """
    try:
        # Try to delete existing collection
        chroma_client.delete_collection(name=collection_name)
    except Exception:
        # Collection doesn't exist, that's fine
        pass

    # Create fresh collection
    collection = chroma_client.create_collection(
        name=collection_name,
        metadata={"hnsw:space": "cosine"}  # Use cosine similarity
    )
    return collection


def _create_chunks_from_document(
    parsed_doc: ParsedDocument,
    tokenizer,
    max_chunk_tokens: int,
    overlap_tokens: int,
    vault_path: Path,
) -> List[Chunk]:
    """
    Transform a parsed document into chunks with metadata.

    Implements a hybrid chunking strategy:
    - Short sections (≤ max_chunk_tokens): one chunk per section
    - Long sections (> max_chunk_tokens): split with a sliding window

    Args:
        parsed_doc: Parsed document from markdown_parser
        tokenizer: Tokenizer from sentence-transformers model
        max_chunk_tokens: Maximum tokens per chunk
        overlap_tokens: Number of overlapping tokens between chunks
        vault_path: Path to vault root (for relative path calculation)

    Returns:
        List of Chunk objects carrying the chunk id, text, and metadata
    """
    chunks = []
    file_path = parsed_doc.file_path
    relative_path = file_path.relative_to(vault_path)

    for section in parsed_doc.sections:
        section_text = f"{parsed_doc.title} {section.title} {section.content}"
        section_title = section.title
        line_start = section.start_line
        line_end = section.end_line

        # Tokenize section to check length
        tokens = tokenizer.encode(section_text, add_special_tokens=False)

        if len(tokens) <= max_chunk_tokens:
            # Short section: create single chunk
            chunk_id = f"{relative_path}::{section_title}::{line_start}-{line_end}"
            chunks.append(Chunk(
                chunk_id,
                section_text,
                ChunkMetadata(str(relative_path), section_title, line_start, line_end),
            ))
        else:
            # Long section: split with sliding window
            sub_chunks = _chunk_section(
                section_text,
                tokenizer,
                max_chunk_tokens,
                overlap_tokens,
            )

            # Create chunk for each sub-chunk
            for idx, sub_chunk_text in enumerate(sub_chunks):
                chunk_id = f"{relative_path}::{section_title}::{line_start}-{line_end}::chunk{idx}"
                chunks.append(Chunk(
                    chunk_id,
                    sub_chunk_text,
                    ChunkMetadata(str(relative_path), section_title, line_start, line_end),
                ))

    return chunks


def _chunk_section(
    section_text: str,
    tokenizer,
    max_chunk_tokens: int,
    overlap_tokens: int,
) -> List[str]:
    """
    Split a section into overlapping chunks using a sliding window.

    Args:
        section_text: Text content to chunk
        tokenizer: Tokenizer from sentence-transformers model
        max_chunk_tokens: Maximum tokens per chunk
        overlap_tokens: Number of overlapping tokens between chunks

    Returns:
        List of text chunks
    """
    # Apply safety margin to prevent decode/encode inconsistencies
    # from exceeding the max token limit
    max_chunk_tokens_to_use = int(max_chunk_tokens * 0.98)
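    # With the default max_chunk_tokens=200 this caps each window at
    # int(200 * 0.98) = 196 tokens before the tokens are decoded back to text.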

    # Tokenize the full text
    tokens = tokenizer.encode(section_text, add_special_tokens=False)

    chunks = []
    start_idx = 0
    step = max_chunk_tokens_to_use - overlap_tokens

    while start_idx < len(tokens):
        # Extract chunk tokens
        end_idx = start_idx + max_chunk_tokens_to_use
        chunk_tokens = tokens[start_idx:end_idx]

        # Decode back to text
        chunk_text = tokenizer.decode(chunk_tokens, skip_special_tokens=True)
        chunks.append(chunk_text)

        # Avoid an infinite loop if overlap_tokens >= max_chunk_tokens
        if step <= 0:
            break

        # Move window forward (with overlap)
        start_idx += step

    return chunks
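

# Example entry point: a minimal sketch of how index_vault might be called; the
# vault and database paths below are placeholders, and the callback simply
# prints progress to stdout.
if __name__ == "__main__":
    def print_progress(current_file: str, files_processed: int, total_files: int) -> None:
        # Matches the progress_callback signature documented in index_vault
        print(f"[{files_processed}/{total_files}] indexing {current_file}")

    result = index_vault(
        vault_path="/path/to/obsidian/vault",  # placeholder path
        chroma_db_path="/path/to/chroma_db",   # placeholder path
        progress_callback=print_progress,
    )
    print(
        f"Indexed {result['files_processed']} files into "
        f"{result['chunks_created']} chunks "
        f"({len(result['errors'])} errors) in collection '{result['collection_name']}'"
    )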