182 lines
6.6 KiB
Python
import itertools
import logging

import core.builtin_helpers
from core.builtin_concepts import ReturnValueConcept, BuiltinConcepts
from core.concept import VARIABLE_PREFIX
from core.sheerka.services.SheerkaExecute import ParserInput
from core.tokenizer import Keywords, TokenKind, LexerError
from core.utils import str_concept
from parsers.BaseParser import BaseParser
|
|
|
|
|
|
class ExactConceptParser(BaseParser):
    """
    Tries to recognize a single concept.

    The parser generates every variant of the input where subsets of the
    words are replaced by variable placeholders (``__var__0``, ``__var__1``,
    ...), looks each variant up as a concept key, and binds the matched
    variables back to the original words.
    """

    # Upper bound on the number of input words. The number of generated
    # combinations grows as 2**n, so longer inputs are rejected early.
    MAX_WORDS_SIZE = 6

    def __init__(self, max_word_size=None, **kwargs):
        """
        :param max_word_size: optional per-instance override of MAX_WORDS_SIZE
                              (falls back to the class constant when None).
        :param kwargs: accepted for signature compatibility; unused here.
        """
        BaseParser.__init__(self, "ExactConcept", 80)
        self.max_word_size = max_word_size

    def parse(self, context, parser_input: ParserInput):
        """
        Try to recognize the input as one or more known concepts.

        text can be string, but text can also be a list of tokens.

        :param context: parsing context; provides logging and the sheerka runtime
        :param parser_input: the input to parse
        :return: a single failure ReturnValueConcept (tokenizer error, input
                 too long, or nothing recognized), or a list with one
                 ReturnValueConcept per recognized concept.
        """
        context.log(f"Parsing '{parser_input}'", self.name)
        sheerka = context.sheerka

        try:
            parser_input.reset()
            words = self.get_words(parser_input)
        except LexerError as e:
            context.log(f"Error found in tokenizer {e}", self.name)
            return sheerka.ret(self.name, False, sheerka.new(BuiltinConcepts.ERROR, body=e))

        # Reject early: the combination count is exponential in len(words).
        if len(words) > (self.max_word_size or self.MAX_WORDS_SIZE):
            context.log("Max words reached. Stopping.", self.name)
            too_long = sheerka.new(BuiltinConcepts.CONCEPT_TOO_LONG, body=parser_input.as_text())
            body = sheerka.new(BuiltinConcepts.NOT_FOR_ME, body=parser_input.as_text(), reason=too_long)
            return sheerka.ret(self.name, False, body)

        already_recognized = []  # keep track of the concepts found
        for combination in self.combinations(words):

            concept_key = " ".join(combination)
            result = sheerka.new(concept_key)  # use new(), not get() because we need a new instance

            if sheerka.isinstance(result, BuiltinConcepts.UNKNOWN_CONCEPT):
                continue

            concepts = result if isinstance(result, list) else [result]

            for concept in concepts:
                if concept in already_recognized:
                    context.log(f"Recognized concept {concept} again. Skipping.", self.name)
                    # example:
                    # if the input is 'foo a' and a concept is defined as 'foo a'
                    # there will be two matches, one for 'foo a' and 'foo _var_0',
                    # but it's the same concept 'foo a'
                    continue

                context.log(f"Recognized concept {concept}.", self.name)
                # bind every placeholder in the combination to the original word
                for i, token in enumerate(combination):
                    if token.startswith(VARIABLE_PREFIX):
                        index = int(token[len(VARIABLE_PREFIX):])
                        value = words[i]
                        concept.def_var_by_index(index, str_concept(value) if isinstance(value, tuple) else value)
                        concept.metadata.need_validation = True
                        if self.verbose_log.isEnabledFor(logging.DEBUG):
                            prop_name = concept.metadata.variables[index][0]
                            context.log(
                                f"Added variable {index}: {prop_name}='{words[i]}'.",
                                self.name)

                already_recognized.append(concept)

        # also resolve the whole input by name and merge with the matches
        by_name = sheerka.resolve(parser_input.as_text())
        core.builtin_helpers.set_is_evaluated(by_name)
        recognized = self.merge_concepts(already_recognized, by_name)

        if not recognized:
            ret = sheerka.ret(self.name, False, sheerka.new(BuiltinConcepts.UNKNOWN_CONCEPT,
                                                            body=parser_input.as_text()))
            self.log_result(context, parser_input, ret)
            return ret

        res = [self.as_return_value(context, parser_input, c) for c in recognized]
        if len(res) == 1:
            self.log_result(context, parser_input, res[0])
        else:
            self.log_multiple_results(context, parser_input, res)
        return res

    @staticmethod
    def get_words(parser_input):
        """
        Extract the meaningful token values from the input, skipping
        whitespace and newlines, and stopping at EOF.
        """
        res = []
        for t in parser_input.as_tokens():
            if t.type == TokenKind.EOF:
                break
            if t.type == TokenKind.NEWLINE or t.type == TokenKind.WHITESPACE:
                continue
            # Keywords carry their textual value one level deeper (.value.value)
            res.append(t.value.value if isinstance(t.value, Keywords) else t.value)
        return res

    def combinations(self, iterable):
        """
        Generate every variant of *iterable* where a subset of the entries is
        replaced by variable placeholders.

        combinations(('foo', 'bar', 'baz')) -->
          ('foo', 'bar', 'baz'),
          ('__var__0', 'bar', 'baz'),
          ('foo', '__var__0', 'baz'),
          ('foo', 'bar', '__var__0'),
          ('__var__0', '__var__1', 'baz'),
          ('__var__0', 'bar', '__var__1'),
          ('foo', '__var__0', '__var__1'),
          ('__var__0', '__var__1', '__var__2')
        """
        pool = tuple(iterable)
        n = len(pool)

        res = set()
        # every subset of positions (size 0..n) becomes one candidate where
        # the selected positions are replaced by variables
        for r in range(n + 1):
            for indices in itertools.combinations(range(n), r):
                res.add(self.get_tuple(pool, list(indices)))

        # remove all results that contain a token concept (a tuple entry);
        # they are not valid entries, since a token concept MUST be replaced
        # by a variable
        return {combination for combination in res
                if not any(isinstance(entry, tuple) for entry in combination)}

    @staticmethod
    def get_tuple(pool, indices):
        """
        Replace the entries of *pool* selected by *indices* with variable
        placeholders (``__var__0``, ``__var__1``, ...).

        Identical values share one placeholder; note that equal values at
        non-selected positions are replaced as well.
        """
        var_names = {}

        # assign one placeholder per distinct selected value
        for i in indices:
            value = pool[i]
            if value not in var_names:
                var_names[value] = f"{VARIABLE_PREFIX}{len(var_names)}"

        # build the result tuple, substituting mapped values
        return tuple(var_names.get(value, value) for value in pool)

    def as_return_value(self, context, parser_input, concept):
        """Wrap a recognized concept in this parser's success return value."""
        return ReturnValueConcept(
            self.name,
            True,
            context.sheerka.new(
                BuiltinConcepts.PARSER_RESULT,
                parser=self,
                source=parser_input.as_text(),
                body=concept,
                try_parsed=concept))