Added Custom Ref Handlers

2025-12-21 17:42:17 +01:00
parent b17fc450a2
commit 618e21e012
8 changed files with 655 additions and 181 deletions

CLAUDE.md

@@ -84,19 +84,36 @@ pip install -e .[dev]
- Converts Python objects to/from JSON-compatible dictionaries
- Handles circular references using object ID tracking
- Supports custom serialization via handlers (see handlers.py)
- Special tags: `__object__`, `__id__`, `__tuple__`, `__set__`, `__ref__`, `__enum__`
- Special tags: `__object__`, `__id__`, `__tuple__`, `__set__`, `__ref__`, `__digest__`, `__enum__`
- Objects can define a `use_refs()` method to specify fields that should be pickled instead of JSON-serialized
- `__ref__`: Used for `use_refs()` system (pickle-based storage)
- `__digest__`: Used by BaseRefHandler for custom binary formats (numpy, etc.)
**Handlers** (`src/dbengine/handlers.py`)
- Extensible handler system for custom type serialization
- BaseHandler interface: `is_eligible_for()`, `tag()`, `serialize()`, `deserialize()`
- Currently implements DateHandler for datetime.date objects
- Three-tier hierarchy:
- `BaseHandler`: Base interface with `is_eligible_for()` and `tag()`
- `BaseInlineHandler`: For JSON-inline storage (e.g., DateHandler)
- `BaseRefHandler`: For custom binary formats stored in `refs/` (e.g., DataFrames)
- `BaseInlineHandler`: Implements `serialize(obj) → dict` and `deserialize(dict) → obj`
- `BaseRefHandler`: Implements `serialize_to_bytes(obj) → bytes` and `deserialize_from_bytes(bytes) → obj`
- Currently implements `DateHandler` (BaseInlineHandler) for datetime.date objects
- Use `handlers.register_handler()` to add custom handlers
**Utils** (`src/dbengine/utils.py`)
- Type checking utilities: `is_primitive()`, `is_dictionary()`, `is_list()`, etc.
- Class introspection: `get_full_qualified_name()`, `importable_name()`, `get_class()`
- Stream digest computation with SHA-256
- Digest computation: `compute_digest_from_stream()`, `compute_digest_from_bytes()`
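For reference, a minimal usage sketch of the byte-digest helper listed above:

```python
# Minimal sketch, assuming only the helper listed above.
from dbengine.utils import compute_digest_from_bytes

digest = compute_digest_from_bytes(b"hello world")
# SHA-256 hex digest, 64 characters, e.g.:
# b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9
print(digest)
```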
**RefHelper and PickleRefHelper** (`src/dbengine/dbengine.py`)
- `RefHelper`: Base class for content-addressable storage in `refs/` directory
- `save_ref_from_bytes(data: bytes) → digest`: Store raw bytes
- `load_ref_to_bytes(digest) → bytes`: Load raw bytes
- Used by `BaseRefHandler` for custom binary formats
- `PickleRefHelper(RefHelper)`: Adds pickle serialization layer
- `save_ref(obj) → digest`: Pickle and store object
- `load_ref(digest) → obj`: Load and unpickle object
- Used by `use_refs()` system and `Serializer`
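A minimal sketch of driving these helpers directly; `my_ref_path` is a hypothetical stand-in for the engine's internal `_get_ref_path` callback:

```python
import os
from dbengine.dbengine import RefHelper, PickleRefHelper

def my_ref_path(digest: str) -> str:
    # Hypothetical path callback mirroring the documented refs/ layout
    return os.path.join(".mytools_db", "refs", digest[:24], digest)

raw = RefHelper(my_ref_path)
digest = raw.save_ref_from_bytes(b"custom binary payload")
assert raw.load_ref_to_bytes(digest) == b"custom binary payload"

pickled = PickleRefHelper(my_ref_path)
obj_digest = pickled.save_ref({"any": "picklable object"})
assert pickled.load_ref(obj_digest) == {"any": "picklable object"}
```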
### Storage Architecture
@@ -107,11 +124,15 @@ pip install -e .[dev]
│ └── objects/
│ └── {digest_prefix}/ # First 24 chars of digest
│ └── {full_digest} # JSON snapshot with metadata
└── refs/ # Shared pickled references
└── refs/ # Shared binary references (cross-tenant)
└── {digest_prefix}/
└── {full_digest}
└── {full_digest} # Pickle or custom binary format
```
**Note**: The `refs/` directory stores binary data in a content-addressable format:
- Pickled objects (via `use_refs()` or `PickleRefHelper`)
- Custom binary formats (via `BaseRefHandler`, e.g., numpy arrays)
### Metadata System
Each snapshot includes automatic metadata fields:
@@ -163,13 +184,17 @@ Objects can opt into pickle-based storage for specific fields:
### Custom Type Handlers
To serialize custom types that aren't handled by default serialization:
MyDbEngine supports two kinds of custom handlers for serializing types that aren't covered by default serialization:
**1. Create a handler class:**
#### 1. BaseInlineHandler - For JSON Storage
Use when data should be stored directly in the JSON snapshot (human-readable, smaller datasets).
**Example: Custom date handler**
```python
from dbengine.handlers import BaseHandler, TAG_SPECIAL
from dbengine.handlers import BaseInlineHandler, handlers
class MyCustomHandler(BaseHandler):
class MyCustomHandler(BaseInlineHandler):
def is_eligible_for(self, obj):
return isinstance(obj, MyCustomType)
@@ -178,26 +203,85 @@ class MyCustomHandler(BaseHandler):
def serialize(self, obj) -> dict:
return {
TAG_SPECIAL: self.tag(),
"__special__": self.tag(),
"data": obj.to_dict()
}
def deserialize(self, data: dict) -> object:
return MyCustomType.from_dict(data["data"])
```
**2. Register the handler:**
```python
from dbengine.handlers import handlers
# Register the handler
handlers.register_handler(MyCustomHandler())
```
**When to use handlers:**
- Complex types that need custom serialization logic
- Types that can't be pickled reliably
- Types requiring validation during deserialization
- External library types (datetime.date example in handlers.py)
**When to use BaseInlineHandler:**
- Small data structures that fit well in JSON
- Types requiring human-readable storage
- Types needing validation during deserialization
- Simple external library types (e.g., datetime.date)
#### 2. BaseRefHandler - For Binary Storage
Use when data should be stored in an optimized binary format in the `refs/` directory (large datasets, better compression).
**Example: pandas DataFrame handler**
```python
from dbengine.handlers import BaseRefHandler, handlers
import pandas as pd
import json
class DataFrameHandler(BaseRefHandler):
def is_eligible_for(self, obj):
return isinstance(obj, pd.DataFrame)
def tag(self):
return "DataFrame"
def serialize_to_bytes(self, df) -> bytes:
"""Convert DataFrame to compact binary format"""
import numpy as np
# Store metadata + numpy bytes
metadata = {
"columns": df.columns.tolist(),
"index": df.index.tolist(),
"dtype": str(df.values.dtype)
}
metadata_bytes = json.dumps(metadata).encode('utf-8')
metadata_length = len(metadata_bytes).to_bytes(4, 'big')
numpy_bytes = df.to_numpy().tobytes()
return metadata_length + metadata_bytes + numpy_bytes
def deserialize_from_bytes(self, data: bytes) -> object:
"""Reconstruct DataFrame from binary format"""
import numpy as np
# Read metadata
metadata_length = int.from_bytes(data[:4], 'big')
metadata = json.loads(data[4:4+metadata_length].decode('utf-8'))
numpy_bytes = data[4+metadata_length:]
# Reconstruct array and DataFrame
array = np.frombuffer(numpy_bytes, dtype=metadata['dtype'])
array = array.reshape(len(metadata['index']), len(metadata['columns']))
return pd.DataFrame(array, columns=metadata['columns'], index=metadata['index'])
# Register the handler
handlers.register_handler(DataFrameHandler())
```
**When to use BaseRefHandler:**
- Large binary data (DataFrames, numpy arrays, images)
- Data that benefits from custom compression (e.g., numpy's compact format)
- Types that lose information in JSON conversion
- Shared data across snapshots (automatic deduplication via SHA-256)
**Key differences:**
- `BaseInlineHandler`: Data stored in JSON snapshot → `{"__special__": "Tag", "data": {...}}`
- `BaseRefHandler`: Data stored in `refs/` → `{"__special__": "Tag", "__digest__": "abc123..."}`
- BaseRefHandler provides automatic deduplication and smaller JSON snapshots
### Using References (use_refs)
@@ -215,16 +299,23 @@ class MyDataObject:
return {"large_dataframe"}
```
**When to use refs:**
- Large data structures (DataFrames, numpy arrays)
- Objects that lose information in JSON conversion
- Data shared across multiple snapshots/tenants (deduplication benefit)
**When to use use_refs():**
- Quick solution for large nested objects without writing a custom handler
- Works with any picklable object
- Per-object control (some fields in JSON, others pickled)
**use_refs() vs BaseRefHandler:**
- `use_refs()`: Uses pickle (via `PickleRefHelper`), simple but less optimized
- `BaseRefHandler`: Custom binary format (e.g., numpy), optimized but requires handler code
- Both store in `refs/` and get automatic SHA-256 deduplication
- `use_refs()` generates `{"__ref__": "digest"}` tags
- `BaseRefHandler` generates `{"__special__": "Tag", "__digest__": "digest"}` tags
**Trade-offs:**
- ✅ Smaller JSON snapshots
- ✅ Cross-tenant deduplication
- ❌ Less human-readable (binary pickle format)
- ❌ Python version compatibility concerns with pickle
- ❌ Less human-readable (binary format)
- ❌ Python version compatibility concerns with pickle (use_refs only)
## Testing Notes

README.md

@@ -1,187 +1,352 @@
# DbEngine
# MyDbEngine
A lightweight, git-inspired database engine for Python that maintains complete history of all modifications.
A lightweight, git-inspired versioned database engine for Python with content-addressable storage and complete history tracking.
## Overview
## What is MyDbEngine?
DbEngine is a personal implementation of a versioned database engine that stores snapshots of data changes over time. Each modification creates a new immutable snapshot, allowing you to track the complete history of your data.
MyDbEngine is a file-based versioned database that treats data like Git treats code. Every modification creates an immutable snapshot with a SHA-256 digest, enabling complete history tracking, deduplication, and multi-tenant isolation.
## Key Features
**Key Features:**
- **Immutable Snapshots**: Every change creates a new version, never modifying existing data
- **Content-Addressable Storage**: Identical objects stored only once, referenced by SHA-256 digest
- **Multi-Tenant**: Isolated storage per tenant with shared deduplication in `refs/`
- **Extensible Serialization**: Custom handlers for optimized storage (JSON, binary, pickle)
- **Thread-Safe**: Built-in RLock for concurrent access
- **Zero Dependencies**: Pure Python with no runtime dependencies (pytest only for dev)
- **Version Control**: Every change creates a new snapshot with a unique digest (SHA-256 hash)
- **History Tracking**: Access any previous version of your data
- **Multi-tenant Support**: Isolated data storage per tenant
- **Thread-safe**: Built-in locking mechanism for concurrent access
- **Git-inspired Architecture**: Objects are stored in a content-addressable format
- **Efficient Storage**: Identical objects are stored only once
**When to Use:**
- Version tracking for configuration, user data, or application state
- Multi-tenant applications requiring isolated data with shared deduplication
- Scenarios where you need both human-readable JSON and optimized binary storage
## Architecture
**When NOT to Use:**
- High-frequency writes (creates a snapshot per modification)
- Relational queries (no SQL, no joins)
- Large-scale production databases (file-based, not optimized for millions of records)
The engine uses a file-based storage system with the following structure:
## Installation
```bash
pip install mydbengine
```
## Quick Start
```python
from dbengine.dbengine import DbEngine
# Initialize engine and tenant
engine = DbEngine(root=".mytools_db")
engine.init("tenant_1")
# Save and load data
engine.save("tenant_1", "user_1", "config", {"theme": "dark", "lang": "en"})
data = engine.load("tenant_1", "config")
print(data) # {"theme": "dark", "lang": "en"}
```
## Core Concepts
### Immutable Snapshots
Each `save()` or `put()` operation creates a new snapshot with automatic metadata:
- `__parent__`: List containing the digest of the previous version (or `[None]` for the first snapshot)
- `__user_id__`: ID of the user who created the snapshot
- `__date__`: Timestamp in `YYYYMMDD HH:MM:SS %z` format
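For illustration, the metadata on a first snapshot might look like this (values are hypothetical):

```python
# Hypothetical metadata values; actual timestamps and user IDs will differ.
metadata = {
    "__parent__": [None],                   # first version of the entry has no parent
    "__user_id__": "user_1",                # user who created the snapshot
    "__date__": "20251221 17:42:17 +0100",  # YYYYMMDD HH:MM:SS %z
}
```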
### Storage Architecture
```
.mytools_db/
├── {tenant_id}/
│ ├── head # Points to latest version of each entry
│ ├── head # JSON: {"entry_name": "latest_digest"}
│ └── objects/
│ └── {digest_prefix}/
│ └── {full_digest} # Actual object data
└── refs/ # Shared references
│ └── {digest_prefix}/ # First 24 chars of digest
│ └── {full_digest} # JSON snapshot with metadata
└── refs/ # Shared binary references (cross-tenant)
└── {digest_prefix}/
└── {full_digest} # Pickle or custom binary format
```
## Installation
### Two Usage Patterns
**Pattern 1: Snapshot-based** - Store complete object states
```python
from db_engine import DbEngine
# Initialize with default root
db = DbEngine()
# Or specify custom root directory
db = DbEngine(root="/path/to/database")
engine.save("tenant_1", "user_1", "config", {"theme": "dark", "lang": "en"})
config = engine.load("tenant_1", "config")
```
**Pattern 2: Record-based** - Incremental updates to collections
```python
engine.put("tenant_1", "user_1", "users", "john", {"name": "John", "age": 30})
engine.put("tenant_1", "user_1", "users", "jane", {"name": "Jane", "age": 25})
all_users = engine.get("tenant_1", "users") # Returns list of all users
```
**Important:** Do not mix patterns for the same entry - they use different data structures.
## Basic Usage
### Initialize Database for a Tenant
### Save and Load Complete Snapshots
```python
tenant_id = "my_company"
db.init(tenant_id)
```
# Save any Python object
data = {"users": ["alice", "bob"], "count": 2}
digest = engine.save("tenant_1", "user_1", "session", data)
### Save Data
```python
# Save a complete object
user_id = "john_doe"
entry = "users"
data = {"name": "John", "age": 30}
digest = db.save(tenant_id, user_id, entry, data)
```
### Load Data
```python
# Load latest version
data = db.load(tenant_id, entry="users")
session = engine.load("tenant_1", "session")
# Load specific version by digest
data = db.load(tenant_id, entry="users", digest="abc123...")
old_session = engine.load("tenant_1", "session", digest=digest)
```
### Work with Individual Records
### Incremental Record Updates
```python
# Add or update a single record
db.put(tenant_id, user_id, entry="users", key="john", value={"name": "John", "age": 30})
# Add/update single record
engine.put("tenant_1", "user_1", "users", "alice", {"name": "Alice", "role": "admin"})
# Add or update multiple records at once
items = {
"john": {"name": "John", "age": 30},
"jane": {"name": "Jane", "age": 25}
# Add/update multiple records
users = {
"bob": {"name": "Bob", "role": "user"},
"charlie": {"name": "Charlie", "role": "user"}
}
db.put_many(tenant_id, user_id, entry="users", items=items)
engine.put_many("tenant_1", "user_1", "users", users)
# Get a specific record
user = db.get(tenant_id, entry="users", key="john")
# Get specific record
alice = engine.get("tenant_1", "users", key="alice")
# Get all records
all_users = db.get(tenant_id, entry="users")
# Get all records as list
all_users = engine.get("tenant_1", "users")
```
### Check Existence
### History Navigation
```python
if db.exists(tenant_id, entry="users"):
# Get history chain (list of digests, newest first)
history = engine.history("tenant_1", "config", max_items=10)
# Load previous version
previous = engine.load("tenant_1", "config", digest=history[1])
# Check if entry exists
if engine.exists("tenant_1", "config"):
print("Entry exists")
```
### Access History
## Custom Serialization
MyDbEngine supports three approaches for custom serialization:
### 1. BaseInlineHandler - JSON Storage
For small data types that should be human-readable in snapshots:
```python
# Get history of an entry (returns list of digests)
history = db.history(tenant_id, entry="users", max_items=10)
from dbengine.handlers import BaseInlineHandler, handlers
import datetime
# Load a previous version
old_data = db.load(tenant_id, entry="users", digest=history[1])
class DateHandler(BaseInlineHandler):
def is_eligible_for(self, obj):
return isinstance(obj, datetime.date)
def tag(self):
return "Date"
def serialize(self, obj):
return {
"__special__": self.tag(),
"year": obj.year,
"month": obj.month,
"day": obj.day
}
def deserialize(self, data):
return datetime.date(year=data["year"], month=data["month"], day=data["day"])
handlers.register_handler(DateHandler())
```
## Metadata
### 2. BaseRefHandler - Optimized Binary Storage
Each snapshot automatically includes metadata:
For large data structures that benefit from custom binary formats:
- `__parent__`: Digest of the previous version
- `__user_id__`: User ID who made the change
- `__date__`: Timestamp of the change (format: `YYYYMMDD HH:MM:SS`)
```python
from dbengine.handlers import BaseRefHandler, handlers
import pandas as pd
import numpy as np
import json
class DataFrameHandler(BaseRefHandler):
def is_eligible_for(self, obj):
return isinstance(obj, pd.DataFrame)
def tag(self):
return "DataFrame"
def serialize_to_bytes(self, df):
"""Convert DataFrame to compact binary format"""
# Store metadata + numpy bytes
metadata = {
"columns": df.columns.tolist(),
"index": df.index.tolist(),
"dtype": str(df.values.dtype)
}
metadata_bytes = json.dumps(metadata).encode('utf-8')
metadata_length = len(metadata_bytes).to_bytes(4, 'big')
numpy_bytes = df.to_numpy().tobytes()
return metadata_length + metadata_bytes + numpy_bytes
def deserialize_from_bytes(self, data):
"""Reconstruct DataFrame from binary format"""
# Read metadata
metadata_length = int.from_bytes(data[:4], 'big')
metadata = json.loads(data[4:4+metadata_length].decode('utf-8'))
numpy_bytes = data[4+metadata_length:]
# Reconstruct array and DataFrame
array = np.frombuffer(numpy_bytes, dtype=metadata['dtype'])
array = array.reshape(len(metadata['index']), len(metadata['columns']))
return pd.DataFrame(array, columns=metadata['columns'], index=metadata['index'])
handlers.register_handler(DataFrameHandler())
# Now DataFrames are automatically stored in optimized binary format
df = pd.DataFrame({"col1": [1, 2, 3], "col2": [4, 5, 6]})
engine.save("tenant_1", "user_1", "data", df)
```
**Result:**
- JSON snapshot contains: `{"__special__": "DataFrame", "__digest__": "abc123..."}`
- Binary data stored in `refs/abc123...` (more compact than pickle)
- Automatic deduplication across tenants
### 3. use_refs() - Selective Pickle Storage
For objects with specific fields that should be pickled:
```python
import numpy as np

class MyDataObject:
def __init__(self, metadata, large_array):
self.metadata = metadata
self.large_array = large_array # Large numpy array or similar
@staticmethod
def use_refs():
"""Fields to pickle instead of JSON-serialize"""
return {"large_array"}
# metadata goes to JSON, large_array goes to refs/ (pickled)
obj = MyDataObject({"name": "dataset_1"}, np.zeros((1000, 1000)))
engine.save("tenant_1", "user_1", "my_data", obj)
```
**Comparison:**
| Approach | Storage | Format | Use Case |
|----------|---------|--------|----------|
| `BaseInlineHandler` | JSON snapshot | Custom dict | Small data, human-readable |
| `BaseRefHandler` | `refs/` directory | Custom binary | Large data, optimized format |
| `use_refs()` | `refs/` directory | Pickle | Quick solution, no handler needed |
## API Reference
### Core Methods
### Initialization
#### `init(tenant_id: str)`
Initialize database structure for a tenant.
| Method | Description |
|--------|-------------|
| `DbEngine(root: str = ".mytools_db")` | Initialize engine with storage root |
| `init(tenant_id: str)` | Create tenant directory structure |
| `is_initialized(tenant_id: str) -> bool` | Check if tenant is initialized |
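A short sketch of the initialization flow, using the same engine as in the Quick Start:

```python
from dbengine.dbengine import DbEngine

engine = DbEngine(root=".mytools_db")    # storage root (default shown)
engine.init("tenant_1")                  # create the tenant directory structure
assert engine.is_initialized("tenant_1")
```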
#### `save(tenant_id: str, user_id: str, entry: str, obj: object) -> str`
Save a complete snapshot. Returns the digest of the saved object.
### Data Operations
#### `load(tenant_id: str, entry: str, digest: str = None) -> object`
Load a snapshot. If digest is None, loads the latest version.
| Method | Description |
|--------|-------------|
| `save(tenant_id, user_id, entry, obj) -> str` | Save complete snapshot, returns digest |
| `load(tenant_id, entry, digest=None) -> object` | Load snapshot (latest if digest=None) |
| `put(tenant_id, user_id, entry, key, value) -> bool` | Add/update single record |
| `put_many(tenant_id, user_id, entry, items) -> bool` | Add/update multiple records |
| `get(tenant_id, entry, key=None, digest=None) -> object` | Get record(s) |
| `exists(tenant_id, entry) -> bool` | Check if entry exists |
#### `put(tenant_id: str, user_id: str, entry: str, key: str, value: object) -> bool`
Add or update a single record. Returns True if a new snapshot was created.
### History
#### `put_many(tenant_id: str, user_id: str, entry: str, items: list | dict) -> bool`
Add or update multiple records. Returns True if a new snapshot was created.
| Method | Description |
|--------|-------------|
| `history(tenant_id, entry, digest=None, max_items=1000) -> list` | Get history chain of digests |
| `get_digest(tenant_id, entry) -> str` | Get current digest for entry |
#### `get(tenant_id: str, entry: str, key: str = None, digest: str = None) -> object`
Retrieve record(s). If key is None, returns all records as a list.
## Performance & Limitations
#### `exists(tenant_id: str, entry: str) -> bool`
Check if an entry exists.
**Strengths:**
- ✅ Deduplication: Identical objects stored once (SHA-256 content addressing)
- ✅ History: Complete audit trail with zero overhead for unchanged data
- ✅ Custom formats: Binary handlers optimize storage (e.g., numpy vs pickle)
#### `history(tenant_id: str, entry: str, digest: str = None, max_items: int = 1000) -> list`
Get the history chain of digests for an entry.
**Limitations:**
- ❌ **File-based**: Not suitable for high-throughput applications
- ❌ **No indexing**: No SQL queries, no complex filtering
- ❌ **Snapshot overhead**: Each change creates a new snapshot
- ❌ **History chains**: Long histories require multiple file reads
#### `get_digest(tenant_id: str, entry: str) -> str`
Get the current digest for an entry.
**Performance Tips:**
- Use `put_many()` instead of multiple `put()` calls (creates one snapshot)
- Use `BaseRefHandler` for large binary data instead of pickle
- Limit history traversal with `max_items` parameter
- Consider archiving old snapshots for long-running entries
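For example, the first tip in practice, reusing the engine from the Quick Start:

```python
# Three put() calls create three snapshots of the "users" entry...
engine.put("tenant_1", "user_1", "users", "alice", {"role": "admin"})
engine.put("tenant_1", "user_1", "users", "bob", {"role": "user"})
engine.put("tenant_1", "user_1", "users", "charlie", {"role": "user"})

# ...while a single put_many() call stores the same records in one snapshot.
engine.put_many("tenant_1", "user_1", "users", {
    "alice": {"role": "admin"},
    "bob": {"role": "user"},
    "charlie": {"role": "user"},
})
```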
## Usage Patterns
## Development
### Pattern 1: Snapshot-based (using `save()`)
Best for saving complete states of complex objects.
### Running Tests
```python
config = {"theme": "dark", "language": "en"}
db.save(tenant_id, user_id, "config", config)
```bash
# All tests
pytest
# Specific test file
pytest tests/test_dbengine.py
pytest tests/test_serializer.py
# Single test
pytest tests/test_dbengine.py::test_i_can_save_and_load
```
### Pattern 2: Record-based (using `put()` / `put_many()`)
Best for managing collections of items incrementally.
### Building Package
```python
db.put(tenant_id, user_id, "settings", "theme", "dark")
db.put(tenant_id, user_id, "settings", "language", "en")
```bash
# Build distribution
python -m build
# Clean build artifacts
make clean
```
**Note**: Don't mix these patterns for the same entry, as they use different data structures.
### Project Structure
## Thread Safety
```
src/dbengine/
├── dbengine.py # Main DbEngine and RefHelper classes
├── serializer.py # JSON serialization with handlers
├── handlers.py # BaseHandler, BaseInlineHandler, BaseRefHandler
└── utils.py # Type checking and digest computation
DbEngine uses `RLock` internally, making it safe for multi-threaded applications.
tests/
├── test_dbengine.py # DbEngine functionality tests
└── test_serializer.py # Serialization and handler tests
```
## Exceptions
## Contributing
- `DbException`: Raised for database-related errors (missing entries, invalid parameters, etc.)
## Performance Considerations
- Objects are stored as JSON files
- Identical objects (same SHA-256) are stored only once
- History chains can become long; use `max_items` parameter to limit traversal
- File system performance impacts overall speed
This is a personal implementation. For bug reports or feature requests, please contact the author.
## License
This is a personal implementation. Please check with the author for licensing terms.
See LICENSE file for details.
## Version History
* 0.1.0 - Initial release
* 0.2.0 - Added custom reference handlers

pyproject.toml

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "mydbengine"
version = "0.1.0"
version = "0.2.0"
description = "A lightweight, git-inspired database engine that maintains complete history of all modifications"
readme = "README.md"
requires-python = ">=3.8"

src/dbengine/dbengine.py

@@ -8,7 +8,7 @@ import pickle
from threading import RLock
from dbengine.serializer import Serializer
from dbengine.utils import get_stream_digest
from dbengine.utils import compute_digest_from_bytes
TYPE_KEY = "__type__"
TAG_PARENT = "__parent__"
@@ -24,41 +24,75 @@ class DbException(Exception):
class RefHelper:
"""Base class for reference storage using content-addressable storage with SHA-256."""
def __init__(self, get_ref_path):
self.get_ref_path = get_ref_path
def save_ref(self, obj):
def save_ref_from_bytes(self, data: bytes):
"""
Save raw bytes to refs/ directory using content-addressable storage.
Used by BaseRefHandler for custom serialization formats.
:param obj:
:return:
:param data: Raw bytes to save
:return: SHA-256 digest of the data
"""
buffer = io.BytesIO()
pickler = pickle.Pickler(buffer)
pickler.dump(obj)
digest = get_stream_digest(buffer)
digest = compute_digest_from_bytes(data)
target_path = self.get_ref_path(digest)
if not os.path.exists(os.path.dirname(target_path)):
os.makedirs(os.path.dirname(target_path))
buffer.seek(0)
with open(self.get_ref_path(digest), "wb") as file:
while chunk := buffer.read(BUFFER_SIZE):
file.write(chunk)
with open(target_path, "wb") as file:
file.write(data)
logger.debug(f"Saved raw bytes with digest {digest}")
return digest
def load_ref_to_bytes(self, digest):
"""
Load raw bytes from refs/ directory.
Used by BaseRefHandler for custom deserialization formats.
:param digest: SHA-256 digest of the data
:return: Raw bytes
"""
with open(self.get_ref_path(digest), 'rb') as file:
return file.read()
class PickleRefHelper(RefHelper):
"""RefHelper that adds pickle serialization layer on top of byte storage."""
def save_ref(self, obj):
"""
Pickle an object and save it to refs/ directory.
Used by the use_refs() system for objects.
:param obj: Object to pickle and save
:return: SHA-256 digest of the pickled data
"""
buffer = io.BytesIO()
pickler = pickle.Pickler(buffer)
pickler.dump(obj)
buffer.seek(0)
data = buffer.read()
digest = self.save_ref_from_bytes(data)
logger.debug(f"Saved object type '{type(obj).__name__}' with digest {digest}")
return digest
def load_ref(self, digest):
"""
Load pickled object from refs/ directory.
Used by the use_refs() system for objects.
:param digest:
:return:
:param digest: SHA-256 digest of the pickled data
:return: Unpickled object
"""
with open(self.get_ref_path(digest), 'rb') as file:
return pickle.load(file)
data = self.load_ref_to_bytes(digest)
return pickle.loads(data)
class DbEngine:
@@ -316,13 +350,13 @@ class DbEngine:
:return:
"""
with self.lock:
serializer = Serializer(RefHelper(self._get_ref_path))
serializer = Serializer(PickleRefHelper(self._get_ref_path))
use_refs = getattr(obj, "use_refs")() if hasattr(obj, "use_refs") else None
return serializer.serialize(obj, use_refs)
def _deserialize(self, as_dict):
with self.lock:
serializer = Serializer(RefHelper(self._get_ref_path))
serializer = Serializer(PickleRefHelper(self._get_ref_path))
return serializer.deserialize(as_dict)
def _update_head(self, tenant_id, entry, digest):

src/dbengine/handlers.py

@@ -8,20 +8,66 @@ TAG_SPECIAL = "__special__"
class BaseHandler:
"""Base class for all handlers. Subclasses must implement is_eligible_for() and tag()."""
def is_eligible_for(self, obj):
pass
"""Check if this handler can process the given object."""
raise NotImplementedError
def tag(self):
pass
"""Return a unique tag identifying this handler type."""
raise NotImplementedError
class BaseInlineHandler(BaseHandler):
"""Handler that serializes data directly into the JSON snapshot."""
def serialize(self, obj) -> dict:
pass
"""Serialize object to dict for inline storage in JSON snapshot."""
raise NotImplementedError
def deserialize(self, data: dict) -> object:
pass
"""Deserialize object from dict stored in JSON snapshot."""
raise NotImplementedError
class DateHandler(BaseHandler):
class BaseRefHandler(BaseHandler):
"""Handler that serializes data to refs/ directory using content-addressable storage."""
def serialize_to_bytes(self, obj) -> bytes:
"""Serialize object to bytes for storage in refs/. Must be implemented by subclass."""
raise NotImplementedError
def deserialize_from_bytes(self, data: bytes) -> object:
"""Deserialize object from bytes loaded from refs/. Must be implemented by subclass."""
raise NotImplementedError
def serialize(self, obj, ref_helper) -> dict:
"""
Default implementation: converts object to bytes, saves to refs/, returns dict with tag and digest.
Can be overridden if custom behavior is needed.
"""
stream = self.serialize_to_bytes(obj)
digest = ref_helper.save_ref_from_bytes(stream)
from dbengine.serializer import TAG_DIGEST
TAG_SPECIAL = "__special__"
return {
TAG_SPECIAL: self.tag(),
TAG_DIGEST: digest
}
def deserialize(self, data: dict, ref_helper) -> object:
"""
Default implementation: loads bytes from refs/ using digest, deserializes from bytes.
Can be overridden if custom behavior is needed.
"""
from dbengine.serializer import TAG_DIGEST
digest = data[TAG_DIGEST]
stream = ref_helper.load_ref_to_bytes(digest)
return self.deserialize_from_bytes(stream)
class DateHandler(BaseInlineHandler):
def is_eligible_for(self, obj):
return isinstance(obj, datetime.date)

src/dbengine/serializer.py

@@ -1,6 +1,6 @@
import copy
from dbengine.handlers import handlers
from dbengine.handlers import handlers, BaseRefHandler
from dbengine.utils import *
TAG_ID = "__id__"
@@ -8,6 +8,7 @@ TAG_OBJECT = "__object__"
TAG_TUPLE = "__tuple__"
TAG_SET = "__set__"
TAG_REF = "__ref__"
TAG_DIGEST = "__digest__"
TAG_ENUM = "__enum__"
@@ -56,7 +57,10 @@ class Serializer:
return self._deserialize_obj_instance(obj)
if (handler := handlers.get_handler(obj)) is not None:
return handler.deserialize(obj)
if has_tag(obj, TAG_DIGEST):
return handler.deserialize(obj, self.ref_helper)
else:
return handler.deserialize(obj)
if is_list(obj):
return [self.deserialize(v) for v in obj]
@@ -115,7 +119,10 @@ class Serializer:
use_refs.update(f"{path}.{sub_path}" for sub_path in current_obj_use_refs)
if (handler := handlers.get_handler(obj)) is not None:
return handler.serialize(obj)
if isinstance(handler, BaseRefHandler):
return handler.serialize(obj, self.ref_helper)
else:
return handler.serialize(obj)
# flatten
data = {}

src/dbengine/utils.py

@@ -7,7 +7,7 @@ from enum import Enum
PRIMITIVES = (str, bool, type(None), int, float)
def get_stream_digest(stream):
def compute_digest_from_stream(stream):
"""
Compute a SHA256 from a stream
:param stream:
@@ -23,6 +23,18 @@ def get_stream_digest(stream):
return sha256_hash.hexdigest()
def compute_digest_from_bytes(data: bytes):
"""
Compute a SHA256 digest from raw bytes.
:param data: Raw bytes to hash
:return: SHA-256 hexadecimal digest
"""
sha256_hash = hashlib.sha256()
sha256_hash.update(data)
return sha256_hash.hexdigest()
def has_tag(obj, tag):
"""

tests/test_serializer.py

@@ -6,7 +6,8 @@ from enum import Enum
import pytest
from dbengine.serializer import TAG_TUPLE, TAG_SET, Serializer, TAG_OBJECT, TAG_ID, TAG_REF
from dbengine.serializer import TAG_TUPLE, TAG_SET, Serializer, TAG_OBJECT, TAG_ID, TAG_REF, TAG_DIGEST
from dbengine.handlers import BaseRefHandler, handlers
class Obj:
@@ -72,6 +73,39 @@ class DummyComplexClass:
prop3: ObjEnum
class BytesData:
"""Simple class to hold binary data for testing BaseRefHandler"""
def __init__(self, data: bytes):
self.data = data
def __eq__(self, other):
if not isinstance(other, BytesData):
return False
return self.data == other.data
def __hash__(self):
return hash(self.data)
class BytesDataHandler(BaseRefHandler):
"""Test handler that stores BytesData using refs/ directory"""
def is_eligible_for(self, obj):
return isinstance(obj, BytesData)
def tag(self):
return "BytesData"
def serialize_to_bytes(self, obj) -> bytes:
"""Just return the raw bytes from the object"""
return obj.data
def deserialize_from_bytes(self, data: bytes) -> object:
"""Reconstruct BytesData from raw bytes"""
return BytesData(data)
class DummyRefHelper:
"""
When something is too complicated to serialize, we just default to pickle
@@ -94,6 +128,19 @@ class DummyRefHelper:
def load_ref(self, digest):
return pickle.loads(self.refs[digest])
def save_ref_from_bytes(self, data: bytes):
"""Save raw bytes for BaseRefHandler"""
sha256_hash = hashlib.sha256()
sha256_hash.update(data)
digest = sha256_hash.hexdigest()
self.refs[digest] = data
return digest
def load_ref_to_bytes(self, digest):
"""Load raw bytes for BaseRefHandler"""
return self.refs[digest]
@pytest.mark.parametrize("obj, expected", [
(1, 1),
@@ -266,3 +313,75 @@ def test_i_can_serialize_date():
decoded = serializer.deserialize(flatten)
assert decoded == obj
def test_i_can_serialize_with_base_ref_handler():
"""Test that BaseRefHandler correctly stores data in refs/ with TAG_DIGEST"""
# Register the test handler
test_handler = BytesDataHandler()
handlers.register_handler(test_handler)
try:
# Create test data
test_bytes = b"Hello, this is binary data for testing!"
obj = BytesData(test_bytes)
# Serialize with ref_helper
ref_helper = DummyRefHelper()
serializer = Serializer(ref_helper)
flatten = serializer.serialize(obj)
# Verify structure: should have TAG_SPECIAL and TAG_DIGEST
assert "__special__" in flatten
assert flatten["__special__"] == "BytesData"
assert TAG_DIGEST in flatten
# Verify data was stored in refs
digest = flatten[TAG_DIGEST]
assert digest in ref_helper.refs
assert ref_helper.refs[digest] == test_bytes
# Deserialize and verify
decoded = serializer.deserialize(flatten)
assert decoded == obj
assert decoded.data == test_bytes
finally:
# Clean up: remove the test handler
handlers.handlers.remove(test_handler)
def test_i_can_serialize_object_containing_base_ref_handler_data():
"""Test that objects containing BaseRefHandler-managed data work correctly"""
# Register the test handler
test_handler = BytesDataHandler()
handlers.register_handler(test_handler)
try:
# Create an object that contains BytesData
bytes_obj = BytesData(b"Some binary content")
wrapper = Obj(1, "test", bytes_obj)
# Serialize
ref_helper = DummyRefHelper()
serializer = Serializer(ref_helper)
flatten = serializer.serialize(wrapper)
# Verify structure
assert flatten[TAG_OBJECT] == 'tests.test_serializer.Obj'
assert flatten['a'] == 1
assert flatten['b'] == "test"
assert flatten['c']["__special__"] == "BytesData"
assert TAG_DIGEST in flatten['c']
# Deserialize and verify
decoded = serializer.deserialize(flatten)
assert decoded.a == wrapper.a
assert decoded.b == wrapper.b
assert decoded.c == wrapper.c
finally:
# Clean up
handlers.handlers.remove(test_handler)