Source code for modeci_mdf.functions.actr.ccm.logger

import atexit
import time
import bisect
import sys
import random
import os

using_java = False
try:
    from java.io import File

    using_java = True
except:
    pass


[docs]def file_exists(filename): if hasattr(os, "access"): return os.access(filename, os.F_OK) elif using_java: return File(filename).exists()
try: basestring # For Python 2 compatibility except NameError: basestring = str # For Python 3 compatibility
[docs]class Trace: def __init__(self): self.data = dict(time=[(0, 0.0)]) self.index = 1 self.type = {} def add(self, key, value): if not isinstance(value, (bool, int, float, basestring, type(None))): value = repr(value) if type(value) == str: if value.startswith("<"): return if not self.data.has_key(key): self.data[key] = [] self.data[key].append((self.index, value)) self.index += 1 def keys(self): return self.data.keys() def fixed_keys(self): return [k for k, v in self.data.items() if len(v) == 1] def get_final(self, key): return self.data[key][-1][1] def get_pts(self, vars): pts = [] for v in vars: for i, val in self.data[v]: if i not in pts: pts.append(i) pts.sort() return pts def merge_pts(self, pts, key): times = [x[0] for x in self.data[key]] i = 0 for t in times: if i >= len(pts) or t < pts[i]: continue while i < len(pts) - 1 and pts[i + 1] < t: del pts[i] i += 1 while i < len(pts) - 1: del pts[i] def group_pts(self, pts, key): times = [x[0] for x in self.data[key]] group = [] i = 0 for p in pts: if i >= len(times) or p < times[i]: group.append(p) else: if len(group) > 0: yield group group = [p] while i < len(times) and p >= times[i]: i += 1 if len(group) > 0: yield group def get_at(self, name, time): d = self.data[name] i = bisect.bisect(d, (time + 0.5,)) - 1 if i < 0: return None if i >= len(d) - 1: i = -1 return d[i][1] def __nonzero__(self): return self.index != 1
[docs]class Log: def __init__(self): self.do_screen = True self.do_html = False self.do_summary = False self.do_data = False self.directory = sys.argv[0] self.using_default_directory = True if self.directory.endswith(".py"): self.directory = self.directory[:-3] self.start_time = time.time() self.last_flush = self.start_time self.reset() def use_directory(self, dir): self.directory = dir self.using_default_directory = False def set(self, key, value): if key == "time": self.time = value if self.do_screen: self.display_value(key, value) if self.do_data or self.do_summary: self.data[key] = value if self.do_html: self.trace.add(key, value) def display_value(self, key, value): if key != "time": print(f"{self.time:8.3f} {key} {value}") def display_all(self): if self.time > 0: print("Total time: %8.3f" % self.time) items = self.data.items() items.sort() for k, v in items: if k != "time": print(f" {k} {v}") def get_time_code(self): t = time.strftime("%Y%m%d-%H%M%S", time.localtime(self.start_time)) return f"{t}-{self.id}" def ensure_directory_exists(self): if not file_exists(self.directory + "/"): os.makedirs(self.directory) def reset(self): self.trace = Trace() self.data = {} self.time = 0 self.id = "%08x" % int(random.randrange(0x7FFFFFFF))
[docs]class DummyLog: def set(self, key, value): pass def __nonzero__(self): return False def __setattr__(self, key, value): pass def __getattr__(self, key): if key[0] != "_": return self else: raise AttributeError(key) def __setitem__(self, key, value): pass def __getitem__(self, key): return self
dummy = DummyLog()
[docs]class LogProxy: def __init__(self, log, prefix=""): self._log = log if prefix.startswith("."): prefix = prefix[1:] self._prefix = prefix self._sub = {} def __setattr__(self, key, value): if key.startswith("_"): if key == "_": self._set(value) else: self.__dict__[key] = value else: getattr(self, key)._set(value) def __getattr__(self, key): if key.startswith("_"): if key == "_": return self if key in self.__dict__: return self.__dict__[key] raise AttributeError(key) else: s = LogProxy(self._log, f"{self._prefix}.{key}") self.__dict__[key] = s return s def __setitem__(self, key, value): self[key]._set(value) def __getitem__(self, key): s = self._sub.get(key, None) if s is not None: return s s = LogProxy(self._log, f"{self._prefix}[{key}]") self._sub[key] = s return s def _set(self, value): try: if not isinstance(value, (int, float, bool, basestring, tuple, list, dict)): value = repr(value) except TypeError: value = repr(value) self._log.set(self._prefix, value) def __nonzero__(self): return True
singleton_log = Log() log_proxy = LogProxy(singleton_log)
[docs]def log(screen=None, html=None, data=None, summary=None, directory=None): if screen is not None: singleton_log.do_screen = screen if html is not None: singleton_log.do_html = html if data is not None: singleton_log.do_data = data if summary is not None: singleton_log.do_summary = summary if directory is not None: singleton_log.use_directory(directory) return log_proxy
pending_output = []
[docs]def finished(flush=True): log = singleton_log has_data = log.data or log.trace if has_data: if log.do_summary: log.display_all() if log.do_data: log.ensure_directory_exists() fn = f"{log.directory}/{log.get_time_code()}.data" pending_output.append((fn, log.data)) if log.do_html: log.ensure_directory_exists() from ccm.ui.htmltrace import HTMLTrace html = HTMLTrace(log.trace) html.generate(f"{log.directory}/{log.get_time_code()}.html") if flush or time.time() - log.last_flush > 10: for fn, data in pending_output: f = file(fn, "w") items = data.items() items.sort() for k, v in items: f.write(f"{k}={v}\n") f.close() del pending_output[:] log.last_flush = time.time() log.reset()
atexit.register(finished, flush=True)