# -*- coding: utf-8 -*-

"""Check database consistency, with some ability to fix problems.
This should be able to fix cases where a database gets corrupted due
to unexpected system shutdown, and detect other cases that may cause
NilmDB to return errors when trying to manipulate the database."""
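
# Illustrative programmatic use of the checker defined below (the
# database path is hypothetical; fix=True permits automatic repairs):
#
#   f = Fsck("/path/to/db", fix=True)
#   f.check(skip_data=False)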
import nilmdb.utils
import nilmdb.server
import nilmdb.client.numpyclient
from nilmdb.utils.interval import IntervalError
from nilmdb.server.interval import Interval, IntervalSet
from nilmdb.utils.printf import printf, fprintf, sprintf

from collections import defaultdict
import sqlite3
import os
import sys
import progressbar
import re
import shutil
import pickle
import numpy

class FsckError(Exception):
    def __init__(self, msg="", *args):
        if args:
            msg = sprintf(msg, *args)
        Exception.__init__(self, msg)


class FixableFsckError(FsckError):
    def __init__(self, msg=""):
        FsckError.__init__(self, f'{msg}\nThis may be fixable with "--fix".')


class RetryFsck(FsckError):
    pass


def log(format, *args):
    printf(format, *args)


def err(format, *args):
    fprintf(sys.stderr, format, *args)

# Decorator that retries a function if it raises a specific exception
def retry_if_raised(exc, message=None, max_retries=100):
    def f1(func):
        def f2(*args, **kwargs):
            for n in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except exc:
                    if message:
                        log("%s\n\n", message)
            raise Exception("Max number of retries (%d) exceeded; giving up" %
                            max_retries)
        return f2
    return f1
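
# Illustrative sketch of the decorator in use (run_checks is a
# hypothetical function, not part of this module):
#
#   @retry_if_raised(RetryFsck, "fixed something; restarting")
#   def run_checks():
#       ...  # may raise RetryFsck after repairing the database
#
# Each RetryFsck restarts run_checks() from the top, up to max_retries
# attempts; any other exception propagates immediately.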

class Progress(object):
    def __init__(self, maxval):
        if maxval == 0:
            maxval = 1
        self.bar = progressbar.ProgressBar(
            maxval=maxval,
            widgets=[progressbar.Percentage(), ' ',
                     progressbar.Bar(), ' ',
                     progressbar.ETA()])
        self.bar.term_width = self.bar.term_width or 75

    def __enter__(self):
        self.bar.start()
        self.last_update = 0
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            self.bar.finish()
        else:
            printf("\n")

    def update(self, val):
        self.bar.update(val)
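
# Typical use of Progress as a context manager (names and count are
# illustrative):
#
#   with Progress(len(items)) as pbar:
#       for i, item in enumerate(items):
#           pbar.update(i)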

class Fsck(object):
    def __init__(self, path, fix=False):
        self.basepath = path
        self.sqlpath = os.path.join(path, "data.sql")
        self.bulkpath = os.path.join(path, "data")
        self.bulklock = os.path.join(path, "data.lock")
        self.fix = fix

    ### Main checks

    @retry_if_raised(RetryFsck, "Something was fixed: restarting fsck")
    def check(self, skip_data=False):
        self.bulk = None
        self.sql = None
        try:
            self.check_paths()
            self.check_sql()
            self.check_streams()
            self.check_intervals()
            if skip_data:
                log("skipped data check\n")
            else:
                self.check_data()
        finally:
            if self.bulk:
                self.bulk.close()
            if self.sql:  # pragma: no cover
                # (coverage doesn't handle finally clauses correctly;
                # both branches here are tested)
                self.sql.commit()
                self.sql.close()
        log("ok\n")

    ### Check basic path structure

    def check_paths(self):
        log("checking paths\n")
        if self.bulk:
            self.bulk.close()
        if not os.path.isfile(self.sqlpath):
            raise FsckError("SQL database missing (%s)", self.sqlpath)
        if not os.path.isdir(self.bulkpath):
            raise FsckError("Bulk data directory missing (%s)", self.bulkpath)
        with open(self.bulklock, "w") as lockfile:
            if not nilmdb.utils.lock.exclusive_lock(lockfile):
                raise FsckError('Database already locked by another process\n'
                                'Make sure all other processes that might be '
                                'using the database are stopped.\n'
                                'Restarting apache will cause it to unlock '
                                'the db until a request is received.')
            # unlocked immediately
        self.bulk = nilmdb.server.bulkdata.BulkData(self.basepath)

    ### Check SQL database health

    def check_sql(self):
        log("checking sqlite database\n")
        self.sql = sqlite3.connect(self.sqlpath)
        with self.sql:
            cur = self.sql.cursor()
            ver = cur.execute("PRAGMA user_version").fetchone()[0]
            good = max(nilmdb.server.nilmdb._sql_schema_updates.keys())
            if ver != good:
                raise FsckError("database version %d too old, should be %d",
                                ver, good)
            self.stream_path = {}
            self.stream_layout = {}
            log(" loading paths\n")
            result = cur.execute("SELECT id, path, layout FROM streams")
            for r in result:
                if r[0] in self.stream_path:
                    raise FsckError("duplicated ID %d in stream IDs", r[0])
                self.stream_path[r[0]] = r[1]
                self.stream_layout[r[0]] = r[2]
            log(" loading intervals\n")
            self.stream_interval = defaultdict(list)
            result = cur.execute("SELECT stream_id, start_time, end_time, "
                                 "start_pos, end_pos FROM ranges "
                                 "ORDER BY start_time")
            for r in result:
                if r[0] not in self.stream_path:
                    raise FsckError("interval ID %d not in streams", r[0])
                self.stream_interval[r[0]].append((r[1], r[2], r[3], r[4]))
            log(" loading metadata\n")
            self.stream_meta = defaultdict(dict)
            result = cur.execute("SELECT stream_id, key, value FROM metadata")
            for r in result:
                if r[0] not in self.stream_path:
                    raise FsckError("metadata ID %d not in streams", r[0])
                if r[1] in self.stream_meta[r[0]]:
                    raise FsckError(
                        "duplicate metadata key '%s' for stream %d",
                        r[1], r[0])
                self.stream_meta[r[0]][r[1]] = r[2]
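
    # To inspect the schema version by hand, the same pragma works from
    # the sqlite3 command-line shell (database path illustrative):
    #
    #   $ sqlite3 /path/to/db/data.sql "PRAGMA user_version"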

    ### Check streams and basic interval overlap

    def check_streams(self):
        ids = list(self.stream_path.keys())
        log("checking %s streams\n", "{:,d}".format(len(ids)))
        with Progress(len(ids)) as pbar:
            for i, sid in enumerate(ids):
                pbar.update(i)
                path = self.stream_path[sid]

                # unique path, valid layout
                if list(self.stream_path.values()).count(path) != 1:
                    raise FsckError("duplicated path %s", path)
                layout = self.stream_layout[sid].split('_')[0]
                if layout not in ('int8', 'int16', 'int32', 'int64',
                                  'uint8', 'uint16', 'uint32', 'uint64',
                                  'float32', 'float64'):
                    raise FsckError("bad layout %s for %s", layout, path)
                count = int(self.stream_layout[sid].split('_')[1])
                if count < 1 or count > 1024:
                    raise FsckError("bad count %d for %s", count, path)

                # must exist in bulkdata
                bulk = self.bulkpath + path
                bulk = bulk.encode('utf-8')
                if not os.path.isdir(bulk):
                    raise FsckError("%s: missing bulkdata dir", path)
                if not nilmdb.server.bulkdata.Table.exists(bulk):
                    raise FsckError("%s: bad bulkdata table", path)

                # intervals don't overlap.  Abuse IntervalSet to check
                # for intervals in file positions, too.
                timeiset = IntervalSet()
                posiset = IntervalSet()
                for (stime, etime, spos, epos) in self.stream_interval[sid]:
                    new = Interval(stime, etime)
                    try:
                        timeiset += new
                    except IntervalError:
                        raise FsckError("%s: overlap in intervals:\n"
                                        "set: %s\nnew: %s",
                                        path, str(timeiset), str(new))
                    if spos != epos:
                        new = Interval(spos, epos)
                        try:
                            posiset += new
                        except IntervalError:
                            raise FsckError("%s: overlap in file offsets:\n"
                                            "set: %s\nnew: %s",
                                            path, str(posiset), str(new))

                # check bulkdata
                self.check_bulkdata(sid, path, bulk)

                # Check that we can open bulkdata
                try:
                    tab = nilmdb.server.bulkdata.Table(bulk)
                except Exception as e:  # pragma: no cover --
                    # No coverage here because, in the current code,
                    # everything that would cause the bulkdata to fail
                    # has been already checked.
                    raise FsckError("%s: can't open bulkdata: %s",
                                    path, str(e))
                tab.close()
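
    # Layout strings checked above combine an element type with a
    # column count: e.g. "float32_8".split('_') yields ["float32", "8"],
    # an accepted type and a count within the 1..1024 limit.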

    ### Check that bulkdata is good enough to be opened

    @retry_if_raised(RetryFsck)
    def check_bulkdata(self, sid, path, bulk):
        with open(os.path.join(bulk, b"_format"), "rb") as f:
            fmt = pickle.load(f)
        if fmt["version"] != 3:
            raise FsckError("%s: bad or unsupported bulkdata version %d",
                            path, fmt["version"])
        rows_per_file = int(fmt["rows_per_file"])
        if rows_per_file < 1:
            raise FsckError(f"{path}: bad rows_per_file {rows_per_file}")
        files_per_dir = int(fmt["files_per_dir"])
        if files_per_dir < 1:
            raise FsckError(f"{path}: bad files_per_dir {files_per_dir}")
        layout = fmt["layout"]
        if layout != self.stream_layout[sid]:
            raise FsckError("%s: layout mismatch %s != %s", path,
                            layout, self.stream_layout[sid])

        # Every file should have a size that's a multiple of the row size
        rkt = nilmdb.server.rocket.Rocket(layout, None)
        row_size = rkt.binary_size
        rkt.close()
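
        # Worked example of the check below (this assumes the usual
        # NilmDB row encoding of one int64 timestamp followed by the
        # data columns): for layout "float32_8", row_size would be
        # 8 + 8*4 = 40 bytes, so valid data files are always a
        # multiple of 40 bytes long.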

        # Find all directories
        regex = re.compile(b"^[0-9a-f]{4,}$")
        subdirs = sorted(filter(regex.search, os.listdir(bulk)),
                         key=lambda x: int(x, 16), reverse=True)
        for subdir in subdirs:
            # Find all files in that dir
            subpath = os.path.join(bulk, subdir)
            files = list(filter(regex.search, os.listdir(subpath)))
            if not files:
                self.fix_empty_subdir(subpath)
                raise RetryFsck  # pragma: no cover; raised by fix_empty_subdir

            # Verify that their size is a multiple of the row size
            for filename in files:
                filepath = os.path.join(subpath, filename)
                offset = os.path.getsize(filepath)
                if offset % row_size:
                    self.fix_bad_filesize(path, filepath, offset, row_size)

    def fix_empty_subdir(self, subpath):
        msg = sprintf("bulkdata path %s is missing data files", subpath)
        if not self.fix:
            raise FixableFsckError(msg)
        # Try to fix it by just deleting whatever is present,
        # as long as it's only ".removed" files.
        err("\n%s\n", msg)
        for fn in os.listdir(subpath):
            if not fn.endswith(b".removed"):
                raise FsckError("can't fix automatically: please manually "
                                "remove the file '%s' and try again",
                                os.path.join(subpath, fn).decode(
                                    'utf-8', errors='backslashreplace'))
        # Remove the whole thing
        err("Removing empty subpath\n")
        shutil.rmtree(subpath)
        raise RetryFsck

    def fix_bad_filesize(self, path, filepath, offset, row_size):
        extra = offset % row_size
        msg = sprintf("%s: size of file %s (%d) is not a multiple" +
                      " of row size (%d): %d extra bytes present",
                      path, filepath, offset, row_size, extra)
        if not self.fix:
            raise FixableFsckError(msg)
        # Try to fix it by just truncating the file
        err("\n%s\n", msg)
        newsize = offset - extra
        err("Truncating file to %d bytes and retrying\n", newsize)
        with open(filepath, "r+b") as f:
            f.truncate(newsize)
        raise RetryFsck
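
    # Example of the arithmetic above (numbers illustrative): with
    # row_size = 40, a 1003-byte file has extra = 1003 % 40 = 3 and is
    # truncated to newsize = 1000 bytes, i.e. 25 complete rows.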

    ### Check interval endpoints

    def check_intervals(self):
        total_ints = sum(len(x) for x in list(self.stream_interval.values()))
        log("checking %s intervals\n", "{:,d}".format(total_ints))
        done = 0
        with Progress(total_ints) as pbar:
            for sid in self.stream_interval:
                tab = None
                try:
                    bulk = self.bulkpath + self.stream_path[sid]
                    bulk = bulk.encode('utf-8')
                    tab = nilmdb.server.bulkdata.Table(bulk)

                    def update(x):
                        pbar.update(done + x)
                    ints = self.stream_interval[sid]
                    done += self.check_table_intervals(sid, ints, tab, update)
                finally:
                    if tab is not None:
                        tab.close()

    def check_table_intervals(self, sid, ints, tab, update):
        # look in the table to make sure we can pick out the interval's
        # endpoints
        path = self.stream_path[sid]  # noqa: F841 unused
        tab.file_open.cache_remove_all()
        for (i, intv) in enumerate(ints):
            update(i)
            (stime, etime, spos, epos) = intv
            if spos == epos and spos >= 0 and spos <= tab.nrows:
                continue
            try:
                srow = tab[spos]  # noqa: F841 unused
                erow = tab[epos-1]  # noqa: F841 unused
            except Exception as e:
                self.fix_bad_interval(sid, intv, tab, str(e))
                raise RetryFsck  # pragma: no cover; raised by fix_bad_interval
        return len(ints)

    def fix_bad_interval(self, sid, intv, tab, msg):
        path = self.stream_path[sid]
        msg = sprintf("%s: interval %s error accessing rows: %s",
                      path, str(intv), str(msg))
        if not self.fix:
            raise FixableFsckError(msg)
        err("\n%s\n", msg)
        (stime, etime, spos, epos) = intv

        # If it's just that the end pos is more than the number of rows
        # in the table, lower end pos and truncate interval time too.
        if spos < tab.nrows and epos >= tab.nrows:
            err("end position is past the end of the table, but it can "
                "be truncated\n")
            err("old end: time %d, pos %d\n", etime, epos)
            new_epos = tab.nrows
            new_etime = tab[new_epos-1] + 1
            err("new end: time %d, pos %d\n", new_etime, new_epos)
            if stime < new_etime:
                # Change it in SQL
                with self.sql:
                    cur = self.sql.cursor()
                    cur.execute("UPDATE ranges SET end_time=?, end_pos=? "
                                "WHERE stream_id=? AND start_time=? AND "
                                "end_time=? AND start_pos=? AND end_pos=?",
                                (new_etime, new_epos, sid, stime, etime,
                                 spos, epos))
                    if cur.rowcount != 1:  # pragma: no cover (shouldn't fail)
                        raise FsckError("failed to fix SQL database")
                raise RetryFsck
            err("actually it can't be truncated; times are bad too\n")

        # Otherwise, the only hope is to delete the interval entirely.
        err("*** Deleting the entire interval from SQL.\n")
        err("This may leave stale data on disk.  To fix that, copy all\n")
        err("data from this stream to a new stream, then remove all data\n")
        err("from and destroy %s.\n", path)
        with self.sql:
            cur = self.sql.cursor()
            cur.execute("DELETE FROM ranges WHERE "
                        "stream_id=? AND start_time=? AND "
                        "end_time=? AND start_pos=? AND end_pos=?",
                        (sid, stime, etime, spos, epos))
            if cur.rowcount != 1:  # pragma: no cover (shouldn't fail)
                raise FsckError("failed to remove interval")
        raise RetryFsck

    ### Check data in each interval

    def check_data(self):
        total_rows = sum(sum((y[3] - y[2]) for y in x)
                         for x in list(self.stream_interval.values()))
        log("checking %s rows of data\n", "{:,d}".format(total_rows))
        done = 0
        with Progress(total_rows) as pbar:
            for sid in self.stream_interval:
                tab = None
                try:
                    bulk = self.bulkpath + self.stream_path[sid]
                    bulk = bulk.encode('utf-8')
                    tab = nilmdb.server.bulkdata.Table(bulk)

                    def update(x):
                        pbar.update(done + x)
                    ints = self.stream_interval[sid]
                    done += self.check_table_data(sid, ints, tab, update)
                finally:
                    if tab is not None:
                        tab.close()

    def check_table_data(self, sid, ints, tab, update):
        # Pull out all of the interval's data and verify that it's
        # monotonic.
        maxrows = getattr(self, 'maxrows_override', 100000)
        path = self.stream_path[sid]
        layout = self.stream_layout[sid]
        dtype = nilmdb.client.numpyclient.layout_to_dtype(layout)
        tab.file_open.cache_remove_all()
        done = 0
        for intv in ints:
            last_ts = None
            (stime, etime, spos, epos) = intv

            # Break interval into maxrows-sized chunks
            next_start = spos
            while next_start < epos:
                start = next_start
                stop = min(start + maxrows, epos)
                count = stop - start
                next_start = stop

                # Get raw data, convert to NumPy array
                try:
                    raw = tab.get_data(start, stop, binary=True)
                    data = numpy.frombuffer(raw, dtype)
                except Exception as e:  # pragma: no cover
                    # No coverage because it's hard to trigger this --
                    # earlier checks check the ranges, so this would
                    # probably be a real disk error, malloc failure, etc.
                    raise FsckError(
                        "%s: failed to grab rows %d through %d: %s",
                        path, start, stop, repr(e))

                # Verify that timestamps are monotonic
                if (numpy.diff(data['timestamp']) <= 0).any():
                    raise FsckError("%s: non-monotonic timestamp(s) in rows "
                                    "%d through %d", path, start, stop)

                first_ts = data['timestamp'][0]
                if last_ts is not None and first_ts <= last_ts:
                    raise FsckError("%s: first interval timestamp %d is not "
                                    "greater than the previous last interval "
                                    "timestamp %d, at row %d",
                                    path, first_ts, last_ts, start)
                last_ts = data['timestamp'][-1]

                # These are probably fixable, by removing the offending
                # intervals.  But I'm not going to bother implementing
                # that yet.

                # Done
                done += count
                update(done)
        return done