# -*- coding: utf-8 -*-

"""Check database consistency"""

import nilmdb.utils
import nilmdb.server
from nilmdb.utils.interval import IntervalError
from nilmdb.server.interval import Interval, IntervalSet
from nilmdb.utils.printf import *
from nilmdb.utils.time import timestamp_to_string
from collections import defaultdict
import sqlite3
import os
import sys
import progressbar
import re
import time
import shutil
import cPickle as pickle
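
# Typical entry point (a sketch; the actual CLI wrapper may differ):
#   Fsck("/path/to/db", fix=False).check()
# With fix=True, problems reported as FixableFsckError are repaired in place.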

class FsckError(Exception):
    def __init__(self, msg = "", *args):
        if args:
            msg = sprintf(msg, *args)
        Exception.__init__(self, msg)

class FixableFsckError(FsckError):
    def __init__(self, msg = "", *args):
        if args:
            msg = sprintf(msg, *args)
        FsckError.__init__(self, "%s\nThis may be fixable with \"-y\".", msg)

class RetryFsck(FsckError):
    pass

def log(format, *args):
    printf(format, *args)

def err(format, *args):
    fprintf(sys.stderr, format, *args)

# Decorator that retries a function if it raises a specific exception
def retry_if_raised(exc, message = None):
    def f1(func):
        def f2(*args, **kwargs):
            while True:
                try:
                    return func(*args, **kwargs)
                except exc:
                    if message:
                        log("%s\n\n", message)
        return f2
    return f1
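
# For example, check() below is decorated with @retry_if_raised(RetryFsck, ...),
# so whenever a repair step raises RetryFsck, the entire consistency check
# restarts from the top.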

class Progress(object):
    def __init__(self, maxval):
        self.bar = progressbar.ProgressBar(
            maxval = maxval,
            widgets = [ progressbar.Percentage(), ' ',
                        progressbar.Bar(), ' ',
                        progressbar.ETA() ])
        if self.bar.term_width == 0:
            self.bar.term_width = 75

    def __enter__(self):
        self.bar.start()
        self.last_update = 0
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            self.bar.finish()
        else:
            printf("\n")

    def update(self, val):
        self.bar.update(val)
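
# Usage sketch:
#   with Progress(len(items)) as pbar:
#       for i, item in enumerate(items):
#           pbar.update(i)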

class Fsck(object):
    def __init__(self, path, fix = False):
        self.basepath = path
        self.sqlpath = os.path.join(path, "data.sql")
        self.bulkpath = os.path.join(path, "data")
        self.bulklock = os.path.join(path, "data.lock")
        self.fix = fix

    @retry_if_raised(RetryFsck, "Something was fixed: restarting fsck")
    def check(self):
        self.bulk = None
        self.sql = None
        try:
            self.check_paths()
            self.check_sql()
            self.check_streams()
            self.check_intervals()
            self.check_data()
        finally:
            if self.bulk:
                self.bulk.close()
            if self.sql:
                self.sql.commit()
                self.sql.close()
        log("ok\n")

    def check_paths(self):
        log("checking paths\n")
        if self.bulk:
            self.bulk.close()
        if not os.path.isfile(self.sqlpath):
            raise FsckError("SQL database missing (%s)", self.sqlpath)
        if not os.path.isdir(self.bulkpath):
            raise FsckError("Bulk data directory missing (%s)", self.bulkpath)
        with open(self.bulklock, "w") as lockfile:
            if not nilmdb.utils.lock.exclusive_lock(lockfile):
                raise FsckError('database already locked by another process')
            # unlocked immediately
        self.bulk = nilmdb.server.bulkdata.BulkData(self.basepath)

    def check_sql(self):
        log("checking sqlite database\n")
        self.sql = sqlite3.connect(self.sqlpath)
        with self.sql:
            cur = self.sql.cursor()
            ver = cur.execute("PRAGMA user_version").fetchone()[0]
            good = max(nilmdb.server.nilmdb._sql_schema_updates.keys())
            if ver != good:
                raise FsckError("database version %d too old, should be %d",
                                ver, good)
            self.stream_path = {}
            self.stream_layout = {}
            log("  loading paths\n")
            result = cur.execute("SELECT id, path, layout FROM streams")
            for r in result:
                if r[0] in self.stream_path:
                    raise FsckError("duplicated ID %d in stream IDs", r[0])
                self.stream_path[r[0]] = r[1]
                self.stream_layout[r[0]] = r[2]
            log("  loading intervals\n")
            self.stream_interval = defaultdict(list)
            result = cur.execute("SELECT stream_id, start_time, end_time, "
                                 "start_pos, end_pos FROM ranges")
            for r in result:
                if r[0] not in self.stream_path:
                    raise FsckError("interval ID %d not in streams", r[0])
                self.stream_interval[r[0]].append((r[1], r[2], r[3], r[4]))
            log("  loading metadata\n")
            self.stream_meta = defaultdict(dict)
            result = cur.execute("SELECT stream_id, key, value FROM metadata")
            for r in result:
                if r[0] not in self.stream_path:
                    raise FsckError("metadata ID %d not in streams", r[0])
                if r[1] in self.stream_meta[r[0]]:
                    raise FsckError("duplicate metadata key '%s' for stream %d",
                                    r[1], r[0])
                self.stream_meta[r[0]][r[1]] = r[2]
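
    # At this point the relevant schema has been cached in memory:
    #   streams(id, path, layout)              -> stream_path / stream_layout
    #   ranges(stream_id, start/end time+pos)  -> stream_interval
    #   metadata(stream_id, key, value)        -> stream_meta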

    def check_streams(self):
        ids = self.stream_path.keys()
        log("checking %d streams\n", len(ids))
        with Progress(len(ids)) as pbar:
            for i, sid in enumerate(ids):
                pbar.update(i)
                path = self.stream_path[sid]

                # unique path, valid layout
                if self.stream_path.values().count(path) != 1:
                    raise FsckError("duplicated path %s", path)
                layout = self.stream_layout[sid].split('_')[0]
                if layout not in ('int8', 'int16', 'int32', 'int64',
                                  'uint8', 'uint16', 'uint32', 'uint64',
                                  'float32', 'float64'):
                    raise FsckError("bad layout %s for %s", layout, path)
                count = int(self.stream_layout[sid].split('_')[1])
                if count < 1 or count > 1024:
                    raise FsckError("bad count %d for %s", count, path)

                # must exist in bulkdata
                bulk = self.bulkpath + path
                if not os.path.isdir(bulk):
                    raise FsckError("%s: missing bulkdata dir", path)
                if not nilmdb.server.bulkdata.Table.exists(bulk):
                    raise FsckError("%s: bad bulkdata table", path)

                # intervals don't overlap.  Abuse IntervalSet to check
                # for intervals in file positions, too.
                timeiset = IntervalSet()
                posiset = IntervalSet()
                for (stime, etime, spos, epos) in self.stream_interval[sid]:
                    new = Interval(stime, etime)
                    try:
                        timeiset += new
                    except IntervalError:
                        raise FsckError("%s: overlap in intervals:\n"
                                        "set: %s\nnew: %s",
                                        path, str(timeiset), str(new))
                    if spos != epos:
                        new = Interval(spos, epos)
                        try:
                            posiset += new
                        except IntervalError:
                            raise FsckError("%s: overlap in file offsets:\n"
                                            "set: %s\nnew: %s",
                                            path, str(posiset), str(new))

                # check bulkdata
                self.check_bulkdata(sid, path, bulk)

                # Check that we can open bulkdata
                try:
                    tab = None
                    try:
                        tab = nilmdb.server.bulkdata.Table(bulk)
                    except Exception as e:
                        raise FsckError("%s: can't open bulkdata: %s",
                                        path, str(e))
                finally:
                    if tab:
                        tab.close()

    def fix_empty_subdir(self, subpath):
        msg = sprintf("bulkdata path %s is missing data files", subpath)
        if not self.fix:
            raise FixableFsckError(msg)
        # Try to fix it by just deleting whatever is present,
        # as long as it's only ".removed" files.
        err("\n%s\n", msg)
        for fn in os.listdir(subpath):
            if not fn.endswith(".removed"):
                raise FsckError("can't fix automatically: please manually "
                                "remove the file %s and try again",
                                os.path.join(subpath, fn))
        # Remove the whole thing
        err("Removing empty subpath\n")
        shutil.rmtree(subpath)
        raise RetryFsck

    def fix_bad_filesize(self, path, filepath, offset, row_size):
        extra = offset % row_size
        msg = sprintf("%s: size of file %s (%d) is not a multiple" +
                      " of row size (%d): %d extra bytes present",
                      path, filepath, offset, row_size, extra)
        if not self.fix:
            raise FixableFsckError(msg)
        # Try to fix it by just truncating the file
        err("\n%s\n", msg)
        newsize = offset - extra
        err("Truncating file to %d bytes and retrying\n", newsize)
        with open(filepath, "r+b") as f:
            f.truncate(newsize)
        raise RetryFsck
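
    # On-disk layout assumed by check_bulkdata below: each table directory
    # holds a pickled "_format" dict ("version", "rows_per_file",
    # "files_per_dir", "layout") plus hex-named subdirectories containing
    # fixed-size-row data files.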

    @retry_if_raised(RetryFsck)
    def check_bulkdata(self, sid, path, bulk):
        with open(os.path.join(bulk, "_format"), "rb") as f:
            fmt = pickle.load(f)
        if fmt["version"] != 3:
            raise FsckError("%s: bad or unsupported bulkdata version %d",
                            path, fmt["version"])
        row_per_file = int(fmt["rows_per_file"])
        files_per_dir = int(fmt["files_per_dir"])
        layout = fmt["layout"]
        if layout != self.stream_layout[sid]:
            raise FsckError("%s: layout mismatch %s != %s", path,
                            layout, self.stream_layout[sid])

        # Every file should have a size that's a multiple of the row size
        rkt = nilmdb.server.rocket.Rocket(layout, None)
        row_size = rkt.binary_size
        rkt.close()

        # Find all directories
        regex = re.compile("^[0-9a-f]{4,}$")
        subdirs = sorted(filter(regex.search, os.listdir(bulk)),
                         key = lambda x: int(x, 16), reverse = True)
        for subdir in subdirs:
            # Find all files in that dir
            subpath = os.path.join(bulk, subdir)
            files = filter(regex.search, os.listdir(subpath))
            if not files:
                self.fix_empty_subdir(subpath)
                raise RetryFsck
            # Verify that their size is a multiple of the row size
            for filename in files:
                filepath = os.path.join(subpath, filename)
                offset = os.path.getsize(filepath)
                if offset % row_size:
                    self.fix_bad_filesize(path, filepath, offset, row_size)

    def _check_for_each_interval(self, checkfunc):
        total_ints = sum(len(x) for x in self.stream_interval.values())
        checked = 0
        with Progress(total_ints) as pbar:
            for sid in self.stream_interval:
                tab = None
                try:
                    bulk = self.bulkpath + self.stream_path[sid]
                    tab = nilmdb.server.bulkdata.Table(bulk)
                    def update(x):
                        pbar.update(checked + x)
                    ints = self.stream_interval[sid]
                    checkfunc(sid, ints, tab, update)
                    checked += len(ints)
                finally:
                    if tab:
                        tab.close()
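
    # Each checkfunc receives (sid, intervals, table, update); calling
    # update(n) positions the shared progress bar at n intervals into the
    # current stream, on top of the intervals already checked.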

    def check_intervals(self):
        total_ints = sum(len(x) for x in self.stream_interval.values())
        log("checking %d intervals\n", total_ints)
        self._check_for_each_interval(self.check_table_intervals)

    def fix_bad_interval(self, sid, intv, tab, msg):
        path = self.stream_path[sid]
        msg = sprintf("%s: interval %s error accessing rows: %s",
                      path, str(intv), str(msg))
        if not self.fix:
            raise FixableFsckError(msg)
        err("\n%s\n", msg)

        (stime, etime, spos, epos) = intv
        # If it's just that the end pos is more than the number of rows
        # in the table, lower end pos and truncate interval time too.
        if spos < tab.nrows and epos >= tab.nrows:
            err("end position is past endrows, but it can be truncated\n")
            err("old end: time %d, pos %d\n", etime, epos)
            new_epos = tab.nrows
            new_etime = tab[new_epos-1] + 1
            err("new end: time %d, pos %d\n", new_etime, new_epos)
            if stime < new_etime:
                # Change it in SQL
                with self.sql:
                    cur = self.sql.cursor()
                    cur.execute("UPDATE ranges SET end_time=?, end_pos=? "
                                "WHERE stream_id=? AND start_time=? AND "
                                "end_time=? AND start_pos=? AND end_pos=?",
                                (new_etime, new_epos, sid, stime, etime,
                                 spos, epos))
                    if cur.rowcount != 1:
                        raise FsckError("failed to fix SQL database")
                raise RetryFsck
            err("actually it can't be truncated; times are bad too\n")

        # Otherwise, the only hope is to delete the interval entirely.
        err("*** Deleting the entire interval from SQL.\n")
        err("This may leave stale data on disk.  To fix that, copy all\n")
        err("data from this stream to a new stream, then remove all data\n")
        err("and destroy %s.\n", path)
        with self.sql:
            cur = self.sql.cursor()
            cur.execute("DELETE FROM ranges WHERE "
                        "stream_id=? AND start_time=? AND "
                        "end_time=? AND start_pos=? AND end_pos=?",
                        (sid, stime, etime, spos, epos))
            if cur.rowcount != 1:
                raise FsckError("failed to remove interval")
        raise RetryFsck
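
    # Both repair paths above end in RetryFsck, so a successful fix always
    # restarts the whole fsck pass via the retry_if_raised decorator on
    # check().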

    def check_table_intervals(self, sid, ints, tab, update):
        # look in the table to make sure we can pick out the interval's
        # endpoints
        path = self.stream_path[sid]
        tab.file_open.cache_remove_all()
        for (i, intv) in enumerate(ints):
            (stime, etime, spos, epos) = intv
            update(i)
            if spos == epos and spos >= 0 and spos <= tab.nrows:
                continue
            try:
                srow = tab[spos]
                erow = tab[epos-1]
            except Exception as e:
                self.fix_bad_interval(sid, intv, tab, str(e))
                raise RetryFsck

    def check_data(self):
        total_rows = sum(sum((y[3] - y[2]) for y in x)
                         for x in self.stream_interval.values())
        log("checking %d rows of data\n", total_rows)
        self._check_for_each_interval(self.check_table_data)

    def check_table_data(self, sid, ints, tab, update):
        # look in the table to make sure we can pick out all of
        # the interval's data, and that the data is monotonic
        path = self.stream_path[sid]
        tab.file_open.cache_remove_all()
        for (i, intv) in enumerate(ints):
            (stime, etime, spos, epos) = intv
            update(i)
            last_ts = None
            for row in xrange(spos, epos):
                ts = tab[row]
                if ts <= last_ts:
                    raise FsckError("%s: interval %s has non-monotonic "
                                    "timestamps: %d and then %d\n",
                                    path, intv, last_ts, ts)
                if ts < stime or ts >= etime:
                    raise FsckError("%s: interval %s has out-of-bound "
                                    "timestamp %d\n", path, intv, ts)
                last_ts = ts