Compare commits: nilmdb-1.9 ... nilmdb-1.9 (16 commits)
Commits (SHA1):
7d7b89b52f
8d249273c6
abe431c663
ccf1f695af
06f7390c9e
6de77a08f1
8db9771c20
04f815a24b
6868f5f126
ca0943ec19
68addb4e4a
68c33b1f14
8dd8741100
8e6341ae5d
422b1e2df2
0f745b3047
Makefile (4 lines changed):
@@ -23,10 +23,6 @@ docs:
 lint:
 	pylint --rcfile=.pylintrc nilmdb
 
-fscktest:
-#	python -c "import nilmdb.fsck; nilmdb.fsck.Fsck('/home/jim/wsgi/db').check()"
-	python -c "import nilmdb.fsck; nilmdb.fsck.Fsck('/home/jim/mnt/bucket/mnt/sharon/data/db').check()"
-
 test:
 ifeq ($(INSIDE_EMACS), t)
 	# Use the slightly more flexible script
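The dropped fscktest target simply ran the checker in-process against hardcoded paths. The same one-off check can still be done by hand; a minimal sketch, with a placeholder database path:

    # Equivalent of the removed Makefile target, minus the hardcoded paths.
    import nilmdb.fsck
    nilmdb.fsck.Fsck("/path/to/db").check()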
README:

@@ -8,7 +8,8 @@ Prerequisites:
 
 	# Base NilmDB dependencies
 	sudo apt-get install python-cherrypy3 python-decorator python-simplejson
-	sudo apt-get install python-requests python-dateutil python-tz python-psutil
+	sudo apt-get install python-requests python-dateutil python-tz
+	sudo apt-get install python-progressbar python-psutil
 
 	# Other dependencies (required by some modules)
 	sudo apt-get install python-numpy
@@ -26,6 +27,7 @@ Install:
 Usage:
 
 	nilmdb-server --help
+	nilmdb-fsck --help
 	nilmtool --help
 
 See docs/wsgi.md for info on setting up a WSGI application in Apache.
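(python-progressbar most likely enters the dependency list for the progress bars used by the fsck changes below; python-psutil just moves to the new line.)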
nilmdb/client/httpclient.py:

@@ -9,7 +9,7 @@ import requests
 
 class HTTPClient(object):
     """Class to manage and perform HTTP requests from the client"""
-    def __init__(self, baseurl = "", post_json = False):
+    def __init__(self, baseurl = "", post_json = False, verify_ssl = True):
         """If baseurl is supplied, all other functions that take
         a URL can be given a relative URL instead."""
         # Verify / clean up URL
@@ -19,6 +19,7 @@ class HTTPClient(object):
         self.baseurl = reparsed.rstrip('/') + '/'
 
         # Build Requests session object, enable SSL verification
+        self.verify_ssl = verify_ssl
         self.session = requests.Session()
         self.session.verify = True
 
@@ -67,7 +68,8 @@ class HTTPClient(object):
                 params = query_data,
                 data = body_data,
                 stream = stream,
-                headers = headers)
+                headers = headers,
+                verify = self.verify_ssl)
         except requests.RequestException as e:
             raise ServerError(status = "502 Error", url = url,
                               message = str(e.message))
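A hypothetical usage sketch of the new parameter (the import path and URL here are assumptions, not shown in the diff): a client talking to a server with a self-signed certificate can now opt out of verification. Note that the session-level self.session.verify = True is left in place; in Requests, an explicit per-request verify keyword takes precedence over the session setting.

    # Hypothetical: disable certificate verification for a self-signed
    # test server. The URL is a placeholder.
    from nilmdb.client.httpclient import HTTPClient
    client = HTTPClient(baseurl = "https://localhost/nilmdb/",
                        verify_ssl = False)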
nilmdb/cmdline/__init__.py:

@@ -19,9 +19,8 @@ except ImportError: # pragma: no cover
 
 # Valid subcommands. Defined in separate files just to break
 # things up -- they're still called with Cmdline as self.
-subcommands = [ "help", "info", "create", "list", "metadata",
-                "insert", "extract", "remove", "destroy",
-                "intervals", "rename" ]
+subcommands = [ "help", "info", "create", "rename", "list", "intervals",
+                "metadata", "insert", "extract", "remove", "destroy" ]
 
 # Import the subcommand modules
 subcmd_mods = {}
@@ -122,7 +121,7 @@ class Cmdline(object):
         group = self.parser.add_argument_group("General options")
         group.add_argument("-h", "--help", action='help',
                            help='show this help message and exit')
-        group.add_argument("-V", "--version", action="version",
+        group.add_argument("-v", "--version", action="version",
                            version = nilmdb.__version__)
 
         group = self.parser.add_argument_group("Server")
nilmdb/cmdline/list.py:

@@ -45,6 +45,8 @@ def setup(self, sub):
                        help="Show raw timestamps when printing times")
     group.add_argument("-l", "--layout", action="store_true",
                        help="Show layout type next to path name")
+    group.add_argument("-n", "--no-decim", action="store_true",
+                       help="Skip paths containing \"~decim-\"")
 
     return cmd
 
@@ -71,6 +73,8 @@ def cmd_list(self):
         (path, layout, int_min, int_max, rows, time) = stream[:6]
         if not fnmatch.fnmatch(path, argpath):
             continue
+        if self.args.no_decim and "~decim-" in path:
+            continue
 
         if self.args.layout:
             printf("%s %s\n", path, layout)
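A self-contained sketch of the new filtering rule, using made-up stream paths; decimated streams carry "~decim-" in their path names:

    import fnmatch

    paths = ["/sharon/prep", "/sharon/prep~decim-4", "/sharon/prep~decim-16"]
    argpath = "/sharon/*"
    no_decim = True

    for path in paths:
        if not fnmatch.fnmatch(path, argpath):
            continue
        if no_decim and "~decim-" in path:
            continue
        print(path)  # only "/sharon/prep" is listed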
Deleted file:

@@ -1 +0,0 @@
-jim@pilot.lees.18066:1373305995
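The deleted file's single line matches the user@host.pid:boot-time format of an Emacs-style editor lockfile, which appears to have been committed by accident.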
nilmdb/fsck/fsck.py:

@@ -1,9 +1,13 @@
 # -*- coding: utf-8 -*-
 
-"""Check database consistency"""
+"""Check database consistency, with some ability to fix problems.
+This should be able to fix cases where a database gets corrupted due
+to unexpected system shutdown, and detect other cases that may cause
+NilmDB to return errors when trying to manipulate the database."""
 
 import nilmdb.utils
 import nilmdb.server
+import nilmdb.client.numpyclient
 from nilmdb.utils.interval import IntervalError
 from nilmdb.server.interval import Interval, IntervalSet
 from nilmdb.utils.printf import *
@@ -12,13 +16,26 @@ from nilmdb.utils.time import timestamp_to_string
 from collections import defaultdict
 import sqlite3
 import os
+import sys
 import progressbar
+import re
 import time
+import shutil
 import cPickle as pickle
+import numpy
 
 class FsckError(Exception):
-    def __init__(self, format, *args):
-        Exception.__init__(self, sprintf(format, *args))
+    def __init__(self, msg = "", *args):
+        if args:
+            msg = sprintf(msg, *args)
+        Exception.__init__(self, msg)
+class FixableFsckError(FsckError):
+    def __init__(self, msg = "", *args):
+        if args:
+            msg = sprintf(msg, *args)
+        FsckError.__init__(self, "%s\nThis may be fixable with \"--fix\".", msg)
+class RetryFsck(FsckError):
+    pass
 
 def log(format, *args):
     printf(format, *args)
@@ -26,9 +43,27 @@ def log(format, *args):
 def err(format, *args):
     fprintf(sys.stderr, format, *args)
 
+# Decorator that retries a function if it returns a specific value
+def retry_if_raised(exc, message = None, max_retries = 100):
+    def f1(func):
+        def f2(*args, **kwargs):
+            for n in range(max_retries):
+                try:
+                    return func(*args, **kwargs)
+                except exc as e:
+                    if message:
+                        log("%s\n\n", message)
+            raise Exception("Max number of retries (%d) exceeded; giving up")
+        return f2
+    return f1
+
 class Progress(object):
     def __init__(self, maxval):
-        self.bar = progressbar.ProgressBar(maxval = maxval)
+        self.bar = progressbar.ProgressBar(
+            maxval = maxval,
+            widgets = [ progressbar.Percentage(), ' ',
+                        progressbar.Bar(), ' ',
+                        progressbar.ETA() ])
         if self.bar.term_width == 0:
             self.bar.term_width = 75
     def __enter__(self):
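The decorator re-runs the wrapped function each time the named exception escapes, which is how fsck restarts itself after fixing something. (Note that in the committed version, the final raise never fills in its %d placeholder, since no argument is supplied to the format string.) A self-contained sketch of the pattern, with illustrative names:

    class RetryableError(Exception):
        pass

    def retry_if_raised(exc, message = None, max_retries = 100):
        def wrap(func):
            def wrapper(*args, **kwargs):
                for _ in range(max_retries):
                    try:
                        return func(*args, **kwargs)
                    except exc:
                        if message:
                            print(message)
                raise Exception("max retries (%d) exceeded" % max_retries)
            return wrapper
        return wrap

    state = {"calls": 0}

    @retry_if_raised(RetryableError, "something was fixed: retrying")
    def flaky():
        state["calls"] += 1
        if state["calls"] < 3:
            raise RetryableError()
        return "ok"

    print(flaky())  # prints the retry message twice, then "ok"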
@@ -42,45 +77,68 @@ class Progress(object):
         printf("\n")
     def update(self, val):
         self.bar.update(val)
-        #now = time.time()
-        #if now - self.last_update < 0.005:
-        #    time.sleep(0.005)
-        #self.last_update = now
 
 class Fsck(object):
 
-    def __init__(self, path):
+    def __init__(self, path, fix = False):
         self.basepath = path
         self.sqlpath = os.path.join(path, "data.sql")
         self.bulkpath = os.path.join(path, "data")
         self.bulklock = os.path.join(path, "data.lock")
+        self.fix = fix
 
-    def check(self):
-        self.check_paths()
-        self.check_sql()
-        self.check_streams()
+    ### Main checks
+
+    @retry_if_raised(RetryFsck, "Something was fixed: restarting fsck")
+    def check(self, skip_data = False):
+        self.bulk = None
+        self.sql = None
+        try:
+            self.check_paths()
+            self.check_sql()
+            self.check_streams()
+            self.check_intervals()
+            if skip_data:
+                log("skipped data check\n")
+            else:
+                self.check_data()
+        finally:
+            if self.bulk:
+                self.bulk.close()
+            if self.sql:
+                self.sql.commit()
+                self.sql.close()
         log("ok\n")
 
+    ### Check basic path structure
+
     def check_paths(self):
         log("checking paths\n")
+        if self.bulk:
+            self.bulk.close()
         if not os.path.isfile(self.sqlpath):
-            raise FsckError("SQL database missing")
+            raise FsckError("SQL database missing (%s)", self.sqlpath)
         if not os.path.isdir(self.bulkpath):
-            raise FsckError("Bulk data directory missing")
+            raise FsckError("Bulk data directory missing (%s)", self.bulkpath)
         with open(self.bulklock, "w") as lockfile:
             if not nilmdb.utils.lock.exclusive_lock(lockfile):
-                raise FsckError('database already locked by another process')
+                raise FsckError('Database already locked by another process\n'
+                                'Make sure all other processes that might be '
+                                'using the database are stopped.\n'
+                                'Restarting apache will cause it to unlock '
+                                'the db until a request is received.')
+            # unlocked immediately
         self.bulk = nilmdb.server.bulkdata.BulkData(self.basepath)
-        # override must_close warning
-        if "_must_close" in dir(self.bulk):
-            del self.bulk._must_close
+
+    ### Check SQL database health
 
     def check_sql(self):
         log("checking sqlite database\n")
 
         self.sql = sqlite3.connect(self.sqlpath)
-        with self.sql as con:
-            ver = con.execute("PRAGMA user_version").fetchone()[0]
+        with self.sql:
+            cur = self.sql.cursor()
+            ver = cur.execute("PRAGMA user_version").fetchone()[0]
             good = max(nilmdb.server.nilmdb._sql_schema_updates.keys())
             if ver != good:
                 raise FsckError("database version %d too old, should be %d",
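check_sql() keys its schema test off SQLite's user_version pragma, an application-managed integer stored in the database header. A minimal demonstration of the mechanism, using an in-memory database and an arbitrary version number:

    import sqlite3

    sql = sqlite3.connect(":memory:")
    sql.execute("PRAGMA user_version = 22")   # value is arbitrary here
    ver = sql.execute("PRAGMA user_version").fetchone()[0]
    print(ver)  # -> 22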
@@ -88,7 +146,7 @@ class Fsck(object):
         self.stream_path = {}
         self.stream_layout = {}
         log("  loading paths\n")
-        result = con.execute("SELECT id, path, layout FROM streams")
+        result = cur.execute("SELECT id, path, layout FROM streams")
         for r in result:
             if r[0] in self.stream_path:
                 raise FsckError("duplicated ID %d in stream IDs", r[0])
@@ -97,8 +155,9 @@
 
         log("  loading intervals\n")
         self.stream_interval = defaultdict(list)
-        result = con.execute("SELECT stream_id, start_time, end_time, "
-                             "start_pos, end_pos FROM ranges")
+        result = cur.execute("SELECT stream_id, start_time, end_time, "
+                             "start_pos, end_pos FROM ranges "
+                             "ORDER BY start_time")
         for r in result:
             if r[0] not in self.stream_path:
                 raise FsckError("interval ID %d not in streams", k)
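The added ORDER BY start_time is what allows the new per-interval data check further down to compare each interval's first timestamp against the previous interval's last one in a single ordered pass.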
@@ -106,7 +165,7 @@
 
         log("  loading metadata\n")
         self.stream_meta = defaultdict(dict)
-        result = con.execute("SELECT stream_id, key, value FROM metadata")
+        result = cur.execute("SELECT stream_id, key, value FROM metadata")
         for r in result:
             if r[0] not in self.stream_path:
                 raise FsckError("metadata ID %d not in streams", k)
@@ -115,9 +174,11 @@
                             r[1], r[0])
             self.stream_meta[r[0]][r[1]] = r[2]
 
+    ### Check streams and basic interval overlap
+
     def check_streams(self):
-        log("checking streams\n")
         ids = self.stream_path.keys()
+        log("checking %s streams\n", "{:,d}".format(len(ids)))
         with Progress(len(ids)) as pbar:
             for i, sid in enumerate(ids):
                 pbar.update(i)
@@ -152,7 +213,7 @@
                         timeiset += new
                     except IntervalError:
                         raise FsckError("%s: overlap in intervals:\n"
-                                        "set: %s\nnew: %s\n",
+                                        "set: %s\nnew: %s",
                                         path, str(timeiset), str(new))
                 if spos != epos:
                     new = Interval(spos, epos)
@@ -160,14 +221,13 @@
                         posiset += new
                     except IntervalError:
                         raise FsckError("%s: overlap in file offsets:\n"
-                                        "set: %s\nnew: %s\n",
+                                        "set: %s\nnew: %s",
                                         path, str(posiset), str(new))
 
             # check bulkdata
             self.check_bulkdata(sid, path, bulk)
 
-            continue
-            # verify we can can open it with bulkdata
+            # Check that we can open bulkdata
             try:
                 tab = None
                 try:
@@ -175,11 +235,13 @@
                 except Exception as e:
                     raise FsckError("%s: can't open bulkdata: %s",
                                     path, str(e))
-                self.check_bulkdata(path, tab)
             finally:
                 if tab:
                     tab.close()
 
+    ### Check that bulkdata is good enough to be opened
+
+    @retry_if_raised(RetryFsck)
     def check_bulkdata(self, sid, path, bulk):
         with open(os.path.join(bulk, "_format"), "rb") as f:
             fmt = pickle.load(f)
@@ -192,3 +254,205 @@
         if layout != self.stream_layout[sid]:
             raise FsckError("%s: layout mismatch %s != %s", path,
                             layout, self.stream_layout[sid])
+
+        # Every file should have a size that's the multiple of the row size
+        rkt = nilmdb.server.rocket.Rocket(layout, None)
+        row_size = rkt.binary_size
+        rkt.close()
+
+        # Find all directories
+        regex = re.compile("^[0-9a-f]{4,}$")
+        subdirs = sorted(filter(regex.search, os.listdir(bulk)),
+                         key = lambda x: int(x, 16), reverse = True)
+        for subdir in subdirs:
+            # Find all files in that dir
+            subpath = os.path.join(bulk, subdir)
+            files = filter(regex.search, os.listdir(subpath))
+            if not files:
+                self.fix_empty_subdir(subpath)
+                raise RetryFsck
+            # Verify that their size is a multiple of the row size
+            for filename in files:
+                filepath = os.path.join(subpath, filename)
+                offset = os.path.getsize(filepath)
+                if offset % row_size:
+                    self.fix_bad_filesize(path, filepath, offset, row_size)
+
+    def fix_empty_subdir(self, subpath):
+        msg = sprintf("bulkdata path %s is missing data files", subpath)
+        if not self.fix:
+            raise FixableFsckError(msg)
+        # Try to fix it by just deleting whatever is present,
+        # as long as it's only ".removed" files.
+        err("\n%s\n", msg)
+        for fn in os.listdir(subpath):
+            if not fn.endswith(".removed"):
+                raise FsckError("can't fix automatically: please manually "
+                                "remove the file %s and try again",
+                                os.path.join(subpath, fn))
+        # Remove the whole thing
+        err("Removing empty subpath\n")
+        shutil.rmtree(subpath)
+        raise RetryFsck
+
+    def fix_bad_filesize(self, path, filepath, offset, row_size):
+        extra = offset % row_size
+        msg = sprintf("%s: size of file %s (%d) is not a multiple" +
+                      " of row size (%d): %d extra bytes present",
+                      path, filepath, offset, row_size, extra)
+        if not self.fix:
+            raise FixableFsckError(msg)
+        # Try to fix it by just truncating the file
+        err("\n%s\n", msg)
+        newsize = offset - extra
+        err("Truncating file to %d bytes and retrying\n", newsize)
+        with open(filepath, "r+b") as f:
+            f.truncate(newsize)
+        raise RetryFsck
+
+    ### Check interval endpoints
+
+    def check_intervals(self):
+        total_ints = sum(len(x) for x in self.stream_interval.values())
+        log("checking %s intervals\n", "{:,d}".format(total_ints))
+        done = 0
+        with Progress(total_ints) as pbar:
+            for sid in self.stream_interval:
+                try:
+                    bulk = self.bulkpath + self.stream_path[sid]
+                    tab = nilmdb.server.bulkdata.Table(bulk)
+                    def update(x):
+                        pbar.update(done + x)
+                    ints = self.stream_interval[sid]
+                    done += self.check_table_intervals(sid, ints, tab, update)
+                finally:
+                    tab.close()
+
+    def check_table_intervals(self, sid, ints, tab, update):
+        # look in the table to make sure we can pick out the interval's
+        # endpoints
+        path = self.stream_path[sid]
+        tab.file_open.cache_remove_all()
+        for (i, intv) in enumerate(ints):
+            update(i)
+            (stime, etime, spos, epos) = intv
+            if spos == epos and spos >= 0 and spos <= tab.nrows:
+                continue
+            try:
+                srow = tab[spos]
+                erow = tab[epos-1]
+            except Exception as e:
+                self.fix_bad_interval(sid, intv, tab, str(e))
+                raise RetryFsck
+        return len(ints)
+
+    def fix_bad_interval(self, sid, intv, tab, msg):
+        path = self.stream_path[sid]
+        msg = sprintf("%s: interval %s error accessing rows: %s",
+                      path, str(intv), str(msg))
+        if not self.fix:
+            raise FixableFsckError(msg)
+        err("\n%s\n", msg)
+
+        (stime, etime, spos, epos) = intv
+        # If it's just that the end pos is more than the number of rows
+        # in the table, lower end pos and truncate interval time too.
+        if spos < tab.nrows and epos >= tab.nrows:
+            err("end position is past endrows, but it can be truncated\n")
+            err("old end: time %d, pos %d\n", etime, epos)
+            new_epos = tab.nrows
+            new_etime = tab[new_epos-1] + 1
+            err("new end: time %d, pos %d\n", new_etime, new_epos)
+            if stime < new_etime:
+                # Change it in SQL
+                with self.sql:
+                    cur = self.sql.cursor()
+                    cur.execute("UPDATE ranges SET end_time=?, end_pos=? "
+                                "WHERE stream_id=? AND start_time=? AND "
+                                "end_time=? AND start_pos=? AND end_pos=?",
+                                (new_etime, new_epos, sid, stime, etime,
+                                 spos, epos))
+                    if cur.rowcount != 1:
+                        raise FsckError("failed to fix SQL database")
+                raise RetryFsck
+            err("actually it can't be truncated; times are bad too")
+
+        # Otherwise, the only hope is to delete the interval entirely.
+        err("*** Deleting the entire interval from SQL.\n")
+        err("This may leave stale data on disk. To fix that, copy all\n")
+        err("data from this stream to a new stream, then remove all data\n")
+        err("from and destroy %s.\n")
+        with self.sql:
+            cur = self.sql.cursor()
+            cur.execute("DELETE FROM ranges WHERE "
+                        "stream_id=? AND start_time=? AND "
+                        "end_time=? AND start_pos=? AND end_pos=?",
+                        (sid, stime, etime, spos, epos))
+            if cur.rowcount != 1:
+                raise FsckError("failed to remove interval")
+        raise RetryFsck
+
+    ### Check data in each interval
+
+    def check_data(self):
+        total_rows = sum(sum((y[3] - y[2]) for y in x)
+                         for x in self.stream_interval.values())
+        log("checking %s rows of data\n", "{:,d}".format(total_rows))
+        done = 0
+        with Progress(total_rows) as pbar:
+            for sid in self.stream_interval:
+                try:
+                    bulk = self.bulkpath + self.stream_path[sid]
+                    tab = nilmdb.server.bulkdata.Table(bulk)
+                    def update(x):
+                        pbar.update(done + x)
+                    ints = self.stream_interval[sid]
+                    done += self.check_table_data(sid, ints, tab, update)
+                finally:
+                    tab.close()
+
+    def check_table_data(self, sid, ints, tab, update):
+        # Pull out all of the interval's data and verify that it's
+        # monotonic.
+        maxrows = 100000
+        path = self.stream_path[sid]
+        layout = self.stream_layout[sid]
+        dtype = nilmdb.client.numpyclient.layout_to_dtype(layout)
+        tab.file_open.cache_remove_all()
+        done = 0
+        for intv in ints:
+            last_ts = None
+            (stime, etime, spos, epos) = intv
+            if spos == epos:
+                continue
+            for start in xrange(*slice(spos, epos, maxrows).indices(epos)):
+                stop = min(start + maxrows, epos)
+                count = stop - start
+                # Get raw data, convert to NumPy arary
+                try:
+                    raw = tab.get_data(start, stop, binary = True)
+                    data = numpy.fromstring(raw, dtype)
+                except Exception as e:
+                    raise FsckError("%s: failed to grab rows %d through %d: %s",
+                                    path, start, stop, repr(e))
+
+                # Verify that timestamps are monotonic
+                if (numpy.diff(data['timestamp']) <= 0).any():
+                    raise FsckError("%s: non-monotonic timestamp(s) in rows "
+                                    "%d through %d", path, start, stop)
+                first_ts = data['timestamp'][0]
+                if last_ts is not None and first_ts <= last_ts:
+                    raise FsckError("%s: first interval timestamp %d is not "
+                                    "greater than the previous last interval "
+                                    "timestamp %d, at row %d",
+                                    path, first_ts, last_ts, start)
+                last_ts = data['timestamp'][-1]
+
+                # These are probably fixable, by removing the offending
+                # intervals. But I'm not going to bother implementing
+                # that yet.
+
+                # Done
+                done += count
+                update(done)
+        return done
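The heart of check_table_data() is the vectorized monotonicity test on the timestamp column. A standalone sketch with made-up values:

    import numpy

    timestamps = numpy.array([100, 200, 300, 250, 400])
    if (numpy.diff(timestamps) <= 0).any():
        print("non-monotonic timestamp(s) found")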
nilmdb-fsck entry point:

@@ -10,14 +10,17 @@ def main():
 
     parser = argparse.ArgumentParser(
         description = 'Check database consistency',
-        formatter_class = argparse.ArgumentDefaultsHelpFormatter)
-    parser.add_argument("-V", "--version", action="version",
-                        version = nilmdb.__version__)
-    parser.add_argument('-d', '--database', help = 'Database directory',
-                        default = "./db")
+        formatter_class = argparse.ArgumentDefaultsHelpFormatter,
+        version = nilmdb.__version__)
+    parser.add_argument("-f", "--fix", action="store_true",
+                        default=False, help = 'Fix errors when possible '
+                        '(which may involve removing data)')
+    parser.add_argument("-n", "--no-data", action="store_true",
+                        default=False, help = 'Skip the slow full-data check')
+    parser.add_argument('database', help = 'Database directory')
     args = parser.parse_args()
 
-    nilmdb.fsck.Fsck(args.database).check()
+    nilmdb.fsck.Fsck(args.database, args.fix).check(skip_data = args.no_data)
 
 if __name__ == "__main__":
     main()
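With these arguments a repair pass might look like the following (hypothetical invocation; the path is a placeholder), while --no-data keeps the path, SQL, stream, and interval checks but skips the slow per-row scan:

	nilmdb-fsck --fix /path/to/db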
nilmdb-server entry point:

@@ -10,10 +10,8 @@ def main():
 
     parser = argparse.ArgumentParser(
         description = 'Run the NilmDB server',
-        formatter_class = argparse.ArgumentDefaultsHelpFormatter)
-    parser.add_argument("-V", "--version", action="version",
-                        version = nilmdb.__version__)
+        formatter_class = argparse.ArgumentDefaultsHelpFormatter,
+        version = nilmdb.__version__)
 
     group = parser.add_argument_group("Standard options")
     group.add_argument('-a', '--address',
nilmdb/server/server.py (class Root):

@@ -74,8 +74,8 @@ class Root(NilmApp):
         dbsize = nilmdb.utils.du(path)
         return { "path": path,
                  "size": dbsize,
-                 "other": usage.used - dbsize,
-                 "reserved": usage.total - usage.used - usage.free,
+                 "other": max(usage.used - dbsize, 0),
+                 "reserved": max(usage.total - usage.used - usage.free, 0),
                  "free": usage.free }
 
 class Stream(NilmApp):
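Clamping with max(..., 0) keeps the reported "other" and "reserved" byte counts from going negative when the du() measurement (changed below to count allocated blocks) disagrees with the filesystem usage totals.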
nilmdb/utils/diskusage.py:

@@ -21,7 +21,8 @@ def du(path):
     errors that might occur if we encounter broken symlinks or
     files in the process of being removed."""
    try:
-        size = os.path.getsize(path)
+        st = os.stat(path)
+        size = st.st_blocks * 512
         if os.path.isdir(path):
             for thisfile in os.listdir(path):
                 filepath = os.path.join(path, thisfile)
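st_blocks counts 512-byte blocks actually allocated, so du() now reports space consumed on disk (sparse regions excluded, allocation overhead included) rather than apparent file length. A quick POSIX-only comparison, with a placeholder path:

    import os

    st = os.stat("/etc/hostname")
    print("apparent bytes: %d" % st.st_size)
    print("allocated bytes: %d" % (st.st_blocks * 512))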