Compare commits
	
		
			6 Commits
		
	
	
		
			276fbc652a
			...
			e0559c2ed1
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| e0559c2ed1 | |||
| 759492298a | |||
| b5f6fcc253 | |||
| 905e325ded | |||
| 648b6f4b70 | |||
| 7f8a2c7027 | 
@@ -39,6 +39,10 @@ class RetryFsck(FsckError):
 | 
				
			|||||||
    pass
 | 
					    pass
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class FsckFormatError(FsckError):
 | 
				
			||||||
 | 
					    pass
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def log(format, *args):
 | 
					def log(format, *args):
 | 
				
			||||||
    printf(format, *args)
 | 
					    printf(format, *args)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -238,16 +242,28 @@ class Fsck(object):
 | 
				
			|||||||
                                            "set: %s\nnew: %s",
 | 
					                                            "set: %s\nnew: %s",
 | 
				
			||||||
                                            path, str(posiset), str(new))
 | 
					                                            path, str(posiset), str(new))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                # check bulkdata
 | 
					 | 
				
			||||||
                self.check_bulkdata(sid, path, bulk)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                # Check that we can open bulkdata
 | 
					 | 
				
			||||||
                try:
 | 
					                try:
 | 
				
			||||||
 | 
					                    # Check bulkdata
 | 
				
			||||||
 | 
					                    self.check_bulkdata(sid, path, bulk)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                    # Check that we can open bulkdata
 | 
				
			||||||
                    tab = nilmdb.server.bulkdata.Table(bulk)
 | 
					                    tab = nilmdb.server.bulkdata.Table(bulk)
 | 
				
			||||||
                except Exception as e:  # pragma: no cover --
 | 
					                except FsckFormatError as e:
 | 
				
			||||||
                    # No coverage here because, in the current code,
 | 
					                    # If there are no files except _format, try deleting
 | 
				
			||||||
                    # everything that would cause the bulkdata to fail
 | 
					                    # the entire stream; this may remove metadata, but
 | 
				
			||||||
                    # has been already checked.
 | 
					                    # it's probably unimportant.
 | 
				
			||||||
 | 
					                    files = list(os.listdir(bulk))
 | 
				
			||||||
 | 
					                    if len(files) > 1:
 | 
				
			||||||
 | 
					                        raise FsckFormatError(f"{path}: can't load _format, "
 | 
				
			||||||
 | 
					                                              f"but data is also present")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                    # Since the stream was empty, just remove it
 | 
				
			||||||
 | 
					                    self.fix_remove_stream(sid, path, bulk,
 | 
				
			||||||
 | 
					                                           "empty, with corrupted format file")
 | 
				
			||||||
 | 
					                except FsckError as e:
 | 
				
			||||||
 | 
					                    raise e
 | 
				
			||||||
 | 
					                except Exception as e: # pragma: no cover
 | 
				
			||||||
 | 
					                    # No coverage because this is an unknown/unexpected error
 | 
				
			||||||
                    raise FsckError("%s: can't open bulkdata: %s",
 | 
					                    raise FsckError("%s: can't open bulkdata: %s",
 | 
				
			||||||
                                    path, str(e))
 | 
					                                    path, str(e))
 | 
				
			||||||
                tab.close()
 | 
					                tab.close()
 | 
				
			||||||
@@ -256,21 +272,25 @@ class Fsck(object):
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    @retry_if_raised(RetryFsck)
 | 
					    @retry_if_raised(RetryFsck)
 | 
				
			||||||
    def check_bulkdata(self, sid, path, bulk):
 | 
					    def check_bulkdata(self, sid, path, bulk):
 | 
				
			||||||
        with open(os.path.join(bulk, b"_format"), "rb") as f:
 | 
					        try:
 | 
				
			||||||
            fmt = pickle.load(f)
 | 
					            with open(os.path.join(bulk, b"_format"), "rb") as f:
 | 
				
			||||||
 | 
					                fmt = pickle.load(f)
 | 
				
			||||||
 | 
					        except Exception as e:
 | 
				
			||||||
 | 
					            raise FsckFormatError(f"{path}: can't load _format file ({e})")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if fmt["version"] != 3:
 | 
					        if fmt["version"] != 3:
 | 
				
			||||||
            raise FsckError("%s: bad or unsupported bulkdata version %d",
 | 
					            raise FsckFormatError("%s: bad or unsupported bulkdata version %d",
 | 
				
			||||||
                            path, fmt["version"])
 | 
					                                  path, fmt["version"])
 | 
				
			||||||
        rows_per_file = int(fmt["rows_per_file"])
 | 
					        rows_per_file = int(fmt["rows_per_file"])
 | 
				
			||||||
        if rows_per_file < 1:
 | 
					        if rows_per_file < 1:
 | 
				
			||||||
            raise FsckError(f"{path}: bad rows_per_file {rows_per_file}")
 | 
					            raise FsckFormatError(f"{path}: bad rows_per_file {rows_per_file}")
 | 
				
			||||||
        files_per_dir = int(fmt["files_per_dir"])
 | 
					        files_per_dir = int(fmt["files_per_dir"])
 | 
				
			||||||
        if files_per_dir < 1:
 | 
					        if files_per_dir < 1:
 | 
				
			||||||
            raise FsckError(f"{path}: bad files_per_dir {files_per_dir}")
 | 
					            raise FsckFormatError(f"{path}: bad files_per_dir {files_per_dir}")
 | 
				
			||||||
        layout = fmt["layout"]
 | 
					        layout = fmt["layout"]
 | 
				
			||||||
        if layout != self.stream_layout[sid]:
 | 
					        if layout != self.stream_layout[sid]:
 | 
				
			||||||
            raise FsckError("%s: layout mismatch %s != %s", path,
 | 
					            raise FsckFormatError("%s: layout mismatch %s != %s", path,
 | 
				
			||||||
                            layout, self.stream_layout[sid])
 | 
					                                  layout, self.stream_layout[sid])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # Every file should have a size that's the multiple of the row size
 | 
					        # Every file should have a size that's the multiple of the row size
 | 
				
			||||||
        rkt = nilmdb.server.rocket.Rocket(layout, None)
 | 
					        rkt = nilmdb.server.rocket.Rocket(layout, None)
 | 
				
			||||||
@@ -287,7 +307,7 @@ class Fsck(object):
 | 
				
			|||||||
            files = list(filter(regex.search, os.listdir(subpath)))
 | 
					            files = list(filter(regex.search, os.listdir(subpath)))
 | 
				
			||||||
            if not files:
 | 
					            if not files:
 | 
				
			||||||
                self.fix_empty_subdir(subpath)
 | 
					                self.fix_empty_subdir(subpath)
 | 
				
			||||||
                raise RetryFsck  # pragma: no cover; raised by fix_empty_subdir
 | 
					
 | 
				
			||||||
            # Verify that their size is a multiple of the row size
 | 
					            # Verify that their size is a multiple of the row size
 | 
				
			||||||
            for filename in files:
 | 
					            for filename in files:
 | 
				
			||||||
                filepath = os.path.join(subpath, filename)
 | 
					                filepath = os.path.join(subpath, filename)
 | 
				
			||||||
@@ -328,6 +348,24 @@ class Fsck(object):
 | 
				
			|||||||
            f.truncate(newsize)
 | 
					            f.truncate(newsize)
 | 
				
			||||||
            raise RetryFsck
 | 
					            raise RetryFsck
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def fix_remove_stream(self, sid, path, bulk, reason):
 | 
				
			||||||
 | 
					        msg = f"stream {path} is corrupted: {reason}"
 | 
				
			||||||
 | 
					        if not self.fix:
 | 
				
			||||||
 | 
					            raise FixableFsckError(msg)
 | 
				
			||||||
 | 
					        # Remove the stream from disk and the database
 | 
				
			||||||
 | 
					        err(f"\n{msg}\n")
 | 
				
			||||||
 | 
					        err(f"Removing stream {path} from disk and database\n")
 | 
				
			||||||
 | 
					        shutil.rmtree(bulk)
 | 
				
			||||||
 | 
					        with self.sql:
 | 
				
			||||||
 | 
					            cur = self.sql.cursor()
 | 
				
			||||||
 | 
					            cur.execute("DELETE FROM streams WHERE id=?",
 | 
				
			||||||
 | 
					                        (sid,))
 | 
				
			||||||
 | 
					            if cur.rowcount != 1:  # pragma: no cover (shouldn't fail)
 | 
				
			||||||
 | 
					                raise FsckError("failed to remove stream")
 | 
				
			||||||
 | 
					            cur.execute("DELETE FROM ranges WHERE stream_id=?", (sid,))
 | 
				
			||||||
 | 
					            cur.execute("DELETE FROM metadata WHERE stream_id=?", (sid,))
 | 
				
			||||||
 | 
					        raise RetryFsck
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    ### Check interval endpoints
 | 
					    ### Check interval endpoints
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def check_intervals(self):
 | 
					    def check_intervals(self):
 | 
				
			||||||
@@ -364,7 +402,7 @@ class Fsck(object):
 | 
				
			|||||||
                erow = tab[epos-1]  # noqa: F841 unused
 | 
					                erow = tab[epos-1]  # noqa: F841 unused
 | 
				
			||||||
            except Exception as e:
 | 
					            except Exception as e:
 | 
				
			||||||
                self.fix_bad_interval(sid, intv, tab, str(e))
 | 
					                self.fix_bad_interval(sid, intv, tab, str(e))
 | 
				
			||||||
                raise RetryFsck  # pragma: no cover; raised by fix_bad_interval
 | 
					
 | 
				
			||||||
        return len(ints)
 | 
					        return len(ints)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def fix_bad_interval(self, sid, intv, tab, msg):
 | 
					    def fix_bad_interval(self, sid, intv, tab, msg):
 | 
				
			||||||
@@ -401,8 +439,8 @@ class Fsck(object):
 | 
				
			|||||||
        # Otherwise, the only hope is to delete the interval entirely.
 | 
					        # Otherwise, the only hope is to delete the interval entirely.
 | 
				
			||||||
        err("*** Deleting the entire interval from SQL.\n")
 | 
					        err("*** Deleting the entire interval from SQL.\n")
 | 
				
			||||||
        err("This may leave stale data on disk.  To fix that, copy all\n")
 | 
					        err("This may leave stale data on disk.  To fix that, copy all\n")
 | 
				
			||||||
        err("data from this stream to a new stream, then remove all data\n")
 | 
					        err("data from this stream to a new stream using nilm-copy, then\n")
 | 
				
			||||||
        err("from and destroy %s.\n", path)
 | 
					        err("remove all data from and destroy %s.\n", path)
 | 
				
			||||||
        with self.sql:
 | 
					        with self.sql:
 | 
				
			||||||
            cur = self.sql.cursor()
 | 
					            cur = self.sql.cursor()
 | 
				
			||||||
            cur.execute("DELETE FROM ranges WHERE "
 | 
					            cur.execute("DELETE FROM ranges WHERE "
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -293,8 +293,8 @@ class Table():
 | 
				
			|||||||
            "layout": layout,
 | 
					            "layout": layout,
 | 
				
			||||||
            "version": 3
 | 
					            "version": 3
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        with open(os.path.join(root, b"_format"), "wb") as f:
 | 
					        nilmdb.utils.atomic.replace_file(
 | 
				
			||||||
            pickle.dump(fmt, f, 2)
 | 
					            os.path.join(root, b"_format"), pickle.dumps(fmt, 2))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # Normal methods
 | 
					    # Normal methods
 | 
				
			||||||
    def __init__(self, root, initial_nrows=0):
 | 
					    def __init__(self, root, initial_nrows=0):
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -163,3 +163,8 @@ class TestFsck(object):
 | 
				
			|||||||
            raise Exception("hi")
 | 
					            raise Exception("hi")
 | 
				
			||||||
        with assert_raises(Exception):
 | 
					        with assert_raises(Exception):
 | 
				
			||||||
            foo()
 | 
					            foo()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        self.failmsg("test2v", "can't load _format, but data is also present")
 | 
				
			||||||
 | 
					        self.failmsg("test2v1", "bad bulkdata table")
 | 
				
			||||||
 | 
					        self.failmsg("test2v2", "empty, with corrupted format file", fix=False)
 | 
				
			||||||
 | 
					        self.okmsg("test2v2", "empty, with corrupted format file")
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user