import logging from io import BytesIO import pandas as pd from fastapi import UploadFile from fasthtml.components import * from myfasthtml.controls.BaseCommands import BaseCommands from myfasthtml.controls.helpers import mk from myfasthtml.core.commands import Command from myfasthtml.core.dbmanager import DbObject from myfasthtml.core.instances import MultipleInstance logger = logging.getLogger("FileUpload") class FileUploadState(DbObject): def __init__(self, owner): super().__init__(owner) with self.initializing(): # persisted in DB # must not be persisted in DB (prefix ns_ = no_saving_) self.ns_file_name: str | None = None self.ns_sheets_names: list | None = None self.ns_selected_sheet_name: str | None = None self.ns_file_content: bytes | None = None class Commands(BaseCommands): def __init__(self, owner): super().__init__(owner) def upload_file(self): return Command("UploadFile", "Upload file", self._owner.upload_file).htmx(target=f"#sn_{self._id}") class FileUpload(MultipleInstance): """ Represents a file upload component. This class provides functionality to handle the uploading process of a file, extract sheet names from an Excel file, and enables users to select a specific sheet for further processing. It integrates commands and state management to ensure smooth operation within a parent application. """ def __init__(self, parent, _id=None, **kwargs): super().__init__(parent, _id=_id, **kwargs) self.commands = Commands(self) self._state = FileUploadState(self) def upload_file(self, file: UploadFile): logger.debug(f"upload_file: {file=}") if file: self._state.ns_file_content = file.file.read() self._state.ns_sheets_names = self.get_sheets_names(self._state.ns_file_content) self._state.ns_selected_sheet_name = self._state.ns_sheets_names[0] if len(self._state.ns_sheets_names) > 0 else 0 return self.mk_sheet_selector() def mk_sheet_selector(self): options = [Option("Choose a file...", selected=True, disabled=True)] if self._state.ns_sheets_names is None else \ [Option( name, selected=True if name == self._state.ns_selected_sheet_name else None, ) for name in self._state.ns_sheets_names] return Select( *options, name="sheet_name", id=f"sn_{self._id}", # sn stands for 'sheet name' cls="select select-bordered select-sm w-full ml-2" ) def get_content(self): return self._state.ns_file_content @staticmethod def get_sheets_names(file_content): try: excel_file = pd.ExcelFile(BytesIO(file_content)) sheet_names = excel_file.sheet_names except Exception as ex: logger.error(f"get_sheets_names: {ex=}") sheet_names = [] return sheet_names def render(self): return Div( Div( mk.mk(Input(type='file', name='file', id=f"fi_{self._id}", # fn stands for 'file name' value=self._state.ns_file_name, hx_preserve="true", hx_encoding='multipart/form-data', cls="file-input file-input-bordered file-input-sm w-full", ), command=self.commands.upload_file() ), self.mk_sheet_selector(), cls="flex" ), mk.dialog_buttons(), ) def __ft__(self): return self.render()