add initial bulk run from pre-commit over all files

This commit is contained in:
Felix Schurk 2024-07-29 22:34:51 +02:00
parent 665aeeef61
commit 93356b39c3
418 changed files with 21354 additions and 23858 deletions

View file

@ -4,6 +4,7 @@ import os
from sys import stderr
import shutil
import stat
try:
import simplejson as json
except ImportError:
@ -15,8 +16,8 @@ from .exceptions import HookError
class InvalidJSON(object):
"""Object representing the original unparsed JSON string and the JSON error
"""
"""Object representing the original unparsed JSON string and the JSON error"""
def __init__(self, original, error):
self.original = original
self.error = error
@ -38,6 +39,7 @@ class Hooks(object):
"""Abstraction to help interact with hooks (add, remove) during tests and
keep track of which are active.
"""
def __init__(self, datadir):
"""Initialize hooks container which keeps track of active hooks and
@ -63,8 +65,7 @@ class Hooks(object):
enabled = ", ".join(enabled) or None
disabled = ", ".join(disabled) or None
return "<Hooks: enabled: {0} | disabled: {1}>".format(enabled,
disabled)
return "<Hooks: enabled: {0} | disabled: {1}>".format(enabled, disabled)
def __getitem__(self, name):
return self._hooks[name]
@ -134,8 +135,7 @@ class Hooks(object):
hook._delete()
def clear(self):
"""Remove all existing hooks and empty the hook registry
"""
"""Remove all existing hooks and empty the hook registry"""
self._hooks = {}
# Remove any existing hooks
@ -150,10 +150,11 @@ class Hooks(object):
class Hook(object):
"""Represents a hook script and provides methods to enable/disable hooks
"""
def __init__(self, hookname, hookdir, content=None, default=False,
default_hookpath=None):
"""Represents a hook script and provides methods to enable/disable hooks"""
def __init__(
self, hookname, hookdir, content=None, default=False, default_hookpath=None
):
"""Initialize and create the hook
This class supports creating hooks in two ways:
@ -181,24 +182,25 @@ class Hook(object):
self._check_hook_not_exists(self.hookfile)
if not default and content is None:
raise HookError("Cannot create hookfile {0} without content. "
"If using a builtin hook pass default=True"
.format(self.hookname))
raise HookError(
"Cannot create hookfile {0} without content. "
"If using a builtin hook pass default=True".format(self.hookname)
)
if os.path.isfile(self.hookfile):
raise HookError("Hook with name {0} already exists. "
"Did you forget to remove() it before recreating?"
.format(self.hookname))
raise HookError(
"Hook with name {0} already exists. "
"Did you forget to remove() it before recreating?".format(self.hookname)
)
if default:
self.default_hookfile = os.path.join(self.default_hookpath,
self.hookname)
self.default_hookfile = os.path.join(self.default_hookpath, self.hookname)
self._check_hook_exists(self.default_hookfile)
# Symlinks change permission of source file, cannot use one here
shutil.copy(self.default_hookfile, self.hookfile)
else:
self.default_hookfile = None
with open(self.hookfile, 'w') as fh:
with open(self.hookfile, "w") as fh:
fh.write(content)
def __eq__(self, other):
@ -250,16 +252,19 @@ class Hook(object):
if self.hookname.startswith(hooktype):
break
else:
stderr.write("WARNING: {0} is not a valid hook type. "
"It will not be triggered\n".format(self.hookname))
stderr.write(
"WARNING: {0} is not a valid hook type. "
"It will not be triggered\n".format(self.hookname)
)
def _remove_file(self, file):
try:
os.remove(file)
except OSError as e:
if e.errno == errno.ENOENT:
raise HookError("Hook with name {0} was not found on "
"hooks/ folder".format(file))
raise HookError(
"Hook with name {0} was not found on " "hooks/ folder".format(file)
)
else:
raise
@ -271,18 +276,15 @@ class Hook(object):
self._remove_hookfile(self.hookfile)
def enable(self):
"""Make hookfile executable to allow triggering
"""
"""Make hookfile executable to allow triggering"""
os.chmod(self.hookfile, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
def disable(self):
"""Remove hookfile executable bit to deny triggering
"""
"""Remove hookfile executable bit to deny triggering"""
os.chmod(self.hookfile, stat.S_IREAD | stat.S_IWRITE)
def is_active(self):
"""Check if hook is active by verifying the execute bit
"""
"""Check if hook is active by verifying the execute bit"""
return os.access(self.hookfile, os.X_OK)
@ -290,6 +292,7 @@ class LoggedHook(Hook):
"""A variant of a Hook that allows checking that the hook was called, what
was received via STDIN and what was answered to STDOUT
"""
def __init__(self, *args, **kwargs):
super(LoggedHook, self).__init__(*args, **kwargs)
@ -300,8 +303,7 @@ class LoggedHook(Hook):
self.wrappedname = "original_" + self.hookname
self.wrappedfile = os.path.join(self.hookdir, self.wrappedname)
self.original_wrapper = os.path.join(self.default_hookpath,
"wrapper.sh")
self.original_wrapper = os.path.join(self.default_hookpath, "wrapper.sh")
self.hooklog_in = self.wrappedfile + ".log.in"
self.hooklog_out = self.wrappedfile + ".log.out"
@ -326,11 +328,10 @@ class LoggedHook(Hook):
self._remove_file(self.hooklog_out)
def _setup_wrapper(self):
"""Setup wrapper shell script to allow capturing input/output of hook
"""
"""Setup wrapper shell script to allow capturing input/output of hook"""
# Create empty hooklog to allow checking that hook executed
open(self.hooklog_in, 'w').close()
open(self.hooklog_out, 'w').close()
open(self.hooklog_in, "w").close()
open(self.hooklog_out, "w").close()
# Rename the original hook to the name that will be used by wrapper
self._check_hook_not_exists(self.wrappedfile)
@ -340,8 +341,7 @@ class LoggedHook(Hook):
shutil.copy(self.original_wrapper, self.hookfile)
def _get_log_stat(self):
"""Return the most recent change timestamp and size of both logfiles
"""
"""Return the most recent change timestamp and size of both logfiles"""
stdin = os.stat(self.hooklog_in)
stdout = os.stat(self.hooklog_out)
@ -349,8 +349,7 @@ class LoggedHook(Hook):
return last_change, stdin.st_size, stdout.st_size
def _use_cache(self):
"""Check if log files were changed since last check
"""
"""Check if log files were changed since last check"""
try:
last_change = self._cache["last_change"]
except KeyError:
@ -367,20 +366,17 @@ class LoggedHook(Hook):
return True
def enable(self):
"""Make hookfile executable to allow triggering
"""
"""Make hookfile executable to allow triggering"""
super(LoggedHook, self).enable()
os.chmod(self.wrappedfile, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
def disable(self):
"""Remove hookfile executable bit to deny triggering
"""
"""Remove hookfile executable bit to deny triggering"""
super(LoggedHook, self).disable()
os.chmod(self.wrappedfile, stat.S_IREAD | stat.S_IWRITE)
def is_active(self):
"""Check if hook is active by verifying the execute bit
"""
"""Check if hook is active by verifying the execute bit"""
parent_is_active = super(LoggedHook, self).disable()
return parent_is_active and os.access(self.wrappedfile, os.X_OK)
@ -407,16 +403,17 @@ class LoggedHook(Hook):
if self._use_cache():
return self._cache["log"]
log = {"calls": [],
"input": {
"json": [],
},
"output": {
"json": [],
"msgs": [],
},
"exitcode": None,
}
log = {
"calls": [],
"input": {
"json": [],
},
"output": {
"json": [],
"msgs": [],
},
"exitcode": None,
}
with open(self.hooklog_in) as fh:
for i, line in enumerate(fh):
@ -426,16 +423,19 @@ class LoggedHook(Hook):
# Timestamp includes nanosecond resolution
timestamp = tstamp.split(" ")[-1]
# convert timestamp to python datetime object
log["calls"].append({
"timestamp": datetime.fromtimestamp(float(timestamp)),
"args": args,
})
log["calls"].append(
{
"timestamp": datetime.fromtimestamp(float(timestamp)),
"args": args,
}
)
elif line.startswith("{"):
# Decode json input (to hook)
log["input"]["json"].append(json_decoder(line))
else:
raise IOError("Unexpected content on STDIN line {0}: {1}"
.format(i, line))
raise IOError(
"Unexpected content on STDIN line {0}: {1}".format(i, line)
)
with open(self.hooklog_out) as fh:
for line in fh:
@ -464,49 +464,43 @@ class LoggedHook(Hook):
"""
log = self.get_logs()
assert len(log["calls"]) == count, ("{0} calls expected for {1} but "
"found {2}".format(
count,
self.hookname,
log["calls"]
))
assert (
len(log["calls"]) == count
), "{0} calls expected for {1} but " "found {2}".format(
count, self.hookname, log["calls"]
)
def assertExitcode(self, exitcode):
"""Check if current hook finished with the expected exit code
"""
"""Check if current hook finished with the expected exit code"""
log = self.get_logs()
assert log["exitcode"] == exitcode, ("Expected exit code {0} for {1} "
"but found {2}".format(
exitcode,
self.hookname,
log["exitcode"]
))
assert (
log["exitcode"] == exitcode
), "Expected exit code {0} for {1} " "but found {2}".format(
exitcode, self.hookname, log["exitcode"]
)
def assertValidJSONOutput(self):
"""Check if current hook output is valid JSON in all expected replies
"""
"""Check if current hook output is valid JSON in all expected replies"""
log = self.get_logs()
for i, out in enumerate(log["output"]["json"]):
assert not isinstance(out, InvalidJSON), ("Invalid JSON found at "
"reply number {0} with "
"content {1}".format(
i + 1,
out.original
))
assert not isinstance(out, InvalidJSON), (
"Invalid JSON found at "
"reply number {0} with "
"content {1}".format(i + 1, out.original)
)
def assertInvalidJSONOutput(self):
"""Check if current hook output is invalid JSON in any expected reply
"""
"""Check if current hook output is invalid JSON in any expected reply"""
log = self.get_logs()
for i, out in enumerate(log["output"]["json"]):
assert isinstance(out, InvalidJSON), ("Valid JSON found at reply "
"number {0} with content "
"{1}".format(
i + 1,
out.original
))
assert isinstance(out, InvalidJSON), (
"Valid JSON found at reply "
"number {0} with content "
"{1}".format(i + 1, out.original)
)
# vim: ai sts=4 et sw=4