[prev in list] [next in list] [prev in thread] [next in thread]
List: openais
Subject: [Openais] corosync trunk - use posix unnamed semaphores in shared
From: Steven Dake <sdake () redhat ! com>
Date: 2009-07-30 1:56:02
Message-ID: 1248918962.2722.98.camel () localhost ! localdomain
[Download RAW message or body]
This patch switches from SysV semaphores to POSIX unnamed process-shared
semaphores on platforms that support them.
The checking of OS feature support needs a bit of work.
This patch also removes the need to handle SIGUSR1 in the library
for pthread_kill. Instead, sem_post is used to signal that a connection
has ended.
The main benefits beyond performance are that there is no upper limit on
the number of IPC connections and that no semaphores are leaked when a
process is killed unexpectedly.
Regards
-steve
["corosync-trunk-use-posix-semaphores.patch" (corosync-trunk-use-posix-semaphores.patch)]
Index: include/corosync/coroipc_ipc.h
===================================================================
--- include/corosync/coroipc_ipc.h (revision 2374)
+++ include/corosync/coroipc_ipc.h (working copy)
@@ -34,6 +34,12 @@
#ifndef COROIPC_IPC_H_DEFINED
#define COROIPC_IPC_H_DEFINED
+#include "config.h"
+
+#ifdef HAVE_SEM_INIT
+#include <semaphore.h>
+#endif
+
enum req_init_types {
MESSAGE_REQ_RESPONSE_INIT = 0,
MESSAGE_REQ_DISPATCH_INIT = 1
@@ -45,6 +51,11 @@
struct control_buffer {
unsigned int read;
unsigned int write;
+#ifdef HAVE_SEM_INIT
+ sem_t sem0;
+ sem_t sem1;
+ sem_t sem2;
+#endif
};
enum res_init_types {
Index: exec/coroipcs.c
===================================================================
--- exec/coroipcs.c (revision 2374)
+++ exec/coroipcs.c (working copy)
@@ -67,7 +67,13 @@
#include <string.h>
#include <sys/shm.h>
+
+#if defined(HAVE_SEM_INIT)
+#include <semaphore.h>
+#else
#include <sys/sem.h>
+#endif
+
#include <corosync/corotypes.h>
#include <corosync/list.h>
@@ -100,6 +106,7 @@
size_t size;
};
+#if !defined(HAVE_SEM_INIT)
#if defined(_SEM_SEMUN_UNDEFINED)
union semun {
int val;
@@ -108,7 +115,9 @@
struct seminfo *__buf;
};
#endif
+#endif
+
enum conn_state {
CONN_STATE_THREAD_INACTIVE = 0,
CONN_STATE_THREAD_ACTIVE = 1,
@@ -126,9 +135,10 @@
enum conn_state state;
int notify_flow_control_enabled;
int refcount;
- key_t shmkey;
+#if !defined(HAVE_SEM_INIT)
key_t semkey;
int semid;
+#endif
unsigned int pending_semops;
pthread_mutex_t mutex;
struct control_buffer *control_buffer;
@@ -159,6 +169,32 @@
static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
int locked);
+static void sem_post_exit_thread (struct conn_info *conn_info)
+{
+#if !defined(HAVE_SEM_INIT)
+ struct sembuf sop;
+#endif
+ int res;
+
+#if defined(HAVE_SEM_INIT)
+retry_semop:
+ res = sem_post (&conn_info->control_buffer->sem0);
+ if (res == -1 && errno == EINTR) {
+ goto retry_semop;
+ }
+#else
+ sop.sem_num = 1;
+ sop.sem_op = 1;
+ sop.sem_flg = 0;
+
+retry_semop:
+ res = semop (conn_info->semid, &sop, 1);
+ if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
+ goto retry_semop;
+ }
+#endif
+}
+
static int
memory_map (
const char *path,
@@ -392,7 +428,7 @@
}
if (conn_info->state == CONN_STATE_THREAD_ACTIVE) {
- pthread_kill (conn_info->thread, SIGUSR1);
+ sem_post_exit_thread (conn_info);
return (0);
}
@@ -419,13 +455,19 @@
list_del (&conn_info->list);
pthread_mutex_unlock (&conn_info->mutex);
+#if defined(HAVE_SEM_INIT)
+ sem_destroy (&conn_info->control_buffer->sem0);
+ sem_destroy (&conn_info->control_buffer->sem1);
+ sem_destroy (&conn_info->control_buffer->sem2);
+#else
+ semctl (conn_info->semid, 0, IPC_RMID);
+#endif
/*
* Destroy shared memory segment and semaphore
*/
res = munmap ((void *)conn_info->control_buffer, conn_info->control_size);
res = munmap ((void *)conn_info->request_buffer, conn_info->request_size);
res = munmap ((void *)conn_info->response_buffer, conn_info->response_size);
- semctl (conn_info->semid, 0, IPC_RMID);
/*
* Free allocated data needed to retry exiting library IPC connection
@@ -521,7 +563,9 @@
static void *pthread_ipc_consumer (void *conn)
{
struct conn_info *conn_info = (struct conn_info *)conn;
+#if !defined(HAVE_SEM_INIT)
struct sembuf sop;
+#endif
int res;
coroipc_request_header_t *header;
coroipc_response_header_t coroipc_response_header;
@@ -536,6 +580,18 @@
#endif
for (;;) {
+#if defined(HAVE_SEM_INIT)
+retry_semop:
+ res = sem_wait (&conn_info->control_buffer->sem0);
+ if (ipc_thread_active (conn_info) == 0) {
+ coroipcs_refcount_dec (conn_info);
+ pthread_exit (0);
+ }
+ if ((res == -1) && (errno == EINTR)) {
+ goto retry_semop;
+ }
+#else
+
sop.sem_num = 0;
sop.sem_op = -1;
sop.sem_flg = 0;
@@ -552,6 +608,7 @@
coroipcs_refcount_dec (conn_info);
pthread_exit (0);
}
+#endif
zerocopy_operations_process (conn_info, &header, &new_message);
/*
@@ -744,6 +801,8 @@
static void ipc_disconnect (struct conn_info *conn_info)
{
+ int res;
+
if (conn_info->state == CONN_STATE_THREAD_INACTIVE) {
conn_info->state = CONN_STATE_DISCONNECT_INACTIVE;
return;
@@ -755,7 +814,7 @@
conn_info->state = CONN_STATE_THREAD_REQUEST_EXIT;
pthread_mutex_unlock (&conn_info->mutex);
- pthread_kill (conn_info->thread, SIGUSR1);
+ sem_post_exit_thread (conn_info);
}
static int conn_info_create (int fd)
@@ -874,6 +933,14 @@
conn_info = list_entry (list, struct conn_info, list);
+#if defined(HAVE_SEM_INIT)
+ sem_destroy (&conn_info->control_buffer->sem0);
+ sem_destroy (&conn_info->control_buffer->sem1);
+ sem_destroy (&conn_info->control_buffer->sem2);
+#else
+ semctl (conn_info->semid, 0, IPC_RMID);
+#endif
+
/*
* Unmap memory segments
*/
@@ -886,9 +953,7 @@
res = circular_memory_unmap (conn_info->dispatch_buffer,
conn_info->dispatch_size);
- semctl (conn_info->semid, 0, IPC_RMID);
-
- pthread_kill (conn_info->thread, SIGUSR1);
+ sem_post_exit_thread (conn_info);
}
}
@@ -905,10 +970,20 @@
int coroipcs_response_send (void *conn, const void *msg, size_t mlen)
{
struct conn_info *conn_info = (struct conn_info *)conn;
+#if !defined(HAVE_SEM_INIT)
struct sembuf sop;
+#endif
int res;
memcpy (conn_info->response_buffer, msg, mlen);
+
+#if defined(HAVE_SEM_INIT)
+retry_semop:
+ res = sem_post (&conn_info->control_buffer->sem1);
+ if ((res == -1) && (errno == EINTR)) {
+ goto retry_semop;
+ }
+#else
sop.sem_num = 1;
sop.sem_op = 1;
sop.sem_flg = 0;
@@ -921,13 +996,16 @@
if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
return (0);
}
+#endif
return (0);
}
int coroipcs_response_iov_send (void *conn, const struct iovec *iov, unsigned int iov_len)
{
struct conn_info *conn_info = (struct conn_info *)conn;
+#if !defined(HAVE_SEM_INIT)
struct sembuf sop;
+#endif
int res;
int write_idx = 0;
int i;
@@ -938,6 +1016,13 @@
write_idx += iov[i].iov_len;
}
+#if defined(HAVE_SEM_INIT)
+retry_semop:
+ res = sem_post (&conn_info->control_buffer->sem1);
+ if ((res == -1) && (errno == EINTR)) {
+ goto retry_semop;
+ }
+#else
sop.sem_num = 1;
sop.sem_op = 1;
sop.sem_flg = 0;
@@ -950,6 +1035,7 @@
if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
return (0);
}
+#endif
return (0);
}
@@ -984,7 +1070,9 @@
int locked)
{
struct conn_info *conn_info = (struct conn_info *)conn;
+#if !defined(HAVE_SEM_INIT)
struct sembuf sop;
+#endif
int res;
int i;
char buf;
@@ -1009,6 +1097,13 @@
if (res == -1) {
ipc_disconnect (conn_info);
}
+#if defined(HAVE_SEM_INIT)
+retry_semop:
+ res = sem_post (&conn_info->control_buffer->sem2);
+ if ((res == -1) && (errno == EINTR)) {
+ goto retry_semop;
+ }
+#else
sop.sem_num = 2;
sop.sem_op = 1;
sop.sem_flg = 0;
@@ -1021,6 +1116,7 @@
if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
return;
}
+#endif
}
static void outq_flush (struct conn_info *conn_info) {
@@ -1062,8 +1158,10 @@
{
mar_req_priv_change req_priv_change;
unsigned int res;
+#if !defined(HAVE_SEM_INIT)
union semun semun;
struct semid_ds ipc_set;
+#endif
int i;
retry_recv:
@@ -1087,6 +1185,7 @@
}
#endif
+#if !defined(HAVE_SEM_INIT)
ipc_set.sem_perm.uid = req_priv_change.euid;
ipc_set.sem_perm.gid = req_priv_change.egid;
ipc_set.sem_perm.mode = 0600;
@@ -1099,6 +1198,7 @@
return (-1);
}
}
+#endif
return (0);
}
@@ -1293,7 +1393,9 @@
return (0);
}
+#if !defined(HAVE_SEM_INIT)
conn_info->semkey = req_setup->semkey;
+#endif
res = memory_map (
req_setup->control_file,
req_setup->control_size,
@@ -1323,7 +1425,9 @@
conn_info->notify_flow_control_enabled = 0;
conn_info->setup_bytes_read = 0;
+#if !defined(HAVE_SEM_INIT)
conn_info->semid = semget (conn_info->semkey, 3, 0600);
+#endif
conn_info->pending_semops = 0;
/*
Index: configure.ac
===================================================================
--- configure.ac (revision 2374)
+++ configure.ac (working copy)
@@ -105,7 +105,7 @@
memset mkdir scandir select socket strcasecmp strchr strdup \
strerror strrchr strspn strstr pthread_spin_lock \
pthread_spin_unlock pthread_setschedparam \
- sched_get_priority_max sched_setscheduler])
+ sched_get_priority_max sched_setscheduler sem_init])
AC_CONFIG_FILES([Makefile
exec/Makefile
Index: lib/coroipcc.c
===================================================================
--- lib/coroipcc.c (revision 2374)
+++ lib/coroipcc.c (working copy)
@@ -55,7 +55,11 @@
#include <netinet/in.h>
#include <assert.h>
#include <sys/shm.h>
+#ifdef HAVE_SEM_INIT
+#include <semaphore.h>
+#else
#include <sys/sem.h>
+#endif
#include <sys/mman.h>
#include <corosync/corotypes.h>
@@ -68,8 +72,9 @@
struct ipc_instance {
int fd;
- int shmid;
+#ifndef HAVE_SEM_INIT
int semid;
+#endif
int flow_control_state;
struct control_buffer *control_buffer;
char *request_buffer;
@@ -258,6 +263,7 @@
return (0);
}
+#ifndef HAVE_SEM_INIT
#if defined(_SEM_SEMUN_UNDEFINED)
union semun {
int val;
@@ -266,6 +272,7 @@
struct seminfo *__buf;
};
#endif
+#endif
static int
circular_memory_map (char *path, const char *file, void **buf, size_t bytes)
@@ -391,7 +398,10 @@
const struct iovec *iov,
unsigned int iov_len)
{
+#ifndef HAVE_SEM_INIT
struct sembuf sop;
+#endif
+
int i;
int res;
int req_buffer_idx = 0;
@@ -402,6 +412,10 @@
iov[i].iov_len);
req_buffer_idx += iov[i].iov_len;
}
+
+#ifdef HAVE_SEM_INIT
+ res = sem_post (&ipc_instance->control_buffer->sem0);
+#else
/*
* Signal semaphore #0 indicting a new message from client
* to server request queue
@@ -422,6 +436,7 @@
if (res == -1) {
return (CS_ERR_LIBRARY);
}
+#endif
return (CS_OK);
}
@@ -431,10 +446,15 @@
void *res_msg,
size_t res_len)
{
+#ifndef HAVE_SEM_INIT
struct sembuf sop;
+#endif
coroipc_response_header_t *response_header;
int res;
+#ifdef HAVE_SEM_INIT
+ res = sem_wait (&ipc_instance->control_buffer->sem1);
+#else
/*
* Wait for semaphore #1 indicating a new message from server
* to client in the response queue
@@ -455,6 +475,7 @@
if (res == -1) {
return (CS_ERR_LIBRARY);
}
+#endif
response_header = (coroipc_response_header_t *)ipc_instance->response_buffer;
if (response_header->error == CS_ERR_TRY_AGAIN) {
@@ -470,9 +491,14 @@
struct ipc_instance *ipc_instance,
void **res_msg)
{
+#ifndef HAVE_SEM_INIT
struct sembuf sop;
+#endif
int res;
+#ifdef HAVE_SEM_INIT
+ res = sem_wait (&ipc_instance->control_buffer->sem1);
+#else
/*
* Wait for semaphore #1 indicating a new message from server
* to client in the response queue
@@ -493,6 +519,7 @@
if (res == -1) {
return (CS_ERR_LIBRARY);
}
+#endif
*res_msg = (char *)ipc_instance->response_buffer;
return (CS_OK);
@@ -515,11 +542,13 @@
struct sockaddr_un address;
cs_error_t res;
struct ipc_instance *ipc_instance;
+#ifndef HAVE_SEM_INIT
key_t semkey = 0;
+ union semun semun;
+#endif
int sys_res;
mar_req_setup_t req_setup;
mar_res_setup_t res_setup;
- union semun semun;
char control_map_path[128];
char request_map_path[128];
char response_map_path[128];
@@ -568,6 +597,36 @@
return (CS_ERR_TRY_AGAIN);
}
+ res = memory_map (
+ control_map_path,
+ "control_buffer-XXXXXX",
+ (void *)&ipc_instance->control_buffer,
+ 8192);
+
+ res = memory_map (
+ request_map_path,
+ "request_buffer-XXXXXX",
+ (void *)&ipc_instance->request_buffer,
+ request_size);
+
+ res = memory_map (
+ response_map_path,
+ "response_buffer-XXXXXX",
+ (void *)&ipc_instance->response_buffer,
+ response_size);
+
+ res = circular_memory_map (
+ dispatch_map_path,
+ "dispatch_buffer-XXXXXX",
+ (void *)&ipc_instance->dispatch_buffer,
+ dispatch_size);
+
+#ifdef HAVE_SEM_INIT
+ sem_init (&ipc_instance->control_buffer->sem0, 1, 0);
+ sem_init (&ipc_instance->control_buffer->sem1, 1, 0);
+ sem_init (&ipc_instance->control_buffer->sem2, 1, 0);
+#else
+
/*
* Allocate a semaphore segment
*/
@@ -600,31 +659,8 @@
if (res != 0) {
goto res_exit;
}
+#endif
- res = memory_map (
- control_map_path,
- "control_buffer-XXXXXX",
- (void *)&ipc_instance->control_buffer,
- 8192);
-
- res = memory_map (
- request_map_path,
- "request_buffer-XXXXXX",
- (void *)&ipc_instance->request_buffer,
- request_size);
-
- res = memory_map (
- response_map_path,
- "response_buffer-XXXXXX",
- (void *)&ipc_instance->response_buffer,
- response_size);
-
- res = circular_memory_map (
- dispatch_map_path,
- "dispatch_buffer-XXXXXX",
- (void *)&ipc_instance->dispatch_buffer,
- dispatch_size);
-
/*
* Initialize IPC setup message
*/
@@ -637,7 +673,10 @@
req_setup.request_size = request_size;
req_setup.response_size = response_size;
req_setup.dispatch_size = dispatch_size;
+
+#ifndef HAVE_SEM_INIT
req_setup.semkey = semkey;
+#endif
res = socket_send (request_fd, &req_setup, sizeof (mar_req_setup_t));
if (res != CS_OK) {
@@ -668,8 +707,10 @@
res_exit:
close (request_fd);
+#ifndef HAVE_SEM_INIT
if (ipc_instance->semid > 0)
semctl (ipc_instance->semid, 0, IPC_RMID);
+#endif
return (res_setup.error);
}
@@ -821,7 +862,9 @@
cs_error_t
coroipcc_dispatch_put (hdb_handle_t handle)
{
+#ifndef HAVE_SEM_INIT
struct sembuf sop;
+#endif
coroipc_response_header_t *header;
struct ipc_instance *ipc_instance;
int res;
@@ -832,6 +875,9 @@
if (res != CS_OK) {
return (res);
}
+#ifdef HAVE_SEM_INIT
+ sem_wait (&ipc_instance->control_buffer->sem2);
+#else
sop.sem_num = 2;
sop.sem_op = -1;
sop.sem_flg = 0;
@@ -847,6 +893,7 @@
if (res == -1) {
return (CS_ERR_LIBRARY);
}
+#endif
addr = ipc_instance->dispatch_buffer;
_______________________________________________
Openais mailing list
Openais@lists.linux-foundation.org
https://lists.linux-foundation.org/mailman/listinfo/openais
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic