"""Split data flow and run analysis in parallel."""
import collections
import copy
import itertools
from copy import deepcopy
import lena.context
from . import fill_compute_seq
from . import check_sequence_type as ct
from . import fill_request_seq
from . import sequence
from . import exceptions
from .exceptions import LenaKeyError
from . import source
from . import meta
def _get_seq_with_type(seq, bufsize=None):
    """Turn *seq* into a Lena sequence and name its kind.

    Return a tuple *(sequence, type)*, where *type* is one of
    ``"source"``, ``"fill_compute"``, ``"fill_request"``
    or ``"sequence"``.

    If *seq* is already an instance of one of the sequence classes,
    it is returned unchanged. Otherwise its kind is deduced
    from its methods (via ``check_sequence_type``) and, if needed,
    it is wrapped into the corresponding sequence class.
    *bufsize* is only passed on to a newly created *FillRequestSeq*.

    :exc:`.LenaTypeError` is raised if a *Source* or a *Sequence*
    could not be created from *seq*.
    """
    # Explicit sequence classes are returned as they are.
    # The order of checks matters and is kept from top to bottom.
    explicit_kinds = (
        (source.Source, "source"),
        (fill_compute_seq.FillComputeSeq, "fill_compute"),
        (fill_request_seq.FillRequestSeq, "fill_request"),
        (sequence.Sequence, "sequence"),
    )
    for seq_class, kind in explicit_kinds:
        if isinstance(seq, seq_class):
            return (seq, kind)

    # No explicit type is given: check the methods of seq.
    if ct.is_fill_compute_seq(seq):
        # A single FillCompute element is not wrapped into a sequence.
        # We probably should not economise on that,
        # but it is harmless now.
        if ct.is_fill_compute_el(seq):
            return (seq, "fill_compute")
        return (fill_compute_seq.FillComputeSeq(*seq), "fill_compute")

    if ct.is_fill_request_seq(seq):
        if ct.is_fill_request_el(seq):
            return (seq, "fill_request")
        wrapped = fill_request_seq.FillRequestSeq(
            *seq,
            bufsize=bufsize,
            # a FillRequest element inside decides itself
            # when to reset.
            reset=False,
            # todo: change the interface, because there is
            # no difference with buffer_output: we fill
            # without a buffer
            buffer_input=True
        )
        return (wrapped, "fill_request")

    # Source or Sequence. A lone element is treated as a 1-tuple.
    elements = seq if isinstance(seq, tuple) else (seq,)
    # An empty tuple becomes an empty Sequence; otherwise
    # the first element decides between Source and Sequence.
    if elements and hasattr(elements[0], "_is_source_el"):
        target_class, kind = source.Source, "source"
    else:
        target_class, kind = sequence.Sequence, "sequence"

    try:
        created = target_class(*elements)
    except exceptions.LenaTypeError:
        raise exceptions.LenaTypeError(
            "unknown argument type. Must be a "
            "FillComputeSeq, FillRequestSeq or Source, "
            "{} provided".format(seq)
        )
    return (created, kind)
class LenaSplit(object):
"""Abstract base class for split sequences."""
def __init__(self, seqs):
self._seqs = seqs
# copied from LenaSequence
try:
self._set_context({})
except LenaKeyError:
pass
def _repr_nested(self, base_indent="", indent=" "*4, el_separ=",\n"):
# copied from LenaSequence, see the diffs
def repr_maybe_nested(el, base_indent, indent):
if hasattr(el, "_repr_nested"):
return el._repr_nested(base_indent=base_indent+indent, indent=indent)
else:
return base_indent + indent + repr(el)
elems = el_separ.join((repr_maybe_nested(el, base_indent=base_indent,
indent=indent)
# diff here
for el in self._seqs))
if "\n" in el_separ and self._seqs:
# maybe new line
mnl = "\n"
# maybe base indent
mbi = base_indent
else:
mnl = ""
mbi = ""
# diff here in name and brackets
return "".join([base_indent, self._name,
"([", mnl, elems, mnl, mbi, "])"])
def _get_context(self):
# copied from meta.SetContext and LenaSequence.
try:
sc = self._static_context
except AttributeError:
# self._exc is present in that case,
# because there is always at least one
# _set_context call before _get_context.
raise self._exc
return deepcopy(sc)
def _set_context(self, context):
if hasattr(self, "_static_context") and not context:
# every sequence was already initialised with {}.
return
for seq in self._seqs:
if hasattr(seq, "_set_context"):
try:
seq._set_context(deepcopy(context))
except LenaKeyError as exc:
# needed keys are lacking in the context.
# _static_context is not set.
# If _static_context was already set,
# we won't be here, since external context
# can not delete local keys.
self._exc = exc
return
contexts = []
for seq in self._seqs:
# a sequence without context is transparent
# (no need to intersect with others).
if hasattr(seq, "_get_context"):
# if some of sequences was not set proper context yet,
# this will raise a LenaKeyError.
# Hard to think about that case though.
contexts.append(seq._get_context())
if contexts:
context = lena.context.intersection(*contexts)
# deep copy is already done in the intersection
# or in get_context of a previous element.
self._static_context = context
def __eq__(self, other):
if not isinstance(other, LenaSplit):
return NotImplemented
return self._seqs == other._seqs
def __repr__(self):
return self._repr_nested()
class Split(LenaSplit):
    """Split data flow and run analysis in parallel."""

    def __init__(self, seqs, bufsize=1000, copy_buf=True):
        """*seqs* must be a list of Sequence, Source, FillComputeSeq
        or FillRequestSeq sequences.
        If *seqs* is empty, *Split* acts as an empty *Sequence* and
        yields all values it receives.

        Elements of *seqs* can be tuples. In that case they are
        automatically converted into the corresponding Lena sequences.
        Tuples with a FillCompute element are converted
        into FillComputeSeq.
        If the first element of the tuple has attribute
        ``_is_source_el``, the tuple is converted to a Source,
        otherwise to a Sequence.
        If automatic initialization fails, use an explicit one.

        *bufsize* is the size of the buffer for the input flow.
        If *bufsize* is ``None``,
        whole input flow is materialized in the buffer.
        *bufsize* must be a natural number or ``None``.
        An integral value of another numeric type (like ``2.0``)
        is converted to ``int``.

        *copy_buf* sets whether the buffer should be copied
        during :meth:`run`.
        This is important if different sequences can change input data
        and thus interfere with each other.

        Common type:
        If each sequence from *seqs* has a common type,
        *Split* creates methods corresponding to this type.
        For example, if each sequence is *FillCompute*,
        *Split* creates methods *fill* and *compute*
        and can be used as a *FillCompute* sequence.
        *fill* fills all its subsequences (with copies
        if *copy_buf* is True), and *compute*
        yields values from all sequences in turn
        (as would also do *request* or *Source.__call__*).
        Common type is not implemented for *Call* element.

        In case of wrong initialization arguments, :exc:`.LenaTypeError`
        or :exc:`.LenaValueError` is raised.
        """
        # todo: copy_buf must be always True. Isn't that?
        if not isinstance(seqs, list):
            raise exceptions.LenaTypeError(
                "seqs must be a list of sequences, "
                "{} provided".format(seqs)
            )

        # bufsize is validated before it is passed to any sequence.
        # Values that can't be converted to int (strings, NaN,
        # infinity, arbitrary objects) are reported with the same
        # LenaValueError as other wrong values.
        if bufsize is not None:
            try:
                is_natural = (bufsize == int(bufsize) and bufsize >= 1)
            except (TypeError, ValueError, OverflowError):
                is_natural = False
            if not is_natural:
                raise exceptions.LenaValueError(
                    "bufsize should be a natural number "
                    "or None, {} provided".format(bufsize)
                )
            # itertools.islice in run() accepts only integers
            bufsize = int(bufsize)

        seqs = [meta.alter_sequence(seq) for seq in seqs]

        new_seqs = []
        self._seq_types = []
        # the loop variable must not be called "sequence",
        # since that is the name of an imported module
        for raw_seq in seqs:
            try:
                seq, seq_type = _get_seq_with_type(raw_seq, bufsize)
            except exceptions.LenaTypeError:
                raise exceptions.LenaTypeError(
                    "unknown argument type. Must be one of "
                    "FillComputeSeq, FillRequestSeq or Source, "
                    "{} provided".format(raw_seq)
                )
            new_seqs.append(seq)
            self._seq_types.append(seq_type)

        different_seq_types = set(self._seq_types)
        self._n_seq_types = len(different_seq_types)
        if self._n_seq_types == 1:
            # all sequences share one type: expose its interface
            seq_type = different_seq_types.pop()
            # todo: probably remove run to avoid duplication?
            if seq_type == "fill_compute":
                self.fill = self._fill
                self.compute = self._compute
            elif seq_type == "fill_request":
                self.fill = self._fill
                self.request = self._request
            elif seq_type == "source":
                # value is not important, only presence
                self._is_source_el = True
        elif self._n_seq_types == 0:
            # no sequences at all: the flow passes through unchanged
            self.run = self._empty_run

        self._copy_buf = bool(copy_buf)
        self._bufsize = bufsize
        self._name = "Split"

        super(Split, self).__init__(new_seqs)

    def __call__(self):
        """Each initialization sequence generates flow.
        After its flow is empty, next sequence is called, etc.

        This method is available only if each self sequence is a
        :class:`.Source`,
        otherwise runtime :exc:`.LenaAttributeError` is raised.
        """
        # This is a generator, so the check is made
        # when the iteration starts.
        if not hasattr(self, "_is_source_el"):
            raise exceptions.LenaAttributeError(
                "Split has no method '__call__'. It should contain "
                "only Source sequences to be callable"
            )
        # todo: use itertools.chain and check performance difference
        for seq in self._seqs:
            for result in seq():
                yield result

    def _fill(self, val):
        """Fill every subsequence with *val*.

        All sequences except the last one receive a deep copy
        of *val* if *copy_buf* is set.
        """
        for seq in self._seqs[:-1]:
            if self._copy_buf:
                seq.fill(copy.deepcopy(val))
            else:
                seq.fill(val)
        # the last sequence doesn't need a copy
        self._seqs[-1].fill(val)

    def _compute(self):
        """Yield results of *compute* of each subsequence in turn."""
        for seq in self._seqs:
            for val in seq.compute():
                yield val

    def _request(self):
        """Yield results of *request* of each subsequence in turn."""
        for seq in self._seqs:
            for val in seq.request():
                yield val

    def _empty_run(self, flow):
        """If self sequence is empty, yield all flow unchanged."""
        for val in flow:
            yield val

    def run(self, flow):
        """Iterate input *flow* and yield results.

        The *flow* is divided into subslices of *bufsize*.
        Each subslice is processed by sequences
        in the order of their initializer list.

        If a sequence is a *Source*,
        it doesn't accept the incoming *flow*,
        but produces its own complete flow
        and becomes inactive (is not called any more).

        A *FillRequestSeq* is filled with the buffer contents.
        After the buffer is finished,
        it yields all values from *request()*.

        A *FillComputeSeq* is filled with values from each buffer,
        but yields values from *compute* only after the whole *flow*
        is finished.

        A *Sequence* is called with *run(buffer)*
        instead of the whole flow. The results are yielded
        for each buffer (and also if the *flow* was empty).
        If the whole flow must be analysed at once,
        don't use such a sequence in *Split*.

        If the *flow* was empty, each *call*, *compute*,
        *request* or *run* is called nevertheless.

        If *copy_buf* is True,
        then the buffer for each sequence except the last one
        is a deep copy of the current buffer.
        """
        # sequences are removed from these lists
        # when they stop accepting the flow
        active_seqs = self._seqs[:]
        active_seq_types = self._seq_types[:]
        n_of_active_seqs = len(active_seqs)

        flow = iter(flow)
        flow_was_empty = True

        while True:
            ## iterate on flow
            # If stop is None, then iteration continues
            # until the iterator is exhausted, if at all
            # https://docs.python.org/3/library/itertools.html#itertools.islice
            orig_buf = list(itertools.islice(flow, self._bufsize))
            if orig_buf:
                flow_was_empty = False
            else:
                break

            # iterate on active sequences
            ind = 0
            while ind < n_of_active_seqs:
                seq = active_seqs[ind]
                seq_type = active_seq_types[ind]

                if seq_type == "source":
                    # A Source ignores the buffer (so no copy is made
                    # for it), yields its complete flow
                    # and is deactivated.
                    for val in seq():
                        yield val
                    del active_seqs[ind]
                    del active_seq_types[ind]
                    n_of_active_seqs -= 1
                    continue

                if self._copy_buf and n_of_active_seqs - ind > 1:
                    # last sequence doesn't need a copy of the buffer
                    buf = copy.deepcopy(orig_buf)
                else:
                    buf = orig_buf

                if seq_type == "fill_compute":
                    stopped = False
                    for val in buf:
                        try:
                            seq.fill(val)
                        except exceptions.LenaStopFill:
                            stopped = True
                            break
                    if stopped:
                        # a stopped sequence yields its results
                        # at once and is deactivated
                        for result in seq.compute():
                            yield result
                        # we don't have goto in Python,
                        # so we have to repeat this
                        # each time we break double cycle.
                        del active_seqs[ind]
                        del active_seq_types[ind]
                        n_of_active_seqs -= 1
                        continue
                elif seq_type == "fill_request":
                    stopped = False
                    for val in buf:
                        try:
                            seq.fill(val)
                        except exceptions.LenaStopFill:
                            stopped = True
                            break
                    # FillRequest yields each time after buffer is filled
                    for result in seq.request():
                        yield result
                    if stopped:
                        del active_seqs[ind]
                        del active_seq_types[ind]
                        n_of_active_seqs -= 1
                        continue
                elif seq_type == "sequence":
                    # run buf as a whole flow.
                    # this may be very wrong if seq has internal state,
                    # e.g. contains a Cache
                    for res in seq.run(buf):
                        yield res
                # other sequence types are not produced
                # during the initialisation.

                ind += 1
            # end internal while on sequences
        # end while on flow

        # yield computed data
        for seq, seq_type in zip(active_seqs, active_seq_types):
            if seq_type == "source":
                # A Source is deactivated during the first buffer,
                # so it can remain here only if there were no buffers;
                # otherwise it is a logic error.
                assert flow_was_empty
                for val in seq():
                    yield val
            elif seq_type == "fill_compute":
                for val in seq.compute():
                    yield val
            elif seq_type == "fill_request":
                # otherwise FillRequest yielded after each buffer
                if flow_was_empty:
                    for val in seq.request():
                        yield val
            elif seq_type == "sequence":
                # otherwise Sequence was run for each buffer
                if flow_was_empty:
                    for val in seq.run([]):
                        yield val