|
|
|
|
|
|
# -*- coding: utf-8 -*- |
|
|
|
|
|
|
|
|
|
|
|
"""Check database consistency""" |
|
|
|
|
|
|
|
|
|
|
|
import os
import sqlite3
import sys
import time
from collections import defaultdict

import cPickle as pickle

import progressbar

import nilmdb.utils
import nilmdb.server
from nilmdb.server.interval import Interval, IntervalSet
from nilmdb.utils.interval import IntervalError
from nilmdb.utils.printf import *
from nilmdb.utils.time import timestamp_to_string
|
|
|
|
|
|
|
|
|
|
|
class FsckError(Exception):
    """Consistency-check failure.

    The message is built printf-style from `format` and `args`.
    """
    def __init__(self, format, *args):
        message = sprintf(format, *args)
        super(FsckError, self).__init__(message)
|
|
|
|
|
|
|
|
|
|
|
def log(format, *args):
    """Print a printf-formatted progress message to stdout."""
    printf(format, *args)
|
|
|
|
|
|
|
|
|
|
|
def err(format, *args):
    """Print a printf-formatted error message to stderr.

    BUG FIX: this function referenced `sys` without it being imported,
    so every call raised NameError; `import sys` is now at the top of
    the file.
    """
    fprintf(sys.stderr, format, *args)
|
|
|
|
|
|
|
|
|
|
|
class Progress(object):
    """Context manager around a progressbar.ProgressBar.

    On entry the bar is started; a clean exit finishes the bar, while
    an exception just emits a newline so the traceback starts on a
    fresh line.
    """
    def __init__(self, maxval):
        self.bar = progressbar.ProgressBar(maxval=maxval)
        # Some terminals report a zero width; use a sane default.
        if self.bar.term_width == 0:
            self.bar.term_width = 75

    def __enter__(self):
        self.bar.start()
        self.last_update = 0
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is not None:
            # Leave the partial bar visible; move to a new line.
            printf("\n")
        else:
            self.bar.finish()

    def update(self, val):
        """Advance the bar to absolute position `val`."""
        self.bar.update(val)
|
|
|
|
|
|
|
|
|
|
|
class Fsck(object): |
|
|
|
|
|
|
|
|
|
|
|
def __init__(self, path): |
|
|
|
|
|
self.basepath = path |
|
|
|
|
|
self.sqlpath = os.path.join(path, "data.sql") |
|
|
|
|
|
self.bulkpath = os.path.join(path, "data") |
|
|
|
|
|
self.bulklock = os.path.join(path, "data.lock") |
|
|
|
|
|
|
|
|
|
|
|
def check(self): |
|
|
|
|
|
self.check_paths() |
|
|
|
|
|
self.check_sql() |
|
|
|
|
|
self.check_streams() |
|
|
|
|
|
log("ok\n") |
|
|
|
|
|
|
|
|
|
|
|
    def check_paths(self):
        """Verify the on-disk layout: SQL file, bulk data directory,
        and exclusive access to the database.

        Raises FsckError if anything is missing or another process
        holds the lock.
        """
        log("checking paths\n")
        if not os.path.isfile(self.sqlpath):
            raise FsckError("SQL database missing")
        if not os.path.isdir(self.bulkpath):
            raise FsckError("Bulk data directory missing")
        # NOTE(review): the lock file is closed as soon as this 'with'
        # block exits; if exclusive_lock is flock-based, the lock is
        # released immediately afterward -- confirm this is intended.
        with open(self.bulklock, "w") as lockfile:
            if not nilmdb.utils.lock.exclusive_lock(lockfile):
                raise FsckError('database already locked by another process')
        self.bulk = nilmdb.server.bulkdata.BulkData(self.basepath)
        # override must_close warning
        if "_must_close" in dir(self.bulk):
            del self.bulk._must_close
|
|
|
|
|
|
|
|
|
|
|
def check_sql(self): |
|
|
|
|
|
log("checking sqlite database\n") |
|
|
|
|
|
|
|
|
|
|
|
self.sql = sqlite3.connect(self.sqlpath) |
|
|
|
|
|
with self.sql as con: |
|
|
|
|
|
ver = con.execute("PRAGMA user_version").fetchone()[0] |
|
|
|
|
|
good = max(nilmdb.server.nilmdb._sql_schema_updates.keys()) |
|
|
|
|
|
if ver != good: |
|
|
|
|
|
raise FsckError("database version %d too old, should be %d", |
|
|
|
|
|
ver, good) |
|
|
|
|
|
self.stream_path = {} |
|
|
|
|
|
self.stream_layout = {} |
|
|
|
|
|
log(" loading paths\n") |
|
|
|
|
|
result = con.execute("SELECT id, path, layout FROM streams") |
|
|
|
|
|
for r in result: |
|
|
|
|
|
if r[0] in self.stream_path: |
|
|
|
|
|
raise FsckError("duplicated ID %d in stream IDs", r[0]) |
|
|
|
|
|
self.stream_path[r[0]] = r[1] |
|
|
|
|
|
self.stream_layout[r[0]] = r[2] |
|
|
|
|
|
|
|
|
|
|
|
log(" loading intervals\n") |
|
|
|
|
|
self.stream_interval = defaultdict(list) |
|
|
|
|
|
result = con.execute("SELECT stream_id, start_time, end_time, " |
|
|
|
|
|
"start_pos, end_pos FROM ranges") |
|
|
|
|
|
for r in result: |
|
|
|
|
|
if r[0] not in self.stream_path: |
|
|
|
|
|
raise FsckError("interval ID %d not in streams", k) |
|
|
|
|
|
self.stream_interval[r[0]].append((r[1], r[2], r[3], r[4])) |
|
|
|
|
|
|
|
|
|
|
|
log(" loading metadata\n") |
|
|
|
|
|
self.stream_meta = defaultdict(dict) |
|
|
|
|
|
result = con.execute("SELECT stream_id, key, value FROM metadata") |
|
|
|
|
|
for r in result: |
|
|
|
|
|
if r[0] not in self.stream_path: |
|
|
|
|
|
raise FsckError("metadata ID %d not in streams", k) |
|
|
|
|
|
if r[1] in self.stream_meta[r[0]]: |
|
|
|
|
|
raise FsckError("duplicate metadata key '%s' for stream %d", |
|
|
|
|
|
r[1], r[0]) |
|
|
|
|
|
self.stream_meta[r[0]][r[1]] = r[2] |
|
|
|
|
|
|
|
|
|
|
|
def check_streams(self): |
|
|
|
|
|
log("checking streams\n") |
|
|
|
|
|
ids = self.stream_path.keys() |
|
|
|
|
|
with Progress(len(ids)) as pbar: |
|
|
|
|
|
for i, sid in enumerate(ids): |
|
|
|
|
|
pbar.update(i) |
|
|
|
|
|
path = self.stream_path[sid] |
|
|
|
|
|
|
|
|
|
|
|
# unique path, valid layout |
|
|
|
|
|
if self.stream_path.values().count(path) != 1: |
|
|
|
|
|
raise FsckError("duplicated path %s", path) |
|
|
|
|
|
layout = self.stream_layout[sid].split('_')[0] |
|
|
|
|
|
if layout not in ('int8', 'int16', 'int32', 'int64', |
|
|
|
|
|
'uint8', 'uint16', 'uint32', 'uint64', |
|
|
|
|
|
'float32', 'float64'): |
|
|
|
|
|
raise FsckError("bad layout %s for %s", layout, path) |
|
|
|
|
|
count = int(self.stream_layout[sid].split('_')[1]) |
|
|
|
|
|
if count < 1 or count > 1024: |
|
|
|
|
|
raise FsckError("bad count %d for %s", count, path) |
|
|
|
|
|
|
|
|
|
|
|
# must exist in bulkdata |
|
|
|
|
|
bulk = self.bulkpath + path |
|
|
|
|
|
if not os.path.isdir(bulk): |
|
|
|
|
|
raise FsckError("%s: missing bulkdata dir", path) |
|
|
|
|
|
if not nilmdb.server.bulkdata.Table.exists(bulk): |
|
|
|
|
|
raise FsckError("%s: bad bulkdata table", path) |
|
|
|
|
|
|
|
|
|
|
|
# intervals don't overlap. Abuse IntervalSet to check |
|
|
|
|
|
# for intervals in file positions, too. |
|
|
|
|
|
timeiset = IntervalSet() |
|
|
|
|
|
posiset = IntervalSet() |
|
|
|
|
|
for (stime, etime, spos, epos) in self.stream_interval[sid]: |
|
|
|
|
|
new = Interval(stime, etime) |
|
|
|
|
|
try: |
|
|
|
|
|
timeiset += new |
|
|
|
|
|
except IntervalError: |
|
|
|
|
|
raise FsckError("%s: overlap in intervals:\n" |
|
|
|
|
|
"set: %s\nnew: %s\n", |
|
|
|
|
|
path, str(timeiset), str(new)) |
|
|
|
|
|
if spos != epos: |
|
|
|
|
|
new = Interval(spos, epos) |
|
|
|
|
|
try: |
|
|
|
|
|
posiset += new |
|
|
|
|
|
except IntervalError: |
|
|
|
|
|
raise FsckError("%s: overlap in file offsets:\n" |
|
|
|
|
|
"set: %s\nnew: %s\n", |
|
|
|
|
|
path, str(posiset), str(new)) |
|
|
|
|
|
|
|
|
|
|
|
# check bulkdata |
|
|
|
|
|
self.check_bulkdata(sid, path, bulk) |
|
|
|
|
|
|
|
|
|
|
|
continue |
|
|
|
|
|
# verify we can can open it with bulkdata |
|
|
|
|
|
try: |
|
|
|
|
|
tab = None |
|
|
|
|
|
try: |
|
|
|
|
|
tab = nilmdb.server.bulkdata.Table(bulk) |
|
|
|
|
|
except Exception as e: |
|
|
|
|
|
raise FsckError("%s: can't open bulkdata: %s", |
|
|
|
|
|
path, str(e)) |
|
|
|
|
|
self.check_bulkdata(path, tab) |
|
|
|
|
|
finally: |
|
|
|
|
|
if tab: |
|
|
|
|
|
tab.close() |
|
|
|
|
|
|
|
|
|
|
|
def check_bulkdata(self, sid, path, bulk): |
|
|
|
|
|
with open(os.path.join(bulk, "_format"), "rb") as f: |
|
|
|
|
|
fmt = pickle.load(f) |
|
|
|
|
|
if fmt["version"] != 3: |
|
|
|
|
|
raise FsckError("%s: bad or unsupported bulkdata version %d", |
|
|
|
|
|
path, fmt["version"]) |
|
|
|
|
|
row_per_file = int(fmt["rows_per_file"]) |
|
|
|
|
|
files_per_dir = int(fmt["files_per_dir"]) |
|
|
|
|
|
layout = fmt["layout"] |
|
|
|
|
|
if layout != self.stream_layout[sid]: |
|
|
|
|
|
raise FsckError("%s: layout mismatch %s != %s", path, |
|
|
|
|
|
layout, self.stream_layout[sid]) |