From 618e21e012413d25c655bf92bc6edc1ec2ead167 Mon Sep 17 00:00:00 2001 From: Kodjo Sossouvi Date: Sun, 21 Dec 2025 17:42:17 +0100 Subject: [PATCH] Added Custom Ref Handlers --- CLAUDE.md | 145 +++++++++++--- README.md | 389 ++++++++++++++++++++++++++----------- pyproject.toml | 2 +- src/dbengine/dbengine.py | 72 +++++-- src/dbengine/handlers.py | 62 +++++- src/dbengine/serializer.py | 13 +- src/dbengine/utils.py | 16 +- tests/test_serializer.py | 137 ++++++++++++- 8 files changed, 655 insertions(+), 181 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 01c492a..7a4c6c8 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -84,19 +84,36 @@ pip install -e .[dev] - Converts Python objects to/from JSON-compatible dictionaries - Handles circular references using object ID tracking - Supports custom serialization via handlers (see handlers.py) -- Special tags: `__object__`, `__id__`, `__tuple__`, `__set__`, `__ref__`, `__enum__` +- Special tags: `__object__`, `__id__`, `__tuple__`, `__set__`, `__ref__`, `__digest__`, `__enum__` - Objects can define `use_refs()` method to specify fields that should be pickled instead of JSON-serialized +- `__ref__`: Used for `use_refs()` system (pickle-based storage) +- `__digest__`: Used by BaseRefHandler for custom binary formats (numpy, etc.) **Handlers** (`src/dbengine/handlers.py`) - Extensible handler system for custom type serialization -- BaseHandler interface: `is_eligible_for()`, `tag()`, `serialize()`, `deserialize()` -- Currently implements DateHandler for datetime.date objects +- Three-tier hierarchy: + - `BaseHandler`: Base interface with `is_eligible_for()` and `tag()` + - `BaseInlineHandler`: For JSON-inline storage (e.g., DateHandler) + - `BaseRefHandler`: For custom binary formats stored in `refs/` (e.g., DataFrames) +- `BaseInlineHandler`: Implements `serialize(obj) → dict` and `deserialize(dict) → obj` +- `BaseRefHandler`: Implements `serialize_to_bytes(obj) → bytes` and `deserialize_from_bytes(bytes) → obj` +- Currently implements `DateHandler` (BaseInlineHandler) for datetime.date objects - Use `handlers.register_handler()` to add custom handlers **Utils** (`src/dbengine/utils.py`) - Type checking utilities: `is_primitive()`, `is_dictionary()`, `is_list()`, etc. 
- Class introspection: `get_full_qualified_name()`, `importable_name()`, `get_class()` -- Stream digest computation with SHA-256 +- Digest computation: `compute_digest_from_stream()`, `compute_digest_from_bytes()` + +**RefHelper and PickleRefHelper** (`src/dbengine/dbengine.py`) +- `RefHelper`: Base class for content-addressable storage in `refs/` directory + - `save_ref_from_bytes(data: bytes) → digest`: Store raw bytes + - `load_ref_to_bytes(digest) → bytes`: Load raw bytes + - Used by `BaseRefHandler` for custom binary formats +- `PickleRefHelper(RefHelper)`: Adds pickle serialization layer + - `save_ref(obj) → digest`: Pickle and store object + - `load_ref(digest) → obj`: Load and unpickle object + - Used by `use_refs()` system and `Serializer` ### Storage Architecture @@ -107,11 +124,15 @@ pip install -e .[dev] │ └── objects/ │ └── {digest_prefix}/ # First 24 chars of digest │ └── {full_digest} # JSON snapshot with metadata -└── refs/ # Shared pickled references +└── refs/ # Shared binary references (cross-tenant) └── {digest_prefix}/ - └── {full_digest} + └── {full_digest} # Pickle or custom binary format ``` +**Note**: The `refs/` directory stores binary data in content-addressable format: +- Pickled objects (via `use_refs()` or `PickleRefHelper`) +- Custom binary formats (via `BaseRefHandler`, e.g., numpy arrays) + ### Metadata System Each snapshot includes automatic metadata fields: @@ -163,13 +184,17 @@ Objects can opt into pickle-based storage for specific fields: ### Custom Type Handlers -To serialize custom types that aren't handled by default serialization: +MyDbEngine supports two types of custom handlers for serializing types: -**1. Create a handler class:** +#### 1. BaseInlineHandler - For JSON Storage + +Use when data should be stored directly in the JSON snapshot (human-readable, smaller datasets). + +**Example: Custom date handler** ```python -from dbengine.handlers import BaseHandler, TAG_SPECIAL +from dbengine.handlers import BaseInlineHandler, handlers -class MyCustomHandler(BaseHandler): +class MyCustomHandler(BaseInlineHandler): def is_eligible_for(self, obj): return isinstance(obj, MyCustomType) @@ -178,26 +203,85 @@ class MyCustomHandler(BaseHandler): def serialize(self, obj) -> dict: return { - TAG_SPECIAL: self.tag(), + "__special__": self.tag(), "data": obj.to_dict() } def deserialize(self, data: dict) -> object: return MyCustomType.from_dict(data["data"]) -``` - -**2. Register the handler:** -```python -from dbengine.handlers import handlers +# Register the handler handlers.register_handler(MyCustomHandler()) ``` -**When to use handlers:** -- Complex types that need custom serialization logic -- Types that can't be pickled reliably -- Types requiring validation during deserialization -- External library types (datetime.date example in handlers.py) +**When to use BaseInlineHandler:** +- Small data structures that fit well in JSON +- Types requiring human-readable storage +- Types needing validation during deserialization +- Simple external library types (e.g., datetime.date) + +#### 2. BaseRefHandler - For Binary Storage + +Use when data should be stored in optimized binary format in `refs/` directory (large datasets, better compression). 
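+
+As a minimal sketch (assuming `numpy` is installed — it is not a dependency of this project), a `BaseRefHandler` for bare numpy arrays can delegate the binary layout to `np.save`/`np.load`; the pandas example below builds on the same idea with a hand-rolled format:
+
+```python
+from dbengine.handlers import BaseRefHandler, handlers
+import io
+import numpy as np
+
+class NumpyArrayHandler(BaseRefHandler):
+    def is_eligible_for(self, obj):
+        return isinstance(obj, np.ndarray)
+
+    def tag(self):
+        return "NumpyArray"
+
+    def serialize_to_bytes(self, arr) -> bytes:
+        # The .npy payload is self-describing (dtype + shape + data)
+        buffer = io.BytesIO()
+        np.save(buffer, arr, allow_pickle=False)
+        return buffer.getvalue()
+
+    def deserialize_from_bytes(self, data: bytes) -> object:
+        return np.load(io.BytesIO(data), allow_pickle=False)
+
+handlers.register_handler(NumpyArrayHandler())
+```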
+ +**Example: pandas DataFrame handler** +```python +from dbengine.handlers import BaseRefHandler, handlers +import pandas as pd +import json + +class DataFrameHandler(BaseRefHandler): + def is_eligible_for(self, obj): + return isinstance(obj, pd.DataFrame) + + def tag(self): + return "DataFrame" + + def serialize_to_bytes(self, df) -> bytes: + """Convert DataFrame to compact binary format""" + import numpy as np + + # Store metadata + numpy bytes + metadata = { + "columns": df.columns.tolist(), + "index": df.index.tolist(), + "dtype": str(df.values.dtype) + } + metadata_bytes = json.dumps(metadata).encode('utf-8') + metadata_length = len(metadata_bytes).to_bytes(4, 'big') + numpy_bytes = df.to_numpy().tobytes() + + return metadata_length + metadata_bytes + numpy_bytes + + def deserialize_from_bytes(self, data: bytes) -> object: + """Reconstruct DataFrame from binary format""" + import numpy as np + + # Read metadata + metadata_length = int.from_bytes(data[:4], 'big') + metadata = json.loads(data[4:4+metadata_length].decode('utf-8')) + numpy_bytes = data[4+metadata_length:] + + # Reconstruct array and DataFrame + array = np.frombuffer(numpy_bytes, dtype=metadata['dtype']) + array = array.reshape(len(metadata['index']), len(metadata['columns'])) + + return pd.DataFrame(array, columns=metadata['columns'], index=metadata['index']) + +# Register the handler +handlers.register_handler(DataFrameHandler()) +``` + +**When to use BaseRefHandler:** +- Large binary data (DataFrames, numpy arrays, images) +- Data that benefits from custom compression (e.g., numpy's compact format) +- Types that lose information in JSON conversion +- Shared data across snapshots (automatic deduplication via SHA-256) + +**Key differences:** +- `BaseInlineHandler`: Data stored in JSON snapshot → `{"__special__": "Tag", "data": {...}}` +- `BaseRefHandler`: Data stored in `refs/` → `{"__special__": "Tag", "__digest__": "abc123..."}` +- BaseRefHandler provides automatic deduplication and smaller JSON snapshots ### Using References (use_refs) @@ -215,16 +299,23 @@ class MyDataObject: return {"large_dataframe"} ``` -**When to use refs:** -- Large data structures (DataFrames, numpy arrays) -- Objects that lose information in JSON conversion -- Data shared across multiple snapshots/tenants (deduplication benefit) +**When to use use_refs():** +- Quick solution for large nested objects without writing custom handler +- Works with any picklable object +- Per-object control (some fields in JSON, others pickled) + +**use_refs() vs BaseRefHandler:** +- `use_refs()`: Uses pickle (via `PickleRefHelper`), simple but less optimized +- `BaseRefHandler`: Custom binary format (e.g., numpy), optimized but requires handler code +- Both store in `refs/` and get automatic SHA-256 deduplication +- `use_refs()` generates `{"__ref__": "digest"}` tags +- `BaseRefHandler` generates `{"__special__": "Tag", "__digest__": "digest"}` tags **Trade-offs:** - ✅ Smaller JSON snapshots - ✅ Cross-tenant deduplication -- ❌ Less human-readable (binary pickle format) -- ❌ Python version compatibility concerns with pickle +- ❌ Less human-readable (binary format) +- ❌ Python version compatibility concerns with pickle (use_refs only) ## Testing Notes diff --git a/README.md b/README.md index df08862..89ae91e 100644 --- a/README.md +++ b/README.md @@ -1,187 +1,352 @@ -# DbEngine +# MyDbEngine -A lightweight, git-inspired database engine for Python that maintains complete history of all modifications. 
+A lightweight, git-inspired versioned database engine for Python with content-addressable storage and complete history tracking. -## Overview +## What is MyDbEngine? -DbEngine is a personal implementation of a versioned database engine that stores snapshots of data changes over time. Each modification creates a new immutable snapshot, allowing you to track the complete history of your data. +MyDbEngine is a file-based versioned database that treats data like Git treats code. Every modification creates an immutable snapshot with a SHA-256 digest, enabling complete history tracking, deduplication, and multi-tenant isolation. -## Key Features +**Key Features:** +- **Immutable Snapshots**: Every change creates a new version, never modifying existing data +- **Content-Addressable Storage**: Identical objects stored only once, referenced by SHA-256 digest +- **Multi-Tenant**: Isolated storage per tenant with shared deduplication in `refs/` +- **Extensible Serialization**: Custom handlers for optimized storage (JSON, binary, pickle) +- **Thread-Safe**: Built-in RLock for concurrent access +- **Zero Dependencies**: Pure Python with no runtime dependencies (pytest only for dev) -- **Version Control**: Every change creates a new snapshot with a unique digest (SHA-256 hash) -- **History Tracking**: Access any previous version of your data -- **Multi-tenant Support**: Isolated data storage per tenant -- **Thread-safe**: Built-in locking mechanism for concurrent access -- **Git-inspired Architecture**: Objects are stored in a content-addressable format -- **Efficient Storage**: Identical objects are stored only once +**When to Use:** +- Version tracking for configuration, user data, or application state +- Multi-tenant applications requiring isolated data with shared deduplication +- Scenarios where you need both human-readable JSON and optimized binary storage -## Architecture +**When NOT to Use:** +- High-frequency writes (creates a snapshot per modification) +- Relational queries (no SQL, no joins) +- Large-scale production databases (file-based, not optimized for millions of records) -The engine uses a file-based storage system with the following structure: +## Installation + +```bash +pip install mydbengine +``` + +## Quick Start + +```python +from dbengine.dbengine import DbEngine + +# Initialize engine and tenant +engine = DbEngine(root=".mytools_db") +engine.init("tenant_1") + +# Save and load data +engine.save("tenant_1", "user_1", "config", {"theme": "dark", "lang": "en"}) +data = engine.load("tenant_1", "config") +print(data) # {"theme": "dark", "lang": "en"} +``` + +## Core Concepts + +### Immutable Snapshots + +Each `save()` or `put()` operation creates a new snapshot with automatic metadata: +- `__parent__`: List containing digest of previous version (or `[None]` for first) +- `__user_id__`: User ID who created the snapshot +- `__date__`: ISO timestamp `YYYYMMDD HH:MM:SS %z` + +### Storage Architecture ``` .mytools_db/ ├── {tenant_id}/ -│ ├── head # Points to latest version of each entry +│ ├── head # JSON: {"entry_name": "latest_digest"} │ └── objects/ -│ └── {digest_prefix}/ -│ └── {full_digest} # Actual object data -└── refs/ # Shared references +│ └── {digest_prefix}/ # First 24 chars of digest +│ └── {full_digest} # JSON snapshot with metadata +└── refs/ # Shared binary references (cross-tenant) + └── {digest_prefix}/ + └── {full_digest} # Pickle or custom binary format ``` -## Installation +### Two Usage Patterns +**Pattern 1: Snapshot-based** - Store complete object states 
```python -from db_engine import DbEngine - -# Initialize with default root -db = DbEngine() - -# Or specify custom root directory -db = DbEngine(root="/path/to/database") +engine.save("tenant_1", "user_1", "config", {"theme": "dark", "lang": "en"}) +config = engine.load("tenant_1", "config") ``` +**Pattern 2: Record-based** - Incremental updates to collections +```python +engine.put("tenant_1", "user_1", "users", "john", {"name": "John", "age": 30}) +engine.put("tenant_1", "user_1", "users", "jane", {"name": "Jane", "age": 25}) +all_users = engine.get("tenant_1", "users") # Returns list of all users +``` + +**Important:** Do not mix patterns for the same entry - they use different data structures. + ## Basic Usage -### Initialize Database for a Tenant +### Save and Load Complete Snapshots ```python -tenant_id = "my_company" -db.init(tenant_id) -``` +# Save any Python object +data = {"users": ["alice", "bob"], "count": 2} +digest = engine.save("tenant_1", "user_1", "session", data) -### Save Data - -```python -# Save a complete object -user_id = "john_doe" -entry = "users" -data = {"name": "John", "age": 30} - -digest = db.save(tenant_id, user_id, entry, data) -``` - -### Load Data - -```python # Load latest version -data = db.load(tenant_id, entry="users") +session = engine.load("tenant_1", "session") # Load specific version by digest -data = db.load(tenant_id, entry="users", digest="abc123...") +old_session = engine.load("tenant_1", "session", digest=digest) ``` -### Work with Individual Records +### Incremental Record Updates ```python -# Add or update a single record -db.put(tenant_id, user_id, entry="users", key="john", value={"name": "John", "age": 30}) +# Add/update single record +engine.put("tenant_1", "user_1", "users", "alice", {"name": "Alice", "role": "admin"}) -# Add or update multiple records at once -items = { - "john": {"name": "John", "age": 30}, - "jane": {"name": "Jane", "age": 25} +# Add/update multiple records +users = { + "bob": {"name": "Bob", "role": "user"}, + "charlie": {"name": "Charlie", "role": "user"} } -db.put_many(tenant_id, user_id, entry="users", items=items) +engine.put_many("tenant_1", "user_1", "users", users) -# Get a specific record -user = db.get(tenant_id, entry="users", key="john") +# Get specific record +alice = engine.get("tenant_1", "users", key="alice") -# Get all records -all_users = db.get(tenant_id, entry="users") +# Get all records as list +all_users = engine.get("tenant_1", "users") ``` -### Check Existence +### History Navigation ```python -if db.exists(tenant_id, entry="users"): +# Get history chain (list of digests, newest first) +history = engine.history("tenant_1", "config", max_items=10) + +# Load previous version +previous = engine.load("tenant_1", "config", digest=history[1]) + +# Check if entry exists +if engine.exists("tenant_1", "config"): print("Entry exists") ``` -### Access History +## Custom Serialization + +MyDbEngine supports three approaches for custom serialization: + +### 1. 
BaseInlineHandler - JSON Storage + +For small data types that should be human-readable in snapshots: ```python -# Get history of an entry (returns list of digests) -history = db.history(tenant_id, entry="users", max_items=10) +from dbengine.handlers import BaseInlineHandler, handlers +import datetime -# Load a previous version -old_data = db.load(tenant_id, entry="users", digest=history[1]) +class DateHandler(BaseInlineHandler): + def is_eligible_for(self, obj): + return isinstance(obj, datetime.date) + + def tag(self): + return "Date" + + def serialize(self, obj): + return { + "__special__": self.tag(), + "year": obj.year, + "month": obj.month, + "day": obj.day + } + + def deserialize(self, data): + return datetime.date(year=data["year"], month=data["month"], day=data["day"]) + +handlers.register_handler(DateHandler()) ``` -## Metadata +### 2. BaseRefHandler - Optimized Binary Storage -Each snapshot automatically includes metadata: +For large data structures that benefit from custom binary formats: -- `__parent__`: Digest of the previous version -- `__user_id__`: User ID who made the change -- `__date__`: Timestamp of the change (format: `YYYYMMDD HH:MM:SS`) +```python +from dbengine.handlers import BaseRefHandler, handlers +import pandas as pd +import numpy as np +import json + +class DataFrameHandler(BaseRefHandler): + def is_eligible_for(self, obj): + return isinstance(obj, pd.DataFrame) + + def tag(self): + return "DataFrame" + + def serialize_to_bytes(self, df): + """Convert DataFrame to compact binary format""" + # Store metadata + numpy bytes + metadata = { + "columns": df.columns.tolist(), + "index": df.index.tolist(), + "dtype": str(df.values.dtype) + } + metadata_bytes = json.dumps(metadata).encode('utf-8') + metadata_length = len(metadata_bytes).to_bytes(4, 'big') + numpy_bytes = df.to_numpy().tobytes() + + return metadata_length + metadata_bytes + numpy_bytes + + def deserialize_from_bytes(self, data): + """Reconstruct DataFrame from binary format""" + # Read metadata + metadata_length = int.from_bytes(data[:4], 'big') + metadata = json.loads(data[4:4+metadata_length].decode('utf-8')) + numpy_bytes = data[4+metadata_length:] + + # Reconstruct array and DataFrame + array = np.frombuffer(numpy_bytes, dtype=metadata['dtype']) + array = array.reshape(len(metadata['index']), len(metadata['columns'])) + + return pd.DataFrame(array, columns=metadata['columns'], index=metadata['index']) + +handlers.register_handler(DataFrameHandler()) + +# Now DataFrames are automatically stored in optimized binary format +df = pd.DataFrame({"col1": [1, 2, 3], "col2": [4, 5, 6]}) +engine.save("tenant_1", "user_1", "data", df) +``` + +**Result:** +- JSON snapshot contains: `{"__special__": "DataFrame", "__digest__": "abc123..."}` +- Binary data stored in `refs/abc123...` (more compact than pickle) +- Automatic deduplication across tenants + +### 3. 
use_refs() - Selective Pickle Storage + +For objects with specific fields that should be pickled: + +```python +class MyDataObject: + def __init__(self, metadata, large_array): + self.metadata = metadata + self.large_array = large_array # Large numpy array or similar + + @staticmethod + def use_refs(): + """Fields to pickle instead of JSON-serialize""" + return {"large_array"} + +# metadata goes to JSON, large_array goes to refs/ (pickled) +obj = MyDataObject({"name": "dataset_1"}, np.zeros((1000, 1000))) +engine.save("tenant_1", "user_1", "my_data", obj) +``` + +**Comparison:** + +| Approach | Storage | Format | Use Case | +|----------|---------|--------|----------| +| `BaseInlineHandler` | JSON snapshot | Custom dict | Small data, human-readable | +| `BaseRefHandler` | `refs/` directory | Custom binary | Large data, optimized format | +| `use_refs()` | `refs/` directory | Pickle | Quick solution, no handler needed | ## API Reference -### Core Methods +### Initialization -#### `init(tenant_id: str)` -Initialize database structure for a tenant. +| Method | Description | +|--------|-------------| +| `DbEngine(root: str = ".mytools_db")` | Initialize engine with storage root | +| `init(tenant_id: str)` | Create tenant directory structure | +| `is_initialized(tenant_id: str) -> bool` | Check if tenant is initialized | -#### `save(tenant_id: str, user_id: str, entry: str, obj: object) -> str` -Save a complete snapshot. Returns the digest of the saved object. +### Data Operations -#### `load(tenant_id: str, entry: str, digest: str = None) -> object` -Load a snapshot. If digest is None, loads the latest version. +| Method | Description | +|--------|-------------| +| `save(tenant_id, user_id, entry, obj) -> str` | Save complete snapshot, returns digest | +| `load(tenant_id, entry, digest=None) -> object` | Load snapshot (latest if digest=None) | +| `put(tenant_id, user_id, entry, key, value) -> bool` | Add/update single record | +| `put_many(tenant_id, user_id, entry, items) -> bool` | Add/update multiple records | +| `get(tenant_id, entry, key=None, digest=None) -> object` | Get record(s) | +| `exists(tenant_id, entry) -> bool` | Check if entry exists | -#### `put(tenant_id: str, user_id: str, entry: str, key: str, value: object) -> bool` -Add or update a single record. Returns True if a new snapshot was created. +### History -#### `put_many(tenant_id: str, user_id: str, entry: str, items: list | dict) -> bool` -Add or update multiple records. Returns True if a new snapshot was created. +| Method | Description | +|--------|-------------| +| `history(tenant_id, entry, digest=None, max_items=1000) -> list` | Get history chain of digests | +| `get_digest(tenant_id, entry) -> str` | Get current digest for entry | -#### `get(tenant_id: str, entry: str, key: str = None, digest: str = None) -> object` -Retrieve record(s). If key is None, returns all records as a list. +## Performance & Limitations -#### `exists(tenant_id: str, entry: str) -> bool` -Check if an entry exists. +**Strengths:** +- ✅ Deduplication: Identical objects stored once (SHA-256 content addressing) +- ✅ History: Complete audit trail with zero overhead for unchanged data +- ✅ Custom formats: Binary handlers optimize storage (e.g., numpy vs pickle) -#### `history(tenant_id: str, entry: str, digest: str = None, max_items: int = 1000) -> list` -Get the history chain of digests for an entry. 
+**Limitations:** +- ❌ **File-based**: Not suitable for high-throughput applications +- ❌ **No indexing**: No SQL queries, no complex filtering +- ❌ **Snapshot overhead**: Each change creates a new snapshot +- ❌ **History chains**: Long histories require multiple file reads -#### `get_digest(tenant_id: str, entry: str) -> str` -Get the current digest for an entry. +**Performance Tips:** +- Use `put_many()` instead of multiple `put()` calls (creates one snapshot) +- Use `BaseRefHandler` for large binary data instead of pickle +- Limit history traversal with `max_items` parameter +- Consider archiving old snapshots for long-running entries -## Usage Patterns +## Development -### Pattern 1: Snapshot-based (using `save()`) -Best for saving complete states of complex objects. +### Running Tests -```python -config = {"theme": "dark", "language": "en"} -db.save(tenant_id, user_id, "config", config) +```bash +# All tests +pytest + +# Specific test file +pytest tests/test_dbengine.py +pytest tests/test_serializer.py + +# Single test +pytest tests/test_dbengine.py::test_i_can_save_and_load ``` -### Pattern 2: Record-based (using `put()` / `put_many()`) -Best for managing collections of items incrementally. +### Building Package -```python -db.put(tenant_id, user_id, "settings", "theme", "dark") -db.put(tenant_id, user_id, "settings", "language", "en") +```bash +# Build distribution +python -m build + +# Clean build artifacts +make clean ``` -**Note**: Don't mix these patterns for the same entry, as they use different data structures. +### Project Structure -## Thread Safety +``` +src/dbengine/ +├── dbengine.py # Main DbEngine and RefHelper classes +├── serializer.py # JSON serialization with handlers +├── handlers.py # BaseHandler, BaseInlineHandler, BaseRefHandler +└── utils.py # Type checking and digest computation -DbEngine uses `RLock` internally, making it safe for multi-threaded applications. +tests/ +├── test_dbengine.py # DbEngine functionality tests +└── test_serializer.py # Serialization and handler tests +``` -## Exceptions +## Contributing -- `DbException`: Raised for database-related errors (missing entries, invalid parameters, etc.) - -## Performance Considerations - -- Objects are stored as JSON files -- Identical objects (same SHA-256) are stored only once -- History chains can become long; use `max_items` parameter to limit traversal -- File system performance impacts overall speed +This is a personal implementation. For bug reports or feature requests, please contact the author. ## License -This is a personal implementation. Please check with the author for licensing terms. \ No newline at end of file +See LICENSE file for details. 
+ +## Version History +* 0.1.0 - Initial release +* 0.2.0 - Added custom reference handlers \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 5869ce9..23d1e3c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "mydbengine" -version = "0.1.0" +version = "0.2.0" description = "A lightweight, git-inspired database engine that maintains complete history of all modifications" readme = "README.md" requires-python = ">=3.8" diff --git a/src/dbengine/dbengine.py b/src/dbengine/dbengine.py index 5fb80b6..a6d96f7 100644 --- a/src/dbengine/dbengine.py +++ b/src/dbengine/dbengine.py @@ -8,7 +8,7 @@ import pickle from threading import RLock from dbengine.serializer import Serializer -from dbengine.utils import get_stream_digest +from dbengine.utils import compute_digest_from_bytes TYPE_KEY = "__type__" TAG_PARENT = "__parent__" @@ -24,41 +24,75 @@ class DbException(Exception): class RefHelper: + """Base class for reference storage using content-addressable storage with SHA-256.""" + def __init__(self, get_ref_path): self.get_ref_path = get_ref_path - def save_ref(self, obj): + def save_ref_from_bytes(self, data: bytes): """ + Save raw bytes to refs/ directory using content-addressable storage. + Used by BaseRefHandler for custom serialization formats. - :param obj: - :return: + :param data: Raw bytes to save + :return: SHA-256 digest of the data """ - buffer = io.BytesIO() - pickler = pickle.Pickler(buffer) - pickler.dump(obj) - - digest = get_stream_digest(buffer) + digest = compute_digest_from_bytes(data) target_path = self.get_ref_path(digest) if not os.path.exists(os.path.dirname(target_path)): os.makedirs(os.path.dirname(target_path)) - buffer.seek(0) - with open(self.get_ref_path(digest), "wb") as file: - while chunk := buffer.read(BUFFER_SIZE): - file.write(chunk) + with open(target_path, "wb") as file: + file.write(data) + logger.debug(f"Saved raw bytes with digest {digest}") + return digest + + def load_ref_to_bytes(self, digest): + """ + Load raw bytes from refs/ directory. + Used by BaseRefHandler for custom deserialization formats. + + :param digest: SHA-256 digest of the data + :return: Raw bytes + """ + with open(self.get_ref_path(digest), 'rb') as file: + return file.read() + + +class PickleRefHelper(RefHelper): + """RefHelper that adds pickle serialization layer on top of byte storage.""" + + def save_ref(self, obj): + """ + Pickle an object and save it to refs/ directory. + Used by the use_refs() system for objects. + + :param obj: Object to pickle and save + :return: SHA-256 digest of the pickled data + """ + buffer = io.BytesIO() + pickler = pickle.Pickler(buffer) + pickler.dump(obj) + + buffer.seek(0) + data = buffer.read() + + digest = self.save_ref_from_bytes(data) logger.debug(f"Saved object type '{type(obj).__name__}' with digest {digest}") return digest def load_ref(self, digest): """ + Load pickled object from refs/ directory. + Used by the use_refs() system for objects. 
- :param digest: - :return: + :param digest: SHA-256 digest of the pickled data + :return: Unpickled object """ - with open(self.get_ref_path(digest), 'rb') as file: - return pickle.load(file) + data = self.load_ref_to_bytes(digest) + return pickle.loads(data) class DbEngine: @@ -316,13 +350,13 @@ class DbEngine: :return: """ with self.lock: - serializer = Serializer(RefHelper(self._get_ref_path)) + serializer = Serializer(PickleRefHelper(self._get_ref_path)) use_refs = getattr(obj, "use_refs")() if hasattr(obj, "use_refs") else None return serializer.serialize(obj, use_refs) def _deserialize(self, as_dict): with self.lock: - serializer = Serializer(RefHelper(self._get_ref_path)) + serializer = Serializer(PickleRefHelper(self._get_ref_path)) return serializer.deserialize(as_dict) def _update_head(self, tenant_id, entry, digest): diff --git a/src/dbengine/handlers.py b/src/dbengine/handlers.py index 24744e8..22d82c5 100644 --- a/src/dbengine/handlers.py +++ b/src/dbengine/handlers.py @@ -8,20 +8,66 @@ TAG_SPECIAL = "__special__" class BaseHandler: + """Base class for all handlers. Subclasses must implement is_eligible_for() and tag().""" + def is_eligible_for(self, obj): - pass - + """Check if this handler can process the given object.""" + raise NotImplementedError + def tag(self): - pass - + """Return a unique tag identifying this handler type.""" + raise NotImplementedError + + +class BaseInlineHandler(BaseHandler): + """Handler that serializes data directly into the JSON snapshot.""" + def serialize(self, obj) -> dict: - pass - + """Serialize object to dict for inline storage in JSON snapshot.""" + raise NotImplementedError + def deserialize(self, data: dict) -> object: - pass + """Deserialize object from dict stored in JSON snapshot.""" + raise NotImplementedError -class DateHandler(BaseHandler): +class BaseRefHandler(BaseHandler): + """Handler that serializes data to refs/ directory using content-addressable storage.""" + + def serialize_to_bytes(self, obj) -> bytes: + """Serialize object to bytes for storage in refs/. Must be implemented by subclass.""" + raise NotImplementedError + + def deserialize_from_bytes(self, data: bytes) -> object: + """Deserialize object from bytes loaded from refs/. Must be implemented by subclass.""" + raise NotImplementedError + + def serialize(self, obj, ref_helper) -> dict: + """ + Default implementation: converts object to bytes, saves to refs/, returns dict with tag and digest. + Can be overridden if custom behavior is needed. + """ + stream = self.serialize_to_bytes(obj) + digest = ref_helper.save_ref_from_bytes(stream) + from dbengine.serializer import TAG_DIGEST + TAG_SPECIAL = "__special__" + return { + TAG_SPECIAL: self.tag(), + TAG_DIGEST: digest + } + + def deserialize(self, data: dict, ref_helper) -> object: + """ + Default implementation: loads bytes from refs/ using digest, deserializes from bytes. + Can be overridden if custom behavior is needed. 
+ """ + from dbengine.serializer import TAG_DIGEST + digest = data[TAG_DIGEST] + stream = ref_helper.load_ref_to_bytes(digest) + return self.deserialize_from_bytes(stream) + + +class DateHandler(BaseInlineHandler): def is_eligible_for(self, obj): return isinstance(obj, datetime.date) diff --git a/src/dbengine/serializer.py b/src/dbengine/serializer.py index 9209fdd..8651309 100644 --- a/src/dbengine/serializer.py +++ b/src/dbengine/serializer.py @@ -1,6 +1,6 @@ import copy -from dbengine.handlers import handlers +from dbengine.handlers import handlers, BaseRefHandler from dbengine.utils import * TAG_ID = "__id__" @@ -8,6 +8,7 @@ TAG_OBJECT = "__object__" TAG_TUPLE = "__tuple__" TAG_SET = "__set__" TAG_REF = "__ref__" +TAG_DIGEST = "__digest__" TAG_ENUM = "__enum__" @@ -56,7 +57,10 @@ class Serializer: return self._deserialize_obj_instance(obj) if (handler := handlers.get_handler(obj)) is not None: - return handler.deserialize(obj) + if has_tag(obj, TAG_DIGEST): + return handler.deserialize(obj, self.ref_helper) + else: + return handler.deserialize(obj) if is_list(obj): return [self.deserialize(v) for v in obj] @@ -115,7 +119,10 @@ class Serializer: use_refs.update(f"{path}.{sub_path}" for sub_path in current_obj_use_refs) if (handler := handlers.get_handler(obj)) is not None: - return handler.serialize(obj) + if isinstance(handler, BaseRefHandler): + return handler.serialize(obj, self.ref_helper) + else: + return handler.serialize(obj) # flatten data = {} diff --git a/src/dbengine/utils.py b/src/dbengine/utils.py index f984e28..00843cd 100644 --- a/src/dbengine/utils.py +++ b/src/dbengine/utils.py @@ -7,7 +7,7 @@ from enum import Enum PRIMITIVES = (str, bool, type(None), int, float) -def get_stream_digest(stream): +def compute_digest_from_stream(stream): """ Compute a SHA256 from a stream :param stream: @@ -19,7 +19,19 @@ def get_stream_digest(stream): stream.seek(0) for byte_block in iter(lambda: stream.read(4096), b""): sha256_hash.update(byte_block) - + + return sha256_hash.hexdigest() + + +def compute_digest_from_bytes(data: bytes): + """ + Compute a SHA256 digest from raw bytes. 
+ + :param data: Raw bytes to hash + :return: SHA-256 hexadecimal digest + """ + sha256_hash = hashlib.sha256() + sha256_hash.update(data) return sha256_hash.hexdigest() diff --git a/tests/test_serializer.py b/tests/test_serializer.py index 5b0922a..22e3af9 100644 --- a/tests/test_serializer.py +++ b/tests/test_serializer.py @@ -6,7 +6,8 @@ from enum import Enum import pytest -from dbengine.serializer import TAG_TUPLE, TAG_SET, Serializer, TAG_OBJECT, TAG_ID, TAG_REF +from dbengine.serializer import TAG_TUPLE, TAG_SET, Serializer, TAG_OBJECT, TAG_ID, TAG_REF, TAG_DIGEST +from dbengine.handlers import BaseRefHandler, handlers class Obj: @@ -72,28 +73,74 @@ class DummyComplexClass: prop3: ObjEnum +class BytesData: + """Simple class to hold binary data for testing BaseRefHandler""" + + def __init__(self, data: bytes): + self.data = data + + def __eq__(self, other): + if not isinstance(other, BytesData): + return False + return self.data == other.data + + def __hash__(self): + return hash(self.data) + + +class BytesDataHandler(BaseRefHandler): + """Test handler that stores BytesData using refs/ directory""" + + def is_eligible_for(self, obj): + return isinstance(obj, BytesData) + + def tag(self): + return "BytesData" + + def serialize_to_bytes(self, obj) -> bytes: + """Just return the raw bytes from the object""" + return obj.data + + def deserialize_from_bytes(self, data: bytes) -> object: + """Reconstruct BytesData from raw bytes""" + return BytesData(data) + + class DummyRefHelper: """ When something is too complicated to serialize, we just default to pickle That is what this helper class is doing """ - + def __init__(self): self.refs = {} - + def save_ref(self, obj): sha256_hash = hashlib.sha256() - + pickled_data = pickle.dumps(obj) sha256_hash.update(pickled_data) digest = sha256_hash.hexdigest() - + self.refs[digest] = pickled_data return digest - + def load_ref(self, digest): return pickle.loads(self.refs[digest]) + def save_ref_from_bytes(self, data: bytes): + """Save raw bytes for BaseRefHandler""" + sha256_hash = hashlib.sha256() + sha256_hash.update(data) + digest = sha256_hash.hexdigest() + + self.refs[digest] = data + return digest + + def load_ref_to_bytes(self, digest): + """Load raw bytes for BaseRefHandler""" + return self.refs[digest] + @pytest.mark.parametrize("obj, expected", [ (1, 1), @@ -260,9 +307,81 @@ def test_can_use_refs_when_circular_reference(): def test_i_can_serialize_date(): obj = datetime.date.today() serializer = Serializer() - + flatten = serializer.serialize(obj) - + decoded = serializer.deserialize(flatten) - + assert decoded == obj + + +def test_i_can_serialize_with_base_ref_handler(): + """Test that BaseRefHandler correctly stores data in refs/ with TAG_DIGEST""" + # Register the test handler + test_handler = BytesDataHandler() + handlers.register_handler(test_handler) + + try: + # Create test data + test_bytes = b"Hello, this is binary data for testing!" 
+ obj = BytesData(test_bytes) + + # Serialize with ref_helper + ref_helper = DummyRefHelper() + serializer = Serializer(ref_helper) + + flatten = serializer.serialize(obj) + + # Verify structure: should have TAG_SPECIAL and TAG_DIGEST + assert "__special__" in flatten + assert flatten["__special__"] == "BytesData" + assert TAG_DIGEST in flatten + + # Verify data was stored in refs + digest = flatten[TAG_DIGEST] + assert digest in ref_helper.refs + assert ref_helper.refs[digest] == test_bytes + + # Deserialize and verify + decoded = serializer.deserialize(flatten) + assert decoded == obj + assert decoded.data == test_bytes + + finally: + # Clean up: remove the test handler + handlers.handlers.remove(test_handler) + + +def test_i_can_serialize_object_containing_base_ref_handler_data(): + """Test that objects containing BaseRefHandler-managed data work correctly""" + # Register the test handler + test_handler = BytesDataHandler() + handlers.register_handler(test_handler) + + try: + # Create an object that contains BytesData + bytes_obj = BytesData(b"Some binary content") + wrapper = Obj(1, "test", bytes_obj) + + # Serialize + ref_helper = DummyRefHelper() + serializer = Serializer(ref_helper) + + flatten = serializer.serialize(wrapper) + + # Verify structure + assert flatten[TAG_OBJECT] == 'tests.test_serializer.Obj' + assert flatten['a'] == 1 + assert flatten['b'] == "test" + assert flatten['c']["__special__"] == "BytesData" + assert TAG_DIGEST in flatten['c'] + + # Deserialize and verify + decoded = serializer.deserialize(flatten) + assert decoded.a == wrapper.a + assert decoded.b == wrapper.b + assert decoded.c == wrapper.c + + finally: + # Clean up + handlers.handlers.remove(test_handler)