You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

308 lines
12 KiB

  1. # -*- coding: utf-8 -*-
  2. """Check database consistency"""
  3. import nilmdb.utils
  4. import nilmdb.server
  5. from nilmdb.utils.interval import IntervalError
  6. from nilmdb.server.interval import Interval, IntervalSet
  7. from nilmdb.utils.printf import *
  8. from nilmdb.utils.time import timestamp_to_string
  9. from collections import defaultdict
  10. import sqlite3
  11. import os
  12. import sys
  13. import progressbar
  14. import re
  15. import time
  16. import shutil
  17. import cPickle as pickle
  18. class FsckError(Exception):
  19. def __init__(self, msg = "", *args):
  20. if args:
  21. msg = sprintf(msg, *args)
  22. Exception.__init__(self, msg)
  23. class FixableFsckError(FsckError):
  24. def __init__(self, msg = "", *args):
  25. if args:
  26. msg = sprintf(msg, *args)
  27. FsckError.__init__(self, "%s\nThis may be fixable with \"-y\".", msg)
class RetryFsck(FsckError):
    """Raised by fix routines to signal that something was repaired and
    the enclosing check should be restarted from the top (see the
    retry_if_raised decorator)."""
    pass
def log(format, *args):
    """Print a printf-style message to stdout."""
    printf(format, *args)
def err(format, *args):
    """Print a printf-style message to stderr."""
    fprintf(sys.stderr, format, *args)
  34. # Decorator that retries a function if it returns a specific value
  35. def retry_if_raised(exc, message = None):
  36. def f1(func):
  37. def f2(*args, **kwargs):
  38. while True:
  39. try:
  40. return func(*args, **kwargs)
  41. except exc as e:
  42. if message:
  43. log("%s\n\n", message)
  44. return f2
  45. return f1
  46. class Progress(object):
  47. def __init__(self, maxval):
  48. self.bar = progressbar.ProgressBar(maxval = maxval)
  49. if self.bar.term_width == 0:
  50. self.bar.term_width = 75
  51. def __enter__(self):
  52. self.bar.start()
  53. self.last_update = 0
  54. return self
  55. def __exit__(self, exc_type, exc_value, traceback):
  56. if exc_type is None:
  57. self.bar.finish()
  58. else:
  59. printf("\n")
  60. def update(self, val):
  61. self.bar.update(val)
  62. class Fsck(object):
  63. def __init__(self, path, fix = False):
  64. self.basepath = path
  65. self.sqlpath = os.path.join(path, "data.sql")
  66. self.bulkpath = os.path.join(path, "data")
  67. self.bulklock = os.path.join(path, "data.lock")
  68. self.fix = fix
    @retry_if_raised(RetryFsck, "Something was fixed: restarting fsck")
    def check(self):
        """Run all consistency checks in order, restarting from the very
        beginning whenever a fix routine raises RetryFsck."""
        self.check_paths()
        self.check_sql()
        self.check_streams()
        self.check_intervals()
        log("ok\n")
    def check_paths(self):
        """Verify the basic on-disk layout and open the bulk data store."""
        log("checking paths\n")
        if not os.path.isfile(self.sqlpath):
            raise FsckError("SQL database missing (%s)", self.sqlpath)
        if not os.path.isdir(self.bulkpath):
            raise FsckError("Bulk data directory missing (%s)", self.bulkpath)
        # NOTE(review): the lock file handle is closed as soon as this
        # `with` block exits, which normally releases the lock — confirm
        # whether the exclusive lock is meant to be held for the entire
        # fsck run.
        with open(self.bulklock, "w") as lockfile:
            if not nilmdb.utils.lock.exclusive_lock(lockfile):
                raise FsckError('database already locked by another process')
        self.bulk = nilmdb.server.bulkdata.BulkData(self.basepath)
        # override must_close warning
        if "_must_close" in dir(self.bulk):
            del self.bulk._must_close
  89. def check_sql(self):
  90. log("checking sqlite database\n")
  91. self.sql = sqlite3.connect(self.sqlpath)
  92. with self.sql as con:
  93. ver = con.execute("PRAGMA user_version").fetchone()[0]
  94. good = max(nilmdb.server.nilmdb._sql_schema_updates.keys())
  95. if ver != good:
  96. raise FsckError("database version %d too old, should be %d",
  97. ver, good)
  98. self.stream_path = {}
  99. self.stream_layout = {}
  100. log(" loading paths\n")
  101. result = con.execute("SELECT id, path, layout FROM streams")
  102. for r in result:
  103. if r[0] in self.stream_path:
  104. raise FsckError("duplicated ID %d in stream IDs", r[0])
  105. self.stream_path[r[0]] = r[1]
  106. self.stream_layout[r[0]] = r[2]
  107. log(" loading intervals\n")
  108. self.stream_interval = defaultdict(list)
  109. result = con.execute("SELECT stream_id, start_time, end_time, "
  110. "start_pos, end_pos FROM ranges")
  111. for r in result:
  112. if r[0] not in self.stream_path:
  113. raise FsckError("interval ID %d not in streams", k)
  114. self.stream_interval[r[0]].append((r[1], r[2], r[3], r[4]))
  115. log(" loading metadata\n")
  116. self.stream_meta = defaultdict(dict)
  117. result = con.execute("SELECT stream_id, key, value FROM metadata")
  118. for r in result:
  119. if r[0] not in self.stream_path:
  120. raise FsckError("metadata ID %d not in streams", k)
  121. if r[1] in self.stream_meta[r[0]]:
  122. raise FsckError("duplicate metadata key '%s' for stream %d",
  123. r[1], r[0])
  124. self.stream_meta[r[0]][r[1]] = r[2]
    def check_streams(self):
        """Cross-check every stream's SQL record against its bulkdata:
        unique path, valid layout, existing table, non-overlapping
        intervals (in both time and file position), openable table."""
        # NOTE(review): Python 2 semantics — keys()/values() return
        # lists here; len() and .count() below rely on that.
        ids = self.stream_path.keys()
        log("checking %d streams\n", len(ids))
        with Progress(len(ids)) as pbar:
            for i, sid in enumerate(ids):
                pbar.update(i)
                path = self.stream_path[sid]
                # unique path, valid layout
                if self.stream_path.values().count(path) != 1:
                    raise FsckError("duplicated path %s", path)
                layout = self.stream_layout[sid].split('_')[0]
                if layout not in ('int8', 'int16', 'int32', 'int64',
                                  'uint8', 'uint16', 'uint32', 'uint64',
                                  'float32', 'float64'):
                    raise FsckError("bad layout %s for %s", layout, path)
                # layout strings look like "<type>_<count>"
                count = int(self.stream_layout[sid].split('_')[1])
                if count < 1 or count > 1024:
                    raise FsckError("bad count %d for %s", count, path)
                # must exist in bulkdata
                bulk = self.bulkpath + path
                if not os.path.isdir(bulk):
                    raise FsckError("%s: missing bulkdata dir", path)
                if not nilmdb.server.bulkdata.Table.exists(bulk):
                    raise FsckError("%s: bad bulkdata table", path)
                # intervals don't overlap. Abuse IntervalSet to check
                # for intervals in file positions, too.
                timeiset = IntervalSet()
                posiset = IntervalSet()
                for (stime, etime, spos, epos) in self.stream_interval[sid]:
                    new = Interval(stime, etime)
                    try:
                        timeiset += new
                    except IntervalError:
                        raise FsckError("%s: overlap in intervals:\n"
                                        "set: %s\nnew: %s",
                                        path, str(timeiset), str(new))
                    # zero-length position ranges carry no file data
                    if spos != epos:
                        new = Interval(spos, epos)
                        try:
                            posiset += new
                        except IntervalError:
                            raise FsckError("%s: overlap in file offsets:\n"
                                            "set: %s\nnew: %s",
                                            path, str(posiset), str(new))
                # check bulkdata
                self.check_bulkdata(sid, path, bulk)
                # Check that we can open bulkdata
                try:
                    tab = None
                    try:
                        tab = nilmdb.server.bulkdata.Table(bulk)
                    except Exception as e:
                        raise FsckError("%s: can't open bulkdata: %s",
                                        path, str(e))
                finally:
                    if tab:
                        tab.close()
  182. def fix_empty_subdir(self, subpath):
  183. msg = sprintf("bulkdata path %s is missing data files", subpath)
  184. if not self.fix:
  185. raise FixableFsckError(msg)
  186. # Try to fix it by just deleting whatever is present,
  187. # as long as it's only ".removed" files.
  188. err("\n%s\n", msg)
  189. for fn in os.listdir(subpath):
  190. if not fn.endswith(".removed"):
  191. raise FsckError("can't fix automatically: please manually "
  192. "remove the file %s and try again",
  193. os.path.join(subpath, fn))
  194. # Remove the whole thing
  195. err("Removing empty subpath\n")
  196. shutil.rmtree(subpath)
  197. raise RetryFsck
  198. def fix_bad_filesize(self, path, filepath, offset, row_size):
  199. extra = offset % row_size
  200. msg = sprintf("%s: size of file %s (%d) is not a multiple" +
  201. " of row size (%d): %d extra bytes present",
  202. path, filepath, offset, row_size, extra)
  203. if not self.fix:
  204. raise FixableFsckError(msg)
  205. # Try to fix it by just truncating the file
  206. err("\n%s\n", msg)
  207. newsize = offset - extra
  208. err("Truncating file to %d bytes and retrying\n", newsize)
  209. with open(filepath, "r+b") as f:
  210. f.truncate(newsize)
  211. raise RetryFsck
    @retry_if_raised(RetryFsck)
    def check_bulkdata(self, sid, path, bulk):
        """Validate one stream's on-disk bulkdata: format version,
        layout match, and file sizes that are whole multiples of the
        row size.  Restarts from the top when a fix raises RetryFsck."""
        with open(os.path.join(bulk, "_format"), "rb") as f:
            fmt = pickle.load(f)
        if fmt["version"] != 3:
            raise FsckError("%s: bad or unsupported bulkdata version %d",
                            path, fmt["version"])
        # Parsed to validate they are integers; not otherwise used here.
        row_per_file = int(fmt["rows_per_file"])
        files_per_dir = int(fmt["files_per_dir"])
        layout = fmt["layout"]
        if layout != self.stream_layout[sid]:
            raise FsckError("%s: layout mismatch %s != %s", path,
                            layout, self.stream_layout[sid])
        # Every file should have a size that's the multiple of the row size
        rkt = nilmdb.server.rocket.Rocket(layout, None)
        row_size = rkt.binary_size
        rkt.close()
        # Find all directories (hex-named, at least 4 digits)
        regex = re.compile("^[0-9a-f]{4,}$")
        subdirs = sorted(filter(regex.search, os.listdir(bulk)),
                         key = lambda x: int(x, 16), reverse = True)
        for subdir in subdirs:
            # Find all files in that dir
            subpath = os.path.join(bulk, subdir)
            # NOTE(review): Python 2 — filter() returns a list, so the
            # emptiness test below works; under Python 3 a filter object
            # is always truthy.
            files = filter(regex.search, os.listdir(subpath))
            if not files:
                self.fix_empty_subdir(subpath)
                # fix_empty_subdir always raises (FsckError or
                # RetryFsck); this raise is unreachable defensive code.
                raise RetryFsck
            # Verify that their size is a multiple of the row size
            for filename in files:
                filepath = os.path.join(subpath, filename)
                offset = os.path.getsize(filepath)
                if offset % row_size:
                    self.fix_bad_filesize(path, filepath, offset, row_size)
  246. def check_intervals(self):
  247. total_ints = sum(len(x) for x in self.stream_interval.values())
  248. log("checking %d intervals\n", total_ints)
  249. checked = 0
  250. with Progress(total_ints) as pbar:
  251. for sid in self.stream_interval:
  252. try:
  253. bulk = self.bulkpath + self.stream_path[sid]
  254. tab = nilmdb.server.bulkdata.Table(bulk)
  255. def update(x):
  256. pbar.update(checked + x)
  257. ints = self.stream_interval[sid]
  258. path = self.stream_path[sid]
  259. self.check_table_intervals(path, ints, tab, update)
  260. checked += len(ints)
  261. finally:
  262. tab.close()
  263. def check_table_intervals(self, path, ints, tab, update):
  264. # look in the table to make sure we can pick out the interval's
  265. # endpoints
  266. tab.file_open.cache_remove_all()
  267. for (i, intv) in enumerate(ints):
  268. (stime, etime, spos, epos) = intv
  269. update(i)
  270. if spos == epos:
  271. continue
  272. try:
  273. srow = tab[spos]
  274. erow = tab[epos-1]
  275. except Exception as e:
  276. msg = sprintf("%s: interval %s error accessing rows: %s",
  277. path, str(intv), str(e))
  278. raise FsckError(msg)