A whole bunch of cleanups to state.Lock.
Now t/curse passes again when parallelized (except for the countall mismatch, since we haven't fixed the source of that problem yet). At least it's consistent now. There's a bunch of stuff rearranged in here, but the actual important problem was that unlock() was doing unlink() on the lock fifo even when its non-blocking open() for writing had failed with ENXIO (no readers connected), which meant a reader could connect in between the ENXIO and the unlink(), and thus never get notified of the disconnection. This would cause the build to randomly freeze.
This commit is contained in:
parent
132ff02840
commit
362ca2997a
6 changed files with 65 additions and 43 deletions
2
clean.do
2
clean.do
|
|
@ -1,3 +1,3 @@
|
||||||
redo t/clean
|
redo t/clean
|
||||||
rm -f t/hello t/[by]ellow t/*.o *~ .*~ */*~ */.*~ *.pyc t/CC t/LD
|
rm -f *~ .*~ */*~ */.*~ *.pyc
|
||||||
rm -rf .redo t/.redo
|
rm -rf .redo t/.redo
|
||||||
|
|
|
||||||
|
|
@ -36,6 +36,7 @@ def mkdirp(d, mode=None):
|
||||||
def log_(s):
|
def log_(s):
|
||||||
sys.stdout.flush()
|
sys.stdout.flush()
|
||||||
sys.stderr.write(s)
|
sys.stderr.write(s)
|
||||||
|
#sys.stderr.write('%d %s' % (os.getpid(), s))
|
||||||
sys.stderr.flush()
|
sys.stderr.flush()
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
21
jwack.py
21
jwack.py
|
|
@ -81,6 +81,7 @@ def wait(want_token):
|
||||||
rfds = _waitfds.keys()
|
rfds = _waitfds.keys()
|
||||||
if _fds and want_token:
|
if _fds and want_token:
|
||||||
rfds.append(_fds[0])
|
rfds.append(_fds[0])
|
||||||
|
assert(rfds)
|
||||||
r,w,x = select.select(rfds, [], [])
|
r,w,x = select.select(rfds, [], [])
|
||||||
_debug('_fds=%r; wfds=%r; readable: %r\n' % (_fds, _waitfds, r))
|
_debug('_fds=%r; wfds=%r; readable: %r\n' % (_fds, _waitfds, r))
|
||||||
for fd in r:
|
for fd in r:
|
||||||
|
|
@ -99,6 +100,7 @@ def wait(want_token):
|
||||||
pd.rv = os.WEXITSTATUS(rv)
|
pd.rv = os.WEXITSTATUS(rv)
|
||||||
else:
|
else:
|
||||||
pd.rv = -os.WTERMSIG(rv)
|
pd.rv = -os.WTERMSIG(rv)
|
||||||
|
pd.donefunc(pd.name, pd.rv)
|
||||||
|
|
||||||
|
|
||||||
def get_token(reason):
|
def get_token(reason):
|
||||||
|
|
@ -119,9 +121,13 @@ def get_token(reason):
|
||||||
_debug('(%r) got a token (%r).\n' % (reason, b))
|
_debug('(%r) got a token (%r).\n' % (reason, b))
|
||||||
|
|
||||||
|
|
||||||
|
def running():
|
||||||
|
return len(_waitfds)
|
||||||
|
|
||||||
|
|
||||||
def wait_all():
|
def wait_all():
|
||||||
_debug("wait_all\n")
|
_debug("wait_all\n")
|
||||||
while _waitfds:
|
while running():
|
||||||
_debug("wait_all: wait()\n")
|
_debug("wait_all: wait()\n")
|
||||||
wait(want_token=0)
|
wait(want_token=0)
|
||||||
_debug("wait_all: empty list\n")
|
_debug("wait_all: empty list\n")
|
||||||
|
|
@ -134,7 +140,7 @@ def wait_all():
|
||||||
if len(bb) != _toplevel-1:
|
if len(bb) != _toplevel-1:
|
||||||
raise Exception('on exit: expected %d tokens; found only %d'
|
raise Exception('on exit: expected %d tokens; found only %d'
|
||||||
% (_toplevel-1, len(b)))
|
% (_toplevel-1, len(b)))
|
||||||
_debug("wait_all: done\n")
|
os.write(_fds[1], bb)
|
||||||
|
|
||||||
|
|
||||||
def force_return_tokens():
|
def force_return_tokens():
|
||||||
|
|
@ -155,13 +161,17 @@ def _pre_job(r, w, pfn):
|
||||||
|
|
||||||
|
|
||||||
class Job:
|
class Job:
|
||||||
def __init__(self, name, pid):
|
def __init__(self, name, pid, donefunc):
|
||||||
self.name = name
|
self.name = name
|
||||||
self.pid = pid
|
self.pid = pid
|
||||||
self.rv = None
|
self.rv = None
|
||||||
|
self.donefunc = donefunc
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return 'Job(%s,%d)' % (self.name, self.pid)
|
||||||
|
|
||||||
|
|
||||||
def start_job(reason, jobfunc):
|
def start_job(reason, jobfunc, donefunc):
|
||||||
setup(1)
|
setup(1)
|
||||||
get_token(reason)
|
get_token(reason)
|
||||||
r,w = os.pipe()
|
r,w = os.pipe()
|
||||||
|
|
@ -180,6 +190,5 @@ def start_job(reason, jobfunc):
|
||||||
os._exit(201)
|
os._exit(201)
|
||||||
# else we're the parent process
|
# else we're the parent process
|
||||||
os.close(w)
|
os.close(w)
|
||||||
pd = Job(reason, pid)
|
pd = Job(reason, pid, donefunc)
|
||||||
_waitfds[r] = pd
|
_waitfds[r] = pd
|
||||||
return pd
|
|
||||||
|
|
|
||||||
61
redo.py
61
redo.py
|
|
@ -48,14 +48,12 @@ if is_root:
|
||||||
# deliberately starts more than one redo on the same repository, it's
|
# deliberately starts more than one redo on the same repository, it's
|
||||||
# sort of ok.
|
# sort of ok.
|
||||||
mkdirp('%s/.redo' % base)
|
mkdirp('%s/.redo' % base)
|
||||||
for f in glob.glob('%s/.redo/lock^*' % base):
|
for f in glob.glob('%s/.redo/lock*' % base):
|
||||||
os.unlink(f)
|
os.unlink(f)
|
||||||
|
|
||||||
|
|
||||||
class BuildError(Exception):
|
class BuildError(Exception):
|
||||||
pass
|
pass
|
||||||
class BuildLocked(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
def _possible_do_files(t):
|
def _possible_do_files(t):
|
||||||
|
|
@ -100,7 +98,6 @@ def _build(t):
|
||||||
# which is undesirable since hello.c existed already.
|
# which is undesirable since hello.c existed already.
|
||||||
state.stamp(t)
|
state.stamp(t)
|
||||||
return # success
|
return # success
|
||||||
state.unstamp(t)
|
|
||||||
state.start(t)
|
state.start(t)
|
||||||
(dofile, basename, ext) = find_do_file(t)
|
(dofile, basename, ext) = find_do_file(t)
|
||||||
if not dofile:
|
if not dofile:
|
||||||
|
|
@ -145,7 +142,7 @@ def _build(t):
|
||||||
|
|
||||||
def build(t):
|
def build(t):
|
||||||
lock = state.Lock(t)
|
lock = state.Lock(t)
|
||||||
lock.lock()
|
lock.trylock()
|
||||||
if not lock.owned:
|
if not lock.owned:
|
||||||
log('%s (locked...)\n' % relpath(t, vars.STARTDIR))
|
log('%s (locked...)\n' % relpath(t, vars.STARTDIR))
|
||||||
os._exit(199)
|
os._exit(199)
|
||||||
|
|
@ -160,34 +157,44 @@ def build(t):
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
retcode = 0
|
retcode = [0] # a list so that it can be reassigned from done()
|
||||||
locked = {}
|
|
||||||
waits = {}
|
|
||||||
if vars.SHUFFLE:
|
if vars.SHUFFLE:
|
||||||
random.shuffle(targets)
|
random.shuffle(targets)
|
||||||
|
|
||||||
|
locked = []
|
||||||
|
|
||||||
|
def done(t, rv):
|
||||||
|
if rv == 199:
|
||||||
|
locked.append(t)
|
||||||
|
elif rv:
|
||||||
|
err('%s: exit code was %r\n' % (t, rv))
|
||||||
|
retcode[0] = 1
|
||||||
|
|
||||||
for t in targets:
|
for t in targets:
|
||||||
if os.path.exists('%s/all.do' % t):
|
if os.path.exists('%s/all.do' % t):
|
||||||
# t is a directory, but it has a default target
|
# t is a directory, but it has a default target
|
||||||
t = '%s/all' % t
|
t = '%s/all' % t
|
||||||
waits[t] = jwack.start_job(t, lambda: build(t))
|
tt = t
|
||||||
jwack.wait_all()
|
jwack.start_job(t, lambda: build(t), lambda t,rv: done(t,rv))
|
||||||
for t,pd in waits.items():
|
while locked or jwack.running():
|
||||||
assert(pd.rv != None)
|
jwack.wait_all()
|
||||||
if pd.rv == 199:
|
if locked:
|
||||||
# target was locked
|
t = locked.pop(0)
|
||||||
locked[t] = 1
|
l = state.Lock(t)
|
||||||
elif pd.rv:
|
while not l.owned:
|
||||||
err('%s: exit code was %r\n' % (t, pd.rv))
|
l.wait()
|
||||||
retcode = 1
|
l.trylock()
|
||||||
for t in locked.keys():
|
assert(l.owned)
|
||||||
lock = state.Lock(t)
|
relp = relpath(t, vars.STARTDIR)
|
||||||
lock.wait()
|
log('%s (...unlocked!)\n' % relp)
|
||||||
relp = relpath(t, vars.STARTDIR)
|
if state.stamped(t) == None:
|
||||||
log('%s (...unlocked!)\n' % relp)
|
err('%s: failed in another thread\n' % relp)
|
||||||
if state.stamped(t) == None:
|
retcode[0] = 2
|
||||||
err('%s: failed in another thread\n' % relp)
|
l.unlock() # build() reacquires it
|
||||||
retcode = 2
|
jwack.start_job(t, lambda: build(t), lambda t,rv: done(t,rv))
|
||||||
return retcode
|
else:
|
||||||
|
l.unlock()
|
||||||
|
return retcode[0]
|
||||||
|
|
||||||
|
|
||||||
if not vars.DEPTH:
|
if not vars.DEPTH:
|
||||||
|
|
|
||||||
22
state.py
22
state.py
|
|
@ -70,6 +70,7 @@ def is_generated(t):
|
||||||
|
|
||||||
|
|
||||||
def start(t):
|
def start(t):
|
||||||
|
unstamp(t)
|
||||||
open(_sname('dep', t), 'w').close()
|
open(_sname('dep', t), 'w').close()
|
||||||
open(_sname('gen', t), 'w').close() # it's definitely a generated file
|
open(_sname('gen', t), 'w').close() # it's definitely a generated file
|
||||||
|
|
||||||
|
|
@ -77,13 +78,14 @@ def start(t):
|
||||||
class Lock:
|
class Lock:
|
||||||
def __init__(self, t):
|
def __init__(self, t):
|
||||||
self.lockname = _sname('lock', t)
|
self.lockname = _sname('lock', t)
|
||||||
|
self.tmpname = _sname('lock%d' % os.getpid(), t)
|
||||||
self.owned = False
|
self.owned = False
|
||||||
|
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
if self.owned:
|
if self.owned:
|
||||||
self.unlock()
|
self.unlock()
|
||||||
|
|
||||||
def lock(self):
|
def trylock(self):
|
||||||
try:
|
try:
|
||||||
os.mkfifo(self.lockname, 0600)
|
os.mkfifo(self.lockname, 0600)
|
||||||
self.owned = True
|
self.owned = True
|
||||||
|
|
@ -97,18 +99,22 @@ class Lock:
|
||||||
if not self.owned:
|
if not self.owned:
|
||||||
raise Exception("can't unlock %r - we don't own it"
|
raise Exception("can't unlock %r - we don't own it"
|
||||||
% self.lockname)
|
% self.lockname)
|
||||||
fd = None
|
# make sure no readers can connect
|
||||||
try:
|
try:
|
||||||
fd = os.open(self.lockname, os.O_WRONLY|os.O_NONBLOCK)
|
os.rename(self.lockname, self.tmpname)
|
||||||
|
except OSError, e:
|
||||||
|
if e.errno == errno.ENOENT: # 'make clean' might do this
|
||||||
|
self.owned = False
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
# ping any connected readers
|
||||||
|
os.close(os.open(self.tmpname, os.O_WRONLY|os.O_NONBLOCK))
|
||||||
except OSError, e:
|
except OSError, e:
|
||||||
if e.errno == errno.ENXIO: # no readers open; that's ok
|
if e.errno == errno.ENXIO: # no readers open; that's ok
|
||||||
pass
|
pass
|
||||||
elif e.errno == errno.ENOENT: # 'make clean' might do this
|
|
||||||
pass
|
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
unlink(self.lockname) # make sure no new readers can connect
|
os.unlink(self.tmpname)
|
||||||
if fd != None: os.close(fd) # now unlock any existing readers
|
|
||||||
self.owned = False
|
self.owned = False
|
||||||
|
|
||||||
def wait(self):
|
def wait(self):
|
||||||
|
|
@ -117,10 +123,8 @@ class Lock:
|
||||||
try:
|
try:
|
||||||
# open() will finish only when a writer exists and does close()
|
# open() will finish only when a writer exists and does close()
|
||||||
os.close(os.open(self.lockname, os.O_RDONLY))
|
os.close(os.open(self.lockname, os.O_RDONLY))
|
||||||
#sys.stderr.write('lock %r waited ok\n' % self.lockname)
|
|
||||||
except OSError, e:
|
except OSError, e:
|
||||||
if e.errno == errno.ENOENT:
|
if e.errno == errno.ENOENT:
|
||||||
#sys.stderr.write('lock %r missing\n' % self.lockname)
|
|
||||||
pass # it's not even unlocked or was unlocked earlier
|
pass # it's not even unlocked or was unlocked earlier
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
|
|
|
||||||
|
|
@ -1,2 +1,3 @@
|
||||||
redo example/clean curse/clean
|
redo example/clean curse/clean
|
||||||
rm -f c c.c c.c.c c.c.c.b c.c.c.b.b d
|
rm -f c c.c c.c.c c.c.c.b c.c.c.b.b d
|
||||||
|
rm -f hello [by]ellow *.o *~ .*~ CC LD
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue