mirror of
https://github.com/GothenburgBitFactory/taskwarrior.git
synced 2025-08-20 04:13:07 +02:00
Unittest - Allow specifying custom timeout for slow commands.
* Useful when testing with big tasks (1000+ annotations) or sync operations that take longer than 1 second (default)
This commit is contained in:
parent
b236e78f2e
commit
bb060d5ff8
2 changed files with 25 additions and 9 deletions
|
@ -177,7 +177,8 @@ class Task(object):
|
|||
with open(self.taskrc, 'w') as rc:
|
||||
rc.write("data.location={0}\n"
|
||||
"confirmation=no\n"
|
||||
"hooks=off\n".format(self.datadir))
|
||||
"hooks=off\n"
|
||||
"".format(self.datadir))
|
||||
|
||||
# Setup configuration to talk to taskd automatically
|
||||
if self.taskd is not None:
|
||||
|
@ -255,7 +256,8 @@ class Task(object):
|
|||
cmd = (self.taskw, "config", "--", var, value)
|
||||
return run_cmd_wait(cmd, env=self.env)
|
||||
|
||||
def runSuccess(self, args=(), input=None, merge_streams=True):
|
||||
def runSuccess(self, args=(), input=None, merge_streams=True,
|
||||
timeout=1):
|
||||
"""Invoke task with given arguments and fail if exit code != 0
|
||||
|
||||
Use runError if you want exit_code to be tested automatically and
|
||||
|
@ -267,6 +269,9 @@ class Task(object):
|
|||
|
||||
If merge_streams=True stdout and stderr will be merged into stdout.
|
||||
|
||||
timeout = number of seconds the test will wait for every task call.
|
||||
Defaults to 1 second if not specified. Unit is seconds.
|
||||
|
||||
Returns (exit_code, stdout, stderr)
|
||||
"""
|
||||
# Create a copy of the command
|
||||
|
@ -274,14 +279,16 @@ class Task(object):
|
|||
command.extend(args)
|
||||
|
||||
output = run_cmd_wait_nofail(command, input,
|
||||
merge_streams=merge_streams, env=self.env)
|
||||
merge_streams=merge_streams,
|
||||
env=self.env,
|
||||
timeout=timeout)
|
||||
|
||||
if output[0] != 0:
|
||||
raise CommandError(command, *output)
|
||||
|
||||
return output
|
||||
|
||||
def runError(self, args=(), input=None, merge_streams=True):
|
||||
def runError(self, args=(), input=None, merge_streams=True, timeout=1):
|
||||
"""Invoke task with given arguments and fail if exit code == 0
|
||||
|
||||
Use runSuccess if you want exit_code to be tested automatically and
|
||||
|
@ -293,6 +300,9 @@ class Task(object):
|
|||
|
||||
If merge_streams=True stdout and stderr will be merged into stdout.
|
||||
|
||||
timeout = number of seconds the test will wait for every task call.
|
||||
Defaults to 1 second if not specified. Unit is seconds.
|
||||
|
||||
Returns (exit_code, stdout, stderr)
|
||||
"""
|
||||
# Create a copy of the command
|
||||
|
@ -300,7 +310,9 @@ class Task(object):
|
|||
command.extend(args)
|
||||
|
||||
output = run_cmd_wait_nofail(command, input,
|
||||
merge_streams=merge_streams, env=self.env)
|
||||
merge_streams=merge_streams,
|
||||
env=self.env,
|
||||
timeout=timeout)
|
||||
|
||||
# output[0] is the exit code
|
||||
if output[0] == 0 or output[0] is None:
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue