[prev in list] [next in list] [prev in thread] [next in thread] 

List:       kde-commits
Subject:    [kdepim-runtime] agents/nepomukfeeder: Avoid using too much memory
From:       Christian Mollekopf <chrigi_1 () fastmail ! fm>
Date:       2011-11-30 23:04:40
Message-ID: 20111130230440.8F47FA60BE () git ! kde ! org
[Download RAW message or body]

Git commit 5dd4ff8e426367676d99b4a067c61b2ed4e5c059 by Christian Mollekopf.
Committed on 30/11/2011 at 03:27.
Pushed by cmollekopf into branch 'master'.

Avoid using too much memory and overloading Nepomuk

The ItemQueue now stores only item ids instead of full items, and fetches only a
couple of items at a time. This ensures that not too much memory is used. To ease
the load on Nepomuk when a store job times out, we insert a delay before
processing the next batch.

M  +114  -71   agents/nepomukfeeder/feederqueue.cpp
M  +14   -6    agents/nepomukfeeder/feederqueue.h

http://commits.kde.org/kdepim-runtime/5dd4ff8e426367676d99b4a067c61b2ed4e5c059

diff --git a/agents/nepomukfeeder/feederqueue.cpp \
b/agents/nepomukfeeder/feederqueue.cpp index 30c3dab..ad3331f 100644
--- a/agents/nepomukfeeder/feederqueue.cpp
+++ b/agents/nepomukfeeder/feederqueue.cpp
@@ -20,6 +20,7 @@
 
 #include <dms-copy/datamanagement.h>
 #include <dms-copy/simpleresourcegraph.h>
+#include "dms-copy/simpleresource.h"
 #include <Nepomuk/ResourceManager>
 #include <Soprano/Model>
 #include <nie.h>
@@ -42,8 +43,8 @@ FeederQueue::FeederQueue( QObject* parent )
   mProcessedAmount( 0 ),
   mPendingJobs( 0 ),
   mReIndex( false ),
-  lowPrioQueue(1, this),
-  highPrioQueue(1, this)
+  lowPrioQueue(1, 100, this),
+  highPrioQueue(1, 100, this)
 {
   mProcessItemQueueTimer.setInterval( 0 );
   mProcessItemQueueTimer.setSingleShot( true );
@@ -133,34 +134,19 @@ void FeederQueue::itemHeadersReceived( const \
Akonadi::Item::List& items )  }
 
   if ( !itemsToUpdate.isEmpty() ) {
-    ItemFetchJob *itemFetch = new ItemFetchJob( itemsToUpdate, this );
-    itemFetch->setFetchScope( mItemFetchScope );
-    connect( itemFetch, SIGNAL(itemsReceived(Akonadi::Item::List)), \
                SLOT(itemsReceived(Akonadi::Item::List)) );
-    connect( itemFetch, SIGNAL(result(KJob*)), SLOT(itemFetchResult(KJob*)) );
-    ++mPendingJobs;
+    lowPrioQueue.addItems(itemsToUpdate);
     mTotalAmount += itemsToUpdate.size();
+    mProcessItemQueueTimer.start();
   }
 }
 
-void FeederQueue::itemsReceived(const Akonadi::Item::List& items)
-{
-  //kDebug() << items.size();
-  foreach ( Item item, items ) {
-    item.setParentCollection( mCurrentCollection );
-    lowPrioQueue.addItem( item );
-  }
-  mProcessItemQueueTimer.start();
-}
-
 void FeederQueue::itemFetchResult(KJob* job)
 {
   if ( job->error() )
     kWarning() << job->errorString();
-  ItemFetchJob *fetchJob = qobject_cast<ItemFetchJob*>(job);
-  Q_UNUSED( fetchJob )
-  //kDebug() << fetchJob->items().size();
+
   --mPendingJobs;
-  if ( mPendingJobs == 0 && lowPrioQueue.isEmpty() ) { //Fetch jobs finished but \
there are were no items in the collection +  if ( mPendingJobs == 0 && \
lowPrioQueue.isEmpty() ) { //Fetch jobs finished but there were no items in the \
collection  mCurrentCollection = Collection();
     emit idle( i18n( "Indexing completed." ) );
     //kDebug() << "Indexing completed.";
@@ -173,32 +159,10 @@ void FeederQueue::itemFetchResult(KJob* job)
 void FeederQueue::addItem( const Akonadi::Item &item )
 {
   //kDebug() << item.id();
-  if ( item.hasPayload() ) {
-    highPrioQueue.addItem( item );
-    mProcessItemQueueTimer.start();
-  } else {
-    if ( mItemFetchScope.fullPayload() || !mItemFetchScope.payloadParts().isEmpty() \
                ) {
-      ItemFetchJob *job = new ItemFetchJob( item );
-      job->setFetchScope( mItemFetchScope );
-      connect( job, SIGNAL( itemsReceived( Akonadi::Item::List ) ),
-               SLOT( notificationItemsReceived( Akonadi::Item::List ) ) );
-    }
-  }
-}
-
-void FeederQueue::notificationItemsReceived(const Akonadi::Item::List& items)
-{
-  //kDebug() << items.size();
-  foreach ( const Item &item, items ) {
-    if ( !item.hasPayload() ) {
-      continue;
-    }
-    highPrioQueue.addItem( item );
-  }
+  highPrioQueue.addItem( item );
   mProcessItemQueueTimer.start();
 }
 
-
 bool FeederQueue::isEmpty()
 {
   return highPrioQueue.isEmpty() && lowPrioQueue.isEmpty() && \
mCollectionQueue.isEmpty(); @@ -234,7 +198,7 @@ void FeederQueue::processItemQueue()
     emit idle( i18n( "Ready to index data." ) );
   }
 
-  if ( !highPrioQueue.isEmpty() || !lowPrioQueue.isEmpty() ) {
+  if ( !highPrioQueue.isEmpty() || ( !lowPrioQueue.isEmpty() && mOnline ) ) {
     //kDebug() << "continue";
     // go to eventloop before processing the next one, otherwise we miss the idle \
status change  mProcessItemQueueTimer.start();
@@ -244,7 +208,7 @@ void FeederQueue::processItemQueue()
 void FeederQueue::prioQueueFinished()
 {
   if (highPrioQueue.isEmpty() && lowPrioQueue.isEmpty() && (mPendingJobs == 0) && \
                mCurrentCollection.isValid() ) {
-    //kDebug() << "indexing completed";
+    kDebug() << "indexing of collection " << mCurrentCollection.id() << " \
completed";  mCurrentCollection = Collection();
     emit idle( i18n( "Indexing completed." ) );
     processNextCollection();
@@ -282,13 +246,17 @@ void FeederQueue::setItemFetchScope(ItemFetchScope scope)
 
 
 
-ItemQueue::ItemQueue(int batchSize, QObject* parent)
+ItemQueue::ItemQueue(int batchSize, int fetchSize, QObject* parent)
 : QObject(parent),
   mPendingRemoveDataJobs( 0 ),
+  mFetchSize(fetchSize),
   mBatchSize(batchSize), 
-  block( false)
+  block(false)
 {
-
+  if ( fetchSize < batchSize )  {
+    kWarning() << "fetchSize must be >= batchsize";
+    fetchSize = batchSize;
+  }
 }
 
 ItemQueue::~ItemQueue()
@@ -299,41 +267,94 @@ ItemQueue::~ItemQueue()
 
 void ItemQueue::addItem(const Akonadi::Item &item)
 {
-  mItemPipeline.enqueue(item);
+  kDebug() << "pipline size: " << mItemPipeline.size();
+  mItemPipeline.enqueue(item.id()); //TODO if payload is available add directly to 
+}
+
+void ItemQueue::addItems(const Akonadi::Item::List &list )
+{
+  foreach (const Akonadi::Item &item, list) {
+    addItem(item);
+  }
 }
 
 
 bool ItemQueue::processItem()
 {
   if (block) {//wait until the old graph has been saved
-    //kDebug() << "blocked";
+    kDebug() << "blocked";
     return false;
   }
-  //kDebug();
+  kDebug() << "------------------------procItem";
   static bool processing = false; // guard against sub-eventloop reentrancy
   if ( processing )
     return false;
   processing = true;
   if ( !mItemPipeline.isEmpty() ) {
-    const Akonadi::Item &item = mItemPipeline.dequeue();
-    //kDebug() << item.id();
-    Q_ASSERT(mBatch.size() == 0 ? mResourceGraph.isEmpty() : true); //otherwise we \
havent reached removeDataByApplication yet, and therfore mustn't overwrite \
                mResourceGraph
-    NepomukHelpers::addItemToGraph( item, mResourceGraph );
-    mBatch.append(item.url());
+    mItemFetchList.append( Akonadi::Item( mItemPipeline.dequeue() ) );
   }
   processing = false;
   
-  if ( mBatch.size() >= mBatchSize || mItemPipeline.isEmpty() ) {
-    KJob *job = Nepomuk::removeDataByApplication( mBatch, \
                Nepomuk::RemoveSubResoures, KGlobal::mainComponent() );
-    connect( job, SIGNAL( finished( KJob* ) ), this, SLOT( removeDataResult( KJob* ) \
                ) );
-    mBatch.clear();
-    //kDebug() << "store";
+  if (mItemFetchList.size() >= mFetchSize || mItemPipeline.isEmpty() ) {
+    kDebug() << QString("Fetching %1 items").arg(mItemFetchList.size());
+    Akonadi::ItemFetchJob *job = new Akonadi::ItemFetchJob( mItemFetchList, 0 );
+    job->fetchScope().fetchFullPayload();
+    job->fetchScope().setCacheOnly( true );
+    job->setProperty("numberOfItems", mItemFetchList.size());
+    connect( job, SIGNAL( itemsReceived( Akonadi::Item::List ) ),
+            SLOT( itemsReceived( Akonadi::Item::List ) ) );
+    connect( job, SIGNAL(result(KJob*)), SLOT(fetchJobResult(KJob*)) );
+    mItemFetchList.clear();
     block = true;
     return false;
+  } else { //In case there is nothing in the itemFetchList, but still in the \
fetchedItemList +    return processBatch();
   }
   return true;
 }
 
+void ItemQueue::itemsReceived(const Akonadi::Item::List& items)
+{
+    Akonadi::ItemFetchJob *job = qobject_cast<Akonadi::ItemFetchJob*>(sender());
+    int numberOfItems = job->property("numberOfItems").toInt();
+    kDebug() << items.size();
+    mFetchedItemList.append(items);
+    if ( mFetchedItemList.size() >= numberOfItems ) { //Sometimes we get a partial \
delivery only, wait for the rest +        processBatch();
+    }
+}
+
+void ItemQueue::fetchJobResult(KJob* job)
+{
+  if ( job->error() ) {
+    kWarning() << job->errorString();
+    block = false;
+    emit batchFinished();
+  }
+}
+
+bool ItemQueue::processBatch()
+{
+    int size = mFetchedItemList.size();
+    kDebug() << size;
+    for ( int i = 0; i < size && i < mBatchSize; i++ ) {
+        const Akonadi::Item &item = mFetchedItemList.takeFirst();
+        //kDebug() << item.id();
+        Q_ASSERT(item.hasPayload());
+        Q_ASSERT(mBatch.size() == 0 ? mResourceGraph.isEmpty() : true); //otherwise \
we havent reached removeDataByApplication yet, and therfore mustn't overwrite \
mResourceGraph +        NepomukHelpers::addItemToGraph( item, mResourceGraph );
+        mBatch.append(item.url());
+    }
+    if ( mBatch.size() && ( mBatch.size() >= mBatchSize || mItemPipeline.isEmpty() ) \
) { +        kDebug() << "process batch of " << mBatch.size() << "      left: " << \
mFetchedItemList.size(); +        KJob *job = Nepomuk::removeDataByApplication( \
mBatch, Nepomuk::RemoveSubResoures, KGlobal::mainComponent() ); +        connect( \
job, SIGNAL( finished( KJob* ) ), this, SLOT( removeDataResult( KJob* ) ) ); +        \
mBatch.clear(); +        return false;
+    }
+    return true;
+}
+
 void ItemQueue::removeDataResult(KJob* job)
 {
   if ( job->error() )
@@ -342,28 +363,50 @@ void ItemQueue::removeDataResult(KJob* job)
   //All old items have been removed, so we can now store the new items
   //kDebug() << "Saving Graph";
   KJob *addGraphJob = NepomukHelpers::addGraphToNepomuk( mResourceGraph );
-  connect( addGraphJob, SIGNAL( result( KJob* ) ), SLOT( jobResult( KJob* ) ) );
-
+  connect( addGraphJob, SIGNAL( result( KJob* ) ), SLOT( batchJobResult( KJob* ) ) \
); +  m_debugGraph = mResourceGraph;
   mResourceGraph.clear();
   //trigger processing of next collection as everything of this one has been stored
   //kDebug() << "removing completed, saving complete, batch done==================";
 }
 
-void ItemQueue::jobResult(KJob* job)
+void ItemQueue::batchJobResult(KJob* job)
 {
-  if ( job->error() )
+  kDebug() << "------------------------------------------";
+  kDebug() << "pipline size: " << mItemPipeline.size();
+  kDebug() << "fetchedItemList : " << mFetchedItemList.size();
+  Q_ASSERT(mBatch.isEmpty());
+  int timeout = 0;
+  if ( job->error() ) {
+    foreach( const Nepomuk::SimpleResource &res, m_debugGraph.toList() ) {
+        kWarning() << res;
+    }
     kWarning() << job->errorString();
-  block = false;
-  emit batchFinished();
-  if ( mItemPipeline.isEmpty() ) {
-    //kDebug() << "indexing completed";
+    timeout = 30000; //Nepomuk is probably still working. Lets wait a bit and hope \
it has finished until the next batch arrives. +  }
+  QTimer::singleShot(timeout, this, SLOT(continueProcessing()));
+}
+
+void ItemQueue::continueProcessing()
+{
+  if (processBatch()) { //Go back for more
+    kDebug() << "batch finished";
+    block = false;
+    emit batchFinished();
+  } else {
+      kDebug() << "there was more...";
+      return;
+  }
+  if ( mItemPipeline.isEmpty() && mFetchedItemList.isEmpty() ) {
+    kDebug() << "indexing completed";
     emit finished();
   }
 }
 
+
 bool ItemQueue::isEmpty()
 {
-  return mItemPipeline.isEmpty();
+  return mItemPipeline.isEmpty() && mFetchedItemList.isEmpty();
 }
 
 
diff --git a/agents/nepomukfeeder/feederqueue.h b/agents/nepomukfeeder/feederqueue.h
index 6467620..a70a2c5 100644
--- a/agents/nepomukfeeder/feederqueue.h
+++ b/agents/nepomukfeeder/feederqueue.h
@@ -42,11 +42,12 @@ class SimpleResourceGraph;
 class ItemQueue : public QObject {
   Q_OBJECT
 public:
-    explicit ItemQueue(int batchSize, QObject* parent = 0);
+    explicit ItemQueue(int batchSize, int fetchSize, QObject* parent = 0);
     ~ItemQueue();
   
   /** add item to queue */
   void addItem(const Akonadi::Item &);
+  void addItems(const Akonadi::Item::List &);
   /** process one item @return returns false if currently blocked */
   bool processItem();
   /** queue is empty */
@@ -59,16 +60,25 @@ signals:
   
 private slots:
   void removeDataResult( KJob* job );
-  void jobResult( KJob* job );
+  void batchJobResult( KJob* job );
+  void fetchJobResult( KJob* job );
+  void itemsReceived( const Akonadi::Item::List &items );
+  void continueProcessing();
   
 private:
+  bool processBatch();
+
   int mPendingRemoveDataJobs, mBatchCounter;
 
-  QQueue<Akonadi::Item> mItemPipeline;
+  QQueue<Akonadi::Item::Id> mItemPipeline;
   Nepomuk::SimpleResourceGraph mResourceGraph;
+  Nepomuk::SimpleResourceGraph m_debugGraph;
   QList<QUrl> mBatch;
+  Akonadi::Item::List mItemFetchList;
+  Akonadi::Item::List mFetchedItemList;
 
-  int mBatchSize;
+  int mBatchSize; //Size of Nepomuk batch, number of items stored together in \
nepomuk +  int mFetchSize; //Maximum number of items fetched with full payload \
(defines ram usage of feeder), must be >= mBatchSize, ideally a multiple of it  bool \
block;  };
 
@@ -127,8 +137,6 @@ signals:
   
 private slots:
   void itemHeadersReceived( const Akonadi::Item::List &items );
-  void itemsReceived( const Akonadi::Item::List &items );
-  void notificationItemsReceived( const Akonadi::Item::List &items );
   void itemFetchResult( KJob* job );
   void processItemQueue();
   void prioQueueFinished();


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic