Git commit 5dd4ff8e426367676d99b4a067c61b2ed4e5c059 by Christian Mollekopf.
Committed on 30/11/2011 at 03:27.
Pushed by cmollekopf into branch 'master'.

Avoid using too much memory and overloading nepomuk

The ItemQueue now stores only ids instead of full items, and fetches
only a couple of items at a time. This ensures that not too much memory
is used. To ease the load on nepomuk, we insert a timeout if a storejob
times out.

A simplified sketch of the new fetch/store flow is appended below the diff.

M  +114  -71   agents/nepomukfeeder/feederqueue.cpp
M  +14   -6    agents/nepomukfeeder/feederqueue.h

http://commits.kde.org/kdepim-runtime/5dd4ff8e426367676d99b4a067c61b2ed4e5c059

diff --git a/agents/nepomukfeeder/feederqueue.cpp b/agents/nepomukfeeder/feederqueue.cpp
index 30c3dab..ad3331f 100644
--- a/agents/nepomukfeeder/feederqueue.cpp
+++ b/agents/nepomukfeeder/feederqueue.cpp
@@ -20,6 +20,7 @@

 #include
 #include
+#include "dms-copy/simpleresource.h"
 #include
 #include
 #include
@@ -42,8 +43,8 @@ FeederQueue::FeederQueue( QObject* parent )
   mProcessedAmount( 0 ),
   mPendingJobs( 0 ),
   mReIndex( false ),
-  lowPrioQueue(1, this),
-  highPrioQueue(1, this)
+  lowPrioQueue(1, 100, this),
+  highPrioQueue(1, 100, this)
 {
   mProcessItemQueueTimer.setInterval( 0 );
   mProcessItemQueueTimer.setSingleShot( true );
@@ -133,34 +134,19 @@ void FeederQueue::itemHeadersReceived( const Akonadi::Item::List& items )
   }

   if ( !itemsToUpdate.isEmpty() ) {
-    ItemFetchJob *itemFetch = new ItemFetchJob( itemsToUpdate, this );
-    itemFetch->setFetchScope( mItemFetchScope );
-    connect( itemFetch, SIGNAL(itemsReceived(Akonadi::Item::List)), SLOT(itemsReceived(Akonadi::Item::List)) );
-    connect( itemFetch, SIGNAL(result(KJob*)), SLOT(itemFetchResult(KJob*)) );
-    ++mPendingJobs;
+    lowPrioQueue.addItems(itemsToUpdate);
     mTotalAmount += itemsToUpdate.size();
+    mProcessItemQueueTimer.start();
   }
 }

-void FeederQueue::itemsReceived(const Akonadi::Item::List& items)
-{
-  //kDebug() << items.size();
-  foreach ( Item item, items ) {
-    item.setParentCollection( mCurrentCollection );
-    lowPrioQueue.addItem( item );
-  }
-  mProcessItemQueueTimer.start();
-}
-
 void FeederQueue::itemFetchResult(KJob* job)
 {
   if ( job->error() )
     kWarning() << job->errorString();
-  ItemFetchJob *fetchJob = qobject_cast<ItemFetchJob*>(job);
-  Q_UNUSED( fetchJob )
-  //kDebug() << fetchJob->items().size();
+
   --mPendingJobs;
-  if ( mPendingJobs == 0 && lowPrioQueue.isEmpty() ) { //Fetch jobs finished but there are were no items in the collection
+  if ( mPendingJobs == 0 && lowPrioQueue.isEmpty() ) { //Fetch jobs finished but there were no items in the collection
     mCurrentCollection = Collection();
    emit idle( i18n( "Indexing completed." ) );
     //kDebug() << "Indexing completed.";
@@ -173,32 +159,10 @@ void FeederQueue::itemFetchResult(KJob* job)
 void FeederQueue::addItem( const Akonadi::Item &item )
 {
   //kDebug() << item.id();
-  if ( item.hasPayload() ) {
-    highPrioQueue.addItem( item );
-    mProcessItemQueueTimer.start();
-  } else {
-    if ( mItemFetchScope.fullPayload() || !mItemFetchScope.payloadParts().isEmpty() ) {
-      ItemFetchJob *job = new ItemFetchJob( item );
-      job->setFetchScope( mItemFetchScope );
-      connect( job, SIGNAL( itemsReceived( Akonadi::Item::List ) ),
-               SLOT( notificationItemsReceived( Akonadi::Item::List ) ) );
-    }
-  }
-}
-
-void FeederQueue::notificationItemsReceived(const Akonadi::Item::List& items)
-{
-  //kDebug() << items.size();
-  foreach ( const Item &item, items ) {
-    if ( !item.hasPayload() ) {
-      continue;
-    }
-    highPrioQueue.addItem( item );
-  }
+  highPrioQueue.addItem( item );
   mProcessItemQueueTimer.start();
 }

-
 bool FeederQueue::isEmpty()
 {
   return highPrioQueue.isEmpty() && lowPrioQueue.isEmpty() && mCollectionQueue.isEmpty();
@@ -234,7 +198,7 @@ void FeederQueue::processItemQueue()
     emit idle( i18n( "Ready to index data." ) );
   }

-  if ( !highPrioQueue.isEmpty() || !lowPrioQueue.isEmpty() ) {
+  if ( !highPrioQueue.isEmpty() || ( !lowPrioQueue.isEmpty() && mOnline ) ) {
     //kDebug() << "continue";
     // go to eventloop before processing the next one, otherwise we miss the idle status change
     mProcessItemQueueTimer.start();
@@ -244,7 +208,7 @@ void FeederQueue::processItemQueue()
 void FeederQueue::prioQueueFinished()
 {
   if (highPrioQueue.isEmpty() && lowPrioQueue.isEmpty() && (mPendingJobs == 0) && mCurrentCollection.isValid() ) {
-    //kDebug() << "indexing completed";
+    kDebug() << "indexing of collection " << mCurrentCollection.id() << " completed";
     mCurrentCollection = Collection();
     emit idle( i18n( "Indexing completed." ) );
     processNextCollection();
@@ -282,13 +246,17 @@ void FeederQueue::setItemFetchScope(ItemFetchScope scope)



-ItemQueue::ItemQueue(int batchSize, QObject* parent)
+ItemQueue::ItemQueue(int batchSize, int fetchSize, QObject* parent)
 : QObject(parent),
   mPendingRemoveDataJobs( 0 ),
+  mFetchSize(fetchSize),
   mBatchSize(batchSize),

-  block( false)
+  block(false)
 {
-
+  if ( fetchSize < batchSize ) {
+    kWarning() << "fetchSize must be >= batchsize";
+    fetchSize = batchSize;
+  }
 }

 ItemQueue::~ItemQueue()
@@ -299,41 +267,94 @@ ItemQueue::~ItemQueue()

 void ItemQueue::addItem(const Akonadi::Item &item)
 {
-  mItemPipeline.enqueue(item);
+  kDebug() << "pipline size: " << mItemPipeline.size();
+  mItemPipeline.enqueue(item.id()); //TODO if payload is available add directly to
+}
+
+void ItemQueue::addItems(const Akonadi::Item::List &list )
+{
+  foreach (const Akonadi::Item &item, list) {
+    addItem(item);
+  }
 }


 bool ItemQueue::processItem()
 {
   if (block) {//wait until the old graph has been saved
-    //kDebug() << "blocked";
+    kDebug() << "blocked";
     return false;
   }
-  //kDebug();
+  kDebug() << "------------------------procItem";
   static bool processing = false; // guard against sub-eventloop reentrancy
   if ( processing )
     return false;
   processing = true;
   if ( !mItemPipeline.isEmpty() ) {
-    const Akonadi::Item &item = mItemPipeline.dequeue();
-    //kDebug() << item.id();
-    Q_ASSERT(mBatch.size() == 0 ? mResourceGraph.isEmpty() : true); //otherwise we havent reached removeDataByApplication yet, and therfore mustn't overwrite mResourceGraph
-    NepomukHelpers::addItemToGraph( item, mResourceGraph );
-    mBatch.append(item.url());
+    mItemFetchList.append( Akonadi::Item( mItemPipeline.dequeue() ) );
   }
   processing = false;

-  if ( mBatch.size() >= mBatchSize || mItemPipeline.isEmpty() ) {
-    KJob *job = Nepomuk::removeDataByApplication( mBatch, Nepomuk::RemoveSubResoures, KGlobal::mainComponent() );
-    connect( job, SIGNAL( finished( KJob* ) ), this, SLOT( removeDataResult( KJob* ) ) );
-    mBatch.clear();
-    //kDebug() << "store";
+  if (mItemFetchList.size() >= mFetchSize || mItemPipeline.isEmpty() ) {
+    kDebug() << QString("Fetching %1 items").arg(mItemFetchList.size());
+    Akonadi::ItemFetchJob *job = new Akonadi::ItemFetchJob( mItemFetchList, 0 );
+    job->fetchScope().fetchFullPayload();
+    job->fetchScope().setCacheOnly( true );
+    job->setProperty("numberOfItems", mItemFetchList.size());
+    connect( job, SIGNAL( itemsReceived( Akonadi::Item::List ) ),
+             SLOT( itemsReceived( Akonadi::Item::List ) ) );
+    connect( job, SIGNAL(result(KJob*)), SLOT(fetchJobResult(KJob*)) );
+    mItemFetchList.clear();
     block = true;
     return false;
+  } else { //In case there is nothing in the itemFetchList, but still in the fetchedItemList
+    return processBatch();
   }
   return true;
 }

+void ItemQueue::itemsReceived(const Akonadi::Item::List& items)
+{
+  Akonadi::ItemFetchJob *job = qobject_cast<Akonadi::ItemFetchJob*>(sender());
+  int numberOfItems = job->property("numberOfItems").toInt();
+  kDebug() << items.size();
+  mFetchedItemList.append(items);
+  if ( mFetchedItemList.size() >= numberOfItems ) { //Sometimes we get a partial delivery only, wait for the rest
+    processBatch();
+  }
+}
+
+void ItemQueue::fetchJobResult(KJob* job)
+{
+  if ( job->error() ) {
+    kWarning() << job->errorString();
+    block = false;
+    emit batchFinished();
+  }
+}
+
+bool ItemQueue::processBatch()
+{
+  int size = mFetchedItemList.size();
+  kDebug() << size;
+  for ( int i = 0; i < size && i < mBatchSize; i++ ) {
+    const Akonadi::Item &item = mFetchedItemList.takeFirst();
+    //kDebug() << item.id();
+    Q_ASSERT(item.hasPayload());
+    Q_ASSERT(mBatch.size() == 0 ? mResourceGraph.isEmpty() : true); //otherwise we havent reached removeDataByApplication yet, and therfore mustn't overwrite mResourceGraph
+    NepomukHelpers::addItemToGraph( item, mResourceGraph );
+    mBatch.append(item.url());
+  }
+  if ( mBatch.size() && ( mBatch.size() >= mBatchSize || mItemPipeline.isEmpty() ) ) {
+    kDebug() << "process batch of " << mBatch.size() << "  left: " << mFetchedItemList.size();
+    KJob *job = Nepomuk::removeDataByApplication( mBatch, Nepomuk::RemoveSubResoures, KGlobal::mainComponent() );
+    connect( job, SIGNAL( finished( KJob* ) ), this, SLOT( removeDataResult( KJob* ) ) );
+    mBatch.clear();
+    return false;
+  }
+  return true;
+}
+
 void ItemQueue::removeDataResult(KJob* job)
 {
   if ( job->error() )
@@ -342,28 +363,50 @@ void ItemQueue::removeDataResult(KJob* job)
   //All old items have been removed, so we can now store the new items
   //kDebug() << "Saving Graph";
   KJob *addGraphJob = NepomukHelpers::addGraphToNepomuk( mResourceGraph );
-  connect( addGraphJob, SIGNAL( result( KJob* ) ), SLOT( jobResult( KJob* ) ) );
-
+  connect( addGraphJob, SIGNAL( result( KJob* ) ), SLOT( batchJobResult( KJob* ) ) );
+  m_debugGraph = mResourceGraph;
   mResourceGraph.clear();
   //trigger processing of next collection as everything of this one has been stored
   //kDebug() << "removing completed, saving complete, batch done==================";
 }

-void ItemQueue::jobResult(KJob* job)
+void ItemQueue::batchJobResult(KJob* job)
 {
-  if ( job->error() )
+  kDebug() << "------------------------------------------";
+  kDebug() << "pipline size: " << mItemPipeline.size();
+  kDebug() << "fetchedItemList : " << mFetchedItemList.size();
+  Q_ASSERT(mBatch.isEmpty());
+  int timeout = 0;
+  if ( job->error() ) {
+    foreach( const Nepomuk::SimpleResource &res, m_debugGraph.toList() ) {
+      kWarning() << res;
+    }
     kWarning() << job->errorString();
-  block = false;
-  emit batchFinished();
-  if ( mItemPipeline.isEmpty() ) {
-    //kDebug() << "indexing completed";
+    timeout = 30000; //Nepomuk is probably still working. Lets wait a bit and hope it has finished until the next batch arrives.
+  }
+  QTimer::singleShot(timeout, this, SLOT(continueProcessing()));
+}
+
+void ItemQueue::continueProcessing()
+{
+  if (processBatch()) { //Go back for more
+    kDebug() << "batch finished";
+    block = false;
+    emit batchFinished();
+  } else {
+    kDebug() << "there was more...";
+    return;
+  }
+  if ( mItemPipeline.isEmpty() && mFetchedItemList.isEmpty() ) {
+    kDebug() << "indexing completed";
     emit finished();
   }
 }

+
 bool ItemQueue::isEmpty()
 {
-  return mItemPipeline.isEmpty();
+  return mItemPipeline.isEmpty() && mFetchedItemList.isEmpty();
 }


diff --git a/agents/nepomukfeeder/feederqueue.h b/agents/nepomukfeeder/feederqueue.h
index 6467620..a70a2c5 100644
--- a/agents/nepomukfeeder/feederqueue.h
+++ b/agents/nepomukfeeder/feederqueue.h
@@ -42,11 +42,12 @@ class SimpleResourceGraph;
 class ItemQueue : public QObject {
   Q_OBJECT
 public:
-    explicit ItemQueue(int batchSize, QObject* parent = 0);
+    explicit ItemQueue(int batchSize, int fetchSize, QObject* parent = 0);
     ~ItemQueue();

   /** add item to queue */
   void addItem(const Akonadi::Item &);
+  void addItems(const Akonadi::Item::List &);
   /** process one item @return returns false if currently blocked */
   bool processItem();
   /** queue is empty */
@@ -59,16 +60,25 @@ signals:

 private slots:
   void removeDataResult( KJob* job );
-  void jobResult( KJob* job );
+  void batchJobResult( KJob* job );
+  void fetchJobResult( KJob* job );
+  void itemsReceived( const Akonadi::Item::List &items );
+  void continueProcessing();

 private:
+  bool processBatch();
+
   int mPendingRemoveDataJobs, mBatchCounter;

-  QQueue<Akonadi::Item> mItemPipeline;
+  QQueue<Akonadi::Item::Id> mItemPipeline;
   Nepomuk::SimpleResourceGraph mResourceGraph;
+  Nepomuk::SimpleResourceGraph m_debugGraph;
   QList<QUrl> mBatch;
+  Akonadi::Item::List mItemFetchList;
+  Akonadi::Item::List mFetchedItemList;

-  int mBatchSize;
+  int mBatchSize; //Size of Nepomuk batch, number of items stored together in nepomuk
+  int mFetchSize; //Maximum number of items fetched with full payload (defines ram usage of feeder), must be >= mBatchSize, ideally a multiple of it
   bool block;
 };

@@ -127,8 +137,6 @@ signals:

 private slots:
   void itemHeadersReceived( const Akonadi::Item::List &items );
-  void itemsReceived( const Akonadi::Item::List &items );
-  void notificationItemsReceived( const Akonadi::Item::List &items );
   void itemFetchResult( KJob* job );
   void processItemQueue();
   void prioQueueFinished();
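
The flow introduced by the new ItemQueue can also be summarised apart from the diff. Below is a minimal sketch in plain QtCore (Qt 4 era): only ids are queued, payloads are fetched in chunks of up to fetchSize, and fetched items are handed to the store in batches of up to batchSize; on a failed store the real code additionally waits 30 seconds (via QTimer::singleShot) before continuing. The class name PipelineSketch and the fetchPayloads()/storeBatch()/payloadsArrived()/storeFinished() hooks are illustrative stand-ins, not the actual Akonadi or Nepomuk API; the real implementation wires these steps through Akonadi::ItemFetchJob and the Nepomuk store jobs as shown in the diff above.

// Sketch only: the real ItemQueue drives these steps through Akonadi/Nepomuk
// job signals (itemsReceived, fetchJobResult, batchJobResult) and QTimer.
#include <QtCore/QQueue>
#include <QtCore/QList>
#include <QtCore/QtGlobal>

typedef qlonglong ItemId;   // stand-in for Akonadi::Item::Id

class PipelineSketch
{
public:
    PipelineSketch(int batchSize, int fetchSize)
        : mBatchSize(batchSize), mFetchSize(fetchSize), mBlocked(false)
    {
        if (mFetchSize < mBatchSize)      // same guard as the ItemQueue constructor
            mFetchSize = mBatchSize;
    }

    // Only the id is stored; the full payload stays out of memory until fetched.
    void addItem(ItemId id) { mPipeline.enqueue(id); }

    // Returns false while a fetch or store is pending, so the caller retries later.
    bool processItem()
    {
        if (mBlocked)
            return false;
        if (!mPipeline.isEmpty())
            mFetchList.append(mPipeline.dequeue());
        if (mFetchList.size() >= mFetchSize || mPipeline.isEmpty()) {
            fetchPayloads(mFetchList);    // would start an Akonadi::ItemFetchJob
            mFetchList.clear();
            mBlocked = true;              // wait until the payloads have arrived
            return false;
        }
        return processBatch();            // nothing to fetch, drain what we already have
    }

    // Called when fetched payloads arrive (the itemsReceived() slot in the real code).
    void payloadsArrived(const QList<ItemId> &items)
    {
        mFetchedItems += items;
        processBatch();
    }

    // Called when a store job finished (batchJobResult() in the real code).
    void storeFinished(bool error)
    {
        // The real code delays the next step by 30s on error, via QTimer::singleShot,
        // to give Nepomuk time to catch up; the sketch just continues immediately.
        Q_UNUSED(error);
        if (processBatch())
            mBlocked = false;             // batch done, the caller may queue more work
    }

private:
    // Hand at most mBatchSize fetched items to the store in one go.
    bool processBatch()
    {
        QList<ItemId> batch;
        while (!mFetchedItems.isEmpty() && batch.size() < mBatchSize)
            batch.append(mFetchedItems.takeFirst());
        if (!batch.isEmpty()) {
            storeBatch(batch);            // would run removeDataByApplication + addGraphToNepomuk
            return false;                 // storeFinished() continues once the job is done
        }
        return true;
    }

    void fetchPayloads(const QList<ItemId> &ids) { Q_UNUSED(ids); }   // stub
    void storeBatch(const QList<ItemId> &batch) { Q_UNUSED(batch); }  // stub

    QQueue<ItemId> mPipeline;      // ids waiting to be fetched
    QList<ItemId> mFetchList;      // ids collected for the next fetch
    QList<ItemId> mFetchedItems;   // fetched items waiting to be stored
    int mBatchSize;
    int mFetchSize;
    bool mBlocked;
};

A driver would call addItem() for each Akonadi::Item::Id, call processItem() repeatedly (from a timer) until it returns true, and feed payloadsArrived()/storeFinished() from the corresponding job results.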