|
|
@@ -1,142 +0,0 @@ |
|
|
|
# Python implementation of the "rocket" data parsing interface. |
|
|
|
# This interface translates between the binary format on disk |
|
|
|
# and the ASCII format used when communicating with clients. |
|
|
|
|
|
|
|
# This is slow! Use the C version instead. |
|
|
|
|
|
|
|
from __future__ import absolute_import |
|
|
|
import struct |
|
|
|
import cStringIO |
|
|
|
import itertools |
|
|
|
from . import layout as _layout |
|
|
|
import nilmdb.utils |
|
|
|
|
|
|
|
ERR_UNKNOWN = 0 |
|
|
|
ERR_NON_MONOTONIC = 1 |
|
|
|
ERR_OUT_OF_INTERVAL = 2 |
|
|
|
class ParseError(Exception): |
|
|
|
pass |
|
|
|
|
|
|
|
@nilmdb.utils.must_close(wrap_verify = False) |
|
|
|
class Rocket(object): |
|
|
|
def __init__(self, layout, filename): |
|
|
|
self.layout = layout |
|
|
|
if filename: |
|
|
|
self.file = open(filename, "a+b") |
|
|
|
else: |
|
|
|
self.file = None |
|
|
|
|
|
|
|
# For packing/unpacking into a binary file. |
|
|
|
# This will change in the C version |
|
|
|
try: |
|
|
|
(self.ltype, lcount) = layout.split('_', 2) |
|
|
|
self.lcount = int(lcount) |
|
|
|
except: |
|
|
|
raise ValueError("no such layout: badly formatted string") |
|
|
|
if self.lcount < 1: |
|
|
|
raise ValueError("no such layout: bad count") |
|
|
|
try: |
|
|
|
struct_fmt = '<d' # Little endian, double timestamp |
|
|
|
struct_mapping = { |
|
|
|
"int8": 'b', |
|
|
|
"uint8": 'B', |
|
|
|
"int16": 'h', |
|
|
|
"uint16": 'H', |
|
|
|
"int32": 'i', |
|
|
|
"uint32": 'I', |
|
|
|
"int64": 'q', |
|
|
|
"uint64": 'Q', |
|
|
|
"float32": 'f', |
|
|
|
"float64": 'd', |
|
|
|
} |
|
|
|
struct_fmt += struct_mapping[self.ltype] * self.lcount |
|
|
|
except KeyError: |
|
|
|
raise ValueError("no such layout: bad data type") |
|
|
|
self.packer = struct.Struct(struct_fmt) |
|
|
|
|
|
|
|
# For packing/unpacking from strings. |
|
|
|
self.layoutparser = _layout.Layout(self.layout) |
|
|
|
self.formatter = _layout.Formatter(self.layout) |
|
|
|
|
|
|
|
def close(self): |
|
|
|
if self.file: |
|
|
|
self.file.close() |
|
|
|
|
|
|
|
@property |
|
|
|
def binary_size(self): |
|
|
|
"""Return size of one row of data in the binary file, in bytes""" |
|
|
|
return self.packer.size |
|
|
|
|
|
|
|
def append_iter(self, maxrows, data): |
|
|
|
"""Append the list data to the file""" |
|
|
|
# We assume the file is opened in append mode, |
|
|
|
# so all writes go to the end. |
|
|
|
written = 0 |
|
|
|
for row in itertools.islice(data, maxrows): |
|
|
|
self.file.write(self.packer.pack(*row)) |
|
|
|
written += 1 |
|
|
|
self.file.flush() |
|
|
|
return written |
|
|
|
|
|
|
|
def append_string(self, count, data, data_offset, linenum, |
|
|
|
start, end, last_timestamp): |
|
|
|
"""Parse string and append data. |
|
|
|
|
|
|
|
count: maximum number of rows to add |
|
|
|
data: string data |
|
|
|
data_offset: byte offset into data to start parsing |
|
|
|
linenum: current line number of data |
|
|
|
start: starting timestamp for interval |
|
|
|
end: end timestamp for interval |
|
|
|
last_timestamp: last timestamp that was previously parsed |
|
|
|
|
|
|
|
Raises ParseError if timestamps are non-monotonic, outside the |
|
|
|
start/end interval, etc. |
|
|
|
|
|
|
|
On success, return a tuple with three values: |
|
|
|
added_rows: how many rows were added from the file |
|
|
|
data_offset: current offset into the data string |
|
|
|
last_timestamp: last timestamp we parsed |
|
|
|
""" |
|
|
|
# Parse the input data |
|
|
|
indata = cStringIO.StringIO(data) |
|
|
|
indata.seek(data_offset) |
|
|
|
written = 0 |
|
|
|
while written < count: |
|
|
|
line = indata.readline() |
|
|
|
linenum += 1 |
|
|
|
if line == "": |
|
|
|
break |
|
|
|
comment = line.find('#') |
|
|
|
if comment >= 0: |
|
|
|
line = line.split('#', 1)[0] |
|
|
|
line = line.strip() |
|
|
|
if line == "": |
|
|
|
continue |
|
|
|
try: |
|
|
|
(ts, row) = self.layoutparser.parse(line) |
|
|
|
except ValueError as e: |
|
|
|
raise ParseError(linenum, 0, ERR_UNKNOWN, e) |
|
|
|
if ts <= last_timestamp: |
|
|
|
raise ParseError(linenum, 0, ERR_NON_MONOTONIC, ts) |
|
|
|
last_timestamp = ts |
|
|
|
if ts < start or ts >= end: |
|
|
|
raise ParseError(linenum, 0, ERR_OUT_OF_INTERVAL, ts) |
|
|
|
self.append_iter(1, [row]) |
|
|
|
written += 1 |
|
|
|
return (written, indata.tell(), last_timestamp, linenum) |
|
|
|
|
|
|
|
def extract_list(self, offset, count): |
|
|
|
"""Extract count rows of data from the file at offset offset. |
|
|
|
Return a list of lists [[row],[row],...]""" |
|
|
|
ret = [] |
|
|
|
self.file.seek(offset) |
|
|
|
for i in xrange(count): |
|
|
|
data = self.file.read(self.binary_size) |
|
|
|
ret.append(list(self.packer.unpack(data))) |
|
|
|
return ret |
|
|
|
|
|
|
|
def extract_string(self, offset, count): |
|
|
|
"""Extract count rows of data from the file at offset offset. |
|
|
|
Return an ascii formatted string according to the layout""" |
|
|
|
return self.formatter.format(self.extract_list(offset, count)) |