--Boundary-00=_zFlsKf3Gr11mkr9 Content-Type: Text/Plain; charset="iso-8859-15" Content-Transfer-Encoding: 7bit Updated patch. * Replace the use of metadata container with a member variable addition to the SimpleJobPrivate class. On Wednesday 16 September 2009 18:44:49 Dawit A. wrote: > Okay, I have run some tests on my approach to address the potential > deadlock issue when scheduling high-level jobs. The idea is as follows: > > 1.) FileCopyJob is called as a result of either KIO::file_copy by > application. > > 2.) Step #2 results in a creation of "PUT" job. > > 3.) A meta data named "paired-request" is set at step #3 with the source > url. This is done in FileCopyJob::startBestCopyMethod. > > 4.) When the scheduler is asked to create a slave for this "PUT" job, it > will save any url specified in the "paired-request" meta-data in a reserve > list. > > 5.) The scheduler always takes into account the number of items in the > reserved list before attempting to assign any ioslave to a job. > > The above process ensures that no two "PUT" jobs are assigned an ioslave > before a "PUT/GET" pair. > > I tested it with as many scenarios as I can think of, but it is entirely > possible I completely missed something. Anyhow, all feedback is welcome... > > NOTE: I do not like to use the meta-data system for this, but this is a quick > proof of concept. Perhaps there is a better way to do this... > > On Wednesday 16 September 2009 13:22:41 Dawit A. wrote: > > David, > > > > Yes, now that I understand how the high level jobs work, I completely got > > your concern about the potential for a deadlocked condition. > > > > Right now I am working on a solution to prevent this deadlock condition > > from occurring in the scheduler. There is a way to do this by pairing the > > requests from high level jobs so that the scheduler can take that into > > account when it is scheduling jobs. 
> > > > More about that once I refine and test the solution to see whether or not > > it is viable and does solve the deadlock problem... > > > > On Wednesday 16 September 2009 11:52:44 David Faure wrote: > > > On Tuesday 08 September 2009, Dawit A. wrote: > > > > [snipped] > > > > > > Can you give an example of how to trigger this > > > > dead lock ? I suppose I can simply start copying files from remote > > > > locations (sftp/ftp) until the max instances limit is reached, no ? > > > > > > Maybe. I admit I didn't actually try, but it seems logical to me, with > > > the above reasoning. To get many filecopyjobs started, I recommend > > > copying a whole directory of files. That gives time to start another > > > directory copy while it's happening. Each file being copied will start > > > a FileCopyJob. > > > > Just for clarification, the dead lock condition can only occur if both > > ends of the high level job are remote urls, correct ? That is both the > > put and get operation must be remote otherwise they are handled > > differently and the scheduling does not come into the equation... Or did > > I get that wrong ? 
> --Boundary-00=_zFlsKf3Gr11mkr9 Content-Type: text/x-patch; charset="UTF-8"; name="kio.patch" Content-Transfer-Encoding: 7bit Content-Disposition: attachment; filename="kio.patch" Index: kio/kio/job.cpp =================================================================== --- kio/kio/job.cpp (revision 1024592) +++ kio/kio/job.cpp (working copy) @@ -308,9 +308,9 @@ } Scheduler::doJob(q); + Scheduler::scheduleJob(q); } - bool SimpleJob::doKill() { //kDebug() << this; @@ -627,6 +627,7 @@ // Return slave to the scheduler d->slaveDone(); Scheduler::doJob(this); + Scheduler::scheduleJob(this); } } @@ -834,6 +835,7 @@ // Return slave to the scheduler d->slaveDone(); Scheduler::doJob(this); + Scheduler::scheduleJob(this); } } @@ -999,6 +1001,7 @@ // Return slave to the scheduler d->slaveDone(); Scheduler::doJob(this); + Scheduler::scheduleJob(this); } } @@ -1597,6 +1600,7 @@ // Return slave to the scheduler d->slaveDone(); Scheduler::doJob(this); + Scheduler::scheduleJob(this); } } @@ -1803,6 +1807,8 @@ else { startDataPump(); + if (m_putJob) + SimpleJobPrivate::get(m_putJob)->m_pairedUrl = m_src; } } @@ -2420,6 +2426,7 @@ // Return slave to the scheduler d->slaveDone(); Scheduler::doJob(this); + Scheduler::scheduleJob(this); } } @@ -2671,6 +2678,7 @@ d->m_url = d->m_waitQueue.first().url; d->slaveDone(); Scheduler::doJob(this); + Scheduler::scheduleJob(this); } } } Index: kio/kio/job_p.h =================================================================== --- kio/kio/job_p.h (revision 1024592) +++ kio/kio/job_p.h (working copy) @@ -98,6 +98,13 @@ KUrl m_url; KUrl m_subUrl; int m_command; + /* + In high-level jobs such as FileCopyJob, this variable represents the + source (GET) url and used by KIO::Scheduler to avoid deadlock conditions + when scheduling jobs with two remote ends, e.g. copying file from one ftp + server to another one. 
+ */ + KUrl m_pairedUrl; void simpleJobInit(); Index: kio/kio/scheduler.cpp =================================================================== --- kio/kio/scheduler.cpp (revision 1024592) +++ kio/kio/scheduler.cpp (working copy) @@ -55,6 +55,11 @@ return SimpleJobPrivate::get(job)->m_command; } +static inline KUrl pairedRequest(SimpleJob *job) +{ + return SimpleJobPrivate::get(job)->m_pairedUrl; +} + static inline void startJob(SimpleJob *job, Slave *slave) { SimpleJobPrivate::get(job)->start(slave); @@ -136,8 +141,8 @@ void publishSlaveOnHold(); void registerWindow(QWidget *wid); - Slave *findIdleSlave(ProtocolInfo *protInfo, SimpleJob *job, bool &exact); - Slave *createSlave(ProtocolInfo *protInfo, SimpleJob *job, const KUrl &url); + Slave *findIdleSlave(ProtocolInfo *protInfo, SimpleJob *job, bool &exact, bool enforeLimits = false); + Slave *createSlave(ProtocolInfo *protInfo, SimpleJob *job, const KUrl& url, bool enforceLimits = false); void debug_info(); @@ -162,7 +167,7 @@ class KIO::SchedulerPrivate::ProtocolInfo { public: - ProtocolInfo() : maxSlaves(1), skipCount(0) + ProtocolInfo() : maxSlaves(1), maxSlavesPerHost(0), skipCount(0) { } @@ -181,16 +186,64 @@ return ret; } + int activeSlavesCountFor(SimpleJob* job) + { + int count = 0; + QString host = job->url().host(); + + if (!host.isEmpty()) + { + QListIterator it (activeSlaves); + while (it.hasNext()) + { + if (host == it.next()->host()) + count++; + } + + QString url = job->url().url(); + kDebug() << "*** Reserve list count: " << reserveList.count(); + + if (reserveList.contains(url)) { + kDebug() << "*** Removing paired request for: " << url; + reserveList.removeOne(url); + } else { + count += reserveList.count(); + } + } + + return count; + } + + QStringList reserveList; QList joblist; SlaveList activeSlaves; SlaveList idleSlaves; CoSlaveMap coSlaves; SlaveList coIdleSlaves; int maxSlaves; + int maxSlavesPerHost; int skipCount; QString protocol; }; +static inline bool 
checkLimits(KIO::SchedulerPrivate::ProtocolInfo *protInfo, SimpleJob *job) +{ + const int numActiveSlaves = protInfo->activeSlavesCountFor(job); + +#if 0 + kDebug() << job->url() << ": "; + kDebug() << " protocol :" << job->url().protocol() + << ", max :" << protInfo->maxSlaves + << ", max host :" << protInfo->maxSlavesPerHost + << ", active :" << protInfo->activeSlaves.count() + << ", idle :" << protInfo->idleSlaves.count() + << ", active for " << job->url().host() << " = " << numActiveSlaves; +#endif + + return (protInfo->maxSlavesPerHost < 1 || protInfo->maxSlavesPerHost > numActiveSlaves); +} + + KIO::SchedulerPrivate::ProtocolInfo * KIO::SchedulerPrivate::ProtocolInfoDict::get(const QString &protocol) { @@ -200,6 +253,7 @@ info = new ProtocolInfo; info->protocol = protocol; info->maxSlaves = KProtocolInfo::maxSlaves( protocol ); + info->maxSlavesPerHost = KProtocolInfo::maxSlavesPerHost( protocol ); insert(protocol, info); } @@ -382,9 +436,10 @@ } } -void SchedulerPrivate::doJob(SimpleJob *job) { +void SchedulerPrivate::doJob(SimpleJob *job) { JobData jobData; jobData.protocol = KProtocolManager::slaveProtocol(job->url(), jobData.proxy); + // kDebug(7006) << "protocol=" << jobData->protocol; if (jobCommand(job) == CMD_GET) { @@ -401,15 +456,17 @@ } void SchedulerPrivate::scheduleJob(SimpleJob *job) { + newJobs.removeOne(job); const JobData& jobData = extraJobData.value(job); QString protocol = jobData.protocol; // kDebug(7006) << "protocol=" << protocol; ProtocolInfo *protInfo = protInfoDict.get(protocol); - protInfo->joblist.append(job); - - slaveTimer.start(0); + if (!protInfo->joblist.contains(job)) { // scheduleJob already called for this job? 
+ protInfo->joblist.append(job); + slaveTimer.start(0); + } } void SchedulerPrivate::cancelJob(SimpleJob *job) { @@ -422,6 +479,7 @@ newJobs.removeAll(job); ProtocolInfo *protInfo = protInfoDict.get(jobData.protocol); protInfo->joblist.removeAll(job); + protInfo->reserveList.removeAll(job->url().url()); // Search all slaves to see if job is in the queue of a coSlave foreach(Slave* coSlave, protInfo->allSlaves()) @@ -520,15 +578,15 @@ SimpleJob *job = 0; Slave *slave = 0; - + if (protInfo->skipCount > 2) { bool dummy; // Prevent starvation. We skip the first entry in the queue at most // 2 times in a row. The protInfo->skipCount = 0; - job = protInfo->joblist.at(0); - slave = findIdleSlave(protInfo, job, dummy ); + job = protInfo->joblist.at(0); + slave = findIdleSlave(protInfo, job, dummy, true); } else { @@ -538,7 +596,8 @@ for(int i = 0; (i < protInfo->joblist.count()) && (i < 10); i++) { job = protInfo->joblist.at(i); - slave = findIdleSlave(protInfo, job, exact); + slave = findIdleSlave(protInfo, job, exact, true); + if (!firstSlave) { firstJob = job; @@ -561,28 +620,37 @@ if (!slave) { - if ( protInfo->maxSlaves > static_cast(protInfo->activeSlaves.count()) ) - { - newSlave = true; - slave = createSlave(protInfo, job, job->url()); - if (!slave) - slaveTimer.start(0); - } + slave = createSlave(protInfo, job, job->url(), true); + if (slave) + newSlave = true; + else + slaveTimer.start(0); } if (!slave) { -// kDebug(7006) << "No slaves available"; -// kDebug(7006) << " -- active: " << protInfo->activeSlaves.count(); + //kDebug() << "No slaves available"; + //kDebug() << " -- active: " << protInfo->activeSlaves.count(); return false; } + // Check to make sure the scheduling of this job is dependent on another + // job request yet to arrive and if it is add the url of the new job to + // to the reserve list. This is done to avoid any potential deadlock + // conditions that might occur as a result of scheduling high level jobs, + // e.g. 
cipying file from one ftp server to another one. + KUrl url = pairedRequest(job); + if (url.isValid()) + { + kDebug() << "*** PAIRED REQUEST: " << url; + protInfoDict.get(url.protocol())->reserveList << url.url(); + } + protInfo->activeSlaves.append(slave); protInfo->idleSlaves.removeAll(slave); protInfo->joblist.removeOne(job); // kDebug(7006) << "scheduler: job started " << job; - SchedulerPrivate::JobData jobData = extraJobData.value(job); setupSlave(slave, job->url(), jobData.protocol, jobData.proxy, newSlave); startJob(job, slave); @@ -635,6 +703,8 @@ foreach( Slave *slave, idleSlaves ) { +// kDebug() << "job protocol: " << protocol << ", slave protocol: " << slave->slaveProtocol(); +// kDebug() << "job host: " << host << ", slave host: " << slave->host(); if ((protocol == slave->slaveProtocol()) && (host == slave->host()) && (port == slave->port()) && @@ -652,82 +722,100 @@ return 0; } -Slave *SchedulerPrivate::findIdleSlave(ProtocolInfo *protInfo, SimpleJob *job, bool &exact) +Slave *SchedulerPrivate::findIdleSlave(ProtocolInfo *protInfo, SimpleJob *job, + bool &exact, bool enforceLimits) { Slave *slave = 0; - JobData jobData = extraJobData.value(job); - if (jobData.checkOnHold) + if (!enforceLimits || checkLimits(protInfo, job)) { - slave = Slave::holdSlave(jobData.protocol, job->url()); - if (slave) - return slave; + JobData jobData = extraJobData.value(job); + + if (jobData.checkOnHold) + { + slave = Slave::holdSlave(jobData.protocol, job->url()); + if (slave) + return slave; + } + + if (slaveOnHold) + { + // Make sure that the job wants to do a GET or a POST, and with no offset + bool bCanReuse = (jobCommand(job) == CMD_GET); + KIO::TransferJob * tJob = qobject_cast(job); + if ( tJob ) + { + bCanReuse = (jobCommand(job) == CMD_GET || jobCommand(job) == CMD_SPECIAL); + if ( bCanReuse ) + { + KIO::MetaData outgoing = tJob->outgoingMetaData(); + QString resume = (!outgoing.contains("resume")) ? 
QString() : outgoing["resume"]; + kDebug(7006) << "Resume metadata is" << resume; + bCanReuse = (resume.isEmpty() || resume == "0"); + } + } +// kDebug(7006) << "bCanReuse = " << bCanReuse; + if (bCanReuse) + { + if (job->url() == urlOnHold) + { + kDebug(7006) << "HOLD: Reusing held slave for" << urlOnHold; + slave = slaveOnHold; + } + else + { + kDebug(7006) << "HOLD: Discarding held slave (" << urlOnHold << ")"; + slaveOnHold->kill(); + } + slaveOnHold = 0; + urlOnHold = KUrl(); + } + if (slave) + return slave; + } + + slave = searchIdleList(protInfo->idleSlaves, job->url(), jobData.protocol, exact); } - if (slaveOnHold) + + return slave; +} + +Slave *SchedulerPrivate::createSlave(ProtocolInfo *protInfo, SimpleJob *job, + const KUrl &url, bool enforceLimits) +{ + Slave *slave = 0; + const int slavesCount = protInfo->activeSlaves.count() + protInfo->idleSlaves.count(); + + if (!enforceLimits || + (protInfo->maxSlaves > slavesCount && checkLimits(protInfo, job))) { - // Make sure that the job wants to do a GET or a POST, and with no offset - bool bCanReuse = (jobCommand(job) == CMD_GET); - KIO::TransferJob * tJob = qobject_cast(job); - if ( tJob ) - { - bCanReuse = (jobCommand(job) == CMD_GET || jobCommand(job) == CMD_SPECIAL); - if ( bCanReuse ) - { - KIO::MetaData outgoing = tJob->outgoingMetaData(); - QString resume = (!outgoing.contains("resume")) ? 
QString() : outgoing["resume"]; - kDebug(7006) << "Resume metadata is" << resume; - bCanReuse = (resume.isEmpty() || resume == "0"); - } - } -// kDebug(7006) << "bCanReuse = " << bCanReuse; - if (bCanReuse) - { - if (job->url() == urlOnHold) - { - kDebug(7006) << "HOLD: Reusing held slave for" << urlOnHold; - slave = slaveOnHold; - } - else - { - kDebug(7006) << "HOLD: Discarding held slave (" << urlOnHold << ")"; - slaveOnHold->kill(); - } - slaveOnHold = 0; - urlOnHold = KUrl(); - } - if (slave) - return slave; + int error; + QString errortext; + slave = Slave::createSlave(protInfo->protocol, url, error, errortext); + + if (slave) + { + protInfo->idleSlaves.append(slave); + q->connect(slave, SIGNAL(slaveDied(KIO::Slave *)), + SLOT(slotSlaveDied(KIO::Slave *))); + q->connect(slave, SIGNAL(slaveStatus(pid_t,const QByteArray&,const QString &, bool)), + SLOT(slotSlaveStatus(pid_t,const QByteArray&, const QString &, bool))); + } + else + { + kError() << "couldn't create slave:" << errortext; + if (job) + { + protInfo->joblist.removeAll(job); + extraJobData.remove(job); + job->slotError( error, errortext ); + } + } } - return searchIdleList(protInfo->idleSlaves, job->url(), jobData.protocol, exact); + return slave; } -Slave *SchedulerPrivate::createSlave(ProtocolInfo *protInfo, SimpleJob *job, const KUrl &url) -{ - int error; - QString errortext; - Slave *slave = Slave::createSlave(protInfo->protocol, url, error, errortext); - if (slave) - { - protInfo->idleSlaves.append(slave); - q->connect(slave, SIGNAL(slaveDied(KIO::Slave *)), - SLOT(slotSlaveDied(KIO::Slave *))); - q->connect(slave, SIGNAL(slaveStatus(pid_t,const QByteArray&,const QString &, bool)), - SLOT(slotSlaveStatus(pid_t,const QByteArray&, const QString &, bool))); - } - else - { - kError() << "couldn't create slave:" << errortext; - if (job) - { - protInfo->joblist.removeAll(job); - extraJobData.remove(job); - job->slotError( error, errortext ); - } - } - return slave; -} - void 
SchedulerPrivate::slotSlaveStatus(pid_t, const QByteArray&, const QString &, bool) { } @@ -955,16 +1043,15 @@ { // kDebug(7006) << "_assignJobToSlave( " << job << ", " << slave << ")"; QString dummy; - if ((slave->slaveProtocol() != KProtocolManager::slaveProtocol( job->url(), dummy )) - || - (!newJobs.removeAll(job))) + ProtocolInfo *protInfo = protInfoDict.get(slave->protocol()); + if ((slave->slaveProtocol() != KProtocolManager::slaveProtocol( job->url(), dummy )) || + !(protInfo->joblist.removeAll(job) > 0 || newJobs.removeAll(job) > 0)) { kDebug(7006) << "_assignJobToSlave(): ERROR, nonmatching or unknown job."; job->kill(); return false; } - ProtocolInfo *protInfo = protInfoDict.get(slave->protocol()); JobList *list = protInfo->coSlaves.value(slave); assert(list); if (!list) --Boundary-00=_zFlsKf3Gr11mkr9 Content-Type: text/x-patch; charset="UTF-8"; name="kioslaves.patch" Content-Transfer-Encoding: 7bit Content-Disposition: attachment; filename="kioslaves.patch" Index: kioslave/file/file.protocol =================================================================== --- kioslave/file/file.protocol (revision 1024592) +++ kioslave/file/file.protocol (working copy) @@ -13,6 +13,6 @@ moving=true opening=true deleteRecursive=true -maxInstances=4 +maxInstances=50 X-DocPath=kioslave/file/index.html Class=:local Index: kioslave/http/http.protocol =================================================================== --- kioslave/http/http.protocol (revision 1024592) +++ kioslave/http/http.protocol (working copy) @@ -9,6 +9,7 @@ defaultMimetype=application/octet-stream determineMimetypeFromExtension=false Icon=text-html -maxInstances=3 +maxInstances=18 +maxInstancesPerHost=3 X-DocPath=kioslave/http/index.html Class=:internet Index: kioslave/http/webdavs.protocol =================================================================== --- kioslave/http/webdavs.protocol (revision 1024592) +++ kioslave/http/webdavs.protocol (working copy) @@ -14,5 +14,7 @@ 
determineMimetypeFromExtension=false Icon=folder-remote config=webdav +maxInstances=18 +maxInstancesPerHost=3 X-DocPath=kioslave/webdav/index.html Class=:internet Index: kioslave/http/https.protocol =================================================================== --- kioslave/http/https.protocol (revision 1024592) +++ kioslave/http/https.protocol (working copy) @@ -7,6 +7,8 @@ defaultMimetype=application/octet-stream determineMimetypeFromExtension=false Icon=text-html +maxInstances=18 +maxInstancesPerHost=3 config=http X-DocPath=kioslave/http/index.html Class=:internet Index: kioslave/http/webdav.protocol =================================================================== --- kioslave/http/webdav.protocol (revision 1024592) +++ kioslave/http/webdav.protocol (working copy) @@ -13,6 +13,7 @@ defaultMimetype=application/octet-stream determineMimetypeFromExtension=false Icon=folder-remote -maxInstances=3 +maxInstances=18 +maxInstancesPerHost=3 X-DocPath=kioslave/webdav/index.html Class=:internet Index: kioslave/ftp/ftp.protocol =================================================================== --- kioslave/ftp/ftp.protocol (revision 1024592) +++ kioslave/ftp/ftp.protocol (working copy) @@ -12,6 +12,7 @@ deleting=true moving=true Icon=folder-remote -maxInstances=2 +maxInstancesPerHost=2 +maxInstances=10 X-DocPath=kioslave/ftp/index.html Class=:internet --Boundary-00=_zFlsKf3Gr11mkr9 Content-Type: text/x-patch; charset="UTF-8"; name="kprotocolinfo.patch" Content-Transfer-Encoding: 7bit Content-Disposition: attachment; filename="kprotocolinfo.patch" Index: kdecore/sycoca/kprotocolinfo_p.h =================================================================== --- kdecore/sycoca/kprotocolinfo_p.h (revision 1024592) +++ kdecore/sycoca/kprotocolinfo_p.h (working copy) @@ -58,6 +58,7 @@ //KUrl::URIMode uriMode; QStringList capabilities; QString proxyProtocol; + int maxSlavesPerHost; }; Index: kdecore/sycoca/kprotocolinfo.cpp 
=================================================================== --- kdecore/sycoca/kprotocolinfo.cpp (revision 1024592) +++ kdecore/sycoca/kprotocolinfo.cpp (working copy) @@ -69,6 +69,7 @@ m_icon = config.readEntry( "Icon" ); m_config = config.readEntry( "config", m_name ); m_maxSlaves = config.readEntry( "maxInstances", 1); + d->maxSlavesPerHost = config.readEntry( "maxInstancesPerHost", 0); QString tmp = config.readEntry( "input" ); if ( tmp == "filesystem" ) @@ -151,7 +152,7 @@ >> d->capabilities >> d->proxyProtocol >> i_canRenameFromFile >> i_canRenameToFile >> i_canDeleteRecursive >> i_fileNameUsedForCopying - >> d->archiveMimetype; + >> d->archiveMimetype >> d->maxSlavesPerHost; m_inputType = (Type) i_inputType; m_outputType = (Type) i_outputType; @@ -230,7 +231,7 @@ << capabilities << proxyProtocol << i_canRenameFromFile << i_canRenameToFile << i_canDeleteRecursive << i_fileNameUsedForCopying - << archiveMimetype; + << archiveMimetype << maxSlavesPerHost; } @@ -282,6 +283,15 @@ return prot->m_maxSlaves; } +int KProtocolInfo::maxSlavesPerHost( const QString& _protocol ) +{ + KProtocolInfo::Ptr prot = KProtocolInfoFactory::self()->findProtocol(_protocol); + if ( !prot ) + return 0; + + return prot->d_func()->maxSlavesPerHost; +} + bool KProtocolInfo::determineMimetypeFromExtension( const QString &_protocol ) { KProtocolInfo::Ptr prot = KProtocolInfoFactory::self()->findProtocol( _protocol ); Index: kdecore/sycoca/ksycoca.cpp =================================================================== --- kdecore/sycoca/ksycoca.cpp (revision 1024592) +++ kdecore/sycoca/ksycoca.cpp (working copy) @@ -55,7 +55,7 @@ * If the existing file is outdated, it will not get read * but instead we'll ask kded to regenerate a new one... 
*/ -#define KSYCOCA_VERSION 143 +#define KSYCOCA_VERSION 144 /** * Sycoca file name, used internally (by kbuildsycoca) Index: kdecore/sycoca/kprotocolinfo.h =================================================================== --- kdecore/sycoca/kprotocolinfo.h (revision 1024592) +++ kdecore/sycoca/kprotocolinfo.h (working copy) @@ -218,7 +218,19 @@ */ static int maxSlaves( const QString& protocol ); + /** + * Returns the limit on the number of slaves for this protocol per host. + * + * This corresponds to the "maxInstancesPerHost=" field in the protocol description file. + * The default is 0 which means there is no per-host limit. + * + * @param protocol the protocol to check + * @return the maximum number of slaves per host, or 0 if unknown + */ + static int maxSlavesPerHost( const QString& protocol ); + + /** * Returns whether mimetypes can be determined based on extension for this * protocol. For some protocols, e.g. http, the filename extension in the URL * can not be trusted to truly reflect the file type. --Boundary-00=_zFlsKf3Gr11mkr9--