mirror of
https://github.com/GothenburgBitFactory/taskwarrior.git
synced 2025-06-26 10:54:26 +02:00
68 lines
1.8 KiB
Python
68 lines
1.8 KiB
Python
# -*- coding: utf-8 -*-
|
|
import socket
|
|
from subprocess import Popen, PIPE
|
|
|
|
USED_PORTS = set()
|
|
|
|
|
|
def run_cmd_wait(cmd):
|
|
"Run a subprocess and wait for it to finish"
|
|
p = Popen(cmd, stdout=PIPE, stderr=PIPE)
|
|
out, err = p.communicate()
|
|
|
|
if p.returncode != 0:
|
|
raise IOError("Failed to run '{0}', exit code was '{1}', stdout"
|
|
" '{2}' and stderr '{3}'".format(cmd, p.returncode,
|
|
out, err))
|
|
|
|
|
|
def port_used(addr="localhost", port=None):
|
|
"Return True if port is in use, False otherwise"
|
|
if port is None:
|
|
raise TypeError("Argument 'port' may not be None")
|
|
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
result = s.connect_ex((addr, port))
|
|
s.close()
|
|
# result == 0 if connection was successful
|
|
return result == 0
|
|
|
|
|
|
def find_unused_port(addr="localhost", start=53589, track=True):
|
|
"""Find an unused port starting at `port`
|
|
|
|
If track=False the returned port will not be marked as in-use and the code
|
|
will rely entirely on the ability to connect to addr:port as detection
|
|
mechanism. Note this may cause problems if ports are assigned but not used
|
|
immediately
|
|
"""
|
|
maxport = 65535
|
|
unused = None
|
|
|
|
for port in xrange(start, maxport):
|
|
if not port_used(addr, port):
|
|
if track and port in USED_PORTS:
|
|
continue
|
|
|
|
unused = port
|
|
break
|
|
|
|
if unused is None:
|
|
raise ValueError("No available port in the range {0}-{1}".format(
|
|
start, maxport))
|
|
|
|
if track:
|
|
USED_PORTS.add(unused)
|
|
|
|
return unused
|
|
|
|
|
|
def release_port(port):
|
|
"""Forget that given port was marked as'in-use
|
|
"""
|
|
try:
|
|
USED_PORTS.remove(port)
|
|
except KeyError:
|
|
pass
|
|
|
|
# vim: ai sts=4 et sw=4
|