Browse Source

- Some updates to max_results handling on server side

- Flesh out tests for the new nilmdb.layout.Formatter
  Coverage doesn't handle the cython module, so this is just
  functional stuff, not necessarily complete.
  Still need to finish each Layout.format()

- Split out test_client_5_chunked from test_client_4_misc
  so it's easier to skip while debugging.  Turning off streaming
  lets us see tracebacks from within the server's content()
  functions.

- More work on stream/extract in cmdline, client, server, nilmdb.
  Still needs work on server side, but should be complete in nilmdb.

- Start nilmdb.layout.Formatter class


git-svn-id: https://bucket.mit.edu/svn/nilm/nilmdb@10888 ddd99763-3ecb-0310-9145-efcb8ce7c51f
tags/bxinterval-last
Jim Paris 12 years ago
parent
commit
e2daeb5e54
8 changed files with 196 additions and 32 deletions
  1. +2
    -2
      nilmdb/client.py
  2. +28
    -0
      nilmdb/layout.pyx
  3. +40
    -14
      nilmdb/nilmdb.py
  4. +16
    -5
      nilmdb/server.py
  5. +1
    -1
      setup.cfg
  6. +10
    -6
      tests/test_client.py
  7. +4
    -4
      tests/test_cmdline.py
  8. +95
    -0
      tests/test_layout.py

+ 2
- 2
nilmdb/client.py View File

@@ -122,7 +122,7 @@ class Client(object):
params["start"] = repr(start) # use repr to keep precision params["start"] = repr(start) # use repr to keep precision
if end is not None: if end is not None:
params["end"] = repr(end) params["end"] = repr(end)
return self.http.get_gen("stream/intervals", params)
return self.http.get_gen("stream/intervals", params, retjson = True)


def stream_extract(self, path, start = None, end = None): def stream_extract(self, path, start = None, end = None):
""" """
@@ -138,4 +138,4 @@ class Client(object):
params["start"] = repr(start) # use repr to keep precision params["start"] = repr(start) # use repr to keep precision
if end is not None: if end is not None:
params["end"] = repr(end) params["end"] = repr(end)
return self.http.get_gen("stream/extract", params)
return self.http.get_gen("stream/extract", params, retjson = False)

+ 28
- 0
nilmdb/layout.pyx View File

@@ -17,6 +17,9 @@ class ParserError(Exception):
self.message = "line " + str(line) + ": " + message self.message = "line " + str(line) + ": " + message
Exception.__init__(self, self.message) Exception.__init__(self, self.message)


class FormatterError(Exception):
    """Raised by Formatter.format() when a row of data can't be
    formatted with the current layout (wrapped ValueError,
    IndexError, or TypeError from the layout's format())."""
    pass

class Layout: class Layout:
"""Represents a NILM database layout""" """Represents a NILM database layout"""
def description(self): def description(self):
@@ -159,3 +162,28 @@ class Parser(object):
if len(self.data): if len(self.data):
self.min_timestamp = self.data[0][0] self.min_timestamp = self.data[0][0]
self.max_timestamp = self.data[-1][0] self.max_timestamp = self.data[-1][0]

class Formatter(object):
    """Object that formats database data into ASCII.

    'layout' is either a Layout instance (or subclass instance), or
    the string name of a known layout (a key in the module-level
    'named' dict).  Raises TypeError for an unknown layout name.
    """

    def __init__(self, layout):
        # isinstance covers both Layout and its subclasses, and is
        # clearer than the original issubclass(layout.__class__, ...)
        if isinstance(layout, Layout):
            self.layout = layout
        else:
            try:
                self.layout = named[layout]
            except KeyError:
                raise TypeError("unknown layout")

    def format(self, data):
        """
        Format raw data from the database, using the current layout,
        as lines of ASCII text.

        'data' is an iterable of rows; each row is passed to the
        layout's format() method.  Raises FormatterError if a row
        doesn't match the layout.
        """
        text = cStringIO.StringIO()
        try:
            for row in data:
                text.write(self.layout.format(row))
        except (ValueError, IndexError, TypeError) as e:
            # str(e) instead of the deprecated e.message (PEP 352)
            raise FormatterError("formatting error: " + str(e))
        return text.getvalue()

+ 40
- 14
nilmdb/nilmdb.py View File

@@ -86,7 +86,7 @@ class OverlapError(NilmDBError):
class NilmDB(object): class NilmDB(object):
verbose = 0 verbose = 0


def __init__(self, basepath, sync=True, response_size=None):
def __init__(self, basepath, sync=True, max_results=None):
# set up path # set up path
self.basepath = os.path.abspath(basepath.rstrip('/')) self.basepath = os.path.abspath(basepath.rstrip('/'))


@@ -116,12 +116,12 @@ class NilmDB(object):
else: else:
self.con.execute("PRAGMA synchronous=OFF") self.con.execute("PRAGMA synchronous=OFF")


# Approximate largest response that we want to send in a single
# reply (for stream_intervals, stream_extract)
if response_size:
self.response_size = response_size
# Approximate largest number of elements that we want to send
# in a single reply (for stream_intervals, stream_extract)
if max_results:
self.max_results = max_results
else: else:
self.response_size = 500000
self.max_results = 16384


self.opened = True self.opened = True


@@ -227,25 +227,23 @@ class NilmDB(object):


def stream_intervals(self, path, start = None, end = None): def stream_intervals(self, path, start = None, end = None):
""" """
Returns (intervals, truncated) tuple.
Returns (intervals, restart) tuple.


intervals is a list of [start,end] timestamps of all intervals intervals is a list of [start,end] timestamps of all intervals
that exist for path, between start and end. that exist for path, between start and end.


truncated, if True, means that there were too many results to
restart, if nonzero, means that there were too many results to
return in a single request. The data is complete from the return in a single request. The data is complete from the
starting timestamp to the point at which it was truncated.
starting timestamp to the point at which it was truncated,
and a new request with a start time of 'restart' will fetch
the next block of data.
""" """

# Around 32 bytes per interval in the final JSON output
max_results = max(self.response_size / 32, 2)

stream_id = self._stream_id(path) stream_id = self._stream_id(path)
intervals = self._get_intervals(stream_id) intervals = self._get_intervals(stream_id)
requested = Interval(start or 0, end or 1e12) requested = Interval(start or 0, end or 1e12)
result = [] result = []
for n, i in enumerate(intervals.intersection(requested)): for n, i in enumerate(intervals.intersection(requested)):
if n >= max_results:
if n >= self.max_results:
restart = i.start restart = i.start
break break
result.append([i.start, i.end]) result.append([i.start, i.end])
@@ -371,3 +369,31 @@ class NilmDB(object):


# And that's all # And that's all
return "ok" return "ok"

def stream_extract(self, path, start = None, end = None):
    """
    Returns (data, restart) tuple.

    data is a list of raw data from the database, suitable for
    passing to e.g. nilmdb.layout.Formatter to translate into
    textual form.

    restart, if nonzero, means that there were too many results to
    return in a single request. The data is complete from the
    starting timestamp to the point at which it was truncated,
    and a new request with a start time of 'restart' will fetch
    the next block of data.
    """
    # TODO: FIX
    # NOTE(review): placeholder implementation -- this currently
    # returns [start, end] interval pairs (copied from
    # stream_intervals), not actual row data.  The docstring above
    # describes the intended final behavior.
    stream_id = self._stream_id(path)
    intervals = self._get_intervals(stream_id)
    # 0 / 1e12 serve as "no bound" sentinels -- presumably 1e12 is
    # beyond any real timestamp here; confirm against Interval usage.
    requested = Interval(start or 0, end or 1e12)
    result = []
    for n, i in enumerate(intervals.intersection(requested)):
        if n >= self.max_results:
            # Too many results; tell the caller where to resume.
            restart = i.start
            break
        result.append([i.start, i.end])
    else:
        # Loop finished without truncation.
        restart = 0
    return (result, restart)

+ 16
- 5
nilmdb/server.py View File

@@ -221,19 +221,30 @@ class Stream(NilmApp):
if end is not None: if end is not None:
end = float(end) end = float(end)


# Check parameters
if start is not None and end is not None: if start is not None and end is not None:
if end < start: if end < start:
raise cherrypy.HTTPError("400 Bad Request", raise cherrypy.HTTPError("400 Bad Request",
"end before start") "end before start")


# Check path and get layout
streams = self.db.stream_list(path = path)
if len(streams) != 1:
raise cherrypy.HTTPError("404 Not Found", "No such stream")
layout = streams[0][1]

# Get formatter
formatter = nilmdb.layout.Formatter(layout)

def content(start, end): def content(start, end):
# Note: disable response.stream below to get better debug info
# from tracebacks in this subfunction.
while True: while True:
# Note: disable response.stream below to get better debug info
# from tracebacks in this subfunction.
(data, restart) = self.db.stream_extract(path, start, end) (data, restart) = self.db.stream_extract(path, start, end)
# data is a list of rows; format it as text
response = "timestamp foo bar baz XXX\n"
yield response

# Format the data and yield it
yield formatter.format(data)

if restart == 0: if restart == 0:
break break
start = restart start = restart


+ 1
- 1
setup.cfg View File

@@ -11,7 +11,7 @@ cover-erase=
stop= stop=
verbosity=2 verbosity=2
#tests=tests/test_cmdline.py #tests=tests/test_cmdline.py
#tests=tests/test_layout.py
tests=tests/test_layout.py
#tests=tests/test_interval.py #tests=tests/test_interval.py
#tests=tests/test_client.py #tests=tests/test_client.py
#tests=tests/test_timestamper.py #tests=tests/test_timestamper.py


+ 10
- 6
tests/test_client.py View File

@@ -13,6 +13,7 @@ import sys
import threading import threading
import cStringIO import cStringIO
import simplejson as json import simplejson as json
import unittest


from test_helpers import * from test_helpers import *


@@ -205,17 +206,20 @@ class TestClient(object):
in_("400 Bad Request", str(e.exception)) in_("400 Bad Request", str(e.exception))
in_("no data provided", str(e.exception)) in_("no data provided", str(e.exception))


# Make sure that /stream/intervals properly returns a
# streaming, chunked response. Pokes around in client.http
# internals a bit to look at the response headers.
@unittest.skip("while debugging")
def test_client_5_chunked(self):
# Make sure that /stream/intervals and /stream/extract
# properly return streaming, chunked response. Pokes around
# in client.http internals a bit to look at the response
# headers.

client = nilmdb.Client(url = "http://localhost:12380/")

x = client.http.get("stream/intervals", { "path": "/newton/prep" }, x = client.http.get("stream/intervals", { "path": "/newton/prep" },
retjson=False) retjson=False)
eq_(x.count('\n'), 2) eq_(x.count('\n'), 2)
in_("transfer-encoding: chunked", client.http._headers.lower()) in_("transfer-encoding: chunked", client.http._headers.lower())


# Make sure that /stream/extract properly returns a
# streaming, chunked response. Pokes around in client.http
# internals a bit to look at the response headers.
x = client.http.get("stream/extract", x = client.http.get("stream/extract",
{ "path": "/newton/prep", { "path": "/newton/prep",
"start": "123", "start": "123",


+ 4
- 4
tests/test_cmdline.py View File

@@ -20,10 +20,10 @@ from test_helpers import *


testdb = "tests/cmdline-testdb" testdb = "tests/cmdline-testdb"


def server_start(response_size = None):
def server_start(max_results = None):
global test_server, test_db global test_server, test_db
# Start web app on a custom port # Start web app on a custom port
test_db = nilmdb.NilmDB(testdb, sync = False, response_size = response_size)
test_db = nilmdb.NilmDB(testdb, sync = False, max_results = max_results)
test_server = nilmdb.Server(test_db, host = "127.0.0.1", test_server = nilmdb.Server(test_db, host = "127.0.0.1",
port = 12380, stoppable = False, port = 12380, stoppable = False,
fast_shutdown = True, fast_shutdown = True,
@@ -360,9 +360,9 @@ class TestCmdline(object):
self.dump() self.dump()


def test_cmdline_9_truncated(self): def test_cmdline_9_truncated(self):
# Test truncated responses by overriding the nilmdb response_size
# Test truncated responses by overriding the nilmdb max_results
server_stop() server_stop()
server_start(response_size = 30)
server_start(max_results = 2)


self.ok("list --detail") self.ok("list --detail")
eq_(self.captured.count('\n'), 11) eq_(self.captured.count('\n'), 11)

+ 95
- 0
tests/test_layout.py View File

@@ -1,5 +1,7 @@
import nilmdb import nilmdb


from nilmdb.printf import *

from nose.tools import * from nose.tools import *
from nose.tools import assert_raises from nose.tools import assert_raises
import distutils.version import distutils.version
@@ -13,6 +15,8 @@ import urllib2
from urllib2 import urlopen, HTTPError from urllib2 import urlopen, HTTPError
import Queue import Queue
import cStringIO import cStringIO
import random
import unittest


from test_helpers import * from test_helpers import *


@@ -91,3 +95,94 @@ class TestLayouts(object):
parser.parse(data) parser.parse(data)
assert(parser.min_timestamp is None) assert(parser.min_timestamp is None)
assert(parser.max_timestamp is None) assert(parser.max_timestamp is None)

def test_formatting(self):
    """Functional tests for nilmdb.layout.Formatter.format()."""
    # invalid layout name should raise TypeError
    with assert_raises(TypeError) as e:
        formatter = Formatter("NoSuchLayout")

    # too little data (only 5 fields after the timestamp;
    # the "just right" case below shows PrepData takes 8)
    formatter = Formatter("PrepData")
    data = [ [ 1234567890.000000, 1.1, 2.2, 3.3, 4.4, 5.5 ],
             [ 1234567890.100000, 1.1, 2.2, 3.3, 4.4, 5.5 ] ]
    with assert_raises(FormatterError) as e:
        formatter.format(data)
    in_("error", str(e.exception))

    # too much data (9 fields after the timestamp)
    formatter = Formatter("PrepData")
    data = [ [ 1234567890.000000, 1, 2, 3, 4, 5, 6, 7, 8, 9 ],
             [ 1234567890.100000, 1, 2, 3, 4, 5, 6, 7, 8, 9 ] ]
    with assert_raises(FormatterError) as e:
        formatter.format(data)
    in_("error", str(e.exception))

    # just right: 8 float fields for PrepData
    formatter = Formatter("PrepData")
    data = [ [ 1234567890.000000, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8 ],
             [ 1234567890.100000, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8 ] ]
    text = formatter.format(data)
    eq_(text,
        "1234567890.000000 1.1 2.2 3.3 4.4 5.5 6.6 7.7 8.8\n" +
        "1234567890.100000 1.1 2.2 3.3 4.4 5.5 6.6 7.7 8.8\n")

    # try RawData too (6 integer fields)
    formatter = Formatter("RawData")
    data = [ [ 1234567890.000000, 1, 2, 3, 4, 5, 6 ],
             [ 1234567890.100000, 1, 2, 3, 4, 5, 6 ] ]
    text = formatter.format(data)
    eq_(text,
        "1234567890.000000 1 2 3 4 5 6\n" +
        "1234567890.100000 1 2 3 4 5 6\n")

    # pass an instantiated class (9 fields for RawNotchedData)
    formatter = Formatter(RawNotchedData())
    data = [ [ 1234567890.000000, 1, 2, 3, 4, 5, 6, 7, 8, 9 ],
             [ 1234567890.100000, 1, 2, 3, 4, 5, 6, 7, 8, 9 ] ]
    text = formatter.format(data)
    eq_(text,
        "1234567890.000000 1 2 3 4 5 6 7 8 9\n" +
        "1234567890.100000 1 2 3 4 5 6 7 8 9\n")

    # Empty data should work but is useless
    formatter = Formatter("RawData")
    data = []
    text = formatter.format(data)
    eq_(text, "")

def test_roundtrip(self):
    """Verify that textual data passed into the Parser and then
    back through the Formatter comes out the same way."""
    random.seed(12345)

    def roundtrip(layout, nfields, genfield):
        # Generate random textual data for 'layout' (nfields data
        # columns per row, each produced by genfield()), parse it,
        # re-format it, and check it round-trips unchanged.
        for i in range(1000):
            rows = random.randint(1, 100)
            data = ""
            ts = 1234567890
            for r in range(rows):
                ts += random.uniform(0, 1)
                row = sprintf("%f", ts)
                for f in range(nfields):
                    row += genfield()
                data += row + "\n"
            parser = Parser(layout)
            formatter = Formatter(layout)
            parser.parse(data)
            eq_(formatter.format(parser.data), data)

    # Roundtrip PrepData: 8 float columns
    roundtrip("PrepData", 8,
              lambda: sprintf(" %f", random.uniform(-1000, 1000)))

    # Roundtrip RawData: 6 integer columns.  (The original used 8
    # fields here, copy-pasted from PrepData; RawData only has 6
    # data columns -- see test_formatting -- so 8 can't round-trip.)
    roundtrip("RawData", 6,
              lambda: sprintf(" %d", random.randint(0, 65535)))

Loading…
Cancel
Save