Исходный код lena.core.adapters

"""Adapters allow to use existing objects as Lena core elements.

Adapters can be used for several purposes:

    - provide an unusual name for a method (*Run(my_obj, run="my_run")*).
    - hide unused methods to prevent ambiguity.
    - automatically convert objects of one type to another in sequences (*FillCompute* to *Run*).
    - explicitly cast object of one type to another (*FillRequest* to *FillCompute*).

Example:

>>> class MyEl(object):
...     def my_run(self, flow):
...         for val in flow:
...             yield val
...
>>> my_run = Run(MyEl(), run="my_run")
>>> list(my_run.run([1, 2, 3]))
[1, 2, 3]
"""
import itertools

from . import exceptions
from . import check_sequence_type as ct # it is a module, no need to hide

# pylint: disable=method-hidden
# We leave method stubs in the adapters,
# because they will appear in their documentation,
# even though the actual code will be overwritten.

# class _MISSING_TYPE:
#     pass
# _SENTINEL = _MISSING_TYPE()
# This looks better in automatic documentation:
_SENTINEL = object()
# Sentinel is used when it is important to know
# whether it was passed as a keyword argument, for example,
# when an element can be used without this method (type cast).
# Otherwise use just the method's name.


def _init_callable(self, el, call):
    """Common initializer for Call and SourceEl."""
    if call is _SENTINEL:
        # try to find call in el
        if callable(el):
            self._call = el  # pylint: disable=protected-access
        else:
            raise exceptions.LenaTypeError(
                "provide a callable method or a callable element, "
                "{} given".format(el)
            )
    else:
        # get call by its name
        el_call = getattr(el, call, None)
        if callable(el_call):
            self._call = el_call  # pylint: disable=protected-access
        else:
            raise exceptions.LenaTypeError(
                "call method {} of {} must exist and be callable"
                .format(call, el)
            )
    self._el = el


[документация]class Call(object): """Adapter to provide *__call__(value)* method. Name of the actually called method can be customized during the initialization. The method *__call__(value)* is a simple (preferably pure) function, which accepts a *value* and returns its transformation. """ def __init__(self, el, call=_SENTINEL): """Element *el* must contain a callable method *call* or be callable itself. If *call* method name is not provided, it is checked whether *el* is callable itself. If :class:`Call` failed to instantiate with *el* and *call*, :exc:`.LenaTypeError` is raised. """ _init_callable(self, el, call)
[документация] def __call__(self, value): """Transform the *value* and return.""" # This fix is needed only for a special method, # https://docs.python.org/3/reference/datamodel.html#special-lookup # Methods with other names will be monkey-patched correctly. return self._call(value) # pylint: disable=no-member
[документация]class FillCompute(object): """Adapter for a *FillCompute* element. A *FillCompute* element has methods *fill(value)* and *compute()*. """ def __init__(self, el, fill="fill", compute="compute"): """Method names can be customized through *fill* and *compute* keyword arguments during the initialization. *FillCompute* can be explicitly cast from *FillRequest*. In this case *compute* is *request*. If callable methods *fill* and *compute* or *request* were not found, :exc:`.LenaTypeError` is raised. """ fill_method = getattr(el, fill, None) compute_method = getattr(el, compute, None) if callable(fill_method): self.fill = fill_method else: raise exceptions.LenaTypeError( "fill method {} must exist and be callable".format(fill) ) if callable(compute_method): self.compute = compute_method else: # derive from FillRequest request = getattr(el, "request", None) if not callable(request): raise exceptions.LenaTypeError( "compute method {} or request must exist and be callable"\ .format(compute) ) self.compute = request self._el = el
[документация] def fill(self, value): # pylint: disable=no-self-use,unused-argument """Fill *self* with *value*.""" raise exceptions.LenaNotImplementedError
[документация] def compute(self): # pylint: disable=no-self-use """Yield computed values.""" raise exceptions.LenaNotImplementedError
[документация]class FillInto(object): """Adapter for a FillInto element.""" def __init__(self, el, fill_into=_SENTINEL, explicit=True): """Element *el* must implement *fill_into* method, be callable or be a Run element. If no *fill_into* argument is provided, then *fill_into* method is searched, then *__call__*, then *run*. If none of them is found and callable, :exc:`.LenaTypeError` is raised. Note that callable elements and elements with *fill_into* method have different interface. If the *el* is callable, it is assumed to be a simple function, which accepts a single value and transforms that, and the result is filled into the element by this adapter. *fill_into* method, on the contrary, takes two arguments (element and value) and fills the element itself. This allows to use lambdas directly in *FillInto*. A *Run* element is converted to *FillInto* this way: for each value the *el* runs a flow consisting of this one value and fills the results into the output element. This can be done only if *explicit* is True. """ if fill_into is _SENTINEL: fill_into_m = getattr(el, "fill_into", None) if callable(fill_into_m): self.fill_into = fill_into_m # try to make possible convertions elif callable(el): # use default implementation pass elif ct.is_run_el(el) and explicit: self.fill_into = self._run_fill_into else: raise exceptions.LenaTypeError( "element {} ".format(el) + "must implement 'fill_into' method, " "or be callable or a Run element" ) elif callable(getattr(el, fill_into, None)): self.fill_into = getattr(el, fill_into) else: raise exceptions.LenaTypeError( "method {} of {} must exist and be callable". format(fill_into, el) ) self._el = el self._explicit = explicit
[документация] def fill_into(self, element, value): """Fill *value* into an *element*. *Value* is transformed by the initialization element before filling *el*. *Element* must provide a *fill* method. """ element.fill(self._el(value))
def _run_fill_into(self, element, value): """Convert *value* into a flow of one element, run this flow and fill the *element* with the results. """ for result in self._el.run([value]): element.fill(result) def __repr__(self): if self._explicit: return "FillInto({})".format(repr(self._el)) else: return repr(self._el)
[документация]class FillRequest(object): """ .. deprecated:: 0.6 inside :class:`.Split` this element can be implemented by a simple :class:`Run` element. Adapter for a *FillRequest* element. A *FillRequest* element slices the flow during *fill* and yields results for each chunk during *request*. It can also call *reset* after each *request*. """ # FillRequest is not deprecated, but changing its meaning. # # They seem redundant, because mostly they are used # # as FillCompute (and a lot of code is duplicated). # # Memory warnings should be indicated elsewhere # # (like special fields). # # Moreover, FillCompute elements # # sometimes need to be more versatile, # # and FillRequest is of no help for that. def __init__(self, el, bufsize=1, reset=None, buffer_input=None, buffer_output=None, yield_on_remainder=False, fill="fill", request="request", reset_name="reset" ): """Names for actual *fill*, *request* and *reset* methods can be provided during initialization (the latter is set through *reset_name*). *FillRequest* can be initialized from a *FillCompute* element. If a callable *request* method was not found, *el* must have a method *compute*. *request* in this case is *compute*. *FillRequest* can also be initialized from a *Run* element. In that case *el* is not required to have *fill*, *compute* or *reset* methods (and *FillRequest* will not have such missing methods). *FillRequest* implements *run* method that splits the flow into subslices of *bufsize* values and feeds them to the *run* method of *el*. Since we require no less than *bufsize* values (except *yield_on_remainder* is ``True``), we need to store either *bufsize* values of the incoming flow or all values produced by *el.run* or *el.request* for each slice. This is set by *buffer_input* or *buffer_output*. One and only one of them must be ``True``. For example, if the element receives file names and produces data from them, it would be wise to buffer input. If the element receives much data and produces a histogram, one should buffer output. If a keyword argument *reset* is ``True``, *el* must have a method *reset_name*, and in this case :meth:`reset` is called after each :meth:`request` (including those during :meth:`run`). In general, *Run* elements have no *reset* methods, but for *FillCompute* elements *reset* must be set explicitly. If *yield_on_remainder* is ``True``, then the output will be yielded even if the element was filled less than *bufsize* times (but at least once). In that case no internal buffers are used during :meth:`run` and corresponding attributes are not checked. **Attributes** :attr:`bufsize` is the maximum size of subslices during *run*. *bufsize* must be a natural number, otherwise :exc:`.LenaValueError` is raised. If callable *fill* and *request* methods were not found, or *FillRequest* could not be derived from *FillCompute*, or if *reset* is ``True``, but *el* has no method *reset*, :exc:`.LenaTypeError` is raised. .. versionchanged:: 0.5 add keyword arguments *yield_on_remainder*, *buffer_input*, *buffer_output*, *reset_name*. Require explicit *reset* for *FillCompute* elements. """ # todo: rename bufsize to size or something cleverer el_reset = getattr(el, reset_name, None) if callable(el_reset): # we set this method even if *reset* is False, # because reset() may be called manually self._el_reset = el_reset elif reset: raise exceptions.LenaTypeError( "{} method must exist and be callable".format(reset_name) ) else: # disable the missing method. self.reset = None self._reset = bool(reset) # set buffer_input bi = bool(buffer_input) bo = bool(buffer_output) # if yield_on_remainder is True, buffers are not used. if not yield_on_remainder and int(bi) + int(bo) != 1: raise exceptions.LenaValueError( "one and only one of buffer_input or buffer_output " "must be set" ) self._buffer_input = bi run = getattr(el, "run", None) if run and callable(run): self.run = self._run_run has_run = True else: # This method won't work in case of no fill/compute/request, # but it will raise during initialization if so. self.run = self._run_fill_compute has_run = False el_fill = getattr(el, fill, None) if callable(el_fill): self._el_fill = el_fill self._n_count = 0 if self._buffer_input: self._buffer_in = [] else: self._buffer_out = [] if reset is None: raise exceptions.LenaTypeError( "reset must be set explicitly for a FillCompute element" ) elif not has_run: raise exceptions.LenaTypeError( "no callable run or fill method {} were found".format(fill) ) else: self.fill = None el_request = getattr(el, request, None) if callable(el_request): # we don't check whether it has fill method here. self._el_request = el_request else: # derive from compute compute = getattr(el, "compute", None) if callable(compute): self._el_request = compute elif not has_run: raise exceptions.LenaTypeError( "element must have callable methods request, compute or run" ) else: self.request = None if(bufsize != int(bufsize) or bufsize < 1): raise exceptions.LenaValueError( "bufsize must be a natural number, not {}".format(bufsize) ) self.bufsize = int(bufsize) self._yield_on_remainder = bool(yield_on_remainder) self._el = el
[документация] def fill(self, value): """Fill *el* with *value*. If more than *bufsize* values were filled, incoming values are stored in a buffer (if *buffer_input* is ``True``) or, otherwise, the output of *el.request* is stored in a buffer, until it is requested. """ if self._n_count and not self._n_count % self.bufsize: if self._buffer_input: self._buffer_in.append(value) return else: # add output to the output buffer self._buffer_out.extend(self.request()) # don't reset because need to know that fill was called # self._n_count = 0 self._el_fill(value) self._n_count += 1
[документация] def request(self): """Yield results (if they are available) and possibly reset. If input or output buffers were filled, all their contents are processed and yielded. """ # yield what was filled into the element if self._n_count >= self.bufsize: for val in self._el_request(): yield val if self._reset: self._el_reset() # it is important that request is not called # when not enough values were filled after last request self._n_count = self._n_count % self.bufsize # process buffers. # Buffers are always filled after the element, # therefore the order is correct. if not self._buffer_input: # all results are in _buffer_out for val in self._buffer_out: yield val if self._yield_on_remainder: for val in self._el_request(): yield val # reset was already called when filling _buffer_out return else: # fill the buffer from _buffer_in and yield nfills = 0 bufsize = self.bufsize buffer_in = self._buffer_in while True: if nfills == bufsize: for val in self._el_request(): yield val if self._reset: self._el_reset() nfills = 0 del buffer_in[:bufsize] # should be slower, because a slice below # copies elements ## self._buffer_in = self._buffer_in[bufsize:] continue # fill the element with values from buffer try: val = buffer_in[nfills] except IndexError: if self._yield_on_remainder: for val in self._el_request(): yield val break else: self._el_fill(val) nfills += 1 if self._reset: self._el_reset()
[документация] def reset(self): """Reset *el* (ignoring the initialization setting).""" self._el_reset()
[документация] def run(self, flow): """Process the *flow* slice by slice. *fill* each value from a subslice of *flow* of *bufsize* length, then yield results from *request*. Repeat until the *flow* is exhausted. If *fill* was not called even once (*flow* was empty), nothing is yielded, because *bufsize* values were not obtained (in contrast to *FillCompute*, for which output for an empty flow is reasonable). The last slice may contain less than *bufsize* values. If there were any and if *yield_on_remainder* is ``True``, *request* will be called for that. """ raise exceptions.LenaNotImplementedError
def _run_fill_compute(self, flow): while True: # A slice is a non-materialized list, which means # that it will not take place of *bufsize* in memory. slice_ = itertools.islice(flow, self.bufsize) # Reset the counter; what if it grows too large? # Maybe it will allow faster nfills % bufsize? # May be irrelevant though. nfills = 0 # there is no other way to check # whether the flow contains at least one element try: val = next(slice_) except StopIteration: # Unlike FillCompute, we don't yield anything # if the flow was smaller than the required bufsize break else: self._el_fill(val) nfills += 1 for val in slice_: self._el_fill(val) nfills += 1 # Flow finished too early. # Normally nfills would be equal to self.bufsize if nfills % self.bufsize: if self._yield_on_remainder: # can't return smth in Python 2 generator. # return self.request() for result in self._el_request(): yield result # # in fact, this should be undefined # # for the remainder. # if self._reset: # self._el_reset() return for result in self._el_request(): yield result if self._reset: self._el_reset() def _run_run(self, flow): # todo: to improve performance, one might create # a separate run method for bufsize = 1. from itertools import islice, chain el_run = self._el.run bufsize = self.bufsize # we can yield results one by one if self._yield_on_remainder: # it is important that flow is an iterable, not a sequence, # so we can use islice repeatedly while True: try: val = next(flow) except StopIteration: # empty flow gives empty results return else: # at least one event present # this would give bad performance for bufsize=1 for val in el_run(chain([val], islice(flow, bufsize-1))): yield val # usually Run elements have no reset, but... # we call reset here, because we don't call request # (which usually calls reset itself) if self._reset: self.reset() # we can't yield results one by one, # because we need to be sure # that *bufsize* values were encountered class slice_iterated_with_count(): def __init__(self, size, seq): self.count = 0 self._size = size self._seq = seq def __iter__(self): count = 0 for val in islice(self._seq, self._size): count += 1 yield val self.count = count if self._buffer_input: while True: buffer = list(islice(flow, bufsize)) # incomplete buffer, yield nothing. if len(buffer) < bufsize: # _yield_on_remainder is False return # full buffer received, process. for val in el_run(iter(buffer)): yield val # probably False for Run element. if self._reset: self._el_reset() else: # buffer output # slice_ can be iterated multiple times slice_ = slice_iterated_with_count(bufsize, flow) while True: results = list(el_run(slice_)) if slice_.count < bufsize: return for val in results: yield val # probably False for Run element. if self._reset: self._el_reset()
[документация]class Run(object): """Adapter for a *Run* element.""" def __init__(self, el, run=_SENTINEL): """Name of the method *run* can be customized during initialization. If *run* argument is supplied, *el* must be None or it must have a callable method with name given by *run*. If *run* keyword argument is missing, then *el* is searched for a method *run*. If that is not found, a type cast is attempted. A *Run* element can be initialized from a *Call* or a *FillCompute* element. A callable element is run as a transformation function, which accepts single values from the flow and *returns* their transformations for each value. A *FillCompute* element is run the following way: first, *el.fill(value)* is called for the whole flow. After the flow is exhausted, *el.compute()* is called. It is possible to initialize :class:`Run` using a generator function without an element. To do that, set the element to ``None``: *Run(None, run=<my_function>)*. If the initialization failed, :exc:`.LenaTypeError` is raised. :class:`Run` is used implicitly during the initialization of :class:`.Sequence`. """ if run is _SENTINEL: # no explicit method name given run_ = getattr(el, "run", None) if callable(run_): # callable method "run" found self.run = run_ # convert el to Run elif callable(el): # Call to Run self.run = self._call_run elif ct.is_fill_compute_el(el): # FillCompute to Run self.run = self._fc_run else: raise exceptions.LenaTypeError( "element {} must implement run method ".format(el) + "be callable, or be a FillCompute element" ) self._run_name = None else: # explicit method name given if el is None: self.run = run # may raise if run is not a string elif callable(getattr(el, run, None)): self.run = getattr(el, run) else: raise exceptions.LenaTypeError( "no callable method {} of {} found".format(run, el) ) self._run_name = run self._el = el def _call_run(self, flow): """Implement *run* method for *Call* element.""" for val in flow: yield self._el(val)
[документация] def run(self, flow): """Yield transformed values from the incoming *flow*."""
# will be redefined in __init__ # for val in flow: # yield self._el(val) def _fc_run(self, flow): """Implement *run* method for *FillCompute* element. First, *fill* is called for the whole flow, after the *flow* is exhausted, *compute* is called. """ for arg in flow: self._el.fill(arg) results = self._el.compute() return results def __eq__(self, other): if not isinstance(other, Run): # Run(el) != el return NotImplemented if self._run_name == "run": if other._run_name not in (None, "run"): return False return (self._el == other._el and self._run_name == other._run_name) def __repr__(self): if self._run_name: return "Run({}, run={})".format(repr(self._el), self._run_name) else: return "Run({})".format(repr(self._el))
[документация]class SourceEl(object): """Adapter to provide *__call__()* method. Name of the actually called method can be customized during the initialization. The :meth:`__call__()` method is a generator, which yields values. It doesn't accept any input flow. """ def __init__(self, el, call=_SENTINEL): """Element *el* must be callable or iterable, or contain a callable method *call*. If :class:`SourceEl` failed to instantiate with *el* and *call*, :exc:`.LenaTypeError` is raised. """ if call is _SENTINEL: # try to find call in el if callable(el): self._call = el elif hasattr(el, "__iter__"): # works for iterators and range self._call = lambda: el else: raise exceptions.LenaTypeError( "provide a callable method or a callable element, " "{} given".format(el) ) else: # get call by its name el_call = getattr(el, call, None) if callable(el_call): self._call = el_call # pylint: disable=protected-access else: raise exceptions.LenaTypeError( "call method {} of {} must exist and be callable" .format(call, el) ) self._el = el
[документация] def __call__(self): """Yield generated values.""" # This fix is needed only for a special method, # https://docs.python.org/3/reference/datamodel.html#special-lookup # Methods with other names will be monkey-patched correctly. return self._call() # pylint: disable=no-member
def __repr__(self): return "SourceEl({})".format(repr(self._el))