taskshell/test/basetest/tasksh.py
2016-09-02 21:39:02 -04:00

192 lines
6.5 KiB
Python

# -*- coding: utf-8 -*-
import atexit
import json
import os
import shlex
import shutil
import tempfile
import unittest
from .exceptions import CommandError
from .utils import run_cmd_wait, run_cmd_wait_nofail, which, tasksh_binary_location, DEFAULT_EXTENSION_PATH
from .compat import STRING_TYPE
class Tasksh(object):
"""Manage a Tasksh instance
A temporary folder is used as data store of tasksh.
A tasksh client should not be used after being destroyed.
"""
DEFAULT_TASKSH = tasksh_binary_location()
def __init__(self, tasksh=DEFAULT_TASKSH):
"""Initialize a tasksh (client).
The program runs in a temporary folder.
:arg tasksh: Tasksh binary to use as client (defaults: tasksh in PATH)
"""
self.tasksh = tasksh
# Used to specify what command to launch (and to inject faketime)
self._command = [self.tasksh]
# Configuration of the isolated environment
self._original_pwd = os.getcwd()
self.datadir = tempfile.mkdtemp(prefix="tasksh_")
# Ensure any instance is properly destroyed at session end
atexit.register(lambda: self.destroy())
self.reset_env()
def add_default_extension(self, filename):
"""Add default extension to current instance
"""
if not os.path.isdir(self.extdir):
os.mkdir(self.extdir)
extfile = os.path.join(self.extdir, filename)
if os.path.isfile(extfile):
raise "{} already exists".format(extfile)
shutil.copy(os.path.join(DEFAULT_EXTENSION_PATH, filename), extfile)
def __repr__(self):
txt = super(Tasksh, self).__repr__()
return "{0} running from {1}>".format(txt[:-1], self.datadir)
def __call__(self, *args, **kwargs):
"aka t = Tasksh() ; t() which is now an alias to t.runSuccess()"
return self.runSuccess(*args, **kwargs)
def reset_env(self):
"""Set a new environment derived from the one used to launch the test
"""
# Copy all env variables to avoid clashing subprocess environments
self.env = os.environ.copy()
@staticmethod
def _split_string_args_if_string(args):
"""Helper function to parse and split into arguments a single string
argument. The string is literally the same as if written in the shell.
"""
# Enable nicer-looking calls by allowing plain strings
if isinstance(args, STRING_TYPE):
args = shlex.split(args)
return args
def runSuccess(self, args="", input=None, merge_streams=False,
timeout=5):
"""Invoke tasksh with given arguments and fail if exit code != 0
Use runError if you want exit_code to be tested automatically and
*not* fail if program finishes abnormally.
If you wish to pass instructions to tasksh such as confirmations or other
input via stdin, you can do so by providing a input string.
Such as input="y\ny\n".
If merge_streams=True stdout and stderr will be merged into stdout.
timeout = number of seconds the test will wait for every tasksh call.
Defaults to 1 second if not specified. Unit is seconds.
Returns (exit_code, stdout, stderr) if merge_streams=False
(exit_code, output) if merge_streams=True
"""
# Create a copy of the command
command = self._command[:]
args = self._split_string_args_if_string(args)
command.extend(args)
output = run_cmd_wait_nofail(command, input,
merge_streams=merge_streams,
env=self.env,
timeout=timeout)
if output[0] != 0:
raise CommandError(command, *output)
return output
def runError(self, args=(), input=None, merge_streams=False, timeout=5):
"""Invoke tasksh with given arguments and fail if exit code == 0
Use runSuccess if you want exit_code to be tested automatically and
*fail* if program finishes abnormally.
If you wish to pass instructions to tasksh such as confirmations or other
input via stdin, you can do so by providing a input string.
Such as input="y\ny\n".
If merge_streams=True stdout and stderr will be merged into stdout.
timeout = number of seconds the test will wait for every tasksh call.
Defaults to 1 second if not specified. Unit is seconds.
Returns (exit_code, stdout, stderr) if merge_streams=False
(exit_code, output) if merge_streams=True
"""
# Create a copy of the command
command = self._command[:]
args = self._split_string_args_if_string(args)
command.extend(args)
output = run_cmd_wait_nofail(command, input,
merge_streams=merge_streams,
env=self.env,
timeout=timeout)
# output[0] is the exit code
if output[0] == 0 or output[0] is None:
raise CommandError(command, *output)
return output
def destroy(self):
"""Cleanup the data folder and release server port for other instances
"""
try:
shutil.rmtree(self.datadir)
except OSError as e:
if e.errno == 2:
# Directory no longer exists
pass
else:
raise
# Prevent future reuse of this instance
self.runSuccess = self.__destroyed
self.runError = self.__destroyed
# self.destroy will get called when the python session closes.
# If self.destroy was already called, turn the action into a noop
self.destroy = lambda: None
def __destroyed(self, *args, **kwargs):
raise AttributeError("Program instance has been destroyed. "
"Create a new instance if you need a new client.")
def faketime(self, faketime=None):
"""Set a faketime using libfaketime that will affect the following
command calls.
If faketime is None, faketime settings will be disabled.
"""
cmd = which("faketime")
if cmd is None:
raise unittest.SkipTest("libfaketime/faketime is not installed")
if self._command[0] == cmd:
self._command = self._command[3:]
if faketime is not None:
# Use advanced time format
self._command = [cmd, "-f", faketime] + self._command
# vim: ai sts=4 et sw=4