Compare commits
38 Commits
nilmdb-1.8
...
nilmdb-1.1
Author | SHA1 | Date | |
---|---|---|---|
ba55ad82f0 | |||
45c81d2019 | |||
78cfda32e3 | |||
3658d3876b | |||
022b50950f | |||
e5efbadc8e | |||
74f633c9da | |||
ab9a327130 | |||
da72fc9777 | |||
a01cb4132d | |||
7c3da2fe44 | |||
f0e06dc436 | |||
ddc0eb4264 | |||
0a22db3965 | |||
8bb8f068de | |||
416902097d | |||
f5276e9fc8 | |||
c47f28f93a | |||
63b5f99b90 | |||
7d7b89b52f | |||
8d249273c6 | |||
abe431c663 | |||
ccf1f695af | |||
06f7390c9e | |||
6de77a08f1 | |||
8db9771c20 | |||
04f815a24b | |||
6868f5f126 | |||
ca0943ec19 | |||
68addb4e4a | |||
68c33b1f14 | |||
8dd8741100 | |||
8e6341ae5d | |||
422b1e2df2 | |||
0f745b3047 | |||
71cd7ed9b7 | |||
a79d6104d5 | |||
8e8ec59e30 |
@@ -7,4 +7,4 @@
|
||||
exclude_lines =
|
||||
pragma: no cover
|
||||
if 0:
|
||||
omit = nilmdb/utils/datetime_tz*,nilmdb/scripts,nilmdb/_version.py
|
||||
omit = nilmdb/utils/datetime_tz*,nilmdb/scripts,nilmdb/_version.py,nilmdb/fsck
|
||||
|
@@ -8,7 +8,8 @@ Prerequisites:
|
||||
|
||||
# Base NilmDB dependencies
|
||||
sudo apt-get install python-cherrypy3 python-decorator python-simplejson
|
||||
sudo apt-get install python-requests python-dateutil python-tz python-psutil
|
||||
sudo apt-get install python-requests python-dateutil python-tz
|
||||
sudo apt-get install python-progressbar python-psutil
|
||||
|
||||
# Other dependencies (required by some modules)
|
||||
sudo apt-get install python-numpy
|
||||
@@ -26,6 +27,7 @@ Install:
|
||||
Usage:
|
||||
|
||||
nilmdb-server --help
|
||||
nilmdb-fsck --help
|
||||
nilmtool --help
|
||||
|
||||
See docs/wsgi.md for info on setting up a WSGI application in Apache.
|
||||
|
@@ -9,7 +9,7 @@ import requests
|
||||
|
||||
class HTTPClient(object):
|
||||
"""Class to manage and perform HTTP requests from the client"""
|
||||
def __init__(self, baseurl = "", post_json = False):
|
||||
def __init__(self, baseurl = "", post_json = False, verify_ssl = True):
|
||||
"""If baseurl is supplied, all other functions that take
|
||||
a URL can be given a relative URL instead."""
|
||||
# Verify / clean up URL
|
||||
@@ -18,9 +18,8 @@ class HTTPClient(object):
|
||||
reparsed = urlparse.urlparse("http://" + baseurl).geturl()
|
||||
self.baseurl = reparsed.rstrip('/') + '/'
|
||||
|
||||
# Build Requests session object, enable SSL verification
|
||||
self.session = requests.Session()
|
||||
self.session.verify = True
|
||||
# Note whether we want SSL verification
|
||||
self.verify_ssl = verify_ssl
|
||||
|
||||
# Saved response, so that tests can verify a few things.
|
||||
self._last_response = {}
|
||||
@@ -58,16 +57,34 @@ class HTTPClient(object):
|
||||
raise Error(**args)
|
||||
|
||||
def close(self):
|
||||
self.session.close()
|
||||
pass
|
||||
|
||||
def _do_req(self, method, url, query_data, body_data, stream, headers):
|
||||
url = urlparse.urljoin(self.baseurl, url)
|
||||
try:
|
||||
response = self.session.request(method, url,
|
||||
params = query_data,
|
||||
data = body_data,
|
||||
stream = stream,
|
||||
headers = headers)
|
||||
# Create a new session, ensure we send "Connection: close",
|
||||
# and explicitly close connection after the transfer.
|
||||
# This is to avoid HTTP/1.1 persistent connections
|
||||
# (keepalive), because they have fundamental race
|
||||
# conditions when there are delays between requests:
|
||||
# a new request may be sent at the same instant that the
|
||||
# server decides to timeout the connection.
|
||||
session = requests.Session()
|
||||
if headers is None:
|
||||
headers = {}
|
||||
headers["Connection"] = "close"
|
||||
response = session.request(method, url,
|
||||
params = query_data,
|
||||
data = body_data,
|
||||
stream = stream,
|
||||
headers = headers,
|
||||
verify = self.verify_ssl)
|
||||
|
||||
# Close the connection. If it's a generator (stream =
|
||||
# True), the requests library shouldn't actually close the
|
||||
# HTTP connection until all data has been read from the
|
||||
# response.
|
||||
session.close()
|
||||
except requests.RequestException as e:
|
||||
raise ServerError(status = "502 Error", url = url,
|
||||
message = str(e.message))
|
||||
|
@@ -19,9 +19,8 @@ except ImportError: # pragma: no cover
|
||||
|
||||
# Valid subcommands. Defined in separate files just to break
|
||||
# things up -- they're still called with Cmdline as self.
|
||||
subcommands = [ "help", "info", "create", "list", "metadata",
|
||||
"insert", "extract", "remove", "destroy",
|
||||
"intervals", "rename" ]
|
||||
subcommands = [ "help", "info", "create", "rename", "list", "intervals",
|
||||
"metadata", "insert", "extract", "remove", "destroy" ]
|
||||
|
||||
# Import the subcommand modules
|
||||
subcmd_mods = {}
|
||||
@@ -29,6 +28,14 @@ for cmd in subcommands:
|
||||
subcmd_mods[cmd] = __import__("nilmdb.cmdline." + cmd, fromlist = [ cmd ])
|
||||
|
||||
class JimArgumentParser(argparse.ArgumentParser):
|
||||
def parse_args(self, args=None, namespace=None):
|
||||
# Look for --version anywhere and change it to just "nilmtool
|
||||
# --version". This makes "nilmtool cmd --version" work, which
|
||||
# is needed by help2man.
|
||||
if "--version" in (args or sys.argv[1:]):
|
||||
args = [ "--version" ]
|
||||
return argparse.ArgumentParser.parse_args(self, args, namespace)
|
||||
|
||||
def error(self, message):
|
||||
self.print_usage(sys.stderr)
|
||||
self.exit(2, sprintf("error: %s\n", message))
|
||||
@@ -114,7 +121,7 @@ class Cmdline(object):
|
||||
group = self.parser.add_argument_group("General options")
|
||||
group.add_argument("-h", "--help", action='help',
|
||||
help='show this help message and exit')
|
||||
group.add_argument("-V", "--version", action="version",
|
||||
group.add_argument("-v", "--version", action="version",
|
||||
version = nilmdb.__version__)
|
||||
|
||||
group = self.parser.add_argument_group("Server")
|
||||
|
@@ -45,6 +45,8 @@ def setup(self, sub):
|
||||
help="Show raw timestamps when printing times")
|
||||
group.add_argument("-l", "--layout", action="store_true",
|
||||
help="Show layout type next to path name")
|
||||
group.add_argument("-n", "--no-decim", action="store_true",
|
||||
help="Skip paths containing \"~decim-\"")
|
||||
|
||||
return cmd
|
||||
|
||||
@@ -71,6 +73,8 @@ def cmd_list(self):
|
||||
(path, layout, int_min, int_max, rows, time) = stream[:6]
|
||||
if not fnmatch.fnmatch(path, argpath):
|
||||
continue
|
||||
if self.args.no_decim and "~decim-" in path:
|
||||
continue
|
||||
|
||||
if self.args.layout:
|
||||
printf("%s %s\n", path, layout)
|
||||
|
5
nilmdb/fsck/__init__.py
Normal file
5
nilmdb/fsck/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
"""nilmdb.fsck"""
|
||||
|
||||
from __future__ import absolute_import
|
||||
|
||||
from nilmdb.fsck.fsck import Fsck
|
464
nilmdb/fsck/fsck.py
Normal file
464
nilmdb/fsck/fsck.py
Normal file
@@ -0,0 +1,464 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""Check database consistency, with some ability to fix problems.
|
||||
This should be able to fix cases where a database gets corrupted due
|
||||
to unexpected system shutdown, and detect other cases that may cause
|
||||
NilmDB to return errors when trying to manipulate the database."""
|
||||
|
||||
import nilmdb.utils
|
||||
import nilmdb.server
|
||||
import nilmdb.client.numpyclient
|
||||
from nilmdb.utils.interval import IntervalError
|
||||
from nilmdb.server.interval import Interval, IntervalSet
|
||||
from nilmdb.utils.printf import *
|
||||
from nilmdb.utils.time import timestamp_to_string
|
||||
|
||||
from collections import defaultdict
|
||||
import sqlite3
|
||||
import os
|
||||
import sys
|
||||
import progressbar
|
||||
import re
|
||||
import time
|
||||
import shutil
|
||||
import cPickle as pickle
|
||||
import numpy
|
||||
|
||||
class FsckError(Exception):
|
||||
def __init__(self, msg = "", *args):
|
||||
if args:
|
||||
msg = sprintf(msg, *args)
|
||||
Exception.__init__(self, msg)
|
||||
class FixableFsckError(FsckError):
|
||||
def __init__(self, msg = "", *args):
|
||||
if args:
|
||||
msg = sprintf(msg, *args)
|
||||
FsckError.__init__(self, "%s\nThis may be fixable with \"--fix\".", msg)
|
||||
class RetryFsck(FsckError):
|
||||
pass
|
||||
|
||||
def log(format, *args):
|
||||
printf(format, *args)
|
||||
|
||||
def err(format, *args):
|
||||
fprintf(sys.stderr, format, *args)
|
||||
|
||||
# Decorator that retries a function if it returns a specific value
|
||||
def retry_if_raised(exc, message = None, max_retries = 100):
|
||||
def f1(func):
|
||||
def f2(*args, **kwargs):
|
||||
for n in range(max_retries):
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
except exc as e:
|
||||
if message:
|
||||
log("%s\n\n", message)
|
||||
raise Exception("Max number of retries (%d) exceeded; giving up")
|
||||
return f2
|
||||
return f1
|
||||
|
||||
class Progress(object):
|
||||
def __init__(self, maxval):
|
||||
if maxval == 0:
|
||||
maxval = 1
|
||||
self.bar = progressbar.ProgressBar(
|
||||
maxval = maxval,
|
||||
widgets = [ progressbar.Percentage(), ' ',
|
||||
progressbar.Bar(), ' ',
|
||||
progressbar.ETA() ])
|
||||
if self.bar.term_width == 0:
|
||||
self.bar.term_width = 75
|
||||
def __enter__(self):
|
||||
self.bar.start()
|
||||
self.last_update = 0
|
||||
return self
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
if exc_type is None:
|
||||
self.bar.finish()
|
||||
else:
|
||||
printf("\n")
|
||||
def update(self, val):
|
||||
self.bar.update(val)
|
||||
|
||||
class Fsck(object):
|
||||
|
||||
def __init__(self, path, fix = False):
|
||||
self.basepath = path
|
||||
self.sqlpath = os.path.join(path, "data.sql")
|
||||
self.bulkpath = os.path.join(path, "data")
|
||||
self.bulklock = os.path.join(path, "data.lock")
|
||||
self.fix = fix
|
||||
|
||||
### Main checks
|
||||
|
||||
@retry_if_raised(RetryFsck, "Something was fixed: restarting fsck")
|
||||
def check(self, skip_data = False):
|
||||
self.bulk = None
|
||||
self.sql = None
|
||||
try:
|
||||
self.check_paths()
|
||||
self.check_sql()
|
||||
self.check_streams()
|
||||
self.check_intervals()
|
||||
if skip_data:
|
||||
log("skipped data check\n")
|
||||
else:
|
||||
self.check_data()
|
||||
finally:
|
||||
if self.bulk:
|
||||
self.bulk.close()
|
||||
if self.sql:
|
||||
self.sql.commit()
|
||||
self.sql.close()
|
||||
log("ok\n")
|
||||
|
||||
### Check basic path structure
|
||||
|
||||
def check_paths(self):
|
||||
log("checking paths\n")
|
||||
if self.bulk:
|
||||
self.bulk.close()
|
||||
if not os.path.isfile(self.sqlpath):
|
||||
raise FsckError("SQL database missing (%s)", self.sqlpath)
|
||||
if not os.path.isdir(self.bulkpath):
|
||||
raise FsckError("Bulk data directory missing (%s)", self.bulkpath)
|
||||
with open(self.bulklock, "w") as lockfile:
|
||||
if not nilmdb.utils.lock.exclusive_lock(lockfile):
|
||||
raise FsckError('Database already locked by another process\n'
|
||||
'Make sure all other processes that might be '
|
||||
'using the database are stopped.\n'
|
||||
'Restarting apache will cause it to unlock '
|
||||
'the db until a request is received.')
|
||||
# unlocked immediately
|
||||
self.bulk = nilmdb.server.bulkdata.BulkData(self.basepath)
|
||||
|
||||
### Check SQL database health
|
||||
|
||||
def check_sql(self):
|
||||
log("checking sqlite database\n")
|
||||
|
||||
self.sql = sqlite3.connect(self.sqlpath)
|
||||
with self.sql:
|
||||
cur = self.sql.cursor()
|
||||
ver = cur.execute("PRAGMA user_version").fetchone()[0]
|
||||
good = max(nilmdb.server.nilmdb._sql_schema_updates.keys())
|
||||
if ver != good:
|
||||
raise FsckError("database version %d too old, should be %d",
|
||||
ver, good)
|
||||
self.stream_path = {}
|
||||
self.stream_layout = {}
|
||||
log(" loading paths\n")
|
||||
result = cur.execute("SELECT id, path, layout FROM streams")
|
||||
for r in result:
|
||||
if r[0] in self.stream_path:
|
||||
raise FsckError("duplicated ID %d in stream IDs", r[0])
|
||||
self.stream_path[r[0]] = r[1]
|
||||
self.stream_layout[r[0]] = r[2]
|
||||
|
||||
log(" loading intervals\n")
|
||||
self.stream_interval = defaultdict(list)
|
||||
result = cur.execute("SELECT stream_id, start_time, end_time, "
|
||||
"start_pos, end_pos FROM ranges "
|
||||
"ORDER BY start_time")
|
||||
for r in result:
|
||||
if r[0] not in self.stream_path:
|
||||
raise FsckError("interval ID %d not in streams", k)
|
||||
self.stream_interval[r[0]].append((r[1], r[2], r[3], r[4]))
|
||||
|
||||
log(" loading metadata\n")
|
||||
self.stream_meta = defaultdict(dict)
|
||||
result = cur.execute("SELECT stream_id, key, value FROM metadata")
|
||||
for r in result:
|
||||
if r[0] not in self.stream_path:
|
||||
raise FsckError("metadata ID %d not in streams", k)
|
||||
if r[1] in self.stream_meta[r[0]]:
|
||||
raise FsckError("duplicate metadata key '%s' for stream %d",
|
||||
r[1], r[0])
|
||||
self.stream_meta[r[0]][r[1]] = r[2]
|
||||
|
||||
### Check streams and basic interval overlap
|
||||
|
||||
def check_streams(self):
|
||||
ids = self.stream_path.keys()
|
||||
log("checking %s streams\n", "{:,d}".format(len(ids)))
|
||||
with Progress(len(ids)) as pbar:
|
||||
for i, sid in enumerate(ids):
|
||||
pbar.update(i)
|
||||
path = self.stream_path[sid]
|
||||
|
||||
# unique path, valid layout
|
||||
if self.stream_path.values().count(path) != 1:
|
||||
raise FsckError("duplicated path %s", path)
|
||||
layout = self.stream_layout[sid].split('_')[0]
|
||||
if layout not in ('int8', 'int16', 'int32', 'int64',
|
||||
'uint8', 'uint16', 'uint32', 'uint64',
|
||||
'float32', 'float64'):
|
||||
raise FsckError("bad layout %s for %s", layout, path)
|
||||
count = int(self.stream_layout[sid].split('_')[1])
|
||||
if count < 1 or count > 1024:
|
||||
raise FsckError("bad count %d for %s", count, path)
|
||||
|
||||
# must exist in bulkdata
|
||||
bulk = self.bulkpath + path
|
||||
if not os.path.isdir(bulk):
|
||||
raise FsckError("%s: missing bulkdata dir", path)
|
||||
if not nilmdb.server.bulkdata.Table.exists(bulk):
|
||||
raise FsckError("%s: bad bulkdata table", path)
|
||||
|
||||
# intervals don't overlap. Abuse IntervalSet to check
|
||||
# for intervals in file positions, too.
|
||||
timeiset = IntervalSet()
|
||||
posiset = IntervalSet()
|
||||
for (stime, etime, spos, epos) in self.stream_interval[sid]:
|
||||
new = Interval(stime, etime)
|
||||
try:
|
||||
timeiset += new
|
||||
except IntervalError:
|
||||
raise FsckError("%s: overlap in intervals:\n"
|
||||
"set: %s\nnew: %s",
|
||||
path, str(timeiset), str(new))
|
||||
if spos != epos:
|
||||
new = Interval(spos, epos)
|
||||
try:
|
||||
posiset += new
|
||||
except IntervalError:
|
||||
raise FsckError("%s: overlap in file offsets:\n"
|
||||
"set: %s\nnew: %s",
|
||||
path, str(posiset), str(new))
|
||||
|
||||
# check bulkdata
|
||||
self.check_bulkdata(sid, path, bulk)
|
||||
|
||||
# Check that we can open bulkdata
|
||||
try:
|
||||
tab = None
|
||||
try:
|
||||
tab = nilmdb.server.bulkdata.Table(bulk)
|
||||
except Exception as e:
|
||||
raise FsckError("%s: can't open bulkdata: %s",
|
||||
path, str(e))
|
||||
finally:
|
||||
if tab:
|
||||
tab.close()
|
||||
|
||||
### Check that bulkdata is good enough to be opened
|
||||
|
||||
@retry_if_raised(RetryFsck)
|
||||
def check_bulkdata(self, sid, path, bulk):
|
||||
with open(os.path.join(bulk, "_format"), "rb") as f:
|
||||
fmt = pickle.load(f)
|
||||
if fmt["version"] != 3:
|
||||
raise FsckError("%s: bad or unsupported bulkdata version %d",
|
||||
path, fmt["version"])
|
||||
row_per_file = int(fmt["rows_per_file"])
|
||||
files_per_dir = int(fmt["files_per_dir"])
|
||||
layout = fmt["layout"]
|
||||
if layout != self.stream_layout[sid]:
|
||||
raise FsckError("%s: layout mismatch %s != %s", path,
|
||||
layout, self.stream_layout[sid])
|
||||
|
||||
# Every file should have a size that's the multiple of the row size
|
||||
rkt = nilmdb.server.rocket.Rocket(layout, None)
|
||||
row_size = rkt.binary_size
|
||||
rkt.close()
|
||||
|
||||
# Find all directories
|
||||
regex = re.compile("^[0-9a-f]{4,}$")
|
||||
subdirs = sorted(filter(regex.search, os.listdir(bulk)),
|
||||
key = lambda x: int(x, 16), reverse = True)
|
||||
for subdir in subdirs:
|
||||
# Find all files in that dir
|
||||
subpath = os.path.join(bulk, subdir)
|
||||
files = filter(regex.search, os.listdir(subpath))
|
||||
if not files:
|
||||
self.fix_empty_subdir(subpath)
|
||||
raise RetryFsck
|
||||
# Verify that their size is a multiple of the row size
|
||||
for filename in files:
|
||||
filepath = os.path.join(subpath, filename)
|
||||
offset = os.path.getsize(filepath)
|
||||
if offset % row_size:
|
||||
self.fix_bad_filesize(path, filepath, offset, row_size)
|
||||
|
||||
def fix_empty_subdir(self, subpath):
|
||||
msg = sprintf("bulkdata path %s is missing data files", subpath)
|
||||
if not self.fix:
|
||||
raise FixableFsckError(msg)
|
||||
# Try to fix it by just deleting whatever is present,
|
||||
# as long as it's only ".removed" files.
|
||||
err("\n%s\n", msg)
|
||||
for fn in os.listdir(subpath):
|
||||
if not fn.endswith(".removed"):
|
||||
raise FsckError("can't fix automatically: please manually "
|
||||
"remove the file %s and try again",
|
||||
os.path.join(subpath, fn))
|
||||
# Remove the whole thing
|
||||
err("Removing empty subpath\n")
|
||||
shutil.rmtree(subpath)
|
||||
raise RetryFsck
|
||||
|
||||
def fix_bad_filesize(self, path, filepath, offset, row_size):
|
||||
extra = offset % row_size
|
||||
msg = sprintf("%s: size of file %s (%d) is not a multiple" +
|
||||
" of row size (%d): %d extra bytes present",
|
||||
path, filepath, offset, row_size, extra)
|
||||
if not self.fix:
|
||||
raise FixableFsckError(msg)
|
||||
# Try to fix it by just truncating the file
|
||||
err("\n%s\n", msg)
|
||||
newsize = offset - extra
|
||||
err("Truncating file to %d bytes and retrying\n", newsize)
|
||||
with open(filepath, "r+b") as f:
|
||||
f.truncate(newsize)
|
||||
raise RetryFsck
|
||||
|
||||
### Check interval endpoints
|
||||
|
||||
def check_intervals(self):
|
||||
total_ints = sum(len(x) for x in self.stream_interval.values())
|
||||
log("checking %s intervals\n", "{:,d}".format(total_ints))
|
||||
done = 0
|
||||
with Progress(total_ints) as pbar:
|
||||
for sid in self.stream_interval:
|
||||
try:
|
||||
bulk = self.bulkpath + self.stream_path[sid]
|
||||
tab = nilmdb.server.bulkdata.Table(bulk)
|
||||
def update(x):
|
||||
pbar.update(done + x)
|
||||
ints = self.stream_interval[sid]
|
||||
done += self.check_table_intervals(sid, ints, tab, update)
|
||||
finally:
|
||||
tab.close()
|
||||
|
||||
def check_table_intervals(self, sid, ints, tab, update):
|
||||
# look in the table to make sure we can pick out the interval's
|
||||
# endpoints
|
||||
path = self.stream_path[sid]
|
||||
tab.file_open.cache_remove_all()
|
||||
for (i, intv) in enumerate(ints):
|
||||
update(i)
|
||||
(stime, etime, spos, epos) = intv
|
||||
if spos == epos and spos >= 0 and spos <= tab.nrows:
|
||||
continue
|
||||
try:
|
||||
srow = tab[spos]
|
||||
erow = tab[epos-1]
|
||||
except Exception as e:
|
||||
self.fix_bad_interval(sid, intv, tab, str(e))
|
||||
raise RetryFsck
|
||||
return len(ints)
|
||||
|
||||
def fix_bad_interval(self, sid, intv, tab, msg):
|
||||
path = self.stream_path[sid]
|
||||
msg = sprintf("%s: interval %s error accessing rows: %s",
|
||||
path, str(intv), str(msg))
|
||||
if not self.fix:
|
||||
raise FixableFsckError(msg)
|
||||
err("\n%s\n", msg)
|
||||
|
||||
(stime, etime, spos, epos) = intv
|
||||
# If it's just that the end pos is more than the number of rows
|
||||
# in the table, lower end pos and truncate interval time too.
|
||||
if spos < tab.nrows and epos >= tab.nrows:
|
||||
err("end position is past endrows, but it can be truncated\n")
|
||||
err("old end: time %d, pos %d\n", etime, epos)
|
||||
new_epos = tab.nrows
|
||||
new_etime = tab[new_epos-1] + 1
|
||||
err("new end: time %d, pos %d\n", new_etime, new_epos)
|
||||
if stime < new_etime:
|
||||
# Change it in SQL
|
||||
with self.sql:
|
||||
cur = self.sql.cursor()
|
||||
cur.execute("UPDATE ranges SET end_time=?, end_pos=? "
|
||||
"WHERE stream_id=? AND start_time=? AND "
|
||||
"end_time=? AND start_pos=? AND end_pos=?",
|
||||
(new_etime, new_epos, sid, stime, etime,
|
||||
spos, epos))
|
||||
if cur.rowcount != 1:
|
||||
raise FsckError("failed to fix SQL database")
|
||||
raise RetryFsck
|
||||
err("actually it can't be truncated; times are bad too")
|
||||
|
||||
# Otherwise, the only hope is to delete the interval entirely.
|
||||
err("*** Deleting the entire interval from SQL.\n")
|
||||
err("This may leave stale data on disk. To fix that, copy all\n")
|
||||
err("data from this stream to a new stream, then remove all data\n")
|
||||
err("from and destroy %s.\n", path)
|
||||
with self.sql:
|
||||
cur = self.sql.cursor()
|
||||
cur.execute("DELETE FROM ranges WHERE "
|
||||
"stream_id=? AND start_time=? AND "
|
||||
"end_time=? AND start_pos=? AND end_pos=?",
|
||||
(sid, stime, etime, spos, epos))
|
||||
if cur.rowcount != 1:
|
||||
raise FsckError("failed to remove interval")
|
||||
raise RetryFsck
|
||||
|
||||
### Check data in each interval
|
||||
|
||||
def check_data(self):
|
||||
total_rows = sum(sum((y[3] - y[2]) for y in x)
|
||||
for x in self.stream_interval.values())
|
||||
log("checking %s rows of data\n", "{:,d}".format(total_rows))
|
||||
done = 0
|
||||
with Progress(total_rows) as pbar:
|
||||
for sid in self.stream_interval:
|
||||
try:
|
||||
bulk = self.bulkpath + self.stream_path[sid]
|
||||
tab = nilmdb.server.bulkdata.Table(bulk)
|
||||
def update(x):
|
||||
pbar.update(done + x)
|
||||
ints = self.stream_interval[sid]
|
||||
done += self.check_table_data(sid, ints, tab, update)
|
||||
finally:
|
||||
tab.close()
|
||||
|
||||
def check_table_data(self, sid, ints, tab, update):
|
||||
# Pull out all of the interval's data and verify that it's
|
||||
# monotonic.
|
||||
maxrows = 100000
|
||||
path = self.stream_path[sid]
|
||||
layout = self.stream_layout[sid]
|
||||
dtype = nilmdb.client.numpyclient.layout_to_dtype(layout)
|
||||
tab.file_open.cache_remove_all()
|
||||
done = 0
|
||||
for intv in ints:
|
||||
last_ts = None
|
||||
(stime, etime, spos, epos) = intv
|
||||
|
||||
# Break interval into maxrows-sized chunks
|
||||
next_start = spos
|
||||
while next_start < epos:
|
||||
start = next_start
|
||||
stop = min(start + maxrows, epos)
|
||||
count = stop - start
|
||||
next_start = stop
|
||||
|
||||
# Get raw data, convert to NumPy arary
|
||||
try:
|
||||
raw = tab.get_data(start, stop, binary = True)
|
||||
data = numpy.fromstring(raw, dtype)
|
||||
except Exception as e:
|
||||
raise FsckError("%s: failed to grab rows %d through %d: %s",
|
||||
path, start, stop, repr(e))
|
||||
|
||||
# Verify that timestamps are monotonic
|
||||
if (numpy.diff(data['timestamp']) <= 0).any():
|
||||
raise FsckError("%s: non-monotonic timestamp(s) in rows "
|
||||
"%d through %d", path, start, stop)
|
||||
first_ts = data['timestamp'][0]
|
||||
if last_ts is not None and first_ts <= last_ts:
|
||||
raise FsckError("%s: first interval timestamp %d is not "
|
||||
"greater than the previous last interval "
|
||||
"timestamp %d, at row %d",
|
||||
path, first_ts, last_ts, start)
|
||||
last_ts = data['timestamp'][-1]
|
||||
|
||||
# These are probably fixable, by removing the offending
|
||||
# intervals. But I'm not going to bother implementing
|
||||
# that yet.
|
||||
|
||||
# Done
|
||||
done += count
|
||||
update(done)
|
||||
return done
|
26
nilmdb/scripts/nilmdb_fsck.py
Executable file
26
nilmdb/scripts/nilmdb_fsck.py
Executable file
@@ -0,0 +1,26 @@
|
||||
#!/usr/bin/python
|
||||
|
||||
import nilmdb.fsck
|
||||
import argparse
|
||||
import os
|
||||
import sys
|
||||
|
||||
def main():
|
||||
"""Main entry point for the 'nilmdb-fsck' command line script"""
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
description = 'Check database consistency',
|
||||
formatter_class = argparse.ArgumentDefaultsHelpFormatter,
|
||||
version = nilmdb.__version__)
|
||||
parser.add_argument("-f", "--fix", action="store_true",
|
||||
default=False, help = 'Fix errors when possible '
|
||||
'(which may involve removing data)')
|
||||
parser.add_argument("-n", "--no-data", action="store_true",
|
||||
default=False, help = 'Skip the slow full-data check')
|
||||
parser.add_argument('database', help = 'Database directory')
|
||||
args = parser.parse_args()
|
||||
|
||||
nilmdb.fsck.Fsck(args.database, args.fix).check(skip_data = args.no_data)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
@@ -10,10 +10,8 @@ def main():
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
description = 'Run the NilmDB server',
|
||||
formatter_class = argparse.ArgumentDefaultsHelpFormatter)
|
||||
|
||||
parser.add_argument("-V", "--version", action="version",
|
||||
version = nilmdb.__version__)
|
||||
formatter_class = argparse.ArgumentDefaultsHelpFormatter,
|
||||
version = nilmdb.__version__)
|
||||
|
||||
group = parser.add_argument_group("Standard options")
|
||||
group.add_argument('-a', '--address',
|
||||
|
@@ -43,6 +43,12 @@ class BulkData(object):
|
||||
# 32768 files per dir should work even on FAT32
|
||||
self.files_per_dir = 32768
|
||||
|
||||
if "initial_nrows" in kwargs:
|
||||
self.initial_nrows = kwargs["initial_nrows"]
|
||||
else:
|
||||
# First row is 0
|
||||
self.initial_nrows = 0
|
||||
|
||||
# Make root path
|
||||
if not os.path.isdir(self.root):
|
||||
os.mkdir(self.root)
|
||||
@@ -194,6 +200,9 @@ class BulkData(object):
|
||||
if oldospath == newospath:
|
||||
raise ValueError("old and new paths are the same")
|
||||
|
||||
# Remove Table object at old path from cache
|
||||
self.getnode.cache_remove(self, oldunicodepath)
|
||||
|
||||
# Move the table to a temporary location
|
||||
tmpdir = tempfile.mkdtemp(prefix = "rename-", dir = self.root)
|
||||
tmppath = os.path.join(tmpdir, "table")
|
||||
@@ -251,7 +260,7 @@ class BulkData(object):
|
||||
path = self._encode_filename(unicodepath)
|
||||
elements = path.lstrip('/').split('/')
|
||||
ospath = os.path.join(self.root, *elements)
|
||||
return Table(ospath)
|
||||
return Table(ospath, self.initial_nrows)
|
||||
|
||||
@nilmdb.utils.must_close(wrap_verify = False)
|
||||
class Table(object):
|
||||
@@ -288,9 +297,10 @@ class Table(object):
|
||||
pickle.dump(fmt, f, 2)
|
||||
|
||||
# Normal methods
|
||||
def __init__(self, root):
|
||||
def __init__(self, root, initial_nrows):
|
||||
"""'root' is the full OS path to the directory of this table"""
|
||||
self.root = root
|
||||
self.initial_nrows = initial_nrows
|
||||
|
||||
# Load the format
|
||||
with open(os.path.join(self.root, "_format"), "rb") as f:
|
||||
@@ -330,7 +340,8 @@ class Table(object):
|
||||
|
||||
# Find the last directory. We sort and loop through all of them,
|
||||
# starting with the numerically greatest, because the dirs could be
|
||||
# empty if something was deleted.
|
||||
# empty if something was deleted but the directory was unexpectedly
|
||||
# not deleted.
|
||||
subdirs = sorted(filter(regex.search, os.listdir(self.root)),
|
||||
key = lambda x: int(x, 16), reverse = True)
|
||||
|
||||
@@ -349,8 +360,14 @@ class Table(object):
|
||||
# Convert to row number
|
||||
return self._row_from_offset(subdir, filename, offset)
|
||||
|
||||
# No files, so no data
|
||||
return 0
|
||||
# No files, so no data. We typically start at row 0 in this
|
||||
# case, although initial_nrows is specified during some tests
|
||||
# to exercise other parts of the code better. Since we have
|
||||
# no files yet, round initial_nrows up so it points to a row
|
||||
# that would begin a new file.
|
||||
nrows = ((self.initial_nrows + (self.rows_per_file - 1)) //
|
||||
self.rows_per_file) * self.rows_per_file
|
||||
return nrows
|
||||
|
||||
def _offset_from_row(self, row):
|
||||
"""Return a (subdir, filename, offset, count) tuple:
|
||||
|
@@ -23,7 +23,6 @@ from nilmdb.server.errors import NilmDBError, StreamError, OverlapError
|
||||
import sqlite3
|
||||
import os
|
||||
import errno
|
||||
import bisect
|
||||
|
||||
# Note about performance and transactions:
|
||||
#
|
||||
@@ -83,8 +82,11 @@ _sql_schema_updates = {
|
||||
class NilmDB(object):
|
||||
verbose = 0
|
||||
|
||||
def __init__(self, basepath, max_results=None,
|
||||
max_removals=None, bulkdata_args=None):
|
||||
def __init__(self, basepath,
|
||||
max_results=None,
|
||||
max_removals=None,
|
||||
max_int_removals=None,
|
||||
bulkdata_args=None):
|
||||
"""Initialize NilmDB at the given basepath.
|
||||
Other arguments are for debugging / testing:
|
||||
|
||||
@@ -92,7 +94,10 @@ class NilmDB(object):
|
||||
stream_intervals or stream_extract response.
|
||||
|
||||
'max_removals' is the max rows to delete at once
|
||||
in stream_move.
|
||||
in stream_remove.
|
||||
|
||||
'max_int_removals' is the max intervals to delete
|
||||
at once in stream_remove.
|
||||
|
||||
'bulkdata_args' is kwargs for the bulkdata module.
|
||||
"""
|
||||
@@ -134,6 +139,9 @@ class NilmDB(object):
|
||||
# Remove up to this many rows per call to stream_remove.
|
||||
self.max_removals = max_removals or 1048576
|
||||
|
||||
# Remove up to this many intervals per call to stream_remove.
|
||||
self.max_int_removals = max_int_removals or 4096
|
||||
|
||||
def get_basepath(self):
|
||||
return self.basepath
|
||||
|
||||
@@ -507,6 +515,17 @@ class NilmDB(object):
|
||||
# And that's all
|
||||
return
|
||||
|
||||
def _bisect_left(self, a, x, lo, hi):
|
||||
# Like bisect.bisect_left, but doesn't choke on large indices on
|
||||
# 32-bit systems, like bisect's fast C implementation does.
|
||||
while lo < hi:
|
||||
mid = (lo + hi) / 2
|
||||
if a[mid] < x:
|
||||
lo = mid + 1
|
||||
else:
|
||||
hi = mid
|
||||
return lo
|
||||
|
||||
def _find_start(self, table, dbinterval):
|
||||
"""
|
||||
Given a DBInterval, find the row in the database that
|
||||
@@ -517,10 +536,10 @@ class NilmDB(object):
|
||||
# Optimization for the common case where an interval wasn't truncated
|
||||
if dbinterval.start == dbinterval.db_start:
|
||||
return dbinterval.db_startpos
|
||||
return bisect.bisect_left(table,
|
||||
dbinterval.start,
|
||||
dbinterval.db_startpos,
|
||||
dbinterval.db_endpos)
|
||||
return self._bisect_left(table,
|
||||
dbinterval.start,
|
||||
dbinterval.db_startpos,
|
||||
dbinterval.db_endpos)
|
||||
|
||||
def _find_end(self, table, dbinterval):
|
||||
"""
|
||||
@@ -536,10 +555,10 @@ class NilmDB(object):
|
||||
# want to include the given timestamp in the results. This is
|
||||
# so a queries like 1:00 -> 2:00 and 2:00 -> 3:00 return
|
||||
# non-overlapping data.
|
||||
return bisect.bisect_left(table,
|
||||
dbinterval.end,
|
||||
dbinterval.db_startpos,
|
||||
dbinterval.db_endpos)
|
||||
return self._bisect_left(table,
|
||||
dbinterval.end,
|
||||
dbinterval.db_startpos,
|
||||
dbinterval.db_endpos)
|
||||
|
||||
def stream_extract(self, path, start = None, end = None,
|
||||
count = False, markup = False, binary = False):
|
||||
@@ -643,13 +662,22 @@ class NilmDB(object):
|
||||
to_remove = Interval(start, end)
|
||||
removed = 0
|
||||
remaining = self.max_removals
|
||||
int_remaining = self.max_int_removals
|
||||
restart = None
|
||||
|
||||
# Can't remove intervals from within the iterator, so we need to
|
||||
# remember what's currently in the intersection now.
|
||||
all_candidates = list(intervals.intersection(to_remove, orig = True))
|
||||
|
||||
remove_start = None
|
||||
remove_end = None
|
||||
|
||||
for (dbint, orig) in all_candidates:
|
||||
# Stop if we've hit the max number of interval removals
|
||||
if int_remaining <= 0:
|
||||
restart = dbint.start
|
||||
break
|
||||
|
||||
# Find row start and end
|
||||
row_start = self._find_start(table, dbint)
|
||||
row_end = self._find_end(table, dbint)
|
||||
@@ -670,14 +698,29 @@ class NilmDB(object):
|
||||
# Remove interval from the database
|
||||
self._remove_interval(stream_id, orig, dbint)
|
||||
|
||||
# Remove data from the underlying table storage
|
||||
table.remove(row_start, row_end)
|
||||
# Remove data from the underlying table storage,
|
||||
# coalescing adjacent removals to reduce the number of calls
|
||||
# to table.remove.
|
||||
if remove_end == row_start:
|
||||
# Extend our coalesced region
|
||||
remove_end = row_end
|
||||
else:
|
||||
# Perform previous removal, then save this one
|
||||
if remove_end is not None:
|
||||
table.remove(remove_start, remove_end)
|
||||
remove_start = row_start
|
||||
remove_end = row_end
|
||||
|
||||
# Count how many were removed
|
||||
removed += row_end - row_start
|
||||
remaining -= row_end - row_start
|
||||
int_remaining -= 1
|
||||
|
||||
if restart is not None:
|
||||
break
|
||||
|
||||
# Perform any final coalesced removal
|
||||
if remove_end is not None:
|
||||
table.remove(remove_start, remove_end)
|
||||
|
||||
return (removed, restart)
|
||||
|
@@ -74,8 +74,8 @@ class Root(NilmApp):
|
||||
dbsize = nilmdb.utils.du(path)
|
||||
return { "path": path,
|
||||
"size": dbsize,
|
||||
"other": usage.used - dbsize,
|
||||
"reserved": usage.total - usage.used - usage.free,
|
||||
"other": max(usage.used - dbsize, 0),
|
||||
"reserved": max(usage.total - usage.used - usage.free, 0),
|
||||
"free": usage.free }
|
||||
|
||||
class Stream(NilmApp):
|
||||
@@ -429,7 +429,7 @@ class Server(object):
|
||||
cherrypy.config.update({
|
||||
'server.socket_host': host,
|
||||
'server.socket_port': port,
|
||||
'engine.autoreload_on': False,
|
||||
'engine.autoreload.on': False,
|
||||
'server.max_request_body_size': 8*1024*1024,
|
||||
})
|
||||
if self.embedded:
|
||||
|
@@ -21,7 +21,8 @@ def du(path):
|
||||
errors that might occur if we encounter broken symlinks or
|
||||
files in the process of being removed."""
|
||||
try:
|
||||
size = os.path.getsize(path)
|
||||
st = os.stat(path)
|
||||
size = st.st_blocks * 512
|
||||
if os.path.isdir(path):
|
||||
for thisfile in os.listdir(path):
|
||||
filepath = os.path.join(path, thisfile)
|
||||
|
@@ -28,10 +28,13 @@ def must_close(errorfile = sys.stderr, wrap_verify = False):
|
||||
|
||||
@wrap_class_method
|
||||
def __del__(orig, self, *args, **kwargs):
|
||||
if "_must_close" in self.__dict__:
|
||||
fprintf(errorfile, "error: %s.close() wasn't called!\n",
|
||||
self.__class__.__name__)
|
||||
return orig(self, *args, **kwargs)
|
||||
try:
|
||||
if "_must_close" in self.__dict__:
|
||||
fprintf(errorfile, "error: %s.close() wasn't called!\n",
|
||||
self.__class__.__name__)
|
||||
return orig(self, *args, **kwargs)
|
||||
except: # pragma: no cover
|
||||
pass
|
||||
|
||||
@wrap_class_method
|
||||
def close(orig, self, *args, **kwargs):
|
||||
|
@@ -117,7 +117,10 @@ def serializer_proxy(obj_or_type):
|
||||
return ret
|
||||
|
||||
def __del__(self):
|
||||
self.__call_queue.put((None, None, None, None))
|
||||
self.__thread.join()
|
||||
try:
|
||||
self.__call_queue.put((None, None, None, None))
|
||||
self.__thread.join()
|
||||
except: # pragma: no cover
|
||||
pass
|
||||
|
||||
return SerializerObjectProxy(obj_or_type)
|
||||
|
@@ -87,7 +87,7 @@ def parse_time(toparse):
|
||||
try:
|
||||
return unix_to_timestamp(datetime_tz.datetime_tz.
|
||||
smartparse(toparse).totimestamp())
|
||||
except (ValueError, OverflowError):
|
||||
except (ValueError, OverflowError, TypeError):
|
||||
pass
|
||||
|
||||
# If it's parseable as a float, treat it as a Unix or NILM
|
||||
|
16
setup.py
16
setup.py
@@ -6,15 +6,6 @@
|
||||
# Then just package it up:
|
||||
# python setup.py sdist
|
||||
|
||||
# This is supposed to be using Distribute:
|
||||
#
|
||||
# distutils provides a "setup" method.
|
||||
# setuptools is a set of monkeypatches on top of that.
|
||||
# distribute is a particular version/implementation of setuptools.
|
||||
#
|
||||
# So we don't really know if this is using the old setuptools or the
|
||||
# Distribute-provided version of setuptools.
|
||||
|
||||
import traceback
|
||||
import sys
|
||||
import os
|
||||
@@ -109,7 +100,7 @@ setup(name='nilmdb',
|
||||
'coverage',
|
||||
'numpy',
|
||||
],
|
||||
setup_requires = [ 'distribute',
|
||||
setup_requires = [ 'setuptools',
|
||||
],
|
||||
install_requires = [ 'decorator',
|
||||
'cherrypy >= 3.2',
|
||||
@@ -117,7 +108,8 @@ setup(name='nilmdb',
|
||||
'python-dateutil',
|
||||
'pytz',
|
||||
'psutil >= 0.3.0',
|
||||
'requests >= 1.1.0, < 2.0.0',
|
||||
'requests >= 1.1.0',
|
||||
'progressbar >= 2.2',
|
||||
],
|
||||
packages = [ 'nilmdb',
|
||||
'nilmdb.utils',
|
||||
@@ -126,11 +118,13 @@ setup(name='nilmdb',
|
||||
'nilmdb.client',
|
||||
'nilmdb.cmdline',
|
||||
'nilmdb.scripts',
|
||||
'nilmdb.fsck',
|
||||
],
|
||||
entry_points = {
|
||||
'console_scripts': [
|
||||
'nilmtool = nilmdb.scripts.nilmtool:main',
|
||||
'nilmdb-server = nilmdb.scripts.nilmdb_server:main',
|
||||
'nilmdb-fsck = nilmdb.scripts.nilmdb_fsck:main',
|
||||
],
|
||||
},
|
||||
ext_modules = ext_modules,
|
||||
|
@@ -690,40 +690,15 @@ class TestClient(object):
|
||||
client.close()
|
||||
|
||||
def test_client_12_persistent(self):
|
||||
# Check that connections are persistent when they should be.
|
||||
# This is pretty hard to test; we have to poke deep into
|
||||
# the Requests library.
|
||||
# Check that connections are NOT persistent. Rather than trying
|
||||
# to verify this at the TCP level, just make sure that the response
|
||||
# contained a "Connection: close" header.
|
||||
with nilmdb.client.Client(url = testurl) as c:
|
||||
def connections():
|
||||
try:
|
||||
poolmanager = c.http._last_response.connection.poolmanager
|
||||
pool = poolmanager.pools[('http','localhost',32180)]
|
||||
return (pool.num_connections, pool.num_requests)
|
||||
except Exception:
|
||||
raise SkipTest("can't get connection info")
|
||||
|
||||
# First request makes a connection
|
||||
c.stream_create("/persist/test", "uint16_1")
|
||||
eq_(connections(), (1, 1))
|
||||
eq_(c.http._last_response.headers["Connection"], "close")
|
||||
|
||||
# Non-generator
|
||||
c.stream_list("/persist/test")
|
||||
eq_(connections(), (1, 2))
|
||||
c.stream_list("/persist/test")
|
||||
eq_(connections(), (1, 3))
|
||||
|
||||
# Generators
|
||||
for x in c.stream_intervals("/persist/test"):
|
||||
pass
|
||||
eq_(connections(), (1, 4))
|
||||
for x in c.stream_intervals("/persist/test"):
|
||||
pass
|
||||
eq_(connections(), (1, 5))
|
||||
|
||||
# Clean up
|
||||
c.stream_remove("/persist/test")
|
||||
c.stream_destroy("/persist/test")
|
||||
eq_(connections(), (1, 7))
|
||||
eq_(c.http._last_response.headers["Connection"], "close")
|
||||
|
||||
def test_client_13_timestamp_rounding(self):
|
||||
# Test potentially bad timestamps (due to floating point
|
||||
|
@@ -21,13 +21,17 @@ from testutil.helpers import *
|
||||
|
||||
testdb = "tests/cmdline-testdb"
|
||||
|
||||
def server_start(max_results = None, max_removals = None, bulkdata_args = {}):
|
||||
def server_start(max_results = None,
|
||||
max_removals = None,
|
||||
max_int_removals = None,
|
||||
bulkdata_args = {}):
|
||||
global test_server, test_db
|
||||
# Start web app on a custom port
|
||||
test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(
|
||||
testdb,
|
||||
max_results = max_results,
|
||||
max_removals = max_removals,
|
||||
max_int_removals = max_int_removals,
|
||||
bulkdata_args = bulkdata_args)
|
||||
test_server = nilmdb.server.Server(test_db, host = "127.0.0.1",
|
||||
port = 32180, stoppable = False,
|
||||
@@ -59,8 +63,7 @@ class TestCmdline(object):
|
||||
|
||||
def run(self, arg_string, infile=None, outfile=None):
|
||||
"""Run a cmdline client with the specified argument string,
|
||||
passing the given input. Returns a tuple with the output and
|
||||
exit code"""
|
||||
passing the given input. Save the output and exit code."""
|
||||
# printf("TZ=UTC ./nilmtool.py %s\n", arg_string)
|
||||
os.environ['NILMDB_URL'] = "http://localhost:32180/"
|
||||
class stdio_wrapper:
|
||||
@@ -160,6 +163,12 @@ class TestCmdline(object):
|
||||
self.ok("--help")
|
||||
self.contain("usage:")
|
||||
|
||||
# help
|
||||
self.ok("--version")
|
||||
ver = self.captured
|
||||
self.ok("list --version")
|
||||
eq_(self.captured, ver)
|
||||
|
||||
# fail for no args
|
||||
self.fail("")
|
||||
|
||||
@@ -285,6 +294,7 @@ class TestCmdline(object):
|
||||
self.ok("create /newton/zzz/rawnotch uint16_9")
|
||||
self.ok("create /newton/prep float32_8")
|
||||
self.ok("create /newton/raw uint16_6")
|
||||
self.ok("create /newton/raw~decim-1234 uint16_6")
|
||||
|
||||
# Create a stream that already exists
|
||||
self.fail("create /newton/raw uint16_6")
|
||||
@@ -300,13 +310,23 @@ class TestCmdline(object):
|
||||
self.fail("create /newton/zzz float32_8")
|
||||
self.contain("subdirs of this path already exist")
|
||||
|
||||
# Verify we got those 3 streams and they're returned in
|
||||
# Verify we got those 4 streams and they're returned in
|
||||
# alphabetical order.
|
||||
self.ok("list -l")
|
||||
self.match("/newton/prep float32_8\n"
|
||||
"/newton/raw uint16_6\n"
|
||||
"/newton/raw~decim-1234 uint16_6\n"
|
||||
"/newton/zzz/rawnotch uint16_9\n")
|
||||
|
||||
# No decimated streams if -n specified
|
||||
self.ok("list -n -l")
|
||||
self.match("/newton/prep float32_8\n"
|
||||
"/newton/raw uint16_6\n"
|
||||
"/newton/zzz/rawnotch uint16_9\n")
|
||||
|
||||
# Delete that decimated stream
|
||||
self.ok("destroy /newton/raw~decim-1234")
|
||||
|
||||
# Match just one type or one path. Also check
|
||||
# that --path is optional
|
||||
self.ok("list --layout /newton/raw")
|
||||
@@ -814,9 +834,12 @@ class TestCmdline(object):
|
||||
def test_13_files(self):
|
||||
# Test BulkData's ability to split into multiple files,
|
||||
# by forcing the file size to be really small.
|
||||
# Also increase the initial nrows, so that start/end positions
|
||||
# in the database are very large (> 32 bit)
|
||||
server_stop()
|
||||
server_start(bulkdata_args = { "file_size" : 920, # 23 rows per file
|
||||
"files_per_dir" : 3 })
|
||||
"files_per_dir" : 3,
|
||||
"initial_nrows" : 2**40 })
|
||||
|
||||
# Fill data
|
||||
self.ok("create /newton/prep float32_8")
|
||||
@@ -864,14 +887,28 @@ class TestCmdline(object):
|
||||
self.ok("destroy -R /newton/prep") # destroy again
|
||||
|
||||
def test_14_remove_files(self):
|
||||
# Test BulkData's ability to remove when data is split into
|
||||
# multiple files. Should be a fairly comprehensive test of
|
||||
# remove functionality.
|
||||
# Also limit max_removals, to cover more functionality.
|
||||
# Limit max_removals, to cover more functionality.
|
||||
server_stop()
|
||||
server_start(max_removals = 4321,
|
||||
bulkdata_args = { "file_size" : 920, # 23 rows per file
|
||||
"files_per_dir" : 3 })
|
||||
"files_per_dir" : 3,
|
||||
"initial_nrows" : 2**40 })
|
||||
self.do_remove_files()
|
||||
self.ok("destroy -R /newton/prep") # destroy again
|
||||
|
||||
def test_14b_remove_files_maxint(self):
|
||||
# Limit max_int_removals, to cover more functionality.
|
||||
server_stop()
|
||||
server_start(max_int_removals = 1,
|
||||
bulkdata_args = { "file_size" : 920, # 23 rows per file
|
||||
"files_per_dir" : 3,
|
||||
"initial_nrows" : 2**40 })
|
||||
self.do_remove_files()
|
||||
|
||||
def do_remove_files(self):
|
||||
# Test BulkData's ability to remove when data is split into
|
||||
# multiple files. Should be a fairly comprehensive test of
|
||||
# remove functionality.
|
||||
|
||||
# Insert data. Just for fun, insert out of order
|
||||
self.ok("create /newton/prep float32_8")
|
||||
|
@@ -1,5 +1,6 @@
|
||||
import nilmdb
|
||||
from nilmdb.utils.printf import *
|
||||
from nilmdb.utils import datetime_tz
|
||||
|
||||
from nose.tools import *
|
||||
from nose.tools import assert_raises
|
||||
@@ -19,6 +20,8 @@ class TestTimestamper(object):
|
||||
def join(list):
|
||||
return "\n".join(list) + "\n"
|
||||
|
||||
datetime_tz.localtz_set("America/New_York")
|
||||
|
||||
start = nilmdb.utils.time.parse_time("03/24/2012")
|
||||
lines_in = [ "hello", "world", "hello world", "# commented out" ]
|
||||
lines_out = [ "1332561600000000 hello",
|
||||
|
Reference in New Issue
Block a user