The way the code was written, we'd give up our token, detect a cyclic dependency, and then try to get our token back before exiting. Even with -j1, the temporary token release allowed any parent up the tree to continue running jobs, so it would take an arbitrary amount of time before we could exit (and report an error code to the parent). There was no visible symptom of this except that, with -j1, t/355-deps-cyclic would not finish until some of the later tests finished, which was surprising. To fix it, let's just check for a cyclic dependency first, then release the token only once we're sure things are sane.
263 lines
6.9 KiB
Python
263 lines
6.9 KiB
Python
#
|
|
# beware the jobberwack
|
|
#
|
|
import sys, os, errno, select, fcntl, signal
|
|
from helpers import atoi, close_on_exec
|
|
import state
|
|
|
|
_toplevel = 0
|
|
_mytokens = 1
|
|
_fds = None
|
|
_waitfds = {}
|
|
|
|
|
|
def _debug(s):
|
|
if 0:
|
|
sys.stderr.write('jwack#%d: %s' % (os.getpid(),s))
|
|
|
|
|
|
def _release(n):
|
|
global _mytokens
|
|
_mytokens += n
|
|
_debug('release(%d) -> %d\n' % (n, _mytokens))
|
|
if _mytokens > 1:
|
|
os.write(_fds[1], 't' * (_mytokens-1))
|
|
_mytokens = 1
|
|
|
|
|
|
def release_mine():
|
|
global _mytokens
|
|
assert(_mytokens >= 1)
|
|
_mytokens -= 1
|
|
_debug('release_mine() -> %d\n' % _mytokens)
|
|
os.write(_fds[1], 't')
|
|
|
|
|
|
def _timeout(sig, frame):
|
|
pass
|
|
|
|
|
|
def _make_pipe(startfd):
|
|
(a,b) = os.pipe()
|
|
fds = (fcntl.fcntl(a, fcntl.F_DUPFD, startfd),
|
|
fcntl.fcntl(b, fcntl.F_DUPFD, startfd+1))
|
|
os.close(a)
|
|
os.close(b)
|
|
return fds
|
|
|
|
|
|
def _try_read(fd, n):
|
|
# using djb's suggested way of doing non-blocking reads from a blocking
|
|
# socket: http://cr.yp.to/unix/nonblock.html
|
|
# We can't just make the socket non-blocking, because we want to be
|
|
# compatible with GNU Make, and they can't handle it.
|
|
r,w,x = select.select([fd], [], [], 0)
|
|
if not r:
|
|
return '' # try again
|
|
# ok, the socket is readable - but some other process might get there
|
|
# first. We have to set an alarm() in case our read() gets stuck.
|
|
assert(state.is_flushed())
|
|
oldh = signal.signal(signal.SIGALRM, _timeout)
|
|
try:
|
|
signal.alarm(1) # emergency fallback
|
|
try:
|
|
b = os.read(_fds[0], 1)
|
|
except OSError, e:
|
|
if e.errno in (errno.EAGAIN, errno.EINTR):
|
|
# interrupted or it was nonblocking
|
|
return '' # try again
|
|
else:
|
|
raise
|
|
finally:
|
|
signal.alarm(0)
|
|
signal.signal(signal.SIGALRM, oldh)
|
|
return b and b or None # None means EOF
|
|
|
|
|
|
def setup(maxjobs):
|
|
global _fds, _toplevel
|
|
if _fds:
|
|
return # already set up
|
|
_debug('setup(%d)\n' % maxjobs)
|
|
flags = ' ' + os.getenv('MAKEFLAGS', '') + ' '
|
|
FIND1 = ' --jobserver-auth=' # renamed in GNU make 4.2
|
|
FIND2 = ' --jobserver-fds=' # fallback syntax
|
|
FIND = FIND1
|
|
ofs = flags.find(FIND1)
|
|
if ofs < 0:
|
|
FIND = FIND2
|
|
ofs = flags.find(FIND2)
|
|
if ofs >= 0:
|
|
s = flags[ofs+len(FIND):]
|
|
(arg,junk) = s.split(' ', 1)
|
|
(a,b) = arg.split(',', 1)
|
|
a = atoi(a)
|
|
b = atoi(b)
|
|
if a <= 0 or b <= 0:
|
|
raise ValueError('invalid --jobserver-auth: %r' % arg)
|
|
try:
|
|
fcntl.fcntl(a, fcntl.F_GETFL)
|
|
fcntl.fcntl(b, fcntl.F_GETFL)
|
|
except IOError, e:
|
|
if e.errno == errno.EBADF:
|
|
raise ValueError('broken --jobserver-auth from make; prefix your Makefile rule with a "+"')
|
|
else:
|
|
raise
|
|
_fds = (a,b)
|
|
if maxjobs and not _fds:
|
|
# need to start a new server
|
|
_toplevel = maxjobs
|
|
_fds = _make_pipe(100)
|
|
_release(maxjobs-1)
|
|
os.putenv('MAKEFLAGS',
|
|
'%s -j --jobserver-auth=%d,%d --jobserver-fds=%d,%d' %
|
|
(os.getenv('MAKEFLAGS', ''),
|
|
_fds[0], _fds[1],
|
|
_fds[0], _fds[1]))
|
|
|
|
|
|
def wait(want_token):
|
|
rfds = _waitfds.keys()
|
|
if _fds and want_token:
|
|
rfds.append(_fds[0])
|
|
assert(rfds)
|
|
assert(state.is_flushed())
|
|
r,w,x = select.select(rfds, [], [])
|
|
_debug('_fds=%r; wfds=%r; readable: %r\n' % (_fds, _waitfds, r))
|
|
for fd in r:
|
|
if _fds and fd == _fds[0]:
|
|
pass
|
|
else:
|
|
pd = _waitfds[fd]
|
|
_debug("done: %r\n" % pd.name)
|
|
_release(1)
|
|
os.close(fd)
|
|
del _waitfds[fd]
|
|
rv = os.waitpid(pd.pid, 0)
|
|
assert(rv[0] == pd.pid)
|
|
_debug("done1: rv=%r\n" % (rv,))
|
|
rv = rv[1]
|
|
if os.WIFEXITED(rv):
|
|
pd.rv = os.WEXITSTATUS(rv)
|
|
else:
|
|
pd.rv = -os.WTERMSIG(rv)
|
|
_debug("done2: rv=%d\n" % pd.rv)
|
|
pd.donefunc(pd.name, pd.rv)
|
|
|
|
|
|
def has_token():
|
|
if _mytokens >= 1:
|
|
return True
|
|
|
|
|
|
def get_token(reason):
|
|
assert(state.is_flushed())
|
|
global _mytokens
|
|
assert(_mytokens <= 1)
|
|
setup(1)
|
|
while 1:
|
|
if _mytokens >= 1:
|
|
_debug("_mytokens is %d\n" % _mytokens)
|
|
assert(_mytokens == 1)
|
|
_debug('(%r) used my own token...\n' % reason)
|
|
break
|
|
assert(_mytokens < 1)
|
|
_debug('(%r) waiting for tokens...\n' % reason)
|
|
wait(want_token=1)
|
|
if _mytokens >= 1:
|
|
break
|
|
assert(_mytokens < 1)
|
|
if _fds:
|
|
b = _try_read(_fds[0], 1)
|
|
if b == None:
|
|
raise Exception('unexpected EOF on token read')
|
|
if b:
|
|
_mytokens += 1
|
|
_debug('(%r) got a token (%r).\n' % (reason, b))
|
|
break
|
|
assert(_mytokens <= 1)
|
|
|
|
|
|
def running():
|
|
return len(_waitfds)
|
|
|
|
|
|
def wait_all():
|
|
_debug("wait_all\n")
|
|
assert(state.is_flushed())
|
|
while running():
|
|
while _mytokens >= 1:
|
|
release_mine()
|
|
_debug("wait_all: wait()\n")
|
|
wait(want_token=0)
|
|
_debug("wait_all: empty list\n")
|
|
get_token('self') # get my token back
|
|
if _toplevel:
|
|
bb = ''
|
|
while 1:
|
|
b = _try_read(_fds[0], 8192)
|
|
bb += b
|
|
if not b: break
|
|
if len(bb) != _toplevel-1:
|
|
raise Exception('on exit: expected %d tokens; found %r'
|
|
% (_toplevel-1, len(bb)))
|
|
os.write(_fds[1], bb)
|
|
|
|
|
|
def force_return_tokens():
|
|
n = len(_waitfds)
|
|
if n:
|
|
_debug('%d tokens left in force_return_tokens\n' % n)
|
|
_debug('returning %d tokens\n' % n)
|
|
for k in _waitfds.keys():
|
|
del _waitfds[k]
|
|
if _fds:
|
|
_release(n)
|
|
assert(state.is_flushed())
|
|
|
|
|
|
def _pre_job(r, w, pfn):
|
|
os.close(r)
|
|
if pfn:
|
|
pfn()
|
|
|
|
|
|
class Job:
|
|
def __init__(self, name, pid, donefunc):
|
|
self.name = name
|
|
self.pid = pid
|
|
self.rv = None
|
|
self.donefunc = donefunc
|
|
|
|
def __repr__(self):
|
|
return 'Job(%s,%d)' % (self.name, self.pid)
|
|
|
|
|
|
def start_job(reason, jobfunc, donefunc):
|
|
assert(state.is_flushed())
|
|
global _mytokens
|
|
assert(_mytokens <= 1)
|
|
get_token(reason)
|
|
assert(_mytokens >= 1)
|
|
assert(_mytokens == 1)
|
|
_mytokens -= 1
|
|
r,w = _make_pipe(50)
|
|
pid = os.fork()
|
|
if pid == 0:
|
|
# child
|
|
os.close(r)
|
|
rv = 201
|
|
try:
|
|
try:
|
|
rv = jobfunc() or 0
|
|
_debug('jobfunc completed (%r, %r)\n' % (jobfunc,rv))
|
|
except Exception:
|
|
import traceback
|
|
traceback.print_exc()
|
|
finally:
|
|
_debug('exit: %d\n' % rv)
|
|
os._exit(rv)
|
|
close_on_exec(r, True)
|
|
os.close(w)
|
|
pd = Job(reason, pid, donefunc)
|
|
_waitfds[r] = pd
|