Add E2E tests

This commit is contained in:
Thomas Lauf 2023-01-02 22:02:14 +01:00
parent 487dfed981
commit 29479d064e
13 changed files with 2204 additions and 216 deletions

View file

@ -16,3 +16,48 @@ jobs:
venv/bin/pip install --upgrade pip venv/bin/pip install --upgrade pip
venv/bin/pip install pytest mockito venv/bin/pip install pytest mockito
venv/bin/pytest test/test_on-modify_unit.py venv/bin/pytest test/test_on-modify_unit.py
e2e-tests:
needs: unit-tests
strategy:
fail-fast: false
matrix:
include:
- name: "task [stable] & timew [stable]"
runner: ubuntu-latest
container: task-stable-timew-stable
- name: "task [develop] & timew [stable]"
runner: ubuntu-latest
container: task-develop-timew-stable
- name: "task [stable] & timew [develop]"
runner: ubuntu-latest
container: task-stable-timew-develop
- name: "task [develop] & timew [develop]"
runner: ubuntu-latest
container: task-develop-timew-develop
runs-on: ${{ matrix.runner }}
continue-on-error: ${{ matrix.continue-on-error == true }}
steps:
- uses: actions/checkout@v3
- name : Login to GHCR
uses : docker/login-action@v2.1.0
with :
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build ${{ matrix.name }}
env:
REGISTRY: ghcr.io
OWNER: ${{ github.repository_owner }}
GITHUB_USER: ${{ github.repository_owner }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CONTAINER: ${{ matrix.container }}
run: docker-compose build ${CONTAINER}
- name: Test ${{ matrix.name }}
env:
REGISTRY: ghcr.io
OWNER: ${{ github.repository_owner }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CONTAINER: ${{ matrix.container }}
run: docker-compose run ${CONTAINER}

71
docker-compose.yml Normal file
View file

@ -0,0 +1,71 @@
###############################################################################
#
# Copyright 2023, Gothenburg Bit Factory
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# https://www.opensource.org/licenses/mit-license.php
#
###############################################################################
version: '3'
services:
task-stable-timew-stable:
build:
context: .
dockerfile: test/docker/Dockerfile
args:
TEST_IMAGE: "${REGISTRY}/${OWNER}/task-timew:stable-stable"
network_mode: "host"
security_opt:
- label=type:container_runtime_t
tty: true
task-develop-timew-stable:
build:
context: .
dockerfile: test/docker/Dockerfile
args:
TEST_IMAGE: "${REGISTRY}/${OWNER}/task-timew:develop-stable"
network_mode: "host"
security_opt:
- label=type:container_runtime_t
tty: true
task-stable-timew-develop:
build:
context: .
dockerfile: test/docker/Dockerfile
args:
TEST_IMAGE: "${REGISTRY}/${OWNER}/task-timew:stable-develop"
network_mode: "host"
security_opt:
- label=type:container_runtime_t
tty: true
task-develop-timew-develop:
build:
context: .
dockerfile: test/docker/Dockerfile
args:
TEST_IMAGE: "${REGISTRY}/${OWNER}/task-timew:develop-develop"
network_mode: "host"
security_opt:
- label=type:container_runtime_t
tty: true

View file

@ -0,0 +1,3 @@
from .task import Task
from .testing import TestCase
from .timew import Timew

View file

@ -0,0 +1,52 @@
import signal
sig_names = dict((k, v) for v, k in reversed(sorted(signal.__dict__.items()))
if v.startswith('SIG') and not v.startswith('SIG_'))
class CommandError(Exception):
def __init__(self, cmd, code, out, err=None, msg=None):
DEFAULT = ("Command '{{0}}' was {signal}'ed. "
"SIGABRT usually means program timed out.\n")
if msg is None:
msg_suffix = "\n*** Start STDOUT ***\n{2}\n*** End STDOUT ***\n"
if err is not None:
msg_suffix += (
"\n*** Start STDERR ***\n{3}\n*** End STDERR ***\n"
)
if code < 0:
self.msg = DEFAULT.format(signal=sig_names[abs(code)])
else:
self.msg = ("Command '{0}' finished with unexpected exit "
"code '{1}'.\n")
self.msg += msg_suffix
else:
self.msg = msg
self.cmd = cmd
self.out = out
self.err = err
self.code = code
def __str__(self):
return self.msg.format(self.cmd, self.code, self.out, self.err)
class HookError(Exception):
pass
class TimeoutWaitingFor(object):
def __init__(self, name):
self.name = name
def __repr__(self):
return "*** Timeout reached while waiting for {0} ***".format(
self.name)
class StreamsAreMerged(object):
def __repr__(self):
return "*** Streams are merged, STDERR is not available ***"

517
test/basetest/hooks.py Normal file
View file

@ -0,0 +1,517 @@
# -*- coding: utf-8 -*-
from __future__ import division
import errno
import os
import shutil
import stat
from sys import stderr
try:
import simplejson as json
except ImportError:
import json
from datetime import datetime
from .utils import DEFAULT_HOOK_PATH
from .exceptions import HookError
class InvalidJSON(object):
"""Object representing the original unparsed JSON string and the JSON error
"""
def __init__(self, original, error):
self.original = original
self.error = error
def json_decoder(string):
"""Attempt to decode a JSON string and in case of error return an
InvalidJSON object
"""
decoder = json.JSONDecoder().decode
try:
return decoder(string)
except ValueError as e:
return InvalidJSON(string, str(e))
class Hooks(object):
"""
Abstraction to help interact with hooks (add, remove) during tests and
keep track of which are active.
"""
def __init__(self, datadir):
"""
Initialize hooks container which keeps track of active hooks and
:arg datadir: Temporary location where task is running (/tmp/...)
"""
self.hookdir = os.path.join(datadir, "hooks")
self._hooks = {}
# Check if the hooks dir already exists
if not os.path.isdir(self.hookdir):
os.makedirs(self.hookdir)
def __repr__(self):
enabled = []
disabled = []
for hook in self:
if self[hook].is_active():
enabled.append(hook)
else:
disabled.append(hook)
enabled = ", ".join(enabled) or None
disabled = ", ".join(disabled) or None
return "<Hooks: enabled: {0} | disabled: {1}>".format(enabled,
disabled)
def __getitem__(self, name):
return self._hooks[name]
def __setitem__(self, key, value):
self._hooks[key] = value
def __delitem__(self, key):
del self._hooks[key]
def __iter__(self):
for item in self._hooks:
yield item
def __len__(self):
return len(self._hooks)
def add(self, hookname, content, log=False):
"""
Register hook with name 'hookname' and given file content.
:arg hookname: Should be a string starting with one of:
- on-launch
- on-add
- on-exit
- on-modify
:arg content: Content of the file as a (multi-line) string
:arg log: If we require checking input/output of the hook
"""
if log:
self[hookname] = LoggedHook(hookname, self.hookdir, content)
else:
try:
self[hookname] = Hook(hookname, self.hookdir, content)
except HookError:
return
self[hookname].enable()
def add_default(self, hookname, log=False):
"""
Register a pre-built hook that exists in the folder containing hooks used for testing.
If not explicitly passed hooks folder defaults to DEFAULT_HOOK_PATH
:arg hookname: Name of the default hook
:arg log: If we require checking input/output of the hook
"""
if log:
self[hookname] = LoggedHook(hookname, self.hookdir, default=True)
else:
self[hookname] = Hook(hookname, self.hookdir, default=True)
# Finally enable this hook
self[hookname].enable()
def remove(self, hook):
"""Remove the hook matching given hookname"""
try:
hookname = hook.hookname
except AttributeError:
hookname = hook
hook = self[hookname]
try:
del self[hookname]
except KeyError:
raise HookError("Hook {0} is not on record".format(hookname))
hook._delete()
def clear(self):
"""Remove all existing hooks and empty the hook registry"""
self._hooks = {}
# Remove any existing hooks
try:
shutil.rmtree(self.hookdir)
except OSError as e:
# If the hookdir folder doesn't exist, no harm done and keep going
if e.errno != errno.ENOENT:
raise
os.mkdir(self.hookdir)
class Hook(object):
"""
Represents a hook script and provides methods to enable/disable hooks
"""
def __init__(self, hookname, hookdir, content=None, default=False,
default_hookpath=None):
"""
Initialize and create the hook
This class supports creating hooks in two ways:
* by specifying default=True in which case hookname will be
searched on the hookpath and linked to the destination
* by specifying content="some text" in which case the hook will be
created with given content
:arg hookname: Name of the hook e.g.: on-add.foobar
:arg hookdir: Hooks directory under temporary task/ folder
:arg content: What should be written to the hookfile
:arg default: If True hookname is looked up on default_hookpath
:arg default_hookpath: Default location where to look for preset hooks
"""
self.hookname = hookname
self.hookdir = hookdir
self.hookfile = os.path.join(self.hookdir, self.hookname)
if default_hookpath is None:
self.default_hookpath = DEFAULT_HOOK_PATH
else:
self.default_hookpath = default_hookpath
self._check_hook_type()
self._check_hook_not_exists(self.hookfile)
if not default and content is None:
raise HookError("Cannot create hookfile {0} without content. "
"If using a builtin hook pass default=True"
.format(self.hookname))
if os.path.isfile(self.hookfile):
raise HookError("Hook with name {0} already exists. "
"Did you forget to remove() it before recreating?"
.format(self.hookname))
if default:
self.default_hookfile = os.path.join(self.default_hookpath,
self.hookname)
self._check_hook_exists(self.default_hookfile)
# Symlinks change permission of source file, cannot use one here
shutil.copy(self.default_hookfile, self.hookfile)
else:
self.default_hookfile = None
with open(self.hookfile, 'w') as fh:
fh.write(content)
def __eq__(self, other):
try:
if self.hookname == other.hookname:
return True
except AttributeError:
pass
return False
def __ne__(self, other):
try:
if self.hookname != other.hookname:
return True
except AttributeError:
pass
return False
def __hash__(self):
return self.hookname.__hash__()
def __repr__(self):
return "<Hook '{0}'>".format(self.hookname)
def __str__(self):
return self.hookname
def _check_hook_exists(self, hookfile):
"""Checks if the file pointed to by the current hook exists"""
if not os.path.isfile(hookfile) and not os.path.islink(hookfile):
raise HookError("Hook {0} doesn't exist.".format(hookfile))
def _check_hook_not_exists(self, hookfile):
"""Checks if the file pointed to by the current hook doesn't exist"""
try:
self._check_hook_exists(hookfile)
except HookError:
return
else:
raise HookError("Hook {0} already exists.".format(hookfile))
def _check_hook_type(self):
"""
Check if the hookname is valid and if another hook with the same
name was already created.
"""
for hooktype in ("on-launch", "on-add", "on-exit", "on-modify"):
if self.hookname.startswith(hooktype):
break
else:
stderr.write("WARNING: {0} is not a valid hook type. "
"It will not be triggered\n".format(self.hookname))
def _remove_file(self, file):
try:
os.remove(file)
except OSError as e:
if e.errno == errno.ENOENT:
raise HookError("Hook with name {0} was not found on "
"hooks/ folder".format(file))
else:
raise
def _delete(self):
"""
Remove the hook from disk
Don't call this method directly. Use Hooks.remove(hook) instead
"""
self._remove_hookfile(self.hookfile)
def enable(self):
"""Make hookfile executable to allow triggering"""
os.chmod(self.hookfile, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
def disable(self):
"""Remove hookfile executable bit to deny triggering"""
os.chmod(self.hookfile, stat.S_IREAD | stat.S_IWRITE)
def is_active(self):
"""Check if hook is active by verifying the execute bit"""
return os.access(self.hookfile, os.X_OK)
class LoggedHook(Hook):
"""
A variant of a Hook that allows checking that the hook was called,
what was received via STDIN and what was answered to STDOUT
"""
def __init__(self, *args, **kwargs):
super(LoggedHook, self).__init__(*args, **kwargs)
# The wrapper will replace the hookfile
# The original file will be 'wrappedfile'
# NOTE If the prefix "original_" is changed here, update wrapper.sh
self.wrappedname = "original_" + self.hookname
self.wrappedfile = os.path.join(self.hookdir, self.wrappedname)
self.original_wrapper = os.path.join(self.default_hookpath,
"wrapper.sh")
self.hooklog_in = self.wrappedfile + ".log.in"
self.hooklog_out = self.wrappedfile + ".log.out"
# Cache is used to avoid parsing the logfiles everytime it's needed
self._cache = {}
# Setup wrapper pointing to the correct hook name
self._setup_wrapper()
def __repr__(self):
return "<LoggedHook '{0}'>".format(self.hookname)
def _delete(self):
"""
Remove the hook from disk
Don't call this method directly. Use Task.hooks.remove(hook) instead
"""
super(LoggedHook, self)._delete()
self._remove_file(self.wrappedfile)
self._remove_file(self.hooklog_in)
self._remove_file(self.hooklog_out)
def _setup_wrapper(self):
"""Setup wrapper shell script to allow capturing input/output of hook"""
# Create empty hooklog to allow checking that hook executed
open(self.hooklog_in, 'w').close()
open(self.hooklog_out, 'w').close()
# Rename the original hook to the name that will be used by wrapper
self._check_hook_not_exists(self.wrappedfile)
os.rename(self.hookfile, self.wrappedfile)
# Symlinks change permission of source file, cannot use one here
shutil.copy(self.original_wrapper, self.hookfile)
def _get_log_stat(self):
"""Return the most recent change timestamp and size of both logfiles"""
stdin = os.stat(self.hooklog_in)
stdout = os.stat(self.hooklog_out)
last_change = max((stdin.st_mtime, stdout.st_mtime))
return last_change, stdin.st_size, stdout.st_size
def _use_cache(self):
"""Check if log files were changed since last check"""
try:
last_change = self._cache["last_change"]
except KeyError:
# No cache available
return False
else:
change = self._get_log_stat()
if last_change != change:
# Cache is outdated
return False
else:
# Cache is up to date
return True
def enable(self):
"""Make hookfile executable to allow triggering"""
super(LoggedHook, self).enable()
os.chmod(self.wrappedfile, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
def disable(self):
"""Remove hookfile executable bit to deny triggering"""
super(LoggedHook, self).disable()
os.chmod(self.wrappedfile, stat.S_IREAD | stat.S_IWRITE)
def is_active(self):
"""Check if hook is active by verifying the execute bit"""
parent_is_active = super(LoggedHook, self).disable()
return parent_is_active and os.access(self.wrappedfile, os.X_OK)
def get_logs(self):
"""
Parse the logs generated by the hook and return a dictionary
containing the logs collected with the wrapper in a python friendly format:
* JSON is parsed as python dictionaries
* timestamps are parsed as datetime objects
It should look something like this:
## STDIN file
% Called at 1414874711 with 'arg1 arg2 ...'
{... JSON received by the hook ... }
{... more JSON ...}
## STDOUT file
{... JSON emitted by the hook ... }
Logged messages
{... more JSON ...}
! Exit code: 1
"""
if self._use_cache():
return self._cache["log"]
log = {"calls": [],
"input": {
"json": [],
},
"output": {
"json": [],
"msgs": [],
},
"exitcode": None,
}
with open(self.hooklog_in) as fh:
for i, line in enumerate(fh):
line = line.rstrip("\n")
if line.startswith("%"):
tstamp, args = line.split(" with ")
# Timestamp includes nanosecond resolution
timestamp = tstamp.split(" ")[-1]
# convert timestamp to python datetime object
log["calls"].append({
"timestamp": datetime.fromtimestamp(float(timestamp)),
"args": args,
})
elif line.startswith("{"):
# Decode json input (to hook)
log["input"]["json"].append(json_decoder(line))
else:
raise IOError("Unexpected content on STDIN line {0}: {1}"
.format(i, line))
with open(self.hooklog_out) as fh:
for line in fh:
line = line.rstrip("\n")
if line.startswith("!"):
exitcode = int(line.split(" ")[-1])
log["exitcode"] = exitcode
elif line.startswith("{"):
# Decode json output (from hook)
log["output"]["json"].append(json_decoder(line))
else:
log["output"]["msgs"].append(line)
# NOTE convert all lists to tuples to prevent tampering?
self._cache["log"] = log
# Update last modification timestamp in cache
self._cache["last_change"] = self._get_log_stat()
return self._cache["log"]
def assertTriggeredCount(self, count):
"""
Check if current hook file was triggered/used by taskwarrior and
how many times.
"""
log = self.get_logs()
assert len(log["calls"]) == count, ("{0} calls expected for {1} but "
"found {2}".format(
count,
self.hookname,
log["calls"]
))
def assertExitcode(self, exitcode):
"""Check if current hook finished with the expected exit code"""
log = self.get_logs()
assert log["exitcode"] == exitcode, ("Expected exit code {0} for {1} "
"but found {2}".format(
exitcode,
self.hookname,
log["exitcode"]
))
def assertValidJSONOutput(self):
"""Check if current hook output is valid JSON in all expected replies"""
log = self.get_logs()
for i, out in enumerate(log["output"]["json"]):
assert not isinstance(out, InvalidJSON), ("Invalid JSON found at "
"reply number {0} with "
"content {1}".format(
i + 1,
out.original
))
def assertInvalidJSONOutput(self):
"""Check if current hook output is invalid JSON in any expected reply"""
log = self.get_logs()
for i, out in enumerate(log["output"]["json"]):
assert isinstance(out, InvalidJSON), ("Valid JSON found at reply "
"number {0} with content "
"{1}".format(
i + 1,
out.original
))
# vim: ai sts=4 et sw=4

36
test/basetest/meta.py Normal file
View file

@ -0,0 +1,36 @@
from __future__ import print_function, division
class MetaTest(type):
"""Helper metaclass to simplify dynamic test creation
Creates test_methods in the TestCase class dynamically named after the
arguments used.
"""
@staticmethod
def make_function(classname, *args, **kwargs):
def test(self):
# ### Body of the usual test_testcase ### #
# Override and redefine this method #
pass
# Title of test in report
test.__doc__ = "{0}".format(args[0])
return test
def __new__(meta, classname, bases, dct):
tests = dct.get("TESTS")
kwargs = dct.get("EXTRA", {})
for i, args in enumerate(tests):
func = meta.make_function(classname, *args, **kwargs)
# Rename the function after a unique identifier
# Name of function must start with test_ to be ran by unittest
func.__name__ = "test_{0}".format(i)
# Attach the new test to the testclass
dct[func.__name__] = func
return super(MetaTest, meta).__new__(meta, classname, bases, dct)

335
test/basetest/task.py Normal file
View file

@ -0,0 +1,335 @@
# -*- coding: utf-8 -*-
import atexit
import errno
import json
import os
import shlex
import shutil
import tempfile
import unittest
from .exceptions import CommandError
from .hooks import Hooks
from .utils import run_cmd_wait, run_cmd_wait_nofail, which, task_binary_location
class Task(object):
"""
Manage a task warrior instance
A temporary folder is used as data store of task warrior.
This class can be instantiated multiple times if multiple taskw clients are
needed.
This class can be given a Taskd instance for simplified configuration.
A taskw client should not be used after being destroyed.
"""
DEFAULT_TASK = task_binary_location()
def __init__(self, taskw=DEFAULT_TASK, datadir=tempfile.mkdtemp(prefix="task_"), taskrc=None):
"""
Initialize a Task warrior (client) that can interact with a taskd server.
The task client runs in a temporary folder.
:arg taskw: Task binary to use as client (defaults: task in PATH)
"""
self.taskw = taskw
# Used to specify what command to launch (and to inject faketime)
self._command = [self.taskw]
# Configuration of the isolated environment
self._original_pwd = os.getcwd()
self.datadir = datadir
self.taskrc = os.path.join(self.datadir, "test.rc") if taskrc is None else taskrc
# Ensure any instance is properly destroyed at session end
atexit.register(lambda: self.destroy())
self.reset_env()
with open(self.taskrc, 'w') as rc:
rc.write("data.location={0}\n"
"hooks=off\n"
"".format(self.datadir))
# Hooks disabled until requested
self.hooks = Hooks(self.datadir)
def __repr__(self):
txt = super(Task, self).__repr__()
return "{0} running from {1}>".format(txt[:-1], self.datadir)
def __call__(self, *args, **kwargs):
"""aka t = Task() ; t() which is now an alias to t.runSuccess()"""
return self.runSuccess(*args, **kwargs)
def reset(self, keep_config=False, keep_hooks=False):
"""Reset this instance to its maiden state"""
self._purge_folder(self.datadir)
if keep_hooks is False and self.hooks is not None:
self.hooks.clear()
if keep_config is False:
open(self.taskrc, 'w').close()
pass
def activate_hooks(self):
"""Enable self.hooks functionality and activate hooks on config"""
self.config("hooks", "on")
def deactivate_hooks(self):
"""Enable self.hooks functionality and activate hooks on config"""
self.config("hooks", "off")
def reset_env(self):
"""Set a new environment derived from the one used to launch the test"""
# Copy all env variables to avoid clashing subprocess environments
self.env = os.environ.copy()
# Make sure no TASKDDATA is isolated
self.env["TASKDATA"] = self.datadir
# As well as TASKRC
self.env["TASKRC"] = self.taskrc
def config(self, var, value):
"""Run setup `var` as `value` in task config"""
# Add -- to avoid misinterpretation of - in things like UUIDs
cmd = (self.taskw, "rc.confirmation=off", "config", "--", var, value)
return run_cmd_wait(cmd, env=self.env)
def del_config(self, var):
"""Remove `var` from taskd config"""
cmd = (self.taskw, "config", var)
return run_cmd_wait(cmd, env=self.env)
@property
def taskrc_content(self):
"""Returns the contents of the taskrc file."""
with open(self.taskrc, "r") as f:
return f.readlines()
def export(self, export_filter=None):
"""Run "task export", return JSON array of exported tasks."""
if export_filter is None:
export_filter = ""
code, out, err = self.runSuccess("rc.json.array=1 {0} export"
"".format(export_filter))
return json.loads(out)
def export_one(self, export_filter=None):
"""
Return a dictionary representing the exported task.
Will fail if multiple tasks match the filter.
"""
result = self.export(export_filter=export_filter)
if len(result) != 1:
descriptions = [task.get('description') or '[description-missing]'
for task in result]
raise ValueError(
"One task should match the '{0}' filter, '{1}' "
"matches:\n {2}".format(
export_filter or '',
len(result),
'\n '.join(descriptions)
))
return result[0]
@property
def latest(self):
return self.export_one("+LATEST")
@staticmethod
def _split_string_args_if_string(args):
"""
Helper function to parse and split into arguments a single string argument.
The string is literally the same as if written in the shell.
"""
# Enable nicer-looking calls by allowing plain strings
if isinstance(args, str):
args = shlex.split(args)
return args
def runSuccess(self, args="", input=None, merge_streams=False,
timeout=5):
"""
Invoke task with given arguments and fail if exit code != 0
Use runError if you want exit_code to be tested automatically and
*not* fail if program finishes abnormally.
If you wish to pass instructions to task such as confirmations or other
input via stdin, you can do so by providing a input string.
Such as input="y\ny\n".
If merge_streams=True stdout and stderr will be merged into stdout.
timeout = number of seconds the test will wait for every task call.
Defaults to 1 second if not specified. Unit is seconds.
Returns (exit_code, stdout, stderr) if merge_streams=False
(exit_code, output) if merge_streams=True
"""
# Create a copy of the command
command = self._command[:]
args = self._split_string_args_if_string(args)
command.extend(args)
output = run_cmd_wait_nofail(command, input,
merge_streams=merge_streams,
env=self.env,
timeout=timeout)
if output[0] != 0:
raise CommandError(command, *output)
return output
def runError(self, args=(), input=None, merge_streams=False, timeout=5):
"""
Invoke task with given arguments and fail if exit code == 0
Use runSuccess if you want exit_code to be tested automatically and
*fail* if program finishes abnormally.
If you wish to pass instructions to task such as confirmations or other
input via stdin, you can do so by providing a input string.
Such as input="y\ny\n".
If merge_streams=True stdout and stderr will be merged into stdout.
timeout = number of seconds the test will wait for every task call.
Defaults to 1 second if not specified. Unit is seconds.
Returns (exit_code, stdout, stderr) if merge_streams=False
(exit_code, output) if merge_streams=True
"""
# Create a copy of the command
command = self._command[:]
args = self._split_string_args_if_string(args)
command.extend(args)
output = run_cmd_wait_nofail(command, input,
merge_streams=merge_streams,
env=self.env,
timeout=timeout)
# output[0] is the exit code
if output[0] == 0 or output[0] is None:
raise CommandError(command, *output)
return output
def destroy(self):
"""Cleanup the data folder and release server port for other instances"""
try:
shutil.rmtree(self.datadir)
except OSError as e:
if e.errno == errno.ENOENT:
# Directory no longer exists
pass
else:
raise
# Prevent future reuse of this instance
self.runSuccess = self.__destroyed
self.runError = self.__destroyed
# self.destroy will get called when the python session closes.
# If self.destroy was already called, turn the action into a noop
self.destroy = lambda: None
def __destroyed(self, *args, **kwargs):
raise AttributeError("Task instance has been destroyed. "
"Create a new instance if you need a new client.")
def diag(self, merge_streams_with=None):
"""
Run task diagnostics.
This function may fail in which case the exception text is returned as
stderr or appended to stderr if merge_streams_with is set.
If set, merge_streams_with should have the format:
(exitcode, out, err)
which should be the output of any previous process that failed.
"""
try:
output = self.runSuccess("diag")
except CommandError as e:
# If task diag failed add the error to stderr
output = (e.code, None, str(e))
if merge_streams_with is None:
return output
else:
# Merge any given stdout and stderr with that of "task diag"
code, out, err = merge_streams_with
dcode, dout, derr = output
# Merge stdout
newout = "\n##### Debugging information (task diag): #####\n{0}"
if dout is None:
newout = newout.format("Not available, check STDERR")
else:
newout = newout.format(dout)
if out is not None:
newout = out + newout
# And merge stderr
newerr = "\n##### Debugging information (task diag): #####\n{0}"
if derr is None:
newerr = newerr.format("Not available, check STDOUT")
else:
newerr = newerr.format(derr)
if err is not None:
newerr = err + derr
return code, newout, newerr
def faketime(self, faketime=None):
"""
Set a faketime using libfaketime that will affect the following command calls.
If faketime is None, faketime settings will be disabled.
"""
cmd = which("faketime")
if cmd is None:
raise unittest.SkipTest("libfaketime/faketime is not installed")
if self._command[0] == cmd:
self._command = self._command[3:]
if faketime is not None:
# Use advanced time format
self._command = [cmd, "-f", faketime] + self._command
@staticmethod
def _purge_folder(folder):
if not os.path.exists(folder):
return
for filename in os.listdir(folder):
file_path = os.path.join(folder, filename)
try:
print("Emptying {}".format(file_path))
if os.path.isfile(file_path) or os.path.islink(file_path):
open(file_path, 'w').close()
except Exception as e:
print("Failed to delete {}. Reason: {}".format(file_path, e))

117
test/basetest/testing.py Normal file
View file

@ -0,0 +1,117 @@
import datetime
import sys
import unittest
class BaseTestCase(unittest.TestCase):
def tap(self, out):
sys.stderr.write("--- tap output start ---\n")
for line in out.splitlines():
sys.stderr.write(line + '\n')
sys.stderr.write("--- tap output end ---\n")
class TestCase(BaseTestCase):
def assertOpenInterval(self, interval,
expectedId=None,
expectedStart=None,
expectedTags=None,
expectedAnnotation=None,
description=None):
description = f"interval {interval}" if description is None else description
self.assertKeyExists(interval, "start", description, "{} does not contain a start date")
self.assertKeyNotExists(interval, "end", description, "{} does contain an end date")
return self.assertInterval(interval,
expectedId=expectedId,
expectedStart=expectedStart,
expectedEnd=None,
expectedTags=expectedTags,
expectedAnnotation=expectedAnnotation,
description=description)
def assertClosedInterval(self, interval,
expectedId=None,
expectedStart=None,
expectedEnd=None,
expectedTags=None,
expectedAnnotation=None,
description=None):
description = f"interval {interval}" if description is None else description
self.assertKeyExists(interval, "start", description, "{} does not contain a start date")
self.assertKeyExists(interval, "end", description, "{} does not contain an end date")
return self.assertInterval(interval,
expectedId=expectedId,
expectedStart=expectedStart,
expectedEnd=expectedEnd,
expectedTags=expectedTags,
expectedAnnotation=expectedAnnotation,
description=description)
def assertInterval(self, interval,
expectedId=None,
expectedStart=None,
expectedEnd=None,
expectedTags=None,
expectedAnnotation=None,
description="interval"):
if expectedId is not None:
self.assertKeyExists(interval, "id", description, "{} does not contain an id")
self.assertIntervalValue(interval,
"id",
expectedId,
description,
"{} of {} do not match (expected: '{}', actual: '{}')")
if expectedStart:
self.assertIntervalTimestamp(interval, "start", expectedStart, description)
if expectedEnd:
self.assertIntervalTimestamp(interval, "end", expectedEnd, description)
if expectedTags:
self.assertKeyExists(interval, "tags", description, "{} does not contain tags")
self.assertIntervalValue(interval,
"tags",
expectedTags,
description,
"{} of {} do not match (expected: '{}', actual: '{}')")
if expectedAnnotation:
self.assertKeyExists(interval, "annotation", description, "{} is not annotated")
self.assertIntervalValue(interval,
"annotation",
expectedAnnotation,
description,
"{} of {} do not match (expected: '{}', actual: '{}')")
def assertKeyExists(self, interval, key, description, message):
self.assertTrue(key in interval, message.format(description))
def assertKeyNotExists(self, interval, key, description, message):
self.assertFalse(key in interval, message.format(description))
def assertIntervalTimestamp(self, interval, key, expected, description):
if isinstance(expected, datetime.datetime):
expected = "{:%Y%m%dT%H%M%SZ}".format(expected)
self.assertIntervalValue(interval,
key,
expected,
description,
"{} time of {} does not match (expected: '{}', actual: '{}')")
def assertIntervalValue(self, interval, key, expected, description, message):
actual = interval[key]
if isinstance(actual, list):
actual.sort()
expected.sort()
self.assertSequenceEqual(actual,
expected,
message.format(key, description, expected, actual))
else:
self.assertEqual(actual,
expected,
message.format(key, description, expected, actual))

294
test/basetest/timew.py Normal file
View file

@ -0,0 +1,294 @@
import atexit
import datetime
import json
import os
import shlex
import shutil
import tempfile
import unittest
from .exceptions import CommandError
from .utils import run_cmd_wait, run_cmd_wait_nofail, which, timew_binary_location, DEFAULT_EXTENSION_PATH
class Timew(object):
"""
Manage a Timewarrior instance
A temporary folder is used as data store of timewarrior.
A timew client should not be used after being destroyed.
"""
DEFAULT_TIMEW = timew_binary_location()
def __init__(self,
timew=DEFAULT_TIMEW,
datadir=tempfile.mkdtemp(prefix="timew_"),
configdir=tempfile.mkdtemp(prefix="timew_")):
"""
Initialize a timewarrior (client).
The program runs in a temporary folder.
:arg timew: Timewarrior binary to use as client (defaults: timew in PATH)
"""
self.timew = timew
# Used to specify what command to launch (and to inject faketime)
self._command = [self.timew]
# Configuration of the isolated environment
self._original_pwd = os.getcwd()
self.datadir = datadir
self.configDir = configdir
self.timewrc = os.path.join(self.configDir, 'timewarrior.cfg')
self.extdir = os.path.join(self.datadir, 'extensions')
# Ensure any instance is properly destroyed at session end
atexit.register(lambda: self.destroy())
self.reset_env()
def reset(self, keep_config=False, keep_extensions=False):
"""Reset this instance to its maiden state"""
self._purge_folder(self.datadir)
if keep_extensions is False:
self._purge_folder(self.extdir)
if keep_config is False:
open(self.timewrc, 'w').close()
def add_default_extension(self, filename):
"""Add default extension to current instance"""
if not os.path.isdir(self.extdir):
os.mkdir(self.extdir)
extfile = os.path.join(self.extdir, filename)
if os.path.isfile(extfile):
raise "{} already exists".format(extfile)
shutil.copy(os.path.join(DEFAULT_EXTENSION_PATH, filename), extfile)
def __repr__(self):
txt = super(Timew, self).__repr__()
return "{0} running from {1}>".format(txt[:-1], self.datadir)
def __call__(self, *args, **kwargs):
"""aka t = Timew() ; t() which is now an alias to t.runSuccess()"""
return self.runSuccess(*args, **kwargs)
def reset_env(self):
"""Set a new environment derived from the one used to launch the test"""
# Copy all env variables to avoid clashing subprocess environments
self.env = os.environ.copy()
# As well as TIMEWARRIORDB
self.env["TIMEWARRIORDB"] = self.datadir
# As well as MANPATH, so that the help tests can find the
# uninstalled man pages
parts = self.timew.split(os.path.sep)[0:-2]
parts.append("doc")
self.env["MANPATH"] = os.path.sep.join(parts)
def config(self, var, value):
"""Run setup `var` as `value` in timew config"""
cmd = (self.timew, ":yes", "config", var, value)
return run_cmd_wait(cmd, env=self.env)
@staticmethod
def _create_exclusion_interval(interval):
if not isinstance(interval, tuple):
raise TypeError("Please specify interval(s) as a tuple or a list of tuples")
if interval[0] is not None and not isinstance(interval[0], datetime.time):
raise TypeError("Start date must be a datetime.time but is {}".format(type(interval[0])))
if interval[1] is not None and not isinstance(interval[1], datetime.time):
raise TypeError("End date must be a datetime.time but is {}".format(type(interval[1])))
if interval[0] is None:
return "<{:%H:%M:%S}".format(interval[1])
if interval[1] is None:
return ">{:%H:%M:%S}".format(interval[0])
if interval[0] > interval[1]:
return "<{:%H:%M:%S} >{:%H:%M:%S}".format(interval[1], interval[0])
return "{:%H:%M:%S}-{:%H:%M:%S}".format(interval[0], interval[1])
def configure_exclusions(self, intervals):
if isinstance(intervals, list):
exclusion = " ".join([self._create_exclusion_interval(interval) for interval in intervals])
else:
exclusion = self._create_exclusion_interval(intervals)
self.config("exclusions.monday", exclusion)
self.config("exclusions.tuesday", exclusion)
self.config("exclusions.wednesday", exclusion)
self.config("exclusions.thursday", exclusion)
self.config("exclusions.friday", exclusion)
self.config("exclusions.saturday", exclusion)
self.config("exclusions.sunday", exclusion)
def del_config(self, var):
"""Remove `var` from timew config"""
cmd = (self.timew, ":yes", "config", var)
return run_cmd_wait(cmd, env=self.env)
@property
def timewrc_content(self):
"""Returns the contents of the timewrc file."""
with open(self.timewrc, "r") as f:
return f.readlines()
def export(self, export_filter=None):
"""Run "timew export", return JSON array of exported intervals."""
if export_filter is None:
export_filter = ""
code, out, err = self.runSuccess("{0} export".format(export_filter))
return json.loads(out)
@staticmethod
def _split_string_args_if_string(args):
"""
Helper function to parse and split into arguments a single string argument.
The string is literally the same as if written in the shell.
"""
# Enable nicer-looking calls by allowing plain strings
if isinstance(args, str):
args = shlex.split(args)
return args
def runSuccess(self, args="", input=None, merge_streams=False, timeout=5):
"""
Invoke timew with given arguments and fail if exit code != 0
Use runError if you want exit_code to be tested automatically and
*not* fail if program finishes abnormally.
If you wish to pass instructions to timew such as confirmations or other
input via stdin, you can do so by providing a input string.
Such as input="y\ny\n".
If merge_streams=True stdout and stderr will be merged into stdout.
timeout = number of seconds the test will wait for every timew call.
Defaults to 1 second if not specified. Unit is seconds.
Returns (exit_code, stdout, stderr) if merge_streams=False
(exit_code, output) if merge_streams=True
"""
# Create a copy of the command
command = self._command[:]
args = self._split_string_args_if_string(args)
command.extend(args)
output = run_cmd_wait_nofail(command, input,
merge_streams=merge_streams,
env=self.env,
timeout=timeout)
if output[0] != 0:
raise CommandError(command, *output)
return output
def runError(self, args=(), input=None, merge_streams=False, timeout=5):
"""
Invoke timew with given arguments and fail if exit code == 0
Use runSuccess if you want exit_code to be tested automatically and
*fail* if program finishes abnormally.
If you wish to pass instructions to timew such as confirmations or other
input via stdin, you can do so by providing a input string.
Such as input="y\ny\n".
If merge_streams=True stdout and stderr will be merged into stdout.
timeout = number of seconds the test will wait for every timew call.
Defaults to 1 second if not specified. Unit is seconds.
Returns (exit_code, stdout, stderr) if merge_streams=False
(exit_code, output) if merge_streams=True
"""
# Create a copy of the command
command = self._command[:]
args = self._split_string_args_if_string(args)
command.extend(args)
output = run_cmd_wait_nofail(command, input,
merge_streams=merge_streams,
env=self.env,
timeout=timeout)
# output[0] is the exit code
if output[0] == 0 or output[0] is None:
raise CommandError(command, *output)
return output
def destroy(self):
"""Cleanup the data folder and release server port for other instances"""
try:
shutil.rmtree(self.datadir)
except OSError as e:
if e.errno == 2:
# Directory no longer exists
pass
else:
raise
# Prevent future reuse of this instance
self.runSuccess = self.__destroyed
self.runError = self.__destroyed
# self.destroy will get called when the python session closes.
# If self.destroy was already called, turn the action into a noop
self.destroy = lambda: None
def __destroyed(self, *args, **kwargs):
raise AttributeError("Program instance has been destroyed. "
"Create a new instance if you need a new client.")
def faketime(self, faketime=None):
"""
Set a faketime using libfaketime that will affect the following command calls.
If faketime is None, faketime settings will be disabled.
"""
cmd = which("faketime")
if cmd is None:
raise unittest.SkipTest("libfaketime/faketime is not installed")
if self._command[0] == cmd:
self._command = self._command[3:]
if faketime is not None:
# Use advanced time format
self._command = [cmd, "-f", faketime] + self._command
@staticmethod
def _purge_folder(folder):
if not os.path.exists(folder):
return
for filename in os.listdir(folder):
file_path = os.path.join(folder, filename)
try:
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
except Exception as e:
print("Failed to delete {}. Reason: {}".format(file_path, e))

469
test/basetest/utils.py Normal file
View file

@ -0,0 +1,469 @@
from __future__ import division
import atexit
import functools
import json
import os
import signal
import socket
import sys
import tempfile
from queue import Queue, Empty
from subprocess import Popen, PIPE, STDOUT
from threading import Thread
from time import sleep
from .exceptions import CommandError, TimeoutWaitingFor
USED_PORTS = set()
ON_POSIX = 'posix' in sys.builtin_module_names
# Directory relative to basetest module location
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
# Location of binary files (usually the src/ folder)
BIN_PREFIX = os.path.abspath(
os.path.join("usr", "local", "bin")
)
# Default location of test certificates
DEFAULT_CERT_PATH = os.path.abspath(
os.path.join(CURRENT_DIR, "..", "test_certs")
)
# Default location of test hooks
DEFAULT_HOOK_PATH = os.path.abspath(
os.path.join(CURRENT_DIR, "..", "test_hooks")
)
# Default location of test extensions
DEFAULT_EXTENSION_PATH = os.path.abspath(
os.path.join(CURRENT_DIR, "..", "test_extensions")
)
# Environment flags to control skipping of timew, task, and taskd tests
TASKW_SKIP = os.environ.get("TASKW_SKIP", False)
TIMEW_SKIP = os.environ.get("TIMEW_SKIP", False)
# Environment flags to control use of PATH or in-tree binaries
TASK_USE_PATH = os.environ.get("TASK_USE_PATH", False)
TIMEW_USE_PATH = os.environ.get("TIMEW_USE_PATH", False)
UUID_REGEXP = ("[0-9A-Fa-f]{8}-" + ("[0-9A-Fa-f]{4}-" * 3) + "[0-9A-Fa-f]{12}")
def task_binary_location(cmd="task"):
"""If TASK_USE_PATH is set rely on PATH to look for task binaries.
Otherwise ../src/ is used by default.
"""
return binary_location(cmd, TASK_USE_PATH)
def timew_binary_location(cmd="timew"):
""" ../src/ is used by default."""
return binary_location(cmd, TIMEW_USE_PATH)
def binary_location(cmd, use_path=False):
"""
If USE_PATH is True rely on PATH to look for taskd binaries.
Otherwise ../src/ is used by default.
"""
if use_path:
return cmd
else:
return os.path.join(BIN_PREFIX, cmd)
def wait_condition(cond, timeout=1, sleeptime=.01):
"""Wait for condition to return anything other than None"""
# NOTE Increasing sleeptime can dramatically increase testsuite runtime
# It also reduces CPU load significantly
if timeout is None:
timeout = 1
if timeout < sleeptime:
print("Warning, timeout cannot be smaller than", sleeptime)
timeout = sleeptime
# Max number of attempts until giving up
tries = int(timeout / sleeptime)
for i in range(tries):
val = cond()
if val is not None:
break
sleep(sleeptime)
return val
def wait_process(pid, timeout=None):
"""Wait for process to finish"""
def process():
try:
os.kill(pid, 0)
except OSError:
# Process is dead
return True
else:
# Process is still ticking
return None
return wait_condition(process, timeout)
def _queue_output(arguments, pidq, outputq):
"""
Read/Write output/input of given process.
This function is meant to be executed in a thread as it may block
"""
kwargs = arguments["process"]
input = arguments["input"]
try:
proc = Popen(**kwargs)
except OSError as e:
# pid None is read by the main thread as a crash of the process
pidq.put(None)
outputq.put((
"",
("Unexpected exception caught during execution: '{0}' . ".format(e)),
255)) # false exitcode
return
# Put the PID in the queue for main process to know.
pidq.put(proc.pid)
# Send input and wait for finish
out, err = proc.communicate(input)
if sys.version_info > (3,):
out, err = out.decode('utf-8'), err.decode('utf-8')
# Give the output back to the caller
outputq.put((out, err, proc.returncode))
def _retrieve_output(thread, timeout, queue, thread_error):
"""Fetch output from binary subprocess queues"""
# Try to join the thread on failure abort
thread.join(timeout)
if thread.is_alive():
# Join should have killed the thread. This is unexpected
raise TimeoutWaitingFor(thread_error + ". Unexpected error")
# Thread died so we should have output
try:
# data = (stdout, stderr, exitcode)
data = queue.get(timeout=timeout)
except Empty:
data = TimeoutWaitingFor("streams from program")
return data
def _get_output(arguments, timeout=None):
"""
Collect output from the subprocess without blocking the main process
if subprocess hangs.
"""
# NOTE Increase this value if tests fail with None being received as
# stdout/stderr instead of the expected content
output_timeout = 0.1 # seconds
pidq = Queue()
outputq = Queue()
print(arguments)
t = Thread(target=_queue_output, args=(arguments, pidq, outputq))
t.daemon = True
t.start()
try:
pid = pidq.get(timeout=timeout)
except Empty:
pid = None
# Process crashed or timed out for some reason
if pid is None:
return _retrieve_output(t, output_timeout, outputq, "Program to start")
# Wait for process to finish (normal execution)
state = wait_process(pid, timeout)
if state:
# Process finished
return _retrieve_output(t, output_timeout, outputq, "Program thread to join")
# If we reach this point we assume the process got stuck or timed out
for sig in (signal.SIGABRT, signal.SIGTERM, signal.SIGKILL):
# Start with lower signals and escalate if process ignores them
try:
os.kill(pid, signal.SIGABRT)
except OSError as e:
# 3 means the process finished/died between last check and now
if e.errno != 3:
raise
# Wait for process to finish (should die/exit after signal)
state = wait_process(pid, timeout)
if state:
# Process finished
return _retrieve_output(t, output_timeout, outputq, "Program to die")
# This should never happen but in case something goes really bad
raise OSError("Program stopped responding and couldn't be killed")
def run_cmd_wait(cmd, input=None, stdout=PIPE, stderr=PIPE,
merge_streams=False, env=os.environ, timeout=None):
"""Run a subprocess and wait for it to finish"""
if input is None:
stdin = None
else:
stdin = PIPE
if merge_streams:
stderr = STDOUT
else:
stderr = PIPE
arguments = {
"process": {
"args": cmd,
"stdin": stdin,
"stdout": stdout,
"stderr": stderr,
"close_fds": ON_POSIX,
"env": env,
},
"input": input,
}
out, err, exit = _get_output(arguments, timeout)
if merge_streams:
if exit != 0:
raise CommandError(cmd, exit, out)
else:
return exit, out
else:
if exit != 0:
raise CommandError(cmd, exit, out, err)
else:
return exit, out, err
def run_cmd_wait_nofail(*args, **kwargs):
"""Same as run_cmd_wait but silence the exception if it happens"""
try:
return run_cmd_wait(*args, **kwargs)
except CommandError as e:
return e.code, e.out, e.err
def get_IPs(hostname):
output = {}
addrs = socket.getaddrinfo(hostname, 0, 0, 0, socket.IPPROTO_TCP)
for family, socktype, proto, canonname, sockaddr in addrs:
addr = sockaddr[0]
output[family] = addr
return output
def port_used(addr="localhost", port=None):
"Return True if port is in use, False otherwise"
if port is None:
raise TypeError("Argument 'port' may not be None")
# If we got an address name, resolve it both to IPv6 and IPv4.
IPs = get_IPs(addr)
# Taskd seems to prefer IPv6 so we do it first
for family in (socket.AF_INET6, socket.AF_INET):
try:
addr = IPs[family]
except KeyError:
continue
s = socket.socket(family, socket.SOCK_STREAM)
result = s.connect_ex((addr, port))
s.close()
if result == 0:
# connection was successful
return True
else:
return False
def find_unused_port(addr="localhost", start=53589, track=True):
"""Find an unused port starting at `start` port
If track=False the returned port will not be marked as in-use and the code
will rely entirely on the ability to connect to addr:port as detection
mechanism. Note this may cause problems if ports are assigned but not used
immediately
"""
maxport = 65535
unused = None
for port in xrange(start, maxport):
if not port_used(addr, port):
if track and port in USED_PORTS:
continue
unused = port
break
if unused is None:
raise ValueError("No available port in the range {0}-{1}".format(
start, maxport))
if track:
USED_PORTS.add(unused)
return unused
def release_port(port):
"""Forget that given port was marked as'in-use
"""
try:
USED_PORTS.remove(port)
except KeyError:
pass
def memoize(obj):
"""Keep an in-memory cache of function results given its inputs"""
cache = obj.cache = {}
@functools.wraps(obj)
def memoizer(*args, **kwargs):
key = str(args) + str(kwargs)
if key not in cache:
cache[key] = obj(*args, **kwargs)
return cache[key]
return memoizer
try:
from shutil import which
which = memoize(which)
except ImportError:
# NOTE: This is shutil.which backported from python-3.3.3
@memoize
def which(cmd, mode=os.F_OK | os.X_OK, path=None):
"""Given a command, mode, and a PATH string, return the path which
conforms to the given mode on the PATH, or None if there is no such
file.
`mode` defaults to os.F_OK | os.X_OK. `path` defaults to the result
of os.environ.get("PATH"), or can be overridden with a custom search
path.
"""
# Check that a given file can be accessed with the correct mode.
# Additionally check that `file` is not a directory, as on Windows
# directories pass the os.access check.
def _access_check(fn, mode):
return (os.path.exists(fn) and os.access(fn, mode) and
not os.path.isdir(fn))
# If we're given a path with a directory part, look it up directly
# rather than referring to PATH directories. This includes checking
# relative to the current directory, e.g. ./script
if os.path.dirname(cmd):
if _access_check(cmd, mode):
return cmd
return None
if path is None:
path = os.environ.get("PATH", os.defpath)
if not path:
return None
path = path.split(os.pathsep)
if sys.platform == "win32":
# The current directory takes precedence on Windows.
if os.curdir not in path:
path.insert(0, os.curdir)
# PATHEXT is necessary to check on Windows.
pathext = os.environ.get("PATHEXT", "").split(os.pathsep)
# See if the given file matches any of the expected path
# extensions. This will allow us to short circuit when given
# "python.exe". If it does match, only test that one, otherwise we
# have to try others.
if any(cmd.lower().endswith(ext.lower()) for ext in pathext):
files = [cmd]
else:
files = [cmd + ext for ext in pathext]
else:
# On other platforms you don't have things like PATHEXT to tell you
# what file suffixes are executable, so just pass on cmd as-is.
files = [cmd]
seen = set()
for dir in path:
normdir = os.path.normcase(dir)
if normdir not in seen:
seen.add(normdir)
for thefile in files:
name = os.path.join(dir, thefile)
if _access_check(name, mode):
return name
return None
def parse_datafile(file):
"""Parse .data files, treating files as JSON"""
data = []
with open(file) as fh:
for line in fh:
line = line.rstrip("\n")
# Turn [] strings into {} to be treated properly as JSON hashes
if line.startswith('[') and line.endswith(']'):
line = '{' + line[1:-1] + '}'
if line.startswith("{"):
data.append(json.loads(line))
else:
data.append(line)
return data
def mkstemp(data):
"""Create a temporary file that is removed at process exit"""
def rmtemp(name):
try:
os.remove(name)
except OSError:
pass
f = tempfile.NamedTemporaryFile(delete=False)
f.write(data)
f.close()
# Ensure removal at end of python session
atexit.register(rmtemp, f.name)
return f.name
def mkstemp_exec(data):
"""Create a temporary executable file that is removed at process exit"""
name = mkstemp(data)
os.chmod(name, 0o755)
return name

62
test/docker/Dockerfile Normal file
View file

@ -0,0 +1,62 @@
###############################################################################
#
# Copyright 2023, Gothenburg Bit Factory
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# https://www.opensource.org/licenses/mit-license.php
#
###############################################################################
ARG TEST_IMAGE
FROM ${TEST_IMAGE}
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update && yes | unminimize
RUN apt-get install -y python3 python3-venv
RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 10
# Create virtual environment
WORKDIR /
RUN python -m venv venv && \
/venv/bin/pip install --upgrade pip && \
/venv/bin/pip install pytest simpletap
# Store diagnostics
RUN ( \
python --version ; \
task diagnostics ; \
timew diagnostics ; \
/venv/bin/pip freeze ; \
) > diagnostics.txt 2>&1
# Copy on-modify-hook
ADD . /task-on-modify-hook
WORKDIR /task-on-modify-hook
# Install on-modify-hook
RUN mkdir -p ~/.task/hooks
RUN cp on_modify.py ~/.task/hooks/on-modify.timewarrior
RUN chmod +x ~/.task/hooks/on-modify.timewarrior
# Run tests
ENV TASK_USE_PATH=true TIMEW_USE_PATH=true
CMD [ "bash", "-c", "/venv/bin/pytest /task-on-modify-hook/test/test_on-modify_e2e.py"]

View file

@ -1,216 +0,0 @@
#!/usr/bin/env python3
###############################################################################
#
# Copyright 2019, Thomas Lauf, Paul Beckingham, Federico Hernandez.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# https://www.opensource.org/licenses/mit-license.php
#
###############################################################################
import os
import subprocess
import sys
from basetest import Timew, TestCase
# Ensure python finds the local simpletap module
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
class TestOnModifyHookScript(TestCase):
def setUp(self):
current_dir = os.path.dirname(os.path.abspath(__file__))
self.t = Timew()
self.process = subprocess.Popen([os.path.join(current_dir, '../on_modify.py')],
env={
'PATH': '../src:' + os.environ['PATH'],
'TIMEWARRIORDB': self.t.datadir
},
shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
def test_hook_should_process_annotate(self):
"""on-modify hook should process 'task annotate'"""
self.t("start 10min ago Foo")
out, err = self.process.communicate(input=b"""\
{"description":"Foo","entry":"20190820T201911Z","modified":"20190820T201911Z","start":"20190820T201911Z","status":"pending","uuid":"3495a755-c4c6-4106-aabe-c0d3d128b65a"}
{"description":"Foo","entry":"20190820T201911Z","modified":"20190820T201911Z","start":"20190820T201911Z","status":"pending","uuid":"3495a755-c4c6-4106-aabe-c0d3d128b65a","annotations":[{"entry":"20190820T201911Z","description":"Annotation"}]}
""")
self.assertEqual(bytes(b''), err)
j = self.t.export()
self.assertEqual(len(j), 1)
self.assertOpenInterval(j[0], expectedTags=["Foo"], expectedAnnotation="Annotation")
def test_hook_should_process_append(self):
"""on-modify hook should process 'task append'"""
self.t("start 10min ago Foo")
out, err = self.process.communicate(input=b"""\
{"description":"Foo","entry":"20190820T201911Z","modified":"20190820T201911Z","start":"20190820T201911Z","status":"pending","uuid":"da603270-ce2b-4a5a-9273-c67c2d2d0067"}
{"description":"Foo Bar","entry":"20190820T201911Z","modified":"20190820T201911Z","start":"20190820T201911Z","status":"pending","uuid":"da603270-ce2b-4a5a-9273-c67c2d2d0067"}
""")
self.assertEqual(bytes(b''), err)
j = self.t.export()
self.assertEqual(len(j), 1)
self.assertOpenInterval(j[0], expectedTags=["Foo Bar"])
def test_hook_should_process_delete(self):
"""on-modify hook should process 'task delete'"""
self.t("start 10min ago Foo")
out, err = self.process.communicate(input=b"""\
{"description":"Foo","entry":"20190820T201911Z","modified":"20190820T201911Z","start":"20190820T201911Z","status":"pending","uuid":"25b66283-96e0-42b4-b835-8efd0ea1043c"}
{"description":"Foo","end":"20190820T201911Z","entry":"20190820T201911Z","modified":"20190820T201911Z","start":"20190820T201911Z","status":"deleted","uuid":"25b66283-96e0-42b4-b835-8efd0ea1043c"}
""")
self.assertEqual(bytes(b''), err)
j = self.t.export()
self.assertEqual(len(j), 1)
self.assertClosedInterval(j[0], expectedTags=["Foo"])
def test_hook_should_process_denotate(self):
"""on-modify hook should process 'task denotate'"""
self.t("start 10min ago Foo")
self.t("annotate @1 Annotation")
out, err = self.process.communicate(input=b"""\
{"description":"Foo","entry":"20190820T201911Z","modified":"20190820T201911Z","start":"20190820T201911Z","status":"pending","uuid":"8811cc93-a495-4fa6-993e-2b96cffc48e0","annotations":[{"entry":"20190820T201911Z","description":"Annotation"}]}
{"description":"Foo","entry":"20190820T201911Z","modified":"20190820T201911Z","start":"20190820T201911Z","status":"pending","uuid":"8811cc93-a495-4fa6-993e-2b96cffc48e0"}
""")
self.assertEqual(bytes(b''), err)
j = self.t.export()
self.assertEqual(len(j), 1)
self.assertOpenInterval(j[0], expectedTags=["Foo"], expectedAnnotation="")
def test_hook_should_process_done(self):
"""on-modify hook should process 'task done'"""
self.t("start 10min ago Foo")
out, err = self.process.communicate(input=b"""\
{"description":"Foo","entry":"20190820T201912Z","modified":"20190820T201912Z","start":"20190820T201912Z","status":"pending","uuid":"c418b958-5c3c-4633-89a4-4a2f678d74d0"}
{"description":"Foo","end":"20190820T201912Z","entry":"20190820T201912Z","modified":"20190820T201912Z","status":"completed","uuid":"c418b958-5c3c-4633-89a4-4a2f678d74d0"}
""")
self.assertEqual(b'', err)
j = self.t.export()
self.assertEqual(len(j), 1)
self.assertClosedInterval(j[0], expectedTags=["Foo"])
def test_hook_should_process_modify_desc(self):
"""on-modify hook should process 'task modify' for changing description"""
self.t("start 10min ago Foo")
out, err = self.process.communicate(input=b"""\
{"description":"Foo","entry":"20190820T203416Z","modified":"20190820T203416Z","start":"20190820T203416Z","status":"pending","uuid":"189e6745-04e0-4b17-949f-900cf63ab8d9"}
{"description":"Bar","entry":"20190820T203416Z","modified":"20190820T203416Z","start":"20190820T203416Z","status":"pending","uuid":"189e6745-04e0-4b17-949f-900cf63ab8d9"}
""")
self.assertEqual(bytes(b''), err)
j = self.t.export()
self.assertEqual(len(j), 1)
self.assertOpenInterval(j[0], expectedTags=["Bar"])
def test_hook_should_process_modify_tags(self):
"""on-modify hook should process 'task modify' for changing tags"""
self.t("start 10min ago Foo Tag Bar")
out, err = self.process.communicate(input=b"""\
{"description":"Foo","entry":"20190820T203620Z","modified":"20190820T203620Z","start":"20190820T203620Z","status":"pending","tags":["Tag","Bar"],"uuid":"6cab88f0-ac12-4a87-995a-0e7d39810c05"}
{"description":"Foo","entry":"20190820T203620Z","modified":"20190820T203620Z","start":"20190820T203620Z","status":"pending","tags":["Tag","Baz"],"uuid":"6cab88f0-ac12-4a87-995a-0e7d39810c05"}
""")
self.assertEqual(bytes(b''), err)
j = self.t.export()
self.assertEqual(len(j), 1)
self.assertOpenInterval(j[0], expectedTags=["Foo", "Tag", "Baz"])
def test_hook_should_process_modify_project(self):
"""on-modify hook should process 'task modify' for changing project"""
self.t("start Foo dummy")
out, err = self.process.communicate(input=b"""\
{"description":"Foo","entry":"20190820T203842Z","modified":"20190820T203842Z","project":"dummy","start":"20190820T203842Z","status":"pending","uuid":"d95dc7a0-6189-4692-b58a-4ab60d539c8d"}
{"description":"Foo","entry":"20190820T203842Z","modified":"20190820T203842Z","project":"test","start":"20190820T203842Z","status":"pending","uuid":"d95dc7a0-6189-4692-b58a-4ab60d539c8d"}
""")
self.assertEqual(b'', err)
j = self.t.export()
self.assertEqual(len(j), 1)
self.assertOpenInterval(j[0], expectedTags=["Foo", "test"])
def test_hook_should_process_prepend(self):
"""on-modify hook should process 'task prepend'"""
self.t("start 10min ago Foo")
out, err = self.process.communicate(input=b"""\
{"description":"Foo","entry":"20190820T203842Z","modified":"20190820T203842Z","start":"20190820T203842Z","status":"pending","uuid":"02bc8839-b304-49f9-ac1a-29ac4850583f"}
{"description":"Prefix Foo","entry":"20190820T203842Z","modified":"20190820T203842Z","start":"20190820T203842Z","status":"pending","uuid":"02bc8839-b304-49f9-ac1a-29ac4850583f"}
""")
self.assertEqual(bytes(b''), err)
j = self.t.export()
self.assertEqual(len(j), 1)
self.assertOpenInterval(j[0], expectedTags=["Prefix Foo"])
def test_hook_should_process_start(self):
"""on-modify hook should process 'task start'"""
out, err = self.process.communicate(input=b"""\
{"description":"Foo","entry":"20190820T203842Z","modified":"20190820T203842Z","status":"pending","uuid":"16af44c5-57d2-43bf-97ed-cf2e541d927f"}
{"description":"Foo","entry":"20190820T203842Z","modified":"20190820T203842Z","start":"20190820T203842Z","status":"pending","uuid":"16af44c5-57d2-43bf-97ed-cf2e541d927f"}
""")
self.assertEqual(bytes(b''), err)
j = self.t.export()
self.assertEqual(len(j), 1)
self.assertOpenInterval(j[0], expectedTags=["Foo"])
def test_hook_should_process_stop(self):
"""on-modify hook should process 'task stop'"""
self.t("start 10min ago Foo")
out, err = self.process.communicate(input=b"""\
{"description":"Foo","entry":"20190820T203842Z","modified":"20190820T203842Z","start":"20190820T203842Z","status":"pending","uuid":"13f83e99-f6a2-4857-9e00-bdeede064772"}
{"description":"Foo","entry":"20190820T203842Z","modified":"20190820T203842Z","status":"pending","uuid":"13f83e99-f6a2-4857-9e00-bdeede064772"}
""")
self.assertEqual(bytes(b''), err)
j = self.t.export()
self.assertEqual(len(j), 1)
self.assertClosedInterval(j[0], expectedTags=["Foo"])

203
test/test_on-modify_e2e.py Executable file
View file

@ -0,0 +1,203 @@
#!/usr/bin/env python3
###############################################################################
#
# Copyright 2023, Gothenburg Bit Factory
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# https://www.opensource.org/licenses/mit-license.php
#
###############################################################################
import os.path
from basetest import Timew, Task, TestCase
class TestOnModifyHookScript(TestCase):
def setUp(self):
if os.path.exists("/root/.local/share/timewarrior"):
datadir = "/root/.local/share/timewarrior"
configdir = "/root/.config/timewarrior"
else:
datadir = "/root/.timewarrior"
configdir = "/root/.timewarrior"
self.timew = Timew(datadir=datadir, configdir=configdir)
self.timew.reset(keep_config=True)
self.task = Task(datadir="/root/.task", taskrc="/root/.taskrc")
self.task.reset(keep_config=True, keep_hooks=True)
def test_hook_should_process_annotate(self):
"""on-modify hook should process 'task annotate'"""
self.task.deactivate_hooks()
self.task("add Foo")
self.task("start 1")
self.timew("start 10min ago Foo")
self.task.activate_hooks()
self.task("1 annotate Annotation")
j = self.timew.export()
self.assertEqual(len(j), 1)
self.assertOpenInterval(j[0], expectedTags=["Foo"], expectedAnnotation="Annotation")
def test_hook_should_process_append(self):
"""on-modify hook should process 'task append'"""
self.task.deactivate_hooks()
self.task("add Foo")
self.task("1 start")
self.timew("start 10min ago Foo")
self.task.activate_hooks()
self.task("1 append Bar")
j = self.timew.export()
self.assertEqual(len(j), 1)
self.assertOpenInterval(j[0], expectedTags=["Foo Bar"])
def test_hook_should_process_delete(self):
"""on-modify hook should process 'task delete'"""
self.task.deactivate_hooks()
self.task("add Foo")
self.task("start 1")
self.timew("start 10min ago Foo")
self.task.activate_hooks()
self.task("rc.confirmation=off delete 1")
j = self.timew.export()
self.assertEqual(len(j), 1)
self.assertClosedInterval(j[0], expectedTags=["Foo"])
def test_hook_should_process_denotate(self):
"""on-modify hook should process 'task denotate'"""
self.task.deactivate_hooks()
self.task("add Foo")
self.task("1 start")
self.task("1 annotate Annotation")
self.timew("start 10min ago Foo")
self.timew("annotate @1 Annotation")
self.task.activate_hooks()
self.task("1 denotate")
j = self.timew.export()
self.assertEqual(len(j), 1)
self.assertOpenInterval(j[0], expectedTags=["Foo"], expectedAnnotation="")
def test_hook_should_process_done(self):
"""on-modify hook should process 'task done'"""
self.task.deactivate_hooks()
self.task("add Foo")
self.task("1 start")
self.timew("start 10min ago Foo")
self.task.activate_hooks()
self.task("1 done")
j = self.timew.export()
self.assertEqual(len(j), 1)
self.assertClosedInterval(j[0], expectedTags=["Foo"])
def test_hook_should_process_modify_description(self):
"""on-modify hook should process 'task modify' for changing description"""
self.task.deactivate_hooks()
self.task("add Foo")
self.task("1 start")
self.timew("start 10min ago Foo")
self.task.activate_hooks()
self.task("1 modify /Foo/Bar/")
j = self.timew.export()
self.assertEqual(len(j), 1)
self.assertOpenInterval(j[0], expectedTags=["Bar"])
def test_hook_should_process_modify_tags(self):
"""on-modify hook should process 'task modify' for changing tags"""
self.task.deactivate_hooks()
self.task("add Foo")
self.task("1 start")
self.task("1 modify +Bar +Tag")
self.timew("start 10min ago Foo Tag Bar")
self.task.activate_hooks()
self.task("1 modify -Bar +Baz")
j = self.timew.export()
self.assertEqual(len(j), 1)
self.assertOpenInterval(j[0], expectedTags=["Foo", "Tag", "Baz"])
def test_hook_should_process_modify_project(self):
"""on-modify hook should process 'task modify' for changing project"""
self.task.deactivate_hooks()
self.task("add Foo project:dummy")
self.task("1 start")
self.timew("start 10min ago Foo dummy")
self.task.activate_hooks()
self.task("1 modify project:test")
j = self.timew.export()
self.assertEqual(len(j), 1)
self.assertOpenInterval(j[0], expectedTags=["Foo", "test"])
def test_hook_should_process_prepend(self):
"""on-modify hook should process 'task prepend'"""
self.task.deactivate_hooks()
self.task("add Foo")
self.task("1 start")
self.timew("start 10min ago Foo")
self.task.activate_hooks()
self.task("1 prepend 'Prefix '")
j = self.timew.export()
self.assertEqual(len(j), 1)
self.assertOpenInterval(j[0], expectedTags=["Prefix Foo"])
def test_hook_should_process_start(self):
"""on-modify hook should process 'task start'"""
self.task.deactivate_hooks()
self.task("add Foo")
self.task.activate_hooks()
self.task("1 start")
j = self.timew.export()
self.assertEqual(len(j), 1)
self.assertOpenInterval(j[0], expectedTags=["Foo"])
def test_hook_should_process_stop(self):
"""on-modify hook should process 'task stop'"""
self.task.deactivate_hooks()
self.task("add Foo")
self.task("1 start")
self.timew("start 10min ago Foo")
self.task.activate_hooks()
self.task("1 stop")
j = self.timew.export()
self.assertEqual(len(j), 1)
self.assertClosedInterval(j[0], expectedTags=["Foo"])