Compare commits

..

No commits in common. "master" and "python2" have entirely different histories.

60 changed files with 1152 additions and 47185 deletions

View File

@ -1,12 +0,0 @@
# -*- conf -*-
[run]
branch = True
[report]
exclude_lines =
pragma: no cover
if 0:
if __name__ == "__main__":
omit = nilmtools/_version.py
show_missing = True

4
.gitignore vendored
View File

@ -1,4 +1,3 @@
.coverage
oldprep
newprep
*.dat
@ -6,6 +5,5 @@ build/
*.pyc
dist/
*.egg-info/
.eggs/
tests/testdb*
MANIFEST.in
MANIFEST

View File

@ -1,8 +0,0 @@
# Root
include README.md
include setup.py
include versioneer.py
include Makefile
# Version
include nilmtools/_version.py

110
Makefile
View File

@ -1,45 +1,97 @@
# By default, run the tests.
#URL="http://bucket.mit.edu:8080/nilmdb"
URL="http://localhost/nilmdb"
all: test
version:
python3 setup.py version
test:
ifeq ($(INSIDE_EMACS), t)
@make test_sinefit
else
@echo 'No test suite for nilmtools. Try "make install"'
endif
build:
python3 setup.py build_ext --inplace
test_pipewatch:
nilmtools/pipewatch.py -t 3 "seq 10 20" "seq 20 30"
test_trainola:
-nilmtool -u http://bucket/nilmdb remove -s min -e max \
/sharon/prep-a-matches
nilmtools/trainola.py "$$(cat extras/trainola-test-param.js)"
test_trainola2:
-nilmtool -u http://bucket/nilmdb remove -s min -e max \
/sharon/prep-a-matches
nilmtools/trainola.py "$$(cat extras/trainola-test-param-2.js)"
test_trainola3:
-nilmtool -u "http://bucket/nilmdb" destroy -R /test/jim
nilmtool -u "http://bucket/nilmdb" create /test/jim uint8_3
nilmtools/trainola.py "$$(cat extras/trainola-test-param-3.js)"
nilmtool -u "http://bucket/nilmdb" extract /test/jim -s min -e max
test_cleanup:
nilmtools/cleanup.py -e extras/cleanup.cfg
nilmtools/cleanup.py extras/cleanup.cfg
test_insert:
nilmtools/insert.py --skip --file --dry-run /foo/bar ~/data/20130311T2100.prep1.gz ~/data/20130311T2100.prep1.gz ~/data/20130311T2200.prep1.gz
test_copy:
nilmtools/copy_wildcard.py -U "http://nilmdb.com/bucket/" -D /lees*
/tmp/raw.dat:
octave --eval 'fs = 8000;' \
--eval 't = (0:fs*10)*2*pi*60/fs;' \
--eval 'raw = transpose([sin(t); 0.3*sin(3*t)+sin(t)]);' \
--eval 'save("-ascii","/tmp/raw.dat","raw");'
test_prep: /tmp/raw.dat
-nilmtool destroy -R /test/raw
-nilmtool destroy -R /test/sinefit
-nilmtool destroy -R /test/prep
nilmtool create /test/raw float32_2
nilmtool create /test/sinefit float32_3
nilmtool create /test/prep float32_8
nilmtool insert -s '@0' -t -r 8000 /test/raw /tmp/raw.dat
nilmtools/sinefit.py -a 0.5 -c 1 -s '@0' -e '@5000000' /test/raw /test/sinefit
nilmtools/prep.py -c 2 /test/raw /test/sinefit /test/prep
nilmtools/prep.py -c 2 /test/raw /test/sinefit /test/prep
nilmtool extract -s min -e max /test/prep | head -20
test_sinefit:
make install >/dev/null 2>&1
-nilmtool destroy -R /test/sinefit
nilmtool create /test/sinefit float32_3
nilmtools/sinefit.py -c 5 -s '2013/03/25 09:11:00' \
-e '2013/03/25 10:11:00' /sharon/raw /test/sinefit
nilmtool extract -s min -e max /test/sinefit | head -20
test_decimate:
-@nilmtool destroy /lees-compressor/no-leak/raw/4 || true
-@nilmtool destroy /lees-compressor/no-leak/raw/16 || true
-@nilmtool create /lees-compressor/no-leak/raw/4 float32_18 || true
-@nilmtool create /lees-compressor/no-leak/raw/16 float32_18 || true
time python nilmtools/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/1 /lees-compressor/no-leak/raw/4
python nilmtools/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/4 /lees-compressor/no-leak/raw/16
version:
python setup.py version
dist: sdist
sdist:
python3 setup.py sdist
python setup.py sdist
install:
python3 setup.py install
python setup.py install
develop:
python3 setup.py develop
ctrl: flake
flake:
flake8 nilmtools
lint:
pylint3 --rcfile=setup.cfg nilmtools
test:
ifneq ($(INSIDE_EMACS),)
# Use the slightly more flexible script
python3 setup.py build_ext --inplace
python3 tests/runtests.py
else
# Let setup.py check dependencies, build stuff, and run the test
python3 setup.py nosetests
endif
python setup.py develop
clean::
find . -name '*.pyc' -o -name '__pycache__' -print0 | xargs -0 rm -rf
rm -f .coverage
rm -rf nilmtools.egg-info/ build/ .eggs
find . -name '*pyc' | xargs rm -f
rm -rf nilmtools.egg-info/ build/ MANIFEST.in
gitclean::
git clean -dXf
.PHONY: all version dist sdist install test
.PHONY: ctrl lint flake clean gitclean
.PHONY: all version dist sdist install clean gitclean test

View File

@ -1,42 +0,0 @@
# nilmtools: Tools and utilities for NilmDB
Tools and utilities for interacting with the NILM Database, or writing
programs that interact with the NILM database.
by Jim Paris <jim@jtan.com>
## Prerequisites:
# Runtime and build environments
sudo apt-get install python3
# Create a new Python virtual environment to isolate deps.
python3 -m venv ../venv
source ../venv/bin/activate # run "deactivate" to leave
# Install all Python dependencies
pip3 install -r requirements.txt
## Install:
Install it into the virtual environment
python3 setup.py install
If you want to instead install it system-wide, you will also need to
install the requirements system-wide:
sudo pip3 install -r requirements.txt
sudo python3 setup.py install
## Building new tools:
The tools in this package are meant to be installed with `python3
setup.py install`. If you want to make a new one, an easier way to
develop would be to first install this package, and then copy a
specific script like `src/sinefit.py` to a new location, and modify it
as desired.
To add a tool to the package, place it in `src/` and add the
appropriate configuration to `setup.py`.

27
README.txt Normal file
View File

@ -0,0 +1,27 @@
nilmtools: Tools and utilities for interacting with the NILM Database,
or writing programs that interact with the NILM database.
by Jim Paris <jim@jtan.com>
Prerequisites:
# Runtime and build environments
sudo apt-get install python2.7 python2.7-dev python-setuptools
sudo apt-get install python-numpy python-scipy python-daemon
nilmdb (1.8.5+)
Install:
python setup.py install
Building new tools:
The tools in this package are meant to be installed with
"python setup.py install". If you want to make a new one,
an easier way to develop would be to first install this package,
and then copy a specific script like "src/sinefit.py" to a new
location, and modify it as desired.
To add a tool to the package, place it in "src/" and add the
appropriate configuration to "setup.py".

View File

@ -1,5 +1,5 @@
{
"url": "http://bucket.mit.edu/nilmdb",
"url": "http://bucket/nilmdb",
"stream": "/sharon/prep-a",
"dest_stream": "/test/jim",
"start": 1364184839901599,
@ -11,7 +11,7 @@
{
"name": "A - True DBL Freezer ON",
"dest_column": 0,
"url": "http://bucket.mit.edu/nilmdb",
"url": "http://bucket/nilmdb",
"stream": "/sharon/prep-a",
"columns": [ { "index": 0, "name": "P1" } ],
"start": 1365277707649000,
@ -20,7 +20,7 @@
{
"name": "A - Boiler 1 Fan OFF",
"dest_column": 1,
"url": "http://bucket.mit.edu/nilmdb",
"url": "http://bucket/nilmdb",
"stream": "/sharon/prep-a",
"columns": [ { "index": 0, "name": "P1" } ],
"start": 1364188370735000,
@ -29,7 +29,7 @@
{
"name": "A - True DBL Freezer OFF",
"dest_column": 2,
"url": "http://bucket.mit.edu/nilmdb",
"url": "http://bucket/nilmdb",
"stream": "/sharon/prep-a",
"columns": [ { "index": 0, "name": "P1" } ],
"start": 1365278087982000,

View File

@ -1,520 +1,197 @@
IN_LONG_VERSION_PY = True
# This file helps to compute a version number in source trees obtained from
# git-archive tarball (such as those provided by githubs download-from-tag
# feature). Distribution tarballs (built by setup.py sdist) and build
# feature). Distribution tarballs (build by setup.py sdist) and build
# directories (produced by setup.py build) will contain a much shorter file
# that just contains the computed version number.
# This file is released into the public domain. Generated by
# versioneer-0.18 (https://github.com/warner/python-versioneer)
# versioneer-0.7+ (https://github.com/warner/python-versioneer)
# these strings will be replaced by git during git-archive
git_refnames = "$Format:%d$"
git_full = "$Format:%H$"
"""Git implementation of _version.py."""
import errno
import os
import re
import subprocess
import sys
def get_keywords():
"""Get the keywords needed to look up the version information."""
# these strings will be replaced by git during git-archive.
# setup.py/versioneer.py will grep for the variable names, so they must
# each be defined on a line of their own. _version.py will just call
# get_keywords().
git_refnames = "$Format:%d$"
git_full = "$Format:%H$"
git_date = "$Format:%ci$"
keywords = {"refnames": git_refnames, "full": git_full, "date": git_date}
return keywords
class VersioneerConfig:
"""Container for Versioneer configuration parameters."""
def get_config():
"""Create, populate and return the VersioneerConfig() object."""
# these strings are filled in when 'setup.py versioneer' creates
# _version.py
cfg = VersioneerConfig()
cfg.VCS = "git"
cfg.style = "pep440"
cfg.tag_prefix = "nilmtools-"
cfg.parentdir_prefix = "nilmtools-"
cfg.versionfile_source = "nilmtools/_version.py"
cfg.verbose = False
return cfg
class NotThisMethod(Exception):
"""Exception raised if a method is not valid for the current scenario."""
LONG_VERSION_PY = {}
HANDLERS = {}
def register_vcs_handler(vcs, method): # decorator
"""Decorator to mark a method as the handler for a particular VCS."""
def decorate(f):
"""Store f in HANDLERS[vcs][method]."""
if vcs not in HANDLERS:
HANDLERS[vcs] = {}
HANDLERS[vcs][method] = f
return f
return decorate
def run_command(commands, args, cwd=None, verbose=False, hide_stderr=False,
env=None):
"""Call the given command(s)."""
assert isinstance(commands, list)
p = None
for c in commands:
def run_command(args, cwd=None, verbose=False):
try:
dispcmd = str([c] + args)
# remember shell=False, so use git.cmd on windows, not just git
p = subprocess.Popen([c] + args, cwd=cwd, env=env,
stdout=subprocess.PIPE,
stderr=(subprocess.PIPE if hide_stderr
else None))
break
p = subprocess.Popen(args, stdout=subprocess.PIPE, cwd=cwd)
except EnvironmentError:
e = sys.exc_info()[1]
if e.errno == errno.ENOENT:
continue
if verbose:
print("unable to run %s" % dispcmd)
print("unable to run %s" % args[0])
print(e)
return None, None
else:
if verbose:
print("unable to find command, tried %s" % (commands,))
return None, None
return None
stdout = p.communicate()[0].strip()
if sys.version_info[0] >= 3:
if sys.version >= '3':
stdout = stdout.decode()
if p.returncode != 0:
if verbose:
print("unable to run %s (error)" % dispcmd)
print("stdout was %s" % stdout)
return None, p.returncode
return stdout, p.returncode
print("unable to run %s (error)" % args[0])
return None
return stdout
def versions_from_parentdir(parentdir_prefix, root, verbose):
"""Try to determine the version from the parent directory name.
import sys
import re
import os.path
Source tarballs conventionally unpack into a directory that includes both
the project name and a version string. We will also support searching up
two directory levels for an appropriately named parent directory
"""
rootdirs = []
for i in range(3):
dirname = os.path.basename(root)
if dirname.startswith(parentdir_prefix):
return {"version": dirname[len(parentdir_prefix):],
"full-revisionid": None,
"dirty": False, "error": None, "date": None}
else:
rootdirs.append(root)
root = os.path.dirname(root) # up a level
if verbose:
print("Tried directories %s but none started with prefix %s" %
(str(rootdirs), parentdir_prefix))
raise NotThisMethod("rootdir doesn't start with parentdir_prefix")
@register_vcs_handler("git", "get_keywords")
def git_get_keywords(versionfile_abs):
"""Extract version information from the given file."""
def get_expanded_variables(versionfile_source):
# the code embedded in _version.py can just fetch the value of these
# keywords. When used from setup.py, we don't want to import _version.py,
# so we do it with a regexp instead. This function is not used from
# _version.py.
keywords = {}
# variables. When used from setup.py, we don't want to import
# _version.py, so we do it with a regexp instead. This function is not
# used from _version.py.
variables = {}
try:
f = open(versionfile_abs, "r")
for line in f.readlines():
for line in open(versionfile_source,"r").readlines():
if line.strip().startswith("git_refnames ="):
mo = re.search(r'=\s*"(.*)"', line)
if mo:
keywords["refnames"] = mo.group(1)
variables["refnames"] = mo.group(1)
if line.strip().startswith("git_full ="):
mo = re.search(r'=\s*"(.*)"', line)
if mo:
keywords["full"] = mo.group(1)
if line.strip().startswith("git_date ="):
mo = re.search(r'=\s*"(.*)"', line)
if mo:
keywords["date"] = mo.group(1)
f.close()
variables["full"] = mo.group(1)
except EnvironmentError:
pass
return keywords
return variables
@register_vcs_handler("git", "keywords")
def git_versions_from_keywords(keywords, tag_prefix, verbose):
"""Get version information from git keywords."""
if not keywords:
raise NotThisMethod("no keywords at all, weird")
date = keywords.get("date")
if date is not None:
# git-2.2.0 added "%cI", which expands to an ISO-8601 -compliant
# datestamp. However we prefer "%ci" (which expands to an "ISO-8601
# -like" string, which we must then edit to make compliant), because
# it's been around since git-1.5.3, and it's too difficult to
# discover which version we're using, or to work around using an
# older one.
date = date.strip().replace(" ", "T", 1).replace(" ", "", 1)
refnames = keywords["refnames"].strip()
def versions_from_expanded_variables(variables, tag_prefix, verbose=False):
refnames = variables["refnames"].strip()
if refnames.startswith("$Format"):
if verbose:
print("keywords are unexpanded, not using")
raise NotThisMethod("unexpanded keywords, not a git-archive tarball")
print("variables are unexpanded, not using")
return {} # unexpanded, so not in an unpacked git-archive tarball
refs = set([r.strip() for r in refnames.strip("()").split(",")])
# starting in git-1.8.3, tags are listed as "tag: foo-1.0" instead of
# just "foo-1.0". If we see a "tag: " prefix, prefer those.
TAG = "tag: "
tags = set([r[len(TAG):] for r in refs if r.startswith(TAG)])
if not tags:
# Either we're using git < 1.8.3, or there really are no tags. We use
# a heuristic: assume all version tags have a digit. The old git %d
# expansion behaves like git log --decorate=short and strips out the
# refs/heads/ and refs/tags/ prefixes that would let us distinguish
# between branches and tags. By ignoring refnames without digits, we
# filter out many common branch names like "release" and
# "stabilization", as well as "HEAD" and "master".
tags = set([r for r in refs if re.search(r'\d', r)])
for ref in list(refs):
if not re.search(r'\d', ref):
if verbose:
print("discarding '%s', no digits" % ",".join(refs - tags))
print("discarding '%s', no digits" % ref)
refs.discard(ref)
# Assume all version tags have a digit. git's %d expansion
# behaves like git log --decorate=short and strips out the
# refs/heads/ and refs/tags/ prefixes that would let us
# distinguish between branches and tags. By ignoring refnames
# without digits, we filter out many common branch names like
# "release" and "stabilization", as well as "HEAD" and "master".
if verbose:
print("likely tags: %s" % ",".join(sorted(tags)))
for ref in sorted(tags):
print("remaining refs: %s" % ",".join(sorted(refs)))
for ref in sorted(refs):
# sorting will prefer e.g. "2.0" over "2.0rc1"
if ref.startswith(tag_prefix):
r = ref[len(tag_prefix):]
if verbose:
print("picking %s" % r)
return {"version": r,
"full-revisionid": keywords["full"].strip(),
"dirty": False, "error": None,
"date": date}
# no suitable tags, so version is "0+unknown", but full hex is still there
return { "version": r,
"full": variables["full"].strip() }
# no suitable tags, so we use the full revision id
if verbose:
print("no suitable tags, using unknown + full revision id")
return {"version": "0+unknown",
"full-revisionid": keywords["full"].strip(),
"dirty": False, "error": "no suitable tags", "date": None}
print("no suitable tags, using full revision id")
return { "version": variables["full"].strip(),
"full": variables["full"].strip() }
@register_vcs_handler("git", "pieces_from_vcs")
def git_pieces_from_vcs(tag_prefix, root, verbose, run_command=run_command):
"""Get version from 'git describe' in the root of the source tree.
This only gets called if the git-archive 'subst' keywords were *not*
expanded, and _version.py hasn't already been rewritten with a short
version string, meaning we're inside a checked out source tree.
"""
GITS = ["git"]
if sys.platform == "win32":
GITS = ["git.cmd", "git.exe"]
out, rc = run_command(GITS, ["rev-parse", "--git-dir"], cwd=root,
hide_stderr=True)
if rc != 0:
if verbose:
print("Directory %s not under git control" % root)
raise NotThisMethod("'git rev-parse --git-dir' returned error")
# if there is a tag matching tag_prefix, this yields TAG-NUM-gHEX[-dirty]
# if there isn't one, this yields HEX[-dirty] (no NUM)
describe_out, rc = run_command(GITS, ["describe", "--tags", "--dirty",
"--always", "--long",
"--match", "%s*" % tag_prefix],
cwd=root)
# --long was added in git-1.5.5
if describe_out is None:
raise NotThisMethod("'git describe' failed")
describe_out = describe_out.strip()
full_out, rc = run_command(GITS, ["rev-parse", "HEAD"], cwd=root)
if full_out is None:
raise NotThisMethod("'git rev-parse' failed")
full_out = full_out.strip()
pieces = {}
pieces["long"] = full_out
pieces["short"] = full_out[:7] # maybe improved later
pieces["error"] = None
# parse describe_out. It will be like TAG-NUM-gHEX[-dirty] or HEX[-dirty]
# TAG might have hyphens.
git_describe = describe_out
# look for -dirty suffix
dirty = git_describe.endswith("-dirty")
pieces["dirty"] = dirty
if dirty:
git_describe = git_describe[:git_describe.rindex("-dirty")]
# now we have TAG-NUM-gHEX or HEX
if "-" in git_describe:
# TAG-NUM-gHEX
mo = re.search(r'^(.+)-(\d+)-g([0-9a-f]+)$', git_describe)
if not mo:
# unparseable. Maybe git-describe is misbehaving?
pieces["error"] = ("unable to parse git-describe output: '%s'"
% describe_out)
return pieces
# tag
full_tag = mo.group(1)
if not full_tag.startswith(tag_prefix):
if verbose:
fmt = "tag '%s' doesn't start with prefix '%s'"
print(fmt % (full_tag, tag_prefix))
pieces["error"] = ("tag '%s' doesn't start with prefix '%s'"
% (full_tag, tag_prefix))
return pieces
pieces["closest-tag"] = full_tag[len(tag_prefix):]
# distance: number of commits since tag
pieces["distance"] = int(mo.group(2))
# commit: short hex revision ID
pieces["short"] = mo.group(3)
else:
# HEX: no tags
pieces["closest-tag"] = None
count_out, rc = run_command(GITS, ["rev-list", "HEAD", "--count"],
cwd=root)
pieces["distance"] = int(count_out) # total number of commits
# commit date: see ISO-8601 comment in git_versions_from_keywords()
date = run_command(GITS, ["show", "-s", "--format=%ci", "HEAD"],
cwd=root)[0].strip()
pieces["date"] = date.strip().replace(" ", "T", 1).replace(" ", "", 1)
return pieces
def plus_or_dot(pieces):
"""Return a + if we don't already have one, else return a ."""
if "+" in pieces.get("closest-tag", ""):
return "."
return "+"
def render_pep440(pieces):
"""Build up version string, with post-release "local version identifier".
Our goal: TAG[+DISTANCE.gHEX[.dirty]] . Note that if you
get a tagged build and then dirty it, you'll get TAG+0.gHEX.dirty
Exceptions:
1: no tags. git_describe was just HEX. 0+untagged.DISTANCE.gHEX[.dirty]
"""
if pieces["closest-tag"]:
rendered = pieces["closest-tag"]
if pieces["distance"] or pieces["dirty"]:
rendered += plus_or_dot(pieces)
rendered += "%d.g%s" % (pieces["distance"], pieces["short"])
if pieces["dirty"]:
rendered += ".dirty"
else:
# exception #1
rendered = "0+untagged.%d.g%s" % (pieces["distance"],
pieces["short"])
if pieces["dirty"]:
rendered += ".dirty"
return rendered
def render_pep440_pre(pieces):
"""TAG[.post.devDISTANCE] -- No -dirty.
Exceptions:
1: no tags. 0.post.devDISTANCE
"""
if pieces["closest-tag"]:
rendered = pieces["closest-tag"]
if pieces["distance"]:
rendered += ".post.dev%d" % pieces["distance"]
else:
# exception #1
rendered = "0.post.dev%d" % pieces["distance"]
return rendered
def render_pep440_post(pieces):
"""TAG[.postDISTANCE[.dev0]+gHEX] .
The ".dev0" means dirty. Note that .dev0 sorts backwards
(a dirty tree will appear "older" than the corresponding clean one),
but you shouldn't be releasing software with -dirty anyways.
Exceptions:
1: no tags. 0.postDISTANCE[.dev0]
"""
if pieces["closest-tag"]:
rendered = pieces["closest-tag"]
if pieces["distance"] or pieces["dirty"]:
rendered += ".post%d" % pieces["distance"]
if pieces["dirty"]:
rendered += ".dev0"
rendered += plus_or_dot(pieces)
rendered += "g%s" % pieces["short"]
else:
# exception #1
rendered = "0.post%d" % pieces["distance"]
if pieces["dirty"]:
rendered += ".dev0"
rendered += "+g%s" % pieces["short"]
return rendered
def render_pep440_old(pieces):
"""TAG[.postDISTANCE[.dev0]] .
The ".dev0" means dirty.
Eexceptions:
1: no tags. 0.postDISTANCE[.dev0]
"""
if pieces["closest-tag"]:
rendered = pieces["closest-tag"]
if pieces["distance"] or pieces["dirty"]:
rendered += ".post%d" % pieces["distance"]
if pieces["dirty"]:
rendered += ".dev0"
else:
# exception #1
rendered = "0.post%d" % pieces["distance"]
if pieces["dirty"]:
rendered += ".dev0"
return rendered
def render_git_describe(pieces):
"""TAG[-DISTANCE-gHEX][-dirty].
Like 'git describe --tags --dirty --always'.
Exceptions:
1: no tags. HEX[-dirty] (note: no 'g' prefix)
"""
if pieces["closest-tag"]:
rendered = pieces["closest-tag"]
if pieces["distance"]:
rendered += "-%d-g%s" % (pieces["distance"], pieces["short"])
else:
# exception #1
rendered = pieces["short"]
if pieces["dirty"]:
rendered += "-dirty"
return rendered
def render_git_describe_long(pieces):
"""TAG-DISTANCE-gHEX[-dirty].
Like 'git describe --tags --dirty --always -long'.
The distance/hash is unconditional.
Exceptions:
1: no tags. HEX[-dirty] (note: no 'g' prefix)
"""
if pieces["closest-tag"]:
rendered = pieces["closest-tag"]
rendered += "-%d-g%s" % (pieces["distance"], pieces["short"])
else:
# exception #1
rendered = pieces["short"]
if pieces["dirty"]:
rendered += "-dirty"
return rendered
def render(pieces, style):
"""Render the given version pieces into the requested style."""
if pieces["error"]:
return {"version": "unknown",
"full-revisionid": pieces.get("long"),
"dirty": None,
"error": pieces["error"],
"date": None}
if not style or style == "default":
style = "pep440" # the default
if style == "pep440":
rendered = render_pep440(pieces)
elif style == "pep440-pre":
rendered = render_pep440_pre(pieces)
elif style == "pep440-post":
rendered = render_pep440_post(pieces)
elif style == "pep440-old":
rendered = render_pep440_old(pieces)
elif style == "git-describe":
rendered = render_git_describe(pieces)
elif style == "git-describe-long":
rendered = render_git_describe_long(pieces)
else:
raise ValueError("unknown style '%s'" % style)
return {"version": rendered, "full-revisionid": pieces["long"],
"dirty": pieces["dirty"], "error": None,
"date": pieces.get("date")}
def get_versions():
"""Get version information or return default if unable to do so."""
# I am in _version.py, which lives at ROOT/VERSIONFILE_SOURCE. If we have
# __file__, we can work backwards from there to the root. Some
# py2exe/bbfreeze/non-CPython implementations don't do __file__, in which
# case we can only use expanded keywords.
cfg = get_config()
verbose = cfg.verbose
def versions_from_vcs(tag_prefix, versionfile_source, verbose=False):
# this runs 'git' from the root of the source tree. That either means
# someone ran a setup.py command (and this code is in versioneer.py, so
# IN_LONG_VERSION_PY=False, thus the containing directory is the root of
# the source tree), or someone ran a project-specific entry point (and
# this code is in _version.py, so IN_LONG_VERSION_PY=True, thus the
# containing directory is somewhere deeper in the source tree). This only
# gets called if the git-archive 'subst' variables were *not* expanded,
# and _version.py hasn't already been rewritten with a short version
# string, meaning we're inside a checked out source tree.
try:
return git_versions_from_keywords(get_keywords(), cfg.tag_prefix,
verbose)
except NotThisMethod:
pass
try:
root = os.path.realpath(__file__)
# versionfile_source is the relative path from the top of the source
# tree (where the .git directory might live) to this file. Invert
# this to find the root from __file__.
for i in cfg.versionfile_source.split('/'):
root = os.path.dirname(root)
here = os.path.abspath(__file__)
except NameError:
return {"version": "0+unknown", "full-revisionid": None,
"dirty": None,
"error": "unable to find root of source tree",
"date": None}
# some py2exe/bbfreeze/non-CPython implementations don't do __file__
return {} # not always correct
# versionfile_source is the relative path from the top of the source tree
# (where the .git directory might live) to this file. Invert this to find
# the root from __file__.
root = here
if IN_LONG_VERSION_PY:
for i in range(len(versionfile_source.split("/"))):
root = os.path.dirname(root)
else:
root = os.path.dirname(here)
if not os.path.exists(os.path.join(root, ".git")):
if verbose:
print("no .git in %s" % root)
return {}
GIT = "git"
if sys.platform == "win32":
GIT = "git.cmd"
stdout = run_command([GIT, "describe", "--tags", "--dirty", "--always"],
cwd=root)
if stdout is None:
return {}
if not stdout.startswith(tag_prefix):
if verbose:
print("tag '%s' doesn't start with prefix '%s'" % (stdout, tag_prefix))
return {}
tag = stdout[len(tag_prefix):]
stdout = run_command([GIT, "rev-parse", "HEAD"], cwd=root)
if stdout is None:
return {}
full = stdout.strip()
if tag.endswith("-dirty"):
full += "-dirty"
return {"version": tag, "full": full}
def versions_from_parentdir(parentdir_prefix, versionfile_source, verbose=False):
if IN_LONG_VERSION_PY:
# We're running from _version.py. If it's from a source tree
# (execute-in-place), we can work upwards to find the root of the
# tree, and then check the parent directory for a version string. If
# it's in an installed application, there's no hope.
try:
pieces = git_pieces_from_vcs(cfg.tag_prefix, root, verbose)
return render(pieces, cfg.style)
except NotThisMethod:
pass
here = os.path.abspath(__file__)
except NameError:
# py2exe/bbfreeze/non-CPython don't have __file__
return {} # without __file__, we have no hope
# versionfile_source is the relative path from the top of the source
# tree to _version.py. Invert this to find the root from __file__.
root = here
for i in range(len(versionfile_source.split("/"))):
root = os.path.dirname(root)
else:
# we're running from versioneer.py, which means we're running from
# the setup.py in a source tree. sys.argv[0] is setup.py in the root.
here = os.path.abspath(sys.argv[0])
root = os.path.dirname(here)
try:
if cfg.parentdir_prefix:
return versions_from_parentdir(cfg.parentdir_prefix, root, verbose)
except NotThisMethod:
pass
# Source tarballs conventionally unpack into a directory that includes
# both the project name and a version string.
dirname = os.path.basename(root)
if not dirname.startswith(parentdir_prefix):
if verbose:
print("guessing rootdir is '%s', but '%s' doesn't start with prefix '%s'" %
(root, dirname, parentdir_prefix))
return None
return {"version": dirname[len(parentdir_prefix):], "full": ""}
tag_prefix = "nilmtools-"
parentdir_prefix = "nilmtools-"
versionfile_source = "nilmtools/_version.py"
def get_versions(default={"version": "unknown", "full": ""}, verbose=False):
variables = { "refnames": git_refnames, "full": git_full }
ver = versions_from_expanded_variables(variables, tag_prefix, verbose)
if not ver:
ver = versions_from_vcs(tag_prefix, versionfile_source, verbose)
if not ver:
ver = versions_from_parentdir(parentdir_prefix, versionfile_source,
verbose)
if not ver:
ver = default
return ver
return {"version": "0+unknown", "full-revisionid": None,
"dirty": None,
"error": "unable to compute version", "date": None}

View File

@ -1,7 +1,7 @@
#!/usr/bin/env python3
#!/usr/bin/python
from nilmdb.utils.printf import printf, fprintf, sprintf
from nilmdb.utils.time import (timestamp_to_human,
from nilmdb.utils.printf import *
from nilmdb.utils.time import (parse_time, timestamp_to_human,
timestamp_to_seconds, seconds_to_timestamp)
from nilmdb.utils.diskusage import human_size
from nilmdb.utils.interval import Interval
@ -9,24 +9,21 @@ import nilmdb.client
import nilmdb.client.numpyclient
import nilmtools
import argparse
import configparser
import ConfigParser
import sys
import collections
import fnmatch
import re
import os
def warn(msg, *args):
fprintf(sys.stderr, "warning: " + msg + "\n", *args)
class TimePeriod(object):
_units = {'h': ('hour', 60*60),
_units = { 'h': ('hour', 60*60),
'd': ('day', 60*60*24),
'w': ('week', 60*60*24*7),
'm': ('month', 60*60*24*30),
'y': ('year', 60*60*24*365)}
'y': ('year', 60*60*24*365) }
def __init__(self, val):
for u in self._units:
@ -36,7 +33,7 @@ class TimePeriod(object):
self.count = float(val[:-len(u)])
break
else:
raise ValueError("unknown units: " + val)
raise ValueError("unknown units: " + units)
def seconds(self):
return self.count * self.scale
@ -52,7 +49,6 @@ class TimePeriod(object):
def __str__(self):
return self.describe_seconds(self.seconds())
class StreamCleanupConfig(object):
def __init__(self, info):
self.path = info[0]
@ -66,11 +62,11 @@ class StreamCleanupConfig(object):
self.decimated_from = None
self.also_clean_paths = []
def main(argv=None):
def main(argv = None):
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description="""\
formatter_class = argparse.RawDescriptionHelpFormatter,
version = nilmtools.__version__,
description = """\
Clean up old data from streams using a configuration file to specify
which data to remove.
@ -97,28 +93,26 @@ def main(argv=None):
Rate is optional and is only used for the --estimate option.
""")
parser.add_argument("-v", "--version", action="version",
version=nilmtools.__version__)
def_url = os.environ.get("NILMDB_URL", "http://localhost/nilmdb/")
parser.add_argument("-u", "--url", action="store", default=def_url,
parser.add_argument("-u", "--url", action="store",
default="http://localhost/nilmdb/",
help="NilmDB server URL (default: %(default)s)")
parser.add_argument("-y", "--yes", action="store_true",
default=False,
default = False,
help="Actually remove the data (default: no)")
parser.add_argument("-e", "--estimate", action="store_true",
default=False,
default = False,
help="Estimate how much disk space will be used")
parser.add_argument("configfile", type=argparse.FileType('r'),
help="Configuration file")
args = parser.parse_args(argv)
# Parse config file
config = configparser.RawConfigParser()
config = ConfigParser.RawConfigParser()
config.readfp(args.configfile)
# List all streams
client = nilmdb.client.Client(args.url)
streamlist = client.stream_list(extended=True)
streamlist = client.stream_list(extended = True)
# Create config objects
streams = collections.OrderedDict()
@ -131,7 +125,7 @@ def main(argv=None):
# Build up configuration
for section in config.sections():
matched = False
for path in streams.keys():
for path in streams.iterkeys():
# Decimated streams only allow exact matches
if streams[path].decimated_from and path != section:
continue
@ -163,7 +157,7 @@ def main(argv=None):
warn("config for '%s' did not match any existing streams", section)
# List all decimated streams in the parent stream's info
for path in list(streams.keys()):
for path in streams.keys():
src = streams[path].decimated_from
if src and src in streams:
if streams[src].clean_decimated:
@ -171,7 +165,7 @@ def main(argv=None):
del streams[path]
# Warn about streams that aren't getting cleaned up
for path in list(streams.keys()):
for path in streams.keys():
if streams[path].keep is None or streams[path].keep.seconds() < 0:
warn("no config for existing stream '%s'", path)
del streams[path]
@ -179,7 +173,7 @@ def main(argv=None):
if args.estimate:
# Estimate disk usage
total = 0
for path in list(streams.keys()):
for path in streams.keys():
rate = streams[path].rate
if not rate or rate < 0:
warn("unable to estimate disk usage for stream '%s' because "
@ -193,7 +187,7 @@ def main(argv=None):
printf("%17s: %s per row, %s rows per second\n",
"base rate",
human_size(per_row),
round(rate, 1))
round(rate,1))
printf("%17s: %s per hour, %s per day\n",
"base size",
human_size(per_sec * 3600),
@ -208,9 +202,7 @@ def main(argv=None):
# sum_{k=0..inf} (rate / (n^k)) * d_dtype.itemsize
d_per_row = d_dtype.itemsize
factor = 4.0
d_per_sec = (d_per_row *
(rate / factor) *
(1 / (1 - (1/factor))))
d_per_sec = d_per_row * (rate / factor) * (1 / (1 - (1/factor)))
per_sec += d_per_sec
printf("%17s: %s per hour, %s per day\n",
"with decimation",
@ -230,8 +222,8 @@ def main(argv=None):
printf("%s: keep %s\n", path, streams[path].keep)
# Figure out the earliest timestamp we should keep.
intervals = [Interval(start, end) for (start, end) in
reversed(list(client.stream_intervals(path)))]
intervals = [ Interval(start, end) for (start, end) in
reversed(list(client.stream_intervals(path))) ]
total = 0
keep = seconds_to_timestamp(streams[path].keep.seconds())
for i in intervals:
@ -245,13 +237,12 @@ def main(argv=None):
streams[path].keep.describe_seconds(
timestamp_to_seconds(total)))
continue
printf(" removing data before %s\n",
timestamp_to_human(remove_before))
printf(" removing data before %s\n", timestamp_to_human(remove_before))
# Clean in reverse order. Since we only use the primary stream and not
# the decimated streams to figure out which data to remove, removing
# the primary stream last means that we might recover more nicely if
# we are interrupted and restarted.
clean_paths = list(reversed(streams[path].also_clean_paths)) + [path]
clean_paths = list(reversed(streams[path].also_clean_paths)) + [ path ]
for p in clean_paths:
printf(" removing from %s\n", p)
if args.yes:
@ -262,6 +253,5 @@ def main(argv=None):
printf("Note: specify --yes to actually perform removals\n")
return
if __name__ == "__main__":
main()

View File

@ -1,13 +1,15 @@
#!/usr/bin/env python3
#!/usr/bin/python
# This is called copy_one instead of copy to avoid name conflicts with
# the Python standard library.
import nilmtools.filter
import nilmdb.client
from nilmdb.client.numpyclient import NumpyClient
import numpy as np
import sys
def main(argv=None):
def main(argv = None):
f = nilmtools.filter.Filter()
parser = f.setup_parser("Copy a stream")
parser.add_argument('-n', '--nometa', action='store_true',
@ -17,11 +19,11 @@ def main(argv=None):
try:
args = f.parse_args(argv)
except nilmtools.filter.MissingDestination as e:
print("Source is %s (%s)" % (e.src.path, e.src.layout))
print("Destination %s doesn't exist" % (e.dest.path))
print("You could make it with a command like:")
print(" nilmtool -u %s create %s %s" % (e.dest.url,
e.dest.path, e.src.layout))
print "Source is %s (%s)" % (e.src.path, e.src.layout)
print "Destination %s doesn't exist" % (e.dest.path)
print "You could make it with a command like:"
print " nilmtool -u %s create %s %s" % (e.dest.url,
e.dest.path, e.src.layout)
raise SystemExit(1)
# Copy metadata
@ -33,11 +35,10 @@ def main(argv=None):
extractor = NumpyClient(f.src.url).stream_extract_numpy
inserter = NumpyClient(f.dest.url).stream_insert_numpy_context
for i in f.intervals():
print("Processing", i.human_string())
print "Processing", i.human_string()
with inserter(f.dest.path, i.start, i.end) as insert_ctx:
for data in extractor(f.src.path, i.start, i.end):
insert_ctx.insert(data)
if __name__ == "__main__":
main()

View File

@ -1,21 +1,21 @@
#!/usr/bin/env python3
#!/usr/bin/python
# Copy streams between NilmDB servers with wildcards
import nilmtools.filter
import nilmtools.copy_one
import nilmdb.client
import argparse
import fnmatch
def main(argv=None):
def main(argv = None):
f = nilmtools.filter.Filter()
# Reuse filter's parser, since it handles most options we need.
parser = f.setup_parser(description="""\
parser = f.setup_parser(description = """\
Copy all streams matching the given wildcard from one host to another.
Example: %(prog)s -u http://host1/nilmdb -U http://host2/nilmdb /sharon/*
""", skip_paths=True)
""", skip_paths = True)
parser.add_argument('-n', '--nometa', action='store_true',
help="Don't copy or check metadata")
parser.add_argument("path", action="store", nargs="+",
@ -29,13 +29,13 @@ def main(argv=None):
client_dest = nilmdb.client.Client(args.dest_url)
if client_src.geturl() == client_dest.geturl():
parser.error("source and destination URL must be different")
print("Source URL:", client_src.geturl())
print(" Dest URL:", client_dest.geturl())
print "Source URL:", client_src.geturl()
print " Dest URL:", client_dest.geturl()
# Find matching streams
matched = []
for path in args.path:
matched.extend([s for s in client_src.stream_list(extended=True)
matched.extend([s for s in client_src.stream_list(extended = True)
if fnmatch.fnmatch(s[0], path)
and s not in matched])
@ -44,14 +44,14 @@ def main(argv=None):
src = nilmtools.filter.StreamInfo(client_src.geturl(), stream)
dest = nilmtools.filter.get_stream_info(client_dest, src.path)
if not dest:
print("Creating destination stream", src.path)
print "Creating destination stream", src.path
client_dest.stream_create(src.path, src.layout)
# Copy them all by running the "copy" tool as if it were
# invoked from the command line.
for stream in matched:
new_argv = ["--url", client_src.geturl(),
"--dest-url", client_dest.geturl()]
"--dest-url", client_dest.geturl() ]
if args.start:
new_argv.extend(["--start", "@" + repr(args.start)])
if args.end:
@ -67,9 +67,8 @@ def main(argv=None):
nilmtools.copy_one.main(new_argv)
except SystemExit as e:
# Ignore SystemExit which could be raised on --dry-run
if e.code != 0: # pragma: no cover (shouldn't happen)
if e.code != 0:
raise
if __name__ == "__main__":
main()

View File

@ -1,10 +1,11 @@
#!/usr/bin/env python3
#!/usr/bin/python
import nilmtools.filter
import nilmdb.client
import numpy as np
import operator
def main(argv=None):
def main(argv = None):
f = nilmtools.filter.Filter()
parser = f.setup_parser("Decimate a stream")
group = parser.add_argument_group("Decimate options")
@ -19,34 +20,31 @@ def main(argv=None):
# a recommended layout.
src = e.src
dest = e.dest
print("Source is %s (%s)" % (src.path, src.layout))
print("Destination %s doesn't exist" % (dest.path))
print "Source is %s (%s)" % (src.path, src.layout)
print "Destination %s doesn't exist" % (dest.path)
if "decimate_source" in f.client_src.stream_get_metadata(src.path):
rec = src.layout
elif ('int32' in src.layout_type or
'int64' in src.layout_type or
'float64' in src.layout_type):
elif 'int32' in src.layout_type or 'float64' in src.layout_type:
rec = 'float64_' + str(src.layout_count * 3)
else:
rec = 'float32_' + str(src.layout_count * 3)
print("You could make it with a command like:")
print(" nilmtool -u %s create %s %s" % (e.dest.url,
e.dest.path, rec))
print "You could make it with a command like:"
print " nilmtool -u %s create %s %s" % (e.dest.url,
e.dest.path, rec)
raise SystemExit(1)
if not (args.factor >= 2):
raise Exception("factor needs to be 2 or more")
f.check_dest_metadata({"decimate_source": f.src.path,
"decimate_factor": args.factor})
f.check_dest_metadata({ "decimate_source": f.src.path,
"decimate_factor": args.factor })
# If source is decimated, we have to decimate a bit differently
if "decimate_source" in f.client_src.stream_get_metadata(args.srcpath):
again = True
else:
again = False
f.process_numpy(decimate, args=(args.factor, again))
f.process_numpy(decimate, args = (args.factor, again))
def decimate(data, interval, args, insert_function, final):
"""Decimate data"""
@ -70,19 +68,18 @@ def decimate(data, interval, args, insert_function, final):
# Discard extra rows that aren't a multiple of factor
n = n // factor * factor
data = data[:n, :]
data = data[:n,:]
# Reshape it into 3D so we can process 'factor' rows at a time
data = data.reshape(n // factor, factor, m)
# Fill the result
out = np.c_[np.mean(data[:, :, mean_col], axis=1),
np.min(data[:, :, min_col], axis=1),
np.max(data[:, :, max_col], axis=1)]
out = np.c_[ np.mean(data[:,:,mean_col], axis=1),
np.min(data[:,:,min_col], axis=1),
np.max(data[:,:,max_col], axis=1) ]
insert_function(out)
return n
if __name__ == "__main__":
main()

View File

@ -1,17 +1,16 @@
#!/usr/bin/env python3
#!/usr/bin/python
import os
import nilmtools.filter
import nilmtools.decimate
import nilmdb.client
import argparse
import fnmatch
def main(argv=None):
def main(argv = None):
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description="""\
formatter_class = argparse.RawDescriptionHelpFormatter,
version = nilmtools.__version__,
description = """\
Automatically create multiple decimations from a single source
stream, continuing until the last decimated level contains fewer
than 500 points total.
@ -19,62 +18,50 @@ def main(argv=None):
Wildcards and multiple paths are accepted. Decimated paths are
ignored when matching wildcards.
""")
def_url = os.environ.get("NILMDB_URL", "http://localhost/nilmdb/")
parser.add_argument("-u", "--url", action="store", default=def_url,
parser.add_argument("-u", "--url", action="store",
default="http://localhost/nilmdb/",
help="NilmDB server URL (default: %(default)s)")
parser.add_argument("-f", "--factor", action="store", default=4, type=int,
help='Decimation factor (default: %(default)s)')
parser.add_argument("-m", "--max", action="store", default=500, type=int,
help='Maximum number of points in last level ' +
'(default: %(default)s)')
parser.add_argument("-F", "--force-metadata", action="store_true",
default=False,
default = False,
help="Force metadata changes if the dest "
"doesn't match")
parser.add_argument("-v", "--version", action="version",
version=nilmtools.__version__)
parser.add_argument("path", action="store", nargs='+',
help='Path of base stream')
args = parser.parse_args(argv)
if args.max < 0:
print("error: bad max, must be nonnegative")
raise SystemExit(1)
# Pull out info about the base stream
client = nilmdb.client.Client(args.url)
# Find list of paths to process
streams = [str(s[0]) for s in client.stream_list()]
streams = [s for s in streams if "~decim-" not in s]
streams = [ unicode(s[0]) for s in client.stream_list() ]
streams = [ s for s in streams if "~decim-" not in s ]
paths = []
for path in args.path:
new = fnmatch.filter(streams, str(path))
new = fnmatch.filter(streams, unicode(path))
if not new:
print("error: no stream matched path:", path)
print "error: no stream matched path:", path
raise SystemExit(1)
paths.extend(new)
for path in paths:
do_decimation(client, args, path)
def do_decimation(client, args, path):
print("Decimating", path)
print "Decimating", path
info = nilmtools.filter.get_stream_info(client, path)
if not info: # pragma: no cover (only good paths passed above)
if not info:
raise Exception("path " + path + " not found")
meta = client.stream_get_metadata(path)
if "decimate_source" in meta:
print("Stream", path, "was decimated from", meta["decimate_source"])
print("You need to pass the base stream instead")
print "Stream", path, "was decimated from", meta["decimate_source"]
print "You need to pass the base stream instead"
raise SystemExit(1)
# Figure out the type we should use for decimated streams
if ('int32' in info.layout_type or
'int64' in info.layout_type or
'float64' in info.layout_type):
if 'int32' in info.layout_type or 'float64' in info.layout_type:
decimated_type = 'float64_' + str(info.layout_count * 3)
else:
decimated_type = 'float32_' + str(info.layout_count * 3)
@ -82,8 +69,8 @@ def do_decimation(client, args, path):
# Now do the decimations until we have few enough points
factor = 1
while True:
print("Level", factor, "decimation has", info.rows, "rows")
if info.rows <= args.max:
print "Level", factor, "decimation has", info.rows, "rows"
if info.rows <= 500:
break
factor *= args.factor
new_path = "%s~decim-%d" % (path, factor)
@ -91,14 +78,14 @@ def do_decimation(client, args, path):
# Create the stream if needed
new_info = nilmtools.filter.get_stream_info(client, new_path)
if not new_info:
print("Creating stream", new_path)
print "Creating stream", new_path
client.stream_create(new_path, decimated_type)
# Run the decimation as if it were run from the commandline
new_argv = ["-u", args.url,
"-f", str(args.factor)]
new_argv = [ "-u", args.url,
"-f", str(args.factor) ]
if args.force_metadata:
new_argv.extend(["--force-metadata"])
new_argv.extend([ "--force-metadata" ])
new_argv.extend([info.path, new_path])
nilmtools.decimate.main(new_argv)
@ -107,6 +94,5 @@ def do_decimation(client, args, path):
return
if __name__ == "__main__":
main()

View File

@ -1,30 +1,35 @@
#!/usr/bin/env python3
#!/usr/bin/python
from __future__ import absolute_import
import nilmdb.client
from nilmdb.client import Client
from nilmdb.client.numpyclient import NumpyClient
from nilmdb.utils.printf import printf, sprintf
from nilmdb.utils.printf import *
from nilmdb.utils.time import (parse_time, timestamp_to_human,
timestamp_to_seconds)
from nilmdb.utils.interval import Interval
import nilmtools
import os
import itertools
import time
import sys
import re
import argparse
import numpy as np
import cStringIO
import functools
class ArgumentError(Exception):
pass
class MissingDestination(Exception):
def __init__(self, args, src, dest):
self.parsed_args = args
self.src = src
self.dest = dest
Exception.__init__(self, f"destination path {dest.path} not found")
Exception.__init__(self, "destination path " + dest.path + " not found")
class StreamInfo(object):
def __init__(self, url, info):
@ -40,7 +45,7 @@ class StreamInfo(object):
self.timestamp_max = info[3]
self.rows = info[4]
self.seconds = nilmdb.utils.time.timestamp_to_seconds(info[5])
except (IndexError, TypeError):
except IndexError, TypeError:
pass
def string(self, interhost):
@ -56,19 +61,17 @@ class StreamInfo(object):
self.path, self.layout, self.rows / 1e6,
self.seconds / 3600.0)
def get_stream_info(client, path):
"""Return a StreamInfo object about the given path, or None if it
doesn't exist"""
streams = client.stream_list(path, extended=True)
streams = client.stream_list(path, extended = True)
if len(streams) != 1:
return None
return StreamInfo(client.geturl(), streams[0])
# Filter processing for a single interval of data.
def process_numpy_interval(interval, extractor, inserter, warn_rows,
function, args=None):
function, args = None):
"""For the given 'interval' of data, extract data, process it
through 'function', and insert the result.
@ -130,7 +133,6 @@ def process_numpy_interval(interval, extractor, inserter, warn_rows,
# we'll not miss any data when we run again later.
insert_ctx.update_end(old_array[processed][0])
def example_callback_function(data, interval, args, insert_func, final):
"""Example of the signature for the function that gets passed
to process_numpy_interval.
@ -159,10 +161,9 @@ def example_callback_function(data, interval, args, insert_func, final):
"""
raise NotImplementedError("example_callback_function does nothing")
class Filter(object):
def __init__(self, parser_description=None):
def __init__(self, parser_description = None):
self._parser = None
self._client_src = None
self._client_dest = None
@ -173,7 +174,6 @@ class Filter(object):
self.end = None
self._interhost = False
self._force_metadata = False
self.def_url = os.environ.get("NILMDB_URL", "http://localhost/nilmdb/")
if parser_description is not None:
self.setup_parser(parser_description)
self.parse_args()
@ -181,35 +181,33 @@ class Filter(object):
@property
def client_src(self):
if self._using_client:
raise Exception("Filter src client is in use; make another")
raise Exception("Filter client is in use; make another")
return self._client_src
@property
def client_dest(self):
if self._using_client:
raise Exception("Filter dest client is in use; make another")
raise Exception("Filter client is in use; make another")
return self._client_dest
def setup_parser(self, description="Filter data", skip_paths=False):
def setup_parser(self, description = "Filter data", skip_paths = False):
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description=description)
formatter_class = argparse.RawDescriptionHelpFormatter,
version = nilmtools.__version__,
description = description)
group = parser.add_argument_group("General filter arguments")
group.add_argument("-u", "--url", action="store",
default=self.def_url,
default="http://localhost/nilmdb/",
help="Server URL (default: %(default)s)")
group.add_argument("-U", "--dest-url", action="store",
help="Destination server URL "
"(default: same as source)")
group.add_argument("-D", "--dry-run", action="store_true",
default=False,
default = False,
help="Just print intervals that would be "
"processed")
group.add_argument("-q", "--quiet", action="store_true",
default=False,
help="Don't print source and dest stream info")
group.add_argument("-F", "--force-metadata", action="store_true",
default=False,
default = False,
help="Force metadata changes if the dest "
"doesn't match")
group.add_argument("-s", "--start",
@ -220,23 +218,20 @@ class Filter(object):
metavar="TIME", type=self.arg_time,
help="Ending timestamp for intervals "
"(free-form, noninclusive)")
group.add_argument("-v", "--version", action="version",
version=nilmtools.__version__)
if not skip_paths:
# Individual filter scripts might want to add these arguments
# themselves, to include multiple sources in a different order
# (for example). "srcpath" and "destpath" arguments must exist,
# though.
group.add_argument("srcpath", action="store",
help="Path of source stream, eg. /foo/bar")
help="Path of source stream, e.g. /foo/bar")
group.add_argument("destpath", action="store",
help="Path of destination stream, eg. /foo/bar")
help="Path of destination stream, e.g. /foo/bar")
self._parser = parser
return parser
def set_args(self, url, dest_url, srcpath, destpath, start, end,
parsed_args=None, quiet=True):
parsed_args = None, quiet = True):
"""Set arguments directly from parameters"""
if dest_url is None:
dest_url = url
@ -247,8 +242,7 @@ class Filter(object):
self._client_dest = Client(dest_url)
if (not self._interhost) and (srcpath == destpath):
raise ArgumentError(
"source and destination path must be different")
raise ArgumentError("source and destination path must be different")
# Open the streams
self.src = get_stream_info(self._client_src, srcpath)
@ -265,20 +259,20 @@ class Filter(object):
# Print info
if not quiet:
print("Source:", self.src.string(self._interhost))
print(" Dest:", self.dest.string(self._interhost))
print "Source:", self.src.string(self._interhost)
print " Dest:", self.dest.string(self._interhost)
def parse_args(self, argv=None):
def parse_args(self, argv = None):
"""Parse arguments from a command line"""
args = self._parser.parse_args(argv)
self.set_args(args.url, args.dest_url, args.srcpath, args.destpath,
args.start, args.end, quiet=args.quiet, parsed_args=args)
args.start, args.end, quiet = False, parsed_args = args)
self._force_metadata = args.force_metadata
if args.dry_run:
for interval in self.intervals():
print(interval.human_string())
print interval.human_string()
raise SystemExit(0)
return args
@ -288,25 +282,25 @@ class Filter(object):
if self._interhost:
# Do the difference ourselves
s_intervals = (Interval(start, end)
s_intervals = ( Interval(start, end)
for (start, end) in
self._client_src.stream_intervals(
self.src.path,
start=self.start, end=self.end))
d_intervals = (Interval(start, end)
start = self.start, end = self.end) )
d_intervals = ( Interval(start, end)
for (start, end) in
self._client_dest.stream_intervals(
self.dest.path,
start=self.start, end=self.end))
start = self.start, end = self.end) )
intervals = nilmdb.utils.interval.set_difference(s_intervals,
d_intervals)
else:
# Let the server do the difference for us
intervals = (Interval(start, end)
intervals = ( Interval(start, end)
for (start, end) in
self._client_src.stream_intervals(
self.src.path, diffpath=self.dest.path,
start=self.start, end=self.end))
self.src.path, diffpath = self.dest.path,
start = self.start, end = self.end) )
# Optimize intervals: join intervals that are adjacent
for interval in nilmdb.utils.interval.optimize(intervals):
yield interval
@ -330,9 +324,13 @@ class Filter(object):
if not self._force_metadata:
for key in data:
wanted = data[key]
if not isinstance(wanted, str):
if not isinstance(wanted, basestring):
wanted = str(wanted)
val = metadata.get(key, wanted)
# Force UTF-8 encoding for comparison and display
wanted = wanted.encode('utf-8')
val = val.encode('utf-8')
key = key.encode('utf-8')
if val != wanted and self.dest.rows > 0:
m = "Metadata in destination stream:\n"
m += " %s = %s\n" % (key, val)
@ -347,8 +345,8 @@ class Filter(object):
self._client_dest.stream_update_metadata(self.dest.path, data)
# The main filter processing method.
def process_numpy(self, function, args=None, rows=100000,
intervals=None):
def process_numpy(self, function, args = None, rows = 100000,
intervals = None):
"""Calls process_numpy_interval for each interval that currently
exists in self.src, but doesn't exist in self.dest. It will
process the data in chunks as follows:
@ -369,25 +367,23 @@ class Filter(object):
inserter = NumpyClient(self.dest.url).stream_insert_numpy_context
extractor_func = functools.partial(extractor, self.src.path,
layout=self.src.layout,
maxrows=rows)
layout = self.src.layout,
maxrows = rows)
inserter_func = functools.partial(inserter, self.dest.path)
for interval in (intervals or self.intervals()):
print("Processing", interval.human_string())
print "Processing", interval.human_string()
process_numpy_interval(interval, extractor_func, inserter_func,
rows * 3, function, args)
def main(argv=None):
def main(argv = None):
# This is just a dummy function; actual filters can use the other
# functions to prepare stuff, and then do something with the data.
f = Filter()
parser = f.setup_parser() # noqa: F841
args = f.parse_args(argv) # noqa: F841
parser = f.setup_parser()
args = f.parse_args(argv)
for i in f.intervals():
print("Generic filter: need to handle", i.human_string())
print "Generic filter: need to handle", i.human_string()
if __name__ == "__main__":
main()

View File

@ -1,29 +1,29 @@
#!/usr/bin/env python3
#!/usr/bin/python
import nilmdb.client
from nilmdb.utils.printf import printf, sprintf
from nilmdb.utils.printf import *
from nilmdb.utils.time import (parse_time, timestamp_to_human,
timestamp_to_seconds, seconds_to_timestamp,
rate_to_period, now as time_now)
import os
import nilmtools
import time
import sys
import re
import argparse
import subprocess
import textwrap
class ParseError(Exception):
def __init__(self, filename, error):
msg = filename + ": " + error
super(ParseError, self).__init__(msg)
def parse_args(argv=None):
def parse_args(argv = None):
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description=textwrap.dedent("""\
formatter_class = argparse.RawDescriptionHelpFormatter,
version = nilmtools.__version__,
description = textwrap.dedent("""\
Insert large amount of data from an external source like ethstream.
This code tracks two timestamps:
@ -56,12 +56,9 @@ def parse_args(argv=None):
error is raised. If '--skip' is specified, the current file
is skipped instead of raising an error.
"""))
def_url = os.environ.get("NILMDB_URL", "http://localhost/nilmdb/")
parser.add_argument("-u", "--url", action="store", default=def_url,
parser.add_argument("-u", "--url", action="store",
default="http://localhost/nilmdb/",
help="NilmDB server URL (default: %(default)s)")
parser.add_argument("-v", "--version", action="version",
version=nilmtools.__version__)
group = parser.add_argument_group("Misc options")
group.add_argument("-D", "--dry-run", action="store_true",
help="Parse files, but don't insert any data")
@ -101,7 +98,7 @@ def parse_args(argv=None):
help="Path of stream, e.g. /foo/bar")
group = parser.add_argument_group("Input files")
group.add_argument("infile", type=argparse.FileType('rb'), nargs='*',
group.add_argument("infile", type=argparse.FileType('r'), nargs='*',
default=[sys.stdin],
help="Input files (default: stdin)")
@ -129,8 +126,7 @@ def parse_args(argv=None):
return args
def main(argv=None):
def main(argv = None):
args = parse_args(argv)
client = nilmdb.client.Client(args.url)
@ -140,7 +136,6 @@ def main(argv=None):
data_ts_inc = 0
data_ts_rate = args.rate
data_ts_delta = 0
def get_data_ts():
if args.delta:
return data_ts_base + data_ts_delta
@ -176,7 +171,7 @@ def main(argv=None):
# decompress.
if filename.endswith(".gz"):
p = subprocess.Popen(["gzip", "-dc"],
stdin=f, stdout=subprocess.PIPE)
stdin = f, stdout = subprocess.PIPE)
f = p.stdout
# Try to get a real timestamp from the filename
@ -189,11 +184,17 @@ def main(argv=None):
except ValueError:
pass
truncated_lines = 0
# Read each line
for line in f:
# The last line in the file may be truncated.
# Ignore it; we shouldn't ever see more than one at the end.
if line[-1] != b'\n'[0]:
# Once in a while a line might be truncated, if we're
# at the end of a file. Ignore it, but if we ignore
# too many, bail out.
if line[-1] != '\n':
truncated_lines += 1
if truncated_lines > 3:
raise ParseError(filename, "too many short lines")
printf("Ignoring short line in %s\n", filename)
continue
@ -202,16 +203,13 @@ def main(argv=None):
continue
# If line starts with a comment, look for a timestamp
if line[0] == b'#'[0]:
if line[0] == '#':
try:
comment = line[1:].decode('utf-8', errors='ignore')
clock_ts = parse_time(comment) + offset_comment
clock_ts = parse_time(line[1:]) + offset_comment
print_clock_updated()
except ValueError:
pass
# for some reason the following line doesn't show up as
# being covered, even though it definitely runs
continue # pragma: no cover
continue
# If --delta mode, increment data_ts_delta by the
# delta from the file.
@ -270,9 +268,8 @@ def main(argv=None):
# Insert it
if not args.dry_run:
stream.insert(b"%d %s" % (data_ts, line))
print("Done")
stream.insert("%d %s" % (data_ts, line))
print "Done"
if __name__ == "__main__":
main()

View File

@ -1,22 +1,10 @@
#!/usr/bin/env python3
#!/usr/bin/python
# Miscellaenous useful mathematical functions
from nilmdb.utils.printf import *
from numpy import *
import scipy
from scipy import *
def numpy_raise_errors(func):
def wrap(*args, **kwargs):
old = seterr('raise')
try:
return func(*args, **kwargs)
finally:
seterr(**old)
return wrap
@numpy_raise_errors
def sfit4(data, fs):
"""(A, f0, phi, C) = sfit4(data, fs)
@ -37,14 +25,10 @@ def sfit4(data, fs):
(Verified to match sfit4.m)
"""
N = len(data)
if N < 2:
raise ValueError("bad data")
t = linspace(0, (N-1) / float(fs), N)
#
# Estimate frequency using FFT (step b)
#
Fc = scipy.fft.fft(data)
## Estimate frequency using FFT (step b)
Fc = fft(data)
F = abs(Fc)
F[0] = 0 # eliminate DC
@ -82,24 +66,21 @@ def sfit4(data, fs):
# Now iterate 7 times (step b, plus 6 iterations of step i)
for idx in range(7):
D = c_[cos(w*t), sin(w*t), ones(N),
-s[0] * t * sin(w*t) + s[1] * t * cos(w*t)] # eqn B.16
s = linalg.lstsq(D, data, rcond=None)[0] # eqn B.18
-s[0] * t * sin(w*t) + s[1] * t * cos(w*t) ] # eqn B.16
s = linalg.lstsq(D, data)[0] # eqn B.18
w = w + s[3] # update frequency estimate
#
# Extract results
#
## Extract results
A = sqrt(s[0]*s[0] + s[1]*s[1]) # eqn B.21
f0 = w / (2*pi)
phi = arctan2(s[0], s[1]) # eqn B.22 (flipped for sin instead of cos)
C = s[2]
return (A, f0, phi, C)
except Exception: # pragma: no cover (not sure if we can hit this?)
except Exception as e:
# something broke down; just return zeros
return (0, 0, 0, 0)
def peak_detect(data, delta=0.1):
def peak_detect(data, delta = 0.1):
"""Simple min/max peak detection algorithm, taken from my code
in the disagg.m from the 10-8-5 paper.
@ -108,7 +89,7 @@ def peak_detect(data, delta=0.1):
where n is the row number in 'data', and p is 'data[n]',
and is_max is True if this is a maximum, False if it's a minimum,
"""
peaks = []
peaks = [];
cur_min = (None, inf)
cur_max = (None, -inf)
lookformax = False

View File

@ -1,33 +1,31 @@
#!/usr/bin/env python3
import nilmtools.filter
import scipy.signal
#!/usr/bin/python
import nilmtools.filter, scipy.signal
def main(argv=None):
def main(argv = None):
f = nilmtools.filter.Filter()
parser = f.setup_parser("Median Filter")
group = parser.add_argument_group("Median filter options")
group.add_argument("-z", "--size", action="store", type=int, default=25,
help="median filter size (default %(default)s)")
help = "median filter size (default %(default)s)")
group.add_argument("-d", "--difference", action="store_true",
help="store difference rather than filtered values")
help = "store difference rather than filtered values")
try:
args = f.parse_args(argv)
except nilmtools.filter.MissingDestination as e:
print("Source is %s (%s)" % (e.src.path, e.src.layout))
print("Destination %s doesn't exist" % (e.dest.path))
print("You could make it with a command like:")
print(" nilmtool -u %s create %s %s" % (e.dest.url,
e.dest.path, e.src.layout))
print "Source is %s (%s)" % (e.src.path, e.src.layout)
print "Destination %s doesn't exist" % (e.dest.path)
print "You could make it with a command like:"
print " nilmtool -u %s create %s %s" % (e.dest.url,
e.dest.path, e.src.layout)
raise SystemExit(1)
f.check_dest_metadata({"median_filter_source": f.src.path,
meta = f.client_src.stream_get_metadata(f.src.path)
f.check_dest_metadata({ "median_filter_source": f.src.path,
"median_filter_size": args.size,
"median_filter_difference": repr(args.difference)})
f.process_numpy(median_filter, args=(args.size, args.difference))
"median_filter_difference": repr(args.difference) })
f.process_numpy(median_filter, args = (args.size, args.difference))
def median_filter(data, interval, args, insert, final):
(size, diff) = args
@ -41,6 +39,5 @@ def median_filter(data, interval, args, insert, final):
insert(data)
return rows
if __name__ == "__main__":
main()

View File

@ -1,7 +1,7 @@
#!/usr/bin/env python3
#!/usr/bin/python
import nilmdb.client
from nilmdb.utils.printf import printf, fprintf
from nilmdb.utils.printf import *
import nilmdb.utils.lock
import nilmtools
@ -14,14 +14,14 @@ import tempfile
import threading
import select
import signal
import queue
import Queue
import daemon
def parse_args(argv=None):
def parse_args(argv = None):
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
description="""\
formatter_class = argparse.ArgumentDefaultsHelpFormatter,
version = nilmtools.__version__,
description = """\
Pipe data from 'generator' to 'consumer'. This is intended to be
executed frequently from cron, and will exit if another copy is
already running. If 'generator' or 'consumer' returns an error,
@ -30,8 +30,6 @@ def parse_args(argv=None):
Intended for use with ethstream (generator) and nilm-insert
(consumer). Commands are executed through the shell.
""")
parser.add_argument("-v", "--version", action="version",
version=nilmtools.__version__)
parser.add_argument("-d", "--daemon", action="store_true",
help="Run in background")
parser.add_argument("-l", "--lock", metavar="FILENAME", action="store",
@ -40,7 +38,7 @@ def parse_args(argv=None):
help="Lock file for detecting running instance")
parser.add_argument("-t", "--timeout", metavar="SECONDS", action="store",
type=float, default=30,
help="Exit if no output from " +
help="Restart if no output from " +
"generator for this long")
group = parser.add_argument_group("commands to execute")
group.add_argument("generator", action="store",
@ -51,70 +49,65 @@ def parse_args(argv=None):
return args
def reader_thread(q, fd):
def reader_thread(queue, fd):
# Read from a file descriptor, write to queue.
try:
while True:
(r, w, x) = select.select([fd], [], [fd], 0.25)
if x: # pragma: no cover -- never expect this to happen
# Very few things are "exceptional conditions";
# just TCP OOB data, some TTY state changes, etc.
raise Exception
if x:
raise Exception # generator died?
if not r:
# short timeout -- just try again. This is to catch the
# fd being closed elsewhere, which is only detected
# when select restarts.
continue
data = os.read(fd, 65536)
if data == b"": # generator EOF
if data == "": # generator EOF
raise Exception
q.put(data)
queue.put(data)
except Exception:
q.put(None)
queue.put(None)
def watcher_thread(q, procs):
def watcher_thread(queue, procs):
# Put None in the queue if either process dies
while True:
for p in procs:
if p.poll() is not None:
q.put(None)
queue.put(None)
return
time.sleep(0.25)
def pipewatch(args):
# Run the processes, etc
with open(os.devnull, "r") as devnull:
generator = subprocess.Popen(args.generator, shell=True,
bufsize=-1, close_fds=True,
stdin=devnull,
stdout=subprocess.PIPE,
stderr=None,
preexec_fn=os.setpgrp)
consumer = subprocess.Popen(args.consumer, shell=True,
bufsize=-11, close_fds=True,
stdin=subprocess.PIPE,
stdout=None,
stderr=None,
preexec_fn=os.setpgrp)
generator = subprocess.Popen(args.generator, shell = True,
bufsize = -1, close_fds = True,
stdin = devnull,
stdout = subprocess.PIPE,
stderr = None,
preexec_fn = os.setpgrp)
consumer = subprocess.Popen(args.consumer, shell = True,
bufsize = -11, close_fds = True,
stdin = subprocess.PIPE,
stdout = None,
stderr = None,
preexec_fn = os.setpgrp)
q = queue.Queue(maxsize=4)
reader = threading.Thread(target=reader_thread,
args=(q, generator.stdout.fileno()))
queue = Queue.Queue(maxsize = 4)
reader = threading.Thread(target = reader_thread,
args = (queue, generator.stdout.fileno()))
reader.start()
watcher = threading.Thread(target=watcher_thread,
args=(q, [generator, consumer]))
watcher = threading.Thread(target = watcher_thread,
args = (queue, [generator, consumer]))
watcher.start()
try:
while True:
try:
data = q.get(True, args.timeout)
data = queue.get(True, args.timeout)
if data is None:
break
consumer.stdin.write(data)
except queue.Empty:
except Queue.Empty:
# Timeout: kill the generator
fprintf(sys.stderr, "pipewatch: timeout\n")
generator.terminate()
@ -138,8 +131,7 @@ def pipewatch(args):
os.killpg(proc.pid, signal.SIGTERM)
if poll_timeout(proc, 0.5) is None:
os.killpg(proc.pid, signal.SIGKILL)
except OSError: # pragma: no cover
# (hard to trigger race condition in os.killpg)
except OSError:
pass
return poll_timeout(proc, 0.5)
@ -150,7 +142,7 @@ def pipewatch(args):
# Consume all remaining data in the queue until the reader
# and watcher threads are done
while reader.is_alive() or watcher.is_alive():
q.get(True, 0.1)
queue.get(True, 0.1)
fprintf(sys.stderr, "pipewatch: generator returned %d, " +
"consumer returned %d\n", gret, cret)
@ -158,8 +150,7 @@ def pipewatch(args):
sys.exit(0)
sys.exit(1)
def main(argv=None):
def main(argv = None):
args = parse_args(argv)
lockfile = open(args.lock, "w")
@ -169,8 +160,8 @@ def main(argv=None):
sys.exit(0)
try:
# Run as a daemon if requested, otherwise run directly.
if args.daemon: # pragma: no cover (hard to do from inside test suite)
with daemon.DaemonContext(files_preserve=[lockfile]):
if args.daemon:
with daemon.DaemonContext(files_preserve = [ lockfile ]):
pipewatch(args)
else:
pipewatch(args)
@ -181,6 +172,5 @@ def main(argv=None):
except OSError:
pass
if __name__ == "__main__":
main()

View File

@ -1,23 +1,23 @@
#!/usr/bin/env python3
#!/usr/bin/python
# Spectral envelope preprocessor.
# Requires two streams as input: the original raw data, and sinefit data.
from nilmdb.utils.printf import printf
from nilmdb.utils.printf import *
from nilmdb.utils.time import timestamp_to_human
import nilmtools.filter
import nilmdb.client
from numpy import pi, zeros, r_, e, real, imag
from numpy import *
import scipy.fftpack
import scipy.signal
#from matplotlib import pyplot as p
import bisect
from nilmdb.utils.interval import Interval
def main(argv=None):
def main(argv = None):
# Set up argument parser
f = nilmtools.filter.Filter()
parser = f.setup_parser("Spectral Envelope Preprocessor", skip_paths=True)
parser = f.setup_parser("Spectral Envelope Preprocessor", skip_paths = True)
group = parser.add_argument_group("Prep options")
group.add_argument("-c", "--column", action="store", type=int,
help="Column number (first data column is 1)")
@ -43,10 +43,14 @@ def main(argv=None):
args = f.parse_args(argv)
except nilmtools.filter.MissingDestination as e:
rec = "float32_%d" % (e.parsed_args.nharm * 2)
print("Source is %s (%s)" % (e.src.path, e.src.layout))
print("Destination %s doesn't exist" % (e.dest.path))
print("You could make it with a command like:")
print(" nilmtool -u %s create %s %s" % (e.dest.url, e.dest.path, rec))
print "Source is %s (%s)" % (e.src.path, e.src.layout)
print "Destination %s doesn't exist" % (e.dest.path)
print "You could make it with a command like:"
print " nilmtool -u %s create %s %s" % (e.dest.url, e.dest.path, rec)
raise SystemExit(1)
if f.dest.layout_count != args.nharm * 2:
print "error: need", args.nharm*2, "columns in destination stream"
raise SystemExit(1)
# Check arguments
@ -64,10 +68,6 @@ def main(argv=None):
else:
rotation = args.rotate_rad or 0.0
if f.dest.layout_count != args.nharm * 2:
print("error: need", args.nharm*2, "columns in destination stream")
raise SystemExit(1)
# Check the sine fit stream
client_sinefit = nilmdb.client.Client(args.url)
sinefit = nilmtools.filter.get_stream_info(client_sinefit, args.sinepath)
@ -78,44 +78,40 @@ def main(argv=None):
+ "; expected float32_3")
# Check and set metadata in prep stream
f.check_dest_metadata({"prep_raw_source": f.src.path,
f.check_dest_metadata({ "prep_raw_source": f.src.path,
"prep_sinefit_source": sinefit.path,
"prep_column": args.column,
"prep_rotation": repr(rotation),
"prep_nshift": args.nshift})
"prep_nshift": args.nshift })
# Find the intersection of the usual set of intervals we'd filter,
# and the intervals actually present in sinefit data. This is
# what we will process.
filter_int = f.intervals()
sinefit_int = (Interval(start, end) for (start, end) in
sinefit_int = ( Interval(start, end) for (start, end) in
client_sinefit.stream_intervals(
args.sinepath, start=f.start, end=f.end))
args.sinepath, start = f.start, end = f.end) )
intervals = nilmdb.utils.interval.intersection(filter_int, sinefit_int)
# Run the process (using the helper in the filter module)
f.process_numpy(process, args=(client_sinefit, sinefit.path, args.column,
f.process_numpy(process, args = (client_sinefit, sinefit.path, args.column,
args.nharm, rotation, args.nshift),
intervals=intervals)
intervals = intervals)
def process(data, interval, args, insert_function, final):
(client, sinefit_path, column, nharm, rotation, nshift) = args
rows = data.shape[0]
data_timestamps = data[:, 0]
data_timestamps = data[:,0]
if rows < 2:
return 0
last_inserted = [nilmdb.utils.time.min_timestamp]
def insert_if_nonoverlapping(data):
"""Call insert_function to insert data, but only if this
data doesn't overlap with other data that we inserted."""
if data[0][0] <= last_inserted[0]: # pragma: no cover
# Getting coverage here is hard -- not sure exactly when
# it gets triggered or why this was added; probably some
# unlikely edge condition with timestamp rounding or something.
if data[0][0] <= last_inserted[0]:
return
last_inserted[0] = data[-1][0]
insert_function(data)
@ -162,7 +158,7 @@ def process(data, interval, args, insert_function, final):
# Extract sinefit data to get zero crossing timestamps.
# t_min = beginning of period
# t_max = end of period
(t_min, f0, A, C) = [float(x) for x in sinefit_line.split()]
(t_min, f0, A, C) = [ float(x) for x in sinefit_line.split() ]
t_max = t_min + 1e6 / f0
# Compute prep over shifted windows of the period
@ -184,7 +180,7 @@ def process(data, interval, args, insert_function, final):
# If we processed no data but there's lots in here, pretend we
# processed half of it.
if processed == 0 and rows > 10000:
processed = rows // 2
processed = rows / 2
printf("%s: warning: no periods found; skipping %d rows\n",
timestamp_to_human(data[0][0]), processed)
else:
@ -192,6 +188,5 @@ def process(data, interval, args, insert_function, final):
timestamp_to_human(data[0][0]), processed, rows)
return processed
if __name__ == "__main__":
main()

View File

@ -1,19 +1,20 @@
#!/usr/bin/env python3
#!/usr/bin/python
# Sine wave fitting.
from nilmdb.utils.printf import printf, sprintf
from nilmdb.utils.printf import *
import nilmtools.filter
import nilmtools.math
import nilmdb.client
from nilmdb.utils.time import (timestamp_to_human,
timestamp_to_seconds,
seconds_to_timestamp)
import numpy
from numpy import *
from scipy import *
#import pylab as p
import sys
# import pylab as p
def main(argv=None):
def main(argv = None):
f = nilmtools.filter.Filter()
parser = f.setup_parser("Sine wave fitting")
group = parser.add_argument_group("Sine fit options")
@ -37,10 +38,10 @@ def main(argv=None):
args = f.parse_args(argv)
except nilmtools.filter.MissingDestination as e:
rec = "float32_3"
print("Source is %s (%s)" % (e.src.path, e.src.layout))
print("Destination %s doesn't exist" % (e.dest.path))
print("You could make it with a command like:")
print(" nilmtool -u %s create %s %s" % (e.dest.url, e.dest.path, rec))
print "Source is %s (%s)" % (e.src.path, e.src.layout)
print "Destination %s doesn't exist" % (e.dest.path)
print "You could make it with a command like:"
print " nilmtool -u %s create %s %s" % (e.dest.url, e.dest.path, rec)
raise SystemExit(1)
if args.column is None or args.column < 1:
@ -58,14 +59,13 @@ def main(argv=None):
if args.min_amp < 0:
parser.error("min amplitude must be >= 0")
f.check_dest_metadata({"sinefit_source": f.src.path,
"sinefit_column": args.column})
f.process_numpy(process, args=(args.column, args.frequency, args.min_amp,
f.check_dest_metadata({ "sinefit_source": f.src.path,
"sinefit_column": args.column })
f.process_numpy(process, args = (args.column, args.frequency, args.min_amp,
args.min_freq, args.max_freq))
class SuppressibleWarning(object):
def __init__(self, maxcount=10, maxsuppress=100):
def __init__(self, maxcount = 10, maxsuppress = 100):
self.maxcount = maxcount
self.maxsuppress = maxsuppress
self.count = 0
@ -78,20 +78,19 @@ class SuppressibleWarning(object):
now = ""
sys.stderr.write(now + msg)
def warn(self, msg, seconds=None):
def warn(self, msg, seconds = None):
self.count += 1
if self.count <= self.maxcount:
self._write(seconds, msg)
if (self.count - self.maxcount) >= self.maxsuppress:
self.reset()
self.reset(seconds)
def reset(self, seconds=None):
def reset(self, seconds = None):
if self.count > self.maxcount:
self._write(seconds, sprintf("(%d warnings suppressed)\n",
self.count - self.maxcount))
self.count = 0
def process(data, interval, args, insert_function, final):
(column, f_expected, a_min, f_min, f_max) = args
rows = data.shape[0]
@ -99,7 +98,7 @@ def process(data, interval, args, insert_function, final):
# Estimate sampling frequency from timestamps
ts_min = timestamp_to_seconds(data[0][0])
ts_max = timestamp_to_seconds(data[-1][0])
if ts_min >= ts_max: # pragma: no cover; process_numpy shouldn't send this
if ts_min >= ts_max:
return 0
fs = (rows-1) / (ts_max - ts_min)
@ -120,7 +119,7 @@ def process(data, interval, args, insert_function, final):
while start < (rows - N):
this = data[start:start+N, column]
t_min = timestamp_to_seconds(data[start, 0])
# t_max = timestamp_to_seconds(data[start+N-1, 0])
t_max = timestamp_to_seconds(data[start+N-1, 0])
# Do 4-parameter sine wave fit
(A, f0, phi, C) = nilmtools.math.sfit4(this, fs)
@ -139,13 +138,13 @@ def process(data, interval, args, insert_function, final):
start += N
continue
# p.plot(arange(N), this)
# p.plot(arange(N), A * sin(f0/fs * 2 * pi * arange(N) + phi) + C, 'g')
#p.plot(arange(N), this)
#p.plot(arange(N), A * sin(f0/fs * 2 * pi * arange(N) + phi) + C, 'g')
# Period starts when the argument of sine is 0 degrees,
# so we're looking for sample number:
# n = (0 - phi) / (f0/fs * 2 * pi)
zc_n = (0 - phi) / (f0 / fs * 2 * numpy.pi)
zc_n = (0 - phi) / (f0 / fs * 2 * pi)
period_n = fs/f0
# Add periods to make N positive
@ -156,16 +155,14 @@ def process(data, interval, args, insert_function, final):
# Mark the zero crossings until we're a half period away
# from the end of the window
while zc_n < (N - period_n/2):
# p.plot(zc_n, C, 'ro')
#p.plot(zc_n, C, 'ro')
t = t_min + zc_n / fs
if (last_inserted_timestamp is None or
t > last_inserted_timestamp):
insert_function([[seconds_to_timestamp(t), f0, A, C]])
last_inserted_timestamp = t
warn.reset(t)
else: # pragma: no cover -- this is hard to trigger,
# if it's even possible at all; I think it would require
# some jitter in how the waves fit, across a window boundary.
else:
warn.warn("timestamp overlap\n", t)
num_zc += 1
last_zc = zc_n
@ -178,8 +175,8 @@ def process(data, interval, args, insert_function, final):
advance = min(last_zc + period_n/4, N)
else:
advance = N/2
# p.plot(advance, C, 'go')
# p.show()
#p.plot(advance, C, 'go')
#p.show()
start = int(round(start + advance))
@ -193,6 +190,5 @@ def process(data, interval, args, insert_function, final):
printf("%sMarked %d zero-crossings in %d rows\n", now, num_zc, start)
return start
if __name__ == "__main__":
main()

View File

@ -1,35 +1,36 @@
#!/usr/bin/env python3
#!/usr/bin/python
from nilmdb.utils.printf import printf, sprintf
from nilmdb.utils.printf import *
import nilmdb.client
import nilmtools.filter
import nilmtools.math
from nilmdb.utils.time import timestamp_to_seconds
import datetime_tz
from nilmdb.utils.time import (timestamp_to_human,
timestamp_to_seconds,
seconds_to_timestamp)
from nilmdb.utils import datetime_tz
from nilmdb.utils.interval import Interval
import numpy as np
import scipy
import scipy.signal
from numpy.core.umath_tests import inner1d
import nilmrun
from collections import OrderedDict
import sys
import time
import functools
import collections
class DataError(ValueError):
pass
def build_column_mapping(colinfo, streaminfo):
"""Given the 'columns' list from the JSON data, verify and
pull out a dictionary mapping for the column names/numbers."""
columns = OrderedDict()
for c in colinfo:
col_num = c['index'] + 1 # skip timestamp
if (c['name'] in list(columns.keys()) or
col_num in list(columns.values())):
if (c['name'] in columns.keys() or col_num in columns.values()):
raise DataError("duplicated columns")
if (c['index'] < 0 or c['index'] >= streaminfo.layout_count):
raise DataError("bad column number")
@ -38,9 +39,8 @@ def build_column_mapping(colinfo, streaminfo):
raise DataError("no columns")
return columns
class Exemplar(object):
def __init__(self, exinfo, min_rows=10, max_rows=100000):
def __init__(self, exinfo, min_rows = 10, max_rows = 100000):
"""Given a dictionary entry from the 'exemplars' input JSON,
verify the stream, columns, etc. Then, fetch all the data
into self.data."""
@ -63,8 +63,7 @@ class Exemplar(object):
self.columns = build_column_mapping(exinfo['columns'], self.info)
# Count points
self.count = self.client.stream_count(self.stream,
self.start, self.end)
self.count = self.client.stream_count(self.stream, self.start, self.end)
# Verify count
if self.count == 0:
@ -78,13 +77,13 @@ class Exemplar(object):
datagen = self.client.stream_extract_numpy(self.stream,
self.start, self.end,
self.info.layout,
maxrows=self.count)
maxrows = self.count)
self.data = list(datagen)[0]
# Extract just the columns that were specified in self.columns,
# skipping the timestamp.
extract_cols = [value for (key, value) in list(self.columns.items())]
self.data = self.data[:, extract_cols]
extract_columns = [ value for (key, value) in self.columns.items() ]
self.data = self.data[:,extract_columns]
# Fix the column indices in e.columns, since we removed/reordered
# columns in self.data
@ -103,23 +102,20 @@ class Exemplar(object):
def __str__(self):
return sprintf("\"%s\" %s [%s] %s rows",
self.name, self.stream,
",".join(list(self.columns.keys())),
self.name, self.stream, ",".join(self.columns.keys()),
self.count)
def timestamp_to_short_human(timestamp):
dt = datetime_tz.datetime_tz.fromtimestamp(timestamp_to_seconds(timestamp))
return dt.strftime("%H:%M:%S")
def trainola_matcher(data, interval, args, insert_func, final_chunk):
"""Perform cross-correlation match"""
(src_columns, dest_count, exemplars) = args
( src_columns, dest_count, exemplars ) = args
nrows = data.shape[0]
# We want at least 10% more points than the widest exemplar.
widest = max([x.count for x in exemplars])
widest = max([ x.count for x in exemplars ])
if (widest * 1.1) > nrows:
return 0
@ -205,12 +201,11 @@ def trainola_matcher(data, interval, args, insert_func, final_chunk):
# Return how many rows we processed
valid = max(valid, 0)
printf(" [%s] matched %d exemplars in %d rows\n",
timestamp_to_short_human(data[0][0]), np.sum(out[:, 1:]), valid)
timestamp_to_short_human(data[0][0]), np.sum(out[:,1:]), valid)
return valid
def trainola(conf):
print("Trainola", nilmtools.__version__)
print "Trainola", nilmtools.__version__
# Load main stream data
url = conf['url']
@ -232,14 +227,12 @@ def trainola(conf):
raise DataError("destination path '" + dest_path + "' does not exist")
printf("Source:\n")
printf(" %s [%s]\n", src.path, ",".join(list(src_columns.keys())))
printf(" %s [%s]\n", src.path, ",".join(src_columns.keys()))
printf("Destination:\n")
printf(" %s (%s columns)\n", dest.path, dest.layout_count)
# Pull in the exemplar data
exemplars = []
if 'exemplars' not in conf:
raise DataError("missing exemplars")
for n, exinfo in enumerate(conf['exemplars']):
printf("Loading exemplar %d:\n", n)
e = Exemplar(exinfo)
@ -261,16 +254,16 @@ def trainola(conf):
"available in source data", n, col))
# Figure out which intervals we should process
intervals = (Interval(s, e) for (s, e) in
intervals = ( Interval(s, e) for (s, e) in
src_client.stream_intervals(src_path,
diffpath=dest_path,
start=start, end=end))
diffpath = dest_path,
start = start, end = end) )
intervals = nilmdb.utils.interval.optimize(intervals)
# Do the processing
rows = 100000
extractor = functools.partial(src_client.stream_extract_numpy,
src.path, layout=src.layout, maxrows=rows)
src.path, layout = src.layout, maxrows = rows)
inserter = functools.partial(dest_client.stream_insert_numpy_context,
dest.path)
start = time.time()
@ -288,9 +281,8 @@ def trainola(conf):
printf("Done. Processed %.2f seconds per second.\n",
processed_time / elapsed)
def main(argv=None):
import json
def main(argv = None):
import simplejson as json
import sys
if argv is None:
@ -314,12 +306,12 @@ def main(argv=None):
try:
# Passed in a JSON string (e.g. on the command line)
conf = json.loads(argv[0])
except TypeError:
except TypeError as e:
# Passed in the config dictionary (e.g. from NilmRun)
conf = argv[0]
return trainola(conf)
if __name__ == "__main__":
main()

View File

@ -1,7 +0,0 @@
nilmdb>=2.0.3
numpy==1.19.1
scipy==1.5.2
python-daemon==2.2.4
docutils==0.16
lockfile==0.12.2
psutil==5.7.2

View File

@ -1,39 +0,0 @@
[aliases]
test = nosetests
[nosetests]
# Note: values must be set to 1, and have no comments on the same line,
# for "python setup.py nosetests" to work correctly.
nocapture=1
# Comment this out to see CherryPy logs on failure:
nologcapture=1
with-coverage=1
cover-inclusive=1
cover-package=nilmtools
cover-erase=1
# this works, puts html output in cover/ dir:
# cover-html=1
# need nose 1.1.3 for this:
# cover-branches=1
#debug=nose
#debug-log=nose.log
stop=1
verbosity=2
tests=tests
[versioneer]
VCS=git
style=pep440
versionfile_source=nilmtools/_version.py
versionfile_build=nilmtools/_version.py
tag_prefix=nilmtools-
parentdir_prefix=nilmtools-
[flake8]
exclude=_version.py
extend-ignore=E731
per-file-ignores=math.py:F403,F405
[pylint]
ignore=_version.py
disable=C0103,C0111,R0913,R0914

View File

@ -1,4 +1,4 @@
#!/usr/bin/env python3
#!/usr/bin/python
# To release a new version, tag it:
# git tag -a nilmtools-1.1 -m "Version 1.1"
@ -6,31 +6,67 @@
# Then just package it up:
# python setup.py sdist
# This is supposed to be using Distribute:
#
# distutils provides a "setup" method.
# setuptools is a set of monkeypatches on top of that.
# distribute is a particular version/implementation of setuptools.
#
# So we don't really know if this is using the old setuptools or the
# Distribute-provided version of setuptools.
import traceback
import sys
import os
from setuptools import setup
try:
from setuptools import setup, find_packages
import distutils.version
except ImportError:
traceback.print_exc()
print "Please install the prerequisites listed in README.txt"
sys.exit(1)
# Versioneer manages version numbers from git tags.
# https://github.com/warner/python-versioneer
import versioneer
versioneer.versionfile_source = 'nilmtools/_version.py'
versioneer.versionfile_build = 'nilmtools/_version.py'
versioneer.tag_prefix = 'nilmtools-'
versioneer.parentdir_prefix = 'nilmtools-'
# Get list of requirements to use in `install_requires` below. Note
# that we don't make a distinction between things that are actually
# required for end-users vs developers (or use `test_requires` or
# anything else) -- just install everything for simplicity.
install_requires = open('requirements.txt').readlines()
# Hack to workaround logging/multiprocessing issue:
# https://groups.google.com/d/msg/nose-users/fnJ-kAUbYHQ/_UsLN786ygcJ
try: import multiprocessing
except: pass
# We need a MANIFEST.in. Generate it here rather than polluting the
# repository with yet another setup-related file.
with open("MANIFEST.in", "w") as m:
m.write("""
# Root
include README.txt
include setup.py
include versioneer.py
include Makefile
""")
# Run setup
setup(name='nilmtools',
version = versioneer.get_version(),
cmdclass = versioneer.get_cmdclass(),
url = 'https://git.jim.sh/nilm/nilmtools.git',
url = 'https://git.jim.sh/jim/lees/nilmtools.git',
author = 'Jim Paris',
description = "NILM Database Tools",
long_description = "NILM Database Tools",
license = "Proprietary",
author_email = 'jim@jtan.com',
install_requires = install_requires,
install_requires = [ 'nilmdb >= 1.8.5',
'numpy',
'scipy',
'python-daemon >= 1.5',
#'matplotlib',
],
packages = [ 'nilmtools',
],
entry_points = {

Binary file not shown.

Binary file not shown.

View File

@ -1,2 +0,0 @@
[/newton/*]
keep = 3

View File

@ -1,4 +0,0 @@
[/newton/*]
keep = 3w
rate = 8000
decimated = false

View File

@ -1,13 +0,0 @@
[/newton/*]
keep = 3w
rate = 8000
decimated = true
[/sf/*]
keep = 0.01h
dummy = xxx
[/empty/foo]
[/nonexistent/bar]

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -1,12 +0,0 @@
# deltas are in microseconds
1000000 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
1000000 2.56439e+05 2.24775e+05 2.92897e+03 4.66646e+03 7.58491e+03 3.57351e+03 -4.34171e+02 2.98819e+03
1000000 2.51903e+05 2.23202e+05 4.23696e+03 3.49363e+03 8.53493e+03 4.29416e+03 8.49573e+02 2.38189e+03
1000000 2.57625e+05 2.20247e+05 5.47017e+03 1.35872e+03 9.18903e+03 4.56136e+03 2.65599e+03 2.60912e+03
1000000 2.63375e+05 2.20706e+05 4.51842e+03 1.80758e+03 8.17208e+03 4.17463e+03 2.57884e+03 3.32848e+03
1000000 2.59221e+05 2.22346e+05 2.98879e+03 3.66264e+03 6.87274e+03 3.94223e+03 1.25928e+03 3.51786e+03
50000000 2.51918e+05 2.22281e+05 4.22677e+03 2.84764e+03 7.78323e+03 3.81659e+03 8.04944e+02 3.46314e+03
1000000 2.54478e+05 2.21701e+05 5.61366e+03 1.02262e+03 9.26581e+03 3.50152e+03 1.29331e+03 3.07271e+03
1000000 2.59568e+05 2.22945e+05 4.97190e+03 1.28250e+03 8.62081e+03 4.06316e+03 1.85717e+03 2.61990e+03
1000000 2.57269e+05 2.23697e+05 3.60527e+03 3.05749e+03 7.22363e+03 4.90330e+03 1.93736e+03 2.35357e+03
1000000 2.52274e+05 2.21438e+05 5.01228e+03 2.86309e+03 7.87115e+03 4.80448e+03 2.18291e+03 2.93397e+03

View File

@ -1,12 +0,0 @@
# deltas are in microseconds
1000000 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
1000000 2.56439e+05 2.24775e+05 2.92897e+03 4.66646e+03 7.58491e+03 3.57351e+03 -4.34171e+02 2.98819e+03
1000000 2.51903e+05 2.23202e+05 4.23696e+03 3.49363e+03 8.53493e+03 4.29416e+03 8.49573e+02 2.38189e+03
1000000 2.57625e+05 2.20247e+05 5.47017e+03 1.35872e+03 9.18903e+03 4.56136e+03 2.65599e+03 2.60912e+03
1000000 2.63375e+05 2.20706e+05 4.51842e+03 1.80758e+03 8.17208e+03 4.17463e+03 2.57884e+03 3.32848e+03
1000000 2.59221e+05 2.22346e+05 2.98879e+03 3.66264e+03 6.87274e+03 3.94223e+03 1.25928e+03 3.51786e+03
1000000 2.51918e+05 2.22281e+05 4.22677e+03 2.84764e+03 7.78323e+03 3.81659e+03 8.04944e+02 3.46314e+03
1000000 2.54478e+05 2.21701e+05 5.61366e+03 1.02262e+03 9.26581e+03 3.50152e+03 1.29331e+03 3.07271e+03
1000000 2.59568e+05 2.22945e+05 4.97190e+03 1.28250e+03 8.62081e+03 4.06316e+03 1.85717e+03 2.61990e+03
1000000 2.57269e+05 2.23697e+05 3.60527e+03 3.05749e+03 7.22363e+03 4.90330e+03 1.93736e+03 2.35357e+03
1000000 2.52274e+05 2.21438e+05 5.01228e+03 2.86309e+03 7.87115e+03 4.80448e+03 2.18291e+03 2.93397e+03

View File

@ -1,3 +0,0 @@
# deltas are in microseconds
1000000A 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
BAD_DELTA 2.56439e+05 2.24775e+05 2.92897e+03 4.66646e+03 7.58491e+03 3.57351e+03 -4.34171e+02 2.98819e+03

View File

@ -1,3 +0,0 @@
# comments are cool? what if they contain â†UNICODEâ†<C3A2> or invalid utf-8 like Ã(
2.66568e+05 2.24029e+05 5.16140e+03 2.52517e+03 8.35084e+03 3.72470e+03 1.35534e+03 2.03900e+03
2.57914e+05 2.27183e+05 4.30368e+03 4.13080e+03 7.25535e+03 4.89047e+03 1.63859e+03 1.93496e+03

View File

@ -1,7 +0,0 @@
{ "url": "http://localhost:32182/",
"dest_stream": "/train/matches",
"stream": "/train/data",
"start": 0,
"end": 100000000,
"columns": [ ]
}

View File

@ -1,17 +0,0 @@
{ "url": "http://localhost:32182/",
"dest_stream": "/train/matches",
"stream": "/train/data",
"start": 0,
"end": 100000000,
"columns": [ { "name": "P1", "index": 0 } ],
"exemplars": [
{ "name": "Big ON",
"url": "http://localhost:32182/",
"stream": "/train/data",
"start": 34000000,
"end": 34000001,
"dest_column": 0,
"columns": [ { "name": "P1", "index": 0 } ]
}
]
}

View File

@ -1,17 +0,0 @@
{ "url": "http://localhost:32182/",
"dest_stream": "/train/matches",
"stream": "/train/data",
"start": 0,
"end": 100000000,
"columns": [ { "name": "P1", "index": 0 } ],
"exemplars": [
{ "name": "Big ON",
"url": "http://localhost:32182/",
"stream": "/train/big",
"start": 0,
"end": 110000,
"dest_column": 0,
"columns": [ { "name": "P1", "index": 0 } ]
}
]
}

View File

@ -1,17 +0,0 @@
{ "url": "http://localhost:32182/",
"dest_stream": "/train/matches",
"stream": "/train/data",
"start": 0,
"end": 100000000,
"columns": [ { "name": "P1", "index": 0 } ],
"exemplars": [
{ "name": "Big ON",
"url": "http://localhost:32182/",
"stream": "/train/data",
"start": 34000000,
"end": 36000000,
"dest_column": 0,
"columns": [ { "name": "FOO", "index": 0 } ]
}
]
}

View File

@ -1,8 +0,0 @@
{ "url": "http://localhost:32182/",
"dest_stream": "/train/matches",
"stream": "/train/data",
"start": 0,
"end": 100000000,
"columns": [ { "name": "P1", "index": 0 },
{ "name": "P1", "index": 1 } ]
}

View File

@ -1,7 +0,0 @@
{ "url": "http://localhost:32182/",
"dest_stream": "/train/matches",
"stream": "/train/data",
"start": 0,
"end": 100000000,
"columns": [ { "name": "P1", "index": 10 } ]
}

View File

@ -1,7 +0,0 @@
{ "url": "http://localhost:32182/",
"dest_stream": "/a/b",
"stream": "/c/d",
"start": 0,
"end": 100000000,
"columns": [ { "name": "P1", "index": 0 } ]
}

View File

@ -1,7 +0,0 @@
{ "url": "http://localhost:32182/",
"dest_stream": "/a/b",
"stream": "/train/data",
"start": 0,
"end": 100000000,
"columns": [ { "name": "P1", "index": 0 } ]
}

View File

@ -1,7 +0,0 @@
{ "url": "http://localhost:32182/",
"dest_stream": "/train/matches",
"stream": "/train/data",
"start": 0,
"end": 100000000,
"columns": [ { "name": "P1", "index": 0 } ]
}

View File

@ -1,8 +0,0 @@
{ "url": "http://localhost:32182/",
"dest_stream": "/train/matches",
"stream": "/train/data",
"start": 0,
"end": 100000000,
"columns": [ { "name": "P1", "index": 0 } ],
"exemplars": [ ]
}

View File

@ -1,17 +0,0 @@
{ "url": "http://localhost:32182/",
"dest_stream": "/train/matches",
"stream": "/train/data",
"start": 0,
"end": 100000000,
"columns": [ { "name": "P1", "index": 0 } ],
"exemplars": [
{ "name": "Big ON",
"url": "http://localhost:32182/",
"stream": "/e/f",
"start": 34000000,
"end": 36000000,
"dest_column": 0,
"columns": [ { "name": "P1", "index": 0 } ]
}
]
}

View File

@ -1,17 +0,0 @@
{ "url": "http://localhost:32182/",
"dest_stream": "/train/matches",
"stream": "/train/data",
"start": 0,
"end": 100000000,
"columns": [ { "name": "P1", "index": 0 } ],
"exemplars": [
{ "name": "Big ON",
"url": "http://localhost:32182/",
"stream": "/train/data",
"start": 10034000000,
"end": 10035000000,
"dest_column": 0,
"columns": [ { "name": "P1", "index": 0 } ]
}
]
}

View File

@ -1,25 +0,0 @@
{ "url": "http://localhost:32182/",
"dest_stream": "/train/matches",
"stream": "/train/data",
"start": 0,
"end": 100000000,
"columns": [ { "name": "P1", "index": 0 } ],
"exemplars": [
{ "name": "Big ON",
"url": "http://localhost:32182/",
"stream": "/train/data",
"start": 34000000,
"end": 36000000,
"dest_column": 0,
"columns": [ { "name": "P1", "index": 0 } ]
},
{ "name": "Big OFF",
"url": "http://localhost:32182/",
"stream": "/train/data",
"start": 36000000,
"end": 38000000,
"dest_column": 1,
"columns": [ { "name": "P1", "index": 0 } ]
}
]
}

View File

@ -1,17 +0,0 @@
{ "url": "http://localhost:32182/",
"dest_stream": "/train/matches2",
"stream": "/train/data",
"start": 0,
"end": 100000000,
"columns": [ { "name": "P1", "index": 0 } ],
"exemplars": [
{ "name": "a",
"url": "http://localhost:32182/",
"stream": "/train/data",
"start": 1000000,
"end": 2000000,
"dest_column": 0,
"columns": [ { "name": "P1", "index": 0 } ]
}
]
}

View File

@ -1 +0,0 @@
1

View File

@ -1 +0,0 @@
1 2

View File

@ -1,5 +0,0 @@
# this has blank lines
# and timestamps: 2000-01-01 12:00:00
# and then the last is truncated
1 2 3

View File

@ -1 +0,0 @@
1 2 3 4

View File

@ -1,50 +0,0 @@
#!/usr/bin/env python3
import nose
import os
import sys
import glob
from collections import OrderedDict
# Change into parent dir
os.chdir(os.path.dirname(os.path.realpath(__file__)) + "/..")
class JimOrderPlugin(nose.plugins.Plugin):
"""When searching for tests and encountering a directory that
contains a 'test.order' file, run tests listed in that file, in the
order that they're listed. Globs are OK in that file and duplicates
are removed."""
name = 'jimorder'
score = 10000
def prepareTestLoader(self, loader):
def wrap(func):
def wrapper(name, *args, **kwargs):
addr = nose.selector.TestAddress(
name, workingDir=loader.workingDir)
try:
order = os.path.join(addr.filename, "test.order")
except Exception:
order = None
if order and os.path.exists(order):
files = []
for line in open(order):
line = line.split('#')[0].strip()
if not line:
continue
fn = os.path.join(addr.filename, line.strip())
files.extend(sorted(glob.glob(fn)) or [fn])
files = list(OrderedDict.fromkeys(files))
tests = [ wrapper(fn, *args, **kwargs) for fn in files ]
return loader.suiteClass(tests)
return func(name, *args, **kwargs)
return wrapper
loader.loadTestsFromName = wrap(loader.loadTestsFromName)
return loader
# Use setup.cfg for most of the test configuration. Adding
# --with-jimorder here means that a normal "nosetests" run will
# still work, it just won't support test.order.
if __name__ == "__main__":
nose.main(addplugins = [ JimOrderPlugin() ],
argv = sys.argv + ["--with-jimorder"])

View File

@ -1,3 +0,0 @@
test.py
test*.py

View File

@ -1,911 +0,0 @@
# -*- coding: utf-8 -*-
import nilmtools.copy_one
import nilmtools.cleanup
import nilmtools.copy_one
import nilmtools.copy_wildcard
import nilmtools.decimate_auto
import nilmtools.decimate
import nilmtools.insert
import nilmtools.median
import nilmtools.pipewatch
import nilmtools.prep
import nilmtools.sinefit
import nilmtools.trainola
from nilmdb.utils.interval import Interval
from nose.tools import assert_raises
import unittest
import numpy
import math
import json
import random
from testutil.helpers import *
import subprocess
import traceback
import os
import atexit
import signal
import functools
from urllib.request import urlopen
from nilmtools.filter import ArgumentError
def run_cherrypy_server(path, port, event):
db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(path)
server = nilmdb.server.Server(db, host="127.0.0.1",
port=port, stoppable=True)
server.start(blocking = True, event = event)
db.close()
class CommandTester():
url = "http://localhost:32182/"
url2 = "http://localhost:32183/"
@classmethod
def setup_class(cls):
# We need two servers running for "copy_multiple", but
# cherrypy uses globals and can only run once per process.
# Using multiprocessing with "spawn" method should work in
# theory, but is hard to get working when the test suite is
# spawned directly by nosetests (rather than ./run-tests.py).
# Instead, just run the real nilmdb-server that got installed
# along with our nilmdb dependency.
def terminate_servers():
for p in cls.servers:
p.terminate()
atexit.register(terminate_servers)
cls.servers = []
for (path, port) in (("tests/testdb1", 32182),
("tests/testdb2", 32183)):
def listening():
try:
urlopen(f"http://127.0.0.1:{port}/", timeout=0.1)
return True
except Exception as e:
return False
if listening():
raise Exception(f"another server already running on {port}")
recursive_unlink(path)
p = subprocess.Popen(["nilmdb-server",
"--address", "127.0.0.1",
"--database", path,
"--port", str(port),
"--quiet",
"--traceback"],
stdin=subprocess.DEVNULL,
stdout=subprocess.DEVNULL)
for i in range(50):
if listening():
break
time.sleep(0.1)
else:
raise Exception(f"server didn't start on port {port}")
@classmethod
def teardown_class(cls):
for p in cls.servers:
p.terminate()
def run(self, arg_string, infile=None, outfile=None):
"""Run a cmdline client with the specified argument string,
passing the given input. Save the output and exit code."""
os.environ['NILMDB_URL'] = self.url
self.last_args = arg_string
class stdio_wrapper:
def __init__(self, stdin, stdout, stderr):
self.io = (stdin, stdout, stderr)
def __enter__(self):
self.saved = ( sys.stdin, sys.stdout, sys.stderr )
( sys.stdin, sys.stdout, sys.stderr ) = self.io
def __exit__(self, type, value, traceback):
( sys.stdin, sys.stdout, sys.stderr ) = self.saved
# Empty input if none provided
if infile is None:
infile = io.TextIOWrapper(io.BytesIO(b""))
# Capture stderr
errfile = io.TextIOWrapper(io.BytesIO())
if outfile is None:
# If no output file, capture stdout with stderr
outfile = errfile
with stdio_wrapper(infile, outfile, errfile) as s:
try:
args = shlex.split(arg_string)
sys.argv[0] = "test_runner"
self.main(args)
sys.exit(0)
except SystemExit as e:
exitcode = e.code
except Exception as e:
traceback.print_exc()
exitcode = 1
# Capture raw binary output, and also try to decode a Unicode
# string copy.
self.captured_binary = outfile.buffer.getvalue()
try:
outfile.seek(0)
self.captured = outfile.read()
except UnicodeDecodeError:
self.captured = None
self.exitcode = exitcode
def ok(self, arg_string, infile = None):
self.run(arg_string, infile)
if self.exitcode != 0:
self.dump()
eq_(self.exitcode, 0)
def fail(self, arg_string, infile=None, exitcode=None):
self.run(arg_string, infile)
if exitcode is not None and self.exitcode != exitcode:
# Wrong exit code
self.dump()
eq_(self.exitcode, exitcode)
if self.exitcode == 0:
# Success, when we wanted failure
self.dump()
ne_(self.exitcode, 0)
def contain(self, checkstring, contain=True):
if contain:
in_(checkstring, self.captured)
else:
nin_(checkstring, self.captured)
def match(self, checkstring):
eq_(checkstring, self.captured)
def matchfile(self, file):
# Captured data should match file contents exactly
with open(file) as f:
contents = f.read()
if contents != self.captured:
print("--- reference file (first 1000 bytes):\n")
print(contents[0:1000] + "\n")
print("--- captured data (first 1000 bytes):\n")
print(self.captured[0:1000] + "\n")
zipped = itertools.zip_longest(contents, self.captured)
for (n, (a, b)) in enumerate(zipped):
if a != b:
print("--- first difference is at offset", n)
print("--- reference:", repr(a))
print("--- captured:", repr(b))
break
raise AssertionError("captured data doesn't match " + file)
def matchfilecount(self, file):
# Last line of captured data should match the number of
# non-commented lines in file
count = 0
with open(file) as f:
for line in f:
if line[0] != '#':
count += 1
eq_(self.captured.splitlines()[-1], sprintf("%d", count))
def dump(self):
printf("\n===args start===\n%s\n===args end===\n", self.last_args)
printf("===dump start===\n%s===dump end===\n", self.captured)
class TestAllCommands(CommandTester):
def test_00_load_data(self):
client = nilmdb.client.Client(url=self.url)
client.stream_create("/newton/prep", "float32_8")
client.stream_set_metadata("/newton/prep",
{ "description": "newton" })
for ts in ("20120323T1000", "20120323T1002", "20120323T1004"):
start = nilmdb.utils.time.parse_time(ts)
fn = f"tests/data/prep-{ts}"
data = nilmdb.utils.timestamper.TimestamperRate(fn, start, 120)
client.stream_insert("/newton/prep", data);
def test_01_copy(self):
self.main = nilmtools.copy_one.main
client = nilmdb.client.Client(url=self.url)
# basic arguments
self.fail(f"")
self.fail(f"no-such-src no-such-dest")
self.contain("source path no-such-src not found")
self.fail(f"-u {self.url} no-such-src no-such-dest")
# nonexistent dest
self.fail(f"/newton/prep /newton/prep-copy")
self.contain("Destination /newton/prep-copy doesn't exist")
# wrong type
client.stream_create("/newton/prep-copy-wrongtype", "uint16_6")
self.fail(f"/newton/prep /newton/prep-copy-wrongtype")
self.contain("wrong number of fields")
# copy with metadata, and compare
client.stream_create("/newton/prep-copy", "float32_8")
self.ok(f"/newton/prep /newton/prep-copy")
a = list(client.stream_extract("/newton/prep"))
b = list(client.stream_extract("/newton/prep-copy"))
eq_(a, b)
a = client.stream_get_metadata("/newton/prep")
b = client.stream_get_metadata("/newton/prep-copy")
eq_(a, b)
# copy with no metadata
client.stream_create("/newton/prep-copy-nometa", "float32_8")
self.ok(f"--nometa /newton/prep /newton/prep-copy-nometa")
a = list(client.stream_extract("/newton/prep"))
b = list(client.stream_extract("/newton/prep-copy-nometa"))
eq_(a, b)
a = client.stream_get_metadata("/newton/prep")
b = client.stream_get_metadata("/newton/prep-copy-nometa")
ne_(a, b)
def test_02_copy_wildcard(self):
self.main = nilmtools.copy_wildcard.main
client1 = nilmdb.client.Client(url=self.url)
client2 = nilmdb.client.Client(url=self.url2)
# basic arguments
self.fail(f"")
self.fail(f"/newton")
self.fail(f"-u {self.url} -U {self.url} /newton")
self.contain("URL must be different")
# no matches; silent
self.ok(f"-u {self.url} -U {self.url2} /newton")
self.ok(f"-u {self.url} -U {self.url2} /asdf*")
self.ok(f"-u {self.url2} -U {self.url} /newton*")
eq_(client2.stream_list(), [])
# this won't actually copy, but will still create streams
self.ok(f"-u {self.url} -U {self.url2} --dry-run /newton*")
self.contain("Creating destination stream /newton/prep-copy")
eq_(len(list(client2.stream_extract("/newton/prep"))), 0)
# this should copy a bunch
self.ok(f"-u {self.url} -U {self.url2} /*")
self.contain("Creating destination stream /newton/prep-copy", False)
eq_(client1.stream_list(), client2.stream_list())
eq_(list(client1.stream_extract("/newton/prep")),
list(client2.stream_extract("/newton/prep")))
eq_(client1.stream_get_metadata("/newton/prep"),
client2.stream_get_metadata("/newton/prep"))
# repeating it is OK; it just won't recreate streams.
# Let's try with --nometa too
client2.stream_remove("/newton/prep")
client2.stream_destroy("/newton/prep")
self.ok(f"-u {self.url} -U {self.url2} --nometa /newton*")
self.contain("Creating destination stream /newton/prep-copy", False)
self.contain("Creating destination stream /newton/prep", True)
eq_(client1.stream_list(), client2.stream_list())
eq_(list(client1.stream_extract("/newton/prep")),
list(client2.stream_extract("/newton/prep")))
eq_(client2.stream_get_metadata("/newton/prep"), {})
# fill in test cases
self.ok(f"-u {self.url} -U {self.url2} -s 2010 -e 2020 -F /newton*")
def test_03_decimate(self):
self.main = nilmtools.decimate.main
client = nilmdb.client.Client(url=self.url)
# basic arguments
self.fail(f"")
# no dest
self.fail(f"/newton/prep /newton/prep-decimated-1")
self.contain("doesn't exist")
# wrong dest shape
client.stream_create("/newton/prep-decimated-bad", "float32_8")
self.fail(f"/newton/prep /newton/prep-decimated-bad")
self.contain("wrong number of fields")
# bad factor
self.fail(f"/newton/prep -f 1 /newton/prep-decimated-bad")
self.contain("needs to be 2 or more")
# ok, default factor 4
client.stream_create("/newton/prep-decimated-4", "float32_24")
self.ok(f"/newton/prep /newton/prep-decimated-4")
a = client.stream_count("/newton/prep")
b = client.stream_count("/newton/prep-decimated-4")
eq_(a // 4, b)
# factor 10
client.stream_create("/newton/prep-decimated-10", "float32_24")
self.ok(f"/newton/prep -f 10 /newton/prep-decimated-10")
self.contain("Processing")
a = client.stream_count("/newton/prep")
b = client.stream_count("/newton/prep-decimated-10")
eq_(a // 10, b)
# different factor, same target
self.fail(f"/newton/prep -f 16 /newton/prep-decimated-10")
self.contain("Metadata in destination stream")
self.contain("decimate_factor = 10")
self.contain("doesn't match desired data")
self.contain("decimate_factor = 16")
# unless we force it
self.ok(f"/newton/prep -f 16 -F /newton/prep-decimated-10")
a = client.stream_count("/newton/prep")
b = client.stream_count("/newton/prep-decimated-10")
# but all data was already converted, so no more
eq_(a // 10, b)
# if we try to decimate an already-decimated stream, the suggested
# shape is different
self.fail(f"/newton/prep-decimated-4 -f 4 /newton/prep-decimated-16")
self.contain("create /newton/prep-decimated-16 float32_24")
# decimate again
client.stream_create("/newton/prep-decimated-16", "float32_24")
self.ok(f"/newton/prep-decimated-4 -f 4 /newton/prep-decimated-16")
self.contain("Processing")
# check shape suggestion for different input types
for (shape, expected) in (("int32_1", "float64_3"),
("uint32_1", "float64_3"),
("int64_1", "float64_3"),
("uint64_1", "float64_3"),
("float32_1", "float32_3"),
("float64_1", "float64_3")):
client.stream_create(f"/test/{shape}", shape)
self.fail(f"/test/{shape} /test/{shape}-decim")
self.contain(f"create /test/{shape}-decim {expected}")
def test_04_decimate_auto(self):
self.main = nilmtools.decimate_auto.main
client = nilmdb.client.Client(url=self.url)
self.fail(f"")
self.fail(f"--max -1 asdf")
self.contain("bad max")
self.fail(f"/no/such/stream")
self.contain("no stream matched path")
# normal run
self.ok(f"/newton/prep")
# can't auto decimate a decimated stream
self.fail(f"/newton/prep-decimated-16")
self.contain("need to pass the base stream instead")
# decimate prep again, this time much more; also use -F
self.ok(f"-m 10 --force-metadata /newton/pr??")
self.contain("Level 4096 decimation has 9 rows")
# decimate the different shapes
self.ok(f"/test/*")
self.contain("Level 1 decimation has 0 rows")
def test_05_insert(self):
self.main = nilmtools.insert.main
client = nilmdb.client.Client(url=self.url)
self.fail(f"")
self.ok(f"--help")
# mutually exclusive arguments
self.fail(f"--delta --rate 123 /foo bar")
self.fail(f"--live --filename /foo bar")
# Insert from file
client.stream_create("/insert/prep", "float32_8")
t0 = "tests/data/prep-20120323T1000"
t2 = "tests/data/prep-20120323T1002"
t4 = "tests/data/prep-20120323T1004"
self.ok(f"--file --dry-run --rate 120 /insert/prep {t0} {t2} {t4}")
self.contain("Dry run")
# wrong rate
self.fail(f"--file --dry-run --rate 10 /insert/prep {t0} {t2} {t4}")
self.contain("Data is coming in too fast")
# skip forward in time
self.ok(f"--file --dry-run --rate 120 /insert/prep {t0} {t4}")
self.contain("data timestamp behind by 120")
self.contain("Skipping data timestamp forward")
# skip backwards in time
self.fail(f"--file --dry-run --rate 120 /insert/prep {t0} {t2} {t0}")
self.contain("data timestamp ahead by 240")
# skip backwards in time is OK if --skip provided
self.ok(f"--skip -f -D -r 120 insert/prep {t0} {t2} {t0} {t4}")
self.contain("Skipping the remainder of this file")
# Now insert for real
self.ok(f"--skip --file --rate 120 /insert/prep {t0} {t2} {t4}")
self.contain("Done")
# Overlap
self.fail(f"--skip --file --rate 120 /insert/prep {t0}")
self.contain("new data overlaps existing data")
# Not overlap if we change file offset
self.ok(f"--skip --file --rate 120 -o 0 /insert/prep {t0}")
# Data with no timestamp
self.fail(f"-f -r 120 /insert/prep tests/data/prep-notime")
self.contain("No idea what timestamp to use")
# Check intervals so far
eq_(list(client.stream_intervals("/insert/prep")),
[[1332507600000000, 1332507959991668],
[1332511200000000, 1332511319991668]])
# Delta supplied by file
self.ok(f"--file --delta -o 0 /insert/prep {t4}-delta")
eq_(list(client.stream_intervals("/insert/prep")),
[[1332507600000000, 1332507959991668],
[1332511200000000, 1332511319991668],
[1332511440000000, 1332511499000001]])
# Now fake live timestamps by using the delta file, and a
# fake clock that increments one second per call.
def fake_time_now():
nonlocal fake_time_base
ret = fake_time_base
fake_time_base += 1000000
return ret
real_time_now = nilmtools.insert.time_now
nilmtools.insert.time_now = fake_time_now
# Delta supplied by file. This data is too fast because delta
# contains a 50 sec jump
fake_time_base = 1332511560000000
self.fail(f"--live --delta -o 0 /insert/prep {t4}-delta")
self.contain("Data is coming in too fast")
self.contain("data time is Fri, 23 Mar 2012 10:06:55")
self.contain("clock time is only Fri, 23 Mar 2012 10:06:06")
# This data is OK, no jump
fake_time_base = 1332511560000000
self.ok(f"--live --delta -o 0 /insert/prep {t4}-delta2")
# This has unparseable delta
fake_time_base = 1332511560000000
self.fail(f"--live --delta -o 0 /insert/prep {t4}-delta3")
self.contain("can't parse delta")
# Insert some gzipped data, with no timestamp in name
bp1 = "tests/data/bpnilm-raw-1.gz"
bp2 = "tests/data/bpnilm-raw-2.gz"
client.stream_create("/insert/raw", "uint16_6")
self.ok(f"--file /insert/raw {bp1} {bp2}")
# Try truncated data
tr = "tests/data/trunc"
self.ok(f"--file /insert/raw {tr}1 {tr}2 {tr}3 {tr}4")
nilmtools.insert.time_now = real_time_now
def generate_sine_data(self, client, path, data_sec, fs, freq):
# generate raw data
client.stream_create(path, "uint16_2")
with client.stream_insert_context(path) as ctx:
for n in range(fs * data_sec):
t = n / fs
v = math.sin(t * 2 * math.pi * freq)
i = 0.3 * math.sin(3*t) + math.sin(t)
line = b"%d %d %d\n" % (
(t + 1234567890) * 1e6,
v * 32767 + 32768,
i * 32768 + 32768)
ctx.insert(line)
if 0:
for (s, e) in client.stream_intervals(path):
print(Interval(s,e).human_string())
def test_06_sinefit(self):
self.main = nilmtools.sinefit.main
client = nilmdb.client.Client(url=self.url)
self.fail(f"")
self.ok(f"--help")
self.generate_sine_data(client, "/sf/raw", 50, 8000, 60)
client.stream_create("/sf/out-bad", "float32_4")
self.fail(f"--column 1 /sf/raw /sf/out-bad")
self.contain("wrong number of fields")
self.fail(f"--column 1 /sf/raw /sf/out")
self.contain("/sf/out doesn't exist")
# basic run
client.stream_create("/sf/out", "float32_3")
self.ok(f"--column 1 /sf/raw /sf/out")
eq_(client.stream_count("/sf/out"), 3000)
# parameter errors
self.fail(f"--column 0 /sf/raw /sf/out")
self.contain("need a column number")
self.fail(f"/sf/raw /sf/out")
self.contain("need a column number")
self.fail(f"-c 1 --frequency 0 /sf/raw /sf/out")
self.contain("frequency must be")
self.fail(f"-c 1 --min-freq 100 /sf/raw /sf/out")
self.contain("invalid min or max frequency")
self.fail(f"-c 1 --max-freq 5 /sf/raw /sf/out")
self.contain("invalid min or max frequency")
self.fail(f"-c 1 --min-amp -1 /sf/raw /sf/out")
self.contain("min amplitude must be")
# trigger some warnings
client.stream_create("/sf/out2", "float32_3")
self.ok(f"-c 1 -f 500 -e @1234567897000000 /sf/raw /sf/out2")
self.contain("outside valid range")
self.contain("1000 warnings suppressed")
eq_(client.stream_count("/sf/out2"), 0)
self.ok(f"-c 1 -a 40000 -e @1234567898000000 /sf/raw /sf/out2")
self.contain("below minimum threshold")
# get coverage for "advance = N/2" line near end of sinefit,
# where we found a fit but it was after the end of the window,
# so we didn't actually mark anything in this window.
self.ok(f"-c 1 -f 240 -m 50 -e @1234567898010000 /sf/raw /sf/out2")
def test_07_median(self):
self.main = nilmtools.median.main
client = nilmdb.client.Client(url=self.url)
self.fail(f"")
self.ok(f"--help")
client.stream_create("/median/1", "float32_8")
client.stream_create("/median/2", "float32_8")
self.fail("/newton/prep /median/0")
self.contain("doesn't exist")
self.ok("/newton/prep /median/1")
self.ok("--difference /newton/prep /median/2")
def test_08_prep(self):
self.main = nilmtools.prep.main
client = nilmdb.client.Client(url=self.url)
self.fail(f"")
self.ok(f"--help")
self.fail(f"-c 2 /sf/raw /sf/out /prep/out")
self.contain("/prep/out doesn't exist")
# basic usage
client.stream_create("/prep/out", "float32_8")
self.ok(f"-c 2 /sf/raw /sf/out /prep/out")
self.contain("processed 100000")
# test arguments
self.fail(f"/sf/raw /sf/out /prep/out")
self.contain("need a column number")
self.fail(f"-c 0 /sf/raw /sf/out /prep/out")
self.contain("need a column number")
self.fail(f"-c 2 -n 3 /sf/raw /sf/out /prep/out")
self.contain("need 6 columns")
self.fail(f"-c 2 -n 0 /sf/raw /sf/out /prep/out")
self.contain("number of odd harmonics must be")
self.fail(f"-c 2 -N 0 /sf/raw /sf/out /prep/out")
self.contain("number of shifted FFTs must be")
self.ok(f"-c 2 -r 0 /sf/raw /sf/out /prep/out")
self.ok(f"-c 2 -R 0 /sf/raw /sf/out /prep/out")
self.fail(f"-c 2 -r 0 -R 0 /sf/raw /sf/out /prep/out")
self.fail(f"-c 2 /sf/raw /sf/no-sinefit-data /prep/out")
self.contain("sinefit data not found")
self.fail(f"-c 2 /sf/raw /prep/out /prep/out")
self.contain("sinefit data type is float32_8; expected float32_3")
# Limit time so only one row gets passed in
client.stream_create("/prep/tmp", "float32_8")
s = 1234567890000000
e = 1234567890000125
self.ok(f"-c 2 -s {s} -e {e} /sf/raw /sf/out /prep/tmp")
# Lower sampling rate on everything, so that the FFT doesn't
# return all the harmonics, and prep has to fill with zeros.
# Tests the "if N < (nharm * 2):" condition in prep
self.generate_sine_data(client, "/sf/raw-low", 5, 100, 60)
self.main = nilmtools.sinefit.main
client.stream_create("/sf/out-low", "float32_3")
self.ok(f"--column 1 /sf/raw-low /sf/out-low")
self.main = nilmtools.prep.main
client.stream_create("/prep/out-low", "float32_8")
self.ok(f"-c 2 /sf/raw-low /sf/out-low /prep/out-low")
# Test prep with empty sinefit data
client.stream_create("/sf/out-empty", "float32_3")
with client.stream_insert_context("/sf/out-empty",
1034567890123456,
2034567890123456):
pass
client.stream_create("/prep/out-empty", "float32_8")
self.ok(f"-c 2 /sf/raw /sf/out-empty /prep/out-empty")
self.contain("warning: no periods found; skipping")
def generate_trainola_data(self):
# Build some fake data for trainola, which is just pulses of varying
# length.
client = nilmdb.client.Client(url=self.url)
total_sec = 100
fs = 100
rg = numpy.random.Generator(numpy.random.MT19937(1234567))
path = "/train/data"
# Just build up some random pulses. This uses seeded random numbers,
# so any changes here will affect the success/failures of tests later.
client.stream_create(path, "float32_1")
with client.stream_insert_context(path) as ctx:
remaining = 0
for n in range(fs * total_sec):
t = n / fs
data = rg.normal(100) / 100 - 1
if remaining > 0:
remaining -= 1
data += 1
else:
if rg.integers(fs * 10 * total_sec) < fs:
if rg.integers(3) < 2:
remaining = fs*2
else:
remaining = fs/2
line = b"%d %f\n" % (t * 1e6, data)
ctx.insert(line)
# To view what was made, try:
if 0:
subprocess.call(f"nilmtool -u {self.url} extract -s min -e max " +
f"{path} > /tmp/data", shell=True)
# then in Octave: a=load("/tmp/data"); plot(a(:,2));
if 0:
for (s, e) in client.stream_intervals(path):
print(Interval(s,e).human_string())
# Also generate something with more than 100k data points
client.stream_create("/train/big", "uint8_1")
with client.stream_insert_context("/train/big") as ctx:
for n in range(110000):
ctx.insert(b"%d 0\n" % n)
def test_09_trainola(self):
self.main = nilmtools.trainola.main
client = nilmdb.client.numpyclient.NumpyClient(url=self.url)
self.fail(f"")
self.ok(f"--help")
self.ok(f"--version")
self.generate_trainola_data()
def get_json(path):
with open(path) as f:
js = f.read().replace('\n', ' ')
return f"'{js}'"
# pass a dict as argv[0]
with assert_raises(KeyError):
saved_stdout = sys.stdout
try:
with open(os.devnull, 'w') as sys.stdout:
nilmtools.trainola.main([{ "url": self.url }])
finally:
sys.stdout = saved_stdout
# pass no args and they come from sys.argv
saved_argv = sys.argv
try:
sys.argv = [ "prog", "bad-json," ]
with assert_raises(json.decoder.JSONDecodeError):
nilmtools.trainola.main()
finally:
sys.argv = saved_argv
# catch a bunch of errors based on different json input
client.stream_create("/train/matches", "uint8_1")
for (num, error) in [ (1, "no columns"),
(2, "duplicated columns"),
(3, "bad column number"),
(4, "source path '/c/d' does not exist"),
(5, "destination path '/a/b' does not exist"),
(6, "missing exemplars"),
(7, "missing exemplars"),
(8, "exemplar stream '/e/f' does not exist"),
(9, "No data in this exemplar"),
(10, "Too few data points"),
(11, "Too many data points"),
(12, "column FOO is not available in source") ]:
self.fail(get_json(f"tests/data/trainola-bad{num}.js"))
self.contain(error)
# not enough columns in dest
self.fail(get_json("tests/data/trainola1.js"))
self.contain("bad destination column number")
# run normally
client.stream_destroy("/train/matches")
client.stream_create("/train/matches", "uint8_2")
self.ok(get_json("tests/data/trainola1.js"))
self.contain("matched 10 exemplars")
# check actual matches, since we made up the data
matches = list(client.stream_extract_numpy("/train/matches"))
eq_(matches[0].tolist(), [[34000000, 1, 0],
[36000000, 0, 1],
[40800000, 1, 0],
[42800000, 0, 1],
[60310000, 1, 0],
[62310000, 0, 1],
[69290000, 1, 0],
[71290000, 0, 1],
[91210000, 1, 0],
[93210000, 0, 1]])
# another run using random noise as an exemplar, to get better coverage
client.stream_create("/train/matches2", "uint8_1")
self.ok(get_json("tests/data/trainola2.js"))
def test_10_pipewatch(self):
self.main = nilmtools.pipewatch.main
self.fail(f"")
self.ok(f"--help")
lock = "tests/pipewatch.lock"
lk = f"--lock {lock}"
try:
os.unlink(lock)
except OSError:
pass
# try locking so pipewatch will exit (with code 0)
lockfile = open(lock, "w")
nilmdb.utils.lock.exclusive_lock(lockfile)
self.ok(f"{lk} true true")
self.contain("pipewatch process already running")
os.unlink(lock)
# have pipewatch remove its own lock to trigger error later
self.ok(f"{lk} 'rm {lock}' true")
# various cases to get coverage
self.ok(f"{lk} true 'cat >/dev/null'")
self.contain("generator returned 0, consumer returned 0")
self.fail(f"{lk} false true")
self.contain("generator returned 1, consumer returned 0")
self.fail(f"{lk} false false")
self.contain("generator returned 1, consumer returned 1")
self.fail(f"{lk} true false")
self.contain("generator returned 0, consumer returned 1")
self.fail(f"{lk} 'kill -15 $$' true")
self.ok(f"{lk} 'sleep 1 ; echo hi' 'cat >/dev/null'")
self.ok(f"{lk} 'echo hi' 'cat >/dev/null'")
self.fail(f"{lk} --timeout 0.5 'sleep 10 ; echo hi' 'cat >/dev/null'")
self.fail(f"{lk} 'yes' 'head -1 >/dev/null'")
self.fail(f"{lk} false 'exec 2>&-; trap \"sleep 10\" 0 15 ; sleep 10'")
def test_11_cleanup(self):
self.main = nilmtools.cleanup.main
client = nilmdb.client.Client(url=self.url)
# This mostly just gets coverage, doesn't carefully verify behavior
self.fail(f"")
self.ok(f"--help")
self.fail(f"tests/data/cleanup-bad.cfg")
self.contain("unknown units")
client.stream_create("/empty/foo", "uint16_1")
self.ok(f"tests/data/cleanup.cfg")
self.contain("'/nonexistent/bar' did not match any existing streams")
self.contain("no config for existing stream '/empty/foo'")
self.contain("nothing to do (only 0.00 weeks of data present)")
self.contain("specify --yes to actually perform")
self.ok(f"--yes tests/data/cleanup.cfg")
self.contain("removing data before")
self.contain("removing from /sf/raw")
self.ok(f"--estimate tests/data/cleanup.cfg")
self.contain("Total estimated disk usage")
self.contain("MiB")
self.contain("GiB")
self.ok(f"--yes tests/data/cleanup-nodecim.cfg")
self.ok(f"--estimate tests/data/cleanup-nodecim.cfg")
def test_12_misc(self):
# Fill in test cases that were missed by earlier code:
# math.py
with assert_raises(ValueError):
nilmtools.math.sfit4([1], 5)
nilmtools.math.sfit4([1,2], 5)
# filter.py
client = nilmdb.client.numpyclient.NumpyClient(self.url)
client.stream_create("/misc/a", "uint8_1")
client.stream_create("/misc/b", "uint8_1")
with client.stream_insert_context("/misc/a") as ctx:
for n in range(10000):
ctx.insert(b"%d 0\n" % n)
pni = nilmtools.filter.process_numpy_interval
src = nilmtools.filter.get_stream_info(client, "/misc/a")
extractor = functools.partial(
client.stream_extract_numpy, "/misc/a",
layout=src.layout, maxrows=1000)
inserter = functools.partial(
client.stream_insert_numpy_context, "/misc/b")
def func1(*args):
return 0
def func2(*args):
return -1
def func3(array, interval, args, insert_func, last):
if last:
return array.shape[0]
return 0
saved = (sys.stdout, sys.stderr)
try:
with open(os.devnull, 'w') as sys.stdout:
with open(os.devnull, 'w') as sys.stderr:
pni(Interval(0, 10000), extractor, inserter, 100, func1)
with assert_raises(SystemExit):
f = nilmtools.filter.Filter("hello world")
finally:
(sys.stdout, sys.stderr) = saved
with assert_raises(Exception):
pni(Interval(0, 10000), extractor, inserter, 100, func2)
pni(Interval(0, 10000), extractor, inserter, 100000, func3)
with assert_raises(NotImplementedError):
pni(Interval(0, 10000), extractor, inserter, 100000,
nilmtools.filter.example_callback_function)
self.main = nilmtools.filter.main
self.fail(f"")
self.ok(f"--help")
self.fail(f"/misc/a /misc/a")
self.contain("must be different")
self.fail(f"--start HELLOWORLD /misc/a /misc/a")
self.contain("not enough digits for a timestamp")
client.stream_create("/misc/c", "uint8_1")
self.ok(f"--quiet /misc/a /misc/c")
self.contain("Source: /misc/a", False)
self.contain("Generic filter: need to handle")
f = nilmtools.filter.Filter()
parser = f.setup_parser()
args = f.parse_args(["--quiet", "/misc/a", "/misc/c"])
x = f.client_src
x = f.client_dest
for i in f.intervals():
with assert_raises(Exception) as e:
x = f.client_src
in_("client is in use", str(e.exception))
with assert_raises(Exception) as e:
x = f.client_dest
in_("client is in use", str(e.exception))

View File

@ -1 +0,0 @@
# empty

View File

@ -1,64 +0,0 @@
# Just some helpers for test functions
import io
import os
import re
import sys
import time
import shlex
import shutil
import nilmdb.server
import nilmdb.utils
import nilmdb.utils.timestamper
from nilmdb.utils.printf import printf
def myrepr(x):
if isinstance(x, str):
return '"' + x + '"'
else:
return repr(x)
def eq_(a, b):
if not a == b:
raise AssertionError("%s != %s" % (myrepr(a), myrepr(b)))
def lt_(a, b):
if not a < b:
raise AssertionError("%s is not less than %s" % (myrepr(a), myrepr(b)))
def in_(a, b):
if a not in b:
raise AssertionError("%s not in %s" % (myrepr(a), myrepr(b)))
def nin_(a, b):
if a in b:
raise AssertionError("unexpected %s in %s" % (myrepr(a), myrepr(b)))
def in2_(a1, a2, b):
if a1 not in b and a2 not in b:
raise AssertionError("(%s or %s) not in %s" % (myrepr(a1), myrepr(a2),
myrepr(b)))
def ne_(a, b):
if not a != b:
raise AssertionError("unexpected %s == %s" % (myrepr(a), myrepr(b)))
def lines_(a, n):
l = a.count('\n')
if not l == n:
if len(a) > 5000:
a = a[0:5000] + " ... truncated"
raise AssertionError("wanted %d lines, got %d in output: '%s'"
% (n, l, a))
def recursive_unlink(path):
try:
shutil.rmtree(path)
except OSError:
pass
try:
os.unlink(path)
except OSError:
pass

File diff suppressed because it is too large Load Diff