Source code for lena.structures.hist_functions

"""Functions for histograms.

These functions are used for low-level work
with histograms and their contents.
They are not needed for normal usage.
"""
import collections
import copy
import itertools
import operator
import re
import sys
if sys.version_info.major == 3:
    from functools import reduce as _reduce
else:
    _reduce = reduce

import lena.core
from .graph import graph as _graph


[docs]class HistCell(collections.namedtuple("HistCell", ("edges, bin, index"))): """A namedtuple with fields *edges, bin, index*.""" # from Aaron Hall's answer https://stackoverflow.com/a/28568351/952234 __slots__ = ()
[docs]def cell_to_string( cell_edges, var_context=None, coord_names=None, coord_fmt="{}_lte_{}_lt_{}", coord_join="_", reverse=False): """Transform cell edges into a string. *cell_edges* is a tuple of pairs *(lower bound, upper bound)* for each coordinate. *coord_names* is a list of coordinates names. *coord_fmt* is a string, which defines how to format individual coordinates. *coord_join* is a string, which joins coordinate pairs. If *reverse* is True, coordinates are joined in reverse order. """ # todo: do we really need var_context? # todo: even if so, why isn't that a {}? Is that dangerous? if coord_names is None: if var_context is None: coord_names = [ "coord{}".format(ind) for ind in range(len(cell_edges)) ] else: if "combine" in var_context: coord_names = [var["name"] for var in var_context["combine"]] else: coord_names = [var_context["name"]] if len(cell_edges) != len(coord_names): raise lena.core.LenaValueError( "coord_names must have same length as cell_edges, " "{} and {} given".format(coord_names, cell_edges) ) coord_strings = [coord_fmt.format(edge[0], coord_names[ind], edge[1]) for (ind, edge) in enumerate(cell_edges)] if reverse: coord_strings = reversed(coord_strings) coord_str = coord_join.join(coord_strings) return coord_str
def _check_edges_increasing_1d(arr): if len(arr) <= 1: raise lena.core.LenaValueError("size of edges should be more than one," " {} provided".format(arr)) increasing = (tup[0] < tup[1] for tup in zip(arr, arr[1:])) if not all(increasing): raise lena.core.LenaValueError( "expected strictly increasing values, " "{} provided".format(arr) )
[docs]def check_edges_increasing(edges): """Assure that multidimensional *edges* are increasing. If length of *edges* or its subarray is less than 2 or if some subarray of *edges* contains not strictly increasing values, :exc:`.LenaValueError` is raised. """ if not len(edges): raise lena.core.LenaValueError("edges must be non-empty") elif not hasattr(edges[0], '__iter__'): _check_edges_increasing_1d(edges) return for arr in edges: if len(arr) <= 1: raise lena.core.LenaValueError( "size of edges should be more than one. " "{} provided".format(arr) ) _check_edges_increasing_1d(arr)
[docs]def get_bin_edges(index, edges): """Return edges of the bin for the given *edges* of a histogram. In one-dimensional case *index* must be an integer and a tuple of *(x_low_edge, x_high_edge)* for that bin is returned. In a multidimensional case *index* is a container of numeric indices in each dimension. A list of bin edges in each dimension is returned.""" # todo: maybe give up this 1- and multidimensional unification # and write separate functions for each case. if not hasattr(edges[0], '__iter__'): # 1-dimensional edges if hasattr(index, '__iter__'): index = index[0] return (edges[index], edges[index+1]) # multidimensional edges return [(edges[coord][i], edges[coord][i+1]) for coord, i in enumerate(index)]
[docs]def get_bin_on_index(index, bins): """Return bin corresponding to multidimensional *index*. *index* can be a number or a list/tuple. If *index* length is less than dimension of *bins*, a subarray of *bins* is returned. In case of an index error, :exc:`.LenaIndexError` is raised. Example: >>> from lena.structures import histogram, get_bin_on_index >>> hist = histogram([0, 1], [0]) >>> get_bin_on_index(0, hist.bins) 0 >>> get_bin_on_index((0, 1), [[0, 1], [0, 0]]) 1 >>> get_bin_on_index(0, [[0, 1], [0, 0]]) [0, 1] """ if not isinstance(index, (list, tuple)): index = [index] subarr = bins for ind in index: try: subarr = subarr[ind] except IndexError: raise lena.core.LenaIndexError( "bad index: {}, bins = {}".format(index, bins) ) return subarr
[docs]def get_bin_on_value_1d(val, arr): """Return index for value in one-dimensional array. *arr* must contain strictly increasing values (not necessarily equidistant), it is not checked. "Linear binary search" is used, that is our array search by default assumes the array to be split on equidistant steps. Example: >>> from lena.structures import get_bin_on_value_1d >>> arr = [0, 1, 4, 5, 7, 10] >>> get_bin_on_value_1d(0, arr) 0 >>> get_bin_on_value_1d(4.5, arr) 2 >>> # upper range is excluded >>> get_bin_on_value_1d(10, arr) 5 >>> # underflow >>> get_bin_on_value_1d(-10, arr) -1 """ # may also use numpy.searchsorted # https://docs.scipy.org/doc/numpy-1.15.0/reference/generated/numpy.searchsorted.html ind_min = 0 ind_max = len(arr) - 1 while True: if ind_max - ind_min <= 1: # lower bound is close if val < arr[ind_min]: return ind_min - 1 # upper bound is open elif val >= arr[ind_max]: return ind_max else: return ind_min if val == arr[ind_min]: return ind_min if val < arr[ind_min]: return ind_min - 1 elif val >= arr[ind_max]: return ind_max else: shift = int( (ind_max - ind_min) * ( float(val - arr[ind_min]) / (arr[ind_max] - arr[ind_min]) )) ind_guess = ind_min + shift if ind_min == ind_guess: ind_min += 1 continue # ind_max is always more that ind_guess, # because val < arr[ind_max] (see the formula for shift). # This branch is not needed and can't be tested. # But for the sake of numerical inaccuracies, let us keep this # so that we never get into an infinite loop. elif ind_max == ind_guess: ind_max -= 1 continue if val < arr[ind_guess]: ind_max = ind_guess else: ind_min = ind_guess
[docs]def get_bin_on_value(arg, edges): """Get the bin index for *arg* in a multidimensional array *edges*. *arg* is a 1-dimensional array of numbers (or a number for 1-dimensional *edges*), and corresponds to a point in N-dimensional space. *edges* is an array of N-1 dimensional arrays (lists or tuples) of numbers. Each 1-dimensional subarray consists of increasing numbers. *arg* and *edges* must have the same length (otherwise :exc:`.LenaValueError` is raised). *arg* and *edges* must be iterable and support *len()*. Return list of indices in *edges* corresponding to *arg*. If any coordinate is out of its corresponding edge range, its index will be ``-1`` for underflow or ``len(edge)-1`` for overflow. Examples: >>> from lena.structures import get_bin_on_value >>> edges = [[1, 2, 3], [1, 3.5]] >>> get_bin_on_value((1.5, 2), edges) [0, 0] >>> get_bin_on_value((1.5, 0), edges) [0, -1] >>> # the upper edge is excluded >>> get_bin_on_value((3, 2), edges) [2, 0] >>> # one-dimensional edges >>> edges = [1, 2, 3] >>> get_bin_on_value(2, edges) [1] """ # arg is a one-dimensional index if not isinstance(arg, (tuple, list)): return [get_bin_on_value_1d(arg, edges)] # arg is a multidimensional index if len(arg) != len(edges): raise lena.core.LenaValueError( "argument should have same dimension as edges. " "arg = {}, edges = {}".format(arg, edges) ) indices = [] for ind, array in enumerate(edges): cur_bin = get_bin_on_value_1d(arg[ind], array) indices.append(cur_bin) return indices
[docs]def get_example_bin(struct): """Return bin with zero index on each axis of the histogram bins. For example, if the histogram is two-dimensional, return hist[0][0]. *struct* can be a :class:`.histogram` or an array of bins. """ if isinstance(struct, lena.structures.histogram): return lena.structures.get_bin_on_index([0] * struct.dim, struct.bins) else: bins = struct while isinstance(bins, list): bins = bins[0] return bins
[docs]def hist_to_graph(hist, make_value=None, get_coordinate="left", field_names=("x", "y"), scale=None): """Convert a :class:`.histogram` to a :class:`.graph`. *make_value* is a function to set the value of a graph's point. By default it is bin content. *make_value* accepts a single value (bin content) without context. This option could be used to create graph's error bars. For example, to create a graph with errors from a histogram where bins contain a named tuple with fields *mean*, *mean_error* and a context one could use >>> make_value = lambda bin_: (bin_.mean, bin_.mean_error) *get_coordinate* defines what the coordinate of a graph point created from a histogram bin will be. It can be "left" (default), "right" and "middle". *field_names* set field names of the graph. Their number must be the same as the dimension of the result. For a *make_value* above they would be *("x", "y_mean", "y_mean_error")*. *scale* becomes the graph's scale (unknown by default). If it is ``True``, it uses the histogram scale. *hist* must contain only numeric bins (without context) or *make_value* must remove context when creating a numeric graph. Return the resulting graph. """ ## Could have allowed get_coordinate to be callable # (for generality), but 1) first find a use case, # 2) histogram bins could be adjusted in the first place. # -- don't understand 2. if get_coordinate == "left": get_coord = lambda edges: tuple(coord[0] for coord in edges) elif get_coordinate == "right": get_coord = lambda edges: tuple(coord[1] for coord in edges) # *middle* between the two edges, not the *center* of the bin # as a whole (because the graph corresponds to a point) elif get_coordinate == "middle": get_coord = lambda edges: tuple(0.5*(coord[0] + coord[1]) for coord in edges) else: raise lena.core.LenaValueError( 'get_coordinate must be one of "left", "right" or "middle"; ' '"{}" provided'.format(get_coordinate) ) # todo: make_value may be bad design. # Maybe allow to change the graph in the sequence. # However, make_value allows not to recreate a graph # or its coordinates (if that is not needed). if isinstance(field_names, str): # copied from graph.__init__ field_names = tuple(re.findall(r'[^,\s]+', field_names)) elif not isinstance(field_names, tuple): raise lena.core.LenaTypeError( "field_names must be a string or a tuple" ) coords = [[] for _ in field_names] chain = itertools.chain if scale is True: scale = hist.scale() for value, edges in iter_bins_with_edges(hist.bins, hist.edges): coord = get_coord(edges) # Since we never use contexts here, it will be optimal # to ignore them completely (remove them elsewhere). # bin_value = lena.flow.get_data(value) bin_value = value if make_value is None: graph_value = bin_value else: graph_value = make_value(bin_value) # for iteration below if not hasattr(graph_value, "__iter__"): graph_value = (graph_value,) # add each coordinate to respective array for arr, coord_ in zip(coords, chain(coord, graph_value)): arr.append(coord_) return _graph(coords, field_names=field_names, scale=scale)
[docs]def init_bins(edges, value=0, deepcopy=False): """Initialize cells of the form *edges* with the given *value*. Return bins filled with copies of *value*. *Value* must be copyable, usual numbers will suit. If the value is mutable, use *deepcopy =* ``True`` (or the content of cells will be identical). Examples: >>> edges = [[0, 1], [0, 1]] >>> # one cell >>> init_bins(edges) [[0]] >>> # no need to use floats, >>> # because integers will automatically be cast to floats >>> # when used together >>> init_bins(edges, 0.0) [[0.0]] >>> init_bins([[0, 1, 2], [0, 1, 2]]) [[0, 0], [0, 0]] >>> init_bins([0, 1, 2]) [0, 0] """ nbins = len(edges) - 1 if not isinstance(edges[0], (list, tuple)): # edges is one-dimensional if deepcopy: return [copy.deepcopy(value) for _ in range(nbins)] else: return [value] * nbins for ind, arr in enumerate(edges): if ind == nbins: if deepcopy: return [copy.deepcopy(value) for _ in range(len(arr)-1)] else: return list([value] * (len(arr)-1)) bins = [] for _ in range(len(arr)-1): bins.append(init_bins(edges[ind+1:], value, deepcopy)) return bins
[docs]def integral(bins, edges): """Compute integral (scale for a histogram). *bins* contain values, and *edges* form the mesh for the integration. Their format is defined in :class:`.histogram` description. """ total = 0 for ind, bin_content in iter_bins(bins): bin_lengths = [ edges[coord][i+1] - edges[coord][i] for coord, i in enumerate(ind) ] # product vol = _reduce(operator.mul, bin_lengths, 1) cell_integral = vol * bin_content total += cell_integral return total
[docs]def iter_bins(bins): """Iterate on *bins*. Yield *(index, bin content)*. Edges with higher index are iterated first (that is z, then y, then x for a 3-dimensional histogram). """ # if not isinstance(bins, (list, tuple)): if not hasattr(bins, '__iter__'): # cell yield ((), bins) else: for ind, _ in enumerate(bins): for sub_ind, val in iter_bins(bins[ind]): yield (((ind,) + sub_ind), val)
[docs]def iter_bins_with_edges(bins, edges): """Generate *(bin content, bin edges)* pairs. Bin edges is a tuple, such that its item at index i is *(lower bound, upper bound)* of the bin at i-th coordinate. Examples: >>> from lena.math import mesh >>> list(iter_bins_with_edges([0, 1, 2], edges=mesh((0, 3), 3))) [(0, ((0, 1.0),)), (1, ((1.0, 2.0),)), (2, ((2.0, 3),))] >>> >>> # 2-dimensional histogram >>> list(iter_bins_with_edges( ... bins=[[2]], edges=mesh(((0, 1), (0, 1)), (1, 1)) ... )) [(2, ((0, 1), (0, 1)))] .. versionadded:: 0.5 made public. """ # todo: only a list or also a tuple, an array? if not isinstance(edges[0], list): edges = [edges] bins_sizes = [len(edge)-1 for edge in edges] indices = [list(range(nbins)) for nbins in bins_sizes] for index in itertools.product(*indices): bin_ = lena.structures.get_bin_on_index(index, bins) edges_low = [] edges_high = [] for var, var_ind in enumerate(index): edges_low.append(edges[var][var_ind]) edges_high.append(edges[var][var_ind+1]) yield (bin_, tuple(zip(edges_low, edges_high)))
[docs]def iter_cells(hist, ranges=None, coord_ranges=None): """Iterate cells of a histogram *hist*, possibly in a subrange. For each bin, yield a :class:`HistCell` containing *bin edges, bin content* and *bin index*. The order of iteration is the same as for :func:`iter_bins`. *ranges* are the ranges of bin indices to be used for each coordinate (the lower value is included, the upper value is excluded). *coord_ranges* set real coordinate ranges based on histogram edges. Obviously, they can be not exactly bin edges. If one of the ranges for the given coordinate is outside the histogram edges, then only existing histogram edges within the range are selected. If the coordinate range is completely outside histogram edges, nothing is yielded. If a lower or upper *coord_range* falls within a bin, this bin is yielded. Note that if a coordinate range falls on a bin edge, the number of generated bins can be unstable because of limited float precision. *ranges* and *coord_ranges* are tuples of tuples of limits in corresponding dimensions. For one-dimensional histogram it must be a tuple containing a tuple, for example *((None, None),)*. ``None`` as an upper or lower *range* means no limit (*((None, None),)* is equivalent to *((0, len(bins)),)* for a 1-dimensional histogram). If a *range* index is lower than 0 or higher than possible index, :exc:`.LenaValueError` is raised. If both *coord_ranges* and *ranges* are provided, :exc:`.LenaTypeError` is raised. """ # for bin_ind, bin_ in iter_bins(hist.bins): # yield HistCell(get_bin_edges(bin_ind, hist.edges), bin_, bin_ind) # if bins and edges are calculated each time, save the result now bins, edges = hist.bins, hist.edges # todo: hist.edges must be same # for 1- and multidimensional histograms. if hist.dim == 1: edges = (edges,) if coord_ranges is not None: if ranges is not None: raise lena.core.LenaTypeError( "only ranges or coord_ranges can be provided, not both" ) ranges = [] if not isinstance(coord_ranges[0], (tuple, list)): coord_ranges = (coord_ranges, ) for coord, coord_range in enumerate(coord_ranges): # todo: (dis?)allow None as an infinite range. # todo: raise or transpose unordered coordinates? # todo: change the order of function arguments. lower_bin_ind = get_bin_on_value_1d(coord_range[0], edges[coord]) if lower_bin_ind == -1: lower_bin_ind = 0 upper_bin_ind = get_bin_on_value_1d(coord_range[1], edges[coord]) max_ind = len(edges[coord]) if upper_bin_ind == max_ind: upper_bin_ind -= 1 if lower_bin_ind >= max_ind or upper_bin_ind <= 0: # histogram edges are outside the range. return ranges.append((lower_bin_ind, upper_bin_ind)) if not ranges: ranges = ((None, None),) * hist.dim real_ind_ranges = [] for coord, coord_range in enumerate(ranges): low, up = coord_range if low is None: low = 0 else: # negative indices should not be supported if low < 0: raise lena.core.LenaValueError( "low must be not less than 0 if provided" ) max_ind = len(edges[coord]) - 1 if up is None: up = max_ind else: # huge indices should not be supported as well. if up > max_ind: raise lena.core.LenaValueError( "up must not be greater than len(edges)-1, if provided" ) real_ind_ranges.append(list(range(low, up))) indices = list(itertools.product(*real_ind_ranges)) for ind in indices: yield HistCell(get_bin_edges(ind, edges), get_bin_on_index(ind, bins), ind)
[docs]def make_hist_context(hist, context): """Update a deep copy of *context* with the context of a :class:`.histogram` *hist*. .. deprecated:: 0.5 histogram context is updated automatically during conversion in :class:`~.output.ToCSV`. Use histogram._update_context explicitly if needed. """ # absolutely unnecessary. context = copy.deepcopy(context) hist_context = { "histogram": { "dim": hist.dim, "nbins": hist.nbins, "ranges": hist.ranges } } context.update(hist_context) # just bad. return context
[docs]def unify_1_md(bins, edges): """Unify 1- and multidimensional bins and edges. Return a tuple of *(bins, edges)*. Bins and multidimensional *edges* return unchanged, while one-dimensional *edges* are inserted into a list. """ if hasattr(edges[0], '__iter__'): # if isinstance(edges[0], (list, tuple)): return (bins, edges) else: return (bins, [edges])