Compare commits
	
		
			17 Commits
		
	
	
		
			nilmdb-1.9
			...
			nilmdb-1.1
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 8125d9c840 | |||
| ba55ad82f0 | |||
| 45c81d2019 | |||
| 78cfda32e3 | |||
| 3658d3876b | |||
| 022b50950f | |||
| e5efbadc8e | |||
| 74f633c9da | |||
| ab9a327130 | |||
| da72fc9777 | |||
| a01cb4132d | |||
| 7c3da2fe44 | |||
| f0e06dc436 | |||
| ddc0eb4264 | |||
| 0a22db3965 | |||
| 8bb8f068de | |||
| 416902097d | 
| @@ -18,10 +18,8 @@ class HTTPClient(object): | ||||
|             reparsed = urlparse.urlparse("http://" + baseurl).geturl() | ||||
|         self.baseurl = reparsed.rstrip('/') + '/' | ||||
|  | ||||
|         # Build Requests session object, enable SSL verification | ||||
|         # Note whether we want SSL verification | ||||
|         self.verify_ssl = verify_ssl | ||||
|         self.session = requests.Session() | ||||
|         self.session.verify = True | ||||
|  | ||||
|         # Saved response, so that tests can verify a few things. | ||||
|         self._last_response = {} | ||||
| @@ -59,17 +57,34 @@ class HTTPClient(object): | ||||
|                 raise Error(**args) | ||||
|  | ||||
|     def close(self): | ||||
|         self.session.close() | ||||
|         pass | ||||
|  | ||||
|     def _do_req(self, method, url, query_data, body_data, stream, headers): | ||||
|         url = urlparse.urljoin(self.baseurl, url) | ||||
|         try: | ||||
|             response = self.session.request(method, url, | ||||
|                                             params = query_data, | ||||
|                                             data = body_data, | ||||
|                                             stream = stream, | ||||
|                                             headers = headers, | ||||
|                                             verify = self.verify_ssl) | ||||
|             # Create a new session, ensure we send "Connection: close", | ||||
|             # and explicitly close connection after the transfer. | ||||
|             # This is to avoid HTTP/1.1 persistent connections | ||||
|             # (keepalive), because they have fundamental race | ||||
|             # conditions when there are delays between requests: | ||||
|             # a new request may be sent at the same instant that the | ||||
|             # server decides to timeout the connection. | ||||
|             session = requests.Session() | ||||
|             if headers is None: | ||||
|                 headers = {} | ||||
|             headers["Connection"] = "close" | ||||
|             response = session.request(method, url, | ||||
|                                        params = query_data, | ||||
|                                        data = body_data, | ||||
|                                        stream = stream, | ||||
|                                        headers = headers, | ||||
|                                        verify = self.verify_ssl) | ||||
|  | ||||
|             # Close the connection.  If it's a generator (stream = | ||||
|             # True), the requests library shouldn't actually close the | ||||
|             # HTTP connection until all data has been read from the | ||||
|             # response. | ||||
|             session.close() | ||||
|         except requests.RequestException as e: | ||||
|             raise ServerError(status = "502 Error", url = url, | ||||
|                               message = str(e.message)) | ||||
|   | ||||
| @@ -59,6 +59,8 @@ def retry_if_raised(exc, message = None, max_retries = 100): | ||||
|  | ||||
| class Progress(object): | ||||
|     def __init__(self, maxval): | ||||
|         if maxval == 0: | ||||
|             maxval = 1 | ||||
|         self.bar = progressbar.ProgressBar( | ||||
|             maxval = maxval, | ||||
|             widgets = [ progressbar.Percentage(), ' ', | ||||
| @@ -423,11 +425,15 @@ class Fsck(object): | ||||
|         for intv in ints: | ||||
|             last_ts = None | ||||
|             (stime, etime, spos, epos) = intv | ||||
|             if spos == epos: | ||||
|                 continue | ||||
|             for start in xrange(*slice(spos, epos, maxrows).indices(epos)): | ||||
|  | ||||
|             # Break interval into maxrows-sized chunks | ||||
|             next_start = spos | ||||
|             while next_start < epos: | ||||
|                 start = next_start | ||||
|                 stop = min(start + maxrows, epos) | ||||
|                 count = stop - start | ||||
|                 next_start = stop | ||||
|  | ||||
|                 # Get raw data, convert to NumPy arary | ||||
|                 try: | ||||
|                     raw = tab.get_data(start, stop, binary = True) | ||||
|   | ||||
| @@ -43,6 +43,12 @@ class BulkData(object): | ||||
|             # 32768 files per dir should work even on FAT32 | ||||
|             self.files_per_dir = 32768 | ||||
|  | ||||
|         if "initial_nrows" in kwargs: | ||||
|             self.initial_nrows = kwargs["initial_nrows"] | ||||
|         else: | ||||
|             # First row is 0 | ||||
|             self.initial_nrows = 0 | ||||
|  | ||||
|         # Make root path | ||||
|         if not os.path.isdir(self.root): | ||||
|             os.mkdir(self.root) | ||||
| @@ -254,7 +260,7 @@ class BulkData(object): | ||||
|         path = self._encode_filename(unicodepath) | ||||
|         elements = path.lstrip('/').split('/') | ||||
|         ospath = os.path.join(self.root, *elements) | ||||
|         return Table(ospath) | ||||
|         return Table(ospath, self.initial_nrows) | ||||
|  | ||||
| @nilmdb.utils.must_close(wrap_verify = False) | ||||
| class Table(object): | ||||
| @@ -291,9 +297,10 @@ class Table(object): | ||||
|             pickle.dump(fmt, f, 2) | ||||
|  | ||||
|     # Normal methods | ||||
|     def __init__(self, root): | ||||
|     def __init__(self, root, initial_nrows = 0): | ||||
|         """'root' is the full OS path to the directory of this table""" | ||||
|         self.root = root | ||||
|         self.initial_nrows = initial_nrows | ||||
|  | ||||
|         # Load the format | ||||
|         with open(os.path.join(self.root, "_format"), "rb") as f: | ||||
| @@ -353,8 +360,14 @@ class Table(object): | ||||
|             # Convert to row number | ||||
|             return self._row_from_offset(subdir, filename, offset) | ||||
|  | ||||
|         # No files, so no data | ||||
|         return 0 | ||||
|         # No files, so no data.  We typically start at row 0 in this | ||||
|         # case, although initial_nrows is specified during some tests | ||||
|         # to exercise other parts of the code better.  Since we have | ||||
|         # no files yet, round initial_nrows up so it points to a row | ||||
|         # that would begin a new file. | ||||
|         nrows = ((self.initial_nrows + (self.rows_per_file - 1)) // | ||||
|                  self.rows_per_file) * self.rows_per_file | ||||
|         return nrows | ||||
|  | ||||
|     def _offset_from_row(self, row): | ||||
|         """Return a (subdir, filename, offset, count) tuple: | ||||
|   | ||||
| @@ -23,7 +23,6 @@ from nilmdb.server.errors import NilmDBError, StreamError, OverlapError | ||||
| import sqlite3 | ||||
| import os | ||||
| import errno | ||||
| import bisect | ||||
|  | ||||
| # Note about performance and transactions: | ||||
| # | ||||
| @@ -83,8 +82,11 @@ _sql_schema_updates = { | ||||
| class NilmDB(object): | ||||
|     verbose = 0 | ||||
|  | ||||
|     def __init__(self, basepath, max_results=None, | ||||
|                  max_removals=None, bulkdata_args=None): | ||||
|     def __init__(self, basepath, | ||||
|                  max_results=None, | ||||
|                  max_removals=None, | ||||
|                  max_int_removals=None, | ||||
|                  bulkdata_args=None): | ||||
|         """Initialize NilmDB at the given basepath. | ||||
|         Other arguments are for debugging / testing: | ||||
|  | ||||
| @@ -92,7 +94,10 @@ class NilmDB(object): | ||||
|         stream_intervals or stream_extract response. | ||||
|  | ||||
|         'max_removals' is the max rows to delete at once | ||||
|         in stream_move. | ||||
|         in stream_remove. | ||||
|  | ||||
|         'max_int_removals' is the max intervals to delete | ||||
|         at once in stream_remove. | ||||
|  | ||||
|         'bulkdata_args' is kwargs for the bulkdata module. | ||||
|         """ | ||||
| @@ -134,6 +139,9 @@ class NilmDB(object): | ||||
|         # Remove up to this many rows per call to stream_remove. | ||||
|         self.max_removals = max_removals or 1048576 | ||||
|  | ||||
|         # Remove up to this many intervals per call to stream_remove. | ||||
|         self.max_int_removals = max_int_removals or 4096 | ||||
|  | ||||
|     def get_basepath(self): | ||||
|         return self.basepath | ||||
|  | ||||
| @@ -507,6 +515,17 @@ class NilmDB(object): | ||||
|         # And that's all | ||||
|         return | ||||
|  | ||||
|     def _bisect_left(self, a, x, lo, hi): | ||||
|         # Like bisect.bisect_left, but doesn't choke on large indices on | ||||
|         # 32-bit systems, like bisect's fast C implementation does. | ||||
|         while lo < hi: | ||||
|             mid = (lo + hi) / 2 | ||||
|             if a[mid] < x: | ||||
|                 lo = mid + 1 | ||||
|             else: | ||||
|                 hi = mid | ||||
|         return lo | ||||
|  | ||||
|     def _find_start(self, table, dbinterval): | ||||
|         """ | ||||
|         Given a DBInterval, find the row in the database that | ||||
| @@ -517,10 +536,10 @@ class NilmDB(object): | ||||
|         # Optimization for the common case where an interval wasn't truncated | ||||
|         if dbinterval.start == dbinterval.db_start: | ||||
|             return dbinterval.db_startpos | ||||
|         return bisect.bisect_left(table, | ||||
|                                   dbinterval.start, | ||||
|                                   dbinterval.db_startpos, | ||||
|                                   dbinterval.db_endpos) | ||||
|         return self._bisect_left(table, | ||||
|                                  dbinterval.start, | ||||
|                                  dbinterval.db_startpos, | ||||
|                                  dbinterval.db_endpos) | ||||
|  | ||||
|     def _find_end(self, table, dbinterval): | ||||
|         """ | ||||
| @@ -536,10 +555,10 @@ class NilmDB(object): | ||||
|         # want to include the given timestamp in the results.  This is | ||||
|         # so a queries like 1:00 -> 2:00 and 2:00 -> 3:00 return | ||||
|         # non-overlapping data. | ||||
|         return bisect.bisect_left(table, | ||||
|                                   dbinterval.end, | ||||
|                                   dbinterval.db_startpos, | ||||
|                                   dbinterval.db_endpos) | ||||
|         return self._bisect_left(table, | ||||
|                                  dbinterval.end, | ||||
|                                  dbinterval.db_startpos, | ||||
|                                  dbinterval.db_endpos) | ||||
|  | ||||
|     def stream_extract(self, path, start = None, end = None, | ||||
|                        count = False, markup = False, binary = False): | ||||
| @@ -643,13 +662,22 @@ class NilmDB(object): | ||||
|         to_remove = Interval(start, end) | ||||
|         removed = 0 | ||||
|         remaining = self.max_removals | ||||
|         int_remaining = self.max_int_removals | ||||
|         restart = None | ||||
|  | ||||
|         # Can't remove intervals from within the iterator, so we need to | ||||
|         # remember what's currently in the intersection now. | ||||
|         all_candidates = list(intervals.intersection(to_remove, orig = True)) | ||||
|  | ||||
|         remove_start = None | ||||
|         remove_end = None | ||||
|  | ||||
|         for (dbint, orig) in all_candidates: | ||||
|             # Stop if we've hit the max number of interval removals | ||||
|             if int_remaining <= 0: | ||||
|                 restart = dbint.start | ||||
|                 break | ||||
|  | ||||
|             # Find row start and end | ||||
|             row_start = self._find_start(table, dbint) | ||||
|             row_end = self._find_end(table, dbint) | ||||
| @@ -670,14 +698,29 @@ class NilmDB(object): | ||||
|             # Remove interval from the database | ||||
|             self._remove_interval(stream_id, orig, dbint) | ||||
|  | ||||
|             # Remove data from the underlying table storage | ||||
|             table.remove(row_start, row_end) | ||||
|             # Remove data from the underlying table storage, | ||||
|             # coalescing adjacent removals to reduce the number of calls | ||||
|             # to table.remove. | ||||
|             if remove_end == row_start: | ||||
|                 # Extend our coalesced region | ||||
|                 remove_end = row_end | ||||
|             else: | ||||
|                 # Perform previous removal, then save this one | ||||
|                 if remove_end is not None: | ||||
|                     table.remove(remove_start, remove_end) | ||||
|                 remove_start = row_start | ||||
|                 remove_end = row_end | ||||
|  | ||||
|             # Count how many were removed | ||||
|             removed += row_end - row_start | ||||
|             remaining -= row_end - row_start | ||||
|             int_remaining -= 1 | ||||
|  | ||||
|             if restart is not None: | ||||
|                 break | ||||
|  | ||||
|         # Perform any final coalesced removal | ||||
|         if remove_end is not None: | ||||
|             table.remove(remove_start, remove_end) | ||||
|  | ||||
|         return (removed, restart) | ||||
|   | ||||
| @@ -429,7 +429,7 @@ class Server(object): | ||||
|         cherrypy.config.update({ | ||||
|             'server.socket_host': host, | ||||
|             'server.socket_port': port, | ||||
|             'engine.autoreload_on': False, | ||||
|             'engine.autoreload.on': False, | ||||
|             'server.max_request_body_size': 8*1024*1024, | ||||
|             }) | ||||
|         if self.embedded: | ||||
|   | ||||
| @@ -28,10 +28,13 @@ def must_close(errorfile = sys.stderr, wrap_verify = False): | ||||
|  | ||||
|         @wrap_class_method | ||||
|         def __del__(orig, self, *args, **kwargs): | ||||
|             if "_must_close" in self.__dict__: | ||||
|                 fprintf(errorfile, "error: %s.close() wasn't called!\n", | ||||
|                         self.__class__.__name__) | ||||
|             return orig(self, *args, **kwargs) | ||||
|             try: | ||||
|                 if "_must_close" in self.__dict__: | ||||
|                     fprintf(errorfile, "error: %s.close() wasn't called!\n", | ||||
|                             self.__class__.__name__) | ||||
|                 return orig(self, *args, **kwargs) | ||||
|             except: # pragma: no cover | ||||
|                 pass | ||||
|  | ||||
|         @wrap_class_method | ||||
|         def close(orig, self, *args, **kwargs): | ||||
|   | ||||
| @@ -117,7 +117,10 @@ def serializer_proxy(obj_or_type): | ||||
|             return ret | ||||
|  | ||||
|         def __del__(self): | ||||
|             self.__call_queue.put((None, None, None, None)) | ||||
|             self.__thread.join() | ||||
|             try: | ||||
|                 self.__call_queue.put((None, None, None, None)) | ||||
|                 self.__thread.join() | ||||
|             except: # pragma: no cover | ||||
|                 pass | ||||
|  | ||||
|     return SerializerObjectProxy(obj_or_type) | ||||
|   | ||||
| @@ -87,7 +87,7 @@ def parse_time(toparse): | ||||
|     try: | ||||
|         return unix_to_timestamp(datetime_tz.datetime_tz. | ||||
|                                  smartparse(toparse).totimestamp()) | ||||
|     except (ValueError, OverflowError): | ||||
|     except (ValueError, OverflowError, TypeError): | ||||
|         pass | ||||
|  | ||||
|     # If it's parseable as a float, treat it as a Unix or NILM | ||||
|   | ||||
							
								
								
									
										13
									
								
								setup.py
									
									
									
									
									
								
							
							
						
						
									
										13
									
								
								setup.py
									
									
									
									
									
								
							| @@ -6,15 +6,6 @@ | ||||
| # Then just package it up: | ||||
| #   python setup.py sdist | ||||
|  | ||||
| # This is supposed to be using Distribute: | ||||
| # | ||||
| #   distutils provides a "setup" method. | ||||
| #   setuptools is a set of monkeypatches on top of that. | ||||
| #   distribute is a particular version/implementation of setuptools. | ||||
| # | ||||
| # So we don't really know if this is using the old setuptools or the | ||||
| # Distribute-provided version of setuptools. | ||||
|  | ||||
| import traceback | ||||
| import sys | ||||
| import os | ||||
| @@ -109,7 +100,7 @@ setup(name='nilmdb', | ||||
|                         'coverage', | ||||
|                         'numpy', | ||||
|                         ], | ||||
|       setup_requires = [ 'distribute', | ||||
|       setup_requires = [ 'setuptools', | ||||
|                          ], | ||||
|       install_requires = [ 'decorator', | ||||
|                            'cherrypy >= 3.2', | ||||
| @@ -117,7 +108,7 @@ setup(name='nilmdb', | ||||
|                            'python-dateutil', | ||||
|                            'pytz', | ||||
|                            'psutil >= 0.3.0', | ||||
|                            'requests >= 1.1.0, < 2.0.0', | ||||
|                            'requests >= 1.1.0', | ||||
|                            'progressbar >= 2.2', | ||||
|                            ], | ||||
|       packages = [ 'nilmdb', | ||||
|   | ||||
| @@ -690,40 +690,15 @@ class TestClient(object): | ||||
|         client.close() | ||||
|  | ||||
|     def test_client_12_persistent(self): | ||||
|         # Check that connections are persistent when they should be. | ||||
|         # This is pretty hard to test; we have to poke deep into | ||||
|         # the Requests library. | ||||
|         # Check that connections are NOT persistent.  Rather than trying | ||||
|         # to verify this at the TCP level, just make sure that the response | ||||
|         # contained a "Connection: close" header. | ||||
|         with nilmdb.client.Client(url = testurl) as c: | ||||
|             def connections(): | ||||
|                 try: | ||||
|                     poolmanager = c.http._last_response.connection.poolmanager | ||||
|                     pool = poolmanager.pools[('http','localhost',32180)] | ||||
|                     return (pool.num_connections, pool.num_requests) | ||||
|                 except Exception: | ||||
|                     raise SkipTest("can't get connection info") | ||||
|  | ||||
|             # First request makes a connection | ||||
|             c.stream_create("/persist/test", "uint16_1") | ||||
|             eq_(connections(), (1, 1)) | ||||
|             eq_(c.http._last_response.headers["Connection"], "close") | ||||
|  | ||||
|             # Non-generator | ||||
|             c.stream_list("/persist/test") | ||||
|             eq_(connections(), (1, 2)) | ||||
|             c.stream_list("/persist/test") | ||||
|             eq_(connections(), (1, 3)) | ||||
|  | ||||
|             # Generators | ||||
|             for x in c.stream_intervals("/persist/test"): | ||||
|                 pass | ||||
|             eq_(connections(), (1, 4)) | ||||
|             for x in c.stream_intervals("/persist/test"): | ||||
|                 pass | ||||
|             eq_(connections(), (1, 5)) | ||||
|  | ||||
|             # Clean up | ||||
|             c.stream_remove("/persist/test") | ||||
|             c.stream_destroy("/persist/test") | ||||
|             eq_(connections(), (1, 7)) | ||||
|             eq_(c.http._last_response.headers["Connection"], "close") | ||||
|  | ||||
|     def test_client_13_timestamp_rounding(self): | ||||
|         # Test potentially bad timestamps (due to floating point | ||||
|   | ||||
| @@ -21,13 +21,17 @@ from testutil.helpers import * | ||||
|  | ||||
| testdb = "tests/cmdline-testdb" | ||||
|  | ||||
| def server_start(max_results = None, max_removals = None, bulkdata_args = {}): | ||||
| def server_start(max_results = None, | ||||
|                  max_removals = None, | ||||
|                  max_int_removals = None, | ||||
|                  bulkdata_args = {}): | ||||
|     global test_server, test_db | ||||
|     # Start web app on a custom port | ||||
|     test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)( | ||||
|         testdb, | ||||
|         max_results = max_results, | ||||
|         max_removals = max_removals, | ||||
|         max_int_removals = max_int_removals, | ||||
|         bulkdata_args = bulkdata_args) | ||||
|     test_server = nilmdb.server.Server(test_db, host = "127.0.0.1", | ||||
|                                        port = 32180, stoppable = False, | ||||
| @@ -830,9 +834,12 @@ class TestCmdline(object): | ||||
|     def test_13_files(self): | ||||
|         # Test BulkData's ability to split into multiple files, | ||||
|         # by forcing the file size to be really small. | ||||
|         # Also increase the initial nrows, so that start/end positions | ||||
|         # in the database are very large (> 32 bit) | ||||
|         server_stop() | ||||
|         server_start(bulkdata_args = { "file_size" : 920, # 23 rows per file | ||||
|                                        "files_per_dir" : 3 }) | ||||
|                                        "files_per_dir" : 3, | ||||
|                                        "initial_nrows" : 2**40 }) | ||||
|  | ||||
|         # Fill data | ||||
|         self.ok("create /newton/prep float32_8") | ||||
| @@ -880,14 +887,28 @@ class TestCmdline(object): | ||||
|         self.ok("destroy -R /newton/prep") # destroy again | ||||
|  | ||||
|     def test_14_remove_files(self): | ||||
|         # Test BulkData's ability to remove when data is split into | ||||
|         # multiple files.  Should be a fairly comprehensive test of | ||||
|         # remove functionality. | ||||
|         # Also limit max_removals, to cover more functionality. | ||||
|         # Limit max_removals, to cover more functionality. | ||||
|         server_stop() | ||||
|         server_start(max_removals = 4321, | ||||
|                      bulkdata_args = { "file_size" : 920, # 23 rows per file | ||||
|                                        "files_per_dir" : 3 }) | ||||
|                                        "files_per_dir" : 3, | ||||
|                                        "initial_nrows" : 2**40 }) | ||||
|         self.do_remove_files() | ||||
|         self.ok("destroy -R /newton/prep") # destroy again | ||||
|  | ||||
|     def test_14b_remove_files_maxint(self): | ||||
|         # Limit max_int_removals, to cover more functionality. | ||||
|         server_stop() | ||||
|         server_start(max_int_removals = 1, | ||||
|                      bulkdata_args = { "file_size" : 920, # 23 rows per file | ||||
|                                        "files_per_dir" : 3, | ||||
|                                        "initial_nrows" : 2**40 }) | ||||
|         self.do_remove_files() | ||||
|  | ||||
|     def do_remove_files(self): | ||||
|         # Test BulkData's ability to remove when data is split into | ||||
|         # multiple files.  Should be a fairly comprehensive test of | ||||
|         # remove functionality. | ||||
|  | ||||
|         # Insert data.  Just for fun, insert out of order | ||||
|         self.ok("create /newton/prep float32_8") | ||||
|   | ||||
| @@ -1,5 +1,6 @@ | ||||
| import nilmdb | ||||
| from nilmdb.utils.printf import * | ||||
| from nilmdb.utils import datetime_tz | ||||
|  | ||||
| from nose.tools import * | ||||
| from nose.tools import assert_raises | ||||
| @@ -19,6 +20,8 @@ class TestTimestamper(object): | ||||
|         def join(list): | ||||
|             return "\n".join(list) + "\n" | ||||
|  | ||||
|         datetime_tz.localtz_set("America/New_York") | ||||
|  | ||||
|         start = nilmdb.utils.time.parse_time("03/24/2012") | ||||
|         lines_in  = [ "hello", "world", "hello world", "# commented out" ] | ||||
|         lines_out = [ "1332561600000000 hello", | ||||
|   | ||||
		Reference in New Issue
	
	Block a user