diff --git a/docs/FEAT-021-sya-concepts-parser-fr.md b/docs/FEAT-021-sya-concepts-parser-fr.md new file mode 100644 index 0000000..f911111 --- /dev/null +++ b/docs/FEAT-021-sya-concepts-parser-fr.md @@ -0,0 +1,433 @@ +# SyaConceptsParser + +## Purpose + +`SyaConceptsParser` parse des **séquences de concepts avec paramètres** (variables). +Il complète `SimpleConceptsParser` qui, lui, ne gère que les concepts sans paramètres. + +Exemples de concepts reconnus : +- `a plus b` → reconnaît `1 plus 2`, `x plus y plus z`, etc. +- `if a then b end` → reconnaît `if x > 0 then print x end` +- `a long named concept b` → reconnaît `1 long named concept 2` + +Le cas fondamental visé est la **composition de concepts** : `1 plus 2 times 3`, où +`times` doit être évalué avant `plus`. C'est ce problème de précédence que résout le +Shunting Yard Algorithm. + +--- + +## Le Shunting Yard Algorithm (SYA) + +Algorithme de Dijkstra (1961) qui convertit une expression en notation infixe +(`1 + 2 * 3`) en **notation polonaise inverse** (RPN : `1 2 3 * +`), en respectant +la précédence des opérateurs. + +### Principe + +Deux structures : un **stack d'opérateurs** et une **queue de sortie**. + +``` +Entrée : 1 + 2 * 3 + + ┌──────────────────────────────────────────────┐ +Token │ Action Stack Sortie │ +─────────┼──────────────────────────────────────────────┤ +1 │ → sortie [] [1] │ ++ │ → stack [+] [1] │ +2 │ → sortie [+] [1, 2] │ +* │ prec(*) > prec(+) [+, *] [1, 2] │ + │ → stack (pas de pop) [1, 2] │ +3 │ → sortie [+, *] [1, 2, 3] │ +fin │ vider stack [] [1, 2, 3, *, +] │ +└──────────────────────────────────────────────────────┘ + +RPN : 1 2 3 * + ≡ 1 + (2 * 3) +``` + +### Règle de pop + +Quand on empile un opérateur `op`, on dépile d'abord tout opérateur `top` tel que : +`précédence(top) >= précédence(op)` + +Cela garantit que les opérateurs de haute précédence sont évalués en premier. 
+ +--- + +## Adaptation dans Sheerka + +Le SYA original travaille sur des **tokens atomiques** (chiffres, `+`, `*`). +Sheerka l'adapte pour travailler sur des **concepts** qui : + +1. **S'identifient par plusieurs tokens** — un concept comme `if a then b end` + contient plusieurs mots-clés (`if`, `then`, `end`) entrelacés avec des paramètres. + L'algorithme original reconnaît un opérateur en un seul token. + +2. **Peuvent contenir N paramètres** — un opérateur binaire a exactement 2 opérandes. + Un concept Sheerka peut en avoir 0, 1, 2 ou plus. + +3. **Les paramètres peuvent eux-mêmes être des concepts** — dans `1 plus 2 times 3`, + le paramètre `b` de `plus` est le résultat du concept `times`. La récursion est + gérée par l'imbrication des workflows. + +### Correspondance SYA ↔ Sheerka + +| SYA original | Sheerka | +|---|---| +| Opérateur (`+`, `*`) | `ConceptToRecognize` (concept avec paramètres) | +| Opérande (nombre, variable) | `UnrecognizedToken` ou `ConceptToken` | +| Stack d'opérateurs | `state_context.stack` | +| Queue de sortie | `state_context.parameters` | +| Précédence | `InitConceptParsing.must_pop()` | +| Résultat RPN | `MetadataToken` dans `state_context.result` | + +### Différences structurelles + +**Reconnaissance multi-tokens** — là où SYA lit un token pour identifier `*`, +Sheerka doit lire `long named concept` (3 tokens) pour identifier le concept +`a long named concept b`. La classe `ReadConcept` gère cette lecture séquentielle. + +**Structure `expected`** — le concept `if a then b end` est décomposé en segments : +``` +[("if ", 0), (" then ", 1), (" end", 1)] + ──────── ────────── ────────── + keyword keyword keyword + 0 params 1 param 1 param + avant avant avant +``` +Chaque segment indique combien de paramètres précèdent ce groupe de tokens, et +quels tokens consommer pour valider ce segment. + +**Précédence non encore implémentée** — `must_pop()` retourne toujours `False`. +La composition de concepts n'est donc pas encore active. 
C'est la prochaine étape +d'implémentation. + +--- + +## Architecture + +### Deux workflows interdépendants + +```mermaid +graph TD + A[#tokens_wkf] -->|concept keyword found| B[#concept_wkf] + B -->|concept fully parsed| A + A -->|EOF| C[end] +``` + +Le parser démarre toujours dans `#tokens_wkf`. À chaque fois qu'un mot-clé +correspondant au premier token d'un concept est détecté, un **fork** est créé et +envoyé dans `#concept_wkf`. Une fois le concept reconnu, on revient dans +`#tokens_wkf` pour continuer la lecture. + +--- + +## Workflow `#tokens_wkf` + +```mermaid +stateDiagram-v2 + [*] --> start + start --> prepare_read_tokens + prepare_read_tokens --> read_tokens + read_tokens --> read_tokens : no concept found (loop) + read_tokens --> eof : EOF + read_tokens --> concepts_found : concept keyword detected (fork) + eof --> end : ManageUnrecognized + concepts_found --> concept_wkf : ManageUnrecognized → #concept_wkf + end --> [*] +``` + +**`PrepareReadTokens`** : initialise le buffer et mémorise `buffer_start_pos`. + +**`ReadTokens`** : lit un token, consulte `get_metadata_from_first_token`. Si un concept +peut démarrer à ce token → **fork** avec un clone du contexte où `concept_to_recognize` +est renseigné. Le chemin principal continue à lire. + +**`ManageUnrecognized("concepts found")`** : traite le buffer accumulé avant le +mot-clé (via `SimpleConceptsParser`). Les tokens non reconnus deviennent +`UnrecognizedToken` et sont ajoutés à `parameters`. 
+ +--- + +## Workflow `#concept_wkf` + +```mermaid +stateDiagram-v2 + [*] --> start + start --> init_concept_parsing + init_concept_parsing --> manage_parameters + + manage_parameters --> read_concept + + read_concept --> read_parameters : more segments + read_concept --> finalize_concept : all segments done + read_concept --> token_mismatch : token mismatch + read_concept --> error_eof : unexpected EOF + + read_parameters --> manage_parameters : loop + read_parameters --> finalize_concept : EOF + + finalize_concept --> tokens_wkf : #tokens_wkf + token_mismatch --> end + error_eof --> end + end --> [*] +``` + +**`InitConceptParsing`** : +- Vérifie que le nombre de paramètres déjà collectés est suffisant +- Retire le premier token du segment (déjà consommé par `ReadTokens`) +- Applique le SYA : empile le concept sur le stack + +**`ReadConcept`** : lit les tokens fixes du segment courant un par un. +Si tous correspondent → `pop(0)` du segment et continue. + +**`ReadParameters`** : lit UN token dans le buffer. Revient à +`ManageUnrecognized` qui tente de le reconnaître via `SimpleConceptsParser`. + +**`FinalizeConceptParsing`** : +- Dépile le concept du stack +- Calcule `start` (depuis le premier paramètre) et `end` (position courante) +- Crée un `MetadataToken(concept.metadata, start, end, resolution_method, "sya")` +- Vide stack et parameters +- Retourne à `#tokens_wkf` + +--- + +## Exemple pas à pas — `"1 plus 2"` + +Concept défini : `a plus b` (variables `a`, `b`). + +**Tokens :** +``` +pos : 0 1 2 3 4 5 +tok : "1" " " "plus" " " "2" EOF +``` + +**`expected` pour ce concept :** +``` +[([" ", "plus", " "], 1), ([], 1)] + segment 0 → 1 param avant, lire " plus " + segment 1 → 1 param avant, lire rien (concept se termine par un param) +``` + +**Déroulé :** + +``` +PrepareReadTokens → buffer_start_pos = 0 + +ReadTokens "1" → no concept, buffer = ["1"] +ReadTokens " " → no concept, buffer = ["1", " "] +ReadTokens "plus" → concept "a plus b" trouvé ! 
+ + ┌── FORK ──────────────────────────────────────────────────────┐ + │ clone: buffer=["1"," "], pos=2, concept_to_recognize=CTR(+) │ + └──────────────────────────────────────────────────────────────┘ + +ManageUnrecognized("concepts found") + buffer = ["1"," "] → SimpleConceptsParser → not found + parameters = [UT("1 ", start=0, end=1)] + buffer_start_pos = 3 + → #concept_wkf + +InitConceptParsing + expected[0] = ([" ","plus"," "], 1) + need 1 param → have 1 ✓ + strip leading WS → ["plus"," "] + pop "plus" (déjà lu) → [" "] + SYA: stack = [CTR(a_plus_b)] + +ManageUnrecognized("manage parameters") : buffer vide → rien + +ReadConcept : lit [" "] → pos 3 = " " ✓ + expected.pop(0) → remaining = [([], 1)] + → "read parameters" + +ReadParameters : lit "2" (pos 4) + buffer = ["2"] + → "manage parameters" + +ManageUnrecognized("manage parameters") + buffer = ["2"] → not a concept + parameters = [UT("1 ", 0, 1), UT("2", 3, 3)] + buffer_start_pos = 5 + +ReadConcept : expected = [([], 1)], lit 0 tokens + expected.pop(0) → empty → "finalize concept" + +FinalizeConceptParsing + concept = stack.pop() = CTR(a_plus_b) + start = parameters[0].start = 0 + end = parser_input.pos = 4 + result.append(MetadataToken(metadata, 0, 4, "key", "sya")) + → #tokens_wkf + +ReadTokens → EOF → ManageUnrecognized("eof") → end +``` + +**Résultat :** +``` +MultipleChoices([ + [MetadataToken(id="1001", start=0, end=4, resolution_method="key", parser="sya")] +]) +``` + +--- + +## Exemple — séquence `"1 plus 2 3 plus 7"` + +Même concept `a plus b`. Le parser reconnaît deux concepts successifs dans un seul passage. 
+ +``` +pos : 0 1 2 3 4 5 6 7 8 9 10 11 +tok : "1" " " "plus" " " "2" " " "3" " " "plus" " " "7" EOF +``` + +Après `FinalizeConceptParsing` du premier concept (pos=4), `#tokens_wkf` repart : + +``` +PrepareReadTokens → buffer_start_pos = 5 +ReadTokens " " → buffer = [" "] +ReadTokens "3" → buffer = [" ","3"] +ReadTokens " " → buffer = [" ","3"," "] +ReadTokens "plus" → fork + +ManageUnrecognized → UT(" 3 ", start=5, end=7), buffer_start_pos=9 +... +FinalizeConceptParsing + start = 5, end = 10 + result.append(MetadataToken(1001, 5, 10, "key", "sya")) +``` + +**Résultat final (un seul path, deux concepts) :** +``` +MultipleChoices([ + [ + MetadataToken(1001, start=0, end=4, parser="sya"), + MetadataToken(1001, start=5, end=10, parser="sya"), + ] +]) +``` + +--- + +## Exemple futur — composition `"1 plus 2 times 3"` + +> **Note :** cet exemple nécessite l'implémentation de `must_pop()`. +> Aujourd'hui `must_pop()` retourne toujours `False`. + +Concepts : `a plus b` (basse précédence), `a times b` (haute précédence). + +**Comportement attendu après implémentation :** + +``` +Expression : 1 plus 2 times 3 + +SYA avec précédence times > plus : + +Token "1" → parameters = [1] stack = [] +Token "plus" → stack = [plus] parameters = [1] +Token "2" → parameters = [1, 2] stack = [plus] +Token "times" → prec(times) > prec(plus) → pas de pop + stack = [plus, times] parameters = [1, 2] +Token "3" → parameters = [1, 2, 3] stack = [plus, times] + +Finalize : + pop "times" → MetadataToken(times, params=[2, 3]) + pop "plus" → MetadataToken(plus, params=[1, times_result]) +``` + +**Ce que `must_pop()` doit implémenter :** +```python +def must_pop(self, current_concept, top_of_stack_concept): + return precedence(top_of_stack_concept) >= precedence(current_concept) +``` + +Sans cette règle, les deux concepts seraient traités de gauche à droite avec la même +précédence, ce qui donnerait `(1 plus 2) times 3` au lieu de `1 plus (2 times 3)`. 
+ +--- + +## Structure `expected` en détail + +Pour le concept `if a then b end` (clé `"if __var__0 then __var__1 end"`) : + +``` +_get_expected_tokens("if __var__0 then __var__1 end") + +→ [ + (["if", " "], 0), # lire "if " avant le 1er param + ([" ", "then", " "], 1), # lire " then " avant le 2ème param + ([" ", "end"], 1), # lire " end" avant le 3ème... non, 1 param avant + ] +``` + +Pendant le parsing, `expected` est **modifié en place** : +- `InitConceptParsing` retire le premier token du segment 0 (déjà lu par `ReadTokens`) +- `ReadConcept` consomme les tokens du segment courant puis fait `pop(0)` +- Quand `expected` est vide → `FinalizeConceptParsing` + +--- + +## Structures de données clés + +### `StateMachineContext` + +``` +StateMachineContext +├── parser_input ParserInput flux de tokens + curseur +├── other_parsers [SimpleConceptsParser] +├── buffer list[Token] tokens en attente de classification +├── buffer_start_pos int position de début du buffer courant +├── concept_to_recognize ConceptToRecognize | None +├── stack list[CTR] SYA — stack d'opérateurs +├── parameters list[UT|CT] SYA — queue de sortie +├── result list[MetadataToken] +└── errors list +``` + +### `MetadataToken` (sortie) + +``` +MetadataToken +├── metadata ConceptMetadata (id, name, key, variables, ...) 
+├── start int position du 1er token de l'expression +├── end int position du dernier token +├── resolution_method "key" | "name" | "id" +└── parser "sya" +``` + +### Positions dans `"1 plus 2"` : + +``` +"1 plus 2" + 0 1 2 3 4 + │ │ │ │ │ + 1 _ plus _ 2 + +MetadataToken : start=0, end=4 +``` + +--- + +## Différences avec `SimpleConceptsParser` + +| | `SimpleConceptsParser` | `SyaConceptsParser` | +|---|---|---| +| Concepts ciblés | Sans paramètres | Avec paramètres | +| `concept_wkf` | 2 états | 8 états | +| Contenu de `result` | `MetadataToken` + `UnrecognizedToken` | `MetadataToken` uniquement | +| Paramètres | N/A | Collectés dans `parameters` | +| Parser tag | `"simple"` | `"sya"` | +| SYA | Non | Oui (précédence à implémenter) | + +--- + +## Gestion des erreurs + +| Erreur | Cause | État atteint | +|---|---|---| +| `UnexpectedToken` | Token lu ≠ token attendu du concept | `TokenMismatch` → `end` | +| `UnexpectedEof` | Fin de l'entrée avant fin du concept | `ErrorEof` → `end` | +| `NotEnoughParameters` | Pas assez de params avant un segment | Exception levée | + +Les erreurs sont collectées depuis **tous les paths** et transmises à `error_sink` dans +`parse()`. Un path avec erreurs est exclu de `_select_best_paths`. diff --git a/docs/FEAT-021-sya-concepts-parser.md b/docs/FEAT-021-sya-concepts-parser.md new file mode 100644 index 0000000..760241c --- /dev/null +++ b/docs/FEAT-021-sya-concepts-parser.md @@ -0,0 +1,545 @@ +# SyaConceptsParser + +## Purpose + +`SyaConceptsParser` parses **sequences of concepts with parameters** (variables). +It complements `SimpleConceptsParser`, which only handles parameter-less concepts. + +Examples of recognized concepts: +- `a plus b` → matches `1 plus 2`, `x plus y`, etc. +- `if a then b end` → matches `if x > 0 then print x end` +- `a long named concept b` → matches `1 long named concept 2` + +The primary goal is **concept composition**: `1 plus 2 times 3`, where `times` must +be evaluated before `plus`. 
This precedence problem is what the Shunting Yard +Algorithm solves. + +--- + +## The Shunting Yard Algorithm (SYA) + +Dijkstra's algorithm (1961) converts an infix expression (`1 + 2 * 3`) into +**Reverse Polish Notation** (RPN: `1 2 3 * +`), respecting operator precedence. + +### Principle + +Two structures: an **operator stack** and an **output queue**. + +``` +Input: 1 + 2 * 3 + + ┌──────────────────────────────────────────────┐ +Token │ Action Stack Output │ +─────────┼──────────────────────────────────────────────┤ +1 │ → output queue [] [1] │ ++ │ → stack [+] [1] │ +2 │ → output queue [+] [1, 2] │ +* │ prec(*) > prec(+) [+, *] [1, 2] │ + │ → stack (no pop) │ +3 │ → output queue [+, *] [1, 2, 3] │ +end │ flush stack [] [1,2,3,*,+] │ +└──────────────────────────────────────────────────────┘ + +RPN: 1 2 3 * + ≡ 1 + (2 * 3) +``` + +### Pop rule + +When pushing operator `op`, first pop any stack-top operator `top` where: +`precedence(top) >= precedence(op)` + +This ensures higher-precedence operators are evaluated first. + +--- + +## Sheerka's Adaptation + +The original SYA works on **atomic tokens** (digits, `+`, `*`). +Sheerka adapts it for **concepts** that: + +1. **Are identified by multiple tokens** — a concept like `if a then b end` has + several keywords (`if`, `then`, `end`) interleaved with parameters. + The original SYA identifies an operator with a single token. + +2. **Can have N parameters** — a binary operator has exactly 2 operands. + A Sheerka concept can have 0, 1, 2 or more parameters. + +3. **Parameters can themselves be concepts** — in `1 plus 2 times 3`, the parameter + `b` of `plus` is the result of the `times` concept. This recursion is handled + by the nested workflow structure. 
+ +### SYA ↔ Sheerka mapping + +| Original SYA | Sheerka | +|---|---| +| Operator (`+`, `*`) | `ConceptToRecognize` (concept with parameters) | +| Operand (number, variable) | `UnrecognizedToken` or `ConceptToken` | +| Operator stack | `state_context.stack` | +| Output queue | `state_context.parameters` | +| Precedence rule | `InitConceptParsing.must_pop()` | +| RPN result | `MetadataToken` in `state_context.result` | + +### Structural differences + +**Multi-token recognition** — where SYA reads a single token to identify `*`, +Sheerka must read `long named concept` (3 tokens) to identify concept +`a long named concept b`. The `ReadConcept` state handles this sequential reading. + +**The `expected` structure** — concept `if a then b end` is decomposed into segments: +``` +[("if ", 0), (" then ", 1), (" end", 1)] + ───────── ────────── ────────── + keyword keyword keyword + 0 params 1 param 1 param + before before before +``` +Each segment states how many parameters precede it and which tokens to consume +to validate it. + +**Precedence not yet implemented** — `must_pop()` always returns `False`. +Concept composition with precedence rules is the next implementation step. + +--- + +## Architecture + +### Two interdependent workflows + +```mermaid +graph TD + A[#tokens_wkf] -->|concept keyword found - fork| B[#concept_wkf] + A -->|token not a concept keyword - buffered, loop| A + B -->|concept fully parsed| A + A -->|EOF| C[end] +``` + +The parser always starts in `#tokens_wkf`. Tokens that do not match any concept +keyword are accumulated in a buffer and the loop continues. Whenever a token +matching the first keyword of a known concept is detected, a **fork** is created +and sent into `#concept_wkf` — the main path keeps looping independently. Once the +concept is recognized, the fork returns to `#tokens_wkf` to continue reading. 
+ +--- + +## Workflow `#tokens_wkf` + +```mermaid +stateDiagram-v2 + [*] --> start + start --> prepare_read_tokens + prepare_read_tokens --> read_tokens + read_tokens --> read_tokens : no concept found (loop) + read_tokens --> eof : EOF + read_tokens --> concepts_found : concept keyword detected (fork) + eof --> end : ManageUnrecognized + concepts_found --> concept_wkf : ManageUnrecognized → #concept_wkf + end --> [*] +``` + +**`PrepareReadTokens`**: initializes the buffer and records `buffer_start_pos`. + +**`ReadTokens`**: reads one token, calls `get_metadata_from_first_token`. If a concept +can start at this token → **fork** with a cloned context where `concept_to_recognize` +is set. The main path continues scanning. + +**`ManageUnrecognized("concepts found")`**: processes the buffer accumulated before +the keyword (via `SimpleConceptsParser`). Unrecognized tokens become +`UnrecognizedToken` and are added to `parameters`. + +--- + +## Workflow `#concept_wkf` + +```mermaid +stateDiagram-v2 + [*] --> start + start --> init_concept_parsing + init_concept_parsing --> manage_parameters + + manage_parameters --> read_concept + + read_concept --> read_parameters : more segments + read_concept --> finalize_concept : all segments done + read_concept --> token_mismatch : token mismatch + read_concept --> error_eof : unexpected EOF + + read_parameters --> manage_parameters : loop + read_parameters --> finalize_concept : EOF + + finalize_concept --> tokens_wkf : #tokens_wkf + token_mismatch --> end + error_eof --> end + end --> [*] +``` + +**`InitConceptParsing`**: +- Verifies the number of already-collected parameters is sufficient +- Removes the first token of segment 0 (already consumed by `ReadTokens`) +- Applies SYA: pushes the concept onto the stack + +**`ReadConcept`**: reads the fixed tokens of the current segment one by one. +If all match → `pop(0)` the segment and continue. + +**`ReadParameters`**: reads ONE token into the buffer. 
Returns to +`ManageUnrecognized` which tries to recognize it via `SimpleConceptsParser`. + +**`FinalizeConceptParsing`**: +- Pops the concept from the stack +- Computes `start` (from the first parameter) and `end` (current position) +- Creates `MetadataToken(concept.metadata, start, end, resolution_method, "sya")` +- Clears stack and parameters +- Returns to `#tokens_wkf` + +--- + +## Step-by-step example — `"1 plus 2"` + +Concept: `a plus b` (variables `a`, `b`). + +**Tokens:** +``` +pos : 0 1 2 3 4 5 +tok : "1" " " "plus" " " "2" EOF +``` + +**`expected` for this concept:** +``` +[([" ", "plus", " "], 1), ([], 1)] + segment 0 → 1 param before, read " plus " + segment 1 → 1 param before, read nothing (concept ends with a param) +``` + +**Execution trace:** + +``` +PrepareReadTokens → buffer_start_pos = 0 + +ReadTokens "1" → no concept, buffer = ["1"] +ReadTokens " " → no concept, buffer = ["1", " "] +ReadTokens "plus" → concept "a plus b" found! + + ┌── FORK ─────────────────────────────────────────────────────┐ + │ clone: buffer=["1"," "], pos=2, concept_to_recognize=CTR(+) │ + └─────────────────────────────────────────────────────────────┘ + +ManageUnrecognized("concepts found") + buffer = ["1"," "] → SimpleConceptsParser → not found + parameters = [UnrecognizedToken("1 ", start=0, end=1)] + buffer_start_pos = 3 + → #concept_wkf + +InitConceptParsing + expected[0] = ([" ","plus"," "], 1) + need 1 param → have 1 ✓ + strip leading WS → ["plus"," "] + pop "plus" (already consumed) → [" "] + SYA: stack = [CTR(a_plus_b)] + +ManageUnrecognized("manage parameters"): buffer empty → nothing + +ReadConcept: reads [" "] → pos 3 = " " ✓ + expected.pop(0) → remaining = [([], 1)] + → "read parameters" + +ReadParameters: reads "2" at pos 4 + buffer = ["2"] + → "manage parameters" + +ManageUnrecognized("manage parameters") + buffer = ["2"] → not a concept + parameters = [UT("1 ", 0, 1), UT("2", 3, 3)] + buffer_start_pos = 5 + +ReadConcept: expected = [([], 1)], reads 0 tokens + 
expected.pop(0) → empty → "finalize concept" + +FinalizeConceptParsing + concept = stack.pop() = CTR(a_plus_b) + start = parameters[0].start = 0 + end = parser_input.pos = 4 + result.append(MetadataToken(metadata, 0, 4, "key", "sya")) + → #tokens_wkf + +ReadTokens → EOF → ManageUnrecognized("eof") → end +``` + +**Result:** +``` +MultipleChoices([ + [MetadataToken(id="1001", start=0, end=4, resolution_method="key", parser="sya")] +]) +``` + +--- + +## Example — sequence `"1 plus 2 3 plus 7"` + +Same concept `a plus b`. The parser recognizes two concepts in one pass. + +``` +pos : 0 1 2 3 4 5 6 7 8 9 10 11 +tok : "1" " " "plus" " " "2" " " "3" " " "plus" " " "7" EOF +``` + +After `FinalizeConceptParsing` for the first concept (pos=4), `#tokens_wkf` restarts: + +``` +PrepareReadTokens → buffer_start_pos = 5 +ReadTokens " " → buffer = [" "] +ReadTokens "3" → buffer = [" ","3"] +ReadTokens " " → buffer = [" ","3"," "] +ReadTokens "plus" → fork + +ManageUnrecognized → UT(" 3 ", start=5, end=7), buffer_start_pos=9 +... +FinalizeConceptParsing + start = 5, end = 10 + result.append(MetadataToken(1001, 5, 10, "key", "sya")) +``` + +**Final result (one path, two concepts):** +``` +MultipleChoices([ + [ + MetadataToken(1001, start=0, end=4, parser="sya"), + MetadataToken(1001, start=5, end=10, parser="sya"), + ] +]) +``` + +--- + +## Future example — composition `"1 plus 2 times 3"` + +> **Note:** this example requires implementing `must_pop()`. +> Currently `must_pop()` always returns `False`. + +Concepts: `a plus b` (low precedence), `a times b` (high precedence). 
+ +**Expected behavior after implementation:** + +``` +Expression: 1 plus 2 times 3 + +SYA with precedence times > plus: + +Token "1" → parameters = [1] stack = [] +Token "plus" → stack = [plus] parameters = [1] +Token "2" → parameters = [1, 2] stack = [plus] +Token "times" → prec(times) > prec(plus) → no pop + stack = [plus, times] parameters = [1, 2] +Token "3" → parameters = [1, 2, 3] stack = [plus, times] + +Finalize: + pop "times" → MetadataToken(times, params=[2, 3]) + pop "plus" → MetadataToken(plus, params=[1, times_result]) +``` + +**What `must_pop()` must implement:** +```python +def must_pop(self, current_concept, top_of_stack_concept): + return precedence(top_of_stack_concept) >= precedence(current_concept) +``` + +Without this rule, both concepts are processed left-to-right with equal precedence, +yielding `(1 plus 2) times 3` instead of `1 plus (2 times 3)`. + +--- + +## The `expected` structure in detail + +For concept `if a then b end` (key `"if __var__0 then __var__1 end"`): + +``` +_get_expected_tokens("if __var__0 then __var__1 end") + +→ [ + (["if", " "], 0), # read "if " before 1st param + ([" ", "then", " "], 1), # read " then " before 2nd param + ([" ", "end"], 1), # read " end" — 1 param before, no trailing param + ] +``` + +During parsing, `expected` is **modified in place**: +- `InitConceptParsing` removes the first token of segment 0 (already read by `ReadTokens`) +- `ReadConcept` consumes the tokens of the current segment then calls `pop(0)` +- When `expected` is empty → `FinalizeConceptParsing` + +--- + +## Key data structures + +### `StateMachineContext` + +``` +StateMachineContext +├── parser_input ParserInput token stream + cursor +├── other_parsers [SimpleConceptsParser] +├── buffer list[Token] tokens pending classification +├── buffer_start_pos int start position of the current buffer +├── concept_to_recognize ConceptToRecognize | None +├── stack list[CTR] SYA — operator stack +├── parameters list[UT|CT] SYA — output queue +├── 
result list[MetadataToken] +└── errors list +``` + +### `MetadataToken` (output) + +``` +MetadataToken +├── metadata ConceptMetadata (id, name, key, variables, ...) +├── start int position of the first token of the expression +├── end int position of the last token +├── resolution_method "key" | "name" | "id" +└── parser "sya" +``` + +### Token positions in `"1 plus 2"`: + +``` +"1 plus 2" + 0 1 2 3 4 + │ │ │ │ │ + 1 _ plus _ 2 + +MetadataToken: start=0, end=4 +``` + +--- + +## Differences vs `SimpleConceptsParser` + +| | `SimpleConceptsParser` | `SyaConceptsParser` | +|---|---|---| +| Target concepts | No parameters | With parameters | +| `concept_wkf` states | 2 | 8 | +| `result` contents | `MetadataToken` + `UnrecognizedToken` | `MetadataToken` only | +| Parameters | N/A | Collected in `parameters` list | +| Parser tag | `"simple"` | `"sya"` | +| SYA | No | Yes (precedence to implement) | + +--- + +## Error handling + +| Error | Cause | State reached | +|---|---|---| +| `UnexpectedToken` | Read token ≠ expected concept token | `TokenMismatch` → `end` | +| `UnexpectedEof` | Input ends before concept is complete | `ErrorEof` → `end` | +| `NotEnoughParameters` | Too few params before a segment | Exception raised | + +Errors are collected from **all paths** and forwarded to `error_sink` in `parse()`. +A path with errors is excluded from `_select_best_paths`. + +--- + +## Known limitations and proposed improvements + +The current implementation correctly handles simple cases (single-token parameters, +non-nested concepts). The following issues must be addressed before enabling +precedence and real concept composition. + +### 1. Parameters are limited to a single token + +`ReadParameters` reads ONE token, then immediately calls `ManageUnrecognized`, which +returns to `ReadConcept` to match the next keyword segment. Multi-token parameters +therefore fail. 
For `if hello world then foo end` with parameter `a = "hello world"`: + +``` +ReadParameters reads "hello" +ManageUnrecognized → UT("hello") → ReadConcept tries to match " then " +ReadConcept reads " " ✓ then "world" ≠ "then" → MISMATCH +``` + +**Proposed fix:** `ReadParameters` should accumulate tokens until it detects the +start of the next keyword segment (lookahead on `expected[0][0]`), then hand the +full buffer to `ManageUnrecognized` for parsing in one pass. + +--- + +### 2. Flat `parameters` list with no arity tracking + +When `FinalizeConceptParsing` runs, `parameters` is a flat list. There is no +information about how many parameters belong to each concept on the stack. Once +`must_pop` is active and multiple concepts are stacked, `FinalizeConceptParsing` +cannot reconstruct the correct nesting. + +Example: `1 plus 2 times 3` with `stack = [plus, times]` and +`parameters = [UT("1"), UT("2"), UT("3")]`. Without arity information there is no +way to determine that `times` consumes the last two parameters and `plus` consumes +the first one and the result of `times`. + +The arity of each concept (`nb_variables`) is available in `expected` at push time +but is lost once `expected` is consumed during parsing. + +**Proposed fix:** record the arity of each concept when it is pushed onto the stack +(in `apply_shunting_yard_algorithm`). `FinalizeConceptParsing` then pops the correct +number of parameters for each concept, from innermost to outermost, building +intermediate `MetadataToken` objects that are re-injected into `parameters` as +`ConceptToken` before processing the next concept on the stack. + +--- + +### 3. 
Type mismatch in `ManageUnrecognized` for recognized parameters + +When `SimpleConceptsParser` recognizes a token sequence, `ManageUnrecognized` +creates: + +```python +state_context.parameters.append( + ConceptToken(res.items[0], buffer_start_pos, parser_input.pos - 1) +) +``` + +`res.items[0]` is a `list[MetadataToken]` (one complete path from +`SimpleConceptsParser`), but `ConceptToken.concept` is typed as `Concept`. Any +downstream code that uses this `ConceptToken` will receive a list where it expects a +`Concept` instance. + +**Proposed fix:** define a dedicated container for a recognized parameter (e.g. +`ParsedParameterToken`) that wraps a `list[MetadataToken]` with start/end positions, +or flatten the result to a single `MetadataToken` when `res.items[0]` contains +exactly one token. + +--- + +### 4. Variable-to-parameter mapping not applied + +`FinalizeConceptParsing` creates a `MetadataToken` without populating the concept's +variables. `parameters = [UT("1 "), UT("2")]` maps positionally to +`variables = [("a", NotInit), ("b", NotInit)]`, but this mapping is never applied. +The produced `MetadataToken` is therefore incomplete: a downstream evaluator has no +way to retrieve parameter values from the token alone. + +**Proposed fix:** in `FinalizeConceptParsing`, zip `parameters` with +`concept.metadata.variables` and store the result in the `MetadataToken`'s metadata, +or pass it as a dedicated field. + +--- + +### 5. `SyaConceptsParser` absent from `other_parsers` + +`other_parsers = [SimpleConceptsParser()]`. A parameter can be a simple (parameter- +less) concept, but never a composite concept with parameters. True composition — +where a parameter is itself a SYA-parsed concept — is structurally impossible with +the current design. + +**Proposed fix:** add `SyaConceptsParser` to `other_parsers`. A guard is required +to prevent infinite recursion: the nested instance should exclude the concept +currently being recognized from its search space. 
+ +--- + +### Priority order + +| # | Issue | Blocking | +|---|---|---| +| 1 | Multi-token parameters | Practical usability | +| 2 | `ConceptToken` type mismatch | Correctness | +| 3 | Variable-to-parameter mapping | Evaluation pipeline | +| 4 | Arity not tracked on the stack | `must_pop` / precedence | +| 5 | `SyaConceptsParser` absent from `other_parsers` | Real composition | + +Issues 3 and 4 are interdependent with `must_pop`: implementing them independently +(before activating precedence) is still valuable and lays the correct foundation. diff --git a/requirements.txt b/requirements.txt index 9345c50..67b9061 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,8 +1,10 @@ annotated-doc==0.0.4 annotated-types==0.7.0 anyio==4.13.0 -bcrypt==5.0.0 +argon2-cffi==25.1.0 +argon2-cffi-bindings==25.1.0 certifi==2026.2.25 +cffi==2.0.0 charset-normalizer==3.4.7 click==8.3.2 ecdsa==0.19.2 @@ -13,10 +15,10 @@ httpx==0.28.1 idna==3.11 iniconfig==2.3.0 packaging==26.0 -passlib==1.7.4 pluggy==1.6.0 prompt_toolkit==3.0.52 pyasn1==0.6.3 +pycparser==3.0 pydantic==2.12.5 pydantic_core==2.41.5 Pygments==2.20.0 diff --git a/src/evaluators/RecognizeSimpleConcept.py b/src/evaluators/RecognizeSimpleConcept.py index 0f68de4..a45b018 100644 --- a/src/evaluators/RecognizeSimpleConcept.py +++ b/src/evaluators/RecognizeSimpleConcept.py @@ -28,7 +28,8 @@ class RecognizeSimpleConcept(OneReturnValueEvaluator): parser_input = return_value.value.body parser_input.reset() - parsed = self.parser.parse(context, parser_input) + error_sink = [] + parsed = self.parser.parse(context, parser_input, error_sink) if len(parsed.items) == 0: not_for_me = ReturnValue(self.NAME, False, NotForMe(self.NAME, return_value.value)) diff --git a/src/parsers/SyaConceptsParser.py b/src/parsers/SyaConceptsParser.py index 07f0b3a..b8beb5d 100644 --- a/src/parsers/SyaConceptsParser.py +++ b/src/parsers/SyaConceptsParser.py @@ -5,9 +5,8 @@ from parsers.BaseParser import BaseParser from parsers.ParserInput import 
ParserInput from parsers.SimpleConceptsParser import SimpleConceptsParser from parsers.parser_utils import UnexpectedEof, UnexpectedToken, get_text_from_tokens -from parsers.state_machine import ConceptToRecognize, ConceptToken, End, PrepareReadTokens, ReadTokens, Start, State, \ - StateMachine, \ - StateMachineContext, StateResult, UnrecognizedToken +from parsers.state_machine import ConceptToRecognize, ConceptToken, End, MetadataToken, PrepareReadTokens, ReadTokens, \ + Start, State, StateMachine, StateMachineContext, StateResult, UnrecognizedToken from parsers.tokenizer import Token, TokenKind, Tokenizer @@ -99,6 +98,7 @@ class ReadParameters(State): return StateResult("finalize concept") state_context.buffer.append(state_context.parser_input.token) + return StateResult(self.next_states[0]) class ManageUnrecognized(State): @@ -138,23 +138,47 @@ class ManageUnrecognized(State): class TokenMismatch(State): """ - When we realize that we are not parsing the correct concept + When we realize that we are not parsing the correct concept. + The path ends without adding anything to the result. """ - pass + + def run(self, state_context) -> StateResult: + return StateResult(self.next_states[0]) class ErrorEof(State): """ - When EOF (end of file) detected before successfully parsing the concept + When EOF is detected before the concept is fully parsed. + The path ends without adding anything to the result. """ - pass + + def run(self, state_context) -> StateResult: + return StateResult(self.next_states[0]) class FinalizeConceptParsing(State): """ - The concept is fully parsed. Let's wrap up + The concept is fully parsed. + Pops the concept from the stack, builds a MetadataToken from it and its + collected parameters, appends it to the result, then returns to the + tokens workflow to continue parsing the rest of the input. 
""" - pass + + def run(self, state_context) -> StateResult: + concept = state_context.stack.pop() + + start = state_context.parameters[0].start if state_context.parameters \ + else state_context.buffer_start_pos + end = state_context.parser_input.pos + + state_context.result.append( + MetadataToken(concept.metadata, start, end, concept.resolution_method, "sya") + ) + + state_context.stack.clear() + state_context.parameters.clear() + + return StateResult(self.next_states[0]) class SyaConceptsParser(BaseParser): @@ -180,11 +204,10 @@ class SyaConceptsParser(BaseParser): Start("start", next_states=["init concept parsing"]), InitConceptParsing("init concept parsing", ["manage parameters"]), ManageUnrecognized("manage parameters", next_states=["read concept"]), - ReadConcept("read concept", next_states=["finalize concept", "eof", "wrong concept", "read parameters"]), - ReadParameters("read parameters", next_states=["manage parameters", "eof"]), - ManageUnrecognized("eof", next_states=["end"]), + ReadConcept("read concept", next_states=["finalize concept", "error eof", "token mismatch", "read parameters"]), + ReadParameters("read parameters", next_states=["manage parameters", "finalize concept"]), FinalizeConceptParsing("finalize concept", next_states=["#tokens_wkf"]), - ErrorEof("eof", ["end"]), + ErrorEof("error eof", ["end"]), TokenMismatch("token mismatch", ["end"]), End("end", next_states=None) } @@ -266,6 +289,45 @@ class SyaConceptsParser(BaseParser): for m in context.sheerka.get_metadatas_from_first_token("key", token.value) if m.definition_type == DefinitionType.DEFAULT and len(m.parameters) > 0] + def _select_best_paths(self, sm) -> list: + """Returns the result lists of the highest-scoring error-free paths. + + Args: + sm: The StateMachine after execution. + + Returns: + A list of result lists, one per best-scoring path. 
+ """ + selected = [] + best_score = 1 + for path in sm.paths: + if path.execution_context.errors: + continue + score = self._compute_path_score(path) + if score > best_score: + selected.clear() + selected.append(path.execution_context.result) + best_score = score + elif score == best_score: + selected.append(path.execution_context.result) + return selected + + @staticmethod + def _compute_path_score(path) -> int: + """Scores a path by the total token span covered by MetadataTokens. + + Args: + path: An ExecutionPath whose result is a list of MetadataToken. + + Returns: + Integer score. + """ + return sum( + token.end - token.start + 1 + for token in path.execution_context.result + if isinstance(token, MetadataToken) + ) + def parse(self, context, parser_input, error_sink): sm = StateMachine(self.workflows) sm_context = StateMachineContext(context, @@ -274,4 +336,9 @@ class SyaConceptsParser(BaseParser): [SimpleConceptsParser()]) sm.run("#tokens_wkf", "start", sm_context) - error_sink.extend(sm_context.errors) + selected = self._select_best_paths(sm) + + for path in sm.paths: + error_sink.extend(path.execution_context.errors) + + return MultipleChoices(selected) diff --git a/src/parsers/state_machine.py b/src/parsers/state_machine.py index 466ca8d..15f45a0 100644 --- a/src/parsers/state_machine.py +++ b/src/parsers/state_machine.py @@ -10,317 +10,317 @@ from parsers.tokenizer import Token @dataclass class MetadataToken: - """ - When a concept definition is recognized - We keep track of the start and the end position - MetadataToken is a shortcut for ConceptMetadataToken - """ - metadata: ConceptMetadata # concept that is recognized - start: int # start position in the texts - end: int # end position - resolution_method: Literal["name", "key", "id"] # did we use the name, the id or the key to recognize the concept - parser: str # which parser recognized the concept (SimpleConcepts, Sya, ...) 
- - def __repr__(self): - return f"(MetadataToken metadata={str_concept(self.metadata, drop_name=True)}, " + \ - f"start={self.start}, end={self.end}, method={self.resolution_method}, origin={self.parser})" - - def __eq__(self, other): - if not isinstance(other, MetadataToken): - return False - - return self.metadata.id == other.metadata.id \ - and self.start == other.start \ - and self.end == other.end \ - and self.parser == other.parser - - def __hash__(self): - return hash((self.metadata.id, self.start, self.end, self.parser)) + """ + When a concept definition is recognized + We keep track of the start and the end position + MetadataToken is a shortcut for ConceptMetadataToken + """ + metadata: ConceptMetadata # concept that is recognized + start: int # start position in the texts + end: int # end position + resolution_method: Literal["name", "key", "id"] # did we use the name, the id or the key to recognize the concept + parser: str # which parser recognized the concept (SimpleConcepts, Sya, ...) 
+ + def __repr__(self): + return f"(MetadataToken metadata={str_concept(self.metadata, drop_name=True)}, " + \ + f"start={self.start}, end={self.end}, method={self.resolution_method}, origin={self.parser})" + + def __eq__(self, other): + if not isinstance(other, MetadataToken): + return False + + return self.metadata.id == other.metadata.id \ + and self.start == other.start \ + and self.end == other.end \ + and self.parser == other.parser + + def __hash__(self): + return hash((self.metadata.id, self.start, self.end, self.parser)) @dataclass class UnrecognizedToken: - """ - Class that represents a text that is not recognized (yet) - We keep track of the start and the end position - """ - buffer: str - start: int - end: int + """ + Class that represents a text that is not recognized (yet) + We keep track of the start and the end position + """ + buffer: str + start: int + end: int @dataclass class ConceptToken: - """ - When an already defined concept is found during the parsing - We keep track of the start and the end position - """ - concept: Concept - start: int # start position in the texts - end: int # end position + """ + When an already defined concept is found during the parsing + We keep track of the start and the end position + """ + concept: Concept + start: int # start position in the texts + end: int # end position @dataclass class StateResult: - next_state: str | None - forks: list = None + next_state: str | None + forks: list = None @dataclass class ConceptToRecognize: - """ - Holds information about the concept to recognize - During the parsing, we have a hint on a concept, But we need to finish the parsing to make sure that we are right - """ - metadata: ConceptMetadata - expected: list[tuple] - resolution_method: Literal["name", "key", "id"] # which attribute was used to resolve the concept - - def __repr__(self): - return f"ConceptToRecognize(#{self.metadata.id}, expected={self.expected})" + """ + Holds information about the concept to recognize + 
During the parsing, we have a hint on a concept, But we need to finish the parsing to make sure that we are right + """ + metadata: ConceptMetadata + expected: list[tuple] + resolution_method: Literal["name", "key", "id"] # which attribute was used to resolve the concept + + def __repr__(self): + return f"ConceptToRecognize(#{self.metadata.id}, expected={self.expected})" @dataclass class StateMachineContext: + """ + Internal state of a state machine + """ + # initialization + context: ExecutionContext + parser_input: ParserInput + get_metadata_from_first_token: Any # This is a callback that gives the possible concepts, for a token + other_parsers: list # parsers to call when managing unrecognized tokens + + # attributes used when parsing token + # tokens currently being read + buffer: list[Token] = field(default_factory=list) + buffer_start_pos: int = -1 + + # attributes used when parsing concept + # parameters already recognized + Concept under recognition + concept_to_recognize: ConceptToRecognize | None = None + stack: list = field(default_factory=list) + parameters: list = field(default_factory=list) # it is called 'output' in shunting yard explanations + + # runtime info + result: list = field(default_factory=list) # list of tokens found + errors: list = field(default_factory=list) # error sink + + def get_clones(self, concepts_to_recognize): """ - Internal state of a state machine + Helper function that clone the context when multiple concepts are found + :param concepts_to_recognize: + :return: """ - # initialization - context: ExecutionContext - parser_input: ParserInput - get_metadata_from_first_token: Any # This is a callback that gives the possible concepts, for a token - other_parsers: list # parsers to call when managing unrecognized tokens - - # attributes used when parsing token - # tokens currently being read - buffer: list[Token] = field(default_factory=list) - buffer_start_pos: int = -1 - - # attributes used when parsing concept - # parameters 
already recognized + Concept under recognition - concept_to_recognize: ConceptToRecognize | None = None - stack: list = field(default_factory=list) - parameters: list = field(default_factory=list) # it is called 'output' in shunting yard explanations - - # runtime info - result: list = field(default_factory=list) # list of tokens found - errors: list = field(default_factory=list) # error sink - - def get_clones(self, concepts_to_recognize): - """ - Helper function that clone the context when multiple concepts are found - :param concepts_to_recognize: - :return: - """ - return [StateMachineContext(self.context, - self.parser_input.clone(), - self.get_metadata_from_first_token, - self.other_parsers, - self.buffer.copy(), - self.buffer_start_pos, - concept, - self.stack.copy(), - self.parameters.copy(), - self.result.copy(), - self.errors.copy()) - for concept in concepts_to_recognize] - - def to_debug(self): - return {"pos": self.parser_input.pos, - "token": self.parser_input.token, - "buffer": [token.value for token in self.buffer], - "concept": str_concept(self.concept_to_recognize.metadata) if self.concept_to_recognize else None, - "result": self.result.copy()} + return [StateMachineContext(self.context, + self.parser_input.clone(), + self.get_metadata_from_first_token, + self.other_parsers, + self.buffer.copy(), + self.buffer_start_pos, + concept, + self.stack.copy(), + self.parameters.copy(), + self.result.copy(), + self.errors.copy()) + for concept in concepts_to_recognize] + + def to_debug(self): + return {"pos": self.parser_input.pos, + "token": self.parser_input.token, + "buffer": [token.value for token in self.buffer], + "concept": str_concept(self.concept_to_recognize.metadata) if self.concept_to_recognize else None, + "result": self.result.copy()} class State: - def __init__(self, name, next_states): - self.name = name - self.next_states = next_states - - def run(self, state_context: StateMachineContext) -> StateResult: - pass - - @staticmethod - def 
get_forks(next_state, states_contexts: list[StateMachineContext]): - """ - Create on fork item for every state context - :param next_state: - :type next_state: - :param states_contexts: - :type states_contexts: - :return: - :rtype: - """ - return [(next_state, state_context) for state_context in states_contexts] - - def __repr__(self): - return f"(State '{self.name}' -> {self.next_states})" + def __init__(self, name, next_states): + self.name = name + self.next_states = next_states + + def run(self, state_context: StateMachineContext) -> StateResult: + pass + + @staticmethod + def get_forks(next_state, states_contexts: list[StateMachineContext]): + """ + Create on fork item for every state context + :param next_state: + :type next_state: + :param states_contexts: + :type states_contexts: + :return: + :rtype: + """ + return [(next_state, state_context) for state_context in states_contexts] + + def __repr__(self): + return f"(State '{self.name}' -> {self.next_states})" class Start(State): - def run(self, state_context) -> StateResult: - # Start state - # give some logs and ask for the next state - return StateResult(self.next_states[0]) - - def __repr__(self): - return f"(StartState '{self.name}' -> '{self.next_states[0]}')" + def run(self, state_context) -> StateResult: + # Start state + # give some logs and ask for the next state + return StateResult(self.next_states[0]) + + def __repr__(self): + return f"(StartState '{self.name}' -> '{self.next_states[0]}')" class PrepareReadTokens(State): - def run(self, state_context: StateMachineContext) -> StateResult: - state_context.buffer.clear() - state_context.buffer_start_pos = state_context.parser_input.pos + 1 - return StateResult(self.next_states[0]) + def run(self, state_context: StateMachineContext) -> StateResult: + state_context.buffer.clear() + state_context.buffer_start_pos = state_context.parser_input.pos + 1 + return StateResult(self.next_states[0]) class ReadTokens(State): - def run(self, state_context) -> 
StateResult: - if not state_context.parser_input.next_token(False): - return StateResult("eof") - - # try to get the possible concepts to recognize - concepts = state_context.get_metadata_from_first_token(state_context.context, - state_context.parser_input.token) - - forks = self.get_forks("concepts found", state_context.get_clones(concepts)) if concepts else None - - state_context.buffer.append(state_context.parser_input.token) - return StateResult(self.name, forks) + def run(self, state_context) -> StateResult: + if not state_context.parser_input.next_token(False): + return StateResult("eof") + + # try to get the possible concepts to recognize + concepts = state_context.get_metadata_from_first_token(state_context.context, + state_context.parser_input.token) + + forks = self.get_forks("concepts found", state_context.get_clones(concepts)) if concepts else None + + state_context.buffer.append(state_context.parser_input.token) + return StateResult(self.name, forks) class End(State): - def run(self, state_context) -> StateResult: - return StateResult(None) - - def __repr__(self): - return f"(EndState '{self.name}')" + def run(self, state_context) -> StateResult: + return StateResult(None) + + def __repr__(self): + return f"(EndState '{self.name}')" @dataclass class ExecutionPathHistory: - from_state: str - execution_context_debug: dict - to_state: str = "" - forks: list[tuple] = None - parents: list = None - - def clone(self, parent_path_id): - parents = self.parents.copy() if self.parents else [] - parents.append(parent_path_id) - return ExecutionPathHistory(self.from_state, - self.execution_context_debug.copy(), - self.to_state, - self.forks.copy() if self.forks else None, - parents) - - def __repr__(self): - return "History(from '{0}', to '{1}', using {2}, forks={3}, parents={4}".format( - self.from_state, - self.to_state, - self.execution_context_debug, - len(self.forks) if self.forks else 0, - self.parents) + from_state: str + execution_context_debug: dict + 
to_state: str = "" + forks: list[tuple] = None + parents: list = None + + def clone(self, parent_path_id): + parents = self.parents.copy() if self.parents else [] + parents.append(parent_path_id) + return ExecutionPathHistory(self.from_state, + self.execution_context_debug.copy(), + self.to_state, + self.forks.copy() if self.forks else None, + parents) + + def __repr__(self): + return "History(from '{0}', to '{1}', using {2}, forks={3}, parents={4}".format( + self.from_state, + self.to_state, + self.execution_context_debug, + len(self.forks) if self.forks else 0, + self.parents) @dataclass class ExecutionPath: - path_id: int - execution_context: Any - current_workflow: str - current_state: str - - history: list[ExecutionPathHistory] - ended: bool = False - - def clone(self, path_id, new_execution_path, new_workflow, new_state): - return ExecutionPath(path_id, - new_execution_path, - new_workflow, - new_state, - [h.clone(self.path_id) for h in self.history], - self.ended) - - def __repr__(self): - return f"(Path id={self.path_id}, workflow='{self.current_workflow}', state='{self.current_state}')" - - def get_audit_trail(self): - return [h.from_state for h in self.history] + path_id: int + execution_context: Any + current_workflow: str + current_state: str + + history: list[ExecutionPathHistory] + ended: bool = False + + def clone(self, path_id, new_execution_path, new_workflow, new_state): + return ExecutionPath(path_id, + new_execution_path, + new_workflow, + new_state, + [h.clone(self.path_id) for h in self.history], + self.ended) + + def __repr__(self): + return f"(Path id={self.path_id}, workflow='{self.current_workflow}', state='{self.current_state}')" + + def get_audit_trail(self): + return [h.from_state for h in self.history] class StateMachine: - - def __init__(self, workflows): - self.workflows = workflows - self.paths = None - self.last_path_id = -1 - - def run(self, workflow_name: str, state_name: str, execution_context): - """ - Run the workflow from the 
state given in parameter - :param workflow_name: - :type workflow_name: - :param state_name: - :type state_name: - :param execution_context: - :type execution_context: - :return: - :rtype: - """ - self.last_path_id = -1 # reset the path ids - self.paths = [ExecutionPath(self._get_new_path_id(), - execution_context, - workflow_name, - state_name, - [], - False)] - - while True: - to_review = [p for p in self.paths if not p.ended] - if len(to_review) == 0: - break - - for path in to_review: - # add traceability - history = ExecutionPathHistory(f"{path.current_workflow}:{path.current_state}", - path.execution_context.to_debug()) - path.history.append(history) - - current_state = self.workflows[path.current_workflow][path.current_state] - res = current_state.run(path.execution_context) - - if res.next_state is None: - path.ended = True - continue # not possible to fork ! - - path.current_workflow, path.current_state = self._compute_next_workflow_and_state(path.current_workflow, - res.next_state) - - # update traceability - history.to_state = f"{path.current_workflow}:{path.current_state}" - - # add forks - if res.forks: - new_paths = [] - for next_state, next_execution_context in res.forks: - next_workflow, next_state = self._compute_next_workflow_and_state(path.current_workflow, - next_state) - new_paths.append(path.clone(self._get_new_path_id(), - next_execution_context, - next_workflow, - next_state)) - - self.paths.extend(new_paths) - history.forks = [p.path_id for p in new_paths] - - def _get_new_path_id(self): - self.last_path_id += 1 - return self.last_path_id - - @staticmethod - def _compute_next_workflow_and_state(workflow, state): - if state.startswith("#"): - return state, "start" - else: - return workflow, state + + def __init__(self, workflows): + self.workflows = workflows + self.paths = None + self.last_path_id = -1 + + def run(self, workflow_name: str, state_name: str, execution_context): + """ + Run the workflow from the state given in parameter + 
:param workflow_name: + :type workflow_name: + :param state_name: + :type state_name: + :param execution_context: + :type execution_context: + :return: + :rtype: + """ + self.last_path_id = -1 # reset the path ids + self.paths = [ExecutionPath(self._get_new_path_id(), + execution_context, + workflow_name, + state_name, + [], + False)] + + while True: + to_review = [p for p in self.paths if not p.ended] + if len(to_review) == 0: + break + + for path in to_review: + # add traceability + history = ExecutionPathHistory(f"{path.current_workflow}:{path.current_state}", + path.execution_context.to_debug()) + path.history.append(history) + + current_state = self.workflows[path.current_workflow][path.current_state] + res = current_state.run(path.execution_context) + + if res.next_state is None: + path.ended = True + continue # not possible to fork ! + + path.current_workflow, path.current_state = self._compute_next_workflow_and_state(path.current_workflow, + res.next_state) + + # update traceability + history.to_state = f"{path.current_workflow}:{path.current_state}" + + # add forks + if res.forks: + new_paths = [] + for next_state, next_execution_context in res.forks: + next_workflow, next_state = self._compute_next_workflow_and_state(path.current_workflow, + next_state) + new_paths.append(path.clone(self._get_new_path_id(), + next_execution_context, + next_workflow, + next_state)) + + self.paths.extend(new_paths) + history.forks = [p.path_id for p in new_paths] + + def _get_new_path_id(self): + self.last_path_id += 1 + return self.last_path_id + + @staticmethod + def _compute_next_workflow_and_state(workflow, state): + if state.startswith("#"): + return state, "start" + else: + return workflow, state diff --git a/src/server/authentication.py b/src/server/authentication.py index 2e5cb31..9489a0c 100644 --- a/src/server/authentication.py +++ b/src/server/authentication.py @@ -1,9 +1,10 @@ from datetime import datetime, timedelta +from argon2 import PasswordHasher +from 
argon2.exceptions import VerifyMismatchError from fastapi import Depends, HTTPException from fastapi.security import OAuth2PasswordBearer from jose import JWTError, jwt -from passlib.context import CryptContext from pydantic import BaseModel from starlette import status @@ -13,7 +14,7 @@ SECRET_KEY = "af95f0590411260f1f127bd7ef9a03409aecadf7729b3e6822b11752433b97b5" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 1 -pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") +_ph = PasswordHasher() oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") fake_users_db = { @@ -22,7 +23,7 @@ fake_users_db = { "firstname": "Kodjo", "lastname": "Sossouvi", "email": "kodjo.sossouvi@gmail.com", - "hashed_password": "$2b$12$fb9jW7QUZ9KIEAAtVmWMEOGtehKy9FafUr7Zfrsb3ZMhsBbzZs7SC", # password is kodjo + "hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$77SEG+Po+keKEOY01WNFzQ$J0jJ/XcwIHOsM+uB8/eeoaukZBF1zXtGVPmNHA6c+p4", # password is kodjo "disabled": False, }, } @@ -52,15 +53,16 @@ class UserInDB(User): hashed_password: str -def get_password_hash(password: str): +def get_password_hash(password: str) -> str: + """Hash the password using Argon2id. + + Args: + password: The plaintext password to hash. + + Returns: + The argon2id hash string. 
""" - Hash the password - :param password: - :type password: - :return: - :rtype: - """ - return pwd_context.hash(password) + return _ph.hash(password) def get_user(db, username: str): @@ -74,7 +76,9 @@ def authenticate_user(fake_db, username: str, password: str): if not user: return False - if not pwd_context.verify(password, user.hashed_password): + try: + _ph.verify(user.hashed_password, password) + except VerifyMismatchError: return False return user diff --git a/tests/conftest.py b/tests/conftest.py index 29e62cb..fdb061f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -12,103 +12,103 @@ DEFAULT_ONTOLOGY_NAME = "current_test_" @pytest.fixture(scope="session") def sheerka(): - from core.Sheerka import Sheerka - - sheerka = Sheerka() - sheerka.initialize("mem://") - return sheerka + from core.Sheerka import Sheerka + + sheerka = Sheerka() + sheerka.initialize("mem://") + return sheerka @pytest.fixture(scope="module", autouse=True) def on_new_module(sheerka, request): - """ - For each new module, make sure to create a new ontology - Remove it at the end of the module - :param sheerka: - :type sheerka: - :param request: - :type request: - :return: - :rtype: - """ - from core.Event import Event - from core.ExecutionContext import ExecutionContext, ContextActions - module_name = request.module.__name__.split(".")[-1] - context = ExecutionContext("test", - Event(message=f"Executing module {module_name}"), - sheerka, - ContextActions.TESTING, - None) - - ontology = sheerka.om.push_ontology(module_name) - yield - sheerka.om.revert_ontology(context, ontology) + """ + For each new module, make sure to create a new ontology + Remove it at the end of the module + :param sheerka: + :type sheerka: + :param request: + :type request: + :return: + :rtype: + """ + from core.Event import Event + from core.ExecutionContext import ExecutionContext, ContextActions + module_name = request.module.__name__.split(".")[-1] + context = ExecutionContext("test", + 
Event(message=f"Executing module {module_name}"), + sheerka, + ContextActions.TESTING, + None) + + ontology = sheerka.om.push_ontology(module_name) + yield + sheerka.om.revert_ontology(context, ontology) @pytest.fixture(scope="function") def context(sheerka): - from core.Event import Event - from core.ExecutionContext import ExecutionContext, ContextActions - - return ExecutionContext("test", - Event(message=""), - sheerka, - ContextActions.TESTING, - None) + from core.Event import Event + from core.ExecutionContext import ExecutionContext, ContextActions + + return ExecutionContext("test", + Event(message=""), + sheerka, + ContextActions.TESTING, + None) @pytest.fixture() def next_id(): - return GetNextId() + return GetNextId() @pytest.fixture() def user(): - return User(username="johan doe", email="johan.doe@sheerka.com", firstname="johan", lastname="doe") + return User(username="johan doe", email="johan.doe@sheerka.com", firstname="johan", lastname="doe") class TestUsingFileBasedSheerka: - @pytest.fixture(scope="class") - def sheerka(self): - sheerka = Sheerka() - sheerka.initialize() - return sheerka + @pytest.fixture(scope="class") + def sheerka(self): + sheerka = Sheerka() + sheerka.initialize() + return sheerka class NewOntology: - """ - For some test who may need to declare the same concepts across the tests - """ - from core.ExecutionContext import ExecutionContext - - def __init__(self, context: ExecutionContext, name=None): - self.sheerka = context.sheerka - self.context = context - self.name = name - self.ontology = None - - if self.name is None: - self.name = inspect.stack()[1][3] - - def __enter__(self): - self.ontology = self.sheerka.om.push_ontology(self.name) - return self.ontology - - def __exit__(self, exc_type, exc_val, exc_tb): - self.sheerka.om.revert_ontology(self.context, self.ontology) - return False + """ + For some test who may need to declare the same concepts across the tests + """ + from core.ExecutionContext import ExecutionContext + 
+ def __init__(self, context: ExecutionContext, name=None): + self.sheerka = context.sheerka + self.context = context + self.name = name + self.ontology = None + + if self.name is None: + self.name = inspect.stack()[1][3] + + def __enter__(self): + self.ontology = self.sheerka.om.push_ontology(self.name) + return self.ontology + + def __exit__(self, exc_type, exc_val, exc_tb): + self.sheerka.om.revert_ontology(self.context, self.ontology) + return False def simple_token_compare(a, b): - return a.type == b.type and a.value == b.value + return a.type == b.type and a.value == b.value @contextmanager def comparable_tokens(): - eq = Token.__eq__ - ne = Token.__ne__ - setattr(Token, "__eq__", simple_token_compare) - setattr(Token, "__ne__", lambda a, b: not simple_token_compare(a, b)) - yield - setattr(Token, "__eq__", eq) - setattr(Token, "__ne__", ne) + eq = Token.__eq__ + ne = Token.__ne__ + setattr(Token, "__eq__", simple_token_compare) + setattr(Token, "__ne__", lambda a, b: not simple_token_compare(a, b)) + yield + setattr(Token, "__eq__", eq) + setattr(Token, "__ne__", ne) diff --git a/tests/helpers.py b/tests/helpers.py index 2385691..23ef1dd 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -20,12 +20,12 @@ ATTR_MAP = { class GetNextId: - def __init__(self): - self.seq = 1000 - - def next(self): - self.seq += 1 - return self.seq + def __init__(self): + self.seq = 1000 + + def next(self): + self.seq += 1 + return self.seq def get_concept(name=None, body=None, @@ -47,126 +47,126 @@ def get_concept(name=None, body=None, autouse=False, sequence=None, init_parameters=True) -> Concept: - """ - Create a Concept objet - Caution : 'id' and 'key' are not initialized + """ + Create a Concept objet + Caution : 'id' and 'key' are not initialized - :param name: - :type name: - :param body: - :type body: - :param id: - :type id: - :param key: - :type key: - :param where: - :type where: - :param pre: - :type pre: - :param post: - :type post: - :param ret: - :type ret: - 
:param definition: - :type definition: - :param definition_type: - :type definition_type: - :param desc: - :type desc: - :param props: - :type props: - :param variables: - :type variables: - :param parameters: - :type parameters: - :param bound_body: - :type bound_body: - :param is_builtin: - :type is_builtin: - :param is_unique: - :type is_unique: - :param autouse: - :type autouse: - :param sequence: - :type sequence: - :return: - :rtype: - """ - metadata = get_metadata( - name, body, - id, - key, - where, - pre, - post, - ret, - definition, - definition_type, - desc, - props, - variables, - parameters, - bound_body, - is_builtin, - is_unique, - autouse - ) - if sequence: - metadata.auto_init(sequence) - else: - metadata.digest = ConceptManager.compute_metadata_digest(metadata) - metadata.all_attrs = ConceptManager.compute_all_attrs(metadata.variables) - - if init_parameters and metadata.variables: - metadata.parameters = [v[0] if isinstance(v, tuple) else v for v in metadata.variables] - - return Concept(metadata) + :param name: + :type name: + :param body: + :type body: + :param id: + :type id: + :param key: + :type key: + :param where: + :type where: + :param pre: + :type pre: + :param post: + :type post: + :param ret: + :type ret: + :param definition: + :type definition: + :param definition_type: + :type definition_type: + :param desc: + :type desc: + :param props: + :type props: + :param variables: + :type variables: + :param parameters: + :type parameters: + :param bound_body: + :type bound_body: + :param is_builtin: + :type is_builtin: + :param is_unique: + :type is_unique: + :param autouse: + :type autouse: + :param sequence: + :type sequence: + :return: + :rtype: + """ + metadata = get_metadata( + name, body, + id, + key, + where, + pre, + post, + ret, + definition, + definition_type, + desc, + props, + variables, + parameters, + bound_body, + is_builtin, + is_unique, + autouse + ) + if sequence: + metadata.auto_init(sequence) + else: + metadata.digest = 
ConceptManager.compute_metadata_digest(metadata) + metadata.all_attrs = ConceptManager.compute_all_attrs(metadata.variables) + + if init_parameters and metadata.variables: + metadata.parameters = [v[0] if isinstance(v, tuple) else v for v in metadata.variables] + + return Concept(metadata) def get_evaluated_concept(blueprint: Concept | ConceptMetadata, **kwargs): - """ - Returns a concept where value are already initialized - :param blueprint: - :type blueprint: - :param kwargs: - :type kwargs: - :return: - :rtype: - """ - - def _isfloat(num): - try: - float(num) - return True - except ValueError: - return False - - res = Concept(blueprint.get_metadata()) - - for attr in ATTR_MAP: - source_code = getattr(res.get_metadata(), attr) - if source_code == "" or source_code is None: - value = NotInit - elif source_code[0] in ("'", '"'): - value = source_code[1:-1] - elif source_code in ("True", "False"): - value = source_code == "True" - elif source_code.isdecimal(): - value = int(source_code) - elif _isfloat(source_code): - value = float(source_code) - else: - raise Exception(f"Cannot manage {attr=}, {source_code=}") - - setattr(res, ATTR_MAP[attr], value) - - # force values - for k, v in kwargs.items(): - res.set_value(ATTR_MAP.get(k, k), v) - - res.get_runtime_info().is_evaluated = True - - return res + """ + Returns a concept where value are already initialized + :param blueprint: + :type blueprint: + :param kwargs: + :type kwargs: + :return: + :rtype: + """ + + def _isfloat(num): + try: + float(num) + return True + except ValueError: + return False + + res = Concept(blueprint.get_metadata()) + + for attr in ATTR_MAP: + source_code = getattr(res.get_metadata(), attr) + if source_code == "" or source_code is None: + value = NotInit + elif source_code[0] in ("'", '"'): + value = source_code[1:-1] + elif source_code in ("True", "False"): + value = source_code == "True" + elif source_code.isdecimal(): + value = int(source_code) + elif _isfloat(source_code): + value = 
float(source_code) + else: + raise Exception(f"Cannot manage {attr=}, {source_code=}") + + setattr(res, ATTR_MAP[attr], value) + + # force values + for k, v in kwargs.items(): + res.set_value(ATTR_MAP.get(k, k), v) + + res.get_runtime_info().is_evaluated = True + + return res def get_metadata(name=None, body=None, @@ -188,68 +188,68 @@ def get_metadata(name=None, body=None, autouse=False, digest=None, all_attrs=None): - new_variables = [] - if variables: - for v in variables: - if isinstance(v, tuple): - new_variables.append(v) - else: - new_variables.append((v, NotInit)) - - return ConceptMetadata( - id, - name, - key, - is_builtin, - is_unique, - body, - where, - pre, - post, - ret, - definition, - definition_type, - desc, - autouse, - bound_body, - props or {}, - tuple(new_variables), - parameters or [], - digest, - all_attrs, - ) + new_variables = [] + if variables: + for v in variables: + if isinstance(v, tuple): + new_variables.append(v) + else: + new_variables.append((v, NotInit)) + + return ConceptMetadata( + id, + name, + key, + is_builtin, + is_unique, + body, + where, + pre, + post, + ret, + definition, + definition_type, + desc, + autouse, + bound_body, + props or {}, + tuple(new_variables), + parameters or [], + digest, + all_attrs, + ) def metadata_auto_init(self: ConceptMetadata, sequence) -> ConceptMetadata: - """ - Helper function for the unit tests. 
- This method will be added to the `ConceptMetadata` to ease the writing of the unit tests - It properly initializes the ConceptMetadata - :param self: - :type self: - :param sequence: - :type sequence: - :return: - :rtype: - """ - if not self.id: - self.id = str(sequence.next()) - if not self.key: - self.key = ConceptManager.create_concept_key(self.name, self.definition, self.variables) - if not self.is_unique: - self.is_unique = False - if not self.is_builtin: - self.is_builtin = False - if not self.definition_type: - self.definition_type = DefinitionType.DEFAULT - if not self.all_attrs: - self.all_attrs = ConceptManager.compute_all_attrs(self.variables) - if not self.digest: - self.digest = ConceptManager.compute_metadata_digest(self) - - # Note that I do not automatically update the digest as I don't want to make unnecessary computations - - return self + """ + Helper function for the unit tests. + This method will be added to the `ConceptMetadata` to ease the writing of the unit tests + It properly initializes the ConceptMetadata + :param self: + :type self: + :param sequence: + :type sequence: + :return: + :rtype: + """ + if not self.id: + self.id = str(sequence.next()) + if not self.key: + self.key = ConceptManager.create_concept_key(self.name, self.definition, self.variables) + if not self.is_unique: + self.is_unique = False + if not self.is_builtin: + self.is_builtin = False + if not self.definition_type: + self.definition_type = DefinitionType.DEFAULT + if not self.all_attrs: + self.all_attrs = ConceptManager.compute_all_attrs(self.variables) + if not self.digest: + self.digest = ConceptManager.compute_metadata_digest(self) + + # Note that I do not automatically update the digest as I don't want to make unnecessary computations + + return self def metadata_clone(self: ConceptMetadata, name=None, body=None, @@ -270,75 +270,75 @@ def metadata_clone(self: ConceptMetadata, name=None, body=None, autouse=None, digest=None, all_attrs=None) -> ConceptMetadata: - 
""" - Helper function for the unit tests. - This method will be added to the `ConceptMetadata` to ease the writing of the unit tests - It clones a ConceptMetadata, but can override some attributes if requested - :param self: - :type self: - :param name: - :type name: - :param body: - :type body: - :param key: - :type key: - :param where: - :type where: - :param pre: - :type pre: - :param post: - :type post: - :param ret: - :type ret: - :param definition: - :type definition: - :param definition_type: - :type definition_type: - :param desc: - :type desc: - :param props: - :type props: - :param variables: - :type variables: - :param parameters: - :type parameters: - :param bound_body: - :type bound_body: - :param is_builtin: - :type is_builtin: - :param is_unique: - :type is_unique: - :param autouse: - :type autouse: - :param digest: - :type digest: - :param all_attrs: - :type all_attrs: - :return: - :rtype: - """ - return ConceptMetadata( - id=self.id, - name=self.name if name is None else name, - body=self.body if body is None else body, - key=self.key if key is None else key, - where=self.where if where is None else where, - pre=self.pre if pre is None else pre, - post=self.post if post is None else post, - ret=self.ret if ret is None else ret, - definition=self.definition if definition is None else definition, - definition_type=self.definition_type if definition_type is None else definition_type, - desc=self.desc if desc is None else desc, - props=self.props if props is None else props, - variables=self.variables if variables is None else variables, - parameters=self.parameters if parameters is None else parameters, - bound_body=self.bound_body if bound_body is None else bound_body, - is_builtin=self.is_builtin if is_builtin is None else is_builtin, - is_unique=self.is_unique if is_unique is None else is_unique, - autouse=self.autouse if autouse is None else autouse, - digest=self.digest if digest is None else digest, - all_attrs=self.all_attrs if all_attrs is 
None else all_attrs, - ) + """ + Helper function for the unit tests. + This method will be added to the `ConceptMetadata` to ease the writing of the unit tests + It clones a ConceptMetadata, but can override some attributes if requested + :param self: + :type self: + :param name: + :type name: + :param body: + :type body: + :param key: + :type key: + :param where: + :type where: + :param pre: + :type pre: + :param post: + :type post: + :param ret: + :type ret: + :param definition: + :type definition: + :param definition_type: + :type definition_type: + :param desc: + :type desc: + :param props: + :type props: + :param variables: + :type variables: + :param parameters: + :type parameters: + :param bound_body: + :type bound_body: + :param is_builtin: + :type is_builtin: + :param is_unique: + :type is_unique: + :param autouse: + :type autouse: + :param digest: + :type digest: + :param all_attrs: + :type all_attrs: + :return: + :rtype: + """ + return ConceptMetadata( + id=self.id, + name=self.name if name is None else name, + body=self.body if body is None else body, + key=self.key if key is None else key, + where=self.where if where is None else where, + pre=self.pre if pre is None else pre, + post=self.post if post is None else post, + ret=self.ret if ret is None else ret, + definition=self.definition if definition is None else definition, + definition_type=self.definition_type if definition_type is None else definition_type, + desc=self.desc if desc is None else desc, + props=self.props if props is None else props, + variables=self.variables if variables is None else variables, + parameters=self.parameters if parameters is None else parameters, + bound_body=self.bound_body if bound_body is None else bound_body, + is_builtin=self.is_builtin if is_builtin is None else is_builtin, + is_unique=self.is_unique if is_unique is None else is_unique, + autouse=self.autouse if autouse is None else autouse, + digest=self.digest if digest is None else digest, + 
all_attrs=self.all_attrs if all_attrs is None else all_attrs, + ) # Helpers functions for unit tests @@ -347,175 +347,138 @@ setattr(ConceptMetadata, 'clone', metadata_clone) def get_metadatas(*args, **kwargs): - as_metadatas = [arg if isinstance(arg, ConceptMetadata) else get_metadata(arg) for arg in args] - next_id = kwargs.get("next_id", None) - if next_id: - for metadata in as_metadatas: - metadata_auto_init(metadata, next_id) - - return as_metadatas + as_metadatas = [arg if isinstance(arg, ConceptMetadata) else get_metadata(arg) for arg in args] + next_id = kwargs.get("next_id", None) + if next_id: + for metadata in as_metadatas: + metadata_auto_init(metadata, next_id) + + return as_metadatas def get_concepts(context: ExecutionContext, *concepts, **kwargs) -> list[Concept]: - """ - Simple and quick way to get initialize concepts for a test - :param context: - :param concepts: Concepts to create - :param kwargs: named parameters to tweak the creation of the concepts - use_sheerka : Adds the new concepts to Sheerka. If not simply creates concepts that do not affect Sheerka - sequence : Sequence Manager, to give a correct id to the created concepts - :return: the concepts - """ - res = [] - use_sheerka = kwargs.pop("use_sheerka", False) - sequence = kwargs.pop("sequence", None) - for c in concepts: - if use_sheerka: - c = define_new_concept(context, c) - elif isinstance(c, str): - c = get_concept(c) - - if sequence: - c.get_metadata().auto_init(sequence) - - res.append(c) - - return res + """ + Simple and quick way to get initialize concepts for a test + :param context: + :param concepts: Concepts to create + :param kwargs: named parameters to tweak the creation of the concepts + use_sheerka : Adds the new concepts to Sheerka. 
If not simply creates concepts that do not affect Sheerka + sequence : Sequence Manager, to give a correct id to the created concepts + :return: the concepts + """ + res = [] + use_sheerka = kwargs.pop("use_sheerka", False) + sequence = kwargs.pop("sequence", None) + for c in concepts: + if use_sheerka: + c = define_new_concept(context, c) + elif isinstance(c, str): + c = get_concept(c) + + if sequence: + c.get_metadata().auto_init(sequence) + + res.append(c) + + return res def get_evaluated_concepts(context, *concepts, use_sheerka=False) -> list[Concept]: - if use_sheerka: - return [context.sheerka.evaluate_concept(context, Concept(c.get_metadata())) for c in concepts] - else: - return [get_evaluated_concept(concept) for concept in concepts] + if use_sheerka: + return [context.sheerka.evaluate_concept(context, Concept(c.get_metadata())) for c in concepts] + else: + return [get_evaluated_concept(concept) for concept in concepts] def define_new_concept(context: ExecutionContext, c: str | Concept | ConceptMetadata) -> Concept: - sheerka = context.sheerka - if isinstance(c, str): - retval = sheerka.define_new_concept(context, c) - else: - metadata = c.get_metadata() - retval = sheerka.define_new_concept(context, - metadata.name, - metadata.is_builtin, - metadata.is_unique, - metadata.body, - metadata.where, - metadata.pre, - metadata.post, - metadata.ret, - metadata.definition, - metadata.definition_type, - metadata.autouse, - metadata.bound_body, - metadata.desc, - metadata.props, - metadata.variables, - metadata.parameters) - - assert retval.status - concept = sheerka.newi(retval.value.metadata.id) - return concept + sheerka = context.sheerka + if isinstance(c, str): + retval = sheerka.define_new_concept(context, c) + else: + metadata = c.get_metadata() + retval = sheerka.define_new_concept(context, + metadata.name, + metadata.is_builtin, + metadata.is_unique, + metadata.body, + metadata.where, + metadata.pre, + metadata.post, + metadata.ret, + metadata.definition, 
+ metadata.definition_type, + metadata.autouse, + metadata.bound_body, + metadata.desc, + metadata.props, + metadata.variables, + metadata.parameters) + + assert retval.status + concept = sheerka.newi(retval.value.metadata.id) + return concept def get_file_content(file_name): - with open(file_name) as f: - return f.read() + with open(file_name) as f: + return f.read() def get_parser_input(text): - pi = ParserInput(text) - assert pi.init() - - return pi + pi = ParserInput(text) + assert pi.init() + + return pi def get_from(*args, **kwargs): - """ - Convert the input to fix the positions - :param args: - :type args: - :return: - :rtype: - """ - cache = {} # I keep the name in cache to avoid having to remind it everytime - pos = 0 - res = [] - for item in args: - start = pos - if isinstance(item, MetadataToken): - if item.metadata.name: - cache[item.metadata.id] = item.metadata.name - - tokens = list(Tokenizer(cache[item.metadata.id], yield_eof=False)) - pos += len(tokens) - resolution_method = kwargs.get("resolution_method", item.resolution_method) - parser = kwargs.get("parser", item.parser) - res.append(MetadataToken(item.metadata, start, pos - 1, resolution_method, parser)) - elif isinstance(item, UnrecognizedToken): - tokens = list(Tokenizer(item.buffer, yield_eof=False)) - pos += len(tokens) - res.append(UnrecognizedToken(item.buffer, start, pos - 1)) - - return res + """ + Convert the input to fix the positions + :param args: + :type args: + :return: + :rtype: + """ + cache = {} # I keep the name in cache to avoid having to remind it everytime + pos = 0 + res = [] + for item in args: + start = pos + if isinstance(item, MetadataToken): + if item.metadata.name: + cache[item.metadata.id] = item.metadata.name + + tokens = list(Tokenizer(cache[item.metadata.id], yield_eof=False)) + pos += len(tokens) + resolution_method = kwargs.get("resolution_method", item.resolution_method) + parser = kwargs.get("parser", item.parser) + res.append(MetadataToken(item.metadata, 
start, pos - 1, resolution_method, parser)) + elif isinstance(item, UnrecognizedToken): + tokens = list(Tokenizer(item.buffer, yield_eof=False)) + pos += len(tokens) + res.append(UnrecognizedToken(item.buffer, start, pos - 1)) + + return res def _rv(value, who="Test"): - return ReturnValue(who=who, status=True, value=value) + return ReturnValue(who=who, status=True, value=value) def _rvc(concept_name, who="Test"): - next_id = GetNextId() - concept = get_concept(concept_name, sequence=next_id) - return ReturnValue(who=who, status=True, value=concept) + next_id = GetNextId() + concept = get_concept(concept_name, sequence=next_id) + return ReturnValue(who=who, status=True, value=concept) def _rvf(value, who="Test"): - """ - Return Value False - :param value: - :type value: - :return: - :rtype: - """ - return ReturnValue(who=who, status=False, value=value) + """ + Return Value False + :param value: + :type value: + :return: + :rtype: + """ + return ReturnValue(who=who, status=False, value=value) -def _ut(buffer, start=0, end=-1): - """ - helper to UnrecognizedToken - :param buffer: - :type buffer: - :param start: - :type start: - :param end: - :type end: - :return: - :rtype: - """ - return UnrecognizedToken(buffer, start, end) - - -def _mt(concept_id, - start=0, - end=-1, - resolution_method: Literal["name", "key", "id"] = "id", - parser="simple", - **kwargs): - """ - helper to MetadataToken - :param concept_id: - :type concept_id: - :param start: - :type start: - :param end: - :type end: - :return: - :rtype: - """ - name, _id = unstr_concept(concept_id) - variables = [(k, v) for k, v in kwargs.items()] if kwargs else None - metadata = get_metadata(id=concept_id, variables=variables) if _id is None \ - else get_metadata(id=_id, name=name, variables=variables) - return MetadataToken(metadata, start, end, resolution_method, parser) diff --git a/tests/mockserver.py b/tests/mockserver.py deleted file mode 100644 index 93a6066..0000000 --- a/tests/mockserver.py +++ 
/dev/null @@ -1,62 +0,0 @@ -import logging -from multiprocessing import Process -from time import sleep - -import uvicorn -from fastapi import FastAPI - - -class MockServer: - """ Core application to test. """ - - def __init__(self, endpoints: list[dict]): - """ - - :param endpoints: - :type endpoints: list of {path: '', response:''} - """ - self.api = FastAPI() - - def raise_exception(ex): - raise ex - - # register endpoints - for endpoint in endpoints: - method = endpoint["method"] if "method" in endpoint else "get" - if method == "post": - if "exception" in endpoint: - self.api.post(endpoint["path"])(lambda: raise_exception(endpoint["exception"])) - else: - self.api.post(endpoint["path"])(lambda: endpoint["response"]) - else: - self.api.get(endpoint["path"])(lambda: endpoint["response"]) - - # register shutdown - self.api.on_event("shutdown")(self.close) - - # create the process - self.proc = Process(target=uvicorn.run, - args=(self.api,), - kwargs={ - "host": "127.0.0.1", - "port": 5000, - "log_level": "info"}, - daemon=True) - - async def close(self): - """ Gracefull shutdown. 
""" - logging.warning("Shutting down the app.") - - def start_server(self): - self.proc.start() - sleep(0.1) - - def stop_server(self): - self.proc.terminate() - - def __enter__(self): - self.start_server() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.stop_server() diff --git a/tests/parsers/conftest.py b/tests/parsers/conftest.py new file mode 100644 index 0000000..da5d27e --- /dev/null +++ b/tests/parsers/conftest.py @@ -0,0 +1,105 @@ +from typing import Literal + +from common.utils import str_concept, unstr_concept +from helpers import get_metadata +from parsers.state_machine import MetadataToken, UnrecognizedToken + + +class MetadataTokenForTest(MetadataToken): + def __repr__(self): + res = f"(MetadataTokenForTest metadata={str_concept(self.metadata, drop_name=True)}" + if self.start is not None: + res += f", start={self.start}" + if self.end is not None: + res += f", end={self.end}" + if self.resolution_method is not None: + res += f", method={self.resolution_method}" + if self.parser is not None: + res += f", origin={self.parser}" + + res += ")" + return res + + def __eq__(self, other): + if not isinstance(other, MetadataToken): + return False + + if self.metadata.id != other.metadata.id: + return False + + if self.start is not None and self.start != other.start: + return False + + if self.end is not None and self.end != other.end: + return False + + if self.parser is not None and self.parser != other.parser: + return False + + if self.resolution_method is not None and self.resolution_method != other.resolution_method: + return False + + return True + + +def _ut(buffer, start=0, end=-1): + """ + helper to UnrecognizedToken + :param buffer: + :type buffer: + :param start: + :type start: + :param end: + :type end: + :return: + :rtype: + """ + return UnrecognizedToken(buffer, start, end) + + +def _mt(concept_id, + start=0, + end=-1, + resolution_method: Literal["name", "key", "id"] = "key", + parser="simple", + **kwargs): + """ + 
helper to MetadataToken + :param concept_id: + :type concept_id: + :param start: + :type start: + :param end: + :type end: + :return: + :rtype: + """ + name, _id = unstr_concept(concept_id) + variables = [(k, v) for k, v in kwargs.items()] if kwargs else None + metadata = get_metadata(id=concept_id, variables=variables) if _id is None \ + else get_metadata(id=_id, name=name, variables=variables) + return MetadataTokenForTest(metadata, start, end, resolution_method, parser) + + +def _mtsya(concept_id, + start=0, + end=None, + resolution_method: Literal["name", "key", "id"] = "key", + parser="sya", + **kwargs): + """ + helper to MetadataToken + :param concept_id: + :type concept_id: + :param start: + :type start: + :param end: + :type end: + :return: + :rtype: + """ + name, _id = unstr_concept(concept_id) + variables = [(k, v) for k, v in kwargs.items()] if kwargs else None + metadata = get_metadata(id=concept_id, variables=variables) if _id is None \ + else get_metadata(id=_id, name=name, variables=variables) + return MetadataTokenForTest(metadata, start, end, resolution_method, parser) diff --git a/tests/parsers/test_SimpleConceptsParser.py b/tests/parsers/test_SimpleConceptsParser.py index 79c0c26..204f489 100644 --- a/tests/parsers/test_SimpleConceptsParser.py +++ b/tests/parsers/test_SimpleConceptsParser.py @@ -3,148 +3,149 @@ import pytest from base import BaseTest from conftest import NewOntology from evaluators.base_evaluator import MultipleChoices -from helpers import _mt, _ut, get_concepts, get_from, get_metadata, get_parser_input +from helpers import get_concepts, get_from, get_metadata, get_parser_input from parsers.SimpleConceptsParser import SimpleConceptsParser +from tests.parsers.conftest import _mt, _ut class TestSimpleConceptsParser(BaseTest): - - @pytest.fixture() - def parser(self): - return SimpleConceptsParser() - - @pytest.mark.parametrize("text, expected", [ - ("I am a new concept", [_mt("1003", 0, 8)]), - ("xxx yyy I am a new concept", 
[_ut("xxx yyy ", 0, 3), _mt("1003", 4, 12)]), - ("I am a new concept xxx yyy", [_mt("1003", 0, 8), _ut(" xxx yyy", 9, 12)]), - ("xxx I am a new concept yyy", [_ut("xxx ", 0, 1), _mt("1003", 2, 10), _ut(" yyy", 11, 12)]), - ("c:#1003:", [_mt("1003", 0, 0)]), - ("xxx c:#1003: yyy", [_ut("xxx ", 0, 1), _mt("1003", 2, 2), _ut(" yyy", 3, 4)]), - ("xxx c:I am: yyy", [_ut("xxx ", 0, 1), _mt("1002", 2, 2), _ut(" yyy", 3, 4)]), - (" I am a new concept", [_ut(" ", 0, 0), _mt("1003", 1, 9)]) - ]) - def test_i_can_recognize_a_concept(self, context, parser, text, expected): - with NewOntology(context, "test_i_can_recognize_a_concept"): - get_concepts(context, "I", "I am", "I am a new concept", use_sheerka=True) - - pi = get_parser_input(text) - error_sink = [] - res = parser.parse(context, pi, error_sink) - - assert res == MultipleChoices([expected]) - assert not error_sink - - @pytest.mark.parametrize("text, expected", [ - ("foo", [_mt("1001", 0, 0)]), - ("I am a new concept", [_mt("1001", 0, 8)]) - ]) - def test_i_can_recognize_a_concept_by_its_name_and_its_definition(self, context, parser, text, expected): - with NewOntology(context, "test_i_can_recognize_a_concept_by_its_name_and_its_definition"): - get_concepts(context, get_metadata(name="foo", definition="I am a new concept"), use_sheerka=True) - - pi = get_parser_input(text) - error_sink = [] - res = parser.parse(context, pi, error_sink) - - assert res == MultipleChoices([expected]) - assert not error_sink - - @pytest.mark.parametrize("text, expected", [ - ("long concept name", [_mt("1001", 0, 4)]), - ("I am a new concept", [_mt("1001", 0, 8)]) - ]) - def test_i_can_recognize_a_concept_by_its_name_when_long_name(self, context, parser, text, expected): - with NewOntology(context, "test_i_can_recognize_a_concept_by_its_name_when_long_name"): - get_concepts(context, get_metadata(name="long concept name", definition="I am a new concept"), - use_sheerka=True) - - pi = get_parser_input(text) - error_sink = [] - res = 
parser.parse(context, pi, error_sink) - - assert res == MultipleChoices([expected]) - assert not error_sink - - def test_i_can_parse_a_sequence_of_concept(self, context, parser): - with NewOntology(context, "test_i_can_parse_a_sequence_of_concept"): - get_concepts(context, "foo bar", "baz", "qux", use_sheerka=True) - - pi = get_parser_input("foo bar baz foo, qux") - error_sink = [] - res = parser.parse(context, pi, error_sink) - - expected = [_mt("1001", 0, 2), - _ut(" ", 3, 3), - _mt("1002", 4, 4), - _ut(" foo, ", 5, 8), - _mt("1003", 9, 9)] - - assert res == MultipleChoices([expected]) - assert not error_sink - - def test_i_can_detect_multiple_choices(self, context, parser): - with NewOntology(context, "test_i_can_detect_multiple_choices"): - get_concepts(context, "foo bar", "bar baz", use_sheerka=True) - - pi = get_parser_input("foo bar baz") - error_sink = [] - res = parser.parse(context, pi, error_sink) - - expected1 = [_mt("1001", 0, 2), _ut(" baz", 3, 4)] - expected2 = [_ut("foo ", 0, 1), _mt("1002", 2, 4)] - - assert res == MultipleChoices([expected1, expected2]) - assert not error_sink - - def test_i_can_detect_multiple_choices_2(self, context, parser): - with NewOntology(context, "test_i_can_detect_multiple_choices_2"): - get_concepts(context, "one two", "one", "two", use_sheerka=True) - - pi = get_parser_input("one two") - error_sink = [] - res = parser.parse(context, pi, error_sink) - - expected1 = [_mt("1001", 0, 2)] - expected2 = [_mt("1002", 0, 0), _ut(" ", 1, 1), _mt("1003", 2, 2)] - - assert res == MultipleChoices([expected1, expected2]) - assert not error_sink - - def test_i_can_detect_multiple_choices_3(self, context, parser): - with NewOntology(context, "test_i_can_detect_multiple_choices_2"): - get_concepts(context, "one two", "one", "two", use_sheerka=True) - - pi = get_parser_input("one two xxx one two") - error_sink = [] - res = parser.parse(context, pi, error_sink) - - e1 = get_from(_mt("c:one two#1001:"), _ut(" xxx "), _mt("c:#1001:")) - 
e2 = get_from(_mt("c:one#1002:"), _ut(" "), _mt("c:two#1003:"), _ut(" xxx "), _mt("c:one two#1001:")) - e3 = get_from(_mt("c:one two#1001:"), _ut(" xxx "), _mt("c:one#1002:"), _ut(" "), _mt("c:two#1003:")) - e4 = get_from(_mt("c:one#1002:"), _ut(" "), _mt("c:two#1003:"), _ut(" xxx "), _mt("c:#1002:"), _ut(" "), - _mt("c:#1003:")) - - assert res == MultipleChoices([e1, e2, e3, e4]) - assert not error_sink - - def test_nothing_is_return_is_no_concept_is_recognized(self, context, parser): - pi = get_parser_input("one two three") - error_sink = [] - res = parser.parse(context, pi, error_sink) - - assert res == MultipleChoices([]) - - def test_i_can_manage_attribute_reference(self, context, parser): - with NewOntology(context, "test_i_can_detect_multiple_choices_2"): - get_concepts(context, "foo", "i am a concept", use_sheerka=True) - - pi = get_parser_input("foo.attribute") - error_sink = [] - res = parser.parse(context, pi, error_sink) - expected = [_mt("1001", 0, 0), _ut(".attribute", 1, 2)] - assert res == MultipleChoices([expected]) - - pi = get_parser_input("i am a concept.attribute") - res = parser.parse(context, pi, error_sink) - expected = [_mt("1002", 0, 6), _ut(".attribute", 7, 8)] - assert res == MultipleChoices([expected]) + + @pytest.fixture() + def parser(self): + return SimpleConceptsParser() + + @pytest.mark.parametrize("text, expected", [ + ("I am a new concept", [_mt("1003", 0, 8)]), + ("xxx yyy I am a new concept", [_ut("xxx yyy ", 0, 3), _mt("1003", 4, 12)]), + ("I am a new concept xxx yyy", [_mt("1003", 0, 8), _ut(" xxx yyy", 9, 12)]), + ("xxx I am a new concept yyy", [_ut("xxx ", 0, 1), _mt("1003", 2, 10), _ut(" yyy", 11, 12)]), + ("c:#1003:", [_mt("1003", 0, 0, resolution_method="id")]), + ("xxx c:#1003: yyy", [_ut("xxx ", 0, 1), _mt("1003", 2, 2, resolution_method="id"), _ut(" yyy", 3, 4)]), + ("xxx c:I am: yyy", [_ut("xxx ", 0, 1), _mt("1002", 2, 2, resolution_method="name"), _ut(" yyy", 3, 4)]), + (" I am a new concept", [_ut(" ", 0, 0), 
_mt("1003", 1, 9)]) + ]) + def test_i_can_recognize_a_concept(self, context, parser, text, expected): + with NewOntology(context, "test_i_can_recognize_a_concept"): + get_concepts(context, "I", "I am", "I am a new concept", use_sheerka=True) + + pi = get_parser_input(text) + error_sink = [] + res = parser.parse(context, pi, error_sink) + + assert res == MultipleChoices([expected]) + assert not error_sink + + @pytest.mark.parametrize("text, expected", [ + ("foo", [_mt("1001", 0, 0, resolution_method="name")]), + ("I am a new concept", [_mt("1001", 0, 8)]) + ]) + def test_i_can_recognize_a_concept_by_its_name_and_its_definition(self, context, parser, text, expected): + with NewOntology(context, "test_i_can_recognize_a_concept_by_its_name_and_its_definition"): + get_concepts(context, get_metadata(name="foo", definition="I am a new concept"), use_sheerka=True) + + pi = get_parser_input(text) + error_sink = [] + res = parser.parse(context, pi, error_sink) + + assert res == MultipleChoices([expected]) + assert not error_sink + + @pytest.mark.parametrize("text, expected", [ + ("long concept name", [_mt("1001", 0, 4, resolution_method="name")]), + ("I am a new concept", [_mt("1001", 0, 8)]) + ]) + def test_i_can_recognize_a_concept_by_its_name_when_long_name(self, context, parser, text, expected): + with NewOntology(context, "test_i_can_recognize_a_concept_by_its_name_when_long_name"): + get_concepts(context, get_metadata(name="long concept name", definition="I am a new concept"), + use_sheerka=True) + + pi = get_parser_input(text) + error_sink = [] + res = parser.parse(context, pi, error_sink) + + assert res == MultipleChoices([expected]) + assert not error_sink + + def test_i_can_parse_a_sequence_of_concept(self, context, parser): + with NewOntology(context, "test_i_can_parse_a_sequence_of_concept"): + get_concepts(context, "foo bar", "baz", "qux", use_sheerka=True) + + pi = get_parser_input("foo bar baz foo, qux") + error_sink = [] + res = parser.parse(context, pi, 
error_sink) + + expected = [_mt("1001", 0, 2), + _ut(" ", 3, 3), + _mt("1002", 4, 4), + _ut(" foo, ", 5, 8), + _mt("1003", 9, 9)] + + assert res == MultipleChoices([expected]) + assert not error_sink + + def test_i_can_detect_multiple_choices(self, context, parser): + with NewOntology(context, "test_i_can_detect_multiple_choices"): + get_concepts(context, "foo bar", "bar baz", use_sheerka=True) + + pi = get_parser_input("foo bar baz") + error_sink = [] + res = parser.parse(context, pi, error_sink) + + expected1 = [_mt("1001", 0, 2), _ut(" baz", 3, 4)] + expected2 = [_ut("foo ", 0, 1), _mt("1002", 2, 4)] + + assert res == MultipleChoices([expected1, expected2]) + assert not error_sink + + def test_i_can_detect_multiple_choices_2(self, context, parser): + with NewOntology(context, "test_i_can_detect_multiple_choices_2"): + get_concepts(context, "one two", "one", "two", use_sheerka=True) + + pi = get_parser_input("one two") + error_sink = [] + res = parser.parse(context, pi, error_sink) + + expected1 = [_mt("1001", 0, 2)] + expected2 = [_mt("1002", 0, 0), _ut(" ", 1, 1), _mt("1003", 2, 2)] + + assert res == MultipleChoices([expected1, expected2]) + assert not error_sink + + def test_i_can_detect_multiple_choices_3(self, context, parser): + with NewOntology(context, "test_i_can_detect_multiple_choices_2"): + get_concepts(context, "one two", "one", "two", use_sheerka=True) + + pi = get_parser_input("one two xxx one two") + error_sink = [] + res = parser.parse(context, pi, error_sink) + + e1 = get_from(_mt("c:one two#1001:"), _ut(" xxx "), _mt("c:#1001:")) + e2 = get_from(_mt("c:one#1002:"), _ut(" "), _mt("c:two#1003:"), _ut(" xxx "), _mt("c:one two#1001:")) + e3 = get_from(_mt("c:one two#1001:"), _ut(" xxx "), _mt("c:one#1002:"), _ut(" "), _mt("c:two#1003:")) + e4 = get_from(_mt("c:one#1002:"), _ut(" "), _mt("c:two#1003:"), _ut(" xxx "), _mt("c:#1002:"), _ut(" "), + _mt("c:#1003:")) + + assert res == MultipleChoices([e1, e2, e3, e4]) + assert not error_sink + + def 
test_nothing_is_return_is_no_concept_is_recognized(self, context, parser): + pi = get_parser_input("one two three") + error_sink = [] + res = parser.parse(context, pi, error_sink) + + assert res == MultipleChoices([]) + + def test_i_can_manage_attribute_reference(self, context, parser): + with NewOntology(context, "test_i_can_detect_multiple_choices_2"): + get_concepts(context, "foo", "i am a concept", use_sheerka=True) + + pi = get_parser_input("foo.attribute") + error_sink = [] + res = parser.parse(context, pi, error_sink) + expected = [_mt("1001", 0, 0), _ut(".attribute", 1, 2)] + assert res == MultipleChoices([expected]) + + pi = get_parser_input("i am a concept.attribute") + res = parser.parse(context, pi, error_sink) + expected = [_mt("1002", 0, 6), _ut(".attribute", 7, 8)] + assert res == MultipleChoices([expected]) diff --git a/tests/parsers/test_SyaConceptsParser.py b/tests/parsers/test_SyaConceptsParser.py index 9f329b5..b594553 100644 --- a/tests/parsers/test_SyaConceptsParser.py +++ b/tests/parsers/test_SyaConceptsParser.py @@ -3,91 +3,92 @@ import pytest from base import BaseTest from conftest import NewOntology, comparable_tokens from evaluators.base_evaluator import MultipleChoices -from helpers import _mt, get_concept, get_concepts, get_parser_input +from helpers import get_concept, get_concepts, get_parser_input from parsers.SyaConceptsParser import SyaConceptsParser from parsers.tokenizer import Tokenizer +from tests.parsers.conftest import _mtsya class TestSyaConceptsParser(BaseTest): - - @pytest.fixture() - def parser(self): - return SyaConceptsParser() - - @pytest.mark.parametrize("concept_key, expected_list", [ - ["a long token name", [("a long token name", 0)]], - ["__var__0 __var__1 __var__2", [("", 3)]], - ["__var__0 __var__1 prefixed", [(" prefixed", 2)]], - ["suffixed __var__0 __var__1", [("suffixed ", 0), ["", 2]]], - ["__var__0 __var__1 infixed __var__0 __var__1", [(" infixed ", 2), ["", 2]]], - ["if __var__0 __var__1 then __var__2 
end", [("if ", 0), (" then ", 2), (" end", 1)]] - ]) - def test_i_can_initialize_expected_parameters(self, parser, concept_key, expected_list): - resolved_expected_list = [(list(Tokenizer(source, yield_eof=False)), nb) for source, nb in expected_list] - actual = parser._get_expected_tokens(concept_key) - - with comparable_tokens(): - assert actual == resolved_expected_list - - @pytest.mark.parametrize("concept", [ - get_concept("a plus b", variables=["a", "b"]), - get_concept("add a b", variables=["a", "b"]), - get_concept("a b add", variables=["a", "b"]), - ]) - def test_i_can_parse_a_simple_case(self, context, parser, concept): - with NewOntology(context, "test_i_can_parse_a_simple_case"): - get_concepts(context, concept, use_sheerka=True) - - pi = get_parser_input("1 plus 2") - error_sink = [] - res = parser.parse(context, pi, error_sink) - - expected = [_mt("1001", a="1 ", b=" 2")] - assert res == MultipleChoices([expected]) - assert not error_sink - - def test_i_can_parse_long_names_concept(self, context, parser): - with NewOntology(context, "test_i_can_parse_a_simple_case"): - get_concepts(context, get_concept("a long named concept b", variables=["a", "b"]), use_sheerka=True) - - pi = get_parser_input("1 long named concept 2") - error_sink = [] - res = parser.parse(context, pi, error_sink) - - expected = [_mt("1001", a="1 ", b=" 2")] - assert res == MultipleChoices([expected]) - assert not error_sink - - def test_i_can_parse_sequence(self, context, parser): - with NewOntology(context, "test_i_can_parse_sequence"): - get_concepts(context, get_concept("a plus b", variables=["a", "b"]), use_sheerka=True) - - pi = get_parser_input("1 plus 2 3 plus 7") - error_sink = [] - res = parser.parse(context, pi, error_sink) - - expected = [[_mt("1001", a="1 ", b=" 2")], [_mt("1001", a=" 3 ", b=" 7")]] - assert res == MultipleChoices(expected) - assert not error_sink - - def test_not_enough_parameters(self, context, parser): - with NewOntology(context, 
"test_not_enough_parameters"): - get_concepts(context, get_concept("a plus b", variables=["a", "b"]), use_sheerka=True) - - pi = get_parser_input("1 plus 2 3 plus 7") - error_sink = [] - res = parser.parse(context, pi, error_sink) - - expected = [[_mt("1001", a="1 ", b=" 2")], [_mt("1001", a=" 3 ", b=" 7")]] - assert res == MultipleChoices(expected) - assert not error_sink - - def test_i_can_detect_when_name_does_not_match(self, context, parser): - with NewOntology(context, "test_i_can_detect_when_name_does_not_match"): - get_concepts(context, get_concept("a long named concept b", variables=["a", "b"]), use_sheerka=True) - - pi = get_parser_input("1 long named mismatch 2") - error_sink = [] - res = parser.parse(context, pi, error_sink) - - assert error_sink + + @pytest.fixture() + def parser(self): + return SyaConceptsParser() + + @pytest.mark.parametrize("concept_key, expected_list", [ + ["a long token name", [("a long token name", 0)]], + ["__var__0 __var__1 __var__2", [("", 3)]], + ["__var__0 __var__1 prefixed", [(" prefixed", 2)]], + ["suffixed __var__0 __var__1", [("suffixed ", 0), ["", 2]]], + ["__var__0 __var__1 infixed __var__0 __var__1", [(" infixed ", 2), ["", 2]]], + ["if __var__0 __var__1 then __var__2 end", [("if ", 0), (" then ", 2), (" end", 1)]] + ]) + def test_i_can_initialize_expected_parameters(self, parser, concept_key, expected_list): + resolved_expected_list = [(list(Tokenizer(source, yield_eof=False)), nb) for source, nb in expected_list] + actual = parser._get_expected_tokens(concept_key) + + with comparable_tokens(): + assert actual == resolved_expected_list + + @pytest.mark.parametrize("concept, _input", [ + (get_concept("a plus b", variables=["a", "b"]), "1 plus 2"), + (get_concept("add a b", variables=["a", "b"]), "add 1 2"), + (get_concept("a b add", variables=["a", "b"]), "1 2 add") + ]) + def test_i_can_parse_a_simple_case(self, context, parser, concept, _input): + with NewOntology(context, "test_i_can_parse_a_simple_case"): + 
get_concepts(context, concept, use_sheerka=True) + + pi = get_parser_input(_input) + error_sink = [] + res = parser.parse(context, pi, error_sink) + + expected = [_mtsya("1001", a="1 ", b=" 2")] + assert res == MultipleChoices([expected]) + assert not error_sink + + def test_i_can_parse_long_names_concept(self, context, parser): + with NewOntology(context, "test_i_can_parse_a_simple_case"): + get_concepts(context, get_concept("a long named concept b", variables=["a", "b"]), use_sheerka=True) + + pi = get_parser_input("1 long named concept 2") + error_sink = [] + res = parser.parse(context, pi, error_sink) + + expected = [_mtsya("1001", a="1 ", b=" 2")] + assert res == MultipleChoices([expected]) + assert not error_sink + + def test_i_can_parse_sequence(self, context, parser): + with NewOntology(context, "test_i_can_parse_sequence"): + get_concepts(context, get_concept("a plus b", variables=["a", "b"]), use_sheerka=True) + + pi = get_parser_input("1 plus 2 3 plus 7") + error_sink = [] + res = parser.parse(context, pi, error_sink) + + expected = [[_mtsya("1001", a="1 ", b=" 2")], [_mtsya("1001", a=" 3 ", b=" 7")]] + assert res == MultipleChoices(expected) + assert not error_sink + + def test_not_enough_parameters(self, context, parser): + with NewOntology(context, "test_not_enough_parameters"): + get_concepts(context, get_concept("a plus b", variables=["a", "b"]), use_sheerka=True) + + pi = get_parser_input("1 plus ") + error_sink = [] + res = parser.parse(context, pi, error_sink) + + expected = [] + assert res == MultipleChoices(expected) + assert error_sink + + def test_i_can_detect_when_name_does_not_match(self, context, parser): + with NewOntology(context, "test_i_can_detect_when_name_does_not_match"): + get_concepts(context, get_concept("a long named concept b", variables=["a", "b"]), use_sheerka=True) + + pi = get_parser_input("1 long named mismatch 2") + error_sink = [] + res = 
parser.parse(context, pi, error_sink) + + assert error_sink diff --git a/tests/test_client.py b/tests/test_client.py index 93e2d74..38c9103 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,11 +1,11 @@ +from unittest.mock import MagicMock, patch + from fastapi import HTTPException from starlette import status from client import SheerkaClient, parse_arguments -from mockserver import MockServer -# @pytest.mark.skip("too long") class TestSheerkaClient: def test_i_can_start_with_a_default_hostname(self): parsed = parse_arguments([]) @@ -41,7 +41,11 @@ class TestSheerkaClient: assert res.message == "Connection refused." def test_i_can_manage_when_resource_is_not_found(self): - with MockServer([]): + mock_response = MagicMock() + mock_response.__bool__ = MagicMock(return_value=False) + mock_response.text = '{"detail":"Not Found"}' + + with patch("requests.get", return_value=mock_response): client = SheerkaClient("http://localhost", 5000) res = client.check_url() @@ -49,29 +53,25 @@ class TestSheerkaClient: assert res.message == '{"detail":"Not Found"}' def test_i_can_connect_to_a_server(self): - with MockServer([{ - "path": "/", - "response": "Hello world" - }]): + mock_response = MagicMock() + mock_response.__bool__ = MagicMock(return_value=True) + mock_response.text = '"Hello world"' + + with patch("requests.get", return_value=mock_response): client = SheerkaClient("http://localhost", 5000) res = client.check_url() + assert res.status assert res.message == '"Hello world"' def test_i_can_manage_when_authentication_fails(self): - with MockServer([{ - "path": "/", - "response": "Hello world" - }, { - "method": "post", - "path": "/token", - "exception": HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Incorrect username or password", - headers={"WWW-Authenticate": "Bearer"}, - ) - }]): + mock_response = MagicMock() + mock_response.__bool__ = MagicMock(return_value=False) + mock_response.json.return_value = {"detail": "Incorrect username 
or password"} + + with patch("requests.post", return_value=mock_response): client = SheerkaClient("http://localhost", 5000) res = client.connect("username", "wrong_password") + assert not res.status - assert res.message == 'Incorrect username or password' + assert res.message == "Incorrect username or password" diff --git a/tests/test_helpers.py b/tests/test_helpers.py index 394318d..6bd6774 100644 --- a/tests/test_helpers.py +++ b/tests/test_helpers.py @@ -2,279 +2,280 @@ import pytest from common.global_symbols import NotInit from core.concept import Concept, ConceptDefaultProps, ConceptMetadata, DefinitionType -from helpers import GetNextId, _mt, _ut, get_concept, get_concepts, get_evaluated_concept, get_from, get_metadata, \ - get_metadatas +from helpers import GetNextId, get_concept, get_concepts, get_evaluated_concept, get_from, get_metadata, \ + get_metadatas +from tests.parsers.conftest import _mt, _ut def test_i_can_get_default_value_when_get_metadata(): - metadata = get_metadata() - assert metadata.id is None - assert metadata.name is None - assert metadata.name is None - assert metadata.body is None - assert metadata.id is None - assert metadata.key is None - assert metadata.where is None - assert metadata.pre is None - assert metadata.post is None - assert metadata.ret is None - assert metadata.definition is None - assert metadata.definition_type == DefinitionType.DEFAULT - assert metadata.desc is None - assert metadata.props == {} - assert metadata.variables == tuple() - assert metadata.parameters == [] - assert metadata.bound_body is None - assert metadata.is_builtin is False - assert metadata.is_unique is False - assert metadata.autouse is False + metadata = get_metadata() + assert metadata.id is None + assert metadata.name is None + assert metadata.name is None + assert metadata.body is None + assert metadata.id is None + assert metadata.key is None + assert metadata.where is None + assert metadata.pre is None + assert metadata.post is None + assert 
metadata.ret is None + assert metadata.definition is None + assert metadata.definition_type == DefinitionType.DEFAULT + assert metadata.desc is None + assert metadata.props == {} + assert metadata.variables == tuple() + assert metadata.parameters == [] + assert metadata.bound_body is None + assert metadata.is_builtin is False + assert metadata.is_unique is False + assert metadata.autouse is False def test_i_can_use_shortcut_to_declare_variables(): - metadata = get_metadata(variables=(("var1", NotInit), ("var2", "value"))) - assert metadata.variables == (("var1", NotInit), ("var2", "value")) # default behaviour - - metadata = get_metadata(variables=[("var1", NotInit), ("var2", "value")]) - assert metadata.variables == (("var1", NotInit), ("var2", "value")) # lists are transformed into tuples - - metadata = get_metadata(variables=["var1", "var2"]) - assert metadata.variables == (("var1", NotInit), ("var2", NotInit)) # expanded + metadata = get_metadata(variables=(("var1", NotInit), ("var2", "value"))) + assert metadata.variables == (("var1", NotInit), ("var2", "value")) # default behaviour + + metadata = get_metadata(variables=[("var1", NotInit), ("var2", "value")]) + assert metadata.variables == (("var1", NotInit), ("var2", "value")) # lists are transformed into tuples + + metadata = get_metadata(variables=["var1", "var2"]) + assert metadata.variables == (("var1", NotInit), ("var2", NotInit)) # expanded def test_i_can_clone(): - metadata = ConceptMetadata( - "id", - "name", - "key", - True, - True, - "body", - "where", - "pre", - "post", - "ret", - "definition", - DefinitionType.BNF, - "desc", - True, - "bound_body", - {"prop": "value"}, - (("variable", "value"),), - ("p1",), - "digest", - ("all_attr",), - ) - - clone = metadata.clone() - for attr, value in vars(metadata).items(): - clone_value = getattr(clone, attr) - assert clone_value == value + metadata = ConceptMetadata( + "id", + "name", + "key", + True, + True, + "body", + "where", + "pre", + "post", + "ret", 
+ "definition", + DefinitionType.BNF, + "desc", + True, + "bound_body", + {"prop": "value"}, + (("variable", "value"),), + ("p1",), + "digest", + ("all_attr",), + ) + + clone = metadata.clone() + for attr, value in vars(metadata).items(): + clone_value = getattr(clone, attr) + assert clone_value == value def test_i_can_override_values_when_i_clone_metadata(): - metadata = get_metadata() - assert metadata.clone(name="new_name").name == "new_name" - assert metadata.clone(body="new_body").body == "new_body" - assert metadata.clone(key="new_key").key == "new_key" - assert metadata.clone(where="new_where").where == "new_where" - assert metadata.clone(pre="new_pre").pre == "new_pre" - assert metadata.clone(post="new_post").post == "new_post" - assert metadata.clone(ret="new_ret").ret == "new_ret" - assert metadata.clone(definition="new_definition").definition == "new_definition" - assert metadata.clone(definition_type="new_definition_type").definition_type == "new_definition_type" - assert metadata.clone(desc="new_desc").desc == "new_desc" - assert metadata.clone(props="new_props").props == "new_props" - assert metadata.clone(variables="new_variables").variables == "new_variables" - assert metadata.clone(parameters="new_parameters").parameters == "new_parameters" - assert metadata.clone(bound_body="new_bound_body").bound_body == "new_bound_body" - assert metadata.clone(is_builtin="new_is_builtin").is_builtin == "new_is_builtin" - assert metadata.clone(is_unique="new_is_unique").is_unique == "new_is_unique" - assert metadata.clone(autouse="new_autouse").autouse == "new_autouse" - assert metadata.clone(digest="new_digest").digest == "new_digest" - assert metadata.clone(all_attrs="new_all_attrs").all_attrs == "new_all_attrs" + metadata = get_metadata() + assert metadata.clone(name="new_name").name == "new_name" + assert metadata.clone(body="new_body").body == "new_body" + assert metadata.clone(key="new_key").key == "new_key" + assert metadata.clone(where="new_where").where 
== "new_where" + assert metadata.clone(pre="new_pre").pre == "new_pre" + assert metadata.clone(post="new_post").post == "new_post" + assert metadata.clone(ret="new_ret").ret == "new_ret" + assert metadata.clone(definition="new_definition").definition == "new_definition" + assert metadata.clone(definition_type="new_definition_type").definition_type == "new_definition_type" + assert metadata.clone(desc="new_desc").desc == "new_desc" + assert metadata.clone(props="new_props").props == "new_props" + assert metadata.clone(variables="new_variables").variables == "new_variables" + assert metadata.clone(parameters="new_parameters").parameters == "new_parameters" + assert metadata.clone(bound_body="new_bound_body").bound_body == "new_bound_body" + assert metadata.clone(is_builtin="new_is_builtin").is_builtin == "new_is_builtin" + assert metadata.clone(is_unique="new_is_unique").is_unique == "new_is_unique" + assert metadata.clone(autouse="new_autouse").autouse == "new_autouse" + assert metadata.clone(digest="new_digest").digest == "new_digest" + assert metadata.clone(all_attrs="new_all_attrs").all_attrs == "new_all_attrs" def test_i_cannot_change_the_id_when_cloning(): - with pytest.raises(TypeError): - metadata = get_metadata() - metadata.clone(id="new_id") + with pytest.raises(TypeError): + metadata = get_metadata() + metadata.clone(id="new_id") def test_i_can_auto_init(): - next_id = GetNextId() - metadata = get_metadata("a plus b", body="a + b", variables=["a", "b"]).auto_init(next_id) - - assert metadata.name == "a plus b" - assert metadata.id == "1001" - assert metadata.key == "__var__0 plus __var__1" - assert metadata.all_attrs == ('#where#', '#pre#', '#post#', '#body#', '#ret#', 'a', 'b') - assert metadata.is_unique is False - assert metadata.is_builtin is False - assert metadata.definition_type is DefinitionType.DEFAULT - assert metadata.digest == '9e058bc1261d1e2c785889147066ce89960fd6844db5bb6f1d1d809a8eb790b7' + next_id = GetNextId() + metadata = get_metadata("a 
plus b", body="a + b", variables=["a", "b"]).auto_init(next_id) + + assert metadata.name == "a plus b" + assert metadata.id == "1001" + assert metadata.key == "__var__0 plus __var__1" + assert metadata.all_attrs == ('#where#', '#pre#', '#post#', '#body#', '#ret#', 'a', 'b') + assert metadata.is_unique is False + assert metadata.is_builtin is False + assert metadata.definition_type is DefinitionType.DEFAULT + assert metadata.digest == '9e058bc1261d1e2c785889147066ce89960fd6844db5bb6f1d1d809a8eb790b7' def test_sequences_are_incremented_when_multiples_call(): - next_id = GetNextId() - assert get_metadata("foo").auto_init(next_id).id == "1001" - assert get_metadata("bar").auto_init(next_id).id == "1002" + next_id = GetNextId() + assert get_metadata("foo").auto_init(next_id).id == "1001" + assert get_metadata("bar").auto_init(next_id).id == "1002" def test_i_can_get_multiple_metadatas(): - res = get_metadatas("foo", get_metadata("bar", body="body")) - - assert len(res) == 2 - - metadata = res[0] - assert isinstance(metadata, ConceptMetadata) - assert metadata.name == "foo" - assert metadata.body is None - assert metadata.key is None - assert metadata.id is None - - metadata = res[1] - assert isinstance(metadata, ConceptMetadata) - assert metadata.name == "bar" - assert metadata.body == "body" - assert metadata.key is None - assert metadata.id is None + res = get_metadatas("foo", get_metadata("bar", body="body")) + + assert len(res) == 2 + + metadata = res[0] + assert isinstance(metadata, ConceptMetadata) + assert metadata.name == "foo" + assert metadata.body is None + assert metadata.key is None + assert metadata.id is None + + metadata = res[1] + assert isinstance(metadata, ConceptMetadata) + assert metadata.name == "bar" + assert metadata.body == "body" + assert metadata.key is None + assert metadata.id is None def test_i_can_get_multiple_already_initialized_metadatas(): - res = get_metadatas("foo", get_metadata("bar", body="body"), next_id=GetNextId()) - - assert 
len(res) == 2 - - metadata = res[0] - assert isinstance(metadata, ConceptMetadata) - assert metadata.name == "foo" - assert metadata.body is None - assert metadata.key == "foo" - assert metadata.id == "1001" - - metadata = res[1] - assert isinstance(metadata, ConceptMetadata) - assert metadata.name == "bar" - assert metadata.body == "body" - assert metadata.key == "bar" - assert metadata.id == "1002" + res = get_metadatas("foo", get_metadata("bar", body="body"), next_id=GetNextId()) + + assert len(res) == 2 + + metadata = res[0] + assert isinstance(metadata, ConceptMetadata) + assert metadata.name == "foo" + assert metadata.body is None + assert metadata.key == "foo" + assert metadata.id == "1001" + + metadata = res[1] + assert isinstance(metadata, ConceptMetadata) + assert metadata.name == "bar" + assert metadata.body == "body" + assert metadata.key == "bar" + assert metadata.id == "1002" def test_i_can_get_a_concept(): - foo = get_concept("foo", variables=("var1",)) - - assert isinstance(foo, Concept) - assert foo.name == "foo" - assert foo.key is None - assert foo.id is None - assert foo.all_attrs() == ('#where#', '#pre#', '#post#', '#body#', '#ret#', 'var1') + foo = get_concept("foo", variables=("var1",)) + + assert isinstance(foo, Concept) + assert foo.name == "foo" + assert foo.key is None + assert foo.id is None + assert foo.all_attrs() == ('#where#', '#pre#', '#post#', '#body#', '#ret#', 'var1') def test_i_can_request_basic_initialization_when_getting_a_concept(): - next_id = GetNextId() - foo = get_concept("foo", variables=("var1",), sequence=next_id) - - assert foo.name == "foo" - assert foo.key == "foo" - assert foo.id == "1001" - assert foo.all_attrs() == ('#where#', '#pre#', '#post#', '#body#', '#ret#', 'var1') + next_id = GetNextId() + foo = get_concept("foo", variables=("var1",), sequence=next_id) + + assert foo.name == "foo" + assert foo.key == "foo" + assert foo.id == "1001" + assert foo.all_attrs() == ('#where#', '#pre#', '#post#', '#body#', 
'#ret#', 'var1') def test_i_can_get_multiple_concepts(context): - next_id = GetNextId() - - foo, bar, baz = get_concepts(context, - "foo", - "bar", - get_concept("baz", definition="baz var1", variables=("var1",)), - sequence=next_id) - assert foo.name == "foo" - assert foo.id == "1001" - assert foo.key == "foo" - assert bar.name == "bar" - assert bar.id == "1002" - assert bar.key == "bar" - assert baz.name == "baz" - assert baz.id == "1003" - assert baz.key == "baz __var__0" + next_id = GetNextId() + + foo, bar, baz = get_concepts(context, + "foo", + "bar", + get_concept("baz", definition="baz var1", variables=("var1",)), + sequence=next_id) + assert foo.name == "foo" + assert foo.id == "1001" + assert foo.key == "foo" + assert bar.name == "bar" + assert bar.id == "1002" + assert bar.key == "bar" + assert baz.name == "baz" + assert baz.id == "1003" + assert baz.key == "baz __var__0" def test_i_can_get_multiple_concepts_using_sheerka(sheerka, context): - foo, bar, baz = get_concepts(context, - "foo", - "bar", - get_concept("baz", definition="baz var1", variables=("var1",)), - use_sheerka=True) - assert foo.name == "foo" - assert foo.id == "1001" - assert foo.key == "foo" - assert bar.name == "bar" - assert bar.id == "1002" - assert bar.key == "bar" - assert baz.name == "baz" - assert baz.id == "1003" - assert baz.key == "baz __var__0" - assert baz.get_value("var1") is NotInit - - # the concepts are defined in Sheerka, so we can instantiate them - baz2 = sheerka.newn("baz", var1="value for var1") - assert baz2.name == "baz" - assert baz2.id == "1003" - assert baz2.key == "baz __var__0" - assert baz2.get_value("var1") == "value for var1" + foo, bar, baz = get_concepts(context, + "foo", + "bar", + get_concept("baz", definition="baz var1", variables=("var1",)), + use_sheerka=True) + assert foo.name == "foo" + assert foo.id == "1001" + assert foo.key == "foo" + assert bar.name == "bar" + assert bar.id == "1002" + assert bar.key == "bar" + assert baz.name == "baz" + 
assert baz.id == "1003" + assert baz.key == "baz __var__0" + assert baz.get_value("var1") is NotInit + + # the concepts are defined in Sheerka, so we can instantiate them + baz2 = sheerka.newn("baz", var1="value for var1") + assert baz2.name == "baz" + assert baz2.id == "1003" + assert baz2.key == "baz __var__0" + assert baz2.get_value("var1") == "value for var1" def test_i_can_get_multiple_concepts_when_same_name(sheerka, context): - one_str, one_int = get_concepts(context, - get_metadata("one", body="'one'"), - get_metadata("one", body="1"), - use_sheerka=True) - - assert sheerka.isinstance(one_str, "one") - assert sheerka.isinstance(one_int, "one") + one_str, one_int = get_concepts(context, + get_metadata("one", body="'one'"), + get_metadata("one", body="1"), + use_sheerka=True) + + assert sheerka.isinstance(one_str, "one") + assert sheerka.isinstance(one_int, "one") def test_i_can_create_test_concept(): - concept = get_concept("one", body="'one'") - - test_concept = get_evaluated_concept(concept, body='hello', a="value for a") - - assert test_concept.get_metadata() == concept.get_metadata() - assert test_concept.get_value(ConceptDefaultProps.BODY) == "hello" - assert test_concept.get_value("a") == "value for a" + concept = get_concept("one", body="'one'") + + test_concept = get_evaluated_concept(concept, body='hello', a="value for a") + + assert test_concept.get_metadata() == concept.get_metadata() + assert test_concept.get_value(ConceptDefaultProps.BODY) == "hello" + assert test_concept.get_value("a") == "value for a" def test_i_can_dummy_evaluate_concept(): - concept = get_concept("one", body="'one'", where="True", pre="False", ret="1", post="1.0") - - evaluated = get_evaluated_concept(concept) - assert evaluated.get_value(ConceptDefaultProps.WHERE) is True - assert evaluated.get_value(ConceptDefaultProps.PRE) is False - assert evaluated.get_value(ConceptDefaultProps.BODY) == "one" - assert evaluated.get_value(ConceptDefaultProps.RET) == 1 - assert 
evaluated.get_value(ConceptDefaultProps.POST) == 1.0 - - concept = get_concept("one", body='"one"', ret="'a value'") - evaluated = get_evaluated_concept(concept, ret='forced value') - assert evaluated.get_value(ConceptDefaultProps.WHERE) == NotInit - assert evaluated.get_value(ConceptDefaultProps.PRE) == NotInit - assert evaluated.get_value(ConceptDefaultProps.BODY) == "one" - assert evaluated.get_value(ConceptDefaultProps.RET) == "forced value" - assert evaluated.get_value(ConceptDefaultProps.POST) == NotInit + concept = get_concept("one", body="'one'", where="True", pre="False", ret="1", post="1.0") + + evaluated = get_evaluated_concept(concept) + assert evaluated.get_value(ConceptDefaultProps.WHERE) is True + assert evaluated.get_value(ConceptDefaultProps.PRE) is False + assert evaluated.get_value(ConceptDefaultProps.BODY) == "one" + assert evaluated.get_value(ConceptDefaultProps.RET) == 1 + assert evaluated.get_value(ConceptDefaultProps.POST) == 1.0 + + concept = get_concept("one", body='"one"', ret="'a value'") + evaluated = get_evaluated_concept(concept, ret='forced value') + assert evaluated.get_value(ConceptDefaultProps.WHERE) == NotInit + assert evaluated.get_value(ConceptDefaultProps.PRE) == NotInit + assert evaluated.get_value(ConceptDefaultProps.BODY) == "one" + assert evaluated.get_value(ConceptDefaultProps.RET) == "forced value" + assert evaluated.get_value(ConceptDefaultProps.POST) == NotInit def test_i_can_get_from(): - res = get_from(_mt("c:i am a concept#1001:")) - assert res == [_mt("1001", 0, 6)] - - res = get_from(_ut("some unrecognized stuff")) - assert res == [_ut("some unrecognized stuff", 0, 4)] - - res = get_from(_mt("c:i am a concept#1001:"), _ut("some unrecognized stuff")) - assert res == [_mt("1001", 0, 6), _ut("some unrecognized stuff", 7, 11)] - - res = get_from(_mt("c:i am a concept#1001:"), _ut("some unrecognized stuff"), parser="other") - assert res == [_mt("1001", 0, 6, parser="other"), _ut("some unrecognized stuff", 7, 11)] - - 
res = get_from(_mt("c:i am a concept#1001:"), _mt("c:#1001:")) - assert res == [_mt("1001", 0, 6), _mt("1001", 7, 13)] + res = get_from(_mt("c:i am a concept#1001:")) + assert res == [_mt("1001", 0, 6)] + + res = get_from(_ut("some unrecognized stuff")) + assert res == [_ut("some unrecognized stuff", 0, 4)] + + res = get_from(_mt("c:i am a concept#1001:"), _ut("some unrecognized stuff")) + assert res == [_mt("1001", 0, 6), _ut("some unrecognized stuff", 7, 11)] + + res = get_from(_mt("c:i am a concept#1001:"), _ut("some unrecognized stuff"), parser="other") + assert res == [_mt("1001", 0, 6, parser="other"), _ut("some unrecognized stuff", 7, 11)] + + res = get_from(_mt("c:i am a concept#1001:"), _mt("c:#1001:")) + assert res == [_mt("1001", 0, 6), _mt("1001", 7, 13)]