Compare commits
6 Commits
nilmdb-1.4
...
nilmdb-1.4
Author | SHA1 | Date | |
---|---|---|---|
5292319802 | |||
173121ca87 | |||
26bab031bd | |||
b5fefffa09 | |||
dccb3e370a | |||
95ca55aa7e |
@@ -19,8 +19,9 @@ Then, set up Apache with a configuration like:
|
|||||||
|
|
||||||
<VirtualHost>
|
<VirtualHost>
|
||||||
WSGIScriptAlias /nilmdb /home/nilm/nilmdb.wsgi
|
WSGIScriptAlias /nilmdb /home/nilm/nilmdb.wsgi
|
||||||
WSGIProcessGroup nilmdb-server
|
WSGIApplicationGroup nilmdb-appgroup
|
||||||
WSGIDaemonProcess nilmdb-server threads=32 user=nilm group=nilm
|
WSGIProcessGroup nilmdb-procgroup
|
||||||
|
WSGIDaemonProcess nilmdb-procgroup threads=32 user=nilm group=nilm
|
||||||
|
|
||||||
# Access control example:
|
# Access control example:
|
||||||
<Location /nilmdb>
|
<Location /nilmdb>
|
||||||
|
@@ -313,6 +313,11 @@ class StreamInserter(object):
|
|||||||
part of a new interval and there may be a gap left in-between."""
|
part of a new interval and there may be a gap left in-between."""
|
||||||
self._send_block(final = True)
|
self._send_block(final = True)
|
||||||
|
|
||||||
|
def send(self):
|
||||||
|
"""Send any data that we might have buffered up. Does not affect
|
||||||
|
any other treatment of timestamps or endpoints."""
|
||||||
|
self._send_block(final = False)
|
||||||
|
|
||||||
def _get_first_noncomment(self, block):
|
def _get_first_noncomment(self, block):
|
||||||
"""Return the (start, end) indices of the first full line in
|
"""Return the (start, end) indices of the first full line in
|
||||||
block that isn't a comment, or raise IndexError if
|
block that isn't a comment, or raise IndexError if
|
||||||
|
@@ -11,6 +11,7 @@ from nilmdb.utils.time import string_to_timestamp
|
|||||||
import cherrypy
|
import cherrypy
|
||||||
import sys
|
import sys
|
||||||
import os
|
import os
|
||||||
|
import socket
|
||||||
import simplejson as json
|
import simplejson as json
|
||||||
import decorator
|
import decorator
|
||||||
import psutil
|
import psutil
|
||||||
@@ -173,6 +174,21 @@ class Root(NilmApp):
|
|||||||
class Stream(NilmApp):
|
class Stream(NilmApp):
|
||||||
"""Stream-specific operations"""
|
"""Stream-specific operations"""
|
||||||
|
|
||||||
|
# Helpers
|
||||||
|
def _get_times(self, start_param, end_param):
|
||||||
|
(start, end) = (None, None)
|
||||||
|
if start_param is not None:
|
||||||
|
start = string_to_timestamp(start_param)
|
||||||
|
if end_param is not None:
|
||||||
|
end = string_to_timestamp(end_param)
|
||||||
|
if start is not None and end is not None:
|
||||||
|
if start >= end:
|
||||||
|
raise cherrypy.HTTPError(
|
||||||
|
"400 Bad Request",
|
||||||
|
sprintf("start must precede end (%s >= %s)",
|
||||||
|
start_param, end_param))
|
||||||
|
return (start, end)
|
||||||
|
|
||||||
# /stream/list
|
# /stream/list
|
||||||
# /stream/list?layout=float32_8
|
# /stream/list?layout=float32_8
|
||||||
# /stream/list?path=/newton/prep&extended=1
|
# /stream/list?path=/newton/prep&extended=1
|
||||||
@@ -306,11 +322,7 @@ class Stream(NilmApp):
|
|||||||
raise cherrypy.HTTPError("404 Not Found", "No such stream")
|
raise cherrypy.HTTPError("404 Not Found", "No such stream")
|
||||||
|
|
||||||
# Check limits
|
# Check limits
|
||||||
start = string_to_timestamp(start)
|
(start, end) = self._get_times(start, end)
|
||||||
end = string_to_timestamp(end)
|
|
||||||
if start >= end:
|
|
||||||
raise cherrypy.HTTPError("400 Bad Request",
|
|
||||||
"start must precede end")
|
|
||||||
|
|
||||||
# Pass the data directly to nilmdb, which will parse it and
|
# Pass the data directly to nilmdb, which will parse it and
|
||||||
# raise a ValueError if there are any problems.
|
# raise a ValueError if there are any problems.
|
||||||
@@ -332,14 +344,7 @@ class Stream(NilmApp):
|
|||||||
the interval [start, end). Returns the number of data points
|
the interval [start, end). Returns the number of data points
|
||||||
removed.
|
removed.
|
||||||
"""
|
"""
|
||||||
if start is not None:
|
(start, end) = self._get_times(start, end)
|
||||||
start = string_to_timestamp(start)
|
|
||||||
if end is not None:
|
|
||||||
end = string_to_timestamp(end)
|
|
||||||
if start is not None and end is not None:
|
|
||||||
if start >= end:
|
|
||||||
raise cherrypy.HTTPError("400 Bad Request",
|
|
||||||
"start must precede end")
|
|
||||||
total_removed = 0
|
total_removed = 0
|
||||||
while True:
|
while True:
|
||||||
(removed, restart) = self.db.stream_remove(path, start, end)
|
(removed, restart) = self.db.stream_remove(path, start, end)
|
||||||
@@ -370,15 +375,7 @@ class Stream(NilmApp):
|
|||||||
Note that the response type is the non-standard
|
Note that the response type is the non-standard
|
||||||
'application/x-json-stream' for lack of a better option.
|
'application/x-json-stream' for lack of a better option.
|
||||||
"""
|
"""
|
||||||
if start is not None:
|
(start, end) = self._get_times(start, end)
|
||||||
start = string_to_timestamp(start)
|
|
||||||
if end is not None:
|
|
||||||
end = string_to_timestamp(end)
|
|
||||||
|
|
||||||
if start is not None and end is not None:
|
|
||||||
if start >= end:
|
|
||||||
raise cherrypy.HTTPError("400 Bad Request",
|
|
||||||
"start must precede end")
|
|
||||||
|
|
||||||
if len(self.db.stream_list(path = path)) != 1:
|
if len(self.db.stream_list(path = path)) != 1:
|
||||||
raise cherrypy.HTTPError("404", "No such stream: " + path)
|
raise cherrypy.HTTPError("404", "No such stream: " + path)
|
||||||
@@ -416,16 +413,7 @@ class Stream(NilmApp):
|
|||||||
If 'markup' is True, adds comments to the stream denoting each
|
If 'markup' is True, adds comments to the stream denoting each
|
||||||
interval's start and end timestamp.
|
interval's start and end timestamp.
|
||||||
"""
|
"""
|
||||||
if start is not None:
|
(start, end) = self._get_times(start, end)
|
||||||
start = string_to_timestamp(start)
|
|
||||||
if end is not None:
|
|
||||||
end = string_to_timestamp(end)
|
|
||||||
|
|
||||||
# Check parameters
|
|
||||||
if start is not None and end is not None:
|
|
||||||
if start >= end:
|
|
||||||
raise cherrypy.HTTPError("400 Bad Request",
|
|
||||||
"start must precede end")
|
|
||||||
|
|
||||||
# Check path and get layout
|
# Check path and get layout
|
||||||
streams = self.db.stream_list(path = path)
|
streams = self.db.stream_list(path = path)
|
||||||
@@ -608,6 +596,11 @@ class Server(object):
|
|||||||
def stop(self):
|
def stop(self):
|
||||||
cherrypy.engine.exit()
|
cherrypy.engine.exit()
|
||||||
|
|
||||||
|
# Use a single global nilmdb.server.NilmDB and nilmdb.server.Server
|
||||||
|
# instance since the database can only be opened once. For this to
|
||||||
|
# work, the web server must use only a single process and single
|
||||||
|
# Python interpreter. Multiple threads are OK.
|
||||||
|
_wsgi_server = None
|
||||||
def wsgi_application(dbpath, basepath): # pragma: no cover
|
def wsgi_application(dbpath, basepath): # pragma: no cover
|
||||||
"""Return a WSGI application object with a database at the
|
"""Return a WSGI application object with a database at the
|
||||||
specified path.
|
specified path.
|
||||||
@@ -618,29 +611,33 @@ def wsgi_application(dbpath, basepath): # pragma: no cover
|
|||||||
is the same as the first argument to Apache's WSGIScriptAlias
|
is the same as the first argument to Apache's WSGIScriptAlias
|
||||||
directive.
|
directive.
|
||||||
"""
|
"""
|
||||||
server = [None]
|
|
||||||
def application(environ, start_response):
|
def application(environ, start_response):
|
||||||
if server[0] is None:
|
global _wsgi_server
|
||||||
|
if _wsgi_server is None:
|
||||||
# Try to start the server
|
# Try to start the server
|
||||||
try:
|
try:
|
||||||
db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(dbpath)
|
db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(dbpath)
|
||||||
server[0] = nilmdb.server.Server(
|
_wsgi_server = nilmdb.server.Server(
|
||||||
db, embedded = True,
|
db, embedded = True,
|
||||||
basepath = basepath.rstrip('/'))
|
basepath = basepath.rstrip('/'))
|
||||||
except Exception:
|
except Exception:
|
||||||
# Build an error message on failure
|
# Build an error message on failure
|
||||||
|
import pprint
|
||||||
err = sprintf("Initializing database at path '%s' failed:\n\n",
|
err = sprintf("Initializing database at path '%s' failed:\n\n",
|
||||||
dbpath)
|
dbpath)
|
||||||
err += traceback.format_exc()
|
err += traceback.format_exc()
|
||||||
try:
|
try:
|
||||||
import pwd
|
import pwd
|
||||||
import grp
|
import grp
|
||||||
err += sprintf("\nRunning as: uid=%d (%s), gid=%d (%s)\n",
|
err += sprintf("\nRunning as: uid=%d (%s), gid=%d (%s) "
|
||||||
|
"on host %s, pid %d\n",
|
||||||
os.getuid(), pwd.getpwuid(os.getuid())[0],
|
os.getuid(), pwd.getpwuid(os.getuid())[0],
|
||||||
os.getgid(), grp.getgrgid(os.getgid())[0])
|
os.getgid(), grp.getgrgid(os.getgid())[0],
|
||||||
|
socket.gethostname(), os.getpid())
|
||||||
except ImportError:
|
except ImportError:
|
||||||
pass
|
pass
|
||||||
if server[0] is None:
|
err += sprintf("\nEnvironment:\n%s\n", pprint.pformat(environ))
|
||||||
|
if _wsgi_server is None:
|
||||||
# Serve up the error with our own mini WSGI app.
|
# Serve up the error with our own mini WSGI app.
|
||||||
headers = [ ('Content-type', 'text/plain'),
|
headers = [ ('Content-type', 'text/plain'),
|
||||||
('Content-length', str(len(err))) ]
|
('Content-length', str(len(err))) ]
|
||||||
@@ -648,5 +645,5 @@ def wsgi_application(dbpath, basepath): # pragma: no cover
|
|||||||
return [err]
|
return [err]
|
||||||
|
|
||||||
# Call the normal application
|
# Call the normal application
|
||||||
return server[0].wsgi_application(environ, start_response)
|
return _wsgi_server.wsgi_application(environ, start_response)
|
||||||
return application
|
return application
|
||||||
|
@@ -311,11 +311,11 @@ class TestClient(object):
|
|||||||
|
|
||||||
# Trigger a curl error in generator
|
# Trigger a curl error in generator
|
||||||
with assert_raises(ServerError) as e:
|
with assert_raises(ServerError) as e:
|
||||||
client.http.get_gen("http://nosuchurl/").next()
|
client.http.get_gen("http://nosuchurl.example.com./").next()
|
||||||
|
|
||||||
# Trigger a curl error in generator
|
# Trigger a curl error in generator
|
||||||
with assert_raises(ServerError) as e:
|
with assert_raises(ServerError) as e:
|
||||||
client.http.get_gen("http://nosuchurl/").next()
|
client.http.get_gen("http://nosuchurl.example.com./").next()
|
||||||
|
|
||||||
# Check 404 for missing streams
|
# Check 404 for missing streams
|
||||||
for function in [ client.stream_intervals, client.stream_extract ]:
|
for function in [ client.stream_intervals, client.stream_extract ]:
|
||||||
@@ -460,6 +460,7 @@ class TestClient(object):
|
|||||||
ctx.update_start(109)
|
ctx.update_start(109)
|
||||||
ctx.insert("110 1\n")
|
ctx.insert("110 1\n")
|
||||||
ctx.insert("111 1\n")
|
ctx.insert("111 1\n")
|
||||||
|
ctx.send()
|
||||||
ctx.insert("112 1\n")
|
ctx.insert("112 1\n")
|
||||||
ctx.insert("113 1\n")
|
ctx.insert("113 1\n")
|
||||||
ctx.insert("114 1\n")
|
ctx.insert("114 1\n")
|
||||||
|
Reference in New Issue
Block a user