[prev in list] [next in list] [prev in thread] [next in thread] 

List:       kde-commits
Subject:    branches/work/icecream-make-it-cool/services
From:       Dirk Mueller <mueller () kde ! org>
Date:       2007-07-26 23:17:38
Message-ID: 1185491858.017355.32048.nullmailer () svn ! kde ! org
[Download RAW message or body]

SVN commit 693072 by mueller:

fix handling of control channels


 M  +85 -80    scheduler.cpp  


--- branches/work/icecream-make-it-cool/services/scheduler.cpp #693071:693072
@@ -185,7 +185,7 @@
 
 unsigned int CS::hostid_counter = 0;
 
-static map<int, MsgChannel *> fd2chan;
+static map<int, CS *> fd2cs;
 static bool exit_main_loop = false;
 
 time_t starttime;
@@ -228,7 +228,7 @@
 };
 
 // A subset of connected_hosts representing the compiler servers
-static list<CS*> css;
+static list<CS*> css, monitors, controls;
 static list<string> block_css;
 static unsigned int new_job_id;
 static map<unsigned int, Job*> jobs;
@@ -246,8 +246,6 @@
 static list<JobStat> all_job_stats;
 static JobStat cum_job_stats;
 
-static list<MsgChannel*> monitors;
-
 static float server_speed (CS *cs, Job *job = 0);
 
 /* Searches the queue for JOB and removes it.
@@ -348,15 +346,15 @@
 
 }
 
-static bool handle_end (MsgChannel *c, Msg *);
+static bool handle_end (CS *c, Msg *);
 
 static void
 notify_monitors (Msg* m)
 {
-  list<MsgChannel*>::iterator it, it_old;
+  list<CS*>::iterator it, it_old;
   for (it = monitors.begin(); it != monitors.end();)
     {
       it_old = it++; // handle_end removes it from monitors, so don't be clever
       /* If we can't send it, don't be clever, simply close this monitor.  */
       if (!(*it_old)->send_msg (*m, MsgChannel::SendNonBlocking))
         handle_end (*it_old, 0);
@@ -913,6 +911,19 @@
   time_t now = time( 0 );
   time_t min_time = MAX_SCHEDULER_PING;
 
+  for (it = controls.begin(); it != controls.end();)
+    {
+      if (now - ( *it )->last_talk >= MAX_SCHEDULER_PING) 
+        {
+	  CS *old = *it;
+          ++it;
+	  handle_end (old, 0);
+	  continue;
+        }
+      min_time = min (min_time, MAX_SCHEDULER_PING - now + ( *it )->last_talk);
+      ++it;
+    }
+
   for (it = css.begin(); it != css.end(); )
     {
       /* protocol version 27 and newer use TCP keepalive */
@@ -983,21 +994,7 @@
   if (!job)
     return false;
 
-  if (css.empty())
-    {
-      /* XXX Can't happen anymore, right?  We have a request, hence one
-	 daemon must be connected to us (the submitter), so css can't
-	 be empty.  */
-      log_error() << "no servers to handle\n";
-      abort ();
-      remove_job_request ();
-      jobs.erase( job->id );
-      notify_monitors (new MonJobDoneMsg ( job->id,  255 ));
-      // Don't delete channel here.  We expect the client on the other side
-      // to exit, and that will remove the channel in handle_end
-      delete job;
-      return false;
-    }
+  assert(!css.empty());
 
   Job *first_job = job;
   CS *cs = 0;
@@ -1192,20 +1189,19 @@
 }
 
 static bool
-handle_mon_login (MsgChannel *c, Msg *_m)
+handle_mon_login (CS *c, Msg *_m)
 {
   MonLoginMsg *m = dynamic_cast<MonLoginMsg *>(_m);
   if (!m)
     return false;
-  // This is really a CS*, but we don't need the full one here
   monitors.push_back (c);
   // monitors really want to be fed lazily
   c->setBulkTransfer();
 
-  for (list<CS*>::iterator it = css.begin(); it != css.end(); ++it)
+  for (list<CS*>::const_iterator it = css.begin(); it != css.end(); ++it)
     handle_monitor_stats( *it );
 
-  fd2chan.erase( c->fd ); // no expected data from them
+  fd2cs.erase( c->fd ); // no expected data from them
   return true;
 }
 
@@ -1243,7 +1239,7 @@
 
 
 static bool
-handle_job_done (MsgChannel *c, Msg *_m)
+handle_job_done (CS *c, Msg *_m)
 {
   JobDoneMsg *m = dynamic_cast<JobDoneMsg *>(_m);
   if ( !m )
@@ -1255,7 +1251,7 @@
     {
       // the daemon saw a cancel of what he believes is waiting in the scheduler
       map<unsigned int, Job*>::iterator mit;
-      for (mit = jobs.begin(); mit != jobs.end(); mit++)
+      for (mit = jobs.begin(); mit != jobs.end(); ++mit)
         {
           Job *job = mit->second;
           trace() << "looking for waitcs " << job->server << " " << job->submitter  << " " << c << " "
@@ -1312,7 +1308,7 @@
     {
       log_info() << "the server isn't the same for job " << m->job_id << endl;
       log_info() << "server: " << j->server->nodename << endl;
-      log_info() << "msg came from: " << ((CS*)c)->nodename << endl;
+      log_info() << "msg came from: " << c->nodename << endl;
       // the daemon is not following matz's rules: kick him
       handle_end(c, 0);
       return false;
@@ -1321,7 +1317,7 @@
     {
       log_info() << "the submitter isn't the same for job " << m->job_id << endl;
       log_info() << "submitter: " << j->submitter->nodename << endl;
-      log_info() << "msg came from: " << ((CS*)c)->nodename << endl;
+      log_info() << "msg came from: " << c->nodename << endl;
       // the daemon is not following matz's rules: kick him
       handle_end(c, 0);
       return false;
@@ -1372,12 +1368,11 @@
 }
 
 static bool
-handle_ping (MsgChannel * c, Msg * /*_m*/)
+handle_ping (CS* c, Msg * /*_m*/)
 {
   c->last_talk = time( 0 );
-  CS *cs = dynamic_cast<CS*>( c );
-  if ( cs && cs->max_jobs < 0 )
-    cs->max_jobs *= -1;
+  if ( c->max_jobs < 0 )
+    c->max_jobs *= -1;
   return true;
 }
 
@@ -1451,38 +1446,47 @@
 }
 
 static bool
-send_greeting(MsgChannel* c)
+handle_control_login(CS* c)
 {
+    c->type = CS::LINE;
+    c->last_talk = time (0);
+    c->setBulkTransfer();
+    c->state = CS::LOGGEDIN;
+    assert(find(controls.begin(), controls.end(), c) == controls.end());
+    controls.push_back(c);
+
     std::ostringstream o;
-
     o << "200-ICECC " VERSION ": "
       << time(0) - starttime << "s uptime, "
       << css.size() << " hosts, "
       << jobs.size() << " jobs in queue "
       << "(" << new_job_id << " total)." << endl;
     o << "200 Use 'help' for help and 'quit' to quit." << endl;
-
     return c->send_msg(TextMsg(o.str()));
 }
 
-
 static bool
-handle_line (MsgChannel *c, Msg *_m)
+handle_line (CS *c, Msg *_m)
 {
   TextMsg *m = dynamic_cast<TextMsg *>(_m);
   if (!m)
     return false;
+
   char buffer[1000];
   string line;
   list<string> l;
   split_string (m->text, " \t\n", l);
   string cmd;
+
+  c->last_talk = time(0);
+
   if (l.empty())
     cmd = "";
   else
     {
       cmd = l.front();
       l.pop_front();
+      transform(cmd.begin(), cmd.end(), cmd.begin(), ::tolower);
     }
   if (cmd == "listcs")
     {
@@ -1504,14 +1508,14 @@
             return false;
           for ( list<Job*>::const_iterator it2 = cs->joblist.begin(); it2 != cs->joblist.end(); ++it2 )
             if(!c->send_msg (TextMsg ("   " + dump_job (*it2) ) ))
-              break;
+              return false;
 	}
     }
   else if (cmd == "listblocks")
     {
       for (list<string>::const_iterator it = block_css.begin(); it != block_css.end(); ++it)
         if(!c->send_msg (TextMsg ("   " + (*it) ) ))
-          break;
+          return false;
     }
   else if (cmd == "listjobs")
     {
@@ -1522,6 +1526,7 @@
     }
   else if (cmd == "quit" || cmd == "exit" )
     {
+      handle_end(c, 0);
       return false;
     }
   else if (cmd == "removecs" || cmd == "blockcs")
@@ -1595,57 +1600,53 @@
 
 // return false if some error occured, leaves C open.  */
 static bool
-try_login (MsgChannel *c, Msg *m)
+try_login (CS *c, Msg *m)
 {
   bool ret = true;
-  CS *cs = static_cast<CS *>(c);
   switch (m->type)
     {
     case M_LOGIN:
-      cs->type = CS::DAEMON;
+      c->type = CS::DAEMON;
       ret = handle_login (c, m);
       break;
     case M_MON_LOGIN:
-      cs->type = CS::MONITOR;
+      c->type = CS::MONITOR;
       ret = handle_mon_login (c, m);
       break;
-    case M_TEXT:
-      cs->type = CS::LINE;
-      ret = handle_line (c, m);
-      break;
     default:
       log_info() << "Invalid first message " << (char)m->type << endl;
       ret = false;
       break;
     }
   if (ret)
-    cs->state = CS::LOGGEDIN;
+    c->state = CS::LOGGEDIN;
   else
-    handle_end (cs, m);
+    handle_end (c, m);
 
   delete m;
   return ret;
 }
 
 static bool
-handle_end (MsgChannel *c, Msg *m)
+handle_end (CS *toremove, Msg *m)
 {
 #if DEBUG_SCHEDULER > 1
-  trace() << "Handle_end " << c << " " << m << endl;
+  trace() << "Handle_end " << toremove << " " << m << endl;
 #else
   ( void )m;
 #endif
 
-  CS *toremove = static_cast<CS *>(c);
-  if (toremove->type == CS::MONITOR)
+  switch (toremove->type) {
+  case CS::MONITOR:
     {
-      assert (find (monitors.begin(), monitors.end(), c) != monitors.end());
-      monitors.remove (c);
+      assert (find (monitors.begin(), monitors.end(), toremove) != monitors.end());
+      monitors.remove (toremove);
 #if DEBUG_SCHEDULER > 1
       trace() << "handle_end(moni) " << monitors.size() << endl;
 #endif
     }
-  else if (toremove->type == CS::DAEMON)
+    break;
+  case CS::DAEMON:
     {
       log_info() << "remove daemon " << toremove->nodename << endl;
 
@@ -1704,22 +1705,29 @@
             }
         }
     }
-  else if (toremove->type == CS::LINE)
+    break;
+  case CS::LINE:
     {
-      if (!c->send_msg (TextMsg ("200 Good Bye!"))) {
+      if (!toremove->send_msg (TextMsg ("200 Good Bye!"))) {
       }
+      controls.remove (toremove);
     }
-  else
-    trace() << "remote end had UNKNOWN type?" << endl;
+    break;
+  default:
+    {
+      trace() << "remote end had UNKNOWN type?" << endl;
+      assert(!"remote end had UNKNOWN type");
+    }
+  }
 
-  fd2chan.erase (c->fd);
-  delete c;
+  fd2cs.erase (toremove->fd);
+  delete toremove;
   return true;
 }
 
 /* Returns TRUE if C was not closed.  */
 static bool
-handle_activity (MsgChannel *c)
+handle_activity (CS *c)
 {
   Msg *m;
   bool ret = true;
@@ -1730,7 +1738,7 @@
       return false;
     }
   /* First we need to login.  */
-  if (static_cast<CS *>(c)->state == CS::CONNECTED)
+  if (c->state == CS::CONNECTED)
     return try_login (c, m);
 
   switch (m->type)
@@ -1989,11 +1997,10 @@
       if (broad_fd > max_fd)
         max_fd = broad_fd;
       FD_SET (broad_fd, &read_set);
-      for (map<int, MsgChannel *>::const_iterator it = fd2chan.begin();
-           it != fd2chan.end();)
+      for (map<int, CS*>::const_iterator it = fd2cs.begin(); it != fd2cs.end();)
         {
           int i = it->first;
-          MsgChannel *c = it->second;
+          CS *c = it->second;
           bool ok = true;
           ++it;
           /* handle_activity() can delete c and make the iterator
@@ -2039,7 +2046,7 @@
                   delete cs;
                   continue;
                 }
-	      fd2chan[cs->fd] = cs;
+	      fd2cs[cs->fd] = cs;
 	      while (!cs->read_a_bit () || cs->has_msg ())
                 if(! handle_activity (cs))
                   break;
@@ -2061,14 +2068,12 @@
 	  if (remote_fd >= 0)
 	    {
 	      CS *cs = new CS (remote_fd, (struct sockaddr*) &remote_addr, remote_len, true);
-              cs->last_talk = time (0);
-              cs->setBulkTransfer();
-              if (!cs->protocol || !send_greeting(cs))
+	      fd2cs[cs->fd] = cs;
+              if (!handle_control_login(cs))
                 {
-                  delete cs;
+                  handle_end(cs, 0);
                   continue;
                 }
-	      fd2chan[cs->fd] = cs;
 	      while (!cs->read_a_bit () || cs->has_msg ())
 	        if (!handle_activity (cs))
                   break;
@@ -2109,12 +2114,12 @@
 		}
 	    }
 	}
-      for (map<int, MsgChannel *>::const_iterator it = fd2chan.begin();
-           max_fd && it != fd2chan.end();)
+      for (map<int, CS*>::const_iterator it = fd2cs.begin();
+           max_fd && it != fd2cs.end();)
         {
           int i = it->first;
-          MsgChannel *c = it->second;
-          /* handle_activity can delete the channel from the fd2chan list,
+          CS *c = it->second;
+          /* handle_activity can delete the channel from the fd2cs list,
              hence advance the iterator right now, so it doesn't become
              invalid.  */
           ++it;
@@ -2127,8 +2132,8 @@
             }
         }
     }
+  shutdown (broad_fd, SHUT_RDWR);
   close (broad_fd);
-  shutdown (broad_fd, SHUT_RDWR);
   unlink(pidFilePath.c_str());
   return 0;
 }
[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic