mirror of
https://github.com/GothenburgBitFactory/taskwarrior.git
synced 2025-08-19 00:43:07 +02:00
Testing
Add a class to manage one or more taskd servers running on the same machine.
This commit is contained in:
parent
1d85416692
commit
29d5de1154
2 changed files with 252 additions and 0 deletions
68
test/basetest/utils.py
Normal file
68
test/basetest/utils.py
Normal file
|
@ -0,0 +1,68 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
import socket
|
||||
from subprocess import Popen, PIPE
|
||||
|
||||
USED_PORTS = set()
|
||||
|
||||
|
||||
def run_cmd_wait(cmd):
|
||||
"Run a subprocess and wait for it to finish"
|
||||
p = Popen(cmd, stdout=PIPE, stderr=PIPE)
|
||||
out, err = p.communicate()
|
||||
|
||||
if p.returncode != 0:
|
||||
raise IOError("Failed to run '{0}', exit code was '{1}', stdout"
|
||||
" '{2}' and stderr '{3}'".format(cmd, p.returncode,
|
||||
out, err))
|
||||
|
||||
|
||||
def port_used(addr="localhost", port=None):
|
||||
"Return True if port is in use, False otherwise"
|
||||
if port is None:
|
||||
raise TypeError("Argument 'port' may not be None")
|
||||
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
result = s.connect_ex((addr, port))
|
||||
s.close()
|
||||
# result == 0 if connection was successful
|
||||
return result == 0
|
||||
|
||||
|
||||
def find_unused_port(addr="localhost", start=53589, track=True):
|
||||
"""Find an unused port starting at `port`
|
||||
|
||||
If track=False the returned port will not be marked as in-use and the code
|
||||
will rely entirely on the ability to connect to addr:port as detection
|
||||
mechanism. Note this may cause problems if ports are assigned but not used
|
||||
immediately
|
||||
"""
|
||||
maxport = 65535
|
||||
unused = None
|
||||
|
||||
for port in xrange(start, maxport):
|
||||
if not port_used(addr, port):
|
||||
if track and port in USED_PORTS:
|
||||
continue
|
||||
|
||||
unused = port
|
||||
break
|
||||
|
||||
if unused is None:
|
||||
raise ValueError("No available port in the range {0}-{1}".format(
|
||||
start, maxport))
|
||||
|
||||
if track:
|
||||
USED_PORTS.add(unused)
|
||||
|
||||
return unused
|
||||
|
||||
|
||||
def release_port(port):
|
||||
"""Forget that given port was marked as'in-use
|
||||
"""
|
||||
try:
|
||||
USED_PORTS.remove(port)
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
# vim: ai sts=4 et sw=4
|
Loading…
Add table
Add a link
Reference in a new issue