test: Add write-failure.t

This test uses libfiu to create random failures in fputs and the write system
calls to verify out-of-space errors on the filesystem do not corrupt the
database.

Related to #155
This commit is contained in:
Shaun Ruffell 2020-02-16 12:52:16 -06:00 committed by lauft
parent 4798e6f26b
commit 926ecd7333

144
test/write-failure.t Executable file
View file

@ -0,0 +1,144 @@
#!/usr/bin/env python3
# write-failure-test.py - Make sure the test will fail t
import sys
import os
import atexit
import filecmp
import shutil
import tempfile
import subprocess
import unittest
from basetest import Timew, TestCase
from basetest.utils import run_cmd_wait_nofail
# Ensure python finds the local simpletap module
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
class WrappedTimew(Timew):
def __init__(self):
try:
self._fiu_path = subprocess.check_output("command -v fiu-run", shell=True).strip()
except:
raise unittest.SkipTest("fiu-run is not installed")
super().__init__()
def __call__(self, *args, **kwargs):
"""aka t = Timew() ; t() which is now an alias to t.runSuccess()"""
return self.run(*args, **kwargs)
def run(self, args="", input=None, merge_streams=False, timeout=5):
"""Invoke timew with given arguments
If you wish to pass instructions to timew such as confirmations or other
input via stdin, you can do so by providing a input string.
Such as input="y\ny\n".
If merge_streams=True stdout and stderr will be merged into stdout.
timeout = number of seconds the test will wait for every timew call.
Defaults to 1 second if not specified. Unit is seconds.
Returns (exit_code, stdout, stderr) if merge_streams=False
(exit_code, output) if merge_streams=True
"""
# This will run timewarrior with a random chance that fputs will return
# error 28, "No space left on device", in order to simulate running out
# of space on the drive the database is on. It is random in order to
# catch different places in the code that are writing as part of the
# update (tags, undo, datafiles, configs, etc..)
command = [
self._fiu_path, "-x",
"-c", "enable_random name=posix/stdio/gp/fputs,probability=0.05,failinfo=28",
"-c", "enable_random name=posix/io/rw/write,probability=0.05,failinfo=28",
"-c", "enable_random name=posix/io/rw/pwrite,probability=0.05,failinfo=28",
"-c", "enable_random name=posix/io/rw/writev,probability=0.05,failinfo=28",
"-c", "enable_random name=posix/io/rw/pwritev,probability=0.05,failinfo=28",
]
command.extend(self._command[:])
args = self._split_string_args_if_string(args)
command.extend(args)
return run_cmd_wait_nofail(command, input,
merge_streams=merge_streams,
env=self.env,
timeout=timeout)
class TestWriteFailure(TestCase):
def setUp(self):
self._goldendir = tempfile.mkdtemp(prefix="timew_golden_")
self.t = WrappedTimew()
def tearDown(self):
shutil.rmtree(self._goldendir)
def save_database(self):
shutil.rmtree(self._goldendir)
shutil.copytree(self.t.datadir, self._goldendir)
def compare_dirs(self, dir1, dir2):
compared = filecmp.dircmp(dir1, dir2)
if (compared.left_only or compared.right_only or
compared.diff_files or compared.funny_files):
return False
for subdir in compared.common_dirs:
if not self.compare_dirs(os.path.join(dir1, subdir),
os.path.join(dir2, subdir)):
return False
return True
def test_write_failures_do_not_corrupt_database(self):
"""write failures shall not corrupt the database"""
# First prepopulate the database
start = 10
for x in range(start, 0, -1):
self.t.runSuccess("track tag{0} {1}min ago".format((start + 1)-x, x))
self.save_database()
# Now start in with the failures and make sure the database compares
# equally
TAG_COUNT = 30
error_count = 0
success_count = 0
MAX_ERROR_COUNT=250
for x in range(TAG_COUNT, 0, -1):
while True:
exitcode, stdout, stderr = self.t("track tag-{0} {1}s ago".format((TAG_COUNT+1)-x, x))
if exitcode != 0:
error_count += 1
if error_count >= MAX_ERROR_COUNT:
break
if not self.compare_dirs(self.t.datadir, self._goldendir):
filecmp.dircmp(self.t.datadir, self._goldendir).report_full_closure()
self.fail("{0} not equal to {1}".format(self.t.datadir, self._goldendir))
else:
success_count += 1
break
if error_count >= MAX_ERROR_COUNT:
break
self.save_database()
self.t.runSuccess("export")
print("Test pass. Timew returned an error on {0} runs and none on {1} runs.".format(error_count, success_count))
if __name__ == "__main__":
from simpletap import TAPTestRunner
unittest.main(testRunner=TAPTestRunner())
#unittest.main()