Refactoring DbEngine
This commit is contained in:
@@ -4,178 +4,234 @@ import shutil
|
||||
import pandas as pd
|
||||
import pytest
|
||||
|
||||
from core.dbengine import DbEngine, TAG_PARENT
|
||||
from core.settings_objects import BudgetTrackerSettings, BudgetTrackerFile, BudgetTrackerFiles
|
||||
from core.dbengine import DbEngine, DbException, TAG_PARENT, TAG_USER, TAG_DATE
|
||||
|
||||
DB_ENGINE_ROOT = "TestDBEngineRoot"
|
||||
FAKE_USER_ID = "FakeUserId"
|
||||
FAKE_USER_EMAIL = "fake_user@me.com"
|
||||
|
||||
|
||||
class DummyObj:
|
||||
def __init__(self, a, b, c):
|
||||
self.a = a
|
||||
self.b = b
|
||||
self.c = c
|
||||
|
||||
def __eq__(self, other):
|
||||
if id(self) == id(other):
|
||||
return True
|
||||
|
||||
if not isinstance(other, DummyObj):
|
||||
return False
|
||||
|
||||
return self.a == other.a and self.b == other.b and self.c == other.c
|
||||
|
||||
def __hash__(self):
|
||||
return hash((self.a, self.b, self.c))
|
||||
|
||||
|
||||
class DummyObjWithRef(DummyObj):
|
||||
@staticmethod
|
||||
def use_refs() -> set:
|
||||
return {"c"}
|
||||
|
||||
|
||||
class DummyObjWithKey(DummyObj):
|
||||
def get_key(self) -> set:
|
||||
return self.a
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def engine():
|
||||
if os.path.exists(DB_ENGINE_ROOT):
|
||||
shutil.rmtree(DB_ENGINE_ROOT)
|
||||
|
||||
engine = DbEngine(DB_ENGINE_ROOT)
|
||||
engine.init()
|
||||
return engine
|
||||
if os.path.exists(DB_ENGINE_ROOT):
|
||||
shutil.rmtree(DB_ENGINE_ROOT)
|
||||
|
||||
engine = DbEngine(DB_ENGINE_ROOT)
|
||||
engine.init(FAKE_USER_ID)
|
||||
|
||||
yield engine
|
||||
|
||||
shutil.rmtree(DB_ENGINE_ROOT)
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def dummy_obj():
|
||||
return BudgetTrackerSettings(
|
||||
spread_sheet="spread_sheet",
|
||||
col_row_num="row_number",
|
||||
col_project="project",
|
||||
col_owner="owner",
|
||||
col_capex="capex",
|
||||
col_details="details",
|
||||
col_supplier="supplier",
|
||||
col_budget_amt="budget",
|
||||
col_actual_amt="actual",
|
||||
col_forecast5_7_amt="forecast5_7",
|
||||
)
|
||||
return DummyObj(1, "a", False)
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def dummy_obj2():
|
||||
return BudgetTrackerSettings(
|
||||
spread_sheet="spread_sheet2",
|
||||
col_row_num="row_number2",
|
||||
col_project="project2",
|
||||
col_owner="owner2",
|
||||
col_capex="capex2",
|
||||
col_details="details2",
|
||||
col_supplier="supplier2",
|
||||
col_budget_amt="budget2",
|
||||
col_actual_amt="actual2",
|
||||
col_forecast5_7_amt="forecast5_72",
|
||||
)
|
||||
return DummyObj(2, "b", True)
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def dummy_obj_with_ref():
|
||||
data = {
|
||||
'Key1': ['A', 'B', 'C'],
|
||||
'Key2': ['X', 'Y', 'Z'],
|
||||
'Percentage': [0.1, 0.2, 0.15],
|
||||
}
|
||||
df = pd.DataFrame(data)
|
||||
return DummyObjWithRef(1, "a", df)
|
||||
|
||||
|
||||
def test_i_can_test_init():
|
||||
if os.path.exists(DB_ENGINE_ROOT):
|
||||
shutil.rmtree(DB_ENGINE_ROOT)
|
||||
|
||||
engine = DbEngine(DB_ENGINE_ROOT)
|
||||
assert not engine.is_initialized()
|
||||
|
||||
engine.init()
|
||||
assert engine.is_initialized()
|
||||
if os.path.exists(DB_ENGINE_ROOT):
|
||||
shutil.rmtree(DB_ENGINE_ROOT)
|
||||
|
||||
engine = DbEngine(DB_ENGINE_ROOT)
|
||||
assert not engine.is_initialized(FAKE_USER_ID)
|
||||
|
||||
engine.init(FAKE_USER_ID)
|
||||
assert engine.is_initialized(FAKE_USER_ID)
|
||||
|
||||
|
||||
def test_i_can_save_and_load(engine, dummy_obj):
|
||||
engine.save(FAKE_USER_ID, "MyEntry", dummy_obj)
|
||||
|
||||
res = engine.load(FAKE_USER_ID, "MyEntry")
|
||||
assert isinstance(res, BudgetTrackerSettings)
|
||||
|
||||
assert res.spread_sheet == dummy_obj.spread_sheet
|
||||
assert res.col_row_num == dummy_obj.col_row_num
|
||||
assert res.col_project == dummy_obj.col_project
|
||||
assert res.col_owner == dummy_obj.col_owner
|
||||
assert res.col_capex == dummy_obj.col_capex
|
||||
assert res.col_details == dummy_obj.col_details
|
||||
assert res.col_supplier == dummy_obj.col_supplier
|
||||
assert res.col_budget_amt == dummy_obj.col_budget_amt
|
||||
assert res.col_actual_amt == dummy_obj.col_actual_amt
|
||||
assert res.col_forecast5_7_amt == dummy_obj.col_forecast5_7_amt
|
||||
digest = engine.save(FAKE_USER_ID, FAKE_USER_EMAIL, "MyEntry", dummy_obj)
|
||||
|
||||
res = engine.load(FAKE_USER_ID, "MyEntry")
|
||||
|
||||
assert digest is not None
|
||||
assert isinstance(res, DummyObj)
|
||||
|
||||
assert res.a == dummy_obj.a
|
||||
assert res.b == dummy_obj.b
|
||||
assert res.c == dummy_obj.c
|
||||
|
||||
# check that the files are created
|
||||
assert os.path.exists(os.path.join(DB_ENGINE_ROOT, FAKE_USER_ID, "objects"))
|
||||
assert os.path.exists(os.path.join(DB_ENGINE_ROOT, FAKE_USER_ID, "head"))
|
||||
|
||||
|
||||
def test_i_can_save_using_ref(engine):
|
||||
data = {
|
||||
'Key1': ['A', 'B', 'C'],
|
||||
'Key2': ['X', 'Y', 'Z'],
|
||||
'Percentage': [0.1, 0.2, 0.15],
|
||||
}
|
||||
df = pd.DataFrame(data)
|
||||
|
||||
obj = BudgetTrackerFile(2024, 8, data=df)
|
||||
|
||||
engine.save(FAKE_USER_ID, "MyEntry", obj)
|
||||
|
||||
res = engine.load(FAKE_USER_ID, "MyEntry")
|
||||
assert isinstance(res, BudgetTrackerFile)
|
||||
|
||||
assert res.year == obj.year
|
||||
assert res.month == obj.month
|
||||
assert res.data.to_dict() == obj.data.to_dict()
|
||||
def test_save_invalid_inputs(engine):
|
||||
"""
|
||||
Test save with invalid inputs.
|
||||
"""
|
||||
with pytest.raises(DbException):
|
||||
engine.save(None, FAKE_USER_EMAIL, "InvalidEntry", DummyObj(1, 2, 3))
|
||||
|
||||
with pytest.raises(DbException):
|
||||
engine.save(FAKE_USER_ID, None, "InvalidEntry", DummyObj(1, 2, 3))
|
||||
|
||||
with pytest.raises(DbException):
|
||||
engine.save(FAKE_USER_ID, FAKE_USER_EMAIL, "", DummyObj(1, 2, 3))
|
||||
|
||||
with pytest.raises(DbException):
|
||||
engine.save(FAKE_USER_ID, FAKE_USER_EMAIL, None, DummyObj(1, 2, 3))
|
||||
|
||||
|
||||
def test_i_can_use_ref_when_subclass(engine):
|
||||
data1 = {'Key': ['A'], 'Value': [0.1]}
|
||||
data2 = {'Key': ['B'], 'Value': [0.2]}
|
||||
file1 = BudgetTrackerFile(2024, 8, data=pd.DataFrame(data1))
|
||||
file2 = BudgetTrackerFile(2024, 9, data=pd.DataFrame(data2))
|
||||
files = BudgetTrackerFiles([file1, file2])
|
||||
def test_i_can_save_using_ref(engine, dummy_obj_with_ref):
|
||||
engine.save(FAKE_USER_ID, FAKE_USER_EMAIL, "MyEntry", dummy_obj_with_ref)
|
||||
|
||||
res = engine.load(FAKE_USER_ID, "MyEntry")
|
||||
assert isinstance(res, DummyObjWithRef)
|
||||
|
||||
assert res.a == dummy_obj_with_ref.a
|
||||
assert res.b == dummy_obj_with_ref.b
|
||||
assert res.c.to_dict() == dummy_obj_with_ref.c.to_dict()
|
||||
|
||||
# check that the files are created
|
||||
assert os.path.exists(os.path.join(DB_ENGINE_ROOT, FAKE_USER_ID, "objects"))
|
||||
assert os.path.exists(os.path.join(DB_ENGINE_ROOT, FAKE_USER_ID, "head"))
|
||||
assert os.path.exists(os.path.join(DB_ENGINE_ROOT, "refs"))
|
||||
|
||||
engine.save(FAKE_USER_ID, "MyEntry", files)
|
||||
|
||||
res = engine.load(FAKE_USER_ID, "MyEntry")
|
||||
assert isinstance(res, BudgetTrackerFiles)
|
||||
assert len(res.files) == 2
|
||||
def test_refs_are_share_across_users(engine, dummy_obj_with_ref):
|
||||
engine.save(FAKE_USER_ID, FAKE_USER_EMAIL, "MyEntry", dummy_obj_with_ref)
|
||||
engine.save("AnotherUserId", "AnotherUser", "AnotherMyEntry", dummy_obj_with_ref)
|
||||
|
||||
refs_path = os.path.join(DB_ENGINE_ROOT, "refs")
|
||||
assert len(os.listdir(refs_path)) == 1
|
||||
|
||||
|
||||
def test_metadata_are_correctly_set(engine, dummy_obj):
|
||||
digest = engine.save(FAKE_USER_ID, FAKE_USER_EMAIL, "MyEntry", dummy_obj)
|
||||
|
||||
as_dict = engine.debug_load(FAKE_USER_ID, digest)
|
||||
assert as_dict[TAG_PARENT] == [None]
|
||||
assert as_dict[TAG_USER] == FAKE_USER_EMAIL
|
||||
assert as_dict[TAG_DATE] is not None
|
||||
|
||||
|
||||
def test_i_can_track_parents(engine):
|
||||
digest = engine.save(FAKE_USER_ID, FAKE_USER_EMAIL, "MyEntry", DummyObj(1, "a", False))
|
||||
second_digest = engine.save(FAKE_USER_ID, FAKE_USER_EMAIL, "MyEntry", DummyObj(1, "a", True))
|
||||
|
||||
as_dict = engine.debug_load(FAKE_USER_ID, second_digest)
|
||||
|
||||
assert as_dict[TAG_PARENT] == [digest]
|
||||
|
||||
|
||||
def test_i_can_put_and_get_one_object(engine, dummy_obj):
|
||||
engine.put(FAKE_USER_ID, "MyEntry", "key1", dummy_obj)
|
||||
from_db = engine.get(FAKE_USER_ID, "MyEntry", "key1")
|
||||
|
||||
assert from_db == dummy_obj
|
||||
engine.put(FAKE_USER_ID, FAKE_USER_EMAIL, "MyEntry", "key1", dummy_obj)
|
||||
from_db = engine.get(FAKE_USER_ID, "MyEntry", "key1")
|
||||
|
||||
assert from_db == dummy_obj
|
||||
|
||||
|
||||
def test_i_can_put_and_get_multiple_objects(engine, dummy_obj, dummy_obj2):
|
||||
engine.put(FAKE_USER_ID, "MyEntry", "key1", dummy_obj)
|
||||
engine.put(FAKE_USER_ID, "MyEntry", "key2", dummy_obj2)
|
||||
|
||||
from_db1 = engine.get(FAKE_USER_ID, "MyEntry", "key1")
|
||||
from_db2 = engine.get(FAKE_USER_ID, "MyEntry", "key2")
|
||||
|
||||
assert from_db1 == dummy_obj
|
||||
assert from_db2 == dummy_obj2
|
||||
|
||||
all_items = engine.get(FAKE_USER_ID, "MyEntry")
|
||||
assert all_items == [dummy_obj, dummy_obj2]
|
||||
engine.put(FAKE_USER_ID, FAKE_USER_EMAIL, "MyEntry", "key1", dummy_obj)
|
||||
engine.put(FAKE_USER_ID, FAKE_USER_EMAIL, "MyEntry", "key2", dummy_obj2)
|
||||
|
||||
from_db1 = engine.get(FAKE_USER_ID, "MyEntry", "key1")
|
||||
from_db2 = engine.get(FAKE_USER_ID, "MyEntry", "key2")
|
||||
|
||||
assert from_db1 == dummy_obj
|
||||
assert from_db2 == dummy_obj2
|
||||
|
||||
as_dict = engine.load(FAKE_USER_ID, "MyEntry")
|
||||
|
||||
assert "key1" in as_dict
|
||||
assert "key2" in as_dict
|
||||
assert as_dict["key1"] == dummy_obj
|
||||
assert as_dict["key2"] == dummy_obj2
|
||||
|
||||
|
||||
def test_i_automatically_replace_keys(engine, dummy_obj, dummy_obj2):
|
||||
engine.put(FAKE_USER_ID, "MyEntry", "key1", dummy_obj)
|
||||
engine.put(FAKE_USER_ID, "MyEntry", "key1", dummy_obj2)
|
||||
|
||||
from_db1 = engine.get(FAKE_USER_ID, "MyEntry", "key1")
|
||||
assert from_db1 == dummy_obj2
|
||||
|
||||
all_items = engine.get(FAKE_USER_ID, "MyEntry")
|
||||
assert all_items == [dummy_obj2]
|
||||
engine.put(FAKE_USER_ID, FAKE_USER_EMAIL, "MyEntry", "key1", dummy_obj)
|
||||
engine.put(FAKE_USER_ID, FAKE_USER_EMAIL, "MyEntry", "key1", dummy_obj2)
|
||||
|
||||
from_db1 = engine.get(FAKE_USER_ID, "MyEntry", "key1")
|
||||
assert from_db1 == dummy_obj2
|
||||
|
||||
all_items = engine.get(FAKE_USER_ID, "MyEntry")
|
||||
assert all_items == [dummy_obj2]
|
||||
|
||||
|
||||
def test_i_do_not_save_twice_when_the_entries_are_the_same(engine, dummy_obj):
|
||||
engine.put(FAKE_USER_ID, "MyEntry", "key1", dummy_obj)
|
||||
|
||||
entry_content = engine.load(FAKE_USER_ID, "MyEntry")
|
||||
assert entry_content[TAG_PARENT] == [None]
|
||||
|
||||
# Save the same entry again
|
||||
engine.put(FAKE_USER_ID, "MyEntry", "key1", dummy_obj)
|
||||
|
||||
entry_content = engine.load(FAKE_USER_ID, "MyEntry")
|
||||
assert entry_content[TAG_PARENT] == [None] # still no other parent
|
||||
engine.put(FAKE_USER_ID, FAKE_USER_EMAIL, "MyEntry", "key1", dummy_obj)
|
||||
|
||||
entry_content = engine.load(FAKE_USER_ID, "MyEntry")
|
||||
assert entry_content[TAG_PARENT] == [None]
|
||||
|
||||
# Save the same entry again
|
||||
engine.put(FAKE_USER_ID, FAKE_USER_EMAIL, "MyEntry", "key1", dummy_obj)
|
||||
|
||||
entry_content = engine.load(FAKE_USER_ID, "MyEntry")
|
||||
assert entry_content[TAG_PARENT] == [None] # still no other parent
|
||||
|
||||
|
||||
def test_i_can_put_many(engine, dummy_obj, dummy_obj2):
|
||||
engine.put_many(FAKE_USER_ID, "MyEntry", [dummy_obj, dummy_obj2])
|
||||
|
||||
from_db1 = engine.get(FAKE_USER_ID, "MyEntry", "spread_sheet")
|
||||
from_db2 = engine.get(FAKE_USER_ID, "MyEntry", "spread_sheet2")
|
||||
|
||||
assert from_db1 == dummy_obj
|
||||
assert from_db2 == dummy_obj2
|
||||
|
||||
entry_content = engine.load(FAKE_USER_ID, "MyEntry")
|
||||
assert entry_content[TAG_PARENT] == [None] # only one save was made
|
||||
def test_i_can_put_many(engine):
|
||||
dummy_obj = DummyObjWithKey("1", "a", True)
|
||||
dummy_obj2 = DummyObjWithKey("2", "b", False)
|
||||
engine.put_many(FAKE_USER_ID, FAKE_USER_EMAIL, "MyEntry", [dummy_obj, dummy_obj2])
|
||||
|
||||
from_db1 = engine.get(FAKE_USER_ID, "MyEntry", "1")
|
||||
from_db2 = engine.get(FAKE_USER_ID, "MyEntry", "2")
|
||||
|
||||
assert from_db1 == dummy_obj
|
||||
assert from_db2 == dummy_obj2
|
||||
|
||||
entry_content = engine.load(FAKE_USER_ID, "MyEntry")
|
||||
assert entry_content[TAG_PARENT] == [None] # only one save was made
|
||||
|
||||
|
||||
def test_i_can_do_not_save_in_not_necessary(engine, dummy_obj, dummy_obj2):
|
||||
engine.put_many(FAKE_USER_ID, "MyEntry", [dummy_obj, dummy_obj2])
|
||||
engine.put_many(FAKE_USER_ID, "MyEntry", [dummy_obj, dummy_obj2])
|
||||
|
||||
entry_content = engine.load(FAKE_USER_ID, "MyEntry")
|
||||
assert entry_content[TAG_PARENT] == [None] # Still None, nothing was saved
|
||||
def test_put_many_save_only_if_necessary(engine):
|
||||
dummy_obj = DummyObjWithKey("1", "a", True)
|
||||
dummy_obj2 = DummyObjWithKey("2", "b", False)
|
||||
|
||||
engine.put_many(FAKE_USER_ID, FAKE_USER_EMAIL, "MyEntry", [dummy_obj, dummy_obj2])
|
||||
engine.put_many(FAKE_USER_ID, FAKE_USER_EMAIL, "MyEntry", [dummy_obj, dummy_obj2])
|
||||
|
||||
entry_content = engine.load(FAKE_USER_ID, "MyEntry")
|
||||
assert entry_content[TAG_PARENT] == [None] # Still None, nothing was save
|
||||
|
||||
Reference in New Issue
Block a user