Compare commits
	
		
			11 Commits
		
	
	
		
			nilmtools-
			...
			nilmtools-
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| | 5b67b68fd2 | | |
| | 97503b73b9 | | |
| | 4e64c804bf | | |
| | 189fb9df3a | | |
| | 3323c997a7 | | |
| | e09153e34b | | |
| | 5c56e9d075 | | |
| | 60f09427cf | | |
| | d6d31190eb | | |
| | 2ec574c59d | | |
| | 1988955671 | | |
							
								
								
									
										3
									
								
								Makefile
									
									
									
									
									
								
							
							
						
						
									
										3
									
								
								Makefile
									
									
									
									
									
								
							| @@ -9,6 +9,9 @@ else | |||||||
| endif | endif | ||||||
|  |  | ||||||
| test: | test: | ||||||
|  | 	src/decimate.py | ||||||
|  |  | ||||||
|  | test_insert: | ||||||
| 	@make install >/dev/null | 	@make install >/dev/null | ||||||
| 	src/insert.py --file --dry-run  /test/foo </dev/null | 	src/insert.py --file --dry-run  /test/foo </dev/null | ||||||
|  |  | ||||||
|   | |||||||
| @@ -8,7 +8,7 @@ Prerequisites: | |||||||
|   sudo apt-get install python2.7 python2.7-dev python-setuptools |   sudo apt-get install python2.7 python2.7-dev python-setuptools | ||||||
|   sudo apt-get install python-numpy python-scipy python-matplotlib |   sudo apt-get install python-numpy python-scipy python-matplotlib | ||||||
|  |  | ||||||
|   nilmdb (1.3.1+) |   nilmdb (1.5.0+) | ||||||
|  |  | ||||||
| Install: | Install: | ||||||
|  |  | ||||||
|   | |||||||
							
								
								
									
										2
									
								
								setup.py
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								setup.py
									
									
									
									
									
								
							| @@ -61,7 +61,7 @@ setup(name='nilmtools', | |||||||
|       long_description = "NILM Database Tools", |       long_description = "NILM Database Tools", | ||||||
|       license = "Proprietary", |       license = "Proprietary", | ||||||
|       author_email = 'jim@jtan.com', |       author_email = 'jim@jtan.com', | ||||||
|       install_requires = [ 'nilmdb >= 1.4.6', |       install_requires = [ 'nilmdb >= 1.5.0', | ||||||
|                            'numpy', |                            'numpy', | ||||||
|                            'scipy', |                            'scipy', | ||||||
|                            'matplotlib', |                            'matplotlib', | ||||||
|   | |||||||
| @@ -5,6 +5,7 @@ | |||||||
|  |  | ||||||
| import nilmtools.filter | import nilmtools.filter | ||||||
| import nilmdb.client | import nilmdb.client | ||||||
|  | from nilmdb.client.numpyclient import NumpyClient | ||||||
| import numpy as np | import numpy as np | ||||||
| import sys | import sys | ||||||
|  |  | ||||||
| @@ -27,14 +28,14 @@ def main(argv = None): | |||||||
|     meta = f.client_src.stream_get_metadata(f.src.path) |     meta = f.client_src.stream_get_metadata(f.src.path) | ||||||
|     f.check_dest_metadata(meta) |     f.check_dest_metadata(meta) | ||||||
|  |  | ||||||
|     # Copy all rows of data as ASCII strings |     # Copy all rows of data using the faster Numpy interfaces | ||||||
|     extractor = nilmdb.client.Client(f.src.url).stream_extract |     extractor = NumpyClient(f.src.url).stream_extract_numpy | ||||||
|     inserter = nilmdb.client.Client(f.dest.url).stream_insert_context |     inserter = NumpyClient(f.dest.url).stream_insert_numpy_context | ||||||
|     for i in f.intervals(): |     for i in f.intervals(): | ||||||
|         print "Processing", f.interval_string(i) |         print "Processing", f.interval_string(i) | ||||||
|         with inserter(f.dest.path, i.start, i.end) as insert_ctx: |         with inserter(f.dest.path, i.start, i.end) as insert_ctx: | ||||||
|             for row in extractor(f.src.path, i.start, i.end): |             for data in extractor(f.src.path, i.start, i.end): | ||||||
|                 insert_ctx.insert(row + "\n") |                 insert_ctx.insert(data) | ||||||
|  |  | ||||||
| if __name__ == "__main__": | if __name__ == "__main__": | ||||||
|     main() |     main() | ||||||
|   | |||||||
| @@ -41,41 +41,45 @@ def main(argv = None): | |||||||
|  |  | ||||||
|     # If source is decimated, we have to decimate a bit differently |     # If source is decimated, we have to decimate a bit differently | ||||||
|     if "decimate_source" in f.client_src.stream_get_metadata(args.srcpath): |     if "decimate_source" in f.client_src.stream_get_metadata(args.srcpath): | ||||||
|         n = f.src.layout_count // 3 |         again = True | ||||||
|         f.process_python(function = decimate_again, rows = args.factor, |  | ||||||
|                          args = (n,)) |  | ||||||
|     else: |     else: | ||||||
|         n = f.src.layout_count |         again = False | ||||||
|         f.process_python(function = decimate_first, rows = args.factor, |     f.process_numpy(decimate, args = (args.factor, again)) | ||||||
|                          args = (n,)) |  | ||||||
|  |  | ||||||
| def decimate_first(data, n): | def decimate(data, interval, args, insert_function, final): | ||||||
|     """Decimate original data -- result has 3 times as many columns""" |     """Decimate data""" | ||||||
|     # For this simple calculation, converting to a Numpy array |     (factor, again) = args | ||||||
|     # and doing the math is slower than just doing it directly. |     (n, m) = data.shape | ||||||
|     rows = iter(data) |  | ||||||
|     r_sum = r_min = r_max = rows.next() |  | ||||||
|     for row in rows: |  | ||||||
|         r_sum = map(operator.add, r_sum, row) |  | ||||||
|         r_min = map(min, r_min, row) |  | ||||||
|         r_max = map(max, r_max, row) |  | ||||||
|     r_mean = [ x / len(data) for x in r_sum ] |  | ||||||
|     return [ [ r_mean[0] ] + r_mean[1:] + r_min[1:] + r_max[1:] ] |  | ||||||
|  |  | ||||||
| def decimate_again(data, n): |     # Figure out which columns to use as the source for mean, min, and max, | ||||||
|     """Decimate already-decimated data -- result has the same number |     # depending on whether this is the first decimation or we're decimating | ||||||
|     of columns""" |     # again.  Note that we include the timestamp in the means. | ||||||
|     rows = iter(data) |     if again: | ||||||
|     r = rows.next() |         c = (m - 1) // 3 | ||||||
|     r_sum = r[0:(n+1)] |         # e.g. c = 3 | ||||||
|     r_min = r[(n+1):(2*n+1)] |         # ts mean1 mean2 mean3 min1 min2 min3 max1 max2 max3 | ||||||
|     r_max = r[(2*n+1):(3*n+1)] |         mean_col = slice(0, c + 1) | ||||||
|     for r in rows: |         min_col = slice(c + 1, 2 * c + 1) | ||||||
|         r_sum = map(operator.add, r_sum, r[0:(n+1)]) |         max_col = slice(2 * c + 1, 3 * c + 1) | ||||||
|         r_min = map(min, r_min, r[(n+1):(2*n+1)]) |     else: | ||||||
|         r_max = map(max, r_max, r[(2*n+1):(3*n+1)]) |         mean_col = slice(0, m) | ||||||
|     r_mean = [ x / len(data) for x in r_sum ] |         min_col = slice(1, m) | ||||||
|     return [ r_mean + r_min + r_max ] |         max_col = slice(1, m) | ||||||
|  |  | ||||||
|  |     # Discard extra rows that aren't a multiple of factor | ||||||
|  |     n = n // factor * factor | ||||||
|  |     data = data[:n,:] | ||||||
|  |  | ||||||
|  |     # Reshape it into 3D so we can process 'factor' rows at a time | ||||||
|  |     data = data.reshape(n // factor, factor, m) | ||||||
|  |  | ||||||
|  |     # Fill the result | ||||||
|  |     out = np.c_[ np.mean(data[:,:,mean_col], axis=1), | ||||||
|  |                  np.min(data[:,:,min_col], axis=1), | ||||||
|  |                  np.max(data[:,:,max_col], axis=1) ] | ||||||
|  |  | ||||||
|  |     insert_function(out) | ||||||
|  |     return n | ||||||
|  |  | ||||||
| if __name__ == "__main__": | if __name__ == "__main__": | ||||||
|     main() |     main() | ||||||
|   | |||||||
| @@ -4,6 +4,7 @@ from __future__ import absolute_import | |||||||
|  |  | ||||||
| import nilmdb.client | import nilmdb.client | ||||||
| from nilmdb.client import Client | from nilmdb.client import Client | ||||||
|  | from nilmdb.client.numpyclient import NumpyClient | ||||||
| from nilmdb.utils.printf import * | from nilmdb.utils.printf import * | ||||||
| from nilmdb.utils.time import (parse_time, timestamp_to_human, | from nilmdb.utils.time import (parse_time, timestamp_to_human, | ||||||
|                                timestamp_to_seconds) |                                timestamp_to_seconds) | ||||||
| @@ -247,72 +248,7 @@ class Filter(object): | |||||||
|         # All good -- write the metadata in case it's not already there |         # All good -- write the metadata in case it's not already there | ||||||
|         self._client_dest.stream_update_metadata(self.dest.path, data) |         self._client_dest.stream_update_metadata(self.dest.path, data) | ||||||
|  |  | ||||||
|     # Main processing helper |     # The main filter processing method. | ||||||
|     def process_python(self, function, rows, args = None, partial = False): |  | ||||||
|         """Process data in chunks of 'rows' data at a time. |  | ||||||
|  |  | ||||||
|         This provides data as nested Python lists and expects the same |  | ||||||
|         back. |  | ||||||
|  |  | ||||||
|         function: function to process the data |  | ||||||
|         rows: maximum number of rows to pass to 'function' at once |  | ||||||
|         args: tuple containing extra arguments to pass to 'function' |  | ||||||
|         partial: if true, less than 'rows' may be passed to 'function'. |  | ||||||
|                  if false, partial data at the end of an interval will |  | ||||||
|                  be dropped. |  | ||||||
|  |  | ||||||
|         'function' should be defined like: |  | ||||||
|             function(data, *args) |  | ||||||
|         It will be passed a list containing up to 'rows' rows of |  | ||||||
|         data from the source stream, and any arguments passed in |  | ||||||
|         'args'.  It should transform the data as desired, and return a |  | ||||||
|         new list of rdata, which will be inserted into the destination |  | ||||||
|         stream. |  | ||||||
|         """ |  | ||||||
|         if args is None: |  | ||||||
|             args = [] |  | ||||||
|         extractor = Client(self.src.url).stream_extract |  | ||||||
|         inserter = Client(self.dest.url).stream_insert_context |  | ||||||
|  |  | ||||||
|         # Parse input data.  We use homogenous types for now, which |  | ||||||
|         # means the timestamp type will be either float or int. |  | ||||||
|         if "int" in self.src.layout_type: |  | ||||||
|             parser = lambda line: [ int(x) for x in line.split() ] |  | ||||||
|         else: |  | ||||||
|             parser = lambda line: [ float(x) for x in line.split() ] |  | ||||||
|  |  | ||||||
|         # Format output data. |  | ||||||
|         formatter = lambda row: " ".join([repr(x) for x in row]) + "\n" |  | ||||||
|  |  | ||||||
|         for interval in self.intervals(): |  | ||||||
|             print "Processing", self.interval_string(interval) |  | ||||||
|             with inserter(self.dest.path, |  | ||||||
|                           interval.start, interval.end) as insert_ctx: |  | ||||||
|                 src_array = [] |  | ||||||
|                 for line in extractor(self.src.path, |  | ||||||
|                                       interval.start, interval.end): |  | ||||||
|                     # Read in data |  | ||||||
|                     src_array.append([ float(x) for x in line.split() ]) |  | ||||||
|  |  | ||||||
|                     if len(src_array) == rows: |  | ||||||
|                         # Pass through filter function |  | ||||||
|                         dest_array = function(src_array, *args) |  | ||||||
|  |  | ||||||
|                         # Write result to destination |  | ||||||
|                         out = [ formatter(row) for row in dest_array ] |  | ||||||
|                         insert_ctx.insert("".join(out)) |  | ||||||
|  |  | ||||||
|                         # Clear source array |  | ||||||
|                         src_array = [] |  | ||||||
|  |  | ||||||
|                 # Take care of partial chunk |  | ||||||
|                 if len(src_array) and partial: |  | ||||||
|                     dest_array = function(src_array, *args) |  | ||||||
|                     out = [ formatter(row) for row in dest_array ] |  | ||||||
|                     insert_ctx.insert("".join(out)) |  | ||||||
|  |  | ||||||
|     # Like process_python, but provides Numpy arrays and allows for |  | ||||||
|     # partial processing. |  | ||||||
|     def process_numpy(self, function, args = None, rows = 100000): |     def process_numpy(self, function, args = None, rows = 100000): | ||||||
|         """For all intervals that exist in self.src but don't exist in |         """For all intervals that exist in self.src but don't exist in | ||||||
|         self.dest, call 'function' with a Numpy array corresponding to |         self.dest, call 'function' with a Numpy array corresponding to | ||||||
| @@ -342,8 +278,8 @@ class Filter(object): | |||||||
|         """ |         """ | ||||||
|         if args is None: |         if args is None: | ||||||
|             args = [] |             args = [] | ||||||
|         extractor = Client(self.src.url).stream_extract |         extractor = NumpyClient(self.src.url).stream_extract_numpy | ||||||
|         inserter = Client(self.dest.url).stream_insert_context |         inserter = NumpyClient(self.dest.url).stream_insert_numpy_context | ||||||
|  |  | ||||||
|         # Format output data. |         # Format output data. | ||||||
|         formatter = lambda row: " ".join([repr(x) for x in row]) + "\n" |         formatter = lambda row: " ".join([repr(x) for x in row]) + "\n" | ||||||
| @@ -357,19 +293,12 @@ class Filter(object): | |||||||
|             print "Processing", self.interval_string(interval) |             print "Processing", self.interval_string(interval) | ||||||
|             with inserter(self.dest.path, |             with inserter(self.dest.path, | ||||||
|                           interval.start, interval.end) as insert_ctx: |                           interval.start, interval.end) as insert_ctx: | ||||||
|                 def insert_function(array): |                 insert_function = insert_ctx.insert | ||||||
|                     s = cStringIO.StringIO() |  | ||||||
|                     if len(np.shape(array)) != 2: |  | ||||||
|                         raise Exception("array must be 2-dimensional") |  | ||||||
|                     np.savetxt(s, array) |  | ||||||
|                     insert_ctx.insert(s.getvalue()) |  | ||||||
|  |  | ||||||
|                 extract = extractor(self.src.path, interval.start, interval.end) |  | ||||||
|                 old_array = np.array([]) |                 old_array = np.array([]) | ||||||
|                 for batched in batch(extract, rows): |                 for new_array in extractor(self.src.path, | ||||||
|                     # Read in this batch of data |                                            interval.start, interval.end, | ||||||
|                     new_array = np.loadtxt(batched) |                                            layout = self.src.layout, | ||||||
|  |                                            maxrows = rows): | ||||||
|                     # If we still had old data left, combine it |                     # If we still had old data left, combine it | ||||||
|                     if old_array.shape[0] != 0: |                     if old_array.shape[0] != 0: | ||||||
|                         array = np.vstack((old_array, new_array)) |                         array = np.vstack((old_array, new_array)) | ||||||
|   | |||||||
| @@ -132,9 +132,10 @@ def main(argv = None): | |||||||
|     data_ts_base = 0 |     data_ts_base = 0 | ||||||
|     data_ts_inc = 0 |     data_ts_inc = 0 | ||||||
|     data_ts_rate = args.rate |     data_ts_rate = args.rate | ||||||
|  |     data_ts_delta = 0 | ||||||
|     def get_data_ts(): |     def get_data_ts(): | ||||||
|         if args.delta: |         if args.delta: | ||||||
|             return data_ts_base |             return data_ts_base + data_ts_delta | ||||||
|         else: |         else: | ||||||
|             return data_ts_base + rate_to_period(data_ts_rate, |             return data_ts_base + rate_to_period(data_ts_rate, | ||||||
|                                                  data_ts_inc) |                                                  data_ts_inc) | ||||||
| @@ -207,12 +208,12 @@ def main(argv = None): | |||||||
|                         pass |                         pass | ||||||
|                     continue |                     continue | ||||||
|  |  | ||||||
|                 # If --delta mode, increment data_ts_base by the delta |                 # If --delta mode, increment data_ts_delta by the | ||||||
|                 # from the file. |                 # delta from the file. | ||||||
|                 if args.delta: |                 if args.delta: | ||||||
|                     try: |                     try: | ||||||
|                         (delta, line) = line.split(None, 1) |                         (delta, line) = line.split(None, 1) | ||||||
|                         data_ts_base += float(delta) |                         data_ts_delta += float(delta) | ||||||
|                     except ValueError: |                     except ValueError: | ||||||
|                         raise ParseError(filename, "can't parse delta") |                         raise ParseError(filename, "can't parse delta") | ||||||
|  |  | ||||||
| @@ -247,7 +248,7 @@ def main(argv = None): | |||||||
|                                    timestamp_to_human(clock_ts)) |                                    timestamp_to_human(clock_ts)) | ||||||
|                         stream.finalize() |                         stream.finalize() | ||||||
|                         data_ts_base = data_ts = clock_ts |                         data_ts_base = data_ts = clock_ts | ||||||
|                         data_ts_inc = 0 |                         data_ts_inc = data_ts_delta = 0 | ||||||
|  |  | ||||||
|                     # Don't use this clock time anymore until we update it |                     # Don't use this clock time anymore until we update it | ||||||
|                     clock_ts = None |                     clock_ts = None | ||||||
|   | |||||||
| @@ -8,7 +8,7 @@ import nilmdb.client | |||||||
| from numpy import * | from numpy import * | ||||||
| import scipy.fftpack | import scipy.fftpack | ||||||
| import scipy.signal | import scipy.signal | ||||||
| from matplotlib import pyplot as p | #from matplotlib import pyplot as p | ||||||
| import bisect | import bisect | ||||||
|  |  | ||||||
| def main(argv = None): | def main(argv = None): | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user