# Fixed record size bulk data storage

# Need absolute_import so that "import nilmdb" won't pull in
# nilmdb.py, but will pull the parent nilmdb module instead.
from __future__ import absolute_import
from __future__ import division
from nilmdb.utils.printf import *
from nilmdb.utils.time import timestamp_to_string as timestamp_to_string
import nilmdb.utils

import os
import cPickle as pickle
import re
import sys
import tempfile

import nilmdb.utils.lock
from . import rocket

# Up to 256 open file descriptors at any given time.
# These variables are global so they can be used in the decorator arguments.
table_cache_size = 32
fd_cache_size = 8
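
# For illustration only: the "256 open file descriptors" bound above follows
# from the two cache sizes, since each of up to table_cache_size cached Table
# objects keeps up to fd_cache_size files open (32 * 8 = 256).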

@nilmdb.utils.must_close(wrap_verify = False)
class BulkData(object):
    def __init__(self, basepath, **kwargs):
        self.basepath = basepath
        self.root = os.path.join(self.basepath, "data")
        self.lock = self.root + ".lock"
        self.lockfile = None

        # Tuneables
        if "file_size" in kwargs:
            self.file_size = kwargs["file_size"]
        else:
            # Default to approximately 128 MiB per file
            self.file_size = 128 * 1024 * 1024

        if "files_per_dir" in kwargs:
            self.files_per_dir = kwargs["files_per_dir"]
        else:
            # 32768 files per dir should work even on FAT32
            self.files_per_dir = 32768

        if "initial_nrows" in kwargs:
            self.initial_nrows = kwargs["initial_nrows"]
        else:
            # First row is 0
            self.initial_nrows = 0

        # Make root path
        if not os.path.isdir(self.root):
            os.mkdir(self.root)

        # Create the lock
        self.lockfile = open(self.lock, "w")
        if not nilmdb.utils.lock.exclusive_lock(self.lockfile):
            raise IOError('database at "' + self.basepath +
                          '" is already locked by another process')

    def close(self):
        self.getnode.cache_remove_all()
        if self.lockfile:
            nilmdb.utils.lock.exclusive_unlock(self.lockfile)
            self.lockfile.close()
            try:
                os.unlink(self.lock)
            except OSError: # pragma: no cover
                pass
            self.lockfile = None

    def _encode_filename(self, path):
        # Encode all paths to UTF-8, regardless of sys.getfilesystemencoding(),
        # because we want to be able to represent all code points and the user
        # will never be directly exposed to filenames.  We can then do path
        # manipulations on the UTF-8 directly.
        if isinstance(path, unicode):
            return path.encode('utf-8')
        return path

    def _create_check_ospath(self, ospath):
        if ospath[-1] == '/':
            raise ValueError("invalid path; should not end with a /")
        if Table.exists(ospath):
            raise ValueError("stream already exists at this path")
        if os.path.isdir(ospath):
            # Look for any files in subdirectories.  Fully empty
            # subdirectories are OK; they might be there during a rename
            for (root, dirs, files) in os.walk(ospath):
                if len(files):
                    raise ValueError(
                        "non-empty subdirs of this path already exist")

    def _create_parents(self, unicodepath):
        """Verify the path name, and create parent directories if they
        don't exist.  Returns a list of elements that got created."""
        path = self._encode_filename(unicodepath)

        if path[0] != '/':
            raise ValueError("paths must start with /")
        [ group, node ] = path.rsplit("/", 1)
        if group == '':
            raise ValueError("invalid path; path must contain at least one "
                             "folder")
        if node == '':
            raise ValueError("invalid path; should not end with a /")
        if not Table.valid_path(path):
            raise ValueError("path name is invalid or contains reserved words")

        # Create the table's base dir.  Note that we make a
        # distinction here between NilmDB paths (always Unix style,
        # split apart manually) and OS paths (built up with
        # os.path.join)

        # Make directories leading up to this one
        elements = path.lstrip('/').split('/')
        made_dirs = []
        try:
            # Make parent elements
            for i in range(len(elements)):
                ospath = os.path.join(self.root, *elements[0:i])
                if Table.exists(ospath):
                    raise ValueError("path is subdir of existing node")
                if not os.path.isdir(ospath):
                    os.mkdir(ospath)
                    made_dirs.append(ospath)
        except Exception as e:
            # Try to remove paths that we created; ignore errors
            exc_info = sys.exc_info()
            for ospath in reversed(made_dirs): # pragma: no cover (hard to hit)
                try:
                    os.rmdir(ospath)
                except OSError:
                    pass
            raise exc_info[1], None, exc_info[2]

        return elements

    def create(self, unicodepath, layout_name):
        """
        unicodepath: path to the data (e.g. u'/newton/prep').
        Paths must contain at least two elements, e.g.:
            /newton/prep
            /newton/raw
            /newton/upstairs/prep
            /newton/upstairs/raw

        layout_name: string for nilmdb.layout.get_named(), e.g. 'float32_8'
        """
        elements = self._create_parents(unicodepath)

        # Make the final dir
        ospath = os.path.join(self.root, *elements)
        self._create_check_ospath(ospath)
        os.mkdir(ospath)

        try:
            # Write format string to file
            Table.create(ospath, layout_name, self.file_size,
                         self.files_per_dir)

            # Open and cache it
            self.getnode(unicodepath)
        except Exception:
            exc_info = sys.exc_info()
            try:
                os.rmdir(ospath)
            except OSError:
                pass
            raise exc_info[1], None, exc_info[2]

        # Success
        return

    def _remove_leaves(self, unicodepath):
        """Remove empty directories starting at the leaves of unicodepath"""
        path = self._encode_filename(unicodepath)
        elements = path.lstrip('/').split('/')
        for i in reversed(range(len(elements))):
            ospath = os.path.join(self.root, *elements[0:i+1])
            try:
                os.rmdir(ospath)
            except OSError:
                pass

    def rename(self, oldunicodepath, newunicodepath):
        """Move entire tree from 'oldunicodepath' to
        'newunicodepath'"""
        oldpath = self._encode_filename(oldunicodepath)
        newpath = self._encode_filename(newunicodepath)

        # Get OS paths
        oldelements = oldpath.lstrip('/').split('/')
        oldospath = os.path.join(self.root, *oldelements)
        newelements = newpath.lstrip('/').split('/')
        newospath = os.path.join(self.root, *newelements)

        # Basic checks
        if oldospath == newospath:
            raise ValueError("old and new paths are the same")

        # Remove Table object at old path from cache
        self.getnode.cache_remove(self, oldunicodepath)

        # Move the table to a temporary location
        tmpdir = tempfile.mkdtemp(prefix = "rename-", dir = self.root)
        tmppath = os.path.join(tmpdir, "table")
        os.rename(oldospath, tmppath)

        try:
            # Check destination path
            self._create_check_ospath(newospath)

            # Create parent dirs for new location
            self._create_parents(newunicodepath)

            # Move table into new location
            os.rename(tmppath, newospath)
        except Exception:
            # On failure, move the table back to original path
            os.rename(tmppath, oldospath)
            os.rmdir(tmpdir)
            raise

        # Prune old dirs
        self._remove_leaves(oldunicodepath)
        os.rmdir(tmpdir)

    def destroy(self, unicodepath):
        """Fully remove all data at a particular path.  No way to undo
        it!  The group/path structure is removed, too."""
        path = self._encode_filename(unicodepath)

        # Get OS path
        elements = path.lstrip('/').split('/')
        ospath = os.path.join(self.root, *elements)

        # Remove Table object from cache
        self.getnode.cache_remove(self, unicodepath)

        # Remove the contents of the target directory
        if not Table.exists(ospath):
            raise ValueError("nothing at that path")
        for (root, dirs, files) in os.walk(ospath, topdown = False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                os.rmdir(os.path.join(root, name))

        # Remove leftover empty directories
        self._remove_leaves(unicodepath)

    # Cache open tables
    @nilmdb.utils.lru_cache(size = table_cache_size,
                            onremove = lambda x: x.close())
    def getnode(self, unicodepath):
        """Return a Table object corresponding to the given database
        path, which must exist."""
        path = self._encode_filename(unicodepath)
        elements = path.lstrip('/').split('/')
        ospath = os.path.join(self.root, *elements)
        return Table(ospath, self.initial_nrows)
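
# Illustrative usage sketch (comment only, not executed; the paths and layout
# below are made-up examples):
#
#   db = BulkData("/tmp/nilmdb-example")      # takes an exclusive lock on
#                                             # <basepath>/data.lock
#   db.create(u"/newton/prep", "float32_8")   # writes .../newton/prep/_format
#   table = db.getnode(u"/newton/prep")       # cached Table object
#   db.close()                                # drops caches and the lock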

@nilmdb.utils.must_close(wrap_verify = False)
class Table(object):
    """Tools to help access a single table (data at a specific OS path)."""
    # See design.md for design details

    # Class methods, to help keep format details in this class.
    @classmethod
    def valid_path(cls, root):
        """Return True if a root path is a valid name"""
        return "_format" not in root.split("/")

    @classmethod
    def exists(cls, root):
        """Return True if a table appears to exist at this OS path"""
        return os.path.isfile(os.path.join(root, "_format"))

    @classmethod
    def create(cls, root, layout, file_size, files_per_dir):
        """Initialize a table at the given OS path with the
        given layout string"""
        # Calculate rows per file so that each file is approximately
        # file_size bytes.
        rkt = rocket.Rocket(layout, None)
        rows_per_file = max(file_size // rkt.binary_size, 1)
        rkt.close()

        fmt = { "rows_per_file": rows_per_file,
                "files_per_dir": files_per_dir,
                "layout": layout,
                "version": 3 }
        with open(os.path.join(root, "_format"), "wb") as f:
            pickle.dump(fmt, f, 2)
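
    # Worked example (illustrative numbers only): if a row of this layout
    # occupied 40 bytes (rkt.binary_size == 40) and file_size were the
    # 128 MiB default, then rows_per_file = (128 * 1024 * 1024) // 40
    # = 3355443, so each full data file holds roughly 128 MiB.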

    # Normal methods
    def __init__(self, root, initial_nrows):
        """'root' is the full OS path to the directory of this table"""
        self.root = root
        self.initial_nrows = initial_nrows

        # Load the format
        with open(os.path.join(self.root, "_format"), "rb") as f:
            fmt = pickle.load(f)

        if fmt["version"] != 3: # pragma: no cover
            # Old versions used floating point timestamps, which aren't
            # valid anymore.
            raise NotImplementedError("old version " + str(fmt["version"]) +
                                      " bulk data store is not supported")

        self.rows_per_file = fmt["rows_per_file"]
        self.files_per_dir = fmt["files_per_dir"]
        self.layout = fmt["layout"]

        # Use rocket to get row size and file size
        rkt = rocket.Rocket(self.layout, None)
        self.row_size = rkt.binary_size
        self.file_size = rkt.binary_size * self.rows_per_file
        rkt.close()

        # Find nrows
        self.nrows = self._get_nrows()

    def close(self):
        self.file_open.cache_remove_all()

    # Internal helpers
    def _get_nrows(self):
        """Find nrows by locating the numerically greatest filename
        and using its size"""
        # Note that this just finds a 'nrows' that is guaranteed to be
        # greater than the row number of any piece of data that
        # currently exists, not necessarily all data that _ever_
        # existed.
        regex = re.compile("^[0-9a-f]{4,}$")

        # Find the last directory.  We sort and loop through all of them,
        # starting with the numerically greatest, because the dirs could be
        # empty if something was deleted but the directory was unexpectedly
        # not deleted.
        subdirs = sorted(filter(regex.search, os.listdir(self.root)),
                         key = lambda x: int(x, 16), reverse = True)

        for subdir in subdirs:
            # Now find the last file in that dir
            path = os.path.join(self.root, subdir)
            files = filter(regex.search, os.listdir(path))
            if not files: # pragma: no cover (shouldn't occur)
                # Empty dir: try the next one
                continue

            # Find the numerical max
            filename = max(files, key = lambda x: int(x, 16))
            offset = os.path.getsize(os.path.join(self.root, subdir,
                                                  filename))

            # Convert to row number
            return self._row_from_offset(subdir, filename, offset)

        # No files, so no data.  We typically start at row 0 in this
        # case, although initial_nrows is specified during some tests
        # to exercise other parts of the code better.  Since we have
        # no files yet, round initial_nrows up so it points to a row
        # that would begin a new file.
        nrows = ((self.initial_nrows + (self.rows_per_file - 1)) //
                 self.rows_per_file) * self.rows_per_file
        return nrows
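
    # Worked example (illustrative numbers only): with rows_per_file = 1000
    # and initial_nrows = 2500, the rounding above gives
    # ((2500 + 999) // 1000) * 1000 = 3000, i.e. the first row of a new file.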

    def _offset_from_row(self, row):
        """Return a (subdir, filename, offset, count) tuple:

        subdir: subdirectory for the file
        filename: the filename that contains the specified row
        offset: byte offset of the specified row within the file
        count: number of rows (starting at offset) that fit in the file
        """
        filenum = row // self.rows_per_file
        # It's OK if these format specifiers are too short; the filenames
        # will just get longer but will still sort correctly.
        dirname = sprintf("%04x", filenum // self.files_per_dir)
        filename = sprintf("%04x", filenum % self.files_per_dir)
        offset = (row % self.rows_per_file) * self.row_size
        count = self.rows_per_file - (row % self.rows_per_file)

        return (dirname, filename, offset, count)
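
    # Worked example (illustrative numbers only): with rows_per_file = 1000,
    # files_per_dir = 32768 and row_size = 40, row 1234567 has
    # filenum = 1234, so subdir "0000", filename "04d2",
    # offset = 567 * 40 = 22680 and count = 1000 - 567 = 433.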

    def _row_from_offset(self, subdir, filename, offset):
        """Return the row number that corresponds to the given
        'subdir/filename' and byte-offset within that file."""
        if (offset % self.row_size) != 0: # pragma: no cover
            # this shouldn't occur, unless there is some corruption somewhere
            raise ValueError("file offset is not a multiple of data size")
        filenum = int(subdir, 16) * self.files_per_dir + int(filename, 16)
        row = (filenum * self.rows_per_file) + (offset // self.row_size)
        return row
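
    # Worked example (illustrative, inverting the example above): with the
    # same parameters, subdir "0000", filename "04d2" and offset 22680 give
    # filenum = 0 * 32768 + 0x4d2 = 1234 and
    # row = 1234 * 1000 + 22680 // 40 = 1234567.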

    def _remove_or_truncate_file(self, subdir, filename, offset = 0):
        """Remove the given file, and remove the subdirectory too
        if it's empty.  If offset is nonzero, truncate the file
        to that size instead."""
        # Close potentially open file in file_open LRU cache
        self.file_open.cache_remove(self, subdir, filename)
        if offset:
            # Truncate it
            with open(os.path.join(self.root, subdir, filename), "r+b") as f:
                f.truncate(offset)
        else:
            # Remove file
            os.remove(os.path.join(self.root, subdir, filename))
            # Try deleting subdir, too
            try:
                os.rmdir(os.path.join(self.root, subdir))
            except Exception:
                pass

    # Cache open files
    @nilmdb.utils.lru_cache(size = fd_cache_size,
                            onremove = lambda f: f.close())
    def file_open(self, subdir, filename):
        """Open and map a given 'subdir/filename' (relative to self.root).
        Will be automatically closed when evicted from the cache."""
        # Create path if it doesn't exist
        try:
            os.mkdir(os.path.join(self.root, subdir))
        except OSError:
            pass
        # Return a rocket.Rocket object, which contains the open file
        return rocket.Rocket(self.layout,
                             os.path.join(self.root, subdir, filename))

    def append_data(self, data, start, end, binary = False):
        """Parse the formatted string in 'data', according to the
        current layout, and append it to the table.  If any timestamps
        are non-monotonic, or don't fall between 'start' and 'end',
        a ValueError is raised.

        If 'binary' is True, the data should be in raw binary format
        instead: little-endian, matching the current table's layout,
        including the int64 timestamp.

        If this function succeeds, it returns normally.  Otherwise,
        the table is reverted back to its original state by truncating
        or deleting files as necessary."""
        data_offset = 0
        last_timestamp = nilmdb.utils.time.min_timestamp
        tot_rows = self.nrows
        count = 0
        linenum = 0

        try:
            while data_offset < len(data):
                # See how many rows we can fit into the current file,
                # and open it
                (subdir, fname, offset, count) = self._offset_from_row(tot_rows)
                f = self.file_open(subdir, fname)

                # Ask the rocket object to parse and append up to "count"
                # rows of data, verifying things along the way.
                try:
                    if binary:
                        appender = f.append_binary
                    else:
                        appender = f.append_string
                    (added_rows, data_offset, last_timestamp, linenum
                     ) = appender(count, data, data_offset, linenum,
                                  start, end, last_timestamp)
                except rocket.ParseError as e:
                    (linenum, colnum, errtype, obj) = e.args
                    if binary:
                        where = "byte %d: " % (linenum)
                    else:
                        where = "line %d, column %d: " % (linenum, colnum)

                    # Extract out the error line, add column marker
                    try:
                        if binary:
                            raise IndexError
                        bad = data.splitlines()[linenum-1]
                        bad += '\n' + ' ' * (colnum - 1) + '^'
                    except IndexError:
                        bad = ""
                    if errtype == rocket.ERR_NON_MONOTONIC:
                        err = "timestamp is not monotonically increasing"
                    elif errtype == rocket.ERR_OUT_OF_INTERVAL:
                        if obj < start:
                            err = sprintf("Data timestamp %s < start time %s",
                                          timestamp_to_string(obj),
                                          timestamp_to_string(start))
                        else:
                            err = sprintf("Data timestamp %s >= end time %s",
                                          timestamp_to_string(obj),
                                          timestamp_to_string(end))
                    else:
                        err = str(obj)
                    raise ValueError("error parsing input data: " +
                                     where + err + "\n" + bad)
                tot_rows += added_rows
        except Exception:
            # Some failure, so try to roll things back by truncating or
            # deleting files that we may have appended data to.
            cleanpos = self.nrows
            while cleanpos <= tot_rows:
                (subdir, fname, offset, count) = self._offset_from_row(cleanpos)
                self._remove_or_truncate_file(subdir, fname, offset)
                cleanpos += count
            # Re-raise original exception
            raise
        else:
            # Success, so update self.nrows accordingly
            self.nrows = tot_rows
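
    # Illustrative usage sketch (comment only, not executed; assumes an empty
    # text-layout table where each input line is "<timestamp> <value> ..."
    # and timestamps must fall in [start, end)):
    #
    #   table.append_data("100 1.0\n200 2.0\n", start = 100, end = 201)
    #   table.nrows           # now 2
    #   table.get_data(0, 2)  # returns the same two rows as formatted text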

    def get_data(self, start, stop, binary = False):
        """Extract data corresponding to the Python range [start:stop],
        and return it as a formatted string"""
        if (start is None or
            stop is None or
            start > stop or
            start < 0 or
            stop > self.nrows):
            raise IndexError("Index out of range")
        ret = []
        row = start
        remaining = stop - start
        while remaining > 0:
            (subdir, filename, offset, count) = self._offset_from_row(row)
            if count > remaining:
                count = remaining
            f = self.file_open(subdir, filename)
            if binary:
                ret.append(f.extract_binary(offset, count))
            else:
                ret.append(f.extract_string(offset, count))
            remaining -= count
            row += count
        return b"".join(ret)

    def __getitem__(self, row):
        """Extract the timestamp from a row, using table[n] notation."""
        if row < 0 or row >= self.nrows:
            raise IndexError("Index out of range")
        (subdir, filename, offset, count) = self._offset_from_row(row)
        f = self.file_open(subdir, filename)
        return f.extract_timestamp(offset)

    def _remove_rows(self, subdir, filename, start, stop):
        """Helper to mark specific rows as being removed from a
        file, and potentially remove or truncate the file itself."""
        # Close potentially open file in file_open LRU cache
        self.file_open.cache_remove(self, subdir, filename)

        # We keep a file like 0000.removed that contains a list of
        # which rows have been "removed".  Note that we never have to
        # remove entries from this list, because we never decrease
        # self.nrows, and so we will never overwrite those locations in the
        # file.  Only when the list covers the entire extent of the
        # file will that file be removed.
        datafile = os.path.join(self.root, subdir, filename)
        cachefile = datafile + ".removed"
        try:
            with open(cachefile, "rb") as f:
                ranges = pickle.load(f)
            cachefile_present = True
        except Exception:
            ranges = []
            cachefile_present = False

        # Append our new range and sort
        ranges.append((start, stop))
        ranges.sort()

        # Merge adjacent ranges into "merged"
        merged = []
        prev = None
        for new in ranges:
            if prev is None:
                # No previous range, so remember this one
                prev = new
            elif prev[1] == new[0]:
                # Previous range connected to this new one; extend prev
                prev = (prev[0], new[1])
            else:
                # Not connected; append previous and start again
                merged.append(prev)
                prev = new
        if prev is not None:
            merged.append(prev)

        # If the range covered the whole file, we can delete it now.
        # Note that the last file in a table may be only partially
        # full (smaller than self.rows_per_file).  We purposely leave
        # those files around rather than deleting them, because the
        # remainder will be filled on a subsequent append(), and things
        # are generally easier if we don't have to special-case that.
        if (len(merged) == 1 and
            merged[0][0] == 0 and merged[0][1] == self.rows_per_file):
            # Delete files
            if cachefile_present:
                os.remove(cachefile)
            self._remove_or_truncate_file(subdir, filename, 0)
        else:
            # File needs to stick around.  This means we can get
            # degenerate cases where we have large files containing as
            # little as one row.  Try to punch a hole in the file,
            # so that this region doesn't take up filesystem space.
            offset = start * self.row_size
            count = (stop - start) * self.row_size
            nilmdb.utils.fallocate.punch_hole(datafile, offset, count)

        # Update cache.  Try to do it atomically.
        nilmdb.utils.atomic.replace_file(cachefile,
                                         pickle.dumps(merged, 2))
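
    # Worked example (illustrative): if the .removed list already holds
    # [(0, 10), (20, 30)] and rows (10, 20) are removed next, the sorted list
    # [(0, 10), (10, 20), (20, 30)] merges into [(0, 30)].  Once the single
    # merged range covers (0, rows_per_file), the data file and its .removed
    # list are deleted entirely.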

    def remove(self, start, stop):
        """Remove specified rows [start, stop) from this table.
        If a file is left empty, it is fully removed.  Otherwise, a
        parallel data file is used to remember which rows have been
        removed, and the file is otherwise untouched."""
        if start < 0 or start > stop or stop > self.nrows:
            raise IndexError("Index out of range")

        row = start
        remaining = stop - start
        while remaining:
            # Loop through each file that we need to touch
            (subdir, filename, offset, count) = self._offset_from_row(row)
            if count > remaining:
                count = remaining
            row_offset = offset // self.row_size

            # Mark the rows as being removed
            self._remove_rows(subdir, filename, row_offset, row_offset + count)
            remaining -= count
            row += count