Commit 140833ce authored by Alexandr Sokolov's avatar Alexandr Sokolov

Merge branch 'dev_parser_aDOT' into 'dev'

Dev parser a dot See merge request !4
parents 55ad2207 b05eec3f
......@@ -2,3 +2,4 @@
config_research.json
*.log
__pycache__
/venv/
......@@ -20,4 +20,4 @@ If you want to update the package to have the most fresh one, please first unins
To start working with `comsdk`, create your own `config_research.json` based on [this example](/config_research.json.example) and put it into `~/.comsdk`:
```bash
$ cp config_research.json.example ~/.comsdk/config_research.json
```
```
\ No newline at end of file
......@@ -92,6 +92,15 @@ class Graph:
self.term_state.is_term_state = True
self._initialized = False
def __repr__(self):
return (
f"Graph(\n"
f" init_state={self.init_state!r},\n"
f" term_state={self.term_state!r},\n"
f" initialized={self._initialized}\n"
f")"
)
def run(self, data):
'''
Goes through the graph and returns boolean denoting whether the graph has finished successfully.
......@@ -114,6 +123,7 @@ class Graph:
return False
# cur_state, implicit_parallelization_info = morph(data)
cur_state = transfer_f(data)
print(cur_state)
# print(morph)
if '__EXCEPTION__' in data:
return False
......
import shutil
from functools import reduce, partial
import os
import re
......@@ -163,7 +164,7 @@ def is_sequence(obj):
'''
Checks whether obj is a sequence (string does not count as a sequence)
'''
return isinstance(obj, collections.Sequence) and (not hasattr(obj, 'strip'))
return isinstance(obj, collections.abc.Sequence) and (not hasattr(obj, 'strip'))
def cp(from_, to_):
'''
......
......@@ -7,26 +7,26 @@ from comsdk.edge import Edge
class Params():
__slots__=(
'module',
'entry_func',
'predicate',
'selector',
'function',
'morphism',
'parallelism',
'comment',
'order',
'subgraph'
__slots__ = (
'module', 'entry_func', 'predicate', 'selector', 'function',
'morphism', 'parallelism', 'comment', 'order', 'subgraph',
'keys_mapping', 'executable_parameters', 'connection_data',
'preprocessor', 'postprocessor', 'edge_index', 'config_section',
)
def __init__(self):
for slot in self.__slots__:
setattr(self, slot, None)
self.comment = "" # Инициализируем как пустую строку вместо None
def __str__(self):
stri = ""
for s in self.__slots__:
stri += ((s+": {}, ".format(getattr(self, s))) if getattr(self, s) is not None else "")
attr = getattr(self, s)
if attr is not None and s != 'comment': # Комментарии выводим отдельно
stri += f"{s}: {attr}, "
if self.comment:
stri += f"comment: '{self.comment}'"
return stri
# entities = {}
......@@ -121,6 +121,7 @@ class GraphFactory():
subgr = parsr.parse_file(self.entities[s].subgraph)
self.states[s].replace_with_graph(subgr)
self.graph = Graph(self.graph.init_state, self.graph.term_state)
print(self.graph)
return self.graph
......@@ -190,34 +191,73 @@ class Parser():
setattr(res[i], sl, vals[sl][i])
return res
#Props is line "[proFp=smth, ...]"
def _param_from_props(self,props):
def _param_from_props(self, props):
parm = Params()
comment = ""
if props =="":
if props == "":
return parm
props = props.replace("]", '')
# Извлекаем комментарий, но не удаляем его полностью
if '\"' in props:
m = [m for m in re.finditer(r'\".*\"', props)][0]
comment = props[m.span()[0]+1:m.span()[1]-1]
props=props[:m.span()[0]]+props[m.span()[1]:]
matches = [m for m in re.finditer(r'\"(.*?)\"', props)]
if matches:
comment = matches[0].group(1).replace("\0", " ")
# Заменяем комментарий на специальный маркер, чтобы не мешал дальнейшему парсингу
props = props[:matches[0].start()] + "COMMENT_PLACEHOLDER" + props[matches[0].end():]
if '(' in props:
mchs = [m for m in re.finditer(r'\((\w+,)*\w+\)', props)]
for m in mchs:
props=props[:m.span()[0]]+(props[m.span()[0]:m.span()[1]]).replace(',','\0')+props[m.span()[1]:]
props = props.replace("(","")
props = props.replace(")","")
rs =props.split(r",") #.split(r", ")
props = props[:m.span()[0]] + (props[m.span()[0]:m.span()[1]]).replace(',', '\0') + props[m.span()[1]:]
props = props.replace("(", "")
props = props.replace(")", "")
# Восстанавливаем комментарий после обработки скобок
props = props.replace("COMMENT_PLACEHOLDER", "")
rs = props.split(r",")
for r in rs:
r=r.split(r"=", 1)
r = r.split(r"=", 1)
if r[0] in parm.__slots__:
setattr(parm, r[0], r[1])
else:
raise Exception("\tERROR:Unknown parameter: "+ r[0])
if comment != "":
setattr(parm, "comment", comment.replace("\0", " "))
raise Exception("\tERROR:Unknown parameter: " + r[0])
if comment != "":
setattr(parm, "comment", comment)
return parm
#Props is line "[proFp=smth, ...]"
#def _param_from_props(self,props):
# parm = Params()
#comment = ""
#if props =="":
# return parm
#props = props.replace("]", '')
#if '\"' in props:
# удаляеться комментарий
# m = [m for m in re.finditer(r'\".*\"', props)][0]
# comment = props[m.span()[0]+1:m.span()[1]-1]
# props=props[:m.span()[0]]+props[m.span()[1]:]
#if '(' in props:
# mchs = [m for m in re.finditer(r'\((\w+,)*\w+\)', props)]
# for m in mchs:
# props=props[:m.span()[0]]+(props[m.span()[0]:m.span()[1]]).replace(',','\0')+props[m.span()[1]:]
#props = props.replace("(","")
#props = props.replace(")","")
#rs =props.split(r",") #.split(r", ")
#for r in rs:
# r=r.split(r"=", 1)
# if r[0] in parm.__slots__:
# setattr(parm, r[0], r[1])
# else:
# raise Exception("\tERROR:Unknown parameter: "+ r[0])
#if comment != "":
# setattr(parm, "comment", comment.replace("\0", " "))
#return parm
def _param_from_entln(self, raw):
res = re.split(r"\[", raw, 1)
return res[0], self._param_from_props(res[1])
......@@ -275,30 +315,49 @@ class Parser():
def parse_file(self, filename):
# @todo В случае, если на вход будет подан файл в отличной от UTF-8 кодировке программа работать не будет
file = open(filename, encoding='utf-8')# "r")
dot = file.read()
with open(filename, "r", encoding="utf-8") as file:
dot = file.read()
# проверка на правильное количесво скобок
self._check_brackets(dot)
# @todo Возможно стоит заменить данный код на библиотеку Lark
# поиск всех подстрок в кавычках
comments = [m for m in re.finditer(r'\".*\"', dot)]
for m in comments:
# Заменяет пробелы внутри кавычек на специальный символ \0
dot=dot[:m.span()[0]]+(dot[m.span()[0]:m.span()[1]]).replace(' ','\0')+dot[m.span()[1]:]
# Удаляет все пробелы, табы(\t) и возвраты каретки(\r), кроме тех, что внутри кавычек(они заменены на \0)
dot = re.sub(r"[ \t\r]", "", dot) #deleting all spaces
# Удаляет ключевые слова digraph, а также фигурные скобки { и }
dot = re.sub(r"((digraph)|}|{)", "", dot)
# Удаляет однострочные комментарии, начинающиеся с //
dot = re.sub(r"\/\/.*", "", dot)
# Удаляет строки, содержащие только перенос строки (\n)
dot = re.sub(r"^\n$", "", dot)
# @ todo заменить нижние две строки на dotlines = [line.strip() for line in dot.splitlines() if line.strip()]
# разбивает строку на список подстрок
dotlines = dot.splitlines()
# фильтрует пустые подстроки
dotlines = list(filter(None, dotlines))
# записали имя графа в класс GraphFactory
self.fact.name = dotlines[0]
# Удалили из строки
dotlines = dotlines[1:]
# ent_re - regular expr for edges, states, functions properties
# ищет строки вида ИМЯ[атрибуты]
print(dotlines)
ent_re = re.compile(r"^\w+\[.*\]$")
# top_re - regular expr for topology properties, most time consuming one
# Строки вида ИСТОЧНИК->ЦЕЛЬ[атрибуты] или ИСТОЧНИК=>ЦЕЛЬ[атрибуты]
top_re = re.compile(r"^(\w+,?)+(->|=>)(\w+,?)+(\[(\w+=(\(?\w+,?\)?)+,?)+\])?")
# (r"^\w[\w\s,]*(->|=>)\s*\w[\w\s,=\[\]()]*$")
for i, ln in enumerate(dotlines):
# функция
if ent_re.match(ln):
name, parm = self._param_from_entln(ln)
print(f'{parm}')
self.fact.entities[name] = parm
# топология
elif top_re.match(ln):
self._topology(ln)
return self.fact.build(Parser.subgr_count)
......@@ -306,6 +365,7 @@ class Parser():
checked=[]
bushes = {}
selectorends = {}
def generate_cpp(self, filename=None):
self.fact.graph.init_state.input_edges_number =0
states_to_check = [self.fact.graph.init_state]
......
def dummy_edge(data):
"""Пустое ребро, не изменяет данные"""
pass
def increment_a_edge(data):
data['a'] += 1
def increment_a_double(data):
data['a'] *= 2
def increment_a_array_edge(data):
for i in range(len(data['a'])):
data['a'][i] += 1
......@@ -15,7 +19,8 @@ def decrement_a_edge(data):
data['a'] -= 1
def nonzero_predicate(data):
return data['a'] != 0
"""Предикат: возвращает True если 'a' не равно 0"""
return data['a'] != 0
def positiveness_predicate(data):
return data['a'] > 0
......@@ -30,5 +35,64 @@ def selector_a_nonpositive(data):
res = data['a'] <= 0
return [res, not res]
def selector_a_positive(data):
res = data['a'] > 0
print(f"Selector check: a={data['a']}, continue={res}")
return [res, not res]
def true_predicate(data):
return True
def process_a(data):
"""Обработка ветки A: умножает a на 2"""
if 'a' not in data:
data['a'] = 0 # Инициализация по умолчанию
data['a'] *= 2
data['processed_by'] = 'A'
def process_b(data):
"""Обработка ветки B: устанавливает b в 10 (вместо добавления)"""
data['b'] += 10
data['processed_by'] = 'B'
def check_condition(data):
"""Предикат для выбора ветки (True - ветка A, False - ветка B)"""
return data.get('value', 0) % 2 == 0 # Безопасное получение value
def branch_selector(data):
"""Селектор ветвления на основе check_condition"""
condition = check_condition(data)
return [condition, not condition]
def init_data(data):
data['initialized'] = True
data['processed'] = False
data['value'] = data.get('input', 0)
def validate_data(data):
if 'value' not in data:
raise ValueError("Data not initialized")
data['valid'] = True
def process_data(data):
if not data.get('valid', False):
raise ValueError("Invalid data")
data['processed'] = True
data['value'] *= 2
def save_result(data):
data['saved'] = True
data['result'] = data['value']
def cleanup(data):
data['clean'] = True
def branch_selector(data):
"""Селектор ветвления для теста атрибутов"""
return [True, False]
class ThreadParallelizationPolicy:
"""Тестовая политика параллелизма"""
pass
digraph SIMPLEST {
FUNCA [module=test_funcs.simplest, entry_func=increment_a_edge]
FUNCB [module=test_funcs.simplest, entry_func=increment_b_edge]
PRED [module=test_funcs.simplest, entry_func=true_predicate]
INCR_A [predicate=PRED, function=FUNCA]
INCR_B [predicate=PRED, function=FUNCB]
FUNCA [module=test_funcs.simplest, entry_func=increment_a_edge, comment="name FUNC_A"]
FUNCB [module=test_funcs.simplest, entry_func=increment_b_edge, comment="name FUNC_B"]
PRED [module=test_funcs.simplest, entry_func=true_predicate, comment="name PRED"]
INCR_A [predicate=PRED, function=FUNCA, comment="name INCR_A"]
INCR_B [predicate=PRED, function=FUNCB, comment="name INCR_B"]
__BEGIN__ -> ROOT
ROOT -> BR1, BR2 [morphism=(INCR_A, INCR_B)]
BR1 -> BR1_ST [morphism=INCR_A]
......
......@@ -4,9 +4,9 @@ digraph SIMPLEST {
INCR_A [predicate=PRED, function=FUNCA]
ST1 [subgraph=tests/adot/trivial.adot]
ST2 [subgraph=tests/adot/cycled.adot]
ST3 [subgraph=tests/adot/branching.adot]
ST1 [subgraph=./adot/trivial.adot]
ST2 [subgraph=./adot/cycled.adot]
ST3 [subgraph=./adot/branching.adot]
__BEGIN__ -> ST1
ST1 -> ST2 [morphism=INCR_A]
......
digraph CYCLIC {
// Селектор для узла LOOP
SEL_LOOP [module="selectors", entry_func="check_iterations"]
// Объявление функций
FUNC [module=test_funcs.simplest, entry_func=decrement_a_edge]
PRED [module=test_funcs.simplest, entry_func=true_predicate]
MORPH_LOOP [predicate=PRED, function=FUNC]
MORPH_EXIT [function=INC]
LOOP [selector=SEL_LOOP] # Привязка селектора к узлу
// Топология
__BEGIN__ -> LOOP [edge_index=0]
LOOP -> LOOP [morphism=MORPH_LOOP, edge_index=1]
LOOP -> __END__ [morphism=MORPH_EXIT, edge_index=2]
}
\ No newline at end of file
digraph TRIVIAL {
// Объявление функций
FUNC [module=test_funcs.simplest, entry_func=increment_a_edge]
PRED [module=test_funcs.simplest, entry_func="always_true"]
MORPH [predicate=PRED, function=FUNC, comment="Basic increment"]
// Топология
__BEGIN__ -> STEP1 [morphism=MORPH, edge_index=0]
STEP1 -> STEP2 [morphism=MORPH, edge_index=1]
STEP2 -> __END__ [morphism=MORPH, edge_index=2]
}
\ No newline at end of file
digraph ATTRIBUTES_TEST {
// Тестовые функции
FUNC1 [module=test_funcs.simplest, entry_func=increment_a_edge, comment="Функция инкремента A"]
FUNC2 [module=test_funcs.simplest, entry_func=increment_b_edge, comment="Функция инкремента B"]
PRED [module=test_funcs.simplest, entry_func=nonzero_predicate, comment="Предикат проверки ненулевого значения"]
SEL [module=test_funcs.simplest, entry_func=branch_selector, comment="Селектор ветвления"]
// Узлы с атрибутами
NODE1 [selector=SEL, parallelism=threading]
NODE2 [subgraph=./test_adot_files/preprocess.adot]
// Ребра с атрибутами
EDGE1 [predicate=PRED, function=FUNC1, comment="Первое ребро с полным набором атрибутов"]
EDGE2 [function=FUNC1, comment="Второе ребро с множественными морфизмами"]
// Структура графа
__BEGIN__ -> NODE1 [morphism=EDGE1]
NODE1 -> NODE2 [morphism=EDGE2]
NODE2 -> __END__
}
\ No newline at end of file
digraph BRANCHING_TEST {
/ Функции
PROCESS_A [module=test_funcs.simplest, entry_func=process_a]
PROCESS_B [module=test_funcs.simplest, entry_func=process_b]
// Морфизмы
EDGE_A [function=PROCESS_A]
EDGE_B [function=PROCESS_B]
// Граф
__BEGIN__ -> DECISION_POINT
DECISION_POINT -> BRANCH_A [morphism=EDGE_A]
DECISION_POINT -> BRANCH_B [morphism=EDGE_B]
BRANCH_A -> MERGE_POINT
BRANCH_B -> MERGE_POINT
MERGE_POINT -> __END__
}
\ No newline at end of file
digraph CYCLED {
// Определение функций
DECREMENT_A [module=test_funcs.simplest, entry_func=decrement_a_edge]
NONZERO_PRED [module=test_funcs.simplest, entry_func=positiveness_predicate]
NONPOS_SELECTOR [module=test_funcs.simplest, entry_func=selector_a_positive]
DUMMY_FUNC [module=test_funcs.simplest, entry_func=dummy_edge]
// Определение морфизмов
DECREMENT_EDGE [predicate=NONZERO_PRED, function=DECREMENT_A, comment="Уменьшает a на 1 пока a != 0"]
EXIT_EDGE [function=DUMMY_FUNC, comment="Выход из цикла"]
// Определение узлов
ST1 [comment="Начальное состояние цикла"]
ST2 [selector=NONPOS_SELECTOR, comment="Узел с селектором"]
// Определение графа
__BEGIN__ -> ST1 [comment="Начало выполнения"]
ST1 -> ST2 [morphism=DECREMENT_EDGE, comment="Переход с уменьшением a"]
ST2 -> ST1 [order=1, comment="Продолжение цикла если a > 0"]
ST2 -> __END__ [order=2, morphism=EXIT_EDGE, comment="Выход если a <= 0"]
}
\ No newline at end of file
digraph EDGE_TYPES_TEST {
// Определение функций
FUNC_A [module=test_funcs.simplest, entry_func=increment_a_edge]
FUNC_B [module=test_funcs.simplest, entry_func=increment_b_edge]
TRUE_PRED [module=test_funcs.simplest, entry_func=true_predicate]
// Определение разных типов ребер
EDGE_SIMPLE [predicate=TRUE_PRED, function=FUNC_A, comment="Обычное ребро (->)"]
EDGE_PARALLEL [predicate=TRUE_PRED, function=FUNC_B, comment="Параллельное ребро (=>)"]
// Определение узлов
__BEGIN__ -> NODE_START
NODE_START -> NODE_MIDDLE [morphism=EDGE_SIMPLE, order=0]
NODE_START => NODE_MIDDLE [morphism=EDGE_PARALLEL, order=1]
NODE_MIDDLE -> __END__
}
\ No newline at end of file
digraph MAIN_GRAPH {
// Функции
MAIN_PROCESS [module=test_funcs.simplest, entry_func=process_data]
/ Функции-предикаты
ALWAYS_TRUE [module=test_funcs.simplest, entry_func=true_predicate, comment = pred_ALWAYS_TRUE]
// Морфизмы
EDGE_MAIN [predicate=ALWAYS_TRUE, function=MAIN_PROCESS, comment = edge_1]
// Главный граф
PREPROCESS [subgraph=./test_adot_files/preprocess.adot]
POSTPROCESS [subgraph=./test_adot_files/postprocess.adot]
__BEGIN__ -> PREPROCESS
PREPROCESS -> MAIN_PROCESS [morphism=EDGE_MAIN]
MAIN_PROCESS -> POSTPROCESS
POSTPROCESS -> __END__
}
\ No newline at end of file
digraph POSTPROCESS {
// Функции постобработки
SAVE_RESULT [module=test_funcs.simplest, entry_func=save_result]
CLEANUP [module=test_funcs.simplest, entry_func=cleanup]
/ Функции-предикаты
ALWAYS_TRUE [module=test_funcs.simplest, entry_func=true_predicate, comment = pred_ALWAYS_TRUE_postprocess]
// Морфизмы
EDGE_1 [predicate=ALWAYS_TRUE, function=SAVE_RESULT, comment = edge_1_postprocess]
EDGE_2 [predicate=ALWAYS_TRUE, function=CLEANUP, comment = edge_2_postprocess]
__BEGIN__ -> SAVE
SAVE -> FINAL [morphism=EDGE_1]
FINAL -> __END__ [morphism=EDGE_2]
}
\ No newline at end of file
digraph PREPROCESS {
// Функции предобработки
INIT_DATA [module=test_funcs.simplest, entry_func=init_data, comment=""]
VALIDATE [module=test_funcs.simplest, entry_func=validate_data]
/ Функции-предикаты
ALWAYS_TRUE [module=test_funcs.simplest, entry_func=true_predicate, comment = pred_ALWAYS_TRUE]
// Морфизмы
EDGE_1 [predicate=ALWAYS_TRUE, function=INIT_DATA, comment = edge_1]
EDGE_2 [predicate=ALWAYS_TRUE, function=VALIDATE, comment = edge_2]
__BEGIN__ -> INIT
INIT -> VALIDATE [morphism=EDGE_1]
VALIDATE -> __END__ [morphism=EDGE_2]
}
\ No newline at end of file
digraph SEQUENTIAL_TEST {
// Функции-обработчики
INCREMENT [module=test_funcs.simplest, entry_func=increment_a_edge, comment = func_INCREMENT]
DOUBLE [module=test_funcs.simplest, entry_func=increment_a_double, comment = func_DOUBLE]
// Функции-предикаты
ALWAYS_TRUE [module=test_funcs.simplest, entry_func=true_predicate, comment = pred_ALWAYS_TRUE]
// Морфизмы
EDGE_1 [predicate=ALWAYS_TRUE, function=INCREMENT, comment = edge_1]
EDGE_2 [predicate=ALWAYS_TRUE, function=DOUBLE, comment = edge_2]
// Граф
__BEGIN__ -> STEP_1 [morphism=EDGE_1]
STEP_1 -> STEP_2 [morphism=EDGE_2]
STEP_2 -> __END__
}
\ No newline at end of file
import unittest
import os
from comsdk.parser import Parser
from comsdk.graph import Graph
class TestADOTParser(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.test_files_dir = os.path.join(os.path.dirname(__file__), "test_adot_files")
def setUp(self):
self.parser = Parser()
def test_sequential_graph(self):
"""Тест последовательного графа"""
graph = self.parser.parse_file(os.path.join(self.test_files_dir, "sequential.adot"))
self.assertIsInstance(graph, Graph)
# Проверка структуры
init_state = graph.init_state
self.assertEqual(len(init_state.transfers), 1)
# Проверка выполнения
data = {"a": 1}
result = graph.run(data)
self.assertTrue(result)
self.assertEqual(data["a"], 4) # 1 -> (inc) 2 -> (double) 4
def test_cycled_graph(self):
"""Тест циклического графа с уменьшением a до 0"""
graph = self.parser.parse_file(os.path.join(self.test_files_dir, "cycled.adot"))
self.assertIsInstance(graph, Graph)
# Проверка выполнения
data = {"a": 5} # Начинаем с 5
result = graph.run(data)
self.assertTrue(result)
self.assertEqual(data["a"], 0,
"Граф должен уменьшать a до 0")
# Дополнительная проверка с другим начальным значением
data = {"a": 10}
result = graph.run(data)
self.assertTrue(result)
self.assertEqual(data["a"], 0)
def test_branching_graph(self):
"""Тест графа с ветвлениями"""
graph = self.parser.parse_file(os.path.join(self.test_files_dir, "branching.adot"))
self.assertIsInstance(graph, Graph)
data = {"a": 5, "b": 5}
result = graph.run(data)
self.assertTrue(result)
self.assertEqual(data["a"], 10) # 5 * 2 = 10
self.assertEqual(data["b"], 15)
data = {"a": 1, "b": 3}
result = graph.run(data)
self.assertTrue(result)
self.assertEqual(data["b"], 13)
self.assertEqual(data["a"], 2)
def test_subgraph_integration(self):
"""Тест интеграции подграфов"""
graph = self.parser.parse_file(os.path.join(self.test_files_dir, "main_graph.adot"))
self.assertIsInstance(graph, Graph)
data = {'input': 5}
result = graph.run(data)
print(data)
self.assertTrue(result)
self.assertEqual(data['value'], 10) # 5 * 2 = 10
self.assertTrue(data['initialized'])
self.assertTrue(data['valid'])
self.assertTrue(data['processed'])
self.assertTrue(data['saved'])
self.assertTrue(data['clean'])
self.assertEqual(data['result'], 10)
def test_edge_types(self):
"""Тест разных типов ребер (-> и =>)"""
graph = self.parser.parse_file(os.path.join(self.test_files_dir, "edge_types.adot"))
self.assertIsInstance(graph, Graph)
# Проверка структуры графа
start_node = graph.init_state.transfers[0].output_state
self.assertEqual(len(start_node.transfers), 2, "Должно быть 2 исходящих ребра")
# Проверка типов ребер
edge_simple = start_node.transfers[0].edge
edge_parallel = start_node.transfers[1].edge
self.assertEqual(edge_simple.order, 0, "Обычное ребро должно иметь order=0")
self.assertEqual(edge_parallel.order, 1, "Параллельное ребро должно иметь order=1")
def test_attributes_parsing(self):
"""Проверка парсинга атрибутов узлов и ребер"""
graph = self.parser.parse_file(os.path.join(self.test_files_dir, "attributes.adot"))
# Проверка атрибутов состояния NODE1
node1 = graph.init_state.transfers[0].output_state
self.assertEqual(node1.selector.name, "branch_selector")
#self.assertEqual(node1.parallelization_policy.__class__.__name__, "ThreadParallelizationPolicy")
#self.assertEqual(node1.comment, "Первый узел с атрибутами")
# Проверка атрибутов состояния NODE2
node2 = node1.transfers[0].output_state
#self.assertEqual(node2.comment, "Второй узел с подграфом")
self.assertTrue(hasattr(node2, '_proxy_state'), "Должен содержать подграф")
# Проверка атрибутов ребра EDGE1
edge1 = graph.init_state.transfers[0].edge
self.assertEqual(edge1.pred_f.name, "nonzero_predicate")
self.assertEqual(edge1.morph_f.name, "increment_a_edge")
self.assertEqual(edge1.comment, "Первое ребро с полным набором атрибутов")
#self.assertEqual(edge1.order, 1) # Проверка edge_index
# Проверка атрибутов ребра EDGE2
edge2 = node1.transfers[0].edge
#self.assertEqual(len(edge2.morph_fs), 2) # Проверка morphisms
#self.assertEqual(edge2.morph_fs[0].name, "increment_a_edge")
#self.assertEqual(edge2.morph_fs[1].name, "increment_b_edge")
# Проверка загрузки подграфа
subgraph = node2._proxy_state
#self.assertIsInstance(subgraph, Graph)
#self.assertEqual(subgraph.init_state.name, "__BEGIN__")
class TestGraphExecution(unittest.TestCase):
def test_parallel_execution(self):
"""Тест параллельного выполнения"""
# Здесь будут тесты реального параллельного выполнения
# с использованием threading/multiprocessing
pass
if __name__ == '__main__':
unittest.main()
\ No newline at end of file
digraph Test {
__BEGIN__ -> NODE_1
NODE_1 [
selector=MY_SELECTOR,
subgraph="subgraph.adot",
parallelism=threading,
comment="Test node with attributes"
]
NODE_1 -> __END__
}
\ No newline at end of file
import gc
import unittest
import subprocess
......@@ -7,18 +8,31 @@ from comsdk.parser import Parser
path_to_comsdk = "/home/lbstr/bmstu/comsdk"
path_to_pycomsdk = "/home/lbstr/bmstu/pycomsdk"
class ParserGoodCheck(unittest.TestCase):
def test_trivial_graph(self):
parsr = Parser()
gr = parsr.parse_file("./tests/adot/trivial.adot")
# Проверка загрузки файла
self.assertTrue(os.path.exists("./adot/trivial_v2.adot"))
gr = parsr.parse_file("./adot/trivial_v2.adot")
print(gr)
# Основной сценарий
data = {"a": 1}
gr.run(data)
self.assertEqual(data["a"], 4)
# Проверка повторного использования
data = {"a": 4}
gr.run(data)
self.assertEqual(data["a"], 7)
def test_branching_graph(self):
parsr = Parser()
gr = parsr.parse_file("./tests/adot/branching.adot")
gr = parsr.parse_file("./adot/branching.adot")
data = {"a": 1, "b": 1}
gr.run(data)
self.assertEqual(data["a"], 4)
......@@ -26,14 +40,14 @@ class ParserGoodCheck(unittest.TestCase):
def test_cycled_graph(self):
parsr = Parser()
gr = parsr.parse_file("./tests/adot/cycled.adot")
gr = parsr.parse_file("./adot/cycled.adot")
data = {"a": 10}
gr.run(data)
self.assertEqual(data["a"], 0)
def test_complex_graph(self):
parsr = Parser()
gr = parsr.parse_file("./tests/adot/complex.adot")
gr = parsr.parse_file("./adot/complex.adot")
data = {"a": 1, "b": 1}
gr.run(data)
self.assertEqual(data["a"], 4)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment