[prev in list] [next in list] [prev in thread] [next in thread]
List: gentoo-portage-dev
Subject: [gentoo-portage-dev] [PATCH] ForkProcess: replace os.fork with multiprocessing.Process (bug 730192)
From: Zac Medico <zmedico () gentoo ! org>
Date: 2020-07-16 20:28:16
Message-ID: 20200716202816.24649-1-zmedico () gentoo ! org
[Download RAW message or body]
Replace os.fork with multiprocessing.Process, in order to leverage
any pre-fork and post-fork interpreter housekeeping that it provides,
promoting a healthy state for the forked interpreter.
Since multiprocessing.Process closes sys.__stdin__, fix relevant
code to use the portage._get_stdin() compatibility function.
In case there's a legitimate need to inherit stdin for things like
PROPERTIES=interactive support, create a temporary duplicate of
fd_pipes[0] when appropriate, and restore sys.stdin and sys.__stdin__
in the subprocess.
Bug: https://bugs.gentoo.org/730192
Signed-off-by: Zac Medico <zmedico@gentoo.org>
---
lib/portage/process.py | 4 +-
lib/portage/sync/controller.py | 4 +-
lib/portage/util/_async/ForkProcess.py | 135 +++++++++++++++++++------
3 files changed, 108 insertions(+), 35 deletions(-)
diff --git a/lib/portage/process.py b/lib/portage/process.py
index f550bcb30..2affd4d4b 100644
--- a/lib/portage/process.py
+++ b/lib/portage/process.py
@@ -1,5 +1,5 @@
# portage.py -- core Portage functionality
-# Copyright 1998-2019 Gentoo Authors
+# Copyright 1998-2020 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
@@ -107,7 +107,7 @@ def sanitize_fds():
if _set_inheritable is not None:
whitelist = frozenset([
- sys.__stdin__.fileno(),
+ portage._get_stdin().fileno(),
sys.__stdout__.fileno(),
sys.__stderr__.fileno(),
])
diff --git a/lib/portage/sync/controller.py b/lib/portage/sync/controller.py
index c4c72e73a..43bb5d520 100644
--- a/lib/portage/sync/controller.py
+++ b/lib/portage/sync/controller.py
@@ -1,4 +1,4 @@
-# Copyright 2014-2019 Gentoo Authors
+# Copyright 2014-2020 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
from __future__ import print_function
@@ -231,7 +231,7 @@ class SyncManager(object):
# Redirect command stderr to stdout, in order to prevent
# spurious cron job emails (bug 566132).
spawn_kwargs["fd_pipes"] = {
- 0: sys.__stdin__.fileno(),
+ 0: portage._get_stdin().fileno(),
1: sys.__stdout__.fileno(),
2: sys.__stdout__.fileno()
}
diff --git a/lib/portage/util/_async/ForkProcess.py b/lib/portage/util/_async/ForkProcess.py
index d84e93833..ade5c67ad 100644
--- a/lib/portage/util/_async/ForkProcess.py
+++ b/lib/portage/util/_async/ForkProcess.py
@@ -1,6 +1,8 @@
-# Copyright 2012-2013 Gentoo Foundation
+# Copyright 2012-2020 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
+import fcntl
+import multiprocessing
import signal
import sys
import traceback
@@ -11,27 +13,100 @@ from _emerge.SpawnProcess import SpawnProcess
class ForkProcess(SpawnProcess):
- __slots__ = ()
+ __slots__ = ('_proc',)
def _spawn(self, args, fd_pipes=None, **kwargs):
"""
- Fork a subprocess, apply local settings, and call fetch().
+ Override SpawnProcess._spawn to fork a subprocess that calls
+ self._run(). This uses multiprocessing.Process in order to leverage
+ any pre-fork and post-fork interpreter housekeeping that it provides,
+ promoting a healthy state for the forked interpreter.
"""
-
- parent_pid = os.getpid()
- pid = None
+ # Since multiprocessing.Process closes sys.__stdin__, create a
+ # temporary duplicate of fd_pipes[0] so that sys.__stdin__ can
+ # be restored in the subprocess, in case this is needed for
+ # things like PROPERTIES=interactive support.
+ stdin_dup = None
try:
- pid = os.fork()
+ stdin_fd = fd_pipes.get(0)
+ if stdin_fd is not None and stdin_fd == portage._get_stdin().fileno():
+ stdin_dup = os.dup(stdin_fd)
+ fcntl.fcntl(stdin_dup, fcntl.F_SETFD,
+ fcntl.fcntl(stdin_fd, fcntl.F_GETFD))
+ fd_pipes[0] = stdin_dup
+ self._proc = multiprocessing.Process(target=self._bootstrap, args=(fd_pipes,))
+ self._proc.start()
+ finally:
+ if stdin_dup is not None:
+ os.close(stdin_dup)
+
+ if self._use_proc_sentinel(self._proc):
+ self.scheduler.add_reader(self._proc.sentinel, self._proc_sentinel_exit, self._proc)
+
+ return [self._proc.pid]
+
+ def _use_proc_sentinel(self, proc):
+ """
+ Use sentinel for versions of python that support the close
+ method, since a premature call to self._proc.close() may
+ raise ValueError. It's not safe to call remove_reader on
+ proc.sentinel unless the close method is supported, since
+ otherwise it's not possible to know when the corresponding
+ file descriptor has been closed and then reallocated to a
+ concurrent coroutine.
- if pid != 0:
- if not isinstance(pid, int):
- raise AssertionError(
- "fork returned non-integer: %s" % (repr(pid),))
- return [pid]
+ @param proc: process instance
+ @type proc: multiprocessing.Process
+ @rtype: bool
+ @returns: True if multiprocessing.Process sentinel should be used
+ """
+ return hasattr(proc, 'close')
+
+ def _cancel(self):
+ if self._proc is None:
+ super(ForkProcess, self)._cancel()
+ else:
+ self._proc.terminate()
+
+ def _async_wait(self):
+ # Wait for _proc_sentinel_exit when appropriate, since a premature
+ # call to self._proc.close() may raise ValueError.
+ if self._proc is None or not self._use_proc_sentinel(self._proc):
+ super(ForkProcess, self)._async_wait()
- rval = 1
- try:
+ def _async_waitpid(self):
+ # Wait for _proc_sentinel_exit when appropriate, since a premature
+ # call to self._proc.close() may raise ValueError.
+ if self._proc is None or not self._use_proc_sentinel(self._proc):
+ super(ForkProcess, self)._async_waitpid()
+ def _proc_sentinel_exit(self, proc):
+ self._proc = None
+ # If multiprocessing.Process supports the close method, then
+ # access to the sentinel value should raise ValueError if the
+ # sentinel has been closed. In this case it's not safe to call
+ # remove_reader, since the file descriptor has been closed and then
+ # reallocated to a concurrent coroutine. After proc is closed,
+ # most method calls and attribute accesses will raise ValueError,
+ # so we assume that it is ready to close here and that no other
+ # code will ever close it (the sentinel provides the only means
+ # to predict when it is safe to close).
+ self.scheduler.remove_reader(proc.sentinel)
+ proc.join()
+ self._was_cancelled()
+ if self.returncode is None:
+ self.returncode = proc.exitcode
+ proc.close()
+ self._async_wait()
+
+ def _unregister(self):
+ super(ForkProcess, self)._unregister()
+ if self._proc is not None:
+ if self._proc.is_alive():
+ self._proc.terminate()
+ self._proc = None
+
+ def _bootstrap(self, fd_pipes):
# Use default signal handlers in order to avoid problems
# killing subprocesses as reported in bug #353239.
signal.signal(signal.SIGINT, signal.SIG_DFL)
@@ -52,24 +127,22 @@ class ForkProcess(SpawnProcess):
# (see _setup_pipes docstring).
portage.process._setup_pipes(fd_pipes, close_fds=False)
- rval = self._run()
- except SystemExit:
- raise
- except:
- traceback.print_exc()
- # os._exit() skips stderr flush!
- sys.stderr.flush()
- finally:
- os._exit(rval)
+ # Since multiprocessing.Process closes sys.__stdin__ and
+ # makes sys.stdin refer to os.devnull, restore it when
+ # appropriate.
+ if 0 in fd_pipes:
+ # It's possible that sys.stdin.fileno() is already 0,
+ # and in that case the above _setup_pipes call will
+ # have already updated its identity via dup2. Otherwise,
+ # perform the dup2 call now, and also copy the file
+ # descriptor flags.
+ if sys.stdin.fileno() != 0:
+ os.dup2(0, sys.stdin.fileno())
+ fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFD,
+ fcntl.fcntl(0, fcntl.F_GETFD))
+ sys.__stdin__ = sys.stdin
- finally:
- if pid == 0 or (pid is None and os.getpid() != parent_pid):
- # Call os._exit() from a finally block in order
- # to suppress any finally blocks from earlier
- # in the call stack (see bug #345289). This
- # finally block has to be setup before the fork
- # in order to avoid a race condition.
- os._exit(1)
+ sys.exit(self._run())
def _run(self):
raise NotImplementedError(self)
--
2.25.3
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic