Compare commits


4 Commits

| Author | SHA1 | Message | Date |
|---|---|---|---|
| kodjo | 078d8e5df6 | Restarting the project. Fixing unit tests. Continuing SyaParser | 2026-04-12 09:40:04 +02:00 |
| kodjo | 3be854d34c | intermediate commit | 2026-04-11 21:01:39 +02:00 |
| kodjo | a729d98a0d | Working on #21: Working on SyaConceptsParser.py | 2026-04-11 21:01:39 +02:00 |
| kodjo | a7043b1dd8 | Working on #21: Created classes | 2026-04-11 21:01:39 +02:00 |
24 changed files with 2766 additions and 1195 deletions
+433
@@ -0,0 +1,433 @@
# SyaConceptsParser
## Purpose
`SyaConceptsParser` parses **sequences of concepts with parameters** (variables).
It complements `SimpleConceptsParser`, which only handles parameter-less concepts.
Examples of recognized concepts:
- `a plus b` → matches `1 plus 2`, `x plus y plus z`, etc.
- `if a then b end` → matches `if x > 0 then print x end`
- `a long named concept b` → matches `1 long named concept 2`
The fundamental target case is **concept composition**: `1 plus 2 times 3`, where
`times` must be evaluated before `plus`. This precedence problem is what the
Shunting Yard Algorithm solves.
---
## The Shunting Yard Algorithm (SYA)
Dijkstra's algorithm (1961) converts an expression in infix notation (`1 + 2 * 3`)
into **Reverse Polish Notation** (RPN: `1 2 3 * +`), respecting operator precedence.
### Principle
Two structures: an **operator stack** and an **output queue**.
```
Input: 1 + 2 * 3

Token │ Action                 Stack     Output
──────┼───────────────────────────────────────────────
1     │ → output queue         []        [1]
+     │ → stack                [+]       [1]
2     │ → output queue         [+]       [1, 2]
*     │ prec(*) > prec(+)      [+, *]    [1, 2]
      │   → stack (no pop)
3     │ → output queue         [+, *]    [1, 2, 3]
end   │ flush stack            []        [1, 2, 3, *, +]

RPN: 1 2 3 * + ≡ 1 + (2 * 3)
```
### Pop rule
When pushing an operator `op`, first pop any stack-top operator `top` such that:
`precedence(top) >= precedence(op)`
This guarantees that higher-precedence operators are evaluated first.
---
## Sheerka's Adaptation
The original SYA works on **atomic tokens** (digits, `+`, `*`).
Sheerka adapts it to work on **concepts** that:
1. **Are identified by multiple tokens** — a concept like `if a then b end`
   contains several keywords (`if`, `then`, `end`) interleaved with parameters.
   The original algorithm recognizes an operator in a single token.
2. **Can contain N parameters** — a binary operator has exactly 2 operands.
   A Sheerka concept can have 0, 1, 2 or more.
3. **Parameters can themselves be concepts** — in `1 plus 2 times 3`, the
   parameter `b` of `plus` is the result of the `times` concept. Recursion is
   handled by workflow nesting.
### SYA ↔ Sheerka mapping
| Original SYA | Sheerka |
|---|---|
| Operator (`+`, `*`) | `ConceptToRecognize` (concept with parameters) |
| Operand (number, variable) | `UnrecognizedToken` or `ConceptToken` |
| Operator stack | `state_context.stack` |
| Output queue | `state_context.parameters` |
| Precedence | `InitConceptParsing.must_pop()` |
| RPN result | `MetadataToken` in `state_context.result` |
### Structural differences
**Multi-token recognition** — where SYA reads one token to identify `*`,
Sheerka must read `long named concept` (3 tokens) to identify the concept
`a long named concept b`. The `ReadConcept` class handles this sequential reading.
**The `expected` structure** — the concept `if a then b end` is decomposed into segments:
```
[("if ", 0), (" then ", 1), (" end", 1)]
 ────────     ──────────     ──────────
 keyword      keyword        keyword
 0 params     1 param        1 param
 before       before         before
```
Each segment states how many parameters precede that group of tokens, and which
tokens to consume to validate the segment.
**Precedence not yet implemented**: `must_pop()` always returns `False`.
Concept composition is therefore not active yet. It is the next implementation step.
---
## Architecture
### Two interdependent workflows
```mermaid
graph TD
    A[#tokens_wkf] -->|concept keyword found| B[#concept_wkf]
    B -->|concept fully parsed| A
    A -->|EOF| C[end]
```
The parser always starts in `#tokens_wkf`. Whenever a token matching the first
keyword of a concept is detected, a **fork** is created and sent into
`#concept_wkf`. Once the concept is recognized, the fork returns to `#tokens_wkf`
to continue reading.
---
## Workflow `#tokens_wkf`
```mermaid
stateDiagram-v2
    [*] --> start
    start --> prepare_read_tokens
    prepare_read_tokens --> read_tokens
    read_tokens --> read_tokens : no concept found (loop)
    read_tokens --> eof : EOF
    read_tokens --> concepts_found : concept keyword detected (fork)
    eof --> end : ManageUnrecognized
    concepts_found --> concept_wkf : ManageUnrecognized → #concept_wkf
    end --> [*]
```
**`PrepareReadTokens`**: initializes the buffer and records `buffer_start_pos`.
**`ReadTokens`**: reads one token, consults `get_metadata_from_first_token`. If a concept
can start at this token → **fork** with a cloned context where `concept_to_recognize`
is set. The main path continues reading.
**`ManageUnrecognized("concepts found")`**: processes the buffer accumulated before
the keyword (via `SimpleConceptsParser`). Unrecognized tokens become
`UnrecognizedToken` and are added to `parameters`.
---
## Workflow `#concept_wkf`
```mermaid
stateDiagram-v2
    [*] --> start
    start --> init_concept_parsing
    init_concept_parsing --> manage_parameters
    manage_parameters --> read_concept
    read_concept --> read_parameters : more segments
    read_concept --> finalize_concept : all segments done
    read_concept --> token_mismatch : token mismatch
    read_concept --> error_eof : unexpected EOF
    read_parameters --> manage_parameters : loop
    read_parameters --> finalize_concept : EOF
    finalize_concept --> tokens_wkf : #tokens_wkf
    token_mismatch --> end
    error_eof --> end
    end --> [*]
```
**`InitConceptParsing`**:
- Verifies that the number of already-collected parameters is sufficient
- Removes the first token of the segment (already consumed by `ReadTokens`)
- Applies SYA: pushes the concept onto the stack
**`ReadConcept`**: reads the fixed tokens of the current segment one by one.
If all match → `pop(0)` the segment and continue.
**`ReadParameters`**: reads ONE token into the buffer. Returns to
`ManageUnrecognized`, which tries to recognize it via `SimpleConceptsParser`.
**`FinalizeConceptParsing`**:
- Pops the concept from the stack
- Computes `start` (from the first parameter) and `end` (current position)
- Creates a `MetadataToken(concept.metadata, start, end, resolution_method, "sya")`
- Clears stack and parameters
- Returns to `#tokens_wkf`
---
## Step-by-step example — `"1 plus 2"`
Concept defined: `a plus b` (variables `a`, `b`).
**Tokens:**
```
pos :  0    1    2      3    4    5
tok : "1"  " "  "plus" " "  "2"  EOF
```
**`expected` for this concept:**
```
[([" ", "plus", " "], 1), ([], 1)]
  segment 0 → 1 param before, read " plus "
  segment 1 → 1 param before, read nothing (concept ends with a param)
```
**Execution trace:**
```
PrepareReadTokens        → buffer_start_pos = 0
ReadTokens "1"           → no concept, buffer = ["1"]
ReadTokens " "           → no concept, buffer = ["1", " "]
ReadTokens "plus"        → concept "a plus b" found!
  ┌── FORK ─────────────────────────────────────────────────────┐
  │ clone: buffer=["1"," "], pos=2, concept_to_recognize=CTR(+) │
  └─────────────────────────────────────────────────────────────┘
ManageUnrecognized("concepts found")
  buffer = ["1"," "] → SimpleConceptsParser → not found
  parameters = [UnrecognizedToken("1 ", start=0, end=1)]
  buffer_start_pos = 3
  → #concept_wkf
InitConceptParsing
  expected[0] = ([" ","plus"," "], 1)
  need 1 param → have 1 ✓
  strip leading WS → ["plus"," "]
  pop "plus" (already consumed) → [" "]
  SYA: stack = [CTR(a_plus_b)]
ManageUnrecognized("manage parameters"): buffer empty → nothing
ReadConcept: reads [" "] → pos 3 = " " ✓
  expected.pop(0) → remaining = [([], 1)]
  → "read parameters"
ReadParameters: reads "2" at pos 4
  buffer = ["2"]
  → "manage parameters"
ManageUnrecognized("manage parameters")
  buffer = ["2"] → not a concept
  parameters = [UT("1 ", 0, 1), UT("2", 3, 3)]
  buffer_start_pos = 5
ReadConcept: expected = [([], 1)], reads 0 tokens
  expected.pop(0) → empty → "finalize concept"
FinalizeConceptParsing
  concept = stack.pop() = CTR(a_plus_b)
  start = parameters[0].start = 0
  end = parser_input.pos = 4
  result.append(MetadataToken(metadata, 0, 4, "key", "sya"))
  → #tokens_wkf
ReadTokens → EOF → ManageUnrecognized("eof") → end
```
**Result:**
```
MultipleChoices([
  [MetadataToken(id="1001", start=0, end=4, resolution_method="key", parser="sya")]
])
```
---
## Example — sequence `"1 plus 2 3 plus 7"`
Same concept `a plus b`. The parser recognizes two successive concepts in one pass.
```
pos :  0    1    2      3    4    5    6    7    8      9    10   11
tok : "1"  " "  "plus" " "  "2"  " "  "3"  " "  "plus" " "  "7"  EOF
```
After `FinalizeConceptParsing` for the first concept (pos=4), `#tokens_wkf` restarts:
```
PrepareReadTokens → buffer_start_pos = 5
ReadTokens " "    → buffer = [" "]
ReadTokens "3"    → buffer = [" ","3"]
ReadTokens " "    → buffer = [" ","3"," "]
ReadTokens "plus" → fork
ManageUnrecognized → UT(" 3 ", start=5, end=7), buffer_start_pos=9
...
FinalizeConceptParsing
  start = 5, end = 10
  result.append(MetadataToken(1001, 5, 10, "key", "sya"))
```
**Final result (one path, two concepts):**
```
MultipleChoices([
  [
    MetadataToken(1001, start=0, end=4, parser="sya"),
    MetadataToken(1001, start=5, end=10, parser="sya"),
  ]
])
```
---
## Future example — composition `"1 plus 2 times 3"`
> **Note:** this example requires implementing `must_pop()`.
> Currently `must_pop()` always returns `False`.
Concepts: `a plus b` (low precedence), `a times b` (high precedence).
**Expected behavior after implementation:**
```
Expression: 1 plus 2 times 3
SYA with precedence times > plus:
  Token "1"     → parameters = [1]        stack = []
  Token "plus"  → stack = [plus]          parameters = [1]
  Token "2"     → parameters = [1, 2]     stack = [plus]
  Token "times" → prec(times) > prec(plus) → no pop
                  stack = [plus, times]   parameters = [1, 2]
  Token "3"     → parameters = [1, 2, 3]  stack = [plus, times]
Finalize:
  pop "times" → MetadataToken(times, params=[2, 3])
  pop "plus"  → MetadataToken(plus, params=[1, times_result])
```
**What `must_pop()` must implement:**
```python
def must_pop(self, current_concept, top_of_stack_concept):
    return precedence(top_of_stack_concept) >= precedence(current_concept)
```
Without this rule, both concepts would be processed left-to-right with equal
precedence, yielding `(1 plus 2) times 3` instead of `1 plus (2 times 3)`.
---
## The `expected` structure in detail
For the concept `if a then b end` (key `"if __var__0 then __var__1 end"`):
```
_get_expected_tokens("if __var__0 then __var__1 end")
→ [
    (["if", " "], 0),         # read "if " before the 1st param
    ([" ", "then", " "], 1),  # read " then " before the 2nd param
    ([" ", "end"], 1),        # read " end": 1 param before, no trailing param
  ]
```
During parsing, `expected` is **modified in place**:
- `InitConceptParsing` removes the first token of segment 0 (already read by `ReadTokens`)
- `ReadConcept` consumes the tokens of the current segment then calls `pop(0)`
- When `expected` is empty → `FinalizeConceptParsing`
---
## Key data structures
### `StateMachineContext`
```
StateMachineContext
├── parser_input          ParserInput      token stream + cursor
├── other_parsers         [SimpleConceptsParser]
├── buffer                list[Token]      tokens pending classification
├── buffer_start_pos      int              start position of the current buffer
├── concept_to_recognize  ConceptToRecognize | None
├── stack                 list[CTR]        SYA — operator stack
├── parameters            list[UT|CT]      SYA — output queue
├── result                list[MetadataToken]
└── errors                list
```
### `MetadataToken` (output)
```
MetadataToken
├── metadata            ConceptMetadata (id, name, key, variables, ...)
├── start               int   position of the first token of the expression
├── end                 int   position of the last token
├── resolution_method   "key" | "name" | "id"
└── parser              "sya"
```
### Token positions in `"1 plus 2"`:
```
"1 plus 2"
 0 1 2  3 4
 │ │ │  │ │
 1 _ plus _ 2
MetadataToken: start=0, end=4
```
---
## Differences vs `SimpleConceptsParser`
| | `SimpleConceptsParser` | `SyaConceptsParser` |
|---|---|---|
| Target concepts | No parameters | With parameters |
| `concept_wkf` states | 2 | 8 |
| `result` contents | `MetadataToken` + `UnrecognizedToken` | `MetadataToken` only |
| Parameters | N/A | Collected in `parameters` |
| Parser tag | `"simple"` | `"sya"` |
| SYA | No | Yes (precedence to implement) |
---
## Error handling
| Error | Cause | State reached |
|---|---|---|
| `UnexpectedToken` | Read token ≠ expected concept token | `TokenMismatch` → `end` |
| `UnexpectedEof` | Input ends before concept is complete | `ErrorEof` → `end` |
| `NotEnoughParameters` | Too few params before a segment | Exception raised |
Errors are collected from **all paths** and forwarded to `error_sink` in `parse()`.
A path with errors is excluded from `_select_best_paths`.
+545
@@ -0,0 +1,545 @@
# SyaConceptsParser
## Purpose
`SyaConceptsParser` parses **sequences of concepts with parameters** (variables).
It complements `SimpleConceptsParser`, which only handles parameter-less concepts.
Examples of recognized concepts:
- `a plus b` → matches `1 plus 2`, `x plus y`, etc.
- `if a then b end` → matches `if x > 0 then print x end`
- `a long named concept b` → matches `1 long named concept 2`
The primary goal is **concept composition**: `1 plus 2 times 3`, where `times` must
be evaluated before `plus`. This precedence problem is what the Shunting Yard
Algorithm solves.
---
## The Shunting Yard Algorithm (SYA)
Dijkstra's algorithm (1961) converts an infix expression (`1 + 2 * 3`) into
**Reverse Polish Notation** (RPN: `1 2 3 * +`), respecting operator precedence.
### Principle
Two structures: an **operator stack** and an **output queue**.
```
Input: 1 + 2 * 3

Token │ Action                 Stack     Output
──────┼───────────────────────────────────────────────
1     │ → output queue         []        [1]
+     │ → stack                [+]       [1]
2     │ → output queue         [+]       [1, 2]
*     │ prec(*) > prec(+)      [+, *]    [1, 2]
      │   → stack (no pop)
3     │ → output queue         [+, *]    [1, 2, 3]
end   │ flush stack            []        [1, 2, 3, *, +]

RPN: 1 2 3 * + ≡ 1 + (2 * 3)
```
### Pop rule
When pushing operator `op`, first pop any stack-top operator `top` where:
`precedence(top) >= precedence(op)`
This ensures higher-precedence operators are evaluated first.
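The trace and pop rule above correspond to the textbook algorithm; a minimal standalone sketch (not the Sheerka implementation) for arithmetic tokens:

```python
# Minimal Shunting Yard: infix token list -> RPN token list.
PRECEDENCE = {"+": 1, "-": 1, "*": 2, "/": 2}

def to_rpn(tokens):
    output, stack = [], []
    for tok in tokens:
        if tok in PRECEDENCE:
            # Pop rule: pop while the stack top has precedence >= tok.
            while stack and PRECEDENCE[stack[-1]] >= PRECEDENCE[tok]:
                output.append(stack.pop())
            stack.append(tok)
        else:
            output.append(tok)  # operand goes straight to the output queue
    while stack:
        output.append(stack.pop())  # flush remaining operators
    return output

print(to_rpn(["1", "+", "2", "*", "3"]))  # → ['1', '2', '3', '*', '+']
```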
---
## Sheerka's Adaptation
The original SYA works on **atomic tokens** (digits, `+`, `*`).
Sheerka adapts it for **concepts** that:
1. **Are identified by multiple tokens** — a concept like `if a then b end` has
several keywords (`if`, `then`, `end`) interleaved with parameters.
The original SYA identifies an operator with a single token.
2. **Can have N parameters** — a binary operator has exactly 2 operands.
A Sheerka concept can have 0, 1, 2 or more parameters.
3. **Parameters can themselves be concepts** — in `1 plus 2 times 3`, the parameter
`b` of `plus` is the result of the `times` concept. This recursion is handled
by the nested workflow structure.
### SYA ↔ Sheerka mapping
| Original SYA | Sheerka |
|---|---|
| Operator (`+`, `*`) | `ConceptToRecognize` (concept with parameters) |
| Operand (number, variable) | `UnrecognizedToken` or `ConceptToken` |
| Operator stack | `state_context.stack` |
| Output queue | `state_context.parameters` |
| Precedence rule | `InitConceptParsing.must_pop()` |
| RPN result | `MetadataToken` in `state_context.result` |
### Structural differences
**Multi-token recognition** — where SYA reads a single token to identify `*`,
Sheerka must read `long named concept` (3 tokens) to identify concept
`a long named concept b`. The `ReadConcept` state handles this sequential reading.
**The `expected` structure** — concept `if a then b end` is decomposed into segments:
```
[("if ", 0), (" then ", 1), (" end", 1)]
───────── ────────── ──────────
keyword keyword keyword
0 params 1 param 1 param
before before before
```
Each segment states how many parameters precede it and which tokens to consume
to validate it.
**Precedence not yet implemented**: `must_pop()` always returns `False`.
Concept composition with precedence rules is the next implementation step.
---
## Architecture
### Two interdependent workflows
```mermaid
graph TD
A[#tokens_wkf] -->|concept keyword found - fork| B[#concept_wkf]
A -->|token not a concept keyword - buffered, loop| A
B -->|concept fully parsed| A
A -->|EOF| C[end]
```
The parser always starts in `#tokens_wkf`. Tokens that do not match any concept
keyword are accumulated in a buffer and the loop continues. Whenever a token
matching the first keyword of a known concept is detected, a **fork** is created
and sent into `#concept_wkf` — the main path keeps looping independently. Once the
concept is recognized, the fork returns to `#tokens_wkf` to continue reading.
---
## Workflow `#tokens_wkf`
```mermaid
stateDiagram-v2
[*] --> start
start --> prepare_read_tokens
prepare_read_tokens --> read_tokens
read_tokens --> read_tokens : no concept found (loop)
read_tokens --> eof : EOF
read_tokens --> concepts_found : concept keyword detected (fork)
eof --> end : ManageUnrecognized
concepts_found --> concept_wkf : ManageUnrecognized → #concept_wkf
end --> [*]
```
**`PrepareReadTokens`**: initializes the buffer and records `buffer_start_pos`.
**`ReadTokens`**: reads one token, calls `get_metadata_from_first_token`. If a concept
can start at this token → **fork** with a cloned context where `concept_to_recognize`
is set. The main path continues scanning.
**`ManageUnrecognized("concepts found")`**: processes the buffer accumulated before
the keyword (via `SimpleConceptsParser`). Unrecognized tokens become
`UnrecognizedToken` and are added to `parameters`.
---
## Workflow `#concept_wkf`
```mermaid
stateDiagram-v2
[*] --> start
start --> init_concept_parsing
init_concept_parsing --> manage_parameters
manage_parameters --> read_concept
read_concept --> read_parameters : more segments
read_concept --> finalize_concept : all segments done
read_concept --> token_mismatch : token mismatch
read_concept --> error_eof : unexpected EOF
read_parameters --> manage_parameters : loop
read_parameters --> finalize_concept : EOF
finalize_concept --> tokens_wkf : #tokens_wkf
token_mismatch --> end
error_eof --> end
end --> [*]
```
**`InitConceptParsing`**:
- Verifies the number of already-collected parameters is sufficient
- Removes the first token of segment 0 (already consumed by `ReadTokens`)
- Applies SYA: pushes the concept onto the stack
**`ReadConcept`**: reads the fixed tokens of the current segment one by one.
If all match → `pop(0)` the segment and continue.
**`ReadParameters`**: reads ONE token into the buffer. Returns to
`ManageUnrecognized` which tries to recognize it via `SimpleConceptsParser`.
**`FinalizeConceptParsing`**:
- Pops the concept from the stack
- Computes `start` (from the first parameter) and `end` (current position)
- Creates `MetadataToken(concept.metadata, start, end, resolution_method, "sya")`
- Clears stack and parameters
- Returns to `#tokens_wkf`
---
## Step-by-step example — `"1 plus 2"`
Concept: `a plus b` (variables `a`, `b`).
**Tokens:**
```
pos : 0 1 2 3 4 5
tok : "1" " " "plus" " " "2" EOF
```
**`expected` for this concept:**
```
[([" ", "plus", " "], 1), ([], 1)]
segment 0 → 1 param before, read " plus "
segment 1 → 1 param before, read nothing (concept ends with a param)
```
**Execution trace:**
```
PrepareReadTokens → buffer_start_pos = 0
ReadTokens "1" → no concept, buffer = ["1"]
ReadTokens " " → no concept, buffer = ["1", " "]
ReadTokens "plus" → concept "a plus b" found!
┌── FORK ─────────────────────────────────────────────────────┐
│ clone: buffer=["1"," "], pos=2, concept_to_recognize=CTR(+) │
└─────────────────────────────────────────────────────────────┘
ManageUnrecognized("concepts found")
buffer = ["1"," "] → SimpleConceptsParser → not found
parameters = [UnrecognizedToken("1 ", start=0, end=1)]
buffer_start_pos = 3
→ #concept_wkf
InitConceptParsing
expected[0] = ([" ","plus"," "], 1)
need 1 param → have 1 ✓
strip leading WS → ["plus"," "]
pop "plus" (already consumed) → [" "]
SYA: stack = [CTR(a_plus_b)]
ManageUnrecognized("manage parameters"): buffer empty → nothing
ReadConcept: reads [" "] → pos 3 = " " ✓
expected.pop(0) → remaining = [([], 1)]
→ "read parameters"
ReadParameters: reads "2" at pos 4
buffer = ["2"]
→ "manage parameters"
ManageUnrecognized("manage parameters")
buffer = ["2"] → not a concept
parameters = [UT("1 ", 0, 1), UT("2", 3, 3)]
buffer_start_pos = 5
ReadConcept: expected = [([], 1)], reads 0 tokens
expected.pop(0) → empty → "finalize concept"
FinalizeConceptParsing
concept = stack.pop() = CTR(a_plus_b)
start = parameters[0].start = 0
end = parser_input.pos = 4
result.append(MetadataToken(metadata, 0, 4, "key", "sya"))
→ #tokens_wkf
ReadTokens → EOF → ManageUnrecognized("eof") → end
```
**Result:**
```
MultipleChoices([
[MetadataToken(id="1001", start=0, end=4, resolution_method="key", parser="sya")]
])
```
---
## Example — sequence `"1 plus 2 3 plus 7"`
Same concept `a plus b`. The parser recognizes two concepts in one pass.
```
pos : 0 1 2 3 4 5 6 7 8 9 10 11
tok : "1" " " "plus" " " "2" " " "3" " " "plus" " " "7" EOF
```
After `FinalizeConceptParsing` for the first concept (pos=4), `#tokens_wkf` restarts:
```
PrepareReadTokens → buffer_start_pos = 5
ReadTokens " " → buffer = [" "]
ReadTokens "3" → buffer = [" ","3"]
ReadTokens " " → buffer = [" ","3"," "]
ReadTokens "plus" → fork
ManageUnrecognized → UT(" 3 ", start=5, end=7), buffer_start_pos=9
...
FinalizeConceptParsing
start = 5, end = 10
result.append(MetadataToken(1001, 5, 10, "key", "sya"))
```
**Final result (one path, two concepts):**
```
MultipleChoices([
[
MetadataToken(1001, start=0, end=4, parser="sya"),
MetadataToken(1001, start=5, end=10, parser="sya"),
]
])
```
---
## Future example — composition `"1 plus 2 times 3"`
> **Note:** this example requires implementing `must_pop()`.
> Currently `must_pop()` always returns `False`.
Concepts: `a plus b` (low precedence), `a times b` (high precedence).
**Expected behavior after implementation:**
```
Expression: 1 plus 2 times 3
SYA with precedence times > plus:
Token "1" → parameters = [1] stack = []
Token "plus" → stack = [plus] parameters = [1]
Token "2" → parameters = [1, 2] stack = [plus]
Token "times" → prec(times) > prec(plus) → no pop
stack = [plus, times] parameters = [1, 2]
Token "3" → parameters = [1, 2, 3] stack = [plus, times]
Finalize:
pop "times" → MetadataToken(times, params=[2, 3])
pop "plus" → MetadataToken(plus, params=[1, times_result])
```
**What `must_pop()` must implement:**
```python
def must_pop(self, current_concept, top_of_stack_concept):
return precedence(top_of_stack_concept) >= precedence(current_concept)
```
Without this rule, both concepts are processed left-to-right with equal precedence,
yielding `(1 plus 2) times 3` instead of `1 plus (2 times 3)`.
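The grouping difference can be simulated with plain integers. This is a standalone sketch, not the Sheerka API: concepts are reduced to binary operators, and giving both operators equal precedence reproduces the left-to-right grouping described above.

```python
import operator

# Illustrative stand-ins for the two concepts (assumed, not Sheerka code).
FUNCS = {"plus": operator.add, "times": operator.mul}

def parse_and_eval(tokens, precedence):
    out, stack = [], []
    for tok in tokens:
        if tok in FUNCS:
            # must_pop rule: pop while the stack top binds at least as tightly.
            while stack and precedence[stack[-1]] >= precedence[tok]:
                out.append(stack.pop())
            stack.append(tok)
        else:
            out.append(int(tok))
    while stack:
        out.append(stack.pop())
    # Evaluate the resulting RPN queue.
    values = []
    for tok in out:
        if tok in FUNCS:
            b, a = values.pop(), values.pop()
            values.append(FUNCS[tok](a, b))
        else:
            values.append(tok)
    return values[0]

expr = ["1", "plus", "2", "times", "3"]
print(parse_and_eval(expr, {"plus": 1, "times": 2}))  # → 7 : 1 plus (2 times 3)
print(parse_and_eval(expr, {"plus": 1, "times": 1}))  # → 9 : (1 plus 2) times 3
```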
---
## The `expected` structure in detail
For concept `if a then b end` (key `"if __var__0 then __var__1 end"`):
```
_get_expected_tokens("if __var__0 then __var__1 end")
→ [
(["if", " "], 0), # read "if " before 1st param
([" ", "then", " "], 1), # read " then " before 2nd param
([" ", "end"], 1), # read " end" — 1 param before, no trailing param
]
```
During parsing, `expected` is **modified in place**:
- `InitConceptParsing` removes the first token of segment 0 (already read by `ReadTokens`)
- `ReadConcept` consumes the tokens of the current segment then calls `pop(0)`
- When `expected` is empty → `FinalizeConceptParsing`
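A possible sketch of the segment decomposition, consistent with the two examples in this document; the real `_get_expected_tokens` may tokenize differently, so treat this as a hypothetical re-implementation:

```python
import re

def get_expected_tokens(key):
    # Split the key into placeholders, single spaces and keywords.
    segments, tokens, params_before = [], [], 0
    for part in re.findall(r"__var__\d+|\s|\S+", key):
        if part.startswith("__var__"):
            if tokens:
                # Close the keyword group accumulated before this parameter.
                segments.append((tokens, params_before))
                tokens, params_before = [], 0
            params_before += 1
        else:
            tokens.append(part)
    # Final segment: possibly empty when the concept ends with a parameter.
    segments.append((tokens, params_before))
    return segments

print(get_expected_tokens("__var__0 plus __var__1"))
# → [([' ', 'plus', ' '], 1), ([], 1)]
print(get_expected_tokens("if __var__0 then __var__1 end"))
# → [(['if', ' '], 0), ([' ', 'then', ' '], 1), ([' ', 'end'], 1)]
```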
---
## Key data structures
### `StateMachineContext`
```
StateMachineContext
├── parser_input ParserInput token stream + cursor
├── other_parsers [SimpleConceptsParser]
├── buffer list[Token] tokens pending classification
├── buffer_start_pos int start position of the current buffer
├── concept_to_recognize ConceptToRecognize | None
├── stack list[CTR] SYA — operator stack
├── parameters list[UT|CT] SYA — output queue
├── result list[MetadataToken]
└── errors list
```
### `MetadataToken` (output)
```
MetadataToken
├── metadata ConceptMetadata (id, name, key, variables, ...)
├── start int position of the first token of the expression
├── end int position of the last token
├── resolution_method "key" | "name" | "id"
└── parser "sya"
```
### Token positions in `"1 plus 2"`:
```
"1 plus 2"
0 1 2 3 4
│ │ │ │ │
1 _ plus _ 2
MetadataToken: start=0, end=4
```
---
## Differences vs `SimpleConceptsParser`
| | `SimpleConceptsParser` | `SyaConceptsParser` |
|---|---|---|
| Target concepts | No parameters | With parameters |
| `concept_wkf` states | 2 | 8 |
| `result` contents | `MetadataToken` + `UnrecognizedToken` | `MetadataToken` only |
| Parameters | N/A | Collected in `parameters` list |
| Parser tag | `"simple"` | `"sya"` |
| SYA | No | Yes (precedence to implement) |
---
## Error handling
| Error | Cause | State reached |
|---|---|---|
| `UnexpectedToken` | Read token ≠ expected concept token | `TokenMismatch` → `end` |
| `UnexpectedEof` | Input ends before concept is complete | `ErrorEof` → `end` |
| `NotEnoughParameters` | Too few params before a segment | Exception raised |
Errors are collected from **all paths** and forwarded to `error_sink` in `parse()`.
A path with errors is excluded from `_select_best_paths`.
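The contract can be illustrated with a hypothetical minimal path selector; the data shapes and the function below are assumptions for illustration, not the real `_select_best_paths`:

```python
# Hypothetical sketch: errors from every path go to error_sink,
# and any path that carries errors is dropped from the results.
def select_best_paths(paths, error_sink):
    for path in paths:
        error_sink.extend(path["errors"])
    return [p["result"] for p in paths if not p["errors"]]

paths = [
    {"result": ["MetadataToken(1001)"], "errors": []},
    {"result": [], "errors": ["UnexpectedEof"]},
]
sink = []
print(select_best_paths(paths, sink))  # → [['MetadataToken(1001)']]
print(sink)                            # → ['UnexpectedEof']
```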
---
## Known limitations and proposed improvements
The current implementation correctly handles simple cases (single-token parameters,
non-nested concepts). The following issues must be addressed before enabling
precedence and real concept composition.
### 1. Parameters are limited to a single token
`ReadParameters` reads ONE token, then immediately calls `ManageUnrecognized`, which
returns to `ReadConcept` to match the next keyword segment. Multi-token parameters
therefore fail. For `if hello world then foo end` with parameter `a = "hello world"`:
```
ReadParameters reads "hello"
ManageUnrecognized → UT("hello") → ReadConcept tries to match " then "
ReadConcept reads " " ✓ then "world" ≠ "then" → MISMATCH
```
**Proposed fix:** `ReadParameters` should accumulate tokens until it detects the
start of the next keyword segment (lookahead on `expected[0][0]`), then hand the
full buffer to `ManageUnrecognized` for parsing in one pass.
---
### 2. Flat `parameters` list with no arity tracking
When `FinalizeConceptParsing` runs, `parameters` is a flat list. There is no
information about how many parameters belong to each concept on the stack. Once
`must_pop` is active and multiple concepts are stacked, `FinalizeConceptParsing`
cannot reconstruct the correct nesting.
Example: `1 plus 2 times 3` with `stack = [plus, times]` and
`parameters = [UT("1"), UT("2"), UT("3")]`. Without arity information there is no
way to determine that `times` consumes the last two parameters and `plus` consumes
the first one and the result of `times`.
The arity of each concept (`nb_variables`) is available in `expected` at push time
but is lost once `expected` is consumed during parsing.
**Proposed fix:** record the arity of each concept when it is pushed onto the stack
(in `apply_shunting_yard_algorithm`). `FinalizeConceptParsing` then pops the correct
number of parameters for each concept, from innermost to outermost, building
intermediate `MetadataToken` objects that are re-injected into `parameters` as
`ConceptToken` before processing the next concept on the stack.
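A minimal sketch of this proposed fix, with tuples standing in for the real token and concept classes (all names and shapes here are illustrative assumptions):

```python
# Stack entries carry (concept_name, arity) recorded at push time.
# Concepts are popped innermost-first; each consumes its own parameters
# and is re-injected as a nested token for the next concept on the stack.
def finalize(stack, parameters):
    while stack:
        name, arity = stack.pop()
        args = parameters[-arity:]
        del parameters[-arity:]
        parameters.append((name, args))  # nested intermediate token
    return parameters[-1]

# "1 plus 2 times 3" with stack = [plus, times] and a flat parameter list:
tree = finalize([("plus", 2), ("times", 2)], ["1", "2", "3"])
print(tree)  # → ('plus', ['1', ('times', ['2', '3'])])
```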
---
### 3. Type mismatch in `ManageUnrecognized` for recognized parameters
When `SimpleConceptsParser` recognizes a token sequence, `ManageUnrecognized`
creates:
```python
state_context.parameters.append(
ConceptToken(res.items[0], buffer_start_pos, parser_input.pos - 1)
)
```
`res.items[0]` is a `list[MetadataToken]` (one complete path from
`SimpleConceptsParser`), but `ConceptToken.concept` is typed as `Concept`. Any
downstream code that uses this `ConceptToken` will receive a list where it expects a
`Concept` instance.
**Proposed fix:** define a dedicated container for a recognized parameter (e.g.
`ParsedParameterToken`) that wraps a `list[MetadataToken]` with start/end positions,
or flatten the result to a single `MetadataToken` when `res.items[0]` contains
exactly one token.
---
### 4. Variable-to-parameter mapping not applied
`FinalizeConceptParsing` creates a `MetadataToken` without populating the concept's
variables. `parameters = [UT("1 "), UT("2")]` maps positionally to
`variables = [("a", NotInit), ("b", NotInit)]`, but this mapping is never applied.
The produced `MetadataToken` is therefore incomplete: a downstream evaluator has no
way to retrieve parameter values from the token alone.
**Proposed fix:** in `FinalizeConceptParsing`, zip `parameters` with
`concept.metadata.variables` and store the result in the `MetadataToken`'s metadata,
or pass it as a dedicated field.
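A minimal sketch of the proposed zip, with plain Python values standing in for `UnrecognizedToken` and the metadata variables list (illustrative shapes, not the actual Sheerka API):

```python
# Positional binding of collected parameters to the concept's variables.
variables = [("a", None), ("b", None)]   # stand-in for concept metadata variables
parameters = ["1 ", "2"]                 # stand-in for collected UnrecognizedTokens
bound = {name: value for (name, _), value in zip(variables, parameters)}
print(bound)  # → {'a': '1 ', 'b': '2'}
```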
---
### 5. `SyaConceptsParser` absent from `other_parsers`
`other_parsers = [SimpleConceptsParser()]`. A parameter can be a simple (parameter-
less) concept, but never a composite concept with parameters. True composition —
where a parameter is itself a SYA-parsed concept — is structurally impossible with
the current design.
**Proposed fix:** add `SyaConceptsParser` to `other_parsers`. A guard is required
to prevent infinite recursion: the nested instance should exclude the concept
currently being recognized from its search space.
---
### Priority order
| Priority | Issue | Blocking |
|---|---|---|
| 1 | Multi-token parameters | Practical usability |
| 2 | `ConceptToken` type mismatch | Correctness |
| 3 | Variable-to-parameter mapping | Evaluation pipeline |
| 4 | Arity not tracked on the stack | `must_pop` / precedence |
| 5 | `SyaConceptsParser` absent from `other_parsers` | Real composition |
The variable-to-parameter mapping and arity-tracking issues are interdependent with
`must_pop`: implementing them independently (before activating precedence) is still
valuable and lays the correct foundation.
+4 -2
@@ -1,8 +1,10 @@
 annotated-doc==0.0.4
 annotated-types==0.7.0
 anyio==4.13.0
-bcrypt==5.0.0
+argon2-cffi==25.1.0
+argon2-cffi-bindings==25.1.0
 certifi==2026.2.25
+cffi==2.0.0
 charset-normalizer==3.4.7
 click==8.3.2
 ecdsa==0.19.2
@@ -13,10 +15,10 @@ httpx==0.28.1
 idna==3.11
 iniconfig==2.3.0
 packaging==26.0
-passlib==1.7.4
 pluggy==1.6.0
 prompt_toolkit==3.0.52
 pyasn1==0.6.3
+pycparser==3.0
 pydantic==2.12.5
 pydantic_core==2.41.5
 Pygments==2.20.0
+1 -1
@@ -89,7 +89,7 @@ class Concept:
         self._all_attrs = None

     def __repr__(self):
-        text = f"(Concept {self._metadata.name}#{self._metadata.id}"
+        text = f"Concept({self._metadata.name}#{self._metadata.id}"
         if self._metadata.pre:
             text += f", #pre={self._metadata.pre}"
+14
@@ -18,6 +18,20 @@ class MethodAccessError(SheerkaException):
         return f"Cannot access method '{self.method_name}'"


+class NotEnoughParameters(SheerkaException):
+    """
+    Exception when not enough parameters are found during Sya parsing
+    """
+    def __init__(self, concept_to_recognize, expected_nb_parameters, nb_parameters_found):
+        self.concept = concept_to_recognize
+        self.expected = expected_nb_parameters
+        self.found = nb_parameters_found
+
+    def get_error_msg(self) -> str:
+        return f"Failed to parse {self.concept}. Expecting {self.expected} parameters, but only found {self.found}."
+
+
 @dataclass
 class ErrorObj:
     def get_error_msg(self) -> str:
+3 -2
@@ -3,7 +3,7 @@ from core.ExecutionContext import ContextActions, ExecutionContext
 from core.ReturnValue import ReturnValue
 from core.concept import Concept
 from evaluators.base_evaluator import EvaluatorEvalResult, EvaluatorMatchResult, NotForMe, OneReturnValueEvaluator
-from parsers.SimpleParserParser import SimpleConceptsParser
+from parsers.SimpleConceptsParser import SimpleConceptsParser
 from parsers.state_machine import MetadataToken
@@ -28,7 +28,8 @@ class RecognizeSimpleConcept(OneReturnValueEvaluator):
         parser_input = return_value.value.body
         parser_input.reset()
-        parsed = self.parser.parse(context, parser_input)
+        error_sink = []
+        parsed = self.parser.parse(context, parser_input, error_sink)
         if len(parsed.items) == 0:
             not_for_me = ReturnValue(self.NAME, False, NotForMe(self.NAME, return_value.value))
+7
View File
@@ -102,6 +102,13 @@ class MultipleChoices:
return True return True
def __iadd__(self, other):
if not isinstance(other, MultipleChoices):
raise TypeError(f"unsupported operand type(s) for +=: 'MultipleChoices' and '{type(other)}'")
self.items += other.items
return self
def __hash__(self): def __hash__(self):
return hash(tuple(self.items)) return hash(tuple(self.items))
+21
View File
@@ -0,0 +1,21 @@
from core.ExecutionContext import ExecutionContext
from parsers.ParserInput import ParserInput
class BaseParser:
"""
Base class for parsers that can be used in concept recognition
"""
def __init__(self, name):
self.name = name # name of the parser
def parse(self, context: ExecutionContext, parser_input: ParserInput, error_sink: list):
"""
Default signature for parsing
:param context:
:param parser_input:
:param error_sink:
:return:
"""
pass
+15
View File
@@ -100,5 +100,20 @@ class ParserInput:
return res return res
@staticmethod
def from_tokens(tokens, text=None):
"""
Returns a ParserInput built from already computed tokens
:param tokens:
:param text:
:return:
"""
res = ParserInput(None)
res.all_tokens = tokens
res.original_text = text or get_text_from_tokens(tokens)
res.pos = -1
res.end = len(res.all_tokens)
return res
def __repr__(self): def __repr__(self):
return f"ParserInput('{self.original_text}', len={len(self.all_tokens)})" return f"ParserInput('{self.original_text}', len={len(self.all_tokens)})"
@@ -1,17 +1,71 @@
from core.concept import DefinitionType from core.concept import DefinitionType
from evaluators.base_evaluator import MultipleChoices from evaluators.base_evaluator import MultipleChoices
from parsers.state_machine import ConceptToRecognize, End, ManageUnrecognized, MetadataToken, PrepareReadTokens, \ from parsers.BaseParser import BaseParser
ReadConcept, ReadTokens, Start, StateMachine, StateMachineContext, UnrecognizedToken from parsers.parser_utils import UnexpectedEof, UnexpectedToken, get_text_from_tokens
from parsers.state_machine import ConceptToRecognize, End, MetadataToken, PrepareReadTokens, \
ReadTokens, Start, State, StateMachine, StateMachineContext, StateResult, UnrecognizedToken
from parsers.tokenizer import Token, TokenKind, Tokenizer from parsers.tokenizer import Token, TokenKind, Tokenizer
class SimpleConceptsParser: class ReadConcept(State):
def run(self, state_context) -> StateResult:
start = state_context.parser_input.pos
for expected in state_context.concept_to_recognize.expected:
if not state_context.parser_input.next_token(False):
# eof before the concept is recognized
state_context.errors.append(UnexpectedEof(expected, state_context.parser_input.token))
state_context.concept_to_recognize = None
return StateResult(self.next_states[0])
token = state_context.parser_input.token
if token.value != expected:
# token mismatch
state_context.errors.append(UnexpectedToken(token, expected))
state_context.concept_to_recognize = None
return StateResult(self.next_states[0])
state_context.result.append(MetadataToken(state_context.concept_to_recognize.metadata,
start,
state_context.parser_input.pos,
state_context.concept_to_recognize.resolution_method,
"simple"))
state_context.concept_to_recognize = None
return StateResult(self.next_states[0])
class ManageUnrecognized(State):
def run(self, state_context) -> StateResult:
if state_context.buffer:
buffer_as_str = get_text_from_tokens(state_context.buffer)
if len(state_context.result) > 0 and isinstance(old := state_context.result[-1], UnrecognizedToken):
# merge unrecognized if needed
state_context.result[-1] = UnrecognizedToken(old.buffer + buffer_as_str,
old.start,
state_context.parser_input.pos - 1)
else:
state_context.result.append(UnrecognizedToken(buffer_as_str,
state_context.buffer_start_pos,
state_context.parser_input.pos - 1))
# clear the buffer
state_context.buffer.clear()
state_context.buffer_start_pos = state_context.parser_input.pos + 1
return StateResult(self.next_states[0])
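The merge branch above keeps adjacent unrecognized text as a single token instead of a chain of fragments. A self-contained sketch of just that rule (the dataclass is a simplified mirror of `UnrecognizedToken` from `parsers.state_machine`):

```python
from dataclasses import dataclass

@dataclass
class UnrecognizedToken:  # simplified mirror of parsers.state_machine
    buffer: str
    start: int
    end: int

def flush_buffer(result, buffer_as_str, start, end):
    # fuse with a trailing UnrecognizedToken instead of appending a second one
    if result and isinstance(old := result[-1], UnrecognizedToken):
        result[-1] = UnrecognizedToken(old.buffer + buffer_as_str, old.start, end)
    else:
        result.append(UnrecognizedToken(buffer_as_str, start, end))

result = []
flush_buffer(result, "foo ", 0, 1)
flush_buffer(result, "bar", 2, 2)
print(result)  # [UnrecognizedToken(buffer='foo bar', start=0, end=2)]
```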
class SimpleConceptsParser(BaseParser):
"""" """"
This class to parser concepts with no variable This class is to parse concepts with no parameter
ex : def concept I am a new concept
It parses a sequence of concepts It parses a sequence of concepts
""" """
def __init__(self): def __init__(self):
super().__init__("simple")
tokens_wkf = { tokens_wkf = {
Start("start", next_states=["prepare read tokens"]), Start("start", next_states=["prepare read tokens"]),
PrepareReadTokens("prepare read tokens", next_states=["read tokens"]), PrepareReadTokens("prepare read tokens", next_states=["read tokens"]),
@@ -30,7 +84,6 @@ class SimpleConceptsParser:
"#tokens_wkf": {t.name: t for t in tokens_wkf}, "#tokens_wkf": {t.name: t for t in tokens_wkf},
"#concept_wkf": {t.name: t for t in concept_wkf}, "#concept_wkf": {t.name: t for t in concept_wkf},
} }
self.error_sink = []
@staticmethod @staticmethod
def get_metadata_from_first_token(context, token: Token): def get_metadata_from_first_token(context, token: Token):
@@ -55,12 +108,13 @@ class SimpleConceptsParser:
return concepts_by_key + concepts_by_name return concepts_by_key + concepts_by_name
def parse(self, context, parser_input): def parse(self, context, parser_input, error_sink):
sm = StateMachine(self.workflows) sm = StateMachine(self.workflows)
sm_context = StateMachineContext(context, parser_input, self.get_metadata_from_first_token) sm_context = StateMachineContext(context, parser_input, self.get_metadata_from_first_token, [])
sm.run("#tokens_wkf", "start", sm_context) sm.run("#tokens_wkf", "start", sm_context)
selected = self.select_best_paths(sm) selected = self.select_best_paths(sm)
error_sink.extend(sm_context.errors)
return MultipleChoices(selected) return MultipleChoices(selected)
+344
View File
@@ -0,0 +1,344 @@
from core.concept import DefinitionType
from core.error import NotEnoughParameters
from evaluators.base_evaluator import MultipleChoices
from parsers.BaseParser import BaseParser
from parsers.ParserInput import ParserInput
from parsers.SimpleConceptsParser import SimpleConceptsParser
from parsers.parser_utils import UnexpectedEof, UnexpectedToken, get_text_from_tokens
from parsers.state_machine import ConceptToRecognize, ConceptToken, End, MetadataToken, PrepareReadTokens, ReadTokens, \
Start, State, StateMachine, StateMachineContext, StateResult, UnrecognizedToken
from parsers.tokenizer import Token, TokenKind, Tokenizer
class InitConceptParsing(State):
"""
A new concept is detected
Runs some validations and prepares the list of expected tokens to read
"""
def must_pop(self, current_concept, previous_concept):
return False
def apply_shunting_yard_algorithm(self, state_context):
"""
Apply the shunting yard algorithm:
for all concepts in the stack,
check the precedence to decide whether the concept must be popped (to the output) or not
:param state_context:
:type state_context:
:return:
:rtype:
"""
if len(state_context.stack) > 0:
while self.must_pop(state_context.concept_to_recognize.metadata, state_context.stack[-1].metadata):
state_context.parameters.append(state_context.stack.pop())
state_context.stack.append(state_context.concept_to_recognize)
def run(self, state_context) -> StateResult:
expected = state_context.concept_to_recognize.expected
# check that there are enough parameters
if len(state_context.parameters) < expected[0][1]:
raise NotEnoughParameters(state_context.concept_to_recognize,
expected[0][1],
len(state_context.parameters))
# remove white space before the first token if any
if expected[0][0][0].type == TokenKind.WHITESPACE:
expected[0][0].pop(0)
# pop the first token (as it is already recognized)
expected[0][0].pop(0)
# apply shunting yard algorithm
self.apply_shunting_yard_algorithm(state_context)
return StateResult(self.next_states[0])
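`must_pop` still returns `False` here, so nothing is popped before precedence is activated. Once it is, the classic shunting-yard rule is `precedence(top) >= precedence(op)`. A standalone sketch with illustrative precedences (`plus`/`times` values are assumptions, not taken from the repo's concept store):

```python
# Illustrative precedences; the real parser would derive these
# from concept metadata once precedence is activated.
PRECEDENCE = {"plus": 1, "times": 2}

def must_pop(current, top):
    # pop while the operator on the stack binds at least as tightly
    # as the incoming one (left-associative case)
    return PRECEDENCE[top] >= PRECEDENCE[current]

def push_operator(op, stack, output):
    while stack and must_pop(op, stack[-1]):
        output.append(stack.pop())
    stack.append(op)

stack, output = [], []
for token in ["1", "plus", "2", "times", "3"]:
    if token in PRECEDENCE:
        push_operator(token, stack, output)
    else:
        output.append(token)  # operands go straight to the output
while stack:                  # drain the remaining operators
    output.append(stack.pop())
print(output)  # ['1', '2', '3', 'times', 'plus']  ==  1 plus (2 times 3)
```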
class ReadConcept(State):
"""
This state reads the tokens of the concept that are known (those that are not parameters)
For example, given the concept 'let me create the concept x'
We will parse 'let' 'me' 'create' 'the' 'concept'
But we will not parse 'x' because it's a parameter
"""
def run(self, state_context) -> StateResult:
expected = state_context.concept_to_recognize.expected
# eat the tokens
for expected_token in expected[0][0]:
if not state_context.parser_input.next_token(skip_whitespace=False):
# Failed to recognize concept because of eof
state_context.errors.append(UnexpectedEof(expected_token, None))
return StateResult("error eof")
token = state_context.parser_input.token
if expected_token.type != token.type or expected_token.value != token.value:
# Failed to recognize concept because of token mismatch
state_context.errors.append(UnexpectedToken(token, expected_token))
return StateResult("token mismatch")
expected.pop(0)
if not expected:
state_context.concept_to_recognize = None
return StateResult("finalize concept")
else:
return StateResult("read parameters")
class ReadParameters(State):
def run(self, state_context) -> StateResult:
assert not state_context.buffer
if not state_context.parser_input.next_token(False):
return StateResult("finalize concept")
state_context.buffer.append(state_context.parser_input.token)
return StateResult(self.next_states[0])
class ManageUnrecognized(State):
def run(self, state_context) -> StateResult:
if state_context.buffer:
buffer_as_str = get_text_from_tokens(state_context.buffer)
res = MultipleChoices([])
pi = ParserInput.from_tokens(state_context.buffer, text=buffer_as_str)
error_sink = []
# Try to parse the buffer
for parser in state_context.other_parsers:
res += parser.parse(state_context.context, pi, error_sink)
if error_sink:
raise NotImplementedError("Cannot manage errors")
if len(res.items) == 0:
state_context.parameters.append(UnrecognizedToken(buffer_as_str,
state_context.buffer_start_pos,
state_context.parser_input.pos - 1))
elif len(res.items) == 1:
state_context.parameters.append(ConceptToken(res.items[0],
state_context.buffer_start_pos,
state_context.parser_input.pos - 1))
else:
raise NotImplementedError("Cannot manage multiple results")
# clear the buffer
state_context.buffer.clear()
state_context.buffer_start_pos = state_context.parser_input.pos + 1
return StateResult(self.next_states[0])
class TokenMismatch(State):
"""
When we realize that we are not parsing the correct concept.
The path ends without adding anything to the result.
"""
def run(self, state_context) -> StateResult:
return StateResult(self.next_states[0])
class ErrorEof(State):
"""
When EOF is detected before the concept is fully parsed.
The path ends without adding anything to the result.
"""
def run(self, state_context) -> StateResult:
return StateResult(self.next_states[0])
class FinalizeConceptParsing(State):
"""
The concept is fully parsed.
Pops the concept from the stack, builds a MetadataToken from it and its
collected parameters, appends it to the result, then returns to the
tokens workflow to continue parsing the rest of the input.
"""
def run(self, state_context) -> StateResult:
concept = state_context.stack.pop()
start = state_context.parameters[0].start if state_context.parameters \
else state_context.buffer_start_pos
end = state_context.parser_input.pos
state_context.result.append(
MetadataToken(concept.metadata, start, end, concept.resolution_method, "sya")
)
state_context.stack.clear()
state_context.parameters.clear()
return StateResult(self.next_states[0])
class SyaConceptsParser(BaseParser):
""""
This class is to parse concepts with parameters
ex : def concept a plus b as a + b
It parses a sequence of concepts
"""
def __init__(self):
super().__init__("sya")
tokens_wkf = {
Start("start", next_states=["prepare read tokens"]),
PrepareReadTokens("prepare read tokens", next_states=["read tokens"]),
ReadTokens("read tokens", next_states=["read tokens", "eof", "concepts found"]),
ManageUnrecognized("eof", next_states=["end"]),
ManageUnrecognized("concepts found", next_states=["#concept_wkf"]),
End("end", next_states=None)
}
concept_wkf = {
Start("start", next_states=["init concept parsing"]),
InitConceptParsing("init concept parsing", ["manage parameters"]),
ManageUnrecognized("manage parameters", next_states=["read concept"]),
ReadConcept("read concept", next_states=["finalize concept", "error eof", "token mismatch", "read parameters"]),
ReadParameters("read parameters", next_states=["manage parameters", "finalize concept"]),
FinalizeConceptParsing("finalize concept", next_states=["#tokens_wkf"]),
ErrorEof("error eof", ["end"]),
TokenMismatch("token mismatch", ["end"]),
End("end", next_states=None)
}
self.workflows = {
"#tokens_wkf": {t.name: t for t in tokens_wkf},
"#concept_wkf": {t.name: t for t in concept_wkf},
}
self.error_sink = []
@staticmethod
def _get_expected_tokens(concept_key):
"""
Returns a list of pairs (expected tokens, number of expected variables before these tokens)
ex:
'if x y then z end' => ('if', 0), ('then', 2), ('end', 1)
:param concept_key:
:type concept_key:
:return:
:rtype:
"""
# def custom_strip_tokens(_tokens):
# return _tokens
def custom_strip_tokens(_tokens):
"""
Removes consecutive whitespace tokens
Returns an empty list if there are only whitespace tokens
:param _tokens:
:type _tokens:
:return:
:rtype:
"""
res = []
buffer = None
for t in _tokens:
if t.type == TokenKind.WHITESPACE:
buffer = t
else:
if buffer:
res.append(buffer)
buffer = None
res.append(t)
if res and buffer: # add the buffer only if the result is not empty
res.append(buffer)
return res
expected = [] # tuples of (expected tokens, number of expected variables before them)
tokens = []
nb_variables = 0
parsing_tokens = None # True if we are parsing tokens (and not VAR_DEF)
for token in Tokenizer(concept_key, yield_eof=False):
if token.type == TokenKind.WHITESPACE:
tokens.append(token)
elif token.type == TokenKind.VAR_DEF:
if parsing_tokens is not None and parsing_tokens:
expected.append((custom_strip_tokens(tokens), nb_variables))
nb_variables = 1
tokens = []
parsing_tokens = False
else:
nb_variables += 1
else:
tokens.append(token)
parsing_tokens = True
# do not forget the remaining ones
if tokens or nb_variables:
expected.append((custom_strip_tokens(tokens), nb_variables))
return expected
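The docstring example above can be reproduced with a simplified version of the loop. Assumptions: single-letter words stand in for `VAR_DEF` tokens, and splitting on whitespace replaces the real `Tokenizer`; only the grouping logic is kept:

```python
# Simplified sketch of _get_expected_tokens: split a concept key into
# (literal tokens, number of variables expected before them) pairs.
# Single-letter words play the role of VAR_DEF tokens here.
def get_expected_tokens(concept_key):
    expected = []
    tokens = []
    nb_variables = 0
    parsing_tokens = None  # True once we have read at least one literal token
    for word in concept_key.split():
        if len(word) == 1:  # variable placeholder (VAR_DEF stand-in)
            if parsing_tokens:
                expected.append((tokens, nb_variables))
                nb_variables = 1
                tokens = []
                parsing_tokens = False
            else:
                nb_variables += 1
        else:
            tokens.append(word)
            parsing_tokens = True
    if tokens or nb_variables:  # do not forget the remaining ones
        expected.append((tokens, nb_variables))
    return expected

print(get_expected_tokens("if x y then z end"))
# [(['if'], 0), (['then'], 2), (['end'], 1)]
```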
def get_metadata_from_first_token(self, context, token: Token):
return [ConceptToRecognize(m, self._get_expected_tokens(m.key), "key")
for m in context.sheerka.get_metadatas_from_first_token("key", token.value)
if m.definition_type == DefinitionType.DEFAULT and len(m.parameters) > 0]
def _select_best_paths(self, sm) -> list:
"""Returns the result lists of the highest-scoring error-free paths.
Args:
sm: The StateMachine after execution.
Returns:
A list of result lists, one per best-scoring path.
"""
selected = []
best_score = 1
for path in sm.paths:
if path.execution_context.errors:
continue
score = self._compute_path_score(path)
if score > best_score:
selected.clear()
selected.append(path.execution_context.result)
best_score = score
elif score == best_score:
selected.append(path.execution_context.result)
return selected
@staticmethod
def _compute_path_score(path) -> int:
"""Scores a path by the total token span covered by MetadataTokens.
Args:
path: An ExecutionPath whose result is a list of MetadataToken.
Returns:
Integer score.
"""
return sum(
token.end - token.start + 1
for token in path.execution_context.result
if isinstance(token, MetadataToken)
)
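The score is simply the number of token positions covered by recognized concepts, so paths that explain more of the input win. A minimal sketch with simplified token stand-ins (only `start`/`end` are kept from the real classes):

```python
from dataclasses import dataclass

@dataclass
class MetadataToken:  # simplified stand-in: recognized span only
    start: int
    end: int

@dataclass
class UnrecognizedToken:  # simplified stand-in: ignored by the score
    start: int
    end: int

def compute_path_score(result):
    # total token span covered by recognized concepts
    return sum(t.end - t.start + 1
               for t in result if isinstance(t, MetadataToken))

# A path that recognized tokens 0..2 and 5..7, leaving 3..4 unrecognized:
path_result = [MetadataToken(0, 2), UnrecognizedToken(3, 4), MetadataToken(5, 7)]
print(compute_path_score(path_result))  # 6
```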
def parse(self, context, parser_input, error_sink):
sm = StateMachine(self.workflows)
sm_context = StateMachineContext(context,
parser_input,
self.get_metadata_from_first_token,
[SimpleConceptsParser()])
sm.run("#tokens_wkf", "start", sm_context)
selected = self._select_best_paths(sm)
for path in sm.paths:
error_sink.extend(path.execution_context.errors)
return MultipleChoices(selected)
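Since `parameters` plays the role of the shunting-yard output queue, a fully parsed composition comes out in RPN order (e.g. `1 2 3 times plus` for `1 plus 2 times 3`). Evaluating that order is a single stack pass; a sketch with hypothetical `plus`/`times` operators, not the repo's evaluators:

```python
def eval_rpn(tokens):
    # hypothetical operator table; the real system resolves concepts instead
    ops = {"plus": lambda a, b: a + b, "times": lambda a, b: a * b}
    stack = []
    for token in tokens:
        if token in ops:
            b, a = stack.pop(), stack.pop()  # operands arrive before the operator
            stack.append(ops[token](a, b))
        else:
            stack.append(int(token))
    return stack.pop()

print(eval_rpn(["1", "2", "3", "times", "plus"]))  # 7  ==  1 + (2 * 3)
```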
+273 -279
View File
@@ -3,330 +3,324 @@ from typing import Any, Literal
from common.utils import str_concept from common.utils import str_concept
from core.ExecutionContext import ExecutionContext from core.ExecutionContext import ExecutionContext
from core.concept import ConceptMetadata from core.concept import Concept, ConceptMetadata
from parsers.ParserInput import ParserInput from parsers.ParserInput import ParserInput
from parsers.parser_utils import UnexpectedEof, UnexpectedToken, get_text_from_tokens
from parsers.tokenizer import Token from parsers.tokenizer import Token
@dataclass @dataclass
class MetadataToken: class MetadataToken:
""" """
Class that represents a text that is recognized as a concept When a concept definition is recognized
We keep track of the start and the end position We keep track of the start and the end position
""" MetadataToken is a shortcut for ConceptMetadataToken
metadata: ConceptMetadata """
start: int metadata: ConceptMetadata # concept that is recognized
end: int start: int # start position in the texts
resolution_method: Literal["name", "key", "id"] end: int # end position
parser: str resolution_method: Literal["name", "key", "id"] # did we use the name, the id or the key to recognize the concept
parser: str # which parser recognized the concept (SimpleConcepts, Sya, ...)
def __repr__(self):
return f"(MetadataToken metadata={str_concept(self.metadata, drop_name=True)}, " + \ def __repr__(self):
f"start={self.start}, end={self.end}, method={self.resolution_method}, origin={self.parser})" return f"(MetadataToken metadata={str_concept(self.metadata, drop_name=True)}, " + \
f"start={self.start}, end={self.end}, method={self.resolution_method}, origin={self.parser})"
def __eq__(self, other):
if not isinstance(other, MetadataToken): def __eq__(self, other):
return False if not isinstance(other, MetadataToken):
return False
return self.metadata.id == other.metadata.id \
and self.start == other.start \ return self.metadata.id == other.metadata.id \
and self.end == other.end \ and self.start == other.start \
and self.parser == other.parser and self.end == other.end \
and self.parser == other.parser
def __hash__(self):
return hash((self.metadata.id, self.start, self.end, self.parser)) def __hash__(self):
return hash((self.metadata.id, self.start, self.end, self.parser))
@dataclass @dataclass
class UnrecognizedToken: class UnrecognizedToken:
""" """
Class that represents a text that is not recognized Class that represents a text that is not recognized (yet)
We keep track of the start and the end position We keep track of the start and the end position
""" """
buffer: str buffer: str
start: int start: int
end: int end: int
@dataclass
class ConceptToken:
"""
When an already defined concept is found during the parsing
We keep track of the start and the end position
"""
concept: Concept
start: int # start position in the texts
end: int # end position
@dataclass @dataclass
class StateResult: class StateResult:
next_state: str | None next_state: str | None
forks: list = None forks: list = None
@dataclass @dataclass
class ConceptToRecognize: class ConceptToRecognize:
""" """
Holds information about the concept to recognize Holds information about the concept to recognize
During the parsing, we have a hint on a concept, but we need to finish the parsing to make sure that we are right
metadata: ConceptMetadata """
expected_tokens: list metadata: ConceptMetadata
resolution_method: Literal["name", "key", "id"] # which attribute was used to resolve the concept expected: list[tuple]
resolution_method: Literal["name", "key", "id"] # which attribute was used to resolve the concept
def __repr__(self):
return f"ConceptToRecognize(#{self.metadata.id}, expected={self.expected})"
@dataclass @dataclass
class StateMachineContext: class StateMachineContext:
context: ExecutionContext """
parser_input: ParserInput Internal state of a state machine
get_metadata_from_first_token: Any """
buffer: list[Token] = field(default_factory=list) # initialization
buffer_start_pos: int = -1 context: ExecutionContext
concept_to_recognize: ConceptToRecognize | None = None parser_input: ParserInput
result: list = field(default_factory=list) get_metadata_from_first_token: Any # This is a callback that gives the possible concepts, for a token
errors: list = field(default_factory=list) other_parsers: list # parsers to call when managing unrecognized tokens
def get_clones(self, concepts_to_recognize): # attributes used when parsing token
return [StateMachineContext(self.context, # tokens currently being read
self.parser_input.clone(), buffer: list[Token] = field(default_factory=list)
self.get_metadata_from_first_token, buffer_start_pos: int = -1
self.buffer.copy(),
self.buffer_start_pos, # attributes used when parsing concept
concept, # parameters already recognized + Concept under recognition
self.result.copy(), concept_to_recognize: ConceptToRecognize | None = None
self.errors.copy()) stack: list = field(default_factory=list)
for concept in concepts_to_recognize] parameters: list = field(default_factory=list) # it is called 'output' in shunting yard explanations
def to_debug(self): # runtime info
return {"pos": self.parser_input.pos, result: list = field(default_factory=list) # list of tokens found
"token": self.parser_input.token, errors: list = field(default_factory=list) # error sink
"buffer": [token.value for token in self.buffer],
"concept": str_concept(self.concept_to_recognize.metadata) if self.concept_to_recognize else None, def get_clones(self, concepts_to_recognize):
"result": self.result.copy()} """
Helper function that clones the context when multiple concepts are found
:param concepts_to_recognize:
:return:
"""
return [StateMachineContext(self.context,
self.parser_input.clone(),
self.get_metadata_from_first_token,
self.other_parsers,
self.buffer.copy(),
self.buffer_start_pos,
concept,
self.stack.copy(),
self.parameters.copy(),
self.result.copy(),
self.errors.copy())
for concept in concepts_to_recognize]
def to_debug(self):
return {"pos": self.parser_input.pos,
"token": self.parser_input.token,
"buffer": [token.value for token in self.buffer],
"concept": str_concept(self.concept_to_recognize.metadata) if self.concept_to_recognize else None,
"result": self.result.copy()}
class State: class State:
def __init__(self, name, next_states): def __init__(self, name, next_states):
self.name = name self.name = name
self.next_states = next_states self.next_states = next_states
def run(self, state_context: StateMachineContext) -> StateResult: def run(self, state_context: StateMachineContext) -> StateResult:
pass pass
@staticmethod @staticmethod
def get_forks(next_state, states_contexts: list[StateMachineContext]): def get_forks(next_state, states_contexts: list[StateMachineContext]):
""" """
Create one fork item for every state context Create one fork item for every state context
:param next_state: :param next_state:
:type next_state: :type next_state:
:param states_contexts: :param states_contexts:
:type states_contexts: :type states_contexts:
:return: :return:
:rtype: :rtype:
""" """
return [(next_state, state_context) for state_context in states_contexts] return [(next_state, state_context) for state_context in states_contexts]
def __repr__(self): def __repr__(self):
return f"(State '{self.name}' -> {self.next_states})" return f"(State '{self.name}' -> {self.next_states})"
class Start(State): class Start(State):
def run(self, state_context) -> StateResult: def run(self, state_context) -> StateResult:
# Start state # Start state
# give some logs and ask for the next state # give some logs and ask for the next state
return StateResult(self.next_states[0]) return StateResult(self.next_states[0])
def __repr__(self): def __repr__(self):
return f"(StartState '{self.name}' -> '{self.next_states[0]}')" return f"(StartState '{self.name}' -> '{self.next_states[0]}')"
class PrepareReadTokens(State): class PrepareReadTokens(State):
def run(self, state_context: StateMachineContext) -> StateResult: def run(self, state_context: StateMachineContext) -> StateResult:
state_context.buffer.clear() state_context.buffer.clear()
state_context.buffer_start_pos = state_context.parser_input.pos + 1 state_context.buffer_start_pos = state_context.parser_input.pos + 1
return StateResult(self.next_states[0]) return StateResult(self.next_states[0])
class ReadTokens(State): class ReadTokens(State):
def run(self, state_context) -> StateResult: def run(self, state_context) -> StateResult:
if not state_context.parser_input.next_token(False): if not state_context.parser_input.next_token(False):
return StateResult("eof") return StateResult("eof")
# try to get the possible concepts to recognize # try to get the possible concepts to recognize
concepts = state_context.get_metadata_from_first_token(state_context.context, concepts = state_context.get_metadata_from_first_token(state_context.context,
state_context.parser_input.token) state_context.parser_input.token)
forks = self.get_forks("concepts found", state_context.get_clones(concepts)) if concepts else None forks = self.get_forks("concepts found", state_context.get_clones(concepts)) if concepts else None
state_context.buffer.append(state_context.parser_input.token) state_context.buffer.append(state_context.parser_input.token)
return StateResult(self.name, forks) return StateResult(self.name, forks)
class ManageUnrecognized(State):
def run(self, state_context) -> StateResult:
if state_context.buffer:
buffer_as_str = get_text_from_tokens(state_context.buffer)
if len(state_context.result) > 0 and isinstance(old := state_context.result[-1], UnrecognizedToken):
state_context.result[-1] = UnrecognizedToken(old.buffer + buffer_as_str,
old.start,
state_context.parser_input.pos - 1)
else:
state_context.result.append(UnrecognizedToken(buffer_as_str,
state_context.buffer_start_pos,
state_context.parser_input.pos - 1))
return StateResult(self.next_states[0])
class ReadConcept(State):
def run(self, state_context) -> StateResult:
start = state_context.parser_input.pos
for expected in state_context.concept_to_recognize.expected_tokens:
if not state_context.parser_input.next_token(False):
# eof before the concept is recognized
state_context.errors.append(UnexpectedEof(expected, state_context.parser_input.token))
state_context.concept_to_recognize = None
return StateResult(self.next_states[0])
token = state_context.parser_input.token
if token.value != expected:
# token mismatch
state_context.errors.append(UnexpectedToken(token, expected))
state_context.concept_to_recognize = None
return StateResult(self.next_states[0])
state_context.result.append(MetadataToken(state_context.concept_to_recognize.metadata,
start,
state_context.parser_input.pos,
state_context.concept_to_recognize.resolution_method,
"simple"))
state_context.concept_to_recognize = None
return StateResult(self.next_states[0])
class End(State): class End(State):
def run(self, state_context) -> StateResult: def run(self, state_context) -> StateResult:
return StateResult(None) return StateResult(None)
def __repr__(self): def __repr__(self):
return f"(EndState '{self.name}')" return f"(EndState '{self.name}')"
@dataclass @dataclass
class ExecutionPathHistory: class ExecutionPathHistory:
from_state: str from_state: str
execution_context_debug: dict execution_context_debug: dict
to_state: str = "" to_state: str = ""
forks: list[tuple] = None forks: list[tuple] = None
parents: list = None parents: list = None
def clone(self, parent_path_id): def clone(self, parent_path_id):
parents = self.parents.copy() if self.parents else [] parents = self.parents.copy() if self.parents else []
parents.append(parent_path_id) parents.append(parent_path_id)
return ExecutionPathHistory(self.from_state, return ExecutionPathHistory(self.from_state,
self.execution_context_debug.copy(), self.execution_context_debug.copy(),
self.to_state, self.to_state,
self.forks.copy() if self.forks else None, self.forks.copy() if self.forks else None,
parents) parents)
def __repr__(self): def __repr__(self):
return "History(from '{0}', to '{1}', using {2}, forks={3}, parents={4}".format( return "History(from '{0}', to '{1}', using {2}, forks={3}, parents={4}".format(
self.from_state, self.from_state,
self.to_state, self.to_state,
self.execution_context_debug, self.execution_context_debug,
len(self.forks) if self.forks else 0, len(self.forks) if self.forks else 0,
self.parents) self.parents)
@dataclass @dataclass
class ExecutionPath: class ExecutionPath:
path_id: int path_id: int
execution_context: Any execution_context: Any
current_workflow: str current_workflow: str
current_state: str current_state: str
history: list[ExecutionPathHistory] history: list[ExecutionPathHistory]
ended: bool = False ended: bool = False
def clone(self, path_id, new_execution_path, new_workflow, new_state): def clone(self, path_id, new_execution_path, new_workflow, new_state):
return ExecutionPath(path_id, return ExecutionPath(path_id,
new_execution_path, new_execution_path,
new_workflow, new_workflow,
new_state, new_state,
[h.clone(self.path_id) for h in self.history], [h.clone(self.path_id) for h in self.history],
self.ended) self.ended)
def __repr__(self): def __repr__(self):
        return f"(Path id={self.path_id}, workflow='{self.current_workflow}', state='{self.current_state}')"

    def get_audit_trail(self):
        return [h.from_state for h in self.history]


class StateMachine:
    def __init__(self, workflows):
        self.workflows = workflows
        self.paths = None
        self.last_path_id = -1

    def run(self, workflow_name: str, state_name: str, execution_context):
        """
        Run the workflow from the state given in parameter.

        :param workflow_name: name of the workflow to start from
        :param state_name: name of the starting state inside that workflow
        :param execution_context: context carried along the execution path
        """
        self.last_path_id = -1  # reset the path ids
        self.paths = [ExecutionPath(self._get_new_path_id(),
                                    execution_context,
                                    workflow_name,
                                    state_name,
                                    [],
                                    False)]
        while True:
            to_review = [p for p in self.paths if not p.ended]
            if len(to_review) == 0:
                break
            for path in to_review:
                # add traceability
                history = ExecutionPathHistory(f"{path.current_workflow}:{path.current_state}",
                                               path.execution_context.to_debug())
                path.history.append(history)
                current_state = self.workflows[path.current_workflow][path.current_state]
                res = current_state.run(path.execution_context)
                if res.next_state is None:
                    path.ended = True
                    continue  # not possible to fork!
                path.current_workflow, path.current_state = self._compute_next_workflow_and_state(path.current_workflow,
                                                                                                 res.next_state)
                # update traceability
                history.to_state = f"{path.current_workflow}:{path.current_state}"
                # add forks
                if res.forks:
                    new_paths = []
                    for next_state, next_execution_context in res.forks:
                        next_workflow, next_state = self._compute_next_workflow_and_state(path.current_workflow,
                                                                                          next_state)
                        new_paths.append(path.clone(self._get_new_path_id(),
                                                    next_execution_context,
                                                    next_workflow,
                                                    next_state))
                    self.paths.extend(new_paths)
                    history.forks = [p.path_id for p in new_paths]

    def _get_new_path_id(self):
        self.last_path_id += 1
        return self.last_path_id

    @staticmethod
    def _compute_next_workflow_and_state(workflow, state):
        if state.startswith("#"):
            return state, "start"
        else:
            return workflow, state
+16 -12
@@ -1,9 +1,10 @@
 from datetime import datetime, timedelta
+from argon2 import PasswordHasher
+from argon2.exceptions import VerifyMismatchError
 from fastapi import Depends, HTTPException
 from fastapi.security import OAuth2PasswordBearer
 from jose import JWTError, jwt
-from passlib.context import CryptContext
 from pydantic import BaseModel
 from starlette import status
@@ -13,7 +14,7 @@ SECRET_KEY = "af95f0590411260f1f127bd7ef9a03409aecadf7729b3e6822b11752433b97b5"
 ALGORITHM = "HS256"
 ACCESS_TOKEN_EXPIRE_MINUTES = 1
-pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
+_ph = PasswordHasher()
 oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

 fake_users_db = {
@@ -22,7 +23,7 @@ fake_users_db = {
         "firstname": "Kodjo",
         "lastname": "Sossouvi",
         "email": "kodjo.sossouvi@gmail.com",
-        "hashed_password": "$2b$12$fb9jW7QUZ9KIEAAtVmWMEOGtehKy9FafUr7Zfrsb3ZMhsBbzZs7SC",  # password is kodjo
+        "hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$77SEG+Po+keKEOY01WNFzQ$J0jJ/XcwIHOsM+uB8/eeoaukZBF1zXtGVPmNHA6c+p4",  # password is kodjo
         "disabled": False,
     },
 }
@@ -52,15 +53,16 @@ class UserInDB(User):
     hashed_password: str

-def get_password_hash(password: str):
-    """
-    Hash the password
-    :param password:
-    :type password:
-    :return:
-    :rtype:
-    """
-    return pwd_context.hash(password)
+def get_password_hash(password: str) -> str:
+    """Hash the password using Argon2id.
+
+    Args:
+        password: The plaintext password to hash.
+
+    Returns:
+        The argon2id hash string.
+    """
+    return _ph.hash(password)

 def get_user(db, username: str):
@@ -74,7 +76,9 @@ def authenticate_user(fake_db, username: str, password: str):
     if not user:
         return False
-    if not pwd_context.verify(password, user.hashed_password):
+    try:
+        _ph.verify(user.hashed_password, password)
+    except VerifyMismatchError:
         return False
     return user
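The change above swaps passlib's boolean `pwd_context.verify` for argon2-cffi's `PasswordHasher.verify`, which instead raises `VerifyMismatchError` on a bad password. The same raise-on-mismatch shape can be sketched with the standard library only (PBKDF2 and the parameters below are illustrative stand-ins for Argon2id, not the code's actual hashing):

```python
import hashlib
import hmac
import os


class VerifyMismatchError(Exception):
    """Raised when the password does not match the stored hash."""


def hash_password(password: str) -> str:
    # PBKDF2 stands in for Argon2id here; salt size and iteration count are illustrative.
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, 100_000)
    return salt.hex() + "$" + digest.hex()


def verify_password(stored: str, password: str) -> None:
    # Raises on mismatch instead of returning a boolean, like argon2's verify.
    salt_hex, digest_hex = stored.split("$")
    candidate = hashlib.pbkdf2_hmac("sha256", password.encode(),
                                    bytes.fromhex(salt_hex), 100_000)
    if not hmac.compare_digest(candidate, bytes.fromhex(digest_hex)):
        raise VerifyMismatchError


def authenticate(stored: str, password: str) -> bool:
    # Same shape as the patched authenticate_user: translate the exception into False.
    try:
        verify_password(stored, password)
    except VerifyMismatchError:
        return False
    return True
```

The design point of the diff is exactly this translation layer: the verifier raises, and only the authentication boundary converts that into a `False`.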
+1 -1
@@ -67,7 +67,7 @@ class ConceptManager(BaseService):
     You can define new concept, modify or delete them
     There are also function to help retrieve them easily (like first token cache)
-    Already instantiated concepts are managed by the Memory service
+    Already instantiated concepts are managed by the SheerkaMemory service, not here
     """
     NAME = "ConceptManager"
+1 -1
@@ -6,7 +6,7 @@ from services.BaseService import BaseService
 class SheerkaDummyEventManager(BaseService):
     """
     Manage simple publish and subscribe functions
-    Need to be replaced by a standard in the industry (Redis?)
+    Need to be replaced by a standard in the industry (Kafka, Redis?)
     """
     NAME = "DummyEventManager"
+76 -59
@@ -1,8 +1,10 @@
import inspect
+from contextlib import contextmanager

import pytest

from helpers import GetNextId
+from parsers.tokenizer import Token
from server.authentication import User

DEFAULT_ONTOLOGY_NAME = "current_test_"
@@ -10,88 +12,103 @@ DEFAULT_ONTOLOGY_NAME = "current_test_"

@pytest.fixture(scope="session")
def sheerka():
    from core.Sheerka import Sheerka
    sheerka = Sheerka()
    sheerka.initialize("mem://")
    return sheerka


@pytest.fixture(scope="module", autouse=True)
def on_new_module(sheerka, request):
    """
    For each new module, make sure to create a new ontology.
    Remove it at the end of the module.

    :param sheerka: the session-wide Sheerka instance
    :param request: the pytest request fixture
    """
    from core.Event import Event
    from core.ExecutionContext import ExecutionContext, ContextActions
    module_name = request.module.__name__.split(".")[-1]
    context = ExecutionContext("test",
                               Event(message=f"Executing module {module_name}"),
                               sheerka,
                               ContextActions.TESTING,
                               None)
    ontology = sheerka.om.push_ontology(module_name)
    yield
    sheerka.om.revert_ontology(context, ontology)


@pytest.fixture(scope="function")
def context(sheerka):
    from core.Event import Event
    from core.ExecutionContext import ExecutionContext, ContextActions
    return ExecutionContext("test",
                            Event(message=""),
                            sheerka,
                            ContextActions.TESTING,
                            None)


@pytest.fixture()
def next_id():
    return GetNextId()


@pytest.fixture()
def user():
    return User(username="johan doe", email="johan.doe@sheerka.com", firstname="johan", lastname="doe")


class TestUsingFileBasedSheerka:
    @pytest.fixture(scope="class")
    def sheerka(self):
        sheerka = Sheerka()
        sheerka.initialize()
        return sheerka


class NewOntology:
    """
    For some tests that may need to declare the same concepts across the tests
    """
    from core.ExecutionContext import ExecutionContext

    def __init__(self, context: ExecutionContext, name=None):
        self.sheerka = context.sheerka
        self.context = context
        self.name = name
        self.ontology = None
        if self.name is None:
            self.name = inspect.stack()[1][3]

    def __enter__(self):
        self.ontology = self.sheerka.om.push_ontology(self.name)
        return self.ontology

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.sheerka.om.revert_ontology(self.context, self.ontology)
        return False


+def simple_token_compare(a, b):
+    return a.type == b.type and a.value == b.value
+
+
+@contextmanager
+def comparable_tokens():
+    eq = Token.__eq__
+    ne = Token.__ne__
+    setattr(Token, "__eq__", simple_token_compare)
+    setattr(Token, "__ne__", lambda a, b: not simple_token_compare(a, b))
+    yield
+    setattr(Token, "__eq__", eq)
+    setattr(Token, "__ne__", ne)
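The new `comparable_tokens` context manager temporarily swaps `Token.__eq__`/`__ne__` for a value-only comparison and restores the originals afterwards. A self-contained sketch of the same patch-and-restore idiom, with a toy `Token` class rather than the project's tokenizer (the `try`/`finally` is an addition so the restore also runs when the block raises):

```python
from contextlib import contextmanager


class Token:
    # Toy token: default identity-based equality, like a plain object.
    def __init__(self, type_, value, pos):
        self.type = type_
        self.value = value
        self.pos = pos


def simple_token_compare(a, b):
    # Compare only type and value; ignore positions.
    return a.type == b.type and a.value == b.value


@contextmanager
def comparable_tokens():
    eq, ne = Token.__eq__, Token.__ne__
    Token.__eq__ = simple_token_compare
    Token.__ne__ = lambda a, b: not simple_token_compare(a, b)
    try:
        yield
    finally:
        # Restore the original comparison, even if the test body raised.
        Token.__eq__, Token.__ne__ = eq, ne


a, b = Token("word", "plus", 0), Token("word", "plus", 7)
with comparable_tokens():
    inside = (a == b)   # positions ignored inside the block
outside = (a == b)      # identity comparison again outside
```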
+3 -3
@@ -95,10 +95,10 @@ def test_i_cannot_get_an_attribute_which_is_not_defined():

 def test_i_can_repr_a_concept():
     next_id = GetNextId()
     foo = get_concept("foo", sequence=next_id)
-    assert repr(foo) == "(Concept foo#1001)"
+    assert repr(foo) == "Concept(foo#1001)"
     bar = get_concept("bar", pre="is an int", sequence=next_id)
-    assert repr(bar) == "(Concept bar#1002, #pre=is an int)"
+    assert repr(bar) == "Concept(bar#1002, #pre=is an int)"
     baz = get_concept("baz", definition="add a b", variables=["a", "b"], sequence=next_id)
-    assert repr(baz) == "(Concept baz#1003, a=**NotInit**, b=**NotInit**)"
+    assert repr(baz) == "Concept(baz#1003, a=**NotInit**, b=**NotInit**)"
+360 -387
@@ -1,3 +1,5 @@
+from typing import Literal
+
 from common.global_symbols import NotInit
 from common.utils import unstr_concept
 from core.ExecutionContext import ExecutionContext
@@ -18,12 +20,12 @@ ATTR_MAP = {

class GetNextId:
    def __init__(self):
        self.seq = 1000

    def next(self):
        self.seq += 1
        return self.seq


def get_concept(name=None, body=None,
@@ -43,123 +45,128 @@
                is_builtin=False,
                is_unique=False,
                autouse=False,
-                sequence=None) -> Concept:
+                sequence=None,
+                init_parameters=True) -> Concept:
    """
    Create a Concept object.
    Caution: 'id' and 'key' are not initialized unless a sequence is given.
    The keyword arguments mirror the ConceptMetadata attributes (see get_metadata).
    """
    metadata = get_metadata(
        name, body,
        id,
        key,
        where,
        pre,
        post,
        ret,
        definition,
        definition_type,
        desc,
        props,
        variables,
        parameters,
        bound_body,
        is_builtin,
        is_unique,
        autouse
    )
    if sequence:
        metadata.auto_init(sequence)
    else:
        metadata.digest = ConceptManager.compute_metadata_digest(metadata)
        metadata.all_attrs = ConceptManager.compute_all_attrs(metadata.variables)
-    return Concept(metadata)
+
+    if init_parameters and metadata.variables:
+        metadata.parameters = [v[0] if isinstance(v, tuple) else v for v in metadata.variables]
+
+    return Concept(metadata)
def get_evaluated_concept(blueprint: Concept | ConceptMetadata, **kwargs):
    """
    Returns a concept whose values are already initialized.

    :param blueprint: the Concept or ConceptMetadata to evaluate
    :param kwargs: values forced onto the resulting concept
    :return: the evaluated Concept
    """
    def _isfloat(num):
        try:
            float(num)
            return True
        except ValueError:
            return False

    res = Concept(blueprint.get_metadata())
    for attr in ATTR_MAP:
        source_code = getattr(res.get_metadata(), attr)
        if source_code == "" or source_code is None:
            value = NotInit
        elif source_code[0] in ("'", '"'):
            value = source_code[1:-1]
        elif source_code in ("True", "False"):
            value = source_code == "True"
        elif source_code.isdecimal():
            value = int(source_code)
        elif _isfloat(source_code):
            value = float(source_code)
        else:
            raise Exception(f"Cannot manage {attr=}, {source_code=}")
        setattr(res, ATTR_MAP[attr], value)
    # force values
    for k, v in kwargs.items():
        res.set_value(ATTR_MAP.get(k, k), v)
    res.get_runtime_info().is_evaluated = True
    return res
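`get_evaluated_concept` coerces each attribute's source string into a Python value by inspecting its shape (quotes, boolean keywords, decimal, float). That coercion chain can be isolated into a small standalone helper mirroring the branches above (`NotInit` is a stand-in sentinel here, not the project's actual symbol):

```python
NotInit = object()  # stand-in for the project's NotInit sentinel


def coerce_literal(source_code):
    """Mirror of the branch chain in get_evaluated_concept."""
    def _isfloat(num):
        try:
            float(num)
            return True
        except ValueError:
            return False

    if source_code == "" or source_code is None:
        return NotInit
    elif source_code[0] in ("'", '"'):
        return source_code[1:-1]           # strip the surrounding quotes
    elif source_code in ("True", "False"):
        return source_code == "True"
    elif source_code.isdecimal():
        return int(source_code)
    elif _isfloat(source_code):
        return float(source_code)
    raise ValueError(f"Cannot manage {source_code=}")
```

Note the branch order matters: `"True"`/`"False"` must be checked before the numeric branches, and `isdecimal` before `_isfloat`, so that `"42"` becomes an `int` rather than a `float`.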
def get_metadata(name=None, body=None,
@@ -181,68 +188,68 @@
                 autouse=False,
                 digest=None,
                 all_attrs=None):
    new_variables = []
    if variables:
        for v in variables:
            if isinstance(v, tuple):
                new_variables.append(v)
            else:
                new_variables.append((v, NotInit))
    return ConceptMetadata(
        id,
        name,
        key,
        is_builtin,
        is_unique,
        body,
        where,
        pre,
        post,
        ret,
        definition,
        definition_type,
        desc,
        autouse,
        bound_body,
        props or {},
        tuple(new_variables),
        parameters or [],
        digest,
        all_attrs,
    )
def metadata_auto_init(self: ConceptMetadata, sequence) -> ConceptMetadata:
    """
    Helper function for the unit tests.
    This method will be added to `ConceptMetadata` to ease the writing of the unit tests.
    It properly initializes the ConceptMetadata.

    :param self: the ConceptMetadata to initialize
    :param sequence: sequence manager used to assign an id
    :return: the initialized ConceptMetadata
    """
    if not self.id:
        self.id = str(sequence.next())
    if not self.key:
        self.key = ConceptManager.create_concept_key(self.name, self.definition, self.variables)
    if not self.is_unique:
        self.is_unique = False
    if not self.is_builtin:
        self.is_builtin = False
    if not self.definition_type:
        self.definition_type = DefinitionType.DEFAULT
    if not self.all_attrs:
        self.all_attrs = ConceptManager.compute_all_attrs(self.variables)
    if not self.digest:
        self.digest = ConceptManager.compute_metadata_digest(self)
    # Note that I do not automatically update the digest as I don't want to make unnecessary computations
    return self
def metadata_clone(self: ConceptMetadata, name=None, body=None,
@@ -263,75 +270,75 @@
                   autouse=None,
                   digest=None,
                   all_attrs=None) -> ConceptMetadata:
    """
    Helper function for the unit tests.
    This method will be added to `ConceptMetadata` to ease the writing of the unit tests.
    It clones a ConceptMetadata, but can override some attributes if requested.
    The keyword arguments mirror the ConceptMetadata attributes; any argument left
    to None keeps the value of the cloned instance.
    """
    return ConceptMetadata(
        id=self.id,
        name=self.name if name is None else name,
        body=self.body if body is None else body,
        key=self.key if key is None else key,
        where=self.where if where is None else where,
        pre=self.pre if pre is None else pre,
        post=self.post if post is None else post,
        ret=self.ret if ret is None else ret,
        definition=self.definition if definition is None else definition,
        definition_type=self.definition_type if definition_type is None else definition_type,
        desc=self.desc if desc is None else desc,
        props=self.props if props is None else props,
        variables=self.variables if variables is None else variables,
        parameters=self.parameters if parameters is None else parameters,
        bound_body=self.bound_body if bound_body is None else bound_body,
        is_builtin=self.is_builtin if is_builtin is None else is_builtin,
        is_unique=self.is_unique if is_unique is None else is_unique,
        autouse=self.autouse if autouse is None else autouse,
        digest=self.digest if digest is None else digest,
        all_attrs=self.all_attrs if all_attrs is None else all_attrs,
    )
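`metadata_clone` spells out a clone-with-overrides pattern by hand: every field defaults to the original's value unless an override is passed. For dataclass-style metadata the same idea is what `dataclasses.replace` provides; a sketch with a hypothetical, reduced metadata class (not the real `ConceptMetadata`):

```python
from dataclasses import dataclass, replace


@dataclass
class MiniMetadata:
    # Hypothetical, reduced subset of ConceptMetadata's fields.
    id: str
    name: str
    definition: str = ""
    is_builtin: bool = False


base = MiniMetadata(id="1001", name="plus", definition="add a b")

# Clone, overriding only what changes; unset fields are copied from the original.
clone = replace(base, name="minus")
```

The hand-rolled version remains useful when `None` is a legal field value, since `replace` cannot distinguish "not overridden" from "override with None" without a sentinel.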
# Helper functions for unit tests
@@ -340,172 +347,138 @@ setattr(ConceptMetadata, 'clone', metadata_clone)

def get_metadatas(*args, **kwargs):
    as_metadatas = [arg if isinstance(arg, ConceptMetadata) else get_metadata(arg) for arg in args]
    next_id = kwargs.get("next_id", None)
    if next_id:
        for metadata in as_metadatas:
            metadata_auto_init(metadata, next_id)
    return as_metadatas


def get_concepts(context: ExecutionContext, *concepts, **kwargs) -> list[Concept]:
    """
    Simple and quick way to get initialized concepts for a test
-    :param context:
-    :type context:
-    :param concepts:
-    :type concepts:
-    :param kwargs:
-    :type kwargs:
-    :return:
-    :rtype:
+    :param context:
+    :param concepts: Concepts to create
+    :param kwargs: named parameters to tweak the creation of the concepts
+        use_sheerka: adds the new concepts to Sheerka. If not set, simply creates concepts that do not affect Sheerka
+        sequence: Sequence Manager, to give a correct id to the created concepts
+    :return: the concepts
    """
    res = []
    use_sheerka = kwargs.pop("use_sheerka", False)
    sequence = kwargs.pop("sequence", None)
    for c in concepts:
        if use_sheerka:
            c = define_new_concept(context, c)
        elif isinstance(c, str):
            c = get_concept(c)
+
        if sequence:
            c.get_metadata().auto_init(sequence)
+
        res.append(c)
    return res
def get_evaluated_concepts(context, *concepts, use_sheerka=False) -> list[Concept]:
    if use_sheerka:
        return [context.sheerka.evaluate_concept(context, Concept(c.get_metadata())) for c in concepts]
    else:
        return [get_evaluated_concept(concept) for concept in concepts]


def define_new_concept(context: ExecutionContext, c: str | Concept | ConceptMetadata) -> Concept:
    sheerka = context.sheerka
    if isinstance(c, str):
        retval = sheerka.define_new_concept(context, c)
    else:
        metadata = c.get_metadata()
        retval = sheerka.define_new_concept(context,
                                            metadata.name,
                                            metadata.is_builtin,
                                            metadata.is_unique,
                                            metadata.body,
                                            metadata.where,
                                            metadata.pre,
                                            metadata.post,
                                            metadata.ret,
                                            metadata.definition,
                                            metadata.definition_type,
                                            metadata.autouse,
                                            metadata.bound_body,
                                            metadata.desc,
                                            metadata.props,
                                            metadata.variables,
                                            metadata.parameters)
    assert retval.status
    concept = sheerka.newi(retval.value.metadata.id)
    return concept


def get_file_content(file_name):
    with open(file_name) as f:
        return f.read()


def get_parser_input(text):
    pi = ParserInput(text)
    assert pi.init()
    return pi


def get_from(*args, **kwargs):
    """
    Convert the input to fix the token positions.

    :param args: MetadataToken / UnrecognizedToken items whose positions must be recomputed
    :return: the same items with consistent start/end positions
    """
    cache = {}  # I keep the name in cache to avoid having to remind it every time
    pos = 0
    res = []
    for item in args:
        start = pos
        if isinstance(item, MetadataToken):
            if item.metadata.name:
                cache[item.metadata.id] = item.metadata.name
            tokens = list(Tokenizer(cache[item.metadata.id], yield_eof=False))
            pos += len(tokens)
            resolution_method = kwargs.get("resolution_method", item.resolution_method)
            parser = kwargs.get("parser", item.parser)
            res.append(MetadataToken(item.metadata, start, pos - 1, resolution_method, parser))
        elif isinstance(item, UnrecognizedToken):
            tokens = list(Tokenizer(item.buffer, yield_eof=False))
            pos += len(tokens)
            res.append(UnrecognizedToken(item.buffer, start, pos - 1))
    return res


def _rv(value, who="Test"):
    return ReturnValue(who=who, status=True, value=value)


def _rvc(concept_name, who="Test"):
    next_id = GetNextId()
    concept = get_concept(concept_name, sequence=next_id)
    return ReturnValue(who=who, status=True, value=concept)


def _rvf(value, who="Test"):
    """
    Return Value False: build a failed ReturnValue carrying `value`.
    """
    return ReturnValue(who=who, status=False, value=value)
-def _ut(buffer, start=0, end=-1):
-    """
-    helper to UnrecognizedToken
-    :param buffer:
-    :type buffer:
-    :param start:
-    :type start:
-    :param end:
-    :type end:
-    :return:
-    :rtype:
-    """
-    return UnrecognizedToken(buffer, start, end)
-
-
-def _mt(concept_id, start=0, end=-1, resolution_method="id", parser="simple"):
-    """
-    helper to MetadataToken
-    :param concept_id:
-    :type concept_id:
-    :param start:
-    :type start:
-    :param end:
-    :type end:
-    :return:
-    :rtype:
-    """
-    name, _id = unstr_concept(concept_id)
-    if _id is None:
-        return MetadataToken(get_metadata(id=concept_id), start, end, resolution_method, parser)
-    else:
-        return MetadataToken(get_metadata(id=_id, name=name), start, end, resolution_method, parser)
-62
@@ -1,62 +0,0 @@
-import logging
-from multiprocessing import Process
-from time import sleep
-
-import uvicorn
-from fastapi import FastAPI
-
-
-class MockServer:
-    """ Core application to test. """
-
-    def __init__(self, endpoints: list[dict]):
-        """
-        :param endpoints:
-        :type endpoints: list of {path: '', response: ''}
-        """
-        self.api = FastAPI()
-
-        def raise_exception(ex):
-            raise ex
-
-        # register endpoints
-        for endpoint in endpoints:
-            method = endpoint["method"] if "method" in endpoint else "get"
-            if method == "post":
-                if "exception" in endpoint:
-                    self.api.post(endpoint["path"])(lambda: raise_exception(endpoint["exception"]))
-                else:
-                    self.api.post(endpoint["path"])(lambda: endpoint["response"])
-            else:
-                self.api.get(endpoint["path"])(lambda: endpoint["response"])
-
-        # register shutdown
-        self.api.on_event("shutdown")(self.close)
-
-        # create the process
-        self.proc = Process(target=uvicorn.run,
-                            args=(self.api,),
-                            kwargs={
-                                "host": "127.0.0.1",
-                                "port": 5000,
-                                "log_level": "info"},
-                            daemon=True)
-
-    async def close(self):
-        """ Graceful shutdown. """
-        logging.warning("Shutting down the app.")
-
-    def start_server(self):
-        self.proc.start()
-        sleep(0.1)
-
-    def stop_server(self):
-        self.proc.terminate()
-
-    def __enter__(self):
-        self.start_server()
-        return self
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        self.stop_server()
+105
@@ -0,0 +1,105 @@
from typing import Literal

from common.utils import str_concept, unstr_concept
from helpers import get_metadata
from parsers.state_machine import MetadataToken, UnrecognizedToken


class MetadataTokenForTest(MetadataToken):
    def __repr__(self):
        res = f"(MetadataTokenForTest metadata={str_concept(self.metadata, drop_name=True)}"
        if self.start is not None:
            res += f", start={self.start}"
        if self.end is not None:
            res += f", end={self.end}"
        if self.resolution_method is not None:
            res += f", method={self.resolution_method}"
        if self.parser is not None:
            res += f", origin={self.parser}"
        res += ")"
        return res

    def __eq__(self, other):
        if not isinstance(other, MetadataToken):
            return False
        if self.metadata.id != other.metadata.id:
            return False
        if self.start is not None and self.start != other.start:
            return False
        if self.end is not None and self.end != other.end:
            return False
        if self.parser is not None and self.parser != other.parser:
            return False
        if self.resolution_method is not None and self.resolution_method != other.resolution_method:
            return False
        return True


def _ut(buffer, start=0, end=-1):
    """
    Helper to build an UnrecognizedToken.

    :param buffer: the unrecognized text
    :param start: start position of the token
    :param end: end position of the token
    :return: the token
    :rtype: UnrecognizedToken
    """
    return UnrecognizedToken(buffer, start, end)
def _mt(concept_id,
        start=0,
        end=-1,
        resolution_method: Literal["name", "key", "id"] = "key",
        parser="simple",
        **kwargs):
    """
    Helper to build a MetadataToken, with defaults for SimpleConceptsParser.

    :param concept_id: a concept id, or a "name#id" string as handled by unstr_concept
    :param start: start position of the token
    :param end: end position of the token
    :return: the token
    :rtype: MetadataTokenForTest
    """
    name, _id = unstr_concept(concept_id)
    variables = list(kwargs.items()) if kwargs else None
    metadata = get_metadata(id=concept_id, variables=variables) if _id is None \
        else get_metadata(id=_id, name=name, variables=variables)
    return MetadataTokenForTest(metadata, start, end, resolution_method, parser)


def _mtsya(concept_id,
           start=0,
           end=None,
           resolution_method: Literal["name", "key", "id"] = "key",
           parser="sya",
           **kwargs):
    """
    Same as _mt, with defaults for SyaConceptsParser (parser="sya", end=None).
    """
    name, _id = unstr_concept(concept_id)
    variables = list(kwargs.items()) if kwargs else None
    metadata = get_metadata(id=concept_id, variables=variables) if _id is None \
        else get_metadata(id=_id, name=name, variables=variables)
    return MetadataTokenForTest(metadata, start, end, resolution_method, parser)
+143 -134
@@ -3,140 +3,149 @@ import pytest
from base import BaseTest
from conftest import NewOntology
from evaluators.base_evaluator import MultipleChoices
from helpers import get_concepts, get_from, get_metadata, get_parser_input
from parsers.SimpleConceptsParser import SimpleConceptsParser
from tests.parsers.conftest import _mt, _ut


class TestSimpleConceptsParser(BaseTest):
    @pytest.fixture()
    def parser(self):
        return SimpleConceptsParser()

    @pytest.mark.parametrize("text, expected", [
        ("I am a new concept", [_mt("1003", 0, 8)]),
        ("xxx yyy I am a new concept", [_ut("xxx yyy ", 0, 3), _mt("1003", 4, 12)]),
        ("I am a new concept xxx yyy", [_mt("1003", 0, 8), _ut(" xxx yyy", 9, 12)]),
        ("xxx I am a new concept yyy", [_ut("xxx ", 0, 1), _mt("1003", 2, 10), _ut(" yyy", 11, 12)]),
        ("c:#1003:", [_mt("1003", 0, 0, resolution_method="id")]),
        ("xxx c:#1003: yyy", [_ut("xxx ", 0, 1), _mt("1003", 2, 2, resolution_method="id"), _ut(" yyy", 3, 4)]),
        ("xxx c:I am: yyy", [_ut("xxx ", 0, 1), _mt("1002", 2, 2, resolution_method="name"), _ut(" yyy", 3, 4)]),
        (" I am a new concept", [_ut(" ", 0, 0), _mt("1003", 1, 9)])
    ])
    def test_i_can_recognize_a_concept(self, context, parser, text, expected):
        with NewOntology(context, "test_i_can_recognize_a_concept"):
            get_concepts(context, "I", "I am", "I am a new concept", use_sheerka=True)

            pi = get_parser_input(text)
            error_sink = []
            res = parser.parse(context, pi, error_sink)

            assert res == MultipleChoices([expected])
            assert not error_sink

    @pytest.mark.parametrize("text, expected", [
        ("foo", [_mt("1001", 0, 0, resolution_method="name")]),
        ("I am a new concept", [_mt("1001", 0, 8)])
    ])
    def test_i_can_recognize_a_concept_by_its_name_and_its_definition(self, context, parser, text, expected):
        with NewOntology(context, "test_i_can_recognize_a_concept_by_its_name_and_its_definition"):
            get_concepts(context, get_metadata(name="foo", definition="I am a new concept"), use_sheerka=True)

            pi = get_parser_input(text)
            error_sink = []
            res = parser.parse(context, pi, error_sink)

            assert res == MultipleChoices([expected])
            assert not error_sink

    @pytest.mark.parametrize("text, expected", [
        ("long concept name", [_mt("1001", 0, 4, resolution_method="name")]),
        ("I am a new concept", [_mt("1001", 0, 8)])
    ])
    def test_i_can_recognize_a_concept_by_its_name_when_long_name(self, context, parser, text, expected):
        with NewOntology(context, "test_i_can_recognize_a_concept_by_its_name_when_long_name"):
            get_concepts(context, get_metadata(name="long concept name", definition="I am a new concept"),
                         use_sheerka=True)

            pi = get_parser_input(text)
            error_sink = []
            res = parser.parse(context, pi, error_sink)

            assert res == MultipleChoices([expected])
            assert not error_sink

    def test_i_can_parse_a_sequence_of_concept(self, context, parser):
        with NewOntology(context, "test_i_can_parse_a_sequence_of_concept"):
            get_concepts(context, "foo bar", "baz", "qux", use_sheerka=True)

            pi = get_parser_input("foo bar baz foo, qux")
            error_sink = []
            res = parser.parse(context, pi, error_sink)

            expected = [_mt("1001", 0, 2),
                        _ut(" ", 3, 3),
                        _mt("1002", 4, 4),
                        _ut(" foo, ", 5, 8),
                        _mt("1003", 9, 9)]

            assert res == MultipleChoices([expected])
            assert not error_sink

    def test_i_can_detect_multiple_choices(self, context, parser):
        with NewOntology(context, "test_i_can_detect_multiple_choices"):
            get_concepts(context, "foo bar", "bar baz", use_sheerka=True)

            pi = get_parser_input("foo bar baz")
            error_sink = []
            res = parser.parse(context, pi, error_sink)

            expected1 = [_mt("1001", 0, 2), _ut(" baz", 3, 4)]
            expected2 = [_ut("foo ", 0, 1), _mt("1002", 2, 4)]

            assert res == MultipleChoices([expected1, expected2])
            assert not error_sink

    def test_i_can_detect_multiple_choices_2(self, context, parser):
        with NewOntology(context, "test_i_can_detect_multiple_choices_2"):
            get_concepts(context, "one two", "one", "two", use_sheerka=True)

            pi = get_parser_input("one two")
            error_sink = []
            res = parser.parse(context, pi, error_sink)

            expected1 = [_mt("1001", 0, 2)]
            expected2 = [_mt("1002", 0, 0), _ut(" ", 1, 1), _mt("1003", 2, 2)]

            assert res == MultipleChoices([expected1, expected2])
            assert not error_sink

    def test_i_can_detect_multiple_choices_3(self, context, parser):
        with NewOntology(context, "test_i_can_detect_multiple_choices_3"):
            get_concepts(context, "one two", "one", "two", use_sheerka=True)

            pi = get_parser_input("one two xxx one two")
            error_sink = []
            res = parser.parse(context, pi, error_sink)

            e1 = get_from(_mt("c:one two#1001:"), _ut(" xxx "), _mt("c:#1001:"))
            e2 = get_from(_mt("c:one#1002:"), _ut(" "), _mt("c:two#1003:"), _ut(" xxx "), _mt("c:one two#1001:"))
            e3 = get_from(_mt("c:one two#1001:"), _ut(" xxx "), _mt("c:one#1002:"), _ut(" "), _mt("c:two#1003:"))
            e4 = get_from(_mt("c:one#1002:"), _ut(" "), _mt("c:two#1003:"), _ut(" xxx "), _mt("c:#1002:"), _ut(" "),
                          _mt("c:#1003:"))

            assert res == MultipleChoices([e1, e2, e3, e4])
            assert not error_sink

    def test_nothing_is_return_is_no_concept_is_recognized(self, context, parser):
        pi = get_parser_input("one two three")
        error_sink = []
        res = parser.parse(context, pi, error_sink)

        assert res == MultipleChoices([])

    def test_i_can_manage_attribute_reference(self, context, parser):
        with NewOntology(context, "test_i_can_manage_attribute_reference"):
            get_concepts(context, "foo", "i am a concept", use_sheerka=True)

            pi = get_parser_input("foo.attribute")
            error_sink = []
            res = parser.parse(context, pi, error_sink)
            expected = [_mt("1001", 0, 0), _ut(".attribute", 1, 2)]
            assert res == MultipleChoices([expected])

            pi = get_parser_input("i am a concept.attribute")
            res = parser.parse(context, pi, error_sink)
            expected = [_mt("1002", 0, 6), _ut(".attribute", 7, 8)]
            assert res == MultipleChoices([expected])
+94
@@ -0,0 +1,94 @@
import pytest

from base import BaseTest
from conftest import NewOntology, comparable_tokens
from evaluators.base_evaluator import MultipleChoices
from helpers import get_concept, get_concepts, get_parser_input
from parsers.SyaConceptsParser import SyaConceptsParser
from parsers.tokenizer import Tokenizer
from tests.parsers.conftest import _mtsya


class TestSyaConceptsParser(BaseTest):
    @pytest.fixture()
    def parser(self):
        return SyaConceptsParser()

    @pytest.mark.parametrize("concept_key, expected_list", [
        ["a long token name", [("a long token name", 0)]],
        ["__var__0 __var__1 __var__2", [("", 3)]],
        ["__var__0 __var__1 prefixed", [(" prefixed", 2)]],
        ["suffixed __var__0 __var__1", [("suffixed ", 0), ("", 2)]],
        ["__var__0 __var__1 infixed __var__0 __var__1", [(" infixed ", 2), ("", 2)]],
        ["if __var__0 __var__1 then __var__2 end", [("if ", 0), (" then ", 2), (" end", 1)]]
    ])
    def test_i_can_initialize_expected_parameters(self, parser, concept_key, expected_list):
        resolved_expected_list = [(list(Tokenizer(source, yield_eof=False)), nb) for source, nb in expected_list]
        actual = parser._get_expected_tokens(concept_key)
        with comparable_tokens():
            assert actual == resolved_expected_list
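The parametrized cases above pin down the segmentation rule: a concept key is cut into its literal fragments, each paired with the number of variables consumed since the previous fragment. A plain-string sketch of that rule (`expected_segments` is a hypothetical name for this illustration; the real `_get_expected_tokens` works on `Tokenizer` token lists, not raw strings):

```python
import re

def expected_segments(key: str) -> list:
    """Split a concept key into (literal fragment, preceding variable count) pairs."""
    parts = re.split(r"__var__\d+", key)  # literal text between the placeholders
    segments, pending = [], 0
    for i, literal in enumerate(parts):
        last = i == len(parts) - 1
        if last:
            segments.append((literal, pending))  # final fragment, even if empty
        elif literal.strip():
            segments.append((literal, pending))  # named fragment: flush the count
            pending = 0
        # blank separators between consecutive variables produce no fragment
        if not last:
            pending += 1  # a __var__N placeholder follows this fragment
    return segments

assert expected_segments("a long token name") == [("a long token name", 0)]
assert expected_segments("__var__0 __var__1 __var__2") == [("", 3)]
assert expected_segments("if __var__0 __var__1 then __var__2 end") == \
    [("if ", 0), (" then ", 2), (" end", 1)]
```

The same rule reproduces the prefixed, suffixed, and infixed cases from the parametrize list.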
    @pytest.mark.parametrize("concept, _input", [
        (get_concept("a plus b", variables=["a", "b"]), "1 plus 2"),
        (get_concept("add a b", variables=["a", "b"]), "add 1 2"),
        (get_concept("a b add", variables=["a", "b"]), "1 2 add")
    ])
    def test_i_can_parse_a_simple_case(self, context, parser, concept, _input):
        with NewOntology(context, "test_i_can_parse_a_simple_case"):
            get_concepts(context, concept, use_sheerka=True)

            pi = get_parser_input(_input)
            error_sink = []
            res = parser.parse(context, pi, error_sink)

            expected = [_mtsya("1001", a="1 ", b=" 2")]
            assert res == MultipleChoices([expected])
            assert not error_sink
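The expected tokens above bind `a="1 "` and `b=" 2"`: the literal separator is removed and each variable keeps its neighbouring whitespace. A toy version of that binding for the single-infix case (`bind_infix` is a hypothetical illustration, assuming the separator occurs exactly once; it is not the parser's implementation):

```python
def bind_infix(text: str, sep: str):
    """Bind the text around a single infix separator; whitespace stays with the variables."""
    idx = text.index(sep)  # assumes exactly one occurrence of the separator
    return text[:idx], text[idx + len(sep):]

a, b = bind_infix("1 plus 2", "plus")
assert (a, b) == ("1 ", " 2")  # spaces are part of the bound values
```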
    def test_i_can_parse_long_names_concept(self, context, parser):
        with NewOntology(context, "test_i_can_parse_long_names_concept"):
            get_concepts(context, get_concept("a long named concept b", variables=["a", "b"]), use_sheerka=True)

            pi = get_parser_input("1 long named concept 2")
            error_sink = []
            res = parser.parse(context, pi, error_sink)

            expected = [_mtsya("1001", a="1 ", b=" 2")]
            assert res == MultipleChoices([expected])
            assert not error_sink

    def test_i_can_parse_sequence(self, context, parser):
        with NewOntology(context, "test_i_can_parse_sequence"):
            get_concepts(context, get_concept("a plus b", variables=["a", "b"]), use_sheerka=True)

            pi = get_parser_input("1 plus 2 3 plus 7")
            error_sink = []
            res = parser.parse(context, pi, error_sink)

            expected = [[_mtsya("1001", a="1 ", b=" 2")], [_mtsya("1001", a=" 3 ", b=" 7")]]
            assert res == MultipleChoices(expected)
            assert not error_sink
    def test_not_enough_parameters(self, context, parser):
        with NewOntology(context, "test_not_enough_parameters"):
            get_concepts(context, get_concept("a plus b", variables=["a", "b"]), use_sheerka=True)

            pi = get_parser_input("1 plus ")
            error_sink = []
            parser.parse(context, pi, error_sink)

            # the right-hand parameter is missing: the parser must report an error
            assert error_sink
    def test_i_can_detect_when_name_does_not_match(self, context, parser):
        with NewOntology(context, "test_i_can_detect_when_name_does_not_match"):
            get_concepts(context, get_concept("a long named concept b", variables=["a", "b"]), use_sheerka=True)

            pi = get_parser_input("1 long named mismatch 2")
            error_sink = []
            parser.parse(context, pi, error_sink)

            assert error_sink
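These tests cover flat sequences; the compositional case (`1 plus 2 times 3`, where `times` must bind tighter than `plus`) is what the shunting-yard pass described at the top of this document handles. A minimal sketch of that pass over whitespace-separated tokens, with the pop rule `prec(top) >= prec(op)` from the introduction (illustrative only, not the `SyaConceptsParser` implementation):

```python
PREC = {"+": 1, "-": 1, "*": 2, "/": 2}

def to_rpn(tokens):
    """Convert an infix token list to reverse Polish notation (Dijkstra, 1961)."""
    out, stack = [], []
    for tok in tokens:
        if tok in PREC:
            # pop every operator whose precedence is >= the incoming one
            while stack and PREC[stack[-1]] >= PREC[tok]:
                out.append(stack.pop())
            stack.append(tok)
        else:
            out.append(tok)  # operand goes straight to the output queue
    while stack:             # drain the remaining operators
        out.append(stack.pop())
    return out

assert to_rpn("1 + 2 * 3".split()) == ["1", "2", "3", "*", "+"]  # 1 + (2 * 3)
```

This reproduces the trace table from the introduction: `*` is never popped by `+`, so it reaches the output first and is evaluated first.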
+20 -20
@@ -1,11 +1,11 @@
from unittest.mock import MagicMock, patch

from fastapi import HTTPException
from starlette import status

from client import SheerkaClient, parse_arguments


class TestSheerkaClient:
    def test_i_can_start_with_a_default_hostname(self):
        parsed = parse_arguments([])
@@ -41,7 +41,11 @@ class TestSheerkaClient:
        assert res.message == "Connection refused."

    def test_i_can_manage_when_resource_is_not_found(self):
        mock_response = MagicMock()
        mock_response.__bool__ = MagicMock(return_value=False)
        mock_response.text = '{"detail":"Not Found"}'

        with patch("requests.get", return_value=mock_response):
            client = SheerkaClient("http://localhost", 5000)

            res = client.check_url()
@@ -49,29 +53,25 @@ class TestSheerkaClient:
            assert res.message == '{"detail":"Not Found"}'

    def test_i_can_connect_to_a_server(self):
        mock_response = MagicMock()
        mock_response.__bool__ = MagicMock(return_value=True)
        mock_response.text = '"Hello world"'

        with patch("requests.get", return_value=mock_response):
            client = SheerkaClient("http://localhost", 5000)

            res = client.check_url()

            assert res.status
            assert res.message == '"Hello world"'

    def test_i_can_manage_when_authentication_fails(self):
        mock_response = MagicMock()
        mock_response.__bool__ = MagicMock(return_value=False)
        mock_response.json.return_value = {"detail": "Incorrect username or password"}

        with patch("requests.post", return_value=mock_response):
            client = SheerkaClient("http://localhost", 5000)

            res = client.connect("username", "wrong_password")

            assert not res.status
            assert res.message == "Incorrect username or password"
+226 -225
@@ -2,279 +2,280 @@ import pytest
from common.global_symbols import NotInit
from core.concept import Concept, ConceptDefaultProps, ConceptMetadata, DefinitionType
from helpers import GetNextId, get_concept, get_concepts, get_evaluated_concept, get_from, get_metadata, \
    get_metadatas
from tests.parsers.conftest import _mt, _ut


def test_i_can_get_default_value_when_get_metadata():
    metadata = get_metadata()

    assert metadata.id is None
    assert metadata.name is None
    assert metadata.body is None
    assert metadata.key is None
    assert metadata.where is None
    assert metadata.pre is None
    assert metadata.post is None
    assert metadata.ret is None
    assert metadata.definition is None
    assert metadata.definition_type == DefinitionType.DEFAULT
    assert metadata.desc is None
    assert metadata.props == {}
    assert metadata.variables == tuple()
    assert metadata.parameters == []
    assert metadata.bound_body is None
    assert metadata.is_builtin is False
    assert metadata.is_unique is False
    assert metadata.autouse is False


def test_i_can_use_shortcut_to_declare_variables():
    metadata = get_metadata(variables=(("var1", NotInit), ("var2", "value")))
    assert metadata.variables == (("var1", NotInit), ("var2", "value"))  # default behaviour

    metadata = get_metadata(variables=[("var1", NotInit), ("var2", "value")])
    assert metadata.variables == (("var1", NotInit), ("var2", "value"))  # lists are transformed into tuples

    metadata = get_metadata(variables=["var1", "var2"])
    assert metadata.variables == (("var1", NotInit), ("var2", NotInit))  # expanded


def test_i_can_clone():
    metadata = ConceptMetadata(
        "id",
        "name",
        "key",
        True,
        True,
        "body",
        "where",
        "pre",
        "post",
        "ret",
        "definition",
        DefinitionType.BNF,
        "desc",
        True,
        "bound_body",
        {"prop": "value"},
        (("variable", "value"),),
        ("p1",),
        "digest",
        ("all_attr",),
    )

    clone = metadata.clone()

    for attr, value in vars(metadata).items():
        clone_value = getattr(clone, attr)
        assert clone_value == value


def test_i_can_override_values_when_i_clone_metadata():
    metadata = get_metadata()

    assert metadata.clone(name="new_name").name == "new_name"
    assert metadata.clone(body="new_body").body == "new_body"
    assert metadata.clone(key="new_key").key == "new_key"
    assert metadata.clone(where="new_where").where == "new_where"
    assert metadata.clone(pre="new_pre").pre == "new_pre"
    assert metadata.clone(post="new_post").post == "new_post"
    assert metadata.clone(ret="new_ret").ret == "new_ret"
    assert metadata.clone(definition="new_definition").definition == "new_definition"
    assert metadata.clone(definition_type="new_definition_type").definition_type == "new_definition_type"
    assert metadata.clone(desc="new_desc").desc == "new_desc"
    assert metadata.clone(props="new_props").props == "new_props"
    assert metadata.clone(variables="new_variables").variables == "new_variables"
    assert metadata.clone(parameters="new_parameters").parameters == "new_parameters"
    assert metadata.clone(bound_body="new_bound_body").bound_body == "new_bound_body"
    assert metadata.clone(is_builtin="new_is_builtin").is_builtin == "new_is_builtin"
    assert metadata.clone(is_unique="new_is_unique").is_unique == "new_is_unique"
    assert metadata.clone(autouse="new_autouse").autouse == "new_autouse"
    assert metadata.clone(digest="new_digest").digest == "new_digest"
    assert metadata.clone(all_attrs="new_all_attrs").all_attrs == "new_all_attrs"


def test_i_cannot_change_the_id_when_cloning():
    with pytest.raises(TypeError):
        metadata = get_metadata()
        metadata.clone(id="new_id")


def test_i_can_auto_init():
    next_id = GetNextId()
    metadata = get_metadata("a plus b", body="a + b", variables=["a", "b"]).auto_init(next_id)

    assert metadata.name == "a plus b"
    assert metadata.id == "1001"
    assert metadata.key == "__var__0 plus __var__1"
    assert metadata.all_attrs == ('#where#', '#pre#', '#post#', '#body#', '#ret#', 'a', 'b')
    assert metadata.is_unique is False
    assert metadata.is_builtin is False
    assert metadata.definition_type is DefinitionType.DEFAULT
    assert metadata.digest == '9e058bc1261d1e2c785889147066ce89960fd6844db5bb6f1d1d809a8eb790b7'


def test_sequences_are_incremented_when_multiples_call():
    next_id = GetNextId()
    assert get_metadata("foo").auto_init(next_id).id == "1001"
    assert get_metadata("bar").auto_init(next_id).id == "1002"


def test_i_can_get_multiple_metadatas():
    res = get_metadatas("foo", get_metadata("bar", body="body"))

    assert len(res) == 2

    metadata = res[0]
    assert isinstance(metadata, ConceptMetadata)
    assert metadata.name == "foo"
    assert metadata.body is None
    assert metadata.key is None
    assert metadata.id is None

    metadata = res[1]
    assert isinstance(metadata, ConceptMetadata)
    assert metadata.name == "bar"
    assert metadata.body == "body"
    assert metadata.key is None
    assert metadata.id is None


def test_i_can_get_multiple_already_initialized_metadatas():
    res = get_metadatas("foo", get_metadata("bar", body="body"), next_id=GetNextId())

    assert len(res) == 2

    metadata = res[0]
    assert isinstance(metadata, ConceptMetadata)
    assert metadata.name == "foo"
    assert metadata.body is None
    assert metadata.key == "foo"
    assert metadata.id == "1001"

    metadata = res[1]
    assert isinstance(metadata, ConceptMetadata)
    assert metadata.name == "bar"
    assert metadata.body == "body"
    assert metadata.key == "bar"
    assert metadata.id == "1002"


def test_i_can_get_a_concept():
    foo = get_concept("foo", variables=("var1",))

    assert isinstance(foo, Concept)
    assert foo.name == "foo"
    assert foo.key is None
    assert foo.id is None
    assert foo.all_attrs() == ('#where#', '#pre#', '#post#', '#body#', '#ret#', 'var1')


def test_i_can_request_basic_initialization_when_getting_a_concept():
    next_id = GetNextId()
    foo = get_concept("foo", variables=("var1",), sequence=next_id)

    assert foo.name == "foo"
    assert foo.key == "foo"
    assert foo.id == "1001"
    assert foo.all_attrs() == ('#where#', '#pre#', '#post#', '#body#', '#ret#', 'var1')


def test_i_can_get_multiple_concepts(context):
    next_id = GetNextId()
    foo, bar, baz = get_concepts(context,
                                 "foo",
                                 "bar",
                                 get_concept("baz", definition="baz var1", variables=("var1",)),
                                 sequence=next_id)

    assert foo.name == "foo"
    assert foo.id == "1001"
    assert foo.key == "foo"

    assert bar.name == "bar"
    assert bar.id == "1002"
    assert bar.key == "bar"

    assert baz.name == "baz"
    assert baz.id == "1003"
    assert baz.key == "baz __var__0"


def test_i_can_get_multiple_concepts_using_sheerka(sheerka, context):
    foo, bar, baz = get_concepts(context,
                                 "foo",
                                 "bar",
                                 get_concept("baz", definition="baz var1", variables=("var1",)),
                                 use_sheerka=True)

    assert foo.name == "foo"
    assert foo.id == "1001"
    assert foo.key == "foo"

    assert bar.name == "bar"
    assert bar.id == "1002"
    assert bar.key == "bar"

    assert baz.name == "baz"
    assert baz.id == "1003"
    assert baz.key == "baz __var__0"
    assert baz.get_value("var1") is NotInit

    # the concepts are defined in Sheerka, so we can instantiate them
    baz2 = sheerka.newn("baz", var1="value for var1")
    assert baz2.name == "baz"
    assert baz2.id == "1003"
    assert baz2.key == "baz __var__0"
    assert baz2.get_value("var1") == "value for var1"


def test_i_can_get_multiple_concepts_when_same_name(sheerka, context):
    one_str, one_int = get_concepts(context,
                                    get_metadata("one", body="'one'"),
                                    get_metadata("one", body="1"),
                                    use_sheerka=True)

    assert sheerka.isinstance(one_str, "one")
    assert sheerka.isinstance(one_int, "one")


def test_i_can_create_test_concept():
    concept = get_concept("one", body="'one'")
    test_concept = get_evaluated_concept(concept, body='hello', a="value for a")

    assert test_concept.get_metadata() == concept.get_metadata()
    assert test_concept.get_value(ConceptDefaultProps.BODY) == "hello"
    assert test_concept.get_value("a") == "value for a"
def test_i_can_dummy_evaluate_concept(): def test_i_can_dummy_evaluate_concept():
concept = get_concept("one", body="'one'", where="True", pre="False", ret="1", post="1.0") concept = get_concept("one", body="'one'", where="True", pre="False", ret="1", post="1.0")
evaluated = get_evaluated_concept(concept) evaluated = get_evaluated_concept(concept)
assert evaluated.get_value(ConceptDefaultProps.WHERE) is True assert evaluated.get_value(ConceptDefaultProps.WHERE) is True
assert evaluated.get_value(ConceptDefaultProps.PRE) is False assert evaluated.get_value(ConceptDefaultProps.PRE) is False
assert evaluated.get_value(ConceptDefaultProps.BODY) == "one" assert evaluated.get_value(ConceptDefaultProps.BODY) == "one"
assert evaluated.get_value(ConceptDefaultProps.RET) == 1 assert evaluated.get_value(ConceptDefaultProps.RET) == 1
assert evaluated.get_value(ConceptDefaultProps.POST) == 1.0 assert evaluated.get_value(ConceptDefaultProps.POST) == 1.0
concept = get_concept("one", body='"one"', ret="'a value'") concept = get_concept("one", body='"one"', ret="'a value'")
evaluated = get_evaluated_concept(concept, ret='forced value') evaluated = get_evaluated_concept(concept, ret='forced value')
assert evaluated.get_value(ConceptDefaultProps.WHERE) == NotInit assert evaluated.get_value(ConceptDefaultProps.WHERE) == NotInit
assert evaluated.get_value(ConceptDefaultProps.PRE) == NotInit assert evaluated.get_value(ConceptDefaultProps.PRE) == NotInit
assert evaluated.get_value(ConceptDefaultProps.BODY) == "one" assert evaluated.get_value(ConceptDefaultProps.BODY) == "one"
assert evaluated.get_value(ConceptDefaultProps.RET) == "forced value" assert evaluated.get_value(ConceptDefaultProps.RET) == "forced value"
assert evaluated.get_value(ConceptDefaultProps.POST) == NotInit assert evaluated.get_value(ConceptDefaultProps.POST) == NotInit
def test_i_can_get_from(): def test_i_can_get_from():
res = get_from(_mt("c:i am a concept#1001:")) res = get_from(_mt("c:i am a concept#1001:"))
assert res == [_mt("1001", 0, 6)] assert res == [_mt("1001", 0, 6)]
res = get_from(_ut("some unrecognized stuff")) res = get_from(_ut("some unrecognized stuff"))
assert res == [_ut("some unrecognized stuff", 0, 4)] assert res == [_ut("some unrecognized stuff", 0, 4)]
res = get_from(_mt("c:i am a concept#1001:"), _ut("some unrecognized stuff")) res = get_from(_mt("c:i am a concept#1001:"), _ut("some unrecognized stuff"))
assert res == [_mt("1001", 0, 6), _ut("some unrecognized stuff", 7, 11)] assert res == [_mt("1001", 0, 6), _ut("some unrecognized stuff", 7, 11)]
res = get_from(_mt("c:i am a concept#1001:"), _ut("some unrecognized stuff"), parser="other") res = get_from(_mt("c:i am a concept#1001:"), _ut("some unrecognized stuff"), parser="other")
assert res == [_mt("1001", 0, 6, parser="other"), _ut("some unrecognized stuff", 7, 11)] assert res == [_mt("1001", 0, 6, parser="other"), _ut("some unrecognized stuff", 7, 11)]
res = get_from(_mt("c:i am a concept#1001:"), _mt("c:#1001:")) res = get_from(_mt("c:i am a concept#1001:"), _mt("c:#1001:"))
assert res == [_mt("1001", 0, 6), _mt("1001", 7, 13)] assert res == [_mt("1001", 0, 6), _mt("1001", 7, 13)]