Compare commits
	
		
			6 Commits
		
	
	
		
			nilmdb-1.5
			...
			nilmdb-1.6
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 49d04db1d6 | |||
| ea838d05ae | |||
| f2a48bdb2a | |||
| 6d14e0b8aa | |||
| b31b9327b9 | |||
| b98ff1331a | 
| @@ -421,3 +421,20 @@ and has all of the same functions.  It adds three new functions: | |||||||
| It is significantly faster!  It is about 20 times faster to decimate a | It is significantly faster!  It is about 20 times faster to decimate a | ||||||
| stream with `nilm-decimate` when the filter code is using the new | stream with `nilm-decimate` when the filter code is using the new | ||||||
| binary/numpy interface. | binary/numpy interface. | ||||||
|  |  | ||||||
|  |  | ||||||
|  | WSGI interface & chunked requests | ||||||
|  | --------------------------------- | ||||||
|  |  | ||||||
|  | mod_wsgi requires "WSGIChunkedRequest On" to handle | ||||||
|  | "Transfer-encoding: Chunked" requests.  However, `/stream/insert` | ||||||
|  | doesn't handle this correctly right now, because: | ||||||
|  |  | ||||||
|  | - The `cherrpy.request.body.read()` call needs to be fixed for chunked requests | ||||||
|  |  | ||||||
|  | - We don't want to just buffer endlessly in the server, and it will | ||||||
|  |   require some thought on how to handle data in chunks (what to do about | ||||||
|  |   interval endpoints). | ||||||
|  |  | ||||||
|  | It is probably better to just keep the endpoint management on the client | ||||||
|  | side, so leave "WSGIChunkedRequest off" for now. | ||||||
|   | |||||||
							
								
								
									
										50
									
								
								extras/fix-oversize-files.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										50
									
								
								extras/fix-oversize-files.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,50 @@ | |||||||
|  | #!/usr/bin/python | ||||||
|  |  | ||||||
|  | import os | ||||||
|  | import sys | ||||||
|  | import cPickle as pickle | ||||||
|  | import argparse | ||||||
|  | import fcntl | ||||||
|  | import re | ||||||
|  | from nilmdb.client.numpyclient import layout_to_dtype | ||||||
|  |  | ||||||
|  | parser = argparse.ArgumentParser( | ||||||
|  |     description = """ | ||||||
|  | Fix database corruption where binary writes caused too much data to be | ||||||
|  | written to the file.  Truncates files to the correct length.  This was | ||||||
|  | fixed by b98ff1331a515ad47fd3203615e835b529b039f9. | ||||||
|  | """) | ||||||
|  | parser.add_argument("path", action="store", help='Database root path') | ||||||
|  | parser.add_argument("-y", "--yes", action="store_true", help='Fix them') | ||||||
|  | args = parser.parse_args() | ||||||
|  |  | ||||||
|  | lock = os.path.join(args.path, "data.lock") | ||||||
|  | with open(lock, "w") as f: | ||||||
|  |     fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) | ||||||
|  |  | ||||||
|  |     fix = {} | ||||||
|  |  | ||||||
|  |     for (path, dirs, files) in os.walk(args.path): | ||||||
|  |         if "_format" in files: | ||||||
|  |             with open(os.path.join(path, "_format")) as format: | ||||||
|  |                 fmt = pickle.load(format) | ||||||
|  |                 rowsize = layout_to_dtype(fmt["layout"]).itemsize | ||||||
|  |                 maxsize = rowsize * fmt["rows_per_file"] | ||||||
|  |                 fix[path] = maxsize | ||||||
|  |                 if maxsize < 128000000: # sanity check | ||||||
|  |                     raise Exception("bad maxsize " + str(maxsize)) | ||||||
|  |  | ||||||
|  |     for fixpath in fix: | ||||||
|  |         for (path, dirs, files) in os.walk(fixpath): | ||||||
|  |             for fn in files: | ||||||
|  |                 if not re.match("^[0-9a-f]{4,}$", fn): | ||||||
|  |                     continue | ||||||
|  |                 fn = os.path.join(path, fn) | ||||||
|  |                 size = os.path.getsize(fn) | ||||||
|  |                 maxsize = fix[fixpath] | ||||||
|  |                 if size > maxsize: | ||||||
|  |                     diff = size - maxsize | ||||||
|  |                     print diff, "too big:", fn | ||||||
|  |                     if args.yes: | ||||||
|  |                         with open(fn, "a+") as dbfile: | ||||||
|  |                             dbfile.truncate(maxsize) | ||||||
| @@ -144,6 +144,7 @@ class Client(object): | |||||||
|         ctx = StreamInserter(self, path, start, end) |         ctx = StreamInserter(self, path, start, end) | ||||||
|         yield ctx |         yield ctx | ||||||
|         ctx.finalize() |         ctx.finalize() | ||||||
|  |         ctx.destroy() | ||||||
|  |  | ||||||
|     def stream_insert(self, path, data, start = None, end = None): |     def stream_insert(self, path, data, start = None, end = None): | ||||||
|         """Insert rows of data into a stream.  data should be a string |         """Insert rows of data into a stream.  data should be a string | ||||||
| @@ -293,6 +294,15 @@ class StreamInserter(object): | |||||||
|         self._block_data = [] |         self._block_data = [] | ||||||
|         self._block_len = 0 |         self._block_len = 0 | ||||||
|  |  | ||||||
|  |         self.destroyed = False | ||||||
|  |  | ||||||
|  |     def destroy(self): | ||||||
|  |         """Ensure this object can't be used again without raising | ||||||
|  |         an error""" | ||||||
|  |         def error(*args, **kwargs): | ||||||
|  |             raise Exception("don't reuse this context object") | ||||||
|  |         self._send_block = self.insert = self.finalize = self.send = error | ||||||
|  |  | ||||||
|     def insert(self, data): |     def insert(self, data): | ||||||
|         """Insert a chunk of ASCII formatted data in string form.  The |         """Insert a chunk of ASCII formatted data in string form.  The | ||||||
|         overall data must consist of lines terminated by '\\n'.""" |         overall data must consist of lines terminated by '\\n'.""" | ||||||
| @@ -439,7 +449,7 @@ class StreamInserter(object): | |||||||
|             self._interval_start = end_ts |             self._interval_start = end_ts | ||||||
|  |  | ||||||
|         # Double check endpoints |         # Double check endpoints | ||||||
|         if start_ts is None or end_ts is None: |         if (start_ts is None or end_ts is None) or (start_ts == end_ts): | ||||||
|             # If the block has no non-comment lines, it's OK |             # If the block has no non-comment lines, it's OK | ||||||
|             try: |             try: | ||||||
|                 self._get_first_noncomment(block) |                 self._get_first_noncomment(block) | ||||||
|   | |||||||
| @@ -98,6 +98,7 @@ class NumpyClient(nilmdb.client.client.Client): | |||||||
|         ctx = StreamInserterNumpy(self, path, start, end, dtype) |         ctx = StreamInserterNumpy(self, path, start, end, dtype) | ||||||
|         yield ctx |         yield ctx | ||||||
|         ctx.finalize() |         ctx.finalize() | ||||||
|  |         ctx.destroy() | ||||||
|  |  | ||||||
|     def stream_insert_numpy(self, path, data, start = None, end = None, |     def stream_insert_numpy(self, path, data, start = None, end = None, | ||||||
|                             layout = None): |                             layout = None): | ||||||
| @@ -133,16 +134,8 @@ class StreamInserterNumpy(nilmdb.client.client.StreamInserter): | |||||||
|         contiguous interval and may be None.  'dtype' is the Numpy |         contiguous interval and may be None.  'dtype' is the Numpy | ||||||
|         dtype for this stream. |         dtype for this stream. | ||||||
|         """ |         """ | ||||||
|         self.last_response = None |         super(StreamInserterNumpy, self).__init__(client, path, start, end) | ||||||
|  |  | ||||||
|         self._dtype = dtype |         self._dtype = dtype | ||||||
|         self._client = client |  | ||||||
|         self._path = path |  | ||||||
|  |  | ||||||
|         # Start and end for the overall contiguous interval we're |  | ||||||
|         # filling |  | ||||||
|         self._interval_start = start |  | ||||||
|         self._interval_end = end |  | ||||||
|  |  | ||||||
|         # Max rows to send at once |         # Max rows to send at once | ||||||
|         self._max_rows = self._max_data // self._dtype.itemsize |         self._max_rows = self._max_data // self._dtype.itemsize | ||||||
| @@ -250,9 +243,12 @@ class StreamInserterNumpy(nilmdb.client.client.StreamInserter): | |||||||
|             # Next block continues where this one ended |             # Next block continues where this one ended | ||||||
|             self._interval_start = end_ts |             self._interval_start = end_ts | ||||||
|  |  | ||||||
|         # If we have no endpoints, it's because we had no data to send. |         # If we have no endpoints, or equal endpoints, it's OK as long | ||||||
|         if start_ts is None or end_ts is None: |         # as there's no data to send | ||||||
|             return |         if (start_ts is None or end_ts is None) or (start_ts == end_ts): | ||||||
|  |             if len(array) == 0: | ||||||
|  |                 return | ||||||
|  |             raise ClientError("have data to send, but invalid start/end times") | ||||||
|  |  | ||||||
|         # Send it |         # Send it | ||||||
|         data = array.tostring() |         data = array.tostring() | ||||||
|   | |||||||
| @@ -1,6 +1,7 @@ | |||||||
| from __future__ import print_function | from __future__ import print_function | ||||||
| from nilmdb.utils.printf import * | from nilmdb.utils.printf import * | ||||||
| import nilmdb.client | import nilmdb.client | ||||||
|  | import sys | ||||||
|  |  | ||||||
| def setup(self, sub): | def setup(self, sub): | ||||||
|     cmd = sub.add_parser("extract", help="Extract data", |     cmd = sub.add_parser("extract", help="Extract data", | ||||||
| @@ -24,6 +25,8 @@ def setup(self, sub): | |||||||
|                        ).completer = self.complete.time |                        ).completer = self.complete.time | ||||||
|  |  | ||||||
|     group = cmd.add_argument_group("Output format") |     group = cmd.add_argument_group("Output format") | ||||||
|  |     group.add_argument("-B", "--binary", action="store_true", | ||||||
|  |                        help="Raw binary output") | ||||||
|     group.add_argument("-b", "--bare", action="store_true", |     group.add_argument("-b", "--bare", action="store_true", | ||||||
|                        help="Exclude timestamps from output lines") |                        help="Exclude timestamps from output lines") | ||||||
|     group.add_argument("-a", "--annotate", action="store_true", |     group.add_argument("-a", "--annotate", action="store_true", | ||||||
| @@ -42,6 +45,11 @@ def cmd_extract_verify(self): | |||||||
|         if self.args.start > self.args.end: |         if self.args.start > self.args.end: | ||||||
|             self.parser.error("start is after end") |             self.parser.error("start is after end") | ||||||
|  |  | ||||||
|  |     if self.args.binary: | ||||||
|  |         if (self.args.bare or self.args.annotate or self.args.markup or | ||||||
|  |             self.args.timestamp_raw or self.args.count): | ||||||
|  |             self.parser.error("--binary cannot be combined with other options") | ||||||
|  |  | ||||||
| def cmd_extract(self): | def cmd_extract(self): | ||||||
|     streams = self.client.stream_list(self.args.path) |     streams = self.client.stream_list(self.args.path) | ||||||
|     if len(streams) != 1: |     if len(streams) != 1: | ||||||
| @@ -60,16 +68,23 @@ def cmd_extract(self): | |||||||
|         printf("# end: %s\n", time_string(self.args.end)) |         printf("# end: %s\n", time_string(self.args.end)) | ||||||
|  |  | ||||||
|     printed = False |     printed = False | ||||||
|  |     if self.args.binary: | ||||||
|  |         printer = sys.stdout.write | ||||||
|  |     else: | ||||||
|  |         printer = print | ||||||
|  |     bare = self.args.bare | ||||||
|  |     count = self.args.count | ||||||
|     for dataline in self.client.stream_extract(self.args.path, |     for dataline in self.client.stream_extract(self.args.path, | ||||||
|                                                self.args.start, |                                                self.args.start, | ||||||
|                                                self.args.end, |                                                self.args.end, | ||||||
|                                                self.args.count, |                                                self.args.count, | ||||||
|                                                self.args.markup): |                                                self.args.markup, | ||||||
|         if self.args.bare and not self.args.count: |                                                self.args.binary): | ||||||
|  |         if bare and not count: | ||||||
|             # Strip timestamp (first element).  Doesn't make sense |             # Strip timestamp (first element).  Doesn't make sense | ||||||
|             # if we are only returning a count. |             # if we are only returning a count. | ||||||
|             dataline = ' '.join(dataline.split(' ')[1:]) |             dataline = ' '.join(dataline.split(' ')[1:]) | ||||||
|         print(dataline) |         printer(dataline) | ||||||
|         printed = True |         printed = True | ||||||
|     if not printed: |     if not printed: | ||||||
|         if self.args.annotate: |         if self.args.annotate: | ||||||
|   | |||||||
| @@ -468,7 +468,7 @@ static PyObject *Rocket_append_binary(Rocket *self, PyObject *args) | |||||||
|         } |         } | ||||||
|  |  | ||||||
|         /* Write binary data */ |         /* Write binary data */ | ||||||
|         if (fwrite(data, data_len, 1, self->file) != 1) { |         if (fwrite(data, self->binary_size, rows, self->file) != rows) { | ||||||
|                 PyErr_SetFromErrno(PyExc_OSError); |                 PyErr_SetFromErrno(PyExc_OSError); | ||||||
|                 return NULL; |                 return NULL; | ||||||
|         } |         } | ||||||
|   | |||||||
| @@ -620,8 +620,12 @@ class TestClient(object): | |||||||
|         with client.stream_insert_context("/empty/test", end = 950): |         with client.stream_insert_context("/empty/test", end = 950): | ||||||
|             pass |             pass | ||||||
|  |  | ||||||
|  |         # Equal start and end is OK as long as there's no data | ||||||
|  |         with client.stream_insert_context("/empty/test", start=9, end=9): | ||||||
|  |             pass | ||||||
|  |  | ||||||
|         # Try various things that might cause problems |         # Try various things that might cause problems | ||||||
|         with client.stream_insert_context("/empty/test", 1000, 1050): |         with client.stream_insert_context("/empty/test", 1000, 1050) as ctx: | ||||||
|             ctx.finalize() # inserts [1000, 1050] |             ctx.finalize() # inserts [1000, 1050] | ||||||
|             ctx.finalize() # nothing |             ctx.finalize() # nothing | ||||||
|             ctx.finalize() # nothing |             ctx.finalize() # nothing | ||||||
|   | |||||||
| @@ -601,6 +601,14 @@ class TestCmdline(object): | |||||||
|         self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01") |         self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01") | ||||||
|         self.match("43200\n") |         self.match("43200\n") | ||||||
|  |  | ||||||
|  |         # test binary mode | ||||||
|  |         self.fail("extract -c -B /newton/prep -s min -e max") | ||||||
|  |         self.contain("binary cannot be combined") | ||||||
|  |         self.fail("extract -m -B /newton/prep -s min -e max") | ||||||
|  |         self.contain("binary cannot be combined") | ||||||
|  |         self.ok("extract -B /newton/prep -s min -e max") | ||||||
|  |         eq_(len(self.captured), 43200 * (8 + 8*4)) | ||||||
|  |  | ||||||
|         # markup for 3 intervals, plus extra markup lines whenever we had |         # markup for 3 intervals, plus extra markup lines whenever we had | ||||||
|         # a "restart" from the nilmdb.stream_extract function |         # a "restart" from the nilmdb.stream_extract function | ||||||
|         self.ok("extract -m /newton/prep --start 2000-01-01 --end 2020-01-01") |         self.ok("extract -m /newton/prep --start 2000-01-01 --end 2020-01-01") | ||||||
|   | |||||||
| @@ -28,7 +28,10 @@ def setup_module(): | |||||||
|     recursive_unlink(testdb) |     recursive_unlink(testdb) | ||||||
|  |  | ||||||
|     # Start web app on a custom port |     # Start web app on a custom port | ||||||
|     test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(testdb) |     test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)( | ||||||
|  |         testdb, bulkdata_args = { "file_size" : 16384, | ||||||
|  |                                   "files_per_dir" : 3 } ) | ||||||
|  |  | ||||||
|     test_server = nilmdb.server.Server(test_db, host = "127.0.0.1", |     test_server = nilmdb.server.Server(test_db, host = "127.0.0.1", | ||||||
|                                        port = 32180, stoppable = False, |                                        port = 32180, stoppable = False, | ||||||
|                                        fast_shutdown = True, |                                        fast_shutdown = True, | ||||||
| @@ -179,6 +182,17 @@ class TestNumpyClient(object): | |||||||
|         assert(np.array_equal(a,b)) |         assert(np.array_equal(a,b)) | ||||||
|         assert(np.array_equal(a,c)) |         assert(np.array_equal(a,c)) | ||||||
|  |  | ||||||
|  |         # Make sure none of the files are greater than 16384 bytes as | ||||||
|  |         # we configured with the bulkdata_args above. | ||||||
|  |         datapath = os.path.join(testdb, "data") | ||||||
|  |         for (dirpath, dirnames, filenames) in os.walk(datapath): | ||||||
|  |             for f in filenames: | ||||||
|  |                 fn = os.path.join(dirpath, f) | ||||||
|  |                 size = os.path.getsize(fn) | ||||||
|  |                 if size > 16384: | ||||||
|  |                     raise AssertionError(sprintf("%s is too big: %d > %d\n", | ||||||
|  |                                                  fn, size, 16384)) | ||||||
|  |  | ||||||
|         nilmdb.client.numpyclient.StreamInserterNumpy._max_data = old_max_data |         nilmdb.client.numpyclient.StreamInserterNumpy._max_data = old_max_data | ||||||
|         client.close() |         client.close() | ||||||
|  |  | ||||||
| @@ -295,8 +309,25 @@ class TestNumpyClient(object): | |||||||
|         with client.stream_insert_numpy_context("/empty/test", end = 950): |         with client.stream_insert_numpy_context("/empty/test", end = 950): | ||||||
|             pass |             pass | ||||||
|  |  | ||||||
|  |         # Equal start and end is OK as long as there's no data | ||||||
|  |         with assert_raises(ClientError) as e: | ||||||
|  |             with client.stream_insert_numpy_context("/empty/test", | ||||||
|  |                                                     start=9, end=9) as ctx: | ||||||
|  |                 ctx.insert([[9, 9]]) | ||||||
|  |                 ctx.finalize() | ||||||
|  |         in_("have data to send, but invalid start/end times", str(e.exception)) | ||||||
|  |  | ||||||
|  |         with client.stream_insert_numpy_context("/empty/test", | ||||||
|  |                                                 start=9, end=9) as ctx: | ||||||
|  |             pass | ||||||
|  |  | ||||||
|  |         # reusing a context object is bad | ||||||
|  |         with assert_raises(Exception) as e: | ||||||
|  |             ctx.insert([[9, 9]]) | ||||||
|  |  | ||||||
|         # Try various things that might cause problems |         # Try various things that might cause problems | ||||||
|         with client.stream_insert_numpy_context("/empty/test", 1000, 1050): |         with client.stream_insert_numpy_context("/empty/test", | ||||||
|  |                                                 1000, 1050) as ctx: | ||||||
|             ctx.finalize() # inserts [1000, 1050] |             ctx.finalize() # inserts [1000, 1050] | ||||||
|             ctx.finalize() # nothing |             ctx.finalize() # nothing | ||||||
|             ctx.finalize() # nothing |             ctx.finalize() # nothing | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user