2 Commits

17 changed files with 550 additions and 34 deletions

1
.gitignore vendored
View File

@@ -8,6 +8,7 @@ htmlcov
.venv
tests/settings_from_unit_testing.json
tests/TestDBEngineRoot
test-results
.sesskey
tools.db
.mytools_db

View File

@@ -1,6 +0,0 @@
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="PyInitNewSignatureInspection" enabled="false" level="WARNING" enabled_by_default="false" />
</profile>
</component>

View File

@@ -1,4 +1,5 @@
.PHONY: test
.PHONY: test install-playwright install-playwright-deps test-e2e test-e2e-headed test-regression
test:
pytest
@@ -7,6 +8,31 @@ coverage:
coverage run --source=src -m pytest
coverage html
install-playwright:
@echo "Installing Playwright and pytest-playwright via pip..."
pip install playwright pytest-playwright
@echo "Installing Playwright browsers..."
playwright install
@echo "Installing system dependencies for Ubuntu/WSL2..."
playwright install-deps
@echo "Playwright installation complete!"
install-playwright-deps:
@echo "Installing system dependencies for Playwright on Ubuntu/WSL2..."
playwright install-deps
test-e2e:
pytest tests/test_e2e_regression.py -m "e2e" --browser chromium
test-e2e-headed:
pytest -m "e2e" --browser chromium --headed
test-regression:
pytest tests/test_e2e_regression.py -m "regression" --browser chromium
test-all-browsers:
pytest tests/test_e2e_regression.py -m "e2e" --browser chromium --browser firefox --browser webkit
clean:
rm -rf build
rm -rf htmlcov

View File

@@ -41,4 +41,18 @@ docker-compose build
cd src
python -m cProfile -o profile.out main.py
snakeviz profile.out # 'pip install snakeviz' if snakeviz is not installed
```
```
# End to end testing
```shell
make install-playwright
```
Alternatively, you can install Playwright and pytest-playwright via pip:
```shell
pip install playwright pytest-playwright
playwright install
playwright install-deps # may be required on Linux
playwright --version
```

11
pytest.ini Normal file
View File

@@ -0,0 +1,11 @@
[pytest]
python_files = test_*.py
python_classes = Test*
python_functions = test_*
testpaths = tests
pythonpath = src tests
addopts = --tb=short -v
markers =
e2e: marks tests as end-to-end tests
smoke: marks tests as smoke tests
regression: marks tests as regression tests

View File

@@ -10,6 +10,8 @@ click==8.1.7
et-xmlfile==1.1.0
fastcore==1.8.5
fastlite==0.2.1
gprof2dot==2025.4.14
greenlet==3.2.4
h11==0.14.0
httpcore==1.0.5
httptools==0.6.1
@@ -26,28 +28,37 @@ oauthlib==3.2.2
openpyxl==3.1.5
packaging==24.1
pandas==2.2.3
pandas-stubs==2.3.2.250827
playwright==1.55.0
pluggy==1.5.0
pydantic==2.11.5
pydantic-settings==2.9.1
pydantic_core==2.33.2
pyee==13.0.0
Pygments==2.19.1
pytest==8.3.3
pytest-base-url==2.1.0
pytest-mock==3.14.1
pytest-playwright==0.7.0
python-dateutil==2.9.0.post0
python-dotenv==1.0.1
python-fasthtml==0.12.21
python-multipart==0.0.10
python-slugify==8.0.4
pytz==2024.2
PyYAML==6.0.2
requests==2.32.3
rich==14.0.0
shellingham==1.5.4
six==1.16.0
snakeviz==2.2.2
sniffio==1.3.1
soupsieve==2.6
sqlite-minutils==3.37.0.post3
sse-starlette==2.3.6
starlette==0.38.5
text-unidecode==1.3
tornado==6.5.2
typer==0.16.0
typing-inspection==0.4.1
typing_extensions==4.13.2

View File

@@ -0,0 +1,18 @@
"""Database manager for application initialization."""
import logging
from .user_database import Database, set_user_db
logger = logging.getLogger(__name__)
def initialize_database():
"""Initialize the application database."""
try:
# Create default database instance
db_instance = Database()
set_user_db(db_instance)
logger.info("Database initialized successfully")
return db_instance
except Exception as e:
logger.error(f"Failed to initialize database: {e}")
raise

View File

@@ -3,15 +3,17 @@ import secrets
from datetime import datetime
from typing import Any
from .user_database import user_db
from .user_database import get_user_db
logger = logging.getLogger(__name__)
class UserDAO:
"""Data Access Object for user management."""
@staticmethod
def create_user(username: str, email: str, password: str | None = None, github_id: str | None = None) -> int:
def create_user(username: str, email: str, password: str | None = None, github_id: str | None = None,
db_instance=None) -> int:
"""
Create a new user with email/password or GitHub authentication.
@@ -20,10 +22,12 @@ class UserDAO:
email: The user's email
password: The user's password (optional)
github_id: GitHub user ID (optional)
db_instance: Database instance (optional, uses default if None)
Returns:
int: ID of the new user or 0 if creation failed
"""
user_db = db_instance or get_user_db()
try:
with user_db.get_connection() as conn:
cursor = conn.cursor()
@@ -48,9 +52,9 @@ class UserDAO:
password_hash = user_db._hash_password(password, salt)
cursor.execute('''
INSERT INTO users (username, email, password_hash, salt, github_id, is_admin)
VALUES (?, ?, ?, ?, ?, 0)
''', (username, email, password_hash, salt, github_id))
INSERT INTO users (username, email, password_hash, salt, github_id, is_admin)
VALUES (?, ?, ?, ?, ?, 0)
''', (username, email, password_hash, salt, github_id))
conn.commit()
return cursor.lastrowid
@@ -59,17 +63,19 @@ class UserDAO:
return 0
@staticmethod
def authenticate_email(email: str, password: str) -> dict[str, Any] | None:
def authenticate_email(email: str, password: str, db_instance=None) -> dict[str, Any] | None:
"""
Authenticate a user with email and password.
Args:
email: The user's email
password: The user's password
db_instance: Database instance (optional, uses default if None)
Returns:
Dict or None: User record if authentication succeeds, None otherwise
"""
user_db = db_instance or get_user_db()
with user_db.get_connection() as conn:
cursor = conn.cursor()
@@ -101,7 +107,8 @@ class UserDAO:
return dict(user)
@staticmethod
def find_or_create_github_user(github_id: str, username: str, email: str | None) -> dict[str, Any] | None:
def find_or_create_github_user(github_id: str, username: str, email: str | None, db_instance=None) -> dict[
str, Any] | None:
"""
Find existing GitHub user or create a new one.
@@ -109,10 +116,12 @@ class UserDAO:
github_id: GitHub user ID
username: The username from GitHub
email: The email from GitHub (may be None)
db_instance: Database instance (optional, uses default if None)
Returns:
Dict or None: User record if found or created, None on error
"""
user_db = db_instance or get_user_db()
with user_db.get_connection() as conn:
cursor = conn.cursor()
@@ -138,9 +147,9 @@ class UserDAO:
try:
cursor.execute('''
INSERT INTO users (username, email, github_id, is_admin)
VALUES (?, ?, ?, 0)
''', (username, user_email, github_id))
INSERT INTO users (username, email, github_id, is_admin)
VALUES (?, ?, ?, 0)
''', (username, user_email, github_id))
user_id = cursor.lastrowid
conn.commit()
@@ -157,16 +166,18 @@ class UserDAO:
return None
@staticmethod
def get_user_by_id(user_id: int) -> dict[str, Any] | None:
def get_user_by_id(user_id: int, db_instance=None) -> dict[str, Any] | None:
"""
Get a user by ID.
Args:
user_id: The user ID
db_instance: Database instance (optional, uses default if None)
Returns:
Dict or None: User record if found, None otherwise
"""
user_db = db_instance or get_user_db()
with user_db.get_connection() as conn:
cursor = conn.cursor()
@@ -179,42 +190,51 @@ class UserDAO:
return dict(user) if user else None
@staticmethod
def get_all_users(limit: int = 100, offset: int = 0) -> list[dict[str, Any]]:
def get_all_users(limit: int = 100, offset: int = 0, db_instance=None) -> list[dict[str, Any]]:
"""
Get all users with pagination.
Args:
limit: Maximum number of users to return
offset: Number of users to skip
db_instance: Database instance (optional, uses default if None)
Returns:
List of user records
"""
user_db = db_instance or get_user_db()
with user_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT id, username, email, is_admin, created_at, last_login,
(github_id IS NOT NULL) as is_github_user
FROM users
ORDER BY created_at DESC
LIMIT ? OFFSET ?
''', (limit, offset))
SELECT id,
username,
email,
is_admin,
created_at,
last_login,
(github_id IS NOT NULL) as is_github_user
FROM users
ORDER BY created_at DESC LIMIT ?
OFFSET ?
''', (limit, offset))
return [dict(user) for user in cursor.fetchall()]
@staticmethod
def set_admin_status(user_id: int, is_admin: bool) -> bool:
def set_admin_status(user_id: int, is_admin: bool, db_instance=None) -> bool:
"""
Change a user's admin status.
Args:
user_id: The user ID
is_admin: True to make admin, False to remove admin status
db_instance: Database instance (optional, uses default if None)
Returns:
bool: True if successful, False otherwise
"""
user_db = db_instance or get_user_db()
try:
with user_db.get_connection() as conn:
cursor = conn.cursor()
@@ -231,16 +251,18 @@ class UserDAO:
return False
@staticmethod
def delete_user(user_id: int) -> bool:
def delete_user(user_id: int, db_instance=None) -> bool:
"""
Delete a user and all their data.
Args:
user_id: The user ID
db_instance: Database instance (optional, uses default if None)
Returns:
bool: True if successful, False otherwise
"""
user_db = db_instance or get_user_db()
try:
with user_db.get_connection() as conn:
cursor = conn.cursor()

View File

@@ -111,5 +111,18 @@ class Database:
finally:
conn.close()
# Create a singleton instance
user_db = Database()
# Global instance for backward compatibility (will be modified in tests)
_default_instance = None
def get_user_db():
"""Get the default database instance."""
global _default_instance
if _default_instance is None:
_default_instance = Database()
return _default_instance
def set_user_db(instance):
"""Set a custom database instance (for testing)."""
global _default_instance
_default_instance = instance

View File

@@ -1,6 +1,7 @@
# global layout
import logging.config
import click
import yaml
from fasthtml.common import *
@@ -20,6 +21,7 @@ from components.register.constants import Routes as RegisterRoutes
from config import APP_PORT
from constants import Routes
from core.dbengine import DbException
from core.database_manager import initialize_database
from core.instance_manager import InstanceManager
from core.settings_management import SettingsManager
from pages.admin_import_settings import AdminImportSettings, IMPORT_SETTINGS_PATH, import_settings_app
@@ -38,6 +40,9 @@ logging.config.dictConfig(config)
logger = logging.getLogger("MainApp")
# Initialize database
initialize_database()
# daisy_ui_links_v4 = (
# Link(href="https://cdn.jsdelivr.net/npm/daisyui@latest/dist/full.min.css", rel="stylesheet", type="text/css"),
# Script(src="https://cdn.tailwindcss.com"),
@@ -306,10 +311,11 @@ def not_found(path: str, session=None):
setup_toasts(app)
def main():
logger.info(f" Starting FastHTML server on http://localhost:{APP_PORT}")
serve(port=APP_PORT)
@click.command()
@click.option('--port', default=APP_PORT, help='Port to run the server on')
def main(port):
logger.info(f" Starting FastHTML server on http://localhost:{port}")
serve(port=port)
if __name__ == "__main__":
# Start your application

View File

@@ -1,12 +1,144 @@
from io import BytesIO
import subprocess
import pandas as pd
import pytest
import requests
import time
import sys
import os
from pathlib import Path
from components.datagrid.DataGrid import reset_instances
from playwright_config import BROWSER_CONFIG, BASE_URL
from tests.fixtures.test_database import TestDatabaseManager
from tests.fixtures.test_users import TestUsers
USER_EMAIL = "test@mail.com"
USER_ID = "test_user"
APP_PORT = 5002
@pytest.fixture(scope="session")
def test_database():
"""Configure temporary database for tests"""
# Create temporary DB
temp_db_path = TestDatabaseManager.create_temp_db_path()
# Save original environment
original_env = {
"DB_PATH": os.environ.get("DB_PATH"),
"ADMIN_EMAIL": os.environ.get("ADMIN_EMAIL"),
"ADMIN_PASSWORD": os.environ.get("ADMIN_PASSWORD"),
"SECRET_KEY": os.environ.get("SECRET_KEY")
}
# Configure test environment
TestDatabaseManager.setup_test_environment(temp_db_path)
print(f"Test database created at: {temp_db_path}")
yield temp_db_path
# Cleanup: restore environment and clean up DB
TestDatabaseManager.restore_original_environment(original_env)
TestDatabaseManager.cleanup_test_db(temp_db_path)
print(f"Test database cleaned up: {temp_db_path}")
@pytest.fixture(scope="session")
def test_users():
"""Define available test users"""
return {
"admin": TestUsers.ADMIN,
"regular_user": TestUsers.REGULAR_USER,
"invalid_cases": TestUsers.INVALID_CREDENTIALS,
"protected_urls": TestUsers.PROTECTED_URLS
}
@pytest.fixture(scope="session")
def app_server(test_database, test_users):
"""Start application server with test database"""
# Use the same Python executable that's running pytest
python_executable = sys.executable
# Get the absolute path to the src directory
project_root = Path(__file__).parent.parent
src_path = project_root / "src"
# Create test environment
test_env = os.environ.copy()
test_env["DB_PATH"] = test_database
test_env["ADMIN_EMAIL"] = test_users["admin"]["email"]
test_env["ADMIN_PASSWORD"] = test_users["admin"]["password"]
test_env["SECRET_KEY"] = "test-secret-key-for-e2e-tests"
# Start the application server
print(f"Starting server on url {BASE_URL} with test database...")
port = BASE_URL.split(':')[-1].split('/')[0] if ':' in BASE_URL else APP_PORT
print(f"Using port {port}")
print(f"Test DB path: {test_database}")
server_process = subprocess.Popen(
[python_executable, "main.py", "--port", "5002"],
cwd=str(src_path), # Change to src directory where main.py is located
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=test_env # Use test environment
)
# Wait for the server to start
max_retries = 10 # Wait up to 30 seconds
for i in range(max_retries):
try:
print(f"Waiting retry {i}/{max_retries}")
response = requests.get(BASE_URL, timeout=1)
if response.status_code in [200, 302, 404]: # Server is responding
print(f"Server started successfully after {i + 1} attempts")
break
except requests.exceptions.RequestException:
time.sleep(1)
else:
# If we get here, the server didn't start in time
print(f"Failed to start after {max_retries} attempts.")
server_process.kill()
stdout, stderr = server_process.communicate()
raise RuntimeError(
f"Server failed to start within {max_retries} seconds.\n"
f"STDOUT: {stdout.decode()}\n"
f"STDERR: {stderr.decode()}"
)
# Yield control to the tests
print('Test server started with isolated database!')
yield server_process
# Cleanup: terminate the server after tests
server_process.terminate()
try:
server_process.wait(timeout=5)
print('Test server stopped.')
except subprocess.TimeoutExpired:
server_process.kill()
server_process.wait()
print('Test server killed!')
@pytest.fixture(scope="session")
def browser_context_args(browser_context_args):
"""Configure browser context arguments"""
return {
**browser_context_args,
**BROWSER_CONFIG,
"record_video_dir": "test-results/videos/",
"record_har_path": "test-results/har/trace.har",
}
@pytest.fixture(scope="session")
def app_url():
"""Base URL for the application"""
return BASE_URL
@pytest.fixture

0
tests/fixtures/__init__.py vendored Normal file
View File

0
tests/fixtures/app_factory.py vendored Normal file
View File

56
tests/fixtures/test_database.py vendored Normal file
View File

@@ -0,0 +1,56 @@
import tempfile
import os
import shutil
from pathlib import Path
class TestDatabaseManager:
"""Manager for temporary test databases"""
@staticmethod
def create_temp_db_path():
"""Create a unique temporary path for test database"""
temp_dir = tempfile.mkdtemp(prefix="test_mmt_")
return os.path.join(temp_dir, "test_tools.db")
@staticmethod
def setup_test_environment(db_path):
"""Configure environment to use test database"""
os.environ["DB_PATH"] = db_path
os.environ["ADMIN_EMAIL"] = "test.admin@test.com"
os.environ["ADMIN_PASSWORD"] = "TestAdmin123"
os.environ["SECRET_KEY"] = "test-secret-key-for-e2e-tests"
@staticmethod
def inject_test_database(db_path):
"""Inject test database instance into the application"""
# Import here to avoid circular imports
from src.core.user_database import Database, set_user_db
# Create test database instance
test_db = Database(db_path)
# Set it as the active instance
set_user_db(test_db)
return test_db
@staticmethod
def cleanup_test_db(db_path):
"""Clean up temporary database"""
if os.path.exists(db_path):
# Clean up the complete temporary directory
temp_dir = os.path.dirname(db_path)
if os.path.exists(temp_dir):
shutil.rmtree(temp_dir, ignore_errors=True)
@staticmethod
def restore_original_environment(original_env):
"""Restore original environment"""
for key, value in original_env.items():
if value is None:
# Variable didn't exist before
if key in os.environ:
del os.environ[key]
else:
os.environ[key] = value

61
tests/fixtures/test_users.py vendored Normal file
View File

@@ -0,0 +1,61 @@
"""Test user definitions for E2E authentication testing"""
class TestUsers:
"""Class containing test user data"""
# Admin user (automatically created on startup)
ADMIN = {
"email": "test.admin@test.com",
"password": "TestAdmin123"
}
# Regular user to create for certain tests
REGULAR_USER = {
"username": "testuser",
"email": "user@test.com",
"password": "TestUser123"
}
# Failed authentication test cases
INVALID_CREDENTIALS = [
{
"name": "wrong_password",
"email": "test.admin@test.com",
"password": "wrongpass",
"expected_error": "Invalid email or password"
},
{
"name": "nonexistent_user",
"email": "nonexistent@test.com",
"password": "TestAdmin123",
"expected_error": "Invalid email or password"
},
{
"name": "invalid_email_format",
"email": "invalid.email",
"password": "TestAdmin123",
"expected_error": "Invalid email or password"
},
{
"name": "empty_email",
"email": "",
"password": "TestAdmin123",
"expected_error": "Email and password are required"
},
{
"name": "empty_password",
"email": "test.admin@test.com",
"password": "",
"expected_error": "Email and password are required"
}
]
# Protected URLs to test for unauthorized access
PROTECTED_URLS = [
"/",
"/admin",
"/repositories",
"/workflows",
"/applications"
]

View File

@@ -0,0 +1,28 @@
import os
from playwright.sync_api import Playwright
# Playwright configuration
BASE_URL = os.getenv("APP_URL", "http://localhost:5002")
TIMEOUT = 30000
EXPECT_TIMEOUT = 5000
def pytest_configure():
"""Configure pytest for Playwright"""
pass
# Browser configuration
BROWSER_CONFIG = {
#"headless": os.getenv("HEADLESS", "true").lower() == "true",
"viewport": {"width": 1280, "height": 720},
"ignore_https_errors": True,
}
# Test configuration
TEST_CONFIG = {
"base_url": BASE_URL,
"timeout": TIMEOUT,
"expect_timeout": EXPECT_TIMEOUT,
"screenshot": "only-on-failure",
"video": "retain-on-failure",
"trace": "retain-on-failure",
}

View File

@@ -0,0 +1,123 @@
# tests/test_e2e_authentication.py
import pytest
from playwright.sync_api import Page, expect
from playwright_config import BASE_URL
class TestAuthentication:
"""Tests for authentication and login functionality"""
@pytest.mark.e2e
@pytest.mark.smoke
def test_unauthenticated_user_redirected_to_login(self, app_server, page: Page):
"""Test that when not logged in, the default page is the login page"""
# Navigate to the root URL
page.goto(BASE_URL)
# Wait for the page to fully load
page.wait_for_load_state("networkidle")
# Check that we're on the login page
# Option 1: Check URL contains login-related path
expect(page).to_have_url(f"{BASE_URL}/authlogin/login")
# Option 2: Check for login form elements
# Look for typical login form elements
login_indicators = [
"input[type='email']",
"input[type='password']",
"input[name*='username']",
"input[name*='email']",
"input[name*='password']",
"button[type='submit']",
"form"
]
# At least one login indicator should be present
login_form_found = False
for selector in login_indicators:
if page.locator(selector).count() > 0:
login_form_found = True
break
assert login_form_found, "No login form elements found on the page"
# Option 3: Check for login-related text content
page_content = page.content().lower()
login_keywords = ["login", "sign in", "authenticate", "username", "password", "email"]
has_login_content = any(keyword in page_content for keyword in login_keywords)
assert has_login_content, "Page does not contain login-related content"
# Option 4: Ensure we're not on a protected page
# Check that we don't see protected content like "dashboard", "logout", "profile", "settings"]
protected_content = ["dashboard", "logout", "profile", "settings"]
page_text = page.locator("body").inner_text().lower()
for protected_word in protected_content:
assert protected_word not in page_text, f"Found protected content '{protected_word}' when not logged in"
@pytest.mark.e2e
@pytest.mark.regression
def test_login_page_has_required_elements(self, app_server, page: Page):
"""Test that the login page contains all required elements"""
page.goto(BASE_URL)
page.wait_for_load_state("networkidle")
# Check for essential login form elements
expect(page.locator("form")).to_be_visible()
# Check for input fields (at least email/username and password)
username_input = page.locator("input[type='email'], input[name*='username'], input[name*='email']")
expect(username_input.first).to_be_visible()
password_input = page.locator("input[type='password'], input[name*='password']")
expect(password_input.first).to_be_visible()
# Check for submit button
submit_button = page.locator("button[type='submit'], input[type='submit']")
expect(submit_button.first).to_be_visible()
@pytest.mark.e2e
def test_page_loads_without_errors(self, app_server, page: Page):
"""Test that the login page loads without console errors"""
console_errors = []
def handle_console(msg):
if msg.type == "error":
console_errors.append(msg.text)
page.on("console", handle_console)
page.goto(BASE_URL)
page.wait_for_load_state("networkidle")
# Check for console errors
assert len(console_errors) == 0, f"Console errors found: {console_errors}"
# Check that the page actually loaded (not a 404 or 500 error)
expect(page.locator("body")).to_be_visible()
# Check that the page has a title
expect(page).to_have_title("My Managing Tools")
# New test to validate database isolation
@pytest.mark.e2e
@pytest.mark.smoke
def test_test_database_isolation(self, app_server, test_database, test_users):
"""Test that application uses isolated test database"""
import os
# Verify test environment variables are configured
assert os.environ.get("DB_PATH") == test_database
assert os.environ.get("ADMIN_EMAIL") == test_users["admin"]["email"]
assert os.environ.get("ADMIN_PASSWORD") == test_users["admin"]["password"]
# Verify temporary database file exists
assert os.path.exists(test_database), f"Test database file not found: {test_database}"
# Verify path contains 'test_mmt_' to confirm isolation
assert "test_mmt_" in test_database, "Database path should contain test prefix"
print(f"✅ Test database isolation confirmed: {test_database}")