You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

298 lines
11 KiB

  1. # Fixed record size bulk data storage
  2. from __future__ import absolute_import
  3. from __future__ import division
  4. import nilmdb
  5. from nilmdb.utils.printf import *
  6. import os
  7. import sys
  8. import cPickle as pickle
  9. import struct
  10. import fnmatch
  11. import mmap
  12. # Up to 256 open file descriptors at any given time
  13. table_cache_size = 16
  14. fd_cache_size = 16
  15. @nilmdb.utils.must_close()
  16. class BulkData(object):
  17. def __init__(self, basepath):
  18. self.basepath = basepath
  19. self.root = os.path.join(self.basepath, "data")
  20. # Make root path
  21. if not os.path.isdir(self.root):
  22. os.mkdir(self.root)
  23. def close(self):
  24. self.getnode.cache_remove_all()
  25. def create(self, path, layout_name):
  26. """
  27. path: path to the data (e.g. '/newton/prep').
  28. Paths must contain at least two elements, e.g.:
  29. /newton/prep
  30. /newton/raw
  31. /newton/upstairs/prep
  32. /newton/upstairs/raw
  33. layout_name: string for nilmdb.layout.get_named(), e.g. 'float32_8'
  34. """
  35. if path[0] != '/':
  36. raise ValueError("paths must start with /")
  37. [ group, node ] = path.rsplit("/", 1)
  38. if group == '':
  39. raise ValueError("invalid path")
  40. # Get layout, and build format string for struct module
  41. try:
  42. layout = nilmdb.layout.get_named(layout_name)
  43. struct_fmt = '<d' # Little endian, double timestamp
  44. struct_mapping = {
  45. "int8": 'b',
  46. "uint8": 'B',
  47. "int16": 'h',
  48. "uint16": 'H',
  49. "int32": 'i',
  50. "uint32": 'I',
  51. "int64": 'q',
  52. "uint64": 'Q',
  53. "float32": 'f',
  54. "float64": 'd',
  55. }
  56. for n in range(layout.count):
  57. struct_fmt += struct_mapping[layout.datatype]
  58. except KeyError:
  59. raise ValueError("no such layout, or bad data types")
  60. # Create the table. Note that we make a distinction here
  61. # between NilmDB paths (always Unix style, split apart
  62. # manually) and OS paths (built up with os.path.join)
  63. try:
  64. # Make directories leading up to this one
  65. elements = path.lstrip('/').split('/')
  66. for i in range(len(elements)):
  67. ospath = os.path.join(self.root, *elements[0:i])
  68. if Table.exists(ospath):
  69. raise ValueError("path is subdir of existing node")
  70. if not os.path.isdir(ospath):
  71. os.mkdir(ospath)
  72. # Make the final dir
  73. ospath = os.path.join(self.root, *elements)
  74. if os.path.isdir(ospath):
  75. raise ValueError("subdirs of this path already exist")
  76. os.mkdir(ospath)
  77. # Write format string to file
  78. Table.create(ospath, struct_fmt)
  79. except OSError as e:
  80. raise ValueError("error creating table at that path: " + e.strerror)
  81. # Open and cache it
  82. self.getnode(path)
  83. # Success
  84. return
  85. def destroy(self, path):
  86. """Fully remove all data at a particular path. No way to undo
  87. it! The group/path structure is removed, too."""
  88. # Get OS path
  89. elements = path.lstrip('/').split('/')
  90. ospath = os.path.join(self.root, *elements)
  91. # Remove Table object from cache
  92. self.getnode.cache_remove(self, ospath)
  93. # Remove the contents of the target directory
  94. if not os.path.isfile(os.path.join(ospath, "format")):
  95. raise ValueError("nothing at that path")
  96. for file in os.listdir(ospath):
  97. os.remove(os.path.join(ospath, file))
  98. # Remove empty parent directories
  99. for i in reversed(range(len(elements))):
  100. ospath = os.path.join(self.root, *elements[0:i+1])
  101. try:
  102. os.rmdir(ospath)
  103. except OSError:
  104. break
  105. # Cache open tables
  106. @nilmdb.utils.lru_cache(size = table_cache_size,
  107. onremove = lambda x: x.close())
  108. def getnode(self, path):
  109. """Return a Table object corresponding to the given database
  110. path, which must exist."""
  111. elements = path.lstrip('/').split('/')
  112. ospath = os.path.join(self.root, *elements)
  113. return Table(ospath)
@nilmdb.utils.must_close()
class Table(object):
    """Tools to help access a single table (data at a specific OS path)"""
    # On-disk layout: a pickled "format" file describing the record
    # packing, plus data files named as 8-digit lowercase hex file
    # numbers, each holding up to rows_per_file fixed-size rows.

    # Class methods, to help keep format details in this class.
    @classmethod
    def exists(cls, root):
        """Return True if a table appears to exist at this OS path"""
        # The "format" file is the marker for a table directory.
        return os.path.isfile(os.path.join(root, "format"))

    @classmethod
    def create(cls, root, struct_fmt):
        """Initialize a table at the given OS path.
        'struct_fmt' is a Struct module format description"""
        format = { "rows_per_file": 4 * 1024 * 1024,
                   "struct_fmt": struct_fmt }
        with open(os.path.join(root, "format"), "wb") as f:
            # Protocol 2 = binary pickle.
            pickle.dump(format, f, 2)

    # Normal methods
    def __init__(self, root):
        """'root' is the full OS path to the directory of this table"""
        self.root = root

        # Load the format and build packer
        with open(self._fullpath("format"), "rb") as f:
            format = pickle.load(f)
        self.rows_per_file = format["rows_per_file"]
        self.packer = struct.Struct(format["struct_fmt"])
        self.file_size = self.packer.size * self.rows_per_file

        # Find nrows by locating the lexicographically last filename
        # and using its size.  (Data files are fixed-width hex names,
        # so lexicographic max == numeric max.)
        pattern = '[0-9a-f]' * 8
        allfiles = fnmatch.filter(os.listdir(self.root), pattern)
        if allfiles:
            filename = max(allfiles)
            offset = os.path.getsize(self._fullpath(filename))
            self.nrows = self._row_from_fnoffset(filename, offset)
        else:
            self.nrows = 0

    def close(self):
        # Evict every cached mmap; the cache's onremove hook closes each.
        self.mmap_open.cache_remove_all()

    # Internal helpers
    def _fullpath(self, filename):
        # Join a table-relative filename onto this table's directory.
        return os.path.join(self.root, filename)

    def _fnoffset_from_row(self, row):
        """Return a (filename, offset, count) tuple:

             filename: the filename that contains the specified row
             offset: byte offset of the specified row within the file
             count: number of rows (starting at offset) that fit in the file
        """
        filenum = row // self.rows_per_file
        filename = sprintf("%08x", filenum)
        offset = (row % self.rows_per_file) * self.packer.size
        count = self.rows_per_file - (row % self.rows_per_file)
        return (filename, offset, count)

    def _row_from_fnoffset(self, filename, offset):
        """Return the row number that corresponds to the given
        filename and byte-offset within that file.

        Raises ValueError if 'offset' is not on a record boundary."""
        filenum = int(filename, 16)
        if (offset % self.packer.size) != 0:
            raise ValueError("file offset is not a multiple of data size")
        row = (filenum * self.rows_per_file) + (offset // self.packer.size)
        return row

    # Cache open files
    @nilmdb.utils.lru_cache(size = fd_cache_size,
                            onremove = lambda x: x.close())
    def mmap_open(self, file, newsize = None):
        """Open and map a given filename (relative to self.root).
        Will be automatically closed when evicted from the cache.
        If 'newsize' is provided, the file is truncated to the given
        size before the mapping is returned. (Note that the LRU cache
        on this function means the truncate will only happen if the
        object isn't already cached; mmap.resize should be used too)"""
        # "a+" creates the file if it doesn't exist yet; buffering=0
        # so truncate() is reflected on disk before mmap.
        # NOTE(review): 'f' is never explicitly closed here; presumably
        # the mapping keeps the data accessible after 'f' is collected —
        # confirm fd lifetime vs. the mmap on the target platform.
        # NOTE(review): since the cache appears to key on call arguments,
        # calls with and without 'newsize' for the same file may create
        # separate cache entries — verify against lru_cache's keying.
        f = open(os.path.join(self.root, file), "a+", 0)
        if newsize is not None:
            # mmap can't map a zero-length file, so this allows the
            # caller to set the filesize between file creation and
            # mmap.
            f.truncate(newsize)
        mm = mmap.mmap(f.fileno(), 0)
        return mm

    def append(self, data):
        """Append the data and flush it to disk.
        data is a nested Python list [[row],[row],[...]]"""
        remaining = len(data)
        dataiter = iter(data)
        while remaining:
            # See how many rows we can fit into the current file, and open it
            (filename, offset, count) = self._fnoffset_from_row(self.nrows)
            if count > remaining:
                count = remaining
            newsize = offset + count * self.packer.size
            mm = self.mmap_open(filename, newsize)
            mm.seek(offset)

            # Extend the file to the target length.  We specified
            # newsize when opening, but that may have been ignored if
            # the mmap_open returned a cached object.
            mm.resize(newsize)

            # Write the data
            for i in xrange(count):
                row = dataiter.next()
                mm.write(self.packer.pack(*row))

            remaining -= count
            self.nrows += count

    def __getitem__(self, key):
        """Extract data and return it.  Supports simple indexing
        (table[n]) and range slices (table[n:m]).  Returns a nested
        Python list [[row],[row],[...]]"""
        # Handle simple slices
        if isinstance(key, slice):
            # Fall back to brute force if the slice isn't simple
            # (non-unit step, open-ended, negative, empty, or
            # out-of-range): index each row individually instead.
            if ((key.step is not None and key.step != 1) or
                key.start is None or
                key.stop is None or
                key.start >= key.stop or
                key.start < 0 or
                key.stop > self.nrows):
                return [ self[x] for x in xrange(*key.indices(self.nrows)) ]

            ret = []
            row = key.start
            remaining = key.stop - key.start
            # Walk file by file, unpacking consecutive records.
            while remaining:
                (filename, offset, count) = self._fnoffset_from_row(row)
                if count > remaining:
                    count = remaining
                mm = self.mmap_open(filename)
                for i in xrange(count):
                    ret.append(list(self.packer.unpack_from(mm, offset)))
                    offset += self.packer.size
                remaining -= count
                row += count
            return ret

        # Handle single points
        if key < 0 or key >= self.nrows:
            raise IndexError("Index out of range")
        (filename, offset, count) = self._fnoffset_from_row(key)
        mm = self.mmap_open(filename)
        # unpack_from ignores the mmap object's current seek position
        return self.packer.unpack_from(mm, offset)
  250. class TimestampOnlyTable(object):
  251. """Helper that lets us pass a Tables object into bisect, by
  252. returning only the timestamp when a particular row is requested."""
  253. def __init__(self, table):
  254. self.table = table
  255. def __getitem__(self, index):
  256. return self.table[index][0]