519 lines
14 KiB
Python
519 lines
14 KiB
Python
from typing import Literal
|
|
|
|
from common.global_symbols import NotInit
|
|
from common.utils import unstr_concept
|
|
from core.ExecutionContext import ExecutionContext
|
|
from core.ReturnValue import ReturnValue
|
|
from core.concept import Concept, ConceptDefaultProps, ConceptMetadata, DefinitionType
|
|
from parsers.ParserInput import ParserInput
|
|
from parsers.state_machine import MetadataToken, UnrecognizedToken
|
|
from parsers.tokenizer import Tokenizer
|
|
from services.SheerkaConceptManager import ConceptManager
|
|
|
|
ATTR_MAP = {
|
|
"where": ConceptDefaultProps.WHERE,
|
|
"pre": ConceptDefaultProps.PRE,
|
|
"body": ConceptDefaultProps.BODY,
|
|
"post": ConceptDefaultProps.POST,
|
|
"ret": ConceptDefaultProps.RET,
|
|
}
|
|
|
|
|
|
class GetNextId:
|
|
def __init__(self):
|
|
self.seq = 1000
|
|
|
|
def next(self):
|
|
self.seq += 1
|
|
return self.seq
|
|
|
|
|
|
def get_concept(name=None, body=None,
|
|
id=None,
|
|
key=None,
|
|
where=None,
|
|
pre=None,
|
|
post=None,
|
|
ret=None,
|
|
definition=None,
|
|
definition_type=None,
|
|
desc=None,
|
|
props=None,
|
|
variables=None,
|
|
parameters=None,
|
|
bound_body=None,
|
|
is_builtin=False,
|
|
is_unique=False,
|
|
autouse=False,
|
|
sequence=None) -> Concept:
|
|
"""
|
|
Create a Concept objet
|
|
Caution : 'id' and 'key' are not initialized
|
|
|
|
:param name:
|
|
:type name:
|
|
:param body:
|
|
:type body:
|
|
:param id:
|
|
:type id:
|
|
:param key:
|
|
:type key:
|
|
:param where:
|
|
:type where:
|
|
:param pre:
|
|
:type pre:
|
|
:param post:
|
|
:type post:
|
|
:param ret:
|
|
:type ret:
|
|
:param definition:
|
|
:type definition:
|
|
:param definition_type:
|
|
:type definition_type:
|
|
:param desc:
|
|
:type desc:
|
|
:param props:
|
|
:type props:
|
|
:param variables:
|
|
:type variables:
|
|
:param parameters:
|
|
:type parameters:
|
|
:param bound_body:
|
|
:type bound_body:
|
|
:param is_builtin:
|
|
:type is_builtin:
|
|
:param is_unique:
|
|
:type is_unique:
|
|
:param autouse:
|
|
:type autouse:
|
|
:param sequence:
|
|
:type sequence:
|
|
:return:
|
|
:rtype:
|
|
"""
|
|
metadata = get_metadata(
|
|
name, body,
|
|
id,
|
|
key,
|
|
where,
|
|
pre,
|
|
post,
|
|
ret,
|
|
definition,
|
|
definition_type,
|
|
desc,
|
|
props,
|
|
variables,
|
|
parameters,
|
|
bound_body,
|
|
is_builtin,
|
|
is_unique,
|
|
autouse
|
|
)
|
|
if sequence:
|
|
metadata.auto_init(sequence)
|
|
else:
|
|
metadata.digest = ConceptManager.compute_metadata_digest(metadata)
|
|
metadata.all_attrs = ConceptManager.compute_all_attrs(metadata.variables)
|
|
return Concept(metadata)
|
|
|
|
|
|
def get_evaluated_concept(blueprint: Concept | ConceptMetadata, **kwargs):
|
|
"""
|
|
Returns a concept where value are already initialized
|
|
:param blueprint:
|
|
:type blueprint:
|
|
:param kwargs:
|
|
:type kwargs:
|
|
:return:
|
|
:rtype:
|
|
"""
|
|
|
|
def _isfloat(num):
|
|
try:
|
|
float(num)
|
|
return True
|
|
except ValueError:
|
|
return False
|
|
|
|
res = Concept(blueprint.get_metadata())
|
|
|
|
for attr in ATTR_MAP:
|
|
source_code = getattr(res.get_metadata(), attr)
|
|
if source_code == "" or source_code is None:
|
|
value = NotInit
|
|
elif source_code[0] in ("'", '"'):
|
|
value = source_code[1:-1]
|
|
elif source_code in ("True", "False"):
|
|
value = source_code == "True"
|
|
elif source_code.isdecimal():
|
|
value = int(source_code)
|
|
elif _isfloat(source_code):
|
|
value = float(source_code)
|
|
else:
|
|
raise Exception(f"Cannot manage {attr=}, {source_code=}")
|
|
|
|
setattr(res, ATTR_MAP[attr], value)
|
|
|
|
# force values
|
|
for k, v in kwargs.items():
|
|
res.set_value(ATTR_MAP.get(k, k), v)
|
|
|
|
res.get_runtime_info().is_evaluated = True
|
|
|
|
return res
|
|
|
|
|
|
def get_metadata(name=None, body=None,
|
|
id=None,
|
|
key=None,
|
|
where=None,
|
|
pre=None,
|
|
post=None,
|
|
ret=None,
|
|
definition=None,
|
|
definition_type=DefinitionType.DEFAULT,
|
|
desc=None,
|
|
props=None,
|
|
variables=None,
|
|
parameters=None,
|
|
bound_body=None,
|
|
is_builtin=False,
|
|
is_unique=False,
|
|
autouse=False,
|
|
digest=None,
|
|
all_attrs=None):
|
|
new_variables = []
|
|
if variables:
|
|
for v in variables:
|
|
if isinstance(v, tuple):
|
|
new_variables.append(v)
|
|
else:
|
|
new_variables.append((v, NotInit))
|
|
|
|
return ConceptMetadata(
|
|
id,
|
|
name,
|
|
key,
|
|
is_builtin,
|
|
is_unique,
|
|
body,
|
|
where,
|
|
pre,
|
|
post,
|
|
ret,
|
|
definition,
|
|
definition_type,
|
|
desc,
|
|
autouse,
|
|
bound_body,
|
|
props or {},
|
|
tuple(new_variables),
|
|
parameters or [],
|
|
digest,
|
|
all_attrs,
|
|
)
|
|
|
|
|
|
def metadata_auto_init(self: ConceptMetadata, sequence) -> ConceptMetadata:
|
|
"""
|
|
Helper function for the unit tests.
|
|
This method will be added to the `ConceptMetadata` to ease the writing of the unit tests
|
|
It properly initializes the ConceptMetadata
|
|
:param self:
|
|
:type self:
|
|
:param sequence:
|
|
:type sequence:
|
|
:return:
|
|
:rtype:
|
|
"""
|
|
if not self.id:
|
|
self.id = str(sequence.next())
|
|
if not self.key:
|
|
self.key = ConceptManager.create_concept_key(self.name, self.definition, self.variables)
|
|
if not self.is_unique:
|
|
self.is_unique = False
|
|
if not self.is_builtin:
|
|
self.is_builtin = False
|
|
if not self.definition_type:
|
|
self.definition_type = DefinitionType.DEFAULT
|
|
if not self.all_attrs:
|
|
self.all_attrs = ConceptManager.compute_all_attrs(self.variables)
|
|
if not self.digest:
|
|
self.digest = ConceptManager.compute_metadata_digest(self)
|
|
|
|
# Note that I do not automatically update the digest as I don't want to make unnecessary computations
|
|
|
|
return self
|
|
|
|
|
|
def metadata_clone(self: ConceptMetadata, name=None, body=None,
|
|
key=None,
|
|
where=None,
|
|
pre=None,
|
|
post=None,
|
|
ret=None,
|
|
definition=None,
|
|
definition_type=None,
|
|
desc=None,
|
|
props=None,
|
|
variables=None,
|
|
parameters=None,
|
|
bound_body=None,
|
|
is_builtin=None,
|
|
is_unique=None,
|
|
autouse=None,
|
|
digest=None,
|
|
all_attrs=None) -> ConceptMetadata:
|
|
"""
|
|
Helper function for the unit tests.
|
|
This method will be added to the `ConceptMetadata` to ease the writing of the unit tests
|
|
It clones a ConceptMetadata, but can override some attributes if requested
|
|
:param self:
|
|
:type self:
|
|
:param name:
|
|
:type name:
|
|
:param body:
|
|
:type body:
|
|
:param key:
|
|
:type key:
|
|
:param where:
|
|
:type where:
|
|
:param pre:
|
|
:type pre:
|
|
:param post:
|
|
:type post:
|
|
:param ret:
|
|
:type ret:
|
|
:param definition:
|
|
:type definition:
|
|
:param definition_type:
|
|
:type definition_type:
|
|
:param desc:
|
|
:type desc:
|
|
:param props:
|
|
:type props:
|
|
:param variables:
|
|
:type variables:
|
|
:param parameters:
|
|
:type parameters:
|
|
:param bound_body:
|
|
:type bound_body:
|
|
:param is_builtin:
|
|
:type is_builtin:
|
|
:param is_unique:
|
|
:type is_unique:
|
|
:param autouse:
|
|
:type autouse:
|
|
:param digest:
|
|
:type digest:
|
|
:param all_attrs:
|
|
:type all_attrs:
|
|
:return:
|
|
:rtype:
|
|
"""
|
|
return ConceptMetadata(
|
|
id=self.id,
|
|
name=self.name if name is None else name,
|
|
body=self.body if body is None else body,
|
|
key=self.key if key is None else key,
|
|
where=self.where if where is None else where,
|
|
pre=self.pre if pre is None else pre,
|
|
post=self.post if post is None else post,
|
|
ret=self.ret if ret is None else ret,
|
|
definition=self.definition if definition is None else definition,
|
|
definition_type=self.definition_type if definition_type is None else definition_type,
|
|
desc=self.desc if desc is None else desc,
|
|
props=self.props if props is None else props,
|
|
variables=self.variables if variables is None else variables,
|
|
parameters=self.parameters if parameters is None else parameters,
|
|
bound_body=self.bound_body if bound_body is None else bound_body,
|
|
is_builtin=self.is_builtin if is_builtin is None else is_builtin,
|
|
is_unique=self.is_unique if is_unique is None else is_unique,
|
|
autouse=self.autouse if autouse is None else autouse,
|
|
digest=self.digest if digest is None else digest,
|
|
all_attrs=self.all_attrs if all_attrs is None else all_attrs,
|
|
)
|
|
|
|
|
|
# Helpers functions for unit tests
|
|
setattr(ConceptMetadata, 'auto_init', metadata_auto_init)
|
|
setattr(ConceptMetadata, 'clone', metadata_clone)
|
|
|
|
|
|
def get_metadatas(*args, **kwargs):
|
|
as_metadatas = [arg if isinstance(arg, ConceptMetadata) else get_metadata(arg) for arg in args]
|
|
next_id = kwargs.get("next_id", None)
|
|
if next_id:
|
|
for metadata in as_metadatas:
|
|
metadata_auto_init(metadata, next_id)
|
|
|
|
return as_metadatas
|
|
|
|
|
|
def get_concepts(context: ExecutionContext, *concepts, **kwargs) -> list[Concept]:
|
|
"""
|
|
Simple and quick way to get initialize concepts for a test
|
|
:param context:
|
|
:type context:
|
|
:param concepts:
|
|
:type concepts:
|
|
:param kwargs:
|
|
:type kwargs:
|
|
:return:
|
|
:rtype:
|
|
"""
|
|
res = []
|
|
use_sheerka = kwargs.pop("use_sheerka", False)
|
|
sequence = kwargs.pop("sequence", None)
|
|
for c in concepts:
|
|
if use_sheerka:
|
|
c = define_new_concept(context, c)
|
|
elif isinstance(c, str):
|
|
c = get_concept(c)
|
|
|
|
if sequence:
|
|
c.get_metadata().auto_init(sequence)
|
|
|
|
res.append(c)
|
|
|
|
return res
|
|
|
|
|
|
def get_evaluated_concepts(context, *concepts, use_sheerka=False) -> list[Concept]:
|
|
if use_sheerka:
|
|
return [context.sheerka.evaluate_concept(context, Concept(c.get_metadata())) for c in concepts]
|
|
else:
|
|
return [get_evaluated_concept(concept) for concept in concepts]
|
|
|
|
|
|
def define_new_concept(context: ExecutionContext, c: str | Concept | ConceptMetadata) -> Concept:
|
|
sheerka = context.sheerka
|
|
if isinstance(c, str):
|
|
retval = sheerka.define_new_concept(context, c)
|
|
else:
|
|
metadata = c.get_metadata()
|
|
retval = sheerka.define_new_concept(context,
|
|
metadata.name,
|
|
metadata.is_builtin,
|
|
metadata.is_unique,
|
|
metadata.body,
|
|
metadata.where,
|
|
metadata.pre,
|
|
metadata.post,
|
|
metadata.ret,
|
|
metadata.definition,
|
|
metadata.definition_type,
|
|
metadata.autouse,
|
|
metadata.bound_body,
|
|
metadata.desc,
|
|
metadata.props,
|
|
metadata.variables,
|
|
metadata.parameters)
|
|
|
|
assert retval.status
|
|
concept = sheerka.newi(retval.value.metadata.id)
|
|
return concept
|
|
|
|
|
|
def get_file_content(file_name):
|
|
with open(file_name) as f:
|
|
return f.read()
|
|
|
|
|
|
def get_parser_input(text):
|
|
pi = ParserInput(text)
|
|
assert pi.init()
|
|
|
|
return pi
|
|
|
|
|
|
def get_from(*args, **kwargs):
|
|
"""
|
|
Convert the input to fix the positions
|
|
:param args:
|
|
:type args:
|
|
:return:
|
|
:rtype:
|
|
"""
|
|
cache = {} # I keep the name in cache to avoid having to remind it everytime
|
|
pos = 0
|
|
res = []
|
|
for item in args:
|
|
start = pos
|
|
if isinstance(item, MetadataToken):
|
|
if item.metadata.name:
|
|
cache[item.metadata.id] = item.metadata.name
|
|
|
|
tokens = list(Tokenizer(cache[item.metadata.id], yield_eof=False))
|
|
pos += len(tokens)
|
|
resolution_method = kwargs.get("resolution_method", item.resolution_method)
|
|
parser = kwargs.get("parser", item.parser)
|
|
res.append(MetadataToken(item.metadata, start, pos - 1, resolution_method, parser))
|
|
elif isinstance(item, UnrecognizedToken):
|
|
tokens = list(Tokenizer(item.buffer, yield_eof=False))
|
|
pos += len(tokens)
|
|
res.append(UnrecognizedToken(item.buffer, start, pos - 1))
|
|
|
|
return res
|
|
|
|
|
|
def _rv(value, who="Test"):
|
|
return ReturnValue(who=who, status=True, value=value)
|
|
|
|
|
|
def _rvc(concept_name, who="Test"):
|
|
next_id = GetNextId()
|
|
concept = get_concept(concept_name, sequence=next_id)
|
|
return ReturnValue(who=who, status=True, value=concept)
|
|
|
|
|
|
def _rvf(value, who="Test"):
|
|
"""
|
|
Return Value False
|
|
:param value:
|
|
:type value:
|
|
:return:
|
|
:rtype:
|
|
"""
|
|
return ReturnValue(who=who, status=False, value=value)
|
|
|
|
|
|
def _ut(buffer, start=0, end=-1):
|
|
"""
|
|
helper to UnrecognizedToken
|
|
:param buffer:
|
|
:type buffer:
|
|
:param start:
|
|
:type start:
|
|
:param end:
|
|
:type end:
|
|
:return:
|
|
:rtype:
|
|
"""
|
|
return UnrecognizedToken(buffer, start, end)
|
|
|
|
|
|
def _mt(concept_id,
|
|
start=0,
|
|
end=-1,
|
|
resolution_method: Literal["name", "key", "id"] = "id",
|
|
parser="simple",
|
|
**kwargs):
|
|
"""
|
|
helper to MetadataToken
|
|
:param concept_id:
|
|
:type concept_id:
|
|
:param start:
|
|
:type start:
|
|
:param end:
|
|
:type end:
|
|
:return:
|
|
:rtype:
|
|
"""
|
|
name, _id = unstr_concept(concept_id)
|
|
variables = [(k, v) for k, v in kwargs.items()] if kwargs else None
|
|
metadata = get_metadata(id=concept_id, variables=variables) if _id is None \
|
|
else get_metadata(id=_id, name=name, variables=variables)
|
|
return MetadataToken(metadata, start, end, resolution_method, parser)
|