# -*- coding: utf-8 -*-

import nilmdb
from nilmdb.utils.printf import *

from nose.tools import *
from nose.tools import assert_raises
import distutils.version
import itertools
import os
import sys
import random
import unittest

from testutil.helpers import *

from nilmdb.server.layout import *


def _random_text_rows(rows, datagen):
    """Build `rows` lines of text in the "timestamp value value ...\\n" form.

    Timestamps start at 1234567890 and grow by a random amount in [0, 1]
    before each row is emitted.  `datagen` is called once per row and
    must return a list of already-formatted value strings.

    The lines are collected and joined once rather than appended to a
    growing string.  The order of calls into `random` (timestamp step
    first, then `datagen`) is the same for every row, so a given seed
    always yields the same text.
    """
    ts = 1234567890
    lines = []
    for _ in range(rows):
        ts += random.uniform(0, 1)
        lines.append(sprintf("%f", ts) + " " + " ".join(datagen()) + "\n")
    return "".join(lines)


class TestLayouts(object):
    # Some nilmdb.layout tests.  Not complete, just fills in missing
    # coverage.
    #
    # The real_t_* helpers hold the actual checks; each test_* method
    # runs its helper twice, once with the "PrepData" / "RawData" /
    # "RawNotchedData" names and once with the "float32_8" / "uint16_6" /
    # "uint16_9" names.

    def test_layouts(self):
        # "PrepData" and "float32_8" must describe the same layout
        x = nilmdb.server.layout.get_named("PrepData")
        y = nilmdb.server.layout.get_named("float32_8")
        eq_(x.count, y.count)
        eq_(x.datatype, y.datatype)

        # "float32_7" shares the datatype but not the column count
        y = nilmdb.server.layout.get_named("float32_7")
        ne_(x.count, y.count)
        eq_(x.datatype, y.datatype)

    def test_parsing(self):
        self.real_t_parsing("PrepData", "RawData", "RawNotchedData")
        self.real_t_parsing("float32_8", "uint16_6", "uint16_9")

    def real_t_parsing(self, name_prep, name_raw, name_rawnotch):
        # Exercise Parser with the three given layout names: an
        # 8-column float layout, a 6-column integer layout and a
        # 9-column integer layout.

        # invalid layouts
        with assert_raises(TypeError):
            Parser("NoSuchLayout")
        with assert_raises(TypeError):
            Parser("float32")

        # too little data
        parser = Parser(name_prep)
        data = ("1234567890.000000 1.1 2.2 3.3 4.4 5.5\n" +
                "1234567890.100000 1.1 2.2 3.3 4.4 5.5\n")
        with assert_raises(ParserError) as e:
            parser.parse(data)
        in_("error", str(e.exception))

        # too much data
        parser = Parser(name_prep)
        data = ("1234567890.000000 1.1 2.2 3.3 4.4 5.5 6.6 7.7 8.8 9.9\n" +
                "1234567890.100000 1.1 2.2 3.3 4.4 5.5 6.6 7.7 8.8 9.9\n")
        with assert_raises(ParserError) as e:
            parser.parse(data)
        in_("error", str(e.exception))

        # just right
        parser = Parser(name_prep)
        data = ("1234567890.000000 1.1 2.2 3.3 4.4 5.5 6.6 7.7 8.8\n" +
                "1234567890.100000 1.1 2.2 3.3 4.4 5.5 6.6 7.7 8.8\n")
        parser.parse(data)
        eq_(parser.min_timestamp, 1234567890.0)
        eq_(parser.max_timestamp, 1234567890.1)
        eq_(parser.data, [[1234567890.0,1.1,2.2,3.3,4.4,5.5,6.6,7.7,8.8],
                          [1234567890.1,1.1,2.2,3.3,4.4,5.5,6.6,7.7,8.8]])

        # try RawData too, with clamping
        parser = Parser(name_raw)
        data = ("1234567890.000000 1 2 3 4 5 6\n" +
                "1234567890.100000 1 2 3 4 5 6\n")
        parser.parse(data)
        eq_(parser.data, [[1234567890.0,1,2,3,4,5,6],
                          [1234567890.1,1,2,3,4,5,6]])

        # pass an instantiated class
        parser = Parser(get_named(name_rawnotch))
        data = ("1234567890.000000 1 2 3 4 5 6 7 8 9\n" +
                "1234567890.100000 1 2 3 4 5 6 7 8 9\n")
        parser.parse(data)

        # non-monotonic: a timestamp that goes backwards is rejected
        parser = Parser(name_raw)
        data = ("1234567890.100000 1 2 3 4 5 6\n" +
                "1234567890.099999 1 2 3 4 5 6\n")
        with assert_raises(ParserError) as e:
            parser.parse(data)
        in_("not monotonically increasing", str(e.exception))

        # ... and so is a repeated timestamp
        parser = Parser(name_raw)
        data = ("1234567890.100000 1 2 3 4 5 6\n" +
                "1234567890.100000 1 2 3 4 5 6\n")
        with assert_raises(ParserError) as e:
            parser.parse(data)
        in_("not monotonically increasing", str(e.exception))

        # ... but a one-microsecond step forward is accepted
        parser = Parser(name_raw)
        data = ("1234567890.100000 1 2 3 4 5 6\n" +
                "1234567890.100001 1 2 3 4 5 6\n")
        parser.parse(data)

        # RawData with values out of bounds
        parser = Parser(name_raw)
        data = ("1234567890.000000 1 2 3 4 500000 6\n" +
                "1234567890.100000 1 2 3 4 5 6\n")
        with assert_raises(ParserError) as e:
            parser.parse(data)
        in_("value out of range", str(e.exception))

        # Empty data should work but is useless
        parser = Parser(name_raw)
        data = ""
        parser.parse(data)
        assert parser.min_timestamp is None
        assert parser.max_timestamp is None

    def test_formatting(self):
        self.real_t_formatting("PrepData", "RawData", "RawNotchedData")
        self.real_t_formatting("float32_8", "uint16_6", "uint16_9")

    def real_t_formatting(self, name_prep, name_raw, name_rawnotch):
        # Exercise Formatter with the same three layout names that
        # real_t_parsing takes.

        # invalid layout
        with assert_raises(TypeError):
            Formatter("NoSuchLayout")

        # too little data
        formatter = Formatter(name_prep)
        data = [ [ 1234567890.000000, 1.1, 2.2, 3.3, 4.4, 5.5 ],
                 [ 1234567890.100000, 1.1, 2.2, 3.3, 4.4, 5.5 ] ]
        with assert_raises(FormatterError) as e:
            formatter.format(data)
        in_("error", str(e.exception))

        # too much data
        formatter = Formatter(name_prep)
        data = [ [ 1234567890.000000, 1, 2, 3, 4, 5, 6, 7, 8, 9 ],
                 [ 1234567890.100000, 1, 2, 3, 4, 5, 6, 7, 8, 9 ] ]
        with assert_raises(FormatterError) as e:
            formatter.format(data)
        in_("error", str(e.exception))

        # just right
        formatter = Formatter(name_prep)
        data = [ [ 1234567890.000000, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8 ],
                 [ 1234567890.100000, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8 ] ]
        text = formatter.format(data)
        eq_(text,
            "1234567890.000000 1.100000 2.200000 3.300000 4.400000 " +
            "5.500000 6.600000 7.700000 8.800000\n" +
            "1234567890.100000 1.100000 2.200000 3.300000 4.400000 " +
            "5.500000 6.600000 7.700000 8.800000\n")

        # try RawData too
        formatter = Formatter(name_raw)
        data = [ [ 1234567890.000000, 1, 2, 3, 4, 5, 6 ],
                 [ 1234567890.100000, 1, 2, 3, 4, 5, 6 ] ]
        text = formatter.format(data)
        eq_(text,
            "1234567890.000000 1 2 3 4 5 6\n" +
            "1234567890.100000 1 2 3 4 5 6\n")

        # pass an instantiated class
        formatter = Formatter(get_named(name_rawnotch))
        data = [ [ 1234567890.000000, 1, 2, 3, 4, 5, 6, 7, 8, 9 ],
                 [ 1234567890.100000, 1, 2, 3, 4, 5, 6, 7, 8, 9 ] ]
        text = formatter.format(data)
        eq_(text,
            "1234567890.000000 1 2 3 4 5 6 7 8 9\n" +
            "1234567890.100000 1 2 3 4 5 6 7 8 9\n")

        # Empty data should work but is useless
        formatter = Formatter(name_raw)
        data = []
        text = formatter.format(data)
        eq_(text, "")

    def test_roundtrip(self):
        self.real_t_roundtrip("PrepData", "RawData", "RawNotchedData")
        self.real_t_roundtrip("float32_8", "uint16_6", "uint16_9")

    def real_t_roundtrip(self, name_prep, name_raw, name_rawnotch):
        # Verify that textual data passed into the Parser, and then
        # back through the Formatter, then back into the Parser,
        # gives identical parsed representations
        random.seed(12345)

        def do_roundtrip(layout, datagen):
            # 100 random datasets of 1 to 100 rows each
            for _ in range(100):
                rows = random.randint(1, 100)
                data = _random_text_rows(rows, datagen)
                parser1 = Parser(layout)
                formatter = Formatter(layout)
                parser2 = Parser(layout)
                parser1.parse(data)
                parser2.parse(formatter.format(parser1.data))
                eq_(parser1.data, parser2.data)

        def prep_fields():
            return [ sprintf("%f", random.uniform(-1000, 1000))
                     for _ in range(8) ]
        do_roundtrip(name_prep, prep_fields)

        def raw_fields():
            return [ sprintf("%d", random.randint(0, 65535))
                     for _ in range(6) ]
        do_roundtrip(name_raw, raw_fields)

        def rawnotch_fields():
            return [ sprintf("%d", random.randint(0, 65535))
                     for _ in range(9) ]
        do_roundtrip(name_rawnotch, rawnotch_fields)


class TestLayoutSpeed(object):
    # Timing of Parser + Formatter roundtrips.  Skipped by default
    # because it is slow.

    @unittest.skip("this is slow")
    def test_layout_speed(self):
        import time

        random.seed(54321)

        def do_speedtest(layout, datagen, rows = 5000, times = 100):
            # Build data once
            data = _random_text_rows(rows, datagen)

            # Do lots of roundtrips
            start = time.time()
            for _ in range(times):
                parser = Parser(layout)
                formatter = Formatter(layout)
                parser.parse(data)
                formatter.format(parser.data)
            elapsed = time.time() - start
            printf("roundtrip %s: %d ms, %.1f μs/row, %d rows/sec\n",
                   layout,
                   elapsed * 1e3,
                   (elapsed * 1e6) / (rows * times),
                   (rows * times) / elapsed)

        # Call form rather than the Python-2-only print statement; the
        # output (a single blank line) is the same on Python 2 and 3.
        print("")

        def float_fields():
            return [ sprintf("%f", random.uniform(-1000, 1000))
                     for _ in range(10) ]
        do_speedtest("float32_10", float_fields)

        def uint16_10_fields():
            return [ sprintf("%d", random.randint(0, 65535))
                     for _ in range(10) ]
        do_speedtest("uint16_10", uint16_10_fields)

        def uint16_6_fields():
            return [ sprintf("%d", random.randint(0, 65535))
                     for _ in range(6) ]
        do_speedtest("uint16_6", uint16_6_fields)