Browse Source

Incomplete, need to finish interval/intervalset tests

git-svn-id: https://bucket.mit.edu/svn/nilm/nilmdb@10656 ddd99763-3ecb-0310-9145-efcb8ce7c51f
tags/bxinterval-last
Jim Paris 12 years ago
parent
commit
d940aabf66
6 changed files with 411 additions and 1 deletions
  1. +15
    -0
      design.md
  2. +1
    -0
      nilmdb/__init__.py
  3. +204
    -0
      nilmdb/interval.py
  4. +0
    -1
      nilmdb/nilmdb.py
  5. +1
    -0
      setup.cfg
  6. +190
    -0
      tests/test_interval.py

+ 15
- 0
design.md View File

@@ -82,3 +82,18 @@ Converting from ASCII to PyTables:
for n in xrange(parser.nrows):
parser.fill_row(rowinstance, n)
table.append()


Inserting streams, inside nilmdb
--------------------------------

- First check that the new stream doesn't overlap.
- Get minimum timestamp, maximum timestamp from data parser.
- (extend parser to verify monotonicity and track extents)
- Get all intervals for this stream in the database
- See if new interval overlaps any existing ones
- If so, bail
- Question: should we cache intervals inside NilmDB?
- Assume database is fast for now, and always rebuild from DB.
- Can add a caching layer later if we need to.
- `stream_get_ranges(path)` -> return IntervalSet?

+ 1
- 0
nilmdb/__init__.py View File

@@ -8,3 +8,4 @@ import layout
import serializer
import cmdline
import timestamper
import interval

+ 204
- 0
nilmdb/interval.py View File

@@ -0,0 +1,204 @@
"""Interval and IntervalSet

Represents an interval of time, and a sorted set of such intervals"""

import bisect

class IntervalError(Exception):
    """Raised when an interval operation is invalid (overlap, etc)."""

class Interval(object):
    """Represents an interval of time.

    Instances order by 'start' and then by 'end' (see __cmp__).
    """

    start = None
    end = None

    def __init__(self, start, end, tolerance=0.0):
        """
        'start' and 'end' are time, in floating point seconds since epoch.

        When determining whether two intervals are adjacent, a gap of
        up to 'tolerance' units is allowed between this interval and
        the subsequent one.

        Raises IntervalError if 'start' is greater than 'end'.  A
        zero-length interval (start == end) is accepted.
        """
        if start > end:
            raise IntervalError("start must precede end")
        self.start = start
        self.end = end
        self.tolerance = tolerance

    def __repr__(self):
        """Return a constructor-style string such as 'Interval(1, 2)'.

        The tolerance is only included when it is non-zero."""
        s = repr(self.start) + ", " + repr(self.end)
        if self.tolerance:
            s += ", " + repr(self.tolerance)
        # The class name lives on the class; instances have no __name__
        # attribute, so it has to be looked up through __class__.
        return self.__class__.__name__ + "(" + s + ")"

    def __str__(self):
        """Return a short human-readable form, '[start -> end]'."""
        return "[" + str(self.start) + " -> " + str(self.end) + "]"

    def __cmp__(self, other):
        """Compare two intervals.  If non-equal, order by start then end

        Returns -1, 0 or 1.  Note that __cmp__ is the Python 2 ordering
        hook; Python 3 does not consult it for <, ==, or sorting."""
        if self.start == other.start:
            if self.end < other.end:
                return -1
            if self.end > other.end:
                return 1
            return 0
        if self.start < other.start:
            return -1
        return 1

    def intersects(self, other):
        """Return True if two Interval objects intersect

        Intervals that merely touch (one ends exactly where the other
        starts) do not count as intersecting."""
        if (self.end <= other.start or
            self.start >= other.end):
            return False
        return True

    def is_adjacent(self, other):
        """Return True if two Intervals are adjacent (same end or
        start), within the tolerance gap.  Overlapping intervals are
        not considered adjacent."""
        if self.intersects(other):
            return False
        # Try both orderings.  The gap allowance always comes from the
        # earlier interval of the pair (a.tolerance).
        for (a, b) in [(self, other), (other, self)]:
            if a.end <= b.start and (a.end + a.tolerance) >= b.start:
                return True
        return False

    def subset(self, start, end):
        """Return a new Interval that is a subset of this one

        The new interval keeps this interval's tolerance.  Raises
        IntervalError if [start, end] is not contained in this interval."""
        # A subclass that tracks additional data might override this.
        if start < self.start or end > self.end:
            raise IntervalError("not a subset")
        return Interval(start, end, self.tolerance)

class IntervalSet(object):
    """
    A sorted, non-intersecting set of intervals.

    The intervals are held in the list 'self.data', kept in sorted
    order; adding an interval that intersects an existing member
    raises IntervalError.
    """

    def __init__(self, source=None):
        """
        'source' is an Interval or IntervalSet to add.

        Any other iterable of Intervals is accepted as well.  Raises
        IntervalError if the supplied intervals intersect each other.
        """
        self.data = []
        if source is not None:
            if isinstance(source, Interval):
                self._add_single_interval(source)
            else:
                self._add_intervals(source)

    def __iter__(self):
        """Iterate over the stored intervals in sorted order."""
        return iter(self.data)

    def __repr__(self):
        """Return a constructor-style string, e.g. 'IntervalSet([...])'."""
        # Instances have no __name__ attribute; the name is on the class.
        return self.__class__.__name__ + "(" + repr(self.data) + ")"

    def __eq__(self, other):
        """Test equality of two IntervalSets.

        Treats adjacent Intervals as equivalent to one long interval,
        so this function really tests whether the IntervalSets cover
        the same spans of time.

        Anything that cannot be measured and indexed like a sequence
        compares unequal rather than raising."""
        if not (hasattr(other, "__len__") and hasattr(other, "__getitem__")):
            return False
        i = 0
        j = 0
        # 'outside' is True while we sit between covered spans, and
        # False while walking through a span that both sets share.
        outside = True
        try:
            while True:
                if outside:
                    # To match, we need to be finished with both sets
                    if i >= len(self) and j >= len(other):
                        return True
                    # Or the starts need to match
                    if self[i].start != other[j].start:
                        return False
                    outside = False
                elif self[i].end == other[j].end:
                    # We can move on if the two interval ends match
                    i += 1
                    j += 1
                    outside = True
                elif self[i].end < other[j].end:
                    # Whichever ends first needs to be adjacent to the
                    # next one.  Interval.is_adjacent takes only the
                    # other interval; tolerance lives on the Interval.
                    if not self[i].is_adjacent(self[i + 1]):
                        return False
                    i += 1
                else:
                    if not other[j].is_adjacent(other[j + 1]):
                        return False
                    j += 1
        except IndexError:
            # One side ran out of intervals before the other.
            return False

    def __ne__(self, other):
        """Inverse of __eq__."""
        return not self.__eq__(other)

    def __len__(self):
        """Return the number of stored intervals."""
        return len(self.data)

    def __getitem__(self, key):
        """Return the stored interval(s) at index or slice 'key'."""
        return self.data[key]

    def __iadd__(self, other):
        """Inplace add -- modifies self

        'other' is an Interval or an iterable of Intervals.

        This throws an exception if the regions being added intersect."""
        if isinstance(other, Interval):
            self._add_single_interval(other)
        else:
            self._add_intervals(other)
        return self

    def __add__(self, other):
        """Add -- returns a new object

        This throws an exception if the regions being added intersect."""
        new = IntervalSet(self)
        new += IntervalSet(other)
        return new

    def __and__(self, other):
        """
        Compute a new IntervalSet from the intersection of two others

        'other' may be an IntervalSet or a single Interval.

        Output intervals are built as subsets of the intervals in the
        first argument (self).

        This does not take tolerances into account -- any gap between
        two adjacent intervals in an input will be reflected in the
        output.
        """
        out = IntervalSet()

        if isinstance(other, IntervalSet):
            # We were given a set -- intersect with each interval inside
            for interval in other.data:
                out += self & interval
        else:
            # Intersecting with just a single interval.
            # This loop could likely be optimized using a bisect.
            for this in self.data:
                # If there's any overlap, add the overlapping region
                if this.intersects(other):
                    out += this.subset(max(this.start, other.start),
                                       min(this.end, other.end))
        return out

    def _add_intervals(self, iterable):
        """Add each Interval from an iterable to this set"""
        for element in iter(iterable):
            self._add_single_interval(element)

    def _add_single_interval(self, interval):
        """Add one Interval to this set

        Raises IntervalError if it intersects an interval already
        present; otherwise it is inserted at its sorted position."""
        # This scan could likely be optimized using a bisect.
        if any(existing.intersects(interval) for existing in self.data):
            raise IntervalError("Tried to add overlapping interval "
                                "to this set")
        bisect.insort(self.data, interval)

+ 0
- 1
nilmdb/nilmdb.py View File

@@ -267,6 +267,5 @@ class NilmDB(object):
"""
# First check for basic overlap using timestamp info from the parser.

raise NilmDBError("go away")

+ 1
- 0
setup.cfg View File

@@ -10,6 +10,7 @@ cover-erase=
##cover-branches= # need nose 1.1.3 for this
stop=
verbosity=2
tests=tests/test_interval.py
#tests=tests/test_timestamper.py
#tests=tests/test_serializer.py
#tests=tests/test_client.py:TestClient.test_client_nilmdb


+ 190
- 0
tests/test_interval.py View File

@@ -0,0 +1,190 @@
import nilmdb
from nilmdb.printf import *
import datetime_tz

from nose.tools import *
from nose.tools import assert_raises
import itertools

def test_interval():
    """Test the Interval class

    Covers construction, attribute assignment, ordering, subset(),
    is_adjacent() and the repr/str forms.
    """
    # NOTE(review): apart from the first two constructor calls, this test
    # uses the bare names Interval and IntervalError.  The imports shown
    # above it only bring in the nilmdb package plus two star imports, so
    # confirm those names are really in scope.
    (d1, d2, d3) = [ datetime_tz.datetime_tz.smartparse(x).totimestamp()
                     for x in [ "03/24/2012", "03/25/2012", "03/26/2012" ] ]

    # basic construction
    i = nilmdb.interval.Interval(d1, d1)
    i = nilmdb.interval.Interval(d1, d3)
    assert(i.start == d1)
    assert(i.end == d3)

    # assignment should work
    i.start = d2
    # NOTE(review): this expects that moving 'end' before 'start' raises
    # IntervalError.  Interval as shown in this change stores start/end as
    # plain attributes and only validates in __init__ -- confirm intent.
    try:
        i.end = d1
        raise Exception("should have died there")
    except IntervalError:
        pass
    i.start = d1
    i.end = d2

    # end before start
    assert_raises(IntervalError, Interval, d3, d1)

    # wrong type
    # NOTE(review): Interval.__init__ as shown only rejects start > end, so
    # Interval(1, 2) would be accepted -- confirm the intended type check.
    assert_raises(IntervalError, Interval, 1, 2)

    # compare: ordering is by start first, then by end
    assert(Interval(d1, d2) == Interval(d1, d2))
    assert(Interval(d1, d2) < Interval(d1, d3))
    assert(Interval(d1, d3) > Interval(d1, d2))
    assert(Interval(d1, d2) < Interval(d2, d3))
    assert(Interval(d1, d3) < Interval(d2, d3))
    assert(Interval(d2, d2) > Interval(d1, d3))
    assert(Interval(d3, d3) == Interval(d3, d3))
    # cmp() is a Python 2 builtin.
    assert_raises(TypeError, cmp, i, 123)

    # subset
    assert(Interval(d1, d3).subset(d1, d2) == Interval(d1, d2))
    assert_raises(IntervalError, Interval(d2, d3).subset, d1, d2)

    # append: touching intervals are adjacent, overlapping ones are not
    assert(Interval(d1, d2).is_adjacent(Interval(d2,d3)))
    assert(Interval(d2, d3).is_adjacent(Interval(d1,d2)))
    assert(not Interval(d2, d3).is_adjacent(Interval(d1,d3)))
    assert_raises(TypeError, Interval(d1, d2).is_adjacent, 1)

    # misc
    # repr() is expected to round-trip through eval().
    assert(repr(i) == repr(eval(repr(i).replace("datetime.",""))))
    # NOTE(review): the expected dates (1980, 1990) do not correspond to
    # d1/d2 above (March 2012) -- this looks like a placeholder; confirm.
    assert(str(i) == "[1980-12-05 00:00:00 -> 1990-02-16 00:00:00]")

def test_interval_intersect():
    """Test Interval intersections

    Builds every ordered pair of four dates, pairs those pairs up, and
    checks intersects() against a table of expected results.
    """
    # NOTE(review): 'datetime', 'Interval' and 'IntervalError' are used
    # unqualified, but no import of them is visible above -- confirm.
    dates = [ datetime.strptime(year, "%y") for year in [ "00", "01", "02", "03" ] ]
    # All ordered pairs of distinct dates, then every pairing of two pairs.
    perm = list(itertools.permutations(dates, 2))
    prod = list(itertools.product(perm, perm))
    # Indices into 'prod', keyed by the expected result of intersects().
    should_intersect = {
        False: [4, 5, 8, 20, 48, 56, 60, 96, 97, 100],
        True: [0, 1, 2, 12, 13, 14, 16, 17, 24, 25, 26, 28, 29,
               32, 49, 50, 52, 53, 61, 62, 64, 65, 68, 98, 101, 104]}
    for i,((a,b),(c,d)) in enumerate(prod):
        try:
            i1 = Interval(a, b)
            i2 = Interval(c, d)
            # intersects() must be symmetric and match the table.
            assert(i1.intersects(i2) == i2.intersects(i1))
            assert(i in should_intersect[i1.intersects(i2)])
        except IntervalError:
            # A combination that cannot be constructed must not be listed
            # under either expected result.
            assert(i not in should_intersect[True] and
                   i not in should_intersect[False])
    # NOTE(review): indentation was lost in this view; this check is
    # assumed to sit after the loop, reusing the last i1 -- confirm.
    assert_raises(TypeError, i1.intersects, 1234)

def test_intervalset_construct():
    """Test interval set construction

    Also covers equality, len(), += and + on IntervalSet.
    """
    # NOTE(review): 'datetime', 'Interval', 'IntervalSet' and
    # 'IntervalError' are used unqualified, but no import of them is
    # visible above -- confirm.
    dates = [ datetime.strptime(year, "%y") for year in [ "00", "01", "02", "03" ]]

    # a and b touch, c spans both of them, d follows b.
    a = Interval(dates[0], dates[1])
    b = Interval(dates[1], dates[2])
    c = Interval(dates[0], dates[2])
    d = Interval(dates[2], dates[3])

    iseta = IntervalSet(a)
    isetb = IntervalSet([a, b])
    isetc = IntervalSet([a])
    assert(iseta != isetb)
    assert(iseta == isetc)
    # Comparing against a non-set is expected to be unequal, not an error.
    assert(iseta != 3)
    assert(IntervalSet(a) != IntervalSet(b))
    # cmp() is a Python 2 builtin.
    assert_raises(TypeError, cmp, iseta, isetb)
    # c overlaps a and b, so it cannot join them in one set.
    assert_raises(IntervalError, IntervalSet, [a, b, c])
    assert_raises(TypeError, IntervalSet, [1, 2])

    iset = IntervalSet(isetb) # test iterator
    assert(iset == isetb)
    assert(len(iset) == 2)
    assert(len(IntervalSet()) == 0)

    # Test adding
    iset = IntervalSet(a)
    iset += IntervalSet(b)
    assert(iset == IntervalSet([a, b]))
    iset = IntervalSet(a)
    iset += b
    assert(iset == IntervalSet([a, b]))
    iset = IntervalSet(a) + IntervalSet(b)
    assert(iset == IntervalSet([a, b]))
    iset = IntervalSet(b) + a
    assert(iset == IntervalSet([a, b]))

    # A set consisting of [0-1],[1-2] should match a set consisting of [0-2]
    assert(IntervalSet([a,b]) == IntervalSet([c]))
    # Etc
    assert(IntervalSet([a,d]) != IntervalSet([c]))
    assert(IntervalSet([c]) != IntervalSet([a,d]))
    assert(IntervalSet([c,d]) != IntervalSet([b,d]))
    # misc
    # repr() is expected to round-trip through eval().
    assert(repr(iset) == repr(eval(repr(iset).replace("datetime.",""))))

def iset(string):
    """Build an IntervalSet from a string, for testing purposes

    The character at position N stands for the year 2000+N:
      [ = interval start
      | = interval end + adjacent start
      ] = interval end
      anything else is ignored
    """
    result = IntervalSet()
    for offset, mark in enumerate(string):
        day = datetime.strptime("{0:04d}".format(offset+2000), "%Y")
        if mark == "[":
            start = day
            continue
        if mark not in "|]":
            continue
        # Both "|" and "]" close the interval that is currently open.
        result += Interval(start, day)
        if mark == "|":
            # ...and "|" immediately opens the next one on the same day.
            start = day
        else:
            # Forget the start so a stray close mark fails loudly.
            del start
    return result

def test_intervalset_iset():
    """Test basic iset construction"""
    # NOTE(review): iset() maps each character position to a year, so the
    # columns of the brackets matter.  The literals as shown here appear to
    # have had runs of spaces collapsed; restore the original alignment
    # from the repository before relying on these cases.

    # An interval split by "|" covers the same span as the unsplit one.
    assert(iset(" [----] ") ==
           iset(" [-|--] "))

    # "+" binds tighter than "==": the sum of the two sets is compared
    # with the third.
    assert(iset("[] [--] ") +
           iset(" [] [--]") ==
           iset("[|] [-----]"))

def test_intervalset_intsersect():
    """Test intersection (&)"""
    # NOTE(review): iset() maps each character position to a year, so the
    # columns of the brackets matter.  The literals as shown here appear to
    # have had runs of spaces collapsed (e.g. the case expecting an empty
    # result does not hold as written); restore the original alignment
    # from the repository before relying on these cases.
    #
    # In every case "&" binds tighter than "==": the intersection of the
    # first two sets is compared with the third.
    assert_raises(TypeError, iset("[--]").__and__, 1234)
    assert(iset("[---------]") &
           iset(" [---] ") ==
           iset(" [---] "))

    assert(iset(" [---] ") &
           iset("[---------]") ==
           iset(" [---] "))

    assert(iset(" [-----]") &
           iset(" [-----] ") ==
           iset(" [--] "))

    # Intended to show disjoint inputs giving an empty set.
    assert(iset(" [---]") &
           iset(" [--] ") ==
           iset(" "))

    assert(iset(" [-|---]") &
           iset(" [-----|-] ") ==
           iset(" [----] "))

    assert(iset(" [-|-] ") &
           iset(" [-|--|--] ") ==
           iset(" [---] "))

    assert(iset(" [----][--]") &
           iset("[-] [--] []") ==
           iset(" [] [-] []"))


Loading…
Cancel
Save