mirror of
https://github.com/GothenburgBitFactory/taskwarrior.git
synced 2025-06-26 10:54:26 +02:00
397 lines
13 KiB
Python
397 lines
13 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import os
|
|
from sys import stderr
|
|
import tempfile
|
|
import shutil
|
|
import stat
|
|
import atexit
|
|
import unittest
|
|
from .utils import run_cmd_wait, run_cmd_wait_nofail, which, binary_location
|
|
from .exceptions import CommandError, HookError
|
|
|
|
|
|
class Hooks(object):
    """Abstraction to help interact with hooks (add, remove, enable, disable)
    during tests.

    Hook scripts are stored in the ``hooks`` sub-folder of the data directory
    given at construction time.  Hooks created through :meth:`add` are also
    tracked in an in-memory registry (name -> content).
    """

    # Name prefixes accepted by add() without emitting a warning
    _HOOK_TYPES = ("on-launch", "on-add", "on-exit", "on-modify")

    def __init__(self, datadir):
        """Prepare the hooks folder inside `datadir`.

        :param datadir: Folder that contains (or will contain) ``hooks/``
        """
        self.hookdir = os.path.join(datadir, "hooks")
        self._hooks = {}

        # Create the hooks dir unless it already exists
        if not os.path.isdir(self.hookdir):
            os.mkdir(self.hookdir)

    def __repr__(self):
        enabled = []
        disabled = []

        for hook in self._hooks:
            if self.isactive(hook):
                enabled.append(hook)
            else:
                disabled.append(hook)

        # An empty group is displayed as "None"
        enabled = ", ".join(enabled) or None
        disabled = ", ".join(disabled) or None

        return "<Hooks: enabled: {0} | disabled: {1}>".format(enabled,
                                                              disabled)

    def get_hookfile(self, hookname):
        """Return location of given hookname"""
        return os.path.join(self.hookdir, hookname)

    def check_exists(self, hookname):
        """Return the path of the hook file, ensuring that it exists

        :raises HookError: if there is no regular file for `hookname`
        """
        hookfile = self.get_hookfile(hookname)

        if not os.path.isfile(hookfile):
            raise HookError("Hook {0} doesn't exist.".format(hookfile))

        return hookfile

    def enable(self, hookname):
        """Make hookfile executable to allow triggering

        :raises HookError: if the hook file does not exist
        """
        hookfile = self.check_exists(hookname)
        os.chmod(hookfile, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)

    def disable(self, hookname):
        """Remove hookfile executable bit to deny triggering

        :raises HookError: if the hook file does not exist
        """
        hookfile = self.check_exists(hookname)
        os.chmod(hookfile, stat.S_IREAD | stat.S_IWRITE)

    def isactive(self, hookname):
        """Check if hook is active by verifying the execute bit

        :raises HookError: if the hook file does not exist
        """
        hookfile = self.check_exists(hookname)
        return os.access(hookfile, os.X_OK)

    def add(self, hookname, content, overwrite=False):
        """Register hook with name 'hookname' and given file content.

        The hook is written to disk and made executable.

        :param hookname: Should be a string starting with one of:
            - on-launch
            - on-add
            - on-exit
            - on-modify

        :param content: Content of the file as a (multi-line) string
        :param overwrite: What to do if a hook with same name already exists
        :raises HookError: if the hook is already registered and `overwrite`
            is false
        """
        # Unknown prefixes are only reported, the hook is still created
        if not hookname.startswith(self._HOOK_TYPES):
            stderr.write("WARNING: {0} is not a valid hook type. "
                         "It will not be triggered\n".format(hookname))

        if hookname in self._hooks and not overwrite:
            raise HookError("Hook with name {0} already exists. "
                            "Pass overwrite=True if intended or use "
                            "hooks.remove() before.".format(hookname))

        self._hooks[hookname] = content

        hookfile = self.get_hookfile(hookname)

        # Create the hook on disk
        with open(hookfile, 'w') as fh:
            fh.write(content)

        # Ensure it's executable
        self.enable(hookname)

    def remove(self, hookname):
        """Remove the hook matching given hookname

        :raises HookError: if the hook is not registered or if its file is
            missing from the hooks folder
        """
        try:
            del self._hooks[hookname]
        except KeyError:
            raise HookError(
                "Hook with name {0} not in record".format(hookname))

        try:
            os.remove(self.get_hookfile(hookname))
        except OSError as e:
            # errno 2 is ENOENT: the file is not there (anymore)
            if e.errno == 2:
                raise HookError("Hook with name {0} was not found on hooks/ "
                                "folder".format(hookname))
            else:
                raise

    def clear(self):
        """Remove all existing hooks and empty the hook registry
        """
        self._hooks = {}

        # Remove any existing hooks
        try:
            shutil.rmtree(self.hookdir)
        except OSError as e:
            # errno 2 is ENOENT: a missing folder is fine, it is recreated
            # below anyway
            if e.errno != 2:
                raise

        os.mkdir(self.hookdir)
|
|
|
|
|
|
class Task(object):
    """Manage a task warrior instance

    A temporary folder is used as data store of task warrior.
    This class can be instantiated multiple times if multiple taskw clients
    are needed.

    This class can be given a Taskd instance for simplified configuration.

    A taskw client should not be used after being destroyed.
    """
    DEFAULT_TASK = binary_location("task")

    def __init__(self, taskd=None, taskw=DEFAULT_TASK):
        """Initialize a Task warrior (client) that can interact with a taskd
        server. The task client runs in a temporary folder.

        :arg taskd: Taskd instance for client-server configuration
        :arg taskw: Task binary to use as client (defaults: task in PATH)
        """
        self.taskw = taskw
        self.taskd = taskd

        # Used to specify what command to launch (and to inject faketime)
        self._command = [self.taskw]

        # Configuration of the isolated environment
        self._original_pwd = os.getcwd()
        self.datadir = tempfile.mkdtemp(prefix="task_")
        self.taskrc = os.path.join(self.datadir, "test.rc")

        # Ensure any instance is properly destroyed at session end.
        # The lambda looks up self.destroy when it runs, so it picks up the
        # noop that destroy() installs once the instance has been destroyed.
        atexit.register(lambda: self.destroy())

        self.reset_env()

        # Cannot call self.config until confirmation is disabled
        with open(self.taskrc, 'w') as rc:
            rc.write("data.location={0}\n"
                     "confirmation=no\n"
                     "hooks=off\n".format(self.datadir))

        # Setup configuration to talk to taskd automatically
        if self.taskd is not None:
            self.bind_taskd_server(self.taskd)

        self.hooks = Hooks(self.datadir)

    def __repr__(self):
        txt = super(Task, self).__repr__()
        # Drop the closing ">" of the default repr to append the data folder
        return "{0} running from {1}>".format(txt[:-1], self.datadir)

    def reset_env(self):
        """Set a new environment derived from the one used to launch the test
        """
        # Copy all env variables to avoid clashing subprocess environments
        self.env = os.environ.copy()

        # Make sure TASKDATA is isolated
        self.env["TASKDATA"] = self.datadir
        # As well as TASKRC
        self.env["TASKRC"] = self.taskrc

    def __call__(self, *args, **kwargs):
        "aka t = Task() ; t() which is now an alias to t.runSuccess()"
        return self.runSuccess(*args, **kwargs)

    def bind_taskd_server(self, taskd):
        """Configure the present task client to talk to given taskd server

        Note that this can be performed automatically by passing taskd when
        creating an instance of the current class.
        """
        self.taskd = taskd

        cert = os.path.join(self.taskd.certpath, "test_client.cert.pem")
        key = os.path.join(self.taskd.certpath, "test_client.key.pem")
        self.config("taskd.certificate", cert)
        self.config("taskd.key", key)
        self.config("taskd.ca", self.taskd.ca_cert)

        address = ":".join((self.taskd.address, str(self.taskd.port)))
        self.config("taskd.server", address)

        # Also configure the default user for given taskd server
        self.set_taskd_user()

    def set_taskd_user(self, taskd_user=None, default=True):
        """Assign a new user to the present task client

        :arg taskd_user: (user, group, org, userkey) tuple to use. If None,
            the user is obtained from the bound taskd instance.
        :arg default: If default==False, a new user will be assigned instead
            of reusing the default taskd user for the corresponding instance.
        """
        if taskd_user is None:
            if default:
                user, group, org, userkey = self.taskd.default_user
            else:
                user, group, org, userkey = self.taskd.create_user()
        else:
            user, group, org, userkey = taskd_user

        credentials = "/".join((org, user, userkey))
        self.config("taskd.credentials", credentials)

        self.credentials = {
            "user": user,
            "group": group,
            "org": org,
            "userkey": userkey,
        }

    def config(self, var, value):
        """Set `var` to `value` in the task client configuration by running
        "task config"
        """
        # Add -- to avoid misinterpretation of - in things like UUIDs
        cmd = (self.taskw, "config", "--", var, value)
        return run_cmd_wait(cmd, env=self.env)

    def runSuccess(self, args=(), input=None, merge_streams=True):
        """Invoke task with given arguments and fail if exit code != 0

        Use runError if you want exit_code to be tested automatically and
        *not* fail if program finishes abnormally.

        If you wish to pass instructions to task such as confirmations or other
        input via stdin, you can do so by providing a input string.
        Such as input="y\\ny".

        If merge_streams=True stdout and stderr will be merged into stdout.

        Returns (exit_code, stdout, stderr)

        Raises CommandError if the exit code is not 0.
        """
        # Create a copy of the command
        command = self._command[:]
        command.extend(args)

        output = run_cmd_wait_nofail(command, input,
                                     merge_streams=merge_streams,
                                     env=self.env)

        # output[0] is the exit code
        if output[0] != 0:
            raise CommandError(command, *output)

        return output

    def runError(self, args=(), input=None, merge_streams=True):
        """Invoke task with given arguments and fail if exit code == 0

        Use runSuccess if you want exit_code to be tested automatically and
        *fail* if program finishes abnormally.

        If you wish to pass instructions to task such as confirmations or other
        input via stdin, you can do so by providing a input string.
        Such as input="y\\ny".

        If merge_streams=True stdout and stderr will be merged into stdout.

        Returns (exit_code, stdout, stderr)

        Raises CommandError if the exit code is 0 or None.
        """
        # Create a copy of the command
        command = self._command[:]
        command.extend(args)

        output = run_cmd_wait_nofail(command, input,
                                     merge_streams=merge_streams,
                                     env=self.env)

        # output[0] is the exit code
        if output[0] == 0 or output[0] is None:
            raise CommandError(command, *output)

        return output

    def destroy(self):
        """Cleanup the data folder and prevent any further use of this
        instance
        """
        try:
            shutil.rmtree(self.datadir)
        except OSError as e:
            if e.errno == 2:
                # Directory no longer exists
                pass
            else:
                raise

        # Prevent future reuse of this instance
        self.runSuccess = self.__destroyed
        self.runError = self.__destroyed

        # self.destroy will get called when the python session closes.
        # If self.destroy was already called, turn the action into a noop
        self.destroy = lambda: None

    def __destroyed(self, *args, **kwargs):
        raise AttributeError("Task instance has been destroyed. "
                             "Create a new instance if you need a new client.")

    def diag(self, merge_streams_with=None):
        """Run task diagnostics.

        This function may fail in which case the exception text is returned as
        stderr or appended to stderr if merge_streams_with is set.

        If set, merge_streams_with should have the format:
        (exitcode, out, err)
        which should be the output of any previous process that failed.

        Returns (exit_code, stdout, stderr). When merge_streams_with is set
        the exit code is the one given in merge_streams_with.
        """
        try:
            output = self.runSuccess(("diag",))
        except CommandError as e:
            # If task diag failed add the error to stderr
            output = (e.code, None, str(e))

        if merge_streams_with is None:
            return output

        # Merge any given stdout and stderr with that of "task diag"
        code, out, err = merge_streams_with
        dcode, dout, derr = output

        # Merge stdout
        newout = "\n##### Debugging information (task diag): #####\n{0}"
        if dout is None:
            newout = newout.format("Not available, check STDERR")
        else:
            newout = newout.format(dout)

        if out is not None:
            newout = out + newout

        # And merge stderr
        newerr = "\n##### Debugging information (task diag): #####\n{0}"
        if derr is None:
            newerr = newerr.format("Not available, check STDOUT")
        else:
            newerr = newerr.format(derr)

        if err is not None:
            # Append the formatted diagnostics (not the raw derr, which may
            # be None and would lose the header), same as done for stdout
            newerr = err + newerr

        return code, newout, newerr

    def faketime(self, faketime=None):
        """Set a faketime using libfaketime that will affect the following
        command calls.

        If faketime is None, faketime settings will be disabled.

        Raises unittest.SkipTest if the faketime executable is not found.
        """
        cmd = which("faketime")
        if cmd is None:
            raise unittest.SkipTest("libfaketime/faketime is not installed")

        # Strip a previously injected [faketime, "-f", <time>] prefix
        if self._command[0] == cmd:
            self._command = self._command[3:]

        if faketime is not None:
            # Use advanced time format
            self._command = [cmd, "-f", faketime] + self._command
|
|
|
|
# vim: ai sts=4 et sw=4
|