List: kde-core-devel
Subject: Re: KIO: Mass Copy of Files from Different Sources to Different
From: "Dawit A." <adawit () kde ! org>
Date: 2009-09-16 22:44:49
Message-ID: 200909161844.50053.adawit () kde ! org
Okay, I have run some tests on my approach to address the potential deadlock
issue when scheduling high-level jobs. The idea is as follows:
1.) FileCopyJob is created as a result of an application calling
KIO::file_copy.
2.) Step #1 results in the creation of a "PUT" job.
3.) Meta-data named "paired_request" is set on the job from step #2 with the
source url. This is done in FileCopyJob::startBestCopyMethod.
4.) When the scheduler is asked to create a slave for this "PUT" job, it will
save any url specified in the "paired_request" meta-data in a reserve list.
5.) The scheduler always takes the number of items in the reserve list into
account before attempting to assign an ioslave to a job.
The above process ensures that no two "PUT" jobs are assigned ioslaves
before a "PUT/GET" pair is served.
I tested it with as many scenarios as I could think of, but it is entirely
possible I completely missed something. Anyhow, all feedback is welcome...
NOTE: I do not like using the meta-data system for this, but this is a quick
proof of concept. Perhaps there is a better way to do it...
On Wednesday 16 September 2009 13:22:41 Dawit A. wrote:
> David,
>
> Yes, now that I understand how the high level jobs work, I completely got
> your concern about the potential for a deadlocked condition.
>
> Right now I am working on a solution to eliminate this deadlock condition
> from occurring in the scheduler. There is a way to do this by pairing the
> requests from high level jobs so that the scheduler can take that into
> account when it is scheduling jobs.
>
> More about that once I refine and test the solution to see whether or not
> it is viable and does solve the deadlock problem...
>
> On Wednesday 16 September 2009 11:52:44 David Faure wrote:
> > On Tuesday 08 September 2009, Dawit A. wrote:
>
> [snipped]
>
> > > Can you give an example of how to trigger this
> > > deadlock? I suppose I can simply start copying files from remote
> > > locations (sftp/ftp) until the max instances limit is reached, no ?
> >
> > Maybe. I admit I didn't actually try, but it seems logical to me, with
> > the above reasoning. To get many filecopyjobs started, I recommend
> > copying a whole directory of files. That gives time to start another
> > directory copy while it's happening. Each file being copied will start a
> > FileCopyJob.
>
> Just for clarification, the deadlock condition can only occur if both ends
> of the high level job are remote urls, correct? That is, both the put and
> get operations must be remote; otherwise they are handled differently and
> the scheduling does not come into the equation... Or did I get that wrong?
>
["sched_jobs.patch" (text/x-patch)]
Index: job.cpp
===================================================================
--- job.cpp (revision 1024592)
+++ job.cpp (working copy)
@@ -308,9 +308,9 @@
}
Scheduler::doJob(q);
+ Scheduler::scheduleJob(q);
}
-
bool SimpleJob::doKill()
{
//kDebug() << this;
@@ -627,6 +627,7 @@
// Return slave to the scheduler
d->slaveDone();
Scheduler::doJob(this);
+ Scheduler::scheduleJob(this);
}
}
@@ -834,6 +835,7 @@
// Return slave to the scheduler
d->slaveDone();
Scheduler::doJob(this);
+ Scheduler::scheduleJob(this);
}
}
@@ -999,6 +1001,7 @@
// Return slave to the scheduler
d->slaveDone();
Scheduler::doJob(this);
+ Scheduler::scheduleJob(this);
}
}
@@ -1597,6 +1600,7 @@
// Return slave to the scheduler
d->slaveDone();
Scheduler::doJob(this);
+ Scheduler::scheduleJob(this);
}
}
@@ -1803,6 +1807,8 @@
else
{
startDataPump();
+ if (m_putJob)
+ m_putJob->addMetaData("paired_request", m_src.url());
}
}
@@ -2420,6 +2426,7 @@
// Return slave to the scheduler
d->slaveDone();
Scheduler::doJob(this);
+ Scheduler::scheduleJob(this);
}
}
@@ -2671,6 +2678,7 @@
d->m_url = d->m_waitQueue.first().url;
d->slaveDone();
Scheduler::doJob(this);
+ Scheduler::scheduleJob(this);
}
}
}
Index: scheduler.cpp
===================================================================
--- scheduler.cpp (revision 1024592)
+++ scheduler.cpp (working copy)
@@ -136,8 +136,8 @@
void publishSlaveOnHold();
void registerWindow(QWidget *wid);
- Slave *findIdleSlave(ProtocolInfo *protInfo, SimpleJob *job, bool &exact);
- Slave *createSlave(ProtocolInfo *protInfo, SimpleJob *job, const KUrl &url);
+    Slave *findIdleSlave(ProtocolInfo *protInfo, SimpleJob *job, bool &exact, bool enforceLimits = false);
+    Slave *createSlave(ProtocolInfo *protInfo, SimpleJob *job, const KUrl& url, bool enforceLimits = false);
void debug_info();
@@ -162,7 +162,7 @@
class KIO::SchedulerPrivate::ProtocolInfo
{
public:
- ProtocolInfo() : maxSlaves(1), skipCount(0)
+ ProtocolInfo() : maxSlaves(1), maxSlavesPerHost(0), skipCount(0)
{
}
@@ -181,16 +181,64 @@
return ret;
}
+ int activeSlavesCountFor(SimpleJob* job)
+ {
+ int count = 0;
+ QString host = job->url().host();
+
+ if (!host.isEmpty())
+ {
+ QListIterator<Slave *> it (activeSlaves);
+ while (it.hasNext())
+ {
+ if (host == it.next()->host())
+ count++;
+ }
+
+ QString url = job->url().url();
+ kDebug() << "*** Reserve list count: " << reserveList.count();
+
+ if (reserveList.contains(url)) {
+ kDebug() << "*** Removing paired request for: " << url;
+ reserveList.removeOne(url);
+ } else {
+ count += reserveList.count();
+ }
+ }
+
+ return count;
+ }
+
+ QStringList reserveList;
QList<SimpleJob *> joblist;
SlaveList activeSlaves;
SlaveList idleSlaves;
CoSlaveMap coSlaves;
SlaveList coIdleSlaves;
int maxSlaves;
+ int maxSlavesPerHost;
int skipCount;
QString protocol;
};
+static inline bool checkLimits(KIO::SchedulerPrivate::ProtocolInfo *protInfo, SimpleJob *job)
+{
+ const int numActiveSlaves = protInfo->activeSlavesCountFor(job);
+
+#if 0
+ kDebug() << job->url() << ": ";
+ kDebug() << " protocol :" << job->url().protocol()
+ << ", max :" << protInfo->maxSlaves
+ << ", max host :" << protInfo->maxSlavesPerHost
+ << ", active :" << protInfo->activeSlaves.count()
+ << ", idle :" << protInfo->idleSlaves.count()
+ << ", active for " << job->url().host() << " = " << numActiveSlaves;
+#endif
+
+    return (protInfo->maxSlavesPerHost < 1 || protInfo->maxSlavesPerHost > numActiveSlaves);
+}
+
+
KIO::SchedulerPrivate::ProtocolInfo *
KIO::SchedulerPrivate::ProtocolInfoDict::get(const QString &protocol)
{
@@ -200,6 +248,7 @@
info = new ProtocolInfo;
info->protocol = protocol;
info->maxSlaves = KProtocolInfo::maxSlaves( protocol );
+ info->maxSlavesPerHost = KProtocolInfo::maxSlavesPerHost( protocol );
insert(protocol, info);
}
@@ -382,9 +431,10 @@
}
}
-void SchedulerPrivate::doJob(SimpleJob *job) {
+void SchedulerPrivate::doJob(SimpleJob *job) {
JobData jobData;
jobData.protocol = KProtocolManager::slaveProtocol(job->url(), jobData.proxy);
+
// kDebug(7006) << "protocol=" << jobData->protocol;
if (jobCommand(job) == CMD_GET)
{
@@ -401,15 +451,17 @@
}
void SchedulerPrivate::scheduleJob(SimpleJob *job) {
+
newJobs.removeOne(job);
const JobData& jobData = extraJobData.value(job);
QString protocol = jobData.protocol;
// kDebug(7006) << "protocol=" << protocol;
ProtocolInfo *protInfo = protInfoDict.get(protocol);
- protInfo->joblist.append(job);
-
- slaveTimer.start(0);
+    if (!protInfo->joblist.contains(job)) { // scheduleJob already called for this job?
+        protInfo->joblist.append(job);
+ slaveTimer.start(0);
+ }
}
void SchedulerPrivate::cancelJob(SimpleJob *job) {
@@ -422,6 +474,7 @@
newJobs.removeAll(job);
ProtocolInfo *protInfo = protInfoDict.get(jobData.protocol);
protInfo->joblist.removeAll(job);
+ protInfo->reserveList.removeAll(job->url().url());
// Search all slaves to see if job is in the queue of a coSlave
foreach(Slave* coSlave, protInfo->allSlaves())
@@ -520,15 +573,15 @@
SimpleJob *job = 0;
Slave *slave = 0;
-
+
if (protInfo->skipCount > 2)
{
bool dummy;
// Prevent starvation. We skip the first entry in the queue at most
// 2 times in a row. The
protInfo->skipCount = 0;
- job = protInfo->joblist.at(0);
- slave = findIdleSlave(protInfo, job, dummy );
+ job = protInfo->joblist.at(0);
+ slave = findIdleSlave(protInfo, job, dummy, true);
}
else
{
@@ -538,7 +591,8 @@
for(int i = 0; (i < protInfo->joblist.count()) && (i < 10); i++)
{
job = protInfo->joblist.at(i);
- slave = findIdleSlave(protInfo, job, exact);
+ slave = findIdleSlave(protInfo, job, exact, true);
+
if (!firstSlave)
{
firstJob = job;
@@ -561,28 +615,33 @@
if (!slave)
{
- if ( protInfo->maxSlaves > static_cast<int>(protInfo->activeSlaves.count()) )
- {
- newSlave = true;
- slave = createSlave(protInfo, job, job->url());
- if (!slave)
- slaveTimer.start(0);
- }
+ slave = createSlave(protInfo, job, job->url(), true);
+ if (slave)
+ newSlave = true;
+ else
+ slaveTimer.start(0);
}
if (!slave)
{
-// kDebug(7006) << "No slaves available";
-// kDebug(7006) << " -- active: " << protInfo->activeSlaves.count();
+ //kDebug() << "No slaves available";
+ //kDebug() << " -- active: " << protInfo->activeSlaves.count();
return false;
}
+ KIO::MetaData metaData = job->outgoingMetaData();
+ if (metaData.contains("paired_request")) {
+ KUrl url (metaData.take("paired_request"));
+ kDebug() << "*** PAIRED REQUEST: " << url;
+ protInfoDict.get(url.protocol())->reserveList << url.url();
+ job->setMetaData(metaData);
+ }
+
protInfo->activeSlaves.append(slave);
protInfo->idleSlaves.removeAll(slave);
protInfo->joblist.removeOne(job);
// kDebug(7006) << "scheduler: job started " << job;
-
SchedulerPrivate::JobData jobData = extraJobData.value(job);
setupSlave(slave, job->url(), jobData.protocol, jobData.proxy, newSlave);
startJob(job, slave);
@@ -635,6 +694,8 @@
foreach( Slave *slave, idleSlaves )
{
+//        kDebug() << "job protocol: " << protocol << ", slave protocol: " << slave->slaveProtocol();
+//        kDebug() << "job host: " << host << ", slave host: " << slave->host();
         if ((protocol == slave->slaveProtocol()) &&
(host == slave->host()) &&
(port == slave->port()) &&
@@ -652,82 +713,100 @@
return 0;
}
-Slave *SchedulerPrivate::findIdleSlave(ProtocolInfo *protInfo, SimpleJob *job, bool &exact)
+Slave *SchedulerPrivate::findIdleSlave(ProtocolInfo *protInfo, SimpleJob *job,
+                                       bool &exact, bool enforceLimits)
{
Slave *slave = 0;
- JobData jobData = extraJobData.value(job);
- if (jobData.checkOnHold)
+ if (!enforceLimits || checkLimits(protInfo, job))
{
- slave = Slave::holdSlave(jobData.protocol, job->url());
- if (slave)
- return slave;
+ JobData jobData = extraJobData.value(job);
+
+ if (jobData.checkOnHold)
+ {
+ slave = Slave::holdSlave(jobData.protocol, job->url());
+ if (slave)
+ return slave;
+ }
+
+ if (slaveOnHold)
+ {
+ // Make sure that the job wants to do a GET or a POST, and with no offset
+ bool bCanReuse = (jobCommand(job) == CMD_GET);
+ KIO::TransferJob * tJob = qobject_cast<KIO::TransferJob *>(job);
+ if ( tJob )
+ {
+            bCanReuse = (jobCommand(job) == CMD_GET || jobCommand(job) == CMD_SPECIAL);
+            if ( bCanReuse )
+ {
+ KIO::MetaData outgoing = tJob->outgoingMetaData();
+                QString resume = (!outgoing.contains("resume")) ? QString() : outgoing["resume"];
+                kDebug(7006) << "Resume metadata is" << resume;
+ bCanReuse = (resume.isEmpty() || resume == "0");
+ }
+ }
+// kDebug(7006) << "bCanReuse = " << bCanReuse;
+ if (bCanReuse)
+ {
+ if (job->url() == urlOnHold)
+ {
+ kDebug(7006) << "HOLD: Reusing held slave for" << urlOnHold;
+ slave = slaveOnHold;
+ }
+ else
+ {
+                kDebug(7006) << "HOLD: Discarding held slave (" << urlOnHold << ")";
+                slaveOnHold->kill();
+ }
+ slaveOnHold = 0;
+ urlOnHold = KUrl();
+ }
+ if (slave)
+ return slave;
+ }
+
+        slave = searchIdleList(protInfo->idleSlaves, job->url(), jobData.protocol, exact);
     }
- if (slaveOnHold)
+
+ return slave;
+}
+
+Slave *SchedulerPrivate::createSlave(ProtocolInfo *protInfo, SimpleJob *job,
+ const KUrl &url, bool enforceLimits)
+{
+ Slave *slave = 0;
+    const int slavesCount = protInfo->activeSlaves.count() + protInfo->idleSlaves.count();
+
+ if (!enforceLimits ||
+ (protInfo->maxSlaves > slavesCount && checkLimits(protInfo, job)))
{
- // Make sure that the job wants to do a GET or a POST, and with no offset
- bool bCanReuse = (jobCommand(job) == CMD_GET);
- KIO::TransferJob * tJob = qobject_cast<KIO::TransferJob *>(job);
- if ( tJob )
- {
-        bCanReuse = (jobCommand(job) == CMD_GET || jobCommand(job) == CMD_SPECIAL);
- if ( bCanReuse )
- {
- KIO::MetaData outgoing = tJob->outgoingMetaData();
-            QString resume = (!outgoing.contains("resume")) ? QString() : outgoing["resume"];
- kDebug(7006) << "Resume metadata is" << resume;
- bCanReuse = (resume.isEmpty() || resume == "0");
- }
- }
-// kDebug(7006) << "bCanReuse = " << bCanReuse;
- if (bCanReuse)
- {
- if (job->url() == urlOnHold)
- {
- kDebug(7006) << "HOLD: Reusing held slave for" << urlOnHold;
- slave = slaveOnHold;
- }
- else
- {
- kDebug(7006) << "HOLD: Discarding held slave (" << urlOnHold << ")";
- slaveOnHold->kill();
- }
- slaveOnHold = 0;
- urlOnHold = KUrl();
- }
- if (slave)
- return slave;
+ int error;
+ QString errortext;
+ slave = Slave::createSlave(protInfo->protocol, url, error, errortext);
+
+ if (slave)
+ {
+ protInfo->idleSlaves.append(slave);
+ q->connect(slave, SIGNAL(slaveDied(KIO::Slave *)),
+ SLOT(slotSlaveDied(KIO::Slave *)));
+            q->connect(slave, SIGNAL(slaveStatus(pid_t,const QByteArray&,const QString &, bool)),
+                       SLOT(slotSlaveStatus(pid_t,const QByteArray&, const QString &, bool)));
+        }
+ else
+ {
+ kError() << "couldn't create slave:" << errortext;
+ if (job)
+ {
+ protInfo->joblist.removeAll(job);
+ extraJobData.remove(job);
+ job->slotError( error, errortext );
+ }
+ }
}
-    return searchIdleList(protInfo->idleSlaves, job->url(), jobData.protocol, exact);
+    return slave;
}
-Slave *SchedulerPrivate::createSlave(ProtocolInfo *protInfo, SimpleJob *job, const KUrl &url)
-{
- int error;
- QString errortext;
- Slave *slave = Slave::createSlave(protInfo->protocol, url, error, errortext);
- if (slave)
- {
- protInfo->idleSlaves.append(slave);
- q->connect(slave, SIGNAL(slaveDied(KIO::Slave *)),
- SLOT(slotSlaveDied(KIO::Slave *)));
-    q->connect(slave, SIGNAL(slaveStatus(pid_t,const QByteArray&,const QString &, bool)),
-               SLOT(slotSlaveStatus(pid_t,const QByteArray&, const QString &, bool)));
- }
- else
- {
- kError() << "couldn't create slave:" << errortext;
- if (job)
- {
- protInfo->joblist.removeAll(job);
- extraJobData.remove(job);
- job->slotError( error, errortext );
- }
- }
- return slave;
-}
-
void SchedulerPrivate::slotSlaveStatus(pid_t, const QByteArray&, const QString &, bool)
{
}
@@ -955,16 +1034,15 @@
{
// kDebug(7006) << "_assignJobToSlave( " << job << ", " << slave << ")";
QString dummy;
-    if ((slave->slaveProtocol() != KProtocolManager::slaveProtocol( job->url(), dummy ))
-        ||
-        (!newJobs.removeAll(job)))
+    ProtocolInfo *protInfo = protInfoDict.get(slave->protocol());
+    if ((slave->slaveProtocol() != KProtocolManager::slaveProtocol( job->url(), dummy )) ||
+        !(protInfo->joblist.removeAll(job) > 0 || newJobs.removeAll(job) > 0))
     {
kDebug(7006) << "_assignJobToSlave(): ERROR, nonmatching or unknown job.";
job->kill();
return false;
}
- ProtocolInfo *protInfo = protInfoDict.get(slave->protocol());
JobList *list = protInfo->coSlaves.value(slave);
assert(list);
if (!list)