[prev in list] [next in list] [prev in thread] [next in thread]
List: kde-commits
Subject: [kdepimlibs/frameworks] akonadi/src: Merge remote-tracking branch 'origin/master' into frameworks
From: Montel Laurent <montel () kde ! org>
Date: 2014-04-17 5:20:18
Message-ID: E1WaekE-0000AB-RL () scm ! kde ! org
[Download RAW message or body]
Git commit e7ddab6af088aac3cfb8e92a6781c45ef1377fa4 by Montel Laurent.
Committed on 17/04/2014 at 05:19.
Pushed by mlaurent into branch 'frameworks'.
Merge remote-tracking branch 'origin/master' into frameworks
Conflicts:
akonadi/src/core/itemsync.cpp
A +16 -0 akonadi/src/agentbase/resourcebase.cpp [License: LGPL (v2+)]
A +37 -1 akonadi/src/agentbase/resourcebase.h [License: LGPL (v2+)]
A +264 -143 akonadi/src/core/itemsync.cpp [License: LGPL (v2+)]
A +31 -0 akonadi/src/core/itemsync.h [License: LGPL (v2+)]
http://commits.kde.org/kdepimlibs/e7ddab6af088aac3cfb8e92a6781c45ef1377fa4
diff --cc akonadi/src/agentbase/resourcebase.cpp
index a2a8bd4,0000000..3dcb088
mode 100644,000000..100644
--- a/akonadi/src/agentbase/resourcebase.cpp
+++ b/akonadi/src/agentbase/resourcebase.cpp
@@@ -1,1246 -1,0 +1,1262 @@@
+/*
+ Copyright (c) 2006 Till Adam <adam@kde.org>
+ Copyright (c) 2007 Volker Krause <vkrause@kde.org>
+
+ This library is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Library General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or (at your
+ option) any later version.
+
+ This library is distributed in the hope that it will be useful, but WITHOUT
+ ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
+ License for more details.
+
+ You should have received a copy of the GNU Library General Public License
+ along with this library; see the file COPYING.LIB. If not, write to the
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ 02110-1301, USA.
+*/
+
+#include "resourcebase.h"
+#include "agentbase_p.h"
+
+#include "resourceadaptor.h"
+#include "collectiondeletejob.h"
+#include "collectionsync_p.h"
+#include "dbusconnectionpool.h"
+#include "itemsync.h"
+#include "akonadi_version.h"
+#include "resourcescheduler_p.h"
+#include "tracerinterface.h"
+#include <akonadi/private/xdgbasedirs_p.h>
+
+#include "changerecorder.h"
+#include "collectionfetchjob.h"
+#include "collectionfetchscope.h"
+#include "collectionmodifyjob.h"
+#include "invalidatecachejob_p.h"
+#include "itemfetchjob.h"
+#include "itemfetchscope.h"
+#include "itemmodifyjob.h"
+#include "itemmodifyjob_p.h"
+#include "session.h"
+#include "resourceselectjob_p.h"
+#include "monitor_p.h"
+#include "servermanager_p.h"
+#include "recursivemover_p.h"
+#include "tagmodifyjob.h"
+
+#include <kaboutdata.h>
+#include <kcmdlineargs.h>
+#include <qdebug.h>
+#include <klocalizedstring.h>
+#include <kglobal.h>
+
+#include <QtCore/QDebug>
+#include <QtCore/QDir>
+#include <QtCore/QHash>
+#include <QtCore/QSettings>
+#include <QtCore/QTimer>
+#include <QApplication>
+#include <QtDBus/QtDBus>
+
+using namespace Akonadi;
+
/**
 * Private (d-pointer) implementation of ResourceBase.
 *
 * Owns the ResourceScheduler and the per-task sync helpers (ItemSync,
 * CollectionSync), and reimplements the AgentBasePrivate change hooks with
 * resource-specific sanity checks (mostly: skip notifications whose
 * collections/items lack a remote id, since the backend cannot act on them).
 */
class Akonadi::ResourceBasePrivate : public AgentBasePrivate
{
    Q_OBJECT
    Q_CLASSINFO("D-Bus Interface", "org.kde.dfaure")

public:
    ResourceBasePrivate(ResourceBase *parent)
        : AgentBasePrivate(parent)
        , scheduler(0)
        , mItemSyncer(0)
        , mItemSyncFetchScope(0)
        , mItemTransactionMode(ItemSync::SingleTransaction)
        , mCollectionSyncer(0)
        , mHierarchicalRid(false)
        , mUnemittedProgress(0)
        , mAutomaticProgressReporting(true)
        , mItemSyncBatchSize(10)
    {
        Internal::setClientType(Internal::Resource);
        mStatusMessage = defaultReadyMessage();
        // Rate-limit progress emissions: a single-shot 1s timer compresses
        // bursts of percent() signals into one delayed emission.
        mProgressEmissionCompressor.setInterval(1000);
        mProgressEmissionCompressor.setSingleShot(true);
    }

    ~ResourceBasePrivate()
    {
        // mItemSyncer/mCollectionSyncer are jobs that delete themselves;
        // only the fetch scope copy is owned here.
        delete mItemSyncFetchScope;
    }

    Q_DECLARE_PUBLIC(ResourceBase)

    // Registers the resource's D-Bus service name; exits the process (from the
    // main thread only) when registration fails, which usually means another
    // instance of this resource is already running.
    void delayedInit()
    {
        const QString serviceId = ServerManager::agentServiceName(ServerManager::Resource, mId);
        if (!DBusConnectionPool::threadConnection().registerService(serviceId)) {
            QString reason = DBusConnectionPool::threadConnection().lastError().message();
            if (reason.isEmpty()) {
                reason = QString::fromLatin1("this service is probably running already.");
            }
            qCritical() << "Unable to register service" << serviceId << "at D-Bus:" << reason;

            if (QThread::currentThread() == QCoreApplication::instance()->thread()) {
                QCoreApplication::instance()->exit(1);
            }

        } else {
            AgentBasePrivate::delayedInit();
        }
    }

    // Marks the current change as processed. While a recursive move replay is
    // in progress the notification is routed to the mover instead of the
    // change recorder.
    virtual void changeProcessed()
    {
        if (m_recursiveMover) {
            m_recursiveMover->changeProcessed();
            QTimer::singleShot(0, m_recursiveMover, SLOT(replayNext()));
            return;
        }

        mChangeRecorder->changeProcessed();
        if (!mChangeRecorder->isEmpty()) {
            scheduler->scheduleChangeReplay();
        }
        scheduler->taskDone();
    }

    void slotAbortRequested();

    void slotDeliveryDone(KJob *job);
    void slotCollectionSyncDone(KJob *job);
    void slotLocalListDone(KJob *job);
    void slotSynchronizeCollection(const Collection &col);
    void slotCollectionListDone(KJob *job);
    void slotSynchronizeCollectionAttributes(const Collection &col);
    void slotCollectionListForAttributesDone(KJob *job);
    void slotCollectionAttributesSyncDone(KJob *job);

    void slotItemSyncDone(KJob *job);

    void slotPercent(KJob *job, unsigned long percent);
    void slotDelayedEmitProgress();
    void slotDeleteResourceCollection();
    void slotDeleteResourceCollectionDone(KJob *job);
    void slotCollectionDeletionDone(KJob *job);

    void slotInvalidateCache(const Akonadi::Collection &collection);

    void slotPrepareItemRetrieval(const Akonadi::Item &item);
    void slotPrepareItemRetrievalResult(KJob *job);

    void changeCommittedResult(KJob *job);

    void slotRecursiveMoveReplay(RecursiveMover *mover);
    void slotRecursiveMoveReplayResult(KJob *job);

    // Re-select this resource on the server after the session reconnected,
    // so subsequent jobs run in the resource's context again.
    void slotSessionReconnected()
    {
        Q_Q(ResourceBase);

        new ResourceSelectJob(q->identifier());
    }

    // Lazily creates the ItemSync job for the current SyncCollection task and
    // wires its progress/result/batch signals. Must only be called while an
    // item retrieval task is active (asserted below).
    void createItemSyncInstanceIfMissing()
    {
        Q_Q(ResourceBase);
        Q_ASSERT_X(scheduler->currentTask().type == ResourceScheduler::SyncCollection,
                   "createItemSyncInstance",
                   "Calling items retrieval methods although no item retrieval is in progress");
        if (!mItemSyncer) {
            mItemSyncer = new ItemSync(q->currentCollection());
            mItemSyncer->setTransactionMode(mItemTransactionMode);
            mItemSyncer->setBatchSize(mItemSyncBatchSize);
            if (mItemSyncFetchScope) {
                mItemSyncer->setFetchScope(*mItemSyncFetchScope);
            }
            mItemSyncer->setProperty("collection", QVariant::fromValue(q->currentCollection()));
            connect(mItemSyncer, SIGNAL(percent(KJob*,ulong)), q, SLOT(slotPercent(KJob*,ulong)));
            connect(mItemSyncer, SIGNAL(result(KJob*)), q, SLOT(slotItemSyncDone(KJob*)));
            connect(mItemSyncer, SIGNAL(readyForNextBatch(int)), q, SIGNAL(retrieveNextItemSyncBatch(int)));
        }
        Q_ASSERT(mItemSyncer);
    }

public Q_SLOTS:
    // Dump the state of the scheduler
    Q_SCRIPTABLE QString dumpToString() const
    {
        Q_Q(const ResourceBase);
        QString retVal;
        // Let the concrete resource append its own state via the invokable
        // dumpResourceToString() method.
        QMetaObject::invokeMethod(const_cast<ResourceBase *>(q), "dumpResourceToString", Qt::DirectConnection, Q_RETURN_ARG(QString, retVal));
        return scheduler->dumpToString() + QLatin1Char('\n') + retVal;
    }

    Q_SCRIPTABLE void dump()
    {
        scheduler->dump();
    }

    Q_SCRIPTABLE void clear()
    {
        scheduler->clear();
    }

protected Q_SLOTS:
    // reimplementations from AgentbBasePrivate, containing sanity checks that only apply to resources
    // such as making sure that RIDs are present as well as translations of cross-resource moves
    // TODO: we could possibly add recovery code for no-RID notifications by re-enquing those to the change recorder
    // as the corresponding Add notifications, although that contains a risk of endless fail/retry loops

    void itemAdded(const Akonadi::Item &item, const Akonadi::Collection &collection)
    {
        // Without a collection remote id the backend cannot store the item.
        if (collection.remoteId().isEmpty()) {
            changeProcessed();
            return;
        }
        AgentBasePrivate::itemAdded(item, collection);
    }

    void itemChanged(const Akonadi::Item &item, const QSet< QByteArray > &partIdentifiers)
    {
        if (item.remoteId().isEmpty()) {
            changeProcessed();
            return;
        }
        AgentBasePrivate::itemChanged(item, partIdentifiers);
    }

    void itemsFlagsChanged(const Item::List &items, const QSet< QByteArray > &addedFlags,
                           const QSet< QByteArray > &removedFlags)
    {
        // Nothing actually changed, or no item the backend knows about.
        if (addedFlags.isEmpty() && removedFlags.isEmpty()) {
            changeProcessed();
            return;
        }

        Item::List validItems;
        foreach (const Akonadi::Item &item, items) {
            if (!item.remoteId().isEmpty()) {
                validItems << item;
            }
        }
        if (validItems.isEmpty()) {
            changeProcessed();
            return;
        }

        AgentBasePrivate::itemsFlagsChanged(validItems, addedFlags, removedFlags);
    }

    void itemsTagsChanged(const Item::List &items, const QSet<Tag> &addedTags, const QSet<Tag> &removedTags)
    {
        if (addedTags.isEmpty() && removedTags.isEmpty()) {
            changeProcessed();
            return;
        }

        Item::List validItems;
        foreach (const Akonadi::Item &item, items) {
            if (!item.remoteId().isEmpty()) {
                validItems << item;
            }
        }
        if (validItems.isEmpty()) {
            changeProcessed();
            return;
        }

        AgentBasePrivate::itemsTagsChanged(validItems, addedTags, removedTags);
    }

    // TODO move the move translation code from AgentBasePrivate here, it's wrong for agents
    void itemMoved(const Akonadi::Item &item, const Akonadi::Collection &source, const Akonadi::Collection &destination)
    {
        if (item.remoteId().isEmpty() || destination.remoteId().isEmpty() || destination == source) {
            changeProcessed();
            return;
        }
        AgentBasePrivate::itemMoved(item, source, destination);
    }

    void itemsMoved(const Item::List &items, const Collection &source, const Collection &destination)
    {
        if (destination.remoteId().isEmpty() || destination == source) {
            changeProcessed();
            return;
        }

        Item::List validItems;
        foreach (const Akonadi::Item &item, items) {
            if (!item.remoteId().isEmpty()) {
                validItems << item;
            }
        }
        if (validItems.isEmpty()) {
            changeProcessed();
            return;
        }

        AgentBasePrivate::itemsMoved(validItems, source, destination);
    }

    void itemRemoved(const Akonadi::Item &item)
    {
        if (item.remoteId().isEmpty()) {
            changeProcessed();
            return;
        }
        AgentBasePrivate::itemRemoved(item);
    }

    void itemsRemoved(const Item::List &items)
    {
        Item::List validItems;
        foreach (const Akonadi::Item &item, items) {
            if (!item.remoteId().isEmpty()) {
                validItems << item;
            }
        }
        if (validItems.isEmpty()) {
            changeProcessed();
            return;
        }

        AgentBasePrivate::itemsRemoved(validItems);
    }

    void collectionAdded(const Akonadi::Collection &collection, const Akonadi::Collection &parent)
    {
        if (parent.remoteId().isEmpty()) {
            changeProcessed();
            return;
        }
        AgentBasePrivate::collectionAdded(collection, parent);
    }

    void collectionChanged(const Akonadi::Collection &collection)
    {
        if (collection.remoteId().isEmpty()) {
            changeProcessed();
            return;
        }
        AgentBasePrivate::collectionChanged(collection);
    }

    void collectionChanged(const Akonadi::Collection &collection, const QSet< QByteArray > &partIdentifiers)
    {
        if (collection.remoteId().isEmpty()) {
            changeProcessed();
            return;
        }
        AgentBasePrivate::collectionChanged(collection, partIdentifiers);
    }

    void collectionMoved(const Akonadi::Collection &collection, const Akonadi::Collection &source, const Akonadi::Collection &destination)
    {
        // unknown destination or source == destination means we can't do/don't have to do anything
        if (destination.remoteId().isEmpty() || source == destination) {
            changeProcessed();
            return;
        }

        // inter-resource moves, requires we know which resources the source and destination are in though
        if (!source.resource().isEmpty() && !destination.resource().isEmpty() && source.resource() != destination.resource()) {
            if (source.resource() == q_ptr->identifier()) { // moved away from us
                AgentBasePrivate::collectionRemoved(collection);
            } else if (destination.resource() == q_ptr->identifier()) { // moved to us
                scheduler->taskDone(); // stop change replay for now
                RecursiveMover *mover = new RecursiveMover(this);
                mover->setCollection(collection, destination);
                scheduler->scheduleMoveReplay(collection, mover);
            }
            return;
        }

        // intra-resource move, requires the moved collection to have a valid id though
        if (collection.remoteId().isEmpty()) {
            changeProcessed();
            return;
        }

        // intra-resource move, ie. something we can handle internally
        AgentBasePrivate::collectionMoved(collection, source, destination);
    }

    void collectionRemoved(const Akonadi::Collection &collection)
    {
        if (collection.remoteId().isEmpty()) {
            changeProcessed();
            return;
        }
        AgentBasePrivate::collectionRemoved(collection);
    }

    void tagAdded(const Akonadi::Tag &tag)
    {
        if (!tag.isValid()) {
            changeProcessed();
            return;
        }

        AgentBasePrivate::tagAdded(tag);
    }

    void tagChanged(const Akonadi::Tag &tag)
    {
        if (tag.remoteId().isEmpty()) {
            changeProcessed();
            return;
        }

        AgentBasePrivate::tagChanged(tag);
    }

    void tagRemoved(const Akonadi::Tag &tag)
    {
        if (tag.remoteId().isEmpty()) {
            changeProcessed();
            return;
        }

        AgentBasePrivate::tagRemoved(tag);
    }

public:
    // synchronize states
    Collection currentCollection;          // collection being synced by the current task

    ResourceScheduler *scheduler;          // owns task ordering; created by ResourceBase ctor
    ItemSync *mItemSyncer;                 // per-task item sync job (self-deleting)
    ItemFetchScope *mItemSyncFetchScope;   // owned copy, applied to new ItemSync instances
    ItemSync::TransactionMode mItemTransactionMode;
    CollectionSync *mCollectionSyncer;     // per-task collection sync job (self-deleting)
    bool mHierarchicalRid;
    QTimer mProgressEmissionCompressor;    // see ctor: 1s single-shot compressor
    int mUnemittedProgress;
    QMap<Akonadi::Collection::Id, QVariantMap> mUnemittedAdvancedStatus;
    bool mAutomaticProgressReporting;
    QPointer<RecursiveMover> m_recursiveMover;
    int mItemSyncBatchSize;                // default 10, forwarded to ItemSync::setBatchSize()
};
+
/**
 * Constructs the resource: installs the D-Bus adaptor, creates the
 * scheduler, configures the change recorder, and wires scheduler /
 * change-recorder / self signals together.
 *
 * @param id unique identifier of this resource instance.
 */
ResourceBase::ResourceBase(const QString &id)
    : AgentBase(new ResourceBasePrivate(this), id)
{
    Q_D(ResourceBase);

    new Akonadi__ResourceAdaptor(this);

    d->scheduler = new ResourceScheduler(this);

    d->mChangeRecorder->setChangeRecordingEnabled(true);
    d->mChangeRecorder->setCollectionMoveTranslationEnabled(false); // we deal with this ourselves
    connect(d->mChangeRecorder, SIGNAL(changesAdded()),
            d->scheduler, SLOT(scheduleChangeReplay()));

    d->mChangeRecorder->setResourceMonitored(d->mId.toLatin1());
    d->mChangeRecorder->fetchCollection(true);

    // Scheduler task execution -> virtual methods / private slots.
    connect(d->scheduler, SIGNAL(executeFullSync()),
            SLOT(retrieveCollections()));
    connect(d->scheduler, SIGNAL(executeCollectionTreeSync()),
            SLOT(retrieveCollections()));
    connect(d->scheduler, SIGNAL(executeCollectionSync(Akonadi::Collection)),
            SLOT(slotSynchronizeCollection(Akonadi::Collection)));
    connect(d->scheduler, SIGNAL(executeCollectionAttributesSync(Akonadi::Collection)),
            SLOT(slotSynchronizeCollectionAttributes(Akonadi::Collection)));
    connect(d->scheduler, SIGNAL(executeItemFetch(Akonadi::Item,QSet<QByteArray>)),
            SLOT(slotPrepareItemRetrieval(Akonadi::Item)));
    connect(d->scheduler, SIGNAL(executeResourceCollectionDeletion()),
            SLOT(slotDeleteResourceCollection()));
    connect(d->scheduler, SIGNAL(executeCacheInvalidation(Akonadi::Collection)),
            SLOT(slotInvalidateCache(Akonadi::Collection)));
    connect(d->scheduler, SIGNAL(status(int,QString)),
            SIGNAL(status(int,QString)));
    connect(d->scheduler, SIGNAL(executeChangeReplay()),
            d->mChangeRecorder, SLOT(replayNext()));
    connect(d->scheduler, SIGNAL(executeRecursiveMoveReplay(RecursiveMover*)),
            SLOT(slotRecursiveMoveReplay(RecursiveMover*)));
    // Completion signals; synchronized()/collectionTreeSynchronized() also
    // feed back into taskDone() below.
    connect(d->scheduler, SIGNAL(fullSyncComplete()), SIGNAL(synchronized()));
    connect(d->scheduler, SIGNAL(collectionTreeSyncComplete()), SIGNAL(collectionTreeSynchronized()));
    connect(d->mChangeRecorder, SIGNAL(nothingToReplay()), d->scheduler, SLOT(taskDone()));
    connect(d->mChangeRecorder, SIGNAL(collectionRemoved(Akonadi::Collection)),
            d->scheduler, SLOT(collectionRemoved(Akonadi::Collection)));
    connect(this, SIGNAL(abortRequested()), this, SLOT(slotAbortRequested()));
    connect(this, SIGNAL(synchronized()), d->scheduler, SLOT(taskDone()));
    connect(this, SIGNAL(collectionTreeSynchronized()), d->scheduler, SLOT(taskDone()));
    connect(this, SIGNAL(agentNameChanged(QString)),
            this, SIGNAL(nameChanged(QString)));

    connect(&d->mProgressEmissionCompressor, SIGNAL(timeout()),
            this, SLOT(slotDelayedEmitProgress()));

    d->scheduler->setOnline(d->mOnline);
    // Resume any change replay left over from a previous run.
    if (!d->mChangeRecorder->isEmpty()) {
        d->scheduler->scheduleChangeReplay();
    }

    new ResourceSelectJob(identifier());

    connect(d->mChangeRecorder->session(), SIGNAL(reconnected()), SLOT(slotSessionReconnected()));
}
+
// Defined out-of-line so the d-pointer's type is complete at destruction;
// all members are cleaned up by the private class / Qt parent ownership.
ResourceBase::~ResourceBase()
{
}
+
// Queues a full synchronization (collection tree + items) on the scheduler;
// returns immediately, the work happens asynchronously.
void ResourceBase::synchronize()
{
    d_func()->scheduler->scheduleFullSync();
}
+
// Convenience forwarder to AgentBase::setAgentName().
void ResourceBase::setName(const QString &name)
{
    AgentBase::setAgentName(name);
}
+
// Convenience forwarder to AgentBase::agentName().
QString ResourceBase::name() const
{
    return AgentBase::agentName();
}
+
/**
 * Parses the resource command line, extracts the mandatory --identifier
 * argument and initializes KCmdLineArgs.
 *
 * @param argc/argv the raw process arguments.
 * @return the resource identifier; the process exits with code 1 when the
 *         identifier is missing or too few arguments were passed.
 */
QString ResourceBase::parseArguments(int argc, char **argv)
{
    QString identifier;
    if (argc < 3) {
        qDebug() << "Not enough arguments passed...";
        exit(1);
    }

    // Scan up to argc-1 so argv[i + 1] below is always valid.
    for (int i = 1; i < argc - 1; ++i) {
        if (QLatin1String(argv[i]) == QLatin1String("--identifier")) {
            identifier = QLatin1String(argv[i + 1]);
        }
    }

    if (identifier.isEmpty()) {
        qDebug() << "Identifier argument missing";
        exit(1);
    }

    const QFileInfo fi(QString::fromLocal8Bit(argv[0]));
    // strip off full path and possible .exe suffix
    const QByteArray catalog = fi.baseName().toLatin1();

    KCmdLineArgs::init(argc, argv, ServerManager::addNamespace(identifier).toLatin1(), catalog,
                       ki18nc("@title application name", "Akonadi Resource"), AKONADILIBRARIES_VERSION_STRING,
                       ki18nc("@title application description", "Akonadi Resource"));

    KCmdLineOptions options;
    options.add("identifier <argument>",
                ki18nc("@label commandline option", "Resource identifier"));
    KCmdLineArgs::addCmdLineOptions(options);

    return identifier;
}
+
/**
 * Runs the application event loop for the given resource instance and
 * deletes it on exit.
 *
 * @param r the resource to run; ownership is taken.
 * @return the event loop's exit code.
 */
int ResourceBase::init(ResourceBase *r)
{
    QApplication::setQuitOnLastWindowClosed(false);
#warning port to the new way of doing this
// KLocalizedString::insertCatalog( QLatin1String( "libakonadi" ) );
    int rv = kapp->exec();
    delete r;
    return rv;
}
+
// Cancels all queued scheduler tasks and asks the concrete resource to abort
// its current activity (invoked by name so subclasses can override it).
void ResourceBasePrivate::slotAbortRequested()
{
    Q_Q(ResourceBase);

    scheduler->cancelQueues();
    QMetaObject::invokeMethod(q, "abortActivity");
}
+
/**
 * Delivers a single item that was requested via a FetchItem task: stores it
 * back via an ItemModifyJob, or ends the task with an error reply when the
 * item is invalid.
 *
 * @param item the fully retrieved item; must contain all requested parts.
 */
void ResourceBase::itemRetrieved(const Item &item)
{
    Q_D(ResourceBase);
    Q_ASSERT(d->scheduler->currentTask().type == ResourceScheduler::FetchItem);
    if (!item.isValid()) {
        d->scheduler->currentTask().sendDBusReplies(i18nc("@info", "Invalid item retrieved"));
        d->scheduler->taskDone();
        return;
    }

    Item i(item);
    // Warn (but proceed) when the resource did not deliver all requested parts.
    QSet<QByteArray> requestedParts = d->scheduler->currentTask().itemParts;
    foreach (const QByteArray &part, requestedParts) {
        if (!item.loadedPayloadParts().contains(part)) {
            qWarning() << "Item does not provide part" << part;
        }
    }

    ItemModifyJob *job = new ItemModifyJob(i);
    job->d_func()->setSilent( true );
    // FIXME: remove once the item with which we call retrieveItem() has a revision number
    job->disableRevisionCheck();
    connect(job, SIGNAL(result(KJob*)), SLOT(slotDeliveryDone(KJob*)));
}
+
// Completion handler for the ItemModifyJob created in itemRetrieved():
// forwards any error, answers the pending D-Bus request and finishes the task.
void ResourceBasePrivate::slotDeliveryDone(KJob *job)
{
    Q_Q(ResourceBase);
    Q_ASSERT(scheduler->currentTask().type == ResourceScheduler::FetchItem);
    if (job->error()) {
        emit q->error(i18nc("@info", "Error while creating item: %1", job->errorString()));
    }
    scheduler->currentTask().sendDBusReplies(job->error() ? job->errorString() : QString());
    scheduler->taskDone();
}
+
/**
 * Delivers the attributes retrieved for a SyncCollectionAttributes task:
 * writes them back via a CollectionModifyJob, or ends the task immediately
 * when the collection is invalid (still emitting attributesSynchronized()).
 */
void ResourceBase::collectionAttributesRetrieved(const Collection &collection)
{
    Q_D(ResourceBase);
    Q_ASSERT(d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionAttributes);
    if (!collection.isValid()) {
        emit attributesSynchronized(d->scheduler->currentTask().collection.id());
        d->scheduler->taskDone();
        return;
    }

    CollectionModifyJob *job = new CollectionModifyJob(collection);
    connect(job, SIGNAL(result(KJob*)), SLOT(slotCollectionAttributesSyncDone(KJob*)));
}
+
// Completion handler for the attribute write-back: reports any error, then
// always emits attributesSynchronized() and finishes the task.
void ResourceBasePrivate::slotCollectionAttributesSyncDone(KJob *job)
{
    Q_Q(ResourceBase);
    Q_ASSERT(scheduler->currentTask().type == ResourceScheduler::SyncCollectionAttributes);
    if (job->error()) {
        emit q->error(i18nc("@info", "Error while updating collection: %1", job->errorString()));
    }
    emit q->attributesSynchronized(scheduler->currentTask().collection.id());
    scheduler->taskDone();
}
+
// Starts deletion of this resource's top-level collection by first fetching
// the first-level collections owned by the resource.
void ResourceBasePrivate::slotDeleteResourceCollection()
{
    Q_Q(ResourceBase);

    CollectionFetchJob *job = new CollectionFetchJob(Collection::root(), CollectionFetchJob::FirstLevel);
    job->fetchScope().setResource(q->identifier());
    connect(job, SIGNAL(result(KJob*)), q, SLOT(slotDeleteResourceCollectionDone(KJob*)));
}
+
+void ResourceBasePrivate::slotDeleteResourceCollectionDone(KJob *job)
+{
+ Q_Q(ResourceBase);
+ if (job->error()) {
+ emit q->error(job->errorString());
+ scheduler->taskDone();
+ } else {
+ const CollectionFetchJob *fetchJob = static_cast<const CollectionFetchJob \
*>(job); +
+ if (!fetchJob->collections().isEmpty()) {
+ CollectionDeleteJob *job = new \
CollectionDeleteJob(fetchJob->collections().first()); + connect(job, \
SIGNAL(result(KJob*)), q, SLOT(slotCollectionDeletionDone(KJob*))); + } else \
{ + // there is no resource collection, so just ignore the request
+ scheduler->taskDone();
+ }
+ }
+}
+
// Final step of resource-collection deletion: report any error and finish
// the scheduler task either way.
void ResourceBasePrivate::slotCollectionDeletionDone(KJob *job)
{
    Q_Q(ResourceBase);
    if (job->error()) {
        emit q->error(job->errorString());
    }

    scheduler->taskDone();
}
+
// Executes a cache-invalidation task for the given collection; the task is
// finished directly from the job's result signal.
void ResourceBasePrivate::slotInvalidateCache(const Akonadi::Collection &collection)
{
    Q_Q(ResourceBase);
    InvalidateCacheJob *job = new InvalidateCacheJob(collection, q);
    connect(job, SIGNAL(result(KJob*)), scheduler, SLOT(taskDone()));
}
+
// Single-item convenience overload; delegates to the list variant.
void ResourceBase::changeCommitted(const Item &item)
{
    changesCommitted(Item::List() << item);
}
+
/**
 * Confirms that the given item changes have been written to the backend:
 * clears the dirty flag and updates remote id / GID without touching the
 * payload.
 */
void ResourceBase::changesCommitted(const Item::List &items)
{
    ItemModifyJob *job = new ItemModifyJob(items);
    job->d_func()->setClean();
    job->disableRevisionCheck(); // TODO: remove, but where/how do we handle the error?
    job->setIgnorePayload(true); // we only want to reset the dirty flag and update the remote id
    job->setUpdateGid(true); // allow resources to update GID too
    connect(job, SIGNAL(finished(KJob*)), this, SLOT(changeCommittedResult(KJob*)));
}
+
// Confirms a committed collection change by writing it back locally.
void ResourceBase::changeCommitted(const Collection &collection)
{
    CollectionModifyJob *job = new CollectionModifyJob(collection);
    connect(job, SIGNAL(result(KJob*)), SLOT(changeCommittedResult(KJob*)));
}
+
// Shared completion handler for the changeCommitted() write-back jobs.
// Collection jobs additionally invalidate the change recorder's cache entry.
void ResourceBasePrivate::changeCommittedResult(KJob *job)
{
    Q_Q(ResourceBase);
    if (qobject_cast<CollectionModifyJob *>(job)) {
        if (job->error()) {
            emit q->error(i18nc("@info", "Updating local collection failed: %1.", job->errorText()));
        }
        mChangeRecorder->d_ptr->invalidateCache(static_cast<CollectionModifyJob *>(job)->collection());
    } else {
        // TODO: Error handling for item changes?
        // Item and tag cache is invalidated by modify job
    }

    changeProcessed();
}
+
// Confirms a committed tag change by writing it back locally.
void ResourceBase::changeCommitted(const Tag &tag)
{
    TagModifyJob *job = new TagModifyJob(tag);
    connect(job, SIGNAL(result(KJob*)), SLOT(changeCommittedResult(KJob*)));
}
+
// Legacy D-Bus entry point; success == the V2 variant returned no error text.
bool ResourceBase::requestItemDelivery(qint64 uid, const QString &remoteId,
                                       const QString &mimeType, const QStringList &parts)
{
    return requestItemDeliveryV2(uid, remoteId, mimeType, parts).isEmpty();
}
+
/**
 * D-Bus entry point asking the resource to fetch one item from the backend.
 * Schedules an item-fetch task and delays the D-Bus reply until delivery.
 *
 * @return an empty string on success, otherwise a translated error message
 *         (currently only "offline").
 */
QString ResourceBase::requestItemDeliveryV2(qint64 uid, const QString &remoteId, const QString &mimeType, const QStringList &_parts)
{
    Q_D(ResourceBase);
    if (!isOnline()) {
        const QString errorMsg = i18nc("@info", "Cannot fetch item in offline mode.");
        emit error(errorMsg);
        return errorMsg;
    }

    // Reply is sent from the scheduler once the item has been delivered.
    setDelayedReply(true);
    // FIXME: we need at least the revision number too
    Item item(uid);
    item.setMimeType(mimeType);
    item.setRemoteId(remoteId);

    QSet<QByteArray> parts;
    Q_FOREACH (const QString &str, _parts) {
        parts.insert(str.toLatin1());
    }

    d->scheduler->scheduleItemFetch(item, parts, message());

    return QString();

}
+
/**
 * Delivers the full set of remote collections for the current collection
 * sync task, lazily creating the CollectionSync helper on first call.
 */
void ResourceBase::collectionsRetrieved(const Collection::List &collections)
{
    Q_D(ResourceBase);
    Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree ||
               d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
               "ResourceBase::collectionsRetrieved()",
               "Calling collectionsRetrieved() although no collection retrieval is in progress");
    if (!d->mCollectionSyncer) {
        d->mCollectionSyncer = new CollectionSync(identifier());
        d->mCollectionSyncer->setHierarchicalRemoteIds(d->mHierarchicalRid);
        connect(d->mCollectionSyncer, SIGNAL(percent(KJob*,ulong)), SLOT(slotPercent(KJob*,ulong)));
        connect(d->mCollectionSyncer, SIGNAL(result(KJob*)), SLOT(slotCollectionSyncDone(KJob*)));
    }
    d->mCollectionSyncer->setRemoteCollections(collections);
}
+
/**
 * Incremental variant of collectionsRetrieved(): delivers only changed and
 * removed remote collections. Same lazy CollectionSync setup.
 */
void ResourceBase::collectionsRetrievedIncremental(const Collection::List &changedCollections,
                                                   const Collection::List &removedCollections)
{
    Q_D(ResourceBase);
    Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree ||
               d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
               "ResourceBase::collectionsRetrievedIncremental()",
               "Calling collectionsRetrievedIncremental() although no collection retrieval is in progress");
    if (!d->mCollectionSyncer) {
        d->mCollectionSyncer = new CollectionSync(identifier());
        d->mCollectionSyncer->setHierarchicalRemoteIds(d->mHierarchicalRid);
        connect(d->mCollectionSyncer, SIGNAL(percent(KJob*,ulong)), SLOT(slotPercent(KJob*,ulong)));
        connect(d->mCollectionSyncer, SIGNAL(result(KJob*)), SLOT(slotCollectionSyncDone(KJob*)));
    }
    d->mCollectionSyncer->setRemoteCollections(changedCollections, removedCollections);
}
+
/**
 * Enables/disables streaming delivery of collections for the current
 * collection sync task (collections may then arrive in multiple batches,
 * terminated by collectionsRetrievalDone()).
 */
void ResourceBase::setCollectionStreamingEnabled(bool enable)
{
    Q_D(ResourceBase);
    Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree ||
               d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
               "ResourceBase::setCollectionStreamingEnabled()",
               "Calling setCollectionStreamingEnabled() although no collection retrieval is in progress");
    if (!d->mCollectionSyncer) {
        d->mCollectionSyncer = new CollectionSync(identifier());
        d->mCollectionSyncer->setHierarchicalRemoteIds(d->mHierarchicalRid);
        connect(d->mCollectionSyncer, SIGNAL(percent(KJob*,ulong)), SLOT(slotPercent(KJob*,ulong)));
        connect(d->mCollectionSyncer, SIGNAL(result(KJob*)), SLOT(slotCollectionSyncDone(KJob*)));
    }
    d->mCollectionSyncer->setStreamingEnabled(enable);
}
+
/**
 * Signals that collection retrieval is complete. With streaming enabled the
 * CollectionSync helper finalizes; otherwise the task ends immediately.
 */
void ResourceBase::collectionsRetrievalDone()
{
    Q_D(ResourceBase);
    Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree ||
               d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
               "ResourceBase::collectionsRetrievalDone()",
               "Calling collectionsRetrievalDone() although no collection retrieval is in progress");
    // streaming enabled, so finalize the sync
    if (d->mCollectionSyncer) {
        d->mCollectionSyncer->retrievalDone();
    } else {
        // user did the sync himself, we are done now
        // FIXME: we need the same special case for SyncAll as in slotCollectionSyncDone here!
        d->scheduler->taskDone();
    }
}
+
+void ResourceBasePrivate::slotCollectionSyncDone(KJob *job)
+{
+ Q_Q(ResourceBase);
+ mCollectionSyncer = 0;
+ if (job->error()) {
+ if (job->error() != Job::UserCanceled) {
+ emit q->error(job->errorString());
+ }
+ } else {
+ if (scheduler->currentTask().type == ResourceScheduler::SyncAll) {
+ CollectionFetchJob *list = new CollectionFetchJob(Collection::root(), \
CollectionFetchJob::Recursive); + \
list->setFetchScope(q->changeRecorder()->collectionFetchScope()); + \
list->fetchScope().setResource(mId); + q->connect(list, \
SIGNAL(result(KJob*)), q, SLOT(slotLocalListDone(KJob*))); + return;
+ } else if (scheduler->currentTask().type == \
ResourceScheduler::SyncCollectionTree) { + \
scheduler->scheduleCollectionTreeSyncCompletion(); + }
+ }
+ scheduler->taskDone();
+}
+
/**
 * Continuation of a full sync: schedules an item synchronization for every
 * local collection of this resource, then schedules the full-sync completion
 * marker and closes the listing task.
 */
void ResourceBasePrivate::slotLocalListDone(KJob *job)
{
    Q_Q(ResourceBase);
    if (job->error()) {
        emit q->error(job->errorString());
    } else {
        Collection::List cols = static_cast<CollectionFetchJob *>(job)->collections();
        foreach (const Collection &col, cols) {
            scheduler->scheduleSync(col);
        }
        // Queued after the per-collection syncs so it runs once they are done.
        scheduler->scheduleFullSyncCompletion();
    }
    scheduler->taskDone();
}
+
+void ResourceBasePrivate::slotSynchronizeCollection(const Collection &col)
+{
+ Q_Q(ResourceBase);
+ currentCollection = col;
+ // check if this collection actually can contain anything
+ QStringList contentTypes = currentCollection.contentMimeTypes();
+ contentTypes.removeAll(Collection::mimeType());
+ contentTypes.removeAll(Collection::virtualMimeType());
+ if (!contentTypes.isEmpty() || col.isVirtual()) {
+ if (mAutomaticProgressReporting) {
+ emit q->status(AgentBase::Running, i18nc("@info:status", "Syncing \
folder '%1'", currentCollection.displayName())); + }
+ q->retrieveItems(currentCollection);
+ return;
+ }
+ scheduler->taskDone();
+}
+
// Returns the number of items delivered per batch during the item sync
// (see setItemSyncBatchSize() and the retrieveNextItemSyncBatch() signal).
int ResourceBase::itemSyncBatchSize() const
{
    Q_D(const ResourceBase);
    return d->mItemSyncBatchSize;
}
++
// Sets the number of items delivered per batch during the item sync,
// used for throttling item delivery (see resourcebase.h for details).
void ResourceBase::setItemSyncBatchSize(int batchSize)
{
    Q_D(ResourceBase);
    d->mItemSyncBatchSize = batchSize;
}
++
// Dispatches the attribute-sync task to retrieveCollectionAttributes().
// invokeMethod() is used because that method is not virtual; this simulates
// polymorphism (see the KDE5 note in resourcebase.h).
void ResourceBasePrivate::slotSynchronizeCollectionAttributes(const Collection &col)
{
    Q_Q(ResourceBase);
    QMetaObject::invokeMethod(q, "retrieveCollectionAttributes", Q_ARG(Akonadi::Collection, col));
}
+
+void ResourceBasePrivate::slotPrepareItemRetrieval(const Akonadi::Item &item)
+{
+ Q_Q(ResourceBase);
+ ItemFetchJob *fetch = new ItemFetchJob(item, this);
+ fetch->fetchScope().setAncestorRetrieval(q->changeRecorder()->itemFetchScope().ancestorRetrieval());
+ fetch->fetchScope().setCacheOnly(true);
+
+ // copy list of attributes to fetch
+ const QSet<QByteArray> attributes = \
q->changeRecorder()->itemFetchScope().attributes(); + foreach (const QByteArray \
&attribute, attributes) { + fetch->fetchScope().fetchAttribute(attribute);
+ }
+
+ q->connect(fetch, SIGNAL(result(KJob*)), \
SLOT(slotPrepareItemRetrievalResult(KJob*))); +}
+
+void ResourceBasePrivate::slotPrepareItemRetrievalResult(KJob *job)
+{
+ Q_Q(ResourceBase);
+ Q_ASSERT_X(scheduler->currentTask().type == ResourceScheduler::FetchItem,
+ "ResourceBasePrivate::slotPrepareItemRetrievalResult()",
+ "Preparing item retrieval although no item retrieval is in \
progress"); + if (job->error()) {
+ q->cancelTask(job->errorText());
+ return;
+ }
+ ItemFetchJob *fetch = qobject_cast<ItemFetchJob *>(job);
+ if (fetch->items().count() != 1) {
+ q->cancelTask(i18n("The requested item no longer exists"));
+ return;
+ }
+ const Item item = fetch->items().first();
+ const QSet<QByteArray> parts = scheduler->currentTask().itemParts;
+ if (!q->retrieveItem(item, parts)) {
+ q->cancelTask();
+ }
+}
+
// Starts replaying a recursive move. Only one mover may be active at a time;
// its result is handled in slotRecursiveMoveReplayResult().
void ResourceBasePrivate::slotRecursiveMoveReplay(RecursiveMover *mover)
{
    Q_Q(ResourceBase);
    Q_ASSERT(mover);
    Q_ASSERT(!m_recursiveMover); // no concurrent recursive move replays
    m_recursiveMover = mover;
    connect(mover, SIGNAL(result(KJob*)), q, SLOT(slotRecursiveMoveReplayResult(KJob*)));
    mover->start();
}
+
// Finishes a recursive move replay: on failure the change is deferred for a
// later retry, on success it is marked as processed.
void ResourceBasePrivate::slotRecursiveMoveReplayResult(KJob *job)
{
    Q_Q(ResourceBase);
    m_recursiveMover = 0;

    if (job->error()) {
        // Keep the change queued and retry later instead of dropping it.
        q->deferTask();
        return;
    }

    changeProcessed();
}
+
+void ResourceBase::itemsRetrievalDone()
+{
+ Q_D(ResourceBase);
+ // streaming enabled, so finalize the sync
+ if (d->mItemSyncer) {
+ d->mItemSyncer->deliveryDone();
+ } else {
+ // user did the sync himself, we are done now
+ d->scheduler->taskDone();
+ }
+}
+
// Schedules deletion of this resource's locally cached collections
// (handled by the scheduler as a ResourceCollectionDeletion task).
void ResourceBase::clearCache()
{
    Q_D(ResourceBase);
    d->scheduler->scheduleResourceCollectionDeletion();
}
+
// Schedules invalidation of the locally cached data of @p collection.
void ResourceBase::invalidateCache(const Collection &collection)
{
    Q_D(ResourceBase);
    d->scheduler->scheduleCacheInvalidation(collection);
}
+
+Collection ResourceBase::currentCollection() const
+{
+ Q_D(const ResourceBase);
+ Q_ASSERT_X(d->scheduler->currentTask().type == \
ResourceScheduler::SyncCollection , + \
"ResourceBase::currentCollection()", + "Trying to access current \
collection although no item retrieval is in progress"); + return \
d->currentCollection; +}
+
/**
 * Returns the item currently being fetched.
 * Only valid while a FetchItem task is running.
 */
Item ResourceBase::currentItem() const
{
    Q_D(const ResourceBase);
    Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::FetchItem,
               "ResourceBase::currentItem()",
               "Trying to access current item although no item retrieval is in progress");
    return d->scheduler->currentTask().item;
}
+
// Queues a collection tree synchronization with the task scheduler.
void ResourceBase::synchronizeCollectionTree()
{
    d_func()->scheduler->scheduleCollectionTreeSync();
}
+
/**
 * Aborts the currently running task. The necessary cleanup depends on the
 * task type; in every case the task ends up reported as finished, either
 * directly or through the respective syncer's result handling.
 */
void ResourceBase::cancelTask()
{
    Q_D(ResourceBase);
    switch (d->scheduler->currentTask().type) {
    case ResourceScheduler::FetchItem:
        // Answer the pending fetch with an empty item.
        itemRetrieved(Item()); // sends the error reply and
        break;
    case ResourceScheduler::ChangeReplay:
        d->changeProcessed();
        break;
    case ResourceScheduler::SyncCollectionTree:
    case ResourceScheduler::SyncAll:
        if (d->mCollectionSyncer) {
            // Undo partial changes; slotCollectionSyncDone() closes the task.
            d->mCollectionSyncer->rollback();
        } else {
            d->scheduler->taskDone();
        }
        break;
    case ResourceScheduler::SyncCollection:
        if (d->mItemSyncer) {
            // Undo partial changes; slotItemSyncDone() closes the task.
            d->mItemSyncer->rollback();
        } else {
            d->scheduler->taskDone();
        }
        break;
    default:
        d->scheduler->taskDone();
    }
}
+
// Convenience overload: cancels the current task and additionally reports
// @p msg through the error() signal.
void ResourceBase::cancelTask(const QString &msg)
{
    cancelTask();

    emit error(msg);
}
+
// Postpones the current task; the scheduler will retry it later.
void ResourceBase::deferTask()
{
    Q_D(ResourceBase);
    d->scheduler->deferTask();
}
+
// Forwards the new online state to the task scheduler.
void ResourceBase::doSetOnline(bool state)
{
    d_func()->scheduler->setOnline(state);
}
+
// Non-recursive convenience overload: synchronizes only the given collection.
void ResourceBase::synchronizeCollection(qint64 collectionId)
{
    synchronizeCollection(collectionId, false);
}
+
+void ResourceBase::synchronizeCollection(qint64 collectionId, bool recursive)
+{
+ CollectionFetchJob *job = new CollectionFetchJob(Collection(collectionId), \
recursive ? CollectionFetchJob::Recursive : CollectionFetchJob::Base); + \
job->setFetchScope(changeRecorder()->collectionFetchScope()); + \
job->fetchScope().setResource(identifier()); + job->setProperty("recursive", \
recursive); + connect(job, SIGNAL(result(KJob*)), \
SLOT(slotCollectionListDone(KJob*))); +}
+
+void ResourceBasePrivate::slotCollectionListDone(KJob *job)
+{
+ if (!job->error()) {
+ Collection::List list = static_cast<CollectionFetchJob \
*>(job)->collections(); + if (!list.isEmpty()) {
+ if (job->property("recursive").toBool()) {
+ Q_FOREACH (const Collection &collection, list) {
+ scheduler->scheduleSync(collection);
+ }
+ } else {
+ scheduler->scheduleSync(list.first());
+ }
+ }
+ }
+ // TODO: error handling
+}
+
// Resolves the collection for @p collectionId; the attribute sync itself is
// scheduled from slotCollectionListForAttributesDone().
void ResourceBase::synchronizeCollectionAttributes(qint64 collectionId)
{
    CollectionFetchJob *job = new CollectionFetchJob(Collection(collectionId), CollectionFetchJob::Base);
    job->setFetchScope(changeRecorder()->collectionFetchScope());
    job->fetchScope().setResource(identifier());
    connect(job, SIGNAL(result(KJob*)), SLOT(slotCollectionListForAttributesDone(KJob*)));
}
+
+void ResourceBasePrivate::slotCollectionListForAttributesDone(KJob *job)
+{
+ if (!job->error()) {
+ Collection::List list = static_cast<CollectionFetchJob \
*>(job)->collections(); + if (!list.isEmpty()) {
+ Collection col = list.first();
+ scheduler->scheduleAttributesSync(col);
+ }
+ }
+ // TODO: error handling
+}
+
+void ResourceBase::setTotalItems(int amount)
+{
+ qDebug() << amount;
+ Q_D(ResourceBase);
+ setItemStreamingEnabled(true);
+ if (d->mItemSyncer) {
+ d->mItemSyncer->setTotalItems(amount);
+ }
+}
+
+void ResourceBase::setItemStreamingEnabled(bool enable)
+{
+ Q_D(ResourceBase);
+ d->createItemSyncInstanceIfMissing();
+ if (d->mItemSyncer) {
+ d->mItemSyncer->setStreamingEnabled(enable);
+ }
+}
+
+void ResourceBase::itemsRetrieved(const Item::List &items)
+{
+ Q_D(ResourceBase);
+ d->createItemSyncInstanceIfMissing();
+ if (d->mItemSyncer) {
+ d->mItemSyncer->setFullSyncItems(items);
+ }
+}
+
+void ResourceBase::itemsRetrievedIncremental(const Item::List &changedItems,
+ const Item::List &removedItems)
+{
+ Q_D(ResourceBase);
+ d->createItemSyncInstanceIfMissing();
+ if (d->mItemSyncer) {
+ d->mItemSyncer->setIncrementalSyncItems(changedItems, removedItems);
+ }
+}
+
// Result handler for the ItemSync job: reports errors (except user
// cancellation) and closes the scheduler task.
void ResourceBasePrivate::slotItemSyncDone(KJob *job)
{
    mItemSyncer = 0;
    Q_Q(ResourceBase);
    if (job->error() && job->error() != Job::UserCanceled) {
        emit q->error(job->errorString());
    }
    scheduler->taskDone();
}
+
// Flushes the compressed progress state: emits the latest percent value and
// all pending per-collection advancedStatus() maps, then resets the
// accumulators (also when automatic reporting is disabled).
void ResourceBasePrivate::slotDelayedEmitProgress()
{
    Q_Q(ResourceBase);
    if (mAutomaticProgressReporting) {
        emit q->percent(mUnemittedProgress);

        Q_FOREACH (const QVariantMap &statusMap, mUnemittedAdvancedStatus) {
            emit q->advancedStatus(statusMap);
        }
    }
    mUnemittedProgress = 0;
    mUnemittedAdvancedStatus.clear();
}
+
// Collects progress from sub-jobs. Per-collection progress is recorded as an
// advancedStatus() map keyed by collection id; emissions are rate-limited via
// mProgressEmissionCompressor, except completion which is flushed immediately.
void ResourceBasePrivate::slotPercent(KJob *job, unsigned long percent)
{
    mUnemittedProgress = percent;

    const Collection collection = job->property("collection").value<Collection>();
    if (collection.isValid()) {
        QVariantMap statusMap;
        statusMap.insert(QStringLiteral("key"), QString::fromLatin1("collectionSyncProgress"));
        statusMap.insert(QStringLiteral("collectionId"), collection.id());
        statusMap.insert(QStringLiteral("percent"), static_cast<unsigned int>(percent));

        // Latest value per collection wins; earlier unsent ones are replaced.
        mUnemittedAdvancedStatus[collection.id()] = statusMap;
    }
    // deliver completion right away, intermediate progress at 1s intervals
    if (percent == 100) {
        mProgressEmissionCompressor.stop();
        slotDelayedEmitProgress();
    } else if (!mProgressEmissionCompressor.isActive()) {
        mProgressEmissionCompressor.start();
    }
}
+
// Records whether this resource uses hierarchical remote identifiers; the
// flag is applied to newly created CollectionSync instances.
void ResourceBase::setHierarchicalRemoteIdentifiersEnabled(bool enable)
{
    Q_D(ResourceBase);
    d->mHierarchicalRid = enable;
}
+
// Queues a custom task: @p method is invoked on @p receiver with @p argument
// when the scheduler reaches it, ordered by @p priority.
void ResourceBase::scheduleCustomTask(QObject *receiver, const char *method, const QVariant &argument, SchedulePriority priority)
{
    Q_D(ResourceBase);
    d->scheduler->scheduleCustomTask(receiver, method, argument, priority);
}
+
// Reports the current task as finished to the scheduler.
void ResourceBase::taskDone()
{
    Q_D(ResourceBase);
    d->scheduler->taskDone();
}
+
// Base implementation: reports the collection back unchanged. Subclasses
// "override" this via invokeMethod() (see the KDE5 note in resourcebase.h).
void ResourceBase::retrieveCollectionAttributes(const Collection &collection)
{
    collectionAttributesRetrieved(collection);
}
+
// Default implementation does nothing; subclasses "override" this via
// invokeMethod() (see the KDE5 note in resourcebase.h).
void Akonadi::ResourceBase::abortActivity()
{
}
+
// Stores the transaction mode for the item sync.
// NOTE(review): presumably applied when the ItemSync instance is created
// (createItemSyncInstanceIfMissing()) - confirm there.
void ResourceBase::setItemTransactionMode(ItemSync::TransactionMode mode)
{
    Q_D(ResourceBase);
    d->mItemTransactionMode = mode;
}
+
// Stores a custom fetch scope for the item sync. The override scope is
// allocated lazily and stays unset (0) until a caller provides one.
void ResourceBase::setItemSynchronizationFetchScope(const ItemFetchScope &fetchScope)
{
    Q_D(ResourceBase);
    if (!d->mItemSyncFetchScope) {
        d->mItemSyncFetchScope = new ItemFetchScope;
    }
    *(d->mItemSyncFetchScope) = fetchScope;
}
+
// Toggles automatic emission of percent()/status() during syncing
// (see the API documentation in resourcebase.h).
void ResourceBase::setAutomaticProgressReporting(bool enabled)
{
    Q_D(ResourceBase);
    d->mAutomaticProgressReporting = enabled;
}
+
// Debugging aid: returns the private class' notification list dump.
QString ResourceBase::dumpNotificationListToString() const
{
    Q_D(const ResourceBase);
    return d->dumpNotificationListToString();
}
+
// Debugging aid: returns the scheduler state dump.
QString ResourceBase::dumpSchedulerToString() const
{
    Q_D(const ResourceBase);
    return d->dumpToString();
}
+
+void ResourceBase::dumpMemoryInfo() const
+{
+ Q_D(const ResourceBase);
+ return d->dumpMemoryInfo();
+}
+
// Debugging aid: returns the memory usage dump as a string.
QString ResourceBase::dumpMemoryInfoToString() const
{
    Q_D(const ResourceBase);
    return d->dumpMemoryInfoToString();
}
+
+#include "resourcebase.moc"
+#include "moc_resourcebase.cpp"
diff --cc akonadi/src/agentbase/resourcebase.h
index 3901cb0,0000000..5076777
mode 100644,000000..100644
--- a/akonadi/src/agentbase/resourcebase.h
+++ b/akonadi/src/agentbase/resourcebase.h
@@@ -1,725 -1,0 +1,761 @@@
+/*
+ This file is part of akonadiresources.
+
+ Copyright (c) 2006 Till Adam <adam@kde.org>
+ Copyright (c) 2007 Volker Krause <vkrause@kde.org>
+
+ This library is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Library General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or (at your
+ option) any later version.
+
+ This library is distributed in the hope that it will be useful, but WITHOUT
+ ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
+ License for more details.
+
+ You should have received a copy of the GNU Library General Public License
+ along with this library; see the file COPYING.LIB. If not, write to the
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ 02110-1301, USA.
+*/
+
+#ifndef AKONADI_RESOURCEBASE_H
+#define AKONADI_RESOURCEBASE_H
+
+#include "akonadiagentbase_export.h"
+#include "agentbase.h"
+#include "collection.h"
+#include "item.h"
+#include "itemsync.h"
+
+class KJob;
+class Akonadi__ResourceAdaptor;
+class ResourceState;
+
+namespace Akonadi {
+
+class ResourceBasePrivate;
+
+/**
+ * @short The base class for all Akonadi resources.
+ *
+ * This class should be used as a base class by all resource agents,
+ * because it encapsulates large parts of the protocol between
+ * resource agent, agent manager and the Akonadi storage.
+ *
+ * It provides many convenience methods to make implementing a
+ * new Akonadi resource agent as simple as possible.
+ *
+ * <h4>How to write a resource</h4>
+ *
+ * The following provides an overview of what you need to do to implement
+ * your own Akonadi resource. In the following, the term 'backend' refers
+ * to the entity the resource connects with Akonadi, be it a single file
+ * or a remote server.
+ *
+ * @todo Complete this (online/offline state management)
+ *
+ * <h5>Basic %Resource Framework</h5>
+ *
+ * The following is needed to create a new resource:
+ * - A new class deriving from Akonadi::ResourceBase, implementing at least all
+ * pure-virtual methods, see below for further details.
+ * - call init() in your main() function.
+ * - a .desktop file similar to the following example
+ * \code
+ * [Desktop Entry]
+ * Encoding=UTF-8
+ * Name=My Akonadi Resource
+ * Type=AkonadiResource
+ * Exec=akonadi_my_resource
+ * Icon=my-icon
+ *
+ * X-Akonadi-MimeTypes=<supported-mimetypes>
+ * X-Akonadi-Capabilities=Resource
+ * X-Akonadi-Identifier=akonadi_my_resource
+ * \endcode
+ *
+ * <h5>Handling PIM Items</h5>
+ *
+ * To follow item changes in the backend, the following steps are necessary:
+ * - Implement retrieveItems() to synchronize all items in the given
+ * collection. If the backend supports incremental retrieval,
+ * implementing support for that is recommended to improve performance.
+ * - Convert the items provided by the backend to Akonadi items.
+ * This typically happens either in retrieveItems() if you retrieved
+ * the collection synchronously (not recommended for network backends) or
+ * in the result slot of the asynchronous retrieval job.
+ * Converting means to create Akonadi::Item objects for every retrieved
+ * item. It's very important that every object has its remote identifier set.
+ * - Call itemsRetrieved() or itemsRetrievedIncremental() respectively
+ * with the item objects created above. The Akonadi storage will then be
+ * updated automatically. Note that it is usually not necessary to manipulate
+ * any item in the Akonadi storage manually.
+ *
+ * To fetch item data on demand, the method retrieveItem() needs to be
+ * reimplemented. Fetch the requested data there and call itemRetrieved()
+ * with the result item.
+ *
+ * To write local changes back to the backend, you need to re-implement
+ * the following three methods:
+ * - itemAdded()
+ * - itemChanged()
+ * - itemRemoved()
+ *
+ * Note that these three functions don't get the full payload of the items by \
default, + * you need to change the item fetch scope of the change recorder to fetch \
the full + * payload. This can be expensive with big payloads, though.<br>
+ * Once you have handled changes in these methods, call changeCommitted().
+ * These methods are called whenever a local item related to this resource is
+ * added, modified or deleted. They are only called if the resource is online, \
otherwise + * all changes are recorded and replayed as soon the resource is online \
again. + *
+ * <h5>Handling Collections</h5>
+ *
+ * To follow collection changes in the backend, the following steps are necessary:
+ * - Implement retrieveCollections() to retrieve collections from the backend.
+ * If the backend supports incremental collections updates, implementing
+ * support for that is recommended to improve performance.
+ * - Convert the collections of the backend to Akonadi collections.
+ * This typically happens either in retrieveCollections() if you retrieved
+ * the collection synchronously (not recommended for network backends) or
+ * in the result slot of the asynchronous retrieval job.
+ * Converting means to create Akonadi::Collection objects for every retrieved
+ * collection. It's very important that every object has its remote identifier
+ * and its parent remote identifier set.
+ * - Call collectionsRetrieved() or collectionsRetrievedIncremental() respectively
+ * with the collection objects created above. The Akonadi storage will then be
+ * updated automatically. Note that it is usually not necessary to manipulate
+ * any collection in the Akonadi storage manually.
+ *
+ *
+ * To write local collection changes back to the backend, you need to re-implement
+ * the following three methods:
+ * - collectionAdded()
+ * - collectionChanged()
+ * - collectionRemoved()
+ * Once you have handled changes in these methods call changeCommitted().
+ * These methods are called whenever a local collection related to this resource is
+ * added, modified or deleted. They are only called if the resource is online, \
otherwise + * all changes are recorded and replayed as soon the resource is online \
again. + *
+ * @todo Convenience base class for collection-less resources
+ */
+// FIXME_API: API dox need to be updated for Observer approach (kevin)
+class AKONADIAGENTBASE_EXPORT ResourceBase : public AgentBase
+{
+ Q_OBJECT
+
+public:
+ /**
+ * Use this method in the main function of your resource
+ * application to initialize your resource subclass.
+ * This method also takes care of creating a KApplication
+ * object and parsing command line arguments.
+ *
+ * @note In case the given class is also derived from AgentBase::Observer
+ * it gets registered as its own observer (see AgentBase::Observer), e.g.
+ * <tt>resourceInstance->registerObserver( resourceInstance );</tt>
+ *
+ * @code
+ *
+ * class MyResource : public ResourceBase
+ * {
+ * ...
+ * };
+ *
+ * int main( int argc, char **argv )
+ * {
+ * return ResourceBase::init<MyResource>( argc, argv );
+ * }
+ *
+ * @endcode
+ *
+ * @param argc number of arguments
+ * @param argv string arguments
+ */
    template <typename T>
    static int init(int argc, char **argv)
    {
        // Parse the command line to obtain the agent instance identifier.
        const QString id = parseArguments(argc, argv);
        KApplication app;
        T *r = new T(id);

        // check if T also inherits AgentBase::Observer and
        // if it does, automatically register it on itself
        Observer *observer = dynamic_cast<Observer *>(r);
        if (observer != 0) {
            r->registerObserver(observer);
        }

        // NOTE(review): init(r) presumably takes ownership of r and enters
        // the application event loop - confirm against the non-template init().
        return init(r);
    }
+
+ /**
+ * This method is used to set the name of the resource.
+ */
+ void setName(const QString &name);
+
+ /**
+ * Returns the name of the resource.
+ */
+ QString name() const;
+
+ /**
+ * Enable or disable automatic progress reporting. By default, it is enabled.
+ * When enabled, the resource will automatically emit the signals percent() and \
status() + * while syncing items or collections.
+ *
+ * The automatic progress reporting is done on a per item / per collection \
basis, so if a + * finer granularity is desired, automatic reporting should be \
disabled and the subclass should + * emit the percent() and status() signals \
itself. + *
+ * @param enabled Whether or not automatic emission of the signals is enabled.
+ * @since 4.7
+ */
+ void setAutomaticProgressReporting(bool enabled);
+
+Q_SIGNALS:
+ /**
+ * This signal is emitted whenever the name of the resource has changed.
+ *
+ * @param name The new name of the resource.
+ */
+ void nameChanged(const QString &name);
+
+ /**
+ * Emitted when a full synchronization has been completed.
+ */
+ void synchronized();
+
+ /**
+ * Emitted when a collection attributes synchronization has been completed.
+ *
+ * @param collectionId The identifier of the collection whose attributes got \
synchronized. + * @since 4.6
+ */
+ void attributesSynchronized(qlonglong collectionId);
+
+ /**
+ * Emitted when a collection tree synchronization has been completed.
+ *
+ * @since 4.8
+ */
+ void collectionTreeSynchronized();
+
    /**
     * Emitted when the item synchronization processed the current batch and is ready for a new one.
     * Use this to throttle the delivery to not overload Akonadi.
     *
     * Throttling can be used during item retrieval (retrieveItems(Akonadi::Collection)) in streaming mode.
     * To throttle, only deliver itemSyncBatchSize() items, wait for this signal, and then deliver
     * @param remainingBatchSize items again.
     *
     * By always only providing the number of items required to process the batch, the items don't pile
     * up in memory and items are only provided as fast as Akonadi can process them.
     *
     * @see itemSyncBatchSize()
     *
     * @since 4.14
     */
++ void retrieveNextItemSyncBatch(int remainingBatchSize);
++
+protected Q_SLOTS:
+ /**
+ * Retrieve the collection tree from the remote server and supply it via
+ * collectionsRetrieved() or collectionsRetrievedIncremental().
+ * @see collectionsRetrieved(), collectionsRetrievedIncremental()
+ */
+ virtual void retrieveCollections() = 0;
+
+ /**
+ * Retrieve the attributes of a single collection from the backend. The
+ * collection to retrieve attributes for is provided as @p collection.
+ * Add the attributes parts and call collectionAttributesRetrieved()
+ * when done.
+ *
+ * @param collection The collection whose attributes should be retrieved.
+ * @see collectionAttributesRetrieved()
+ * @since 4.6
+ */
+ // KDE5: Make it pure virtual, for now can be called only by invokeMethod()
+ // in order to simulate polymorphism
+ void retrieveCollectionAttributes(const Akonadi::Collection &collection);
+
+ /**
+ * Retrieve all (new/changed) items in collection @p collection.
+ * It is recommended to use incremental retrieval if the backend supports that
+ * and provide the result by calling itemsRetrievedIncremental().
+ * If incremental retrieval is not possible, provide the full listing by \
calling + * itemsRetrieved( const Item::List& ).
+ * In any case, ensure that all items have a correctly set remote identifier
+ * to allow synchronizing with items already existing locally.
+ * In case you don't want to use the built-in item syncing code, store the \
retrieved + * items manually and call itemsRetrieved() once you are done.
+ * @param collection The collection whose items to retrieve.
- * @see itemsRetrieved( const Item::List& ), itemsRetrievedIncremental(), \
itemsRetrieved(), currentCollection() ++ * @see itemsRetrieved( const Item::List& \
), itemsRetrievedIncremental(), itemsRetrieved(), currentCollection(), batchSize() + \
*/ + virtual void retrieveItems(const Akonadi::Collection &collection) = 0;
+
+ /**
++ * Returns the batch size used during the item sync.
++ *
++ * This can be used to throttle the item delivery.
++ *
++ * @see retrieveNextItemSyncBatch(int), retrieveItems(Akonadi::Collection)
++ * @since 4.14
++ */
++ int itemSyncBatchSize() const;
++
++ /**
++ * Set the batch size used during the item sync.
++ * The default is 10.
++ *
++ * @see retrieveNextItemSyncBatch(int)
++ * @since 4.14
++ */
++ void setItemSyncBatchSize(int batchSize);
++
++ /**
+ * Retrieve a single item from the backend. The item to retrieve is provided as \
@p item. + * Add the requested payload parts and call itemRetrieved() when done.
+ * @param item The empty item whose payload should be retrieved. Use this \
object when delivering + * the result instead of creating a new item to ensure \
conflict detection will work. + * @param parts The item parts that should be \
retrieved. + * @return false if there is an immediate error when retrieving the \
item. + * @see itemRetrieved()
+ */
+ virtual bool retrieveItem(const Akonadi::Item &item, const QSet<QByteArray> \
&parts) = 0; +
+ /**
+ * Abort any activity in progress in the backend. By default this method does \
nothing. + *
+ * @since 4.6
+ */
+ // KDE5: Make it pure virtual, for now can be called only by invokeMethod()
+ // in order to simulate polymorphism
+ void abortActivity();
+
+ /**
+ * Dump resource internals, for debugging.
+ * @since 4.9
+ */
+ // KDE5: Make it pure virtual, for now can be called only by invokeMethod()
+ // in order to simulate polymorphism
+ QString dumpResourceToString() const
+ {
+ return QString();
+ }
+
+protected:
+ /**
+ * Creates a base resource.
+ *
+ * @param id The instance id of the resource.
+ */
+ ResourceBase(const QString &id);
+
+ /**
+ * Destroys the base resource.
+ */
+ ~ResourceBase();
+
+ /**
+ * Call this method from retrieveItem() once the result is available.
+ *
+ * @param item The retrieved item.
+ */
+ void itemRetrieved(const Item &item);
+
+ /**
+ * Call this method from retrieveCollectionAttributes() once the result is \
available. + *
+ * @param collection The collection whose attributes got retrieved.
+ * @since 4.6
+ */
+ void collectionAttributesRetrieved(const Collection &collection);
+
+ /**
+ * Resets the dirty flag of the given item and updates the remote id.
+ *
+ * Call whenever you have successfully written changes back to the server.
+ * This implicitly calls changeProcessed().
+ * @param item The changed item.
+ */
+ void changeCommitted(const Item &item);
+
+ /**
+ * Resets the dirty flag of all given items and updates remote ids.
+ *
+ * Call whenever you have successfully written changes back to the server.
+ * This implicitly calls changeProcessed().
+ * @param items Changed items
+ *
+ * @since 4.11
+ */
+ void changesCommitted(const Item::List &items);
+
+ /**
+ * Resets the dirty flag of the given tag and updates the remote id.
+ *
+ * Call whenever you have successfully written changes back to the server.
+ * This implicitly calls changeProcessed().
+ * @param tag Changed tag.
+ *
+ * @since 4.13
+ */
+ void changeCommitted(const Tag &tag);
+
+ /**
+ * Call whenever you have successfully handled or ignored a collection
+ * change notification.
+ *
+ * This will update the remote identifier of @p collection if necessary,
+ * as well as any other collection attributes.
+ * This implicitly calls changeProcessed().
+ * @param collection The collection which changes have been handled.
+ */
+ void changeCommitted(const Collection &collection);
+
+ /**
+ * Call this to supply the full folder tree retrieved from the remote server.
+ *
+ * @param collections A list of collections.
+ * @see collectionsRetrievedIncremental()
+ */
+ void collectionsRetrieved(const Collection::List &collections);
+
+ /**
+ * Call this to supply incrementally retrieved collections from the remote \
server. + *
+ * @param changedCollections Collections that have been added or changed.
+ * @param removedCollections Collections that have been deleted.
+ * @see collectionsRetrieved()
+ */
+ void collectionsRetrievedIncremental(const Collection::List \
&changedCollections, + const \
Collection::List &removedCollections); +
+ /**
+ * Enable collection streaming, that is collections don't have to be delivered \
at once + * as result of a retrieveCollections() call but can be delivered by \
multiple calls + * to collectionsRetrieved() or \
collectionsRetrievedIncremental(). When all collections + * have been retrieved, \
call collectionsRetrievalDone(). + * @param enable @c true if collection \
streaming should be enabled, @c false by default + */
+ void setCollectionStreamingEnabled(bool enable);
+
+ /**
+ * Call this method to indicate you finished synchronizing the collection tree.
+ *
+ * This is not needed if you use the built in syncing without collection \
streaming + * and call collectionsRetrieved() or \
collectionRetrievedIncremental() instead. + * If collection streaming is \
enabled, call this method once all collections have been delivered + * using \
collectionsRetrieved() or collectionsRetrievedIncremental(). + */
+ void collectionsRetrievalDone();
+
+ /**
+ * Call this method to supply the full collection listing from the remote \
server. Items not present in the list + * will be dropped from the Akonadi \
database. + *
+ * If the remote server supports incremental listing, it's strongly
+ * recommended to use itemsRetrievedIncremental() instead.
+ * @param items A list of items.
+ * @see itemsRetrievedIncremental().
+ */
+ void itemsRetrieved(const Item::List &items);
+
+ /**
+ * Call this method when you want to use the itemsRetrieved() method
+ * in streaming mode and indicate the amount of items that will arrive
+ * that way.
+ * @deprecated Use setItemStreamingEnabled( true ) + \
itemsRetrieved[Incremental]() + * + itemsRetrieved() instead.
+ * @param amount number of items that will arrive in streaming mode
+ */
+ void setTotalItems(int amount);
+
+ /**
+ * Enable item streaming.
+ * Item streaming is disabled by default.
+ * @param enable @c true if items are delivered in chunks rather than in one big \
block. + */
+ void setItemStreamingEnabled(bool enable);
+
+ /**
+ * Set transaction mode for item sync'ing.
+ * @param mode item transaction mode
+ * @see Akonadi::ItemSync::TransactionMode
+ * @since 4.6
+ */
+ void setItemTransactionMode(ItemSync::TransactionMode mode);
+
+ /**
+ * Set the fetch scope applied for item synchronization.
+ * By default, the one set on the changeRecorder() is used. However, it can \
make sense + * to specify a specialized fetch scope for synchronization to \
improve performance. + * The rule of thumb is to remove anything from this fetch \
scope that does not provide + * additional information regarding whether and \
item has changed or not. This is primarily + * relevant for backends not \
supporting incremental retrieval. + * @param fetchScope The fetch scope to use \
by the internal Akonadi::ItemSync instance. + * @see Akonadi::ItemSync
+ * @since 4.6
+ */
+ void setItemSynchronizationFetchScope(const ItemFetchScope &fetchScope);
+
+ /**
+ * Call this method to supply incrementally retrieved items from the remote \
server. + *
+ * @param changedItems Items changed in the backend.
+ * @param removedItems Items removed from the backend.
+ */
+ void itemsRetrievedIncremental(const Item::List &changedItems,
+ const Item::List &removedItems);
+
+ /**
+ * Call this method to indicate you finished synchronizing the current \
collection. + *
+ * This is not needed if you use the built-in syncing without item streaming
+ * and call itemsRetrieved() or itemsRetrievedIncremental() instead.
+ * If item streaming is enabled, call this method once all items have been \
delivered + * using itemsRetrieved() or itemsRetrievedIncremental().
+ * @see retrieveItems()
+ */
+ void itemsRetrievalDone();
+
+ /**
+ * Call this method to remove all items and collections of the resource from \
the + * server cache.
+ *
+ * The method should not be used anymore
+ *
+ * @see invalidateCache()
+ * @since 4.3
+ */
+ void clearCache();
+
+ /**
+ * Call this method to invalidate all cached content in @p collection.
+ *
+ * The method should be used when the backend indicated that the cached content
+ * is no longer valid.
+ *
+ * @param collection parent of the content to be invalidated in cache
+ * @since 4.8
+ */
+ void invalidateCache(const Collection &collection);
+
+ /**
+ * Returns the collection that is currently synchronized.
+ * @note Calling this method is only allowed during a collection \
synchronization task, that + * is directly or indirectly from retrieveItems().
+ */
+ Collection currentCollection() const;
+
+ /**
+ * Returns the item that is currently retrieved.
+ * @note Calling this method is only allowed during fetching a single item, \
that + * is directly or indirectly from retrieveItem().
+ */
+ Item currentItem() const;
+
+ /**
+ * This method is called whenever the resource should start synchronize all \
data. + */
+ void synchronize();
+
+ /**
+ * This method is called whenever the collection with the given @p id
+ * shall be synchronized.
+ */
+ void synchronizeCollection(qint64 id);
+
+ /**
+ * This method is called whenever the collection with the given @p id
+ * shall be synchronized.
+ * @param recursive if true, a recursive synchronization is done
+ */
+ void synchronizeCollection(qint64 id, bool recursive);
+
+ /**
+ * This method is called whenever the collection with the given @p id
+ * shall have its attributes synchronized.
+ *
+ * @param id The id of the collection to synchronize
+ * @since 4.6
+ */
+ void synchronizeCollectionAttributes(qint64 id);
+
+ /**
+ * Refetches the Collections.
+ */
+ void synchronizeCollectionTree();
+
+ /**
+ * Stops the execution of the current task and continues with the next one.
+ */
+ void cancelTask();
+
+ /**
+ * Stops the execution of the current task and continues with the next one.
+ * Additionally an error message is emitted.
+ * @param error additional error message to be emitted
+ */
+ void cancelTask(const QString &error);
+
+ /**
+ * Stops the execution of the current task and continues with the next one.
+ * The current task will be tried again later.
+ *
+ * This can be used to delay the task processing until the resource has reached \
a safe + * state, e.g. login to a server succeeded.
+ *
+ * @note This does not change the order of tasks so if there is no task with \
higher priority + * e.g. a custom task added with #Prepend the deferred \
task will be processed again. + *
+ * @since 4.3
+ */
+ void deferTask();
+
+ /**
+ * Inherited from AgentBase.
+ */
+ void doSetOnline(bool online);
+
+ /**
+ * Indicate the use of hierarchical remote identifiers.
+ *
+ * This means that it is possible to have two different items with the same
+ * remoteId in different Collections.
+ *
+ * This should be called in the resource constructor as needed.
+ *
+ * @param enable whether to enable use of hierarchical remote identifiers
+ * @since 4.4
+ */
+ void setHierarchicalRemoteIdentifiersEnabled(bool enable);
+
+ friend class ResourceScheduler;
+ friend class ::ResourceState;
+
+ /**
+ * Describes the scheduling priority of a task that has been queued
+ * for execution.
+ *
+ * @see scheduleCustomTask
+ * @since 4.4
+ */
+ enum SchedulePriority {
+ Prepend, ///< The task will be executed as soon as the current \
task has finished. + AfterChangeReplay, ///< The task is scheduled after the \
last ChangeReplay task in the queue + Append ///< The task will \
be executed after all tasks currently in the queue are finished + };
+
+ /**
+ * Schedules a custom task in the internal scheduler. It will be queued with
+ * all other tasks such as change replays and retrieval requests and eventually
+ * executed by calling the specified method. With the priority parameter the
+ * time of execution of the Task can be influenced. @see SchedulePriority
+ * @param receiver The object the slot should be called on.
+ * @param method The name of the method (and only the name, not signature, not \
SLOT(...) macro), + * that should be called to execute this task. The method has \
to be a slot and take a QVariant as + * argument.
+ * @param argument A QVariant argument passed to the method specified above. \
Use this to pass task + * parameters.
+ * @param priority Priority of the task. Use this to influence the position in
+ * the execution queue.
+ * @since 4.4
+ */
+ void scheduleCustomTask(QObject *receiver, const char *method, const QVariant \
&argument, SchedulePriority priority = Append); +
+ /**
+ * Indicate that the current task is finished. Use this method from the slot \
called via scheduleCustomTaks(). + * As with all the other callbacks, make sure \
to either call taskDone() or cancelTask()/deferTask() on all + * exit paths, \
otherwise the resource will hang. + * @since 4.4
+ */
+ void taskDone();
+
+ /**
+ * Dump the contents of the current ChangeReplay
+ * @since 4.8.1
+ */
+ QString dumpNotificationListToString() const;
+
+ /**
+ * Dumps memory usage information to stdout.
+ * For now it outputs the result of glibc's mallinfo().
+ * This is useful to check if your memory problems are due to poor management \
by glibc. + * Look for a high value on fsmblks when interpreting results.
+ * man mallinfo for more details.
+ * @since 4.11
+ */
+ void dumpMemoryInfo() const;
+
+ /**
+ * Returns a string with memory usage information.
+ * @see dumpMemoryInfo()
+ *
+ * @since 4.11
+ */
+ QString dumpMemoryInfoToString() const;
+
+ /**
+ * Dump the state of the scheduler
+ * @since 4.8.1
+ */
+ QString dumpSchedulerToString() const;
+
+private:
+ static QString parseArguments(int, char **);
+ static int init(ResourceBase *r);
+
+ // dbus resource interface
+ friend class ::Akonadi__ResourceAdaptor;
+
+ bool requestItemDelivery(qint64 uid, const QString &remoteId, const QString \
&mimeType, const QStringList &parts); +
+ QString requestItemDeliveryV2(qint64 uid, const QString &remoteId, const \
QString &mimeType, const QStringList &parts); +
+private:
+ Q_DECLARE_PRIVATE(ResourceBase)
+
+ Q_PRIVATE_SLOT(d_func(), void slotAbortRequested())
+ Q_PRIVATE_SLOT(d_func(), void slotDeliveryDone(KJob *))
+ Q_PRIVATE_SLOT(d_func(), void slotCollectionSyncDone(KJob *))
+ Q_PRIVATE_SLOT(d_func(), void slotDeleteResourceCollection())
+ Q_PRIVATE_SLOT(d_func(), void slotDeleteResourceCollectionDone(KJob *))
+ Q_PRIVATE_SLOT(d_func(), void slotCollectionDeletionDone(KJob *))
+ Q_PRIVATE_SLOT(d_func(), void slotInvalidateCache(const Akonadi::Collection &))
+ Q_PRIVATE_SLOT(d_func(), void slotLocalListDone(KJob *))
+ Q_PRIVATE_SLOT(d_func(), void slotSynchronizeCollection(const \
Akonadi::Collection &)) + Q_PRIVATE_SLOT(d_func(), void \
slotCollectionListDone(KJob *)) + Q_PRIVATE_SLOT(d_func(), void \
slotSynchronizeCollectionAttributes(const Akonadi::Collection &)) + \
Q_PRIVATE_SLOT(d_func(), void slotCollectionListForAttributesDone(KJob *)) + \
Q_PRIVATE_SLOT(d_func(), void slotCollectionAttributesSyncDone(KJob *)) + \
Q_PRIVATE_SLOT(d_func(), void slotItemSyncDone(KJob *)) + \
Q_PRIVATE_SLOT(d_func(), void slotPercent(KJob *, unsigned long)) + \
Q_PRIVATE_SLOT(d_func(), void slotDelayedEmitProgress()) + \
Q_PRIVATE_SLOT(d_func(), void slotPrepareItemRetrieval(const Akonadi::Item &item)) + \
Q_PRIVATE_SLOT(d_func(), void slotPrepareItemRetrievalResult(KJob *)) + \
Q_PRIVATE_SLOT(d_func(), void changeCommittedResult(KJob *)) + \
Q_PRIVATE_SLOT(d_func(), void slotSessionReconnected()) + \
Q_PRIVATE_SLOT(d_func(), void slotRecursiveMoveReplay(RecursiveMover *)) + \
Q_PRIVATE_SLOT(d_func(), void slotRecursiveMoveReplayResult(KJob *)) +};
+
+}
+
+#ifndef AKONADI_RESOURCE_MAIN
+/**
+ * Convenience Macro for the most common main() function for Akonadi resources.
+ */
+#define AKONADI_RESOURCE_MAIN( resourceClass ) \
+ int main( int argc, char **argv ) \
+ { \
+ return Akonadi::ResourceBase::init<resourceClass>( argc, argv ); \
+ }
+#endif
+
+#endif
diff --cc akonadi/src/core/itemsync.cpp
index dd7d3b6,0000000..1ab71f4
mode 100644,000000..100644
--- a/akonadi/src/core/itemsync.cpp
+++ b/akonadi/src/core/itemsync.cpp
@@@ -1,558 -1,0 +1,679 @@@
+/*
+ Copyright (c) 2007 Tobias Koenig <tokoe@kde.org>
+ Copyright (c) 2007 Volker Krause <vkrause@kde.org>
++ Copyright (c) 2014 Christian Mollekopf <mollekopf@kolabsys.com>
+
+ This library is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Library General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or (at your
+ option) any later version.
+
+ This library is distributed in the hope that it will be useful, but WITHOUT
+ ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
+ License for more details.
+
+ You should have received a copy of the GNU Library General Public License
+ along with this library; see the file COPYING.LIB. If not, write to the
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ 02110-1301, USA.
+*/
+
+#include "itemsync.h"
+
+#include "job_p.h"
+#include "collection.h"
+#include "item.h"
+#include "item_p.h"
+#include "itemcreatejob.h"
+#include "itemdeletejob.h"
+#include "itemfetchjob.h"
+#include "itemmodifyjob.h"
+#include "transactionsequence.h"
+#include "itemfetchscope.h"
+
+#include <qdebug.h>
+
+#include <QtCore/QStringList>
+
+using namespace Akonadi;
+
+/**
+ * @internal
+ */
+class Akonadi::ItemSyncPrivate : public JobPrivate
+{
+public:
+ ItemSyncPrivate(ItemSync *parent)
+ : JobPrivate(parent)
+ , mTransactionMode(ItemSync::SingleTransaction)
+ , mCurrentTransaction(0)
+ , mTransactionJobs(0)
+ , mPendingJobs(0)
+ , mProgress(0)
+ , mTotalItems(-1)
+ , mTotalItemsProcessed(0)
+ , mStreaming(false)
+ , mIncremental(false)
- , mLocalListDone(false)
+ , mDeliveryDone(false)
+ , mFinished(false)
- , mLocalListStarted( false )
++ , mFullListingDone(false)
++ , mProcessingBatch(false)
++ , mBatchSize(10)
+ {
+ // we want to fetch all data by default
+ mFetchScope.fetchFullPayload();
+ mFetchScope.fetchAllAttributes();
+ }
+
+ void createLocalItem(const Item &item);
++ void modifyLocalItem(const Item &remoteItem, Akonadi::Item::Id localId);
+ void checkDone();
++ void slotItemsReceived(const Item::List &items);
+ void slotLocalListDone(KJob *job);
++ void slotLocalFetchDone(KJob *job);
+ void slotLocalDeleteDone(KJob *);
+ void slotLocalChangeDone(KJob *job);
+ void execute();
+ void processItems();
++ void processBatch();
+ void deleteItems(const Item::List &items);
+ void slotTransactionResult(KJob *job);
++ void requestTransaction();
+ Job *subjobParent() const;
+ void fetchLocalItems();
+ QString jobDebuggingString() const /*Q_DECL_OVERRIDE*/;
++ bool allProcessed() const;
+
+ Q_DECLARE_PUBLIC(ItemSync)
+ Collection mSyncCollection;
- QHash<Item::Id, Akonadi::Item> mLocalItemsById;
- QHash<QString, Akonadi::Item> mLocalItemsByRemoteId;
- QSet<Akonadi::Item> mUnprocessedLocalItems;
++ QSet<Akonadi::Item::Id> mUnprocessedLocalIds;
++ QHash<QString, Akonadi::Item::Id> mLocalIdByRid;
+
+ ItemSync::TransactionMode mTransactionMode;
+ TransactionSequence *mCurrentTransaction;
+ int mTransactionJobs;
+
+ // fetch scope for initial item listing
+ ItemFetchScope mFetchScope;
+
- // remote items
- Akonadi::Item::List mRemoteItems;
-
- // removed remote items
- Item::List mRemovedRemoteItems;
++ Akonadi::Item::List mRemoteItemQueue;
++ Akonadi::Item::List mRemovedRemoteItemQueue;
++ Akonadi::Item::List mCurrentBatchRemoteItems;
++ Akonadi::Item::List mCurrentBatchRemovedRemoteItems;
+
+ // create counter
+ int mPendingJobs;
+ int mProgress;
+ int mTotalItems;
+ int mTotalItemsProcessed;
+
+ bool mStreaming;
+ bool mIncremental;
- bool mLocalListDone;
+ bool mDeliveryDone;
+ bool mFinished;
- bool mLocalListStarted;
++ bool mFullListingDone;
++ bool mProcessingBatch;
++
++ int mBatchSize;
+};
+
+void ItemSyncPrivate::createLocalItem(const Item &item)
+{
+ Q_Q(ItemSync);
+ // don't try to do anything in error state
+ if (q->error()) {
+ return;
+ }
+ mPendingJobs++;
+ ItemCreateJob *create = new ItemCreateJob(item, mSyncCollection, \
subjobParent()); + q->connect(create, SIGNAL(result(KJob*)), q, \
SLOT(slotLocalChangeDone(KJob*))); +}
+
++void ItemSyncPrivate::modifyLocalItem(const Item &remoteItem, Akonadi::Item::Id \
localId) ++{
++ Q_Q(ItemSync);
++ // don't try to do anything in error state
++ if (q->error()) {
++ return;
++ }
++
++ //we fetch the local item to check if a modification is required and to make \
sure we have all parts ++ Akonadi::ItemFetchJob *fetchJob = new \
Akonadi::ItemFetchJob(Akonadi::Item(localId), subjobParent()); ++ \
fetchJob->setFetchScope(mFetchScope); ++ \
fetchJob->fetchScope().setCacheOnly(true); ++ \
fetchJob->setDeliveryOption(ItemFetchJob::ItemGetter); ++ q->connect(fetchJob, \
SIGNAL(result(KJob*)), q, SLOT(slotLocalFetchDone(KJob*))); ++ \
fetchJob->setProperty("remoteItem", QVariant::fromValue(remoteItem)); ++ \
mPendingJobs++; ++}
++
++void ItemSyncPrivate::slotLocalFetchDone(KJob *job)
++{
++ Q_Q(ItemSync);
++ mPendingJobs--;
++ if (job->error()) {
++ kWarning() << job->errorString();
++ checkDone();
++ return;
++ }
++ Akonadi::ItemFetchJob *fetchJob = static_cast<Akonadi::ItemFetchJob*>(job);
++ Akonadi::Item remoteItem = \
fetchJob->property("remoteItem").value<Akonadi::Item>(); ++ if \
(fetchJob->items().isEmpty()) { ++ kWarning() << "Failed to fetch local item: \
" << remoteItem.remoteId() << remoteItem.gid(); ++ checkDone();
++ return;
++ }
++ const Akonadi::Item localItem = fetchJob->items().first();
++ if (q->updateItem(localItem, remoteItem)) {
++ remoteItem.setId(localItem.id());
++ remoteItem.setRevision(localItem.revision());
++ remoteItem.setSize(localItem.size());
++ remoteItem.setRemoteId(localItem.remoteId()); // in case someone clears \
remoteId by accident ++ ItemModifyJob *mod = new ItemModifyJob(remoteItem, \
subjobParent()); ++ mod->disableRevisionCheck();
++ q->connect(mod, SIGNAL(result(KJob*)), q, \
SLOT(slotLocalChangeDone(KJob*))); ++ mPendingJobs++;
++ } else {
++ mProgress++;
++ }
++ checkDone();
++}
++
++bool ItemSyncPrivate::allProcessed() const
++{
++ return mDeliveryDone && mCurrentBatchRemoteItems.isEmpty() && \
mRemoteItemQueue.isEmpty() && mRemovedRemoteItemQueue.isEmpty() && \
mCurrentBatchRemovedRemoteItems.isEmpty(); ++}
++
+void ItemSyncPrivate::checkDone()
+{
+ Q_Q(ItemSync);
+ q->setProcessedAmount(KJob::Bytes, mProgress);
- if (mPendingJobs > 0 || !mDeliveryDone || mTransactionJobs > 0) {
++ if (mPendingJobs > 0) {
++ return;
++ }
++
++ if (mTransactionJobs > 0) {
++ //Commit the current transaction if we're in batch processing mode or done
++ if ((mTransactionMode == ItemSync::MultipleTransactions || mDeliveryDone) \
&& mCurrentTransaction) { ++ mCurrentTransaction->commit();
++ mCurrentTransaction = 0;
++ }
+ return;
+ }
++ mProcessingBatch = false;
++ if (!mRemoteItemQueue.isEmpty()) {
++ execute();
++ //We don't have enough items, request more
++ if (!mProcessingBatch) {
++ q->emit readyForNextBatch(mBatchSize - mRemoteItemQueue.size());
++ }
++ return;
++ }
++ q->emit readyForNextBatch(mBatchSize);
+
- if (!mFinished) {
++ if (allProcessed() && !mFinished) {
+ // prevent double result emission, can happen since checkDone() is called \
from all over the place + mFinished = true;
+ q->emitResult();
+ }
+}
+
+ItemSync::ItemSync(const Collection &collection, QObject *parent)
+ : Job(new ItemSyncPrivate(this), parent)
+{
+ Q_D(ItemSync);
+ d->mSyncCollection = collection;
+}
+
+ItemSync::~ItemSync()
+{
+}
+
+void ItemSync::setFullSyncItems(const Item::List &items)
+{
++ /*
++ * We received a list of items from the server:
++ * * fetch all local id's + rid's only
++ * check each full sync item whether it's locally available
++ * * if it is modify the item
++ * * if it's not create it
++ * * delete all superfluous items
++ */
+ Q_D(ItemSync);
+ Q_ASSERT(!d->mIncremental);
+ if (!d->mStreaming) {
+ d->mDeliveryDone = true;
+ }
- d->mRemoteItems += items;
++ d->mRemoteItemQueue += items;
+ d->mTotalItemsProcessed += items.count();
+ qDebug() << "Received: " << items.count() << "In total: " << \
d->mTotalItemsProcessed << " Wanted: " << d->mTotalItems;
- setTotalAmount(KJob::Bytes, d->mTotalItemsProcessed);
+ if (d->mTotalItemsProcessed == d->mTotalItems) {
+ d->mDeliveryDone = true;
+ }
+ d->execute();
+}
+
+void ItemSync::setTotalItems(int amount)
+{
+ Q_D(ItemSync);
+ Q_ASSERT(!d->mIncremental);
+ Q_ASSERT(amount >= 0);
+ setStreamingEnabled(true);
+ qDebug() << amount;
+ d->mTotalItems = amount;
+ setTotalAmount(KJob::Bytes, amount);
+ if (d->mTotalItems == 0) {
+ d->mDeliveryDone = true;
+ d->execute();
+ }
+}
+
+void ItemSync::setIncrementalSyncItems(const Item::List &changedItems, const \
Item::List &removedItems) +{
++ /*
++ * We received an incremental listing of items:
++ * * for each changed item:
++ * ** If locally available => modify
++ * ** else => create
++ * * removed items can be removed right away
++ */
+ Q_D(ItemSync);
+ d->mIncremental = true;
+ if (!d->mStreaming) {
+ d->mDeliveryDone = true;
+ }
- d->mRemoteItems += changedItems;
- d->mRemovedRemoteItems += removedItems;
++ d->mRemoteItemQueue += changedItems;
++ d->mRemovedRemoteItemQueue += removedItems;
+ d->mTotalItemsProcessed += changedItems.count() + removedItems.count();
- setTotalAmount(KJob::Bytes, d->mTotalItemsProcessed);
++ kDebug() << "Received: " << changedItems.count() << "Removed: " << \
removedItems.count() << "In total: " << d->mTotalItemsProcessed << " Wanted: " << \
d->mTotalItems; + if (d->mTotalItemsProcessed == d->mTotalItems) {
+ d->mDeliveryDone = true;
+ }
+ d->execute();
+}
+
+void ItemSync::setFetchScope(ItemFetchScope &fetchScope)
+{
+ Q_D(ItemSync);
+ d->mFetchScope = fetchScope;
+}
+
+ItemFetchScope &ItemSync::fetchScope()
+{
+ Q_D(ItemSync);
+ return d->mFetchScope;
+}
+
+void ItemSync::doStart()
+{
+}
+
+bool ItemSync::updateItem(const Item &storedItem, Item &newItem)
+{
+ Q_D(ItemSync);
+ // we are in error state, better not change anything at all anymore
+ if (error()) {
+ return false;
+ }
+
+ /*
+ * We know that this item has changed (as it is part of the
+ * incremental changed list), so we just put it into the
+ * storage.
+ */
+ if (d->mIncremental) {
+ return true;
+ }
+
+ if (newItem.d_func()->mClearPayload) {
+ return true;
+ }
+
+ // Check whether the remote revisions differ
+ if (storedItem.remoteRevision() != newItem.remoteRevision()) {
+ return true;
+ }
+
+ // Check whether the flags differ
+ if (storedItem.flags() != newItem.flags()) {
+ qDebug() << "Stored flags " << storedItem.flags()
+ << "new flags " << newItem.flags();
+ return true;
+ }
+
+ // Check whether the new item contains unknown parts
+ QSet<QByteArray> missingParts = newItem.loadedPayloadParts();
+ missingParts.subtract(storedItem.loadedPayloadParts());
+ if (!missingParts.isEmpty()) {
+ return true;
+ }
+
+ // ### FIXME SLOW!!!
+ // If the available part identifiers don't differ, check
+ // whether the content of the payload differs
+ if (newItem.hasPayload()
+ && storedItem.payloadData() != newItem.payloadData()) {
+ return true;
+ }
+
+ // check if remote attributes have been changed
+ foreach (Attribute *attr, newItem.attributes()) {
+ if (!storedItem.hasAttribute(attr->type())) {
+ return true;
+ }
+ if (attr->serialized() != storedItem.attribute(attr->type())->serialized()) \
{ + return true;
+ }
+ }
+
+ return false;
+}
+
+void ItemSyncPrivate::fetchLocalItems()
+{
+ Q_Q( ItemSync );
- if (mLocalListStarted) {
- return;
- }
- mLocalListStarted = true;
+ ItemFetchJob* job;
- if ( mIncremental ) {
- if ( mRemoteItems.isEmpty() ) {
++ if (mIncremental) {
++ //Try fetching the items so we have their id and know if they're available
++ const Akonadi::Item::List itemsToFetch = mCurrentBatchRemoteItems + \
mCurrentBatchRemovedRemoteItems; ++ if (itemsToFetch.isEmpty()) {
+ // The fetch job produces an error with an empty set
- mLocalListDone = true;
- execute();
++ processBatch();
+ return;
+ }
+ // We need to fetch the items only to detect if they are new or modified
- job = new ItemFetchJob( mRemoteItems, q );
- job->setFetchScope( mFetchScope );
- job->setCollection( mSyncCollection );
++ job = new ItemFetchJob(itemsToFetch, q);
++ job->fetchScope().setFetchRemoteIdentification(true);
++ job->fetchScope().setFetchModificationTime(false);
++ job->setCollection(mSyncCollection);
++ job->setDeliveryOption(ItemFetchJob::EmitItemsIndividually);
+ // We use this to check if items are available locally, so errors are \
inevitable
- job->fetchScope().setIgnoreRetrievalErrors( true );
++ job->fetchScope().setIgnoreRetrievalErrors(true);
++ QObject::connect(job, SIGNAL(itemsReceived(Akonadi::Item::List)), q, \
SLOT(slotItemsReceived(Akonadi::Item::List))); + } else {
- job = new ItemFetchJob( mSyncCollection, q );
- job->setFetchScope( mFetchScope );
++ if (mFullListingDone) {
++ processBatch();
++ return;
++ }
++ //Otherwise we'll remove the created items again during the second run
++ mFullListingDone = true;
++ job = new ItemFetchJob(mSyncCollection, q);
++ job->fetchScope().setFetchRemoteIdentification(true);
++ job->fetchScope().setFetchModificationTime(false);
++ job->setDeliveryOption(ItemFetchJob::EmitItemsIndividually);
++ QObject::connect(job, SIGNAL(itemsReceived(Akonadi::Item::List)), q, \
SLOT(slotItemsReceived(Akonadi::Item::List))); + }
+
+ // we only can fetch parts already in the cache, otherwise this will deadlock
- job->fetchScope().setCacheOnly( true );
++ job->fetchScope().setCacheOnly(true);
+
- QObject::connect( job, SIGNAL(result(KJob*)), q, SLOT(slotLocalListDone(KJob*)) \
); ++ QObject::connect(job, SIGNAL(result(KJob*)), q, \
SLOT(slotLocalListDone(KJob*))); +}
+
- void ItemSyncPrivate::slotLocalListDone(KJob *job)
++void ItemSyncPrivate::slotItemsReceived(const Item::List &items)
+{
- if (!job->error()) {
- const Item::List list = static_cast<ItemFetchJob *>(job)->items();
- foreach (const Item &item, list) {
- if (item.remoteId().isEmpty()) {
- continue;
- }
- mLocalItemsById.insert(item.id(), item);
- mLocalItemsByRemoteId.insert(item.remoteId(), item);
- mUnprocessedLocalItems.insert(item);
++ foreach (const Akonadi::Item &item, items) {
++ //Don't delete items that have not yet been synchronized
++ if (item.remoteId().isEmpty()) {
++ continue;
++ }
++ if (mLocalIdByRid.contains(item.remoteId())) {
++ kWarning() << "Found multiple items with the same rid : " << \
item.remoteId() << item.id(); ++ } else {
++ mLocalIdByRid.insert(item.remoteId(), item.id());
++ }
++ if (!mIncremental) {
++ mUnprocessedLocalIds << item.id();
+ }
+ }
++}
+
- mLocalListDone = true;
- execute();
++void ItemSyncPrivate::slotLocalListDone(KJob *job)
++{
++ if (job->error()) {
++ kWarning() << job->errorString();
++ }
++ processBatch();
+}
+
+QString ItemSyncPrivate::jobDebuggingString() const /*Q_DECL_OVERRIDE*/
+{
+ // TODO: also print out mIncremental and mTotalItemsProcessed, but they are set \
after the job + // started, so this requires passing jobDebuggingString to \
jobEnded(). + return QString::fromLatin1("Collection %1 \
(%2)").arg(mSyncCollection.id()).arg(mSyncCollection.name()); +}
+
+void ItemSyncPrivate::execute()
+{
+ Q_Q(ItemSync);
- if (!mLocalListDone) {
- // Start fetching local items only once the delivery is done for \
incremental fetch,
- // so we can fetch only the required items
- if (mDeliveryDone || !mIncremental) {
- fetchLocalItems();
++ //shouldn't happen
++ if (mFinished) {
++ kWarning() << "Call to execute() on finished job.";
++ Q_ASSERT(false);
++ return;
++ }
++ if (mTransactionJobs > 0) {
++ kWarning() << "transaction still in progress " << mProcessingBatch;
++ return;
++ }
++ //not doing anything, start processing
++ if (!mProcessingBatch) {
++ if (mRemoteItemQueue.size() >= mBatchSize || mDeliveryDone) {
++ //we have a new batch to process
++ const int num = qMin(mBatchSize, mRemoteItemQueue.size());
++ for (int i = 0; i < num; i++) {
++ mCurrentBatchRemoteItems << mRemoteItemQueue.takeFirst();
++ }
++ mCurrentBatchRemovedRemoteItems += mRemovedRemoteItemQueue;
++ mRemovedRemoteItemQueue.clear();
++ } else {
++ //nothing to do, let's wait for more data
++ return;
+ }
++ mProcessingBatch = true;
++ fetchLocalItems();
+ return;
+ }
++ checkDone();
++}
+
- // early exit to avoid unnecessary TransactionSequence creation in \
MultipleTransactions mode
- // TODO: do the transaction handling in a nicer way instead, only creating \
TransactionSequences when really needed
- if (!mDeliveryDone && mRemoteItems.isEmpty()) {
++//process the current batch of items
++void ItemSyncPrivate::processBatch()
++{
++ Q_Q(ItemSync);
++ if (mCurrentBatchRemoteItems.isEmpty() && !mDeliveryDone) {
+ return;
+ }
+
- if ((mTransactionMode == ItemSync::SingleTransaction && !mCurrentTransaction) \
|| mTransactionMode == ItemSync::MultipleTransactions) {
- ++mTransactionJobs;
- mCurrentTransaction = new TransactionSequence(q);
- mCurrentTransaction->setAutomaticCommittingEnabled(false);
- QObject::connect(mCurrentTransaction, SIGNAL(result(KJob*)), q, \
SLOT(slotTransactionResult(KJob*)));
- }
++ //request a transaction, there are items that require processing
++ requestTransaction();
+
+ processItems();
- if (!mDeliveryDone) {
- if (mTransactionMode == ItemSync::MultipleTransactions && \
mCurrentTransaction) {
- mCurrentTransaction->commit();
- mCurrentTransaction = 0;
- }
- return;
- }
+
+ // removed
- if (!mIncremental) {
- mRemovedRemoteItems = mUnprocessedLocalItems.toList();
- mUnprocessedLocalItems.clear();
++ Akonadi::Item::List itemsToDelete;
++ if (!mIncremental && allProcessed()) {
++ //the full listing is done and we know which items to remove
++ foreach (Akonadi::Item::Id id, mUnprocessedLocalIds) {
++ itemsToDelete << Akonadi::Item(id);
++ }
++ mUnprocessedLocalIds.clear();
++ } else {
++ foreach (const Akonadi::Item &removedItem, mCurrentBatchRemovedRemoteItems) \
{ ++ if (!mLocalIdByRid.contains(removedItem.remoteId())) {
++ kWarning() << "cannot remove item because it's not available \
locally. RID: " << removedItem.remoteId(); ++ continue;
++ }
++ itemsToDelete << \
Akonadi::Item(mLocalIdByRid.value(removedItem.remoteId())); ++ }
++ mCurrentBatchRemovedRemoteItems.clear();
+ }
++ deleteItems(itemsToDelete);
+
- deleteItems(mRemovedRemoteItems);
- mLocalItemsById.clear();
- mLocalItemsByRemoteId.clear();
- mRemovedRemoteItems.clear();
-
- if (mCurrentTransaction) {
- mCurrentTransaction->commit();
- mCurrentTransaction = 0;
++ if (mIncremental) {
++ //no longer required, we processed all items of the current batch
++ mLocalIdByRid.clear();
+ }
-
+ checkDone();
+}
+
+void ItemSyncPrivate::processItems()
+{
+ Q_Q(ItemSync);
+ // added / updated
- foreach (Item remoteItem, mRemoteItems) { //krazy:exclude=foreach non-const \
is needed here
- #ifndef NDEBUG
++ foreach (const Item &remoteItem, mCurrentBatchRemoteItems) {
+ if (remoteItem.remoteId().isEmpty()) {
+ qWarning() << "Item " << remoteItem.id() << " does not have a remote \
identifier";
- }
- #endif
-
- Item localItem = mLocalItemsById.value(remoteItem.id());
- if (!localItem.isValid()) {
- localItem = mLocalItemsByRemoteId.value(remoteItem.remoteId());
- }
- mUnprocessedLocalItems.remove(localItem);
- // missing locally
- if (!localItem.isValid()) {
- createLocalItem(remoteItem);
+ continue;
+ }
+
- if (q->updateItem(localItem, remoteItem)) {
- mPendingJobs++;
-
- remoteItem.setId(localItem.id());
- remoteItem.setRevision(localItem.revision());
- remoteItem.setSize(localItem.size());
- remoteItem.setRemoteId(localItem.remoteId()); // in case someone \
clears remoteId by accident
- ItemModifyJob *mod = new ItemModifyJob(remoteItem, subjobParent());
- mod->disableRevisionCheck();
- q->connect(mod, SIGNAL(result(KJob*)), q, \
SLOT(slotLocalChangeDone(KJob*))); ++ //TODO also check by id and gid
++ //Locally available
++ if (mLocalIdByRid.contains(remoteItem.remoteId())) {
++ const Akonadi::Item::Id localId = \
mLocalIdByRid.value(remoteItem.remoteId()); ++ if (!mIncremental) {
++ mUnprocessedLocalIds.remove(localId);
++ }
++ modifyLocalItem(remoteItem, localId);
+ } else {
- mProgress++;
++ createLocalItem(remoteItem);
+ }
+ }
- mRemoteItems.clear();
++ mCurrentBatchRemoteItems.clear();
+}
+
- void ItemSyncPrivate::deleteItems(const Item::List &items)
++void ItemSyncPrivate::deleteItems(const Item::List &itemsToDelete)
+{
+ Q_Q(ItemSync);
+ // if in error state, better not change anything anymore
+ if (q->error()) {
+ return;
+ }
+
- Item::List itemsToDelete;
- foreach (const Item &item, items) {
- Item delItem(item);
- if (!mIncremental) {
- if (!item.isValid()) {
- delItem = mLocalItemsByRemoteId.value(item.remoteId());
- }
-
- if (!delItem.isValid()) {
- #ifndef NDEBUG
- qWarning() << "Delete item (remoteeId=" << item.remoteId()
- << "mimeType=" << item.mimeType()
- << ") does not have a valid UID and no item with that \
remote ID exists either";
- #endif
- continue;
- }
- }
-
- if (delItem.remoteId().isEmpty()) {
- // don't attempt to remove items that never were written to the backend
- continue;
- }
-
- itemsToDelete.append(delItem);
++ if (itemsToDelete.isEmpty()) {
++ return;
+ }
+
- if (!itemsToDelete.isEmpty()) {
- mPendingJobs++;
- ItemDeleteJob *job = new ItemDeleteJob(itemsToDelete, subjobParent());
- q->connect(job, SIGNAL(result(KJob*)), q, \
SLOT(slotLocalDeleteDone(KJob*)));
-
- // It can happen that the groupware servers report us deleted items
- // twice, in this case this item delete job will fail on the second try.
- // To avoid a rollback of the complete transaction we gracefully allow the \
job
- // to fail :)
- TransactionSequence *transaction = qobject_cast<TransactionSequence \
*>(subjobParent());
- if (transaction) {
- transaction->setIgnoreJobFailure(job);
- }
++ mPendingJobs++;
++ ItemDeleteJob *job = new ItemDeleteJob(itemsToDelete, subjobParent());
++ q->connect(job, SIGNAL(result(KJob*)), q, SLOT(slotLocalDeleteDone(KJob*)));
++
++ // It can happen that the groupware servers report us deleted items
++ // twice, in this case this item delete job will fail on the second try.
++ // To avoid a rollback of the complete transaction we gracefully allow the job
++ // to fail :)
++ TransactionSequence *transaction = qobject_cast<TransactionSequence \
*>(subjobParent()); ++ if (transaction) {
++ transaction->setIgnoreJobFailure(job);
+ }
+}
+
+void ItemSyncPrivate::slotLocalDeleteDone(KJob *)
+{
+ mPendingJobs--;
+ mProgress++;
+
+ checkDone();
+}
+
+void ItemSyncPrivate::slotLocalChangeDone(KJob *job)
+{
+ Q_UNUSED(job);
+ mPendingJobs--;
+ mProgress++;
+
+ checkDone();
+}
+
+void ItemSyncPrivate::slotTransactionResult(KJob *job)
+{
+ --mTransactionJobs;
+ if (mCurrentTransaction == job) {
+ mCurrentTransaction = 0;
+ }
+
+ checkDone();
+}
+
++void ItemSyncPrivate::requestTransaction()
++{
++ Q_Q(ItemSync);
++ //we never want parallel transactions, single transaction just makes one big \
transaction, and multi transaction uses multiple transaction sequentially ++ if \
(!mCurrentTransaction) { ++ ++mTransactionJobs;
++ mCurrentTransaction = new TransactionSequence(q);
++ mCurrentTransaction->setAutomaticCommittingEnabled(false);
++ QObject::connect(mCurrentTransaction, SIGNAL(result(KJob*)), q, \
SLOT(slotTransactionResult(KJob*))); ++ }
++}
++
+Job *ItemSyncPrivate::subjobParent() const
+{
+ Q_Q(const ItemSync);
+ if (mCurrentTransaction && mTransactionMode != ItemSync::NoTransaction) {
+ return mCurrentTransaction;
+ }
+ return const_cast<ItemSync *>(q);
+}
+
+void ItemSync::setStreamingEnabled(bool enable)
+{
+ Q_D(ItemSync);
+ d->mStreaming = enable;
+}
+
+void ItemSync::deliveryDone()
+{
+ Q_D(ItemSync);
+ Q_ASSERT(d->mStreaming);
+ d->mDeliveryDone = true;
+ d->execute();
+}
+
+void ItemSync::slotResult(KJob *job)
+{
+ if (job->error()) {
+ // pretend there were no errors
+ Akonadi::Job::removeSubjob(job);
+ // propagate the first error we got but continue, we might still be fed \
with stuff from a resource + if (!error()) {
+ setError(job->error());
+ setErrorText(job->errorText());
+ }
+ } else {
+ Akonadi::Job::slotResult(job);
+ }
+}
+
+void ItemSync::rollback()
+{
+ Q_D(ItemSync);
+ setError(UserCanceled);
+ if (d->mCurrentTransaction) {
+ d->mCurrentTransaction->rollback();
+ }
+ d->mDeliveryDone = true; // user won't deliver more data
+ d->execute(); // end this in an ordered way, since we have an error set no real \
change will be done +}
+
+void ItemSync::setTransactionMode(ItemSync::TransactionMode mode)
+{
+ Q_D(ItemSync);
+ d->mTransactionMode = mode;
+}
+
++int ItemSync::batchSize() const
++{
++ Q_D(const ItemSync);
++ return d->mBatchSize;
++}
++
++void ItemSync::setBatchSize(int size)
++{
++ Q_D(ItemSync);
++ d->mBatchSize = size;
++}
++
++
+#include "moc_itemsync.cpp"
diff --cc akonadi/src/core/itemsync.h
index ba3912e,0000000..dabba64
mode 100644,000000..100644
--- a/akonadi/src/core/itemsync.h
+++ b/akonadi/src/core/itemsync.h
@@@ -1,199 -1,0 +1,230 @@@
+/*
+ Copyright (c) 2007 Tobias Koenig <tokoe@kde.org>
+
+ This library is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Library General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or (at your
+ option) any later version.
+
+ This library is distributed in the hope that it will be useful, but WITHOUT
+ ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
+ License for more details.
+
+ You should have received a copy of the GNU Library General Public License
+ along with this library; see the file COPYING.LIB. If not, write to the
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ 02110-1301, USA.
+*/
+
+#ifndef AKONADI_ITEMSYNC_H
+#define AKONADI_ITEMSYNC_H
+
+#include "akonadicore_export.h"
+#include "item.h"
+#include "job.h"
+
+namespace Akonadi {
+
+class Collection;
+class ItemFetchScope;
+class ItemSyncPrivate;
+
+/**
+ * @short Syncs between items known to a client (usually a resource) and the \
Akonadi storage. + *
+ * Remote Id must only be set by the resource storing the item, other clients
+ * should leave it empty, since the resource responsible for the target collection
+ * will be notified about the addition and then create a suitable remote Id.
+ *
+ * There are two different forms of ItemSync usage:
+ * - Full-Sync: meaning the client provides all valid items, i.e. any item not
+ * part of the list but currently stored in Akonadi will be removed
+ * - Incremental-Sync: meaning the client provides two lists, one for items which
+ * are new or modified and one for items which should be removed. Any item not
+ * part of either list but currently stored in Akonadi will not be changed.
+ *
+ * @note This is provided for convenience to implement "save all" like behavior,
+ * however it is strongly recommended to use single item jobs whenever
+ * possible, e.g. ItemCreateJob, ItemModifyJob and ItemDeleteJob
+ *
+ * @author Tobias Koenig <tokoe@kde.org>
+ */
+class AKONADICORE_EXPORT ItemSync : public Job
+{
+ Q_OBJECT
+
+public:
+ /**
+ * Creates a new item synchronizer.
+ *
+ * @param collection The collection we are syncing.
+ * @param parent The parent object.
+ */
+ explicit ItemSync(const Collection &collection, QObject *parent = 0);
+
+ /**
+ * Destroys the item synchronizer.
+ */
+ ~ItemSync();
+
+ /**
+ * Sets the full item list for the collection.
+ *
+ * Usually the result of a full item listing.
+ *
+ * @warning If the client using this is a resource, all items must have
+ * a valid remote identifier.
+ *
+ * @param items A list of items.
+ */
+ void setFullSyncItems(const Item::List &items);
+
+ /**
+ * Set the amount of items which you are going to return in total
+ * by using the setFullSyncItems() method.
+ *
+ * @param amount The amount of items in total.
+ */
+ void setTotalItems(int amount);
+
+ /**
+ Enable item streaming. Item streaming means that the items delivered by \
setXItems() calls + are delivered in chunks and you manually indicate when all \
items have been delivered + by calling deliveryDone().
+ @param enable @c true to enable item streaming
+ */
+ void setStreamingEnabled(bool enable);
+
+ /**
+ Notify ItemSync that all remote items have been delivered.
+ Only call this in streaming mode.
+ */
+ void deliveryDone();
+
+ /**
+ * Sets the item lists for incrementally syncing the collection.
+ *
+ * Usually the result of an incremental remote item listing.
+ *
+ * @warning If the client using this is a resource, all items must have
+ * a valid remote identifier.
+ *
+ * @param changedItems A list of items added or changed by the client.
+ * @param removedItems A list of items deleted by the client.
+ */
+ void setIncrementalSyncItems(const Item::List &changedItems,
+ const Item::List &removedItems);
+
+ /**
+ * Sets the item fetch scope.
+ *
+ * The ItemFetchScope controls how much of an item's data is fetched
+ * from the server, e.g. whether to fetch the full item payload or
+ * only meta data.
+ *
+ * @param fetchScope The new scope for item fetch operations.
+ *
+ * @see fetchScope()
+ */
+ void setFetchScope(ItemFetchScope &fetchScope);
+
+ /**
+ * Returns the item fetch scope.
+ *
+ * Since this returns a reference it can be used to conveniently modify the
+ * current scope in-place, i.e. by calling a method on the returned reference
+ * without storing it in a local variable. See the ItemFetchScope documentation
+ * for an example.
+ *
+ * @return a reference to the current item fetch scope
+ *
+ * @see setFetchScope() for replacing the current item fetch scope
+ */
+ ItemFetchScope &fetchScope();
+
+ /**
+ * Aborts the sync process and rolls back all not yet committed transactions.
+ * Use this if an external error occurred during the sync process (such as the
+ * user canceling it).
+ * @since 4.5
+ */
+ void rollback();
+
+ /**
+ * Transaction mode used by ItemSync.
+ * @since 4.6
+ */
+ enum TransactionMode {
+ SingleTransaction, ///< Use a single transaction for the entire sync \
process (default), provides maximum consistency ("all or nothing") and best \
performance + MultipleTransactions, ///< Use one transaction per chunk of \
delivered items, good compromise between the other two when using streaming + \
NoTransaction ///< Use no transaction at all, provides highest responsiveness (might \
therefore feel faster even when actually taking slightly longer), no consistency \
guaranteed (can fail anywhere in the sync process) + };
+
+ /**
+ * Set the transaction mode to use for this sync.
+ * @note You must call this method before starting the sync, changes afterwards \
lead to undefined results. + * @param mode the transaction mode to use
+ * @since 4.6
+ */
+ void setTransactionMode(TransactionMode mode);
+
++ /**
++ * Minimum number of items required to start processing in streaming mode.
++ * When MultipleTransactions is used, one transaction per batch will be \
created. ++ *
++ * @see setBatchSize()
++ * @since 4.14
++ */
++ int batchSize() const;
++
++ /**
++ * Set the batch size.
++ *
++ * The default is 10.
++ *
++ * @note You must call this method before starting the sync, changes afterwards \
lead to undefined results. ++ * @see batchSize()
++ * @since 4.14
++ */
++ void setBatchSize(int);
++
++Q_SIGNALS:
++ /**
++ * Signals the resource that new items can be delivered.
++ * @param remainingBatchSize the number of items required to complete the batch \
(typically the same as batchSize()) ++ *
++ * @since 4.14
++ */
++ void readyForNextBatch(int remainingBatchSize);
++
+protected:
+ void doStart();
+ void slotResult(KJob *job);
+
+ /**
+ * Reimplement this method to customize the synchronization algorithm.
+ * @param storedItem the item as it is now
+ * @param newItem the item as it should be
+ * You can update the @p newItem according to the @p storedItem before
+ * it gets committed.
+ */
+ virtual bool updateItem(const Item &storedItem, Item &newItem);
+
+private:
+ //@cond PRIVATE
+ Q_DECLARE_PRIVATE(ItemSync)
+ ItemSyncPrivate *dummy; // for BC. KF5 TODO: REMOVE.
+
+ Q_PRIVATE_SLOT(d_func(), void slotLocalListDone(KJob *))
+ Q_PRIVATE_SLOT(d_func(), void slotLocalDeleteDone(KJob *))
+ Q_PRIVATE_SLOT(d_func(), void slotLocalChangeDone(KJob *))
+ Q_PRIVATE_SLOT(d_func(), void slotTransactionResult(KJob *))
++ Q_PRIVATE_SLOT(d_func(), void slotItemsReceived(const Akonadi::Item::List &))
++ Q_PRIVATE_SLOT(d_func(), void slotLocalFetchDone(KJob *))
+ //@endcond
+};
+
+}
+
+#endif
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic