From bd5daf9754dff3667d88874ad779708b3086f9c0 Mon Sep 17 00:00:00 2001 From: Avery Pennarun Date: Sat, 13 Nov 2010 04:36:44 -0800 Subject: [PATCH] Totally disgusting support for jobservers. It needs some heavy rethinking and cleanup. But it seems to work! And it's even compatible with GNU make, apparently! --- jwack.py | 201 ++++++++++++++++++++++++++++++------------------------- redo.py | 85 ++++++++++++++--------- 2 files changed, 163 insertions(+), 123 deletions(-) diff --git a/jwack.py b/jwack.py index cdf81b6..ab0e2f5 100755 --- a/jwack.py +++ b/jwack.py @@ -2,34 +2,52 @@ # # beware the jobberwack # -import sys, os, errno, select, subprocess, fcntl -import options -from helpers import * - -optspec = """ -jwack [-j maxjobs] -- --- -j,jobs= maximum jobs to run at once -""" +import sys, os, errno, select, fcntl +_toplevel = 0 +_mytokens = 1 _fds = None -_tokens = {} _waitfds = {} -_fake_token = 0 + + +def _debug(s): + if 0: + sys.stderr.write('jwack#%d: %s' % (os.getpid(),s)) + + +def _atoi(v): + try: + return int(v or 0) + except ValueError: + return 0 def _release(n): - global _fake_token - if _fake_token: - _fake_token = 0 - n -= 1 - os.write(_fds[1], 't' * n) + global _mytokens + _debug('release(%d)\n' % n) + _mytokens += n + if _mytokens > 1: + os.write(_fds[1], 't' * (_mytokens-1)) + _mytokens = 1 + + +def _try_read(fd, n): + fcntl.fcntl(_fds[0], fcntl.F_SETFL, os.O_NONBLOCK) + try: + b = os.read(_fds[0], 1) # FIXME try: block + except OSError, e: + if e.errno == errno.EAGAIN: + return '' + else: + raise + return b and b or None def setup(maxjobs): - global _fds + global _fds, _toplevel if _fds: return # already set up + _debug('setup(%d)\n' % maxjobs) flags = ' ' + os.getenv('MAKEFLAGS', '') + ' ' FIND = ' --jobserver-fds=' ofs = flags.find(FIND) @@ -37,15 +55,16 @@ def setup(maxjobs): s = flags[ofs+len(FIND):] (arg,junk) = s.split(' ', 1) (a,b) = arg.split(',', 1) - a = atoi(a) - b = atoi(b) + a = _atoi(a) + b = _atoi(b) if a <= 0 or b <= 0: raise ValueError('invalid --jobserver-fds: %r' % arg) _fds = (a,b) if maxjobs and not _fds: # need to start a new server + _toplevel = maxjobs _fds = os.pipe() - _release(maxjobs) + _release(maxjobs-1) os.putenv('MAKEFLAGS', '%s --jobserver-fds=%d,%d -j' % (os.getenv('MAKEFLAGS'), _fds[0], _fds[1])) @@ -61,100 +80,98 @@ def wait(want_token): if _fds and fd == _fds[0]: pass else: - p = _waitfds[fd] - _release(_tokens[fd]) - b = os.read(fd, 1) - #print 'read: %r' % b - if b: - #print 'giving up %d tokens for child' % _tokens[fd] - _tokens[fd] = 0 + pd = _waitfds[fd] + _debug("done: %r\n" % pd.name) + _release(1) + os.close(fd) + del _waitfds[fd] + rv = os.waitpid(pd.pid, 0) + assert(rv[0] == pd.pid) + rv = rv[1] + if os.WIFEXITED(rv): + pd.rv = os.WEXITSTATUS(rv) else: - os.close(fd) - del _waitfds[fd] - del _tokens[fd] - p.wait() + pd.rv = -os.WTERMSIG(rv) -def wait_for_token(): - pfd = atoi(os.getenv('JWACK_PARENT_FD', '')) - #print 'pfd is %d' % pfd - if pfd: - # tell parent jwack to give back his token - os.write(pfd, 'j') - else: - # parent is a "real" GNU make. He'll assume we already have a token, - # so manufacture one and don't bother waiting. - global _fake_token - _fake_token = 1 - return +def get_token(reason): + global _mytokens while 1: - print 'waiting for tokens...' + if _mytokens >= 1: + _debug('(%r) used my own token...\n' % reason) + _mytokens -= 1 + return + _debug('(%r) waiting for tokens...\n' % reason) wait(want_token=1) if _fds: - fcntl.fcntl(_fds[0], fcntl.F_SETFL, os.O_NONBLOCK) - try: - b = os.read(_fds[0], 1) # FIXME try: block - except OSError, e: - if e.errno == errno.EAGAIN: - b = '' - pass - else: - raise + b = _try_read(_fds[0], 1) + if b == None: + raise Exception('unexpected EOF on token read') if b: break - print 'got a token (%r).' % b + _debug('(%r) got a token (%r).\n' % (reason, b)) def wait_all(): + _debug("wait_all\n") while _waitfds: + _debug("wait_all: wait()\n") wait(want_token=0) + _debug("wait_all: empty list\n") + if _toplevel: + bb = '' + while 1: + b = _try_read(_fds[0], 8192) + bb += b + if not b: break + if len(bb) != _toplevel-1: + raise Exception('on exit: expected %d tokens; found only %d' + % (_toplevel-1, len(b))) + _debug("wait_all: done\n") def force_return_tokens(): - n = sum(_tokens.values()) - print 'returning %d tokens' % n + n = len(_waitfds) + if n: + _debug('%d tokens left in force_return_tokens\n' % n) + _debug('returning %d tokens\n' % n) + for k in _waitfds.keys(): + del _waitfds[k] if _fds: _release(n) - for k in _tokens.keys(): - _tokens[k] = 0 -def _pre_job(r,w): - os.putenv('JWACK_PARENT_FD', str(w)) +def _pre_job(r, w, pfn): os.close(r) - + if pfn: + pfn() -def start_job(argv, stdout=None): - global _mytokens + +class Job: + def __init__(self, name, pid): + self.name = name + self.pid = pid + self.rv = None + + +def start_job(reason, jobfunc): setup(1) - if stdout: - argx = dict(stdout=stdout) - else: - argx = dict() - wait_for_token() + get_token(reason) r,w = os.pipe() - p = subprocess.Popen(argv, preexec_fn=lambda: _pre_job(r,w), **argx) + pid = os.fork() + if pid == 0: + # child + os.close(r) + try: + try: + jobfunc() + os._exit(0) + except Exception, e: + sys.stderr.write("Exception: %s\n" % e) + finally: + os._exit(201) + # else we're the parent process os.close(w) - _waitfds[r] = p - _tokens[r] = 1 - return p - - -def main(): - o = options.Options('jwack', optspec) - (opt, flags, extra) = o.parse(sys.argv[1:]) - - if not extra: - o.fatal("no command line given") - - setup(opt.jobs) - try: - p = start_job(extra) - wait_all() - return p.wait() - finally: - force_return_tokens() - - -if __name__ == "__main__": - sys.exit(main()) + pd = Job(reason, pid) + _waitfds[r] = pd + return pd diff --git a/redo.py b/redo.py index acd8421..57db064 100755 --- a/redo.py +++ b/redo.py @@ -1,10 +1,11 @@ #!/usr/bin/python import sys, os, subprocess, glob, time -import options +import options, jwack optspec = """ redo [targets...] -- +j,jobs= maximum number of jobs to build at once d,debug print dependency checks as they happen v,verbose print commands as they are run """ @@ -34,8 +35,8 @@ if not os.environ.get('REDO_BASE', ''): # only do this from the toplevel redo process, so unless the user # deliberately starts more than one redo on the same repository, it's # sort of ok. - for f in glob.glob('%s/lock.*' % base): - unlink(f) + for f in glob.glob('%s/.redo/lock^*' % base): + os.unlink(f) import vars from helpers import * @@ -120,22 +121,60 @@ def _build(t): def build(t): + mkdirp('%s/.redo' % vars.BASE) lockname = sname('lock', t) try: fd = os.open(lockname, os.O_CREAT|os.O_EXCL) except OSError, e: if e.errno == errno.EEXIST: log('%s (locked...)\n' % relpath(t, vars.STARTDIR)) - raise BuildLocked(t) + os._exit(199) else: raise os.close(fd) try: - return _build(t) + try: + return _build(t) + except BuildError, e: + err('%s\n' % e) + os._exit(1) finally: unlink(lockname) +def main(): + retcode = 0 + locked = {} + waits = {} + for t in targets: + if os.path.exists('%s/all.do' % t): + # t is a directory, but it has a default target + t = '%s/all' % t + waits[t] = jwack.start_job(t, lambda: build(t)) + jwack.wait_all() + for t,pd in waits.items(): + assert(pd.rv != None) + if pd.rv == 199: + # target was locked + locked[t] = 1 + elif pd.rv: + err('%s: exit code was %r\n' % (t, pd.rv)) + retcode = 1 + while locked: + for t in locked.keys(): + lockname = sname('lock', t) + stampname = sname('stamp', t) + if not os.path.exists(lockname): + relp = relpath(t, vars.STARTDIR) + log('%s (...unlocked!)\n' % relp) + if not os.path.exists(stampname): + err('%s: failed in another thread\n' % relp) + retcode = 2 + del locked[t] + time.sleep(0.2) + return retcode + + if not vars.DEPTH: # toplevel call to redo exenames = [os.path.abspath(sys.argv[0]), os.path.realpath(sys.argv[0])] @@ -145,32 +184,16 @@ if not vars.DEPTH: os.environ['PATH'] = ':'.join(dirnames) + ':' + os.environ['PATH'] try: - retcode = 0 - locked = {} - for t in targets: - if os.path.exists('%s/all.do' % t): - # t is a directory, but it has a default target - t = '%s/all' % t - mkdirp('%s/.redo' % vars.BASE) - try: - build(t) - except BuildError, e: - err('%s\n' % e) - retcode = 1 - except BuildLocked, e: - locked[t] = 1 - while locked: - for l in locked.keys(): - lockname = sname('lock', t) - stampname = sname('stamp', t) - if not os.path.exists(lockname): - relp = relpath(t, vars.STARTDIR) - log('%s (...unlocked!)\n' % relp) - if not os.path.exists(stampname): - err('%s: failed in another thread\n' % relp) - retcode = 2 - del locked[l] - time.sleep(0.2) + j = atoi(opt.jobs or 1) + if j < 1 or j > 1000: + err('invalid --jobs value: %r\n' % opt.jobs) + jwack.setup(j) + try: + retcode = main() + finally: + jwack.force_return_tokens() + if retcode: + err('exiting: %d\n' % retcode) sys.exit(retcode) except KeyboardInterrupt: sys.exit(200)