[prev in list] [next in list] [prev in thread] [next in thread] 

List:       gentoo-portage-dev
Subject:    [gentoo-portage-dev] [PATCH] ForkProcess: replace os.fork with multiprocessing.Process (bug 730192)
From:       Zac Medico <zmedico () gentoo ! org>
Date:       2020-07-16 20:28:16
Message-ID: 20200716202816.24649-1-zmedico () gentoo ! org
[Download RAW message or body]

Replace os.fork with multiprocessing.Process, in order to leverage
any pre-fork and post-fork interpreter housekeeping that it provides,
promoting a healthy state for the forked interpreter.

Since multiprocessing.Process closes sys.__stdin__, fix relevant
code to use the portage._get_stdin() compatibility function.
In case there's a legitimate need to inherit stdin for things like
PROPERTIES=interactive support, create a temporary duplicate of
fd_pipes[0] when appropriate, and restore sys.stdin and sys.__stdin__
in the subprocess.

Bug: https://bugs.gentoo.org/730192
Signed-off-by: Zac Medico <zmedico@gentoo.org>
---
 lib/portage/process.py                 |   4 +-
 lib/portage/sync/controller.py         |   4 +-
 lib/portage/util/_async/ForkProcess.py | 135 +++++++++++++++++++------
 3 files changed, 108 insertions(+), 35 deletions(-)

diff --git a/lib/portage/process.py b/lib/portage/process.py
index f550bcb30..2affd4d4b 100644
--- a/lib/portage/process.py
+++ b/lib/portage/process.py
@@ -1,5 +1,5 @@
 # portage.py -- core Portage functionality
-# Copyright 1998-2019 Gentoo Authors
+# Copyright 1998-2020 Gentoo Authors
 # Distributed under the terms of the GNU General Public License v2
 
 
@@ -107,7 +107,7 @@ def sanitize_fds():
 	if _set_inheritable is not None:
 
 		whitelist = frozenset([
-			sys.__stdin__.fileno(),
+			portage._get_stdin().fileno(),
 			sys.__stdout__.fileno(),
 			sys.__stderr__.fileno(),
 		])
diff --git a/lib/portage/sync/controller.py b/lib/portage/sync/controller.py
index c4c72e73a..43bb5d520 100644
--- a/lib/portage/sync/controller.py
+++ b/lib/portage/sync/controller.py
@@ -1,4 +1,4 @@
-# Copyright 2014-2019 Gentoo Authors
+# Copyright 2014-2020 Gentoo Authors
 # Distributed under the terms of the GNU General Public License v2
 
 from __future__ import print_function
@@ -231,7 +231,7 @@ class SyncManager(object):
 		# Redirect command stderr to stdout, in order to prevent
 		# spurious cron job emails (bug 566132).
 		spawn_kwargs["fd_pipes"] = {
-			0: sys.__stdin__.fileno(),
+			0: portage._get_stdin().fileno(),
 			1: sys.__stdout__.fileno(),
 			2: sys.__stdout__.fileno()
 		}
diff --git a/lib/portage/util/_async/ForkProcess.py b/lib/portage/util/_async/ForkProcess.py
index d84e93833..ade5c67ad 100644
--- a/lib/portage/util/_async/ForkProcess.py
+++ b/lib/portage/util/_async/ForkProcess.py
@@ -1,6 +1,8 @@
-# Copyright 2012-2013 Gentoo Foundation
+# Copyright 2012-2020 Gentoo Authors
 # Distributed under the terms of the GNU General Public License v2
 
+import fcntl
+import multiprocessing
 import signal
 import sys
 import traceback
@@ -11,27 +13,100 @@ from _emerge.SpawnProcess import SpawnProcess
 
 class ForkProcess(SpawnProcess):
 
-	__slots__ = ()
+	__slots__ = ('_proc',)
 
 	def _spawn(self, args, fd_pipes=None, **kwargs):
 		"""
-		Fork a subprocess, apply local settings, and call fetch().
+		Override SpawnProcess._spawn to fork a subprocess that calls
+		self._run(). This uses multiprocessing.Process in order to leverage
+		any pre-fork and post-fork interpreter housekeeping that it provides,
+		promoting a healthy state for the forked interpreter.
 		"""
-
-		parent_pid = os.getpid()
-		pid = None
+		# Since multiprocessing.Process closes sys.__stdin__, create a
+		# temporary duplicate of fd_pipes[0] so that sys.__stdin__ can
+		# be restored in the subprocess, in case this is needed for
+		# things like PROPERTIES=interactive support.
+		stdin_dup = None
 		try:
-			pid = os.fork()
+			stdin_fd = fd_pipes.get(0)
+			if stdin_fd is not None and stdin_fd == portage._get_stdin().fileno():
+				stdin_dup = os.dup(stdin_fd)
+				fcntl.fcntl(stdin_dup, fcntl.F_SETFD,
+					fcntl.fcntl(stdin_fd, fcntl.F_GETFD))
+				fd_pipes[0] = stdin_dup
+			self._proc = multiprocessing.Process(target=self._bootstrap, args=(fd_pipes,))
+			self._proc.start()
+		finally:
+			if stdin_dup is not None:
+				os.close(stdin_dup)
+
+		if self._use_proc_sentinel(self._proc):
+			self.scheduler.add_reader(self._proc.sentinel, self._proc_sentinel_exit, self._proc)
+
+		return [self._proc.pid]
+
+	def _use_proc_sentinel(self, proc):
+		"""
+		Use sentinel for versions of python that support the close
+		method, since a premature call to self._proc.close() may
+		raise ValueError. It's not safe to call remove_reader on
+		proc.sentinel unless the close method is supported, since
+		otherwise it's not possible to know when the corresponding
+		file descriptor has been closed and then reallocated to a
+		concurrent coroutine.
 
-			if pid != 0:
-				if not isinstance(pid, int):
-					raise AssertionError(
-						"fork returned non-integer: %s" % (repr(pid),))
-				return [pid]
+		@param proc: process instance
+		@type proc: multiprocessing.Process
+		@rtype: bool
+		@returns: True if multiprocessing.Process sentinel should be used
+		"""
+		return hasattr(proc, 'close')
+
+	def _cancel(self):
+		if self._proc is None:
+			super(ForkProcess, self)._cancel()
+		else:
+			self._proc.terminate()
+
+	def _async_wait(self):
+		# Wait for _proc_sentinel_exit when appropriate, since a premature
+		# call to self._proc.close() may raise ValueError.
+		if self._proc is None or not self._use_proc_sentinel(self._proc):
+			super(ForkProcess, self)._async_wait()
 
-			rval = 1
-			try:
+	def _async_waitpid(self):
+		# Wait for _proc_sentinel_exit when appropriate, since a premature
+		# call to self._proc.close() may raise ValueError.
+		if self._proc is None or not self._use_proc_sentinel(self._proc):
+			super(ForkProcess, self)._async_waitpid()
 
+	def _proc_sentinel_exit(self, proc):
+		self._proc = None
+		# If multiprocessing.Process supports the close method, then
+		# access to the sentinel value should raise ValueError if the
+		# sentinel has been closed. In this case it's not safe to call
+		# remove_reader, since the file descriptor has been closed and then
+		# reallocated to a concurrent coroutine. After proc is closed,
+		# most method calls and attribute accesses will raise ValueError,
+		# so we assume that it is ready to close here and that no other
+		# code will ever close it (the sentinel provides the only means
+		# to predict when it is safe to close).
+		self.scheduler.remove_reader(proc.sentinel)
+		proc.join()
+		self._was_cancelled()
+		if self.returncode is None:
+			self.returncode = proc.exitcode
+		proc.close()
+		self._async_wait()
+
+	def _unregister(self):
+		super(ForkProcess, self)._unregister()
+		if self._proc is not None:
+			if self._proc.is_alive():
+				self._proc.terminate()
+			self._proc = None
+
+	def _bootstrap(self, fd_pipes):
 				# Use default signal handlers in order to avoid problems
 				# killing subprocesses as reported in bug #353239.
 				signal.signal(signal.SIGINT, signal.SIG_DFL)
@@ -52,24 +127,22 @@ class ForkProcess(SpawnProcess):
 				# (see _setup_pipes docstring).
 				portage.process._setup_pipes(fd_pipes, close_fds=False)
 
-				rval = self._run()
-			except SystemExit:
-				raise
-			except:
-				traceback.print_exc()
-				# os._exit() skips stderr flush!
-				sys.stderr.flush()
-			finally:
-				os._exit(rval)
+				# Since multiprocessing.Process closes sys.__stdin__ and
+				# makes sys.stdin refer to os.devnull, restore it when
+				# appropriate.
+				if 0 in fd_pipes:
+					# It's possible that sys.stdin.fileno() is already 0,
+					# and in that case the above _setup_pipes call will
+					# have already updated its identity via dup2. Otherwise,
+					# perform the dup2 call now, and also copy the file
+					# descriptor flags.
+					if sys.stdin.fileno() != 0:
+						os.dup2(0, sys.stdin.fileno())
+						fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFD,
+							fcntl.fcntl(0, fcntl.F_GETFD))
+					sys.__stdin__ = sys.stdin
 
-		finally:
-			if pid == 0 or (pid is None and os.getpid() != parent_pid):
-				# Call os._exit() from a finally block in order
-				# to suppress any finally blocks from earlier
-				# in the call stack (see bug #345289). This
-				# finally block has to be setup before the fork
-				# in order to avoid a race condition.
-				os._exit(1)
+				sys.exit(self._run())
 
 	def _run(self):
 		raise NotImplementedError(self)
-- 
2.25.3


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic