Browse Source

Remove implemented in nilmdb; still needs bulkdata changes.

tags/nilmdb-0.1
Jim Paris 11 years ago
parent
commit
fe3b087435
6 changed files with 92 additions and 32 deletions
  1. +1
    -1
      nilmdb/cmdline/remove.py
  2. +15
    -5
      nilmdb/interval.pyx
  3. +52
    -22
      nilmdb/nilmdb.py
  4. +1
    -1
      nilmdb/server.py
  5. +5
    -2
      tests/test_client.py
  6. +18
    -1
      tests/test_cmdline.py

+ 1
- 1
nilmdb/cmdline/remove.py View File

@@ -40,7 +40,7 @@ def cmd_remove(self):
try:
count = self.client.stream_remove(self.args.path,
self.args.start, self.args.end)
except nilmdb.client.ClientError as e:
except nilmdb.client.ClientError as e: # pragma: no cover (shouldn't happen)
self.die("Error removing data: %s", str(e))

if self.args.count:


+ 15
- 5
nilmdb/interval.pyx View File

@@ -270,16 +270,16 @@ cdef class IntervalSet:
out = IntervalSet()

if not isinstance(other, IntervalSet):
for (i, orig) in self.intersection(other):
for i in self.intersection(other):
out.tree.insert(rbtree.RBNode(i.start, i.end, i))
else:
for x in other:
for (i, orig) in self.intersection(x):
for i in self.intersection(x):
out.tree.insert(rbtree.RBNode(i.start, i.end, i))

return out

def intersection(self, Interval interval not None):
def intersection(self, Interval interval not None, orig = False):
"""
Compute a sequence of intervals that correspond to the
intersection between `self` and the provided interval.
@@ -288,6 +288,10 @@ cdef class IntervalSet:

Output intervals are built as subsets of the intervals in the
first argument (self).

If orig = True, also return the original interval that was
(potentially) subsetted to make the one that is being
returned.
"""
if not isinstance(interval, Interval):
raise TypeError("bad type")
@@ -295,11 +299,17 @@ cdef class IntervalSet:
i = n.obj
if i:
if i.start >= interval.start and i.end <= interval.end:
yield (i, i)
if orig:
yield (i, i)
else:
yield i
else:
subset = i.subset(max(i.start, interval.start),
min(i.end, interval.end))
yield (subset, i)
if orig:
yield (subset, i)
else:
yield subset

cpdef intersects(self, Interval other):
"""Return True if this IntervalSet intersects another interval"""


+ 52
- 22
nilmdb/nilmdb.py View File

@@ -175,14 +175,14 @@ class NilmDB(object):
return iset

def _sql_interval_insert(self, id, start, end, start_pos, end_pos):
"""Add interval to the SQL database only"""
"""Helper that adds interval to the SQL database only"""
self.con.execute("INSERT INTO ranges "
"(stream_id,start_time,end_time,start_pos,end_pos) "
"VALUES (?,?,?,?,?)",
(id, start, end, start_pos, end_pos))

def _sql_interval_delete(self, id, start, end, start_pos, end_pos):
"""Remove interval from the SQL database only"""
"""Helper that removes interval from the SQL database only"""
self.con.execute("DELETE FROM ranges WHERE "
"stream_id=? AND start_time=? AND "
"end_time=? AND start_pos=? AND end_pos=?",
@@ -215,10 +215,8 @@ class NilmDB(object):
# database
iset -= adjacent
self._sql_interval_delete(stream_id,
adjacent.db_start,
adjacent.db_end,
adjacent.db_startpos,
adjacent.db_endpos)
adjacent.db_start, adjacent.db_end,
adjacent.db_startpos, adjacent.db_endpos)

# Now update our interval so the fallthrough add is
# correct.
@@ -236,7 +234,7 @@ class NilmDB(object):

self.con.commit()

def _remove_interval(self, stream_id, original, to_remove):
def _remove_interval(self, stream_id, original, remove):
"""
Remove an interval from the internal cache and the database.

@@ -244,12 +242,40 @@ class NilmDB(object):
original: original DBInterval; must be already present in DB
remove: DBInterval to remove; must be a subset of 'original'
"""
#DBInterval(1332496800.0, 1332496919.991668,
# 1332496800.0, 1332496919.991668,
# 14400L, 28800L)
#Interval(1332496830.0, 1332496830.0)
# Just return if we have nothing to remove
if remove.start == remove.end: # pragma: no cover
return

# Load this stream's intervals
iset = self._get_intervals(stream_id)

# Remove existing interval from the cached set and the database
iset -= original
self._sql_interval_delete(stream_id,
original.db_start, original.db_end,
original.db_startpos, original.db_endpos)

# Add back the intervals that would be left over if the
# requested interval is removed. There may be two of them, if
# the removed piece was in the middle.
def add(iset, start, end, start_pos, end_pos):
iset += DBInterval(start, end, start, end, start_pos, end_pos)
self._sql_interval_insert(stream_id, start, end, start_pos, end_pos)

if original.start != remove.start:
# Interval before the removed region
add(iset, original.start, remove.start,
original.db_startpos, remove.db_startpos)

if original.end != remove.end:
# Interval after the removed region
add(iset, remove.end, original.end,
remove.db_endpos, original.db_endpos)

# Commit SQL changes
self.con.commit()

return
raise NotImplementedError(repr((stream_id, original, to_remove)))

def stream_list(self, path = None, layout = None):
"""Return list of [path, layout] lists of all streams
@@ -291,7 +317,7 @@ class NilmDB(object):
intervals = self._get_intervals(stream_id)
requested = Interval(start or 0, end or 1e12)
result = []
for n, (i, orig) in enumerate(intervals.intersection(requested)):
for n, i in enumerate(intervals.intersection(requested)):
if n >= self.max_results:
restart = i.start
break
@@ -466,7 +492,7 @@ class NilmDB(object):
matched = 0
remaining = self.max_results
restart = 0
for (interval, orig) in intervals.intersection(requested):
for interval in intervals.intersection(requested):
# Reading single rows from the table is too slow, so
# we use two bisections to find both the starting and
# ending row for this particular interval, then
@@ -513,19 +539,23 @@ class NilmDB(object):
if start == end:
return 0

for (interval, orig) in intervals.intersection(to_remove):
# Can't remove intervals from within the iterator, so we need to
# remember what's currently in the intersection now.
all_candidates = list(intervals.intersection(to_remove, orig = True))

for (dbint, orig) in all_candidates:
# Find row start and end
row_start = self._find_start(table, interval)
row_end = self._find_end(table, interval)
row_start = self._find_start(table, dbint)
row_end = self._find_end(table, dbint)

# Adjust the DBInterval to match the newly found ends
interval.db_start = interval.start
interval.db_end = interval.end
interval.db_startpos = row_start
interval.db_endpos = row_end
dbint.db_start = dbint.start
dbint.db_end = dbint.end
dbint.db_startpos = row_start
dbint.db_endpos = row_end

# Remove interval from the database
self._remove_interval(stream_id, orig, interval)
self._remove_interval(stream_id, orig, dbint)

# Remove data from the underlying table storage
table.remove(row_start, row_end)


+ 1
- 1
nilmdb/server.py View File

@@ -33,7 +33,7 @@ def chunked_response(func):
"""Decorator to enable chunked responses"""
# Set this to False to get better tracebacks from some requests
# (/stream/extract, /stream/intervals).
func._cp_config = { 'response.stream': False }
func._cp_config = { 'response.stream': True }
return func

def workaround_cp_bug_1200(func): # pragma: no cover (just a workaround)


+ 5
- 2
tests/test_client.py View File

@@ -218,13 +218,16 @@ class TestClient(object):
in_("400 Bad Request", str(e.exception))
in_("OverlapError", str(e.exception))

def test_client_5_extract(self):
# Misc tests for extract. Most of them are in test_cmdline.
def test_client_5_extractremove(self):
# Misc tests for extract and remove. Most of them are in test_cmdline.
client = nilmdb.Client(url = "http://localhost:12380/")

for x in client.stream_extract("/newton/prep", 123, 123):
raise Exception("shouldn't be any data for this request")

with assert_raises(ClientError) as e:
client.stream_remove("/newton/prep", 123, 120)

def test_client_6_generators(self):
# A lot of the client functionality is already tested by test_cmdline,
# but this gets a bit more coverage that cmdline misses.


+ 18
- 1
tests/test_cmdline.py View File

@@ -504,6 +504,9 @@ class TestCmdline(object):
self.fail("remove /no/such/foo --start 2000-01-01 --end 2020-01-01")
self.contain("Error getting stream info")

self.fail("remove /newton/prep --start 2020-01-01 --end 2000-01-01")
self.contain("start is after end")

# empty ranges return success, backwards ranges return error
self.ok("remove /newton/prep " +
"--start '23 Mar 2012 10:00:30' " +
@@ -575,6 +578,20 @@ class TestCmdline(object):
" [ Fri, 23 Mar 2012 10:02:00.000000 +0000"
" -> Fri, 23 Mar 2012 10:03:50.000000 +0000 ]\n")

# Remove all data, verify it's missing
self.ok("remove /newton/prep --start 2000-01-01 --end 2020-01-01")
self.match("") # no count requested this time
self.ok("list --detail /newton/prep")
self.match("/newton/prep PrepData\n" +
" (no intervals)\n")

# Reinsert some data, to verify that no overlaps with deleted
# data are reported
os.environ['TZ'] = "UTC"
self.ok("insert --rate 120 /newton/prep "
"tests/data/prep-20120323T1000 "
"tests/data/prep-20120323T1002")

def test_11_destroy(self):
# Delete records
self.ok("destroy --help")
@@ -596,7 +613,7 @@ class TestCmdline(object):

# Notice how they're not empty
self.ok("list --detail")
lines_(self.captured, 8)
lines_(self.captured, 7)

# Delete some
self.ok("destroy /newton/prep")


Loading…
Cancel
Save