|
- """Interval and IntervalSet
-
- Represents an interval of time, and a set of such intervals.
-
- Intervals are closed, ie. they include timestamps [start, end]
- """
-
- # First implementation kept a sorted list of intervals and used
- # bisect() to optimize some operations, but this was too slow.
-
- # Second version was based on the quicksect implementation from
- # python-bx, modified slightly to handle floating point intervals.
- # This didn't support deletion.
-
- # Third version is more similar to the first version, using a rb-tree
- # instead of a simple sorted list to maintain O(log n) operations.
-
- # Fourth version is an optimized rb-tree that stores interval starts
- # and ends directly in the tree, like bxinterval did.
-
- import rbtree
-
class IntervalError(Exception):
    """Raised for invalid interval operations (overlap, bad bounds, etc)."""
-
class Interval(object):
    """Represents an interval of time, stored as float 'start' and 'end'."""

    def __init__(self, start, end):
        """
        'start' and 'end' are arbitrary values convertible to float that
        represent time.  Both are stored as floats.

        Raises IntervalError if 'start' is greater than 'end'.
        """
        # Convert before comparing so that the ordering check is numeric.
        # Comparing the raw arguments would, for example, order the
        # strings "5" and "10" lexically and wrongly reject them.
        start_value = float(start)
        end_value = float(end)
        if start_value > end_value:
            # The message reports the arguments as the caller passed them.
            raise IntervalError("start %s must precede end %s" % (start, end))
        self.start = start_value
        self.end = end_value

    def __repr__(self):
        """Return a string such as 'Interval(1.0, 2.0)'."""
        s = repr(self.start) + ", " + repr(self.end)
        return self.__class__.__name__ + "(" + s + ")"

    def __str__(self):
        """Return a string such as '[1.0 -> 2.0]'."""
        return "[" + str(self.start) + " -> " + str(self.end) + "]"

    def __cmp__(self, other):
        """Compare two intervals.  If non-equal, order by start then end.

        Raises TypeError if 'other' is not an Interval.

        NOTE: __cmp__ is only consulted by Python 2; Python 3 ignores it.
        """
        if not isinstance(other, Interval):
            raise TypeError("bad type")
        if self.start == other.start:
            if self.end < other.end:
                return -1
            if self.end > other.end:
                return 1
            return 0
        if self.start < other.start:
            return -1
        return 1

    def intersects(self, other):
        """Return True if two Interval objects intersect.

        Intervals that merely touch (one's end equals the other's start)
        are not considered to intersect.
        """
        if (self.end <= other.start or self.start >= other.end):
            return False
        return True

    def subset(self, start, end):
        """Return a new Interval that is a subset of this one.

        Raises IntervalError if [start, end] extends outside this interval.
        """
        # A subclass that tracks additional data might override this.
        if start < self.start or end > self.end:
            raise IntervalError("not a subset")
        return Interval(start, end)
-
class DBInterval(Interval):
    """
    An Interval that additionally remembers the start/end times and
    positions of the database extent it belongs to.  Taking a subset
    copies these database fields unchanged, so they can be used later
    to help zero in on database positions.

    The 'start' and 'end' of the interval lie within the database
    start and end, e.g.:
        db_start = 100, db_startpos = 10000
        start = 123
        end = 150
        db_end = 200, db_endpos = 20000
    """

    def __init__(self, start, end,
                 db_start, db_end,
                 db_startpos, db_endpos):
        """
        'db_start' and 'db_end' are times that must span the interval
        given by 'start' and 'end' (equal bounds are accepted).
        'db_startpos' and 'db_endpos' are arbitrary database position
        indicators that correspond to those two times.

        Raises IntervalError if the database times do not span the
        interval times.
        """
        Interval.__init__(self, start, end)
        self.db_start, self.db_end = db_start, db_end
        self.db_startpos, self.db_endpos = db_startpos, db_endpos
        not_spanned = db_start > start or db_end < end
        if not_spanned:
            raise IntervalError("database times must span the interval times")

    def __repr__(self):
        """Return the class name followed by all six fields."""
        fields = (self.start, self.end,
                  self.db_start, self.db_end,
                  self.db_startpos, self.db_endpos)
        body = ", ".join([repr(value) for value in fields])
        return "%s(%s)" % (self.__class__.__name__, body)

    def subset(self, start, end):
        """
        Return a new DBInterval covering [start, end] that keeps this
        interval's database times and positions.

        Raises IntervalError if [start, end] extends outside this interval.
        """
        out_of_range = start < self.start or end > self.end
        if out_of_range:
            raise IntervalError("not a subset")
        return DBInterval(start, end,
                          self.db_start, self.db_end,
                          self.db_startpos, self.db_endpos)
-
class IntervalSet(object):
    """
    A non-intersecting set of intervals.

    The intervals are held in an rbtree.RBTree ('self.tree'); each tree
    node carries its interval in the node's 'obj' attribute.
    """

    def __init__(self, source=None):
        """
        'source' is an Interval or IntervalSet to add.
        """
        self.tree = rbtree.RBTree()
        if source is not None:
            self += source

    def __iter__(self):
        """Yield each interval stored in the tree, in tree order."""
        for node in self.tree:
            # Nodes without an object are skipped (presumably tree
            # sentinels -- TODO confirm against rbtree).
            if node.obj:
                yield node.obj

    def __len__(self):
        """Return the number of intervals; this walks the whole tree."""
        return sum(1 for x in self)

    def __repr__(self):
        """Return the class name followed by the repr of each interval."""
        descs = [repr(x) for x in self]
        return self.__class__.__name__ + "([" + ", ".join(descs) + "])"

    def __str__(self):
        """Return a bracketed, comma-separated list of the intervals."""
        descs = [str(x) for x in self]
        return "[" + ", ".join(descs) + "]"

    def __eq__(self, other):
        """Test equality of two IntervalSets.

        Treats adjacent Intervals as equivalent to one long interval,
        so this function really tests whether the IntervalSets cover
        the same spans of time.

        'other' may be any iterable of interval-like objects.  Returns
        NotImplemented if 'other' is not iterable, so that comparing
        against e.g. None evaluates to False instead of raising.
        """
        # This isn't particularly efficient, but it shouldn't get used in
        # the general case.
        try:
            other_iter = iter(other)
        except TypeError:
            return NotImplemented

        def is_adjacent(a, b):
            """Return True if two Intervals are adjacent (same end or start)"""
            return a.end == b.start or b.end == a.start

        this = list(self)
        that = list(other_iter)

        i = 0
        j = 0
        # 'outside' is True while we are between spans, i.e. both sides
        # must begin a new span at the same start time.
        outside = True

        # Running off the end of either list while the other still has
        # uncovered time raises IndexError, which means "not equal".
        try:
            while True:
                if outside:
                    # To match, we need to be finished both sets
                    if i >= len(this) and j >= len(that):
                        return True
                    # Or the starts need to match
                    if this[i].start != that[j].start:
                        return False
                    outside = False
                elif this[i].end == that[j].end:
                    # We can move on if the two interval ends match
                    i += 1
                    j += 1
                    outside = True
                elif this[i].end < that[j].end:
                    # Whichever ends first needs to be adjacent to the next
                    if not is_adjacent(this[i], this[i + 1]):
                        return False
                    i += 1
                else:
                    if not is_adjacent(that[j], that[j + 1]):
                        return False
                    j += 1
        except IndexError:
            return False

    def __ne__(self, other):
        """Inverse of __eq__; passes NotImplemented through untouched."""
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __iadd__(self, other):
        """Inplace add -- modifies self

        'other' is either a single Interval or an iterable of them.
        This throws an exception if the regions being added intersect."""
        if isinstance(other, Interval):
            if self.intersects(other):
                raise IntervalError("Tried to add overlapping interval "
                                    "to this set")
            self.tree.insert(rbtree.RBNode(other.start, other.end, other))
        else:
            for x in other:
                self.__iadd__(x)
        return self

    def __isub__(self, other):
        """Inplace subtract -- modifies self

        Removes an interval from the set.  Must exist exactly
        as provided -- cannot remove a subset of an existing interval.
        Raises IntervalError if no such interval is found in the tree."""
        i = self.tree.find(other.start, other.end)
        if i is None:
            raise IntervalError("interval " + str(other) + " not in tree")
        self.tree.delete(i)
        return self

    def __add__(self, other):
        """Add -- returns a new object"""
        new = IntervalSet(self)
        new += IntervalSet(other)
        return new

    def __and__(self, other):
        """
        Compute a new IntervalSet from the intersection of two others

        'other' may be an IntervalSet or a single Interval.

        Output intervals are built as subsets of the intervals in the
        first argument (self).
        """
        out = IntervalSet()

        if isinstance(other, IntervalSet):
            pieces = (i for x in other for i in self.intersection(x))
        else:
            pieces = self.intersection(other)

        # Insert directly into the tree, as the original code did, rather
        # than going through __iadd__ and its overlap check.
        for i in pieces:
            out.tree.insert(rbtree.RBNode(i.start, i.end, i))

        return out

    def intersection(self, interval):
        """
        Compute a sequence of intervals that correspond to the
        intersection between `self` and the provided interval.
        Returns a generator that yields each of these intervals
        in turn.

        Output intervals are built as subsets of the intervals in the
        first argument (self).

        Raises TypeError if 'interval' is not an Interval.
        """
        if not isinstance(interval, Interval):
            raise TypeError("bad type")
        for n in self.tree.intersect(interval.start, interval.end):
            i = n.obj
            if not i:
                continue
            if i.start >= interval.start and i.end <= interval.end:
                # Entirely inside the requested range: yield it as-is.
                yield i
            else:
                # Partially outside: trim to the overlapping part.
                yield i.subset(max(i.start, interval.start),
                               min(i.end, interval.end))

    def intersects(self, other):
        """Return True if this IntervalSet intersects another interval"""
        for n in self.tree.intersect(other.start, other.end):
            # Skip nodes that carry no interval, matching __iter__ and
            # intersection(), instead of failing on the attribute lookup.
            if n.obj and n.obj.intersects(other):
                return True
        return False
|