tolerance in equality comparisions, but it's a real pain to get all the edge cases right, and it's not clear we actually need that functionality at the moment. Skip it for now. git-svn-id: https://bucket.mit.edu/svn/nilm/nilmdb@10659 ddd99763-3ecb-0310-9145-efcb8ce7c51ftags/bxinterval-last
@@ -14,31 +14,26 @@ class Interval(object): | |||
start = None | |||
end = None | |||
def __init__(self, start, end, tolerance = 0.0): | |||
def __init__(self, start, end): | |||
""" | |||
'start' and 'end' are time, in floating point seconds since epoch. | |||
When determining whether two intervals are adjacent, a gap of | |||
up to 'tolerance' units is allowed between this interval and | |||
the subsequent one. | |||
""" | |||
if start > end: | |||
raise IntervalError("start must precede end") | |||
self.start = start | |||
self.end = end | |||
self.tolerance = tolerance | |||
def __repr__(self): | |||
s = repr(self.start) + ", " + repr(self.end) | |||
if self.tolerance: | |||
s += ", " + repr(self.tolerance) | |||
return self.__class__.__name__ + "(" + s + ")" | |||
def __str__(self): | |||
return "[" + str(self.start) + " -> " + str(self.end) + "]" | |||
def __cmp__(self, other): | |||
"""Compare two intervals. If non-equal, order by start then end""" | |||
""" | |||
Compare two intervals. If non-equal, order by start then end. | |||
""" | |||
if self.start == other.start: | |||
if self.end < other.end: | |||
return -1 | |||
@@ -56,23 +51,12 @@ class Interval(object): | |||
return False | |||
return True | |||
def is_adjacent(self, other): | |||
"""Return True if two Intervals are adjacent (same end or | |||
start), within the tolerance gap. Overlapping intervals are | |||
not considered adjacent.""" | |||
if self.intersects(other): | |||
return False | |||
for (a, b) in [(self, other), (other, self)]: | |||
if a.end <= b.start and (a.end + a.tolerance) >= b.start: | |||
return True | |||
return False | |||
def subset(self, start, end): | |||
"""Return a new Interval that is a subset of this one""" | |||
# A subclass that tracks additional data might override this. | |||
if start < self.start or end > self.end: | |||
raise IntervalError("not a subset") | |||
return Interval(start, end, self.tolerance) | |||
return Interval(start, end) | |||
class IntervalSet(object): | |||
""" | |||
@@ -105,10 +89,18 @@ class IntervalSet(object): | |||
i = 0 | |||
j = 0 | |||
outside = True | |||
def is_adjacent(a, b): | |||
"""Return True if two Intervals are adjacent (same end or start)""" | |||
if a.end == b.start or b.end == a.start: | |||
return True | |||
else: | |||
return False | |||
try: | |||
while True: | |||
if (outside): | |||
# To match, we need to be finished this set | |||
# To match, we need to be finished both sets | |||
if (i >= len(self) and j >= len(other)): | |||
return True | |||
# Or the starts need to match | |||
@@ -124,11 +116,11 @@ class IntervalSet(object): | |||
else: | |||
# Whichever ends first needs to be adjacent to the next | |||
if (self[i].end < other[j].end): | |||
if (not self[i].is_adjacent(self[i+1], self.tolerance)): | |||
if (not is_adjacent(self[i],self[i+1])): | |||
return False | |||
i += 1 | |||
else: | |||
if (not other[j].is_adjacent(other[j+1], self.tolerance)): | |||
if (not is_adjacent(other[j],other[j+1])): | |||
return False | |||
j += 1 | |||
except IndexError: | |||
@@ -167,10 +159,6 @@ class IntervalSet(object): | |||
Output intervals are built as subsets of the intervals in the | |||
first argument (self). | |||
This does not take tolerances into account -- any gap between | |||
two adjacent intervals in an input will be reflected in the | |||
output. | |||
""" | |||
out = IntervalSet() | |||
@@ -180,7 +168,6 @@ class IntervalSet(object): | |||
out += self & interval | |||
else: | |||
# Intersecting with a just a single interval. | |||
out = IntervalSet() | |||
# This loop could likely be optimized using a bisect. | |||
for this in self.data: | |||
# If there's any overlap, add the overlapping region | |||
@@ -188,7 +175,7 @@ class IntervalSet(object): | |||
out += this.subset(max(this.start, other.start), | |||
min(this.end, other.end)) | |||
return out | |||
def _add_intervals(self, iterable): | |||
"""Add each Interval from an interable to this set""" | |||
for element in iter(iterable): | |||
@@ -267,5 +267,6 @@ class NilmDB(object): | |||
""" | |||
# First check for basic overlap using timestamp info from the parser. | |||
raise NilmDBError("go away") | |||
@@ -10,7 +10,8 @@ cover-erase= | |||
##cover-branches= # need nose 1.1.3 for this | |||
stop= | |||
verbosity=2 | |||
tests=tests/test_interval.py | |||
#tests=tests/test_interval.py | |||
#tests=tests/test_client.py | |||
#tests=tests/test_timestamper.py | |||
#tests=tests/test_serializer.py | |||
#tests=tests/test_client.py:TestClient.test_client_nilmdb | |||
@@ -6,10 +6,18 @@ from nose.tools import * | |||
from nose.tools import assert_raises | |||
import itertools | |||
from nilmdb.interval import Interval, IntervalError | |||
from nilmdb.interval import Interval, IntervalSet, IntervalError | |||
def eq_(a, b): | |||
if not a == b or not b == a: | |||
raise AssertionError("%r != %r" % (a, b)) | |||
def ne_(a, b): | |||
if not a != b or not b != a: | |||
raise AssertionError("unexpected %r == %r" % (a, b)) | |||
def test_interval(): | |||
"""Test Interval class""" | |||
# Test Interval class | |||
(d1, d2, d3) = [ datetime_tz.datetime_tz.smartparse(x).totimestamp() | |||
for x in [ "03/24/2012", "03/25/2012", "03/26/2012" ] ] | |||
@@ -46,22 +54,13 @@ def test_interval(): | |||
with assert_raises(IntervalError): | |||
x = Interval(d2, d3).subset(d1, d2) | |||
# append | |||
assert(Interval(d1, d2).is_adjacent(Interval(d2, d3))) | |||
assert(Interval(d2, d3).is_adjacent(Interval(d1, d2))) | |||
assert(not Interval(d2, d3).is_adjacent(Interval(d1, d3))) | |||
with assert_raises(AttributeError): | |||
x = Interval(d1, d2).is_adjacent(1) | |||
# misc | |||
i = Interval(d1, d2) | |||
j = Interval(d1, d2, 1.23) | |||
eq_(repr(i), repr(eval(repr(i)))) | |||
eq_(repr(j), repr(eval(repr(j)))) | |||
eq_(str(i), "[1332561600.0 -> 1332648000.0]") | |||
def test_interval_intersect(): | |||
"""Test Interval intersections""" | |||
# Test Interval intersections | |||
dates = [ 100, 200, 300, 400 ] | |||
perm = list(itertools.permutations(dates, 2)) | |||
prod = list(itertools.product(perm, perm)) | |||
@@ -83,8 +82,8 @@ def test_interval_intersect(): | |||
x = i1.intersects(1234) | |||
def test_intervalset_construct(): | |||
"""Test IntervalSet construction""" | |||
dates = [ datetime.strptime(year, "%y") for year in [ "00", "01", "02", "03" ]] | |||
# Test IntervalSet construction | |||
dates = [ 100, 200, 300, 400 ] | |||
a = Interval(dates[0], dates[1]) | |||
b = Interval(dates[1], dates[2]) | |||
@@ -94,14 +93,19 @@ def test_intervalset_construct(): | |||
iseta = IntervalSet(a) | |||
isetb = IntervalSet([a, b]) | |||
isetc = IntervalSet([a]) | |||
assert(iseta != isetb) | |||
assert(iseta == isetc) | |||
assert(iseta != 3) | |||
assert(IntervalSet(a) != IntervalSet(b)) | |||
assert_raises(TypeError, cmp, iseta, isetb) | |||
assert_raises(IntervalError, IntervalSet, [a, b, c]) | |||
assert_raises(TypeError, IntervalSet, [1, 2]) | |||
ne_(iseta, isetb) | |||
eq_(iseta, isetc) | |||
with assert_raises(TypeError): | |||
x = iseta != 3 | |||
ne_(IntervalSet(a), IntervalSet(b)) | |||
# overlap | |||
with assert_raises(IntervalError): | |||
x = IntervalSet([a, b, c]) | |||
# bad types | |||
with assert_raises(AttributeError): | |||
x = IntervalSet([1, 2]) | |||
iset = IntervalSet(isetb) # test iterator | |||
assert(iset == isetb) | |||
@@ -128,12 +132,12 @@ def test_intervalset_construct(): | |||
assert(IntervalSet([c,d]) != IntervalSet([b,d])) | |||
# misc | |||
assert(repr(iset) == repr(eval(repr(iset).replace("datetime.","")))) | |||
assert(repr(iset) == repr(eval(repr(iset)))) | |||
def iset(string): | |||
"""Build an IntervalSet from a string, for testing purposes | |||
Each character is a year | |||
Each character is 1 second | |||
[ = interval start | |||
| = interval end + adjacent start | |||
] = interval end | |||
@@ -141,7 +145,7 @@ def iset(string): | |||
""" | |||
iset = IntervalSet() | |||
for i, c in enumerate(string): | |||
day = datetime.strptime("{0:04d}".format(i+2000), "%Y") | |||
day = i + 10000 | |||
if (c == "["): | |||
start = day | |||
elif (c == "|"): | |||
@@ -153,7 +157,7 @@ def iset(string): | |||
return iset | |||
def test_intervalset_iset(): | |||
"""Test basic iset construction""" | |||
# Test basic iset construction | |||
assert(iset(" [----] ") == | |||
iset(" [-|--] ")) | |||
@@ -161,9 +165,13 @@ def test_intervalset_iset(): | |||
iset(" [] [--]") == | |||
iset("[|] [-----]")) | |||
def test_intervalset_intsersect(): | |||
"""Test intersection (&)""" | |||
assert_raises(TypeError, iset("[--]").__and__, 1234) | |||
assert(iset(" [-------]") == | |||
iset(" [-|-----|")) | |||
def test_intervalset_intersect(): | |||
# Test intersection (&) | |||
assert_raises(AttributeError, iset("[--]").__and__, 1234) | |||
assert(iset("[---------]") & | |||
iset(" [---] ") == | |||
@@ -192,4 +200,3 @@ def test_intervalset_intsersect(): | |||
assert(iset(" [----][--]") & | |||
iset("[-] [--] []") == | |||
iset(" [] [-] []")) | |||