Compare commits
4 Commits
nilmdb-1.2
...
nilmdb-1.2
Author | SHA1 | Date | |
---|---|---|---|
fe91ff59a3 | |||
64c24a00d6 | |||
58c0ae72f6 | |||
c5f079f61f |
@@ -21,8 +21,12 @@ def extract_timestamp(line):
|
||||
class Client(object):
|
||||
"""Main client interface to the Nilm database."""
|
||||
|
||||
def __init__(self, url):
|
||||
self.http = nilmdb.client.httpclient.HTTPClient(url)
|
||||
def __init__(self, url, post_json = False):
|
||||
"""Initialize client with given URL. If post_json is true,
|
||||
POST requests are sent with Content-Type 'application/json'
|
||||
instead of the default 'x-www-form-urlencoded'."""
|
||||
self.http = nilmdb.client.httpclient.HTTPClient(url, post_json)
|
||||
self.post_json = post_json
|
||||
|
||||
# __enter__/__exit__ allow this class to be a context manager
|
||||
def __enter__(self):
|
||||
@@ -31,8 +35,11 @@ class Client(object):
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self.close()
|
||||
|
||||
def _json_param(self, data):
|
||||
def _json_post_param(self, data):
|
||||
"""Return compact json-encoded version of parameter"""
|
||||
if self.post_json:
|
||||
# If we're posting as JSON, we don't need to encode it further here
|
||||
return data
|
||||
return json.dumps(data, separators=(',',':'))
|
||||
|
||||
def close(self):
|
||||
@@ -73,7 +80,7 @@ class Client(object):
|
||||
metadata."""
|
||||
params = {
|
||||
"path": path,
|
||||
"data": self._json_param(data)
|
||||
"data": self._json_post_param(data)
|
||||
}
|
||||
return self.http.post("stream/set_metadata", params)
|
||||
|
||||
@@ -81,7 +88,7 @@ class Client(object):
|
||||
"""Update stream metadata from a dictionary"""
|
||||
params = {
|
||||
"path": path,
|
||||
"data": self._json_param(data)
|
||||
"data": self._json_post_param(data)
|
||||
}
|
||||
return self.http.post("stream/update_metadata", params)
|
||||
|
||||
|
@@ -10,7 +10,7 @@ import requests
|
||||
|
||||
class HTTPClient(object):
|
||||
"""Class to manage and perform HTTP requests from the client"""
|
||||
def __init__(self, baseurl = ""):
|
||||
def __init__(self, baseurl = "", post_json = False):
|
||||
"""If baseurl is supplied, all other functions that take
|
||||
a URL can be given a relative URL instead."""
|
||||
# Verify / clean up URL
|
||||
@@ -26,6 +26,10 @@ class HTTPClient(object):
|
||||
# Saved response, so that tests can verify a few things.
|
||||
self._last_response = {}
|
||||
|
||||
# Whether to send application/json POST bodies (versus
|
||||
# x-www-form-urlencoded)
|
||||
self.post_json = post_json
|
||||
|
||||
def _handle_error(self, url, code, body):
|
||||
# Default variables for exception. We use the entire body as
|
||||
# the default message, in case we can't extract it from a JSON
|
||||
@@ -57,13 +61,14 @@ class HTTPClient(object):
|
||||
def close(self):
|
||||
self.session.close()
|
||||
|
||||
def _do_req(self, method, url, query_data, body_data, stream):
|
||||
def _do_req(self, method, url, query_data, body_data, stream, headers):
|
||||
url = urlparse.urljoin(self.baseurl, url)
|
||||
try:
|
||||
response = self.session.request(method, url,
|
||||
params = query_data,
|
||||
data = body_data,
|
||||
stream = stream)
|
||||
stream = stream,
|
||||
headers = headers)
|
||||
except requests.RequestException as e:
|
||||
raise ServerError(status = "502 Error", url = url,
|
||||
message = str(e.message))
|
||||
@@ -77,12 +82,13 @@ class HTTPClient(object):
|
||||
return (response, False)
|
||||
|
||||
# Normal versions that return data directly
|
||||
def _req(self, method, url, query = None, body = None):
|
||||
def _req(self, method, url, query = None, body = None, headers = None):
|
||||
"""
|
||||
Make a request and return the body data as a string or parsed
|
||||
JSON object, or raise an error if it contained an error.
|
||||
"""
|
||||
(response, isjson) = self._do_req(method, url, query, body, False)
|
||||
(response, isjson) = self._do_req(method, url, query, body,
|
||||
stream = False, headers = headers)
|
||||
if isjson:
|
||||
return json.loads(response.content)
|
||||
return response.content
|
||||
@@ -93,20 +99,26 @@ class HTTPClient(object):
|
||||
|
||||
def post(self, url, params = None):
|
||||
"""Simple POST (parameters in body)"""
|
||||
return self._req("POST", url, None, params)
|
||||
if self.post_json:
|
||||
return self._req("POST", url, None,
|
||||
json.dumps(params),
|
||||
{ 'Content-type': 'application/json' })
|
||||
else:
|
||||
return self._req("POST", url, None, params)
|
||||
|
||||
def put(self, url, data, params = None):
|
||||
"""Simple PUT (parameters in URL, data in body)"""
|
||||
return self._req("PUT", url, params, data)
|
||||
|
||||
# Generator versions that return data one line at a time.
|
||||
def _req_gen(self, method, url, query = None, body = None):
|
||||
def _req_gen(self, method, url, query = None, body = None, headers = None):
|
||||
"""
|
||||
Make a request and return a generator that gives back strings
|
||||
or JSON decoded lines of the body data, or raise an error if
|
||||
it contained an eror.
|
||||
"""
|
||||
(response, isjson) = self._do_req(method, url, query, body, True)
|
||||
(response, isjson) = self._do_req(method, url, query, body,
|
||||
stream = True, headers = headers)
|
||||
for line in response.iter_lines():
|
||||
if isjson:
|
||||
yield json.loads(line)
|
||||
|
@@ -28,6 +28,9 @@ def main():
|
||||
group.add_argument('-n', '--nosync', help = 'Use asynchronous '
|
||||
'commits for sqlite transactions',
|
||||
action = 'store_true', default = False)
|
||||
group.add_argument('-t', '--traceback',
|
||||
help = 'Provide tracebacks in client errors',
|
||||
action = 'store_true', default = False)
|
||||
|
||||
group = parser.add_argument_group("Debug options")
|
||||
group.add_argument('-y', '--yappi', help = 'Run under yappi profiler and '
|
||||
@@ -49,7 +52,8 @@ def main():
|
||||
server = nilmdb.server.Server(db,
|
||||
host = args.address,
|
||||
port = args.port,
|
||||
embedded = embedded)
|
||||
embedded = embedded,
|
||||
force_traceback = args.traceback)
|
||||
|
||||
# Print info
|
||||
if not args.quiet:
|
||||
|
@@ -410,8 +410,16 @@ class Table(object):
|
||||
|
||||
def _remove_rows(self, subdir, filename, start, stop):
|
||||
"""Helper to mark specific rows as being removed from a
|
||||
file, and potentially removing or truncating the file itself."""
|
||||
# Import an existing list of deleted rows for this file
|
||||
file, and potentially remove or truncate the file itself."""
|
||||
# Close potentially open file in file_open LRU cache
|
||||
self.file_open.cache_remove(self, subdir, filename)
|
||||
|
||||
# We keep a file like 0000.removed that contains a list of
|
||||
# which rows have been "removed". Note that we never have to
|
||||
# remove entries from this list, because we never decrease
|
||||
# self.nrows, and so we will never overwrite those locations in the
|
||||
# file. Only when the list covers the entire extent of the
|
||||
# file will that file be removed.
|
||||
datafile = os.path.join(self.root, subdir, filename)
|
||||
cachefile = datafile + ".removed"
|
||||
try:
|
||||
@@ -465,6 +473,14 @@ class Table(object):
|
||||
except:
|
||||
pass
|
||||
else:
|
||||
# File needs to stick around. This means we can get
|
||||
# degenerate cases where we have large files containing as
|
||||
# little as one row. Try to punch a hole in the file,
|
||||
# so that this region doesn't take up filesystem space.
|
||||
offset = start * self.packer.size
|
||||
count = (stop - start) * self.packer.size
|
||||
nilmdb.utils.fallocate.punch_hole(datafile, offset, count)
|
||||
|
||||
# Update cache. Try to do it atomically.
|
||||
nilmdb.utils.atomic.replace_file(cachefile,
|
||||
pickle.dumps(merged, 2))
|
||||
|
@@ -117,6 +117,14 @@ def CORS_allow(methods):
|
||||
|
||||
cherrypy.tools.CORS_allow = cherrypy.Tool('on_start_resource', CORS_allow)
|
||||
|
||||
# Helper for json_in tool to process JSON data into normal request
|
||||
# parameters.
|
||||
def json_to_request_params(body):
|
||||
cherrypy.lib.jsontools.json_processor(body)
|
||||
if not isinstance(cherrypy.request.json, dict):
|
||||
raise cherrypy.HTTPError(415)
|
||||
cherrypy.request.params.update(cherrypy.request.json)
|
||||
|
||||
# CherryPy apps
|
||||
class Root(NilmApp):
|
||||
"""Root application for NILM database"""
|
||||
@@ -175,6 +183,7 @@ class Stream(NilmApp):
|
||||
|
||||
# /stream/create?path=/newton/prep&layout=PrepData
|
||||
@cherrypy.expose
|
||||
@cherrypy.tools.json_in()
|
||||
@cherrypy.tools.json_out()
|
||||
@exception_to_httperror(NilmDBError, ValueError)
|
||||
@cherrypy.tools.CORS_allow(methods = ["POST"])
|
||||
@@ -186,6 +195,7 @@ class Stream(NilmApp):
|
||||
|
||||
# /stream/destroy?path=/newton/prep
|
||||
@cherrypy.expose
|
||||
@cherrypy.tools.json_in()
|
||||
@cherrypy.tools.json_out()
|
||||
@exception_to_httperror(NilmDBError)
|
||||
@cherrypy.tools.CORS_allow(methods = ["POST"])
|
||||
@@ -219,26 +229,36 @@ class Stream(NilmApp):
|
||||
|
||||
# /stream/set_metadata?path=/newton/prep&data=<json>
|
||||
@cherrypy.expose
|
||||
@cherrypy.tools.json_in()
|
||||
@cherrypy.tools.json_out()
|
||||
@exception_to_httperror(NilmDBError, LookupError, TypeError)
|
||||
@exception_to_httperror(NilmDBError, LookupError)
|
||||
@cherrypy.tools.CORS_allow(methods = ["POST"])
|
||||
def set_metadata(self, path, data):
|
||||
"""Set metadata for the named stream, replacing any
|
||||
existing metadata. Data should be a json-encoded
|
||||
dictionary"""
|
||||
data_dict = json.loads(data)
|
||||
self.db.stream_set_metadata(path, data_dict)
|
||||
"""Set metadata for the named stream, replacing any existing
|
||||
metadata. Data can be json-encoded or a plain dictionary (if
|
||||
it was sent as application/json to begin with)"""
|
||||
if not isinstance(data, dict):
|
||||
try:
|
||||
data = dict(json.loads(data))
|
||||
except TypeError as e:
|
||||
raise NilmDBError("can't parse 'data' parameter: " + e.message)
|
||||
self.db.stream_set_metadata(path, data)
|
||||
|
||||
# /stream/update_metadata?path=/newton/prep&data=<json>
|
||||
@cherrypy.expose
|
||||
@cherrypy.tools.json_in()
|
||||
@cherrypy.tools.json_out()
|
||||
@exception_to_httperror(NilmDBError, LookupError, TypeError)
|
||||
@exception_to_httperror(NilmDBError, LookupError)
|
||||
@cherrypy.tools.CORS_allow(methods = ["POST"])
|
||||
def update_metadata(self, path, data):
|
||||
"""Update metadata for the named stream. Data
|
||||
should be a json-encoded dictionary"""
|
||||
data_dict = json.loads(data)
|
||||
self.db.stream_update_metadata(path, data_dict)
|
||||
if not isinstance(data, dict):
|
||||
try:
|
||||
data = dict(json.loads(data))
|
||||
except TypeError as e:
|
||||
raise NilmDBError("can't parse 'data' parameter: " + e.message)
|
||||
self.db.stream_update_metadata(path, data)
|
||||
|
||||
# /stream/insert?path=/newton/prep
|
||||
@cherrypy.expose
|
||||
@@ -297,6 +317,7 @@ class Stream(NilmApp):
|
||||
# /stream/remove?path=/newton/prep
|
||||
# /stream/remove?path=/newton/prep&start=1234567890.0&end=1234567899.0
|
||||
@cherrypy.expose
|
||||
@cherrypy.tools.json_in()
|
||||
@cherrypy.tools.json_out()
|
||||
@exception_to_httperror(NilmDBError)
|
||||
@cherrypy.tools.CORS_allow(methods = ["POST"])
|
||||
@@ -463,6 +484,12 @@ class Server(object):
|
||||
app_config.update({ 'tools.CORS_allow.on': True,
|
||||
'tools.CORS_allow.methods': ['GET', 'HEAD'] })
|
||||
|
||||
# Configure the 'json_in' tool to also allow other content-types
|
||||
# (like x-www-form-urlencoded), and to treat JSON as a dict that
|
||||
# fills requests.param.
|
||||
app_config.update({ 'tools.json_in.force': False,
|
||||
'tools.json_in.processor': json_to_request_params })
|
||||
|
||||
# Send tracebacks in error responses. They're hidden by the
|
||||
# error_page function for client errors (code 400-499).
|
||||
app_config.update({ 'request.show_tracebacks' : True })
|
||||
|
@@ -8,3 +8,4 @@ from nilmdb.utils.diskusage import du, human_size
|
||||
from nilmdb.utils.mustclose import must_close
|
||||
from nilmdb.utils import atomic
|
||||
import nilmdb.utils.threadsafety
|
||||
import nilmdb.utils.fallocate
|
||||
|
49
nilmdb/utils/fallocate.py
Normal file
49
nilmdb/utils/fallocate.py
Normal file
@@ -0,0 +1,49 @@
|
||||
# Implementation of hole punching via fallocate, if the OS
|
||||
# and filesystem support it.
|
||||
|
||||
try:
|
||||
import os
|
||||
import ctypes
|
||||
import ctypes.util
|
||||
|
||||
def make_fallocate():
|
||||
libc_name = ctypes.util.find_library('c')
|
||||
libc = ctypes.CDLL(libc_name, use_errno=True)
|
||||
|
||||
_fallocate = libc.fallocate
|
||||
_fallocate.restype = ctypes.c_int
|
||||
_fallocate.argtypes = [ ctypes.c_int, ctypes.c_int,
|
||||
ctypes.c_int64, ctypes.c_int64 ]
|
||||
|
||||
del libc
|
||||
del libc_name
|
||||
|
||||
def fallocate(fd, mode, offset, len_):
|
||||
res = _fallocate(fd, mode, offset, len_)
|
||||
if res != 0: # pragma: no cover
|
||||
errno = ctypes.get_errno()
|
||||
raise IOError(errno, os.strerror(errno))
|
||||
return fallocate
|
||||
|
||||
fallocate = make_fallocate()
|
||||
del make_fallocate
|
||||
except Exception: # pragma: no cover
|
||||
fallocate = None
|
||||
|
||||
FALLOC_FL_KEEP_SIZE = 0x01
|
||||
FALLOC_FL_PUNCH_HOLE = 0x02
|
||||
|
||||
def punch_hole(filename, offset, length, ignore_errors = True):
|
||||
"""Punch a hole in the file. This isn't well supported, so errors
|
||||
are ignored by default."""
|
||||
try:
|
||||
if fallocate is None: # pragma: no cover
|
||||
raise IOError("fallocate not available")
|
||||
with open(filename, "r+") as f:
|
||||
fallocate(f.fileno(),
|
||||
FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE,
|
||||
offset, length)
|
||||
except IOError: # pragma: no cover
|
||||
if ignore_errors:
|
||||
return
|
||||
raise
|
@@ -357,43 +357,45 @@ class TestClient(object):
|
||||
client.close()
|
||||
|
||||
def test_client_08_unicode(self):
|
||||
# Basic Unicode tests
|
||||
client = nilmdb.Client(url = testurl)
|
||||
# Try both with and without posting JSON
|
||||
for post_json in (False, True):
|
||||
# Basic Unicode tests
|
||||
client = nilmdb.Client(url = testurl, post_json = post_json)
|
||||
|
||||
# Delete streams that exist
|
||||
for stream in client.stream_list():
|
||||
client.stream_destroy(stream[0])
|
||||
# Delete streams that exist
|
||||
for stream in client.stream_list():
|
||||
client.stream_destroy(stream[0])
|
||||
|
||||
# Database is empty
|
||||
eq_(client.stream_list(), [])
|
||||
# Database is empty
|
||||
eq_(client.stream_list(), [])
|
||||
|
||||
# Create Unicode stream, match it
|
||||
raw = [ u"/düsseldorf/raw", u"uint16_6" ]
|
||||
prep = [ u"/düsseldorf/prep", u"uint16_6" ]
|
||||
client.stream_create(*raw)
|
||||
eq_(client.stream_list(), [raw])
|
||||
eq_(client.stream_list(layout=raw[1]), [raw])
|
||||
eq_(client.stream_list(path=raw[0]), [raw])
|
||||
client.stream_create(*prep)
|
||||
eq_(client.stream_list(), [prep, raw])
|
||||
# Create Unicode stream, match it
|
||||
raw = [ u"/düsseldorf/raw", u"uint16_6" ]
|
||||
prep = [ u"/düsseldorf/prep", u"uint16_6" ]
|
||||
client.stream_create(*raw)
|
||||
eq_(client.stream_list(), [raw])
|
||||
eq_(client.stream_list(layout=raw[1]), [raw])
|
||||
eq_(client.stream_list(path=raw[0]), [raw])
|
||||
client.stream_create(*prep)
|
||||
eq_(client.stream_list(), [prep, raw])
|
||||
|
||||
# Set / get metadata with Unicode keys and values
|
||||
eq_(client.stream_get_metadata(raw[0]), {})
|
||||
eq_(client.stream_get_metadata(prep[0]), {})
|
||||
meta1 = { u"alpha": u"α",
|
||||
u"β": u"beta" }
|
||||
meta2 = { u"alpha": u"α" }
|
||||
meta3 = { u"β": u"beta" }
|
||||
client.stream_set_metadata(prep[0], meta1)
|
||||
client.stream_update_metadata(prep[0], {})
|
||||
client.stream_update_metadata(raw[0], meta2)
|
||||
client.stream_update_metadata(raw[0], meta3)
|
||||
eq_(client.stream_get_metadata(prep[0]), meta1)
|
||||
eq_(client.stream_get_metadata(raw[0]), meta1)
|
||||
eq_(client.stream_get_metadata(raw[0], [ "alpha" ]), meta2)
|
||||
eq_(client.stream_get_metadata(raw[0], [ "alpha", "β" ]), meta1)
|
||||
# Set / get metadata with Unicode keys and values
|
||||
eq_(client.stream_get_metadata(raw[0]), {})
|
||||
eq_(client.stream_get_metadata(prep[0]), {})
|
||||
meta1 = { u"alpha": u"α",
|
||||
u"β": u"beta" }
|
||||
meta2 = { u"alpha": u"α" }
|
||||
meta3 = { u"β": u"beta" }
|
||||
client.stream_set_metadata(prep[0], meta1)
|
||||
client.stream_update_metadata(prep[0], {})
|
||||
client.stream_update_metadata(raw[0], meta2)
|
||||
client.stream_update_metadata(raw[0], meta3)
|
||||
eq_(client.stream_get_metadata(prep[0]), meta1)
|
||||
eq_(client.stream_get_metadata(raw[0]), meta1)
|
||||
eq_(client.stream_get_metadata(raw[0], [ "alpha" ]), meta2)
|
||||
eq_(client.stream_get_metadata(raw[0], [ "alpha", "β" ]), meta1)
|
||||
|
||||
client.close()
|
||||
client.close()
|
||||
|
||||
def test_client_09_closing(self):
|
||||
# Make sure we actually close sockets correctly. New
|
||||
|
@@ -237,3 +237,20 @@ class TestServer(object):
|
||||
"header in response:\n", r.headers)
|
||||
eq_(r.headers["access-control-allow-methods"], "GET, HEAD")
|
||||
eq_(r.headers["access-control-allow-headers"], "X-Custom")
|
||||
|
||||
def test_post_bodies(self):
|
||||
# Test JSON post bodies
|
||||
r = requests.post("http://127.0.0.1:32180/stream/set_metadata",
|
||||
headers = { "Content-Type": "application/json" },
|
||||
data = '{"hello": 1}')
|
||||
eq_(r.status_code, 404) # wrong parameters
|
||||
|
||||
r = requests.post("http://127.0.0.1:32180/stream/set_metadata",
|
||||
headers = { "Content-Type": "application/json" },
|
||||
data = '["hello"]')
|
||||
eq_(r.status_code, 415) # not a dict
|
||||
|
||||
r = requests.post("http://127.0.0.1:32180/stream/set_metadata",
|
||||
headers = { "Content-Type": "application/json" },
|
||||
data = '[hello]')
|
||||
eq_(r.status_code, 400) # badly formatted JSON
|
||||
|
Reference in New Issue
Block a user