|
- """Interval. Like nilmdb.server.interval, but re-implemented here
- in plain Python so clients have easier access to it, and with a few
- helper functions.
-
- Intervals are half-open, ie. they include data points with timestamps
- [start, end)
- """
-
- import nilmdb.utils.time
- import nilmdb.utils.iterator
-
- class IntervalError(Exception):
- """Error due to interval overlap, etc"""
- pass
-
- # Interval
- class Interval:
- """Represents an interval of time."""
-
- def __init__(self, start, end):
- """
- 'start' and 'end' are arbitrary numbers that represent time
- """
- if start >= end:
- # Explicitly disallow zero-width intervals (since they're half-open)
- raise IntervalError("start %s must precede end %s" % (start, end))
- self.start = start
- self.end = end
-
- def __repr__(self):
- s = repr(self.start) + ", " + repr(self.end)
- return self.__class__.__name__ + "(" + s + ")"
-
- def __str__(self):
- return ("[" + nilmdb.utils.time.timestamp_to_string(self.start) +
- " -> " + nilmdb.utils.time.timestamp_to_string(self.end) + ")")
-
- def human_string(self):
- return ("[ " + nilmdb.utils.time.timestamp_to_human(self.start) +
- " -> " + nilmdb.utils.time.timestamp_to_human(self.end) + " ]")
-
- # Compare two intervals. If non-equal, order by start then end
- def __lt__(self, other):
- return (self.start, self.end) < (other.start, other.end)
- def __gt__(self, other):
- return (self.start, self.end) > (other.start, other.end)
- def __le__(self, other):
- return (self.start, self.end) <= (other.start, other.end)
- def __le__(self, other):
- return (self.start, self.end) >= (other.start, other.end)
- def __eq__(self, other):
- return (self.start, self.end) == (other.start, other.end)
- def __ne__(self, other):
- return (self.start, self.end) != (other.start, other.end)
-
- def intersects(self, other):
- """Return True if two Interval objects intersect"""
- if not isinstance(other, Interval):
- raise TypeError("need an Interval")
- if self.end <= other.start or self.start >= other.end:
- return False
- return True
-
- def subset(self, start, end):
- """Return a new Interval that is a subset of this one"""
- # A subclass that tracks additional data might override this.
- if start < self.start or end > self.end:
- raise IntervalError("not a subset")
- return Interval(start, end)
-
- def _interval_math_helper(a, b, op, subset = True):
- """Helper for set_difference, intersection functions,
- to compute interval subsets based on a math operator on ranges
- present in A and B. Subsets are computed from A, or new intervals
- are generated if subset = False."""
- # Iterate through all starts and ends in sorted order. Add a
- # tag to the iterator so that we can figure out which one they
- # were, after sorting.
- def decorate(it, key_start, key_end):
- for i in it:
- yield i.start, key_start, i
- yield i.end, key_end, i
- a_iter = decorate(iter(a), 0, 2)
- b_iter = decorate(iter(b), 1, 3)
-
- # Now iterate over the timestamps of each start and end.
- # At each point, evaluate which type of end it is, to determine
- # how to build up the output intervals.
- a_interval = None
- in_a = False
- in_b = False
- out_start = None
- for (ts, k, i) in nilmdb.utils.iterator.imerge(a_iter, b_iter):
- if k == 0:
- a_interval = i
- in_a = True
- elif k == 1:
- in_b = True
- elif k == 2:
- in_a = False
- elif k == 3:
- in_b = False
- include = op(in_a, in_b)
- if include and out_start is None:
- out_start = ts
- elif not include:
- if out_start is not None and out_start != ts:
- if subset:
- yield a_interval.subset(out_start, ts)
- else:
- yield Interval(out_start, ts)
- out_start = None
-
- def set_difference(a, b):
- """
- Compute the difference (a \\ b) between the intervals in 'a' and
- the intervals in 'b'; i.e., the ranges that are present in 'self'
- but not 'other'.
-
- 'a' and 'b' must both be iterables.
-
- Returns a generator that yields each interval in turn.
- Output intervals are built as subsets of the intervals in the
- first argument (a).
- """
- return _interval_math_helper(a, b, (lambda a, b: a and not b))
-
- def intersection(a, b):
- """
- Compute the intersection between the intervals in 'a' and the
- intervals in 'b'; i.e., the ranges that are present in both 'a'
- and 'b'.
-
- 'a' and 'b' must both be iterables.
-
- Returns a generator that yields each interval in turn.
- Output intervals are built as subsets of the intervals in the
- first argument (a).
- """
- return _interval_math_helper(a, b, (lambda a, b: a and b))
-
- def optimize(it):
- """
- Given an iterable 'it' with intervals, optimize them by joining
- together intervals that are adjacent in time, and return a generator
- that yields the new intervals.
- """
- saved_int = None
- for interval in it:
- if saved_int is not None:
- if saved_int.end == interval.start:
- interval.start = saved_int.start
- else:
- yield saved_int
- saved_int = interval
- if saved_int is not None:
- yield saved_int
|