[prev in list] [next in list] [prev in thread] [next in thread]
List: kde-commits
Subject: branches/work/icecream-make-it-cool/services
From: Dirk Mueller <mueller () kde ! org>
Date: 2007-07-26 23:17:38
Message-ID: 1185491858.017355.32048.nullmailer () svn ! kde ! org
[Download RAW message or body]
SVN commit 693072 by mueller:
fix handling of control channels
M +85 -80 scheduler.cpp
--- branches/work/icecream-make-it-cool/services/scheduler.cpp #693071:693072
@@ -185,7 +185,7 @@
unsigned int CS::hostid_counter = 0;
-static map<int, MsgChannel *> fd2chan;
+static map<int, CS *> fd2cs;
static bool exit_main_loop = false;
time_t starttime;
@@ -228,7 +228,7 @@
};
// A subset of connected_hosts representing the compiler servers
-static list<CS*> css;
+static list<CS*> css, monitors, controls;
static list<string> block_css;
static unsigned int new_job_id;
static map<unsigned int, Job*> jobs;
@@ -246,8 +246,6 @@
static list<JobStat> all_job_stats;
static JobStat cum_job_stats;
-static list<MsgChannel*> monitors;
-
static float server_speed (CS *cs, Job *job = 0);
/* Searches the queue for JOB and removes it.
@@ -348,15 +346,15 @@
}
-static bool handle_end (MsgChannel *c, Msg *);
+static bool handle_end (CS *c, Msg *);
static void
notify_monitors (Msg* m)
{
- list<MsgChannel*>::iterator it, it_old;
+ list<CS*>::iterator it, it_old;
for (it = monitors.begin(); it != monitors.end();)
{
- it_old = it++; // handle_end removes it from monitors, so don't be clever
+ it_old = ++it;
/* If we can't send it, don't be clever, simply close this monitor. */
if (!(*it_old)->send_msg (*m, MsgChannel::SendNonBlocking))
handle_end (*it_old, 0);
@@ -913,6 +911,19 @@
time_t now = time( 0 );
time_t min_time = MAX_SCHEDULER_PING;
+ for (it = controls.begin(); it != controls.end();)
+ {
+ if (now - ( *it )->last_talk >= MAX_SCHEDULER_PING)
+ {
+ CS *old = *it;
+ ++it;
+ handle_end (old, 0);
+ continue;
+ }
+ min_time = min (min_time, MAX_SCHEDULER_PING - now + ( *it )->last_talk);
+ ++it;
+ }
+
for (it = css.begin(); it != css.end(); )
{
/* protocol version 27 and newer use TCP keepalive */
@@ -983,21 +994,7 @@
if (!job)
return false;
- if (css.empty())
- {
- /* XXX Can't happen anymore, right? We have a request, hence one
- daemon must be connected to us (the submitter), so css can't
- be empty. */
- log_error() << "no servers to handle\n";
- abort ();
- remove_job_request ();
- jobs.erase( job->id );
- notify_monitors (new MonJobDoneMsg ( job->id, 255 ));
- // Don't delete channel here. We expect the client on the other side
- // to exit, and that will remove the channel in handle_end
- delete job;
- return false;
- }
+ assert(!css.empty());
Job *first_job = job;
CS *cs = 0;
@@ -1192,20 +1189,19 @@
}
static bool
-handle_mon_login (MsgChannel *c, Msg *_m)
+handle_mon_login (CS *c, Msg *_m)
{
MonLoginMsg *m = dynamic_cast<MonLoginMsg *>(_m);
if (!m)
return false;
- // This is really a CS*, but we don't need the full one here
monitors.push_back (c);
// monitors really want to be fed lazily
c->setBulkTransfer();
- for (list<CS*>::iterator it = css.begin(); it != css.end(); ++it)
+ for (list<CS*>::const_iterator it = css.begin(); it != css.end(); ++it)
handle_monitor_stats( *it );
- fd2chan.erase( c->fd ); // no expected data from them
+ fd2cs.erase( c->fd ); // no expected data from them
return true;
}
@@ -1243,7 +1239,7 @@
static bool
-handle_job_done (MsgChannel *c, Msg *_m)
+handle_job_done (CS *c, Msg *_m)
{
JobDoneMsg *m = dynamic_cast<JobDoneMsg *>(_m);
if ( !m )
@@ -1255,7 +1251,7 @@
{
// the daemon saw a cancel of what he believes is waiting in the scheduler
map<unsigned int, Job*>::iterator mit;
- for (mit = jobs.begin(); mit != jobs.end(); mit++)
+ for (mit = jobs.begin(); mit != jobs.end(); ++mit)
{
Job *job = mit->second;
trace() << "looking for waitcs " << job->server << " " << job->submitter << " " << c << " "
@@ -1312,7 +1308,7 @@
{
log_info() << "the server isn't the same for job " << m->job_id << endl;
log_info() << "server: " << j->server->nodename << endl;
- log_info() << "msg came from: " << ((CS*)c)->nodename << endl;
+ log_info() << "msg came from: " << c->nodename << endl;
// the daemon is not following matz's rules: kick him
handle_end(c, 0);
return false;
@@ -1321,7 +1317,7 @@
{
log_info() << "the submitter isn't the same for job " << m->job_id << endl;
log_info() << "submitter: " << j->submitter->nodename << endl;
- log_info() << "msg came from: " << ((CS*)c)->nodename << endl;
+ log_info() << "msg came from: " << c->nodename << endl;
// the daemon is not following matz's rules: kick him
handle_end(c, 0);
return false;
@@ -1372,12 +1368,11 @@
}
static bool
-handle_ping (MsgChannel * c, Msg * /*_m*/)
+handle_ping (CS* c, Msg * /*_m*/)
{
c->last_talk = time( 0 );
- CS *cs = dynamic_cast<CS*>( c );
- if ( cs && cs->max_jobs < 0 )
- cs->max_jobs *= -1;
+ if ( c->max_jobs < 0 )
+ c->max_jobs *= -1;
return true;
}
@@ -1451,38 +1446,47 @@
}
static bool
-send_greeting(MsgChannel* c)
+handle_control_login(CS* c)
{
+ c->type = CS::LINE;
+ c->last_talk = time (0);
+ c->setBulkTransfer();
+ c->state = CS::LOGGEDIN;
+ assert(find(controls.begin(), controls.end(), c) == controls.end());
+ controls.push_back(c);
+
std::ostringstream o;
-
o << "200-ICECC " VERSION ": "
<< time(0) - starttime << "s uptime, "
<< css.size() << " hosts, "
<< jobs.size() << " jobs in queue "
<< "(" << new_job_id << " total)." << endl;
o << "200 Use 'help' for help and 'quit' to quit." << endl;
-
return c->send_msg(TextMsg(o.str()));
}
-
static bool
-handle_line (MsgChannel *c, Msg *_m)
+handle_line (CS *c, Msg *_m)
{
TextMsg *m = dynamic_cast<TextMsg *>(_m);
if (!m)
return false;
+
char buffer[1000];
string line;
list<string> l;
split_string (m->text, " \t\n", l);
string cmd;
+
+ c->last_talk = time(0);
+
if (l.empty())
cmd = "";
else
{
cmd = l.front();
l.pop_front();
+ transform(cmd.begin(), cmd.end(), cmd.begin(), ::tolower);
}
if (cmd == "listcs")
{
@@ -1504,14 +1508,14 @@
return false;
for ( list<Job*>::const_iterator it2 = cs->joblist.begin(); it2 != cs->joblist.end(); ++it2 )
if(!c->send_msg (TextMsg (" " + dump_job (*it2) ) ))
- break;
+ return false;
}
}
else if (cmd == "listblocks")
{
for (list<string>::const_iterator it = block_css.begin(); it != block_css.end(); ++it)
if(!c->send_msg (TextMsg (" " + (*it) ) ))
- break;
+ return false;
}
else if (cmd == "listjobs")
{
@@ -1522,6 +1526,7 @@
}
else if (cmd == "quit" || cmd == "exit" )
{
+ handle_end(c, 0);
return false;
}
else if (cmd == "removecs" || cmd == "blockcs")
@@ -1595,57 +1600,53 @@
// return false if some error occured, leaves C open. */
static bool
-try_login (MsgChannel *c, Msg *m)
+try_login (CS *c, Msg *m)
{
bool ret = true;
- CS *cs = static_cast<CS *>(c);
switch (m->type)
{
case M_LOGIN:
- cs->type = CS::DAEMON;
+ c->type = CS::DAEMON;
ret = handle_login (c, m);
break;
case M_MON_LOGIN:
- cs->type = CS::MONITOR;
+ c->type = CS::MONITOR;
ret = handle_mon_login (c, m);
break;
- case M_TEXT:
- cs->type = CS::LINE;
- ret = handle_line (c, m);
- break;
default:
log_info() << "Invalid first message " << (char)m->type << endl;
ret = false;
break;
}
if (ret)
- cs->state = CS::LOGGEDIN;
+ c->state = CS::LOGGEDIN;
else
- handle_end (cs, m);
+ handle_end (c, m);
delete m;
return ret;
}
static bool
-handle_end (MsgChannel *c, Msg *m)
+handle_end (CS *toremove, Msg *m)
{
#if DEBUG_SCHEDULER > 1
- trace() << "Handle_end " << c << " " << m << endl;
+ trace() << "Handle_end " << toremove << " " << m << endl;
#else
( void )m;
#endif
- CS *toremove = static_cast<CS *>(c);
- if (toremove->type == CS::MONITOR)
+ switch (toremove->type) {
+ case CS::MONITOR:
{
- assert (find (monitors.begin(), monitors.end(), c) != monitors.end());
- monitors.remove (c);
+ assert (find (monitors.begin(), monitors.end(), toremove) != monitors.end());
+ monitors.remove (toremove);
#if DEBUG_SCHEDULER > 1
trace() << "handle_end(moni) " << monitors.size() << endl;
#endif
}
- else if (toremove->type == CS::DAEMON)
+ break;
+ case CS::DAEMON:
{
log_info() << "remove daemon " << toremove->nodename << endl;
@@ -1704,22 +1705,29 @@
}
}
}
- else if (toremove->type == CS::LINE)
+ break;
+ case CS::LINE:
{
- if (!c->send_msg (TextMsg ("200 Good Bye!"))) {
+ if (!toremove->send_msg (TextMsg ("200 Good Bye!"))) {
}
+ controls.remove (toremove);
}
- else
- trace() << "remote end had UNKNOWN type?" << endl;
+ break;
+ default:
+ {
+ trace() << "remote end had UNKNOWN type?" << endl;
+ assert(42 != 42);
+ }
+ }
- fd2chan.erase (c->fd);
- delete c;
+ fd2cs.erase (toremove->fd);
+ delete toremove;
return true;
}
/* Returns TRUE if C was not closed. */
static bool
-handle_activity (MsgChannel *c)
+handle_activity (CS *c)
{
Msg *m;
bool ret = true;
@@ -1730,7 +1738,7 @@
return false;
}
/* First we need to login. */
- if (static_cast<CS *>(c)->state == CS::CONNECTED)
+ if (c->state == CS::CONNECTED)
return try_login (c, m);
switch (m->type)
@@ -1989,11 +1997,10 @@
if (broad_fd > max_fd)
max_fd = broad_fd;
FD_SET (broad_fd, &read_set);
- for (map<int, MsgChannel *>::const_iterator it = fd2chan.begin();
- it != fd2chan.end();)
+ for (map<int, CS*>::const_iterator it = fd2cs.begin(); it != fd2cs.end();)
{
int i = it->first;
- MsgChannel *c = it->second;
+ CS *c = it->second;
bool ok = true;
++it;
/* handle_activity() can delete c and make the iterator
@@ -2039,7 +2046,7 @@
delete cs;
continue;
}
- fd2chan[cs->fd] = cs;
+ fd2cs[cs->fd] = cs;
while (!cs->read_a_bit () || cs->has_msg ())
if(! handle_activity (cs))
break;
@@ -2061,14 +2068,12 @@
if (remote_fd >= 0)
{
CS *cs = new CS (remote_fd, (struct sockaddr*) &remote_addr, remote_len, true);
- cs->last_talk = time (0);
- cs->setBulkTransfer();
- if (!cs->protocol || !send_greeting(cs))
+ fd2cs[cs->fd] = cs;
+ if (!handle_control_login(cs))
{
- delete cs;
+ handle_end(cs, 0);
continue;
}
- fd2chan[cs->fd] = cs;
while (!cs->read_a_bit () || cs->has_msg ())
if (!handle_activity (cs))
break;
@@ -2109,12 +2114,12 @@
}
}
}
- for (map<int, MsgChannel *>::const_iterator it = fd2chan.begin();
- max_fd && it != fd2chan.end();)
+ for (map<int, CS*>::const_iterator it = fd2cs.begin();
+ max_fd && it != fd2cs.end();)
{
int i = it->first;
- MsgChannel *c = it->second;
- /* handle_activity can delete the channel from the fd2chan list,
+ CS *c = it->second;
+ /* handle_activity can delete the channel from the fd2cs list,
hence advance the iterator right now, so it doesn't become
invalid. */
++it;
@@ -2127,8 +2132,8 @@
}
}
}
+ shutdown (broad_fd, SHUT_RDWR);
close (broad_fd);
- shutdown (broad_fd, SHUT_RDWR);
unlink(pidFilePath.c_str());
return 0;
}
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic