Compare commits

...

9 Commits

12 changed files with 189 additions and 50 deletions

View File

@@ -58,6 +58,11 @@ class Client(object):
return self.http.get("dbinfo")
def stream_list(self, path = None, layout = None, extended = False):
"""Return a sorted list of [path, layout] lists. If 'path' or
'layout' are specified, only return streams that match those
exact values. If 'extended' is True, the returned lists have
extended info, e.g.: [path, layout, extent_min, extent_max,
total_rows, total_seconds."""
params = {}
if path is not None:
params["path"] = path
@@ -69,6 +74,7 @@ class Client(object):
return nilmdb.utils.sort.sort_human(streams, key = lambda s: s[0])
def stream_get_metadata(self, path, keys = None):
"""Get stream metadata"""
params = { "path": path }
if keys is not None:
params["key"] = keys

View File

@@ -1,5 +1,6 @@
from nilmdb.utils.printf import *
import nilmdb.utils.time
from nilmdb.utils.interval import Interval
import fnmatch
import argparse
@@ -42,6 +43,8 @@ def setup(self, sub):
group = cmd.add_argument_group("Misc options")
group.add_argument("-T", "--timestamp-raw", action="store_true",
help="Show raw timestamps when printing times")
group.add_argument("-o", "--optimize", action="store_true",
help="Optimize (merge adjacent) intervals")
return cmd
@@ -58,9 +61,16 @@ def cmd_intervals(self):
time_string = nilmdb.utils.time.timestamp_to_human
try:
for (start, end) in self.client.stream_intervals(
self.args.path, self.args.start, self.args.end, self.args.diff):
printf("[ %s -> %s ]\n", time_string(start), time_string(end))
intervals = ( Interval(start, end) for (start, end) in
self.client.stream_intervals(self.args.path,
self.args.start,
self.args.end,
self.args.diff) )
if self.args.optimize:
intervals = nilmdb.utils.interval.optimize(intervals)
for i in intervals:
printf("[ %s -> %s ]\n", time_string(i.start), time_string(i.end))
except nilmdb.client.ClientError as e:
self.die("error listing intervals: %s", str(e))

View File

@@ -27,6 +27,7 @@ from nilmdb.server.serverutil import (
json_error_page,
cherrypy_start,
cherrypy_stop,
bool_param,
)
# Add CORS_allow tool
@@ -221,6 +222,8 @@ class Stream(NilmApp):
little-endian and matches the database types (including an
int64 timestamp).
"""
binary = bool_param(binary)
# Important that we always read the input before throwing any
# errors, to keep lengths happy for persistent connections.
# Note that CherryPy 3.2.2 has a bug where this fails for GET
@@ -345,6 +348,10 @@ class Stream(NilmApp):
little-endian and matches the database types (including an
int64 timestamp).
"""
binary = bool_param(binary)
markup = bool_param(markup)
count = bool_param(count)
(start, end) = self._get_times(start, end)
# Check path and get layout

View File

@@ -7,6 +7,21 @@ import os
import decorator
import simplejson as json
# Helper to parse parameters into booleans
def bool_param(s):
"""Return a bool indicating whether parameter 's' was True or False,
supporting a few different types for 's'."""
try:
ss = s.lower()
if ss in [ "0", "false", "f", "no", "n" ]:
return False
if ss in [ "1", "true", "t", "yes", "y" ]:
return True
except Exception:
return bool(s)
raise cherrypy.HTTPError("400 Bad Request",
"can't parse parameter: " + ss)
# Decorators
def chunked_response(func):
"""Decorator to enable chunked responses."""

View File

@@ -58,18 +58,11 @@ class Interval:
raise IntervalError("not a subset")
return Interval(start, end)
def set_difference(a, b):
"""
Compute the difference (a \\ b) between the intervals in 'a' and
the intervals in 'b'; i.e., the ranges that are present in 'self'
but not 'other'.
'a' and 'b' must both be iterables.
Returns a generator that yields each interval in turn.
Output intervals are built as subsets of the intervals in the
first argument (a).
"""
def _interval_math_helper(a, b, op, subset = True):
"""Helper for set_difference, intersection functions,
to compute interval subsets based on a math operator on ranges
present in A and B. Subsets are computed from A, or new intervals
are generated if subset = False."""
# Iterate through all starts and ends in sorted order. Add a
# tag to the iterator so that we can figure out which one they
# were, after sorting.
@@ -84,31 +77,57 @@ def set_difference(a, b):
# At each point, evaluate which type of end it is, to determine
# how to build up the output intervals.
a_interval = None
b_interval = None
in_a = False
in_b = False
out_start = None
for (ts, k, i) in nilmdb.utils.iterator.imerge(a_iter, b_iter):
if k == 0:
# start a interval
a_interval = i
if b_interval is None:
out_start = ts
in_a = True
elif k == 1:
# start b interval
b_interval = i
if out_start is not None and out_start != ts:
yield a_interval.subset(out_start, ts)
out_start = None
in_b = True
elif k == 2:
# end a interval
if out_start is not None and out_start != ts:
yield a_interval.subset(out_start, ts)
out_start = None
a_interval = None
in_a = False
elif k == 3:
# end b interval
b_interval = None
if a_interval:
in_b = False
include = op(in_a, in_b)
if include and out_start is None:
out_start = ts
elif not include:
if out_start is not None and out_start != ts:
if subset:
yield a_interval.subset(out_start, ts)
else:
yield Interval(out_start, ts)
out_start = None
def set_difference(a, b):
"""
Compute the difference (a \\ b) between the intervals in 'a' and
the intervals in 'b'; i.e., the ranges that are present in 'self'
but not 'other'.
'a' and 'b' must both be iterables.
Returns a generator that yields each interval in turn.
Output intervals are built as subsets of the intervals in the
first argument (a).
"""
return _interval_math_helper(a, b, (lambda a, b: a and not b))
def intersection(a, b):
"""
Compute the intersection between the intervals in 'a' and the
intervals in 'b'; i.e., the ranges that are present in both 'a'
and 'b'.
'a' and 'b' must both be iterables.
Returns a generator that yields each interval in turn.
Output intervals are built as subsets of the intervals in the
first argument (a).
"""
return _interval_math_helper(a, b, (lambda a, b: a and b))
def optimize(it):
"""

View File

@@ -91,6 +91,20 @@ def serializer_proxy(obj_or_type):
r = SerializerCallProxy(self.__call_queue, attr, self)
return r
# For an interable object, on __iter__(), save the object's
# iterator and return this proxy. On next(), call the object's
# iterator through this proxy.
def __iter__(self):
attr = getattr(self.__object, "__iter__")
self.__iter = SerializerCallProxy(self.__call_queue, attr, self)()
return self
def next(self):
return SerializerCallProxy(self.__call_queue,
self.__iter.next, self)()
def __getitem__(self, key):
return self.__getattr__("__getitem__")(key)
def __call__(self, *args, **kwargs):
"""Call this to instantiate the type, if a type was passed
to serializer_proxy. Otherwise, pass the call through."""

View File

@@ -60,7 +60,7 @@ def rate_to_period(hz, cycles = 1):
def parse_time(toparse):
"""
Parse a free-form time string and return a nilmdb timestamp
(integer seconds since epoch). If the string doesn't contain a
(integer microseconds since epoch). If the string doesn't contain a
timestamp, the current local timezone is assumed (e.g. from the TZ
env var).
"""

View File

@@ -1,7 +1,14 @@
import sys
if sys.version_info[0] >= 3: # pragma: no cover (future Python3 compat)
text_type = str
else:
text_type = unicode
def encode(u):
"""Try to encode something from Unicode to a string using the
default encoding. If it fails, try encoding as UTF-8."""
if not isinstance(u, unicode):
if not isinstance(u, text_type):
return u
try:
return u.encode()
@@ -11,7 +18,7 @@ def encode(u):
def decode(s):
"""Try to decode someting from string to Unicode using the
default encoding. If it fails, try decoding as UTF-8."""
if isinstance(s, unicode):
if isinstance(s, text_type):
return s
try:
return s.decode()

View File

@@ -354,10 +354,6 @@ class TestClient(object):
with assert_raises(ServerError) as e:
client.http.get_gen("http://nosuchurl.example.com./").next()
# Trigger a curl error in generator
with assert_raises(ServerError) as e:
client.http.get_gen("http://nosuchurl.example.com./").next()
# Check 404 for missing streams
for function in [ client.stream_intervals, client.stream_extract ]:
with assert_raises(ClientError) as e:
@@ -396,20 +392,16 @@ class TestClient(object):
headers())
# Extract
x = http.get("stream/extract",
{ "path": "/newton/prep",
"start": "123",
"end": "124" })
x = http.get("stream/extract", { "path": "/newton/prep",
"start": "123", "end": "124" })
if "transfer-encoding: chunked" not in headers():
warnings.warn("Non-chunked HTTP response for /stream/extract")
if "content-type: text/plain;charset=utf-8" not in headers():
raise AssertionError("/stream/extract is not text/plain:\n" +
headers())
x = http.get("stream/extract",
{ "path": "/newton/prep",
"start": "123",
"end": "124",
x = http.get("stream/extract", { "path": "/newton/prep",
"start": "123", "end": "124",
"binary": "1" })
if "transfer-encoding: chunked" not in headers():
warnings.warn("Non-chunked HTTP response for /stream/extract")
@@ -417,6 +409,21 @@ class TestClient(object):
raise AssertionError("/stream/extract is not binary:\n" +
headers())
# Make sure a binary of "0" is really off
x = http.get("stream/extract", { "path": "/newton/prep",
"start": "123", "end": "124",
"binary": "0" })
if "content-type: application/octet-stream" in headers():
raise AssertionError("/stream/extract is not text:\n" +
headers())
# Invalid parameters
with assert_raises(ClientError) as e:
x = http.get("stream/extract", { "path": "/newton/prep",
"start": "123", "end": "124",
"binary": "asdfasfd" })
in_("can't parse parameter", str(e.exception))
client.close()
def test_client_08_unicode(self):

View File

@@ -1011,6 +1011,18 @@ class TestCmdline(object):
self.match("[ Thu, 01 Jan 2004 00:00:00.000000 +0000 -"
"> Sat, 01 Jan 2005 00:00:00.000000 +0000 ]\n")
# optimize
self.ok("insert -s 01-01-2002 -e 01-01-2004 /diff/1 /dev/null")
self.ok("intervals /diff/1")
self.match("[ Sat, 01 Jan 2000 00:00:00.000000 +0000 -"
"> Thu, 01 Jan 2004 00:00:00.000000 +0000 ]\n"
"[ Thu, 01 Jan 2004 00:00:00.000000 +0000 -"
"> Sat, 01 Jan 2005 00:00:00.000000 +0000 ]\n")
self.ok("intervals /diff/1 --optimize")
self.ok("intervals /diff/1 -o")
self.match("[ Sat, 01 Jan 2000 00:00:00.000000 +0000 -"
"> Sat, 01 Jan 2005 00:00:00.000000 +0000 ]\n")
self.ok("destroy -R /diff/1")
self.ok("destroy -R /diff/2")

View File

@@ -234,13 +234,16 @@ class TestInterval:
x = makeset("[--)") & 1234
def do_test(a, b, c, d):
# a & b == c
# a & b == c (using nilmdb.server.interval)
ab = IntervalSet()
for x in b:
for i in (a & x):
ab += i
eq_(ab,c)
# a & b == c (using nilmdb.utils.interval)
eq_(IntervalSet(nilmdb.utils.interval.intersection(a,b)), c)
# a \ b == d
eq_(IntervalSet(nilmdb.utils.interval.set_difference(a,b)), d)
@@ -310,6 +313,17 @@ class TestInterval:
eq_(nilmdb.utils.interval.set_difference(
a.intersection(list(c)[0]), b.intersection(list(c)[0])), d)
# Fill out test coverage for non-subsets
def diff2(a,b, subset):
return nilmdb.utils.interval._interval_math_helper(
a, b, (lambda a, b: b and not a), subset=subset)
with assert_raises(nilmdb.utils.interval.IntervalError):
list(diff2(a,b,True))
list(diff2(a,b,False))
# Empty second set
eq_(nilmdb.utils.interval.set_difference(a, IntervalSet()), a)
# Empty second set
eq_(nilmdb.utils.interval.set_difference(a, IntervalSet()), a)

View File

@@ -62,6 +62,28 @@ class Base(object):
eq_(self.foo.val, 20)
eq_(self.foo.init_thread, self.foo.test_thread)
class ListLike(object):
def __init__(self):
self.thread = threading.current_thread().name
self.foo = 0
def __iter__(self):
eq_(threading.current_thread().name, self.thread)
self.foo = 0
return self
def __getitem__(self, key):
eq_(threading.current_thread().name, self.thread)
return key
def next(self):
eq_(threading.current_thread().name, self.thread)
if self.foo < 5:
self.foo += 1
return self.foo
else:
raise StopIteration
class TestUnserialized(Base):
def setUp(self):
self.foo = Foo()
@@ -84,3 +106,9 @@ class TestSerializer(Base):
sp(sp(Foo("x"))).t()
sp(sp(Foo)("x")).t()
sp(sp(Foo))("x").t()
def test_iter(self):
sp = nilmdb.utils.serializer_proxy
i = sp(ListLike)()
eq_(list(i), [1,2,3,4,5])
eq_(i[3], 3)