
Progress and tests for bulkdata.remove

Passes tests, but doesn't really handle nrows (and removing partially
full files) correctly when deleting near the end of the data.
tags/nilmdb-0.1
Jim Paris 11 years ago
commit 7807d6caf0
3 changed files with 175 additions and 8 deletions
  1. nilmdb/bulkdata.py (+75, -6)
  2. tests/test_bulkdata.py (+5, -0)
  3. tests/test_cmdline.py (+95, -2)

nilmdb/bulkdata.py (+75, -6)

@@ -14,7 +14,7 @@ import mmap
 import re

 # Up to 256 open file descriptors at any given time.
-# Global to this module so they can be used in the decorator arguments.
+# These variables are global so they can be used in the decorator arguments.
 table_cache_size = 16
 fd_cache_size = 16

@@ -245,7 +245,7 @@ class Table(object):
         subdir: subdirectory for the file
         filename: the filename that contains the specified row
         offset: byte offset of the specified row within the file
-        count: number of rows (starting at offste) that fit in the file
+        count: number of rows (starting at offset) that fit in the file
         """
         filenum = row // self.rows_per_file
         # It's OK if these format specifiers are too short; the filenames
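For concreteness, the mapping this docstring describes can be sketched on its own: a row index determines a file number, a byte offset within that file, and how many rows remain in that file. This is a standalone illustration rather than nilmdb's code; the 40-byte row size is an assumption for float32_8 data (an 8-byte timestamp plus 8 four-byte floats), chosen to match the 920-byte / 23-rows-per-file figure used in the tests below.

    # Standalone sketch of the row-to-file mapping described above; the
    # 40-byte row size is an assumption consistent with 920 / 23 in the tests.
    ROW_SIZE = 40                           # bytes per packed row (assumed)
    FILE_SIZE = 920                         # bytes per data file, as in the tests
    ROWS_PER_FILE = FILE_SIZE // ROW_SIZE   # 23

    def offset_from_row(row):
        """Return (file number, byte offset in that file, rows that fit after it)."""
        filenum = row // ROWS_PER_FILE
        row_in_file = row % ROWS_PER_FILE
        return (filenum, row_in_file * ROW_SIZE, ROWS_PER_FILE - row_in_file)

    print(offset_from_row(0))    # (0, 0, 23)
    print(offset_from_row(25))   # (1, 80, 21): row 25 is row 2 of file 1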
@@ -359,14 +359,83 @@ class Table(object):
         # unpack_from ignores the mmap object's current seek position
         return list(self.packer.unpack_from(mm, offset))

-    def remove(self, start, end):
-        """Remove specified rows [start, end) from this table.
+    def _remove_rows(self, subdir, filename, start, stop):
+        """Helper to mark specific rows as being removed from a
+        file, and potentially removing or truncating the file itself."""
+        # Import an existing list of deleted rows for this file
+        datafile = os.path.join(self.root, subdir, filename)
+        cachefile = datafile + ".removed"
+        try:
+            with open(cachefile, "rb") as f:
+                ranges = pickle.load(f)
+            cachefile_present = True
+        except:
+            ranges = []
+            cachefile_present = False
+
+        # Append our new range and sort
+        ranges.append((start, stop))
+        ranges.sort()
+
+        # Merge adjacent ranges into "merged"
+        merged = []
+        prev = None
+        for new in ranges:
+            if prev is None:
+                # No previous range, so remember this one
+                prev = new
+            elif prev[1] == new[0]:
+                # Previous range connected to this new one; extend prev
+                prev = (prev[0], new[1])
+            else:
+                # Not connected; append previous and start again
+                merged.append(prev)
+                prev = new
+        if prev is not None:
+            merged.append(prev)
+
+        # If the range covered the whole file, we can delete it now.
+        if (len(merged) == 1 and
+            merged[0][0] == 0 and merged[0][1] == self.rows_per_file):
+            # Close potentially open file in mmap_open LRU cache
+            self.mmap_open.cache_remove(self, subdir, filename)
+
+            # Delete files
+            os.remove(datafile)
+            if cachefile_present:
+                os.remove(cachefile)
+
+            # Try deleting subdir, too
+            try:
+                os.rmdir(os.path.join(self.root, subdir))
+            except:
+                pass
+        else:
+            # Update cache
+            with open(cachefile, "wb") as f:
+                pickle.dump(merged, f, 2)
+
+    def remove(self, start, stop):
+        """Remove specified rows [start, stop) from this table.

         If a file is left empty, it is fully removed. Otherwise, a
         parallel data file is used to remember which rows have been
         removed, and the file is otherwise untouched."""
-        return
-        raise NotImplementedError()
+        if start < 0 or start > stop or stop > self.nrows:
+            raise IndexError("Index out of range")
+
+        row = start
+        remaining = stop - start
+        while remaining:
+            # Loop through each file that we need to touch
+            (subdir, filename, offset, count) = self._offset_from_row(row)
+            if count > remaining:
+                count = remaining
+            row_offset = offset // self.packer.size
+            # Mark the rows as being removed
+            self._remove_rows(subdir, filename, row_offset, row_offset + count)
+            remaining -= count
+            row += count

 class TimestampOnlyTable(object):
     """Helper that lets us pass a Tables object into bisect, by


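The core of _remove_rows above is plain bookkeeping: keep a sorted list of removed [start, stop) row ranges per file, merge ranges that touch, and drop the data file entirely once one merged range covers every row it holds. Here is a minimal standalone sketch of just that step (the function name and the rows_per_file value of 23 are illustrative, not part of nilmdb):

    def merge_removed_ranges(existing, new_range, rows_per_file=23):
        """Merge a new [start, stop) range into an existing sorted list and
        report whether the whole file (rows 0..rows_per_file) is now covered."""
        ranges = sorted(existing + [new_range])
        merged = []
        prev = None
        for cur in ranges:
            if prev is None:
                prev = cur
            elif prev[1] == cur[0]:
                # Adjacent to the previous range; extend it
                prev = (prev[0], cur[1])
            else:
                merged.append(prev)
                prev = cur
        if prev is not None:
            merged.append(prev)
        whole_file = (merged == [(0, rows_per_file)])
        return merged, whole_file

    print(merge_removed_ranges([(0, 10)], (10, 20)))   # ([(0, 20)], False)
    print(merge_removed_ranges([(0, 20)], (20, 23)))   # ([(0, 23)], True)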
tests/test_bulkdata.py (+5, -0)

@@ -71,6 +71,11 @@ class TestBulkData(object):
         for s in misc_slices:
             eq_(node[s], raw[s])

+        # Get some coverage of remove; remove is more fully tested
+        # in cmdline
+        with assert_raises(IndexError):
+            node.remove(9999,9998)
+
         # close, reopen
         # reopen
         data.close()


tests/test_cmdline.py (+95, -2)

@@ -667,7 +667,7 @@ class TestCmdline(object):
         # Test BulkData's ability to split into multiple files,
         # by forcing the file size to be really small.
         server_stop()
-        server_start(bulkdata_args = { "file_size" : 999,
+        server_start(bulkdata_args = { "file_size" : 920, # 23 rows per file
                                        "files_per_dir" : 3 })

         # Fill data
@@ -702,6 +702,7 @@ class TestCmdline(object):
         # Now recreate the data one more time and make sure there are
         # fewer files.
         self.ok("destroy /newton/prep")
+        self.fail("destroy /newton/prep") # already destroyed
         self.ok("create /newton/prep float32_8")
         os.environ['TZ'] = "UTC"
         with open("tests/data/prep-20120323T1004-timestamped") as input:
@@ -709,4 +710,96 @@ class TestCmdline(object):
         nfiles = 0
         for (dirpath, dirnames, filenames) in os.walk(testdb):
             nfiles += len(filenames)
-        assert(nfiles < 50)
+        lt_(nfiles, 50)
+        self.ok("destroy /newton/prep") # destroy again
+
+    def test_14_remove_files(self):
+        # Test BulkData's ability to remove when data is split into
+        # multiple files. Should be a fairly comprehensive test of
+        # remove functionality.
+        server_stop()
+        server_start(bulkdata_args = { "file_size" : 920, # 23 rows per file
+                                       "files_per_dir" : 3 })
+
+        # Insert data. Just for fun, insert out of order
+        self.ok("create /newton/prep PrepData")
+        os.environ['TZ'] = "UTC"
+        self.ok("insert --rate 120 /newton/prep "
+                "tests/data/prep-20120323T1002 "
+                "tests/data/prep-20120323T1000")
+
+        # Should take up about 2.8 MB here (including directory entries)
+        du_before = nilmdb.utils.diskusage.du_bytes(testdb)
+
+        # Make sure we have the data we expect
+        self.ok("list --detail")
+        self.match("/newton/prep PrepData\n" +
+                   " [ Fri, 23 Mar 2012 10:00:00.000000 +0000"
+                   " -> Fri, 23 Mar 2012 10:01:59.991668 +0000 ]\n"
+                   " [ Fri, 23 Mar 2012 10:02:00.000000 +0000"
+                   " -> Fri, 23 Mar 2012 10:03:59.991668 +0000 ]\n")
+
+        # Remove various chunks of prep data and make sure
+        # they're gone.
+        self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01")
+        self.match("28800\n")
+
+        self.ok("remove -c /newton/prep " +
+                "--start '23 Mar 2012 10:00:30' " +
+                "--end '23 Mar 2012 10:03:30'")
+        self.match("21600\n")
+
+        self.ok("remove -c /newton/prep " +
+                "--start '23 Mar 2012 10:00:10' " +
+                "--end '23 Mar 2012 10:00:20'")
+        self.match("1200\n")
+
+        self.ok("remove -c /newton/prep " +
+                "--start '23 Mar 2012 10:00:05' " +
+                "--end '23 Mar 2012 10:00:25'")
+        self.match("1200\n")
+
+        self.ok("remove -c /newton/prep " +
+                "--start '23 Mar 2012 10:03:50' " +
+                "--end '23 Mar 2012 10:06:50'")
+        self.match("1200\n")
+
+        self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01")
+        self.match("3600\n")
+
+        # See the missing chunks in list output
+        self.ok("list --detail")
+        self.match("/newton/prep PrepData\n" +
+                   " [ Fri, 23 Mar 2012 10:00:00.000000 +0000"
+                   " -> Fri, 23 Mar 2012 10:00:05.000000 +0000 ]\n"
+                   " [ Fri, 23 Mar 2012 10:00:25.000000 +0000"
+                   " -> Fri, 23 Mar 2012 10:00:30.000000 +0000 ]\n"
+                   " [ Fri, 23 Mar 2012 10:03:30.000000 +0000"
+                   " -> Fri, 23 Mar 2012 10:03:50.000000 +0000 ]\n")
+
+        # We have 1/8 of the data that we had before, so the file size
+        # should have dropped below 1/4 of what it used to be
+        du_after = nilmdb.utils.diskusage.du_bytes(testdb)
+        lt_(du_after, (du_before / 4))
+
+        # Remove anything that came from the 10:02 data file
+        self.ok("remove /newton/prep " +
+                "--start '23 Mar 2012 10:02:00' --end '2020-01-01'")
+
+        # Shut down and restart server, to force nrows to get refreshed
+        # global test_server, test_db
+        # raise Exception()
+        # print test_db.data.getnode("/newton/prep")
+        server_stop()
+        server_start()
+        # print test_db.data.getnode("/newton/prep")
+
+        # Re-add the full 10:02 data file. This tests adding new data once
+        # we removed data near the end.
+        self.ok("insert --rate 120 /newton/prep tests/data/prep-20120323T1002")
+
+        # See if we can extract it all
+        self.ok("extract /newton/prep --start 2000-01-01 --end 2020-01-01")
+        lines_(self.captured, 15600)
+
+        # raise Exception()

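The counts matched in test_14 follow from the 120 rows-per-second rate and the four minutes of data inserted (10:00:00 through 10:04:00). A quick arithmetic check, assuming -c makes remove and extract report the number of rows affected, as the matched outputs suggest:

    # Quick check of the counts matched above, at 120 rows per second.
    rate = 120
    total = 4 * 60 * rate          # 28800 rows inserted (10:00:00 - 10:04:00)
    r1 = 3 * 60 * rate             # 10:00:30 - 10:03:30            -> 21600
    r2 = 10 * rate                 # 10:00:10 - 10:00:20            -> 1200
    r3 = 10 * rate                 # 10:00:05 - 10:00:25, minus the
                                   # 10 s already removed in r2     -> 1200
    r4 = 10 * rate                 # 10:03:50 - 10:06:50, but data
                                   # ends at 10:04:00               -> 1200
    print(total - r1 - r2 - r3 - r4)          # 3600, i.e. 1/8 of 28800

    # After removing everything from 10:02 on (which drops the remaining
    # 10:03:30 - 10:03:50 chunk) and re-inserting the two-minute 10:02 file:
    print(3600 - 20 * rate + 2 * 60 * rate)   # 15600 lines extracted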