List:       kde-core-devel
Subject:    Re: KIO: Mass Copy of Files from Different Sources to Different Destinations
From:       "Dawit A." <adawit () kde ! org>
Date:       2009-09-16 22:44:49
Message-ID: 200909161844.50053.adawit () kde ! org

Okay, I have run some tests on my approach to address the potential deadlock 
issue when scheduling high-level jobs. The idea is as follows:

1.) A FileCopyJob is created as a result of the application calling
KIO::file_copy.

2.) Step #1 results in the creation of a "PUT" job.

3.) A meta-data entry named "paired_request" is set on the "PUT" job from step
#2, with the source url as its value. This is done in
FileCopyJob::startBestCopyMethod.

4.) When the scheduler is asked to create a slave for this "PUT" job, it will
save any url specified in the "paired_request" meta-data in a reserve list.

5.) The scheduler always takes the number of items in the reserve list into
account before attempting to assign any ioslave to a job.

The above process ensures that "PUT" jobs can never claim the last available
ioslaves while the "GET" half of a "PUT/GET" pair is still waiting for one; a
sketch of the accounting follows below.

I tested it with as many scenarios as I could think of, but it is entirely
possible I completely missed something. Anyhow, all feedback is welcome...

NOTE: I do not like using the meta-data system for this, but this is a quick
proof of concept. Perhaps there is a better way to do this...
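
One possibility (hypothetical, names invented; not part of the attached patch)
would be to let FileCopyJob declare the pairing to the scheduler directly
instead of tunneling it through meta-data:

    // hypothetical addition to SchedulerPrivate, reusing the same
    // reserve-list bookkeeping the patch below introduces
    void SchedulerPrivate::reservePairedRequest(const KUrl &url)
    {
        protInfoDict.get(url.protocol())->reserveList << url.url();
    }

FileCopyJob::startBestCopyMethod would then call this through a static
Scheduler wrapper instead of setting "paired_request" on m_putJob.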

On Wednesday 16 September 2009 13:22:41 Dawit A. wrote:
> David,
> 
> Yes, now that I understand how the high level jobs work, I completely get
>  your concern about the potential for a deadlock condition.
> 
> Right now I am working on a solution to eliminate this deadlock condition
>  from occurring in the scheduler. There is a way to do this by pairing the
>  requests from high level jobs so that the scheduler can take that into
>  account when it is scheduling jobs.
> 
> More about that once I refine and test the solution to see whether or not
>  it is viable and does solve the deadlock problem...
> 
> On Wednesday 16 September 2009 11:52:44 David Faure wrote:
> > On Tuesday 08 September 2009, Dawit A. wrote:
> 
> [snipped]
> 
> > >  Can you give an example of how to trigger this
> > >  deadlock ? I suppose I can simply start copying files from remote
> > >  locations (sftp/ftp) until the max instances limit is reached, no ?
> >
> > Maybe. I admit I didn't actually try, but it seems logical to me, with
> > the above reasoning. To get many filecopyjobs started, I recommend
> > copying a whole directory of files. That gives time to start another
> > directory copy while it's happening. Each file being copied will start a
> > FileCopyJob.
> 
> Just for clarification, the deadlock condition can only occur if both ends
>  of the high level job are remote urls, correct ? That is, both the put and
>  get operations must be remote; otherwise they are handled differently and
>  the scheduling does not come into the equation... Or did I get that wrong ?
> 
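
For reference, the reproduction David suggests boils down to something like
the following sketch (the hosts and paths here are placeholders I made up):

#include <kio/copyjob.h>
#include <kurl.h>

// Start two remote-to-remote directory copies back-to-back. Every file in
// these directories becomes a FileCopyJob, i.e. a "GET"/"PUT" pair, so
// enough of them can hit the per-protocol slave limit at the same time.
void reproduceDeadlock()
{
    KIO::copy(KUrl("ftp://host-a.example.org/pub/dir1"),
              KUrl("ftp://host-b.example.org/incoming/dir1"));
    KIO::copy(KUrl("ftp://host-a.example.org/pub/dir2"),
              KUrl("ftp://host-b.example.org/incoming/dir2"));
}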

["sched_jobs.patch" (text/x-patch)]

Index: job.cpp
===================================================================
--- job.cpp	(revision 1024592)
+++ job.cpp	(working copy)
@@ -308,9 +308,9 @@
     }
 
     Scheduler::doJob(q);
+    Scheduler::scheduleJob(q);
 }
 
-
 bool SimpleJob::doKill()
 {
     //kDebug() << this;
@@ -627,6 +627,7 @@
         // Return slave to the scheduler
         d->slaveDone();
         Scheduler::doJob(this);
+        Scheduler::scheduleJob(this);
     }
 }
 
@@ -834,6 +835,7 @@
         // Return slave to the scheduler
         d->slaveDone();
         Scheduler::doJob(this);
+        Scheduler::scheduleJob(this);
     }
 }
 
@@ -999,6 +1001,7 @@
         // Return slave to the scheduler
         d->slaveDone();
         Scheduler::doJob(this);
+        Scheduler::scheduleJob(this);
     }
 }
 
@@ -1597,6 +1600,7 @@
         // Return slave to the scheduler
         d->slaveDone();
         Scheduler::doJob(this);
+        Scheduler::scheduleJob(this);
     }
 }
 
@@ -1803,6 +1807,8 @@
    else
    {
       startDataPump();
+      if (m_putJob)
+        m_putJob->addMetaData("paired_request", m_src.url());
    }
 }
 
@@ -2420,6 +2426,7 @@
         // Return slave to the scheduler
         d->slaveDone();
         Scheduler::doJob(this);
+        Scheduler::scheduleJob(this);
     }
 }
 
@@ -2671,6 +2678,7 @@
         d->m_url = d->m_waitQueue.first().url;
         d->slaveDone();
         Scheduler::doJob(this);
+        Scheduler::scheduleJob(this);
      }
   }
 }
Index: scheduler.cpp
===================================================================
--- scheduler.cpp	(revision 1024592)
+++ scheduler.cpp	(working copy)
@@ -136,8 +136,8 @@
     void publishSlaveOnHold();
     void registerWindow(QWidget *wid);
 
-    Slave *findIdleSlave(ProtocolInfo *protInfo, SimpleJob *job, bool &exact);
-    Slave *createSlave(ProtocolInfo *protInfo, SimpleJob *job, const KUrl &url);
+    Slave *findIdleSlave(ProtocolInfo *protInfo, SimpleJob *job, bool &exact, bool enforceLimits = false);
+    Slave *createSlave(ProtocolInfo *protInfo, SimpleJob *job, const KUrl& url, bool enforceLimits = false);
 
     void debug_info();
 
@@ -162,7 +162,7 @@
 class KIO::SchedulerPrivate::ProtocolInfo
 {
 public:
-    ProtocolInfo() : maxSlaves(1), skipCount(0)
+    ProtocolInfo() : maxSlaves(1), maxSlavesPerHost(0), skipCount(0)
     {
     }
 
@@ -181,16 +181,64 @@
         return ret;
     }
 
+    int activeSlavesCountFor(SimpleJob* job)
+    {
+        int count = 0;
+        QString host = job->url().host();
+
+        if (!host.isEmpty())
+        {
+            QListIterator<Slave *> it (activeSlaves);
+            while (it.hasNext())
+            {
+                if (host == it.next()->host())
+                    count++;
+            }
+
+            QString url = job->url().url();
+            kDebug() << "*** Reserve list count: " << reserveList.count();
+
+            if (reserveList.contains(url)) {
+                kDebug() << "*** Removing paired request for: " << url;
+                reserveList.removeOne(url);
+            } else {
+                count += reserveList.count();
+            }
+        }
+
+        return count;
+    }
+
+    QStringList reserveList;
     QList<SimpleJob *> joblist;
     SlaveList activeSlaves;
     SlaveList idleSlaves;
     CoSlaveMap coSlaves;
     SlaveList coIdleSlaves;
     int maxSlaves;
+    int maxSlavesPerHost;
     int skipCount;
     QString protocol;
 };
 
+static inline bool checkLimits(KIO::SchedulerPrivate::ProtocolInfo *protInfo, SimpleJob *job)
+{
+  const int numActiveSlaves = protInfo->activeSlavesCountFor(job);
+
+#if 0
+    kDebug() << job->url() << ": ";
+    kDebug() << "    protocol :" << job->url().protocol()
+             << ", max :" << protInfo->maxSlaves
+             << ", max host :" << protInfo->maxSlavesPerHost
+             << ", active :" << protInfo->activeSlaves.count()
+             << ", idle :" << protInfo->idleSlaves.count()
+             << ", active for " << job->url().host() << " = " << numActiveSlaves;
+#endif
+
+  return (protInfo->maxSlavesPerHost < 1 || protInfo->maxSlavesPerHost > numActiveSlaves);
+}
+
+
 KIO::SchedulerPrivate::ProtocolInfo *
 KIO::SchedulerPrivate::ProtocolInfoDict::get(const QString &protocol)
 {
@@ -200,6 +248,7 @@
         info = new ProtocolInfo;
         info->protocol = protocol;
         info->maxSlaves = KProtocolInfo::maxSlaves( protocol );
+        info->maxSlavesPerHost = KProtocolInfo::maxSlavesPerHost( protocol );
 
         insert(protocol, info);
     }
@@ -382,9 +431,10 @@
     }
 }
 
-void SchedulerPrivate::doJob(SimpleJob *job) {
+void SchedulerPrivate::doJob(SimpleJob *job) {  
     JobData jobData;
     jobData.protocol = KProtocolManager::slaveProtocol(job->url(), jobData.proxy);
+
 //    kDebug(7006) << "protocol=" << jobData->protocol;
     if (jobCommand(job) == CMD_GET)
     {
@@ -401,15 +451,17 @@
 }
 
 void SchedulerPrivate::scheduleJob(SimpleJob *job) {
+
     newJobs.removeOne(job);
     const JobData& jobData = extraJobData.value(job);
 
     QString protocol = jobData.protocol;
 //    kDebug(7006) << "protocol=" << protocol;
     ProtocolInfo *protInfo = protInfoDict.get(protocol);
-    protInfo->joblist.append(job);
-
-    slaveTimer.start(0);
+    if (!protInfo->joblist.contains(job)) { // scheduleJob already called for this job?
+        protInfo->joblist.append(job);
+        slaveTimer.start(0);
+    }
 }
 
 void SchedulerPrivate::cancelJob(SimpleJob *job) {
@@ -422,6 +474,7 @@
         newJobs.removeAll(job);
         ProtocolInfo *protInfo = protInfoDict.get(jobData.protocol);
         protInfo->joblist.removeAll(job);
+        protInfo->reserveList.removeAll(job->url().url());
 
         // Search all slaves to see if job is in the queue of a coSlave
         foreach(Slave* coSlave, protInfo->allSlaves())
@@ -520,15 +573,15 @@
 
     SimpleJob *job = 0;
     Slave *slave = 0;
-
+    
     if (protInfo->skipCount > 2)
     {
        bool dummy;
        // Prevent starvation. We skip the first entry in the queue at most
        // 2 times in a row. The
        protInfo->skipCount = 0;
-       job = protInfo->joblist.at(0);
-       slave = findIdleSlave(protInfo, job, dummy );
+       job = protInfo->joblist.at(0);     
+       slave = findIdleSlave(protInfo, job, dummy, true);
     }
     else
     {
@@ -538,7 +591,8 @@
        for(int i = 0; (i < protInfo->joblist.count()) && (i < 10); i++)
        {
           job = protInfo->joblist.at(i);
-          slave = findIdleSlave(protInfo, job, exact);
+          slave = findIdleSlave(protInfo, job, exact, true);
+
           if (!firstSlave)
           {
              firstJob = job;
@@ -561,28 +615,33 @@
 
     if (!slave)
     {
-       if ( protInfo->maxSlaves > static_cast<int>(protInfo->activeSlaves.count()) )
-       {
-          newSlave = true;
-          slave = createSlave(protInfo, job, job->url());
-          if (!slave)
-             slaveTimer.start(0);
-       }
+      slave = createSlave(protInfo, job, job->url(), true);
+      if (slave)
+        newSlave = true;
+      else
+        slaveTimer.start(0);
     }
 
     if (!slave)
     {
-//          kDebug(7006) << "No slaves available";
-//          kDebug(7006) << " -- active: " << protInfo->activeSlaves.count();
+       //kDebug() << "No slaves available";
+       //kDebug() << " -- active: " << protInfo->activeSlaves.count();
        return false;
     }
 
+    KIO::MetaData metaData = job->outgoingMetaData();
+    if (metaData.contains("paired_request")) {
+      KUrl url (metaData.take("paired_request"));
+      kDebug() << "*** PAIRED REQUEST: " << url;
+      protInfoDict.get(url.protocol())->reserveList << url.url();
+      job->setMetaData(metaData);
+    }
+
     protInfo->activeSlaves.append(slave);
     protInfo->idleSlaves.removeAll(slave);
     protInfo->joblist.removeOne(job);
 //        kDebug(7006) << "scheduler: job started " << job;
 
-
     SchedulerPrivate::JobData jobData = extraJobData.value(job);
     setupSlave(slave, job->url(), jobData.protocol, jobData.proxy, newSlave);
     startJob(job, slave);
@@ -635,6 +694,8 @@
 
     foreach( Slave *slave, idleSlaves )
     {
+//       kDebug() << "job protocol: " << protocol << ", slave protocol: " << slave->slaveProtocol();
+//       kDebug() << "job host: " << host << ", slave host: " << slave->host();
        if ((protocol == slave->slaveProtocol()) &&
            (host == slave->host()) &&
            (port == slave->port()) &&
@@ -652,82 +713,100 @@
     return 0;
 }
 
-Slave *SchedulerPrivate::findIdleSlave(ProtocolInfo *protInfo, SimpleJob *job, bool &exact)
+Slave *SchedulerPrivate::findIdleSlave(ProtocolInfo *protInfo, SimpleJob *job,
+                                       bool &exact, bool enforceLimits)
 {
     Slave *slave = 0;
-    JobData jobData = extraJobData.value(job);
 
-    if (jobData.checkOnHold)
+    if (!enforceLimits || checkLimits(protInfo, job))
     {
-       slave = Slave::holdSlave(jobData.protocol, job->url());
-       if (slave)
-          return slave;
+        JobData jobData = extraJobData.value(job);
+
+        if (jobData.checkOnHold)
+        {
+           slave = Slave::holdSlave(jobData.protocol, job->url());
+           if (slave)
+              return slave;
+        }
+
+        if (slaveOnHold)
+        {
+           // Make sure that the job wants to do a GET or a POST, and with no offset
+           bool bCanReuse = (jobCommand(job) == CMD_GET);
+           KIO::TransferJob * tJob = qobject_cast<KIO::TransferJob *>(job);
+           if ( tJob )
+           {
+              bCanReuse = (jobCommand(job) == CMD_GET || jobCommand(job) == CMD_SPECIAL);
+              if ( bCanReuse )
+              {
+                KIO::MetaData outgoing = tJob->outgoingMetaData();
+                QString resume = (!outgoing.contains("resume")) ? QString() : outgoing["resume"];
+                kDebug(7006) << "Resume metadata is" << resume;
+                bCanReuse = (resume.isEmpty() || resume == "0");
+              }
+           }
+//           kDebug(7006) << "bCanReuse = " << bCanReuse;
+           if (bCanReuse)
+           {
+              if (job->url() == urlOnHold)
+              {
+                 kDebug(7006) << "HOLD: Reusing held slave for" << urlOnHold;
+                 slave = slaveOnHold;
+              }
+              else
+              {
+                 kDebug(7006) << "HOLD: Discarding held slave (" << urlOnHold << ")";
+                 slaveOnHold->kill();
+              }
+              slaveOnHold = 0;
+              urlOnHold = KUrl();
+           }
+           if (slave)
+              return slave;
+        }
+
+        slave = searchIdleList(protInfo->idleSlaves, job->url(), jobData.protocol, exact);
     }
-    if (slaveOnHold)
+
+    return slave;
+}
+
+Slave *SchedulerPrivate::createSlave(ProtocolInfo *protInfo, SimpleJob *job,
+                                     const KUrl &url, bool enforceLimits)
+{  
+    Slave *slave = 0;
+    const int slavesCount = protInfo->activeSlaves.count() + protInfo->idleSlaves.count();
+
+    if (!enforceLimits ||
+        (protInfo->maxSlaves > slavesCount && checkLimits(protInfo, job)))
     {
-       // Make sure that the job wants to do a GET or a POST, and with no offset
-       bool bCanReuse = (jobCommand(job) == CMD_GET);
-       KIO::TransferJob * tJob = qobject_cast<KIO::TransferJob *>(job);
-       if ( tJob )
-       {
-          bCanReuse = (jobCommand(job) == CMD_GET || jobCommand(job) == CMD_SPECIAL);
-          if ( bCanReuse )
-          {
-            KIO::MetaData outgoing = tJob->outgoingMetaData();
-            QString resume = (!outgoing.contains("resume")) ? QString() : outgoing["resume"];
-            kDebug(7006) << "Resume metadata is" << resume;
-            bCanReuse = (resume.isEmpty() || resume == "0");
-          }
-       }
-//       kDebug(7006) << "bCanReuse = " << bCanReuse;
-       if (bCanReuse)
-       {
-          if (job->url() == urlOnHold)
-          {
-             kDebug(7006) << "HOLD: Reusing held slave for" << urlOnHold;
-             slave = slaveOnHold;
-          }
-          else
-          {
-             kDebug(7006) << "HOLD: Discarding held slave (" << urlOnHold << ")";
-             slaveOnHold->kill();
-          }
-          slaveOnHold = 0;
-          urlOnHold = KUrl();
-       }
-       if (slave)
-          return slave;
+        int error;
+        QString errortext;
+        slave = Slave::createSlave(protInfo->protocol, url, error, errortext);
+
+        if (slave)
+        {
+            protInfo->idleSlaves.append(slave);
+            q->connect(slave, SIGNAL(slaveDied(KIO::Slave *)),
+                       SLOT(slotSlaveDied(KIO::Slave *)));
+            q->connect(slave, SIGNAL(slaveStatus(pid_t,const QByteArray&,const QString &, bool)),
+                       SLOT(slotSlaveStatus(pid_t,const QByteArray&, const QString &, bool)));
+        }
+        else
+        {
+            kError() << "couldn't create slave:" << errortext;
+            if (job)
+            {
+                protInfo->joblist.removeAll(job);
+                extraJobData.remove(job);
+                job->slotError( error, errortext );
+            }
+        }
     }
 
-    return searchIdleList(protInfo->idleSlaves, job->url(), jobData.protocol, exact);
+    return slave;
 }
 
-Slave *SchedulerPrivate::createSlave(ProtocolInfo *protInfo, SimpleJob *job, const KUrl &url)
-{
-   int error;
-   QString errortext;
-   Slave *slave = Slave::createSlave(protInfo->protocol, url, error, errortext);
-   if (slave)
-   {
-      protInfo->idleSlaves.append(slave);
-      q->connect(slave, SIGNAL(slaveDied(KIO::Slave *)),
-                 SLOT(slotSlaveDied(KIO::Slave *)));
-      q->connect(slave, SIGNAL(slaveStatus(pid_t,const QByteArray&,const QString &, bool)),
-                 SLOT(slotSlaveStatus(pid_t,const QByteArray&, const QString &, bool)));
-   }
-   else
-   {
-      kError() << "couldn't create slave:" << errortext;
-      if (job)
-      {
-         protInfo->joblist.removeAll(job);
-         extraJobData.remove(job);
-         job->slotError( error, errortext );
-      }
-   }
-   return slave;
-}
-
 void SchedulerPrivate::slotSlaveStatus(pid_t, const QByteArray&, const QString &, bool)
 {
 }
@@ -955,16 +1034,15 @@
 {
 //    kDebug(7006) << "_assignJobToSlave( " << job << ", " << slave << ")";
     QString dummy;
-    if ((slave->slaveProtocol() != KProtocolManager::slaveProtocol( job->url(), dummy ))
-        ||
-        (!newJobs.removeAll(job)))
+    ProtocolInfo *protInfo = protInfoDict.get(slave->protocol());
+    if ((slave->slaveProtocol() != KProtocolManager::slaveProtocol( job->url(), dummy )) ||
+        !(protInfo->joblist.removeAll(job) > 0 || newJobs.removeAll(job) > 0))
     {
         kDebug(7006) << "_assignJobToSlave(): ERROR, nonmatching or unknown job.";
         job->kill();
         return false;
     }
 
-    ProtocolInfo *protInfo = protInfoDict.get(slave->protocol());
     JobList *list = protInfo->coSlaves.value(slave);
     assert(list);
     if (!list)


