"""Implementation of a GNU make-compatible jobserver."""
#
# The basic idea is that both ends of a pipe (tokenfds) are shared with all
# subprocesses. At startup, we write one "token" into the pipe for each
# configured job. (So eg. redo -j20 will put 20 tokens in the pipe.) In
# order to do work, you must first obtain a token, by reading the other
# end of the pipe. When you're done working, you write the token back into
# the pipe so that someone else can grab it.
#
# The toplevel process in the hierarchy is what creates the pipes in the
# first place. Then it puts the pipe file descriptor numbers into MAKEFLAGS,
# so that subprocesses can pull them back out.
#
# As usual, edge cases make all this a bit tricky:
#
# - Every process is defined as owning a token at startup time. This makes
#   sense because it's backward compatible with single-process make: if a
#   subprocess neither reads nor writes the pipe, then it has exactly one
#   token, so it's allowed to do one thread of work.
#
# - Thus, for symmetry, processes also must own a token at exit time.
#
# - In turn, to make *that* work, a parent process must destroy *its* token
#   upon launching a subprocess. (Destroy, not release, because the
#   subprocess has created its own token.) It can try to obtain another
#   token, but if none are available, it has to stop work until one of its
#   subprocesses finishes. When the subprocess finishes, its token is
#   destroyed, so the parent creates a new one.
#
# - If our process is going to stop and wait for a lock (eg. because we
#   depend on a target and someone else is already building that target),
#   we must give up our token. Otherwise, we're sucking up a "thread" (a
#   unit of parallelism) just to do nothing. If enough processes are waiting
#   on a particular lock, then the process building that target might end up
#   with only a single token, and everything gets serialized.
#
# - Unfortunately this leads to a problem: if we give up our token, we then
#   have to re-acquire a token before exiting, even if we want to exit with
#   an error code.
#
# - redo-log wants to linearize output so that it always prints log messages
#   in the order jobs were started; but because of the above, a job being
#   logged might end up with no tokens for a long time, waiting for some
#   other branch of the build to complete.
#
# As a result, we extend beyond GNU make's model and make things even more
# complicated. We add a second pipe, cheatfds, which we use to "cheat" on
# tokens if our particular job is in the foreground (ie. is the one
# currently being tailed by redo-log -f). We add at most one token per
# redo-log instance. If we are the foreground task, and we need a token,
# and we don't have a token, and we don't have any subtasks (because if we
# had a subtask, then we're not in the foreground), we synthesize our own
# token by incrementing _mytokens and _cheats, but we don't read from
# tokenfds. Then, when it's time to give up our token again, we also won't
# write back to tokenfds, so the synthesized token disappears.
#
# Of course, all that then leads to *another* problem: every process must
# hold a *real* token when it exits, because its parent has given up a
# *real* token in order to start this subprocess. If we're holding a cheat
# token when it's time to exit, then we can't meet this requirement. The
# obvious thing to do would be to give up the cheat token and wait for a
# real token, but that might take a very long time, and if we're the last
# thing preventing our parent from exiting, then redo-log will sit around
# following our parent until we finally get a token so we can exit,
# defeating the whole purpose of cheating. Instead of waiting, we write our
# "cheater" token to cheatfds.
# Then, any task, upon noticing one of its
# subprocesses has finished, will check to see if there are any tokens on
# cheatfds; if so, it will remove one of them and *not* re-create its
# child's token, thus destroying the cheater token from earlier, and restoring
# balance.
#
# Sorry this is so complicated. I couldn't think of a way to make it
# simpler :)
#
import sys
import os
import errno
import select
import fcntl
import signal
from . import env, helpers, logs, state
from .atoi import atoi

# Nonzero only in the process that created the token pipe; holds the total
# number of tokens that jobserver was created with (see setup()).
_toplevel = 0
# Number of tokens currently owned by this process (every process starts
# out owning exactly one).
_mytokens = 1
# How many of the tokens counted in _mytokens are synthesized "cheat" tokens.
_cheats = 0
# (readfd, writefd) of the shared token pipe, or None before setup().
_tokenfds = None
# (readfd, writefd) of the shared cheat pipe, or None before setup().
_cheatfds = None
# Maps the read end of each child's "done" pipe to its Job object.
_waitfds = {}


def _debug(s):
    """Write a debug message to stderr; compiled out unless the 0 is edited."""
    if 0:
        sys.stderr.write('job#%d: %s' % (os.getpid(), s))


def _create_tokens(n):
    """Materialize and own n tokens.

    If there are any cheater tokens active, they each destroy one matching
    newly-created token.
    """
    global _mytokens, _cheats
    assert n >= 0
    assert _cheats >= 0
    # Each outstanding cheat cancels one of the new tokens; only the
    # remainder is added to our own count.
    absorbed = min(n, _cheats)
    _cheats -= absorbed
    _mytokens += n - absorbed


def _destroy_tokens(n):
    """Destroy n tokens that are currently in our possession."""
    global _mytokens
    # Same sanity check as _create_tokens() and _release(): a negative count
    # would silently create tokens instead of destroying them.
    assert n >= 0
    assert _mytokens >= n
    _mytokens -= n


def _release(n):
    """Give up n of our tokens.

    Cheat tokens are simply forgotten; real tokens are written back to the
    token pipe so that another process can pick them up.
    """
    global _mytokens, _cheats
    assert n >= 0
    assert _mytokens >= n
    _debug('%d,%d -> release(%d)\n' % (_mytokens, _cheats, n))
    _mytokens -= n
    # Cheat tokens were never read from tokenfds, so they must not be
    # written back to it either.
    cancelled = min(n, _cheats)
    _cheats -= cancelled
    n_to_share = n - cancelled
    assert _mytokens >= 0
    assert _cheats >= 0
    if n_to_share:
        _debug('PUT tokenfds %d\n' % n_to_share)
        os.write(_tokenfds[1], b't' * n_to_share)


def _release_except_mine():
    """Release every token we own except one."""
    assert _mytokens > 0
    _release(_mytokens - 1)


def release_mine():
    """Release one token; requires that we currently own at least one."""
    assert _mytokens >= 1
    _debug('%d,%d -> release_mine()\n' % (_mytokens, _cheats))
    _release(1)


# NOTE(review): on Python 3 this shadows the builtin TimeoutError inside this
# module. The name is kept because it is part of the module's namespace.
class TimeoutError(Exception):
    """Raised by the SIGALRM handler to abort a read that got stuck."""
    pass


def _timeout(sig, frame):
    """SIGALRM handler: interrupt the current operation with TimeoutError."""
    raise TimeoutError()


# We make the pipes use the first available fd numbers starting at startfd.
# This makes it easier to differentiate different kinds of pipes when using
# strace.
def _make_pipe(startfd): (a, b) = os.pipe() fds = (fcntl.fcntl(a, fcntl.F_DUPFD, startfd), fcntl.fcntl(b, fcntl.F_DUPFD, startfd + 1)) os.close(a) os.close(b) return fds def _try_read(fd, n): """Try to read n bytes from fd. Returns: '' on EOF, None if EAGAIN.""" assert state.is_flushed() # using djb's suggested way of doing non-blocking reads from a blocking # socket: http://cr.yp.to/unix/nonblock.html # We can't just make the socket non-blocking, because we want to be # compatible with GNU Make, and they can't handle it. r, w, x = select.select([fd], [], [], 0) if not r: return None # try again # ok, the socket is readable - but some other process might get there # first. We have to set an alarm() in case our read() gets stuck. try: oldh = signal.signal(signal.SIGALRM, _timeout) signal.setitimer(signal.ITIMER_REAL, 0.01, 0.01) # emergency fallback try: b = os.read(fd, 1) except TimeoutError: return None # try again except OSError as e: if e.errno in (errno.EAGAIN, errno.EINTR): # interrupted or it was nonblocking return None # try again else: raise finally: signal.setitimer(signal.ITIMER_REAL, 0, 0) signal.signal(signal.SIGALRM, oldh) return b def _try_read_all(fd, n): bb = b'' while 1: b = _try_read(fd, n) if not b: break bb += b return bb def setup(maxjobs): """Start the jobserver (if it isn't already) with the given token count. Args: maxjobs: if nonzero, create a new jobserver with separate tokens from the one we inherited (if any). If zero and we inherited a jobserver, just use that. If zero and we didn't inherit a jobserver, create one with a default number of tokens (currently always 1). 
""" global _tokenfds, _cheatfds, _toplevel assert maxjobs >= 0 assert not _tokenfds _debug('setup(%d)\n' % maxjobs) flags = ' ' + os.getenv('MAKEFLAGS', '') + ' ' FIND1 = ' --jobserver-auth=' # renamed in GNU make 4.2 FIND2 = ' --jobserver-fds=' # fallback syntax FIND = FIND1 ofs = flags.find(FIND1) if ofs < 0: FIND = FIND2 ofs = flags.find(FIND2) if ofs >= 0: s = flags[ofs+len(FIND):] (arg, junk) = s.split(' ', 1) (a, b) = arg.split(',', 1) a = atoi(a) b = atoi(b) if a <= 0 or b <= 0: logs.err('invalid --jobserver-auth: %r\n' % arg) raise helpers.ImmediateReturn(200) if not helpers.fd_exists(a) or not helpers.fd_exists(b): logs.err('broken --jobserver-auth from parent process:\n') logs.err(' using GNU make? prefix your Makefile rule with "+"\n') logs.err( ' otherwise, see ' + 'https://redo.rtfd.io/en/latest/FAQParallel/#MAKEFLAGS\n') raise helpers.ImmediateReturn(200) if maxjobs == 1: # user requested exactly one token, which means they want to # serialize us, even if the parent redo is running in parallel. # That's pretty harmless, so allow it without a warning. pass elif maxjobs: # user requested more than one token, even though we have a parent # jobserver, which is fishy. Warn about it, like make does. logs.warn(('warning: -j%d forced in sub-redo; ' + 'starting new jobserver.\n') % maxjobs) else: # user requested zero tokens, which means use the parent jobserver # if it exists. _tokenfds = (a, b) cheats = os.getenv('REDO_CHEATFDS', '') if not maxjobs else '' _cheatfds = None if cheats: (a, b) = cheats.split(',', 1) a = atoi(a) b = atoi(b) if a <= 0 or b <= 0: raise ValueError('invalid REDO_CHEATFDS: %r' % cheats) _cheatfds = (a, b) if not helpers.fd_exists(a) or not helpers.fd_exists(b): # This can happen if we're called by a parent process who closes # all "unknown" file descriptors (which is anti-social behaviour, # but oh well, we'll warn about it if they close the jobserver # fds in MAKEFLAGS, so just ignore it if it also happens here). 
_cheatfds = None if not _cheatfds: _cheatfds = _make_pipe(102) os.environ['REDO_CHEATFDS'] = ('%d,%d' % (_cheatfds[0], _cheatfds[1])) if not _tokenfds: # need to start a new server realmax = maxjobs or 1 _toplevel = realmax _tokenfds = _make_pipe(100) _create_tokens(realmax - 1) _release_except_mine() os.environ['MAKEFLAGS'] = ( ' -j --jobserver-auth=%d,%d --jobserver-fds=%d,%d' % (_tokenfds[0], _tokenfds[1], _tokenfds[0], _tokenfds[1])) def _wait(want_token, max_delay): """Wait for a subproc to die or, if want_token, tokenfd to be readable. Does not actually read tokenfd once it's readable (so someone else might read it before us). This function returns after max_delay seconds or when at least one subproc dies or (if want_token) for tokenfd to be readable. Args: want_token: true if we should return when tokenfd is readable. max_delay: max seconds to wait, or None to wait forever. Returns: None """ rfds = list(_waitfds.keys()) if want_token: rfds.append(_tokenfds[0]) assert rfds assert state.is_flushed() _debug('_tokenfds=%r; jfds=%r\n' % (_tokenfds, _waitfds)) r, w, x = select.select(rfds, [], [], max_delay) _debug('readable: %r\n' % (r,)) for fd in r: if fd == _tokenfds[0]: pass else: pd = _waitfds[fd] _debug("done: %r\n" % pd.name) # redo subprocesses are expected to die without releasing their # tokens, so things are less likely to get confused if they # die abnormally. Since a child has died, that means a token has # 'disappeared' and we now need to recreate it. b = _try_read(_cheatfds[0], 1) if b: # someone exited with _cheats > 0, so we need to compensate # by *not* re-creating a token now. 
_debug('EAT cheatfd %r\n' % b) else: _create_tokens(1) if has_token(): _release_except_mine() os.close(fd) del _waitfds[fd] rv = os.waitpid(pd.pid, 0) assert rv[0] == pd.pid _debug("done1: rv=%r\n" % (rv,)) rv = rv[1] if os.WIFEXITED(rv): pd.rv = os.WEXITSTATUS(rv) else: pd.rv = -os.WTERMSIG(rv) _debug("done2: rv=%d\n" % pd.rv) pd.donefunc(pd.name, pd.rv) def has_token(): """Returns true if this process has a job token.""" assert _mytokens >= 0 if _mytokens >= 1: return True def _ensure_token(reason, max_delay=None): """Don't return until this process has a job token. Args: reason: the reason (for debugging purposes) we need a token. Usually the name of a target we want to build. max_delay: the max time to wait for a token, or None if forever. Returns: None, but has_token() is now true *unless* max_delay is non-None and we timed out. """ global _mytokens assert state.is_flushed() assert _mytokens <= 1 while 1: if _mytokens >= 1: _debug("_mytokens is %d\n" % _mytokens) assert _mytokens == 1 _debug('(%r) used my own token...\n' % reason) break assert _mytokens < 1 _debug('(%r) waiting for tokens...\n' % reason) _wait(want_token=1, max_delay=max_delay) if _mytokens >= 1: break assert _mytokens < 1 b = _try_read(_tokenfds[0], 1) if b == '': raise Exception('unexpected EOF on token read') if b: _mytokens += 1 _debug('(%r) got a token (%r).\n' % (reason, b)) break if max_delay != None: break assert _mytokens <= 1 def ensure_token_or_cheat(reason, cheatfunc): """Wait for a job token to become available, or cheat if possible. If we already have a token, we return immediately. If we have any processes running *and* we don't own any tokens, we wait for a process to finish, and then use that token. Otherwise, we're allowed to cheat. We call cheatfunc() occasionally to consider cheating; if it returns n > 0, we materialize that many cheater tokens and return. Args: reason: the reason (for debugging purposes) we need a token. Usually the name of a target we want to build. 
cheatfunc: a function which returns n > 0 (usually 1) if we should cheat right now, because we're the "foreground" process that must be allowed to continue. """ global _mytokens, _cheats backoff = 0.01 while not has_token(): while running() and not has_token(): # If we already have a subproc running, then effectively we # already have a token. Don't create a cheater token unless # we're completely idle. _ensure_token(reason, max_delay=None) _ensure_token(reason, max_delay=min(1.0, backoff)) backoff *= 2 if not has_token(): assert _mytokens == 0 n = cheatfunc() _debug('%s: %s: cheat = %d\n' % (env.v.TARGET, reason, n)) if n > 0: _mytokens += n _cheats += n break def running(): """Returns true if we have any running jobs.""" return len(_waitfds) def wait_all(): """Wait for all running jobs to finish.""" _debug("%d,%d -> wait_all\n" % (_mytokens, _cheats)) assert state.is_flushed() while 1: while _mytokens >= 2: _release(1) if not running(): break # We should only release our last token if we have remaining # children. A terminating redo process should try to terminate while # holding a token, and if we have no children left, we might be # about to terminate. if _mytokens >= 1: release_mine() _debug("wait_all: wait()\n") _wait(want_token=0, max_delay=None) _debug("wait_all: empty list\n") if _toplevel: # If we're the toplevel and we're sure no child processes remain, # then we know we're totally idle. Self-test to ensure no tokens # mysteriously got created/destroyed. if _mytokens >= 1: release_mine() tokens = _try_read_all(_tokenfds[0], 8192) cheats = _try_read_all(_cheatfds[0], 8192) _debug('toplevel: GOT %d tokens and %d cheats\n' % (len(tokens), len(cheats))) if len(tokens) - len(cheats) != _toplevel: raise Exception('on exit: expected %d tokens; found %r-%r' % (_toplevel, len(tokens), len(cheats))) os.write(_tokenfds[1], tokens) # note: when we return, we may have *no* tokens, not even our own! # If caller wants to continue, they might have to obtain one first. 
def force_return_tokens():
    """Release or destroy all the tokens we own, in preparation for exit."""
    n = len(_waitfds)
    _debug('%d,%d -> %d jobs left in force_return_tokens\n'
           % (_mytokens, _cheats, n))
    # Forget about all remaining children; each of them took one of our
    # tokens when it was started, so re-create those tokens below.
    for k in list(_waitfds):
        del _waitfds[k]
    _create_tokens(n)
    if has_token():
        _release_except_mine()
        assert _mytokens == 1, 'mytokens=%d' % _mytokens
    assert _cheats <= _mytokens, 'mytokens=%d cheats=%d' % (_mytokens, _cheats)
    assert _cheats in (0, 1), 'cheats=%d' % _cheats
    if _cheats:
        # Our last token is a cheat token: hand it over via cheatfds so that
        # whoever reaps us does not re-create a token for us.
        _debug('%d,%d -> force_return_tokens: recovering final token\n'
               % (_mytokens, _cheats))
        _destroy_tokens(_cheats)
        os.write(_cheatfds[1], b't' * _cheats)
    assert state.is_flushed()


class Job(object):
    """Metadata about a running job."""

    def __init__(self, name, pid, donefunc):
        """Initialize a job.

        Args:
          name: the name of the job (usually a target filename).
          pid: the pid of the subprocess running this job.
          donefunc: the function(name, return_value) to call when the
            subprocess exits.
        """
        self.name = name
        self.pid = pid
        # exit status of the subprocess; None until it has been reaped
        self.rv = None
        self.donefunc = donefunc

    def __repr__(self):
        return 'Job(%s,%d)' % (self.name, self.pid)


def start(reason, jobfunc, donefunc):
    """Start a new job. Must only be run with has_token() true.

    Args:
      reason: the reason the job is running. Usually the filename of a
        target being built.
      jobfunc: the function() to execute, **in a subprocess**, to run the
        job. Usually this calls os.execve(). It is an error for this
        function to ever return. If it does, we return a non-zero exit
        code to the parent process (the one which ran start()).
      donefunc: the function(reason, return_value) to call **in the parent**
        when the subprocess exits.
    """
    assert state.is_flushed()
    assert _mytokens <= 1
    assert _mytokens == 1
    # Subprocesses always start with 1 token, so we have to destroy ours
    # in order for the universe to stay in balance.
    _destroy_tokens(1)
    r, w = _make_pipe(50)
    pid = os.fork()
    if pid == 0:
        # child
        #
        # rv is deliberately never overwritten with jobfunc()'s result:
        # reaching the exit below always means failure, and rv must stay an
        # integer so that neither the debug formatting nor os._exit() can
        # raise and let the forked child escape into the parent's code.
        rv = 201
        try:
            os.close(r)
            try:
                result = jobfunc()
                assert 0, 'jobfunc returned?! (%r, %r)' % (jobfunc, result)
            except Exception:  # pylint: disable=broad-except
                import traceback
                traceback.print_exc()
        finally:
            _debug('exit: %d\n' % rv)
            os._exit(rv)
    # parent: keep only the read end; it becomes readable (EOF) once the
    # child and everything that inherited the write end have exited.
    helpers.close_on_exec(r, True)
    os.close(w)
    pd = Job(reason, pid, donefunc)
    _waitfds[r] = pd