You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

124 lines
3.8 KiB

  1. #!/usr/bin/env python3
  2. # Miscellaenous useful mathematical functions
  3. from nilmdb.utils.printf import *
  4. from numpy import *
  5. import scipy
  6. def numpy_raise_errors(func):
  7. def wrap(*args, **kwargs):
  8. old = seterr('raise')
  9. try:
  10. return func(*args, **kwargs)
  11. finally:
  12. seterr(**old)
  13. return wrap
  14. @numpy_raise_errors
  15. def sfit4(data, fs):
  16. """(A, f0, phi, C) = sfit4(data, fs)
  17. Compute 4-parameter (unknown-frequency) least-squares fit to
  18. sine-wave data, according to IEEE Std 1241-2010 Annex B
  19. Input:
  20. data vector of input samples
  21. fs sampling rate (Hz)
  22. Output:
  23. Parameters [A, f0, phi, C] to fit the equation
  24. x[n] = A * sin(f0/fs * 2 * pi * n + phi) + C
  25. where n is sample number. Or, as a function of time:
  26. x(t) = A * sin(f0 * 2 * pi * t + phi) + C
  27. by Jim Paris
  28. (Verified to match sfit4.m)
  29. """
  30. N = len(data)
  31. if N < 2:
  32. raise ValueError("bad data")
  33. t = linspace(0, (N-1) / float(fs), N)
  34. ## Estimate frequency using FFT (step b)
  35. Fc = scipy.fft.fft(data)
  36. F = abs(Fc)
  37. F[0] = 0 # eliminate DC
  38. # Find pair of spectral lines with largest amplitude:
  39. # resulting values are in F(i) and F(i+1)
  40. i = argmax(F[0:int(N/2)] + F[1:int(N/2+1)])
  41. # Interpolate FFT to get a better result (from Markus [B37])
  42. try:
  43. U1 = real(Fc[i])
  44. U2 = real(Fc[i+1])
  45. V1 = imag(Fc[i])
  46. V2 = imag(Fc[i+1])
  47. n = 2 * pi / N
  48. ni1 = n * i
  49. ni2 = n * (i+1)
  50. K = ((V2-V1)*sin(ni1) + (U2-U1)*cos(ni1)) / (U2-U1)
  51. Z1 = V1 * (K - cos(ni1)) / sin(ni1) + U1
  52. Z2 = V2 * (K - cos(ni2)) / sin(ni2) + U2
  53. i = arccos((Z2*cos(ni2) - Z1*cos(ni1)) / (Z2-Z1)) / n
  54. except Exception:
  55. # Just go with the biggest FFT peak
  56. i = argmax(F[0:int(N/2)])
  57. # Convert to Hz
  58. f0 = i * float(fs) / N
  59. # Fit it. We'll catch exceptions here and just returns zeros
  60. # if something fails with the least squares fit, etc.
  61. try:
  62. # first guess for A0, B0 using 3-parameter fit (step c)
  63. s = zeros(3)
  64. w = 2*pi*f0
  65. # Now iterate 7 times (step b, plus 6 iterations of step i)
  66. for idx in range(7):
  67. D = c_[cos(w*t), sin(w*t), ones(N),
  68. -s[0] * t * sin(w*t) + s[1] * t * cos(w*t) ] # eqn B.16
  69. s = linalg.lstsq(D, data, rcond=None)[0] # eqn B.18
  70. w = w + s[3] # update frequency estimate
  71. ## Extract results
  72. A = sqrt(s[0]*s[0] + s[1]*s[1]) # eqn B.21
  73. f0 = w / (2*pi)
  74. phi = arctan2(s[0], s[1]) # eqn B.22 (flipped for sin instead of cos)
  75. C = s[2]
  76. return (A, f0, phi, C)
  77. except Exception as e: # pragma: no cover (not sure if we can hit this?)
  78. # something broke down; just return zeros
  79. return (0, 0, 0, 0)
  80. def peak_detect(data, delta = 0.1):
  81. """Simple min/max peak detection algorithm, taken from my code
  82. in the disagg.m from the 10-8-5 paper.
  83. Returns an array of peaks: each peak is a tuple
  84. (n, p, is_max)
  85. where n is the row number in 'data', and p is 'data[n]',
  86. and is_max is True if this is a maximum, False if it's a minimum,
  87. """
  88. peaks = [];
  89. cur_min = (None, inf)
  90. cur_max = (None, -inf)
  91. lookformax = False
  92. for (n, p) in enumerate(data):
  93. if p > cur_max[1]:
  94. cur_max = (n, p)
  95. if p < cur_min[1]:
  96. cur_min = (n, p)
  97. if lookformax:
  98. if p < (cur_max[1] - delta):
  99. peaks.append((cur_max[0], cur_max[1], True))
  100. cur_min = (n, p)
  101. lookformax = False
  102. else:
  103. if p > (cur_min[1] + delta):
  104. peaks.append((cur_min[0], cur_min[1], False))
  105. cur_max = (n, p)
  106. lookformax = True
  107. return peaks