Browse Source

nilmdb now caches the intervals the first time a particular stream's

intervals are accessed, so it doesn't need to keep rebuilding them as
long as it's running. 


git-svn-id: https://bucket.mit.edu/svn/nilm/nilmdb@10800 ddd99763-3ecb-0310-9145-efcb8ce7c51f
tags/bxinterval-last
Jim Paris 10 years ago
parent
commit
47245df9bd
3 changed files with 53 additions and 20 deletions
  1. +33
    -12
      nilmdb/nilmdb.py
  2. +17
    -5
      tests/test_cmdline.py
  3. +3
    -3
      tests/test_nilmdb.py

+ 33
- 12
nilmdb/nilmdb.py View File

@@ -115,6 +115,9 @@ class NilmDB(object):

self.opened = True

# Cached intervals
self._cached_iset = {}

def __del__(self):
if "opened" in self.__dict__: # pragma: no cover
fprintf(sys.stderr,
@@ -148,22 +151,40 @@ class NilmDB(object):

def _get_intervals(self, stream_id):
"""
Return an IntervalSet corresponding to the given stream ID.
Return a mutable IntervalSet corresponding to the given stream ID.
"""
# Load from database if not cached
if stream_id not in self._cached_iset:
iset = IntervalSet()
result = self.con.execute("SELECT start_time, end_time, "
"start_pos, end_pos "
"FROM ranges "
"WHERE stream_id=?", (stream_id,))
try:
for (start_time, end_time, start_pos, end_pos) in result:
# XXX TODO: store start_pos, end_pos too
iset += Interval(start_time, end_time)
except IntervalError as e: # pragma: no cover
raise NilmDBError("unexpected overlap in ranges table!")
self._cached_iset[stream_id] = iset
# Return cached value
return self._cached_iset[stream_id]

def _add_interval(self, stream_id, interval, start_pos, end_pos):
"""
Add interval to the internal interval cache, and to the database.
Note: arguments must be ints (not numpy.int64, etc)
"""
# Could cache these, if it's a performance bottleneck
iset = IntervalSet()
result = self.con.execute("SELECT start_time, end_time "
"FROM ranges "
"WHERE stream_id=?", (stream_id,))
# Ensure this stream's intervals are cached, and add the new
# interval to that cache.
iset = self._get_intervals(stream_id)
try:
for (start, end) in result:
iset += Interval(start, end)
# XXX TODO: Intervals should hold start_pos, end_pos too
iset += Interval(interval.start, interval.end)
except IntervalError as e: # pragma: no cover
raise NilmDBError("unexpected overlap in ranges table!")
return iset
raise NilmDBError("new interval overlaps existing data")

def _add_interval(self, stream_id, interval, start_pos, end_pos):
# Arguments must be ints (not numpy.int64, etc)
# Insert into the database
self.con.execute("INSERT INTO ranges "
"(stream_id,start_time,end_time,start_pos,end_pos) "
"VALUES (?,?,?,?,?)",


+ 17
- 5
tests/test_cmdline.py View File

@@ -21,11 +21,8 @@ from test_helpers import *

testdb = "tests/cmdline-testdb"

def setup_module():
def server_start():
global test_server, test_db
# Clear out DB
recursive_unlink(testdb)

# Start web app on a custom port
test_db = nilmdb.NilmDB(testdb, sync = False)
test_server = nilmdb.Server(test_db, host = "127.0.0.1",
@@ -34,12 +31,21 @@ def setup_module():
force_traceback = False)
test_server.start(blocking = False)

def teardown_module():
def server_stop():
global test_server, test_db
# Close web app
test_server.stop()
test_db.close()

def setup_module():
global test_server, test_db
# Clear out DB
recursive_unlink(testdb)
server_start()

def teardown_module():
server_stop()

class TestCmdline(object):

def run(self, arg_string, infile=None, outfile=None):
@@ -283,6 +289,12 @@ class TestCmdline(object):
"tests/data/prep-20120323T1004")
self.contain("overlap")

# Just to help test more situations -- stop and restart
# the server now. This tests nilmdb's interval caching,
# at the very least.
server_stop()
server_start()

# not an overlap if we specify a different start
self.fail("insert --start '03/23/2012 06:05:00' /newton/prep "
"tests/data/prep-20120323T1004")


+ 3
- 3
tests/test_nilmdb.py View File

@@ -98,7 +98,7 @@ class TestBlockingServer(object):

def tearDown(self):
self.db.close()
def test_blocking_server(self):
# Start web app on a custom port
self.server = nilmdb.Server(self.db, host = "127.0.0.1",
@@ -111,13 +111,13 @@ class TestBlockingServer(object):
thread = threading.Thread(target = run_server)
thread.start()
event.wait(timeout = 2)
# Send request to exit.
req = urlopen("http://127.0.0.1:12380/exit/", timeout = 1)

# Wait for it
thread.join()
def geturl(path):
req = urlopen("http://127.0.0.1:12380" + path, timeout = 10)
return req.read()


Loading…
Cancel
Save