from core.concept import DefinitionType
from evaluators.base_evaluator import MultipleChoices
from parsers.BaseParser import BaseParser
from parsers.parser_utils import UnexpectedEof, UnexpectedToken, get_text_from_tokens
from parsers.state_machine import ConceptToRecognize, End, MetadataToken, PrepareReadTokens, \
    ReadTokens, Start, State, StateMachine, StateMachineContext, StateResult, UnrecognizedToken
from parsers.tokenizer import Token, TokenKind, Tokenizer


class ReadConcept(State):
    """State that consumes the remaining expected tokens of a candidate concept.

    On success it appends a MetadataToken covering the consumed span; on any
    mismatch or premature end of input it records an error instead. In every
    case ``concept_to_recognize`` is cleared and control moves to the single
    next state.
    """

    def run(self, state_context) -> StateResult:
        start = state_context.parser_input.pos
        for expected in state_context.concept_to_recognize.expected:
            if not state_context.parser_input.next_token(False):
                # eof before the concept is fully recognized
                state_context.errors.append(UnexpectedEof(expected, state_context.parser_input.token))
                state_context.concept_to_recognize = None
                return StateResult(self.next_states[0])
            token = state_context.parser_input.token
            if token.value != expected:
                # token mismatch: the input diverges from the concept definition
                state_context.errors.append(UnexpectedToken(token, expected))
                state_context.concept_to_recognize = None
                return StateResult(self.next_states[0])
        # All expected tokens matched: record the recognized concept span.
        state_context.result.append(MetadataToken(state_context.concept_to_recognize.metadata,
                                                  start,
                                                  state_context.parser_input.pos,
                                                  state_context.concept_to_recognize.resolution_method,
                                                  "simple"))
        state_context.concept_to_recognize = None
        return StateResult(self.next_states[0])


class ManageUnrecognized(State):
    """State that flushes the buffer of unmatched tokens into the result.

    Consecutive unrecognized spans are merged into a single UnrecognizedToken
    so the result never contains two adjacent unrecognized entries.
    """

    def run(self, state_context) -> StateResult:
        if state_context.buffer:
            buffer_as_str = get_text_from_tokens(state_context.buffer)
            if len(state_context.result) > 0 and isinstance(old := state_context.result[-1], UnrecognizedToken):
                # Merge with the previous unrecognized span instead of appending a new one.
                state_context.result[-1] = UnrecognizedToken(old.buffer + buffer_as_str,
                                                             old.start,
                                                             state_context.parser_input.pos - 1)
            else:
                state_context.result.append(UnrecognizedToken(buffer_as_str,
                                                              state_context.buffer_start_pos,
                                                              state_context.parser_input.pos - 1))
            # clear the buffer now that its content has been emitted
            state_context.buffer.clear()
        # NOTE(review): reconstructed from mangled source — the reset below is
        # placed outside the `if` so the start position advances even when the
        # buffer was empty; confirm against the original layout.
        state_context.buffer_start_pos = state_context.parser_input.pos + 1
        return StateResult(self.next_states[0])


class SimpleConceptsParser(BaseParser):
    """Parser for concepts that take no parameter, e.g.::

        def concept I am a new concept

    It parses a sequence of such concepts by running two cooperating
    state-machine workflows: ``#tokens_wkf`` scans the input and buffers
    unmatched tokens, and ``#concept_wkf`` tries to complete a candidate
    concept once its first token has been seen.
    """

    def __init__(self):
        super().__init__("simple")
        # Main scanning workflow: read tokens until eof or a concept candidate
        # is found; flush unrecognized tokens before ending or dispatching.
        tokens_wkf = {
            Start("start", next_states=["prepare read tokens"]),
            PrepareReadTokens("prepare read tokens", next_states=["read tokens"]),
            ReadTokens("read tokens", next_states=["read tokens", "eof", "concepts found"]),
            ManageUnrecognized("eof", next_states=["end"]),
            ManageUnrecognized("concepts found", next_states=["#concept_wkf"]),
            End("end", next_states=None)
        }
        # Concept-completion workflow: consume the candidate's remaining
        # tokens, then hand control back to the scanning workflow.
        concept_wkf = {
            Start("start", next_states=["read concept"]),
            ReadConcept("read concept", next_states=["#tokens_wkf"]),
        }
        self.workflows = {
            "#tokens_wkf": {t.name: t for t in tokens_wkf},
            "#concept_wkf": {t.name: t for t in concept_wkf},
        }

    @staticmethod
    def get_metadata_from_first_token(context, token: Token):
        """Return the list of ConceptToRecognize candidates for *token*.

        A CONCEPT token resolves directly by id or by name (the name lookup
        may return one metadata or a list of them). Any other token is looked
        up by its value as the first token of either a concept key (restricted
        to default, parameterless definitions) or a concept name.
        """

        def _get_expected_tokens(_metadata, attr):
            # Tokenize the key/name text and drop its first token, which the
            # caller has already consumed.
            return [t.strip_quote for t in Tokenizer(getattr(_metadata, attr), yield_eof=False)][1:]

        if token.type == TokenKind.CONCEPT:
            name, concept_id = token.value
            if concept_id:
                return [ConceptToRecognize(context.sheerka.get_by_id(concept_id), [], "id")]
            metadata = context.sheerka.get_by_name(name)
            # get_by_name may return a single metadata or a list of them.
            return [ConceptToRecognize(metadata, [], "name")] if not isinstance(metadata, list) else \
                [ConceptToRecognize(m, [], "name") for m in metadata]

        concepts_by_key = [ConceptToRecognize(m, _get_expected_tokens(m, "key"), "key")
                           for m in context.sheerka.get_metadatas_from_first_token("key", token.value)
                           if m.definition_type == DefinitionType.DEFAULT and len(m.parameters) == 0]
        concepts_by_name = [ConceptToRecognize(m, _get_expected_tokens(m, "name"), "name")
                            for m in context.sheerka.get_metadatas_from_first_token("name", token.value)]
        return concepts_by_key + concepts_by_name

    def parse(self, context, parser_input, error_sink):
        """Run the state machine over *parser_input* and return the best parses.

        Errors accumulated during the run are appended to *error_sink*.

        :return: a MultipleChoices wrapping the highest-scoring result sequences
        """
        sm = StateMachine(self.workflows)
        sm_context = StateMachineContext(context, parser_input, self.get_metadata_from_first_token, [])
        sm.run("#tokens_wkf", "start", sm_context)
        selected = self.select_best_paths(sm)
        error_sink.extend(sm_context.errors)
        return MultipleChoices(selected)

    def select_best_paths(self, sm):
        """Return the result sequences of the best-scoring error-free paths.

        Paths that recorded errors are skipped. All paths tying for the best
        score are kept; the initial threshold of 1 means a path must recognize
        at least one token's worth of concept to be selected at all.

        :param sm: the state machine after :meth:`parse` has run it
        :return: list of result sequences (one per best path)
        """
        selected = []
        best_score = 1
        for path in sm.paths:
            if path.execution_context.errors:
                continue
            score = self._compute_path_score(path)
            if score > best_score:
                selected.clear()
                selected.append(path.execution_context.result)
                best_score = score
            elif score == best_score:
                selected.append(path.execution_context.result)
        return selected

    @staticmethod
    def _compute_path_score(path):
        """Compute the score of a path.

        We look at the MetadataToken entries, which represent the concepts
        that were recognized. The first idea was to favor concepts that use
        the maximum of tokens in a row, for example::

            Concept("I am a concept")

        is better than::

            Concept("I am") + Unrecognized(" a concept")

        but Concept("one two") should be equivalent to Concept("one")
        followed by Concept("two") — hence whitespace-only unrecognized
        spans between concepts also contribute to the score.

        :param path: a state-machine path whose execution context holds the result
        :return: the integer score of the path
        """
        score = 0
        for token in path.execution_context.result:
            if isinstance(token, MetadataToken):
                # Each recognized concept contributes the length of its span.
                score += token.end - token.start + 1
            elif isinstance(token, UnrecognizedToken) and token.buffer.isspace():
                # Whitespace between concepts counts so that two adjacent
                # concepts score the same as one concept covering both spans.
                score += len(token.buffer)
        return score