You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

253 lines
9.2 KiB

  1. #!/usr/bin/python
  2. from nilmdb.utils.printf import *
  3. from nilmdb.utils.time import (parse_time, timestamp_to_human,
  4. timestamp_to_seconds, seconds_to_timestamp)
  5. from nilmdb.utils.diskusage import human_size
  6. from nilmdb.utils.interval import Interval
  7. import nilmdb.client
  8. import nilmdb.client.numpyclient
  9. import nilmtools
  10. import argparse
  11. import ConfigParser
  12. import sys
  13. import collections
  14. import fnmatch
  15. import re
  16. def warn(msg, *args):
  17. fprintf(sys.stderr, "warning: " + msg + "\n", *args)
  18. class TimePeriod(object):
  19. _units = { 'h': ('hour', 60*60*24),
  20. 'd': ('day', 60*60*24),
  21. 'w': ('week', 60*60*24*7),
  22. 'm': ('month', 60*60*24*30),
  23. 'y': ('year', 60*60*24*365) }
  24. def __init__(self, val):
  25. for u in self._units:
  26. if val.endswith(u):
  27. self.unit = self._units[u][0]
  28. self.scale = self._units[u][1]
  29. self.count = float(val[:-len(u)])
  30. break
  31. else:
  32. raise ValueError("unknown units: " + units)
  33. def seconds(self):
  34. return self.count * self.scale
  35. def describe_seconds(self, seconds):
  36. count = seconds / self.scale
  37. units = self.unit if count == 1 else (self.unit + "s")
  38. if count == int(count):
  39. return sprintf("%d %s", count, units)
  40. else:
  41. return sprintf("%.2f %s", count, units)
  42. def __str__(self):
  43. return self.describe_seconds(self.seconds())
  44. class StreamCleanupConfig(object):
  45. def __init__(self, info):
  46. self.path = info[0]
  47. self.layout = info[1]
  48. if info[4] != 0 and info[5] != 0:
  49. self.rate = info[4] / timestamp_to_seconds(info[5])
  50. else:
  51. self.rate = None
  52. self.keep = None
  53. self.clean_decimated = True
  54. self.decimated_from = None
  55. self.also_clean_paths = []
  56. def main(argv = None):
  57. parser = argparse.ArgumentParser(
  58. formatter_class = argparse.RawDescriptionHelpFormatter,
  59. version = nilmtools.__version__,
  60. description = """\
  61. Clean up old data from streams using a configuration file to specify
  62. which data to remove.
  63. The format of the config file is as follows:
  64. [/stream/path]
  65. keep = 3w # keep up to 3 weeks of data
  66. rate = 8000 # optional, used for the --estimate option
  67. decimated = false # whether to delete decimated data too (default true)
  68. [*/prep]
  69. keep = 3.5m # or 2520h or 105d or 15w or 0.29y
  70. The suffix for 'keep' is 'h' for hours, 'd' for days, 'w' for weeks,
  71. 'm' for months, or 'y' for years.
  72. Streams paths may include wildcards. If a path is matched by more than
  73. one config section, data from the last config section counts.
  74. Decimated streams (paths containing '~decim-') are treated specially:
  75. - They don't match wildcards
  76. - When deleting data from a parent stream, data is also deleted
  77. from its decimated streams, unless decimated=false
  78. Rate is optional and is only used for the --estimate option.
  79. """)
  80. parser.add_argument("-u", "--url", action="store",
  81. default="http://localhost/nilmdb/",
  82. help="NilmDB server URL (default: %(default)s)")
  83. parser.add_argument("-D", "--dry-run", action="store_true",
  84. default = False,
  85. help="Don't actually remove any data")
  86. parser.add_argument("-e", "--estimate", action="store_true",
  87. default = False,
  88. help="Estimate how much disk space will be used")
  89. parser.add_argument("configfile", type=argparse.FileType('r'),
  90. help="Configuration file")
  91. args = parser.parse_args(argv)
  92. # Parse config file
  93. config = ConfigParser.RawConfigParser()
  94. config.readfp(args.configfile)
  95. # List all streams
  96. client = nilmdb.client.Client(args.url)
  97. streamlist = client.stream_list(extended = True)
  98. # Create config objects
  99. streams = collections.OrderedDict()
  100. for s in streamlist:
  101. streams[s[0]] = StreamCleanupConfig(s)
  102. m = re.search(r"^(.*)~decim-[0-9]+$", s[0])
  103. if m:
  104. streams[s[0]].decimated_from = m.group(1)
  105. # Build up configuration
  106. for section in config.sections():
  107. matched = False
  108. for path in streams.iterkeys():
  109. # Decimated streams only allow exact matches
  110. if streams[path].decimated_from and path != section:
  111. continue
  112. if not fnmatch.fnmatch(path, section):
  113. continue
  114. matched = True
  115. options = config.options(section)
  116. # Keep period (days, weeks, months, years)
  117. if 'keep' in options:
  118. streams[path].keep = TimePeriod(config.get(section, 'keep'))
  119. options.remove('keep')
  120. # Rate
  121. if 'rate' in options:
  122. streams[path].rate = config.getfloat(section, 'rate')
  123. options.remove('rate')
  124. # Decimated
  125. if 'decimated' in options:
  126. val = config.getboolean(section, 'decimated')
  127. streams[path].clean_decimated = val
  128. options.remove('decimated')
  129. for leftover in options:
  130. warn("option '%s' for '%s' is unknown", leftover, section)
  131. if not matched:
  132. warn("config for '%s' did not match any existing streams", section)
  133. # List all decimated streams in the parent stream's info
  134. for path in streams.keys():
  135. src = streams[path].decimated_from
  136. if src and src in streams:
  137. if streams[src].clean_decimated:
  138. streams[src].also_clean_paths.append(path)
  139. del streams[path]
  140. # Warn about streams that aren't getting cleaned up
  141. for path in streams.keys():
  142. if streams[path].keep is None or streams[path].keep.seconds() < 0:
  143. warn("no config for existing stream '%s'", path)
  144. del streams[path]
  145. if args.estimate:
  146. # Estimate disk usage
  147. total = 0
  148. for path in streams.keys():
  149. rate = streams[path].rate
  150. if not rate or rate < 0:
  151. warn("unable to estimate disk usage for stream '%s' because "
  152. "the data rate is unknown", path)
  153. continue
  154. printf("%s:\n", path)
  155. layout = streams[path].layout
  156. dtype = nilmdb.client.numpyclient.layout_to_dtype(layout)
  157. per_row = dtype.itemsize
  158. per_sec = per_row * rate
  159. printf("%17s: %s per row, %s rows per second\n",
  160. "base rate",
  161. human_size(per_row),
  162. round(rate,1))
  163. printf("%17s: %s per hour, %s per day\n",
  164. "base size",
  165. human_size(per_sec * 3600),
  166. human_size(per_sec * 3600 * 24))
  167. # If we'll be cleaning up decimated data, add an
  168. # estimation for how much room decimated data takes up.
  169. if streams[path].clean_decimated:
  170. d_layout = "float32_" + str(3*(int(layout.split('_')[1])))
  171. d_dtype = nilmdb.client.numpyclient.layout_to_dtype(d_layout)
  172. # Assume the decimations will be a factor of 4
  173. # sum_{k=0..inf} (rate / (n^k)) * d_dtype.itemsize
  174. d_per_row = d_dtype.itemsize
  175. factor = 4.0
  176. d_per_sec = d_per_row * (rate / factor) * (1 / (1 - (1/factor)))
  177. per_sec += d_per_sec
  178. printf("%17s: %s per hour, %s per day\n",
  179. "with decimation",
  180. human_size(per_sec * 3600),
  181. human_size(per_sec * 3600 * 24))
  182. keep = per_sec * streams[path].keep.seconds()
  183. printf("%17s: %s\n\n",
  184. "keep " + str(streams[path].keep), human_size(keep))
  185. total += keep
  186. printf("Total estimated disk usage for these streams:\n")
  187. printf(" %s\n", human_size(total))
  188. raise SystemExit(0)
  189. # Do the cleanup
  190. for path in streams:
  191. printf("%s: keep %s\n", path, streams[path].keep)
  192. # Figure out the earliest timestamp we should keep.
  193. intervals = [ Interval(start, end) for (start, end) in
  194. reversed(list(client.stream_intervals(path))) ]
  195. total = 0
  196. keep = seconds_to_timestamp(streams[path].keep.seconds())
  197. for i in intervals:
  198. total += i.end - i.start
  199. if total < keep:
  200. continue
  201. remove_before = i.start + (total - keep)
  202. break
  203. else:
  204. printf(" nothing to do (only %s of data present)\n",
  205. streams[path].keep.describe_seconds(
  206. timestamp_to_seconds(total)))
  207. continue
  208. printf(" removing data before %s\n", timestamp_to_human(remove_before))
  209. if not args.dry_run:
  210. client.stream_remove(path, None, remove_before)
  211. for ap in streams[path].also_clean_paths:
  212. printf(" also removing from %s\n", ap)
  213. if not args.dry_run:
  214. client.stream_remove(ap, None, remove_before)
  215. # All done
  216. return
# Run the cleanup tool only when this file is executed as a script.
if __name__ == "__main__":
    main()