diff --git a/nilmdb/fsck/fsck.py b/nilmdb/fsck/fsck.py
index 56df83e..209b555 100644
--- a/nilmdb/fsck/fsck.py
+++ b/nilmdb/fsck/fsck.py
@@ -53,7 +53,11 @@ def retry_if_raised(exc, message = None):
 
 class Progress(object):
     def __init__(self, maxval):
-        self.bar = progressbar.ProgressBar(maxval = maxval)
+        self.bar = progressbar.ProgressBar(
+            maxval = maxval,
+            widgets = [ progressbar.Percentage(), ' ',
+                        progressbar.Bar(), ' ',
+                        progressbar.ETA() ])
         if self.bar.term_width == 0:
             self.bar.term_width = 75
     def __enter__(self):
@@ -79,14 +83,26 @@ class Fsck(object):
 
     @retry_if_raised(RetryFsck, "Something was fixed: restarting fsck")
     def check(self):
-        self.check_paths()
-        self.check_sql()
-        self.check_streams()
-        self.check_intervals()
+        self.bulk = None
+        self.sql = None
+        try:
+            self.check_paths()
+            self.check_sql()
+            self.check_streams()
+            self.check_intervals()
+            self.check_data()
+        finally:
+            if self.bulk:
+                self.bulk.close()
+            if self.sql:
+                self.sql.commit()
+                self.sql.close()
         log("ok\n")
 
     def check_paths(self):
         log("checking paths\n")
+        if self.bulk:
+            self.bulk.close()
         if not os.path.isfile(self.sqlpath):
             raise FsckError("SQL database missing (%s)", self.sqlpath)
         if not os.path.isdir(self.bulkpath):
@@ -94,17 +110,16 @@ class Fsck(object):
 
         with open(self.bulklock, "w") as lockfile:
             if not nilmdb.utils.lock.exclusive_lock(lockfile):
                 raise FsckError('database already locked by another process')
+            # unlocked immediately
         self.bulk = nilmdb.server.bulkdata.BulkData(self.basepath)
-        # override must_close warning
-        if "_must_close" in dir(self.bulk):
-            del self.bulk._must_close
 
     def check_sql(self):
         log("checking sqlite database\n")
         self.sql = sqlite3.connect(self.sqlpath)
-        with self.sql as con:
-            ver = con.execute("PRAGMA user_version").fetchone()[0]
+        with self.sql:
+            cur = self.sql.cursor()
+            ver = cur.execute("PRAGMA user_version").fetchone()[0]
             good = max(nilmdb.server.nilmdb._sql_schema_updates.keys())
             if ver != good:
                 raise FsckError("database version %d too old, should be %d",
@@ -112,7 +127,7 @@ class Fsck(object):
         self.stream_path = {}
         self.stream_layout = {}
         log("  loading paths\n")
-        result = con.execute("SELECT id, path, layout FROM streams")
+        result = cur.execute("SELECT id, path, layout FROM streams")
         for r in result:
             if r[0] in self.stream_path:
                 raise FsckError("duplicated ID %d in stream IDs", r[0])
@@ -121,7 +136,7 @@ class Fsck(object):
 
         log("  loading intervals\n")
         self.stream_interval = defaultdict(list)
-        result = con.execute("SELECT stream_id, start_time, end_time, "
+        result = cur.execute("SELECT stream_id, start_time, end_time, "
                              "start_pos, end_pos FROM ranges")
         for r in result:
             if r[0] not in self.stream_path:
@@ -130,7 +145,7 @@ class Fsck(object):
 
         log("  loading metadata\n")
         self.stream_meta = defaultdict(dict)
-        result = con.execute("SELECT stream_id, key, value FROM metadata")
+        result = cur.execute("SELECT stream_id, key, value FROM metadata")
         for r in result:
             if r[0] not in self.stream_path:
                 raise FsckError("metadata ID %d not in streams", k)
@@ -271,9 +286,8 @@ class Fsck(object):
             if offset % row_size:
                 self.fix_bad_filesize(path, filepath, offset, row_size)
 
-    def check_intervals(self):
+    def _check_for_each_interval(self, checkfunc):
         total_ints = sum(len(x) for x in self.stream_interval.values())
-        log("checking %d intervals\n", total_ints)
         checked = 0
         with Progress(total_ints) as pbar:
             for sid in self.stream_interval:
@@ -283,25 +297,100 @@
                     def update(x):
                         pbar.update(checked + x)
                     ints = self.stream_interval[sid]
-                    path = self.stream_path[sid]
-                    self.check_table_intervals(path, ints, tab, update)
+                    checkfunc(sid, ints, tab, update)
                     checked += len(ints)
                 finally:
                     tab.close()
 
-    def check_table_intervals(self, path, ints, tab, update):
+    def check_intervals(self):
+        total_ints = sum(len(x) for x in self.stream_interval.values())
+        log("checking %d intervals\n", total_ints)
+        self._check_for_each_interval(self.check_table_intervals)
+
+    def fix_bad_interval(self, sid, intv, tab, msg):
+        path = self.stream_path[sid]
+        msg = sprintf("%s: interval %s error accessing rows: %s",
+                      path, str(intv), str(msg))
+        if not self.fix:
+            raise FixableFsckError(msg)
+        err("\n%s\n", msg)
+
+        (stime, etime, spos, epos) = intv
+        # If it's just that the end pos is more than the number of rows
+        # in the table, lower end pos and truncate interval time too.
+        if spos < tab.nrows and epos >= tab.nrows:
+            err("end position is past endrows, but it can be truncated\n")
+            err("old end: time %d, pos %d\n", etime, epos)
+            new_epos = tab.nrows
+            new_etime = tab[new_epos-1] + 1
+            err("new end: time %d, pos %d\n", new_etime, new_epos)
+            if stime < new_etime:
+                # Change it in SQL
+                with self.sql:
+                    cur = self.sql.cursor()
+                    cur.execute("UPDATE ranges SET end_time=?, end_pos=? "
+                                "WHERE stream_id=? AND start_time=? AND "
+                                "end_time=? AND start_pos=? AND end_pos=?",
+                                (new_etime, new_epos, sid, stime, etime,
+                                 spos, epos))
+                    if cur.rowcount != 1:
+                        raise FsckError("failed to fix SQL database")
+                raise RetryFsck
+            err("actually it can't be truncated; times are bad too")
+
+        # Otherwise, the only hope is to delete the interval entirely.
+        err("*** Deleting the entire interval from SQL.\n")
+        err("This may leave stale data on disk. To fix that, copy all\n")
+        err("data from this stream to a new stream, then remove all data\n")
+        err("and destroy %s.\n", path)
+        with self.sql:
+            cur = self.sql.cursor()
+            cur.execute("DELETE FROM ranges WHERE "
+                        "stream_id=? AND start_time=? AND "
+                        "end_time=? AND start_pos=? AND end_pos=?",
AND end_pos=?", + (sid, stime, etime, spos, epos)) + if cur.rowcount != 1: + raise FsckError("failed to remove interval") + raise RetryFsck + + def check_table_intervals(self, sid, ints, tab, update): # look in the table to make sure we can pick out the interval's # endpoints + path = self.stream_path[sid] tab.file_open.cache_remove_all() for (i, intv) in enumerate(ints): (stime, etime, spos, epos) = intv update(i) - if spos == epos: + if spos == epos and spos >= 0 and spos <= tab.nrows: continue try: srow = tab[spos] erow = tab[epos-1] except Exception as e: - msg = sprintf("%s: interval %s error accessing rows: %s", - path, str(intv), str(e)) - raise FsckError(msg) + self.fix_bad_interval(sid, intv, tab, str(e)) + raise RetryFsck + + def check_data(self): + total_rows = sum(sum((y[3] - y[2]) for y in x) + for x in self.stream_interval.values()) + log("checking %d rows of data\n", total_rows) + self._check_for_each_interval(self.check_table_data) + + def check_table_data(self, sid, ints, tab, update): + # look in the table to make sure we can pick out all of + # the interval's data, and that the data is monotonic + path = self.stream_path[sid] + tab.file_open.cache_remove_all() + for (i, intv) in enumerate(ints): + (stime, etime, spos, epos) = intv + update(i) + last_ts = None + for row in xrange(spos, epos): + ts = tab[row] + if ts <= last_ts: + raise FsckError("%s: interval %s has non-monotonic " + "timestamps: %d and then %d\n", + path, intv, last_ts, ts) + if ts < stime or ts >= etime: + raise FsckError("%s: interval %s has out-of-bound " + "timestamp %d\n", ps, intv, ts)