
Some prep work for merging adjacent insertions.

This doesn't actually merge them yet; the Interval
implementation needs to change to allow deletes.


git-svn-id: https://bucket.mit.edu/svn/nilm/nilmdb@11354 ddd99763-3ecb-0310-9145-efcb8ce7c51f
Tag:    tags/bxinterval-last
Author: Jim Paris
Commit: ea3e92be3f
6 changed files with 48 additions and 13 deletions
  1. +3  -1  nilmdb/client.py
  2. +23 -4  nilmdb/nilmdb.py
  3. +19 -5  nilmdb/server.py
  4. +1  -1  setup.cfg
  5. +1  -1  tests/test_client.py
  6. +1  -1  tests/test_interval.py

nilmdb/client.py  (+3, -1)

@@ -95,7 +95,9 @@ class Client(object):
         max_time = 30
 
         def sendit():
-            return self.http.put("stream/insert", send_data, params)
+            result = self.http.put("stream/insert", send_data, params)
+            params["old_timestamp"] = result[1]
+            return result
 
         result = None
         start = time.time()
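
For context, sendit() is the closure that the client's chunked-insert loop
invokes for each HTTP PUT; after this change, every successful call records
the server-reported maximum timestamp so that the next chunk extends the
same interval. A minimal sketch of that pattern, where put() stands in for
self.http.put and the chunk list is hypothetical, not the real client code:

# Sketch of the chunked-insert pattern; put() and chunks are placeholders.
def insert_chunks(put, chunks, path):
    params = {"path": path}
    for send_data in chunks:
        # The server responds with ("ok", max_timestamp); feeding that
        # timestamp back as old_timestamp keeps the interval contiguous.
        result = put("stream/insert", send_data, params)
        params["old_timestamp"] = result[1]
    return result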


nilmdb/nilmdb.py  (+23, -4)

@@ -192,6 +192,15 @@ class NilmDB(object):
             # Return cached value
             return self._cached_iset[stream_id]
 
+    # TODO: Split add_interval into two pieces, one to add
+    # and one to flush to disk?
+    # Need to think about this.  Basic problem is that we can't
+    # mess with intervals once they're in the IntervalSet,
+    # without mucking with bxinterval internals.
+
+    # Maybe add a separate optimization step?
+    # Join intervals that have a fairly small gap between them
+
     def _add_interval(self, stream_id, interval, start_pos, end_pos):
         """
         Add interval to the internal interval cache, and to the database.
@@ -201,7 +210,6 @@ class NilmDB(object):
         # interval to that cache.
         iset = self._get_intervals(stream_id)
         try:
-            # XXX TODO: Intervals should hold start_pos, end_pos too
             iset += DBInterval(interval.start, interval.end,
                                interval.start, interval.end,
                                start_pos, end_pos)
@@ -353,7 +361,7 @@ class NilmDB(object):
         data.update(newdata)
         self.stream_set_metadata(path, data)
 
-    def stream_insert(self, path, parser):
+    def stream_insert(self, path, parser, old_timestamp = None):
         """Insert new data into the database.
         path: Path at which to add the data
         parser: nilmdb.layout.Parser instance full of data to insert
@@ -362,10 +370,21 @@ class NilmDB(object):
                 not len(parser.data)):
             raise StreamError("no data provided")
 
-        # First check for basic overlap using timestamp info from the parser.
+        # If we were provided with an old timestamp, the expectation
+        # is that the client has a contiguous block of time it is sending,
+        # but it's doing it over multiple calls to stream_insert.
+        # old_timestamp is the max_timestamp of the previous insert.
+        # To make things continuous, use that as our starting timestamp
+        # instead of what the parser found.
+        if old_timestamp:
+            min_timestamp = old_timestamp
+        else:
+            min_timestamp = parser.min_timestamp
+
+        # First check for basic overlap using timestamp info given.
         stream_id = self._stream_id(path)
         iset = self._get_intervals(stream_id)
-        interval = Interval(parser.min_timestamp, parser.max_timestamp)
+        interval = Interval(min_timestamp, parser.max_timestamp)
         if iset.intersects(interval):
             raise OverlapError("new data overlaps existing data: "
                                + str(iset & interval))

nilmdb/server.py  (+19, -5)

@@ -145,9 +145,18 @@ class Stream(NilmApp):
     @cherrypy.expose
     @cherrypy.tools.json_out()
     #@cherrypy.tools.disable_prb()
-    def insert(self, path):
-        """Insert new data into the database. Provide textual data
-        (matching the path's layout) as a HTTP PUT"""
+    def insert(self, path, old_timestamp = None):
+        """
+        Insert new data into the database. Provide textual data
+        (matching the path's layout) as a HTTP PUT.
+
+        old_timestamp is used when making multiple, split-up insertions
+        for a larger contiguous block of data. The first insert
+        will return the maximum timestamp that it saw, and the second
+        insert should provide this timestamp as an argument. This is
+        used to extend the previous database interval rather than
+        start a new one.
+        """
 
         # Important that we always read the input before throwing any
         # errors, to keep lengths happy for persistent connections.
@@ -175,11 +184,16 @@ class Stream(NilmApp):
 
         # Now do the nilmdb insert, passing it the parser full of data.
         try:
-            result = self.db.stream_insert(path, parser)
+            if old_timestamp:
+                old_timestamp = float(old_timestamp)
+            result = self.db.stream_insert(path, parser, old_timestamp)
        except nilmdb.nilmdb.NilmDBError as e:
             raise cherrypy.HTTPError("400 Bad Request", e.message)
 
-        return "ok"
+        # Return the maximum timestamp that we saw. The client will
+        # return this back to us as the old_timestamp parameter, if
+        # it has more data to send.
+        return ("ok", parser.max_timestamp)
 
     # /stream/intervals?path=/newton/prep
     # /stream/intervals?path=/newton/prep&start=1234567890.0&end=1234567899.0
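
At the HTTP level the new protocol is a simple round trip: each PUT to
/stream/insert returns a JSON-encoded ("ok", max_timestamp), and the client
echoes that value back as old_timestamp on the next PUT. A hedged sketch
using the requests library; the server URL and port, the raw-requests
approach, and the sample data rows are all assumptions (the real client
goes through nilmdb.client.Client):

# Sketch of the insert round trip with plain requests; URL, port, and
# the two text chunks are placeholders, not taken from nilmdb itself.
import requests

BASE = "http://localhost:12380"   # hypothetical server address

def put_chunk(text_data, path, old_timestamp=None):
    params = {"path": path}
    if old_timestamp is not None:
        params["old_timestamp"] = old_timestamp
    r = requests.put(BASE + "/stream/insert", params=params, data=text_data)
    r.raise_for_status()
    status, max_timestamp = r.json()  # server returns ("ok", max_timestamp)
    return max_timestamp

chunk1 = "1234567890.0 1.0 2.0\n"   # placeholder rows matching the layout
chunk2 = "1234567890.5 1.1 2.1\n"
ts = put_chunk(chunk1, "/newton/prep")                    # starts an interval
ts = put_chunk(chunk2, "/newton/prep", old_timestamp=ts)  # extends it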


setup.cfg  (+1, -1)

@@ -12,7 +12,7 @@ stop=
 verbosity=2
 #tests=tests/test_cmdline.py
 #tests=tests/test_layout.py
-#tests=tests/test_interval.py
+tests=tests/test_interval.py
 #tests=tests/test_client.py
 #tests=tests/test_timestamper.py
 #tests=tests/test_serializer.py


tests/test_client.py  (+1, -1)

@@ -162,7 +162,7 @@ class TestClient(object):
         # Now do the real load
         data = nilmdb.timestamper.TimestamperRate(testfile, start, 120)
         result = client.stream_insert("/newton/prep", data)
-        eq_(result, "ok")
+        eq_(result[0], "ok")
 
         # Try some overlapping data -- just insert it again
         data = nilmdb.timestamper.TimestamperRate(testfile, start, 120)


tests/test_interval.py  (+1, -1)

@@ -256,7 +256,7 @@ class TestInterval:
         assert(isinstance(i, DBInterval))
 
 class TestIntervalSpeed:
-    @unittest.skip("this is slow")
+    #@unittest.skip("this is slow")
     def test_interval_speed(self):
         import yappi
         import time
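
Commenting out the @unittest.skip decorator re-enables the interval speed
benchmark, which runs under the yappi profiler. A minimal sketch of that
profiling pattern, assuming yappi's start/get_func_stats API; the work()
function is a placeholder workload, not the real benchmark:

# Minimal yappi profiling sketch; work() is a placeholder workload.
import yappi

def work():
    return sum(i * i for i in range(100000))

yappi.start()                        # begin collecting per-function stats
work()
yappi.stop()
yappi.get_func_stats().print_all()   # dump call counts and timings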

