Browse Source

Add IntervalSet.intersection(). This returns a generator that allows

us to look at just some of the intervals without having to reconstruct
an entire IntervalSet class -- which greatly reduces server load when
handling requests that cover large interval ranges.

Add Client.get and Client.put, analogous to getjson and putjson but
without parsing the result as json.

Add Client.stream_extract.  Still needs server side love.

Allow Cmdline subcommands to provide a return value that turns into
the exit code.

More work on cmdline.extract.


git-svn-id: https://bucket.mit.edu/svn/nilm/nilmdb@10851 ddd99763-3ecb-0310-9145-efcb8ce7c51f
tags/bxinterval-last
Jim Paris 12 years ago
parent
commit
e3be1a1d8a
7 changed files with 80 additions and 27 deletions
  1. +40
    -7
      nilmdb/client.py
  2. +2
    -2
      nilmdb/cmdline/cmdline.py
  3. +16
    -3
      nilmdb/cmdline/extract.py
  4. +18
    -9
      nilmdb/interval.py
  5. +1
    -1
      nilmdb/nilmdb.py
  6. +1
    -1
      nilmdb/server.py
  7. +2
    -4
      tests/test_cmdline.py

+ 40
- 7
nilmdb/client.py View File

@@ -89,8 +89,8 @@ class MyCurl(object):
else:
raise Error(**args)

def _reqjson(self, url, params):
"""GET or POST that returns JSON string"""
def _req(self, url, params):
"""GET or POST that returns raw data"""
self._setup_url(url, params)
body = cStringIO.StringIO()
self.curl.setopt(pycurl.WRITEFUNCTION, body.write)
@@ -102,23 +102,31 @@ class MyCurl(object):
message = e[1])
body_str = body.getvalue()
self._check_error(body_str)
return json.loads(body_str)
return body_str

def close(self):
self.curl.close()

def getjson(self, url, params = None):
"""Simple GET that returns JSON string"""
self.curl.setopt(pycurl.UPLOAD, 0)
return self._reqjson(url, params)
return json.loads(self.get(url, params))

def putjson(self, url, postdata, params = None):
"""Simple PUT that returns JSON string"""
"""Simple GET that returns JSON string"""
return json.loads(self.put(url, postdata, params))

def get(self, url, params = None):
"""Simple GET"""
self.curl.setopt(pycurl.UPLOAD, 0)
return self._req(url, params)

def put(self, url, postdata, params = None):
"""Simple PUT"""
self._setup_url(url, params)
data = cStringIO.StringIO(postdata)
self.curl.setopt(pycurl.UPLOAD, 1)
self.curl.setopt(pycurl.READFUNCTION, data.read)
return self._reqjson(url, params)
return self._req(url, params)

class Client(object):
"""Main client interface to the Nilm database."""
@@ -238,3 +246,28 @@ class Client(object):
# Restart where we left off
params["start"] = repr(intervals[-1][1])

def stream_extract(self, path, start = None, end = None, bare = False):
"""
Return a generator that yields each line of data from the stream.
Multiple requests are made to the server if the results
get truncated.
"""
params = {
"path": path
}
if start is not None:
params["start"] = repr(start) # keep full precision
if end is not None:
params["end"] = repr(end)
params["bare"] = bare

more = True
while more:
self.curl.get
(intervals, more) = self.curl.getjson("stream/intervals", params)
for interval in intervals:
yield interval
if more: # pragma: no cover (harder to test; manually checked)
# Restart where we left off
params["start"] = repr(intervals[-1][1])


+ 2
- 2
nilmdb/cmdline/cmdline.py View File

@@ -143,7 +143,7 @@ class Cmdline(object):
# Now dispatch client request to appropriate function. Parser
# should have ensured that we don't have any unknown commands
# here.
self.args.handler(self)
retval = self.args.handler(self) or 0

self.client.close()
sys.exit(0)
sys.exit(retval)

+ 16
- 3
nilmdb/cmdline/extract.py View File

@@ -25,7 +25,7 @@ def setup(self, sub):
help="Exclude timestamps from output lines")
group.add_argument("-a", "--annotate", action="store_true",
help="Include comments with some information "
"about the stream and intervals")
"about the stream")

def cmd_extract(self):
streams = self.client.stream_list(self.args.path)
@@ -39,5 +39,18 @@ def cmd_extract(self):
printf("# start: %s\n", self.time_string(self.args.start))
printf("# end: %s\n", self.time_string(self.args.end))

print "your layout is " + layout
raise Exception
return 0

printed = False
for data in self.client.stream_extract(self.args.path,
self.args.start,
self.args.end,
bare = self.args.bare):
sys.stdout.write(data)
printed = True
if not printed:
if self.args.annotate:
printf("# no data!\n")
return 2

return 0

+ 18
- 9
nilmdb/interval.py View File

@@ -45,7 +45,7 @@ class Interval(bxintersect.Interval):
def subset(self, start, end):
"""Return a new Interval that is a subset of this one"""
# A subclass that tracks additional data might override this.
if start < (self.start) or end > self.end:
if start < self.start or end > self.end:
raise IntervalError("not a subset")
return Interval(start, end)

@@ -163,16 +163,25 @@ class IntervalSet(object):
other = [ other ]

for x in other:
# Intersecting with a just a single interval.
all_intersecting = self.tree.find(x.start, x.end)
for i in all_intersecting:
if i.start == x.start and i.end == x.end:
out.tree.insert_interval(i)
else:
out.tree.insert_interval(i.subset(max(i.start, x.start),
min(i.end, x.end)))
for i in self.intersection(x):
out.tree.insert_interval(i)

return out

def intersection(self, interval):
"""
Compute a sequence of intervals that correspond to the
intersection between `self` and the provided interval.
Returns a generator that yields each of these intervals
in turn.
"""
for i in self.tree.find(interval.start, interval.end):
if i.start > interval.start and i.end < interval.end:
yield i
else:
yield i.subset(max(i.start, interval.start),
min(i.end, interval.end))

def intersects(self, other):
"""Return True if this IntervalSet intersects another interval"""
return len(self.tree.find(other.start, other.end)) > 0

+ 1
- 1
nilmdb/nilmdb.py View File

@@ -238,7 +238,7 @@ class NilmDB(object):
intervals = self._get_intervals(stream_id)
requested = Interval(start or 0, end or 1e12)
result = []
for n, i in enumerate(intervals & requested):
for n, i in enumerate(intervals.intersection(requested)):
if n >= MAX_RESULTS: # pragma: no cover (hard to hit in testsuite)
truncated = True
break


+ 1
- 1
nilmdb/server.py View File

@@ -186,7 +186,7 @@ class Stream(NilmApp):
start = float(start)
if end is not None:
end = float(end)
(intervals, truncated ) = self.db.stream_intervals(path, start, end)
(intervals, truncated) = self.db.stream_intervals(path, start, end)
return (intervals, truncated)

class Exiter(object):


+ 2
- 4
tests/test_cmdline.py View File

@@ -349,7 +349,5 @@ class TestCmdline(object):
self.contain("10:05:15.500")

def test_cmdline_8_extract(self):
self.ok("extract --help")
# self.dump()

# self.ok("extract /newton/prep")
self.ok("extract -a /newton/prep --start 2000-01-01 --end 2020-01-01")
self.dump()

Loading…
Cancel
Save