import math
__all__ = [
"Memory",
"MemorySubModule",
"DMNoise",
"DMBaseLevel",
"DMSalience",
"DMSpreading",
"DMFixed",
"Partial",
"BlendingMemory",
"DMAssociate",
"DMInhibition",
]
from .buffer import Chunk, Buffer
from .pattern import Pattern
from .model import Model
[docs]class Memory(Model):
def __init__(
self,
buffer,
latency=0.05,
threshold=0,
maximum_time=10.0,
finst_size=4,
finst_time=3.0,
):
Model.__init__(self)
self._buffer = buffer
self.dm = []
self.error = False
self.busy = False
self.adaptors = []
self.latency = latency
self.threshold = threshold
self.maximum_time = maximum_time
self.partials = []
self.finst = Finst(self, size=finst_size, time=finst_time)
self.record_all_chunks = False
self._request_count = 0
def clear(self):
del self.dm[:]
def add(self, chunk, record=None, **keys):
if self.error:
self.error = False
if isinstance(chunk, Buffer):
chunk = Chunk(chunk.chunk)
else:
bound = None
if hasattr(self, "sch"):
bound = getattr(self.sch, "bound", None)
chunk = Chunk(chunk, bound)
for c in self.dm:
if chunk == c:
for a in self.adaptors:
a.merge(c, **keys)
break
else:
for a in self.adaptors:
a.create(chunk, **keys)
self.dm.append(chunk)
chunk.record = record
def find_matching_chunks(self, pattern, threshold=None):
bound = getattr(self.sch, "bound", None)
pattern = Pattern(pattern, bound)
matches = [x for x in self.dm if pattern.match(x) is not None]
if threshold is not None:
matches = [x for x in matches if self.get_activation(x) >= threshold]
return matches
def request(self, pattern, partial=None, require_new=False):
if partial is None and len(self.partials) > 0:
partial = self.partials[0]
self.busy = True
if self.error:
self.error = False
self._request_count += 1
b = getattr(self.sch, "bound", None)
pattern = Pattern(pattern, b, partial=partial)
all = self.dm
if require_new:
all = [x for x in all if not self.finst.contains(x)]
matches = [x for x in all if pattern.match(x) is not None]
# for x in matches: print `x`,self.get_activation(x)
for a in self.adaptors:
a.matched(matches)
if len(matches) == 0:
self.fail(self._request_count)
else:
maximum = None
for chunk in matches:
chunk.activation = self.get_activation(chunk)
if partial is not None:
chunk.activation += chunk._partial
if maximum is None or chunk.activation > maximum:
maximum = chunk.activation
if maximum < self.threshold:
self.fail(self._request_count)
else:
best = [x for x in matches if x.activation == maximum]
choice = self.random.choice(best)
self.recall(choice, matches=matches, request_number=self._request_count)
def fail(self, request_number):
if self.threshold is None:
time = self.maximum_time
else:
time = self.latency * math.exp(-self.threshold)
if time > self.maximum_time:
time = self.maximum_time
yield time
if request_number != self._request_count:
return
self.error = True
self._buffer.clear()
self.busy = False
def recall(self, chunk, matches, request_number):
self.finst.add(chunk)
time = self.latency * math.exp(-chunk.activation)
if time > self.maximum_time:
time = self.maximum_time
yield time
if request_number != self._request_count:
return
self._buffer.set(chunk)
for a in self.adaptors:
a.recalled(chunk)
self.busy = False
def get_activation(self, chunk):
if not isinstance(chunk, Chunk):
try:
chunk = Chunk(chunk, self.sch.bound)
except AttributeError:
chunk = Chunk(chunk, None)
for c in self.dm:
if chunk == c:
chunk = c
break
else:
raise Exception("No such chunk found")
act = 0
for a in self.adaptors:
act += a.activation(chunk)
if self.record_all_chunks or chunk.record is True:
self.log[str(chunk)] = act
return act
def add_adaptor(self, a):
self.adaptors.append(a)
[docs]class Finst:
def __init__(self, parent, size=4, time=3.0):
self.parent = parent
self.size = size
self.time = time
self.obj = []
def contains(self, o):
return o in self.obj
def add(self, o):
if self.size == 0:
return
self.obj.append(o)
if len(self.obj) > self.size:
self.remove(self.obj[0])
self.parent.sch.add(self.remove, args=[o], delay=self.time)
def remove(self, o):
if o in self.obj:
self.obj.remove(o)
[docs]class MemorySubModule:
def __init__(self, parent):
self.parent = parent
if parent is not None:
parent.add_adaptor(self)
def create(self, chunk, **keys):
pass
def merge(self, chunk, **keys):
pass
def matched(self, chunks):
pass
def activation(self, chunk):
return 0
def recalled(self, chunk):
pass
def now(self):
if self.parent is None or not self.parent._is_converted():
return 0
else:
return self.parent.now()
[docs]class DMNoise(MemorySubModule):
def __init__(self, memory, noise=0.3, baseNoise=0.0):
MemorySubModule.__init__(self, memory)
self.noise = noise
self.baseNoise = baseNoise
def create(self, chunk, **keys):
chunk.baseNoise = self.logisticNoise(self.baseNoise)
def activation(self, chunk):
return chunk.baseNoise + self.logisticNoise(self.noise)
def logisticNoise(self, s):
try:
x = self.parent.random.random()
except AttributeError:
import random
x = random.random()
return s * math.log(1.0 / x - 1.0)
[docs]class DMBaseLevel(MemorySubModule):
def __init__(self, memory, decay=0.5, limit=None):
MemorySubModule.__init__(self, memory)
self.decay = decay
self.limit = limit
def create(self, chunk, time=0.0, baselevel=None, **keys):
chunk.creation = self.now()
chunk.times = [chunk.creation - time]
chunk.count = 1
if baselevel is not None and baselevel != "calculate":
chunk.baselevel = baselevel
def merge(self, chunk, time=0.0, baselevel=None, **keys):
chunk.times.append(self.now() - time)
chunk.count += 1
if self.limit is not None and len(chunk.times) > self.limit:
if self.limit == 0:
del chunk.times[:]
else:
chunk.times = chunk.times[-self.limit :]
if baselevel is not None:
if baselevel == "calculate":
del chunk.baselevel
else:
chunk.baselevel = baselevel
def activation(self, chunk):
if hasattr(chunk, "baselevel"):
return chunk.baselevel
if self.decay == None:
return 0
d = self.decay
now = self.now()
t = [now - time for time in chunk.times]
t = [max(L, 0.005) for L in t]
exact = 0.0
approx = 0.0
if len(t) > 0:
exact = sum([math.pow(time, -d) for time in t])
if self.limit is not None:
n = chunk.count
k = len(chunk.times)
if n > k:
tn = now - chunk.creation
if k > 0:
tk = now - chunk.times[0]
else:
tk = 0.0
approx = (
(n - k)
/ (1 - d)
* (math.pow(tn, 1 - d) - math.pow(tk, 1 - d))
/ (tn - tk)
)
B = math.log(exact + approx)
return B
[docs]class DMSpacing(MemorySubModule):
def __init__(self, memory, decayScale=0.0, decayIntercept=0.5):
MemorySubModule.__init__(self, memory)
self.decayScale = decayScale
self.decayIntercept = decayIntercept
def create(self, chunk, time=0.0, **keys):
chunk.times = [(self.actr.time - time, self.decayIntercept)]
def merge(self, chunk, time=0.0, **keys):
t = self.now() - time
m = self.activation(chunk)
d = self.decayScale * math.exp(m) + self.decayIntercept
chunk.times.append((t, d))
def activation(self, chunk):
now = self.now()
total = 0.0
for time, d in chunk.times:
total += math.pow(now - time, -d)
B = math.log(total)
return B
[docs]class DMInhibition(MemorySubModule):
def __init__(self, memory, decayScale=1.0, timeScale=5.0):
MemorySubModule.__init__(self, memory)
self.decayScale = decayScale
self.timeScale = timeScale
def create(self, chunk, time=0.0, **keys):
chunk.mostRecentTime = time
def merge(self, chunk, time=0.0, **keys):
chunk.mostRecentTime = self.now()
def activation(self, chunk):
tn = self.now() - chunk.mostRecentTime
I = -math.log(1 + math.pow(tn, -self.decayScale) / self.timeScale)
return I
[docs]class DMSalience(MemorySubModule):
def __init__(self, memory):
MemorySubModule.__init__(self, memory)
self.histogram = {}
self.weight = {}
self.memory = memory
def weights(self, **weights):
self.histogram = {}
self.weight = {}
for k, v in weights.items():
if k.startswith("_"):
k = int(k[1:])
self.weight[k] = float(v)
self.histogram[k] = {}
def context(self, pattern):
pat = Pattern(pattern)
chunks = [x for x in self.memory.dm if pat.match(x) is not None]
for k, hist in self.histogram.items():
hist.clear()
if len(chunks) == 0:
raise Exception('No chunks match salience pattern: "%s"' % pattern)
dw = 1.0 / len(chunks)
for c in chunks:
for k, hist in self.histogram.items():
val = c.get(k, None)
if val not in hist:
hist[val] = dw
else:
hist[val] += dw
def activation(self, chunk):
act = 0.0
for k, hist in self.histogram.items():
val = chunk.get(k, None)
p = hist[val]
w = self.weight[k]
act += math.log(1.0 / p, 2) * w
# if self.log: self.log.act[`chunk`]=act
return act
[docs]class DMSpreading(MemorySubModule):
def __init__(self, memory, *buffers):
MemorySubModule.__init__(self, memory)
self.strength = 1
self.buffers = buffers
self.weight = {}
for b in buffers:
self.weight[b] = 1
self.slots = {}
def create(self, chunk, **keys):
for slot in chunk.values():
if slot in self.slots:
self.slots[slot].append(chunk)
else:
self.slots[slot] = [chunk]
def activation(self, chunk):
values = chunk.values()
total = 0.0
for b in self.buffers:
ch = b.chunk
if ch is not None:
w = self.weight[b]
for key, slot in ch.items():
if slot in values:
s = self.strength - math.log(len(self.slots[slot]) + 1)
total += w * s
return total
[docs]class DMFixed(MemorySubModule):
def __init__(self, memory, default=0):
MemorySubModule.__init__(self, memory)
self.default = default
def create(self, chunk, fixed=None, **keys):
if fixed is None:
fixed = self.default
chunk.fixed = fixed
def merge(self, chunk, fixed=0, **keys):
chunk.fixed += fixed
def activation(self, chunk):
return chunk.fixed
[docs]class Associated:
def __init__(self, a, b):
self.a = a
self.b = b
[docs]class DMAssociate(MemorySubModule):
def __init__(self, memory, buffer, weight=1, decay=0.5, limit=None):
MemorySubModule.__init__(self, memory)
self._buffer = buffer
self._mem = {}
self._bl = DMBaseLevel(None, decay=decay, limit=limit)
self.weight = weight
def set_association(self, pre, post, baselevel):
c = self._mem.get((pre, post), None)
if c == None:
c = Associated(pre, post)
self._bl.create(c)
self._mem[(pre, post)] = c
c.baselevel = baselevel
def recalled(self, chunk):
prechunk = self._buffer.chunk
if prechunk is None:
return
for pk, pv in prechunk.items():
for k, v in chunk.items():
c = self._mem.get((pv, v), None)
if c == None:
c = Associated(pv, v)
self._bl.create(c)
self._mem[(pv, v)] = c
else:
self._bl.merge(c)
def activation(self, chunk):
act = 0
prechunk = self._buffer.chunk
if prechunk is None:
return 0
for pk, pv in prechunk.items():
for k, v in chunk.items():
c = self._mem.get((pv, v), None)
if c is not None:
act += self._bl.activation(c)
return act * self.weight
[docs]class Partial:
def __init__(self, memory, strength=1.0, limit=-1.0):
self._memory = memory
self.limit = limit
self.strength = strength
self.sims = {}
memory.partials.append(self)
def similarity(self, a, b, value):
self.sims[a, b] = value
self.sims[b, a] = value
def request(self, pattern):
self._memory.request(pattern, partial=self)
def match(self, key, a, b):
m = self.sims.get((a, b), self.limit)
p = self.strength
return p * m
[docs]class BlendingMemory(Memory):
def recall(self, chunk, matches, request_number):
keys = list(sorted(chunk.keys()))
if not hasattr(chunk, "_blend_keys"):
bk = []
for k in keys:
try:
v = float(chunk[k])
bk.append(k)
except:
pass
chunk._blend_keys = tuple(bk)
bk = chunk._blend_keys
if len(bk) > 0:
a = chunk.activation
chunk = Chunk(chunk)
chunk.activation = a
for b in bk:
chunk[b] = 0
total_activation = 0
for m in matches:
m.exp_activation = math.exp(m.activation)
k = list(sorted(m.keys()))
if k == keys:
blend = {}
try:
for b in bk:
blend[b] = float(m[b])
except ValueError:
continue
for b in bk:
chunk[b] += blend[b] * m.exp_activation
total_activation += m.exp_activation
for b in bk:
chunk[b] /= total_activation
for m in Memory.recall(self, chunk, matches, request_number):
yield m