# -*- coding: utf-8 -*-

"""Client-level tests for nilmdb: starts a real server on a local port
and exercises nilmdb.client.Client against it.

NOTE(review): this file was recovered from a whitespace-mangled copy
(all newlines collapsed).  Formatting has been restored; all visible
code tokens are preserved verbatim.  One span of the original text was
lost to corruption inside test_client_05_extractremove -- see the
NOTE(review) comments there and at test_client_09_closing.
"""

import nilmdb.server
import nilmdb.client

from nilmdb.utils.printf import *
from nilmdb.utils import timestamper
from nilmdb.client import ClientError, ServerError, Error
from nilmdb.utils.sort import sort_human

import datetime_tz

from nose.plugins.skip import SkipTest
from nose.tools import *
from nose.tools import assert_raises
import itertools
import distutils.version
import os
import sys
import threading
import io
import json
import unittest
import warnings
import resource
import time
import re
import struct

from testutil.helpers import *

# Test database location and the URL of the test server started below.
testdb = "tests/client-testdb"
testurl = "http://localhost:32180/"

def setup_module():
    """Create a fresh database and start the web server on a custom port."""
    global test_server, test_db
    # Clear out DB
    recursive_unlink(testdb)

    # Start web app on a custom port
    test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(testdb)
    test_server = nilmdb.server.Server(test_db, host = "127.0.0.1",
                                       port = 32180, stoppable = False,
                                       fast_shutdown = True,
                                       force_traceback = True)
    test_server.start(blocking = False)

def teardown_module():
    """Stop the web server and close the database."""
    global test_server, test_db
    # Close web app
    test_server.stop()
    test_db.close()

class TestClient(object):

    def test_client_01_basic(self):
        # Test a fake host
        client = nilmdb.client.Client(url = "http://localhost:1/")
        with assert_raises(nilmdb.client.ServerError):
            client.version()
        client.close()

        # Then a fake URL on a real host
        client = nilmdb.client.Client(url = "http://localhost:32180/fake/")
        with assert_raises(nilmdb.client.ClientError):
            client.version()
        client.close()

        # Now a real URL with no http:// prefix
        client = nilmdb.client.Client(url = "localhost:32180")
        version = client.version()
        client.close()

        # Now use the real URL
        client = nilmdb.client.Client(url = testurl)
        version = client.version()
        eq_(distutils.version.LooseVersion(version),
            distutils.version.LooseVersion(test_server.version))

        # Bad URLs should give 404, not 500
        with assert_raises(ClientError):
            client.http.get("/stream/create")

        # Test error handling
        url = testurl
        args = { "url": url,
                 "status": "400",
                 "message": "Something went wrong",
                 "traceback": None }
        with assert_raises(ClientError):
            client.http._handle_error(url, 400, json.dumps(args))
        with assert_raises(ClientError):
            client.http._handle_error(url, 400, "this is not JSON.. {")
        args["status"] = "500"
        with assert_raises(ServerError):
            client.http._handle_error(url, 500, json.dumps(args))
        args["message"] = None
        with assert_raises(ServerError):
            client.http._handle_error(url, 500, json.dumps(args))
        args["status"] = "600"
        with assert_raises(Error):
            client.http._handle_error(url, 600, json.dumps(args))

        # Use get_gen for an endpoint that doesn't have newlines,
        # for better test coverage.
        for line in client.http.get_gen("/version"):
            pass

        client.close()

    def test_client_02_createlist(self):
        # Basic stream tests, like those in test_nilmdb:test_stream
        client = nilmdb.client.Client(url = testurl)

        # Database starts empty
        eq_(client.stream_list(), [])

        # Bad path
        with assert_raises(ClientError):
            client.stream_create("foo/bar/baz", "float32_8")
        with assert_raises(ClientError):
            client.stream_create("/foo", "float32_8")
        # Bad layout type
        with assert_raises(ClientError):
            client.stream_create("/newton/prep", "NoSuchLayout")

        # Bad method types
        with assert_raises(ClientError):
            client.http.put("/stream/list", b"")
        # Try a bunch of times to make sure the request body is getting consumed
        for x in range(10):
            with assert_raises(ClientError):
                client.http.post("/stream/list")
        client = nilmdb.client.Client(url = testurl)

        # Create four streams
        client.stream_create("/newton/prep", "float32_8")
        client.stream_create("/newton/raw", "uint16_6")
        client.stream_create("/newton/zzz/rawnotch2", "uint16_9")
        client.stream_create("/newton/zzz/rawnotch11", "uint16_9")

        # Test sort_human (used by stream_list)
        eq_(sort_human(["/s/10", "/s/2"]), ["/s/2", "/s/10"])

        # Verify we got 4 streams in the right order
        eq_(client.stream_list(), [ ["/newton/prep", "float32_8"],
                                    ["/newton/raw", "uint16_6"],
                                    ["/newton/zzz/rawnotch2", "uint16_9"],
                                    ["/newton/zzz/rawnotch11", "uint16_9"]
                                    ])

        # Match just one type or one path
        eq_(client.stream_list(layout="uint16_6"),
            [ ["/newton/raw", "uint16_6"] ])
        eq_(client.stream_list(path="/newton/raw"),
            [ ["/newton/raw", "uint16_6"] ])

        # Try messing with resource limits to trigger errors and get
        # more coverage.  Here, make it so we can only create files 1
        # byte in size, which will trigger an IOError in the server when
        # we create a table.
        limit = resource.getrlimit(resource.RLIMIT_FSIZE)
        resource.setrlimit(resource.RLIMIT_FSIZE, (1, limit[1]))
        # normal
        with assert_raises(ServerError) as e:
            client.stream_create("/newton/hello", "uint16_6")
        # same but with force_traceback == False, to improve coverage
        global test_server
        test_server.force_traceback = False
        with assert_raises(ServerError) as e:
            client.stream_create("/newton/world", "uint16_6")
        test_server.force_traceback = True
        # Reset resource limit
        resource.setrlimit(resource.RLIMIT_FSIZE, limit)

        client.close()

    def test_client_03_metadata(self):
        client = nilmdb.client.Client(url = testurl)

        # Set / get metadata
        eq_(client.stream_get_metadata("/newton/prep"), {})
        eq_(client.stream_get_metadata("/newton/raw"), {})
        meta1 = { "description": "The Data",
                  "v_scale": "1.234" }
        meta2 = { "description": "The Data" }
        meta3 = { "v_scale": "1.234" }
        client.stream_set_metadata("/newton/prep", meta1)
        client.stream_update_metadata("/newton/prep", {})
        client.stream_update_metadata("/newton/raw", meta2)
        client.stream_update_metadata("/newton/raw", meta3)
        eq_(client.stream_get_metadata("/newton/prep"), meta1)
        eq_(client.stream_get_metadata("/newton/raw"), meta1)
        eq_(client.stream_get_metadata("/newton/raw",
                                       [ "description" ] ), meta2)
        eq_(client.stream_get_metadata("/newton/raw",
                                       [ "description", "v_scale" ] ), meta1)

        # missing key
        eq_(client.stream_get_metadata("/newton/raw", "descr"),
            { "descr": None })
        eq_(client.stream_get_metadata("/newton/raw", [ "descr" ]),
            { "descr": None })

        # test wrong types (list instead of dict)
        with assert_raises(ClientError):
            client.stream_set_metadata("/newton/prep", [1,2,3])
        with assert_raises(ClientError):
            client.stream_update_metadata("/newton/prep", [1,2,3])

        # test wrong types (dict of non-strings)
        # numbers are OK; they'll get converted to strings
        client.stream_set_metadata("/newton/prep", { "hello": 1234 })
        # anything else is not
        with assert_raises(ClientError):
            client.stream_set_metadata("/newton/prep", { "world": { 1: 2 } })
        with assert_raises(ClientError):
            client.stream_set_metadata("/newton/prep", { "world": [ 1, 2 ] })

        client.close()

    def test_client_04_insert(self):
        client = nilmdb.client.Client(url = testurl)

        # Limit _max_data to 1 MB, since our test file is 1.5 MB
        old_max_data = nilmdb.client.client.StreamInserter._max_data
        nilmdb.client.client.StreamInserter._max_data = 1 * 1024 * 1024

        datetime_tz.localtz_set("America/New_York")

        testfile = "tests/data/prep-20120323T1000"
        start = nilmdb.utils.time.parse_time("20120323T1000")
        rate = 120  # NOTE: kept from original; 120 is also passed literally below

        # First try a nonexistent path
        data = timestamper.TimestamperRate(testfile, start, 120)
        with assert_raises(ClientError) as e:
            result = client.stream_insert("/newton/no-such-path", data)
        in_("404 Not Found", repr(e.exception))

        # Now try reversed timestamps
        data = timestamper.TimestamperRate(testfile, start, 120)
        data = reversed(list(data))
        with assert_raises(ClientError) as e:
            result = client.stream_insert("/newton/prep", data)
        in_("400 Bad Request", str(e.exception))
        in2_("timestamp is not monotonically increasing",
             "start must precede end", str(e.exception))

        # Now try empty data (no server request made)
        empty = io.StringIO("")
        data = timestamper.TimestamperRate(empty, start, 120)
        result = client.stream_insert("/newton/prep", data)
        eq_(result, None)

        # It's OK to insert an empty interval
        client.http.put("stream/insert", b"", { "path": "/newton/prep",
                                                "start": 1, "end": 2 })
        eq_(list(client.stream_intervals("/newton/prep")), [[1, 2]])
        client.stream_remove("/newton/prep")
        eq_(list(client.stream_intervals("/newton/prep")), [])

        # Timestamps can be negative too
        client.http.put("stream/insert", b"", { "path": "/newton/prep",
                                                "start": -2, "end": -1 })
        eq_(list(client.stream_intervals("/newton/prep")), [[-2, -1]])
        client.stream_remove("/newton/prep")
        eq_(list(client.stream_intervals("/newton/prep")), [])

        # Intervals that end at zero shouldn't be any different
        client.http.put("stream/insert", b"", { "path": "/newton/prep",
                                                "start": -1, "end": 0 })
        eq_(list(client.stream_intervals("/newton/prep")), [[-1, 0]])
        client.stream_remove("/newton/prep")
        eq_(list(client.stream_intervals("/newton/prep")), [])

        # Try forcing a server request with equal start and end
        with assert_raises(ClientError) as e:
            client.http.put("stream/insert", b"", { "path": "/newton/prep",
                                                    "start": 0, "end": 0 })
        in_("400 Bad Request", str(e.exception))
        in_("start must precede end", str(e.exception))

        # Invalid times in HTTP request
        with assert_raises(ClientError) as e:
            client.http.put("stream/insert", b"", { "path": "/newton/prep",
                                                    "start": "asdf",
                                                    "end": 0 })
        in_("400 Bad Request", str(e.exception))
        in_("invalid start", str(e.exception))

        with assert_raises(ClientError) as e:
            client.http.put("stream/insert", b"", { "path": "/newton/prep",
                                                    "start": 0,
                                                    "end": "asdf" })
        in_("400 Bad Request", str(e.exception))
        in_("invalid end", str(e.exception))

        # Good content type
        with assert_raises(ClientError) as e:
            client.http.put("stream/insert", b"",
                            { "path": "xxxx", "start": 0, "end": 1,
                              "binary": 1 })
        in_("No such stream", str(e.exception))

        # Bad content type
        with assert_raises(ClientError) as e:
            client.http.put("stream/insert", b"",
                            { "path": "xxxx", "start": 0, "end": 1,
                              "binary": 1 },
                            content_type="text/plain; charset=utf-8")
        in_("Content type must be application/octet-stream",
            str(e.exception))

        # Specify start/end (starts too late)
        data = timestamper.TimestamperRate(testfile, start, 120)
        with assert_raises(ClientError) as e:
            result = client.stream_insert("/newton/prep", data,
                                          start + 5000000,
                                          start + 120000000)
        in_("400 Bad Request", str(e.exception))
        in_("Data timestamp 1332511200000000 < start time 1332511205000000",
            str(e.exception))

        # Specify start/end (ends too early)
        data = timestamper.TimestamperRate(testfile, start, 120)
        with assert_raises(ClientError) as e:
            result = client.stream_insert("/newton/prep", data,
                                          start, start + 1000000)
        in_("400 Bad Request", str(e.exception))
        # Client chunks the input, so the exact timestamp here might change
        # if the chunk positions change.
        assert(re.search("Data timestamp 13325[0-9]+ "
                         ">= end time 1332511201000000",
                         str(e.exception)) is not None)

        def check_data():
            # Verify the intervals.  Should be just one, even if the data
            # was inserted in chunks, due to nilmdb interval concatenation.
            intervals = list(client.stream_intervals("/newton/prep"))
            eq_(intervals, [[start, start + 119999777]])

            # Try some overlapping data -- just insert it again
            data = timestamper.TimestamperRate(testfile, start, 120)
            with assert_raises(ClientError) as e:
                result = client.stream_insert("/newton/prep", data)
            in_("400 Bad Request", str(e.exception))
            in_("verlap", str(e.exception))

        # Now do the real load
        data = timestamper.TimestamperRate(testfile, start, 120)
        result = client.stream_insert("/newton/prep", data,
                                      start, start + 119999777)
        check_data()

        # Try inserting directly-passed data
        client.stream_remove("/newton/prep", start, start + 119999777)
        data = timestamper.TimestamperRate(testfile, start, 120)
        data_bytes = b''.join(data)
        result = client.stream_insert("/newton/prep", data_bytes,
                                      start, start + 119999777)
        check_data()

        nilmdb.client.client.StreamInserter._max_data = old_max_data
        client.close()

    def test_client_05_extractremove(self):
        # Misc tests for extract and remove.  Most of them are in test_cmdline.
        client = nilmdb.client.Client(url = testurl)

        for x in client.stream_extract("/newton/prep",
                                       999123000000, 999124000000):
            raise AssertionError("shouldn't be any data for this request")

        with assert_raises(ClientError) as e:
            client.stream_remove("/newton/prep", 123000000, 120000000)

        # Test count
        eq_(client.stream_count("/newton/prep"), 14400)

        # Test binary output
        with assert_raises(ClientError) as e:
            list(client.stream_extract("/newton/prep",
                                       markup = True, binary = True))
        with assert_raises(ClientError) as e:
            list(client.stream_extract("/newton/prep",
                                       count = True, binary = True))
        data = b"".join(client.stream_extract("/newton/prep", binary = True))
        # Quick check using struct
        #
        # NOTE(review): the source this file was recovered from was
        # corrupted at this point -- everything between 'struct.Struct("'
        # and '> 15:' (in test_client_09_closing below) was lost, apparently
        # because an angle-bracketed span was stripped.  The struct format
        # and the remainder of this test are a minimal best-effort
        # reconstruction; any tests that originally sat between here and
        # test_client_09_closing could not be recovered and are omitted.
        # TODO: restore from upstream nilmdb before relying on this test.
        unpacker = struct.Struct("<qffffffff")  # TODO confirm: int64 ts + 8 floats
        for i in range(client.stream_count("/newton/prep")):
            unpacker.unpack_from(data, i * unpacker.size)

        client.close()

    def test_client_09_closing(self):
        # Make sure connections actually get closed; if they don't, new
        # connections will eventually block, since the server stops
        # accepting them.
        #
        # NOTE(review): the outer loop framing here ('for test in [1, 2]',
        # the retry loop, and the elapsed-time guard) was reconstructed
        # after the corruption described in test_client_05_extractremove;
        # the bodies of the two branches are verbatim from the source.
        for test in [1, 2]:
            start = time.time()
            for i in range(50):
                if time.time() - start > 15:
                    raise AssertionError("Connections seem to be blocking... "
                                         "probably not closing properly.")
                if test == 1:
                    # explicit close
                    client = nilmdb.client.Client(url = testurl)
                    with assert_raises(ClientError) as e:
                        client.stream_remove("/newton/prep", 123, 120)
                    client.close() # remove this to see the failure
                elif test == 2:
                    # use the context manager
                    with nilmdb.client.Client(url = testurl) as c:
                        with assert_raises(ClientError) as e:
                            c.stream_remove("/newton/prep", 123, 120)

    def test_client_10_context(self):
        # Test using the client's stream insertion context manager to
        # insert data.
        client = nilmdb.client.Client(testurl)

        client.stream_create("/context/test", "uint16_1")
        with client.stream_insert_context("/context/test") as ctx:
            # override _max_data to trigger frequent server updates
            ctx._max_data = 15

            ctx.insert(b"1000 1\n")

            ctx.insert(b"1010 ")
            ctx.insert(b"1\n1020 1")
            ctx.insert(b"")
            ctx.insert(b"\n1030 1\n")

            ctx.insert(b"1040 1\n")
            ctx.insert(b"# hello\n")
            ctx.insert(b" # hello\n")
            ctx.insert(b" 1050 1\n")
            ctx.finalize()

            ctx.insert(b"1070 1\n")
            ctx.update_end(1080)
            ctx.finalize()
            ctx.update_start(1090)
            ctx.insert(b"1100 1\n")
            ctx.insert(b"1110 1\n")
            ctx.send()
            ctx.insert(b"1120 1\n")
            ctx.insert(b"1130 1\n")
            ctx.insert(b"1140 1\n")
            ctx.update_end(1160)
            ctx.insert(b"1150 1\n")
            ctx.update_end(1170)
            ctx.insert(b"1160 1\n")
            ctx.update_end(1180)
            ctx.insert(b"1170 1" + b" # this is super long" * 100 + b"\n")
            ctx.finalize()
            ctx.insert(b"# this is super long" * 100)

            # override _max_data_after_send to trigger ValueError on a
            # long nonterminated line
            ctx._max_data_after_send = 1000
            with assert_raises(ValueError):
                ctx.insert(b"# this is super long" * 100)

        with assert_raises(ClientError):
            with client.stream_insert_context("/context/test",
                                              1000, 2000) as ctx:
                ctx.insert(b"1180 1\n")

        with assert_raises(ClientError):
            with client.stream_insert_context("/context/test",
                                              2000, 3000) as ctx:
                ctx.insert(b"1180 1\n")

        with assert_raises(ClientError):
            with client.stream_insert_context("/context/test") as ctx:
                ctx.insert(b"bogus data\n")

        with client.stream_insert_context("/context/test", 2000, 3000) as ctx:
            # make sure our override wasn't permanent
            ne_(ctx._max_data, 15)
            ctx.insert(b"2250 1\n")
            ctx.finalize()

        with assert_raises(ClientError):
            with client.stream_insert_context("/context/test",
                                              3000, 4000) as ctx:
                ctx.insert(b"3010 1\n")
                ctx.insert(b"3020 2\n")
                ctx.insert(b"3030 3\n")
                ctx.insert(b"3040 4\n")
                ctx.insert(b"3040 4\n") # non-monotonic after a few lines
                ctx.finalize()

        eq_(list(client.stream_intervals("/context/test")),
            [ [ 1000, 1051 ],
              [ 1070, 1080 ],
              [ 1090, 1180 ],
              [ 2000, 3000 ] ])

        # destroy stream (try without removing data first)
        with assert_raises(ClientError):
            client.stream_destroy("/context/test")
        client.stream_remove("/context/test")
        client.stream_destroy("/context/test")
        client.close()

    def test_client_11_emptyintervals(self):
        # Empty intervals are ok!  If recording detection events
        # by inserting rows into the database, we want to be able to
        # have an interval where no events occurred.  Test them here.
        client = nilmdb.client.Client(testurl)
        client.stream_create("/empty/test", "uint16_1")

        def info():
            # Return a list of (count, interval) for all intervals in the stream.
            result = []
            for interval in list(client.stream_intervals("/empty/test")):
                result.append((client.stream_count("/empty/test", *interval),
                               interval))
            return result

        eq_(info(), [])

        # Insert a region with just a few points
        with client.stream_insert_context("/empty/test") as ctx:
            ctx.update_start(100)
            ctx.insert(b"140 1\n")
            ctx.insert(b"150 1\n")
            ctx.insert(b"160 1\n")
            ctx.update_end(200)
            ctx.finalize()
        eq_(info(), [(3, [100, 200])])

        # Delete chunk, which will leave one data point and two intervals
        client.stream_remove("/empty/test", 145, 175)
        eq_(info(), [(1, [100, 145]),
                     (0, [175, 200])])

        # Try also creating a completely empty interval from scratch,
        # in a few different ways.
        client.stream_insert("/empty/test", b"", 300, 350)
        client.stream_insert("/empty/test", [], 400, 450)
        with client.stream_insert_context("/empty/test", 500, 550):
            pass

        # If enough timestamps aren't provided, empty streams won't be created.
        client.stream_insert("/empty/test", [])
        with client.stream_insert_context("/empty/test"):
            pass
        client.stream_insert("/empty/test", [], start = 600)
        with client.stream_insert_context("/empty/test", start = 700):
            pass
        client.stream_insert("/empty/test", [], end = 850)
        with client.stream_insert_context("/empty/test", end = 950):
            pass

        # Equal start and end is OK as long as there's no data
        with client.stream_insert_context("/empty/test", start=9, end=9):
            pass

        # Try various things that might cause problems
        with client.stream_insert_context("/empty/test", 1000, 1050) as ctx:
            ctx.finalize() # inserts [1000, 1050]
            ctx.finalize() # nothing
            ctx.finalize() # nothing
            ctx.insert(b"1100 1\n")
            ctx.finalize() # inserts [1100, 1101]
            ctx.update_start(1199)
            ctx.insert(b"1200 1\n")
            ctx.update_end(1250)
            ctx.finalize() # inserts [1199, 1250]
            ctx.update_start(1299)
            ctx.finalize() # nothing
            ctx.update_end(1350)
            ctx.finalize() # nothing
            ctx.update_start(1400)
            ctx.insert(b"# nothing!\n")
            ctx.update_end(1450)
            ctx.finalize()
            ctx.update_start(1500)
            ctx.insert(b"# nothing!")
            ctx.update_end(1550)
            ctx.finalize()
            ctx.insert(b"# nothing!\n" * 10)
            ctx.finalize()
            # implicit last finalize inserts [1400, 1450]

        # Check everything
        eq_(info(), [(1, [100, 145]),
                     (0, [175, 200]),
                     (0, [300, 350]),
                     (0, [400, 450]),
                     (0, [500, 550]),
                     (0, [1000, 1050]),
                     (1, [1100, 1101]),
                     (1, [1199, 1250]),
                     (0, [1400, 1450]),
                     (0, [1500, 1550]),
                     ])

        # Clean up
        client.stream_remove("/empty/test")
        client.stream_destroy("/empty/test")
        client.close()

    def test_client_12_persistent(self):
        # Check that connections are NOT persistent.  Rather than trying
        # to verify this at the TCP level, just make sure that the response
        # contained a "Connection: close" header.
        with nilmdb.client.Client(url = testurl) as c:
            c.stream_create("/persist/test", "uint16_1")
            eq_(c.http._last_response.headers["Connection"], "close")
            c.stream_destroy("/persist/test")
            eq_(c.http._last_response.headers["Connection"], "close")

    def test_client_13_timestamp_rounding(self):
        # Test potentially bad timestamps (due to floating point
        # roundoff etc).  The server will round floating point values
        # to the nearest int.
        client = nilmdb.client.Client(testurl)

        client.stream_create("/rounding/test", "uint16_1")
        with client.stream_insert_context("/rounding/test",
                                          100000000, 200000000.1) as ctx:
            ctx.insert(b"100000000.1 1\n")
            ctx.insert(b"150000000.00003 1\n")
            ctx.insert(b"199999999.4 1\n")
        eq_(list(client.stream_intervals("/rounding/test")),
            [ [ 100000000, 200000000 ] ])

        with assert_raises(ClientError):
            with client.stream_insert_context("/rounding/test",
                                              200000000, 300000000) as ctx:
                ctx.insert(b"200000000 1\n")
                ctx.insert(b"250000000 1\n")
                # Server will round this and give an error on finalize()
                ctx.insert(b"299999999.99 1\n")

        client.stream_remove("/rounding/test")
        client.stream_destroy("/rounding/test")
        client.close()