Merge pull request #7 in TM/task from ~UNODE/task:2.4.0 to 2.4.0

* commit '9bd7b336f9':
  Unittest - Don't use unittest.TestCase use basetest.TestCase instead
  Unittest - Example of TAP diag use in template.t
  Unittest - Stream blocking tests can now be safely performed
  Unittest - CommandError exception treats SIGABRT specially
  Unittest - exit code may be None if process failed to finish
  Unittest - Add example of skipping taskd test on template.t
  Unittest - Add code to check if taskd is available
  Unittest - Add which() backported from py3.3
  Unittest - Don't display "task diag" by default on error
  Unittest - Don't escape new-line characters in TAP output
  Unittest - Bring back diag() for generating TAP output
This commit is contained in:
Paul Beckingham 2014-07-15 02:05:53 +00:00
commit 9b6b1cf5f8
15 changed files with 199 additions and 34 deletions

View file

@ -2,5 +2,6 @@
from .task import Task
from .taskd import Taskd
from .testing import TestCase
# vim: ai sts=4 et sw=4

View file

@ -1,9 +1,15 @@
# -*- coding: utf-8 -*-
import signal
class CommandError(Exception):
def __init__(self, cmd, code, out, err, msg=None):
if msg is None:
if code == signal.SIGABRT:
self.msg = ("Command '{0}' was aborted, likely due to not "
"finishing in due time. The exit code was "
"'{1}':\nStdout: '{2}'\nStderr: '{3}'")
else:
self.msg = ("Command '{0}' finished with unexpected exit code "
"'{1}':\nStdout: '{2}'\nStderr: '{3}'")
else:

View file

@ -127,7 +127,6 @@ class Task(object):
merge_streams=merge_streams, env=self.env)
if output[0] != 0:
output = self.diag(merge_streams_with=output)
raise CommandError(command, *output)
return output
@ -153,8 +152,7 @@ class Task(object):
merge_streams=merge_streams, env=self.env)
# output[0] is the exit code
if output[0] == 0:
output = self.diag(merge_streams_with=output)
if output[0] == 0 or output[0] is None:
raise CommandError(command, *output)
return output

View file

@ -7,7 +7,8 @@ import signal
import atexit
from time import sleep
from subprocess import Popen
from .utils import find_unused_port, release_port, port_used, run_cmd_wait
from .utils import (find_unused_port, release_port, port_used, run_cmd_wait,
which)
from .exceptions import CommandError
try:
@ -33,7 +34,10 @@ class Taskd(object):
A server can be stopped and started multiple times, but should not be
started or stopped after being destroyed.
"""
def __init__(self, taskd="taskd", certpath=None, address="127.0.0.1"):
DEFAULT_TASKD = "taskd"
def __init__(self, taskd=DEFAULT_TASKD, certpath=None,
address="127.0.0.1"):
"""Initialize a Task server that runs in the background and stores data
in a temporary folder
@ -260,4 +264,12 @@ class Taskd(object):
raise AttributeError("Taskd instance has been destroyed. "
"Create a new instance if you need a new server.")
@classmethod
def not_available(cls):
"""Check if the taskd binary is available in the path"""
if which(cls.DEFAULT_TASKD):
return False
else:
return True
# vim: ai sts=4 et sw=4

15
test/basetest/testing.py Normal file
View file

@ -0,0 +1,15 @@
# -*- coding: utf-8 -*-
import unittest
import sys
class TestCase(unittest.TestCase):
def diag(self, out):
sys.stdout.write("# --- diag start ---\n")
for line in out.split("\n"):
sys.stdout.write("# " + line + "\n")
sys.stdout.write("# --- diag end ---\n")
# vim: ai sts=4 et sw=4

View file

@ -1,10 +1,77 @@
# -*- coding: utf-8 -*-
from __future__ import division
import os
import sys
import socket
import signal
from subprocess import Popen, PIPE, STDOUT
from threading import Thread
from Queue import Queue, Empty
from time import sleep
from .exceptions import CommandError
USED_PORTS = set()
ON_POSIX = 'posix' in sys.builtin_module_names
def wait_process(proc, timeout=1):
"""Wait for process to finish
"""
sleeptime = .1
# Max number of attempts until giving up
tries = int(timeout / sleeptime)
# Wait for up to a second for the process to finish and avoid zombies
for i in range(tries):
exit = proc.poll()
if exit is not None:
break
sleep(sleeptime)
return exit
def _get_output(proc, input):
"""Collect output from the subprocess without blocking the main process if
subprocess hangs.
"""
def queue_output(proc, input, outq, errq):
"""Read/Write output/input of given process.
This function is meant to be executed in a thread as it may block
"""
# Send input and wait for finish
out, err = proc.communicate(input)
# Give the output back to the caller
outq.put(out)
errq.put(err)
outq = Queue()
errq = Queue()
t = Thread(target=queue_output, args=(proc, input, outq, errq))
t.daemon = True
t.start()
# A task process shouldn't take longer than 1 second to finish
exit = wait_process(proc)
# If it does take longer than 1 second, abort it
if exit is None:
proc.send_signal(signal.SIGABRT)
exit = wait_process(proc)
try:
out = outq.get_nowait()
except Empty:
out = None
try:
err = errq.get_nowait()
except Empty:
err = None
return out, err
def run_cmd_wait(cmd, input=None, stdout=PIPE, stderr=PIPE,
@ -21,16 +88,9 @@ def run_cmd_wait(cmd, input=None, stdout=PIPE, stderr=PIPE,
else:
stderr = PIPE
p = Popen(cmd, stdin=stdin, stdout=stdout, stderr=stderr, env=env)
out, err = p.communicate(input)
# In python3 we will be able use the following instead of the previous
# line to avoid locking if task is unexpectedly waiting for input
# try:
# out, err = p.communicate(input, timeout=15)
# except TimeoutExpired:
# p.kill()
# out, err = proc.communicate()
p = Popen(cmd, stdin=stdin, stdout=stdout, stderr=stderr, bufsize=1,
close_fds=ON_POSIX, env=env)
out, err = _get_output(p, input)
if p.returncode != 0:
raise CommandError(cmd, p.returncode, out, err)
@ -95,4 +155,71 @@ def release_port(port):
except KeyError:
pass
try:
from shutil import which
except ImportError:
# NOTE: This is shutil.which backported from python-3.3.3
def which(cmd, mode=os.F_OK | os.X_OK, path=None):
"""Given a command, mode, and a PATH string, return the path which
conforms to the given mode on the PATH, or None if there is no such
file.
`mode` defaults to os.F_OK | os.X_OK. `path` defaults to the result
of os.environ.get("PATH"), or can be overridden with a custom search
path.
"""
# Check that a given file can be accessed with the correct mode.
# Additionally check that `file` is not a directory, as on Windows
# directories pass the os.access check.
def _access_check(fn, mode):
return (os.path.exists(fn) and os.access(fn, mode)
and not os.path.isdir(fn))
# If we're given a path with a directory part, look it up directly
# rather than referring to PATH directories. This includes checking
# relative to the current directory, e.g. ./script
if os.path.dirname(cmd):
if _access_check(cmd, mode):
return cmd
return None
if path is None:
path = os.environ.get("PATH", os.defpath)
if not path:
return None
path = path.split(os.pathsep)
if sys.platform == "win32":
# The current directory takes precedence on Windows.
if os.curdir not in path:
path.insert(0, os.curdir)
# PATHEXT is necessary to check on Windows.
pathext = os.environ.get("PATHEXT", "").split(os.pathsep)
# See if the given file matches any of the expected path
# extensions. This will allow us to short circuit when given
# "python.exe". If it does match, only test that one, otherwise we
# have to try others.
if any(cmd.lower().endswith(ext.lower()) for ext in pathext):
files = [cmd]
else:
files = [cmd + ext for ext in pathext]
else:
# On other platforms you don't have things like PATHEXT to tell you
# what file suffixes are executable, so just pass on cmd as-is.
files = [cmd]
seen = set()
for dir in path:
normdir = os.path.normcase(dir)
if normdir not in seen:
seen.add(normdir)
for thefile in files:
name = os.path.join(dir, thefile)
if _access_check(name, mode):
return name
return None
# vim: ai sts=4 et sw=4

View file

@ -33,10 +33,10 @@ import unittest
# Ensure python finds the local simpletap and basetest modules
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from basetest import Task
from basetest import Task, TestCase
class TestBug1254(unittest.TestCase):
class TestBug1254(TestCase):
def setUp(self):
self.t = Task()

View file

@ -32,10 +32,10 @@ import unittest
# Ensure python finds the local simpletap and basetest modules
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from basetest import Task
from basetest import Task, TestCase
class TestBug1267(unittest.TestCase):
class TestBug1267(TestCase):
def setUp(self):
self.t = Task()

View file

@ -33,10 +33,10 @@ import unittest
# Ensure python finds the local simpletap and basetest modules
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from basetest import Task
from basetest import Task, TestCase
class BaseTestBug360(unittest.TestCase):
class BaseTestBug360(TestCase):
def setUp(self):
"""Executed before each test in the class"""
self.t = Task()

View file

@ -71,6 +71,8 @@ class TAPTestResult(unittest.result.TestResult):
self.stream.writeln("# {0}: {1}".format(status, exception))
padding = " " * (len(status) + 3)
for line in msg.splitlines():
# Force displaying new-line characters as literal new lines
line = line.replace("\\n", "\n")
self.stream.writeln("#{0}{1}".format(padding, line))
else:
self.stream.writeln("ok {0} - {1}".format(self.testsRun, desc))

View file

@ -8,10 +8,10 @@ from datetime import datetime
# Ensure python finds the local simpletap module
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from basetest import Task, Taskd
from basetest import Task, Taskd, TestCase
class TestCase(unittest.TestCase):
class TestBugNumber(TestCase):
@classmethod
def setUpClass(cls):
"""Executed once before any test in the class"""
@ -34,6 +34,9 @@ class TestCase(unittest.TestCase):
expected = "Copyright \(C\) \d{4} - %d" % (datetime.now().year,)
self.assertRegexpMatches(out.decode("utf8"), expected)
# TAP diagnostics on the bas
self.diag("Yay TAP diagnostics")
def test_fail_other(self):
"""Nothing to do with Copyright"""
self.assertEqual("I like to code", "I like\nto code\n")
@ -50,7 +53,8 @@ class TestCase(unittest.TestCase):
"""Executed once after all tests in the class"""
class ServerTestCase(unittest.TestCase):
@unittest.skipIf(Taskd.not_available(), "Taskd binary not available")
class ServerTestCase(TestCase):
@classmethod
def setUpClass(cls):
cls.taskd = Taskd()

View file

@ -32,10 +32,10 @@ import unittest
# Ensure python finds the local simpletap and basetest modules
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from basetest import Task
from basetest import Task, TestCase
class TestBug1300(unittest.TestCase):
class TestBug1300(TestCase):
@classmethod
def setUp(cls):
cls.t = Task()

View file

@ -31,10 +31,10 @@ import os
import unittest
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from basetest import Task
from basetest import Task, TestCase
class TestBug1306(unittest.TestCase):
class TestBug1306(TestCase):
def setUp(self):
self.t = Task()

View file

@ -32,10 +32,10 @@ import unittest
# Ensure python finds the local simpletap and basetest modules
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from basetest import Task
from basetest import Task, TestCase
class Test285(unittest.TestCase):
class Test285(TestCase):
@classmethod
def setUpClass(cls):
cls.t = Task()

View file

@ -33,10 +33,10 @@ from datetime import datetime
# Ensure python finds the local simpletap module
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from basetest import Task
from basetest import Task, TestCase
class TestVersion(unittest.TestCase):
class TestVersion(TestCase):
def setUp(self):
self.t = Task()