Commit c9658e69 authored by Anton Pershin's avatar Anton Pershin

Добавлены файлы реализации и тестов графов

parent 81528781
import collections
class Graph:
def __init__(self, init_state):
self._init_state = init_state
self._initialized = False
def run(self, data):
if not self._initialized:
self._init_state.idle_run([self._init_state.name], initialize_state=True)
self._initialized = True
cur_state = self._init_state
while cur_state is not None:
morph = _run_state(cur_state, data)
if '__EXCEPTION__' in data:
return data, False
cur_state = morph(data)
if '__EXCEPTION__' in data:
return data, False
return data, True
class State:
__slots__ = [
'name',
'input_edges_number',
'looped_edges_number',
'activated_input_edges_number',
'output_edges',
'parallelization_policy',
'parallel_branches_selection_policy',
'_branching_states_history',
]
def __init__(self, name,
parallelization_policy=None,
parallel_branches_selection_policy=None,
):
self.name = name
self.parallelization_policy = parallelization_policy
self.parallel_branches_selection_policy = parallel_branches_selection_policy
self.input_edges_number = 0
self.looped_edges_number = 0
self.activated_input_edges_number = 0
self.output_edges = []
self._branching_states_history = None
def idle_run(self, branching_states_history, initialize_state=False):
# print('{} {} -> '.format(self.name, branching_states_history), end='')
if initialize_state:
self.input_edges_number += 1
if self.input_edges_number != 1:
if self._is_looped_branch(branching_states_history):
# print('Looping found')
self.looped_edges_number += 1
# else:
# print('Branches joint found')
# print('\tStop going further')
return # no need to go further if we already were there
if self._branching_states_history is None:
self._branching_states_history = branching_states_history
else:
self.activated_input_edges_number += 1 # BUG: here we need to choose somehow whether we proceed or not
# if len(self.output_edges) == 0:
# print('Terminate state found')
if len(self.output_edges) == 1:
self.output_edges[0].identity().idle_run(branching_states_history, initialize_state)
else:
for i, edge in enumerate(self.output_edges):
next_state = edge.identity()
next_state.idle_run(branching_states_history + [next_state.name], initialize_state)
def connect_to(self, term_state, edge):
edge.set_output_state(term_state)
self.output_edges.append(edge)
def run(self, data):
# print(self.name, data['a'])
self.activated_input_edges_number += 1
if not self._ready_to_morph():
return None # it means that this state waits for some incoming edges (it is a point of collision of several edges)
self._reset_activity()
if len(self.output_edges) == 0:
return morphism_to_termination
predicate_values = []
for edge in self.output_edges:
predicate_values.append(edge.predicate(data))
selected_edge_indices = self.parallel_branches_selection_policy.select(predicate_values)
if not selected_edge_indices:
raise GraphUnexpectedTermination(
'State {}: Predicate values {} do not conform selection policy'.format(self.name, predicate_values))
selected_edges = [self.output_edges[i] for i in selected_edge_indices]
return self.parallelization_policy.make_morphism(selected_edges)
def _ready_to_morph(self):
required_activated_input_edges_number = self.input_edges_number - self.looped_edges_number
#print(self.input_edges_number, self.looped_edges_number)
return self.activated_input_edges_number == required_activated_input_edges_number
def _reset_activity(self):
self.activated_input_edges_number = 0
def _is_looped_branch(self, branching_states_history):
return set(self._branching_states_history).issubset(branching_states_history)
class Edge:
__slots__ = [
'_predicate',
'_morphism',
'_output_state',
]
def __init__(self, predicate, morphism, output_state=None):
self._predicate = predicate
self._morphism = morphism
self._output_state = output_state
def set_output_state(self, output_state):
self._output_state = output_state
def predicate(self, data):
return self._predicate(data)
def morph(self, data):
self._morphism(data)
return self._output_state
def identity(self):
return self._output_state
def morphism_to_termination(data):
return None
class SerialParallelizationPolicy:
# def __init__(self, data):
# self.data = data
def __init__(self):
pass
def make_morphism(self, edges):
def _morph(data):
next_morphisms = [edge.morph for edge in edges]
cur_morphisms = []
while len(next_morphisms) != 1:
cur_morphisms[:] = next_morphisms[:]
del next_morphisms[:]
for morph in cur_morphisms:
next_state = morph(data)
if next_state is None:
return None
next_morphism = _run_state(next_state, data)
if '__EXCEPTION__' in data:
return None
if next_morphism is not None:
next_morphisms.append(next_morphism)
next_state = next_morphisms[0](data)
return next_state
return _morph
class OnlyOneSelectionPolicy:
def __init__(self):
pass
def select(self, predicate_values):
trues_indices = _get_trues(predicate_values)
if len(trues_indices) != 1:
return None
return trues_indices
class AllSelectionPolicy:
def __init__(self):
pass
def select(self, predicate_values):
trues_indices = _get_trues(predicate_values)
if len(trues_indices) != len(predicate_values):
return None
return trues_indices
class BadGraphStructure(Exception):
pass
class GraphUnexpectedTermination(Exception):
pass
def _get_trues(boolean_list):
return [i for i, val in enumerate(boolean_list) if val == True]
def _run_state(state, data):
try:
next_morphism = state.run(data)
except GraphUnexpectedTermination as e:
data['__EXCEPTION__'] = str(e)
return None
return next_morphism
\ No newline at end of file
import unittest
from resappserver.graph import *
def dummy_edge(data):
pass
def increment_a_edge(data):
data['a'] += 1
def increment_b_edge(data):
data['b'] += 1
def decrement_a_edge(data):
data['a'] -= 1
def dummy_predicate(data):
return True
def nonzero_predicate(data):
return True if data['a'] != 0 else False
def positiveness_predicate(data):
return True if data['a'] > 0 else False
def nonpositiveness_predicate(data):
return True if data['a'] <= 0 else False
def print_exception(exc_data, data):
print('exception data: {}'.format(exc_data))
print('current state of data: {}'.format(data))
class GraphGoodCheck(unittest.TestCase):
initial_conditions = range(-10, 10)
def test_trivial_serial_graph(self):
initial_state, term_state, correct_outputs = self._get_trivial_serial_graph([{'a': ic} for ic in self.initial_conditions])
self._run_graph(initial_state, ('a',), (-1, 0), correct_outputs)
def test_trivial_parallel_graph(self):
initial_state, term_state, correct_outputs = self._get_trivial_parallel_graph([{'a': ic, 'b': ic} for ic in self.initial_conditions])
self._run_graph(initial_state, ('a', 'b'), (-1, 0), correct_outputs)
def test_trivial_cycled_graph(self):
initial_state, term_state, correct_outputs = self._get_trivial_cycled_graph([{'a': ic} for ic in self.initial_conditions])
self._run_graph(initial_state, ('a',), (), correct_outputs)
def test_complex_graph_made_from_trivial_ones(self):
'''
serial graph + parallel graph + cycled graph
'''
s_1, s_2, correct_outputs = self._get_trivial_serial_graph([{'a': ic, 'b': ic} for ic in self.initial_conditions])
s_3, s_4, correct_outputs = self._get_trivial_parallel_graph(correct_outputs)
s_5, s_6, correct_outputs = self._get_trivial_cycled_graph(correct_outputs)
s_2.connect_to(s_3, edge=Edge(dummy_predicate, dummy_edge))
s_4.connect_to(s_5, edge=Edge(dummy_predicate, dummy_edge))
self._run_graph(s_1, ('a', 'b'), (-3, -2, -1, 0), correct_outputs)
def _get_trivial_serial_graph(self, initial_conditions):
'''
s_1 -> s_2 -> s_3,
p_12 = p_23 := a not 0
f_12 = f_23 := a + 1
'''
spp = SerialParallelizationPolicy()
oosp = OnlyOneSelectionPolicy()
s_1 = State('serial_s_1', parallelization_policy=spp,
parallel_branches_selection_policy=oosp)
s_2 = State('serial_s_2', parallelization_policy=spp,
parallel_branches_selection_policy=oosp)
s_3 = State('serial_s_3', parallelization_policy=spp,
parallel_branches_selection_policy=oosp)
s_1.connect_to(s_2, edge=Edge(nonzero_predicate, increment_a_edge))
s_2.connect_to(s_3, edge=Edge(nonzero_predicate, increment_a_edge))
#correct_outputs = [{'a': ic + 2} for ic in initial_conditions]
correct_outputs = []
for ic in initial_conditions:
ic['a'] += 2
correct_outputs.append(ic)
return s_1, s_3, correct_outputs
def _get_trivial_parallel_graph(self, initial_conditions):
'''
s_1 -> s_2 -> s_4
-> s_3 ->
p_12 = p_24 = p_13 = p_34 := a not 0
f_12 = f_24 := a + 1
f_13 = f_34 := b + 1
'''
spp = SerialParallelizationPolicy()
oosp = OnlyOneSelectionPolicy()
asp = AllSelectionPolicy()
s_1 = State('parallel_s_1', parallelization_policy=spp,
parallel_branches_selection_policy=asp)
s_2 = State('parallel_s_2', parallelization_policy=spp,
parallel_branches_selection_policy=oosp)
s_3 = State('parallel_s_3', parallelization_policy=spp,
parallel_branches_selection_policy=oosp)
s_4 = State('parallel_s_4', parallelization_policy=spp,
parallel_branches_selection_policy=oosp)
s_1.connect_to(s_2, edge=Edge(nonzero_predicate, increment_a_edge))
s_2.connect_to(s_4, edge=Edge(nonzero_predicate, increment_a_edge))
s_1.connect_to(s_3, edge=Edge(nonzero_predicate, increment_b_edge))
s_3.connect_to(s_4, edge=Edge(nonzero_predicate, increment_b_edge))
#correct_outputs = [{'a': ic + 2, 'b': ic + 2} for ic in self.initial_conditions]
correct_outputs = []
for ic in initial_conditions:
ic['a'] += 2
ic['b'] += 2
correct_outputs.append(ic)
return s_1, s_4, correct_outputs
def _get_trivial_cycled_graph(self, initial_conditions):
'''
s_1 -> s_2 -> s_3
<-
p_12 := True
p_23 := a > 0
p_23 := a <= 0
f_12 = f_23 = f_24 := a + 1
'''
spp = SerialParallelizationPolicy()
oosp = OnlyOneSelectionPolicy()
s_1 = State('cycled_s_1', parallelization_policy=spp,
parallel_branches_selection_policy=oosp)
s_2 = State('cycled_s_2', parallelization_policy=spp,
parallel_branches_selection_policy=oosp)
s_3 = State('cycled_s_3', parallelization_policy=spp,
parallel_branches_selection_policy=oosp)
s_1.connect_to(s_2, edge=Edge(dummy_predicate, increment_a_edge))
s_2.connect_to(s_3, edge=Edge(positiveness_predicate, increment_a_edge))
s_2.connect_to(s_1, edge=Edge(nonpositiveness_predicate, increment_a_edge))
# correct_outputs = [{'a': ic + 2} if ic >=0 else {'a': ic%2 + 2} for ic in self.initial_conditions]
correct_outputs = []
for ic in initial_conditions:
if ic['a'] >= 0:
ic['a'] += 2
else:
ic['a'] = ic['a']%2 + 2
correct_outputs.append(ic)
return s_1, s_3, correct_outputs
def _run_graph(self, initial_state, vars_to_initialize, invalid_ics, correct_outputs):
graph = Graph(initial_state)
for ic, correct_output in zip(self.initial_conditions, correct_outputs):
print('Doing ic = {}...'.format(ic))
gotten_output, okay = graph.run({var: ic for var in vars_to_initialize})
if ic in invalid_ics:
print(gotten_output['__EXCEPTION__'])
self.assertEqual('__EXCEPTION__' in gotten_output, True)
self.assertEqual(okay, False)
else:
self.assertEqual(okay, True)
self.assertEqual(gotten_output, correct_output)
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment