Totally disgusting support for jobservers.

It needs some heavy rethinking and cleanup.  But it seems to work!  And it's
even compatible with GNU make, apparently!
This commit is contained in:
Avery Pennarun 2010-11-13 04:36:44 -08:00
commit bd5daf9754
2 changed files with 163 additions and 123 deletions

197
jwack.py
View file

@ -2,34 +2,52 @@
# #
# beware the jobberwack # beware the jobberwack
# #
import sys, os, errno, select, subprocess, fcntl import sys, os, errno, select, fcntl
import options
from helpers import *
optspec = """
jwack [-j maxjobs] -- <command...>
--
j,jobs= maximum jobs to run at once
"""
_toplevel = 0
_mytokens = 1
_fds = None _fds = None
_tokens = {}
_waitfds = {} _waitfds = {}
_fake_token = 0
def _debug(s):
if 0:
sys.stderr.write('jwack#%d: %s' % (os.getpid(),s))
def _atoi(v):
try:
return int(v or 0)
except ValueError:
return 0
def _release(n): def _release(n):
global _fake_token global _mytokens
if _fake_token: _debug('release(%d)\n' % n)
_fake_token = 0 _mytokens += n
n -= 1 if _mytokens > 1:
os.write(_fds[1], 't' * n) os.write(_fds[1], 't' * (_mytokens-1))
_mytokens = 1
def _try_read(fd, n):
fcntl.fcntl(_fds[0], fcntl.F_SETFL, os.O_NONBLOCK)
try:
b = os.read(_fds[0], 1) # FIXME try: block
except OSError, e:
if e.errno == errno.EAGAIN:
return ''
else:
raise
return b and b or None
def setup(maxjobs): def setup(maxjobs):
global _fds global _fds, _toplevel
if _fds: if _fds:
return # already set up return # already set up
_debug('setup(%d)\n' % maxjobs)
flags = ' ' + os.getenv('MAKEFLAGS', '') + ' ' flags = ' ' + os.getenv('MAKEFLAGS', '') + ' '
FIND = ' --jobserver-fds=' FIND = ' --jobserver-fds='
ofs = flags.find(FIND) ofs = flags.find(FIND)
@ -37,15 +55,16 @@ def setup(maxjobs):
s = flags[ofs+len(FIND):] s = flags[ofs+len(FIND):]
(arg,junk) = s.split(' ', 1) (arg,junk) = s.split(' ', 1)
(a,b) = arg.split(',', 1) (a,b) = arg.split(',', 1)
a = atoi(a) a = _atoi(a)
b = atoi(b) b = _atoi(b)
if a <= 0 or b <= 0: if a <= 0 or b <= 0:
raise ValueError('invalid --jobserver-fds: %r' % arg) raise ValueError('invalid --jobserver-fds: %r' % arg)
_fds = (a,b) _fds = (a,b)
if maxjobs and not _fds: if maxjobs and not _fds:
# need to start a new server # need to start a new server
_toplevel = maxjobs
_fds = os.pipe() _fds = os.pipe()
_release(maxjobs) _release(maxjobs-1)
os.putenv('MAKEFLAGS', os.putenv('MAKEFLAGS',
'%s --jobserver-fds=%d,%d -j' % (os.getenv('MAKEFLAGS'), '%s --jobserver-fds=%d,%d -j' % (os.getenv('MAKEFLAGS'),
_fds[0], _fds[1])) _fds[0], _fds[1]))
@ -61,100 +80,98 @@ def wait(want_token):
if _fds and fd == _fds[0]: if _fds and fd == _fds[0]:
pass pass
else: else:
p = _waitfds[fd] pd = _waitfds[fd]
_release(_tokens[fd]) _debug("done: %r\n" % pd.name)
b = os.read(fd, 1) _release(1)
#print 'read: %r' % b
if b:
#print 'giving up %d tokens for child' % _tokens[fd]
_tokens[fd] = 0
else:
os.close(fd) os.close(fd)
del _waitfds[fd] del _waitfds[fd]
del _tokens[fd] rv = os.waitpid(pd.pid, 0)
p.wait() assert(rv[0] == pd.pid)
rv = rv[1]
if os.WIFEXITED(rv):
def wait_for_token(): pd.rv = os.WEXITSTATUS(rv)
pfd = atoi(os.getenv('JWACK_PARENT_FD', ''))
#print 'pfd is %d' % pfd
if pfd:
# tell parent jwack to give back his token
os.write(pfd, 'j')
else: else:
# parent is a "real" GNU make. He'll assume we already have a token, pd.rv = -os.WTERMSIG(rv)
# so manufacture one and don't bother waiting.
global _fake_token
_fake_token = 1 def get_token(reason):
return global _mytokens
while 1: while 1:
print 'waiting for tokens...' if _mytokens >= 1:
_debug('(%r) used my own token...\n' % reason)
_mytokens -= 1
return
_debug('(%r) waiting for tokens...\n' % reason)
wait(want_token=1) wait(want_token=1)
if _fds: if _fds:
fcntl.fcntl(_fds[0], fcntl.F_SETFL, os.O_NONBLOCK) b = _try_read(_fds[0], 1)
try: if b == None:
b = os.read(_fds[0], 1) # FIXME try: block raise Exception('unexpected EOF on token read')
except OSError, e:
if e.errno == errno.EAGAIN:
b = ''
pass
else:
raise
if b: if b:
break break
print 'got a token (%r).' % b _debug('(%r) got a token (%r).\n' % (reason, b))
def wait_all(): def wait_all():
_debug("wait_all\n")
while _waitfds: while _waitfds:
_debug("wait_all: wait()\n")
wait(want_token=0) wait(want_token=0)
_debug("wait_all: empty list\n")
if _toplevel:
bb = ''
while 1:
b = _try_read(_fds[0], 8192)
bb += b
if not b: break
if len(bb) != _toplevel-1:
raise Exception('on exit: expected %d tokens; found only %d'
% (_toplevel-1, len(b)))
_debug("wait_all: done\n")
def force_return_tokens(): def force_return_tokens():
n = sum(_tokens.values()) n = len(_waitfds)
print 'returning %d tokens' % n if n:
_debug('%d tokens left in force_return_tokens\n' % n)
_debug('returning %d tokens\n' % n)
for k in _waitfds.keys():
del _waitfds[k]
if _fds: if _fds:
_release(n) _release(n)
for k in _tokens.keys():
_tokens[k] = 0
def _pre_job(r,w): def _pre_job(r, w, pfn):
os.putenv('JWACK_PARENT_FD', str(w))
os.close(r) os.close(r)
if pfn:
pfn()
def start_job(argv, stdout=None): class Job:
global _mytokens def __init__(self, name, pid):
self.name = name
self.pid = pid
self.rv = None
def start_job(reason, jobfunc):
setup(1) setup(1)
if stdout: get_token(reason)
argx = dict(stdout=stdout)
else:
argx = dict()
wait_for_token()
r,w = os.pipe() r,w = os.pipe()
p = subprocess.Popen(argv, preexec_fn=lambda: _pre_job(r,w), **argx) pid = os.fork()
os.close(w) if pid == 0:
_waitfds[r] = p # child
_tokens[r] = 1 os.close(r)
return p
def main():
o = options.Options('jwack', optspec)
(opt, flags, extra) = o.parse(sys.argv[1:])
if not extra:
o.fatal("no command line given")
setup(opt.jobs)
try: try:
p = start_job(extra) try:
wait_all() jobfunc()
return p.wait() os._exit(0)
except Exception, e:
sys.stderr.write("Exception: %s\n" % e)
finally: finally:
force_return_tokens() os._exit(201)
# else we're the parent process
os.close(w)
if __name__ == "__main__": pd = Job(reason, pid)
sys.exit(main()) _waitfds[r] = pd
return pd

81
redo.py
View file

@ -1,10 +1,11 @@
#!/usr/bin/python #!/usr/bin/python
import sys, os, subprocess, glob, time import sys, os, subprocess, glob, time
import options import options, jwack
optspec = """ optspec = """
redo [targets...] redo [targets...]
-- --
j,jobs= maximum number of jobs to build at once
d,debug print dependency checks as they happen d,debug print dependency checks as they happen
v,verbose print commands as they are run v,verbose print commands as they are run
""" """
@ -34,8 +35,8 @@ if not os.environ.get('REDO_BASE', ''):
# only do this from the toplevel redo process, so unless the user # only do this from the toplevel redo process, so unless the user
# deliberately starts more than one redo on the same repository, it's # deliberately starts more than one redo on the same repository, it's
# sort of ok. # sort of ok.
for f in glob.glob('%s/lock.*' % base): for f in glob.glob('%s/.redo/lock^*' % base):
unlink(f) os.unlink(f)
import vars import vars
from helpers import * from helpers import *
@ -120,22 +121,60 @@ def _build(t):
def build(t): def build(t):
mkdirp('%s/.redo' % vars.BASE)
lockname = sname('lock', t) lockname = sname('lock', t)
try: try:
fd = os.open(lockname, os.O_CREAT|os.O_EXCL) fd = os.open(lockname, os.O_CREAT|os.O_EXCL)
except OSError, e: except OSError, e:
if e.errno == errno.EEXIST: if e.errno == errno.EEXIST:
log('%s (locked...)\n' % relpath(t, vars.STARTDIR)) log('%s (locked...)\n' % relpath(t, vars.STARTDIR))
raise BuildLocked(t) os._exit(199)
else: else:
raise raise
os.close(fd) os.close(fd)
try:
try: try:
return _build(t) return _build(t)
except BuildError, e:
err('%s\n' % e)
os._exit(1)
finally: finally:
unlink(lockname) unlink(lockname)
def main():
retcode = 0
locked = {}
waits = {}
for t in targets:
if os.path.exists('%s/all.do' % t):
# t is a directory, but it has a default target
t = '%s/all' % t
waits[t] = jwack.start_job(t, lambda: build(t))
jwack.wait_all()
for t,pd in waits.items():
assert(pd.rv != None)
if pd.rv == 199:
# target was locked
locked[t] = 1
elif pd.rv:
err('%s: exit code was %r\n' % (t, pd.rv))
retcode = 1
while locked:
for t in locked.keys():
lockname = sname('lock', t)
stampname = sname('stamp', t)
if not os.path.exists(lockname):
relp = relpath(t, vars.STARTDIR)
log('%s (...unlocked!)\n' % relp)
if not os.path.exists(stampname):
err('%s: failed in another thread\n' % relp)
retcode = 2
del locked[t]
time.sleep(0.2)
return retcode
if not vars.DEPTH: if not vars.DEPTH:
# toplevel call to redo # toplevel call to redo
exenames = [os.path.abspath(sys.argv[0]), os.path.realpath(sys.argv[0])] exenames = [os.path.abspath(sys.argv[0]), os.path.realpath(sys.argv[0])]
@ -145,32 +184,16 @@ if not vars.DEPTH:
os.environ['PATH'] = ':'.join(dirnames) + ':' + os.environ['PATH'] os.environ['PATH'] = ':'.join(dirnames) + ':' + os.environ['PATH']
try: try:
retcode = 0 j = atoi(opt.jobs or 1)
locked = {} if j < 1 or j > 1000:
for t in targets: err('invalid --jobs value: %r\n' % opt.jobs)
if os.path.exists('%s/all.do' % t): jwack.setup(j)
# t is a directory, but it has a default target
t = '%s/all' % t
mkdirp('%s/.redo' % vars.BASE)
try: try:
build(t) retcode = main()
except BuildError, e: finally:
err('%s\n' % e) jwack.force_return_tokens()
retcode = 1 if retcode:
except BuildLocked, e: err('exiting: %d\n' % retcode)
locked[t] = 1
while locked:
for l in locked.keys():
lockname = sname('lock', t)
stampname = sname('stamp', t)
if not os.path.exists(lockname):
relp = relpath(t, vars.STARTDIR)
log('%s (...unlocked!)\n' % relp)
if not os.path.exists(stampname):
err('%s: failed in another thread\n' % relp)
retcode = 2
del locked[l]
time.sleep(0.2)
sys.exit(retcode) sys.exit(retcode)
except KeyboardInterrupt: except KeyboardInterrupt:
sys.exit(200) sys.exit(200)