# Source code for lena.flow.cache

"""Cache (pickle) the flow."""
import os
import pickle
import sys

import lena.core 
import lena.context

if sys.version_info.major == 2:
    import cPickle

class Cache(object):
    """Cache the flow passing through.

    On the first run, dump the whole flow to a file
    (and yield the flow unaltered).
    On subsequent runs, load the flow from that file
    in the original order.

    Example::

        s = Source(
            ReadFiles(),
            ReadEvents(),
            MakeHistograms(),
            Cache("histograms.pkl"),
            MakeStats(),
            Cache("stats.pkl"),
        )

    If *stats.pkl* exists, :class:`Cache` will read the data
    from that file and no other processing will be done.
    If the *stats.pkl* cache doesn't exist, but the cache
    for histograms exists, it will be used and no previous processing
    (from *ReadFiles* to *MakeHistograms*) will occur.
    If both caches were not filled yet, processing will go as usual.

    Only pickleable objects can be cached
    (otherwise a *pickle.PickleError* will be raised).

    Warning
    -------
        The pickle module is not secure against erroneous
        or maliciously constructed data.
        Never unpickle data from an untrusted source.
    """

    def __init__(self, filename, recompute=False, method="cPickle",
                 protocol=2):
        """*filename* is the name of file where to store the cache.
        It can be given *.pkl* extension.
        If it contains a ``{``, it is treated as a format string
        that is filled from the context in :meth:`_set_context`.

        If *recompute* is ``True``, an existing cache
        will always be overwritten.
        This option is typically used if one wants
        to define cache behaviour from the command line.

        *method* can be *pickle* or *cPickle* (faster pickle).
        For Python 3 they are same.

        *protocol* is pickle protocol.
        Version 2 is the highest supported by Python 2.
        Version 0 is "human-readable" (as noted in the documentation).
        3 is recommended if compatibility between Python 3 versions
        is needed.
        4 was added in Python 3.4.
        It adds support for very large objects,
        pickling more kinds of objects,
        and some data format optimizations.

        The directory of *filename* is created if it is missing.

        An unknown *method* raises :exc:`.LenaValueError`.
        Note that on Python 3 the first branch below is always taken,
        so *method* is effectively validated only on Python 2.
        """
        if method == "pickle" or sys.version_info.major > 2:
            self._dump = pickle.dump
            self._load = pickle.load
        elif method == "cPickle":
            # cPickle is imported at module level only for Python 2
            self._dump = cPickle.dump
            self._load = cPickle.load
        else:
            raise lena.core.LenaValueError(
                "Cache method should be one of pickle or cPickle."
            )
        self._method = method
        self._filename = filename
        # the unformatted name decides whether context formatting is used
        self._orig_filename = filename
        if '{' in filename:
            self._format_context = lena.context.format_context(filename)
        self.protocol = protocol
        self._recompute = recompute
        # used by meta elements
        self.is_cache = True

        cache_dir = os.path.dirname(self._filename)
        if cache_dir:
            # could be empty for files in current directory
            if sys.version_info.major == 2:
                # race condition, no good solution in Python 2.
                if not os.path.exists(cache_dir):
                    os.makedirs(cache_dir)
            else:
                # Python 3 optimal way
                os.makedirs(cache_dir, exist_ok=True)

    def cache_exists(self):
        """Return ``True`` if file with cache exists and is readable.

        If *recompute* was ``True`` during the initialization,
        pretend that cache does not exist (return ``False``).
        """
        if self._recompute:
            return False
        return os.access(self._filename, os.R_OK)

    def drop_cache(self):
        """Remove file with cache if that exists, pass otherwise.

        If cache exists and is readable, but could not be deleted,
        :exc:`.LenaEnvironmentError` is raised.
        Any other failure to remove an existing path
        re-raises the original :exc:`OSError`.
        """
        try:
            os.remove(self._filename)
        except OSError:
            if not os.path.lexists(self._filename):
                # there is nothing to drop: the documented "pass" case
                return
            if self.cache_exists():
                raise lena.core.LenaEnvironmentError(
                    "Cache {}".format(self._filename) +
                    " exists and readable, but can't be removed"
                )
            # the path exists, but is unreadable or *recompute* is set:
            # propagate the original error unchanged
            raise

    def __repr__(self):
        """Return an evaluable representation with the cache status.

        The status is written as a string literal multiplied by zero,
        so that it is visible in the output,
        but evaluates to an empty string.
        """
        if self.cache_exists():
            cache_exists = "[cache exists]"
        else:
            cache_exists = "[cache does not exist or will be recomputed]"
        return (
            """Cache("{}" + "{}"*0, recompute={}, method="{}", protocol={})""".
            format(self._filename, cache_exists, self._recompute,
                   self._method, self.protocol)
        )

    def run(self, flow):
        """Load cache or fill it.

        If we can read *filename*, load flow from there.
        Otherwise use the incoming *flow* and fill the cache.
        All loaded or passing items are yielded.
        """
        if self.cache_exists():
            # Load cache, ignore flow.
            # Race condition
            # (if file is changed or deleted right after the check),
            # but it will be always present because of lazy evaluation.
            return self._load_flow()
        # The generators must stay in separate methods:
        # a ``yield`` in this body would turn *run* itself
        # into a generator function, and the ``return`` above
        # would then yield nothing.
        return self._dump_flow_and_yield(flow)

    def _set_context(self, context):
        """Format the cache file name with *context*.

        Do nothing if the original file name is not a format string
        or if *context* lacks the required keys.
        """
        # copied from output.Write
        if '{' not in self._orig_filename:
            return
        try:
            filename = self._format_context(context)
        except lena.core.LenaKeyError:
            # keep the previous file name
            pass
        else:
            self._filename = filename

    @staticmethod
    def alter_sequence(seq):
        """If the Sequence *seq* contains a :class:`Cache`,
        which has an up-to-date cache,
        a :class:`.Source` is built
        based on the flattened *seq* and returned.
        Otherwise the *seq* is returned unchanged.
        """
        # it will check for any Caches, not just this one
        # local imports, as in the original code
        # (presumably to avoid a circular import)
        import lena.flow
        import lena.core
        orig_seq = seq
        seq = lena.core.flatten(seq)
        if hasattr(seq, '__iter__'):
            # search from the end: the last filled cache
            # makes all elements before it unnecessary
            last_cache_filled_ind = None
            for ind in reversed(range(len(seq))):
                el = seq[ind]
                if isinstance(el, lena.flow.Cache) and el.cache_exists():
                    last_cache_filled_ind = ind
                    break
            if last_cache_filled_ind is not None:
                return lena.core.Source(
                    lena.core.SourceEl(seq[last_cache_filled_ind],
                                       call="_load_flow"),
                    *seq[last_cache_filled_ind+1:]
                )
        else:
            # Cache element
            if isinstance(seq, Cache) and seq.cache_exists():
                return lena.core.Source(
                    lena.core.SourceEl(seq, call="_load_flow")
                )
        return orig_seq

    def _dump_flow_and_yield(self, flow):
        """Write each value from *flow* to the cache file and yield it.

        Values are pickled one after another into the same file.
        """
        # NOTE(review): if the flow is interrupted, the file
        # contains only a part of the flow, but will be treated
        # as a valid cache on the next run - confirm this is intended.
        with open(self._filename, "wb") as f:
            for val in flow:
                # if there were an error in a next element,
                # our value will be saved first (before yielding)
                self._dump(val, f, self.protocol)
                yield val

    def _load_flow(self):
        """Load flow from self._filename.

        Yield unpickled values until the end of file.
        The file must come from a trusted source (see the class warning).
        """
        with open(self._filename, "rb") as f:
            while True:
                try:
                    val = self._load(f)
                except EOFError:
                    # all pickled values have been read
                    break
                # yield outside of try, so that an EOFError
                # thrown in by a consumer is not mistaken for end of file
                yield val