From d634f7d3cf665e24d5869ca4c71a35ab56f269bb Mon Sep 17 00:00:00 2001 From: Jim Paris Date: Wed, 13 Feb 2013 20:27:05 -0500 Subject: [PATCH] bulkdata: Use file writes instead of writing to the mmap. Extending and then writing to the mmap file has a problem: if the disk fills up, the mapping becomes invalid, and the Python interpreter will get a SIGBUS, killing it. It's difficult to catch this gracefully; there's no way to do that with existing modules. Instead, switch to only using mmap when reading, and normal file writes when writing. Since we only ever append, it should have similar performance. --- nilmdb/server/bulkdata.py | 107 ++++++++++++++++++++++++-------------- nilmdb/server/nilmdb.py | 2 +- 2 files changed, 69 insertions(+), 40 deletions(-) diff --git a/nilmdb/server/bulkdata.py b/nilmdb/server/bulkdata.py index db97e63..644e780 100644 --- a/nilmdb/server/bulkdata.py +++ b/nilmdb/server/bulkdata.py @@ -13,6 +13,16 @@ import struct import mmap import re +# If we have the faulthandler module, use it. All of the mmap stuff +# might trigger a SIGSEGV or SIGBUS if we're not careful, and +# faulthandler will give a traceback in that case. (the Python +# interpreter will still die either way). +try: # pragma: no cover + import faulthandler + faulthandler.enable() +except: # pragma: no cover + pass + # Up to 256 open file descriptors at any given time. # These variables are global so they can be used in the decorator arguments. table_cache_size = 16 @@ -161,6 +171,52 @@ class BulkData(object): ospath = os.path.join(self.root, *elements) return Table(ospath) +@nilmdb.utils.must_close(wrap_verify = True) +class File(object): + """Object representing a single file on disk. Data can be appended, + or the self.mmap handle can be used for random reads.""" + + def __init__(self, root, subdir, filename): + # Create path if it doesn't exist + try: + os.mkdir(os.path.join(root, subdir)) + except OSError: + pass + + # Open/create file + self._f = open(os.path.join(root, subdir, filename), "a+b", 0) + + # Seek to end, and get size + self._f.seek(0, 2) + self.size = self._f.tell() + + # Open mmap object + self.mmap = None + self._mmap_reopen() + + def _mmap_reopen(self): + if self.size == 0: + # Don't mmap if the file is empty; it would fail + pass + elif self.mmap is None: + # Not opened yet, so open it + self.mmap = mmap.mmap(self._f.fileno(), 0) + else: + # Already opened, so just resize it + self.mmap.resize(self.size) + + def close(self): + if self.mmap is not None: + self.mmap.close() + self._f.close() + + def append(self, data): + # Write data, flush it, and resize our mmap accordingly + self._f.write(data) + self._f.flush() + self.size += len(data) + self._mmap_reopen() + @nilmdb.utils.must_close(wrap_verify = True) class Table(object): """Tools to help access a single table (data at a specific OS path).""" @@ -211,7 +267,7 @@ class Table(object): self.nrows = self._get_nrows() def close(self): - self.mmap_open.cache_remove_all() + self.file_open.cache_remove_all() # Internal helpers def _get_nrows(self): @@ -275,37 +331,11 @@ class Table(object): # Cache open files @nilmdb.utils.lru_cache(size = fd_cache_size, - keys = slice(0, 3), # exclude newsize - onremove = lambda x: x.close()) - def mmap_open(self, subdir, filename, newsize = None): + onremove = lambda f: f.close()) + def file_open(self, subdir, filename): """Open and map a given 'subdir/filename' (relative to self.root). - Will be automatically closed when evicted from the cache. - - If 'newsize' is provided, the file is truncated to the given - size before the mapping is returned. (Note that the LRU cache - on this function means the truncate will only happen if the - object isn't already cached; mmap.resize should be used too.)""" - try: - os.mkdir(os.path.join(self.root, subdir)) - except OSError: - pass - f = open(os.path.join(self.root, subdir, filename), "a+", 0) - if newsize is not None: - # mmap can't map a zero-length file, so this allows the - # caller to set the filesize between file creation and - # mmap. - f.truncate(newsize) - mm = mmap.mmap(f.fileno(), 0) - return mm - - def mmap_open_resize(self, subdir, filename, newsize): - """Open and map a given 'subdir/filename' (relative to self.root). - The file is resized to the given size.""" - # Pass new size to mmap_open - mm = self.mmap_open(subdir, filename, newsize) - # In case we got a cached copy, need to call mm.resize too. - mm.resize(newsize) - return mm + Will be automatically closed when evicted from the cache.""" + return File(self.root, subdir, filename) def append(self, data): """Append the data and flush it to disk. @@ -317,14 +347,13 @@ class Table(object): (subdir, fname, offset, count) = self._offset_from_row(self.nrows) if count > remaining: count = remaining - newsize = offset + count * self.packer.size - mm = self.mmap_open_resize(subdir, fname, newsize) - mm.seek(offset) + + f = self.file_open(subdir, fname) # Write the data for i in xrange(count): row = dataiter.next() - mm.write(self.packer.pack(*row)) + f.append(self.packer.pack(*row)) remaining -= count self.nrows += count @@ -351,7 +380,7 @@ class Table(object): (subdir, filename, offset, count) = self._offset_from_row(row) if count > remaining: count = remaining - mm = self.mmap_open(subdir, filename) + mm = self.file_open(subdir, filename).mmap for i in xrange(count): ret.append(list(self.packer.unpack_from(mm, offset))) offset += self.packer.size @@ -363,7 +392,7 @@ class Table(object): if key < 0 or key >= self.nrows: raise IndexError("Index out of range") (subdir, filename, offset, count) = self._offset_from_row(key) - mm = self.mmap_open(subdir, filename) + mm = self.file_open(subdir, filename).mmap # unpack_from ignores the mmap object's current seek position return list(self.packer.unpack_from(mm, offset)) @@ -410,8 +439,8 @@ class Table(object): # are generally easier if we don't have to special-case that. if (len(merged) == 1 and merged[0][0] == 0 and merged[0][1] == self.rows_per_file): - # Close potentially open file in mmap_open LRU cache - self.mmap_open.cache_remove(self, subdir, filename) + # Close potentially open file in file_open LRU cache + self.file_open.cache_remove(self, subdir, filename) # Delete files os.remove(datafile) diff --git a/nilmdb/server/nilmdb.py b/nilmdb/server/nilmdb.py index 0603908..5737c8b 100644 --- a/nilmdb/server/nilmdb.py +++ b/nilmdb/server/nilmdb.py @@ -396,7 +396,7 @@ class NilmDB(object): path: Path at which to add the data start: Starting timestamp end: Ending timestamp - data: Rows of data, to be passed to PyTable's table.append + data: Rows of data, to be passed to bulkdata table.append method. E.g. nilmdb.layout.Parser.data """ # First check for basic overlap using timestamp info given.