fsck.py
# -*- coding: utf-8 -*-

"""Check database consistency, with some ability to fix problems.
This should be able to fix cases where a database gets corrupted due
to unexpected system shutdown, and detect other cases that may cause
NilmDB to return errors when trying to manipulate the database."""

import nilmdb.utils
import nilmdb.server
import nilmdb.client.numpyclient
from nilmdb.utils.interval import IntervalError
from nilmdb.server.interval import Interval, IntervalSet
from nilmdb.utils.printf import printf, fprintf, sprintf

from collections import defaultdict
import sqlite3
import os
import sys
import progressbar
import re
import shutil
import pickle
import numpy


class FsckError(Exception):
    def __init__(self, msg="", *args):
        if args:
            msg = sprintf(msg, *args)
        Exception.__init__(self, msg)


class FixableFsckError(FsckError):
    def __init__(self, msg=""):
        FsckError.__init__(self, f'{msg}\nThis may be fixable with "--fix".')


class RetryFsck(FsckError):
    pass


class FsckFormatError(FsckError):
    pass


def log(format, *args):
    printf(format, *args)


def err(format, *args):
    fprintf(sys.stderr, format, *args)


# Decorator that retries a function if it raises a specific exception
def retry_if_raised(exc, message=None, max_retries=1000):
    def f1(func):
        def f2(*args, **kwargs):
            for n in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except exc:
                    if message:
                        log(f"{message} ({n+1})\n\n")
            raise Exception("Max number of retries (%d) exceeded; giving up" %
                            max_retries)
        return f2
    return f1
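

# Illustrative use of retry_if_raised (hypothetical function, not part of
# this module): the wrapped function is re-run from the top each time it
# raises the given exception, up to max_retries times.
#
#   @retry_if_raised(RetryFsck, "fixed something, restarting")
#   def repair_database():
#       ...  # raise RetryFsck after each fix to re-run all checks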


class Progress(object):
    def __init__(self, maxval):
        if maxval == 0:
            maxval = 1
        self.bar = progressbar.ProgressBar(
            maxval=maxval,
            widgets=[progressbar.Percentage(), ' ',
                     progressbar.Bar(), ' ',
                     progressbar.ETA()])
        self.bar.term_width = self.bar.term_width or 75

    def __enter__(self):
        self.bar.start()
        self.last_update = 0
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            self.bar.finish()
        else:
            printf("\n")

    def update(self, val):
        self.bar.update(val)


class Fsck(object):
    def __init__(self, path, fix=False):
        self.basepath = path
        self.sqlpath = os.path.join(path, "data.sql")
        self.bulkpath = os.path.join(path, "data")
        self.bulklock = os.path.join(path, "data.lock")
        self.fix = fix
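
    # On-disk layout implied by the paths above (as read from this file):
    #   <basepath>/data.sql   -- SQLite tables: streams, ranges, metadata
    #   <basepath>/data/      -- bulk data tree, one subtree per stream path
    #   <basepath>/data.lock  -- lock file taken in check_paths()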

    ### Main checks

    @retry_if_raised(RetryFsck, "Something was fixed: restarting fsck")
    def check(self, skip_data=False):
        self.bulk = None
        self.sql = None
        try:
            self.check_paths()
            self.check_sql()
            self.check_streams()
            self.check_intervals()
            if skip_data:
                log("skipped data check\n")
            else:
                self.check_data()
        finally:
            if self.bulk:
                self.bulk.close()
            if self.sql:  # pragma: no cover
                # (coverage doesn't handle finally clauses correctly;
                # both branches here are tested)
                self.sql.commit()
                self.sql.close()
        log("ok\n")
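
    # Note how the fix_* helpers below cooperate with check(): each repairs
    # one problem, then raises RetryFsck so that the retry_if_raised
    # decorator restarts the entire check sequence from a clean slate.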

    ### Check basic path structure

    def check_paths(self):
        log("checking paths\n")
        if self.bulk:
            self.bulk.close()
        if not os.path.isfile(self.sqlpath):
            raise FsckError("SQL database missing (%s)", self.sqlpath)
        if not os.path.isdir(self.bulkpath):
            raise FsckError("Bulk data directory missing (%s)", self.bulkpath)
        with open(self.bulklock, "w") as lockfile:
            if not nilmdb.utils.lock.exclusive_lock(lockfile):
                raise FsckError('Database already locked by another process\n'
                                'Make sure all other processes that might be '
                                'using the database are stopped.\n'
                                'Restarting apache will cause it to unlock '
                                'the db until a request is received.')
            # unlocked immediately
        self.bulk = nilmdb.server.bulkdata.BulkData(self.basepath)

    ### Check SQL database health

    def check_sql(self):
        log("checking sqlite database\n")
        self.sql = sqlite3.connect(self.sqlpath)
        with self.sql:
            cur = self.sql.cursor()
            ver = cur.execute("PRAGMA user_version").fetchone()[0]
            good = max(nilmdb.server.nilmdb._sql_schema_updates.keys())
            if ver != good:
                raise FsckError("database version %d too old, should be %d",
                                ver, good)
            self.stream_path = {}
            self.stream_layout = {}
            log("  loading paths\n")
            result = cur.execute("SELECT id, path, layout FROM streams")
            for r in result:
                if r[0] in self.stream_path:
                    raise FsckError("duplicated ID %d in stream IDs", r[0])
                self.stream_path[r[0]] = r[1]
                self.stream_layout[r[0]] = r[2]

            log("  loading intervals\n")
            self.stream_interval = defaultdict(list)
            result = cur.execute("SELECT stream_id, start_time, end_time, "
                                 "start_pos, end_pos FROM ranges "
                                 "ORDER BY start_time")
            for r in result:
                if r[0] not in self.stream_path:
                    raise FsckError("interval ID %d not in streams", r[0])
                self.stream_interval[r[0]].append((r[1], r[2], r[3], r[4]))

            log("  loading metadata\n")
            self.stream_meta = defaultdict(dict)
            result = cur.execute("SELECT stream_id, key, value FROM metadata")
            for r in result:
                if r[0] not in self.stream_path:
                    raise FsckError("metadata ID %d not in streams", r[0])
                if r[1] in self.stream_meta[r[0]]:
                    raise FsckError(
                        "duplicate metadata key '%s' for stream %d",
                        r[1], r[0])
                self.stream_meta[r[0]][r[1]] = r[2]
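
    # Shape of the tables read above, as implied by the queries (a sketch,
    # not the authoritative schema):
    #   streams  (id, path, layout)
    #   ranges   (stream_id, start_time, end_time, start_pos, end_pos)
    #   metadata (stream_id, key, value)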

    ### Check streams and basic interval overlap

    def check_streams(self):
        ids = list(self.stream_path.keys())
        log("checking %s streams\n", "{:,d}".format(len(ids)))
        with Progress(len(ids)) as pbar:
            for i, sid in enumerate(ids):
                pbar.update(i)
                path = self.stream_path[sid]

                # unique path, valid layout
                if list(self.stream_path.values()).count(path) != 1:
                    raise FsckError("duplicated path %s", path)
                layout = self.stream_layout[sid].split('_')[0]
                if layout not in ('int8', 'int16', 'int32', 'int64',
                                  'uint8', 'uint16', 'uint32', 'uint64',
                                  'float32', 'float64'):
                    raise FsckError("bad layout %s for %s", layout, path)
                count = int(self.stream_layout[sid].split('_')[1])
                if count < 1 or count > 1024:
                    raise FsckError("bad count %d for %s", count, path)

                # must exist in bulkdata
                bulk = self.bulkpath + path
                bulk = bulk.encode('utf-8')
                if not os.path.isdir(bulk):
                    raise FsckError("%s: missing bulkdata dir", path)
                if not nilmdb.server.bulkdata.Table.exists(bulk):
                    raise FsckError("%s: bad bulkdata table", path)

                # intervals don't overlap.  Abuse IntervalSet to check
                # for overlap in file positions, too.
                timeiset = IntervalSet()
                posiset = IntervalSet()
                for (stime, etime, spos, epos) in self.stream_interval[sid]:
                    new = Interval(stime, etime)
                    try:
                        timeiset += new
                    except IntervalError:
                        raise FsckError("%s: overlap in intervals:\n"
                                        "set: %s\nnew: %s",
                                        path, str(timeiset), str(new))
                    if spos != epos:
                        new = Interval(spos, epos)
                        try:
                            posiset += new
                        except IntervalError:
                            self.fix_row_overlap(sid, path, posiset, new)

                try:
                    # Check bulkdata
                    self.check_bulkdata(sid, path, bulk)

                    # Check that we can open bulkdata
                    tab = nilmdb.server.bulkdata.Table(bulk)
                except FsckFormatError:
                    # If there are no files except _format, try deleting
                    # the entire stream; this may remove metadata, but
                    # it's probably unimportant.
                    files = list(os.listdir(bulk))
                    if len(files) > 1:
                        raise FsckFormatError(f"{path}: can't load _format, "
                                              f"but data is also present")

                    # Since the stream was empty, just remove it
                    self.fix_remove_stream(sid, path, bulk,
                                           "empty, with corrupted format file")
                except FsckError:
                    raise
                except Exception as e:  # pragma: no cover
                    # No coverage because this is an unknown/unexpected error
                    raise FsckError("%s: can't open bulkdata: %s",
                                    path, str(e))
                tab.close()

    def fix_row_overlap(self, sid, path, existing, new):
        # If the file rows (spos, epos) overlap in the interval table,
        # and the overlapping ranges look like this:
        #    A --------- C
        #          B -------- D
        # then we can try changing the first interval to go from
        # A to B instead.
        msg = (f"{path}: overlap in file offsets:\n"
               f"existing ranges: {existing}\n"
               f"overlapping interval: {new}")
        if not self.fix:
            raise FixableFsckError(msg)
        err(f"\n{msg}\nSeeing if we can truncate one of them...\n")

        # See if there's exactly one interval that overlaps the
        # conflicting one in the right way
        match = None
        for intv in self.stream_interval[sid]:
            (stime, etime, spos, epos) = intv
            if spos < new.start and epos > new.start:
                if match:
                    err(f"no, more than one interval matched:\n"
                        f"{intv}\n{match}\n")
                    raise FsckError(f"{path}: unfixable overlap")
                match = intv
        if match is None:
            err("no intervals overlapped in the right way\n")
            raise FsckError(f"{path}: unfixable overlap")

        # Truncate the file position
        err(f"truncating {match}\n")
        with self.sql:
            cur = self.sql.cursor()
            cur.execute("UPDATE ranges SET end_pos=? "
                        "WHERE stream_id=? AND start_time=? AND "
                        "end_time=? AND start_pos=? AND end_pos=?",
                        (new.start, sid, *match))
            if cur.rowcount != 1:  # pragma: no cover (shouldn't fail)
                raise FsckError("failed to fix SQL database")
        raise RetryFsck
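
    # Worked example of the fix above (hypothetical numbers): if one
    # interval's rows cover file positions [0, 500) and another's cover
    # [300, 800), the first interval's end_pos is rewritten from 500 to
    # 300, leaving [0, 300) and [300, 800), which no longer overlap.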

    ### Check that bulkdata is good enough to be opened

    @retry_if_raised(RetryFsck)
    def check_bulkdata(self, sid, path, bulk):
        try:
            with open(os.path.join(bulk, b"_format"), "rb") as f:
                fmt = pickle.load(f)
        except Exception as e:
            raise FsckFormatError(f"{path}: can't load _format file ({e})")
        if fmt["version"] != 3:
            raise FsckFormatError("%s: bad or unsupported bulkdata version %d",
                                  path, fmt["version"])
        rows_per_file = int(fmt["rows_per_file"])
        if rows_per_file < 1:
            raise FsckFormatError(f"{path}: bad rows_per_file {rows_per_file}")
        files_per_dir = int(fmt["files_per_dir"])
        if files_per_dir < 1:
            raise FsckFormatError(f"{path}: bad files_per_dir {files_per_dir}")
        layout = fmt["layout"]
        if layout != self.stream_layout[sid]:
            raise FsckFormatError("%s: layout mismatch %s != %s", path,
                                  layout, self.stream_layout[sid])

        # Every file should have a size that's a multiple of the row size
        rkt = nilmdb.server.rocket.Rocket(layout, None)
        row_size = rkt.binary_size
        rkt.close()

        # Find all directories
        regex = re.compile(b"^[0-9a-f]{4,}$")
        subdirs = sorted(filter(regex.search, os.listdir(bulk)),
                         key=lambda x: int(x, 16), reverse=True)
        for subdir in subdirs:
            # Find all files in that dir
            subpath = os.path.join(bulk, subdir)
            files = list(filter(regex.search, os.listdir(subpath)))
            if not files:
                self.fix_empty_subdir(subpath)

            # Verify that their size is a multiple of the row size
            for filename in files:
                filepath = os.path.join(subpath, filename)
                offset = os.path.getsize(filepath)
                if offset % row_size:
                    self.fix_bad_filesize(path, filepath, offset, row_size)
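
    # Example of the row-size invariant (a sketch; 40 is a made-up row
    # size): if Rocket reports binary_size == 40 for a layout, every data
    # file must be exactly 40*k bytes long.  A 4100-byte file holds 102
    # complete rows (4080 bytes) plus 20 stray bytes, and
    # fix_bad_filesize() truncates those 20 bytes away.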

    def fix_empty_subdir(self, subpath):
        msg = sprintf("bulkdata path %s is missing data files", subpath)
        if not self.fix:
            raise FixableFsckError(msg)

        # Try to fix it by just deleting whatever is present,
        # as long as it's only ".removed" files.
        err("\n%s\n", msg)
        for fn in os.listdir(subpath):
            if not fn.endswith(b".removed"):
                raise FsckError("can't fix automatically: please manually "
                                "remove the file '%s' and try again",
                                os.path.join(subpath, fn).decode(
                                    'utf-8', errors='backslashreplace'))

        # Remove the whole thing
        err("Removing empty subpath\n")
        shutil.rmtree(subpath)
        raise RetryFsck

    def fix_bad_filesize(self, path, filepath, offset, row_size):
        extra = offset % row_size
        msg = sprintf("%s: size of file %s (%d) is not a multiple" +
                      " of row size (%d): %d extra bytes present",
                      path, filepath, offset, row_size, extra)
        if not self.fix:
            raise FixableFsckError(msg)

        # Try to fix it by just truncating the file
        err("\n%s\n", msg)
        newsize = offset - extra
        err("Truncating file to %d bytes and retrying\n", newsize)
        with open(filepath, "r+b") as f:
            f.truncate(newsize)
        raise RetryFsck

    def fix_remove_stream(self, sid, path, bulk, reason):
        msg = f"stream {path} is corrupted: {reason}"
        if not self.fix:
            raise FixableFsckError(msg)

        # Remove the stream from disk and the database
        err(f"\n{msg}\n")
        err(f"Removing stream {path} from disk and database\n")
        shutil.rmtree(bulk)
        with self.sql:
            cur = self.sql.cursor()
            cur.execute("DELETE FROM streams WHERE id=?",
                        (sid,))
            if cur.rowcount != 1:  # pragma: no cover (shouldn't fail)
                raise FsckError("failed to remove stream")
            cur.execute("DELETE FROM ranges WHERE stream_id=?", (sid,))
            cur.execute("DELETE FROM metadata WHERE stream_id=?", (sid,))
        raise RetryFsck

    ### Check interval endpoints

    def check_intervals(self):
        total_ints = sum(len(x) for x in self.stream_interval.values())
        log("checking %s intervals\n", "{:,d}".format(total_ints))
        done = 0
        with Progress(total_ints) as pbar:
            for sid in self.stream_interval:
                # Open the table before the try block, so that a failure
                # to open it doesn't hit tab.close() on an unbound name.
                bulk = self.bulkpath + self.stream_path[sid]
                bulk = bulk.encode('utf-8')
                tab = nilmdb.server.bulkdata.Table(bulk)
                try:
                    def update(x):
                        pbar.update(done + x)
                    ints = self.stream_interval[sid]
                    done += self.check_table_intervals(sid, ints, tab, update)
                finally:
                    tab.close()

    def check_table_intervals(self, sid, ints, tab, update):
        # Look in the table to make sure we can pick out the interval's
        # endpoints
        path = self.stream_path[sid]  # noqa: F841 unused
        tab.file_open.cache_remove_all()
        for (i, intv) in enumerate(ints):
            update(i)
            (stime, etime, spos, epos) = intv
            if spos == epos and spos >= 0 and spos <= tab.nrows:
                continue
            try:
                srow = tab[spos]  # noqa: F841 unused
                erow = tab[epos-1]  # noqa: F841 unused
            except Exception as e:
                self.fix_bad_interval(sid, intv, tab, str(e))
        return len(ints)

    def fix_bad_interval(self, sid, intv, tab, msg):
        path = self.stream_path[sid]
        msg = sprintf("%s: interval %s error accessing rows: %s",
                      path, str(intv), str(msg))
        if not self.fix:
            raise FixableFsckError(msg)
        err("\n%s\n", msg)

        (stime, etime, spos, epos) = intv
        # If it's just that the end pos is more than the number of rows
        # in the table, lower the end pos and truncate the interval time too.
        if spos < tab.nrows and epos >= tab.nrows:
            err("end position is past the end of the table, but it can "
                "be truncated\n")
            err("old end: time %d, pos %d\n", etime, epos)
            new_epos = tab.nrows
            new_etime = tab[new_epos-1] + 1
            err("new end: time %d, pos %d\n", new_etime, new_epos)
            if stime < new_etime:
                # Change it in SQL
                with self.sql:
                    cur = self.sql.cursor()
                    cur.execute("UPDATE ranges SET end_time=?, end_pos=? "
                                "WHERE stream_id=? AND start_time=? AND "
                                "end_time=? AND start_pos=? AND end_pos=?",
                                (new_etime, new_epos, sid, stime, etime,
                                 spos, epos))
                    if cur.rowcount != 1:  # pragma: no cover (shouldn't fail)
                        raise FsckError("failed to fix SQL database")
                raise RetryFsck
            err("actually it can't be truncated; times are bad too\n")

        # Otherwise, the only hope is to delete the interval entirely.
        err("*** Deleting the entire interval from SQL.\n")
        err("This may leave stale data on disk. To fix that, copy all "
            "data from this stream to a new stream using nilm-copy, then\n")
        err("remove all data from and destroy %s.\n", path)
        with self.sql:
            cur = self.sql.cursor()
            cur.execute("DELETE FROM ranges WHERE "
                        "stream_id=? AND start_time=? AND "
                        "end_time=? AND start_pos=? AND end_pos=?",
                        (sid, stime, etime, spos, epos))
            if cur.rowcount != 1:  # pragma: no cover (shouldn't fail)
                raise FsckError("failed to remove interval")
        raise RetryFsck
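
    # A note on the truncation above: tab[new_epos-1] reads the timestamp
    # stored in the last surviving row, so setting end_time to that value
    # plus one preserves the half-open [start_time, end_time) interval
    # semantics for the rows that remain.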

    ### Check data in each interval

    def check_data(self):
        total_rows = sum(sum((y[3] - y[2]) for y in x)
                         for x in self.stream_interval.values())
        log("checking %s rows of data\n", "{:,d}".format(total_rows))
        done = 0
        with Progress(total_rows) as pbar:
            for sid in self.stream_interval:
                # As in check_intervals, open the table before the try
                # block so tab is always bound when tab.close() runs.
                bulk = self.bulkpath + self.stream_path[sid]
                bulk = bulk.encode('utf-8')
                tab = nilmdb.server.bulkdata.Table(bulk)
                try:
                    def update(x):
                        pbar.update(done + x)
                    ints = self.stream_interval[sid]
                    done += self.check_table_data(sid, ints, tab, update)
                finally:
                    tab.close()

    def check_table_data(self, sid, ints, tab, update):
        # Pull out all of the interval's data and verify that it's
        # monotonic.
        maxrows = getattr(self, 'maxrows_override', 100000)
        path = self.stream_path[sid]
        layout = self.stream_layout[sid]
        dtype = nilmdb.client.numpyclient.layout_to_dtype(layout)
        tab.file_open.cache_remove_all()
        done = 0
        for intv in ints:
            last_ts = None
            (stime, etime, spos, epos) = intv

            # Break interval into maxrows-sized chunks
            next_start = spos
            while next_start < epos:
                start = next_start
                stop = min(start + maxrows, epos)
                count = stop - start
                next_start = stop

                # Get raw data, convert to NumPy array
                try:
                    raw = tab.get_data(start, stop, binary=True)
                    data = numpy.frombuffer(raw, dtype)
                except Exception as e:  # pragma: no cover
                    # No coverage because this is hard to trigger -- earlier
                    # checks verify the ranges, so this would probably be a
                    # real disk error, malloc failure, etc.
                    raise FsckError(
                        "%s: failed to grab rows %d through %d: %s",
                        path, start, stop, repr(e))

                ts = data['timestamp']

                # Verify that all timestamps are in range.
                match = (ts < stime) | (ts >= etime)
                if match.any():
                    row = numpy.argmax(match)
                    if ts[row] != 0:
                        raise FsckError("%s: data timestamp %d at row %d "
                                        "outside interval range [%d,%d)",
                                        path, ts[row], row + start,
                                        stime, etime)

                    # Timestamp is zero and out of the expected range;
                    # assume file ends with zeroed data and just truncate it.
                    self.fix_table_by_truncating(
                        path, tab, row + start,
                        "data timestamp is out of range, and zero")

                # Verify that timestamps are monotonic
                match = numpy.diff(ts) <= 0
                if match.any():
                    row = numpy.argmax(match)
                    if ts[row+1] != 0:
                        raise FsckError("%s: non-monotonic timestamp "
                                        "(%d -> %d) at row %d", path,
                                        ts[row], ts[row+1], row + start)

                    # Timestamp is zero and non-monotonic;
                    # assume file ends with zeroed data and just truncate it.
                    self.fix_table_by_truncating(
                        path, tab, row + start + 1,
                        "data timestamp is non-monotonic, and zero")

                first_ts = ts[0]
                if last_ts is not None and first_ts <= last_ts:
                    raise FsckError("%s: first interval timestamp %d is not "
                                    "greater than the previous last interval "
                                    "timestamp %d, at row %d",
                                    path, first_ts, last_ts, start)
                last_ts = ts[-1]

                # The previous errors are fixable, by removing the
                # offending intervals or changing the data timestamps,
                # but these are probably unlikely errors, so it's not
                # worth implementing that yet.

                # Done
                done += count
                update(done)
        return done
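
    # Standalone illustration of the monotonicity test above (plain
    # numpy, hypothetical values): for ts = [10, 20, 20, 30],
    # numpy.diff(ts) <= 0 yields [False, True, False], numpy.argmax()
    # returns 1, and the 20 -> 20 step at rows 1 -> 2 is flagged as the
    # first non-monotonic transition.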

    def fix_table_by_truncating(self, path, tab, row, reason):
        # Simple fix for bad data: truncate the table at the given row.
        # On retry, fix_bad_interval will correct the database and timestamps
        # to account for this truncation.
        msg = f"{path}: bad data in table, starting at row {row}: {reason}"
        if not self.fix:
            raise FixableFsckError(msg)
        err(f"\n{msg}\nWill try truncating table\n")
        (subdir, fname, offs, count) = tab._offset_from_row(row)
        tab._remove_or_truncate_file(subdir, fname, offs)
        raise RetryFsck
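

# Typical use of this module (a sketch; the actual command-line entry
# point is not shown in this file):
#
#   fsck = Fsck("/path/to/db", fix=True)
#   fsck.check()
#
# With fix=False, each problem raises FixableFsckError instead of being
# repaired; with fix=True, each repair raises RetryFsck and check()
# restarts until the database comes up clean.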