Refactored Database management to allow isolated integration tests

This commit is contained in:
2025-08-30 19:36:19 +02:00
parent db56363b1f
commit 17b08be077
10 changed files with 323 additions and 81 deletions

View File

@@ -0,0 +1,18 @@
"""Database manager for application initialization."""
import logging
from .user_database import Database, set_user_db
logger = logging.getLogger(__name__)
def initialize_database():
"""Initialize the application database."""
try:
# Create default database instance
db_instance = Database()
set_user_db(db_instance)
logger.info("Database initialized successfully")
return db_instance
except Exception as e:
logger.error(f"Failed to initialize database: {e}")
raise

View File

@@ -3,15 +3,17 @@ import secrets
from datetime import datetime from datetime import datetime
from typing import Any from typing import Any
from .user_database import user_db from .user_database import get_user_db
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class UserDAO: class UserDAO:
"""Data Access Object for user management.""" """Data Access Object for user management."""
@staticmethod @staticmethod
def create_user(username: str, email: str, password: str | None = None, github_id: str | None = None) -> int: def create_user(username: str, email: str, password: str | None = None, github_id: str | None = None,
db_instance=None) -> int:
""" """
Create a new user with email/password or GitHub authentication. Create a new user with email/password or GitHub authentication.
@@ -20,10 +22,12 @@ class UserDAO:
email: The user's email email: The user's email
password: The user's password (optional) password: The user's password (optional)
github_id: GitHub user ID (optional) github_id: GitHub user ID (optional)
db_instance: Database instance (optional, uses default if None)
Returns: Returns:
int: ID of the new user or 0 if creation failed int: ID of the new user or 0 if creation failed
""" """
user_db = db_instance or get_user_db()
try: try:
with user_db.get_connection() as conn: with user_db.get_connection() as conn:
cursor = conn.cursor() cursor = conn.cursor()
@@ -48,9 +52,9 @@ class UserDAO:
password_hash = user_db._hash_password(password, salt) password_hash = user_db._hash_password(password, salt)
cursor.execute(''' cursor.execute('''
INSERT INTO users (username, email, password_hash, salt, github_id, is_admin) INSERT INTO users (username, email, password_hash, salt, github_id, is_admin)
VALUES (?, ?, ?, ?, ?, 0) VALUES (?, ?, ?, ?, ?, 0)
''', (username, email, password_hash, salt, github_id)) ''', (username, email, password_hash, salt, github_id))
conn.commit() conn.commit()
return cursor.lastrowid return cursor.lastrowid
@@ -59,17 +63,19 @@ class UserDAO:
return 0 return 0
@staticmethod @staticmethod
def authenticate_email(email: str, password: str) -> dict[str, Any] | None: def authenticate_email(email: str, password: str, db_instance=None) -> dict[str, Any] | None:
""" """
Authenticate a user with email and password. Authenticate a user with email and password.
Args: Args:
email: The user's email email: The user's email
password: The user's password password: The user's password
db_instance: Database instance (optional, uses default if None)
Returns: Returns:
Dict or None: User record if authentication succeeds, None otherwise Dict or None: User record if authentication succeeds, None otherwise
""" """
user_db = db_instance or get_user_db()
with user_db.get_connection() as conn: with user_db.get_connection() as conn:
cursor = conn.cursor() cursor = conn.cursor()
@@ -101,7 +107,8 @@ class UserDAO:
return dict(user) return dict(user)
@staticmethod @staticmethod
def find_or_create_github_user(github_id: str, username: str, email: str | None) -> dict[str, Any] | None: def find_or_create_github_user(github_id: str, username: str, email: str | None, db_instance=None) -> dict[
str, Any] | None:
""" """
Find existing GitHub user or create a new one. Find existing GitHub user or create a new one.
@@ -109,10 +116,12 @@ class UserDAO:
github_id: GitHub user ID github_id: GitHub user ID
username: The username from GitHub username: The username from GitHub
email: The email from GitHub (may be None) email: The email from GitHub (may be None)
db_instance: Database instance (optional, uses default if None)
Returns: Returns:
Dict or None: User record if found or created, None on error Dict or None: User record if found or created, None on error
""" """
user_db = db_instance or get_user_db()
with user_db.get_connection() as conn: with user_db.get_connection() as conn:
cursor = conn.cursor() cursor = conn.cursor()
@@ -138,9 +147,9 @@ class UserDAO:
try: try:
cursor.execute(''' cursor.execute('''
INSERT INTO users (username, email, github_id, is_admin) INSERT INTO users (username, email, github_id, is_admin)
VALUES (?, ?, ?, 0) VALUES (?, ?, ?, 0)
''', (username, user_email, github_id)) ''', (username, user_email, github_id))
user_id = cursor.lastrowid user_id = cursor.lastrowid
conn.commit() conn.commit()
@@ -157,16 +166,18 @@ class UserDAO:
return None return None
@staticmethod @staticmethod
def get_user_by_id(user_id: int) -> dict[str, Any] | None: def get_user_by_id(user_id: int, db_instance=None) -> dict[str, Any] | None:
""" """
Get a user by ID. Get a user by ID.
Args: Args:
user_id: The user ID user_id: The user ID
db_instance: Database instance (optional, uses default if None)
Returns: Returns:
Dict or None: User record if found, None otherwise Dict or None: User record if found, None otherwise
""" """
user_db = db_instance or get_user_db()
with user_db.get_connection() as conn: with user_db.get_connection() as conn:
cursor = conn.cursor() cursor = conn.cursor()
@@ -179,42 +190,51 @@ class UserDAO:
return dict(user) if user else None return dict(user) if user else None
@staticmethod @staticmethod
def get_all_users(limit: int = 100, offset: int = 0) -> list[dict[str, Any]]: def get_all_users(limit: int = 100, offset: int = 0, db_instance=None) -> list[dict[str, Any]]:
""" """
Get all users with pagination. Get all users with pagination.
Args: Args:
limit: Maximum number of users to return limit: Maximum number of users to return
offset: Number of users to skip offset: Number of users to skip
db_instance: Database instance (optional, uses default if None)
Returns: Returns:
List of user records List of user records
""" """
user_db = db_instance or get_user_db()
with user_db.get_connection() as conn: with user_db.get_connection() as conn:
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute(''' cursor.execute('''
SELECT id, username, email, is_admin, created_at, last_login, SELECT id,
(github_id IS NOT NULL) as is_github_user username,
FROM users email,
ORDER BY created_at DESC is_admin,
LIMIT ? OFFSET ? created_at,
''', (limit, offset)) last_login,
(github_id IS NOT NULL) as is_github_user
FROM users
ORDER BY created_at DESC LIMIT ?
OFFSET ?
''', (limit, offset))
return [dict(user) for user in cursor.fetchall()] return [dict(user) for user in cursor.fetchall()]
@staticmethod @staticmethod
def set_admin_status(user_id: int, is_admin: bool) -> bool: def set_admin_status(user_id: int, is_admin: bool, db_instance=None) -> bool:
""" """
Change a user's admin status. Change a user's admin status.
Args: Args:
user_id: The user ID user_id: The user ID
is_admin: True to make admin, False to remove admin status is_admin: True to make admin, False to remove admin status
db_instance: Database instance (optional, uses default if None)
Returns: Returns:
bool: True if successful, False otherwise bool: True if successful, False otherwise
""" """
user_db = db_instance or get_user_db()
try: try:
with user_db.get_connection() as conn: with user_db.get_connection() as conn:
cursor = conn.cursor() cursor = conn.cursor()
@@ -231,16 +251,18 @@ class UserDAO:
return False return False
@staticmethod @staticmethod
def delete_user(user_id: int) -> bool: def delete_user(user_id: int, db_instance=None) -> bool:
""" """
Delete a user and all their data. Delete a user and all their data.
Args: Args:
user_id: The user ID user_id: The user ID
db_instance: Database instance (optional, uses default if None)
Returns: Returns:
bool: True if successful, False otherwise bool: True if successful, False otherwise
""" """
user_db = db_instance or get_user_db()
try: try:
with user_db.get_connection() as conn: with user_db.get_connection() as conn:
cursor = conn.cursor() cursor = conn.cursor()

View File

@@ -111,5 +111,18 @@ class Database:
finally: finally:
conn.close() conn.close()
# Create a singleton instance
user_db = Database() # Global instance for backward compatibility (will be modified in tests)
_default_instance = None
def get_user_db():
"""Get the default database instance."""
global _default_instance
if _default_instance is None:
_default_instance = Database()
return _default_instance
def set_user_db(instance):
"""Set a custom database instance (for testing)."""
global _default_instance
_default_instance = instance

View File

@@ -21,6 +21,7 @@ from components.register.constants import Routes as RegisterRoutes
from config import APP_PORT from config import APP_PORT
from constants import Routes from constants import Routes
from core.dbengine import DbException from core.dbengine import DbException
from core.database_manager import initialize_database
from core.instance_manager import InstanceManager from core.instance_manager import InstanceManager
from core.settings_management import SettingsManager from core.settings_management import SettingsManager
from pages.admin_import_settings import AdminImportSettings, IMPORT_SETTINGS_PATH, import_settings_app from pages.admin_import_settings import AdminImportSettings, IMPORT_SETTINGS_PATH, import_settings_app
@@ -39,6 +40,9 @@ logging.config.dictConfig(config)
logger = logging.getLogger("MainApp") logger = logging.getLogger("MainApp")
# Initialize database
initialize_database()
# daisy_ui_links_v4 = ( # daisy_ui_links_v4 = (
# Link(href="https://cdn.jsdelivr.net/npm/daisyui@latest/dist/full.min.css", rel="stylesheet", type="text/css"), # Link(href="https://cdn.jsdelivr.net/npm/daisyui@latest/dist/full.min.css", rel="stylesheet", type="text/css"),
# Script(src="https://cdn.tailwindcss.com"), # Script(src="https://cdn.tailwindcss.com"),

View File

@@ -11,69 +11,117 @@ from pathlib import Path
from components.datagrid.DataGrid import reset_instances from components.datagrid.DataGrid import reset_instances
from playwright_config import BROWSER_CONFIG, BASE_URL from playwright_config import BROWSER_CONFIG, BASE_URL
from tests.fixtures.test_database import TestDatabaseManager
from tests.fixtures.test_users import TestUsers
USER_EMAIL = "test@mail.com" USER_EMAIL = "test@mail.com"
USER_ID = "test_user" USER_ID = "test_user"
APP_PORT = 5002 APP_PORT = 5002
@pytest.fixture(scope="session") @pytest.fixture(scope="session")
def app_server(): def test_database():
"""Start the application server for end-to-end tests""" """Configure temporary database for tests"""
# Use the same Python executable that's running pytest # Create temporary DB
python_executable = sys.executable temp_db_path = TestDatabaseManager.create_temp_db_path()
# Get the absolute path to the src directory # Save original environment
project_root = Path(__file__).parent.parent original_env = {
src_path = project_root / "src" "DB_PATH": os.environ.get("DB_PATH"),
"ADMIN_EMAIL": os.environ.get("ADMIN_EMAIL"),
# Start the application server "ADMIN_PASSWORD": os.environ.get("ADMIN_PASSWORD"),
print(f"Starting server on url {BASE_URL}...") "SECRET_KEY": os.environ.get("SECRET_KEY")
port = BASE_URL.split(':')[-1].split('/')[0] if ':' in BASE_URL else APP_PORT }
print(f"Using port {port}")
# Configure test environment
server_process = subprocess.Popen( TestDatabaseManager.setup_test_environment(temp_db_path)
[python_executable, "main.py", "--port", "5002"],
cwd=str(src_path), # Change to src directory where main.py is located print(f"Test database created at: {temp_db_path}")
stdout=subprocess.PIPE,
stderr=subprocess.PIPE, yield temp_db_path
env=os.environ.copy() # Inherit environment variables
) # Cleanup: restore environment and clean up DB
TestDatabaseManager.restore_original_environment(original_env)
# Wait for the server to start TestDatabaseManager.cleanup_test_db(temp_db_path)
max_retries = 10 # Wait up to 30 seconds print(f"Test database cleaned up: {temp_db_path}")
for i in range(max_retries):
try:
print(f"Waiting retry {i}/{max_retries}") @pytest.fixture(scope="session")
response = requests.get(BASE_URL, timeout=1) def test_users():
if response.status_code in [200, 302, 404]: # Server is responding """Define available test users"""
print(f"Server started successfully after {i + 1} attempts") return {
break "admin": TestUsers.ADMIN,
except requests.exceptions.RequestException: "regular_user": TestUsers.REGULAR_USER,
time.sleep(1) "invalid_cases": TestUsers.INVALID_CREDENTIALS,
else: "protected_urls": TestUsers.PROTECTED_URLS
# If we get here, the server didn't start in time }
print(f"Failed to start after {max_retries} attempts.")
server_process.kill()
stdout, stderr = server_process.communicate() @pytest.fixture(scope="session")
raise RuntimeError( def app_server(test_database, test_users):
f"Server failed to start within {max_retries} seconds.\n" """Start application server with test database"""
f"STDOUT: {stdout.decode()}\n" # Use the same Python executable that's running pytest
f"STDERR: {stderr.decode()}" python_executable = sys.executable
# Get the absolute path to the src directory
project_root = Path(__file__).parent.parent
src_path = project_root / "src"
# Create test environment
test_env = os.environ.copy()
test_env["DB_PATH"] = test_database
test_env["ADMIN_EMAIL"] = test_users["admin"]["email"]
test_env["ADMIN_PASSWORD"] = test_users["admin"]["password"]
test_env["SECRET_KEY"] = "test-secret-key-for-e2e-tests"
# Start the application server
print(f"Starting server on url {BASE_URL} with test database...")
port = BASE_URL.split(':')[-1].split('/')[0] if ':' in BASE_URL else APP_PORT
print(f"Using port {port}")
print(f"Test DB path: {test_database}")
server_process = subprocess.Popen(
[python_executable, "main.py", "--port", "5002"],
cwd=str(src_path), # Change to src directory where main.py is located
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=test_env # Use test environment
) )
# Yield control to the tests # Wait for the server to start
print('Server started !') max_retries = 10 # Wait up to 30 seconds
yield server_process for i in range(max_retries):
try:
# Cleanup: terminate the server after tests print(f"Waiting retry {i}/{max_retries}")
server_process.terminate() response = requests.get(BASE_URL, timeout=1)
try: if response.status_code in [200, 302, 404]: # Server is responding
server_process.wait(timeout=5) print(f"Server started successfully after {i + 1} attempts")
print('Server stopped.') break
except subprocess.TimeoutExpired: except requests.exceptions.RequestException:
server_process.kill() time.sleep(1)
server_process.wait() else:
print('Server killed !') # If we get here, the server didn't start in time
print(f"Failed to start after {max_retries} attempts.")
server_process.kill()
stdout, stderr = server_process.communicate()
raise RuntimeError(
f"Server failed to start within {max_retries} seconds.\n"
f"STDOUT: {stdout.decode()}\n"
f"STDERR: {stderr.decode()}"
)
# Yield control to the tests
print('Test server started with isolated database!')
yield server_process
# Cleanup: terminate the server after tests
server_process.terminate()
try:
server_process.wait(timeout=5)
print('Test server stopped.')
except subprocess.TimeoutExpired:
server_process.kill()
server_process.wait()
print('Test server killed!')
@pytest.fixture(scope="session") @pytest.fixture(scope="session")

0
tests/fixtures/__init__.py vendored Normal file
View File

0
tests/fixtures/app_factory.py vendored Normal file
View File

56
tests/fixtures/test_database.py vendored Normal file
View File

@@ -0,0 +1,56 @@
import tempfile
import os
import shutil
from pathlib import Path
class TestDatabaseManager:
"""Manager for temporary test databases"""
@staticmethod
def create_temp_db_path():
"""Create a unique temporary path for test database"""
temp_dir = tempfile.mkdtemp(prefix="test_mmt_")
return os.path.join(temp_dir, "test_tools.db")
@staticmethod
def setup_test_environment(db_path):
"""Configure environment to use test database"""
os.environ["DB_PATH"] = db_path
os.environ["ADMIN_EMAIL"] = "test.admin@test.com"
os.environ["ADMIN_PASSWORD"] = "TestAdmin123"
os.environ["SECRET_KEY"] = "test-secret-key-for-e2e-tests"
@staticmethod
def inject_test_database(db_path):
"""Inject test database instance into the application"""
# Import here to avoid circular imports
from src.core.user_database import Database, set_user_db
# Create test database instance
test_db = Database(db_path)
# Set it as the active instance
set_user_db(test_db)
return test_db
@staticmethod
def cleanup_test_db(db_path):
"""Clean up temporary database"""
if os.path.exists(db_path):
# Clean up the complete temporary directory
temp_dir = os.path.dirname(db_path)
if os.path.exists(temp_dir):
shutil.rmtree(temp_dir, ignore_errors=True)
@staticmethod
def restore_original_environment(original_env):
"""Restore original environment"""
for key, value in original_env.items():
if value is None:
# Variable didn't exist before
if key in os.environ:
del os.environ[key]
else:
os.environ[key] = value

61
tests/fixtures/test_users.py vendored Normal file
View File

@@ -0,0 +1,61 @@
"""Test user definitions for E2E authentication testing"""
class TestUsers:
"""Class containing test user data"""
# Admin user (automatically created on startup)
ADMIN = {
"email": "test.admin@test.com",
"password": "TestAdmin123"
}
# Regular user to create for certain tests
REGULAR_USER = {
"username": "testuser",
"email": "user@test.com",
"password": "TestUser123"
}
# Failed authentication test cases
INVALID_CREDENTIALS = [
{
"name": "wrong_password",
"email": "test.admin@test.com",
"password": "wrongpass",
"expected_error": "Invalid email or password"
},
{
"name": "nonexistent_user",
"email": "nonexistent@test.com",
"password": "TestAdmin123",
"expected_error": "Invalid email or password"
},
{
"name": "invalid_email_format",
"email": "invalid.email",
"password": "TestAdmin123",
"expected_error": "Invalid email or password"
},
{
"name": "empty_email",
"email": "",
"password": "TestAdmin123",
"expected_error": "Email and password are required"
},
{
"name": "empty_password",
"email": "test.admin@test.com",
"password": "",
"expected_error": "Email and password are required"
}
]
# Protected URLs to test for unauthorized access
PROTECTED_URLS = [
"/",
"/admin",
"/repositories",
"/workflows",
"/applications"
]

View File

@@ -52,7 +52,7 @@ class TestAuthentication:
assert has_login_content, "Page does not contain login-related content" assert has_login_content, "Page does not contain login-related content"
# Option 4: Ensure we're not on a protected page # Option 4: Ensure we're not on a protected page
# Check that we don't see protected content like "dashboard", "logout", etc. # Check that we don't see protected content like "dashboard", "logout", "profile", "settings"]
protected_content = ["dashboard", "logout", "profile", "settings"] protected_content = ["dashboard", "logout", "profile", "settings"]
page_text = page.locator("body").inner_text().lower() page_text = page.locator("body").inner_text().lower()
@@ -61,7 +61,7 @@ class TestAuthentication:
@pytest.mark.e2e @pytest.mark.e2e
@pytest.mark.regression @pytest.mark.regression
def test_login_page_has_required_elements(self, page: Page): def test_login_page_has_required_elements(self, app_server, page: Page):
"""Test that the login page contains all required elements""" """Test that the login page contains all required elements"""
page.goto(BASE_URL) page.goto(BASE_URL)
page.wait_for_load_state("networkidle") page.wait_for_load_state("networkidle")
@@ -81,7 +81,7 @@ class TestAuthentication:
expect(submit_button.first).to_be_visible() expect(submit_button.first).to_be_visible()
@pytest.mark.e2e @pytest.mark.e2e
def test_page_loads_without_errors(self, page: Page): def test_page_loads_without_errors(self, app_server, page: Page):
"""Test that the login page loads without console errors""" """Test that the login page loads without console errors"""
console_errors = [] console_errors = []
@@ -101,3 +101,23 @@ class TestAuthentication:
# Check that the page has a title # Check that the page has a title
expect(page).to_have_title("My Managing Tools") expect(page).to_have_title("My Managing Tools")
# New test to validate database isolation
@pytest.mark.e2e
@pytest.mark.smoke
def test_test_database_isolation(self, app_server, test_database, test_users):
"""Test that application uses isolated test database"""
import os
# Verify test environment variables are configured
assert os.environ.get("DB_PATH") == test_database
assert os.environ.get("ADMIN_EMAIL") == test_users["admin"]["email"]
assert os.environ.get("ADMIN_PASSWORD") == test_users["admin"]["password"]
# Verify temporary database file exists
assert os.path.exists(test_database), f"Test database file not found: {test_database}"
# Verify path contains 'test_mmt_' to confirm isolation
assert "test_mmt_" in test_database, "Database path should contain test prefix"
print(f"✅ Test database isolation confirmed: {test_database}")