Source code for lena.flow.cache

"""Cache (pickle) flow."""
import sys
import os
import pickle

import lena.core 

if sys.version_info.major == 2:
    import cPickle


[docs]class Cache(object): """Cache flow passing through. On the first run, dump all flow to file (and yield the flow unaltered). On subsequent runs, load all flow from that file in the original order. Example:: s = Source( ReadFiles(), ReadEvents(), MakeHistograms(), Cache("histograms.pkl"), MakeStats(), Cache("stats.pkl"), ) If *stats.pkl* exists, :class:`Cache` will read data flow from that file and no other processing will be done. If the *stats.pkl* cache doesn't exist, but the cache for histograms exist, it will be used and no previous processing (from *ReadFiles* to *MakeHistograms*) will occur. If both caches are not filled yet, processing will run as usually. Only pickleable objects can be cached (otherwise a *pickle.PickleError* is raised). Warning ------- The pickle module is not secure against erroneous or maliciously constructed data. Never unpickle data from an untrusted source. """ def __init__(self, filename, method="cPickle", protocol=2): """*filename* is the name of file where to store the cache. You can give it *.pkl* extension. *method* can be *pickle* or *cPickle* (faster pickle). For Python 3 they are same. *protocol* is pickle protocol. Version 2 is the highest supported by Python 2. Version 0 is "human-readable" (as noted in the documentation). 3 is recommended if compatibility between Python 3 versions is needed. 4 was added in Python 3.4. It adds support for very large objects, pickling more kinds of objects, and some data format optimizations. """ if method == "pickle" or sys.version_info.major > 2: self._dump = pickle.dump self._load = pickle.load elif method == "cPickle": self._dump = cPickle.dump self._load = cPickle.load else: raise lena.core.LenaValueError( "Cache method should be one of pickle of cPickle." ) self._filename = filename self._method = method self._protocol = protocol
[docs] def cache_exists(self): """Return ``True`` if file with cache exists and is readable.""" return os.access(self._filename, os.R_OK)
[docs] def drop_cache(self): """Remove file with cache if that exists, pass otherwise. If cache exists and is readable, but could not be deleted, :exc:`.LenaEnvironmentError` is raised.""" try: os.remove(self._filename) except OSError: if self.cache_exists(): raise lena.core.LenaEnvironmentError( "Cache {}".format(self._filename) + " exists and readable, but can't be removed" )
[docs] def run(self, flow): """Load cache or fill it. If we can read *filename*, load flow from there. Otherwise use the incoming *flow* and fill the cache. All loaded or passing items are yielded. """ if self.cache_exists(): # race condition in this implementation # (if file is changed or deleted right after the check) return self._load_flow() else: return self._pass_and_dump_flow(flow)
[docs] @staticmethod def alter_sequence(seq): """If the Sequence *seq* contains a :class:`Cache`, which has an up-to-date cache, a :class:`.Source` is built based on the flattened *seq* and returned. Otherwise the *seq* is returned unchanged. """ # it will check for any Caches, not just this one import lena.flow, lena.core orig_seq = seq seq = lena.core.flatten(seq) last_cache_filled_ind = None if hasattr(seq, '__iter__'): for ind in reversed(range(len(seq))): el = seq[ind] if isinstance(el, lena.flow.Cache): if el.cache_exists(): last_cache_filled_ind = ind break if last_cache_filled_ind is not None: return lena.core.Source( lena.core.SourceEl(seq[last_cache_filled_ind], call="_load_flow"), *seq[last_cache_filled_ind+1:] ) else: # Cache element if isinstance(seq, Cache): if seq.cache_exists(): return lena.core.Source( lena.core.SourceEl(seq, call="_load_flow") ) return orig_seq
def _load_flow(self): """Load flow from self.filename.""" with open(self._filename, "rb") as f: while True: try: val = self._load(f) yield val except EOFError: break def _pass_and_dump_flow(self, flow): """Dump flow into self.filename. Flow is simultaneously yielded. """ with open(self._filename, "wb") as f: dump = lambda val: self._dump(val, f, self._protocol) for val in flow: dump(val) yield val