Compare commits

...

6 Commits

Author SHA1 Message Date
00e6ba1124 Avoid ENOENT in nilmdb.utils.diskusage.du
ENOENT might show up if we're actively deleting files in the nilmdb
thread while trying to read available space from e.g. the server
thread.
2013-04-10 22:25:22 -04:00
01029230c9 Tweaks to sorting 2013-04-10 19:59:38 -04:00
ecc4e5ef9d Improve test coverage 2013-04-10 19:08:05 -04:00
23f31c472b Split sort_streams_nicely into separate file 2013-04-10 19:07:58 -04:00
a1e2746360 Fix bug in nilmdb.stream_remove with max_removals 2013-04-10 18:37:21 -04:00
1c40d59a52 server: use a generator in /stream/remove
Instead of returning a single number at the end of N nilmdb calls, we
now use a generator that returns one line of text every time there's a
new count of rows removed.  This ensures that the connection will stay
alive for very long removals.
2013-04-10 18:11:58 -04:00
8 changed files with 83 additions and 31 deletions

View File

@@ -6,7 +6,6 @@ import nilmdb.utils
import nilmdb.client.httpclient import nilmdb.client.httpclient
from nilmdb.client.errors import ClientError from nilmdb.client.errors import ClientError
import re
import time import time
import simplejson as json import simplejson as json
import contextlib import contextlib
@@ -66,12 +65,8 @@ class Client(object):
params["layout"] = layout params["layout"] = layout
if extended: if extended:
params["extended"] = 1 params["extended"] = 1
def sort_streams_nicely(x): streams = self.http.get("stream/list", params)
"""Human-friendly sort (/stream/2 before /stream/10)""" return nilmdb.utils.sort.sort_human(streams, key = lambda s: s[0])
num = lambda t: int(t) if t.isdigit() else t
key = lambda k: [ num(c) for c in re.split('([0-9]+)', k[0]) ]
return sorted(x, key = key)
return sort_streams_nicely(self.http.get("stream/list", params))
def stream_get_metadata(self, path, keys = None): def stream_get_metadata(self, path, keys = None):
params = { "path": path } params = { "path": path }
@@ -122,7 +117,10 @@ class Client(object):
params["start"] = timestamp_to_string(start) params["start"] = timestamp_to_string(start)
if end is not None: if end is not None:
params["end"] = timestamp_to_string(end) params["end"] = timestamp_to_string(end)
return self.http.post("stream/remove", params) total = 0
for count in self.http.post_gen("stream/remove", params):
total += int(count)
return total
@contextlib.contextmanager @contextlib.contextmanager
def stream_insert_context(self, path, start = None, end = None): def stream_insert_context(self, path, start = None, end = None):

View File

@@ -137,5 +137,14 @@ class HTTPClient(object):
"""Simple GET (parameters in URL) returning a generator""" """Simple GET (parameters in URL) returning a generator"""
return self._req_gen("GET", url, params, binary = binary) return self._req_gen("GET", url, params, binary = binary)
def post_gen(self, url, params = None):
"""Simple POST (parameters in body) returning a generator"""
if self.post_json:
return self._req_gen("POST", url, None,
json.dumps(params),
{ 'Content-type': 'application/json' })
else:
return self._req_gen("POST", url, None, params)
# Not much use for a POST or PUT generator, since they don't # Not much use for a POST or PUT generator, since they don't
# return much data. # return much data.

View File

@@ -675,6 +675,7 @@ class NilmDB(object):
# Count how many were removed # Count how many were removed
removed += row_end - row_start removed += row_end - row_start
remaining -= row_end - row_start
if restart is not None: if restart is not None:
break break

View File

@@ -347,24 +347,34 @@ class Stream(NilmApp):
# /stream/remove?path=/newton/prep&start=1234567890.0&end=1234567899.0 # /stream/remove?path=/newton/prep&start=1234567890.0&end=1234567899.0
@cherrypy.expose @cherrypy.expose
@cherrypy.tools.json_in() @cherrypy.tools.json_in()
@cherrypy.tools.json_out()
@exception_to_httperror(NilmDBError)
@cherrypy.tools.CORS_allow(methods = ["POST"]) @cherrypy.tools.CORS_allow(methods = ["POST"])
@chunked_response
@response_type("application/x-json-stream")
def remove(self, path, start = None, end = None): def remove(self, path, start = None, end = None):
""" """
Remove data from the backend database. Removes all data in Remove data from the backend database. Removes all data in
the interval [start, end). Returns the number of data points the interval [start, end).
removed.
Returns the number of data points removed. Since this is a potentially
long-running operation, multiple numbers may be returned as the
data gets removed from the backend database. The total number of
points removed is the sum of all of these numbers.
""" """
(start, end) = self._get_times(start, end) (start, end) = self._get_times(start, end)
total_removed = 0
if len(self.db.stream_list(path = path)) != 1:
raise cherrypy.HTTPError("404", "No such stream: " + path)
@workaround_cp_bug_1200
def content(start, end):
# Note: disable chunked responses to see tracebacks from here.
while True: while True:
(removed, restart) = self.db.stream_remove(path, start, end) (removed, restart) = self.db.stream_remove(path, start, end)
total_removed += removed yield json.dumps(removed) + "\r\n"
if restart is None: if restart is None:
break break
start = restart start = restart
return total_removed return content(start, end)
# /stream/intervals?path=/newton/prep # /stream/intervals?path=/newton/prep
# /stream/intervals?path=/newton/prep&start=1234567890.0&end=1234567899.0 # /stream/intervals?path=/newton/prep&start=1234567890.0&end=1234567899.0

View File

@@ -13,3 +13,4 @@ import nilmdb.utils.time
import nilmdb.utils.iterator import nilmdb.utils.iterator
import nilmdb.utils.interval import nilmdb.utils.interval
import nilmdb.utils.lock import nilmdb.utils.lock
import nilmdb.utils.sort

View File

@@ -1,4 +1,5 @@
import os import os
import errno
from math import log from math import log
def human_size(num): def human_size(num):
@@ -16,10 +17,17 @@ def human_size(num):
return '1 byte' return '1 byte'
def du(path): def du(path):
"""Like du -sb, returns total size of path in bytes.""" """Like du -sb, returns total size of path in bytes. Ignore
errors that might occur if we encounter broken symlinks or
files in the process of being removed."""
try:
size = os.path.getsize(path) size = os.path.getsize(path)
if os.path.isdir(path): if os.path.isdir(path):
for thisfile in os.listdir(path): for thisfile in os.listdir(path):
filepath = os.path.join(path, thisfile) filepath = os.path.join(path, thisfile)
size += du(filepath) size += du(filepath)
return size return size
except OSError as e: # pragma: no cover
if e.errno != errno.ENOENT:
raise
return 0

18
nilmdb/utils/sort.py Normal file
View File

@@ -0,0 +1,18 @@
import re
def sort_human(items, key = None):
"""Human-friendly sort (/stream/2 before /stream/10)"""
def to_num(val):
try:
return int(val)
except Exception:
return val
def human_key(text):
if key:
text = key(text)
# Break into character and numeric chunks.
chunks = re.split(r'([0-9]+)', text)
return [ to_num(c) for c in chunks ]
return sorted(items, key = human_key)

View File

@@ -105,16 +105,19 @@ class TestClient(object):
client.http.post("/stream/list") client.http.post("/stream/list")
client = nilmdb.client.Client(url = testurl) client = nilmdb.client.Client(url = testurl)
# Create three streams # Create four streams
client.stream_create("/newton/prep", "float32_8") client.stream_create("/newton/prep", "float32_8")
client.stream_create("/newton/raw", "uint16_6") client.stream_create("/newton/raw", "uint16_6")
client.stream_create("/newton/zzz/rawnotch", "uint16_9") client.stream_create("/newton/zzz/rawnotch2", "uint16_9")
client.stream_create("/newton/zzz/rawnotch11", "uint16_9")
# Verify we got 3 streams # Verify we got 4 streams in the right order
eq_(client.stream_list(), [ ["/newton/prep", "float32_8"], eq_(client.stream_list(), [ ["/newton/prep", "float32_8"],
["/newton/raw", "uint16_6"], ["/newton/raw", "uint16_6"],
["/newton/zzz/rawnotch", "uint16_9"] ["/newton/zzz/rawnotch2", "uint16_9"],
["/newton/zzz/rawnotch11", "uint16_9"]
]) ])
# Match just one type or one path # Match just one type or one path
eq_(client.stream_list(layout="uint16_6"), eq_(client.stream_list(layout="uint16_6"),
[ ["/newton/raw", "uint16_6"] ]) [ ["/newton/raw", "uint16_6"] ])
@@ -327,6 +330,10 @@ class TestClient(object):
2525.169921875, 8350.83984375, 3724.699951171875, 2525.169921875, 8350.83984375, 3724.699951171875,
1355.3399658203125, 2039.0)) 1355.3399658203125, 2039.0))
# Just get some coverage
with assert_raises(ClientError) as e:
client.http.post("/stream/remove", { "path": "/none" })
client.close() client.close()
def test_client_06_generators(self): def test_client_06_generators(self):