Browse Source

Rename some nilmdb methods to be private

More work with nilmdb.stream_insert.  Checks for overlap between
parser output and database intervals, and actually inserts the
data into pytables.  Need to benchmark/figure out whether we can
use table.append rather than the row.append() nonsense -- it could
simplify things quite a bit.

Improve layout class and add tests to get more coverage.  Better
error handling on invalid inputs (reports the reason for the error)

Replace layout.fillrow with layout.filltable, but turns out that we
can probably just remove it anyway.

Add nilmdb.Timer for simple timing tests

Move some duplicated test suite helper functions into a new file,
test_helper.py

Add class to test_interval.py to match the others


git-svn-id: https://bucket.mit.edu/svn/nilm/nilmdb@10661 ddd99763-3ecb-0310-9145-efcb8ce7c51f
tags/bxinterval-last
Jim Paris 12 years ago
parent
commit
cc4e3bdb76
15 changed files with 409 additions and 247 deletions
  1. +1
    -0
      nilmdb/__init__.py
  2. +7
    -5
      nilmdb/interval.py
  3. +36
    -14
      nilmdb/layout.py
  4. +45
    -10
      nilmdb/nilmdb.py
  5. +1
    -1
      nilmdb/server.py
  6. +16
    -0
      nilmdb/timer.py
  7. +2
    -1
      setup.cfg
  8. +3
    -21
      tests/test_client.py
  9. +3
    -16
      tests/test_cmdline.py
  10. +26
    -0
      tests/test_helpers.py
  11. +167
    -171
      tests/test_interval.py
  12. +85
    -0
      tests/test_layout.py
  13. +13
    -8
      tests/test_nilmdb.py
  14. +2
    -0
      tests/test_printf.py
  15. +2
    -0
      tests/test_serializer.py

+ 1
- 0
nilmdb/__init__.py View File

@@ -3,6 +3,7 @@
from .nilmdb import NilmDB
from .server import Server
from .client import Client
from .timer import Timer

import layout
import serializer


+ 7
- 5
nilmdb/interval.py View File

@@ -176,6 +176,10 @@ class IntervalSet(object):
min(this.end, other.end))
return out

def intersects(self, other):
"""Return True if this IntervalSet intersects another"""
return len(self & other) > 0

def _add_intervals(self, iterable):
"""Add each Interval from an interable to this set"""
for element in iter(iterable):
@@ -183,9 +187,7 @@ class IntervalSet(object):

def _add_single_interval(self, interval):
"""Add one Interval to this set"""
# This loop could likely be optimized using a bisect.
for existing in self.data:
if existing.intersects(interval):
raise IntervalError("Tried to add overlapping interval "
"to this set")
if self.intersects(interval):
raise IntervalError("Tried to add overlapping interval "
"to this set")
bisect.insort(self.data, interval)

+ 36
- 14
nilmdb/layout.py View File

@@ -1,9 +1,18 @@
from __future__ import absolute_import
import nilmdb
from nilmdb.printf import *

import tables
import time
import sys
import inspect
import cStringIO

class ParserError(Exception):
def __init__(self, line, message):
self.message = sprintf("line %d: %s", line, message)
Exception.__init__(self, self.message)

class Layout(object):
"""Represents a NILM database layout"""
def description(self):
@@ -18,6 +27,9 @@ class Layout(object):
converted to the correct types"""
# Consider overriding this in subclasses for speed?
out=[]
if len(self.fields) != len(in_fields):
raise IndexError(sprintf("wanted %d fields, got %d",
len(self.fields), len(in_fields)))
for (n, (name, type)) in enumerate(self.fields):
if name == 'timestamp':
# special case: parse float, save as int
@@ -27,7 +39,7 @@ class Layout(object):
elif type == 'uint16':
out.append(max(0, min(65535, int(in_fields[n], 10))))
else:
raise TypeError("Can't parse type " + type)
raise TypeError("can't parse type " + repr(type))
return out

class PrepData(Layout):
@@ -67,12 +79,15 @@ for name, obj in inspect.getmembers(sys.modules[__name__]):
class Parser(object):
"""Object that parses and stores ASCII data for inclusion into the database"""
def __init__(self, layout):
if layout not in named:
raise TypeError("unknown layout")
if issubclass(layout.__class__, Layout):
self.layout = layout
else:
try:
self.layout = named[layout]
except KeyError:
raise TypeError("unknown layout")

self.layout = named[layout]
self.data = []
self.nrows = 0
self.min_timestamp = None
self.max_timestamp = None
# Assume timestamp is always the first field, for now
@@ -85,14 +100,14 @@ class Parser(object):
# This currently takes about 0.1 seconds for 1 megabyte of prep data,
# 85 klines/sec. Could clearly be optimized a lot...
indata = cStringIO.StringIO(textdata)
self.nrows = 0
n = 0
# Assume any parsing error is a real error.
# In the future we might want to skip completely empty lines,
# or partial lines right before EOF?
try:
last_ts = None
for line in indata:
self.nrows += 1
n += 1

# Parse and append
fields = line.partition('#')[0].split()
@@ -104,16 +119,23 @@ class Parser(object):
if last_ts is not None and out[self.ts_field] < last_ts:
raise ValueError("timestamp is not monotonically increasing")
last_ts = out[self.ts_field]
except (ValueError, TypeError, IndexError) as e:
raise TypeError("line " + str(self.nrows) + ": " + e.message)
except ValueError as e:
raise ParserError(n, "value error: " + e.message)
except IndexError as e:
raise ParserError(n, "index error: " + e.message)
except TypeError as e:
raise ParserError(n, "type error: " + e.message)

# Mark timestamp ranges
if len(self.data) and self.ts_field is not None:
self.min_timestamp = self.data[0][self.ts_field]
self.max_timestamp = self.data[-1][self.ts_field]

def fillrow(self, tablerow, rownum):
"""Fill a PyTables row object with the parsed data.
The row must match the parser's layout"""
for (n, (name, type)) in enumerate(self.layout.fields):
tablerow[name] = self.data[rownum][n]
def fill_table(self, table):
"""Fill a PyTables table object with the parsed data.
The table's rows must match the parser's layout"""
row = table.row
for data in self.data:
for (n, (name, type)) in enumerate(self.layout.fields):
row[name] = data[n]
row.append()

+ 45
- 10
nilmdb/nilmdb.py View File

@@ -12,6 +12,7 @@ Manages both the SQL database and the PyTables storage backend.
from __future__ import absolute_import
import nilmdb
from nilmdb.printf import *
from nilmdb.interval import Interval, IntervalSet, IntervalError

import sqlite3
import tables
@@ -34,7 +35,7 @@ import errno


# Don't touch old entries -- just add new ones.
sql_schema_updates = {
_sql_schema_updates = {
0: """
-- All streams
CREATE TABLE streams(
@@ -105,7 +106,7 @@ class NilmDB(object):
# database from multiple threads simultaneously. That requirement
# may be relaxed later.
self.con = sqlite3.connect(sqlfilename, check_same_thread = False)
self.sql_schema_update()
self._sql_schema_update()

# See big comment at top about the performance implications of this
if sync:
@@ -128,13 +129,13 @@ class NilmDB(object):
self.h5file.close()
del self.opened

def sql_schema_update(self):
def _sql_schema_update(self):
cur = self.con.cursor()
version = cur.execute("PRAGMA user_version").fetchone()[0]
oldversion = version

while version in sql_schema_updates:
cur.executescript(sql_schema_updates[version])
while version in _sql_schema_updates:
cur.executescript(_sql_schema_updates[version])
version = version + 1
if self.verbose: # pragma: no cover
printf("Schema updated to %d\n", version)
@@ -143,6 +144,22 @@ class NilmDB(object):
with self.con:
cur.execute("PRAGMA user_version = {v:d}".format(v=version))

def _get_intervals(self, stream_id):
"""
Return an IntervalSet corresponding to the given stream ID.
"""
# Could cache these, if it's a performance bottleneck
iset = IntervalSet()
result = self.con.execute("SELECT start_time, end_time " +
"FROM ranges " +
"WHERE stream_id=?", (stream_id,))
try:
for (start, end) in result:
iset += Interval(start, end)
except IntervalError as e: # pragma: no cover
raise NilmDBError("unexpected overlap in ranges table!")
return iset

def stream_list(self, path = None, layout = None):
"""Return list of [path, layout] lists of all streams
in the database.
@@ -220,7 +237,7 @@ class NilmDB(object):
con.execute("INSERT INTO streams (path, layout) VALUES (?,?)",
(path, layout_name))

def stream_id(self, path):
def _stream_id(self, path):
"""Return unique stream ID"""
result = self.con.execute("SELECT id FROM streams WHERE path=?",
(path,)).fetchone()
@@ -234,7 +251,7 @@ class NilmDB(object):
v_scaling = 123.45 }
This replaces all existing metadata.
"""
stream_id = self.stream_id(path)
stream_id = self._stream_id(path)
with self.con as con:
for key in data:
con.execute("DELETE FROM metadata "
@@ -245,7 +262,7 @@ class NilmDB(object):

def stream_get_metadata(self, path):
"""Return stream metadata as a dictionary."""
stream_id = self.stream_id(path)
stream_id = self._stream_id(path)
result = self.con.execute("SELECT metadata.key, metadata.value "
"FROM metadata "
"WHERE metadata.stream_id=?", (stream_id,))
@@ -266,7 +283,25 @@ class NilmDB(object):
parser: nilmdb.layout.Parser instance full of data to insert
"""
# First check for basic overlap using timestamp info from the parser.
stream_id = self._stream_id(path)
iset = self._get_intervals(stream_id)
interval = Interval(parser.min_timestamp, parser.max_timestamp)
if iset.intersects(interval):
raise OverlapError("New data overlaps existing data: "
+ str(iset & interval))

# Insert the data into pytables
table = self.h5file.getNode(path)
# with nilmdb.Timer("fill"):
# parser.fill_table(table)
# with nilmdb.Timer("flush"):
# table.flush()

# with nilmdb.Timer("append"):
# table.append()
# with nilmdb.Timer("flush"):
# table.flush()
# TODO: Need to figure out which rows were used there.
return "ok"
raise NilmDBError("go away")

+ 1
- 1
nilmdb/server.py View File

@@ -165,7 +165,7 @@ class Stream(NilmApp):
try:
parser = nilmdb.layout.Parser(layout)
parser.parse(body)
except TypeError as e:
except nilmdb.layout.ParserError as e:
raise cherrypy.HTTPError("400 Bad Request",
"Error parsing input data: " +
e.message)


+ 16
- 0
nilmdb/timer.py View File

@@ -0,0 +1,16 @@
# -*- coding: utf-8 -*-

# Simple timer to time a block of code, for optimization debugging
# use like:
# with nilmdb.Timer("flush"):
# foo.flush()

import contextlib
import time

@contextlib.contextmanager
def Timer(name = None):
start = time.time()
yield
elapsed = int((time.time() - start) * 1000)
print (name or 'elapsed') + ": " + str(elapsed) + " ms"

+ 2
- 1
setup.cfg View File

@@ -5,11 +5,12 @@ nologcapture= # comment to see cherrypy logs on failure
with-coverage=
cover-inclusive=
cover-package=nilmdb
cover-erase=
#cover-erase=
##cover-html= # this works, puts html output in cover/ dir
##cover-branches= # need nose 1.1.3 for this
stop=
verbosity=2
#tests=tests/test_layout.py
#tests=tests/test_interval.py
#tests=tests/test_client.py
#tests=tests/test_timestamper.py


+ 3
- 21
tests/test_client.py View File

@@ -10,35 +10,17 @@ import json
import itertools
import distutils.version
import os
import shutil
import sys
import threading

testdb = "tests/client-testdb"

def eq_(a, b):
if not a == b:
raise AssertionError("%r != %r" % (a, b))
from test_helpers import *

def in_(a, b):
if a not in b:
raise AssertionError("%r not in %r" % (a, b))

def ne_(a, b):
if not a != b:
raise AssertionError("unexpected %r == %r" % (a, b))
testdb = "tests/client-testdb"

def setup_module():
global test_server, test_db
# Clear out DB
try:
shutil.rmtree(testdb)
except:
pass
try:
os.unlink(testdb)
except:
pass
recursive_unlink(testdb)

# Start web app on a custom port
test_db = nilmdb.NilmDB(testdb, sync = False)


+ 3
- 16
tests/test_cmdline.py View File

@@ -16,28 +16,15 @@ import Queue
import StringIO
import shlex

testdb = "tests/cmdline-testdb"

def eq_(a, b):
if not a == b:
raise AssertionError("%r != %r" % (a, b))
from test_helpers import *

def ne_(a, b):
if not a != b:
raise AssertionError("unexpected %r == %r" % (a, b))
testdb = "tests/cmdline-testdb"

class TestCmdline(object):

def setUp(self):
# Clear out DB
try:
shutil.rmtree(testdb)
except:
pass
try:
os.unlink(testdb)
except:
pass
recursive_unlink(testdb)

# Start web app on a custom port
self.db = nilmdb.NilmDB(testdb, sync=False)


+ 26
- 0
tests/test_helpers.py View File

@@ -0,0 +1,26 @@
# Just some helpers for test functions

import shutil, os

def eq_(a, b):
if not a == b:
raise AssertionError("%r != %r" % (a, b))

def in_(a, b):
if a not in b:
raise AssertionError("%r not in %r" % (a, b))

def ne_(a, b):
if not a != b:
raise AssertionError("unexpected %r == %r" % (a, b))

def recursive_unlink(path):
try:
shutil.rmtree(path)
except OSError:
pass
try:
os.unlink(path)
except OSError:
pass


+ 167
- 171
tests/test_interval.py View File

@@ -8,133 +8,9 @@ import itertools

from nilmdb.interval import Interval, IntervalSet, IntervalError

def eq_(a, b):
if not a == b or not b == a:
raise AssertionError("%r != %r" % (a, b))

def ne_(a, b):
if not a != b or not b != a:
raise AssertionError("unexpected %r == %r" % (a, b))

def test_interval():
# Test Interval class
(d1, d2, d3) = [ datetime_tz.datetime_tz.smartparse(x).totimestamp()
for x in [ "03/24/2012", "03/25/2012", "03/26/2012" ] ]

# basic construction
i = Interval(d1, d1)
i = Interval(d1, d3)
assert(i.start == d1)
assert(i.end == d3)

# assignment is allowed, but not verified
i.start = d2
#with assert_raises(IntervalError):
# i.end = d1
i.start = d1
i.end = d2

# end before start
with assert_raises(IntervalError):
i = Interval(d3, d1)

# compare
assert(Interval(d1, d2) == Interval(d1, d2))
assert(Interval(d1, d2) < Interval(d1, d3))
assert(Interval(d1, d3) > Interval(d1, d2))
assert(Interval(d1, d2) < Interval(d2, d3))
assert(Interval(d1, d3) < Interval(d2, d3))
assert(Interval(d2, d2) > Interval(d1, d3))
assert(Interval(d3, d3) == Interval(d3, d3))
with assert_raises(AttributeError):
x = (i == 123)

# subset
assert(Interval(d1, d3).subset(d1, d2) == Interval(d1, d2))
with assert_raises(IntervalError):
x = Interval(d2, d3).subset(d1, d2)

# misc
i = Interval(d1, d2)
eq_(repr(i), repr(eval(repr(i))))
eq_(str(i), "[1332561600.0 -> 1332648000.0]")

def test_interval_intersect():
# Test Interval intersections
dates = [ 100, 200, 300, 400 ]
perm = list(itertools.permutations(dates, 2))
prod = list(itertools.product(perm, perm))
should_intersect = {
False: [4, 5, 8, 20, 48, 56, 60, 96, 97, 100],
True: [0, 1, 2, 12, 13, 14, 16, 17, 24, 25, 26, 28, 29,
32, 49, 50, 52, 53, 61, 62, 64, 65, 68, 98, 101, 104]
}
for i,((a,b),(c,d)) in enumerate(prod):
try:
i1 = Interval(a, b)
i2 = Interval(c, d)
assert(i1.intersects(i2) == i2.intersects(i1))
assert(i in should_intersect[i1.intersects(i2)])
except IntervalError:
assert(i not in should_intersect[True] and
i not in should_intersect[False])
with assert_raises(AttributeError):
x = i1.intersects(1234)

def test_intervalset_construct():
# Test IntervalSet construction
dates = [ 100, 200, 300, 400 ]

a = Interval(dates[0], dates[1])
b = Interval(dates[1], dates[2])
c = Interval(dates[0], dates[2])
d = Interval(dates[2], dates[3])

iseta = IntervalSet(a)
isetb = IntervalSet([a, b])
isetc = IntervalSet([a])
ne_(iseta, isetb)
eq_(iseta, isetc)
with assert_raises(TypeError):
x = iseta != 3
ne_(IntervalSet(a), IntervalSet(b))

# overlap
with assert_raises(IntervalError):
x = IntervalSet([a, b, c])

# bad types
with assert_raises(AttributeError):
x = IntervalSet([1, 2])

iset = IntervalSet(isetb) # test iterator
assert(iset == isetb)
assert(len(iset) == 2)
assert(len(IntervalSet()) == 0)

# Test adding
iset = IntervalSet(a)
iset += IntervalSet(b)
assert(iset == IntervalSet([a, b]))
iset = IntervalSet(a)
iset += b
assert(iset == IntervalSet([a, b]))
iset = IntervalSet(a) + IntervalSet(b)
assert(iset == IntervalSet([a, b]))
iset = IntervalSet(b) + a
assert(iset == IntervalSet([a, b]))

# A set consisting of [0-1],[1-2] should match a set consisting of [0-2]
assert(IntervalSet([a,b]) == IntervalSet([c]))
# Etc
assert(IntervalSet([a,d]) != IntervalSet([c]))
assert(IntervalSet([c]) != IntervalSet([a,d]))
assert(IntervalSet([c,d]) != IntervalSet([b,d]))
# misc
assert(repr(iset) == repr(eval(repr(iset))))

def iset(string):
from test_helpers import *

def makeset(string):
"""Build an IntervalSet from a string, for testing purposes

Each character is 1 second
@@ -156,47 +32,167 @@ def iset(string):
del start
return iset

def test_intervalset_iset():
# Test basic iset construction
assert(iset(" [----] ") ==
iset(" [-|--] "))

assert(iset("[] [--] ") +
iset(" [] [--]") ==
iset("[|] [-----]"))

assert(iset(" [-------]") ==
iset(" [-|-----|"))


def test_intervalset_intersect():
# Test intersection (&)
assert_raises(AttributeError, iset("[--]").__and__, 1234)
assert(iset("[---------]") &
iset(" [---] ") ==
iset(" [---] "))

assert(iset(" [---] ") &
iset("[---------]") ==
iset(" [---] "))

assert(iset(" [-----]") &
iset(" [-----] ") ==
iset(" [--] "))

assert(iset(" [---]") &
iset(" [--] ") ==
iset(" "))

assert(iset(" [-|---]") &
iset(" [-----|-] ") ==
iset(" [----] "))

assert(iset(" [-|-] ") &
iset(" [-|--|--] ") ==
iset(" [---] "))

assert(iset(" [----][--]") &
iset("[-] [--] []") ==
iset(" [] [-] []"))
class TestInterval:
def test_interval(self):
# Test Interval class
(d1, d2, d3) = [ datetime_tz.datetime_tz.smartparse(x).totimestamp()
for x in [ "03/24/2012", "03/25/2012", "03/26/2012" ] ]

# basic construction
i = Interval(d1, d1)
i = Interval(d1, d3)
assert(i.start == d1)
assert(i.end == d3)

# assignment is allowed, but not verified
i.start = d2
#with assert_raises(IntervalError):
# i.end = d1
i.start = d1
i.end = d2

# end before start
with assert_raises(IntervalError):
i = Interval(d3, d1)

# compare
assert(Interval(d1, d2) == Interval(d1, d2))
assert(Interval(d1, d2) < Interval(d1, d3))
assert(Interval(d1, d3) > Interval(d1, d2))
assert(Interval(d1, d2) < Interval(d2, d3))
assert(Interval(d1, d3) < Interval(d2, d3))
assert(Interval(d2, d2) > Interval(d1, d3))
assert(Interval(d3, d3) == Interval(d3, d3))
with assert_raises(AttributeError):
x = (i == 123)

# subset
assert(Interval(d1, d3).subset(d1, d2) == Interval(d1, d2))
with assert_raises(IntervalError):
x = Interval(d2, d3).subset(d1, d2)

# misc
i = Interval(d1, d2)
eq_(repr(i), repr(eval(repr(i))))
eq_(str(i), "[1332561600.0 -> 1332648000.0]")

def test_interval_intersect(self):
# Test Interval intersections
dates = [ 100, 200, 300, 400 ]
perm = list(itertools.permutations(dates, 2))
prod = list(itertools.product(perm, perm))
should_intersect = {
False: [4, 5, 8, 20, 48, 56, 60, 96, 97, 100],
True: [0, 1, 2, 12, 13, 14, 16, 17, 24, 25, 26, 28, 29,
32, 49, 50, 52, 53, 61, 62, 64, 65, 68, 98, 101, 104]
}
for i,((a,b),(c,d)) in enumerate(prod):
try:
i1 = Interval(a, b)
i2 = Interval(c, d)
assert(i1.intersects(i2) == i2.intersects(i1))
assert(i in should_intersect[i1.intersects(i2)])
except IntervalError:
assert(i not in should_intersect[True] and
i not in should_intersect[False])
with assert_raises(AttributeError):
x = i1.intersects(1234)

def test_intervalset_construct(self):
# Test IntervalSet construction
dates = [ 100, 200, 300, 400 ]

a = Interval(dates[0], dates[1])
b = Interval(dates[1], dates[2])
c = Interval(dates[0], dates[2])
d = Interval(dates[2], dates[3])

iseta = IntervalSet(a)
isetb = IntervalSet([a, b])
isetc = IntervalSet([a])
ne_(iseta, isetb)
eq_(iseta, isetc)
with assert_raises(TypeError):
x = iseta != 3
ne_(IntervalSet(a), IntervalSet(b))

# overlap
with assert_raises(IntervalError):
x = IntervalSet([a, b, c])

# bad types
with assert_raises(AttributeError):
x = IntervalSet([1, 2])

iset = IntervalSet(isetb) # test iterator
assert(iset == isetb)
assert(len(iset) == 2)
assert(len(IntervalSet()) == 0)

# Test adding
iset = IntervalSet(a)
iset += IntervalSet(b)
assert(iset == IntervalSet([a, b]))
iset = IntervalSet(a)
iset += b
assert(iset == IntervalSet([a, b]))
iset = IntervalSet(a) + IntervalSet(b)
assert(iset == IntervalSet([a, b]))
iset = IntervalSet(b) + a
assert(iset == IntervalSet([a, b]))

# A set consisting of [0-1],[1-2] should match a set consisting of [0-2]
assert(IntervalSet([a,b]) == IntervalSet([c]))
# Etc
assert(IntervalSet([a,d]) != IntervalSet([c]))
assert(IntervalSet([c]) != IntervalSet([a,d]))
assert(IntervalSet([c,d]) != IntervalSet([b,d]))

# misc
assert(repr(iset) == repr(eval(repr(iset))))

def test_intervalset_geniset(self):
# Test basic iset construction
assert(makeset(" [----] ") ==
makeset(" [-|--] "))

assert(makeset("[] [--] ") +
makeset(" [] [--]") ==
makeset("[|] [-----]"))

assert(makeset(" [-------]") ==
makeset(" [-|-----|"))


def test_intervalset_intersect(self):
# Test intersection (&)
with assert_raises(AttributeError):
x = makeset("[--]") & 1234

assert(makeset("[---------]") &
makeset(" [---] ") ==
makeset(" [---] "))

assert(makeset(" [---] ") &
makeset("[---------]") ==
makeset(" [---] "))

assert(makeset(" [-----]") &
makeset(" [-----] ") ==
makeset(" [--] "))

assert(makeset(" [---]") &
makeset(" [--] ") ==
makeset(" "))

assert(makeset(" [-|---]") &
makeset(" [-----|-] ") ==
makeset(" [----] "))

assert(makeset(" [-|-] ") &
makeset(" [-|--|--] ") ==
makeset(" [---] "))

assert(makeset(" [----][--]") &
makeset("[-] [--] []") ==
makeset(" [] [-] []"))

+ 85
- 0
tests/test_layout.py View File

@@ -0,0 +1,85 @@
import nilmdb

from nose.tools import *
from nose.tools import assert_raises
import distutils.version
import json
import itertools
import os
import shutil
import sys
import cherrypy
import threading
import urllib2
from urllib2 import urlopen, HTTPError
import Queue
import cStringIO

from test_helpers import *

from nilmdb.layout import *

class TestLayouts(object):
# Some nilmdb.layout tests. Not complete, just fills in missing
# coverage.
def test_layouts(self):
x = nilmdb.layout.named["PrepData"].description()

def test_parsing(self):
# invalid layout
with assert_raises(TypeError) as e:
parser = Parser("NoSuchLayout")

# too little data
parser = Parser("PrepData")
data = ( "1234567890.000000 1.1 2.2 3.3 4.4 5.5\n" +
"1234567890.100000 1.1 2.2 3.3 4.4 5.5\n")
with assert_raises(ParserError) as e:
parser.parse(data)
in_("index error", str(e.exception))

# too much data
parser = Parser("PrepData")
data = ( "1234567890.000000 1.1 2.2 3.3 4.4 5.5 6.6 7.7 8.8 9.9\n" +
"1234567890.100000 1.1 2.2 3.3 4.4 5.5 6.6 7.7 8.8 9.9\n")
with assert_raises(ParserError) as e:
parser.parse(data)
in_("index error", str(e.exception))

# just right
parser = Parser("PrepData")
data = ( "1234567890.000000 1.1 2.2 3.3 4.4 5.5 6.6 7.7 8.8\n" +
"1234567890.100000 1.1 2.2 3.3 4.4 5.5 6.6 7.7 8.8\n")
parser.parse(data)
# try RawData too
parser = Parser("RawData")
data = ( "1234567890.000000 1 2 3 4 5 6\n" +
"1234567890.100000 1 2 3 4 5 6\n" )
parser.parse(data)

# pass an instantiated class
parser = Parser(RawNotchedData())
data = ( "1234567890.000000 1 2 3 4 5 6 7 8 9\n" +
"1234567890.100000 1 2 3 4 5 6 7 8 9\n" )
parser.parse(data)

# some invalid type
class CrappyLayout(RawData):
pass
x = CrappyLayout()
x.fields = x.fields + [("fakename", "faketype")]
parser = Parser(x)
data = ( "1234567890.000000 1 2 3 4 5 6 fake\n" +
"1234567890.100000 1 2 3 4 5 6 fake\n" )
with assert_raises(ParserError) as e:
parser.parse(data)
in_("can't parse type", str(e.exception))

# non-monotonic
parser = Parser("RawData")
data = ( "1234567890.100000 1 2 3 4 5 6\n" +
"1234567890.000000 1 2 3 4 5 6\n" )
with assert_raises(ParserError) as e:
parser.parse(data)
in_("not monotonically increasing", str(e.exception))

+ 13
- 8
tests/test_nilmdb.py View File

@@ -13,6 +13,7 @@ import threading
import urllib2
from urllib2 import urlopen, HTTPError
import Queue
import cStringIO

testdb = "tests/testdb"

@@ -20,16 +21,11 @@ testdb = "tests/testdb"
#def cleanup():
# os.unlink(testdb)

from test_helpers import *

class Test00Nilmdb(object): # named 00 so it runs first
def test_NilmDB(self):
try:
shutil.rmtree(testdb)
except:
pass
try:
os.unlink(testdb)
except:
pass
recursive_unlink(testdb)

with assert_raises(IOError):
nilmdb.NilmDB("/nonexistant-db/foo")
@@ -39,6 +35,14 @@ class Test00Nilmdb(object): # named 00 so it runs first
db = nilmdb.NilmDB(testdb, sync=False)
db.close()

# test timer, just to get coverage
capture = cStringIO.StringIO()
old = sys.stdout
sys.stdout = capture
with nilmdb.Timer("test"):
nilmdb.timer.time.sleep(0.1)
sys.stdout = old

def test_stream(self):
db = nilmdb.NilmDB(testdb, sync=False)
eq_(db.stream_list(), [])
@@ -196,3 +200,4 @@ class TestServer(object):
with assert_raises(HTTPError) as e:
getjson("/stream/insert?path=/newton/prep")
eq_(e.exception.code, 400)


+ 2
- 0
tests/test_printf.py View File

@@ -6,6 +6,8 @@ from nose.tools import assert_raises
from cStringIO import StringIO
import sys

from test_helpers import *

class TestPrintf(object):
def test_printf(self):
old_stdout = sys.stdout


+ 2
- 0
tests/test_serializer.py View File

@@ -7,6 +7,8 @@ from nose.tools import assert_raises
import threading
import time

from test_helpers import *

#raise nose.exc.SkipTest("Skip these")

class Foo(object):


Loading…
Cancel
Save