[prev in list] [next in list] [prev in thread] [next in thread]
List: openais
Subject: [Openais] Event service unlink code
From: Mark Haverkamp <markh () osdl ! org>
Date: 2005-02-11 17:19:05
Message-ID: 1108142345.10361.14.camel () markh1 ! pdx ! osdl ! net
[Download RAW message or body]
Here is a patch for the event service saEvtChannelUnlink() code. This
addresses just the unlink itself, not any partition merge issues. That
will come later. I have also updated testevt to include functional
tests of the API. It passes the tests and I have been running my
subscribe/publish tests overnight. This is the last unimplemented B
spec feature.
One of these days I'll write up some documentation on the event test
programs and check it in.
Mark.
--
Mark Haverkamp <markh@osdl.org>
["evt_unlink.patch" (evt_unlink.patch)]
===== exec/evt.c 1.17 vs edited =====
--- 1.17/exec/evt.c 2005-02-08 09:16:51 -08:00
+++ edited/exec/evt.c 2005-02-10 15:56:36 -08:00
@@ -35,6 +35,7 @@
#define RECOVERY_DEBUG LOG_LEVEL_DEBUG
#define CHAN_DEL_DEBUG LOG_LEVEL_DEBUG
#define CHAN_OPEN_DEBUG LOG_LEVEL_DEBUG
+#define CHAN_UNLINK_DEBUG LOG_LEVEL_DEBUG
#define REMOTE_OP_DEBUG LOG_LEVEL_DEBUG
#include <sys/types.h>
@@ -68,6 +69,7 @@
static int lib_evt_open_channel_async(struct conn_info *conn_info,
void *message);
static int lib_evt_close_channel(struct conn_info *conn_info, void *message);
+static int lib_evt_unlink_channel(struct conn_info *conn_info, void *message);
static int lib_evt_event_subscribe(struct conn_info *conn_info,
void *message);
static int lib_evt_event_unsubscribe(struct conn_info *conn_info,
@@ -113,6 +115,11 @@
.response_id = MESSAGE_RES_EVT_CLOSE_CHANNEL,
},
{
+ .libais_handler_fn = lib_evt_unlink_channel,
+ .response_size = sizeof(struct res_evt_channel_unlink),
+ .response_id = MESSAGE_RES_EVT_UNLINK_CHANNEL,
+ },
+ {
.libais_handler_fn = lib_evt_event_subscribe,
.response_size = sizeof(struct res_evt_event_subscribe),
.response_id = MESSAGE_RES_EVT_SUBSCRIBE,
@@ -168,7 +175,7 @@
.exec_dump_fn = 0
};
-// TODOstatic totempg_recovery_plug_handle evt_recovery_plug_handle;
+// TODO static totempg_recovery_plug_handle evt_recovery_plug_handle;
/*
* list of all retained events
@@ -182,12 +189,49 @@
*/
static DECLARE_LIST_INIT(esc_head);
+/*
+ * list of all unlinked event channel information
+ * struct event_svr_channel_instance
+ */
+static DECLARE_LIST_INIT(esc_unlinked_head);
+
/*
* list of all active event conn_info structs.
* struct conn_info
*/
static DECLARE_LIST_INIT(ci_head);
+
+/*
+ * Global variables used by the event service
+ *
+ * base_id_top: upper bits of next event ID to assign
+ * base_id: Lower bits of Next event ID to assign
+ * my_node_id: My cluster node id
+ * in_cfg_change: Config change occurred. Figure out who sends retained evts.
+ * cleared when retained events have been delivered.
+ * total_members: how many members in this cluster
+ * checked_in: keep track during config change.
+ * any_joined: did any nodes join on this change?
+ * recovery_node: True if we're the recovery node.
+ * tok_call_handle: totempg token callback handle for recovery.
+ * next_retained: pointer to next retained message to send during recovery.
+ * next_chan: pointer to next channel to send during recovery.
+ *
+ */
+#define BASE_ID_MASK 0xffffffffLL
+static SaEvtEventIdT base_id = 0;
+static SaEvtEventIdT base_id_top = 0;
+static SaClmNodeIdT my_node_id = 0;
+static int in_cfg_change = 0;
+static int total_members = 0;
+static int checked_in = 0;
+static int any_joined = 0;
+static int recovery_node = 0;
+static void *tok_call_handle = 0;
+static struct list_head *next_retained = 0;
+static struct list_head *next_chan = 0;
+
/*
* Structure to track pending channel open requests.
* ocp_async: 1 for async open
@@ -208,13 +252,43 @@
uint32_t ocp_c_handle;
struct list_head ocp_entry;
};
-
+
/*
* list of pending channel opens
*/
static DECLARE_LIST_INIT(open_pending);
static void chan_open_timeout(void *data);
+/*
+ * Structure to track pending channel unlink requests.
+ *
+ * One entry is allocated by lib_evt_unlink_channel() for each library
+ * unlink request and queued on the unlink_pending list. It is removed
+ * from the list and freed by evt_chan_unlink_finish() after the response
+ * has been sent to the library.
+ *
+ * ucp_unlink_id: unlink ID generated for this request; matched against
+ *                the unlink ID carried in the EVT_UNLINK_CHAN_OP message.
+ * ucp_conn_info: conn_info for returning to the library.
+ * ucp_entry: list entry for pending unlink list.
+ */
+struct unlink_chan_pending {
+ uint64_t ucp_unlink_id;
+ struct conn_info *ucp_conn_info;
+ struct list_head ucp_entry;
+};
+
+/*
+ * list of pending unlink requests
+ */
+static DECLARE_LIST_INIT(unlink_pending);
+
+/*
+ * Next unlink ID
+ */
+static uint64_t base_unlink_id = 0;
+
+/*
+ * Build the next channel unlink ID.
+ *
+ * The upper 32 bits carry my_node_id and the lower 32 bits carry the
+ * local counter base_unlink_id, which is advanced and wrapped with
+ * BASE_ID_MASK on every call.
+ *
+ * The function is "static inline" rather than plain "inline": under C99
+ * inline semantics a plain inline definition does not provide an
+ * external definition, so the link can fail whenever the compiler
+ * chooses not to inline the call.
+ *
+ * NOTE(review): the very first ID is 0 (== EVT_CHAN_ACTIVE) if my_node_id
+ * is still 0 when this is called -- confirm my_node_id is always set
+ * before the first unlink request.
+ */
+static inline uint64_t
+next_chan_unlink_id(void)
+{
+	uint64_t uid = my_node_id;
+
+	uid = (uid << 32ULL) | base_unlink_id;
+	base_unlink_id = (base_unlink_id + 1ULL) & BASE_ID_MASK;
+	return uid;
+}
+
#define min(a,b) ((a) < (b) ? (a) : (b))
/*
@@ -274,6 +348,10 @@
* esc_open_chans: list of opens of this channel.
* (event_svr_channel_open.eco_entry)
* esc_entry: links to other channels. (used by esc_head)
+ * esc_unlink_id: If non-zero, then the channel has been marked
+ * for unlink. This unlink ID is used to
+ * mark events still associated with current openings
+ * so they get delivered to the proper recipients.
*/
struct event_svr_channel_instance {
SaNameT esc_channel_name;
@@ -284,6 +362,7 @@
uint32_t esc_retained_count;
struct list_head esc_open_chans;
struct list_head esc_entry;
+ uint64_t esc_unlink_id;
};
/*
@@ -390,37 +469,6 @@
struct list_head mn_entry;
};
DECLARE_LIST_INIT(mnd);
-
-/*
- * Global varaibles used by the event service
- *
- * base_id_top: upper bits of next event ID to assign
- * base_id: Lower bits of Next event ID to assign
- * my_node_id: My cluster node id
- * in_cfg_change: Config change occurred. Figure out who sends retained evts.
- * cleared when retained events have been delivered.
- * total_members: how many members in this cluster
- * checked_in: keep track during config change.
- * any_joined: did any nodes join on this change?
- * recovery_node: True if we're the recovery node.
- * tok_call_handle: totempg token callback handle for recovery.
- * next_retained: pointer to next retained message to send during recovery.
- * next_chan: pointer to next channel to send during recovery.
- *
- */
-#define BASE_ID_MASK 0xffffffffLL
-static SaEvtEventIdT base_id = 0;
-static SaEvtEventIdT base_id_top = 0;
-static SaClmNodeIdT my_node_id = 0;
-static int in_cfg_change = 0;
-static int total_members = 0;
-static int checked_in = 0;
-static int any_joined = 0;
-static int recovery_node = 0;
-static void *tok_call_handle = 0;
-static struct list_head *next_retained = 0;
-static struct list_head *next_chan = 0;
-
/*
* Take the filters we received from the application via the library and
* make them into a real SaEvtEventFilterArrayT
@@ -495,16 +543,27 @@
* Look up a channel in the global channel list
*/
static struct event_svr_channel_instance *
-find_channel(SaNameT *chan_name)
+find_channel(SaNameT *chan_name, uint64_t unlink_id)
{
- struct list_head *l;
+ struct list_head *l, *head;
struct event_svr_channel_instance *eci;
- for (l = esc_head.next; l != &esc_head; l = l->next) {
+ /*
+ * choose which list to look through
+ */
+ if (unlink_id == EVT_CHAN_ACTIVE) {
+ head = &esc_head;
+ } else {
+ head = &esc_unlinked_head;
+ }
+
+ for (l = head->next; l != head; l = l->next) {
eci = list_entry(l, struct event_svr_channel_instance, esc_entry);
if (!name_match(chan_name, &eci->esc_channel_name)) {
continue;
+ } else if (unlink_id != eci->esc_unlink_id) {
+ continue;
}
return eci;
}
@@ -512,6 +571,29 @@
}
/*
+ * Find the last unlinked version of a channel.
+ */
+static struct event_svr_channel_instance *
+find_last_unlinked_channel(SaNameT *chan_name)
+{
+	struct list_head *l;
+	struct event_svr_channel_instance *eci;
+
+	/*
+	 * Returns the most recently unlinked channel whose name matches
+	 * chan_name, or 0 if no unlinked channel has that name.
+	 *
+	 * unlinked channels are added to the head of the list
+	 * so the first one we see is the last one added.
+	 */
+	for (l = esc_unlinked_head.next; l != &esc_unlinked_head; l = l->next) {
+		eci = list_entry(l, struct event_svr_channel_instance, esc_entry);
+		if (name_match(chan_name, &eci->esc_channel_name)) {
+			/*
+			 * The first match is the answer.  Without this return
+			 * the lookup could never succeed.
+			 */
+			return eci;
+		}
+	}
+	return 0;
+}
+
+/*
* Create and initialize a channel instance structure
*/
static struct event_svr_channel_instance *create_channel(SaNameT *cn)
@@ -731,12 +813,15 @@
eci->esc_retained_count);
/*
* If no one has the channel open anywhere and there are no unexpired
- * retained events for this channel, then it is OK to delete the
- * data structure.
+ * retained events for this channel, and it has been marked for deletion
+ * by an unlink, then it is OK to delete the data structure.
*/
- if ((eci->esc_retained_count == 0) && (eci->esc_total_opens == 0)) {
+ if ((eci->esc_retained_count == 0) && (eci->esc_total_opens == 0) &&
+ (eci->esc_unlink_id != EVT_CHAN_ACTIVE)) {
log_printf(CHAN_DEL_DEBUG, "Delete channel %s\n",
eci->esc_channel_name.value);
+ log_printf(CHAN_UNLINK_DEBUG, "Delete channel %s, unlink_id %0llx\n",
+ eci->esc_channel_name.value, eci->esc_unlink_id);
if (!list_empty(&eci->esc_open_chans)) {
log_printf(LOG_LEVEL_NOTICE,
@@ -762,6 +847,47 @@
}
/*
+ * Mark a channel for deletion.
+ */
+static void unlink_channel(struct event_svr_channel_instance *eci,
+		uint64_t unlink_id)
+{
+	struct list_head *pos;
+	struct event_data *evt;
+
+	log_printf(CHAN_UNLINK_DEBUG, "Unlink request: %s, id 0x%llx\n",
+			eci->esc_channel_name.value, unlink_id);
+
+	/*
+	 * Stamp the channel with the unlink ID.  A non-active ID marks the
+	 * channel for deletion and tells apart several unlinked channels
+	 * that share the same name.
+	 */
+	eci->esc_unlink_id = unlink_id;
+
+	/*
+	 * Re-home the channel on the unlinked list so that code walking the
+	 * active list (e.g. recovery) never has to filter it out.
+	 */
+	list_del(&eci->esc_entry);
+	list_add(&eci->esc_entry, &esc_unlinked_head);
+
+	/*
+	 * Walk the retained events; every event that belongs to this channel
+	 * and is still tagged as active inherits the unlink ID so that it
+	 * keeps being routed to this (now unlinked) channel.
+	 */
+	pos = retained_list.next;
+	while (pos != &retained_list) {
+		evt = list_entry(pos, struct event_data, ed_retained);
+		pos = pos->next;
+
+		if (evt->ed_my_chan != eci) {
+			continue;
+		}
+		if (evt->ed_event.led_chan_unlink_id != EVT_CHAN_ACTIVE) {
+			continue;
+		}
+		evt->ed_event.led_chan_unlink_id = unlink_id;
+	}
+
+	/*
+	 * Let delete_channel() decide whether the channel can go away now.
+	 */
+	delete_channel(eci);
+}
+
+/*
* Remove the specified node from the node list in this channel.
*/
static int remove_open_count(
@@ -821,7 +947,7 @@
ret = SA_AIS_OK;
- eci = find_channel(cn);
+ eci = find_channel(cn, EVT_CHAN_ACTIVE);
/*
* If the create flag set, and it doesn't exist, we can make the channel.
@@ -860,7 +986,7 @@
/*
* Send a request to close a channel with the rest of the cluster.
*/
-static SaErrorT evt_close_channel(SaNameT *cn)
+static SaErrorT evt_close_channel(SaNameT *cn, uint64_t unlink_id)
{
struct req_evt_chan_command cpkt;
struct iovec chn_iovec;
@@ -877,7 +1003,8 @@
cpkt.chc_head.id = MESSAGE_REQ_EXEC_EVT_CHANCMD;
cpkt.chc_head.size = sizeof(cpkt);
cpkt.chc_op = EVT_CLOSE_CHAN_OP;
- cpkt.u.chc_chan = *cn;
+ cpkt.u.chcu.chcu_name = *cn;
+ cpkt.u.chcu.chcu_unlink_id = unlink_id;
chn_iovec.iov_base = &cpkt;
chn_iovec.iov_len = cpkt.chc_head.size;
res = totempg_mcast (&chn_iovec, 1, TOTEMPG_AGREED);
@@ -1312,7 +1439,7 @@
list_del(&edp->ed_retained);
list_init(&edp->ed_retained);
/*
- * Check to see it the channel isn't in use anymore.
+ * Check to see if the channel isn't in use anymore.
*/
edp->ed_my_chan->esc_retained_count--;
if (edp->ed_my_chan->esc_retained_count == 0) {
@@ -1335,7 +1462,7 @@
int ret;
log_printf(LOG_LEVEL_DEBUG, "Search for Event ID %llx\n", event_id);
- for(l = retained_list.next; l != &retained_list; l = nxt) {
+ for (l = retained_list.next; l != &retained_list; l = nxt) {
nxt = l->next;
edp = list_entry(l, struct event_data, ed_retained);
if (edp->ed_event.led_event_id != event_id) {
@@ -1788,6 +1915,7 @@
*/
evt->led_chan_name.length = swab16(evt->led_chan_name.length);
+ evt->led_chan_unlink_id = swab64(evt->led_chan_unlink_id);
evt->led_event_id = swab64(evt->led_event_id);
evt->led_sub_id = swab32(evt->led_sub_id);
evt->led_publisher_name.length = swab32(evt->led_publisher_name.length);
@@ -2130,7 +2258,7 @@
eco->eco_flags);
/*
- * Unlink the channel open structure.
+ * Disconnect the channel open structure.
*
* Check for subscriptions and deal with them. In this case
* if there are any, we just implicitly unsubscribe.
@@ -2161,7 +2289,8 @@
* of who they have been delivered to.
*/
remove_delivered_channel(eco);
- return evt_close_channel(&eco->eco_channel->esc_channel_name);
+ return evt_close_channel(&eco->eco_channel->esc_channel_name,
+ eco->eco_channel->esc_unlink_id);
}
/*
@@ -2206,6 +2335,72 @@
}
/*
+ * Handler for saEvtChannelUnlink
+ */
+static int lib_evt_unlink_channel(struct conn_info *conn_info, void *message)
+{
+	/*
+	 * conn_info: connection of the requesting library client.
+	 * message:   the request, a struct req_evt_channel_unlink.
+	 *
+	 * On success the unlink is multicast to the cluster and the response
+	 * to the library is deferred until the EVT_UNLINK_CHAN_OP message is
+	 * processed.  On any failure an error response is sent immediately.
+	 * Always returns 0.
+	 */
+	struct req_evt_channel_unlink *req;
+	struct res_evt_channel_unlink res;
+	struct iovec chn_iovec;
+	struct unlink_chan_pending *ucp;
+	struct req_evt_chan_command cpkt;
+	SaAisErrorT error = SA_AIS_ERR_LIBRARY;
+
+	req = message;
+
+	log_printf(CHAN_UNLINK_DEBUG,
+			"saEvtChannelUnlink (Unlink channel request)\n");
+	log_printf(CHAN_UNLINK_DEBUG, "Channel Name %s\n",
+			req->iuc_channel_name.value);
+
+	if (!find_channel(&req->iuc_channel_name, EVT_CHAN_ACTIVE)) {
+		log_printf(CHAN_UNLINK_DEBUG, "Channel Name doesn't exist\n");
+		error = SA_AIS_ERR_NOT_EXIST;
+		goto evt_unlink_err;
+	}
+
+	/*
+	 * Set up the data structure so that the channel op
+	 * mcast handler can complete the unlink command back to the
+	 * requestor.
+	 */
+	ucp = malloc(sizeof(*ucp));
+	if (!ucp) {
+		log_printf(LOG_LEVEL_ERROR,
+				"saEvtChannelUnlink: Memory allocation failure\n");
+		error = SA_AIS_ERR_NO_MEMORY;
+		goto evt_unlink_err;
+	}
+
+	ucp->ucp_unlink_id = next_chan_unlink_id();
+	ucp->ucp_conn_info = conn_info;
+	list_add_tail(&ucp->ucp_entry, &unlink_pending);
+
+	/*
+	 * Put together a mcast packet to notify everyone
+	 * of the channel unlink.
+	 */
+	memset(&cpkt, 0, sizeof(cpkt));
+	cpkt.chc_head.id = MESSAGE_REQ_EXEC_EVT_CHANCMD;
+	cpkt.chc_head.size = sizeof(cpkt);
+	cpkt.chc_op = EVT_UNLINK_CHAN_OP;
+	cpkt.u.chcu.chcu_name = req->iuc_channel_name;
+	cpkt.u.chcu.chcu_unlink_id = ucp->ucp_unlink_id;
+	chn_iovec.iov_base = &cpkt;
+	chn_iovec.iov_len = cpkt.chc_head.size;
+	if (totempg_mcast (&chn_iovec, 1, TOTEMPG_AGREED) == 0) {
+		return 0;
+	}
+
+	/*
+	 * The mcast failed, so no channel op will ever arrive to complete
+	 * this request.  Take the entry back off the pending list and free
+	 * it; otherwise it is leaked and keeps a conn_info pointer for a
+	 * request that is answered with an error just below.
+	 */
+	list_del(&ucp->ucp_entry);
+	free(ucp);
+
+evt_unlink_err:
+	res.iuc_head.size = sizeof(res);
+	res.iuc_head.id = MESSAGE_RES_EVT_UNLINK_CHANNEL;
+	res.iuc_head.error = error;
+	libais_send_response (conn_info, &res, sizeof(res));
+	return 0;
+}
+
+/*
* Subscribe to an event channel.
*
* - First look up the channel to subscribe.
@@ -2449,6 +2644,7 @@
req->led_head.id = MESSAGE_REQ_EXEC_EVT_EVENTDATA;
req->led_chan_name = eci->esc_channel_name;
req->led_event_id = event_id;
+ req->led_chan_unlink_id = eci->esc_unlink_id;
/*
* Distribute the event.
@@ -2867,13 +3063,30 @@
evtpkt->led_in_addr = source_addr;
evtpkt->led_receive_time = clust_time_now();
- eci = find_channel(&evtpkt->led_chan_name);
+ log_printf(CHAN_UNLINK_DEBUG,
+ "evt_remote_evt(0): chan %s, id 0x%llx\n",
+ evtpkt->led_chan_name.value, evtpkt->led_chan_unlink_id);
+ eci = find_channel(&evtpkt->led_chan_name, evtpkt->led_chan_unlink_id);
+ /*
+ * We may have had some events that were already queued when an
+ * unlink happened, if we don't find the channel in the active list
+ * look for the last unlinked channel of the same name. If this channel
+ * is re-opened the messages will still be routed correctly because new
+ * active channel messages will be ordered behind the open.
+ */
+ if (!eci && (evtpkt->led_chan_unlink_id == EVT_CHAN_ACTIVE)) {
+ log_printf(CHAN_UNLINK_DEBUG,
+ "evt_remote_evt(1): chan %s, id 0x%llx\n",
+ evtpkt->led_chan_name.value, evtpkt->led_chan_unlink_id);
+ eci = find_last_unlinked_channel(&evtpkt->led_chan_name);
+ }
/*
- * We shouldn't see an event for a channel that we don't know about.
+ * We shouldn't normally see an event for a channel that we
+ * don't know about.
*/
if (!eci) {
- log_printf(LOG_LEVEL_WARNING, "Channel %s doesn't exist\n",
+ log_printf(LOG_LEVEL_DEBUG, "Channel %s doesn't exist\n",
evtpkt->led_chan_name.value);
return 0;
}
@@ -3018,13 +3231,17 @@
md->mn_node_info.nodeId,
md->mn_node_info.nodeName.value);
- eci = find_channel(&evtpkt->led_chan_name);
+ log_printf(CHAN_UNLINK_DEBUG,
+ "evt_recovery_event: chan %s, id 0x%llx\n",
+ evtpkt->led_chan_name.value, evtpkt->led_chan_unlink_id);
+ eci = find_channel(&evtpkt->led_chan_name, evtpkt->led_chan_unlink_id);
/*
- * We shouldn't see an event for a channel that we don't know about.
+ * We shouldn't normally see an event for a channel that we don't
+ * know about.
*/
if (!eci) {
- log_printf(LOG_LEVEL_WARNING, "Channel %s doesn't exist\n",
+ log_printf(LOG_LEVEL_DEBUG, "Channel %s doesn't exist\n",
evtpkt->led_chan_name.value);
return 0;
}
@@ -3149,6 +3366,26 @@
}
/*
+ * Called by the channel unlink exec handler to
+ * respond to the application.
+ */
+static void evt_chan_unlink_finish(struct unlink_chan_pending *ucp)
+{
+	struct res_evt_channel_unlink reply;
+
+	log_printf(CHAN_UNLINK_DEBUG, "Unlink channel finish ID 0x%llx\n",
+			ucp->ucp_unlink_id);
+
+	/*
+	 * Report success to the library connection that issued the unlink.
+	 */
+	reply.iuc_head.error = SA_AIS_OK;
+	reply.iuc_head.id = MESSAGE_RES_EVT_UNLINK_CHANNEL;
+	reply.iuc_head.size = sizeof(reply);
+	libais_send_response (ucp->ucp_conn_info, &reply, sizeof(reply));
+
+	/*
+	 * The request is complete: drop it from the pending list and
+	 * release it.  The caller must not touch ucp afterwards.
+	 */
+	list_del(&ucp->ucp_entry);
+	free(ucp);
+}
+
+/*
* Take the channel command data and swap the elements so they match
* our architectures word order.
*/
@@ -3170,10 +3407,15 @@
switch (cpkt->chc_op) {
case EVT_OPEN_CHAN_OP:
- case EVT_CLOSE_CHAN_OP:
cpkt->u.chc_chan.length = swab16(cpkt->u.chc_chan.length);
break;
+ case EVT_UNLINK_CHAN_OP:
+ case EVT_CLOSE_CHAN_OP:
+ cpkt->u.chcu.chcu_name.length = swab16(cpkt->u.chcu.chcu_name.length);
+ cpkt->u.chcu.chcu_unlink_id = swab64(cpkt->u.chcu.chcu_unlink_id);
+ break;
+
case EVT_CLEAR_RET_OP:
cpkt->u.chc_event_id = swab64(cpkt->u.chc_event_id);
break;
@@ -3259,7 +3501,7 @@
log_printf(CHAN_OPEN_DEBUG, "Opening channel %s for node 0x%x\n",
cpkt->u.chc_chan.value, mn->mn_node_info.nodeId);
- eci = find_channel(&cpkt->u.chc_chan);
+ eci = find_channel(&cpkt->u.chc_chan, EVT_CHAN_ACTIVE);
if (!eci) {
eci = create_channel(&cpkt->u.chc_chan);
@@ -3302,12 +3544,12 @@
*/
case EVT_CLOSE_CHAN_OP:
log_printf(LOG_LEVEL_DEBUG, "Closing channel %s for node 0x%x\n",
- cpkt->u.chc_chan.value, mn->mn_node_info.nodeId);
- eci = find_channel(&cpkt->u.chc_chan);
+ cpkt->u.chcu.chcu_name.value, mn->mn_node_info.nodeId);
+ eci = find_channel(&cpkt->u.chcu.chcu_name, cpkt->u.chcu.chcu_unlink_id);
if (!eci) {
log_printf(LOG_LEVEL_NOTICE,
"Channel close request for %s not found\n",
- cpkt->u.chc_chan.value);
+ cpkt->u.chcu.chcu_name.value);
break;
}
@@ -3324,6 +3566,62 @@
break;
/*
+ * Handle a request for channel unlink saEvtChannelUnlink().
+ * We'll look up the channel and mark it as unlinked. Respond to
+ * local library unlink commands.
+ */
+	case EVT_UNLINK_CHAN_OP: {
+		struct unlink_chan_pending *ucp;
+		struct list_head *l, *nxt;
+		/*
+		 * Keep our own copy of the unlink ID.  unlink_channel() ends
+		 * by calling delete_channel(), which may delete the channel,
+		 * so eci must not be dereferenced after that call.
+		 */
+		uint64_t unlink_id = cpkt->u.chcu.chcu_unlink_id;
+
+		log_printf(CHAN_UNLINK_DEBUG,
+				"Unlink request channel %s unlink ID 0x%llx from node 0x%x\n",
+				cpkt->u.chcu.chcu_name.value,
+				unlink_id,
+				mn->mn_node_info.nodeId);
+
+		/*
+		 * look up the channel name and get its associated data.
+		 *
+		 * NOTE(review): if the channel is not found and the request
+		 * came from this node, the matching unlink_pending entry is
+		 * never completed here -- confirm whether that can happen.
+		 */
+		eci = find_channel(&cpkt->u.chcu.chcu_name,
+				EVT_CHAN_ACTIVE);
+		if (!eci) {
+			/*
+			 * %s needs the name's character data, not the
+			 * SaNameT structure itself.
+			 */
+			log_printf(LOG_LEVEL_NOTICE,
+					"Channel unlink request for %s not found\n",
+					cpkt->u.chcu.chcu_name.value);
+			break;
+		}
+
+		/*
+		 * Mark channel as unlinked.
+		 */
+		unlink_channel(eci, unlink_id);
+
+		/*
+		 * respond only to local library requests.
+		 */
+		if (mn->mn_node_info.nodeId == my_node->nodeId) {
+			/*
+			 * Complete one of our pending unlink requests
+			 */
+			for (l = unlink_pending.next; l != &unlink_pending; l = nxt) {
+				nxt = l->next;
+				ucp = list_entry(l, struct unlink_chan_pending, ucp_entry);
+				log_printf(CHAN_UNLINK_DEBUG,
+						"Compare channel id 0x%llx 0x%llx\n",
+						ucp->ucp_unlink_id, unlink_id);
+				if (ucp->ucp_unlink_id == unlink_id) {
+					evt_chan_unlink_finish(ucp);
+					break;
+				}
+			}
+		}
+		break;
+	}
+
+
+ /*
* saEvtClearRetentiotime handler.
*/
case EVT_CLEAR_RET_OP:
@@ -3375,7 +3673,8 @@
cpkt->u.chc_set_opens.chc_open_count,
mn->mn_node_info.nodeId);
- eci = find_channel(&cpkt->u.chc_set_opens.chc_chan_name);
+ eci = find_channel(&cpkt->u.chc_set_opens.chc_chan_name,
+ EVT_CHAN_ACTIVE);
if (!eci) {
eci = create_channel(&cpkt->u.chc_set_opens.chc_chan_name);
}
===== include/ipc_evt.h 1.2 vs edited =====
--- 1.2/include/ipc_evt.h 2005-02-08 09:16:51 -08:00
+++ edited/include/ipc_evt.h 2005-02-10 13:12:06 -08:00
@@ -44,6 +44,7 @@
MESSAGE_REQ_EVT_OPEN_CHANNEL = 1,
MESSAGE_REQ_EVT_OPEN_CHANNEL_ASYNC,
MESSAGE_REQ_EVT_CLOSE_CHANNEL,
+ MESSAGE_REQ_EVT_UNLINK_CHANNEL,
MESSAGE_REQ_EVT_SUBSCRIBE,
MESSAGE_REQ_EVT_UNSUBSCRIBE,
MESSAGE_REQ_EVT_PUBLISH,
@@ -54,6 +55,7 @@
enum res_evt_types {
MESSAGE_RES_EVT_OPEN_CHANNEL = 1,
MESSAGE_RES_EVT_CLOSE_CHANNEL,
+ MESSAGE_RES_EVT_UNLINK_CHANNEL,
MESSAGE_RES_EVT_SUBSCRIBE,
MESSAGE_RES_EVT_UNSUBSCRIBE,
MESSAGE_RES_EVT_PUBLISH,
@@ -143,13 +145,35 @@
* MESSAGE_RES_EVT_CLOSE_CHANNEL
*
* icc_head: Results head
- * icc_error: Request result
*
*/
struct res_evt_channel_close {
struct res_header icc_head;
};
+/*
+ * MESSAGE_REQ_EVT_UNLINK_CHANNEL
+ *
+ * Request sent by the library's saEvtChannelUnlink() and handled by
+ * lib_evt_unlink_channel() in the executive.
+ *
+ * iuc_head: Request head
+ * iuc_channel_name: Name of channel to unlink
+ *
+ */
+struct req_evt_channel_unlink {
+
+ struct req_header iuc_head;
+ SaNameT iuc_channel_name;
+};
+
+/*
+ * MESSAGE_RES_EVT_UNLINK_CHANNEL
+ *
+ * Response to MESSAGE_REQ_EVT_UNLINK_CHANNEL.
+ *
+ * iuc_head: Results head; its error field carries the result of the
+ *           unlink, which saEvtChannelUnlink() returns to the caller.
+ *
+ */
+struct res_evt_channel_unlink {
+ struct res_header iuc_head;
+};
+
/*
* MESSAGE_REQ_EVT_SUBSCRIBE
*
@@ -233,6 +257,7 @@
* led_svr_channel_handle: Server channel handle (1 only)
* led_lib_channel_handle: Lib channel handle (2 only)
* led_chan_name: Channel name (3 and 4 only)
+ * led_chan_unlink_id: directs delivery to unlinked channels.
* led_event_id: Event ID (2, 3 and 4 only)
* led_sub_id: Subscription ID (2 only)
* led_publisher_node_id: Node ID of event publisher
@@ -252,6 +277,7 @@
uint32_t led_svr_channel_handle;
SaEvtChannelHandleT led_lib_channel_handle;
SaNameT led_chan_name;
+ uint64_t led_chan_unlink_id;
SaEvtEventIdT led_event_id;
SaEvtSubscriptionIdT led_sub_id;
SaClmNodeIdT led_publisher_node_id;
@@ -312,14 +338,13 @@
/*
* MESSAGE_REQ_EXEC_EVT_CHANCMD
*
- * chc_header: Request head
- * chc_chan: Channel Name
- * chc_op: Channel operation (open, close, clear retentiontime)
+ * Used for various event related operations.
+ *
*/
-
enum evt_chan_ops {
EVT_OPEN_CHAN_OP, /* chc_chan */
- EVT_CLOSE_CHAN_OP, /* chc_chan */
+ EVT_CLOSE_CHAN_OP, /* chc_close_unlink_chan */
+ EVT_UNLINK_CHAN_OP, /* chc_close_unlink_chan */
EVT_CLEAR_RET_OP, /* chc_event_id */
EVT_SET_ID_OP, /* chc_set_id */
EVT_CONF_DONE, /* no data used */
@@ -327,24 +352,51 @@
EVT_OPEN_COUNT_DONE /* no data used */
};
+/*
+ * Used during recovery to set the next issued event ID
+ * based on the highest ID seen by any of the members
+ */
struct evt_set_id {
struct in_addr chc_addr;
SaEvtEventIdT chc_last_id;
};
+/*
+ * For set open count used during recovery to synchronize all nodes
+ *
+ * chc_chan_name: Channel name.
+ * chc_open_count: number of local opens of this channel.
+ */
struct evt_set_opens {
SaNameT chc_chan_name;
uint32_t chc_open_count;
};
+/*
+ * Used to communicate channel to close or unlink.
+ *
+ * EVT_CHAN_ACTIVE: unlink ID value of a channel that has not been
+ *                  unlinked.
+ *
+ * chcu_name: Name of the channel.
+ * chcu_unlink_id: For a close, the unlink ID of the channel being closed
+ *                 (EVT_CHAN_ACTIVE if it was never unlinked). For an
+ *                 unlink, the new unlink ID to assign to the channel.
+ */
+#define EVT_CHAN_ACTIVE 0
+struct evt_close_unlink_chan {
+ SaNameT chcu_name;
+ uint64_t chcu_unlink_id;
+};
+
+/*
+ * Sent via MESSAGE_REQ_EXEC_EVT_CHANCMD
+ *
+ * chc_head: Request head
+ * chc_op: Channel operation (open, close, clear retentiontime)
+ * u: union of operation options.
+ */
struct req_evt_chan_command {
struct req_header chc_head;
int chc_op;
union {
- SaNameT chc_chan;
- SaEvtEventIdT chc_event_id;
- struct evt_set_id chc_set_id;
- struct evt_set_opens chc_set_opens;
+ SaNameT chc_chan;
+ SaEvtEventIdT chc_event_id;
+ struct evt_set_id chc_set_id;
+ struct evt_set_opens chc_set_opens;
+ struct evt_close_unlink_chan chcu;
} u;
};
===== lib/evt.c 1.13 vs edited =====
--- 1.13/lib/evt.c 2005-02-08 09:16:51 -08:00
+++ edited/lib/evt.c 2005-02-10 13:36:03 -08:00
@@ -1022,16 +1022,71 @@
chan_open_done:
return error;
}
-
+/*
+ * The saEvtChannelUnlink function deletes an existing event channel
+ * from the cluster.
+ *
+ * After completion of the invocation:
+ * - An open of the channel name without a create will fail.
+ * - The channel remains available to any already opened instances.
+ * - If an open/create is performed on this channel name a new instance
+ * is created.
+ * - The unlinked channel's resources will be deleted when the last open
+ * instance is closed.
+ *
+ * Note that an event channel is only deleted from the cluster
+ * namespace when saEvtChannelUnlink() is invoked on it. The deletion
+ * of an event channel frees all resources allocated by the Event
+ * Service for it, such as published events with non-zero retention
+ * time.
+ */
SaAisErrorT
-SaEvtChannelUnlink(
- SaEvtHandleT evtHandle,
- const SaNameT *channelName)
+saEvtChannelUnlink(
+ SaEvtHandleT evt_handle,
+ const SaNameT *channel_name)
{
- /*
- * TODO: Fill in code
+ struct event_instance *evti;
+ struct req_evt_channel_unlink req;
+ struct res_evt_channel_unlink res;
+ SaAisErrorT error;
+
+ error = saHandleInstanceGet(&evt_instance_handle_db, evt_handle,
+ (void*)&evti);
+
+ if (error != SA_AIS_OK) {
+ goto chan_unlink_done;
+ }
+
+ /*
+ * Send the request to the server and wait for a response
*/
- return SA_AIS_ERR_LIBRARY;
+ req.iuc_head.size = sizeof(req);
+ req.iuc_head.id = MESSAGE_REQ_EVT_UNLINK_CHANNEL;
+ req.iuc_channel_name = *channel_name;
+
+
+ pthread_mutex_lock(&evti->ei_mutex);
+
+ error = saSendRetry(evti->ei_fd, &req, sizeof(req), MSG_NOSIGNAL);
+ if (error != SA_AIS_OK) {
+ pthread_mutex_unlock (&evti->ei_mutex);
+ goto chan_unlink_put;
+ }
+ error = saRecvQueue(evti->ei_fd, &res, &evti->ei_inq,
+ MESSAGE_RES_EVT_UNLINK_CHANNEL);
+
+ pthread_mutex_unlock (&evti->ei_mutex);
+
+ if (error != SA_AIS_OK) {
+ goto chan_unlink_put;
+ }
+
+ error = res.iuc_head.error;
+
+chan_unlink_put:
+ saHandleInstancePut (&evt_instance_handle_db, evt_handle);
+chan_unlink_done:
+ return error;
}
/*
===== test/testevt.c 1.6 vs edited =====
--- 1.6/test/testevt.c 2005-02-08 09:16:51 -08:00
+++ edited/test/testevt.c 2005-02-10 15:55:36 -08:00
@@ -51,6 +51,9 @@
*
* test_retention();
* Test event retention times.
+ *
+ * test_unlink_channel();
+ * Test event channel unlink.
*/
#include <stdio.h>
@@ -116,6 +119,7 @@
};
char channel[256] = "TESTEVT_CHANNEL";
+char unlink_channel[256] = "TESTEVT_UNLINK_CHANNEL";
SaEvtSubscriptionIdT subscription_id = 0xabcdef;
SaInvocationT open_invocation = 0xaa55cc33;
unsigned long long test_ret_time = 30000000000ULL; /* 30 seconds */
@@ -2378,6 +2382,408 @@
printf("Done\n");
}
+
+void
+unlink_chan_callback(SaEvtSubscriptionIdT my_subscription_id,
+ const SaEvtEventHandleT event_handle,
+ const SaSizeT my_event_data_size)
+{
+ SaAisErrorT result;
+ SaUint8T my_priority;
+ SaTimeT my_retention_time;
+ SaNameT my_publisher_name = {0, {0}};
+ SaTimeT my_publish_time;
+ SaEvtEventIdT my_event_id;
+ SaEvtSubscriptionIdT exp_sub_id;
+
+ printf(" unlink_chan_callback called(%d)\n", ++call_count);
+
+ evt_pat_get_array.patternsNumber = 4;
+ result = saEvtEventAttributesGet(event_handle,
+ &evt_pat_get_array, /* patterns */
+ &my_priority, /* priority */
+ &my_retention_time, /* retention time */
+ &my_publisher_name, /* publisher name */
+ &my_publish_time, /* publish time */
+ &my_event_id /* event_id */
+ );
+ if (result != SA_AIS_OK) {
+ get_sa_error(result, result_buf, result_buf_len);
+ printf("ERROR: event get attr result: %s\n", result_buf);
+ goto evt_free;
+ }
+
+ if (my_event_id == event_id1) {
+ exp_sub_id = sub1;
+ } else if (my_event_id == event_id2) {
+ exp_sub_id = sub2;
+ } else {
+ printf("ERROR: Received event %llx but not sent\n", my_event_id);
+ goto evt_free;
+ }
+
+ if (my_subscription_id != exp_sub_id) {
+ printf("ERROR: sub ID: e=%x, a=%x\n",
+ exp_sub_id, my_subscription_id);
+ goto evt_free;
+ }
+
+evt_free:
+ result = saEvtEventFree(event_handle);
+ if (result != SA_AIS_OK) {
+ get_sa_error(result, result_buf, result_buf_len);
+ printf("ERROR: event free result: %s\n", result_buf);
+ }
+}
+
+/*
+ * Test channel unlink operations.
+ * 1. Unlink channel.
+ * 2. Open/create a channel, close channel, open channel.
+ * 3. unlink channel, Open channel, close channel.
+ * 4. Open/create, unlink channel, close channel, open channel.
+ * 5. Open/create a channel, unlink channel, open/create channel, send
+ * event on each.
+ * 6. unlink all, close all.
+ */
+SaEvtCallbacksT unlink_callbacks = {
+ open_callback,
+ unlink_chan_callback
+};
+void
+test_unlink_channel()
+{
+ SaEvtHandleT handle;
+ SaEvtChannelHandleT channel_handle1;
+ SaEvtChannelHandleT channel_handle2;
+ SaEvtEventHandleT event_handle1;
+ SaEvtEventHandleT event_handle2;
+ SaEvtChannelOpenFlagsT flags1, flags2;
+ SaNameT channel_name;
+ int result;
+
+ struct pollfd pfd;
+ int nfd;
+ int fd;
+ int timeout = 5000;
+
+ flags1 = SA_EVT_CHANNEL_PUBLISHER |
+ SA_EVT_CHANNEL_SUBSCRIBER |
+ SA_EVT_CHANNEL_CREATE;
+
+ flags2 = SA_EVT_CHANNEL_PUBLISHER |
+ SA_EVT_CHANNEL_SUBSCRIBER;
+
+
+ printf("Test Channel Unlink operations:\n");
+
+ result = saEvtInitialize (&handle, &unlink_callbacks, versions[0].version);
+
+ if (result != SA_AIS_OK) {
+ get_sa_error(result, result_buf, result_buf_len);
+ printf("ERROR: Event Initialize result: %s\n", result_buf);
+ goto unlink_exit;
+ }
+
+ /*
+ * 1. Unlink channel.
+ *
+ * Unlinking a previously opened channel should return OK.
+ * Unlinking a non-existent channel should return an error.
+ */
+ printf(" 1 Channel unlink:\n");
+
+ strcpy(channel_name.value, channel);
+ channel_name.length = strlen(channel);
+ result = saEvtChannelUnlink(handle, &channel_name);
+ if (result != SA_AIS_OK) {
+ get_sa_error(result, result_buf, result_buf_len);
+ printf("ERROR: channel unlink(1) result: %s\n", result_buf);
+ goto unlink_exit;
+ }
+
+ strcpy(channel_name.value, unlink_channel);
+ channel_name.length = strlen(unlink_channel);
+ result = saEvtChannelUnlink(handle, &channel_name);
+ if (result != SA_AIS_ERR_NOT_EXIST) {
+ get_sa_error(result, result_buf, result_buf_len);
+ printf("ERROR: channel unlink(2) result: %s\n", result_buf);
+ goto unlink_exit;
+ }
+
+ /*
+ * 2. Open/create a channel, close channel, open channel.
+ *
+ * Open/create the channel.
+ * Close the channel.
+ * Open without create. This should succeed in the B spec.
+ */
+ printf(" 2 Channel open/close/open:\n");
+
+ result = saEvtChannelOpen(handle, &channel_name, flags1, 0,
+ &channel_handle1);
+
+
+ if (result != SA_AIS_OK) {
+ get_sa_error(result, result_buf, result_buf_len);
+ printf("ERROR: channel open(1) result: %s\n", result_buf);
+ goto unlink_exit;
+ }
+
+ result = saEvtChannelClose(channel_handle1);
+
+ if (result != SA_AIS_OK) {
+ get_sa_error(result, result_buf, result_buf_len);
+ printf("ERROR: channel close(1) result: %s\n", result_buf);
+ goto unlink_exit;
+ }
+
+ result = saEvtChannelOpen(handle, &channel_name, flags2, 0,
+ &channel_handle1);
+ if (result != SA_AIS_OK) {
+ get_sa_error(result, result_buf, result_buf_len);
+ printf("ERROR: channel open(2) result: %s\n", result_buf);
+ goto unlink_exit;
+ }
+
+ /*
+ * 3. unlink channel, Open channel, close channel
+ *
+ * Unlink the channel. Should mark for deletion but not
+ * delete it since it is already open.
+ * Open the channel without create. This should fail since
+ * the channel is marked for deletion and a new version
+ * hasn't been created.
+ * Close channel.
+ */
+ printf(" 3 Channel unlink/open/close:\n");
+
+ result = saEvtChannelUnlink(handle, &channel_name);
+ if (result != SA_AIS_OK) {
+ get_sa_error(result, result_buf, result_buf_len);
+ printf("ERROR: channel unlink result: %s\n", result_buf);
+ goto unlink_exit;
+ }
+
+ result = saEvtChannelOpen(handle, &channel_name, flags2, 0,
+ &channel_handle2);
+ if (result != SA_AIS_ERR_NOT_EXIST) {
+ get_sa_error(result, result_buf, result_buf_len);
+ printf("ERROR: channel open result: %s\n", result_buf);
+ goto unlink_exit;
+ }
+ result = saEvtChannelClose(channel_handle1);
+ if (result != SA_AIS_OK) {
+ get_sa_error(result, result_buf, result_buf_len);
+ printf("ERROR: channel close(1) result: %s\n", result_buf);
+ goto unlink_exit;
+ }
+
+ /*
+ *
+ * 4. Open/create, unlink channel, close channel, open channel.
+ *
+ * Open/create the channel.
+ * unlink the channel.
+ * close the channel. This should delete the channel instance since
+ * it was marked for deletion.
+ * open the channel without create. This should fail since the
+ * channel doesn't exist anymore.
+ */
+ printf(" 4 Channel open/unlink/close/open:\n");
+ result = saEvtChannelOpen(handle, &channel_name, flags1, 0,
+ &channel_handle1);
+ if (result != SA_AIS_OK) {
+ get_sa_error(result, result_buf, result_buf_len);
+ printf("ERROR: channel open(1) result: %s\n", result_buf);
+ goto unlink_exit;
+ }
+
+ result = saEvtChannelUnlink(handle, &channel_name);
+ if (result != SA_AIS_OK) {
+ get_sa_error(result, result_buf, result_buf_len);
+ printf("ERROR: channel unlink result: %s\n", result_buf);
+ goto unlink_exit;
+ }
+
+ result = saEvtChannelClose(channel_handle1);
+ if (result != SA_AIS_OK) {
+ get_sa_error(result, result_buf, result_buf_len);
+ printf("ERROR: channel close(1) result: %s\n", result_buf);
+ goto unlink_exit;
+ }
+
+ result = saEvtChannelOpen(handle, &channel_name, flags2, 0,
+ &channel_handle1);
+ if (result != SA_AIS_ERR_NOT_EXIST) {
+ get_sa_error(result, result_buf, result_buf_len);
+ printf("ERROR: channel open(2) result: %s\n", result_buf);
+ goto unlink_exit;
+ }
+
+ /*
+ * 5. Open/create a channel, unlink channel, open/create channel, send
+ * event on each.
+ *
+ * Open/create.
+ * unlink. Mark for deletion.
+ * open/create. Create new channel of same name.
+ * send event on each open channel. The events should be received on
+ * separate channels.
+ */
+ printf(" 5 Channel open/unlink/open/send:\n");
+
+ result = saEvtChannelOpen(handle, &channel_name, flags1, 0,
+ &channel_handle1);
+ if (result != SA_AIS_OK) {
+ get_sa_error(result, result_buf, result_buf_len);
+ printf("ERROR: channel open result: %s\n", result_buf);
+ goto unlink_exit;
+ }
+
+ result = saEvtChannelUnlink(handle, &channel_name);
+ if (result != SA_AIS_OK) {
+ get_sa_error(result, result_buf, result_buf_len);
+ printf("ERROR: channel unlink result: %s\n", result_buf);
+ goto unlink_exit;
+ }
+
+ result = saEvtChannelOpen(handle, &channel_name, flags1, 0,
+ &channel_handle2);
+ if (result != SA_AIS_OK) {
+ get_sa_error(result, result_buf, result_buf_len);
+ printf("ERROR: channel open result: %s\n", result_buf);
+ goto unlink_exit;
+ }
+
+ result = saEvtEventSubscribe(channel_handle1,
+ &subscribe_filters,
+ sub1);
+ if (result != SA_AIS_OK) {
+ get_sa_error(result, result_buf, result_buf_len);
+ printf("ERROR: channel subscribe(1) result: %s\n", result_buf);
+ goto unlink_exit;
+ }
+
+ result = saEvtEventSubscribe(channel_handle2,
+ &subscribe_filters,
+ sub2);
+ if (result != SA_AIS_OK) {
+ get_sa_error(result, result_buf, result_buf_len);
+ printf("ERROR: channel subscribe(2) result: %s\n", result_buf);
+ goto unlink_exit;
+ }
+
+ retention_time = 0ULL;
+ result = saEvtEventAllocate(channel_handle1, &event_handle1);
+ if (result != SA_AIS_OK) {
+ get_sa_error(result, result_buf, result_buf_len);
+ printf("ERROR: event allocate(1) result: %s\n", result_buf);
+ goto unlink_exit;
+ }
+ result = saEvtEventAttributesSet(event_handle1,
+ &evt_pat_set_array,
+ TEST_PRIORITY,
+ retention_time,
+ &test_pub_name);
+ if (result != SA_AIS_OK) {
+ get_sa_error(result, result_buf, result_buf_len);
+ printf("ERROR: event set(1) result: %s\n", result_buf);
+ goto unlink_exit;
+ }
+
+ result = saEvtEventAllocate(channel_handle2, &event_handle2);
+ if (result != SA_AIS_OK) {
+ get_sa_error(result, result_buf, result_buf_len);
+ printf("ERROR: event allocate(2) result: %s\n", result_buf);
+ goto unlink_exit;
+ }
+ result = saEvtEventAttributesSet(event_handle2,
+ &evt_pat_set_array,
+ TEST_PRIORITY,
+ retention_time,
+ &test_pub_name);
+ if (result != SA_AIS_OK) {
+ get_sa_error(result, result_buf, result_buf_len);
+ printf("ERROR: event set(2) result: %s\n", result_buf);
+ goto unlink_exit;
+ }
+
+ result = saEvtEventPublish(event_handle1, 0, 0, &event_id1);
+ if (result != SA_AIS_OK) {
+ get_sa_error(result, result_buf, result_buf_len);
+ printf("ERROR: event publish(1) result: %s\n", result_buf);
+ goto unlink_exit;
+ }
+ result = saEvtEventPublish(event_handle2, 0, 0, &event_id2);
+ if (result != SA_AIS_OK) {
+ get_sa_error(result, result_buf, result_buf_len);
+ printf("ERROR: event publish(2) result: %s\n", result_buf);
+ goto unlink_exit;
+ }
+
+
+ result = saEvtSelectionObjectGet(handle, &fd);
+ if (result != SA_AIS_OK) {
+ get_sa_error(result, result_buf, result_buf_len);
+ printf("ERROR: select object get result: %s\n", result_buf);
+ goto unlink_exit;
+ }
+
+ /*
+ * We should see a total of two events processed, not four
+ * as if both events were received on both channels.
+ */
+ call_count = 0;
+ do {
+ pfd.fd = fd;
+ pfd.events = POLLIN;
+ nfd = poll(&pfd, 1, timeout);
+ if (nfd <= 0) {
+ if (nfd < 0) {
+ perror("ERROR: poll error");
+ goto unlink_exit;
+ }
+ } else {
+
+ result = saEvtDispatch(handle, SA_DISPATCH_ONE);
+ if (result != SA_AIS_OK) {
+ get_sa_error(result, result_buf, result_buf_len);
+ printf("ERROR: saEvtDispatch %s\n", result_buf);
+ goto unlink_exit;
+ }
+ }
+ } while (nfd > 0);
+
+ if (call_count != 2) {
+ printf("ERROR: processed %d events\n", call_count);
+ goto unlink_exit;
+ }
+
+
+ /*
+ * 6. unlink all, close all.
+ *
+ * close all open channels.
+ * unlink the channel.
+ * TODO: open without create the channel and verify that the channel no
+ * longer exists (this check is not performed by the cleanup code below).
+ */
+ printf(" 6 Channel unlink all/close all/open:\n");
+
+unlink_exit:
+ saEvtChannelClose(channel_handle1);
+ saEvtChannelClose(channel_handle2);
+ saEvtChannelUnlink(handle, &channel_name);
+ result = saEvtFinalize(handle);
+ if (result != SA_AIS_OK) {
+ get_sa_error(result, result_buf, result_buf_len);
+ printf("ERROR: Event Finalize result: %s\n", result_buf);
+ }
+
+ printf("Done\n");
+
+}
int main (void)
{
test_initialize ();
@@ -2387,6 +2793,7 @@
test_multi_channel2();
test_multi_channel3();
test_retention();
+ test_unlink_channel();
return (0);
}
_______________________________________________
Openais mailing list
Openais@lists.osdl.org
http://lists.osdl.org/mailman/listinfo/openais
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic