# -*- coding: utf-8 -*- import atexit import errno import json import os import shlex import shutil import tempfile import unittest from .exceptions import CommandError from .hooks import Hooks from .utils import run_cmd_wait, run_cmd_wait_nofail, which, task_binary_location class Task(object): """ Manage a task warrior instance A temporary folder is used as data store of task warrior. This class can be instantiated multiple times if multiple taskw clients are needed. This class can be given a Taskd instance for simplified configuration. A taskw client should not be used after being destroyed. """ DEFAULT_TASK = task_binary_location() def __init__(self, taskw=DEFAULT_TASK, datadir=tempfile.mkdtemp(prefix="task_"), taskrc=None): """ Initialize a Task warrior (client) that can interact with a taskd server. The task client runs in a temporary folder. :arg taskw: Task binary to use as client (defaults: task in PATH) """ self.taskw = taskw # Used to specify what command to launch (and to inject faketime) self._command = [self.taskw] # Configuration of the isolated environment self._original_pwd = os.getcwd() self.datadir = datadir self.taskrc = os.path.join(self.datadir, "test.rc") if taskrc is None else taskrc # Ensure any instance is properly destroyed at session end atexit.register(lambda: self.destroy()) self.reset_env() with open(self.taskrc, 'w') as rc: rc.write("data.location={0}\n" "hooks=off\n" "".format(self.datadir)) # Hooks disabled until requested self.hooks = Hooks(self.datadir) def __repr__(self): txt = super(Task, self).__repr__() return "{0} running from {1}>".format(txt[:-1], self.datadir) def __call__(self, *args, **kwargs): """aka t = Task() ; t() which is now an alias to t.runSuccess()""" return self.runSuccess(*args, **kwargs) def reset(self, keep_config=False, keep_hooks=False): """Reset this instance to its maiden state""" self._purge_folder(self.datadir) if keep_hooks is False and self.hooks is not None: self.hooks.clear() if keep_config is False: open(self.taskrc, 'w').close() pass def activate_hooks(self): """Enable self.hooks functionality and activate hooks on config""" self.config("hooks", "on") def deactivate_hooks(self): """Enable self.hooks functionality and activate hooks on config""" self.config("hooks", "off") def reset_env(self): """Set a new environment derived from the one used to launch the test""" # Copy all env variables to avoid clashing subprocess environments self.env = os.environ.copy() # Make sure no TASKDDATA is isolated self.env["TASKDATA"] = self.datadir # As well as TASKRC self.env["TASKRC"] = self.taskrc def config(self, var, value): """Run setup `var` as `value` in task config""" # Add -- to avoid misinterpretation of - in things like UUIDs cmd = (self.taskw, "rc.confirmation=off", "config", "--", var, value) return run_cmd_wait(cmd, env=self.env) def del_config(self, var): """Remove `var` from taskd config""" cmd = (self.taskw, "config", var) return run_cmd_wait(cmd, env=self.env) @property def taskrc_content(self): """Returns the contents of the taskrc file.""" with open(self.taskrc, "r") as f: return f.readlines() def export(self, export_filter=None): """Run "task export", return JSON array of exported tasks.""" if export_filter is None: export_filter = "" code, out, err = self.runSuccess("rc.json.array=1 {0} export" "".format(export_filter)) return json.loads(out) def export_one(self, export_filter=None): """ Return a dictionary representing the exported task. Will fail if multiple tasks match the filter. """ result = self.export(export_filter=export_filter) if len(result) != 1: descriptions = [task.get('description') or '[description-missing]' for task in result] raise ValueError( "One task should match the '{0}' filter, '{1}' " "matches:\n {2}".format( export_filter or '', len(result), '\n '.join(descriptions) )) return result[0] @property def latest(self): return self.export_one("+LATEST") @staticmethod def _split_string_args_if_string(args): """ Helper function to parse and split into arguments a single string argument. The string is literally the same as if written in the shell. """ # Enable nicer-looking calls by allowing plain strings if isinstance(args, str): args = shlex.split(args) return args def runSuccess(self, args="", input=None, merge_streams=False, timeout=5): """ Invoke task with given arguments and fail if exit code != 0 Use runError if you want exit_code to be tested automatically and *not* fail if program finishes abnormally. If you wish to pass instructions to task such as confirmations or other input via stdin, you can do so by providing a input string. Such as input="y\ny\n". If merge_streams=True stdout and stderr will be merged into stdout. timeout = number of seconds the test will wait for every task call. Defaults to 1 second if not specified. Unit is seconds. Returns (exit_code, stdout, stderr) if merge_streams=False (exit_code, output) if merge_streams=True """ # Create a copy of the command command = self._command[:] args = self._split_string_args_if_string(args) command.extend(args) output = run_cmd_wait_nofail(command, input, merge_streams=merge_streams, env=self.env, timeout=timeout) if output[0] != 0: raise CommandError(command, *output) return output def runError(self, args=(), input=None, merge_streams=False, timeout=5): """ Invoke task with given arguments and fail if exit code == 0 Use runSuccess if you want exit_code to be tested automatically and *fail* if program finishes abnormally. If you wish to pass instructions to task such as confirmations or other input via stdin, you can do so by providing a input string. Such as input="y\ny\n". If merge_streams=True stdout and stderr will be merged into stdout. timeout = number of seconds the test will wait for every task call. Defaults to 1 second if not specified. Unit is seconds. Returns (exit_code, stdout, stderr) if merge_streams=False (exit_code, output) if merge_streams=True """ # Create a copy of the command command = self._command[:] args = self._split_string_args_if_string(args) command.extend(args) output = run_cmd_wait_nofail(command, input, merge_streams=merge_streams, env=self.env, timeout=timeout) # output[0] is the exit code if output[0] == 0 or output[0] is None: raise CommandError(command, *output) return output def destroy(self): """Cleanup the data folder and release server port for other instances""" try: shutil.rmtree(self.datadir) except OSError as e: if e.errno == errno.ENOENT: # Directory no longer exists pass else: raise # Prevent future reuse of this instance self.runSuccess = self.__destroyed self.runError = self.__destroyed # self.destroy will get called when the python session closes. # If self.destroy was already called, turn the action into a noop self.destroy = lambda: None def __destroyed(self, *args, **kwargs): raise AttributeError("Task instance has been destroyed. " "Create a new instance if you need a new client.") def diag(self, merge_streams_with=None): """ Run task diagnostics. This function may fail in which case the exception text is returned as stderr or appended to stderr if merge_streams_with is set. If set, merge_streams_with should have the format: (exitcode, out, err) which should be the output of any previous process that failed. """ try: output = self.runSuccess("diag") except CommandError as e: # If task diag failed add the error to stderr output = (e.code, None, str(e)) if merge_streams_with is None: return output else: # Merge any given stdout and stderr with that of "task diag" code, out, err = merge_streams_with dcode, dout, derr = output # Merge stdout newout = "\n##### Debugging information (task diag): #####\n{0}" if dout is None: newout = newout.format("Not available, check STDERR") else: newout = newout.format(dout) if out is not None: newout = out + newout # And merge stderr newerr = "\n##### Debugging information (task diag): #####\n{0}" if derr is None: newerr = newerr.format("Not available, check STDOUT") else: newerr = newerr.format(derr) if err is not None: newerr = err + derr return code, newout, newerr def faketime(self, faketime=None): """ Set a faketime using libfaketime that will affect the following command calls. If faketime is None, faketime settings will be disabled. """ cmd = which("faketime") if cmd is None: raise unittest.SkipTest("libfaketime/faketime is not installed") if self._command[0] == cmd: self._command = self._command[3:] if faketime is not None: # Use advanced time format self._command = [cmd, "-f", faketime] + self._command @staticmethod def _purge_folder(folder): if not os.path.exists(folder): return for filename in os.listdir(folder): file_path = os.path.join(folder, filename) try: print("Removing {}".format(file_path)) if os.path.isfile(file_path) or os.path.islink(file_path): os.remove(file_path) except Exception as e: print("Failed to delete {}. Reason: {}".format(file_path, e))