[prev in list] [next in list] [prev in thread] [next in thread] 

List:       kde-commits
Subject:    [kdepimlibs/frameworks] akonadi/src: Merge remote-tracking branch 'origin/master' into frameworks
From:       Montel Laurent <montel () kde ! org>
Date:       2014-04-17 5:20:18
Message-ID: E1WaekE-0000AB-RL () scm ! kde ! org
[Download RAW message or body]

Git commit e7ddab6af088aac3cfb8e92a6781c45ef1377fa4 by Montel Laurent.
Committed on 17/04/2014 at 05:19.
Pushed by mlaurent into branch 'frameworks'.

Merge remote-tracking branch 'origin/master' into frameworks

Conflicts:
	akonadi/src/core/itemsync.cpp

A  +16   -0    akonadi/src/agentbase/resourcebase.cpp     [License: LGPL (v2+)]
A  +37   -1    akonadi/src/agentbase/resourcebase.h     [License: LGPL (v2+)]
A  +264  -143  akonadi/src/core/itemsync.cpp     [License: LGPL (v2+)]
A  +31   -0    akonadi/src/core/itemsync.h     [License: LGPL (v2+)]

http://commits.kde.org/kdepimlibs/e7ddab6af088aac3cfb8e92a6781c45ef1377fa4

diff --cc akonadi/src/agentbase/resourcebase.cpp
index a2a8bd4,0000000..3dcb088
mode 100644,000000..100644
--- a/akonadi/src/agentbase/resourcebase.cpp
+++ b/akonadi/src/agentbase/resourcebase.cpp
@@@ -1,1246 -1,0 +1,1262 @@@
 +/*
 +    Copyright (c) 2006 Till Adam <adam@kde.org>
 +    Copyright (c) 2007 Volker Krause <vkrause@kde.org>
 +
 +    This library is free software; you can redistribute it and/or modify it
 +    under the terms of the GNU Library General Public License as published by
 +    the Free Software Foundation; either version 2 of the License, or (at your
 +    option) any later version.
 +
 +    This library is distributed in the hope that it will be useful, but WITHOUT
 +    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 +    FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Library General Public
 +    License for more details.
 +
 +    You should have received a copy of the GNU Library General Public License
 +    along with this library; see the file COPYING.LIB.  If not, write to the
 +    Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 +    02110-1301, USA.
 +*/
 +
 +#include "resourcebase.h"
 +#include "agentbase_p.h"
 +
 +#include "resourceadaptor.h"
 +#include "collectiondeletejob.h"
 +#include "collectionsync_p.h"
 +#include "dbusconnectionpool.h"
 +#include "itemsync.h"
 +#include "akonadi_version.h"
 +#include "resourcescheduler_p.h"
 +#include "tracerinterface.h"
 +#include <akonadi/private/xdgbasedirs_p.h>
 +
 +#include "changerecorder.h"
 +#include "collectionfetchjob.h"
 +#include "collectionfetchscope.h"
 +#include "collectionmodifyjob.h"
 +#include "invalidatecachejob_p.h"
 +#include "itemfetchjob.h"
 +#include "itemfetchscope.h"
 +#include "itemmodifyjob.h"
 +#include "itemmodifyjob_p.h"
 +#include "session.h"
 +#include "resourceselectjob_p.h"
 +#include "monitor_p.h"
 +#include "servermanager_p.h"
 +#include "recursivemover_p.h"
 +#include "tagmodifyjob.h"
 +
 +#include <kaboutdata.h>
 +#include <kcmdlineargs.h>
 +#include <qdebug.h>
 +#include <klocalizedstring.h>
 +#include <kglobal.h>
 +
 +#include <QtCore/QDebug>
 +#include <QtCore/QDir>
 +#include <QtCore/QHash>
 +#include <QtCore/QSettings>
 +#include <QtCore/QTimer>
 +#include <QApplication>
 +#include <QtDBus/QtDBus>
 +
 +using namespace Akonadi;
 +
 +class Akonadi::ResourceBasePrivate : public AgentBasePrivate
 +{
 +    Q_OBJECT
 +    Q_CLASSINFO("D-Bus Interface", "org.kde.dfaure")
 +
 +public:
 +    ResourceBasePrivate(ResourceBase *parent)
 +        : AgentBasePrivate(parent)
 +        , scheduler(0)
 +        , mItemSyncer(0)
 +        , mItemSyncFetchScope(0)
 +        , mItemTransactionMode(ItemSync::SingleTransaction)
 +        , mCollectionSyncer(0)
 +        , mHierarchicalRid(false)
 +        , mUnemittedProgress(0)
 +        , mAutomaticProgressReporting(true)
++        , mItemSyncBatchSize(10)
 +    {
 +        Internal::setClientType(Internal::Resource);
 +        mStatusMessage = defaultReadyMessage();
 +        mProgressEmissionCompressor.setInterval(1000);
 +        mProgressEmissionCompressor.setSingleShot(true);
 +    }
 +
 +    ~ResourceBasePrivate()
 +    {
 +        delete mItemSyncFetchScope;
 +    }
 +
 +    Q_DECLARE_PUBLIC(ResourceBase)
 +
 +    void delayedInit()
 +    {
 +        const QString serviceId = \
ServerManager::agentServiceName(ServerManager::Resource, mId);  +        if \
(!DBusConnectionPool::threadConnection().registerService(serviceId)) {  +            \
QString reason = DBusConnectionPool::threadConnection().lastError().message();  +     \
if (reason.isEmpty()) {  +                reason = QString::fromLatin1("this service \
is probably running already.");  +            }
 +            qCritical() << "Unable to register service" << serviceId << "at D-Bus:" \
<< reason;  +
 +            if (QThread::currentThread() == QCoreApplication::instance()->thread()) \
{  +                QCoreApplication::instance()->exit(1);
 +            }
 +
 +        } else {
 +            AgentBasePrivate::delayedInit();
 +        }
 +    }
 +
 +    virtual void changeProcessed()
 +    {
 +        if (m_recursiveMover) {
 +            m_recursiveMover->changeProcessed();
 +            QTimer::singleShot(0, m_recursiveMover, SLOT(replayNext()));
 +            return;
 +        }
 +
 +        mChangeRecorder->changeProcessed();
 +        if (!mChangeRecorder->isEmpty()) {
 +            scheduler->scheduleChangeReplay();
 +        }
 +        scheduler->taskDone();
 +    }
 +
 +    void slotAbortRequested();
 +
 +    void slotDeliveryDone(KJob *job);
 +    void slotCollectionSyncDone(KJob *job);
 +    void slotLocalListDone(KJob *job);
 +    void slotSynchronizeCollection(const Collection &col);
 +    void slotCollectionListDone(KJob *job);
 +    void slotSynchronizeCollectionAttributes(const Collection &col);
 +    void slotCollectionListForAttributesDone(KJob *job);
 +    void slotCollectionAttributesSyncDone(KJob *job);
 +
 +    void slotItemSyncDone(KJob *job);
 +
 +    void slotPercent(KJob *job, unsigned long percent);
 +    void slotDelayedEmitProgress();
 +    void slotDeleteResourceCollection();
 +    void slotDeleteResourceCollectionDone(KJob *job);
 +    void slotCollectionDeletionDone(KJob *job);
 +
 +    void slotInvalidateCache(const Akonadi::Collection &collection);
 +
 +    void slotPrepareItemRetrieval(const Akonadi::Item &item);
 +    void slotPrepareItemRetrievalResult(KJob *job);
 +
 +    void changeCommittedResult(KJob *job);
 +
 +    void slotRecursiveMoveReplay(RecursiveMover *mover);
 +    void slotRecursiveMoveReplayResult(KJob *job);
 +
 +    void slotSessionReconnected()
 +    {
 +        Q_Q(ResourceBase);
 +
 +        new ResourceSelectJob(q->identifier());
 +    }
 +
 +    void createItemSyncInstanceIfMissing()
 +    {
 +        Q_Q(ResourceBase);
 +        Q_ASSERT_X(scheduler->currentTask().type == \
ResourceScheduler::SyncCollection,  +                   "createItemSyncInstance", \
"Calling items retrieval methods although no item retrieval is in progress");  +      \
if (!mItemSyncer) {  +            mItemSyncer = new ItemSync(q->currentCollection());
 +            mItemSyncer->setTransactionMode(mItemTransactionMode);
++            mItemSyncer->setBatchSize(mItemSyncBatchSize);
 +            if (mItemSyncFetchScope) {
 +                mItemSyncer->setFetchScope(*mItemSyncFetchScope);
 +            }
 +            mItemSyncer->setProperty("collection", \
QVariant::fromValue(q->currentCollection()));  +            connect(mItemSyncer, \
SIGNAL(percent(KJob*,ulong)), q, SLOT(slotPercent(KJob*,ulong)));  +            \
connect(mItemSyncer, SIGNAL(result(KJob*)), q, SLOT(slotItemSyncDone(KJob*))); ++     \
connect(mItemSyncer, SIGNAL(readyForNextBatch(int)), q, \
SIGNAL(retrieveNextItemSyncBatch(int)));  +        }
 +        Q_ASSERT(mItemSyncer);
 +    }
 +
 +public Q_SLOTS:
 +    // Dump the state of the scheduler
 +    Q_SCRIPTABLE QString dumpToString() const
 +    {
 +        Q_Q(const ResourceBase);
 +        QString retVal;
 +        QMetaObject::invokeMethod(const_cast<ResourceBase *>(q), \
"dumpResourceToString", Qt::DirectConnection, Q_RETURN_ARG(QString, retVal));  +      \
return scheduler->dumpToString() + QLatin1Char('\n') + retVal;  +    }
 +
 +    Q_SCRIPTABLE void dump()
 +    {
 +        scheduler->dump();
 +    }
 +
 +    Q_SCRIPTABLE void clear()
 +    {
 +        scheduler->clear();
 +    }
 +
 +protected Q_SLOTS:
 +    // reimplementations from AgentbBasePrivate, containing sanity checks that only \
apply to resources  +    // such as making sure that RIDs are present as well as \
translations of cross-resource moves  +    // TODO: we could possibly add recovery \
code for no-RID notifications by re-enquing those to the change recorder  +    // as \
the corresponding Add notifications, although that contains a risk of endless \
fail/retry loops  +
 +    void itemAdded(const Akonadi::Item &item, const Akonadi::Collection \
&collection)  +    {
 +        if (collection.remoteId().isEmpty()) {
 +            changeProcessed();
 +            return;
 +        }
 +        AgentBasePrivate::itemAdded(item, collection);
 +    }
 +
 +    void itemChanged(const Akonadi::Item &item, const QSet< QByteArray > \
&partIdentifiers)  +    {
 +        if (item.remoteId().isEmpty()) {
 +            changeProcessed();
 +            return;
 +        }
 +        AgentBasePrivate::itemChanged(item, partIdentifiers);
 +    }
 +
 +    void itemsFlagsChanged(const Item::List &items, const QSet< QByteArray > \
&addedFlags,  +                           const QSet< QByteArray > &removedFlags)
 +    {
 +        if (addedFlags.isEmpty() && removedFlags.isEmpty()) {
 +            changeProcessed();
 +            return;
 +        }
 +
 +        Item::List validItems;
 +        foreach (const Akonadi::Item &item, items) {
 +            if (!item.remoteId().isEmpty()) {
 +                validItems << item;
 +            }
 +        }
 +        if (validItems.isEmpty()) {
 +            changeProcessed();
 +            return;
 +        }
 +
 +        AgentBasePrivate::itemsFlagsChanged(validItems, addedFlags, removedFlags);
 +    }
 +
 +    void itemsTagsChanged(const Item::List &items, const QSet<Tag> &addedTags, \
const QSet<Tag> &removedTags)  +    {
 +        if (addedTags.isEmpty() && removedTags.isEmpty()) {
 +            changeProcessed();
 +            return;
 +        }
 +
 +        Item::List validItems;
 +        foreach (const Akonadi::Item &item, items) {
 +            if (!item.remoteId().isEmpty()) {
 +                validItems << item;
 +            }
 +        }
 +        if (validItems.isEmpty()) {
 +            changeProcessed();
 +            return;
 +        }
 +
 +        AgentBasePrivate::itemsTagsChanged(validItems, addedTags, removedTags);
 +    }
 +
 +    // TODO move the move translation code from AgentBasePrivate here, it's wrong \
for agents  +    void itemMoved(const Akonadi::Item &item, const Akonadi::Collection \
&source, const Akonadi::Collection &destination)  +    {
 +        if (item.remoteId().isEmpty() || destination.remoteId().isEmpty() || \
destination == source) {  +            changeProcessed();
 +            return;
 +        }
 +        AgentBasePrivate::itemMoved(item, source, destination);
 +    }
 +
 +    void itemsMoved(const Item::List &items, const Collection &source, const \
Collection &destination)  +    {
 +        if (destination.remoteId().isEmpty() || destination == source) {
 +            changeProcessed();
 +            return;
 +        }
 +
 +        Item::List validItems;
 +        foreach (const Akonadi::Item &item, items) {
 +            if (!item.remoteId().isEmpty()) {
 +                validItems << item;
 +            }
 +        }
 +        if (validItems.isEmpty()) {
 +            changeProcessed();
 +            return;
 +        }
 +
 +        AgentBasePrivate::itemsMoved(validItems, source, destination);
 +    }
 +
 +    void itemRemoved(const Akonadi::Item &item)
 +    {
 +        if (item.remoteId().isEmpty()) {
 +            changeProcessed();
 +            return;
 +        }
 +        AgentBasePrivate::itemRemoved(item);
 +    }
 +
 +    void itemsRemoved(const Item::List &items)
 +    {
 +        Item::List validItems;
 +        foreach (const Akonadi::Item &item, items) {
 +            if (!item.remoteId().isEmpty()) {
 +                validItems << item;
 +            }
 +        }
 +        if (validItems.isEmpty()) {
 +            changeProcessed();
 +            return;
 +        }
 +
 +        AgentBasePrivate::itemsRemoved(validItems);
 +    }
 +
 +    void collectionAdded(const Akonadi::Collection &collection, const \
Akonadi::Collection &parent)  +    {
 +        if (parent.remoteId().isEmpty()) {
 +            changeProcessed();
 +            return;
 +        }
 +        AgentBasePrivate::collectionAdded(collection, parent);
 +    }
 +
 +    void collectionChanged(const Akonadi::Collection &collection)
 +    {
 +        if (collection.remoteId().isEmpty()) {
 +            changeProcessed();
 +            return;
 +        }
 +        AgentBasePrivate::collectionChanged(collection);
 +    }
 +
 +    void collectionChanged(const Akonadi::Collection &collection, const QSet< \
QByteArray > &partIdentifiers)  +    {
 +        if (collection.remoteId().isEmpty()) {
 +            changeProcessed();
 +            return;
 +        }
 +        AgentBasePrivate::collectionChanged(collection, partIdentifiers);
 +    }
 +
 +    void collectionMoved(const Akonadi::Collection &collection, const \
Akonadi::Collection &source, const Akonadi::Collection &destination)  +    {
 +        // unknown destination or source == destination means we can't do/don't \
have to do anything  +        if (destination.remoteId().isEmpty() || source == \
destination) {  +            changeProcessed();
 +            return;
 +        }
 +
 +        // inter-resource moves, requires we know which resources the source and \
destination are in though  +        if (!source.resource().isEmpty() && \
!destination.resource().isEmpty() && source.resource() != destination.resource()) {  \
+            if (source.resource() == q_ptr->identifier()) {   // moved away from us  \
+                AgentBasePrivate::collectionRemoved(collection);  +            } \
else if (destination.resource() == q_ptr->identifier()) {   // moved to us  +         \
scheduler->taskDone(); // stop change replay for now  +                RecursiveMover \
*mover = new RecursiveMover(this);  +                mover->setCollection(collection, \
destination);  +                scheduler->scheduleMoveReplay(collection, mover);
 +            }
 +            return;
 +        }
 +
 +        // intra-resource move, requires the moved collection to have a valid id \
though  +        if (collection.remoteId().isEmpty()) {
 +            changeProcessed();
 +            return;
 +        }
 +
 +        // intra-resource move, ie. something we can handle internally
 +        AgentBasePrivate::collectionMoved(collection, source, destination);
 +    }
 +
 +    void collectionRemoved(const Akonadi::Collection &collection)
 +    {
 +        if (collection.remoteId().isEmpty()) {
 +            changeProcessed();
 +            return;
 +        }
 +        AgentBasePrivate::collectionRemoved(collection);
 +    }
 +
 +    void tagAdded(const Akonadi::Tag &tag)
 +    {
 +        if (!tag.isValid()) {
 +            changeProcessed();
 +            return;
 +        }
 +
 +        AgentBasePrivate::tagAdded(tag);
 +    }
 +
 +    void tagChanged(const Akonadi::Tag &tag)
 +    {
 +        if (tag.remoteId().isEmpty()) {
 +            changeProcessed();
 +            return;
 +        }
 +
 +        AgentBasePrivate::tagChanged(tag);
 +    }
 +
 +    void tagRemoved(const Akonadi::Tag &tag)
 +    {
 +        if (tag.remoteId().isEmpty()) {
 +            changeProcessed();
 +            return;
 +        }
 +
 +        AgentBasePrivate::tagRemoved(tag);
 +    }
 +
 +public:
 +    // synchronize states
 +    Collection currentCollection;
 +
 +    ResourceScheduler *scheduler;
 +    ItemSync *mItemSyncer;
 +    ItemFetchScope *mItemSyncFetchScope;
 +    ItemSync::TransactionMode mItemTransactionMode;
 +    CollectionSync *mCollectionSyncer;
 +    bool mHierarchicalRid;
 +    QTimer mProgressEmissionCompressor;
 +    int mUnemittedProgress;
 +    QMap<Akonadi::Collection::Id, QVariantMap> mUnemittedAdvancedStatus;
 +    bool mAutomaticProgressReporting;
 +    QPointer<RecursiveMover> m_recursiveMover;
++    int mItemSyncBatchSize;
 +};
 +
 +ResourceBase::ResourceBase(const QString &id)
 +    : AgentBase(new ResourceBasePrivate(this), id)
 +{
 +    Q_D(ResourceBase);
 +
 +    new Akonadi__ResourceAdaptor(this);
 +
 +    d->scheduler = new ResourceScheduler(this);
 +
 +    d->mChangeRecorder->setChangeRecordingEnabled(true);
 +    d->mChangeRecorder->setCollectionMoveTranslationEnabled(false);   // we deal \
with this ourselves  +    connect(d->mChangeRecorder, SIGNAL(changesAdded()),
 +            d->scheduler, SLOT(scheduleChangeReplay()));
 +
 +    d->mChangeRecorder->setResourceMonitored(d->mId.toLatin1());
 +    d->mChangeRecorder->fetchCollection(true);
 +
 +    connect(d->scheduler, SIGNAL(executeFullSync()),
 +            SLOT(retrieveCollections()));
 +    connect(d->scheduler, SIGNAL(executeCollectionTreeSync()),
 +            SLOT(retrieveCollections()));
 +    connect(d->scheduler, SIGNAL(executeCollectionSync(Akonadi::Collection)),
 +            SLOT(slotSynchronizeCollection(Akonadi::Collection)));
 +    connect(d->scheduler, \
SIGNAL(executeCollectionAttributesSync(Akonadi::Collection)),  +            \
SLOT(slotSynchronizeCollectionAttributes(Akonadi::Collection)));  +    \
connect(d->scheduler, SIGNAL(executeItemFetch(Akonadi::Item,QSet<QByteArray>)),  +    \
SLOT(slotPrepareItemRetrieval(Akonadi::Item)));  +    connect(d->scheduler, \
SIGNAL(executeResourceCollectionDeletion()),  +            \
SLOT(slotDeleteResourceCollection()));  +    connect(d->scheduler, \
SIGNAL(executeCacheInvalidation(Akonadi::Collection)),  +            \
SLOT(slotInvalidateCache(Akonadi::Collection)));  +    connect(d->scheduler, \
SIGNAL(status(int,QString)),  +            SIGNAL(status(int,QString)));
 +    connect(d->scheduler, SIGNAL(executeChangeReplay()),
 +            d->mChangeRecorder, SLOT(replayNext()));
 +    connect(d->scheduler, SIGNAL(executeRecursiveMoveReplay(RecursiveMover*)),
 +            SLOT(slotRecursiveMoveReplay(RecursiveMover*)));
 +    connect(d->scheduler, SIGNAL(fullSyncComplete()), SIGNAL(synchronized()));
 +    connect(d->scheduler, SIGNAL(collectionTreeSyncComplete()), \
SIGNAL(collectionTreeSynchronized()));  +    connect(d->mChangeRecorder, \
SIGNAL(nothingToReplay()), d->scheduler, SLOT(taskDone()));  +    \
connect(d->mChangeRecorder, SIGNAL(collectionRemoved(Akonadi::Collection)),  +        \
d->scheduler, SLOT(collectionRemoved(Akonadi::Collection)));  +    connect(this, \
SIGNAL(abortRequested()), this, SLOT(slotAbortRequested()));  +    connect(this, \
SIGNAL(synchronized()), d->scheduler, SLOT(taskDone()));  +    connect(this, \
SIGNAL(collectionTreeSynchronized()), d->scheduler, SLOT(taskDone()));  +    \
connect(this, SIGNAL(agentNameChanged(QString)),  +            this, \
SIGNAL(nameChanged(QString)));  +
 +    connect(&d->mProgressEmissionCompressor, SIGNAL(timeout()),
 +            this, SLOT(slotDelayedEmitProgress()));
 +
 +    d->scheduler->setOnline(d->mOnline);
 +    if (!d->mChangeRecorder->isEmpty()) {
 +        d->scheduler->scheduleChangeReplay();
 +    }
 +
 +    new ResourceSelectJob(identifier());
 +
 +    connect(d->mChangeRecorder->session(), SIGNAL(reconnected()), \
SLOT(slotSessionReconnected()));  +}
 +
 +ResourceBase::~ResourceBase()
 +{
 +}
 +
 +void ResourceBase::synchronize()
 +{
 +    d_func()->scheduler->scheduleFullSync();
 +}
 +
 +void ResourceBase::setName(const QString &name)
 +{
 +    AgentBase::setAgentName(name);
 +}
 +
 +QString ResourceBase::name() const
 +{
 +    return AgentBase::agentName();
 +}
 +
 +QString ResourceBase::parseArguments(int argc, char **argv)
 +{
 +    QString identifier;
 +    if (argc < 3) {
 +        qDebug() << "Not enough arguments passed...";
 +        exit(1);
 +    }
 +
 +    for (int i = 1; i < argc - 1; ++i) {
 +        if (QLatin1String(argv[i]) == QLatin1String("--identifier")) {
 +            identifier = QLatin1String(argv[i + 1]);
 +        }
 +    }
 +
 +    if (identifier.isEmpty()) {
 +        qDebug() << "Identifier argument missing";
 +        exit(1);
 +    }
 +
 +    const QFileInfo fi(QString::fromLocal8Bit(argv[0]));
 +    // strip off full path and possible .exe suffix
 +    const QByteArray catalog = fi.baseName().toLatin1();
 +
 +    KCmdLineArgs::init(argc, argv, \
ServerManager::addNamespace(identifier).toLatin1(), catalog,  +                       \
ki18nc("@title application name", "Akonadi Resource"), \
AKONADILIBRARIES_VERSION_STRING,  +                       ki18nc("@title application \
description", "Akonadi Resource"));  +
 +    KCmdLineOptions options;
 +    options.add("identifier <argument>",
 +                ki18nc("@label commandline option", "Resource identifier"));
 +    KCmdLineArgs::addCmdLineOptions(options);
 +
 +    return identifier;
 +}
 +
 +int ResourceBase::init(ResourceBase *r)
 +{
 +    QApplication::setQuitOnLastWindowClosed(false);
 +#warning port to the new way of doing this
 +//   KLocalizedString::insertCatalog( QLatin1String( "libakonadi" ) );
 +    int rv = kapp->exec();
 +    delete r;
 +    return rv;
 +}
 +
 +void ResourceBasePrivate::slotAbortRequested()
 +{
 +    Q_Q(ResourceBase);
 +
 +    scheduler->cancelQueues();
 +    QMetaObject::invokeMethod(q, "abortActivity");
 +}
 +
 +void ResourceBase::itemRetrieved(const Item &item)
 +{
 +    Q_D(ResourceBase);
 +    Q_ASSERT(d->scheduler->currentTask().type == ResourceScheduler::FetchItem);
 +    if (!item.isValid()) {
 +        d->scheduler->currentTask().sendDBusReplies(i18nc("@info", "Invalid item \
retrieved"));  +        d->scheduler->taskDone();
 +        return;
 +    }
 +
 +    Item i(item);
 +    QSet<QByteArray> requestedParts = d->scheduler->currentTask().itemParts;
 +    foreach (const QByteArray &part, requestedParts) {
 +        if (!item.loadedPayloadParts().contains(part)) {
 +            qWarning() << "Item does not provide part" << part;
 +        }
 +    }
 +
 +    ItemModifyJob *job = new ItemModifyJob(i);
 +  job->d_func()->setSilent( true );
 +    // FIXME: remove once the item with which we call retrieveItem() has a revision \
number  +    job->disableRevisionCheck();
 +    connect(job, SIGNAL(result(KJob*)), SLOT(slotDeliveryDone(KJob*)));
 +}
 +
 +void ResourceBasePrivate::slotDeliveryDone(KJob *job)
 +{
 +    Q_Q(ResourceBase);
 +    Q_ASSERT(scheduler->currentTask().type == ResourceScheduler::FetchItem);
 +    if (job->error()) {
 +        emit q->error(i18nc("@info", "Error while creating item: %1", \
job->errorString()));  +    }
 +    scheduler->currentTask().sendDBusReplies(job->error() ? job->errorString() : \
QString());  +    scheduler->taskDone();
 +}
 +
 +void ResourceBase::collectionAttributesRetrieved(const Collection &collection)
 +{
 +    Q_D(ResourceBase);
 +    Q_ASSERT(d->scheduler->currentTask().type == \
ResourceScheduler::SyncCollectionAttributes);  +    if (!collection.isValid()) {
 +        emit attributesSynchronized(d->scheduler->currentTask().collection.id());
 +        d->scheduler->taskDone();
 +        return;
 +    }
 +
 +    CollectionModifyJob *job = new CollectionModifyJob(collection);
 +    connect(job, SIGNAL(result(KJob*)), \
SLOT(slotCollectionAttributesSyncDone(KJob*)));  +}
 +
 +void ResourceBasePrivate::slotCollectionAttributesSyncDone(KJob *job)
 +{
 +    Q_Q(ResourceBase);
 +    Q_ASSERT(scheduler->currentTask().type == \
ResourceScheduler::SyncCollectionAttributes);  +    if (job->error()) {
 +        emit q->error(i18nc("@info", "Error while updating collection: %1", \
job->errorString()));  +    }
 +    emit q->attributesSynchronized(scheduler->currentTask().collection.id());
 +    scheduler->taskDone();
 +}
 +
 +void ResourceBasePrivate::slotDeleteResourceCollection()
 +{
 +    Q_Q(ResourceBase);
 +
 +    CollectionFetchJob *job = new CollectionFetchJob(Collection::root(), \
CollectionFetchJob::FirstLevel);  +    \
job->fetchScope().setResource(q->identifier());  +    connect(job, \
SIGNAL(result(KJob*)), q, SLOT(slotDeleteResourceCollectionDone(KJob*)));  +}
 +
 +void ResourceBasePrivate::slotDeleteResourceCollectionDone(KJob *job)
 +{
 +    Q_Q(ResourceBase);
 +    if (job->error()) {
 +        emit q->error(job->errorString());
 +        scheduler->taskDone();
 +    } else {
 +        const CollectionFetchJob *fetchJob = static_cast<const CollectionFetchJob \
*>(job);  +
 +        if (!fetchJob->collections().isEmpty()) {
 +            CollectionDeleteJob *job = new \
CollectionDeleteJob(fetchJob->collections().first());  +            connect(job, \
SIGNAL(result(KJob*)), q, SLOT(slotCollectionDeletionDone(KJob*)));  +        } else \
{  +            // there is no resource collection, so just ignore the request
 +            scheduler->taskDone();
 +        }
 +    }
 +}
 +
 +void ResourceBasePrivate::slotCollectionDeletionDone(KJob *job)
 +{
 +    Q_Q(ResourceBase);
 +    if (job->error()) {
 +        emit q->error(job->errorString());
 +    }
 +
 +    scheduler->taskDone();
 +}
 +
 +void ResourceBasePrivate::slotInvalidateCache(const Akonadi::Collection \
&collection)  +{
 +    Q_Q(ResourceBase);
 +    InvalidateCacheJob *job = new InvalidateCacheJob(collection, q);
 +    connect(job, SIGNAL(result(KJob*)), scheduler, SLOT(taskDone()));
 +}
 +
 +void ResourceBase::changeCommitted(const Item &item)
 +{
 +    changesCommitted(Item::List() << item);
 +}
 +
 +void ResourceBase::changesCommitted(const Item::List &items)
 +{
 +    ItemModifyJob *job = new ItemModifyJob(items);
 +    job->d_func()->setClean();
 +    job->disableRevisionCheck(); // TODO: remove, but where/how do we handle the \
error?  +    job->setIgnorePayload(true);   // we only want to reset the dirty flag \
and update the remote id  +    job->setUpdateGid(true);   // allow resources to \
update GID too  +    connect(job, SIGNAL(finished(KJob*)), this, \
SLOT(changeCommittedResult(KJob*)));  +}
 +
 +void ResourceBase::changeCommitted(const Collection &collection)
 +{
 +    CollectionModifyJob *job = new CollectionModifyJob(collection);
 +    connect(job, SIGNAL(result(KJob*)), SLOT(changeCommittedResult(KJob*)));
 +}
 +
 +void ResourceBasePrivate::changeCommittedResult(KJob *job)
 +{
 +    Q_Q(ResourceBase);
 +    if (qobject_cast<CollectionModifyJob *>(job)) {
 +        if (job->error()) {
 +            emit q->error(i18nc("@info", "Updating local collection failed: %1.", \
job->errorText()));  +        }
 +        mChangeRecorder->d_ptr->invalidateCache(static_cast<CollectionModifyJob \
*>(job)->collection());  +    } else {
 +        // TODO: Error handling for item changes?
 +        // Item and tag cache is invalidated by modify job
 +    }
 +
 +    changeProcessed();
 +}
 +
 +void ResourceBase::changeCommitted(const Tag &tag)
 +{
 +    TagModifyJob *job = new TagModifyJob(tag);
 +    connect(job, SIGNAL(result(KJob*)), SLOT(changeCommittedResult(KJob*)));
 +}
 +
 +bool ResourceBase::requestItemDelivery(qint64 uid, const QString &remoteId,
 +                                       const QString &mimeType, const QStringList \
&parts)  +{
 +    return requestItemDeliveryV2(uid, remoteId, mimeType, parts).isEmpty();
 +}
 +
 +QString ResourceBase::requestItemDeliveryV2(qint64 uid, const QString &remoteId, \
const QString &mimeType, const QStringList &_parts)  +{
 +    Q_D(ResourceBase);
 +    if (!isOnline()) {
 +        const QString errorMsg = i18nc("@info", "Cannot fetch item in offline \
mode.");  +        emit error(errorMsg);
 +        return errorMsg;
 +    }
 +
 +    setDelayedReply(true);
 +    // FIXME: we need at least the revision number too
 +    Item item(uid);
 +    item.setMimeType(mimeType);
 +    item.setRemoteId(remoteId);
 +
 +    QSet<QByteArray> parts;
 +    Q_FOREACH (const QString &str, _parts) {
 +        parts.insert(str.toLatin1());
 +    }
 +
 +    d->scheduler->scheduleItemFetch(item, parts, message());
 +
 +    return QString();
 +
 +}
 +
 +void ResourceBase::collectionsRetrieved(const Collection::List &collections)
 +{
 +    Q_D(ResourceBase);
 +    Q_ASSERT_X(d->scheduler->currentTask().type == \
ResourceScheduler::SyncCollectionTree ||  +               \
d->scheduler->currentTask().type == ResourceScheduler::SyncAll,  +               \
"ResourceBase::collectionsRetrieved()",  +               "Calling \
collectionsRetrieved() although no collection retrieval is in progress");  +    if \
(!d->mCollectionSyncer) {  +        d->mCollectionSyncer = new \
CollectionSync(identifier());  +        \
d->mCollectionSyncer->setHierarchicalRemoteIds(d->mHierarchicalRid);  +        \
connect(d->mCollectionSyncer, SIGNAL(percent(KJob*,ulong)), \
SLOT(slotPercent(KJob*,ulong)));  +        connect(d->mCollectionSyncer, \
SIGNAL(result(KJob*)), SLOT(slotCollectionSyncDone(KJob*)));  +    }
 +    d->mCollectionSyncer->setRemoteCollections(collections);
 +}
 +
 +void ResourceBase::collectionsRetrievedIncremental(const Collection::List \
&changedCollections,  +                                                   const \
Collection::List &removedCollections)  +{
 +    Q_D(ResourceBase);
 +    Q_ASSERT_X(d->scheduler->currentTask().type == \
ResourceScheduler::SyncCollectionTree ||  +               \
d->scheduler->currentTask().type == ResourceScheduler::SyncAll,  +               \
"ResourceBase::collectionsRetrievedIncremental()",  +               "Calling \
collectionsRetrievedIncremental() although no collection retrieval is in progress");  \
+    if (!d->mCollectionSyncer) {  +        d->mCollectionSyncer = new \
CollectionSync(identifier());  +        \
d->mCollectionSyncer->setHierarchicalRemoteIds(d->mHierarchicalRid);  +        \
connect(d->mCollectionSyncer, SIGNAL(percent(KJob*,ulong)), \
SLOT(slotPercent(KJob*,ulong)));  +        connect(d->mCollectionSyncer, \
SIGNAL(result(KJob*)), SLOT(slotCollectionSyncDone(KJob*)));  +    }
 +    d->mCollectionSyncer->setRemoteCollections(changedCollections, \
removedCollections);  +}
 +
 +void ResourceBase::setCollectionStreamingEnabled(bool enable)
 +{
 +    Q_D(ResourceBase);
 +    Q_ASSERT_X(d->scheduler->currentTask().type == \
ResourceScheduler::SyncCollectionTree ||  +               \
d->scheduler->currentTask().type == ResourceScheduler::SyncAll,  +               \
"ResourceBase::setCollectionStreamingEnabled()",  +               "Calling \
setCollectionStreamingEnabled() although no collection retrieval is in progress");  + \
if (!d->mCollectionSyncer) {  +        d->mCollectionSyncer = new \
CollectionSync(identifier());  +        \
d->mCollectionSyncer->setHierarchicalRemoteIds(d->mHierarchicalRid);  +        \
connect(d->mCollectionSyncer, SIGNAL(percent(KJob*,ulong)), \
SLOT(slotPercent(KJob*,ulong)));  +        connect(d->mCollectionSyncer, \
SIGNAL(result(KJob*)), SLOT(slotCollectionSyncDone(KJob*)));  +    }
 +    d->mCollectionSyncer->setStreamingEnabled(enable);
 +}
 +
 +void ResourceBase::collectionsRetrievalDone()
 +{
 +    Q_D(ResourceBase);
 +    Q_ASSERT_X(d->scheduler->currentTask().type == \
ResourceScheduler::SyncCollectionTree ||  +               \
d->scheduler->currentTask().type == ResourceScheduler::SyncAll,  +               \
"ResourceBase::collectionsRetrievalDone()",  +               "Calling \
collectionsRetrievalDone() although no collection retrieval is in progress");  +    \
// streaming enabled, so finalize the sync  +    if (d->mCollectionSyncer) {
 +        d->mCollectionSyncer->retrievalDone();
 +    } else {
 +        // user did the sync himself, we are done now
 +        // FIXME: we need the same special case for SyncAll as in \
slotCollectionSyncDone here!  +        d->scheduler->taskDone();
 +    }
 +}
 +
 +void ResourceBasePrivate::slotCollectionSyncDone(KJob *job)
 +{
 +    Q_Q(ResourceBase);
 +    mCollectionSyncer = 0;
 +    if (job->error()) {
 +        if (job->error() != Job::UserCanceled) {
 +            emit q->error(job->errorString());
 +        }
 +    } else {
 +        if (scheduler->currentTask().type == ResourceScheduler::SyncAll) {
 +            CollectionFetchJob *list = new CollectionFetchJob(Collection::root(), \
CollectionFetchJob::Recursive);  +            \
list->setFetchScope(q->changeRecorder()->collectionFetchScope());  +            \
list->fetchScope().setResource(mId);  +            q->connect(list, \
SIGNAL(result(KJob*)), q, SLOT(slotLocalListDone(KJob*)));  +            return;
 +        } else if (scheduler->currentTask().type == \
ResourceScheduler::SyncCollectionTree) {  +            \
scheduler->scheduleCollectionTreeSyncCompletion();  +        }
 +    }
 +    scheduler->taskDone();
 +}
 +
 +void ResourceBasePrivate::slotLocalListDone(KJob *job)
 +{
 +    Q_Q(ResourceBase);
 +    if (job->error()) {
 +        emit q->error(job->errorString());
 +    } else {
 +        Collection::List cols = static_cast<CollectionFetchJob \
*>(job)->collections();  +        foreach (const Collection &col, cols) {
 +            scheduler->scheduleSync(col);
 +        }
 +        scheduler->scheduleFullSyncCompletion();
 +    }
 +    scheduler->taskDone();
 +}
 +
 +void ResourceBasePrivate::slotSynchronizeCollection(const Collection &col)
 +{
 +    Q_Q(ResourceBase);
 +    currentCollection = col;
 +    // check if this collection actually can contain anything
 +    QStringList contentTypes = currentCollection.contentMimeTypes();
 +    contentTypes.removeAll(Collection::mimeType());
 +    contentTypes.removeAll(Collection::virtualMimeType());
 +    if (!contentTypes.isEmpty() || col.isVirtual()) {
 +        if (mAutomaticProgressReporting) {
 +            emit q->status(AgentBase::Running, i18nc("@info:status", "Syncing \
folder '%1'", currentCollection.displayName()));  +        }
 +        q->retrieveItems(currentCollection);
 +        return;
 +    }
 +    scheduler->taskDone();
 +}
 +
++int ResourceBase::itemSyncBatchSize() const
++{
++    Q_D(const ResourceBase);
++    return d->mItemSyncBatchSize;
++}
++
++void ResourceBase::setItemSyncBatchSize(int batchSize)
++{
++    Q_D(ResourceBase);
++    d->mItemSyncBatchSize = batchSize;
++}
++
 +void ResourceBasePrivate::slotSynchronizeCollectionAttributes(const Collection \
&col)  +{
 +    Q_Q(ResourceBase);
 +    QMetaObject::invokeMethod(q, "retrieveCollectionAttributes", \
Q_ARG(Akonadi::Collection, col));  +}
 +
 +void ResourceBasePrivate::slotPrepareItemRetrieval(const Akonadi::Item &item)
 +{
 +    Q_Q(ResourceBase);
 +    ItemFetchJob *fetch = new ItemFetchJob(item, this);
 +    fetch->fetchScope().setAncestorRetrieval(q->changeRecorder()->itemFetchScope().ancestorRetrieval());
  +    fetch->fetchScope().setCacheOnly(true);
 +
 +    // copy list of attributes to fetch
 +    const QSet<QByteArray> attributes = \
q->changeRecorder()->itemFetchScope().attributes();  +    foreach (const QByteArray \
&attribute, attributes) {  +        fetch->fetchScope().fetchAttribute(attribute);
 +    }
 +
 +    q->connect(fetch, SIGNAL(result(KJob*)), \
SLOT(slotPrepareItemRetrievalResult(KJob*)));  +}
 +
 +void ResourceBasePrivate::slotPrepareItemRetrievalResult(KJob *job)
 +{
 +    Q_Q(ResourceBase);
 +    Q_ASSERT_X(scheduler->currentTask().type == ResourceScheduler::FetchItem,
 +               "ResourceBasePrivate::slotPrepareItemRetrievalResult()",
 +               "Preparing item retrieval although no item retrieval is in \
progress");  +    if (job->error()) {
 +        q->cancelTask(job->errorText());
 +        return;
 +    }
 +    ItemFetchJob *fetch = qobject_cast<ItemFetchJob *>(job);
 +    if (fetch->items().count() != 1) {
 +        q->cancelTask(i18n("The requested item no longer exists"));
 +        return;
 +    }
 +    const Item item = fetch->items().first();
 +    const QSet<QByteArray> parts = scheduler->currentTask().itemParts;
 +    if (!q->retrieveItem(item, parts)) {
 +        q->cancelTask();
 +    }
 +}
 +
 +void ResourceBasePrivate::slotRecursiveMoveReplay(RecursiveMover *mover)
 +{
 +    Q_Q(ResourceBase);
 +    Q_ASSERT(mover);
 +    Q_ASSERT(!m_recursiveMover);
 +    m_recursiveMover = mover;
 +    connect(mover, SIGNAL(result(KJob*)), q, \
SLOT(slotRecursiveMoveReplayResult(KJob*)));  +    mover->start();
 +}
 +
 +void ResourceBasePrivate::slotRecursiveMoveReplayResult(KJob *job)
 +{
 +    Q_Q(ResourceBase);
 +    m_recursiveMover = 0;
 +
 +    if (job->error()) {
 +        q->deferTask();
 +        return;
 +    }
 +
 +    changeProcessed();
 +}
 +
 +void ResourceBase::itemsRetrievalDone()
 +{
 +    Q_D(ResourceBase);
 +    // streaming enabled, so finalize the sync
 +    if (d->mItemSyncer) {
 +        d->mItemSyncer->deliveryDone();
 +    } else {
 +        // user did the sync himself, we are done now
 +        d->scheduler->taskDone();
 +    }
 +}
 +
 +void ResourceBase::clearCache()
 +{
 +    Q_D(ResourceBase);
 +    d->scheduler->scheduleResourceCollectionDeletion();
 +}
 +
 +void ResourceBase::invalidateCache(const Collection &collection)
 +{
 +    Q_D(ResourceBase);
 +    d->scheduler->scheduleCacheInvalidation(collection);
 +}
 +
 +Collection ResourceBase::currentCollection() const
 +{
 +    Q_D(const ResourceBase);
 +    Q_ASSERT_X(d->scheduler->currentTask().type == \
ResourceScheduler::SyncCollection ,  +               \
"ResourceBase::currentCollection()",  +               "Trying to access current \
collection although no item retrieval is in progress");  +    return \
d->currentCollection;  +}
 +
 +Item ResourceBase::currentItem() const
 +{
 +    Q_D(const ResourceBase);
 +    Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::FetchItem ,
 +               "ResourceBase::currentItem()",
 +               "Trying to access current item although no item retrieval is in \
progress");  +    return d->scheduler->currentTask().item;
 +}
 +
 +void ResourceBase::synchronizeCollectionTree()
 +{
 +    d_func()->scheduler->scheduleCollectionTreeSync();
 +}
 +
 +void ResourceBase::cancelTask()
 +{
 +    Q_D(ResourceBase);
 +    switch (d->scheduler->currentTask().type) {
 +    case ResourceScheduler::FetchItem:
 +        itemRetrieved(Item());   // sends the error reply and
 +        break;
 +    case ResourceScheduler::ChangeReplay:
 +        d->changeProcessed();
 +        break;
 +    case ResourceScheduler::SyncCollectionTree:
 +    case ResourceScheduler::SyncAll:
 +        if (d->mCollectionSyncer) {
 +            d->mCollectionSyncer->rollback();
 +        } else {
 +            d->scheduler->taskDone();
 +        }
 +        break;
 +    case ResourceScheduler::SyncCollection:
 +        if (d->mItemSyncer) {
 +            d->mItemSyncer->rollback();
 +        } else {
 +            d->scheduler->taskDone();
 +        }
 +        break;
 +    default:
 +        d->scheduler->taskDone();
 +    }
 +}
 +
 +void ResourceBase::cancelTask(const QString &msg)
 +{
 +    cancelTask();
 +
 +    emit error(msg);
 +}
 +
 +void ResourceBase::deferTask()
 +{
 +    Q_D(ResourceBase);
 +    d->scheduler->deferTask();
 +}
 +
 +void ResourceBase::doSetOnline(bool state)
 +{
 +    d_func()->scheduler->setOnline(state);
 +}
 +
 +void ResourceBase::synchronizeCollection(qint64 collectionId)
 +{
 +    synchronizeCollection(collectionId, false);
 +}
 +
 +void ResourceBase::synchronizeCollection(qint64 collectionId, bool recursive)
 +{
 +    CollectionFetchJob *job = new CollectionFetchJob(Collection(collectionId), \
recursive ? CollectionFetchJob::Recursive : CollectionFetchJob::Base);  +    \
job->setFetchScope(changeRecorder()->collectionFetchScope());  +    \
job->fetchScope().setResource(identifier());  +    job->setProperty("recursive", \
recursive);  +    connect(job, SIGNAL(result(KJob*)), \
SLOT(slotCollectionListDone(KJob*)));  +}
 +
 +void ResourceBasePrivate::slotCollectionListDone(KJob *job)
 +{
 +    if (!job->error()) {
 +        Collection::List list = static_cast<CollectionFetchJob \
*>(job)->collections();  +        if (!list.isEmpty()) {
 +            if (job->property("recursive").toBool()) {
 +                Q_FOREACH (const Collection &collection, list) {
 +                    scheduler->scheduleSync(collection);
 +                }
 +            } else {
 +                scheduler->scheduleSync(list.first());
 +            }
 +        }
 +    }
 +    // TODO: error handling
 +}
 +
 +void ResourceBase::synchronizeCollectionAttributes(qint64 collectionId)
 +{
 +    CollectionFetchJob *job = new CollectionFetchJob(Collection(collectionId), \
CollectionFetchJob::Base);  +    \
job->setFetchScope(changeRecorder()->collectionFetchScope());  +    \
job->fetchScope().setResource(identifier());  +    connect(job, \
SIGNAL(result(KJob*)), SLOT(slotCollectionListForAttributesDone(KJob*)));  +}
 +
 +void ResourceBasePrivate::slotCollectionListForAttributesDone(KJob *job)
 +{
 +    if (!job->error()) {
 +        Collection::List list = static_cast<CollectionFetchJob \
*>(job)->collections();  +        if (!list.isEmpty()) {
 +            Collection col = list.first();
 +            scheduler->scheduleAttributesSync(col);
 +        }
 +    }
 +    // TODO: error handling
 +}
 +
 +void ResourceBase::setTotalItems(int amount)
 +{
 +    qDebug() << amount;
 +    Q_D(ResourceBase);
 +    setItemStreamingEnabled(true);
 +    if (d->mItemSyncer) {
 +        d->mItemSyncer->setTotalItems(amount);
 +    }
 +}
 +
 +void ResourceBase::setItemStreamingEnabled(bool enable)
 +{
 +    Q_D(ResourceBase);
 +    d->createItemSyncInstanceIfMissing();
 +    if (d->mItemSyncer) {
 +        d->mItemSyncer->setStreamingEnabled(enable);
 +    }
 +}
 +
 +void ResourceBase::itemsRetrieved(const Item::List &items)
 +{
 +    Q_D(ResourceBase);
 +    d->createItemSyncInstanceIfMissing();
 +    if (d->mItemSyncer) {
 +        d->mItemSyncer->setFullSyncItems(items);
 +    }
 +}
 +
 +void ResourceBase::itemsRetrievedIncremental(const Item::List &changedItems,
 +                                             const Item::List &removedItems)
 +{
 +    Q_D(ResourceBase);
 +    d->createItemSyncInstanceIfMissing();
 +    if (d->mItemSyncer) {
 +        d->mItemSyncer->setIncrementalSyncItems(changedItems, removedItems);
 +    }
 +}
 +
 +void ResourceBasePrivate::slotItemSyncDone(KJob *job)
 +{
 +    mItemSyncer = 0;
 +    Q_Q(ResourceBase);
 +    if (job->error() && job->error() != Job::UserCanceled) {
 +        emit q->error(job->errorString());
 +    }
 +    scheduler->taskDone();
 +}
 +
 +void ResourceBasePrivate::slotDelayedEmitProgress()
 +{
 +    Q_Q(ResourceBase);
 +    if (mAutomaticProgressReporting) {
 +        emit q->percent(mUnemittedProgress);
 +
 +        Q_FOREACH (const QVariantMap &statusMap, mUnemittedAdvancedStatus) {
 +            emit q->advancedStatus(statusMap);
 +        }
 +    }
 +    mUnemittedProgress = 0;
 +    mUnemittedAdvancedStatus.clear();
 +}
 +
 +void ResourceBasePrivate::slotPercent(KJob *job, unsigned long percent)
 +{
 +    mUnemittedProgress = percent;
 +
 +    const Collection collection = job->property("collection").value<Collection>();
 +    if (collection.isValid()) {
 +        QVariantMap statusMap;
 +        statusMap.insert(QStringLiteral("key"), \
QString::fromLatin1("collectionSyncProgress"));  +        \
statusMap.insert(QStringLiteral("collectionId"), collection.id());  +        \
statusMap.insert(QStringLiteral("percent"), static_cast<unsigned int>(percent));  +
 +        mUnemittedAdvancedStatus[collection.id()] = statusMap;
 +    }
 +    // deliver completion right away, intermediate progress at 1s intervals
 +    if (percent == 100) {
 +        mProgressEmissionCompressor.stop();
 +        slotDelayedEmitProgress();
 +    } else if (!mProgressEmissionCompressor.isActive()) {
 +        mProgressEmissionCompressor.start();
 +    }
 +}
 +
 +void ResourceBase::setHierarchicalRemoteIdentifiersEnabled(bool enable)
 +{
 +    Q_D(ResourceBase);
 +    d->mHierarchicalRid = enable;
 +}
 +
 +void ResourceBase::scheduleCustomTask(QObject *receiver, const char *method, const \
QVariant &argument, SchedulePriority priority)  +{
 +    Q_D(ResourceBase);
 +    d->scheduler->scheduleCustomTask(receiver, method, argument, priority);
 +}
 +
 +void ResourceBase::taskDone()
 +{
 +    Q_D(ResourceBase);
 +    d->scheduler->taskDone();
 +}
 +
 +void ResourceBase::retrieveCollectionAttributes(const Collection &collection)
 +{
 +    collectionAttributesRetrieved(collection);
 +}
 +
 +void Akonadi::ResourceBase::abortActivity()
 +{
 +}
 +
 +void ResourceBase::setItemTransactionMode(ItemSync::TransactionMode mode)
 +{
 +    Q_D(ResourceBase);
 +    d->mItemTransactionMode = mode;
 +}
 +
 +void ResourceBase::setItemSynchronizationFetchScope(const ItemFetchScope \
&fetchScope)  +{
 +    Q_D(ResourceBase);
 +    if (!d->mItemSyncFetchScope) {
 +        d->mItemSyncFetchScope = new ItemFetchScope;
 +    }
 +    *(d->mItemSyncFetchScope) = fetchScope;
 +}
 +
 +void ResourceBase::setAutomaticProgressReporting(bool enabled)
 +{
 +    Q_D(ResourceBase);
 +    d->mAutomaticProgressReporting = enabled;
 +}
 +
 +QString ResourceBase::dumpNotificationListToString() const
 +{
 +    Q_D(const ResourceBase);
 +    return d->dumpNotificationListToString();
 +}
 +
 +QString ResourceBase::dumpSchedulerToString() const
 +{
 +    Q_D(const ResourceBase);
 +    return d->dumpToString();
 +}
 +
 +void ResourceBase::dumpMemoryInfo() const
 +{
 +    Q_D(const ResourceBase);
 +    return d->dumpMemoryInfo();
 +}
 +
 +QString ResourceBase::dumpMemoryInfoToString() const
 +{
 +    Q_D(const ResourceBase);
 +    return d->dumpMemoryInfoToString();
 +}
 +
 +#include "resourcebase.moc"
 +#include "moc_resourcebase.cpp"
diff --cc akonadi/src/agentbase/resourcebase.h
index 3901cb0,0000000..5076777
mode 100644,000000..100644
--- a/akonadi/src/agentbase/resourcebase.h
+++ b/akonadi/src/agentbase/resourcebase.h
@@@ -1,725 -1,0 +1,761 @@@
 +/*
 +    This file is part of akonadiresources.
 +
 +    Copyright (c) 2006 Till Adam <adam@kde.org>
 +    Copyright (c) 2007 Volker Krause <vkrause@kde.org>
 +
 +    This library is free software; you can redistribute it and/or modify it
 +    under the terms of the GNU Library General Public License as published by
 +    the Free Software Foundation; either version 2 of the License, or (at your
 +    option) any later version.
 +
 +    This library is distributed in the hope that it will be useful, but WITHOUT
 +    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 +    FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Library General Public
 +    License for more details.
 +
 +    You should have received a copy of the GNU Library General Public License
 +    along with this library; see the file COPYING.LIB.  If not, write to the
 +    Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 +    02110-1301, USA.
 +*/
 +
 +#ifndef AKONADI_RESOURCEBASE_H
 +#define AKONADI_RESOURCEBASE_H
 +
 +#include "akonadiagentbase_export.h"
 +#include "agentbase.h"
 +#include "collection.h"
 +#include "item.h"
 +#include "itemsync.h"
 +
 +class KJob;
 +class Akonadi__ResourceAdaptor;
 +class ResourceState;
 +
 +namespace Akonadi {
 +
 +class ResourceBasePrivate;
 +
 +/**
 + * @short The base class for all Akonadi resources.
 + *
 + * This class should be used as a base class by all resource agents,
 + * because it encapsulates large parts of the protocol between
 + * resource agent, agent manager and the Akonadi storage.
 + *
 + * It provides many convenience methods to make implementing a
 + * new Akonadi resource agent as simple as possible.
 + *
 + * <h4>How to write a resource</h4>
 + *
 + * The following provides an overview of what you need to do to implement
 + * your own Akonadi resource. In the following, the term 'backend' refers
 + * to the entity the resource connects with Akonadi, be it a single file
 + * or a remote server.
 + *
 + * @todo Complete this (online/offline state management)
 + *
 + * <h5>Basic %Resource Framework</h5>
 + *
 + * The following is needed to create a new resource:
 + * - A new class deriving from Akonadi::ResourceBase, implementing at least all
 + *   pure-virtual methods, see below for further details.
 + * - call init() in your main() function.
 + * - a .desktop file similar to the following example
 + *   \code
 + * [Desktop Entry]
 + * Encoding=UTF-8
 + * Name=My Akonadi Resource
 + * Type=AkonadiResource
 + * Exec=akonadi_my_resource
 + * Icon=my-icon
 + *
 + * X-Akonadi-MimeTypes=<supported-mimetypes>
 + * X-Akonadi-Capabilities=Resource
 + * X-Akonadi-Identifier=akonadi_my_resource
 + *   \endcode
 + *
 + * <h5>Handling PIM Items</h5>
 + *
 + * To follow item changes in the backend, the following steps are necessary:
 + * - Implement retrieveItems() to synchronize all items in the given
 + *   collection. If the backend supports incremental retrieval,
 + *   implementing support for that is recommended to improve performance.
 + * - Convert the items provided by the backend to Akonadi items.
 + *   This typically happens either in retrieveItems() if you retrieved
 + *   the collection synchronously (not recommended for network backends) or
 + *   in the result slot of the asynchronous retrieval job.
 + *   Converting means to create Akonadi::Item objects for every retrieved
 + *   item. It's very important that every object has its remote identifier set.
 + * - Call itemsRetrieved() or itemsRetrievedIncremental() respectively
 + *   with the item objects created above. The Akonadi storage will then be
 + *   updated automatically. Note that it is usually not necessary to manipulate
 + *   any item in the Akonadi storage manually.
 + *
 + * To fetch item data on demand, the method retrieveItem() needs to be
 + * reimplemented. Fetch the requested data there and call itemRetrieved()
 + * with the result item.
 + *
 + * To write local changes back to the backend, you need to re-implement
 + * the following three methods:
 + * - itemAdded()
 + * - itemChanged()
 + * - itemRemoved()
 + *
 + * Note that these three functions don't get the full payload of the items by \
default,  + * you need to change the item fetch scope of the change recorder to fetch \
the full  + * payload. This can be expensive with big payloads, though.<br>
 + * Once you have handled changes in these methods, call changeCommitted().
 + * These methods are called whenever a local item related to this resource is
 + * added, modified or deleted. They are only called if the resource is online, \
otherwise  + * all changes are recorded and replayed as soon the resource is online \
again.  + *
 + * <h5>Handling Collections</h5>
 + *
 + * To follow collection changes in the backend, the following steps are necessary:
 + * - Implement retrieveCollections() to retrieve collections from the backend.
 + *   If the backend supports incremental collections updates, implementing
 + *   support for that is recommended to improve performance.
 + * - Convert the collections of the backend to Akonadi collections.
 + *   This typically happens either in retrieveCollections() if you retrieved
 + *   the collection synchronously (not recommended for network backends) or
 + *   in the result slot of the asynchronous retrieval job.
 + *   Converting means to create Akonadi::Collection objects for every retrieved
 + *   collection. It's very important that every object has its remote identifier
 + *   and its parent remote identifier set.
 + * - Call collectionsRetrieved() or collectionsRetrievedIncremental() respectively
 + *   with the collection objects created above. The Akonadi storage will then be
 + *   updated automatically. Note that it is usually not necessary to manipulate
 + *   any collection in the Akonadi storage manually.
 + *
 + *
 + * To write local collection changes back to the backend, you need to re-implement
 + * the following three methods:
 + * - collectionAdded()
 + * - collectionChanged()
 + * - collectionRemoved()
 + * Once you have handled changes in these methods call changeCommitted().
 + * These methods are called whenever a local collection related to this resource is
 + * added, modified or deleted. They are only called if the resource is online, \
otherwise  + * all changes are recorded and replayed as soon the resource is online \
again.  + *
 + * @todo Convenience base class for collection-less resources
 + */
 +// FIXME_API: API dox need to be updated for Observer approach (kevin)
 +class AKONADIAGENTBASE_EXPORT ResourceBase : public AgentBase
 +{
 +    Q_OBJECT
 +
 +public:
 +    /**
 +     * Use this method in the main function of your resource
 +     * application to initialize your resource subclass.
 +     * This method also takes care of creating a KApplication
 +     * object and parsing command line arguments.
 +     *
 +     * @note In case the given class is also derived from AgentBase::Observer
 +     *       it gets registered as its own observer (see AgentBase::Observer), e.g.
 +     *       <tt>resourceInstance->registerObserver( resourceInstance );</tt>
 +     *
 +     * @code
 +     *
 +     *   class MyResource : public ResourceBase
 +     *   {
 +     *     ...
 +     *   };
 +     *
 +     *   int main( int argc, char **argv )
 +     *   {
 +     *     return ResourceBase::init<MyResource>( argc, argv );
 +     *   }
 +     *
 +     * @endcode
 +     *
 +     * @param argc number of arguments
 +     * @param argv string arguments
 +     */
 +    template <typename T>
 +    static int init(int argc, char **argv)
 +    {
 +        const QString id = parseArguments(argc, argv);
 +        KApplication app;
 +        T *r = new T(id);
 +
 +        // check if T also inherits AgentBase::Observer and
 +        // if it does, automatically register it on itself
 +        Observer *observer = dynamic_cast<Observer *>(r);
 +        if (observer != 0) {
 +            r->registerObserver(observer);
 +        }
 +
 +        return init(r);
 +    }
 +
 +    /**
 +     * This method is used to set the name of the resource.
 +     */
 +    void setName(const QString &name);
 +
 +    /**
 +     * Returns the name of the resource.
 +     */
 +    QString name() const;
 +
 +    /**
 +     * Enable or disable automatic progress reporting. By default, it is enabled.
 +     * When enabled, the resource will automatically emit the signals percent() and \
status()  +     * while syncing items or collections.
 +     *
 +     * The automatic progress reporting is done on a per item / per collection \
basis, so if a  +     * finer granularity is desired, automatic reporting should be \
disabled and the subclass should  +     * emit the percent() and status() signals \
itself.  +     *
 +     * @param enabled Whether or not automatic emission of the signals is enabled.
 +     * @since 4.7
 +     */
 +    void setAutomaticProgressReporting(bool enabled);
 +
 +Q_SIGNALS:
 +    /**
 +     * This signal is emitted whenever the name of the resource has changed.
 +     *
 +     * @param name The new name of the resource.
 +     */
 +    void nameChanged(const QString &name);
 +
 +    /**
 +     * Emitted when a full synchronization has been completed.
 +     */
 +    void synchronized();
 +
 +    /**
 +     * Emitted when a collection attributes synchronization has been completed.
 +     *
 +     * @param collectionId The identifier of the collection whose attributes got \
synchronized.  +     * @since 4.6
 +     */
 +    void attributesSynchronized(qlonglong collectionId);
 +
 +    /**
 +     * Emitted when a collection tree synchronization has been completed.
 +     *
 +     * @since 4.8
 +     */
 +    void collectionTreeSynchronized();
 +
++    /**
++     * Emitted when the item synchronization processed the current batch and is \
ready for a new one. ++     * Use this to throttle the delivery to not overload \
Akonadi. ++     *
++     * Throttling can be used during item retrieval \
(retrieveItems(Akonadi::Collection)) in streaming mode. ++     * To throttle only \
deliver itemSyncBatchSize() items, and wait for this signal, then again deliver ++    \
* @param remainingBatchSize items. ++     *
++     * By always only providing the number of items required to process the batch, \
the items don't pile ++     * up in memory and items are only provided as fast as \
Akonadi can process them. ++     *
++     * @see batchSize()
++     *
++     * @since 4.14
++     */
++    void retrieveNextItemSyncBatch(int remainingBatchSize);
++
 +protected Q_SLOTS:
 +    /**
 +     * Retrieve the collection tree from the remote server and supply it via
 +     * collectionsRetrieved() or collectionsRetrievedIncremental().
 +     * @see collectionsRetrieved(), collectionsRetrievedIncremental()
 +     */
 +    virtual void retrieveCollections() = 0;
 +
 +    /**
 +     * Retrieve the attributes of a single collection from the backend. The
 +     * collection to retrieve attributes for is provided as @p collection.
 +     * Add the attributes parts and call collectionAttributesRetrieved()
 +     * when done.
 +     *
 +     * @param collection The collection whose attributes should be retrieved.
 +     * @see collectionAttributesRetrieved()
 +     * @since 4.6
 +     */
 +    // KDE5: Make it pure virtual, for now can be called only by invokeMethod()
 +    //       in order to simulate polymorphism
 +    void retrieveCollectionAttributes(const Akonadi::Collection &collection);
 +
 +    /**
 +     * Retrieve all (new/changed) items in collection @p collection.
 +     * It is recommended to use incremental retrieval if the backend supports that
 +     * and provide the result by calling itemsRetrievedIncremental().
 +     * If incremental retrieval is not possible, provide the full listing by \
calling  +     * itemsRetrieved( const Item::List& ).
 +     * In any case, ensure that all items have a correctly set remote identifier
 +     * to allow synchronizing with items already existing locally.
 +     * In case you don't want to use the built-in item syncing code, store the \
retrieved  +     * items manually and call itemsRetrieved() once you are done.
 +     * @param collection The collection whose items to retrieve.
-      * @see itemsRetrieved( const Item::List& ), itemsRetrievedIncremental(), \
itemsRetrieved(), currentCollection() ++     * @see itemsRetrieved( const Item::List& \
), itemsRetrievedIncremental(), itemsRetrieved(), currentCollection(), batchSize()  + \
*/  +    virtual void retrieveItems(const Akonadi::Collection &collection) = 0;
 +
 +    /**
++     * Returns the batch size used during the item sync.
++     *
++     * This can be used to throttle the item delivery.
++     *
++     * @see retrieveNextItemSyncBatch(int), retrieveItems(Akonadi::Collection)
++     * @since 4.14
++     */
++    int itemSyncBatchSize() const;
++
++    /**
++     * Set the batch size used during the item sync.
++     * The default is 10.
++     *
++     * @see retrieveNextItemSyncBatch(int)
++     * @since 4.14
++     */
++    void setItemSyncBatchSize(int batchSize);
++
++    /**
 +     * Retrieve a single item from the backend. The item to retrieve is provided as \
@p item.  +     * Add the requested payload parts and call itemRetrieved() when done.
 +     * @param item The empty item whose payload should be retrieved. Use this \
object when delivering  +     * the result instead of creating a new item to ensure \
conflict detection will work.  +     * @param parts The item parts that should be \
retrieved.  +     * @return false if there is an immediate error when retrieving the \
item.  +     * @see itemRetrieved()
 +     */
 +    virtual bool retrieveItem(const Akonadi::Item &item, const QSet<QByteArray> \
&parts) = 0;  +
 +    /**
 +     * Abort any activity in progress in the backend. By default this method does \
nothing.  +     *
 +     * @since 4.6
 +     */
 +    // KDE5: Make it pure virtual, for now can be called only by invokeMethod()
 +    //       in order to simulate polymorphism
 +    void abortActivity();
 +
 +    /**
 +     * Dump resource internals, for debugging.
 +     * @since 4.9
 +     */
 +    // KDE5: Make it pure virtual, for now can be called only by invokeMethod()
 +    //       in order to simulate polymorphism
 +    QString dumpResourceToString() const
 +    {
 +        return QString();
 +    }
 +
 +protected:
 +    /**
 +     * Creates a base resource.
 +     *
 +     * @param id The instance id of the resource.
 +     */
 +    ResourceBase(const QString &id);
 +
 +    /**
 +     * Destroys the base resource.
 +     */
 +    ~ResourceBase();
 +
 +    /**
 +     * Call this method from retrieveItem() once the result is available.
 +     *
 +     * @param item The retrieved item.
 +     */
 +    void itemRetrieved(const Item &item);
 +
 +    /**
 +     * Call this method from retrieveCollectionAttributes() once the result is \
available.  +     *
 +     * @param collection The collection whose attributes got retrieved.
 +     * @since 4.6
 +     */
 +    void collectionAttributesRetrieved(const Collection &collection);
 +
 +    /**
 +     * Resets the dirty flag of the given item and updates the remote id.
 +     *
 +     * Call whenever you have successfully written changes back to the server.
 +     * This implicitly calls changeProcessed().
 +     * @param item The changed item.
 +     */
 +    void changeCommitted(const Item &item);
 +
 +    /**
 +     * Resets the dirty flag of all given items and updates remote ids.
 +     *
 +     * Call whenever you have successfully written changes back to the server.
 +     * This implicitly calls changeProcessed().
 +     * @param items Changed items
 +     *
 +     * @since 4.11
 +     */
 +    void changesCommitted(const Item::List &items);
 +
 +    /**
 +     * Resets the dirty flag of the given tag and updates the remote id.
 +     *
 +     * Call whenever you have successfully written changes back to the server.
 +     * This implicitly calls changeProcessed().
 +     * @param tag Changed tag.
 +     *
 +     * @since 4.13
 +     */
 +    void changeCommitted(const Tag &tag);
 +
 +    /**
 +     * Call whenever you have successfully handled or ignored a collection
 +     * change notification.
 +     *
 +     * This will update the remote identifier of @p collection if necessary,
 +     * as well as any other collection attributes.
 +     * This implicitly calls changeProcessed().
 +     * @param collection The collection which changes have been handled.
 +    */
 +    void changeCommitted(const Collection &collection);
 +
 +    /**
 +     * Call this to supply the full folder tree retrieved from the remote server.
 +     *
 +     * @param collections A list of collections.
 +     * @see collectionsRetrievedIncremental()
 +    */
 +    void collectionsRetrieved(const Collection::List &collections);
 +
 +    /**
 +     * Call this to supply incrementally retrieved collections from the remote \
server.  +     *
 +     * @param changedCollections Collections that have been added or changed.
 +     * @param removedCollections Collections that have been deleted.
 +     * @see collectionsRetrieved()
 +     */
 +    void collectionsRetrievedIncremental(const Collection::List \
&changedCollections,  +                                         const \
Collection::List &removedCollections);  +
 +    /**
 +     * Enable collection streaming, that is collections don't have to be delivered \
at once  +     * as result of a retrieveCollections() call but can be delivered by \
multiple calls  +     * to collectionsRetrieved() or \
collectionsRetrievedIncremental(). When all collections  +     * have been retrieved, \
call collectionsRetrievalDone().  +     * @param enable @c true if collection \
streaming should be enabled, @c false by default  +     */
 +    void setCollectionStreamingEnabled(bool enable);
 +
 +    /**
 +     * Call this method to indicate you finished synchronizing the collection tree.
 +     *
 +     * This is not needed if you use the built in syncing without collection \
streaming  +     * and call collectionsRetrieved() or \
collectionRetrievedIncremental() instead.  +     * If collection streaming is \
enabled, call this method once all collections have been delivered  +     * using \
collectionsRetrieved() or collectionsRetrievedIncremental().  +     */
 +    void collectionsRetrievalDone();
 +
 +    /**
 +     * Call this method to supply the full collection listing from the remote \
server. Items not present in the list  +     * will be dropped from the Akonadi \
database.  +     *
 +     * If the remote server supports incremental listing, it's strongly
 +     * recommended to use itemsRetrievedIncremental() instead.
 +     * @param items A list of items.
 +     * @see itemsRetrievedIncremental().
 +     */
 +    void itemsRetrieved(const Item::List &items);
 +
 +    /**
 +     * Call this method when you want to use the itemsRetrieved() method
 +     * in streaming mode and indicate the amount of items that will arrive
 +     * that way.
 +     * @deprecated Use setItemStreamingEnabled( true ) + \
itemsRetrieved[Incremental]()  +     * + itemsRetrieved() instead.
 +     * @param amount number of items that will arrive in streaming mode
 +     */
 +    void setTotalItems(int amount);
 +
 +    /**
 +     * Enable item streaming.
 +     * Item streaming is disabled by default.
 +     * @param enable @c true if items are delivered in chunks rather in one big \
block.  +     */
 +    void setItemStreamingEnabled(bool enable);
 +
 +    /**
 +     * Set transaction mode for item sync'ing.
 +     * @param mode item transaction mode
 +     * @see Akonadi::ItemSync::TransactionMode
 +     * @since 4.6
 +     */
 +    void setItemTransactionMode(ItemSync::TransactionMode mode);
 +
 +    /**
 +     * Set the fetch scope applied for item synchronization.
 +     * By default, the one set on the changeRecorder() is used. However, it can \
make sense  +     * to specify a specialized fetch scope for synchronization to \
improve performance.  +     * The rule of thumb is to remove anything from this fetch \
scope that does not provide  +     * additional information regarding whether and \
item has changed or not. This is primarily  +     * relevant for backends not \
supporting incremental retrieval.  +     * @param fetchScope The fetch scope to use \
by the internal Akonadi::ItemSync instance.  +     * @see Akonadi::ItemSync
 +     * @since 4.6
 +     */
 +    void setItemSynchronizationFetchScope(const ItemFetchScope &fetchScope);
 +
 +    /**
 +     * Call this method to supply incrementally retrieved items from the remote \
server.  +     *
 +     * @param changedItems Items changed in the backend.
 +     * @param removedItems Items removed from the backend.
 +     */
 +    void itemsRetrievedIncremental(const Item::List &changedItems,
 +                                   const Item::List &removedItems);
 +
 +    /**
 +     * Call this method to indicate you finished synchronizing the current \
collection.  +     *
 +     * This is not needed if you use the built in syncing without item streaming
 +     * and call itemsRetrieved() or itemsRetrievedIncremental() instead.
 +     * If item streaming is enabled, call this method once all items have been \
delivered  +     * using itemsRetrieved() or itemsRetrievedIncremental().
 +     * @see retrieveItems()
 +     */
 +    void itemsRetrievalDone();
 +
 +    /**
 +     * Call this method to remove all items and collections of the resource from \
the  +     * server cache.
 +     *
 +     * The method should not be used anymore
 +     *
 +     * @see invalidateCache()
 +     * @since 4.3
 +     */
 +    void clearCache();
 +
 +    /**
 +     * Call this method to invalidate all cached content in @p collection.
 +     *
 +     * The method should be used when the backend indicated that the cached content
 +     * is no longer valid.
 +     *
 +     * @param collection parent of the content to be invalidated in cache
 +     * @since 4.8
 +     */
 +    void invalidateCache(const Collection &collection);
 +
 +    /**
 +     * Returns the collection that is currently synchronized.
 +     * @note Calling this method is only allowed during a collection \
synchronization task, that  +     * is directly or indirectly from retrieveItems().
 +     */
 +    Collection currentCollection() const;
 +
 +    /**
 +     * Returns the item that is currently retrieved.
 +     * @note Calling this method is only allowed during fetching a single item, \
that  +     * is directly or indirectly from retrieveItem().
 +     */
 +    Item currentItem() const;
 +
 +    /**
 +     * This method is called whenever the resource should start synchronize all \
data.  +     */
 +    void synchronize();
 +
 +    /**
 +     * This method is called whenever the collection with the given @p id
 +     * shall be synchronized.
 +     */
 +    void synchronizeCollection(qint64 id);
 +
 +    /**
 +     * This method is called whenever the collection with the given @p id
 +     * shall be synchronized.
 +     * @param recursive if true, a recursive synchronization is done
 +     */
 +    void synchronizeCollection(qint64 id, bool recursive);
 +
 +    /**
 +     * This method is called whenever the collection with the given @p id
 +     * shall have its attributes synchronized.
 +     *
 +     * @param id The id of the collection to synchronize
 +     * @since 4.6
 +     */
 +    void synchronizeCollectionAttributes(qint64 id);
 +
 +    /**
 +     * Refetches the Collections.
 +     */
 +    void synchronizeCollectionTree();
 +
 +    /**
 +     * Stops the execution of the current task and continues with the next one.
 +     */
 +    void cancelTask();
 +
 +    /**
 +     * Stops the execution of the current task and continues with the next one.
 +     * Additionally an error message is emitted.
 +     * @param error additional error message to be emitted
 +     */
 +    void cancelTask(const QString &error);
 +
 +    /**
 +     * Stops the execution of the current task and continues with the next one.
 +     * The current task will be tried again later.
 +     *
 +     * This can be used to delay the task processing until the resource has reached \
a safe  +     * state, e.g. login to a server succeeded.
 +     *
 +     * @note This does not change the order of tasks so if there is no task with \
higher priority  +     *       e.g. a custom task added with #Prepend the deferred \
task will be processed again.  +     *
 +     * @since 4.3
 +     */
 +    void deferTask();
 +
 +    /**
 +     * Inherited from AgentBase.
 +     */
 +    void doSetOnline(bool online);
 +
 +    /**
 +     * Indicate the use of hierarchical remote identifiers.
 +     *
 +     * This means that it is possible to have two different items with the same
 +     * remoteId in different Collections.
 +     *
 +     * This should be called in the resource constructor as needed.
 +     *
 +     * @param enable whether to enable use of hierarchical remote identifiers
 +     * @since 4.4
 +     */
 +    void setHierarchicalRemoteIdentifiersEnabled(bool enable);
 +
 +    friend class ResourceScheduler;
 +    friend class ::ResourceState;
 +
 +    /**
 +     * Describes the scheduling priority of a task that has been queued
 +     * for execution.
 +     *
 +     * @see scheduleCustomTask
 +     * @since 4.4
 +     */
 +    enum SchedulePriority {
 +        Prepend,            ///< The task will be executed as soon as the current \
task has finished.  +        AfterChangeReplay,  ///< The task is scheduled after the \
last ChangeReplay task in the queue  +        Append              ///< The task will \
be executed after all tasks currently in the queue are finished  +    };
 +
 +    /**
 +     * Schedules a custom task in the internal scheduler. It will be queued with
 +     * all other tasks such as change replays and retrieval requests and eventually
 +     * executed by calling the specified method. With the priority parameter the
 +     * time of execution of the Task can be influenced. @see SchedulePriority
 +     * @param receiver The object the slot should be called on.
 +     * @param method The name of the method (and only the name, not signature, not \
SLOT(...) macro),  +     * that should be called to execute this task. The method has \
to be a slot and take a QVariant as  +     * argument.
 +     * @param argument A QVariant argument passed to the method specified above. \
Use this to pass task  +     * parameters.
 +     * @param priority Priority of the task. Use this to influence the position in
 +     * the execution queue.
 +     * @since 4.4
 +     */
 +    void scheduleCustomTask(QObject *receiver, const char *method, const QVariant \
&argument, SchedulePriority priority = Append);  +
 +    /**
 +     * Indicate that the current task is finished. Use this method from the slot \
called via scheduleCustomTaks().  +     * As with all the other callbacks, make sure \
to either call taskDone() or cancelTask()/deferTask() on all  +     * exit paths, \
otherwise the resource will hang.  +     * @since 4.4
 +     */
 +    void taskDone();
 +
 +    /**
 +     * Dump the contents of the current ChangeReplay
 +     * @since 4.8.1
 +     */
 +    QString dumpNotificationListToString() const;
 +
 +    /**
 +     *  Dumps memory usage information to stdout.
 +     *  For now it outputs the result of glibc's mallinfo().
 +     *  This is useful to check if your memory problems are due to poor management \
by glibc.  +     *  Look for a high value on fsmblks when interpreting results.
 +     *  man mallinfo for more details.
 +     *  @since 4.11
 +     */
 +    void dumpMemoryInfo() const;
 +
 +    /**
 +     *  Returns a string with memory usage information.
 +     *  @see dumpMemoryInfo()
 +     *
 +     *  @since 4.11
 +     */
 +    QString dumpMemoryInfoToString() const;
 +
 +    /**
 +     * Dump the state of the scheduler
 +     * @since 4.8.1
 +     */
 +    QString dumpSchedulerToString() const;
 +
 +private:
 +    static QString parseArguments(int, char **);
 +    static int init(ResourceBase *r);
 +
 +    // dbus resource interface
 +    friend class ::Akonadi__ResourceAdaptor;
 +
 +    bool requestItemDelivery(qint64 uid, const QString &remoteId, const QString \
&mimeType, const QStringList &parts);  +
 +    QString requestItemDeliveryV2(qint64 uid, const QString &remoteId, const \
QString &mimeType, const QStringList &parts);  +
 +private:
 +    Q_DECLARE_PRIVATE(ResourceBase)
 +
 +    Q_PRIVATE_SLOT(d_func(), void slotAbortRequested())
 +    Q_PRIVATE_SLOT(d_func(), void slotDeliveryDone(KJob *))
 +    Q_PRIVATE_SLOT(d_func(), void slotCollectionSyncDone(KJob *))
 +    Q_PRIVATE_SLOT(d_func(), void slotDeleteResourceCollection())
 +    Q_PRIVATE_SLOT(d_func(), void slotDeleteResourceCollectionDone(KJob *))
 +    Q_PRIVATE_SLOT(d_func(), void slotCollectionDeletionDone(KJob *))
 +    Q_PRIVATE_SLOT(d_func(), void slotInvalidateCache(const Akonadi::Collection &))
 +    Q_PRIVATE_SLOT(d_func(), void slotLocalListDone(KJob *))
 +    Q_PRIVATE_SLOT(d_func(), void slotSynchronizeCollection(const \
Akonadi::Collection &))  +    Q_PRIVATE_SLOT(d_func(), void \
slotCollectionListDone(KJob *))  +    Q_PRIVATE_SLOT(d_func(), void \
slotSynchronizeCollectionAttributes(const Akonadi::Collection &))  +    \
Q_PRIVATE_SLOT(d_func(), void slotCollectionListForAttributesDone(KJob *))  +    \
Q_PRIVATE_SLOT(d_func(), void slotCollectionAttributesSyncDone(KJob *))  +    \
Q_PRIVATE_SLOT(d_func(), void slotItemSyncDone(KJob *))  +    \
Q_PRIVATE_SLOT(d_func(), void slotPercent(KJob *, unsigned long))  +    \
Q_PRIVATE_SLOT(d_func(), void slotDelayedEmitProgress())  +    \
Q_PRIVATE_SLOT(d_func(), void slotPrepareItemRetrieval(const Akonadi::Item &item))  + \
Q_PRIVATE_SLOT(d_func(), void slotPrepareItemRetrievalResult(KJob *))  +    \
Q_PRIVATE_SLOT(d_func(), void changeCommittedResult(KJob *))  +    \
Q_PRIVATE_SLOT(d_func(), void slotSessionReconnected())  +    \
Q_PRIVATE_SLOT(d_func(), void slotRecursiveMoveReplay(RecursiveMover *))  +    \
Q_PRIVATE_SLOT(d_func(), void slotRecursiveMoveReplayResult(KJob *))  +};
 +
 +}
 +
 +#ifndef AKONADI_RESOURCE_MAIN
 +/**
 + * Convenience Macro for the most common main() function for Akonadi resources.
 + */
 +#define AKONADI_RESOURCE_MAIN( resourceClass )                       \
 +  int main( int argc, char **argv )                                  \
 +  {                                                                  \
 +    return Akonadi::ResourceBase::init<resourceClass>( argc, argv ); \
 +  }
 +#endif
 +
 +#endif
diff --cc akonadi/src/core/itemsync.cpp
index dd7d3b6,0000000..1ab71f4
mode 100644,000000..100644
--- a/akonadi/src/core/itemsync.cpp
+++ b/akonadi/src/core/itemsync.cpp
@@@ -1,558 -1,0 +1,679 @@@
 +/*
 +    Copyright (c) 2007 Tobias Koenig <tokoe@kde.org>
 +    Copyright (c) 2007 Volker Krause <vkrause@kde.org>
++    Copyright (c) 2014 Christian Mollekopf <mollekopf@kolabsys.com>
 +
 +    This library is free software; you can redistribute it and/or modify it
 +    under the terms of the GNU Library General Public License as published by
 +    the Free Software Foundation; either version 2 of the License, or (at your
 +    option) any later version.
 +
 +    This library is distributed in the hope that it will be useful, but WITHOUT
 +    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 +    FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Library General Public
 +    License for more details.
 +
 +    You should have received a copy of the GNU Library General Public License
 +    along with this library; see the file COPYING.LIB.  If not, write to the
 +    Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 +    02110-1301, USA.
 +*/
 +
 +#include "itemsync.h"
 +
 +#include "job_p.h"
 +#include "collection.h"
 +#include "item.h"
 +#include "item_p.h"
 +#include "itemcreatejob.h"
 +#include "itemdeletejob.h"
 +#include "itemfetchjob.h"
 +#include "itemmodifyjob.h"
 +#include "transactionsequence.h"
 +#include "itemfetchscope.h"
 +
 +#include <qdebug.h>
 +
 +#include <QtCore/QStringList>
 +
 +using namespace Akonadi;
 +
 +/**
 + * @internal
 + */
 +class Akonadi::ItemSyncPrivate : public JobPrivate
 +{
 +public:
 +    ItemSyncPrivate(ItemSync *parent)
 +        : JobPrivate(parent)
 +        , mTransactionMode(ItemSync::SingleTransaction)
 +        , mCurrentTransaction(0)
 +        , mTransactionJobs(0)
 +        , mPendingJobs(0)
 +        , mProgress(0)
 +        , mTotalItems(-1)
 +        , mTotalItemsProcessed(0)
 +        , mStreaming(false)
 +        , mIncremental(false)
-         , mLocalListDone(false)
 +        , mDeliveryDone(false)
 +        , mFinished(false)
-         , mLocalListStarted( false )
++        , mFullListingDone(false)
++        , mProcessingBatch(false)
++        , mBatchSize(10)
 +    {
 +        // we want to fetch all data by default
 +        mFetchScope.fetchFullPayload();
 +        mFetchScope.fetchAllAttributes();
 +    }
 +
 +    void createLocalItem(const Item &item);
++    void modifyLocalItem(const Item &remoteItem, Akonadi::Item::Id localId);
 +    void checkDone();
++    void slotItemsReceived(const Item::List &items);
 +    void slotLocalListDone(KJob *job);
++    void slotLocalFetchDone(KJob *job);
 +    void slotLocalDeleteDone(KJob *);
 +    void slotLocalChangeDone(KJob *job);
 +    void execute();
 +    void processItems();
++    void processBatch();
 +    void deleteItems(const Item::List &items);
 +    void slotTransactionResult(KJob *job);
++    void requestTransaction();
 +    Job *subjobParent() const;
 +    void fetchLocalItems();
 +    QString jobDebuggingString() const /*Q_DECL_OVERRIDE*/;
++    bool allProcessed() const;
 +
 +    Q_DECLARE_PUBLIC(ItemSync)
 +    Collection mSyncCollection;
-     QHash<Item::Id, Akonadi::Item> mLocalItemsById;
-     QHash<QString, Akonadi::Item> mLocalItemsByRemoteId;
-     QSet<Akonadi::Item> mUnprocessedLocalItems;
++    QSet<Akonadi::Item::Id> mUnprocessedLocalIds;
++    QHash<QString, Akonadi::Item::Id> mLocalIdByRid;
 +
 +    ItemSync::TransactionMode mTransactionMode;
 +    TransactionSequence *mCurrentTransaction;
 +    int mTransactionJobs;
 +
 +    // fetch scope for initial item listing
 +    ItemFetchScope mFetchScope;
 +
-     // remote items
-     Akonadi::Item::List mRemoteItems;
- 
-     // removed remote items
-     Item::List mRemovedRemoteItems;
++    Akonadi::Item::List mRemoteItemQueue;
++    Akonadi::Item::List mRemovedRemoteItemQueue;
++    Akonadi::Item::List mCurrentBatchRemoteItems;
++    Akonadi::Item::List mCurrentBatchRemovedRemoteItems;
 +
 +    // create counter
 +    int mPendingJobs;
 +    int mProgress;
 +    int mTotalItems;
 +    int mTotalItemsProcessed;
 +
 +    bool mStreaming;
 +    bool mIncremental;
-     bool mLocalListDone;
 +    bool mDeliveryDone;
 +    bool mFinished;
-     bool mLocalListStarted;
++    bool mFullListingDone;
++    bool mProcessingBatch;
++
++    int mBatchSize;
 +};
 +
 +void ItemSyncPrivate::createLocalItem(const Item &item)
 +{
 +    Q_Q(ItemSync);
 +    // don't try to do anything in error state
 +    if (q->error()) {
 +        return;
 +    }
 +    mPendingJobs++;
 +    ItemCreateJob *create = new ItemCreateJob(item, mSyncCollection, \
subjobParent());  +    q->connect(create, SIGNAL(result(KJob*)), q, \
SLOT(slotLocalChangeDone(KJob*)));  +}
 +
++void ItemSyncPrivate::modifyLocalItem(const Item &remoteItem, Akonadi::Item::Id \
localId) ++{
++    Q_Q(ItemSync);
++    // don't try to do anything in error state
++    if (q->error()) {
++        return;
++    }
++
++    //we fetch the local item to check if a modification is required and to make \
sure we have all parts ++    Akonadi::ItemFetchJob *fetchJob = new \
Akonadi::ItemFetchJob(Akonadi::Item(localId), subjobParent()); ++    \
fetchJob->setFetchScope(mFetchScope); ++    \
fetchJob->fetchScope().setCacheOnly(true); ++    \
fetchJob->setDeliveryOption(ItemFetchJob::ItemGetter); ++    q->connect(fetchJob, \
SIGNAL(result(KJob*)), q, SLOT(slotLocalFetchDone(KJob*))); ++    \
fetchJob->setProperty("remoteItem", QVariant::fromValue(remoteItem)); ++    \
mPendingJobs++; ++}
++
++void ItemSyncPrivate::slotLocalFetchDone(KJob *job)
++{
++    Q_Q(ItemSync);
++    mPendingJobs--;
++    if (job->error()) {
++        kWarning() << job->errorString();
++        checkDone();
++        return;
++    }
++    Akonadi::ItemFetchJob *fetchJob = static_cast<Akonadi::ItemFetchJob*>(job);
++    Akonadi::Item remoteItem = \
fetchJob->property("remoteItem").value<Akonadi::Item>(); ++    if \
(fetchJob->items().isEmpty()) { ++        kWarning() << "Failed to fetch local item: \
" << remoteItem.remoteId() << remoteItem.gid(); ++        checkDone();
++        return;
++    }
++    const Akonadi::Item localItem = fetchJob->items().first();
++    if (q->updateItem(localItem, remoteItem)) {
++        remoteItem.setId(localItem.id());
++        remoteItem.setRevision(localItem.revision());
++        remoteItem.setSize(localItem.size());
++        remoteItem.setRemoteId(localItem.remoteId());    // in case someone clears \
remoteId by accident ++        ItemModifyJob *mod = new ItemModifyJob(remoteItem, \
subjobParent()); ++        mod->disableRevisionCheck();
++        q->connect(mod, SIGNAL(result(KJob*)), q, \
SLOT(slotLocalChangeDone(KJob*))); ++        mPendingJobs++;
++    } else {
++        mProgress++;
++    }
++    checkDone();
++}
++
++bool ItemSyncPrivate::allProcessed() const
++{
++    return mDeliveryDone && mCurrentBatchRemoteItems.isEmpty() && \
mRemoteItemQueue.isEmpty() && mRemovedRemoteItemQueue.isEmpty() && \
mCurrentBatchRemovedRemoteItems.isEmpty(); ++}
++
 +void ItemSyncPrivate::checkDone()
 +{
 +    Q_Q(ItemSync);
 +    q->setProcessedAmount(KJob::Bytes, mProgress);
-     if (mPendingJobs > 0 || !mDeliveryDone || mTransactionJobs > 0) {
++    if (mPendingJobs > 0) {
++        return;
++    }
++
++    if (mTransactionJobs > 0) {
++        //Commit the current transaction if we're in batch processing mode or done
++        if ((mTransactionMode == ItemSync::MultipleTransactions || mDeliveryDone) \
&& mCurrentTransaction) { ++            mCurrentTransaction->commit();
++            mCurrentTransaction = 0;
++        }
 +        return;
 +    }
++    mProcessingBatch = false;
++    if (!mRemoteItemQueue.isEmpty()) {
++        execute();
++        //We don't have enough items, request more
++        if (!mProcessingBatch) {
++            q->emit readyForNextBatch(mBatchSize - mRemoteItemQueue.size());
++        }
++        return;
++    }
++    q->emit readyForNextBatch(mBatchSize);
 +
-     if (!mFinished) {
++    if (allProcessed() && !mFinished) {
 +        // prevent double result emission, can happen since checkDone() is called \
from all over the place  +        mFinished = true;
 +        q->emitResult();
 +    }
 +}
 +
 +ItemSync::ItemSync(const Collection &collection, QObject *parent)
 +    : Job(new ItemSyncPrivate(this), parent)
 +{
 +    Q_D(ItemSync);
 +    d->mSyncCollection = collection;
 +}
 +
 +ItemSync::~ItemSync()
 +{
 +}
 +
 +void ItemSync::setFullSyncItems(const Item::List &items)
 +{
++  /*
++   * We received a list of items from the server:
++   * * fetch all local id's + rid's only
++   * * check each full sync item wether it's locally available
++   * * if it is modify the item
++   * * if it's not create it
++   * * delete all superfluous items
++   */
 +    Q_D(ItemSync);
 +    Q_ASSERT(!d->mIncremental);
 +    if (!d->mStreaming) {
 +        d->mDeliveryDone = true;
 +    }
-     d->mRemoteItems += items;
++    d->mRemoteItemQueue += items;
 +    d->mTotalItemsProcessed += items.count();
 +    qDebug() << "Received: " << items.count() << "In total: " << \
                d->mTotalItemsProcessed << " Wanted: " << d->mTotalItems;
-     setTotalAmount(KJob::Bytes, d->mTotalItemsProcessed);
 +    if (d->mTotalItemsProcessed == d->mTotalItems) {
 +        d->mDeliveryDone = true;
 +    }
 +    d->execute();
 +}
 +
 +void ItemSync::setTotalItems(int amount)
 +{
 +    Q_D(ItemSync);
 +    Q_ASSERT(!d->mIncremental);
 +    Q_ASSERT(amount >= 0);
 +    setStreamingEnabled(true);
 +    qDebug() << amount;
 +    d->mTotalItems = amount;
 +    setTotalAmount(KJob::Bytes, amount);
 +    if (d->mTotalItems == 0) {
 +        d->mDeliveryDone = true;
 +        d->execute();
 +    }
 +}
 +
 +void ItemSync::setIncrementalSyncItems(const Item::List &changedItems, const \
Item::List &removedItems)  +{
++  /*
++   * We received an incremental listing of items:
++   * * for each changed item:
++   * ** If locally available => modify
++   * ** else => create
++   * * removed items can be removed right away
++   */
 +    Q_D(ItemSync);
 +    d->mIncremental = true;
 +    if (!d->mStreaming) {
 +        d->mDeliveryDone = true;
 +    }
-     d->mRemoteItems += changedItems;
-     d->mRemovedRemoteItems += removedItems;
++    d->mRemoteItemQueue += changedItems;
++    d->mRemovedRemoteItemQueue += removedItems;
 +    d->mTotalItemsProcessed += changedItems.count() + removedItems.count();
-     setTotalAmount(KJob::Bytes, d->mTotalItemsProcessed);
++    kDebug() << "Received: " << changedItems.count() << "Removed: " << \
removedItems.count() << "In total: " << d->mTotalItemsProcessed << " Wanted: " << \
d->mTotalItems;  +    if (d->mTotalItemsProcessed == d->mTotalItems) {
 +        d->mDeliveryDone = true;
 +    }
 +    d->execute();
 +}
 +
 +void ItemSync::setFetchScope(ItemFetchScope &fetchScope)
 +{
 +    Q_D(ItemSync);
 +    d->mFetchScope = fetchScope;
 +}
 +
 +ItemFetchScope &ItemSync::fetchScope()
 +{
 +    Q_D(ItemSync);
 +    return d->mFetchScope;
 +}
 +
 +void ItemSync::doStart()
 +{
 +}
 +
 +bool ItemSync::updateItem(const Item &storedItem, Item &newItem)
 +{
 +    Q_D(ItemSync);
 +    // we are in error state, better not change anything at all anymore
 +    if (error()) {
 +        return false;
 +    }
 +
 +    /*
 +     * We know that this item has changed (as it is part of the
 +     * incremental changed list), so we just put it into the
 +     * storage.
 +     */
 +    if (d->mIncremental) {
 +        return true;
 +    }
 +
 +    if (newItem.d_func()->mClearPayload) {
 +        return true;
 +    }
 +
 +    // Check whether the remote revisions differ
 +    if (storedItem.remoteRevision() != newItem.remoteRevision()) {
 +        return true;
 +    }
 +
 +    // Check whether the flags differ
 +    if (storedItem.flags() != newItem.flags()) {
 +        qDebug() << "Stored flags "  << storedItem.flags()
 +                 << "new flags " << newItem.flags();
 +        return true;
 +    }
 +
 +    // Check whether the new item contains unknown parts
 +    QSet<QByteArray> missingParts = newItem.loadedPayloadParts();
 +    missingParts.subtract(storedItem.loadedPayloadParts());
 +    if (!missingParts.isEmpty()) {
 +        return true;
 +    }
 +
 +    // ### FIXME SLOW!!!
 +    // If the available part identifiers don't differ, check
 +    // whether the content of the payload differs
 +    if (newItem.hasPayload()
 +        && storedItem.payloadData() != newItem.payloadData()) {
 +        return true;
 +     }
 +
 +    // check if remote attributes have been changed
 +    foreach (Attribute *attr, newItem.attributes()) {
 +        if (!storedItem.hasAttribute(attr->type())) {
 +            return true;
 +        }
 +        if (attr->serialized() != storedItem.attribute(attr->type())->serialized()) \
{  +            return true;
 +        }
 +    }
 +
 +    return false;
 +}
 +
 +void ItemSyncPrivate::fetchLocalItems()
 +{
 +    Q_Q( ItemSync );
-     if (mLocalListStarted) {
-         return;
-     }
-     mLocalListStarted = true;
 +    ItemFetchJob* job;
-     if ( mIncremental ) {
-         if ( mRemoteItems.isEmpty() ) {
++    if (mIncremental) {
++        //Try fetching the items so we have their id and know if they're available
++        const Akonadi::Item::List itemsToFetch = mCurrentBatchRemoteItems + \
mCurrentBatchRemovedRemoteItems; ++        if (itemsToFetch.isEmpty()) {
 +            // The fetch job produces an error with an empty set
-             mLocalListDone = true;
-             execute();
++            processBatch();
 +            return;
 +        }
 +        // We need to fetch the items only to detect if they are new or modified
-         job = new ItemFetchJob( mRemoteItems, q );
-         job->setFetchScope( mFetchScope );
-         job->setCollection( mSyncCollection );
++        job = new ItemFetchJob(itemsToFetch, q);
++        job->fetchScope().setFetchRemoteIdentification(true);
++        job->fetchScope().setFetchModificationTime(false);
++        job->setCollection(mSyncCollection);
++        job->setDeliveryOption(ItemFetchJob::EmitItemsIndividually);
 +        // We use this to check if items are available locally, so errors are \
                inevitable
-         job->fetchScope().setIgnoreRetrievalErrors( true );
++        job->fetchScope().setIgnoreRetrievalErrors(true);
++        QObject::connect(job, SIGNAL(itemsReceived(Akonadi::Item::List)), q, \
SLOT(slotItemsReceived(Akonadi::Item::List)));  +    } else {
-         job = new ItemFetchJob( mSyncCollection, q );
-         job->setFetchScope( mFetchScope );
++        if (mFullListingDone) {
++            processBatch();
++            return;
++        }
++        //Otherwise we'll remove the created items again during the second run
++        mFullListingDone = true;
++        job = new ItemFetchJob(mSyncCollection, q);
++        job->fetchScope().setFetchRemoteIdentification(true);
++        job->fetchScope().setFetchModificationTime(false);
++        job->setDeliveryOption(ItemFetchJob::EmitItemsIndividually);
++        QObject::connect(job, SIGNAL(itemsReceived(Akonadi::Item::List)), q, \
SLOT(slotItemsReceived(Akonadi::Item::List)));  +    }
 +
 +    // we only can fetch parts already in the cache, otherwise this will deadlock
-     job->fetchScope().setCacheOnly( true );
++    job->fetchScope().setCacheOnly(true);
 +
-     QObject::connect( job, SIGNAL(result(KJob*)), q, SLOT(slotLocalListDone(KJob*)) \
); ++    QObject::connect(job, SIGNAL(result(KJob*)), q, \
SLOT(slotLocalListDone(KJob*)));  +}
 +
- void ItemSyncPrivate::slotLocalListDone(KJob *job)
++void ItemSyncPrivate::slotItemsReceived(const Item::List &items)
 +{
-     if (!job->error()) {
-         const Item::List list = static_cast<ItemFetchJob *>(job)->items();
-         foreach (const Item &item, list) {
-             if (item.remoteId().isEmpty()) {
-                 continue;
-             }
-             mLocalItemsById.insert(item.id(), item);
-             mLocalItemsByRemoteId.insert(item.remoteId(), item);
-             mUnprocessedLocalItems.insert(item);
++    foreach (const Akonadi::Item &item, items) {
++        //Don't delete items that have not yet been synchronized
++        if (item.remoteId().isEmpty()) {
++            continue;
++        }
++        if (mLocalIdByRid.contains(item.remoteId())) {
++            kWarning() << "Found multiple items with the same rid : " << \
item.remoteId() << item.id(); ++        } else {
++            mLocalIdByRid.insert(item.remoteId(), item.id());
++        }
++        if (!mIncremental) {
++            mUnprocessedLocalIds << item.id();
 +        }
 +    }
++}
 +
-     mLocalListDone = true;
-     execute();
++void ItemSyncPrivate::slotLocalListDone(KJob *job)
++{
++    if (job->error()) {
++        kWarning() << job->errorString();
++    }
++    processBatch();
 +}
 +
 +QString ItemSyncPrivate::jobDebuggingString() const /*Q_DECL_OVERRIDE*/
 +{
 +  // TODO: also print out mIncremental and mTotalItemsProcessed, but they are set \
after the job  +  // started, so this requires passing jobDebuggingString to \
jobEnded().  +  return QString::fromLatin1("Collection %1 \
(%2)").arg(mSyncCollection.id()).arg(mSyncCollection.name());  +}
 +
 +void ItemSyncPrivate::execute()
 +{
 +    Q_Q(ItemSync);
-     if (!mLocalListDone) {
-         // Start fetching local items only once the delivery is done for \
                incremental fetch,
-         // so we can fetch only the required items
-         if (mDeliveryDone || !mIncremental) {
-             fetchLocalItems();
++    //shouldn't happen
++    if (mFinished) {
++        kWarning() << "Call to execute() on finished job.";
++        Q_ASSERT(false);
++        return;
++    }
++    if (mTransactionJobs > 0) {
++        kWarning() << "transaction still in progress " << mProcessingBatch;
++        return;
++    }
++    //not doing anything, start processing
++    if (!mProcessingBatch) {
++        if (mRemoteItemQueue.size() >= mBatchSize || mDeliveryDone) {
++            //we have a new batch to process
++            const int num = qMin(mBatchSize, mRemoteItemQueue.size());
++            for (int i = 0; i < num; i++) {
++                mCurrentBatchRemoteItems << mRemoteItemQueue.takeFirst();
++            }
++            mCurrentBatchRemovedRemoteItems += mRemovedRemoteItemQueue;
++            mRemovedRemoteItemQueue.clear();
++        } else {
++            //nothing to do, let's wait for more data
++            return;
 +        }
++        mProcessingBatch = true;
++        fetchLocalItems();
 +        return;
 +    }
++    checkDone();
++}
 +
-     // early exit to avoid unnecessary TransactionSequence creation in \
                MultipleTransactions mode
-     // TODO: do the transaction handling in a nicer way instead, only creating \
                TransactionSequences when really needed
-     if (!mDeliveryDone && mRemoteItems.isEmpty()) {
++//process the current batch of items
++void ItemSyncPrivate::processBatch()
++{
++    Q_Q(ItemSync);
++    if (mCurrentBatchRemoteItems.isEmpty() && !mDeliveryDone) {
 +        return;
 +    }
 +
-     if ((mTransactionMode == ItemSync::SingleTransaction && !mCurrentTransaction) \
                || mTransactionMode == ItemSync::MultipleTransactions) {
-         ++mTransactionJobs;
-         mCurrentTransaction = new TransactionSequence(q);
-         mCurrentTransaction->setAutomaticCommittingEnabled(false);
-         QObject::connect(mCurrentTransaction, SIGNAL(result(KJob*)), q, \
                SLOT(slotTransactionResult(KJob*)));
-     }
++    //request a transaction, there are items that require processing
++    requestTransaction();
 +
 +    processItems();
-     if (!mDeliveryDone) {
-         if (mTransactionMode == ItemSync::MultipleTransactions && \
                mCurrentTransaction) {
-             mCurrentTransaction->commit();
-             mCurrentTransaction = 0;
-         }
-         return;
-     }
 +
 +    // removed
-     if (!mIncremental) {
-         mRemovedRemoteItems = mUnprocessedLocalItems.toList();
-         mUnprocessedLocalItems.clear();
++    Akonadi::Item::List itemsToDelete;
++    if (!mIncremental && allProcessed()) {
++        //the full listing is done and we know which items to remove
++        foreach (Akonadi::Item::Id id, mUnprocessedLocalIds) {
++            itemsToDelete << Akonadi::Item(id);
++        }
++        mUnprocessedLocalIds.clear();
++    } else {
++        foreach (const Akonadi::Item &removedItem, mCurrentBatchRemovedRemoteItems) \
{ ++            if (!mLocalIdByRid.contains(removedItem.remoteId())) {
++                kWarning() << "cannot remove item because it's not available \
locally. RID: " << removedItem.remoteId(); ++                continue;
++            }
++            itemsToDelete << \
Akonadi::Item(mLocalIdByRid.value(removedItem.remoteId())); ++        }
++        mCurrentBatchRemovedRemoteItems.clear();
 +    }
++    deleteItems(itemsToDelete);
 +
-     deleteItems(mRemovedRemoteItems);
-     mLocalItemsById.clear();
-     mLocalItemsByRemoteId.clear();
-     mRemovedRemoteItems.clear();
- 
-     if (mCurrentTransaction) {
-         mCurrentTransaction->commit();
-         mCurrentTransaction = 0;
++    if (mIncremental) {
++        //no longer required, we processed all items of the current batch
++        mLocalIdByRid.clear();
 +    }
- 
 +    checkDone();
 +}
 +
 +void ItemSyncPrivate::processItems()
 +{
 +    Q_Q(ItemSync);
 +    // added / updated
-     foreach (Item remoteItem, mRemoteItems) {    //krazy:exclude=foreach non-const \
                is needed here
- #ifndef NDEBUG
++    foreach (const Item &remoteItem, mCurrentBatchRemoteItems) {
 +        if (remoteItem.remoteId().isEmpty()) {
 +            qWarning() << "Item " << remoteItem.id() << " does not have a remote \
                identifier";
-         }
- #endif
- 
-         Item localItem = mLocalItemsById.value(remoteItem.id());
-         if (!localItem.isValid()) {
-             localItem = mLocalItemsByRemoteId.value(remoteItem.remoteId());
-         }
-         mUnprocessedLocalItems.remove(localItem);
-         // missing locally
-         if (!localItem.isValid()) {
-             createLocalItem(remoteItem);
 +            continue;
 +        }
 +
-         if (q->updateItem(localItem, remoteItem)) {
-             mPendingJobs++;
- 
-             remoteItem.setId(localItem.id());
-             remoteItem.setRevision(localItem.revision());
-             remoteItem.setSize(localItem.size());
-             remoteItem.setRemoteId(localItem.remoteId());    // in case someone \
                clears remoteId by accident
-             ItemModifyJob *mod = new ItemModifyJob(remoteItem, subjobParent());
-             mod->disableRevisionCheck();
-             q->connect(mod, SIGNAL(result(KJob*)), q, \
SLOT(slotLocalChangeDone(KJob*))); ++        //TODO also check by id and gid
++        //Locally available
++        if (mLocalIdByRid.contains(remoteItem.remoteId())) {
++            const Akonadi::Item::Id localId = \
mLocalIdByRid.value(remoteItem.remoteId()); ++            if (!mIncremental) {
++                mUnprocessedLocalIds.remove(localId);
++            }
++            modifyLocalItem(remoteItem, localId);
 +        } else {
-             mProgress++;
++            createLocalItem(remoteItem);
 +        }
 +    }
-     mRemoteItems.clear();
++    mCurrentBatchRemoteItems.clear();
 +}
 +
- void ItemSyncPrivate::deleteItems(const Item::List &items)
++void ItemSyncPrivate::deleteItems(const Item::List &itemsToDelete)
 +{
 +    Q_Q(ItemSync);
 +    // if in error state, better not change anything anymore
 +    if (q->error()) {
 +        return;
 +    }
 +
-     Item::List itemsToDelete;
-     foreach (const Item &item, items) {
-         Item delItem(item);
-         if (!mIncremental) {
-             if (!item.isValid()) {
-                 delItem = mLocalItemsByRemoteId.value(item.remoteId());
-             }
- 
-             if (!delItem.isValid()) {
- #ifndef NDEBUG
-             qWarning() << "Delete item (remoteeId=" << item.remoteId()
-                         << "mimeType=" << item.mimeType()
-                         << ") does not have a valid UID and no item with that \
                remote ID exists either";
- #endif
-                 continue;
-             }
-         }
- 
-         if (delItem.remoteId().isEmpty()) {
-             // don't attempt to remove items that never were written to the backend
-             continue;
-         }
- 
-         itemsToDelete.append(delItem);
++    if (itemsToDelete.isEmpty()) {
++        return;
 +    }
 +
-     if (!itemsToDelete.isEmpty()) {
-         mPendingJobs++;
-         ItemDeleteJob *job = new ItemDeleteJob(itemsToDelete, subjobParent());
-         q->connect(job, SIGNAL(result(KJob*)), q, \
                SLOT(slotLocalDeleteDone(KJob*)));
- 
-         // It can happen that the groupware servers report us deleted items
-         // twice, in this case this item delete job will fail on the second try.
-         // To avoid a rollback of the complete transaction we gracefully allow the \
                job
-         // to fail :)
-         TransactionSequence *transaction = qobject_cast<TransactionSequence \
                *>(subjobParent());
-         if (transaction) {
-             transaction->setIgnoreJobFailure(job);
-         }
++    mPendingJobs++;
++    ItemDeleteJob *job = new ItemDeleteJob(itemsToDelete, subjobParent());
++    q->connect(job, SIGNAL(result(KJob*)), q, SLOT(slotLocalDeleteDone(KJob*)));
++
++    // It can happen that the groupware servers report us deleted items
++    // twice, in this case this item delete job will fail on the second try.
++    // To avoid a rollback of the complete transaction we gracefully allow the job
++    // to fail :)
++    TransactionSequence *transaction = qobject_cast<TransactionSequence \
*>(subjobParent()); ++    if (transaction) {
++        transaction->setIgnoreJobFailure(job);
 +    }
 +}
 +
 +void ItemSyncPrivate::slotLocalDeleteDone(KJob *)
 +{
 +    mPendingJobs--;
 +    mProgress++;
 +
 +    checkDone();
 +}
 +
 +void ItemSyncPrivate::slotLocalChangeDone(KJob *job)
 +{
 +    Q_UNUSED(job);
 +    mPendingJobs--;
 +    mProgress++;
 +
 +    checkDone();
 +}
 +
 +void ItemSyncPrivate::slotTransactionResult(KJob *job)
 +{
 +    --mTransactionJobs;
 +    if (mCurrentTransaction == job) {
 +        mCurrentTransaction = 0;
 +    }
 +
 +    checkDone();
 +}
 +
++void ItemSyncPrivate::requestTransaction()
++{
++    Q_Q(ItemSync);
++    //we never want parallel transactions, single transaction just makes one big \
transaction, and multi transaction uses multiple transaction sequentially ++    if \
(!mCurrentTransaction) { ++        ++mTransactionJobs;
++        mCurrentTransaction = new TransactionSequence(q);
++        mCurrentTransaction->setAutomaticCommittingEnabled(false);
++        QObject::connect(mCurrentTransaction, SIGNAL(result(KJob*)), q, \
SLOT(slotTransactionResult(KJob*))); ++    }
++}
++
 +Job *ItemSyncPrivate::subjobParent() const
 +{
 +    Q_Q(const ItemSync);
 +    if (mCurrentTransaction && mTransactionMode != ItemSync::NoTransaction) {
 +        return mCurrentTransaction;
 +    }
 +    return const_cast<ItemSync *>(q);
 +}
 +
 +void ItemSync::setStreamingEnabled(bool enable)
 +{
 +    Q_D(ItemSync);
 +    d->mStreaming = enable;
 +}
 +
 +void ItemSync::deliveryDone()
 +{
 +    Q_D(ItemSync);
 +    Q_ASSERT(d->mStreaming);
 +    d->mDeliveryDone = true;
 +    d->execute();
 +}
 +
 +void ItemSync::slotResult(KJob *job)
 +{
 +    if (job->error()) {
 +        // pretent there were no errors
 +        Akonadi::Job::removeSubjob(job);
 +        // propagate the first error we got but continue, we might still be fed \
with stuff from a resource  +        if (!error()) {
 +            setError(job->error());
 +            setErrorText(job->errorText());
 +        }
 +    } else {
 +        Akonadi::Job::slotResult(job);
 +    }
 +}
 +
 +void ItemSync::rollback()
 +{
 +    Q_D(ItemSync);
 +    setError(UserCanceled);
 +    if (d->mCurrentTransaction) {
 +        d->mCurrentTransaction->rollback();
 +    }
 +    d->mDeliveryDone = true; // user wont deliver more data
 +    d->execute(); // end this in an ordered way, since we have an error set no real \
change will be done  +}
 +
 +void ItemSync::setTransactionMode(ItemSync::TransactionMode mode)
 +{
 +    Q_D(ItemSync);
 +    d->mTransactionMode = mode;
 +}
 +
++int ItemSync::batchSize() const
++{
++    Q_D(const ItemSync);
++    return d->mBatchSize;
++}
++
++void ItemSync::setBatchSize(int size)
++{
++    Q_D(ItemSync);
++    d->mBatchSize = size;
++}
++
++
 +#include "moc_itemsync.cpp"
diff --cc akonadi/src/core/itemsync.h
index ba3912e,0000000..dabba64
mode 100644,000000..100644
--- a/akonadi/src/core/itemsync.h
+++ b/akonadi/src/core/itemsync.h
@@@ -1,199 -1,0 +1,230 @@@
 +/*
 +    Copyright (c) 2007 Tobias Koenig <tokoe@kde.org>
 +
 +    This library is free software; you can redistribute it and/or modify it
 +    under the terms of the GNU Library General Public License as published by
 +    the Free Software Foundation; either version 2 of the License, or (at your
 +    option) any later version.
 +
 +    This library is distributed in the hope that it will be useful, but WITHOUT
 +    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 +    FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Library General Public
 +    License for more details.
 +
 +    You should have received a copy of the GNU Library General Public License
 +    along with this library; see the file COPYING.LIB.  If not, write to the
 +    Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 +    02110-1301, USA.
 +*/
 +
 +#ifndef AKONADI_ITEMSYNC_H
 +#define AKONADI_ITEMSYNC_H
 +
 +#include "akonadicore_export.h"
 +#include "item.h"
 +#include "job.h"
 +
 +namespace Akonadi {
 +
 +class Collection;
 +class ItemFetchScope;
 +class ItemSyncPrivate;
 +
 +/**
 + * @short Syncs between items known to a client (usually a resource) and the \
Akonadi storage.  + *
 + * Remote Id must only be set by the resource storing the item, other clients
 + * should leave it empty, since the resource responsible for the target collection
 + * will be notified about the addition and then create a suitable remote Id.
 + *
 + * There are two different forms of ItemSync usage:
 + * - Full-Sync: meaning the client provides all valid items, i.e. any item not
 + *   part of the list but currently stored in Akonadi will be removed
 + * - Incremental-Sync: meaning the client provides two lists, one for items which
 + *   are new or modified and one for items which should be removed. Any item not
 + *   part of either list but currently stored in Akonadi will not be changed.
 + *
 + * @note This is provided for convenience to implement "save all" like behavior,
 + *       however it is strongly recommended to use single item jobs whenever
 + *       possible, e.g. ItemCreateJob, ItemModifyJob and ItemDeleteJob
 + *
 + * @author Tobias Koenig <tokoe@kde.org>
 + */
 +class AKONADICORE_EXPORT ItemSync : public Job
 +{
 +    Q_OBJECT
 +
 +public:
 +    /**
 +     * Creates a new item synchronizer.
 +     *
 +     * @param collection The collection we are syncing.
 +     * @param parent The parent object.
 +     */
 +    explicit ItemSync(const Collection &collection, QObject *parent = 0);
 +
 +    /**
 +     * Destroys the item synchronizer.
 +     */
 +    ~ItemSync();
 +
 +    /**
 +     * Sets the full item list for the collection.
 +     *
 +     * Usually the result of a full item listing.
 +     *
 +     * @warning If the client using this is a resource, all items must have
 +     *          a valid remote identifier.
 +     *
 +     * @param items A list of items.
 +     */
 +    void setFullSyncItems(const Item::List &items);
 +
 +    /**
 +     * Set the amount of items which you are going to return in total
 +     * by using the setFullSyncItems() method.
 +     *
 +     * @param amount The amount of items in total.
 +     */
 +    void setTotalItems(int amount);
 +
 +    /**
 +      Enable item streaming. Item streaming means that the items delivered by \
setXItems() calls  +      are delivered in chunks and you manually indicate when all \
items have been delivered  +      by calling deliveryDone().
 +      @param enable @c true to enable item streaming
 +    */
 +    void setStreamingEnabled(bool enable);
 +
 +    /**
 +      Notify ItemSync that all remote items have been delivered.
 +      Only call this in streaming mode.
 +    */
 +    void deliveryDone();
 +
 +    /**
 +     * Sets the item lists for incrementally syncing the collection.
 +     *
 +     * Usually the result of an incremental remote item listing.
 +     *
 +     * @warning If the client using this is a resource, all items must have
 +     *          a valid remote identifier.
 +     *
 +     * @param changedItems A list of items added or changed by the client.
 +     * @param removedItems A list of items deleted by the client.
 +     */
 +    void setIncrementalSyncItems(const Item::List &changedItems,
 +                                 const Item::List &removedItems);
 +
 +    /**
 +     * Sets the item fetch scope.
 +     *
 +     * The ItemFetchScope controls how much of an item's data is fetched
 +     * from the server, e.g. whether to fetch the full item payload or
 +     * only meta data.
 +     *
 +     * @param fetchScope The new scope for item fetch operations.
 +     *
 +     * @see fetchScope()
 +     */
 +    void setFetchScope(ItemFetchScope &fetchScope);
 +
 +    /**
 +     * Returns the item fetch scope.
 +     *
 +     * Since this returns a reference it can be used to conveniently modify the
 +     * current scope in-place, i.e. by calling a method on the returned reference
 +     * without storing it in a local variable. See the ItemFetchScope documentation
 +     * for an example.
 +     *
 +     * @return a reference to the current item fetch scope
 +     *
 +     * @see setFetchScope() for replacing the current item fetch scope
 +     */
 +    ItemFetchScope &fetchScope();
 +
 +    /**
 +     * Aborts the sync process and rolls back all not yet committed transactions.
 +     * Use this if an external error occurred during the sync process (such as the
 +     * user canceling it).
 +     * @since 4.5
 +     */
 +    void rollback();
 +
 +    /**
 +     * Transaction mode used by ItemSync.
 +     * @since 4.6
 +     */
 +    enum TransactionMode {
 +        SingleTransaction, ///< Use a single transaction for the entire sync \
process (default), provides maximum consistency ("all or nothing") and best \
performance  +        MultipleTransactions, ///< Use one transaction per chunk of \
delivered items, good compromise between the other two when using streaming  +        \
NoTransaction ///< Use no transaction at all, provides highest responsiveness (might \
therefore feel faster even when actually taking slightly longer), no consistency \
guaranteed (can fail anywhere in the sync process)  +    };
 +
 +    /**
 +     * Set the transaction mode to use for this sync.
 +     * @note You must call this method before starting the sync, changes afterwards \
lead to undefined results.  +     * @param mode the transaction mode to use
 +     * @since 4.6
 +     */
 +    void setTransactionMode(TransactionMode mode);
 +
++    /**
++     * Minimum number of items required to start processing in streaming mode.
++     * When MultipleTransactions is used, one transaction per batch will be \
created. ++     *
++     * @see setBatchSize()
++     * @since 4.14
++     */
++    int batchSize() const;
++
++    /**
++     * Set the batch size.
++     *
++     * The default is 10.
++     *
++     * @note You must call this method before starting the sync, changes afterwards \
lead to undefined results. ++     * @see batchSize()
++     * @since 4.14
++     */
++    void setBatchSize(int);
++
++Q_SIGNALS:
++    /**
++     * Signals the resource that new items can be delivered.
++     * @param remainingBatchSize the number of items required to complete the batch \
(typically the same as batchSize()) ++     *
++     * @since 4.14
++     */
++    void readyForNextBatch(int remainingBatchSize);
++
 +protected:
 +    void doStart();
 +    void slotResult(KJob *job);
 +
 +    /**
 +     * Reimplement this method to customize the synchronization algorithm.
 +     * @param storedItem the item as it is now
 +     * @param newItem the item as it should be
 +     * You can update the @p newItem according to the @p storedItem before
 +     * it gets committed.
 +     */
 +    virtual bool updateItem(const Item &storedItem, Item &newItem);
 +
 +private:
 +    //@cond PRIVATE
 +    Q_DECLARE_PRIVATE(ItemSync)
 +    ItemSyncPrivate *dummy; // for BC. KF5 TODO: REMOVE.
 +
 +    Q_PRIVATE_SLOT(d_func(), void slotLocalListDone(KJob *))
 +    Q_PRIVATE_SLOT(d_func(), void slotLocalDeleteDone(KJob *))
 +    Q_PRIVATE_SLOT(d_func(), void slotLocalChangeDone(KJob *))
 +    Q_PRIVATE_SLOT(d_func(), void slotTransactionResult(KJob *))
++    Q_PRIVATE_SLOT(d_func(), void slotItemsReceived(const Akonadi::Item::List &))
++    Q_PRIVATE_SLOT(d_func(), void slotLocalFetchDone(KJob *))
 +    //@endcond
 +};
 +
 +}
 +
 +#endif


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic