Tests: Added simpletap and basetest modules

This commit is contained in:
Paul Beckingham 2016-09-02 21:39:02 -04:00
parent 65604e9ff1
commit db73778571
9 changed files with 965 additions and 0 deletions

1
test/.gitignore vendored
View file

@ -1 +1,2 @@
all.log
*.pyc

View file

@ -0,0 +1,7 @@
# -*- coding: utf-8 -*-
from .testing import TestCase
from .tasksh import Tasksh
# flake8:noqa
# vim: ai sts=4 et sw=4

9
test/basetest/compat.py Normal file
View file

@ -0,0 +1,9 @@
# -*- coding: utf-8 -*-
try:
STRING_TYPE = basestring
except NameError:
# Python 3
STRING_TYPE = str
# vim: ai sts=4 et sw=4

View file

@ -0,0 +1,55 @@
# -*- coding: utf-8 -*-
import signal
sig_names = dict((k, v) for v, k in reversed(sorted(signal.__dict__.items()))
if v.startswith('SIG') and not v.startswith('SIG_'))
class CommandError(Exception):
def __init__(self, cmd, code, out, err=None, msg=None):
DEFAULT = ("Command '{{0}}' was {signal}'ed. "
"SIGABRT usually means program timed out.\n")
if msg is None:
msg_suffix = "\n*** Start STDOUT ***\n{2}\n*** End STDOUT ***\n"
if err is not None:
msg_suffix += (
"\n*** Start STDERR ***\n{3}\n*** End STDERR ***\n"
)
if code < 0:
self.msg = DEFAULT.format(signal=sig_names[abs(code)])
else:
self.msg = ("Command '{0}' finished with unexpected exit "
"code '{1}'.\n")
self.msg += msg_suffix
else:
self.msg = msg
self.cmd = cmd
self.out = out
self.err = err
self.code = code
def __str__(self):
return self.msg.format(self.cmd, self.code, self.out, self.err)
class HookError(Exception):
pass
class TimeoutWaitingFor(object):
def __init__(self, name):
self.name = name
def __repr__(self):
return "*** Timeout reached while waiting for {0} ***".format(
self.name)
class StreamsAreMerged(object):
def __repr__(self):
return "*** Streams are merged, STDERR is not available ***"
# vim: ai sts=4 et sw=4

40
test/basetest/meta.py Normal file
View file

@ -0,0 +1,40 @@
# -*- coding: utf-8 -*-
from __future__ import print_function, division
class MetaTest(type):
"""Helper metaclass to simplify dynamic test creation
Creates test_methods in the TestCase class dynamically named after the
arguments used.
"""
@staticmethod
def make_function(classname, *args, **kwargs):
def test(self):
# ### Body of the usual test_testcase ### #
# Override and redefine this method #
pass
# Title of test in report
test.__doc__ = "{0}".format(args[0])
return test
def __new__(meta, classname, bases, dct):
tests = dct.get("TESTS")
kwargs = dct.get("EXTRA", {})
for i, args in enumerate(tests):
func = meta.make_function(classname, *args, **kwargs)
# Rename the function after a unique identifier
# Name of function must start with test_ to be ran by unittest
func.__name__ = "test_{0}".format(i)
# Attach the new test to the testclass
dct[func.__name__] = func
return super(MetaTest, meta).__new__(meta, classname, bases, dct)
# vim: ai sts=4 et sw=4

192
test/basetest/tasksh.py Normal file
View file

@ -0,0 +1,192 @@
# -*- coding: utf-8 -*-
import atexit
import json
import os
import shlex
import shutil
import tempfile
import unittest
from .exceptions import CommandError
from .utils import run_cmd_wait, run_cmd_wait_nofail, which, tasksh_binary_location, DEFAULT_EXTENSION_PATH
from .compat import STRING_TYPE
class Tasksh(object):
"""Manage a Tasksh instance
A temporary folder is used as data store of tasksh.
A tasksh client should not be used after being destroyed.
"""
DEFAULT_TASKSH = tasksh_binary_location()
def __init__(self, tasksh=DEFAULT_TASKSH):
"""Initialize a tasksh (client).
The program runs in a temporary folder.
:arg tasksh: Tasksh binary to use as client (defaults: tasksh in PATH)
"""
self.tasksh = tasksh
# Used to specify what command to launch (and to inject faketime)
self._command = [self.tasksh]
# Configuration of the isolated environment
self._original_pwd = os.getcwd()
self.datadir = tempfile.mkdtemp(prefix="tasksh_")
# Ensure any instance is properly destroyed at session end
atexit.register(lambda: self.destroy())
self.reset_env()
def add_default_extension(self, filename):
"""Add default extension to current instance
"""
if not os.path.isdir(self.extdir):
os.mkdir(self.extdir)
extfile = os.path.join(self.extdir, filename)
if os.path.isfile(extfile):
raise "{} already exists".format(extfile)
shutil.copy(os.path.join(DEFAULT_EXTENSION_PATH, filename), extfile)
def __repr__(self):
txt = super(Tasksh, self).__repr__()
return "{0} running from {1}>".format(txt[:-1], self.datadir)
def __call__(self, *args, **kwargs):
"aka t = Tasksh() ; t() which is now an alias to t.runSuccess()"
return self.runSuccess(*args, **kwargs)
def reset_env(self):
"""Set a new environment derived from the one used to launch the test
"""
# Copy all env variables to avoid clashing subprocess environments
self.env = os.environ.copy()
@staticmethod
def _split_string_args_if_string(args):
"""Helper function to parse and split into arguments a single string
argument. The string is literally the same as if written in the shell.
"""
# Enable nicer-looking calls by allowing plain strings
if isinstance(args, STRING_TYPE):
args = shlex.split(args)
return args
def runSuccess(self, args="", input=None, merge_streams=False,
timeout=5):
"""Invoke tasksh with given arguments and fail if exit code != 0
Use runError if you want exit_code to be tested automatically and
*not* fail if program finishes abnormally.
If you wish to pass instructions to tasksh such as confirmations or other
input via stdin, you can do so by providing a input string.
Such as input="y\ny\n".
If merge_streams=True stdout and stderr will be merged into stdout.
timeout = number of seconds the test will wait for every tasksh call.
Defaults to 1 second if not specified. Unit is seconds.
Returns (exit_code, stdout, stderr) if merge_streams=False
(exit_code, output) if merge_streams=True
"""
# Create a copy of the command
command = self._command[:]
args = self._split_string_args_if_string(args)
command.extend(args)
output = run_cmd_wait_nofail(command, input,
merge_streams=merge_streams,
env=self.env,
timeout=timeout)
if output[0] != 0:
raise CommandError(command, *output)
return output
def runError(self, args=(), input=None, merge_streams=False, timeout=5):
"""Invoke tasksh with given arguments and fail if exit code == 0
Use runSuccess if you want exit_code to be tested automatically and
*fail* if program finishes abnormally.
If you wish to pass instructions to tasksh such as confirmations or other
input via stdin, you can do so by providing a input string.
Such as input="y\ny\n".
If merge_streams=True stdout and stderr will be merged into stdout.
timeout = number of seconds the test will wait for every tasksh call.
Defaults to 1 second if not specified. Unit is seconds.
Returns (exit_code, stdout, stderr) if merge_streams=False
(exit_code, output) if merge_streams=True
"""
# Create a copy of the command
command = self._command[:]
args = self._split_string_args_if_string(args)
command.extend(args)
output = run_cmd_wait_nofail(command, input,
merge_streams=merge_streams,
env=self.env,
timeout=timeout)
# output[0] is the exit code
if output[0] == 0 or output[0] is None:
raise CommandError(command, *output)
return output
def destroy(self):
"""Cleanup the data folder and release server port for other instances
"""
try:
shutil.rmtree(self.datadir)
except OSError as e:
if e.errno == 2:
# Directory no longer exists
pass
else:
raise
# Prevent future reuse of this instance
self.runSuccess = self.__destroyed
self.runError = self.__destroyed
# self.destroy will get called when the python session closes.
# If self.destroy was already called, turn the action into a noop
self.destroy = lambda: None
def __destroyed(self, *args, **kwargs):
raise AttributeError("Program instance has been destroyed. "
"Create a new instance if you need a new client.")
def faketime(self, faketime=None):
"""Set a faketime using libfaketime that will affect the following
command calls.
If faketime is None, faketime settings will be disabled.
"""
cmd = which("faketime")
if cmd is None:
raise unittest.SkipTest("libfaketime/faketime is not installed")
if self._command[0] == cmd:
self._command = self._command[3:]
if faketime is not None:
# Use advanced time format
self._command = [cmd, "-f", faketime] + self._command
# vim: ai sts=4 et sw=4

16
test/basetest/testing.py Normal file
View file

@ -0,0 +1,16 @@
# -*- coding: utf-8 -*-
import unittest
import sys
class BaseTestCase(unittest.TestCase):
def tap(self, out):
sys.stderr.write("--- tap output start ---\n")
for line in out.splitlines():
sys.stderr.write(line + '\n')
sys.stderr.write("--- tap output end ---\n")
class TestCase(BaseTestCase):
pass
# vim: ai sts=4 et sw=4

392
test/basetest/utils.py Normal file
View file

@ -0,0 +1,392 @@
# -*- coding: utf-8 -*-
from __future__ import division
import os
import sys
import socket
import signal
import functools
import atexit
import tempfile
from subprocess import Popen, PIPE, STDOUT
from threading import Thread
try:
from Queue import Queue, Empty
except ImportError:
from queue import Queue, Empty
from time import sleep
try:
import simplejson as json
except ImportError:
import json
from .exceptions import CommandError, TimeoutWaitingFor
ON_POSIX = 'posix' in sys.builtin_module_names
# Directory relative to basetest module location
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
# Location of binary files (usually the src/ folder)
BIN_PREFIX = os.path.abspath(
os.path.join(CURRENT_DIR, "..", "..", "src")
)
# Default location of test certificates
DEFAULT_CERT_PATH = os.path.abspath(
os.path.join(CURRENT_DIR, "..", "test_certs")
)
# Default location of test extensions
DEFAULT_EXTENSION_PATH = os.path.abspath(
os.path.join(CURRENT_DIR, "..", "test_extensions")
)
# Environment flags to control skipping of tasksh tests
TASKSH_SKIP = os.environ.get("TASKSH_SKIP", False)
# Environment flags to control use of PATH or in-tree binaries
TASKSH_USE_PATH = os.environ.get("TASKSH_USE_PATH", False)
UUID_REGEXP = ("[0-9A-Fa-f]{8}-" + ("[0-9A-Fa-f]{4}-" * 3) + "[0-9A-Fa-f]{12}")
def tasksh_binary_location(cmd="tasksh"):
""" ../src/ is used by default.
"""
return os.path.join(BIN_PREFIX, cmd)
return binary_location(cmd, TASKSH_USE_PATH)
def binary_location(cmd, USE_PATH=False):
""" ../src/ is used by default.
"""
return os.path.join(BIN_PREFIX, cmd)
def wait_condition(cond, timeout=1, sleeptime=.01):
"""Wait for condition to return anything other than None
"""
# NOTE Increasing sleeptime can dramatically increase testsuite runtime
# It also reduces CPU load significantly
if timeout is None:
timeout = 1
if timeout < sleeptime:
print("Warning, timeout cannot be smaller than", sleeptime)
timeout = sleeptime
# Max number of attempts until giving up
tries = int(timeout / sleeptime)
for i in range(tries):
val = cond()
if val is not None:
break
sleep(sleeptime)
return val
def wait_process(pid, timeout=None):
"""Wait for process to finish
"""
def process():
try:
os.kill(pid, 0)
except OSError:
# Process is dead
return True
else:
# Process is still ticking
return None
return wait_condition(process, timeout)
def _queue_output(arguments, pidq, outputq):
"""Read/Write output/input of given process.
This function is meant to be executed in a thread as it may block
"""
kwargs = arguments["process"]
input = arguments["input"]
try:
proc = Popen(**kwargs)
except OSError as e:
# pid None is read by the main thread as a crash of the process
pidq.put(None)
outputq.put((
"",
("Unexpected exception caught during execution: '{0}' . ".format(e)),
255)) # false exitcode
return
# Put the PID in the queue for main process to know.
pidq.put(proc.pid)
# Send input and wait for finish
out, err = proc.communicate(input)
if sys.version_info > (3,):
out, err = out.decode('utf-8'), err.decode('utf-8')
# Give the output back to the caller
outputq.put((out, err, proc.returncode))
def _retrieve_output(thread, timeout, queue, thread_error):
"""Fetch output from binary subprocess queues
"""
# Try to join the thread on failure abort
thread.join(timeout)
if thread.isAlive():
# Join should have killed the thread. This is unexpected
raise TimeoutWaitingFor(thread_error + ". Unexpected error")
# Thread died so we should have output
try:
# data = (stdout, stderr, exitcode)
data = queue.get(timeout=timeout)
except Empty:
data = TimeoutWaitingFor("streams from program")
return data
def _get_output(arguments, timeout=None):
"""Collect output from the subprocess without blocking the main process if
subprocess hangs.
"""
# NOTE Increase this value if tests fail with None being received as
# stdout/stderr instead of the expected content
output_timeout = 0.1 # seconds
pidq = Queue()
outputq = Queue()
t = Thread(target=_queue_output, args=(arguments, pidq, outputq))
t.daemon = True
t.start()
try:
pid = pidq.get(timeout=timeout)
except Empty:
pid = None
# Process crashed or timed out for some reason
if pid is None:
return _retrieve_output(t, output_timeout, outputq,
"Program to start")
# Wait for process to finish (normal execution)
state = wait_process(pid, timeout)
if state:
# Process finished
return _retrieve_output(t, output_timeout, outputq,
"Program thread to join")
# If we reach this point we assume the process got stuck or timed out
for sig in (signal.SIGABRT, signal.SIGTERM, signal.SIGKILL):
# Start with lower signals and escalate if process ignores them
try:
os.kill(pid, signal.SIGABRT)
except OSError as e:
# 3 means the process finished/died between last check and now
if e.errno != 3:
raise
# Wait for process to finish (should die/exit after signal)
state = wait_process(pid, timeout)
if state:
# Process finished
return _retrieve_output(t, output_timeout, outputq,
"Program to die")
# This should never happen but in case something goes really bad
raise OSError("Program stopped responding and couldn't be killed")
def run_cmd_wait(cmd, input=None, stdout=PIPE, stderr=PIPE,
merge_streams=False, env=os.environ, timeout=None):
"Run a subprocess and wait for it to finish"
if input is None:
stdin = None
else:
stdin = PIPE
if merge_streams:
stderr = STDOUT
else:
stderr = PIPE
arguments = {
"process": {
"args": cmd,
"stdin": stdin,
"stdout": stdout,
"stderr": stderr,
"bufsize": 1,
"close_fds": ON_POSIX,
"env": env,
},
"input": input,
}
out, err, exit = _get_output(arguments, timeout)
if merge_streams:
if exit != 0:
raise CommandError(cmd, exit, out)
else:
return exit, out
else:
if exit != 0:
raise CommandError(cmd, exit, out, err)
else:
return exit, out, err
def run_cmd_wait_nofail(*args, **kwargs):
"Same as run_cmd_wait but silence the exception if it happens"
try:
return run_cmd_wait(*args, **kwargs)
except CommandError as e:
return e.code, e.out, e.err
def memoize(obj):
"""Keep an in-memory cache of function results given it's inputs
"""
cache = obj.cache = {}
@functools.wraps(obj)
def memoizer(*args, **kwargs):
key = str(args) + str(kwargs)
if key not in cache:
cache[key] = obj(*args, **kwargs)
return cache[key]
return memoizer
try:
from shutil import which
which = memoize(which)
except ImportError:
# NOTE: This is shutil.which backported from python-3.3.3
@memoize
def which(cmd, mode=os.F_OK | os.X_OK, path=None):
"""Given a command, mode, and a PATH string, return the path which
conforms to the given mode on the PATH, or None if there is no such
file.
`mode` defaults to os.F_OK | os.X_OK. `path` defaults to the result
of os.environ.get("PATH"), or can be overridden with a custom search
path.
"""
# Check that a given file can be accessed with the correct mode.
# Additionally check that `file` is not a directory, as on Windows
# directories pass the os.access check.
def _access_check(fn, mode):
return (os.path.exists(fn) and os.access(fn, mode) and
not os.path.isdir(fn))
# If we're given a path with a directory part, look it up directly
# rather than referring to PATH directories. This includes checking
# relative to the current directory, e.g. ./script
if os.path.dirname(cmd):
if _access_check(cmd, mode):
return cmd
return None
if path is None:
path = os.environ.get("PATH", os.defpath)
if not path:
return None
path = path.split(os.pathsep)
if sys.platform == "win32":
# The current directory takes precedence on Windows.
if os.curdir not in path:
path.insert(0, os.curdir)
# PATHEXT is necessary to check on Windows.
pathext = os.environ.get("PATHEXT", "").split(os.pathsep)
# See if the given file matches any of the expected path
# extensions. This will allow us to short circuit when given
# "python.exe". If it does match, only test that one, otherwise we
# have to try others.
if any(cmd.lower().endswith(ext.lower()) for ext in pathext):
files = [cmd]
else:
files = [cmd + ext for ext in pathext]
else:
# On other platforms you don't have things like PATHEXT to tell you
# what file suffixes are executable, so just pass on cmd as-is.
files = [cmd]
seen = set()
for dir in path:
normdir = os.path.normcase(dir)
if normdir not in seen:
seen.add(normdir)
for thefile in files:
name = os.path.join(dir, thefile)
if _access_check(name, mode):
return name
return None
def parse_datafile(file):
"""Parse .data files, treating files as JSON
"""
data = []
with open(file) as fh:
for line in fh:
line = line.rstrip("\n")
# Turn [] strings into {} to be treated properly as JSON hashes
if line.startswith('[') and line.endswith(']'):
line = '{' + line[1:-1] + '}'
if line.startswith("{"):
data.append(json.loads(line))
else:
data.append(line)
return data
def mkstemp(data):
"""
Create a temporary file that is removed at process exit
"""
def rmtemp(name):
try:
os.remove(name)
except OSError:
pass
f = tempfile.NamedTemporaryFile(delete=False)
f.write(data)
f.close()
# Ensure removal at end of python session
atexit.register(rmtemp, f.name)
return f.name
def mkstemp_exec(data):
"""Create a temporary executable file that is removed at process exit
"""
name = mkstemp(data)
os.chmod(name, 0o755)
return name
# vim: ai sts=4 et sw=4

253
test/simpletap/__init__.py Normal file
View file

@ -0,0 +1,253 @@
###############################################################################
#
# Copyright 2006-2016, Paul Beckingham, Federico Hernandez.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# http://www.opensource.org/licenses/mit-license.php
#
###############################################################################
# Original version by Renato Alves
import os
import sys
import unittest
import warnings
import traceback
import inspect
def color(text, c):
"""
Add color on the keyword that identifies the state of the test
"""
if sys.stdout.isatty():
clear = "\033[0m"
colors = {
"red": "\033[1m\033[91m",
"yellow": "\033[1m\033[93m",
"green": "\033[1m\033[92m",
}
return colors[c] + text + clear
else:
return text
class TAPTestResult(unittest.result.TestResult):
def __init__(self, stream, descriptions, verbosity):
super(TAPTestResult, self).__init__(stream, descriptions, verbosity)
self.stream = stream
self.descriptions = descriptions
self.verbosity = verbosity
# Buffer stdout and stderr
self.buffer = True
def getDescription(self, test):
doc_first_line = test.shortDescription()
if self.descriptions and doc_first_line:
return doc_first_line
else:
try:
method = test._testMethodName
except AttributeError:
return "Preparation error on: {0}".format(test.description)
else:
return "{0} ({1})".format(method, test.__class__.__name__)
def startTestRun(self, total="unk"):
self.stream.writeln("1..{0}".format(total))
def stopTest(self, test):
"""Prevent flushing of stdout/stderr buffers until later"""
pass
def _restoreStdout(self):
"""Restore sys.stdout and sys.stderr, don't merge buffered output yet
"""
if self.buffer:
sys.stdout = self._original_stdout
sys.stderr = self._original_stderr
@staticmethod
def _do_stream(data, stream):
"""Helper function for _mergeStdout"""
for line in data.splitlines(True):
# newlines should be taken literally and be comments in TAP
line = line.replace("\\n", "\n# ")
# Add a comment sign before each line
if line.startswith("#"):
stream.write(line)
else:
stream.write("# " + line)
if not line.endswith('\n'):
stream.write('\n')
def _mergeStdout(self):
"""Merge buffered output with main streams
"""
if self.buffer:
output = self._stdout_buffer.getvalue()
error = self._stderr_buffer.getvalue()
if output:
self._do_stream(output, sys.stdout)
if error:
self._do_stream(error, sys.stderr)
self._stdout_buffer.seek(0)
self._stdout_buffer.truncate()
self._stderr_buffer.seek(0)
self._stderr_buffer.truncate()
# Needed to fix the stopTest override
self._mirrorOutput = False
def report(self, test, status=None, err=None):
# Restore stdout/stderr but don't flush just yet
self._restoreStdout()
desc = self.getDescription(test)
try:
exception, msg, tb = err
except (TypeError, ValueError):
exception_name = ""
msg = err
tb = None
else:
exception_name = exception.__name__
msg = str(msg)
trace_msg = ""
# Extract line where error happened for easier debugging
trace = traceback.extract_tb(tb)
for t in trace:
# t = (filename, line_number, function_name, raw_line)
if t[2].startswith("test"):
trace_msg = " on file {0} line {1} in {2}: '{3}'".format(*t)
break
# Retrieve the name of the file containing the test
filename = os.path.basename(inspect.getfile(test.__class__))
if status:
if status == "SKIP":
self.stream.writeln("{0} {1} - {2}: {3}".format(
color("skip", "yellow"), self.testsRun, filename, desc)
)
elif status == "EXPECTED_FAILURE":
self.stream.writeln("{0} {1} - {2}: {3}".format(
color("skip", "yellow"), self.testsRun, filename, desc)
)
else:
self.stream.writeln("{0} {1} - {2}: {3}".format(
color("not ok", "red"), self.testsRun, filename, desc)
)
if exception_name:
self.stream.writeln("# {0}: {1}{2}:".format(
status, exception_name, trace_msg)
)
else:
self.stream.writeln("# {0}:".format(status))
# Magic 3 is just for pretty indentation
padding = " " * (len(status) + 3)
for line in msg.splitlines():
# Force displaying new-line characters as literal new lines
line = line.replace("\\n", "\n# ")
self.stream.writeln("#{0}{1}".format(padding, line))
else:
self.stream.writeln("{0} {1} - {2}: {3}".format(
color("ok", "green"), self.testsRun, filename, desc)
)
# Flush all buffers to stdout
self._mergeStdout()
def addSuccess(self, test):
super(TAPTestResult, self).addSuccess(test)
self.report(test)
def addError(self, test, err):
super(TAPTestResult, self).addError(test, err)
self.report(test, "ERROR", err)
def addFailure(self, test, err):
super(TAPTestResult, self).addFailure(test, err)
self.report(test, "FAIL", err)
def addSkip(self, test, reason):
super(TAPTestResult, self).addSkip(test, reason)
self.report(test, "SKIP", reason)
def addExpectedFailure(self, test, err):
super(TAPTestResult, self).addExpectedFailure(test, err)
self.report(test, "EXPECTED_FAILURE", err)
def addUnexpectedSuccess(self, test):
super(TAPTestResult, self).addUnexpectedSuccess(test)
self.report(test, "UNEXPECTED_SUCCESS", str(test))
class TAPTestRunner(unittest.runner.TextTestRunner):
"""A test runner that displays results using the Test Anything Protocol
syntax.
Inherits from TextTestRunner the default runner.
"""
resultclass = TAPTestResult
def run(self, test):
result = self._makeResult()
unittest.signals.registerResult(result)
result.failfast = self.failfast
with warnings.catch_warnings():
if getattr(self, "warnings", None):
# if self.warnings is set, use it to filter all the warnings
warnings.simplefilter(self.warnings)
# if the filter is 'default' or 'always', special-case the
# warnings from the deprecated unittest methods to show them
# no more than once per module, because they can be fairly
# noisy. The -Wd and -Wa flags can be used to bypass this
# only when self.warnings is None.
if self.warnings in ['default', 'always']:
warnings.filterwarnings(
'module',
category=DeprecationWarning,
message='Please use assert\w+ instead.')
startTestRun = getattr(result, 'startTestRun', None)
if startTestRun is not None:
startTestRun(test.countTestCases())
try:
test(result)
finally:
stopTestRun = getattr(result, 'stopTestRun', None)
if stopTestRun is not None:
stopTestRun()
return result