[prev in list] [next in list] [prev in thread] [next in thread] 

List:       grid-engine-cvs
Subject:    CVS update: /gridengine/, /gridengine/review/, /gridengine/source/libs/comm/
From:       crei () sunsource ! net
Date:       2005-06-27 15:01:32
Message-ID: 20050627150132.29347.qmail () sunsource ! net
[Download RAW message or body]

User: crei    
Date: 05/06/27 08:01:32

Added:
 /gridengine/review/
  CR-2005-06-27-1.txt

Modified:
 /gridengine/
  Changelog
 /gridengine/source/libs/comm/
  cl_commlib.c, cl_data_types.h

Log:
 CR-2005-06-27-1: Enhancem.: - added send message queue for connection
                               handles (performance enhancement)
                             - code cleanup
                  Review:    RD
 
 Issue number:  
 Obtained from: 
 Submitted by:  
 Reviewed by:   

File Changes:

Directory: /gridengine/
=======================

File [changed]: Changelog
Url: http://gridengine.sunsource.net/source/browse/gridengine/Changelog?r1=1.2892&r2=1.2893
 Delta lines:  +5 -0
-------------------
--- Changelog	27 Jun 2005 12:48:16 -0000	1.2892
+++ Changelog	27 Jun 2005 15:01:27 -0000	1.2893
@@ -1,3 +1,8 @@
+CR-2005-06-27-1: Enhancem.: - added send message queue for connection
+                              handles (performance enhancement)
+                            - code cleanup
+                 Review:    RD
+
 CR-2005-06-27-0: Testsuite: throughput test:
                             using tcl timestamp command instead of 
                             clock seconds command which caused time measurement

Directory: /gridengine/review/
==============================

File [added]: CR-2005-06-27-1.txt
Url: http://gridengine.sunsource.net/source/browse/gridengine/review/CR-2005-06-27-1.txt?rev=1.1&content-type=text/vnd.viewcvs-markup
 Added lines: 103
----------------
Source Code Review Form 
=======================
(version 1.2)

Date: 2005-06-27

1 Identification
CR-2005-06-27-1: Enhancem.: - added send message queue for connection
                              handles (performance enhancement)
                            - code cleanup
                 Review:    RD


2 Conformance to specification

2 [x] yes
2 [ ] no - comment:


3 Documentation

3.1 user/admin guide
3.1 [x] yes
3.1 [ ] no - comment:

3.2 man pages (user view)
3.2 [x] yes
3.2 [ ] no - comment:

3.3 -help output
3.3 [x] yes
3.3 [ ] no - comment:

3.4 documented interfaces (at least GDI, EVI, libs, ADOC format)
3.4 [x] yes
3.4 [ ] no - comment:

3.5 messages, output formats
3.5 [x] yes
3.5 [ ] no - comment:

3.6 Bugster CR and Issuezilla
3.6 [x] yes
3.6 [ ] no - comment:

3.7 Issuezilla
3.7 [x] yes
3.7 [ ] no - comment:


4 Source review

4.1 Style guide conformance
4.1 [x] yes
4.1 [ ] no - comment:

4.2 Memory leaks
4.2 [x] yes
4.2 [ ] no - comment:

4.3 Thread safe
4.3 [x] yes
4.3 [ ] no


5 Tests

5.1 flawfinder diff
No new flaws.
5.1 [x] yes
5.1 [ ] no

5.2 Used memory leak debugger
5.2 [x] yes
5.2 [ ] no - comment:

5.3 Manual tests
Description:
Successful execution
5.3 [x] yes
5.3 [ ] no - comment:

5.4 Testsuite covers issue
5.4 [x] yes
5.4 [ ] no - comment:

5.5 If 5.4 = no: Testsuite issue created
5.5 [x] yes
5.5 [ ] no - comment:

5.6 Testsuite run successful
5.6 [x] yes
5.6 [ ] no - comment:


6 Comments


7 Accepted
7 [x] yes
7 [ ] no - comment:



Directory: /gridengine/source/libs/comm/
========================================

File [changed]: cl_commlib.c
Url: http://gridengine.sunsource.net/source/browse/gridengine/source/libs/comm/cl_commlib.c?r1=1.69&r2=1.70
 Delta lines:  +291 -136
-----------------------
--- cl_commlib.c	23 Jun 2005 12:16:59 -0000	1.69
+++ cl_commlib.c	27 Jun 2005 15:01:29 -0000	1.70
@@ -99,6 +99,14 @@
 
 
 
+static int cl_commlib_send_message_to_endpoint(cl_com_handle_t*   handle,
+                                               cl_com_endpoint_t* endpoint, 
+                                               cl_xml_ack_type_t  ack_type, 
+                                               cl_byte_t*         data, 
+                                               unsigned long      size , 
+                                               unsigned long      response_mid,
+                                               unsigned long      tag);
+
 /* global lists */
 
 /* cl_com_handle_list
@@ -807,7 +815,7 @@
    new_handle->connection_list_mutex = NULL;
    new_handle->connection_list = NULL;
    new_handle->received_message_queue = NULL;
-
+   new_handle->send_message_queue = NULL;
    new_handle->next_free_client_id = 1;
    new_handle->service_handler = NULL;
    new_handle->framework = framework;
@@ -989,6 +997,30 @@
       return NULL;
    }
 
+   if ((return_value=cl_app_message_queue_setup(&(new_handle->send_message_queue), \
"send message queue", 1)) != CL_RETVAL_OK) { +      int mutex_ret_val;
+      cl_app_message_queue_cleanup(&(new_handle->received_message_queue));
+      cl_app_message_queue_cleanup(&(new_handle->send_message_queue));
+      cl_com_free_endpoint(&(new_handle->local));
+      mutex_ret_val = pthread_mutex_destroy(new_handle->connection_list_mutex);
+      if (mutex_ret_val != EBUSY) {
+         free(new_handle->connection_list_mutex); 
+      }
+      mutex_ret_val = pthread_mutex_destroy(new_handle->messages_ready_mutex);
+      if (mutex_ret_val != EBUSY) {
+         free(new_handle->messages_ready_mutex); 
+      }
+      cl_com_free_handle_statistic(&(new_handle->statistic));
+      free(new_handle);
+      free(local_hostname);
+      cl_raw_list_unlock(cl_com_handle_list);
+      if (commlib_error) {
+         *commlib_error = return_value;
+      }
+      cl_commlib_push_application_error(return_value, NULL);
+      return NULL;
+   } 
+
    if ((return_value=cl_app_message_queue_setup(&(new_handle->received_message_queue), \
"received message queue", 1)) != CL_RETVAL_OK) {  int mutex_ret_val;
       cl_app_message_queue_cleanup(&(new_handle->received_message_queue));
@@ -1014,6 +1046,7 @@
 
    if ((return_value=cl_connection_list_setup(&(new_handle->connection_list), \
"connection list", 1)) != CL_RETVAL_OK) {  int mutex_ret_val;
+      cl_app_message_queue_cleanup(&(new_handle->send_message_queue));
       cl_app_message_queue_cleanup(&(new_handle->received_message_queue));
       cl_connection_list_cleanup(&(new_handle->connection_list));
       cl_com_free_endpoint(&(new_handle->local));
@@ -1040,6 +1073,7 @@
       int mutex_ret_val;
 
       cl_string_list_cleanup(&(new_handle->allowed_host_list));
+      cl_app_message_queue_cleanup(&(new_handle->send_message_queue));
       cl_app_message_queue_cleanup(&(new_handle->received_message_queue));
       cl_connection_list_cleanup(&(new_handle->connection_list));
       cl_com_free_endpoint(&(new_handle->local));
@@ -1081,6 +1115,7 @@
          int mutex_ret_val;
          cl_com_close_connection(&new_con);
          cl_string_list_cleanup(&(new_handle->allowed_host_list));
+         cl_app_message_queue_cleanup(&(new_handle->send_message_queue));
          cl_app_message_queue_cleanup(&(new_handle->received_message_queue));
          cl_connection_list_cleanup(&(new_handle->connection_list));
          cl_com_free_endpoint(&(new_handle->local));
@@ -1113,6 +1148,7 @@
          cl_com_connection_request_handler_cleanup(new_con);
          cl_com_close_connection(&new_con);
          cl_string_list_cleanup(&(new_handle->allowed_host_list));
+         cl_app_message_queue_cleanup(&(new_handle->send_message_queue));
          cl_app_message_queue_cleanup(&(new_handle->received_message_queue));
          cl_connection_list_cleanup(&(new_handle->connection_list));
          cl_com_free_endpoint(&(new_handle->local));
@@ -1228,6 +1264,7 @@
          cl_com_close_connection(&(new_handle->service_handler));
       }
       cl_connection_list_cleanup(&(new_handle->connection_list));
+      cl_app_message_queue_cleanup(&(new_handle->send_message_queue));
       cl_app_message_queue_cleanup(&(new_handle->received_message_queue));
       cl_string_list_cleanup(&(new_handle->allowed_host_list));
       cl_com_free_endpoint(&(new_handle->local));
@@ -1577,6 +1614,7 @@
          cl_com_close_connection(&(handle->service_handler));
       }
    
+      cl_app_message_queue_cleanup(&(handle->send_message_queue));
       cl_app_message_queue_cleanup(&(handle->received_message_queue));
       cl_connection_list_cleanup(&(handle->connection_list));
       cl_com_free_endpoint(&(handle->local));
@@ -5855,7 +5893,7 @@
    return CL_RETVAL_UNKNOWN;
 }
 
-#if 0
+
 #ifdef __CL_FUNCTION__
 #undef __CL_FUNCTION__
 #endif
@@ -5897,9 +5935,6 @@
       return CL_RETVAL_UNKNOWN_ENDPOINT;
    }
 
-
-   CL_LOG_STR_STR_INT(CL_LOG_INFO, "sending message to :      ", \
                endpoint->comp_host, endpoint->comp_name, (int)endpoint->comp_id);
-   
    while(retry_send != 0) {
    
       /* lock handle connection list */
@@ -6003,7 +6038,6 @@
 
    return return_value;
 }
-#endif
 
 
 #ifdef __CL_FUNCTION__
@@ -6027,6 +6061,7 @@
    int return_value = CL_RETVAL_OK;
    cl_com_endpoint_t receiver;
    char* unique_hostname = NULL;
+   cl_byte_t* help_data = NULL;
    int retry_send = 1;
    cl_bool_t ignore_connection = CL_FALSE;
 
@@ -6049,12 +6084,92 @@
       return CL_RETVAL_UNKNOWN_ENDPOINT;
    }
 
+   /* make a copy of the message data (if requested) */
+   if (copy_data == CL_TRUE) {
+      help_data = (cl_byte_t*) malloc((sizeof(cl_byte_t)*size));
+      if (help_data == NULL) {
+         return CL_RETVAL_MALLOC;
+      }
+      memcpy(help_data, data, (sizeof(cl_byte_t)*size) );
+   } else {
+      help_data = data;
+   }
+
+
+   /*
+    *  The send_message_queue can only be used if the following parameters are not \
requested +    *  by the user:
+    *
+    *  - mid != NULL               : The caller wants the message id which can't be \
set before +    *                                the message is added to the \
connection's message list +    *
+    *  - wait_for_ack == CL_TRUE   : The caller wants to wait for the response, we \
have to do +    *                                the transaction at once
+    *
+    */
+
+
+   if (mid == NULL && wait_for_ack == CL_FALSE && cl_com_create_threads != \
CL_NO_THREAD ) { +      cl_com_endpoint_t* destination_endpoint = NULL;
+
+      /* using send_message_queue for this message */
+      CL_LOG_STR_STR_INT(CL_LOG_INFO, "add message into send queue for:      ", \
un_resolved_hostname, component_name, (int)component_id); +    
+      /* resolve hostname and create endpoint structure */
+      return_value = cl_com_cached_gethostbyname(un_resolved_hostname, \
&unique_hostname,NULL, NULL, NULL); +      if (return_value != CL_RETVAL_OK) {
+         CL_LOG(CL_LOG_ERROR,cl_get_error_text(return_value));
+         if (copy_data == CL_TRUE) {
+            free(help_data);
+         }
+         return return_value;
+      }
+
+      destination_endpoint = \
cl_com_create_endpoint(unique_hostname,component_name,component_id); +      \
free(unique_hostname); +      unique_hostname = NULL;
+
+      if (destination_endpoint == NULL) {
+         if (copy_data == CL_TRUE) {
+            free(help_data);
+         }
+         return CL_RETVAL_MALLOC;
+      }
+
+      return_value = cl_app_message_queue_append(handle->send_message_queue, NULL,
+                                                 destination_endpoint, ack_type,
+                                                 help_data, size, response_mid, tag, \
1); +      if (return_value != CL_RETVAL_OK) {
+         CL_LOG(CL_LOG_ERROR,cl_get_error_text(return_value));
+         if (copy_data == CL_TRUE) {
+            free(help_data);
+         }
+         return return_value;
+      }
+
+      switch(cl_com_create_threads) {
+         case CL_NO_THREAD:
+            CL_LOG(CL_LOG_INFO,"no threads enabled");
+            /* we just want to trigger write , no wait for read*/
+            cl_commlib_trigger(handle, 1);
+            break;
+         case CL_RW_THREAD:
+            /* we just want to trigger write , no wait for read*/
+            cl_thread_trigger_event(handle->write_thread);
+            break;
+      }
+
+   } else {
+
    CL_LOG_STR_STR_INT(CL_LOG_INFO, "new message for:      ", un_resolved_hostname, \
component_name, (int)component_id);  
    /* resolve hostname */
    return_value = cl_com_cached_gethostbyname(un_resolved_hostname, \
&unique_hostname,NULL, NULL, NULL);  if (return_value != CL_RETVAL_OK) {
       CL_LOG(CL_LOG_ERROR,cl_get_error_text(return_value));
+         if (copy_data == CL_TRUE) {
+            free(help_data);
+         }
       return return_value;
    }
 
@@ -6069,7 +6184,6 @@
       /* lock handle connection list */
       cl_raw_list_lock(handle->connection_list);
       elem = cl_connection_list_get_first_elem(handle->connection_list);     
-      connection = NULL;
       while(elem) {
          ignore_connection = CL_FALSE;
          connection = elem->connection;
@@ -6084,6 +6198,7 @@
                }
                default: {
                   ignore_connection = CL_TRUE;
+                     CL_LOG(CL_LOG_WARNING,"ignore connection with unexpected \
connection state");  break;
                }
             }
@@ -6091,7 +6206,6 @@
          
          if (ignore_connection == CL_FALSE && 
              cl_com_compare_endpoints(connection->receiver, &receiver)) {
-            cl_byte_t* help_data = NULL;
    
             /* send message to client (no broadcast) */
 
@@ -6116,28 +6230,24 @@
                CL_LOG(CL_LOG_ERROR,"Protocol error: haven't received such a high \
message id till now");  cl_raw_list_unlock(handle->connection_list);
                free(unique_hostname);
+                  if (copy_data == CL_TRUE) {
+                     free(help_data);
+                  }
                return CL_RETVAL_PROTOCOL_ERROR;
             }    
 
             CL_LOG_STR_STR_INT(CL_LOG_INFO, "sending it to:        ", \
                connection->receiver->comp_host,
                                                                       \
                connection->receiver->comp_name,
                                                                       \
                (int)connection->receiver->comp_id);
-            if (copy_data == CL_TRUE) {
-               help_data = (cl_byte_t*) malloc((sizeof(cl_byte_t)*size));
-               if (help_data == NULL) {
-                  cl_raw_list_unlock(handle->connection_list);
-                  free(unique_hostname);
-                  return CL_RETVAL_MALLOC;
-               }
-               memcpy(help_data, data, (sizeof(cl_byte_t)*size) );
-               return_value = cl_com_setup_message(&message, connection, help_data, \
                size,ack_type,response_mid,tag);
-            } else {
-               return_value = cl_com_setup_message(&message, connection, data, size, \
                ack_type,response_mid,tag);
-            }
+
+               return_value = cl_com_setup_message(&message, connection, help_data, \
size, ack_type,response_mid,tag);  
             if (return_value != CL_RETVAL_OK) {
                cl_raw_list_unlock(handle->connection_list);
                free(unique_hostname);
+                  if (copy_data == CL_TRUE) {
+                     free(help_data);
+                  }
                return return_value;
             }
    
@@ -6155,13 +6265,8 @@
             message_added = 1;
             
             break;
-         } else {
-            if (ignore_connection == CL_TRUE) {
-               CL_LOG(CL_LOG_WARNING,"ignore connection with unexpected connection \
                state");
-            }
          }
          elem = cl_connection_list_get_next_elem(elem);
-         connection = NULL;
       }
       cl_raw_list_unlock(handle->connection_list);
 
@@ -6172,6 +6277,9 @@
          if (return_value != CL_RETVAL_OK) {
             free(unique_hostname);
             CL_LOG_STR(CL_LOG_ERROR,"cl_commlib_open_connection() returned: \
",cl_get_error_text(return_value)); +               if (copy_data == CL_TRUE) {
+                  free(help_data);
+               }
             return return_value;
          }
          if (retry_send >= 3) {
@@ -6197,6 +6305,9 @@
       }
    } else {
       free(unique_hostname);
+         if (copy_data == CL_TRUE) {
+            free(help_data);
+         }
       return CL_RETVAL_SEND_ERROR;
    } 
  
@@ -6213,6 +6324,7 @@
    CL_LOG_INT(CL_LOG_INFO,"message acknowledge expected, waiting for ack", \
(int)my_mid);  return_value = cl_commlib_check_for_ack(handle, receiver.comp_host, \
component_name, component_id, my_mid, CL_TRUE);  free(unique_hostname);
+   }
    return return_value;
 }
 
@@ -6466,6 +6578,10 @@
    int wait_for_events = 1;
    int select_sec_timeout = 0;
    int select_usec_timeout = 100*1000;
+   cl_app_message_queue_elem_t* mq_elem = NULL;
+   int mq_return_value = CL_RETVAL_OK; 
+
+
    int message_received = 0;
    int trigger_write_thread = 0;
    cl_connection_list_elem_t* elem = NULL;
@@ -6529,6 +6645,25 @@
          } else {
             the_handler = NULL;
          }
+
+         cl_raw_list_lock(handle->send_message_queue);
+         while((mq_elem = \
cl_app_message_queue_get_first_elem(handle->send_message_queue)) != NULL) { +         \
mq_return_value = cl_commlib_send_message_to_endpoint(handle, \
mq_elem->snd_destination, +                                                           \
mq_elem->snd_ack_type, mq_elem->snd_data, +                                           \
mq_elem->snd_size, mq_elem->snd_response_mid, +                                       \
mq_elem->snd_tag);  +            /* remove queue entries */
+            cl_raw_list_remove_elem(handle->send_message_queue, mq_elem->raw_elem);
+            if (mq_return_value != CL_RETVAL_OK) {
+               CL_LOG_STR(CL_LOG_ERROR,"can't send message:", \
cl_get_error_text(mq_return_value)); +               free(mq_elem->snd_data);
+            }
+            cl_com_free_endpoint(&(mq_elem->snd_destination));
+            free(mq_elem);
+         }
+         cl_raw_list_unlock(handle->send_message_queue);
+
+
          ret_val = cl_com_open_connection_request_handler(handle->framework, 
                                                           handle->connection_list, 
                                                           the_handler,
@@ -6845,6 +6980,8 @@
    cl_com_handle_t* handle = NULL;
    int trigger_read_thread = 0;
    char tmp_string[1024];
+   cl_app_message_queue_elem_t* mq_elem = NULL;
+   int mq_return_value = CL_RETVAL_OK; 
 
    /* get pointer to cl_thread_settings_t struct */
    cl_thread_settings_t *thread_config = (cl_thread_settings_t*)t_conf; 
@@ -6883,6 +7020,24 @@
          cl_raw_list_unlock(cl_com_handle_list);
          
       } else {
+
+         cl_raw_list_lock(handle->send_message_queue);
+         while((mq_elem = \
cl_app_message_queue_get_first_elem(handle->send_message_queue)) != NULL) { +         \
mq_return_value = cl_commlib_send_message_to_endpoint(handle, \
mq_elem->snd_destination, +                                                           \
mq_elem->snd_ack_type, mq_elem->snd_data, +                                           \
mq_elem->snd_size, mq_elem->snd_response_mid, +                                       \
mq_elem->snd_tag);  +            /* remove queue entries */
+            cl_raw_list_remove_elem(handle->send_message_queue, mq_elem->raw_elem);
+            if (mq_return_value != CL_RETVAL_OK) {
+               CL_LOG_STR(CL_LOG_ERROR,"can't send message:", \
cl_get_error_text(mq_return_value)); +               free(mq_elem->snd_data);
+            }
+            cl_com_free_endpoint(&(mq_elem->snd_destination));
+            free(mq_elem);
+         }
+         cl_raw_list_unlock(handle->send_message_queue);
+
 
          /* do write select */
          ret_val = cl_com_open_connection_request_handler(handle->framework, 

File [changed]: cl_data_types.h
Url: http://gridengine.sunsource.net/source/browse/gridengine/source/libs/comm/cl_data_types.h?r1=1.25&r2=1.26
 Delta lines:  +2 -0
-------------------
--- cl_data_types.h	23 Jun 2005 12:17:00 -0000	1.25
+++ cl_data_types.h	27 Jun 2005 15:01:29 -0000	1.26
@@ -305,6 +305,8 @@
    cl_raw_list_t* allowed_host_list; /* string list with hostnames allowed to \
connect */  unsigned long next_free_client_id;
 
+   cl_raw_list_t* send_message_queue;     /* used as queue for application messages \
which have to  +                                             be send to a connection \
( used in cl_commlib_send_message()) */  cl_raw_list_t* received_message_queue; /* \
                used as queue for application messages which are ready to be
                                              handled over to the application ( used \
                in cl_commlib_receive_message() 
                                              This message queue must have a "garbage \
collector", removing 


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic