[prev in list] [next in list] [prev in thread] [next in thread]
List: grid-engine-cvs
Subject: CVS update: /gridengine/, /gridengine/review/, /gridengine/source/libs/comm/
From: crei () sunsource ! net
Date: 2005-06-27 15:01:32
Message-ID: 20050627150132.29347.qmail () sunsource ! net
[Download RAW message or body]
User: crei
Date: 05/06/27 08:01:32
Added:
/gridengine/review/
CR-2005-06-27-1.txt
Modified:
/gridengine/
Changelog
/gridengine/source/libs/comm/
cl_commlib.c, cl_data_types.h
Log:
CR-2005-06-27-1: Enhancem.: - added send message queue for connection
handles (performance enhancement)
- code cleanup
Review: RD
Issue number:
Obtained from:
Submitted by:
Reviewed by:
File Changes:
Directory: /gridengine/
=======================
File [changed]: Changelog
Url: http://gridengine.sunsource.net/source/browse/gridengine/Changelog?r1=1.2892&r2=1.2893
Delta lines: +5 -0
-------------------
--- Changelog 27 Jun 2005 12:48:16 -0000 1.2892
+++ Changelog 27 Jun 2005 15:01:27 -0000 1.2893
@@ -1,3 +1,8 @@
+CR-2005-06-27-1: Enhancem.: - added send message queue for connection
+ handles (performance enhancement)
+ - code cleanup
+ Review: RD
+
CR-2005-06-27-0: Testsuite: throughput test:
using tcl timestamp command instead of
clock seconds command which caused time measurement
Directory: /gridengine/review/
==============================
File [added]: CR-2005-06-27-1.txt
Url: http://gridengine.sunsource.net/source/browse/gridengine/review/CR-2005-06-27-1.txt?rev=1.1&content-type=text/vnd.viewcvs-markup
Added lines: 103
----------------
Source Code Review Form
=======================
(version 1.2)
Date: 2005-06-27
1 Identification
CR-2005-06-27-1: Enhancem.: - added send message queue for connection
handles (performance enhancement)
- code cleanup
Review: RD
2 Conformance to specification
2 [x] yes
2 [ ] no - comment:
3 Documentation
3.1 user/admin guide
3.1 [x] yes
3.1 [ ] no - comment:
3.2 man pages (user view)
3.2 [x] yes
3.2 [ ] no - comment:
3.3 -help output
3.3 [x] yes
3.3 [ ] no - comment:
3.4 documented interfaces (at least GDI, EVI, libs, ADOC format)
3.4 [x] yes
3.4 [ ] no - comment:
3.5 messages, output formats
3.5 [x] yes
3.5 [ ] no - comment:
3.6 Bugster CR and Issuezilla
3.6 [x] yes
3.6 [ ] no - comment:
3.7 Issuezilla
3.7 [x] yes
3.7 [ ] no - comment:
4 Source review
4.1 Style guide conformance
4.1 [x] yes
4.1 [ ] no - comment:
4.2 Memory leaks
4.2 [x] yes
4.2 [ ] no - comment:
4.3 Thread safe
4.3 [x] yes
4.3 [ ] no
5 Tests
5.1 flawfinder diff
No new flaws.
5.1 [x] yes
5.1 [ ] no
5.2 Used memory leak debugger
5.2 [x] yes
5.2 [ ] no - comment:
5.3 Manual tests
Description:
Successfull execution
5.3 [x] yes
5.3 [ ] no - comment:
5.4 Testsuite covers issue
5.4 [x] yes
5.4 [ ] no - comment:
5.5 If 5.4 = no: Testsuite issue created
5.5 [x] yes
5.5 [ ] no - comment:
5.6 Testsuite run successfull
5.6 [x] yes
5.6 [ ] no - comment:
6 Comments
7 Accepted
7 [x] yes
7 [ ] no - comment:
Directory: /gridengine/source/libs/comm/
========================================
File [changed]: cl_commlib.c
Url: http://gridengine.sunsource.net/source/browse/gridengine/source/libs/comm/cl_commlib.c?r1=1.69&r2=1.70
Delta lines: +291 -136
-----------------------
--- cl_commlib.c 23 Jun 2005 12:16:59 -0000 1.69
+++ cl_commlib.c 27 Jun 2005 15:01:29 -0000 1.70
@@ -99,6 +99,14 @@
+static int cl_commlib_send_message_to_endpoint(cl_com_handle_t* handle,
+ cl_com_endpoint_t* endpoint,
+ cl_xml_ack_type_t ack_type,
+ cl_byte_t* data,
+ unsigned long size ,
+ unsigned long response_mid,
+ unsigned long tag);
+
/* global lists */
/* cl_com_handle_list
@@ -807,7 +815,7 @@
new_handle->connection_list_mutex = NULL;
new_handle->connection_list = NULL;
new_handle->received_message_queue = NULL;
-
+ new_handle->send_message_queue = NULL;
new_handle->next_free_client_id = 1;
new_handle->service_handler = NULL;
new_handle->framework = framework;
@@ -989,6 +997,30 @@
return NULL;
}
+ if ((return_value=cl_app_message_queue_setup(&(new_handle->send_message_queue), \
"send message queue", 1)) != CL_RETVAL_OK) { + int mutex_ret_val;
+ cl_app_message_queue_cleanup(&(new_handle->received_message_queue));
+ cl_app_message_queue_cleanup(&(new_handle->send_message_queue));
+ cl_com_free_endpoint(&(new_handle->local));
+ mutex_ret_val = pthread_mutex_destroy(new_handle->connection_list_mutex);
+ if (mutex_ret_val != EBUSY) {
+ free(new_handle->connection_list_mutex);
+ }
+ mutex_ret_val = pthread_mutex_destroy(new_handle->messages_ready_mutex);
+ if (mutex_ret_val != EBUSY) {
+ free(new_handle->messages_ready_mutex);
+ }
+ cl_com_free_handle_statistic(&(new_handle->statistic));
+ free(new_handle);
+ free(local_hostname);
+ cl_raw_list_unlock(cl_com_handle_list);
+ if (commlib_error) {
+ *commlib_error = return_value;
+ }
+ cl_commlib_push_application_error(return_value, NULL);
+ return NULL;
+ }
+
if ((return_value=cl_app_message_queue_setup(&(new_handle->received_message_queue), \
"received message queue", 1)) != CL_RETVAL_OK) { int mutex_ret_val;
cl_app_message_queue_cleanup(&(new_handle->received_message_queue));
@@ -1014,6 +1046,7 @@
if ((return_value=cl_connection_list_setup(&(new_handle->connection_list), \
"connection list", 1)) != CL_RETVAL_OK) { int mutex_ret_val;
+ cl_app_message_queue_cleanup(&(new_handle->send_message_queue));
cl_app_message_queue_cleanup(&(new_handle->received_message_queue));
cl_connection_list_cleanup(&(new_handle->connection_list));
cl_com_free_endpoint(&(new_handle->local));
@@ -1040,6 +1073,7 @@
int mutex_ret_val;
cl_string_list_cleanup(&(new_handle->allowed_host_list));
+ cl_app_message_queue_cleanup(&(new_handle->send_message_queue));
cl_app_message_queue_cleanup(&(new_handle->received_message_queue));
cl_connection_list_cleanup(&(new_handle->connection_list));
cl_com_free_endpoint(&(new_handle->local));
@@ -1081,6 +1115,7 @@
int mutex_ret_val;
cl_com_close_connection(&new_con);
cl_string_list_cleanup(&(new_handle->allowed_host_list));
+ cl_app_message_queue_cleanup(&(new_handle->send_message_queue));
cl_app_message_queue_cleanup(&(new_handle->received_message_queue));
cl_connection_list_cleanup(&(new_handle->connection_list));
cl_com_free_endpoint(&(new_handle->local));
@@ -1113,6 +1148,7 @@
cl_com_connection_request_handler_cleanup(new_con);
cl_com_close_connection(&new_con);
cl_string_list_cleanup(&(new_handle->allowed_host_list));
+ cl_app_message_queue_cleanup(&(new_handle->send_message_queue));
cl_app_message_queue_cleanup(&(new_handle->received_message_queue));
cl_connection_list_cleanup(&(new_handle->connection_list));
cl_com_free_endpoint(&(new_handle->local));
@@ -1228,6 +1264,7 @@
cl_com_close_connection(&(new_handle->service_handler));
}
cl_connection_list_cleanup(&(new_handle->connection_list));
+ cl_app_message_queue_cleanup(&(new_handle->send_message_queue));
cl_app_message_queue_cleanup(&(new_handle->received_message_queue));
cl_string_list_cleanup(&(new_handle->allowed_host_list));
cl_com_free_endpoint(&(new_handle->local));
@@ -1577,6 +1614,7 @@
cl_com_close_connection(&(handle->service_handler));
}
+ cl_app_message_queue_cleanup(&(handle->send_message_queue));
cl_app_message_queue_cleanup(&(handle->received_message_queue));
cl_connection_list_cleanup(&(handle->connection_list));
cl_com_free_endpoint(&(handle->local));
@@ -5855,7 +5893,7 @@
return CL_RETVAL_UNKNOWN;
}
-#if 0
+
#ifdef __CL_FUNCTION__
#undef __CL_FUNCTION__
#endif
@@ -5897,9 +5935,6 @@
return CL_RETVAL_UNKNOWN_ENDPOINT;
}
-
- CL_LOG_STR_STR_INT(CL_LOG_INFO, "sending message to : ", \
endpoint->comp_host, endpoint->comp_name, (int)endpoint->comp_id);
-
while(retry_send != 0) {
/* lock handle connection list */
@@ -6003,7 +6038,6 @@
return return_value;
}
-#endif
#ifdef __CL_FUNCTION__
@@ -6027,6 +6061,7 @@
int return_value = CL_RETVAL_OK;
cl_com_endpoint_t receiver;
char* unique_hostname = NULL;
+ cl_byte_t* help_data = NULL;
int retry_send = 1;
cl_bool_t ignore_connection = CL_FALSE;
@@ -6049,12 +6084,92 @@
return CL_RETVAL_UNKNOWN_ENDPOINT;
}
+ /* make a copy of the message date (if wished) */
+ if (copy_data == CL_TRUE) {
+ help_data = (cl_byte_t*) malloc((sizeof(cl_byte_t)*size));
+ if (help_data == NULL) {
+ return CL_RETVAL_MALLOC;
+ }
+ memcpy(help_data, data, (sizeof(cl_byte_t)*size) );
+ } else {
+ help_data = data;
+ }
+
+
+ /*
+ * The send_message_queue can only be used if the following parameters are not \
requested + * by the user:
+ *
+ * - mid != NULL : The caller wants the message id which can't be \
set before + * the message is added to the \
connection's message list + *
+ * - wait_for_ack == CL_TRUE : The caller wants to wait for the response, we \
have to do + * the transaction at once
+ *
+ */
+
+
+ if (mid == NULL && wait_for_ack == CL_FALSE && cl_com_create_threads != \
CL_NO_THREAD ) { + cl_com_endpoint_t* destination_endpoint = NULL;
+
+ /* using send_message_queue for this message */
+ CL_LOG_STR_STR_INT(CL_LOG_INFO, "add message into send queue for: ", \
un_resolved_hostname, component_name, (int)component_id); +
+ /* resolve hostname and create endpoint structure */
+ return_value = cl_com_cached_gethostbyname(un_resolved_hostname, \
&unique_hostname,NULL, NULL, NULL); + if (return_value != CL_RETVAL_OK) {
+ CL_LOG(CL_LOG_ERROR,cl_get_error_text(return_value));
+ if (copy_data == CL_TRUE) {
+ free(help_data);
+ }
+ return return_value;
+ }
+
+ destination_endpoint = \
cl_com_create_endpoint(unique_hostname,component_name,component_id); + \
free(unique_hostname); + unique_hostname = NULL;
+
+ if (destination_endpoint == NULL) {
+ if (copy_data == CL_TRUE) {
+ free(help_data);
+ }
+ return CL_RETVAL_MALLOC;
+ }
+
+ return_value = cl_app_message_queue_append(handle->send_message_queue, NULL,
+ destination_endpoint, ack_type,
+ help_data, size, response_mid, tag, \
1); + if (return_value != CL_RETVAL_OK) {
+ CL_LOG(CL_LOG_ERROR,cl_get_error_text(return_value));
+ if (copy_data == CL_TRUE) {
+ free(help_data);
+ }
+ return return_value;
+ }
+
+ switch(cl_com_create_threads) {
+ case CL_NO_THREAD:
+ CL_LOG(CL_LOG_INFO,"no threads enabled");
+ /* we just want to trigger write , no wait for read*/
+ cl_commlib_trigger(handle, 1);
+ break;
+ case CL_RW_THREAD:
+ /* we just want to trigger write , no wait for read*/
+ cl_thread_trigger_event(handle->write_thread);
+ break;
+ }
+
+ } else {
+
CL_LOG_STR_STR_INT(CL_LOG_INFO, "new message for: ", un_resolved_hostname, \
component_name, (int)component_id);
/* resolve hostname */
return_value = cl_com_cached_gethostbyname(un_resolved_hostname, \
&unique_hostname,NULL, NULL, NULL); if (return_value != CL_RETVAL_OK) {
CL_LOG(CL_LOG_ERROR,cl_get_error_text(return_value));
+ if (copy_data == CL_TRUE) {
+ free(help_data);
+ }
return return_value;
}
@@ -6069,7 +6184,6 @@
/* lock handle connection list */
cl_raw_list_lock(handle->connection_list);
elem = cl_connection_list_get_first_elem(handle->connection_list);
- connection = NULL;
while(elem) {
ignore_connection = CL_FALSE;
connection = elem->connection;
@@ -6084,6 +6198,7 @@
}
default: {
ignore_connection = CL_TRUE;
+ CL_LOG(CL_LOG_WARNING,"ignore connection with unexpected \
connection state"); break;
}
}
@@ -6091,7 +6206,6 @@
if (ignore_connection == CL_FALSE &&
cl_com_compare_endpoints(connection->receiver, &receiver)) {
- cl_byte_t* help_data = NULL;
/* send message to client (no broadcast) */
@@ -6116,28 +6230,24 @@
CL_LOG(CL_LOG_ERROR,"Protocol error: haven't received such a high \
message id till now"); cl_raw_list_unlock(handle->connection_list);
free(unique_hostname);
+ if (copy_data == CL_TRUE) {
+ free(help_data);
+ }
return CL_RETVAL_PROTOCOL_ERROR;
}
CL_LOG_STR_STR_INT(CL_LOG_INFO, "sending it to: ", \
connection->receiver->comp_host,
\
connection->receiver->comp_name,
\
(int)connection->receiver->comp_id);
- if (copy_data == CL_TRUE) {
- help_data = (cl_byte_t*) malloc((sizeof(cl_byte_t)*size));
- if (help_data == NULL) {
- cl_raw_list_unlock(handle->connection_list);
- free(unique_hostname);
- return CL_RETVAL_MALLOC;
- }
- memcpy(help_data, data, (sizeof(cl_byte_t)*size) );
- return_value = cl_com_setup_message(&message, connection, help_data, \
size,ack_type,response_mid,tag);
- } else {
- return_value = cl_com_setup_message(&message, connection, data, size, \
ack_type,response_mid,tag);
- }
+
+ return_value = cl_com_setup_message(&message, connection, help_data, \
size, ack_type,response_mid,tag);
if (return_value != CL_RETVAL_OK) {
cl_raw_list_unlock(handle->connection_list);
free(unique_hostname);
+ if (copy_data == CL_TRUE) {
+ free(help_data);
+ }
return return_value;
}
@@ -6155,13 +6265,8 @@
message_added = 1;
break;
- } else {
- if (ignore_connection == CL_TRUE) {
- CL_LOG(CL_LOG_WARNING,"ignore connection with unexpected connection \
state");
- }
}
elem = cl_connection_list_get_next_elem(elem);
- connection = NULL;
}
cl_raw_list_unlock(handle->connection_list);
@@ -6172,6 +6277,9 @@
if (return_value != CL_RETVAL_OK) {
free(unique_hostname);
CL_LOG_STR(CL_LOG_ERROR,"cl_commlib_open_connection() returned: \
",cl_get_error_text(return_value)); + if (copy_data == CL_TRUE) {
+ free(help_data);
+ }
return return_value;
}
if (retry_send >= 3) {
@@ -6197,6 +6305,9 @@
}
} else {
free(unique_hostname);
+ if (copy_data == CL_TRUE) {
+ free(help_data);
+ }
return CL_RETVAL_SEND_ERROR;
}
@@ -6213,6 +6324,7 @@
CL_LOG_INT(CL_LOG_INFO,"message acknowledge expected, waiting for ack", \
(int)my_mid); return_value = cl_commlib_check_for_ack(handle, receiver.comp_host, \
component_name, component_id, my_mid, CL_TRUE); free(unique_hostname);
+ }
return return_value;
}
@@ -6466,6 +6578,10 @@
int wait_for_events = 1;
int select_sec_timeout = 0;
int select_usec_timeout = 100*1000;
+ cl_app_message_queue_elem_t* mq_elem = NULL;
+ int mq_return_value = CL_RETVAL_OK;
+
+
int message_received = 0;
int trigger_write_thread = 0;
cl_connection_list_elem_t* elem = NULL;
@@ -6529,6 +6645,25 @@
} else {
the_handler = NULL;
}
+
+ cl_raw_list_lock(handle->send_message_queue);
+ while((mq_elem = \
cl_app_message_queue_get_first_elem(handle->send_message_queue)) != NULL) { + \
mq_return_value = cl_commlib_send_message_to_endpoint(handle, \
mq_elem->snd_destination, + \
mq_elem->snd_ack_type, mq_elem->snd_data, + \
mq_elem->snd_size, mq_elem->snd_response_mid, + \
mq_elem->snd_tag); + /* remove queue entries */
+ cl_raw_list_remove_elem(handle->send_message_queue, mq_elem->raw_elem);
+ if (mq_return_value != CL_RETVAL_OK) {
+ CL_LOG_STR(CL_LOG_ERROR,"can't send message:", \
cl_get_error_text(mq_return_value)); + free(mq_elem->snd_data);
+ }
+ cl_com_free_endpoint(&(mq_elem->snd_destination));
+ free(mq_elem);
+ }
+ cl_raw_list_unlock(handle->send_message_queue);
+
+
ret_val = cl_com_open_connection_request_handler(handle->framework,
handle->connection_list,
the_handler,
@@ -6845,6 +6980,8 @@
cl_com_handle_t* handle = NULL;
int trigger_read_thread = 0;
char tmp_string[1024];
+ cl_app_message_queue_elem_t* mq_elem = NULL;
+ int mq_return_value = CL_RETVAL_OK;
/* get pointer to cl_thread_settings_t struct */
cl_thread_settings_t *thread_config = (cl_thread_settings_t*)t_conf;
@@ -6883,6 +7020,24 @@
cl_raw_list_unlock(cl_com_handle_list);
} else {
+
+ cl_raw_list_lock(handle->send_message_queue);
+ while((mq_elem = \
cl_app_message_queue_get_first_elem(handle->send_message_queue)) != NULL) { + \
mq_return_value = cl_commlib_send_message_to_endpoint(handle, \
mq_elem->snd_destination, + \
mq_elem->snd_ack_type, mq_elem->snd_data, + \
mq_elem->snd_size, mq_elem->snd_response_mid, + \
mq_elem->snd_tag); + /* remove queue entries */
+ cl_raw_list_remove_elem(handle->send_message_queue, mq_elem->raw_elem);
+ if (mq_return_value != CL_RETVAL_OK) {
+ CL_LOG_STR(CL_LOG_ERROR,"can't send message:", \
cl_get_error_text(mq_return_value)); + free(mq_elem->snd_data);
+ }
+ cl_com_free_endpoint(&(mq_elem->snd_destination));
+ free(mq_elem);
+ }
+ cl_raw_list_unlock(handle->send_message_queue);
+
/* do write select */
ret_val = cl_com_open_connection_request_handler(handle->framework,
File [changed]: cl_data_types.h
Url: http://gridengine.sunsource.net/source/browse/gridengine/source/libs/comm/cl_data_types.h?r1=1.25&r2=1.26
Delta lines: +2 -0
-------------------
--- cl_data_types.h 23 Jun 2005 12:17:00 -0000 1.25
+++ cl_data_types.h 27 Jun 2005 15:01:29 -0000 1.26
@@ -305,6 +305,8 @@
cl_raw_list_t* allowed_host_list; /* string list with hostnames allowed to \
connect */ unsigned long next_free_client_id;
+ cl_raw_list_t* send_message_queue; /* used as queue for application messages \
which have to + be send to a connection \
( used in cl_commlib_send_message()) */ cl_raw_list_t* received_message_queue; /* \
used as queue for application messages which are ready to be
handled over to the application ( used \
in cl_commlib_receive_message()
This message queue must have a "garbage \
collector", removing
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic