diff --git a/nilmdb/server/bulkdata.py b/nilmdb/server/bulkdata.py index db97e63..644e780 100644 --- a/nilmdb/server/bulkdata.py +++ b/nilmdb/server/bulkdata.py @@ -13,6 +13,16 @@ import struct import mmap import re +# If we have the faulthandler module, use it. All of the mmap stuff +# might trigger a SIGSEGV or SIGBUS if we're not careful, and +# faulthandler will give a traceback in that case. (the Python +# interpreter will still die either way). +try: # pragma: no cover + import faulthandler + faulthandler.enable() +except: # pragma: no cover + pass + # Up to 256 open file descriptors at any given time. # These variables are global so they can be used in the decorator arguments. table_cache_size = 16 @@ -161,6 +171,52 @@ class BulkData(object): ospath = os.path.join(self.root, *elements) return Table(ospath) +@nilmdb.utils.must_close(wrap_verify = True) +class File(object): + """Object representing a single file on disk. Data can be appended, + or the self.mmap handle can be used for random reads.""" + + def __init__(self, root, subdir, filename): + # Create path if it doesn't exist + try: + os.mkdir(os.path.join(root, subdir)) + except OSError: + pass + + # Open/create file + self._f = open(os.path.join(root, subdir, filename), "a+b", 0) + + # Seek to end, and get size + self._f.seek(0, 2) + self.size = self._f.tell() + + # Open mmap object + self.mmap = None + self._mmap_reopen() + + def _mmap_reopen(self): + if self.size == 0: + # Don't mmap if the file is empty; it would fail + pass + elif self.mmap is None: + # Not opened yet, so open it + self.mmap = mmap.mmap(self._f.fileno(), 0) + else: + # Already opened, so just resize it + self.mmap.resize(self.size) + + def close(self): + if self.mmap is not None: + self.mmap.close() + self._f.close() + + def append(self, data): + # Write data, flush it, and resize our mmap accordingly + self._f.write(data) + self._f.flush() + self.size += len(data) + self._mmap_reopen() + @nilmdb.utils.must_close(wrap_verify = True) class Table(object): """Tools to help access a single table (data at a specific OS path).""" @@ -211,7 +267,7 @@ class Table(object): self.nrows = self._get_nrows() def close(self): - self.mmap_open.cache_remove_all() + self.file_open.cache_remove_all() # Internal helpers def _get_nrows(self): @@ -275,37 +331,11 @@ class Table(object): # Cache open files @nilmdb.utils.lru_cache(size = fd_cache_size, - keys = slice(0, 3), # exclude newsize - onremove = lambda x: x.close()) - def mmap_open(self, subdir, filename, newsize = None): + onremove = lambda f: f.close()) + def file_open(self, subdir, filename): """Open and map a given 'subdir/filename' (relative to self.root). - Will be automatically closed when evicted from the cache. - - If 'newsize' is provided, the file is truncated to the given - size before the mapping is returned. (Note that the LRU cache - on this function means the truncate will only happen if the - object isn't already cached; mmap.resize should be used too.)""" - try: - os.mkdir(os.path.join(self.root, subdir)) - except OSError: - pass - f = open(os.path.join(self.root, subdir, filename), "a+", 0) - if newsize is not None: - # mmap can't map a zero-length file, so this allows the - # caller to set the filesize between file creation and - # mmap. - f.truncate(newsize) - mm = mmap.mmap(f.fileno(), 0) - return mm - - def mmap_open_resize(self, subdir, filename, newsize): - """Open and map a given 'subdir/filename' (relative to self.root). - The file is resized to the given size.""" - # Pass new size to mmap_open - mm = self.mmap_open(subdir, filename, newsize) - # In case we got a cached copy, need to call mm.resize too. - mm.resize(newsize) - return mm + Will be automatically closed when evicted from the cache.""" + return File(self.root, subdir, filename) def append(self, data): """Append the data and flush it to disk. @@ -317,14 +347,13 @@ class Table(object): (subdir, fname, offset, count) = self._offset_from_row(self.nrows) if count > remaining: count = remaining - newsize = offset + count * self.packer.size - mm = self.mmap_open_resize(subdir, fname, newsize) - mm.seek(offset) + + f = self.file_open(subdir, fname) # Write the data for i in xrange(count): row = dataiter.next() - mm.write(self.packer.pack(*row)) + f.append(self.packer.pack(*row)) remaining -= count self.nrows += count @@ -351,7 +380,7 @@ class Table(object): (subdir, filename, offset, count) = self._offset_from_row(row) if count > remaining: count = remaining - mm = self.mmap_open(subdir, filename) + mm = self.file_open(subdir, filename).mmap for i in xrange(count): ret.append(list(self.packer.unpack_from(mm, offset))) offset += self.packer.size @@ -363,7 +392,7 @@ class Table(object): if key < 0 or key >= self.nrows: raise IndexError("Index out of range") (subdir, filename, offset, count) = self._offset_from_row(key) - mm = self.mmap_open(subdir, filename) + mm = self.file_open(subdir, filename).mmap # unpack_from ignores the mmap object's current seek position return list(self.packer.unpack_from(mm, offset)) @@ -410,8 +439,8 @@ class Table(object): # are generally easier if we don't have to special-case that. if (len(merged) == 1 and merged[0][0] == 0 and merged[0][1] == self.rows_per_file): - # Close potentially open file in mmap_open LRU cache - self.mmap_open.cache_remove(self, subdir, filename) + # Close potentially open file in file_open LRU cache + self.file_open.cache_remove(self, subdir, filename) # Delete files os.remove(datafile) diff --git a/nilmdb/server/nilmdb.py b/nilmdb/server/nilmdb.py index 0603908..5737c8b 100644 --- a/nilmdb/server/nilmdb.py +++ b/nilmdb/server/nilmdb.py @@ -396,7 +396,7 @@ class NilmDB(object): path: Path at which to add the data start: Starting timestamp end: Ending timestamp - data: Rows of data, to be passed to PyTable's table.append + data: Rows of data, to be passed to bulkdata table.append method. E.g. nilmdb.layout.Parser.data """ # First check for basic overlap using timestamp info given.