"""Split data flow and run analysis in parallel."""
import copy
import itertools
from . import fill_compute_seq
from . import check_sequence_type as ct
from . import fill_request_seq
from . import sequence
from . import exceptions
from . import source
from . import meta
def _get_seq_with_type(seq, bufsize=None):
"""Return a (sequence, type) pair.
Sequence is derived from *seq*
(or is *seq*, if that is of a sequence type).
"""
seq_type = ""
if isinstance(seq, source.Source):
seq_type = "source"
elif isinstance(seq, fill_compute_seq.FillComputeSeq):
seq_type = "fill_compute"
elif isinstance(seq, fill_request_seq.FillRequestSeq):
seq_type = "fill_request"
elif isinstance(seq, sequence.Sequence):
seq_type = "sequence"
if seq_type:
# append later
pass
## If no explicit type is given, check seq's methods
elif ct.is_fill_compute_seq(seq):
seq_type = "fill_compute"
if not ct.is_fill_compute_el(seq):
seq = fill_compute_seq.FillComputeSeq(*seq)
elif ct.is_fill_request_seq(seq):
seq_type = "fill_request"
if not ct.is_fill_request_el(seq):
seq = fill_request_seq.FillRequestSeq(
*seq, bufsize=bufsize,
# if we have a FillRequest element inside,
# it decides itself when to reset.
reset=False,
# todo: change the interface, because
# no difference with buffer_output: we fill
# without a buffer
buffer_input=True
)
# Source is not checked,
# because it must be Source explicitly.
else:
try:
if isinstance(seq, tuple):
seq = sequence.Sequence(*seq)
else:
seq = sequence.Sequence(seq)
except exceptions.LenaTypeError:
raise exceptions.LenaTypeError(
"unknown argument type. Must be a "
"FillComputeSeq, FillRequestSeq or Source, "
"{} provided".format(seq)
)
else:
seq_type = "sequence"
return (seq, seq_type)
[docs]class Split(object):
"""Split data flow and run analysis in parallel."""
def __init__(self, seqs, bufsize=1000, copy_buf=True):
"""*seqs* must be a list of Sequence, Source, FillComputeSeq
or FillRequestSeq sequences.
If *seqs* is empty, *Split* acts as an empty *Sequence* and
yields all values it receives.
*bufsize* is the size of the buffer for the input flow.
If *bufsize* is ``None``,
whole input flow is materialized in the buffer.
*bufsize* must be a natural number or ``None``.
*copy_buf* sets whether the buffer should be copied
during :meth:`run`.
This is important if different sequences can change input data
and thus interfere with each other.
Common type:
If each sequence from *seqs* has a common type,
*Split* creates methods corresponding to this type.
For example, if each sequence is *FillCompute*,
*Split* creates methods *fill* and *compute*
and can be used as a *FillCompute* sequence.
*fill* fills all its subsequences (with copies
if *copy_buf* is True), and *compute*
yields values from all sequences in turn
(as would also do *request* or *Source.__call__*).
In case of wrong initialization arguments, :exc:`.LenaTypeError`
or :exc:`.LenaValueError` is raised.
"""
# todo: copy_buf must be always True. Isn't that?
if not isinstance(seqs, list):
raise exceptions.LenaTypeError(
"seqs must be a list of sequences, "
"{} provided".format(seqs)
)
seqs = [meta.alter_sequence(seq) for seq in seqs]
self._sequences = []
self._seq_types = []
for sequence in seqs:
try:
seq, seq_type = _get_seq_with_type(sequence, bufsize)
except exceptions.LenaTypeError:
raise exceptions.LenaTypeError(
"unknown argument type. Must be one of "
"FillComputeSeq, FillRequestSeq or Source, "
"{} provided".format(sequence)
)
self._sequences.append(seq)
self._seq_types.append(seq_type)
different_seq_types = set(self._seq_types)
self._n_seq_types = len(different_seq_types)
if self._n_seq_types == 1:
seq_type = different_seq_types.pop()
# todo: probably remove run to avoid duplication?
if seq_type == "fill_compute":
self.fill = self._fill
self.compute = self._compute
elif seq_type == "fill_request":
self.fill = self._fill
self.request = self._request
elif seq_type == "source":
pass
elif self._n_seq_types == 0:
self.run = self._empty_run
self._copy_buf = bool(copy_buf)
if bufsize is not None:
if bufsize != int(bufsize) or bufsize < 1:
raise exceptions.LenaValueError(
"bufsize should be a natural number "
"or None, {} provided".format(bufsize)
)
self._bufsize = bufsize
[docs] def __call__(self):
"""Each initialization sequence generates flow.
After its flow is empty, next sequence is called, etc.
This method is available only if each self sequence is a
:class:`.Source`,
otherwise runtime :exc:`.LenaAttributeError` is raised.
"""
if self._n_seq_types != 1 or not ct.is_source(self._sequences[0]):
raise exceptions.LenaAttributeError(
"Split has no method '__call__'. It should contain "
"only Source sequences to be callable"
)
# todo: use itertools.chain and check performance difference
for seq in self._sequences:
for result in seq():
yield result
def _fill(self, val):
for seq in self._sequences[:-1]:
if self._copy_buf:
seq.fill(copy.deepcopy(val))
else:
seq.fill(val)
self._sequences[-1].fill(val)
def _compute(self):
for seq in self._sequences:
for val in seq.compute():
yield val
def _request(self):
for seq in self._sequences:
for val in seq.request():
yield val
def _empty_run(self, flow):
"""If self sequence is empty, yield all flow unchanged."""
for val in flow:
yield val
[docs] def run(self, flow):
"""Iterate input *flow* and yield results.
The *flow* is divided into subslices of *bufsize*.
Each subslice is processed by sequences
in the order of their initializer list.
If a sequence is a *Source*,
it doesn't accept the incoming *flow*,
but produces its own complete flow
and becomes inactive (is not called any more).
A *FillRequestSeq* is filled with the buffer contents.
After the buffer is finished,
it yields all values from *request()*.
A *FillComputeSeq* is filled with values from each buffer,
but yields values from *compute* only after the whole *flow*
is finished.
A *Sequence* is called with *run(buffer)*
instead of the whole flow. The results are yielded
for each buffer (and also if the *flow* was empty).
If the whole flow must be analysed at once,
don't use such a sequence in *Split*.
If the *flow* was empty, each *call*, *compute*,
*request* or *run* is called nevertheless.
If *copy_buf* is True,
then the buffer for each sequence except the last one is a deep copy
of the current buffer.
"""
active_seqs = self._sequences[:]
active_seq_types = self._seq_types[:]
n_of_active_seqs = len(active_seqs)
ind = 0
flow = iter(flow)
flow_was_empty = True
while True:
## iterate on flow
# If stop is None, then iteration continues
# until the iterator is exhausted, if at all
# https://docs.python.org/3/library/itertools.html#itertools.islice
orig_buf = list(itertools.islice(flow, self._bufsize))
if orig_buf:
flow_was_empty = False
else:
break
# iterate on active sequences
ind = 0
while ind < n_of_active_seqs:
if self._copy_buf and n_of_active_seqs - ind > 1:
# last sequence doesn't need a copy of the buffer
buf = copy.deepcopy(orig_buf)
else:
buf = orig_buf
seq = active_seqs[ind]
seq_type = active_seq_types[ind]
if seq_type == "source":
for val in seq():
yield val
del active_seqs[ind]
del active_seq_types[ind]
n_of_active_seqs -= 1
continue
elif seq_type == "fill_compute":
stopped = False
for val in buf:
try:
seq.fill(val)
except exceptions.LenaStopFill:
stopped = True
break
if stopped:
for result in seq.compute():
yield result
# we don't have goto in Python,
# so we have to repeat this
# each time we break double cycle.
del active_seqs[ind]
del active_seq_types[ind]
n_of_active_seqs -= 1
continue
elif seq_type == "fill_request":
stopped = False
for val in buf:
try:
seq.fill(val)
except exceptions.LenaStopFill:
stopped = True
break
# FillRequest yields each time after buffer is filled
for result in seq.request():
yield result
if stopped:
del active_seqs[ind]
del active_seq_types[ind]
n_of_active_seqs -= 1
continue
elif seq_type == "sequence":
# run buf as a whole flow.
# this may be very wrong if seq has internal state,
# e.g. contains a Cache
for res in seq.run(buf):
yield res
# this is not needed, because can't be tested.
# else:
# raise exceptions.LenaRuntimeError(
# "unknown sequence type {}".format(seq_type)
# )
ind += 1
# end internal while on sequences
# end while on flow
# yield computed data
for seq, seq_type in zip(active_seqs, active_seq_types):
if seq_type == "source":
# otherwise it is a logic error
assert flow_was_empty
for val in seq():
yield val
elif seq_type == "fill_compute":
for val in seq.compute():
yield val
elif seq_type == "fill_request":
# otherwise FillRequest yielded after each buffer
if flow_was_empty:
for val in seq.request():
yield val
elif seq_type == "sequence":
if flow_was_empty:
for val in seq.run([]):
yield val