Okay, I have run some tests on my approach to address the potential deadlock
issue when scheduling high-level jobs. The idea is as follows:

1.) A FileCopyJob is created as a result of the application calling either
KIO::file_copy directly or a higher-level copy (e.g. KIO::copy), which issues
one FileCopyJob per file.
2.) Step #1 results in the creation of a "PUT" job.
3.) A meta-data entry named "paired_request" is set on the "PUT" job created
in step #2, containing the source url. This is done in
FileCopyJob::startBestCopyMethod.
4.) When the scheduler is asked to create a slave for this "PUT" job, it
saves any url specified in the "paired_request" meta-data in a reserve list.
5.) The scheduler always takes the number of items in the reserve list into
account before attempting to assign an ioslave to a job (see the standalone
sketch below, after the quoted discussion).

The above process ensures that "PUT" jobs cannot grab all of the available
ioslaves while their paired "GET" requests are still waiting to be assigned
one.

I tested it with as many scenarios as I could think of, but it is entirely
possible I completely missed something. Anyhow, all feedback is welcome...

NOTE: I do not like using the meta-data system for this, but this is a quick
proof of concept. Perhaps there is a better way to do it...

On Wednesday 16 September 2009 13:22:41 Dawit A. wrote:
> David,
>
> Yes, now that I understand how the high level jobs work, I completely got
> your concern about the potential for a deadlocked condition.
>
> Right now I am working on a solution to eliminate this deadlock condition
> from occurring in the scheduler. There is a way to do this by pairing the
> requests from high level jobs so that the scheduler can take that into
> account when it is scheduling jobs.
>
> More about that once I refine and test the solution to see whether or not
> it is viable and does solve the deadlock problem...
>
> On Wednesday 16 September 2009 11:52:44 David Faure wrote:
> > On Tuesday 08 September 2009, Dawit A. wrote:
> > [snipped]
> >
> > > Can you give an example of how to trigger this dead lock ? I suppose I
> > > can simply start copying files from remote locations (sftp/ftp) until
> > > the max instances limit is reached, no ?
> >
> > Maybe. I admit I didn't actually try, but it seems logical to me, with
> > the above reasoning. To get many FileCopyJobs started, I recommend
> > copying a whole directory of files. That gives time to start another
> > directory copy while it's happening. Each file being copied will start
> > a FileCopyJob.
>
> Just for clarification, the deadlock condition can only occur if both ends
> of the high level job are remote urls, correct ? That is, both the put and
> get operations must be remote; otherwise they are handled differently and
> the scheduling does not come into the equation... Or did I get that wrong ?
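For anyone who wants to play with the accounting outside of kdelibs, here is
a minimal, self-contained sketch of the reserve-list idea from steps #4 and
#5. It is plain C++ with std containers rather than the real KIO/Qt types,
and every name in it (ProtocolInfo, effectiveCount, mayAssignSlave, the
example host and the limit of 2) is made up purely for illustration:

// Standalone sketch (not the KIO classes): a per-protocol "reserve list"
// whose entries count against the per-host slave limit until the paired
// request arrives and consumes its reservation.
#include <algorithm>
#include <iostream>
#include <string>
#include <vector>

struct ProtocolInfo {
    int maxSlavesPerHost = 2;              // hypothetical limit; < 1 means unlimited
    std::vector<std::string> activeHosts;  // host of each currently active ioslave
    std::vector<std::string> reserveList;  // urls reserved for paired requests

    // Active slaves on this host, plus all outstanding reservations --
    // unless this job *is* a reserved paired request, in which case its
    // reservation is consumed instead of counted.
    int effectiveCount(const std::string &host, const std::string &url) {
        int count = static_cast<int>(
            std::count(activeHosts.begin(), activeHosts.end(), host));
        auto it = std::find(reserveList.begin(), reserveList.end(), url);
        if (it != reserveList.end())
            reserveList.erase(it);          // paired request arrived: release slot
        else
            count += static_cast<int>(reserveList.size());
        return count;
    }

    bool mayAssignSlave(const std::string &host, const std::string &url) {
        return maxSlavesPerHost < 1 ||
               effectiveCount(host, url) < maxSlavesPerHost;
    }
};

int main() {
    ProtocolInfo ftp;

    // A PUT job has been scheduled; the source url of its paired GET is reserved.
    ftp.activeHosts.push_back("ftp.example.org");
    ftp.reserveList.push_back("ftp://ftp.example.org/src/file1");

    // A second, unrelated PUT must not take the slot held for the paired GET.
    std::cout << std::boolalpha
              << ftp.mayAssignSlave("ftp.example.org",
                                    "ftp://ftp.example.org/dst/file2")
              << '\n';                      // false: the slot is reserved

    // The paired GET itself is allowed and consumes the reservation.
    std::cout << ftp.mayAssignSlave("ftp.example.org",
                                    "ftp://ftp.example.org/src/file1")
              << '\n';                      // true
    return 0;
}

The attached patch does the equivalent bookkeeping in
ProtocolInfo::activeSlavesCountFor() and checkLimits() in scheduler.cpp; note
that, like the patch, the sketch consumes the reservation as a side effect of
the check when the paired request itself shows up.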
Attached: sched_jobs.patch

Index: job.cpp
===================================================================
--- job.cpp	(revision 1024592)
+++ job.cpp	(working copy)
@@ -308,9 +308,9 @@
     }
 
     Scheduler::doJob(q);
+    Scheduler::scheduleJob(q);
 }
 
-
 bool SimpleJob::doKill()
 {
     //kDebug() << this;
@@ -627,6 +627,7 @@
         // Return slave to the scheduler
         d->slaveDone();
         Scheduler::doJob(this);
+        Scheduler::scheduleJob(this);
     }
 }
 
@@ -834,6 +835,7 @@
         // Return slave to the scheduler
         d->slaveDone();
         Scheduler::doJob(this);
+        Scheduler::scheduleJob(this);
     }
 }
 
@@ -999,6 +1001,7 @@
         // Return slave to the scheduler
         d->slaveDone();
         Scheduler::doJob(this);
+        Scheduler::scheduleJob(this);
     }
 }
 
@@ -1597,6 +1600,7 @@
         // Return slave to the scheduler
         d->slaveDone();
         Scheduler::doJob(this);
+        Scheduler::scheduleJob(this);
     }
 }
 
@@ -1803,6 +1807,8 @@
     else
     {
         startDataPump();
+        if (m_putJob)
+            m_putJob->addMetaData("paired_request", m_src.url());
     }
 }
 
@@ -2420,6 +2426,7 @@
         // Return slave to the scheduler
         d->slaveDone();
         Scheduler::doJob(this);
+        Scheduler::scheduleJob(this);
     }
 }
 
@@ -2671,6 +2678,7 @@
             d->m_url = d->m_waitQueue.first().url;
             d->slaveDone();
             Scheduler::doJob(this);
+            Scheduler::scheduleJob(this);
         }
     }
 }
Index: scheduler.cpp
===================================================================
--- scheduler.cpp	(revision 1024592)
+++ scheduler.cpp	(working copy)
@@ -136,8 +136,8 @@
     void publishSlaveOnHold();
     void registerWindow(QWidget *wid);
 
-    Slave *findIdleSlave(ProtocolInfo *protInfo, SimpleJob *job, bool &exact);
-    Slave *createSlave(ProtocolInfo *protInfo, SimpleJob *job, const KUrl &url);
+    Slave *findIdleSlave(ProtocolInfo *protInfo, SimpleJob *job, bool &exact, bool enforeLimits = false);
+    Slave *createSlave(ProtocolInfo *protInfo, SimpleJob *job, const KUrl& url, bool enforceLimits = false);
 
     void debug_info();
 
@@ -162,7 +162,7 @@
 class KIO::SchedulerPrivate::ProtocolInfo
 {
 public:
-    ProtocolInfo() : maxSlaves(1), skipCount(0)
+    ProtocolInfo() : maxSlaves(1), maxSlavesPerHost(0), skipCount(0)
     {
     }
 
@@ -181,16 +181,64 @@
         return ret;
     }
 
+    int activeSlavesCountFor(SimpleJob* job)
+    {
+        int count = 0;
+        QString host = job->url().host();
+
+        if (!host.isEmpty())
+        {
+            QListIterator<Slave*> it (activeSlaves);
+            while (it.hasNext())
+            {
+                if (host == it.next()->host())
+                    count++;
+            }
+
+            QString url = job->url().url();
+            kDebug() << "*** Reserve list count: " << reserveList.count();
+
+            if (reserveList.contains(url)) {
+                kDebug() << "*** Removing paired request for: " << url;
+                reserveList.removeOne(url);
+            } else {
+                count += reserveList.count();
+            }
+        }
+
+        return count;
+    }
+
+    QStringList reserveList;
     QList<SimpleJob *> joblist;
     SlaveList activeSlaves;
     SlaveList idleSlaves;
     CoSlaveMap coSlaves;
    SlaveList coIdleSlaves;
     int maxSlaves;
+    int maxSlavesPerHost;
     int skipCount;
     QString protocol;
 };
 
+static inline bool checkLimits(KIO::SchedulerPrivate::ProtocolInfo *protInfo, SimpleJob *job)
+{
+    const int numActiveSlaves = protInfo->activeSlavesCountFor(job);
+
+#if 0
+    kDebug() << job->url() << ": ";
+    kDebug() << "    protocol :" << job->url().protocol()
+             << ", max :" << protInfo->maxSlaves
+             << ", max host :" << protInfo->maxSlavesPerHost
+             << ", active :" << protInfo->activeSlaves.count()
+             << ", idle :" << protInfo->idleSlaves.count()
+             << ", active for " << job->url().host() << " = " << numActiveSlaves;
+#endif
+
+    return (protInfo->maxSlavesPerHost < 1 ||
+            protInfo->maxSlavesPerHost > numActiveSlaves);
+}
+
+
 KIO::SchedulerPrivate::ProtocolInfo *
 KIO::SchedulerPrivate::ProtocolInfoDict::get(const QString &protocol)
 {
@@ -200,6 +248,7 @@
         info = new ProtocolInfo;
         info->protocol = protocol;
         info->maxSlaves = KProtocolInfo::maxSlaves( protocol );
+        info->maxSlavesPerHost = KProtocolInfo::maxSlavesPerHost( protocol );
         insert(protocol, info);
     }
 
@@ -382,9 +431,10 @@
     }
 }
 
-void SchedulerPrivate::doJob(SimpleJob *job) {
+void SchedulerPrivate::doJob(SimpleJob *job) {
     JobData jobData;
     jobData.protocol = KProtocolManager::slaveProtocol(job->url(), jobData.proxy);
+    // kDebug(7006) << "protocol=" << jobData->protocol;
 
     if (jobCommand(job) == CMD_GET)
     {
@@ -401,15 +451,17 @@
 }
 
 void SchedulerPrivate::scheduleJob(SimpleJob *job) {
+    newJobs.removeOne(job);
     const JobData& jobData = extraJobData.value(job);
     QString protocol = jobData.protocol;
 //    kDebug(7006) << "protocol=" << protocol;
     ProtocolInfo *protInfo = protInfoDict.get(protocol);
-    protInfo->joblist.append(job);
-
-    slaveTimer.start(0);
+    if (!protInfo->joblist.contains(job)) { // scheduleJob already called for this job?
+        protInfo->joblist.append(job);
+        slaveTimer.start(0);
+    }
 }
 
 void SchedulerPrivate::cancelJob(SimpleJob *job) {
@@ -422,6 +474,7 @@
     newJobs.removeAll(job);
     ProtocolInfo *protInfo = protInfoDict.get(jobData.protocol);
     protInfo->joblist.removeAll(job);
+    protInfo->reserveList.removeAll(job->url().url());
 
     // Search all slaves to see if job is in the queue of a coSlave
     foreach(Slave* coSlave, protInfo->allSlaves())
@@ -520,15 +573,15 @@
 
     SimpleJob *job = 0;
     Slave *slave = 0;
-    
+
     if (protInfo->skipCount > 2)
     {
        bool dummy;
        // Prevent starvation. We skip the first entry in the queue at most
        // 2 times in a row. The
        protInfo->skipCount = 0;
-       job = protInfo->joblist.at(0);
-       slave = findIdleSlave(protInfo, job, dummy );
+       job = protInfo->joblist.at(0);
+       slave = findIdleSlave(protInfo, job, dummy, true);
     }
     else
     {
@@ -538,7 +591,8 @@
        for(int i = 0; (i < protInfo->joblist.count()) && (i < 10); i++)
        {
           job = protInfo->joblist.at(i);
-          slave = findIdleSlave(protInfo, job, exact);
+          slave = findIdleSlave(protInfo, job, exact, true);
+
          if (!firstSlave)
          {
             firstJob = job;
@@ -561,28 +615,33 @@
 
    if (!slave)
    {
-       if ( protInfo->maxSlaves > static_cast<int>(protInfo->activeSlaves.count()) )
-       {
-          newSlave = true;
-          slave = createSlave(protInfo, job, job->url());
-          if (!slave)
-             slaveTimer.start(0);
-       }
+       slave = createSlave(protInfo, job, job->url(), true);
+       if (slave)
+          newSlave = true;
+       else
+          slaveTimer.start(0);
    }
 
    if (!slave)
    {
-//       kDebug(7006) << "No slaves available";
-//       kDebug(7006) << " -- active: " << protInfo->activeSlaves.count();
+       //kDebug() << "No slaves available";
+       //kDebug() << " -- active: " << protInfo->activeSlaves.count();
       return false;
    }
 
+   KIO::MetaData metaData = job->outgoingMetaData();
+   if (metaData.contains("paired_request")) {
+       KUrl url (metaData.take("paired_request"));
+       kDebug() << "*** PAIRED REQUEST: " << url;
+       protInfoDict.get(url.protocol())->reserveList << url.url();
+       job->setMetaData(metaData);
+   }
+
    protInfo->activeSlaves.append(slave);
    protInfo->idleSlaves.removeAll(slave);
    protInfo->joblist.removeOne(job);
//       kDebug(7006) << "scheduler: job started " << job;
-
    SchedulerPrivate::JobData jobData = extraJobData.value(job);
    setupSlave(slave, job->url(), jobData.protocol, jobData.proxy, newSlave);
    startJob(job, slave);
@@ -635,6 +694,8 @@
 
    foreach( Slave *slave, idleSlaves )
    {
+//       kDebug() << "job protocol: " << protocol << ", slave protocol: " << slave->slaveProtocol();
+//       kDebug() << "job host: " << host << ", slave host: " << slave->host();
       if ((protocol == slave->slaveProtocol()) &&
           (host == slave->host()) &&
           (port == slave->port()) &&
@@ -652,82 +713,100 @@
    return 0;
 }
 
-Slave *SchedulerPrivate::findIdleSlave(ProtocolInfo *protInfo, SimpleJob *job, bool &exact)
+Slave *SchedulerPrivate::findIdleSlave(ProtocolInfo *protInfo, SimpleJob *job,
+                                       bool &exact, bool enforceLimits)
 {
    Slave *slave = 0;
-   JobData jobData = extraJobData.value(job);
-   if (jobData.checkOnHold)
+
+   if (!enforceLimits || checkLimits(protInfo, job))
    {
-      slave = Slave::holdSlave(jobData.protocol, job->url());
-      if (slave)
-         return slave;
+       JobData jobData = extraJobData.value(job);
+
+       if (jobData.checkOnHold)
+       {
+           slave = Slave::holdSlave(jobData.protocol, job->url());
+           if (slave)
+               return slave;
+       }
+
+       if (slaveOnHold)
+       {
+           // Make sure that the job wants to do a GET or a POST, and with no offset
+           bool bCanReuse = (jobCommand(job) == CMD_GET);
+           KIO::TransferJob * tJob = qobject_cast<KIO::TransferJob *>(job);
+           if ( tJob )
+           {
+               bCanReuse = (jobCommand(job) == CMD_GET || jobCommand(job) == CMD_SPECIAL);
+               if ( bCanReuse )
+               {
+                   KIO::MetaData outgoing = tJob->outgoingMetaData();
+                   QString resume = (!outgoing.contains("resume")) ? QString() : outgoing["resume"];
+                   kDebug(7006) << "Resume metadata is" << resume;
+                   bCanReuse = (resume.isEmpty() || resume == "0");
+               }
+           }
+//           kDebug(7006) << "bCanReuse = " << bCanReuse;
+           if (bCanReuse)
+           {
+               if (job->url() == urlOnHold)
+               {
+                   kDebug(7006) << "HOLD: Reusing held slave for" << urlOnHold;
+                   slave = slaveOnHold;
+               }
+               else
+               {
+                   kDebug(7006) << "HOLD: Discarding held slave (" << urlOnHold << ")";
+                   slaveOnHold->kill();
+               }
+               slaveOnHold = 0;
+               urlOnHold = KUrl();
+           }
+           if (slave)
+               return slave;
+       }
+
+       slave = searchIdleList(protInfo->idleSlaves, job->url(), jobData.protocol, exact);
    }
-   if (slaveOnHold)
+
+   return slave;
+}
+
+Slave *SchedulerPrivate::createSlave(ProtocolInfo *protInfo, SimpleJob *job,
+                                     const KUrl &url, bool enforceLimits)
+{
+   Slave *slave = 0;
+   const int slavesCount = protInfo->activeSlaves.count() + protInfo->idleSlaves.count();
+
+   if (!enforceLimits ||
+       (protInfo->maxSlaves > slavesCount && checkLimits(protInfo, job)))
   {
-      // Make sure that the job wants to do a GET or a POST, and with no offset
-      bool bCanReuse = (jobCommand(job) == CMD_GET);
-      KIO::TransferJob * tJob = qobject_cast<KIO::TransferJob *>(job);
-      if ( tJob )
-      {
-         bCanReuse = (jobCommand(job) == CMD_GET || jobCommand(job) == CMD_SPECIAL);
-         if ( bCanReuse )
-         {
-            KIO::MetaData outgoing = tJob->outgoingMetaData();
-            QString resume = (!outgoing.contains("resume")) ? QString() : outgoing["resume"];
-            kDebug(7006) << "Resume metadata is" << resume;
-            bCanReuse = (resume.isEmpty() || resume == "0");
-         }
-      }
-//      kDebug(7006) << "bCanReuse = " << bCanReuse;
-      if (bCanReuse)
-      {
-         if (job->url() == urlOnHold)
-         {
-            kDebug(7006) << "HOLD: Reusing held slave for" << urlOnHold;
-            slave = slaveOnHold;
-         }
-         else
-         {
-            kDebug(7006) << "HOLD: Discarding held slave (" << urlOnHold << ")";
-            slaveOnHold->kill();
-         }
-         slaveOnHold = 0;
-         urlOnHold = KUrl();
-      }
-      if (slave)
-         return slave;
+      int error;
+      QString errortext;
+      slave = Slave::createSlave(protInfo->protocol, url, error, errortext);
+
+      if (slave)
+      {
+         protInfo->idleSlaves.append(slave);
+         q->connect(slave, SIGNAL(slaveDied(KIO::Slave *)),
+                    SLOT(slotSlaveDied(KIO::Slave *)));
+         q->connect(slave, SIGNAL(slaveStatus(pid_t,const QByteArray&,const QString &, bool)),
+                    SLOT(slotSlaveStatus(pid_t,const QByteArray&, const QString &, bool)));
+      }
+      else
+      {
+         kError() << "couldn't create slave:" << errortext;
+         if (job)
+         {
+            protInfo->joblist.removeAll(job);
+            extraJobData.remove(job);
+            job->slotError( error, errortext );
+         }
+      }
   }
-   return searchIdleList(protInfo->idleSlaves, job->url(), jobData.protocol, exact);
+
+   return slave;
 }
 
-Slave *SchedulerPrivate::createSlave(ProtocolInfo *protInfo, SimpleJob *job, const KUrl &url)
-{
-   int error;
-   QString errortext;
-   Slave *slave = Slave::createSlave(protInfo->protocol, url, error, errortext);
-   if (slave)
-   {
-      protInfo->idleSlaves.append(slave);
-      q->connect(slave, SIGNAL(slaveDied(KIO::Slave *)),
-                 SLOT(slotSlaveDied(KIO::Slave *)));
-      q->connect(slave, SIGNAL(slaveStatus(pid_t,const QByteArray&,const QString &, bool)),
-                 SLOT(slotSlaveStatus(pid_t,const QByteArray&, const QString &, bool)));
-   }
-   else
-   {
-      kError() << "couldn't create slave:" << errortext;
-      if (job)
-      {
-         protInfo->joblist.removeAll(job);
-         extraJobData.remove(job);
-         job->slotError( error, errortext );
-      }
-   }
-   return slave;
-}
-
 void SchedulerPrivate::slotSlaveStatus(pid_t, const QByteArray&, const QString &, bool)
 {
 }
@@ -955,16 +1034,15 @@
 {
//    kDebug(7006) << "_assignJobToSlave( " << job << ", " << slave << ")";
    QString dummy;
-   if ((slave->slaveProtocol() != KProtocolManager::slaveProtocol( job->url(), dummy ))
-       ||
-       (!newJobs.removeAll(job)))
+   ProtocolInfo *protInfo = protInfoDict.get(slave->protocol());
+   if ((slave->slaveProtocol() != KProtocolManager::slaveProtocol( job->url(), dummy )) ||
+       !(protInfo->joblist.removeAll(job) > 0 || newJobs.removeAll(job) > 0))
    {
        kDebug(7006) << "_assignJobToSlave(): ERROR, nonmatching or unknown job.";
        job->kill();
        return false;
    }
 
-   ProtocolInfo *protInfo = protInfoDict.get(slave->protocol());
    JobList *list = protInfo->coSlaves.value(slave);
    assert(list);
    if (!list)