Compare commits
	
		
			34 Commits
		
	
	
		
			nilmdb-1.8
			...
			nilmdb-1.1
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| ab9a327130 | |||
| da72fc9777 | |||
| a01cb4132d | |||
| 7c3da2fe44 | |||
| f0e06dc436 | |||
| ddc0eb4264 | |||
| 0a22db3965 | |||
| 8bb8f068de | |||
| 416902097d | |||
| f5276e9fc8 | |||
| c47f28f93a | |||
| 63b5f99b90 | |||
| 7d7b89b52f | |||
| 8d249273c6 | |||
| abe431c663 | |||
| ccf1f695af | |||
| 06f7390c9e | |||
| 6de77a08f1 | |||
| 8db9771c20 | |||
| 04f815a24b | |||
| 6868f5f126 | |||
| ca0943ec19 | |||
| 68addb4e4a | |||
| 68c33b1f14 | |||
| 8dd8741100 | |||
| 8e6341ae5d | |||
| 422b1e2df2 | |||
| 0f745b3047 | |||
| 71cd7ed9b7 | |||
| a79d6104d5 | |||
| 8e8ec59e30 | |||
| b89b945a0f | |||
| bd7bdb2eb8 | |||
| 840cd2fd13 | 
| @@ -7,4 +7,4 @@ | |||||||
| exclude_lines = | exclude_lines = | ||||||
| 	pragma: no cover | 	pragma: no cover | ||||||
| 	if 0: | 	if 0: | ||||||
| omit = nilmdb/utils/datetime_tz*,nilmdb/scripts,nilmdb/_version.py | omit = nilmdb/utils/datetime_tz*,nilmdb/scripts,nilmdb/_version.py,nilmdb/fsck | ||||||
|   | |||||||
| @@ -8,7 +8,8 @@ Prerequisites: | |||||||
|  |  | ||||||
|   # Base NilmDB dependencies |   # Base NilmDB dependencies | ||||||
|   sudo apt-get install python-cherrypy3 python-decorator python-simplejson |   sudo apt-get install python-cherrypy3 python-decorator python-simplejson | ||||||
|   sudo apt-get install python-requests python-dateutil python-tz python-psutil |   sudo apt-get install python-requests python-dateutil python-tz | ||||||
|  |   sudo apt-get install python-progressbar python-psutil | ||||||
|  |  | ||||||
|   # Other dependencies (required by some modules) |   # Other dependencies (required by some modules) | ||||||
|   sudo apt-get install python-numpy |   sudo apt-get install python-numpy | ||||||
| @@ -26,6 +27,7 @@ Install: | |||||||
| Usage: | Usage: | ||||||
|  |  | ||||||
|   nilmdb-server --help |   nilmdb-server --help | ||||||
|  |   nilmdb-fsck --help | ||||||
|   nilmtool --help |   nilmtool --help | ||||||
|  |  | ||||||
| See docs/wsgi.md for info on setting up a WSGI application in Apache. | See docs/wsgi.md for info on setting up a WSGI application in Apache. | ||||||
|   | |||||||
| @@ -9,7 +9,7 @@ import requests | |||||||
|  |  | ||||||
| class HTTPClient(object): | class HTTPClient(object): | ||||||
|     """Class to manage and perform HTTP requests from the client""" |     """Class to manage and perform HTTP requests from the client""" | ||||||
|     def __init__(self, baseurl = "", post_json = False): |     def __init__(self, baseurl = "", post_json = False, verify_ssl = True): | ||||||
|         """If baseurl is supplied, all other functions that take |         """If baseurl is supplied, all other functions that take | ||||||
|         a URL can be given a relative URL instead.""" |         a URL can be given a relative URL instead.""" | ||||||
|         # Verify / clean up URL |         # Verify / clean up URL | ||||||
| @@ -18,9 +18,8 @@ class HTTPClient(object): | |||||||
|             reparsed = urlparse.urlparse("http://" + baseurl).geturl() |             reparsed = urlparse.urlparse("http://" + baseurl).geturl() | ||||||
|         self.baseurl = reparsed.rstrip('/') + '/' |         self.baseurl = reparsed.rstrip('/') + '/' | ||||||
|  |  | ||||||
|         # Build Requests session object, enable SSL verification |         # Note whether we want SSL verification | ||||||
|         self.session = requests.Session() |         self.verify_ssl = verify_ssl | ||||||
|         self.session.verify = True |  | ||||||
|  |  | ||||||
|         # Saved response, so that tests can verify a few things. |         # Saved response, so that tests can verify a few things. | ||||||
|         self._last_response = {} |         self._last_response = {} | ||||||
| @@ -58,16 +57,34 @@ class HTTPClient(object): | |||||||
|                 raise Error(**args) |                 raise Error(**args) | ||||||
|  |  | ||||||
|     def close(self): |     def close(self): | ||||||
|         self.session.close() |         pass | ||||||
|  |  | ||||||
|     def _do_req(self, method, url, query_data, body_data, stream, headers): |     def _do_req(self, method, url, query_data, body_data, stream, headers): | ||||||
|         url = urlparse.urljoin(self.baseurl, url) |         url = urlparse.urljoin(self.baseurl, url) | ||||||
|         try: |         try: | ||||||
|             response = self.session.request(method, url, |             # Create a new session, ensure we send "Connection: close", | ||||||
|                                             params = query_data, |             # and explicitly close connection after the transfer. | ||||||
|                                             data = body_data, |             # This is to avoid HTTP/1.1 persistent connections | ||||||
|                                             stream = stream, |             # (keepalive), because they have fundamental race | ||||||
|                                             headers = headers) |             # conditions when there are delays between requests: | ||||||
|  |             # a new request may be sent at the same instant that the | ||||||
|  |             # server decides to timeout the connection. | ||||||
|  |             session = requests.Session() | ||||||
|  |             if headers is None: | ||||||
|  |                 headers = {} | ||||||
|  |             headers["Connection"] = "close" | ||||||
|  |             response = session.request(method, url, | ||||||
|  |                                        params = query_data, | ||||||
|  |                                        data = body_data, | ||||||
|  |                                        stream = stream, | ||||||
|  |                                        headers = headers, | ||||||
|  |                                        verify = self.verify_ssl) | ||||||
|  |  | ||||||
|  |             # Close the connection.  If it's a generator (stream = | ||||||
|  |             # True), the requests library shouldn't actually close the | ||||||
|  |             # HTTP connection until all data has been read from the | ||||||
|  |             # response. | ||||||
|  |             session.close() | ||||||
|         except requests.RequestException as e: |         except requests.RequestException as e: | ||||||
|             raise ServerError(status = "502 Error", url = url, |             raise ServerError(status = "502 Error", url = url, | ||||||
|                               message = str(e.message)) |                               message = str(e.message)) | ||||||
|   | |||||||
| @@ -19,9 +19,8 @@ except ImportError: # pragma: no cover | |||||||
|  |  | ||||||
| # Valid subcommands.  Defined in separate files just to break | # Valid subcommands.  Defined in separate files just to break | ||||||
| # things up -- they're still called with Cmdline as self. | # things up -- they're still called with Cmdline as self. | ||||||
| subcommands = [ "help", "info", "create", "list", "metadata", | subcommands = [ "help", "info", "create", "rename", "list", "intervals", | ||||||
|                 "insert", "extract", "remove", "destroy", |                 "metadata", "insert", "extract", "remove", "destroy" ] | ||||||
|                 "intervals", "rename" ] |  | ||||||
|  |  | ||||||
| # Import the subcommand modules | # Import the subcommand modules | ||||||
| subcmd_mods = {} | subcmd_mods = {} | ||||||
| @@ -29,6 +28,14 @@ for cmd in subcommands: | |||||||
|     subcmd_mods[cmd] = __import__("nilmdb.cmdline." + cmd, fromlist = [ cmd ]) |     subcmd_mods[cmd] = __import__("nilmdb.cmdline." + cmd, fromlist = [ cmd ]) | ||||||
|  |  | ||||||
| class JimArgumentParser(argparse.ArgumentParser): | class JimArgumentParser(argparse.ArgumentParser): | ||||||
|  |     def parse_args(self, args=None, namespace=None): | ||||||
|  |         # Look for --version anywhere and change it to just "nilmtool | ||||||
|  |         # --version".  This makes "nilmtool cmd --version" work, which | ||||||
|  |         # is needed by help2man. | ||||||
|  |         if "--version" in (args or sys.argv[1:]): | ||||||
|  |             args = [ "--version" ] | ||||||
|  |         return argparse.ArgumentParser.parse_args(self, args, namespace) | ||||||
|  |  | ||||||
|     def error(self, message): |     def error(self, message): | ||||||
|         self.print_usage(sys.stderr) |         self.print_usage(sys.stderr) | ||||||
|         self.exit(2, sprintf("error: %s\n", message)) |         self.exit(2, sprintf("error: %s\n", message)) | ||||||
| @@ -114,7 +121,7 @@ class Cmdline(object): | |||||||
|         group = self.parser.add_argument_group("General options") |         group = self.parser.add_argument_group("General options") | ||||||
|         group.add_argument("-h", "--help", action='help', |         group.add_argument("-h", "--help", action='help', | ||||||
|                            help='show this help message and exit') |                            help='show this help message and exit') | ||||||
|         group.add_argument("-V", "--version", action="version", |         group.add_argument("-v", "--version", action="version", | ||||||
|                            version = nilmdb.__version__) |                            version = nilmdb.__version__) | ||||||
|  |  | ||||||
|         group = self.parser.add_argument_group("Server") |         group = self.parser.add_argument_group("Server") | ||||||
|   | |||||||
| @@ -1,5 +1,6 @@ | |||||||
| from nilmdb.utils.printf import * | from nilmdb.utils.printf import * | ||||||
| import nilmdb.utils.time | import nilmdb.utils.time | ||||||
|  | from nilmdb.utils.interval import Interval | ||||||
|  |  | ||||||
| import fnmatch | import fnmatch | ||||||
| import argparse | import argparse | ||||||
| @@ -42,6 +43,8 @@ def setup(self, sub): | |||||||
|     group = cmd.add_argument_group("Misc options") |     group = cmd.add_argument_group("Misc options") | ||||||
|     group.add_argument("-T", "--timestamp-raw", action="store_true", |     group.add_argument("-T", "--timestamp-raw", action="store_true", | ||||||
|                        help="Show raw timestamps when printing times") |                        help="Show raw timestamps when printing times") | ||||||
|  |     group.add_argument("-o", "--optimize", action="store_true", | ||||||
|  |                        help="Optimize (merge adjacent) intervals") | ||||||
|  |  | ||||||
|     return cmd |     return cmd | ||||||
|  |  | ||||||
| @@ -58,9 +61,16 @@ def cmd_intervals(self): | |||||||
|         time_string = nilmdb.utils.time.timestamp_to_human |         time_string = nilmdb.utils.time.timestamp_to_human | ||||||
|  |  | ||||||
|     try: |     try: | ||||||
|            for (start, end) in self.client.stream_intervals( |         intervals = ( Interval(start, end) for (start, end) in | ||||||
|                self.args.path, self.args.start, self.args.end, self.args.diff): |                       self.client.stream_intervals(self.args.path, | ||||||
|                printf("[ %s -> %s ]\n", time_string(start), time_string(end)) |                                                    self.args.start, | ||||||
|  |                                                    self.args.end, | ||||||
|  |                                                    self.args.diff) ) | ||||||
|  |         if self.args.optimize: | ||||||
|  |             intervals = nilmdb.utils.interval.optimize(intervals) | ||||||
|  |         for i in intervals: | ||||||
|  |             printf("[ %s -> %s ]\n", time_string(i.start), time_string(i.end)) | ||||||
|  |  | ||||||
|     except nilmdb.client.ClientError as e: |     except nilmdb.client.ClientError as e: | ||||||
|         self.die("error listing intervals: %s", str(e)) |         self.die("error listing intervals: %s", str(e)) | ||||||
|  |  | ||||||
|   | |||||||
| @@ -45,6 +45,8 @@ def setup(self, sub): | |||||||
|                        help="Show raw timestamps when printing times") |                        help="Show raw timestamps when printing times") | ||||||
|     group.add_argument("-l", "--layout", action="store_true", |     group.add_argument("-l", "--layout", action="store_true", | ||||||
|                        help="Show layout type next to path name") |                        help="Show layout type next to path name") | ||||||
|  |     group.add_argument("-n", "--no-decim", action="store_true", | ||||||
|  |                        help="Skip paths containing \"~decim-\"") | ||||||
|  |  | ||||||
|     return cmd |     return cmd | ||||||
|  |  | ||||||
| @@ -71,6 +73,8 @@ def cmd_list(self): | |||||||
|             (path, layout, int_min, int_max, rows, time) = stream[:6] |             (path, layout, int_min, int_max, rows, time) = stream[:6] | ||||||
|             if not fnmatch.fnmatch(path, argpath): |             if not fnmatch.fnmatch(path, argpath): | ||||||
|                 continue |                 continue | ||||||
|  |             if self.args.no_decim and "~decim-" in path: | ||||||
|  |                 continue | ||||||
|  |  | ||||||
|             if self.args.layout: |             if self.args.layout: | ||||||
|                 printf("%s %s\n", path, layout) |                 printf("%s %s\n", path, layout) | ||||||
|   | |||||||
							
								
								
									
										5
									
								
								nilmdb/fsck/__init__.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										5
									
								
								nilmdb/fsck/__init__.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,5 @@ | |||||||
|  | """nilmdb.fsck""" | ||||||
|  |  | ||||||
|  | from __future__ import absolute_import | ||||||
|  |  | ||||||
|  | from nilmdb.fsck.fsck import Fsck | ||||||
							
								
								
									
										460
									
								
								nilmdb/fsck/fsck.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										460
									
								
								nilmdb/fsck/fsck.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,460 @@ | |||||||
|  | # -*- coding: utf-8 -*- | ||||||
|  |  | ||||||
|  | """Check database consistency, with some ability to fix problems. | ||||||
|  | This should be able to fix cases where a database gets corrupted due | ||||||
|  | to unexpected system shutdown, and detect other cases that may cause | ||||||
|  | NilmDB to return errors when trying to manipulate the database.""" | ||||||
|  |  | ||||||
|  | import nilmdb.utils | ||||||
|  | import nilmdb.server | ||||||
|  | import nilmdb.client.numpyclient | ||||||
|  | from nilmdb.utils.interval import IntervalError | ||||||
|  | from nilmdb.server.interval import Interval, IntervalSet | ||||||
|  | from nilmdb.utils.printf import * | ||||||
|  | from nilmdb.utils.time import timestamp_to_string | ||||||
|  |  | ||||||
|  | from collections import defaultdict | ||||||
|  | import sqlite3 | ||||||
|  | import os | ||||||
|  | import sys | ||||||
|  | import progressbar | ||||||
|  | import re | ||||||
|  | import time | ||||||
|  | import shutil | ||||||
|  | import cPickle as pickle | ||||||
|  | import numpy | ||||||
|  |  | ||||||
|  | class FsckError(Exception): | ||||||
|  |     def __init__(self, msg = "", *args): | ||||||
|  |         if args: | ||||||
|  |             msg = sprintf(msg, *args) | ||||||
|  |         Exception.__init__(self, msg) | ||||||
|  | class FixableFsckError(FsckError): | ||||||
|  |     def __init__(self, msg = "", *args): | ||||||
|  |         if args: | ||||||
|  |             msg = sprintf(msg, *args) | ||||||
|  |         FsckError.__init__(self, "%s\nThis may be fixable with \"--fix\".", msg) | ||||||
|  | class RetryFsck(FsckError): | ||||||
|  |     pass | ||||||
|  |  | ||||||
|  | def log(format, *args): | ||||||
|  |     printf(format, *args) | ||||||
|  |  | ||||||
|  | def err(format, *args): | ||||||
|  |     fprintf(sys.stderr, format, *args) | ||||||
|  |  | ||||||
|  | # Decorator that retries a function if it returns a specific value | ||||||
|  | def retry_if_raised(exc, message = None, max_retries = 100): | ||||||
|  |     def f1(func): | ||||||
|  |         def f2(*args, **kwargs): | ||||||
|  |             for n in range(max_retries): | ||||||
|  |                 try: | ||||||
|  |                     return func(*args, **kwargs) | ||||||
|  |                 except exc as e: | ||||||
|  |                     if message: | ||||||
|  |                         log("%s\n\n", message) | ||||||
|  |             raise Exception("Max number of retries (%d) exceeded; giving up") | ||||||
|  |         return f2 | ||||||
|  |     return f1 | ||||||
|  |  | ||||||
|  | class Progress(object): | ||||||
|  |     def __init__(self, maxval): | ||||||
|  |         if maxval == 0: | ||||||
|  |             maxval = 1 | ||||||
|  |         self.bar = progressbar.ProgressBar( | ||||||
|  |             maxval = maxval, | ||||||
|  |             widgets = [ progressbar.Percentage(), ' ', | ||||||
|  |                         progressbar.Bar(), ' ', | ||||||
|  |                         progressbar.ETA() ]) | ||||||
|  |         if self.bar.term_width == 0: | ||||||
|  |             self.bar.term_width = 75 | ||||||
|  |     def __enter__(self): | ||||||
|  |         self.bar.start() | ||||||
|  |         self.last_update = 0 | ||||||
|  |         return self | ||||||
|  |     def __exit__(self, exc_type, exc_value, traceback): | ||||||
|  |         if exc_type is None: | ||||||
|  |             self.bar.finish() | ||||||
|  |         else: | ||||||
|  |             printf("\n") | ||||||
|  |     def update(self, val): | ||||||
|  |         self.bar.update(val) | ||||||
|  |  | ||||||
|  | class Fsck(object): | ||||||
|  |  | ||||||
|  |     def __init__(self, path, fix = False): | ||||||
|  |         self.basepath = path | ||||||
|  |         self.sqlpath = os.path.join(path, "data.sql") | ||||||
|  |         self.bulkpath = os.path.join(path, "data") | ||||||
|  |         self.bulklock = os.path.join(path, "data.lock") | ||||||
|  |         self.fix = fix | ||||||
|  |  | ||||||
|  |     ### Main checks | ||||||
|  |  | ||||||
|  |     @retry_if_raised(RetryFsck, "Something was fixed: restarting fsck") | ||||||
|  |     def check(self, skip_data = False): | ||||||
|  |         self.bulk = None | ||||||
|  |         self.sql = None | ||||||
|  |         try: | ||||||
|  |             self.check_paths() | ||||||
|  |             self.check_sql() | ||||||
|  |             self.check_streams() | ||||||
|  |             self.check_intervals() | ||||||
|  |             if skip_data: | ||||||
|  |                 log("skipped data check\n") | ||||||
|  |             else: | ||||||
|  |                 self.check_data() | ||||||
|  |         finally: | ||||||
|  |             if self.bulk: | ||||||
|  |                 self.bulk.close() | ||||||
|  |             if self.sql: | ||||||
|  |                 self.sql.commit() | ||||||
|  |                 self.sql.close() | ||||||
|  |         log("ok\n") | ||||||
|  |  | ||||||
|  |     ### Check basic path structure | ||||||
|  |  | ||||||
|  |     def check_paths(self): | ||||||
|  |         log("checking paths\n") | ||||||
|  |         if self.bulk: | ||||||
|  |             self.bulk.close() | ||||||
|  |         if not os.path.isfile(self.sqlpath): | ||||||
|  |             raise FsckError("SQL database missing (%s)", self.sqlpath) | ||||||
|  |         if not os.path.isdir(self.bulkpath): | ||||||
|  |             raise FsckError("Bulk data directory missing (%s)", self.bulkpath) | ||||||
|  |         with open(self.bulklock, "w") as lockfile: | ||||||
|  |             if not nilmdb.utils.lock.exclusive_lock(lockfile): | ||||||
|  |                 raise FsckError('Database already locked by another process\n' | ||||||
|  |                                 'Make sure all other processes that might be ' | ||||||
|  |                                 'using the database are stopped.\n' | ||||||
|  |                                 'Restarting apache will cause it to unlock ' | ||||||
|  |                                 'the db until a request is received.') | ||||||
|  |             # unlocked immediately | ||||||
|  |         self.bulk = nilmdb.server.bulkdata.BulkData(self.basepath) | ||||||
|  |  | ||||||
|  |     ### Check SQL database health | ||||||
|  |  | ||||||
|  |     def check_sql(self): | ||||||
|  |         log("checking sqlite database\n") | ||||||
|  |  | ||||||
|  |         self.sql = sqlite3.connect(self.sqlpath) | ||||||
|  |         with self.sql: | ||||||
|  |             cur = self.sql.cursor() | ||||||
|  |             ver = cur.execute("PRAGMA user_version").fetchone()[0] | ||||||
|  |             good = max(nilmdb.server.nilmdb._sql_schema_updates.keys()) | ||||||
|  |             if ver != good: | ||||||
|  |                 raise FsckError("database version %d too old, should be %d", | ||||||
|  |                                 ver, good) | ||||||
|  |             self.stream_path = {} | ||||||
|  |             self.stream_layout = {} | ||||||
|  |             log("  loading paths\n") | ||||||
|  |             result = cur.execute("SELECT id, path, layout FROM streams") | ||||||
|  |             for r in result: | ||||||
|  |                 if r[0] in self.stream_path: | ||||||
|  |                     raise FsckError("duplicated ID %d in stream IDs", r[0]) | ||||||
|  |                 self.stream_path[r[0]] = r[1] | ||||||
|  |                 self.stream_layout[r[0]] = r[2] | ||||||
|  |  | ||||||
|  |             log("  loading intervals\n") | ||||||
|  |             self.stream_interval = defaultdict(list) | ||||||
|  |             result = cur.execute("SELECT stream_id, start_time, end_time, " | ||||||
|  |                                  "start_pos, end_pos FROM ranges " | ||||||
|  |                                  "ORDER BY start_time") | ||||||
|  |             for r in result: | ||||||
|  |                 if r[0] not in self.stream_path: | ||||||
|  |                     raise FsckError("interval ID %d not in streams", k) | ||||||
|  |                 self.stream_interval[r[0]].append((r[1], r[2], r[3], r[4])) | ||||||
|  |  | ||||||
|  |             log("  loading metadata\n") | ||||||
|  |             self.stream_meta = defaultdict(dict) | ||||||
|  |             result = cur.execute("SELECT stream_id, key, value FROM metadata") | ||||||
|  |             for r in result: | ||||||
|  |                 if r[0] not in self.stream_path: | ||||||
|  |                     raise FsckError("metadata ID %d not in streams", k) | ||||||
|  |                 if r[1] in self.stream_meta[r[0]]: | ||||||
|  |                     raise FsckError("duplicate metadata key '%s' for stream %d", | ||||||
|  |                                     r[1], r[0]) | ||||||
|  |                 self.stream_meta[r[0]][r[1]] = r[2] | ||||||
|  |  | ||||||
|  |     ### Check streams and basic interval overlap | ||||||
|  |  | ||||||
|  |     def check_streams(self): | ||||||
|  |         ids = self.stream_path.keys() | ||||||
|  |         log("checking %s streams\n", "{:,d}".format(len(ids))) | ||||||
|  |         with Progress(len(ids)) as pbar: | ||||||
|  |             for i, sid in enumerate(ids): | ||||||
|  |                 pbar.update(i) | ||||||
|  |                 path = self.stream_path[sid] | ||||||
|  |  | ||||||
|  |                 # unique path, valid layout | ||||||
|  |                 if self.stream_path.values().count(path) != 1: | ||||||
|  |                     raise FsckError("duplicated path %s", path) | ||||||
|  |                 layout = self.stream_layout[sid].split('_')[0] | ||||||
|  |                 if layout not in ('int8', 'int16', 'int32', 'int64', | ||||||
|  |                                   'uint8', 'uint16', 'uint32', 'uint64', | ||||||
|  |                                   'float32', 'float64'): | ||||||
|  |                     raise FsckError("bad layout %s for %s", layout, path) | ||||||
|  |                 count = int(self.stream_layout[sid].split('_')[1]) | ||||||
|  |                 if count < 1 or count > 1024: | ||||||
|  |                     raise FsckError("bad count %d for %s", count, path) | ||||||
|  |  | ||||||
|  |                 # must exist in bulkdata | ||||||
|  |                 bulk = self.bulkpath + path | ||||||
|  |                 if not os.path.isdir(bulk): | ||||||
|  |                     raise FsckError("%s: missing bulkdata dir", path) | ||||||
|  |                 if not nilmdb.server.bulkdata.Table.exists(bulk): | ||||||
|  |                     raise FsckError("%s: bad bulkdata table", path) | ||||||
|  |  | ||||||
|  |                 # intervals don't overlap.  Abuse IntervalSet to check | ||||||
|  |                 # for intervals in file positions, too. | ||||||
|  |                 timeiset = IntervalSet() | ||||||
|  |                 posiset = IntervalSet() | ||||||
|  |                 for (stime, etime, spos, epos) in self.stream_interval[sid]: | ||||||
|  |                     new = Interval(stime, etime) | ||||||
|  |                     try: | ||||||
|  |                         timeiset += new | ||||||
|  |                     except IntervalError: | ||||||
|  |                         raise FsckError("%s: overlap in intervals:\n" | ||||||
|  |                                         "set: %s\nnew: %s", | ||||||
|  |                                         path, str(timeiset), str(new)) | ||||||
|  |                     if spos != epos: | ||||||
|  |                         new = Interval(spos, epos) | ||||||
|  |                         try: | ||||||
|  |                             posiset += new | ||||||
|  |                         except IntervalError: | ||||||
|  |                             raise FsckError("%s: overlap in file offsets:\n" | ||||||
|  |                                             "set: %s\nnew: %s", | ||||||
|  |                                             path, str(posiset), str(new)) | ||||||
|  |  | ||||||
|  |                 # check bulkdata | ||||||
|  |                 self.check_bulkdata(sid, path, bulk) | ||||||
|  |  | ||||||
|  |                 # Check that we can open bulkdata | ||||||
|  |                 try: | ||||||
|  |                     tab = None | ||||||
|  |                     try: | ||||||
|  |                         tab = nilmdb.server.bulkdata.Table(bulk) | ||||||
|  |                     except Exception as e: | ||||||
|  |                         raise FsckError("%s: can't open bulkdata: %s", | ||||||
|  |                                         path, str(e)) | ||||||
|  |                 finally: | ||||||
|  |                     if tab: | ||||||
|  |                         tab.close() | ||||||
|  |  | ||||||
|  |     ### Check that bulkdata is good enough to be opened | ||||||
|  |  | ||||||
|  |     @retry_if_raised(RetryFsck) | ||||||
|  |     def check_bulkdata(self, sid, path, bulk): | ||||||
|  |         with open(os.path.join(bulk, "_format"), "rb") as f: | ||||||
|  |             fmt = pickle.load(f) | ||||||
|  |         if fmt["version"] != 3: | ||||||
|  |             raise FsckError("%s: bad or unsupported bulkdata version %d", | ||||||
|  |                             path, fmt["version"]) | ||||||
|  |         row_per_file = int(fmt["rows_per_file"]) | ||||||
|  |         files_per_dir = int(fmt["files_per_dir"]) | ||||||
|  |         layout = fmt["layout"] | ||||||
|  |         if layout != self.stream_layout[sid]: | ||||||
|  |             raise FsckError("%s: layout mismatch %s != %s", path, | ||||||
|  |                             layout, self.stream_layout[sid]) | ||||||
|  |  | ||||||
|  |         # Every file should have a size that's the multiple of the row size | ||||||
|  |         rkt = nilmdb.server.rocket.Rocket(layout, None) | ||||||
|  |         row_size = rkt.binary_size | ||||||
|  |         rkt.close() | ||||||
|  |  | ||||||
|  |         # Find all directories | ||||||
|  |         regex = re.compile("^[0-9a-f]{4,}$") | ||||||
|  |         subdirs = sorted(filter(regex.search, os.listdir(bulk)), | ||||||
|  |                          key = lambda x: int(x, 16), reverse = True) | ||||||
|  |         for subdir in subdirs: | ||||||
|  |             # Find all files in that dir | ||||||
|  |             subpath = os.path.join(bulk, subdir) | ||||||
|  |             files = filter(regex.search, os.listdir(subpath)) | ||||||
|  |             if not files: | ||||||
|  |                 self.fix_empty_subdir(subpath) | ||||||
|  |                 raise RetryFsck | ||||||
|  |             # Verify that their size is a multiple of the row size | ||||||
|  |             for filename in files: | ||||||
|  |                 filepath = os.path.join(subpath, filename) | ||||||
|  |                 offset = os.path.getsize(filepath) | ||||||
|  |                 if offset % row_size: | ||||||
|  |                     self.fix_bad_filesize(path, filepath, offset, row_size) | ||||||
|  |  | ||||||
|  |     def fix_empty_subdir(self, subpath): | ||||||
|  |         msg = sprintf("bulkdata path %s is missing data files", subpath) | ||||||
|  |         if not self.fix: | ||||||
|  |             raise FixableFsckError(msg) | ||||||
|  |         # Try to fix it by just deleting whatever is present, | ||||||
|  |         # as long as it's only ".removed" files. | ||||||
|  |         err("\n%s\n", msg) | ||||||
|  |         for fn in os.listdir(subpath): | ||||||
|  |             if not fn.endswith(".removed"): | ||||||
|  |                 raise FsckError("can't fix automatically: please manually " | ||||||
|  |                                 "remove the file %s and try again", | ||||||
|  |                                 os.path.join(subpath, fn)) | ||||||
|  |         # Remove the whole thing | ||||||
|  |         err("Removing empty subpath\n") | ||||||
|  |         shutil.rmtree(subpath) | ||||||
|  |         raise RetryFsck | ||||||
|  |  | ||||||
|  |     def fix_bad_filesize(self, path, filepath, offset, row_size): | ||||||
|  |         extra = offset % row_size | ||||||
|  |         msg = sprintf("%s: size of file %s (%d) is not a multiple" + | ||||||
|  |                       " of row size (%d): %d extra bytes present", | ||||||
|  |                       path, filepath, offset, row_size, extra) | ||||||
|  |         if not self.fix: | ||||||
|  |             raise FixableFsckError(msg) | ||||||
|  |         # Try to fix it by just truncating the file | ||||||
|  |         err("\n%s\n", msg) | ||||||
|  |         newsize = offset - extra | ||||||
|  |         err("Truncating file to %d bytes and retrying\n", newsize) | ||||||
|  |         with open(filepath, "r+b") as f: | ||||||
|  |             f.truncate(newsize) | ||||||
|  |             raise RetryFsck | ||||||
|  |  | ||||||
|  |     ### Check interval endpoints | ||||||
|  |  | ||||||
|  |     def check_intervals(self): | ||||||
|  |         total_ints = sum(len(x) for x in self.stream_interval.values()) | ||||||
|  |         log("checking %s intervals\n", "{:,d}".format(total_ints)) | ||||||
|  |         done = 0 | ||||||
|  |         with Progress(total_ints) as pbar: | ||||||
|  |             for sid in self.stream_interval: | ||||||
|  |                 try: | ||||||
|  |                     bulk = self.bulkpath + self.stream_path[sid] | ||||||
|  |                     tab = nilmdb.server.bulkdata.Table(bulk) | ||||||
|  |                     def update(x): | ||||||
|  |                         pbar.update(done + x) | ||||||
|  |                     ints = self.stream_interval[sid] | ||||||
|  |                     done += self.check_table_intervals(sid, ints, tab, update) | ||||||
|  |                 finally: | ||||||
|  |                     tab.close() | ||||||
|  |  | ||||||
|  |     def check_table_intervals(self, sid, ints, tab, update): | ||||||
|  |         # look in the table to make sure we can pick out the interval's | ||||||
|  |         # endpoints | ||||||
|  |         path = self.stream_path[sid] | ||||||
|  |         tab.file_open.cache_remove_all() | ||||||
|  |         for (i, intv) in enumerate(ints): | ||||||
|  |             update(i) | ||||||
|  |             (stime, etime, spos, epos) = intv | ||||||
|  |             if spos == epos and spos >= 0 and spos <= tab.nrows: | ||||||
|  |                 continue | ||||||
|  |             try: | ||||||
|  |                 srow = tab[spos] | ||||||
|  |                 erow = tab[epos-1] | ||||||
|  |             except Exception as e: | ||||||
|  |                 self.fix_bad_interval(sid, intv, tab, str(e)) | ||||||
|  |                 raise RetryFsck | ||||||
|  |         return len(ints) | ||||||
|  |  | ||||||
|  |     def fix_bad_interval(self, sid, intv, tab, msg): | ||||||
|  |         path = self.stream_path[sid] | ||||||
|  |         msg = sprintf("%s: interval %s error accessing rows: %s", | ||||||
|  |                       path, str(intv), str(msg)) | ||||||
|  |         if not self.fix: | ||||||
|  |             raise FixableFsckError(msg) | ||||||
|  |         err("\n%s\n", msg) | ||||||
|  |  | ||||||
|  |         (stime, etime, spos, epos) = intv | ||||||
|  |         # If it's just that the end pos is more than the number of rows | ||||||
|  |         # in the table, lower end pos and truncate interval time too. | ||||||
|  |         if spos < tab.nrows and epos >= tab.nrows: | ||||||
|  |             err("end position is past endrows, but it can be truncated\n") | ||||||
|  |             err("old end: time %d, pos %d\n", etime, epos) | ||||||
|  |             new_epos = tab.nrows | ||||||
|  |             new_etime = tab[new_epos-1] + 1 | ||||||
|  |             err("new end: time %d, pos %d\n", new_etime, new_epos) | ||||||
|  |             if stime < new_etime: | ||||||
|  |                 # Change it in SQL | ||||||
|  |                 with self.sql: | ||||||
|  |                     cur = self.sql.cursor() | ||||||
|  |                     cur.execute("UPDATE ranges SET end_time=?, end_pos=? " | ||||||
|  |                                 "WHERE stream_id=? AND start_time=? AND " | ||||||
|  |                                 "end_time=? AND start_pos=? AND end_pos=?", | ||||||
|  |                                 (new_etime, new_epos, sid, stime, etime, | ||||||
|  |                                  spos, epos)) | ||||||
|  |                     if cur.rowcount != 1: | ||||||
|  |                         raise FsckError("failed to fix SQL database") | ||||||
|  |                 raise RetryFsck | ||||||
|  |             err("actually it can't be truncated; times are bad too") | ||||||
|  |  | ||||||
|  |         # Otherwise, the only hope is to delete the interval entirely. | ||||||
|  |         err("*** Deleting the entire interval from SQL.\n") | ||||||
|  |         err("This may leave stale data on disk.  To fix that, copy all\n") | ||||||
|  |         err("data from this stream to a new stream, then remove all data\n") | ||||||
|  |         err("from and destroy %s.\n", path) | ||||||
|  |         with self.sql: | ||||||
|  |             cur = self.sql.cursor() | ||||||
|  |             cur.execute("DELETE FROM ranges WHERE " | ||||||
|  |                         "stream_id=? AND start_time=? AND " | ||||||
|  |                         "end_time=? AND start_pos=? AND end_pos=?", | ||||||
|  |                         (sid, stime, etime, spos, epos)) | ||||||
|  |             if cur.rowcount != 1: | ||||||
|  |                 raise FsckError("failed to remove interval") | ||||||
|  |         raise RetryFsck | ||||||
|  |  | ||||||
|  |     ### Check data in each interval | ||||||
|  |  | ||||||
|  |     def check_data(self): | ||||||
|  |         total_rows = sum(sum((y[3] - y[2]) for y in x) | ||||||
|  |                          for x in self.stream_interval.values()) | ||||||
|  |         log("checking %s rows of data\n", "{:,d}".format(total_rows)) | ||||||
|  |         done = 0 | ||||||
|  |         with Progress(total_rows) as pbar: | ||||||
|  |             for sid in self.stream_interval: | ||||||
|  |                 try: | ||||||
|  |                     bulk = self.bulkpath + self.stream_path[sid] | ||||||
|  |                     tab = nilmdb.server.bulkdata.Table(bulk) | ||||||
|  |                     def update(x): | ||||||
|  |                         pbar.update(done + x) | ||||||
|  |                     ints = self.stream_interval[sid] | ||||||
|  |                     done += self.check_table_data(sid, ints, tab, update) | ||||||
|  |                 finally: | ||||||
|  |                     tab.close() | ||||||
|  |  | ||||||
|  |     def check_table_data(self, sid, ints, tab, update): | ||||||
|  |         # Pull out all of the interval's data and verify that it's | ||||||
|  |         # monotonic. | ||||||
|  |         maxrows = 100000 | ||||||
|  |         path = self.stream_path[sid] | ||||||
|  |         layout = self.stream_layout[sid] | ||||||
|  |         dtype = nilmdb.client.numpyclient.layout_to_dtype(layout) | ||||||
|  |         tab.file_open.cache_remove_all() | ||||||
|  |         done = 0 | ||||||
|  |         for intv in ints: | ||||||
|  |             last_ts = None | ||||||
|  |             (stime, etime, spos, epos) = intv | ||||||
|  |             if spos == epos: | ||||||
|  |                 continue | ||||||
|  |             for start in xrange(*slice(spos, epos, maxrows).indices(epos)): | ||||||
|  |                 stop = min(start + maxrows, epos) | ||||||
|  |                 count = stop - start | ||||||
|  |                 # Get raw data, convert to NumPy arary | ||||||
|  |                 try: | ||||||
|  |                     raw = tab.get_data(start, stop, binary = True) | ||||||
|  |                     data = numpy.fromstring(raw, dtype) | ||||||
|  |                 except Exception as e: | ||||||
|  |                     raise FsckError("%s: failed to grab rows %d through %d: %s", | ||||||
|  |                                     path, start, stop, repr(e)) | ||||||
|  |  | ||||||
|  |                 # Verify that timestamps are monotonic | ||||||
|  |                 if (numpy.diff(data['timestamp']) <= 0).any(): | ||||||
|  |                     raise FsckError("%s: non-monotonic timestamp(s) in rows " | ||||||
|  |                                     "%d through %d", path, start, stop) | ||||||
|  |                 first_ts = data['timestamp'][0] | ||||||
|  |                 if last_ts is not None and first_ts <= last_ts: | ||||||
|  |                     raise FsckError("%s: first interval timestamp %d is not " | ||||||
|  |                                     "greater than the previous last interval " | ||||||
|  |                                     "timestamp %d, at row %d", | ||||||
|  |                                     path, first_ts, last_ts, start) | ||||||
|  |                 last_ts = data['timestamp'][-1] | ||||||
|  |  | ||||||
|  |                 # These are probably fixable, by removing the offending | ||||||
|  |                 # intervals.  But I'm not going to bother implementing | ||||||
|  |                 # that yet. | ||||||
|  |  | ||||||
|  |                 # Done | ||||||
|  |                 done += count | ||||||
|  |                 update(done) | ||||||
|  |         return done | ||||||
							
								
								
									
										26
									
								
								nilmdb/scripts/nilmdb_fsck.py
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										26
									
								
								nilmdb/scripts/nilmdb_fsck.py
									
									
									
									
									
										Executable file
									
								
							| @@ -0,0 +1,26 @@ | |||||||
|  | #!/usr/bin/python | ||||||
|  |  | ||||||
|  | import nilmdb.fsck | ||||||
|  | import argparse | ||||||
|  | import os | ||||||
|  | import sys | ||||||
|  |  | ||||||
|  | def main(): | ||||||
|  |     """Main entry point for the 'nilmdb-fsck' command line script""" | ||||||
|  |  | ||||||
|  |     parser = argparse.ArgumentParser( | ||||||
|  |         description = 'Check database consistency', | ||||||
|  |         formatter_class = argparse.ArgumentDefaultsHelpFormatter, | ||||||
|  |         version = nilmdb.__version__) | ||||||
|  |     parser.add_argument("-f", "--fix", action="store_true", | ||||||
|  |                         default=False, help = 'Fix errors when possible ' | ||||||
|  |                         '(which may involve removing data)') | ||||||
|  |     parser.add_argument("-n", "--no-data", action="store_true", | ||||||
|  |                         default=False, help = 'Skip the slow full-data check') | ||||||
|  |     parser.add_argument('database', help = 'Database directory') | ||||||
|  |     args = parser.parse_args() | ||||||
|  |  | ||||||
|  |     nilmdb.fsck.Fsck(args.database, args.fix).check(skip_data = args.no_data) | ||||||
|  |  | ||||||
|  | if __name__ == "__main__": | ||||||
|  |     main() | ||||||
| @@ -10,10 +10,8 @@ def main(): | |||||||
|  |  | ||||||
|     parser = argparse.ArgumentParser( |     parser = argparse.ArgumentParser( | ||||||
|         description = 'Run the NilmDB server', |         description = 'Run the NilmDB server', | ||||||
|         formatter_class = argparse.ArgumentDefaultsHelpFormatter) |         formatter_class = argparse.ArgumentDefaultsHelpFormatter, | ||||||
|  |         version = nilmdb.__version__) | ||||||
|     parser.add_argument("-V", "--version", action="version", |  | ||||||
|                         version = nilmdb.__version__) |  | ||||||
|  |  | ||||||
|     group = parser.add_argument_group("Standard options") |     group = parser.add_argument_group("Standard options") | ||||||
|     group.add_argument('-a', '--address', |     group.add_argument('-a', '--address', | ||||||
|   | |||||||
| @@ -194,6 +194,9 @@ class BulkData(object): | |||||||
|         if oldospath == newospath: |         if oldospath == newospath: | ||||||
|             raise ValueError("old and new paths are the same") |             raise ValueError("old and new paths are the same") | ||||||
|  |  | ||||||
|  |         # Remove Table object at old path from cache | ||||||
|  |         self.getnode.cache_remove(self, oldunicodepath) | ||||||
|  |  | ||||||
|         # Move the table to a temporary location |         # Move the table to a temporary location | ||||||
|         tmpdir = tempfile.mkdtemp(prefix = "rename-", dir = self.root) |         tmpdir = tempfile.mkdtemp(prefix = "rename-", dir = self.root) | ||||||
|         tmppath = os.path.join(tmpdir, "table") |         tmppath = os.path.join(tmpdir, "table") | ||||||
| @@ -330,7 +333,8 @@ class Table(object): | |||||||
|  |  | ||||||
|         # Find the last directory.  We sort and loop through all of them, |         # Find the last directory.  We sort and loop through all of them, | ||||||
|         # starting with the numerically greatest, because the dirs could be |         # starting with the numerically greatest, because the dirs could be | ||||||
|         # empty if something was deleted. |         # empty if something was deleted but the directory was unexpectedly | ||||||
|  |         # not deleted. | ||||||
|         subdirs = sorted(filter(regex.search, os.listdir(self.root)), |         subdirs = sorted(filter(regex.search, os.listdir(self.root)), | ||||||
|                          key = lambda x: int(x, 16), reverse = True) |                          key = lambda x: int(x, 16), reverse = True) | ||||||
|  |  | ||||||
|   | |||||||
| @@ -83,8 +83,11 @@ _sql_schema_updates = { | |||||||
| class NilmDB(object): | class NilmDB(object): | ||||||
|     verbose = 0 |     verbose = 0 | ||||||
|  |  | ||||||
|     def __init__(self, basepath, max_results=None, |     def __init__(self, basepath, | ||||||
|                  max_removals=None, bulkdata_args=None): |                  max_results=None, | ||||||
|  |                  max_removals=None, | ||||||
|  |                  max_int_removals=None, | ||||||
|  |                  bulkdata_args=None): | ||||||
|         """Initialize NilmDB at the given basepath. |         """Initialize NilmDB at the given basepath. | ||||||
|         Other arguments are for debugging / testing: |         Other arguments are for debugging / testing: | ||||||
|  |  | ||||||
| @@ -92,7 +95,10 @@ class NilmDB(object): | |||||||
|         stream_intervals or stream_extract response. |         stream_intervals or stream_extract response. | ||||||
|  |  | ||||||
|         'max_removals' is the max rows to delete at once |         'max_removals' is the max rows to delete at once | ||||||
|         in stream_move. |         in stream_remove. | ||||||
|  |  | ||||||
|  |         'max_int_removals' is the max intervals to delete | ||||||
|  |         at once in stream_remove. | ||||||
|  |  | ||||||
|         'bulkdata_args' is kwargs for the bulkdata module. |         'bulkdata_args' is kwargs for the bulkdata module. | ||||||
|         """ |         """ | ||||||
| @@ -134,6 +140,9 @@ class NilmDB(object): | |||||||
|         # Remove up to this many rows per call to stream_remove. |         # Remove up to this many rows per call to stream_remove. | ||||||
|         self.max_removals = max_removals or 1048576 |         self.max_removals = max_removals or 1048576 | ||||||
|  |  | ||||||
|  |         # Remove up to this many intervals per call to stream_remove. | ||||||
|  |         self.max_int_removals = max_int_removals or 4096 | ||||||
|  |  | ||||||
|     def get_basepath(self): |     def get_basepath(self): | ||||||
|         return self.basepath |         return self.basepath | ||||||
|  |  | ||||||
| @@ -643,13 +652,22 @@ class NilmDB(object): | |||||||
|         to_remove = Interval(start, end) |         to_remove = Interval(start, end) | ||||||
|         removed = 0 |         removed = 0 | ||||||
|         remaining = self.max_removals |         remaining = self.max_removals | ||||||
|  |         int_remaining = self.max_int_removals | ||||||
|         restart = None |         restart = None | ||||||
|  |  | ||||||
|         # Can't remove intervals from within the iterator, so we need to |         # Can't remove intervals from within the iterator, so we need to | ||||||
|         # remember what's currently in the intersection now. |         # remember what's currently in the intersection now. | ||||||
|         all_candidates = list(intervals.intersection(to_remove, orig = True)) |         all_candidates = list(intervals.intersection(to_remove, orig = True)) | ||||||
|  |  | ||||||
|  |         remove_start = None | ||||||
|  |         remove_end = None | ||||||
|  |  | ||||||
|         for (dbint, orig) in all_candidates: |         for (dbint, orig) in all_candidates: | ||||||
|  |             # Stop if we've hit the max number of interval removals | ||||||
|  |             if int_remaining <= 0: | ||||||
|  |                 restart = dbint.start | ||||||
|  |                 break | ||||||
|  |  | ||||||
|             # Find row start and end |             # Find row start and end | ||||||
|             row_start = self._find_start(table, dbint) |             row_start = self._find_start(table, dbint) | ||||||
|             row_end = self._find_end(table, dbint) |             row_end = self._find_end(table, dbint) | ||||||
| @@ -670,14 +688,29 @@ class NilmDB(object): | |||||||
|             # Remove interval from the database |             # Remove interval from the database | ||||||
|             self._remove_interval(stream_id, orig, dbint) |             self._remove_interval(stream_id, orig, dbint) | ||||||
|  |  | ||||||
|             # Remove data from the underlying table storage |             # Remove data from the underlying table storage, | ||||||
|             table.remove(row_start, row_end) |             # coalescing adjacent removals to reduce the number of calls | ||||||
|  |             # to table.remove. | ||||||
|  |             if remove_end == row_start: | ||||||
|  |                 # Extend our coalesced region | ||||||
|  |                 remove_end = row_end | ||||||
|  |             else: | ||||||
|  |                 # Perform previous removal, then save this one | ||||||
|  |                 if remove_end is not None: | ||||||
|  |                     table.remove(remove_start, remove_end) | ||||||
|  |                 remove_start = row_start | ||||||
|  |                 remove_end = row_end | ||||||
|  |  | ||||||
|             # Count how many were removed |             # Count how many were removed | ||||||
|             removed += row_end - row_start |             removed += row_end - row_start | ||||||
|             remaining -= row_end - row_start |             remaining -= row_end - row_start | ||||||
|  |             int_remaining -= 1 | ||||||
|  |  | ||||||
|             if restart is not None: |             if restart is not None: | ||||||
|                 break |                 break | ||||||
|  |  | ||||||
|  |         # Perform any final coalesced removal | ||||||
|  |         if remove_end is not None: | ||||||
|  |             table.remove(remove_start, remove_end) | ||||||
|  |  | ||||||
|         return (removed, restart) |         return (removed, restart) | ||||||
|   | |||||||
| @@ -74,8 +74,8 @@ class Root(NilmApp): | |||||||
|         dbsize = nilmdb.utils.du(path) |         dbsize = nilmdb.utils.du(path) | ||||||
|         return { "path": path, |         return { "path": path, | ||||||
|                  "size": dbsize, |                  "size": dbsize, | ||||||
|                  "other": usage.used - dbsize, |                  "other": max(usage.used - dbsize, 0), | ||||||
|                  "reserved": usage.total - usage.used - usage.free, |                  "reserved": max(usage.total - usage.used - usage.free, 0), | ||||||
|                  "free": usage.free } |                  "free": usage.free } | ||||||
|  |  | ||||||
| class Stream(NilmApp): | class Stream(NilmApp): | ||||||
| @@ -84,10 +84,18 @@ class Stream(NilmApp): | |||||||
|     # Helpers |     # Helpers | ||||||
|     def _get_times(self, start_param, end_param): |     def _get_times(self, start_param, end_param): | ||||||
|         (start, end) = (None, None) |         (start, end) = (None, None) | ||||||
|         if start_param is not None: |         try: | ||||||
|             start = string_to_timestamp(start_param) |             if start_param is not None: | ||||||
|         if end_param is not None: |                 start = string_to_timestamp(start_param) | ||||||
|             end = string_to_timestamp(end_param) |         except Exception: | ||||||
|  |             raise cherrypy.HTTPError("400 Bad Request", sprintf( | ||||||
|  |                 "invalid start (%s): must be a numeric timestamp", start_param)) | ||||||
|  |         try: | ||||||
|  |             if end_param is not None: | ||||||
|  |                 end = string_to_timestamp(end_param) | ||||||
|  |         except Exception: | ||||||
|  |             raise cherrypy.HTTPError("400 Bad Request", sprintf( | ||||||
|  |                 "invalid end (%s): must be a numeric timestamp", end_param)) | ||||||
|         if start is not None and end is not None: |         if start is not None and end is not None: | ||||||
|             if start >= end: |             if start >= end: | ||||||
|                 raise cherrypy.HTTPError( |                 raise cherrypy.HTTPError( | ||||||
|   | |||||||
| @@ -21,7 +21,8 @@ def du(path): | |||||||
|     errors that might occur if we encounter broken symlinks or |     errors that might occur if we encounter broken symlinks or | ||||||
|     files in the process of being removed.""" |     files in the process of being removed.""" | ||||||
|     try: |     try: | ||||||
|         size = os.path.getsize(path) |         st = os.stat(path) | ||||||
|  |         size = st.st_blocks * 512 | ||||||
|         if os.path.isdir(path): |         if os.path.isdir(path): | ||||||
|             for thisfile in os.listdir(path): |             for thisfile in os.listdir(path): | ||||||
|                 filepath = os.path.join(path, thisfile) |                 filepath = os.path.join(path, thisfile) | ||||||
|   | |||||||
| @@ -28,10 +28,13 @@ def must_close(errorfile = sys.stderr, wrap_verify = False): | |||||||
|  |  | ||||||
|         @wrap_class_method |         @wrap_class_method | ||||||
|         def __del__(orig, self, *args, **kwargs): |         def __del__(orig, self, *args, **kwargs): | ||||||
|             if "_must_close" in self.__dict__: |             try: | ||||||
|                 fprintf(errorfile, "error: %s.close() wasn't called!\n", |                 if "_must_close" in self.__dict__: | ||||||
|                         self.__class__.__name__) |                     fprintf(errorfile, "error: %s.close() wasn't called!\n", | ||||||
|             return orig(self, *args, **kwargs) |                             self.__class__.__name__) | ||||||
|  |                 return orig(self, *args, **kwargs) | ||||||
|  |             except: # pragma: no cover | ||||||
|  |                 pass | ||||||
|  |  | ||||||
|         @wrap_class_method |         @wrap_class_method | ||||||
|         def close(orig, self, *args, **kwargs): |         def close(orig, self, *args, **kwargs): | ||||||
|   | |||||||
| @@ -117,7 +117,10 @@ def serializer_proxy(obj_or_type): | |||||||
|             return ret |             return ret | ||||||
|  |  | ||||||
|         def __del__(self): |         def __del__(self): | ||||||
|             self.__call_queue.put((None, None, None, None)) |             try: | ||||||
|             self.__thread.join() |                 self.__call_queue.put((None, None, None, None)) | ||||||
|  |                 self.__thread.join() | ||||||
|  |             except: # pragma: no cover | ||||||
|  |                 pass | ||||||
|  |  | ||||||
|     return SerializerObjectProxy(obj_or_type) |     return SerializerObjectProxy(obj_or_type) | ||||||
|   | |||||||
							
								
								
									
										5
									
								
								setup.py
									
									
									
									
									
								
							
							
						
						
									
										5
									
								
								setup.py
									
									
									
									
									
								
							| @@ -117,7 +117,8 @@ setup(name='nilmdb', | |||||||
|                            'python-dateutil', |                            'python-dateutil', | ||||||
|                            'pytz', |                            'pytz', | ||||||
|                            'psutil >= 0.3.0', |                            'psutil >= 0.3.0', | ||||||
|                            'requests >= 1.1.0, < 2.0.0', |                            'requests >= 1.1.0', | ||||||
|  |                            'progressbar >= 2.2', | ||||||
|                            ], |                            ], | ||||||
|       packages = [ 'nilmdb', |       packages = [ 'nilmdb', | ||||||
|                    'nilmdb.utils', |                    'nilmdb.utils', | ||||||
| @@ -126,11 +127,13 @@ setup(name='nilmdb', | |||||||
|                    'nilmdb.client', |                    'nilmdb.client', | ||||||
|                    'nilmdb.cmdline', |                    'nilmdb.cmdline', | ||||||
|                    'nilmdb.scripts', |                    'nilmdb.scripts', | ||||||
|  |                    'nilmdb.fsck', | ||||||
|                    ], |                    ], | ||||||
|       entry_points = { |       entry_points = { | ||||||
|           'console_scripts': [ |           'console_scripts': [ | ||||||
|               'nilmtool = nilmdb.scripts.nilmtool:main', |               'nilmtool = nilmdb.scripts.nilmtool:main', | ||||||
|               'nilmdb-server = nilmdb.scripts.nilmdb_server:main', |               'nilmdb-server = nilmdb.scripts.nilmdb_server:main', | ||||||
|  |               'nilmdb-fsck = nilmdb.scripts.nilmdb_fsck:main', | ||||||
|               ], |               ], | ||||||
|           }, |           }, | ||||||
|       ext_modules = ext_modules, |       ext_modules = ext_modules, | ||||||
|   | |||||||
| @@ -242,6 +242,19 @@ class TestClient(object): | |||||||
|         in_("400 Bad Request", str(e.exception)) |         in_("400 Bad Request", str(e.exception)) | ||||||
|         in_("start must precede end", str(e.exception)) |         in_("start must precede end", str(e.exception)) | ||||||
|  |  | ||||||
|  |         # Invalid times in HTTP request | ||||||
|  |         with assert_raises(ClientError) as e: | ||||||
|  |             client.http.put("stream/insert", "", { "path": "/newton/prep", | ||||||
|  |                                                    "start": "asdf", "end": 0 }) | ||||||
|  |         in_("400 Bad Request", str(e.exception)) | ||||||
|  |         in_("invalid start", str(e.exception)) | ||||||
|  |  | ||||||
|  |         with assert_raises(ClientError) as e: | ||||||
|  |             client.http.put("stream/insert", "", { "path": "/newton/prep", | ||||||
|  |                                                    "start": 0, "end": "asdf" }) | ||||||
|  |         in_("400 Bad Request", str(e.exception)) | ||||||
|  |         in_("invalid end", str(e.exception)) | ||||||
|  |  | ||||||
|         # Good content type |         # Good content type | ||||||
|         with assert_raises(ClientError) as e: |         with assert_raises(ClientError) as e: | ||||||
|             client.http.put("stream/insert", "", |             client.http.put("stream/insert", "", | ||||||
| @@ -677,40 +690,15 @@ class TestClient(object): | |||||||
|         client.close() |         client.close() | ||||||
|  |  | ||||||
|     def test_client_12_persistent(self): |     def test_client_12_persistent(self): | ||||||
|         # Check that connections are persistent when they should be. |         # Check that connections are NOT persistent.  Rather than trying | ||||||
|         # This is pretty hard to test; we have to poke deep into |         # to verify this at the TCP level, just make sure that the response | ||||||
|         # the Requests library. |         # contained a "Connection: close" header. | ||||||
|         with nilmdb.client.Client(url = testurl) as c: |         with nilmdb.client.Client(url = testurl) as c: | ||||||
|             def connections(): |  | ||||||
|                 try: |  | ||||||
|                     poolmanager = c.http._last_response.connection.poolmanager |  | ||||||
|                     pool = poolmanager.pools[('http','localhost',32180)] |  | ||||||
|                     return (pool.num_connections, pool.num_requests) |  | ||||||
|                 except Exception: |  | ||||||
|                     raise SkipTest("can't get connection info") |  | ||||||
|  |  | ||||||
|             # First request makes a connection |  | ||||||
|             c.stream_create("/persist/test", "uint16_1") |             c.stream_create("/persist/test", "uint16_1") | ||||||
|             eq_(connections(), (1, 1)) |             eq_(c.http._last_response.headers["Connection"], "close") | ||||||
|  |  | ||||||
|             # Non-generator |  | ||||||
|             c.stream_list("/persist/test") |  | ||||||
|             eq_(connections(), (1, 2)) |  | ||||||
|             c.stream_list("/persist/test") |  | ||||||
|             eq_(connections(), (1, 3)) |  | ||||||
|  |  | ||||||
|             # Generators |  | ||||||
|             for x in c.stream_intervals("/persist/test"): |  | ||||||
|                 pass |  | ||||||
|             eq_(connections(), (1, 4)) |  | ||||||
|             for x in c.stream_intervals("/persist/test"): |  | ||||||
|                 pass |  | ||||||
|             eq_(connections(), (1, 5)) |  | ||||||
|  |  | ||||||
|             # Clean up |  | ||||||
|             c.stream_remove("/persist/test") |  | ||||||
|             c.stream_destroy("/persist/test") |             c.stream_destroy("/persist/test") | ||||||
|             eq_(connections(), (1, 7)) |             eq_(c.http._last_response.headers["Connection"], "close") | ||||||
|  |  | ||||||
|     def test_client_13_timestamp_rounding(self): |     def test_client_13_timestamp_rounding(self): | ||||||
|         # Test potentially bad timestamps (due to floating point |         # Test potentially bad timestamps (due to floating point | ||||||
|   | |||||||
| @@ -21,13 +21,17 @@ from testutil.helpers import * | |||||||
|  |  | ||||||
| testdb = "tests/cmdline-testdb" | testdb = "tests/cmdline-testdb" | ||||||
|  |  | ||||||
| def server_start(max_results = None, max_removals = None, bulkdata_args = {}): | def server_start(max_results = None, | ||||||
|  |                  max_removals = None, | ||||||
|  |                  max_int_removals = None, | ||||||
|  |                  bulkdata_args = {}): | ||||||
|     global test_server, test_db |     global test_server, test_db | ||||||
|     # Start web app on a custom port |     # Start web app on a custom port | ||||||
|     test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)( |     test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)( | ||||||
|         testdb, |         testdb, | ||||||
|         max_results = max_results, |         max_results = max_results, | ||||||
|         max_removals = max_removals, |         max_removals = max_removals, | ||||||
|  |         max_int_removals = max_int_removals, | ||||||
|         bulkdata_args = bulkdata_args) |         bulkdata_args = bulkdata_args) | ||||||
|     test_server = nilmdb.server.Server(test_db, host = "127.0.0.1", |     test_server = nilmdb.server.Server(test_db, host = "127.0.0.1", | ||||||
|                                        port = 32180, stoppable = False, |                                        port = 32180, stoppable = False, | ||||||
| @@ -59,8 +63,7 @@ class TestCmdline(object): | |||||||
|  |  | ||||||
|     def run(self, arg_string, infile=None, outfile=None): |     def run(self, arg_string, infile=None, outfile=None): | ||||||
|         """Run a cmdline client with the specified argument string, |         """Run a cmdline client with the specified argument string, | ||||||
|         passing the given input.  Returns a tuple with the output and |         passing the given input.  Save the output and exit code.""" | ||||||
|         exit code""" |  | ||||||
|         # printf("TZ=UTC ./nilmtool.py %s\n", arg_string) |         # printf("TZ=UTC ./nilmtool.py %s\n", arg_string) | ||||||
|         os.environ['NILMDB_URL'] = "http://localhost:32180/" |         os.environ['NILMDB_URL'] = "http://localhost:32180/" | ||||||
|         class stdio_wrapper: |         class stdio_wrapper: | ||||||
| @@ -160,6 +163,12 @@ class TestCmdline(object): | |||||||
|         self.ok("--help") |         self.ok("--help") | ||||||
|         self.contain("usage:") |         self.contain("usage:") | ||||||
|  |  | ||||||
|  |         # help | ||||||
|  |         self.ok("--version") | ||||||
|  |         ver = self.captured | ||||||
|  |         self.ok("list --version") | ||||||
|  |         eq_(self.captured, ver) | ||||||
|  |  | ||||||
|         # fail for no args |         # fail for no args | ||||||
|         self.fail("") |         self.fail("") | ||||||
|  |  | ||||||
| @@ -285,6 +294,7 @@ class TestCmdline(object): | |||||||
|         self.ok("create /newton/zzz/rawnotch uint16_9") |         self.ok("create /newton/zzz/rawnotch uint16_9") | ||||||
|         self.ok("create /newton/prep float32_8") |         self.ok("create /newton/prep float32_8") | ||||||
|         self.ok("create /newton/raw uint16_6") |         self.ok("create /newton/raw uint16_6") | ||||||
|  |         self.ok("create /newton/raw~decim-1234 uint16_6") | ||||||
|  |  | ||||||
|         # Create a stream that already exists |         # Create a stream that already exists | ||||||
|         self.fail("create /newton/raw uint16_6") |         self.fail("create /newton/raw uint16_6") | ||||||
| @@ -300,13 +310,23 @@ class TestCmdline(object): | |||||||
|         self.fail("create /newton/zzz float32_8") |         self.fail("create /newton/zzz float32_8") | ||||||
|         self.contain("subdirs of this path already exist") |         self.contain("subdirs of this path already exist") | ||||||
|  |  | ||||||
|         # Verify we got those 3 streams and they're returned in |         # Verify we got those 4 streams and they're returned in | ||||||
|         # alphabetical order. |         # alphabetical order. | ||||||
|         self.ok("list -l") |         self.ok("list -l") | ||||||
|         self.match("/newton/prep float32_8\n" |         self.match("/newton/prep float32_8\n" | ||||||
|                    "/newton/raw uint16_6\n" |                    "/newton/raw uint16_6\n" | ||||||
|  |                    "/newton/raw~decim-1234 uint16_6\n" | ||||||
|                    "/newton/zzz/rawnotch uint16_9\n") |                    "/newton/zzz/rawnotch uint16_9\n") | ||||||
|  |  | ||||||
|  |         # No decimated streams if -n specified | ||||||
|  |         self.ok("list -n -l") | ||||||
|  |         self.match("/newton/prep float32_8\n" | ||||||
|  |                    "/newton/raw uint16_6\n" | ||||||
|  |                    "/newton/zzz/rawnotch uint16_9\n") | ||||||
|  |  | ||||||
|  |         # Delete that decimated stream | ||||||
|  |         self.ok("destroy /newton/raw~decim-1234") | ||||||
|  |  | ||||||
|         # Match just one type or one path.  Also check |         # Match just one type or one path.  Also check | ||||||
|         # that --path is optional |         # that --path is optional | ||||||
|         self.ok("list --layout /newton/raw") |         self.ok("list --layout /newton/raw") | ||||||
| @@ -864,14 +884,26 @@ class TestCmdline(object): | |||||||
|         self.ok("destroy -R /newton/prep") # destroy again |         self.ok("destroy -R /newton/prep") # destroy again | ||||||
|  |  | ||||||
|     def test_14_remove_files(self): |     def test_14_remove_files(self): | ||||||
|         # Test BulkData's ability to remove when data is split into |         # Limit max_removals, to cover more functionality. | ||||||
|         # multiple files.  Should be a fairly comprehensive test of |  | ||||||
|         # remove functionality. |  | ||||||
|         # Also limit max_removals, to cover more functionality. |  | ||||||
|         server_stop() |         server_stop() | ||||||
|         server_start(max_removals = 4321, |         server_start(max_removals = 4321, | ||||||
|                      bulkdata_args = { "file_size" : 920, # 23 rows per file |                      bulkdata_args = { "file_size" : 920, # 23 rows per file | ||||||
|                                        "files_per_dir" : 3 }) |                                        "files_per_dir" : 3 }) | ||||||
|  |         self.do_remove_files() | ||||||
|  |         self.ok("destroy -R /newton/prep") # destroy again | ||||||
|  |  | ||||||
|  |     def test_14b_remove_files_maxint(self): | ||||||
|  |         # Limit max_int_removals, to cover more functionality. | ||||||
|  |         server_stop() | ||||||
|  |         server_start(max_int_removals = 1, | ||||||
|  |                      bulkdata_args = { "file_size" : 920, # 23 rows per file | ||||||
|  |                                        "files_per_dir" : 3 }) | ||||||
|  |         self.do_remove_files() | ||||||
|  |  | ||||||
|  |     def do_remove_files(self): | ||||||
|  |         # Test BulkData's ability to remove when data is split into | ||||||
|  |         # multiple files.  Should be a fairly comprehensive test of | ||||||
|  |         # remove functionality. | ||||||
|  |  | ||||||
|         # Insert data.  Just for fun, insert out of order |         # Insert data.  Just for fun, insert out of order | ||||||
|         self.ok("create /newton/prep float32_8") |         self.ok("create /newton/prep float32_8") | ||||||
| @@ -1011,6 +1043,18 @@ class TestCmdline(object): | |||||||
|         self.match("[ Thu, 01 Jan 2004 00:00:00.000000 +0000 -" |         self.match("[ Thu, 01 Jan 2004 00:00:00.000000 +0000 -" | ||||||
|                    "> Sat, 01 Jan 2005 00:00:00.000000 +0000 ]\n") |                    "> Sat, 01 Jan 2005 00:00:00.000000 +0000 ]\n") | ||||||
|  |  | ||||||
|  |         # optimize | ||||||
|  |         self.ok("insert -s 01-01-2002 -e 01-01-2004 /diff/1 /dev/null") | ||||||
|  |         self.ok("intervals /diff/1") | ||||||
|  |         self.match("[ Sat, 01 Jan 2000 00:00:00.000000 +0000 -" | ||||||
|  |                    "> Thu, 01 Jan 2004 00:00:00.000000 +0000 ]\n" | ||||||
|  |                    "[ Thu, 01 Jan 2004 00:00:00.000000 +0000 -" | ||||||
|  |                    "> Sat, 01 Jan 2005 00:00:00.000000 +0000 ]\n") | ||||||
|  |         self.ok("intervals /diff/1 --optimize") | ||||||
|  |         self.ok("intervals /diff/1 -o") | ||||||
|  |         self.match("[ Sat, 01 Jan 2000 00:00:00.000000 +0000 -" | ||||||
|  |                    "> Sat, 01 Jan 2005 00:00:00.000000 +0000 ]\n") | ||||||
|  |  | ||||||
|         self.ok("destroy -R /diff/1") |         self.ok("destroy -R /diff/1") | ||||||
|         self.ok("destroy -R /diff/2") |         self.ok("destroy -R /diff/2") | ||||||
|  |  | ||||||
|   | |||||||
| @@ -110,6 +110,5 @@ class TestSerializer(Base): | |||||||
|     def test_iter(self): |     def test_iter(self): | ||||||
|         sp = nilmdb.utils.serializer_proxy |         sp = nilmdb.utils.serializer_proxy | ||||||
|         i = sp(ListLike)() |         i = sp(ListLike)() | ||||||
|         print iter(i) |  | ||||||
|         eq_(list(i), [1,2,3,4,5]) |         eq_(list(i), [1,2,3,4,5]) | ||||||
|         eq_(i[3], 3) |         eq_(i[3], 3) | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user