Source code for modeci_mdf.interfaces.actr.importer

"""
    Code for importing ACT-R models into MDF.
"""
from modeci_mdf.mdf import *


[docs]def build_model() -> Model: """Builds the base model of the ACT-R production system without any chunks, goals, or productions specified. Returns: An MDF model object representing the core ACT-R production system. """ mod = Model(id="ACT-R Base") mod_graph = Graph(id="actr_base") mod.graphs.append(mod_graph) # Declarative memory dm_node = Node(id="declarative_memory") dm_node.parameters.append(Parameter(id="chunks", value=[])) dm_node.parameters.append(Parameter(id="chunk_types", value=[])) dm_ip = InputPort(id="dm_input") dm_node.input_ports.append(dm_ip) retrieval_f = Parameter( id="retrieve_chunk", function="retrieve_chunk", args={"pattern": dm_ip.id, "dm_chunks": "chunks", "types": "chunk_types"}, ) dm_node.parameters.append(retrieval_f) dm_op = OutputPort(id="dm_output", value=retrieval_f.id) dm_node.output_ports.append(dm_op) mod_graph.nodes.append(dm_node) # Retrieval buffer retrieval_node = Node(id="retrieval_buffer") retrieval_ip = InputPort(id="retrieval_input") retrieval_node.input_ports.append(retrieval_ip) retrieval_op = OutputPort(id="retrieval_output", value=retrieval_ip.id) retrieval_node.output_ports.append(retrieval_op) mod_graph.nodes.append(retrieval_node) # Goal buffer with state goal_node = Node(id="goal_buffer") goal_node.parameters.append(Parameter(id="first_goal", value={})) goal_ip = InputPort(id="goal_input") goal_node.input_ports.append(goal_ip) goal_f = Parameter( id="change_goal", function="change_goal", args={"pattern": goal_ip.id, "curr_goal": "goal_state"}, ) goal_node.parameters.append(goal_f) goal_state = Parameter( id="goal_state", default_initial_value="first_goal", value=f"first_goal if {goal_ip.id} == {{}} else {goal_f.id}", ) goal_node.parameters.append(goal_state) goal_op = OutputPort(id="goal_output", value=goal_state.id) goal_node.output_ports.append(goal_op) mod_graph.nodes.append(goal_node) # Procedural memory pm_node = Node(id="procedural_memory") pm_node.parameters.append(Parameter(id="productions", value=[])) pm_op = OutputPort(id="pm_output", value="productions") pm_node.output_ports.append(pm_op) mod_graph.nodes.append(pm_node) # Pattern matching pattern_node = Node(id="pattern_matching") pattern_ip1 = InputPort(id="pattern_input_from_pm") pattern_ip2 = InputPort(id="pattern_input_from_goal") pattern_ip3 = InputPort(id="pattern_input_from_retrieval") pattern_node.input_ports.extend([pattern_ip1, pattern_ip2, pattern_ip3]) pattern_f = Parameter( id="pattern_matching_function", function="pattern_matching_function", args={ "productions": pattern_ip1.id, "goal": pattern_ip2.id, "retrieval": pattern_ip3.id, }, ) pattern_node.parameters.append(pattern_f) pattern_op = OutputPort(id="pattern_output", value=pattern_f.id) pattern_node.output_ports.append(pattern_op) mod_graph.nodes.append(pattern_node) # Conflict resolution conflict_node = Node(id="conflict_resolution") conflict_ip = InputPort(id="conflict_input") conflict_node.input_ports.append(conflict_ip) conflict_f = Parameter( id="conflict_resolution_function", function="conflict_resolution_function", args={"productions": conflict_ip.id}, ) conflict_node.parameters.append(conflict_f) conflict_op1 = OutputPort(id="conflict_output_to_fire_prod", value=conflict_f.id) conflict_op2 = OutputPort(id="conflict_output_to_check", value=conflict_f.id) conflict_node.output_ports.extend([conflict_op1, conflict_op2]) mod_graph.nodes.append(conflict_node) # Node for firing productions fire_prod_node = Node(id="fire_production") fire_prod_ip = InputPort(id="fire_prod_input") fire_prod_node.input_ports.append(fire_prod_ip) fire_prod_f1 = Parameter( id="update_goal", function="update_goal", args={"production": fire_prod_ip.id} ) fire_prod_f2 = Parameter( id="update_retrieval", function="update_retrieval", args={"production": fire_prod_ip.id}, ) fire_prod_node.parameters.extend([fire_prod_f1, fire_prod_f2]) fire_prod_op1 = OutputPort(id="fire_prod_output_to_goal", value=fire_prod_f1.id) fire_prod_op2 = OutputPort( id="fire_prod_output_to_retrieval", value=fire_prod_f2.id ) fire_prod_node.output_ports.extend([fire_prod_op1, fire_prod_op2]) mod_graph.nodes.append(fire_prod_node) # Node to check termination check_node = Node(id="check_termination") check_ip = InputPort(id="check_input") check_node.input_ports.append(check_ip) check_f = Parameter( id="check_termination", function="check_termination", args={"production": check_ip.id}, ) check_node.parameters.append(check_f) check_op = OutputPort(id="check_output", value=check_f.id) check_node.output_ports.append(check_op) mod_graph.nodes.append(check_node) # Edges dm_to_retrieval = Edge( id="dm_to_pattern_edge", sender=dm_node.id, sender_port=dm_op.id, receiver=retrieval_node.id, receiver_port=retrieval_ip.id, ) mod_graph.edges.append(dm_to_retrieval) retrieval_to_pattern = Edge( id="retrieval_to_pattern_edge", sender=retrieval_node.id, sender_port=retrieval_op.id, receiver=pattern_node.id, receiver_port=pattern_ip3.id, ) mod_graph.edges.append(retrieval_to_pattern) goal_to_pattern = Edge( id="goal_to_pattern_edge", sender=goal_node.id, sender_port=goal_op.id, receiver=pattern_node.id, receiver_port=pattern_ip2.id, ) mod_graph.edges.append(goal_to_pattern) pm_to_pattern = Edge( id="pm_to_pattern_edge", sender=pm_node.id, sender_port=pm_op.id, receiver=pattern_node.id, receiver_port=pattern_ip1.id, ) mod_graph.edges.append(pm_to_pattern) pattern_to_conflict = Edge( id="pattern_to_conflict_edge", sender=pattern_node.id, sender_port=pattern_op.id, receiver=conflict_node.id, receiver_port=conflict_ip.id, ) mod_graph.edges.append(pattern_to_conflict) conflict_to_fire_prod = Edge( id="conflict_to_fire_prod_edge", sender=conflict_node.id, sender_port=conflict_op1.id, receiver=fire_prod_node.id, receiver_port=fire_prod_ip.id, ) mod_graph.edges.append(conflict_to_fire_prod) conflict_to_check = Edge( id="conflict_to_check_edge", sender=conflict_node.id, sender_port=conflict_op1.id, receiver=check_node.id, receiver_port=check_ip.id, ) mod_graph.edges.append(conflict_to_check) # Conditions cond_dm = Condition(type="Always") cond_retrieval = Condition(type="JustRan", dependencies=dm_node.id) cond_goal = Condition(type="Always") cond_pm = Condition(type="Always") cond_pattern = Condition( type="And", dependencies=[ Condition(type="EveryNCalls", dependencies=retrieval_node.id, n=1), Condition(type="EveryNCalls", dependencies=goal_node.id, n=1), Condition(type="EveryNCalls", dependencies=dm_node.id, n=1), ], ) cond_conflict = Condition(type="JustRan", dependencies=pattern_node.id) cond_fire_prod = Condition(type="JustRan", dependencies=conflict_node.id) cond_check = Condition(type="JustRan", dependencies=conflict_node.id) cond_term = Condition(type="JustRan", dependencies=[check_node.id]) mod_graph.conditions = ConditionSet( node_specific={ dm_node.id: cond_dm, retrieval_node.id: cond_retrieval, goal_node.id: cond_goal, pm_node.id: cond_pm, pattern_node.id: cond_pattern, conflict_node.id: cond_conflict, fire_prod_node.id: cond_fire_prod, check_node.id: cond_check, }, termination={"check_term_true": cond_term}, ) return mod
[docs]def actr_to_mdf(file_name: str): """Parses an ACT-R .lisp model file and outputs MDF .json and .yaml files. Args: file_name: The name of the ACT-R model file ending in .lisp. """ with open(file_name) as actr_file: mod = build_model() add_dm = False add_lhs = False add_rhs = False chunk_types = {} dm = [] pm = [] goal = None curr_prod = None curr_pattern = None for l in actr_file: line = l.strip() # Add the chunk types if line.startswith("(chunk-type"): tokens = line.replace("(", "").replace(")", "").split(" ") chunk_types[tokens[1]] = tokens[2:] # Add the chunks to declarative memory elif line.startswith("(add-dm"): add_dm = True elif add_dm: if line == "": add_dm = False else: chunk = {} tokens = line.replace("(", "").replace(")", "").split(" ") chunk["name"] = tokens[0] for i in range(1, len(tokens), 2): chunk[tokens[i]] = tokens[i + 1] dm.append(chunk) # Add the goal to the goal buffer elif line.startswith("(goal-focus"): goal_name = line[12:-1] for chunk in dm: if chunk["name"] == goal_name: goal = chunk break for k in chunk_types[goal["ISA"]]: if k not in goal.keys(): goal[k] = "nil" # Add the productions to procedural memory elif line.startswith(("(P", "(p")): curr_prod = {"name": line[3:], "lhs": [], "rhs": []} pm.append(curr_prod) add_lhs = True # Left hand side of production elif add_lhs: if line == "==>": add_lhs = False add_rhs = True elif line.endswith(">"): curr_pattern = {"buffer": line[1:-1]} curr_prod["lhs"].append(curr_pattern) else: tokens = [t for t in line.split(" ") if t != ""] if tokens[0] == "-": curr_pattern[tokens[1]] = "-" + tokens[2] else: curr_pattern[tokens[0]] = tokens[1] # Right hand side of production elif add_rhs: if line == "" or line == ")" or "!output!" in line: add_rhs = False elif line.endswith(">"): curr_pattern = {"buffer": line[1:-1]} curr_prod["rhs"].append(curr_pattern) else: tokens = [t for t in line.split(" ") if t != ""] if len(tokens) == 3: curr_pattern[tokens[0] + tokens[1]] = tokens[2] else: curr_pattern[tokens[0]] = tokens[1] # Update production patterns for prod in pm: lhs = [] for pattern in prod["lhs"]: isa = pattern["ISA"] final_pattern = {"buffer": pattern["buffer"], "ISA": isa} for k in chunk_types[isa]: if k in pattern.keys(): final_pattern[k] = pattern[k] else: final_pattern[k] = "=" + k lhs.append(final_pattern) prod["lhs"] = lhs # Generate MDF files mod.id = ( file_name[file_name.rindex("/") + 1 : -5] if "/" in file_name else file_name[:-5] ) mod.graphs[0].id = mod.id + "_graph" mod.graphs[0].get_node("declarative_memory").get_parameter("chunks").value = dm mod.graphs[0].get_node("declarative_memory").get_parameter( "chunk_types" ).value = chunk_types mod.graphs[0].get_node("procedural_memory").get_parameter( "productions" ).value = pm mod.graphs[0].get_node("goal_buffer").get_parameter("first_goal").value = goal mod.to_json_file("%s.json" % file_name[:-5]) mod.to_yaml_file("%s.yaml" % file_name[:-5]) return mod