# -*- coding: utf-8 -*-

"""Check database consistency, with some ability to fix problems.
This should be able to fix cases where a database gets corrupted due
to unexpected system shutdown, and detect other cases that may cause
NilmDB to return errors when trying to manipulate the database."""
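
# Example usage (a minimal sketch; the command-line wrapper that
# normally drives this class is not part of this file):
#
#   fsck = Fsck("/path/to/db", fix=True)  # dir containing data.sql, data/
#   fsck.check()                          # raises FsckError on problems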

import nilmdb.utils
import nilmdb.server
import nilmdb.client.numpyclient
from nilmdb.utils.interval import IntervalError
from nilmdb.server.interval import Interval, IntervalSet
from nilmdb.utils.printf import printf, fprintf, sprintf

from collections import defaultdict
import sqlite3
import os
import sys
import progressbar
import re
import shutil
import pickle
import numpy


class FsckError(Exception):
    def __init__(self, msg="", *args):
        if args:
            msg = sprintf(msg, *args)
        Exception.__init__(self, msg)


class FixableFsckError(FsckError):
    def __init__(self, msg=""):
        FsckError.__init__(self, f'{msg}\nThis may be fixable with "--fix".')


class RetryFsck(FsckError):
    pass


def log(format, *args):
    printf(format, *args)


def err(format, *args):
    fprintf(sys.stderr, format, *args)


# Decorator that retries a function if it raises a specific exception
def retry_if_raised(exc, message=None, max_retries=100):
    def f1(func):
        def f2(*args, **kwargs):
            for n in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except exc:
                    if message:
                        log("%s\n\n", message)
            raise Exception("Max number of retries (%d) exceeded; giving up" %
                            max_retries)
        return f2
    return f1
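

# The fix_* methods below repair a problem and then raise RetryFsck;
# the decorator on Fsck.check() catches it and reruns the entire pass,
# so every fix is re-verified from a clean start.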


class Progress(object):
    def __init__(self, maxval):
        if maxval == 0:
            maxval = 1
        self.bar = progressbar.ProgressBar(
            maxval=maxval,
            widgets=[progressbar.Percentage(), ' ',
                     progressbar.Bar(), ' ',
                     progressbar.ETA()])
        self.bar.term_width = self.bar.term_width or 75

    def __enter__(self):
        self.bar.start()
        self.last_update = 0
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            self.bar.finish()
        else:
            printf("\n")

    def update(self, val):
        self.bar.update(val)


class Fsck(object):
    def __init__(self, path, fix=False):
        self.basepath = path
        self.sqlpath = os.path.join(path, "data.sql")
        self.bulkpath = os.path.join(path, "data")
        self.bulklock = os.path.join(path, "data.lock")
        self.fix = fix

    ### Main checks
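
    # The phases below run in dependency order: check_paths before
    # check_sql, and the stream/interval/data checks all rely on the
    # tables that check_sql caches in memory.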

    @retry_if_raised(RetryFsck, "Something was fixed: restarting fsck")
    def check(self, skip_data=False):
        self.bulk = None
        self.sql = None
        try:
            self.check_paths()
            self.check_sql()
            self.check_streams()
            self.check_intervals()
            if skip_data:
                log("skipped data check\n")
            else:
                self.check_data()
        finally:
            if self.bulk:
                self.bulk.close()
            if self.sql:  # pragma: no cover
                # (coverage doesn't handle finally clauses correctly;
                # both branches here are tested)
                self.sql.commit()
                self.sql.close()
        log("ok\n")

    ### Check basic path structure

    def check_paths(self):
        log("checking paths\n")
        if self.bulk:
            self.bulk.close()
        if not os.path.isfile(self.sqlpath):
            raise FsckError("SQL database missing (%s)", self.sqlpath)
        if not os.path.isdir(self.bulkpath):
            raise FsckError("Bulk data directory missing (%s)", self.bulkpath)
        with open(self.bulklock, "w") as lockfile:
            if not nilmdb.utils.lock.exclusive_lock(lockfile):
                raise FsckError('Database already locked by another process\n'
                                'Make sure all other processes that might be '
                                'using the database are stopped.\n'
                                'Restarting apache will cause it to unlock '
                                'the db until a request is received.')
            # unlocked immediately
        self.bulk = nilmdb.server.bulkdata.BulkData(self.basepath)

    ### Check SQL database health

    def check_sql(self):
        log("checking sqlite database\n")
        self.sql = sqlite3.connect(self.sqlpath)
        with self.sql:
            cur = self.sql.cursor()
            ver = cur.execute("PRAGMA user_version").fetchone()[0]
            good = max(nilmdb.server.nilmdb._sql_schema_updates.keys())
            if ver != good:
                raise FsckError("database version %d too old, should be %d",
                                ver, good)
            self.stream_path = {}
            self.stream_layout = {}
            log("  loading paths\n")
            result = cur.execute("SELECT id, path, layout FROM streams")
            for r in result:
                if r[0] in self.stream_path:
                    raise FsckError("duplicated ID %d in stream IDs", r[0])
                self.stream_path[r[0]] = r[1]
                self.stream_layout[r[0]] = r[2]

            log("  loading intervals\n")
            self.stream_interval = defaultdict(list)
            result = cur.execute("SELECT stream_id, start_time, end_time, "
                                 "start_pos, end_pos FROM ranges "
                                 "ORDER BY start_time")
            for r in result:
                if r[0] not in self.stream_path:
                    raise FsckError("interval ID %d not in streams", r[0])
                self.stream_interval[r[0]].append((r[1], r[2], r[3], r[4]))

            log("  loading metadata\n")
            self.stream_meta = defaultdict(dict)
            result = cur.execute("SELECT stream_id, key, value FROM metadata")
            for r in result:
                if r[0] not in self.stream_path:
                    raise FsckError("metadata ID %d not in streams", r[0])
                if r[1] in self.stream_meta[r[0]]:
                    raise FsckError(
                        "duplicate metadata key '%s' for stream %d",
                        r[1], r[0])
                self.stream_meta[r[0]][r[1]] = r[2]
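
        # All SQL state is now cached in memory:
        #   stream_path[id] -> path
        #   stream_layout[id] -> layout
        #   stream_interval[id] -> [(start_time, end_time,
        #                            start_pos, end_pos), ...]
        #   stream_meta[id] -> {key: value}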

    ### Check streams and basic interval overlap

    def check_streams(self):
        ids = list(self.stream_path.keys())
        log("checking %s streams\n", "{:,d}".format(len(ids)))
        with Progress(len(ids)) as pbar:
            for i, sid in enumerate(ids):
                pbar.update(i)
                path = self.stream_path[sid]

                # unique path, valid layout
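                # (layout strings look like "float32_8": an element
                # type, an underscore, then the column count)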
                if list(self.stream_path.values()).count(path) != 1:
                    raise FsckError("duplicated path %s", path)
                layout = self.stream_layout[sid].split('_')[0]
                if layout not in ('int8', 'int16', 'int32', 'int64',
                                  'uint8', 'uint16', 'uint32', 'uint64',
                                  'float32', 'float64'):
                    raise FsckError("bad layout %s for %s", layout, path)
                count = int(self.stream_layout[sid].split('_')[1])
                if count < 1 or count > 1024:
                    raise FsckError("bad count %d for %s", count, path)

                # must exist in bulkdata
                bulk = self.bulkpath + path
                bulk = bulk.encode('utf-8')
                if not os.path.isdir(bulk):
                    raise FsckError("%s: missing bulkdata dir", path)
                if not nilmdb.server.bulkdata.Table.exists(bulk):
                    raise FsckError("%s: bad bulkdata table", path)

                # intervals don't overlap. Abuse IntervalSet to check
                # for intervals in file positions, too.
                timeiset = IntervalSet()
                posiset = IntervalSet()
                for (stime, etime, spos, epos) in self.stream_interval[sid]:
                    new = Interval(stime, etime)
                    try:
                        timeiset += new
                    except IntervalError:
                        raise FsckError("%s: overlap in intervals:\n"
                                        "set: %s\nnew: %s",
                                        path, str(timeiset), str(new))
                    if spos != epos:
                        new = Interval(spos, epos)
                        try:
                            posiset += new
                        except IntervalError:
                            raise FsckError("%s: overlap in file offsets:\n"
                                            "set: %s\nnew: %s",
                                            path, str(posiset), str(new))

                # check bulkdata
                self.check_bulkdata(sid, path, bulk)

                # Check that we can open bulkdata
                try:
                    tab = nilmdb.server.bulkdata.Table(bulk)
                except Exception as e:  # pragma: no cover --
                    # No coverage here because, in the current code,
                    # everything that would cause the bulkdata to fail
                    # has already been checked.
                    raise FsckError("%s: can't open bulkdata: %s",
                                    path, str(e))
                tab.close()

    ### Check that bulkdata is good enough to be opened
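
    # On-disk layout exercised below (inferred from these checks): each
    # stream directory holds a pickled "_format" dict (version,
    # rows_per_file, files_per_dir, layout) plus hex-named
    # subdirectories of hex-named data files, where every data file
    # holds complete binary rows of a fixed row size.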

    @retry_if_raised(RetryFsck)
    def check_bulkdata(self, sid, path, bulk):
        with open(os.path.join(bulk, b"_format"), "rb") as f:
            fmt = pickle.load(f)
        if fmt["version"] != 3:
            raise FsckError("%s: bad or unsupported bulkdata version %d",
                            path, fmt["version"])
        rows_per_file = int(fmt["rows_per_file"])
        if rows_per_file < 1:
            raise FsckError(f"{path}: bad rows_per_file {rows_per_file}")
        files_per_dir = int(fmt["files_per_dir"])
        if files_per_dir < 1:
            raise FsckError(f"{path}: bad files_per_dir {files_per_dir}")
        layout = fmt["layout"]
        if layout != self.stream_layout[sid]:
            raise FsckError("%s: layout mismatch %s != %s", path,
                            layout, self.stream_layout[sid])

        # Every file should have a size that's a multiple of the row size
        rkt = nilmdb.server.rocket.Rocket(layout, None)
        row_size = rkt.binary_size
        rkt.close()

        # Find all directories
        regex = re.compile(b"^[0-9a-f]{4,}$")
        subdirs = sorted(filter(regex.search, os.listdir(bulk)),
                         key=lambda x: int(x, 16), reverse=True)
        for subdir in subdirs:
            # Find all files in that dir
            subpath = os.path.join(bulk, subdir)
            files = list(filter(regex.search, os.listdir(subpath)))
            if not files:
                self.fix_empty_subdir(subpath)
            # Verify that their size is a multiple of the row size
            for filename in files:
                filepath = os.path.join(subpath, filename)
                offset = os.path.getsize(filepath)
                if offset % row_size:
                    self.fix_bad_filesize(path, filepath, offset, row_size)

    def fix_empty_subdir(self, subpath):
        msg = sprintf("bulkdata path %s is missing data files", subpath)
        if not self.fix:
            raise FixableFsckError(msg)
        # Try to fix it by just deleting whatever is present,
        # as long as it's only ".removed" files.
        err("\n%s\n", msg)
        for fn in os.listdir(subpath):
            if not fn.endswith(b".removed"):
                raise FsckError("can't fix automatically: please manually "
                                "remove the file '%s' and try again",
                                os.path.join(subpath, fn).decode(
                                    'utf-8', errors='backslashreplace'))
        # Remove the whole thing
        err("Removing empty subpath\n")
        shutil.rmtree(subpath)
        raise RetryFsck

    def fix_bad_filesize(self, path, filepath, offset, row_size):
        extra = offset % row_size
        msg = sprintf("%s: size of file %s (%d) is not a multiple" +
                      " of row size (%d): %d extra bytes present",
                      path, filepath, offset, row_size, extra)
        if not self.fix:
            raise FixableFsckError(msg)
        # Try to fix it by just truncating the file
        err("\n%s\n", msg)
        newsize = offset - extra
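        # e.g. a 1000-byte file with 16-byte rows has extra = 8, so it
        # is truncated to newsize = 992 bytes (62 complete rows)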
        err("Truncating file to %d bytes and retrying\n", newsize)
        with open(filepath, "r+b") as f:
            f.truncate(newsize)
        raise RetryFsck

    def fix_remove_stream(self, sid, path, bulk, reason):
        msg = f"stream {path} is corrupted: {reason}"
        if not self.fix:
            raise FixableFsckError(msg)
        # Remove the stream from disk and the database
        err(f"\n{msg}\n")
        err(f"Removing stream {path} from disk and database\n")
        shutil.rmtree(bulk)
        with self.sql:
            cur = self.sql.cursor()
            cur.execute("DELETE FROM streams WHERE id=?",
                        (sid,))
            if cur.rowcount != 1:  # pragma: no cover (shouldn't fail)
                raise FsckError("failed to remove stream")
            cur.execute("DELETE FROM ranges WHERE stream_id=?", (sid,))
            cur.execute("DELETE FROM metadata WHERE stream_id=?", (sid,))
        raise RetryFsck

    ### Check interval endpoints

    def check_intervals(self):
        total_ints = sum(len(x) for x in list(self.stream_interval.values()))
        log("checking %s intervals\n", "{:,d}".format(total_ints))
        done = 0
        with Progress(total_ints) as pbar:
            for sid in self.stream_interval:
                try:
                    bulk = self.bulkpath + self.stream_path[sid]
                    bulk = bulk.encode('utf-8')
                    tab = nilmdb.server.bulkdata.Table(bulk)

                    def update(x):
                        pbar.update(done + x)
                    ints = self.stream_interval[sid]
                    done += self.check_table_intervals(sid, ints, tab, update)
                finally:
                    tab.close()

    def check_table_intervals(self, sid, ints, tab, update):
        # look in the table to make sure we can pick out the interval's
        # endpoints
        path = self.stream_path[sid]  # noqa: F841 unused
        tab.file_open.cache_remove_all()
        for (i, intv) in enumerate(ints):
            update(i)
            (stime, etime, spos, epos) = intv
            if spos == epos and spos >= 0 and spos <= tab.nrows:
                continue
            try:
                srow = tab[spos]  # noqa: F841 unused
                erow = tab[epos-1]  # noqa: F841 unused
            except Exception as e:
                self.fix_bad_interval(sid, intv, tab, str(e))
        return len(ints)

    def fix_bad_interval(self, sid, intv, tab, msg):
        path = self.stream_path[sid]
        msg = sprintf("%s: interval %s error accessing rows: %s",
                      path, str(intv), str(msg))
        if not self.fix:
            raise FixableFsckError(msg)
        err("\n%s\n", msg)
        (stime, etime, spos, epos) = intv
        # If it's just that the end pos is more than the number of rows
        # in the table, lower end pos and truncate interval time too.
        if spos < tab.nrows and epos >= tab.nrows:
            err("end position is past endrows, but it can be truncated\n")
            err("old end: time %d, pos %d\n", etime, epos)
            new_epos = tab.nrows
            new_etime = tab[new_epos-1] + 1
            err("new end: time %d, pos %d\n", new_etime, new_epos)
            if stime < new_etime:
                # Change it in SQL
                with self.sql:
                    cur = self.sql.cursor()
                    cur.execute("UPDATE ranges SET end_time=?, end_pos=? "
                                "WHERE stream_id=? AND start_time=? AND "
                                "end_time=? AND start_pos=? AND end_pos=?",
                                (new_etime, new_epos, sid, stime, etime,
                                 spos, epos))
                    if cur.rowcount != 1:  # pragma: no cover (shouldn't fail)
                        raise FsckError("failed to fix SQL database")
                raise RetryFsck
            err("actually it can't be truncated; times are bad too\n")
        # Otherwise, the only hope is to delete the interval entirely.
        err("*** Deleting the entire interval from SQL.\n")
        err("This may leave stale data on disk. To fix that, copy all\n")
        err("data from this stream to a new stream using nilm-copy, then\n")
        err("remove all data from and destroy %s.\n", path)
        with self.sql:
            cur = self.sql.cursor()
            cur.execute("DELETE FROM ranges WHERE "
                        "stream_id=? AND start_time=? AND "
                        "end_time=? AND start_pos=? AND end_pos=?",
                        (sid, stime, etime, spos, epos))
            if cur.rowcount != 1:  # pragma: no cover (shouldn't fail)
                raise FsckError("failed to remove interval")
        raise RetryFsck

    ### Check data in each interval

    def check_data(self):
        total_rows = sum(sum((y[3] - y[2]) for y in x)
                         for x in list(self.stream_interval.values()))
        log("checking %s rows of data\n", "{:,d}".format(total_rows))
        done = 0
        with Progress(total_rows) as pbar:
            for sid in self.stream_interval:
                try:
                    bulk = self.bulkpath + self.stream_path[sid]
                    bulk = bulk.encode('utf-8')
                    tab = nilmdb.server.bulkdata.Table(bulk)

                    def update(x):
                        pbar.update(done + x)
                    ints = self.stream_interval[sid]
                    done += self.check_table_data(sid, ints, tab, update)
                finally:
                    tab.close()

    def check_table_data(self, sid, ints, tab, update):
        # Pull out all of the interval's data and verify that it's
        # monotonic.
        maxrows = getattr(self, 'maxrows_override', 100000)
        path = self.stream_path[sid]
        layout = self.stream_layout[sid]
        dtype = nilmdb.client.numpyclient.layout_to_dtype(layout)
        tab.file_open.cache_remove_all()
        done = 0
        for intv in ints:
            last_ts = None
            (stime, etime, spos, epos) = intv

            # Break interval into maxrows-sized chunks
            next_start = spos
            while next_start < epos:
                start = next_start
                stop = min(start + maxrows, epos)
                count = stop - start
                next_start = stop

                # Get raw data, convert to NumPy array
                try:
                    raw = tab.get_data(start, stop, binary=True)
                    data = numpy.frombuffer(raw, dtype)
                except Exception as e:  # pragma: no cover
                    # No coverage because it's hard to trigger this --
                    # earlier checks check the ranges, so this would
                    # probably be a real disk error, malloc failure, etc.
                    raise FsckError(
                        "%s: failed to grab rows %d through %d: %s",
                        path, start, stop, repr(e))

                ts = data['timestamp']

                # Verify that all timestamps are in range.
                match = (ts < stime) | (ts >= etime)
                if match.any():
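                    # numpy.argmax on a boolean array returns the index
                    # of the first True entry, i.e. the first bad row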
                    row = start + numpy.argmax(match)
                    raise FsckError("%s: data timestamp %d at row %d "
                                    "outside interval range [%d,%d)",
                                    path, data['timestamp'][row], row,
                                    stime, etime)

                # Verify that timestamps are monotonic
                match = numpy.diff(ts) <= 0
                if match.any():
                    row = start + numpy.argmax(match)
                    raise FsckError("%s: non-monotonic timestamp (%d -> %d) "
                                    "at row %d", path, ts[row], ts[row+1], row)

                first_ts = ts[0]
                if last_ts is not None and first_ts <= last_ts:
                    raise FsckError("%s: first interval timestamp %d is not "
                                    "greater than the previous last interval "
                                    "timestamp %d, at row %d",
                                    path, first_ts, last_ts, start)
                last_ts = ts[-1]

                # The previous errors are fixable, by removing the
                # offending intervals, or changing the data
                # timestamps. But these are probably unlikely errors,
                # so it's not worth implementing that yet.

                # Done
                done += count
                update(done)
        return done