"""The reference implementation of the MDF execution engine; allows for executing :class:`~modeci.mdf.Graph`
objects in Python.
This module implements a set of classes for executing loaded MDF models in Python.
The implementation is organized such that each class present in :mod:`~modeci_mdf.mdf`
has a corresponding :code:`Evaluable` version of the class. Each of these classes implements
the execution of these components and tracks their state during execution. The organization of the entire execution of
the model is implemented at the top-level :func:`~modeci_mdf.execution_engine.EvaluableGraph.evaluate` method
of the :class:`EvaluableGraph` class. The external library `graph-scheduler
<https://pypi.org/project/graph-scheduler/>`_ is used to implement the scheduling of nodes under declarative
conditional constraints.
"""
import ast
import builtins
import copy
import functools
import inspect
import itertools
import os
import re
import sys
import math
import attr
import numpy as np
import graph_scheduler
import onnxruntime
from modeci_mdf.functions.standard import mdf_functions, create_python_expression
from modelspec.utils import evaluate as evaluate_params_modelspec
from modelspec.utils import _params_info, _val_info
from modelspec.utils import FORMAT_NUMPY
from collections import OrderedDict
from typing import Union, List, Dict, Optional, Any, Tuple
from modeci_mdf.mdf import (
Function,
Graph,
Condition,
Edge,
OutputPort,
InputPort,
Node,
Parameter,
)
import modeci_mdf.functions.onnx as onnx_ops
import modeci_mdf.functions.actr as actr_funcs
import modeci_mdf.functions.ddm as ddm_funcs
FORMAT_DEFAULT = FORMAT_NUMPY
KNOWN_PARAMETERS = ["constant", "math", "numpy"] + dir(builtins)
time_scale_str_regex = r"(TimeScale)?\.(.*)"
[docs]def evaluate_expr(
expr: Union[str, List[str], np.ndarray, "tf.tensor"] = None,
func_params: Dict[str, Any] = None,
array_format: str = FORMAT_DEFAULT,
allow_strings_returned: Optional[bool] = False,
verbose: Optional[bool] = False,
) -> np.ndarray:
"""Evaluates an expression given in string format and a :code:`dict` of parameters.
Args:
expr: Expression or list of expressions to be evaluated
func_params: A dict of parameters (e.g. :code:`{'weight': 2}`)
array_format: It can be a n-dimensional array or a tensor
allow_strings_returned: Don't throw an error if the expression evaluates to a string
verbose: If set to True provides in-depth information else verbose message is not displayed
Returns:
n-dimensional array
"""
e = evaluate_params_modelspec(
expr, func_params, array_format=array_format, verbose=verbose
)
if type(e) == str and e not in KNOWN_PARAMETERS and not allow_strings_returned:
raise Exception(
"Error! Could not evaluate expression [%s] with params %s, returned [%s] which is a %s"
% (expr, _params_info(func_params, multiline=True), e, type(e))
)
return e
[docs]def evaluate_onnx_expr(
expr: str,
base_parameters: Dict[str, Any],
evaluated_parameters: Dict,
verbose: bool = False,
) -> Any:
"""Evaluates a simple expression in string format representing an
ONNX function call
Args:
expr (str): Expression to be evaluated
base_parameters (Dict[str, Any]): A dict of parameters that may contain variables
evaluated_parameters (Dict): A dict mapping variables used in **base_parameters** to actual values
verbose (bool, optional): If set to True provides in-depth information else verbose message is not displayed. Defaults to False.
Returns:
Any: the return value of **expr**
"""
# Get the ONNX function
onnx_name = expr.split("(")[0].split(".")[-1]
onnx_function = getattr(onnx_ops, onnx_name)
onnx_schema = onnx_ops.get_onnx_schema(onnx_name)
onnx_arguments = set(
list(onnx_schema.attributes.keys()) + [i.name for i in onnx_schema.inputs]
)
# used to attempt to match inputs to expected onnx input types
onnx_typecast_mappings = {
onnx_schema.AttrType.INT: int,
onnx_schema.AttrType.FLOAT: float,
onnx_schema.AttrType.STRING: str,
onnx_schema.AttrType.INTS: functools.partial(np.array, dtype=int),
onnx_schema.AttrType.FLOATS: functools.partial(np.array, dtype=float),
onnx_schema.AttrType.STRINGS: functools.partial(np.array, dtype=str),
# TODO: add tensor and graph types?
}
try:
has_variadic = (
onnx_schema.inputs[0].option == onnx_schema.FormalParameterOption.Variadic
)
except IndexError:
has_variadic = False
# ONNX functions expect input args or kwargs first, followed by parameters (called attributes in ONNX) as
# kwargs. Lets construct this.
kwargs_for_onnx = {}
for kw, arg_expr in base_parameters.items():
if isinstance(arg_expr, str):
arg_expr_list = get_required_variables_from_expression(arg_expr)
for a in arg_expr_list:
try:
kwargs_for_onnx[a] = evaluated_parameters[a]
except KeyError:
pass
try:
if arg_expr[0] == "[" and arg_expr[-1] == "]":
# matches previous behavior
continue
except IndexError:
pass
kwargs_for_onnx[kw] = evaluated_parameters[kw]
kwargs_for_onnx = {
k: v
for k, v in kwargs_for_onnx.items()
if (
(k in onnx_arguments or has_variadic)
and "onnx_" not in k # filter Evaluable__ class names
)
}
# attempt to cast attributes to what onnx_function expects
for k, v in kwargs_for_onnx.items():
try:
onnx_attr = onnx_schema.attributes[k]
except KeyError:
continue
try:
cast_type = onnx_typecast_mappings[onnx_attr.type]
except KeyError:
continue
try:
kwargs_for_onnx[k] = cast_type(v)
except (TypeError, ValueError):
pass
if verbose:
print(f"Evaluating ONNX function {onnx_name} with {kwargs_for_onnx}")
try:
result = onnx_function(**kwargs_for_onnx)
except (
onnxruntime.capi.onnxruntime_pybind11_state.NotImplemented,
onnxruntime.capi.onnxruntime_pybind11_state.Fail,
) as e:
err = str(e)
if (
"bound to different types (tensor(double) and tensor(float)" not in err
and "Could not find an implementation for the node" not in err
):
raise
# assume this is related to lack of support for float64/double
# for Cos, Relu (and likely others) on onnx CPUExecutionProvider
result = onnx_function(
**{
k: v.astype(np.float32)
if hasattr(v, "dtype") and v.dtype == np.float64
else v
for k, v in kwargs_for_onnx.items()
}
)
try:
if result.dtype == np.float32:
result = result.astype(np.float64)
except AttributeError:
pass
return result
[docs]def get_required_variables_from_expression(expr: str) -> List[str]:
"""Produces a list containing variable symbols in **expr**"""
def recursively_extract_subscripted_values(s):
res = []
subscript_indices = []
depth = 0
len_s = len(s)
for i in range(len_s):
if s[i] == "[":
if depth == 0:
subscript_indices.append([i, None])
depth += 1
if s[i] == "]":
depth -= 1
if depth == 0:
subscript_indices[-1][1] = i
# s contains no subscripts, so it won't be added in below loop
if len(subscript_indices) == 0 and len_s > 0:
res.append(s)
last = 0
for start, end in subscript_indices:
if end is None:
end = len_s
res.extend(recursively_extract_subscripted_values(s[start + 1 : end]))
# add expression being subscripted
if last != start:
res.append(s[last:start])
last = end + 1
return res
if not isinstance(expr, str):
return []
result = []
for e in recursively_extract_subscripted_values(expr):
result.extend(
[
str(elem.id)
for elem in ast.walk(
ast.parse(e.strip(" ,+-*/%^&").lstrip("])").rstrip("[("))
)
if isinstance(elem, ast.Name)
]
)
return result
[docs]class EvaluableFunction:
"""Evaluates a :class:`~modeci_mdf.mdf.Function` value during MDF graph execution.
Args:
function: :func:`~modeci_mdf.mdf.Function` to be evaluated e.g. mdf standard function
verbose: If set to True Provides in-depth information else verbose message is not displayed
"""
def __init__(self, function: Function = False, verbose: Optional[bool] = False):
self.verbose = verbose
self.function = function
[docs] def evaluate(
self,
parameters: Dict[str, Any] = None,
array_format: str = FORMAT_DEFAULT,
) -> Dict[str, Any]:
r"""Performs evaluation on the basis of given parameters and array_format
Args:
parameters: A dictionary of function parameters,e.g.logistic, parameters={'gain': 2,"bias": 3,"offset": 1}
array_format: It can be a n-dimensional array or a tensor
Returns:
value of function after evaluation in Dictionary
"""
expr = None
# print("functions value and function>>>", self.function.value, self.function.function)
# func_val = self.function.value
if self.function.function:
for f in mdf_functions:
if f == self.function.function:
expr = create_python_expression(
mdf_functions[f]["expression_string"]
)
break
if expr is None:
expr = self.function.value
# #raise "Unknown function: {}. Known functions: {}".format(
# # self.function.function,
# # mdf_functions.keys,
# #)
func_params = {}
func_params.update(parameters)
if self.verbose:
print(
" Evaluating %s with %s, i.e. [%s]"
% (self.function, _params_info(func_params), expr)
)
if self.function.args is not None:
for arg in self.function.args:
func_params[arg] = evaluate_expr(
self.function.args[arg],
func_params,
verbose=False,
array_format=array_format,
)
if self.verbose:
print(
" Arg: {} became: {}".format(
arg, _val_info(func_params[arg])
)
)
# If this is an ONNX operation, evaluate it without modelspec.
if "onnx_ops." in expr:
if self.verbose:
print(f"{self.function.id} is evaluating ONNX function {expr}")
self.curr_value = evaluate_onnx_expr(
expr,
# parameters get overridden by self.function.args
{**parameters, **self.function.args},
func_params,
self.verbose,
)
elif "actr." in expr:
actr_function = getattr(actr_funcs, expr.split("(")[0].split(".")[-1])
self.curr_value = actr_function(
*[func_params[arg] for arg in self.function.args]
)
elif "ddm." in expr:
actr_function = getattr(ddm_funcs, expr.split("(")[0].split(".")[-1])
self.curr_value = ddm_function(
*[func_params[arg] for arg in self.function.args]
)
else:
self.curr_value = evaluate_expr(
expr, func_params, verbose=self.verbose, array_format=array_format
)
if self.verbose:
print(
" Evaluated %s with %s =\t%s"
% (self.function, _params_info(func_params), _val_info(self.curr_value))
)
return self.curr_value
[docs]class EvaluableParameter:
"""
Evaluates the current value of a :class:`~modeci_mdf.mdf.Parameter` during the MDF graph execution.
Args:
parameter: The parameter to evaluate during execution.
verbose: Whether to print output of parameter calculations.
"""
DEFAULT_INIT_VALUE = 0.0 # Temporary!
def __init__(self, parameter: Parameter, verbose: bool = False):
self.verbose = verbose
self.parameter = parameter
self.curr_value = None
[docs] def get_current_value(
self, parameters: Dict[str, Any], array_format: str = FORMAT_DEFAULT
) -> Any:
"""
Get the current value of the parameter; evaluates the expression if the current value has not yet been set. Note:
this is different from :code:`'evaluate'`, as calling that method multiple times can change the state of the parameter,
but calling this should not reevaluate the parameter if it has a current value.
Args:
parameters: a dictionary of parameters and their values that may or may not be needed to evaluate this
parameter.
array_format: The array format to use (either :code:`'numpy'` or :code:`tensorflow'`).
Returns:
The evaluated value of the parameter.
"""
# FIXME: Shouldn't this just call self.evaluate, seems like there is redundant code here?
if self.curr_value is None:
if (
self.parameter.value is not None
or self.parameter.default_initial_value is not None
):
if self.parameter.is_stateful():
if self.verbose:
print(f" Initial eval of <{self.parameter.summary()}> ")
if self.parameter.default_initial_value is not None:
return evaluate_expr(
self.parameter.default_initial_value,
parameters,
verbose=self.verbose,
array_format=array_format,
)
else:
return self.DEFAULT_INIT_VALUE
else:
ips = {}
ips.update(parameters)
ips[self.parameter.id] = self.DEFAULT_INIT_VALUE
self.curr_value = evaluate_expr(
self.parameter.value,
ips,
verbose=self.verbose,
array_format=array_format,
)
if self.verbose:
print(
" Initial eval of <{}> = {} ".format(
self.parameter, self.curr_value
)
)
return self.curr_value
[docs] def evaluate(
self,
parameters: Dict[str, Any],
time_increment: Optional[float] = None,
array_format: str = FORMAT_DEFAULT,
) -> Any:
"""
Evaluate the parameter and store the result in the :code:`curr_value` attribute.
Args:
parameters: a dictionary of parameters and their values that may or may not be needed to evaluate this
parameter.
time_increment: a floating point value specifying the timestep size, only used for :code:`time_derivative`
parameters
array_format: The array format to use (either :code:`'numpy'` or :code:`tensorflow'`).
Returns:
The current value of the parameter.
"""
if self.verbose:
print(
" Evaluating {} with {} ".format(
self.parameter.summary(), _params_info(parameters)
)
)
if self.parameter.value is not None:
self.curr_value = evaluate_expr(
self.parameter.value,
parameters,
verbose=False,
array_format=array_format,
)
elif self.parameter.function:
expr = None
for f in mdf_functions:
if self.parameter.function == f:
expr = create_python_expression(
mdf_functions[f]["expression_string"]
)
if not expr:
expr = self.parameter.function
# raise "Unknown function: {}. Known functions: {}".format(
# self.parameter.function,
# mdf_functions.keys,
# )
func_params = {}
func_params.update(parameters)
if self.verbose:
print(
" Evaluating %s with %s, i.e. [%s]"
% (self.parameter, _params_info(func_params), expr)
)
for arg in self.parameter.args:
func_params[arg] = evaluate_expr(
self.parameter.args[arg],
func_params,
verbose=False,
array_format=array_format,
)
if self.verbose:
print(
" Arg: {} became: {}".format(
arg, _val_info(func_params[arg])
)
)
# If this is an ONNX operation, evaluate it without modelspec.
if "onnx_ops." in expr:
if self.verbose:
print(f"{self.parameter.id} is evaluating ONNX function {expr}")
self.curr_value = evaluate_onnx_expr(
expr,
# parameters get overridden by self.parameter.args
{**parameters, **self.parameter.args},
func_params,
self.verbose,
)
elif "actr." in expr:
actr_function = getattr(actr_funcs, expr.split("(")[0].split(".")[-1])
self.curr_value = actr_function(
*[func_params[arg] for arg in self.parameter.args]
)
else:
self.curr_value = evaluate_expr(
expr,
func_params,
verbose=self.verbose,
array_format=array_format,
)
elif self.parameter.time_derivative is not None:
if time_increment == None:
self.curr_value = evaluate_expr(
self.parameter.default_initial_value,
parameters,
verbose=self.verbose,
array_format=array_format,
)
else:
td = evaluate_expr(
self.parameter.time_derivative,
parameters,
verbose=self.verbose,
array_format=array_format,
)
if self.verbose:
print(
f"Incrementing {self.parameter.id} from {self.curr_value} by {td} over time {time_increment}"
)
self.curr_value = np.add(
self.curr_value, td * time_increment, casting="safe"
)
cond_mask = None
val_if_true = None
if len(self.parameter.conditions) > 0:
for condition in self.parameter.conditions:
cond_mask = evaluate_expr(
condition.test,
parameters,
verbose=False,
array_format=array_format,
)
val_if_true = evaluate_expr(
condition.value,
parameters,
verbose=False,
array_format=array_format,
)
if self.verbose:
print(
" --- Condition: %s: %s = %s: true? %s"
% (condition.id, condition.test, val_if_true, cond_mask)
)
# e.g. if the parameter value is set only by a set of conditions...
if self.curr_value is None:
self.curr_value = self.DEFAULT_INIT_VALUE
self.curr_value = np.where(cond_mask, val_if_true, self.curr_value)
if self.verbose:
print(
" Evaluated this: %s with %s \n =\t%s"
% (
self.parameter.summary(),
_params_info(parameters),
_val_info(self.curr_value),
)
)
return self.curr_value
[docs]class EvaluableOutput:
r"""Evaluates the current value of an :class:`~modeci_mdf.mdf.OutputPort` during MDF graph execution.
Args:
output_port: Attribute of a Node which exports information to the dependent Node object
verbose: If set to True Provides in-depth information else verbose message is not displayed
"""
def __init__(self, output_port: OutputPort, verbose: Optional[bool] = False):
self.verbose = verbose
self.output_port = output_port
self.curr_value = None
[docs] def evaluate(
self,
parameters: Dict[str, Any] = None,
array_format: str = FORMAT_DEFAULT,
) -> Union[int, np.ndarray]:
"""Evaluate the value at the output port on the basis of parameters and array_format
Args:
parameters: Dictionary of global parameters of the Output Port
array_format: It is a n-dimensional array
Returns:
value at output port
"""
if self.verbose:
print(
" Evaluating %s with %s "
% (self.output_port, _params_info(parameters))
)
self.curr_value = evaluate_expr(
self.output_port.value, parameters, verbose=False, array_format=array_format
)
if self.verbose:
print(
" Evaluated %s with %s \n =\t%s"
% (
self.output_port,
_params_info(parameters),
_val_info(self.curr_value),
)
)
return self.curr_value
[docs]class EvaluableNode:
r"""Evaluates a :class:`~modeci_mdf.mdf.Node` during MDF graph execution.
Args:
node: A self contained unit of evaluation receiving input from other :class:`~modeci_mdf.mdf.Node`\(s) on
:class:`~modeci_mdf.mdf.InputPort`\(s).
verbose: If set to True Provides in-depth information else verbose message is not displayed
"""
def __init__(self, node: Node, verbose: Optional[bool] = False):
self.verbose = verbose
self.node = node
self.evaluable_inputs = {}
self.evaluable_parameters = OrderedDict()
self.evaluable_functions = OrderedDict()
self.evaluable_outputs = {}
all_known_vars = []
all_known_vars += KNOWN_PARAMETERS
for ip in node.input_ports:
rip = EvaluableInput(ip, self.verbose)
self.evaluable_inputs[ip.id] = rip
all_known_vars.append(ip.id)
# params_init[ip] = ip.curr_value
for p in node.parameters:
all_known_vars.append(p.id)
"""
for p in node.parameters:
ep = EvaluableParameter(p, self.verbose)
self.evaluable_parameters[p.id] = ep
all_known_vars.append(p.id)
# params_init[s] = s.curr_value"""
# TODO: the below checks for evaluability of functions and
# parameters using known variables are very similar and could be
# simplified with a function
all_funcs = [f for f in node.functions]
num_funcs_remaining = {f.id: None for f in node.functions}
func_missing_vars = {f.id: [] for f in node.functions}
# Order the functions into the correct sequence
while len(all_funcs) > 0:
f = all_funcs.pop(0) # pop first off list
if verbose:
print(
" Checking whether function: %s with args %s is sufficiently determined by known vars %s"
% (f.id, f.args, all_known_vars)
)
all_req_vars = []
if f.args:
for arg in f.args:
arg_expr = f.args[arg]
# If we are dealing with a list of symbols, each must treated separately
all_req_vars.extend(
[
v
for v in get_required_variables_from_expression(arg_expr)
if v not in f.args
]
)
if f.value is not None:
all_req_vars.extend(
[
v
for v in get_required_variables_from_expression(f.value)
if f.args is None or v not in f.args
]
)
all_present = [v in all_known_vars for v in all_req_vars]
func_missing_vars[f.id] = {
v for v in all_req_vars if v not in all_known_vars
}
if verbose:
print(
" Are all of %s in %s? %s"
% (all_req_vars, all_known_vars, all_present)
)
if all(all_present):
rf = EvaluableFunction(f, self.verbose)
self.evaluable_functions[f.id] = rf
all_known_vars.append(f.id)
# params_init[f] = self.evaluable_functions[f.id].evaluate(
# params_init, array_format=FORMAT_DEFAULT
# )
else:
# track the number of remaining functions each time f
# is examined. If it's the same as last time, we know
# every function was examined and nothing changed, so
# we can stop because otherwise it will just infinitely
# loop
if num_funcs_remaining[f.id] == len(all_funcs):
func_missing_vars = {
f: ", ".join(v) for f, v in func_missing_vars.items()
}
raise ValueError(
"Error! Could not evaluate functions using known vars. The following vars are missing:\n\t"
+ "\n\t".join(
f"{f}: {v}"
for f, v in func_missing_vars.items()
if len(v) > 0
)
)
else:
num_funcs_remaining[f.id] = len(all_funcs)
all_funcs.append(f)
all_params_to_check = [p for p in node.parameters]
num_params_remaining = {p.id: None for p in node.parameters}
param_missing_vars = {f.id: [] for f in node.parameters}
if self.verbose:
print("all_params_to_check: %s" % all_params_to_check)
# Order the parameters into the correct sequence
while len(all_params_to_check) > 0:
p = all_params_to_check.pop(0) # pop first off list
if verbose:
print(
" Checking whether parameter: %s with args: %s, value: %s (%s) is sufficiently determined by known vars %s"
% (p.id, p.args, p.value, type(p.value), all_known_vars)
)
all_req_vars = []
if p.value is not None and type(p.value) == str:
all_req_vars.extend(
[
v
for v in get_required_variables_from_expression(p.value)
if p.args is None or v not in p.args
]
)
if p.args is not None:
for arg in p.args:
arg_expr = p.args[arg]
if isinstance(arg_expr, str):
all_req_vars.extend(
[
v
for v in get_required_variables_from_expression(
arg_expr
)
if v not in p.args
]
)
all_known_vars_plus_this = all_known_vars + [p.id]
all_present = [v in all_known_vars_plus_this for v in all_req_vars]
param_missing_vars[p.id] = {
v for v in all_req_vars if v not in all_known_vars
}
if verbose:
print(
" Are all of %s in %s? %s, i.e. %s"
% (
all_req_vars,
all_known_vars_plus_this,
all_present,
all(all_present),
)
)
if all(all_present):
ep = EvaluableParameter(p, self.verbose)
self.evaluable_parameters[p.id] = ep
all_known_vars.append(p.id)
else:
if num_params_remaining[p.id] == len(all_params_to_check):
param_missing_vars = {
p: ", ".join(v) for p, v in param_missing_vars.items()
}
raise ValueError(
"Error! Could not evaluate parameters using known vars. The following vars are missing:\n\t"
+ "\n\t".join(
f"{p}: {v}"
for p, v in param_missing_vars.items()
if len(v) > 0
)
)
else:
num_params_remaining[p.id] = len(all_params_to_check)
all_params_to_check.append(p) # Add back to end of list...
for op in node.output_ports:
rop = EvaluableOutput(op, self.verbose)
self.evaluable_outputs[op.id] = rop
[docs] def evaluate(
self,
time_increment: Union[int, float] = None,
array_format: str = FORMAT_DEFAULT,
):
"""
Evaluate the Node for one time-step
Args:
time_increment: The time-increment to use for this evaluation.
array_format: The format to use for arrays.
"""
if self.verbose:
print(
"\n ---------------\n Evaluating Node: %s with %s"
% (self.node.id, [p.id for p in self.node.parameters])
)
curr_params = {}
for eip in self.evaluable_inputs:
i = self.evaluable_inputs[eip].evaluate(
curr_params, array_format=array_format
)
curr_params[eip] = i
# First set params to previous parameter values for use in funcs and states...
for ep in self.evaluable_parameters:
curr_params[ep] = self.evaluable_parameters[ep].get_current_value(
curr_params, array_format=array_format
)
for ef in self.evaluable_functions:
curr_params[ef] = self.evaluable_functions[ef].evaluate(
curr_params, array_format=array_format
)
# Now evaluate and set params to new parameter values for use in output...
for ep in self.evaluable_parameters:
curr_params[ep] = self.evaluable_parameters[ep].evaluate(
curr_params, time_increment=time_increment, array_format=array_format
)
for eop in self.evaluable_outputs:
self.evaluable_outputs[eop].evaluate(curr_params, array_format=array_format)
[docs] def get_output(self, id: str = None) -> Union[int, np.ndarray, Tuple]:
"""Get value at output port for given output port's id
Args:
id: Unique identifier of the output port. If None, return a tuple for all output ports.
Returns:
value at the output port. If id is None, return all outputs as a tuple. If there is only
one output, return just its value.
"""
if id is not None:
for rop in self.evaluable_outputs:
if rop == id:
return self.evaluable_outputs[rop].curr_value
else:
outputs = tuple(
self.evaluable_outputs[rop].curr_value for rop in self.evaluable_outputs
)
if len(outputs) == 1:
return outputs[0]
else:
return outputs
[docs]class EvaluableGraph:
r"""
Evaluates a :class:`~modeci_mdf.mdf.Graph` with the MDF execution engine. This is the top-level interface to the execution engine.
Args:
graph: A directed graph consisting of :class:`~modeci_mdf.mdf.Node`\(s) connected via :class:`~modeci_mdf.mdf.Edge`\(s)
verbose: If set to True Provides in-depth information else verbose message is not displayed
"""
def __init__(self, graph: Graph, verbose: Optional[bool] = False):
self.verbose = verbose
print("\nInit graph: %s" % graph.id)
self.graph = graph
self.enodes = {}
self.root_nodes = []
self.output_nodes = []
self.order_of_execution = []
# Get the root (input nodes) of the graph. We will assume all are root nodes
# and then remove those that have edges to them.
for node in graph.nodes:
if self.verbose:
print("\n Init node: %s" % node.id)
en = EvaluableNode(node, self.verbose)
self.enodes[node.id] = en
self.root_nodes.append(node.id)
for edge in graph.edges:
if (
edge.receiver in self.root_nodes
): # It could have been already removed...
self.root_nodes.remove(edge.receiver)
self.ordered_edges = []
evaluated_nodes = []
for rn in self.root_nodes:
evaluated_nodes.append(rn)
edges_to_eval = [edge for edge in self.graph.edges]
while len(edges_to_eval) > 0:
edge = edges_to_eval.pop(0)
if edge.sender not in evaluated_nodes:
edges_to_eval.append(edge) # Add back to end of list...
else:
self.ordered_edges.append(edge)
evaluated_nodes.append(edge.receiver)
if self.graph.conditions is not None:
if self.graph.conditions.node_specific is None:
conditions = {}
else:
conditions = {
self.graph.get_node(node): self.parse_condition(cond)
for node, cond in self.graph.conditions.node_specific.items()
}
termination_conds = {}
if self.graph.conditions.termination is not None:
for scale, cond in self.graph.conditions.termination.items():
cond = self.parse_condition(cond)
# check for TimeScale in form of enum or equivalent unambiguous strings
try:
scale = re.match(time_scale_str_regex, scale).groups()[1]
except (AttributeError, IndexError, TypeError):
pass
try:
termination_conds[graph_scheduler.TimeScale[scale]] = cond
except KeyError:
termination_conds[scale] = cond
else:
conditions = {}
termination_conds = {}
self.scheduler = graph_scheduler.Scheduler(
graph=self.graph.dependency_dict,
conditions=conditions,
termination_conds=termination_conds,
)
# We also need to get the output nodes
self.output_nodes = [
n
for n in self.graph.nodes
if n.id not in (e.sender for e in self.graph.edges)
]
# Lets also get the corresponding EvaluableNode for outputs
self.output_enodes = [self.enodes[n.id] for n in self.output_nodes]
[docs] def evaluate(
self,
time_increment: Union[int, float] = None,
array_format: str = FORMAT_DEFAULT,
initializer: Optional[Dict[str, Any]] = None,
):
"""
Evaluates a :class:`~modeci_mdf.mdf.Graph`. This is the top-level interface to the execution engine.
Args:
time_increment: Time step for next execution
array_format: A n-dimensional array
initializer: sets the initial value of parameters of the node
"""
# Any values that are set via the passed in initalizer, set their values. This lets us avoid creating
# dummy input nodes with parameters for evaluating the graph
for en_id, en in self.enodes.items():
for inp_name, inp in en.evaluable_inputs.items():
if initializer and inp_name in initializer:
inp.set_input_value(initializer[inp_name])
print(
"Evaluating graph: %s, root nodes: %s, with array format %s"
% (self.graph.id, self.root_nodes, array_format)
)
str_conds_nb = "\n ".join(
[
f"{node.id}: {cond}"
for node, cond in self.scheduler.conditions.conditions.items()
]
)
str_conds_term = "\n ".join(
[
f"{scale}: {cond}"
for scale, cond in self.scheduler.termination_conds.items()
]
)
if self.verbose:
print(" node-based conditions\n %s" % str_conds_nb)
print(" termination conditions\n %s" % str_conds_term)
incoming_edges = {n: set() for n in self.graph.nodes}
for edge in self.graph.edges:
incoming_edges[self.graph.get_node(edge.receiver)].add(edge)
for ts in self.scheduler.run():
if self.verbose:
print(
"> Evaluating time step: %s"
% self.scheduler.get_clock(None).simple_time
)
for node in ts:
self.order_of_execution.append(node.id)
for edge in incoming_edges[node]:
self.evaluate_edge(
edge, time_increment=time_increment, array_format=array_format
)
self.enodes[node.id].evaluate(
time_increment=time_increment, array_format=array_format
)
if self.verbose:
print("> Order of execution of nodes: %s" % self.order_of_execution)
print("\n Trial terminated")
[docs] def evaluate_edge(
self,
edge: Edge,
time_increment: Union[int, float] = None,
array_format: str = FORMAT_DEFAULT,
):
"""Evaluates edges in graph
Args:
time_increment: Time step for next execution
array_format: A n-dimensional array
"""
pre_node = self.enodes[edge.sender]
post_node = self.enodes[edge.receiver]
value = pre_node.evaluable_outputs[edge.sender_port].curr_value
weight = (
1
if not edge.parameters or not "weight" in edge.parameters
else edge.parameters["weight"]
)
if self.verbose:
print(
" Edge %s connects %s to %s, passing %s with weight %s"
% (
edge.id,
pre_node.node.id,
post_node.node.id,
_val_info(value),
_val_info(weight),
)
)
if (type(weight) == int or type(weight) == float) and weight == 1:
input_value = value
else:
input_value = weight * value
post_node.evaluable_inputs[edge.receiver_port].set_input_value(input_value)
[docs] def parse_condition(
self, condition: Union[Condition, Dict]
) -> graph_scheduler.Condition:
"""Convert the condition in a specific format
Args:
condition: Specify the condition under which a Component should be allowed to execute
Returns:
Condition in specific format
"""
def get_custom_parameter_getter(eobj):
# try to pick a default based on expected shape of the
# evaluable object before it has ever been executed
for d in [
lambda: np.zeros(eobj.input_port.shape),
lambda: np.zeros(eobj.output_port.shape),
lambda: eobj.parameter.default_initial_value,
]:
try:
default = d()
except AttributeError:
pass
else:
break
else:
default = 0
def getter(dependency, parameter):
res = eobj.curr_value
if res is None:
return default
else:
return res
return getter
def update_condition_arguments(args, condition_type):
# mdf format prefers the key name for all conditions that use it or
# its value as an __init__ argument
combined_condition_arguments = {"dependencies": "dependency"}
sig = inspect.signature(condition_type)
for preferred, actual in combined_condition_arguments.items():
if (
preferred in condition.kwargs
and preferred not in sig.parameters
and actual in sig.parameters
):
args[actual] = args[preferred]
del args[preferred]
if "custom_parameter_getter" in sig.parameters:
try:
dependency = args["dependency"]
parameter = args["parameter"]
except KeyError as e:
raise ValueError(
"Threshold condition did not specify dependency or parameter"
) from e
# self is EvaluableGraph condition is running under,
enode = self.enodes[dependency.id]
evaluable_objects = [
enode.evaluable_inputs,
enode.evaluable_parameters,
enode.evaluable_functions,
enode.evaluable_outputs,
]
valid_parameters = itertools.chain.from_iterable(
[obj for obj in evaluable_objects]
)
args["custom_parameter_validator"] = (
lambda dependency, parameter: parameter in valid_parameters
)
# assumes a unique parameter id among evaluable objects for
# dependency, or will resolve in favor of first with that id
for obj in evaluable_objects:
try:
obj = obj[parameter]
except KeyError:
pass
else:
args["custom_parameter_getter"] = get_custom_parameter_getter(
obj
)
break
else:
raise ValueError(
f"No {parameter} evaluable object for {dependency}, options: {list(valid_parameters)}"
)
return args
# if specified as dict
try:
args = condition["kwargs"]
except (IndexError, TypeError, KeyError):
args = {}
try:
condition = Condition(condition["type"], **args)
except (IndexError, TypeError, KeyError):
pass
cond_type = condition.type
cond_args = copy.copy(condition.kwargs)
try:
typ = getattr(graph_scheduler.condition, cond_type)
except AttributeError as e:
raise ValueError("Unsupported condition type: %s" % cond_type) from e
except TypeError as e:
raise TypeError("Invalid condition dictionary: %s" % condition) from e
for k, v in cond_args.items():
new_v = self.graph.get_node(v)
if new_v is not None:
# arg is a node id
cond_args[k] = new_v
try:
if isinstance(v, list):
# arg is a list all of conditions
new_v = [self.parse_condition(item) for item in v]
else:
# arg is another condition
new_v = self.parse_condition(v)
except (AttributeError, TypeError, ValueError):
try:
# value may be a string representing a TimeScale
cond_args[k] = getattr(
graph_scheduler.TimeScale,
re.match(time_scale_str_regex, v).groups()[1],
)
except (AttributeError, IndexError, TypeError):
pass
else:
cond_args[k] = new_v
cond_args = update_condition_arguments(cond_args, typ)
try:
return typ(**cond_args)
except TypeError as e:
sig = inspect.signature(typ)
try:
var_positional_arg = [
name
for name, param in sig.parameters.items()
if param.kind is inspect.Parameter.VAR_POSITIONAL
][0]
except IndexError:
# other unhandled situation
raise e
else:
try:
cond_args[var_positional_arg]
except KeyError:
# error is due to missing required parameter,
# not named variable positional argument
raise TypeError(f"Condition '{typ.__name__}': {e}")
else:
return typ(
*cond_args[var_positional_arg],
**{
k: v
for k, v in cond_args.items()
if k != var_positional_arg
},
)
[docs]def main(example_file: str, array_format: str = FORMAT_NUMPY, verbose: bool = False):
"""
Main entry point for execution engine.
Args:
example_file: The MDF file to execute.
array_format: The format of arrays to use. Allowed values: 'numpy' or 'tensorflow'.
verbose: Whether to print output to standard out during execution.
"""
from modeci_mdf.utils import load_mdf, print_summary
mod_graph = load_mdf(example_file).graphs[0]
if verbose:
print("Loaded Graph:")
print_summary(mod_graph)
print("------------------")
eg = EvaluableGraph(mod_graph, verbose)
eg.evaluate(array_format=array_format)
return eg
if __name__ == "__main__":
example_file = os.path.join(
os.path.dirname(__file__), "..", "..", "examples/MDF/Simple.json"
)
verbose = True
if len(sys.argv) >= 2:
example_file = sys.argv[1]
if "-v" in sys.argv:
verbose = True
else:
verbose = False
from modelspec.utils import FORMAT_NUMPY, FORMAT_TENSORFLOW
format = FORMAT_TENSORFLOW if "-tf" in sys.argv else FORMAT_NUMPY
print("Executing MDF file %s with scheduler" % example_file)
main(example_file, array_format=format, verbose=verbose)