# -*- coding: utf-8 -*-

"""Check database consistency, with some ability to fix problems.
This should be able to fix cases where a database gets corrupted due
to unexpected system shutdown, and detect other cases that may cause
NilmDB to return errors when trying to manipulate the database."""

import nilmdb.utils
import nilmdb.server
import nilmdb.client.numpyclient
from nilmdb.utils.interval import IntervalError
from nilmdb.server.interval import Interval, IntervalSet
from nilmdb.utils.printf import printf, fprintf, sprintf

from collections import defaultdict
import sqlite3
import os
import sys
import progressbar
import re
import shutil
import pickle
import numpy


class FsckError(Exception):
    def __init__(self, msg="", *args):
        if args:
            msg = sprintf(msg, *args)
        Exception.__init__(self, msg)


class FixableFsckError(FsckError):
    def __init__(self, msg="", *args):
        if args:
            msg = sprintf(msg, *args)
        FsckError.__init__(self, f'{msg}\nThis may be fixable with "--fix".')


class RetryFsck(FsckError):
    pass


def log(format, *args):
    printf(format, *args)


def err(format, *args):
    fprintf(sys.stderr, format, *args)


# Decorator that retries a function if it raises a specific exception
def retry_if_raised(exc, message=None, max_retries=100):
    def f1(func):
        def f2(*args, **kwargs):
            for n in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except exc:
                    if message:
                        log("%s\n\n", message)
            raise Exception("Max number of retries (%d) exceeded; giving up"
                            % max_retries)
        return f2
    return f1
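

# Example use (sketch): a decorated function is re-run each time it raises
# the given exception, e.g. to restart the checks from scratch after a
# repair, as Fsck.check() does below:
#
#     @retry_if_raised(RetryFsck, "Something was fixed: restarting fsck")
#     def run_all_checks():
#         ...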


class Progress(object):
    def __init__(self, maxval):
        if maxval == 0:
            maxval = 1
        self.bar = progressbar.ProgressBar(
            maxval=maxval,
            widgets=[progressbar.Percentage(), ' ',
                     progressbar.Bar(), ' ',
                     progressbar.ETA()])
        if self.bar.term_width == 0:
            self.bar.term_width = 75

    def __enter__(self):
        self.bar.start()
        self.last_update = 0
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            self.bar.finish()
        else:
            printf("\n")

    def update(self, val):
        self.bar.update(val)
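

# Example use (sketch), mirroring the check loops below:
#
#     with Progress(len(items)) as pbar:
#         for i, item in enumerate(items):
#             pbar.update(i)
#             ...  # check one item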


class Fsck(object):
    def __init__(self, path, fix=False):
        self.basepath = path
        self.sqlpath = os.path.join(path, "data.sql")
        self.bulkpath = os.path.join(path, "data")
        self.bulklock = os.path.join(path, "data.lock")
        self.fix = fix
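
    # Typical use (sketch; the path is hypothetical):
    #     Fsck("/path/to/db", fix=True).check()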

    ### Main checks
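
    # The phases run in order: check_sql() builds the in-memory stream,
    # interval, and metadata state that the later phases consult, and a
    # RetryFsck raised anywhere restarts the whole sequence after a fix.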
    @retry_if_raised(RetryFsck, "Something was fixed: restarting fsck")
    def check(self, skip_data=False):
        self.bulk = None
        self.sql = None
        try:
            self.check_paths()
            self.check_sql()
            self.check_streams()
            self.check_intervals()
            if skip_data:
                log("skipped data check\n")
            else:
                self.check_data()
        finally:
            if self.bulk:
                self.bulk.close()
            if self.sql:
                self.sql.commit()
                self.sql.close()

        log("ok\n")

    ### Check basic path structure
    def check_paths(self):
        log("checking paths\n")
        if self.bulk:
            self.bulk.close()
        if not os.path.isfile(self.sqlpath):
            raise FsckError("SQL database missing (%s)", self.sqlpath)
        if not os.path.isdir(self.bulkpath):
            raise FsckError("Bulk data directory missing (%s)", self.bulkpath)
        with open(self.bulklock, "w") as lockfile:
            if not nilmdb.utils.lock.exclusive_lock(lockfile):
                raise FsckError('Database already locked by another process\n'
                                'Make sure all other processes that might be '
                                'using the database are stopped.\n'
                                'Restarting apache will cause it to unlock '
                                'the db until a request is received.')
            # unlocked immediately
        self.bulk = nilmdb.server.bulkdata.BulkData(self.basepath)

    ### Check SQL database health
    def check_sql(self):
        log("checking sqlite database\n")
        self.sql = sqlite3.connect(self.sqlpath)
        with self.sql:
            cur = self.sql.cursor()
            ver = cur.execute("PRAGMA user_version").fetchone()[0]
            good = max(nilmdb.server.nilmdb._sql_schema_updates.keys())
            if ver != good:
                raise FsckError("database version %d too old, should be %d",
                                ver, good)
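            # Build in-memory copies of the streams, ranges (intervals),
            # and metadata tables; the later checks consult these maps
            # instead of re-querying SQL.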
            self.stream_path = {}
            self.stream_layout = {}
            log("  loading paths\n")
            result = cur.execute("SELECT id, path, layout FROM streams")
            for r in result:
                if r[0] in self.stream_path:
                    raise FsckError("duplicated ID %d in stream IDs", r[0])
                self.stream_path[r[0]] = r[1]
                self.stream_layout[r[0]] = r[2]
            log("  loading intervals\n")
            self.stream_interval = defaultdict(list)
            result = cur.execute("SELECT stream_id, start_time, end_time, "
                                 "start_pos, end_pos FROM ranges "
                                 "ORDER BY start_time")
            for r in result:
                if r[0] not in self.stream_path:
                    raise FsckError("interval ID %d not in streams", r[0])
                self.stream_interval[r[0]].append((r[1], r[2], r[3], r[4]))
            log("  loading metadata\n")
            self.stream_meta = defaultdict(dict)
            result = cur.execute("SELECT stream_id, key, value FROM metadata")
            for r in result:
                if r[0] not in self.stream_path:
                    raise FsckError("metadata ID %d not in streams", r[0])
                if r[1] in self.stream_meta[r[0]]:
                    raise FsckError(
                        "duplicate metadata key '%s' for stream %d",
                        r[1], r[0])
                self.stream_meta[r[0]][r[1]] = r[2]

    ### Check streams and basic interval overlap
    def check_streams(self):
        ids = list(self.stream_path.keys())
        log("checking %s streams\n", "{:,d}".format(len(ids)))
        with Progress(len(ids)) as pbar:
            for i, sid in enumerate(ids):
                pbar.update(i)
                path = self.stream_path[sid]

                # unique path, valid layout
                if list(self.stream_path.values()).count(path) != 1:
                    raise FsckError("duplicated path %s", path)
                layout = self.stream_layout[sid].split('_')[0]
                if layout not in ('int8', 'int16', 'int32', 'int64',
                                  'uint8', 'uint16', 'uint32', 'uint64',
                                  'float32', 'float64'):
                    raise FsckError("bad layout %s for %s", layout, path)
                count = int(self.stream_layout[sid].split('_')[1])
                if count < 1 or count > 1024:
                    raise FsckError("bad count %d for %s", count, path)

                # must exist in bulkdata
                bulk = self.bulkpath + path
                if not os.path.isdir(bulk):
                    raise FsckError("%s: missing bulkdata dir", path)
                if not nilmdb.server.bulkdata.Table.exists(bulk):
                    raise FsckError("%s: bad bulkdata table", path)

                # intervals don't overlap.  Abuse IntervalSet to check
                # for intervals in file positions, too.
                timeiset = IntervalSet()
                posiset = IntervalSet()
                for (stime, etime, spos, epos) in self.stream_interval[sid]:
                    new = Interval(stime, etime)
                    try:
                        timeiset += new
                    except IntervalError:
                        raise FsckError("%s: overlap in intervals:\n"
                                        "set: %s\nnew: %s",
                                        path, str(timeiset), str(new))
                    if spos != epos:
                        new = Interval(spos, epos)
                        try:
                            posiset += new
                        except IntervalError:
                            raise FsckError("%s: overlap in file offsets:\n"
                                            "set: %s\nnew: %s",
                                            path, str(posiset), str(new))

                # check bulkdata
                self.check_bulkdata(sid, path, bulk)

                # Check that we can open bulkdata
                try:
                    tab = None
                    try:
                        tab = nilmdb.server.bulkdata.Table(bulk)
                    except Exception as e:
                        raise FsckError("%s: can't open bulkdata: %s",
                                        path, str(e))
                finally:
                    if tab:
                        tab.close()

    ### Check that bulkdata is good enough to be opened
    @retry_if_raised(RetryFsck)
    def check_bulkdata(self, sid, path, bulk):
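        # On-disk layout checked here: the stream directory holds a pickled
        # "_format" dict (version, rows_per_file, files_per_dir, layout)
        # plus hex-named subdirectories of hex-named data files, each of
        # which must contain a whole number of fixed-size binary rows.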
        with open(os.path.join(bulk, "_format"), "rb") as f:
            fmt = pickle.load(f)
        if fmt["version"] != 3:
            raise FsckError("%s: bad or unsupported bulkdata version %d",
                            path, fmt["version"])
        rows_per_file = int(fmt["rows_per_file"])
        if rows_per_file < 1:
            raise FsckError(f"{path}: bad rows_per_file {rows_per_file}")
        files_per_dir = int(fmt["files_per_dir"])
        if files_per_dir < 1:
            raise FsckError(f"{path}: bad files_per_dir {files_per_dir}")
        layout = fmt["layout"]
        if layout != self.stream_layout[sid]:
            raise FsckError("%s: layout mismatch %s != %s", path,
                            layout, self.stream_layout[sid])

        # Every file should have a size that's a multiple of the row size
        rkt = nilmdb.server.rocket.Rocket(layout, None)
        row_size = rkt.binary_size
        rkt.close()

        # Find all directories
        regex = re.compile("^[0-9a-f]{4,}$")
        subdirs = sorted(filter(regex.search, os.listdir(bulk)),
                         key=lambda x: int(x, 16), reverse=True)
        for subdir in subdirs:
            # Find all files in that dir
            subpath = os.path.join(bulk, subdir)
            files = list(filter(regex.search, os.listdir(subpath)))
            if not files:
                self.fix_empty_subdir(subpath)
                raise RetryFsck
            # Verify that their size is a multiple of the row size
            for filename in files:
                filepath = os.path.join(subpath, filename)
                offset = os.path.getsize(filepath)
                if offset % row_size:
                    self.fix_bad_filesize(path, filepath, offset, row_size)

    def fix_empty_subdir(self, subpath):
        msg = sprintf("bulkdata path %s is missing data files", subpath)
        if not self.fix:
            raise FixableFsckError(msg)
        # Try to fix it by just deleting whatever is present,
        # as long as it's only ".removed" files.
        err("\n%s\n", msg)
        for fn in os.listdir(subpath):
            if not fn.endswith(".removed"):
                raise FsckError("can't fix automatically: please manually "
                                "remove the file %s and try again",
                                os.path.join(subpath, fn))
        # Remove the whole thing
        err("Removing empty subpath\n")
        shutil.rmtree(subpath)
        raise RetryFsck

    def fix_bad_filesize(self, path, filepath, offset, row_size):
        extra = offset % row_size
        msg = sprintf("%s: size of file %s (%d) is not a multiple" +
                      " of row size (%d): %d extra bytes present",
                      path, filepath, offset, row_size, extra)
        if not self.fix:
            raise FixableFsckError(msg)
        # Try to fix it by just truncating the file
        err("\n%s\n", msg)
        newsize = offset - extra
        err("Truncating file to %d bytes and retrying\n", newsize)
        with open(filepath, "r+b") as f:
            f.truncate(newsize)
        raise RetryFsck

    ### Check interval endpoints
    def check_intervals(self):
        total_ints = sum(len(x) for x in list(self.stream_interval.values()))
        log("checking %s intervals\n", "{:,d}".format(total_ints))
        done = 0
        with Progress(total_ints) as pbar:
            for sid in self.stream_interval:
                tab = None
                try:
                    bulk = self.bulkpath + self.stream_path[sid]
                    tab = nilmdb.server.bulkdata.Table(bulk)

                    def update(x):
                        pbar.update(done + x)
                    ints = self.stream_interval[sid]
                    done += self.check_table_intervals(sid, ints, tab, update)
                finally:
                    if tab:
                        tab.close()

    def check_table_intervals(self, sid, ints, tab, update):
        # look in the table to make sure we can pick out the interval's
        # endpoints
        path = self.stream_path[sid]  # noqa: F841 unused
        tab.file_open.cache_remove_all()
        for (i, intv) in enumerate(ints):
            update(i)
            (stime, etime, spos, epos) = intv
            if spos == epos and spos >= 0 and spos <= tab.nrows:
                continue
            try:
                srow = tab[spos]  # noqa: F841 unused
                erow = tab[epos-1]  # noqa: F841 unused
            except Exception as e:
                self.fix_bad_interval(sid, intv, tab, str(e))
                raise RetryFsck
        return len(ints)

    def fix_bad_interval(self, sid, intv, tab, msg):
        path = self.stream_path[sid]
        msg = sprintf("%s: interval %s error accessing rows: %s",
                      path, str(intv), str(msg))
        if not self.fix:
            raise FixableFsckError(msg)
        err("\n%s\n", msg)

        (stime, etime, spos, epos) = intv
        # If it's just that the end pos is more than the number of rows
        # in the table, lower end pos and truncate interval time too.
        if spos < tab.nrows and epos >= tab.nrows:
            err("end position is past the end of the table, but it can "
                "be truncated\n")
            err("old end: time %d, pos %d\n", etime, epos)
            new_epos = tab.nrows
            new_etime = tab[new_epos-1] + 1
            err("new end: time %d, pos %d\n", new_etime, new_epos)
            if stime < new_etime:
                # Change it in SQL
                with self.sql:
                    cur = self.sql.cursor()
                    cur.execute("UPDATE ranges SET end_time=?, end_pos=? "
                                "WHERE stream_id=? AND start_time=? AND "
                                "end_time=? AND start_pos=? AND end_pos=?",
                                (new_etime, new_epos, sid, stime, etime,
                                 spos, epos))
                    if cur.rowcount != 1:
                        raise FsckError("failed to fix SQL database")
                raise RetryFsck
            err("actually it can't be truncated; times are bad too\n")

        # Otherwise, the only hope is to delete the interval entirely.
        err("*** Deleting the entire interval from SQL.\n")
        err("This may leave stale data on disk. To fix that, copy all\n")
        err("data from this stream to a new stream, then remove all data\n")
        err("from and destroy %s.\n", path)
        with self.sql:
            cur = self.sql.cursor()
            cur.execute("DELETE FROM ranges WHERE "
                        "stream_id=? AND start_time=? AND "
                        "end_time=? AND start_pos=? AND end_pos=?",
                        (sid, stime, etime, spos, epos))
            if cur.rowcount != 1:
                raise FsckError("failed to remove interval")
        raise RetryFsck

    ### Check data in each interval
    def check_data(self):
        total_rows = sum(sum((y[3] - y[2]) for y in x)
                         for x in list(self.stream_interval.values()))
        log("checking %s rows of data\n", "{:,d}".format(total_rows))
        done = 0
        with Progress(total_rows) as pbar:
            for sid in self.stream_interval:
                tab = None
                try:
                    bulk = self.bulkpath + self.stream_path[sid]
                    tab = nilmdb.server.bulkdata.Table(bulk)

                    def update(x):
                        pbar.update(done + x)
                    ints = self.stream_interval[sid]
                    done += self.check_table_data(sid, ints, tab, update)
                finally:
                    if tab:
                        tab.close()

    def check_table_data(self, sid, ints, tab, update):
        # Pull out all of the interval's data and verify that it's
        # monotonic.
        maxrows = 100000
        path = self.stream_path[sid]
        layout = self.stream_layout[sid]
        dtype = nilmdb.client.numpyclient.layout_to_dtype(layout)
        tab.file_open.cache_remove_all()
        done = 0
        for intv in ints:
            last_ts = None
            (stime, etime, spos, epos) = intv

            # Break interval into maxrows-sized chunks
            next_start = spos
            while next_start < epos:
                start = next_start
                stop = min(start + maxrows, epos)
                count = stop - start
                next_start = stop

                # Get raw data, convert to a NumPy array
                try:
                    raw = tab.get_data(start, stop, binary=True)
                    data = numpy.frombuffer(raw, dtype)
                except Exception as e:
                    raise FsckError(
                        "%s: failed to grab rows %d through %d: %s",
                        path, start, stop, repr(e))

                # Verify that timestamps are monotonic
                if (numpy.diff(data['timestamp']) <= 0).any():
                    raise FsckError("%s: non-monotonic timestamp(s) in rows "
                                    "%d through %d", path, start, stop)
                first_ts = data['timestamp'][0]
                if last_ts is not None and first_ts <= last_ts:
                    raise FsckError("%s: first interval timestamp %d is not "
                                    "greater than the previous last interval "
                                    "timestamp %d, at row %d",
                                    path, first_ts, last_ts, start)
                last_ts = data['timestamp'][-1]

                # These are probably fixable, by removing the offending
                # intervals.  But I'm not going to bother implementing
                # that yet.

                # Done
                done += count
                update(done)
        return done
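

# A minimal command-line driver (sketch; hypothetical, since the packaged
# nilmdb-fsck entry point may wire this up differently). It only uses the
# Fsck(path, fix) and check(skip_data) API defined above.
if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(
        description="Check NilmDB database consistency")
    parser.add_argument("database", help="path to the database directory")
    parser.add_argument("--fix", action="store_true",
                        help="attempt to fix fixable problems")
    parser.add_argument("--skip-data", action="store_true",
                        help="skip the slow per-row data check")
    args = parser.parse_args()
    Fsck(args.database, fix=args.fix).check(skip_data=args.skip_data)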