Compare commits

..

24 Commits

Author SHA1 Message Date
fe91ff59a3 Better handling of JSON requests 2013-03-05 12:38:08 -05:00
64c24a00d6 Add --traceback argument to nilmdb-server script 2013-03-05 12:20:07 -05:00
58c0ae72f6 Support application/json POST bodies as well as x-www-form-urlencoded 2013-03-05 11:54:29 -05:00
c5f079f61f When removing data from files, try to punch a hole.
Requires fallocate(2) support with FALLOC_FL_PUNCH_HOLE, as
well as a filesystem that supports it (in Linux 3.7,
tmpfs, btrfs, xfs, or ext4)
2013-03-04 20:31:14 -05:00
2d45466f66 Print version at server startup 2013-03-04 15:43:45 -05:00
c6a0e6e96f More complete CORS handling, including preflight requests (hopefully) 2013-03-04 15:40:35 -05:00
79755dc624 Fix Allow: header by switching to cherrypy's built in tools.allow().
Replaces custom tools.allow_methods which didn't return the Allow: header.
2013-03-04 14:08:37 -05:00
c512631184 bulkdata: Build up rows and write to disk all at once 2013-03-03 12:03:44 -05:00
19d27c31bc Fix streaming requests like stream_extract 2013-03-03 11:37:47 -05:00
28310fe886 Add test for extents 2013-03-02 15:19:25 -05:00
1ccc2bce7e Add commandline support for listing extents 2013-03-02 15:19:19 -05:00
00237e30b2 Add "extent" option to stream_list in client, server, and nilmdb 2013-03-02 15:18:54 -05:00
521ff88f7c Support 'nilmtool help command' just like 'nilmtool command --help' 2013-03-02 13:56:03 -05:00
64897a1dd1 Change port from 12380 -> 32180 when running tests
This is so tests can be run without interfering with a normal server.
2013-03-02 13:19:44 -05:00
41ce8480bb cmdline: Support NILMDB_URL environment variable for default URL 2013-03-02 13:18:33 -05:00
204a6ecb15 Optimize bulkdata.append() by postponing flushes & mmap resize
Rather than flushing and resizing after each row is written to the
file, have the file object iterate by itself and do all of the
writes.  Only flush and resize the mmap after finishing.  This should
be pretty safe to do, especially since nothing is concurrent at the
moment.
2013-03-01 16:30:49 -05:00
5db3b186a4 Make test_mustclose more complete 2013-03-01 16:30:22 -05:00
fe640cf421 Remove must_close verification wrappers on bulkdata
At this point we know that the close() behavior is correct, so it's
not worth slowing everything down for these checks.
2013-03-01 16:11:44 -05:00
ca67c79fe4 Improve test_layout_speed 2013-03-01 16:04:10 -05:00
8917bcd4bf Fix test case failures due to increased client chunk size 2013-03-01 16:04:00 -05:00
a75ec98673 Slight speed improvements in layout.pyx 2013-03-01 16:03:38 -05:00
e476338d61 Remove outdated numpy dependency 2013-03-01 16:03:19 -05:00
d752b882f2 Bump up block sizes in client
This will help amortize the sqlite synchronization costs.
2013-02-28 21:11:57 -05:00
ade27773e6 Add --nosync option to nilmdb-server script 2013-02-28 20:45:08 -05:00
24 changed files with 504 additions and 206 deletions

View File

@@ -21,8 +21,12 @@ def extract_timestamp(line):
class Client(object):
"""Main client interface to the Nilm database."""
def __init__(self, url):
self.http = nilmdb.client.httpclient.HTTPClient(url)
def __init__(self, url, post_json = False):
"""Initialize client with given URL. If post_json is true,
POST requests are sent with Content-Type 'application/json'
instead of the default 'x-www-form-urlencoded'."""
self.http = nilmdb.client.httpclient.HTTPClient(url, post_json)
self.post_json = post_json
# __enter__/__exit__ allow this class to be a context manager
def __enter__(self):
@@ -31,8 +35,11 @@ class Client(object):
def __exit__(self, exc_type, exc_value, traceback):
self.close()
def _json_param(self, data):
def _json_post_param(self, data):
"""Return compact json-encoded version of parameter"""
if self.post_json:
# If we're posting as JSON, we don't need to encode it further here
return data
return json.dumps(data, separators=(',',':'))
def close(self):
@@ -52,12 +59,14 @@ class Client(object):
as a dictionary."""
return self.http.get("dbinfo")
def stream_list(self, path = None, layout = None):
def stream_list(self, path = None, layout = None, extent = False):
params = {}
if path is not None:
params["path"] = path
if layout is not None:
params["layout"] = layout
if extent:
params["extent"] = 1
return self.http.get("stream/list", params)
def stream_get_metadata(self, path, keys = None):
@@ -71,7 +80,7 @@ class Client(object):
metadata."""
params = {
"path": path,
"data": self._json_param(data)
"data": self._json_post_param(data)
}
return self.http.post("stream/set_metadata", params)
@@ -79,7 +88,7 @@ class Client(object):
"""Update stream metadata from a dictionary"""
params = {
"path": path,
"data": self._json_param(data)
"data": self._json_post_param(data)
}
return self.http.post("stream/update_metadata", params)
@@ -220,7 +229,7 @@ class StreamInserter(object):
# These are soft limits -- actual data might be rounded up.
# We send when we have a certain amount of data queued, or
# when a certain amount of time has passed since the last send.
_max_data = 1048576
_max_data = 2 * 1024 * 1024
_max_time = 30
# Delta to add to the final timestamp, if "end" wasn't given

View File

@@ -10,7 +10,7 @@ import requests
class HTTPClient(object):
"""Class to manage and perform HTTP requests from the client"""
def __init__(self, baseurl = ""):
def __init__(self, baseurl = "", post_json = False):
"""If baseurl is supplied, all other functions that take
a URL can be given a relative URL instead."""
# Verify / clean up URL
@@ -26,6 +26,10 @@ class HTTPClient(object):
# Saved response, so that tests can verify a few things.
self._last_response = {}
# Whether to send application/json POST bodies (versus
# x-www-form-urlencoded)
self.post_json = post_json
def _handle_error(self, url, code, body):
# Default variables for exception. We use the entire body as
# the default message, in case we can't extract it from a JSON
@@ -57,12 +61,14 @@ class HTTPClient(object):
def close(self):
self.session.close()
def _do_req(self, method, url, query_data, body_data, stream):
def _do_req(self, method, url, query_data, body_data, stream, headers):
url = urlparse.urljoin(self.baseurl, url)
try:
response = self.session.request(method, url,
params = query_data,
data = body_data)
data = body_data,
stream = stream,
headers = headers)
except requests.RequestException as e:
raise ServerError(status = "502 Error", url = url,
message = str(e.message))
@@ -76,12 +82,13 @@ class HTTPClient(object):
return (response, False)
# Normal versions that return data directly
def _req(self, method, url, query = None, body = None):
def _req(self, method, url, query = None, body = None, headers = None):
"""
Make a request and return the body data as a string or parsed
JSON object, or raise an error if it contained an error.
"""
(response, isjson) = self._do_req(method, url, query, body, False)
(response, isjson) = self._do_req(method, url, query, body,
stream = False, headers = headers)
if isjson:
return json.loads(response.content)
return response.content
@@ -92,20 +99,26 @@ class HTTPClient(object):
def post(self, url, params = None):
"""Simple POST (parameters in body)"""
return self._req("POST", url, None, params)
if self.post_json:
return self._req("POST", url, None,
json.dumps(params),
{ 'Content-type': 'application/json' })
else:
return self._req("POST", url, None, params)
def put(self, url, data, params = None):
"""Simple PUT (parameters in URL, data in body)"""
return self._req("PUT", url, params, data)
# Generator versions that return data one line at a time.
def _req_gen(self, method, url, query = None, body = None):
def _req_gen(self, method, url, query = None, body = None, headers = None):
"""
Make a request and return a generator that gives back strings
or JSON decoded lines of the body data, or raise an error if
it contained an eror.
"""
(response, isjson) = self._do_req(method, url, query, body, True)
(response, isjson) = self._do_req(method, url, query, body,
stream = True, headers = headers)
for line in response.iter_lines():
if isjson:
yield json.loads(line)

View File

@@ -6,13 +6,14 @@ from nilmdb.utils import datetime_tz
import nilmdb.utils.time
import sys
import os
import argparse
from argparse import ArgumentDefaultsHelpFormatter as def_form
# Valid subcommands. Defined in separate files just to break
# things up -- they're still called with Cmdline as self.
subcommands = [ "info", "create", "list", "metadata", "insert", "extract",
"remove", "destroy" ]
subcommands = [ "help", "info", "create", "list", "metadata",
"insert", "extract", "remove", "destroy" ]
# Import the subcommand modules
subcmd_mods = {}
@@ -29,6 +30,8 @@ class Cmdline(object):
def __init__(self, argv = None):
self.argv = argv or sys.argv[1:]
self.client = None
self.def_url = os.environ.get("NILMDB_URL", "http://localhost:12380")
self.subcmd = {}
def arg_time(self, toparse):
"""Parse a time string argument"""
@@ -50,18 +53,17 @@ class Cmdline(object):
group = self.parser.add_argument_group("Server")
group.add_argument("-u", "--url", action="store",
default="http://localhost:12380/",
default=self.def_url,
help="NilmDB server URL (default: %(default)s)")
sub = self.parser.add_subparsers(title="Commands",
dest="command",
description="Specify --help after "
"the command for command-specific "
"options.")
sub = self.parser.add_subparsers(
title="Commands", dest="command",
description="Use 'help command' or 'command --help' for more "
"details on a particular command.")
# Set up subcommands (defined in separate files)
for cmd in subcommands:
subcmd_mods[cmd].setup(self, sub)
self.subcmd[cmd] = subcmd_mods[cmd].setup(self, sub)
def die(self, formatstr, *args):
fprintf(sys.stderr, formatstr + "\n", *args)
@@ -84,11 +86,13 @@ class Cmdline(object):
self.client = nilmdb.Client(self.args.url)
# Make a test connection to make sure things work
try:
server_version = self.client.version()
except nilmdb.client.Error as e:
self.die("error connecting to server: %s", str(e))
# Make a test connection to make sure things work,
# unless the particular command requests that we don't.
if "no_test_connect" not in self.args:
try:
server_version = self.client.version()
except nilmdb.client.Error as e:
self.die("error connecting to server: %s", str(e))
# Now dispatch client request to appropriate function. Parser
# should have ensured that we don't have any unknown commands

View File

@@ -26,6 +26,7 @@ Layout types are of the format: type_count
help="Path (in database) of new stream, e.g. /foo/bar")
group.add_argument("layout",
help="Layout type for new stream, e.g. float32_8")
return cmd
def cmd_create(self):
"""Create new stream"""

View File

@@ -16,6 +16,7 @@ def setup(self, sub):
group = cmd.add_argument_group("Required arguments")
group.add_argument("path",
help="Path of the stream to delete, e.g. /foo/bar")
return cmd
def cmd_destroy(self):
"""Destroy stream"""

View File

@@ -30,6 +30,7 @@ def setup(self, sub):
help="Show raw timestamps in annotated information")
group.add_argument("-c", "--count", action="store_true",
help="Just output a count of matched data points")
return cmd
def cmd_extract_verify(self):
if self.args.start is not None and self.args.end is not None:

26
nilmdb/cmdline/help.py Normal file
View File

@@ -0,0 +1,26 @@
from nilmdb.utils.printf import *
import argparse
import sys
def setup(self, sub):
cmd = sub.add_parser("help", help="Show detailed help for a command",
description="""
Show help for a command. 'help command' is
the same as 'command --help'.
""")
cmd.set_defaults(handler = cmd_help)
cmd.set_defaults(no_test_connect = True)
cmd.add_argument("command", nargs="?",
help="Command to get help about")
cmd.add_argument("rest", nargs=argparse.REMAINDER,
help=argparse.SUPPRESS)
return cmd
def cmd_help(self):
if self.args.command in self.subcmd:
self.subcmd[self.args.command].print_help()
else:
self.parser.print_help()
return

View File

@@ -12,6 +12,7 @@ def setup(self, sub):
version.
""")
cmd.set_defaults(handler = cmd_info)
return cmd
def cmd_info(self):
"""Print info about the server"""

View File

@@ -47,6 +47,7 @@ def setup(self, sub):
help="Path of stream, e.g. /foo/bar")
group.add_argument("file", nargs="*", default=['-'],
help="File(s) to insert (default: - (stdin))")
return cmd
def cmd_insert(self):
# Find requested stream

View File

@@ -24,11 +24,13 @@ def setup(self, sub):
group.add_argument("-l", "--layout", default="*",
help="Match only this stream layout")
group = cmd.add_argument_group("Interval extent")
group.add_argument("-E", "--extent", action="store_true",
help="Show min/max timestamps in this stream")
group = cmd.add_argument_group("Interval details")
group.add_argument("-d", "--detail", action="store_true",
help="Show available data time intervals")
group.add_argument("-T", "--timestamp-raw", action="store_true",
help="Show raw timestamps in time intervals")
group.add_argument("-s", "--start",
metavar="TIME", type=self.arg_time,
help="Starting timestamp (free-form, inclusive)")
@@ -36,6 +38,12 @@ def setup(self, sub):
metavar="TIME", type=self.arg_time,
help="Ending timestamp (free-form, noninclusive)")
group = cmd.add_argument_group("Misc options")
group.add_argument("-T", "--timestamp-raw", action="store_true",
help="Show raw timestamps in time intervals or extents")
return cmd
def cmd_list_verify(self):
# A hidden "path_positional" argument lets the user leave off the
# "-p" when specifying the path. Handle it here.
@@ -51,28 +59,38 @@ def cmd_list_verify(self):
if self.args.start >= self.args.end:
self.parser.error("start must precede end")
if self.args.start is not None or self.args.end is not None:
if not self.args.detail:
self.parser.error("--start and --end only make sense with --detail")
def cmd_list(self):
"""List available streams"""
streams = self.client.stream_list()
streams = self.client.stream_list(extent = True)
if self.args.timestamp_raw:
time_string = repr
else:
time_string = nilmdb.utils.time.format_time
for (path, layout) in streams:
for (path, layout, extent_min, extent_max) in streams:
if not (fnmatch.fnmatch(path, self.args.path) and
fnmatch.fnmatch(layout, self.args.layout)):
continue
printf("%s %s\n", path, layout)
if not self.args.detail:
continue
printed = False
for (start, end) in self.client.stream_intervals(path, self.args.start,
self.args.end):
printf(" [ %s -> %s ]\n", time_string(start), time_string(end))
printed = True
if not printed:
printf(" (no intervals)\n")
if self.args.extent:
if extent_min is None or extent_max is None:
printf(" extent: (no data)\n")
else:
printf(" extent: %s -> %s\n",
time_string(extent_min), time_string(extent_max))
if self.args.detail:
printed = False
for (start, end) in self.client.stream_intervals(
path, self.args.start, self.args.end):
printf(" [ %s -> %s ]\n", time_string(start), time_string(end))
printed = True
if not printed:
printf(" (no intervals)\n")

View File

@@ -26,6 +26,7 @@ def setup(self, sub):
exc.add_argument("-u", "--update", nargs="+", metavar="key=value",
help="Update metadata using provided "
"key=value pairs")
return cmd
def cmd_metadata(self):
"""Manipulate metadata"""

View File

@@ -23,6 +23,7 @@ def setup(self, sub):
group = cmd.add_argument_group("Output format")
group.add_argument("-c", "--count", action="store_true",
help="Output number of data points removed")
return cmd
def cmd_remove(self):
try:

View File

@@ -25,6 +25,12 @@ def main():
default = os.path.join(os.getcwd(), "db"))
group.add_argument('-q', '--quiet', help = 'Silence output',
action = 'store_true')
group.add_argument('-n', '--nosync', help = 'Use asynchronous '
'commits for sqlite transactions',
action = 'store_true', default = False)
group.add_argument('-t', '--traceback',
help = 'Provide tracebacks in client errors',
action = 'store_true', default = False)
group = parser.add_argument_group("Debug options")
group.add_argument('-y', '--yappi', help = 'Run under yappi profiler and '
@@ -35,7 +41,8 @@ def main():
# Create database object. Needs to be serialized before passing
# to the Server.
db = nilmdb.utils.serializer_proxy(nilmdb.NilmDB)(args.database)
db = nilmdb.utils.serializer_proxy(nilmdb.NilmDB)(args.database,
sync = not args.nosync)
# Configure the server
if args.quiet:
@@ -45,10 +52,12 @@ def main():
server = nilmdb.server.Server(db,
host = args.address,
port = args.port,
embedded = embedded)
embedded = embedded,
force_traceback = args.traceback)
# Print info
if not args.quiet:
print "Version: %s" % nilmdb.__version__
print "Database: %s" % (os.path.realpath(args.database))
if args.address == '0.0.0.0' or args.address == '::':
host = socket.getfqdn()

View File

@@ -28,7 +28,7 @@ except: # pragma: no cover
table_cache_size = 16
fd_cache_size = 16
@nilmdb.utils.must_close(wrap_verify = True)
@nilmdb.utils.must_close(wrap_verify = False)
class BulkData(object):
def __init__(self, basepath, **kwargs):
self.basepath = basepath
@@ -171,7 +171,7 @@ class BulkData(object):
ospath = os.path.join(self.root, *elements)
return Table(ospath)
@nilmdb.utils.must_close(wrap_verify = True)
@nilmdb.utils.must_close(wrap_verify = False)
class File(object):
"""Object representing a single file on disk. Data can be appended,
or the self.mmap handle can be used for random reads."""
@@ -210,14 +210,28 @@ class File(object):
self.mmap.close()
self._f.close()
def append(self, data):
def append(self, data): # pragma: no cover (below version used instead)
# Write data, flush it, and resize our mmap accordingly
self._f.write(data)
self._f.flush()
self.size += len(data)
self._mmap_reopen()
@nilmdb.utils.must_close(wrap_verify = True)
def append_pack_iter(self, count, packer, dataiter):
# An optimized verison of append, to avoid flushing the file
# and resizing the mmap after each data point.
try:
rows = []
for i in xrange(count):
row = dataiter.next()
rows.append(packer(*row))
self._f.write("".join(rows))
finally:
self._f.flush()
self.size = self._f.tell()
self._mmap_reopen()
@nilmdb.utils.must_close(wrap_verify = False)
class Table(object):
"""Tools to help access a single table (data at a specific OS path)."""
# See design.md for design details
@@ -351,9 +365,7 @@ class Table(object):
f = self.file_open(subdir, fname)
# Write the data
for i in xrange(count):
row = dataiter.next()
f.append(self.packer.pack(*row))
f.append_pack_iter(count, self.packer.pack, dataiter)
remaining -= count
self.nrows += count
@@ -398,8 +410,16 @@ class Table(object):
def _remove_rows(self, subdir, filename, start, stop):
"""Helper to mark specific rows as being removed from a
file, and potentially removing or truncating the file itself."""
# Import an existing list of deleted rows for this file
file, and potentially remove or truncate the file itself."""
# Close potentially open file in file_open LRU cache
self.file_open.cache_remove(self, subdir, filename)
# We keep a file like 0000.removed that contains a list of
# which rows have been "removed". Note that we never have to
# remove entries from this list, because we never decrease
# self.nrows, and so we will never overwrite those locations in the
# file. Only when the list covers the entire extent of the
# file will that file be removed.
datafile = os.path.join(self.root, subdir, filename)
cachefile = datafile + ".removed"
try:
@@ -453,6 +473,14 @@ class Table(object):
except:
pass
else:
# File needs to stick around. This means we can get
# degenerate cases where we have large files containing as
# little as one row. Try to punch a hole in the file,
# so that this region doesn't take up filesystem space.
offset = start * self.packer.size
count = (stop - start) * self.packer.size
nilmdb.utils.fallocate.punch_hole(datafile, offset, count)
# Update cache. Try to do it atomically.
nilmdb.utils.atomic.replace_file(cachefile,
pickle.dumps(merged, 2))

View File

@@ -4,7 +4,6 @@ import time
import sys
import inspect
import cStringIO
import numpy as np
cdef enum:
max_value_count = 64
@@ -42,10 +41,12 @@ class Layout:
if datatype == 'uint16':
self.parse = self.parse_uint16
self.format = self.format_uint16
self.format_str = "%.6f" + " %d" * self.count
self.format = self.format_generic
elif datatype == 'float32' or datatype == 'float64':
self.parse = self.parse_float64
self.format = self.format_float64
self.format_str = "%.6f" + " %f" * self.count
self.format = self.format_generic
else:
raise KeyError("invalid type")
@@ -57,15 +58,15 @@ class Layout:
cdef double ts
# Return doubles even in float32 case, since they're going into
# a Python array which would upconvert to double anyway.
result = []
result = [0] * (self.count + 1)
cdef char *end
ts = libc.stdlib.strtod(text, &end)
if end == text:
raise ValueError("bad timestamp")
result.append(ts)
result[0] = ts
for n in range(self.count):
text = end
result.append(libc.stdlib.strtod(text, &end))
result[n+1] = libc.stdlib.strtod(text, &end)
if end == text:
raise ValueError("wrong number of values")
n = 0
@@ -79,18 +80,18 @@ class Layout:
cdef int n
cdef double ts
cdef int v
result = []
cdef char *end
result = [0] * (self.count + 1)
ts = libc.stdlib.strtod(text, &end)
if end == text:
raise ValueError("bad timestamp")
result.append(ts)
result[0] = ts
for n in range(self.count):
text = end
v = libc.stdlib.strtol(text, &end, 10)
if v < 0 or v > 65535:
raise ValueError("value out of range")
result.append(v)
result[n+1] = v
if end == text:
raise ValueError("wrong number of values")
n = 0
@@ -101,25 +102,12 @@ class Layout:
return (ts, result)
# Formatters
def format_float64(self, d):
def format_generic(self, d):
n = len(d) - 1
if n != self.count:
raise ValueError("wrong number of values for layout type: "
"got %d, wanted %d" % (n, self.count))
s = "%.6f" % d[0]
for i in range(n):
s += " %f" % d[i+1]
return s + "\n"
def format_uint16(self, d):
n = len(d) - 1
if n != self.count:
raise ValueError("wrong number of values for layout type: "
"got %d, wanted %d" % (n, self.count))
s = "%.6f" % d[0]
for i in range(n):
s += " %d" % d[i+1]
return s + "\n"
return (self.format_str % tuple(d)) + "\n"
# Get a layout by name
def get_named(typestring):

View File

@@ -269,28 +269,39 @@ class NilmDB(object):
return
def stream_list(self, path = None, layout = None):
"""Return list of [path, layout] lists of all streams
in the database.
def stream_list(self, path = None, layout = None, extent = False):
"""Return list of lists of all streams in the database.
If path is specified, include only streams with a path that
matches the given string.
If layout is specified, include only streams with a layout
that matches the given string.
"""
where = "WHERE 1=1"
params = ()
if layout:
where += " AND layout=?"
params += (layout,)
if path:
where += " AND path=?"
params += (path,)
result = self.con.execute("SELECT path, layout "
"FROM streams " + where, params).fetchall()
return sorted(list(x) for x in result)
If extent = False, returns a list of lists containing
the path and layout: [ path, layout ]
If extent = True, returns a list of lists containing the
path, layout, and min/max extent of the data:
[ path, layout, extent_min, extent_max ]
"""
params = ()
query = "SELECT streams.path, streams.layout"
if extent:
query += ", min(ranges.start_time), max(ranges.end_time)"
query += " FROM streams"
if extent:
query += " LEFT JOIN ranges ON streams.id = ranges.stream_id"
query += " WHERE 1=1"
if layout is not None:
query += " AND streams.layout=?"
params += (layout,)
if path is not None:
query += " AND streams.path=?"
params += (path,)
query += " GROUP BY streams.id ORDER BY streams.path"
result = self.con.execute(query, params).fetchall()
return [ list(x) for x in result ]
def stream_intervals(self, path, start = None, end = None):
"""

View File

@@ -71,16 +71,59 @@ def exception_to_httperror(*expected):
# care of that.
return decorator.decorator(wrapper)
# Custom Cherrypy tools
def allow_methods(methods):
method = cherrypy.request.method.upper()
if method not in methods:
if method in cherrypy.request.methods_with_bodies:
cherrypy.request.body.read()
allowed = ', '.join(methods)
cherrypy.response.headers['Allow'] = allowed
raise cherrypy.HTTPError(405, method + " not allowed; use " + allowed)
cherrypy.tools.allow_methods = cherrypy.Tool('before_handler', allow_methods)
# Custom CherryPy tools
def CORS_allow(methods):
"""This does several things:
Handles CORS preflight requests.
Adds Allow: header to all requests.
Raise 405 if request.method not in method.
It is similar to cherrypy.tools.allow, with the CORS stuff added.
"""
request = cherrypy.request.headers
response = cherrypy.response.headers
if not isinstance(methods, (tuple, list)): # pragma: no cover
methods = [ methods ]
methods = [ m.upper() for m in methods if m ]
if not methods: # pragma: no cover
methods = [ 'GET', 'HEAD' ]
elif 'GET' in methods and 'HEAD' not in methods: # pragma: no cover
methods.append('HEAD')
response['Allow'] = ', '.join(methods)
# Allow all origins
if 'Origin' in request:
response['Access-Control-Allow-Origin'] = request['Origin']
# If it's a CORS request, send response.
request_method = request.get("Access-Control-Request-Method", None)
request_headers = request.get("Access-Control-Request-Headers", None)
if (cherrypy.request.method == "OPTIONS" and
request_method and request_headers):
response['Access-Control-Allow-Headers'] = request_headers
response['Access-Control-Allow-Methods'] = ', '.join(methods)
# Try to stop further processing and return a 200 OK
cherrypy.response.status = "200 OK"
cherrypy.response.body = ""
cherrypy.request.handler = lambda: ""
return
# Reject methods that were not explicitly allowed
if cherrypy.request.method not in methods:
raise cherrypy.HTTPError(405)
cherrypy.tools.CORS_allow = cherrypy.Tool('on_start_resource', CORS_allow)
# Helper for json_in tool to process JSON data into normal request
# parameters.
def json_to_request_params(body):
cherrypy.lib.jsontools.json_processor(body)
if not isinstance(cherrypy.request.json, dict):
raise cherrypy.HTTPError(415)
cherrypy.request.params.update(cherrypy.request.json)
# CherryPy apps
class Root(NilmApp):
@@ -121,20 +164,29 @@ class Stream(NilmApp):
# /stream/list
# /stream/list?layout=PrepData
# /stream/list?path=/newton/prep
# /stream/list?path=/newton/prep&extent=1
@cherrypy.expose
@cherrypy.tools.json_out()
def list(self, path = None, layout = None):
def list(self, path = None, layout = None, extent = None):
"""List all streams in the database. With optional path or
layout parameter, just list streams that match the given path
or layout"""
return self.db.stream_list(path, layout)
or layout.
If extent is not given, returns a list of lists containing
the path and layout: [ path, layout ]
If extent is provided, returns a list of lists containing the
path, layout, and min/max extent of the data:
[ path, layout, extent_min, extent_max ]
"""
return self.db.stream_list(path, layout, bool(extent))
# /stream/create?path=/newton/prep&layout=PrepData
@cherrypy.expose
@cherrypy.tools.json_in()
@cherrypy.tools.json_out()
@exception_to_httperror(NilmDBError, ValueError)
@cherrypy.tools.allow_methods(methods = ["POST"])
@cherrypy.tools.CORS_allow(methods = ["POST"])
def create(self, path, layout):
"""Create a new stream in the database. Provide path
and one of the nilmdb.layout.layouts keys.
@@ -143,9 +195,10 @@ class Stream(NilmApp):
# /stream/destroy?path=/newton/prep
@cherrypy.expose
@cherrypy.tools.json_in()
@cherrypy.tools.json_out()
@exception_to_httperror(NilmDBError)
@cherrypy.tools.allow_methods(methods = ["POST"])
@cherrypy.tools.CORS_allow(methods = ["POST"])
def destroy(self, path):
"""Delete a stream and its associated data."""
return self.db.stream_destroy(path)
@@ -176,31 +229,41 @@ class Stream(NilmApp):
# /stream/set_metadata?path=/newton/prep&data=<json>
@cherrypy.expose
@cherrypy.tools.json_in()
@cherrypy.tools.json_out()
@exception_to_httperror(NilmDBError, LookupError, TypeError)
@cherrypy.tools.allow_methods(methods = ["POST"])
@exception_to_httperror(NilmDBError, LookupError)
@cherrypy.tools.CORS_allow(methods = ["POST"])
def set_metadata(self, path, data):
"""Set metadata for the named stream, replacing any
existing metadata. Data should be a json-encoded
dictionary"""
data_dict = json.loads(data)
self.db.stream_set_metadata(path, data_dict)
"""Set metadata for the named stream, replacing any existing
metadata. Data can be json-encoded or a plain dictionary (if
it was sent as application/json to begin with)"""
if not isinstance(data, dict):
try:
data = dict(json.loads(data))
except TypeError as e:
raise NilmDBError("can't parse 'data' parameter: " + e.message)
self.db.stream_set_metadata(path, data)
# /stream/update_metadata?path=/newton/prep&data=<json>
@cherrypy.expose
@cherrypy.tools.json_in()
@cherrypy.tools.json_out()
@exception_to_httperror(NilmDBError, LookupError, TypeError)
@cherrypy.tools.allow_methods(methods = ["POST"])
@exception_to_httperror(NilmDBError, LookupError)
@cherrypy.tools.CORS_allow(methods = ["POST"])
def update_metadata(self, path, data):
"""Update metadata for the named stream. Data
should be a json-encoded dictionary"""
data_dict = json.loads(data)
self.db.stream_update_metadata(path, data_dict)
if not isinstance(data, dict):
try:
data = dict(json.loads(data))
except TypeError as e:
raise NilmDBError("can't parse 'data' parameter: " + e.message)
self.db.stream_update_metadata(path, data)
# /stream/insert?path=/newton/prep
@cherrypy.expose
@cherrypy.tools.json_out()
@cherrypy.tools.allow_methods(methods = ["PUT"])
@cherrypy.tools.CORS_allow(methods = ["PUT"])
def insert(self, path, start, end):
"""
Insert new data into the database. Provide textual data
@@ -254,9 +317,10 @@ class Stream(NilmApp):
# /stream/remove?path=/newton/prep
# /stream/remove?path=/newton/prep&start=1234567890.0&end=1234567899.0
@cherrypy.expose
@cherrypy.tools.json_in()
@cherrypy.tools.json_out()
@exception_to_httperror(NilmDBError)
@cherrypy.tools.allow_methods(methods = ["POST"])
@cherrypy.tools.CORS_allow(methods = ["POST"])
def remove(self, path, start = None, end = None):
"""
Remove data from the backend database. Removes all data in
@@ -400,7 +464,7 @@ class Server(object):
'server.socket_host': host,
'server.socket_port': port,
'engine.autoreload_on': False,
'server.max_request_body_size': 4*1024*1024,
'server.max_request_body_size': 8*1024*1024,
})
if self.embedded:
cherrypy.config.update({ 'environment': 'embedded' })
@@ -411,16 +475,20 @@ class Server(object):
'error_page.default': self.json_error_page,
})
# Send a permissive Access-Control-Allow-Origin (CORS) header
# with all responses so that browsers can send cross-domain
# requests to this server.
app_config.update({ 'response.headers.Access-Control-Allow-Origin':
'*' })
# Some default headers to just help identify that things are working
app_config.update({ 'response.headers.X-Jim-Is-Awesome': 'yeah' })
# Only allow GET and HEAD by default. Individual handlers
# can override.
app_config.update({ 'tools.allow_methods.on': True,
'tools.allow_methods.methods': ['GET', 'HEAD'] })
# Set up Cross-Origin Resource Sharing (CORS) handler so we
# can correctly respond to browsers' CORS preflight requests.
# This also limits verbs to GET and HEAD by default.
app_config.update({ 'tools.CORS_allow.on': True,
'tools.CORS_allow.methods': ['GET', 'HEAD'] })
# Configure the 'json_in' tool to also allow other content-types
# (like x-www-form-urlencoded), and to treat JSON as a dict that
# fills requests.param.
app_config.update({ 'tools.json_in.force': False,
'tools.json_in.processor': json_to_request_params })
# Send tracebacks in error responses. They're hidden by the
# error_page function for client errors (code 400-499).

View File

@@ -8,3 +8,4 @@ from nilmdb.utils.diskusage import du, human_size
from nilmdb.utils.mustclose import must_close
from nilmdb.utils import atomic
import nilmdb.utils.threadsafety
import nilmdb.utils.fallocate

49
nilmdb/utils/fallocate.py Normal file
View File

@@ -0,0 +1,49 @@
# Implementation of hole punching via fallocate, if the OS
# and filesystem support it.
try:
import os
import ctypes
import ctypes.util
def make_fallocate():
libc_name = ctypes.util.find_library('c')
libc = ctypes.CDLL(libc_name, use_errno=True)
_fallocate = libc.fallocate
_fallocate.restype = ctypes.c_int
_fallocate.argtypes = [ ctypes.c_int, ctypes.c_int,
ctypes.c_int64, ctypes.c_int64 ]
del libc
del libc_name
def fallocate(fd, mode, offset, len_):
res = _fallocate(fd, mode, offset, len_)
if res != 0: # pragma: no cover
errno = ctypes.get_errno()
raise IOError(errno, os.strerror(errno))
return fallocate
fallocate = make_fallocate()
del make_fallocate
except Exception: # pragma: no cover
fallocate = None
FALLOC_FL_KEEP_SIZE = 0x01
FALLOC_FL_PUNCH_HOLE = 0x02
def punch_hole(filename, offset, length, ignore_errors = True):
"""Punch a hole in the file. This isn't well supported, so errors
are ignored by default."""
try:
if fallocate is None: # pragma: no cover
raise IOError("fallocate not available")
with open(filename, "r+") as f:
fallocate(f.fileno(),
FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE,
offset, length)
except IOError: # pragma: no cover
if ignore_errors:
return
raise

View File

@@ -20,11 +20,12 @@ import unittest
import warnings
import resource
import time
import re
from testutil.helpers import *
testdb = "tests/client-testdb"
testurl = "http://localhost:12380/"
testurl = "http://localhost:32180/"
def setup_module():
global test_server, test_db
@@ -34,7 +35,7 @@ def setup_module():
# Start web app on a custom port
test_db = nilmdb.utils.serializer_proxy(nilmdb.NilmDB)(testdb, sync = False)
test_server = nilmdb.Server(test_db, host = "127.0.0.1",
port = 12380, stoppable = False,
port = 32180, stoppable = False,
fast_shutdown = True,
force_traceback = False)
test_server.start(blocking = False)
@@ -54,20 +55,14 @@ class TestClient(object):
client.version()
client.close()
# Trigger same error with a PUT request
client = nilmdb.Client(url = "http://localhost:1/")
with assert_raises(nilmdb.client.ServerError):
client.version()
client.close()
# Then a fake URL on a real host
client = nilmdb.Client(url = "http://localhost:12380/fake/")
client = nilmdb.Client(url = "http://localhost:32180/fake/")
with assert_raises(nilmdb.client.ClientError):
client.version()
client.close()
# Now a real URL with no http:// prefix
client = nilmdb.Client(url = "localhost:12380")
client = nilmdb.Client(url = "localhost:32180")
version = client.version()
client.close()
@@ -172,6 +167,10 @@ class TestClient(object):
def test_client_04_insert(self):
client = nilmdb.Client(url = testurl)
# Limit _max_data to 1 MB, since our test file is 1.5 MB
old_max_data = nilmdb.client.client.StreamInserter._max_data
nilmdb.client.client.StreamInserter._max_data = 1 * 1024 * 1024
datetime_tz.localtz_set("America/New_York")
testfile = "tests/data/prep-20120323T1000"
@@ -244,8 +243,9 @@ class TestClient(object):
in_("400 Bad Request", str(e.exception))
# Client chunks the input, so the exact timestamp here might change
# if the chunk positions change.
in_("Data timestamp 1332511271.016667 >= end time 1332511201.0",
str(e.exception))
assert(re.search("Data timestamp 13325[0-9]+\.[0-9]+ "
">= end time 1332511201.0", str(e.exception))
is not None)
# Now do the real load
data = timestamper.TimestamperRate(testfile, start, 120)
@@ -264,6 +264,7 @@ class TestClient(object):
in_("400 Bad Request", str(e.exception))
in_("verlap", str(e.exception))
nilmdb.client.client.StreamInserter._max_data = old_max_data
client.close()
def test_client_05_extractremove(self):
@@ -353,52 +354,48 @@ class TestClient(object):
raise AssertionError("/stream/extract is not text/plain:\n" +
headers())
# Make sure Access-Control-Allow-Origin gets set
if "access-control-allow-origin: " not in headers():
raise AssertionError("No Access-Control-Allow-Origin (CORS) "
"header in /stream/extract response:\n" +
headers())
client.close()
def test_client_08_unicode(self):
# Basic Unicode tests
client = nilmdb.Client(url = testurl)
# Try both with and without posting JSON
for post_json in (False, True):
# Basic Unicode tests
client = nilmdb.Client(url = testurl, post_json = post_json)
# Delete streams that exist
for stream in client.stream_list():
client.stream_destroy(stream[0])
# Delete streams that exist
for stream in client.stream_list():
client.stream_destroy(stream[0])
# Database is empty
eq_(client.stream_list(), [])
# Database is empty
eq_(client.stream_list(), [])
# Create Unicode stream, match it
raw = [ u"/düsseldorf/raw", u"uint16_6" ]
prep = [ u"/düsseldorf/prep", u"uint16_6" ]
client.stream_create(*raw)
eq_(client.stream_list(), [raw])
eq_(client.stream_list(layout=raw[1]), [raw])
eq_(client.stream_list(path=raw[0]), [raw])
client.stream_create(*prep)
eq_(client.stream_list(), [prep, raw])
# Create Unicode stream, match it
raw = [ u"/düsseldorf/raw", u"uint16_6" ]
prep = [ u"/düsseldorf/prep", u"uint16_6" ]
client.stream_create(*raw)
eq_(client.stream_list(), [raw])
eq_(client.stream_list(layout=raw[1]), [raw])
eq_(client.stream_list(path=raw[0]), [raw])
client.stream_create(*prep)
eq_(client.stream_list(), [prep, raw])
# Set / get metadata with Unicode keys and values
eq_(client.stream_get_metadata(raw[0]), {})
eq_(client.stream_get_metadata(prep[0]), {})
meta1 = { u"alpha": u"α",
u"β": u"beta" }
meta2 = { u"alpha": u"α" }
meta3 = { u"β": u"beta" }
client.stream_set_metadata(prep[0], meta1)
client.stream_update_metadata(prep[0], {})
client.stream_update_metadata(raw[0], meta2)
client.stream_update_metadata(raw[0], meta3)
eq_(client.stream_get_metadata(prep[0]), meta1)
eq_(client.stream_get_metadata(raw[0]), meta1)
eq_(client.stream_get_metadata(raw[0], [ "alpha" ]), meta2)
eq_(client.stream_get_metadata(raw[0], [ "alpha", "β" ]), meta1)
# Set / get metadata with Unicode keys and values
eq_(client.stream_get_metadata(raw[0]), {})
eq_(client.stream_get_metadata(prep[0]), {})
meta1 = { u"alpha": u"α",
u"β": u"beta" }
meta2 = { u"alpha": u"α" }
meta3 = { u"β": u"beta" }
client.stream_set_metadata(prep[0], meta1)
client.stream_update_metadata(prep[0], {})
client.stream_update_metadata(raw[0], meta2)
client.stream_update_metadata(raw[0], meta3)
eq_(client.stream_get_metadata(prep[0]), meta1)
eq_(client.stream_get_metadata(raw[0]), meta1)
eq_(client.stream_get_metadata(raw[0], [ "alpha" ]), meta2)
eq_(client.stream_get_metadata(raw[0], [ "alpha", "β" ]), meta1)
client.close()
client.close()
def test_client_09_closing(self):
# Make sure we actually close sockets correctly. New
@@ -577,7 +574,7 @@ class TestClient(object):
def connections():
try:
poolmanager = c.http._last_response.connection.poolmanager
pool = poolmanager.pools[('http','localhost',12380)]
pool = poolmanager.pools[('http','localhost',32180)]
return (pool.num_connections, pool.num_requests)
except:
raise SkipTest("can't get connection info")

View File

@@ -11,12 +11,7 @@ from nose.tools import assert_raises
import itertools
import os
import re
import shutil
import sys
import threading
import urllib2
from urllib2 import urlopen, HTTPError
import Queue
import StringIO
import shlex
@@ -32,7 +27,7 @@ def server_start(max_results = None, bulkdata_args = {}):
max_results = max_results,
bulkdata_args = bulkdata_args)
test_server = nilmdb.Server(test_db, host = "127.0.0.1",
port = 12380, stoppable = False,
port = 32180, stoppable = False,
fast_shutdown = True,
force_traceback = False)
test_server.start(blocking = False)
@@ -64,6 +59,7 @@ class TestCmdline(object):
passing the given input. Returns a tuple with the output and
exit code"""
# printf("TZ=UTC ./nilmtool.py %s\n", arg_string)
os.environ['NILMDB_URL'] = "http://localhost:32180/"
class stdio_wrapper:
def __init__(self, stdin, stdout, stderr):
self.io = (stdin, stdout, stderr)
@@ -174,7 +170,7 @@ class TestCmdline(object):
self.fail("-u localhost:1 info")
self.contain("error connecting to server")
self.ok("-u localhost:12380 info")
self.ok("-u localhost:32180 info")
self.ok("info")
# Duplicated arguments should fail, but this isn't implemented
@@ -192,6 +188,20 @@ class TestCmdline(object):
self.fail("extract --start 2000-01-01 --start 2001-01-02")
self.contain("duplicated argument")
# Verify that "help command" and "command --help" are identical
# for all commands.
self.fail("")
m = re.search(r"{(.*)}", self.captured)
for command in [""] + m.group(1).split(','):
self.ok(command + " --help")
cap1 = self.captured
self.ok("help " + command)
cap2 = self.captured
self.ok("help " + command + " asdf --url --zxcv -")
cap3 = self.captured
eq_(cap1, cap2)
eq_(cap2, cap3)
def test_02_parsetime(self):
os.environ['TZ'] = "America/New_York"
test = datetime_tz.datetime_tz.now()
@@ -210,7 +220,7 @@ class TestCmdline(object):
def test_03_info(self):
self.ok("info")
self.contain("Server URL: http://localhost:12380/")
self.contain("Server URL: http://localhost:32180/")
self.contain("Client version: " + nilmdb.__version__)
self.contain("Server version: " + test_server.version)
self.contain("Server database path")
@@ -418,7 +428,7 @@ class TestCmdline(object):
# bad start time
self.fail("insert --rate 120 --start 'whatever' /newton/prep /dev/null")
def test_07_detail(self):
def test_07_detail_extent(self):
# Just count the number of lines, it's probably fine
self.ok("list --detail")
lines_(self.captured, 8)
@@ -463,6 +473,18 @@ class TestCmdline(object):
lines_(self.captured, 2)
self.contain("[ 1332497115.612 -> 1332497159.991668 ]")
# Check --extent output
self.ok("list --extent")
lines_(self.captured, 6)
self.ok("list -E -T")
self.contain(" extent: 1332496800 -> 1332497159.991668")
self.contain(" extent: (no data)")
# Misc
self.fail("list --extent --start='23 Mar 2012 10:05:15.50'")
self.contain("--start and --end only make sense with --detail")
def test_08_extract(self):
# nonexistent stream
self.fail("extract /no/such/foo --start 2000-01-01 --end 2020-01-01")

View File

@@ -9,14 +9,7 @@ from nose.tools import assert_raises
import distutils.version
import itertools
import os
import shutil
import sys
import cherrypy
import threading
import urllib2
from urllib2 import urlopen, HTTPError
import Queue
import cStringIO
import random
import unittest
@@ -246,7 +239,7 @@ class TestLayoutSpeed:
parser = Parser(layout)
formatter = Formatter(layout)
parser.parse(data)
data = formatter.format(parser.data)
formatter.format(parser.data)
elapsed = time.time() - start
printf("roundtrip %s: %d ms, %.1f μs/row, %d rows/sec\n",
layout,
@@ -264,3 +257,8 @@ class TestLayoutSpeed:
return [ sprintf("%d", random.randint(0,65535))
for x in range(10) ]
do_speedtest("uint16_10", datagen)
def datagen():
return [ sprintf("%d", random.randint(0,65535))
for x in range(6) ]
do_speedtest("uint16_6", datagen)

View File

@@ -34,6 +34,10 @@ class Bar:
def __del__(self):
fprintf(err, "Deleting\n")
@classmethod
def baz(self):
fprintf(err, "Baz\n")
def close(self):
fprintf(err, "Closing\n")

View File

@@ -6,15 +6,13 @@ import distutils.version
import simplejson as json
import itertools
import os
import shutil
import sys
import cherrypy
import threading
import urllib2
from urllib2 import urlopen, HTTPError
import Queue
import cStringIO
import time
import requests
from nilmdb.utils import serializer_proxy
@@ -119,7 +117,7 @@ class TestBlockingServer(object):
# Start web app on a custom port
self.server = nilmdb.Server(self.db, host = "127.0.0.1",
port = 12380, stoppable = True)
port = 32180, stoppable = True)
# Run it
event = threading.Event()
@@ -131,13 +129,13 @@ class TestBlockingServer(object):
raise AssertionError("server didn't start in 10 seconds")
# Send request to exit.
req = urlopen("http://127.0.0.1:12380/exit/", timeout = 1)
req = urlopen("http://127.0.0.1:32180/exit/", timeout = 1)
# Wait for it
thread.join()
def geturl(path):
req = urlopen("http://127.0.0.1:12380" + path, timeout = 10)
req = urlopen("http://127.0.0.1:32180" + path, timeout = 10)
return req.read()
def getjson(path):
@@ -149,7 +147,7 @@ class TestServer(object):
# Start web app on a custom port
self.db = serializer_proxy(nilmdb.NilmDB)(testdb, sync=False)
self.server = nilmdb.Server(self.db, host = "127.0.0.1",
port = 12380, stoppable = False)
port = 32180, stoppable = False)
self.server.start(blocking = False)
def tearDown(self):
@@ -208,3 +206,51 @@ class TestServer(object):
data = getjson("/stream/get_metadata?path=/newton/prep"
"&key=foo")
eq_(data, {'foo': None})
def test_cors_headers(self):
# Test that CORS headers are being set correctly
# Normal GET should send simple response
url = "http://127.0.0.1:32180/stream/list"
r = requests.get(url, headers = { "Origin": "http://google.com/" })
eq_(r.status_code, 200)
if "access-control-allow-origin" not in r.headers:
raise AssertionError("No Access-Control-Allow-Origin (CORS) "
"header in response:\n", r.headers)
eq_(r.headers["access-control-allow-origin"], "http://google.com/")
# OPTIONS without CORS preflight headers should result in 405
r = requests.options(url, headers = {
"Origin": "http://google.com/",
})
eq_(r.status_code, 405)
# OPTIONS with preflight headers should give preflight response
r = requests.options(url, headers = {
"Origin": "http://google.com/",
"Access-Control-Request-Method": "POST",
"Access-Control-Request-Headers": "X-Custom",
})
eq_(r.status_code, 200)
if "access-control-allow-origin" not in r.headers:
raise AssertionError("No Access-Control-Allow-Origin (CORS) "
"header in response:\n", r.headers)
eq_(r.headers["access-control-allow-methods"], "GET, HEAD")
eq_(r.headers["access-control-allow-headers"], "X-Custom")
def test_post_bodies(self):
# Test JSON post bodies
r = requests.post("http://127.0.0.1:32180/stream/set_metadata",
headers = { "Content-Type": "application/json" },
data = '{"hello": 1}')
eq_(r.status_code, 404) # wrong parameters
r = requests.post("http://127.0.0.1:32180/stream/set_metadata",
headers = { "Content-Type": "application/json" },
data = '["hello"]')
eq_(r.status_code, 415) # not a dict
r = requests.post("http://127.0.0.1:32180/stream/set_metadata",
headers = { "Content-Type": "application/json" },
data = '[hello]')
eq_(r.status_code, 400) # badly formatted JSON