[prev in list] [next in list] [prev in thread] [next in thread] 

List:       openais
Subject:    [Openais] corosync trunk - use posix unnamed semaphores in shared
From:       Steven Dake <sdake () redhat ! com>
Date:       2009-07-30 1:56:02
Message-ID: 1248918962.2722.98.camel () localhost ! localdomain
[Download RAW message or body]

This patch switches from SysV semaphores to POSIX unnamed process-shared
semaphores on platforms that support the feature.

The checking of OS feature support needs a bit of work.

This patch also gets rid of the need to handle SIGUSR2 in the library
for pthread_kill.  Instead we use sem_post to signal that a connection
has ended.

The main benefits beyond performance are that there is no upper limit on
the number of IPC connections and no leaking when a process is killed
unexpectedly.

Regards
-steve

["corosync-trunk-use-posix-semaphores.patch" (corosync-trunk-use-posix-semaphores.patch)]

Index: include/corosync/coroipc_ipc.h
===================================================================
--- include/corosync/coroipc_ipc.h	(revision 2374)
+++ include/corosync/coroipc_ipc.h	(working copy)
@@ -34,6 +34,12 @@
 #ifndef COROIPC_IPC_H_DEFINED
 #define COROIPC_IPC_H_DEFINED
 
+#include "config.h"
+
+#ifdef HAVE_SEM_INIT
+#include <semaphore.h>
+#endif
+
 enum req_init_types {
 	MESSAGE_REQ_RESPONSE_INIT = 0,
 	MESSAGE_REQ_DISPATCH_INIT = 1
@@ -45,6 +51,11 @@
 struct control_buffer {
 	unsigned int read;
 	unsigned int write;
+#ifdef HAVE_SEM_INIT
+	sem_t sem0;
+	sem_t sem1;
+	sem_t sem2;
+#endif
 };
 
 enum res_init_types {
Index: exec/coroipcs.c
===================================================================
--- exec/coroipcs.c	(revision 2374)
+++ exec/coroipcs.c	(working copy)
@@ -67,7 +67,13 @@
 #include <string.h>
 
 #include <sys/shm.h>
+
+#if defined(HAVE_SEM_INIT)
+#include <semaphore.h>
+#else
 #include <sys/sem.h>
+#endif
+
 #include <corosync/corotypes.h>
 #include <corosync/list.h>
 
@@ -100,6 +106,7 @@
 	size_t size;
 };
 
+#if !defined(HAVE_SEM_INIT)
 #if defined(_SEM_SEMUN_UNDEFINED)
 union semun {
 	int val;
@@ -108,7 +115,9 @@
 	struct seminfo *__buf;
 };
 #endif
+#endif
 
+
 enum conn_state {
 	CONN_STATE_THREAD_INACTIVE = 0,
 	CONN_STATE_THREAD_ACTIVE = 1,
@@ -126,9 +135,10 @@
 	enum conn_state state;
 	int notify_flow_control_enabled;
 	int refcount;
-	key_t shmkey;
+#if !defined(HAVE_SEM_INIT)
 	key_t semkey;
 	int semid;
+#endif
 	unsigned int pending_semops;
 	pthread_mutex_t mutex;
 	struct control_buffer *control_buffer;
@@ -159,6 +169,32 @@
 static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
 		      int locked);
 
+static void sem_post_exit_thread (struct conn_info *conn_info)
+{
+#if !defined(HAVE_SEM_INIT)
+	struct sembuf sop;
+#endif
+	int res;
+
+#if defined(HAVE_SEM_INIT)
+retry_semop:
+	res = sem_post (&conn_info->control_buffer->sem0);
+	if (res == -1 && errno == EINTR) {
+		goto retry_semop;
+	}
+#else
+	sop.sem_num = 1;
+	sop.sem_op = 1;
+	sop.sem_flg = 0;
+
+retry_semop:
+	res = semop (conn_info->semid, &sop, 1);
+	if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
+		goto retry_semop;
+	}
+#endif
+}
+
 static int
 memory_map (
 	const char *path,
@@ -392,7 +428,7 @@
 	}
 
 	if (conn_info->state == CONN_STATE_THREAD_ACTIVE) {
-		pthread_kill (conn_info->thread, SIGUSR1);
+		sem_post_exit_thread (conn_info);
 		return (0);
 	}
 
@@ -419,13 +455,19 @@
 	list_del (&conn_info->list);
 	pthread_mutex_unlock (&conn_info->mutex);
 
+#if defined(HAVE_SEM_INIT)
+	sem_destroy (&conn_info->control_buffer->sem0);
+	sem_destroy (&conn_info->control_buffer->sem1);
+	sem_destroy (&conn_info->control_buffer->sem2);
+#else
+	semctl (conn_info->semid, 0, IPC_RMID);
+#endif
 	/*
 	 * Destroy shared memory segment and semaphore
 	 */
 	res = munmap ((void *)conn_info->control_buffer, conn_info->control_size);
 	res = munmap ((void *)conn_info->request_buffer, conn_info->request_size);
 	res = munmap ((void *)conn_info->response_buffer, conn_info->response_size);
-	semctl (conn_info->semid, 0, IPC_RMID);
 
 	/*
 	 * Free allocated data needed to retry exiting library IPC connection
@@ -521,7 +563,9 @@
 static void *pthread_ipc_consumer (void *conn)
 {
 	struct conn_info *conn_info = (struct conn_info *)conn;
+#if !defined(HAVE_SEM_INIT)
 	struct sembuf sop;
+#endif
 	int res;
 	coroipc_request_header_t *header;
 	coroipc_response_header_t coroipc_response_header;
@@ -536,6 +580,18 @@
 #endif
 
 	for (;;) {
+#if defined(HAVE_SEM_INIT)
+retry_semop:
+		res = sem_wait (&conn_info->control_buffer->sem0);
+		if (ipc_thread_active (conn_info) == 0) {
+			coroipcs_refcount_dec (conn_info);
+			pthread_exit (0);
+		}
+		if ((res == -1) && (errno == EINTR)) {
+			goto retry_semop;
+		}
+#else
+
 		sop.sem_num = 0;
 		sop.sem_op = -1;
 		sop.sem_flg = 0;
@@ -552,6 +608,7 @@
 			coroipcs_refcount_dec (conn_info);
 			pthread_exit (0);
 		}
+#endif
 
 		zerocopy_operations_process (conn_info, &header, &new_message);
 		/*
@@ -744,6 +801,8 @@
 
 static void ipc_disconnect (struct conn_info *conn_info)
 {
+	int res;
+
 	if (conn_info->state == CONN_STATE_THREAD_INACTIVE) {
 		conn_info->state = CONN_STATE_DISCONNECT_INACTIVE;
 		return;
@@ -755,7 +814,7 @@
 	conn_info->state = CONN_STATE_THREAD_REQUEST_EXIT;
 	pthread_mutex_unlock (&conn_info->mutex);
 
-	pthread_kill (conn_info->thread, SIGUSR1);
+	sem_post_exit_thread (conn_info);
 }
 
 static int conn_info_create (int fd)
@@ -874,6 +933,14 @@
 
 		conn_info = list_entry (list, struct conn_info, list);
 
+#if defined(HAVE_SEM_INIT)
+		sem_destroy (&conn_info->control_buffer->sem0);
+		sem_destroy (&conn_info->control_buffer->sem1);
+		sem_destroy (&conn_info->control_buffer->sem2);
+#else
+		semctl (conn_info->semid, 0, IPC_RMID);
+#endif
+
 		/*
 		 * Unmap memory segments
 		 */
@@ -886,9 +953,7 @@
 		res = circular_memory_unmap (conn_info->dispatch_buffer,
 			conn_info->dispatch_size);
 
-		semctl (conn_info->semid, 0, IPC_RMID);
-
-		pthread_kill (conn_info->thread, SIGUSR1);
+		sem_post_exit_thread (conn_info);
 	}
 }
 
@@ -905,10 +970,20 @@
 int coroipcs_response_send (void *conn, const void *msg, size_t mlen)
 {
 	struct conn_info *conn_info = (struct conn_info *)conn;
+#if !defined(HAVE_SEM_INIT)
 	struct sembuf sop;
+#endif
 	int res;
 
 	memcpy (conn_info->response_buffer, msg, mlen);
+
+#if defined(HAVE_SEM_INIT)
+retry_semop:
+	res = sem_post (&conn_info->control_buffer->sem1);
+	if ((res == -1) && (errno == EINTR)) {
+		goto retry_semop;
+	}
+#else
 	sop.sem_num = 1;
 	sop.sem_op = 1;
 	sop.sem_flg = 0;
@@ -921,13 +996,16 @@
 	if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
 		return (0);
 	}
+#endif
 	return (0);
 }
 
 int coroipcs_response_iov_send (void *conn, const struct iovec *iov, unsigned int iov_len)
 {
 	struct conn_info *conn_info = (struct conn_info *)conn;
+#if !defined(HAVE_SEM_INIT)
 	struct sembuf sop;
+#endif
 	int res;
 	int write_idx = 0;
 	int i;
@@ -938,6 +1016,13 @@
 		write_idx += iov[i].iov_len;
 	}
 
+#if defined(HAVE_SEM_INIT)
+retry_semop:
+	res = sem_post (&conn_info->control_buffer->sem1);
+	if ((res == -1) && (errno == EINTR)) {
+		goto retry_semop;
+	}
+#else
 	sop.sem_num = 1;
 	sop.sem_op = 1;
 	sop.sem_flg = 0;
@@ -950,6 +1035,7 @@
 	if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
 		return (0);
 	}
+#endif
 	return (0);
 }
 
@@ -984,7 +1070,9 @@
 		      int locked)
 {
 	struct conn_info *conn_info = (struct conn_info *)conn;
+#if !defined(HAVE_SEM_INIT)
 	struct sembuf sop;
+#endif
 	int res;
 	int i;
 	char buf;
@@ -1009,6 +1097,13 @@
 	if (res == -1) {
 		ipc_disconnect (conn_info);
 	}
+#if defined(HAVE_SEM_INIT)
+retry_semop:
+	res = sem_post (&conn_info->control_buffer->sem2);
+	if ((res == -1) && (errno == EINTR)) {
+		goto retry_semop;
+	}
+#else
 	sop.sem_num = 2;
 	sop.sem_op = 1;
 	sop.sem_flg = 0;
@@ -1021,6 +1116,7 @@
 	if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
 		return;
 	}
+#endif
 }
 
 static void outq_flush (struct conn_info *conn_info) {
@@ -1062,8 +1158,10 @@
 {
 	mar_req_priv_change req_priv_change;
 	unsigned int res;
+#if !defined(HAVE_SEM_INIT)
 	union semun semun;
 	struct semid_ds ipc_set;
+#endif
 	int i;
 
 retry_recv:
@@ -1087,6 +1185,7 @@
 	}
 #endif
 
+#if !defined(HAVE_SEM_INIT)
 	ipc_set.sem_perm.uid = req_priv_change.euid;
 	ipc_set.sem_perm.gid = req_priv_change.egid;
 	ipc_set.sem_perm.mode = 0600;
@@ -1099,6 +1198,7 @@
 			return (-1);
 		}
 	}
+#endif
 	return (0);
 }
 
@@ -1293,7 +1393,9 @@
 			return (0);
 		}
 
+#if !defined(HAVE_SEM_INIT)
 		conn_info->semkey = req_setup->semkey;
+#endif
 		res = memory_map (
 			req_setup->control_file,
 			req_setup->control_size,
@@ -1323,7 +1425,9 @@
 		conn_info->notify_flow_control_enabled = 0;
 		conn_info->setup_bytes_read = 0;
 
+#if !defined(HAVE_SEM_INIT)
 		conn_info->semid = semget (conn_info->semkey, 3, 0600);
+#endif
 		conn_info->pending_semops = 0;
 
 		/*
Index: configure.ac
===================================================================
--- configure.ac	(revision 2374)
+++ configure.ac	(working copy)
@@ -105,7 +105,7 @@
 		memset mkdir scandir select socket strcasecmp strchr strdup \
 		strerror strrchr strspn strstr pthread_spin_lock \
 		pthread_spin_unlock pthread_setschedparam \
-		sched_get_priority_max sched_setscheduler])
+		sched_get_priority_max sched_setscheduler sem_init])
 
 AC_CONFIG_FILES([Makefile
 		 exec/Makefile
Index: lib/coroipcc.c
===================================================================
--- lib/coroipcc.c	(revision 2374)
+++ lib/coroipcc.c	(working copy)
@@ -55,7 +55,11 @@
 #include <netinet/in.h>
 #include <assert.h>
 #include <sys/shm.h>
+#ifdef HAVE_SEM_INIT
+#include <semaphore.h>
+#else
 #include <sys/sem.h>
+#endif
 #include <sys/mman.h>
 
 #include <corosync/corotypes.h>
@@ -68,8 +72,9 @@
 
 struct ipc_instance {
 	int fd;
-	int shmid;
+#ifndef HAVE_SEM_INIT
 	int semid;
+#endif
 	int flow_control_state;
 	struct control_buffer *control_buffer;
 	char *request_buffer;
@@ -258,6 +263,7 @@
 	return (0);
 }
 
+#ifndef HAVE_SEM_INIT
 #if defined(_SEM_SEMUN_UNDEFINED)
 union semun {
         int val;
@@ -266,6 +272,7 @@
         struct seminfo *__buf;
 };
 #endif
+#endif
 
 static int
 circular_memory_map (char *path, const char *file, void **buf, size_t bytes)
@@ -391,7 +398,10 @@
 	const struct iovec *iov,
 	unsigned int iov_len)
 {
+#ifndef HAVE_SEM_INIT
 	struct sembuf sop;
+#endif
+
 	int i;
 	int res;
 	int req_buffer_idx = 0;
@@ -402,6 +412,10 @@
 			iov[i].iov_len);
 		req_buffer_idx += iov[i].iov_len;
 	}
+
+#ifdef HAVE_SEM_INIT
+	res = sem_post (&ipc_instance->control_buffer->sem0);
+#else 
 	/*
 	 * Signal semaphore #0 indicting a new message from client
 	 * to server request queue
@@ -422,6 +436,7 @@
 	if (res == -1) {
 		return (CS_ERR_LIBRARY);
 	}
+#endif
 	return (CS_OK);
 }
 
@@ -431,10 +446,15 @@
 	void *res_msg,
 	size_t res_len)
 {
+#ifndef HAVE_SEM_INIT
 	struct sembuf sop;
+#endif
 	coroipc_response_header_t *response_header;
 	int res;
 
+#ifdef HAVE_SEM_INIT
+	res = sem_wait (&ipc_instance->control_buffer->sem1);
+#else
 	/*
 	 * Wait for semaphore #1 indicating a new message from server
 	 * to client in the response queue
@@ -455,6 +475,7 @@
 	if (res == -1) {
 		return (CS_ERR_LIBRARY);
 	}
+#endif
 
 	response_header = (coroipc_response_header_t *)ipc_instance->response_buffer;
 	if (response_header->error == CS_ERR_TRY_AGAIN) {
@@ -470,9 +491,14 @@
 	struct ipc_instance *ipc_instance,
 	void **res_msg)
 {
+#ifndef HAVE_SEM_INIT
 	struct sembuf sop;
+#endif
 	int res;
 
+#ifdef HAVE_SEM_INIT
+	res = sem_wait (&ipc_instance->control_buffer->sem1);
+#else
 	/*
 	 * Wait for semaphore #1 indicating a new message from server
 	 * to client in the response queue
@@ -493,6 +519,7 @@
 	if (res == -1) {
 		return (CS_ERR_LIBRARY);
 	}
+#endif
 
 	*res_msg = (char *)ipc_instance->response_buffer;
 	return (CS_OK);
@@ -515,11 +542,13 @@
 	struct sockaddr_un address;
 	cs_error_t res;
 	struct ipc_instance *ipc_instance;
+#ifndef HAVE_SEM_INIT
 	key_t semkey = 0;
+	union semun semun;
+#endif
 	int sys_res;
 	mar_req_setup_t req_setup;
 	mar_res_setup_t res_setup;
-	union semun semun;
 	char control_map_path[128];
 	char request_map_path[128];
 	char response_map_path[128];
@@ -568,6 +597,36 @@
 		return (CS_ERR_TRY_AGAIN);
 	}
 
+	res = memory_map (
+		control_map_path,
+		"control_buffer-XXXXXX",
+		(void *)&ipc_instance->control_buffer,
+		8192);
+
+	res = memory_map (
+		request_map_path,
+		"request_buffer-XXXXXX",
+		(void *)&ipc_instance->request_buffer,
+		request_size);
+
+	res = memory_map (
+		response_map_path,
+		"response_buffer-XXXXXX",
+		(void *)&ipc_instance->response_buffer,
+		response_size);
+
+	res = circular_memory_map (
+		dispatch_map_path,
+		"dispatch_buffer-XXXXXX",
+		(void *)&ipc_instance->dispatch_buffer,
+		dispatch_size);
+
+#ifdef HAVE_SEM_INIT
+	sem_init (&ipc_instance->control_buffer->sem0, 1, 0);
+	sem_init (&ipc_instance->control_buffer->sem1, 1, 0);
+	sem_init (&ipc_instance->control_buffer->sem2, 1, 0);
+#else
+
 	/*
 	 * Allocate a semaphore segment
 	 */
@@ -600,31 +659,8 @@
 	if (res != 0) {
 		goto res_exit;
 	}
+#endif
 
-	res = memory_map (
-		control_map_path,
-		"control_buffer-XXXXXX",
-		(void *)&ipc_instance->control_buffer,
-		8192);
-
-	res = memory_map (
-		request_map_path,
-		"request_buffer-XXXXXX",
-		(void *)&ipc_instance->request_buffer,
-		request_size);
-
-	res = memory_map (
-		response_map_path,
-		"response_buffer-XXXXXX",
-		(void *)&ipc_instance->response_buffer,
-		response_size);
-
-	res = circular_memory_map (
-		dispatch_map_path,
-		"dispatch_buffer-XXXXXX",
-		(void *)&ipc_instance->dispatch_buffer,
-		dispatch_size);
-
 	/*
 	 * Initialize IPC setup message
 	 */
@@ -637,7 +673,10 @@
 	req_setup.request_size = request_size;
 	req_setup.response_size = response_size;
 	req_setup.dispatch_size = dispatch_size;
+
+#ifndef HAVE_SEM_INIT
 	req_setup.semkey = semkey;
+#endif
 
 	res = socket_send (request_fd, &req_setup, sizeof (mar_req_setup_t));
 	if (res != CS_OK) {
@@ -668,8 +707,10 @@
 
 res_exit:
 	close (request_fd);
+#ifndef HAVE_SEM_INIT
 	if (ipc_instance->semid > 0)
 		semctl (ipc_instance->semid, 0, IPC_RMID);
+#endif
 	return (res_setup.error);
 }
 
@@ -821,7 +862,9 @@
 cs_error_t
 coroipcc_dispatch_put (hdb_handle_t handle)
 {
+#ifndef HAVE_SEM_INIT
 	struct sembuf sop;
+#endif
 	coroipc_response_header_t *header;
 	struct ipc_instance *ipc_instance;
 	int res;
@@ -832,6 +875,9 @@
 	if (res != CS_OK) {
 		return (res);
 	}
+#ifdef HAVE_SEM_INIT
+	sem_wait (&ipc_instance->control_buffer->sem2);
+#else
 	sop.sem_num = 2;
 	sop.sem_op = -1;
 	sop.sem_flg = 0;
@@ -847,6 +893,7 @@
 	if (res == -1) {
 		return (CS_ERR_LIBRARY);
 	}
+#endif
 
 	addr = ipc_instance->dispatch_buffer;
 


_______________________________________________
Openais mailing list
Openais@lists.linux-foundation.org
https://lists.linux-foundation.org/mailman/listinfo/openais

[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic