Compare commits

...

4 Commits

Author SHA1 Message Date
kodjo 078d8e5df6 Restarting the project.
Fixing unit tests. Continuing SyaParser
2026-04-12 09:40:04 +02:00
kodjo 3be854d34c intermediate commit 2026-04-11 21:01:39 +02:00
kodjo a729d98a0d Working on #21 : Working on SyaConceptsParser.py 2026-04-11 21:01:39 +02:00
kodjo a7043b1dd8 Working on #21 : Created classes 2026-04-11 21:01:39 +02:00
24 changed files with 2766 additions and 1195 deletions
+433
View File
@@ -0,0 +1,433 @@
# SyaConceptsParser
## Purpose
`SyaConceptsParser` parses **sequences of concepts with parameters** (variables).
It complements `SimpleConceptsParser`, which only handles parameter-less concepts.
Examples of recognized concepts:
- `a plus b` → matches `1 plus 2`, `x plus y plus z`, etc.
- `if a then b end` → matches `if x > 0 then print x end`
- `a long named concept b` → matches `1 long named concept 2`
The fundamental target case is **concept composition**: `1 plus 2 times 3`, where
`times` must be evaluated before `plus`. This precedence problem is what the
Shunting Yard Algorithm solves.
---
## The Shunting Yard Algorithm (SYA)
Dijkstra's algorithm (1961), which converts an expression in infix notation
(`1 + 2 * 3`) into **Reverse Polish Notation** (RPN: `1 2 3 * +`), respecting
operator precedence.
### Principle
Two structures: an **operator stack** and an **output queue**.
```
Input: 1 + 2 * 3

Token │ Action              Stack    Output
──────┼──────────────────────────────────────────────
 1    │ → output queue      []       [1]
 +    │ → stack             [+]      [1]
 2    │ → output queue      [+]      [1, 2]
 *    │ prec(*) > prec(+)
      │ → stack (no pop)    [+, *]   [1, 2]
 3    │ → output queue      [+, *]   [1, 2, 3]
 end  │ flush stack         []       [1, 2, 3, *, +]

RPN: 1 2 3 * +  ≡  1 + (2 * 3)
```
### Pop rule
When pushing an operator `op`, first pop any operator `top` on the stack such that:
`precedence(top) >= precedence(op)`
This guarantees that higher-precedence operators are evaluated first.
---
## Sheerka's Adaptation
The original SYA works on **atomic tokens** (digits, `+`, `*`).
Sheerka adapts it to work on **concepts** that:
1. **Are identified by several tokens** — a concept like `if a then b end`
contains several keywords (`if`, `then`, `end`) interleaved with parameters.
The original algorithm recognizes an operator in a single token.
2. **Can contain N parameters** — a binary operator has exactly 2 operands.
A Sheerka concept can have 0, 1, 2 or more.
3. **Parameters can themselves be concepts** — in `1 plus 2 times 3`, the
parameter `b` of `plus` is the result of the `times` concept. Recursion is
handled by the nesting of the workflows.
### SYA ↔ Sheerka mapping
| Original SYA | Sheerka |
|---|---|
| Operator (`+`, `*`) | `ConceptToRecognize` (concept with parameters) |
| Operand (number, variable) | `UnrecognizedToken` or `ConceptToken` |
| Operator stack | `state_context.stack` |
| Output queue | `state_context.parameters` |
| Precedence | `InitConceptParsing.must_pop()` |
| RPN result | `MetadataToken` in `state_context.result` |
### Structural differences
**Multi-token recognition** — where SYA reads a single token to identify `*`,
Sheerka must read `long named concept` (3 tokens) to identify the concept
`a long named concept b`. The `ReadConcept` class handles this sequential reading.
**The `expected` structure** — the concept `if a then b end` is decomposed into segments:
```
[("if ", 0),  (" then ", 1),  (" end", 1)]
 ─────────     ───────────     ──────────
 keyword       keyword         keyword
 0 params      1 param         1 param
 before        before          before
```
Each segment states how many parameters precede this group of tokens, and
which tokens to consume to validate the segment.
**Precedence not yet implemented** — `must_pop()` always returns `False`.
Concept composition is therefore not active yet. It is the next
implementation step.
---
## Architecture
### Two interdependent workflows
```mermaid
graph TD
A[#tokens_wkf] -->|concept keyword found| B[#concept_wkf]
B -->|concept fully parsed| A
A -->|EOF| C[end]
```
The parser always starts in `#tokens_wkf`. Each time a keyword matching the
first token of a concept is detected, a **fork** is created and sent into
`#concept_wkf`. Once the concept is recognized, the fork returns to
`#tokens_wkf` to continue reading.
---
## Workflow `#tokens_wkf`
```mermaid
stateDiagram-v2
[*] --> start
start --> prepare_read_tokens
prepare_read_tokens --> read_tokens
read_tokens --> read_tokens : no concept found (loop)
read_tokens --> eof : EOF
read_tokens --> concepts_found : concept keyword detected (fork)
eof --> end : ManageUnrecognized
concepts_found --> concept_wkf : ManageUnrecognized → #concept_wkf
end --> [*]
```
**`PrepareReadTokens`**: initializes the buffer and records `buffer_start_pos`.
**`ReadTokens`**: reads one token and calls `get_metadata_from_first_token`. If a concept
can start at this token → **fork** with a cloned context where `concept_to_recognize`
is set. The main path keeps reading.
**`ManageUnrecognized("concepts found")`**: processes the buffer accumulated before the
keyword (via `SimpleConceptsParser`). Unrecognized tokens become
`UnrecognizedToken` and are added to `parameters`.
---
## Workflow `#concept_wkf`
```mermaid
stateDiagram-v2
[*] --> start
start --> init_concept_parsing
init_concept_parsing --> manage_parameters
manage_parameters --> read_concept
read_concept --> read_parameters : more segments
read_concept --> finalize_concept : all segments done
read_concept --> token_mismatch : token mismatch
read_concept --> error_eof : unexpected EOF
read_parameters --> manage_parameters : loop
read_parameters --> finalize_concept : EOF
finalize_concept --> tokens_wkf : #tokens_wkf
token_mismatch --> end
error_eof --> end
end --> [*]
```
**`InitConceptParsing`**:
- Verifies that the number of already-collected parameters is sufficient
- Removes the first token of the segment (already consumed by `ReadTokens`)
- Applies SYA: pushes the concept onto the stack
**`ReadConcept`**: reads the fixed tokens of the current segment one by one.
If all match → `pop(0)` the segment and continue.
**`ReadParameters`**: reads ONE token into the buffer. Returns to
`ManageUnrecognized`, which tries to recognize it via `SimpleConceptsParser`.
**`FinalizeConceptParsing`**:
- Pops the concept from the stack
- Computes `start` (from the first parameter) and `end` (current position)
- Creates a `MetadataToken(concept.metadata, start, end, resolution_method, "sya")`
- Clears stack and parameters
- Returns to `#tokens_wkf`
---
## Step-by-step example — `"1 plus 2"`
Concept defined: `a plus b` (variables `a`, `b`).
**Tokens:**
```
pos : 0    1    2      3    4    5
tok : "1"  " "  "plus" " "  "2"  EOF
```
**`expected` for this concept:**
```
[([" ", "plus", " "], 1), ([], 1)]
 segment 0 → 1 param before, read " plus "
 segment 1 → 1 param before, read nothing (the concept ends with a param)
```
**Execution trace:**
```
PrepareReadTokens  → buffer_start_pos = 0
ReadTokens "1"     → no concept, buffer = ["1"]
ReadTokens " "     → no concept, buffer = ["1", " "]
ReadTokens "plus"  → concept "a plus b" found!
┌── FORK ─────────────────────────────────────────────────────┐
│ clone: buffer=["1"," "], pos=2, concept_to_recognize=CTR(+) │
└─────────────────────────────────────────────────────────────┘
ManageUnrecognized("concepts found")
    buffer = ["1"," "] → SimpleConceptsParser → not found
    parameters = [UT("1 ", start=0, end=1)]
    buffer_start_pos = 3
→ #concept_wkf
InitConceptParsing
    expected[0] = ([" ","plus"," "], 1)
    need 1 param → have 1 ✓
    strip leading WS → ["plus"," "]
    pop "plus" (already read) → [" "]
    SYA: stack = [CTR(a_plus_b)]
ManageUnrecognized("manage parameters"): buffer empty → nothing
ReadConcept: reads [" "] → pos 3 = " " ✓
    expected.pop(0) → remaining = [([], 1)]
    → "read parameters"
ReadParameters: reads "2" (pos 4)
    buffer = ["2"]
    → "manage parameters"
ManageUnrecognized("manage parameters")
    buffer = ["2"] → not a concept
    parameters = [UT("1 ", 0, 1), UT("2", 3, 3)]
    buffer_start_pos = 5
ReadConcept: expected = [([], 1)], reads 0 tokens
    expected.pop(0) → empty → "finalize concept"
FinalizeConceptParsing
    concept = stack.pop() = CTR(a_plus_b)
    start = parameters[0].start = 0
    end = parser_input.pos = 4
    result.append(MetadataToken(metadata, 0, 4, "key", "sya"))
→ #tokens_wkf
ReadTokens → EOF → ManageUnrecognized("eof") → end
```
**Result:**
```
MultipleChoices([
    [MetadataToken(id="1001", start=0, end=4, resolution_method="key", parser="sya")]
])
```
---
## Example — sequence `"1 plus 2 3 plus 7"`
Same concept `a plus b`. The parser recognizes two successive concepts in a single pass.
```
pos : 0    1    2      3    4    5    6    7    8      9    10   11
tok : "1"  " "  "plus" " "  "2"  " "  "3"  " "  "plus" " "  "7"  EOF
```
After `FinalizeConceptParsing` of the first concept (pos=4), `#tokens_wkf` restarts:
```
PrepareReadTokens  → buffer_start_pos = 5
ReadTokens " "     → buffer = [" "]
ReadTokens "3"     → buffer = [" ","3"]
ReadTokens " "     → buffer = [" ","3"," "]
ReadTokens "plus"  → fork
ManageUnrecognized → UT(" 3 ", start=5, end=7), buffer_start_pos=9
...
FinalizeConceptParsing
    start = 5, end = 10
    result.append(MetadataToken(1001, 5, 10, "key", "sya"))
```
**Final result (a single path, two concepts):**
```
MultipleChoices([
    [
        MetadataToken(1001, start=0, end=4, parser="sya"),
        MetadataToken(1001, start=5, end=10, parser="sya"),
    ]
])
```
---
## Future example — composition `"1 plus 2 times 3"`
> **Note:** this example requires the implementation of `must_pop()`.
> Today `must_pop()` always returns `False`.
Concepts: `a plus b` (low precedence), `a times b` (high precedence).
**Expected behavior after implementation:**
```
Expression: 1 plus 2 times 3

SYA with precedence times > plus:
Token "1"     → parameters = [1]        stack = []
Token "plus"  → stack = [plus]          parameters = [1]
Token "2"     → parameters = [1, 2]     stack = [plus]
Token "times" → prec(times) > prec(plus) → no pop
                stack = [plus, times]   parameters = [1, 2]
Token "3"     → parameters = [1, 2, 3]  stack = [plus, times]
Finalize:
    pop "times" → MetadataToken(times, params=[2, 3])
    pop "plus"  → MetadataToken(plus, params=[1, times_result])
```
**What `must_pop()` must implement:**
```python
def must_pop(self, current_concept, top_of_stack_concept):
    return precedence(top_of_stack_concept) >= precedence(current_concept)
```
Without this rule, the two concepts would be processed left to right with the same
precedence, yielding `(1 plus 2) times 3` instead of `1 plus (2 times 3)`.
---
## The `expected` structure in detail
For the concept `if a then b end` (key `"if __var__0 then __var__1 end"`):
```
_get_expected_tokens("if __var__0 then __var__1 end")
→ [
    (["if", " "], 0),         # read "if " before the 1st param
    ([" ", "then", " "], 1),  # read " then " before the 2nd param
    ([" ", "end"], 1),        # read " end" — 1 param before, no trailing param
]
```
During parsing, `expected` is **modified in place**:
- `InitConceptParsing` removes the first token of segment 0 (already read by `ReadTokens`)
- `ReadConcept` consumes the tokens of the current segment, then calls `pop(0)`
- When `expected` is empty → `FinalizeConceptParsing`
---
## Key data structures
### `StateMachineContext`
```
StateMachineContext
├── parser_input          ParserInput           token stream + cursor
├── other_parsers         [SimpleConceptsParser]
├── buffer                list[Token]           tokens pending classification
├── buffer_start_pos      int                   start position of the current buffer
├── concept_to_recognize  ConceptToRecognize | None
├── stack                 list[CTR]             SYA — operator stack
├── parameters            list[UT|CT]           SYA — output queue
├── result                list[MetadataToken]
└── errors                list
```
### `MetadataToken` (output)
```
MetadataToken
├── metadata           ConceptMetadata (id, name, key, variables, ...)
├── start              int   position of the first token of the expression
├── end                int   position of the last token
├── resolution_method  "key" | "name" | "id"
└── parser             "sya"
```
### Token positions in `"1 plus 2"`:
```
"1 plus 2"
 1   _   plus   _   2
 │   │   │      │   │
 0   1   2      3   4

MetadataToken: start=0, end=4
```
---
## Differences vs `SimpleConceptsParser`
| | `SimpleConceptsParser` | `SyaConceptsParser` |
|---|---|---|
| Target concepts | Without parameters | With parameters |
| `concept_wkf` states | 2 | 8 |
| `result` contents | `MetadataToken` + `UnrecognizedToken` | `MetadataToken` only |
| Parameters | N/A | Collected in `parameters` |
| Parser tag | `"simple"` | `"sya"` |
| SYA | No | Yes (precedence to be implemented) |
---
## Error handling
| Error | Cause | State reached |
|---|---|---|
| `UnexpectedToken` | Read token ≠ expected concept token | `TokenMismatch` → `end` |
| `UnexpectedEof` | Input ends before the concept is complete | `ErrorEof` → `end` |
| `NotEnoughParameters` | Too few params before a segment | Exception raised |
Errors are collected from **all paths** and forwarded to `error_sink` in
`parse()`. A path with errors is excluded from `_select_best_paths`.
+545
View File
@@ -0,0 +1,545 @@
# SyaConceptsParser
## Purpose
`SyaConceptsParser` parses **sequences of concepts with parameters** (variables).
It complements `SimpleConceptsParser`, which only handles parameter-less concepts.
Examples of recognized concepts:
- `a plus b` → matches `1 plus 2`, `x plus y`, etc.
- `if a then b end` → matches `if x > 0 then print x end`
- `a long named concept b` → matches `1 long named concept 2`
The primary goal is **concept composition**: `1 plus 2 times 3`, where `times` must
be evaluated before `plus`. This precedence problem is what the Shunting Yard
Algorithm solves.
---
## The Shunting Yard Algorithm (SYA)
Dijkstra's algorithm (1961) converts an infix expression (`1 + 2 * 3`) into
**Reverse Polish Notation** (RPN: `1 2 3 * +`), respecting operator precedence.
### Principle
Two structures: an **operator stack** and an **output queue**.
```
Input: 1 + 2 * 3

Token │ Action              Stack    Output
──────┼──────────────────────────────────────────────
 1    │ → output queue      []       [1]
 +    │ → stack             [+]      [1]
 2    │ → output queue      [+]      [1, 2]
 *    │ prec(*) > prec(+)
      │ → stack (no pop)    [+, *]   [1, 2]
 3    │ → output queue      [+, *]   [1, 2, 3]
 end  │ flush stack         []       [1, 2, 3, *, +]

RPN: 1 2 3 * +  ≡  1 + (2 * 3)
```
### Pop rule
When pushing operator `op`, first pop any stack-top operator `top` where:
`precedence(top) >= precedence(op)`
This ensures higher-precedence operators are evaluated first.
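For reference, here is a minimal sketch of the classic algorithm on atomic string tokens. This is an illustration only, not Sheerka code; the `PRECEDENCE` table and the `to_rpn` helper are assumptions for the example.
```python
# Minimal classic shunting yard for single-token operators.
# Illustration only: PRECEDENCE and to_rpn are not part of Sheerka.
PRECEDENCE = {"+": 1, "*": 2}

def to_rpn(tokens):
    stack, output = [], []
    for tok in tokens:
        if tok in PRECEDENCE:
            # Pop rule: pop while the stack top has precedence >= tok.
            while stack and PRECEDENCE[stack[-1]] >= PRECEDENCE[tok]:
                output.append(stack.pop())
            stack.append(tok)
        else:
            output.append(tok)  # operand goes straight to the output queue
    while stack:
        output.append(stack.pop())  # flush the remaining operators
    return output

assert to_rpn("1 + 2 * 3".split()) == ["1", "2", "3", "*", "+"]
```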
---
## Sheerka's Adaptation
The original SYA works on **atomic tokens** (digits, `+`, `*`).
Sheerka adapts it for **concepts** that:
1. **Are identified by multiple tokens** — a concept like `if a then b end` has
several keywords (`if`, `then`, `end`) interleaved with parameters.
The original SYA identifies an operator with a single token.
2. **Can have N parameters** — a binary operator has exactly 2 operands.
A Sheerka concept can have 0, 1, 2 or more parameters.
3. **Parameters can themselves be concepts** — in `1 plus 2 times 3`, the parameter
`b` of `plus` is the result of the `times` concept. This recursion is handled
by the nested workflow structure.
### SYA ↔ Sheerka mapping
| Original SYA | Sheerka |
|---|---|
| Operator (`+`, `*`) | `ConceptToRecognize` (concept with parameters) |
| Operand (number, variable) | `UnrecognizedToken` or `ConceptToken` |
| Operator stack | `state_context.stack` |
| Output queue | `state_context.parameters` |
| Precedence rule | `InitConceptParsing.must_pop()` |
| RPN result | `MetadataToken` in `state_context.result` |
### Structural differences
**Multi-token recognition** — where SYA reads a single token to identify `*`,
Sheerka must read `long named concept` (3 tokens) to identify concept
`a long named concept b`. The `ReadConcept` state handles this sequential reading.
**The `expected` structure** — concept `if a then b end` is decomposed into segments:
```
[("if ", 0), (" then ", 1), (" end", 1)]
───────── ────────── ──────────
keyword keyword keyword
0 params 1 param 1 param
before before before
```
Each segment states how many parameters precede it and which tokens to consume
to validate it.
**Precedence not yet implemented** — `must_pop()` always returns `False`.
Concept composition with precedence rules is the next implementation step.
---
## Architecture
### Two interdependent workflows
```mermaid
graph TD
A[#tokens_wkf] -->|concept keyword found - fork| B[#concept_wkf]
A -->|token not a concept keyword - buffered, loop| A
B -->|concept fully parsed| A
A -->|EOF| C[end]
```
The parser always starts in `#tokens_wkf`. Tokens that do not match any concept
keyword are accumulated in a buffer and the loop continues. Whenever a token
matching the first keyword of a known concept is detected, a **fork** is created
and sent into `#concept_wkf` — the main path keeps looping independently. Once the
concept is recognized, the fork returns to `#tokens_wkf` to continue reading.
---
## Workflow `#tokens_wkf`
```mermaid
stateDiagram-v2
[*] --> start
start --> prepare_read_tokens
prepare_read_tokens --> read_tokens
read_tokens --> read_tokens : no concept found (loop)
read_tokens --> eof : EOF
read_tokens --> concepts_found : concept keyword detected (fork)
eof --> end : ManageUnrecognized
concepts_found --> concept_wkf : ManageUnrecognized → #concept_wkf
end --> [*]
```
**`PrepareReadTokens`**: initializes the buffer and records `buffer_start_pos`.
**`ReadTokens`**: reads one token, calls `get_metadata_from_first_token`. If a concept
can start at this token → **fork** with a cloned context where `concept_to_recognize`
is set. The main path continues scanning.
**`ManageUnrecognized("concepts found")`**: processes the buffer accumulated before
the keyword (via `SimpleConceptsParser`). Unrecognized tokens become
`UnrecognizedToken` and are added to `parameters`.
---
## Workflow `#concept_wkf`
```mermaid
stateDiagram-v2
[*] --> start
start --> init_concept_parsing
init_concept_parsing --> manage_parameters
manage_parameters --> read_concept
read_concept --> read_parameters : more segments
read_concept --> finalize_concept : all segments done
read_concept --> token_mismatch : token mismatch
read_concept --> error_eof : unexpected EOF
read_parameters --> manage_parameters : loop
read_parameters --> finalize_concept : EOF
finalize_concept --> tokens_wkf : #tokens_wkf
token_mismatch --> end
error_eof --> end
end --> [*]
```
**`InitConceptParsing`**:
- Verifies the number of already-collected parameters is sufficient
- Removes the first token of segment 0 (already consumed by `ReadTokens`)
- Applies SYA: pushes the concept onto the stack
**`ReadConcept`**: reads the fixed tokens of the current segment one by one.
If all match → `pop(0)` the segment and continue.
**`ReadParameters`**: reads ONE token into the buffer. Returns to
`ManageUnrecognized` which tries to recognize it via `SimpleConceptsParser`.
**`FinalizeConceptParsing`**:
- Pops the concept from the stack
- Computes `start` (from the first parameter) and `end` (current position)
- Creates `MetadataToken(concept.metadata, start, end, resolution_method, "sya")`
- Clears stack and parameters
- Returns to `#tokens_wkf`
---
## Step-by-step example — `"1 plus 2"`
Concept: `a plus b` (variables `a`, `b`).
**Tokens:**
```
pos : 0    1    2      3    4    5
tok : "1"  " "  "plus" " "  "2"  EOF
```
**`expected` for this concept:**
```
[([" ", "plus", " "], 1), ([], 1)]
segment 0 → 1 param before, read " plus "
segment 1 → 1 param before, read nothing (concept ends with a param)
```
**Execution trace:**
```
PrepareReadTokens  → buffer_start_pos = 0
ReadTokens "1"     → no concept, buffer = ["1"]
ReadTokens " "     → no concept, buffer = ["1", " "]
ReadTokens "plus"  → concept "a plus b" found!
┌── FORK ─────────────────────────────────────────────────────┐
│ clone: buffer=["1"," "], pos=2, concept_to_recognize=CTR(+) │
└─────────────────────────────────────────────────────────────┘
ManageUnrecognized("concepts found")
    buffer = ["1"," "] → SimpleConceptsParser → not found
    parameters = [UnrecognizedToken("1 ", start=0, end=1)]
    buffer_start_pos = 3
→ #concept_wkf
InitConceptParsing
    expected[0] = ([" ","plus"," "], 1)
    need 1 param → have 1 ✓
    strip leading WS → ["plus"," "]
    pop "plus" (already consumed) → [" "]
    SYA: stack = [CTR(a_plus_b)]
ManageUnrecognized("manage parameters"): buffer empty → nothing
ReadConcept: reads [" "] → pos 3 = " " ✓
    expected.pop(0) → remaining = [([], 1)]
    → "read parameters"
ReadParameters: reads "2" at pos 4
    buffer = ["2"]
    → "manage parameters"
ManageUnrecognized("manage parameters")
    buffer = ["2"] → not a concept
    parameters = [UT("1 ", 0, 1), UT("2", 3, 3)]
    buffer_start_pos = 5
ReadConcept: expected = [([], 1)], reads 0 tokens
    expected.pop(0) → empty → "finalize concept"
FinalizeConceptParsing
    concept = stack.pop() = CTR(a_plus_b)
    start = parameters[0].start = 0
    end = parser_input.pos = 4
    result.append(MetadataToken(metadata, 0, 4, "key", "sya"))
→ #tokens_wkf
ReadTokens → EOF → ManageUnrecognized("eof") → end
```
**Result:**
```
MultipleChoices([
[MetadataToken(id="1001", start=0, end=4, resolution_method="key", parser="sya")]
])
```
---
## Example — sequence `"1 plus 2 3 plus 7"`
Same concept `a plus b`. The parser recognizes two concepts in one pass.
```
pos : 0    1    2      3    4    5    6    7    8      9    10   11
tok : "1"  " "  "plus" " "  "2"  " "  "3"  " "  "plus" " "  "7"  EOF
```
After `FinalizeConceptParsing` for the first concept (pos=4), `#tokens_wkf` restarts:
```
PrepareReadTokens  → buffer_start_pos = 5
ReadTokens " "     → buffer = [" "]
ReadTokens "3"     → buffer = [" ","3"]
ReadTokens " "     → buffer = [" ","3"," "]
ReadTokens "plus"  → fork
ManageUnrecognized → UT(" 3 ", start=5, end=7), buffer_start_pos=9
...
FinalizeConceptParsing
    start = 5, end = 10
    result.append(MetadataToken(1001, 5, 10, "key", "sya"))
```
**Final result (one path, two concepts):**
```
MultipleChoices([
[
MetadataToken(1001, start=0, end=4, parser="sya"),
MetadataToken(1001, start=5, end=10, parser="sya"),
]
])
```
---
## Future example — composition `"1 plus 2 times 3"`
> **Note:** this example requires implementing `must_pop()`.
> Currently `must_pop()` always returns `False`.
Concepts: `a plus b` (low precedence), `a times b` (high precedence).
**Expected behavior after implementation:**
```
Expression: 1 plus 2 times 3

SYA with precedence times > plus:
Token "1"     → parameters = [1]        stack = []
Token "plus"  → stack = [plus]          parameters = [1]
Token "2"     → parameters = [1, 2]     stack = [plus]
Token "times" → prec(times) > prec(plus) → no pop
                stack = [plus, times]   parameters = [1, 2]
Token "3"     → parameters = [1, 2, 3]  stack = [plus, times]
Finalize:
    pop "times" → MetadataToken(times, params=[2, 3])
    pop "plus"  → MetadataToken(plus, params=[1, times_result])
```
**What `must_pop()` must implement:**
```python
def must_pop(self, current_concept, top_of_stack_concept):
return precedence(top_of_stack_concept) >= precedence(current_concept)
```
Without this rule, both concepts are processed left-to-right with equal precedence,
yielding `(1 plus 2) times 3` instead of `1 plus (2 times 3)`.
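A possible shape for `precedence()`, assuming values are looked up from the concept key. Neither the function nor the table exists in the codebase yet; the keys follow the `__var__N` format described in the next section.
```python
# Hypothetical sketch: precedence() and PRECEDENCE_BY_KEY do not exist yet.
DEFAULT_PRECEDENCE = 0
PRECEDENCE_BY_KEY = {
    "__var__0 plus __var__1": 10,   # assumed key for `a plus b`
    "__var__0 times __var__1": 20,  # assumed key for `a times b`
}

def precedence(concept_metadata) -> int:
    # Unknown concepts fall back to the lowest precedence.
    return PRECEDENCE_BY_KEY.get(concept_metadata.key, DEFAULT_PRECEDENCE)
```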
---
## The `expected` structure in detail
For concept `if a then b end` (key `"if __var__0 then __var__1 end"`):
```
_get_expected_tokens("if __var__0 then __var__1 end")
→ [
    (["if", " "], 0),         # read "if " before 1st param
    ([" ", "then", " "], 1),  # read " then " before 2nd param
    ([" ", "end"], 1),        # read " end" — 1 param before, no trailing param
]
```
During parsing, `expected` is **modified in place**:
- `InitConceptParsing` removes the first token of segment 0 (already read by `ReadTokens`)
- `ReadConcept` consumes the tokens of the current segment then calls `pop(0)`
- When `expected` is empty → `FinalizeConceptParsing`
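A toy illustration of this in-place consumption, using plain strings in place of `Token` objects (this is a standalone sketch, not the real parser states):
```python
# Toy walk-through of how `expected` is consumed in place.
expected = [(["if", " "], 0), ([" ", "then", " "], 1), ([" ", "end"], 1)]

def read_concept_segment(expected, stream):
    tokens, _nb_params_before = expected[0]
    for want in tokens:
        got = next(stream)
        if got != want:
            raise ValueError(f"token mismatch: {got!r} != {want!r}")
    expected.pop(0)        # segment validated
    return bool(expected)  # True -> read a parameter next, False -> finalize

stream = iter(["if", " ", "x", " ", "then", " ", "y", " ", "end"])
read_concept_segment(expected, stream)  # consumes "if "
param_a = next(stream)                  # "x", read by ReadParameters
read_concept_segment(expected, stream)  # consumes " then "
param_b = next(stream)                  # "y"
read_concept_segment(expected, stream)  # consumes " end"
assert expected == []                   # -> FinalizeConceptParsing
```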
---
## Key data structures
### `StateMachineContext`
```
StateMachineContext
├── parser_input          ParserInput           token stream + cursor
├── other_parsers         [SimpleConceptsParser]
├── buffer                list[Token]           tokens pending classification
├── buffer_start_pos      int                   start position of the current buffer
├── concept_to_recognize  ConceptToRecognize | None
├── stack                 list[CTR]             SYA — operator stack
├── parameters            list[UT|CT]           SYA — output queue
├── result                list[MetadataToken]
└── errors                list
```
### `MetadataToken` (output)
```
MetadataToken
├── metadata           ConceptMetadata (id, name, key, variables, ...)
├── start              int   position of the first token of the expression
├── end                int   position of the last token
├── resolution_method  "key" | "name" | "id"
└── parser             "sya"
```
### Token positions in `"1 plus 2"`:
```
"1 plus 2"
0 1 2 3 4
│ │ │ │ │
1 _ plus _ 2
MetadataToken: start=0, end=4
```
---
## Differences vs `SimpleConceptsParser`
| | `SimpleConceptsParser` | `SyaConceptsParser` |
|---|---|---|
| Target concepts | No parameters | With parameters |
| `concept_wkf` states | 2 | 8 |
| `result` contents | `MetadataToken` + `UnrecognizedToken` | `MetadataToken` only |
| Parameters | N/A | Collected in `parameters` list |
| Parser tag | `"simple"` | `"sya"` |
| SYA | No | Yes (precedence to implement) |
---
## Error handling
| Error | Cause | State reached |
|---|---|---|
| `UnexpectedToken` | Read token ≠ expected concept token | `TokenMismatch` → `end` |
| `UnexpectedEof` | Input ends before concept is complete | `ErrorEof` → `end` |
| `NotEnoughParameters` | Too few params before a segment | Exception raised |
Errors are collected from **all paths** and forwarded to `error_sink` in `parse()`.
A path with errors is excluded from `_select_best_paths`.
---
## Known limitations and proposed improvements
The current implementation correctly handles simple cases (single-token parameters,
non-nested concepts). The following issues must be addressed before enabling
precedence and real concept composition.
### 1. Parameters are limited to a single token
`ReadParameters` reads ONE token, then immediately calls `ManageUnrecognized`, which
returns to `ReadConcept` to match the next keyword segment. Multi-token parameters
therefore fail. For `if hello world then foo end` with parameter `a = "hello world"`:
```
ReadParameters reads "hello"
ManageUnrecognized → UT("hello") → ReadConcept tries to match " then "
ReadConcept reads " " ✓ then "world" ≠ "then" → MISMATCH
```
**Proposed fix:** `ReadParameters` should accumulate tokens until it detects the
start of the next keyword segment (lookahead on `expected[0][0]`), then hand the
full buffer to `ManageUnrecognized` for parsing in one pass.
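A sketch of that lookahead, assuming a `lookahead(n)` helper on `ParserInput` that returns the next `n` tokens without consuming them (no such helper exists today):
```python
# Sketch only: parser_input.lookahead(n) is an assumed helper.
def next_segment_starts_here(parser_input, expected):
    keywords = expected[0][0]  # tokens of the next keyword segment
    if not keywords:
        return False  # last segment has no keywords: buffer until EOF
    upcoming = parser_input.lookahead(len(keywords))  # assumed helper
    return len(upcoming) == len(keywords) and all(
        k.value == u.value for k, u in zip(keywords, upcoming))

def read_parameter_tokens(parser_input, expected, buffer):
    # Buffer parameter tokens until the whole next keyword segment is ahead.
    while not next_segment_starts_here(parser_input, expected):
        if not parser_input.next_token(False):
            break  # EOF: finalize with what was collected
        buffer.append(parser_input.token)
```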
---
### 2. Flat `parameters` list with no arity tracking
When `FinalizeConceptParsing` runs, `parameters` is a flat list. There is no
information about how many parameters belong to each concept on the stack. Once
`must_pop` is active and multiple concepts are stacked, `FinalizeConceptParsing`
cannot reconstruct the correct nesting.
Example: `1 plus 2 times 3` with `stack = [plus, times]` and
`parameters = [UT("1"), UT("2"), UT("3")]`. Without arity information there is no
way to determine that `times` consumes the last two parameters and `plus` consumes
the first one and the result of `times`.
The arity of each concept (`nb_variables`) is available in `expected` at push time
but is lost once `expected` is consumed during parsing.
**Proposed fix:** record the arity of each concept when it is pushed onto the stack
(in `apply_shunting_yard_algorithm`). `FinalizeConceptParsing` then pops the correct
number of parameters for each concept, from innermost to outermost, building
intermediate `MetadataToken` objects that are re-injected into `parameters` as
`ConceptToken` before processing the next concept on the stack.
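A sketch of that bookkeeping, under assumed names: stack entries become `(concept, arity)` pairs, and finalization pops exactly `arity` parameters per concept, innermost first.
```python
# Sketch: push (concept, arity) pairs instead of bare concepts.
def push_with_arity(state_context, concept_to_recognize):
    # Total parameter count = sum of the per-segment counts in `expected`.
    arity = sum(n for _tokens, n in concept_to_recognize.expected)
    state_context.stack.append((concept_to_recognize, arity))

def finalize_stack(state_context, make_token):
    # make_token(concept, params) is assumed to build the MetadataToken.
    while state_context.stack:
        concept, arity = state_context.stack.pop()
        params = state_context.parameters[-arity:]
        del state_context.parameters[-arity:]
        # Re-inject the result so the enclosing concept can consume it.
        state_context.parameters.append(make_token(concept, params))
    return state_context.parameters.pop()  # outermost result
```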
---
### 3. Type mismatch in `ManageUnrecognized` for recognized parameters
When `SimpleConceptsParser` recognizes a token sequence, `ManageUnrecognized`
creates:
```python
state_context.parameters.append(
ConceptToken(res.items[0], buffer_start_pos, parser_input.pos - 1)
)
```
`res.items[0]` is a `list[MetadataToken]` (one complete path from
`SimpleConceptsParser`), but `ConceptToken.concept` is typed as `Concept`. Any
downstream code that uses this `ConceptToken` will receive a list where it expects a
`Concept` instance.
**Proposed fix:** define a dedicated container for a recognized parameter (e.g.
`ParsedParameterToken`) that wraps a `list[MetadataToken]` with start/end positions,
or flatten the result to a single `MetadataToken` when `res.items[0]` contains
exactly one token.
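For instance (hypothetical class, reusing the project's import path):
```python
from dataclasses import dataclass
from parsers.state_machine import MetadataToken

@dataclass
class ParsedParameterToken:
    """Hypothetical: a parameter recognized by another parser, i.e. one
    complete path of MetadataToken, with its span."""
    tokens: list[MetadataToken]
    start: int  # position of the first token of the parameter
    end: int    # position of the last token
```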
---
### 4. Variable-to-parameter mapping not applied
`FinalizeConceptParsing` creates a `MetadataToken` without populating the concept's
variables. `parameters = [UT("1 "), UT("2")]` maps positionally to
`variables = [("a", NotInit), ("b", NotInit)]`, but this mapping is never applied.
The produced `MetadataToken` is therefore incomplete: a downstream evaluator has no
way to retrieve parameter values from the token alone.
**Proposed fix:** in `FinalizeConceptParsing`, zip `parameters` with
`concept.metadata.variables` and store the result in the `MetadataToken`'s metadata,
or pass it as a dedicated field.
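A sketch of the positional binding, assuming `variables` holds `(name, value)` pairs as shown above and that a plain dict is an acceptable output format:
```python
# Sketch: bind collected parameters to variable names positionally.
# `concept` and `state_context` are the objects FinalizeConceptParsing
# already manipulates; the `bound` dict is an assumed output format.
bound = {
    name: param
    for (name, _value), param in zip(concept.metadata.variables,
                                     state_context.parameters)
}
# e.g. {"a": UnrecognizedToken("1 ", 0, 1), "b": UnrecognizedToken("2", 3, 3)}
```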
---
### 5. `SyaConceptsParser` absent from `other_parsers`
`other_parsers = [SimpleConceptsParser()]`. A parameter can be a simple
(parameter-less) concept, but never a composite concept with parameters. True composition —
where a parameter is itself a SYA-parsed concept — is structurally impossible with
the current design.
**Proposed fix:** add `SyaConceptsParser` to `other_parsers`. A guard is required
to prevent infinite recursion: the nested instance should exclude the concept
currently being recognized from its search space.
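A sketch of the guard, assuming a hypothetical `exclude_ids` constructor parameter:
```python
# Sketch: exclude_ids is an assumed parameter that would keep the nested
# parser from re-entering the concept currently being recognized.
current_id = state_context.concept_to_recognize.metadata.id
nested = SyaConceptsParser(exclude_ids={current_id})
state_context.other_parsers = [SimpleConceptsParser(), nested]
```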
---
### Priority order
| # | Issue | Blocking |
|---|---|---|
| 1 | Multi-token parameters | Practical usability |
| 2 | `ConceptToken` type mismatch | Correctness |
| 3 | Variable-to-parameter mapping | Evaluation pipeline |
| 4 | Arity not tracked on the stack | `must_pop` / precedence |
| 5 | `SyaConceptsParser` absent from `other_parsers` | Real composition |
The arity-tracking and variable-mapping issues are interdependent with `must_pop`:
implementing them independently (before activating precedence) is still valuable and
lays the correct foundation.
+4 -2
View File
@@ -1,8 +1,10 @@
annotated-doc==0.0.4
annotated-types==0.7.0
anyio==4.13.0
bcrypt==5.0.0
argon2-cffi==25.1.0
argon2-cffi-bindings==25.1.0
certifi==2026.2.25
cffi==2.0.0
charset-normalizer==3.4.7
click==8.3.2
ecdsa==0.19.2
@@ -13,10 +15,10 @@ httpx==0.28.1
idna==3.11
iniconfig==2.3.0
packaging==26.0
passlib==1.7.4
pluggy==1.6.0
prompt_toolkit==3.0.52
pyasn1==0.6.3
pycparser==3.0
pydantic==2.12.5
pydantic_core==2.41.5
Pygments==2.20.0
+1 -1
View File
@@ -89,7 +89,7 @@ class Concept:
self._all_attrs = None
def __repr__(self):
text = f"(Concept {self._metadata.name}#{self._metadata.id}"
text = f"Concept({self._metadata.name}#{self._metadata.id}"
if self._metadata.pre:
text += f", #pre={self._metadata.pre}"
+14
View File
@@ -18,6 +18,20 @@ class MethodAccessError(SheerkaException):
return f"Cannot access method '{self.method_name}'"
class NotEnoughParameters(SheerkaException):
"""
Exception when not enough parameters are found during Sya parsing
"""
def __init__(self, concept_to_recognize, expected_nb_parameters, nb_parameters_found):
self.concept = concept_to_recognize
self.expected = expected_nb_parameters
self.found = nb_parameters_found
def get_error_msg(self) -> str:
return f"Failed to parse {self.concept}. Expecting {self.expected} parameters, but only found {self.found}."
@dataclass
class ErrorObj:
def get_error_msg(self) -> str:
+3 -2
View File
@@ -3,7 +3,7 @@ from core.ExecutionContext import ContextActions, ExecutionContext
from core.ReturnValue import ReturnValue
from core.concept import Concept
from evaluators.base_evaluator import EvaluatorEvalResult, EvaluatorMatchResult, NotForMe, OneReturnValueEvaluator
from parsers.SimpleParserParser import SimpleConceptsParser
from parsers.SimpleConceptsParser import SimpleConceptsParser
from parsers.state_machine import MetadataToken
@@ -28,7 +28,8 @@ class RecognizeSimpleConcept(OneReturnValueEvaluator):
parser_input = return_value.value.body
parser_input.reset()
parsed = self.parser.parse(context, parser_input)
error_sink = []
parsed = self.parser.parse(context, parser_input, error_sink)
if len(parsed.items) == 0:
not_for_me = ReturnValue(self.NAME, False, NotForMe(self.NAME, return_value.value))
+7
View File
@@ -102,6 +102,13 @@ class MultipleChoices:
return True
def __iadd__(self, other):
if not isinstance(other, MultipleChoices):
raise TypeError(f"unsupported operand type(s) for +=: 'MultipleChoices' and '{type(other)}'")
self.items += other.items
return self
def __hash__(self):
return hash(tuple(self.items))
+21
View File
@@ -0,0 +1,21 @@
from core.ExecutionContext import ExecutionContext
from parsers.ParserInput import ParserInput
class BaseParser:
"""
Base class for parsers that can be used in concept recognition
"""
def __init__(self, name):
self.name = name # name of the parser
def parse(self, context: ExecutionContext, parser_input: ParserInput, error_sink: list):
"""
Default signature for parsing
:param context:
:param parser_input:
:param error_sink:
:return:
"""
pass
+15
View File
@@ -100,5 +100,20 @@ class ParserInput:
return res
@staticmethod
def from_tokens(tokens, text=None):
"""
returns a parser input, given already computed tokens
:param tokens:
:param text:
:return:
"""
res = ParserInput(None)
res.all_tokens = tokens
res.original_text = text or get_text_from_tokens(tokens)
res.pos = -1
res.end = len(res.all_tokens)
return res
def __repr__(self):
return f"ParserInput('{self.original_text}', len={len(self.all_tokens)})"
@@ -1,17 +1,71 @@
from core.concept import DefinitionType
from evaluators.base_evaluator import MultipleChoices
from parsers.state_machine import ConceptToRecognize, End, ManageUnrecognized, MetadataToken, PrepareReadTokens, \
ReadConcept, ReadTokens, Start, StateMachine, StateMachineContext, UnrecognizedToken
from parsers.BaseParser import BaseParser
from parsers.parser_utils import UnexpectedEof, UnexpectedToken, get_text_from_tokens
from parsers.state_machine import ConceptToRecognize, End, MetadataToken, PrepareReadTokens, \
ReadTokens, Start, State, StateMachine, StateMachineContext, StateResult, UnrecognizedToken
from parsers.tokenizer import Token, TokenKind, Tokenizer
class SimpleConceptsParser:
class ReadConcept(State):
def run(self, state_context) -> StateResult:
start = state_context.parser_input.pos
for expected in state_context.concept_to_recognize.expected:
if not state_context.parser_input.next_token(False):
# eof before the concept is recognized
state_context.errors.append(UnexpectedEof(expected, state_context.parser_input.token))
state_context.concept_to_recognize = None
return StateResult(self.next_states[0])
token = state_context.parser_input.token
if token.value != expected:
# token mismatch
state_context.errors.append(UnexpectedToken(token, expected))
state_context.concept_to_recognize = None
return StateResult(self.next_states[0])
state_context.result.append(MetadataToken(state_context.concept_to_recognize.metadata,
start,
state_context.parser_input.pos,
state_context.concept_to_recognize.resolution_method,
"simple"))
state_context.concept_to_recognize = None
return StateResult(self.next_states[0])
class ManageUnrecognized(State):
def run(self, state_context) -> StateResult:
if state_context.buffer:
buffer_as_str = get_text_from_tokens(state_context.buffer)
if len(state_context.result) > 0 and isinstance(old := state_context.result[-1], UnrecognizedToken):
# merge unrecognized if needed
state_context.result[-1] = UnrecognizedToken(old.buffer + buffer_as_str,
old.start,
state_context.parser_input.pos - 1)
else:
state_context.result.append(UnrecognizedToken(buffer_as_str,
state_context.buffer_start_pos,
state_context.parser_input.pos - 1))
# clear the buffer
state_context.buffer.clear()
state_context.buffer_start_pos = state_context.parser_input.pos + 1
return StateResult(self.next_states[0])
class SimpleConceptsParser(BaseParser):
""""
This class to parser concepts with no variable
This class is to parse concepts with no parameter
ex : def concept I am a new concept
It parses a sequence of concepts
"""
def __init__(self):
super().__init__("simple")
tokens_wkf = {
Start("start", next_states=["prepare read tokens"]),
PrepareReadTokens("prepare read tokens", next_states=["read tokens"]),
@@ -30,7 +84,6 @@ class SimpleConceptsParser:
"#tokens_wkf": {t.name: t for t in tokens_wkf},
"#concept_wkf": {t.name: t for t in concept_wkf},
}
self.error_sink = []
@staticmethod
def get_metadata_from_first_token(context, token: Token):
@@ -55,12 +108,13 @@ class SimpleConceptsParser:
return concepts_by_key + concepts_by_name
def parse(self, context, parser_input):
def parse(self, context, parser_input, error_sink):
sm = StateMachine(self.workflows)
sm_context = StateMachineContext(context, parser_input, self.get_metadata_from_first_token)
sm_context = StateMachineContext(context, parser_input, self.get_metadata_from_first_token, [])
sm.run("#tokens_wkf", "start", sm_context)
selected = self.select_best_paths(sm)
error_sink.extend(sm_context.errors)
return MultipleChoices(selected)
+344
View File
@@ -0,0 +1,344 @@
from core.concept import DefinitionType
from core.error import NotEnoughParameters
from evaluators.base_evaluator import MultipleChoices
from parsers.BaseParser import BaseParser
from parsers.ParserInput import ParserInput
from parsers.SimpleConceptsParser import SimpleConceptsParser
from parsers.parser_utils import UnexpectedEof, UnexpectedToken, get_text_from_tokens
from parsers.state_machine import ConceptToRecognize, ConceptToken, End, MetadataToken, PrepareReadTokens, ReadTokens, \
Start, State, StateMachine, StateMachineContext, StateResult, UnrecognizedToken
from parsers.tokenizer import Token, TokenKind, Tokenizer
class InitConceptParsing(State):
"""
A new concept has been detected.
Performs some validations and prepares the list of expected tokens to read.
"""
def must_pop(self, current_concept, previous_concept):
return False
def apply_shunting_yard_algorithm(self, state_context):
"""
Apply the SYA:
for each concept in the stack,
check the precedence to decide whether it must be popped (to the output) or not
:param state_context:
:type state_context:
:return:
:rtype:
"""
if len(state_context.stack) > 0:
while self.must_pop(state_context.concept_to_recognize.metadata, state_context.stack[-1].metadata):
state_context.parameters.append(state_context.stack.pop())
state_context.stack.append(state_context.concept_to_recognize)
def run(self, state_context) -> StateResult:
expected = state_context.concept_to_recognize.expected
# check that there are enough parameters
if len(state_context.parameters) < expected[0][1]:
raise NotEnoughParameters(state_context.concept_to_recognize,
expected[0][1],
len(state_context.parameters))
# remove white space before the first token if any
if expected[0][0][0].type == TokenKind.WHITESPACE:
expected[0][0].pop(0)
# pop the first token (as it is already recognized)
expected[0][0].pop(0)
# apply shunting yard algorithm
self.apply_shunting_yard_algorithm(state_context)
return StateResult(self.next_states[0])
class ReadConcept(State):
"""
This state reads the tokens of the concepts that are known (that are not parameters)
For example, given the concept 'let me create the concept x'
We will parse 'let' 'me' 'create' 'the' 'concept'
But we will not parse 'x' because it's a parameter
"""
def run(self, state_context) -> StateResult:
expected = state_context.concept_to_recognize.expected
# eat the tokens
for expected_token in expected[0][0]:
if not state_context.parser_input.next_token(skip_whitespace=False):
# Failed to recognize concept because of eof
state_context.errors.append(UnexpectedEof(expected_token, None))
return StateResult("error eof")
token = state_context.parser_input.token
if expected_token.type != token.type or expected_token.value != token.value:
# Failed to recognize concept because of token mismatch
state_context.errors.append(UnexpectedToken(token, expected_token))
return StateResult("token mismatch")
expected.pop(0)
if not expected:
state_context.concept_to_recognize = None
return StateResult("finalize concept")
else:
return StateResult("read parameters")
class ReadParameters(State):
def run(self, state_context) -> StateResult:
assert not state_context.buffer
if not state_context.parser_input.next_token(False):
return StateResult("finalize concept")
state_context.buffer.append(state_context.parser_input.token)
return StateResult(self.next_states[0])
class ManageUnrecognized(State):
def run(self, state_context) -> StateResult:
if state_context.buffer:
buffer_as_str = get_text_from_tokens(state_context.buffer)
res = MultipleChoices([])
pi = ParserInput.from_tokens(state_context.buffer, text=buffer_as_str)
error_sink = []
# Try to parse the buffer
for parser in state_context.other_parsers:
res += parser.parse(state_context.context, pi, error_sink)
if error_sink:
raise NotImplementedError("Cannot manage errors")
if len(res.items) == 0:
state_context.parameters.append(UnrecognizedToken(buffer_as_str,
state_context.buffer_start_pos,
state_context.parser_input.pos - 1))
elif len(res.items) == 1:
state_context.parameters.append(ConceptToken(res.items[0],
state_context.buffer_start_pos,
state_context.parser_input.pos - 1))
else:
raise NotImplementedError("Cannot manage multiple results")
# clear the buffer
state_context.buffer.clear()
state_context.buffer_start_pos = state_context.parser_input.pos + 1
return StateResult(self.next_states[0])
class TokenMismatch(State):
"""
When we realize that we are not parsing the correct concept.
The path ends without adding anything to the result.
"""
def run(self, state_context) -> StateResult:
return StateResult(self.next_states[0])
class ErrorEof(State):
"""
When EOF is detected before the concept is fully parsed.
The path ends without adding anything to the result.
"""
def run(self, state_context) -> StateResult:
return StateResult(self.next_states[0])
class FinalizeConceptParsing(State):
"""
The concept is fully parsed.
Pops the concept from the stack, builds a MetadataToken from it and its
collected parameters, appends it to the result, then returns to the
tokens workflow to continue parsing the rest of the input.
"""
def run(self, state_context) -> StateResult:
concept = state_context.stack.pop()
start = state_context.parameters[0].start if state_context.parameters \
else state_context.buffer_start_pos
end = state_context.parser_input.pos
state_context.result.append(
MetadataToken(concept.metadata, start, end, concept.resolution_method, "sya")
)
state_context.stack.clear()
state_context.parameters.clear()
return StateResult(self.next_states[0])
class SyaConceptsParser(BaseParser):
""""
This class is to parse concepts with parameter
ex : def concept a plus b as a + b
It parses a sequence of concepts
"""
def __init__(self):
super().__init__("sya")
tokens_wkf = {
Start("start", next_states=["prepare read tokens"]),
PrepareReadTokens("prepare read tokens", next_states=["read tokens"]),
ReadTokens("read tokens", next_states=["read tokens", "eof", "concepts found"]),
ManageUnrecognized("eof", next_states=["end"]),
ManageUnrecognized("concepts found", next_states=["#concept_wkf"]),
End("end", next_states=None)
}
concept_wkf = {
Start("start", next_states=["init concept parsing"]),
InitConceptParsing("init concept parsing", ["manage parameters"]),
ManageUnrecognized("manage parameters", next_states=["read concept"]),
ReadConcept("read concept", next_states=["finalize concept", "error eof", "token mismatch", "read parameters"]),
ReadParameters("read parameters", next_states=["manage parameters", "finalize concept"]),
FinalizeConceptParsing("finalize concept", next_states=["#tokens_wkf"]),
ErrorEof("error eof", ["end"]),
TokenMismatch("token mismatch", ["end"]),
End("end", next_states=None)
}
self.workflows = {
"#tokens_wkf": {t.name: t for t in tokens_wkf},
"#concept_wkf": {t.name: t for t in concept_wkf},
}
self.error_sink = []
@staticmethod
def _get_expected_tokens(concept_key):
"""
Return a list of pairs (expected tokens, number of expected variables before these tokens)
ex:
'if x y then z end' => ('if', 0), ('then', 2), ('end', 1)
:param concept_key:
:type concept_key:
:return:
:rtype:
"""
# def custom_strip_tokens(_tokens):
# return _tokens
def custom_strip_tokens(_tokens):
"""
Removes consecutive whitespace tokens
Returns an empty list if there are only whitespace tokens
:param _tokens:
:type _tokens:
:return:
:rtype:
"""
res = []
buffer = None
for t in _tokens:
if t.type == TokenKind.WHITESPACE:
buffer = t
else:
if buffer:
res.append(buffer)
buffer = None
res.append(t)
if res and buffer:  # add the buffer only if the result is not empty
res.append(buffer)
return res
expected = []  # pairs of (expected tokens, number of expected variables before them)
tokens = []
nb_variables = 0
parsing_tokens = None # True if we are parsing tokens (and not VAR_DEF)
for token in Tokenizer(concept_key, yield_eof=False):
if token.type == TokenKind.WHITESPACE:
tokens.append(token)
elif token.type == TokenKind.VAR_DEF:
if parsing_tokens is not None and parsing_tokens:
expected.append((custom_strip_tokens(tokens), nb_variables))
nb_variables = 1
tokens = []
parsing_tokens = False
else:
nb_variables += 1
else:
tokens.append(token)
parsing_tokens = True
# do not forget the remaining ones
if tokens or nb_variables:
expected.append((custom_strip_tokens(tokens), nb_variables))
return expected
def get_metadata_from_first_token(self, context, token: Token):
return [ConceptToRecognize(m, self._get_expected_tokens(m.key), "key")
for m in context.sheerka.get_metadatas_from_first_token("key", token.value)
if m.definition_type == DefinitionType.DEFAULT and len(m.parameters) > 0]
def _select_best_paths(self, sm) -> list:
"""Returns the result lists of the highest-scoring error-free paths.
Args:
sm: The StateMachine after execution.
Returns:
A list of result lists, one per best-scoring path.
"""
selected = []
best_score = 1
for path in sm.paths:
if path.execution_context.errors:
continue
score = self._compute_path_score(path)
if score > best_score:
selected.clear()
selected.append(path.execution_context.result)
best_score = score
elif score == best_score:
selected.append(path.execution_context.result)
return selected
@staticmethod
def _compute_path_score(path) -> int:
"""Scores a path by the total token span covered by MetadataTokens.
Args:
path: An ExecutionPath whose result is a list of MetadataToken.
Returns:
Integer score.
"""
return sum(
token.end - token.start + 1
for token in path.execution_context.result
if isinstance(token, MetadataToken)
)
def parse(self, context, parser_input, error_sink):
sm = StateMachine(self.workflows)
sm_context = StateMachineContext(context,
parser_input,
self.get_metadata_from_first_token,
[SimpleConceptsParser()])
sm.run("#tokens_wkf", "start", sm_context)
selected = self._select_best_paths(sm)
for path in sm.paths:
error_sink.extend(path.execution_context.errors)
return MultipleChoices(selected)
+51 -57
View File
@@ -3,23 +3,23 @@ from typing import Any, Literal
from common.utils import str_concept
from core.ExecutionContext import ExecutionContext
from core.concept import ConceptMetadata
from core.concept import Concept, ConceptMetadata
from parsers.ParserInput import ParserInput
from parsers.parser_utils import UnexpectedEof, UnexpectedToken, get_text_from_tokens
from parsers.tokenizer import Token
@dataclass
class MetadataToken:
"""
Class that represents a text that is recognized as a concept
When a concept definition is recognized
We keep track of the start and the end position
MetadataToken is a shortcut for ConceptMetadataToken
"""
metadata: ConceptMetadata
start: int
end: int
resolution_method: Literal["name", "key", "id"]
parser: str
metadata: ConceptMetadata # concept that is recognized
start: int # start position in the texts
end: int # end position
resolution_method: Literal["name", "key", "id"] # did we use the name, the id or the key to recognize the concept
parser: str # which parser recognized the concept (SimpleConcepts, Sya, ...)
def __repr__(self):
return f"(MetadataToken metadata={str_concept(self.metadata, drop_name=True)}, " + \
@@ -41,7 +41,7 @@ class MetadataToken:
@dataclass
class UnrecognizedToken:
"""
Class that represents a text that is not recognized
Class that represents a text that is not recognized (yet)
We keep track of the start and the end position
"""
buffer: str
@@ -49,6 +49,17 @@ class UnrecognizedToken:
end: int
@dataclass
class ConceptToken:
"""
When an already defined concept is found during the parsing
We keep track of the start and the end position
"""
concept: Concept
start: int # start position in the texts
end: int # end position
@dataclass
class StateResult:
next_state: str | None
@@ -59,30 +70,57 @@ class StateResult:
class ConceptToRecognize:
"""
Holds information about the concept to recognize
During the parsing, we have a hint on a concept, but we need to finish the parsing to make sure that we are right
"""
metadata: ConceptMetadata
expected_tokens: list
expected: list[tuple]
resolution_method: Literal["name", "key", "id"] # which attribute was used to resolve the concept
def __repr__(self):
return f"ConceptToRecognize(#{self.metadata.id}, expected={self.expected})"
@dataclass
class StateMachineContext:
"""
Internal state of a state machine
"""
# initialization
context: ExecutionContext
parser_input: ParserInput
get_metadata_from_first_token: Any
get_metadata_from_first_token: Any # This is a callback that gives the possible concepts, for a token
other_parsers: list # parsers to call when managing unrecognized tokens
# attributes used when parsing token
# tokens currently being read
buffer: list[Token] = field(default_factory=list)
buffer_start_pos: int = -1
# attributes used when parsing concept
# parameters already recognized + Concept under recognition
concept_to_recognize: ConceptToRecognize | None = None
result: list = field(default_factory=list)
errors: list = field(default_factory=list)
stack: list = field(default_factory=list)
parameters: list = field(default_factory=list) # it is called 'output' in shunting yard explanations
# runtime info
result: list = field(default_factory=list) # list of tokens found
errors: list = field(default_factory=list) # error sink
def get_clones(self, concepts_to_recognize):
"""
Helper function that clones the context when multiple concepts are found
:param concepts_to_recognize:
:return:
"""
return [StateMachineContext(self.context,
self.parser_input.clone(),
self.get_metadata_from_first_token,
self.other_parsers,
self.buffer.copy(),
self.buffer_start_pos,
concept,
self.stack.copy(),
self.parameters.copy(),
self.result.copy(),
self.errors.copy())
for concept in concepts_to_recognize]
@@ -152,50 +190,6 @@ class ReadTokens(State):
return StateResult(self.name, forks)
class ManageUnrecognized(State):
def run(self, state_context) -> StateResult:
if state_context.buffer:
buffer_as_str = get_text_from_tokens(state_context.buffer)
if len(state_context.result) > 0 and isinstance(old := state_context.result[-1], UnrecognizedToken):
state_context.result[-1] = UnrecognizedToken(old.buffer + buffer_as_str,
old.start,
state_context.parser_input.pos - 1)
else:
state_context.result.append(UnrecognizedToken(buffer_as_str,
state_context.buffer_start_pos,
state_context.parser_input.pos - 1))
return StateResult(self.next_states[0])
class ReadConcept(State):
def run(self, state_context) -> StateResult:
start = state_context.parser_input.pos
for expected in state_context.concept_to_recognize.expected_tokens:
if not state_context.parser_input.next_token(False):
# eof before the concept is recognized
state_context.errors.append(UnexpectedEof(expected, state_context.parser_input.token))
state_context.concept_to_recognize = None
return StateResult(self.next_states[0])
token = state_context.parser_input.token
if token.value != expected:
# token mismatch
state_context.errors.append(UnexpectedToken(token, expected))
state_context.concept_to_recognize = None
return StateResult(self.next_states[0])
state_context.result.append(MetadataToken(state_context.concept_to_recognize.metadata,
start,
state_context.parser_input.pos,
state_context.concept_to_recognize.resolution_method,
"simple"))
state_context.concept_to_recognize = None
return StateResult(self.next_states[0])
class End(State):
def run(self, state_context) -> StateResult:
return StateResult(None)
+16 -12
View File
@@ -1,9 +1,10 @@
from datetime import datetime, timedelta
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError
from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
from starlette import status
@@ -13,7 +14,7 @@ SECRET_KEY = "af95f0590411260f1f127bd7ef9a03409aecadf7729b3e6822b11752433b97b5"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 1
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
_ph = PasswordHasher()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
fake_users_db = {
@@ -22,7 +23,7 @@ fake_users_db = {
"firstname": "Kodjo",
"lastname": "Sossouvi",
"email": "kodjo.sossouvi@gmail.com",
"hashed_password": "$2b$12$fb9jW7QUZ9KIEAAtVmWMEOGtehKy9FafUr7Zfrsb3ZMhsBbzZs7SC", # password is kodjo
"hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$77SEG+Po+keKEOY01WNFzQ$J0jJ/XcwIHOsM+uB8/eeoaukZBF1zXtGVPmNHA6c+p4", # password is kodjo
"disabled": False,
},
}
@@ -52,15 +53,16 @@ class UserInDB(User):
hashed_password: str
def get_password_hash(password: str):
def get_password_hash(password: str) -> str:
"""Hash the password using Argon2id.
Args:
password: The plaintext password to hash.
Returns:
The argon2id hash string.
"""
Hash the password
:param password:
:type password:
:return:
:rtype:
"""
return pwd_context.hash(password)
return _ph.hash(password)
def get_user(db, username: str):
@@ -74,7 +76,9 @@ def authenticate_user(fake_db, username: str, password: str):
if not user:
return False
if not pwd_context.verify(password, user.hashed_password):
try:
_ph.verify(user.hashed_password, password)
except VerifyMismatchError:
return False
return user
+1 -1
View File
@@ -67,7 +67,7 @@ class ConceptManager(BaseService):
You can define new concept, modify or delete them
There are also function to help retrieve them easily (like first token cache)
Already instantiated concepts are managed by the Memory service
Already instantiated concepts are managed by the SheerkaMemory service, not here
"""
NAME = "ConceptManager"
+1 -1
View File
@@ -6,7 +6,7 @@ from services.BaseService import BaseService
class SheerkaDummyEventManager(BaseService):
"""
Manage simple publish and subscribe functions
Need to be replaced by a standard in the industry (Redis?)
Need to be replaced by a standard in the industry (Kafka, Redis?)
"""
NAME = "DummyEventManager"
+17
View File
@@ -1,8 +1,10 @@
import inspect
from contextlib import contextmanager
import pytest
from helpers import GetNextId
from parsers.tokenizer import Token
from server.authentication import User
DEFAULT_ONTOLOGY_NAME = "current_test_"
@@ -95,3 +97,18 @@ class NewOntology:
def __exit__(self, exc_type, exc_val, exc_tb):
self.sheerka.om.revert_ontology(self.context, self.ontology)
return False
def simple_token_compare(a, b):
return a.type == b.type and a.value == b.value
@contextmanager
def comparable_tokens():
eq = Token.__eq__
ne = Token.__ne__
setattr(Token, "__eq__", simple_token_compare)
setattr(Token, "__ne__", lambda a, b: not simple_token_compare(a, b))
try:
    yield
finally:
    # restore the original comparison operators even if the body raises
    setattr(Token, "__eq__", eq)
    setattr(Token, "__ne__", ne)
+3 -3
View File
@@ -95,10 +95,10 @@ def test_i_cannot_get_an_attribute_which_is_not_defined():
def test_i_can_repr_a_concept():
next_id = GetNextId()
foo = get_concept("foo", sequence=next_id)
assert repr(foo) == "(Concept foo#1001)"
assert repr(foo) == "Concept(foo#1001)"
bar = get_concept("bar", pre="is an int", sequence=next_id)
assert repr(bar) == "(Concept bar#1002, #pre=is an int)"
assert repr(bar) == "Concept(bar#1002, #pre=is an int)"
baz = get_concept("baz", definition="add a b", variables=["a", "b"], sequence=next_id)
assert repr(baz) == "(Concept baz#1003, a=**NotInit**, b=**NotInit**)"
assert repr(baz) == "Concept(baz#1003, a=**NotInit**, b=**NotInit**)"
+13 -40
View File
@@ -1,3 +1,5 @@
from typing import Literal
from common.global_symbols import NotInit
from common.utils import unstr_concept
from core.ExecutionContext import ExecutionContext
@@ -43,7 +45,8 @@ def get_concept(name=None, body=None,
is_builtin=False,
is_unique=False,
autouse=False,
sequence=None) -> Concept:
sequence=None,
init_parameters=True) -> Concept:
"""
Create a Concept object
Caution : 'id' and 'key' are not initialized
@@ -113,6 +116,10 @@ def get_concept(name=None, body=None,
else:
metadata.digest = ConceptManager.compute_metadata_digest(metadata)
metadata.all_attrs = ConceptManager.compute_all_attrs(metadata.variables)
if init_parameters and metadata.variables:
metadata.parameters = [v[0] if isinstance(v, tuple) else v for v in metadata.variables]
return Concept(metadata)
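The new `init_parameters` branch keeps only the variable names: entries in `metadata.variables` may be bare names or `(name, value)` tuples. In isolation:

```python
# Standalone illustration of the comprehension above: tuples carry a
# default value, bare strings are just names; parameters keeps names only.
variables = ["a", ("b", 42)]
parameters = [v[0] if isinstance(v, tuple) else v for v in variables]
assert parameters == ["a", "b"]
```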
@@ -353,13 +360,11 @@ def get_concepts(context: ExecutionContext, *concepts, **kwargs) -> list[Concept
"""
Simple and quick way to initialize concepts for a test
:param context:
:type context:
:param concepts:
:type concepts:
:param kwargs:
:type kwargs:
:return:
:rtype:
:param concepts: Concepts to create
:param kwargs: named parameters to tweak the creation of the concepts
use_sheerka : adds the new concepts to Sheerka; otherwise creates concepts that do not affect Sheerka
sequence : sequence manager used to assign a correct id to the created concepts
:return: the concepts
"""
res = []
use_sheerka = kwargs.pop("use_sheerka", False)
@@ -477,35 +482,3 @@ def _rvf(value, who="Test"):
return ReturnValue(who=who, status=False, value=value)
def _ut(buffer, start=0, end=-1):
"""
helper to UnrecognizedToken
:param buffer:
:type buffer:
:param start:
:type start:
:param end:
:type end:
:return:
:rtype:
"""
return UnrecognizedToken(buffer, start, end)
def _mt(concept_id, start=0, end=-1, resolution_method="id", parser="simple"):
"""
helper to MetadataToken
:param concept_id:
:type concept_id:
:param start:
:type start:
:param end:
:type end:
:return:
:rtype:
"""
name, _id = unstr_concept(concept_id)
if _id is None:
return MetadataToken(get_metadata(id=concept_id), start, end, resolution_method, parser)
else:
return MetadataToken(get_metadata(id=_id, name=name), start, end, resolution_method, parser)
-62
View File
@@ -1,62 +0,0 @@
import logging
from multiprocessing import Process
from time import sleep
import uvicorn
from fastapi import FastAPI
class MockServer:
""" Core application to test. """
def __init__(self, endpoints: list[dict]):
"""
:param endpoints:
:type endpoints: list of {path: '', response:''}
"""
self.api = FastAPI()
def raise_exception(ex):
raise ex
# register endpoints
for endpoint in endpoints:
method = endpoint["method"] if "method" in endpoint else "get"
if method == "post":
if "exception" in endpoint:
self.api.post(endpoint["path"])(lambda: raise_exception(endpoint["exception"]))
else:
self.api.post(endpoint["path"])(lambda: endpoint["response"])
else:
self.api.get(endpoint["path"])(lambda: endpoint["response"])
# register shutdown
self.api.on_event("shutdown")(self.close)
# create the process
self.proc = Process(target=uvicorn.run,
args=(self.api,),
kwargs={
"host": "127.0.0.1",
"port": 5000,
"log_level": "info"},
daemon=True)
async def close(self):
""" Gracefull shutdown. """
logging.warning("Shutting down the app.")
def start_server(self):
self.proc.start()
sleep(0.1)
def stop_server(self):
self.proc.terminate()
def __enter__(self):
self.start_server()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop_server()
+105
View File
@@ -0,0 +1,105 @@
from typing import Literal
from common.utils import str_concept, unstr_concept
from helpers import get_metadata
from parsers.state_machine import MetadataToken, UnrecognizedToken
class MetadataTokenForTest(MetadataToken):
def __repr__(self):
res = f"(MetadataTokenForTest metadata={str_concept(self.metadata, drop_name=True)}"
if self.start is not None:
res += f", start={self.start}"
if self.end is not None:
res += f", end={self.end}"
if self.resolution_method is not None:
res += f", method={self.resolution_method}"
if self.parser is not None:
res += f", origin={self.parser}"
res += ")"
return res
def __eq__(self, other):
if not isinstance(other, MetadataToken):
return False
if self.metadata.id != other.metadata.id:
return False
if self.start is not None and self.start != other.start:
return False
if self.end is not None and self.end != other.end:
return False
if self.parser is not None and self.parser != other.parser:
return False
if self.resolution_method is not None and self.resolution_method != other.resolution_method:
return False
return True
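In this `__eq__`, every `None` field acts as a wildcard: a partially specified test token matches on the fields it does set. Note the comparison is one-way, so the wildcard must be the left operand. A hypothetical sketch using `get_metadata` from the helpers:

```python
meta = get_metadata(id="1001")
anywhere = MetadataTokenForTest(meta, None, None, None, None)
specific = MetadataTokenForTest(meta, 3, 7, "id", "simple")
assert anywhere == specific   # all None fields are skipped
assert specific != anywhere   # 3 != None on the left side, so not symmetric
```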
def _ut(buffer, start=0, end=-1):
"""
helper to UnrecognizedToken
:param buffer:
:type buffer:
:param start:
:type start:
:param end:
:type end:
:return:
:rtype:
"""
return UnrecognizedToken(buffer, start, end)
def _mt(concept_id,
start=0,
end=-1,
resolution_method: Literal["name", "key", "id"] = "key",
parser="simple",
**kwargs):
"""
helper to MetadataToken
:param concept_id:
:type concept_id:
:param start:
:type start:
:param end:
:type end:
:return:
:rtype:
"""
name, _id = unstr_concept(concept_id)
variables = [(k, v) for k, v in kwargs.items()] if kwargs else None
metadata = get_metadata(id=concept_id, variables=variables) if _id is None \
else get_metadata(id=_id, name=name, variables=variables)
return MetadataTokenForTest(metadata, start, end, resolution_method, parser)
def _mtsya(concept_id,
start=0,
end=None,
resolution_method: Literal["name", "key", "id"] = "key",
parser="sya",
**kwargs):
"""
helper to MetadataToken
:param concept_id:
:type concept_id:
:param start:
:type start:
:param end:
:type end:
:return:
:rtype:
"""
name, _id = unstr_concept(concept_id)
variables = [(k, v) for k, v in kwargs.items()] if kwargs else None
metadata = get_metadata(id=concept_id, variables=variables) if _id is None \
else get_metadata(id=_id, name=name, variables=variables)
return MetadataTokenForTest(metadata, start, end, resolution_method, parser)
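For reference, the concept-id strings these helpers accept, as used elsewhere in the test suite (the `c:<name>#<id>:` reading is inferred from `unstr_concept`):

```python
_mt("1001")                      # resolved by id only
_mt("c:#1001:")                  # same id, concept-string notation
_mt("c:one two#1001:")           # id 1001 plus the name "one two"
_mtsya("1001", a="1 ", b=" 2")   # attaches variables ("a", "1 ") and ("b", " 2")
```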
+34 -25
View File
@@ -3,8 +3,9 @@ import pytest
from base import BaseTest
from conftest import NewOntology
from evaluators.base_evaluator import MultipleChoices
from helpers import _mt, _ut, get_concepts, get_from, get_metadata, get_parser_input
from parsers.SimpleParserParser import SimpleConceptsParser
from helpers import get_concepts, get_from, get_metadata, get_parser_input
from parsers.SimpleConceptsParser import SimpleConceptsParser
from tests.parsers.conftest import _mt, _ut
class TestSimpleConceptsParser(BaseTest):
@@ -18,9 +19,9 @@ class TestSimpleConceptsParser(BaseTest):
("xxx yyy I am a new concept", [_ut("xxx yyy ", 0, 3), _mt("1003", 4, 12)]),
("I am a new concept xxx yyy", [_mt("1003", 0, 8), _ut(" xxx yyy", 9, 12)]),
("xxx I am a new concept yyy", [_ut("xxx ", 0, 1), _mt("1003", 2, 10), _ut(" yyy", 11, 12)]),
("c:#1003:", [_mt("1003", 0, 0)]),
("xxx c:#1003: yyy", [_ut("xxx ", 0, 1), _mt("1003", 2, 2), _ut(" yyy", 3, 4)]),
("xxx c:I am: yyy", [_ut("xxx ", 0, 1), _mt("1002", 2, 2), _ut(" yyy", 3, 4)]),
("c:#1003:", [_mt("1003", 0, 0, resolution_method="id")]),
("xxx c:#1003: yyy", [_ut("xxx ", 0, 1), _mt("1003", 2, 2, resolution_method="id"), _ut(" yyy", 3, 4)]),
("xxx c:I am: yyy", [_ut("xxx ", 0, 1), _mt("1002", 2, 2, resolution_method="name"), _ut(" yyy", 3, 4)]),
(" I am a new concept", [_ut(" ", 0, 0), _mt("1003", 1, 9)])
])
def test_i_can_recognize_a_concept(self, context, parser, text, expected):
@@ -28,13 +29,14 @@ class TestSimpleConceptsParser(BaseTest):
get_concepts(context, "I", "I am", "I am a new concept", use_sheerka=True)
pi = get_parser_input(text)
res = parser.parse(context, pi)
error_sink = []
res = parser.parse(context, pi, error_sink)
assert res == MultipleChoices([expected])
assert not parser.error_sink
assert not error_sink
@pytest.mark.parametrize("text, expected", [
("foo", [_mt("1001", 0, 0)]),
("foo", [_mt("1001", 0, 0, resolution_method="name")]),
("I am a new concept", [_mt("1001", 0, 8)])
])
def test_i_can_recognize_a_concept_by_its_name_and_its_definition(self, context, parser, text, expected):
@@ -42,13 +44,14 @@ class TestSimpleConceptsParser(BaseTest):
get_concepts(context, get_metadata(name="foo", definition="I am a new concept"), use_sheerka=True)
pi = get_parser_input(text)
res = parser.parse(context, pi)
error_sink = []
res = parser.parse(context, pi, error_sink)
assert res == MultipleChoices([expected])
assert not parser.error_sink
assert not error_sink
@pytest.mark.parametrize("text, expected", [
("long concept name", [_mt("1001", 0, 4)]),
("long concept name", [_mt("1001", 0, 4, resolution_method="name")]),
("I am a new concept", [_mt("1001", 0, 8)])
])
def test_i_can_recognize_a_concept_by_its_name_when_long_name(self, context, parser, text, expected):
@@ -57,17 +60,19 @@ class TestSimpleConceptsParser(BaseTest):
use_sheerka=True)
pi = get_parser_input(text)
res = parser.parse(context, pi)
error_sink = []
res = parser.parse(context, pi, error_sink)
assert res == MultipleChoices([expected])
assert not parser.error_sink
assert not error_sink
def test_i_can_parse_a_sequence_of_concept(self, context, parser):
with NewOntology(context, "test_i_can_parse_a_sequence_of_concept"):
get_concepts(context, "foo bar", "baz", "qux", use_sheerka=True)
pi = get_parser_input("foo bar baz foo, qux")
res = parser.parse(context, pi)
error_sink = []
res = parser.parse(context, pi, error_sink)
expected = [_mt("1001", 0, 2),
_ut(" ", 3, 3),
@@ -76,40 +81,43 @@ class TestSimpleConceptsParser(BaseTest):
_mt("1003", 9, 9)]
assert res == MultipleChoices([expected])
assert not parser.error_sink
assert not error_sink
def test_i_can_detect_multiple_choices(self, context, parser):
with NewOntology(context, "test_i_can_detect_multiple_choices"):
get_concepts(context, "foo bar", "bar baz", use_sheerka=True)
pi = get_parser_input("foo bar baz")
res = parser.parse(context, pi)
error_sink = []
res = parser.parse(context, pi, error_sink)
expected1 = [_mt("1001", 0, 2), _ut(" baz", 3, 4)]
expected2 = [_ut("foo ", 0, 1), _mt("1002", 2, 4)]
assert res == MultipleChoices([expected1, expected2])
assert not parser.error_sink
assert not error_sink
def test_i_can_detect_multiple_choices_2(self, context, parser):
with NewOntology(context, "test_i_can_detect_multiple_choices_2"):
get_concepts(context, "one two", "one", "two", use_sheerka=True)
pi = get_parser_input("one two")
res = parser.parse(context, pi)
error_sink = []
res = parser.parse(context, pi, error_sink)
expected1 = [_mt("1001", 0, 2)]
expected2 = [_mt("1002", 0, 0), _ut(" ", 1, 1), _mt("1003", 2, 2)]
assert res == MultipleChoices([expected1, expected2])
assert not parser.error_sink
assert not error_sink
def test_i_can_detect_multiple_choices_3(self, context, parser):
with NewOntology(context, "test_i_can_detect_multiple_choices_3"):
get_concepts(context, "one two", "one", "two", use_sheerka=True)
pi = get_parser_input("one two xxx one two")
res = parser.parse(context, pi)
error_sink = []
res = parser.parse(context, pi, error_sink)
e1 = get_from(_mt("c:one two#1001:"), _ut(" xxx "), _mt("c:#1001:"))
e2 = get_from(_mt("c:one#1002:"), _ut(" "), _mt("c:two#1003:"), _ut(" xxx "), _mt("c:one two#1001:"))
@@ -118,11 +126,12 @@ class TestSimpleConceptsParser(BaseTest):
_mt("c:#1003:"))
assert res == MultipleChoices([e1, e2, e3, e4])
assert not parser.error_sink
assert not error_sink
def test_nothing_is_returned_when_no_concept_is_recognized(self, context, parser):
pi = get_parser_input("one two three")
res = parser.parse(context, pi)
error_sink = []
res = parser.parse(context, pi, error_sink)
assert res == MultipleChoices([])
@@ -131,12 +140,12 @@ class TestSimpleConceptsParser(BaseTest):
get_concepts(context, "foo", "i am a concept", use_sheerka=True)
pi = get_parser_input("foo.attribute")
res = parser.parse(context, pi)
error_sink = []
res = parser.parse(context, pi, error_sink)
expected = [_mt("1001", 0, 0), _ut(".attribute", 1, 2)]
assert res == MultipleChoices([expected])
pi = get_parser_input("i am a concept.attribute")
res = parser.parse(context, pi)
res = parser.parse(context, pi, error_sink)
expected = [_mt("1002", 0, 6), _ut(".attribute", 7, 8)]
assert res == MultipleChoices([expected])
+94
View File
@@ -0,0 +1,94 @@
import pytest
from base import BaseTest
from conftest import NewOntology, comparable_tokens
from evaluators.base_evaluator import MultipleChoices
from helpers import get_concept, get_concepts, get_parser_input
from parsers.SyaConceptsParser import SyaConceptsParser
from parsers.tokenizer import Tokenizer
from tests.parsers.conftest import _mtsya
class TestSyaConceptsParser(BaseTest):
@pytest.fixture()
def parser(self):
return SyaConceptsParser()
@pytest.mark.parametrize("concept_key, expected_list", [
["a long token name", [("a long token name", 0)]],
["__var__0 __var__1 __var__2", [("", 3)]],
["__var__0 __var__1 prefixed", [(" prefixed", 2)]],
["suffixed __var__0 __var__1", [("suffixed ", 0), ["", 2]]],
["__var__0 __var__1 infixed __var__0 __var__1", [(" infixed ", 2), ["", 2]]],
["if __var__0 __var__1 then __var__2 end", [("if ", 0), (" then ", 2), (" end", 1)]]
])
def test_i_can_initialize_expected_parameters(self, parser, concept_key, expected_list):
resolved_expected_list = [(list(Tokenizer(source, yield_eof=False)), nb) for source, nb in expected_list]
actual = parser._get_expected_tokens(concept_key)
with comparable_tokens():
assert actual == resolved_expected_list
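A reading of the `(tokens, nb)` pairs checked above, inferred from the cases themselves: each pair holds the literal tokens to match and the number of parameters consumed before them.

```python
# "if __var__0 __var__1 then __var__2 end" decomposes into:
#   ("if ", 0)     -> match "if" with no preceding parameter
#   (" then ", 2)  -> match "then" after consuming 2 parameters
#   (" end", 1)    -> match "end" after 1 more parameter
```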
@pytest.mark.parametrize("concept, _input", [
(get_concept("a plus b", variables=["a", "b"]), "1 plus 2"),
(get_concept("add a b", variables=["a", "b"]), "add 1 2"),
(get_concept("a b add", variables=["a", "b"]), "1 2 add")
])
def test_i_can_parse_a_simple_case(self, context, parser, concept, _input):
with NewOntology(context, "test_i_can_parse_a_simple_case"):
get_concepts(context, concept, use_sheerka=True)
pi = get_parser_input(_input)
error_sink = []
res = parser.parse(context, pi, error_sink)
expected = [_mtsya("1001", a="1 ", b=" 2")]
assert res == MultipleChoices([expected])
assert not error_sink
def test_i_can_parse_long_names_concept(self, context, parser):
with NewOntology(context, "test_i_can_parse_long_names_concept"):
get_concepts(context, get_concept("a long named concept b", variables=["a", "b"]), use_sheerka=True)
pi = get_parser_input("1 long named concept 2")
error_sink = []
res = parser.parse(context, pi, error_sink)
expected = [_mtsya("1001", a="1 ", b=" 2")]
assert res == MultipleChoices([expected])
assert not error_sink
def test_i_can_parse_sequence(self, context, parser):
with NewOntology(context, "test_i_can_parse_sequence"):
get_concepts(context, get_concept("a plus b", variables=["a", "b"]), use_sheerka=True)
pi = get_parser_input("1 plus 2 3 plus 7")
error_sink = []
res = parser.parse(context, pi, error_sink)
expected = [[_mtsya("1001", a="1 ", b=" 2")], [_mtsya("1001", a=" 3 ", b=" 7")]]
assert res == MultipleChoices(expected)
assert not error_sink
def test_not_enough_parameters(self, context, parser):
with NewOntology(context, "test_not_enough_parameters"):
get_concepts(context, get_concept("a plus b", variables=["a", "b"]), use_sheerka=True)
pi = get_parser_input("1 plus ")
error_sink = []
res = parser.parse(context, pi, error_sink)
assert res == MultipleChoices([])
assert error_sink
def test_i_can_detect_when_name_does_not_match(self, context, parser):
with NewOntology(context, "test_i_can_detect_when_name_does_not_match"):
get_concepts(context, get_concept("a long named concept b", variables=["a", "b"]), use_sheerka=True)
pi = get_parser_input("1 long named mismatch 2")
error_sink = []
res = parser.parse(context, pi, error_sink)
assert error_sink
+20 -20
View File
@@ -1,11 +1,11 @@
from unittest.mock import MagicMock, patch
from fastapi import HTTPException
from starlette import status
from client import SheerkaClient, parse_arguments
from mockserver import MockServer
# @pytest.mark.skip("too long")
class TestSheerkaClient:
def test_i_can_start_with_a_default_hostname(self):
parsed = parse_arguments([])
@@ -41,7 +41,11 @@ class TestSheerkaClient:
assert res.message == "Connection refused."
def test_i_can_manage_when_resource_is_not_found(self):
with MockServer([]):
mock_response = MagicMock()
mock_response.__bool__ = MagicMock(return_value=False)
mock_response.text = '{"detail":"Not Found"}'
with patch("requests.get", return_value=mock_response):
client = SheerkaClient("http://localhost", 5000)
res = client.check_url()
@@ -49,29 +53,25 @@ class TestSheerkaClient:
assert res.message == '{"detail":"Not Found"}'
def test_i_can_connect_to_a_server(self):
with MockServer([{
"path": "/",
"response": "Hello world"
}]):
mock_response = MagicMock()
mock_response.__bool__ = MagicMock(return_value=True)
mock_response.text = '"Hello world"'
with patch("requests.get", return_value=mock_response):
client = SheerkaClient("http://localhost", 5000)
res = client.check_url()
assert res.status
assert res.message == '"Hello world"'
def test_i_can_manage_when_authentication_fails(self):
with MockServer([{
"path": "/",
"response": "Hello world"
}, {
"method": "post",
"path": "/token",
"exception": HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
}]):
mock_response = MagicMock()
mock_response.__bool__ = MagicMock(return_value=False)
mock_response.json.return_value = {"detail": "Incorrect username or password"}
with patch("requests.post", return_value=mock_response):
client = SheerkaClient("http://localhost", 5000)
res = client.connect("username", "wrong_password")
assert not res.status
assert res.message == 'Incorrect username or password'
assert res.message == "Incorrect username or password"
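These tests no longer spin up `MockServer`: they rely on `requests` responses being truthy for 2xx statuses and patch the HTTP call instead. The pattern in isolation:

```python
from unittest.mock import MagicMock, patch

import requests

fake = MagicMock()
fake.__bool__ = MagicMock(return_value=False)  # mimics a non-2xx response
fake.text = '{"detail":"Not Found"}'

with patch("requests.get", return_value=fake):
    response = requests.get("http://localhost:5000/")
    assert not response                        # falsy, like a 404
    assert response.text == '{"detail":"Not Found"}'
```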
+2 -1
View File
@@ -2,8 +2,9 @@ import pytest
from common.global_symbols import NotInit
from core.concept import Concept, ConceptDefaultProps, ConceptMetadata, DefinitionType
from helpers import GetNextId, _mt, _ut, get_concept, get_concepts, get_evaluated_concept, get_from, get_metadata, \
from helpers import GetNextId, get_concept, get_concepts, get_evaluated_concept, get_from, get_metadata, \
get_metadatas
from tests.parsers.conftest import _mt, _ut
def test_i_can_get_default_value_when_get_metadata():