diff --git a/test/write-failure.t b/test/write-failure.t new file mode 100755 index 00000000..27101ee6 --- /dev/null +++ b/test/write-failure.t @@ -0,0 +1,144 @@ +#!/usr/bin/env python3 +# write-failure-test.py - Make sure the test will fail t + +import sys +import os + +import atexit +import filecmp +import shutil +import tempfile +import subprocess +import unittest + +from basetest import Timew, TestCase +from basetest.utils import run_cmd_wait_nofail + +# Ensure python finds the local simpletap module +sys.path.append(os.path.dirname(os.path.abspath(__file__))) + +class WrappedTimew(Timew): + def __init__(self): + + try: + self._fiu_path = subprocess.check_output("command -v fiu-run", shell=True).strip() + except: + raise unittest.SkipTest("fiu-run is not installed") + + super().__init__() + + def __call__(self, *args, **kwargs): + """aka t = Timew() ; t() which is now an alias to t.runSuccess()""" + return self.run(*args, **kwargs) + + def run(self, args="", input=None, merge_streams=False, timeout=5): + """Invoke timew with given arguments + + If you wish to pass instructions to timew such as confirmations or other + input via stdin, you can do so by providing a input string. + Such as input="y\ny\n". + + If merge_streams=True stdout and stderr will be merged into stdout. + + timeout = number of seconds the test will wait for every timew call. + Defaults to 1 second if not specified. Unit is seconds. + + Returns (exit_code, stdout, stderr) if merge_streams=False + (exit_code, output) if merge_streams=True + """ + + # This will run timewarrior with a random chance that fputs will return + # error 28, "No space left on device", in order to simulate running out + # of space on the drive the database is on. It is random in order to + # catch different places in the code that are writing as part of the + # update (tags, undo, datafiles, configs, etc..) + command = [ + self._fiu_path, "-x", + "-c", "enable_random name=posix/stdio/gp/fputs,probability=0.05,failinfo=28", + "-c", "enable_random name=posix/io/rw/write,probability=0.05,failinfo=28", + "-c", "enable_random name=posix/io/rw/pwrite,probability=0.05,failinfo=28", + "-c", "enable_random name=posix/io/rw/writev,probability=0.05,failinfo=28", + "-c", "enable_random name=posix/io/rw/pwritev,probability=0.05,failinfo=28", + ] + + command.extend(self._command[:]) + + args = self._split_string_args_if_string(args) + command.extend(args) + + return run_cmd_wait_nofail(command, input, + merge_streams=merge_streams, + env=self.env, + timeout=timeout) + + +class TestWriteFailure(TestCase): + + def setUp(self): + self._goldendir = tempfile.mkdtemp(prefix="timew_golden_") + self.t = WrappedTimew() + + def tearDown(self): + shutil.rmtree(self._goldendir) + + def save_database(self): + shutil.rmtree(self._goldendir) + shutil.copytree(self.t.datadir, self._goldendir) + + def compare_dirs(self, dir1, dir2): + compared = filecmp.dircmp(dir1, dir2) + if (compared.left_only or compared.right_only or + compared.diff_files or compared.funny_files): + return False + + for subdir in compared.common_dirs: + if not self.compare_dirs(os.path.join(dir1, subdir), + os.path.join(dir2, subdir)): + return False + + return True + + def test_write_failures_do_not_corrupt_database(self): + """write failures shall not corrupt the database""" + + # First prepopulate the database + start = 10 + for x in range(start, 0, -1): + self.t.runSuccess("track tag{0} {1}min ago".format((start + 1)-x, x)) + + self.save_database() + + # Now start in with the failures and make sure the database compares + # equally + TAG_COUNT = 30 + error_count = 0 + success_count = 0 + MAX_ERROR_COUNT=250 + for x in range(TAG_COUNT, 0, -1): + while True: + exitcode, stdout, stderr = self.t("track tag-{0} {1}s ago".format((TAG_COUNT+1)-x, x)) + if exitcode != 0: + error_count += 1 + if error_count >= MAX_ERROR_COUNT: + break + + if not self.compare_dirs(self.t.datadir, self._goldendir): + filecmp.dircmp(self.t.datadir, self._goldendir).report_full_closure() + self.fail("{0} not equal to {1}".format(self.t.datadir, self._goldendir)) + else: + success_count += 1 + break + + if error_count >= MAX_ERROR_COUNT: + break + + self.save_database() + + self.t.runSuccess("export") + print("Test pass. Timew returned an error on {0} runs and none on {1} runs.".format(error_count, success_count)) + +if __name__ == "__main__": + from simpletap import TAPTestRunner + + unittest.main(testRunner=TAPTestRunner()) + #unittest.main()