mirror of
https://github.com/GothenburgBitFactory/taskwarrior.git
synced 2025-08-19 09:53:08 +02:00
Unittest - default to not merging stdout/stderr during tests
* Tests will now check that stderr is used for errors * Tests were adjusted to reflect the API change
This commit is contained in:
parent
c9102fd79c
commit
efdb31fb90
4 changed files with 22 additions and 18 deletions
|
@ -3,10 +3,11 @@ import signal
|
||||||
|
|
||||||
|
|
||||||
class CommandError(Exception):
|
class CommandError(Exception):
|
||||||
def __init__(self, cmd, code, out, err, msg=None):
|
def __init__(self, cmd, code, out, err=None, msg=None):
|
||||||
if msg is None:
|
if msg is None:
|
||||||
msg_suffix = (
|
msg_suffix = "\n*** Start STDOUT ***\n{2}\n*** End STDOUT ***\n"
|
||||||
"\n*** Start STDOUT ***\n{2}\n*** End STDOUT ***\n"
|
if err is not None:
|
||||||
|
msg_suffix += (
|
||||||
"\n*** Start STDERR ***\n{3}\n*** End STDERR ***\n"
|
"\n*** Start STDERR ***\n{3}\n*** End STDERR ***\n"
|
||||||
)
|
)
|
||||||
if code == -signal.SIGABRT:
|
if code == -signal.SIGABRT:
|
||||||
|
|
|
@ -256,7 +256,7 @@ class Task(object):
|
||||||
cmd = (self.taskw, "config", "--", var, value)
|
cmd = (self.taskw, "config", "--", var, value)
|
||||||
return run_cmd_wait(cmd, env=self.env)
|
return run_cmd_wait(cmd, env=self.env)
|
||||||
|
|
||||||
def runSuccess(self, args=(), input=None, merge_streams=True,
|
def runSuccess(self, args=(), input=None, merge_streams=False,
|
||||||
timeout=1):
|
timeout=1):
|
||||||
"""Invoke task with given arguments and fail if exit code != 0
|
"""Invoke task with given arguments and fail if exit code != 0
|
||||||
|
|
||||||
|
@ -272,7 +272,8 @@ class Task(object):
|
||||||
timeout = number of seconds the test will wait for every task call.
|
timeout = number of seconds the test will wait for every task call.
|
||||||
Defaults to 1 second if not specified. Unit is seconds.
|
Defaults to 1 second if not specified. Unit is seconds.
|
||||||
|
|
||||||
Returns (exit_code, stdout, stderr)
|
Returns (exit_code, stdout, stderr) if merge_streams=False
|
||||||
|
(exit_code, output) if merge_streams=True
|
||||||
"""
|
"""
|
||||||
# Create a copy of the command
|
# Create a copy of the command
|
||||||
command = self._command[:]
|
command = self._command[:]
|
||||||
|
@ -288,7 +289,7 @@ class Task(object):
|
||||||
|
|
||||||
return output
|
return output
|
||||||
|
|
||||||
def runError(self, args=(), input=None, merge_streams=True, timeout=1):
|
def runError(self, args=(), input=None, merge_streams=False, timeout=1):
|
||||||
"""Invoke task with given arguments and fail if exit code == 0
|
"""Invoke task with given arguments and fail if exit code == 0
|
||||||
|
|
||||||
Use runSuccess if you want exit_code to be tested automatically and
|
Use runSuccess if you want exit_code to be tested automatically and
|
||||||
|
@ -303,7 +304,8 @@ class Task(object):
|
||||||
timeout = number of seconds the test will wait for every task call.
|
timeout = number of seconds the test will wait for every task call.
|
||||||
Defaults to 1 second if not specified. Unit is seconds.
|
Defaults to 1 second if not specified. Unit is seconds.
|
||||||
|
|
||||||
Returns (exit_code, stdout, stderr)
|
Returns (exit_code, stdout, stderr) if merge_streams=False
|
||||||
|
(exit_code, output) if merge_streams=True
|
||||||
"""
|
"""
|
||||||
# Create a copy of the command
|
# Create a copy of the command
|
||||||
command = self._command[:]
|
command = self._command[:]
|
||||||
|
|
|
@ -13,7 +13,7 @@ try:
|
||||||
import simplejson as json
|
import simplejson as json
|
||||||
except ImportError:
|
except ImportError:
|
||||||
import json
|
import json
|
||||||
from .exceptions import CommandError, TimeoutWaitingForStream, StreamsAreMerged
|
from .exceptions import CommandError, TimeoutWaitingForStream
|
||||||
|
|
||||||
USED_PORTS = set()
|
USED_PORTS = set()
|
||||||
ON_POSIX = 'posix' in sys.builtin_module_names
|
ON_POSIX = 'posix' in sys.builtin_module_names
|
||||||
|
@ -133,14 +133,15 @@ def run_cmd_wait(cmd, input=None, stdout=PIPE, stderr=PIPE,
|
||||||
close_fds=ON_POSIX, env=env)
|
close_fds=ON_POSIX, env=env)
|
||||||
out, err, exit = _get_output(p, input, timeout)
|
out, err, exit = _get_output(p, input, timeout)
|
||||||
|
|
||||||
# If streams were merged err should always be None so represent it with a
|
|
||||||
# more informative object
|
|
||||||
if merge_streams:
|
if merge_streams:
|
||||||
err = StreamsAreMerged()
|
if exit != 0:
|
||||||
|
raise CommandError(cmd, exit, out)
|
||||||
|
else:
|
||||||
|
return exit, out
|
||||||
|
else:
|
||||||
if exit != 0:
|
if exit != 0:
|
||||||
raise CommandError(cmd, exit, out, err)
|
raise CommandError(cmd, exit, out, err)
|
||||||
|
else:
|
||||||
return exit, out, err
|
return exit, out, err
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -17,7 +17,7 @@ class TestBug1441(TestCase):
|
||||||
def test_import_filename(self):
|
def test_import_filename(self):
|
||||||
"""import fails if file doesn't exist"""
|
"""import fails if file doesn't exist"""
|
||||||
command = ("import", "xxx_doesnotexist")
|
command = ("import", "xxx_doesnotexist")
|
||||||
code, out, err = self.t.runError(command, merge_streams=False)
|
code, out, err = self.t.runError(command)
|
||||||
|
|
||||||
self.assertIn("File 'xxx_doesnotexist' not found.", err)
|
self.assertIn("File 'xxx_doesnotexist' not found.", err)
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue