fixed #18 : I can evaluate concept

This commit is contained in:
2023-06-01 22:08:34 +02:00
parent 09a0246420
commit 62391f786e
25 changed files with 1503 additions and 314 deletions
+110 -3
View File
@@ -1,9 +1,42 @@
import ast
from _ast import BoolOp
from dataclasses import dataclass
from typing import Any
from caching.FastCache import FastCache
from common.global_symbols import NotFound
class NamesInExpressionVisitor(ast.NodeVisitor):
def __init__(self):
self._names = set()
def get_names(self, node):
self.visit(node)
return self._names
def visit_Name(self, node):
self._names.add(node.id)
def visit_Call(self, node: ast.Call):
self.visit_selected(node, ["args", "keywords"])
def visit_For(self, node: ast.For):
self.visit_selected(node, ["body", "orelse"])
def visit_selected(self, node, to_visit):
"""Called if no explicit visitor function exists for a node."""
for field in to_visit:
value = getattr(node, field)
if isinstance(value, list):
for item in value:
if isinstance(item, ast.AST):
self.visit(item)
elif isinstance(value, ast.AST):
self.visit(value)
class UnreferencedNamesVisitor(ast.NodeVisitor):
"""
Try to find symbols that will be requested by the ast
@@ -12,8 +45,7 @@ class UnreferencedNamesVisitor(ast.NodeVisitor):
cache = FastCache()
def __init__(self, context):
self.context = context
def __init__(self):
self.names = set()
def get_names(self, node):
@@ -46,7 +78,7 @@ class UnreferencedNamesVisitor(ast.NodeVisitor):
class UnreferencedVariablesVisitor(UnreferencedNamesVisitor):
"""
Try to find variables names that will be requested by the ast
Try to find names that will be requested by the ast in module
This visitor do not yield function names
"""
@@ -54,6 +86,14 @@ class UnreferencedVariablesVisitor(UnreferencedNamesVisitor):
self.visit_selected(node, ["args", "keywords"])
def visit_keyword(self, node: ast.keyword):
"""
Keyword are parameters that are defined with a double star (**) in function / method definition
ex: def fun(positional, *args, **keywords)
:param node:
:type node:
:return:
:rtype:
"""
self.names.add(node.arg)
self.visit_selected(node, ["value"])
@@ -104,3 +144,70 @@ class NamesWithAttributesVisitor(ast.NodeVisitor):
self.temp.reverse()
self.sequences.append(self.temp.copy())
self.temp.clear()
class WhereConstraintVisitor(ast.NodeVisitor):
"""
Parse an expression (undefined behaviour for module)
in order to look for expr
ex :
>>> exp = "isinstance(x, Concept) and isinstance(y, Concept)"
>>> ast_tree = ast.parse(exp, "<user input>", 'eval')
>>> visitor = WhereConstraintVisitor(ast_tree)
>>> assert visitor.get_constraints(exp) == {"x" : WhereConstraint("isinstance(x, Concept)"),
>>> "y" : WhereConstraint("isinstance(y, Concept)")}
"""
@dataclass
class WhereConstraint:
source_code: str
ast_tree: object = None
def __repr__(self):
return f"WhereConstraint({self.source_code})"
def __eq__(self, other):
if not isinstance(other, WhereConstraintVisitor.WhereConstraint):
return False
return self.source_code == other.source_code
def __hash__(self):
return hash(self.source_code)
def __init__(self, ast_tree):
self.constraints = {}
self.ast_tree = ast_tree
def get_constraints(self):
if self.ast_tree is None:
return self.constraints
self.visit(self.ast_tree)
if self.constraints == {}:
self.create_constraint(self.ast_tree)
return self.constraints
def visit_BoolOp(self, node: BoolOp) -> Any:
if node.op.__class__.__name__ != "And": # failed to properly compare the type !
raise NotImplementedError("Cannot manage other than 'and' expression")
for exp in node.values:
self.create_constraint(exp)
def create_constraint(self, node):
source_code = ast.unparse(node)
if not isinstance(node, ast.Expression):
node.lineno = 0
node.col_offset = 1
node_to_use = ast.Expression(node, lineno=0, col_offset=1)
else:
node_to_use = node
constraint = WhereConstraintVisitor.WhereConstraint(source_code, node_to_use)
name_visitor = NamesInExpressionVisitor()
names = name_visitor.get_names(node_to_use)
for name in names:
self.constraints.setdefault(name, []).append(constraint)
+9 -5
View File
@@ -1,8 +1,12 @@
class BuiltinConcepts:
SHEERKA = "__SHEERKA"
NEW_CONCEPT = "__NEW_CONCEPT"
UNKNOWN_CONCEPT = "__UNKNOWN_CONCEPT"
USER_INPUT = "__USER_INPUT"
PARSER_INPUT = "__PARSER_INPUT"
PYTHON_CODE = "__PYTHON_CODE"
NEW_CONCEPT = "__NEW_CONCEPT" # when the definition of a new concept is added
UNKNOWN_CONCEPT = "__UNKNOWN_CONCEPT" # Failed to find the requested concept
USER_INPUT = "__USER_INPUT" # user command
PARSER_INPUT = "__PARSER_INPUT" # command that will be parsed
PYTHON_CODE = "__PYTHON_CODE" # command that is parsed
INVALID_CONCEPT = "__INVALID_CONCEPT" # failed to parse concept attributes
EVALUATION_ERROR = "__EVALUATION_ERROR" # failed to evaluate concept
+4 -5
View File
@@ -18,11 +18,13 @@ class ContextActions:
EVALUATION = "Evaluation"
AFTER_EVALUATION = "After Evaluation"
EVALUATING_PYTHON = "Evaluating python"
EVALUATING_CONCEPT = "Evaluating concept"
BUILD_CONCEPT = "Building all attributes"
BUILD_CONCEPT_ATTR = "Building one attribute"
BUILD_CONCEPT_ATTR = "Building an attribute"
EVAL_CONCEPT = "Evaluating all attributes"
EVAL_CONCEPT_ATTR = "Evaluating one attribute"
EVAL_CONCEPT_ATTR = "Evaluating an attribute"
class ContextHint:
@@ -204,9 +206,6 @@ class ExecutionContext:
hint in self.global_hints or \
hint in self.private_hints
def get_from_short_term_memory(self, key):
return self.sheerka.get_from_short_term_memory(self, key)
def log(self, message: str, who: str = None):
"""Send debug information to logger"""
pass
+13 -1
View File
@@ -11,7 +11,7 @@ from caching.IncCache import IncCache
from common.utils import get_logger_name, get_sub_classes, import_module_and_sub_module
from core.BuiltinConcepts import BuiltinConcepts
from core.Event import Event
from core.ExecutionContext import ContextHint, ExecutionContext, ContextActions
from core.ExecutionContext import ContextActions, ContextHint, ExecutionContext
from core.ReturnValue import ReturnValue
from core.concept import Concept, ConceptMetadata
from core.error import ErrorContext
@@ -332,6 +332,18 @@ class Sheerka:
return a.key == b
def objvalue(self, obj):
if not isinstance(obj, Concept):
return obj
if obj.get_runtime_info().error is not None:
return obj.get_runtime_info().error
if not obj.get_runtime_info().is_evaluated:
return obj
return self.objvalue(obj.body) if isinstance(obj.body, Concept) else obj.body
def echo(self, msg):
"""
test function
+9 -10
View File
@@ -1,4 +1,5 @@
from dataclasses import dataclass
from dataclasses import dataclass, field
from typing import Any
from common.global_symbols import NotFound, NotInit
@@ -60,13 +61,13 @@ class ConceptRuntimeInfo:
They are related to the instance of the concept
"""
is_evaluated: bool = False # True is the concept is evaluated by sheerka.eval_concept()
need_validation: bool = True # True if the properties of the concept need to be validated
recognized_by: str = None # RECOGNIZED_BY_ID, RECOGNIZED_BY_NAME, RECOGNIZED_BY_KEY (from Sheerka.py)
error: Any = None # when failed to evaluate the concept
info: dict = field(default_factory=dict) # give context of why 'where' or 'pre' constraints fail
def copy(self):
return ConceptRuntimeInfo(self.is_evaluated,
self.need_validation,
self.recognized_by)
self.error,
self.info)
class Concept:
@@ -79,21 +80,19 @@ class Concept:
def __init__(self, metadata: ConceptMetadata):
self._metadata: ConceptMetadata = metadata
self._compiled = {} # cached ast for the where, pre, post and body parts and variables
self._compiled_context_hints = {} # context hints to use when evaluating compiled
self._bnf = None # compiled bnf expression
self._runtime_info = ConceptRuntimeInfo() # runtime settings for the concept
self._all_attrs = None
def __repr__(self):
text = f"({self._metadata.id}){self._metadata.name}"
text = f"(Concept {self._metadata.name}#{self._metadata.id}"
if self._metadata.pre:
text += f", #pre={self._metadata.pre}"
for attr in [attr for attr in self.all_attrs() if not attr.startswith("#")]:
text += f", {attr}={self.get_value(attr)}"
return text
return text + ")"
def __eq__(self, other):
# I don't want this test to be part of the recursion
@@ -116,7 +115,7 @@ class Concept:
return True
# 1. in order for two concepts to be equal, they must have the same definition
# 2. They must have the same properties and variables
# 2. They must have the same variables
if left.get_definition_digest() != right.get_definition_digest():
return False
+7 -1
View File
@@ -1,6 +1,7 @@
from dataclasses import dataclass
from common.utils import compute_hash
from core.BuiltinConcepts import BuiltinConcepts
from core.ExecutionContext import ExecutionContext
@@ -23,7 +24,7 @@ class ErrorObj:
pass
class ErrorContext:
class ErrorContext(ErrorObj):
"""
This class represents the result of a data flow processing
"""
@@ -63,3 +64,8 @@ class ErrorContext:
temp.append(repr(value))
return ", ".join(temp)
ErrorConcepts = {BuiltinConcepts.UNKNOWN_CONCEPT,
BuiltinConcepts.EVALUATION_ERROR,
BuiltinConcepts.INVALID_CONCEPT}
+1 -1
View File
@@ -14,7 +14,7 @@ class PythonFragment:
def __repr__(self):
ast_type = "expr" if isinstance(self.ast_tree, ast.Expression) else "module"
return "PythonNode(" + ast_type + "='" + self.source_code + "')"
return f"PythonFragment({ast_type}='{self.source_code}', namespace={tuple(self.namespace.keys())})"
def __eq__(self, other):
if not isinstance(other, PythonFragment):
+2 -1
View File
@@ -3,6 +3,7 @@ from core.ExecutionContext import ExecutionContext, ContextActions
from core.ReturnValue import ReturnValue
from core.error import ErrorContext
from evaluators.base_evaluator import EvaluatorEvalResult, EvaluatorMatchResult, OneReturnValueEvaluator
from services.SheerkaPython import EvaluationContext
class PythonEvaluator(OneReturnValueEvaluator):
@@ -22,7 +23,7 @@ class PythonEvaluator(OneReturnValueEvaluator):
sheerka = context.sheerka
fragment = return_value.value.pf
evaluated = sheerka.evaluate_python(context, fragment)
evaluated = sheerka.evaluate_python(context, EvaluationContext(), fragment)
if isinstance(evaluated, ErrorContext):
return EvaluatorEvalResult([ReturnValue(self.name, False, evaluated, parents=[return_value])],
[])
+3 -3
View File
@@ -3,7 +3,7 @@ from dataclasses import dataclass
from common.utils import encode_concept
from core.BuiltinConcepts import BuiltinConcepts
from core.ExecutionContext import ExecutionContext, ContextActions
from core.ExecutionContext import ContextActions, ExecutionContext
from core.ReturnValue import ReturnValue
from core.error import ErrorContext, ErrorObj
from core.python_fragment import PythonFragment
@@ -50,10 +50,10 @@ class PythonParser(OneReturnValueEvaluator):
source_code = parser_input.as_text(python_switcher, tracker).lstrip() # right side spaces must be kept
try:
ast_tree = ast.parse(source_code, f"<user input>", 'eval')
ast_tree = ast.parse(source_code, "<user input>", 'eval')
except:
try:
ast_tree = ast.parse(source_code, f"<user input>", 'exec')
ast_tree = ast.parse(source_code, "<user input>", 'exec')
except Exception as error:
error_context = ErrorContext(self.NAME, context, PythonErrorNode(parser_input.as_text(), error))
error_ret_val = ReturnValue(self.NAME, False, error_context, [return_value])
+2
View File
@@ -1,6 +1,7 @@
from caching.Cache import Cache
from caching.CacheManager import CacheManager
from caching.DictionaryCache import DictionaryCache
from caching.FastCache import FastCache
from caching.SetCache import SetCache
from common.global_symbols import EVENT_CONCEPT_ID_DELETED, \
EVENT_RULE_ID_DELETED, NotFound, \
@@ -65,6 +66,7 @@ class Ontology:
self.cache_manager = cache_manager
self.alt_sdp = alt_sdp
self.concepts_attributes = None
self.fast_cache = FastCache()
def __repr__(self):
return f"Ontology('{self.name}')"
+234 -22
View File
@@ -1,6 +1,8 @@
from dataclasses import dataclass
from caching.FastCache import FastCache
from common.ast_utils import WhereConstraintVisitor
from common.global_symbols import CustomType, NotFound, NotInit
from core.BuiltinConcepts import BuiltinConcepts
from core.ExecutionContext import ContextActions, ExecutionContext
from core.ReturnValue import ReturnValue
@@ -8,20 +10,24 @@ from core.concept import Concept, ConceptDefaultProps, ConceptDefaultPropsAttrs,
from core.error import ErrorObj, SheerkaException
from core.python_fragment import PythonFragment
from services.BaseService import BaseService
from services.SheerkaPython import EvaluationRef
from services.SheerkaPython import EvalMethod, EvaluationContext, EvaluationRef, MultipleResults
PARSING_STEPS = [
ContextActions.BEFORE_PARSING,
ContextActions.PARSING,
]
CONDITIONAL_ATTR = [ConceptDefaultProps.WHERE, ConceptDefaultProps.PRE]
class ConceptCompiled:
"""
Container for all PythonFragment
attribute will be accessed by setattr() and getattr()
"""
pass
def __init__(self):
self.errors = {}
@dataclass
@@ -29,6 +35,40 @@ class ConceptEvaluationHints:
force_evaluation: bool = False
@dataclass
class PredicateIsFalse(ErrorObj):
"""
This error class is issued when a 'pre' or 'where' constraint fails
"""
attr: str = None
predicate: str = None
namespace: dict = None
def get_error_msg(self) -> str:
return f"Failed to match condition '{self.predicate}' with namespace {self.namespace}."
@dataclass
class InfiniteRecursion(ErrorObj):
"""
This error class is issued when an infinite recursion is detected during evaluation
"""
ids: list
@dataclass
class PredicateIsTrue:
"""
Information class to trace that a 'pre' or 'where' constraint passes
"""
attr: str = None
predicate: str = None
namespace: dict = None
def __repr__(self):
return f"(PredicateIsTrue predicate='{self.predicate}', namespace={self.namespace}"
class ConceptEvaluator(BaseService):
"""
The service is used to evaluate a concept
@@ -39,29 +79,56 @@ class ConceptEvaluator(BaseService):
def __init__(self, sheerka):
super().__init__(sheerka)
self.compiled_cache = FastCache()
self.where_constraints_cache = FastCache(default=None)
def initialize(self):
self.sheerka.bind_service_method(self.NAME, self.evaluate_concept, True)
def evaluate_concept(self, context: ExecutionContext,
concept: Concept,
hints: ConceptEvaluationHints = None) -> Concept:
hints = hints or ConceptEvaluationHints()
hints: ConceptEvaluationHints = None):
context.log(f"Evaluating concept '{concept}'")
with context.push(self.NAME, ContextActions.EVALUATING_CONCEPT, {"concept": concept}) as sub_context:
hints = hints or ConceptEvaluationHints()
# if the concept is already evaluated, no need to do it again
if not hints.force_evaluation and concept.get_runtime_info().is_evaluated:
return concept
# try to detect infinite recursion
if ids := self._detect_recursion(context, concept.id):
error = InfiniteRecursion(ids=ids)
concept.get_runtime_info().error = error
concept.get_runtime_info().is_evaluated = True
return context.sheerka.newn(BuiltinConcepts.EVALUATION_ERROR, concept=concept, reason=error)
with context.push(self.NAME, ContextActions.EVALUATING_CONCEPT, {"concept": concept}) as sub_context:
# build the python fragments, if needed
if concept.get_definition_digest() not in self.compiled_cache:
compiled = self.build(sub_context, concept.get_metadata())
# build compiled code if not done yet
compiled = self._build_attributes(sub_context, concept.get_metadata())
self.compiled_cache.put(concept.get_definition_digest(), compiled)
self.inner_eval_concept(context, concept)
return concept
# build the where constraints
if ((where_pf := getattr(compiled, ConceptDefaultProps.WHERE)) is not None
and isinstance(where_pf, PythonFragment)):
where_constraints = self._build_where_constraints(where_pf.ast_tree)
self.where_constraints_cache.put(concept.get_definition_digest(), where_constraints)
def build(self, context: ExecutionContext, metadata: ConceptMetadata):
# eval variables and attributes
return self._evaluate_attributes(sub_context, concept)
def _build_attributes(self, context: ExecutionContext, metadata: ConceptMetadata):
"""
get the compiled code var all variables and concept attributes
:param context:
:type context:
:param metadata:
:type metadata:
:return:
:rtype:
"""
sheerka = context.sheerka
action_context = {ConceptDefaultProps.WHERE: metadata.where,
ConceptDefaultProps.PRE: metadata.pre,
@@ -69,7 +136,7 @@ class ConceptEvaluator(BaseService):
ConceptDefaultProps.POST: metadata.post,
ConceptDefaultProps.RET: metadata.ret}
for k, v in metadata.variables:
action_context[k] = v
action_context[k] = self._get_source_code(v)
compiled = ConceptCompiled()
with context.push(self.NAME, ContextActions.BUILD_CONCEPT, {"metadata": action_context}) as sub_context:
@@ -89,10 +156,13 @@ class ConceptEvaluator(BaseService):
ret = sheerka.execute(attr_context, [start], PARSING_STEPS)
attr_context.add_values(return_values=ret)
# TODO : manage when the parsing fails
value = ret[0].value
if isinstance(value, ErrorObj):
setattr(compiled, attr, value)
compiled.errors[attr] = value.get_error_msg()
else:
# Add reference to internal variables
python_fragment = ret[0].value.pf
python_fragment = value.pf
for k, v in metadata.variables:
python_fragment.namespace[k] = EvaluationRef("self", k)
@@ -100,27 +170,153 @@ class ConceptEvaluator(BaseService):
return compiled
def inner_eval_concept(self, context, concept):
def _get_where_constraints(self, concept, attr):
constraints = self.where_constraints_cache.get(concept.get_definition_digest())
if constraints is NotFound:
return None
return getattr(constraints, attr) if hasattr(constraints, attr) else None
def _evaluate_attributes(self, context: ExecutionContext, concept: Concept):
"""
Evaluate the attributes, in the correct order
:param context:
:type context:
:param concept:
:type concept:
:return:
:rtype:
"""
sheerka = context.sheerka
compiled = self.compiled_cache.get(concept.get_definition_digest())
# no need to evaluate if an error was found during the parsing
if len(compiled.errors) > 0:
invalid = sheerka.newn(BuiltinConcepts.INVALID_CONCEPT, concept_id=concept.id, reason=compiled.errors)
concept.get_runtime_info().error = invalid
concept.get_runtime_info().is_evaluated = True
return invalid
# evaluate every attribute and variable
compiled_debug = self._get_compiled_debug(compiled)
action_debug = {"concept": concept, "compiled": compiled_debug}
with context.push(self.NAME, ContextActions.EVAL_CONCEPT, action_debug) as sub_context:
attributes = self._get_attributes_to_eval(context, concept)
errors = {}
with context.push(self.NAME, ContextActions.EVAL_CONCEPT, {"compiled": compiled_debug}) as sub_context:
# first evaluate the variables
for attr in attributes:
with context.push(self.NAME, ContextActions.EVAL_CONCEPT_ATTR, {"attr": attr}) as attr_context:
res = sheerka.evaluate_python(sub_context,
getattr(compiled, attr),
compiled_attr = getattr(compiled, attr)
if compiled_attr is None:
concept.set_value(attr, NotInit)
continue
action_debug = {"concept": concept, "attr": attr, "compiled": compiled_debug[attr]}
with sub_context.push(self.NAME, ContextActions.EVAL_CONCEPT_ATTR, action_debug) as attr_context:
eval_method = EvalMethod.All if attr[0] != "#" else EvalMethod.UntilSuccess
eval_context = EvaluationContext(eval_method=eval_method)
res = sheerka.evaluate_python(attr_context,
eval_context,
compiled_attr,
{"self": concept})
# TODO : manage errors
# when there are multiple result, use what was found in the 'where' constraint
# to select the correct value
if (attr_constraints := self._get_where_constraints(concept, attr)) is not None:
res = self._apply_attr_constraints(context, attr_constraints, attr, res)
if isinstance(res, ErrorObj):
errors[attr] = res
concept.set_value(attr, NotInit)
res = NotInit
concept.set_value(attr, res)
# stops if 'where' or 'pre' fails
if attr in CONDITIONAL_ATTR:
if sheerka.objvalue(res) is True:
success = PredicateIsTrue(attr, compiled_attr.source_code)
concept.get_runtime_info().info.setdefault(attr, []).append(success)
else:
error = PredicateIsFalse(attr, compiled_attr.source_code)
errors[attr] = error
break # no need to continue in case of failure
concept.get_runtime_info().is_evaluated = True
if errors:
error_concept = sheerka.newn(BuiltinConcepts.EVALUATION_ERROR, concept=concept, reason=errors)
concept.get_runtime_info().error = errors
return error_concept
elif context.sheerka.isinstance(error_in_body := concept.body, BuiltinConcepts.EVALUATION_ERROR):
# if the body is an 'evaluation_error', it needs to be propagated
concept.get_runtime_info().error = error_in_body.reason
return error_in_body
if (ret := concept.get_value(ConceptDefaultProps.RET)) is NotInit:
return concept
else:
return ret
@staticmethod
def _get_attributes_to_eval(context, concept):
def _detect_recursion(context, current_concept_id):
ids = []
c = context
while True:
c = c.get_parent()
if c is None:
return None
if c.action == ContextActions.EVALUATING_CONCEPT:
concept_id = c.action_context["concept"].id
ids.append(concept_id)
if concept_id == current_concept_id:
break
return list(reversed(ids))
@staticmethod
def _build_where_constraints(ast_tree):
visitor = WhereConstraintVisitor(ast_tree)
result = ConceptCompiled()
for attr, constraints in visitor.get_constraints().items():
python_fragments = [PythonFragment(c.source_code, c.ast_tree) for c in constraints]
setattr(result, attr, python_fragments)
return result
@staticmethod
def _apply_attr_constraints(context, constraints: list, attr: str, res):
original_list = res
items = res.items if isinstance(res, MultipleResults) else [res]
validated = None
for constraint in constraints:
validated = []
for item in items:
res = context.sheerka.evaluate_python(context,
EvaluationContext(expression_only=True),
constraint,
{attr: item})
if res is True:
validated.append(item)
if len(validated) == 0:
# stop the validation if no match was found
break
items = validated
if len(validated) == 1:
return validated[0]
elif len(validated) == 0:
constraints_str = ' and '.join([c.source_code for c in constraints])
return PredicateIsFalse(attr, constraints_str, {attr: original_list})
else:
return MultipleResults(*validated)
@staticmethod
def _get_attributes_to_eval(context, concept: Concept):
res = [v[0] for v in concept.get_metadata().variables]
res += ConceptDefaultPropsAttrs
return res
@@ -138,3 +334,19 @@ class ConceptEvaluator(BaseService):
else:
ret[attr] = repr(value)
return ret
@staticmethod
def _get_source_code(obj):
"""
Use to manage when variable default value is NotInit (or other custom type)
:param obj:
:type obj:
:return:
:rtype:
"""
if isinstance(obj, str):
return obj
if isinstance(obj, CustomType):
return repr(obj)[2:-2]
raise Exception(f"Cannot manage '{obj}' when parsing concept attributes.")
+14 -3
View File
@@ -10,7 +10,7 @@ from common.utils import get_logger_name, unstr_concept
from core.BuiltinConcepts import BuiltinConcepts
from core.ExecutionContext import ExecutionContext
from core.ReturnValue import ReturnValue
from core.concept import Concept, ConceptMetadata, ConceptDefaultPropsAttrs, DefinitionType
from core.concept import Concept, ConceptDefaultPropsAttrs, ConceptMetadata, DefinitionType
from core.error import ErrorContext, SheerkaException
from parsers.tokenizer import TokenKind, Tokenizer, strip_tokens
from services.BaseService import BaseService
@@ -105,6 +105,9 @@ class ConceptManager(BaseService):
_(4, BuiltinConcepts.USER_INPUT, desc="Any external input", variables=("command",))
_(5, BuiltinConcepts.PARSER_INPUT, desc="tokenized input", variables=("pi",))
_(6, BuiltinConcepts.PYTHON_CODE, desc="python code", variables=("pf",)) # pf for PythonFragment
_(7, BuiltinConcepts.INVALID_CONCEPT, desc="invalid concept", variables=("concept_id", "reason"))
_(8, BuiltinConcepts.EVALUATION_ERROR, desc="evaluation error", variables=("concept", "reason"))
self.init_log.debug('%s builtin concepts created',
len(self.sheerka.om.current_cache_manager().concept_caches))
@@ -237,8 +240,11 @@ class ConceptManager(BaseService):
:return:
:rtype:
"""
if isinstance(identifier, ConceptMetadata):
return self._inner_new(identifier, **kwargs)
if isinstance(identifier, (ConceptMetadata, Concept)):
return self._inner_new(identifier.get_metadata(), **kwargs)
if isinstance(identifier, list):
return [self.new(item, **kwargs) for item in identifier]
if (tmp := unstr_concept(identifier)) != (None, None):
# manage c:name#id:
@@ -380,4 +386,9 @@ class ConceptManager(BaseService):
concept = Concept(_metadata_def)
for k, v in kwargs.items():
concept.set_value(k, v)
if kwargs:
# if an attribute is set, the concept is considered as evaluated
concept.get_runtime_info().is_evaluated = True
return concept
+4 -47
View File
@@ -1,8 +1,5 @@
from typing import Any
from caching.FastCache import FastCache
from common.global_symbols import NotFound
from core.ExecutionContext import ExecutionContext
from services.BaseService import BaseService
@@ -21,53 +18,13 @@ class SheerkaMemory(BaseService):
def __init__(self, sheerka):
super().__init__(sheerka, order=13)
self.short_term_objects = FastCache()
def initialize(self):
self.sheerka.bind_service_method(self.NAME, self.get_from_short_term_memory, False, visible=False)
self.sheerka.bind_service_method(self.NAME, self.add_to_short_term_memory, True, visible=False)
self.sheerka.bind_service_method(self.NAME, self.list_short_term_memory, False, visible=False)
def get_from_short_term_memory(self, context: ExecutionContext | None, key: str) -> Any:
while True:
try:
id_to_use = context.id if context else self.GLOBAL
return self.short_term_objects.cache[id_to_use][key]
except KeyError:
if context is None:
return NotFound
def get_from_short_term_memory(self, key: str) -> Any:
return self.sheerka.om.current_ontology().fast_cache.get(key)
context = context.get_parent()
def add_to_short_term_memory(self, context: ExecutionContext | None, key: str, value: Any):
if context:
context.stm = True
id_to_use = context.id
else:
id_to_use = SheerkaMemory.GLOBAL
if id_to_use in self.short_term_objects.cache:
self.short_term_objects.cache[id_to_use][key] = value
else:
self.short_term_objects.put(id_to_use, {key: value})
def list_short_term_memory(self, context: ExecutionContext | None):
"""
list all short term memory data (stm data)
:param context:
:type context:
:return:
:rtype:
"""
res = self.short_term_objects.cache[self.GLOBAL].copy() if self.GLOBAL in self.short_term_objects.cache else {}
if context is None:
return res
contexts = [context] + list(context.get_parents())
for ec in reversed(contexts):
try:
res.update(self.short_term_objects.cache[ec.id])
except KeyError:
pass
return res
def add_to_short_term_memory(self, key: str, value: Any):
self.sheerka.om.current_ontology().fast_cache.put(key, value)
+198 -53
View File
@@ -7,10 +7,11 @@ from common.ast_utils import NamesWithAttributesVisitor, UnreferencedNamesVisito
from common.global_symbols import NoFirstToken, NotFound, NotInit, Removed
from common.utils import dict_product
from core.BuiltinConcepts import BuiltinConcepts
from core.ExecutionContext import ContextHint, ExecutionContext
from core.ExecutionContext import ContextActions, ContextHint, ExecutionContext
from core.concept import Concept
from core.error import ErrorContext, ErrorObj, MethodAccessError
from core.error import ErrorConcepts, ErrorContext, ErrorObj, MethodAccessError
from core.python_fragment import PythonFragment
from parsers.tokenizer import Token, TokenKind
from services.BaseService import BaseService
TO_DISABLED = ["breakpoint", "callable", "compile", "delattr", "eval", "exec", "exit", "input", "locals", "open",
@@ -111,6 +112,48 @@ class EvaluationRef:
return hash((self.root, self.attr))
class EvalMethod:
UntilSuccess = "EvalUntilSuccess"
UntilTrue = "EvalUntilTrue"
All = "EvalAll"
@dataclass
class EvaluationContext:
expression_only: bool = False # methods with side effect are not allowed
eval_method: EvalMethod = EvalMethod.UntilSuccess
class MultipleResults:
def __init__(self, *args):
self.items = args
def __iter__(self):
return iter(self.items)
def __repr__(self):
return f"MultipleResults({', '.join([repr(item) for item in self.items])})"
def __eq__(self, other):
if not isinstance(other, MultipleResults):
return False
if len(other.items) != len(self.items):
return False
for _self, _other in zip(self.items, other.items):
if _self != _other:
return False
return True
def __hash__(self):
return hash(tuple(self.items))
def concepts_only(self):
return MultipleResults(*[item for item in self.items if isinstance(item, Concept)])
class SheerkaPython(BaseService):
"""
This service manage evaluation of python fragments
@@ -123,59 +166,87 @@ class SheerkaPython(BaseService):
def initialize(self):
self.sheerka.bind_service_method(self.NAME, self.evaluate_python, False, visible=False)
def evaluate_python(self, context: ExecutionContext, fragment: PythonFragment, global_namespace=None):
def evaluate_python(self, context: ExecutionContext,
eval_context: EvaluationContext,
fragment: PythonFragment,
global_namespace=None):
sheerka = context.sheerka
expression_only = False
global_namespace = global_namespace or {}
context.log(f"Evaluating python fragment {fragment.source_code}'")
with context.push(self.NAME, ContextActions.EVALUATING_PYTHON, {"fragment": fragment}) as sub_context:
sub_context.add_inputs(eval_context=eval_context, global_namespace=global_namespace)
try:
my_globals = self.get_globals(context, fragment, global_namespace, expression_only)
my_namespace = self.get_globals(sub_context, fragment, global_namespace, eval_context.expression_only)
except MethodAccessError as ex:
if context.in_context(ContextHint.EXPRESSION_ONLY_REQUESTED):
if sub_context.in_context(ContextHint.EXPRESSION_ONLY_REQUESTED):
# Quick and dirty,
# When expression_only, it's normal to have some NameError exceptions
error = ErrorContext(self.NAME, context, ex)
error = ErrorContext(self.NAME, sub_context, ex)
else:
eval_error = PythonEvalError(ex, fragment.source_code, traceback.format_exc(), None)
error = ErrorContext(self.NAME, context, eval_error)
error = ErrorContext(self.NAME, sub_context, eval_error)
return error
all_possible_globals = self.get_all_possible_globals(context, my_globals)
expect_success = True
concepts_entries = None
errors = []
evaluated = ReservedNotInit
evaluated = []
my_locals = None
all_results = []
for globals_ in all_possible_globals:
all_possible_namespaces = self.get_all_possible_namespaces(sub_context, my_namespace)
for globals_ in all_possible_namespaces:
try:
# eval
tmp_locals = {}
evaluated = self.evaluate_ast(fragment, globals_, tmp_locals)
res = self.evaluate_ast(fragment, globals_, tmp_locals)
all_results.append(res)
my_locals = tmp_locals
if not expect_success or evaluated:
# in this first version, we stop once a success is found
# it may not be the best result !
if isinstance(res, ErrorObj):
errors.append(res)
else:
evaluated.append(res)
if eval_context.eval_method == EvalMethod.UntilSuccess:
break
if res is True and eval_context.eval_method == EvalMethod.UntilTrue:
break
except Exception as ex:
if concepts_entries is None:
# I don't want to init it if no error is raised
concepts_entries = self.get_concepts_entries_from_globals(my_globals)
concepts_entries = self.get_concepts_entries_from_globals(my_namespace)
eval_error = PythonEvalError(ex,
fragment.source_code,
traceback.format_exc(),
self.get_concepts_values_from_globals(globals_, concepts_entries))
self.get_concepts_values_from_globals(concepts_entries, globals_))
all_results.append(eval_error)
errors.append(eval_error)
# add local namespace to stm
if my_locals:
for k, v in my_locals.items():
sheerka.add_to_short_term_memory(context, k, v)
sheerka.add_to_short_term_memory(k, v)
return ErrorContext(self.NAME, context, errors) if evaluated == ReservedNotInit else evaluated
sub_context.add_values(all_results=all_results,
evaluated=evaluated,
errors=errors,
my_locals=my_locals)
if not evaluated:
# when no practical result if found, return error
return ErrorContext(self.NAME, sub_context, errors)
if len(evaluated) == 1:
# Only one result, Yahoo !
return evaluated[0]
return evaluated[-1] is True if eval_context.eval_method == EvalMethod.UntilTrue \
else MultipleResults(*evaluated)
def get_globals(self, context, fragment, global_namespace, expression_only):
"""
@@ -188,41 +259,32 @@ class SheerkaPython(BaseService):
:param expression_only:
:return:
"""
unreferenced_names_visitor = UnreferencedNamesVisitor(context)
unreferenced_names_visitor = UnreferencedNamesVisitor()
names = unreferenced_names_visitor.get_names(fragment.ast_tree)
if "sheerka" in names:
sheerka_names = set()
sheerka_requested_names = set()
visitor = NamesWithAttributesVisitor()
for sequence in visitor.get_sequences(fragment.ast_tree, "sheerka"):
if len(sequence) > 1:
sheerka_names.add(sequence[1])
sheerka_requested_names.add(sequence[1])
else:
sheerka_names = None
sheerka_requested_names = None
return self.create_namespace(context,
names, # names to look for
sheerka_names, # sheerka methods
sheerka_requested_names, # sheerka methods
fragment.namespace, # objects from python fragment => local namespace
global_namespace, # global namespace
expression_only)
def get_sheerka_method(self, context, who, name, expression_only):
try:
method = context.sheerka.sheerka_methods[name]
if expression_only and method.has_side_effect:
raise MethodAccessError(name)
else:
method_to_use = self.inject_context(context)(method.method) \
if name in context.sheerka.methods_with_context \
else method.method
return method_to_use
except KeyError:
return None
def get_all_possible_namespaces(self, context, namespace):
by_synonym = self.manage_multiple_choices(namespace)
by_concept_body = self.manage_concepts_with_body(context, by_synonym)
return by_concept_body
def create_namespace(self, context,
names: list,
sheerka_objects: dict | None,
sheerka_names: set | None, # list of requested sheerka method or attr
local_namespace: dict,
global_namespace: dict,
expression_only: bool):
@@ -230,7 +292,7 @@ class SheerkaPython(BaseService):
Create a namespace for the requested names
:param context:
:param names: requested names
:param sheerka_objects: requested sheerka names (ex sheerka.isinstance)
:param sheerka_names: requested sheerka names (ex sheerka.isinstance)
:param local_namespace:
:type local_namespace:
:param global_namespace:
@@ -257,7 +319,7 @@ class SheerkaPython(BaseService):
# support reference to sheerka
if name.lower() == "sheerka":
bag = {}
for sheerka_name in sheerka_objects:
for sheerka_name in sheerka_names:
if (method := self.get_sheerka_method(context,
context.who,
sheerka_name,
@@ -267,7 +329,7 @@ class SheerkaPython(BaseService):
continue
# search in short term memory
if (obj := context.get_from_short_term_memory(name)) is not NotFound:
if (obj := context.sheerka.get_from_short_term_memory(name)) is not NotFound:
context.log(f"Resolving '{name}'. Using value found in STM.")
result[name] = obj
continue
@@ -298,19 +360,36 @@ class SheerkaPython(BaseService):
# at last, try to instantiate a new concept
if (metadata := context.sheerka.get_by_name(name)) != NotFound:
context.log(f"Resolving '{name}'. Instantiating new concept.")
result[name] = context.sheerka.new(metadata)
result[name] = self.new_concept(context, metadata)
continue
context.log(f"...'{name}' is not found or cannot be instantiated. Skipping.")
return result
@staticmethod
def resolve_object(context, attr_name, to_resolve, global_namespace):
def resolve_object(self, context, attr_name, to_resolve, global_namespace):
if isinstance(to_resolve, EvaluationRef):
return getattr(global_namespace[to_resolve.root], to_resolve.attr)
if isinstance(to_resolve, Token) and to_resolve.type == TokenKind.CONCEPT:
return self.new_concept(context, to_resolve.value)
raise AttributeError(attr_name)
def get_sheerka_method(self, context, who, name, expression_only):
try:
method = context.sheerka.sheerka_methods[name]
if expression_only and method.has_side_effect:
raise MethodAccessError(name)
else:
method_to_use = self.inject_context(context)(method.method) \
if name in context.sheerka.methods_with_context \
else method.method
return method_to_use
except KeyError:
return None
@staticmethod
def evaluate_ast(fragment, my_globals, my_locals):
compiled = fragment.get_compiled()
@@ -324,7 +403,44 @@ class SheerkaPython(BaseService):
exec(compiled, my_globals, my_locals)
@staticmethod
def get_all_possible_globals(context, my_globals):
def manage_multiple_choices(namespace):
"""
for all entry that contains synonym of concepts, create a new namespace
ex : {"a": 1, "b" : [Concept("foo1"), Concept("foo2")] }
will be transformed into two separate namespaces
{"a": 1, "b" : Concept("foo1") }
and
{"a": 1, "b" : Concept("foo2") }
Note that concepts in error are discarded
:param namespace:
:type namespace:
:return: list of namespaces
:rtype:
"""
# I want to achieve the equivalent of a cartesian product, but with list of dictionaries
synonyms = {}
others = {}
for k, v in namespace.items():
if isinstance(v, MultipleResults):
synonyms[k] = v
else:
others[k] = v
# make the product the rest as cartesian product
res = [others]
for k, concepts in synonyms.items():
res = dict_product(res, [{k: c} for c in concepts
if not isinstance(c, Concept) or
c.name not in ErrorConcepts])
return res
@staticmethod
def manage_concepts_with_body(context, namespaces):
"""
From a dictionary of globals (str, obj)
Creates as many globals as there are combination between a concept and its body
@@ -334,26 +450,37 @@ class SheerkaPython(BaseService):
one with foo: Concept("foo") # we keep the concept as an object
one with foo: 'something' # we substitute its value
:param context:
:param my_globals:
:param namespaces: list of namespaces
:return:
"""
# for each namespace in namespaces,
# I duplicate the namespace if I found a concept which has a value
# I then return a list of the aggregated namespaces
res = []
for namespace in namespaces:
# first pass, get all the non concepts or concepts without a body
# Note that we consider that all concepts are evaluated
# In the future, it may be a good optimisation to defer the evaluation of the body
# until the python evaluation fails
fixed_values = {}
concepts_with_body = {}
for k, v in my_globals.items():
for k, v in namespace.items():
if not isinstance(v, Concept) or not v.get_runtime_info().is_evaluated or v.body is NotInit:
fixed_values[k] = v
else:
concepts_with_body[k] = v
# make the product the rest as cartesian product
res = [fixed_values]
temp_res = [fixed_values]
for k, v in concepts_with_body.items():
res = dict_product(res, [{k: v}, {k: context.sheerka.objvalue(v)}])
temp_res = dict_product(temp_res, [{k: v}, {k: context.sheerka.objvalue(v)}])
res.extend(temp_res)
return res
@@ -377,8 +504,26 @@ class SheerkaPython(BaseService):
@staticmethod
def get_concepts_entries_from_globals(my_globals):
return [k for k, v in my_globals.items() if isinstance(v, Concept)]
"""
Return the name of all concept created
:param my_globals:
:type my_globals:
:return:
:rtype:
"""
return [k for k, v in my_globals.items() if isinstance(v, (Concept, MultipleResults))]
@staticmethod
def get_concepts_values_from_globals(my_globals, names):
def get_concepts_values_from_globals(names, my_globals):
return {name: my_globals[name] for name in names}
@staticmethod
def new_concept(context, identifier):
new_concept = context.sheerka.new(identifier)
if isinstance(new_concept, list):
evaluated = [context.sheerka.evaluate_concept(context, e) for e in new_concept]
return MultipleResults(*evaluated)
else:
evaluated = context.sheerka.evaluate_concept(context, new_concept)
return evaluated
+55 -5
View File
@@ -2,7 +2,8 @@ import ast
import pytest
from common.ast_utils import NamesWithAttributesVisitor, UnreferencedNamesVisitor, UnreferencedVariablesVisitor
from common.ast_utils import NamesWithAttributesVisitor, UnreferencedNamesVisitor, UnreferencedVariablesVisitor, \
WhereConstraintVisitor
@pytest.mark.parametrize("source, expected", [
@@ -15,9 +16,9 @@ from common.ast_utils import NamesWithAttributesVisitor, UnreferencedNamesVisito
("func(x=a, y=b)", {"func", "a", "b"}),
])
def test_i_can_get_unreferenced_names_from_simple_expressions(context, source, expected):
def test_i_can_get_unreferenced_names_from_simple_expressions(source, expected):
ast_ = ast.parse(source)
visitor = UnreferencedNamesVisitor(context)
visitor = UnreferencedNamesVisitor()
visitor.visit(ast_)
assert visitor.names == expected
@@ -42,9 +43,58 @@ def test_name_with_attribute():
("for i in range(10): pass", set()),
("func(x=a, y=b)", {"a", "b", "x", "y"}),
])
def test_i_can_get_unreferenced_variables_from_simple_expressions(context, source, expected):
def test_i_can_get_unreferenced_variables_from_simple_expressions(source, expected):
ast_ = ast.parse(source)
visitor = UnreferencedVariablesVisitor(context)
visitor = UnreferencedVariablesVisitor()
visitor.visit(ast_)
assert visitor.names == expected
def test_i_can_get_where_constraints():
expr = "a == 2"
ast_tree = ast.parse(expr, f"<user input>", 'eval')
visitor = WhereConstraintVisitor(ast_tree)
res = visitor.get_constraints()
assert res == {"a": [WhereConstraintVisitor.WhereConstraint("a == 2")]}
assert isinstance(res["a"][0].ast_tree, ast.Expression)
def test_i_can_get_where_constraints_when_and():
expr = "a == 2 and isinstance(b, Concept)"
ast_tree = ast.parse(expr, f"<user input>", 'eval')
visitor = WhereConstraintVisitor(ast_tree)
res = visitor.get_constraints()
assert res == {"a": [WhereConstraintVisitor.WhereConstraint("a == 2")],
"b": [WhereConstraintVisitor.WhereConstraint("isinstance(b, Concept)")],
"Concept": [WhereConstraintVisitor.WhereConstraint("isinstance(b, Concept)")]}
assert isinstance(res["a"][0].ast_tree, ast.Expression)
assert isinstance(res["b"][0].ast_tree, ast.Expression)
assert isinstance(res["Concept"][0].ast_tree, ast.Expression)
def test_i_can_get_where_constraint_when_name_is_reference_several_times():
expr = "isinstance(a, int) and a == 2"
ast_tree = ast.parse(expr, f"<user input>", 'eval')
visitor = WhereConstraintVisitor(ast_tree)
res = visitor.get_constraints()
assert res == {"a": [WhereConstraintVisitor.WhereConstraint("isinstance(a, int)"),
WhereConstraintVisitor.WhereConstraint("a == 2")],
"int": [WhereConstraintVisitor.WhereConstraint("isinstance(a, int)")]}
assert isinstance(res["a"][0].ast_tree, ast.Expression)
assert isinstance(res["a"][1].ast_tree, ast.Expression)
assert isinstance(res["int"][0].ast_tree, ast.Expression)
def test_i_cannot_get_constraint_when_or():
expr = "isinstance(a, int) or a == 2"
ast_tree = ast.parse(expr, f"<user input>", 'eval')
visitor = WhereConstraintVisitor(ast_tree)
with pytest.raises(NotImplementedError):
visitor.get_constraints()
+3 -3
View File
@@ -95,10 +95,10 @@ def test_i_cannot_get_an_attribute_which_is_not_defined():
def test_i_can_repr_a_concept():
next_id = GetNextId()
foo = get_concept("foo", sequence=next_id)
assert repr(foo) == "(1001)foo"
assert repr(foo) == "(Concept foo#1001)"
bar = get_concept("bar", pre="is an int", sequence=next_id)
assert repr(bar) == "(1002)bar, #pre=is an int"
assert repr(bar) == "(Concept bar#1002, #pre=is an int)"
baz = get_concept("baz", definition="add a b", variables=["a", "b"], sequence=next_id)
assert repr(baz) == "(1003)baz, a=**NotInit**, b=**NotInit**"
assert repr(baz) == "(Concept baz#1003, a=**NotInit**, b=**NotInit**)"
+20
View File
@@ -1,6 +1,9 @@
from os import path
import pytest
from base import UsingFileBasedSheerka
from core.concept import ConceptDefaultProps
from helpers import get_concept, get_concepts, get_file_content
@@ -34,3 +37,20 @@ class TestSheerka(UsingFileBasedSheerka):
assert not sheerka.isinstance(foo, bar.str_id)
assert not sheerka.isinstance(foo, bar)
assert not sheerka.isinstance(foo, bar.get_metadata())
@pytest.mark.parametrize("obj, expected", [
(None, None),
(1, 1),
])
def test_i_can_get_obj_value(self, sheerka, context, obj, expected):
assert sheerka.objvalue(obj) == expected
def test_i_can_get_obj_value_for_concept(self, sheerka, context):
foo = get_concept("foo")
bar = get_concept("bar")
bar.set_value(ConceptDefaultProps.BODY, 1)
foo.set_value(ConceptDefaultProps.BODY, bar)
foo.get_runtime_info().is_evaluated = True
bar.get_runtime_info().is_evaluated = True
assert sheerka.objvalue(foo) == 1
+1
View File
@@ -40,6 +40,7 @@ class TestPythonEvaluator(BaseTest):
("a=10\na", 10),
])
def test_i_can_evaluate_simple_expression(self, sheerka, context, evaluator, text, expected):
with NewOntology(context, "test_i_can_evaluate_simple_expression"):
start = get_parser_input_from(sheerka, context, text)
ret = evaluator.eval(context, None, start)
assert ret.eaten == [start]
+29 -2
View File
@@ -1,9 +1,17 @@
from common.global_symbols import NotInit
from core.ExecutionContext import ExecutionContext
from core.ReturnValue import ReturnValue
from core.concept import Concept, ConceptMetadata, DefinitionType
from core.concept import Concept, ConceptDefaultProps, ConceptMetadata, DefinitionType
from services.SheerkaConceptManager import ConceptManager
ATTR_MAP = {
"where": ConceptDefaultProps.WHERE,
"pre": ConceptDefaultProps.PRE,
"body": ConceptDefaultProps.BODY,
"post": ConceptDefaultProps.POST,
"ret": ConceptDefaultProps.RET,
}
class GetNextId:
def __init__(self):
@@ -104,6 +112,25 @@ def get_concept(name=None, body=None,
return Concept(metadata)
def get_evaluated_concept(blueprint: Concept | ConceptMetadata, **kwargs):
"""
Returns a concept where value are already initialized
:param blueprint:
:type blueprint:
:param kwargs:
:type kwargs:
:return:
:rtype:
"""
res = Concept(blueprint.get_metadata())
for k, v in kwargs.items():
res.set_value(ATTR_MAP.get(k, k), v)
res.get_runtime_info().is_evaluated = True
return res
def get_metadata(name=None, body=None,
id=None,
key=None,
@@ -345,7 +372,7 @@ def define_new_concept(context: ExecutionContext, c: str | Concept | ConceptMeta
metadata.parameters)
assert retval.status
concept = sheerka.newn(retval.value.metadata.name)
concept = sheerka.newi(retval.value.metadata.id)
return concept
+15 -3
View File
@@ -1,6 +1,11 @@
from base import BaseTest
def get_ret_val(res):
assert len(res) == 1
return res[0]
class TestNonReg1(BaseTest):
def test_i_can_evaluate_python(self, sheerka, user):
@@ -10,9 +15,16 @@ class TestNonReg1(BaseTest):
assert ret_val.status is True
assert ret_val.value == 2
def test_i_can_evaluate_variable_that_is_not_defined(self, sheerka, user):
def test_i_cannot_evaluate_variable_that_is_not_defined(self, sheerka, user):
res = sheerka.evaluate_user_input("a", user)
assert len(res) == 1
ret_val = res[0]
ret_val = get_ret_val(res)
assert ret_val.status is False
def test_i_can_remember_variables(self, sheerka, user):
sheerka.evaluate_user_input("a = 10", user)
res = sheerka.evaluate_user_input("a", user)
ret_val = get_ret_val(res)
assert ret_val.value == 10
+440 -5
View File
@@ -3,9 +3,11 @@ import pytest
from base import BaseTest
from common.global_symbols import NotInit
from conftest import NewOntology
from core.BuiltinConcepts import BuiltinConcepts
from core.concept import ConceptDefaultProps
from core.error import ErrorContext
from core.python_fragment import PythonFragment
from helpers import define_new_concept, get_metadata
from helpers import define_new_concept, get_concept, get_concepts, get_metadata
from services.SheerkaConceptEvaluator import ConceptEvaluator
from services.SheerkaPython import EvaluationRef
@@ -27,7 +29,7 @@ class TestConceptManager(BaseTest):
variables=(("a", "1"), ("b", "NotInit"))
)
compiled = service.build(context, metadata)
compiled = service._build_attributes(context, metadata)
pf = getattr(compiled, ConceptDefaultProps.WHERE)
assert isinstance(pf, PythonFragment)
assert pf.source_code == metadata.where
@@ -59,7 +61,7 @@ class TestConceptManager(BaseTest):
def test_i_can_manage_when_no_source_code(self, context, service):
metadata = get_metadata(name="foo")
compiled = service.build(context, metadata)
compiled = service._build_attributes(context, metadata)
assert getattr(compiled, ConceptDefaultProps.WHERE) is None
assert getattr(compiled, ConceptDefaultProps.PRE) is None
assert getattr(compiled, ConceptDefaultProps.BODY) is None
@@ -72,12 +74,32 @@ class TestConceptManager(BaseTest):
body="one + a",
variables=(("a", "1"), ("b", "NotInit")))
compiled = service.build(context, metadata)
compiled = service._build_attributes(context, metadata)
pf = getattr(compiled, ConceptDefaultProps.BODY)
assert isinstance(pf, PythonFragment)
assert pf.namespace == {"a": EvaluationRef("self", "a"),
"b": EvaluationRef("self", "b")}
def test_i_can_manage_parsing_errors(self, context, service):
metadata = get_metadata(
name="foo",
where="isinstance(a, int)", # ok
body="one + ", # not ok
variables=("a",)) # ok
compiled = service._build_attributes(context, metadata)
pf = getattr(compiled, ConceptDefaultProps.WHERE)
assert isinstance(pf, PythonFragment)
assert pf.source_code == metadata.where
pf = getattr(compiled, "a")
assert isinstance(pf, PythonFragment)
assert pf.source_code == "NotInit"
error = getattr(compiled, ConceptDefaultProps.BODY)
assert isinstance(error, ErrorContext)
def test_i_can_eval_concept_attributes(self, context, service):
with NewOntology(context, "test_i_can_eval_concept_attributes"):
foo_metadata = get_metadata(name="foo",
@@ -86,7 +108,7 @@ class TestConceptManager(BaseTest):
body="2 + a",
post="'post parameter'",
ret="self",
variables=(("a", "1"), ("b", "NotInit")))
variables=(("a", "1"), ("b", "NotInit"), ("c", "1 > 2")))
foo = define_new_concept(context, foo_metadata)
res = service.evaluate_concept(context, foo)
@@ -94,8 +116,421 @@ class TestConceptManager(BaseTest):
assert context.sheerka.isinstance(res, foo)
assert res.get_value("a") == 1
assert res.get_value("b") == NotInit
assert res.get_value("c") is False
assert res.get_value(ConceptDefaultProps.WHERE) is True
assert res.get_value(ConceptDefaultProps.PRE) is True
assert res.get_value(ConceptDefaultProps.BODY) == 3
assert res.get_value(ConceptDefaultProps.POST) == "post parameter"
assert res.get_value(ConceptDefaultProps.RET) == res
assert res.get_runtime_info().is_evaluated is True
assert res.get_runtime_info().error is None
assert ConceptDefaultProps.PRE in res.get_runtime_info().info
assert ConceptDefaultProps.WHERE in res.get_runtime_info().info
def test_i_can_can_evaluate_simple_concept(self, context, service):
with NewOntology(context, "test_i_can_eval_concept_attributes"):
foo_metadata = get_metadata(name="foo",
body="1")
foo = define_new_concept(context, foo_metadata)
res = service.evaluate_concept(context, foo)
assert context.sheerka.isinstance(res, foo)
assert res.body == 1
assert res.get_runtime_info().is_evaluated is True
assert res.get_runtime_info().error is None
def test_i_cannot_evaluate_an_invalid_concept(self, context, service):
with NewOntology(context, "test_i_cannot_evaluate_an_invalid_concept"):
foo_metadata = get_metadata(name="foo",
where="isinstance(a, int", # ok
body="one + ", # not ok
variables=("a",))
foo = define_new_concept(context, foo_metadata)
res = service.evaluate_concept(context, foo)
assert context.sheerka.isinstance(res, BuiltinConcepts.INVALID_CONCEPT)
assert ConceptDefaultProps.BODY in res.get_value("reason")
assert ConceptDefaultProps.WHERE in res.get_value("reason")
assert foo.get_runtime_info().is_evaluated is True
assert foo.get_runtime_info().error == res
def test_i_can_manage_runtime_errors(self, context, service):
with NewOntology(context, "test_i_can_manage_runtime_errors"):
foo_metadata = get_metadata(name="foo",
where="isinstance(a, int)", # ok
body="one + a", # not ok
variables=(("a", "1"),))
foo = define_new_concept(context, foo_metadata)
res = service.evaluate_concept(context, foo)
assert context.sheerka.isinstance(res, BuiltinConcepts.EVALUATION_ERROR)
assert ConceptDefaultProps.BODY in res.get_value("reason")
assert foo.get_runtime_info().is_evaluated is True
assert ConceptDefaultProps.BODY in foo.get_runtime_info().error
assert foo.get_runtime_info().error["#body#"].get_error_msg() == "name 'one' is not defined"
def test_i_do_not_override_initialisation_values_when_i_evaluate(self, context, service):
with NewOntology(context, "test_i_do_not_override_initialisation_values_when_i_evaluate"):
foo_metadata = get_metadata(name="foo",
variables=(("a", "1"), ("b", "'hello world'")))
define_new_concept(context, foo_metadata)
foo_instance = context.sheerka.new("foo", a=42, b="my value")
assert foo_instance.a == 42
assert foo_instance.b == "my value"
foo_evaluated = service.evaluate_concept(context, foo_instance)
assert foo_evaluated.a == 42
assert foo_evaluated.b == "my value"
def test_i_can_reference_other_concept(self, context, service):
with NewOntology(context, "test_i_can_reference_other_concept"):
foo, bar = get_concepts(context, "foo", get_concept("bar", body="foo"), use_sheerka=True)
res = service.evaluate_concept(context, bar)
assert context.sheerka.isinstance(res, bar)
assert bar.body == foo
def test_i_can_reference_other_concept_with_body(self, context, service):
with NewOntology(context, "test_i_can_reference_other_concept_with_body"):
foo, bar = get_concepts(context,
get_concept("foo", body="1"),
get_concept("bar", body="foo"),
use_sheerka=True)
res = service.evaluate_concept(context, bar)
assert context.sheerka.isinstance(res, bar)
assert context.sheerka.isinstance(bar.body, foo)
assert bar.body.body == 1
def test_i_can_eval_concept_of_concept(self, context, service):
with NewOntology(context, "test_i_can_eval_concept_of_concept"):
foo, bar, baz, qux = get_concepts(context,
get_concept("foo", body="1"),
get_concept("bar", body="foo"),
get_concept("baz", body="bar"),
get_concept("qux", body="baz"),
use_sheerka=True)
res = service.evaluate_concept(context, qux)
assert context.sheerka.isinstance(res, qux)
assert context.sheerka.isinstance(qux.body, baz)
assert context.sheerka.isinstance(qux.body.body, bar)
assert context.sheerka.isinstance(qux.body.body.body, foo)
assert context.sheerka.objvalue(qux) == 1
def test_concept_variables_precede_global_concepts(self, context, service):
with NewOntology(context, "test_concept_variables_precede_global_concepts"):
foo, bar, baz = get_concepts(context,
get_concept("foo"),
get_concept("bar"),
get_concept("baz", body="foo", variables=(("foo", "bar"),)),
use_sheerka=True)
res = service.evaluate_concept(context, baz)
assert context.sheerka.isinstance(res, baz)
assert context.sheerka.isinstance(res.body, bar)
def test_i_can_evaluate_concept_when_variables_reference_others_concepts_with_body(self, context, service):
with NewOntology(context, "test_i_can_evaluate_concept_when_variables_reference_others_concepts_with_body"):
foo, bar, baz = get_concepts(context,
get_concept("foo", body="1"),
get_concept("bar", body="2"),
get_concept("baz", body="a + b", variables=(("a", "foo"), ("b", "bar"))),
use_sheerka=True)
res = service.evaluate_concept(context, baz)
assert context.sheerka.isinstance(res, baz)
assert res.body == 3
def test_i_can_evaluate_concept_when_variable_is_a_concept_token(self, context, service):
with NewOntology(context, "test_i_can_evaluate_concept_when_variable_is_a_concept_token"):
foo, bar = get_concepts(context, "foo", get_concept("bar", body="c:foo:"), use_sheerka=True)
res = service.evaluate_concept(context, bar)
assert context.sheerka.isinstance(res, bar)
assert bar.body == foo
def test_i_can_evaluate_when_concept_attribute_is_referenced(self, context, service):
with NewOntology(context, "test_i_can_evaluate_when_concept_attribute_is_referenced"):
foo, bar = get_concepts(context,
get_concept("foo", variables=(("var", "'I am var'"),)),
get_concept("bar", body="foo.var"),
use_sheerka=True)
res = service.evaluate_concept(context, bar)
assert context.sheerka.isinstance(res, bar)
assert bar.body == 'I am var'
@pytest.mark.skip("I cannot parse complex concept")
def test_i_can_evaluate_when_body_is_a_complex_concept(self, context, service):
with NewOntology(context, "test_i_can_evaluate_concept_when_variable_is_a_concept_token"):
add, plus = get_concepts(context,
get_concept("add", body="a plus b", variables=("a", "b")),
get_concept("plus", body="x + y", variables=("x", "y")),
use_sheerka=True)
add_instance = context.sheerka.new(add, a=1, b=2)
add_instance.get_runtime_info().is_evaluated = False # little hack, before
res = service.evaluate_concept(context, add_instance)
assert context.sheerka.isinstance(res, add)
assert context.sheerka.isinstance(add_instance.body, plus)
assert context.sheerka.objvalue(add_instance) == 3
@pytest.mark.skip("I cannot parse complex concept")
def test_i_can_evaluate_when_body_is_a_complex_concept_and_same_variables_names(self, context, service):
with NewOntology(context, "test_i_can_evaluate_when_body_is_a_complex_concept_and_same_variables_names"):
add, plus = get_concepts(context,
get_concept("add", body="a plus b", variables=("a", "b")),
get_concept("plus", body="a + b", variables=("a", "b")),
use_sheerka=True)
add_instance = context.sheerka.new(add, a=1, b=2)
res = service.evaluate_concept(context, add_instance)
assert context.sheerka.isinstance(res, add)
assert context.sheerka.isinstance(add_instance.body, plus)
assert context.sheerka.objvalue(add_instance) == 3
def test_body_is_not_evaluated_if_where_clause_failed(self, context, service):
foo = get_concept("foo", body="'hello world'", where="False")
res = service.evaluate_concept(context, foo)
assert context.sheerka.isinstance(res, BuiltinConcepts.EVALUATION_ERROR)
assert res.concept.get_value(ConceptDefaultProps.WHERE) is False
assert ConceptDefaultProps.WHERE in res.concept.get_runtime_info().error
assert res.concept.body is NotInit
def test_body_is_not_evaluated_if_pre_clause_failed(self, context, service):
foo = get_concept("foo", body="'hello world'", pre="False")
res = service.evaluate_concept(context, foo)
assert context.sheerka.isinstance(res, BuiltinConcepts.EVALUATION_ERROR)
assert res.concept.get_value(ConceptDefaultProps.PRE) is False
assert ConceptDefaultProps.PRE in res.concept.get_runtime_info().error
assert res.concept.body is NotInit
def test_i_can_evaluate_when_variable_returns_multiple_choices(self, context, service):
with NewOntology(context, "test_i_can_evaluate_when_variable_returns_multiple_choices"):
one_int, one_str, inc = get_concepts(context,
get_concept("one", body="'one'"),
get_concept("one", body="1"),
get_concept("inc", body="a + 2", variables=(("a", "one"),)),
use_sheerka=True)
res = service.evaluate_concept(context, inc)
assert context.sheerka.isinstance(res, inc)
assert res.body == 3
def test_i_can_evaluate_when_variables_refer_to_complex_concept_synonyms(self, context, service):
with NewOntology(context, "test_i_can_evaluate_when_variables_refer_to_complex_concept_synonyms"):
one_int, one_str, inc = get_concepts(context,
get_concept("one", body="'one'"),
get_concept("one", body="1"),
get_concept("inc", body="a + 2", variables=(("a", "one + 1"),)),
use_sheerka=True)
res = service.evaluate_concept(context, inc)
assert context.sheerka.isinstance(res, inc)
assert res.body == 4
def test_i_can_use_where_clause_on_attr_value(self, context, service):
with NewOntology(context, "test_i_can_use_where_clause_on_attr_value"):
one_2, one_1, foo = get_concepts(context,
get_concept("one", body="2"),
get_concept("one", body="1"),
get_concept("foo",
body="a + 2",
where="a == 1",
variables=(("a", "one"),)),
use_sheerka=True)
res = service.evaluate_concept(context, foo)
assert context.sheerka.isinstance(res, foo)
assert res.body == 3
def test_i_can_use_where_clause_on_complex_attr_value(self, context, service):
with NewOntology(context, "test_i_can_use_where_clause_on_complex_attr_value"):
one_2, one_1, foo = get_concepts(context,
get_concept("one", body="2"),
get_concept("one", body="1"),
get_concept("foo",
body="a + 2",
where="a == 2",
variables=(("a", "one + 1"),)),
use_sheerka=True)
res = service.evaluate_concept(context, foo)
assert context.sheerka.isinstance(res, foo)
assert res.body == 4
def test_i_can_use_complex_where_clause(self, context, service):
# a 'where' clause with an 'and'
with NewOntology(context, "test_i_can_use_multiple_where_clause"):
one_2, one_1, two_2, two_1, foo = get_concepts(context,
get_concept("one", body="2"),
get_concept("one", body="1"),
get_concept("two", body="2"),
get_concept("two", body="1"),
get_concept("foo",
body="a + b",
where="a == 1 and b == 2",
variables=(("a", "one"), ("b", "two"))),
use_sheerka=True)
res = service.evaluate_concept(context, foo)
assert context.sheerka.isinstance(res, foo)
assert res.body == 3
assert ConceptDefaultProps.WHERE in res.get_runtime_info().info
def test_where_clause_can_use_other_concept(self, context, service):
with NewOntology(context, "test_where_clause_can_use_other_concept"):
foo1, foo2, true, false = get_concepts(context,
get_concept("foo1", where="true", body="'evaluated !'"),
get_concept("foo2", where="false", body="'not evaluated !'"),
get_concept("true", body="True"),
get_concept("false", body="False"),
use_sheerka=True)
res = service.evaluate_concept(context, foo1)
assert context.sheerka.isinstance(res, foo1)
assert res.body == "evaluated !"
res = service.evaluate_concept(context, foo2)
assert context.sheerka.isinstance(res, BuiltinConcepts.EVALUATION_ERROR)
assert res.concept.body is NotInit
def test_i_can_detect_when_where_clause_failed_on_an_attribute(self, context, service):
with NewOntology(context, "test_i_can_detect_when_where_clause_failed_on_an_attribute"):
foo, one = get_concepts(context,
get_concept("foo", body="a + 1", where="a == 3", variables=(("a", "one"),)),
get_concept("one", body="1"),
use_sheerka=True)
res = service.evaluate_concept(context, foo)
assert context.sheerka.isinstance(res, BuiltinConcepts.EVALUATION_ERROR)
assert context.sheerka.isinstance(res.concept, foo)
assert isinstance(res.reason, dict) and "a" in res.reason
assert foo.get_runtime_info().is_evaluated
def test_i_can_use_where_constraint_on_multiple_levels(self, context, service):
with NewOntology(context, "test_i_can_use_where_constraint_on_multiple_levels"):
foo, bar, baz = get_concepts(context,
get_concept("foo", body="True"),
get_concept("bar", body="foo"),
get_concept("baz", where="bar"),
use_sheerka=True)
res = service.evaluate_concept(context, baz)
assert context.sheerka.isinstance(res, "baz")
assert ConceptDefaultProps.WHERE in res.get_runtime_info().info
def test_i_can_use_where_constraint_on_multiple_levels_and_fail(self, context, service):
with NewOntology(context, "test_i_can_use_where_constraint_on_multiple_levels_and_fail"):
foo, bar, baz = get_concepts(context,
get_concept("foo", body="False"),
get_concept("bar", body="foo"),
get_concept("baz", where="bar"),
use_sheerka=True)
res = service.evaluate_concept(context, baz)
assert context.sheerka.isinstance(res, BuiltinConcepts.EVALUATION_ERROR)
assert ConceptDefaultProps.WHERE in res.concept.get_runtime_info().error
def test_i_can_detect_infinite_loop(self, context, service):
with NewOntology(context, "test_i_can_detect_infinite_loop"):
foo, bar, baz = get_concepts(context,
get_concept("foo", body="bar"),
get_concept("bar", body="baz"),
get_concept("baz", body="foo"),
use_sheerka=True)
res = service.evaluate_concept(context, foo)
assert context.sheerka.isinstance(res, BuiltinConcepts.EVALUATION_ERROR)
assert context.sheerka.isinstance(res.concept, foo)
assert res.reason.ids == [foo.id, bar.id, baz.id]
def test_i_can_detect_sub_infinite_loop(self, context, service):
with NewOntology(context, "test_i_can_detect_sub_infinite_loop"):
foo, bar, baz = get_concepts(context,
get_concept("foo", body="bar"),
get_concept("bar", body="baz"),
get_concept("baz", body="bar"),
use_sheerka=True)
res = service.evaluate_concept(context, foo)
assert context.sheerka.isinstance(res, BuiltinConcepts.EVALUATION_ERROR)
assert context.sheerka.isinstance(res.concept, bar)
assert res.reason.ids == [bar.id, baz.id]
def test_i_can_detect_auto_infinite_loop(self, context, service):
with NewOntology(context, "test_i_can_detect_auto_infinite_loop"):
foo, = get_concepts(context,
get_concept("foo", body="foo"),
use_sheerka=True)
res = service.evaluate_concept(context, foo)
assert context.sheerka.isinstance(res, BuiltinConcepts.EVALUATION_ERROR)
assert context.sheerka.isinstance(res.concept, foo)
assert res.reason.ids == [foo.id]
def test_i_can_select_the_valid_result_when_multiple_choice_invalid_concept(self, context, service):
with NewOntology(context, "test_i_can_select_the_valid_result_when_multiple_choice"):
foo, two_ok, two_nok = get_concepts(context,
get_concept("foo", body="two"),
get_concept("two", body="1 +"), # has to come before the other 'two'
get_concept("two", body="1 + 1"),
use_sheerka=True)
res = service.evaluate_concept(context, foo)
assert context.sheerka.isinstance(res, foo)
assert context.sheerka.objvalue(foo) == 2
def test_i_can_select_the_valid_result_when_multiple_choice_evaluation_error(self, context, service):
with NewOntology(context, "test_i_can_select_the_valid_result_when_multiple_choice"):
foo, two_ok, two_nok = get_concepts(context,
get_concept("foo", body="two"),
get_concept("two", body="1 / 0"), # has to come before the other 'two'
get_concept("two", body="1 + 1"),
use_sheerka=True)
res = service.evaluate_concept(context, foo)
assert context.sheerka.isinstance(res, foo)
assert context.sheerka.objvalue(foo) == 2
def test_i_can_eval_when_a_return_value_is_defined(self, context, service):
with NewOntology(context, "test_i_can_eval_when_a_return_value_is_defined"):
foo, bar, baz = get_concepts(context,
get_concept("foo"),
get_concept("bar"),
get_concept("baz", body="foo", ret="bar"),
use_sheerka=True)
res = service.evaluate_concept(context, baz)
assert context.sheerka.isinstance(res, bar)
assert context.sheerka.isinstance(baz.body, foo)
def test_i_do_not_use_ret_in_case_of_error(self, context, service):
with NewOntology(context, "test_i_do_not_use_ret_in_case_of_error"):
foo, baz = get_concepts(context,
get_concept("foo"),
get_concept("baz", body="foo", ret="bar"),
use_sheerka=True)
res = service.evaluate_concept(context, baz)
assert context.sheerka.isinstance(res, BuiltinConcepts.EVALUATION_ERROR)
+11 -1
View File
@@ -6,8 +6,8 @@ from conftest import NewOntology
from core.BuiltinConcepts import BuiltinConcepts
from core.concept import ConceptMetadata
from core.error import ErrorContext
from helpers import get_concepts, get_metadata
from services.SheerkaConceptManager import ConceptAlreadyDefined, ConceptManager
from helpers import get_metadata
class TestConceptManager(BaseTest):
@@ -245,6 +245,16 @@ class TestConceptManager(BaseTest):
assert service.new((metadata.name, metadata.id)).id == metadata.id
def test_i_can_instantiate_a_list_of_concepts(self, context, service):
with NewOntology(context, "test_i_can_instantiate_a_list_of_concepts"):
foo, bar = get_concepts(context, "foo", "bar", use_sheerka=True)
res = service.new([foo.get_metadata(), bar.get_metadata()])
assert len(res) == 2
assert context.sheerka.isinstance(res[0], foo)
assert context.sheerka.isinstance(res[1], bar)
def test_unknown_concept_is_return_if_the_identifier_is_not_found(self, service):
assert service.new("unknown").name == BuiltinConcepts.UNKNOWN_CONCEPT
+7 -61
View File
@@ -2,74 +2,20 @@ import pytest
from base import BaseTest, DummyObj
from caching.FastCache import FastCache
from core.ExecutionContext import ContextActions
from common.global_symbols import NotFound
from core.Event import Event
from core.ExecutionContext import ContextActions, ExecutionContext
from services.SheerkaMemory import SheerkaMemory
class TestSheerkaEngine(BaseTest):
@pytest.fixture()
def service(self, sheerka):
return SheerkaMemory(sheerka) # I want a new instance to keep Sheerka clean (when I update stm)
return sheerka.services[SheerkaMemory.NAME]
def test_i_can_add_to_global_short_term_memory(self, service):
dummy = DummyObj()
service.add_to_short_term_memory(None, "a", dummy)
service.add_to_short_term_memory("a", dummy)
assert service.short_term_objects.copy() == {'global': {'a': dummy}}
def test_i_can_add_and_get_stm_data(self, context, service):
sub_context = context.push("TestSheerkaEngine", ContextActions.TESTING, None)
service.add_to_short_term_memory(None, "a", "global level")
service.add_to_short_term_memory(context, "a", "context level")
service.add_to_short_term_memory(sub_context, "a", "sub context level")
assert service.get_from_short_term_memory(sub_context, "a") == "sub context level"
assert service.get_from_short_term_memory(context, "a") == "context level"
assert service.get_from_short_term_memory(None, "a") == "global level"
def test_i_can_list_stm_data(self, context, service):
sub_context = context.push("TestSheerkaEngine", ContextActions.TESTING, None)
service.add_to_short_term_memory(None, "a", "global a")
service.add_to_short_term_memory(None, "b", "global b")
service.add_to_short_term_memory(context, "a", "context a")
service.add_to_short_term_memory(context, "c", "context c")
service.add_to_short_term_memory(sub_context, "d", "sub context d")
service.add_to_short_term_memory(sub_context, "a", "sub context a")
assert service.list_short_term_memory(sub_context) == {"a": "sub context a",
"b": "global b",
"c": "context c",
"d": "sub context d"}
assert service.list_short_term_memory(context) == {"a": "context a",
"b": "global b",
"c": "context c"}
assert service.list_short_term_memory(None) == {"a": "global a",
"b": "global b"}
def test_i_can_list_stm_data_when_context_have_no_entry(self, context, service):
sub_context = context.push("TestSheerkaEngine", ContextActions.TESTING, None)
service.add_to_short_term_memory(sub_context, "d", "sub context d")
service.add_to_short_term_memory(sub_context, "a", "sub context a")
assert service.list_short_term_memory(sub_context) == {"a": "sub context a", "d": "sub context d"}
assert service.list_short_term_memory(context) == {}
assert service.list_short_term_memory(None) == {}
def test_i_value_are_removed_when_cache_is_full(self, context, service):
service.short_term_objects = FastCache(3)
context1 = context.push("TestSheerkaEngine", ContextActions.TESTING, None)
context2 = context.push("TestSheerkaEngine", ContextActions.TESTING, None)
context3 = context.push("TestSheerkaEngine", ContextActions.TESTING, None)
service.add_to_short_term_memory(context, "a", "context")
service.add_to_short_term_memory(context1, "b", "context 1")
service.add_to_short_term_memory(context2, "c", "context 2")
assert context.id in service.short_term_objects
service.add_to_short_term_memory(context3, "d", "context 3")
assert context.id not in service.short_term_objects
assert service.get_from_short_term_memory("a") == dummy
assert service.sheerka.om.current_ontology().fast_cache.copy() == {'a': dummy}
+236 -23
View File
@@ -4,10 +4,14 @@ from base import BaseTest, DummyObj
from common.global_symbols import NoFirstToken, NotFound, NotInit, Removed
from conftest import NewOntology
from core.BuiltinConcepts import BuiltinConcepts
from core.ExecutionContext import ContextActions
from core.concept import ConceptDefaultProps
from core.error import MethodAccessError
from evaluators.PythonParser import PythonParser
from helpers import _rv, define_new_concept, get_concepts, get_metadata
from helpers import _rv, define_new_concept, get_concepts, get_evaluated_concept, get_metadata
from parsers.ParserInput import ParserInput
from services.SheerkaPython import EvaluationRef, SheerkaPython
from parsers.tokenizer import Token, TokenKind
from services.SheerkaPython import EvalMethod, EvaluationContext, EvaluationRef, Expando, MultipleResults, SheerkaPython
def get_python_fragment(sheerka, context, command):
@@ -34,52 +38,53 @@ class TestSheerkaPython(BaseTest):
("NoFirstToken", NoFirstToken),
])
def test_i_can_evaluate_simple_expression(self, sheerka, context, service, text, expected):
with NewOntology(context, "test_i_can_evaluate_simple_expression"):
python_fragment = get_python_fragment(sheerka, context, text)
ret = service.evaluate_python(context, python_fragment)
ret = service.evaluate_python(context, EvaluationContext(), python_fragment)
assert ret == expected
def test_i_can_eval_isinstance_for_type(self, sheerka, context, service):
def test_i_can_eval_isinstance_with_a_type(self, sheerka, context, service):
python_fragment = get_python_fragment(sheerka, context, "isinstance('some string', str)")
ret = service.evaluate_python(context, python_fragment)
ret = service.evaluate_python(context, EvaluationContext(), python_fragment)
assert ret is True
def test_i_can_eval_isinstance_for_concept(self, sheerka, context, service):
def test_i_can_eval_isinstance_with_a_concept(self, sheerka, context, service):
with NewOntology(context, "test_i_can_eval_isinstance_for_concept"):
get_concepts(context, "foo", use_sheerka=True)
python_fragment = get_python_fragment(sheerka, context, "isinstance(foo, 'foo')")
ret = service.evaluate_python(context, python_fragment)
ret = service.evaluate_python(context, EvaluationContext(), python_fragment)
assert ret is True
# 'foo' is also a Concept
python_fragment = get_python_fragment(sheerka, context, "isinstance(foo, Concept)")
ret = service.evaluate_python(context, python_fragment)
ret = service.evaluate_python(context, EvaluationContext(), python_fragment)
assert ret is True
def test_i_can_use_value_from_global_namespace(self, sheerka, context, service):
python_fragment = get_python_fragment(sheerka, context, "self.a")
ret = service.evaluate_python(context, python_fragment, {"self": DummyObj("my dummy value")})
ret = service.evaluate_python(context, EvaluationContext(), python_fragment,
{"self": DummyObj("my dummy value")})
assert ret == "my dummy value"
def test_i_can_eval_using_eval_ref(self, sheerka, context, service):
python_fragment = get_python_fragment(sheerka, context, "a")
python_fragment.namespace = {"a": EvaluationRef("self", "a")}
ret = service.evaluate_python(context, python_fragment, {"self": DummyObj("my dummy value")})
ret = service.evaluate_python(context, EvaluationContext(), python_fragment,
{"self": DummyObj("my dummy value")})
assert ret == "my dummy value"
@pytest.mark.skip("Concept evaluation is not implemented")
def test_i_can_eval_concept_properties(self, sheerka, context, service):
with NewOntology(context, "test_i_can_eval_concept_properties"):
foo_meta = get_metadata("foo", variables=[("a", "hello world")])
foo_meta = get_metadata("foo", variables=[("a", "'hello world'")])
define_new_concept(context, foo_meta)
python_fragment = get_python_fragment(sheerka, context, "foo.a")
ret = service.evaluate_python(context, python_fragment)
ret = service.evaluate_python(context, EvaluationContext(), python_fragment)
assert ret == "hello world"
@pytest.mark.skip("Concept evaluation is not implemented")
def test_i_can_eval_python_mixed_with_concept(self, sheerka, context, service):
with NewOntology(context, "test_i_can_eval_python_mixed_with_concept"):
foo_meta = get_metadata("foo", variables=[("a", "1")])
@@ -87,41 +92,249 @@ class TestSheerkaPython(BaseTest):
get_concepts(context, foo_meta, bar_meta, use_sheerka=True)
python_fragment = get_python_fragment(sheerka, context, "bar + foo.a")
ret = service.evaluate_python(context, python_fragment)
assert ret == "3"
ret = service.evaluate_python(context, EvaluationContext(), python_fragment)
assert ret == 3
def test_i_can_remember_previous_results(self, sheerka, context, service):
python_fragment = get_python_fragment(sheerka, context, "a=10")
ret = service.evaluate_python(context, python_fragment)
ret = service.evaluate_python(context, EvaluationContext(), python_fragment)
assert ret is None
python_fragment = get_python_fragment(sheerka, context, "a")
ret = service.evaluate_python(context, python_fragment)
ret = service.evaluate_python(context, EvaluationContext(), python_fragment)
assert ret == 10
def test_i_can_import_module(self, sheerka, context, service):
python_fragment = get_python_fragment(sheerka, context, "import math")
ret = service.evaluate_python(context, python_fragment)
ret = service.evaluate_python(context, EvaluationContext(), python_fragment)
assert ret is None
python_fragment = get_python_fragment(sheerka, context, "math.sqrt(4)")
ret = service.evaluate_python(context, python_fragment)
ret = service.evaluate_python(context, EvaluationContext(), python_fragment)
assert ret == 2
def test_i_can_import_function_from_module(self, sheerka, context, service):
python_fragment = get_python_fragment(sheerka, context, "from math import sqrt")
ret = service.evaluate_python(context, python_fragment)
ret = service.evaluate_python(context, EvaluationContext(), python_fragment)
assert ret is None
python_fragment = get_python_fragment(sheerka, context, "sqrt(4)")
ret = service.evaluate_python(context, python_fragment)
ret = service.evaluate_python(context, EvaluationContext(), python_fragment)
assert ret == 2
def test_i_can_eval_when_context_is_needed(self, sheerka, context, service):
with NewOntology(context, "test_i_can_eval_when_context_is_needed"):
python_fragment = get_python_fragment(sheerka, context, "define_new_concept('foo')")
ret = service.evaluate_python(context, python_fragment)
ret = service.evaluate_python(context, EvaluationContext(), python_fragment)
assert sheerka.isinstance(ret.value, BuiltinConcepts.NEW_CONCEPT)
# for info, there are two level of value
# one for PythonEvaluator return value
# one for the ConceptManager return value
def test_i_can_return_multiple_results(self, sheerka, context, service):
with NewOntology(context, "test_i_can_return_multiple_values"):
foo_1, foo_2, foo_3 = get_concepts(context,
get_metadata("foo", body="'foo'"),
get_metadata("foo", body="True"),
get_metadata("foo", body="'bar'"),
use_sheerka=True)
python_fragment = get_python_fragment(sheerka, context, "foo")
evaluation_context = EvaluationContext(eval_method=EvalMethod.All)
ret = service.evaluate_python(context, evaluation_context, python_fragment)
assert ret == MultipleResults(get_evaluated_concept(foo_1, body='foo'),
"foo",
get_evaluated_concept(foo_2, body=True),
True,
get_evaluated_concept(foo_3, body='bar'),
"bar")
def test_i_can_eval_when_multiple_concepts(self, sheerka, context, service):
with NewOntology(context, "test_i_can_eval_when_multiple_concepts"):
get_concepts(context,
get_metadata("one", body="'one'"),
get_metadata("one", body="1"),
use_sheerka=True)
python_fragment = get_python_fragment(sheerka, context, "one + 1")
ret = service.evaluate_python(context, EvaluationContext(), python_fragment)
assert ret == 2
def test_i_can_eval_until_a_successful_result_is_found(self, sheerka, context, service):
with NewOntology(context, "test_i_can_eval_when_multiple_concepts"):
get_concepts(context,
get_metadata("one", body="'one'"),
get_metadata("one", body="1"),
get_metadata("one", body="2"),
use_sheerka=True)
python_fragment = get_python_fragment(sheerka, context, "one + 1")
service.evaluate_python(context, EvaluationContext(), python_fragment)
# check the number of evaluated namespaces
# there are 3 concepts, so there must be 6 results
# But we stop after the second 'one' concept
ec = next(filter(lambda _ec: _ec.action == ContextActions.EVALUATING_PYTHON, context.get_children()))
assert len(ec.values["all_results"]) == 4
def test_i_can_eval_all_namespaces(self, sheerka, context, service):
with NewOntology(context, "test_i_can_return_multiple_values"):
foo_1, foo_2, foo_3 = get_concepts(context,
get_metadata("foo", body="'foo'"),
get_metadata("foo", body="True"),
get_metadata("foo", body="'bar'"),
use_sheerka=True)
python_fragment = get_python_fragment(sheerka, context, "foo")
evaluation_context = EvaluationContext(eval_method=EvalMethod.All)
service.evaluate_python(context, evaluation_context, python_fragment)
# check the number of evaluated namespaces
ec = next(filter(lambda _ec: _ec.action == ContextActions.EVALUATING_PYTHON, context.get_children()))
assert len(ec.values["all_results"]) == 6
def test_i_can_eval_until_true(self, sheerka, context, service):
with NewOntology(context, "test_i_can_return_multiple_values"):
get_concepts(context,
get_metadata("foo", body="False"),
get_metadata("foo", body="True"),
get_metadata("foo", body="'bar'"),
use_sheerka=True)
python_fragment = get_python_fragment(sheerka, context, "foo")
evaluation_context = EvaluationContext(eval_method=EvalMethod.UntilTrue)
res = service.evaluate_python(context, evaluation_context, python_fragment)
assert res is True
# check the number of evaluated namespaces
# We stop after the second 'one' concept, so only 4 results
ec = next(filter(lambda _ec: _ec.action == ContextActions.EVALUATING_PYTHON, context.get_children()))
assert len(ec.values["all_results"]) == 4
def test_eval_until_success_return_false_if_true_is_not_found(self, sheerka, context, service):
with NewOntology(context, "test_i_can_return_multiple_values"):
get_concepts(context,
get_metadata("foo", body="False"),
get_metadata("foo", body="'bar'"),
use_sheerka=True)
python_fragment = get_python_fragment(sheerka, context, "foo")
evaluation_context = EvaluationContext(eval_method=EvalMethod.UntilTrue)
ret = service.evaluate_python(context, evaluation_context, python_fragment)
assert ret is False
def test_i_can_return_empty_list(self, sheerka, context, service):
python_fragment = get_python_fragment(sheerka, context, "[]")
ret = service.evaluate_python(context, EvaluationContext(), python_fragment)
assert ret == []
def test_can_create_namespaces(self, context, service):
with NewOntology(context, "test_i_can_eval_when_context_is_needed"):
namespace = service.create_namespace(context, ['in_context'], None, {}, {}, False)
assert namespace == {"in_context": context.in_context}
namespace = service.create_namespace(context, ['isinstance'], None, {}, {}, False)
assert namespace == {"isinstance": context.sheerka.extended_isinstance}
namespace = service.create_namespace(context, ['print'], None, {}, {}, False)
assert namespace == {"print": print}
namespace = service.create_namespace(context, ['print'], None, {}, {}, True)
assert namespace == {} # print method has side effect, so it's excluded
# ###################
# sheerka expando
# ###################
namespace = service.create_namespace(context, ['sheerka'], set(), {}, {}, False)
assert isinstance(namespace["sheerka"], Expando)
assert len(vars(namespace["sheerka"])) == 1 # 'expando_name' only
namespace = service.create_namespace(context, ['sheerka'], {"new", "echo"}, {}, {}, False)
assert isinstance(namespace["sheerka"], Expando)
assert len(vars(namespace["sheerka"])) == 3 # 'expando_name' + new() + echo()
with pytest.raises(MethodAccessError):
# new method is not allowed if expression_only is True
service.create_namespace(context, ['sheerka'], {"new", "echo"}, {}, {}, True)
# ###################
# sheerka methods
# ###################
namespace = service.create_namespace(context, ['new'], None, {}, {}, False)
assert namespace == {"new": context.sheerka.new} # Sheerka methods are not set
with pytest.raises(MethodAccessError):
# new method is not allowed if expression_only is True
service.create_namespace(context, ['new'], None, {}, {}, True)
# ###################
context.sheerka.add_to_short_term_memory("key", "short term memory value")
namespace = service.create_namespace(context, ['key'], None, {}, {}, False)
assert namespace == {"key": "short term memory value"}
foo = define_new_concept(context, get_metadata("foo", body="1"))
foo_token = Token(TokenKind.CONCEPT, ("foo", None), 0, 1, 1)
namespace = service.create_namespace(context, ['foo'], None, {"foo": foo_token}, {}, False)
assert context.sheerka.isinstance(namespace["foo"], foo)
assert namespace["foo"].body == 1 # local namespace are evaluated
namespace = service.create_namespace(context, ['foo'], None, {}, {"foo": foo}, False)
assert context.sheerka.isinstance(namespace["foo"], foo)
assert namespace["foo"].body is NotInit # global namespace are used as is
namespace = service.create_namespace(context, ['foo'], None, {}, {}, False)
assert context.sheerka.isinstance(namespace["foo"], foo)
assert namespace["foo"].body == 1 # concept instantiation are evaluated
def test_i_can_manage_concept_synonyms(self, context, service):
foo, bar, baz = get_concepts(context, "foo", "bar", "baz", use_sheerka=False)
# foo, bar and baz are supposed to be concept synonyms
namespace = {"a": "value a",
"foo": MultipleResults(foo, bar, baz),
"b": "value b",
"bar": MultipleResults(baz, bar)}
res = service.manage_multiple_choices(namespace)
assert len(res) == 6
assert res[0] == {"a": "value a", "b": "value b", "foo": foo, "bar": baz}
assert res[1] == {"a": "value a", "b": "value b", "foo": foo, "bar": bar}
assert res[2] == {"a": "value a", "b": "value b", "foo": bar, "bar": baz}
assert res[3] == {"a": "value a", "b": "value b", "foo": bar, "bar": bar}
assert res[4] == {"a": "value a", "b": "value b", "foo": baz, "bar": baz}
assert res[5] == {"a": "value a", "b": "value b", "foo": baz, "bar": bar}
def test_i_can_manage_namespaces_when_concepts_have_values(self, context, service):
foo, bar, baz = get_concepts(context, "foo", "bar", "baz", use_sheerka=False)
foo.set_value(ConceptDefaultProps.BODY, "foo value")
foo.get_runtime_info().is_evaluated = True
bar.set_value(ConceptDefaultProps.BODY, "bar value")
bar.get_runtime_info().is_evaluated = True
namespaces = [
{"a": "value a", "foo": foo, "baz": baz},
{"a": "value a", "foo": bar, "baz": baz},
]
res = service.manage_concepts_with_body(context, namespaces)
assert len(res) == 4
assert res == [
{"a": "value a", "baz": baz, "foo": foo},
{"a": "value a", "baz": baz, "foo": "foo value"},
{"a": "value a", "baz": baz, "foo": bar},
{"a": "value a", "baz": baz, "foo": "bar value"},
]
def test_multiple_results_concept_only_return_concepts(self, context):
foo, bar = get_concepts(context, "foo", "bar")
assert MultipleResults(foo, "one", bar, 1).concepts_only() == MultipleResults(foo, bar)
assert MultipleResults("one", 1).concepts_only() == MultipleResults()
+22 -2
View File
@@ -1,8 +1,8 @@
import pytest
from common.global_symbols import NotInit
from core.concept import Concept, ConceptMetadata, DefinitionType
from helpers import GetNextId, get_concept, get_concepts, get_metadata, get_metadatas
from core.concept import Concept, ConceptDefaultProps, ConceptMetadata, DefinitionType
from helpers import GetNextId, get_concept, get_concepts, get_metadata, get_metadatas, get_evaluated_concept
def test_i_can_get_default_value_when_get_metadata():
@@ -221,3 +221,23 @@ def test_i_can_get_multiple_concepts_using_sheerka(sheerka, context):
assert baz2.id == "1003"
assert baz2.key == "baz __var__0"
assert baz2.get_value("var1") == "value for var1"
def test_i_can_get_multiple_concepts_when_same_name(sheerka, context):
one_str, one_int = get_concepts(context,
get_metadata("one", body="'one'"),
get_metadata("one", body="1"),
use_sheerka=True)
assert sheerka.isinstance(one_str, "one")
assert sheerka.isinstance(one_int, "one")
def test_i_can_create_test_concept(sheerka, context):
concept = get_concept("one", body="'one'")
test_concept = get_evaluated_concept(concept, body='hello', a="value for a")
assert test_concept.get_metadata() == concept.get_metadata()
assert test_concept.get_value(ConceptDefaultProps.BODY) == "hello"
assert test_concept.get_value("a") == "value for a"