Compare commits

..

4 Commits

Author SHA1 Message Date
fe91ff59a3 Better handling of JSON requests 2013-03-05 12:38:08 -05:00
64c24a00d6 Add --traceback argument to nilmdb-server script 2013-03-05 12:20:07 -05:00
58c0ae72f6 Support application/json POST bodies as well as x-www-form-urlencoded 2013-03-05 11:54:29 -05:00
c5f079f61f When removing data from files, try to punch a hole.
Requires fallocate(2) support with FALLOC_FL_PUNCH_HOLE, as
well as a filesystem that supports it (in Linux 3.7,
tmpfs, btrfs, xfs, or ext4)
2013-03-04 20:31:14 -05:00
9 changed files with 192 additions and 57 deletions

View File

@@ -21,8 +21,12 @@ def extract_timestamp(line):
class Client(object): class Client(object):
"""Main client interface to the Nilm database.""" """Main client interface to the Nilm database."""
def __init__(self, url): def __init__(self, url, post_json = False):
self.http = nilmdb.client.httpclient.HTTPClient(url) """Initialize client with given URL. If post_json is true,
POST requests are sent with Content-Type 'application/json'
instead of the default 'x-www-form-urlencoded'."""
self.http = nilmdb.client.httpclient.HTTPClient(url, post_json)
self.post_json = post_json
# __enter__/__exit__ allow this class to be a context manager # __enter__/__exit__ allow this class to be a context manager
def __enter__(self): def __enter__(self):
@@ -31,8 +35,11 @@ class Client(object):
def __exit__(self, exc_type, exc_value, traceback): def __exit__(self, exc_type, exc_value, traceback):
self.close() self.close()
def _json_param(self, data): def _json_post_param(self, data):
"""Return compact json-encoded version of parameter""" """Return compact json-encoded version of parameter"""
if self.post_json:
# If we're posting as JSON, we don't need to encode it further here
return data
return json.dumps(data, separators=(',',':')) return json.dumps(data, separators=(',',':'))
def close(self): def close(self):
@@ -73,7 +80,7 @@ class Client(object):
metadata.""" metadata."""
params = { params = {
"path": path, "path": path,
"data": self._json_param(data) "data": self._json_post_param(data)
} }
return self.http.post("stream/set_metadata", params) return self.http.post("stream/set_metadata", params)
@@ -81,7 +88,7 @@ class Client(object):
"""Update stream metadata from a dictionary""" """Update stream metadata from a dictionary"""
params = { params = {
"path": path, "path": path,
"data": self._json_param(data) "data": self._json_post_param(data)
} }
return self.http.post("stream/update_metadata", params) return self.http.post("stream/update_metadata", params)

View File

@@ -10,7 +10,7 @@ import requests
class HTTPClient(object): class HTTPClient(object):
"""Class to manage and perform HTTP requests from the client""" """Class to manage and perform HTTP requests from the client"""
def __init__(self, baseurl = ""): def __init__(self, baseurl = "", post_json = False):
"""If baseurl is supplied, all other functions that take """If baseurl is supplied, all other functions that take
a URL can be given a relative URL instead.""" a URL can be given a relative URL instead."""
# Verify / clean up URL # Verify / clean up URL
@@ -26,6 +26,10 @@ class HTTPClient(object):
# Saved response, so that tests can verify a few things. # Saved response, so that tests can verify a few things.
self._last_response = {} self._last_response = {}
# Whether to send application/json POST bodies (versus
# x-www-form-urlencoded)
self.post_json = post_json
def _handle_error(self, url, code, body): def _handle_error(self, url, code, body):
# Default variables for exception. We use the entire body as # Default variables for exception. We use the entire body as
# the default message, in case we can't extract it from a JSON # the default message, in case we can't extract it from a JSON
@@ -57,13 +61,14 @@ class HTTPClient(object):
def close(self): def close(self):
self.session.close() self.session.close()
def _do_req(self, method, url, query_data, body_data, stream): def _do_req(self, method, url, query_data, body_data, stream, headers):
url = urlparse.urljoin(self.baseurl, url) url = urlparse.urljoin(self.baseurl, url)
try: try:
response = self.session.request(method, url, response = self.session.request(method, url,
params = query_data, params = query_data,
data = body_data, data = body_data,
stream = stream) stream = stream,
headers = headers)
except requests.RequestException as e: except requests.RequestException as e:
raise ServerError(status = "502 Error", url = url, raise ServerError(status = "502 Error", url = url,
message = str(e.message)) message = str(e.message))
@@ -77,12 +82,13 @@ class HTTPClient(object):
return (response, False) return (response, False)
# Normal versions that return data directly # Normal versions that return data directly
def _req(self, method, url, query = None, body = None): def _req(self, method, url, query = None, body = None, headers = None):
""" """
Make a request and return the body data as a string or parsed Make a request and return the body data as a string or parsed
JSON object, or raise an error if it contained an error. JSON object, or raise an error if it contained an error.
""" """
(response, isjson) = self._do_req(method, url, query, body, False) (response, isjson) = self._do_req(method, url, query, body,
stream = False, headers = headers)
if isjson: if isjson:
return json.loads(response.content) return json.loads(response.content)
return response.content return response.content
@@ -93,20 +99,26 @@ class HTTPClient(object):
def post(self, url, params = None): def post(self, url, params = None):
"""Simple POST (parameters in body)""" """Simple POST (parameters in body)"""
return self._req("POST", url, None, params) if self.post_json:
return self._req("POST", url, None,
json.dumps(params),
{ 'Content-type': 'application/json' })
else:
return self._req("POST", url, None, params)
def put(self, url, data, params = None): def put(self, url, data, params = None):
"""Simple PUT (parameters in URL, data in body)""" """Simple PUT (parameters in URL, data in body)"""
return self._req("PUT", url, params, data) return self._req("PUT", url, params, data)
# Generator versions that return data one line at a time. # Generator versions that return data one line at a time.
def _req_gen(self, method, url, query = None, body = None): def _req_gen(self, method, url, query = None, body = None, headers = None):
""" """
Make a request and return a generator that gives back strings Make a request and return a generator that gives back strings
or JSON decoded lines of the body data, or raise an error if or JSON decoded lines of the body data, or raise an error if
it contained an eror. it contained an eror.
""" """
(response, isjson) = self._do_req(method, url, query, body, True) (response, isjson) = self._do_req(method, url, query, body,
stream = True, headers = headers)
for line in response.iter_lines(): for line in response.iter_lines():
if isjson: if isjson:
yield json.loads(line) yield json.loads(line)

View File

@@ -28,6 +28,9 @@ def main():
group.add_argument('-n', '--nosync', help = 'Use asynchronous ' group.add_argument('-n', '--nosync', help = 'Use asynchronous '
'commits for sqlite transactions', 'commits for sqlite transactions',
action = 'store_true', default = False) action = 'store_true', default = False)
group.add_argument('-t', '--traceback',
help = 'Provide tracebacks in client errors',
action = 'store_true', default = False)
group = parser.add_argument_group("Debug options") group = parser.add_argument_group("Debug options")
group.add_argument('-y', '--yappi', help = 'Run under yappi profiler and ' group.add_argument('-y', '--yappi', help = 'Run under yappi profiler and '
@@ -49,7 +52,8 @@ def main():
server = nilmdb.server.Server(db, server = nilmdb.server.Server(db,
host = args.address, host = args.address,
port = args.port, port = args.port,
embedded = embedded) embedded = embedded,
force_traceback = args.traceback)
# Print info # Print info
if not args.quiet: if not args.quiet:

View File

@@ -410,8 +410,16 @@ class Table(object):
def _remove_rows(self, subdir, filename, start, stop): def _remove_rows(self, subdir, filename, start, stop):
"""Helper to mark specific rows as being removed from a """Helper to mark specific rows as being removed from a
file, and potentially removing or truncating the file itself.""" file, and potentially remove or truncate the file itself."""
# Import an existing list of deleted rows for this file # Close potentially open file in file_open LRU cache
self.file_open.cache_remove(self, subdir, filename)
# We keep a file like 0000.removed that contains a list of
# which rows have been "removed". Note that we never have to
# remove entries from this list, because we never decrease
# self.nrows, and so we will never overwrite those locations in the
# file. Only when the list covers the entire extent of the
# file will that file be removed.
datafile = os.path.join(self.root, subdir, filename) datafile = os.path.join(self.root, subdir, filename)
cachefile = datafile + ".removed" cachefile = datafile + ".removed"
try: try:
@@ -465,6 +473,14 @@ class Table(object):
except: except:
pass pass
else: else:
# File needs to stick around. This means we can get
# degenerate cases where we have large files containing as
# little as one row. Try to punch a hole in the file,
# so that this region doesn't take up filesystem space.
offset = start * self.packer.size
count = (stop - start) * self.packer.size
nilmdb.utils.fallocate.punch_hole(datafile, offset, count)
# Update cache. Try to do it atomically. # Update cache. Try to do it atomically.
nilmdb.utils.atomic.replace_file(cachefile, nilmdb.utils.atomic.replace_file(cachefile,
pickle.dumps(merged, 2)) pickle.dumps(merged, 2))

View File

@@ -117,6 +117,14 @@ def CORS_allow(methods):
cherrypy.tools.CORS_allow = cherrypy.Tool('on_start_resource', CORS_allow) cherrypy.tools.CORS_allow = cherrypy.Tool('on_start_resource', CORS_allow)
# Helper for json_in tool to process JSON data into normal request
# parameters.
def json_to_request_params(body):
    """Processor for the json_in tool: parse the JSON request body and
    merge it into the normal request parameters.

    The body must decode to a JSON object (a dict); anything else is
    rejected with HTTP 415."""
    # Let CherryPy's stock processor do the parsing; it leaves the
    # decoded value in cherrypy.request.json.
    cherrypy.lib.jsontools.json_processor(body)
    decoded = cherrypy.request.json
    if isinstance(decoded, dict):
        cherrypy.request.params.update(decoded)
        return
    raise cherrypy.HTTPError(415)
# CherryPy apps # CherryPy apps
class Root(NilmApp): class Root(NilmApp):
"""Root application for NILM database""" """Root application for NILM database"""
@@ -175,6 +183,7 @@ class Stream(NilmApp):
# /stream/create?path=/newton/prep&layout=PrepData # /stream/create?path=/newton/prep&layout=PrepData
@cherrypy.expose @cherrypy.expose
@cherrypy.tools.json_in()
@cherrypy.tools.json_out() @cherrypy.tools.json_out()
@exception_to_httperror(NilmDBError, ValueError) @exception_to_httperror(NilmDBError, ValueError)
@cherrypy.tools.CORS_allow(methods = ["POST"]) @cherrypy.tools.CORS_allow(methods = ["POST"])
@@ -186,6 +195,7 @@ class Stream(NilmApp):
# /stream/destroy?path=/newton/prep # /stream/destroy?path=/newton/prep
@cherrypy.expose @cherrypy.expose
@cherrypy.tools.json_in()
@cherrypy.tools.json_out() @cherrypy.tools.json_out()
@exception_to_httperror(NilmDBError) @exception_to_httperror(NilmDBError)
@cherrypy.tools.CORS_allow(methods = ["POST"]) @cherrypy.tools.CORS_allow(methods = ["POST"])
@@ -219,26 +229,36 @@ class Stream(NilmApp):
# /stream/set_metadata?path=/newton/prep&data=<json> # /stream/set_metadata?path=/newton/prep&data=<json>
@cherrypy.expose @cherrypy.expose
@cherrypy.tools.json_in()
@cherrypy.tools.json_out() @cherrypy.tools.json_out()
@exception_to_httperror(NilmDBError, LookupError, TypeError) @exception_to_httperror(NilmDBError, LookupError)
@cherrypy.tools.CORS_allow(methods = ["POST"]) @cherrypy.tools.CORS_allow(methods = ["POST"])
def set_metadata(self, path, data): def set_metadata(self, path, data):
"""Set metadata for the named stream, replacing any """Set metadata for the named stream, replacing any existing
existing metadata. Data should be a json-encoded metadata. Data can be json-encoded or a plain dictionary (if
dictionary""" it was sent as application/json to begin with)"""
data_dict = json.loads(data) if not isinstance(data, dict):
self.db.stream_set_metadata(path, data_dict) try:
data = dict(json.loads(data))
except TypeError as e:
raise NilmDBError("can't parse 'data' parameter: " + e.message)
self.db.stream_set_metadata(path, data)
# /stream/update_metadata?path=/newton/prep&data=<json> # /stream/update_metadata?path=/newton/prep&data=<json>
@cherrypy.expose @cherrypy.expose
@cherrypy.tools.json_in()
@cherrypy.tools.json_out() @cherrypy.tools.json_out()
@exception_to_httperror(NilmDBError, LookupError, TypeError) @exception_to_httperror(NilmDBError, LookupError)
@cherrypy.tools.CORS_allow(methods = ["POST"]) @cherrypy.tools.CORS_allow(methods = ["POST"])
def update_metadata(self, path, data): def update_metadata(self, path, data):
"""Update metadata for the named stream. Data """Update metadata for the named stream. Data
should be a json-encoded dictionary""" should be a json-encoded dictionary"""
data_dict = json.loads(data) if not isinstance(data, dict):
self.db.stream_update_metadata(path, data_dict) try:
data = dict(json.loads(data))
except TypeError as e:
raise NilmDBError("can't parse 'data' parameter: " + e.message)
self.db.stream_update_metadata(path, data)
# /stream/insert?path=/newton/prep # /stream/insert?path=/newton/prep
@cherrypy.expose @cherrypy.expose
@@ -297,6 +317,7 @@ class Stream(NilmApp):
# /stream/remove?path=/newton/prep # /stream/remove?path=/newton/prep
# /stream/remove?path=/newton/prep&start=1234567890.0&end=1234567899.0 # /stream/remove?path=/newton/prep&start=1234567890.0&end=1234567899.0
@cherrypy.expose @cherrypy.expose
@cherrypy.tools.json_in()
@cherrypy.tools.json_out() @cherrypy.tools.json_out()
@exception_to_httperror(NilmDBError) @exception_to_httperror(NilmDBError)
@cherrypy.tools.CORS_allow(methods = ["POST"]) @cherrypy.tools.CORS_allow(methods = ["POST"])
@@ -463,6 +484,12 @@ class Server(object):
app_config.update({ 'tools.CORS_allow.on': True, app_config.update({ 'tools.CORS_allow.on': True,
'tools.CORS_allow.methods': ['GET', 'HEAD'] }) 'tools.CORS_allow.methods': ['GET', 'HEAD'] })
# Configure the 'json_in' tool to also allow other content-types
# (like x-www-form-urlencoded), and to treat JSON as a dict that
# fills requests.param.
app_config.update({ 'tools.json_in.force': False,
'tools.json_in.processor': json_to_request_params })
# Send tracebacks in error responses. They're hidden by the # Send tracebacks in error responses. They're hidden by the
# error_page function for client errors (code 400-499). # error_page function for client errors (code 400-499).
app_config.update({ 'request.show_tracebacks' : True }) app_config.update({ 'request.show_tracebacks' : True })

View File

@@ -8,3 +8,4 @@ from nilmdb.utils.diskusage import du, human_size
from nilmdb.utils.mustclose import must_close from nilmdb.utils.mustclose import must_close
from nilmdb.utils import atomic from nilmdb.utils import atomic
import nilmdb.utils.threadsafety import nilmdb.utils.threadsafety
import nilmdb.utils.fallocate

49
nilmdb/utils/fallocate.py Normal file
View File

@@ -0,0 +1,49 @@
# Implementation of hole punching via fallocate, if the OS
# and filesystem support it.
# Build a module-level ``fallocate`` callable on top of libc.  Any
# failure along the way (no ctypes, no libc, no such symbol) leaves
# ``fallocate`` set to None so that callers can detect the lack of
# support instead of failing at import time.
try:
    import os
    import ctypes
    import ctypes.util

    def make_fallocate():
        """Return a Python wrapper around libc's fallocate(2)."""
        libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True)

        # The offset and length are declared as 64-bit integers below,
        # so prefer fallocate64, whose off64_t arguments are 64 bits
        # wide on every ABI.  Plain fallocate takes off_t, which is
        # only 32 bits on 32-bit platforms without large file support
        # and would receive mangled arguments.  Fall back to plain
        # fallocate when the 64-bit entry point is not exported.
        try:
            _fallocate = libc.fallocate64
        except AttributeError:
            _fallocate = libc.fallocate
        _fallocate.restype = ctypes.c_int
        _fallocate.argtypes = [ ctypes.c_int, ctypes.c_int,
                                ctypes.c_int64, ctypes.c_int64 ]

        def fallocate(fd, mode, offset, len_):
            """Call fallocate(2) on the open file descriptor 'fd'.

            'mode' is a bitwise OR of FALLOC_FL_* flags; 'offset' and
            'len_' give the affected byte range.  Raises IOError
            carrying the C errno if the call fails."""
            res = _fallocate(fd, mode, offset, len_)
            if res != 0: # pragma: no cover
                errno = ctypes.get_errno()
                raise IOError(errno, os.strerror(errno))
        return fallocate

    fallocate = make_fallocate()
    del make_fallocate
except Exception: # pragma: no cover
    fallocate = None

# Mode flags for fallocate(2)
FALLOC_FL_KEEP_SIZE = 0x01
FALLOC_FL_PUNCH_HOLE = 0x02
def punch_hole(filename, offset, length, ignore_errors = True):
    """Deallocate 'length' bytes starting at byte 'offset' of the named
    file, keeping the file's size unchanged.

    Hole punching isn't well supported, so by default any IOError
    (including fallocate being unavailable) is silently ignored.  Pass
    ignore_errors = False to have the IOError propagate instead."""
    mode = FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE
    try:
        if fallocate is None: # pragma: no cover
            raise IOError("fallocate not available")
        with open(filename, "r+") as target:
            fallocate(target.fileno(), mode, offset, length)
    except IOError: # pragma: no cover
        if not ignore_errors:
            raise

View File

@@ -357,43 +357,45 @@ class TestClient(object):
client.close() client.close()
def test_client_08_unicode(self): def test_client_08_unicode(self):
# Basic Unicode tests # Try both with and without posting JSON
client = nilmdb.Client(url = testurl) for post_json in (False, True):
# Basic Unicode tests
client = nilmdb.Client(url = testurl, post_json = post_json)
# Delete streams that exist # Delete streams that exist
for stream in client.stream_list(): for stream in client.stream_list():
client.stream_destroy(stream[0]) client.stream_destroy(stream[0])
# Database is empty # Database is empty
eq_(client.stream_list(), []) eq_(client.stream_list(), [])
# Create Unicode stream, match it # Create Unicode stream, match it
raw = [ u"/düsseldorf/raw", u"uint16_6" ] raw = [ u"/düsseldorf/raw", u"uint16_6" ]
prep = [ u"/düsseldorf/prep", u"uint16_6" ] prep = [ u"/düsseldorf/prep", u"uint16_6" ]
client.stream_create(*raw) client.stream_create(*raw)
eq_(client.stream_list(), [raw]) eq_(client.stream_list(), [raw])
eq_(client.stream_list(layout=raw[1]), [raw]) eq_(client.stream_list(layout=raw[1]), [raw])
eq_(client.stream_list(path=raw[0]), [raw]) eq_(client.stream_list(path=raw[0]), [raw])
client.stream_create(*prep) client.stream_create(*prep)
eq_(client.stream_list(), [prep, raw]) eq_(client.stream_list(), [prep, raw])
# Set / get metadata with Unicode keys and values # Set / get metadata with Unicode keys and values
eq_(client.stream_get_metadata(raw[0]), {}) eq_(client.stream_get_metadata(raw[0]), {})
eq_(client.stream_get_metadata(prep[0]), {}) eq_(client.stream_get_metadata(prep[0]), {})
meta1 = { u"alpha": u"α", meta1 = { u"alpha": u"α",
u"β": u"beta" } u"β": u"beta" }
meta2 = { u"alpha": u"α" } meta2 = { u"alpha": u"α" }
meta3 = { u"β": u"beta" } meta3 = { u"β": u"beta" }
client.stream_set_metadata(prep[0], meta1) client.stream_set_metadata(prep[0], meta1)
client.stream_update_metadata(prep[0], {}) client.stream_update_metadata(prep[0], {})
client.stream_update_metadata(raw[0], meta2) client.stream_update_metadata(raw[0], meta2)
client.stream_update_metadata(raw[0], meta3) client.stream_update_metadata(raw[0], meta3)
eq_(client.stream_get_metadata(prep[0]), meta1) eq_(client.stream_get_metadata(prep[0]), meta1)
eq_(client.stream_get_metadata(raw[0]), meta1) eq_(client.stream_get_metadata(raw[0]), meta1)
eq_(client.stream_get_metadata(raw[0], [ "alpha" ]), meta2) eq_(client.stream_get_metadata(raw[0], [ "alpha" ]), meta2)
eq_(client.stream_get_metadata(raw[0], [ "alpha", "β" ]), meta1) eq_(client.stream_get_metadata(raw[0], [ "alpha", "β" ]), meta1)
client.close() client.close()
def test_client_09_closing(self): def test_client_09_closing(self):
# Make sure we actually close sockets correctly. New # Make sure we actually close sockets correctly. New

View File

@@ -237,3 +237,20 @@ class TestServer(object):
"header in response:\n", r.headers) "header in response:\n", r.headers)
eq_(r.headers["access-control-allow-methods"], "GET, HEAD") eq_(r.headers["access-control-allow-methods"], "GET, HEAD")
eq_(r.headers["access-control-allow-headers"], "X-Custom") eq_(r.headers["access-control-allow-headers"], "X-Custom")
def test_post_bodies(self):
# Test JSON post bodies
r = requests.post("http://127.0.0.1:32180/stream/set_metadata",
headers = { "Content-Type": "application/json" },
data = '{"hello": 1}')
eq_(r.status_code, 404) # wrong parameters
r = requests.post("http://127.0.0.1:32180/stream/set_metadata",
headers = { "Content-Type": "application/json" },
data = '["hello"]')
eq_(r.status_code, 415) # not a dict
r = requests.post("http://127.0.0.1:32180/stream/set_metadata",
headers = { "Content-Type": "application/json" },
data = '[hello]')
eq_(r.status_code, 400) # badly formatted JSON