Compare commits
	
		
			50 Commits
		
	
	
		
			nilmdb-1.4
			...
			nilmdb-1.5
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 99a4228285 | |||
| 230ec72609 | |||
| d36ece3767 | |||
| 231963538e | |||
| b4d6aad6de | |||
| e95142eabf | |||
| d21c3470bc | |||
| 7576883f49 | |||
| cc211542f8 | |||
| 8292dcf70b | |||
| b362fd37f6 | |||
| 41ec13ee17 | |||
| efa9aa9097 | |||
| d9afb48f45 | |||
| d1140e0f16 | |||
| 6091e44561 | |||
| e233ba790f | |||
| f0304b4c00 | |||
| 60594ca58e | |||
| c7f2df4abc | |||
| 5b7409f802 | |||
| 06038062a2 | |||
| ae9fe89759 | |||
| 04def60021 | |||
| 9ce0f69dff | |||
| 90c3be91c4 | |||
| ebccfb3531 | |||
| e006f1d02e | |||
| 5292319802 | |||
| 173121ca87 | |||
| 26bab031bd | |||
| b5fefffa09 | |||
| dccb3e370a | |||
| 95ca55aa7e | |||
| e01813f29d | |||
| 7f41e117a2 | |||
| dd5fc806e5 | |||
| f8ca8d31e6 | |||
| ed89d803f0 | |||
| 3d24092cd2 | |||
| 304bb43d85 | |||
| 59a79a30a5 | |||
| c0d450d39e | |||
| 6f14d609b2 | |||
| 77ef87456f | |||
| 32d6af935c | |||
| 6af3a6fc41 | |||
| f8a06fb3b7 | |||
| e790bb9e8a | |||
| 89be6f5931 | 
| @@ -10,6 +10,9 @@ Prerequisites: | ||||
|   sudo apt-get install python-cherrypy3 python-decorator python-simplejson | ||||
|   sudo apt-get install python-requests python-dateutil python-tz python-psutil | ||||
|  | ||||
|   # Other dependencies (required by some modules) | ||||
|   sudo apt-get install python-numpy | ||||
|  | ||||
|   # Tools for running tests | ||||
|   sudo apt-get install python-nose python-coverage | ||||
|  | ||||
| @@ -24,3 +27,5 @@ Usage: | ||||
|  | ||||
|   nilmdb-server --help | ||||
|   nilmtool --help | ||||
|  | ||||
| See docs/wsgi.md for info on setting up a WSGI application in Apache. | ||||
|   | ||||
| @@ -389,3 +389,35 @@ Possible solutions: | ||||
|     are always printed as int64 values, and a new format | ||||
|     "@1234567890123456" is added to the parser for specifying them | ||||
|     exactly. | ||||
|  | ||||
| Binary interface | ||||
| ---------------- | ||||
|  | ||||
| The ASCII interface is too slow for high-bandwidth processing, like | ||||
| sinefits, prep, etc.  A binary interface was added so that you can | ||||
| extract the raw binary out of the bulkdata storage.  This binary is | ||||
| a little-endian format, e.g. in C a uint16_6 stream would be: | ||||
|  | ||||
|     #include <endian.h> | ||||
|     #include <stdint.h> | ||||
|     struct { | ||||
|         int64_t timestamp_le; | ||||
|         uint16_t data_le[6]; | ||||
|     } __attribute__((packed)); | ||||
|  | ||||
| Remember to byteswap (with e.g. `letoh` in C)! | ||||
|  | ||||
| This interface is used by the new `nilmdb.client.numpyclient.NumpyClient` | ||||
| class, which is a subclass of the normal `nilmcb.client.client.Client` | ||||
| and has all of the same functions.  It adds three new functions: | ||||
|  | ||||
| - `stream_extract_numpy` to extract data as a Numpy array | ||||
|  | ||||
| - `stream_insert_numpy` to insert data as a Numpy array | ||||
|  | ||||
| - `stream_insert_numpy_context` is the context manager for | ||||
|   incrementally inserting data | ||||
|  | ||||
| It is significantly faster!  It is about 20 times faster to decimate a | ||||
| stream with `nilm-decimate` when the filter code is using the new | ||||
| binary/numpy interface. | ||||
|   | ||||
							
								
								
									
										32
									
								
								docs/wsgi.md
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										32
									
								
								docs/wsgi.md
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,32 @@ | ||||
| WSGI Application in Apache | ||||
| -------------------------- | ||||
|  | ||||
| Install `apache2` and `libapache2-mod-wsgi` | ||||
|  | ||||
| We'll set up the database server at URL `http://myhost.com/nilmdb`. | ||||
| The database will be stored in `/home/nilm/db`, and the process will | ||||
| run as user `nilm`, group `nilm`. | ||||
|  | ||||
| First, create a WSGI script `/home/nilm/nilmdb.wsgi` containing: | ||||
|  | ||||
|     import nilmdb.server | ||||
|     application = nilmdb.server.wsgi_application("/home/nilm/db", "/nilmdb") | ||||
|  | ||||
| The first parameter is the local filesystem path, and the second | ||||
| parameter is the path part of the URL. | ||||
|  | ||||
| Then, set up Apache with a configuration like: | ||||
|  | ||||
|     <VirtualHost> | ||||
|         WSGIScriptAlias /nilmdb /home/nilm/nilmdb.wsgi | ||||
|         WSGIApplicationGroup nilmdb-appgroup | ||||
|         WSGIProcessGroup nilmdb-procgroup | ||||
|         WSGIDaemonProcess nilmdb-procgroup threads=32 user=nilm group=nilm | ||||
|  | ||||
|         # Access control example: | ||||
|         <Location /nilmdb> | ||||
|             Order deny,allow | ||||
|             Deny from all | ||||
|             Allow from 1.2.3.4 | ||||
|         </Location> | ||||
|     </VirtualHost> | ||||
| @@ -6,6 +6,7 @@ import nilmdb.utils | ||||
| import nilmdb.client.httpclient | ||||
| from nilmdb.client.errors import ClientError | ||||
|  | ||||
| import re | ||||
| import time | ||||
| import simplejson as json | ||||
| import contextlib | ||||
| @@ -65,7 +66,12 @@ class Client(object): | ||||
|             params["layout"] = layout | ||||
|         if extended: | ||||
|             params["extended"] = 1 | ||||
|         return self.http.get("stream/list", params) | ||||
|         def sort_streams_nicely(x): | ||||
|             """Human-friendly sort (/stream/2 before /stream/10)""" | ||||
|             num = lambda t: int(t) if t.isdigit() else t | ||||
|             key = lambda k: [ num(c) for c in re.split('([0-9]+)', k[0]) ] | ||||
|             return sorted(x, key = key) | ||||
|         return sort_streams_nicely(self.http.get("stream/list", params)) | ||||
|  | ||||
|     def stream_get_metadata(self, path, keys = None): | ||||
|         params = { "path": path } | ||||
| @@ -121,10 +127,11 @@ class Client(object): | ||||
|     @contextlib.contextmanager | ||||
|     def stream_insert_context(self, path, start = None, end = None): | ||||
|         """Return a context manager that allows data to be efficiently | ||||
|         inserted into a stream in a piecewise manner.  Data is be provided | ||||
|         as single lines, and is aggregated and sent to the server in larger | ||||
|         chunks as necessary.  Data lines must match the database layout for | ||||
|         the given path, and end with a newline. | ||||
|         inserted into a stream in a piecewise manner.  Data is | ||||
|         provided as ASCII lines, and is aggregated and sent to the | ||||
|         server in larger or smaller chunks as necessary.  Data lines | ||||
|         must match the database layout for the given path, and end | ||||
|         with a newline. | ||||
|  | ||||
|         Example: | ||||
|           with client.stream_insert_context('/path', start, end) as ctx: | ||||
| @@ -136,15 +143,16 @@ class Client(object): | ||||
|         This may make multiple requests to the server, if the data is | ||||
|         large enough or enough time has passed between insertions. | ||||
|         """ | ||||
|         ctx = StreamInserter(self.http, path, start, end) | ||||
|         ctx = StreamInserter(self, path, start, end) | ||||
|         yield ctx | ||||
|         ctx.finalize() | ||||
|  | ||||
|     def stream_insert(self, path, data, start = None, end = None): | ||||
|         """Insert rows of data into a stream.  data should be a string | ||||
|         or iterable that provides ASCII data that matches the database | ||||
|         layout for path.  See stream_insert_context for details on the | ||||
|         'start' and 'end' parameters.""" | ||||
|         layout for path.  Data is passed through stream_insert_context, | ||||
|         so it will be broken into reasonably-sized chunks and | ||||
|         start/end will be deduced if missing.""" | ||||
|         with self.stream_insert_context(path, start, end) as ctx: | ||||
|             if isinstance(data, basestring): | ||||
|                 ctx.insert(data) | ||||
| @@ -153,11 +161,28 @@ class Client(object): | ||||
|                     ctx.insert(chunk) | ||||
|         return ctx.last_response | ||||
|  | ||||
|     def stream_insert_block(self, path, data, start, end, binary = False): | ||||
|         """Insert a single fixed block of data into the stream.  It is | ||||
|         sent directly to the server in one block with no further | ||||
|         processing. | ||||
|  | ||||
|         If 'binary' is True, provide raw binary data in little-endian | ||||
|         format matching the path layout, including an int64 timestamp. | ||||
|         Otherwise, provide ASCII data matching the layout.""" | ||||
|         params = { | ||||
|             "path": path, | ||||
|             "start": timestamp_to_string(start), | ||||
|             "end": timestamp_to_string(end), | ||||
|         } | ||||
|         if binary: | ||||
|             params["binary"] = 1 | ||||
|         return self.http.put("stream/insert", data, params, binary = binary) | ||||
|  | ||||
|     def stream_intervals(self, path, start = None, end = None, diffpath = None): | ||||
|         """ | ||||
|         Return a generator that yields each stream interval. | ||||
|  | ||||
|         If diffpath is not None, yields only interval ranges that are | ||||
|         If 'diffpath' is not None, yields only interval ranges that are | ||||
|         present in 'path' but not in 'diffpath'. | ||||
|         """ | ||||
|         params = { | ||||
| @@ -171,14 +196,23 @@ class Client(object): | ||||
|             params["end"] = timestamp_to_string(end) | ||||
|         return self.http.get_gen("stream/intervals", params) | ||||
|  | ||||
|     def stream_extract(self, path, start = None, end = None, count = False): | ||||
|     def stream_extract(self, path, start = None, end = None, | ||||
|                        count = False, markup = False, binary = False): | ||||
|         """ | ||||
|         Extract data from a stream.  Returns a generator that yields | ||||
|         lines of ASCII-formatted data that matches the database | ||||
|         layout for the given path. | ||||
|  | ||||
|         Specify count = True to return a count of matching data points | ||||
|         If 'count' is True, return a count of matching data points | ||||
|         rather than the actual data.  The output format is unchanged. | ||||
|  | ||||
|         If 'markup' is True, include comments in the returned data | ||||
|         that indicate interval starts and ends. | ||||
|  | ||||
|         If 'binary' is True, return chunks of raw binary data, rather | ||||
|         than lines of ASCII-formatted data.  Raw binary data is | ||||
|         little-endian and matches the database types (including an | ||||
|         int64 timestamp). | ||||
|         """ | ||||
|         params = { | ||||
|             "path": path, | ||||
| @@ -189,7 +223,11 @@ class Client(object): | ||||
|             params["end"] = timestamp_to_string(end) | ||||
|         if count: | ||||
|             params["count"] = 1 | ||||
|         return self.http.get_gen("stream/extract", params) | ||||
|         if markup: | ||||
|             params["markup"] = 1 | ||||
|         if binary: | ||||
|             params["binary"] = 1 | ||||
|         return self.http.get_gen("stream/extract", params, binary = binary) | ||||
|  | ||||
|     def stream_count(self, path, start = None, end = None): | ||||
|         """ | ||||
| @@ -238,13 +276,13 @@ class StreamInserter(object): | ||||
|     _max_data = 2 * 1024 * 1024 | ||||
|     _max_data_after_send = 64 * 1024 | ||||
|  | ||||
|     def __init__(self, http, path, start = None, end = None): | ||||
|         """'http' is the httpclient object.  'path' is the database | ||||
|     def __init__(self, client, path, start, end): | ||||
|         """'client' is the client object.  'path' is the database | ||||
|         path to insert to.  'start' and 'end' are used for the first | ||||
|         contiguous interval.""" | ||||
|         contiguous interval and may be None.""" | ||||
|         self.last_response = None | ||||
|  | ||||
|         self._http = http | ||||
|         self._client = client | ||||
|         self._path = path | ||||
|  | ||||
|         # Start and end for the overall contiguous interval we're | ||||
| @@ -307,6 +345,11 @@ class StreamInserter(object): | ||||
|         part of a new interval and there may be a gap left in-between.""" | ||||
|         self._send_block(final = True) | ||||
|  | ||||
|     def send(self): | ||||
|         """Send any data that we might have buffered up.  Does not affect | ||||
|         any other treatment of timestamps or endpoints.""" | ||||
|         self._send_block(final = False) | ||||
|  | ||||
|     def _get_first_noncomment(self, block): | ||||
|         """Return the (start, end) indices of the first full line in | ||||
|         block that isn't a comment, or raise IndexError if | ||||
| @@ -407,9 +450,7 @@ class StreamInserter(object): | ||||
|             raise ClientError("have data to send, but no start/end times") | ||||
|  | ||||
|         # Send it | ||||
|         params = { "path": self._path, | ||||
|                    "start": timestamp_to_string(start_ts), | ||||
|                    "end": timestamp_to_string(end_ts) } | ||||
|         self.last_response = self._http.put("stream/insert", block, params) | ||||
|         self.last_response = self._client.stream_insert_block( | ||||
|             self._path, block, start_ts, end_ts, binary = False) | ||||
|  | ||||
|         return | ||||
|   | ||||
| @@ -16,7 +16,7 @@ class HTTPClient(object): | ||||
|         reparsed = urlparse.urlparse(baseurl).geturl() | ||||
|         if '://' not in reparsed: | ||||
|             reparsed = urlparse.urlparse("http://" + baseurl).geturl() | ||||
|         self.baseurl = reparsed | ||||
|         self.baseurl = reparsed.rstrip('/') + '/' | ||||
|  | ||||
|         # Build Requests session object, enable SSL verification | ||||
|         self.session = requests.Session() | ||||
| @@ -105,12 +105,17 @@ class HTTPClient(object): | ||||
|         else: | ||||
|             return self._req("POST", url, None, params) | ||||
|  | ||||
|     def put(self, url, data, params = None): | ||||
|     def put(self, url, data, params = None, binary = False): | ||||
|         """Simple PUT (parameters in URL, data in body)""" | ||||
|         return self._req("PUT", url, params, data) | ||||
|         if binary: | ||||
|             h = { 'Content-type': 'application/octet-stream' } | ||||
|         else: | ||||
|             h = { 'Content-type': 'text/plain; charset=utf-8' } | ||||
|         return self._req("PUT", url, query = params, body = data, headers = h) | ||||
|  | ||||
|     # Generator versions that return data one line at a time. | ||||
|     def _req_gen(self, method, url, query = None, body = None, headers = None): | ||||
|     def _req_gen(self, method, url, query = None, body = None, | ||||
|                  headers = None, binary = False): | ||||
|         """ | ||||
|         Make a request and return a generator that gives back strings | ||||
|         or JSON decoded lines of the body data, or raise an error if | ||||
| @@ -118,16 +123,19 @@ class HTTPClient(object): | ||||
|         """ | ||||
|         (response, isjson) = self._do_req(method, url, query, body, | ||||
|                                           stream = True, headers = headers) | ||||
|         if isjson: | ||||
|         if binary: | ||||
|             for chunk in response.iter_content(chunk_size = 65536): | ||||
|                 yield chunk | ||||
|         elif isjson: | ||||
|             for line in response.iter_lines(): | ||||
|                 yield json.loads(line) | ||||
|         else: | ||||
|             for line in response.iter_lines(): | ||||
|                 yield line | ||||
|  | ||||
|     def get_gen(self, url, params = None): | ||||
|     def get_gen(self, url, params = None, binary = False): | ||||
|         """Simple GET (parameters in URL) returning a generator""" | ||||
|         return self._req_gen("GET", url, params) | ||||
|         return self._req_gen("GET", url, params, binary = binary) | ||||
|  | ||||
|     # Not much use for a POST or PUT generator, since they don't | ||||
|     # return much data. | ||||
|   | ||||
							
								
								
									
										259
									
								
								nilmdb/client/numpyclient.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										259
									
								
								nilmdb/client/numpyclient.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,259 @@ | ||||
| # -*- coding: utf-8 -*- | ||||
|  | ||||
| """Provide a NumpyClient class that is based on normal Client, but has | ||||
| additional methods for extracting and inserting data via Numpy arrays.""" | ||||
|  | ||||
| import nilmdb.utils | ||||
| import nilmdb.client.client | ||||
| import nilmdb.client.httpclient | ||||
| from nilmdb.client.errors import ClientError | ||||
|  | ||||
| import contextlib | ||||
| from nilmdb.utils.time import timestamp_to_string, string_to_timestamp | ||||
|  | ||||
| import numpy | ||||
| import cStringIO | ||||
|  | ||||
| def layout_to_dtype(layout): | ||||
|     ltype = layout.split('_')[0] | ||||
|     lcount = int(layout.split('_')[1]) | ||||
|     if ltype.startswith('int'): | ||||
|         atype = '<i' + str(int(ltype[3:]) / 8) | ||||
|     elif ltype.startswith('uint'): | ||||
|         atype = '<u' + str(int(ltype[4:]) / 8) | ||||
|     elif ltype.startswith('float'): | ||||
|         atype = '<f' + str(int(ltype[5:]) / 8) | ||||
|     else: | ||||
|         raise ValueError("bad layout") | ||||
|     return numpy.dtype([('timestamp', '<i8'), ('data', atype, lcount)]) | ||||
|  | ||||
| class NumpyClient(nilmdb.client.client.Client): | ||||
|     """Subclass of nilmdb.client.Client that adds additional methods for | ||||
|     extracting and inserting data via Numpy arrays.""" | ||||
|  | ||||
|     def _get_dtype(self, path, layout): | ||||
|         if layout is None: | ||||
|             streams = self.stream_list(path) | ||||
|             if len(streams) != 1: | ||||
|                 raise ClientError("can't get layout for path: " + path) | ||||
|             layout = streams[0][1] | ||||
|         return layout_to_dtype(layout) | ||||
|  | ||||
|     def stream_extract_numpy(self, path, start = None, end = None, | ||||
|                              layout = None, maxrows = 100000, | ||||
|                              structured = False): | ||||
|         """ | ||||
|         Extract data from a stream.  Returns a generator that yields | ||||
|         Numpy arrays of up to 'maxrows' of data each. | ||||
|  | ||||
|         If 'layout' is None, it is read using stream_info. | ||||
|  | ||||
|         If 'structured' is False, all data is converted to float64 | ||||
|         and returned in a flat 2D array.  Otherwise, data is returned | ||||
|         as a structured dtype in a 1D array. | ||||
|         """ | ||||
|         dtype = self._get_dtype(path, layout) | ||||
|  | ||||
|         def to_numpy(data): | ||||
|             a = numpy.fromstring(data, dtype) | ||||
|             if structured: | ||||
|                 return a | ||||
|             return numpy.c_[a['timestamp'], a['data']] | ||||
|  | ||||
|         chunks = [] | ||||
|         total_len = 0 | ||||
|         maxsize = dtype.itemsize * maxrows | ||||
|         for data in self.stream_extract(path, start, end, binary = True): | ||||
|             # Add this block of binary data | ||||
|             chunks.append(data) | ||||
|             total_len += len(data) | ||||
|  | ||||
|             # See if we have enough to make the requested Numpy array | ||||
|             while total_len >= maxsize: | ||||
|                 assembled = "".join(chunks) | ||||
|                 total_len -= maxsize | ||||
|                 chunks = [ assembled[maxsize:] ] | ||||
|                 block = assembled[:maxsize] | ||||
|                 yield to_numpy(block) | ||||
|  | ||||
|         if total_len: | ||||
|             yield to_numpy("".join(chunks)) | ||||
|  | ||||
|     @contextlib.contextmanager | ||||
|     def stream_insert_numpy_context(self, path, start = None, end = None, | ||||
|                                     layout = None): | ||||
|         """Return a context manager that allows data to be efficiently | ||||
|         inserted into a stream in a piecewise manner.  Data is | ||||
|         provided as Numpy arrays, and is aggregated and sent to the | ||||
|         server in larger or smaller chunks as necessary.  Data format | ||||
|         must match the database layout for the given path. | ||||
|  | ||||
|         For more details, see help for | ||||
|         nilmdb.client.numpyclient.StreamInserterNumpy | ||||
|  | ||||
|         If 'layout' is not None, use it as the layout rather than | ||||
|         querying the database. | ||||
|         """ | ||||
|         dtype = self._get_dtype(path, layout) | ||||
|         ctx = StreamInserterNumpy(self, path, start, end, dtype) | ||||
|         yield ctx | ||||
|         ctx.finalize() | ||||
|  | ||||
|     def stream_insert_numpy(self, path, data, start = None, end = None, | ||||
|                             layout = None): | ||||
|         """Insert data into a stream.  data should be a Numpy array | ||||
|         which will be passed through stream_insert_numpy_context to | ||||
|         break it into chunks etc.  See the help for that function | ||||
|         for details.""" | ||||
|         with self.stream_insert_numpy_context(path, start, end, layout) as ctx: | ||||
|             if isinstance(data, numpy.ndarray): | ||||
|                 ctx.insert(data) | ||||
|             else: | ||||
|                 for chunk in data: | ||||
|                     ctx.insert(chunk) | ||||
|         return ctx.last_response | ||||
|  | ||||
| class StreamInserterNumpy(nilmdb.client.client.StreamInserter): | ||||
|     """Object returned by stream_insert_numpy_context() that manages | ||||
|     the insertion of rows of data into a particular path. | ||||
|  | ||||
|     See help for nilmdb.client.client.StreamInserter for details. | ||||
|     The only difference is that, instead of ASCII formatted data, | ||||
|     this context manager can take Numpy arrays, which are either | ||||
|     structured (1D with complex dtype) or flat (2D with simple dtype). | ||||
|     """ | ||||
|  | ||||
|     # Soft limit of how many bytes to send per HTTP request. | ||||
|     _max_data = 2 * 1024 * 1024 | ||||
|  | ||||
|     def __init__(self, client, path, start, end, dtype): | ||||
|         """ | ||||
|         'client' is the client object.  'path' is the database path | ||||
|         to insert to.  'start' and 'end' are used for the first | ||||
|         contiguous interval and may be None.  'dtype' is the Numpy | ||||
|         dtype for this stream. | ||||
|         """ | ||||
|         self.last_response = None | ||||
|  | ||||
|         self._dtype = dtype | ||||
|         self._client = client | ||||
|         self._path = path | ||||
|  | ||||
|         # Start and end for the overall contiguous interval we're | ||||
|         # filling | ||||
|         self._interval_start = start | ||||
|         self._interval_end = end | ||||
|  | ||||
|         # Max rows to send at once | ||||
|         self._max_rows = self._max_data // self._dtype.itemsize | ||||
|  | ||||
|         # List of the current arrays we're building up to send | ||||
|         self._block_arrays = [] | ||||
|         self._block_rows = 0 | ||||
|  | ||||
|     def insert(self, array): | ||||
|         """Insert Numpy data, which must match the layout type.""" | ||||
|         if type(array) != numpy.ndarray: | ||||
|             array = numpy.array(array) | ||||
|         if array.ndim == 1: | ||||
|             # Already a structured array; just verify the type | ||||
|             if array.dtype != self._dtype: | ||||
|                 raise ValueError("wrong dtype for 1D (structured) array") | ||||
|         elif array.ndim == 2: | ||||
|             # Convert to structured array | ||||
|             sarray = numpy.zeros(array.shape[0], dtype=self._dtype) | ||||
|             sarray['timestamp'] = array[:,0] | ||||
|             # Need the squeeze in case sarray['data'] is 1 dimensional | ||||
|             sarray['data'] = numpy.squeeze(array[:,1:]) | ||||
|             array = sarray | ||||
|         else: | ||||
|             raise ValueError("wrong number of dimensions in array") | ||||
|  | ||||
|         length = len(array) | ||||
|         maxrows = self._max_rows | ||||
|  | ||||
|         if length == 0: | ||||
|             return | ||||
|         if length > maxrows: | ||||
|             # This is more than twice what we wanted to send, so split | ||||
|             # it up.  This is a bit inefficient, but the user really | ||||
|             # shouldn't be providing this much data at once. | ||||
|             for cut in range(0, length, maxrows): | ||||
|                 self.insert(array[cut:(cut + maxrows)]) | ||||
|             return | ||||
|  | ||||
|         # Add this array to our list | ||||
|         self._block_arrays.append(array) | ||||
|         self._block_rows += length | ||||
|  | ||||
|         # Send if it's too long | ||||
|         if self._block_rows >= maxrows: | ||||
|             self._send_block(final = False) | ||||
|  | ||||
|     def _send_block(self, final = False): | ||||
|         """Send the data current stored up.  One row might be left | ||||
|         over if we need its timestamp saved.""" | ||||
|  | ||||
|         # Build the full array to send | ||||
|         if self._block_rows == 0: | ||||
|             array = numpy.zeros(0, dtype = self._dtype) | ||||
|         else: | ||||
|             array = numpy.hstack(self._block_arrays) | ||||
|  | ||||
|         # Get starting timestamp | ||||
|         start_ts = self._interval_start | ||||
|         if start_ts is None: | ||||
|             # Pull start from the first row | ||||
|             try: | ||||
|                 start_ts = array['timestamp'][0] | ||||
|             except IndexError: | ||||
|                 pass # no timestamp is OK, if we have no data | ||||
|  | ||||
|         # Get ending timestamp | ||||
|         if final: | ||||
|             # For a final block, the timestamp is either the | ||||
|             # user-provided end, or the timestamp of the last line | ||||
|             # plus epsilon. | ||||
|             end_ts = self._interval_end | ||||
|             if end_ts is None: | ||||
|                 try: | ||||
|                     end_ts = array['timestamp'][-1] | ||||
|                     end_ts += nilmdb.utils.time.epsilon | ||||
|                 except IndexError: | ||||
|                     pass # no timestamp is OK, if we have no data | ||||
|             self._block_arrays = [] | ||||
|             self._block_rows = 0 | ||||
|  | ||||
|             # Next block is completely fresh | ||||
|             self._interval_start = None | ||||
|             self._interval_end = None | ||||
|         else: | ||||
|             # An intermediate block.  We need to save the last row | ||||
|             # for the next block, and use its timestamp as the ending | ||||
|             # timestamp for this one. | ||||
|             if len(array) < 2: | ||||
|                 # Not enough data to send an intermediate block | ||||
|                 return | ||||
|             end_ts = array['timestamp'][-1] | ||||
|             if self._interval_end is not None and end_ts > self._interval_end: | ||||
|                 # User gave us bad endpoints; send it anyway, and let | ||||
|                 # the server complain so that the error is the same | ||||
|                 # as if we hadn't done this chunking. | ||||
|                 end_ts = self._interval_end | ||||
|             self._block_arrays = [ array[-1:] ] | ||||
|             self._block_rows = 1 | ||||
|             array = array[:-1] | ||||
|  | ||||
|             # Next block continues where this one ended | ||||
|             self._interval_start = end_ts | ||||
|  | ||||
|         # If we have no endpoints, it's because we had no data to send. | ||||
|         if start_ts is None or end_ts is None: | ||||
|             return | ||||
|  | ||||
|         # Send it | ||||
|         data = array.tostring() | ||||
|         self.last_response = self._client.stream_insert_block( | ||||
|             self._path, data, start_ts, end_ts, binary = True) | ||||
|  | ||||
|         return | ||||
| @@ -10,6 +10,7 @@ import sys | ||||
| import os | ||||
| import argparse | ||||
| from argparse import ArgumentDefaultsHelpFormatter as def_form | ||||
| import signal | ||||
|  | ||||
| try: # pragma: no cover | ||||
|     import argcomplete | ||||
| @@ -81,7 +82,7 @@ class Cmdline(object): | ||||
|     def __init__(self, argv = None): | ||||
|         self.argv = argv or sys.argv[1:] | ||||
|         self.client = None | ||||
|         self.def_url = os.environ.get("NILMDB_URL", "http://localhost:12380") | ||||
|         self.def_url = os.environ.get("NILMDB_URL", "http://localhost/nilmdb/") | ||||
|         self.subcmd = {} | ||||
|         self.complete = Complete() | ||||
|  | ||||
| @@ -126,6 +127,13 @@ class Cmdline(object): | ||||
|         sys.exit(-1) | ||||
|  | ||||
|     def run(self): | ||||
|         # Set SIGPIPE to its default handler -- we don't need Python | ||||
|         # to catch it for us. | ||||
|         try: | ||||
|             signal.signal(signal.SIGPIPE, signal.SIG_DFL) | ||||
|         except ValueError: # pragma: no cover | ||||
|             pass | ||||
|  | ||||
|         # Clear cached timezone, so that we can pick up timezone changes | ||||
|         # while running this from the test suite. | ||||
|         datetime_tz._localtz = None | ||||
|   | ||||
| @@ -29,6 +29,8 @@ def setup(self, sub): | ||||
|     group.add_argument("-a", "--annotate", action="store_true", | ||||
|                        help="Include comments with some information " | ||||
|                        "about the stream") | ||||
|     group.add_argument("-m", "--markup", action="store_true", | ||||
|                        help="Include comments with interval starts and ends") | ||||
|     group.add_argument("-T", "--timestamp-raw", action="store_true", | ||||
|                        help="Show raw timestamps in annotated information") | ||||
|     group.add_argument("-c", "--count", action="store_true", | ||||
| @@ -61,7 +63,8 @@ def cmd_extract(self): | ||||
|     for dataline in self.client.stream_extract(self.args.path, | ||||
|                                                self.args.start, | ||||
|                                                self.args.end, | ||||
|                                                self.args.count): | ||||
|                                                self.args.count, | ||||
|                                                self.args.markup): | ||||
|         if self.args.bare and not self.args.count: | ||||
|             # Strip timestamp (first element).  Doesn't make sense | ||||
|             # if we are only returning a count. | ||||
|   | ||||
| @@ -9,7 +9,8 @@ def setup(self, sub): | ||||
|                          a stream. | ||||
|                          """, | ||||
|                          usage="%(prog)s path [-g [key ...] | " | ||||
|                          "-s key=value [...] | -u key=value [...]]") | ||||
|                          "-s key=value [...] | -u key=value [...]] | " | ||||
|                          "-d [key ...]") | ||||
|     cmd.set_defaults(handler = cmd_metadata) | ||||
|  | ||||
|     group = cmd.add_argument_group("Required arguments") | ||||
| @@ -30,6 +31,9 @@ def setup(self, sub): | ||||
|                      help="Update metadata using provided " | ||||
|                      "key=value pairs", | ||||
|                      ).completer = self.complete.meta_keyval | ||||
|     exc.add_argument("-d", "--delete", nargs="*", metavar="key", | ||||
|                      help="Delete metadata for specified keys (default all)", | ||||
|                      ).completer = self.complete.meta_key | ||||
|     return cmd | ||||
|  | ||||
| def cmd_metadata(self): | ||||
| @@ -56,6 +60,16 @@ def cmd_metadata(self): | ||||
|             handler(self.args.path, data) | ||||
|         except nilmdb.client.ClientError as e: | ||||
|             self.die("error setting/updating metadata: %s", str(e)) | ||||
|     elif self.args.delete is not None: | ||||
|         # Delete (by setting values to empty strings) | ||||
|         keys = self.args.delete or None | ||||
|         try: | ||||
|             data = self.client.stream_get_metadata(self.args.path, keys) | ||||
|             for key in data: | ||||
|                 data[key] = "" | ||||
|             self.client.stream_update_metadata(self.args.path, data) | ||||
|         except nilmdb.client.ClientError as e: | ||||
|             self.die("error deleting metadata: %s", str(e)) | ||||
|     else: | ||||
|         # Get (or unspecified) | ||||
|         keys = self.args.get or None | ||||
| @@ -64,7 +78,7 @@ def cmd_metadata(self): | ||||
|         except nilmdb.client.ClientError as e: | ||||
|             self.die("error getting metadata: %s", str(e)) | ||||
|         for key, value in sorted(data.items()): | ||||
|             # Omit nonexistant keys | ||||
|             # Print nonexistant keys as having empty value | ||||
|             if value is None: | ||||
|                 value = "" | ||||
|             printf("%s=%s\n", key, value) | ||||
|   | ||||
| @@ -22,7 +22,7 @@ def main(): | ||||
|     group.add_argument('-p', '--port', help = 'Listen on the given port', | ||||
|                        type = int, default = 12380) | ||||
|     group.add_argument('-d', '--database', help = 'Database directory', | ||||
|                        default = os.path.join(os.getcwd(), "db")) | ||||
|                        default = "./db") | ||||
|     group.add_argument('-q', '--quiet', help = 'Silence output', | ||||
|                        action = 'store_true') | ||||
|     group.add_argument('-t', '--traceback', | ||||
|   | ||||
| @@ -17,5 +17,5 @@ except (ImportError, TypeError): # pragma: no cover | ||||
|     pass | ||||
|  | ||||
| from nilmdb.server.nilmdb import NilmDB | ||||
| from nilmdb.server.server import Server | ||||
| from nilmdb.server.server import Server, wsgi_application | ||||
| from nilmdb.server.errors import NilmDBError, StreamError, OverlapError | ||||
|   | ||||
| @@ -14,6 +14,7 @@ import re | ||||
| import sys | ||||
| import tempfile | ||||
|  | ||||
| import nilmdb.utils.lock | ||||
| from . import rocket | ||||
|  | ||||
| # Up to 256 open file descriptors at any given time. | ||||
| @@ -26,6 +27,8 @@ class BulkData(object): | ||||
|     def __init__(self, basepath, **kwargs): | ||||
|         self.basepath = basepath | ||||
|         self.root = os.path.join(self.basepath, "data") | ||||
|         self.lock = self.root + ".lock" | ||||
|         self.lockfile = None | ||||
|  | ||||
|         # Tuneables | ||||
|         if "file_size" in kwargs: | ||||
| @@ -44,8 +47,22 @@ class BulkData(object): | ||||
|         if not os.path.isdir(self.root): | ||||
|             os.mkdir(self.root) | ||||
|  | ||||
|         # Create the lock | ||||
|         self.lockfile = open(self.lock, "w") | ||||
|         if not nilmdb.utils.lock.exclusive_lock(self.lockfile): | ||||
|             raise IOError('database at "' + self.basepath + | ||||
|                           '" is already locked by another process') | ||||
|  | ||||
|     def close(self): | ||||
|         self.getnode.cache_remove_all() | ||||
|         if self.lockfile: | ||||
|             nilmdb.utils.lock.exclusive_unlock(self.lockfile) | ||||
|             self.lockfile.close() | ||||
|             try: | ||||
|                 os.unlink(self.lock) | ||||
|             except OSError: # pragma: no cover | ||||
|                 pass | ||||
|             self.lockfile = None | ||||
|  | ||||
|     def _encode_filename(self, path): | ||||
|         # Encode all paths to UTF-8, regardless of sys.getfilesystemencoding(), | ||||
| @@ -62,7 +79,12 @@ class BulkData(object): | ||||
|         if Table.exists(ospath): | ||||
|             raise ValueError("stream already exists at this path") | ||||
|         if os.path.isdir(ospath): | ||||
|             raise ValueError("subdirs of this path already exist") | ||||
|             # Look for any files in subdirectories.  Fully empty subdirectories | ||||
|             # are OK; they might be there during a rename | ||||
|             for (root, dirs, files) in os.walk(ospath): | ||||
|                 if len(files): | ||||
|                     raise ValueError( | ||||
|                         "non-empty subdirs of this path already exist") | ||||
|  | ||||
|     def _create_parents(self, unicodepath): | ||||
|         """Verify the path name, and create parent directories if they | ||||
| @@ -134,7 +156,7 @@ class BulkData(object): | ||||
|  | ||||
|             # Open and cache it | ||||
|             self.getnode(unicodepath) | ||||
|         except: | ||||
|         except Exception: | ||||
|             exc_info = sys.exc_info() | ||||
|             try: | ||||
|                 os.rmdir(ospath) | ||||
| @@ -171,7 +193,6 @@ class BulkData(object): | ||||
|         # Basic checks | ||||
|         if oldospath == newospath: | ||||
|             raise ValueError("old and new paths are the same") | ||||
|         self._create_check_ospath(newospath) | ||||
|  | ||||
|         # Move the table to a temporary location | ||||
|         tmpdir = tempfile.mkdtemp(prefix = "rename-", dir = self.root) | ||||
| @@ -179,6 +200,9 @@ class BulkData(object): | ||||
|         os.rename(oldospath, tmppath) | ||||
|  | ||||
|         try: | ||||
|             # Check destination path | ||||
|             self._create_check_ospath(newospath) | ||||
|  | ||||
|             # Create parent dirs for new location | ||||
|             self._create_parents(newunicodepath) | ||||
|  | ||||
| @@ -371,7 +395,7 @@ class Table(object): | ||||
|             # Try deleting subdir, too | ||||
|             try: | ||||
|                 os.rmdir(os.path.join(self.root, subdir)) | ||||
|             except: | ||||
|             except Exception: | ||||
|                 pass | ||||
|  | ||||
|     # Cache open files | ||||
| @@ -389,12 +413,16 @@ class Table(object): | ||||
|         return rocket.Rocket(self.layout, | ||||
|                              os.path.join(self.root, subdir, filename)) | ||||
|  | ||||
|     def append_string(self, data, start, end): | ||||
|     def append_data(self, data, start, end, binary = False): | ||||
|         """Parse the formatted string in 'data', according to the | ||||
|         current layout, and append it to the table.  If any timestamps | ||||
|         are non-monotonic, or don't fall between 'start' and 'end', | ||||
|         a ValueError is raised. | ||||
|  | ||||
|         If 'binary' is True, the data should be in raw binary format | ||||
|         instead: little-endian, matching the current table's layout, | ||||
|         including the int64 timestamp. | ||||
|  | ||||
|         If this function succeeds, it returns normally.  Otherwise, | ||||
|         the table is reverted back to its original state by truncating | ||||
|         or deleting files as necessary.""" | ||||
| @@ -413,17 +441,26 @@ class Table(object): | ||||
|                 # Ask the rocket object to parse and append up to "count" | ||||
|                 # rows of data, verifying things along the way. | ||||
|                 try: | ||||
|                     if binary: | ||||
|                         appender = f.append_binary | ||||
|                     else: | ||||
|                         appender = f.append_string | ||||
|                     (added_rows, data_offset, last_timestamp, linenum | ||||
|                      ) = f.append_string(count, data, data_offset, linenum, | ||||
|                      ) = appender(count, data, data_offset, linenum, | ||||
|                                   start, end, last_timestamp) | ||||
|                 except rocket.ParseError as e: | ||||
|                     (linenum, colnum, errtype, obj) = e.args | ||||
|                     if binary: | ||||
|                         where = "byte %d: " % (linenum) | ||||
|                     else: | ||||
|                         where = "line %d, column %d: " % (linenum, colnum) | ||||
|                     # Extract out the error line, add column marker | ||||
|                     try: | ||||
|                         if binary: | ||||
|                             raise IndexError | ||||
|                         bad = data.splitlines()[linenum-1] | ||||
|                         badptr = ' ' * (colnum - 1) + '^' | ||||
|                     except IndexError: # pragma: no cover | ||||
|                         bad += '\n' + ' ' * (colnum - 1) + '^' | ||||
|                     except IndexError: | ||||
|                         bad = "" | ||||
|                     if errtype == rocket.ERR_NON_MONOTONIC: | ||||
|                         err = "timestamp is not monotonically increasing" | ||||
| @@ -439,7 +476,7 @@ class Table(object): | ||||
|                     else: | ||||
|                         err = str(obj) | ||||
|                     raise ValueError("error parsing input data: " + | ||||
|                                      where + err + "\n" + bad + "\n" + badptr) | ||||
|                                      where + err + "\n" + bad) | ||||
|                 tot_rows += added_rows | ||||
|         except Exception: | ||||
|             # Some failure, so try to roll things back by truncating or | ||||
| @@ -455,7 +492,7 @@ class Table(object): | ||||
|             # Success, so update self.nrows accordingly | ||||
|             self.nrows = tot_rows | ||||
|  | ||||
|     def get_data(self, start, stop): | ||||
|     def get_data(self, start, stop, binary = False): | ||||
|         """Extract data corresponding to Python range [n:m], | ||||
|         and returns a formatted string""" | ||||
|         if (start is None or | ||||
| @@ -473,10 +510,13 @@ class Table(object): | ||||
|             if count > remaining: | ||||
|                 count = remaining | ||||
|             f = self.file_open(subdir, filename) | ||||
|             if binary: | ||||
|                 ret.append(f.extract_binary(offset, count)) | ||||
|             else: | ||||
|                 ret.append(f.extract_string(offset, count)) | ||||
|             remaining -= count | ||||
|             row += count | ||||
|         return "".join(ret) | ||||
|         return b"".join(ret) | ||||
|  | ||||
|     def __getitem__(self, row): | ||||
|         """Extract timestamps from a row, with table[n] notation.""" | ||||
| @@ -504,7 +544,7 @@ class Table(object): | ||||
|             with open(cachefile, "rb") as f: | ||||
|                 ranges = pickle.load(f) | ||||
|             cachefile_present = True | ||||
|         except: | ||||
|         except Exception: | ||||
|             ranges = [] | ||||
|             cachefile_present = False | ||||
|  | ||||
|   | ||||
| @@ -12,6 +12,7 @@ Manages both the SQL database and the table storage backend. | ||||
| from __future__ import absolute_import | ||||
| import nilmdb.utils | ||||
| from nilmdb.utils.printf import * | ||||
| from nilmdb.utils.time import timestamp_to_string | ||||
|  | ||||
| from nilmdb.utils.interval import IntervalError | ||||
| from nilmdb.server.interval import Interval, DBInterval, IntervalSet | ||||
| @@ -105,7 +106,9 @@ class NilmDB(object): | ||||
|         try: | ||||
|             os.makedirs(self.basepath) | ||||
|         except OSError as e: | ||||
|             if e.errno != errno.EEXIST: | ||||
|             if e.errno != errno.EEXIST: # pragma: no cover | ||||
|                 # (no coverage, because it's hard to trigger this case | ||||
|                 # if tests are run as root) | ||||
|                 raise IOError("can't create tree " + self.basepath) | ||||
|  | ||||
|         # Our data goes inside it | ||||
| @@ -116,8 +119,9 @@ class NilmDB(object): | ||||
|         self.con = sqlite3.connect(sqlfilename, check_same_thread = True) | ||||
|         try: | ||||
|             self._sql_schema_update() | ||||
|         finally: # pragma: no cover | ||||
|         except Exception: # pragma: no cover | ||||
|             self.data.close() | ||||
|             raise | ||||
|  | ||||
|         # See big comment at top about the performance implications of this | ||||
|         self.con.execute("PRAGMA synchronous=NORMAL") | ||||
| @@ -471,12 +475,16 @@ class NilmDB(object): | ||||
|             con.execute("DELETE FROM ranges WHERE stream_id=?", (stream_id,)) | ||||
|             con.execute("DELETE FROM streams WHERE id=?", (stream_id,)) | ||||
|  | ||||
|     def stream_insert(self, path, start, end, data): | ||||
|     def stream_insert(self, path, start, end, data, binary = False): | ||||
|         """Insert new data into the database. | ||||
|            path: Path at which to add the data | ||||
|            start: Starting timestamp | ||||
|            end: Ending timestamp | ||||
|            data: Textual data, formatted according to the layout of path | ||||
|  | ||||
|            'binary', if True, means that 'data' is raw binary: | ||||
|            little-endian, matching the current table's layout, | ||||
|            including the int64 timestamp. | ||||
|            """ | ||||
|         # First check for basic overlap using timestamp info given. | ||||
|         stream_id = self._stream_id(path) | ||||
| @@ -490,7 +498,7 @@ class NilmDB(object): | ||||
|         # there are any parse errors. | ||||
|         table = self.data.getnode(path) | ||||
|         row_start = table.nrows | ||||
|         table.append_string(data, start, end) | ||||
|         table.append_data(data, start, end, binary) | ||||
|         row_end = table.nrows | ||||
|  | ||||
|         # Insert the record into the sql database. | ||||
| @@ -533,7 +541,8 @@ class NilmDB(object): | ||||
|                                   dbinterval.db_startpos, | ||||
|                                   dbinterval.db_endpos) | ||||
|  | ||||
|     def stream_extract(self, path, start = None, end = None, count = False): | ||||
|     def stream_extract(self, path, start = None, end = None, | ||||
|                        count = False, markup = False, binary = False): | ||||
|         """ | ||||
|         Returns (data, restart) tuple. | ||||
|  | ||||
| @@ -546,10 +555,17 @@ class NilmDB(object): | ||||
|         and a new request with a start time of 'restart' will fetch | ||||
|         the next block of data. | ||||
|  | ||||
|         count, if true, means to not return raw data, but just the count | ||||
|         'count', if true, means to not return raw data, but just the count | ||||
|         of rows that would have been returned.  This is much faster | ||||
|         than actually fetching the data.  It is not limited by | ||||
|         max_results. | ||||
|  | ||||
|         'markup', if true, indicates that returned data should be | ||||
|         marked with a comment denoting when a particular interval | ||||
|         starts, and another comment when an interval ends. | ||||
|  | ||||
|         'binary', if true, means to return raw binary rather than | ||||
|         ASCII-formatted data. | ||||
|         """ | ||||
|         stream_id = self._stream_id(path) | ||||
|         table = self.data.getnode(path) | ||||
| @@ -560,6 +576,8 @@ class NilmDB(object): | ||||
|         matched = 0 | ||||
|         remaining = self.max_results | ||||
|         restart = None | ||||
|         if binary and (markup or count): | ||||
|             raise NilmDBError("binary mode can't be used with markup or count") | ||||
|         for interval in intervals.intersection(requested): | ||||
|             # Reading single rows from the table is too slow, so | ||||
|             # we use two bisections to find both the starting and | ||||
| @@ -578,14 +596,26 @@ class NilmDB(object): | ||||
|                 row_end = row_max | ||||
|                 restart = table[row_max] | ||||
|  | ||||
|             # Add markup | ||||
|             if markup: | ||||
|                 result.append("# interval-start " + | ||||
|                               timestamp_to_string(interval.start) + "\n") | ||||
|  | ||||
|             # Gather these results up | ||||
|             result.append(table.get_data(row_start, row_end)) | ||||
|             result.append(table.get_data(row_start, row_end, binary)) | ||||
|  | ||||
|             # Count them | ||||
|             remaining -= row_end - row_start | ||||
|  | ||||
|             # Add markup, and exit if restart is set. | ||||
|             if restart is not None: | ||||
|                 if markup: | ||||
|                     result.append("# interval-end " + | ||||
|                                   timestamp_to_string(restart) + "\n") | ||||
|                 break | ||||
|             if markup: | ||||
|                 result.append("# interval-end " + | ||||
|                               timestamp_to_string(interval.end) + "\n") | ||||
|  | ||||
|         if count: | ||||
|             return matched | ||||
|   | ||||
| @@ -419,6 +419,68 @@ extra_data_on_line: | ||||
| 			 ERR_OTHER, "extra data on line"); | ||||
| } | ||||
|  | ||||
| /**** | ||||
|  * Append from binary data | ||||
|  */ | ||||
|  | ||||
| /* .append_binary(count, data, offset, linenum, start, end, last_timestamp) */ | ||||
| static PyObject *Rocket_append_binary(Rocket *self, PyObject *args) | ||||
| { | ||||
|         int count; | ||||
| 	const uint8_t *data; | ||||
|         int data_len; | ||||
|         int linenum; | ||||
| 	int offset; | ||||
| 	timestamp_t start; | ||||
| 	timestamp_t end; | ||||
| 	timestamp_t last_timestamp; | ||||
|  | ||||
| 	if (!PyArg_ParseTuple(args, "it#iilll:append_binary", | ||||
|                               &count, &data, &data_len, &offset, | ||||
|                               &linenum, &start, &end, &last_timestamp)) | ||||
| 		return NULL; | ||||
|  | ||||
|         /* Advance to offset */ | ||||
|         if (offset > data_len) | ||||
|                 return raise_str(0, 0, ERR_OTHER, "bad offset"); | ||||
|         data += offset; | ||||
|         data_len -= offset; | ||||
|  | ||||
|         /* Figure out max number of rows to insert */ | ||||
|         int rows = data_len / self->binary_size; | ||||
|         if (rows > count) | ||||
|                 rows = count; | ||||
|  | ||||
|         /* Check timestamps */ | ||||
|         timestamp_t ts; | ||||
| 	int i; | ||||
|         for (i = 0; i < rows; i++) { | ||||
|                 /* Read raw timestamp, byteswap if needed */ | ||||
|                 memcpy(&ts, &data[i * self->binary_size], 8); | ||||
|                 ts = le64toh(ts); | ||||
|  | ||||
|                 /* Check limits */ | ||||
|                 if (ts <= last_timestamp) | ||||
|                         return raise_int(i, 0, ERR_NON_MONOTONIC, ts); | ||||
|                 last_timestamp = ts; | ||||
|                 if (ts < start || ts >= end) | ||||
|                         return raise_int(i, 0, ERR_OUT_OF_INTERVAL, ts); | ||||
|         } | ||||
|  | ||||
|         /* Write binary data */ | ||||
|         if (fwrite(data, data_len, 1, self->file) != 1) { | ||||
|                 PyErr_SetFromErrno(PyExc_OSError); | ||||
|                 return NULL; | ||||
|         } | ||||
| 	fflush(self->file); | ||||
|  | ||||
| 	/* Build return value and return */ | ||||
| 	PyObject *o; | ||||
| 	o = Py_BuildValue("(iili)", rows, offset + rows * self->binary_size, | ||||
|                           last_timestamp, linenum); | ||||
| 	return o; | ||||
| } | ||||
|  | ||||
| /**** | ||||
|  * Extract to string | ||||
|  */ | ||||
| @@ -484,7 +546,7 @@ static PyObject *Rocket_extract_string(Rocket *self, PyObject *args) | ||||
| 			/* read and format in a loop */			\ | ||||
| 			for (i = 0; i < self->layout_count; i++) {	\ | ||||
| 				if (fread(&disktype, bytes,		\ | ||||
| 					  1, self->file) < 0)		\ | ||||
| 					  1, self->file) != 1)		\ | ||||
| 					goto err;			\ | ||||
| 				disktype = letoh(disktype);		\ | ||||
| 				ret = sprintf(&str[len], " " fmt,	\ | ||||
| @@ -527,6 +589,46 @@ err: | ||||
| 	return NULL; | ||||
| } | ||||
|  | ||||
| /**** | ||||
|  * Extract to binary string containing raw little-endian binary data | ||||
|  */ | ||||
| static PyObject *Rocket_extract_binary(Rocket *self, PyObject *args) | ||||
| { | ||||
| 	long count; | ||||
| 	long offset; | ||||
|  | ||||
| 	if (!PyArg_ParseTuple(args, "ll", &offset, &count)) | ||||
| 		return NULL; | ||||
| 	if (!self->file) { | ||||
| 		PyErr_SetString(PyExc_Exception, "no file"); | ||||
| 		return NULL; | ||||
| 	} | ||||
| 	/* Seek to target location */ | ||||
| 	if (fseek(self->file, offset, SEEK_SET) < 0) { | ||||
| 		PyErr_SetFromErrno(PyExc_OSError); | ||||
| 		return NULL; | ||||
| 	} | ||||
|  | ||||
|         uint8_t *str; | ||||
|         int len = count * self->binary_size; | ||||
|         str = malloc(len); | ||||
|         if (str == NULL) { | ||||
|                 PyErr_SetFromErrno(PyExc_OSError); | ||||
|                 return NULL; | ||||
|         } | ||||
|  | ||||
|         /* Data in the file is already in the desired little-endian | ||||
|            binary format, so just read it directly. */ | ||||
|         if (fread(str, self->binary_size, count, self->file) != count) { | ||||
|                 free(str); | ||||
|                 PyErr_SetFromErrno(PyExc_OSError); | ||||
|                 return NULL; | ||||
|         } | ||||
|  | ||||
| 	PyObject *pystr = PyBytes_FromStringAndSize((char *)str, len); | ||||
| 	free(str); | ||||
| 	return pystr; | ||||
| } | ||||
|  | ||||
| /**** | ||||
|  * Extract timestamp | ||||
| @@ -571,11 +673,13 @@ static PyMemberDef Rocket_members[] = { | ||||
| }; | ||||
|  | ||||
| static PyMethodDef Rocket_methods[] = { | ||||
| 	{ "close", (PyCFunction)Rocket_close, METH_NOARGS, | ||||
| 	{ "close", | ||||
|           (PyCFunction)Rocket_close, METH_NOARGS, | ||||
| 	  "close(self)\n\n" | ||||
| 	  "Close file handle" }, | ||||
|  | ||||
| 	{ "append_string", (PyCFunction)Rocket_append_string, METH_VARARGS, | ||||
| 	{ "append_string", | ||||
|           (PyCFunction)Rocket_append_string, METH_VARARGS, | ||||
| 	  "append_string(self, count, data, offset, line, start, end, ts)\n\n" | ||||
|           "Parse string and append data.\n" | ||||
| 	  "\n" | ||||
| @@ -590,16 +694,46 @@ static PyMethodDef Rocket_methods[] = { | ||||
| 	  "Raises ParseError if timestamps are non-monotonic, outside\n" | ||||
| 	  "the start/end interval etc.\n" | ||||
| 	  "\n" | ||||
|           "On success, return a tuple with three values:\n" | ||||
|           "On success, return a tuple:\n" | ||||
|           "  added_rows: how many rows were added from the file\n" | ||||
|           "  data_offset: current offset into the data string\n" | ||||
|           "  last_timestamp: last timestamp we parsed" }, | ||||
|           "  last_timestamp: last timestamp we parsed\n" | ||||
|           "  linenum: current line number" }, | ||||
|  | ||||
| 	{ "extract_string", (PyCFunction)Rocket_extract_string, METH_VARARGS, | ||||
| 	{ "append_binary", | ||||
| 	  (PyCFunction)Rocket_append_binary, METH_VARARGS, | ||||
| 	  "append_binary(self, count, data, offset, line, start, end, ts)\n\n" | ||||
|           "Append binary data, which must match the data layout.\n" | ||||
| 	  "\n" | ||||
| 	  "  count: maximum number of rows to add\n" | ||||
|           "  data: binary data\n" | ||||
|           "  offset: byte offset into data to start adding\n" | ||||
|           "  line: current line number (unused)\n" | ||||
|           "  start: starting timestamp for interval\n" | ||||
|           "  end: end timestamp for interval\n" | ||||
|           "  ts: last timestamp that was previously parsed\n" | ||||
| 	  "\n" | ||||
| 	  "Raises ParseError if timestamps are non-monotonic, outside\n" | ||||
| 	  "the start/end interval etc.\n" | ||||
| 	  "\n" | ||||
|           "On success, return a tuple:\n" | ||||
|           "  added_rows: how many rows were added from the file\n" | ||||
|           "  data_offset: current offset into the data string\n" | ||||
|           "  last_timestamp: last timestamp we parsed\n" | ||||
|           "  linenum: current line number (copied from argument)" }, | ||||
|  | ||||
| 	{ "extract_string", | ||||
|           (PyCFunction)Rocket_extract_string, METH_VARARGS, | ||||
| 	  "extract_string(self, offset, count)\n\n" | ||||
| 	  "Extract count rows of data from the file at offset offset.\n" | ||||
| 	  "Return an ascii formatted string according to the layout" }, | ||||
|  | ||||
| 	{ "extract_binary", | ||||
| 	  (PyCFunction)Rocket_extract_binary, METH_VARARGS, | ||||
| 	  "extract_binary(self, offset, count)\n\n" | ||||
| 	  "Extract count rows of data from the file at offset offset.\n" | ||||
| 	  "Return a raw binary string of data matching the data layout." }, | ||||
|  | ||||
| 	{ "extract_timestamp", | ||||
| 	  (PyCFunction)Rocket_extract_timestamp, METH_VARARGS, | ||||
| 	  "extract_timestamp(self, offset)\n\n" | ||||
|   | ||||
| @@ -11,9 +11,11 @@ from nilmdb.utils.time import string_to_timestamp | ||||
| import cherrypy | ||||
| import sys | ||||
| import os | ||||
| import socket | ||||
| import simplejson as json | ||||
| import decorator | ||||
| import psutil | ||||
| import traceback | ||||
|  | ||||
| class NilmApp(object): | ||||
|     def __init__(self, db): | ||||
| @@ -172,6 +174,21 @@ class Root(NilmApp): | ||||
| class Stream(NilmApp): | ||||
|     """Stream-specific operations""" | ||||
|  | ||||
|     # Helpers | ||||
|     def _get_times(self, start_param, end_param): | ||||
|         (start, end) = (None, None) | ||||
|         if start_param is not None: | ||||
|             start = string_to_timestamp(start_param) | ||||
|         if end_param is not None: | ||||
|             end = string_to_timestamp(end_param) | ||||
|         if start is not None and end is not None: | ||||
|             if start >= end: | ||||
|                 raise cherrypy.HTTPError( | ||||
|                     "400 Bad Request", | ||||
|                     sprintf("start must precede end (%s >= %s)", | ||||
|                             start_param, end_param)) | ||||
|         return (start, end) | ||||
|  | ||||
|     # /stream/list | ||||
|     # /stream/list?layout=float32_8 | ||||
|     # /stream/list?path=/newton/prep&extended=1 | ||||
| @@ -288,10 +305,15 @@ class Stream(NilmApp): | ||||
|     @cherrypy.tools.json_out() | ||||
|     @exception_to_httperror(NilmDBError, ValueError) | ||||
|     @cherrypy.tools.CORS_allow(methods = ["PUT"]) | ||||
|     def insert(self, path, start, end): | ||||
|     def insert(self, path, start, end, binary = False): | ||||
|         """ | ||||
|         Insert new data into the database.  Provide textual data | ||||
|         (matching the path's layout) as a HTTP PUT. | ||||
|  | ||||
|         If 'binary' is True, expect raw binary data, rather than lines | ||||
|         of ASCII-formatted data.  Raw binary data is always | ||||
|         little-endian and matches the database types (including an | ||||
|         int64 timestamp). | ||||
|         """ | ||||
|         # Important that we always read the input before throwing any | ||||
|         # errors, to keep lengths happy for persistent connections. | ||||
| @@ -299,21 +321,24 @@ class Stream(NilmApp): | ||||
|         # requests, if we ever want to handle those (issue #1134) | ||||
|         body = cherrypy.request.body.read() | ||||
|  | ||||
|         # Verify content type for binary data | ||||
|         content_type = cherrypy.request.headers.get('content-type') | ||||
|         if binary and content_type: | ||||
|             if content_type != "application/octet-stream": | ||||
|                 raise cherrypy.HTTPError("400", "Content type must be " | ||||
|                                          "application/octet-stream for " | ||||
|                                          "binary data, not " + content_type) | ||||
|  | ||||
|         # Check path and get layout | ||||
|         streams = self.db.stream_list(path = path) | ||||
|         if len(streams) != 1: | ||||
|             raise cherrypy.HTTPError("404 Not Found", "No such stream") | ||||
|         if len(self.db.stream_list(path = path)) != 1: | ||||
|             raise cherrypy.HTTPError("404", "No such stream: " + path) | ||||
|  | ||||
|         # Check limits | ||||
|         start = string_to_timestamp(start) | ||||
|         end = string_to_timestamp(end) | ||||
|         if start >= end: | ||||
|             raise cherrypy.HTTPError("400 Bad Request", | ||||
|                                      "start must precede end") | ||||
|         (start, end) = self._get_times(start, end) | ||||
|  | ||||
|         # Pass the data directly to nilmdb, which will parse it and | ||||
|         # raise a ValueError if there are any problems. | ||||
|         self.db.stream_insert(path, start, end, body) | ||||
|         self.db.stream_insert(path, start, end, body, binary) | ||||
|  | ||||
|         # Done | ||||
|         return | ||||
| @@ -331,14 +356,7 @@ class Stream(NilmApp): | ||||
|         the interval [start, end).  Returns the number of data points | ||||
|         removed. | ||||
|         """ | ||||
|         if start is not None: | ||||
|             start = string_to_timestamp(start) | ||||
|         if end is not None: | ||||
|             end = string_to_timestamp(end) | ||||
|         if start is not None and end is not None: | ||||
|             if start >= end: | ||||
|                 raise cherrypy.HTTPError("400 Bad Request", | ||||
|                                          "start must precede end") | ||||
|         (start, end) = self._get_times(start, end) | ||||
|         total_removed = 0 | ||||
|         while True: | ||||
|             (removed, restart) = self.db.stream_remove(path, start, end) | ||||
| @@ -369,15 +387,7 @@ class Stream(NilmApp): | ||||
|         Note that the response type is the non-standard | ||||
|         'application/x-json-stream' for lack of a better option. | ||||
|         """ | ||||
|         if start is not None: | ||||
|             start = string_to_timestamp(start) | ||||
|         if end is not None: | ||||
|             end = string_to_timestamp(end) | ||||
|  | ||||
|         if start is not None and end is not None: | ||||
|             if start >= end: | ||||
|                 raise cherrypy.HTTPError("400 Bad Request", | ||||
|                                          "start must precede end") | ||||
|         (start, end) = self._get_times(start, end) | ||||
|  | ||||
|         if len(self.db.stream_list(path = path)) != 1: | ||||
|             raise cherrypy.HTTPError("404", "No such stream: " + path) | ||||
| @@ -401,48 +411,58 @@ class Stream(NilmApp): | ||||
|     # /stream/extract?path=/newton/prep&start=1234567890.0&end=1234567899.0 | ||||
|     @cherrypy.expose | ||||
|     @chunked_response | ||||
|     @response_type("text/plain") | ||||
|     def extract(self, path, start = None, end = None, count = False): | ||||
|     def extract(self, path, start = None, end = None, | ||||
|                 count = False, markup = False, binary = False): | ||||
|         """ | ||||
|         Extract data from backend database.  Streams the resulting | ||||
|         entries as ASCII text lines separated by newlines.  This may | ||||
|         make multiple requests to the nilmdb backend to avoid causing | ||||
|         it to block for too long. | ||||
|  | ||||
|         Add count=True to return a count rather than actual data. | ||||
|         """ | ||||
|         if start is not None: | ||||
|             start = string_to_timestamp(start) | ||||
|         if end is not None: | ||||
|             end = string_to_timestamp(end) | ||||
|         If 'count' is True, returns a count rather than actual data. | ||||
|  | ||||
|         # Check parameters | ||||
|         if start is not None and end is not None: | ||||
|             if start >= end: | ||||
|                 raise cherrypy.HTTPError("400 Bad Request", | ||||
|                                          "start must precede end") | ||||
|         If 'markup' is True, adds comments to the stream denoting each | ||||
|         interval's start and end timestamp. | ||||
|  | ||||
|         If 'binary' is True, return raw binary data, rather than lines | ||||
|         of ASCII-formatted data.  Raw binary data is always | ||||
|         little-endian and matches the database types (including an | ||||
|         int64 timestamp). | ||||
|         """ | ||||
|         (start, end) = self._get_times(start, end) | ||||
|  | ||||
|         # Check path and get layout | ||||
|         streams = self.db.stream_list(path = path) | ||||
|         if len(streams) != 1: | ||||
|             raise cherrypy.HTTPError("404 Not Found", "No such stream") | ||||
|         if len(self.db.stream_list(path = path)) != 1: | ||||
|             raise cherrypy.HTTPError("404", "No such stream: " + path) | ||||
|  | ||||
|         if binary: | ||||
|             content_type = "application/octet-stream" | ||||
|             if markup or count: | ||||
|                 raise cherrypy.HTTPError("400", "can't mix binary and " | ||||
|                                          "markup or count modes") | ||||
|         else: | ||||
|             content_type = "text/plain" | ||||
|         cherrypy.response.headers['Content-Type'] = content_type | ||||
|  | ||||
|         @workaround_cp_bug_1200 | ||||
|         def content(start, end, count): | ||||
|         def content(start, end): | ||||
|             # Note: disable chunked responses to see tracebacks from here. | ||||
|             if count: | ||||
|                 matched = self.db.stream_extract(path, start, end, count) | ||||
|                 matched = self.db.stream_extract(path, start, end, | ||||
|                                                  count = True) | ||||
|                 yield sprintf("%d\n", matched) | ||||
|                 return | ||||
|  | ||||
|             while True: | ||||
|                 (data, restart) = self.db.stream_extract(path, start, end) | ||||
|                 (data, restart) = self.db.stream_extract( | ||||
|                     path, start, end, count = False, | ||||
|                     markup = markup, binary = binary) | ||||
|                 yield data | ||||
|  | ||||
|                 if restart is None: | ||||
|                     return | ||||
|                 start = restart | ||||
|         return content(start, end, count) | ||||
|         return content(start, end) | ||||
|  | ||||
| class Exiter(object): | ||||
|     """App that exits the server, for testing""" | ||||
| @@ -460,7 +480,8 @@ class Server(object): | ||||
|                  stoppable = False,       # whether /exit URL exists | ||||
|                  embedded = True,         # hide diagnostics and output, etc | ||||
|                  fast_shutdown = False,   # don't wait for clients to disconn. | ||||
|                  force_traceback = False  # include traceback in all errors | ||||
|                  force_traceback = False, # include traceback in all errors | ||||
|                  basepath = '',           # base URL path for cherrypy.tree | ||||
|                  ): | ||||
|         # Save server version, just for verification during tests | ||||
|         self.version = nilmdb.__version__ | ||||
| @@ -520,7 +541,7 @@ class Server(object): | ||||
|         if stoppable: | ||||
|             root.exit = Exiter() | ||||
|         cherrypy.tree.apps = {} | ||||
|         cherrypy.tree.mount(root, "/", config = { "/" : app_config }) | ||||
|         cherrypy.tree.mount(root, basepath, config = { "/" : app_config }) | ||||
|  | ||||
|         # Shutdowns normally wait for clients to disconnect.  To speed | ||||
|         # up tests, set fast_shutdown = True | ||||
| @@ -530,6 +551,9 @@ class Server(object): | ||||
|         else: | ||||
|             cherrypy.server.shutdown_timeout = 5 | ||||
|  | ||||
|         # Set up the WSGI application pointer for external programs | ||||
|         self.wsgi_application = cherrypy.tree | ||||
|  | ||||
|     def json_error_page(self, status, message, traceback, version): | ||||
|         """Return a custom error page in JSON so the client can parse it""" | ||||
|         errordata = { "status" : status, | ||||
| @@ -596,3 +620,55 @@ class Server(object): | ||||
|  | ||||
|     def stop(self): | ||||
|         cherrypy.engine.exit() | ||||
|  | ||||
| # Use a single global nilmdb.server.NilmDB and nilmdb.server.Server | ||||
| # instance since the database can only be opened once.  For this to | ||||
| # work, the web server must use only a single process and single | ||||
| # Python interpreter.  Multiple threads are OK. | ||||
| _wsgi_server = None | ||||
| def wsgi_application(dbpath, basepath): # pragma: no cover | ||||
|     """Return a WSGI application object with a database at the | ||||
|     specified path. | ||||
|  | ||||
|     'dbpath' is a filesystem location, e.g. /home/nilm/db | ||||
|  | ||||
|     'basepath' is the URL path of the application base, which | ||||
|     is the same as the first argument to Apache's WSGIScriptAlias | ||||
|     directive. | ||||
|     """ | ||||
|     def application(environ, start_response): | ||||
|         global _wsgi_server | ||||
|         if _wsgi_server is None: | ||||
|             # Try to start the server | ||||
|             try: | ||||
|                 db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(dbpath) | ||||
|                 _wsgi_server = nilmdb.server.Server( | ||||
|                     db, embedded = True, | ||||
|                     basepath = basepath.rstrip('/')) | ||||
|             except Exception: | ||||
|                 # Build an error message on failure | ||||
|                 import pprint | ||||
|                 err = sprintf("Initializing database at path '%s' failed:\n\n", | ||||
|                               dbpath) | ||||
|                 err += traceback.format_exc() | ||||
|                 try: | ||||
|                     import pwd | ||||
|                     import grp | ||||
|                     err += sprintf("\nRunning as: uid=%d (%s), gid=%d (%s) " | ||||
|                                    "on host %s, pid %d\n", | ||||
|                                    os.getuid(), pwd.getpwuid(os.getuid())[0], | ||||
|                                    os.getgid(), grp.getgrgid(os.getgid())[0], | ||||
|                                    socket.gethostname(), os.getpid()) | ||||
|                 except ImportError: | ||||
|                     pass | ||||
|                 err += sprintf("\nEnvironment:\n%s\n", pprint.pformat(environ)) | ||||
|         if _wsgi_server is None: | ||||
|             # Serve up the error with our own mini WSGI app. | ||||
|             headers = [ ('Content-type', 'text/plain'), | ||||
|                         ('Content-length', str(len(err))) ] | ||||
|             start_response("500 Internal Server Error", headers) | ||||
|             return [err] | ||||
|  | ||||
|         # Call the normal application | ||||
|         return _wsgi_server.wsgi_application(environ, start_response) | ||||
|     return application | ||||
|   | ||||
| @@ -1,7 +1,7 @@ | ||||
| """NilmDB utilities""" | ||||
|  | ||||
| from __future__ import absolute_import | ||||
| from nilmdb.utils.timer import Timer | ||||
| from nilmdb.utils.iteratorizer import Iteratorizer | ||||
| from nilmdb.utils.serializer import serializer_proxy | ||||
| from nilmdb.utils.lrucache import lru_cache | ||||
| from nilmdb.utils.diskusage import du, human_size | ||||
| @@ -12,3 +12,4 @@ import nilmdb.utils.fallocate | ||||
| import nilmdb.utils.time | ||||
| import nilmdb.utils.iterator | ||||
| import nilmdb.utils.interval | ||||
| import nilmdb.utils.lock | ||||
|   | ||||
| @@ -1,100 +0,0 @@ | ||||
| import Queue | ||||
| import threading | ||||
| import sys | ||||
| import contextlib | ||||
|  | ||||
| # This file provides a context manager that converts a function | ||||
| # that takes a callback into a generator that returns an iterable. | ||||
| # This is done by running the function in a new thread. | ||||
|  | ||||
| # Based partially on http://stackoverflow.com/questions/9968592/ | ||||
|  | ||||
| class IteratorizerThread(threading.Thread): | ||||
|     def __init__(self, queue, function, curl_hack): | ||||
|         """ | ||||
|         function: function to execute, which takes the | ||||
|         callback (provided by this class) as an argument | ||||
|         """ | ||||
|         threading.Thread.__init__(self) | ||||
|         self.name = "Iteratorizer-" + function.__name__ + "-" + self.name | ||||
|         self.function = function | ||||
|         self.queue = queue | ||||
|         self.die = False | ||||
|         self.curl_hack = curl_hack | ||||
|  | ||||
|     def callback(self, data): | ||||
|         try: | ||||
|             if self.die: | ||||
|                 raise Exception() # trigger termination | ||||
|             self.queue.put((1, data)) | ||||
|         except: | ||||
|             if self.curl_hack: | ||||
|                 # We can't raise exceptions, because the pycurl | ||||
|                 # extension module will unconditionally print the | ||||
|                 # exception itself, and not pass it up to the caller. | ||||
|                 # Instead, just return a value that tells curl to | ||||
|                 # abort.  (-1 would be best, in case we were given 0 | ||||
|                 # bytes, but the extension doesn't support that). | ||||
|                 self.queue.put((2, sys.exc_info())) | ||||
|                 return 0 | ||||
|             raise | ||||
|  | ||||
|     def run(self): | ||||
|         try: | ||||
|             result = self.function(self.callback) | ||||
|         except: | ||||
|             self.queue.put((2, sys.exc_info())) | ||||
|         else: | ||||
|             self.queue.put((0, result)) | ||||
|  | ||||
| @contextlib.contextmanager | ||||
| def Iteratorizer(function, curl_hack = False): | ||||
|     """ | ||||
|     Context manager that takes a function expecting a callback, | ||||
|     and provides an iterable that yields the values passed to that | ||||
|     callback instead. | ||||
|  | ||||
|     function: function to execute, which takes a callback | ||||
|     (provided by this context manager) as an argument | ||||
|  | ||||
|         with iteratorizer(func) as it: | ||||
|             for i in it: | ||||
|                 print 'callback was passed:', i | ||||
|         print 'function returned:', it.retval | ||||
|     """ | ||||
|     queue = Queue.Queue(maxsize = 1) | ||||
|     thread = IteratorizerThread(queue, function, curl_hack) | ||||
|     thread.daemon = True | ||||
|     thread.start() | ||||
|  | ||||
|     class iteratorizer_gen(object): | ||||
|         def __init__(self, queue): | ||||
|             self.queue = queue | ||||
|             self.retval = None | ||||
|  | ||||
|         def __iter__(self): | ||||
|             return self | ||||
|  | ||||
|         def next(self): | ||||
|             (typ, data) = self.queue.get() | ||||
|             if typ == 0: | ||||
|                 # function has returned | ||||
|                 self.retval = data | ||||
|                 raise StopIteration | ||||
|             elif typ == 1: | ||||
|                 # data is available | ||||
|                 return data | ||||
|             else: | ||||
|                 # callback raised an exception | ||||
|                 raise data[0], data[1], data[2] | ||||
|  | ||||
|     try: | ||||
|         yield iteratorizer_gen(queue) | ||||
|     finally: | ||||
|         # Ask the thread to die, if it's still running. | ||||
|         thread.die = True | ||||
|         while thread.isAlive(): | ||||
|             try: | ||||
|                 queue.get(True, 0.01) | ||||
|             except: # pragma: no cover | ||||
|                 pass | ||||
							
								
								
									
										33
									
								
								nilmdb/utils/lock.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										33
									
								
								nilmdb/utils/lock.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,33 @@ | ||||
| # File locking | ||||
|  | ||||
| import warnings | ||||
|  | ||||
| try: | ||||
|     import fcntl | ||||
|     import errno | ||||
|  | ||||
|     def exclusive_lock(f): | ||||
|         """Acquire an exclusive lock.  Returns True on successful | ||||
|         lock, or False on error.""" | ||||
|         try: | ||||
|             fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) | ||||
|         except IOError as e: | ||||
|             if e.errno in (errno.EACCES, errno.EAGAIN): | ||||
|                 return False | ||||
|             else: # pragma: no cover | ||||
|                 raise | ||||
|         return True | ||||
|  | ||||
|     def exclusive_unlock(f): | ||||
|         """Release an exclusive lock.""" | ||||
|         fcntl.flock(f.fileno(), fcntl.LOCK_UN) | ||||
|  | ||||
| except ImportError: # pragma: no cover | ||||
|     def exclusive_lock(f): | ||||
|         """Dummy lock function -- does not lock!""" | ||||
|         warnings.warn("Pretending to lock " + str(f)) | ||||
|         return True | ||||
|  | ||||
|     def exclusive_unlock(f): | ||||
|         """Release an exclusive lock.""" | ||||
|         return | ||||
| @@ -15,7 +15,7 @@ def must_close(errorfile = sys.stderr, wrap_verify = False): | ||||
|         def wrap_class_method(wrapper): | ||||
|             try: | ||||
|                 orig = getattr(cls, wrapper.__name__).im_func | ||||
|             except: | ||||
|             except Exception: | ||||
|                 orig = lambda x: None | ||||
|             setattr(cls, wrapper.__name__, decorator.decorator(wrapper, orig)) | ||||
|  | ||||
|   | ||||
| @@ -6,7 +6,7 @@ import time | ||||
|  | ||||
| # Range | ||||
| min_timestamp = (-2**63) | ||||
| max_timestamp = (2**62 - 1) | ||||
| max_timestamp = (2**63 - 1) | ||||
|  | ||||
| # Smallest representable step | ||||
| epsilon = 1 | ||||
| @@ -32,6 +32,10 @@ def timestamp_to_human(timestamp): | ||||
|     """Convert a timestamp (integer microseconds since epoch) to a | ||||
|     human-readable string, using the local timezone for display | ||||
|     (e.g. from the TZ env var).""" | ||||
|     if timestamp == min_timestamp: | ||||
|         return "(minimum)" | ||||
|     if timestamp == max_timestamp: | ||||
|         return "(maximum)" | ||||
|     dt = datetime_tz.datetime_tz.fromtimestamp(timestamp_to_unix(timestamp)) | ||||
|     return dt.strftime("%a, %d %b %Y %H:%M:%S.%f %z") | ||||
|  | ||||
| @@ -65,6 +69,14 @@ def parse_time(toparse): | ||||
|     if toparse == "max": | ||||
|         return max_timestamp | ||||
|  | ||||
|     # If it starts with @, treat it as a NILM timestamp | ||||
|     # (integer microseconds since epoch) | ||||
|     try: | ||||
|         if toparse[0] == '@': | ||||
|             return int(toparse[1:]) | ||||
|     except (ValueError, KeyError, IndexError): | ||||
|         pass | ||||
|  | ||||
|     # If string isn't "now" and doesn't contain at least 4 digits, | ||||
|     # consider it invalid.  smartparse might otherwise accept | ||||
|     # empty strings and strings with just separators. | ||||
| @@ -78,14 +90,6 @@ def parse_time(toparse): | ||||
|     except (ValueError, OverflowError): | ||||
|         pass | ||||
|  | ||||
|     # If it starts with @, treat it as a NILM timestamp | ||||
|     # (integer microseconds since epoch) | ||||
|     try: | ||||
|         if toparse[0] == '@': | ||||
|             return int(toparse[1:]) | ||||
|     except (ValueError, KeyError): | ||||
|         pass | ||||
|  | ||||
|     # If it's parseable as a float, treat it as a Unix or NILM | ||||
|     # timestamp based on its range. | ||||
|     try: | ||||
|   | ||||
							
								
								
									
										4
									
								
								setup.py
									
									
									
									
									
								
							
							
						
						
									
										4
									
								
								setup.py
									
									
									
									
									
								
							| @@ -39,7 +39,7 @@ versioneer.parentdir_prefix = 'nilmdb-' | ||||
| # Hack to workaround logging/multiprocessing issue: | ||||
| # https://groups.google.com/d/msg/nose-users/fnJ-kAUbYHQ/_UsLN786ygcJ | ||||
| try: import multiprocessing | ||||
| except: pass | ||||
| except Exception: pass | ||||
|  | ||||
| # Use Cython if it's new enough, otherwise use preexisting C files. | ||||
| cython_modules = [ 'nilmdb.server.interval', | ||||
| @@ -107,13 +107,13 @@ setup(name='nilmdb', | ||||
|       author_email = 'jim@jtan.com', | ||||
|       tests_require = [ 'nose', | ||||
|                         'coverage', | ||||
|                         'numpy', | ||||
|                         ], | ||||
|       setup_requires = [ 'distribute', | ||||
|                          ], | ||||
|       install_requires = [ 'decorator', | ||||
|                            'cherrypy >= 3.2', | ||||
|                            'simplejson', | ||||
|                            'pycurl', | ||||
|                            'python-dateutil', | ||||
|                            'pytz', | ||||
|                            'psutil >= 0.3.0', | ||||
|   | ||||
							
								
								
									
										28
									
								
								tests/data/extract-8
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										28
									
								
								tests/data/extract-8
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,28 @@ | ||||
| # interval-start 1332496919900000 | ||||
| 1332496919900000 2.523050e+05 2.254020e+05 4.779410e+03 3.638030e+03 8.138070e+03 4.334460e+03 1.083780e+03 3.743730e+03 | ||||
| 1332496919908333 2.551190e+05 2.237870e+05 5.965640e+03 2.076350e+03 9.468790e+03 3.693880e+03 1.247860e+03 3.393680e+03 | ||||
| 1332496919916667 2.616370e+05 2.247980e+05 4.848970e+03 2.315620e+03 9.323300e+03 4.225460e+03 1.805780e+03 2.593050e+03 | ||||
| 1332496919925000 2.606460e+05 2.251300e+05 3.061360e+03 3.951840e+03 7.662910e+03 5.341410e+03 1.986520e+03 2.276780e+03 | ||||
| 1332496919933333 2.559710e+05 2.235030e+05 4.096030e+03 3.296970e+03 7.827080e+03 5.452120e+03 2.492520e+03 2.929450e+03 | ||||
| 1332496919941667 2.579260e+05 2.217080e+05 5.472320e+03 1.555700e+03 8.495760e+03 4.491140e+03 2.379780e+03 3.741710e+03 | ||||
| 1332496919950000 2.610180e+05 2.242350e+05 4.669770e+03 1.876190e+03 8.366680e+03 3.677510e+03 9.021690e+02 3.549040e+03 | ||||
| 1332496919958333 2.569150e+05 2.274650e+05 2.785070e+03 3.751930e+03 7.440320e+03 3.964860e+03 -3.227860e+02 2.460890e+03 | ||||
| 1332496919966667 2.509510e+05 2.262000e+05 3.772710e+03 3.131950e+03 8.159860e+03 4.539860e+03 7.375190e+02 2.126750e+03 | ||||
| 1332496919975000 2.556710e+05 2.223720e+05 5.826200e+03 8.715560e+02 9.120240e+03 4.545110e+03 2.804310e+03 2.721000e+03 | ||||
| 1332496919983333 2.649730e+05 2.214860e+05 5.839130e+03 4.659180e+02 8.628300e+03 3.934870e+03 2.972490e+03 3.773730e+03 | ||||
| 1332496919991667 2.652170e+05 2.233920e+05 3.718770e+03 2.834970e+03 7.209900e+03 3.460260e+03 1.324930e+03 4.075960e+03 | ||||
| # interval-end 1332496919991668 | ||||
| # interval-start 1332496920000000 | ||||
| 1332496920000000 2.564370e+05 2.244300e+05 4.011610e+03 3.475340e+03 7.495890e+03 3.388940e+03 2.613970e+02 3.731260e+03 | ||||
| 1332496920008333 2.539630e+05 2.241670e+05 5.621070e+03 1.548010e+03 9.165170e+03 3.522930e+03 1.058930e+03 2.996960e+03 | ||||
| 1332496920016667 2.585080e+05 2.249300e+05 6.011400e+03 8.188660e+02 9.039950e+03 4.482440e+03 2.490390e+03 2.679340e+03 | ||||
| 1332496920025000 2.596270e+05 2.260220e+05 4.474500e+03 2.423020e+03 7.414190e+03 5.071970e+03 2.439380e+03 2.962960e+03 | ||||
| 1332496920033333 2.551870e+05 2.246320e+05 4.738570e+03 3.398040e+03 7.395120e+03 4.726450e+03 1.839030e+03 3.393530e+03 | ||||
| 1332496920041667 2.571020e+05 2.216230e+05 6.144130e+03 1.441090e+03 8.756480e+03 3.495320e+03 1.869940e+03 3.752530e+03 | ||||
| 1332496920050000 2.636530e+05 2.217700e+05 6.221770e+03 7.389620e+02 9.547600e+03 2.666820e+03 1.462660e+03 3.332570e+03 | ||||
| 1332496920058333 2.636130e+05 2.252560e+05 4.477120e+03 2.437450e+03 8.510210e+03 3.855630e+03 9.594420e+02 2.387180e+03 | ||||
| 1332496920066667 2.553500e+05 2.262640e+05 4.283720e+03 3.923940e+03 7.912470e+03 5.466520e+03 1.284990e+03 2.093720e+03 | ||||
| 1332496920075000 2.527270e+05 2.246090e+05 5.851930e+03 2.491980e+03 8.540630e+03 5.623050e+03 2.339780e+03 3.007140e+03 | ||||
| 1332496920083333 2.584750e+05 2.235780e+05 5.924870e+03 1.394480e+03 8.779620e+03 4.544180e+03 2.132030e+03 3.849760e+03 | ||||
| 1332496920091667 2.615630e+05 2.246090e+05 4.336140e+03 2.455750e+03 8.055380e+03 3.469110e+03 6.278730e+02 3.664200e+03 | ||||
| # interval-end 1332496920100000 | ||||
| @@ -24,7 +24,7 @@ class JimOrderPlugin(nose.plugins.Plugin): | ||||
|                     name, workingDir=loader.workingDir) | ||||
|                 try: | ||||
|                     order = os.path.join(addr.filename, "test.order") | ||||
|                 except: | ||||
|                 except Exception: | ||||
|                     order = None | ||||
|                 if order and os.path.exists(order): | ||||
|                     files = [] | ||||
|   | ||||
| @@ -4,7 +4,6 @@ test_lrucache.py | ||||
| test_mustclose.py | ||||
|  | ||||
| test_serializer.py | ||||
| test_iteratorizer.py | ||||
|  | ||||
| test_timestamper.py | ||||
| test_rbtree.py | ||||
| @@ -13,6 +12,7 @@ test_interval.py | ||||
| test_bulkdata.py | ||||
| test_nilmdb.py | ||||
| test_client.py | ||||
| test_numpyclient.py | ||||
| test_cmdline.py | ||||
|  | ||||
| test_*.py | ||||
|   | ||||
| @@ -30,6 +30,11 @@ class TestBulkData(object): | ||||
|         else: | ||||
|             data = BulkData(db, file_size = size, files_per_dir = files) | ||||
|  | ||||
|         # Try opening it again (should result in locking error) | ||||
|         with assert_raises(IOError) as e: | ||||
|             data2 = BulkData(db) | ||||
|         in_("already locked by another process", str(e.exception)) | ||||
|  | ||||
|         # create empty | ||||
|         with assert_raises(ValueError): | ||||
|             data.create("/foo", "uint16_8") | ||||
| @@ -64,9 +69,9 @@ class TestBulkData(object): | ||||
|         raw = [] | ||||
|         for i in range(1000): | ||||
|             raw.append("%d 1 2 3 4 5 6 7 8\n" % (10000 + i)) | ||||
|         node.append_string("".join(raw[0:1]), 0, 50000) | ||||
|         node.append_string("".join(raw[1:100]), 0, 50000) | ||||
|         node.append_string("".join(raw[100:]), 0, 50000) | ||||
|         node.append_data("".join(raw[0:1]), 0, 50000) | ||||
|         node.append_data("".join(raw[1:100]), 0, 50000) | ||||
|         node.append_data("".join(raw[100:]), 0, 50000) | ||||
|  | ||||
|         misc_slices = [ 0, 100, slice(None), slice(0), slice(10), | ||||
|                         slice(5,10), slice(3,None), slice(3,-3), | ||||
| @@ -80,7 +85,7 @@ class TestBulkData(object): | ||||
|         # Extract misc slices while appending, to make sure the | ||||
|         # data isn't being added in the middle of the file | ||||
|         for s in [2, slice(1,5), 2, slice(1,5)]: | ||||
|             node.append_string("0 0 0 0 0 0 0 0 0\n", 0, 50000) | ||||
|             node.append_data("0 0 0 0 0 0 0 0 0\n", 0, 50000) | ||||
|             raw.append("0 0 0 0 0 0 0 0 0\n") | ||||
|             eq_(get_node_slice(s), raw[s]) | ||||
|  | ||||
|   | ||||
| @@ -23,6 +23,7 @@ import warnings | ||||
| import resource | ||||
| import time | ||||
| import re | ||||
| import struct | ||||
|  | ||||
| from testutil.helpers import * | ||||
|  | ||||
| @@ -238,6 +239,22 @@ class TestClient(object): | ||||
|         in_("400 Bad Request", str(e.exception)) | ||||
|         in_("start must precede end", str(e.exception)) | ||||
|  | ||||
|         # Good content type | ||||
|         with assert_raises(ClientError) as e: | ||||
|             client.http.put("stream/insert", "", | ||||
|                             { "path": "xxxx", "start": 0, "end": 1, | ||||
|                               "binary": 1 }, | ||||
|                             binary = True) | ||||
|         in_("No such stream", str(e.exception)) | ||||
|  | ||||
|         # Bad content type | ||||
|         with assert_raises(ClientError) as e: | ||||
|             client.http.put("stream/insert", "", | ||||
|                             { "path": "xxxx", "start": 0, "end": 1, | ||||
|                               "binary": 1 }, | ||||
|                             binary = False) | ||||
|         in_("Content type must be application/octet-stream", str(e.exception)) | ||||
|  | ||||
|         # Specify start/end (starts too late) | ||||
|         data = timestamper.TimestamperRate(testfile, start, 120) | ||||
|         with assert_raises(ClientError) as e: | ||||
| @@ -293,6 +310,23 @@ class TestClient(object): | ||||
|         # Test count | ||||
|         eq_(client.stream_count("/newton/prep"), 14400) | ||||
|  | ||||
|         # Test binary output | ||||
|         with assert_raises(ClientError) as e: | ||||
|             list(client.stream_extract("/newton/prep", | ||||
|                                        markup = True, binary = True)) | ||||
|         with assert_raises(ClientError) as e: | ||||
|             list(client.stream_extract("/newton/prep", | ||||
|                                        count = True, binary = True)) | ||||
|         data = "".join(client.stream_extract("/newton/prep", binary = True)) | ||||
|         # Quick check using struct | ||||
|         unpacker = struct.Struct("<qffffffff") | ||||
|         out = [] | ||||
|         for i in range(14400): | ||||
|             out.append(unpacker.unpack_from(data, i * unpacker.size)) | ||||
|         eq_(out[0], (1332511200000000, 266568.0, 224029.0, 5161.39990234375, | ||||
|                      2525.169921875, 8350.83984375, 3724.699951171875, | ||||
|                      1355.3399658203125, 2039.0)) | ||||
|  | ||||
|         client.close() | ||||
|  | ||||
|     def test_client_06_generators(self): | ||||
| @@ -311,11 +345,11 @@ class TestClient(object): | ||||
|  | ||||
|         # Trigger a curl error in generator | ||||
|         with assert_raises(ServerError) as e: | ||||
|             client.http.get_gen("http://nosuchurl/").next() | ||||
|             client.http.get_gen("http://nosuchurl.example.com./").next() | ||||
|  | ||||
|         # Trigger a curl error in generator | ||||
|         with assert_raises(ServerError) as e: | ||||
|             client.http.get_gen("http://nosuchurl/").next() | ||||
|             client.http.get_gen("http://nosuchurl.example.com./").next() | ||||
|  | ||||
|         # Check 404 for missing streams | ||||
|         for function in [ client.stream_intervals, client.stream_extract ]: | ||||
| @@ -365,6 +399,17 @@ class TestClient(object): | ||||
|             raise AssertionError("/stream/extract is not text/plain:\n" + | ||||
|                                  headers()) | ||||
|  | ||||
|         x = http.get("stream/extract", | ||||
|                             { "path": "/newton/prep", | ||||
|                               "start": "123", | ||||
|                               "end": "124", | ||||
|                               "binary": "1" }) | ||||
|         if "transfer-encoding: chunked" not in headers(): | ||||
|             warnings.warn("Non-chunked HTTP response for /stream/extract") | ||||
|         if "content-type: application/octet-stream" not in headers(): | ||||
|             raise AssertionError("/stream/extract is not binary:\n" + | ||||
|                                  headers()) | ||||
|  | ||||
|         client.close() | ||||
|  | ||||
|     def test_client_08_unicode(self): | ||||
| @@ -441,71 +486,75 @@ class TestClient(object): | ||||
|             # override _max_data to trigger frequent server updates | ||||
|             ctx._max_data = 15 | ||||
|  | ||||
|             ctx.insert("100 1\n") | ||||
|             ctx.insert("1000 1\n") | ||||
|  | ||||
|             ctx.insert("101 ") | ||||
|             ctx.insert("1\n102 1") | ||||
|             ctx.insert("1010 ") | ||||
|             ctx.insert("1\n1020 1") | ||||
|             ctx.insert("") | ||||
|             ctx.insert("\n103 1\n") | ||||
|             ctx.insert("\n1030 1\n") | ||||
|  | ||||
|             ctx.insert("104 1\n") | ||||
|             ctx.insert("1040 1\n") | ||||
|             ctx.insert("# hello\n") | ||||
|             ctx.insert("   # hello\n") | ||||
|             ctx.insert("  105 1\n") | ||||
|             ctx.insert("  1050 1\n") | ||||
|             ctx.finalize() | ||||
|  | ||||
|             ctx.insert("107 1\n") | ||||
|             ctx.update_end(108) | ||||
|             ctx.insert("1070 1\n") | ||||
|             ctx.update_end(1080) | ||||
|             ctx.finalize() | ||||
|             ctx.update_start(109) | ||||
|             ctx.insert("110 1\n") | ||||
|             ctx.insert("111 1\n") | ||||
|             ctx.insert("112 1\n") | ||||
|             ctx.insert("113 1\n") | ||||
|             ctx.insert("114 1\n") | ||||
|             ctx.update_end(116) | ||||
|             ctx.insert("115 1\n") | ||||
|             ctx.update_end(117) | ||||
|             ctx.insert("116 1\n") | ||||
|             ctx.update_end(118) | ||||
|             ctx.insert("117 1" + | ||||
|             ctx.update_start(1090) | ||||
|             ctx.insert("1100 1\n") | ||||
|             ctx.insert("1110 1\n") | ||||
|             ctx.send() | ||||
|             ctx.insert("1120 1\n") | ||||
|             ctx.insert("1130 1\n") | ||||
|             ctx.insert("1140 1\n") | ||||
|             ctx.update_end(1160) | ||||
|             ctx.insert("1150 1\n") | ||||
|             ctx.update_end(1170) | ||||
|             ctx.insert("1160 1\n") | ||||
|             ctx.update_end(1180) | ||||
|             ctx.insert("1170 1" + | ||||
|                        " # this is super long" * 100 + | ||||
|                        "\n") | ||||
|             ctx.finalize() | ||||
|             ctx.insert("# this is super long" * 100) | ||||
|  | ||||
|         with assert_raises(ClientError): | ||||
|             with client.stream_insert_context("/context/test", 100, 200) as ctx: | ||||
|                 ctx.insert("118 1\n") | ||||
|             with client.stream_insert_context("/context/test", | ||||
|                                               1000, 2000) as ctx: | ||||
|                 ctx.insert("1180 1\n") | ||||
|  | ||||
|         with assert_raises(ClientError): | ||||
|             with client.stream_insert_context("/context/test", 200, 300) as ctx: | ||||
|                 ctx.insert("118 1\n") | ||||
|             with client.stream_insert_context("/context/test", | ||||
|                                               2000, 3000) as ctx: | ||||
|                 ctx.insert("1180 1\n") | ||||
|  | ||||
|         with assert_raises(ClientError): | ||||
|             with client.stream_insert_context("/context/test") as ctx: | ||||
|                 ctx.insert("bogus data\n") | ||||
|  | ||||
|         with client.stream_insert_context("/context/test", 200, 300) as ctx: | ||||
|         with client.stream_insert_context("/context/test", 2000, 3000) as ctx: | ||||
|             # make sure our override wasn't permanent | ||||
|             ne_(ctx._max_data, 15) | ||||
|             ctx.insert("225 1\n") | ||||
|             ctx.insert("2250 1\n") | ||||
|             ctx.finalize() | ||||
|  | ||||
|         with assert_raises(ClientError): | ||||
|             with client.stream_insert_context("/context/test", 300, 400) as ctx: | ||||
|                 ctx.insert("301 1\n") | ||||
|                 ctx.insert("302 2\n") | ||||
|                 ctx.insert("303 3\n") | ||||
|                 ctx.insert("304 4\n") | ||||
|                 ctx.insert("304 4\n") # non-monotonic after a few lines | ||||
|             with client.stream_insert_context("/context/test", | ||||
|                                               3000, 4000) as ctx: | ||||
|                 ctx.insert("3010 1\n") | ||||
|                 ctx.insert("3020 2\n") | ||||
|                 ctx.insert("3030 3\n") | ||||
|                 ctx.insert("3040 4\n") | ||||
|                 ctx.insert("3040 4\n") # non-monotonic after a few lines | ||||
|                 ctx.finalize() | ||||
|  | ||||
|         eq_(list(client.stream_intervals("/context/test")), | ||||
|             [ [ 100, 106 ], | ||||
|               [ 107, 108 ], | ||||
|               [ 109, 118 ], | ||||
|               [ 200, 300 ] ]) | ||||
|             [ [ 1000, 1051 ], | ||||
|               [ 1070, 1080 ], | ||||
|               [ 1090, 1180 ], | ||||
|               [ 2000, 3000 ] ]) | ||||
|  | ||||
|         # destroy stream (try without removing data first) | ||||
|         with assert_raises(ClientError): | ||||
| @@ -619,7 +668,7 @@ class TestClient(object): | ||||
|                     poolmanager = c.http._last_response.connection.poolmanager | ||||
|                     pool = poolmanager.pools[('http','localhost',32180)] | ||||
|                     return (pool.num_connections, pool.num_requests) | ||||
|                 except: | ||||
|                 except Exception: | ||||
|                     raise SkipTest("can't get connection info") | ||||
|  | ||||
|             # First request makes a connection | ||||
|   | ||||
| @@ -369,6 +369,8 @@ class TestCmdline(object): | ||||
|         self.contain("No stream at path") | ||||
|         self.fail("metadata /newton/nosuchstream --set foo=bar") | ||||
|         self.contain("No stream at path") | ||||
|         self.fail("metadata /newton/nosuchstream --delete") | ||||
|         self.contain("No stream at path") | ||||
|  | ||||
|         self.ok("metadata /newton/prep") | ||||
|         self.match("description=The Data\nv_scale=1.234\n") | ||||
| @@ -394,6 +396,19 @@ class TestCmdline(object): | ||||
|         self.fail("metadata /newton/nosuchpath") | ||||
|         self.contain("No stream at path /newton/nosuchpath") | ||||
|  | ||||
|         self.ok("metadata /newton/prep --delete") | ||||
|         self.ok("metadata /newton/prep --get") | ||||
|         self.match("") | ||||
|         self.ok("metadata /newton/prep --set " | ||||
|                 "'description=The Data' " | ||||
|                 "v_scale=1.234") | ||||
|         self.ok("metadata /newton/prep --delete v_scale") | ||||
|         self.ok("metadata /newton/prep --get") | ||||
|         self.match("description=The Data\n") | ||||
|         self.ok("metadata /newton/prep --set description=") | ||||
|         self.ok("metadata /newton/prep --get") | ||||
|         self.match("") | ||||
|  | ||||
|     def test_06_insert(self): | ||||
|         self.ok("insert --help") | ||||
|  | ||||
| @@ -596,13 +611,20 @@ class TestCmdline(object): | ||||
|         test(6, "10:00:30", "10:00:31", extra="-b") | ||||
|         test(7, "10:00:30", "10:00:30.999", extra="-a -T") | ||||
|         test(7, "10:00:30", "10:00:30.999", extra="-a --timestamp-raw") | ||||
|         test(8, "10:01:59.9", "10:02:00.1", extra="--markup") | ||||
|         test(8, "10:01:59.9", "10:02:00.1", extra="-m") | ||||
|  | ||||
|         # all data put in by tests | ||||
|         self.ok("extract -a /newton/prep --start 2000-01-01 --end 2020-01-01") | ||||
|         self.ok("extract -a /newton/prep --start min --end max") | ||||
|         lines_(self.captured, 43204) | ||||
|         self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01") | ||||
|         self.match("43200\n") | ||||
|  | ||||
|         # markup for 3 intervals, plus extra markup lines whenever we had | ||||
|         # a "restart" from the nilmdb.stream_extract function | ||||
|         self.ok("extract -m /newton/prep --start 2000-01-01 --end 2020-01-01") | ||||
|         lines_(self.captured, 43210) | ||||
|  | ||||
|     def test_09_truncated(self): | ||||
|         # Test truncated responses by overriding the nilmdb max_results | ||||
|         server_stop() | ||||
| @@ -1031,10 +1053,12 @@ class TestCmdline(object): | ||||
|         self.contain("old and new paths are the same") | ||||
|         check_path("newton", "prep") | ||||
|         self.fail("rename /newton/prep /newton") | ||||
|         self.contain("subdirs of this path already exist") | ||||
|         self.contain("path must contain at least one folder") | ||||
|         self.fail("rename /newton/prep /newton/prep/") | ||||
|         self.contain("invalid path") | ||||
|         self.ok("rename /newton/prep /newton/foo") | ||||
|         self.ok("rename /newton/prep /newton/foo/1") | ||||
|         check_path("newton", "foo", "1") | ||||
|         self.ok("rename /newton/foo/1 /newton/foo") | ||||
|         check_path("newton", "foo") | ||||
|         self.ok("rename /newton/foo /totally/different/thing") | ||||
|         check_path("totally", "different", "thing") | ||||
|   | ||||
| @@ -385,7 +385,6 @@ class TestIntervalSpeed: | ||||
|     def test_interval_speed(self): | ||||
|         import yappi | ||||
|         import time | ||||
|         import testutil.aplotter as aplotter | ||||
|         import random | ||||
|         import math | ||||
|  | ||||
| @@ -406,6 +405,5 @@ class TestIntervalSpeed: | ||||
|                    speed/j, | ||||
|                    speed / (j*math.log(j))) # should be constant | ||||
|             speeds[j] = speed | ||||
|         aplotter.plot(speeds.keys(), speeds.values(), plot_slope=True) | ||||
|         yappi.stop() | ||||
|         yappi.print_stats(sort_type=yappi.SORTTYPE_TTOT, limit=10) | ||||
|   | ||||
| @@ -1,61 +0,0 @@ | ||||
| import nilmdb | ||||
| from nilmdb.utils.printf import * | ||||
|  | ||||
| import nose | ||||
| from nose.tools import * | ||||
| from nose.tools import assert_raises | ||||
| import threading | ||||
| import time | ||||
|  | ||||
| from testutil.helpers import * | ||||
|  | ||||
| def func_with_callback(a, b, callback): | ||||
|     callback(a) | ||||
|     callback(b) | ||||
|     callback(a+b) | ||||
|     return "return value" | ||||
|  | ||||
| class TestIteratorizer(object): | ||||
|     def test(self): | ||||
|  | ||||
|         # First try it with a normal callback | ||||
|         self.result = "" | ||||
|         def cb(x): | ||||
|             self.result += str(x) | ||||
|         func_with_callback(1, 2, cb) | ||||
|         eq_(self.result, "123") | ||||
|  | ||||
|         # Now make it an iterator | ||||
|         result = "" | ||||
|         f = lambda x: func_with_callback(1, 2, x) | ||||
|         with nilmdb.utils.Iteratorizer(f) as it: | ||||
|             for i in it: | ||||
|                 result += str(i) | ||||
|         eq_(result, "123") | ||||
|         eq_(it.retval, "return value") | ||||
|  | ||||
|         # Make sure things work when an exception occurs | ||||
|         result = "" | ||||
|         with nilmdb.utils.Iteratorizer( | ||||
|             lambda x: func_with_callback(1, "a", x)) as it: | ||||
|             with assert_raises(TypeError) as e: | ||||
|                 for i in it: | ||||
|                     result += str(i) | ||||
|         eq_(result, "1a") | ||||
|  | ||||
|         # Now try to trigger the case where we stop iterating | ||||
|         # mid-generator, and expect the iteratorizer to clean up after | ||||
|         # itself.  This doesn't have a particular result in the test, | ||||
|         # but gains coverage. | ||||
|         def foo(): | ||||
|             with nilmdb.utils.Iteratorizer(f) as it: | ||||
|                 it.next() | ||||
|         foo() | ||||
|         eq_(it.retval, None) | ||||
|  | ||||
|         # Do the same thing when the curl hack is applied | ||||
|         def foo(): | ||||
|             with nilmdb.utils.Iteratorizer(f, curl_hack = True) as it: | ||||
|                 it.next() | ||||
|         foo() | ||||
|         eq_(it.retval, None) | ||||
| @@ -28,9 +28,6 @@ class Test00Nilmdb(object):  # named 00 so it runs first | ||||
|     def test_NilmDB(self): | ||||
|         recursive_unlink(testdb) | ||||
|  | ||||
|         with assert_raises(IOError): | ||||
|             nilmdb.server.NilmDB("/nonexistant-db/foo") | ||||
|  | ||||
|         db = nilmdb.server.NilmDB(testdb) | ||||
|         db.close() | ||||
|         db = nilmdb.server.NilmDB(testdb) | ||||
| @@ -93,13 +90,16 @@ class Test00Nilmdb(object):  # named 00 so it runs first | ||||
|         eq_(db.stream_get_metadata("/newton/prep"), meta1) | ||||
|         eq_(db.stream_get_metadata("/newton/raw"), meta1) | ||||
|  | ||||
|         # fill in some test coverage for start >= end | ||||
|         # fill in some misc. test coverage | ||||
|         with assert_raises(nilmdb.server.NilmDBError): | ||||
|             db.stream_remove("/newton/prep", 0, 0) | ||||
|         with assert_raises(nilmdb.server.NilmDBError): | ||||
|             db.stream_remove("/newton/prep", 1, 0) | ||||
|         db.stream_remove("/newton/prep", 0, 1) | ||||
|  | ||||
|         with assert_raises(nilmdb.server.NilmDBError): | ||||
|             db.stream_extract("/newton/prep", count = True, binary = True) | ||||
|  | ||||
|         db.close() | ||||
|  | ||||
| class TestBlockingServer(object): | ||||
|   | ||||
							
								
								
									
										333
									
								
								tests/test_numpyclient.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										333
									
								
								tests/test_numpyclient.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,333 @@ | ||||
| # -*- coding: utf-8 -*- | ||||
|  | ||||
| import nilmdb.server | ||||
| import nilmdb.client | ||||
| import nilmdb.client.numpyclient | ||||
|  | ||||
| from nilmdb.utils.printf import * | ||||
| from nilmdb.utils import timestamper | ||||
| from nilmdb.client import ClientError, ServerError | ||||
| from nilmdb.utils import datetime_tz | ||||
|  | ||||
| from nose.plugins.skip import SkipTest | ||||
| from nose.tools import * | ||||
| from nose.tools import assert_raises | ||||
| import itertools | ||||
| import distutils.version | ||||
|  | ||||
| from testutil.helpers import * | ||||
|  | ||||
| import numpy as np | ||||
|  | ||||
| testdb = "tests/numpyclient-testdb" | ||||
| testurl = "http://localhost:32180/" | ||||
|  | ||||
| def setup_module(): | ||||
|     global test_server, test_db | ||||
|     # Clear out DB | ||||
|     recursive_unlink(testdb) | ||||
|  | ||||
|     # Start web app on a custom port | ||||
|     test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(testdb) | ||||
|     test_server = nilmdb.server.Server(test_db, host = "127.0.0.1", | ||||
|                                        port = 32180, stoppable = False, | ||||
|                                        fast_shutdown = True, | ||||
|                                        force_traceback = True) | ||||
|     test_server.start(blocking = False) | ||||
|  | ||||
| def teardown_module(): | ||||
|     global test_server, test_db | ||||
|     # Close web app | ||||
|     test_server.stop() | ||||
|     test_db.close() | ||||
|  | ||||
| class TestNumpyClient(object): | ||||
|  | ||||
|     def test_numpyclient_01_basic(self): | ||||
|         # Test basic connection | ||||
|         client = nilmdb.client.numpyclient.NumpyClient(url = testurl) | ||||
|         version = client.version() | ||||
|         eq_(distutils.version.LooseVersion(version), | ||||
|             distutils.version.LooseVersion(test_server.version)) | ||||
|  | ||||
|         # Verify subclassing | ||||
|         assert(isinstance(client, nilmdb.client.Client)) | ||||
|  | ||||
|         # Layouts | ||||
|         for layout in "int8_t", "something_8", "integer_1": | ||||
|             with assert_raises(ValueError): | ||||
|                 for x in client.stream_extract_numpy("/foo", layout=layout): | ||||
|                     pass | ||||
|         for layout in "int8_1", "uint8_30", "int16_20", "float64_100": | ||||
|             with assert_raises(ClientError) as e: | ||||
|                 for x in client.stream_extract_numpy("/foo", layout=layout): | ||||
|                     pass | ||||
|             in_("No such stream", str(e.exception)) | ||||
|  | ||||
|         with assert_raises(ClientError) as e: | ||||
|             for x in client.stream_extract_numpy("/foo"): | ||||
|                 pass | ||||
|         in_("can't get layout for path", str(e.exception)) | ||||
|  | ||||
|         client.close() | ||||
|  | ||||
|     def test_numpyclient_02_extract(self): | ||||
|         client = nilmdb.client.numpyclient.NumpyClient(url = testurl) | ||||
|  | ||||
|         # Insert some data as text | ||||
|         client.stream_create("/newton/prep", "float32_8") | ||||
|         testfile = "tests/data/prep-20120323T1000" | ||||
|         start = nilmdb.utils.time.parse_time("20120323T1000") | ||||
|         rate = 120 | ||||
|         data = timestamper.TimestamperRate(testfile, start, rate) | ||||
|         result = client.stream_insert("/newton/prep", data, | ||||
|                                       start, start + 119999777) | ||||
|  | ||||
|         # Extract Numpy arrays | ||||
|         array = None | ||||
|         pieces = 0 | ||||
|         for chunk in client.stream_extract_numpy("/newton/prep", maxrows=1000): | ||||
|             pieces += 1 | ||||
|             if array is not None: | ||||
|                 array = np.vstack((array, chunk)) | ||||
|             else: | ||||
|                 array = chunk | ||||
|         eq_(array.shape, (14400, 9)) | ||||
|         eq_(pieces, 15) | ||||
|  | ||||
|         # Try structured | ||||
|         s = list(client.stream_extract_numpy("/newton/prep", structured = True)) | ||||
|         assert(np.array_equal(np.c_[s[0]['timestamp'], s[0]['data']], array)) | ||||
|  | ||||
|         # Compare.  Will be close but not exact because the conversion | ||||
|         # to and from ASCII was lossy. | ||||
|         data = timestamper.TimestamperRate(testfile, start, rate) | ||||
|         actual = np.fromstring(" ".join(data), sep=' ').reshape(14400, 9) | ||||
|         assert(np.allclose(array, actual)) | ||||
|  | ||||
|         client.close() | ||||
|  | ||||
|     def test_numpyclient_03_insert(self): | ||||
|         client = nilmdb.client.numpyclient.NumpyClient(url = testurl) | ||||
|  | ||||
|         # Limit _max_data just to get better coverage | ||||
|         old_max_data = nilmdb.client.numpyclient.StreamInserterNumpy._max_data | ||||
|         nilmdb.client.numpyclient.StreamInserterNumpy._max_data = 100000 | ||||
|  | ||||
|         client.stream_create("/test/1", "uint16_1") | ||||
|         client.stream_insert_numpy("/test/1", | ||||
|                                    np.array([[0, 1], | ||||
|                                              [1, 2], | ||||
|                                              [2, 3], | ||||
|                                              [3, 4]])) | ||||
|  | ||||
|         # Wrong number of dimensions | ||||
|         with assert_raises(ValueError) as e: | ||||
|             client.stream_insert_numpy("/test/1", | ||||
|                                        np.array([[[0, 1], | ||||
|                                                   [1, 2]], | ||||
|                                                  [[3, 4], | ||||
|                                                   [4, 5]]])) | ||||
|         in_("wrong number of dimensions", str(e.exception)) | ||||
|  | ||||
|         # Unstructured | ||||
|         client.stream_create("/test/2", "float32_8") | ||||
|         client.stream_insert_numpy( | ||||
|             "/test/2", | ||||
|             client.stream_extract_numpy( | ||||
|                 "/newton/prep", structured = False, maxrows = 1000)) | ||||
|  | ||||
|         # Structured, and specifying layout | ||||
|         client.stream_create("/test/3", "float32_8") | ||||
|         client.stream_insert_numpy( | ||||
|             path = "/test/3", layout = "float32_8", | ||||
|             data = client.stream_extract_numpy( | ||||
|                 "/newton/prep", structured = True, maxrows = 1000)) | ||||
|  | ||||
|         # Structured, specifying wrong layout | ||||
|         client.stream_create("/test/4", "float32_8") | ||||
|         with assert_raises(ValueError) as e: | ||||
|             client.stream_insert_numpy( | ||||
|                 "/test/4", layout = "uint16_1", | ||||
|                 data = client.stream_extract_numpy( | ||||
|                     "/newton/prep", structured = True, maxrows = 1000)) | ||||
|         in_("wrong dtype", str(e.exception)) | ||||
|  | ||||
|         # Unstructured, and specifying wrong layout | ||||
|         client.stream_create("/test/5", "float32_8") | ||||
|         with assert_raises(ClientError) as e: | ||||
|             client.stream_insert_numpy( | ||||
|                 "/test/5", layout = "uint16_8", | ||||
|                 data = client.stream_extract_numpy( | ||||
|                     "/newton/prep", structured = False, maxrows = 1000)) | ||||
|         # timestamps will be screwy here, because data will be parsed wrong | ||||
|         in_("error parsing input data", str(e.exception)) | ||||
|  | ||||
|         # Make sure the /newton/prep copies are identical | ||||
|         a = np.vstack(client.stream_extract_numpy("/newton/prep")) | ||||
|         b = np.vstack(client.stream_extract_numpy("/test/2")) | ||||
|         c = np.vstack(client.stream_extract_numpy("/test/3")) | ||||
|         assert(np.array_equal(a,b)) | ||||
|         assert(np.array_equal(a,c)) | ||||
|  | ||||
|         nilmdb.client.numpyclient.StreamInserterNumpy._max_data = old_max_data | ||||
|         client.close() | ||||
|  | ||||
|     def test_numpyclient_04_context(self): | ||||
|         # Like test_client_context, but with Numpy data | ||||
|         client = nilmdb.client.numpyclient.NumpyClient(testurl) | ||||
|  | ||||
|         client.stream_create("/context/test", "uint16_1") | ||||
|         with client.stream_insert_numpy_context("/context/test") as ctx: | ||||
|             # override _max_rows to trigger frequent server updates | ||||
|             ctx._max_rows = 2 | ||||
|             ctx.insert([[1000, 1]]) | ||||
|             ctx.insert([[1010, 1], [1020, 1], [1030, 1]]) | ||||
|             ctx.insert([[1040, 1], [1050, 1]]) | ||||
|             ctx.finalize() | ||||
|             ctx.insert([[1070, 1]]) | ||||
|             ctx.update_end(1080) | ||||
|             ctx.finalize() | ||||
|             ctx.update_start(1090) | ||||
|             ctx.insert([[1100, 1]]) | ||||
|             ctx.insert([[1110, 1]]) | ||||
|             ctx.send() | ||||
|             ctx.insert([[1120, 1], [1130, 1], [1140, 1]]) | ||||
|             ctx.update_end(1160) | ||||
|             ctx.insert([[1150, 1]]) | ||||
|             ctx.update_end(1170) | ||||
|             ctx.insert([[1160, 1]]) | ||||
|             ctx.update_end(1180) | ||||
|             ctx.insert([[1170, 123456789.0]]) | ||||
|             ctx.finalize() | ||||
|             ctx.insert(np.zeros((0,2))) | ||||
|  | ||||
|         with assert_raises(ClientError): | ||||
|             with client.stream_insert_numpy_context("/context/test", | ||||
|                                                     1000, 2000) as ctx: | ||||
|                 ctx.insert([[1180, 1]]) | ||||
|  | ||||
|         with assert_raises(ClientError): | ||||
|             with client.stream_insert_numpy_context("/context/test", | ||||
|                                                     2000, 3000) as ctx: | ||||
|                 ctx._max_rows = 2 | ||||
|                 ctx.insert([[3180, 1]]) | ||||
|                 ctx.insert([[3181, 1]]) | ||||
|  | ||||
|         with client.stream_insert_numpy_context("/context/test", | ||||
|                                                 2000, 3000) as ctx: | ||||
|             # make sure our override wasn't permanent | ||||
|             ne_(ctx._max_rows, 2) | ||||
|             ctx.insert([[2250, 1]]) | ||||
|             ctx.finalize() | ||||
|  | ||||
|         with assert_raises(ClientError): | ||||
|             with client.stream_insert_numpy_context("/context/test", | ||||
|                                                     3000, 4000) as ctx: | ||||
|                 ctx.insert([[3010, 1]]) | ||||
|                 ctx.insert([[3020, 2]]) | ||||
|                 ctx.insert([[3030, 3]]) | ||||
|                 ctx.insert([[3040, 4]]) | ||||
|                 ctx.insert([[3040, 4]]) # non-monotonic after a few lines | ||||
|                 ctx.finalize() | ||||
|  | ||||
|         eq_(list(client.stream_intervals("/context/test")), | ||||
|             [ [ 1000, 1051 ], | ||||
|               [ 1070, 1080 ], | ||||
|               [ 1090, 1180 ], | ||||
|               [ 2000, 3000 ] ]) | ||||
|  | ||||
|         client.stream_remove("/context/test") | ||||
|         client.stream_destroy("/context/test") | ||||
|         client.close() | ||||
|  | ||||
|     def test_numpyclient_05_emptyintervals(self): | ||||
|         # Like test_client_emptyintervals, with insert_numpy_context | ||||
|         client = nilmdb.client.numpyclient.NumpyClient(testurl) | ||||
|         client.stream_create("/empty/test", "uint16_1") | ||||
|         def info(): | ||||
|             result = [] | ||||
|             for interval in list(client.stream_intervals("/empty/test")): | ||||
|                 result.append((client.stream_count("/empty/test", *interval), | ||||
|                                interval)) | ||||
|             return result | ||||
|         eq_(info(), []) | ||||
|  | ||||
|         # Insert a region with just a few points | ||||
|         with client.stream_insert_numpy_context("/empty/test") as ctx: | ||||
|             ctx.update_start(100) | ||||
|             ctx.insert([[140, 1]]) | ||||
|             ctx.insert([[150, 1]]) | ||||
|             ctx.insert([[160, 1]]) | ||||
|             ctx.update_end(200) | ||||
|             ctx.finalize() | ||||
|         eq_(info(), [(3, [100, 200])]) | ||||
|  | ||||
|         # Delete chunk, which will leave one data point and two intervals | ||||
|         client.stream_remove("/empty/test", 145, 175) | ||||
|         eq_(info(), [(1, [100, 145]), | ||||
|                      (0, [175, 200])]) | ||||
|  | ||||
|         # Try also creating a completely empty interval from scratch, | ||||
|         # in a few different ways. | ||||
|         client.stream_insert("/empty/test", "", 300, 350) | ||||
|         client.stream_insert("/empty/test", [], 400, 450) | ||||
|         with client.stream_insert_numpy_context("/empty/test", 500, 550): | ||||
|             pass | ||||
|  | ||||
|         # If enough timestamps aren't provided, empty streams won't be created. | ||||
|         client.stream_insert("/empty/test", []) | ||||
|         with client.stream_insert_numpy_context("/empty/test"): | ||||
|             pass | ||||
|         client.stream_insert("/empty/test", [], start = 600) | ||||
|         with client.stream_insert_numpy_context("/empty/test", start = 700): | ||||
|             pass | ||||
|         client.stream_insert("/empty/test", [], end = 850) | ||||
|         with client.stream_insert_numpy_context("/empty/test", end = 950): | ||||
|             pass | ||||
|  | ||||
|         # Try various things that might cause problems | ||||
|         with client.stream_insert_numpy_context("/empty/test", 1000, 1050): | ||||
|             ctx.finalize() # inserts [1000, 1050] | ||||
|             ctx.finalize() # nothing | ||||
|             ctx.finalize() # nothing | ||||
|             ctx.insert([[1100, 1]]) | ||||
|             ctx.finalize() # inserts [1100, 1101] | ||||
|             ctx.update_start(1199) | ||||
|             ctx.insert([[1200, 1]]) | ||||
|             ctx.update_end(1250) | ||||
|             ctx.finalize() # inserts [1199, 1250] | ||||
|             ctx.update_start(1299) | ||||
|             ctx.finalize() # nothing | ||||
|             ctx.update_end(1350) | ||||
|             ctx.finalize() # nothing | ||||
|             ctx.update_start(1400) | ||||
|             ctx.insert(np.zeros((0,2))) | ||||
|             ctx.update_end(1450) | ||||
|             ctx.finalize() | ||||
|             ctx.update_start(1500) | ||||
|             ctx.insert(np.zeros((0,2))) | ||||
|             ctx.update_end(1550) | ||||
|             ctx.finalize() | ||||
|             ctx.insert(np.zeros((0,2))) | ||||
|             ctx.insert(np.zeros((0,2))) | ||||
|             ctx.insert(np.zeros((0,2))) | ||||
|             ctx.finalize() | ||||
|  | ||||
|         # Check everything | ||||
|         eq_(info(), [(1, [100, 145]), | ||||
|                      (0, [175, 200]), | ||||
|                      (0, [300, 350]), | ||||
|                      (0, [400, 450]), | ||||
|                      (0, [500, 550]), | ||||
|                      (0, [1000, 1050]), | ||||
|                      (1, [1100, 1101]), | ||||
|                      (1, [1199, 1250]), | ||||
|                      (0, [1400, 1450]), | ||||
|                      (0, [1500, 1550]), | ||||
|                      ]) | ||||
|  | ||||
|         # Clean up | ||||
|         client.stream_remove("/empty/test") | ||||
|         client.stream_destroy("/empty/test") | ||||
|         client.close() | ||||
| @@ -18,7 +18,7 @@ class TestPrintf(object): | ||||
|             printf("hello, world: %d", 123) | ||||
|             fprintf(test2, "hello too: %d", 123) | ||||
|             test3 = sprintf("hello three: %d", 123) | ||||
|         except: | ||||
|         except Exception: | ||||
|             sys.stdout = old_stdout | ||||
|             raise | ||||
|         sys.stdout = old_stdout | ||||
|   | ||||
| @@ -1,419 +0,0 @@ | ||||
|  | ||||
| #----------------------------------------------- | ||||
| #aplotter.py - ascii art function plotter | ||||
| #Copyright (c) 2006, Imri Goldberg | ||||
| #All rights reserved. | ||||
| # | ||||
| #Redistribution and use in source and binary forms, | ||||
| #with or without modification, are permitted provided | ||||
| #that the following conditions are met: | ||||
| # | ||||
| #    * Redistributions of source code must retain the | ||||
| #		above copyright notice, this list of conditions | ||||
| #		and the following disclaimer. | ||||
| #    * Redistributions in binary form must reproduce the | ||||
| #		above copyright notice, this list of conditions | ||||
| #		and the following disclaimer in the documentation | ||||
| #		and/or other materials provided with the distribution. | ||||
| #    * Neither the name of the <ORGANIZATION> nor the names of | ||||
| #		its contributors may be used to endorse or promote products | ||||
| #		derived from this software without specific prior written permission. | ||||
| # | ||||
| #THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" | ||||
| #AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | ||||
| #IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE | ||||
| #ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE | ||||
| #LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL | ||||
| #DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR | ||||
| #SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER | ||||
| #CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, | ||||
| #OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE | ||||
| #OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | ||||
| #----------------------------------------------- | ||||
|  | ||||
| import math | ||||
|  | ||||
|  | ||||
| EPSILON = 0.000001 | ||||
|  | ||||
| def transposed(mat): | ||||
| 	result = [] | ||||
| 	for i in xrange(len(mat[0])): | ||||
| 		result.append([x[i] for x in mat]) | ||||
| 	return result | ||||
|  | ||||
| def y_reversed(mat):     | ||||
| 	result = [] | ||||
| 	for i in range(len(mat)): | ||||
| 		result.append(list(reversed(mat[i]))) | ||||
| 	return result | ||||
|  | ||||
| def sign(x): | ||||
| 	if 0<x: | ||||
| 		return 1 | ||||
| 	if 0 == x: | ||||
| 		return 0 | ||||
| 	return -1 | ||||
|  | ||||
| class Plotter(object): | ||||
|  | ||||
| 	class PlotData(object): | ||||
| 		def __init__(self, x_size, y_size, min_x, max_x, min_y, max_y, x_mod, y_mod): | ||||
| 			self.x_size = x_size | ||||
| 			self.y_size = y_size | ||||
| 			self.min_x = min_x | ||||
| 			self.max_x = max_x | ||||
| 			self.min_y = min_y | ||||
| 			self.max_y = max_y | ||||
| 			self.x_mod = x_mod | ||||
| 			self.y_mod = y_mod | ||||
|  | ||||
| 			self.x_step = float(max_x - min_x)/float(self.x_size) | ||||
| 			self.y_step = float(max_y - min_y)/float(self.y_size) | ||||
| 			self.inv_x_step = 1/self.x_step | ||||
| 			self.inv_y_step = 1/self.y_step | ||||
|  | ||||
| 			self.ratio = self.y_step / self.x_step | ||||
| 		def __repr__(self): | ||||
| 			s = "size: %s, bl: %s, tr: %s, step: %s" % ((self.x_size, self.y_size), (self.min_x, self.min_y), (self.max_x, self.max_y), | ||||
| 														 (self.x_step, self.y_step)) | ||||
| 			return s | ||||
| 	 | ||||
| 	def __init__(self, **kwargs): | ||||
|  | ||||
| 		self.x_size = kwargs.get("x_size", 80) | ||||
| 		self.y_size = kwargs.get("y_size", 20) | ||||
|  | ||||
| 		self.will_draw_axes = kwargs.get("draw_axes", True) | ||||
|  | ||||
| 		self.new_line = kwargs.get("newline", "\n") | ||||
|  | ||||
| 		self.dot = kwargs.get("dot", "*") | ||||
|  | ||||
| 		self.plot_slope = kwargs.get("plot_slope", True) | ||||
|  | ||||
| 		self.x_margin = kwargs.get("x_margin", 0.05) | ||||
| 		self.y_margin = kwargs.get("y_margin", 0.1) | ||||
|  | ||||
| 		self.will_plot_labels = kwargs.get("plot_labels", True) | ||||
|  | ||||
| 	@staticmethod | ||||
| 	def get_symbol_by_slope(slope, default_symbol): | ||||
| 		draw_symbol = default_symbol | ||||
| 		if slope > math.tan(3*math.pi/8): | ||||
| 			draw_symbol = "|" | ||||
| 		elif slope > math.tan(math.pi/8) and slope < math.tan(3*math.pi/8): | ||||
| 			draw_symbol = "/" | ||||
| 		elif abs(slope) < math.tan(math.pi/8): | ||||
| 			draw_symbol = "-" | ||||
| 		elif slope < math.tan(-math.pi/8) and slope > math.tan(-3*math.pi/8): | ||||
| 			draw_symbol = "\\" | ||||
| 		elif slope < math.tan(-3*math.pi/8): | ||||
| 			draw_symbol = "|" | ||||
| 		return draw_symbol     | ||||
|  | ||||
|  | ||||
| 	def plot_labels(self, output_buffer, plot_data): | ||||
| 		if plot_data.y_size < 2: | ||||
| 			return | ||||
|  | ||||
| 		margin_factor = 1 | ||||
|  | ||||
| 		do_plot_x_label = True | ||||
| 		do_plot_y_label = True | ||||
|  | ||||
| 		x_str = "%+g" | ||||
| 		if plot_data.x_size < 16: | ||||
| 			do_plot_x_label = False | ||||
| 		elif plot_data.x_size < 23: | ||||
| 			x_str = "%+.2g"  | ||||
|  | ||||
| 		y_str = "%+g"     | ||||
| 		if plot_data.x_size < 8: | ||||
| 			do_plot_y_label = False | ||||
| 		elif plot_data.x_size < 11: | ||||
| 			y_str = "%+.2g" | ||||
| 			 | ||||
| 		act_min_x = (plot_data.min_x + plot_data.x_mod*margin_factor) | ||||
| 		act_max_x = (plot_data.max_x - plot_data.x_mod*margin_factor) | ||||
| 		act_min_y = (plot_data.min_y + plot_data.y_mod*margin_factor) | ||||
| 		act_max_y = (plot_data.max_y - plot_data.y_mod*margin_factor) | ||||
|  | ||||
| 		if abs(act_min_x) < 1: | ||||
| 			min_x_str = "%+.2g" % act_min_x | ||||
| 		else: | ||||
| 			min_x_str = x_str % act_min_x | ||||
|  | ||||
| 		if abs(act_max_x) < 1: | ||||
| 			max_x_str = "%+.2g" % act_max_x | ||||
| 		else: | ||||
| 			max_x_str = x_str % act_max_x | ||||
| 		 | ||||
| 		if abs(act_min_y) < 1: | ||||
| 			min_y_str = "%+.2g" % act_min_y | ||||
| 		else: | ||||
| 			min_y_str = y_str % act_min_y | ||||
|  | ||||
| 		if abs(act_max_y) < 1: | ||||
| 			max_y_str = "%+.2g" % act_max_y | ||||
| 		else: | ||||
| 			max_y_str = y_str % act_max_y | ||||
| 						  | ||||
| 		min_x_coord = self.get_coord(act_min_x,plot_data.min_x,plot_data.x_step) | ||||
| 		max_x_coord = self.get_coord(act_max_x,plot_data.min_x,plot_data.x_step) | ||||
| 		min_y_coord = self.get_coord(act_min_y,plot_data.min_y,plot_data.y_step) | ||||
| 		max_y_coord = self.get_coord(act_max_y,plot_data.min_y,plot_data.y_step) | ||||
| 								  | ||||
|  | ||||
| 		#print plot_data | ||||
| 		 | ||||
| 		y_zero_coord = self.get_coord(0, plot_data.min_y, plot_data.y_step) | ||||
|  | ||||
| 		#if plot_data.min_x < 0 and plot_data.max_x > 0: | ||||
| 		x_zero_coord = self.get_coord(0, plot_data.min_x, plot_data.x_step) | ||||
| 		#else: | ||||
| 		 | ||||
| 		#pass | ||||
|  | ||||
| 		output_buffer[x_zero_coord][min_y_coord] = "+" | ||||
| 		output_buffer[x_zero_coord][max_y_coord] = "+" | ||||
| 		output_buffer[min_x_coord][y_zero_coord] = "+" | ||||
| 		output_buffer[max_x_coord][y_zero_coord] = "+" | ||||
|  | ||||
| 		if do_plot_x_label:         | ||||
|  | ||||
| 			for i,c in enumerate(min_x_str): | ||||
| 				output_buffer[min_x_coord+i][y_zero_coord-1] = c | ||||
| 			for i,c in enumerate(max_x_str): | ||||
| 				output_buffer[max_x_coord+i-len(max_x_str)][y_zero_coord-1] = c | ||||
|  | ||||
| 		if do_plot_y_label: | ||||
|  | ||||
| 			for i,c in enumerate(max_y_str): | ||||
| 				output_buffer[x_zero_coord+i][max_y_coord] = c | ||||
| 			for i,c in enumerate(min_y_str): | ||||
| 				output_buffer[x_zero_coord+i][min_y_coord] = c | ||||
| 			 | ||||
|  | ||||
| 		 | ||||
| 		 | ||||
| 	 | ||||
| 	def plot_data(self, xy_seq, output_buffer, plot_data): | ||||
| 		if self.plot_slope: | ||||
| 			xy_seq = list(xy_seq) | ||||
| 			#sort according to the x coord | ||||
| 			xy_seq.sort(key = lambda c: c[0]) | ||||
| 			prev_p = xy_seq[0] | ||||
| 			e_xy_seq = enumerate(xy_seq) | ||||
| 			e_xy_seq.next() | ||||
| 			for i,(x,y) in e_xy_seq: | ||||
| 				draw_symbol = self.dot | ||||
| 				line_drawn = self.plot_line(prev_p, (x,y), output_buffer, plot_data) | ||||
| 				prev_p = (x,y) | ||||
| 				if not line_drawn: | ||||
| 					if i > 0 and i < len(xy_seq)-1: | ||||
| 						px,py = xy_seq[i-1] | ||||
| 						nx,ny = xy_seq[i+1] | ||||
|  | ||||
| 						if abs(nx-px) > EPSILON: | ||||
| 							slope = (1.0/plot_data.ratio)*(ny-py)/(nx-px) | ||||
| 							draw_symbol = self.get_symbol_by_slope(slope, draw_symbol) | ||||
| 					if x < plot_data.min_x or x >= plot_data.max_x or y < plot_data.min_y or y >= plot_data.max_y: | ||||
| 						continue | ||||
| 					 | ||||
| 					x_coord = self.get_coord(x, plot_data.min_x, plot_data.x_step) | ||||
| 					y_coord = self.get_coord(y, plot_data.min_y, plot_data.y_step)             | ||||
| 					if x_coord >= 0 and x_coord < len(output_buffer) and y_coord >= 0 and y_coord < len(output_buffer[0]): | ||||
| 						if self.draw_axes: | ||||
| 							if y_coord == self.get_coord(0, plot_data.min_y, plot_data.y_step) and draw_symbol == "-": | ||||
| 								draw_symbol = "=" | ||||
| 						output_buffer[x_coord][y_coord] = draw_symbol | ||||
| 		else: | ||||
| 			for x,y in xy_seq: | ||||
| 				if x < plot_data.min_x or x >= plot_data.max_x or y < plot_data.min_y or y >= plot_data.max_y: | ||||
| 					continue | ||||
| 				x_coord = self.get_coord(x, plot_data.min_x, plot_data.x_step) | ||||
| 				y_coord = self.get_coord(y, plot_data.min_y, plot_data.y_step) | ||||
| 				if x_coord >= 0 and x_coord < len(output_buffer) and y_coord > 0 and y_coord < len(output_buffer[0]): | ||||
| 					output_buffer[x_coord][y_coord] = self.dot | ||||
|  | ||||
|  | ||||
| 	def plot_line(self, start, end, output_buffer, plot_data): | ||||
|  | ||||
| 		start_coord = self.get_coord(start[0], plot_data.min_x, plot_data.x_step), self.get_coord(start[1], plot_data.min_y, plot_data.y_step) | ||||
| 		end_coord = self.get_coord(end[0], plot_data.min_x, plot_data.x_step), self.get_coord(end[1], plot_data.min_y, plot_data.y_step) | ||||
|  | ||||
| 		x0,y0 = start_coord | ||||
| 		x1,y1 = end_coord | ||||
| 		if (x0,y0) == (x1,y1): | ||||
| 			return True     | ||||
| 		 | ||||
| 		clipped_line = clip_line(start, end, (plot_data.min_x, plot_data.min_y), (plot_data.max_x, plot_data.max_y)) | ||||
| 		if clipped_line != None: | ||||
| 			start,end = clipped_line | ||||
| 		else: | ||||
| 			return False | ||||
| 		start_coord = self.get_coord(start[0], plot_data.min_x, plot_data.x_step), self.get_coord(start[1], plot_data.min_y, plot_data.y_step) | ||||
| 		end_coord = self.get_coord(end[0], plot_data.min_x, plot_data.x_step), self.get_coord(end[1], plot_data.min_y, plot_data.y_step) | ||||
|  | ||||
| 		x0,y0 = start_coord | ||||
| 		x1,y1 = end_coord | ||||
| 		if (x0,y0) == (x1,y1): | ||||
| 			return True | ||||
| 		x_zero_coord = self.get_coord(0, plot_data.min_x, plot_data.x_step) | ||||
| 		y_zero_coord = self.get_coord(0, plot_data.min_y, plot_data.y_step)    | ||||
|  | ||||
| 		if start[0]-end[0] == 0: | ||||
| 			draw_symbol = "|" | ||||
| 		else: | ||||
| 			slope = (1.0/plot_data.ratio)*(end[1]-start[1])/(end[0]-start[0]) | ||||
| 			draw_symbol = self.get_symbol_by_slope(slope, self.dot) | ||||
| 		try: | ||||
|  | ||||
| 			delta = x1-x0, y1-y0 | ||||
| 			if abs(delta[0])>abs(delta[1]): | ||||
| 				s = sign(delta[0]) | ||||
| 				slope = float(delta[1])/delta[0] | ||||
| 				for i in range(0,abs(int(delta[0]))): | ||||
| 					cur_draw_symbol = draw_symbol | ||||
| 					x = i*s | ||||
| 					cur_y = int(y0+slope*x) | ||||
| 					if self.draw_axes and cur_y == y_zero_coord and draw_symbol == "-": | ||||
| 						cur_draw_symbol = "=" | ||||
| 					output_buffer[x0+x][cur_y] = cur_draw_symbol | ||||
| 				 | ||||
| 				 | ||||
| 			else: | ||||
| 				s = sign(delta[1]) | ||||
| 				slope = float(delta[0])/delta[1] | ||||
| 				for i in range(0,abs(int(delta[1]))): | ||||
| 					y = i*s | ||||
| 					cur_draw_symbol = draw_symbol | ||||
| 					cur_y = y0+y | ||||
| 					if self.draw_axes and cur_y == y_zero_coord and draw_symbol == "-": | ||||
| 						cur_draw_symbol = "=" | ||||
| 					output_buffer[int(x0+slope*y)][cur_y] = cur_draw_symbol | ||||
| 		except: | ||||
| 			print start, end | ||||
| 			print start_coord, end_coord | ||||
| 			print plot_data | ||||
| 			raise | ||||
|  | ||||
| 		return False             | ||||
| 		 | ||||
| 		 | ||||
| 	def plot_single(self, seq, min_x = None, max_x = None, min_y = None, max_y = None): | ||||
| 		return self.plot_double(range(len(seq)),seq, min_x, max_x, min_y, max_y) | ||||
| 		 | ||||
|  | ||||
|  | ||||
|  | ||||
| 	def plot_double(self, x_seq, y_seq, min_x = None, max_x = None, min_y = None, max_y = None): | ||||
| 		if min_x == None: | ||||
| 			min_x = min(x_seq) | ||||
| 		if max_x == None: | ||||
| 			max_x = max(x_seq) | ||||
| 		if min_y == None: | ||||
| 			min_y = min(y_seq) | ||||
| 		if max_y == None: | ||||
| 			max_y = max(y_seq) | ||||
|  | ||||
| 		if max_y == min_y: | ||||
| 			max_y += 1 | ||||
|  | ||||
| 		x_mod = (max_x-min_x)*self.x_margin | ||||
| 		y_mod = (max_y-min_y)*self.y_margin | ||||
| 		min_x-=x_mod | ||||
| 		max_x+=x_mod | ||||
| 		min_y-=y_mod | ||||
| 		max_y+=y_mod | ||||
|  | ||||
|  | ||||
| 		plot_data = self.PlotData(self.x_size, self.y_size, min_x, max_x, min_y, max_y, x_mod, y_mod) | ||||
|  | ||||
| 		output_buffer = [[" "]*self.y_size for i in range(self.x_size)] | ||||
|  | ||||
| 		if self.will_draw_axes: | ||||
| 			self.draw_axes(output_buffer, plot_data) | ||||
|  | ||||
| 		self.plot_data(zip(x_seq, y_seq), output_buffer, plot_data) | ||||
|  | ||||
| 		if self.will_plot_labels: | ||||
| 			self.plot_labels(output_buffer, plot_data) | ||||
|  | ||||
| 		trans_result = transposed(y_reversed(output_buffer)) | ||||
|  | ||||
| 		result = self.new_line.join(["".join(row) for row in trans_result]) | ||||
| 		return result | ||||
|  | ||||
| 	def draw_axes(self, output_buffer, plot_data): | ||||
| 		 | ||||
| 		 | ||||
| 		draw_x = False | ||||
| 		draw_y = False | ||||
|  | ||||
| 		if plot_data.min_x <= 0 and plot_data.max_x > 0: | ||||
| 			draw_y = True | ||||
| 			zero_x = self.get_coord(0, plot_data.min_x, plot_data.x_step) | ||||
| 			for y in xrange(plot_data.y_size): | ||||
| 				output_buffer[zero_x][y] = "|" | ||||
| 				 | ||||
| 		if plot_data.min_y <= 0 and plot_data.max_y > 0: | ||||
| 			draw_x = True | ||||
| 			zero_y = self.get_coord(0, plot_data.min_y, plot_data.y_step)     | ||||
| 			for x in xrange(plot_data.x_size): | ||||
| 				output_buffer[x][zero_y] = "-" | ||||
|  | ||||
| 		if draw_x and draw_y: | ||||
| 			output_buffer[zero_x][zero_y] = "+" | ||||
| 		 | ||||
| 		 | ||||
| 	@staticmethod | ||||
| 	def get_coord(val, min, step): | ||||
| 		result = int((val - min)/step) | ||||
| 		return result | ||||
|  | ||||
| def clip_line(line_pt_1, line_pt_2, rect_bottom_left, rect_top_right): | ||||
| 	ts = [0.0,1.0] | ||||
| 	if line_pt_1[0] == line_pt_2[0]: | ||||
| 		return ((line_pt_1[0], max(min(line_pt_1[1], line_pt_2[1]), rect_bottom_left[1])), | ||||
| 				(line_pt_1[0], min(max(line_pt_1[1], line_pt_2[1]), rect_top_right[1]))) | ||||
| 	if line_pt_1[1] == line_pt_2[1]: | ||||
| 		return ((max(min(line_pt_1[0], line_pt_2[0]), rect_bottom_left[0]), line_pt_1[1]), | ||||
| 				(min(max(line_pt_1[0], line_pt_2[0]), rect_top_right[0]), line_pt_1[1])) | ||||
|  | ||||
| 	if ((rect_bottom_left[0] <= line_pt_1[0] and line_pt_1[0] < rect_top_right[0]) and | ||||
| 		(rect_bottom_left[1] <= line_pt_1[1] and line_pt_1[1] < rect_top_right[1]) and | ||||
| 		(rect_bottom_left[0] <= line_pt_2[0] and line_pt_2[0] < rect_top_right[0]) and | ||||
| 		(rect_bottom_left[1] <= line_pt_2[1] and line_pt_2[1] < rect_top_right[1])): | ||||
| 		return line_pt_1, line_pt_2 | ||||
|  | ||||
| 	ts.append( float(rect_bottom_left[0]-line_pt_1[0])/(line_pt_2[0]-line_pt_1[0]) ) | ||||
| 	ts.append( float(rect_top_right[0]-line_pt_1[0])/(line_pt_2[0]-line_pt_1[0]) ) | ||||
| 	ts.append( float(rect_bottom_left[1]-line_pt_1[1])/(line_pt_2[1]-line_pt_1[1]) ) | ||||
| 	ts.append( float(rect_top_right[1]-line_pt_1[1])/(line_pt_2[1]-line_pt_1[1]) ) | ||||
| 	 | ||||
| 	ts.sort() | ||||
| 	if ts[2] < 0 or ts[2] >= 1 or ts[3] < 0 or ts[2]>= 1: | ||||
| 		return None | ||||
| 	result = [(pt_1 + t*(pt_2-pt_1)) for t in (ts[2],ts[3]) for (pt_1, pt_2) in zip(line_pt_1, line_pt_2)] | ||||
| 	return (result[0],result[1]), (result[2], result[3]) | ||||
| 	 | ||||
|  | ||||
|  | ||||
| def plot(*args,**flags): | ||||
| 	limit_flags_names = set(["min_x","min_y","max_x","max_y"]) | ||||
| 	limit_flags = dict([(n,flags[n]) for n in limit_flags_names & set(flags)]) | ||||
| 	settting_flags = dict([(n,flags[n]) for n in set(flags) - limit_flags_names]) | ||||
| 	 | ||||
| 	if len(args) == 1: | ||||
| 		p = Plotter(**settting_flags) | ||||
| 		print p.plot_single(args[0],**limit_flags) | ||||
| 	elif len(args) == 2: | ||||
| 		p = Plotter(**settting_flags) | ||||
| 		print p.plot_double(args[0],args[1],**limit_flags) | ||||
| 	else: | ||||
| 		raise NotImplementedError("can't draw multiple graphs yet") | ||||
| 	 | ||||
| __all__ = ["Plotter","plot"] | ||||
|  | ||||
		Reference in New Issue
	
	Block a user