Compare commits

...

8 Commits

Author SHA1 Message Date
fe91ff59a3 Better handling of JSON requests 2013-03-05 12:38:08 -05:00
64c24a00d6 Add --traceback argument to nilmdb-server script 2013-03-05 12:20:07 -05:00
58c0ae72f6 Support application/json POST bodies as well as x-www-form-urlencoded 2013-03-05 11:54:29 -05:00
c5f079f61f When removing data from files, try to punch a hole.
Requires fallocate(2) support with FALLOC_FL_PUNCH_HOLE, as
well as a filesystem that supports it (in Linux 3.7,
tmpfs, btrfs, xfs, or ext4)
2013-03-04 20:31:14 -05:00
2d45466f66 Print version at server startup 2013-03-04 15:43:45 -05:00
c6a0e6e96f More complete CORS handling, including preflight requests (hopefully) 2013-03-04 15:40:35 -05:00
79755dc624 Fix Allow: header by switching to cherrypy's built in tools.allow().
Replaces custom tools.allow_methods which didn't return the Allow: header.
2013-03-04 14:08:37 -05:00
c512631184 bulkdata: Build up rows and write to disk all at once 2013-03-03 12:03:44 -05:00
11 changed files with 286 additions and 110 deletions

View File

@@ -21,8 +21,12 @@ def extract_timestamp(line):
class Client(object):
"""Main client interface to the Nilm database."""
def __init__(self, url):
self.http = nilmdb.client.httpclient.HTTPClient(url)
def __init__(self, url, post_json = False):
"""Initialize client with given URL. If post_json is true,
POST requests are sent with Content-Type 'application/json'
instead of the default 'x-www-form-urlencoded'."""
self.http = nilmdb.client.httpclient.HTTPClient(url, post_json)
self.post_json = post_json
# __enter__/__exit__ allow this class to be a context manager
def __enter__(self):
@@ -31,8 +35,11 @@ class Client(object):
def __exit__(self, exc_type, exc_value, traceback):
self.close()
def _json_param(self, data):
def _json_post_param(self, data):
"""Return compact json-encoded version of parameter"""
if self.post_json:
# If we're posting as JSON, we don't need to encode it further here
return data
return json.dumps(data, separators=(',',':'))
def close(self):
@@ -73,7 +80,7 @@ class Client(object):
metadata."""
params = {
"path": path,
"data": self._json_param(data)
"data": self._json_post_param(data)
}
return self.http.post("stream/set_metadata", params)
@@ -81,7 +88,7 @@ class Client(object):
"""Update stream metadata from a dictionary"""
params = {
"path": path,
"data": self._json_param(data)
"data": self._json_post_param(data)
}
return self.http.post("stream/update_metadata", params)

View File

@@ -10,7 +10,7 @@ import requests
class HTTPClient(object):
"""Class to manage and perform HTTP requests from the client"""
def __init__(self, baseurl = ""):
def __init__(self, baseurl = "", post_json = False):
"""If baseurl is supplied, all other functions that take
a URL can be given a relative URL instead."""
# Verify / clean up URL
@@ -26,6 +26,10 @@ class HTTPClient(object):
# Saved response, so that tests can verify a few things.
self._last_response = {}
# Whether to send application/json POST bodies (versus
# x-www-form-urlencoded)
self.post_json = post_json
def _handle_error(self, url, code, body):
# Default variables for exception. We use the entire body as
# the default message, in case we can't extract it from a JSON
@@ -57,13 +61,14 @@ class HTTPClient(object):
def close(self):
self.session.close()
def _do_req(self, method, url, query_data, body_data, stream):
def _do_req(self, method, url, query_data, body_data, stream, headers):
url = urlparse.urljoin(self.baseurl, url)
try:
response = self.session.request(method, url,
params = query_data,
data = body_data,
stream = stream)
stream = stream,
headers = headers)
except requests.RequestException as e:
raise ServerError(status = "502 Error", url = url,
message = str(e.message))
@@ -77,12 +82,13 @@ class HTTPClient(object):
return (response, False)
# Normal versions that return data directly
def _req(self, method, url, query = None, body = None):
def _req(self, method, url, query = None, body = None, headers = None):
"""
Make a request and return the body data as a string or parsed
JSON object, or raise an error if it contained an error.
"""
(response, isjson) = self._do_req(method, url, query, body, False)
(response, isjson) = self._do_req(method, url, query, body,
stream = False, headers = headers)
if isjson:
return json.loads(response.content)
return response.content
@@ -93,20 +99,26 @@ class HTTPClient(object):
def post(self, url, params = None):
"""Simple POST (parameters in body)"""
return self._req("POST", url, None, params)
if self.post_json:
return self._req("POST", url, None,
json.dumps(params),
{ 'Content-type': 'application/json' })
else:
return self._req("POST", url, None, params)
def put(self, url, data, params = None):
"""Simple PUT (parameters in URL, data in body)"""
return self._req("PUT", url, params, data)
# Generator versions that return data one line at a time.
def _req_gen(self, method, url, query = None, body = None):
def _req_gen(self, method, url, query = None, body = None, headers = None):
"""
Make a request and return a generator that gives back strings
or JSON decoded lines of the body data, or raise an error if
it contained an eror.
"""
(response, isjson) = self._do_req(method, url, query, body, True)
(response, isjson) = self._do_req(method, url, query, body,
stream = True, headers = headers)
for line in response.iter_lines():
if isjson:
yield json.loads(line)

View File

@@ -28,6 +28,9 @@ def main():
group.add_argument('-n', '--nosync', help = 'Use asynchronous '
'commits for sqlite transactions',
action = 'store_true', default = False)
group.add_argument('-t', '--traceback',
help = 'Provide tracebacks in client errors',
action = 'store_true', default = False)
group = parser.add_argument_group("Debug options")
group.add_argument('-y', '--yappi', help = 'Run under yappi profiler and '
@@ -49,10 +52,12 @@ def main():
server = nilmdb.server.Server(db,
host = args.address,
port = args.port,
embedded = embedded)
embedded = embedded,
force_traceback = args.traceback)
# Print info
if not args.quiet:
print "Version: %s" % nilmdb.__version__
print "Database: %s" % (os.path.realpath(args.database))
if args.address == '0.0.0.0' or args.address == '::':
host = socket.getfqdn()

View File

@@ -221,9 +221,11 @@ class File(object):
# An optimized verison of append, to avoid flushing the file
# and resizing the mmap after each data point.
try:
rows = []
for i in xrange(count):
row = dataiter.next()
self._f.write(packer(*row))
rows.append(packer(*row))
self._f.write("".join(rows))
finally:
self._f.flush()
self.size = self._f.tell()
@@ -408,8 +410,16 @@ class Table(object):
def _remove_rows(self, subdir, filename, start, stop):
"""Helper to mark specific rows as being removed from a
file, and potentially removing or truncating the file itself."""
# Import an existing list of deleted rows for this file
file, and potentially remove or truncate the file itself."""
# Close potentially open file in file_open LRU cache
self.file_open.cache_remove(self, subdir, filename)
# We keep a file like 0000.removed that contains a list of
# which rows have been "removed". Note that we never have to
# remove entries from this list, because we never decrease
# self.nrows, and so we will never overwrite those locations in the
# file. Only when the list covers the entire extent of the
# file will that file be removed.
datafile = os.path.join(self.root, subdir, filename)
cachefile = datafile + ".removed"
try:
@@ -463,6 +473,14 @@ class Table(object):
except:
pass
else:
# File needs to stick around. This means we can get
# degenerate cases where we have large files containing as
# little as one row. Try to punch a hole in the file,
# so that this region doesn't take up filesystem space.
offset = start * self.packer.size
count = (stop - start) * self.packer.size
nilmdb.utils.fallocate.punch_hole(datafile, offset, count)
# Update cache. Try to do it atomically.
nilmdb.utils.atomic.replace_file(cachefile,
pickle.dumps(merged, 2))

View File

@@ -71,16 +71,59 @@ def exception_to_httperror(*expected):
# care of that.
return decorator.decorator(wrapper)
# Custom Cherrypy tools
def allow_methods(methods):
method = cherrypy.request.method.upper()
if method not in methods:
if method in cherrypy.request.methods_with_bodies:
cherrypy.request.body.read()
allowed = ', '.join(methods)
cherrypy.response.headers['Allow'] = allowed
raise cherrypy.HTTPError(405, method + " not allowed; use " + allowed)
cherrypy.tools.allow_methods = cherrypy.Tool('before_handler', allow_methods)
# Custom CherryPy tools
def CORS_allow(methods):
"""This does several things:
Handles CORS preflight requests.
Adds Allow: header to all requests.
Raise 405 if request.method not in method.
It is similar to cherrypy.tools.allow, with the CORS stuff added.
"""
request = cherrypy.request.headers
response = cherrypy.response.headers
if not isinstance(methods, (tuple, list)): # pragma: no cover
methods = [ methods ]
methods = [ m.upper() for m in methods if m ]
if not methods: # pragma: no cover
methods = [ 'GET', 'HEAD' ]
elif 'GET' in methods and 'HEAD' not in methods: # pragma: no cover
methods.append('HEAD')
response['Allow'] = ', '.join(methods)
# Allow all origins
if 'Origin' in request:
response['Access-Control-Allow-Origin'] = request['Origin']
# If it's a CORS request, send response.
request_method = request.get("Access-Control-Request-Method", None)
request_headers = request.get("Access-Control-Request-Headers", None)
if (cherrypy.request.method == "OPTIONS" and
request_method and request_headers):
response['Access-Control-Allow-Headers'] = request_headers
response['Access-Control-Allow-Methods'] = ', '.join(methods)
# Try to stop further processing and return a 200 OK
cherrypy.response.status = "200 OK"
cherrypy.response.body = ""
cherrypy.request.handler = lambda: ""
return
# Reject methods that were not explicitly allowed
if cherrypy.request.method not in methods:
raise cherrypy.HTTPError(405)
cherrypy.tools.CORS_allow = cherrypy.Tool('on_start_resource', CORS_allow)
# Helper for json_in tool to process JSON data into normal request
# parameters.
def json_to_request_params(body):
cherrypy.lib.jsontools.json_processor(body)
if not isinstance(cherrypy.request.json, dict):
raise cherrypy.HTTPError(415)
cherrypy.request.params.update(cherrypy.request.json)
# CherryPy apps
class Root(NilmApp):
@@ -140,9 +183,10 @@ class Stream(NilmApp):
# /stream/create?path=/newton/prep&layout=PrepData
@cherrypy.expose
@cherrypy.tools.json_in()
@cherrypy.tools.json_out()
@exception_to_httperror(NilmDBError, ValueError)
@cherrypy.tools.allow_methods(methods = ["POST"])
@cherrypy.tools.CORS_allow(methods = ["POST"])
def create(self, path, layout):
"""Create a new stream in the database. Provide path
and one of the nilmdb.layout.layouts keys.
@@ -151,9 +195,10 @@ class Stream(NilmApp):
# /stream/destroy?path=/newton/prep
@cherrypy.expose
@cherrypy.tools.json_in()
@cherrypy.tools.json_out()
@exception_to_httperror(NilmDBError)
@cherrypy.tools.allow_methods(methods = ["POST"])
@cherrypy.tools.CORS_allow(methods = ["POST"])
def destroy(self, path):
"""Delete a stream and its associated data."""
return self.db.stream_destroy(path)
@@ -184,31 +229,41 @@ class Stream(NilmApp):
# /stream/set_metadata?path=/newton/prep&data=<json>
@cherrypy.expose
@cherrypy.tools.json_in()
@cherrypy.tools.json_out()
@exception_to_httperror(NilmDBError, LookupError, TypeError)
@cherrypy.tools.allow_methods(methods = ["POST"])
@exception_to_httperror(NilmDBError, LookupError)
@cherrypy.tools.CORS_allow(methods = ["POST"])
def set_metadata(self, path, data):
"""Set metadata for the named stream, replacing any
existing metadata. Data should be a json-encoded
dictionary"""
data_dict = json.loads(data)
self.db.stream_set_metadata(path, data_dict)
"""Set metadata for the named stream, replacing any existing
metadata. Data can be json-encoded or a plain dictionary (if
it was sent as application/json to begin with)"""
if not isinstance(data, dict):
try:
data = dict(json.loads(data))
except TypeError as e:
raise NilmDBError("can't parse 'data' parameter: " + e.message)
self.db.stream_set_metadata(path, data)
# /stream/update_metadata?path=/newton/prep&data=<json>
@cherrypy.expose
@cherrypy.tools.json_in()
@cherrypy.tools.json_out()
@exception_to_httperror(NilmDBError, LookupError, TypeError)
@cherrypy.tools.allow_methods(methods = ["POST"])
@exception_to_httperror(NilmDBError, LookupError)
@cherrypy.tools.CORS_allow(methods = ["POST"])
def update_metadata(self, path, data):
"""Update metadata for the named stream. Data
should be a json-encoded dictionary"""
data_dict = json.loads(data)
self.db.stream_update_metadata(path, data_dict)
if not isinstance(data, dict):
try:
data = dict(json.loads(data))
except TypeError as e:
raise NilmDBError("can't parse 'data' parameter: " + e.message)
self.db.stream_update_metadata(path, data)
# /stream/insert?path=/newton/prep
@cherrypy.expose
@cherrypy.tools.json_out()
@cherrypy.tools.allow_methods(methods = ["PUT"])
@cherrypy.tools.CORS_allow(methods = ["PUT"])
def insert(self, path, start, end):
"""
Insert new data into the database. Provide textual data
@@ -262,9 +317,10 @@ class Stream(NilmApp):
# /stream/remove?path=/newton/prep
# /stream/remove?path=/newton/prep&start=1234567890.0&end=1234567899.0
@cherrypy.expose
@cherrypy.tools.json_in()
@cherrypy.tools.json_out()
@exception_to_httperror(NilmDBError)
@cherrypy.tools.allow_methods(methods = ["POST"])
@cherrypy.tools.CORS_allow(methods = ["POST"])
def remove(self, path, start = None, end = None):
"""
Remove data from the backend database. Removes all data in
@@ -419,16 +475,20 @@ class Server(object):
'error_page.default': self.json_error_page,
})
# Send a permissive Access-Control-Allow-Origin (CORS) header
# with all responses so that browsers can send cross-domain
# requests to this server.
app_config.update({ 'response.headers.Access-Control-Allow-Origin':
'*' })
# Some default headers to just help identify that things are working
app_config.update({ 'response.headers.X-Jim-Is-Awesome': 'yeah' })
# Only allow GET and HEAD by default. Individual handlers
# can override.
app_config.update({ 'tools.allow_methods.on': True,
'tools.allow_methods.methods': ['GET', 'HEAD'] })
# Set up Cross-Origin Resource Sharing (CORS) handler so we
# can correctly respond to browsers' CORS preflight requests.
# This also limits verbs to GET and HEAD by default.
app_config.update({ 'tools.CORS_allow.on': True,
'tools.CORS_allow.methods': ['GET', 'HEAD'] })
# Configure the 'json_in' tool to also allow other content-types
# (like x-www-form-urlencoded), and to treat JSON as a dict that
# fills requests.param.
app_config.update({ 'tools.json_in.force': False,
'tools.json_in.processor': json_to_request_params })
# Send tracebacks in error responses. They're hidden by the
# error_page function for client errors (code 400-499).

View File

@@ -8,3 +8,4 @@ from nilmdb.utils.diskusage import du, human_size
from nilmdb.utils.mustclose import must_close
from nilmdb.utils import atomic
import nilmdb.utils.threadsafety
import nilmdb.utils.fallocate

49
nilmdb/utils/fallocate.py Normal file
View File

@@ -0,0 +1,49 @@
# Implementation of hole punching via fallocate, if the OS
# and filesystem support it.
try:
import os
import ctypes
import ctypes.util
def make_fallocate():
libc_name = ctypes.util.find_library('c')
libc = ctypes.CDLL(libc_name, use_errno=True)
_fallocate = libc.fallocate
_fallocate.restype = ctypes.c_int
_fallocate.argtypes = [ ctypes.c_int, ctypes.c_int,
ctypes.c_int64, ctypes.c_int64 ]
del libc
del libc_name
def fallocate(fd, mode, offset, len_):
res = _fallocate(fd, mode, offset, len_)
if res != 0: # pragma: no cover
errno = ctypes.get_errno()
raise IOError(errno, os.strerror(errno))
return fallocate
fallocate = make_fallocate()
del make_fallocate
except Exception: # pragma: no cover
fallocate = None
FALLOC_FL_KEEP_SIZE = 0x01
FALLOC_FL_PUNCH_HOLE = 0x02
def punch_hole(filename, offset, length, ignore_errors = True):
"""Punch a hole in the file. This isn't well supported, so errors
are ignored by default."""
try:
if fallocate is None: # pragma: no cover
raise IOError("fallocate not available")
with open(filename, "r+") as f:
fallocate(f.fileno(),
FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE,
offset, length)
except IOError: # pragma: no cover
if ignore_errors:
return
raise

View File

@@ -55,12 +55,6 @@ class TestClient(object):
client.version()
client.close()
# Trigger same error with a PUT request
client = nilmdb.Client(url = "http://localhost:1/")
with assert_raises(nilmdb.client.ServerError):
client.version()
client.close()
# Then a fake URL on a real host
client = nilmdb.Client(url = "http://localhost:32180/fake/")
with assert_raises(nilmdb.client.ClientError):
@@ -360,52 +354,48 @@ class TestClient(object):
raise AssertionError("/stream/extract is not text/plain:\n" +
headers())
# Make sure Access-Control-Allow-Origin gets set
if "access-control-allow-origin: " not in headers():
raise AssertionError("No Access-Control-Allow-Origin (CORS) "
"header in /stream/extract response:\n" +
headers())
client.close()
def test_client_08_unicode(self):
# Basic Unicode tests
client = nilmdb.Client(url = testurl)
# Try both with and without posting JSON
for post_json in (False, True):
# Basic Unicode tests
client = nilmdb.Client(url = testurl, post_json = post_json)
# Delete streams that exist
for stream in client.stream_list():
client.stream_destroy(stream[0])
# Delete streams that exist
for stream in client.stream_list():
client.stream_destroy(stream[0])
# Database is empty
eq_(client.stream_list(), [])
# Database is empty
eq_(client.stream_list(), [])
# Create Unicode stream, match it
raw = [ u"/düsseldorf/raw", u"uint16_6" ]
prep = [ u"/düsseldorf/prep", u"uint16_6" ]
client.stream_create(*raw)
eq_(client.stream_list(), [raw])
eq_(client.stream_list(layout=raw[1]), [raw])
eq_(client.stream_list(path=raw[0]), [raw])
client.stream_create(*prep)
eq_(client.stream_list(), [prep, raw])
# Create Unicode stream, match it
raw = [ u"/düsseldorf/raw", u"uint16_6" ]
prep = [ u"/düsseldorf/prep", u"uint16_6" ]
client.stream_create(*raw)
eq_(client.stream_list(), [raw])
eq_(client.stream_list(layout=raw[1]), [raw])
eq_(client.stream_list(path=raw[0]), [raw])
client.stream_create(*prep)
eq_(client.stream_list(), [prep, raw])
# Set / get metadata with Unicode keys and values
eq_(client.stream_get_metadata(raw[0]), {})
eq_(client.stream_get_metadata(prep[0]), {})
meta1 = { u"alpha": u"α",
u"β": u"beta" }
meta2 = { u"alpha": u"α" }
meta3 = { u"β": u"beta" }
client.stream_set_metadata(prep[0], meta1)
client.stream_update_metadata(prep[0], {})
client.stream_update_metadata(raw[0], meta2)
client.stream_update_metadata(raw[0], meta3)
eq_(client.stream_get_metadata(prep[0]), meta1)
eq_(client.stream_get_metadata(raw[0]), meta1)
eq_(client.stream_get_metadata(raw[0], [ "alpha" ]), meta2)
eq_(client.stream_get_metadata(raw[0], [ "alpha", "β" ]), meta1)
# Set / get metadata with Unicode keys and values
eq_(client.stream_get_metadata(raw[0]), {})
eq_(client.stream_get_metadata(prep[0]), {})
meta1 = { u"alpha": u"α",
u"β": u"beta" }
meta2 = { u"alpha": u"α" }
meta3 = { u"β": u"beta" }
client.stream_set_metadata(prep[0], meta1)
client.stream_update_metadata(prep[0], {})
client.stream_update_metadata(raw[0], meta2)
client.stream_update_metadata(raw[0], meta3)
eq_(client.stream_get_metadata(prep[0]), meta1)
eq_(client.stream_get_metadata(raw[0]), meta1)
eq_(client.stream_get_metadata(raw[0], [ "alpha" ]), meta2)
eq_(client.stream_get_metadata(raw[0], [ "alpha", "β" ]), meta1)
client.close()
client.close()
def test_client_09_closing(self):
# Make sure we actually close sockets correctly. New

View File

@@ -11,12 +11,7 @@ from nose.tools import assert_raises
import itertools
import os
import re
import shutil
import sys
import threading
import urllib2
from urllib2 import urlopen, HTTPError
import Queue
import StringIO
import shlex

View File

@@ -9,14 +9,7 @@ from nose.tools import assert_raises
import distutils.version
import itertools
import os
import shutil
import sys
import cherrypy
import threading
import urllib2
from urllib2 import urlopen, HTTPError
import Queue
import cStringIO
import random
import unittest

View File

@@ -6,15 +6,13 @@ import distutils.version
import simplejson as json
import itertools
import os
import shutil
import sys
import cherrypy
import threading
import urllib2
from urllib2 import urlopen, HTTPError
import Queue
import cStringIO
import time
import requests
from nilmdb.utils import serializer_proxy
@@ -208,3 +206,51 @@ class TestServer(object):
data = getjson("/stream/get_metadata?path=/newton/prep"
"&key=foo")
eq_(data, {'foo': None})
def test_cors_headers(self):
# Test that CORS headers are being set correctly
# Normal GET should send simple response
url = "http://127.0.0.1:32180/stream/list"
r = requests.get(url, headers = { "Origin": "http://google.com/" })
eq_(r.status_code, 200)
if "access-control-allow-origin" not in r.headers:
raise AssertionError("No Access-Control-Allow-Origin (CORS) "
"header in response:\n", r.headers)
eq_(r.headers["access-control-allow-origin"], "http://google.com/")
# OPTIONS without CORS preflight headers should result in 405
r = requests.options(url, headers = {
"Origin": "http://google.com/",
})
eq_(r.status_code, 405)
# OPTIONS with preflight headers should give preflight response
r = requests.options(url, headers = {
"Origin": "http://google.com/",
"Access-Control-Request-Method": "POST",
"Access-Control-Request-Headers": "X-Custom",
})
eq_(r.status_code, 200)
if "access-control-allow-origin" not in r.headers:
raise AssertionError("No Access-Control-Allow-Origin (CORS) "
"header in response:\n", r.headers)
eq_(r.headers["access-control-allow-methods"], "GET, HEAD")
eq_(r.headers["access-control-allow-headers"], "X-Custom")
def test_post_bodies(self):
# Test JSON post bodies
r = requests.post("http://127.0.0.1:32180/stream/set_metadata",
headers = { "Content-Type": "application/json" },
data = '{"hello": 1}')
eq_(r.status_code, 404) # wrong parameters
r = requests.post("http://127.0.0.1:32180/stream/set_metadata",
headers = { "Content-Type": "application/json" },
data = '["hello"]')
eq_(r.status_code, 415) # not a dict
r = requests.post("http://127.0.0.1:32180/stream/set_metadata",
headers = { "Content-Type": "application/json" },
data = '[hello]')
eq_(r.status_code, 400) # badly formatted JSON