[prev in list] [next in list] [prev in thread] [next in thread]
List: openais
Subject: [Openais] ipc-18
From: Steven Dake <sdake () redhat ! com>
Date: 2009-01-31 20:41:50
Message-ID: 1233434510.9117.49.camel () sdake-laptop
[Download RAW message or body]
latest ipc patch for whitetank with a few bugfixes.
Regards
-steve
["ipc-18.patch" (ipc-18.patch)]
Index: test/cpgbench.c
===================================================================
--- test/cpgbench.c (revision 1687)
+++ test/cpgbench.c (working copy)
@@ -1,4 +1,5 @@
#define _BSD_SOURCE
+#include <assert.h>
/*
* Copyright (c) 2006-2007 Red Hat, Inc.
*
@@ -92,6 +93,7 @@
struct iovec iov;
unsigned int res;
cpg_flow_control_state_t flow_control_state;
+ unsigned int j = 0;
alarm_notice = 0;
iov.iov_base = data;
@@ -105,19 +107,24 @@
/*
* Test checkpoint write
*/
+for (j = 0; j < 400; j++) {
cpg_flow_control_state_get (handle, &flow_control_state);
if (flow_control_state == CPG_FLOW_CONTROL_DISABLED) {
retry:
res = cpg_mcast_joined (handle, CPG_TYPE_AGREED, &iov, 1);
if (res == CPG_ERR_TRY_AGAIN) {
- goto retry;
+ break;
}
+ } else {
+ break;
}
+}
res = cpg_dispatch (handle, CPG_DISPATCH_ALL);
if (res != CPG_OK) {
printf ("cpg dispatch returned error %d\n", res);
exit (1);
}
+ assert (res == CPG_OK);
} while (alarm_notice == 0);
gettimeofday (&tv2, NULL);
timersub (&tv2, &tv1, &tv_elapsed);
@@ -144,10 +151,11 @@
int main (void) {
cpg_handle_t handle;
- unsigned int size = 1;
+ unsigned int size;
int i;
unsigned int res;
+ size = 1000;
signal (SIGALRM, sigalrm_handler);
res = cpg_initialize (&handle, &callbacks);
if (res != CPG_OK) {
Index: test/Makefile
===================================================================
--- test/Makefile (revision 1687)
+++ test/Makefile (working copy)
@@ -44,15 +44,17 @@
testamf4.c testamf5.c testamf6.c testamfth.c \
testckpt.c ckptstress.c ckptbench.c \
ckptbenchth.c testevt.c testevs.c evsbench.c \
- subscription.c publish.c evtbench.c \
- sa_error.c unlink.c testclm2.c testlck.c testmsg.c
+ evsverify.c subscription.c publish.c evtbench.c \
+ sa_error.c unlink.c testclm2.c testlck.c
all: testclm testamf1 \
testckpt ckptstress ckptbench \
ckptbenchth ckpt-rd ckpt-wr ckpt-overload-exit testevt testevs \
- evsbench subscription publish evtbench unlink testclm2 testlck \
- testmsg testcpg testcpg2 cpgbench openais-cfgtool
+ evsbench evsverify subscription publish evtbench unlink testclm2 \
+ testlck testcpg testcpg2 cpgbench openais-cfgtool
+# testmsg testcpg testcpg2 cpgbench openais-cfgtool
+
testtimer: testtimer.o $(LIBRARIES)
$(CC) $(LDFLAGS) -o testtimer testtimer.o ../exec/timer.o
@@ -92,6 +94,9 @@
evsbench: evsbench.o $(LIBS)
$(CC) $(LDFLAGS) -o evsbench evsbench.o $(LIBS)
+evsverify: evsverify.o $(LIBS)
+ $(CC) $(LDFLAGS) -o evsverify evsverify.o $(LIBS) ../exec/crypto.o
+
testclm: testclm.o $(LIBRARIES)
$(CC) $(LDFLAGS) -o testclm testclm.o $(LIBS)
@@ -137,9 +142,6 @@
testlck: testlck.o $(LIBRARIES)
$(CC) $(LDFLAGS) -o testlck testlck.o $(LIBS)
-testmsg: testmsg.o $(LIBRARIES)
- $(CC) $(LDFLAGS) -o testmsg testmsg.o $(LIBS)
-
testcpg: testcpg.o $(LIBRARIES)
$(CC) $(LDFLAGS) -o testcpg testcpg.o $(LIBS)
@@ -156,9 +158,11 @@
rm -f *.o testclm testamf testamf1 testamf2 testamf3 testamf4 \
testamf5 testamf6 testamfth testckpt ckptstress testtimer \
ckptbench ckptbenchth testevt testevs ckpt-wr ckpt-rd \
- evsbench subscription publish evtbench unlink testmsg testcpg \
+ evsbench subscription publish evtbench unlink testcpg \
testclm2 testlck cpgbench openais-cfgtool
+# #testmsg testcpg
+
%.o: %.c
$(CC) $(CFLAGS) $(CPPFLAGS) $(EXTRA_CFLAGS) -c -o $@ $<
Index: test/evsverify.c
===================================================================
--- test/evsverify.c (revision 0)
+++ test/evsverify.c (revision 0)
@@ -0,0 +1,186 @@
+/*
+ * Copyright (c) 2009 Red Hat, Inc.
+ *
+ * All rights reserved.
+ *
+ * Author: Steven Dake (sdake@redhat.com)
+ *
+ * This software licensed under BSD license, the text of which follows:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * - Neither the name of the MontaVista Software, Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from this
+ * software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <errno.h>
+#include <string.h>
+
+#include "saAis.h"
+#include "evs.h"
+#include "../exec/crypto.h"
+
+char *delivery_string;
+struct msg {
+ unsigned int msg_size;
+ unsigned char sha1[20];
+ unsigned char buffer[0];
+};
+
+int deliveries = 0;
+void evs_deliver_fn (
+ unsigned int nodeid,
+ void *m,
+ int msg_len)
+{
+ struct msg *msg2 = (struct msg *)m;
+ unsigned char sha1_compare[20];
+ hash_state sha1_hash;
+ unsigned int i;
+
+ printf ("API '%s' msg '%s'\n", delivery_string, msg2->buffer);
+ sha1_init (&sha1_hash);
+ sha1_process (&sha1_hash, msg2->buffer, msg2->msg_size);
+ sha1_done (&sha1_hash, sha1_compare);
+printf ("SIZE %d HASH: ", msg2->msg_size);
+for (i = 0; i < 20; i++) {
+printf ("%x", sha1_compare[i]);
+}
+printf ("\n");
+ if (memcmp (sha1_compare, msg2->sha1, 20) != 0) {
+ printf ("incorrect hash\n");
+ exit (1);
+ }
+ deliveries++;
+}
+
+void evs_confchg_fn (
+ unsigned int *member_list, int member_list_entries,
+ unsigned int *left_list, int left_list_entries,
+ unsigned int *joined_list, int joined_list_entries)
+{
+ int i;
+
+ printf ("CONFIGURATION CHANGE\n");
+ printf ("--------------------\n");
+ printf ("New configuration\n");
+ for (i = 0; i < member_list_entries; i++) {
+ printf ("%x\n", member_list[i]);
+ }
+ printf ("Members Left:\n");
+ for (i = 0; i < left_list_entries; i++) {
+ printf ("%x\n", left_list[i]);
+ }
+ printf ("Members Joined:\n");
+ for (i = 0; i < joined_list_entries; i++) {
+ printf ("%x\n", joined_list[i]);
+ }
+}
+
+evs_callbacks_t callbacks = {
+ evs_deliver_fn,
+ evs_confchg_fn
+};
+
+struct evs_group groups[3] = {
+ { "key1" },
+ { "key2" },
+ { "key3" }
+};
+
+struct msg msg;
+
+unsigned char buffer[200000];
+int main (void)
+{
+ evs_handle_t handle;
+ SaAisErrorT result;
+ unsigned int i = 0, j;
+ int fd;
+ unsigned int member_list[32];
+ unsigned int local_nodeid;
+ unsigned int member_list_entries = 32;
+ struct msg msg;
+ hash_state sha1_hash;
+ struct iovec iov[2];
+
+ result = evs_initialize (&handle, &callbacks);
+ if (result != EVS_OK) {
+ printf ("Couldn't initialize EVS service %d\n", result);
+ exit (0);
+ }
+
+ result = evs_membership_get (handle, &local_nodeid,
+ member_list, &member_list_entries);
+ printf ("Current membership from evs_membership_get entries %d\n",
+ member_list_entries);
+ for (i = 0; i < member_list_entries; i++) {
+ printf ("member [%d] is %x\n", i, member_list[i]);
+ }
+ printf ("local processor from evs_membership_get %x\n", local_nodeid);
+
+ printf ("Init result %d\n", result);
+ result = evs_join (handle, groups, 3);
+ printf ("Join result %d\n", result);
+ result = evs_leave (handle, &groups[0], 1);
+ printf ("Leave result %d\n", result);
+ delivery_string = "evs_mcast_joined";
+
+ iov[0].iov_base = &msg;
+ iov[0].iov_len = sizeof (struct msg);
+ iov[1].iov_base = buffer;
+
+ /*
+ * Demonstrate evs_mcast_joined
+ */
+ for (i = 0; i < 1000000000; i++) {
+ msg.msg_size = 99 + rand() % 100000;
+ iov[1].iov_len = msg.msg_size;
+ for (j = 0; j < msg.msg_size; j++) {
+ buffer[j] = j + msg.msg_size;
+ }
+
+ sprintf ((char *)buffer,
+ "evs_mcast_joined: This is message %12d", i);
+ sha1_init (&sha1_hash);
+ sha1_process (&sha1_hash, buffer,
+ msg.msg_size);
+ sha1_done (&sha1_hash, msg.sha1);
+try_again_one:
+ result = evs_mcast_joined (handle, EVS_TYPE_AGREED,
+ iov, 2);
+ if (result == EVS_ERR_TRY_AGAIN) {
+ goto try_again_one;
+ }
+ result = evs_dispatch (handle, EVS_DISPATCH_ALL);
+ }
+
+ evs_fd_get (handle, &fd);
+
+ evs_finalize (handle);
+
+ return (0);
+}
Index: test/testevs.c
===================================================================
--- test/testevs.c (revision 1687)
+++ test/testevs.c (working copy)
@@ -50,10 +50,9 @@
{
char *buf = msg;
-// buf += 100000;
-// printf ("Delivery callback\n");
printf ("API '%s' msg '%s'\n", delivery_string, buf);
deliveries++;
+printf ("%d\n", deliveries);
}
void evs_confchg_fn (
@@ -90,7 +89,7 @@
{ "key3" }
};
-char buffer[200000];
+char buffer[2000];
struct iovec iov = {
.iov_base = buffer,
.iov_len = sizeof (buffer)
@@ -104,7 +103,7 @@
int fd;
unsigned int member_list[32];
unsigned int local_nodeid;
- int member_list_entries = 32;
+ unsigned int member_list_entries = 32;
result = evs_initialize (&handle, &callbacks);
if (result != EVS_OK) {
@@ -133,25 +132,16 @@
*/
for (i = 0; i < 500; i++) {
sprintf (buffer, "evs_mcast_joined: This is message %d", i);
-#ifdef COMPILE_OUT
- sprintf (buffer,
- "%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d",
- i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, \
i, i, i, i,
- i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, \
i, i, i, i);
-#endif
try_again_one:
result = evs_mcast_joined (handle, EVS_TYPE_AGREED,
&iov, 1);
if (result == EVS_ERR_TRY_AGAIN) {
-//printf ("try again\n");
goto try_again_one;
}
- result = evs_dispatch (handle, EVS_DISPATCH_ALL);
}
- do {
- result = evs_dispatch (handle, EVS_DISPATCH_ALL);
- } while (deliveries < 20);
+ result = evs_dispatch (handle, EVS_DISPATCH_ALL);
+
/*
* Demonstrate evs_mcast_joined
*/
@@ -172,7 +162,7 @@
*/
do {
result = evs_dispatch (handle, EVS_DISPATCH_ALL);
- } while (deliveries < 900);
+ } while (deliveries < 1000);
evs_fd_get (handle, &fd);
Index: include/ipc_cpg.h
===================================================================
--- include/ipc_cpg.h (revision 1687)
+++ include/ipc_cpg.h (working copy)
@@ -117,7 +117,6 @@
struct res_lib_cpg_mcast {
mar_res_header_t header __attribute__((aligned(8)));
- mar_uint32_t flow_control_state __attribute__((aligned(8)));
};
/* Message from another node */
@@ -127,7 +126,6 @@
mar_uint32_t msglen __attribute__((aligned(8)));
mar_uint32_t nodeid __attribute__((aligned(8)));
mar_uint32_t pid __attribute__((aligned(8)));
- mar_uint32_t flow_control_state __attribute__((aligned(8)));
mar_uint8_t message[] __attribute__((aligned(8)));
};
Index: include/ipc_gen.h
===================================================================
--- include/ipc_gen.h (revision 1687)
+++ include/ipc_gen.h (working copy)
@@ -62,6 +62,12 @@
int id __attribute__((aligned(8)));
} mar_req_header_t __attribute__((aligned(8)));
+typedef struct {
+ int service __attribute__((aligned(8)));
+ unsigned long long shmkey __attribute__((aligned(8)));
+ unsigned long long semkey __attribute__((aligned(8)));
+} mar_req_setup_t __attribute__((aligned(8)));
+
static inline void swab_mar_req_header_t (mar_req_header_t *to_swab)
{
swab_mar_int32_t (&to_swab->size);
Index: exec/Makefile
===================================================================
--- exec/Makefile (revision 1687)
+++ exec/Makefile (working copy)
@@ -58,9 +58,9 @@
LCR_OBJS = evs.o clm.o ckpt.o evt.o lck.o msg.o cfg.o cpg.o aisparser.o vsf_ykd.o \
$(AMF_OBJS)
# main executive objects
-MAIN_SRC = main.c print.c mempool.c util.c sync.c service.c ipc.c flow.c timer.c \
+MAIN_SRC = main.c print.c mempool.c util.c sync.c service.c ipc.c timer.c \
totemconfig.c mainconfig.c
-MAIN_OBJS = main.o print.o mempool.o util.o sync.o service.o ipc.o flow.o timer.o \
+MAIN_OBJS = main.o print.o mempool.o util.o sync.o service.o ipc.o timer.o \
totemconfig.o mainconfig.o ../lcr/lcr_ifact.o
OTHER_OBJS = objdb.o
Index: exec/evs.c
===================================================================
--- exec/evs.c (revision 1687)
+++ exec/evs.c (working copy)
@@ -1,7 +1,7 @@
/*
* Copyright (c) 2004-2006 MontaVista Software, Inc.
- * Copyright (c) 2006-2007 Red Hat, Inc.
* Copyright (c) 2006 Sun Microsystems, Inc.
+ * Copyright (c) 2006-2009 Red Hat, Inc.
*
* Author: Steven Dake (sdake@redhat.com)
*
@@ -271,11 +271,13 @@
return (0);
}
+unsigned int count = 0;
static int evs_lib_exit_fn (void *conn)
{
struct evs_pd *evs_pd = (struct evs_pd *)openais_conn_private_data_get (conn);
list_del (&evs_pd->list);
+
return (0);
}
@@ -499,6 +501,7 @@
int found = 0;
int i, j;
struct evs_pd *evs_pd;
+ struct iovec iov[2];
res_evs_deliver_callback.header.size =
sizeof (struct res_evs_deliver_callback) +
@@ -532,14 +535,15 @@
if (found) {
res_evs_deliver_callback.local_nodeid = nodeid;
- openais_dispatch_send (
+ iov[0].iov_base = &res_evs_deliver_callback;
+ iov[0].iov_len = sizeof (struct res_evs_deliver_callback);
+ iov[1].iov_base = msg_addr;
+ iov[1].iov_len = req_exec_evs_mcast->msg_len;
+
+ openais_dispatch_iov_send (
evs_pd->conn,
- &res_evs_deliver_callback,
- sizeof (struct res_evs_deliver_callback));
- openais_dispatch_send (
- evs_pd->conn,
- msg_addr,
- req_exec_evs_mcast->msg_len);
+ iov,
+ 2);
}
}
}
Index: exec/flow.c
===================================================================
--- exec/flow.c (revision 1687)
+++ exec/flow.c (working copy)
@@ -1,466 +0,0 @@
-/*
- * Copyright (c) 2006-2007 Red Hat, Inc.
- *
- * All rights reserved.
- *
- * Author: Steven Dake (sdake@redhat.com)
- *
- * This software licensed under BSD license, the text of which follows:
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * - Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * - Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- * - Neither the name of the MontaVista Software, Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from this
- * software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
- * THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-/*
- * New messages are allowed from the library ONLY when the processor has not
- * received a OPENAIS_FLOW_CONTROL_STATE_ENABLED from any processor. If a
- * OPENAIS_FLOW_CONTROL_STATE_ENABLED message is sent, it must later be
- * cancelled by a OPENAIS_FLOW_CONTROL_STATE_DISABLED message. A configuration
- * change with the flow controlled processor leaving the configuration will
- * also cancel flow control.
- */
-
-#include <stdio.h>
-#include <string.h>
-#include <assert.h>
-#include <pthread.h>
-
-#include "flow.h"
-#include "totem.h"
-#include "totempg.h"
-#include "print.h"
-#include "hdb.h"
-#include "../include/list.h"
-
-struct flow_control_instance {
- struct list_head list_head;
- unsigned int service;
-};
-
-DECLARE_LIST_INIT (flow_control_service_list_head);
-
-struct flow_control_message {
- unsigned int service __attribute__((aligned(8)));
- char id[1024] __attribute__((aligned(8)));
- unsigned int id_len __attribute__((aligned(8)));
- enum openais_flow_control_state flow_control_state __attribute__((aligned(8)));
-};
-
-struct flow_control_node_state {
- unsigned int nodeid;
- enum openais_flow_control_state flow_control_state;
-};
-
-struct flow_control_service {
- struct flow_control_node_state flow_control_node_state[PROCESSOR_COUNT_MAX];
- unsigned int service;
- char id[1024];
- unsigned int id_len;
- void (*flow_control_state_set_fn) (void *context, enum openais_flow_control_state \
flow_control_state);
- void *context;
- unsigned int processor_count;
- enum openais_flow_control_state flow_control_state;
- struct list_head list;
- struct list_head list_all;
-};
-
-static struct totempg_group flow_control_group = {
- .group = "flowcontrol",
- .group_len = 12
-};
-
-static totempg_groups_handle flow_control_handle;
-
-static struct hdb_handle_database flow_control_hdb = {
- .handle_count = 0,
- .handles = NULL,
- .iterator = 0,
- .mutex = PTHREAD_MUTEX_INITIALIZER
-};
-
-static unsigned int flow_control_member_list[PROCESSOR_COUNT_MAX];
-static unsigned int flow_control_member_list_entries;
-
-static inline int flow_control_xmit (
- struct flow_control_service *flow_control_service,
- enum openais_flow_control_state flow_control_state)
-{
- struct flow_control_message flow_control_message;
- struct iovec iovec;
- unsigned int res;
-
- flow_control_message.service = flow_control_service->service;
- flow_control_message.flow_control_state = flow_control_state;
- memcpy (&flow_control_message.id, flow_control_service->id,
- flow_control_service->id_len);
- flow_control_message.id_len = flow_control_service->id_len;
-
- iovec.iov_base = (char *)&flow_control_message;
- iovec.iov_len = sizeof (flow_control_message);
-
- res = totempg_groups_mcast_joined (flow_control_handle, &iovec, 1,
- TOTEMPG_AGREED);
-
- flow_control_service->flow_control_state_set_fn (
- flow_control_service->context,
- flow_control_service->flow_control_state);
-
- return (res);
-}
-
-static void flow_control_deliver_fn (
- unsigned int nodeid,
- struct iovec *iovec,
- int iov_len,
- int endian_conversion_required)
-{
- struct flow_control_message *flow_control_message = (struct flow_control_message \
*)iovec[0].iov_base;
- struct flow_control_service *flow_control_service;
- struct list_head *list;
- unsigned int i;
-
- for (list = flow_control_service_list_head.next;
- list != &flow_control_service_list_head;
- list = list->next) {
-
- flow_control_service = list_entry (list, struct flow_control_service, list_all);
- /*
- * Find this nodeid in the flow control service and set the message
- * enabled or disabled flag
- */
- for (i = 0; i < flow_control_service->processor_count; i++) {
- if (nodeid == flow_control_service->flow_control_node_state[i].nodeid) {
- flow_control_service->flow_control_node_state[i].flow_control_state =
- flow_control_message->flow_control_state;
- break;
- }
- }
-
- /*
- * Determine if any flow control is enabled on any nodes and set
- * the internal variable appropriately
- */
- flow_control_service->flow_control_state = OPENAIS_FLOW_CONTROL_STATE_DISABLED;
- flow_control_service->flow_control_state_set_fn (flow_control_service->context, \
flow_control_service->flow_control_state);
- for (i = 0; i < flow_control_service->processor_count; i++) {
- if (flow_control_service->flow_control_node_state[i].flow_control_state == \
OPENAIS_FLOW_CONTROL_STATE_ENABLED) {
- flow_control_service->flow_control_state = OPENAIS_FLOW_CONTROL_STATE_ENABLED;
- flow_control_service->flow_control_state_set_fn (flow_control_service->context, \
flow_control_service->flow_control_state);
- }
- }
- } /* for list iteration */
-}
-
-static void flow_control_confchg_fn (
- enum totem_configuration_type configuration_type,
- unsigned int *member_list, int member_list_entries,
- unsigned int *left_list, int left_list_entries,
- unsigned int *joined_list, int joined_list_entries,
- struct memb_ring_id *ring_id)
-{
- unsigned int i;
- unsigned int j;
- struct flow_control_service *flow_control_service;
- struct list_head *list;
- struct flow_control_node_state flow_control_node_state_temp[PROCESSOR_COUNT_MAX];
-
- memcpy (flow_control_member_list, member_list,
- sizeof (unsigned int) * member_list_entries);
- flow_control_member_list_entries = member_list_entries;
-
- for (list = flow_control_service_list_head.next;
- list != &flow_control_service_list_head;
- list = list->next) {
-
- flow_control_service = list_entry (list, struct flow_control_service, list_all);
-
- /*
- * Generate temporary flow control node state information
- */
- for (i = 0; i < member_list_entries; i++) {
- flow_control_node_state_temp[i].nodeid = member_list[i];
- flow_control_node_state_temp[i].flow_control_state = \
OPENAIS_FLOW_CONTROL_STATE_DISABLED;
-
- /*
- * Determine if previous state was set for this processor
- * if so keep that setting
- */
- for (j = 0; j < flow_control_service->processor_count; j++) {
- if (flow_control_service->flow_control_node_state[j].nodeid == member_list[i]) {
- flow_control_node_state_temp[i].flow_control_state =
- flow_control_service->flow_control_node_state[j].flow_control_state;
- break; /* from for */
- }
- }
- }
-
- /*
- * Copy temporary node state information to node state information
- */
- memcpy (flow_control_service->flow_control_node_state,
- flow_control_node_state_temp,
- sizeof (struct flow_control_node_state) * member_list_entries);
-
- /*
- * Set all of the node ids after a configuration change
- * Turn on all flow control after a configuration change
- */
- flow_control_service->processor_count = flow_control_member_list_entries;
- flow_control_service->flow_control_state = OPENAIS_FLOW_CONTROL_STATE_DISABLED;
- for (i = 0; i < member_list_entries; i++) {
- if (flow_control_service->flow_control_node_state[i].flow_control_state == \
OPENAIS_FLOW_CONTROL_STATE_ENABLED) {
- flow_control_service->flow_control_state = OPENAIS_FLOW_CONTROL_STATE_ENABLED;
- flow_control_service->flow_control_state_set_fn (flow_control_service->context, \
flow_control_service->flow_control_state);
- }
- }
-
- }
-}
-/*
- * External API
- */
-unsigned int openais_flow_control_initialize (void)
-{
- unsigned int res;
-
- log_init ("FLOW");
-
- res = totempg_groups_initialize (
- &flow_control_handle,
- flow_control_deliver_fn,
- flow_control_confchg_fn);
-
- if (res == -1) {
- log_printf (LOG_LEVEL_ERROR,
- "Couldn't initialize flow control interface.\n");
- return (-1);
- }
- res = totempg_groups_join (
- flow_control_handle,
- &flow_control_group,
- 1);
-
- if (res == -1) {
- log_printf (LOG_LEVEL_ERROR, "Couldn't join flow control group.\n");
- return (-1);
- }
-
- return (0);
-}
-
-unsigned int openais_flow_control_ipc_init (
- unsigned int *flow_control_handle,
- unsigned int service)
-{
- struct flow_control_instance *instance;
- unsigned int res;
-
- res = hdb_handle_create (&flow_control_hdb,
- sizeof (struct flow_control_instance), flow_control_handle);
- if (res != 0) {
- goto error_exit;
- }
- res = hdb_handle_get (&flow_control_hdb, *flow_control_handle,
- (void *)&instance);
- if (res != 0) {
- goto error_destroy;
- }
- instance->service = service;
-
- list_init (&instance->list_head);
-
- return (0);
-
-error_destroy:
- hdb_handle_destroy (&flow_control_hdb, *flow_control_handle);
-error_exit:
- return (-1);
-
-}
-
-unsigned int openais_flow_control_ipc_exit (
- unsigned int flow_control_handle)
-{
- hdb_handle_destroy (&flow_control_hdb, flow_control_handle);
- return (0);
-}
-
-unsigned int openais_flow_control_create (
- unsigned int flow_control_handle,
- unsigned int service,
- void *id,
- unsigned int id_len,
- void (*flow_control_state_set_fn) (void *context, enum openais_flow_control_state \
flow_control_state),
- void *context)
-{
- struct flow_control_service *flow_control_service;
- struct flow_control_instance *instance;
- unsigned int res;
- unsigned int i;
-
- res = hdb_handle_get (&flow_control_hdb, flow_control_handle,
- (void *)&instance);
- if (res != 0) {
- goto error_exit;
- }
-
- flow_control_service = malloc (sizeof (struct flow_control_service));
- if (flow_control_service == NULL) {
- goto error_put;
- }
-
- /*
- * Add new service to flow control system
- */
- memset (flow_control_service, 0, sizeof (struct flow_control_service));
-
- flow_control_service->flow_control_state = OPENAIS_FLOW_CONTROL_STATE_DISABLED;
- flow_control_service->service = service;
- memcpy (flow_control_service->id, id, id_len);
- flow_control_service->id_len = id_len;
- flow_control_service->flow_control_state_set_fn = flow_control_state_set_fn;
- flow_control_service->context = context;
-
- list_init (&flow_control_service->list);
- list_add_tail (&instance->list_head,
- &flow_control_service->list);
-
- list_init (&flow_control_service->list_all);
- list_add_tail (&flow_control_service_list_head,
- &flow_control_service->list_all);
-
- for (i = 0; i < flow_control_member_list_entries; i++) {
- flow_control_service->flow_control_node_state[i].nodeid = \
flow_control_member_list[i];
- flow_control_service->processor_count = flow_control_member_list_entries;
- }
-error_put:
- hdb_handle_put (&flow_control_hdb, flow_control_handle);
-
-error_exit:
- return (res);
-}
-
-unsigned int openais_flow_control_destroy (
- unsigned int flow_control_identifier,
- unsigned int service,
- unsigned char *id,
- unsigned int id_len)
-{
- struct flow_control_service *flow_control_service;
- struct flow_control_instance *instance;
- struct list_head *list;
- unsigned int res;
-
- res = hdb_handle_get (&flow_control_hdb, flow_control_handle,
- (void *)&instance);
- if (res != 0) {
- goto error_exit;
- }
-
- for (list = flow_control_service_list_head.next;
- list != &flow_control_service_list_head;
- list = list->next) {
-
- flow_control_service = list_entry (list, struct flow_control_service, list_all);
-
- if ((flow_control_service->id_len == id_len) &&
- (memcmp (flow_control_service->id, id, id_len) == 0)) {
- flow_control_xmit (flow_control_service,
- OPENAIS_FLOW_CONTROL_STATE_DISABLED);
- list_del (&flow_control_service->list);
- list_del (&flow_control_service->list_all);
- free (flow_control_service);
- break; /* done - no delete-safe for loop needed */
- }
- }
- hdb_handle_put (&flow_control_hdb, flow_control_handle);
-
-error_exit:
- return (res);
-}
-
-/*
- * Disable the ability for new messages to be sent for this service
- * with the handle id of length id_len
- */
-unsigned int openais_flow_control_disable (
- unsigned int flow_control_handle)
-{
- struct flow_control_instance *instance;
- struct flow_control_service *flow_control_service;
- struct list_head *list;
- unsigned int res;
-
- res = hdb_handle_get (&flow_control_hdb, flow_control_handle,
- (void *)&instance);
- if (res != 0) {
- goto error_exit;
- }
-
- for (list = instance->list_head.next;
- list != &instance->list_head;
- list = list->next) {
-
- flow_control_service = list_entry (list, struct flow_control_service, list);
- flow_control_service->flow_control_state = OPENAIS_FLOW_CONTROL_STATE_DISABLED;
- flow_control_xmit (flow_control_service, OPENAIS_FLOW_CONTROL_STATE_DISABLED);
- }
- hdb_handle_put (&flow_control_hdb, flow_control_handle);
-
-error_exit:
- return (res);
-}
-
-/*
- * Enable the ability for new messagess to be sent for this service
- * with the handle id of length id_len
- */
-unsigned int openais_flow_control_enable (
- unsigned int flow_control_handle)
-{
- struct flow_control_instance *instance;
- struct flow_control_service *flow_control_service;
- struct list_head *list;
- unsigned int res;
-
- res = hdb_handle_get (&flow_control_hdb, flow_control_handle,
- (void *)&instance);
- if (res != 0) {
- goto error_exit;
- }
-
- for (list = instance->list_head.next;
- list != &instance->list_head;
- list = list->next) {
-
-
- flow_control_service = list_entry (list, struct flow_control_service, list);
- flow_control_service->flow_control_state = OPENAIS_FLOW_CONTROL_STATE_ENABLED;
- flow_control_xmit (flow_control_service, OPENAIS_FLOW_CONTROL_STATE_ENABLED);
- }
- hdb_handle_put (&flow_control_hdb, flow_control_handle);
-
-error_exit:
- return (res);
-}
Index: exec/flow.h
===================================================================
--- exec/flow.h (revision 1687)
+++ exec/flow.h (working copy)
@@ -1,72 +0,0 @@
-/*
- * Copyright (c) 2006-2007 Red Hat, Inc.
- *
- * All rights reserved.
- *
- * Author: Steven Dake (sdake@redhat.com)
- *
- * This software licensed under BSD license, the text of which follows:
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * - Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * - Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- * - Neither the name of the MontaVista Software, Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from this
- * software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
- * THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-#ifndef FLOW_H_DEFINED
-#define FLOW_H_DEFINED
-
-enum openais_flow_control_state {
- OPENAIS_FLOW_CONTROL_STATE_DISABLED,
- OPENAIS_FLOW_CONTROL_STATE_ENABLED
-};
-
-unsigned int openais_flow_control_initialize (void);
-
-unsigned int openais_flow_control_ipc_init (
- unsigned int *flow_control_identifier,
- unsigned int service);
-
-unsigned int openais_flow_control_ipc_exit (
- unsigned int flow_control_identifier);
-
-unsigned int openais_flow_control_create (
- unsigned int flow_control_handle,
- unsigned int service,
- void *id,
- unsigned int id_len,
- void (*flow_control_state_set_fn) (void *context, enum openais_flow_control_state \
flow_control_state),
- void *context);
-
-unsigned int openais_flow_control_destroy (
- unsigned int flow_control_identifier,
- unsigned int service,
- unsigned char *id,
- unsigned int id_len);
-
-unsigned int openais_flow_control_disable (
- unsigned int flow_control_identifier);
-
-unsigned int openais_flow_control_enable (
- unsigned int flow_control_identifier);
-
-#endif /* FLOW_H_DEFINED */
Index: exec/cpg.c
===================================================================
--- exec/cpg.c (revision 1687)
+++ exec/cpg.c (working copy)
@@ -63,7 +63,6 @@
#include "totempg.h"
#include "totemip.h"
#include "main.h"
-#include "flow.h"
#include "tlist.h"
#include "ipc.h"
#include "mempool.h"
@@ -72,7 +71,6 @@
#include "jhash.h"
#include "swab.h"
#include "ipc.h"
-#include "flow.h"
#include "print.h"
#define GROUP_HASH_SIZE 32
@@ -110,7 +108,6 @@
void *conn;
void *trackerconn;
struct group_info *group;
- enum openais_flow_control_state flow_control_state;
struct list_head list; /* on the group_info members list */
};
@@ -489,11 +486,13 @@
notify_info.reason = CONFCHG_CPG_REASON_PROCDOWN;
cpg_node_joinleave_send(gi, pi, MESSAGE_REQ_EXEC_CPG_PROCLEAVE, \
CONFCHG_CPG_REASON_PROCDOWN); list_del(&pi->list);
+/*
openais_ipc_flow_control_destroy (
conn,
CPG_SERVICE,
(unsigned char *)gi->group_name.value,
(unsigned int)gi->group_name.length);
+*/
}
return (0);
}
@@ -663,15 +662,6 @@
}
}
-static void cpg_flow_control_state_set_fn (
- void *context,
- enum openais_flow_control_state flow_control_state)
-{
- struct process_info *process_info = (struct process_info *)context;
-
- process_info->flow_control_state = flow_control_state;
-}
-
/* Can byteswap join & leave messages */
static void exec_cpg_procjoin_endian_convert (void *msg)
{
@@ -861,7 +851,7 @@
list_del(&pi->list);
if (pi->conn) {
- openais_conn_info_refcnt_dec(pi->conn);
+// openais_conn_info_refcnt_dec(pi->conn);
} else {
free(pi);
}
@@ -919,12 +909,9 @@
res_lib_cpg_mcast->msglen = msglen;
res_lib_cpg_mcast->pid = req_exec_cpg_mcast->pid;
res_lib_cpg_mcast->nodeid = nodeid;
- res_lib_cpg_mcast->flow_control_state = CPG_FLOW_CONTROL_DISABLED;
if (message_source_is_local (&req_exec_cpg_mcast->source)) {
- openais_ipc_flow_control_local_decrement (req_exec_cpg_mcast->source.conn);
+ openais_conn_refcount_dec (req_exec_cpg_mcast->source.conn);
process_info = (struct process_info *)openais_conn_private_data_get \
(req_exec_cpg_mcast->source.conn);
- res_lib_cpg_mcast->flow_control_state = process_info->flow_control_state;
- openais_conn_info_refcnt_dec (req_exec_cpg_mcast->source.conn);
}
memcpy(&res_lib_cpg_mcast->group_name, &gi->group_name,
sizeof(mar_cpg_name_t));
@@ -1014,7 +1001,7 @@
struct process_info *pi = (struct process_info *)openais_conn_private_data_get \
(conn); pi->conn = conn;
- openais_conn_info_refcnt_inc (conn);
+// openais_conn_info_refcnt_inc (conn);
log_printf(LOG_LEVEL_DEBUG, "lib_init_fn: conn=%p, pi=%p\n", conn, pi);
return (0);
}
@@ -1042,14 +1029,6 @@
goto join_err;
}
- openais_ipc_flow_control_create (
- conn,
- CPG_SERVICE,
- req_lib_cpg_join->group_name.value,
- req_lib_cpg_join->group_name.length,
- cpg_flow_control_state_set_fn,
- pi);
-
/* Add a node entry for us */
pi->nodeid = totempg_my_nodeid_get();
pi->pid = req_lib_cpg_join->pid;
@@ -1085,11 +1064,13 @@
cpg_node_joinleave_send(gi, pi, MESSAGE_REQ_EXEC_CPG_PROCLEAVE, \
CONFCHG_CPG_REASON_LEAVE); pi->group = NULL;
+/*
openais_ipc_flow_control_destroy (
conn,
CPG_SERVICE,
(unsigned char *)gi->group_name.value,
(unsigned int)gi->group_name.length);
+*/
leave_ret:
/* send return */
@@ -1118,7 +1099,6 @@
res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast);
res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_MCAST;
res_lib_cpg_mcast.header.error = SA_AIS_ERR_ACCESS; /* TODO Better error code ?? \
*/
- res_lib_cpg_mcast.flow_control_state = CPG_FLOW_CONTROL_DISABLED;
openais_response_send(conn, &res_lib_cpg_mcast,
sizeof(res_lib_cpg_mcast));
return;
@@ -1127,7 +1107,6 @@
req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + msglen;
req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE,
MESSAGE_REQ_EXEC_CPG_MCAST);
- openais_conn_info_refcnt_inc (conn);
req_exec_cpg_mcast.pid = pi->pid;
req_exec_cpg_mcast.msglen = msglen;
message_source_set (&req_exec_cpg_mcast.source, conn);
@@ -1140,13 +1119,16 @@
req_exec_cpg_iovec[1].iov_len = msglen;
// TODO: guarantee type...
+ openais_conn_refcount_inc (conn);
result = totempg_groups_mcast_joined (openais_group_handle, req_exec_cpg_iovec, 2, \
TOTEMPG_AGREED);
- openais_ipc_flow_control_local_increment (conn);
+if (result != 0) {
+ printf ("result is %d\n", result);
+}
+ assert (result == 0);
res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast);
res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_MCAST;
res_lib_cpg_mcast.header.error = SA_AIS_OK;
- res_lib_cpg_mcast.flow_control_state = pi->flow_control_state;
openais_response_send(conn, &res_lib_cpg_mcast,
sizeof(res_lib_cpg_mcast));
}
Index: exec/ipc.c
===================================================================
--- exec/ipc.c (revision 1687)
+++ exec/ipc.c (working copy)
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2006-2007 Red Hat, Inc.
+ * Copyright (c) 2006-2009 Red Hat, Inc.
*
* All rights reserved.
*
@@ -60,6 +60,9 @@
#include <ucred.h>
#endif
+#include <sys/shm.h>
+#include <sys/sem.h>
+
#include "swab.h"
#include "../include/saAis.h"
#include "../include/list.h"
@@ -72,10 +75,8 @@
#include "mainconfig.h"
#include "totemconfig.h"
#include "main.h"
-#include "flow.h"
#include "tlist.h"
#include "ipc.h"
-#include "flow.h"
#include "service.h"
#include "sync.h"
#include "swab.h"
@@ -93,648 +94,315 @@
#define SERVER_BACKLOG 5
-/*
- * When there are this many entries left in a queue, turn on flow control
- */
-#define FLOW_CONTROL_ENTRIES_ENABLE 400
+#define MSG_SEND_LOCKED 0
+#define MSG_SEND_UNLOCKED 1
-/*
- * When there are this many entries in a queue, turn off flow control
- */
-#define FLOW_CONTROL_ENTRIES_DISABLE 64
-
-
static unsigned int g_gid_valid = 0;
-static totempg_groups_handle ipc_handle;
+DECLARE_LIST_INIT (conn_info_list_head);
-static pthread_mutex_t conn_io_list_mutex = PTHREAD_MUTEX_INITIALIZER;
-
-DECLARE_LIST_INIT (conn_io_list_head);
-
-static void (*ipc_serialize_lock_fn) (void);
-
-static void (*ipc_serialize_unlock_fn) (void);
-
struct outq_item {
void *msg;
size_t mlen;
+ struct list_head list;
};
-enum conn_io_state {
- CONN_IO_STATE_INITIALIZING,
- CONN_IO_STATE_AUTHENTICATED,
- CONN_IO_STATE_INIT_FAILED
-};
+#define REQ_SIZE 1000000
+#define RES_SIZE 1000000
+#define DISPATCH_SIZE 1000000
-enum conn_info_state {
- CONN_INFO_STATE_INITIALIZING,
- CONN_INFO_STATE_ACTIVE,
- CONN_INFO_STATE_DISCONNECT_REQUESTED,
- CONN_INFO_STATE_DISCONNECTED
+struct shared_memory {
+ volatile unsigned char req_buffer[REQ_SIZE];
+ unsigned char res_buffer[RES_SIZE];
+ unsigned char dispatch_buffer[DISPATCH_SIZE];
+ unsigned int read;
+ unsigned int write;
};
-struct conn_info;
-
-struct conn_io {
- int fd; /* File descriptor */
- unsigned int events; /* events polled for by file descriptor */
- pthread_t thread; /* thread identifier */
- pthread_attr_t thread_attr; /* thread attribute */
- char *inb; /* Input buffer for non-blocking reads */
- int inb_nextheader; /* Next message header starts here */
- int inb_start; /* Start location of input buffer */
- int inb_inuse; /* Bytes currently stored in input buffer */
- struct queue outq; /* Circular queue for outgoing requests */
- int byte_start; /* Byte to start sending from in head of queue */
- unsigned int fcc; /* flow control local count */
- enum conn_io_state state; /* state of this conn_io connection */
- struct conn_info *conn_info; /* connection information combining multiple conn_io \
structs */
- unsigned int refcnt; /* reference count for conn_io data structure */
- pthread_mutex_t mutex;
+struct conn_info {
+ int fd;
+ pthread_t thread;
+ pthread_attr_t thread_attr;
unsigned int service;
+ int destroyed;
+ int disconnect_requested;
+ int notify_flow_control_enabled;
+ int refcount;
+ key_t shmkey;
+ key_t semkey;
+ int shmid;
+ int semid;
+ unsigned int pending_semops;
+ pthread_mutex_t mutex;
+ struct shared_memory *mem;
+ struct list_head outq_head;
+ void *private_data;
+ int (*lib_exit_fn) (void *conn);
struct list_head list;
};
+static int shared_mem_dispatch_bytes_left (struct conn_info *conn_info);
-struct conn_info {
- enum conn_info_state state; /* State of this connection */
- enum service_types service; /* Type of service so dispatch knows how to route \
message */
- void *private_data; /* library connection private data */
- unsigned int flow_control_handle; /* flow control identifier */
- unsigned int flow_control_enabled; /* flow control enabled bit */
- enum openais_flow_control flow_control; /* Does this service use IPC flow control \
*/
- pthread_mutex_t flow_control_mutex;
- unsigned int flow_control_local_count; /* flow control local count */
- int (*lib_exit_fn) (void *conn);
- pthread_mutex_t mutex;
- struct conn_io *conn_io_response;
- struct conn_io *conn_io_dispatch;
- unsigned int refcnt;
-};
+static void outq_flush (struct conn_info *conn_info);
-static void *prioritized_poll_thread (void *conn_io_in);
-static int conn_io_outq_flush (struct conn_io *conn_io);
-static void conn_io_deliver (struct conn_io *conn_io);
-//static void ipc_flow_control (struct conn_info *conn_info);
-static inline void conn_info_destroy (struct conn_info *conn_info);
-static void conn_io_destroy (struct conn_io *conn_io);
-static int conn_io_send (struct conn_io *conn_io, void *msg, int mlen);
-static inline struct conn_info *conn_info_create (void);
-static int conn_io_found (struct conn_io *conn_io_to_match);
+static void ipc_disconnect (struct conn_info *conn_info);
-static int response_init_send (struct conn_io *conn_io, void *message);
-static int dispatch_init_send (struct conn_io *conn_io, void *message);
-
- /*
- * IPC Initializers
- */
-
-static int conn_io_refcnt_value (struct conn_io *conn_io)
+static inline int conn_info_destroy (struct conn_info *conn_info)
{
- unsigned int refcnt;
+ unsigned int res;
- pthread_mutex_lock (&conn_io->mutex);
- refcnt = conn_io->refcnt;
- pthread_mutex_unlock (&conn_io->mutex);
+ list_del (&conn_info->list);
+ list_init (&conn_info->list);
- return (refcnt);
-}
-
-static void conn_io_refcnt_inc (struct conn_io *conn_io)
-{
- pthread_mutex_lock (&conn_io->mutex);
- conn_io->refcnt += 1;
- pthread_mutex_unlock (&conn_io->mutex);
-}
-
-static int conn_io_refcnt_dec (struct conn_io *conn_io)
-{
- unsigned int refcnt;
-
- pthread_mutex_lock (&conn_io->mutex);
- conn_io->refcnt -= 1;
- refcnt = conn_io->refcnt;
- pthread_mutex_unlock (&conn_io->mutex);
-
- return (refcnt);
-}
-
-static void conn_info_refcnt_inc (struct conn_info *conn_info)
-{
/*
- * Connection not fully initialized yet
+ * Destroy shared memory segment and semaphore
*/
- if (conn_info == NULL) {
- return;
+ if (conn_info->destroyed == 0) {
+ openais_conn_refcount_dec (conn_info);
+ shmdt (conn_info->mem);
+ res = shmctl (conn_info->shmid, IPC_RMID, NULL);
+ semctl (conn_info->semid, 0, IPC_RMID);
+ conn_info->destroyed = 1;
}
+
pthread_mutex_lock (&conn_info->mutex);
- conn_info->refcnt += 1;
+ if (conn_info->refcount > 0) {
+ pthread_mutex_unlock (&conn_info->mutex);
+ return (-1);
+ }
pthread_mutex_unlock (&conn_info->mutex);
-}
-static void conn_info_refcnt_dec (struct conn_info *conn_info)
-{
- int refcnt;
-
/*
- * Connection not fully initialized yet
+ * Retry library exit function if busy
*/
- if (conn_info == NULL) {
- return;
+ res = ais_service[conn_info->service]->lib_exit_fn (conn_info);
+ if (res == -1) {
+ return (-1);
}
- pthread_mutex_lock (&conn_info->mutex);
- conn_info->refcnt -= 1;
- refcnt = conn_info->refcnt;
- assert (refcnt >= 0);
- pthread_mutex_unlock (&conn_info->mutex);
- if (refcnt == 0) {
- conn_info_destroy (conn_info);
- }
-}
-
-void openais_conn_info_refcnt_dec (void *conn)
-{
- struct conn_info *conn_info = (struct conn_info *)conn;
-
- conn_info_refcnt_dec (conn_info);
-}
-
-void openais_conn_info_refcnt_inc (void *conn)
-{
- struct conn_info *conn_info = (struct conn_info *)conn;
-
- conn_info_refcnt_inc (conn_info);
-}
-
-static int (*ais_init_service[]) (struct conn_io *conn_io, void *message) = {
- response_init_send,
- dispatch_init_send
-};
-
-static void disconnect_request (struct conn_info *conn_info)
-{
-unsigned int res;
/*
- * connection not fully active yet
+ * Free allocated data needed to retry exiting library IPC connection
*/
- if (conn_info == NULL) {
- return;
+ if (conn_info->private_data) {
+ free (conn_info->private_data);
}
- /*
- * We only want to decrement the reference count on these two
- * conn_io contexts one time
- */
- if (conn_info->state != CONN_INFO_STATE_ACTIVE) {
- return;
- }
- res = conn_io_refcnt_dec (conn_info->conn_io_response);
- res = conn_io_refcnt_dec (conn_info->conn_io_dispatch);
- conn_info->state = CONN_INFO_STATE_DISCONNECT_REQUESTED;
-}
-
-static int response_init_send (
- struct conn_io *conn_io,
- void *message)
-{
- SaAisErrorT error = SA_AIS_ERR_ACCESS;
- uintptr_t cinfo = (uintptr_t)conn_io;
- mar_req_lib_response_init_t *req_lib_response_init = (mar_req_lib_response_init_t \
*)message;
- mar_res_lib_response_init_t res_lib_response_init;
-
- if (conn_io->state == CONN_IO_STATE_AUTHENTICATED) {
- error = SA_AIS_OK;
- conn_io->service = req_lib_response_init->resdis_header.service;
- }
- res_lib_response_init.header.size = sizeof (mar_res_lib_response_init_t);
- res_lib_response_init.header.id = MESSAGE_RES_INIT;
- res_lib_response_init.header.error = error;
- res_lib_response_init.conn_info = (mar_uint64_t)cinfo;
-
- conn_io_send (
- conn_io,
- &res_lib_response_init,
- sizeof (res_lib_response_init));
-
- if (error == SA_AIS_ERR_ACCESS) {
- conn_io_destroy (conn_io);
- return (-1);
- }
-
+ close (conn_info->fd);
+ list_del (&conn_info->list);
+ free (conn_info);
return (0);
}
-/*
- * This is called iwth ipc_serialize_lock_fn() called
- * Therefore there are no races with the destruction of the conn_io
- * data structure
- */
-static int dispatch_init_send (
- struct conn_io *conn_io,
- void *message)
+struct res_overlay {
+ mar_res_header_t header __attribute__((aligned(8)));
+ char buf[4096];
+};
+
+static void *pthread_ipc_consumer (void *conn)
{
- SaAisErrorT error = SA_AIS_ERR_ACCESS;
- uintptr_t cinfo;
- mar_req_lib_dispatch_init_t *req_lib_dispatch_init = (mar_req_lib_dispatch_init_t \
*)message;
- mar_res_lib_dispatch_init_t res_lib_dispatch_init;
- struct conn_io *msg_conn_io;
- struct conn_info *conn_info;
- unsigned int service;
+ struct conn_info *conn_info = (struct conn_info *)conn;
+ struct sembuf sop;
+ int res;
+ mar_req_header_t *header;
+ struct res_overlay res_overlay;
+ struct iovec send_ok_joined_iovec;
+ int send_ok = 0;
+ int send_ok_joined = 0;
- service = req_lib_dispatch_init->resdis_header.service;
- cinfo = (uintptr_t)req_lib_dispatch_init->conn_info;
- msg_conn_io = (struct conn_io *)cinfo;
-
- /*
- * The response IPC connection has disconnected already for
- * some reason and is no longer referenceable in the system
- */
- if (conn_io->state == CONN_IO_STATE_AUTHENTICATED) {
- /*
- * If the response conn_io isn't found, it disconnected.
- * Hence, a full connection cannot be made and this connection
- * should be aborted by the poll thread
- */
- if (conn_io_found (msg_conn_io) == 0) {
- error = SA_AIS_ERR_TRY_AGAIN;
- conn_io->state = CONN_IO_STATE_INIT_FAILED;
+ for (;;) {
+ sop.sem_num = 0;
+ sop.sem_op = -1;
+ sop.sem_flg = 0;
+retry_semop:
+ res = semop (conn_info->semid, &sop, 1);
+ if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
+ goto retry_semop;
} else
- /*
- * If no service is found for the requested library service,
- * the proper service handler isn't loaded and this connection
- * should be aborted by the poll thread
- */
- if (ais_service[service] == NULL) {
- error = SA_AIS_ERR_NOT_SUPPORTED;
- conn_io->state = CONN_IO_STATE_INIT_FAILED;
- } else {
- error = SA_AIS_OK;
+ if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
+ openais_conn_refcount_dec (conn);
+ return (0);
}
-
- /*
- * The response and dispatch conn_io structures are available.
- * Attempt to allocate the appropriate memory for the private
- * data area
- */
- if (error == SA_AIS_OK) {
- int private_data_size;
-
- conn_info = conn_info_create ();
- private_data_size = ais_service[service]->private_data_size;
- if (private_data_size) {
- conn_info->private_data = malloc (private_data_size);
-
- /*
- * No private data could be allocated so
- * request the poll thread to abort
- */
- if (conn_info->private_data == NULL) {
- conn_io->state = CONN_IO_STATE_INIT_FAILED;
- error = SA_AIS_ERR_NO_MEMORY;
- } else {
- memset (conn_info->private_data, 0, private_data_size);
- }
- } else {
- conn_info->private_data = NULL;
- }
+ if (conn_info->destroyed || conn_info->disconnect_requested) {
+ break;
}
- }
- res_lib_dispatch_init.header.size = sizeof (mar_res_lib_dispatch_init_t);
- res_lib_dispatch_init.header.id = MESSAGE_RES_INIT;
- res_lib_dispatch_init.header.error = error;
+ header = (mar_req_header_t *)conn_info->mem->req_buffer;
- if (error != SA_AIS_OK) {
- conn_io_send (
- conn_io,
- &res_lib_dispatch_init,
- sizeof (res_lib_dispatch_init));
+ send_ok_joined_iovec.iov_base = (char *)header;
+ send_ok_joined_iovec.iov_len = header->size;
+ send_ok_joined = totempg_groups_send_ok_joined (openais_group_handle,
+ &send_ok_joined_iovec, 1);
- return (-1);
- }
+ send_ok =
+ (sync_primary_designated() == 1) && (
+ (ais_service[conn_info->service]->lib_service[header->id].flow_control == \
OPENAIS_FLOW_CONTROL_NOT_REQUIRED) || \
+ ((ais_service[conn_info->service]->lib_service[header->id].flow_control == \
OPENAIS_FLOW_CONTROL_REQUIRED) && + (send_ok_joined) &&
+ (sync_in_process() == 0)));
- /*
- * connect both dispatch and response conn_ios into the conn_info
- * data structure
- */
- conn_info->state = CONN_INFO_STATE_ACTIVE;
- conn_info->lib_exit_fn = ais_service[service]->lib_exit_fn;
- conn_info->conn_io_response = msg_conn_io;
- conn_info->conn_io_response->conn_info = conn_info;
- conn_info->conn_io_dispatch = conn_io;
- conn_info->service = service;
- conn_io->service = service;
- conn_io->conn_info = conn_info;
- ais_service[conn_info->service]->lib_init_fn (conn_info);
+ if (send_ok) {
+ ais_service[conn_info->service]->lib_service[header->id].lib_handler_fn \
(conn_info, header); + } else {
- conn_info->flow_control = ais_service[conn_info->service]->flow_control;
- if (ais_service[conn_info->service]->flow_control == OPENAIS_FLOW_CONTROL_REQUIRED) \
{
- openais_flow_control_ipc_init (
- &conn_info->flow_control_handle,
- conn_info->service);
-
+ /*
+ * Overload, tell library to retry
+ */
+ res_overlay.header.size =
+ ais_service[conn_info->service]->lib_service[header->id].response_size;
+ res_overlay.header.id =
+ ais_service[conn_info->service]->lib_service[header->id].response_id;
+ res_overlay.header.error = SA_AIS_ERR_TRY_AGAIN;
+ openais_response_send (conn_info, &res_overlay,
+ res_overlay.header.size);
+ }
}
-
- /*
- * Tell the library the IPC connections are configured
- */
- conn_io_send (
- conn_io,
- &res_lib_dispatch_init,
- sizeof (res_lib_dispatch_init));
- return (0);
+ openais_conn_refcount_dec (conn);
+ return (NULL);
}
-/*
- * Create a connection data structure
- */
-static inline struct conn_info *conn_info_create (void)
+static int poll_handler_connection (
+ poll_handle handle,
+ int fd,
+ int revent,
+ void *data)
{
- struct conn_info *conn_info;
+ mar_req_setup_t req_setup;
+ struct conn_info *conn_info = (struct conn_info *)data;
+ int res;
+ char buf;
- conn_info = malloc (sizeof (struct conn_info));
- if (conn_info == 0) {
- return (NULL);
- }
+ /*
+ * TODO this needs to be a secure nonblocking read
+ */
+ if (conn_info->service == SOCKET_SERVICE_INIT && (revent & POLLIN)) {
+ recv (fd, &req_setup, sizeof (mar_req_setup_t), MSG_WAITALL);
- memset (conn_info, 0, sizeof (struct conn_info));
+ pthread_mutex_init (&conn_info->mutex, NULL);
+ conn_info->shmkey = req_setup.shmkey;
+ conn_info->semkey = req_setup.semkey;
+ conn_info->service = req_setup.service;
+ conn_info->destroyed = 0;
+ conn_info->disconnect_requested = 0;
+ conn_info->refcount = 0;
+ conn_info->notify_flow_control_enabled = 0;
- conn_info->refcnt = 2;
- pthread_mutex_init (&conn_info->mutex, NULL);
- conn_info->state = CONN_INFO_STATE_INITIALIZING;
+ conn_info->shmid = shmget (conn_info->shmkey,
+ sizeof (struct shared_memory), 0600);
+ conn_info->mem = shmat (conn_info->shmid, NULL, 0);
+ conn_info->semid = semget (conn_info->semkey, 3, 0600);
+ conn_info->pending_semops = 0;
+ conn_info->refcount = 1;
+ openais_conn_refcount_inc (conn_info);
- return (conn_info);
-}
+ conn_info->private_data = malloc \
(ais_service[conn_info->service]->private_data_size); + memset \
(conn_info->private_data, 0, + ais_service[conn_info->service]->private_data_size);
+ ais_service[conn_info->service]->lib_init_fn (conn_info);
-static inline void conn_info_destroy (struct conn_info *conn_info)
-{
- if (conn_info->private_data) {
- free (conn_info->private_data);
- }
- pthread_mutex_destroy (&conn_info->mutex);
- free (conn_info);
-}
-static int conn_io_create (int fd)
-{
- int res;
- struct conn_io *conn_io;
+ pthread_attr_init (&conn_info->thread_attr);
+ /*
+ * IA64 needs more stack space then other arches
+ */
+ #if defined(__ia64__)
+ pthread_attr_setstacksize (&conn_info->thread_attr, 400000);
+ #else
+ pthread_attr_setstacksize (&conn_info->thread_attr, 200000);
+ #endif
- conn_io = malloc (sizeof (struct conn_io));
- if (conn_io == NULL) {
- return (-1);
- }
- memset (conn_io, 0, sizeof (struct conn_io));
+ pthread_attr_setdetachstate (&conn_info->thread_attr, PTHREAD_CREATE_DETACHED);
+ res = pthread_create (&conn_info->thread,
+ &conn_info->thread_attr,
+ pthread_ipc_consumer,
+ conn_info);
- res = queue_init (&conn_io->outq, SIZEQUEUE,
- sizeof (struct outq_item));
- if (res != 0) {
- return (-1);
+ /*
+ * Security check - disallow multiple configurations of
+ * the system
+ */
+ if (conn_info->service == SOCKET_SERVICE_INIT) {
+ conn_info->service = -1;
+ }
+ } else
+ if (revent & POLLIN) {
+ res = recv (fd, &buf, 1, MSG_NOSIGNAL);
+ if (res == 1 && buf == 1) {
+ outq_flush (conn_info);
+ }
}
+ pthread_mutex_lock (&conn_info->mutex);
+ if ((conn_info->disconnect_requested == 0) && (revent & POLLOUT)) {
+ buf = !list_empty (&conn_info->outq_head);
+ for (; conn_info->pending_semops;) {
+ res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
+ if (res == 1) {
+ conn_info->pending_semops--;
+ } else {
+ break;
+ }
+ }
+ if (conn_info->notify_flow_control_enabled) {
+ buf = 2;
+ res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
+ if (res == 1) {
+ conn_info->notify_flow_control_enabled = 0;
+ }
+ }
+ if (conn_info->notify_flow_control_enabled == 0 &&
+ conn_info->pending_semops == 0) {
- conn_io->inb = malloc (sizeof (char) * SIZEINB);
- if (conn_io->inb == NULL) {
- queue_free (&conn_io->outq);
- return (-1);
+ poll_dispatch_modify (aisexec_poll_handle,
+ conn_info->fd, POLLIN|POLLNVAL,
+ poll_handler_connection);
+ }
}
+ pthread_mutex_unlock (&conn_info->mutex);
- conn_io->fd = fd;
- conn_io->events = POLLIN|POLLNVAL;
- conn_io->refcnt = 1;
- conn_io->service = SOCKET_SERVICE_INIT;
- conn_io->state = CONN_IO_STATE_INITIALIZING;
- pthread_attr_init (&conn_io->thread_attr);
-
- pthread_mutex_init (&conn_io->mutex, NULL);
-
/*
- * IA64 needs more stack space then other arches
+ * If an error occurs, try to exit if possible
*/
-#if defined(__ia64__)
- pthread_attr_setstacksize (&conn_io->thread_attr, 400000);
-#else
- pthread_attr_setstacksize (&conn_io->thread_attr, 200000);
-#endif
+ if ((conn_info->disconnect_requested) || (revent & (POLLERR|POLLHUP))) {
+ res = conn_info_destroy (conn_info);
+ if (res == -1) {
+ return (0);
+ } else {
+ return (-1);
+ }
- pthread_attr_setdetachstate (&conn_io->thread_attr, PTHREAD_CREATE_DETACHED);
-
- res = pthread_create (&conn_io->thread, &conn_io->thread_attr,
- prioritized_poll_thread, conn_io);
-
- list_init (&conn_io->list);
-
- pthread_mutex_lock (&conn_io_list_mutex);
- list_add (&conn_io->list, &conn_io_list_head);
- pthread_mutex_unlock (&conn_io_list_mutex);
- return (res);
-}
-
-static void conn_io_destroy (struct conn_io *conn_io)
-{
- struct outq_item *outq_item;
-
- /*
- * Free the outq queued items
- */
- while (!queue_is_empty (&conn_io->outq)) {
- outq_item = queue_item_get (&conn_io->outq);
- free (outq_item->msg);
- queue_item_remove (&conn_io->outq);
}
- queue_free (&conn_io->outq);
- free (conn_io->inb);
- close (conn_io->fd);
- pthread_mutex_lock (&conn_io_list_mutex);
- list_del (&conn_io->list);
- pthread_mutex_unlock (&conn_io_list_mutex);
-
- pthread_attr_destroy (&conn_io->thread_attr);
- pthread_mutex_destroy (&conn_io->mutex);
- free (conn_io);
+ return (0);
}
-static int conn_io_found (struct conn_io *conn_io_to_match)
+static void ipc_disconnect (struct conn_info *conn_info)
{
- struct list_head *list;
- struct conn_io *conn_io;
+ pthread_mutex_lock (&conn_info->mutex);
+ conn_info->disconnect_requested = 1;
+ pthread_mutex_unlock (&conn_info->mutex);
- for (list = conn_io_list_head.next; list != &conn_io_list_head;
- list = list->next) {
-
- conn_io = list_entry (list, struct conn_io, list);
- if (conn_io == conn_io_to_match) {
- return (1);
- }
- }
-
- return (0);
+ poll_dispatch_modify (aisexec_poll_handle,
+ conn_info->fd, POLLOUT|POLLNVAL,
+ poll_handler_connection);
}
-/*
- * This thread runs in a specific thread priority mode to handle
- * I/O requests from or to the library
- */
-static void *prioritized_poll_thread (void *conn_io_in)
+static int conn_info_create (int fd)
{
- struct conn_io *conn_io = (struct conn_io *)conn_io_in;
- struct conn_info *conn_info = NULL;
- struct pollfd ufd;
- int fds;
- struct sched_param sched_param;
- int res;
+ struct conn_info *conn_info;
- sched_param.sched_priority = 99;
- res = pthread_setschedparam (conn_io->thread, SCHED_RR, &sched_param);
-
- ufd.fd = conn_io->fd;
- for (;;) {
-retry_poll:
- conn_info = conn_io->conn_info;
- conn_io_refcnt_inc (conn_io);
- conn_info_refcnt_inc (conn_info);
-
- ufd.events = conn_io->events;
- ufd.revents = 0;
- fds = poll (&ufd, 1, -1);
- if (fds == -1) {
- conn_io_refcnt_dec (conn_io);
- conn_info_refcnt_dec (conn_info);
- goto retry_poll;
- }
-
- ipc_serialize_lock_fn ();
-
- if (fds == 1 && ufd.revents) {
- if (ufd.revents & (POLLERR|POLLHUP)) {
- disconnect_request (conn_info);
- conn_info_refcnt_dec (conn_info);
- conn_io_refcnt_dec (conn_io);
- /*
- * If conn_info not set, wait for it to be set
- * else break out of for loop
- */
- if (conn_info == NULL) {
- ipc_serialize_unlock_fn ();
- continue;
- } else {
- ipc_serialize_unlock_fn ();
- break;
- }
- }
-
- if (conn_info && conn_info->state == CONN_INFO_STATE_DISCONNECT_REQUESTED) {
- conn_info_refcnt_dec (conn_info);
- conn_io_refcnt_dec (conn_io);
- ipc_serialize_unlock_fn ();
- break;
- }
-
- if (ufd.revents & POLLOUT) {
- conn_io_outq_flush (conn_io);
- }
-
- if ((ufd.revents & POLLIN) == POLLIN) {
- conn_io_deliver (conn_io);
- }
-
- /*
- * IPC initializiation failed because response fd
- * disconnected before it was linked to dispatch fd
- */
- if (conn_io->state == CONN_IO_STATE_INIT_FAILED) {
- conn_io_destroy (conn_io);
- conn_info_refcnt_dec (conn_info);
- ipc_serialize_unlock_fn ();
- pthread_exit (0);
- }
- /*
- * IPC initializiation failed because response fd
- * disconnected before it was linked to dispatch fd
- */
- if (conn_io->state == CONN_IO_STATE_INIT_FAILED) {
- break;
- }
-
-// ipc_flow_control (conn_info);
-
- }
-
- ipc_serialize_unlock_fn ();
-
- conn_io_refcnt_dec (conn_io);
- conn_info_refcnt_dec (conn_info);
+ conn_info = malloc (sizeof (struct conn_info));
+ if (conn_info == NULL) {
+ return (-1);
}
+ memset (conn_info, 0, sizeof (struct conn_info));
- ipc_serialize_lock_fn ();
+ conn_info->fd = fd;
+ conn_info->service = SOCKET_SERVICE_INIT;
+ list_init (&conn_info->outq_head);
+ list_init (&conn_info->list);
+ list_add (&conn_info->list, &conn_info_list_head);
- /*
- * IPC initializiation failed because response fd
- * disconnected before it was linked to dispatch fd
- */
- if (conn_io->conn_info == NULL || conn_io->state == CONN_IO_STATE_INIT_FAILED) {
- conn_io_destroy (conn_io);
- conn_info_refcnt_dec (conn_info);
- ipc_serialize_unlock_fn ();
- pthread_exit (0);
- }
-
- conn_info = conn_io->conn_info;
-
- /*
- * This is the response conn_io
- */
- if (conn_info->conn_io_response == conn_io) {
- for (;;) {
- if (conn_io_refcnt_value (conn_io) == 0) {
- conn_io->conn_info = NULL;
- conn_io_destroy (conn_io);
- conn_info_refcnt_dec (conn_info);
- ipc_serialize_unlock_fn ();
- pthread_exit (0);
- }
- usleep (1000);
- printf ("sleep 1\n");
- }
- } /* response conn_io */
-
- /*
- * This is the dispatch conn_io
- */
- if (conn_io->conn_info->conn_io_dispatch == conn_io) {
- ipc_serialize_unlock_fn ();
- for (;;) {
- ipc_serialize_lock_fn ();
- if (conn_io_refcnt_value (conn_io) == 0) {
- res = 0; // TODO
- /*
- * Execute the library exit function
- */
- if (conn_io->conn_info->lib_exit_fn) {
- res = conn_io->conn_info->lib_exit_fn (conn_info);
- }
- if (res == 0) {
- if (conn_io->conn_info->flow_control_enabled == 1) {
-// openais_flow_control_disable (
-// conn_info->flow_control_handle);
- }
- conn_io->conn_info = NULL;
- conn_io_destroy (conn_io);
- conn_info_refcnt_dec (conn_info);
- ipc_serialize_unlock_fn ();
- pthread_exit (0);
- }
- } /* refcnt == 0 */
- ipc_serialize_unlock_fn ();
- usleep (1000);
- } /* for (;;) */
- } /* dispatch conn_io */
-
- /*
- * This code never reached
- */
+ poll_dispatch_add (aisexec_poll_handle, fd, POLLIN|POLLNVAL,
+ conn_info, poll_handler_connection);
return (0);
}
@@ -752,360 +420,6 @@
char *socketname = "/var/run/libais.socket";
#endif
-
-#ifdef COMPILOE_OUT
-static void ipc_flow_control (struct conn_info *conn_info)
-{
- unsigned int entries_used;
- unsigned int entries_usedhw;
- unsigned int flow_control_local_count;
- unsigned int fcc;
-
- /*
- * Determine FCC variable and printing variables
- */
- entries_used = queue_used (&conn_info->outq);
- if (conn_info->conn_info_partner &&
- queue_used (&conn_info->conn_info_partner->outq) > entries_used) {
- entries_used = queue_used (&conn_info->conn_info_partner->outq);
- }
- entries_usedhw = queue_usedhw (&conn_info->outq);
- if (conn_info->conn_info_partner &&
- queue_usedhw (&conn_info->conn_info_partner->outq) > entries_used) {
- entries_usedhw = queue_usedhw (&conn_info->conn_info_partner->outq);
- }
- flow_control_local_count = conn_info->flow_control_local_count;
- if (conn_info->conn_info_partner &&
- conn_info->conn_info_partner->flow_control_local_count > flow_control_local_count) \
{
- flow_control_local_count = conn_info->conn_info_partner->flow_control_local_count;
- }
-
- fcc = entries_used;
- if (flow_control_local_count > fcc) {
- fcc = flow_control_local_count;
- }
- /*
- * IPC group-wide flow control
- */
- if (conn_info->flow_control == OPENAIS_FLOW_CONTROL_REQUIRED) {
- if (conn_info->flow_control_enabled == 0 &&
- ((fcc + FLOW_CONTROL_ENTRIES_ENABLE) > SIZEQUEUE)) {
-
- log_printf (LOG_LEVEL_NOTICE, "Enabling flow control [%d/%d] - [%d].\n",
- entries_usedhw, SIZEQUEUE,
- flow_control_local_count);
- openais_flow_control_enable (conn_info->flow_control_handle);
- conn_info->flow_control_enabled = 1;
- conn_info->conn_info_partner->flow_control_enabled = 1;
- }
- if (conn_info->flow_control_enabled == 1 &&
-
- fcc <= FLOW_CONTROL_ENTRIES_DISABLE) {
-
- log_printf (LOG_LEVEL_NOTICE, "Disabling flow control [%d/%d] - [%d].\n",
- entries_usedhw, SIZEQUEUE,
- flow_control_local_count);
- openais_flow_control_disable (conn_info->flow_control_handle);
- conn_info->flow_control_enabled = 0;
- conn_info->conn_info_partner->flow_control_enabled = 0;
- }
- }
-}
-#endif
-
-static int conn_io_outq_flush (struct conn_io *conn_io) {
- struct queue *outq;
- int res = 0;
- struct outq_item *queue_item;
- struct msghdr msg_send;
- struct iovec iov_send;
- char *msg_addr;
-
- outq = &conn_io->outq;
-
- msg_send.msg_iov = &iov_send;
- msg_send.msg_name = 0;
- msg_send.msg_namelen = 0;
- msg_send.msg_iovlen = 1;
-#ifndef OPENAIS_SOLARIS
- msg_send.msg_control = 0;
- msg_send.msg_controllen = 0;
- msg_send.msg_flags = 0;
-#else
- msg_send.msg_accrights = 0;
- msg_send.msg_accrightslen = 0;
-#endif
-
- pthread_mutex_lock (&conn_io->mutex);
- while (!queue_is_empty (outq)) {
- queue_item = queue_item_get (outq);
- msg_addr = (char *)queue_item->msg;
- msg_addr = &msg_addr[conn_io->byte_start];
-
- iov_send.iov_base = msg_addr;
- iov_send.iov_len = queue_item->mlen - conn_io->byte_start;
-
-retry_sendmsg:
- res = sendmsg (conn_io->fd, &msg_send, MSG_NOSIGNAL);
- if (res == -1 && errno == EINTR) {
- goto retry_sendmsg;
- }
- if (res == -1 && errno == EAGAIN) {
- pthread_mutex_unlock (&conn_io->mutex);
- return (0);
- }
- if (res == -1 && errno == EPIPE) {
- disconnect_request (conn_io->conn_info);
- pthread_mutex_unlock (&conn_io->mutex);
- return (0);
- }
- if (res == -1) {
- assert (0); /* some other unhandled error here */
- }
- if (res + conn_io->byte_start != queue_item->mlen) {
- conn_io->byte_start += res;
-
- pthread_mutex_unlock (&conn_io->mutex);
- return (0);
- }
-
- /*
- * Message sent, try sending another message
- */
- queue_item_remove (outq);
- conn_io->byte_start = 0;
- free (queue_item->msg);
- } /* while queue not empty */
-
- if (queue_is_empty (outq)) {
- conn_io->events = POLLIN|POLLNVAL;
- }
-
- pthread_mutex_unlock (&conn_io->mutex);
- return (0);
-}
-
-
-
-struct res_overlay {
- mar_res_header_t header __attribute((aligned(8)));
- char buf[4096];
-};
-
-static void conn_io_deliver (struct conn_io *conn_io)
-{
- int res;
- mar_req_header_t *header;
- int service;
- struct msghdr msg_recv;
- struct iovec iov_recv;
-#ifdef OPENAIS_LINUX
- struct cmsghdr *cmsg;
- char cmsg_cred[CMSG_SPACE (sizeof (struct ucred))];
- struct ucred *cred;
- int on = 0;
-#endif
- int send_ok = 0;
- int send_ok_joined = 0;
- struct iovec send_ok_joined_iovec;
- struct res_overlay res_overlay;
-
- msg_recv.msg_iov = &iov_recv;
- msg_recv.msg_iovlen = 1;
- msg_recv.msg_name = 0;
- msg_recv.msg_namelen = 0;
-#ifndef OPENAIS_SOLARIS
- msg_recv.msg_flags = 0;
-
- if (conn_io->state == CONN_IO_STATE_AUTHENTICATED) {
- msg_recv.msg_control = 0;
- msg_recv.msg_controllen = 0;
- } else {
-#ifdef OPENAIS_LINUX
- msg_recv.msg_control = (void *)cmsg_cred;
- msg_recv.msg_controllen = sizeof (cmsg_cred);
-#else
- {
- uid_t euid;
- gid_t egid;
-
- euid = -1; egid = -1;
- if (getpeereid(conn_io->fd, &euid, &egid) != -1 &&
- (euid == 0 || egid == g_gid_valid)) {
- conn_io->state = CONN_IO_STATE_AUTHENTICATED;
- }
- if (conn_io->state == CONN_IO_STATE_INITIALIZING) {
- log_printf (LOG_LEVEL_SECURITY, "Connection not authenticated because gid is %d, \
expecting %d\n", egid, g_gid_valid);
- conn_io->state = CONN_IO_STATE_INIT_FAILED;
- return;
- }
- }
-#endif
- }
-
-#else /* OPENAIS_SOLARIS */
- msg_recv.msg_accrights = 0;
- msg_recv.msg_accrightslen = 0;
-
-
- if (! conn_info->authenticated) {
-#ifdef HAVE_GETPEERUCRED
- ucred_t *uc;
- uid_t euid = -1;
- gid_t egid = -1;
-
- if (getpeerucred (conn_info->fd, &uc) == 0) {
- euid = ucred_geteuid (uc);
- egid = ucred_getegid (uc);
- if ((euid == 0) || (egid == g_gid_valid)) {
- conn_info->authenticated = 1;
- }
- ucred_free(uc);
- }
- if (conn_info->authenticated == 0) {
- log_printf (LOG_LEVEL_SECURITY, "Connection not authenticated because gid is %d, \
expecting %d\n", (int)egid, g_gid_valid);
- }
- #else
- log_printf (LOG_LEVEL_SECURITY, "Connection not authenticated "
- "because platform does not support "
- "authentication with sockets, continuing "
- "with a fake authentication\n");
- conn_info->authenticated = 1;
- #endif
- }
- #endif
- iov_recv.iov_base = &conn_io->inb[conn_io->inb_start];
- iov_recv.iov_len = (SIZEINB) - conn_io->inb_start;
- if (conn_io->inb_inuse == SIZEINB) {
- return;
- }
-
-retry_recv:
- res = recvmsg (conn_io->fd, &msg_recv, MSG_NOSIGNAL);
- if (res == -1 && errno == EINTR) {
- goto retry_recv;
- } else
- if (res == -1 && errno != EAGAIN) {
- return;
- } else
- if (res == 0) {
-#if defined(OPENAIS_SOLARIS) || defined(OPENAIS_BSD) || defined(OPENAIS_DARWIN)
- /* On many OS poll never return POLLHUP or POLLERR.
- * EOF is detected when recvmsg return 0.
- */
- disconnect_request (conn_io);
-#endif
- return;
- }
-
- /*
- * Authenticate if this connection has not been authenticated
- */
-#ifdef OPENAIS_LINUX
- if (conn_io->state == CONN_IO_STATE_INITIALIZING) {
- cmsg = CMSG_FIRSTHDR (&msg_recv);
- assert (cmsg);
- cred = (struct ucred *)CMSG_DATA (cmsg);
- if (cred) {
- if (cred->uid == 0 || cred->gid == g_gid_valid) {
- setsockopt(conn_io->fd, SOL_SOCKET, SO_PASSCRED, &on, sizeof (on));
- conn_io->state = CONN_IO_STATE_AUTHENTICATED;
- }
- }
- if (conn_io->state == CONN_IO_STATE_INITIALIZING) {
- log_printf (LOG_LEVEL_SECURITY, "Connection not authenticated because gid is %d, \
expecting %d\n", cred->gid, g_gid_valid);
- conn_io->state = CONN_IO_STATE_INIT_FAILED;
- return;
- }
- }
-#endif
- /*
- * Dispatch all messages received in recvmsg that can be dispatched
- * sizeof (mar_req_header_t) needed at minimum to do any processing
- */
- conn_io->inb_inuse += res;
- conn_io->inb_start += res;
-
- while (conn_io->inb_inuse >= sizeof (mar_req_header_t) && res != -1) {
- header = (mar_req_header_t *)&conn_io->inb[conn_io->inb_start - \
conn_io->inb_inuse];
-
- if (header->size > conn_io->inb_inuse) {
- break;
- }
- service = conn_io->service;
-
- /*
- * If this service is in init phase, initialize service
- * else handle message using service service
- */
- if (conn_io->service == SOCKET_SERVICE_INIT) {
- res = ais_init_service[header->id] (conn_io, header);
- } else {
- /*
- * Not an init service, but a standard service
- */
- if (header->id < 0 || header->id > ais_service[service]->lib_service_count) {
- log_printf (LOG_LEVEL_SECURITY, "Invalid header id is %d min 0 max %d\n",
- header->id, ais_service[service]->lib_service_count);
- return ;
- }
-
- /*
- * If flow control is required of the library handle, determine that
- * openais is not in synchronization and that totempg has room available
- * to queue a message, otherwise tell the library we are busy and to
- * try again later
- */
- send_ok_joined_iovec.iov_base = (char *)header;
- send_ok_joined_iovec.iov_len = header->size;
- send_ok_joined = totempg_groups_send_ok_joined (openais_group_handle,
- &send_ok_joined_iovec, 1);
-
- send_ok =
- (sync_primary_designated() == 1) && (
- (ais_service[service]->lib_service[header->id].flow_control == \
OPENAIS_FLOW_CONTROL_NOT_REQUIRED) ||
- ((ais_service[service]->lib_service[header->id].flow_control == \
OPENAIS_FLOW_CONTROL_REQUIRED) &&
- (send_ok_joined) &&
- (sync_in_process() == 0)));
-
- if (send_ok) {
- ais_service[service]->lib_service[header->id].lib_handler_fn(conn_io->conn_info, \
header);
- } else {
-
- /*
- * Overload, tell library to retry
- */
- res_overlay.header.size =
- ais_service[service]->lib_service[header->id].response_size;
- res_overlay.header.id =
- ais_service[service]->lib_service[header->id].response_id;
- res_overlay.header.error = SA_AIS_ERR_TRY_AGAIN;
- conn_io_send (
- conn_io,
- &res_overlay,
- res_overlay.header.size);
- }
- }
- conn_io->inb_inuse -= header->size;
- } /* while */
-
- if (conn_io->inb_inuse == 0) {
- conn_io->inb_start = 0;
- } else
-// BUG if (connections[conn_io->fd].inb_start + connections[conn_io->fd].inb_inuse \
>= SIZEINB) {
- if (conn_io->inb_start >= SIZEINB) {
- /*
- * If in buffer is full, move it back to start
- */
- memmove (conn_io->inb,
- &conn_io->inb[conn_io->inb_start - conn_io->inb_inuse],
- sizeof (char) * conn_io->inb_inuse);
- conn_io->inb_start = conn_io->inb_inuse;
- }
-
- return;
-}
-
static int poll_handler_accept (
poll_handle handle,
int fd,
@@ -1154,7 +468,7 @@
log_printf (LOG_LEVEL_DEBUG, "connection received from libais client %d.\n", \
new_fd);
- res = conn_io_create (new_fd);
+ res = conn_info_create (new_fd);
if (res != 0) {
close (new_fd);
}
@@ -1186,28 +500,12 @@
source->conn = conn;
}
-static void ipc_confchg_fn (
- enum totem_configuration_type configuration_type,
- unsigned int *member_list, int member_list_entries,
- unsigned int *left_list, int left_list_entries,
- unsigned int *joined_list, int joined_list_entries,
- struct memb_ring_id *ring_id)
+void openais_ipc_init (unsigned int gid_valid)
{
-}
-
-void openais_ipc_init (
- void (*serialize_lock_fn) (void),
- void (*serialize_unlock_fn) (void),
- unsigned int gid_valid)
-{
int libais_server_fd;
struct sockaddr_un un_addr;
int res;
- ipc_serialize_lock_fn = serialize_lock_fn;
-
- ipc_serialize_unlock_fn = serialize_unlock_fn;
-
/*
* Create socket for libais clients, name socket, listen for connections
*/
@@ -1249,21 +547,30 @@
* Setup libais connection dispatch routine
*/
poll_dispatch_add (aisexec_poll_handle, libais_server_fd,
- POLLIN, 0, poll_handler_accept);
+ POLLIN|POLLNVAL, 0, poll_handler_accept);
g_gid_valid = gid_valid;
-
- /*
- * Reset internal state of flow control when
- * configuration change occurs
- */
- res = totempg_groups_initialize (
- &ipc_handle,
- NULL,
- ipc_confchg_fn);
}
+void openais_ipc_exit (void)
+{
+ struct list_head *list;
+ struct conn_info *conn_info;
+ for (list = conn_info_list_head.next; list != &conn_info_list_head;
+ list = list->next) {
+
+ conn_info = list_entry (list, struct conn_info, list);
+
+ shmdt (conn_info->mem);
+ shmctl (conn_info->shmid, IPC_RMID, NULL);
+ semctl (conn_info->semid, 0, IPC_RMID);
+ conn_info->destroyed = 1;
+
+ pthread_kill (conn_info->thread, SIGUSR1);
+ }
+}
+
/*
* Get the conn info private data
*/
@@ -1274,215 +581,259 @@
return (conn_info->private_data);
}
-static int conn_io_send (
- struct conn_io *conn_io,
- void *msg,
- int mlen)
+int openais_response_send (void *conn, void *msg, int mlen)
{
- char *cmsg;
- int res = 0;
- int queue_empty;
- struct outq_item *queue_item;
- struct outq_item queue_item_out;
- struct msghdr msg_send;
- struct iovec iov_send;
- char *msg_addr;
+ struct conn_info *conn_info = (struct conn_info *)conn;
+ struct sembuf sop;
+ int res;
- if (conn_io == NULL) {
- assert (0);
+ memcpy (conn_info->mem->res_buffer, msg, mlen);
+ sop.sem_num = 1;
+ sop.sem_op = 1;
+ sop.sem_flg = 0;
+
+retry_semop:
+ res = semop (conn_info->semid, &sop, 1);
+ if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
+ goto retry_semop;
+ } else
+ if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
+ return (0);
}
+ return (0);
+}
-// ipc_flow_control (conn_info);
+int openais_response_iov_send (void *conn, struct iovec *iov, int iov_len)
+{
+ struct conn_info *conn_info = (struct conn_info *)conn;
+ struct sembuf sop;
+ int res;
+ int write_idx = 0;
+ int i;
- msg_send.msg_iov = &iov_send;
- msg_send.msg_name = 0;
- msg_send.msg_namelen = 0;
- msg_send.msg_iovlen = 1;
-#ifndef OPENAIS_SOLARIS
- msg_send.msg_control = 0;
- msg_send.msg_controllen = 0;
- msg_send.msg_flags = 0;
-#else
- msg_send.msg_accrights = 0;
- msg_send.msg_accrightslen = 0;
-#endif
-
- pthread_mutex_lock (&conn_io->mutex);
- if (queue_is_full (&conn_io->outq)) {
- /*
- * Start a disconnect if we have not already started one
- * and report that the outgoing queue is full
- */
- log_printf (LOG_LEVEL_ERROR, "Library queue is full, disconnecting library \
connection.\n");
- disconnect_request (conn_io->conn_info);
- pthread_mutex_unlock (&conn_io->mutex);
- return (-1);
+ for (i = 0; i < iov_len; i++) {
+ memcpy (&conn_info->mem->res_buffer[write_idx], iov[i].iov_base, iov[i].iov_len);
+ write_idx += iov[i].iov_len;
}
- while (!queue_is_empty (&conn_io->outq)) {
- queue_item = queue_item_get (&conn_io->outq);
- msg_addr = (char *)queue_item->msg;
- msg_addr = &msg_addr[conn_io->byte_start];
- iov_send.iov_base = msg_addr;
- iov_send.iov_len = queue_item->mlen - conn_io->byte_start;
+ sop.sem_num = 1;
+ sop.sem_op = 1;
+ sop.sem_flg = 0;
-retry_sendmsg:
- res = sendmsg (conn_io->fd, &msg_send, MSG_NOSIGNAL);
- if (res == -1 && errno == EINTR) {
- goto retry_sendmsg;
- }
- if (res == -1 && errno == EAGAIN) {
- break; /* outgoing kernel queue full */
- }
- if (res == -1 && errno == EPIPE) {
- disconnect_request (conn_io->conn_info);
- pthread_mutex_unlock (&conn_io->mutex);
- return (0);
- }
- if (res == -1) {
-// assert (0);
- break; /* some other error, stop trying to send message */
- }
- if (res + conn_io->byte_start != queue_item->mlen) {
- conn_io->byte_start += res;
- break;
- }
+retry_semop:
+ res = semop (conn_info->semid, &sop, 1);
+ if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
+ goto retry_semop;
+ } else
+ if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
+ return (0);
+ }
+ return (0);
+}
- /*
- * Message sent, try sending another message
- */
- queue_item_remove (&conn_io->outq);
- conn_io->byte_start = 0;
- free (queue_item->msg);
- } /* while queue not empty */
+static int shared_mem_dispatch_bytes_left (struct conn_info *conn_info)
+{
+ unsigned int read;
+ unsigned int write;
+ unsigned int bytes_left;
- res = -1;
+ read = conn_info->mem->read;
+ write = conn_info->mem->write;
- queue_empty = queue_is_empty (&conn_io->outq);
- /*
- * Send request message
- */
- if (queue_empty) {
-
- iov_send.iov_base = msg;
- iov_send.iov_len = mlen;
-retry_sendmsg_two:
- res = sendmsg (conn_io->fd, &msg_send, MSG_NOSIGNAL);
- if (res == -1 && errno == EINTR) {
- goto retry_sendmsg_two;
- }
- if (res == -1 && errno == EAGAIN) {
- conn_io->byte_start = 0;
- }
- if (res != -1) {
- if (res != mlen) {
- conn_io->byte_start += res;
- res = -1;
- } else {
- conn_io->byte_start = 0;
- }
- }
+ if (read <= write) {
+ bytes_left = DISPATCH_SIZE - write + read;
+ } else {
+ bytes_left = read - write;
}
+ return (bytes_left);
+}
- /*
- * If res == -1 , errrno == EAGAIN which means kernel queue full
- */
- if (res == -1) {
- cmsg = malloc (mlen);
- if (cmsg == 0) {
- log_printf (LOG_LEVEL_ERROR, "Library queue couldn't allocate a message, \
disconnecting library connection.\n");
- disconnect_request (conn_io->conn_info);
- pthread_mutex_unlock (&conn_io->mutex);
- return (-1);
- }
- queue_item_out.msg = cmsg;
- queue_item_out.mlen = mlen;
- memcpy (cmsg, msg, mlen);
- queue_item_add (&conn_io->outq, &queue_item_out);
+int memcpy_dwrap (struct conn_info *conn_info, void *msg, int len)
+{
+ char *dest_char = (char *)conn_info->mem->dispatch_buffer;
+ char *src_char = (char *)msg;
+ unsigned int first_write;
+ unsigned int second_write;
- /*
- * Send a pthread_kill to interrupt the blocked poll syscall
- * and start a new poll operation in the thread if
- * POLLOUT is not already set
- */
- if (conn_io->events != (POLLIN|POLLOUT|POLLNVAL)) {
- conn_io->events = POLLIN|POLLOUT|POLLNVAL;
- pthread_kill (conn_io->thread, SIGUSR1);
- }
+ first_write = len;
+ second_write = 0;
+ if (len + conn_info->mem->write >= DISPATCH_SIZE) {
+ first_write = DISPATCH_SIZE - conn_info->mem->write;
+ second_write = len - first_write;
}
- pthread_mutex_unlock (&conn_io->mutex);
+ memcpy (&dest_char[conn_info->mem->write], src_char, first_write);
+ if (second_write) {
+ memcpy (dest_char, &src_char[first_write], second_write);
+ }
+ conn_info->mem->write = (conn_info->mem->write + len) % DISPATCH_SIZE;
return (0);
}
-void openais_ipc_flow_control_create (
- void *conn,
- unsigned int service,
- char *id,
- int id_len,
- void (*flow_control_state_set_fn) (void *conn, enum openais_flow_control_state),
- void *context)
+void msg_send (void *conn, struct iovec *iov, int iov_len, int locked)
{
struct conn_info *conn_info = (struct conn_info *)conn;
+ struct sembuf sop;
+ int res;
+ int i;
+ char buf;
- openais_flow_control_create (
- conn_info->flow_control_handle,
- service,
- id,
- id_len,
- flow_control_state_set_fn,
- context);
+ for (i = 0; i < iov_len; i++) {
+ memcpy_dwrap (conn_info, iov[i].iov_base, iov[i].iov_len);
+ }
+
+ buf = !list_empty (&conn_info->outq_head);
+ res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
+ if (res == -1 && errno == EAGAIN) {
+ if (locked == 0) {
+ pthread_mutex_lock (&conn_info->mutex);
+ }
+ conn_info->pending_semops += 1;
+ if (locked == 0) {
+ pthread_mutex_unlock (&conn_info->mutex);
+ }
+ poll_dispatch_modify (aisexec_poll_handle, conn_info->fd,
+ POLLIN|POLLOUT|POLLNVAL, poll_handler_connection);
+ } else
+ if (res == -1) {
+ ipc_disconnect (conn_info);
+ }
+ sop.sem_num = 2;
+ sop.sem_op = 1;
+ sop.sem_flg = 0;
+
+retry_semop:
+ res = semop (conn_info->semid, &sop, 1);
+ if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
+ goto retry_semop;
+ } else
+ if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
+ return;
+ }
}
-void openais_ipc_flow_control_destroy (
- void *conn,
- unsigned int service,
- unsigned char *id,
- int id_len)
-{
- struct conn_info *conn_info = (struct conn_info *)conn;
+static void outq_flush (struct conn_info *conn_info) {
+ struct list_head *list, *list_next;
+ struct outq_item *outq_item;
+ unsigned int bytes_left;
+ struct iovec iov;
+ char buf;
+ int res;
- openais_flow_control_destroy (
- conn_info->flow_control_handle,
- service,
- id,
- id_len);
+ pthread_mutex_lock (&conn_info->mutex);
+ if (list_empty (&conn_info->outq_head)) {
+ buf = 3;
+ res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
+ pthread_mutex_unlock (&conn_info->mutex);
+ return;
+ }
+ for (list = conn_info->outq_head.next;
+ list != &conn_info->outq_head; list = list_next) {
+
+ list_next = list->next;
+ outq_item = list_entry (list, struct outq_item, list);
+ bytes_left = shared_mem_dispatch_bytes_left (conn_info);
+ if (bytes_left > outq_item->mlen) {
+ iov.iov_base = outq_item->msg;
+ iov.iov_len = outq_item->mlen;
+ msg_send (conn_info, &iov, 1, MSG_SEND_UNLOCKED);
+ list_del (list);
+ free (iov.iov_base);
+ free (outq_item);
+ } else {
+ break;
+ }
+ }
+ pthread_mutex_unlock (&conn_info->mutex);
}
-void openais_ipc_flow_control_local_increment (
- void *conn)
+static void msg_send_or_queue (void *conn, struct iovec *iov, int iov_len)
{
struct conn_info *conn_info = (struct conn_info *)conn;
+ unsigned int bytes_left;
+ unsigned int bytes_msg = 0;
+ int i;
+ struct outq_item *outq_item;
+ char *write_buf = 0;
- pthread_mutex_lock (&conn_info->flow_control_mutex);
+ /*
+ * Exit transmission if the connection is dead
+ */
+ pthread_mutex_lock (&conn_info->mutex);
+ if (conn_info->destroyed || conn_info->disconnect_requested) {
+ pthread_mutex_unlock (&conn_info->mutex);
+ return;
+ }
+ pthread_mutex_unlock (&conn_info->mutex);
- conn_info->flow_control_local_count++;
+ bytes_left = shared_mem_dispatch_bytes_left (conn_info);
+ for (i = 0; i < iov_len; i++) {
+ bytes_msg += iov[i].iov_len;
+ }
+ if (bytes_left < bytes_msg || list_empty (&conn_info->outq_head) == 0) {
+ outq_item = malloc (sizeof (struct outq_item));
+ if (outq_item == NULL) {
+ ipc_disconnect (conn);
+ return;
+ }
+ outq_item->msg = malloc (bytes_msg);
+ if (outq_item->msg == 0) {
+ free (outq_item);
+ ipc_disconnect (conn);
+ return;
+ }
- pthread_mutex_unlock (&conn_info->flow_control_mutex);
+ write_buf = outq_item->msg;
+ for (i = 0; i < iov_len; i++) {
+ memcpy (write_buf, iov[i].iov_base, iov[i].iov_len);
+ write_buf += iov[i].iov_len;
+ }
+ outq_item->mlen = bytes_msg;
+ list_init (&outq_item->list);
+ pthread_mutex_lock (&conn_info->mutex);
+ if (list_empty (&conn_info->outq_head)) {
+ conn_info->notify_flow_control_enabled = 1;
+ poll_dispatch_modify (aisexec_poll_handle,
+ conn_info->fd, POLLOUT|POLLIN|POLLNVAL,
+ poll_handler_connection);
+ }
+ list_add_tail (&outq_item->list, &conn_info->outq_head);
+ pthread_mutex_unlock (&conn_info->mutex);
+ return;
+ }
+ msg_send (conn, iov, iov_len, MSG_SEND_LOCKED);
}
-void openais_ipc_flow_control_local_decrement (
- void *conn)
+void openais_conn_refcount_inc (void *conn)
{
struct conn_info *conn_info = (struct conn_info *)conn;
- pthread_mutex_lock (&conn_info->flow_control_mutex);
-
- conn_info->flow_control_local_count--;
-
- pthread_mutex_unlock (&conn_info->flow_control_mutex);
+ pthread_mutex_lock (&conn_info->mutex);
+ conn_info->refcount++;
+ pthread_mutex_unlock (&conn_info->mutex);
}
-
-int openais_response_send (void *conn, void *msg, int mlen)
+void openais_conn_refcount_dec (void *conn)
{
struct conn_info *conn_info = (struct conn_info *)conn;
-
- return (conn_io_send (conn_info->conn_io_response, msg, mlen));
+
+ pthread_mutex_lock (&conn_info->mutex);
+ conn_info->refcount--;
+ pthread_mutex_unlock (&conn_info->mutex);
}
int openais_dispatch_send (void *conn, void *msg, int mlen)
{
- struct conn_info *conn_info = (struct conn_info *)conn;
-
- return (conn_io_send (conn_info->conn_io_dispatch, msg, mlen));
+ struct iovec iov;
+
+ iov.iov_base = msg;
+ iov.iov_len = mlen;
+
+ msg_send_or_queue (conn, &iov, 1);
+ return (0);
}
+
+int openais_dispatch_iov_send (void *conn, struct iovec *iov, int iov_len)
+{
+ msg_send_or_queue (conn, iov, iov_len);
+ return (0);
+}
Index: exec/totempg.c
===================================================================
--- exec/totempg.c (revision 1687)
+++ exec/totempg.c (working copy)
@@ -1051,6 +1051,7 @@
struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP];
int i;
unsigned int res;
+ int totalsz = 0;
pthread_mutex_lock (&totempg_mutex);
res = hdb_handle_get (&totempg_groups_instance_database, handle,
@@ -1075,6 +1076,9 @@
iovec_mcast[i + instance->groups_cnt + 1].iov_base = iovec[i].iov_base;
}
+ for (i = 0; i < iov_len + instance->groups_cnt + 1; i++) {
+ totalsz += iovec_mcast[i].iov_len;
+ }
res = mcast_msg (iovec_mcast, iov_len + instance->groups_cnt + 1, guarantee);
hdb_handle_put (&totempg_groups_instance_database, handle);
@@ -1089,6 +1093,8 @@
int iov_len)
{
struct totempg_group_instance *instance;
+ unsigned short group_len[MAX_GROUPS_PER_MSG + 1];
+ struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP];
unsigned int size = 0;
unsigned int i;
unsigned int res;
@@ -1101,13 +1107,25 @@
goto error_exit;
}
+ group_len[0] = instance->groups_cnt;
for (i = 0; i < instance->groups_cnt; i++) {
- size += instance->groups[i].group_len;
+ group_len[i + 1] = instance->groups[i].group_len;
+ iovec_mcast[i + 1].iov_len = instance->groups[i].group_len;
+ iovec_mcast[i + 1].iov_base = instance->groups[i].group;
}
+ iovec_mcast[0].iov_len = (instance->groups_cnt + 1) * sizeof (unsigned short);
+ iovec_mcast[0].iov_base = group_len;
for (i = 0; i < iov_len; i++) {
- size += iovec[i].iov_len;
+ iovec_mcast[i + instance->groups_cnt + 1].iov_len = iovec[i].iov_len;
+ iovec_mcast[i + instance->groups_cnt + 1].iov_base = iovec[i].iov_base;
}
+ size = 0;
+ for (i = 0; i < iov_len + instance->groups_cnt + 1; i++) {
+ size += iovec_mcast[i].iov_len;
+ }
+ size += 2000; /* Largest possible header size of a message */
+
res = send_ok (size);
hdb_handle_put (&totempg_groups_instance_database, handle);
Index: exec/ipc.h
===================================================================
--- exec/ipc.h (revision 1687)
+++ exec/ipc.h (working copy)
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2006-2007 Red Hat, Inc.
+ * Copyright (c) 2006-2009 Red Hat, Inc.
*
* All rights reserved.
*
@@ -36,27 +36,29 @@
#define IPC_H_DEFINED
#include "tlist.h"
-#include "flow.h"
extern void message_source_set (mar_message_source_t *source, void *conn);
extern int message_source_is_local (mar_message_source_t *source);
+extern void openais_ipc_init (unsigned int gid_valid);
+
extern void *openais_conn_private_data_get (void *conn);
extern int openais_response_send (void *conn, void *msg, int mlen);
+extern int openais_response_iov_send (void *conn, struct iovec *iov, int iov_len);
+
extern int openais_dispatch_send (void *conn, void *msg, int mlen);
-extern void openais_conn_info_refcnt_dec (void *conn);
+extern int openais_dispatch_iov_send (void *conn, struct iovec *iov, int iov_len);
+extern void openais_conn_refcount_inc (void *conn);
-extern void openais_conn_info_refcnt_inc (void *conn);
+extern void openais_conn_refcount_dec (void *conn);
-extern void openais_ipc_init (
- void (*serialize_lock_fn) (void),
- void (*serialize_unlock_fn) (void),
- unsigned int gid_valid);
+extern void openais_ipc_exit (void);
+#ifdef COMPILE_OUT
extern int openais_ipc_timer_add (
void *conn,
void (*timer_fn) (void *data),
@@ -72,24 +74,6 @@
void *conn,
timer_handle timer_handle);
-extern void openais_ipc_flow_control_create (
- void *conn,
- unsigned int service,
- char *id,
- int id_len,
- void (*flow_control_state_set_fn) (void *context, enum openais_flow_control_state \
flow_control_state_set),
- void *context);
-
-extern void openais_ipc_flow_control_destroy (
- void *conn,
- unsigned int service,
- unsigned char *id,
- int id_len);
+#endif
-extern void openais_ipc_flow_control_local_increment (
- void *conn);
-
-extern void openais_ipc_flow_control_local_decrement (
- void *conn);
-
#endif /* IPC_H_DEFINED */
Index: exec/ckpt.c
===================================================================
--- exec/ckpt.c (revision 1687)
+++ exec/ckpt.c (working copy)
@@ -1322,6 +1322,7 @@
checkpoint_section->section_descriptor.section_size = 0;
checkpoint_section->section_data = NULL;
checkpoint_section->expiration_timer = 0;
+
}
} else {
/*
@@ -1502,6 +1503,9 @@
checkpoint = list_entry (list,
struct checkpoint, expiry_list);
+ log_printf (LOG_LEVEL_DEBUG,
+ "refcnt checkpoint %s %d\n",
+ get_mar_name_t (&checkpoint->name), checkpoint->reference_count);
if (checkpoint->reference_count == 0) {
req_exec_ckpt_checkpointunlink.header.size =
sizeof (struct req_exec_ckpt_checkpointunlink);
@@ -1538,6 +1542,7 @@
void timer_function_retention (void *data)
{
struct checkpoint *checkpoint = (struct checkpoint *)data;
+
checkpoint->retention_timer = 0;
list_add (&checkpoint->expiry_list, &my_checkpoint_expiry_list_head);
@@ -2107,6 +2112,7 @@
if (checkpoint->active_replica_set == 0) {
log_printf (LOG_LEVEL_DEBUG, "checkpointwrite: no active replica, returning \
error.\n"); +printf ("b\n");
error = SA_AIS_ERR_NOT_EXIST;
goto error_exit;
}
@@ -2299,6 +2305,8 @@
struct checkpoint_section *checkpoint_section = 0;
int section_size = 0;
SaAisErrorT error = SA_AIS_OK;
+ int iov_len;
+ struct iovec iov[2];
res_lib_ckpt_sectionread.data_read = 0;
@@ -2367,6 +2375,7 @@
*/
error_exit:
if (message_source_is_local(&req_exec_ckpt_sectionread->source)) {
+
res_lib_ckpt_sectionread.header.size = sizeof (struct res_lib_ckpt_sectionread) + \
section_size; res_lib_ckpt_sectionread.header.id = \
MESSAGE_RES_CKPT_CHECKPOINT_SECTIONREAD; res_lib_ckpt_sectionread.header.error = \
error; @@ -2375,22 +2384,23 @@
res_lib_ckpt_sectionread.data_read = section_size;
}
- openais_response_send (
- req_exec_ckpt_sectionread->source.conn,
- &res_lib_ckpt_sectionread,
- sizeof (struct res_lib_ckpt_sectionread));
+ iov[0].iov_base = &res_lib_ckpt_sectionread;
+ iov[0].iov_len = sizeof (struct res_lib_ckpt_sectionread);
+ iov_len = 1;
- /*
- * Write checkpoint to CKPT library section if section has data
- */
if (error == SA_AIS_OK) {
char *sd;
+
sd = (char *)checkpoint_section->section_data;
- openais_response_send (
- req_exec_ckpt_sectionread->source.conn,
- &sd[req_exec_ckpt_sectionread->data_offset],
- section_size);
+ iov[1].iov_base = &sd[req_exec_ckpt_sectionread->data_offset],
+ iov[1].iov_len = section_size;
+ iov_len = 2;
}
+
+ openais_response_iov_send (
+ req_exec_ckpt_sectionread->source.conn,
+ iov,
+ iov_len);
}
}
@@ -3210,6 +3220,8 @@
struct res_lib_ckpt_sectioniterationnext res_lib_ckpt_sectioniterationnext;
SaAisErrorT error = SA_AIS_OK;
int section_id_size = 0;
+ int iov_len;
+ struct iovec iov[2];
unsigned int res;
struct iteration_instance *iteration_instance = NULL;
void *iteration_instance_p;
@@ -3284,17 +3296,18 @@
res_lib_ckpt_sectioniterationnext.header.id = \
MESSAGE_RES_CKPT_SECTIONITERATIONNEXT; \
res_lib_ckpt_sectioniterationnext.header.error = error;
- openais_response_send (
+ iov[0].iov_base = &res_lib_ckpt_sectioniterationnext;
+ iov[0].iov_len = sizeof (struct res_lib_ckpt_sectioniterationnext);
+ iov_len = 1;
+ if (error == SA_AIS_OK ) {
+ iov[1].iov_base = checkpoint_section->section_descriptor.section_id.id,
+ iov[1].iov_len = checkpoint_section->section_descriptor.section_id.id_len;
+ iov_len = 2;
+ }
+ openais_response_iov_send (
conn,
- &res_lib_ckpt_sectioniterationnext,
- sizeof (struct res_lib_ckpt_sectioniterationnext));
-
- if (error == SA_AIS_OK) {
- openais_response_send (
- conn,
- checkpoint_section->section_descriptor.section_id.id,
- checkpoint_section->section_descriptor.section_id.id_len);
- }
+ iov,
+ iov_len);
}
/*
Index: exec/main.c
===================================================================
--- exec/main.c (revision 1687)
+++ exec/main.c (working copy)
@@ -75,7 +75,6 @@
#include "timer.h"
#include "print.h"
#include "util.h"
-#include "flow.h"
#include "version.h"
#define SERVER_BACKLOG 5
@@ -146,7 +145,9 @@
}
#endif
+ poll_stop (0);
totempg_finalize ();
+ openais_ipc_exit ();
openais_exit_error (AIS_DONE_EXIT);
}
@@ -578,8 +579,6 @@
totem_config.vsf_type);
- res = openais_flow_control_initialize ();
-
/*
* Drop root privleges to user 'ais'
* TODO: Don't really need full root capabilities;
@@ -592,10 +591,7 @@
aisexec_mempool_init ();
- openais_ipc_init (
- serialize_mutex_lock,
- serialize_mutex_unlock,
- gid_valid);
+ openais_ipc_init (gid_valid);
/*
* Start main processing loop
Index: lib/evt.c
===================================================================
--- lib/evt.c (revision 1687)
+++ lib/evt.c (working copy)
@@ -105,7 +105,7 @@
* data required to support events for a given initialization
*
* ei_dispatch_fd: fd used for getting callback data e.g. async event data
- * ei_response_fd: fd used for everything else (i.e. evt sync api commands).
+ * ipc_ctx: fd used for everything else (i.e. evt sync api commands).
* ei_callback: callback function.
* ei_version: version sent to the evtInitialize call.
* ei_node_id: our node id.
@@ -123,8 +123,7 @@
*
*/
struct event_instance {
- int ei_dispatch_fd;
- int ei_response_fd;
+ void *ipc_ctx;
SaEvtCallbacksT ei_callback;
SaVersionT ei_version;
SaClmNodeIdT ei_node_id;
@@ -313,7 +312,7 @@
mar_res_header_t hdr;
void *data;
- error = saRecvRetry(fd, &hdr, sizeof(hdr));
+//TODO error = saRecvRetry(fd, &hdr, sizeof(hdr));
if (error != SA_AIS_OK) {
goto msg_out;
}
@@ -325,7 +324,7 @@
data = (void *)((unsigned long)*msg) + sizeof(hdr);
memcpy(*msg, &hdr, sizeof(hdr));
if (hdr.size > sizeof(hdr)) {
- error = saRecvRetry(fd, data, hdr.size - sizeof(hdr));
+//TODO error = saRecvRetry(fd, data, hdr.size - sizeof(hdr));
if (error != SA_AIS_OK) {
goto msg_out;
}
@@ -393,8 +392,7 @@
/*
* Set up communication with the event server
*/
- error = saServiceConnect(&evti->ei_response_fd,
- &evti->ei_dispatch_fd, EVT_SERVICE);
+ error = openais_service_connect (EVT_SERVICE, &evti->ipc_ctx);
if (error != SA_AIS_OK) {
goto error_handle_put;
}
@@ -450,7 +448,7 @@
return error;
}
- *selectionObject = evti->ei_dispatch_fd;
+ *selectionObject = openais_fd_get (evti->ipc_ctx);
saHandleInstancePut(&evt_instance_handle_db, evtHandle);
@@ -598,7 +596,6 @@
SaEvtHandleT evtHandle,
SaDispatchFlagsT dispatchFlags)
{
- struct pollfd ufds;
int timeout = -1;
SaAisErrorT error;
int dispatch_avail;
@@ -607,9 +604,9 @@
SaEvtCallbacksT callbacks;
int ignore_dispatch = 0;
int cont = 1; /* always continue do loop except when set to 0 */
- int poll_fd;
struct lib_event_data *evt = 0;
struct res_evt_event_data res;
+ struct res_overlay dispatch_data;
if (dispatchFlags < SA_DISPATCH_ONE ||
dispatchFlags > SA_DISPATCH_BLOCKING) {
@@ -630,19 +627,9 @@
}
do {
- poll_fd = evti->ei_dispatch_fd;
+ dispatch_avail = openais_dispatch_recv (evti->ipc_ctx,
+ (void *)&dispatch_data, timeout);
- ufds.fd = poll_fd;
- ufds.events = POLLIN;
- ufds.revents = 0;
-
- error = saPollRetry(&ufds, 1, timeout);
- if (error != SA_AIS_OK) {
- goto dispatch_put;
- }
-
- pthread_mutex_lock(&evti->ei_dispatch_mutex);
-
/*
* Handle has been finalized in another thread
*/
@@ -651,61 +638,13 @@
goto dispatch_unlock;
}
- /*
- * If we know that we have an event waiting, we can skip the
- * polling and just ask for it.
- */
- if (!evti->ei_data_available) {
- /*
- * Check the poll data in case the fd status has changed
- * since taking the lock
- */
- error = saPollRetry(&ufds, 1, 0);
- if (error != SA_AIS_OK) {
- goto dispatch_unlock;
- }
-
- if ((ufds.revents & (POLLERR|POLLHUP|POLLNVAL)) != 0) {
- error = SA_AIS_ERR_BAD_HANDLE;
- goto dispatch_unlock;
- }
-
- dispatch_avail = ufds.revents & POLLIN;
- if (dispatch_avail == 0 &&
- (dispatchFlags == SA_DISPATCH_ALL ||
- dispatchFlags == SA_DISPATCH_ONE)) {
- pthread_mutex_unlock(&evti->ei_dispatch_mutex);
- break; /* exit do while cont is 1 loop */
- } else if (dispatch_avail == 0) {
- pthread_mutex_unlock(&evti->ei_dispatch_mutex);
- continue; /* next poll */
- }
-
- if (ufds.revents & POLLIN) {
- error = saRecvRetry (evti->ei_dispatch_fd, &evti->ei_dispatch_data.header,
- sizeof (mar_res_header_t));
-
- if (error != SA_AIS_OK) {
- goto dispatch_unlock;
- }
- if (evti->ei_dispatch_data.header.size > sizeof (mar_res_header_t)) {
- error = saRecvRetry (evti->ei_dispatch_fd, &evti->ei_dispatch_data.data,
- evti->ei_dispatch_data.header.size - sizeof (mar_res_header_t));
- if (error != SA_AIS_OK) {
- goto dispatch_unlock;
- }
- }
- } else {
- pthread_mutex_unlock(&evti->ei_dispatch_mutex);
- continue;
- }
- } else {
- /*
- * We know that we have an event available from before.
- * Fake up a header message and the switch statement will
- * take care of the rest.
- */
- evti->ei_dispatch_data.header.id = MESSAGE_RES_EVT_AVAILABLE;
+ if (dispatch_avail == 0 && dispatchFlags == SA_DISPATCH_ALL) {
+ pthread_mutex_unlock (&evti->ei_dispatch_mutex);
+ break; /* exit do while cont is 1 loop */
+ } else
+ if (dispatch_avail == 0) {
+ pthread_mutex_unlock (&evti->ei_dispatch_mutex);
+ continue; /* next poll */
}
/*
@@ -731,14 +670,14 @@
res.evd_head.size = sizeof(res);
pthread_mutex_lock(&evti->ei_response_mutex);
- error = saSendRetry(evti->ei_response_fd, &res, sizeof(res));
+ //TODO error = saSendRetry(evti->ipc_ctx, &res, sizeof(res));
if (error != SA_AIS_OK) {
DPRINT (("MESSAGE_RES_EVT_AVAILABLE: send failed: %d\n", error));
pthread_mutex_unlock(&evti->ei_response_mutex);
break;
}
- error = evt_recv_event(evti->ei_response_fd, &evt);
+//TODO error = evt_recv_event(evti->ipc_ctx, &evt);
pthread_mutex_unlock(&evti->ei_response_mutex);
if (error != SA_AIS_OK) {
@@ -903,21 +842,12 @@
evti->ei_finalize = 1;
+ openais_service_disconnect (evti->ipc_ctx);
+
pthread_mutex_unlock (&evti->ei_response_mutex);
saHandleDestroy(&evt_instance_handle_db, evtHandle);
- /*
- * Disconnect from the server
- */
- if (evti->ei_response_fd != -1) {
- shutdown(evti->ei_response_fd, 0);
- close(evti->ei_response_fd);
- }
- if (evti->ei_dispatch_fd != -1) {
- shutdown(evti->ei_dispatch_fd, 0);
- close(evti->ei_dispatch_fd);
- }
saHandleInstancePut(&evt_instance_handle_db, evtHandle);
return error;
@@ -1004,7 +934,7 @@
pthread_mutex_lock(&evti->ei_response_mutex);
- error = saSendMsgReceiveReply(evti->ei_response_fd, &iov, 1,
+ error = openais_msg_send_reply_receive(evti->ipc_ctx, &iov, 1,
&res, sizeof(res));
pthread_mutex_unlock (&evti->ei_response_mutex);
@@ -1107,7 +1037,7 @@
pthread_mutex_lock(&evti->ei_response_mutex);
- error = saSendMsgReceiveReply (evti->ei_response_fd, &iov, 1,
+ error = openais_msg_send_reply_receive (evti->ipc_ctx, &iov, 1,
&res, sizeof (res));
pthread_mutex_unlock(&evti->ei_response_mutex);
@@ -1231,7 +1161,7 @@
pthread_mutex_lock(&evti->ei_response_mutex);
- error = saSendMsgReceiveReply (evti->ei_response_fd, &iov, 1,
+ error = openais_msg_send_reply_receive (evti->ipc_ctx, &iov, 1,
&res, sizeof (res));
pthread_mutex_unlock(&evti->ei_response_mutex);
@@ -1332,7 +1262,7 @@
pthread_mutex_lock(&evti->ei_response_mutex);
- error = saSendMsgReceiveReply (evti->ei_response_fd, &iov, 1,
+ error = openais_msg_send_reply_receive (evti->ipc_ctx, &iov, 1,
&res, sizeof (res));
pthread_mutex_unlock(&evti->ei_response_mutex);
@@ -2008,7 +1938,7 @@
pthread_mutex_lock(&evti->ei_response_mutex);
- error = saSendMsgReceiveReply(evti->ei_response_fd, &iov, 1, &res,
+ error = openais_msg_send_reply_receive(evti->ipc_ctx, &iov, 1, &res,
sizeof(res));
pthread_mutex_unlock (&evti->ei_response_mutex);
@@ -2125,7 +2055,7 @@
iov.iov_len = req->ics_head.size;
pthread_mutex_lock(&evti->ei_response_mutex);
- error = saSendMsgReceiveReply(evti->ei_response_fd, &iov, 1,
+ error = openais_msg_send_reply_receive(evti->ipc_ctx, &iov, 1,
&res, sizeof(res));
pthread_mutex_unlock (&evti->ei_response_mutex);
free(req);
@@ -2188,7 +2118,7 @@
iov.iov_len = sizeof(req);
pthread_mutex_lock(&evti->ei_response_mutex);
- error = saSendMsgReceiveReply(evti->ei_response_fd, &iov, 1,
+ error = openais_msg_send_reply_receive(evti->ipc_ctx, &iov, 1,
&res, sizeof(res));
pthread_mutex_unlock (&evti->ei_response_mutex);
@@ -2259,7 +2189,7 @@
iov.iov_len = sizeof(req);
pthread_mutex_lock(&evti->ei_response_mutex);
- error = saSendMsgReceiveReply(evti->ei_response_fd, &iov, 1,
+ error = openais_msg_send_reply_receive(evti->ipc_ctx, &iov, 1,
&res, sizeof(res));
pthread_mutex_unlock (&evti->ei_response_mutex);
Index: lib/cfg.c
===================================================================
--- lib/cfg.c (revision 1687)
+++ lib/cfg.c (working copy)
@@ -61,8 +61,7 @@
* Data structure for instance data
*/
struct cfg_instance {
- int response_fd;
- int dispatch_fd;
+ void *ipc_ctx;
OpenaisCfgCallbacksT callbacks;
SaNameT compName;
int compRegistered;
@@ -112,13 +111,8 @@
goto error_destroy;
}
- cfg_instance->response_fd = -1;
-
- cfg_instance->dispatch_fd = -1;
-
- error = saServiceConnect (&cfg_instance->response_fd,
- &cfg_instance->dispatch_fd, CFG_SERVICE);
- if (error != SA_AIS_OK) {
+ error = openais_service_connect (CFG_SERVICE, &cfg_instance->ipc_ctx);
+ if (error != 0) {
goto error_put_destroy;
}
@@ -155,7 +149,7 @@
return (error);
}
- *selectionObject = cfg_instance->dispatch_fd;
+ *selectionObject = openais_fd_get (cfg_instance->ipc_ctx);
saHandleInstancePut (&cfg_hdb, cfg_handle);
return (SA_AIS_OK);
@@ -166,7 +160,6 @@
openais_cfg_handle_t cfg_handle,
SaDispatchFlagsT dispatchFlags)
{
- struct pollfd ufds;
int timeout = -1;
SaAisErrorT error;
int cont = 1; /* always continue do loop except when set to 0 */
@@ -196,35 +189,15 @@
}
do {
- /*
- * Read data directly from socket
- */
- ufds.fd = cfg_instance->dispatch_fd;
- ufds.events = POLLIN;
- ufds.revents = 0;
+ dispatch_avail = openais_dispatch_recv (cfg_instance->ipc_ctx,
+ (void *)&dispatch_data, timeout);
- error = saPollRetry (&ufds, 1, timeout);
- if (error != SA_AIS_OK) {
- goto error_nounlock;
- }
-
- pthread_mutex_lock (&cfg_instance->dispatch_mutex);
-
- error = saPollRetry (&ufds, 1, 0);
- if (error != SA_AIS_OK) {
- goto error_nounlock;
- }
-
- /*
- * Handle has been finalized in another thread
- */
if (cfg_instance->finalize == 1) {
error = SA_AIS_OK;
pthread_mutex_unlock (&cfg_instance->dispatch_mutex);
goto error_unlock;
}
- dispatch_avail = ufds.revents & POLLIN;
if (dispatch_avail == 0 && dispatchFlags == SA_DISPATCH_ALL) {
pthread_mutex_unlock (&cfg_instance->dispatch_mutex);
break; /* exit do while cont is 1 loop */
@@ -234,27 +207,6 @@
continue; /* next poll */
}
- if (ufds.revents & POLLIN) {
- /*
- * Queue empty, read response from socket
- */
- error = saRecvRetry (cfg_instance->dispatch_fd, &dispatch_data.header,
- sizeof (mar_res_header_t));
- if (error != SA_AIS_OK) {
- goto error_unlock;
- }
- if (dispatch_data.header.size > sizeof (mar_res_header_t)) {
- error = saRecvRetry (cfg_instance->dispatch_fd, &dispatch_data.data,
- dispatch_data.header.size - sizeof (mar_res_header_t));
- if (error != SA_AIS_OK) {
- goto error_unlock;
- }
- }
- } else {
- pthread_mutex_unlock (&cfg_instance->dispatch_mutex);
- continue;
- }
-
/*
* Make copy of callbacks, message data, unlock instance, and call callback
* A risk of this dispatch method is that the callback routines may
@@ -332,15 +284,6 @@
saHandleDestroy (&cfg_hdb, cfg_handle);
- if (cfg_instance->response_fd != -1) {
- shutdown (cfg_instance->response_fd, 0);
- close (cfg_instance->response_fd);
- }
- if (cfg_instance->dispatch_fd != -1) {
- shutdown (cfg_instance->dispatch_fd, 0);
- close (cfg_instance->dispatch_fd);
- }
-
saHandleInstancePut (&cfg_hdb, cfg_handle);
return (error);
@@ -358,6 +301,7 @@
struct res_lib_cfg_ringstatusget res_lib_cfg_ringstatusget;
unsigned int i;
SaAisErrorT error;
+ struct iovec iov;
error = saHandleInstanceGet (&cfg_hdb, cfg_handle, (void *)&cfg_instance);
if (error != SA_AIS_OK) {
@@ -367,11 +311,14 @@
req_lib_cfg_ringstatusget.header.size = sizeof (struct req_lib_cfg_ringstatusget);
req_lib_cfg_ringstatusget.header.id = MESSAGE_REQ_CFG_RINGSTATUSGET;
+ iov.iov_base = &req_lib_cfg_ringstatusget,
+ iov.iov_len = sizeof (struct req_lib_cfg_ringstatusget),
+
pthread_mutex_lock (&cfg_instance->response_mutex);
- error = saSendReceiveReply (cfg_instance->response_fd,
- &req_lib_cfg_ringstatusget,
- sizeof (struct req_lib_cfg_ringstatusget),
+ error = openais_msg_send_reply_receive(cfg_instance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_cfg_ringstatusget,
sizeof (struct res_lib_cfg_ringstatusget));
@@ -434,6 +381,7 @@
struct req_lib_cfg_ringreenable req_lib_cfg_ringreenable;
struct res_lib_cfg_ringreenable res_lib_cfg_ringreenable;
SaAisErrorT error;
+ struct iovec iov;
error = saHandleInstanceGet (&cfg_hdb, cfg_handle, (void *)&cfg_instance);
if (error != SA_AIS_OK) {
@@ -443,11 +391,14 @@
req_lib_cfg_ringreenable.header.size = sizeof (struct req_lib_cfg_ringreenable);
req_lib_cfg_ringreenable.header.id = MESSAGE_REQ_CFG_RINGREENABLE;
+ iov.iov_base = &req_lib_cfg_ringreenable,
+ iov.iov_len = sizeof (struct req_lib_cfg_ringreenable);
+
pthread_mutex_lock (&cfg_instance->response_mutex);
- error = saSendReceiveReply (cfg_instance->response_fd,
- &req_lib_cfg_ringreenable,
- sizeof (struct req_lib_cfg_ringreenable),
+ error = openais_msg_send_reply_receive (cfg_instance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_cfg_ringreenable,
sizeof (struct res_lib_cfg_ringreenable));
@@ -467,6 +418,7 @@
struct req_lib_cfg_statetrack req_lib_cfg_statetrack;
struct res_lib_cfg_statetrack res_lib_cfg_statetrack;
SaAisErrorT error;
+ struct iovec iov;
req_lib_cfg_statetrack.header.size = sizeof (struct req_lib_cfg_statetrack);
req_lib_cfg_statetrack.header.id = MESSAGE_REQ_CFG_STATETRACKSTART;
@@ -479,11 +431,14 @@
return (error);
}
+ iov.iov_base = &req_lib_cfg_statetrack,
+ iov.iov_len = sizeof (struct req_lib_cfg_statetrack),
+
pthread_mutex_lock (&cfg_instance->response_mutex);
- error = saSendReceiveReply (cfg_instance->response_fd,
- &req_lib_cfg_statetrack,
- sizeof (struct req_lib_cfg_statetrack),
+ error = openais_msg_send_reply_receive (cfg_instance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_cfg_statetrack,
sizeof (struct res_lib_cfg_statetrack));
@@ -502,6 +457,7 @@
struct req_lib_cfg_statetrackstop req_lib_cfg_statetrackstop;
struct res_lib_cfg_statetrackstop res_lib_cfg_statetrackstop;
SaAisErrorT error;
+ struct iovec iov;
error = saHandleInstanceGet (&cfg_hdb, cfg_handle,
(void *)&cfg_instance);
@@ -512,11 +468,13 @@
req_lib_cfg_statetrackstop.header.size = sizeof (struct \
req_lib_cfg_statetrackstop); req_lib_cfg_statetrackstop.header.id = \
MESSAGE_REQ_CFG_STATETRACKSTOP;
+ iov.iov_base = &req_lib_cfg_statetrackstop,
+ iov.iov_len = sizeof (struct req_lib_cfg_statetrackstop),
pthread_mutex_lock (&cfg_instance->response_mutex);
- error = saSendReceiveReply (cfg_instance->response_fd,
- &req_lib_cfg_statetrackstop,
- sizeof (struct req_lib_cfg_statetrackstop),
+ error = openais_msg_send_reply_receive (cfg_instance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_cfg_statetrackstop,
sizeof (struct res_lib_cfg_statetrackstop));
@@ -537,6 +495,7 @@
struct req_lib_cfg_administrativestateget req_lib_cfg_administrativestateget;
struct res_lib_cfg_administrativestateget res_lib_cfg_administrativestateget;
SaAisErrorT error;
+ struct iovec iov;
error = saHandleInstanceGet (&cfg_hdb, cfg_handle,
(void *)&cfg_instance);
@@ -548,9 +507,14 @@
req_lib_cfg_administrativestateget.header.size = sizeof (struct \
req_lib_cfg_administrativestateget); \
req_lib_cfg_administrativestateget.administrativeTarget = administrativeTarget;
- error = saSendReceiveReply (cfg_instance->response_fd,
- &req_lib_cfg_administrativestateget,
- sizeof (struct req_lib_cfg_administrativestateget),
+ iov.iov_base = &req_lib_cfg_administrativestateget,
+ iov.iov_len = sizeof (struct req_lib_cfg_administrativestateget),
+
+ pthread_mutex_lock (&cfg_instance->response_mutex);
+
+ error = openais_msg_send_reply_receive (cfg_instance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_cfg_administrativestateget,
sizeof (struct res_lib_cfg_administrativestateget));
@@ -573,6 +537,7 @@
struct req_lib_cfg_administrativestateset req_lib_cfg_administrativestateset;
struct res_lib_cfg_administrativestateset res_lib_cfg_administrativestateset;
SaAisErrorT error;
+ struct iovec iov;
error = saHandleInstanceGet (&cfg_hdb, cfg_handle,
(void *)&cfg_instance);
@@ -585,9 +550,14 @@
req_lib_cfg_administrativestateset.administrativeTarget = administrativeTarget;
req_lib_cfg_administrativestateset.administrativeState = administrativeState;
- error = saSendReceiveReply (cfg_instance->response_fd,
- &req_lib_cfg_administrativestateset,
- sizeof (struct req_lib_cfg_administrativestateset),
+ iov.iov_base = &req_lib_cfg_administrativestateset,
+ iov.iov_len = sizeof (struct req_lib_cfg_administrativestateset),
+
+ pthread_mutex_lock (&cfg_instance->response_mutex);
+
+ error = openais_msg_send_reply_receive (cfg_instance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_cfg_administrativestateset,
sizeof (struct res_lib_cfg_administrativestateset));
Index: lib/ckpt.c
===================================================================
--- lib/ckpt.c (revision 1687)
+++ lib/ckpt.c (working copy)
@@ -65,8 +65,7 @@
* Data structure for instance data
*/
struct ckptInstance {
- int response_fd;
- int dispatch_fd;
+ void *ipc_ctx;
SaCkptCallbacksT callbacks;
int finalize;
SaCkptHandleT ckptHandle;
@@ -76,7 +75,7 @@
};
struct ckptCheckpointInstance {
- int response_fd;
+ void *ipc_ctx;
SaCkptHandleT ckptHandle;
SaCkptCheckpointHandleT checkpointHandle;
SaCkptCheckpointOpenFlagsT checkpointOpenFlags;
@@ -88,7 +87,7 @@
};
struct ckptSectionIterationInstance {
- int response_fd;
+ void *ipc_ctx;
SaCkptSectionIterationHandleT sectionIterationHandle;
SaNameT checkpointName;
SaSizeT maxSectionIdSize;
@@ -286,10 +285,7 @@
goto error_destroy;
}
- ckptInstance->response_fd = -1;
-
- error = saServiceConnect (&ckptInstance->response_fd,
- &ckptInstance->dispatch_fd, CKPT_SERVICE);
+ error = openais_service_connect (CKPT_SERVICE, &ckptInstance->ipc_ctx);
if (error != SA_AIS_OK) {
goto error_put_destroy;
}
@@ -334,7 +330,7 @@
return (error);
}
- *selectionObject = ckptInstance->dispatch_fd;
+ *selectionObject = openais_fd_get (ckptInstance->ipc_ctx);
saHandleInstancePut (&ckptHandleDatabase, ckptHandle);
@@ -346,9 +342,7 @@
const SaCkptHandleT ckptHandle,
SaDispatchFlagsT dispatchFlags)
{
- struct pollfd ufds;
- int poll_fd;
- int timeout = 1;
+ int timeout = -1;
SaCkptCallbacksT callbacks;
SaAisErrorT error;
int dispatch_avail;
@@ -373,69 +367,45 @@
}
/*
- * Timeout instantly for SA_DISPATCH_ALL
+ * Timeout instantly for SA_DISPATCH_ALL, otherwise don't timeout
+ * for SA_DISPATCH_BLOCKING or SA_DISPATCH_ONE
*/
if (dispatchFlags == SA_DISPATCH_ALL) {
timeout = 0;
}
do {
- /*
- * Read data directly from socket
- */
- poll_fd = ckptInstance->dispatch_fd;
- ufds.fd = poll_fd;
- ufds.events = POLLIN;
- ufds.revents = 0;
+ pthread_mutex_lock (&ckptInstance->dispatch_mutex);
- error = saPollRetry(&ufds, 1, timeout);
- if (error != SA_AIS_OK) {
- goto error_put;
- }
- pthread_mutex_lock(&ckptInstance->dispatch_mutex);
+ dispatch_avail = openais_dispatch_recv (ckptInstance->ipc_ctx,
+ &dispatch_data, timeout);
- if (ckptInstance->finalize == 1) {
- error = SA_AIS_OK;
- goto error_unlock;
- }
+ pthread_mutex_unlock (&ckptInstance->dispatch_mutex);
- if ((ufds.revents & (POLLERR|POLLHUP|POLLNVAL)) != 0) {
- error = SA_AIS_ERR_BAD_HANDLE;
- goto error_unlock;
- }
-
- dispatch_avail = (ufds.revents & POLLIN);
-
if (dispatch_avail == 0 && dispatchFlags == SA_DISPATCH_ALL) {
- pthread_mutex_unlock(&ckptInstance->dispatch_mutex);
break; /* exit do while cont is 1 loop */
} else
if (dispatch_avail == 0) {
- pthread_mutex_unlock(&ckptInstance->dispatch_mutex);
continue;
}
-
- memset(&dispatch_data,0, sizeof(struct message_overlay));
- error = saRecvRetry (ckptInstance->dispatch_fd, &dispatch_data.header, sizeof \
(mar_res_header_t));
- if (error != SA_AIS_OK) {
- goto error_unlock;
- }
- if (dispatch_data.header.size > sizeof (mar_res_header_t)) {
- error = saRecvRetry (ckptInstance->dispatch_fd, &dispatch_data.data,
- dispatch_data.header.size - sizeof (mar_res_header_t));
- if (error != SA_AIS_OK) {
- goto error_unlock;
+ if (dispatch_avail == -1) {
+ if (ckptInstance->finalize == 1) {
+ error = SA_AIS_OK;
+ } else {
+ error = SA_AIS_ERR_LIBRARY;
}
+ goto error_exit;
}
-
+
/*
* Make copy of callbacks, message data, unlock instance,
* and call callback. A risk of this dispatch method is that
* the callback routines may operate at the same time that
* CkptFinalize has been called in another thread.
*/
- memcpy(&callbacks,&ckptInstance->callbacks, sizeof(ckptInstance->callbacks));
- pthread_mutex_unlock(&ckptInstance->dispatch_mutex);
+ memcpy (&callbacks, &ckptInstance->callbacks,
+ sizeof(ckptInstance->callbacks));
+
/*
* Dispatch incoming response
*/
@@ -503,7 +473,7 @@
*/
switch (dispatchFlags) {
case SA_DISPATCH_ONE:
- cont = 0;
+ cont = 0;
break;
case SA_DISPATCH_ALL:
break;
@@ -511,9 +481,7 @@
break;
}
} while (cont);
-error_unlock:
- pthread_mutex_unlock(&ckptInstance->dispatch_mutex);
-error_put:
+
saHandleInstancePut(&ckptHandleDatabase, ckptHandle);
error_exit:
return (error);
@@ -545,20 +513,12 @@
ckptInstance->finalize = 1;
+ openais_service_disconnect (ckptInstance->ipc_ctx);
+
pthread_mutex_unlock (&ckptInstance->response_mutex);
ckptInstanceFinalize (ckptInstance);
- if (ckptInstance->response_fd != -1) {
- shutdown (ckptInstance->response_fd, 0);
- close (ckptInstance->response_fd);
- }
-
- if (ckptInstance->dispatch_fd != -1) {
- shutdown (ckptInstance->dispatch_fd, 0);
- close (ckptInstance->dispatch_fd);
- }
-
saHandleInstancePut (&ckptHandleDatabase, ckptHandle);
return (SA_AIS_OK);
@@ -578,6 +538,7 @@
struct ckptInstance *ckptInstance;
struct req_lib_ckpt_checkpointopen req_lib_ckpt_checkpointopen;
struct res_lib_ckpt_checkpointopen res_lib_ckpt_checkpointopen;
+ struct iovec iov;
if (checkpointHandle == NULL) {
return (SA_AIS_ERR_INVALID_PARAM);
@@ -628,7 +589,7 @@
goto error_destroy;
}
- ckptCheckpointInstance->response_fd = ckptInstance->response_fd;
+ ckptCheckpointInstance->ipc_ctx = ckptInstance->ipc_ctx;
ckptCheckpointInstance->ckptHandle = ckptHandle;
ckptCheckpointInstance->checkpointHandle = *checkpointHandle;
@@ -652,18 +613,16 @@
}
req_lib_ckpt_checkpointopen.checkpoint_open_flags = checkpointOpenFlags;
- error = saSendRetry (ckptCheckpointInstance->response_fd, \
&req_lib_ckpt_checkpointopen,
- sizeof (struct req_lib_ckpt_checkpointopen));
- if (error != SA_AIS_OK) {
- goto error_put_destroy;
- }
+ iov.iov_base = &req_lib_ckpt_checkpointopen;
+ iov.iov_len = sizeof (struct req_lib_ckpt_checkpointopen);
- error = saRecvRetry (ckptCheckpointInstance->response_fd, \
&res_lib_ckpt_checkpointopen, + error = openais_msg_send_reply_receive (
+ ckptInstance->ipc_ctx,
+ &iov,
+ 1,
+ &res_lib_ckpt_checkpointopen,
sizeof (struct res_lib_ckpt_checkpointopen));
- if (error != SA_AIS_OK) {
- goto error_put_destroy;
- }
-
+
if (res_lib_ckpt_checkpointopen.header.error != SA_AIS_OK) {
error = res_lib_ckpt_checkpointopen.header.error;
goto error_put_destroy;
@@ -706,6 +665,7 @@
SaAisErrorT error;
struct req_lib_ckpt_checkpointopen req_lib_ckpt_checkpointopen;
struct res_lib_ckpt_checkpointopenasync res_lib_ckpt_checkpointopenasync;
+ struct iovec iov;
SaAisErrorT failWithError = SA_AIS_OK;
if (checkpointName == NULL) {
@@ -755,7 +715,7 @@
goto error_destroy;
}
- ckptCheckpointInstance->response_fd = ckptInstance->response_fd;
+ ckptCheckpointInstance->ipc_ctx = ckptInstance->ipc_ctx;
ckptCheckpointInstance->ckptHandle = ckptHandle;
ckptCheckpointInstance->checkpointHandle = checkpointHandle;
ckptCheckpointInstance->checkpointOpenFlags = checkpointOpenFlags;
@@ -782,9 +742,13 @@
req_lib_ckpt_checkpointopen.checkpoint_open_flags = checkpointOpenFlags;
req_lib_ckpt_checkpointopen.checkpoint_handle = checkpointHandle;
- error = saSendReceiveReply (ckptInstance->response_fd,
- &req_lib_ckpt_checkpointopen,
- sizeof (struct req_lib_ckpt_checkpointopen),
+ iov.iov_base = &req_lib_ckpt_checkpointopen;
+ iov.iov_len = sizeof (struct req_lib_ckpt_checkpointopen);
+
+ error = openais_msg_send_reply_receive (
+ ckptInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_ckpt_checkpointopenasync,
sizeof (struct res_lib_ckpt_checkpointopenasync));
@@ -822,6 +786,7 @@
struct req_lib_ckpt_checkpointclose req_lib_ckpt_checkpointclose;
struct res_lib_ckpt_checkpointclose res_lib_ckpt_checkpointclose;
SaAisErrorT error;
+ struct iovec iov;
struct ckptCheckpointInstance *ckptCheckpointInstance;
error = saHandleInstanceGet (&checkpointHandleDatabase, checkpointHandle,
@@ -837,11 +802,15 @@
req_lib_ckpt_checkpointclose.ckpt_id =
ckptCheckpointInstance->checkpointId;
+ iov.iov_base = &req_lib_ckpt_checkpointclose;
+ iov.iov_len = sizeof (struct req_lib_ckpt_checkpointclose);
+
pthread_mutex_lock (&ckptCheckpointInstance->response_mutex);
- error = saSendReceiveReply (ckptCheckpointInstance->response_fd,
- &req_lib_ckpt_checkpointclose,
- sizeof (struct req_lib_ckpt_checkpointclose),
+ error = openais_msg_send_reply_receive (
+ ckptCheckpointInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_ckpt_checkpointclose,
sizeof (struct res_lib_ckpt_checkpointclose));
@@ -866,6 +835,7 @@
const SaNameT *checkpointName)
{
SaAisErrorT error;
+ struct iovec iov;
struct ckptInstance *ckptInstance;
struct req_lib_ckpt_checkpointunlink req_lib_ckpt_checkpointunlink;
struct res_lib_ckpt_checkpointunlink res_lib_ckpt_checkpointunlink;
@@ -883,11 +853,15 @@
marshall_to_mar_name_t (&req_lib_ckpt_checkpointunlink.checkpoint_name,
(SaNameT *)checkpointName);
+ iov.iov_base = &req_lib_ckpt_checkpointunlink;
+ iov.iov_len = sizeof (struct req_lib_ckpt_checkpointunlink);
+
pthread_mutex_lock (&ckptInstance->response_mutex);
- error = saSendReceiveReply (ckptInstance->response_fd,
- &req_lib_ckpt_checkpointunlink,
- sizeof (struct req_lib_ckpt_checkpointunlink),
+ error = openais_msg_send_reply_receive (
+ ckptInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_ckpt_checkpointunlink,
sizeof (struct res_lib_ckpt_checkpointunlink));
@@ -905,6 +879,7 @@
SaTimeT retentionDuration)
{
SaAisErrorT error;
+ struct iovec iov;
struct ckptCheckpointInstance *ckptCheckpointInstance;
struct req_lib_ckpt_checkpointretentiondurationset \
req_lib_ckpt_checkpointretentiondurationset; struct \
res_lib_ckpt_checkpointretentiondurationset \
res_lib_ckpt_checkpointretentiondurationset; @@ -924,11 +899,15 @@
req_lib_ckpt_checkpointretentiondurationset.ckpt_id =
ckptCheckpointInstance->checkpointId;
+ iov.iov_base = &req_lib_ckpt_checkpointretentiondurationset;
+ iov.iov_len = sizeof (struct req_lib_ckpt_checkpointretentiondurationset);
+
pthread_mutex_lock (&ckptCheckpointInstance->response_mutex);
- error = saSendReceiveReply (ckptCheckpointInstance->response_fd,
- &req_lib_ckpt_checkpointretentiondurationset,
- sizeof (struct req_lib_ckpt_checkpointretentiondurationset),
+ error = openais_msg_send_reply_receive (
+ ckptCheckpointInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_ckpt_checkpointretentiondurationset,
sizeof (struct res_lib_ckpt_checkpointretentiondurationset));
@@ -943,6 +922,7 @@
SaCkptCheckpointHandleT checkpointHandle)
{
SaAisErrorT error;
+ struct iovec iov;
struct ckptCheckpointInstance *ckptCheckpointInstance;
struct req_lib_ckpt_activereplicaset req_lib_ckpt_activereplicaset;
struct res_lib_ckpt_activereplicaset res_lib_ckpt_activereplicaset;
@@ -965,11 +945,15 @@
req_lib_ckpt_activereplicaset.ckpt_id =
ckptCheckpointInstance->checkpointId;
+ iov.iov_base = &req_lib_ckpt_activereplicaset;
+ iov.iov_len = sizeof (struct req_lib_ckpt_activereplicaset);
+
pthread_mutex_lock (&ckptCheckpointInstance->response_mutex);
- error = saSendReceiveReply (ckptCheckpointInstance->response_fd,
- &req_lib_ckpt_activereplicaset,
- sizeof (struct req_lib_ckpt_activereplicaset),
+ error = openais_msg_send_reply_receive (
+ ckptCheckpointInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_ckpt_activereplicaset,
sizeof (struct res_lib_ckpt_activereplicaset));
@@ -987,6 +971,7 @@
SaCkptCheckpointDescriptorT *checkpointStatus)
{
SaAisErrorT error;
+ struct iovec iov;
struct ckptCheckpointInstance *ckptCheckpointInstance;
struct req_lib_ckpt_checkpointstatusget req_lib_ckpt_checkpointstatusget;
struct res_lib_ckpt_checkpointstatusget res_lib_ckpt_checkpointstatusget;
@@ -1008,11 +993,14 @@
req_lib_ckpt_checkpointstatusget.ckpt_id =
ckptCheckpointInstance->checkpointId;
+ iov.iov_base = &req_lib_ckpt_checkpointstatusget;
+ iov.iov_len = sizeof (struct req_lib_ckpt_checkpointstatusget);
+
pthread_mutex_lock (&ckptCheckpointInstance->response_mutex);
- error = saSendReceiveReply (ckptCheckpointInstance->response_fd,
- &req_lib_ckpt_checkpointstatusget,
- sizeof (struct req_lib_ckpt_checkpointstatusget),
+ error = openais_msg_send_reply_receive (ckptCheckpointInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_ckpt_checkpointstatusget,
sizeof (struct res_lib_ckpt_checkpointstatusget));
@@ -1035,6 +1023,8 @@
SaSizeT initialDataSize)
{
SaAisErrorT error;
+ struct iovec iov[3];
+ int iov_len;
struct ckptCheckpointInstance *ckptCheckpointInstance;
struct req_lib_ckpt_sectioncreate req_lib_ckpt_sectioncreate;
struct res_lib_ckpt_sectioncreate res_lib_ckpt_sectioncreate;
@@ -1073,30 +1063,27 @@
req_lib_ckpt_sectioncreate.ckpt_id =
ckptCheckpointInstance->checkpointId;
- pthread_mutex_lock (&ckptCheckpointInstance->response_mutex);
- error = saSendRetry (ckptCheckpointInstance->response_fd, \
&req_lib_ckpt_sectioncreate,
- sizeof (struct req_lib_ckpt_sectioncreate));
- if (error != SA_AIS_OK) {
- goto error_exit;
+ iov[0].iov_base = &req_lib_ckpt_sectioncreate;
+ iov[0].iov_len = sizeof (struct req_lib_ckpt_sectioncreate);
+ iov_len = 1;
+ if (sectionCreationAttributes->sectionId->id) {
+ iov[1].iov_base = sectionCreationAttributes->sectionId->id;
+ iov[1].iov_len = sectionCreationAttributes->sectionId->idLen;
+ iov_len = 2;
}
-
- /*
- * Write section identifier to server
- */
- error = saSendRetry (ckptCheckpointInstance->response_fd, \
sectionCreationAttributes->sectionId->id,
- sectionCreationAttributes->sectionId->idLen);
- if (error != SA_AIS_OK) {
- goto error_exit;
+ if (initialDataSize) {
+ iov[iov_len].iov_base = (void *)initialData;
+ iov[iov_len].iov_len = initialDataSize;
+ iov_len++;
}
- error = saSendRetry (ckptCheckpointInstance->response_fd, initialData,
- initialDataSize);
- if (error != SA_AIS_OK) {
- goto error_exit;
- }
+ pthread_mutex_lock (&ckptCheckpointInstance->response_mutex);
- error = saRecvRetry (ckptCheckpointInstance->response_fd,
+ error = openais_msg_send_reply_receive (
+ ckptCheckpointInstance->ipc_ctx,
+ iov,
+ iov_len,
&res_lib_ckpt_sectioncreate,
sizeof (struct res_lib_ckpt_sectioncreate));
@@ -1115,6 +1102,8 @@
const SaCkptSectionIdT *sectionId)
{
SaAisErrorT error;
+ struct iovec iov[2];
+ int iov_len;
struct ckptCheckpointInstance *ckptCheckpointInstance;
struct req_lib_ckpt_sectiondelete req_lib_ckpt_sectiondelete;
struct res_lib_ckpt_sectiondelete res_lib_ckpt_sectiondelete;
@@ -1134,8 +1123,6 @@
goto error_put;
}
- pthread_mutex_lock (&ckptCheckpointInstance->response_mutex);
-
req_lib_ckpt_sectiondelete.header.size = sizeof (struct req_lib_ckpt_sectiondelete) \
+ sectionId->idLen; req_lib_ckpt_sectiondelete.header.id = \
MESSAGE_REQ_CKPT_CHECKPOINT_SECTIONDELETE; req_lib_ckpt_sectiondelete.id_len = \
sectionId->idLen; @@ -1146,25 +1133,23 @@
req_lib_ckpt_sectiondelete.ckpt_id =
ckptCheckpointInstance->checkpointId;
- error = saSendRetry (ckptCheckpointInstance->response_fd, \
&req_lib_ckpt_sectiondelete,
- sizeof (struct req_lib_ckpt_sectiondelete));
- if (error != SA_AIS_OK) {
- goto error_exit;
+ iov[0].iov_base = &req_lib_ckpt_sectiondelete;
+ iov[0].iov_len = sizeof (struct req_lib_ckpt_sectiondelete);
+ iov_len = 1;
+ if (sectionId->idLen) {
+ iov[1].iov_base = sectionId->id;
+ iov[1].iov_len = sectionId->idLen;
+ iov_len = 2;
}
- /*
- * Write section identifier to server
- */
- error = saSendRetry (ckptCheckpointInstance->response_fd, sectionId->id,
- sectionId->idLen);
- if (error != SA_AIS_OK) {
- goto error_exit;
- }
- error = saRecvRetry (ckptCheckpointInstance->response_fd,
+ pthread_mutex_lock (&ckptCheckpointInstance->response_mutex);
+
+ error = openais_msg_send_reply_receive (ckptCheckpointInstance->ipc_ctx,
+ iov,
+ iov_len,
&res_lib_ckpt_sectiondelete,
sizeof (struct res_lib_ckpt_sectiondelete));
-error_exit:
pthread_mutex_unlock (&ckptCheckpointInstance->response_mutex);
error_put:
@@ -1179,6 +1164,8 @@
SaTimeT expirationTime)
{
SaAisErrorT error;
+ struct iovec iov[2];
+ unsigned int iov_len;
struct ckptCheckpointInstance *ckptCheckpointInstance;
struct req_lib_ckpt_sectionexpirationtimeset req_lib_ckpt_sectionexpirationtimeset;
struct res_lib_ckpt_sectionexpirationtimeset res_lib_ckpt_sectionexpirationtimeset;
@@ -1209,30 +1196,24 @@
req_lib_ckpt_sectionexpirationtimeset.ckpt_id =
ckptCheckpointInstance->checkpointId;
- pthread_mutex_lock (&ckptCheckpointInstance->response_mutex);
-
- error = saSendRetry (ckptCheckpointInstance->response_fd, \
&req_lib_ckpt_sectionexpirationtimeset,
- sizeof (struct req_lib_ckpt_sectionexpirationtimeset));
- if (error != SA_AIS_OK) {
- goto error_exit;
- }
-
- /*
- * Write section identifier to server
- */
+ iov[0].iov_base = &req_lib_ckpt_sectionexpirationtimeset;
+ iov[0].iov_len = sizeof (struct req_lib_ckpt_sectionexpirationtimeset);
+ iov_len = 1;
if (sectionId->idLen) {
- error = saSendRetry (ckptCheckpointInstance->response_fd, sectionId->id,
- sectionId->idLen);
- if (error != SA_AIS_OK) {
- goto error_exit;
- }
+ iov[1].iov_base = sectionId->id;
+ iov[1].iov_len = sectionId->idLen;
+ iov_len = 2;
}
+
+ pthread_mutex_lock (&ckptCheckpointInstance->response_mutex);
- error = saRecvRetry (ckptCheckpointInstance->response_fd,
+ error = openais_msg_send_reply_receive (
+ ckptCheckpointInstance->ipc_ctx,
+ iov,
+ iov_len,
&res_lib_ckpt_sectionexpirationtimeset,
sizeof (struct res_lib_ckpt_sectionexpirationtimeset));
-error_exit:
pthread_mutex_unlock (&ckptCheckpointInstance->response_mutex);
error_put:
@@ -1249,6 +1230,7 @@
SaCkptSectionIterationHandleT *sectionIterationHandle)
{
SaAisErrorT error;
+ struct iovec iov;
struct ckptCheckpointInstance *ckptCheckpointInstance;
struct ckptSectionIterationInstance *ckptSectionIterationInstance;
struct req_lib_ckpt_sectioniterationinitialize \
req_lib_ckpt_sectioniterationinitialize; @@ -1284,7 +1266,7 @@
goto error_destroy;
}
- ckptSectionIterationInstance->response_fd = ckptCheckpointInstance->response_fd;
+ ckptSectionIterationInstance->ipc_ctx = ckptCheckpointInstance->ipc_ctx;
ckptSectionIterationInstance->sectionIterationHandle = *sectionIterationHandle;
memcpy (&ckptSectionIterationInstance->checkpointName,
@@ -1312,11 +1294,14 @@
req_lib_ckpt_sectioniterationinitialize.ckpt_id =
ckptCheckpointInstance->checkpointId;
+ iov.iov_base = &req_lib_ckpt_sectioniterationinitialize;
+ iov.iov_len = sizeof (struct req_lib_ckpt_sectioniterationinitialize);
+
pthread_mutex_lock (&ckptSectionIterationInstance->response_mutex);
- error = saSendReceiveReply (ckptSectionIterationInstance->response_fd,
- &req_lib_ckpt_sectioniterationinitialize,
- sizeof (struct req_lib_ckpt_sectioniterationinitialize),
+ error = openais_msg_send_reply_receive (ckptSectionIterationInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_ckpt_sectioniterationinitialize,
sizeof (struct res_lib_ckpt_sectioniterationinitialize));
@@ -1351,10 +1336,12 @@
SaCkptSectionDescriptorT *sectionDescriptor)
{
SaAisErrorT error;
+ struct iovec iov;
struct ckptSectionIterationInstance *ckptSectionIterationInstance;
struct req_lib_ckpt_sectioniterationnext req_lib_ckpt_sectioniterationnext;
- struct res_lib_ckpt_sectioniterationnext res_lib_ckpt_sectioniterationnext;
+ struct res_lib_ckpt_sectioniterationnext *res_lib_ckpt_sectioniterationnext;
struct iteratorSectionIdListEntry *iteratorSectionIdListEntry;
+ void *return_address;
if (sectionDescriptor == NULL) {
return (SA_AIS_ERR_INVALID_PARAM);
@@ -1379,13 +1366,17 @@
req_lib_ckpt_sectioniterationnext.header.id = \
MESSAGE_REQ_CKPT_SECTIONITERATIONNEXT; \
req_lib_ckpt_sectioniterationnext.iteration_handle = \
ckptSectionIterationInstance->executive_iteration_handle;
+ iov.iov_base = &req_lib_ckpt_sectioniterationnext;
+ iov.iov_len = sizeof (struct req_lib_ckpt_sectioniterationnext);
+
pthread_mutex_lock (&ckptSectionIterationInstance->response_mutex);
- error = saSendReceiveReply (ckptSectionIterationInstance->response_fd,
- &req_lib_ckpt_sectioniterationnext,
- sizeof (struct req_lib_ckpt_sectioniterationnext),
- &res_lib_ckpt_sectioniterationnext,
- sizeof (struct res_lib_ckpt_sectioniterationnext));
+ error = openais_msg_send_reply_receive_in_buf (
+ ckptSectionIterationInstance->ipc_ctx,
+ &iov,
+ 1,
+ &return_address);
+ res_lib_ckpt_sectioniterationnext = return_address;
if (error != SA_AIS_OK) {
goto error_put_unlock;
@@ -1393,18 +1384,18 @@
marshall_from_mar_ckpt_section_descriptor_t (
sectionDescriptor,
- &res_lib_ckpt_sectioniterationnext.section_descriptor);
+ &res_lib_ckpt_sectioniterationnext->section_descriptor);
sectionDescriptor->sectionId.id = &iteratorSectionIdListEntry->data[0];
-
- if ((res_lib_ckpt_sectioniterationnext.header.size - sizeof (struct \
res_lib_ckpt_sectioniterationnext)) > 0) {
- error = saRecvRetry (ckptSectionIterationInstance->response_fd,
- sectionDescriptor->sectionId.id,
- res_lib_ckpt_sectioniterationnext.header.size -
- sizeof (struct res_lib_ckpt_sectioniterationnext));
- }
- error = (error == SA_AIS_OK ? res_lib_ckpt_sectioniterationnext.header.error : \
error); + memcpy (sectionDescriptor->sectionId.id,
+ ((char *)res_lib_ckpt_sectioniterationnext) +
+ sizeof (struct res_lib_ckpt_sectioniterationnext),
+ res_lib_ckpt_sectioniterationnext->header.size -
+ sizeof (struct res_lib_ckpt_sectioniterationnext));
+
+
+ error = (error == SA_AIS_OK ? res_lib_ckpt_sectioniterationnext->header.error : \
error);
/*
* Add to persistent memory list for this sectioniterator
@@ -1432,6 +1423,7 @@
SaCkptSectionIterationHandleT sectionIterationHandle)
{
SaAisErrorT error;
+ struct iovec iov;
struct ckptSectionIterationInstance *ckptSectionIterationInstance;
struct req_lib_ckpt_sectioniterationfinalize req_lib_ckpt_sectioniterationfinalize;
struct res_lib_ckpt_sectioniterationfinalize res_lib_ckpt_sectioniterationfinalize;
@@ -1446,11 +1438,14 @@
req_lib_ckpt_sectioniterationfinalize.header.id = \
MESSAGE_REQ_CKPT_SECTIONITERATIONFINALIZE; \
req_lib_ckpt_sectioniterationfinalize.iteration_handle = \
ckptSectionIterationInstance->executive_iteration_handle;
+ iov.iov_base = &req_lib_ckpt_sectioniterationfinalize;
+ iov.iov_len = sizeof (struct req_lib_ckpt_sectioniterationfinalize);
+
pthread_mutex_lock (&ckptSectionIterationInstance->response_mutex);
- error = saSendReceiveReply (ckptSectionIterationInstance->response_fd,
- &req_lib_ckpt_sectioniterationfinalize,
- sizeof (struct req_lib_ckpt_sectioniterationfinalize),
+ error = openais_msg_send_reply_receive (ckptSectionIterationInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_ckpt_sectioniterationfinalize,
sizeof (struct res_lib_ckpt_sectioniterationfinalize));
@@ -1538,10 +1533,9 @@
ckptCheckpointInstance->checkpointId;
iov_len = 0;
- iov_idx = 0;
- iov[iov_idx].iov_base = (char *)&req_lib_ckpt_sectionwrite;
- iov[iov_idx].iov_len = sizeof (struct req_lib_ckpt_sectionwrite);
- iov_idx++;
+ iov[0].iov_base = (char *)&req_lib_ckpt_sectionwrite;
+ iov[0].iov_len = sizeof (struct req_lib_ckpt_sectionwrite);
+ iov_idx = 1;
if (ioVector[i].sectionId.idLen) {
iov[iov_idx].iov_base = ioVector[i].sectionId.id;
@@ -1552,21 +1546,12 @@
iov[iov_idx].iov_len = ioVector[i].dataSize;
iov_idx++;
- error = saSendMsgRetry (ckptCheckpointInstance->response_fd,
+ error = openais_msg_send_reply_receive (
+ ckptCheckpointInstance->ipc_ctx,
iov,
- iov_idx);
- if (error != SA_AIS_OK) {
- goto error_exit;
- }
-
- /*
- * Receive response
- */
- error = saRecvRetry (ckptCheckpointInstance->response_fd, \
&res_lib_ckpt_sectionwrite, + iov_idx,
+ &res_lib_ckpt_sectionwrite,
sizeof (struct res_lib_ckpt_sectionwrite));
- if (error != SA_AIS_OK) {
- goto error_exit;
- }
if (res_lib_ckpt_sectionwrite.header.error == SA_AIS_ERR_TRY_AGAIN) {
error = SA_AIS_ERR_TRY_AGAIN;
@@ -1600,6 +1585,8 @@
SaSizeT dataSize)
{
SaAisErrorT error;
+ struct iovec iov[3];
+ int iov_idx;
struct ckptCheckpointInstance *ckptCheckpointInstance;
struct req_lib_ckpt_sectionoverwrite req_lib_ckpt_sectionoverwrite;
struct res_lib_ckpt_sectionoverwrite res_lib_ckpt_sectionoverwrite;
@@ -1631,31 +1618,31 @@
req_lib_ckpt_sectionoverwrite.ckpt_id =
ckptCheckpointInstance->checkpointId;
- pthread_mutex_lock (&ckptCheckpointInstance->response_mutex);
-
- error = saSendRetry (ckptCheckpointInstance->response_fd, \
&req_lib_ckpt_sectionoverwrite,
- sizeof (struct req_lib_ckpt_sectionoverwrite));
- if (error != SA_AIS_OK) {
- goto error_exit;
- }
-
+ /*
+ * Build request IO Vector
+ */
+ iov[0].iov_base = &req_lib_ckpt_sectionoverwrite;
+ iov[0].iov_len = sizeof (struct req_lib_ckpt_sectionoverwrite);
+ iov_idx = 1;
if (sectionId->idLen) {
- error = saSendRetry (ckptCheckpointInstance->response_fd, sectionId->id,
- sectionId->idLen);
- if (error != SA_AIS_OK) {
- goto error_exit;
- }
+ iov[iov_idx].iov_base = sectionId->id;
+ iov[iov_idx].iov_len = sectionId->idLen;
+ iov_idx += 1;
}
- error = saSendRetry (ckptCheckpointInstance->response_fd, dataBuffer, dataSize);
- if (error != SA_AIS_OK) {
- goto error_exit;
+ iov[iov_idx].iov_base = (void *)dataBuffer;
+ iov[iov_idx].iov_len = dataSize;
+ if (dataSize) {
+ iov_idx += 1;
}
- error = saRecvRetry (ckptCheckpointInstance->response_fd,
+ pthread_mutex_lock (&ckptCheckpointInstance->response_mutex);
+
+ error = openais_msg_send_reply_receive (ckptCheckpointInstance->ipc_ctx,
+ iov,
+ iov_idx,
&res_lib_ckpt_sectionoverwrite,
sizeof (struct res_lib_ckpt_sectionoverwrite));
-
-error_exit:
+
pthread_mutex_unlock (&ckptCheckpointInstance->response_mutex);
saHandleInstancePut (&checkpointHandleDatabase, checkpointHandle);
@@ -1673,10 +1660,13 @@
SaAisErrorT error = SA_AIS_OK;
struct ckptCheckpointInstance *ckptCheckpointInstance;
struct req_lib_ckpt_sectionread req_lib_ckpt_sectionread;
- struct res_lib_ckpt_sectionread res_lib_ckpt_sectionread;
- int dataLength;
+ struct res_lib_ckpt_sectionread *res_lib_ckpt_sectionread = NULL;
+ char *source_char;
+ unsigned int copy_bytes;
int i;
struct iovec iov[3];
+ int source_length;
+ void *return_address;
if (ioVector == NULL) {
return (SA_AIS_ERR_INVALID_PARAM);
@@ -1697,7 +1687,8 @@
pthread_mutex_lock (&ckptCheckpointInstance->response_mutex);
for (i = 0; i < numberOfElements; i++) {
- req_lib_ckpt_sectionread.header.size = sizeof (struct req_lib_ckpt_sectionread) +
+ req_lib_ckpt_sectionread.header.size =
+ sizeof (struct req_lib_ckpt_sectionread) +
ioVector[i].sectionId.idLen;
req_lib_ckpt_sectionread.id_len = ioVector[i].sectionId.idLen;
@@ -1714,48 +1705,49 @@
iov[1].iov_base = ioVector[i].sectionId.id;
iov[1].iov_len = ioVector[i].sectionId.idLen;
- error = saSendMsgRetry (ckptCheckpointInstance->response_fd,
+ openais_msg_send_reply_receive_in_buf (
+ ckptCheckpointInstance->ipc_ctx,
iov,
- 2);
+ 2,
+ &return_address);
+ res_lib_ckpt_sectionread = return_address;
- /*
- * Receive response header
- */
- error = saRecvRetry (ckptCheckpointInstance->response_fd, \
&res_lib_ckpt_sectionread,
- sizeof (struct res_lib_ckpt_sectionread));
- if (error != SA_AIS_OK) {
- goto error_exit;
- }
-
- dataLength = res_lib_ckpt_sectionread.header.size - sizeof (struct \
res_lib_ckpt_sectionread); + source_char = ((char *)(res_lib_ckpt_sectionread)) +
+ sizeof (struct res_lib_ckpt_sectionread);
+ source_length = res_lib_ckpt_sectionread->header.size -
+ sizeof (struct res_lib_ckpt_sectionread);
+
/*
* Receive checkpoint section data
*/
if (ioVector[i].dataBuffer == 0) {
ioVector[i].dataBuffer =
- malloc (dataLength);
+ malloc (source_length);
if (ioVector[i].dataBuffer == NULL) {
error = SA_AIS_ERR_NO_MEMORY;
goto error_exit;
}
+ ioVector[i].dataSize = source_length;
}
- if (dataLength > 0) {
- error = saRecvRetry (ckptCheckpointInstance->response_fd, ioVector[i].dataBuffer,
- dataLength);
- if (error != SA_AIS_OK) {
- goto error_exit;
+ copy_bytes = source_length;
+ if (source_length > 0) {
+ if (copy_bytes > ioVector[i].dataSize) {
+ copy_bytes = ioVector[i].dataSize;
}
+ memcpy (((char *)ioVector[i].dataBuffer) +
+ ioVector[i].dataOffset,
+ source_char, copy_bytes);
}
- if (res_lib_ckpt_sectionread.header.error != SA_AIS_OK) {
+ if (res_lib_ckpt_sectionread->header.error != SA_AIS_OK) {
goto error_exit;
}
/*
* Report back bytes of data read
*/
- ioVector[i].readSize = res_lib_ckpt_sectionread.data_read;
+ ioVector[i].readSize = copy_bytes;
}
error_exit:
@@ -1766,7 +1758,11 @@
if (error != SA_AIS_OK && erroneousVectorIndex) {
*erroneousVectorIndex = i;
}
- return (error == SA_AIS_OK ? res_lib_ckpt_sectionread.header.error : error);
+ if (res_lib_ckpt_sectionread) {
+ return (error == SA_AIS_OK ? res_lib_ckpt_sectionread->header.error : error);
+ } else {
+ return (error);
+ }
}
SaAisErrorT
@@ -1775,6 +1771,7 @@
SaTimeT timeout)
{
SaAisErrorT error;
+ struct iovec iov;
struct ckptCheckpointInstance *ckptCheckpointInstance;
struct req_lib_ckpt_checkpointsynchronize req_lib_ckpt_checkpointsynchronize;
struct res_lib_ckpt_checkpointsynchronize res_lib_ckpt_checkpointsynchronize;
@@ -1801,11 +1798,15 @@
req_lib_ckpt_checkpointsynchronize.ckpt_id =
ckptCheckpointInstance->checkpointId;
+ iov.iov_base = &req_lib_ckpt_checkpointsynchronize;
+ iov.iov_len = sizeof (struct req_lib_ckpt_checkpointsynchronize);
+
pthread_mutex_lock (&ckptCheckpointInstance->response_mutex);
- error = saSendReceiveReply (ckptCheckpointInstance->response_fd,
- &req_lib_ckpt_checkpointsynchronize,
- sizeof (struct req_lib_ckpt_checkpointsynchronize),
+ error = openais_msg_send_reply_receive (
+ ckptCheckpointInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_ckpt_checkpointsynchronize,
sizeof (struct res_lib_ckpt_checkpointsynchronize));
@@ -1823,6 +1824,7 @@
SaInvocationT invocation)
{
SaAisErrorT error;
+ struct iovec iov;
struct ckptInstance *ckptInstance;
struct ckptCheckpointInstance *ckptCheckpointInstance;
struct req_lib_ckpt_checkpointsynchronizeasync \
req_lib_ckpt_checkpointsynchronizeasync; @@ -1862,11 +1864,15 @@
ckptCheckpointInstance->checkpointId;
req_lib_ckpt_checkpointsynchronizeasync.invocation = invocation;
+ iov.iov_base = &req_lib_ckpt_checkpointsynchronizeasync;
+ iov.iov_len = sizeof (struct req_lib_ckpt_checkpointsynchronizeasync);
+
pthread_mutex_lock (&ckptCheckpointInstance->response_mutex);
- error = saSendReceiveReply (ckptCheckpointInstance->response_fd,
- &req_lib_ckpt_checkpointsynchronizeasync,
- sizeof (struct req_lib_ckpt_checkpointsynchronizeasync),
+ error = openais_msg_send_reply_receive (
+ ckptCheckpointInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_ckpt_checkpointsynchronizeasync,
sizeof (struct res_lib_ckpt_checkpointsynchronizeasync));
Index: lib/amf.c
===================================================================
--- lib/amf.c (revision 1687)
+++ lib/amf.c (working copy)
@@ -61,8 +61,7 @@
* Data structure for instance data
*/
struct amfInstance {
- int response_fd;
- int dispatch_fd;
+ void *ipc_ctx;
SaAmfCallbacksT callbacks;
SaNameT compName;
int compRegistered;
@@ -131,12 +130,7 @@
goto error_destroy;
}
- amfInstance->response_fd = -1;
-
- amfInstance->dispatch_fd = -1;
-
- error = saServiceConnect (&amfInstance->response_fd,
- &amfInstance->dispatch_fd, AMF_SERVICE);
+ error = openais_service_connect (CPG_SERVICE, &amfInstance->ipc_ctx);
if (error != SA_AIS_OK) {
goto error_put_destroy;
}
@@ -172,7 +166,7 @@
return (error);
}
- *selectionObject = amfInstance->dispatch_fd;
+ *selectionObject = openais_fd_get (amfInstance->ipc_ctx);
saHandleInstancePut (&amfHandleDatabase, amfHandle);
return (SA_AIS_OK);
@@ -184,7 +178,6 @@
SaAmfHandleT amfHandle,
SaDispatchFlagsT dispatchFlags)
{
- struct pollfd ufds;
int timeout = -1;
SaAisErrorT error;
int cont = 1; /* always continue do loop except when set to 0 */
@@ -216,28 +209,9 @@
}
do {
- /*
- * Read data directly from socket
- */
- ufds.fd = amfInstance->dispatch_fd;
- ufds.events = POLLIN;
- ufds.revents = 0;
+ dispatch_avail = openais_dispatch_recv (amfInstance->ipc_ctx,
+ (void *)&dispatch_data, timeout);
- error = saPollRetry (&ufds, 1, timeout);
- if (error != SA_AIS_OK) {
- goto error_nounlock;
- }
-
- pthread_mutex_lock (&amfInstance->dispatch_mutex);
-
- error = saPollRetry (&ufds, 1, 0);
- if (error != SA_AIS_OK) {
- goto error_nounlock;
- }
-
- /*
- * Handle has been finalized in another thread
- */
if (amfInstance->finalize == 1) {
error = SA_AIS_OK;
pthread_mutex_unlock (&amfInstance->dispatch_mutex);
@@ -245,7 +219,6 @@
goto error_unlock;
}
- dispatch_avail = ufds.revents & POLLIN;
if (dispatch_avail == 0 && dispatchFlags == SA_DISPATCH_ALL) {
pthread_mutex_unlock (&amfInstance->dispatch_mutex);
break; /* exit do while cont is 1 loop */
@@ -255,33 +228,6 @@
continue; /* next poll */
}
- if (ufds.revents & POLLIN) {
- /*
- * Queue empty, read response from socket
- */
- error = saRecvRetry (amfInstance->dispatch_fd, &dispatch_data.header,
- sizeof (mar_res_header_t));
-
- if (error != SA_AIS_OK) {
-
- goto error_unlock;
- }
- if (dispatch_data.header.size > sizeof (mar_res_header_t)) {
-
- error = saRecvRetry (amfInstance->dispatch_fd, &dispatch_data.data,
- dispatch_data.header.size - sizeof (mar_res_header_t));
-
- if (error != SA_AIS_OK) {
-
- goto error_unlock;
- }
- }
- } else {
- pthread_mutex_unlock (&amfInstance->dispatch_mutex);
-
- continue;
- }
-
/*
* Make copy of callbacks, message data, unlock instance, and call callback
* A risk of this dispatch method is that the callback routines may
@@ -442,15 +388,6 @@
saHandleDestroy (&amfHandleDatabase, amfHandle);
- if (amfInstance->response_fd != -1) {
- shutdown (amfInstance->response_fd, 0);
- close (amfInstance->response_fd);
- }
- if (amfInstance->dispatch_fd != -1) {
- shutdown (amfInstance->dispatch_fd, 0);
- close (amfInstance->dispatch_fd);
- }
-
saHandleInstancePut (&amfHandleDatabase, amfHandle);
return (error);
@@ -466,6 +403,7 @@
SaAisErrorT error;
struct req_lib_amf_componentregister req_lib_amf_componentregister;
struct res_lib_amf_componentregister res_lib_amf_componentregister;
+ struct iovec iov;
error = saHandleInstanceGet (&amfHandleDatabase, amfHandle,
(void *)&amfInstance);
@@ -485,11 +423,13 @@
sizeof (SaNameT));
}
+ iov.iov_base = &req_lib_amf_componentregister;
+ iov.iov_len = sizeof (struct req_lib_amf_componentregister);
pthread_mutex_lock (&amfInstance->response_mutex);
- error = saSendReceiveReply (amfInstance->response_fd,
- &req_lib_amf_componentregister,
- sizeof (struct req_lib_amf_componentregister),
+ error = openais_msg_send_reply_receive (amfInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_amf_componentregister,
sizeof (struct res_lib_amf_componentregister));
@@ -514,6 +454,7 @@
struct res_lib_amf_componentunregister res_lib_amf_componentunregister;
struct amfInstance *amfInstance;
SaAisErrorT error;
+ struct iovec iov;
error = saHandleInstanceGet (&amfHandleDatabase, amfHandle,
(void *)&amfInstance);
@@ -533,11 +474,14 @@
sizeof (SaNameT));
}
+ iov.iov_base = &req_lib_amf_componentunregister;
+ iov.iov_len = sizeof (struct req_lib_amf_componentunregister);
+
pthread_mutex_lock (&amfInstance->response_mutex);
- error = saSendReceiveReply (amfInstance->response_fd,
- &req_lib_amf_componentunregister,
- sizeof (struct req_lib_amf_componentunregister),
+ error = openais_msg_send_reply_receive (amfInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_amf_componentunregister,
sizeof (struct res_lib_amf_componentunregister));
@@ -598,6 +542,7 @@
struct res_lib_amf_pmstart res_lib_amf_pmstart;
struct amfInstance *amfInstance;
SaAisErrorT error;
+ struct iovec iov;
error = saHandleInstanceGet (&amfHandleDatabase, amfHandle,
(void *)&amfInstance);
@@ -613,11 +558,14 @@
req_lib_amf_pmstart.descendentsTreeDepth = descendentsTreeDepth;
req_lib_amf_pmstart.pmErrors = pmErrors;
+ iov.iov_base = &req_lib_amf_pmstart;
+ iov.iov_len = sizeof (struct req_lib_amf_pmstart);
+
pthread_mutex_lock (&amfInstance->response_mutex);
- error = saSendReceiveReply (amfInstance->response_fd,
- &req_lib_amf_pmstart,
- sizeof (struct req_lib_amf_pmstart),
+ error = openais_msg_send_reply_receive (amfInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_amf_pmstart,
sizeof (struct res_lib_amf_pmstart));
@@ -640,6 +588,7 @@
struct res_lib_amf_pmstop res_lib_amf_pmstop;
struct amfInstance *amfInstance;
SaAisErrorT error;
+ struct iovec iov;
error = saHandleInstanceGet (&amfHandleDatabase, amfHandle,
(void *)&amfInstance);
@@ -654,11 +603,14 @@
req_lib_amf_pmstop.processId = processId;
req_lib_amf_pmstop.pmErrors = pmErrors;
+ iov.iov_base = &req_lib_amf_pmstop;
+ iov.iov_len = sizeof (struct req_lib_amf_pmstop);
+
pthread_mutex_lock (&amfInstance->response_mutex);
- error = saSendReceiveReply (amfInstance->response_fd,
- &req_lib_amf_pmstop,
- sizeof (struct req_lib_amf_pmstop),
+ error = openais_msg_send_reply_receive (amfInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_amf_pmstop,
sizeof (struct res_lib_amf_pmstop));
@@ -682,6 +634,7 @@
struct res_lib_amf_healthcheckstart res_lib_amf_healthcheckstart;
struct amfInstance *amfInstance;
SaAisErrorT error;
+ struct iovec iov;
error = saHandleInstanceGet (&amfHandleDatabase, amfHandle,
(void *)&amfInstance);
@@ -698,11 +651,14 @@
req_lib_amf_healthcheckstart.invocationType = invocationType;
req_lib_amf_healthcheckstart.recommendedRecovery = recommendedRecovery;
+ iov.iov_base = &req_lib_amf_healthcheckstart;
+ iov.iov_len = sizeof (struct req_lib_amf_healthcheckstart);
+
pthread_mutex_lock (&amfInstance->response_mutex);
- error = saSendReceiveReply (amfInstance->response_fd,
- &req_lib_amf_healthcheckstart,
- sizeof (struct req_lib_amf_healthcheckstart),
+ error = openais_msg_send_reply_receive (amfInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_amf_healthcheckstart,
sizeof (struct res_lib_amf_healthcheckstart));
@@ -724,6 +680,7 @@
struct res_lib_amf_healthcheckconfirm res_lib_amf_healthcheckconfirm;
struct amfInstance *amfInstance;
SaAisErrorT error;
+ struct iovec iov;
error = saHandleInstanceGet (&amfHandleDatabase, amfHandle,
(void *)&amfInstance);
@@ -739,11 +696,14 @@
healthcheckKey, sizeof (SaAmfHealthcheckKeyT));
req_lib_amf_healthcheckconfirm.healthcheckResult = healthcheckResult;
+ iov.iov_base = &req_lib_amf_healthcheckconfirm;
+ iov.iov_len = sizeof (struct req_lib_amf_healthcheckconfirm);
+
pthread_mutex_lock (&amfInstance->response_mutex);
- error = saSendReceiveReply (amfInstance->response_fd,
- &req_lib_amf_healthcheckconfirm,
- sizeof (struct req_lib_amf_healthcheckconfirm),
+ error = openais_msg_send_reply_receive (amfInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_amf_healthcheckconfirm,
sizeof (struct res_lib_amf_healthcheckconfirm));
@@ -763,6 +723,7 @@
struct req_lib_amf_healthcheckstop req_lib_amf_healthcheckstop;
struct res_lib_amf_healthcheckstop res_lib_amf_healthcheckstop;
struct amfInstance *amfInstance;
+ struct iovec iov;
SaAisErrorT error;
error = saHandleInstanceGet (&amfHandleDatabase, amfHandle,
@@ -778,11 +739,14 @@
memcpy (&req_lib_amf_healthcheckstop.healthcheckKey,
healthcheckKey, sizeof (SaAmfHealthcheckKeyT));
+ iov.iov_base = &req_lib_amf_healthcheckstop;
+ iov.iov_len = sizeof (struct req_lib_amf_healthcheckstop);
+
pthread_mutex_lock (&amfInstance->response_mutex);
- error = saSendReceiveReply (amfInstance->response_fd,
- &req_lib_amf_healthcheckstop,
- sizeof (struct req_lib_amf_healthcheckstop),
+ error = openais_msg_send_reply_receive (amfInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_amf_healthcheckstop,
sizeof (struct res_lib_amf_healthcheckstop));
@@ -804,6 +768,7 @@
struct amfInstance *amfInstance;
struct req_lib_amf_hastateget req_lib_amf_hastateget;
struct res_lib_amf_hastateget res_lib_amf_hastateget;
+ struct iovec iov;
SaAisErrorT error;
error = saHandleInstanceGet (&amfHandleDatabase, amfHandle,
@@ -812,6 +777,9 @@
return (error);
}
+ iov.iov_base = &req_lib_amf_hastateget,
+ iov.iov_len = sizeof (struct req_lib_amf_hastateget),
+
pthread_mutex_lock (&amfInstance->response_mutex);
req_lib_amf_hastateget.header.id = MESSAGE_REQ_AMF_HASTATEGET;
@@ -819,9 +787,11 @@
memcpy (&req_lib_amf_hastateget.compName, compName, sizeof (SaNameT));
memcpy (&req_lib_amf_hastateget.csiName, csiName, sizeof (SaNameT));
- error = saSendReceiveReply (amfInstance->response_fd,
- &req_lib_amf_hastateget, sizeof (struct req_lib_amf_hastateget),
- &res_lib_amf_hastateget, sizeof (struct res_lib_amf_hastateget));
+ error = openais_msg_send_reply_receive (amfInstance->ipc_ctx,
+ &iov,
+ 1,
+ &res_lib_amf_hastateget,
+ sizeof (struct res_lib_amf_hastateget));
pthread_mutex_unlock (&amfInstance->response_mutex);
@@ -843,6 +813,7 @@
struct req_lib_amf_csiquiescingcomplete req_lib_amf_csiquiescingcomplete;
struct res_lib_amf_csiquiescingcomplete res_lib_amf_csiquiescingcomplete;
struct amfInstance *amfInstance;
+ struct iovec iov;
SaAisErrorT errorResult;
error = saHandleInstanceGet (&amfHandleDatabase, amfHandle,
@@ -856,11 +827,14 @@
req_lib_amf_csiquiescingcomplete.invocation = invocation;
req_lib_amf_csiquiescingcomplete.error = error;
+ iov.iov_base = &req_lib_amf_csiquiescingcomplete;
+ iov.iov_len = sizeof (struct req_lib_amf_csiquiescingcomplete);
+
pthread_mutex_lock (&amfInstance->response_mutex);
- errorResult = saSendReceiveReply (amfInstance->response_fd,
- &req_lib_amf_csiquiescingcomplete,
- sizeof (struct req_lib_amf_csiquiescingcomplete),
+ errorResult = openais_msg_send_reply_receive (amfInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_amf_csiquiescingcomplete,
sizeof (struct res_lib_amf_csiquiescingcomplete));
@@ -881,6 +855,7 @@
struct amfInstance *amfInstance;
struct req_lib_amf_protectiongrouptrack req_lib_amf_protectiongrouptrack;
struct res_lib_amf_protectiongrouptrack res_lib_amf_protectiongrouptrack;
+ struct iovec iov;
SaAisErrorT error;
req_lib_amf_protectiongrouptrack.header.size = sizeof (struct \
req_lib_amf_protectiongrouptrack); @@ -896,11 +871,14 @@
return (error);
}
+ iov.iov_base = &req_lib_amf_protectiongrouptrack;
+ iov.iov_len = sizeof (struct req_lib_amf_protectiongrouptrack);
+
pthread_mutex_lock (&amfInstance->response_mutex);
- error = saSendReceiveReply (amfInstance->response_fd,
- &req_lib_amf_protectiongrouptrack,
- sizeof (struct req_lib_amf_protectiongrouptrack),
+ error = openais_msg_send_reply_receive (amfInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_amf_protectiongrouptrack,
sizeof (struct res_lib_amf_protectiongrouptrack));
@@ -919,6 +897,7 @@
struct amfInstance *amfInstance;
struct req_lib_amf_protectiongrouptrackstop req_lib_amf_protectiongrouptrackstop;
struct res_lib_amf_protectiongrouptrackstop res_lib_amf_protectiongrouptrackstop;
+ struct iovec iov;
SaAisErrorT error;
error = saHandleInstanceGet (&amfHandleDatabase, amfHandle,
@@ -931,11 +910,13 @@
req_lib_amf_protectiongrouptrackstop.header.id = \
MESSAGE_REQ_AMF_PROTECTIONGROUPTRACKSTOP; memcpy \
(&req_lib_amf_protectiongrouptrackstop.csiName, csiName, sizeof (SaNameT));
+ iov.iov_base = &req_lib_amf_protectiongrouptrackstop,
+ iov.iov_len = sizeof (struct req_lib_amf_protectiongrouptrackstop),
pthread_mutex_lock (&amfInstance->response_mutex);
- error = saSendReceiveReply (amfInstance->response_fd,
- &req_lib_amf_protectiongrouptrackstop,
- sizeof (struct req_lib_amf_protectiongrouptrackstop),
+ error = openais_msg_send_reply_receive (amfInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_amf_protectiongrouptrackstop,
sizeof (struct res_lib_amf_protectiongrouptrackstop));
@@ -957,6 +938,7 @@
struct amfInstance *amfInstance;
struct req_lib_amf_componenterrorreport req_lib_amf_componenterrorreport;
struct res_lib_amf_componenterrorreport res_lib_amf_componenterrorreport;
+ struct iovec iov;
SaAisErrorT error;
error = saHandleInstanceGet (&amfHandleDatabase, amfHandle,
@@ -971,13 +953,14 @@
sizeof (SaNameT));
req_lib_amf_componenterrorreport.errorDetectionTime = errorDetectionTime;
- DPRINT (("start error report\n"));
- error = saSendReceiveReply (amfInstance->response_fd,
- &req_lib_amf_componenterrorreport,
- sizeof (struct req_lib_amf_componenterrorreport),
+ iov.iov_base = &req_lib_amf_componenterrorreport;
+ iov.iov_len = sizeof (struct req_lib_amf_componenterrorreport);
+
+ error = openais_msg_send_reply_receive (amfInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_amf_componenterrorreport,
sizeof (struct res_lib_amf_componenterrorreport));
- DPRINT (("end error report\n"));
error = res_lib_amf_componenterrorreport.header.error;
@@ -997,6 +980,7 @@
struct amfInstance *amfInstance;
struct req_lib_amf_componenterrorclear req_lib_amf_componenterrorclear;
struct res_lib_amf_componenterrorclear res_lib_amf_componenterrorclear;
+ struct iovec iov;
SaAisErrorT error;
error = saHandleInstanceGet (&amfHandleDatabase, amfHandle,
@@ -1009,9 +993,12 @@
req_lib_amf_componenterrorclear.header.size = sizeof (struct \
req_lib_amf_componenterrorclear); memcpy (&req_lib_amf_componenterrorclear.compName, \
compName, sizeof (SaNameT));
- error = saSendReceiveReply (amfInstance->response_fd,
- &req_lib_amf_componenterrorclear,
- sizeof (struct req_lib_amf_componenterrorclear),
+ iov.iov_base = &req_lib_amf_componenterrorclear;
+ iov.iov_len = sizeof (struct req_lib_amf_componenterrorclear);
+
+ error = openais_msg_send_reply_receive (amfInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_amf_componenterrorclear,
sizeof (struct res_lib_amf_componenterrorclear));
@@ -1031,6 +1018,7 @@
struct amfInstance *amfInstance;
struct req_lib_amf_response req_lib_amf_response;
struct res_lib_amf_response res_lib_amf_response;
+ struct iovec iov;
SaAisErrorT errorResult;
errorResult = saHandleInstanceGet (&amfHandleDatabase, amfHandle,
@@ -1044,11 +1032,16 @@
req_lib_amf_response.invocation = invocation;
req_lib_amf_response.error = error;
+ iov.iov_base = &req_lib_amf_response;
+ iov.iov_len = sizeof (struct req_lib_amf_response);
+
pthread_mutex_lock (&amfInstance->response_mutex);
- errorResult = saSendReceiveReply (amfInstance->response_fd,
- &req_lib_amf_response, sizeof (struct req_lib_amf_response),
- &res_lib_amf_response, sizeof (struct res_lib_amf_response));
+ errorResult = openais_msg_send_reply_receive (amfInstance->ipc_ctx,
+ &iov,
+ 1,
+ &res_lib_amf_response,
+ sizeof (struct res_lib_amf_response));
pthread_mutex_unlock (&amfInstance->response_mutex);
Index: lib/lck.c
===================================================================
--- lib/lck.c (revision 1687)
+++ lib/lck.c (working copy)
@@ -63,8 +63,7 @@
* Data structure for instance data
*/
struct lckInstance {
- int response_fd;
- int dispatch_fd;
+ void *ipc_ctx;
SaLckCallbacksT callbacks;
int finalize;
SaLckHandleT lckHandle;
@@ -74,7 +73,7 @@
};
struct lckResourceInstance {
- int response_fd;
+ void *ipc_ctx;
SaLckHandleT lckHandle;
SaLckResourceHandleT lckResourceHandle;
SaLckResourceOpenFlagsT resourceOpenFlags;
@@ -85,7 +84,7 @@
};
struct lckLockIdInstance {
- int response_fd;
+ void *ipc_ctx;
SaLckResourceHandleT lckResourceHandle;
struct list_head list;
void *resource_lock;
@@ -262,10 +261,7 @@
goto error_destroy;
}
- lckInstance->response_fd = -1;
-
- error = saServiceConnect (&lckInstance->response_fd,
- &lckInstance->dispatch_fd, LCK_SERVICE);
+ error = openais_service_connect (LCK_SERVICE, &lckInstance->ipc_ctx);
if (error != SA_AIS_OK) {
goto error_put_destroy;
}
@@ -311,7 +307,7 @@
return (error);
}
- *selectionObject = lckInstance->dispatch_fd;
+ *selectionObject = openais_fd_get (lckInstance->ipc_ctx);
saHandleInstancePut (&lckHandleDatabase, lckHandle);
@@ -331,8 +327,6 @@
const SaLckHandleT lckHandle,
SaDispatchFlagsT dispatchFlags)
{
- struct pollfd ufds;
- int poll_fd;
int timeout = 1;
SaLckCallbacksT callbacks;
SaAisErrorT error;
@@ -369,18 +363,9 @@
}
do {
- /*
- * Read data directly from socket
- */
- poll_fd = lckInstance->dispatch_fd;
- ufds.fd = poll_fd;
- ufds.events = POLLIN;
- ufds.revents = 0;
+ dispatch_avail = openais_dispatch_recv (lckInstance->ipc_ctx,
+ (void *)&dispatch_data, timeout);
- error = saPollRetry(&ufds, 1, timeout);
- if (error != SA_AIS_OK) {
- goto error_put;
- }
pthread_mutex_lock(&lckInstance->dispatch_mutex);
if (lckInstance->finalize == 1) {
@@ -388,13 +373,6 @@
goto error_unlock;
}
- if ((ufds.revents & (POLLERR|POLLHUP|POLLNVAL)) != 0) {
- error = SA_AIS_ERR_BAD_HANDLE;
- goto error_unlock;
- }
-
- dispatch_avail = (ufds.revents & POLLIN);
-
if (dispatch_avail == 0 && dispatchFlags == SA_DISPATCH_ALL) {
pthread_mutex_unlock(&lckInstance->dispatch_mutex);
break; /* exit do while cont is 1 loop */
@@ -404,19 +382,6 @@
continue;
}
- memset(&dispatch_data,0, sizeof(struct message_overlay));
- error = saRecvRetry (lckInstance->dispatch_fd, &dispatch_data.header, sizeof \
(mar_res_header_t));
- if (error != SA_AIS_OK) {
- goto error_unlock;
- }
- if (dispatch_data.header.size > sizeof (mar_res_header_t)) {
- error = saRecvRetry (lckInstance->dispatch_fd, &dispatch_data.data,
- dispatch_data.header.size - sizeof (mar_res_header_t));
- if (error != SA_AIS_OK) {
- goto error_unlock;
- }
- }
-
/*
* Make copy of callbacks, message data, unlock instance,
* and call callback. A risk of this dispatch method is that
@@ -558,7 +523,6 @@
} while (cont);
error_unlock:
pthread_mutex_unlock(&lckInstance->dispatch_mutex);
-error_put:
saHandleInstancePut(&lckHandleDatabase, lckHandle);
error_exit:
return (error);
@@ -595,16 +559,6 @@
// TODO lckInstanceFinalize (lckInstance);
- if (lckInstance->response_fd != -1) {
- shutdown (lckInstance->response_fd, 0);
- close (lckInstance->response_fd);
- }
-
- if (lckInstance->dispatch_fd != -1) {
- shutdown (lckInstance->dispatch_fd, 0);
- close (lckInstance->dispatch_fd);
- }
-
saHandleInstancePut (&lckHandleDatabase, lckHandle);
return (SA_AIS_OK);
@@ -621,6 +575,7 @@
SaAisErrorT error;
struct lckResourceInstance *lckResourceInstance;
struct lckInstance *lckInstance;
+ struct iovec iov;
struct req_lib_lck_resourceopen req_lib_lck_resourceopen;
struct res_lib_lck_resourceopen res_lib_lck_resourceopen;
@@ -650,7 +605,7 @@
goto error_destroy;
}
- lckResourceInstance->response_fd = lckInstance->response_fd;
+ lckResourceInstance->ipc_ctx = lckInstance->ipc_ctx;
lckResourceInstance->lckHandle = lckHandle;
lckResourceInstance->lckResourceHandle = *lckResourceHandle;
@@ -666,11 +621,15 @@
req_lib_lck_resourceopen.resourceHandle = *lckResourceHandle;
req_lib_lck_resourceopen.async_call = 0;
+ iov.iov_base = &req_lib_lck_resourceopen;
+ iov.iov_len = sizeof (struct req_lib_lck_resourceopen);
+
pthread_mutex_lock (&lckInstance->response_mutex);
- error = saSendReceiveReply (lckResourceInstance->response_fd,
- &req_lib_lck_resourceopen,
- sizeof (struct req_lib_lck_resourceopen),
+ error = openais_msg_send_reply_receive (
+ lckResourceInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_lck_resourceopen,
sizeof (struct res_lib_lck_resourceopen));
@@ -714,6 +673,7 @@
struct lckResourceInstance *lckResourceInstance;
struct lckInstance *lckInstance;
SaLckResourceHandleT lckResourceHandle;
+ struct iovec iov;
SaAisErrorT error;
struct req_lib_lck_resourceopen req_lib_lck_resourceopen;
struct res_lib_lck_resourceopenasync res_lib_lck_resourceopenasync;
@@ -741,7 +701,7 @@
goto error_destroy;
}
- lckResourceInstance->response_fd = lckInstance->response_fd;
+ lckResourceInstance->ipc_ctx = lckInstance->ipc_ctx;
lckResourceInstance->response_mutex = &lckInstance->response_mutex;
lckResourceInstance->lckHandle = lckHandle;
lckResourceInstance->lckResourceHandle = lckResourceHandle;
@@ -755,11 +715,15 @@
req_lib_lck_resourceopen.resourceHandle = lckResourceHandle;
req_lib_lck_resourceopen.async_call = 1;
+ iov.iov_base = &req_lib_lck_resourceopen;
+ iov.iov_len = sizeof (struct req_lib_lck_resourceopen);
+
pthread_mutex_lock (&lckInstance->response_mutex);
- error = saSendReceiveReply (lckResourceInstance->response_fd,
- &req_lib_lck_resourceopen,
- sizeof (struct req_lib_lck_resourceopen),
+ error = openais_msg_send_reply_receive (
+ lckResourceInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_lck_resourceopenasync,
sizeof (struct res_lib_lck_resourceopenasync));
@@ -787,6 +751,7 @@
{
struct req_lib_lck_resourceclose req_lib_lck_resourceclose;
struct res_lib_lck_resourceclose res_lib_lck_resourceclose;
+ struct iovec iov;
SaAisErrorT error;
struct lckResourceInstance *lckResourceInstance;
@@ -802,11 +767,15 @@
&lckResourceInstance->lockResourceName, sizeof (SaNameT));
req_lib_lck_resourceclose.resourceHandle = lckResourceHandle;
+ iov.iov_base = &req_lib_lck_resourceclose;
+ iov.iov_len = sizeof (struct req_lib_lck_resourceclose);
+
pthread_mutex_lock (lckResourceInstance->response_mutex);
- error = saSendReceiveReply (lckResourceInstance->response_fd,
- &req_lib_lck_resourceclose,
- sizeof (struct req_lib_lck_resourceclose),
+ error = openais_msg_send_reply_receive (
+ lckResourceInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_lck_resourceclose,
sizeof (struct res_lib_lck_resourceclose));
@@ -831,11 +800,11 @@
{
struct req_lib_lck_resourcelock req_lib_lck_resourcelock;
struct res_lib_lck_resourcelock res_lib_lck_resourcelock;
+ struct iovec iov;
SaAisErrorT error;
struct lckResourceInstance *lckResourceInstance;
struct lckLockIdInstance *lckLockIdInstance;
- int lock_fd;
- int dummy_fd;
+ void *lock_ctx;
error = saHandleInstanceGet (&lckResourceHandleDatabase, lckResourceHandle,
(void *)&lckResourceInstance);
@@ -855,13 +824,13 @@
goto error_destroy;
}
- error = saServiceConnect (&lock_fd, &dummy_fd, LCK_SERVICE);
+ error = openais_service_connect (LCK_SERVICE, &lock_ctx);
if (error != SA_AIS_OK) { // TODO error handling
goto error_destroy;
}
lckLockIdInstance->response_mutex = lckResourceInstance->response_mutex;
- lckLockIdInstance->response_fd = lckResourceInstance->response_fd;
+ lckLockIdInstance->ipc_ctx = lckResourceInstance->ipc_ctx;
lckLockIdInstance->lckResourceHandle = lckResourceHandle;
req_lib_lck_resourcelock.header.size = sizeof (struct req_lib_lck_resourcelock);
@@ -880,18 +849,18 @@
&lckResourceInstance->source,
sizeof (mar_message_source_t));
+ iov.iov_base = &req_lib_lck_resourcelock;
+ iov.iov_len = sizeof (struct req_lib_lck_resourcelock);
+
/*
* no mutex needed here since its a new connection
*/
- error = saSendReceiveReply (lock_fd,
- &req_lib_lck_resourcelock,
- sizeof (struct req_lib_lck_resourcelock),
+ error = openais_msg_send_reply_receive (lock_ctx,
+ &iov,
+ 1,
&res_lib_lck_resourcelock,
sizeof (struct res_lib_lck_resourcelock));
- close (lock_fd);
- close (dummy_fd);
-
if (error == SA_AIS_OK) {
lckLockIdInstance->resource_lock = res_lib_lck_resourcelock.resource_lock;
*lockStatus = res_lib_lck_resourcelock.lockStatus;
@@ -924,11 +893,11 @@
{
struct req_lib_lck_resourcelock req_lib_lck_resourcelock;
struct res_lib_lck_resourcelockasync res_lib_lck_resourcelockasync;
+ struct iovec iov;
SaAisErrorT error;
struct lckResourceInstance *lckResourceInstance;
struct lckLockIdInstance *lckLockIdInstance;
- int lock_fd;
- int dummy_fd;
+ void *lock_ctx;
error = saHandleInstanceGet (&lckResourceHandleDatabase, lckResourceHandle,
(void *)&lckResourceInstance);
@@ -948,13 +917,13 @@
goto error_destroy;
}
- error = saServiceConnect (&lock_fd, &dummy_fd, LCK_SERVICE);
+ error = openais_service_connect (LCK_SERVICE, &lock_ctx);
if (error != SA_AIS_OK) { // TODO error handling
goto error_destroy;
}
lckLockIdInstance->response_mutex = lckResourceInstance->response_mutex;
- lckLockIdInstance->response_fd = lckResourceInstance->response_fd;
+ lckLockIdInstance->ipc_ctx = lckResourceInstance->ipc_ctx;
lckLockIdInstance->lckResourceHandle = lckResourceHandle;
req_lib_lck_resourcelock.header.size = sizeof (struct req_lib_lck_resourcelock);
@@ -973,18 +942,19 @@
&lckResourceInstance->source,
sizeof (mar_message_source_t));
+ iov.iov_base = &req_lib_lck_resourcelock;
+ iov.iov_len = sizeof (struct req_lib_lck_resourcelock);
+
/*
* no mutex needed here since its a new connection
*/
- error = saSendReceiveReply (lock_fd,
- &req_lib_lck_resourcelock,
- sizeof (struct req_lib_lck_resourcelock),
+ error = openais_msg_send_reply_receive (
+ lock_ctx,
+ &iov,
+ 1,
&res_lib_lck_resourcelockasync,
sizeof (struct res_lib_lck_resourcelock));
- close (lock_fd);
- close (dummy_fd);
-
if (error == SA_AIS_OK) {
return (res_lib_lck_resourcelockasync.header.error);
}
@@ -1009,6 +979,7 @@
{
struct req_lib_lck_resourceunlock req_lib_lck_resourceunlock;
struct res_lib_lck_resourceunlock res_lib_lck_resourceunlock;
+ struct iovec iov;
SaAisErrorT error;
struct lckLockIdInstance *lckLockIdInstance;
struct lckResourceInstance *lckResourceInstance;
@@ -1043,11 +1014,15 @@
req_lib_lck_resourceunlock.async_call = 0;
req_lib_lck_resourceunlock.resource_lock = lckLockIdInstance->resource_lock;
+ iov.iov_base = &req_lib_lck_resourceunlock;
+ iov.iov_len = sizeof (struct req_lib_lck_resourceunlock);
+
pthread_mutex_lock (lckLockIdInstance->response_mutex);
- error = saSendReceiveReply (lckLockIdInstance->response_fd,
- &req_lib_lck_resourceunlock,
- sizeof (struct req_lib_lck_resourceunlock),
+ error = openais_msg_send_reply_receive (
+ lckLockIdInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_lck_resourceunlock,
sizeof (struct res_lib_lck_resourceunlock));
@@ -1067,6 +1042,7 @@
{
struct req_lib_lck_resourceunlock req_lib_lck_resourceunlock;
struct res_lib_lck_resourceunlockasync res_lib_lck_resourceunlockasync;
+ struct iovec iov;
SaAisErrorT error;
struct lckLockIdInstance *lckLockIdInstance;
struct lckResourceInstance *lckResourceInstance;
@@ -1103,11 +1079,14 @@
req_lib_lck_resourceunlock.lockId = lockId;
req_lib_lck_resourceunlock.async_call = 1;
+ iov.iov_base = &req_lib_lck_resourceunlock;
+ iov.iov_len = sizeof (struct req_lib_lck_resourceunlock);
+
pthread_mutex_lock (lckLockIdInstance->response_mutex);
- error = saSendReceiveReply (lckLockIdInstance->response_fd,
- &req_lib_lck_resourceunlock,
- sizeof (struct req_lib_lck_resourceunlock),
+ error = openais_msg_send_reply_receive (lckLockIdInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_lck_resourceunlockasync,
sizeof (struct res_lib_lck_resourceunlockasync));
@@ -1124,6 +1103,7 @@
{
struct req_lib_lck_lockpurge req_lib_lck_lockpurge;
struct res_lib_lck_lockpurge res_lib_lck_lockpurge;
+ struct iovec iov;
SaAisErrorT error;
struct lckResourceInstance *lckResourceInstance;
@@ -1138,11 +1118,14 @@
memcpy (&req_lib_lck_lockpurge.lockResourceName,
&lckResourceInstance->lockResourceName, sizeof (SaNameT));
+ iov.iov_base = &req_lib_lck_lockpurge;
+ iov.iov_len = sizeof (struct req_lib_lck_lockpurge);
+
pthread_mutex_lock (lckResourceInstance->response_mutex);
- error = saSendReceiveReply (lckResourceInstance->response_fd,
- &req_lib_lck_lockpurge,
- sizeof (struct req_lib_lck_lockpurge),
+ error = openais_msg_send_reply_receive (lckResourceInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_lck_lockpurge,
sizeof (struct res_lib_lck_lockpurge));
Index: lib/cpg.c
===================================================================
--- lib/cpg.c (revision 1687)
+++ lib/cpg.c (working copy)
@@ -52,10 +52,8 @@
#include "util.h"
struct cpg_inst {
- int response_fd;
- int dispatch_fd;
+ void *ipc_segment;
int finalize;
- cpg_flow_control_state_t flow_control_state;
cpg_callbacks_t callbacks;
void *context;
pthread_mutex_t response_mutex;
@@ -107,8 +105,7 @@
goto error_destroy;
}
- error = saServiceConnect (&cpg_inst->response_fd, &cpg_inst->dispatch_fd,
- CPG_SERVICE);
+ error = openais_service_connect (CPG_SERVICE, &cpg_inst->ipc_segment);
if (error != SA_AIS_OK) {
goto error_put_destroy;
}
@@ -155,21 +152,12 @@
cpg_inst->finalize = 1;
+ openais_service_disconnect (cpg_inst->ipc_segment);
+
pthread_mutex_unlock (&cpg_inst->response_mutex);
saHandleDestroy (&cpg_handle_t_db, handle);
- /*
- * Disconnect from the server
- */
- if (cpg_inst->response_fd != -1) {
- shutdown(cpg_inst->response_fd, 0);
- close(cpg_inst->response_fd);
- }
- if (cpg_inst->dispatch_fd != -1) {
- shutdown(cpg_inst->dispatch_fd, 0);
- close(cpg_inst->dispatch_fd);
- }
saHandleInstancePut (&cpg_handle_t_db, handle);
return (CPG_OK);
@@ -187,7 +175,7 @@
return (error);
}
- *fd = cpg_inst->dispatch_fd;
+ *fd = openais_fd_get (cpg_inst->ipc_segment);
saHandleInstancePut (&cpg_handle_t_db, handle);
@@ -241,7 +229,6 @@
cpg_handle_t handle,
cpg_dispatch_t dispatch_types)
{
- struct pollfd ufds;
int timeout = -1;
SaAisErrorT error;
int cont = 1; /* always continue do loop except when set to 0 */
@@ -274,35 +261,13 @@
}
do {
- ufds.fd = cpg_inst->dispatch_fd;
- ufds.events = POLLIN;
- ufds.revents = 0;
- error = saPollRetry (&ufds, 1, timeout);
- if (error != SA_AIS_OK) {
- goto error_nounlock;
- }
-
pthread_mutex_lock (&cpg_inst->dispatch_mutex);
- /*
- * Regather poll data in case ufds has changed since taking lock
- */
- error = saPollRetry (&ufds, 1, timeout);
- if (error != SA_AIS_OK) {
- goto error_nounlock;
- }
+ dispatch_avail = openais_dispatch_recv (cpg_inst->ipc_segment, (void \
*)&dispatch_data, timeout);
- /*
- * Handle has been finalized in another thread
- */
- if (cpg_inst->finalize == 1) {
- error = CPG_OK;
- pthread_mutex_unlock (&cpg_inst->dispatch_mutex);
- goto error_unlock;
- }
+ pthread_mutex_unlock (&cpg_inst->dispatch_mutex);
- dispatch_avail = ufds.revents & POLLIN;
if (dispatch_avail == 0 && dispatch_types == CPG_DISPATCH_ALL) {
pthread_mutex_unlock (&cpg_inst->dispatch_mutex);
break; /* exit do while cont is 1 loop */
@@ -311,37 +276,21 @@
pthread_mutex_unlock (&cpg_inst->dispatch_mutex);
continue; /* next poll */
}
-
- if (ufds.revents & POLLIN) {
- /*
- * Queue empty, read response from socket
- */
- error = saRecvRetry (cpg_inst->dispatch_fd, &dispatch_data.header,
- sizeof (mar_res_header_t));
- if (error != SA_AIS_OK) {
- goto error_unlock;
+ if (dispatch_avail == -1) {
+ if (cpg_inst->finalize == 1) {
+ error = SA_AIS_OK;
+ } else {
+ error = SA_AIS_ERR_LIBRARY;
}
- if (dispatch_data.header.size > sizeof (mar_res_header_t)) {
- error = saRecvRetry (cpg_inst->dispatch_fd, &dispatch_data.data,
- dispatch_data.header.size - sizeof (mar_res_header_t));
-
- if (error != SA_AIS_OK) {
- goto error_unlock;
- }
- }
- } else {
- pthread_mutex_unlock (&cpg_inst->dispatch_mutex);
- continue;
+ goto error_put;
}
/*
* Make copy of callbacks, message data, unlock instance, and call callback
* A risk of this dispatch method is that the callback routines may
* operate at the same time that cpgFinalize has been called.
- */
+ */
memcpy (&callbacks, &cpg_inst->callbacks, sizeof (cpg_callbacks_t));
-
- pthread_mutex_unlock (&cpg_inst->dispatch_mutex);
/*
* Dispatch incoming message
*/
@@ -349,7 +298,6 @@
case MESSAGE_RES_CPG_DELIVER_CALLBACK:
res_cpg_deliver_callback = (struct res_lib_cpg_deliver_callback *)&dispatch_data;
- cpg_inst->flow_control_state = res_cpg_deliver_callback->flow_control_state;
marshall_from_mar_cpg_name_t (
&group_name,
&res_cpg_deliver_callback->group_name);
@@ -398,7 +346,7 @@
default:
error = SA_AIS_ERR_LIBRARY;
- goto error_nounlock;
+ goto error_put;
break;
}
@@ -423,9 +371,8 @@
}
} while (cont);
-error_unlock:
+error_put:
saHandleInstancePut (&cpg_handle_t_db, handle);
-error_nounlock:
return (error);
}
@@ -457,7 +404,7 @@
iov[0].iov_base = &req_lib_cpg_trackstart;
iov[0].iov_len = sizeof (struct req_lib_cpg_trackstart);
- error = saSendMsgReceiveReply (cpg_inst->response_fd, iov, 1,
+ error = openais_msg_send_reply_receive (cpg_inst->ipc_segment, iov, 1,
&res_lib_cpg_trackstart, sizeof (struct res_lib_cpg_trackstart));
if (error != SA_AIS_OK) {
@@ -475,7 +422,7 @@
iov[0].iov_base = &req_lib_cpg_join;
iov[0].iov_len = sizeof (struct req_lib_cpg_join);
- error = saSendMsgReceiveReply (cpg_inst->response_fd, iov, 1,
+ error = openais_msg_send_reply_receive (cpg_inst->ipc_segment, iov, 1,
&res_lib_cpg_join, sizeof (struct res_lib_cpg_join));
pthread_mutex_unlock (&cpg_inst->response_mutex);
@@ -518,7 +465,7 @@
pthread_mutex_lock (&cpg_inst->response_mutex);
- error = saSendMsgReceiveReply (cpg_inst->response_fd, iov, 1,
+ error = openais_msg_send_reply_receive (cpg_inst->ipc_segment, iov, 1,
&res_lib_cpg_leave, sizeof (struct res_lib_cpg_leave));
pthread_mutex_unlock (&cpg_inst->response_mutex);
@@ -570,8 +517,8 @@
pthread_mutex_lock (&cpg_inst->response_mutex);
- error = saSendMsgReceiveReply (cpg_inst->response_fd, iov, iov_len + 1,
- &res_lib_cpg_mcast, sizeof (res_lib_cpg_mcast));
+ error = openais_msg_send_reply_receive (cpg_inst->ipc_segment, iov,
+ iov_len + 1, &res_lib_cpg_mcast, sizeof (res_lib_cpg_mcast));
pthread_mutex_unlock (&cpg_inst->response_mutex);
@@ -579,7 +526,6 @@
goto error_exit;
}
- cpg_inst->flow_control_state = res_lib_cpg_mcast.flow_control_state;
error = res_lib_cpg_mcast.header.error;
error_exit:
@@ -616,7 +562,7 @@
pthread_mutex_lock (&cpg_inst->response_mutex);
- error = saSendMsgReceiveReply (cpg_inst->response_fd, &iov, 1,
+ error = openais_msg_send_reply_receive (cpg_inst->ipc_segment, &iov, 1,
&res_lib_cpg_membership_get, sizeof (mar_res_header_t));
pthread_mutex_unlock (&cpg_inst->response_mutex);
@@ -667,7 +613,7 @@
pthread_mutex_lock (&cpg_inst->response_mutex);
- error = saSendMsgReceiveReply (cpg_inst->response_fd, &iov, 1,
+ error = openais_msg_send_reply_receive (cpg_inst->ipc_segment, &iov, 1,
&res_lib_cpg_local_get, sizeof (res_lib_cpg_local_get));
pthread_mutex_unlock (&cpg_inst->response_mutex);
@@ -698,7 +644,7 @@
return (error);
}
- *flow_control_state = cpg_inst->flow_control_state;
+ *flow_control_state = openais_dispatch_flow_control_get (cpg_inst->ipc_segment);
saHandleInstancePut (&cpg_handle_t_db, handle);
Index: lib/clm.c
===================================================================
--- lib/clm.c (revision 1687)
+++ lib/clm.c (working copy)
@@ -61,8 +61,7 @@
};
struct clmInstance {
- int response_fd;
- int dispatch_fd;
+ void *ipc_ctx;
SaClmCallbacksT callbacks;
int finalize;
pthread_mutex_t response_mutex;
@@ -170,12 +169,7 @@
goto error_destroy;
}
- clmInstance->response_fd = -1;
-
- clmInstance->dispatch_fd = -1;
-
- error = saServiceConnect (&clmInstance->response_fd,
- &clmInstance->dispatch_fd, CLM_SERVICE);
+ error = openais_service_connect (CLM_SERVICE, &clmInstance->ipc_ctx);
if (error != SA_AIS_OK) {
goto error_put_destroy;
}
@@ -241,7 +235,7 @@
return (error);
}
- *selectionObject = clmInstance->dispatch_fd;
+ *selectionObject = openais_fd_get (clmInstance->ipc_ctx);
saHandleInstancePut (&clmHandleDatabase, clmHandle);
return (SA_AIS_OK);
@@ -268,7 +262,6 @@
SaClmHandleT clmHandle,
SaDispatchFlagsT dispatchFlags)
{
- struct pollfd ufds;
int timeout = -1;
SaAisErrorT error;
int cont = 1; /* always continue do loop except when set to 0 */
@@ -306,65 +299,35 @@
}
do {
- ufds.fd = clmInstance->dispatch_fd;
- ufds.events = POLLIN;
- ufds.revents = 0;
-
pthread_mutex_lock (&clmInstance->dispatch_mutex);
- error = saPollRetry (&ufds, 1, timeout);
- if (error != SA_AIS_OK) {
- goto error_unlock;
- }
+ dispatch_avail = openais_dispatch_recv (clmInstance->ipc_ctx,
+ (void *)&dispatch_data, timeout);
- /*
- * Handle has been finalized in another thread
- */
- if (clmInstance->finalize == 1) {
- error = SA_AIS_OK;
- goto error_unlock;
- }
+ pthread_mutex_unlock (&clmInstance->dispatch_mutex);
- if ((ufds.revents & (POLLERR|POLLHUP|POLLNVAL)) != 0) {
- error = SA_AIS_ERR_BAD_HANDLE;
- goto error_unlock;
- }
-
- dispatch_avail = ufds.revents & POLLIN;
if (dispatch_avail == 0 && dispatchFlags == SA_DISPATCH_ALL) {
- pthread_mutex_unlock (&clmInstance->dispatch_mutex);
break; /* exit do while cont is 1 loop */
} else
if (dispatch_avail == 0) {
- pthread_mutex_unlock (&clmInstance->dispatch_mutex);
continue; /* next poll */
}
- if (ufds.revents & POLLIN) {
- error = saRecvRetry (clmInstance->dispatch_fd, &dispatch_data.header,
- sizeof (mar_res_header_t));
- if (error != SA_AIS_OK) {
- goto error_unlock;
+ if (dispatch_avail == -1) {
+ if (clmInstance->finalize == 1) {
+ error = SA_AIS_OK;
+ } else {
+ error = SA_AIS_ERR_LIBRARY;
}
- if (dispatch_data.header.size > sizeof (mar_res_header_t)) {
- error = saRecvRetry (clmInstance->dispatch_fd, &dispatch_data.data,
- dispatch_data.header.size - sizeof (mar_res_header_t));
- if (error != SA_AIS_OK) {
- goto error_unlock;
- }
- }
- } else {
- pthread_mutex_unlock (&clmInstance->dispatch_mutex);
- continue;
+ goto error_put;
}
-
+
/*
* Make copy of callbacks, message data, unlock instance, and call callback
* A risk of this dispatch method is that the callback routines may
* operate at the same time that clmFinalize has been called in another thread.
*/
memcpy (&callbacks, &clmInstance->callbacks, sizeof (SaClmCallbacksT));
- pthread_mutex_unlock (&clmInstance->dispatch_mutex);
/*
* Dispatch incoming message
@@ -439,9 +402,6 @@
goto error_put;
-error_unlock:
- pthread_mutex_unlock (&clmInstance->dispatch_mutex);
-
error_put:
saHandleInstancePut (&clmHandleDatabase, clmHandle);
return (error);
@@ -497,19 +457,12 @@
clmInstance->finalize = 1;
+ openais_service_disconnect (clmInstance->ipc_ctx);
+
pthread_mutex_unlock (&clmInstance->response_mutex);
saHandleDestroy (&clmHandleDatabase, clmHandle);
- if (clmInstance->response_fd != -1) {
- shutdown (clmInstance->response_fd, 0);
- close (clmInstance->response_fd);
- }
- if (clmInstance->dispatch_fd != -1) {
- shutdown (clmInstance->dispatch_fd, 0);
- close (clmInstance->dispatch_fd);
- }
-
saHandleInstancePut (&clmHandleDatabase, clmHandle);
return (error);
@@ -527,6 +480,7 @@
SaAisErrorT error = SA_AIS_OK;
int items_to_copy;
unsigned int i;
+ struct iovec iov;
error = saHandleInstanceGet (&clmHandleDatabase, clmHandle,
(void *)&clmInstance);
@@ -560,6 +514,9 @@
req_lib_clm_clustertrack.return_in_callback = 1;
}
+ iov.iov_base = &req_lib_clm_clustertrack;
+ iov.iov_len = sizeof (req_lib_clm_clustertrack);;
+
pthread_mutex_lock (&clmInstance->response_mutex);
if ((clmInstance->callbacks.saClmClusterTrackCallback == 0) &&
@@ -570,9 +527,9 @@
goto error_exit;
}
- error = saSendReceiveReply (clmInstance->response_fd,
- &req_lib_clm_clustertrack,
- sizeof (struct req_lib_clm_clustertrack),
+ error = openais_msg_send_reply_receive (clmInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_clm_clustertrack,
sizeof (struct res_lib_clm_clustertrack));
@@ -620,6 +577,7 @@
struct req_lib_clm_trackstop req_lib_clm_trackstop;
struct res_lib_clm_trackstop res_lib_clm_trackstop;
SaAisErrorT error = SA_AIS_OK;
+ struct iovec iov;
req_lib_clm_trackstop.header.size = sizeof (struct req_lib_clm_trackstop);
req_lib_clm_trackstop.header.id = MESSAGE_REQ_CLM_TRACKSTOP;
@@ -629,11 +587,14 @@
return (error);
}
+ iov.iov_base = &req_lib_clm_trackstop;
+ iov.iov_len = sizeof (struct req_lib_clm_trackstop);
+
pthread_mutex_lock (&clmInstance->response_mutex);
- error = saSendReceiveReply (clmInstance->response_fd,
- &req_lib_clm_trackstop,
- sizeof (struct req_lib_clm_trackstop),
+ error = openais_msg_send_reply_receive (clmInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_clm_trackstop,
sizeof (struct res_lib_clm_trackstop));
@@ -655,6 +616,7 @@
struct req_lib_clm_nodeget req_lib_clm_nodeget;
struct res_clm_nodeget res_clm_nodeget;
SaAisErrorT error = SA_AIS_OK;
+ struct iovec iov;
if (clusterNode == NULL) {
return (SA_AIS_ERR_INVALID_PARAM);
@@ -670,6 +632,9 @@
return (error);
}
+ iov.iov_base = &req_lib_clm_nodeget,
+ iov.iov_len = sizeof (struct req_lib_clm_nodeget),
+
pthread_mutex_lock (&clmInstance->response_mutex);
/*
@@ -679,8 +644,11 @@
req_lib_clm_nodeget.header.id = MESSAGE_REQ_CLM_NODEGET;
req_lib_clm_nodeget.node_id = nodeId;
- error = saSendReceiveReply (clmInstance->response_fd, &req_lib_clm_nodeget,
- sizeof (struct req_lib_clm_nodeget), &res_clm_nodeget, sizeof (res_clm_nodeget));
+ error = openais_msg_send_reply_receive (clmInstance->ipc_ctx,
+ &iov,
+ 1,
+ &res_clm_nodeget,
+ sizeof (res_clm_nodeget));
if (error != SA_AIS_OK) {
goto error_exit;
}
@@ -707,6 +675,7 @@
struct clmInstance *clmInstance;
struct req_lib_clm_nodegetasync req_lib_clm_nodegetasync;
struct res_clm_nodegetasync res_clm_nodegetasync;
+ struct iovec iov;
SaAisErrorT error = SA_AIS_OK;
req_lib_clm_nodegetasync.header.size = sizeof (struct req_lib_clm_nodegetasync);
@@ -727,9 +696,15 @@
goto error_exit;
}
- error = saSendReceiveReply (clmInstance->response_fd, &req_lib_clm_nodegetasync,
- sizeof (struct req_lib_clm_nodegetasync),
- &res_clm_nodegetasync, sizeof (struct res_clm_nodegetasync));
+ iov.iov_base = &req_lib_clm_nodegetasync;
+ iov.iov_len = sizeof (req_lib_clm_nodegetasync);
+
+ error = openais_msg_send_reply_receive (
+ clmInstance->ipc_ctx,
+ &iov,
+ 1,
+ &res_clm_nodegetasync,
+ sizeof (struct res_clm_nodegetasync));
if (error != SA_AIS_OK) {
goto error_exit;
}
Index: lib/util.c
===================================================================
--- lib/util.c (revision 1687)
+++ lib/util.c (working copy)
@@ -2,7 +2,7 @@
* vi: set autoindent tabstop=4 shiftwidth=4 :
*
* Copyright (c) 2002-2006 MontaVista Software, Inc.
- * Copyright (c) 2006-2007 Red Hat, Inc.
+ * Copyright (c) 2006-2009 Red Hat, Inc.
*
* All rights reserved.
*
@@ -52,6 +52,8 @@
#include <arpa/inet.h>
#include <netinet/in.h>
#include <assert.h>
+#include <sys/shm.h>
+#include <sys/sem.h>
#include "../include/saAis.h"
#include "../include/ipc_gen.h"
@@ -90,189 +92,32 @@
}
#endif
-SaAisErrorT
-saServiceConnect (
- int *responseOut,
- int *callbackOut,
- enum service_types service)
-{
- int responseFD;
- int callbackFD;
- int result;
- struct sockaddr_un address;
- mar_req_lib_response_init_t req_lib_response_init;
- mar_res_lib_response_init_t res_lib_response_init;
- mar_req_lib_dispatch_init_t req_lib_dispatch_init;
- mar_res_lib_dispatch_init_t res_lib_dispatch_init;
- SaAisErrorT error;
- gid_t egid;
+#define REQ_SIZE 1000000
+#define RES_SIZE 1000000
+#define DISPATCH_SIZE 1000000
- /*
- * Allow set group id binaries to be authenticated
- */
- egid = getegid();
- setregid (egid, -1);
+struct shared_memory {
+ unsigned char req_buffer[REQ_SIZE];
+ unsigned char res_buffer[RES_SIZE];
+ unsigned char dispatch_buffer[DISPATCH_SIZE];
+ unsigned int read;
+ unsigned int write;
+};
- memset (&address, 0, sizeof (struct sockaddr_un));
-#if defined(OPENAIS_BSD) || defined(OPENAIS_DARWIN)
- address.sun_len = sizeof(struct sockaddr_un);
-#endif
- address.sun_family = PF_UNIX;
-#if defined(OPENAIS_LINUX)
- strcpy (address.sun_path + 1, socketname);
-#else
- strcpy (address.sun_path, socketname);
-#endif
- responseFD = socket (PF_UNIX, SOCK_STREAM, 0);
- if (responseFD == -1) {
- return (SA_AIS_ERR_NO_RESOURCES);
- }
+struct ipc_segment {
+ int fd;
+ int shmid;
+ int semid;
+ int flow_control_state;
+ struct shared_memory *shared_memory;
+};
- socket_nosigpipe (responseFD);
-
- result = connect (responseFD, (struct sockaddr *)&address, AIS_SUN_LEN(&address));
- if (result == -1) {
- close (responseFD);
- return (SA_AIS_ERR_TRY_AGAIN);
- }
-
- req_lib_response_init.resdis_header.size = sizeof (req_lib_response_init);
- req_lib_response_init.resdis_header.id = MESSAGE_REQ_RESPONSE_INIT;
- req_lib_response_init.resdis_header.service = service;
-
- error = saSendRetry (responseFD, &req_lib_response_init,
- sizeof (mar_req_lib_response_init_t));
- if (error != SA_AIS_OK) {
- goto error_exit;
- }
- error = saRecvRetry (responseFD, &res_lib_response_init,
- sizeof (mar_res_lib_response_init_t));
- if (error != SA_AIS_OK) {
- goto error_exit;
- }
-
- /*
- * Check for security errors
- */
- if (res_lib_response_init.header.error != SA_AIS_OK) {
- error = res_lib_response_init.header.error;
- goto error_exit;
- }
-
- *responseOut = responseFD;
-
-/* if I comment out the 4 lines below the executive crashes */
- callbackFD = socket (PF_UNIX, SOCK_STREAM, 0);
- if (callbackFD == -1) {
- close (responseFD);
- return (SA_AIS_ERR_NO_RESOURCES);
- }
-
- socket_nosigpipe (callbackFD);
-
- result = connect (callbackFD, (struct sockaddr *)&address, AIS_SUN_LEN(&address));
- if (result == -1) {
- close (callbackFD);
- close (responseFD);
- return (SA_AIS_ERR_TRY_AGAIN);
- }
-
- req_lib_dispatch_init.resdis_header.size = sizeof (req_lib_dispatch_init);
- req_lib_dispatch_init.resdis_header.id = MESSAGE_REQ_DISPATCH_INIT;
- req_lib_dispatch_init.resdis_header.service = service;
-
- req_lib_dispatch_init.conn_info = res_lib_response_init.conn_info;
-
- error = saSendRetry (callbackFD, &req_lib_dispatch_init,
- sizeof (mar_req_lib_dispatch_init_t));
- if (error != SA_AIS_OK) {
- goto error_exit_two;
- }
- error = saRecvRetry (callbackFD, &res_lib_dispatch_init,
- sizeof (mar_res_lib_dispatch_init_t));
- if (error != SA_AIS_OK) {
- goto error_exit_two;
- }
-
- /*
- * Check for security errors
- */
- if (res_lib_dispatch_init.header.error != SA_AIS_OK) {
- error = res_lib_dispatch_init.header.error;
- goto error_exit;
- }
-
- *callbackOut = callbackFD;
- return (SA_AIS_OK);
-
-error_exit_two:
- close (callbackFD);
-error_exit:
- close (responseFD);
- return (error);
-}
-
-SaAisErrorT
-saRecvRetry (
+static int
+openais_send (
int s,
- void *msg,
- size_t len)
-{
- SaAisErrorT error = SA_AIS_OK;
- int result;
- struct msghdr msg_recv;
- struct iovec iov_recv;
- char *rbuf = (char *)msg;
- int processed = 0;
-
- msg_recv.msg_iov = &iov_recv;
- msg_recv.msg_iovlen = 1;
- msg_recv.msg_name = 0;
- msg_recv.msg_namelen = 0;
- msg_recv.msg_control = 0;
- msg_recv.msg_controllen = 0;
- msg_recv.msg_flags = 0;
-
-retry_recv:
- iov_recv.iov_base = (void *)&rbuf[processed];
- iov_recv.iov_len = len - processed;
-
- result = recvmsg (s, &msg_recv, MSG_NOSIGNAL);
- if (result == -1 && errno == EINTR) {
- goto retry_recv;
- }
- if (result == -1 && errno == EAGAIN) {
- goto retry_recv;
- }
-#if defined(OPENAIS_SOLARIS) || defined(OPENAIS_BSD) || defined(OPENAIS_DARWIN)
- /* On many OS poll never return POLLHUP or POLLERR.
- * EOF is detected when recvmsg return 0.
- */
- if (result == 0) {
- error = SA_AIS_ERR_LIBRARY;
- goto error_exit;
- }
-#endif
- if (result == -1 || result == 0) {
- error = SA_AIS_ERR_LIBRARY;
- goto error_exit;
- }
- processed += result;
- if (processed != len) {
- goto retry_recv;
- }
- assert (processed == len);
-error_exit:
- return (error);
-}
-
-SaAisErrorT
-saSendRetry (
- int s,
const void *msg,
size_t len)
{
- SaAisErrorT error = SA_AIS_OK;
int result;
struct msghdr msg_send;
struct iovec iov_send;
@@ -299,15 +144,12 @@
*/
if (result == -1 && processed == 0) {
if (errno == EINTR) {
- error = SA_AIS_ERR_TRY_AGAIN;
goto error_exit;
}
if (errno == EAGAIN) {
- error = SA_AIS_ERR_TRY_AGAIN;
goto error_exit;
}
if (errno == EFAULT) {
- error = SA_AIS_ERR_INVALID_PARAM;
goto error_exit;
}
}
@@ -324,7 +166,6 @@
goto retry_send;
}
if (errno == EFAULT) {
- error = SA_AIS_ERR_LIBRARY;
goto error_exit;
}
}
@@ -333,7 +174,6 @@
* return ERR_LIBRARY on any other syscall error
*/
if (result == -1) {
- error = SA_AIS_ERR_LIBRARY;
goto error_exit;
}
@@ -342,181 +182,411 @@
goto retry_send;
}
+ return (0);
+
error_exit:
- return (error);
+ return (-1);
}
-SaAisErrorT saSendMsgRetry (
- int s,
- struct iovec *iov,
- int iov_len)
+
+SaAisErrorT
+openais_service_connect (
+ enum service_types service,
+ void **shmseg)
{
- SaAisErrorT error = SA_AIS_OK;
- int result;
- int total_size = 0;
- int i;
- int csize;
- int csize_cntr;
- int total_sent = 0;
- int iov_len_sendmsg = iov_len;
- struct iovec *iov_sendmsg = iov;
- struct iovec iovec_save;
- int iovec_saved_position = -1;
+ int request_fd;
+ struct sockaddr_un address;
+ SaAisErrorT error;
+ struct ipc_segment *ipc_segment;
+ key_t shmkey;
+ key_t semkey;
+ int res;
+ mar_req_setup_t req_setup;
- struct msghdr msg_send;
+ request_fd = socket (PF_UNIX, SOCK_STREAM, 0);
+ if (request_fd == -1) {
+ return (-1);
+ }
- for (i = 0; i < iov_len; i++) {
- total_size += iov[i].iov_len;
+ memset (&address, 0, sizeof (struct sockaddr_un));
+#if defined(OPENAIS_BSD) || defined(OPENAIS_DARWIN)
+ address.sun_len = sizeof(struct sockaddr_un);
+#endif
+ address.sun_family = PF_UNIX;
+#if defined(OPENAIS_LINUX)
+ strcpy (address.sun_path + 1, socketname);
+#else
+ strcpy (address.sun_path, socketname);
+#endif
+ res = connect (request_fd, (struct sockaddr *)&address,
+ AIS_SUN_LEN(&address));
+ if (res == -1) {
+ close (request_fd);
+ return (SA_AIS_ERR_TRY_AGAIN);
}
- msg_send.msg_iov = iov_sendmsg;
- msg_send.msg_iovlen = iov_len_sendmsg;
- msg_send.msg_name = 0;
- msg_send.msg_namelen = 0;
- msg_send.msg_control = 0;
- msg_send.msg_controllen = 0;
- msg_send.msg_flags = 0;
-retry_sendmsg:
- result = sendmsg (s, &msg_send, MSG_NOSIGNAL);
+ ipc_segment = malloc (sizeof (struct ipc_segment));
+ if (ipc_segment == NULL) {
+ close (request_fd);
+ return (-1);
+ }
+
/*
- * Can't send now, and message not committed, so don't retry send
+ * Allocate a shared memory segment
*/
- if (result == -1 && iovec_saved_position == -1) {
- if (errno == EINTR) {
- error = SA_AIS_ERR_TRY_AGAIN;
- goto error_exit;
+ do {
+ shmkey = random();
+ ipc_segment->shmid = shmget (shmkey, sizeof (struct shared_memory),
+ IPC_CREAT|IPC_EXCL|0600);
+ if (ipc_segment->shmid == -1 && errno == EEXIST) {
+ continue;
}
- if (errno == EAGAIN) {
- error = SA_AIS_ERR_TRY_AGAIN;
- goto error_exit;
+ } while (ipc_segment->shmid == -1);
+
+ /*
+ * Allocate a semaphore
+ */
+ do {
+ semkey = random();
+ ipc_segment->semid = semget (semkey, 3, IPC_CREAT|IPC_EXCL|0600);
+ if (ipc_segment->shmid == -1 && errno == EEXIST) {
+ continue;
}
- if (errno == EFAULT) {
- error = SA_AIS_ERR_INVALID_PARAM;
- goto error_exit;
- }
- }
+ } while (ipc_segment->semid == -1);
/*
- * Retry (and block) if portion of message has already been written
+ * Attach to shared memory segment
*/
- if (result == -1 && iovec_saved_position != -1) {
- if (errno == EINTR) {
- goto retry_sendmsg;
- }
- if (errno == EAGAIN) {
- goto retry_sendmsg;
- }
- if (errno == EFAULT) {
- error = SA_AIS_ERR_LIBRARY;
- goto error_exit;
- }
+ ipc_segment->shared_memory = shmat (ipc_segment->shmid, NULL, 0);
+ if (ipc_segment->shared_memory == (void *)-1) {
+ close (request_fd);
+ return (-1);
}
+ res = semctl (ipc_segment->semid, 0, SETVAL, 0);
+
+ res = semctl (ipc_segment->semid, 1, SETVAL, 0);
+
+ req_setup.shmkey = shmkey;
+ req_setup.semkey = semkey;
+ req_setup.service = service;
+
+ error = openais_send (request_fd, &req_setup, sizeof (mar_req_setup_t));
+ if (error != 0) {
+ goto error_exit;
+ }
+
+ ipc_segment->fd = request_fd;
+ ipc_segment->flow_control_state = 0;
+ *shmseg = ipc_segment;
+
+ return (SA_AIS_OK);
+
+error_exit:
+ return (-1);
+}
+
+SaAisErrorT
+openais_service_disconnect (
+ void *ipc_context)
+{
+ struct ipc_segment *ipc_segment = (struct ipc_segment *)ipc_context;
+
+ shutdown (ipc_segment->fd, SHUT_RDWR);
+ close (ipc_segment->fd);
+ shmdt (ipc_segment->shared_memory);
+ free (ipc_segment);
+ return (SA_AIS_OK);
+}
+
+int
+openais_dispatch_flow_control_get (
+ void *ipc_context)
+{
+ struct ipc_segment *ipc_segment = (struct ipc_segment *)ipc_context;
+
+ return (ipc_segment->flow_control_state);
+}
+
+
+int
+openais_fd_get (void *ipc_ctx)
+{
+ struct ipc_segment *ipc_segment = (struct ipc_segment *)ipc_ctx;
+
+ return (ipc_segment->fd);
+}
+
+static void memcpy_swrap (
+ void *dest, void *src, int len, unsigned int *read)
+{
+ char *dest_chr = (char *)dest;
+ char *src_chr = (char *)src;
+
+ unsigned int first_read;
+ unsigned int second_read;
+
+ first_read = len;
+ second_read = 0;
+
+ if (len + *read >= DISPATCH_SIZE) {
+ first_read = DISPATCH_SIZE - *read;
+ second_read = (len + *read) % DISPATCH_SIZE;
+ }
+ memcpy (dest_chr, &src_chr[*read], first_read);
+ if (second_read) {
+ memcpy (&dest_chr[first_read], src_chr,
+ second_read);
+ }
+ *read = (*read + len) % (DISPATCH_SIZE);
+}
+int original_flow = -1;
+
+int
+openais_dispatch_recv (void *ipc_ctx, void *data, int timeout)
+{
+ struct pollfd ufds;
+ struct sembuf sop;
+ int poll_events;
+ mar_res_header_t *header;
+ char buf;
+ struct ipc_segment *ipc_segment = (struct ipc_segment *)ipc_ctx;
+ int res;
+ unsigned int my_read;
+ char buf_two = 1;
+
+ ufds.fd = ipc_segment->fd;
+ ufds.events = POLLIN;
+ ufds.revents = 0;
+
+retry_poll:
+ poll_events = poll (&ufds, 1, timeout);
+ if (poll_events == -1 && errno == EINTR) {
+ goto retry_poll;
+ } else
+ if (poll_events == -1) {
+ return (-1);
+ } else
+ if (poll_events == 0) {
+ return (0);
+ }
+ if (poll_events == 1 && (ufds.revents & (POLLERR|POLLHUP))) {
+printf ("returning here\n");
+ return (-1);
+ }
+retry_recv:
+ res = recv (ipc_segment->fd, &buf, 1, 0);
+ if (res == -1 && errno == EINTR) {
+ goto retry_recv;
+ } else
+ if (res == -1) {
+ return (-1);
+ }
+ if (res == 0) {
+ return (SA_AIS_ERR_LIBRARY);
+ }
+ ipc_segment->flow_control_state = 0;
+ if (buf == 1 || buf == 2) {
+ ipc_segment->flow_control_state = 1;
+ }
/*
- * ERR_LIBRARY for any other syscall error
+ * Notify executive to flush any pending dispatch messages
*/
- if (result == -1) {
- error = SA_AIS_ERR_LIBRARY;
- goto error_exit;
+ if (ipc_segment->flow_control_state) {
+ buf_two = 1;
+ res = openais_send (ipc_segment->fd, &buf_two, 1);
+ assert (res == 0);
}
+ /*
+ * This is just a notification of flow control starting at the addition
+ * of a new pending message, not a message to dispatch
+ */
+ if (buf == 2) {
+ return (SA_AIS_OK);
+ }
+ if (buf == 3) {
+ return (SA_AIS_OK);
+ }
- if (iovec_saved_position != -1) {
- memcpy (&iov[iovec_saved_position], &iovec_save, sizeof (struct iovec));
+ sop.sem_num = 2;
+ sop.sem_op = -1;
+ sop.sem_flg = 0;
+
+retry_semop:
+ res = semop (ipc_segment->semid, &sop, 1);
+ if (res == -1 && errno == EINTR) {
+ goto retry_semop;
+ } else
+ if (res == -1) {
+ return (-1);
}
+
+ if (ipc_segment->shared_memory->read + sizeof (mar_res_header_t) >= DISPATCH_SIZE) \
{ + my_read = ipc_segment->shared_memory->read;
+ memcpy_swrap (data,
+ ipc_segment->shared_memory->dispatch_buffer,
+ sizeof (mar_res_header_t),
+ &ipc_segment->shared_memory->read);
+ header = (mar_res_header_t *)data;
+ memcpy_swrap (
+ data + sizeof (mar_res_header_t),
+ ipc_segment->shared_memory->dispatch_buffer,
+ header->size - sizeof (mar_res_header_t),
+ &ipc_segment->shared_memory->read);
+ } else {
+ header = (mar_res_header_t \
*)&ipc_segment->shared_memory->dispatch_buffer[ipc_segment->shared_memory->read]; \
+ memcpy_swrap ( + data,
+ ipc_segment->shared_memory->dispatch_buffer,
+ header->size,
+ &ipc_segment->shared_memory->read);
+ }
- total_sent += result;
- if (total_sent != total_size) {
- for (i = 0, csize = 0, csize_cntr = 0; i < iov_len; i++) {
- csize += iov[i].iov_len;
- if (csize > total_sent) {
- break;
- }
+ return (1);
+}
- csize_cntr += iov[i].iov_len;
- }
- memcpy (&iovec_save, &iov[i], sizeof (struct iovec));
- iovec_saved_position = i;
- iov[i].iov_base = ((unsigned char *)(iov[i].iov_base)) +
- (total_sent - csize_cntr);
- iov[i].iov_len = total_size - total_sent;
- msg_send.msg_iov = &iov[i];
- msg_send.msg_iovlen = iov_len - i;
+static SaAisErrorT
+openais_msg_send (
+ void *ipc_context,
+ struct iovec *iov,
+ int iov_len)
+{
+ struct ipc_segment *ipc_segment = (struct ipc_segment *)ipc_context;
+ struct sembuf sop;
+ int i;
+ int res;
+ int req_buffer_idx = 0;
- goto retry_sendmsg;
+ for (i = 0; i < iov_len; i++) {
+ memcpy (&ipc_segment->shared_memory->req_buffer[req_buffer_idx],
+ iov[i].iov_base,
+ iov[i].iov_len);
+ req_buffer_idx += iov[i].iov_len;
}
+ /*
+ * Signal semaphore #0 indicting a new message from client
+ * to server request queue
+ */
+ sop.sem_num = 0;
+ sop.sem_op = 1;
+ sop.sem_flg = 0;
-error_exit:
- return (error);
+retry_semop:
+ res = semop (ipc_segment->semid, &sop, 1);
+ if (res == -1 && errno == EINTR) {
+ goto retry_semop;
+ } else
+ if (res == -1) {
+ return (SA_AIS_ERR_LIBRARY);
+ }
+ return (SA_AIS_OK);
}
-SaAisErrorT saSendMsgReceiveReply (
- int s,
- struct iovec *iov,
- int iov_len,
- void *responseMessage,
- int responseLen)
+static SaAisErrorT
+openais_reply_receive (
+ void *ipc_context,
+ void *res_msg, int res_len)
{
- SaAisErrorT error = SA_AIS_OK;
+ struct sembuf sop;
+ struct ipc_segment *ipc_segment = (struct ipc_segment *)ipc_context;
+ unsigned int res;
- error = saSendMsgRetry (s, iov, iov_len);
- if (error != SA_AIS_OK) {
- goto error_exit;
+ /*
+ * Wait for semaphore #1 indicating a new message from server
+ * to client in the response queue
+ */
+ sop.sem_num = 1;
+ sop.sem_op = -1;
+ sop.sem_flg = 0;
+
+retry_semop:
+ res = semop (ipc_segment->semid, &sop, 1);
+ if (res == -1 && errno == EINTR) {
+ goto retry_semop;
+ } else
+ if (res == -1) {
+ return (SA_AIS_ERR_LIBRARY);
}
-
- error = saRecvRetry (s, responseMessage, responseLen);
- if (error != SA_AIS_OK) {
- goto error_exit;
+
+ memcpy (res_msg, ipc_segment->shared_memory->res_buffer, res_len);
+ return (SA_AIS_OK);
+}
+
+static SaAisErrorT
+openais_reply_receive_in_buf (
+ void *ipc_context,
+ void **res_msg)
+{
+ struct sembuf sop;
+ struct ipc_segment *ipc_segment = (struct ipc_segment *)ipc_context;
+ int res;
+
+ /*
+ * Wait for semaphore #1 indicating a new message from server
+ * to client in the response queue
+ */
+ sop.sem_num = 1;
+ sop.sem_op = -1;
+ sop.sem_flg = 0;
+
+retry_semop:
+ semop (ipc_segment->semid, &sop, 1);
+ if (res == -1 && errno == EINTR) {
+ goto retry_semop;
+ } else
+ if (res == -1) {
+ return (SA_AIS_ERR_LIBRARY);
}
-error_exit:
- return (error);
+ *res_msg = (char *)ipc_segment->shared_memory->res_buffer;
+ return (SA_AIS_OK);
}
-SaAisErrorT saSendReceiveReply (
- int s,
- void *requestMessage,
- int requestLen,
- void *responseMessage,
- int responseLen)
+SaAisErrorT
+openais_msg_send_reply_receive (
+ void *ipc_context,
+ struct iovec *iov,
+ int iov_len,
+ void *res_msg,
+ int res_len)
{
- SaAisErrorT error = SA_AIS_OK;
+ SaAisErrorT res;
- error = saSendRetry (s, requestMessage, requestLen);
- if (error != SA_AIS_OK) {
- goto error_exit;
+ res = openais_msg_send (ipc_context, iov, iov_len);
+ if (res != SA_AIS_OK) {
+ return (res);
}
-
- error = saRecvRetry (s, responseMessage, responseLen);
- if (error != SA_AIS_OK) {
- goto error_exit;
+
+ res = openais_reply_receive (ipc_context, res_msg, res_len);
+ if (res != SA_AIS_OK) {
+ return (res);
}
-error_exit:
- return (error);
+ return (SA_AIS_OK);
}
SaAisErrorT
-saPollRetry (
- struct pollfd *ufds,
- unsigned int nfds,
- int timeout)
+openais_msg_send_reply_receive_in_buf (
+ void *ipc_context,
+ struct iovec *iov,
+ int iov_len,
+ void **res_msg)
{
- SaAisErrorT error = SA_AIS_OK;
- int result;
+ unsigned int res;
-retry_poll:
- result = poll (ufds, nfds, timeout);
- if (result == -1 && errno == EINTR) {
- goto retry_poll;
+ res = openais_msg_send (ipc_context, iov, iov_len);
+ if (res != SA_AIS_OK) {
+ return (res);
}
- if (result == -1) {
- error = SA_AIS_ERR_LIBRARY;
+
+ res = openais_reply_receive_in_buf (ipc_context, res_msg);
+ if (res != SA_AIS_OK) {
+ return (res);
}
- return (error);
+ return (SA_AIS_OK);
}
-
SaAisErrorT
saHandleCreate (
struct saHandleDatabase *handleDatabase,
@@ -525,7 +595,7 @@
{
uint32_t handle;
uint32_t check;
- void *newHandles;
+ void *newHandles = NULL;
int found = 0;
void *instance;
int i;
Index: lib/util.h
===================================================================
--- lib/util.h (revision 1687)
+++ lib/util.h (working copy)
@@ -73,47 +73,42 @@
};
SaAisErrorT
-saServiceConnect (
- int *responseOut,
- int *callbackOut,
- enum service_types service);
+openais_service_connect (
+ enum service_types service,
+ void **ipc_context);
SaAisErrorT
-saRecvRetry (
- int s,
- void *msg,
- size_t len);
+openais_service_disconnect (
+ void *ipc_context);
-SaAisErrorT
-saSendRetry (
- int s,
- const void *msg,
- size_t len);
+int
+openais_fd_get (
+ void *ipc_context);
-SaAisErrorT saSendMsgRetry (
- int s,
- struct iovec *iov,
- int iov_len);
+int
+openais_dispatch_recv (
+ void *ipc_context,
+ void *buf,
+ int timeout);
-SaAisErrorT saSendMsgReceiveReply (
- int s,
+int
+openais_dispatch_flow_control_get (
+ void *ipc_context);
+
+SaAisErrorT
+openais_msg_send_reply_receive (
+ void *ipc_context,
struct iovec *iov,
int iov_len,
- void *responseMessage,
- int responseLen);
+ void *res_msg,
+ int res_len);
-SaAisErrorT saSendReceiveReply (
- int s,
- void *requestMessage,
- int requestLen,
- void *responseMessage,
- int responseLen);
-
SaAisErrorT
-saPollRetry (
- struct pollfd *ufds,
- unsigned int nfds,
- int timeout);
+openais_msg_send_reply_receive_in_buf (
+ void *ipc_context,
+ struct iovec *iov,
+ int iov_len,
+ void **res_msg);
SaAisErrorT
saHandleCreate (
Index: lib/msg.c
===================================================================
--- lib/msg.c (revision 1687)
+++ lib/msg.c (working copy)
@@ -1,6 +1,6 @@
/*
* Copyright (c) 2005 MontaVista Software, Inc.
- * Copyright (c) 2006-2007 Red Hat, Inc.
+ * Copyright (c) 2006-2009 Red Hat, Inc.
*
* All rights reserved.
*
@@ -63,8 +63,7 @@
* Data structure for instance data
*/
struct msgInstance {
- int response_fd;
- int dispatch_fd;
+ void *ipc_ctx;
SaMsgCallbacksT callbacks;
int finalize;
SaMsgHandleT msgHandle;
@@ -74,7 +73,7 @@
};
struct msgQueueInstance {
- int response_fd;
+ void *ipc_ctx;
SaMsgHandleT msgHandle;
SaMsgQueueHandleT queueHandle;
SaMsgQueueOpenFlagsT openFlags;
@@ -219,10 +218,7 @@
goto error_destroy;
}
- msgInstance->response_fd = -1;
-
- error = saServiceConnect (&msgInstance->response_fd,
- &msgInstance->dispatch_fd, MSG_SERVICE);
+ error = openais_service_connect (MSG_SERVICE, &msgInstance->ipc_ctx);
if (error != SA_AIS_OK) {
goto error_put_destroy;
}
@@ -267,7 +263,7 @@
return (error);
}
- *selectionObject = msgInstance->dispatch_fd;
+ *selectionObject = openais_fd_get (msgInstance->ipc_ctx);
saHandleInstancePut (&msgHandleDatabase, msgHandle);
@@ -279,8 +275,6 @@
const SaMsgHandleT msgHandle,
SaDispatchFlagsT dispatchFlags)
{
- struct pollfd ufds;
- int poll_fd;
int timeout = 1;
SaMsgCallbacksT callbacks;
SaAisErrorT error;
@@ -315,18 +309,9 @@
}
do {
- /*
- * Read data directly from socket
- */
- poll_fd = msgInstance->dispatch_fd;
- ufds.fd = poll_fd;
- ufds.events = POLLIN;
- ufds.revents = 0;
+ dispatch_avail = openais_dispatch_recv (msgInstance->ipc_ctx,
+ &dispatch_data, timeout);
- error = saPollRetry(&ufds, 1, timeout);
- if (error != SA_AIS_OK) {
- goto error_put;
- }
pthread_mutex_lock(&msgInstance->dispatch_mutex);
if (msgInstance->finalize == 1) {
@@ -334,13 +319,6 @@
goto error_unlock;
}
- if ((ufds.revents & (POLLERR|POLLHUP|POLLNVAL)) != 0) {
- error = SA_AIS_ERR_BAD_HANDLE;
- goto error_unlock;
- }
-
- dispatch_avail = (ufds.revents & POLLIN);
-
if (dispatch_avail == 0 && dispatchFlags == SA_DISPATCH_ALL) {
pthread_mutex_unlock(&msgInstance->dispatch_mutex);
break; /* exit do while cont is 1 loop */
@@ -350,18 +328,7 @@
continue;
}
- memset(&dispatch_data,0, sizeof(struct message_overlay));
- error = saRecvRetry (msgInstance->dispatch_fd, &dispatch_data.header, sizeof \
(mar_res_header_t));
- if (error != SA_AIS_OK) {
- goto error_unlock;
- }
- if (dispatch_data.header.size > sizeof (mar_res_header_t)) {
- error = saRecvRetry (msgInstance->dispatch_fd, &dispatch_data.data,
- dispatch_data.header.size - sizeof (mar_res_header_t));
- if (error != SA_AIS_OK) {
- goto error_unlock;
- }
- }
+ memset(&dispatch_data, 0, sizeof(struct message_overlay));
/*
* Make copy of callbacks, message data, unlock instance,
@@ -448,7 +415,6 @@
} while (cont);
error_unlock:
pthread_mutex_unlock(&msgInstance->dispatch_mutex);
-error_put:
saHandleInstancePut(&msgHandleDatabase, msgHandle);
error_exit:
return (error);
@@ -484,16 +450,8 @@
pthread_mutex_destroy (&msgInstance->response_mutex);
// TODO msgInstanceFinalize (msgInstance);
- if (msgInstance->response_fd != -1) {
- shutdown (msgInstance->response_fd, 0);
- close (msgInstance->response_fd);
- }
+ error = openais_service_disconnect (&msgInstance->ipc_ctx);
- if (msgInstance->dispatch_fd != -1) {
- shutdown (msgInstance->dispatch_fd, 0);
- close (msgInstance->dispatch_fd);
- }
-
saHandleInstancePut (&msgHandleDatabase, msgHandle);
return (SA_AIS_OK);
@@ -513,6 +471,7 @@
struct msgInstance *msgInstance;
struct req_lib_msg_queueopen req_lib_msg_queueopen;
struct res_lib_msg_queueopen res_lib_msg_queueopen;
+ struct iovec iov;
error = saHandleInstanceGet (&msgHandleDatabase, msgHandle,
(void *)&msgInstance);
@@ -532,7 +491,7 @@
goto error_destroy;
}
- msgQueueInstance->response_fd = msgInstance->response_fd;
+ msgQueueInstance->ipc_ctx = msgInstance->ipc_ctx;
msgQueueInstance->response_mutex = &msgInstance->response_mutex;
msgQueueInstance->msgHandle = msgHandle;
@@ -553,11 +512,14 @@
req_lib_msg_queueopen.openFlags = openFlags;
req_lib_msg_queueopen.timeout = timeout;
+ iov.iov_base = &req_lib_msg_queueopen;
+ iov.iov_len = sizeof (struct req_lib_msg_queueopen);
+
pthread_mutex_lock (msgQueueInstance->response_mutex);
- error = saSendReceiveReply (msgQueueInstance->response_fd,
- &req_lib_msg_queueopen,
- sizeof (struct req_lib_msg_queueopen),
+ error = openais_msg_send_reply_receive (msgQueueInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_msg_queueopen,
sizeof (struct res_lib_msg_queueopen));
@@ -598,6 +560,7 @@
SaAisErrorT error;
struct req_lib_msg_queueopen req_lib_msg_queueopen;
struct res_lib_msg_queueopenasync res_lib_msg_queueopenasync;
+ struct iovec iov;
error = saHandleInstanceGet (&msgHandleDatabase, msgHandle,
(void *)&msgInstance);
@@ -622,7 +585,7 @@
goto error_destroy;
}
- msgQueueInstance->response_fd = msgInstance->response_fd;
+ msgQueueInstance->ipc_ctx = msgInstance->ipc_ctx;
msgQueueInstance->response_mutex = &msgInstance->response_mutex;
msgQueueInstance->msgHandle = msgHandle;
@@ -642,11 +605,14 @@
req_lib_msg_queueopen.openFlags = openFlags;
req_lib_msg_queueopen.queueHandle = queueHandle;
+ iov.iov_base = &req_lib_msg_queueopen;
+ iov.iov_len = sizeof (struct req_lib_msg_queueopen);
+
pthread_mutex_unlock (msgQueueInstance->response_mutex);
- error = saSendReceiveReply (msgQueueInstance->response_fd,
- &req_lib_msg_queueopen,
- sizeof (struct req_lib_msg_queueopen),
+ error = openais_msg_send_reply_receive (msgQueueInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_msg_queueopenasync,
sizeof (struct res_lib_msg_queueopenasync));
@@ -681,6 +647,7 @@
struct res_lib_msg_queueclose res_lib_msg_queueclose;
SaAisErrorT error;
struct msgQueueInstance *msgQueueInstance;
+ struct iovec iov;
error = saHandleInstanceGet (&queueHandleDatabase, queueHandle,
(void *)&msgQueueInstance);
@@ -694,11 +661,14 @@
&msgQueueInstance->queueName, sizeof (SaNameT));
+ iov.iov_base = &req_lib_msg_queueclose;
+ iov.iov_len = sizeof (struct req_lib_msg_queueclose);
+
pthread_mutex_lock (msgQueueInstance->response_mutex);
- error = saSendReceiveReply (msgQueueInstance->response_fd,
- &req_lib_msg_queueclose,
- sizeof (struct req_lib_msg_queueclose),
+ error = openais_msg_send_reply_receive (msgQueueInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_msg_queueclose,
sizeof (struct res_lib_msg_queueclose));
@@ -726,6 +696,7 @@
struct req_lib_msg_queuestatusget req_lib_msg_queuestatusget;
struct res_lib_msg_queuestatusget res_lib_msg_queuestatusget;
SaAisErrorT error;
+ struct iovec iov;
if (queueName == NULL) {
return (SA_AIS_ERR_INVALID_PARAM);
@@ -739,11 +710,14 @@
req_lib_msg_queuestatusget.header.id = MESSAGE_REQ_MSG_QUEUESTATUSGET;
memcpy (&req_lib_msg_queuestatusget.queueName, queueName, sizeof (SaNameT));
+ iov.iov_base = &req_lib_msg_queuestatusget;
+ iov.iov_len = sizeof (struct req_lib_msg_queuestatusget);
+
pthread_mutex_lock (&msgInstance->response_mutex);
- error = saSendReceiveReply (msgInstance->response_fd,
- &req_lib_msg_queuestatusget,
- sizeof (struct req_lib_msg_queuestatusget),
+ error = openais_msg_send_reply_receive (msgInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_msg_queuestatusget,
sizeof (struct res_lib_msg_queuestatusget));
@@ -770,6 +744,7 @@
struct msgInstance *msgInstance;
struct req_lib_msg_queueunlink req_lib_msg_queueunlink;
struct res_lib_msg_queueunlink res_lib_msg_queueunlink;
+ struct iovec iov;
if (queueName == NULL) {
return (SA_AIS_ERR_INVALID_PARAM);
@@ -783,11 +758,14 @@
req_lib_msg_queueunlink.header.id = MESSAGE_REQ_MSG_QUEUEUNLINK;
memcpy (&req_lib_msg_queueunlink.queueName, queueName, sizeof (SaNameT));
+ iov.iov_base = &req_lib_msg_queueunlink;
+ iov.iov_len = sizeof (struct res_lib_msg_queueunlink);
+
pthread_mutex_lock (&msgInstance->response_mutex);
- error = saSendReceiveReply (msgInstance->response_fd,
- &req_lib_msg_queueunlink,
- sizeof (struct req_lib_msg_queueunlink),
+ error = openais_msg_send_reply_receive (msgInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_msg_queueunlink,
sizeof (struct res_lib_msg_queueunlink));
@@ -808,6 +786,7 @@
struct msgInstance *msgInstance;
struct req_lib_msg_queuegroupcreate req_lib_msg_queuegroupcreate;
struct res_lib_msg_queuegroupcreate res_lib_msg_queuegroupcreate;
+ struct iovec iov;
if (queueGroupName == NULL) {
return (SA_AIS_ERR_INVALID_PARAM);
@@ -823,11 +802,14 @@
sizeof (SaNameT));
req_lib_msg_queuegroupcreate.queueGroupPolicy = queueGroupPolicy;
+ iov.iov_base = &req_lib_msg_queuegroupcreate;
+ iov.iov_len = sizeof (struct req_lib_msg_queuegroupcreate);
+
pthread_mutex_lock (&msgInstance->response_mutex);
- error = saSendReceiveReply (msgInstance->response_fd,
- &req_lib_msg_queuegroupcreate,
- sizeof (struct req_lib_msg_queuegroupcreate),
+ error = openais_msg_send_reply_receive (msgInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_msg_queuegroupcreate,
sizeof (struct res_lib_msg_queuegroupcreate));
@@ -848,6 +830,7 @@
struct msgInstance *msgInstance;
struct req_lib_msg_queuegroupinsert req_lib_msg_queuegroupinsert;
struct res_lib_msg_queuegroupinsert res_lib_msg_queuegroupinsert;
+ struct iovec iov;
if (queueName == NULL) {
return (SA_AIS_ERR_INVALID_PARAM);
@@ -863,11 +846,14 @@
memcpy (&req_lib_msg_queuegroupinsert.queueGroupName, queueGroupName,
sizeof (SaNameT));
+ iov.iov_base = &req_lib_msg_queuegroupinsert;
+ iov.iov_len = sizeof (struct req_lib_msg_queuegroupinsert);
+
pthread_mutex_lock (&msgInstance->response_mutex);
- error = saSendReceiveReply (msgInstance->response_fd,
- &req_lib_msg_queuegroupinsert,
- sizeof (struct req_lib_msg_queuegroupinsert),
+ error = openais_msg_send_reply_receive (msgInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_msg_queuegroupinsert,
sizeof (struct res_lib_msg_queuegroupinsert));
@@ -888,6 +874,7 @@
struct msgInstance *msgInstance;
struct req_lib_msg_queuegroupremove req_lib_msg_queuegroupremove;
struct res_lib_msg_queuegroupremove res_lib_msg_queuegroupremove;
+ struct iovec iov;
if (queueName == NULL) {
return (SA_AIS_ERR_INVALID_PARAM);
@@ -903,11 +890,14 @@
memcpy (&req_lib_msg_queuegroupremove.queueGroupName, queueGroupName,
sizeof (SaNameT));
+ iov.iov_base = &req_lib_msg_queuegroupremove;
+ iov.iov_len = sizeof (struct req_lib_msg_queuegroupremove);
+
pthread_mutex_lock (&msgInstance->response_mutex);
- error = saSendReceiveReply (msgInstance->response_fd,
- &req_lib_msg_queuegroupremove,
- sizeof (struct req_lib_msg_queuegroupremove),
+ error = openais_msg_send_reply_receive (msgInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_msg_queuegroupremove,
sizeof (struct res_lib_msg_queuegroupremove));
@@ -927,6 +917,7 @@
struct msgInstance *msgInstance;
struct req_lib_msg_queuegroupdelete req_lib_msg_queuegroupdelete;
struct res_lib_msg_queuegroupdelete res_lib_msg_queuegroupdelete;
+ struct iovec iov;
if (queueGroupName == NULL) {
return (SA_AIS_ERR_INVALID_PARAM);
@@ -941,11 +932,14 @@
memcpy (&req_lib_msg_queuegroupdelete.queueGroupName, queueGroupName,
sizeof (SaNameT));
+ iov.iov_base = &req_lib_msg_queuegroupdelete;
+ iov.iov_len = sizeof (struct req_lib_msg_queuegroupdelete);
+
pthread_mutex_lock (&msgInstance->response_mutex);
- error = saSendReceiveReply (msgInstance->response_fd,
- &req_lib_msg_queuegroupdelete,
- sizeof (struct req_lib_msg_queuegroupdelete),
+ error = openais_msg_send_reply_receive (msgInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_msg_queuegroupdelete,
sizeof (struct res_lib_msg_queuegroupdelete));
@@ -967,6 +961,7 @@
struct msgInstance *msgInstance;
struct req_lib_msg_queuegrouptrack req_lib_msg_queuegrouptrack;
struct res_lib_msg_queuegrouptrack res_lib_msg_queuegrouptrack;
+ struct iovec iov;
if (queueGroupName == NULL) {
return (SA_AIS_ERR_INVALID_PARAM);
@@ -982,11 +977,14 @@
memcpy (&req_lib_msg_queuegrouptrack.queueGroupName, queueGroupName,
sizeof (SaNameT));
+ iov.iov_base = &req_lib_msg_queuegrouptrack;
+ iov.iov_len = sizeof (struct req_lib_msg_queuegrouptrack);
+
pthread_mutex_lock (&msgInstance->response_mutex);
- error = saSendReceiveReply (msgInstance->response_fd,
- &req_lib_msg_queuegrouptrack,
- sizeof (struct req_lib_msg_queuegrouptrack),
+ error = openais_msg_send_reply_receive (msgInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_msg_queuegrouptrack,
sizeof (struct res_lib_msg_queuegrouptrack));
@@ -1006,6 +1004,7 @@
struct msgInstance *msgInstance;
struct req_lib_msg_queuegrouptrackstop req_lib_msg_queuegrouptrackstop;
struct res_lib_msg_queuegrouptrackstop res_lib_msg_queuegrouptrackstop;
+ struct iovec iov;
if (queueGroupName == NULL) {
return (SA_AIS_ERR_INVALID_PARAM);
@@ -1020,11 +1019,14 @@
memcpy (&req_lib_msg_queuegrouptrackstop.queueGroupName, queueGroupName,
sizeof (SaNameT));
+ iov.iov_base = &req_lib_msg_queuegrouptrackstop;
+ iov.iov_len = sizeof (struct req_lib_msg_queuegrouptrackstop);
+
pthread_mutex_lock (&msgInstance->response_mutex);
- error = saSendReceiveReply (msgInstance->response_fd,
- &req_lib_msg_queuegrouptrackstop,
- sizeof (struct req_lib_msg_queuegrouptrackstop),
+ error = openais_msg_send_reply_receive (msgInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_msg_queuegrouptrackstop,
sizeof (struct res_lib_msg_queuegrouptrackstop));
@@ -1046,6 +1048,7 @@
struct msgInstance *msgInstance;
struct req_lib_msg_messagesend req_lib_msg_messagesend;
struct res_lib_msg_messagesend res_lib_msg_messagesend;
+ struct iovec iov;
error = saHandleInstanceGet (&msgHandleDatabase, msgHandle, (void *)&msgInstance);
if (error != SA_AIS_OK) {
@@ -1056,11 +1059,14 @@
req_lib_msg_messagesend.header.id = MESSAGE_REQ_MSG_MESSAGESEND;
memcpy (&req_lib_msg_messagesend.destination, destination, sizeof (SaNameT));
+ iov.iov_base = &req_lib_msg_messagesend;
+ iov.iov_len = sizeof (req_lib_msg_messagesend);
+
pthread_mutex_lock (&msgInstance->response_mutex);
- error = saSendReceiveReply (msgInstance->response_fd,
- &req_lib_msg_messagesend,
- sizeof (struct req_lib_msg_messagesend),
+ error = openais_msg_send_reply_receive (msgInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_msg_messagesend,
sizeof (struct res_lib_msg_messagesend));
@@ -1083,6 +1089,7 @@
struct msgInstance *msgInstance;
struct req_lib_msg_messagesend req_lib_msg_messagesend;
struct res_lib_msg_messagesendasync res_lib_msg_messagesendasync;
+ struct iovec iov;
error = saHandleInstanceGet (&msgHandleDatabase, msgHandle, (void *)&msgInstance);
if (error != SA_AIS_OK) {
@@ -1093,11 +1100,14 @@
req_lib_msg_messagesend.header.id = MESSAGE_REQ_MSG_MESSAGESEND;
memcpy (&req_lib_msg_messagesend.destination, destination, sizeof (SaNameT));
+ iov.iov_base = &req_lib_msg_messagesend;
+ iov.iov_len = sizeof (struct req_lib_msg_messagesend);
+
pthread_mutex_lock (&msgInstance->response_mutex);
- error = saSendReceiveReply (msgInstance->response_fd,
- &req_lib_msg_messagesend,
- sizeof (struct req_lib_msg_messagesend),
+ error = openais_msg_send_reply_receive (msgInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_msg_messagesendasync,
sizeof (struct res_lib_msg_messagesendasync));
@@ -1120,6 +1130,7 @@
struct msgQueueInstance *msgQueueInstance;
struct req_lib_msg_messageget req_lib_msg_messageget;
struct res_lib_msg_messageget res_lib_msg_messageget;
+ struct iovec iov;
error = saHandleInstanceGet (&queueHandleDatabase, queueHandle, (void \
*)&msgQueueInstance); if (error != SA_AIS_OK) {
@@ -1130,11 +1141,14 @@
req_lib_msg_messageget.header.id = MESSAGE_REQ_MSG_MESSAGEGET;
req_lib_msg_messageget.timeout = timeout;
+ iov.iov_base = &req_lib_msg_messageget;
+ iov.iov_len = sizeof (struct req_lib_msg_messageget);
+
pthread_mutex_lock (msgQueueInstance->response_mutex);
- error = saSendReceiveReply (msgQueueInstance->response_fd,
- &req_lib_msg_messageget,
- sizeof (struct req_lib_msg_messageget),
+ error = openais_msg_send_reply_receive (msgQueueInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_msg_messageget,
sizeof (struct res_lib_msg_messageget));
@@ -1160,6 +1174,7 @@
struct msgQueueInstance *msgQueueInstance;
struct req_lib_msg_messagecancel req_lib_msg_messagecancel;
struct res_lib_msg_messagecancel res_lib_msg_messagecancel;
+ struct iovec iov;
error = saHandleInstanceGet (&msgHandleDatabase, queueHandle, (void \
*)&msgQueueInstance); if (error != SA_AIS_OK) {
@@ -1169,11 +1184,14 @@
req_lib_msg_messagecancel.header.size = sizeof (struct req_lib_msg_messagecancel);
req_lib_msg_messagecancel.header.id = MESSAGE_REQ_MSG_MESSAGECANCEL;
+ iov.iov_base = &req_lib_msg_messagecancel;
+ iov.iov_len = sizeof (struct req_lib_msg_messagecancel);
+
pthread_mutex_lock (msgQueueInstance->response_mutex);
- error = saSendReceiveReply (msgQueueInstance->response_fd,
- &req_lib_msg_messagecancel,
- sizeof (struct req_lib_msg_messagecancel),
+ error = openais_msg_send_reply_receive (msgQueueInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_msg_messagecancel,
sizeof (struct res_lib_msg_messagecancel));
@@ -1197,6 +1215,7 @@
struct msgInstance *msgInstance;
struct req_lib_msg_messagesendreceive req_lib_msg_messagesendreceive;
struct res_lib_msg_messagesendreceive res_lib_msg_messagesendreceive;
+ struct iovec iov;
error = saHandleInstanceGet (&msgHandleDatabase, msgHandle, (void *)&msgInstance);
if (error != SA_AIS_OK) {
@@ -1209,11 +1228,14 @@
sizeof (SaNameT));
req_lib_msg_messagesendreceive.timeout = timeout;
+ iov.iov_base = &req_lib_msg_messagesendreceive;
+ iov.iov_len = sizeof (struct req_lib_msg_messagesendreceive);
+
pthread_mutex_lock (&msgInstance->response_mutex);
- error = saSendReceiveReply (msgInstance->response_fd,
- &req_lib_msg_messagesendreceive,
- sizeof (struct req_lib_msg_messagesendreceive),
+ error = openais_msg_send_reply_receive (msgInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_msg_messagesendreceive,
sizeof (struct res_lib_msg_messagesendreceive));
@@ -1241,6 +1263,7 @@
struct msgInstance *msgInstance;
struct req_lib_msg_messagereply req_lib_msg_messagereply;
struct res_lib_msg_messagereply res_lib_msg_messagereply;
+ struct iovec iov;
error = saHandleInstanceGet (&msgHandleDatabase, msgHandle, (void *)&msgInstance);
if (error != SA_AIS_OK) {
@@ -1251,11 +1274,14 @@
req_lib_msg_messagereply.header.id = MESSAGE_REQ_MSG_MESSAGEREPLY;
memcpy (&req_lib_msg_messagereply.senderId, senderId, sizeof (SaMsgSenderIdT));
+ iov.iov_base = &req_lib_msg_messagereply;
+ iov.iov_len = sizeof (struct req_lib_msg_messagereply);
+
pthread_mutex_lock (&msgInstance->response_mutex);
- error = saSendReceiveReply (msgInstance->response_fd,
- &req_lib_msg_messagereply,
- sizeof (struct req_lib_msg_messagereply),
+ error = openais_msg_send_reply_receive (msgInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_msg_messagereply,
sizeof (struct res_lib_msg_messagereply));
@@ -1277,6 +1303,7 @@
struct msgInstance *msgInstance;
struct req_lib_msg_messagereply req_lib_msg_messagereply;
struct res_lib_msg_messagereplyasync res_lib_msg_messagereplyasync;
+ struct iovec iov;
error = saHandleInstanceGet (&msgHandleDatabase, msgHandle, (void *)&msgInstance);
if (error != SA_AIS_OK) {
@@ -1287,11 +1314,14 @@
req_lib_msg_messagereply.header.id = MESSAGE_REQ_MSG_MESSAGEREPLY;
memcpy (&req_lib_msg_messagereply.senderId, senderId, sizeof (SaMsgSenderIdT));
+ iov.iov_base = &req_lib_msg_messagereply;
+ iov.iov_len = sizeof (struct req_lib_msg_messagereply);
+
pthread_mutex_lock (&msgInstance->response_mutex);
- error = saSendReceiveReply (msgInstance->response_fd,
- &req_lib_msg_messagereply,
- sizeof (struct req_lib_msg_messagereply),
+ error = openais_msg_send_reply_receive (msgInstance->ipc_ctx,
+ &iov,
+ 1,
&res_lib_msg_messagereplyasync,
sizeof (struct res_lib_msg_messagereplyasync));
Index: lib/evs.c
===================================================================
--- lib/evs.c (revision 1687)
+++ lib/evs.c (working copy)
@@ -45,6 +45,7 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <errno.h>
+#include <stdio.h>
#include "../exec/totem.h"
#include "../include/saAis.h"
@@ -53,8 +54,7 @@
#include "util.h"
struct evs_inst {
- int response_fd;
- int dispatch_fd;
+ void *ipc_segment;
int finalize;
evs_callbacks_t callbacks;
pthread_mutex_t response_mutex;
@@ -116,10 +116,8 @@
goto error_destroy;
}
- error = saServiceConnect (&evs_inst->response_fd,
- &evs_inst->dispatch_fd,
- EVS_SERVICE);
- if (error != SA_AIS_OK) {
+ error = openais_service_connect (EVS_SERVICE, &evs_inst->ipc_segment);
+ if (error != EVS_OK) {
goto error_put_destroy;
}
@@ -151,7 +149,6 @@
if (error != SA_AIS_OK) {
return (error);
}
-// TODO is the locking right here
pthread_mutex_lock (&evs_inst->response_mutex);
/*
@@ -165,23 +162,14 @@
evs_inst->finalize = 1;
+ openais_service_disconnect (evs_inst->ipc_segment);
+
pthread_mutex_unlock (&evs_inst->response_mutex);
saHandleDestroy (&evs_handle_t_db, handle);
- /*
- * Disconnect from the server
- */
- if (evs_inst->response_fd != -1) {
- shutdown(evs_inst->response_fd, 0);
- close(evs_inst->response_fd);
- }
- if (evs_inst->dispatch_fd != -1) {
- shutdown(evs_inst->dispatch_fd, 0);
- close(evs_inst->dispatch_fd);
- }
+
saHandleInstancePut (&evs_handle_t_db, handle);
-
return (EVS_OK);
}
@@ -197,7 +185,7 @@
return (error);
}
- *fd = evs_inst->dispatch_fd;
+ *fd = openais_fd_get (evs_inst->ipc_segment);
saHandleInstancePut (&evs_handle_t_db, handle);
@@ -208,7 +196,6 @@
evs_handle_t handle,
evs_dispatch_t dispatch_types)
{
- struct pollfd ufds;
int timeout = -1;
SaAisErrorT error;
int cont = 1; /* always continue do loop except when set to 0 */
@@ -234,26 +221,16 @@
}
do {
- ufds.fd = evs_inst->dispatch_fd;
- ufds.events = POLLIN;
- ufds.revents = 0;
-
- error = saPollRetry (&ufds, 1, timeout);
- if (error != SA_AIS_OK) {
+ dispatch_avail = openais_dispatch_recv (evs_inst->ipc_segment, (void \
*)&dispatch_data, timeout); + if (dispatch_avail == -1) {
+ error = SA_AIS_ERR_LIBRARY;
goto error_nounlock;
}
+
pthread_mutex_lock (&evs_inst->dispatch_mutex);
/*
- * Regather poll data in case ufds has changed since taking lock
- */
- error = saPollRetry (&ufds, 1, 0);
- if (error != SA_AIS_OK) {
- goto error_nounlock;
- }
-
- /*
* Handle has been finalized in another thread
*/
if (evs_inst->finalize == 1) {
@@ -262,38 +239,15 @@
goto error_unlock;
}
- dispatch_avail = ufds.revents & POLLIN;
if (dispatch_avail == 0 && dispatch_types == EVS_DISPATCH_ALL) {
pthread_mutex_unlock (&evs_inst->dispatch_mutex);
break; /* exit do while cont is 1 loop */
} else
if (dispatch_avail == 0) {
pthread_mutex_unlock (&evs_inst->dispatch_mutex);
- continue; /* next poll */
+ continue; /* next dispatch event */
}
- if (ufds.revents & POLLIN) {
- /*
- * Queue empty, read response from socket
- */
- error = saRecvRetry (evs_inst->dispatch_fd, &dispatch_data.header,
- sizeof (mar_res_header_t));
- if (error != SA_AIS_OK) {
- goto error_unlock;
- }
- if (dispatch_data.header.size > sizeof (mar_res_header_t)) {
- error = saRecvRetry (evs_inst->dispatch_fd, &dispatch_data.data,
- dispatch_data.header.size - sizeof (mar_res_header_t));
-
- if (error != SA_AIS_OK) {
- goto error_unlock;
- }
- }
- } else {
- pthread_mutex_unlock (&evs_inst->dispatch_mutex);
- continue;
- }
-
/*
* Make copy of callbacks, message data, unlock instance, and call callback
* A risk of this dispatch method is that the callback routines may
@@ -370,7 +324,7 @@
struct res_lib_evs_join res_lib_evs_join;
error = saHandleInstanceGet (&evs_handle_t_db, handle, (void *)&evs_inst);
- if (error != SA_AIS_OK) {
+ if (error != EVS_OK) {
return (error);
}
@@ -386,7 +340,7 @@
pthread_mutex_lock (&evs_inst->response_mutex);
- error = saSendMsgReceiveReply (evs_inst->response_fd, iov, 2,
+ error = openais_msg_send_reply_receive (evs_inst->ipc_segment, iov, 2,
&res_lib_evs_join, sizeof (struct res_lib_evs_join));
pthread_mutex_unlock (&evs_inst->response_mutex);
@@ -431,7 +385,7 @@
pthread_mutex_lock (&evs_inst->response_mutex);
- error = saSendMsgReceiveReply (evs_inst->response_fd, iov, 2,
+ error = openais_msg_send_reply_receive (evs_inst->ipc_segment, iov, 2,
&res_lib_evs_leave, sizeof (struct res_lib_evs_leave));
pthread_mutex_unlock (&evs_inst->response_mutex);
@@ -484,8 +438,10 @@
pthread_mutex_lock (&evs_inst->response_mutex);
- error = saSendMsgReceiveReply (evs_inst->response_fd, iov, iov_len + 1,
- &res_lib_evs_mcast_joined, sizeof (struct res_lib_evs_mcast_joined));
+ error = openais_msg_send_reply_receive (evs_inst->ipc_segment, iov,
+ iov_len + 1,
+ &res_lib_evs_mcast_joined,
+ sizeof (struct res_lib_evs_mcast_joined));
pthread_mutex_unlock (&evs_inst->response_mutex);
@@ -539,8 +495,10 @@
pthread_mutex_lock (&evs_inst->response_mutex);
- error = saSendMsgReceiveReply (evs_inst->response_fd, iov, iov_len + 2,
- &res_lib_evs_mcast_groups, sizeof (struct res_lib_evs_mcast_groups));
+ error = openais_msg_send_reply_receive (evs_inst->ipc_segment, iov,
+ iov_len + 2,
+ &res_lib_evs_mcast_groups,
+ sizeof (struct res_lib_evs_mcast_groups));
pthread_mutex_unlock (&evs_inst->response_mutex);
if (error != SA_AIS_OK) {
@@ -580,8 +538,11 @@
pthread_mutex_lock (&evs_inst->response_mutex);
- error = saSendMsgReceiveReply (evs_inst->response_fd, &iov, 1,
- &res_lib_evs_membership_get, sizeof (struct res_lib_evs_membership_get));
+ error = openais_msg_send_reply_receive (evs_inst->ipc_segment,
+ &iov,
+ 1,
+ &res_lib_evs_membership_get,
+ sizeof (struct res_lib_evs_membership_get));
pthread_mutex_unlock (&evs_inst->response_mutex);
_______________________________________________
Openais mailing list
Openais@lists.linux-foundation.org
https://lists.linux-foundation.org/mailman/listinfo/openais
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic