[prev in list] [next in list] [prev in thread] [next in thread]
List: kde-commits
Subject: [kdepim-runtime] agents/nepomukfeeder: Avoid using too much memory
From: Christian Mollekopf <chrigi_1 () fastmail ! fm>
Date: 2011-11-30 23:04:40
Message-ID: 20111130230440.8F47FA60BE () git ! kde ! org
[Download RAW message or body]
Git commit 5dd4ff8e426367676d99b4a067c61b2ed4e5c059 by Christian Mollekopf.
Committed on 30/11/2011 at 03:27.
Pushed by cmollekopf into branch 'master'.
Avoid using too much memory and overloading Nepomuk
The ItemQueue now stores IDs only instead of full items, and only fetches a couple
of items at a time. This ensures that not too much memory is used. To ease the load
on Nepomuk if a store job times out, we insert a timeout.
M +114 -71 agents/nepomukfeeder/feederqueue.cpp
M +14 -6 agents/nepomukfeeder/feederqueue.h
http://commits.kde.org/kdepim-runtime/5dd4ff8e426367676d99b4a067c61b2ed4e5c059
diff --git a/agents/nepomukfeeder/feederqueue.cpp \
b/agents/nepomukfeeder/feederqueue.cpp index 30c3dab..ad3331f 100644
--- a/agents/nepomukfeeder/feederqueue.cpp
+++ b/agents/nepomukfeeder/feederqueue.cpp
@@ -20,6 +20,7 @@
#include <dms-copy/datamanagement.h>
#include <dms-copy/simpleresourcegraph.h>
+#include "dms-copy/simpleresource.h"
#include <Nepomuk/ResourceManager>
#include <Soprano/Model>
#include <nie.h>
@@ -42,8 +43,8 @@ FeederQueue::FeederQueue( QObject* parent )
mProcessedAmount( 0 ),
mPendingJobs( 0 ),
mReIndex( false ),
- lowPrioQueue(1, this),
- highPrioQueue(1, this)
+ lowPrioQueue(1, 100, this),
+ highPrioQueue(1, 100, this)
{
mProcessItemQueueTimer.setInterval( 0 );
mProcessItemQueueTimer.setSingleShot( true );
@@ -133,34 +134,19 @@ void FeederQueue::itemHeadersReceived( const \
Akonadi::Item::List& items ) }
if ( !itemsToUpdate.isEmpty() ) {
- ItemFetchJob *itemFetch = new ItemFetchJob( itemsToUpdate, this );
- itemFetch->setFetchScope( mItemFetchScope );
- connect( itemFetch, SIGNAL(itemsReceived(Akonadi::Item::List)), \
SLOT(itemsReceived(Akonadi::Item::List)) );
- connect( itemFetch, SIGNAL(result(KJob*)), SLOT(itemFetchResult(KJob*)) );
- ++mPendingJobs;
+ lowPrioQueue.addItems(itemsToUpdate);
mTotalAmount += itemsToUpdate.size();
+ mProcessItemQueueTimer.start();
}
}
-void FeederQueue::itemsReceived(const Akonadi::Item::List& items)
-{
- //kDebug() << items.size();
- foreach ( Item item, items ) {
- item.setParentCollection( mCurrentCollection );
- lowPrioQueue.addItem( item );
- }
- mProcessItemQueueTimer.start();
-}
-
void FeederQueue::itemFetchResult(KJob* job)
{
if ( job->error() )
kWarning() << job->errorString();
- ItemFetchJob *fetchJob = qobject_cast<ItemFetchJob*>(job);
- Q_UNUSED( fetchJob )
- //kDebug() << fetchJob->items().size();
+
--mPendingJobs;
- if ( mPendingJobs == 0 && lowPrioQueue.isEmpty() ) { //Fetch jobs finished but \
there are were no items in the collection + if ( mPendingJobs == 0 && \
lowPrioQueue.isEmpty() ) { //Fetch jobs finished but there were no items in the \
collection mCurrentCollection = Collection();
emit idle( i18n( "Indexing completed." ) );
//kDebug() << "Indexing completed.";
@@ -173,32 +159,10 @@ void FeederQueue::itemFetchResult(KJob* job)
void FeederQueue::addItem( const Akonadi::Item &item )
{
//kDebug() << item.id();
- if ( item.hasPayload() ) {
- highPrioQueue.addItem( item );
- mProcessItemQueueTimer.start();
- } else {
- if ( mItemFetchScope.fullPayload() || !mItemFetchScope.payloadParts().isEmpty() \
) {
- ItemFetchJob *job = new ItemFetchJob( item );
- job->setFetchScope( mItemFetchScope );
- connect( job, SIGNAL( itemsReceived( Akonadi::Item::List ) ),
- SLOT( notificationItemsReceived( Akonadi::Item::List ) ) );
- }
- }
-}
-
-void FeederQueue::notificationItemsReceived(const Akonadi::Item::List& items)
-{
- //kDebug() << items.size();
- foreach ( const Item &item, items ) {
- if ( !item.hasPayload() ) {
- continue;
- }
- highPrioQueue.addItem( item );
- }
+ highPrioQueue.addItem( item );
mProcessItemQueueTimer.start();
}
-
bool FeederQueue::isEmpty()
{
return highPrioQueue.isEmpty() && lowPrioQueue.isEmpty() && \
mCollectionQueue.isEmpty(); @@ -234,7 +198,7 @@ void FeederQueue::processItemQueue()
emit idle( i18n( "Ready to index data." ) );
}
- if ( !highPrioQueue.isEmpty() || !lowPrioQueue.isEmpty() ) {
+ if ( !highPrioQueue.isEmpty() || ( !lowPrioQueue.isEmpty() && mOnline ) ) {
//kDebug() << "continue";
// go to eventloop before processing the next one, otherwise we miss the idle \
status change mProcessItemQueueTimer.start();
@@ -244,7 +208,7 @@ void FeederQueue::processItemQueue()
void FeederQueue::prioQueueFinished()
{
if (highPrioQueue.isEmpty() && lowPrioQueue.isEmpty() && (mPendingJobs == 0) && \
mCurrentCollection.isValid() ) {
- //kDebug() << "indexing completed";
+ kDebug() << "indexing of collection " << mCurrentCollection.id() << " \
completed"; mCurrentCollection = Collection();
emit idle( i18n( "Indexing completed." ) );
processNextCollection();
@@ -282,13 +246,17 @@ void FeederQueue::setItemFetchScope(ItemFetchScope scope)
-ItemQueue::ItemQueue(int batchSize, QObject* parent)
+ItemQueue::ItemQueue(int batchSize, int fetchSize, QObject* parent)
: QObject(parent),
mPendingRemoveDataJobs( 0 ),
+ mFetchSize(fetchSize),
mBatchSize(batchSize),
- block( false)
+ block(false)
{
-
+ if ( fetchSize < batchSize ) {
+ kWarning() << "fetchSize must be >= batchsize";
+ fetchSize = batchSize;
+ }
}
ItemQueue::~ItemQueue()
@@ -299,41 +267,94 @@ ItemQueue::~ItemQueue()
void ItemQueue::addItem(const Akonadi::Item &item)
{
- mItemPipeline.enqueue(item);
+ kDebug() << "pipline size: " << mItemPipeline.size();
+ mItemPipeline.enqueue(item.id()); //TODO if payload is available add directly to
+}
+
+void ItemQueue::addItems(const Akonadi::Item::List &list )
+{
+ foreach (const Akonadi::Item &item, list) {
+ addItem(item);
+ }
}
bool ItemQueue::processItem()
{
if (block) {//wait until the old graph has been saved
- //kDebug() << "blocked";
+ kDebug() << "blocked";
return false;
}
- //kDebug();
+ kDebug() << "------------------------procItem";
static bool processing = false; // guard against sub-eventloop reentrancy
if ( processing )
return false;
processing = true;
if ( !mItemPipeline.isEmpty() ) {
- const Akonadi::Item &item = mItemPipeline.dequeue();
- //kDebug() << item.id();
- Q_ASSERT(mBatch.size() == 0 ? mResourceGraph.isEmpty() : true); //otherwise we \
havent reached removeDataByApplication yet, and therfore mustn't overwrite \
mResourceGraph
- NepomukHelpers::addItemToGraph( item, mResourceGraph );
- mBatch.append(item.url());
+ mItemFetchList.append( Akonadi::Item( mItemPipeline.dequeue() ) );
}
processing = false;
- if ( mBatch.size() >= mBatchSize || mItemPipeline.isEmpty() ) {
- KJob *job = Nepomuk::removeDataByApplication( mBatch, \
Nepomuk::RemoveSubResoures, KGlobal::mainComponent() );
- connect( job, SIGNAL( finished( KJob* ) ), this, SLOT( removeDataResult( KJob* ) \
) );
- mBatch.clear();
- //kDebug() << "store";
+ if (mItemFetchList.size() >= mFetchSize || mItemPipeline.isEmpty() ) {
+ kDebug() << QString("Fetching %1 items").arg(mItemFetchList.size());
+ Akonadi::ItemFetchJob *job = new Akonadi::ItemFetchJob( mItemFetchList, 0 );
+ job->fetchScope().fetchFullPayload();
+ job->fetchScope().setCacheOnly( true );
+ job->setProperty("numberOfItems", mItemFetchList.size());
+ connect( job, SIGNAL( itemsReceived( Akonadi::Item::List ) ),
+ SLOT( itemsReceived( Akonadi::Item::List ) ) );
+ connect( job, SIGNAL(result(KJob*)), SLOT(fetchJobResult(KJob*)) );
+ mItemFetchList.clear();
block = true;
return false;
+ } else { //In case there is nothing in the itemFetchList, but still in the \
fetchedItemList + return processBatch();
}
return true;
}
+void ItemQueue::itemsReceived(const Akonadi::Item::List& items)
+{
+ Akonadi::ItemFetchJob *job = qobject_cast<Akonadi::ItemFetchJob*>(sender());
+ int numberOfItems = job->property("numberOfItems").toInt();
+ kDebug() << items.size();
+ mFetchedItemList.append(items);
+ if ( mFetchedItemList.size() >= numberOfItems ) { //Sometimes we get a partial \
delivery only, wait for the rest + processBatch();
+ }
+}
+
+void ItemQueue::fetchJobResult(KJob* job)
+{
+ if ( job->error() ) {
+ kWarning() << job->errorString();
+ block = false;
+ emit batchFinished();
+ }
+}
+
+bool ItemQueue::processBatch()
+{
+ int size = mFetchedItemList.size();
+ kDebug() << size;
+ for ( int i = 0; i < size && i < mBatchSize; i++ ) {
+ const Akonadi::Item &item = mFetchedItemList.takeFirst();
+ //kDebug() << item.id();
+ Q_ASSERT(item.hasPayload());
+ Q_ASSERT(mBatch.size() == 0 ? mResourceGraph.isEmpty() : true); //otherwise \
we havent reached removeDataByApplication yet, and therfore mustn't overwrite \
mResourceGraph + NepomukHelpers::addItemToGraph( item, mResourceGraph );
+ mBatch.append(item.url());
+ }
+ if ( mBatch.size() && ( mBatch.size() >= mBatchSize || mItemPipeline.isEmpty() ) \
) { + kDebug() << "process batch of " << mBatch.size() << " left: " << \
mFetchedItemList.size(); + KJob *job = Nepomuk::removeDataByApplication( \
mBatch, Nepomuk::RemoveSubResoures, KGlobal::mainComponent() ); + connect( \
job, SIGNAL( finished( KJob* ) ), this, SLOT( removeDataResult( KJob* ) ) ); + \
mBatch.clear(); + return false;
+ }
+ return true;
+}
+
void ItemQueue::removeDataResult(KJob* job)
{
if ( job->error() )
@@ -342,28 +363,50 @@ void ItemQueue::removeDataResult(KJob* job)
//All old items have been removed, so we can now store the new items
//kDebug() << "Saving Graph";
KJob *addGraphJob = NepomukHelpers::addGraphToNepomuk( mResourceGraph );
- connect( addGraphJob, SIGNAL( result( KJob* ) ), SLOT( jobResult( KJob* ) ) );
-
+ connect( addGraphJob, SIGNAL( result( KJob* ) ), SLOT( batchJobResult( KJob* ) ) \
); + m_debugGraph = mResourceGraph;
mResourceGraph.clear();
//trigger processing of next collection as everything of this one has been stored
//kDebug() << "removing completed, saving complete, batch done==================";
}
-void ItemQueue::jobResult(KJob* job)
+void ItemQueue::batchJobResult(KJob* job)
{
- if ( job->error() )
+ kDebug() << "------------------------------------------";
+ kDebug() << "pipline size: " << mItemPipeline.size();
+ kDebug() << "fetchedItemList : " << mFetchedItemList.size();
+ Q_ASSERT(mBatch.isEmpty());
+ int timeout = 0;
+ if ( job->error() ) {
+ foreach( const Nepomuk::SimpleResource &res, m_debugGraph.toList() ) {
+ kWarning() << res;
+ }
kWarning() << job->errorString();
- block = false;
- emit batchFinished();
- if ( mItemPipeline.isEmpty() ) {
- //kDebug() << "indexing completed";
+ timeout = 30000; //Nepomuk is probably still working. Lets wait a bit and hope \
it has finished until the next batch arrives. + }
+ QTimer::singleShot(timeout, this, SLOT(continueProcessing()));
+}
+
+void ItemQueue::continueProcessing()
+{
+ if (processBatch()) { //Go back for more
+ kDebug() << "batch finished";
+ block = false;
+ emit batchFinished();
+ } else {
+ kDebug() << "there was more...";
+ return;
+ }
+ if ( mItemPipeline.isEmpty() && mFetchedItemList.isEmpty() ) {
+ kDebug() << "indexing completed";
emit finished();
}
}
+
bool ItemQueue::isEmpty()
{
- return mItemPipeline.isEmpty();
+ return mItemPipeline.isEmpty() && mFetchedItemList.isEmpty();
}
diff --git a/agents/nepomukfeeder/feederqueue.h b/agents/nepomukfeeder/feederqueue.h
index 6467620..a70a2c5 100644
--- a/agents/nepomukfeeder/feederqueue.h
+++ b/agents/nepomukfeeder/feederqueue.h
@@ -42,11 +42,12 @@ class SimpleResourceGraph;
class ItemQueue : public QObject {
Q_OBJECT
public:
- explicit ItemQueue(int batchSize, QObject* parent = 0);
+ explicit ItemQueue(int batchSize, int fetchSize, QObject* parent = 0);
~ItemQueue();
/** add item to queue */
void addItem(const Akonadi::Item &);
+ void addItems(const Akonadi::Item::List &);
/** process one item @return returns false if currently blocked */
bool processItem();
/** queue is empty */
@@ -59,16 +60,25 @@ signals:
private slots:
void removeDataResult( KJob* job );
- void jobResult( KJob* job );
+ void batchJobResult( KJob* job );
+ void fetchJobResult( KJob* job );
+ void itemsReceived( const Akonadi::Item::List &items );
+ void continueProcessing();
private:
+ bool processBatch();
+
int mPendingRemoveDataJobs, mBatchCounter;
- QQueue<Akonadi::Item> mItemPipeline;
+ QQueue<Akonadi::Item::Id> mItemPipeline;
Nepomuk::SimpleResourceGraph mResourceGraph;
+ Nepomuk::SimpleResourceGraph m_debugGraph;
QList<QUrl> mBatch;
+ Akonadi::Item::List mItemFetchList;
+ Akonadi::Item::List mFetchedItemList;
- int mBatchSize;
+ int mBatchSize; //Size of Nepomuk batch, number of items stored together in \
nepomuk + int mFetchSize; //Maximum number of items fetched with full payload \
(defines ram usage of feeder), must be >= mBatchSize, ideally a multiple of it bool \
block; };
@@ -127,8 +137,6 @@ signals:
private slots:
void itemHeadersReceived( const Akonadi::Item::List &items );
- void itemsReceived( const Akonadi::Item::List &items );
- void notificationItemsReceived( const Akonadi::Item::List &items );
void itemFetchResult( KJob* job );
void processItemQueue();
void prioQueueFinished();
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic