|
- #!/usr/bin/python
-
- # Miscellaenous useful mathematical functions
- from nilmdb.utils.printf import *
- from numpy import *
- from scipy import *
-
- def sfit4(data, fs):
- """(A, f0, phi, C) = sfit4(data, fs)
-
- Compute 4-parameter (unknown-frequency) least-squares fit to
- sine-wave data, according to IEEE Std 1241-2010 Annex B
-
- Input:
- data vector of input samples
- fs sampling rate (Hz)
-
- Output:
- Parameters [A, f0, phi, C] to fit the equation
- x[n] = A * sin(f0/fs * 2 * pi * n + phi) + C
- where n is sample number. Or, as a function of time:
- x(t) = A * sin(f0 * 2 * pi * t + phi) + C
-
- by Jim Paris
- (Verified to match sfit4.m)
- """
- N = len(data)
- t = linspace(0, (N-1) / float(fs), N)
-
- ## Estimate frequency using FFT (step b)
- Fc = fft(data)
- F = abs(Fc)
- F[0] = 0 # eliminate DC
-
- # Find pair of spectral lines with largest amplitude:
- # resulting values are in F(i) and F(i+1)
- i = argmax(F[0:int(N/2)] + F[1:int(N/2+1)])
-
- # Interpolate FFT to get a better result (from Markus [B37])
- U1 = real(Fc[i])
- U2 = real(Fc[i+1])
- V1 = imag(Fc[i])
- V2 = imag(Fc[i+1])
- n = 2 * pi / N
- ni1 = n * i
- ni2 = n * (i+1)
- K = ((V2-V1)*sin(ni1) + (U2-U1)*cos(ni1)) / (U2-U1)
- Z1 = V1 * (K - cos(ni1)) / sin(ni1) + U1
- Z2 = V2 * (K - cos(ni2)) / sin(ni2) + U2
- i = arccos((Z2*cos(ni2) - Z1*cos(ni1)) / (Z2-Z1)) / n
-
- # Convert to Hz
- f0 = i * float(fs) / N
-
- # Fit it. We'll catch exceptions here and just returns zeros
- # if something fails with the least squares fit, etc.
- try:
- # first guess for A0, B0 using 3-parameter fit (step c)
- s = zeros(3)
- w = 2*pi*f0
-
- # Now iterate 7 times (step b, plus 6 iterations of step i)
- for idx in range(7):
- D = c_[cos(w*t), sin(w*t), ones(N),
- -s[0] * t * sin(w*t) + s[1] * t * cos(w*t) ] # eqn B.16
- s = linalg.lstsq(D, data)[0] # eqn B.18
- w = w + s[3] # update frequency estimate
-
- ## Extract results
- A = sqrt(s[0]*s[0] + s[1]*s[1]) # eqn B.21
- f0 = w / (2*pi)
- phi = arctan2(s[0], s[1]) # eqn B.22 (flipped for sin instead of cos)
- C = s[2]
- return (A, f0, phi, C)
- except Exception as e:
- # something broke down; just return zeros
- return (0, 0, 0, 0)
-
- def peak_detect(data, delta = 0.1):
- """Simple min/max peak detection algorithm, taken from my code
- in the disagg.m from the 10-8-5 paper.
-
- Returns an array of peaks: each peak is a tuple
- (n, p, is_max)
- where n is the row number in 'data', and p is 'data[n]',
- and is_max is True if this is a maximum, False if it's a minimum,
- """
- peaks = [];
- cur_min = (None, inf)
- cur_max = (None, -inf)
- lookformax = False
- for (n, p) in enumerate(data):
- if p > cur_max[1]:
- cur_max = (n, p)
- if p < cur_min[1]:
- cur_min = (n, p)
- if lookformax:
- if p < (cur_max[1] - delta):
- peaks.append((cur_max[0], cur_max[1], True))
- cur_min = (n, p)
- lookformax = False
- else:
- if p > (cur_min[1] + delta):
- peaks.append((cur_min[0], cur_min[1], False))
- cur_max = (n, p)
- lookformax = True
- return peaks
|