"""Split data flow and run analysis in parallel."""
import collections
import copy
import itertools
from copy import deepcopy
import lena.context
from . import fill_compute_seq
from . import check_sequence_type as ct
from . import fill_request_seq
from . import sequence
from . import exceptions
from . import source
from . import meta
def _get_seq_with_type(seq, bufsize=None):
    """Return a (sequence, type) pair.

    The type is one of the strings "source", "fill_compute",
    "fill_request" or "sequence".
    The returned sequence is *seq* itself if it already is
    of a sequence type, otherwise it is a sequence built from *seq*.

    *bufsize* is used only when a *FillRequestSeq*
    has to be created from *seq*.

    If *seq* can not be converted to any sequence,
    :exc:`.LenaTypeError` is raised.
    """
    # Explicit sequence classes are returned as they are.
    # The order of checks is important.
    explicit_types = (
        (source.Source, "source"),
        (fill_compute_seq.FillComputeSeq, "fill_compute"),
        (fill_request_seq.FillRequestSeq, "fill_request"),
        (sequence.Sequence, "sequence"),
    )
    for seq_class, type_name in explicit_types:
        if isinstance(seq, seq_class):
            return (seq, type_name)

    ## If no explicit type is given, check seq's methods
    if ct.is_fill_compute_seq(seq):
        if not ct.is_fill_compute_el(seq):
            seq = fill_compute_seq.FillComputeSeq(*seq)
        return (seq, "fill_compute")

    if ct.is_fill_request_seq(seq):
        if not ct.is_fill_request_el(seq):
            seq = fill_request_seq.FillRequestSeq(
                *seq, bufsize=bufsize,
                # if we have a FillRequest element inside,
                # it decides itself when to reset.
                reset=False,
                # todo: change the interface, because
                # no difference with buffer_output: we fill
                # without a buffer
                buffer_input=True
            )
        return (seq, "fill_request")

    # Source is not checked by its methods,
    # because it must be Source explicitly.
    # Everything else must be convertible to a Sequence.
    args = seq if isinstance(seq, tuple) else (seq,)
    try:
        seq = sequence.Sequence(*args)
    except exceptions.LenaTypeError:
        raise exceptions.LenaTypeError(
            "unknown argument type. Must be a "
            "FillComputeSeq, FillRequestSeq or Source, "
            "{} provided".format(seq)
        )
    return (seq, "sequence")
def _get_uc_intersection(unknown_contexts_seq):
from copy import copy as copy_
# deque to preserve the order, and because it may be faster
# to delete elements from start (if all sequences are different)
intersection = collections.deque(unknown_contexts_seq[0])
def remove_from_deque(ucs, context):
# index does not help in deque, it's O(n) (or maybe not at 0?)
# https://stackoverflow.com/questions/58152201/time-complexity-deleting-element-of-deque
while True:
try:
ucs.remove(context)
except ValueError:
break
for unknown_contexts in unknown_contexts_seq[1:]:
# we can't iterate a mutated deque
for context in copy_(intersection):
remove = False
if context not in unknown_contexts:
remove = True
# todo: if a local (known) context sets the same key,
# the result will be different (no intersection here!)
# Also need to check cardinality
# and other unknown contexts setting the same key.
# we could have several copies of this context
# elif ...
if remove:
remove_from_deque(intersection, context)
return list(intersection)
def _remove_uc_intersection(unknown_contexts_seq, intersection):
for cont in intersection:
# cont can be several times (N) in the intersection,
# in that case it will be present in unknown_contexts
# N times as well
for unknown_contexts in unknown_contexts_seq:
unknown_contexts.remove(cont)
class LenaSplit(object):
    """Abstract base class for split sequences."""

    def __init__(self, seqs):
        """Store *seqs* and collect their common contexts.

        Static context is the intersection of contexts
        of those *seqs* that provide *_get_context*.
        If every sequence has *_unknown_contexts*,
        their common part is moved from the sequences to this object.
        """
        self._seqs = seqs

        known_contexts = []
        template_contexts = []
        # sequences can be handled in any order
        for seq in seqs:
            # known contexts are collected before unknown ones
            if hasattr(seq, "_get_context"):
                known_contexts.append(seq._get_context())
            if hasattr(seq, "_unknown_contexts"):
                # actual lists (not copies) are needed,
                # because they are modified below
                template_contexts.append(seq._unknown_contexts)

        every_seq_has_them = len(template_contexts) == len(seqs)
        if template_contexts and every_seq_has_them:
            # otherwise the intersection is empty and ignored:
            # unknown contexts will be set from external context.
            common = _get_uc_intersection(template_contexts)
            self._unknown_contexts = common
            # template contexts must never be updated twice.
            _remove_uc_intersection(template_contexts, common)
            # todo: if a template context updates an existing context,
            # this will be wrong. But what if it is a feature?
            # Don't do that if you are not sure what you are doing (I'm not)
            # Maybe we shall remove some keys from the intersection
            # if this ever becomes a problem.

        self._context = lena.context.intersection(*known_contexts)

    def _repr_nested(self, base_indent="", indent=" "*4, el_separ=",\n"):
        """Return representation with each sequence on its own level.

        Follows the implementation of LenaSequence,
        but iterates subsequences and uses "([" and "])" brackets.
        """
        inner_indent = base_indent + indent

        reprs = []
        for el in self._seqs:
            if hasattr(el, "_repr_nested"):
                reprs.append(
                    el._repr_nested(base_indent=inner_indent, indent=indent)
                )
            else:
                reprs.append(inner_indent + repr(el))
        body = el_separ.join(reprs)

        # brackets go to separate lines only for a non-empty
        # multiline representation
        if "\n" in el_separ and self._seqs:
            new_line, closing_indent = "\n", base_indent
        else:
            new_line, closing_indent = "", ""

        return (base_indent + self._name + "([" + new_line
                + body + new_line + closing_indent + "])")

    def _get_context(self):
        """Return a deep copy of the static context."""
        return deepcopy(self._context)

    def _set_context(self, context):
        """Pass *context* to all sequences that can accept it."""
        # Own context is not updated,
        # because Split is always within a sequence.
        # todo
        # If it is not, it has no external context,
        # or one must first copy its context before setting a new one.
        for seq in self._seqs:
            if hasattr(seq, "_set_context"):
                seq._set_context(context)

    def __eq__(self, other):
        """Splits are equal if their sequences are equal."""
        if isinstance(other, LenaSplit):
            return self._seqs == other._seqs
        return NotImplemented

    def __repr__(self):
        return self._repr_nested()
class Split(LenaSplit):
    """Split data flow and run analysis in parallel."""

    def __init__(self, seqs, bufsize=1000, copy_buf=True):
        """*seqs* must be a list of Sequence, Source, FillComputeSeq
        or FillRequestSeq sequences.
        If *seqs* is empty, *Split* acts as an empty *Sequence* and
        yields all values it receives.

        *bufsize* is the size of the buffer for the input flow.
        If *bufsize* is ``None``,
        whole input flow is materialized in the buffer.
        *bufsize* must be a natural number or ``None``.

        *copy_buf* sets whether the buffer should be copied
        during :meth:`run`.
        This is important if different sequences can change input data
        and thus interfere with each other.

        Common type:
        If each sequence from *seqs* has a common type,
        *Split* creates methods corresponding to this type.
        For example, if each sequence is *FillCompute*,
        *Split* creates methods *fill* and *compute*
        and can be used as a *FillCompute* sequence.
        *fill* fills all its subsequences (with copies
        if *copy_buf* is True), and *compute*
        yields values from all sequences in turn
        (as would also do *request* or *Source.__call__*).

        Common type is not implemented for *Call* element.

        In case of wrong initialization arguments, :exc:`.LenaTypeError`
        or :exc:`.LenaValueError` is raised.
        """
        # todo: copy_buf must be always True. Isn't that?
        if not isinstance(seqs, list):
            raise exceptions.LenaTypeError(
                "seqs must be a list of sequences, "
                "{} provided".format(seqs)
            )

        # bufsize is checked before the sequences are created,
        # because it is passed to them.
        if bufsize is not None:
            try:
                is_natural = (bufsize == int(bufsize) and bufsize >= 1)
            except (TypeError, ValueError, OverflowError):
                # not a number at all (or nan, or infinity)
                is_natural = False
            if not is_natural:
                raise exceptions.LenaValueError(
                    "bufsize should be a natural number "
                    "or None, {} provided".format(bufsize)
                )
            # itertools.islice in run accepts only integers,
            # while an integral float passes the check above
            bufsize = int(bufsize)

        altered_seqs = [meta.alter_sequence(seq) for seq in seqs]
        new_seqs = []
        self._seq_types = []
        # don't call the loop variable "sequence":
        # that would shadow the imported module
        for raw_seq in altered_seqs:
            try:
                seq, seq_type = _get_seq_with_type(raw_seq, bufsize)
            except exceptions.LenaTypeError:
                raise exceptions.LenaTypeError(
                    "unknown argument type. Must be one of "
                    "FillComputeSeq, FillRequestSeq or Source, "
                    "{} provided".format(raw_seq)
                )
            new_seqs.append(seq)
            self._seq_types.append(seq_type)

        different_seq_types = set(self._seq_types)
        self._n_seq_types = len(different_seq_types)
        if self._n_seq_types == 1:
            seq_type = different_seq_types.pop()
            # todo: probably remove run to avoid duplication?
            if seq_type == "fill_compute":
                self.fill = self._fill
                self.compute = self._compute
            elif seq_type == "fill_request":
                self.fill = self._fill
                self.request = self._request
            # nothing is created for "source" and "sequence":
            # __call__ and run are defined in the class
            # (__call__ checks sequence types when it is called).
        elif self._n_seq_types == 0:
            self.run = self._empty_run

        self._copy_buf = bool(copy_buf)
        self._bufsize = bufsize
        self._name = "Split"
        super(Split, self).__init__(new_seqs)

    def __call__(self):
        """Each initialization sequence generates flow.
        After its flow is empty, next sequence is called, etc.

        This method is available only if each self sequence is a
        :class:`.Source`,
        otherwise runtime :exc:`.LenaAttributeError` is raised.
        """
        if self._n_seq_types != 1 or not ct.is_source(self._seqs[0]):
            raise exceptions.LenaAttributeError(
                "Split has no method '__call__'. It should contain "
                "only Source sequences to be callable"
            )
        # todo: use itertools.chain and check performance difference
        for seq in self._seqs:
            for result in seq():
                yield result

    def _fill(self, val):
        """Fill each sequence with *val*.

        If *copy_buf* is set, all sequences except the last one
        receive a deep copy of *val*.
        """
        for seq in self._seqs[:-1]:
            if self._copy_buf:
                seq.fill(copy.deepcopy(val))
            else:
                seq.fill(val)
        # the last sequence doesn't need a copy
        self._seqs[-1].fill(val)

    def _compute(self):
        """Yield results of *compute* of all sequences in turn."""
        for seq in self._seqs:
            for val in seq.compute():
                yield val

    def _request(self):
        """Yield results of *request* of all sequences in turn."""
        for seq in self._seqs:
            for val in seq.request():
                yield val

    def _empty_run(self, flow):
        """If self sequence is empty, yield all flow unchanged."""
        for val in flow:
            yield val

    @staticmethod
    def _fill_from_buf(seq, buf):
        """Fill *seq* with values from *buf*.

        Return ``True`` if *seq* raised :exc:`.LenaStopFill`
        (the rest of *buf* is not filled in that case),
        otherwise ``False``.
        """
        for val in buf:
            try:
                seq.fill(val)
            except exceptions.LenaStopFill:
                return True
        return False

    def run(self, flow):
        """Iterate input *flow* and yield results.

        The *flow* is divided into subslices of *bufsize*.
        Each subslice is processed by sequences
        in the order of their initializer list.

        If a sequence is a *Source*,
        it doesn't accept the incoming *flow*,
        but produces its own complete flow
        and becomes inactive (is not called any more).

        A *FillRequestSeq* is filled with the buffer contents.
        After the buffer is finished,
        it yields all values from *request()*.

        A *FillComputeSeq* is filled with values from each buffer,
        but yields values from *compute* only after the whole *flow*
        is finished.

        A *Sequence* is called with *run(buffer)*
        instead of the whole flow. The results are yielded
        for each buffer (and also if the *flow* was empty).
        If the whole flow must be analysed at once,
        don't use such a sequence in *Split*.

        If the *flow* was empty, each *call*, *compute*,
        *request* or *run* is called nevertheless.

        If *copy_buf* is True,
        then the buffer for each sequence except the last one is a deep copy
        of the current buffer.
        """
        active_seqs = self._seqs[:]
        active_seq_types = self._seq_types[:]
        flow = iter(flow)
        flow_was_empty = True

        while True:
            ## iterate on flow
            # If stop is None, then iteration continues
            # until the iterator is exhausted, if at all
            # https://docs.python.org/3/library/itertools.html#itertools.islice
            orig_buf = list(itertools.islice(flow, self._bufsize))
            if not orig_buf:
                break
            flow_was_empty = False

            # iterate on active sequences
            ind = 0
            while ind < len(active_seqs):
                seq = active_seqs[ind]
                seq_type = active_seq_types[ind]

                if seq_type == "source":
                    # Source ignores the buffer (no copy is needed),
                    # produces its complete flow and becomes inactive.
                    for val in seq():
                        yield val
                    del active_seqs[ind]
                    del active_seq_types[ind]
                    continue

                if self._copy_buf and len(active_seqs) - ind > 1:
                    buf = copy.deepcopy(orig_buf)
                else:
                    # last sequence doesn't need a copy of the buffer
                    buf = orig_buf

                if seq_type == "fill_compute":
                    if self._fill_from_buf(seq, buf):
                        # the sequence accepts no more values:
                        # yield its results and deactivate it
                        for result in seq.compute():
                            yield result
                        del active_seqs[ind]
                        del active_seq_types[ind]
                        continue
                elif seq_type == "fill_request":
                    stopped = self._fill_from_buf(seq, buf)
                    # FillRequest yields each time after buffer is filled
                    for result in seq.request():
                        yield result
                    if stopped:
                        del active_seqs[ind]
                        del active_seq_types[ind]
                        continue
                elif seq_type == "sequence":
                    # run buf as a whole flow.
                    # this may be very wrong if seq has internal state,
                    # e.g. contains a Cache
                    for res in seq.run(buf):
                        yield res
                # other sequence types are not possible
                # (they are set during the initialization).
                ind += 1
            # end internal while on sequences
        # end while on flow

        # yield computed data
        for seq, seq_type in zip(active_seqs, active_seq_types):
            if seq_type == "source":
                # otherwise it is a logic error
                assert flow_was_empty
                for val in seq():
                    yield val
            elif seq_type == "fill_compute":
                for val in seq.compute():
                    yield val
            elif seq_type == "fill_request":
                # otherwise FillRequest yielded after each buffer
                if flow_was_empty:
                    for val in seq.request():
                        yield val
            elif seq_type == "sequence":
                if flow_was_empty:
                    for val in seq.run([]):
                        yield val