# Fixed record size bulk data storage

# Need absolute_import so that "import nilmdb" won't pull in
# nilmdb.py, but will pull the parent nilmdb module instead.
from __future__ import absolute_import
from __future__ import division
from nilmdb.utils.printf import *
from nilmdb.utils.time import timestamp_to_string as timestamp_to_string
import nilmdb.utils

import os
import cPickle as pickle
import re
import sys
import tempfile

from . import rocket

# Up to 256 open file descriptors at any given time.
# These variables are global so they can be used in the decorator arguments.
table_cache_size = 16
fd_cache_size = 16
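# (16 cached Table objects x 16 cached file descriptors each = 256 fds.)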

@nilmdb.utils.must_close(wrap_verify = False)
class BulkData(object):
    def __init__(self, basepath, **kwargs):
        self.basepath = basepath
        self.root = os.path.join(self.basepath, "data")

        # Tuneables
        if "file_size" in kwargs:
            self.file_size = kwargs["file_size"]
        else:
            # Default to approximately 128 MiB per file
            self.file_size = 128 * 1024 * 1024

        if "files_per_dir" in kwargs:
            self.files_per_dir = kwargs["files_per_dir"]
        else:
            # 32768 files per dir should work even on FAT32
            self.files_per_dir = 32768

        # Make root path
        if not os.path.isdir(self.root):
            os.mkdir(self.root)

    def close(self):
        self.getnode.cache_remove_all()

    def _encode_filename(self, path):
        # Encode all paths to UTF-8, regardless of sys.getfilesystemencoding(),
        # because we want to be able to represent all code points and the user
        # will never be directly exposed to filenames.  We can then do path
        # manipulations on the UTF-8 directly.
        if isinstance(path, unicode):
            return path.encode('utf-8')
        return path
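    # For example, _encode_filename(u'/newton/prep') returns the byte
    # string '/newton/prep'; non-ASCII code points simply become their
    # UTF-8 byte sequences.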

    def _create_check_ospath(self, ospath):
        if ospath[-1] == '/':
            raise ValueError("invalid path; should not end with a /")
        if Table.exists(ospath):
            raise ValueError("stream already exists at this path")
        if os.path.isdir(ospath):
            raise ValueError("subdirs of this path already exist")

    def _create_parents(self, unicodepath):
        """Verify the path name, and create parent directories if they
        don't exist.  Returns a list of elements that got created."""
        path = self._encode_filename(unicodepath)

        if path[0] != '/':
            raise ValueError("paths must start with /")
        [ group, node ] = path.rsplit("/", 1)
        if group == '':
            raise ValueError("invalid path; path must contain at least one "
                             "folder")
        if node == '':
            raise ValueError("invalid path; should not end with a /")
        if not Table.valid_path(path):
            raise ValueError("path name is invalid or contains reserved words")

        # Create the table's base dir.  Note that we make a
        # distinction here between NilmDB paths (always Unix style,
        # split apart manually) and OS paths (built up with
        # os.path.join)

        # Make directories leading up to this one
        elements = path.lstrip('/').split('/')
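        # e.g. path '/newton/prep' yields elements ['newton', 'prep']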
        made_dirs = []
        try:
            # Make parent elements
            for i in range(len(elements)):
                ospath = os.path.join(self.root, *elements[0:i])
                if Table.exists(ospath):
                    raise ValueError("path is subdir of existing node")
                if not os.path.isdir(ospath):
                    os.mkdir(ospath)
                    made_dirs.append(ospath)
        except Exception as e:
            # Try to remove paths that we created; ignore errors
            exc_info = sys.exc_info()
            for ospath in reversed(made_dirs): # pragma: no cover (hard to hit)
                try:
                    os.rmdir(ospath)
                except OSError:
                    pass
            raise exc_info[1], None, exc_info[2]

        return elements

    def create(self, unicodepath, layout_name):
        """
        unicodepath: path to the data (e.g. u'/newton/prep').
        Paths must contain at least two elements, e.g.:
          /newton/prep
          /newton/raw
          /newton/upstairs/prep
          /newton/upstairs/raw

        layout_name: string for nilmdb.layout.get_named(), e.g. 'float32_8'
        """
        elements = self._create_parents(unicodepath)

        # Make the final dir
        ospath = os.path.join(self.root, *elements)
        self._create_check_ospath(ospath)
        os.mkdir(ospath)

        try:
            # Write format string to file
            Table.create(ospath, layout_name, self.file_size,
                         self.files_per_dir)

            # Open and cache it
            self.getnode(unicodepath)
        except:
            exc_info = sys.exc_info()
            try:
                os.rmdir(ospath)
            except OSError:
                pass
            raise exc_info[1], None, exc_info[2]

        # Success
        return

    def _remove_leaves(self, unicodepath):
        """Remove empty directories starting at the leaves of unicodepath"""
        path = self._encode_filename(unicodepath)
        elements = path.lstrip('/').split('/')
        for i in reversed(range(len(elements))):
            ospath = os.path.join(self.root, *elements[0:i+1])
            try:
                os.rmdir(ospath)
            except OSError:
                pass

    def rename(self, oldunicodepath, newunicodepath):
        """Move entire tree from 'oldunicodepath' to
        'newunicodepath'"""
        oldpath = self._encode_filename(oldunicodepath)
        newpath = self._encode_filename(newunicodepath)

        # Get OS paths
        oldelements = oldpath.lstrip('/').split('/')
        oldospath = os.path.join(self.root, *oldelements)
        newelements = newpath.lstrip('/').split('/')
        newospath = os.path.join(self.root, *newelements)

        # Basic checks
        if oldospath == newospath:
            raise ValueError("old and new paths are the same")
        self._create_check_ospath(newospath)

        # Move the table to a temporary location
        tmpdir = tempfile.mkdtemp(prefix = "rename-", dir = self.root)
        tmppath = os.path.join(tmpdir, "table")
        os.rename(oldospath, tmppath)

        try:
            # Create parent dirs for new location
            self._create_parents(newunicodepath)

            # Move table into new location
            os.rename(tmppath, newospath)
        except Exception:
            # On failure, move the table back to original path
            os.rename(tmppath, oldospath)
            os.rmdir(tmpdir)
            raise

        # Prune old dirs
        self._remove_leaves(oldunicodepath)
        os.rmdir(tmpdir)

    def destroy(self, unicodepath):
        """Fully remove all data at a particular path.  No way to undo
        it!  The group/path structure is removed, too."""
        path = self._encode_filename(unicodepath)

        # Get OS path
        elements = path.lstrip('/').split('/')
        ospath = os.path.join(self.root, *elements)

        # Remove Table object from cache
        self.getnode.cache_remove(self, unicodepath)

        # Remove the contents of the target directory
        if not Table.exists(ospath):
            raise ValueError("nothing at that path")
        for (root, dirs, files) in os.walk(ospath, topdown = False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                os.rmdir(os.path.join(root, name))

        # Remove leftover empty directories
        self._remove_leaves(unicodepath)

    # Cache open tables
    @nilmdb.utils.lru_cache(size = table_cache_size,
                            onremove = lambda x: x.close())
    def getnode(self, unicodepath):
        """Return a Table object corresponding to the given database
        path, which must exist."""
        path = self._encode_filename(unicodepath)
        elements = path.lstrip('/').split('/')
        ospath = os.path.join(self.root, *elements)
        return Table(ospath)
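
# Illustrative lifecycle sketch (comment only; the basepath here is
# hypothetical, while the path and layout come from create()'s docstring):
#
#     data = BulkData("/path/to/db")
#     data.create(u'/newton/prep', 'float32_8')
#     node = data.getnode(u'/newton/prep')   # cached Table instance
#     data.close()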

@nilmdb.utils.must_close(wrap_verify = False)
class Table(object):
    """Tools to help access a single table (data at a specific OS path)."""
    # See design.md for design details

    # Class methods, to help keep format details in this class.
    @classmethod
    def valid_path(cls, root):
        """Return True if a root path is a valid name"""
        return "_format" not in root.split("/")

    @classmethod
    def exists(cls, root):
        """Return True if a table appears to exist at this OS path"""
        return os.path.isfile(os.path.join(root, "_format"))

    @classmethod
    def create(cls, root, layout, file_size, files_per_dir):
        """Initialize a table at the given OS path with the
        given layout string"""

        # Calculate rows per file so that each file is approximately
        # file_size bytes.
        rkt = rocket.Rocket(layout, None)
        rows_per_file = max(file_size // rkt.binary_size, 1)
        rkt.close()

        fmt = { "rows_per_file": rows_per_file,
                "files_per_dir": files_per_dir,
                "layout": layout,
                "version": 3 }
        with open(os.path.join(root, "_format"), "wb") as f:
            pickle.dump(fmt, f, 2)
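    # Rough sizing sketch: assuming (hypothetically) a 40-byte binary row
    # for 'float32_8' (an int64 timestamp plus eight float32 values), the
    # default 128 MiB file_size gives 134217728 // 40 = 3355443 rows per file.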

    # Normal methods
    def __init__(self, root):
        """'root' is the full OS path to the directory of this table"""
        self.root = root

        # Load the format
        with open(os.path.join(self.root, "_format"), "rb") as f:
            fmt = pickle.load(f)
        if fmt["version"] != 3: # pragma: no cover
            # Old versions used floating point timestamps, which aren't
            # valid anymore.
            raise NotImplementedError("old version " + str(fmt["version"]) +
                                      " bulk data store is not supported")
        self.rows_per_file = fmt["rows_per_file"]
        self.files_per_dir = fmt["files_per_dir"]
        self.layout = fmt["layout"]

        # Use rocket to get row size and file size
        rkt = rocket.Rocket(self.layout, None)
        self.row_size = rkt.binary_size
        self.file_size = rkt.binary_size * self.rows_per_file
        rkt.close()

        # Find nrows
        self.nrows = self._get_nrows()

    def close(self):
        self.file_open.cache_remove_all()

    # Internal helpers
    def _get_nrows(self):
        """Find nrows by locating the lexicographically last filename
        and using its size"""
        # Note that this just finds a 'nrows' that is guaranteed to be
        # greater than the row number of any piece of data that
        # currently exists, not necessarily all data that _ever_
        # existed.
        regex = re.compile("^[0-9a-f]{4,}$")

        # Find the last directory.  We sort and loop through all of them,
        # starting with the numerically greatest, because the dirs could be
        # empty if something was deleted.
        subdirs = sorted(filter(regex.search, os.listdir(self.root)),
                         key = lambda x: int(x, 16), reverse = True)

        for subdir in subdirs:
            # Now find the last file in that dir
            path = os.path.join(self.root, subdir)
            files = filter(regex.search, os.listdir(path))
            if not files: # pragma: no cover (shouldn't occur)
                # Empty dir: try the next one
                continue

            # Find the numerical max
            filename = max(files, key = lambda x: int(x, 16))
            offset = os.path.getsize(os.path.join(self.root, subdir, filename))

            # Convert to row number
            return self._row_from_offset(subdir, filename, offset)

        # No files, so no data
        return 0

    def _offset_from_row(self, row):
        """Return a (subdir, filename, offset, count) tuple:

        subdir: subdirectory for the file
        filename: the filename that contains the specified row
        offset: byte offset of the specified row within the file
        count: number of rows (starting at offset) that fit in the file
        """
        filenum = row // self.rows_per_file
        # It's OK if these format specifiers are too short; the filenames
        # will just get longer but will still sort correctly.
        dirname = sprintf("%04x", filenum // self.files_per_dir)
        filename = sprintf("%04x", filenum % self.files_per_dir)
        offset = (row % self.rows_per_file) * self.row_size
        count = self.rows_per_file - (row % self.rows_per_file)
        return (dirname, filename, offset, count)
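    # Worked example: with rows_per_file = 4 and files_per_dir = 2, row 11
    # falls in filenum 11 // 4 = 2, giving subdir "0001", filename "0000",
    # offset = 3 * row_size, and count = 1 remaining row in that file.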

    def _row_from_offset(self, subdir, filename, offset):
        """Return the row number that corresponds to the given
        'subdir/filename' and byte-offset within that file."""
        if (offset % self.row_size) != 0: # pragma: no cover
            # this shouldn't occur, unless there is some corruption somewhere
            raise ValueError("file offset is not a multiple of data size")
        filenum = int(subdir, 16) * self.files_per_dir + int(filename, 16)
        row = (filenum * self.rows_per_file) + (offset // self.row_size)
        return row
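    # This is the inverse of _offset_from_row: continuing the example above,
    # ("0001", "0000", 3 * row_size) maps back to row 2 * 4 + 3 = 11.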

    def _remove_or_truncate_file(self, subdir, filename, offset = 0):
        """Remove the given file, and remove the subdirectory too
        if it's empty.  If offset is nonzero, truncate the file
        to that size instead."""
        # Close potentially open file in file_open LRU cache
        self.file_open.cache_remove(self, subdir, filename)
        if offset:
            # Truncate it
            with open(os.path.join(self.root, subdir, filename), "r+b") as f:
                f.truncate(offset)
        else:
            # Remove file
            os.remove(os.path.join(self.root, subdir, filename))
            # Try deleting subdir, too
            try:
                os.rmdir(os.path.join(self.root, subdir))
            except:
                pass

    # Cache open files
    @nilmdb.utils.lru_cache(size = fd_cache_size,
                            onremove = lambda f: f.close())
    def file_open(self, subdir, filename):
        """Open and map a given 'subdir/filename' (relative to self.root).
        Will be automatically closed when evicted from the cache."""
        # Create path if it doesn't exist
        try:
            os.mkdir(os.path.join(self.root, subdir))
        except OSError:
            pass
        # Return a rocket.Rocket object, which contains the open file
        return rocket.Rocket(self.layout,
                             os.path.join(self.root, subdir, filename))

    def append_string(self, data, start, end):
        """Parse the formatted string in 'data', according to the
        current layout, and append it to the table.  If any timestamps
        are non-monotonic, or don't fall between 'start' and 'end',
        a ValueError is raised.

        If this function succeeds, it returns normally.  Otherwise,
        the table is reverted back to its original state by truncating
        or deleting files as necessary."""
        data_offset = 0
        last_timestamp = nilmdb.utils.time.min_timestamp
        tot_rows = self.nrows
        count = 0
        linenum = 0
        try:
            while data_offset < len(data):
                # See how many rows we can fit into the current file,
                # and open it
                (subdir, fname, offset, count) = self._offset_from_row(tot_rows)
                f = self.file_open(subdir, fname)

                # Ask the rocket object to parse and append up to "count"
                # rows of data, verifying things along the way.
                try:
                    (added_rows, data_offset, last_timestamp, linenum
                     ) = f.append_string(count, data, data_offset, linenum,
                                         start, end, last_timestamp)
                except rocket.ParseError as e:
                    (linenum, colnum, errtype, obj) = e.args
                    where = "line %d, column %d: " % (linenum, colnum)
                    # Extract out the error line, add column marker
                    try:
                        bad = data.splitlines()[linenum-1]
                        badptr = ' ' * (colnum - 1) + '^'
                    except IndexError: # pragma: no cover
                        bad = ""
                        badptr = ""
                    if errtype == rocket.ERR_NON_MONOTONIC:
                        err = "timestamp is not monotonically increasing"
                    elif errtype == rocket.ERR_OUT_OF_INTERVAL:
                        if obj < start:
                            err = sprintf("Data timestamp %s < start time %s",
                                          timestamp_to_string(obj),
                                          timestamp_to_string(start))
                        else:
                            err = sprintf("Data timestamp %s >= end time %s",
                                          timestamp_to_string(obj),
                                          timestamp_to_string(end))
                    else:
                        err = str(obj)
                    raise ValueError("error parsing input data: " +
                                     where + err + "\n" + bad + "\n" + badptr)
                tot_rows += added_rows
        except Exception:
            # Some failure, so try to roll things back by truncating or
            # deleting files that we may have appended data to.
            cleanpos = self.nrows
            while cleanpos <= tot_rows:
                (subdir, fname, offset, count) = self._offset_from_row(cleanpos)
                self._remove_or_truncate_file(subdir, fname, offset)
                cleanpos += count
            # Re-raise original exception
            raise
        else:
            # Success, so update self.nrows accordingly
            self.nrows = tot_rows

    def get_data(self, start, stop):
        """Extract data corresponding to the Python range [start:stop),
        and return it as a formatted string"""
        if (start is None or
            stop is None or
            start > stop or
            start < 0 or
            stop > self.nrows):
            raise IndexError("Index out of range")

        ret = []
        row = start
        remaining = stop - start
        while remaining > 0:
            (subdir, filename, offset, count) = self._offset_from_row(row)
            if count > remaining:
                count = remaining
            f = self.file_open(subdir, filename)
            ret.append(f.extract_string(offset, count))
            remaining -= count
            row += count
        return "".join(ret)

    def get_timestamp(self, row):
        """Return the timestamp corresponding to the given row"""
        if (row is None or row < 0 or row >= self.nrows):
            raise IndexError("Index out of range")
        return int(self[row].split(' ')[0])

    def __getitem__(self, key):
        """Extract data and return it.  Supports simple indexing
        (table[n]) and range slices (table[n:m]).  For simple
        indexing, return 'row\n'.  For ranges, return a list of rows
        ['row\n','row\n'].

        This is inefficient; use .get_data instead."""
        if isinstance(key, slice):
            return [ self[x] for x in xrange(*key.indices(self.nrows)) ]
        return self.get_data(key, key+1)
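    # Illustrative only: for a hypothetical one-column float32 table,
    # table[0] might return a string like '1332496800000000 2.5\n', and
    # table[0:2] a list of two such strings (the exact formatting is
    # determined by the rocket layout).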

    def _remove_rows(self, subdir, filename, start, stop):
        """Helper to mark specific rows as being removed from a
        file, and potentially remove or truncate the file itself."""

        # Close potentially open file in file_open LRU cache
        self.file_open.cache_remove(self, subdir, filename)

        # We keep a file like 0000.removed that contains a list of
        # which rows have been "removed".  Note that we never have to
        # remove entries from this list, because we never decrease
        # self.nrows, and so we will never overwrite those locations in the
        # file.  Only when the list covers the entire extent of the
        # file will that file be removed.
        datafile = os.path.join(self.root, subdir, filename)
        cachefile = datafile + ".removed"
        try:
            with open(cachefile, "rb") as f:
                ranges = pickle.load(f)
            cachefile_present = True
        except:
            ranges = []
            cachefile_present = False

        # Append our new range and sort
        ranges.append((start, stop))
        ranges.sort()
        # Merge adjacent ranges into "merged"
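        # e.g. [(0, 5), (5, 8), (10, 12)] merges to [(0, 8), (10, 12)]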
        merged = []
        prev = None
        for new in ranges:
            if prev is None:
                # No previous range, so remember this one
                prev = new
            elif prev[1] == new[0]:
                # Previous range connected to this new one; extend prev
                prev = (prev[0], new[1])
            else:
                # Not connected; append previous and start again
                merged.append(prev)
                prev = new
        if prev is not None:
            merged.append(prev)

        # If the range covered the whole file, we can delete it now.
        # Note that the last file in a table may be only partially
        # full (smaller than self.rows_per_file).  We purposely leave
        # those files around rather than deleting them, because the
        # remainder will be filled on a subsequent append(), and things
        # are generally easier if we don't have to special-case that.
        if (len(merged) == 1 and
            merged[0][0] == 0 and merged[0][1] == self.rows_per_file):
            # Delete files
            if cachefile_present:
                os.remove(cachefile)
            self._remove_or_truncate_file(subdir, filename, 0)
        else:
            # File needs to stick around.  This means we can get
            # degenerate cases where we have large files containing as
            # little as one row.  Try to punch a hole in the file,
            # so that this region doesn't take up filesystem space.
            offset = start * self.row_size
            count = (stop - start) * self.row_size
            nilmdb.utils.fallocate.punch_hole(datafile, offset, count)

            # Update cache.  Try to do it atomically.
            nilmdb.utils.atomic.replace_file(cachefile,
                                             pickle.dumps(merged, 2))

    def remove(self, start, stop):
        """Remove specified rows [start, stop) from this table.

        If a file is left empty, it is fully removed.  Otherwise, a
        parallel data file is used to remember which rows have been
        removed, and the file is otherwise untouched."""
        if start < 0 or start > stop or stop > self.nrows:
            raise IndexError("Index out of range")

        row = start
        remaining = stop - start
        while remaining:
            # Loop through each file that we need to touch
            (subdir, filename, offset, count) = self._offset_from_row(row)
            if count > remaining:
                count = remaining
            row_offset = offset // self.row_size

            # Mark the rows as being removed
            self._remove_rows(subdir, filename, row_offset, row_offset + count)

            remaining -= count
            row += count

class TimestampOnlyTable(object):
    """Helper that lets us pass a Table object into bisect, by
    returning only the timestamp when a particular row is requested."""
    def __init__(self, table):
        self.table = table
    def __getitem__(self, index):
        return self.table.get_timestamp(index)
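
# Hedged usage sketch: bisect only needs __getitem__ plus explicit bounds,
# so finding the first row whose timestamp is >= some integer timestamp
# 'ts' (given an already-open Table 'table') might look like:
#
#     import bisect
#     row = bisect.bisect_left(TimestampOnlyTable(table), ts, 0, table.nrows)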