723 lines
		
	
	
		
			29 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			723 lines
		
	
	
		
			29 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# -*- coding: utf-8 -*-
 | 
						||
 | 
						||
import nilmdb.server
 | 
						||
import nilmdb.client
 | 
						||
 | 
						||
from nilmdb.utils.printf import *
 | 
						||
from nilmdb.utils import timestamper
 | 
						||
from nilmdb.client import ClientError, ServerError
 | 
						||
from nilmdb.utils import datetime_tz
 | 
						||
 | 
						||
from nose.plugins.skip import SkipTest
 | 
						||
from nose.tools import *
 | 
						||
from nose.tools import assert_raises
 | 
						||
import itertools
 | 
						||
import distutils.version
 | 
						||
import os
 | 
						||
import sys
 | 
						||
import threading
 | 
						||
import cStringIO
 | 
						||
import simplejson as json
 | 
						||
import unittest
 | 
						||
import warnings
 | 
						||
import resource
 | 
						||
import time
 | 
						||
import re
 | 
						||
import struct
 | 
						||
 | 
						||
from testutil.helpers import *
 | 
						||
 | 
						||
testdb = "tests/client-testdb"
 | 
						||
testurl = "http://localhost:32180/"
 | 
						||
 | 
						||
def setup_module():
 | 
						||
    global test_server, test_db
 | 
						||
    # Clear out DB
 | 
						||
    recursive_unlink(testdb)
 | 
						||
 | 
						||
    # Start web app on a custom port
 | 
						||
    test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(testdb)
 | 
						||
    test_server = nilmdb.server.Server(test_db, host = "127.0.0.1",
 | 
						||
                                       port = 32180, stoppable = False,
 | 
						||
                                       fast_shutdown = True,
 | 
						||
                                       force_traceback = True)
 | 
						||
    test_server.start(blocking = False)
 | 
						||
 | 
						||
def teardown_module():
 | 
						||
    global test_server, test_db
 | 
						||
    # Close web app
 | 
						||
    test_server.stop()
 | 
						||
    test_db.close()
 | 
						||
 | 
						||
class TestClient(object):
 | 
						||
 | 
						||
    def test_client_01_basic(self):
 | 
						||
        # Test a fake host
 | 
						||
        client = nilmdb.client.Client(url = "http://localhost:1/")
 | 
						||
        with assert_raises(nilmdb.client.ServerError):
 | 
						||
            client.version()
 | 
						||
        client.close()
 | 
						||
 | 
						||
        # Then a fake URL on a real host
 | 
						||
        client = nilmdb.client.Client(url = "http://localhost:32180/fake/")
 | 
						||
        with assert_raises(nilmdb.client.ClientError):
 | 
						||
            client.version()
 | 
						||
        client.close()
 | 
						||
 | 
						||
        # Now a real URL with no http:// prefix
 | 
						||
        client = nilmdb.client.Client(url = "localhost:32180")
 | 
						||
        version = client.version()
 | 
						||
        client.close()
 | 
						||
 | 
						||
        # Now use the real URL
 | 
						||
        client = nilmdb.client.Client(url = testurl)
 | 
						||
        version = client.version()
 | 
						||
        eq_(distutils.version.LooseVersion(version),
 | 
						||
            distutils.version.LooseVersion(test_server.version))
 | 
						||
 | 
						||
        # Bad URLs should give 404, not 500
 | 
						||
        with assert_raises(ClientError):
 | 
						||
            client.http.get("/stream/create")
 | 
						||
        client.close()
 | 
						||
 | 
						||
    def test_client_02_createlist(self):
 | 
						||
        # Basic stream tests, like those in test_nilmdb:test_stream
 | 
						||
        client = nilmdb.client.Client(url = testurl)
 | 
						||
 | 
						||
        # Database starts empty
 | 
						||
        eq_(client.stream_list(), [])
 | 
						||
 | 
						||
        # Bad path
 | 
						||
        with assert_raises(ClientError):
 | 
						||
            client.stream_create("foo/bar/baz", "float32_8")
 | 
						||
        with assert_raises(ClientError):
 | 
						||
            client.stream_create("/foo", "float32_8")
 | 
						||
        # Bad layout type
 | 
						||
        with assert_raises(ClientError):
 | 
						||
            client.stream_create("/newton/prep", "NoSuchLayout")
 | 
						||
 | 
						||
        # Bad method types
 | 
						||
        with assert_raises(ClientError):
 | 
						||
            client.http.put("/stream/list","")
 | 
						||
        # Try a bunch of times to make sure the request body is getting consumed
 | 
						||
        for x in range(10):
 | 
						||
            with assert_raises(ClientError):
 | 
						||
                client.http.post("/stream/list")
 | 
						||
        client = nilmdb.client.Client(url = testurl)
 | 
						||
 | 
						||
        # Create three streams
 | 
						||
        client.stream_create("/newton/prep", "float32_8")
 | 
						||
        client.stream_create("/newton/raw", "uint16_6")
 | 
						||
        client.stream_create("/newton/zzz/rawnotch", "uint16_9")
 | 
						||
 | 
						||
        # Verify we got 3 streams
 | 
						||
        eq_(client.stream_list(), [ ["/newton/prep", "float32_8"],
 | 
						||
                                    ["/newton/raw", "uint16_6"],
 | 
						||
                                    ["/newton/zzz/rawnotch", "uint16_9"]
 | 
						||
                                    ])
 | 
						||
        # Match just one type or one path
 | 
						||
        eq_(client.stream_list(layout="uint16_6"),
 | 
						||
            [ ["/newton/raw", "uint16_6"] ])
 | 
						||
        eq_(client.stream_list(path="/newton/raw"),
 | 
						||
            [ ["/newton/raw", "uint16_6"] ])
 | 
						||
 | 
						||
        # Try messing with resource limits to trigger errors and get
 | 
						||
        # more coverage.  Here, make it so we can only create files 1
 | 
						||
        # byte in size, which will trigger an IOError in the server when
 | 
						||
        # we create a table.
 | 
						||
        limit = resource.getrlimit(resource.RLIMIT_FSIZE)
 | 
						||
        resource.setrlimit(resource.RLIMIT_FSIZE, (1, limit[1]))
 | 
						||
        with assert_raises(ServerError) as e:
 | 
						||
            client.stream_create("/newton/hello", "uint16_6")
 | 
						||
        resource.setrlimit(resource.RLIMIT_FSIZE, limit)
 | 
						||
 | 
						||
        client.close()
 | 
						||
 | 
						||
    def test_client_03_metadata(self):
 | 
						||
        client = nilmdb.client.Client(url = testurl)
 | 
						||
 | 
						||
        # Set / get metadata
 | 
						||
        eq_(client.stream_get_metadata("/newton/prep"), {})
 | 
						||
        eq_(client.stream_get_metadata("/newton/raw"), {})
 | 
						||
        meta1 = { "description": "The Data",
 | 
						||
                  "v_scale": "1.234" }
 | 
						||
        meta2 = { "description": "The Data" }
 | 
						||
        meta3 = { "v_scale": "1.234" }
 | 
						||
        client.stream_set_metadata("/newton/prep", meta1)
 | 
						||
        client.stream_update_metadata("/newton/prep", {})
 | 
						||
        client.stream_update_metadata("/newton/raw", meta2)
 | 
						||
        client.stream_update_metadata("/newton/raw", meta3)
 | 
						||
        eq_(client.stream_get_metadata("/newton/prep"), meta1)
 | 
						||
        eq_(client.stream_get_metadata("/newton/raw"), meta1)
 | 
						||
        eq_(client.stream_get_metadata("/newton/raw",
 | 
						||
                                       [ "description" ] ), meta2)
 | 
						||
        eq_(client.stream_get_metadata("/newton/raw",
 | 
						||
                                       [ "description", "v_scale" ] ), meta1)
 | 
						||
 | 
						||
        # missing key
 | 
						||
        eq_(client.stream_get_metadata("/newton/raw", "descr"),
 | 
						||
            { "descr": None })
 | 
						||
        eq_(client.stream_get_metadata("/newton/raw", [ "descr" ]),
 | 
						||
            { "descr": None })
 | 
						||
 | 
						||
        # test wrong types (list instead of dict)
 | 
						||
        with assert_raises(ClientError):
 | 
						||
            client.stream_set_metadata("/newton/prep", [1,2,3])
 | 
						||
        with assert_raises(ClientError):
 | 
						||
            client.stream_update_metadata("/newton/prep", [1,2,3])
 | 
						||
 | 
						||
        # test wrong types (dict of non-strings)
 | 
						||
        # numbers are OK; they'll get converted to strings
 | 
						||
        client.stream_set_metadata("/newton/prep", { "hello": 1234 })
 | 
						||
        # anything else is not
 | 
						||
        with assert_raises(ClientError):
 | 
						||
            client.stream_set_metadata("/newton/prep", { "world": { 1: 2 } })
 | 
						||
        with assert_raises(ClientError):
 | 
						||
            client.stream_set_metadata("/newton/prep", { "world": [ 1, 2 ] })
 | 
						||
 | 
						||
        client.close()
 | 
						||
 | 
						||
    def test_client_04_insert(self):
 | 
						||
        client = nilmdb.client.Client(url = testurl)
 | 
						||
 | 
						||
        # Limit _max_data to 1 MB, since our test file is 1.5 MB
 | 
						||
        old_max_data = nilmdb.client.client.StreamInserter._max_data
 | 
						||
        nilmdb.client.client.StreamInserter._max_data = 1 * 1024 * 1024
 | 
						||
 | 
						||
        datetime_tz.localtz_set("America/New_York")
 | 
						||
 | 
						||
        testfile = "tests/data/prep-20120323T1000"
 | 
						||
        start = nilmdb.utils.time.parse_time("20120323T1000")
 | 
						||
        rate = 120
 | 
						||
 | 
						||
        # First try a nonexistent path
 | 
						||
        data = timestamper.TimestamperRate(testfile, start, 120)
 | 
						||
        with assert_raises(ClientError) as e:
 | 
						||
            result = client.stream_insert("/newton/no-such-path", data)
 | 
						||
        in_("404 Not Found", str(e.exception))
 | 
						||
 | 
						||
        # Now try reversed timestamps
 | 
						||
        data = timestamper.TimestamperRate(testfile, start, 120)
 | 
						||
        data = reversed(list(data))
 | 
						||
        with assert_raises(ClientError) as e:
 | 
						||
            result = client.stream_insert("/newton/prep", data)
 | 
						||
        in_("400 Bad Request", str(e.exception))
 | 
						||
        in2_("timestamp is not monotonically increasing",
 | 
						||
             "start must precede end", str(e.exception))
 | 
						||
 | 
						||
        # Now try empty data (no server request made)
 | 
						||
        empty = cStringIO.StringIO("")
 | 
						||
        data = timestamper.TimestamperRate(empty, start, 120)
 | 
						||
        result = client.stream_insert("/newton/prep", data)
 | 
						||
        eq_(result, None)
 | 
						||
 | 
						||
        # It's OK to insert an empty interval
 | 
						||
        client.http.put("stream/insert", "", { "path": "/newton/prep",
 | 
						||
                                               "start": 1, "end": 2 })
 | 
						||
        eq_(list(client.stream_intervals("/newton/prep")), [[1, 2]])
 | 
						||
        client.stream_remove("/newton/prep")
 | 
						||
        eq_(list(client.stream_intervals("/newton/prep")), [])
 | 
						||
 | 
						||
        # Timestamps can be negative too
 | 
						||
        client.http.put("stream/insert", "", { "path": "/newton/prep",
 | 
						||
                                               "start": -2, "end": -1 })
 | 
						||
        eq_(list(client.stream_intervals("/newton/prep")), [[-2, -1]])
 | 
						||
        client.stream_remove("/newton/prep")
 | 
						||
        eq_(list(client.stream_intervals("/newton/prep")), [])
 | 
						||
 | 
						||
        # Intervals that end at zero shouldn't be any different
 | 
						||
        client.http.put("stream/insert", "", { "path": "/newton/prep",
 | 
						||
                                               "start": -1, "end": 0 })
 | 
						||
        eq_(list(client.stream_intervals("/newton/prep")), [[-1, 0]])
 | 
						||
        client.stream_remove("/newton/prep")
 | 
						||
        eq_(list(client.stream_intervals("/newton/prep")), [])
 | 
						||
 | 
						||
        # Try forcing a server request with equal start and end
 | 
						||
        with assert_raises(ClientError) as e:
 | 
						||
            client.http.put("stream/insert", "", { "path": "/newton/prep",
 | 
						||
                                                   "start": 0, "end": 0 })
 | 
						||
        in_("400 Bad Request", str(e.exception))
 | 
						||
        in_("start must precede end", str(e.exception))
 | 
						||
 | 
						||
        # Good content type
 | 
						||
        with assert_raises(ClientError) as e:
 | 
						||
            client.http.put("stream/insert", "",
 | 
						||
                            { "path": "xxxx", "start": 0, "end": 1,
 | 
						||
                              "binary": 1 },
 | 
						||
                            binary = True)
 | 
						||
        in_("No such stream", str(e.exception))
 | 
						||
 | 
						||
        # Bad content type
 | 
						||
        with assert_raises(ClientError) as e:
 | 
						||
            client.http.put("stream/insert", "",
 | 
						||
                            { "path": "xxxx", "start": 0, "end": 1,
 | 
						||
                              "binary": 1 },
 | 
						||
                            binary = False)
 | 
						||
        in_("Content type must be application/octet-stream", str(e.exception))
 | 
						||
 | 
						||
        # Specify start/end (starts too late)
 | 
						||
        data = timestamper.TimestamperRate(testfile, start, 120)
 | 
						||
        with assert_raises(ClientError) as e:
 | 
						||
            result = client.stream_insert("/newton/prep", data,
 | 
						||
                                          start + 5000000, start + 120000000)
 | 
						||
        in_("400 Bad Request", str(e.exception))
 | 
						||
        in_("Data timestamp 1332511200000000 < start time 1332511205000000",
 | 
						||
            str(e.exception))
 | 
						||
 | 
						||
        # Specify start/end (ends too early)
 | 
						||
        data = timestamper.TimestamperRate(testfile, start, 120)
 | 
						||
        with assert_raises(ClientError) as e:
 | 
						||
            result = client.stream_insert("/newton/prep", data,
 | 
						||
                                          start, start + 1000000)
 | 
						||
        in_("400 Bad Request", str(e.exception))
 | 
						||
        # Client chunks the input, so the exact timestamp here might change
 | 
						||
        # if the chunk positions change.
 | 
						||
        assert(re.search("Data timestamp 13325[0-9]+ "
 | 
						||
                         ">= end time 1332511201000000", str(e.exception))
 | 
						||
               is not None)
 | 
						||
 | 
						||
        # Now do the real load
 | 
						||
        data = timestamper.TimestamperRate(testfile, start, 120)
 | 
						||
        result = client.stream_insert("/newton/prep", data,
 | 
						||
                                      start, start + 119999777)
 | 
						||
 | 
						||
        # Verify the intervals.  Should be just one, even if the data
 | 
						||
        # was inserted in chunks, due to nilmdb interval concatenation.
 | 
						||
        intervals = list(client.stream_intervals("/newton/prep"))
 | 
						||
        eq_(intervals, [[start, start + 119999777]])
 | 
						||
 | 
						||
        # Try some overlapping data -- just insert it again
 | 
						||
        data = timestamper.TimestamperRate(testfile, start, 120)
 | 
						||
        with assert_raises(ClientError) as e:
 | 
						||
            result = client.stream_insert("/newton/prep", data)
 | 
						||
        in_("400 Bad Request", str(e.exception))
 | 
						||
        in_("verlap", str(e.exception))
 | 
						||
 | 
						||
        nilmdb.client.client.StreamInserter._max_data = old_max_data
 | 
						||
        client.close()
 | 
						||
 | 
						||
    def test_client_05_extractremove(self):
 | 
						||
        # Misc tests for extract and remove.  Most of them are in test_cmdline.
 | 
						||
        client = nilmdb.client.Client(url = testurl)
 | 
						||
 | 
						||
        for x in client.stream_extract("/newton/prep",
 | 
						||
                                       999123000000, 999124000000):
 | 
						||
            raise AssertionError("shouldn't be any data for this request")
 | 
						||
 | 
						||
        with assert_raises(ClientError) as e:
 | 
						||
            client.stream_remove("/newton/prep", 123000000, 120000000)
 | 
						||
 | 
						||
        # Test count
 | 
						||
        eq_(client.stream_count("/newton/prep"), 14400)
 | 
						||
 | 
						||
        # Test binary output
 | 
						||
        with assert_raises(ClientError) as e:
 | 
						||
            list(client.stream_extract("/newton/prep",
 | 
						||
                                       markup = True, binary = True))
 | 
						||
        with assert_raises(ClientError) as e:
 | 
						||
            list(client.stream_extract("/newton/prep",
 | 
						||
                                       count = True, binary = True))
 | 
						||
        data = "".join(client.stream_extract("/newton/prep", binary = True))
 | 
						||
        # Quick check using struct
 | 
						||
        unpacker = struct.Struct("<qffffffff")
 | 
						||
        out = []
 | 
						||
        for i in range(14400):
 | 
						||
            out.append(unpacker.unpack_from(data, i * unpacker.size))
 | 
						||
        eq_(out[0], (1332511200000000, 266568.0, 224029.0, 5161.39990234375,
 | 
						||
                     2525.169921875, 8350.83984375, 3724.699951171875,
 | 
						||
                     1355.3399658203125, 2039.0))
 | 
						||
 | 
						||
        client.close()
 | 
						||
 | 
						||
    def test_client_06_generators(self):
 | 
						||
        # A lot of the client functionality is already tested by test_cmdline,
 | 
						||
        # but this gets a bit more coverage that cmdline misses.
 | 
						||
        client = nilmdb.client.Client(url = testurl)
 | 
						||
 | 
						||
        # Trigger a client error in generator
 | 
						||
        start = nilmdb.utils.time.parse_time("20120323T2000")
 | 
						||
        end = nilmdb.utils.time.parse_time("20120323T1000")
 | 
						||
        for function in [ client.stream_intervals, client.stream_extract ]:
 | 
						||
            with assert_raises(ClientError) as e:
 | 
						||
                function("/newton/prep", start, end).next()
 | 
						||
            in_("400 Bad Request", str(e.exception))
 | 
						||
            in_("start must precede end", str(e.exception))
 | 
						||
 | 
						||
        # Trigger a curl error in generator
 | 
						||
        with assert_raises(ServerError) as e:
 | 
						||
            client.http.get_gen("http://nosuchurl.example.com./").next()
 | 
						||
 | 
						||
        # Trigger a curl error in generator
 | 
						||
        with assert_raises(ServerError) as e:
 | 
						||
            client.http.get_gen("http://nosuchurl.example.com./").next()
 | 
						||
 | 
						||
        # Check 404 for missing streams
 | 
						||
        for function in [ client.stream_intervals, client.stream_extract ]:
 | 
						||
            with assert_raises(ClientError) as e:
 | 
						||
                function("/no/such/stream").next()
 | 
						||
            in_("404 Not Found", str(e.exception))
 | 
						||
            in_("No such stream", str(e.exception))
 | 
						||
 | 
						||
        client.close()
 | 
						||
 | 
						||
    def test_client_07_headers(self):
 | 
						||
        # Make sure that /stream/intervals and /stream/extract
 | 
						||
        # properly return streaming, chunked, text/plain response.
 | 
						||
        # Pokes around in client.http internals a bit to look at the
 | 
						||
        # response headers.
 | 
						||
 | 
						||
        client = nilmdb.client.Client(url = testurl)
 | 
						||
        http = client.http
 | 
						||
 | 
						||
        # Use a warning rather than returning a test failure for the
 | 
						||
        # transfer-encoding, so that we can still disable chunked
 | 
						||
        # responses for debugging.
 | 
						||
 | 
						||
        def headers():
 | 
						||
            h = ""
 | 
						||
            for (k, v) in http._last_response.headers.items():
 | 
						||
                h += k + ": " + v + "\n"
 | 
						||
            return h.lower()
 | 
						||
 | 
						||
        # Intervals
 | 
						||
        x = http.get("stream/intervals", { "path": "/newton/prep" })
 | 
						||
        if "transfer-encoding: chunked" not in headers():
 | 
						||
            warnings.warn("Non-chunked HTTP response for /stream/intervals")
 | 
						||
        if "content-type: application/x-json-stream" not in headers():
 | 
						||
            raise AssertionError("/stream/intervals content type "
 | 
						||
                                 "is not application/x-json-stream:\n" +
 | 
						||
                                 headers())
 | 
						||
 | 
						||
        # Extract
 | 
						||
        x = http.get("stream/extract",
 | 
						||
                            { "path": "/newton/prep",
 | 
						||
                              "start": "123",
 | 
						||
                              "end": "124" })
 | 
						||
        if "transfer-encoding: chunked" not in headers():
 | 
						||
            warnings.warn("Non-chunked HTTP response for /stream/extract")
 | 
						||
        if "content-type: text/plain;charset=utf-8" not in headers():
 | 
						||
            raise AssertionError("/stream/extract is not text/plain:\n" +
 | 
						||
                                 headers())
 | 
						||
 | 
						||
        x = http.get("stream/extract",
 | 
						||
                            { "path": "/newton/prep",
 | 
						||
                              "start": "123",
 | 
						||
                              "end": "124",
 | 
						||
                              "binary": "1" })
 | 
						||
        if "transfer-encoding: chunked" not in headers():
 | 
						||
            warnings.warn("Non-chunked HTTP response for /stream/extract")
 | 
						||
        if "content-type: application/octet-stream" not in headers():
 | 
						||
            raise AssertionError("/stream/extract is not binary:\n" +
 | 
						||
                                 headers())
 | 
						||
 | 
						||
        client.close()
 | 
						||
 | 
						||
    def test_client_08_unicode(self):
 | 
						||
        # Try both with and without posting JSON
 | 
						||
        for post_json in (False, True):
 | 
						||
            # Basic Unicode tests
 | 
						||
            client = nilmdb.client.Client(url = testurl, post_json = post_json)
 | 
						||
 | 
						||
            # Delete streams that exist
 | 
						||
            for stream in client.stream_list():
 | 
						||
                client.stream_remove(stream[0])
 | 
						||
                client.stream_destroy(stream[0])
 | 
						||
 | 
						||
            # Database is empty
 | 
						||
            eq_(client.stream_list(), [])
 | 
						||
 | 
						||
            # Create Unicode stream, match it
 | 
						||
            raw = [ u"/düsseldorf/raw", u"uint16_6" ]
 | 
						||
            prep = [ u"/düsseldorf/prep", u"uint16_6" ]
 | 
						||
            client.stream_create(*raw)
 | 
						||
            eq_(client.stream_list(), [raw])
 | 
						||
            eq_(client.stream_list(layout=raw[1]), [raw])
 | 
						||
            eq_(client.stream_list(path=raw[0]), [raw])
 | 
						||
            client.stream_create(*prep)
 | 
						||
            eq_(client.stream_list(), [prep, raw])
 | 
						||
 | 
						||
            # Set / get metadata with Unicode keys and values
 | 
						||
            eq_(client.stream_get_metadata(raw[0]), {})
 | 
						||
            eq_(client.stream_get_metadata(prep[0]), {})
 | 
						||
            meta1 = { u"alpha": u"α",
 | 
						||
                      u"β": u"beta" }
 | 
						||
            meta2 = { u"alpha": u"α" }
 | 
						||
            meta3 = { u"β": u"beta" }
 | 
						||
            client.stream_set_metadata(prep[0], meta1)
 | 
						||
            client.stream_update_metadata(prep[0], {})
 | 
						||
            client.stream_update_metadata(raw[0], meta2)
 | 
						||
            client.stream_update_metadata(raw[0], meta3)
 | 
						||
            eq_(client.stream_get_metadata(prep[0]), meta1)
 | 
						||
            eq_(client.stream_get_metadata(raw[0]), meta1)
 | 
						||
            eq_(client.stream_get_metadata(raw[0], [ "alpha" ]), meta2)
 | 
						||
            eq_(client.stream_get_metadata(raw[0], [ "alpha", "β" ]), meta1)
 | 
						||
 | 
						||
            client.close()
 | 
						||
 | 
						||
    def test_client_09_closing(self):
 | 
						||
        # Make sure we actually close sockets correctly.  New
 | 
						||
        # connections will block for a while if they're not, since the
 | 
						||
        # server will stop accepting new connections.
 | 
						||
        for test in [1, 2]:
 | 
						||
            start = time.time()
 | 
						||
            for i in range(50):
 | 
						||
                if time.time() - start > 15:
 | 
						||
                    raise AssertionError("Connections seem to be blocking... "
 | 
						||
                                         "probably not closing properly.")
 | 
						||
                if test == 1:
 | 
						||
                    # explicit close
 | 
						||
                    client = nilmdb.client.Client(url = testurl)
 | 
						||
                    with assert_raises(ClientError) as e:
 | 
						||
                        client.stream_remove("/newton/prep", 123, 120)
 | 
						||
                    client.close() # remove this to see the failure
 | 
						||
                elif test == 2:
 | 
						||
                    # use the context manager
 | 
						||
                    with nilmdb.client.Client(url = testurl) as c:
 | 
						||
                        with assert_raises(ClientError) as e:
 | 
						||
                            c.stream_remove("/newton/prep", 123, 120)
 | 
						||
 | 
						||
    def test_client_10_context(self):
 | 
						||
        # Test using the client's stream insertion context manager to
 | 
						||
        # insert data.
 | 
						||
        client = nilmdb.client.Client(testurl)
 | 
						||
 | 
						||
        client.stream_create("/context/test", "uint16_1")
 | 
						||
        with client.stream_insert_context("/context/test") as ctx:
 | 
						||
            # override _max_data to trigger frequent server updates
 | 
						||
            ctx._max_data = 15
 | 
						||
 | 
						||
            ctx.insert("1000 1\n")
 | 
						||
 | 
						||
            ctx.insert("1010 ")
 | 
						||
            ctx.insert("1\n1020 1")
 | 
						||
            ctx.insert("")
 | 
						||
            ctx.insert("\n1030 1\n")
 | 
						||
 | 
						||
            ctx.insert("1040 1\n")
 | 
						||
            ctx.insert("# hello\n")
 | 
						||
            ctx.insert("   # hello\n")
 | 
						||
            ctx.insert("  1050 1\n")
 | 
						||
            ctx.finalize()
 | 
						||
 | 
						||
            ctx.insert("1070 1\n")
 | 
						||
            ctx.update_end(1080)
 | 
						||
            ctx.finalize()
 | 
						||
            ctx.update_start(1090)
 | 
						||
            ctx.insert("1100 1\n")
 | 
						||
            ctx.insert("1110 1\n")
 | 
						||
            ctx.send()
 | 
						||
            ctx.insert("1120 1\n")
 | 
						||
            ctx.insert("1130 1\n")
 | 
						||
            ctx.insert("1140 1\n")
 | 
						||
            ctx.update_end(1160)
 | 
						||
            ctx.insert("1150 1\n")
 | 
						||
            ctx.update_end(1170)
 | 
						||
            ctx.insert("1160 1\n")
 | 
						||
            ctx.update_end(1180)
 | 
						||
            ctx.insert("1170 1" +
 | 
						||
                       " # this is super long" * 100 +
 | 
						||
                       "\n")
 | 
						||
            ctx.finalize()
 | 
						||
            ctx.insert("# this is super long" * 100)
 | 
						||
 | 
						||
        with assert_raises(ClientError):
 | 
						||
            with client.stream_insert_context("/context/test",
 | 
						||
                                              1000, 2000) as ctx:
 | 
						||
                ctx.insert("1180 1\n")
 | 
						||
 | 
						||
        with assert_raises(ClientError):
 | 
						||
            with client.stream_insert_context("/context/test",
 | 
						||
                                              2000, 3000) as ctx:
 | 
						||
                ctx.insert("1180 1\n")
 | 
						||
 | 
						||
        with assert_raises(ClientError):
 | 
						||
            with client.stream_insert_context("/context/test") as ctx:
 | 
						||
                ctx.insert("bogus data\n")
 | 
						||
 | 
						||
        with client.stream_insert_context("/context/test", 2000, 3000) as ctx:
 | 
						||
            # make sure our override wasn't permanent
 | 
						||
            ne_(ctx._max_data, 15)
 | 
						||
            ctx.insert("2250 1\n")
 | 
						||
            ctx.finalize()
 | 
						||
 | 
						||
        with assert_raises(ClientError):
 | 
						||
            with client.stream_insert_context("/context/test",
 | 
						||
                                              3000, 4000) as ctx:
 | 
						||
                ctx.insert("3010 1\n")
 | 
						||
                ctx.insert("3020 2\n")
 | 
						||
                ctx.insert("3030 3\n")
 | 
						||
                ctx.insert("3040 4\n")
 | 
						||
                ctx.insert("3040 4\n") # non-monotonic after a few lines
 | 
						||
                ctx.finalize()
 | 
						||
 | 
						||
        eq_(list(client.stream_intervals("/context/test")),
 | 
						||
            [ [ 1000, 1051 ],
 | 
						||
              [ 1070, 1080 ],
 | 
						||
              [ 1090, 1180 ],
 | 
						||
              [ 2000, 3000 ] ])
 | 
						||
 | 
						||
        # destroy stream (try without removing data first)
 | 
						||
        with assert_raises(ClientError):
 | 
						||
            client.stream_destroy("/context/test")
 | 
						||
        client.stream_remove("/context/test")
 | 
						||
        client.stream_destroy("/context/test")
 | 
						||
        client.close()
 | 
						||
 | 
						||
    def test_client_11_emptyintervals(self):
 | 
						||
        # Empty intervals are ok!  If recording detection events
 | 
						||
        # by inserting rows into the database, we want to be able to
 | 
						||
        # have an interval where no events occurred.  Test them here.
 | 
						||
        client = nilmdb.client.Client(testurl)
 | 
						||
        client.stream_create("/empty/test", "uint16_1")
 | 
						||
 | 
						||
        def info():
 | 
						||
            result = []
 | 
						||
            for interval in list(client.stream_intervals("/empty/test")):
 | 
						||
                result.append((client.stream_count("/empty/test", *interval),
 | 
						||
                               interval))
 | 
						||
            return result
 | 
						||
 | 
						||
        eq_(info(), [])
 | 
						||
 | 
						||
        # Insert a region with just a few points
 | 
						||
        with client.stream_insert_context("/empty/test") as ctx:
 | 
						||
            ctx.update_start(100)
 | 
						||
            ctx.insert("140 1\n")
 | 
						||
            ctx.insert("150 1\n")
 | 
						||
            ctx.insert("160 1\n")
 | 
						||
            ctx.update_end(200)
 | 
						||
            ctx.finalize()
 | 
						||
 | 
						||
        eq_(info(), [(3, [100, 200])])
 | 
						||
 | 
						||
        # Delete chunk, which will leave one data point and two intervals
 | 
						||
        client.stream_remove("/empty/test", 145, 175)
 | 
						||
        eq_(info(), [(1, [100, 145]),
 | 
						||
                     (0, [175, 200])])
 | 
						||
 | 
						||
        # Try also creating a completely empty interval from scratch,
 | 
						||
        # in a few different ways.
 | 
						||
        client.stream_insert("/empty/test", "", 300, 350)
 | 
						||
        client.stream_insert("/empty/test", [], 400, 450)
 | 
						||
        with client.stream_insert_context("/empty/test", 500, 550):
 | 
						||
            pass
 | 
						||
 | 
						||
        # If enough timestamps aren't provided, empty streams won't be created.
 | 
						||
        client.stream_insert("/empty/test", [])
 | 
						||
        with client.stream_insert_context("/empty/test"):
 | 
						||
            pass
 | 
						||
        client.stream_insert("/empty/test", [], start = 600)
 | 
						||
        with client.stream_insert_context("/empty/test", start = 700):
 | 
						||
            pass
 | 
						||
        client.stream_insert("/empty/test", [], end = 850)
 | 
						||
        with client.stream_insert_context("/empty/test", end = 950):
 | 
						||
            pass
 | 
						||
 | 
						||
        # Try various things that might cause problems
 | 
						||
        with client.stream_insert_context("/empty/test", 1000, 1050):
 | 
						||
            ctx.finalize() # inserts [1000, 1050]
 | 
						||
            ctx.finalize() # nothing
 | 
						||
            ctx.finalize() # nothing
 | 
						||
            ctx.insert("1100 1\n")
 | 
						||
            ctx.finalize() # inserts [1100, 1101]
 | 
						||
            ctx.update_start(1199)
 | 
						||
            ctx.insert("1200 1\n")
 | 
						||
            ctx.update_end(1250)
 | 
						||
            ctx.finalize() # inserts [1199, 1250]
 | 
						||
            ctx.update_start(1299)
 | 
						||
            ctx.finalize() # nothing
 | 
						||
            ctx.update_end(1350)
 | 
						||
            ctx.finalize() # nothing
 | 
						||
            ctx.update_start(1400)
 | 
						||
            ctx.insert("# nothing!\n")
 | 
						||
            ctx.update_end(1450)
 | 
						||
            ctx.finalize()
 | 
						||
            ctx.update_start(1500)
 | 
						||
            ctx.insert("# nothing!")
 | 
						||
            ctx.update_end(1550)
 | 
						||
            ctx.finalize()
 | 
						||
            ctx.insert("# nothing!\n" * 10)
 | 
						||
            ctx.finalize()
 | 
						||
            # implicit last finalize inserts [1400, 1450]
 | 
						||
 | 
						||
        # Check everything
 | 
						||
        eq_(info(), [(1, [100, 145]),
 | 
						||
                     (0, [175, 200]),
 | 
						||
                     (0, [300, 350]),
 | 
						||
                     (0, [400, 450]),
 | 
						||
                     (0, [500, 550]),
 | 
						||
                     (0, [1000, 1050]),
 | 
						||
                     (1, [1100, 1101]),
 | 
						||
                     (1, [1199, 1250]),
 | 
						||
                     (0, [1400, 1450]),
 | 
						||
                     (0, [1500, 1550]),
 | 
						||
                     ])
 | 
						||
 | 
						||
        # Clean up
 | 
						||
        client.stream_remove("/empty/test")
 | 
						||
        client.stream_destroy("/empty/test")
 | 
						||
        client.close()
 | 
						||
 | 
						||
    def test_client_12_persistent(self):
 | 
						||
        # Check that connections are persistent when they should be.
 | 
						||
        # This is pretty hard to test; we have to poke deep into
 | 
						||
        # the Requests library.
 | 
						||
        with nilmdb.client.Client(url = testurl) as c:
 | 
						||
            def connections():
 | 
						||
                try:
 | 
						||
                    poolmanager = c.http._last_response.connection.poolmanager
 | 
						||
                    pool = poolmanager.pools[('http','localhost',32180)]
 | 
						||
                    return (pool.num_connections, pool.num_requests)
 | 
						||
                except Exception:
 | 
						||
                    raise SkipTest("can't get connection info")
 | 
						||
 | 
						||
            # First request makes a connection
 | 
						||
            c.stream_create("/persist/test", "uint16_1")
 | 
						||
            eq_(connections(), (1, 1))
 | 
						||
 | 
						||
            # Non-generator
 | 
						||
            c.stream_list("/persist/test")
 | 
						||
            eq_(connections(), (1, 2))
 | 
						||
            c.stream_list("/persist/test")
 | 
						||
            eq_(connections(), (1, 3))
 | 
						||
 | 
						||
            # Generators
 | 
						||
            for x in c.stream_intervals("/persist/test"):
 | 
						||
                pass
 | 
						||
            eq_(connections(), (1, 4))
 | 
						||
            for x in c.stream_intervals("/persist/test"):
 | 
						||
                pass
 | 
						||
            eq_(connections(), (1, 5))
 | 
						||
 | 
						||
            # Clean up
 | 
						||
            c.stream_remove("/persist/test")
 | 
						||
            c.stream_destroy("/persist/test")
 | 
						||
            eq_(connections(), (1, 7))
 | 
						||
 | 
						||
    def test_client_13_timestamp_rounding(self):
 | 
						||
        # Test potentially bad timestamps (due to floating point
 | 
						||
        # roundoff etc).  The server will round floating point values
 | 
						||
        # to the nearest int.
 | 
						||
        client = nilmdb.client.Client(testurl)
 | 
						||
 | 
						||
        client.stream_create("/rounding/test", "uint16_1")
 | 
						||
        with client.stream_insert_context("/rounding/test",
 | 
						||
                                          100000000, 200000000.1) as ctx:
 | 
						||
            ctx.insert("100000000.1 1\n")
 | 
						||
            ctx.insert("150000000.00003 1\n")
 | 
						||
            ctx.insert("199999999.4 1\n")
 | 
						||
        eq_(list(client.stream_intervals("/rounding/test")),
 | 
						||
            [ [ 100000000, 200000000 ] ])
 | 
						||
 | 
						||
        with assert_raises(ClientError):
 | 
						||
            with client.stream_insert_context("/rounding/test",
 | 
						||
                                              200000000, 300000000) as ctx:
 | 
						||
                ctx.insert("200000000 1\n")
 | 
						||
                ctx.insert("250000000 1\n")
 | 
						||
                # Server will round this and give an error on finalize()
 | 
						||
                ctx.insert("299999999.99 1\n")
 | 
						||
 | 
						||
        client.stream_remove("/rounding/test")
 | 
						||
        client.stream_destroy("/rounding/test")
 | 
						||
        client.close()
 |