# -*- coding: utf-8 -*-

# Tests for nilmdb.client.Client, run against a NilmDB server that
# setup_module() starts on 127.0.0.1:32180 with its database stored
# under "tests/client-testdb".
#
# The test methods are numbered and share server state: for example,
# tests 03-06 and 09 use the "/newton/prep" stream created in test 02,
# and test 05 counts the rows inserted by test 04.  They therefore rely
# on being executed in name order.

import nilmdb.server
import nilmdb.client

from nilmdb.utils.printf import *
from nilmdb.utils import timestamper
from nilmdb.client import ClientError, ServerError
from nilmdb.utils import datetime_tz

from nose.plugins.skip import SkipTest
from nose.tools import *
from nose.tools import assert_raises
import itertools
import distutils.version
import os
import sys
import threading
import cStringIO
import simplejson as json
import unittest
import warnings
import resource
import time
import re

from testutil.helpers import *

testdb = "tests/client-testdb"
testurl = "http://localhost:32180/"

def setup_module():
    # Module-level fixture: wipe the test database directory, then start
    # a server on port 32180 without blocking the caller.
    global test_server, test_db
    # Clear out DB
    recursive_unlink(testdb)

    # Start web app on a custom port
    test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(testdb)
    test_server = nilmdb.server.Server(test_db, host = "127.0.0.1",
                                       port = 32180, stoppable = False,
                                       fast_shutdown = True,
                                       force_traceback = True)
    test_server.start(blocking = False)

def teardown_module():
    # Module-level fixture: stop the server started by setup_module()
    # and close its database.
    global test_server, test_db
    # Close web app
    test_server.stop()
    test_db.close()

class TestClient(object):

    def test_client_01_basic(self):
        # Connection basics: unreachable host, bad URL on a real host,
        # URL without a scheme, and the real URL (whose reported version
        # must match the server object's version).

        # Test a fake host
        client = nilmdb.client.Client(url = "http://localhost:1/")
        with assert_raises(nilmdb.client.ServerError):
            client.version()
        client.close()

        # Then a fake URL on a real host
        client = nilmdb.client.Client(url = "http://localhost:32180/fake/")
        with assert_raises(nilmdb.client.ClientError):
            client.version()
        client.close()

        # Now a real URL with no http:// prefix
        client = nilmdb.client.Client(url = "localhost:32180")
        version = client.version()
        client.close()

        # Now use the real URL
        client = nilmdb.client.Client(url = testurl)
        version = client.version()
        eq_(distutils.version.LooseVersion(version),
            distutils.version.LooseVersion(test_server.version))

        # Bad URLs should give 404, not 500
        with assert_raises(ClientError):
            client.http.get("/stream/create")
        client.close()

    def test_client_02_createlist(self):
        # Basic stream tests, like those in test_nilmdb:test_stream
        client = nilmdb.client.Client(url = testurl)

        # Database starts empty
        eq_(client.stream_list(), [])

        # Bad path
        with assert_raises(ClientError):
            client.stream_create("foo/bar/baz", "float32_8")
        with assert_raises(ClientError):
            client.stream_create("/foo", "float32_8")
        # Bad layout type
        with assert_raises(ClientError):
            client.stream_create("/newton/prep", "NoSuchLayout")

        # Bad method types
        with assert_raises(ClientError):
            client.http.put("/stream/list","")
        # Try a bunch of times to make sure the request body is getting consumed
        for x in range(10):
            with assert_raises(ClientError):
                client.http.post("/stream/list")

        # Continue with a fresh client; close the old one first so its
        # connection is not left open.
        client.close()
        client = nilmdb.client.Client(url = testurl)

        # Create three streams
        client.stream_create("/newton/prep", "float32_8")
        client.stream_create("/newton/raw", "uint16_6")
        client.stream_create("/newton/zzz/rawnotch", "uint16_9")

        # Verify we got 3 streams
        eq_(client.stream_list(), [ ["/newton/prep", "float32_8"],
                                    ["/newton/raw", "uint16_6"],
                                    ["/newton/zzz/rawnotch", "uint16_9"]
                                    ])
        # Match just one type or one path
        eq_(client.stream_list(layout="uint16_6"),
            [ ["/newton/raw", "uint16_6"] ])
        eq_(client.stream_list(path="/newton/raw"),
            [ ["/newton/raw", "uint16_6"] ])

        # Try messing with resource limits to trigger errors and get
        # more coverage.  Here, make it so we can only create files 1
        # byte in size, which will trigger an IOError in the server when
        # we create a table.
        limit = resource.getrlimit(resource.RLIMIT_FSIZE)
        resource.setrlimit(resource.RLIMIT_FSIZE, (1, limit[1]))
        try:
            with assert_raises(ServerError) as e:
                client.stream_create("/newton/hello", "uint16_6")
        finally:
            # Restore the limit even if the assertion above fails;
            # otherwise the process would keep the 1-byte file size
            # limit for every later test.
            resource.setrlimit(resource.RLIMIT_FSIZE, limit)

        client.close()

    def test_client_03_metadata(self):
        # Stream metadata: set, update, get (all keys / selected keys /
        # missing keys), and rejection of badly-typed metadata.
        client = nilmdb.client.Client(url = testurl)

        # Set / get metadata
        eq_(client.stream_get_metadata("/newton/prep"), {})
        eq_(client.stream_get_metadata("/newton/raw"), {})
        meta1 = { "description": "The Data",
                  "v_scale": "1.234" }
        meta2 = { "description": "The Data" }
        meta3 = { "v_scale": "1.234" }
        client.stream_set_metadata("/newton/prep", meta1)
        client.stream_update_metadata("/newton/prep", {})
        client.stream_update_metadata("/newton/raw", meta2)
        client.stream_update_metadata("/newton/raw", meta3)
        eq_(client.stream_get_metadata("/newton/prep"), meta1)
        eq_(client.stream_get_metadata("/newton/raw"), meta1)
        eq_(client.stream_get_metadata("/newton/raw",
                                       [ "description" ] ), meta2)
        eq_(client.stream_get_metadata("/newton/raw",
                                       [ "description", "v_scale" ] ), meta1)

        # missing key
        eq_(client.stream_get_metadata("/newton/raw", "descr"),
            { "descr": None })
        eq_(client.stream_get_metadata("/newton/raw", [ "descr" ]),
            { "descr": None })

        # test wrong types (list instead of dict)
        with assert_raises(ClientError):
            client.stream_set_metadata("/newton/prep", [1,2,3])
        with assert_raises(ClientError):
            client.stream_update_metadata("/newton/prep", [1,2,3])

        # test wrong types (dict of non-strings)
        # numbers are OK; they'll get converted to strings
        client.stream_set_metadata("/newton/prep", { "hello": 1234 })

        # anything else is not
        with assert_raises(ClientError):
            client.stream_set_metadata("/newton/prep", { "world": { 1: 2 } })
        with assert_raises(ClientError):
            client.stream_set_metadata("/newton/prep", { "world": [ 1, 2 ] })

        client.close()

    def test_client_04_insert(self):
        # Data insertion: error cases (missing stream, reversed
        # timestamps, data outside the given start/end, overlap), empty
        # inserts and empty intervals, and finally the real load of the
        # test file into /newton/prep.
        client = nilmdb.client.Client(url = testurl)

        # Limit _max_data to 1 MB, since our test file is 1.5 MB
        old_max_data = nilmdb.client.client.StreamInserter._max_data
        nilmdb.client.client.StreamInserter._max_data = 1 * 1024 * 1024

        try:
            datetime_tz.localtz_set("America/New_York")

            testfile = "tests/data/prep-20120323T1000"
            start = nilmdb.utils.time.parse_time("20120323T1000")
            rate = 120

            # First try a nonexistent path
            data = timestamper.TimestamperRate(testfile, start, rate)
            with assert_raises(ClientError) as e:
                result = client.stream_insert("/newton/no-such-path", data)
            in_("404 Not Found", str(e.exception))

            # Now try reversed timestamps
            data = timestamper.TimestamperRate(testfile, start, rate)
            data = reversed(list(data))
            with assert_raises(ClientError) as e:
                result = client.stream_insert("/newton/prep", data)
            in_("400 Bad Request", str(e.exception))
            # in2_ is a helper defined outside this file; presumably it
            # accepts either of the two messages -- verify against its
            # definition.
            in2_("timestamp is not monotonically increasing",
                 "start must precede end", str(e.exception))

            # Now try empty data (no server request made)
            empty = cStringIO.StringIO("")
            data = timestamper.TimestamperRate(empty, start, rate)
            result = client.stream_insert("/newton/prep", data)
            eq_(result, None)

            # It's OK to insert an empty interval
            client.http.put("stream/insert", "", { "path": "/newton/prep",
                                                   "start": 1, "end": 2 })
            eq_(list(client.stream_intervals("/newton/prep")), [[1, 2]])
            client.stream_remove("/newton/prep")
            eq_(list(client.stream_intervals("/newton/prep")), [])

            # Timestamps can be negative too
            client.http.put("stream/insert", "", { "path": "/newton/prep",
                                                   "start": -2, "end": -1 })
            eq_(list(client.stream_intervals("/newton/prep")), [[-2, -1]])
            client.stream_remove("/newton/prep")
            eq_(list(client.stream_intervals("/newton/prep")), [])

            # Intervals that end at zero shouldn't be any different
            client.http.put("stream/insert", "", { "path": "/newton/prep",
                                                   "start": -1, "end": 0 })
            eq_(list(client.stream_intervals("/newton/prep")), [[-1, 0]])
            client.stream_remove("/newton/prep")
            eq_(list(client.stream_intervals("/newton/prep")), [])

            # Try forcing a server request with equal start and end
            with assert_raises(ClientError) as e:
                client.http.put("stream/insert", "", { "path": "/newton/prep",
                                                       "start": 0, "end": 0 })
            in_("400 Bad Request", str(e.exception))
            in_("start must precede end", str(e.exception))

            # Specify start/end (starts too late)
            data = timestamper.TimestamperRate(testfile, start, rate)
            with assert_raises(ClientError) as e:
                result = client.stream_insert("/newton/prep", data,
                                              start + 5000000,
                                              start + 120000000)
            in_("400 Bad Request", str(e.exception))
            in_("Data timestamp 1332511200000000 < "
                "start time 1332511205000000", str(e.exception))

            # Specify start/end (ends too early)
            data = timestamper.TimestamperRate(testfile, start, rate)
            with assert_raises(ClientError) as e:
                result = client.stream_insert("/newton/prep", data,
                                              start, start + 1000000)
            in_("400 Bad Request", str(e.exception))
            # Client chunks the input, so the exact timestamp here might change
            # if the chunk positions change.
            assert(re.search("Data timestamp 13325[0-9]+ "
                             ">= end time 1332511201000000",
                             str(e.exception)) is not None)

            # Now do the real load
            data = timestamper.TimestamperRate(testfile, start, rate)
            result = client.stream_insert("/newton/prep", data,
                                          start, start + 119999777)

            # Verify the intervals.  Should be just one, even if the data
            # was inserted in chunks, due to nilmdb interval concatenation.
            intervals = list(client.stream_intervals("/newton/prep"))
            eq_(intervals, [[start, start + 119999777]])

            # Try some overlapping data -- just insert it again
            data = timestamper.TimestamperRate(testfile, start, rate)
            with assert_raises(ClientError) as e:
                result = client.stream_insert("/newton/prep", data)
            in_("400 Bad Request", str(e.exception))
            in_("verlap", str(e.exception))
        finally:
            # _max_data was overridden on the class itself, so undo it
            # even when an assertion above fails; otherwise the 1 MB
            # limit would leak into every later test.
            nilmdb.client.client.StreamInserter._max_data = old_max_data
            client.close()

    def test_client_05_extractremove(self):
        # Misc tests for extract and remove.  Most of them are in test_cmdline.
        client = nilmdb.client.Client(url = testurl)

        # Extracting a time range that holds no data must yield nothing
        for x in client.stream_extract("/newton/prep",
                                       999123000000, 999124000000):
            raise AssertionError("shouldn't be any data for this request")

        # Removing with start > end is a client error
        with assert_raises(ClientError) as e:
            client.stream_remove("/newton/prep", 123000000, 120000000)

        # Test count
        eq_(client.stream_count("/newton/prep"), 14400)

        client.close()

    def test_client_06_generators(self):
        # A lot of the client functionality is already tested by test_cmdline,
        # but this gets a bit more coverage that cmdline misses.
        client = nilmdb.client.Client(url = testurl)

        # Trigger a client error in generator
        start = nilmdb.utils.time.parse_time("20120323T2000")
        end = nilmdb.utils.time.parse_time("20120323T1000")
        for function in [ client.stream_intervals, client.stream_extract ]:
            with assert_raises(ClientError) as e:
                function("/newton/prep", start, end).next()
            in_("400 Bad Request", str(e.exception))
            in_("start must precede end", str(e.exception))

        # Trigger a curl error in generator
        with assert_raises(ServerError) as e:
            client.http.get_gen("http://nosuchurl/").next()

        # Trigger a curl error in generator (same request, repeated)
        with assert_raises(ServerError) as e:
            client.http.get_gen("http://nosuchurl/").next()

        # Check 404 for missing streams
        for function in [ client.stream_intervals, client.stream_extract ]:
            with assert_raises(ClientError) as e:
                function("/no/such/stream").next()
            in_("404 Not Found", str(e.exception))
            in_("No such stream", str(e.exception))

        client.close()

    def test_client_07_headers(self):
        # Make sure that /stream/intervals and /stream/extract
        # properly return streaming, chunked, text/plain response.
        # Pokes around in client.http internals a bit to look at the
        # response headers.

        client = nilmdb.client.Client(url = testurl)
        http = client.http

        # Use a warning rather than returning a test failure for the
        # transfer-encoding, so that we can still disable chunked
        # responses for debugging.

        def headers():
            # Render the headers of the most recent response as
            # lower-cased "key: value\n" lines for substring matching.
            return "".join(k + ": " + v + "\n"
                           for (k, v) in
                           http._last_response.headers.items()).lower()

        # Intervals
        x = http.get("stream/intervals", { "path": "/newton/prep" })
        if "transfer-encoding: chunked" not in headers():
            warnings.warn("Non-chunked HTTP response for /stream/intervals")
        if "content-type: application/x-json-stream" not in headers():
            raise AssertionError("/stream/intervals content type "
                                 "is not application/x-json-stream:\n" +
                                 headers())

        # Extract
        x = http.get("stream/extract",
                     { "path": "/newton/prep",
                       "start": "123",
                       "end": "124" })
        if "transfer-encoding: chunked" not in headers():
            warnings.warn("Non-chunked HTTP response for /stream/extract")
        if "content-type: text/plain;charset=utf-8" not in headers():
            raise AssertionError("/stream/extract is not text/plain:\n" +
                                 headers())

        client.close()

    def test_client_08_unicode(self):
        # Unicode stream paths and metadata keys/values.  Note that this
        # test removes and destroys every stream that exists so far.

        # Try both with and without posting JSON
        for post_json in (False, True):
            # Basic Unicode tests
            client = nilmdb.client.Client(url = testurl, post_json = post_json)

            # Delete streams that exist
            for stream in client.stream_list():
                client.stream_remove(stream[0])
                client.stream_destroy(stream[0])

            # Database is empty
            eq_(client.stream_list(), [])

            # Create Unicode stream, match it
            raw = [ u"/düsseldorf/raw", u"uint16_6" ]
            prep = [ u"/düsseldorf/prep", u"uint16_6" ]
            client.stream_create(*raw)
            eq_(client.stream_list(), [raw])
            eq_(client.stream_list(layout=raw[1]), [raw])
            eq_(client.stream_list(path=raw[0]), [raw])
            client.stream_create(*prep)
            eq_(client.stream_list(), [prep, raw])

            # Set / get metadata with Unicode keys and values
            eq_(client.stream_get_metadata(raw[0]), {})
            eq_(client.stream_get_metadata(prep[0]), {})
            meta1 = { u"alpha": u"α",
                      u"β": u"beta" }
            meta2 = { u"alpha": u"α" }
            meta3 = { u"β": u"beta" }
            client.stream_set_metadata(prep[0], meta1)
            client.stream_update_metadata(prep[0], {})
            client.stream_update_metadata(raw[0], meta2)
            client.stream_update_metadata(raw[0], meta3)
            eq_(client.stream_get_metadata(prep[0]), meta1)
            eq_(client.stream_get_metadata(raw[0]), meta1)
            eq_(client.stream_get_metadata(raw[0], [ "alpha" ]), meta2)
            eq_(client.stream_get_metadata(raw[0], [ "alpha", "β" ]), meta1)

            client.close()

    def test_client_09_closing(self):
        # Make sure we actually close sockets correctly.  New
        # connections will block for a while if they're not, since the
        # server will stop accepting new connections.
        for test in [1, 2]:
            start = time.time()
            for i in range(50):
                # 50 connections should take far less than 15 seconds
                # unless they are blocking on unclosed sockets.
                if time.time() - start > 15:
                    raise AssertionError("Connections seem to be blocking... "
                                         "probably not closing properly.")
                if test == 1:
                    # explicit close
                    client = nilmdb.client.Client(url = testurl)
                    with assert_raises(ClientError) as e:
                        client.stream_remove("/newton/prep", 123, 120)
                    client.close() # remove this to see the failure
                elif test == 2:
                    # use the context manager
                    with nilmdb.client.Client(url = testurl) as c:
                        with assert_raises(ClientError) as e:
                            c.stream_remove("/newton/prep", 123, 120)

    def test_client_10_context(self):
        # Test using the client's stream insertion context manager to
        # insert data.
        client = nilmdb.client.Client(testurl)

        client.stream_create("/context/test", "uint16_1")
        with client.stream_insert_context("/context/test") as ctx:
            # override _max_data to trigger frequent server updates
            ctx._max_data = 15

            ctx.insert("100 1\n")

            # Rows split across several insert() calls
            ctx.insert("101 ")
            ctx.insert("1\n102 1")
            ctx.insert("")
            ctx.insert("\n103 1\n")

            # Comment lines mixed in with data
            ctx.insert("104 1\n")
            ctx.insert("# hello\n")
            ctx.insert(" # hello\n")
            ctx.insert(" 105 1\n")
            ctx.finalize()

            ctx.insert("107 1\n")
            ctx.update_end(108)
            ctx.finalize()
            ctx.update_start(109)

            ctx.insert("110 1\n")
            ctx.insert("111 1\n")
            ctx.insert("112 1\n")
            ctx.insert("113 1\n")
            ctx.insert("114 1\n")
            ctx.update_end(116)
            ctx.insert("115 1\n")
            ctx.update_end(117)
            ctx.insert("116 1\n")
            ctx.update_end(118)
            # Single lines that are much longer than _max_data
            ctx.insert("117 1" +
                       " # this is super long" * 100 +
                       "\n")
            ctx.finalize()
            ctx.insert("# this is super long" * 100)

        # Overlaps existing data
        with assert_raises(ClientError):
            with client.stream_insert_context("/context/test",
                                              100, 200) as ctx:
                ctx.insert("118 1\n")

        # Data timestamp lies outside the given interval
        with assert_raises(ClientError):
            with client.stream_insert_context("/context/test",
                                              200, 300) as ctx:
                ctx.insert("118 1\n")

        # Unparseable data
        with assert_raises(ClientError):
            with client.stream_insert_context("/context/test") as ctx:
                ctx.insert("bogus data\n")

        with client.stream_insert_context("/context/test", 200, 300) as ctx:
            # make sure our override wasn't permanent
            ne_(ctx._max_data, 15)
            ctx.insert("225 1\n")
            ctx.finalize()

        with assert_raises(ClientError):
            with client.stream_insert_context("/context/test",
                                              300, 400) as ctx:
                ctx.insert("301 1\n")
                ctx.insert("302 2\n")
                ctx.insert("303 3\n")
                ctx.insert("304 4\n")
                ctx.insert("304 4\n") # non-monotonic after a few lines
                ctx.finalize()

        eq_(list(client.stream_intervals("/context/test")),
            [ [ 100, 106 ],
              [ 107, 108 ],
              [ 109, 118 ],
              [ 200, 300 ] ])

        # destroy stream (try without removing data first)
        with assert_raises(ClientError):
            client.stream_destroy("/context/test")
        client.stream_remove("/context/test")
        client.stream_destroy("/context/test")
        client.close()

    def test_client_11_emptyintervals(self):
        # Empty intervals are ok!  If recording detection events
        # by inserting rows into the database, we want to be able to
        # have an interval where no events occurred.  Test them here.
        client = nilmdb.client.Client(testurl)
        client.stream_create("/empty/test", "uint16_1")

        def info():
            # Return a list of (row count, [start, end]) for every
            # interval currently in /empty/test.  The interval generator
            # is drained into a list before stream_count() is called.
            result = []
            for interval in list(client.stream_intervals("/empty/test")):
                result.append((client.stream_count("/empty/test", *interval),
                               interval))
            return result

        eq_(info(), [])

        # Insert a region with just a few points
        with client.stream_insert_context("/empty/test") as ctx:
            ctx.update_start(100)
            ctx.insert("140 1\n")
            ctx.insert("150 1\n")
            ctx.insert("160 1\n")
            ctx.update_end(200)
            ctx.finalize()
        eq_(info(), [(3, [100, 200])])

        # Delete chunk, which will leave one data point and two intervals
        client.stream_remove("/empty/test", 145, 175)
        eq_(info(), [(1, [100, 145]),
                     (0, [175, 200])])

        # Try also creating a completely empty interval from scratch,
        # in a few different ways.
        client.stream_insert("/empty/test", "", 300, 350)
        client.stream_insert("/empty/test", [], 400, 450)
        with client.stream_insert_context("/empty/test", 500, 550):
            pass

        # If enough timestamps aren't provided, empty streams won't be created.
        client.stream_insert("/empty/test", [])
        with client.stream_insert_context("/empty/test"):
            pass
        client.stream_insert("/empty/test", [], start = 600)
        with client.stream_insert_context("/empty/test", start = 700):
            pass
        client.stream_insert("/empty/test", [], end = 850)
        with client.stream_insert_context("/empty/test", end = 950):
            pass

        # Try various things that might cause problems
        # (Bind the new context as ctx explicitly; without "as ctx" the
        # calls below would go to the already-exited context left over
        # from the first "with" block in this function.)
        with client.stream_insert_context("/empty/test", 1000, 1050) as ctx:
            ctx.finalize() # inserts [1000, 1050]
            ctx.finalize() # nothing
            ctx.finalize() # nothing
            ctx.insert("1100 1\n")
            ctx.finalize() # inserts [1100, 1101]
            ctx.update_start(1199)
            ctx.insert("1200 1\n")
            ctx.update_end(1250)
            ctx.finalize() # inserts [1199, 1250]
            ctx.update_start(1299)
            ctx.finalize() # nothing
            ctx.update_end(1350)
            ctx.finalize() # nothing
            ctx.update_start(1400)
            ctx.insert("# nothing!\n")
            ctx.update_end(1450)
            ctx.finalize()
            ctx.update_start(1500)
            ctx.insert("# nothing!")
            ctx.update_end(1550)
            ctx.finalize()
            ctx.insert("# nothing!\n" * 10)
            ctx.finalize()
            # implicit last finalize inserts [1400, 1450]

        # Check everything
        eq_(info(), [(1, [100, 145]),
                     (0, [175, 200]),
                     (0, [300, 350]),
                     (0, [400, 450]),
                     (0, [500, 550]),
                     (0, [1000, 1050]),
                     (1, [1100, 1101]),
                     (1, [1199, 1250]),
                     (0, [1400, 1450]),
                     (0, [1500, 1550]),
                     ])

        # Clean up
        client.stream_remove("/empty/test")
        client.stream_destroy("/empty/test")
        client.close()

    def test_client_12_persistent(self):
        # Check that connections are persistent when they should be.
        # This is pretty hard to test; we have to poke deep into
        # the Requests library.
        with nilmdb.client.Client(url = testurl) as c:
            def connections():
                # Return (connections opened, requests made) for the
                # pool serving localhost:32180, or skip the test when
                # those internals can't be reached.
                try:
                    poolmanager = c.http._last_response.connection.poolmanager
                    pool = poolmanager.pools[('http','localhost',32180)]
                    return (pool.num_connections, pool.num_requests)
                except Exception:
                    # Not a bare "except:", so KeyboardInterrupt and
                    # SystemExit are not turned into a skipped test.
                    raise SkipTest("can't get connection info")

            # First request makes a connection
            c.stream_create("/persist/test", "uint16_1")
            eq_(connections(), (1, 1))

            # Non-generator
            c.stream_list("/persist/test")
            eq_(connections(), (1, 2))
            c.stream_list("/persist/test")
            eq_(connections(), (1, 3))

            # Generators
            for x in c.stream_intervals("/persist/test"):
                pass
            eq_(connections(), (1, 4))
            for x in c.stream_intervals("/persist/test"):
                pass
            eq_(connections(), (1, 5))

            # Clean up
            c.stream_remove("/persist/test")
            c.stream_destroy("/persist/test")
            eq_(connections(), (1, 7))

    def test_client_13_timestamp_rounding(self):
        # Test potentially bad timestamps (due to floating point
        # roundoff etc).  The server will round floating point values
        # to the nearest int.
        client = nilmdb.client.Client(testurl)

        client.stream_create("/rounding/test", "uint16_1")
        with client.stream_insert_context("/rounding/test",
                                          100000000, 200000000.1) as ctx:
            ctx.insert("100000000.1 1\n")
            ctx.insert("150000000.00003 1\n")
            ctx.insert("199999999.4 1\n")
        eq_(list(client.stream_intervals("/rounding/test")),
            [ [ 100000000, 200000000 ] ])

        with assert_raises(ClientError):
            with client.stream_insert_context("/rounding/test",
                                              200000000, 300000000) as ctx:
                ctx.insert("200000000 1\n")
                ctx.insert("250000000 1\n")
                # Server will round this and give an error on finalize()
                ctx.insert("299999999.99 1\n")

        client.stream_remove("/rounding/test")
        client.stream_destroy("/rounding/test")
        client.close()