# -*- coding: utf-8 -*-

"""NilmDB

Object that represents a NILM database file.

Manages both the SQL database and the table storage backend.
"""

# Need absolute_import so that "import nilmdb" won't pull in
# nilmdb.py, but will pull the parent nilmdb module instead.
from __future__ import absolute_import
import nilmdb.utils
from nilmdb.utils.printf import *
from nilmdb.utils.time import timestamp_to_string
from nilmdb.utils.interval import IntervalError
from nilmdb.server.interval import Interval, DBInterval, IntervalSet
from nilmdb.server import bulkdata
from nilmdb.server.errors import NilmDBError, StreamError, OverlapError

import sqlite3
import os
import errno
import bisect
# Note about performance and transactions:
#
# Committing a transaction in the default sync mode (PRAGMA synchronous=FULL)
# takes about 125 msec.  sqlite3 will commit transactions at three points:
#   1: explicit con.commit()
#   2: between a series of DML commands and non-DML commands, e.g.
#      after a series of INSERT, SELECT, but before a CREATE TABLE or PRAGMA.
#   3: at the end of an explicit transaction, e.g. "with self.con as con:"
#
# To speed things up, we can set 'PRAGMA synchronous=OFF'.  Or, it
# seems that 'PRAGMA synchronous=NORMAL' and 'PRAGMA journal_mode=WAL'
# give an equivalent speedup more safely.  That is what is used here.
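#
# As a sketch, the effective journal mode can be verified on any
# sqlite3 connection to this database (SQLite reports it in lowercase):
#
#   mode = con.execute("PRAGMA journal_mode").fetchone()[0]
#   assert mode == "wal"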
_sql_schema_updates = {
    0: { "next": 1, "sql": """
        -- All streams
        CREATE TABLE streams(
            id INTEGER PRIMARY KEY,       -- stream ID
            path TEXT UNIQUE NOT NULL,    -- path, e.g. '/newton/prep'
            layout TEXT NOT NULL          -- layout name, e.g. float32_8
        );

        -- Individual timestamped ranges in those streams.
        -- For a given start_time and end_time, this tells us that the
        -- data is stored between start_pos and end_pos.
        -- Times are stored as μs since Unix epoch
        -- Positions are opaque: PyTables rows, file offsets, etc.
        --
        -- Note: end_pos points to the row _after_ end_time, so end_pos-1
        -- is the last valid row.
        CREATE TABLE ranges(
            stream_id INTEGER NOT NULL,
            start_time INTEGER NOT NULL,
            end_time INTEGER NOT NULL,
            start_pos INTEGER NOT NULL,
            end_pos INTEGER NOT NULL
        );
        CREATE INDEX _ranges_index ON ranges (stream_id, start_time, end_time);
        """ },

    1: { "next": 3, "sql": """
        -- Generic dictionary-type metadata that can be associated with a stream
        CREATE TABLE metadata(
            stream_id INTEGER NOT NULL,
            key TEXT NOT NULL,
            value TEXT
        );
        """ },

    2: { "error": "old format with floating-point timestamps requires "
                  "nilmdb 1.3.1 or older" },

    3: { "next": None },
}
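
# For example: a freshly created database starts at user_version 0 and
# walks 0 -> 1 -> 3, running each "sql" script along the way; a database
# already at version 3 is left alone; and a version-2 database (the old
# floating-point timestamp format) raises an error via its "error" entry.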

@nilmdb.utils.must_close()
class NilmDB(object):
    verbose = 0

    def __init__(self, basepath, max_results=None,
                 max_removals=None, bulkdata_args=None):
        """Initialize NilmDB at the given basepath.
        Other arguments are for debugging / testing:

        'max_results' is the max rows to send in a single
        stream_intervals or stream_extract response.

        'max_removals' is the max rows to delete at once
        in stream_move.

        'bulkdata_args' is kwargs for the bulkdata module.
        """
        if bulkdata_args is None:
            bulkdata_args = {}

        # set up path
        self.basepath = os.path.abspath(basepath)

        # Create the database path if it doesn't exist
        try:
            os.makedirs(self.basepath)
        except OSError as e:
            if e.errno != errno.EEXIST: # pragma: no cover
                # (no coverage, because it's hard to trigger this case
                # if tests are run as root)
                raise IOError("can't create tree " + self.basepath)

        # Our data goes inside it
        self.data = bulkdata.BulkData(self.basepath, **bulkdata_args)

        # SQLite database too
        sqlfilename = os.path.join(self.basepath, "data.sql")
        self.con = sqlite3.connect(sqlfilename, check_same_thread=True)
        try:
            self._sql_schema_update()
        except Exception: # pragma: no cover
            self.data.close()
            raise

        # See big comment at top about the performance implications of this
        self.con.execute("PRAGMA synchronous=NORMAL")
        self.con.execute("PRAGMA journal_mode=WAL")

        # Approximate largest number of elements that we want to send
        # in a single reply (for stream_intervals, stream_extract).
        self.max_results = max_results or 16384

        # Remove up to this many rows per call to stream_remove.
        self.max_removals = max_removals or 1048576
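
    # Hypothetical usage sketch (path and layout are examples only):
    #
    #   db = NilmDB("/home/nilm/db")
    #   db.stream_create("/newton/prep", "float32_8")
    #   ...
    #   db.close()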

    def get_basepath(self):
        return self.basepath

    def close(self):
        if self.con:
            self.con.commit()
            self.con.close()
        self.data.close()

    def _sql_schema_update(self):
        cur = self.con.cursor()
        version = cur.execute("PRAGMA user_version").fetchone()[0]
        oldversion = version

        while True:
            if version not in _sql_schema_updates: # pragma: no cover
                raise Exception(self.basepath + ": unknown database version "
                                + str(version))
            update = _sql_schema_updates[version]
            if "error" in update: # pragma: no cover
                raise Exception(self.basepath + ": can't use database version "
                                + str(version) + ": " + update["error"])
            if update["next"] is None:
                break
            cur.executescript(update["sql"])
            version = update["next"]
            if self.verbose: # pragma: no cover
                printf("Database schema updated to %d\n", version)

        if version != oldversion:
            with self.con:
                cur.execute("PRAGMA user_version = {v:d}".format(v=version))

    def _check_user_times(self, start, end):
        if start is None:
            start = nilmdb.utils.time.min_timestamp
        if end is None:
            end = nilmdb.utils.time.max_timestamp
        if start >= end:
            raise NilmDBError("start must precede end")
        return (start, end)

    @nilmdb.utils.lru_cache(size=64)
    def _get_intervals(self, stream_id):
        """
        Return a mutable IntervalSet corresponding to the given stream ID.
        """
        iset = IntervalSet()
        result = self.con.execute("SELECT start_time, end_time, "
                                  "start_pos, end_pos "
                                  "FROM ranges "
                                  "WHERE stream_id=?", (stream_id,))
        try:
            for (start_time, end_time, start_pos, end_pos) in result:
                iset += DBInterval(start_time, end_time,
                                   start_time, end_time,
                                   start_pos, end_pos)
        except IntervalError: # pragma: no cover
            raise NilmDBError("unexpected overlap in ranges table!")
        return iset

    def _sql_interval_insert(self, id, start, end, start_pos, end_pos):
        """Helper that adds interval to the SQL database only"""
        self.con.execute("INSERT INTO ranges "
                         "(stream_id,start_time,end_time,start_pos,end_pos) "
                         "VALUES (?,?,?,?,?)",
                         (id, start, end, start_pos, end_pos))

    def _sql_interval_delete(self, id, start, end, start_pos, end_pos):
        """Helper that removes interval from the SQL database only"""
        self.con.execute("DELETE FROM ranges WHERE "
                         "stream_id=? AND start_time=? AND "
                         "end_time=? AND start_pos=? AND end_pos=?",
                         (id, start, end, start_pos, end_pos))

    def _add_interval(self, stream_id, interval, start_pos, end_pos):
        """
        Add interval to the internal interval cache, and to the database.
        Note: arguments must be ints (not numpy.int64, etc)
        """
        # Load this stream's intervals
        iset = self._get_intervals(stream_id)

        # Check for overlap
        if iset.intersects(interval): # pragma: no cover (gets caught earlier)
            raise NilmDBError("new interval overlaps existing data")

        # Check for adjacency.  If there's a stream in the database
        # that ends exactly when this one starts, and the database
        # rows match up, we can make one interval that covers the
        # time range [adjacent.start -> interval.end)
        # and database rows [ adjacent.start_pos -> end_pos ].
        # Only do this if the resulting interval isn't too large.
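        # (Worked example: if an existing interval covers times [0, 10)
        # in rows [0, 1000), and a new interval covering [10, 20) arrives
        # occupying rows [1000, 2000), the two merge into one interval
        # covering [0, 20) in rows [0, 2000).)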
        max_merged_rows = 8000 * 60 * 60 * 1.05 # 1.05 hours at 8 kHz
        adjacent = iset.find_end(interval.start)
        if (adjacent is not None and
            start_pos == adjacent.db_endpos and
            (end_pos - adjacent.db_startpos) < max_merged_rows):
            # First delete the old one, both from our iset and the
            # database
            iset -= adjacent
            self._sql_interval_delete(stream_id,
                                      adjacent.db_start, adjacent.db_end,
                                      adjacent.db_startpos, adjacent.db_endpos)

            # Now update our interval so the fallthrough add is
            # correct.
            interval.start = adjacent.start
            start_pos = adjacent.db_startpos

        # Add the new interval to the iset
        iset.iadd_nocheck(DBInterval(interval.start, interval.end,
                                     interval.start, interval.end,
                                     start_pos, end_pos))

        # Insert into the database
        self._sql_interval_insert(stream_id, interval.start, interval.end,
                                  int(start_pos), int(end_pos))

        self.con.commit()

    def _remove_interval(self, stream_id, original, remove):
        """
        Remove an interval from the internal cache and the database.

        stream_id: id of stream

        original: original DBInterval; must be already present in DB

        remove: DBInterval to remove; must be subset of 'original'
        """
        # Just return if we have nothing to remove
        if remove.start == remove.end: # pragma: no cover
            return

        # Load this stream's intervals
        iset = self._get_intervals(stream_id)

        # Remove existing interval from the cached set and the database
        iset -= original
        self._sql_interval_delete(stream_id,
                                  original.db_start, original.db_end,
                                  original.db_startpos, original.db_endpos)

        # Add back the intervals that would be left over if the
        # requested interval is removed.  There may be two of them, if
        # the removed piece was in the middle.
        def add(iset, start, end, start_pos, end_pos):
            iset += DBInterval(start, end, start, end, start_pos, end_pos)
            self._sql_interval_insert(stream_id, start, end,
                                      start_pos, end_pos)
        if original.start != remove.start:
            # Interval before the removed region
            add(iset, original.start, remove.start,
                original.db_startpos, remove.db_startpos)
        if original.end != remove.end:
            # Interval after the removed region
            add(iset, remove.end, original.end,
                remove.db_endpos, original.db_endpos)

        # Commit SQL changes
        self.con.commit()
        return

    def stream_list(self, path=None, layout=None, extended=False):
        """Return list of lists of all streams in the database.

        If path is specified, include only streams with a path that
        matches the given string.

        If layout is specified, include only streams with a layout
        that matches the given string.

        If extended = False, returns a list of lists containing
        the path and layout: [ path, layout ]

        If extended = True, returns a list of lists containing
        more information:
           path
           layout
           interval_min (earliest interval start)
           interval_max (latest interval end)
           rows (total number of rows of data)
           time (total time covered by this stream, in timestamp units)
        """
        params = ()
        query = "SELECT streams.path, streams.layout"
        if extended:
            query += ", min(ranges.start_time), max(ranges.end_time) "
            query += ", coalesce(sum(ranges.end_pos - ranges.start_pos), 0) "
            query += ", coalesce(sum(ranges.end_time - ranges.start_time), 0) "
        query += " FROM streams"
        if extended:
            query += " LEFT JOIN ranges ON streams.id = ranges.stream_id"
        query += " WHERE 1=1"
        if layout is not None:
            query += " AND streams.layout=?"
            params += (layout,)
        if path is not None:
            query += " AND streams.path=?"
            params += (path,)
        query += " GROUP BY streams.id ORDER BY streams.path"
        result = self.con.execute(query, params).fetchall()
        return [ list(x) for x in result ]

    def stream_intervals(self, path, start=None, end=None, diffpath=None):
        """
        List all intervals in 'path' between 'start' and 'end'.  If
        'diffpath' is not None, list instead the set-difference
        between the intervals in the two streams; i.e. all interval
        ranges that are present in 'path' but not 'diffpath'.

        Returns (intervals, restart) tuple.

        'intervals' is a list of [start,end] timestamps of all intervals
        that exist for path, between start and end.

        'restart', if not None, means that there were too many results
        to return in a single request.  The data is complete from the
        starting timestamp to the point at which it was truncated, and
        a new request with a start time of 'restart' will fetch the
        next block of data.
        """
        stream_id = self._stream_id(path)
        intervals = self._get_intervals(stream_id)
        if diffpath:
            diffstream_id = self._stream_id(diffpath)
            diffintervals = self._get_intervals(diffstream_id)
        (start, end) = self._check_user_times(start, end)
        requested = Interval(start, end)
        result = []
        if diffpath:
            getter = nilmdb.utils.interval.set_difference(
                intervals.intersection(requested),
                diffintervals.intersection(requested))
        else:
            getter = intervals.intersection(requested)
        for n, i in enumerate(getter):
            if n >= self.max_results:
                restart = i.start
                break
            result.append([i.start, i.end])
        else:
            restart = None
        return (result, restart)
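
    # Hypothetical usage sketch for the 'restart' convention (the same
    # pattern applies to stream_extract): call repeatedly, feeding the
    # returned restart value back in as the next start time, until it
    # comes back None:
    #
    #   result = []
    #   start = None
    #   while True:
    #       (chunk, restart) = db.stream_intervals("/newton/prep", start)
    #       result.extend(chunk)
    #       if restart is None:
    #           break
    #       start = restart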

    def stream_create(self, path, layout_name):
        """Create a new table in the database.

        path: path to the data (e.g. '/newton/prep').
              Paths must contain at least two elements, e.g.:
                /newton/prep
                /newton/raw
                /newton/upstairs/prep
                /newton/upstairs/raw

        layout_name: string for nilmdb.layout.get_named(), e.g. 'float32_8'
        """
        # Create the bulk storage.  Raises ValueError on error, which we
        # pass along.
        self.data.create(path, layout_name)

        # Insert into SQL database once the bulk storage is happy
        with self.con as con:
            con.execute("INSERT INTO streams (path, layout) VALUES (?,?)",
                        (path, layout_name))

    def _stream_id(self, path):
        """Return unique stream ID"""
        result = self.con.execute("SELECT id FROM streams WHERE path=?",
                                  (path,)).fetchone()
        if result is None:
            raise StreamError("No stream at path " + path)
        return result[0]

    def stream_set_metadata(self, path, data):
        """Set stream metadata from a dictionary, e.g.
        { 'description': 'Downstairs lighting',
          'v_scaling': 123.45 }
        This replaces all existing metadata.
        """
        stream_id = self._stream_id(path)
        with self.con as con:
            con.execute("DELETE FROM metadata WHERE stream_id=?", (stream_id,))
            for key in data:
                if data[key] != '':
                    con.execute("INSERT INTO metadata VALUES (?, ?, ?)",
                                (stream_id, key, data[key]))

    def stream_get_metadata(self, path):
        """Return stream metadata as a dictionary."""
        stream_id = self._stream_id(path)
        result = self.con.execute("SELECT metadata.key, metadata.value "
                                  "FROM metadata "
                                  "WHERE metadata.stream_id=?", (stream_id,))
        data = {}
        for (key, value) in result:
            data[key] = value
        return data

    def stream_update_metadata(self, path, newdata):
        """Update stream metadata from a dictionary"""
        data = self.stream_get_metadata(path)
        data.update(newdata)
        self.stream_set_metadata(path, data)

    def stream_rename(self, oldpath, newpath):
        """Rename a stream."""
        stream_id = self._stream_id(oldpath)

        # Rename the data
        self.data.rename(oldpath, newpath)

        # Rename the stream in the database
        with self.con as con:
            con.execute("UPDATE streams SET path=? WHERE id=?",
                        (newpath, stream_id))

    def stream_destroy(self, path):
        """Fully remove a table from the database.  Fails if any
        interval data is present; remove it first.  Metadata is
        also removed."""
        stream_id = self._stream_id(path)

        # Verify that no intervals are present, and clear the cache
        iset = self._get_intervals(stream_id)
        if len(iset):
            raise NilmDBError("all intervals must be removed before "
                              "destroying a stream")
        self._get_intervals.cache_remove(self, stream_id)

        # Delete the bulkdata storage
        self.data.destroy(path)

        # Delete metadata, stream, intervals (should be none)
        with self.con as con:
            con.execute("DELETE FROM metadata WHERE stream_id=?", (stream_id,))
            con.execute("DELETE FROM ranges WHERE stream_id=?", (stream_id,))
            con.execute("DELETE FROM streams WHERE id=?", (stream_id,))

    def stream_insert(self, path, start, end, data, binary=False):
        """Insert new data into the database.

        path: Path at which to add the data
        start: Starting timestamp
        end: Ending timestamp
        data: Textual data, formatted according to the layout of path

        'binary', if True, means that 'data' is raw binary:
        little-endian, matching the current table's layout,
        including the int64 timestamp.
        """
        # First check for basic overlap using timestamp info given.
        stream_id = self._stream_id(path)
        iset = self._get_intervals(stream_id)
        interval = Interval(start, end)
        if iset.intersects(interval):
            raise OverlapError("new data overlaps existing data at range: "
                               + str(iset & interval))

        # Tentatively append the data.  This will raise a ValueError if
        # there are any parse errors.
        table = self.data.getnode(path)
        row_start = table.nrows
        table.append_data(data, start, end, binary)
        row_end = table.nrows

        # Insert the record into the sql database.
        self._add_interval(stream_id, interval, row_start, row_end)

        # And that's all
        return
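
    # Hypothetical usage sketch: insert two adjacent, non-overlapping
    # blocks of data (times are microseconds since the epoch):
    #
    #   db.stream_insert("/newton/prep", 0, 1000000, block1)
    #   db.stream_insert("/newton/prep", 1000000, 2000000, block2)
    #
    # A third insert covering e.g. [500000, 1500000) would raise
    # OverlapError before any data is appended.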

    def _find_start(self, table, dbinterval):
        """
        Given a DBInterval, find the row in the database that
        corresponds to the start time.  Return the first database
        position with a timestamp (first element) greater than or
        equal to 'start'.
        """
        # Optimization for the common case where an interval wasn't truncated
        if dbinterval.start == dbinterval.db_start:
            return dbinterval.db_startpos
        return bisect.bisect_left(table,
                                  dbinterval.start,
                                  dbinterval.db_startpos,
                                  dbinterval.db_endpos)

    def _find_end(self, table, dbinterval):
        """
        Given a DBInterval, find the row in the database that follows
        the end time.  Return the first database position after the
        row with timestamp (first element) greater than or equal
        to 'end'.
        """
        # Optimization for the common case where an interval wasn't truncated
        if dbinterval.end == dbinterval.db_end:
            return dbinterval.db_endpos

        # Note that we still use bisect_left here, because we don't
        # want to include the given timestamp in the results.  This is
        # so that queries like 1:00 -> 2:00 and 2:00 -> 3:00 return
        # non-overlapping data.
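        # (For example, with row timestamps [10, 20, 20, 30] and
        # dbinterval.end == 20, bisect_left returns the position of the
        # first 20, so rows stamped 20 are excluded here and will be
        # returned by the query that starts at 20 instead.)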
        return bisect.bisect_left(table,
                                  dbinterval.end,
                                  dbinterval.db_startpos,
                                  dbinterval.db_endpos)

    def stream_extract(self, path, start=None, end=None,
                       count=False, markup=False, binary=False):
        """
        Returns (data, restart) tuple.

        'data' is ASCII-formatted data from the database, formatted
        according to the layout of the stream.

        'restart', if not None, means that there were too many results to
        return in a single request.  The data is complete from the
        starting timestamp to the point at which it was truncated,
        and a new request with a start time of 'restart' will fetch
        the next block of data.

        'count', if true, means to not return raw data, but just the count
        of rows that would have been returned.  This is much faster
        than actually fetching the data.  It is not limited by
        max_results.

        'markup', if true, indicates that returned data should be
        marked with a comment denoting when a particular interval
        starts, and another comment when an interval ends.

        'binary', if true, means to return raw binary rather than
        ASCII-formatted data.
        """
        stream_id = self._stream_id(path)
        table = self.data.getnode(path)
        intervals = self._get_intervals(stream_id)
        (start, end) = self._check_user_times(start, end)
        requested = Interval(start, end)
        result = []
        matched = 0
        remaining = self.max_results
        restart = None
        if binary and (markup or count):
            raise NilmDBError("binary mode can't be used with markup or count")
        for interval in intervals.intersection(requested):
            # Reading single rows from the table is too slow, so
            # we use two bisections to find both the starting and
            # ending row for this particular interval, then
            # read the entire range as one slice.
            row_start = self._find_start(table, interval)
            row_end = self._find_end(table, interval)

            if count:
                matched += row_end - row_start
                continue

            # Shorten it if we'll hit the maximum number of results
            row_max = row_start + remaining
            if row_max < row_end:
                row_end = row_max
                restart = table[row_max]

            # Add markup
            if markup:
                result.append("# interval-start " +
                              timestamp_to_string(interval.start) + "\n")

            # Gather these results up
            result.append(table.get_data(row_start, row_end, binary))

            # Count them
            remaining -= row_end - row_start

            # Add markup, and exit if restart is set.
            if restart is not None:
                if markup:
                    result.append("# interval-end " +
                                  timestamp_to_string(restart) + "\n")
                break
            if markup:
                result.append("# interval-end " +
                              timestamp_to_string(interval.end) + "\n")

        if count:
            return matched
        return ("".join(result), restart)

    def stream_remove(self, path, start=None, end=None):
        """
        Remove data from the specified time interval within a stream.

        Removes data in the interval [start, end), and intervals are
        truncated or split appropriately.

        Returns a (removed, restart) tuple.

        'removed' is the number of data points that were removed.

        'restart', if not None, means there were too many rows to
        remove in a single request.  This function should be called
        again with a start time of 'restart' to complete the removal.
        """
        stream_id = self._stream_id(path)
        table = self.data.getnode(path)
        intervals = self._get_intervals(stream_id)
        (start, end) = self._check_user_times(start, end)
        to_remove = Interval(start, end)
        removed = 0
        remaining = self.max_removals
        restart = None

        # Can't remove intervals from within the iterator, so we need to
        # remember what's currently in the intersection.
        all_candidates = list(intervals.intersection(to_remove, orig=True))

        remove_start = None
        remove_end = None

        for (dbint, orig) in all_candidates:
            # Find row start and end
            row_start = self._find_start(table, dbint)
            row_end = self._find_end(table, dbint)

            # Shorten it if we'll hit the maximum number of removals
            row_max = row_start + remaining
            if row_max < row_end:
                row_end = row_max
                dbint.end = table[row_max]
                restart = dbint.end

            # Adjust the DBInterval to match the newly found ends
            dbint.db_start = dbint.start
            dbint.db_end = dbint.end
            dbint.db_startpos = row_start
            dbint.db_endpos = row_end

            # Remove interval from the database
            self._remove_interval(stream_id, orig, dbint)

            # Remove data from the underlying table storage,
            # coalescing adjacent removals to reduce the number of calls
            # to table.remove.
            if remove_end == row_start:
                # Extend our coalesced region
                remove_end = row_end
            else:
                # Perform previous removal, then save this one
                if remove_end is not None:
                    table.remove(remove_start, remove_end)
                remove_start = row_start
                remove_end = row_end

            # Count how many were removed
            removed += row_end - row_start
            remaining -= row_end - row_start

            if restart is not None:
                break

        # Perform any final coalesced removal
        if remove_end is not None:
            table.remove(remove_start, remove_end)

        return (removed, restart)
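
    # Hypothetical usage sketch: since stream_remove deletes at most
    # max_removals rows per call, callers loop until restart is None:
    #
    #   removed = 0
    #   while True:
    #       (count, restart) = db.stream_remove("/newton/prep", start, end)
    #       removed += count
    #       if restart is None:
    #           break
    #       start = restart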