From 362ca2997ae114ff709ce19c442085ade1a8c0e7 Mon Sep 17 00:00:00 2001 From: Avery Pennarun Date: Fri, 19 Nov 2010 06:04:45 -0800 Subject: [PATCH] A whole bunch of cleanups to state.Lock. Now t/curse passes again when parallelized (except for the countall mismatch, since we haven't fixed the source of that problem yet). At least it's consistent now. There's a bunch of stuff rearranged in here, but the actual important problem was that we were doing unlink() on the lock fifo even if ENXIO, which meant a reader could connect in between ENXIO and unlink(), and thus never get notified of the disconnection. This would cause the build to randomly freeze. --- clean.do | 2 +- helpers.py | 1 + jwack.py | 21 +++++++++++++------ redo.py | 61 ++++++++++++++++++++++++++++++------------------------ state.py | 22 ++++++++++++-------- t/clean.do | 1 + 6 files changed, 65 insertions(+), 43 deletions(-) diff --git a/clean.do b/clean.do index 4f59048..e3835a6 100644 --- a/clean.do +++ b/clean.do @@ -1,3 +1,3 @@ redo t/clean -rm -f t/hello t/[by]ellow t/*.o *~ .*~ */*~ */.*~ *.pyc t/CC t/LD +rm -f *~ .*~ */*~ */.*~ *.pyc rm -rf .redo t/.redo diff --git a/helpers.py b/helpers.py index 87f3a7c..1310aae 100644 --- a/helpers.py +++ b/helpers.py @@ -36,6 +36,7 @@ def mkdirp(d, mode=None): def log_(s): sys.stdout.flush() sys.stderr.write(s) + #sys.stderr.write('%d %s' % (os.getpid(), s)) sys.stderr.flush() diff --git a/jwack.py b/jwack.py index 893d00e..f8d5a4f 100644 --- a/jwack.py +++ b/jwack.py @@ -81,6 +81,7 @@ def wait(want_token): rfds = _waitfds.keys() if _fds and want_token: rfds.append(_fds[0]) + assert(rfds) r,w,x = select.select(rfds, [], []) _debug('_fds=%r; wfds=%r; readable: %r\n' % (_fds, _waitfds, r)) for fd in r: @@ -99,6 +100,7 @@ def wait(want_token): pd.rv = os.WEXITSTATUS(rv) else: pd.rv = -os.WTERMSIG(rv) + pd.donefunc(pd.name, pd.rv) def get_token(reason): @@ -119,9 +121,13 @@ def get_token(reason): _debug('(%r) got a token (%r).\n' % (reason, b)) +def running(): + return len(_waitfds) + + def wait_all(): _debug("wait_all\n") - while _waitfds: + while running(): _debug("wait_all: wait()\n") wait(want_token=0) _debug("wait_all: empty list\n") @@ -134,7 +140,7 @@ def wait_all(): if len(bb) != _toplevel-1: raise Exception('on exit: expected %d tokens; found only %d' % (_toplevel-1, len(b))) - _debug("wait_all: done\n") + os.write(_fds[1], bb) def force_return_tokens(): @@ -155,13 +161,17 @@ def _pre_job(r, w, pfn): class Job: - def __init__(self, name, pid): + def __init__(self, name, pid, donefunc): self.name = name self.pid = pid self.rv = None + self.donefunc = donefunc + + def __repr__(self): + return 'Job(%s,%d)' % (self.name, self.pid) -def start_job(reason, jobfunc): +def start_job(reason, jobfunc, donefunc): setup(1) get_token(reason) r,w = os.pipe() @@ -180,6 +190,5 @@ def start_job(reason, jobfunc): os._exit(201) # else we're the parent process os.close(w) - pd = Job(reason, pid) + pd = Job(reason, pid, donefunc) _waitfds[r] = pd - return pd diff --git a/redo.py b/redo.py index 2092a99..517d52e 100755 --- a/redo.py +++ b/redo.py @@ -48,14 +48,12 @@ if is_root: # deliberately starts more than one redo on the same repository, it's # sort of ok. mkdirp('%s/.redo' % base) - for f in glob.glob('%s/.redo/lock^*' % base): + for f in glob.glob('%s/.redo/lock*' % base): os.unlink(f) class BuildError(Exception): pass -class BuildLocked(Exception): - pass def _possible_do_files(t): @@ -100,7 +98,6 @@ def _build(t): # which is undesirable since hello.c existed already. state.stamp(t) return # success - state.unstamp(t) state.start(t) (dofile, basename, ext) = find_do_file(t) if not dofile: @@ -145,7 +142,7 @@ def _build(t): def build(t): lock = state.Lock(t) - lock.lock() + lock.trylock() if not lock.owned: log('%s (locked...)\n' % relpath(t, vars.STARTDIR)) os._exit(199) @@ -160,34 +157,44 @@ def build(t): def main(): - retcode = 0 - locked = {} - waits = {} + retcode = [0] # a list so that it can be reassigned from done() if vars.SHUFFLE: random.shuffle(targets) + + locked = [] + + def done(t, rv): + if rv == 199: + locked.append(t) + elif rv: + err('%s: exit code was %r\n' % (t, rv)) + retcode[0] = 1 + for t in targets: if os.path.exists('%s/all.do' % t): # t is a directory, but it has a default target t = '%s/all' % t - waits[t] = jwack.start_job(t, lambda: build(t)) - jwack.wait_all() - for t,pd in waits.items(): - assert(pd.rv != None) - if pd.rv == 199: - # target was locked - locked[t] = 1 - elif pd.rv: - err('%s: exit code was %r\n' % (t, pd.rv)) - retcode = 1 - for t in locked.keys(): - lock = state.Lock(t) - lock.wait() - relp = relpath(t, vars.STARTDIR) - log('%s (...unlocked!)\n' % relp) - if state.stamped(t) == None: - err('%s: failed in another thread\n' % relp) - retcode = 2 - return retcode + tt = t + jwack.start_job(t, lambda: build(t), lambda t,rv: done(t,rv)) + while locked or jwack.running(): + jwack.wait_all() + if locked: + t = locked.pop(0) + l = state.Lock(t) + while not l.owned: + l.wait() + l.trylock() + assert(l.owned) + relp = relpath(t, vars.STARTDIR) + log('%s (...unlocked!)\n' % relp) + if state.stamped(t) == None: + err('%s: failed in another thread\n' % relp) + retcode[0] = 2 + l.unlock() # build() reacquires it + jwack.start_job(t, lambda: build(t), lambda t,rv: done(t,rv)) + else: + l.unlock() + return retcode[0] if not vars.DEPTH: diff --git a/state.py b/state.py index 5ab86b4..2857c36 100644 --- a/state.py +++ b/state.py @@ -70,6 +70,7 @@ def is_generated(t): def start(t): + unstamp(t) open(_sname('dep', t), 'w').close() open(_sname('gen', t), 'w').close() # it's definitely a generated file @@ -77,13 +78,14 @@ def start(t): class Lock: def __init__(self, t): self.lockname = _sname('lock', t) + self.tmpname = _sname('lock%d' % os.getpid(), t) self.owned = False def __del__(self): if self.owned: self.unlock() - def lock(self): + def trylock(self): try: os.mkfifo(self.lockname, 0600) self.owned = True @@ -97,18 +99,22 @@ class Lock: if not self.owned: raise Exception("can't unlock %r - we don't own it" % self.lockname) - fd = None + # make sure no readers can connect try: - fd = os.open(self.lockname, os.O_WRONLY|os.O_NONBLOCK) + os.rename(self.lockname, self.tmpname) + except OSError, e: + if e.errno == errno.ENOENT: # 'make clean' might do this + self.owned = False + return + try: + # ping any connected readers + os.close(os.open(self.tmpname, os.O_WRONLY|os.O_NONBLOCK)) except OSError, e: if e.errno == errno.ENXIO: # no readers open; that's ok pass - elif e.errno == errno.ENOENT: # 'make clean' might do this - pass else: raise - unlink(self.lockname) # make sure no new readers can connect - if fd != None: os.close(fd) # now unlock any existing readers + os.unlink(self.tmpname) self.owned = False def wait(self): @@ -117,10 +123,8 @@ class Lock: try: # open() will finish only when a writer exists and does close() os.close(os.open(self.lockname, os.O_RDONLY)) - #sys.stderr.write('lock %r waited ok\n' % self.lockname) except OSError, e: if e.errno == errno.ENOENT: - #sys.stderr.write('lock %r missing\n' % self.lockname) pass # it's not even unlocked or was unlocked earlier else: raise diff --git a/t/clean.do b/t/clean.do index 68eadb5..1fde6f4 100644 --- a/t/clean.do +++ b/t/clean.do @@ -1,2 +1,3 @@ redo example/clean curse/clean rm -f c c.c c.c.c c.c.c.b c.c.c.b.b d +rm -f hello [by]ellow *.o *~ .*~ CC LD