#!/usr/bin/env python3
# write-failure-test.py - Make sure that write failures do not corrupt the
# timewarrior database.

import sys
import os
import atexit
import filecmp
import shutil
import tempfile
import subprocess
import unittest

from basetest import Timew, TestCase
from basetest.utils import run_cmd_wait_nofail

# Ensure python finds the local simpletap module
sys.path.append(os.path.dirname(os.path.abspath(__file__)))


class WrappedTimew(Timew):
    """Timew wrapper that runs every command under ``fiu-run``.

    Each invocation is executed with randomly injected write failures so
    that the caller can check how timew behaves when writes fail.
    """

    def __init__(self):
        """Locate ``fiu-run`` and initialise the wrapped Timew instance.

        Raises:
            unittest.SkipTest: if ``fiu-run`` cannot be found on the PATH.
        """
        # Look the tool up before calling the base initialiser so that a
        # skipped test does not run the base class set-up at all.
        self._fiu_path = shutil.which("fiu-run")
        if self._fiu_path is None:
            raise unittest.SkipTest("fiu-run is not installed")

        super().__init__()

    def __call__(self, *args, **kwargs):
        """aka t = WrappedTimew() ; t() which is an alias to t.run()

        Unlike runSuccess(), a non-zero exit code is returned to the caller
        rather than being treated as an error here.
        """
        return self.run(*args, **kwargs)

    def run(self, args="", input=None, merge_streams=False, timeout=5):
        """Invoke timew with given arguments under fault injection

        If you wish to pass instructions to timew such as confirmations or
        other input via stdin, you can do so by providing a input string.
        Such as input="y\ny\n".

        If merge_streams=True stdout and stderr will be merged into stdout.

        timeout = number of seconds the test will wait for every timew call.
        Defaults to 5 seconds if not specified. Unit is seconds.

        Returns (exit_code, stdout, stderr) if merge_streams=False
                (exit_code, output) if merge_streams=True
        """
        # This will run timewarrior with a random chance that fputs, write,
        # pwrite, writev or pwritev will return error 28, "No space left on
        # device", in order to simulate running out of space on the drive the
        # database is on. It is random in order to catch different places in
        # the code that are writing as part of the update (tags, undo,
        # datafiles, configs, etc..)
        command = [
            self._fiu_path,
            "-x",
            "-c", "enable_random name=posix/stdio/gp/fputs,probability=0.05,failinfo=28",
            "-c", "enable_random name=posix/io/rw/write,probability=0.05,failinfo=28",
            "-c", "enable_random name=posix/io/rw/pwrite,probability=0.05,failinfo=28",
            "-c", "enable_random name=posix/io/rw/writev,probability=0.05,failinfo=28",
            "-c", "enable_random name=posix/io/rw/pwritev,probability=0.05,failinfo=28",
        ]

        command.extend(self._command[:])

        args = self._split_string_args_if_string(args)
        command.extend(args)

        return run_cmd_wait_nofail(command, input,
                                   merge_streams=merge_streams,
                                   env=self.env,
                                   timeout=timeout)


class TestWriteFailure(TestCase):
    """Check that failed timew invocations leave the database untouched."""

    def setUp(self):
        """Create the fault-injecting timew and the golden copy directory."""
        # Build the wrapper first: it may raise SkipTest, and tearDown() is
        # not run when setUp() raises, so creating the temporary directory
        # beforehand would leak it on every skipped run.
        self.t = WrappedTimew()
        self._goldendir = tempfile.mkdtemp(prefix="timew_golden_")

    def tearDown(self):
        """Remove the golden copy of the database."""
        # Best effort: the directory may be missing if save_database() failed
        # between removing the old copy and writing the new one.
        shutil.rmtree(self._goldendir, ignore_errors=True)

    def save_database(self):
        """Replace the golden copy with the current contents of the datadir."""
        # copytree() requires that the destination does not exist yet.
        shutil.rmtree(self._goldendir)
        shutil.copytree(self.t.datadir, self._goldendir)

    def compare_dirs(self, dir1, dir2):
        """Recursively compare two directory trees.

        Returns True when both trees contain the same entries and
        filecmp.dircmp reports no differing or uncomparable files, and False
        otherwise.
        """
        compared = filecmp.dircmp(dir1, dir2)

        if (compared.left_only or compared.right_only
                or compared.diff_files or compared.funny_files):
            return False

        # dircmp only looks at one level, so descend into shared
        # subdirectories explicitly.
        return all(
            self.compare_dirs(os.path.join(dir1, subdir),
                              os.path.join(dir2, subdir))
            for subdir in compared.common_dirs
        )

    def test_write_failures_do_not_corrupt_database(self):
        """write failures shall not corrupt the database"""

        # First prepopulate the database
        start = 10
        for x in range(start, 0, -1):
            self.t.runSuccess("track tag{0} {1}min ago".format((start + 1)-x, x))

        self.save_database()

        # Now start in with the failures and make sure the database compares
        # equally
        TAG_COUNT = 30
        MAX_ERROR_COUNT = 250
        error_count = 0
        success_count = 0

        for x in range(TAG_COUNT, 0, -1):
            # Retry the same command until it succeeds or the error budget is
            # used up.
            while True:
                exitcode, _stdout, _stderr = self.t(
                    "track tag-{0} {1}s ago".format((TAG_COUNT+1)-x, x))

                if exitcode == 0:
                    success_count += 1
                    break

                error_count += 1

                # Every failed run must leave the database identical to the
                # golden copy, including the run that exhausts the budget.
                if not self.compare_dirs(self.t.datadir, self._goldendir):
                    filecmp.dircmp(self.t.datadir, self._goldendir).report_full_closure()
                    self.fail("{0} not equal to {1}".format(self.t.datadir, self._goldendir))

                if error_count >= MAX_ERROR_COUNT:
                    break

            if error_count >= MAX_ERROR_COUNT:
                break

            # The command succeeded, so the new state becomes the reference.
            self.save_database()

        # The database must still be readable without fault injection.
        self.t.runSuccess("export")
        print("Test pass. Timew returned an error on {0} runs and none on {1} runs.".format(error_count, success_count))


if __name__ == "__main__":
    from simpletap import TAPTestRunner
    unittest.main(testRunner=TAPTestRunner())
    #unittest.main()