Flow

Elements:

Cache(filename[, method, protocol]) Cache flow passing through.
DropContext(*args) Sequence that transforms (data, context) flow so that only data remains in the inner sequence.
End Stop sequence here.
Filter(selector) Filter values from flow.
Print([before, sep, end, transform]) Print values passing through.
Progress([name, format]) Print progress (how much data was processed and remains).
RunIf(select, *args) Run a sequence only for selected values.

Functions:

get_context(value) Get context from a possible (data, context) pair.
get_data(value) Get data from value (a possible (data, context) pair).
get_data_context(value) Get (data, context) from value (a possible (data, context) pair).
seq_map(seq, container[, one_result]) Map Lena Sequence seq to the container.

Group plots:

GroupBy(group_by) Group values.
GroupPlots(group_by[, select, transform, …]) Group several plots.
GroupScale(scale_to[, allow_zero_scale, …]) Scale a group of data.
MapGroup(*seq, **map_scalars) Apply a sequence to groups.
Not(selector[, raise_on_error]) Negate a selector.
Selector(selector[, raise_on_error]) Determine whether an item should be selected.

Iterators:

Chain(*iterables) Chain generators.
CountFrom([start, step]) Generate numbers from start to infinity, with step between values.
ISlice(*args, **kwargs) Deprecated since version 0.4: use Slice.

Reverse() Reverse the flow (yield values from last to first).
Slice(*args) Slice data flow from start to stop with step.

Split into bins:

Since Lena 0.5, this group has been moved to Structures.

Elements

Elements form Lena sequences. This group contains miscellaneous elements, which didn’t fit other categories.

class Cache(filename, method='cPickle', protocol=2)[source]

Cache flow passing through.

On the first run, dump all flow to file (and yield the flow unaltered). On subsequent runs, load all flow from that file in the original order.

Example:

s = Source(
    ReadFiles(),
    ReadEvents(),
    MakeHistograms(),
    Cache("histograms.pkl"),
    MakeStats(),
    Cache("stats.pkl"),
)

If stats.pkl exists, Cache will read the data flow from that file and no other processing will be done. If the stats.pkl cache doesn’t exist, but the cache for histograms exists, it will be used and no previous processing (from ReadFiles to MakeHistograms) will occur. If neither cache has been filled yet, processing will run as usual.

Only pickleable objects can be cached (otherwise a pickle.PickleError is raised).

Warning

The pickle module is not secure against erroneous or maliciously constructed data. Never unpickle data from an untrusted source.

filename is the name of file where to store the cache. You can give it .pkl extension.

method can be pickle or cPickle (a faster pickle). For Python 3 they are the same.

protocol is the pickle protocol. Version 2 is the highest supported by Python 2. Version 0 is "human-readable" (as noted in the documentation). 3 is recommended if compatibility between Python 3 versions is needed. 4 was added in Python 3.4. It adds support for very large objects, pickling more kinds of objects, and some data format optimizations.

static alter_sequence(seq)[source]

If the Sequence seq contains a Cache, which has an up-to-date cache, a Source is built based on the flattened seq and returned. Otherwise the seq is returned unchanged.

cache_exists()[source]

Return True if file with cache exists and is readable.

drop_cache()[source]

Remove the file with the cache if it exists; otherwise do nothing.

If cache exists and is readable, but could not be deleted, LenaEnvironmentError is raised.

run(flow)[source]

Load cache or fill it.

If we can read filename, load flow from there. Otherwise use the incoming flow and fill the cache. All loaded or passing items are yielded.

class DropContext(*args)[source]

Sequence that transforms (data, context) flow so that only data remains in the inner sequence. Context is restored outside DropContext.

DropContext works for most simple cases as a Sequence, but may not work in more advanced circumstances. For example, since DropContext is not transparent, Split can’t judge whether it has a FillCompute element inside, and this may lead to errors in the analysis. It is recommended to provide context when possible.

*args will form a Sequence.

run(flow)[source]

Run the sequence without context, and generate output flow restoring the context before DropContext.

If the sequence adds a context, the returned context is updated with that.
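As a plain-Python illustration of these semantics (this is a sketch, not the library code), context saved before the inner sequence is restored on output and updated with any context the sequence adds; `double` below is a hypothetical inner sequence working on bare data:

```python
# Sketch of the DropContext idea: the inner sequence sees only data;
# the context saved before it is restored (and updated) on output.
def drop_context(inner_run, flow):
    for data, context in flow:
        for result in inner_run([data]):
            new_context = dict(context)
            # if the inner sequence yields a (data, context) pair,
            # update the saved context with the new one
            if (isinstance(result, tuple) and len(result) == 2
                    and isinstance(result[1], dict)):
                result, added = result
                new_context.update(added)
            yield (result, new_context)

def double(flow):
    # hypothetical inner sequence: works on bare data
    for data in flow:
        yield data * 2

results = list(drop_context(double, [(1, {"source": "a"}), (2, {"source": "b"})]))
assert results == [(2, {"source": "a"}), (4, {"source": "b"})]
```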

class End[source]

Stop sequence here.

run(flow)[source]

Exhaust all preceding flow and stop iteration (yield nothing to the following flow).

class Filter(selector)[source]

Filter values from flow.

selector is a boolean function. If it returns True, the value passes Filter. If selector is not callable, it is converted to a Selector. If the conversion could not be done, LenaTypeError is raised.

Note

Filter appeared in Lena only in version 0.4. There may be better alternatives to using this element:

  • don’t produce values that you will discard later. If you want to select data from a specific file, read only that file.
  • use a custom class. SelectPosition("border") is more readable and maintainable than a Filter with many conditions, and it is also more cohesive if you group several options like "center" or "top" in a single place. If you make a selection, it can be useful to add information about that to the context (and Filter does not do that).

This doesn’t mean that we recommend against this class: sometimes it can be quick and explicit, and if a custom class name provides no clue what it does, a general Filter would be more readable.

New in version 0.4.

fill_into(element, value)[source]

Fill value into an element if selector(value) is True.

Element must have a fill(value) method.

run(flow)[source]

Yield values from the flow for which the selector is True.
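In plain Python, the run semantics amount to the following sketch (not the library implementation, which also handles Selector conversion):

```python
# Sketch of Filter.run: yield only values for which
# the selector returns True.
def filter_run(selector, flow):
    for value in flow:
        if selector(value):
            yield value

assert list(filter_run(lambda x: x % 2 == 0, [1, 2, 3, 4])) == [2, 4]
```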

class Print(before='', sep='', end='\n', transform=None)[source]

Print values passing through.

before is a string appended before the first element in the item (which may be a container).

sep separates elements, end is appended after the last element.

transform is a function which transforms passing items (for example, it can select its specific fields).

__call__(value)[source]

Print and return value.

class Progress(name='', format='')[source]

Print progress (how much data was processed and remains).

name, if set, customizes the output with the collective name of values being processed (for example, “events”).

format is a formatting string for the output. It will be passed keyword arguments percent, index, total and name.

Use Progress before a long computation. For example, if you have files with a lot of data, put this element after generating file names, but before reading the files. To print indices without reading the whole flow, use CountFrom and Print.

Progress is estimated based on the number of items processed by this element. It does not take into account the creation of final plots or the difference in the processing time for different values.

Warning

To measure progress, the whole flow is consumed.

run(flow)[source]

Consume the flow, then yield values one by one and print progress.
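The behaviour can be sketched in plain Python (an illustration, not the library code; the format keywords match those documented above):

```python
# Sketch of Progress: the whole flow is consumed first to learn its
# length, then values are yielded one by one with a progress report.
def progress_run(flow, name="", fmt="{percent:.0f}% ({index}/{total}) {name}"):
    items = list(flow)          # consumes the complete flow
    total = len(items)
    for index, value in enumerate(items, 1):
        print(fmt.format(percent=100. * index / total,
                         index=index, total=total, name=name))
        yield value

# values pass through unchanged
assert list(progress_run([10, 20], name="events")) == [10, 20]
```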

class RunIf(select, *args)[source]

Run a sequence only for selected values.

Note

In general, different flows are transformed to common data types (like histograms). In some complicated analyses (like in SplitIntoBins) there can appear values of very different types, for which additional transformation must be run. Use this element in such cases.

RunIf is similar to Filter, but the latter can be used as a FillInto element inside Split.

RunIf with a selector select (let us call its opposite not_select) is equivalent to

Split(
    [
        (
            select,
            Sequence(*args)
        ),
        not_select
        # not selected values pass unchanged
    ],
    bufsize=1,
    copy_buf=False
)

and can be considered “syntactic sugar”. Use Split for more flexibility.

select is a function that accepts a value (maybe with context) and returns a boolean. It is converted to a Selector. See its specifications for available options.

args are an arbitrary number of elements that will be run for selected values. They are joined into a Sequence.

New in version 0.4.

run(flow)[source]

Run the sequence for selected values from the flow.

Warning

RunIf disrupts the flow: it feeds values to the sequence one by one, and yields the results. If the sequence depends on the complete flow (for example, yields the maximum element), this will be incorrect. The flow after RunIf is not disrupted.

Not selected values pass unchanged.
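These run semantics can be sketched in plain Python (an illustration, not the library code; note how each selected value is fed to the sequence separately, which is the disruption described in the warning):

```python
# Sketch of RunIf: selected values are fed to the sequence
# one by one; others pass unchanged.
def run_if(select, seq_run, flow):
    for value in flow:
        if select(value):
            for result in seq_run([value]):
                yield result
        else:
            yield value

def double(flow):
    for value in flow:
        yield 2 * value

assert list(run_if(lambda v: v > 0, double, [-1, 2, -3, 4])) == [-1, 4, -3, 8]
```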

Functions

Functions to deal with data and context, and seq_map().

A value is considered a (data, context) pair, if it is a tuple of length 2, and the second element is a dictionary or its subclass.

get_context(value)[source]

Get context from a possible (data, context) pair.

If context is not found, return an empty dictionary.

get_data(value)[source]

Get data from value (a possible (data, context) pair).

If context is not found, return value.

get_data_context(value)[source]

Get (data, context) from value (a possible (data, context) pair).

If context is not found, (value, {}) is returned.

Since get_data() and get_context() both check whether context is present, this function may be slightly more efficient and compact than the other two.
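The convention stated above can be illustrated with a small reimplementation (a sketch, not the library code):

```python
# Illustration of the (data, context) convention: a value is a
# (data, context) pair if it is a tuple of length 2 whose second
# element is a dictionary (or its subclass).
def get_data_context(value):
    if (isinstance(value, tuple) and len(value) == 2
            and isinstance(value[1], dict)):
        return value
    return (value, {})

def get_data(value):
    return get_data_context(value)[0]

def get_context(value):
    return get_data_context(value)[1]

assert get_data_context((1.0, {"name": "x"})) == (1.0, {"name": "x"})
assert get_data_context(1.0) == (1.0, {})
assert get_data((1.0, {"name": "x"})) == 1.0
assert get_context(1.0) == {}
```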

seq_map(seq, container, one_result=True)[source]

Map Lena Sequence seq to the container.

For each value from the container, calculate seq.run([value]). This can be a list or a single value. If one_result is True, the result must be a single value. In this case, if results contain less than or more than one element, LenaValueError is raised.

The list of results (lists or single values) is returned. The results are in the same order as read from the container.
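The described behaviour can be sketched with a minimal stand-in sequence (any object with a run method accepting an iterable); this is an illustration, not the library implementation:

```python
# A toy sequence: doubles every value from the flow.
class TimesTwo:
    def run(self, flow):
        for value in flow:
            yield 2 * value

# Sketch of seq_map: run the sequence on each value separately.
def seq_map(seq, container, one_result=True):
    results = []
    for value in container:
        res = list(seq.run([value]))
        if one_result:
            if len(res) != 1:
                raise ValueError("one_result requires exactly one result")
            res = res[0]
        results.append(res)
    return results

assert seq_map(TimesTwo(), [1, 2, 3]) == [2, 4, 6]
```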

Group plots

Group several plots into one.

Since data can be produced in different places, several classes are needed to support this. First, the plots of interest must be selected (for example, one-dimensional histograms). This is done by Selector. Selected plots must be grouped. For example, we may want to plot data x versus Monte-Carlo x, but not data x vs data y. Data is grouped by GroupBy. To preserve the group, we can’t yield its members to the following elements, but have to transform the plots inside GroupPlots. We can also scale (normalize) all plots to one using GroupScale.

Example from a real analysis:

Sequence(
    # ... read data and produce histograms ...
    MakeFilename(dirname="background/{{run_number}}"),
    UpdateContext("output.plot.name", "{{variable.name}}",
                  raise_on_missing=True),
    lena.flow.GroupPlots(
        group_by="variable.coordinate",
        # Select either histograms (data) or Graphs (fit),
        # but only having "variable.coordinate" in context
        select=("variable.coordinate", [histogram, Graph]),
        # scale to data
        scale=Not("fit"),
        transform=(
            ToCSV(),
            # scaled plots will be written to separate files
            MakeFilename(
                "{{output.filename}}_scaled",
                overwrite=True,
            ),
            UpdateContext("output.plot.name", "{{variable.name}}",
                          raise_on_missing=True),
            write,
            # Several prints were used during this code creation
            # Print(transform=lambda val: val[1]["plot"]["name"]),
        ),
        # make both single and combined plots of coordinates
        yield_selected=True,
    ),
    # create file names for combined plots
    MakeFilename("combined_{{variable.coordinate}}"),
    # non-combined plots will still need file names
    MakeFilename("{{variable.name}}"),
    lena.output.ToCSV(),
    write,
    lena.context.Context(),
    # here our jinja template renders a group as a list of items
    lena.output.RenderLaTeX(template_dir=TEMPLATE_DIR,
                            select_template=select_template),
    # we have a single template, no more groups are present
    write,
    lena.output.LaTeXToPDF(),
)

class GroupBy(group_by)[source]

Group values.

Data is added during update(). The groups dictionary is available as the groups attribute: a mapping of keys (defined by group_by) to lists of items with the same key.

group_by is a function that returns distinct hashable results for values from different groups. It can be also a dot-separated formatting string. In that case only the context part of the value is used (see context.format_context).

If group_by is not a callable or a string, LenaTypeError is raised.

clear()[source]

Remove all groups.

update(val)[source]

Find a group for val and add it there.

A group key is calculated by group_by. If no such key exists, a new group is created.

If a formatting key was not found for val, LenaValueError is raised.
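The core grouping logic can be sketched in a few lines (an illustration, not the library code, which also supports formatting strings for context):

```python
# Minimal sketch of GroupBy: a mapping from keys (computed by
# group_by) to lists of values with the same key.
class SimpleGroupBy:
    def __init__(self, group_by):
        self.group_by = group_by
        self.groups = {}

    def update(self, val):
        key = self.group_by(val)
        # create a new group if the key was not seen before
        self.groups.setdefault(key, []).append(val)

    def clear(self):
        self.groups = {}

gb = SimpleGroupBy(lambda v: v % 2)
for v in [1, 2, 3, 4]:
    gb.update(v)
assert gb.groups == {1: [1, 3], 0: [2, 4]}
```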

class GroupPlots(group_by, select=None, transform=(), scale=None, yield_selected=False)[source]

Group several plots.

Plots to be grouped are chosen by select, which acts as a boolean function. By default everything is selected. If select is not a Selector, it is converted to that class. Use Selector for more options.

Deprecated since version 0.5: use RunIf instead of select.

Plots are grouped by group_by, which returns different keys for different groups. It can be a function of a value or a formatting string for its context (see GroupBy). Example: group_by="{{value.variable.name}}_{{variable.name}}".

transform is a sequence that processes individual plots before yielding. Example: transform=(ToCSV(), write). transform is called after scale.

Deprecated since version 0.5: use MapGroup instead of transform.

scale is a number or a string. A number means the scale, to which plots must be normalized. A string is a name of the plot to which other plots must be normalized. If scale is not an instance of GroupScale, it is converted to that class. If a plot could not be rescaled, LenaValueError is raised. For more options, use GroupScale.

yield_selected defines whether selected items should be yielded during run(). By default it is False: if we used a variable in a combined plot, we don’t create a separate plot of that.

run(flow)[source]

Run the flow and yield final groups.

Each item of the flow is checked with the selector. If it is selected, it is added to groups. Otherwise, it is yielded.

After the flow is finished, groups are yielded. Groups are lists of items, which have same keys returned from group_by. Each group’s context (including empty one) is inserted into a list in context.group. If any element’s context.output.changed is True, the final context.output.changed is set to True (and to False otherwise). The resulting context is updated with the intersection of groups’ contexts.

If scale was set, plots are normalized to the given value or plot. If that plot was not selected (is missing in the captured group) or its norm could not be calculated, LenaValueError is raised.

class GroupScale(scale_to, allow_zero_scale=False, allow_unknown_scale=False)[source]

Scale a group of data.

scale_to defines the method of scaling. If a number is given, group items are scaled to that. Otherwise it is converted to a Selector, which must return a unique item from the group. Group items will be scaled to the scale of that item.

By default, attempts to rescale a structure with unknown or zero scale raise an error. If allow_zero_scale and allow_unknown_scale are set to True, the corresponding errors are ignored and the structure remains unscaled.

scale(group)[source]

Scale each structure in a group.

The group can contain (structure, context) pairs. The original group is rescaled in place.

If any item could not be rescaled and options were not set to ignore that, LenaValueError is raised.

class MapGroup(*seq, **map_scalars)[source]

Apply a sequence to groups.

Arguments seq must form a Sequence.

Set a keyword argument map_scalars to False to ignore scalar values (those that are not groups). Other keyword arguments raise LenaTypeError.

New in version 0.5.

run(flow)[source]

Map seq to every group from flow.

A value represents a group if its context has a key group and its data part is iterable (for example, a list of values). If length of data is different from the length of context.group, LenaRuntimeError is raised.

seq must produce an equal number of results for each item of group, or LenaRuntimeError is raised. These results are yielded in groups one by one.

Common changes of group context update common context (that of the value). context.output.changed is set appropriately.
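A simplified sketch of this mapping (an illustration, not the library code: it assumes one result per item and ignores context updates):

```python
# Sketch of mapping a transformation to every item of a group:
# a value is a group if its context contains "group" and its data
# part is a list; scalar values pass unchanged.
def map_group_run(transform, flow):
    for data, context in flow:
        if "group" in context and isinstance(data, list):
            yield ([transform(item) for item in data], context)
        else:
            yield (data, context)

flow = [([1, 2], {"group": [{}, {}]}), (3, {})]
results = list(map_group_run(lambda x: 10 * x, flow))
assert results == [([10, 20], {"group": [{}, {}]}), (3, {})]
```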

class Not(selector, raise_on_error=False)[source]

Bases: lena.flow.selectors.Selector

Negate a selector.

selector is an instance of Selector or will be used to initialize that.

raise_on_error is used during the initialization of selector and has the same meaning as in Selector. It has no effect if selector is already initialized.

__call__(value)[source]

Negate the result of the initialized selector.

If raise_on_error is False, then this is a complete negation (including the case of an error encountered in the selector). For example, if the selector is variable.name, and value’s context contains no "variable", Not("variable.name")(value) will be True. If raise_on_error is True, then any occurred exception will be raised here.

class Selector(selector, raise_on_error=False)[source]

Determine whether an item should be selected.

Generally, selected means the result is convertible to True, but other values can be used as well.

The usage of selector depends on its type.

If selector is a class, __call__() checks that data part of the value is subclassed from that.

A callable is used as is.

A string means that value’s context must conform to that (as in context.contains).

selector can be a container. In this case its items are converted to selectors. If selector is a list, the result is or applied to results of each item. If it is a tuple, boolean and is applied to the results.

raise_on_error is a boolean that sets whether in case of an exception the selector raises that exception or returns False. If selector is a container, raise_on_error will be used during its items initialization (recursively).

If incorrect arguments are provided, LenaTypeError is raised.

__call__(value)[source]

Check whether value is selected.

By default, if an exception occurs, the result is False. Thus it is safe to use non-existing attributes or arbitrary contexts. However, if raise_on_error was set to True, the exception will be raised. Use it if you are confident in the data and want to see any error.
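The dispatch rules above can be sketched as follows (an illustration, not the library code: string selectors for context matching and the extraction of the data part from (data, context) pairs are omitted):

```python
# Sketch of selector dispatch: class -> isinstance check,
# callable -> used as is, list -> "or" of items, tuple -> "and".
def make_selector(spec):
    if isinstance(spec, type):
        return lambda value: isinstance(value, spec)
    if callable(spec):
        return spec
    if isinstance(spec, list):
        items = [make_selector(s) for s in spec]
        return lambda value: any(s(value) for s in items)
    if isinstance(spec, tuple):
        items = [make_selector(s) for s in spec]
        return lambda value: all(s(value) for s in items)
    raise TypeError("unknown selector specification")

# select integers OR the exact string "yes"
sel = make_selector([int, lambda v: v == "yes"])
assert sel(1) is True
assert sel("yes") is True
assert sel("no") is False
```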

Iterators

Iterators transform a data flow or create a new one.

class Chain(*iterables)[source]

Chain generators.

Chain can be used as a Source to generate data.

Example:

>>> c = lena.flow.Chain([1, 2, 3], ['a', 'b'])
>>> list(c())
[1, 2, 3, 'a', 'b']

iterables will be chained during __call__(): after the first one is exhausted, the second is iterated, and so on.

__call__()[source]

Generate values from chained iterables.

class CountFrom(start=0, step=1)[source]

Generate numbers from start to infinity, with step between values.

Similar to itertools.count().

__call__()[source]

Yield values from start to infinity with step.
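Since CountFrom mirrors itertools.count, its behaviour can be shown with the standard library directly; a finite prefix is taken with islice (compare Slice below):

```python
import itertools

# itertools.count(start, step) generates start, start+step, ...
counter = itertools.count(10, 2)
assert list(itertools.islice(counter, 3)) == [10, 12, 14]
```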

ISlice(*args, **kwargs)[source]

Deprecated since version 0.4: use Slice.

class Reverse[source]

Reverse the flow (yield values from last to first).

Warning

This element will consume the whole flow.

run(flow)[source]

Consume the flow and yield values in reverse order.

class Slice(*args)[source]

Slice data flow from start to stop with step.

Initialization:

Slice(stop)

Slice(start, stop[, step])

Similar to itertools.islice() or range(). Negative indices for start and stop are supported during run().

Examples:

>>> Slice(1000)  # doctest: +SKIP

analyse only the first thousand events (no other values from the flow are generated). Use it for quick checks of data on small subsamples.

>>> Slice(-1)  # doctest: +SKIP

yields all elements from the flow except the last one.

>>> Slice(1, -1)  # doctest: +SKIP

yields all elements from the flow except the first and the last one.

Note that in case of negative indices it is necessary to store abs(start) or abs(stop) values in memory. For example, to discard the last 200 elements one has to (a) read the whole flow and (b) keep 200 elements buffered during the iteration.

It is not possible to use negative indices with fill_into(), because it doesn’t control the flow and doesn’t know when it is finished. To obtain a negative step, use a composition with Reverse.
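The buffering needed for a negative stop can be sketched in plain Python (an illustration, not the library code): to discard the last n elements of a generator, keep n values buffered and yield the overflow, which is exactly why abs(stop) values must be held in memory.

```python
from collections import deque

# Sketch of a negative stop for a generator: buffer n values
# and yield the overflow, so the last n values are never yielded.
def drop_last(flow, n):
    buf = deque()
    for value in flow:
        buf.append(value)
        if len(buf) > n:
            yield buf.popleft()

assert list(drop_last(iter(range(5)), 2)) == [0, 1, 2]
```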

fill_into(element, value)[source]

Fill element with value.

Values are filled in the order defined by (start, stop, step). Element must have a fill(value) method.

When the filling should stop, LenaStopFill is raised (Split handles this normally). Sometimes for step more than one LenaStopFill will be raised before reaching stop elements. Early exceptions are an optimization and don’t affect the correctness of this method.

run(flow)[source]

Yield values from flow from start to stop with step.