# -*- coding: utf-8 -*- import nilmrun.server from nilmdb.client.httpclient import HTTPClient, ClientError, ServerError from nilmdb.utils.printf import * from nose.plugins.skip import SkipTest from nose.tools import * from nose.tools import assert_raises import itertools import distutils.version import os import sys import threading import cStringIO import simplejson as json import unittest import warnings import time import re import urllib2 from urllib2 import urlopen, HTTPError import requests import pprint from testutil.helpers import * testurl = "http://localhost:32181/" def setup_module(): global test_server # Start web app on a custom port test_server = nilmrun.server.Server(host = "127.0.0.1", port = 32181, force_traceback = True) test_server.start(blocking = False) def teardown_module(): global test_server # Close web app test_server.stop() class TestClient(object): def wait_end(self, client, pid, timeout = 5): start = time.time() status = None while (time.time() - start) < timeout: status = client.get("/process/status", { "pid": pid }) if status["alive"] == False: return status raise AssertionError("process " + str(pid) + " didn't die in " + str(timeout) + " seconds: " + repr(status)) def test_client_01_basic(self): client = HTTPClient(baseurl = testurl) version = client.get("/version") eq_(distutils.version.LooseVersion(version), distutils.version.LooseVersion(nilmrun.__version__)) in_("This is NilmRun", client.get("/")) with assert_raises(ClientError): client.get("/favicon.ico") def test_client_02_manager(self): client = HTTPClient(baseurl = testurl) eq_(client.get("/process/list"), []) with assert_raises(ClientError) as e: client.get("/process/status", { "pid": 12345 }) in_("No such PID", str(e.exception)) with assert_raises(ClientError): client.get("/process/remove", { "pid": 12345 }) in_("No such PID", str(e.exception)) def test_client_03_process_basic(self): client = HTTPClient(baseurl = testurl, post_json = True) # start dummy filter pid = client.post("/run/dummy", { "data": 30 }) eq_(client.get("/process/list"), [pid]) time.sleep(1) # Verify that status looks OK status = client.get("/process/status", { "pid": pid, "clear": True }) for x in [ "pid", "alive", "exitcode", "name", "start_time", "parameters", "log" ]: in_(x, status) in_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"]) eq_(status["alive"], True) eq_(status["exitcode"], None) # Check that the log got cleared status = client.get("/process/status", { "pid": pid }) nin_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"]) # See that it ended properly status = self.wait_end(client, pid) in_("dummy 27\ndummy 28\ndummy 29\n", status["log"]) eq_(status["exitcode"], 0) # Remove it killstatus = client.post("/process/remove", { "pid": pid }) eq_(status, killstatus) eq_(client.get("/process/list"), []) with assert_raises(ClientError) as e: client.post("/process/remove", { "pid": pid }) in_("No such PID", str(e.exception)) def test_client_04_process_terminate(self): client = HTTPClient(baseurl = testurl, post_json = True) # Trigger exception in filter pid = client.post("/run/dummy", { "data": -1 }) time.sleep(0.5) status = client.get("/process/status", { "pid": pid }) eq_(status["alive"], False) eq_(status["exitcode"], 1) in_("Exception: test exception", status["log"]) client.post("/process/remove", { "pid": pid }) # Kill a running filter by removing it early newpid = client.post("/run/dummy", { "data": 50 }) ne_(newpid, pid) time.sleep(0.5) start = time.time() status = client.post("/process/remove", { "pid": newpid }) elapsed = time.time() - start # Should have died in slightly over 1 second assert(0.5 < elapsed < 2) eq_(status["alive"], False) ne_(status["exitcode"], 0) # No more eq_(client.get("/process/list"), []) # Try to remove a running filter that ignored SIGTERM pid = client.post("/run/dummy", { "data": 0 }) start = time.time() status = client.post("/process/remove", { "pid": pid }) elapsed = time.time() - start # Should have died in slightly over 2 seconds assert(1.5 < elapsed < 3) eq_(status["alive"], False) ne_(status["exitcode"], 0) def test_client_05_trainola_simple(self): client = HTTPClient(baseurl = testurl, post_json = True) pid = client.post("/run/trainola", { "data": {} }) status = self.wait_end(client, pid) ne_(status["exitcode"], 0) status = client.post("/process/remove", { "pid": pid }) @unittest.skip("needs a running nilmdb") def test_client_06_trainola(self): client = HTTPClient(baseurl = testurl, post_json = True) data = { "url": "http://bucket.mit.edu/nilmdb", "stream": "/sharon/prep-a", "start": 1366111383280463, "end": 1366126163457797, "columns": [ { "name": "P1", "index": 0 }, { "name": "Q1", "index": 1 }, { "name": "P3", "index": 2 } ], "exemplars": [ { "name": "Boiler Pump ON", "url": "http://bucket.mit.edu/nilmdb", "stream": "/sharon/prep-a", "start": 1366260494269078, "end": 1366260608185031, "columns": [ { "name": "P1", "index": 0 }, { "name": "Q1", "index": 1 } ] }, { "name": "Boiler Pump OFF", "url": "http://bucket.mit.edu/nilmdb", "stream": "/sharon/prep-a", "start": 1366260864215764, "end": 1366260870882998, "columns": [ { "name": "P1", "index": 0 }, { "name": "Q1", "index": 1 } ] } ] } # start trainola pid = client.post("/run/trainola", { "data": data }) # wait for it to finish for i in range(60): time.sleep(1) if i == 2: status = client.get("/process/status", { "pid": pid, "clear": True }) in_("Loading stream data", status['log']) elif i == 3: status = client.get("/process/status", { "pid": pid }) nin_("Loading stream data", status['log']) else: status = client.get("/process/status", { "pid": pid }) if status["alive"] == False: break else: client.post("/process/remove", {"pid": pid }) raise AssertionError("took too long") if i < 3: raise AssertionError("too fast?") def test_client_07_process_command(self): client = HTTPClient(baseurl = testurl, post_json = True) eq_(client.get("/process/list"), []) def do(args, kill): pid = client.post("/run/command", { "args": args } ) eq_(client.get("/process/list"), [pid]) if kill: time.sleep(1) status = client.get("/process/status", { "pid": pid }) if not status["alive"]: raise AssertionError("died before we could kill it") status = client.post("/process/remove", { "pid": pid }) if status["alive"]: raise AssertionError("didn't get killed") else: self.wait_end(client, pid) status = client.post("/process/remove", { "pid": pid }) return status # Simple command status = do(["pwd"], False) eq_(status["exitcode"], 0) eq_("/tmp\n", status["log"]) # Command with args status = do(["expr", "1", "+", "2"], False) eq_(status["exitcode"], 0) eq_("3\n", status["log"]) # Missing command status = do(["/no-such-command-blah-blah"], False) ne_(status["exitcode"], 0) # Kill a slow command status = do(["sleep", "60"], True) ne_(status["exitcode"], 0)