import os
import shutil
import subprocess
import sys
import tempfile
import time
from io import BytesIO
from pathlib import Path
from urllib.parse import urlparse

import pandas as pd
import pytest
import requests

from components.datagrid.DataGrid import reset_instances
from playwright_config import BROWSER_CONFIG, BASE_URL
from tests.fixtures.test_database import TestDatabaseManager
from tests.fixtures.test_users import TestUsers

USER_EMAIL = "test@mail.com"
USER_ID = "test_user"
APP_PORT = 5002


@pytest.fixture(scope="session")
def test_database():
    """Configure a temporary database for the test session.

    Yields the path returned by ``TestDatabaseManager.create_temp_db_path()``.
    On teardown the saved environment variables are handed back to
    ``TestDatabaseManager.restore_original_environment`` and the temporary
    database is passed to ``TestDatabaseManager.cleanup_test_db``.
    """
    # Create temporary DB
    temp_db_path = TestDatabaseManager.create_temp_db_path()

    # Save original environment (values are None for variables that are unset)
    original_env = {
        "DB_PATH": os.environ.get("DB_PATH"),
        "ADMIN_EMAIL": os.environ.get("ADMIN_EMAIL"),
        "ADMIN_PASSWORD": os.environ.get("ADMIN_PASSWORD"),
        "SECRET_KEY": os.environ.get("SECRET_KEY")
    }

    # Configure test environment
    TestDatabaseManager.setup_test_environment(temp_db_path)
    print(f"Test database created at: {temp_db_path}")

    yield temp_db_path

    # Cleanup: restore environment and clean up DB
    TestDatabaseManager.restore_original_environment(original_env)
    TestDatabaseManager.cleanup_test_db(temp_db_path)
    print(f"Test database cleaned up: {temp_db_path}")


@pytest.fixture(scope="session")
def test_users():
    """Return the test user definitions exposed by ``TestUsers``, keyed by role."""
    return {
        "admin": TestUsers.ADMIN,
        "regular_user": TestUsers.REGULAR_USER,
        "invalid_cases": TestUsers.INVALID_CREDENTIALS,
        "protected_urls": TestUsers.PROTECTED_URLS
    }


@pytest.fixture(scope="session")
def test_setting_manager():
    """Yield a dedicated temporary folder, removed again at session teardown.

    ``app_server`` exports this folder to the server as ``DBENGINE_PATH``.
    """
    # create a dedicated folder for the db
    db_engine_folder = tempfile.mkdtemp(prefix="test_db_engine")

    yield db_engine_folder

    # clean up the folder (best effort: removal errors are ignored)
    if os.path.exists(db_engine_folder):
        shutil.rmtree(db_engine_folder, ignore_errors=True)


@pytest.fixture(scope="session")
def app_server(test_database, test_users, test_setting_manager):
    """Start the application server against the test database.

    Launches ``main.py`` from the ``src`` directory as a subprocess, polls
    ``BASE_URL`` until the server answers, yields the ``subprocess.Popen``
    handle to the tests and stops the process at session teardown.

    Raises:
        RuntimeError: if the server process exits during startup or does not
            answer on ``BASE_URL`` within the retry budget. The captured
            stdout/stderr of the process are included in the message.
    """
    # Use the same Python executable that's running pytest
    python_executable = sys.executable

    # Get the absolute path to the src directory
    project_root = Path(__file__).parent.parent
    src_path = project_root / "src"

    # Create test environment
    test_env = os.environ.copy()
    test_env["DB_PATH"] = test_database
    test_env["DBENGINE_PATH"] = test_setting_manager
    test_env["ADMIN_EMAIL"] = test_users["admin"]["email"]
    test_env["ADMIN_PASSWORD"] = test_users["admin"]["password"]
    test_env["SECRET_KEY"] = "test-secret-key-for-e2e-tests"

    # Start the application server
    print(f"Starting server on url {BASE_URL} with test database...")
    # Take the port from BASE_URL so the server listens where it is polled;
    # fall back to APP_PORT when the URL carries no explicit port.
    port = urlparse(BASE_URL).port or APP_PORT
    print(f"Using port {port}")
    print(f"Test DB path: {test_database}")

    # NOTE(review): stdout/stderr are piped but never drained while the tests
    # run; a very chatty server could fill the OS pipe buffer and block.
    # Consider redirecting to files if that ever shows up.
    server_process = subprocess.Popen(
        [python_executable, "main.py", "--port", str(port)],
        cwd=str(src_path),  # Change to src directory where main.py is located
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=test_env  # Use test environment
    )

    # Wait for the server to start. Each attempt costs at most ~2 seconds
    # (1 second request timeout plus 1 second pause before the next try).
    max_retries = 10
    started = False
    for i in range(max_retries):
        # Fail fast when the server process died instead of polling a corpse.
        if server_process.poll() is not None:
            print(f"Server process exited early with code {server_process.returncode}.")
            break

        print(f"Waiting retry {i}/{max_retries}")
        try:
            response = requests.get(BASE_URL, timeout=1)
        except requests.exceptions.RequestException:
            response = None

        if response is not None and response.status_code in (200, 302, 404):
            # Server is responding
            print(f"Server started successfully after {i + 1} attempts")
            started = True
            break

        # Pause after every failed attempt, including unexpected status
        # codes, so the retry budget is not burned in a busy loop.
        time.sleep(1)

    if not started:
        # If we get here, the server didn't start in time
        print(f"Failed to start after {max_retries} attempts.")
        if server_process.poll() is None:
            server_process.kill()
        stdout, stderr = server_process.communicate()
        raise RuntimeError(
            f"Server failed to start within {max_retries} seconds.\n"
            f"STDOUT: {stdout.decode(errors='replace')}\n"
            f"STDERR: {stderr.decode(errors='replace')}"
        )

    # Yield control to the tests
    print('Test server started with isolated database!')
    yield server_process

    # Cleanup: terminate the server after tests, escalating to kill if the
    # process does not stop within 5 seconds.
    server_process.terminate()
    try:
        server_process.wait(timeout=5)
        print('Test server stopped.')
    except subprocess.TimeoutExpired:
        server_process.kill()
        server_process.wait()
        print('Test server killed!')


@pytest.fixture(scope="session")
def browser_context_args(browser_context_args):
    """Configure browser context arguments.

    Extends the incoming ``browser_context_args`` with ``BROWSER_CONFIG`` and
    the video/HAR recording settings; later entries override earlier ones.
    """
    return {
        **browser_context_args,
        **BROWSER_CONFIG,
        "record_video_dir": "test-results/videos/",
        "record_har_path": "test-results/har/trace.har",
        "record_video_size": {"width": 1280, "height": 720},
    }


@pytest.fixture(scope="session")
def app_url():
    """Base URL for the application"""
    return BASE_URL


@pytest.fixture
def excel_file_content():
    """Return the bytes of an in-memory Excel file with two text columns."""
    # Create a simple Excel file in memory
    df = pd.DataFrame({
        'Column 1': ['Aba', 'Johan', 'Kodjo'],
        'Column 2': ['Female', 'Male', 'Male']
    })
    excel_io = BytesIO()
    df.to_excel(excel_io, index=False)
    excel_io.seek(0)
    return excel_io.read()


@pytest.fixture
def excel_file_content_2():
    """Return the bytes of an in-memory Excel file with a text and an integer column."""
    # Create a simple Excel file in memory
    df = pd.DataFrame({
        'Column 1': ['C', 'A', 'B'],
        'Column 2': [1, 2, 3]
    })
    excel_io = BytesIO()
    df.to_excel(excel_io, index=False)
    excel_io.seek(0)
    return excel_io.read()


@pytest.fixture
def excel_file_content_with_sheet_name():
    """Return the bytes of an in-memory Excel file written to the sheet "sheet_name".

    Two of the column headers end with a trailing space.
    """
    # Create a DataFrame
    df = pd.DataFrame({
        'Column 1 ': ['Aba', 'Johan', 'Kodjo'],
        'Column 2': [False, True, True],
        'Column 3 ': [10, 20, 30],
    })

    # Create an in-memory bytes buffer
    excel_io = BytesIO()

    # Write the dataframe to the buffer with a custom sheet name
    df.to_excel(excel_io, index=False, sheet_name="sheet_name")

    # Move the pointer to the start of the stream
    excel_io.seek(0)

    return excel_io.read()  # Return the binary data


@pytest.fixture(autouse=True)
def reset_datagrid_instances():
    """Call ``reset_instances()`` before every test (autouse)."""
    reset_instances()


@pytest.fixture
def session():
    """Return a minimal session mapping for the default test user."""
    return {"user_id": USER_ID, "user_email": USER_EMAIL}