# Fixed record size bulk data storage

# Need absolute_import so that "import nilmdb" won't pull in
# nilmdb.py, but will pull the parent nilmdb module instead.
from __future__ import absolute_import
from __future__ import division
from nilmdb.utils.printf import *
from nilmdb.utils.time import float_time_to_string as ftts
import nilmdb.utils

import os
import cPickle as pickle
import re
import sys

#from . import pyrocket as rocket
from . import rocket

# Up to 256 open file descriptors at any given time.
# These variables are global so they can be used in the decorator arguments.
table_cache_size = 16
fd_cache_size = 16

@nilmdb.utils.must_close(wrap_verify = False)
class BulkData(object):
    def __init__(self, basepath, **kwargs):
        self.basepath = basepath
        self.root = os.path.join(self.basepath, "data")

        # Tuneables
        if "file_size" in kwargs:
            self.file_size = kwargs["file_size"]
        else:
            # Default to approximately 128 MiB per file
            self.file_size = 128 * 1024 * 1024

        if "files_per_dir" in kwargs:
            self.files_per_dir = kwargs["files_per_dir"]
        else:
            # 32768 files per dir should work even on FAT32
            self.files_per_dir = 32768

        # Make root path
        if not os.path.isdir(self.root):
            os.mkdir(self.root)

    def close(self):
        self.getnode.cache_remove_all()

    def _encode_filename(self, path):
        # Encode all paths to UTF-8, regardless of sys.getfilesystemencoding(),
        # because we want to be able to represent all code points and the user
        # will never be directly exposed to filenames.  We can then do path
        # manipulations on the UTF-8 directly.
        if isinstance(path, unicode):
            return path.encode('utf-8')
        return path

    def create(self, unicodepath, layout_name):
        """
        unicodepath: path to the data (e.g. u'/newton/prep').
        Paths must contain at least two elements, e.g.:
            /newton/prep
            /newton/raw
            /newton/upstairs/prep
            /newton/upstairs/raw

        layout_name: string for nilmdb.layout.get_named(), e.g. 'float32_8'
        """
        path = self._encode_filename(unicodepath)

        if path[0] != '/':
            raise ValueError("paths must start with /")
        [ group, node ] = path.rsplit("/", 1)
        if group == '':
            raise ValueError("invalid path; path must contain at least one "
                             "folder")

        # Create the table.  Note that we make a distinction here
        # between NilmDB paths (always Unix style, split apart
        # manually) and OS paths (built up with os.path.join)

        # Make directories leading up to this one
        elements = path.lstrip('/').split('/')
        for i in range(len(elements)):
            ospath = os.path.join(self.root, *elements[0:i])
            if Table.exists(ospath):
                raise ValueError("path is subdir of existing node")
            if not os.path.isdir(ospath):
                os.mkdir(ospath)

        # Make the final dir
        ospath = os.path.join(self.root, *elements)
        if os.path.isdir(ospath):
            raise ValueError("subdirs of this path already exist")
        os.mkdir(ospath)

        try:
            # Write format string to file
            Table.create(ospath, layout_name, self.file_size,
                         self.files_per_dir)

            # Open and cache it
            self.getnode(unicodepath)
        except:
            exc_info = sys.exc_info()
            try:
                os.rmdir(ospath)
            except OSError:
                pass
            raise exc_info[1], None, exc_info[2]

        # Success
        return
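
    # Illustrative sketch (not part of the original source): assuming a
    # scratch basepath, creating a stream and fetching its Table might
    # look like this:
    #
    #   db = BulkData("/tmp/nilmdb-example")        # hypothetical path
    #   db.create(u'/newton/prep', 'float32_8')
    #   table = db.getnode(u'/newton/prep')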

    def destroy(self, unicodepath):
        """Fully remove all data at a particular path.  No way to undo
        it!  The group/path structure is removed, too."""
        path = self._encode_filename(unicodepath)

        # Get OS path
        elements = path.lstrip('/').split('/')
        ospath = os.path.join(self.root, *elements)

        # Remove Table object from cache
        self.getnode.cache_remove(self, unicodepath)

        # Remove the contents of the target directory
        if not Table.exists(ospath):
            raise ValueError("nothing at that path")
        for (root, dirs, files) in os.walk(ospath, topdown = False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                os.rmdir(os.path.join(root, name))

        # Remove empty parent directories
        for i in reversed(range(len(elements))):
            ospath = os.path.join(self.root, *elements[0:i+1])
            try:
                os.rmdir(ospath)
            except OSError:
                break

    # Cache open tables
    @nilmdb.utils.lru_cache(size = table_cache_size,
                            onremove = lambda x: x.close())
    def getnode(self, unicodepath):
        """Return a Table object corresponding to the given database
        path, which must exist."""
        path = self._encode_filename(unicodepath)
        elements = path.lstrip('/').split('/')
        ospath = os.path.join(self.root, *elements)
        return Table(ospath)

@nilmdb.utils.must_close(wrap_verify = False)
class Table(object):
    """Tools to help access a single table (data at a specific OS path)."""
    # See design.md for design details

    # Class methods, to help keep format details in this class.
    @classmethod
    def exists(cls, root):
        """Return True if a table appears to exist at this OS path"""
        return os.path.isfile(os.path.join(root, "_format"))

    @classmethod
    def create(cls, root, layout, file_size, files_per_dir):
        """Initialize a table at the given OS path with the
        given layout string"""

        # Calculate rows per file so that each file is approximately
        # file_size bytes.
        rkt = rocket.Rocket(layout, None)
        rows_per_file = max(file_size // rkt.binary_size, 1)
        rkt.close()

        fmt = { "rows_per_file": rows_per_file,
                "files_per_dir": files_per_dir,
                "layout": layout,
                "version": 2 }
        with open(os.path.join(root, "_format"), "wb") as f:
            pickle.dump(fmt, f, 2)

    # Normal methods
    def __init__(self, root):
        """'root' is the full OS path to the directory of this table"""
        self.root = root

        # Load the format
        with open(os.path.join(self.root, "_format"), "rb") as f:
            fmt = pickle.load(f)

        if fmt["version"] == 1: # pragma: no cover
            # We can handle this old version by converting from
            # struct_fmt back to layout name.
            compat = { "<dHHHHHH": "uint16_6",
                       "<dHHHHHHHHH": "uint16_9",
                       "<dffffffff": "float32_8" }
            if fmt["struct_fmt"] in compat:
                fmt["version"] = 2
                fmt["layout"] = compat[fmt["struct_fmt"]]
            else:
                raise NotImplementedError("old version 1 data with format "
                                          + fmt["struct_fmt"] + " is no good")
        elif fmt["version"] != 2: # pragma: no cover (just future proofing)
            raise NotImplementedError("version " + str(fmt["version"]) +
                                      " bulk data store not supported")

        self.rows_per_file = fmt["rows_per_file"]
        self.files_per_dir = fmt["files_per_dir"]
        self.layout = fmt["layout"]

        # Use rocket to get row size and file size
        rkt = rocket.Rocket(self.layout, None)
        self.row_size = rkt.binary_size
        self.file_size = rkt.binary_size * self.rows_per_file
        rkt.close()

        # Find nrows
        self.nrows = self._get_nrows()

    def close(self):
        self.file_open.cache_remove_all()

    # Internal helpers
    def _get_nrows(self):
        """Find nrows by locating the lexicographically last filename
        and using its size"""
        # Note that this just finds a 'nrows' that is guaranteed to be
        # greater than the row number of any piece of data that
        # currently exists, not necessarily all data that _ever_
        # existed.
        regex = re.compile("^[0-9a-f]{4,}$")

        # Find the last directory.  We sort and loop through all of them,
        # starting with the numerically greatest, because the dirs could be
        # empty if something was deleted.
        subdirs = sorted(filter(regex.search, os.listdir(self.root)),
                         key = lambda x: int(x, 16), reverse = True)

        for subdir in subdirs:
            # Now find the last file in that dir
            path = os.path.join(self.root, subdir)
            files = filter(regex.search, os.listdir(path))
            if not files: # pragma: no cover (shouldn't occur)
                # Empty dir: try the next one
                continue

            # Find the numerical max
            filename = max(files, key = lambda x: int(x, 16))
            offset = os.path.getsize(os.path.join(self.root, subdir, filename))

            # Convert to row number
            return self._row_from_offset(subdir, filename, offset)

        # No files, so no data
        return 0

    def _offset_from_row(self, row):
        """Return a (subdir, filename, offset, count) tuple:

        subdir: subdirectory for the file
        filename: the filename that contains the specified row
        offset: byte offset of the specified row within the file
        count: number of rows (starting at offset) that fit in the file
        """
        filenum = row // self.rows_per_file
        # It's OK if these format specifiers are too short; the filenames
        # will just get longer but will still sort correctly.
        dirname = sprintf("%04x", filenum // self.files_per_dir)
        filename = sprintf("%04x", filenum % self.files_per_dir)
        offset = (row % self.rows_per_file) * self.row_size
        count = self.rows_per_file - (row % self.rows_per_file)
        return (dirname, filename, offset, count)
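
    # Worked example (illustrative, not from the original source): with
    # rows_per_file = 100 and files_per_dir = 16, row 1234 falls in file
    # number 12, so this returns dirname "0000", filename "000c",
    # offset 34 * row_size, and count = 66 rows left in that file.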

    def _row_from_offset(self, subdir, filename, offset):
        """Return the row number that corresponds to the given
        'subdir/filename' and byte-offset within that file."""
        if (offset % self.row_size) != 0: # pragma: no cover
            # this shouldn't occur, unless there is some corruption somewhere
            raise ValueError("file offset is not a multiple of data size")
        filenum = int(subdir, 16) * self.files_per_dir + int(filename, 16)
        row = (filenum * self.rows_per_file) + (offset // self.row_size)
        return row

    def _remove_or_truncate_file(self, subdir, filename, offset = 0):
        """Remove the given file, and remove the subdirectory too
        if it's empty.  If offset is nonzero, truncate the file
        to that size instead."""
        # Close potentially open file in file_open LRU cache
        self.file_open.cache_remove(self, subdir, filename)
        if offset:
            # Truncate it
            with open(os.path.join(self.root, subdir, filename), "r+b") as f:
                f.truncate(offset)
        else:
            # Remove file
            os.remove(os.path.join(self.root, subdir, filename))
            # Try deleting subdir, too
            try:
                os.rmdir(os.path.join(self.root, subdir))
            except:
                pass

    # Cache open files
    @nilmdb.utils.lru_cache(size = fd_cache_size,
                            onremove = lambda f: f.close())
    def file_open(self, subdir, filename):
        """Open and map a given 'subdir/filename' (relative to self.root).
        Will be automatically closed when evicted from the cache."""
        # Create path if it doesn't exist
        try:
            os.mkdir(os.path.join(self.root, subdir))
        except OSError:
            pass
        # Return a rocket.Rocket object, which contains the open file
        return rocket.Rocket(self.layout,
                             os.path.join(self.root, subdir, filename))

    def append(self, data):
        """Append the data and flush it to disk.
        data is a nested Python list [[row],[row],[...]]"""
        remaining = len(data)
        dataiter = iter(data)
        while remaining:
            # See how many rows we can fit into the current file, and open it
            (subdir, fname, offset, count) = self._offset_from_row(self.nrows)
            if count > remaining:
                count = remaining
            f = self.file_open(subdir, fname)

            # Write the data
            written = f.append_iter(count, dataiter)
            if written != count: # pragma: no cover
                raise Exception("Didn't write the expected number of rows: "
                                + str(written) + " != " + str(count))
            remaining -= count
            self.nrows += count
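
    # Illustrative usage (assumed, not from the original source): for a
    # 'float32_8' layout each row is presumably a timestamp followed by
    # eight values, so a single-row append might look like:
    #
    #   table.append([[1334238000.0, 1, 2, 3, 4, 5, 6, 7, 8]])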

    def append_string(self, data, start, end):
        """Parse the formatted string in 'data', according to the
        current layout, and append it to the table.  If any timestamps
        are non-monotonic, or don't fall between 'start' and 'end',
        a ValueError is raised.

        If this function succeeds, it returns normally.  Otherwise,
        the table is reverted back to its original state by truncating
        or deleting files as necessary."""
        data_offset = 0
        last_timestamp = -1e12
        tot_rows = self.nrows
        count = 0
        linenum = 0
        try:
            while data_offset < len(data):
                # See how many rows we can fit into the current file,
                # and open it
                (subdir, fname, offset, count) = self._offset_from_row(tot_rows)
                f = self.file_open(subdir, fname)

                # Ask the rocket object to parse and append up to "count"
                # rows of data, verifying things along the way.
                try:
                    (added_rows, data_offset, last_timestamp, linenum
                     ) = f.append_string(count, data, data_offset, linenum,
                                         start, end, last_timestamp)
                except rocket.ParseError as e:
                    (linenum, errtype, obj) = e.args
                    if errtype == rocket.ERR_NON_MONOTONIC:
                        err = sprintf("line %d: timestamp is not "
                                      "monotonically increasing", linenum)
                    elif errtype == rocket.ERR_OUT_OF_INTERVAL:
                        if obj < start:
                            err = sprintf("line %d: Data timestamp %s < "
                                          "start time %s", linenum,
                                          ftts(obj), ftts(start))
                        else:
                            err = sprintf("line %d: Data timestamp %s >= "
                                          "end time %s", linenum,
                                          ftts(obj), ftts(end))
                    else:
                        err = sprintf("line %d: %s", linenum, str(obj))
                    raise ValueError("error parsing input data: " + err)
                tot_rows += added_rows
        except Exception:
            # Some failure, so try to roll things back by truncating or
            # deleting files that we may have appended data to.
            cleanpos = self.nrows
            while cleanpos <= tot_rows:
                (subdir, fname, offset, count) = self._offset_from_row(cleanpos)
                self._remove_or_truncate_file(subdir, fname, offset)
                cleanpos += count
            # Re-raise original exception
            raise
        else:
            # Success, so update self.nrows accordingly
            self.nrows = tot_rows

    def _get_data(self, start, stop, as_string):
        """Extract data corresponding to Python range [n:m], and
        return a numeric list or formatted string, depending on
        as_string."""
        if (start is None or
            stop is None or
            start > stop or
            start < 0 or
            stop > self.nrows):
            raise IndexError("Index out of range")

        ret = []
        row = start
        remaining = stop - start
        while remaining > 0:
            (subdir, filename, offset, count) = self._offset_from_row(row)
            if count > remaining:
                count = remaining
            f = self.file_open(subdir, filename)
            if as_string:
                ret.append(f.extract_string(offset, count))
            else:
                ret.extend(f.extract_list(offset, count))
            remaining -= count
            row += count
        if as_string:
            return "".join(ret)
        return ret

    def get_as_text(self, start, stop):
        """Extract data corresponding to Python range [n:m], and
        return a formatted string"""
        return self._get_data(start, stop, True)

    def __getitem__(self, key):
        """Extract data and return it.  Supports simple indexing
        (table[n]) and range slices (table[n:m]).  Returns a nested
        Python list [[row],[row],[...]]"""

        # Handle simple slices
        if isinstance(key, slice):
            # Fall back to brute force if the slice isn't simple
            try:
                if (key.step is not None and key.step != 1):
                    raise IndexError
                return self._get_data(key.start, key.stop, False)
            except IndexError:
                return [ self[x] for x in xrange(*key.indices(self.nrows)) ]

        # Handle single points (inefficiently!)
        if key < 0 or key >= self.nrows:
            raise IndexError("Index out of range")
        (subdir, filename, offset, count) = self._offset_from_row(key)
        f = self.file_open(subdir, filename)
        return f.extract_list(offset, 1)[0]
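
    # Illustrative examples (assumed usage, not from the original source):
    #
    #   first_row = table[0]       # single row, e.g. [timestamp, v0, ...]
    #   some_rows = table[10:20]   # nested list of ten rows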

    def _remove_rows(self, subdir, filename, start, stop):
        """Helper to mark specific rows as being removed from a
        file, and potentially remove or truncate the file itself."""

        # Close potentially open file in file_open LRU cache
        self.file_open.cache_remove(self, subdir, filename)

        # We keep a file like 0000.removed that contains a list of
        # which rows have been "removed".  Note that we never have to
        # remove entries from this list, because we never decrease
        # self.nrows, and so we will never overwrite those locations in the
        # file.  Only when the list covers the entire extent of the
        # file will that file be removed.
        datafile = os.path.join(self.root, subdir, filename)
        cachefile = datafile + ".removed"
        try:
            with open(cachefile, "rb") as f:
                ranges = pickle.load(f)
            cachefile_present = True
        except:
            ranges = []
            cachefile_present = False

        # Append our new range and sort
        ranges.append((start, stop))
        ranges.sort()

        # Merge adjacent ranges into "merged"
        merged = []
        prev = None
        for new in ranges:
            if prev is None:
                # No previous range, so remember this one
                prev = new
            elif prev[1] == new[0]:
                # Previous range connected to this new one; extend prev
                prev = (prev[0], new[1])
            else:
                # Not connected; append previous and start again
                merged.append(prev)
                prev = new
        if prev is not None:
            merged.append(prev)
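
        # For example (illustrative): ranges [(0, 10), (10, 25), (40, 50)]
        # merge to [(0, 25), (40, 50)]; the first two share an endpoint and
        # are joined, while the disjoint third range is kept separate.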

        # If the range covered the whole file, we can delete it now.
        # Note that the last file in a table may be only partially
        # full (smaller than self.rows_per_file).  We purposely leave
        # those files around rather than deleting them, because the
        # remainder will be filled on a subsequent append(), and things
        # are generally easier if we don't have to special-case that.
        if (len(merged) == 1 and
            merged[0][0] == 0 and merged[0][1] == self.rows_per_file):
            # Delete files
            if cachefile_present:
                os.remove(cachefile)
            self._remove_or_truncate_file(subdir, filename, 0)
        else:
            # File needs to stick around.  This means we can get
            # degenerate cases where we have large files containing as
            # little as one row.  Try to punch a hole in the file,
            # so that this region doesn't take up filesystem space.
            offset = start * self.row_size
            count = (stop - start) * self.row_size
            nilmdb.utils.fallocate.punch_hole(datafile, offset, count)

            # Update cache.  Try to do it atomically.
            nilmdb.utils.atomic.replace_file(cachefile,
                                             pickle.dumps(merged, 2))

    def remove(self, start, stop):
        """Remove specified rows [start, stop) from this table.
        If a file is left empty, it is fully removed.  Otherwise, a
        parallel data file is used to remember which rows have been
        removed, and the file is otherwise untouched."""
        if start < 0 or start > stop or stop > self.nrows:
            raise IndexError("Index out of range")

        row = start
        remaining = stop - start
        while remaining:
            # Loop through each file that we need to touch
            (subdir, filename, offset, count) = self._offset_from_row(row)
            if count > remaining:
                count = remaining
            row_offset = offset // self.row_size

            # Mark the rows as being removed
            self._remove_rows(subdir, filename, row_offset, row_offset + count)

            remaining -= count
            row += count

class TimestampOnlyTable(object):
    """Helper that lets us pass a Table object into bisect, by
    returning only the timestamp when a particular row is requested."""
    def __init__(self, table):
        self.table = table
    def __getitem__(self, index):
        return self.table[index][0]
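
# Illustrative sketch (assumed usage, not from the original source):
# TimestampOnlyTable exposes only __getitem__, which is enough for the
# standard bisect module to search a table by timestamp:
#
#   import bisect
#   ts_view = TimestampOnlyTable(table)
#   # index of the first row whose timestamp is >= some_time
#   i = bisect.bisect_left(ts_view, some_time, 0, table.nrows)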