import atexit import datetime import json import os import shlex import shutil import tempfile import unittest from .exceptions import CommandError from .utils import run_cmd_wait, run_cmd_wait_nofail, which, timew_binary_location, DEFAULT_EXTENSION_PATH class Timew(object): """ Manage a Timewarrior instance A temporary folder is used as data store of timewarrior. A timew client should not be used after being destroyed. """ DEFAULT_TIMEW = timew_binary_location() def __init__(self, timew=DEFAULT_TIMEW, datadir=tempfile.mkdtemp(prefix="timew_"), configdir=tempfile.mkdtemp(prefix="timew_")): """ Initialize a timewarrior (client). The program runs in a temporary folder. :arg timew: Timewarrior binary to use as client (defaults: timew in PATH) """ self.timew = timew # Used to specify what command to launch (and to inject faketime) self._command = [self.timew] # Configuration of the isolated environment self._original_pwd = os.getcwd() self.datadir = datadir self.configDir = configdir self.timewrc = os.path.join(self.configDir, 'timewarrior.cfg') self.extdir = os.path.join(self.datadir, 'extensions') # Ensure any instance is properly destroyed at session end atexit.register(lambda: self.destroy()) self.reset_env() def reset(self, keep_config=False, keep_extensions=False): """Reset this instance to its maiden state""" self._purge_folder(self.datadir) if keep_extensions is False: self._purge_folder(self.extdir) if keep_config is False: open(self.timewrc, 'w').close() def add_default_extension(self, filename): """Add default extension to current instance""" if not os.path.isdir(self.extdir): os.mkdir(self.extdir) extfile = os.path.join(self.extdir, filename) if os.path.isfile(extfile): raise "{} already exists".format(extfile) shutil.copy(os.path.join(DEFAULT_EXTENSION_PATH, filename), extfile) def __repr__(self): txt = super(Timew, self).__repr__() return "{0} running from {1}>".format(txt[:-1], self.datadir) def __call__(self, *args, **kwargs): """aka t = Timew() ; t() which is now an alias to t.runSuccess()""" return self.runSuccess(*args, **kwargs) def reset_env(self): """Set a new environment derived from the one used to launch the test""" # Copy all env variables to avoid clashing subprocess environments self.env = os.environ.copy() # As well as TIMEWARRIORDB self.env["TIMEWARRIORDB"] = self.datadir # As well as MANPATH, so that the help tests can find the # uninstalled man pages parts = self.timew.split(os.path.sep)[0:-2] parts.append("doc") self.env["MANPATH"] = os.path.sep.join(parts) def config(self, var, value): """Run setup `var` as `value` in timew config""" cmd = (self.timew, ":yes", "config", var, value) return run_cmd_wait(cmd, env=self.env) @staticmethod def _create_exclusion_interval(interval): if not isinstance(interval, tuple): raise TypeError("Please specify interval(s) as a tuple or a list of tuples") if interval[0] is not None and not isinstance(interval[0], datetime.time): raise TypeError("Start date must be a datetime.time but is {}".format(type(interval[0]))) if interval[1] is not None and not isinstance(interval[1], datetime.time): raise TypeError("End date must be a datetime.time but is {}".format(type(interval[1]))) if interval[0] is None: return "<{:%H:%M:%S}".format(interval[1]) if interval[1] is None: return ">{:%H:%M:%S}".format(interval[0]) if interval[0] > interval[1]: return "<{:%H:%M:%S} >{:%H:%M:%S}".format(interval[1], interval[0]) return "{:%H:%M:%S}-{:%H:%M:%S}".format(interval[0], interval[1]) def configure_exclusions(self, intervals): if isinstance(intervals, list): exclusion = " ".join([self._create_exclusion_interval(interval) for interval in intervals]) else: exclusion = self._create_exclusion_interval(intervals) self.config("exclusions.monday", exclusion) self.config("exclusions.tuesday", exclusion) self.config("exclusions.wednesday", exclusion) self.config("exclusions.thursday", exclusion) self.config("exclusions.friday", exclusion) self.config("exclusions.saturday", exclusion) self.config("exclusions.sunday", exclusion) def del_config(self, var): """Remove `var` from timew config""" cmd = (self.timew, ":yes", "config", var) return run_cmd_wait(cmd, env=self.env) @property def timewrc_content(self): """Returns the contents of the timewrc file.""" with open(self.timewrc, "r") as f: return f.readlines() def export(self, export_filter=None): """Run "timew export", return JSON array of exported intervals.""" if export_filter is None: export_filter = "" code, out, err = self.runSuccess("{0} export".format(export_filter)) return json.loads(out) @staticmethod def _split_string_args_if_string(args): """ Helper function to parse and split into arguments a single string argument. The string is literally the same as if written in the shell. """ # Enable nicer-looking calls by allowing plain strings if isinstance(args, str): args = shlex.split(args) return args def runSuccess(self, args="", input=None, merge_streams=False, timeout=5): """ Invoke timew with given arguments and fail if exit code != 0 Use runError if you want exit_code to be tested automatically and *not* fail if program finishes abnormally. If you wish to pass instructions to timew such as confirmations or other input via stdin, you can do so by providing a input string. Such as input="y\ny\n". If merge_streams=True stdout and stderr will be merged into stdout. timeout = number of seconds the test will wait for every timew call. Defaults to 1 second if not specified. Unit is seconds. Returns (exit_code, stdout, stderr) if merge_streams=False (exit_code, output) if merge_streams=True """ # Create a copy of the command command = self._command[:] args = self._split_string_args_if_string(args) command.extend(args) output = run_cmd_wait_nofail(command, input, merge_streams=merge_streams, env=self.env, timeout=timeout) if output[0] != 0: raise CommandError(command, *output) return output def runError(self, args=(), input=None, merge_streams=False, timeout=5): """ Invoke timew with given arguments and fail if exit code == 0 Use runSuccess if you want exit_code to be tested automatically and *fail* if program finishes abnormally. If you wish to pass instructions to timew such as confirmations or other input via stdin, you can do so by providing a input string. Such as input="y\ny\n". If merge_streams=True stdout and stderr will be merged into stdout. timeout = number of seconds the test will wait for every timew call. Defaults to 1 second if not specified. Unit is seconds. Returns (exit_code, stdout, stderr) if merge_streams=False (exit_code, output) if merge_streams=True """ # Create a copy of the command command = self._command[:] args = self._split_string_args_if_string(args) command.extend(args) output = run_cmd_wait_nofail(command, input, merge_streams=merge_streams, env=self.env, timeout=timeout) # output[0] is the exit code if output[0] == 0 or output[0] is None: raise CommandError(command, *output) return output def destroy(self): """Cleanup the data folder and release server port for other instances""" try: shutil.rmtree(self.datadir) except OSError as e: if e.errno == 2: # Directory no longer exists pass else: raise # Prevent future reuse of this instance self.runSuccess = self.__destroyed self.runError = self.__destroyed # self.destroy will get called when the python session closes. # If self.destroy was already called, turn the action into a noop self.destroy = lambda: None def __destroyed(self, *args, **kwargs): raise AttributeError("Program instance has been destroyed. " "Create a new instance if you need a new client.") def faketime(self, faketime=None): """ Set a faketime using libfaketime that will affect the following command calls. If faketime is None, faketime settings will be disabled. """ cmd = which("faketime") if cmd is None: raise unittest.SkipTest("libfaketime/faketime is not installed") if self._command[0] == cmd: self._command = self._command[3:] if faketime is not None: # Use advanced time format self._command = [cmd, "-f", faketime] + self._command @staticmethod def _purge_folder(folder): if not os.path.exists(folder): return for filename in os.listdir(folder): file_path = os.path.join(folder, filename) try: if os.path.isfile(file_path) or os.path.islink(file_path): os.unlink(file_path) elif os.path.isdir(file_path): shutil.rmtree(file_path) except Exception as e: print("Failed to delete {}. Reason: {}".format(file_path, e))