150 lines
5.1 KiB
Python
150 lines
5.1 KiB
Python
import logging
|
|
|
|
from core.builtin_concepts import ReturnValueConcept, BuiltinConcepts
|
|
from parsers.BaseParser import BaseParser
|
|
from core.tokenizer import Tokenizer, Keywords, TokenKind, LexerError
|
|
from core.concept import VARIABLE_PREFIX
|
|
|
|
|
|
class ExactConceptParser(BaseParser):
    """
    Tries to recognize a single concept.

    The input is split into words; every word is tried both literally and
    replaced by a numbered variable placeholder (``VARIABLE_PREFIX`` followed
    by an index). Each resulting key is looked up with ``sheerka.new`` and
    every key that resolves to a concept produces one successful result.
    """

    # Inputs with more words than this are rejected up front: the number of
    # keys tried by ``combinations`` grows as 2 ** (number of words).
    MAX_WORDS_SIZE = 10

    def __init__(self, **kwargs):
        """
        :param kwargs: accepted but not used by this parser
        """
        # NOTE(review): 80 is the second positional argument of BaseParser,
        # presumably the parser priority -- confirm against BaseParser.
        BaseParser.__init__(self, "ExactConcept", 80)

    def parse(self, context, text):
        """
        Try to recognize the whole input as one concept.

        text can be a string, but text can also be a list of tokens

        :param context: object providing ``sheerka`` and ``log``
        :param text: string, or list of tokens
        :return: a list with one successful ReturnValueConcept per recognized
                 concept, or a single failed return value whose body is an
                 ERROR, CONCEPT_TOO_LONG or UNKNOWN_CONCEPT concept
        """
        context.log(self.verbose_log, f"Parsing '{text}'", self.name)
        sheerka = context.sheerka

        try:
            words = self.get_words(text)
        except LexerError as e:
            context.log(self.verbose_log, f"Error found in tokenizer {e}", self.name)
            return sheerka.ret(self.name, False, sheerka.new(BuiltinConcepts.ERROR, body=e))

        if len(words) > self.MAX_WORDS_SIZE:
            context.log(self.verbose_log, "Max words reached. Stopping.", self.name)
            return sheerka.ret(self.name, False, sheerka.new(BuiltinConcepts.CONCEPT_TOO_LONG, body=text))

        res = []
        for combination in self.combinations(words):
            result = sheerka.new(" ".join(combination))
            if sheerka.isinstance(result, BuiltinConcepts.UNKNOWN_CONCEPT):
                continue

            # A key may resolve to several concepts, wrapped in an ENUMERATION.
            concepts = result.body if sheerka.isinstance(result, BuiltinConcepts.ENUMERATION) else [result]

            for concept in concepts:
                context.log(self.verbose_log, f"Recognized concept {concept}.", self.name)
                self._bind_variables(context, concept, combination, words)
                res.append(ReturnValueConcept(
                    self.name,
                    True,
                    sheerka.new(
                        BuiltinConcepts.PARSER_RESULT,
                        parser=self,
                        source=text if isinstance(text, str) else self.get_text_from_tokens(text),
                        body=concept,
                        try_parsed=concept)))

        if res:
            if len(res) == 1:
                self.log_result(context, text, res[0])
            else:
                self.log_multiple_results(context, text, res)
            return res

        ret = sheerka.ret(self.name, False, sheerka.new(BuiltinConcepts.UNKNOWN_CONCEPT, body=text))
        self.log_result(context, text, ret)
        return ret

    def _bind_variables(self, context, concept, combination, words):
        """
        Copy the words that were replaced by placeholders into the concept.

        :param context: object providing ``log``
        :param concept: recognized concept whose properties are updated
        :param combination: key tokens, aligned position by position with words
        :param words: original words of the input
        """
        for position, token in enumerate(combination):
            if not token.startswith(VARIABLE_PREFIX):
                continue

            # The placeholder suffix is the index of the property to fill.
            index = int(token[len(VARIABLE_PREFIX):])
            concept.set_prop_by_index(index, words[position])

            # Only resolve the property name when it is actually going to be logged.
            if self.verbose_log.isEnabledFor(logging.DEBUG):
                prop_name = list(concept.props.keys())[index]
                context.log(
                    self.verbose_log,
                    f"Added property {index}: {prop_name}='{words[position]}'.",
                    self.name)

    @staticmethod
    def get_words(text):
        """
        Extract the significant token values from the input.

        NEWLINE and WHITESPACE tokens are skipped and reading stops at the
        first EOF token. Values that are ``Keywords`` members are replaced by
        their own ``value``.

        :param text: string (tokenized with ``Tokenizer``) or iterable of tokens
        :return: list of token values
        """
        tokens = iter(Tokenizer(text)) if isinstance(text, str) else text
        words = []
        for token in tokens:
            if token.type == TokenKind.EOF:
                break
            if token.type in (TokenKind.NEWLINE, TokenKind.WHITESPACE):
                continue
            words.append(token.value.value if isinstance(token.value, Keywords) else token.value)
        return words

    def combinations(self, iterable):
        """
        Build every key obtained by replacing a subset of the words by variables.

        :param iterable: the words
        :return: set of tuples, each with the same length as the input
        """
        # combinations(['foo', 'bar', 'baz']) -->
        #   ('foo', 'bar', 'baz'),
        #   ('__var__0', 'bar', 'baz'),
        #   ('foo', '__var__0', 'baz'),
        #   ('foo', 'bar', '__var__0'),
        #   ('__var__0', '__var__1', 'baz'),
        #   ('__var__0', 'bar', '__var__1'),
        #   ('foo', '__var__0', '__var__1'),
        #   ('__var__0', '__var__1', '__var__2')
        # Imported locally so the method does not depend on the module's import block.
        from itertools import combinations as index_combinations

        pool = tuple(iterable)
        positions = range(len(pool))

        # A set is used because repeated words make different position
        # subsets produce the same key.
        return {
            self.get_tuple(pool, chosen)
            for size in range(len(pool) + 1)
            for chosen in index_combinations(positions, size)
        }

    @staticmethod
    def get_tuple(pool, indices):
        """
        Replace the words found at the given positions by variable placeholders.

        Placeholders are numbered in the order the positions are given. A word
        that occurs several times in pool shares a single placeholder, and all
        of its occurrences are replaced.

        :param pool: sequence of words
        :param indices: positions in pool of the words to turn into variables
        :return: tuple with the same length as pool
        """
        placeholders = {}
        for i in indices:
            word = pool[i]
            if word not in placeholders:
                # len(placeholders) is the next free variable number.
                placeholders[word] = f"{VARIABLE_PREFIX}{len(placeholders)}"

        return tuple(placeholders.get(word, word) for word in pool)
|