|
|
@@ -230,6 +230,7 @@ class StreamInserter(object): |
|
|
|
# See design.md for a discussion of how much data to send. This |
|
|
|
# is a soft limit -- we might send up to twice as much or so |
|
|
|
_max_data = 2 * 1024 * 1024 |
|
|
|
_max_data_after_send = 64 * 1024 |
|
|
|
|
|
|
|
# Delta to add to the final timestamp, if "end" wasn't given |
|
|
|
_end_epsilon = 1e-6 |
|
|
@@ -275,6 +276,10 @@ class StreamInserter(object): |
|
|
|
# Send the block once we have enough data |
|
|
|
if self._block_len >= maxdata: |
|
|
|
self._send_block(final = False) |
|
|
|
if self._block_len >= self._max_data_after_send: # pragma: no cover |
|
|
|
raise ValueError("too much data left over after trying" |
|
|
|
" to send intermediate block; is it" |
|
|
|
" missing newlines or malformed?") |
|
|
|
|
|
|
|
def update_start(self, start): |
|
|
|
"""Update the start time for the next contiguous interval. |
|
|
@@ -371,7 +376,7 @@ class StreamInserter(object): |
|
|
|
(spos, epos) = self._get_last_noncomment(block) |
|
|
|
end_ts = extract_timestamp(block[spos:epos]) |
|
|
|
except (ValueError, IndexError): |
|
|
|
# If we found no timestamp, give up; we'll send this |
|
|
|
# If we found no timestamp, give up; we could send this |
|
|
|
# block later when we have more data. |
|
|
|
return |
|
|
|
if spos == 0: |
|
|
@@ -403,3 +408,5 @@ class StreamInserter(object): |
|
|
|
"start": float_time_to_string(start_ts), |
|
|
|
"end": float_time_to_string(end_ts) } |
|
|
|
self.last_response = self._http.put("stream/insert", block, params) |
|
|
|
|
|
|
|
return |