Tests: Inherited basetest class from Taskwarrior

This commit is contained in:
Paul Beckingham 2016-03-01 00:21:22 -05:00
parent dc42d2038e
commit 84e2579e03
7 changed files with 731 additions and 0 deletions

View file

@ -0,0 +1,7 @@
# -*- coding: utf-8 -*-
from .testing import TestCase
from .timew import Timew
# flake8:noqa
# vim: ai sts=4 et sw=4

9
test/basetest/compat.py Normal file
View file

@ -0,0 +1,9 @@
# -*- coding: utf-8 -*-
try:
STRING_TYPE = basestring
except NameError:
# Python 3
STRING_TYPE = str
# vim: ai sts=4 et sw=4

View file

@ -0,0 +1,55 @@
# -*- coding: utf-8 -*-
import signal
sig_names = dict((k, v) for v, k in reversed(sorted(signal.__dict__.items()))
if v.startswith('SIG') and not v.startswith('SIG_'))
class CommandError(Exception):
def __init__(self, cmd, code, out, err=None, msg=None):
DEFAULT = ("Command '{{0}}' was {signal}'ed. "
"SIGABRT usually means program timed out.\n")
if msg is None:
msg_suffix = "\n*** Start STDOUT ***\n{2}\n*** End STDOUT ***\n"
if err is not None:
msg_suffix += (
"\n*** Start STDERR ***\n{3}\n*** End STDERR ***\n"
)
if code < 0:
self.msg = DEFAULT.format(signal=sig_names[abs(code)])
else:
self.msg = ("Command '{0}' finished with unexpected exit "
"code '{1}'.\n")
self.msg += msg_suffix
else:
self.msg = msg
self.cmd = cmd
self.out = out
self.err = err
self.code = code
def __str__(self):
return self.msg.format(self.cmd, self.code, self.out, self.err)
class HookError(Exception):
pass
class TimeoutWaitingFor(object):
def __init__(self, name):
self.name = name
def __repr__(self):
return "*** Timeout reached while waiting for {0} ***".format(
self.name)
class StreamsAreMerged(object):
def __repr__(self):
return "*** Streams are merged, STDERR is not available ***"
# vim: ai sts=4 et sw=4

40
test/basetest/meta.py Normal file
View file

@ -0,0 +1,40 @@
# -*- coding: utf-8 -*-
from __future__ import print_function, division
class MetaTest(type):
"""Helper metaclass to simplify dynamic test creation
Creates test_methods in the TestCase class dynamically named after the
arguments used.
"""
@staticmethod
def make_function(classname, *args, **kwargs):
def test(self):
# ### Body of the usual test_testcase ### #
# Override and redefine this method #
pass
# Title of test in report
test.__doc__ = "{0}".format(args[0])
return test
def __new__(meta, classname, bases, dct):
tests = dct.get("TESTS")
kwargs = dct.get("EXTRA", {})
for i, args in enumerate(tests):
func = meta.make_function(classname, *args, **kwargs)
# Rename the function after a unique identifier
# Name of function must start with test_ to be ran by unittest
func.__name__ = "test_{0}".format(i)
# Attach the new test to the testclass
dct[func.__name__] = func
return super(MetaTest, meta).__new__(meta, classname, bases, dct)
# vim: ai sts=4 et sw=4

16
test/basetest/testing.py Normal file
View file

@ -0,0 +1,16 @@
# -*- coding: utf-8 -*-
import unittest
import sys
class BaseTestCase(unittest.TestCase):
def tap(self, out):
sys.stderr.write("--- tap output start ---\n")
for line in out.splitlines():
sys.stderr.write(line + '\n')
sys.stderr.write("--- tap output end ---\n")
class TestCase(BaseTestCase):
pass
# vim: ai sts=4 et sw=4

212
test/basetest/timew.py Normal file
View file

@ -0,0 +1,212 @@
# -*- coding: utf-8 -*-
import atexit
import json
import os
import shlex
import shutil
import tempfile
import unittest
from .exceptions import CommandError
from .utils import run_cmd_wait, run_cmd_wait_nofail, which, timew_binary_location
from .compat import STRING_TYPE
class Timew(object):
"""Manage a Timewarrior instance
A temporary folder is used as data store of timewarrior.
A timew client should not be used after being destroyed.
"""
DEFAULT_TIMEW = timew_binary_location()
def __init__(self, timew=DEFAULT_TIMEW):
"""Initialize a timewarrior (client).
The program runs in a temporary folder.
:arg timew: Timewarrior binary to use as client (defaults: timew in PATH)
"""
self.timew = timew
# Used to specify what command to launch (and to inject faketime)
self._command = [self.timew]
# Configuration of the isolated environment
self._original_pwd = os.getcwd()
self.datadir = tempfile.mkdtemp(prefix="timew_")
self.timewrc = os.path.join(self.datadir, "test.rc")
# Ensure any instance is properly destroyed at session end
atexit.register(lambda: self.destroy())
self.reset_env()
def __repr__(self):
txt = super(Timew, self).__repr__()
return "{0} running from {1}>".format(txt[:-1], self.datadir)
def __call__(self, *args, **kwargs):
"aka t = Timew() ; t() which is now an alias to t.runSuccess()"
return self.runSuccess(*args, **kwargs)
def activate_hooks(self):
"""Enable self.hooks functionality and activate hooks on config
"""
self.config("hooks", "on")
self.hooks = Hooks(self.datadir)
def reset_env(self):
"""Set a new environment derived from the one used to launch the test
"""
# Copy all env variables to avoid clashing subprocess environments
self.env = os.environ.copy()
# As well as TIMEWRC
self.env["TIMEWRC"] = self.timewrc
def config(self, var, value):
"""Run setup `var` as `value` in timew config
"""
# Add -- to avoid misinterpretation of - in things like UUIDs
cmd = (self.timew, "config", "--", var, value)
return run_cmd_wait(cmd, env=self.env, input="y\n")
def del_config(self, var):
"""Remove `var` from timew config
"""
cmd = (self.timew, "config", var)
return run_cmd_wait(cmd, env=self.env, input="y\n")
@property
def timewrc_content(self):
"""
Returns the contents of the timewrc file.
"""
with open(self.timewrc, "r") as f:
return f.readlines()
@staticmethod
def _split_string_args_if_string(args):
"""Helper function to parse and split into arguments a single string
argument. The string is literally the same as if written in the shell.
"""
# Enable nicer-looking calls by allowing plain strings
if isinstance(args, STRING_TYPE):
args = shlex.split(args)
return args
def runSuccess(self, args="", input=None, merge_streams=False,
timeout=5):
"""Invoke timew with given arguments and fail if exit code != 0
Use runError if you want exit_code to be tested automatically and
*not* fail if program finishes abnormally.
If you wish to pass instructions to timew such as confirmations or other
input via stdin, you can do so by providing a input string.
Such as input="y\ny\n".
If merge_streams=True stdout and stderr will be merged into stdout.
timeout = number of seconds the test will wait for every timew call.
Defaults to 1 second if not specified. Unit is seconds.
Returns (exit_code, stdout, stderr) if merge_streams=False
(exit_code, output) if merge_streams=True
"""
# Create a copy of the command
command = self._command[:]
args = self._split_string_args_if_string(args)
command.extend(args)
output = run_cmd_wait_nofail(command, input,
merge_streams=merge_streams,
env=self.env,
timeout=timeout)
if output[0] != 0:
raise CommandError(command, *output)
return output
def runError(self, args=(), input=None, merge_streams=False, timeout=5):
"""Invoke timew with given arguments and fail if exit code == 0
Use runSuccess if you want exit_code to be tested automatically and
*fail* if program finishes abnormally.
If you wish to pass instructions to timew such as confirmations or other
input via stdin, you can do so by providing a input string.
Such as input="y\ny\n".
If merge_streams=True stdout and stderr will be merged into stdout.
timeout = number of seconds the test will wait for every timew call.
Defaults to 1 second if not specified. Unit is seconds.
Returns (exit_code, stdout, stderr) if merge_streams=False
(exit_code, output) if merge_streams=True
"""
# Create a copy of the command
command = self._command[:]
args = self._split_string_args_if_string(args)
command.extend(args)
output = run_cmd_wait_nofail(command, input,
merge_streams=merge_streams,
env=self.env,
timeout=timeout)
# output[0] is the exit code
if output[0] == 0 or output[0] is None:
raise CommandError(command, *output)
return output
def destroy(self):
"""Cleanup the data folder and release server port for other instances
"""
try:
shutil.rmtree(self.datadir)
except OSError as e:
if e.errno == 2:
# Directory no longer exists
pass
else:
raise
# Prevent future reuse of this instance
self.runSuccess = self.__destroyed
self.runError = self.__destroyed
# self.destroy will get called when the python session closes.
# If self.destroy was already called, turn the action into a noop
self.destroy = lambda: None
def __destroyed(self, *args, **kwargs):
raise AttributeError("Program instance has been destroyed. "
"Create a new instance if you need a new client.")
def faketime(self, faketime=None):
"""Set a faketime using libfaketime that will affect the following
command calls.
If faketime is None, faketime settings will be disabled.
"""
cmd = which("faketime")
if cmd is None:
raise unittest.SkipTest("libfaketime/faketime is not installed")
if self._command[0] == cmd:
self._command = self._command[3:]
if faketime is not None:
# Use advanced time format
self._command = [cmd, "-f", faketime] + self._command
# vim: ai sts=4 et sw=4

392
test/basetest/utils.py Normal file
View file

@ -0,0 +1,392 @@
# -*- coding: utf-8 -*-
from __future__ import division
import os
import sys
import socket
import signal
import functools
import atexit
import tempfile
from subprocess import Popen, PIPE, STDOUT
from threading import Thread
try:
from Queue import Queue, Empty
except ImportError:
from queue import Queue, Empty
from time import sleep
try:
import simplejson as json
except ImportError:
import json
from .exceptions import CommandError, TimeoutWaitingFor
ON_POSIX = 'posix' in sys.builtin_module_names
# Directory relative to basetest module location
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
# Location of binary files (usually the src/ folder)
BIN_PREFIX = os.path.abspath(
os.path.join(CURRENT_DIR, "..", "..", "src")
)
# Default location of test certificates
DEFAULT_CERT_PATH = os.path.abspath(
os.path.join(CURRENT_DIR, "..", "test_certs")
)
# Default location of test hooks
DEFAULT_HOOK_PATH = os.path.abspath(
os.path.join(CURRENT_DIR, "..", "test_hooks")
)
# Environment flags to control skipping of timew tests
TIMEW_SKIP = os.environ.get("TIMEW_SKIP", False)
# Environment flags to control use of PATH or in-tree binaries
TIMEW_USE_PATH = os.environ.get("TIMEW_USE_PATH", False)
UUID_REGEXP = ("[0-9A-Fa-f]{8}-" + ("[0-9A-Fa-f]{4}-" * 3) + "[0-9A-Fa-f]{12}")
def timew_binary_location(cmd="timew"):
""" ../src/ is used by default.
"""
return os.path.join(BIN_PREFIX, cmd)
return binary_location(cmd, TIMEW_USE_PATH)
def binary_location(cmd, USE_PATH=False):
""" ../src/ is used by default.
"""
return os.path.join(BIN_PREFIX, cmd)
def wait_condition(cond, timeout=1, sleeptime=.01):
"""Wait for condition to return anything other than None
"""
# NOTE Increasing sleeptime can dramatically increase testsuite runtime
# It also reduces CPU load significantly
if timeout is None:
timeout = 1
if timeout < sleeptime:
print("Warning, timeout cannot be smaller than", sleeptime)
timeout = sleeptime
# Max number of attempts until giving up
tries = int(timeout / sleeptime)
for i in range(tries):
val = cond()
if val is not None:
break
sleep(sleeptime)
return val
def wait_process(pid, timeout=None):
"""Wait for process to finish
"""
def process():
try:
os.kill(pid, 0)
except OSError:
# Process is dead
return True
else:
# Process is still ticking
return None
return wait_condition(process, timeout)
def _queue_output(arguments, pidq, outputq):
"""Read/Write output/input of given process.
This function is meant to be executed in a thread as it may block
"""
kwargs = arguments["process"]
input = arguments["input"]
try:
proc = Popen(**kwargs)
except OSError as e:
# pid None is read by the main thread as a crash of the process
pidq.put(None)
outputq.put((
"",
("Unexpected exception caught during execution: '{0}' . ".format(e)),
255)) # false exitcode
return
# Put the PID in the queue for main process to know.
pidq.put(proc.pid)
# Send input and wait for finish
out, err = proc.communicate(input)
if sys.version_info > (3,):
out, err = out.decode('utf-8'), err.decode('utf-8')
# Give the output back to the caller
outputq.put((out, err, proc.returncode))
def _retrieve_output(thread, timeout, queue, thread_error):
"""Fetch output from binary subprocess queues
"""
# Try to join the thread on failure abort
thread.join(timeout)
if thread.isAlive():
# Join should have killed the thread. This is unexpected
raise TimeoutWaitingFor(thread_error + ". Unexpected error")
# Thread died so we should have output
try:
# data = (stdout, stderr, exitcode)
data = queue.get(timeout=timeout)
except Empty:
data = TimeoutWaitingFor("streams from program")
return data
def _get_output(arguments, timeout=None):
"""Collect output from the subprocess without blocking the main process if
subprocess hangs.
"""
# NOTE Increase this value if tests fail with None being received as
# stdout/stderr instead of the expected content
output_timeout = 0.1 # seconds
pidq = Queue()
outputq = Queue()
t = Thread(target=_queue_output, args=(arguments, pidq, outputq))
t.daemon = True
t.start()
try:
pid = pidq.get(timeout=timeout)
except Empty:
pid = None
# Process crashed or timed out for some reason
if pid is None:
return _retrieve_output(t, output_timeout, outputq,
"Program to start")
# Wait for process to finish (normal execution)
state = wait_process(pid, timeout)
if state:
# Process finished
return _retrieve_output(t, output_timeout, outputq,
"Program thread to join")
# If we reach this point we assume the process got stuck or timed out
for sig in (signal.SIGABRT, signal.SIGTERM, signal.SIGKILL):
# Start with lower signals and escalate if process ignores them
try:
os.kill(pid, signal.SIGABRT)
except OSError as e:
# 3 means the process finished/died between last check and now
if e.errno != 3:
raise
# Wait for process to finish (should die/exit after signal)
state = wait_process(pid, timeout)
if state:
# Process finished
return _retrieve_output(t, output_timeout, outputq,
"Program to die")
# This should never happen but in case something goes really bad
raise OSError("Program stopped responding and couldn't be killed")
def run_cmd_wait(cmd, input=None, stdout=PIPE, stderr=PIPE,
merge_streams=False, env=os.environ, timeout=None):
"Run a subprocess and wait for it to finish"
if input is None:
stdin = None
else:
stdin = PIPE
if merge_streams:
stderr = STDOUT
else:
stderr = PIPE
arguments = {
"process": {
"args": cmd,
"stdin": stdin,
"stdout": stdout,
"stderr": stderr,
"bufsize": 1,
"close_fds": ON_POSIX,
"env": env,
},
"input": input,
}
out, err, exit = _get_output(arguments, timeout)
if merge_streams:
if exit != 0:
raise CommandError(cmd, exit, out)
else:
return exit, out
else:
if exit != 0:
raise CommandError(cmd, exit, out, err)
else:
return exit, out, err
def run_cmd_wait_nofail(*args, **kwargs):
"Same as run_cmd_wait but silence the exception if it happens"
try:
return run_cmd_wait(*args, **kwargs)
except CommandError as e:
return e.code, e.out, e.err
def memoize(obj):
"""Keep an in-memory cache of function results given it's inputs
"""
cache = obj.cache = {}
@functools.wraps(obj)
def memoizer(*args, **kwargs):
key = str(args) + str(kwargs)
if key not in cache:
cache[key] = obj(*args, **kwargs)
return cache[key]
return memoizer
try:
from shutil import which
which = memoize(which)
except ImportError:
# NOTE: This is shutil.which backported from python-3.3.3
@memoize
def which(cmd, mode=os.F_OK | os.X_OK, path=None):
"""Given a command, mode, and a PATH string, return the path which
conforms to the given mode on the PATH, or None if there is no such
file.
`mode` defaults to os.F_OK | os.X_OK. `path` defaults to the result
of os.environ.get("PATH"), or can be overridden with a custom search
path.
"""
# Check that a given file can be accessed with the correct mode.
# Additionally check that `file` is not a directory, as on Windows
# directories pass the os.access check.
def _access_check(fn, mode):
return (os.path.exists(fn) and os.access(fn, mode) and
not os.path.isdir(fn))
# If we're given a path with a directory part, look it up directly
# rather than referring to PATH directories. This includes checking
# relative to the current directory, e.g. ./script
if os.path.dirname(cmd):
if _access_check(cmd, mode):
return cmd
return None
if path is None:
path = os.environ.get("PATH", os.defpath)
if not path:
return None
path = path.split(os.pathsep)
if sys.platform == "win32":
# The current directory takes precedence on Windows.
if os.curdir not in path:
path.insert(0, os.curdir)
# PATHEXT is necessary to check on Windows.
pathext = os.environ.get("PATHEXT", "").split(os.pathsep)
# See if the given file matches any of the expected path
# extensions. This will allow us to short circuit when given
# "python.exe". If it does match, only test that one, otherwise we
# have to try others.
if any(cmd.lower().endswith(ext.lower()) for ext in pathext):
files = [cmd]
else:
files = [cmd + ext for ext in pathext]
else:
# On other platforms you don't have things like PATHEXT to tell you
# what file suffixes are executable, so just pass on cmd as-is.
files = [cmd]
seen = set()
for dir in path:
normdir = os.path.normcase(dir)
if normdir not in seen:
seen.add(normdir)
for thefile in files:
name = os.path.join(dir, thefile)
if _access_check(name, mode):
return name
return None
def parse_datafile(file):
"""Parse .data files, treating files as JSON
"""
data = []
with open(file) as fh:
for line in fh:
line = line.rstrip("\n")
# Turn [] strings into {} to be treated properly as JSON hashes
if line.startswith('[') and line.endswith(']'):
line = '{' + line[1:-1] + '}'
if line.startswith("{"):
data.append(json.loads(line))
else:
data.append(line)
return data
def mkstemp(data):
"""
Create a temporary file that is removed at process exit
"""
def rmtemp(name):
try:
os.remove(name)
except OSError:
pass
f = tempfile.NamedTemporaryFile(delete=False)
f.write(data)
f.close()
# Ensure removal at end of python session
atexit.register(rmtemp, f.name)
return f.name
def mkstemp_exec(data):
"""Create a temporary executable file that is removed at process exit
"""
name = mkstemp(data)
os.chmod(name, 0o755)
return name
# vim: ai sts=4 et sw=4