Compare commits
24 Commits
nilmdb-1.2
...
nilmdb-1.2
Author | SHA1 | Date | |
---|---|---|---|
fe91ff59a3 | |||
64c24a00d6 | |||
58c0ae72f6 | |||
c5f079f61f | |||
2d45466f66 | |||
c6a0e6e96f | |||
79755dc624 | |||
c512631184 | |||
19d27c31bc | |||
28310fe886 | |||
1ccc2bce7e | |||
00237e30b2 | |||
521ff88f7c | |||
64897a1dd1 | |||
41ce8480bb | |||
204a6ecb15 | |||
5db3b186a4 | |||
fe640cf421 | |||
ca67c79fe4 | |||
8917bcd4bf | |||
a75ec98673 | |||
e476338d61 | |||
d752b882f2 | |||
ade27773e6 |
@@ -21,8 +21,12 @@ def extract_timestamp(line):
|
||||
class Client(object):
|
||||
"""Main client interface to the Nilm database."""
|
||||
|
||||
def __init__(self, url):
|
||||
self.http = nilmdb.client.httpclient.HTTPClient(url)
|
||||
def __init__(self, url, post_json = False):
|
||||
"""Initialize client with given URL. If post_json is true,
|
||||
POST requests are sent with Content-Type 'application/json'
|
||||
instead of the default 'x-www-form-urlencoded'."""
|
||||
self.http = nilmdb.client.httpclient.HTTPClient(url, post_json)
|
||||
self.post_json = post_json
|
||||
|
||||
# __enter__/__exit__ allow this class to be a context manager
|
||||
def __enter__(self):
|
||||
@@ -31,8 +35,11 @@ class Client(object):
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self.close()
|
||||
|
||||
def _json_param(self, data):
|
||||
def _json_post_param(self, data):
|
||||
"""Return compact json-encoded version of parameter"""
|
||||
if self.post_json:
|
||||
# If we're posting as JSON, we don't need to encode it further here
|
||||
return data
|
||||
return json.dumps(data, separators=(',',':'))
|
||||
|
||||
def close(self):
|
||||
@@ -52,12 +59,14 @@ class Client(object):
|
||||
as a dictionary."""
|
||||
return self.http.get("dbinfo")
|
||||
|
||||
def stream_list(self, path = None, layout = None):
|
||||
def stream_list(self, path = None, layout = None, extent = False):
|
||||
params = {}
|
||||
if path is not None:
|
||||
params["path"] = path
|
||||
if layout is not None:
|
||||
params["layout"] = layout
|
||||
if extent:
|
||||
params["extent"] = 1
|
||||
return self.http.get("stream/list", params)
|
||||
|
||||
def stream_get_metadata(self, path, keys = None):
|
||||
@@ -71,7 +80,7 @@ class Client(object):
|
||||
metadata."""
|
||||
params = {
|
||||
"path": path,
|
||||
"data": self._json_param(data)
|
||||
"data": self._json_post_param(data)
|
||||
}
|
||||
return self.http.post("stream/set_metadata", params)
|
||||
|
||||
@@ -79,7 +88,7 @@ class Client(object):
|
||||
"""Update stream metadata from a dictionary"""
|
||||
params = {
|
||||
"path": path,
|
||||
"data": self._json_param(data)
|
||||
"data": self._json_post_param(data)
|
||||
}
|
||||
return self.http.post("stream/update_metadata", params)
|
||||
|
||||
@@ -220,7 +229,7 @@ class StreamInserter(object):
|
||||
# These are soft limits -- actual data might be rounded up.
|
||||
# We send when we have a certain amount of data queued, or
|
||||
# when a certain amount of time has passed since the last send.
|
||||
_max_data = 1048576
|
||||
_max_data = 2 * 1024 * 1024
|
||||
_max_time = 30
|
||||
|
||||
# Delta to add to the final timestamp, if "end" wasn't given
|
||||
|
@@ -10,7 +10,7 @@ import requests
|
||||
|
||||
class HTTPClient(object):
|
||||
"""Class to manage and perform HTTP requests from the client"""
|
||||
def __init__(self, baseurl = ""):
|
||||
def __init__(self, baseurl = "", post_json = False):
|
||||
"""If baseurl is supplied, all other functions that take
|
||||
a URL can be given a relative URL instead."""
|
||||
# Verify / clean up URL
|
||||
@@ -26,6 +26,10 @@ class HTTPClient(object):
|
||||
# Saved response, so that tests can verify a few things.
|
||||
self._last_response = {}
|
||||
|
||||
# Whether to send application/json POST bodies (versus
|
||||
# x-www-form-urlencoded)
|
||||
self.post_json = post_json
|
||||
|
||||
def _handle_error(self, url, code, body):
|
||||
# Default variables for exception. We use the entire body as
|
||||
# the default message, in case we can't extract it from a JSON
|
||||
@@ -57,12 +61,14 @@ class HTTPClient(object):
|
||||
def close(self):
|
||||
self.session.close()
|
||||
|
||||
def _do_req(self, method, url, query_data, body_data, stream):
|
||||
def _do_req(self, method, url, query_data, body_data, stream, headers):
|
||||
url = urlparse.urljoin(self.baseurl, url)
|
||||
try:
|
||||
response = self.session.request(method, url,
|
||||
params = query_data,
|
||||
data = body_data)
|
||||
data = body_data,
|
||||
stream = stream,
|
||||
headers = headers)
|
||||
except requests.RequestException as e:
|
||||
raise ServerError(status = "502 Error", url = url,
|
||||
message = str(e.message))
|
||||
@@ -76,12 +82,13 @@ class HTTPClient(object):
|
||||
return (response, False)
|
||||
|
||||
# Normal versions that return data directly
|
||||
def _req(self, method, url, query = None, body = None):
|
||||
def _req(self, method, url, query = None, body = None, headers = None):
|
||||
"""
|
||||
Make a request and return the body data as a string or parsed
|
||||
JSON object, or raise an error if it contained an error.
|
||||
"""
|
||||
(response, isjson) = self._do_req(method, url, query, body, False)
|
||||
(response, isjson) = self._do_req(method, url, query, body,
|
||||
stream = False, headers = headers)
|
||||
if isjson:
|
||||
return json.loads(response.content)
|
||||
return response.content
|
||||
@@ -92,20 +99,26 @@ class HTTPClient(object):
|
||||
|
||||
def post(self, url, params = None):
|
||||
"""Simple POST (parameters in body)"""
|
||||
return self._req("POST", url, None, params)
|
||||
if self.post_json:
|
||||
return self._req("POST", url, None,
|
||||
json.dumps(params),
|
||||
{ 'Content-type': 'application/json' })
|
||||
else:
|
||||
return self._req("POST", url, None, params)
|
||||
|
||||
def put(self, url, data, params = None):
|
||||
"""Simple PUT (parameters in URL, data in body)"""
|
||||
return self._req("PUT", url, params, data)
|
||||
|
||||
# Generator versions that return data one line at a time.
|
||||
def _req_gen(self, method, url, query = None, body = None):
|
||||
def _req_gen(self, method, url, query = None, body = None, headers = None):
|
||||
"""
|
||||
Make a request and return a generator that gives back strings
|
||||
or JSON decoded lines of the body data, or raise an error if
|
||||
it contained an eror.
|
||||
"""
|
||||
(response, isjson) = self._do_req(method, url, query, body, True)
|
||||
(response, isjson) = self._do_req(method, url, query, body,
|
||||
stream = True, headers = headers)
|
||||
for line in response.iter_lines():
|
||||
if isjson:
|
||||
yield json.loads(line)
|
||||
|
@@ -6,13 +6,14 @@ from nilmdb.utils import datetime_tz
|
||||
import nilmdb.utils.time
|
||||
|
||||
import sys
|
||||
import os
|
||||
import argparse
|
||||
from argparse import ArgumentDefaultsHelpFormatter as def_form
|
||||
|
||||
# Valid subcommands. Defined in separate files just to break
|
||||
# things up -- they're still called with Cmdline as self.
|
||||
subcommands = [ "info", "create", "list", "metadata", "insert", "extract",
|
||||
"remove", "destroy" ]
|
||||
subcommands = [ "help", "info", "create", "list", "metadata",
|
||||
"insert", "extract", "remove", "destroy" ]
|
||||
|
||||
# Import the subcommand modules
|
||||
subcmd_mods = {}
|
||||
@@ -29,6 +30,8 @@ class Cmdline(object):
|
||||
def __init__(self, argv = None):
|
||||
self.argv = argv or sys.argv[1:]
|
||||
self.client = None
|
||||
self.def_url = os.environ.get("NILMDB_URL", "http://localhost:12380")
|
||||
self.subcmd = {}
|
||||
|
||||
def arg_time(self, toparse):
|
||||
"""Parse a time string argument"""
|
||||
@@ -50,18 +53,17 @@ class Cmdline(object):
|
||||
|
||||
group = self.parser.add_argument_group("Server")
|
||||
group.add_argument("-u", "--url", action="store",
|
||||
default="http://localhost:12380/",
|
||||
default=self.def_url,
|
||||
help="NilmDB server URL (default: %(default)s)")
|
||||
|
||||
sub = self.parser.add_subparsers(title="Commands",
|
||||
dest="command",
|
||||
description="Specify --help after "
|
||||
"the command for command-specific "
|
||||
"options.")
|
||||
sub = self.parser.add_subparsers(
|
||||
title="Commands", dest="command",
|
||||
description="Use 'help command' or 'command --help' for more "
|
||||
"details on a particular command.")
|
||||
|
||||
# Set up subcommands (defined in separate files)
|
||||
for cmd in subcommands:
|
||||
subcmd_mods[cmd].setup(self, sub)
|
||||
self.subcmd[cmd] = subcmd_mods[cmd].setup(self, sub)
|
||||
|
||||
def die(self, formatstr, *args):
|
||||
fprintf(sys.stderr, formatstr + "\n", *args)
|
||||
@@ -84,11 +86,13 @@ class Cmdline(object):
|
||||
|
||||
self.client = nilmdb.Client(self.args.url)
|
||||
|
||||
# Make a test connection to make sure things work
|
||||
try:
|
||||
server_version = self.client.version()
|
||||
except nilmdb.client.Error as e:
|
||||
self.die("error connecting to server: %s", str(e))
|
||||
# Make a test connection to make sure things work,
|
||||
# unless the particular command requests that we don't.
|
||||
if "no_test_connect" not in self.args:
|
||||
try:
|
||||
server_version = self.client.version()
|
||||
except nilmdb.client.Error as e:
|
||||
self.die("error connecting to server: %s", str(e))
|
||||
|
||||
# Now dispatch client request to appropriate function. Parser
|
||||
# should have ensured that we don't have any unknown commands
|
||||
|
@@ -26,6 +26,7 @@ Layout types are of the format: type_count
|
||||
help="Path (in database) of new stream, e.g. /foo/bar")
|
||||
group.add_argument("layout",
|
||||
help="Layout type for new stream, e.g. float32_8")
|
||||
return cmd
|
||||
|
||||
def cmd_create(self):
|
||||
"""Create new stream"""
|
||||
|
@@ -16,6 +16,7 @@ def setup(self, sub):
|
||||
group = cmd.add_argument_group("Required arguments")
|
||||
group.add_argument("path",
|
||||
help="Path of the stream to delete, e.g. /foo/bar")
|
||||
return cmd
|
||||
|
||||
def cmd_destroy(self):
|
||||
"""Destroy stream"""
|
||||
|
@@ -30,6 +30,7 @@ def setup(self, sub):
|
||||
help="Show raw timestamps in annotated information")
|
||||
group.add_argument("-c", "--count", action="store_true",
|
||||
help="Just output a count of matched data points")
|
||||
return cmd
|
||||
|
||||
def cmd_extract_verify(self):
|
||||
if self.args.start is not None and self.args.end is not None:
|
||||
|
26
nilmdb/cmdline/help.py
Normal file
26
nilmdb/cmdline/help.py
Normal file
@@ -0,0 +1,26 @@
|
||||
from nilmdb.utils.printf import *
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
|
||||
def setup(self, sub):
|
||||
cmd = sub.add_parser("help", help="Show detailed help for a command",
|
||||
description="""
|
||||
Show help for a command. 'help command' is
|
||||
the same as 'command --help'.
|
||||
""")
|
||||
cmd.set_defaults(handler = cmd_help)
|
||||
cmd.set_defaults(no_test_connect = True)
|
||||
cmd.add_argument("command", nargs="?",
|
||||
help="Command to get help about")
|
||||
cmd.add_argument("rest", nargs=argparse.REMAINDER,
|
||||
help=argparse.SUPPRESS)
|
||||
return cmd
|
||||
|
||||
def cmd_help(self):
|
||||
if self.args.command in self.subcmd:
|
||||
self.subcmd[self.args.command].print_help()
|
||||
else:
|
||||
self.parser.print_help()
|
||||
|
||||
return
|
@@ -12,6 +12,7 @@ def setup(self, sub):
|
||||
version.
|
||||
""")
|
||||
cmd.set_defaults(handler = cmd_info)
|
||||
return cmd
|
||||
|
||||
def cmd_info(self):
|
||||
"""Print info about the server"""
|
||||
|
@@ -47,6 +47,7 @@ def setup(self, sub):
|
||||
help="Path of stream, e.g. /foo/bar")
|
||||
group.add_argument("file", nargs="*", default=['-'],
|
||||
help="File(s) to insert (default: - (stdin))")
|
||||
return cmd
|
||||
|
||||
def cmd_insert(self):
|
||||
# Find requested stream
|
||||
|
@@ -24,11 +24,13 @@ def setup(self, sub):
|
||||
group.add_argument("-l", "--layout", default="*",
|
||||
help="Match only this stream layout")
|
||||
|
||||
group = cmd.add_argument_group("Interval extent")
|
||||
group.add_argument("-E", "--extent", action="store_true",
|
||||
help="Show min/max timestamps in this stream")
|
||||
|
||||
group = cmd.add_argument_group("Interval details")
|
||||
group.add_argument("-d", "--detail", action="store_true",
|
||||
help="Show available data time intervals")
|
||||
group.add_argument("-T", "--timestamp-raw", action="store_true",
|
||||
help="Show raw timestamps in time intervals")
|
||||
group.add_argument("-s", "--start",
|
||||
metavar="TIME", type=self.arg_time,
|
||||
help="Starting timestamp (free-form, inclusive)")
|
||||
@@ -36,6 +38,12 @@ def setup(self, sub):
|
||||
metavar="TIME", type=self.arg_time,
|
||||
help="Ending timestamp (free-form, noninclusive)")
|
||||
|
||||
group = cmd.add_argument_group("Misc options")
|
||||
group.add_argument("-T", "--timestamp-raw", action="store_true",
|
||||
help="Show raw timestamps in time intervals or extents")
|
||||
|
||||
return cmd
|
||||
|
||||
def cmd_list_verify(self):
|
||||
# A hidden "path_positional" argument lets the user leave off the
|
||||
# "-p" when specifying the path. Handle it here.
|
||||
@@ -51,28 +59,38 @@ def cmd_list_verify(self):
|
||||
if self.args.start >= self.args.end:
|
||||
self.parser.error("start must precede end")
|
||||
|
||||
if self.args.start is not None or self.args.end is not None:
|
||||
if not self.args.detail:
|
||||
self.parser.error("--start and --end only make sense with --detail")
|
||||
|
||||
def cmd_list(self):
|
||||
"""List available streams"""
|
||||
streams = self.client.stream_list()
|
||||
streams = self.client.stream_list(extent = True)
|
||||
|
||||
if self.args.timestamp_raw:
|
||||
time_string = repr
|
||||
else:
|
||||
time_string = nilmdb.utils.time.format_time
|
||||
|
||||
for (path, layout) in streams:
|
||||
for (path, layout, extent_min, extent_max) in streams:
|
||||
if not (fnmatch.fnmatch(path, self.args.path) and
|
||||
fnmatch.fnmatch(layout, self.args.layout)):
|
||||
continue
|
||||
|
||||
printf("%s %s\n", path, layout)
|
||||
if not self.args.detail:
|
||||
continue
|
||||
|
||||
printed = False
|
||||
for (start, end) in self.client.stream_intervals(path, self.args.start,
|
||||
self.args.end):
|
||||
printf(" [ %s -> %s ]\n", time_string(start), time_string(end))
|
||||
printed = True
|
||||
if not printed:
|
||||
printf(" (no intervals)\n")
|
||||
if self.args.extent:
|
||||
if extent_min is None or extent_max is None:
|
||||
printf(" extent: (no data)\n")
|
||||
else:
|
||||
printf(" extent: %s -> %s\n",
|
||||
time_string(extent_min), time_string(extent_max))
|
||||
|
||||
if self.args.detail:
|
||||
printed = False
|
||||
for (start, end) in self.client.stream_intervals(
|
||||
path, self.args.start, self.args.end):
|
||||
printf(" [ %s -> %s ]\n", time_string(start), time_string(end))
|
||||
printed = True
|
||||
if not printed:
|
||||
printf(" (no intervals)\n")
|
||||
|
@@ -26,6 +26,7 @@ def setup(self, sub):
|
||||
exc.add_argument("-u", "--update", nargs="+", metavar="key=value",
|
||||
help="Update metadata using provided "
|
||||
"key=value pairs")
|
||||
return cmd
|
||||
|
||||
def cmd_metadata(self):
|
||||
"""Manipulate metadata"""
|
||||
|
@@ -23,6 +23,7 @@ def setup(self, sub):
|
||||
group = cmd.add_argument_group("Output format")
|
||||
group.add_argument("-c", "--count", action="store_true",
|
||||
help="Output number of data points removed")
|
||||
return cmd
|
||||
|
||||
def cmd_remove(self):
|
||||
try:
|
||||
|
@@ -25,6 +25,12 @@ def main():
|
||||
default = os.path.join(os.getcwd(), "db"))
|
||||
group.add_argument('-q', '--quiet', help = 'Silence output',
|
||||
action = 'store_true')
|
||||
group.add_argument('-n', '--nosync', help = 'Use asynchronous '
|
||||
'commits for sqlite transactions',
|
||||
action = 'store_true', default = False)
|
||||
group.add_argument('-t', '--traceback',
|
||||
help = 'Provide tracebacks in client errors',
|
||||
action = 'store_true', default = False)
|
||||
|
||||
group = parser.add_argument_group("Debug options")
|
||||
group.add_argument('-y', '--yappi', help = 'Run under yappi profiler and '
|
||||
@@ -35,7 +41,8 @@ def main():
|
||||
|
||||
# Create database object. Needs to be serialized before passing
|
||||
# to the Server.
|
||||
db = nilmdb.utils.serializer_proxy(nilmdb.NilmDB)(args.database)
|
||||
db = nilmdb.utils.serializer_proxy(nilmdb.NilmDB)(args.database,
|
||||
sync = not args.nosync)
|
||||
|
||||
# Configure the server
|
||||
if args.quiet:
|
||||
@@ -45,10 +52,12 @@ def main():
|
||||
server = nilmdb.server.Server(db,
|
||||
host = args.address,
|
||||
port = args.port,
|
||||
embedded = embedded)
|
||||
embedded = embedded,
|
||||
force_traceback = args.traceback)
|
||||
|
||||
# Print info
|
||||
if not args.quiet:
|
||||
print "Version: %s" % nilmdb.__version__
|
||||
print "Database: %s" % (os.path.realpath(args.database))
|
||||
if args.address == '0.0.0.0' or args.address == '::':
|
||||
host = socket.getfqdn()
|
||||
|
@@ -28,7 +28,7 @@ except: # pragma: no cover
|
||||
table_cache_size = 16
|
||||
fd_cache_size = 16
|
||||
|
||||
@nilmdb.utils.must_close(wrap_verify = True)
|
||||
@nilmdb.utils.must_close(wrap_verify = False)
|
||||
class BulkData(object):
|
||||
def __init__(self, basepath, **kwargs):
|
||||
self.basepath = basepath
|
||||
@@ -171,7 +171,7 @@ class BulkData(object):
|
||||
ospath = os.path.join(self.root, *elements)
|
||||
return Table(ospath)
|
||||
|
||||
@nilmdb.utils.must_close(wrap_verify = True)
|
||||
@nilmdb.utils.must_close(wrap_verify = False)
|
||||
class File(object):
|
||||
"""Object representing a single file on disk. Data can be appended,
|
||||
or the self.mmap handle can be used for random reads."""
|
||||
@@ -210,14 +210,28 @@ class File(object):
|
||||
self.mmap.close()
|
||||
self._f.close()
|
||||
|
||||
def append(self, data):
|
||||
def append(self, data): # pragma: no cover (below version used instead)
|
||||
# Write data, flush it, and resize our mmap accordingly
|
||||
self._f.write(data)
|
||||
self._f.flush()
|
||||
self.size += len(data)
|
||||
self._mmap_reopen()
|
||||
|
||||
@nilmdb.utils.must_close(wrap_verify = True)
|
||||
def append_pack_iter(self, count, packer, dataiter):
|
||||
# An optimized verison of append, to avoid flushing the file
|
||||
# and resizing the mmap after each data point.
|
||||
try:
|
||||
rows = []
|
||||
for i in xrange(count):
|
||||
row = dataiter.next()
|
||||
rows.append(packer(*row))
|
||||
self._f.write("".join(rows))
|
||||
finally:
|
||||
self._f.flush()
|
||||
self.size = self._f.tell()
|
||||
self._mmap_reopen()
|
||||
|
||||
@nilmdb.utils.must_close(wrap_verify = False)
|
||||
class Table(object):
|
||||
"""Tools to help access a single table (data at a specific OS path)."""
|
||||
# See design.md for design details
|
||||
@@ -351,9 +365,7 @@ class Table(object):
|
||||
f = self.file_open(subdir, fname)
|
||||
|
||||
# Write the data
|
||||
for i in xrange(count):
|
||||
row = dataiter.next()
|
||||
f.append(self.packer.pack(*row))
|
||||
f.append_pack_iter(count, self.packer.pack, dataiter)
|
||||
remaining -= count
|
||||
self.nrows += count
|
||||
|
||||
@@ -398,8 +410,16 @@ class Table(object):
|
||||
|
||||
def _remove_rows(self, subdir, filename, start, stop):
|
||||
"""Helper to mark specific rows as being removed from a
|
||||
file, and potentially removing or truncating the file itself."""
|
||||
# Import an existing list of deleted rows for this file
|
||||
file, and potentially remove or truncate the file itself."""
|
||||
# Close potentially open file in file_open LRU cache
|
||||
self.file_open.cache_remove(self, subdir, filename)
|
||||
|
||||
# We keep a file like 0000.removed that contains a list of
|
||||
# which rows have been "removed". Note that we never have to
|
||||
# remove entries from this list, because we never decrease
|
||||
# self.nrows, and so we will never overwrite those locations in the
|
||||
# file. Only when the list covers the entire extent of the
|
||||
# file will that file be removed.
|
||||
datafile = os.path.join(self.root, subdir, filename)
|
||||
cachefile = datafile + ".removed"
|
||||
try:
|
||||
@@ -453,6 +473,14 @@ class Table(object):
|
||||
except:
|
||||
pass
|
||||
else:
|
||||
# File needs to stick around. This means we can get
|
||||
# degenerate cases where we have large files containing as
|
||||
# little as one row. Try to punch a hole in the file,
|
||||
# so that this region doesn't take up filesystem space.
|
||||
offset = start * self.packer.size
|
||||
count = (stop - start) * self.packer.size
|
||||
nilmdb.utils.fallocate.punch_hole(datafile, offset, count)
|
||||
|
||||
# Update cache. Try to do it atomically.
|
||||
nilmdb.utils.atomic.replace_file(cachefile,
|
||||
pickle.dumps(merged, 2))
|
||||
|
@@ -4,7 +4,6 @@ import time
|
||||
import sys
|
||||
import inspect
|
||||
import cStringIO
|
||||
import numpy as np
|
||||
|
||||
cdef enum:
|
||||
max_value_count = 64
|
||||
@@ -42,10 +41,12 @@ class Layout:
|
||||
|
||||
if datatype == 'uint16':
|
||||
self.parse = self.parse_uint16
|
||||
self.format = self.format_uint16
|
||||
self.format_str = "%.6f" + " %d" * self.count
|
||||
self.format = self.format_generic
|
||||
elif datatype == 'float32' or datatype == 'float64':
|
||||
self.parse = self.parse_float64
|
||||
self.format = self.format_float64
|
||||
self.format_str = "%.6f" + " %f" * self.count
|
||||
self.format = self.format_generic
|
||||
else:
|
||||
raise KeyError("invalid type")
|
||||
|
||||
@@ -57,15 +58,15 @@ class Layout:
|
||||
cdef double ts
|
||||
# Return doubles even in float32 case, since they're going into
|
||||
# a Python array which would upconvert to double anyway.
|
||||
result = []
|
||||
result = [0] * (self.count + 1)
|
||||
cdef char *end
|
||||
ts = libc.stdlib.strtod(text, &end)
|
||||
if end == text:
|
||||
raise ValueError("bad timestamp")
|
||||
result.append(ts)
|
||||
result[0] = ts
|
||||
for n in range(self.count):
|
||||
text = end
|
||||
result.append(libc.stdlib.strtod(text, &end))
|
||||
result[n+1] = libc.stdlib.strtod(text, &end)
|
||||
if end == text:
|
||||
raise ValueError("wrong number of values")
|
||||
n = 0
|
||||
@@ -79,18 +80,18 @@ class Layout:
|
||||
cdef int n
|
||||
cdef double ts
|
||||
cdef int v
|
||||
result = []
|
||||
cdef char *end
|
||||
result = [0] * (self.count + 1)
|
||||
ts = libc.stdlib.strtod(text, &end)
|
||||
if end == text:
|
||||
raise ValueError("bad timestamp")
|
||||
result.append(ts)
|
||||
result[0] = ts
|
||||
for n in range(self.count):
|
||||
text = end
|
||||
v = libc.stdlib.strtol(text, &end, 10)
|
||||
if v < 0 or v > 65535:
|
||||
raise ValueError("value out of range")
|
||||
result.append(v)
|
||||
result[n+1] = v
|
||||
if end == text:
|
||||
raise ValueError("wrong number of values")
|
||||
n = 0
|
||||
@@ -101,25 +102,12 @@ class Layout:
|
||||
return (ts, result)
|
||||
|
||||
# Formatters
|
||||
def format_float64(self, d):
|
||||
def format_generic(self, d):
|
||||
n = len(d) - 1
|
||||
if n != self.count:
|
||||
raise ValueError("wrong number of values for layout type: "
|
||||
"got %d, wanted %d" % (n, self.count))
|
||||
s = "%.6f" % d[0]
|
||||
for i in range(n):
|
||||
s += " %f" % d[i+1]
|
||||
return s + "\n"
|
||||
|
||||
def format_uint16(self, d):
|
||||
n = len(d) - 1
|
||||
if n != self.count:
|
||||
raise ValueError("wrong number of values for layout type: "
|
||||
"got %d, wanted %d" % (n, self.count))
|
||||
s = "%.6f" % d[0]
|
||||
for i in range(n):
|
||||
s += " %d" % d[i+1]
|
||||
return s + "\n"
|
||||
return (self.format_str % tuple(d)) + "\n"
|
||||
|
||||
# Get a layout by name
|
||||
def get_named(typestring):
|
||||
|
@@ -269,28 +269,39 @@ class NilmDB(object):
|
||||
|
||||
return
|
||||
|
||||
def stream_list(self, path = None, layout = None):
|
||||
"""Return list of [path, layout] lists of all streams
|
||||
in the database.
|
||||
def stream_list(self, path = None, layout = None, extent = False):
|
||||
"""Return list of lists of all streams in the database.
|
||||
|
||||
If path is specified, include only streams with a path that
|
||||
matches the given string.
|
||||
|
||||
If layout is specified, include only streams with a layout
|
||||
that matches the given string.
|
||||
"""
|
||||
where = "WHERE 1=1"
|
||||
params = ()
|
||||
if layout:
|
||||
where += " AND layout=?"
|
||||
params += (layout,)
|
||||
if path:
|
||||
where += " AND path=?"
|
||||
params += (path,)
|
||||
result = self.con.execute("SELECT path, layout "
|
||||
"FROM streams " + where, params).fetchall()
|
||||
|
||||
return sorted(list(x) for x in result)
|
||||
If extent = False, returns a list of lists containing
|
||||
the path and layout: [ path, layout ]
|
||||
|
||||
If extent = True, returns a list of lists containing the
|
||||
path, layout, and min/max extent of the data:
|
||||
[ path, layout, extent_min, extent_max ]
|
||||
"""
|
||||
params = ()
|
||||
query = "SELECT streams.path, streams.layout"
|
||||
if extent:
|
||||
query += ", min(ranges.start_time), max(ranges.end_time)"
|
||||
query += " FROM streams"
|
||||
if extent:
|
||||
query += " LEFT JOIN ranges ON streams.id = ranges.stream_id"
|
||||
query += " WHERE 1=1"
|
||||
if layout is not None:
|
||||
query += " AND streams.layout=?"
|
||||
params += (layout,)
|
||||
if path is not None:
|
||||
query += " AND streams.path=?"
|
||||
params += (path,)
|
||||
query += " GROUP BY streams.id ORDER BY streams.path"
|
||||
result = self.con.execute(query, params).fetchall()
|
||||
return [ list(x) for x in result ]
|
||||
|
||||
def stream_intervals(self, path, start = None, end = None):
|
||||
"""
|
||||
|
@@ -71,16 +71,59 @@ def exception_to_httperror(*expected):
|
||||
# care of that.
|
||||
return decorator.decorator(wrapper)
|
||||
|
||||
# Custom Cherrypy tools
|
||||
def allow_methods(methods):
|
||||
method = cherrypy.request.method.upper()
|
||||
if method not in methods:
|
||||
if method in cherrypy.request.methods_with_bodies:
|
||||
cherrypy.request.body.read()
|
||||
allowed = ', '.join(methods)
|
||||
cherrypy.response.headers['Allow'] = allowed
|
||||
raise cherrypy.HTTPError(405, method + " not allowed; use " + allowed)
|
||||
cherrypy.tools.allow_methods = cherrypy.Tool('before_handler', allow_methods)
|
||||
# Custom CherryPy tools
|
||||
|
||||
def CORS_allow(methods):
|
||||
"""This does several things:
|
||||
|
||||
Handles CORS preflight requests.
|
||||
Adds Allow: header to all requests.
|
||||
Raise 405 if request.method not in method.
|
||||
|
||||
It is similar to cherrypy.tools.allow, with the CORS stuff added.
|
||||
"""
|
||||
request = cherrypy.request.headers
|
||||
response = cherrypy.response.headers
|
||||
|
||||
if not isinstance(methods, (tuple, list)): # pragma: no cover
|
||||
methods = [ methods ]
|
||||
methods = [ m.upper() for m in methods if m ]
|
||||
if not methods: # pragma: no cover
|
||||
methods = [ 'GET', 'HEAD' ]
|
||||
elif 'GET' in methods and 'HEAD' not in methods: # pragma: no cover
|
||||
methods.append('HEAD')
|
||||
response['Allow'] = ', '.join(methods)
|
||||
|
||||
# Allow all origins
|
||||
if 'Origin' in request:
|
||||
response['Access-Control-Allow-Origin'] = request['Origin']
|
||||
|
||||
# If it's a CORS request, send response.
|
||||
request_method = request.get("Access-Control-Request-Method", None)
|
||||
request_headers = request.get("Access-Control-Request-Headers", None)
|
||||
if (cherrypy.request.method == "OPTIONS" and
|
||||
request_method and request_headers):
|
||||
response['Access-Control-Allow-Headers'] = request_headers
|
||||
response['Access-Control-Allow-Methods'] = ', '.join(methods)
|
||||
# Try to stop further processing and return a 200 OK
|
||||
cherrypy.response.status = "200 OK"
|
||||
cherrypy.response.body = ""
|
||||
cherrypy.request.handler = lambda: ""
|
||||
return
|
||||
|
||||
# Reject methods that were not explicitly allowed
|
||||
if cherrypy.request.method not in methods:
|
||||
raise cherrypy.HTTPError(405)
|
||||
|
||||
cherrypy.tools.CORS_allow = cherrypy.Tool('on_start_resource', CORS_allow)
|
||||
|
||||
# Helper for json_in tool to process JSON data into normal request
|
||||
# parameters.
|
||||
def json_to_request_params(body):
|
||||
cherrypy.lib.jsontools.json_processor(body)
|
||||
if not isinstance(cherrypy.request.json, dict):
|
||||
raise cherrypy.HTTPError(415)
|
||||
cherrypy.request.params.update(cherrypy.request.json)
|
||||
|
||||
# CherryPy apps
|
||||
class Root(NilmApp):
|
||||
@@ -121,20 +164,29 @@ class Stream(NilmApp):
|
||||
|
||||
# /stream/list
|
||||
# /stream/list?layout=PrepData
|
||||
# /stream/list?path=/newton/prep
|
||||
# /stream/list?path=/newton/prep&extent=1
|
||||
@cherrypy.expose
|
||||
@cherrypy.tools.json_out()
|
||||
def list(self, path = None, layout = None):
|
||||
def list(self, path = None, layout = None, extent = None):
|
||||
"""List all streams in the database. With optional path or
|
||||
layout parameter, just list streams that match the given path
|
||||
or layout"""
|
||||
return self.db.stream_list(path, layout)
|
||||
or layout.
|
||||
|
||||
If extent is not given, returns a list of lists containing
|
||||
the path and layout: [ path, layout ]
|
||||
|
||||
If extent is provided, returns a list of lists containing the
|
||||
path, layout, and min/max extent of the data:
|
||||
[ path, layout, extent_min, extent_max ]
|
||||
"""
|
||||
return self.db.stream_list(path, layout, bool(extent))
|
||||
|
||||
# /stream/create?path=/newton/prep&layout=PrepData
|
||||
@cherrypy.expose
|
||||
@cherrypy.tools.json_in()
|
||||
@cherrypy.tools.json_out()
|
||||
@exception_to_httperror(NilmDBError, ValueError)
|
||||
@cherrypy.tools.allow_methods(methods = ["POST"])
|
||||
@cherrypy.tools.CORS_allow(methods = ["POST"])
|
||||
def create(self, path, layout):
|
||||
"""Create a new stream in the database. Provide path
|
||||
and one of the nilmdb.layout.layouts keys.
|
||||
@@ -143,9 +195,10 @@ class Stream(NilmApp):
|
||||
|
||||
# /stream/destroy?path=/newton/prep
|
||||
@cherrypy.expose
|
||||
@cherrypy.tools.json_in()
|
||||
@cherrypy.tools.json_out()
|
||||
@exception_to_httperror(NilmDBError)
|
||||
@cherrypy.tools.allow_methods(methods = ["POST"])
|
||||
@cherrypy.tools.CORS_allow(methods = ["POST"])
|
||||
def destroy(self, path):
|
||||
"""Delete a stream and its associated data."""
|
||||
return self.db.stream_destroy(path)
|
||||
@@ -176,31 +229,41 @@ class Stream(NilmApp):
|
||||
|
||||
# /stream/set_metadata?path=/newton/prep&data=<json>
|
||||
@cherrypy.expose
|
||||
@cherrypy.tools.json_in()
|
||||
@cherrypy.tools.json_out()
|
||||
@exception_to_httperror(NilmDBError, LookupError, TypeError)
|
||||
@cherrypy.tools.allow_methods(methods = ["POST"])
|
||||
@exception_to_httperror(NilmDBError, LookupError)
|
||||
@cherrypy.tools.CORS_allow(methods = ["POST"])
|
||||
def set_metadata(self, path, data):
|
||||
"""Set metadata for the named stream, replacing any
|
||||
existing metadata. Data should be a json-encoded
|
||||
dictionary"""
|
||||
data_dict = json.loads(data)
|
||||
self.db.stream_set_metadata(path, data_dict)
|
||||
"""Set metadata for the named stream, replacing any existing
|
||||
metadata. Data can be json-encoded or a plain dictionary (if
|
||||
it was sent as application/json to begin with)"""
|
||||
if not isinstance(data, dict):
|
||||
try:
|
||||
data = dict(json.loads(data))
|
||||
except TypeError as e:
|
||||
raise NilmDBError("can't parse 'data' parameter: " + e.message)
|
||||
self.db.stream_set_metadata(path, data)
|
||||
|
||||
# /stream/update_metadata?path=/newton/prep&data=<json>
|
||||
@cherrypy.expose
|
||||
@cherrypy.tools.json_in()
|
||||
@cherrypy.tools.json_out()
|
||||
@exception_to_httperror(NilmDBError, LookupError, TypeError)
|
||||
@cherrypy.tools.allow_methods(methods = ["POST"])
|
||||
@exception_to_httperror(NilmDBError, LookupError)
|
||||
@cherrypy.tools.CORS_allow(methods = ["POST"])
|
||||
def update_metadata(self, path, data):
|
||||
"""Update metadata for the named stream. Data
|
||||
should be a json-encoded dictionary"""
|
||||
data_dict = json.loads(data)
|
||||
self.db.stream_update_metadata(path, data_dict)
|
||||
if not isinstance(data, dict):
|
||||
try:
|
||||
data = dict(json.loads(data))
|
||||
except TypeError as e:
|
||||
raise NilmDBError("can't parse 'data' parameter: " + e.message)
|
||||
self.db.stream_update_metadata(path, data)
|
||||
|
||||
# /stream/insert?path=/newton/prep
|
||||
@cherrypy.expose
|
||||
@cherrypy.tools.json_out()
|
||||
@cherrypy.tools.allow_methods(methods = ["PUT"])
|
||||
@cherrypy.tools.CORS_allow(methods = ["PUT"])
|
||||
def insert(self, path, start, end):
|
||||
"""
|
||||
Insert new data into the database. Provide textual data
|
||||
@@ -254,9 +317,10 @@ class Stream(NilmApp):
|
||||
# /stream/remove?path=/newton/prep
|
||||
# /stream/remove?path=/newton/prep&start=1234567890.0&end=1234567899.0
|
||||
@cherrypy.expose
|
||||
@cherrypy.tools.json_in()
|
||||
@cherrypy.tools.json_out()
|
||||
@exception_to_httperror(NilmDBError)
|
||||
@cherrypy.tools.allow_methods(methods = ["POST"])
|
||||
@cherrypy.tools.CORS_allow(methods = ["POST"])
|
||||
def remove(self, path, start = None, end = None):
|
||||
"""
|
||||
Remove data from the backend database. Removes all data in
|
||||
@@ -400,7 +464,7 @@ class Server(object):
|
||||
'server.socket_host': host,
|
||||
'server.socket_port': port,
|
||||
'engine.autoreload_on': False,
|
||||
'server.max_request_body_size': 4*1024*1024,
|
||||
'server.max_request_body_size': 8*1024*1024,
|
||||
})
|
||||
if self.embedded:
|
||||
cherrypy.config.update({ 'environment': 'embedded' })
|
||||
@@ -411,16 +475,20 @@ class Server(object):
|
||||
'error_page.default': self.json_error_page,
|
||||
})
|
||||
|
||||
# Send a permissive Access-Control-Allow-Origin (CORS) header
|
||||
# with all responses so that browsers can send cross-domain
|
||||
# requests to this server.
|
||||
app_config.update({ 'response.headers.Access-Control-Allow-Origin':
|
||||
'*' })
|
||||
# Some default headers to just help identify that things are working
|
||||
app_config.update({ 'response.headers.X-Jim-Is-Awesome': 'yeah' })
|
||||
|
||||
# Only allow GET and HEAD by default. Individual handlers
|
||||
# can override.
|
||||
app_config.update({ 'tools.allow_methods.on': True,
|
||||
'tools.allow_methods.methods': ['GET', 'HEAD'] })
|
||||
# Set up Cross-Origin Resource Sharing (CORS) handler so we
|
||||
# can correctly respond to browsers' CORS preflight requests.
|
||||
# This also limits verbs to GET and HEAD by default.
|
||||
app_config.update({ 'tools.CORS_allow.on': True,
|
||||
'tools.CORS_allow.methods': ['GET', 'HEAD'] })
|
||||
|
||||
# Configure the 'json_in' tool to also allow other content-types
|
||||
# (like x-www-form-urlencoded), and to treat JSON as a dict that
|
||||
# fills requests.param.
|
||||
app_config.update({ 'tools.json_in.force': False,
|
||||
'tools.json_in.processor': json_to_request_params })
|
||||
|
||||
# Send tracebacks in error responses. They're hidden by the
|
||||
# error_page function for client errors (code 400-499).
|
||||
|
@@ -8,3 +8,4 @@ from nilmdb.utils.diskusage import du, human_size
|
||||
from nilmdb.utils.mustclose import must_close
|
||||
from nilmdb.utils import atomic
|
||||
import nilmdb.utils.threadsafety
|
||||
import nilmdb.utils.fallocate
|
||||
|
49
nilmdb/utils/fallocate.py
Normal file
49
nilmdb/utils/fallocate.py
Normal file
@@ -0,0 +1,49 @@
|
||||
# Implementation of hole punching via fallocate, if the OS
|
||||
# and filesystem support it.
|
||||
|
||||
try:
|
||||
import os
|
||||
import ctypes
|
||||
import ctypes.util
|
||||
|
||||
def make_fallocate():
|
||||
libc_name = ctypes.util.find_library('c')
|
||||
libc = ctypes.CDLL(libc_name, use_errno=True)
|
||||
|
||||
_fallocate = libc.fallocate
|
||||
_fallocate.restype = ctypes.c_int
|
||||
_fallocate.argtypes = [ ctypes.c_int, ctypes.c_int,
|
||||
ctypes.c_int64, ctypes.c_int64 ]
|
||||
|
||||
del libc
|
||||
del libc_name
|
||||
|
||||
def fallocate(fd, mode, offset, len_):
|
||||
res = _fallocate(fd, mode, offset, len_)
|
||||
if res != 0: # pragma: no cover
|
||||
errno = ctypes.get_errno()
|
||||
raise IOError(errno, os.strerror(errno))
|
||||
return fallocate
|
||||
|
||||
fallocate = make_fallocate()
|
||||
del make_fallocate
|
||||
except Exception: # pragma: no cover
|
||||
fallocate = None
|
||||
|
||||
FALLOC_FL_KEEP_SIZE = 0x01
|
||||
FALLOC_FL_PUNCH_HOLE = 0x02
|
||||
|
||||
def punch_hole(filename, offset, length, ignore_errors = True):
|
||||
"""Punch a hole in the file. This isn't well supported, so errors
|
||||
are ignored by default."""
|
||||
try:
|
||||
if fallocate is None: # pragma: no cover
|
||||
raise IOError("fallocate not available")
|
||||
with open(filename, "r+") as f:
|
||||
fallocate(f.fileno(),
|
||||
FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE,
|
||||
offset, length)
|
||||
except IOError: # pragma: no cover
|
||||
if ignore_errors:
|
||||
return
|
||||
raise
|
@@ -20,11 +20,12 @@ import unittest
|
||||
import warnings
|
||||
import resource
|
||||
import time
|
||||
import re
|
||||
|
||||
from testutil.helpers import *
|
||||
|
||||
testdb = "tests/client-testdb"
|
||||
testurl = "http://localhost:12380/"
|
||||
testurl = "http://localhost:32180/"
|
||||
|
||||
def setup_module():
|
||||
global test_server, test_db
|
||||
@@ -34,7 +35,7 @@ def setup_module():
|
||||
# Start web app on a custom port
|
||||
test_db = nilmdb.utils.serializer_proxy(nilmdb.NilmDB)(testdb, sync = False)
|
||||
test_server = nilmdb.Server(test_db, host = "127.0.0.1",
|
||||
port = 12380, stoppable = False,
|
||||
port = 32180, stoppable = False,
|
||||
fast_shutdown = True,
|
||||
force_traceback = False)
|
||||
test_server.start(blocking = False)
|
||||
@@ -54,20 +55,14 @@ class TestClient(object):
|
||||
client.version()
|
||||
client.close()
|
||||
|
||||
# Trigger same error with a PUT request
|
||||
client = nilmdb.Client(url = "http://localhost:1/")
|
||||
with assert_raises(nilmdb.client.ServerError):
|
||||
client.version()
|
||||
client.close()
|
||||
|
||||
# Then a fake URL on a real host
|
||||
client = nilmdb.Client(url = "http://localhost:12380/fake/")
|
||||
client = nilmdb.Client(url = "http://localhost:32180/fake/")
|
||||
with assert_raises(nilmdb.client.ClientError):
|
||||
client.version()
|
||||
client.close()
|
||||
|
||||
# Now a real URL with no http:// prefix
|
||||
client = nilmdb.Client(url = "localhost:12380")
|
||||
client = nilmdb.Client(url = "localhost:32180")
|
||||
version = client.version()
|
||||
client.close()
|
||||
|
||||
@@ -172,6 +167,10 @@ class TestClient(object):
|
||||
def test_client_04_insert(self):
|
||||
client = nilmdb.Client(url = testurl)
|
||||
|
||||
# Limit _max_data to 1 MB, since our test file is 1.5 MB
|
||||
old_max_data = nilmdb.client.client.StreamInserter._max_data
|
||||
nilmdb.client.client.StreamInserter._max_data = 1 * 1024 * 1024
|
||||
|
||||
datetime_tz.localtz_set("America/New_York")
|
||||
|
||||
testfile = "tests/data/prep-20120323T1000"
|
||||
@@ -244,8 +243,9 @@ class TestClient(object):
|
||||
in_("400 Bad Request", str(e.exception))
|
||||
# Client chunks the input, so the exact timestamp here might change
|
||||
# if the chunk positions change.
|
||||
in_("Data timestamp 1332511271.016667 >= end time 1332511201.0",
|
||||
str(e.exception))
|
||||
assert(re.search("Data timestamp 13325[0-9]+\.[0-9]+ "
|
||||
">= end time 1332511201.0", str(e.exception))
|
||||
is not None)
|
||||
|
||||
# Now do the real load
|
||||
data = timestamper.TimestamperRate(testfile, start, 120)
|
||||
@@ -264,6 +264,7 @@ class TestClient(object):
|
||||
in_("400 Bad Request", str(e.exception))
|
||||
in_("verlap", str(e.exception))
|
||||
|
||||
nilmdb.client.client.StreamInserter._max_data = old_max_data
|
||||
client.close()
|
||||
|
||||
def test_client_05_extractremove(self):
|
||||
@@ -353,52 +354,48 @@ class TestClient(object):
|
||||
raise AssertionError("/stream/extract is not text/plain:\n" +
|
||||
headers())
|
||||
|
||||
# Make sure Access-Control-Allow-Origin gets set
|
||||
if "access-control-allow-origin: " not in headers():
|
||||
raise AssertionError("No Access-Control-Allow-Origin (CORS) "
|
||||
"header in /stream/extract response:\n" +
|
||||
headers())
|
||||
|
||||
client.close()
|
||||
|
||||
def test_client_08_unicode(self):
|
||||
# Basic Unicode tests
|
||||
client = nilmdb.Client(url = testurl)
|
||||
# Try both with and without posting JSON
|
||||
for post_json in (False, True):
|
||||
# Basic Unicode tests
|
||||
client = nilmdb.Client(url = testurl, post_json = post_json)
|
||||
|
||||
# Delete streams that exist
|
||||
for stream in client.stream_list():
|
||||
client.stream_destroy(stream[0])
|
||||
# Delete streams that exist
|
||||
for stream in client.stream_list():
|
||||
client.stream_destroy(stream[0])
|
||||
|
||||
# Database is empty
|
||||
eq_(client.stream_list(), [])
|
||||
# Database is empty
|
||||
eq_(client.stream_list(), [])
|
||||
|
||||
# Create Unicode stream, match it
|
||||
raw = [ u"/düsseldorf/raw", u"uint16_6" ]
|
||||
prep = [ u"/düsseldorf/prep", u"uint16_6" ]
|
||||
client.stream_create(*raw)
|
||||
eq_(client.stream_list(), [raw])
|
||||
eq_(client.stream_list(layout=raw[1]), [raw])
|
||||
eq_(client.stream_list(path=raw[0]), [raw])
|
||||
client.stream_create(*prep)
|
||||
eq_(client.stream_list(), [prep, raw])
|
||||
# Create Unicode stream, match it
|
||||
raw = [ u"/düsseldorf/raw", u"uint16_6" ]
|
||||
prep = [ u"/düsseldorf/prep", u"uint16_6" ]
|
||||
client.stream_create(*raw)
|
||||
eq_(client.stream_list(), [raw])
|
||||
eq_(client.stream_list(layout=raw[1]), [raw])
|
||||
eq_(client.stream_list(path=raw[0]), [raw])
|
||||
client.stream_create(*prep)
|
||||
eq_(client.stream_list(), [prep, raw])
|
||||
|
||||
# Set / get metadata with Unicode keys and values
|
||||
eq_(client.stream_get_metadata(raw[0]), {})
|
||||
eq_(client.stream_get_metadata(prep[0]), {})
|
||||
meta1 = { u"alpha": u"α",
|
||||
u"β": u"beta" }
|
||||
meta2 = { u"alpha": u"α" }
|
||||
meta3 = { u"β": u"beta" }
|
||||
client.stream_set_metadata(prep[0], meta1)
|
||||
client.stream_update_metadata(prep[0], {})
|
||||
client.stream_update_metadata(raw[0], meta2)
|
||||
client.stream_update_metadata(raw[0], meta3)
|
||||
eq_(client.stream_get_metadata(prep[0]), meta1)
|
||||
eq_(client.stream_get_metadata(raw[0]), meta1)
|
||||
eq_(client.stream_get_metadata(raw[0], [ "alpha" ]), meta2)
|
||||
eq_(client.stream_get_metadata(raw[0], [ "alpha", "β" ]), meta1)
|
||||
# Set / get metadata with Unicode keys and values
|
||||
eq_(client.stream_get_metadata(raw[0]), {})
|
||||
eq_(client.stream_get_metadata(prep[0]), {})
|
||||
meta1 = { u"alpha": u"α",
|
||||
u"β": u"beta" }
|
||||
meta2 = { u"alpha": u"α" }
|
||||
meta3 = { u"β": u"beta" }
|
||||
client.stream_set_metadata(prep[0], meta1)
|
||||
client.stream_update_metadata(prep[0], {})
|
||||
client.stream_update_metadata(raw[0], meta2)
|
||||
client.stream_update_metadata(raw[0], meta3)
|
||||
eq_(client.stream_get_metadata(prep[0]), meta1)
|
||||
eq_(client.stream_get_metadata(raw[0]), meta1)
|
||||
eq_(client.stream_get_metadata(raw[0], [ "alpha" ]), meta2)
|
||||
eq_(client.stream_get_metadata(raw[0], [ "alpha", "β" ]), meta1)
|
||||
|
||||
client.close()
|
||||
client.close()
|
||||
|
||||
def test_client_09_closing(self):
|
||||
# Make sure we actually close sockets correctly. New
|
||||
@@ -577,7 +574,7 @@ class TestClient(object):
|
||||
def connections():
|
||||
try:
|
||||
poolmanager = c.http._last_response.connection.poolmanager
|
||||
pool = poolmanager.pools[('http','localhost',12380)]
|
||||
pool = poolmanager.pools[('http','localhost',32180)]
|
||||
return (pool.num_connections, pool.num_requests)
|
||||
except:
|
||||
raise SkipTest("can't get connection info")
|
||||
|
@@ -11,12 +11,7 @@ from nose.tools import assert_raises
|
||||
import itertools
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import sys
|
||||
import threading
|
||||
import urllib2
|
||||
from urllib2 import urlopen, HTTPError
|
||||
import Queue
|
||||
import StringIO
|
||||
import shlex
|
||||
|
||||
@@ -32,7 +27,7 @@ def server_start(max_results = None, bulkdata_args = {}):
|
||||
max_results = max_results,
|
||||
bulkdata_args = bulkdata_args)
|
||||
test_server = nilmdb.Server(test_db, host = "127.0.0.1",
|
||||
port = 12380, stoppable = False,
|
||||
port = 32180, stoppable = False,
|
||||
fast_shutdown = True,
|
||||
force_traceback = False)
|
||||
test_server.start(blocking = False)
|
||||
@@ -64,6 +59,7 @@ class TestCmdline(object):
|
||||
passing the given input. Returns a tuple with the output and
|
||||
exit code"""
|
||||
# printf("TZ=UTC ./nilmtool.py %s\n", arg_string)
|
||||
os.environ['NILMDB_URL'] = "http://localhost:32180/"
|
||||
class stdio_wrapper:
|
||||
def __init__(self, stdin, stdout, stderr):
|
||||
self.io = (stdin, stdout, stderr)
|
||||
@@ -174,7 +170,7 @@ class TestCmdline(object):
|
||||
self.fail("-u localhost:1 info")
|
||||
self.contain("error connecting to server")
|
||||
|
||||
self.ok("-u localhost:12380 info")
|
||||
self.ok("-u localhost:32180 info")
|
||||
self.ok("info")
|
||||
|
||||
# Duplicated arguments should fail, but this isn't implemented
|
||||
@@ -192,6 +188,20 @@ class TestCmdline(object):
|
||||
self.fail("extract --start 2000-01-01 --start 2001-01-02")
|
||||
self.contain("duplicated argument")
|
||||
|
||||
# Verify that "help command" and "command --help" are identical
|
||||
# for all commands.
|
||||
self.fail("")
|
||||
m = re.search(r"{(.*)}", self.captured)
|
||||
for command in [""] + m.group(1).split(','):
|
||||
self.ok(command + " --help")
|
||||
cap1 = self.captured
|
||||
self.ok("help " + command)
|
||||
cap2 = self.captured
|
||||
self.ok("help " + command + " asdf --url --zxcv -")
|
||||
cap3 = self.captured
|
||||
eq_(cap1, cap2)
|
||||
eq_(cap2, cap3)
|
||||
|
||||
def test_02_parsetime(self):
|
||||
os.environ['TZ'] = "America/New_York"
|
||||
test = datetime_tz.datetime_tz.now()
|
||||
@@ -210,7 +220,7 @@ class TestCmdline(object):
|
||||
|
||||
def test_03_info(self):
|
||||
self.ok("info")
|
||||
self.contain("Server URL: http://localhost:12380/")
|
||||
self.contain("Server URL: http://localhost:32180/")
|
||||
self.contain("Client version: " + nilmdb.__version__)
|
||||
self.contain("Server version: " + test_server.version)
|
||||
self.contain("Server database path")
|
||||
@@ -418,7 +428,7 @@ class TestCmdline(object):
|
||||
# bad start time
|
||||
self.fail("insert --rate 120 --start 'whatever' /newton/prep /dev/null")
|
||||
|
||||
def test_07_detail(self):
|
||||
def test_07_detail_extent(self):
|
||||
# Just count the number of lines, it's probably fine
|
||||
self.ok("list --detail")
|
||||
lines_(self.captured, 8)
|
||||
@@ -463,6 +473,18 @@ class TestCmdline(object):
|
||||
lines_(self.captured, 2)
|
||||
self.contain("[ 1332497115.612 -> 1332497159.991668 ]")
|
||||
|
||||
# Check --extent output
|
||||
self.ok("list --extent")
|
||||
lines_(self.captured, 6)
|
||||
|
||||
self.ok("list -E -T")
|
||||
self.contain(" extent: 1332496800 -> 1332497159.991668")
|
||||
self.contain(" extent: (no data)")
|
||||
|
||||
# Misc
|
||||
self.fail("list --extent --start='23 Mar 2012 10:05:15.50'")
|
||||
self.contain("--start and --end only make sense with --detail")
|
||||
|
||||
def test_08_extract(self):
|
||||
# nonexistent stream
|
||||
self.fail("extract /no/such/foo --start 2000-01-01 --end 2020-01-01")
|
||||
|
@@ -9,14 +9,7 @@ from nose.tools import assert_raises
|
||||
import distutils.version
|
||||
import itertools
|
||||
import os
|
||||
import shutil
|
||||
import sys
|
||||
import cherrypy
|
||||
import threading
|
||||
import urllib2
|
||||
from urllib2 import urlopen, HTTPError
|
||||
import Queue
|
||||
import cStringIO
|
||||
import random
|
||||
import unittest
|
||||
|
||||
@@ -246,7 +239,7 @@ class TestLayoutSpeed:
|
||||
parser = Parser(layout)
|
||||
formatter = Formatter(layout)
|
||||
parser.parse(data)
|
||||
data = formatter.format(parser.data)
|
||||
formatter.format(parser.data)
|
||||
elapsed = time.time() - start
|
||||
printf("roundtrip %s: %d ms, %.1f μs/row, %d rows/sec\n",
|
||||
layout,
|
||||
@@ -264,3 +257,8 @@ class TestLayoutSpeed:
|
||||
return [ sprintf("%d", random.randint(0,65535))
|
||||
for x in range(10) ]
|
||||
do_speedtest("uint16_10", datagen)
|
||||
|
||||
def datagen():
|
||||
return [ sprintf("%d", random.randint(0,65535))
|
||||
for x in range(6) ]
|
||||
do_speedtest("uint16_6", datagen)
|
||||
|
@@ -34,6 +34,10 @@ class Bar:
|
||||
def __del__(self):
|
||||
fprintf(err, "Deleting\n")
|
||||
|
||||
@classmethod
|
||||
def baz(self):
|
||||
fprintf(err, "Baz\n")
|
||||
|
||||
def close(self):
|
||||
fprintf(err, "Closing\n")
|
||||
|
||||
|
@@ -6,15 +6,13 @@ import distutils.version
|
||||
import simplejson as json
|
||||
import itertools
|
||||
import os
|
||||
import shutil
|
||||
import sys
|
||||
import cherrypy
|
||||
import threading
|
||||
import urllib2
|
||||
from urllib2 import urlopen, HTTPError
|
||||
import Queue
|
||||
import cStringIO
|
||||
import time
|
||||
import requests
|
||||
|
||||
from nilmdb.utils import serializer_proxy
|
||||
|
||||
@@ -119,7 +117,7 @@ class TestBlockingServer(object):
|
||||
|
||||
# Start web app on a custom port
|
||||
self.server = nilmdb.Server(self.db, host = "127.0.0.1",
|
||||
port = 12380, stoppable = True)
|
||||
port = 32180, stoppable = True)
|
||||
|
||||
# Run it
|
||||
event = threading.Event()
|
||||
@@ -131,13 +129,13 @@ class TestBlockingServer(object):
|
||||
raise AssertionError("server didn't start in 10 seconds")
|
||||
|
||||
# Send request to exit.
|
||||
req = urlopen("http://127.0.0.1:12380/exit/", timeout = 1)
|
||||
req = urlopen("http://127.0.0.1:32180/exit/", timeout = 1)
|
||||
|
||||
# Wait for it
|
||||
thread.join()
|
||||
|
||||
def geturl(path):
|
||||
req = urlopen("http://127.0.0.1:12380" + path, timeout = 10)
|
||||
req = urlopen("http://127.0.0.1:32180" + path, timeout = 10)
|
||||
return req.read()
|
||||
|
||||
def getjson(path):
|
||||
@@ -149,7 +147,7 @@ class TestServer(object):
|
||||
# Start web app on a custom port
|
||||
self.db = serializer_proxy(nilmdb.NilmDB)(testdb, sync=False)
|
||||
self.server = nilmdb.Server(self.db, host = "127.0.0.1",
|
||||
port = 12380, stoppable = False)
|
||||
port = 32180, stoppable = False)
|
||||
self.server.start(blocking = False)
|
||||
|
||||
def tearDown(self):
|
||||
@@ -208,3 +206,51 @@ class TestServer(object):
|
||||
data = getjson("/stream/get_metadata?path=/newton/prep"
|
||||
"&key=foo")
|
||||
eq_(data, {'foo': None})
|
||||
|
||||
def test_cors_headers(self):
|
||||
# Test that CORS headers are being set correctly
|
||||
|
||||
# Normal GET should send simple response
|
||||
url = "http://127.0.0.1:32180/stream/list"
|
||||
r = requests.get(url, headers = { "Origin": "http://google.com/" })
|
||||
eq_(r.status_code, 200)
|
||||
if "access-control-allow-origin" not in r.headers:
|
||||
raise AssertionError("No Access-Control-Allow-Origin (CORS) "
|
||||
"header in response:\n", r.headers)
|
||||
eq_(r.headers["access-control-allow-origin"], "http://google.com/")
|
||||
|
||||
# OPTIONS without CORS preflight headers should result in 405
|
||||
r = requests.options(url, headers = {
|
||||
"Origin": "http://google.com/",
|
||||
})
|
||||
eq_(r.status_code, 405)
|
||||
|
||||
# OPTIONS with preflight headers should give preflight response
|
||||
r = requests.options(url, headers = {
|
||||
"Origin": "http://google.com/",
|
||||
"Access-Control-Request-Method": "POST",
|
||||
"Access-Control-Request-Headers": "X-Custom",
|
||||
})
|
||||
eq_(r.status_code, 200)
|
||||
if "access-control-allow-origin" not in r.headers:
|
||||
raise AssertionError("No Access-Control-Allow-Origin (CORS) "
|
||||
"header in response:\n", r.headers)
|
||||
eq_(r.headers["access-control-allow-methods"], "GET, HEAD")
|
||||
eq_(r.headers["access-control-allow-headers"], "X-Custom")
|
||||
|
||||
def test_post_bodies(self):
|
||||
# Test JSON post bodies
|
||||
r = requests.post("http://127.0.0.1:32180/stream/set_metadata",
|
||||
headers = { "Content-Type": "application/json" },
|
||||
data = '{"hello": 1}')
|
||||
eq_(r.status_code, 404) # wrong parameters
|
||||
|
||||
r = requests.post("http://127.0.0.1:32180/stream/set_metadata",
|
||||
headers = { "Content-Type": "application/json" },
|
||||
data = '["hello"]')
|
||||
eq_(r.status_code, 415) # not a dict
|
||||
|
||||
r = requests.post("http://127.0.0.1:32180/stream/set_metadata",
|
||||
headers = { "Content-Type": "application/json" },
|
||||
data = '[hello]')
|
||||
eq_(r.status_code, 400) # badly formatted JSON
|
||||
|
Reference in New Issue
Block a user