Source code for lena.flow.elements

"""Elements that work with the flow."""
import collections
import copy
import itertools

import lena.core
from . import functions


class Count(object):
    """Count items that pass through.

    Example:

    >>> flow = [0, 1, 2]
    >>> c = Count("my_counter")
    >>> list(c.run(iter(flow))) == [
    ...     0, 1, (2, {'my_counter': 3})
    ... ]
    True
    """

    def __init__(self, name="count", count=0):
        """*name* is this counter's name (added to context).
        One can use the default name if *Count* is filled,
        but it is recommended to provide a meaningful name
        in a *Run* element.

        *count* is the initial counter. It is added to all countings.
        It is set to 0 during :meth:`reset`.

        *name* and *count* are public attributes.
        """
        self.name = name
        self.count = count
        # todo: move to update_context.
        self._cur_context = {}

    def fill(self, value):
        """Increase *count* and set current context from *value*."""
        self.count += 1
        self._cur_context = lena.flow.get_context(value)

    def compute(self):
        """Yield *(count, context)*.

        *context* is taken from the last filled value
        and is updated with *{self.name: self.count}*.
        """
        # compute is idempotent: the stored context is only updated
        # with the current count, and a deep copy is yielded,
        # so the result can not be changed from outside.
        self._cur_context.update({self.name: self.count})
        yield (self.count, copy.deepcopy(self._cur_context))

    def fill_into(self, element, value):
        """Fill *element* with *value* and increase *count*.

        *value* context is updated with *{self.name: self.count}*.
        *element* must have a ``fill(value)`` method.
        """
        self.count += 1
        data, context = lena.flow.get_data_context(value)
        context.update({self.name: self.count})
        element.fill((data, context))

    def reset(self):
        """Set *count* to zero. Clear current context."""
        # note that we reset not to the initialized count.
        self.count = 0
        self._cur_context = {}

    def run(self, flow):
        """Yield incoming values and increase *count*.

        *flow* may be any iterable (not only an iterator).

        After the flow is exhausted, update last value's context with
        *{self.name: self.count}*.

        If the *flow* was empty, nothing is yielded
        (so *count* can be zero only from :meth:`compute`).
        """
        # next() requires an iterator; accept lists and other
        # iterables as well.
        flow = iter(flow)
        try:
            prev_val = next(flow)
        except StopIteration:
            # otherwise it will be an error since PEP 479
            # https://stackoverflow.com/a/51701040/952234
            return

        # todo: add an option to update context with every count,
        # not only last. But it may be better with a sort of zip.
        # Values are yielded with a delay of one item,
        # so that the last one can get the updated context.
        count = 1
        for val in flow:
            yield prev_val
            count += 1
            prev_val = val

        # we don't overwrite the count field, in case of a simultaneous
        # filling in another place.
        self.count += count
        data, context = lena.flow.get_data_context(prev_val)
        # we yield the total count from all threads.
        # This is the same result (mod context) as for compute.
        context.update({self.name: self.count})
        yield (data, context)

    def __eq__(self, other):
        if not isinstance(other, Count):
            return NotImplemented
        return (self.name == other.name
                and self.count == other.count
                and self._cur_context == other._cur_context)

    def __repr__(self):
        return "Count(name=\"{}\", count={})".format(self.name, self.count)
class RunIf(object):
    """Run a sequence only for selected values.

    Note
    ----
        In general, different flows are transformed to common data types
        (like histograms). In some complicated analyses
        (like in :class:`.SplitIntoBins`) there can appear values
        of very different types, for which additional transformation
        must be run. Use this element in such cases.

        *RunIf* is similar to :class:`.Filter`,
        but the latter can be used as a :class:`.FillInto`
        element inside :class:`.Split`.

        *RunIf* with a selector *select*
        (let us call its opposite *not_select*) is equivalent to

        .. code-block:: python

            Split(
                [
                    (
                        select,
                        Sequence(*args)
                    ),
                    not_select
                    # not selected values pass unchanged
                ],
                bufsize=1,
                copy_buf=False
            )

        and can be considered "syntactic sugar".
        Use :class:`.Split` for more flexibility.
    """

    def __init__(self, select, *args):
        """*select* is a function that accepts a value
        (maybe with context) and returns a boolean.
        It is converted to a :class:`.Selector`.
        See its specifications for available options.

        *args* are an arbitrary number of elements
        that will be run for selected values.
        They are joined into a :class:`.Sequence`.

        :exc:`.LenaTypeError` is raised if *select* or *args*
        could not be converted to the needed types.

        .. versionadded:: 0.4
        """
        # this element was present in Lena for a long time,
        # but it was called TransformIf
        # and was deprecated (undocumented).
        if isinstance(select, lena.flow.Selector):
            self._select = select
        else:
            try:
                select = lena.flow.Selector(select)
            except lena.core.LenaTypeError:
                raise lena.core.LenaTypeError(
                    "select must be a Selector or convertible to that, "
                    "{} provided".format(select)
                )
            else:
                self._select = select

        if len(args) == 1 and isinstance(args[0], lena.core.Sequence):
            self._seq = args[0]
        else:
            try:
                seq = lena.core.Sequence(*args)
            except lena.core.LenaTypeError:
                # format the whole tuple: unpacking it would report
                # only the first element (and would raise IndexError
                # for empty args).
                raise lena.core.LenaTypeError(
                    "args must be a Sequence or convertible to that, "
                    "{} provided".format(args)
                )
            else:
                self._seq = seq

    def run(self, flow):
        """Run the sequence for selected values from the *flow*.

        Warning
        -------
            *RunIf* disrupts the flow: it feeds values
            to the sequence one by one, and yields the results.
            If the sequence depends on the complete flow
            (for example, yields the maximum element),
            this will be incorrect.
            The flow after *RunIf* is not disrupted.

        Not selected values pass unchanged.
        """
        for val in flow:
            if self._select(val):
                # the sequence gets a flow consisting of this value only
                for result in self._seq.run([val]):
                    yield result
            else:
                yield val
class RunningChunkBy(object):
    """Split the flow into shifting intersecting chunks."""

    def __init__(self, chunk_size, container=tuple, from_iterable=False):
        """*chunk_size* is the number of values in each yielded chunk.

        *container* is a callable constructor for new chunks.
        If *container* initialization requires an iterable,
        set *from_iterable* to ``True``.
        Otherwise (unless *container* is ``tuple``)
        chunk values are passed to it as positional arguments.

        If *container* is not callable,
        :exc:`.LenaTypeError` is raised.

        Example::

            >>> rcb = RunningChunkBy(2)
            >>> flow = range(5)
            >>> list(rcb.run(flow))
            [(0, 1), (1, 2), (2, 3), (3, 4)]
        """
        # todo: add example of event selection
        self._cs = chunk_size
        if not callable(container):
            # only lena.core is imported in this module,
            # so the exception must be referenced through it.
            raise lena.core.LenaTypeError(
                "container must be a callable constructor for new chunks"
            )
        self._container = container
        self._from_iterable = bool(from_iterable)

    # todo: create FillInto

    def run(self, flow):
        """Yield chunks of *chunk_size* consecutive values,
        each next chunk being shifted by one value.

        If the *flow* contains fewer than *chunk_size* values,
        nothing is yielded.
        Prepend or append the *flow* with default elements
        to change this.
        """
        chunk_size = self._cs
        container = self._container

        # we can't use isinstance(tuple), because
        # container is its constructor, not an object
        if (container == tuple) or self._from_iterable:
            make_chunk = container
        else:
            # e.g. namedtuple
            def make_chunk(values):
                return container(*values)

        # if flow appears to be a container, this will work correctly
        flow = iter(flow)
        chunk = collections.deque(itertools.islice(flow, chunk_size),
                                  maxlen=chunk_size)

        for val in flow:
            yield make_chunk(chunk)
            chunk.append(val)  # head is popped automatically

        # flow contained enough elements
        if len(chunk) == chunk_size:
            yield make_chunk(chunk)
class StoreFilled(object):
    """Collect all filled values in a list."""

    def __init__(self, yield_as_a_group=True):
        """With the default *yield_as_a_group*, :meth:`compute`
        yields all stored values as one group.
        If it is ``False``, the values are yielded one by one.

        The list of filled values is available
        through a public attribute :attr:`group`.

        This class is memory unsafe by definition.
        It is used mostly for testing purposes.
        """
        self._yield_as_a_group = yield_as_a_group
        # another container (a set, etc.) could be used here
        # if that is allowed in the future
        self.group = []

    def fill(self, value):
        """Append *value* to the stored items."""
        self.group.append(value)

    def __eq__(self, other):
        if isinstance(other, StoreFilled):
            return (self.group == other.group and
                    self._yield_as_a_group == other._yield_as_a_group)
        return NotImplemented

    def compute(self):
        """Yield the stored values (as a group or one by one)."""
        if not self._yield_as_a_group:
            for item in self.group:
                yield item
            return
        # a copy is yielded: otherwise the results of several
        # computes without a reset would be interdependent
        yield self.group[:]

    def reset(self):
        """Forget all stored values."""
        self.group = []
class End(object):
    """Stop sequence here."""

    def run(self, flow):
        """Consume the whole preceding flow and yield nothing
        to the following flow.
        """
        for _ in flow:
            continue
        # this branch is never executed,
        # it only makes the method a generator
        if False:
            yield "unreachable"

    def __eq__(self, other):
        # all End elements are equal
        return isinstance(other, End)

    def __repr__(self):
        return "End()"