Исходный код lena.flow.elements

"""Elements which work with the flow."""
import copy

import lena.core
from . import functions


class Count(object):
    """Count items that pass through.

    After the flow is exhausted, add {*name*: count} to the *context*.

    Example:

    >>> flow = [1, 2, 3, "foo"]
    >>> c = Count("counter")
    >>> list(c.run(iter(flow))) == [
    ...     1, 2, 3, ('foo', {'counter': 4})
    ... ]
    True
    """

    def __init__(self, name="counter"):
        """*name* is this Counter's name."""

        self._name = name
        self._count = 0
        self._cur_context = {}

    def fill(self, value):
        """Increase counter and set context from *value*."""
        self._count += 1
        self._cur_context = lena.flow.get_context(value)

    def compute(self):
        """Yield *(count, context)* and reset self."""
        self._cur_context.update({self._name: self._count})
        yield (self._count, copy.deepcopy(self._cur_context))
        # reset
        self._count = 0
        self._cur_context = {}

    def fill_into(self, element, value):
        """Fill *element* with *value* and increase counter.

        *value* context is updated with the counter.

        Element must have a ``fill(value)`` method.
        """
        self._count += 1
        data, context = lena.flow.get_data_context(value)
        context.update({self._name: self._count})
        element.fill((data, context))

    def run(self, flow):
        """Yield incoming values and increase counter.

        When the incoming flow is exhausted,
        update last value's context with *(count, context)*.

        If the flow was empty, nothing is yielded
        (so *count* can be zero only from :meth:`compute`).
        """
        try:
            prev_val = next(flow)
        except StopIteration:
            # otherwise it will be an error since PEP 479
            # https://stackoverflow.com/a/51701040/952234
            return
            # raise StopIteration
        # todo: add an option to update context with every count,
        # not only last
        count = 1
        for val in flow:
            yield prev_val
            count += 1
            prev_val = val
        val = prev_val
        data, context = lena.flow.get_data_context(val)
        context.update({self._name: count})
        yield (data, context)


class TransformIf(object):
    """Transform selected flow.

    In general this sequence should not be used,
    and different flows should be transformed
    to common data types (like Histograms) before merging.
    In some cases, however, there emerge values of very different types
    (like in :class:`.SplitIntoBins`),
    and this class may be useful.
    Todo: probably it should be structure-transparent
    (that is work for histogram content directly)

    Warning
    -------
        This class may be changed or deleted.
    """

    def __init__(self, select, seq):
        """*select* is converted to :class:`.Selector`.
        See its specifications for available options.

        *seq* is converted to :class:`.Sequence`.
        """
        if isinstance(select, lena.flow.Selector):
            self._select = select
        else:
            try:
                select = lena.flow.Selector(select)
            except lena.core.LenaTypeError:
                raise lena.core.LenaTypeError(
                    "select must be a Selector or convertible to that, "
                    "{} provided".format(select)
                )
            else:
                self._select = select
        if isinstance(seq, lena.core.Sequence):
            self._seq = seq
        else:
            try:
                seq = lena.core.Sequence(seq)
            except lena.core.LenaTypeError:
                raise lena.core.LenaTypeError(
                    "seq must be a Sequence or convertible to that, "
                    "{} provided".format(seq)
                )
            else:
                self._seq = seq

    def run(self, flow):
        """Transform selected flow.

        Not selected values pass unchanged.
        """
        for value in flow:
            if self._select(value):
                # transform with self._seq
                for result in self._seq.run([value]):
                    yield result
            else:
                # not selected pass unchanged
                yield value


[документация]class End(object): """Stop sequence here."""
[документация] def run(self, flow): """Exhaust all preceding flow and stop iteration (yield nothing to the following flow). """ for val in flow: pass return # otherwise it won't be a generator yield "unreachable"