Some renaming and comments to try to clarify builder and jobserver.

The code is still a bit spaghetti-like, especially when it comes to
redo-unlocked, but at least the new names are slightly more
comprehensible.
This commit is contained in:
Avery Pennarun 2018-12-11 02:57:29 +00:00
commit 2b4fe812e2
4 changed files with 189 additions and 70 deletions

View file

@ -1,3 +1,4 @@
"""Code for parallel-building a set of targets, if needed."""
import sys, os, errno, stat, signal, time import sys, os, errno, stat, signal, time
from . import cycles, env, jobserver, logs, state, paths from . import cycles, env, jobserver, logs, state, paths
from .helpers import unlink, close_on_exec from .helpers import unlink, close_on_exec
@ -29,6 +30,13 @@ def close_stdin():
def start_stdin_log_reader(status, details, pretty, color, def start_stdin_log_reader(status, details, pretty, color,
debug_locks, debug_pids): debug_locks, debug_pids):
"""Redirect stderr to a redo-log instance with the given options.
Then we automatically run logs.setup() to send the right data format
to that redo-log instance.
After this, be sure to run await_log_reader() before exiting.
"""
global log_reader_pid global log_reader_pid
r, w = os.pipe() # main pipe to redo-log r, w = os.pipe() # main pipe to redo-log
ar, aw = os.pipe() # ack pipe from redo-log --ack-fd ar, aw = os.pipe() # ack pipe from redo-log --ack-fd
@ -87,6 +95,7 @@ def start_stdin_log_reader(status, details, pretty, color,
def await_log_reader(): def await_log_reader():
"""Await the redo-log instance we redirected stderr to, if any."""
if not env.v.LOG: if not env.v.LOG:
return return
if log_reader_pid > 0: if log_reader_pid > 0:
@ -107,7 +116,7 @@ class ImmediateReturn(Exception):
self.rv = rv self.rv = rv
class BuildJob(object): class _BuildJob(object):
def __init__(self, t, sf, lock, shouldbuildfunc, donefunc): def __init__(self, t, sf, lock, shouldbuildfunc, donefunc):
self.t = t # original target name, not relative to env.v.BASE self.t = t # original target name, not relative to env.v.BASE
self.sf = sf self.sf = sf
@ -123,7 +132,11 @@ class BuildJob(object):
self.donefunc = donefunc self.donefunc = donefunc
self.before_t = _try_stat(self.t) self.before_t = _try_stat(self.t)
# attributes of the running process
self.f = None
def start(self): def start(self):
"""Actually start running this job in a subproc, if needed."""
assert self.lock.owned assert self.lock.owned
try: try:
try: try:
@ -135,16 +148,17 @@ class BuildJob(object):
# target doesn't need to be built; skip the whole task # target doesn't need to be built; skip the whole task
if is_target: if is_target:
meta('unchanged', state.target_relpath(self.t)) meta('unchanged', state.target_relpath(self.t))
return self._after2(0) return self._finalize(0)
except ImmediateReturn, e: except ImmediateReturn, e:
return self._after2(e.rv) return self._finalize(e.rv)
if env.v.NO_OOB or dirty == True: # pylint: disable=singleton-comparison if env.v.NO_OOB or dirty == True: # pylint: disable=singleton-comparison
self._start_do() self._start_self()
else: else:
self._start_unlocked(dirty) self._start_deps_unlocked(dirty)
def _start_do(self): def _start_self(self):
"""Run jobserver.start() to build this object's target file."""
assert self.lock.owned assert self.lock.owned
t = self.t t = self.t
sf = self.sf sf = self.sf
@ -159,7 +173,7 @@ class BuildJob(object):
sf.set_override() sf.set_override()
sf.set_checked() sf.set_checked()
sf.save() sf.save()
return self._after2(0) return self._finalize(0)
if (os.path.exists(t) and not os.path.isdir(t + '/.') if (os.path.exists(t) and not os.path.isdir(t + '/.')
and not sf.is_generated): and not sf.is_generated):
# an existing source file that was not generated by us. # an existing source file that was not generated by us.
@ -170,22 +184,21 @@ class BuildJob(object):
debug2("-- static (%r)\n" % t) debug2("-- static (%r)\n" % t)
sf.set_static() sf.set_static()
sf.save() sf.save()
return self._after2(0) return self._finalize(0)
sf.zap_deps1() sf.zap_deps1()
(dodir, dofile, _, basename, ext) = paths.find_do_file(sf) (dodir, dofile, _, basename, ext) = paths.find_do_file(sf)
if not dofile: if not dofile:
if os.path.exists(t): if os.path.exists(t):
sf.set_static() sf.set_static()
sf.save() sf.save()
return self._after2(0) return self._finalize(0)
else: else:
err('no rule to redo %r\n' % t) err('no rule to redo %r\n' % t)
return self._after2(1) return self._finalize(1)
unlink(self.tmpname1) unlink(self.tmpname1)
unlink(self.tmpname2) unlink(self.tmpname2)
ffd = os.open(self.tmpname1, os.O_CREAT|os.O_RDWR|os.O_EXCL, 0666) ffd = os.open(self.tmpname1, os.O_CREAT|os.O_RDWR|os.O_EXCL, 0666)
close_on_exec(ffd, True) close_on_exec(ffd, True)
# pylint: disable=attribute-defined-outside-init
self.f = os.fdopen(ffd, 'w+') self.f = os.fdopen(ffd, 'w+')
# this will run in the dofile's directory, so use only basenames here # this will run in the dofile's directory, so use only basenames here
arg1 = basename + ext # target name (including extension) arg1 = basename + ext # target name (including extension)
@ -208,31 +221,36 @@ class BuildJob(object):
# that way redo-log won't trace into an obsolete logfile. # that way redo-log won't trace into an obsolete logfile.
if env.v.LOG: if env.v.LOG:
open(state.logname(self.sf.id), 'w') open(state.logname(self.sf.id), 'w')
# FIXME: put these variables somewhere else, instead of on-the-fly
# extending this class!
# pylint: disable=attribute-defined-outside-init
self.dodir = dodir
self.basename = basename
self.ext = ext
self.argv = argv
dof = state.File(name=os.path.join(dodir, dofile)) dof = state.File(name=os.path.join(dodir, dofile))
dof.set_static() dof.set_static()
dof.save() dof.save()
state.commit() state.commit()
meta('do', state.target_relpath(t)) meta('do', state.target_relpath(t))
jobserver.start_job(t, self._do_subproc, self._after) def call_subproc():
self._subproc(dodir, basename, ext, argv)
def call_exited(t, rv):
self._subproc_exited(t, rv, argv)
jobserver.start(t, call_subproc, call_exited)
def _start_unlocked(self, dirty): def _start_deps_unlocked(self, dirty):
# out-of-band redo of some sub-objects. This happens when we're not """Run jobserver.start to build objects needed to check deps.
# quite sure if t needs to be built or not (because some children
# look dirty, but might turn out to be clean thanks to checksums). Out-of-band redo of some sub-objects. This happens when we're not
# We have to call redo-unlocked to figure it all out. quite sure if t needs to be built or not (because some children
# look dirty, but might turn out to be clean thanks to redo-stamp
# Note: redo-unlocked will handle all the updating of sf, so we checksums). We have to call redo-unlocked to figure it all out.
# don't have to do it here, nor call _after1. However, we have to
# hold onto the lock because otherwise we would introduce a race Note: redo-unlocked will handle all the updating of sf, so we don't
# condition; that's why it's called redo-unlocked, because it doesn't have to do it here, nor call _record_new_state. However, we have to
# grab a lock. hold onto the lock because otherwise we would introduce a race
condition; that's why it's called redo-unlocked, because it doesn't
grab a lock.
"""
# FIXME: redo-unlocked is kind of a weird hack.
# Maybe we should just start jobs to build the necessary deps
# directly from this process, and when done, reconsider building
# the target we started with. But that makes this one process's
# build recursive, where currently it's flat.
here = os.getcwd() here = os.getcwd()
def _fix(p): def _fix(p):
return state.relpath(os.path.join(env.v.BASE, p), here) return state.relpath(os.path.join(env.v.BASE, p), here)
@ -240,32 +258,35 @@ class BuildJob(object):
list(set(_fix(d.name) for d in dirty))) list(set(_fix(d.name) for d in dirty)))
meta('check', state.target_relpath(self.t)) meta('check', state.target_relpath(self.t))
state.commit() state.commit()
def run(): def subtask():
os.environ['REDO_DEPTH'] = env.v.DEPTH + ' ' os.environ['REDO_DEPTH'] = env.v.DEPTH + ' '
# python ignores SIGPIPE # python ignores SIGPIPE
signal.signal(signal.SIGPIPE, signal.SIG_DFL) signal.signal(signal.SIGPIPE, signal.SIG_DFL)
os.execvp(argv[0], argv) os.execvp(argv[0], argv)
assert 0 assert 0
# returns only if there's an exception # returns only if there's an exception
def after(t, rv): def job_exited(t, rv):
return self._after2(rv) return self._finalize(rv)
jobserver.start_job(self.t, run, after) jobserver.start(self.t, jobfunc=subtask, donefunc=job_exited)
def _do_subproc(self): def _subproc(self, dodir, basename, ext, argv):
"""The function by jobserver.start to exec the build script.
This is run in the *child* process.
"""
# careful: REDO_PWD was the PWD relative to the STARTPATH at the time # careful: REDO_PWD was the PWD relative to the STARTPATH at the time
# we *started* building the current target; but that target ran # we *started* building the current target; but that target ran
# redo-ifchange, and it might have done it from a different directory # redo-ifchange, and it might have done it from a different directory
# than we started it in. So os.getcwd() might be != REDO_PWD right # than we started it in. So os.getcwd() might be != REDO_PWD right
# now. # now.
assert state.is_flushed() assert state.is_flushed()
dn = self.dodir newp = os.path.realpath(dodir)
newp = os.path.realpath(dn)
os.environ['REDO_PWD'] = state.relpath(newp, env.v.STARTDIR) os.environ['REDO_PWD'] = state.relpath(newp, env.v.STARTDIR)
os.environ['REDO_TARGET'] = self.basename + self.ext os.environ['REDO_TARGET'] = basename + ext
os.environ['REDO_DEPTH'] = env.v.DEPTH + ' ' os.environ['REDO_DEPTH'] = env.v.DEPTH + ' '
cycles.add(self.lock.fid) cycles.add(self.lock.fid)
if dn: if dodir:
os.chdir(dn) os.chdir(dodir)
os.dup2(self.f.fileno(), 1) os.dup2(self.f.fileno(), 1)
os.close(self.f.fileno()) os.close(self.f.fileno())
close_on_exec(1, False) close_on_exec(1, False)
@ -290,23 +311,35 @@ class BuildJob(object):
os.environ['REDO_LOG'] = '' os.environ['REDO_LOG'] = ''
signal.signal(signal.SIGPIPE, signal.SIG_DFL) # python ignores SIGPIPE signal.signal(signal.SIGPIPE, signal.SIG_DFL) # python ignores SIGPIPE
if env.v.VERBOSE or env.v.XTRACE: if env.v.VERBOSE or env.v.XTRACE:
logs.write('* %s' % ' '.join(self.argv).replace('\n', ' ')) logs.write('* %s' % ' '.join(argv).replace('\n', ' '))
os.execvp(self.argv[0], self.argv) os.execvp(argv[0], argv)
# FIXME: it would be nice to log the exit code to logf. # FIXME: it would be nice to log the exit code to logf.
# But that would have to happen in the parent process, which doesn't # But that would have to happen in the parent process, which doesn't
# have logf open. # have logf open.
assert 0 assert 0
# returns only if there's an exception # returns only if there's an exception
def _after(self, t, rv): def _subproc_exited(self, t, rv, argv):
"""Called by the jobserver when our subtask exits.
This is run in the *parent* process.
"""
try: try:
state.check_sane() state.check_sane()
rv = self._after1(t, rv) rv = self._record_new_state(t, rv, argv)
state.commit() state.commit()
finally: finally:
self._after2(rv) self._finalize(rv)
def _after1(self, t, rv): def _record_new_state(self, t, rv, argv):
"""After a subtask finishes, handle its changes to the output file.
This is run in the *parent* process.
This includes renaming temp files into place and detecting mistakes
(like writing directly to $1 instead of $3). We also have to record
the new file stamp data for the completed target.
"""
f = self.f f = self.f
before_t = self.before_t before_t = self.before_t
after_t = _try_stat(t) after_t = _try_stat(t)
@ -315,11 +348,11 @@ class BuildJob(object):
if (after_t and if (after_t and
(not before_t or before_t.st_mtime != after_t.st_mtime) and (not before_t or before_t.st_mtime != after_t.st_mtime) and
not stat.S_ISDIR(after_t.st_mode)): not stat.S_ISDIR(after_t.st_mode)):
err('%s modified %s directly!\n' % (self.argv[2], t)) err('%s modified %s directly!\n' % (argv[2], t))
err('...you should update $3 (a temp file) or stdout, not $1.\n') err('...you should update $3 (a temp file) or stdout, not $1.\n')
rv = 206 rv = 206
elif st2 and st1.st_size > 0: elif st2 and st1.st_size > 0:
err('%s wrote to stdout *and* created $3.\n' % self.argv[2]) err('%s wrote to stdout *and* created $3.\n' % argv[2])
err('...you should write status messages to stderr, not stdout.\n') err('...you should write status messages to stderr, not stdout.\n')
rv = 207 rv = 207
if rv == 0: if rv == 0:
@ -345,7 +378,7 @@ class BuildJob(object):
unlink(t) unlink(t)
else: else:
err('%s: can\'t save stdout to %r: %s\n' % err('%s: can\'t save stdout to %r: %s\n' %
(self.argv[2], t, e.strerror)) (argv[2], t, e.strerror))
rv = 1000 rv = 1000
if st2: if st2:
os.unlink(self.tmpname2) os.unlink(self.tmpname2)
@ -376,7 +409,12 @@ class BuildJob(object):
meta('done', '%d %s' % (rv, state.target_relpath(self.t))) meta('done', '%d %s' % (rv, state.target_relpath(self.t)))
return rv return rv
def _after2(self, rv): def _finalize(self, rv):
"""After a target is built, report completion and unlock.
This is run in the *parent* process.
Note: you usually need to call _record_new_state() first.
"""
try: try:
self.donefunc(self.t, rv) self.donefunc(self.t, rv)
assert self.lock.owned assert self.lock.owned
@ -384,7 +422,19 @@ class BuildJob(object):
self.lock.unlock() self.lock.unlock()
def main(targets, shouldbuildfunc): def run(targets, shouldbuildfunc):
"""Build the given list of targets, if necessary.
Builds in parallel using whatever jobserver tokens can be obtained.
Args:
targets: a list of target filenames to consider building.
shouldbuildfunc: a function(target) which determines whether the given
target needs to be built, as of the time it is called.
Returns:
0 if all necessary targets returned exit code zero; nonzero otherwise.
"""
retcode = [0] # a list so that it can be reassigned from done() retcode = [0] # a list so that it can be reassigned from done()
if env.v.SHUFFLE: if env.v.SHUFFLE:
import random import random
@ -392,7 +442,7 @@ def main(targets, shouldbuildfunc):
locked = [] locked = []
def done(t, rv): def job_exited(t, rv):
if rv: if rv:
retcode[0] = 1 retcode[0] = 1
@ -456,7 +506,9 @@ def main(targets, shouldbuildfunc):
# FIXME: separate obtaining the fid from creating the File. # FIXME: separate obtaining the fid from creating the File.
# FIXME: maybe integrate locking into the File object? # FIXME: maybe integrate locking into the File object?
f.refresh() f.refresh()
BuildJob(t, f, lock, shouldbuildfunc, done).start() _BuildJob(t, f, lock,
shouldbuildfunc=shouldbuildfunc,
donefunc=job_exited).start()
state.commit() state.commit()
assert state.is_flushed() assert state.is_flushed()
lock = None lock = None
@ -520,8 +572,9 @@ def main(targets, shouldbuildfunc):
retcode[0] = 2 retcode[0] = 2
lock.unlock() lock.unlock()
else: else:
BuildJob(t, state.File(fid=fid), lock, _BuildJob(t, state.File(fid=fid), lock,
shouldbuildfunc, done).start() shouldbuildfunc=shouldbuildfunc,
donefunc=job_exited).start()
lock = None lock = None
state.commit() state.commit()
return retcode[0] return retcode[0]

View file

@ -42,7 +42,7 @@ def main():
f.add_dep('m', t) f.add_dep('m', t)
f.save() f.save()
state.commit() state.commit()
rv = builder.main(targets, should_build) rv = builder.run(targets, should_build)
finally: finally:
try: try:
state.rollback() state.rollback()

View file

@ -102,7 +102,7 @@ def main():
jobserver.setup(j) jobserver.setup(j)
try: try:
assert state.is_flushed() assert state.is_flushed()
retcode = builder.main(targets, lambda t: (True, True)) retcode = builder.run(targets, lambda t: (True, True))
assert state.is_flushed() assert state.is_flushed()
finally: finally:
try: try:

View file

@ -92,6 +92,11 @@ def _debug(s):
def _create_tokens(n): def _create_tokens(n):
"""Materialize and own n tokens.
If there are any cheater tokens active, they each destroy one matching
newly-created token.
"""
global _mytokens, _cheats global _mytokens, _cheats
assert n >= 0 assert n >= 0
assert _cheats >= 0 assert _cheats >= 0
@ -103,6 +108,7 @@ def _create_tokens(n):
def _destroy_tokens(n): def _destroy_tokens(n):
"""Destroy n tokens that are currently in our posession."""
global _mytokens global _mytokens
assert _mytokens >= n assert _mytokens >= n
_mytokens -= n _mytokens -= n
@ -253,6 +259,19 @@ def setup(maxjobs):
def _wait(want_token, max_delay): def _wait(want_token, max_delay):
"""Wait for a subproc to die or, if want_token, tokenfd to be readable.
Does not actually read tokenfd once it's readable (so someone else might
read it before us). This function returns after max_delay seconds or
when at least one subproc dies or (if want_token) for tokenfd to be
readable.
Args:
want_token: true if we should return when tokenfd is readable.
max_delay: max seconds to wait, or None to wait forever.
Returns:
None
"""
rfds = _waitfds.keys() rfds = _waitfds.keys()
if want_token: if want_token:
rfds.append(_tokenfds[0]) rfds.append(_tokenfds[0])
@ -295,12 +314,23 @@ def _wait(want_token, max_delay):
def has_token(): def has_token():
"""Returns true if this process has a job token."""
assert _mytokens >= 0 assert _mytokens >= 0
if _mytokens >= 1: if _mytokens >= 1:
return True return True
def ensure_token(reason, max_delay=None): def _ensure_token(reason, max_delay=None):
"""Don't return until this process has a job token.
Args:
reason: the reason (for debugging purposes) we need a token. Usually
the name of a target we want to build.
max_delay: the max time to wait for a token, or None if forever.
Returns:
None, but has_token() is now true *unless* max_delay is non-None
and we timed out.
"""
global _mytokens global _mytokens
assert state.is_flushed() assert state.is_flushed()
assert _mytokens <= 1 assert _mytokens <= 1
@ -330,6 +360,23 @@ def ensure_token(reason, max_delay=None):
def ensure_token_or_cheat(reason, cheatfunc): def ensure_token_or_cheat(reason, cheatfunc):
"""Wait for a job token to become available, or cheat if possible.
If we already have a token, we return immediately. If we have any
processes running *and* we don't own any tokens, we wait for a process
to finish, and then use that token.
Otherwise, we're allowed to cheat. We call cheatfunc() occasionally
to consider cheating; if it returns n > 0, we materialize that many
cheater tokens and return.
Args:
reason: the reason (for debugging purposes) we need a token. Usually
the name of a target we want to build.
cheatfunc: a function which returns n > 0 (usually 1) if we should
cheat right now, because we're the "foreground" process that must
be allowed to continue.
"""
global _mytokens, _cheats global _mytokens, _cheats
backoff = 0.01 backoff = 0.01
while not has_token(): while not has_token():
@ -337,8 +384,8 @@ def ensure_token_or_cheat(reason, cheatfunc):
# If we already have a subproc running, then effectively we # If we already have a subproc running, then effectively we
# already have a token. Don't create a cheater token unless # already have a token. Don't create a cheater token unless
# we're completely idle. # we're completely idle.
ensure_token(reason, max_delay=None) _ensure_token(reason, max_delay=None)
ensure_token(reason, max_delay=min(1.0, backoff)) _ensure_token(reason, max_delay=min(1.0, backoff))
backoff *= 2 backoff *= 2
if not has_token(): if not has_token():
assert _mytokens == 0 assert _mytokens == 0
@ -351,10 +398,12 @@ def ensure_token_or_cheat(reason, cheatfunc):
def running(): def running():
"""Returns true if we have any running jobs."""
return len(_waitfds) return len(_waitfds)
def wait_all(): def wait_all():
"""Wait for all running jobs to finish."""
_debug("%d,%d -> wait_all\n" % (_mytokens, _cheats)) _debug("%d,%d -> wait_all\n" % (_mytokens, _cheats))
assert state.is_flushed() assert state.is_flushed()
while 1: while 1:
@ -382,6 +431,7 @@ def wait_all():
def force_return_tokens(): def force_return_tokens():
"""Release or destroy all the tokens we own, in preparation for exit."""
n = len(_waitfds) n = len(_waitfds)
_debug('%d,%d -> %d jobs left in force_return_tokens\n' _debug('%d,%d -> %d jobs left in force_return_tokens\n'
% (_mytokens, _cheats, n)) % (_mytokens, _cheats, n))
@ -401,14 +451,18 @@ def force_return_tokens():
assert state.is_flushed() assert state.is_flushed()
def _pre_job(r, w, pfn):
os.close(r)
if pfn:
pfn()
class Job(object): class Job(object):
"""Metadata about a running job."""
def __init__(self, name, pid, donefunc): def __init__(self, name, pid, donefunc):
"""Initialize a job.
Args:
name: the name of the job (usually a target filename).
pid: the pid of the subprocess running this job.
donefunc: the function(name, return_value) to call when the
subprocess exits.
"""
self.name = name self.name = name
self.pid = pid self.pid = pid
self.rv = None self.rv = None
@ -418,7 +472,19 @@ class Job(object):
return 'Job(%s,%d)' % (self.name, self.pid) return 'Job(%s,%d)' % (self.name, self.pid)
def start_job(reason, jobfunc, donefunc): def start(reason, jobfunc, donefunc):
"""Start a new job. Must only be run with has_token() true.
Args:
reason: the reason the job is running. Usually the filename of a
target being built.
jobfunc: the function() to execute, **in a subprocess**, to run the job.
Usually this calls os.execve(). It is an error for this function to
ever return. If it does, we return a non-zero exit code to the parent
process (the one which ran start()).
donefunc: the function(reason, return_value) to call **in the parent**
when the subprocess exits.
"""
assert state.is_flushed() assert state.is_flushed()
assert _mytokens <= 1 assert _mytokens <= 1
assert _mytokens == 1 assert _mytokens == 1
@ -429,12 +495,12 @@ def start_job(reason, jobfunc, donefunc):
pid = os.fork() pid = os.fork()
if pid == 0: if pid == 0:
# child # child
os.close(r)
rv = 201 rv = 201
try: try:
os.close(r)
try: try:
rv = jobfunc() or 0 rv = jobfunc()
_debug('jobfunc completed (%r, %r)\n' % (jobfunc, rv)) assert 0, 'jobfunc returned?! (%r, %r)' % (jobfunc, rv)
except Exception: # pylint: disable=broad-except except Exception: # pylint: disable=broad-except
import traceback import traceback
traceback.print_exc() traceback.print_exc()