import logging from io import BytesIO import pandas as pd from fastapi import UploadFile from fasthtml.components import * from myfasthtml.controls.BaseCommands import BaseCommands from myfasthtml.controls.helpers import mk from myfasthtml.core.commands import Command from myfasthtml.core.dbmanager import DbObject from myfasthtml.core.instances import MultipleInstance logger = logging.getLogger("FileUpload") class FileUploadState(DbObject): def __init__(self, owner): super().__init__(owner) with self.initializing(): # persisted in DB # must not be persisted in DB (prefix ns_ = no_saving_) self.ns_file_name: str | None = None self.ns_sheets_names: list | None = None self.ns_selected_sheet_name: str | None = None self.ns_file_content: bytes | None = None self.ns_on_ok = None self.ns_on_cancel = None class Commands(BaseCommands): def __init__(self, owner): super().__init__(owner) def on_file_uploaded(self): return Command("UploadFile", "Upload file", self._owner, self._owner.upload_file).htmx(target=f"#sn_{self._id}") def on_sheet_selected(self): return Command("SheetSelected", "Sheet selected", self._owner, self._owner.select_sheet).htmx(target=f"#sn_{self._id}") class FileUpload(MultipleInstance): """ Represents a file upload component. This class provides functionality to handle the uploading process of a file, extract sheet names from an Excel file, and enables users to select a specific sheet for further processing. It integrates commands and state management to ensure smooth operation within a parent application. """ def __init__(self, parent, _id=None, **kwargs): super().__init__(parent, _id=_id, **kwargs) self.commands = Commands(self) self._state = FileUploadState(self) self._state.ns_on_ok = None def set_on_ok(self, callback): self._state.ns_on_ok = callback def upload_file(self, file: UploadFile): logger.debug(f"upload_file: {file=}") if file: self._state.ns_file_content = file.file.read() self._state.ns_file_name = file.filename self._state.ns_sheets_names = self.get_sheets_names(self._state.ns_file_content) self._state.ns_selected_sheet_name = self._state.ns_sheets_names[0] if len(self._state.ns_sheets_names) > 0 else 0 return self.mk_sheet_selector() def select_sheet(self, sheet_name: str): logger.debug(f"select_sheet: {sheet_name=}") self._state.ns_selected_sheet_name = sheet_name return self.mk_sheet_selector() def mk_sheet_selector(self): options = [Option("Choose a file...", selected=True, disabled=True)] if self._state.ns_sheets_names is None else \ [Option( name, selected=True if name == self._state.ns_selected_sheet_name else None, ) for name in self._state.ns_sheets_names] return mk.mk(Select( *options, name="sheet_name", id=f"sn_{self._id}", # sn stands for 'sheet name' cls="select select-bordered select-sm w-full ml-2" ), command=self.commands.on_sheet_selected()) def get_content(self): return self._state.ns_file_content def get_file_name(self): return self._state.ns_file_name def get_file_basename(self): if self._state.ns_file_name is None: return None return self._state.ns_file_name.split(".")[0] def get_sheet_name(self): return self._state.ns_selected_sheet_name @staticmethod def get_sheets_names(file_content): try: excel_file = pd.ExcelFile(BytesIO(file_content)) sheet_names = excel_file.sheet_names except Exception as ex: logger.error(f"get_sheets_names: {ex=}") sheet_names = [] return sheet_names def render(self): return Div( Div( mk.mk(Input(type='file', name='file', id=f"fi_{self._id}", # fn stands for 'file name' value=self._state.ns_file_name, hx_preserve="true", hx_encoding='multipart/form-data', cls="file-input file-input-bordered file-input-sm w-full", ), command=self.commands.on_file_uploaded() ), self.mk_sheet_selector(), cls="flex" ), mk.dialog_buttons(on_ok=self._state.ns_on_ok, on_cancel=self._state.ns_on_cancel), ) def __ft__(self): return self.render()