timewarrior/test/basetest/timew.py
Thomas Lauf 082708a7ca Fix typos and wording
Signed-off-by: Thomas Lauf <thomas.lauf@tngtech.com>
2023-04-04 22:44:44 +02:00

264 lines
9.2 KiB
Python

import atexit
import datetime
import json
import os
import shlex
import shutil
import tempfile
import unittest
from .exceptions import CommandError
from .utils import run_cmd_wait, run_cmd_wait_nofail, which, timew_binary_location, DEFAULT_EXTENSION_PATH
class Timew(object):
"""
Manage a Timewarrior instance
A temporary folder is used as data store of timewarrior.
A timew client should not be used after being destroyed.
"""
DEFAULT_TIMEW = timew_binary_location()
def __init__(self, timew=DEFAULT_TIMEW):
"""
Initialize a timewarrior (client).
The program runs in a temporary folder.
:arg timew: Timewarrior binary to use as client (defaults: timew in PATH)
"""
self.timew = timew
# Used to specify what command to launch (and to inject faketime)
self._command = [self.timew]
# Configuration of the isolated environment
self._original_pwd = os.getcwd()
self.datadir = tempfile.mkdtemp(prefix="timew_")
self.timewrc = os.path.join(self.datadir, "timewarrior.cfg")
self.extdir = os.path.join(self.datadir, "extensions")
# Ensure any instance is properly destroyed at session end
atexit.register(lambda: self.destroy())
self.reset_env()
def add_default_extension(self, filename):
"""Add default extension to current instance"""
if not os.path.isdir(self.extdir):
os.mkdir(self.extdir)
extfile = os.path.join(self.extdir, filename)
if os.path.isfile(extfile):
raise "{} already exists".format(extfile)
shutil.copy(os.path.join(DEFAULT_EXTENSION_PATH, filename), extfile)
def __repr__(self):
txt = super(Timew, self).__repr__()
return "{0} running from {1}>".format(txt[:-1], self.datadir)
def __call__(self, *args, **kwargs):
"""aka t = Timew() ; t() which is now an alias to t.runSuccess()"""
return self.runSuccess(*args, **kwargs)
def reset_env(self):
"""Set a new environment derived from the one used to launch the test"""
# Copy all env variables to avoid clashing subprocess environments
self.env = os.environ.copy()
# As well as TIMEWARRIORDB
self.env["TIMEWARRIORDB"] = self.datadir
# As well as MANPATH, so that the help tests can find the
# uninstalled man pages
parts = self.timew.split(os.path.sep)[0:-2]
parts.append("doc")
self.env["MANPATH"] = os.path.sep.join(parts)
def config(self, var, value):
"""Run setup `var` as `value` in timew config"""
cmd = (self.timew, ":yes", "config", var, value)
return run_cmd_wait(cmd, env=self.env)
@staticmethod
def _create_exclusion_interval(interval):
if not isinstance(interval, tuple):
raise TypeError("Please specify interval(s) as a tuple or a list of tuples")
if interval[0] is not None and not isinstance(interval[0], datetime.time):
raise TypeError("Start date must be a datetime.time but is {}".format(type(interval[0])))
if interval[1] is not None and not isinstance(interval[1], datetime.time):
raise TypeError("End date must be a datetime.time but is {}".format(type(interval[1])))
if interval[0] is None:
return "<{:%H:%M:%S}".format(interval[1])
if interval[1] is None:
return ">{:%H:%M:%S}".format(interval[0])
if interval[0] > interval[1]:
return "<{:%H:%M:%S} >{:%H:%M:%S}".format(interval[1], interval[0])
return "{:%H:%M:%S}-{:%H:%M:%S}".format(interval[0], interval[1])
def configure_exclusions(self, intervals):
if isinstance(intervals, list):
exclusion = " ".join([self._create_exclusion_interval(interval) for interval in intervals])
else:
exclusion = self._create_exclusion_interval(intervals)
self.config("exclusions.monday", exclusion)
self.config("exclusions.tuesday", exclusion)
self.config("exclusions.wednesday", exclusion)
self.config("exclusions.thursday", exclusion)
self.config("exclusions.friday", exclusion)
self.config("exclusions.saturday", exclusion)
self.config("exclusions.sunday", exclusion)
def del_config(self, var):
"""Remove `var` from timew config"""
cmd = (self.timew, ":yes", "config", var)
return run_cmd_wait(cmd, env=self.env)
@property
def timewrc_content(self):
"""Returns the contents of the timewrc file."""
with open(self.timewrc, "r") as f:
return f.readlines()
def export(self, export_filter=None):
"""Run "timew export", return JSON array of exported intervals."""
if export_filter is None:
export_filter = ""
code, out, err = self.runSuccess("{0} export".format(export_filter))
return json.loads(out)
@staticmethod
def _split_string_args_if_string(args):
"""
Helper function to parse and split into arguments a single string argument.
The string is literally the same as if written in the shell.
"""
# Enable nicer-looking calls by allowing plain strings
if isinstance(args, str):
args = shlex.split(args)
return args
def runSuccess(self, args="", input=None, merge_streams=False, timeout=5):
"""
Invoke timew with given arguments and fail if exit code != 0
Use runError if you want exit_code to be tested automatically and
*not* fail if program finishes abnormally.
If you wish to pass instructions to timew such as confirmations or other
input via stdin, you can do so by providing an input string.
Such as input="y\ny\n".
If merge_streams=True stdout and stderr will be merged into stdout.
timeout = number of seconds the test will wait for every timew call.
Defaults to 1 second if not specified. Unit is seconds.
Returns (exit_code, stdout, stderr) if merge_streams=False
(exit_code, output) if merge_streams=True
"""
# Create a copy of the command
command = self._command[:]
args = self._split_string_args_if_string(args)
command.extend(args)
output = run_cmd_wait_nofail(command, input,
merge_streams=merge_streams,
env=self.env,
timeout=timeout)
if output[0] != 0:
raise CommandError(command, *output)
return output
def runError(self, args=(), input=None, merge_streams=False, timeout=5):
"""
Invoke timew with given arguments and fail if exit code == 0
Use runSuccess if you want exit_code to be tested automatically and
*fail* if program finishes abnormally.
If you wish to pass instructions to timew such as confirmations or other
input via stdin, you can do so by providing an input string.
Such as input="y\ny\n".
If merge_streams=True stdout and stderr will be merged into stdout.
timeout = number of seconds the test will wait for every timew call.
Defaults to 1 second if not specified. Unit is seconds.
Returns (exit_code, stdout, stderr) if merge_streams=False
(exit_code, output) if merge_streams=True
"""
# Create a copy of the command
command = self._command[:]
args = self._split_string_args_if_string(args)
command.extend(args)
output = run_cmd_wait_nofail(command, input,
merge_streams=merge_streams,
env=self.env,
timeout=timeout)
# output[0] is the exit code
if output[0] == 0 or output[0] is None:
raise CommandError(command, *output)
return output
def destroy(self):
"""Cleanup the data folder and release server port for other instances"""
try:
shutil.rmtree(self.datadir)
except OSError as e:
if e.errno == 2:
# Directory no longer exists
pass
else:
raise
# Prevent future reuse of this instance
self.runSuccess = self.__destroyed
self.runError = self.__destroyed
# self.destroy will get called when the python session closes.
# If self.destroy was already called, turn the action into a noop
self.destroy = lambda: None
def __destroyed(self, *args, **kwargs):
raise AttributeError("Program instance has been destroyed. "
"Create a new instance if you need a new client.")
def faketime(self, faketime=None):
"""
Set a faketime using libfaketime that will affect the following command calls.
If faketime is None, faketime settings will be disabled.
"""
cmd = which("faketime")
if cmd is None:
raise unittest.SkipTest("libfaketime/faketime is not installed")
if self._command[0] == cmd:
self._command = self._command[3:]
if faketime is not None:
# Use advanced time format
self._command = [cmd, "-f", faketime] + self._command