# Fixed record size bulk data storage

from __future__ import absolute_import
from __future__ import division
import nilmdb
from nilmdb.utils.printf import *

import os
import sys
import cPickle as pickle
import struct
import fnmatch
import mmap
import re

# Up to 256 open file descriptors at any given time (up to table_cache_size
# cached tables, each holding up to fd_cache_size open files).
# These variables are global so they can be used in the decorator arguments.
table_cache_size = 16
fd_cache_size = 16

@nilmdb.utils.must_close(wrap_verify = True)
class BulkData(object):
    def __init__(self, basepath, **kwargs):
        self.basepath = basepath
        self.root = os.path.join(self.basepath, "data")

        # Tuneables
        if "file_size" in kwargs:
            self.file_size = kwargs["file_size"]
        else:
            # Default to approximately 128 MiB per file
            self.file_size = 128 * 1024 * 1024

        if "files_per_dir" in kwargs:
            self.files_per_dir = kwargs["files_per_dir"]
        else:
            # 32768 files per dir should work even on FAT32
            self.files_per_dir = 32768

        # Make root path
        if not os.path.isdir(self.root):
            os.mkdir(self.root)

    def close(self):
        self.getnode.cache_remove_all()

    def _encode_filename(self, path):
        # Encode all paths to UTF-8, regardless of sys.getfilesystemencoding(),
        # because we want to be able to represent all code points and the user
        # will never be directly exposed to filenames.  We can then do path
        # manipulations on the UTF-8 directly.
        if isinstance(path, unicode):
            return path.encode('utf-8')
        return path

    def create(self, unicodepath, layout_name):
        """
        unicodepath: path to the data (e.g. u'/newton/prep').
        Paths must contain at least two elements, e.g.:
          /newton/prep
          /newton/raw
          /newton/upstairs/prep
          /newton/upstairs/raw

        layout_name: string for nilmdb.layout.get_named(), e.g. 'float32_8'
        """
        path = self._encode_filename(unicodepath)

        if path[0] != '/':
            raise ValueError("paths must start with /")
        [ group, node ] = path.rsplit("/", 1)
        if group == '':
            raise ValueError("invalid path")

        # Get layout, and build format string for struct module
        try:
            layout = nilmdb.layout.get_named(layout_name)
            struct_fmt = '<d'  # Little endian, double timestamp
            struct_mapping = {
                "int8":    'b',
                "uint8":   'B',
                "int16":   'h',
                "uint16":  'H',
                "int32":   'i',
                "uint32":  'I',
                "int64":   'q',
                "uint64":  'Q',
                "float32": 'f',
                "float64": 'd',
                }
            for n in range(layout.count):
                struct_fmt += struct_mapping[layout.datatype]
        except KeyError:
            raise ValueError("no such layout, or bad data types")

        # Create the table.  Note that we make a distinction here
        # between NilmDB paths (always Unix style, split apart
        # manually) and OS paths (built up with os.path.join).

        # Make directories leading up to this one
        elements = path.lstrip('/').split('/')
        for i in range(len(elements)):
            ospath = os.path.join(self.root, *elements[0:i])
            if Table.exists(ospath):
                raise ValueError("path is subdir of existing node")
            if not os.path.isdir(ospath):
                os.mkdir(ospath)

        # Make the final dir
        ospath = os.path.join(self.root, *elements)
        if os.path.isdir(ospath):
            raise ValueError("subdirs of this path already exist")
        os.mkdir(ospath)

        # Write format string to file
        Table.create(ospath, struct_fmt, self.file_size, self.files_per_dir)

        # Open and cache it
        self.getnode(unicodepath)

        # Success
        return
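
    # For illustration (not part of the original code): assuming
    # nilmdb.layout registers 'float32_8' as eight float32 columns,
    # create(u'/newton/prep', 'float32_8') would build a tree like this
    # under basepath:
    #
    #   data/
    #     newton/
    #       prep/
    #         _format   # pickled dict: rows_per_file, files_per_dir,
    #                   #   struct_fmt ('<dffffffff'), version
    #
    # Data files such as 0000/0000 appear later, as rows are appended.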

    def destroy(self, unicodepath):
        """Fully remove all data at a particular path.  No way to undo
        it!  The group/path structure is removed, too."""
        path = self._encode_filename(unicodepath)

        # Get OS path
        elements = path.lstrip('/').split('/')
        ospath = os.path.join(self.root, *elements)

        # Remove Table object from cache
        self.getnode.cache_remove(self, unicodepath)

        # Remove the contents of the target directory
        if not Table.exists(ospath):
            raise ValueError("nothing at that path")
        for (root, dirs, files) in os.walk(ospath, topdown = False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                os.rmdir(os.path.join(root, name))

        # Remove empty parent directories
        for i in reversed(range(len(elements))):
            ospath = os.path.join(self.root, *elements[0:i+1])
            try:
                os.rmdir(ospath)
            except OSError:
                break

    # Cache open tables
    @nilmdb.utils.lru_cache(size = table_cache_size,
                            onremove = lambda x: x.close())
    def getnode(self, unicodepath):
        """Return a Table object corresponding to the given database
        path, which must exist."""
        path = self._encode_filename(unicodepath)
        elements = path.lstrip('/').split('/')
        ospath = os.path.join(self.root, *elements)
        return Table(ospath)
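
# A minimal usage sketch (illustrative, not part of the original module;
# assumes a layout named 'float32_8' is registered in nilmdb.layout):
#
#   db = BulkData("/tmp/nilmdb-example")
#   db.create(u'/newton/prep', 'float32_8')
#   table = db.getnode(u'/newton/prep')
#   ...                           # table.append() / table[...], see below
#   db.destroy(u'/newton/prep')   # irreversible; removes data and dirs
#   db.close()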

@nilmdb.utils.must_close(wrap_verify = True)
class Table(object):
    """Tools to help access a single table (data at a specific OS path)."""
    # See design.md for design details

    # Class methods, to help keep format details in this class.
    @classmethod
    def exists(cls, root):
        """Return True if a table appears to exist at this OS path"""
        return os.path.isfile(os.path.join(root, "_format"))

    @classmethod
    def create(cls, root, struct_fmt, file_size, files_per_dir):
        """Initialize a table at the given OS path.
        'struct_fmt' is a Struct module format description"""
        # Calculate rows per file so that each file is approximately
        # file_size bytes.
        packer = struct.Struct(struct_fmt)
        rows_per_file = max(file_size // packer.size, 1)

        format = { "rows_per_file": rows_per_file,
                   "files_per_dir": files_per_dir,
                   "struct_fmt": struct_fmt,
                   "version": 1 }
        with open(os.path.join(root, "_format"), "wb") as f:
            pickle.dump(format, f, 2)
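
    # Worked example (illustrative numbers, not from the original source):
    # for struct_fmt '<dffffffff' (an 8-column float32 layout), packer.size
    # is 8 + 8*4 = 40 bytes per row.  With the default file_size of
    # 128 MiB = 134217728 bytes, rows_per_file = 134217728 // 40 = 3355443,
    # so each full data file holds 134217720 bytes of packed rows.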

    # Normal methods
    def __init__(self, root):
        """'root' is the full OS path to the directory of this table"""
        self.root = root

        # Load the format and build packer
        with open(os.path.join(self.root, "_format"), "rb") as f:
            format = pickle.load(f)
        if format["version"] != 1: # pragma: no cover (just future proofing)
            raise NotImplementedError("version " + str(format["version"]) +
                                      " bulk data store not supported")
        self.rows_per_file = format["rows_per_file"]
        self.files_per_dir = format["files_per_dir"]
        self.packer = struct.Struct(format["struct_fmt"])
        self.file_size = self.packer.size * self.rows_per_file

        # Find nrows
        self.nrows = self._get_nrows()

    def close(self):
        self.mmap_open.cache_remove_all()

    # Internal helpers
    def _get_nrows(self):
        """Find nrows by locating the lexicographically last filename
        and using its size"""
        # Note that this just finds a 'nrows' that is guaranteed to be
        # greater than the row number of any piece of data that
        # currently exists, not necessarily all data that _ever_
        # existed.
        regex = re.compile("^[0-9a-f]{4,}$")

        # Find the last directory.  We sort and loop through all of them,
        # starting with the numerically greatest, because the dirs could be
        # empty if something was deleted.
        subdirs = sorted(filter(regex.search, os.listdir(self.root)),
                         key = lambda x: int(x, 16), reverse = True)

        for subdir in subdirs:
            # Now find the last file in that dir
            path = os.path.join(self.root, subdir)
            files = filter(regex.search, os.listdir(path))
            if not files: # pragma: no cover (shouldn't occur)
                # Empty dir: try the next one
                continue

            # Find the numerical max
            filename = max(files, key = lambda x: int(x, 16))
            offset = os.path.getsize(os.path.join(self.root, subdir, filename))

            # Convert to row number
            return self._row_from_offset(subdir, filename, offset)

        # No files, so no data
        return 0

    def _offset_from_row(self, row):
        """Return a (subdir, filename, offset, count) tuple:

        subdir: subdirectory for the file
        filename: the filename that contains the specified row
        offset: byte offset of the specified row within the file
        count: number of rows (starting at offset) that fit in the file
        """
        filenum = row // self.rows_per_file
        # It's OK if these format specifiers are too short; the filenames
        # will just get longer but will still sort correctly.
        dirname = sprintf("%04x", filenum // self.files_per_dir)
        filename = sprintf("%04x", filenum % self.files_per_dir)
        offset = (row % self.rows_per_file) * self.packer.size
        count = self.rows_per_file - (row % self.rows_per_file)
        return (dirname, filename, offset, count)
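
    # Worked example (illustrative numbers, not from the original source):
    # with rows_per_file = 1000, files_per_dir = 32768, and a 40-byte row,
    # row 123456 gives filenum = 123456 // 1000 = 123, so
    #   subdir   = "0000"            (123 // 32768)
    #   filename = "007b"            (123 % 32768, in hex)
    #   offset   = 456 * 40 = 18240  bytes into that file
    #   count    = 1000 - 456 = 544  rows left in this file
    # _row_from_offset() below inverts this mapping exactly.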

    def _row_from_offset(self, subdir, filename, offset):
        """Return the row number that corresponds to the given
        'subdir/filename' and byte-offset within that file."""
        if (offset % self.packer.size) != 0: # pragma: no cover; shouldn't occur
            raise ValueError("file offset is not a multiple of data size")
        filenum = int(subdir, 16) * self.files_per_dir + int(filename, 16)
        row = (filenum * self.rows_per_file) + (offset // self.packer.size)
        return row

    # Cache open files
    @nilmdb.utils.lru_cache(size = fd_cache_size,
                            keys = slice(0, 3),  # exclude newsize
                            onremove = lambda x: x.close())
    def mmap_open(self, subdir, filename, newsize = None):
        """Open and map a given 'subdir/filename' (relative to self.root).
        Will be automatically closed when evicted from the cache.

        If 'newsize' is provided, the file is truncated to the given
        size before the mapping is returned.  (Note that the LRU cache
        on this function means the truncate will only happen if the
        object isn't already cached; mmap.resize should be used too.)"""
        try:
            os.mkdir(os.path.join(self.root, subdir))
        except OSError:
            pass
        f = open(os.path.join(self.root, subdir, filename), "a+", 0)
        if newsize is not None:
            # mmap can't map a zero-length file, so this allows the
            # caller to set the filesize between file creation and
            # mmap.
            f.truncate(newsize)
        mm = mmap.mmap(f.fileno(), 0)
        return mm

    def mmap_open_resize(self, subdir, filename, newsize):
        """Open and map a given 'subdir/filename' (relative to self.root).
        The file is resized to the given size."""
        # Pass new size to mmap_open
        mm = self.mmap_open(subdir, filename, newsize)
        # In case we got a cached copy, need to call mm.resize too.
        mm.resize(newsize)
        return mm

    def append(self, data):
        """Append the data and flush it to disk.
        data is a nested Python list [[row],[row],[...]]"""
        remaining = len(data)
        dataiter = iter(data)
        while remaining:
            # See how many rows we can fit into the current file, and open it
            (subdir, fname, offset, count) = self._offset_from_row(self.nrows)
            if count > remaining:
                count = remaining

            newsize = offset + count * self.packer.size
            mm = self.mmap_open_resize(subdir, fname, newsize)
            mm.seek(offset)

            # Write the data
            for i in xrange(count):
                row = dataiter.next()
                mm.write(self.packer.pack(*row))

            remaining -= count
            self.nrows += count
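
    # Illustrative sketch (not from the original source): for a hypothetical
    # one-column float32 layout (struct_fmt '<df'), each row is
    # [timestamp, value]:
    #
    #   table.append([[1.0, 100.0],
    #                 [2.0, 101.5],
    #                 [3.0, 103.0]])
    #   table.nrows    # -> 3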

    def __getitem__(self, key):
        """Extract data and return it.  Supports simple indexing
        (table[n]) and range slices (table[n:m]).  Returns a nested
        Python list [[row],[row],[...]]"""

        # Handle simple slices
        if isinstance(key, slice):
            # Fall back to brute force if the slice isn't simple
            if ((key.step is not None and key.step != 1) or
                key.start is None or
                key.stop is None or
                key.start >= key.stop or
                key.start < 0 or
                key.stop > self.nrows):
                return [ self[x] for x in xrange(*key.indices(self.nrows)) ]

            ret = []
            row = key.start
            remaining = key.stop - key.start
            while remaining:
                (subdir, filename, offset, count) = self._offset_from_row(row)
                if count > remaining:
                    count = remaining
                mm = self.mmap_open(subdir, filename)
                for i in xrange(count):
                    ret.append(list(self.packer.unpack_from(mm, offset)))
                    offset += self.packer.size
                remaining -= count
                row += count
            return ret

        # Handle single points
        if key < 0 or key >= self.nrows:
            raise IndexError("Index out of range")
        (subdir, filename, offset, count) = self._offset_from_row(key)
        mm = self.mmap_open(subdir, filename)
        # unpack_from ignores the mmap object's current seek position
        return list(self.packer.unpack_from(mm, offset))
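
    # Continuing the sketch above (illustrative, not from the original
    # source):
    #
    #   table[0]      # -> [1.0, 100.0]
    #   table[0:2]    # -> [[1.0, 100.0], [2.0, 101.5]]
    #   table[::2]    # non-simple slice; falls back to one read per row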

    def _remove_rows(self, subdir, filename, start, stop):
        """Helper to mark specific rows as being removed from a
        file, and potentially removing or truncating the file itself."""
        # Import an existing list of deleted rows for this file
        datafile = os.path.join(self.root, subdir, filename)
        cachefile = datafile + ".removed"
        try:
            with open(cachefile, "rb") as f:
                ranges = pickle.load(f)
            cachefile_present = True
        except:
            ranges = []
            cachefile_present = False

        # Append our new range and sort
        ranges.append((start, stop))
        ranges.sort()

        # Merge adjacent ranges into "merged"
        merged = []
        prev = None
        for new in ranges:
            if prev is None:
                # No previous range, so remember this one
                prev = new
            elif prev[1] == new[0]:
                # Previous range connected to this new one; extend prev
                prev = (prev[0], new[1])
            else:
                # Not connected; append previous and start again
                merged.append(prev)
                prev = new
        if prev is not None:
            merged.append(prev)

        # If the range covered the whole file, we can delete it now.
        # Note that the last file in a table may be only partially
        # full (smaller than self.rows_per_file).  We purposely leave
        # those files around rather than deleting them, because the
        # remainder will be filled on a subsequent append(), and things
        # are generally easier if we don't have to special-case that.
        if (len(merged) == 1 and
            merged[0][0] == 0 and merged[0][1] == self.rows_per_file):
            # Close potentially open file in mmap_open LRU cache
            self.mmap_open.cache_remove(self, subdir, filename)

            # Delete files
            os.remove(datafile)
            if cachefile_present:
                os.remove(cachefile)

            # Try deleting subdir, too
            try:
                os.rmdir(os.path.join(self.root, subdir))
            except:
                pass
        else:
            # Update cache.  Try to do it atomically.
            nilmdb.utils.atomic.replace_file(cachefile,
                                             pickle.dumps(merged, 2))
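
    # Worked example of the merge above (illustrative numbers): if the
    # cachefile held [(0, 10), (30, 40)] and rows (10, 25) are removed,
    # the sorted list [(0, 10), (10, 25), (30, 40)] merges to
    # [(0, 25), (30, 40)].  Only when a single range spans the whole
    # file, i.e. [(0, rows_per_file)], is the data file itself deleted.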

    def remove(self, start, stop):
        """Remove specified rows [start, stop) from this table.

        If a file is left empty, it is fully removed.  Otherwise, a
        parallel data file is used to remember which rows have been
        removed, and the file is otherwise untouched."""
        if start < 0 or start > stop or stop > self.nrows:
            raise IndexError("Index out of range")

        row = start
        remaining = stop - start
        while remaining:
            # Loop through each file that we need to touch
            (subdir, filename, offset, count) = self._offset_from_row(row)
            if count > remaining:
                count = remaining
            row_offset = offset // self.packer.size

            # Mark the rows as being removed
            self._remove_rows(subdir, filename, row_offset, row_offset + count)

            remaining -= count
            row += count

class TimestampOnlyTable(object):
    """Helper that lets us pass a Table object into bisect, by
    returning only the timestamp when a particular row is requested."""
    def __init__(self, table):
        self.table = table
    def __getitem__(self, index):
        return self.table[index][0]
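
# Illustrative sketch (not from the original source): each row starts with
# a little-endian double timestamp, so, assuming rows were appended in
# increasing timestamp order (which bisect requires), a row index can be
# located by timestamp like this:
#
#   import bisect
#   wrapped = TimestampOnlyTable(table)
#   i = bisect.bisect_left(wrapped, 1234567890.0, 0, table.nrows)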