From 29479d064e38e956088f76f9e04bcce08186c10a Mon Sep 17 00:00:00 2001 From: Thomas Lauf Date: Mon, 2 Jan 2023 22:02:14 +0100 Subject: [PATCH] Add E2E tests --- .github/workflows/tests.yaml | 45 +++ docker-compose.yml | 71 +++++ test/basetest/__init__.py | 3 + test/basetest/exceptions.py | 52 ++++ test/basetest/hooks.py | 517 +++++++++++++++++++++++++++++++++++ test/basetest/meta.py | 36 +++ test/basetest/task.py | 335 +++++++++++++++++++++++ test/basetest/testing.py | 117 ++++++++ test/basetest/timew.py | 294 ++++++++++++++++++++ test/basetest/utils.py | 469 +++++++++++++++++++++++++++++++ test/docker/Dockerfile | 62 +++++ test/test_on-modify.py | 216 --------------- test/test_on-modify_e2e.py | 203 ++++++++++++++ 13 files changed, 2204 insertions(+), 216 deletions(-) create mode 100644 docker-compose.yml create mode 100644 test/basetest/__init__.py create mode 100644 test/basetest/exceptions.py create mode 100644 test/basetest/hooks.py create mode 100644 test/basetest/meta.py create mode 100644 test/basetest/task.py create mode 100644 test/basetest/testing.py create mode 100644 test/basetest/timew.py create mode 100644 test/basetest/utils.py create mode 100644 test/docker/Dockerfile delete mode 100755 test/test_on-modify.py create mode 100755 test/test_on-modify_e2e.py diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 6bc4456..8adb869 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -16,3 +16,48 @@ jobs: venv/bin/pip install --upgrade pip venv/bin/pip install pytest mockito venv/bin/pytest test/test_on-modify_unit.py + + e2e-tests: + needs: unit-tests + strategy: + fail-fast: false + matrix: + include: + - name: "task [stable] & timew [stable]" + runner: ubuntu-latest + container: task-stable-timew-stable + - name: "task [develop] & timew [stable]" + runner: ubuntu-latest + container: task-develop-timew-stable + - name: "task [stable] & timew [develop]" + runner: ubuntu-latest + container: task-stable-timew-develop + - name: "task [develop] & timew [develop]" + runner: ubuntu-latest + container: task-develop-timew-develop + + runs-on: ${{ matrix.runner }} + continue-on-error: ${{ matrix.continue-on-error == true }} + steps: + - uses: actions/checkout@v3 + - name : Login to GHCR + uses : docker/login-action@v2.1.0 + with : + registry: ghcr.io + username: ${{ github.repository_owner }} + password: ${{ secrets.GITHUB_TOKEN }} + - name: Build ${{ matrix.name }} + env: + REGISTRY: ghcr.io + OWNER: ${{ github.repository_owner }} + GITHUB_USER: ${{ github.repository_owner }} + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + CONTAINER: ${{ matrix.container }} + run: docker-compose build ${CONTAINER} + - name: Test ${{ matrix.name }} + env: + REGISTRY: ghcr.io + OWNER: ${{ github.repository_owner }} + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + CONTAINER: ${{ matrix.container }} + run: docker-compose run ${CONTAINER} diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..2519511 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,71 @@ +############################################################################### +# +# Copyright 2023, Gothenburg Bit Factory +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included +# in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# +# https://www.opensource.org/licenses/mit-license.php +# +############################################################################### + +version: '3' +services: + task-stable-timew-stable: + build: + context: . + dockerfile: test/docker/Dockerfile + args: + TEST_IMAGE: "${REGISTRY}/${OWNER}/task-timew:stable-stable" + network_mode: "host" + security_opt: + - label=type:container_runtime_t + tty: true + + task-develop-timew-stable: + build: + context: . + dockerfile: test/docker/Dockerfile + args: + TEST_IMAGE: "${REGISTRY}/${OWNER}/task-timew:develop-stable" + network_mode: "host" + security_opt: + - label=type:container_runtime_t + tty: true + + task-stable-timew-develop: + build: + context: . + dockerfile: test/docker/Dockerfile + args: + TEST_IMAGE: "${REGISTRY}/${OWNER}/task-timew:stable-develop" + network_mode: "host" + security_opt: + - label=type:container_runtime_t + tty: true + + task-develop-timew-develop: + build: + context: . + dockerfile: test/docker/Dockerfile + args: + TEST_IMAGE: "${REGISTRY}/${OWNER}/task-timew:develop-develop" + network_mode: "host" + security_opt: + - label=type:container_runtime_t + tty: true diff --git a/test/basetest/__init__.py b/test/basetest/__init__.py new file mode 100644 index 0000000..5f9b24d --- /dev/null +++ b/test/basetest/__init__.py @@ -0,0 +1,3 @@ +from .task import Task +from .testing import TestCase +from .timew import Timew diff --git a/test/basetest/exceptions.py b/test/basetest/exceptions.py new file mode 100644 index 0000000..796e236 --- /dev/null +++ b/test/basetest/exceptions.py @@ -0,0 +1,52 @@ +import signal + +sig_names = dict((k, v) for v, k in reversed(sorted(signal.__dict__.items())) + if v.startswith('SIG') and not v.startswith('SIG_')) + + +class CommandError(Exception): + def __init__(self, cmd, code, out, err=None, msg=None): + DEFAULT = ("Command '{{0}}' was {signal}'ed. " + "SIGABRT usually means program timed out.\n") + if msg is None: + msg_suffix = "\n*** Start STDOUT ***\n{2}\n*** End STDOUT ***\n" + if err is not None: + msg_suffix += ( + "\n*** Start STDERR ***\n{3}\n*** End STDERR ***\n" + ) + + if code < 0: + self.msg = DEFAULT.format(signal=sig_names[abs(code)]) + else: + self.msg = ("Command '{0}' finished with unexpected exit " + "code '{1}'.\n") + + self.msg += msg_suffix + else: + self.msg = msg + + self.cmd = cmd + self.out = out + self.err = err + self.code = code + + def __str__(self): + return self.msg.format(self.cmd, self.code, self.out, self.err) + + +class HookError(Exception): + pass + + +class TimeoutWaitingFor(object): + def __init__(self, name): + self.name = name + + def __repr__(self): + return "*** Timeout reached while waiting for {0} ***".format( + self.name) + + +class StreamsAreMerged(object): + def __repr__(self): + return "*** Streams are merged, STDERR is not available ***" diff --git a/test/basetest/hooks.py b/test/basetest/hooks.py new file mode 100644 index 0000000..83b46e3 --- /dev/null +++ b/test/basetest/hooks.py @@ -0,0 +1,517 @@ +# -*- coding: utf-8 -*- + +from __future__ import division + +import errno +import os +import shutil +import stat +from sys import stderr + +try: + import simplejson as json +except ImportError: + import json + +from datetime import datetime +from .utils import DEFAULT_HOOK_PATH +from .exceptions import HookError + + +class InvalidJSON(object): + """Object representing the original unparsed JSON string and the JSON error + """ + def __init__(self, original, error): + self.original = original + self.error = error + + +def json_decoder(string): + """Attempt to decode a JSON string and in case of error return an + InvalidJSON object + """ + decoder = json.JSONDecoder().decode + + try: + return decoder(string) + except ValueError as e: + return InvalidJSON(string, str(e)) + + +class Hooks(object): + """ + Abstraction to help interact with hooks (add, remove) during tests and + keep track of which are active. + """ + def __init__(self, datadir): + """ + Initialize hooks container which keeps track of active hooks and + + :arg datadir: Temporary location where task is running (/tmp/...) + """ + self.hookdir = os.path.join(datadir, "hooks") + self._hooks = {} + + # Check if the hooks dir already exists + if not os.path.isdir(self.hookdir): + os.makedirs(self.hookdir) + + def __repr__(self): + enabled = [] + disabled = [] + + for hook in self: + if self[hook].is_active(): + enabled.append(hook) + else: + disabled.append(hook) + + enabled = ", ".join(enabled) or None + disabled = ", ".join(disabled) or None + + return "".format(enabled, + disabled) + + def __getitem__(self, name): + return self._hooks[name] + + def __setitem__(self, key, value): + self._hooks[key] = value + + def __delitem__(self, key): + del self._hooks[key] + + def __iter__(self): + for item in self._hooks: + yield item + + def __len__(self): + return len(self._hooks) + + def add(self, hookname, content, log=False): + """ + Register hook with name 'hookname' and given file content. + + :arg hookname: Should be a string starting with one of: + - on-launch + - on-add + - on-exit + - on-modify + + :arg content: Content of the file as a (multi-line) string + :arg log: If we require checking input/output of the hook + """ + if log: + self[hookname] = LoggedHook(hookname, self.hookdir, content) + else: + try: + self[hookname] = Hook(hookname, self.hookdir, content) + except HookError: + return + + self[hookname].enable() + + def add_default(self, hookname, log=False): + """ + Register a pre-built hook that exists in the folder containing hooks used for testing. + If not explicitly passed hooks folder defaults to DEFAULT_HOOK_PATH + + :arg hookname: Name of the default hook + :arg log: If we require checking input/output of the hook + """ + if log: + self[hookname] = LoggedHook(hookname, self.hookdir, default=True) + else: + self[hookname] = Hook(hookname, self.hookdir, default=True) + + # Finally enable this hook + self[hookname].enable() + + def remove(self, hook): + """Remove the hook matching given hookname""" + try: + hookname = hook.hookname + except AttributeError: + hookname = hook + + hook = self[hookname] + + try: + del self[hookname] + except KeyError: + raise HookError("Hook {0} is not on record".format(hookname)) + + hook._delete() + + def clear(self): + """Remove all existing hooks and empty the hook registry""" + self._hooks = {} + + # Remove any existing hooks + try: + shutil.rmtree(self.hookdir) + except OSError as e: + # If the hookdir folder doesn't exist, no harm done and keep going + if e.errno != errno.ENOENT: + raise + + os.mkdir(self.hookdir) + + +class Hook(object): + """ + Represents a hook script and provides methods to enable/disable hooks + """ + + def __init__(self, hookname, hookdir, content=None, default=False, + default_hookpath=None): + """ + Initialize and create the hook + + This class supports creating hooks in two ways: + * by specifying default=True in which case hookname will be + searched on the hookpath and linked to the destination + * by specifying content="some text" in which case the hook will be + created with given content + + :arg hookname: Name of the hook e.g.: on-add.foobar + :arg hookdir: Hooks directory under temporary task/ folder + :arg content: What should be written to the hookfile + :arg default: If True hookname is looked up on default_hookpath + :arg default_hookpath: Default location where to look for preset hooks + """ + self.hookname = hookname + self.hookdir = hookdir + self.hookfile = os.path.join(self.hookdir, self.hookname) + + if default_hookpath is None: + self.default_hookpath = DEFAULT_HOOK_PATH + else: + self.default_hookpath = default_hookpath + + self._check_hook_type() + self._check_hook_not_exists(self.hookfile) + + if not default and content is None: + raise HookError("Cannot create hookfile {0} without content. " + "If using a builtin hook pass default=True" + .format(self.hookname)) + + if os.path.isfile(self.hookfile): + raise HookError("Hook with name {0} already exists. " + "Did you forget to remove() it before recreating?" + .format(self.hookname)) + + if default: + self.default_hookfile = os.path.join(self.default_hookpath, + self.hookname) + self._check_hook_exists(self.default_hookfile) + # Symlinks change permission of source file, cannot use one here + shutil.copy(self.default_hookfile, self.hookfile) + else: + self.default_hookfile = None + with open(self.hookfile, 'w') as fh: + fh.write(content) + + def __eq__(self, other): + try: + if self.hookname == other.hookname: + return True + except AttributeError: + pass + + return False + + def __ne__(self, other): + try: + if self.hookname != other.hookname: + return True + except AttributeError: + pass + + return False + + def __hash__(self): + return self.hookname.__hash__() + + def __repr__(self): + return "".format(self.hookname) + + def __str__(self): + return self.hookname + + def _check_hook_exists(self, hookfile): + """Checks if the file pointed to by the current hook exists""" + if not os.path.isfile(hookfile) and not os.path.islink(hookfile): + raise HookError("Hook {0} doesn't exist.".format(hookfile)) + + def _check_hook_not_exists(self, hookfile): + """Checks if the file pointed to by the current hook doesn't exist""" + try: + self._check_hook_exists(hookfile) + except HookError: + return + else: + raise HookError("Hook {0} already exists.".format(hookfile)) + + def _check_hook_type(self): + """ + Check if the hookname is valid and if another hook with the same + name was already created. + """ + for hooktype in ("on-launch", "on-add", "on-exit", "on-modify"): + if self.hookname.startswith(hooktype): + break + else: + stderr.write("WARNING: {0} is not a valid hook type. " + "It will not be triggered\n".format(self.hookname)) + + def _remove_file(self, file): + try: + os.remove(file) + except OSError as e: + if e.errno == errno.ENOENT: + raise HookError("Hook with name {0} was not found on " + "hooks/ folder".format(file)) + else: + raise + + def _delete(self): + """ + Remove the hook from disk + + Don't call this method directly. Use Hooks.remove(hook) instead + """ + self._remove_hookfile(self.hookfile) + + def enable(self): + """Make hookfile executable to allow triggering""" + os.chmod(self.hookfile, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) + + def disable(self): + """Remove hookfile executable bit to deny triggering""" + os.chmod(self.hookfile, stat.S_IREAD | stat.S_IWRITE) + + def is_active(self): + """Check if hook is active by verifying the execute bit""" + return os.access(self.hookfile, os.X_OK) + + +class LoggedHook(Hook): + """ + A variant of a Hook that allows checking that the hook was called, + what was received via STDIN and what was answered to STDOUT + """ + def __init__(self, *args, **kwargs): + super(LoggedHook, self).__init__(*args, **kwargs) + + # The wrapper will replace the hookfile + # The original file will be 'wrappedfile' + + # NOTE If the prefix "original_" is changed here, update wrapper.sh + self.wrappedname = "original_" + self.hookname + self.wrappedfile = os.path.join(self.hookdir, self.wrappedname) + + self.original_wrapper = os.path.join(self.default_hookpath, + "wrapper.sh") + + self.hooklog_in = self.wrappedfile + ".log.in" + self.hooklog_out = self.wrappedfile + ".log.out" + + # Cache is used to avoid parsing the logfiles everytime it's needed + self._cache = {} + + # Setup wrapper pointing to the correct hook name + self._setup_wrapper() + + def __repr__(self): + return "".format(self.hookname) + + def _delete(self): + """ + Remove the hook from disk + + Don't call this method directly. Use Task.hooks.remove(hook) instead + """ + super(LoggedHook, self)._delete() + self._remove_file(self.wrappedfile) + self._remove_file(self.hooklog_in) + self._remove_file(self.hooklog_out) + + def _setup_wrapper(self): + """Setup wrapper shell script to allow capturing input/output of hook""" + # Create empty hooklog to allow checking that hook executed + open(self.hooklog_in, 'w').close() + open(self.hooklog_out, 'w').close() + + # Rename the original hook to the name that will be used by wrapper + self._check_hook_not_exists(self.wrappedfile) + os.rename(self.hookfile, self.wrappedfile) + + # Symlinks change permission of source file, cannot use one here + shutil.copy(self.original_wrapper, self.hookfile) + + def _get_log_stat(self): + """Return the most recent change timestamp and size of both logfiles""" + stdin = os.stat(self.hooklog_in) + stdout = os.stat(self.hooklog_out) + + last_change = max((stdin.st_mtime, stdout.st_mtime)) + return last_change, stdin.st_size, stdout.st_size + + def _use_cache(self): + """Check if log files were changed since last check""" + try: + last_change = self._cache["last_change"] + except KeyError: + # No cache available + return False + else: + change = self._get_log_stat() + + if last_change != change: + # Cache is outdated + return False + else: + # Cache is up to date + return True + + def enable(self): + """Make hookfile executable to allow triggering""" + super(LoggedHook, self).enable() + os.chmod(self.wrappedfile, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) + + def disable(self): + """Remove hookfile executable bit to deny triggering""" + super(LoggedHook, self).disable() + os.chmod(self.wrappedfile, stat.S_IREAD | stat.S_IWRITE) + + def is_active(self): + """Check if hook is active by verifying the execute bit""" + parent_is_active = super(LoggedHook, self).disable() + return parent_is_active and os.access(self.wrappedfile, os.X_OK) + + def get_logs(self): + """ + Parse the logs generated by the hook and return a dictionary + containing the logs collected with the wrapper in a python friendly format: + * JSON is parsed as python dictionaries + * timestamps are parsed as datetime objects + + It should look something like this: + + ## STDIN file + % Called at 1414874711 with 'arg1 arg2 ...' + {... JSON received by the hook ... } + {... more JSON ...} + + ## STDOUT file + {... JSON emitted by the hook ... } + Logged messages + {... more JSON ...} + ! Exit code: 1 + """ + if self._use_cache(): + return self._cache["log"] + + log = {"calls": [], + "input": { + "json": [], + }, + "output": { + "json": [], + "msgs": [], + }, + "exitcode": None, + } + + with open(self.hooklog_in) as fh: + for i, line in enumerate(fh): + line = line.rstrip("\n") + if line.startswith("%"): + tstamp, args = line.split(" with ") + # Timestamp includes nanosecond resolution + timestamp = tstamp.split(" ")[-1] + # convert timestamp to python datetime object + log["calls"].append({ + "timestamp": datetime.fromtimestamp(float(timestamp)), + "args": args, + }) + elif line.startswith("{"): + # Decode json input (to hook) + log["input"]["json"].append(json_decoder(line)) + else: + raise IOError("Unexpected content on STDIN line {0}: {1}" + .format(i, line)) + + with open(self.hooklog_out) as fh: + for line in fh: + line = line.rstrip("\n") + if line.startswith("!"): + exitcode = int(line.split(" ")[-1]) + log["exitcode"] = exitcode + elif line.startswith("{"): + # Decode json output (from hook) + log["output"]["json"].append(json_decoder(line)) + else: + log["output"]["msgs"].append(line) + + # NOTE convert all lists to tuples to prevent tampering? + + self._cache["log"] = log + + # Update last modification timestamp in cache + self._cache["last_change"] = self._get_log_stat() + + return self._cache["log"] + + def assertTriggeredCount(self, count): + """ + Check if current hook file was triggered/used by taskwarrior and + how many times. + """ + log = self.get_logs() + + assert len(log["calls"]) == count, ("{0} calls expected for {1} but " + "found {2}".format( + count, + self.hookname, + log["calls"] + )) + + def assertExitcode(self, exitcode): + """Check if current hook finished with the expected exit code""" + log = self.get_logs() + + assert log["exitcode"] == exitcode, ("Expected exit code {0} for {1} " + "but found {2}".format( + exitcode, + self.hookname, + log["exitcode"] + )) + + def assertValidJSONOutput(self): + """Check if current hook output is valid JSON in all expected replies""" + log = self.get_logs() + + for i, out in enumerate(log["output"]["json"]): + assert not isinstance(out, InvalidJSON), ("Invalid JSON found at " + "reply number {0} with " + "content {1}".format( + i + 1, + out.original + )) + + def assertInvalidJSONOutput(self): + """Check if current hook output is invalid JSON in any expected reply""" + log = self.get_logs() + + for i, out in enumerate(log["output"]["json"]): + assert isinstance(out, InvalidJSON), ("Valid JSON found at reply " + "number {0} with content " + "{1}".format( + i + 1, + out.original + )) + +# vim: ai sts=4 et sw=4 diff --git a/test/basetest/meta.py b/test/basetest/meta.py new file mode 100644 index 0000000..51f74f2 --- /dev/null +++ b/test/basetest/meta.py @@ -0,0 +1,36 @@ +from __future__ import print_function, division + + +class MetaTest(type): + """Helper metaclass to simplify dynamic test creation + + Creates test_methods in the TestCase class dynamically named after the + arguments used. + """ + @staticmethod + def make_function(classname, *args, **kwargs): + def test(self): + # ### Body of the usual test_testcase ### # + # Override and redefine this method # + pass + + # Title of test in report + test.__doc__ = "{0}".format(args[0]) + + return test + + def __new__(meta, classname, bases, dct): + tests = dct.get("TESTS") + kwargs = dct.get("EXTRA", {}) + + for i, args in enumerate(tests): + func = meta.make_function(classname, *args, **kwargs) + + # Rename the function after a unique identifier + # Name of function must start with test_ to be ran by unittest + func.__name__ = "test_{0}".format(i) + + # Attach the new test to the testclass + dct[func.__name__] = func + + return super(MetaTest, meta).__new__(meta, classname, bases, dct) diff --git a/test/basetest/task.py b/test/basetest/task.py new file mode 100644 index 0000000..21ff792 --- /dev/null +++ b/test/basetest/task.py @@ -0,0 +1,335 @@ +# -*- coding: utf-8 -*- + +import atexit +import errno +import json +import os +import shlex +import shutil +import tempfile +import unittest + +from .exceptions import CommandError +from .hooks import Hooks +from .utils import run_cmd_wait, run_cmd_wait_nofail, which, task_binary_location + + +class Task(object): + """ + Manage a task warrior instance + + A temporary folder is used as data store of task warrior. + This class can be instantiated multiple times if multiple taskw clients are + needed. + + This class can be given a Taskd instance for simplified configuration. + + A taskw client should not be used after being destroyed. + """ + DEFAULT_TASK = task_binary_location() + + def __init__(self, taskw=DEFAULT_TASK, datadir=tempfile.mkdtemp(prefix="task_"), taskrc=None): + """ + Initialize a Task warrior (client) that can interact with a taskd server. + The task client runs in a temporary folder. + + :arg taskw: Task binary to use as client (defaults: task in PATH) + """ + self.taskw = taskw + + # Used to specify what command to launch (and to inject faketime) + self._command = [self.taskw] + + # Configuration of the isolated environment + self._original_pwd = os.getcwd() + self.datadir = datadir + self.taskrc = os.path.join(self.datadir, "test.rc") if taskrc is None else taskrc + + # Ensure any instance is properly destroyed at session end + atexit.register(lambda: self.destroy()) + + self.reset_env() + + with open(self.taskrc, 'w') as rc: + rc.write("data.location={0}\n" + "hooks=off\n" + "".format(self.datadir)) + + # Hooks disabled until requested + self.hooks = Hooks(self.datadir) + + def __repr__(self): + txt = super(Task, self).__repr__() + return "{0} running from {1}>".format(txt[:-1], self.datadir) + + def __call__(self, *args, **kwargs): + """aka t = Task() ; t() which is now an alias to t.runSuccess()""" + return self.runSuccess(*args, **kwargs) + + def reset(self, keep_config=False, keep_hooks=False): + """Reset this instance to its maiden state""" + self._purge_folder(self.datadir) + + if keep_hooks is False and self.hooks is not None: + self.hooks.clear() + + if keep_config is False: + open(self.taskrc, 'w').close() + pass + + def activate_hooks(self): + """Enable self.hooks functionality and activate hooks on config""" + self.config("hooks", "on") + + def deactivate_hooks(self): + """Enable self.hooks functionality and activate hooks on config""" + self.config("hooks", "off") + + def reset_env(self): + """Set a new environment derived from the one used to launch the test""" + # Copy all env variables to avoid clashing subprocess environments + self.env = os.environ.copy() + + # Make sure no TASKDDATA is isolated + self.env["TASKDATA"] = self.datadir + # As well as TASKRC + self.env["TASKRC"] = self.taskrc + + def config(self, var, value): + """Run setup `var` as `value` in task config""" + # Add -- to avoid misinterpretation of - in things like UUIDs + cmd = (self.taskw, "rc.confirmation=off", "config", "--", var, value) + return run_cmd_wait(cmd, env=self.env) + + def del_config(self, var): + """Remove `var` from taskd config""" + cmd = (self.taskw, "config", var) + return run_cmd_wait(cmd, env=self.env) + + @property + def taskrc_content(self): + """Returns the contents of the taskrc file.""" + + with open(self.taskrc, "r") as f: + return f.readlines() + + def export(self, export_filter=None): + """Run "task export", return JSON array of exported tasks.""" + if export_filter is None: + export_filter = "" + + code, out, err = self.runSuccess("rc.json.array=1 {0} export" + "".format(export_filter)) + + return json.loads(out) + + def export_one(self, export_filter=None): + """ + Return a dictionary representing the exported task. + Will fail if multiple tasks match the filter. + """ + + result = self.export(export_filter=export_filter) + + if len(result) != 1: + descriptions = [task.get('description') or '[description-missing]' + for task in result] + + raise ValueError( + "One task should match the '{0}' filter, '{1}' " + "matches:\n {2}".format( + export_filter or '', + len(result), + '\n '.join(descriptions) + )) + + return result[0] + + @property + def latest(self): + return self.export_one("+LATEST") + + @staticmethod + def _split_string_args_if_string(args): + """ + Helper function to parse and split into arguments a single string argument. + The string is literally the same as if written in the shell. + """ + # Enable nicer-looking calls by allowing plain strings + if isinstance(args, str): + args = shlex.split(args) + + return args + + def runSuccess(self, args="", input=None, merge_streams=False, + timeout=5): + """ + Invoke task with given arguments and fail if exit code != 0 + + Use runError if you want exit_code to be tested automatically and + *not* fail if program finishes abnormally. + + If you wish to pass instructions to task such as confirmations or other + input via stdin, you can do so by providing a input string. + Such as input="y\ny\n". + + If merge_streams=True stdout and stderr will be merged into stdout. + + timeout = number of seconds the test will wait for every task call. + Defaults to 1 second if not specified. Unit is seconds. + + Returns (exit_code, stdout, stderr) if merge_streams=False + (exit_code, output) if merge_streams=True + """ + # Create a copy of the command + command = self._command[:] + + args = self._split_string_args_if_string(args) + command.extend(args) + + output = run_cmd_wait_nofail(command, input, + merge_streams=merge_streams, + env=self.env, + timeout=timeout) + + if output[0] != 0: + raise CommandError(command, *output) + + return output + + def runError(self, args=(), input=None, merge_streams=False, timeout=5): + """ + Invoke task with given arguments and fail if exit code == 0 + + Use runSuccess if you want exit_code to be tested automatically and + *fail* if program finishes abnormally. + + If you wish to pass instructions to task such as confirmations or other + input via stdin, you can do so by providing a input string. + Such as input="y\ny\n". + + If merge_streams=True stdout and stderr will be merged into stdout. + + timeout = number of seconds the test will wait for every task call. + Defaults to 1 second if not specified. Unit is seconds. + + Returns (exit_code, stdout, stderr) if merge_streams=False + (exit_code, output) if merge_streams=True + """ + # Create a copy of the command + command = self._command[:] + + args = self._split_string_args_if_string(args) + command.extend(args) + + output = run_cmd_wait_nofail(command, input, + merge_streams=merge_streams, + env=self.env, + timeout=timeout) + + # output[0] is the exit code + if output[0] == 0 or output[0] is None: + raise CommandError(command, *output) + + return output + + def destroy(self): + """Cleanup the data folder and release server port for other instances""" + + try: + shutil.rmtree(self.datadir) + except OSError as e: + if e.errno == errno.ENOENT: + # Directory no longer exists + pass + else: + raise + + # Prevent future reuse of this instance + self.runSuccess = self.__destroyed + self.runError = self.__destroyed + + # self.destroy will get called when the python session closes. + # If self.destroy was already called, turn the action into a noop + self.destroy = lambda: None + + def __destroyed(self, *args, **kwargs): + raise AttributeError("Task instance has been destroyed. " + "Create a new instance if you need a new client.") + + def diag(self, merge_streams_with=None): + """ + Run task diagnostics. + + This function may fail in which case the exception text is returned as + stderr or appended to stderr if merge_streams_with is set. + + If set, merge_streams_with should have the format: + (exitcode, out, err) + which should be the output of any previous process that failed. + """ + try: + output = self.runSuccess("diag") + except CommandError as e: + # If task diag failed add the error to stderr + output = (e.code, None, str(e)) + + if merge_streams_with is None: + return output + else: + # Merge any given stdout and stderr with that of "task diag" + code, out, err = merge_streams_with + dcode, dout, derr = output + + # Merge stdout + newout = "\n##### Debugging information (task diag): #####\n{0}" + if dout is None: + newout = newout.format("Not available, check STDERR") + else: + newout = newout.format(dout) + + if out is not None: + newout = out + newout + + # And merge stderr + newerr = "\n##### Debugging information (task diag): #####\n{0}" + if derr is None: + newerr = newerr.format("Not available, check STDOUT") + else: + newerr = newerr.format(derr) + + if err is not None: + newerr = err + derr + + return code, newout, newerr + + def faketime(self, faketime=None): + """ + Set a faketime using libfaketime that will affect the following command calls. + + If faketime is None, faketime settings will be disabled. + """ + cmd = which("faketime") + if cmd is None: + raise unittest.SkipTest("libfaketime/faketime is not installed") + + if self._command[0] == cmd: + self._command = self._command[3:] + + if faketime is not None: + # Use advanced time format + self._command = [cmd, "-f", faketime] + self._command + + @staticmethod + def _purge_folder(folder): + if not os.path.exists(folder): + return + + for filename in os.listdir(folder): + file_path = os.path.join(folder, filename) + try: + print("Emptying {}".format(file_path)) + if os.path.isfile(file_path) or os.path.islink(file_path): + open(file_path, 'w').close() + except Exception as e: + print("Failed to delete {}. Reason: {}".format(file_path, e)) diff --git a/test/basetest/testing.py b/test/basetest/testing.py new file mode 100644 index 0000000..162986e --- /dev/null +++ b/test/basetest/testing.py @@ -0,0 +1,117 @@ +import datetime +import sys +import unittest + + +class BaseTestCase(unittest.TestCase): + def tap(self, out): + sys.stderr.write("--- tap output start ---\n") + for line in out.splitlines(): + sys.stderr.write(line + '\n') + sys.stderr.write("--- tap output end ---\n") + + +class TestCase(BaseTestCase): + def assertOpenInterval(self, interval, + expectedId=None, + expectedStart=None, + expectedTags=None, + expectedAnnotation=None, + description=None): + description = f"interval {interval}" if description is None else description + self.assertKeyExists(interval, "start", description, "{} does not contain a start date") + self.assertKeyNotExists(interval, "end", description, "{} does contain an end date") + + return self.assertInterval(interval, + expectedId=expectedId, + expectedStart=expectedStart, + expectedEnd=None, + expectedTags=expectedTags, + expectedAnnotation=expectedAnnotation, + description=description) + + def assertClosedInterval(self, interval, + expectedId=None, + expectedStart=None, + expectedEnd=None, + expectedTags=None, + expectedAnnotation=None, + description=None): + description = f"interval {interval}" if description is None else description + self.assertKeyExists(interval, "start", description, "{} does not contain a start date") + self.assertKeyExists(interval, "end", description, "{} does not contain an end date") + + return self.assertInterval(interval, + expectedId=expectedId, + expectedStart=expectedStart, + expectedEnd=expectedEnd, + expectedTags=expectedTags, + expectedAnnotation=expectedAnnotation, + description=description) + + def assertInterval(self, interval, + expectedId=None, + expectedStart=None, + expectedEnd=None, + expectedTags=None, + expectedAnnotation=None, + description="interval"): + if expectedId is not None: + self.assertKeyExists(interval, "id", description, "{} does not contain an id") + self.assertIntervalValue(interval, + "id", + expectedId, + description, + "{} of {} do not match (expected: '{}', actual: '{}')") + + if expectedStart: + self.assertIntervalTimestamp(interval, "start", expectedStart, description) + + if expectedEnd: + self.assertIntervalTimestamp(interval, "end", expectedEnd, description) + + if expectedTags: + self.assertKeyExists(interval, "tags", description, "{} does not contain tags") + self.assertIntervalValue(interval, + "tags", + expectedTags, + description, + "{} of {} do not match (expected: '{}', actual: '{}')") + + if expectedAnnotation: + self.assertKeyExists(interval, "annotation", description, "{} is not annotated") + self.assertIntervalValue(interval, + "annotation", + expectedAnnotation, + description, + "{} of {} do not match (expected: '{}', actual: '{}')") + + def assertKeyExists(self, interval, key, description, message): + self.assertTrue(key in interval, message.format(description)) + + def assertKeyNotExists(self, interval, key, description, message): + self.assertFalse(key in interval, message.format(description)) + + def assertIntervalTimestamp(self, interval, key, expected, description): + if isinstance(expected, datetime.datetime): + expected = "{:%Y%m%dT%H%M%SZ}".format(expected) + + self.assertIntervalValue(interval, + key, + expected, + description, + "{} time of {} does not match (expected: '{}', actual: '{}')") + + def assertIntervalValue(self, interval, key, expected, description, message): + actual = interval[key] + + if isinstance(actual, list): + actual.sort() + expected.sort() + self.assertSequenceEqual(actual, + expected, + message.format(key, description, expected, actual)) + else: + self.assertEqual(actual, + expected, + message.format(key, description, expected, actual)) diff --git a/test/basetest/timew.py b/test/basetest/timew.py new file mode 100644 index 0000000..fe7c734 --- /dev/null +++ b/test/basetest/timew.py @@ -0,0 +1,294 @@ +import atexit +import datetime +import json +import os +import shlex +import shutil +import tempfile +import unittest + +from .exceptions import CommandError +from .utils import run_cmd_wait, run_cmd_wait_nofail, which, timew_binary_location, DEFAULT_EXTENSION_PATH + + +class Timew(object): + """ + Manage a Timewarrior instance + + A temporary folder is used as data store of timewarrior. + + A timew client should not be used after being destroyed. + """ + DEFAULT_TIMEW = timew_binary_location() + + def __init__(self, + timew=DEFAULT_TIMEW, + datadir=tempfile.mkdtemp(prefix="timew_"), + configdir=tempfile.mkdtemp(prefix="timew_")): + """ + Initialize a timewarrior (client). + The program runs in a temporary folder. + + :arg timew: Timewarrior binary to use as client (defaults: timew in PATH) + """ + self.timew = timew + + # Used to specify what command to launch (and to inject faketime) + self._command = [self.timew] + + # Configuration of the isolated environment + self._original_pwd = os.getcwd() + self.datadir = datadir + self.configDir = configdir + self.timewrc = os.path.join(self.configDir, 'timewarrior.cfg') + self.extdir = os.path.join(self.datadir, 'extensions') + + # Ensure any instance is properly destroyed at session end + atexit.register(lambda: self.destroy()) + + self.reset_env() + + def reset(self, keep_config=False, keep_extensions=False): + """Reset this instance to its maiden state""" + + self._purge_folder(self.datadir) + + if keep_extensions is False: + self._purge_folder(self.extdir) + + if keep_config is False: + open(self.timewrc, 'w').close() + + def add_default_extension(self, filename): + """Add default extension to current instance""" + if not os.path.isdir(self.extdir): + os.mkdir(self.extdir) + + extfile = os.path.join(self.extdir, filename) + if os.path.isfile(extfile): + raise "{} already exists".format(extfile) + + shutil.copy(os.path.join(DEFAULT_EXTENSION_PATH, filename), extfile) + + def __repr__(self): + txt = super(Timew, self).__repr__() + return "{0} running from {1}>".format(txt[:-1], self.datadir) + + def __call__(self, *args, **kwargs): + """aka t = Timew() ; t() which is now an alias to t.runSuccess()""" + return self.runSuccess(*args, **kwargs) + + def reset_env(self): + """Set a new environment derived from the one used to launch the test""" + # Copy all env variables to avoid clashing subprocess environments + self.env = os.environ.copy() + + # As well as TIMEWARRIORDB + self.env["TIMEWARRIORDB"] = self.datadir + + # As well as MANPATH, so that the help tests can find the + # uninstalled man pages + parts = self.timew.split(os.path.sep)[0:-2] + parts.append("doc") + self.env["MANPATH"] = os.path.sep.join(parts) + + def config(self, var, value): + """Run setup `var` as `value` in timew config""" + cmd = (self.timew, ":yes", "config", var, value) + return run_cmd_wait(cmd, env=self.env) + + @staticmethod + def _create_exclusion_interval(interval): + if not isinstance(interval, tuple): + raise TypeError("Please specify interval(s) as a tuple or a list of tuples") + + if interval[0] is not None and not isinstance(interval[0], datetime.time): + raise TypeError("Start date must be a datetime.time but is {}".format(type(interval[0]))) + + if interval[1] is not None and not isinstance(interval[1], datetime.time): + raise TypeError("End date must be a datetime.time but is {}".format(type(interval[1]))) + + if interval[0] is None: + return "<{:%H:%M:%S}".format(interval[1]) + + if interval[1] is None: + return ">{:%H:%M:%S}".format(interval[0]) + + if interval[0] > interval[1]: + return "<{:%H:%M:%S} >{:%H:%M:%S}".format(interval[1], interval[0]) + + return "{:%H:%M:%S}-{:%H:%M:%S}".format(interval[0], interval[1]) + + def configure_exclusions(self, intervals): + if isinstance(intervals, list): + exclusion = " ".join([self._create_exclusion_interval(interval) for interval in intervals]) + + else: + exclusion = self._create_exclusion_interval(intervals) + + self.config("exclusions.monday", exclusion) + self.config("exclusions.tuesday", exclusion) + self.config("exclusions.wednesday", exclusion) + self.config("exclusions.thursday", exclusion) + self.config("exclusions.friday", exclusion) + self.config("exclusions.saturday", exclusion) + self.config("exclusions.sunday", exclusion) + + def del_config(self, var): + """Remove `var` from timew config""" + cmd = (self.timew, ":yes", "config", var) + return run_cmd_wait(cmd, env=self.env) + + @property + def timewrc_content(self): + """Returns the contents of the timewrc file.""" + + with open(self.timewrc, "r") as f: + return f.readlines() + + def export(self, export_filter=None): + """Run "timew export", return JSON array of exported intervals.""" + if export_filter is None: + export_filter = "" + + code, out, err = self.runSuccess("{0} export".format(export_filter)) + + return json.loads(out) + + @staticmethod + def _split_string_args_if_string(args): + """ + Helper function to parse and split into arguments a single string argument. + The string is literally the same as if written in the shell. + """ + # Enable nicer-looking calls by allowing plain strings + if isinstance(args, str): + args = shlex.split(args) + + return args + + def runSuccess(self, args="", input=None, merge_streams=False, timeout=5): + """ + Invoke timew with given arguments and fail if exit code != 0 + + Use runError if you want exit_code to be tested automatically and + *not* fail if program finishes abnormally. + + If you wish to pass instructions to timew such as confirmations or other + input via stdin, you can do so by providing a input string. + Such as input="y\ny\n". + + If merge_streams=True stdout and stderr will be merged into stdout. + + timeout = number of seconds the test will wait for every timew call. + Defaults to 1 second if not specified. Unit is seconds. + + Returns (exit_code, stdout, stderr) if merge_streams=False + (exit_code, output) if merge_streams=True + """ + # Create a copy of the command + command = self._command[:] + + args = self._split_string_args_if_string(args) + command.extend(args) + + output = run_cmd_wait_nofail(command, input, + merge_streams=merge_streams, + env=self.env, + timeout=timeout) + + if output[0] != 0: + raise CommandError(command, *output) + + return output + + def runError(self, args=(), input=None, merge_streams=False, timeout=5): + """ + Invoke timew with given arguments and fail if exit code == 0 + + Use runSuccess if you want exit_code to be tested automatically and + *fail* if program finishes abnormally. + + If you wish to pass instructions to timew such as confirmations or other + input via stdin, you can do so by providing a input string. + Such as input="y\ny\n". + + If merge_streams=True stdout and stderr will be merged into stdout. + + timeout = number of seconds the test will wait for every timew call. + Defaults to 1 second if not specified. Unit is seconds. + + Returns (exit_code, stdout, stderr) if merge_streams=False + (exit_code, output) if merge_streams=True + """ + # Create a copy of the command + command = self._command[:] + + args = self._split_string_args_if_string(args) + command.extend(args) + + output = run_cmd_wait_nofail(command, input, + merge_streams=merge_streams, + env=self.env, + timeout=timeout) + + # output[0] is the exit code + if output[0] == 0 or output[0] is None: + raise CommandError(command, *output) + + return output + + def destroy(self): + """Cleanup the data folder and release server port for other instances""" + try: + shutil.rmtree(self.datadir) + except OSError as e: + if e.errno == 2: + # Directory no longer exists + pass + else: + raise + + # Prevent future reuse of this instance + self.runSuccess = self.__destroyed + self.runError = self.__destroyed + + # self.destroy will get called when the python session closes. + # If self.destroy was already called, turn the action into a noop + self.destroy = lambda: None + + def __destroyed(self, *args, **kwargs): + raise AttributeError("Program instance has been destroyed. " + "Create a new instance if you need a new client.") + + def faketime(self, faketime=None): + """ + Set a faketime using libfaketime that will affect the following command calls. + + If faketime is None, faketime settings will be disabled. + """ + cmd = which("faketime") + if cmd is None: + raise unittest.SkipTest("libfaketime/faketime is not installed") + + if self._command[0] == cmd: + self._command = self._command[3:] + + if faketime is not None: + # Use advanced time format + self._command = [cmd, "-f", faketime] + self._command + + @staticmethod + def _purge_folder(folder): + if not os.path.exists(folder): + return + + for filename in os.listdir(folder): + file_path = os.path.join(folder, filename) + try: + if os.path.isfile(file_path) or os.path.islink(file_path): + os.unlink(file_path) + elif os.path.isdir(file_path): + shutil.rmtree(file_path) + except Exception as e: + print("Failed to delete {}. Reason: {}".format(file_path, e)) diff --git a/test/basetest/utils.py b/test/basetest/utils.py new file mode 100644 index 0000000..b1f6928 --- /dev/null +++ b/test/basetest/utils.py @@ -0,0 +1,469 @@ +from __future__ import division + +import atexit +import functools +import json +import os +import signal +import socket +import sys +import tempfile +from queue import Queue, Empty +from subprocess import Popen, PIPE, STDOUT +from threading import Thread +from time import sleep + +from .exceptions import CommandError, TimeoutWaitingFor + +USED_PORTS = set() +ON_POSIX = 'posix' in sys.builtin_module_names + +# Directory relative to basetest module location +CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) + +# Location of binary files (usually the src/ folder) +BIN_PREFIX = os.path.abspath( + os.path.join("usr", "local", "bin") +) + +# Default location of test certificates +DEFAULT_CERT_PATH = os.path.abspath( + os.path.join(CURRENT_DIR, "..", "test_certs") +) + +# Default location of test hooks +DEFAULT_HOOK_PATH = os.path.abspath( + os.path.join(CURRENT_DIR, "..", "test_hooks") +) + +# Default location of test extensions +DEFAULT_EXTENSION_PATH = os.path.abspath( + os.path.join(CURRENT_DIR, "..", "test_extensions") +) + + +# Environment flags to control skipping of timew, task, and taskd tests +TASKW_SKIP = os.environ.get("TASKW_SKIP", False) +TIMEW_SKIP = os.environ.get("TIMEW_SKIP", False) + +# Environment flags to control use of PATH or in-tree binaries +TASK_USE_PATH = os.environ.get("TASK_USE_PATH", False) +TIMEW_USE_PATH = os.environ.get("TIMEW_USE_PATH", False) + + +UUID_REGEXP = ("[0-9A-Fa-f]{8}-" + ("[0-9A-Fa-f]{4}-" * 3) + "[0-9A-Fa-f]{12}") + + +def task_binary_location(cmd="task"): + """If TASK_USE_PATH is set rely on PATH to look for task binaries. + Otherwise ../src/ is used by default. + """ + return binary_location(cmd, TASK_USE_PATH) + + +def timew_binary_location(cmd="timew"): + """ ../src/ is used by default.""" + return binary_location(cmd, TIMEW_USE_PATH) + + +def binary_location(cmd, use_path=False): + """ + If USE_PATH is True rely on PATH to look for taskd binaries. + Otherwise ../src/ is used by default. + """ + if use_path: + return cmd + else: + return os.path.join(BIN_PREFIX, cmd) + + +def wait_condition(cond, timeout=1, sleeptime=.01): + """Wait for condition to return anything other than None""" + # NOTE Increasing sleeptime can dramatically increase testsuite runtime + # It also reduces CPU load significantly + if timeout is None: + timeout = 1 + + if timeout < sleeptime: + print("Warning, timeout cannot be smaller than", sleeptime) + timeout = sleeptime + + # Max number of attempts until giving up + tries = int(timeout / sleeptime) + + for i in range(tries): + val = cond() + + if val is not None: + break + + sleep(sleeptime) + + return val + + +def wait_process(pid, timeout=None): + """Wait for process to finish""" + def process(): + try: + os.kill(pid, 0) + except OSError: + # Process is dead + return True + else: + # Process is still ticking + return None + + return wait_condition(process, timeout) + + +def _queue_output(arguments, pidq, outputq): + """ + Read/Write output/input of given process. + This function is meant to be executed in a thread as it may block + """ + kwargs = arguments["process"] + input = arguments["input"] + + try: + proc = Popen(**kwargs) + except OSError as e: + # pid None is read by the main thread as a crash of the process + pidq.put(None) + + outputq.put(( + "", + ("Unexpected exception caught during execution: '{0}' . ".format(e)), + 255)) # false exitcode + + return + + # Put the PID in the queue for main process to know. + pidq.put(proc.pid) + + # Send input and wait for finish + out, err = proc.communicate(input) + + if sys.version_info > (3,): + out, err = out.decode('utf-8'), err.decode('utf-8') + + # Give the output back to the caller + outputq.put((out, err, proc.returncode)) + + +def _retrieve_output(thread, timeout, queue, thread_error): + """Fetch output from binary subprocess queues""" + # Try to join the thread on failure abort + thread.join(timeout) + if thread.is_alive(): + # Join should have killed the thread. This is unexpected + raise TimeoutWaitingFor(thread_error + ". Unexpected error") + + # Thread died so we should have output + try: + # data = (stdout, stderr, exitcode) + data = queue.get(timeout=timeout) + except Empty: + data = TimeoutWaitingFor("streams from program") + + return data + + +def _get_output(arguments, timeout=None): + """ + Collect output from the subprocess without blocking the main process + if subprocess hangs. + """ + # NOTE Increase this value if tests fail with None being received as + # stdout/stderr instead of the expected content + output_timeout = 0.1 # seconds + + pidq = Queue() + outputq = Queue() + print(arguments) + t = Thread(target=_queue_output, args=(arguments, pidq, outputq)) + t.daemon = True + t.start() + + try: + pid = pidq.get(timeout=timeout) + except Empty: + pid = None + + # Process crashed or timed out for some reason + if pid is None: + return _retrieve_output(t, output_timeout, outputq, "Program to start") + + # Wait for process to finish (normal execution) + state = wait_process(pid, timeout) + + if state: + # Process finished + return _retrieve_output(t, output_timeout, outputq, "Program thread to join") + + # If we reach this point we assume the process got stuck or timed out + for sig in (signal.SIGABRT, signal.SIGTERM, signal.SIGKILL): + # Start with lower signals and escalate if process ignores them + try: + os.kill(pid, signal.SIGABRT) + except OSError as e: + # 3 means the process finished/died between last check and now + if e.errno != 3: + raise + + # Wait for process to finish (should die/exit after signal) + state = wait_process(pid, timeout) + + if state: + # Process finished + return _retrieve_output(t, output_timeout, outputq, "Program to die") + + # This should never happen but in case something goes really bad + raise OSError("Program stopped responding and couldn't be killed") + + +def run_cmd_wait(cmd, input=None, stdout=PIPE, stderr=PIPE, + merge_streams=False, env=os.environ, timeout=None): + """Run a subprocess and wait for it to finish""" + + if input is None: + stdin = None + else: + stdin = PIPE + + if merge_streams: + stderr = STDOUT + else: + stderr = PIPE + + arguments = { + "process": { + "args": cmd, + "stdin": stdin, + "stdout": stdout, + "stderr": stderr, + "close_fds": ON_POSIX, + "env": env, + }, + "input": input, + } + out, err, exit = _get_output(arguments, timeout) + + if merge_streams: + if exit != 0: + raise CommandError(cmd, exit, out) + else: + return exit, out + else: + if exit != 0: + raise CommandError(cmd, exit, out, err) + else: + return exit, out, err + + +def run_cmd_wait_nofail(*args, **kwargs): + """Same as run_cmd_wait but silence the exception if it happens""" + try: + return run_cmd_wait(*args, **kwargs) + except CommandError as e: + return e.code, e.out, e.err + + +def get_IPs(hostname): + output = {} + addrs = socket.getaddrinfo(hostname, 0, 0, 0, socket.IPPROTO_TCP) + + for family, socktype, proto, canonname, sockaddr in addrs: + addr = sockaddr[0] + output[family] = addr + + return output + + +def port_used(addr="localhost", port=None): + "Return True if port is in use, False otherwise" + if port is None: + raise TypeError("Argument 'port' may not be None") + + # If we got an address name, resolve it both to IPv6 and IPv4. + IPs = get_IPs(addr) + + # Taskd seems to prefer IPv6 so we do it first + for family in (socket.AF_INET6, socket.AF_INET): + try: + addr = IPs[family] + except KeyError: + continue + + s = socket.socket(family, socket.SOCK_STREAM) + result = s.connect_ex((addr, port)) + s.close() + if result == 0: + # connection was successful + return True + else: + return False + + +def find_unused_port(addr="localhost", start=53589, track=True): + """Find an unused port starting at `start` port + + If track=False the returned port will not be marked as in-use and the code + will rely entirely on the ability to connect to addr:port as detection + mechanism. Note this may cause problems if ports are assigned but not used + immediately + """ + maxport = 65535 + unused = None + + for port in xrange(start, maxport): + if not port_used(addr, port): + if track and port in USED_PORTS: + continue + + unused = port + break + + if unused is None: + raise ValueError("No available port in the range {0}-{1}".format( + start, maxport)) + + if track: + USED_PORTS.add(unused) + + return unused + + +def release_port(port): + """Forget that given port was marked as'in-use + """ + try: + USED_PORTS.remove(port) + except KeyError: + pass + + +def memoize(obj): + """Keep an in-memory cache of function results given its inputs""" + cache = obj.cache = {} + + @functools.wraps(obj) + def memoizer(*args, **kwargs): + key = str(args) + str(kwargs) + if key not in cache: + cache[key] = obj(*args, **kwargs) + return cache[key] + return memoizer + + +try: + from shutil import which + which = memoize(which) +except ImportError: + # NOTE: This is shutil.which backported from python-3.3.3 + @memoize + def which(cmd, mode=os.F_OK | os.X_OK, path=None): + """Given a command, mode, and a PATH string, return the path which + conforms to the given mode on the PATH, or None if there is no such + file. + + `mode` defaults to os.F_OK | os.X_OK. `path` defaults to the result + of os.environ.get("PATH"), or can be overridden with a custom search + path. + + """ + # Check that a given file can be accessed with the correct mode. + # Additionally check that `file` is not a directory, as on Windows + # directories pass the os.access check. + def _access_check(fn, mode): + return (os.path.exists(fn) and os.access(fn, mode) and + not os.path.isdir(fn)) + + # If we're given a path with a directory part, look it up directly + # rather than referring to PATH directories. This includes checking + # relative to the current directory, e.g. ./script + if os.path.dirname(cmd): + if _access_check(cmd, mode): + return cmd + return None + + if path is None: + path = os.environ.get("PATH", os.defpath) + if not path: + return None + path = path.split(os.pathsep) + + if sys.platform == "win32": + # The current directory takes precedence on Windows. + if os.curdir not in path: + path.insert(0, os.curdir) + + # PATHEXT is necessary to check on Windows. + pathext = os.environ.get("PATHEXT", "").split(os.pathsep) + # See if the given file matches any of the expected path + # extensions. This will allow us to short circuit when given + # "python.exe". If it does match, only test that one, otherwise we + # have to try others. + if any(cmd.lower().endswith(ext.lower()) for ext in pathext): + files = [cmd] + else: + files = [cmd + ext for ext in pathext] + else: + # On other platforms you don't have things like PATHEXT to tell you + # what file suffixes are executable, so just pass on cmd as-is. + files = [cmd] + + seen = set() + for dir in path: + normdir = os.path.normcase(dir) + if normdir not in seen: + seen.add(normdir) + for thefile in files: + name = os.path.join(dir, thefile) + if _access_check(name, mode): + return name + return None + + +def parse_datafile(file): + """Parse .data files, treating files as JSON""" + data = [] + with open(file) as fh: + for line in fh: + line = line.rstrip("\n") + + # Turn [] strings into {} to be treated properly as JSON hashes + if line.startswith('[') and line.endswith(']'): + line = '{' + line[1:-1] + '}' + + if line.startswith("{"): + data.append(json.loads(line)) + else: + data.append(line) + return data + + +def mkstemp(data): + """Create a temporary file that is removed at process exit""" + def rmtemp(name): + try: + os.remove(name) + except OSError: + pass + + f = tempfile.NamedTemporaryFile(delete=False) + f.write(data) + f.close() + + # Ensure removal at end of python session + atexit.register(rmtemp, f.name) + + return f.name + + +def mkstemp_exec(data): + """Create a temporary executable file that is removed at process exit""" + name = mkstemp(data) + os.chmod(name, 0o755) + + return name diff --git a/test/docker/Dockerfile b/test/docker/Dockerfile new file mode 100644 index 0000000..79e8907 --- /dev/null +++ b/test/docker/Dockerfile @@ -0,0 +1,62 @@ +############################################################################### +# +# Copyright 2023, Gothenburg Bit Factory +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included +# in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# +# https://www.opensource.org/licenses/mit-license.php +# +############################################################################### + +ARG TEST_IMAGE + +FROM ${TEST_IMAGE} + +ENV DEBIAN_FRONTEND noninteractive + +RUN apt-get update && yes | unminimize +RUN apt-get install -y python3 python3-venv +RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 10 + +# Create virtual environment +WORKDIR / +RUN python -m venv venv && \ + /venv/bin/pip install --upgrade pip && \ + /venv/bin/pip install pytest simpletap + +# Store diagnostics +RUN ( \ + python --version ; \ + task diagnostics ; \ + timew diagnostics ; \ + /venv/bin/pip freeze ; \ + ) > diagnostics.txt 2>&1 + +# Copy on-modify-hook +ADD . /task-on-modify-hook +WORKDIR /task-on-modify-hook + +# Install on-modify-hook +RUN mkdir -p ~/.task/hooks +RUN cp on_modify.py ~/.task/hooks/on-modify.timewarrior +RUN chmod +x ~/.task/hooks/on-modify.timewarrior + +# Run tests +ENV TASK_USE_PATH=true TIMEW_USE_PATH=true +CMD [ "bash", "-c", "/venv/bin/pytest /task-on-modify-hook/test/test_on-modify_e2e.py"] diff --git a/test/test_on-modify.py b/test/test_on-modify.py deleted file mode 100755 index a6e967f..0000000 --- a/test/test_on-modify.py +++ /dev/null @@ -1,216 +0,0 @@ -#!/usr/bin/env python3 - -############################################################################### -# -# Copyright 2019, Thomas Lauf, Paul Beckingham, Federico Hernandez. -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included -# in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL -# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. -# -# https://www.opensource.org/licenses/mit-license.php -# -############################################################################### - -import os -import subprocess -import sys - -from basetest import Timew, TestCase - -# Ensure python finds the local simpletap module -sys.path.append(os.path.dirname(os.path.abspath(__file__))) - - -class TestOnModifyHookScript(TestCase): - def setUp(self): - current_dir = os.path.dirname(os.path.abspath(__file__)) - self.t = Timew() - - self.process = subprocess.Popen([os.path.join(current_dir, '../on_modify.py')], - env={ - 'PATH': '../src:' + os.environ['PATH'], - 'TIMEWARRIORDB': self.t.datadir - }, - shell=True, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - - def test_hook_should_process_annotate(self): - """on-modify hook should process 'task annotate'""" - self.t("start 10min ago Foo") - - out, err = self.process.communicate(input=b"""\ -{"description":"Foo","entry":"20190820T201911Z","modified":"20190820T201911Z","start":"20190820T201911Z","status":"pending","uuid":"3495a755-c4c6-4106-aabe-c0d3d128b65a"} -{"description":"Foo","entry":"20190820T201911Z","modified":"20190820T201911Z","start":"20190820T201911Z","status":"pending","uuid":"3495a755-c4c6-4106-aabe-c0d3d128b65a","annotations":[{"entry":"20190820T201911Z","description":"Annotation"}]} -""") - - self.assertEqual(bytes(b''), err) - - j = self.t.export() - self.assertEqual(len(j), 1) - self.assertOpenInterval(j[0], expectedTags=["Foo"], expectedAnnotation="Annotation") - - def test_hook_should_process_append(self): - """on-modify hook should process 'task append'""" - self.t("start 10min ago Foo") - - out, err = self.process.communicate(input=b"""\ -{"description":"Foo","entry":"20190820T201911Z","modified":"20190820T201911Z","start":"20190820T201911Z","status":"pending","uuid":"da603270-ce2b-4a5a-9273-c67c2d2d0067"} -{"description":"Foo Bar","entry":"20190820T201911Z","modified":"20190820T201911Z","start":"20190820T201911Z","status":"pending","uuid":"da603270-ce2b-4a5a-9273-c67c2d2d0067"} -""") - - self.assertEqual(bytes(b''), err) - - j = self.t.export() - self.assertEqual(len(j), 1) - self.assertOpenInterval(j[0], expectedTags=["Foo Bar"]) - - def test_hook_should_process_delete(self): - """on-modify hook should process 'task delete'""" - self.t("start 10min ago Foo") - - out, err = self.process.communicate(input=b"""\ -{"description":"Foo","entry":"20190820T201911Z","modified":"20190820T201911Z","start":"20190820T201911Z","status":"pending","uuid":"25b66283-96e0-42b4-b835-8efd0ea1043c"} -{"description":"Foo","end":"20190820T201911Z","entry":"20190820T201911Z","modified":"20190820T201911Z","start":"20190820T201911Z","status":"deleted","uuid":"25b66283-96e0-42b4-b835-8efd0ea1043c"} -""") - - self.assertEqual(bytes(b''), err) - - j = self.t.export() - self.assertEqual(len(j), 1) - self.assertClosedInterval(j[0], expectedTags=["Foo"]) - - def test_hook_should_process_denotate(self): - """on-modify hook should process 'task denotate'""" - self.t("start 10min ago Foo") - self.t("annotate @1 Annotation") - - out, err = self.process.communicate(input=b"""\ -{"description":"Foo","entry":"20190820T201911Z","modified":"20190820T201911Z","start":"20190820T201911Z","status":"pending","uuid":"8811cc93-a495-4fa6-993e-2b96cffc48e0","annotations":[{"entry":"20190820T201911Z","description":"Annotation"}]} -{"description":"Foo","entry":"20190820T201911Z","modified":"20190820T201911Z","start":"20190820T201911Z","status":"pending","uuid":"8811cc93-a495-4fa6-993e-2b96cffc48e0"} -""") - - self.assertEqual(bytes(b''), err) - - j = self.t.export() - self.assertEqual(len(j), 1) - self.assertOpenInterval(j[0], expectedTags=["Foo"], expectedAnnotation="") - - def test_hook_should_process_done(self): - """on-modify hook should process 'task done'""" - self.t("start 10min ago Foo") - - out, err = self.process.communicate(input=b"""\ -{"description":"Foo","entry":"20190820T201912Z","modified":"20190820T201912Z","start":"20190820T201912Z","status":"pending","uuid":"c418b958-5c3c-4633-89a4-4a2f678d74d0"} -{"description":"Foo","end":"20190820T201912Z","entry":"20190820T201912Z","modified":"20190820T201912Z","status":"completed","uuid":"c418b958-5c3c-4633-89a4-4a2f678d74d0"} -""") - - self.assertEqual(b'', err) - - j = self.t.export() - self.assertEqual(len(j), 1) - self.assertClosedInterval(j[0], expectedTags=["Foo"]) - - def test_hook_should_process_modify_desc(self): - """on-modify hook should process 'task modify' for changing description""" - self.t("start 10min ago Foo") - - out, err = self.process.communicate(input=b"""\ -{"description":"Foo","entry":"20190820T203416Z","modified":"20190820T203416Z","start":"20190820T203416Z","status":"pending","uuid":"189e6745-04e0-4b17-949f-900cf63ab8d9"} -{"description":"Bar","entry":"20190820T203416Z","modified":"20190820T203416Z","start":"20190820T203416Z","status":"pending","uuid":"189e6745-04e0-4b17-949f-900cf63ab8d9"} -""") - - self.assertEqual(bytes(b''), err) - - j = self.t.export() - self.assertEqual(len(j), 1) - self.assertOpenInterval(j[0], expectedTags=["Bar"]) - - def test_hook_should_process_modify_tags(self): - """on-modify hook should process 'task modify' for changing tags""" - self.t("start 10min ago Foo Tag Bar") - - out, err = self.process.communicate(input=b"""\ -{"description":"Foo","entry":"20190820T203620Z","modified":"20190820T203620Z","start":"20190820T203620Z","status":"pending","tags":["Tag","Bar"],"uuid":"6cab88f0-ac12-4a87-995a-0e7d39810c05"} -{"description":"Foo","entry":"20190820T203620Z","modified":"20190820T203620Z","start":"20190820T203620Z","status":"pending","tags":["Tag","Baz"],"uuid":"6cab88f0-ac12-4a87-995a-0e7d39810c05"} -""") - - self.assertEqual(bytes(b''), err) - - j = self.t.export() - self.assertEqual(len(j), 1) - self.assertOpenInterval(j[0], expectedTags=["Foo", "Tag", "Baz"]) - - def test_hook_should_process_modify_project(self): - """on-modify hook should process 'task modify' for changing project""" - self.t("start Foo dummy") - - out, err = self.process.communicate(input=b"""\ -{"description":"Foo","entry":"20190820T203842Z","modified":"20190820T203842Z","project":"dummy","start":"20190820T203842Z","status":"pending","uuid":"d95dc7a0-6189-4692-b58a-4ab60d539c8d"} -{"description":"Foo","entry":"20190820T203842Z","modified":"20190820T203842Z","project":"test","start":"20190820T203842Z","status":"pending","uuid":"d95dc7a0-6189-4692-b58a-4ab60d539c8d"} -""") - - self.assertEqual(b'', err) - - j = self.t.export() - self.assertEqual(len(j), 1) - self.assertOpenInterval(j[0], expectedTags=["Foo", "test"]) - - def test_hook_should_process_prepend(self): - """on-modify hook should process 'task prepend'""" - self.t("start 10min ago Foo") - - out, err = self.process.communicate(input=b"""\ -{"description":"Foo","entry":"20190820T203842Z","modified":"20190820T203842Z","start":"20190820T203842Z","status":"pending","uuid":"02bc8839-b304-49f9-ac1a-29ac4850583f"} -{"description":"Prefix Foo","entry":"20190820T203842Z","modified":"20190820T203842Z","start":"20190820T203842Z","status":"pending","uuid":"02bc8839-b304-49f9-ac1a-29ac4850583f"} -""") - - self.assertEqual(bytes(b''), err) - - j = self.t.export() - self.assertEqual(len(j), 1) - self.assertOpenInterval(j[0], expectedTags=["Prefix Foo"]) - - def test_hook_should_process_start(self): - """on-modify hook should process 'task start'""" - out, err = self.process.communicate(input=b"""\ -{"description":"Foo","entry":"20190820T203842Z","modified":"20190820T203842Z","status":"pending","uuid":"16af44c5-57d2-43bf-97ed-cf2e541d927f"} -{"description":"Foo","entry":"20190820T203842Z","modified":"20190820T203842Z","start":"20190820T203842Z","status":"pending","uuid":"16af44c5-57d2-43bf-97ed-cf2e541d927f"} -""") - - self.assertEqual(bytes(b''), err) - - j = self.t.export() - self.assertEqual(len(j), 1) - self.assertOpenInterval(j[0], expectedTags=["Foo"]) - - def test_hook_should_process_stop(self): - """on-modify hook should process 'task stop'""" - self.t("start 10min ago Foo") - - out, err = self.process.communicate(input=b"""\ -{"description":"Foo","entry":"20190820T203842Z","modified":"20190820T203842Z","start":"20190820T203842Z","status":"pending","uuid":"13f83e99-f6a2-4857-9e00-bdeede064772"} -{"description":"Foo","entry":"20190820T203842Z","modified":"20190820T203842Z","status":"pending","uuid":"13f83e99-f6a2-4857-9e00-bdeede064772"} -""") - - self.assertEqual(bytes(b''), err) - - j = self.t.export() - self.assertEqual(len(j), 1) - self.assertClosedInterval(j[0], expectedTags=["Foo"]) diff --git a/test/test_on-modify_e2e.py b/test/test_on-modify_e2e.py new file mode 100755 index 0000000..b1ab752 --- /dev/null +++ b/test/test_on-modify_e2e.py @@ -0,0 +1,203 @@ +#!/usr/bin/env python3 + +############################################################################### +# +# Copyright 2023, Gothenburg Bit Factory +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included +# in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# +# https://www.opensource.org/licenses/mit-license.php +# +############################################################################### + +import os.path + +from basetest import Timew, Task, TestCase + + +class TestOnModifyHookScript(TestCase): + + def setUp(self): + if os.path.exists("/root/.local/share/timewarrior"): + datadir = "/root/.local/share/timewarrior" + configdir = "/root/.config/timewarrior" + else: + datadir = "/root/.timewarrior" + configdir = "/root/.timewarrior" + + self.timew = Timew(datadir=datadir, configdir=configdir) + self.timew.reset(keep_config=True) + + self.task = Task(datadir="/root/.task", taskrc="/root/.taskrc") + self.task.reset(keep_config=True, keep_hooks=True) + + def test_hook_should_process_annotate(self): + """on-modify hook should process 'task annotate'""" + self.task.deactivate_hooks() + self.task("add Foo") + self.task("start 1") + self.timew("start 10min ago Foo") + self.task.activate_hooks() + + self.task("1 annotate Annotation") + + j = self.timew.export() + self.assertEqual(len(j), 1) + self.assertOpenInterval(j[0], expectedTags=["Foo"], expectedAnnotation="Annotation") + + def test_hook_should_process_append(self): + """on-modify hook should process 'task append'""" + self.task.deactivate_hooks() + self.task("add Foo") + self.task("1 start") + self.timew("start 10min ago Foo") + self.task.activate_hooks() + + self.task("1 append Bar") + + j = self.timew.export() + self.assertEqual(len(j), 1) + self.assertOpenInterval(j[0], expectedTags=["Foo Bar"]) + + def test_hook_should_process_delete(self): + """on-modify hook should process 'task delete'""" + self.task.deactivate_hooks() + self.task("add Foo") + self.task("start 1") + self.timew("start 10min ago Foo") + self.task.activate_hooks() + + self.task("rc.confirmation=off delete 1") + + j = self.timew.export() + self.assertEqual(len(j), 1) + self.assertClosedInterval(j[0], expectedTags=["Foo"]) + + def test_hook_should_process_denotate(self): + """on-modify hook should process 'task denotate'""" + self.task.deactivate_hooks() + self.task("add Foo") + self.task("1 start") + self.task("1 annotate Annotation") + self.timew("start 10min ago Foo") + self.timew("annotate @1 Annotation") + self.task.activate_hooks() + + self.task("1 denotate") + + j = self.timew.export() + self.assertEqual(len(j), 1) + self.assertOpenInterval(j[0], expectedTags=["Foo"], expectedAnnotation="") + + def test_hook_should_process_done(self): + """on-modify hook should process 'task done'""" + self.task.deactivate_hooks() + self.task("add Foo") + self.task("1 start") + self.timew("start 10min ago Foo") + self.task.activate_hooks() + + self.task("1 done") + + j = self.timew.export() + self.assertEqual(len(j), 1) + self.assertClosedInterval(j[0], expectedTags=["Foo"]) + + def test_hook_should_process_modify_description(self): + """on-modify hook should process 'task modify' for changing description""" + self.task.deactivate_hooks() + self.task("add Foo") + self.task("1 start") + self.timew("start 10min ago Foo") + self.task.activate_hooks() + + self.task("1 modify /Foo/Bar/") + + j = self.timew.export() + self.assertEqual(len(j), 1) + self.assertOpenInterval(j[0], expectedTags=["Bar"]) + + def test_hook_should_process_modify_tags(self): + """on-modify hook should process 'task modify' for changing tags""" + self.task.deactivate_hooks() + self.task("add Foo") + self.task("1 start") + self.task("1 modify +Bar +Tag") + self.timew("start 10min ago Foo Tag Bar") + self.task.activate_hooks() + + self.task("1 modify -Bar +Baz") + + j = self.timew.export() + self.assertEqual(len(j), 1) + self.assertOpenInterval(j[0], expectedTags=["Foo", "Tag", "Baz"]) + + def test_hook_should_process_modify_project(self): + """on-modify hook should process 'task modify' for changing project""" + self.task.deactivate_hooks() + self.task("add Foo project:dummy") + self.task("1 start") + self.timew("start 10min ago Foo dummy") + self.task.activate_hooks() + + self.task("1 modify project:test") + + j = self.timew.export() + self.assertEqual(len(j), 1) + self.assertOpenInterval(j[0], expectedTags=["Foo", "test"]) + + def test_hook_should_process_prepend(self): + """on-modify hook should process 'task prepend'""" + self.task.deactivate_hooks() + self.task("add Foo") + self.task("1 start") + self.timew("start 10min ago Foo") + self.task.activate_hooks() + + self.task("1 prepend 'Prefix '") + + j = self.timew.export() + self.assertEqual(len(j), 1) + self.assertOpenInterval(j[0], expectedTags=["Prefix Foo"]) + + def test_hook_should_process_start(self): + """on-modify hook should process 'task start'""" + self.task.deactivate_hooks() + self.task("add Foo") + self.task.activate_hooks() + + self.task("1 start") + + j = self.timew.export() + self.assertEqual(len(j), 1) + self.assertOpenInterval(j[0], expectedTags=["Foo"]) + + def test_hook_should_process_stop(self): + """on-modify hook should process 'task stop'""" + self.task.deactivate_hooks() + self.task("add Foo") + self.task("1 start") + self.timew("start 10min ago Foo") + self.task.activate_hooks() + + self.task("1 stop") + + j = self.timew.export() + self.assertEqual(len(j), 1) + self.assertClosedInterval(j[0], expectedTags=["Foo"])