nilmdb/tests/test_layout.py
2012-08-09 17:53:01 +00:00

251 lines
8.9 KiB
Python

# -*- coding: utf-8 -*-
import nilmdb
from nilmdb.printf import *
from nose.tools import *
from nose.tools import assert_raises
import distutils.version
import itertools
import os
import shutil
import sys
import cherrypy
import threading
import urllib2
from urllib2 import urlopen, HTTPError
import Queue
import cStringIO
import random
import unittest
from test_helpers import *
from nilmdb.layout import *
class TestLayouts(object):
    """A handful of nilmdb.layout tests.

    This is not a complete suite; it only fills in coverage that other
    tests miss.  Each scenario is run with two sets of layout names
    ("PrepData"/"RawData"/"RawNotchedData" and
    "float32_8"/"uint16_6"/"uint16_9").
    """

    # NOTE: the test_* methods are documented with comments rather than
    # docstrings so that the names shown by the test runner stay unchanged.

    def test_layouts(self):
        # The "PrepData" and "float32_8" layouts must report descriptions
        # whose repr() is identical.
        prep_description = nilmdb.layout.get_named("PrepData").description()
        generic_description = nilmdb.layout.get_named("float32_8").description()
        prep_repr = repr(prep_description)
        generic_repr = repr(generic_description)
        eq_(prep_repr, generic_repr)

    def test_parsing(self):
        # Run the parser scenarios once per set of layout names.
        for layout_names in (("PrepData", "RawData", "RawNotchedData"),
                             ("float32_8", "uint16_6", "uint16_9")):
            self.real_t_parsing(*layout_names)

    def real_t_parsing(self, name_prep, name_raw, name_rawnotch):
        """Exercise Parser with valid and invalid input.

        name_prep is fed rows of 8 float columns, name_raw rows of 6
        integer columns and name_rawnotch rows of 9 integer columns
        (each preceded by a timestamp).
        """

        def parse_text(layout, text):
            # Build a fresh Parser, feed it the text, hand it back.
            fresh = Parser(layout)
            fresh.parse(text)
            return fresh

        def expect_parser_error(layout, text, fragment):
            # Parsing must raise ParserError mentioning `fragment`.
            fresh = Parser(layout)
            with assert_raises(ParserError) as caught:
                fresh.parse(text)
            in_(fragment, str(caught.exception))

        # Layout names that Parser must reject with TypeError
        for bogus_name in ("NoSuchLayout", "float32"):
            with assert_raises(TypeError):
                Parser(bogus_name)

        # Fewer columns than the layout needs
        expect_parser_error(
            name_prep,
            "1234567890.000000 1.1 2.2 3.3 4.4 5.5\n"
            "1234567890.100000 1.1 2.2 3.3 4.4 5.5\n",
            "error")

        # More columns than the layout needs
        expect_parser_error(
            name_prep,
            "1234567890.000000 1.1 2.2 3.3 4.4 5.5 6.6 7.7 8.8 9.9\n"
            "1234567890.100000 1.1 2.2 3.3 4.4 5.5 6.6 7.7 8.8 9.9\n",
            "error")

        # Exactly the right number of columns
        good = parse_text(
            name_prep,
            "1234567890.000000 1.1 2.2 3.3 4.4 5.5 6.6 7.7 8.8\n"
            "1234567890.100000 1.1 2.2 3.3 4.4 5.5 6.6 7.7 8.8\n")
        eq_(good.min_timestamp, 1234567890.0)
        eq_(good.max_timestamp, 1234567890.1)
        eq_(good.data,
            [[1234567890.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8],
             [1234567890.1, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8]])

        # Same again for the integer-valued name_raw layout
        raw = parse_text(
            name_raw,
            "1234567890.000000 1 2 3 4 5 6\n"
            "1234567890.100000 1 2 3 4 5 6\n")
        eq_(raw.data,
            [[1234567890.0, 1, 2, 3, 4, 5, 6],
             [1234567890.1, 1, 2, 3, 4, 5, 6]])

        # Parser also accepts a layout object instead of a name
        parse_text(
            get_named(name_rawnotch),
            "1234567890.000000 1 2 3 4 5 6 7 8 9\n"
            "1234567890.100000 1 2 3 4 5 6 7 8 9\n")

        # Timestamps that go backwards
        expect_parser_error(
            name_raw,
            "1234567890.100000 1 2 3 4 5 6\n"
            "1234567890.000000 1 2 3 4 5 6\n",
            "not monotonically increasing")

        # A value (500000) that the name_raw layout must refuse
        expect_parser_error(
            name_raw,
            "1234567890.000000 1 2 3 4 500000 6\n"
            "1234567890.100000 1 2 3 4 5 6\n",
            "value out of range")

        # Empty input is accepted and leaves both timestamps unset
        empty = parse_text(name_raw, "")
        assert empty.min_timestamp is None
        assert empty.max_timestamp is None

    def test_formatting(self):
        # Run the formatter scenarios once per set of layout names.
        for layout_names in (("PrepData", "RawData", "RawNotchedData"),
                             ("float32_8", "uint16_6", "uint16_9")):
            self.real_t_formatting(*layout_names)

    def real_t_formatting(self, name_prep, name_raw, name_rawnotch):
        """Exercise Formatter with valid and invalid rows.

        The layout arguments play the same roles as in real_t_parsing.
        """

        def render(layout, table):
            # Format `table` with a fresh Formatter and return the text.
            return Formatter(layout).format(table)

        def expect_formatter_error(layout, table):
            # Formatting must raise FormatterError mentioning "error".
            fresh = Formatter(layout)
            with assert_raises(FormatterError) as caught:
                fresh.format(table)
            in_("error", str(caught.exception))

        # A layout name that Formatter must reject with TypeError
        with assert_raises(TypeError):
            Formatter("NoSuchLayout")

        # Fewer columns than the layout needs
        expect_formatter_error(
            name_prep,
            [[1234567890.0, 1.1, 2.2, 3.3, 4.4, 5.5],
             [1234567890.1, 1.1, 2.2, 3.3, 4.4, 5.5]])

        # More columns than the layout needs
        expect_formatter_error(
            name_prep,
            [[1234567890.0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
             [1234567890.1, 1, 2, 3, 4, 5, 6, 7, 8, 9]])

        # Exactly the right number of columns
        rendered = render(
            name_prep,
            [[1234567890.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8],
             [1234567890.1, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8]])
        eq_(rendered,
            "1234567890.000000 1.100000 2.200000 3.300000 4.400000 "
            "5.500000 6.600000 7.700000 8.800000\n"
            "1234567890.100000 1.100000 2.200000 3.300000 4.400000 "
            "5.500000 6.600000 7.700000 8.800000\n")

        # Same again for the integer-valued name_raw layout
        rendered = render(
            name_raw,
            [[1234567890.0, 1, 2, 3, 4, 5, 6],
             [1234567890.1, 1, 2, 3, 4, 5, 6]])
        eq_(rendered,
            "1234567890.000000 1 2 3 4 5 6\n"
            "1234567890.100000 1 2 3 4 5 6\n")

        # Formatter also accepts a layout object instead of a name
        rendered = render(
            get_named(name_rawnotch),
            [[1234567890.0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
             [1234567890.1, 1, 2, 3, 4, 5, 6, 7, 8, 9]])
        eq_(rendered,
            "1234567890.000000 1 2 3 4 5 6 7 8 9\n"
            "1234567890.100000 1 2 3 4 5 6 7 8 9\n")

        # No rows in, empty string out
        rendered = render(name_raw, [])
        eq_(rendered, "")

    def test_roundtrip(self):
        # Run the round-trip scenario once per set of layout names.
        for layout_names in (("PrepData", "RawData", "RawNotchedData"),
                             ("float32_8", "uint16_6", "uint16_9")):
            self.real_t_roundtrip(*layout_names)

    def real_t_roundtrip(self, name_prep, name_raw, name_rawnotch):
        """Check that parse -> format -> parse reproduces the parsed data.

        Random text is parsed, the parsed rows are formatted back to
        text, that text is parsed again, and both parsed results must
        compare equal.  The generator is seeded with a fixed value so
        every run sees the same inputs.
        """
        random.seed(12345)

        def random_text(make_columns):
            # 1..100 rows with strictly non-decreasing timestamps.
            stamp = 1234567890
            lines = []
            for _ in range(random.randint(1, 100)):
                stamp += random.uniform(0, 1)
                lines.append(sprintf("%f", stamp) + " "
                             + " ".join(make_columns()) + "\n")
            return "".join(lines)

        def check_roundtrip(layout, make_columns):
            for _ in range(100):
                text = random_text(make_columns)
                first = Parser(layout)
                formatter = Formatter(layout)
                second = Parser(layout)
                first.parse(text)
                second.parse(formatter.format(first.data))
                eq_(first.data, second.data)

        def float_columns(count):
            # Generator of `count` float fields in [-1000, 1000].
            def make_columns():
                return [sprintf("%f", random.uniform(-1000, 1000))
                        for _ in range(count)]
            return make_columns

        def uint16_columns(count):
            # Generator of `count` integer fields in [0, 65535].
            def make_columns():
                return [sprintf("%d", random.randint(0, 65535))
                        for _ in range(count)]
            return make_columns

        check_roundtrip(name_prep, float_columns(8))
        check_roundtrip(name_raw, uint16_columns(6))
        check_roundtrip(name_rawnotch, uint16_columns(9))
class TestLayoutSpeed(object):
    """Optional Parser/Formatter throughput benchmark.

    The only test here is decorated with unittest.skip, so it does not
    run unless that decorator is removed.
    """

    # Documented with a comment rather than a docstring so that the name
    # shown by the test runner stays unchanged.
    @unittest.skip("this is slow")
    def test_layout_speed(self):
        # Time repeated parse/format round trips for the "float32_10" and
        # "uint16_10" layouts and print the measured throughput.
        import time

        # Fixed seed: every run benchmarks the same input text.
        random.seed(54321)

        def do_speedtest(layout, datagen, rows = 5000, times = 100):
            """Benchmark `times` round trips over `rows` generated rows.

            layout  -- layout name handed to Parser and Formatter
            datagen -- callable returning the list of column strings
                       for one row (timestamp excluded)
            """
            # Build data once.  Rows are collected in a list and joined
            # at the end instead of growing one string with +=.
            lines = []
            ts = 1234567890
            for _ in range(rows):
                ts += random.uniform(0, 1)
                lines.append(sprintf("%f", ts) + " "
                             + " ".join(datagen()) + "\n")
            data = "".join(lines)

            # Do lots of roundtrips; each pass re-parses the text that
            # the previous pass formatted.
            start = time.time()
            for _ in range(times):
                parser = Parser(layout)
                formatter = Formatter(layout)
                parser.parse(data)
                data = formatter.format(parser.data)
            elapsed = time.time() - start
            printf("roundtrip %s: %d ms, %.1f μs/row, %d rows/sec\n",
                   layout,
                   elapsed * 1e3,
                   (elapsed * 1e6) / (rows * times),
                   (rows * times) / elapsed)

        # Blank line before the results; print("") behaves the same as
        # the Python 2 statement `print ""` and is also valid Python 3.
        print("")

        def float_datagen():
            return [ sprintf("%f", random.uniform(-1000,1000))
                     for _ in range(10) ]
        do_speedtest("float32_10", float_datagen)

        def uint16_datagen():
            return [ sprintf("%d", random.randint(0,65535))
                     for _ in range(10) ]
        do_speedtest("uint16_10", uint16_datagen)