SettingUpClientServer (#7)

Reviewed-on: #7
This commit is contained in:
2023-01-19 16:08:23 +00:00
parent 95459c5b02
commit 21a397861a
19 changed files with 2480 additions and 2 deletions
+62
View File
@@ -0,0 +1,62 @@
import logging
from multiprocessing import Process
from time import sleep
import uvicorn
from fastapi import FastAPI
class MockServer:
""" Core application to test. """
def __init__(self, endpoints: list[dict]):
"""
:param endpoints:
:type endpoints: list of {path: '', response:''}
"""
self.api = FastAPI()
def raise_exception(ex):
raise ex
# register endpoints
for endpoint in endpoints:
method = endpoint["method"] if "method" in endpoint else "get"
if method == "post":
if "exception" in endpoint:
self.api.post(endpoint["path"])(lambda: raise_exception(endpoint["exception"]))
else:
self.api.post(endpoint["path"])(lambda: endpoint["response"])
else:
self.api.get(endpoint["path"])(lambda: endpoint["response"])
# register shutdown
self.api.on_event("shutdown")(self.close)
# create the process
self.proc = Process(target=uvicorn.run,
args=(self.api,),
kwargs={
"host": "127.0.0.1",
"port": 5000,
"log_level": "info"},
daemon=True)
async def close(self):
""" Gracefull shutdown. """
logging.warning("Shutting down the app.")
def start_server(self):
self.proc.start()
sleep(0.1)
def stop_server(self):
self.proc.terminate()
def __enter__(self):
self.start_server()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop_server()