Compare commits

...

6 Commits

Author SHA1 Message Date
5292319802 server: consolidate time processing and checks 2013-03-30 21:16:40 -04:00
173121ca87 Switch URL to one that should definitely not resolve 2013-03-30 17:31:35 -04:00
26bab031bd Add StreamInserter.send() to trigger intermediate block send 2013-03-30 17:30:43 -04:00
b5fefffa09 Use a global cached server object for WSGI app
This is instead of caching it inside nilmdb.server.wsgi_application.
Might make things work a bit better in case the web server decides
to call wsgi_application multiple times.
2013-03-30 15:56:57 -04:00
dccb3e370a WSGI config needs to specify application group
This ensures that the same Python sub-instance handles the request,
even if it's coming in from two different virtual hosts.
2013-03-30 15:56:02 -04:00
95ca55aa7e Print out WSGI environment on DB init failure 2013-03-30 15:55:41 -04:00
4 changed files with 47 additions and 43 deletions

View File

@@ -19,8 +19,9 @@ Then, set up Apache with a configuration like:
<VirtualHost> <VirtualHost>
WSGIScriptAlias /nilmdb /home/nilm/nilmdb.wsgi WSGIScriptAlias /nilmdb /home/nilm/nilmdb.wsgi
WSGIProcessGroup nilmdb-server WSGIApplicationGroup nilmdb-appgroup
WSGIDaemonProcess nilmdb-server threads=32 user=nilm group=nilm WSGIProcessGroup nilmdb-procgroup
WSGIDaemonProcess nilmdb-procgroup threads=32 user=nilm group=nilm
# Access control example: # Access control example:
<Location /nilmdb> <Location /nilmdb>

View File

@@ -313,6 +313,11 @@ class StreamInserter(object):
part of a new interval and there may be a gap left in-between.""" part of a new interval and there may be a gap left in-between."""
self._send_block(final = True) self._send_block(final = True)
def send(self):
"""Send any data that we might have buffered up. Does not affect
any other treatment of timestamps or endpoints."""
self._send_block(final = False)
def _get_first_noncomment(self, block): def _get_first_noncomment(self, block):
"""Return the (start, end) indices of the first full line in """Return the (start, end) indices of the first full line in
block that isn't a comment, or raise IndexError if block that isn't a comment, or raise IndexError if

View File

@@ -11,6 +11,7 @@ from nilmdb.utils.time import string_to_timestamp
import cherrypy import cherrypy
import sys import sys
import os import os
import socket
import simplejson as json import simplejson as json
import decorator import decorator
import psutil import psutil
@@ -173,6 +174,21 @@ class Root(NilmApp):
class Stream(NilmApp): class Stream(NilmApp):
"""Stream-specific operations""" """Stream-specific operations"""
# Helpers
def _get_times(self, start_param, end_param):
(start, end) = (None, None)
if start_param is not None:
start = string_to_timestamp(start_param)
if end_param is not None:
end = string_to_timestamp(end_param)
if start is not None and end is not None:
if start >= end:
raise cherrypy.HTTPError(
"400 Bad Request",
sprintf("start must precede end (%s >= %s)",
start_param, end_param))
return (start, end)
# /stream/list # /stream/list
# /stream/list?layout=float32_8 # /stream/list?layout=float32_8
# /stream/list?path=/newton/prep&extended=1 # /stream/list?path=/newton/prep&extended=1
@@ -306,11 +322,7 @@ class Stream(NilmApp):
raise cherrypy.HTTPError("404 Not Found", "No such stream") raise cherrypy.HTTPError("404 Not Found", "No such stream")
# Check limits # Check limits
start = string_to_timestamp(start) (start, end) = self._get_times(start, end)
end = string_to_timestamp(end)
if start >= end:
raise cherrypy.HTTPError("400 Bad Request",
"start must precede end")
# Pass the data directly to nilmdb, which will parse it and # Pass the data directly to nilmdb, which will parse it and
# raise a ValueError if there are any problems. # raise a ValueError if there are any problems.
@@ -332,14 +344,7 @@ class Stream(NilmApp):
the interval [start, end). Returns the number of data points the interval [start, end). Returns the number of data points
removed. removed.
""" """
if start is not None: (start, end) = self._get_times(start, end)
start = string_to_timestamp(start)
if end is not None:
end = string_to_timestamp(end)
if start is not None and end is not None:
if start >= end:
raise cherrypy.HTTPError("400 Bad Request",
"start must precede end")
total_removed = 0 total_removed = 0
while True: while True:
(removed, restart) = self.db.stream_remove(path, start, end) (removed, restart) = self.db.stream_remove(path, start, end)
@@ -370,15 +375,7 @@ class Stream(NilmApp):
Note that the response type is the non-standard Note that the response type is the non-standard
'application/x-json-stream' for lack of a better option. 'application/x-json-stream' for lack of a better option.
""" """
if start is not None: (start, end) = self._get_times(start, end)
start = string_to_timestamp(start)
if end is not None:
end = string_to_timestamp(end)
if start is not None and end is not None:
if start >= end:
raise cherrypy.HTTPError("400 Bad Request",
"start must precede end")
if len(self.db.stream_list(path = path)) != 1: if len(self.db.stream_list(path = path)) != 1:
raise cherrypy.HTTPError("404", "No such stream: " + path) raise cherrypy.HTTPError("404", "No such stream: " + path)
@@ -416,16 +413,7 @@ class Stream(NilmApp):
If 'markup' is True, adds comments to the stream denoting each If 'markup' is True, adds comments to the stream denoting each
interval's start and end timestamp. interval's start and end timestamp.
""" """
if start is not None: (start, end) = self._get_times(start, end)
start = string_to_timestamp(start)
if end is not None:
end = string_to_timestamp(end)
# Check parameters
if start is not None and end is not None:
if start >= end:
raise cherrypy.HTTPError("400 Bad Request",
"start must precede end")
# Check path and get layout # Check path and get layout
streams = self.db.stream_list(path = path) streams = self.db.stream_list(path = path)
@@ -608,6 +596,11 @@ class Server(object):
def stop(self): def stop(self):
cherrypy.engine.exit() cherrypy.engine.exit()
# Use a single global nilmdb.server.NilmDB and nilmdb.server.Server
# instance since the database can only be opened once. For this to
# work, the web server must use only a single process and single
# Python interpreter. Multiple threads are OK.
_wsgi_server = None
def wsgi_application(dbpath, basepath): # pragma: no cover def wsgi_application(dbpath, basepath): # pragma: no cover
"""Return a WSGI application object with a database at the """Return a WSGI application object with a database at the
specified path. specified path.
@@ -618,29 +611,33 @@ def wsgi_application(dbpath, basepath): # pragma: no cover
is the same as the first argument to Apache's WSGIScriptAlias is the same as the first argument to Apache's WSGIScriptAlias
directive. directive.
""" """
server = [None]
def application(environ, start_response): def application(environ, start_response):
if server[0] is None: global _wsgi_server
if _wsgi_server is None:
# Try to start the server # Try to start the server
try: try:
db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(dbpath) db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(dbpath)
server[0] = nilmdb.server.Server( _wsgi_server = nilmdb.server.Server(
db, embedded = True, db, embedded = True,
basepath = basepath.rstrip('/')) basepath = basepath.rstrip('/'))
except Exception: except Exception:
# Build an error message on failure # Build an error message on failure
import pprint
err = sprintf("Initializing database at path '%s' failed:\n\n", err = sprintf("Initializing database at path '%s' failed:\n\n",
dbpath) dbpath)
err += traceback.format_exc() err += traceback.format_exc()
try: try:
import pwd import pwd
import grp import grp
err += sprintf("\nRunning as: uid=%d (%s), gid=%d (%s)\n", err += sprintf("\nRunning as: uid=%d (%s), gid=%d (%s) "
"on host %s, pid %d\n",
os.getuid(), pwd.getpwuid(os.getuid())[0], os.getuid(), pwd.getpwuid(os.getuid())[0],
os.getgid(), grp.getgrgid(os.getgid())[0]) os.getgid(), grp.getgrgid(os.getgid())[0],
socket.gethostname(), os.getpid())
except ImportError: except ImportError:
pass pass
if server[0] is None: err += sprintf("\nEnvironment:\n%s\n", pprint.pformat(environ))
if _wsgi_server is None:
# Serve up the error with our own mini WSGI app. # Serve up the error with our own mini WSGI app.
headers = [ ('Content-type', 'text/plain'), headers = [ ('Content-type', 'text/plain'),
('Content-length', str(len(err))) ] ('Content-length', str(len(err))) ]
@@ -648,5 +645,5 @@ def wsgi_application(dbpath, basepath): # pragma: no cover
return [err] return [err]
# Call the normal application # Call the normal application
return server[0].wsgi_application(environ, start_response) return _wsgi_server.wsgi_application(environ, start_response)
return application return application

View File

@@ -311,11 +311,11 @@ class TestClient(object):
# Trigger a curl error in generator # Trigger a curl error in generator
with assert_raises(ServerError) as e: with assert_raises(ServerError) as e:
client.http.get_gen("http://nosuchurl/").next() client.http.get_gen("http://nosuchurl.example.com./").next()
# Trigger a curl error in generator # Trigger a curl error in generator
with assert_raises(ServerError) as e: with assert_raises(ServerError) as e:
client.http.get_gen("http://nosuchurl/").next() client.http.get_gen("http://nosuchurl.example.com./").next()
# Check 404 for missing streams # Check 404 for missing streams
for function in [ client.stream_intervals, client.stream_extract ]: for function in [ client.stream_intervals, client.stream_extract ]:
@@ -460,6 +460,7 @@ class TestClient(object):
ctx.update_start(109) ctx.update_start(109)
ctx.insert("110 1\n") ctx.insert("110 1\n")
ctx.insert("111 1\n") ctx.insert("111 1\n")
ctx.send()
ctx.insert("112 1\n") ctx.insert("112 1\n")
ctx.insert("113 1\n") ctx.insert("113 1\n")
ctx.insert("114 1\n") ctx.insert("114 1\n")