import atexit import errno import json import os import shlex import shutil import tempfile import unittest from .exceptions import CommandError from .hooks import Hooks from .utils import run_cmd_wait, run_cmd_wait_nofail, which, task_binary_location from .compat import STRING_TYPE class Task(object): """Manage a task warrior instance A temporary folder is used as data store of task warrior. This class can be instanciated multiple times if multiple taskw clients are needed. A taskw client should not be used after being destroyed. """ DEFAULT_TASK = task_binary_location() def __init__(self, taskw=DEFAULT_TASK): """Initialize a Task warrior (client). The task client runs in a temporary folder. :arg taskw: Task binary to use as client (defaults: task in PATH) """ self.taskw = taskw # Used to specify what command to launch (and to inject faketime) self._command = [self.taskw] # Configuration of the isolated environment self._original_pwd = os.getcwd() self.datadir = tempfile.mkdtemp(prefix="task_") self.taskrc = os.path.join(self.datadir, "test.rc") # Ensure any instance is properly destroyed at session end atexit.register(lambda: self.destroy()) self.reset_env() with open(self.taskrc, "w") as rc: rc.write( "data.location={0}\n" "hooks=off\n" "news.version=2.6.0\n" "".format(self.datadir) ) # Hooks disabled until requested self.hooks = None def __repr__(self): txt = super(Task, self).__repr__() return "{0} running from {1}>".format(txt[:-1], self.datadir) def __call__(self, *args, **kwargs): "aka t = Task() ; t() which is now an alias to t.runSuccess()" return self.runSuccess(*args, **kwargs) def activate_hooks(self): """Enable self.hooks functionality and activate hooks on config""" self.config("hooks", "1") self.hooks = Hooks(self.datadir) def reset_env(self): """Set a new environment derived from the one used to launch the test""" # Copy all env variables to avoid clashing subprocess environments self.env = os.environ.copy() # Make sure no TASKDDATA is isolated self.env["TASKDATA"] = self.datadir # As well as TASKRC self.env["TASKRC"] = self.taskrc def config(self, var, value): """Run setup `var` as `value` in taskd config""" # Add -- to avoid misinterpretation of - in things like UUIDs cmd = (self.taskw, "config", "--", var, value) return run_cmd_wait(cmd, env=self.env, input="y\n") def del_config(self, var): """Remove `var` from taskd config""" cmd = (self.taskw, "config", var) return run_cmd_wait(cmd, env=self.env, input="y\n") @property def taskrc_content(self): """ Returns the contents of the taskrc file. """ with open(self.taskrc, "r") as f: return f.readlines() def export(self, export_filter=None): """Run "task export", return JSON array of exported tasks.""" if export_filter is None: export_filter = "" code, out, err = self.runSuccess( "rc.json.array=1 {0} export" "".format(export_filter) ) return json.loads(out) def export_one(self, export_filter=None): """ Return a dictionary representing the exported task. Will fail if mutliple tasks match the filter. """ result = self.export(export_filter=export_filter) if len(result) != 1: descriptions = [ task.get("description") or "[description-missing]" for task in result ] raise ValueError( "One task should match the '{0}' filter, '{1}' " "matches:\n {2}".format( export_filter or "", len(result), "\n ".join(descriptions) ) ) return result[0] @property def latest(self): return self.export_one("+LATEST") @staticmethod def _split_string_args_if_string(args): """Helper function to parse and split into arguments a single string argument. The string is literally the same as if written in the shell. """ # Enable nicer-looking calls by allowing plain strings if isinstance(args, STRING_TYPE): args = shlex.split(args) return args def runSuccess(self, args="", input=None, merge_streams=False, timeout=5): """Invoke task with given arguments and fail if exit code != 0 Use runError if you want exit_code to be tested automatically and *not* fail if program finishes abnormally. If you wish to pass instructions to task such as confirmations or other input via stdin, you can do so by providing a input string. Such as input="y\ny\n". If merge_streams=True stdout and stderr will be merged into stdout. timeout = number of seconds the test will wait for every task call. Defaults to 1 second if not specified. Unit is seconds. Returns (exit_code, stdout, stderr) if merge_streams=False (exit_code, output) if merge_streams=True """ # Create a copy of the command command = self._command[:] args = self._split_string_args_if_string(args) command.extend(args) output = run_cmd_wait_nofail( command, input, merge_streams=merge_streams, env=self.env, timeout=timeout ) if output[0] != 0: raise CommandError(command, *output) return output def runError(self, args=(), input=None, merge_streams=False, timeout=5): """Invoke task with given arguments and fail if exit code == 0 Use runSuccess if you want exit_code to be tested automatically and *fail* if program finishes abnormally. If you wish to pass instructions to task such as confirmations or other input via stdin, you can do so by providing a input string. Such as input="y\ny\n". If merge_streams=True stdout and stderr will be merged into stdout. timeout = number of seconds the test will wait for every task call. Defaults to 1 second if not specified. Unit is seconds. Returns (exit_code, stdout, stderr) if merge_streams=False (exit_code, output) if merge_streams=True """ # Create a copy of the command command = self._command[:] args = self._split_string_args_if_string(args) command.extend(args) output = run_cmd_wait_nofail( command, input, merge_streams=merge_streams, env=self.env, timeout=timeout ) # output[0] is the exit code if output[0] == 0 or output[0] is None: raise CommandError(command, *output) return output def destroy(self): """Cleanup the data folder and release server port for other instances""" try: shutil.rmtree(self.datadir) except OSError as e: if e.errno == errno.ENOENT: # Directory no longer exists pass else: raise # Prevent future reuse of this instance self.runSuccess = self.__destroyed self.runError = self.__destroyed # self.destroy will get called when the python session closes. # If self.destroy was already called, turn the action into a noop self.destroy = lambda: None def __destroyed(self, *args, **kwargs): raise AttributeError( "Task instance has been destroyed. " "Create a new instance if you need a new client." ) def diag(self, merge_streams_with=None): """Run task diagnostics. This function may fail in which case the exception text is returned as stderr or appended to stderr if merge_streams_with is set. If set, merge_streams_with should have the format: (exitcode, out, err) which should be the output of any previous process that failed. """ try: output = self.runSuccess("diag") except CommandError as e: # If task diag failed add the error to stderr output = (e.code, None, str(e)) if merge_streams_with is None: return output else: # Merge any given stdout and stderr with that of "task diag" code, out, err = merge_streams_with dcode, dout, derr = output # Merge stdout newout = "\n##### Debugging information (task diag): #####\n{0}" if dout is None: newout = newout.format("Not available, check STDERR") else: newout = newout.format(dout) if out is not None: newout = out + newout # And merge stderr newerr = "\n##### Debugging information (task diag): #####\n{0}" if derr is None: newerr = newerr.format("Not available, check STDOUT") else: newerr = newerr.format(derr) if err is not None: newerr = err + derr return code, newout, newerr def faketime(self, faketime=None): """Set a faketime using libfaketime that will affect the following command calls. If faketime is None, faketime settings will be disabled. """ cmd = which("faketime") if cmd is None: raise unittest.SkipTest("libfaketime/faketime is not installed") if self._command[0] == cmd: self._command = self._command[3:] if faketime is not None: # Use advanced time format self._command = [cmd, "-f", faketime] + self._command # vim: ai sts=4 et sw=4