mirror of
https://github.com/GothenburgBitFactory/taskwarrior.git
synced 2025-06-26 10:54:26 +02:00

The values of ENOENT and ESRCH are architecture-dependent, so don't assume they're always 2 and 3.
366 lines
12 KiB
Python
366 lines
12 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
from __future__ import division, print_function
|
|
import errno
|
|
import os
|
|
import tempfile
|
|
import shutil
|
|
import signal
|
|
import atexit
|
|
from time import sleep
|
|
from subprocess import Popen, PIPE
|
|
from .utils import (find_unused_port, release_port, port_used, run_cmd_wait,
|
|
which, parse_datafile, DEFAULT_CERT_PATH,
|
|
taskd_binary_location)
|
|
from .exceptions import CommandError
|
|
|
|
try:
    from subprocess import DEVNULL
except ImportError:
    # Python 2 has no subprocess.DEVNULL. Emulate it with a handle on the
    # null device that stays open for the lifetime of the module.
    # It is opened read/write (like subprocess.DEVNULL does internally)
    # because it is handed to Popen as *stdin*: a write-only handle would
    # make reads in the child fail instead of returning EOF.
    DEVNULL = open(os.devnull, 'r+')
|
|
|
|
|
|
class Taskd(object):
    """Manage a taskd instance

    A temporary folder is used as data store of taskd.
    This class can be instantiated multiple times if multiple taskd servers
    are needed.

    This class implements mechanisms to automatically select an available port
    and prevent assigning the same port to different instances.

    A server can be stopped and started multiple times, but should not be
    started or stopped after being destroyed.
    """
    DEFAULT_TASKD = taskd_binary_location()

    # Values returned by status()
    TASKD_RUNNING = 0
    # Also reported after stop(), since stop() resets self.proc to None
    TASKD_NEVER_STARTED = 1
    TASKD_EXITED = 2
    TASKD_NOT_LISTENING = 3

    def __init__(self, taskd=DEFAULT_TASKD, certpath=None,
                 address="localhost"):
        """Initialize a Task server that runs in the background and stores data
        in a temporary folder

        The server is only configured here; call start() to launch it.

        :arg taskd: Taskd binary to launch the server (defaults: taskd in PATH)
        :arg certpath: Folder where to find all certificates needed for taskd
                       (defaults to DEFAULT_CERT_PATH)
        :arg address: Address to bind to
        """
        self.taskd = taskd
        self.usercount = 0

        # Will hold the taskd subprocess if it's running
        self.proc = None
        self.datadir = tempfile.mkdtemp(prefix="taskd_")
        self.tasklog = os.path.join(self.datadir, "taskd.log")
        self.taskpid = os.path.join(self.datadir, "taskd.pid")

        # Ensure any instance is properly destroyed at session end.
        # The lambda looks up self.destroy when it runs, so it picks up the
        # no-op that destroy() installs once it has been called explicitly.
        atexit.register(lambda: self.destroy())

        self.reset_env()

        if certpath is None:
            certpath = DEFAULT_CERT_PATH
        self.certpath = certpath

        self.address = address
        self.port = find_unused_port(self.address)

        # Keep all certificate paths public for access by TaskClients
        self.client_cert = os.path.join(self.certpath, "client.cert.pem")
        self.client_key = os.path.join(self.certpath, "client.key.pem")
        self.server_cert = os.path.join(self.certpath, "server.cert.pem")
        self.server_key = os.path.join(self.certpath, "server.key.pem")
        self.server_crl = os.path.join(self.certpath, "server.crl.pem")
        self.ca_cert = os.path.join(self.certpath, "ca.cert.pem")

        # Initialize taskd
        cmd = (self.taskd, "init", "--data", self.datadir)
        run_cmd_wait(cmd, env=self.env)

        self.config("server", "{0}:{1}".format(self.address, self.port))
        self.config("family", "IPv4")
        self.config("log", self.tasklog)
        self.config("pid.file", self.taskpid)
        self.config("root", self.datadir)
        self.config("client.allow", "^task [2-9]")

        # Setup all necessary certificates
        self.config("client.cert", self.client_cert)
        self.config("client.key", self.client_key)
        self.config("server.cert", self.server_cert)
        self.config("server.key", self.server_key)
        self.config("server.crl", self.server_crl)
        self.config("ca.cert", self.ca_cert)

        self.default_user = self.create_user()

    def __repr__(self):
        """Return the default repr extended with the data folder in use."""
        txt = super(Taskd, self).__repr__()
        # Drop the closing ">" of the default repr and re-add it at the end
        return "{0} running from {1}>".format(txt[:-1], self.datadir)

    def reset_env(self):
        """Set a new environment derived from the one used to launch the test
        """
        # Copy all env variables to avoid clashing subprocess environments
        self.env = os.environ.copy()

        # Make sure TASKDDATA points to the temporary folder
        self.env["TASKDDATA"] = self.datadir

    def create_user(self, user=None, org=None):
        """Create a user in the server and return the user credentials to use
        in a taskw client.

        :arg user: User name (defaults to a generated "test_user_<N>")
        :arg org: Organization name (defaults to "default_org"); it is
                  created if it doesn't exist yet
        :returns: tuple of (user, org, userkey)
        """
        if user is None:
            # Create a unique user ID
            uid = self.usercount
            user = "test_user_{0}".format(uid)

            # Increment the user_id
            self.usercount += 1

        if org is None:
            org = "default_org"

        self._add_entity("org", org, ignore_exists=True)
        userkey = self._add_entity("user", org, user)

        return user, org, userkey

    def _add_entity(self, keyword, org, value=None, ignore_exists=False):
        """Add an organization or user to the current server

        :arg keyword: Kind of entity to add ("org" or "user")
        :arg org: Organization name
        :arg value: Extra argument of the command (the user name for "user")
        :arg ignore_exists: Don't fail if taskd reports that the entity
                            already exists
        :returns: the new user key if a user was created, None otherwise
        :raises CommandError: if taskd fails for any reason other than an
                              ignored "already exists"
        """
        cmd = (self.taskd, "add", "--data", self.datadir, keyword, org)

        if value is not None:
            cmd += (value,)

        try:
            _, out, _ = run_cmd_wait(cmd, env=self.env)
        except CommandError as e:
            already_exists = ignore_exists and any(
                line.endswith("already exists.")
                for line in e.out.splitlines())

            # If the error was not "Already exists" report it
            if not already_exists:
                raise

            # Nothing was created, so there is no output to parse for a key.
            # (Falling through would reference an unbound "out".)
            return None

        if keyword == "user":
            expected = "New user key: "
            for line in out.splitlines():
                if line.startswith(expected):
                    return line.replace(expected, '')

        return None

    def config(self, var, value):
        """Run setup `var` as `value` in taskd config

        If the server is running it is sent a SIGHUP to reload its config.
        """
        cmd = (self.taskd, "config", "--force", "--data", self.datadir, var,
               value)
        run_cmd_wait(cmd, env=self.env)

        # If server is running send a SIGHUP to force config reload
        if self.proc is not None:
            try:
                self.proc.send_signal(signal.SIGHUP)
            except OSError:
                # Best effort: the process may already have exited, in which
                # case there is nothing left to reload.
                pass

    def status(self):
        """Check the status of the server by checking if it's still running and
        listening for connections

        :returns: Taskd.TASKD_[NEVER_STARTED/EXITED/NOT_LISTENING/RUNNING]
        """
        if self.proc is None:
            return self.TASKD_NEVER_STARTED

        if self.returncode() is not None:
            return self.TASKD_EXITED

        if not port_used(addr=self.address, port=self.port):
            return self.TASKD_NOT_LISTENING

        return self.TASKD_RUNNING

    def returncode(self):
        """If taskd finished, return its exit code, otherwise return None.

        None is also returned when there is no taskd process at all.

        :returns: taskd's exit code or None
        """
        if self.proc is None:
            return None
        return self.proc.poll()

    def start(self, minutes=5, tries_per_minute=2):
        """Start the taskd server if it's not running.

        Blocks until the server is listening on its port.

        :arg minutes: Maximum time to wait for the server to listen
        :arg tries_per_minute: Despite its name, the number of status checks
                               performed per *second* (the default of 2 checks
                               every 500ms)
        :raises OSError: if the server is already running, exits prematurely
                         or doesn't listen within `minutes`
        """
        if self.proc is None:
            cmd = (self.taskd, "server", "--data", self.datadir)
            self.proc = Popen(cmd, stdout=PIPE, stderr=PIPE, stdin=DEVNULL,
                              env=self.env)
        else:
            self.show_log_contents()

            raise OSError("Taskd server is still running or crashed")

        # Wait for server to listen by checking connectivity in the port
        # Default is to wait up to 5 minutes checking once every 500ms
        for _ in range(minutes * 60 * tries_per_minute):
            status = self.status()

            if status == self.TASKD_RUNNING:
                return

            elif status == self.TASKD_NEVER_STARTED:
                self.show_log_contents()

                raise OSError("Task server was never started. "
                              "This shouldn't happen!!")

            elif status == self.TASKD_EXITED:
                # Collect output logs
                out, err = self.proc.communicate()

                self.show_log_contents()

                raise OSError(
                    "Task server launched with '{0}' crashed or exited "
                    "prematurely. Exit code: {1}. "
                    "Listening on port: {2}. "
                    "Stdout: {3!r}, "
                    "Stderr: {4!r}.".format(
                        self.taskd,
                        self.returncode(),
                        self.port,
                        out,
                        err,
                    ))

            elif status == self.TASKD_NOT_LISTENING:
                # Float literal keeps this a true division on Python 2 too
                sleep(1.0 / tries_per_minute)

            else:
                self.show_log_contents()

                raise OSError("Unknown running status for taskd '{0}'".format(
                    status))

        # Force stop so we can collect output
        proc = self.stop()

        # Collect output logs
        out, err = proc.communicate()

        self.show_log_contents()

        raise OSError("Task server didn't start and listen on port {0} after "
                      "{1} minutes. Stdout: {2!r}. Stderr: {3!r}.".format(
                          self.port, minutes, out, err))

    def stop(self):
        """Stop the server by sending a SIGTERM and SIGKILL if fails to
        terminate.
        If it's already stopped OSError will be raised

        Each signal is only sent after giving the process ~1 second to exit
        on its own (see _check_pid).

        Returns: a reference to the old process object
        """
        if self.proc is None:
            raise OSError("Taskd server is not running")

        if self._check_pid():
            self._signal_quietly(self.proc.send_signal, signal.SIGTERM)

        if self._check_pid():
            self._signal_quietly(self.proc.kill)

        # Wait for process to end to avoid zombies
        self.proc.wait()

        # Keep a reference to the old process
        proc = self.proc

        # Unset the process to inform that no process is running
        self.proc = None

        return proc

    @staticmethod
    def _signal_quietly(send, *args):
        """Call `send(*args)` ignoring the error of a vanished process

        The process may exit between the liveness check and the delivery of
        the signal; that is not a failure when the goal is to stop it.
        """
        try:
            send(*args)
        except OSError as e:
            if e.errno != errno.ESRCH:
                raise

    def _check_pid(self):
        """Check if self.proc is still running and a PID still exists

        Polls for ~1 second and returns as soon as the process has exited.

        :returns: True if the process is still running, False otherwise
        """
        # Wait ~1 sec for taskd to finish
        for _ in range(10):
            sleep(0.1)
            if self.proc.poll() is not None:
                return False

        return True

    def destroy(self):
        """Cleanup the data folder and release server port for other instances

        After this call start(), stop() and config() raise AttributeError and
        further calls to destroy() do nothing.
        """
        # Ensure server is stopped first
        if self.proc is not None:
            self.stop()

        try:
            shutil.rmtree(self.datadir)
        except OSError as e:
            if e.errno == errno.ENOENT:
                # Directory no longer exists
                pass
            else:
                raise

        # This method is registered with atexit before the port is assigned
        # in __init__, so the attribute is missing if __init__ failed early.
        port = getattr(self, "port", None)
        if port is not None:
            release_port(port)

        # Prevent future reuse of this instance
        self.start = self.__destroyed
        self.config = self.__destroyed
        self.stop = self.__destroyed

        # self.destroy will get called when the python session closes.
        # If self.destroy was already called, turn the action into a noop
        self.destroy = lambda: None

    def __destroyed(self, *args, **kwargs):
        """Replacement for the methods that are unusable after destroy()"""
        raise AttributeError("Taskd instance has been destroyed. "
                             "Create a new instance if you need a new server.")

    @classmethod
    def not_available(cls):
        """Check if the taskd binary is available in the path

        :returns: True if the binary can NOT be found, False otherwise
        """
        return not which(cls.DEFAULT_TASKD)

    def client_data(self, client):
        """Return a python list with the content of tx.data matching the given
        task client. tx.data will be parsed to string and JSON.

        :arg client: Object whose `credentials` mapping provides the "org"
                     and "userkey" entries used to locate the file
        """
        datafile = os.path.join(self.datadir,
                                "orgs",
                                client.credentials["org"],
                                "users",
                                client.credentials["userkey"],
                                "tx.data")

        return parse_datafile(datafile)

    def show_log_contents(self):
        """Print to STDOUT the contents of taskd.log

        Nothing is printed if the log file doesn't exist.
        """
        if os.path.isfile(self.tasklog):
            with open(self.tasklog) as fh:
                print("#### Start taskd.log ####")
                for line in fh:
                    print(line, end='')
                print("#### End taskd.log ####")
|
|
|
|
# vim: ai sts=4 et sw=4
|