diff --git a/nilmdb/bulkdata.py b/nilmdb/bulkdata.py
index 0e8e11e..1ad4677 100644
--- a/nilmdb/bulkdata.py
+++ b/nilmdb/bulkdata.py
@@ -14,7 +14,7 @@ import mmap
 import re
 
 # Up to 256 open file descriptors at any given time.
-# Global to this module so they can be used in the decorator arguments.
+# These variables are global so they can be used in the decorator arguments.
 table_cache_size = 16
 fd_cache_size = 16
 
@@ -245,7 +245,7 @@ class Table(object):
           subdir: subdirectory for the file
           filename: the filename that contains the specified row
           offset: byte offset of the specified row within the file
-          count: number of rows (starting at offste) that fit in the file
+          count: number of rows (starting at offset) that fit in the file
         """
         filenum = row // self.rows_per_file
         # It's OK if these format specifiers are too short; the filenames
@@ -359,14 +359,83 @@ class Table(object):
         # unpack_from ignores the mmap object's current seek position
         return list(self.packer.unpack_from(mm, offset))
 
-    def remove(self, start, end):
-        """Remove specified rows [start, end) from this table.
+    def _remove_rows(self, subdir, filename, start, stop):
+        """Helper to mark specific rows as removed from a file,
+        deleting the file itself once every row in it is gone."""
+        # Load any existing list of removed row ranges for this file
+        datafile = os.path.join(self.root, subdir, filename)
+        cachefile = datafile + ".removed"
+        try:
+            with open(cachefile, "rb") as f:
+                ranges = pickle.load(f)
+            cachefile_present = True
+        except Exception:
+            ranges = []
+            cachefile_present = False
+
+        # Append our new range and sort
+        ranges.append((start, stop))
+        ranges.sort()
+
+        # Merge adjacent ranges into "merged"
+        merged = []
+        prev = None
+        for new in ranges:
+            if prev is None:
+                # No previous range, so remember this one
+                prev = new
+            elif prev[1] == new[0]:
+                # Previous range connected to this new one; extend prev
+                prev = (prev[0], new[1])
+            else:
+                # Not connected; append previous and start again
+                merged.append(prev)
+                prev = new
+        if prev is not None:
+            merged.append(prev)
+
+        # If the merged range covers the whole file, we can delete it now.
+        if (len(merged) == 1 and
+            merged[0][0] == 0 and merged[0][1] == self.rows_per_file):
+            # Close potentially open file in mmap_open LRU cache
+            self.mmap_open.cache_remove(self, subdir, filename)
+
+            # Delete files
+            os.remove(datafile)
+            if cachefile_present:
+                os.remove(cachefile)
+
+            # Try deleting the subdir too; this fails if it's not empty
+            try:
+                os.rmdir(os.path.join(self.root, subdir))
+            except OSError:
+                pass
+        else:
+            # Update cache
+            with open(cachefile, "wb") as f:
+                pickle.dump(merged, f, 2)
+
+    def remove(self, start, stop):
+        """Remove specified rows [start, stop) from this table.
 
         If a file is left empty, it is fully removed.
         Otherwise, a parallel data file is used to remember which
         rows have been removed, and the file is otherwise untouched."""
-        return
-        raise NotImplementedError()
+        if start < 0 or start > stop or stop > self.nrows:
+            raise IndexError("Index out of range")
+
+        row = start
+        remaining = stop - start
+        while remaining:
+            # Loop through each file that we need to touch
+            (subdir, filename, offset, count) = self._offset_from_row(row)
+            if count > remaining:
+                count = remaining
+            row_offset = offset // self.packer.size
+            # Mark the rows as being removed
+            self._remove_rows(subdir, filename, row_offset, row_offset + count)
+            remaining -= count
+            row += count
 
 class TimestampOnlyTable(object):
     """Helper that lets us pass a Tables object into bisect, by
diff --git a/tests/test_bulkdata.py b/tests/test_bulkdata.py
index f214e59..28bd75b 100644
--- a/tests/test_bulkdata.py
+++ b/tests/test_bulkdata.py
@@ -71,6 +71,11 @@ class TestBulkData(object):
         for s in misc_slices:
             eq_(node[s], raw[s])
 
+        # Get some coverage of remove; remove is more fully tested
+        # in cmdline
+        with assert_raises(IndexError):
+            node.remove(9999, 9998)
+
         # close, reopen
         # reopen
         data.close()
diff --git a/tests/test_cmdline.py b/tests/test_cmdline.py
index 263545f..c35abb6 100644
--- a/tests/test_cmdline.py
+++ b/tests/test_cmdline.py
@@ -667,7 +667,7 @@ class TestCmdline(object):
         # Test BulkData's ability to split into multiple files,
         # by forcing the file size to be really small.
         server_stop()
-        server_start(bulkdata_args = { "file_size" : 999,
+        server_start(bulkdata_args = { "file_size" : 920, # 23 rows per file
                                        "files_per_dir" : 3 })
 
         # Fill data
@@ -702,6 +702,7 @@
         # Now recreate the data one more time and make sure there are
         # fewer files.
         self.ok("destroy /newton/prep")
+        self.fail("destroy /newton/prep") # already destroyed
         self.ok("create /newton/prep float32_8")
         os.environ['TZ'] = "UTC"
         with open("tests/data/prep-20120323T1004-timestamped") as input:
@@ -709,4 +710,90 @@
         nfiles = 0
         for (dirpath, dirnames, filenames) in os.walk(testdb):
             nfiles += len(filenames)
-        assert(nfiles < 50)
+        lt_(nfiles, 50)
+        self.ok("destroy /newton/prep") # destroy again
+
+    def test_14_remove_files(self):
+        # Test BulkData's ability to remove when data is split into
+        # multiple files.  Should be a fairly comprehensive test of
+        # remove functionality.
+        server_stop()
+        server_start(bulkdata_args = { "file_size" : 920, # 23 rows per file
+                                       "files_per_dir" : 3 })
+
+        # Insert data.  Just for fun, insert out of order
+        self.ok("create /newton/prep PrepData")
+        os.environ['TZ'] = "UTC"
+        self.ok("insert --rate 120 /newton/prep "
+                "tests/data/prep-20120323T1002 "
+                "tests/data/prep-20120323T1000")
+
+        # Should take up about 2.8 MB here (including directory entries)
+        du_before = nilmdb.utils.diskusage.du_bytes(testdb)
+
+        # Make sure we have the data we expect
+        self.ok("list --detail")
+        self.match("/newton/prep PrepData\n"
+                   " [ Fri, 23 Mar 2012 10:00:00.000000 +0000"
+                   " -> Fri, 23 Mar 2012 10:01:59.991668 +0000 ]\n"
+                   " [ Fri, 23 Mar 2012 10:02:00.000000 +0000"
+                   " -> Fri, 23 Mar 2012 10:03:59.991668 +0000 ]\n")
+
+        # Remove various chunks of prep data and make sure
+        # they're gone.
+ self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01") + self.match("28800\n") + + self.ok("remove -c /newton/prep " + + "--start '23 Mar 2012 10:00:30' " + + "--end '23 Mar 2012 10:03:30'") + self.match("21600\n") + + self.ok("remove -c /newton/prep " + + "--start '23 Mar 2012 10:00:10' " + + "--end '23 Mar 2012 10:00:20'") + self.match("1200\n") + + self.ok("remove -c /newton/prep " + + "--start '23 Mar 2012 10:00:05' " + + "--end '23 Mar 2012 10:00:25'") + self.match("1200\n") + + self.ok("remove -c /newton/prep " + + "--start '23 Mar 2012 10:03:50' " + + "--end '23 Mar 2012 10:06:50'") + self.match("1200\n") + + self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01") + self.match("3600\n") + + # See the missing chunks in list output + self.ok("list --detail") + self.match("/newton/prep PrepData\n" + + " [ Fri, 23 Mar 2012 10:00:00.000000 +0000" + " -> Fri, 23 Mar 2012 10:00:05.000000 +0000 ]\n" + " [ Fri, 23 Mar 2012 10:00:25.000000 +0000" + " -> Fri, 23 Mar 2012 10:00:30.000000 +0000 ]\n" + " [ Fri, 23 Mar 2012 10:03:30.000000 +0000" + " -> Fri, 23 Mar 2012 10:03:50.000000 +0000 ]\n") + + # We have 1/8 of the data that we had before, so the file size + # should have dropped below 1/4 of what it used to be + du_after = nilmdb.utils.diskusage.du_bytes(testdb) + lt_(du_after, (du_before / 4)) + + # Remove anything that came from the 10:02 data file + self.ok("remove /newton/prep " + + "--start '23 Mar 2012 10:02:00' --end '2020-01-01'") + + # Shut down and restart server, to force nrows to get refreshed +# global test_server, test_db +# raise Exception() +# print test_db.data.getnode("/newton/prep") + server_stop() + server_start() +# print test_db.data.getnode("/newton/prep") + + # Re-add the full 10:02 data file. This tests adding new data once + # we removed data near the end. + self.ok("insert --rate 120 /newton/prep tests/data/prep-20120323T1002") + + # See if we can extract it all + self.ok("extract /newton/prep --start 2000-01-01 --end 2020-01-01") + lines_(self.captured, 15600) + +# raise Exception()