From 2b4fe812e28b07bfecb841042d5d57340ddad803 Mon Sep 17 00:00:00 2001 From: Avery Pennarun Date: Tue, 11 Dec 2018 02:57:29 +0000 Subject: [PATCH] Some renaming and comments to try to clarify builder and jobserver. The code is still a bit spaghetti-like, especially when it comes to redo-unlocked, but at least the new names are slightly more comprehensible. --- redo/builder.py | 163 ++++++++++++++++++++++++++++--------------- redo/cmd_ifchange.py | 2 +- redo/cmd_redo.py | 2 +- redo/jobserver.py | 92 ++++++++++++++++++++---- 4 files changed, 189 insertions(+), 70 deletions(-) diff --git a/redo/builder.py b/redo/builder.py index ac2a47c..2499f4c 100644 --- a/redo/builder.py +++ b/redo/builder.py @@ -1,3 +1,4 @@ +"""Code for parallel-building a set of targets, if needed.""" import sys, os, errno, stat, signal, time from . import cycles, env, jobserver, logs, state, paths from .helpers import unlink, close_on_exec @@ -29,6 +30,13 @@ def close_stdin(): def start_stdin_log_reader(status, details, pretty, color, debug_locks, debug_pids): + """Redirect stderr to a redo-log instance with the given options. + + Then we automatically run logs.setup() to send the right data format + to that redo-log instance. + + After this, be sure to run await_log_reader() before exiting. 
+ """ global log_reader_pid r, w = os.pipe() # main pipe to redo-log ar, aw = os.pipe() # ack pipe from redo-log --ack-fd @@ -87,6 +95,7 @@ def start_stdin_log_reader(status, details, pretty, color, def await_log_reader(): + """Await the redo-log instance we redirected stderr to, if any.""" if not env.v.LOG: return if log_reader_pid > 0: @@ -107,7 +116,7 @@ class ImmediateReturn(Exception): self.rv = rv -class BuildJob(object): +class _BuildJob(object): def __init__(self, t, sf, lock, shouldbuildfunc, donefunc): self.t = t # original target name, not relative to env.v.BASE self.sf = sf @@ -123,7 +132,11 @@ class BuildJob(object): self.donefunc = donefunc self.before_t = _try_stat(self.t) + # attributes of the running process + self.f = None + def start(self): + """Actually start running this job in a subproc, if needed.""" assert self.lock.owned try: try: @@ -135,16 +148,17 @@ class BuildJob(object): # target doesn't need to be built; skip the whole task if is_target: meta('unchanged', state.target_relpath(self.t)) - return self._after2(0) + return self._finalize(0) except ImmediateReturn, e: - return self._after2(e.rv) + return self._finalize(e.rv) if env.v.NO_OOB or dirty == True: # pylint: disable=singleton-comparison - self._start_do() + self._start_self() else: - self._start_unlocked(dirty) + self._start_deps_unlocked(dirty) - def _start_do(self): + def _start_self(self): + """Run jobserver.start() to build this object's target file.""" assert self.lock.owned t = self.t sf = self.sf @@ -159,7 +173,7 @@ class BuildJob(object): sf.set_override() sf.set_checked() sf.save() - return self._after2(0) + return self._finalize(0) if (os.path.exists(t) and not os.path.isdir(t + '/.') and not sf.is_generated): # an existing source file that was not generated by us. 
@@ -170,22 +184,21 @@ class BuildJob(object): debug2("-- static (%r)\n" % t) sf.set_static() sf.save() - return self._after2(0) + return self._finalize(0) sf.zap_deps1() (dodir, dofile, _, basename, ext) = paths.find_do_file(sf) if not dofile: if os.path.exists(t): sf.set_static() sf.save() - return self._after2(0) + return self._finalize(0) else: err('no rule to redo %r\n' % t) - return self._after2(1) + return self._finalize(1) unlink(self.tmpname1) unlink(self.tmpname2) ffd = os.open(self.tmpname1, os.O_CREAT|os.O_RDWR|os.O_EXCL, 0666) close_on_exec(ffd, True) - # pylint: disable=attribute-defined-outside-init self.f = os.fdopen(ffd, 'w+') # this will run in the dofile's directory, so use only basenames here arg1 = basename + ext # target name (including extension) @@ -208,31 +221,36 @@ class BuildJob(object): # that way redo-log won't trace into an obsolete logfile. if env.v.LOG: open(state.logname(self.sf.id), 'w') - # FIXME: put these variables somewhere else, instead of on-the-fly - # extending this class! - # pylint: disable=attribute-defined-outside-init - self.dodir = dodir - self.basename = basename - self.ext = ext - self.argv = argv dof = state.File(name=os.path.join(dodir, dofile)) dof.set_static() dof.save() state.commit() meta('do', state.target_relpath(t)) - jobserver.start_job(t, self._do_subproc, self._after) + def call_subproc(): + self._subproc(dodir, basename, ext, argv) + def call_exited(t, rv): + self._subproc_exited(t, rv, argv) + jobserver.start(t, call_subproc, call_exited) - def _start_unlocked(self, dirty): - # out-of-band redo of some sub-objects. This happens when we're not - # quite sure if t needs to be built or not (because some children - # look dirty, but might turn out to be clean thanks to checksums). - # We have to call redo-unlocked to figure it all out. - # - # Note: redo-unlocked will handle all the updating of sf, so we - # don't have to do it here, nor call _after1. 
However, we have to - # hold onto the lock because otherwise we would introduce a race - # condition; that's why it's called redo-unlocked, because it doesn't - # grab a lock. + def _start_deps_unlocked(self, dirty): + """Run jobserver.start to build objects needed to check deps. + + Out-of-band redo of some sub-objects. This happens when we're not + quite sure if t needs to be built or not (because some children + look dirty, but might turn out to be clean thanks to redo-stamp + checksums). We have to call redo-unlocked to figure it all out. + + Note: redo-unlocked will handle all the updating of sf, so we don't + have to do it here, nor call _record_new_state. However, we have to + hold onto the lock because otherwise we would introduce a race + condition; that's why it's called redo-unlocked, because it doesn't + grab a lock. + """ + # FIXME: redo-unlocked is kind of a weird hack. + # Maybe we should just start jobs to build the necessary deps + # directly from this process, and when done, reconsider building + # the target we started with. But that makes this one process's + # build recursive, where currently it's flat. here = os.getcwd() def _fix(p): return state.relpath(os.path.join(env.v.BASE, p), here) @@ -240,32 +258,35 @@ class BuildJob(object): list(set(_fix(d.name) for d in dirty))) meta('check', state.target_relpath(self.t)) state.commit() - def run(): + def subtask(): os.environ['REDO_DEPTH'] = env.v.DEPTH + ' ' # python ignores SIGPIPE signal.signal(signal.SIGPIPE, signal.SIG_DFL) os.execvp(argv[0], argv) assert 0 # returns only if there's an exception - def after(t, rv): - return self._after2(rv) - jobserver.start_job(self.t, run, after) + def job_exited(t, rv): + return self._finalize(rv) + jobserver.start(self.t, jobfunc=subtask, donefunc=job_exited) - def _do_subproc(self): + def _subproc(self, dodir, basename, ext, argv): + """The function called by jobserver.start to exec the build script. + + This is run in the *child* process. 
+ """ # careful: REDO_PWD was the PWD relative to the STARTPATH at the time # we *started* building the current target; but that target ran # redo-ifchange, and it might have done it from a different directory # than we started it in. So os.getcwd() might be != REDO_PWD right # now. assert state.is_flushed() - dn = self.dodir - newp = os.path.realpath(dn) + newp = os.path.realpath(dodir) os.environ['REDO_PWD'] = state.relpath(newp, env.v.STARTDIR) - os.environ['REDO_TARGET'] = self.basename + self.ext + os.environ['REDO_TARGET'] = basename + ext os.environ['REDO_DEPTH'] = env.v.DEPTH + ' ' cycles.add(self.lock.fid) - if dn: - os.chdir(dn) + if dodir: + os.chdir(dodir) os.dup2(self.f.fileno(), 1) os.close(self.f.fileno()) close_on_exec(1, False) @@ -290,23 +311,35 @@ class BuildJob(object): os.environ['REDO_LOG'] = '' signal.signal(signal.SIGPIPE, signal.SIG_DFL) # python ignores SIGPIPE if env.v.VERBOSE or env.v.XTRACE: - logs.write('* %s' % ' '.join(self.argv).replace('\n', ' ')) - os.execvp(self.argv[0], self.argv) + logs.write('* %s' % ' '.join(argv).replace('\n', ' ')) + os.execvp(argv[0], argv) # FIXME: it would be nice to log the exit code to logf. # But that would have to happen in the parent process, which doesn't # have logf open. assert 0 # returns only if there's an exception - def _after(self, t, rv): + def _subproc_exited(self, t, rv, argv): + """Called by the jobserver when our subtask exits. + + This is run in the *parent* process. + """ try: state.check_sane() - rv = self._after1(t, rv) + rv = self._record_new_state(t, rv, argv) state.commit() finally: - self._after2(rv) + self._finalize(rv) - def _after1(self, t, rv): + def _record_new_state(self, t, rv, argv): + """After a subtask finishes, handle its changes to the output file. + + This is run in the *parent* process. + + This includes renaming temp files into place and detecting mistakes + (like writing directly to $1 instead of $3). 
We also have to record + the new file stamp data for the completed target. + """ f = self.f before_t = self.before_t after_t = _try_stat(t) @@ -315,11 +348,11 @@ class BuildJob(object): if (after_t and (not before_t or before_t.st_mtime != after_t.st_mtime) and not stat.S_ISDIR(after_t.st_mode)): - err('%s modified %s directly!\n' % (self.argv[2], t)) + err('%s modified %s directly!\n' % (argv[2], t)) err('...you should update $3 (a temp file) or stdout, not $1.\n') rv = 206 elif st2 and st1.st_size > 0: - err('%s wrote to stdout *and* created $3.\n' % self.argv[2]) + err('%s wrote to stdout *and* created $3.\n' % argv[2]) err('...you should write status messages to stderr, not stdout.\n') rv = 207 if rv == 0: @@ -345,7 +378,7 @@ class BuildJob(object): unlink(t) else: err('%s: can\'t save stdout to %r: %s\n' % - (self.argv[2], t, e.strerror)) + (argv[2], t, e.strerror)) rv = 1000 if st2: os.unlink(self.tmpname2) @@ -376,7 +409,12 @@ class BuildJob(object): meta('done', '%d %s' % (rv, state.target_relpath(self.t))) return rv - def _after2(self, rv): + def _finalize(self, rv): + """After a target is built, report completion and unlock. + + This is run in the *parent* process. + Note: you usually need to call _record_new_state() first. + """ try: self.donefunc(self.t, rv) assert self.lock.owned @@ -384,7 +422,19 @@ class BuildJob(object): self.lock.unlock() -def main(targets, shouldbuildfunc): +def run(targets, shouldbuildfunc): + """Build the given list of targets, if necessary. + + Builds in parallel using whatever jobserver tokens can be obtained. + + Args: + targets: a list of target filenames to consider building. + shouldbuildfunc: a function(target) which determines whether the given + target needs to be built, as of the time it is called. + + Returns: + 0 if all necessary targets returned exit code zero; nonzero otherwise. 
+ """ retcode = [0] # a list so that it can be reassigned from done() if env.v.SHUFFLE: import random @@ -392,7 +442,7 @@ def main(targets, shouldbuildfunc): locked = [] - def done(t, rv): + def job_exited(t, rv): if rv: retcode[0] = 1 @@ -456,7 +506,9 @@ def main(targets, shouldbuildfunc): # FIXME: separate obtaining the fid from creating the File. # FIXME: maybe integrate locking into the File object? f.refresh() - BuildJob(t, f, lock, shouldbuildfunc, done).start() + _BuildJob(t, f, lock, + shouldbuildfunc=shouldbuildfunc, + donefunc=job_exited).start() state.commit() assert state.is_flushed() lock = None @@ -520,8 +572,9 @@ def main(targets, shouldbuildfunc): retcode[0] = 2 lock.unlock() else: - BuildJob(t, state.File(fid=fid), lock, - shouldbuildfunc, done).start() + _BuildJob(t, state.File(fid=fid), lock, + shouldbuildfunc=shouldbuildfunc, + donefunc=job_exited).start() lock = None state.commit() return retcode[0] diff --git a/redo/cmd_ifchange.py b/redo/cmd_ifchange.py index 91004d2..b621e2f 100644 --- a/redo/cmd_ifchange.py +++ b/redo/cmd_ifchange.py @@ -42,7 +42,7 @@ def main(): f.add_dep('m', t) f.save() state.commit() - rv = builder.main(targets, should_build) + rv = builder.run(targets, should_build) finally: try: state.rollback() diff --git a/redo/cmd_redo.py b/redo/cmd_redo.py index 04f4bb8..6a0e6f2 100644 --- a/redo/cmd_redo.py +++ b/redo/cmd_redo.py @@ -102,7 +102,7 @@ def main(): jobserver.setup(j) try: assert state.is_flushed() - retcode = builder.main(targets, lambda t: (True, True)) + retcode = builder.run(targets, lambda t: (True, True)) assert state.is_flushed() finally: try: diff --git a/redo/jobserver.py b/redo/jobserver.py index 4c3dbd1..46f4cfa 100644 --- a/redo/jobserver.py +++ b/redo/jobserver.py @@ -92,6 +92,11 @@ def _debug(s): def _create_tokens(n): + """Materialize and own n tokens. + + If there are any cheater tokens active, they each destroy one matching + newly-created token. 
+ """ global _mytokens, _cheats assert n >= 0 assert _cheats >= 0 @@ -103,6 +108,7 @@ def _create_tokens(n): def _destroy_tokens(n): + """Destroy n tokens that are currently in our posession.""" global _mytokens assert _mytokens >= n _mytokens -= n @@ -253,6 +259,19 @@ def setup(maxjobs): def _wait(want_token, max_delay): + """Wait for a subproc to die or, if want_token, tokenfd to be readable. + + Does not actually read tokenfd once it's readable (so someone else might + read it before us). This function returns after max_delay seconds or + when at least one subproc dies or (if want_token) for tokenfd to be + readable. + + Args: + want_token: true if we should return when tokenfd is readable. + max_delay: max seconds to wait, or None to wait forever. + Returns: + None + """ rfds = _waitfds.keys() if want_token: rfds.append(_tokenfds[0]) @@ -295,12 +314,23 @@ def _wait(want_token, max_delay): def has_token(): + """Returns true if this process has a job token.""" assert _mytokens >= 0 if _mytokens >= 1: return True -def ensure_token(reason, max_delay=None): +def _ensure_token(reason, max_delay=None): + """Don't return until this process has a job token. + + Args: + reason: the reason (for debugging purposes) we need a token. Usually + the name of a target we want to build. + max_delay: the max time to wait for a token, or None if forever. + Returns: + None, but has_token() is now true *unless* max_delay is non-None + and we timed out. + """ global _mytokens assert state.is_flushed() assert _mytokens <= 1 @@ -330,6 +360,23 @@ def ensure_token(reason, max_delay=None): def ensure_token_or_cheat(reason, cheatfunc): + """Wait for a job token to become available, or cheat if possible. + + If we already have a token, we return immediately. If we have any + processes running *and* we don't own any tokens, we wait for a process + to finish, and then use that token. + + Otherwise, we're allowed to cheat. 
We call cheatfunc() occasionally + to consider cheating; if it returns n > 0, we materialize that many + cheater tokens and return. + + Args: + reason: the reason (for debugging purposes) we need a token. Usually + the name of a target we want to build. + cheatfunc: a function which returns n > 0 (usually 1) if we should + cheat right now, because we're the "foreground" process that must + be allowed to continue. + """ global _mytokens, _cheats backoff = 0.01 while not has_token(): @@ -337,8 +384,8 @@ def ensure_token_or_cheat(reason, cheatfunc): # If we already have a subproc running, then effectively we # already have a token. Don't create a cheater token unless # we're completely idle. - ensure_token(reason, max_delay=None) - ensure_token(reason, max_delay=min(1.0, backoff)) + _ensure_token(reason, max_delay=None) + _ensure_token(reason, max_delay=min(1.0, backoff)) backoff *= 2 if not has_token(): assert _mytokens == 0 @@ -351,10 +398,12 @@ def ensure_token_or_cheat(reason, cheatfunc): def running(): + """Returns true if we have any running jobs.""" return len(_waitfds) def wait_all(): + """Wait for all running jobs to finish.""" _debug("%d,%d -> wait_all\n" % (_mytokens, _cheats)) assert state.is_flushed() while 1: @@ -382,6 +431,7 @@ def wait_all(): def force_return_tokens(): + """Release or destroy all the tokens we own, in preparation for exit.""" n = len(_waitfds) _debug('%d,%d -> %d jobs left in force_return_tokens\n' % (_mytokens, _cheats, n)) @@ -401,14 +451,18 @@ def force_return_tokens(): assert state.is_flushed() -def _pre_job(r, w, pfn): - os.close(r) - if pfn: - pfn() - - class Job(object): + """Metadata about a running job.""" + def __init__(self, name, pid, donefunc): + """Initialize a job. + + Args: + name: the name of the job (usually a target filename). + pid: the pid of the subprocess running this job. + donefunc: the function(name, return_value) to call when the + subprocess exits. 
+ """ self.name = name self.pid = pid self.rv = None @@ -418,7 +472,19 @@ class Job(object): return 'Job(%s,%d)' % (self.name, self.pid) -def start_job(reason, jobfunc, donefunc): +def start(reason, jobfunc, donefunc): + """Start a new job. Must only be run with has_token() true. + + Args: + reason: the reason the job is running. Usually the filename of a + target being built. + jobfunc: the function() to execute, **in a subprocess**, to run the job. + Usually this calls os.execve(). It is an error for this function to + ever return. If it does, we return a non-zero exit code to the parent + process (the one which ran start()). + donefunc: the function(reason, return_value) to call **in the parent** + when the subprocess exits. + """ assert state.is_flushed() assert _mytokens <= 1 assert _mytokens == 1 @@ -429,12 +495,12 @@ def start_job(reason, jobfunc, donefunc): pid = os.fork() if pid == 0: # child - os.close(r) rv = 201 try: + os.close(r) try: - rv = jobfunc() or 0 - _debug('jobfunc completed (%r, %r)\n' % (jobfunc, rv)) + rv = jobfunc() + assert 0, 'jobfunc returned?! (%r, %r)' % (jobfunc, rv) except Exception: # pylint: disable=broad-except import traceback traceback.print_exc()