[prev in list] [next in list] [prev in thread] [next in thread] 

List:       kde-commits
Subject:    [sink/develop] /: The synchronization call can be sync.
From:       Christian Mollekopf <chrigi_1 () fastmail ! fm>
Date:       2016-12-02 10:08:34
Message-ID: E1cCkle-0004b8-Qp () code ! kde ! org
[Download RAW message or body]

Git commit 6a072b2dcf23cbcdb210f2bd5c273ea0f425b188 by Christian Mollekopf.
Committed on 29/11/2016 at 10:27.
Pushed by cmollekopf into branch 'develop'.

The synchronization call can be sync.

... because we really just enqueue the request and then wait for the
notification.

M  +5    -43   common/commandprocessor.cpp
M  +0    -1    common/commandprocessor.h
M  +2    -1    common/genericresource.cpp
M  +5    -0    common/notifier.cpp
M  +1    -0    common/notifier.h
M  +36   -7    common/synchronizer.cpp
M  +7    -6    common/synchronizer.h
M  +16   -3    tests/mailsynctest.cpp

https://commits.kde.org/sink/6a072b2dcf23cbcdb210f2bd5c273ea0f425b188

diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp
index fccff22..8eb0ef1 100644
--- a/common/commandprocessor.cpp
+++ b/common/commandprocessor.cpp
@@ -128,17 +128,7 @@ void CommandProcessor::processSynchronizeCommand(const QByteArray &data)
             QDataStream stream(&data, QIODevice::ReadOnly);
             stream >> query;
         }
-        synchronizeWithSource(query)
-            .then<void>([timer](const KAsync::Error &error) {
-                if (error) {
-                    SinkWarning() << "Sync failed: " << error.errorMessage;
-                    return KAsync::error(error);
-                } else {
-                    SinkTrace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed());
-                    return KAsync::null();
-                }
-            })
-            .exec();
+        mSynchronizer->synchronize(query);
     } else {
         SinkWarning() << "received invalid command";
     }
@@ -156,34 +146,6 @@ void CommandProcessor::processSynchronizeCommand(const QByteArray &data)
 //     loadResource().setLowerBoundRevision(lowerBoundRevision());
 // }
 
-KAsync::Job<void> CommandProcessor::synchronizeWithSource(const Sink::QueryBase &query)
-{
-    return KAsync::start<void>([this, query] {
-        Sink::Notification n;
-        n.id = "sync";
-        n.type = Sink::Notification::Status;
-        n.message = "Synchronization has started.";
-        n.code = Sink::ApplicationDomain::BusyStatus;
-        emit notify(n);
-
-        SinkLog() << " Synchronizing";
-        return mSynchronizer->synchronize(query)
-            .then<void>([this](const KAsync::Error &error) {
-                if (!error) {
-                    SinkLog() << "Done Synchronizing";
-                    Sink::Notification n;
-                    n.id = "sync";
-                    n.type = Sink::Notification::Status;
-                    n.message = "Synchronization has ended.";
-                    n.code = Sink::ApplicationDomain::ConnectedStatus;
-                    emit notify(n);
-                    return KAsync::null();
-                }
-                return KAsync::error(error);
-            });
-    });
-}
-
 void CommandProcessor::setOldestUsedRevision(qint64 revision)
 {
     mLowerBoundRevision = revision;
@@ -337,17 +299,17 @@ void CommandProcessor::setSynchronizer(const QSharedPointer<Synchronizer> &synch
     QObject::connect(mSynchronizer.data(), &Synchronizer::replayingChanges, [this]() {
         Sink::Notification n;
         n.id = "changereplay";
-        n.type = Sink::Notification::Status;
+        n.type = Notification::Status;
         n.message = "Replaying changes.";
-        n.code = Sink::ApplicationDomain::BusyStatus;
+        n.code = ApplicationDomain::BusyStatus;
         emit notify(n);
     });
     QObject::connect(mSynchronizer.data(), &Synchronizer::changesReplayed, [this]() {
         Sink::Notification n;
         n.id = "changereplay";
-        n.type = Sink::Notification::Status;
+        n.type = Notification::Status;
         n.message = "All changes have been replayed.";
-        n.code = Sink::ApplicationDomain::ConnectedStatus;
+        n.code = ApplicationDomain::ConnectedStatus;
         emit notify(n);
     });
 
diff --git a/common/commandprocessor.h b/common/commandprocessor.h
index a807f46..81f93e5 100644
--- a/common/commandprocessor.h
+++ b/common/commandprocessor.h
@@ -78,7 +78,6 @@ private:
     // void processRevisionReplayedCommand(const QByteArray &data);
 
     KAsync::Job<void> flush(void const *command, size_t size);
-    KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query);
 
     Sink::Pipeline *mPipeline;
     MessageQueue mUserQueue;
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 5819a07..c11e899 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -100,7 +100,8 @@ void GenericResource::processCommand(int commandId, const QByteArray &data)
 
 KAsync::Job<void> GenericResource::synchronizeWithSource(const Sink::QueryBase &query)
 {
-    return mSynchronizer->synchronize(query);
+    mSynchronizer->synchronize(query);
+    return KAsync::null<void>();
 }
 
 KAsync::Job<void> GenericResource::processAllMessages()
diff --git a/common/notifier.cpp b/common/notifier.cpp
index 94ac84e..53db5be 100644
--- a/common/notifier.cpp
+++ b/common/notifier.cpp
@@ -23,6 +23,7 @@
 #include <functional>
 
 #include "resourceaccess.h"
+#include "resourceconfig.h"
 #include "log.h"
 
 using namespace Sink;
@@ -60,6 +61,10 @@ Notifier::Notifier(const QByteArray &instanceIdentifier, const QByteArray &resou
     d->resourceAccess << resourceAccess;
 }
 
+Notifier::Notifier(const QByteArray &instanceIdentifier) : Notifier(instanceIdentifier, ResourceConfig::getResourceType(instanceIdentifier))
+{
+}
+
 void Notifier::registerHandler(std::function<void(const Notification &)> handler)
 {
     d->handler << handler;
diff --git a/common/notifier.h b/common/notifier.h
index 3d61e95..df8f34b 100644
--- a/common/notifier.h
+++ b/common/notifier.h
@@ -36,6 +36,7 @@ class SINK_EXPORT Notifier
 {
 public:
     Notifier(const QSharedPointer<ResourceAccess> &resourceAccess);
+    Notifier(const QByteArray &resourceInstanceIdentifier);
     Notifier(const QByteArray &resourceInstanceIdentifier, const QByteArray &resourceType);
     void registerHandler(std::function<void(const Notification &)>);
 
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index 11c7caf..6483cdf 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -38,7 +38,8 @@ Synchronizer::Synchronizer(const Sink::ResourceContext &context)
     : ChangeReplay(context),
     mResourceContext(context),
     mEntityStore(Storage::EntityStore::Ptr::create(mResourceContext)),
-    mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite)
+    mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite),
+    mSyncInProgress(false)
 {
     SinkTrace() << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId();
 }
@@ -254,15 +255,15 @@ void Synchronizer::modify(const DomainType &entity)
 QList<Synchronizer::SyncRequest> Synchronizer::getSyncRequests(const Sink::QueryBase &query)
 {
     QList<Synchronizer::SyncRequest> list;
-    list << Synchronizer::SyncRequest{query};
+    list << Synchronizer::SyncRequest{query, "sync"};
     return list;
 }
 
-KAsync::Job<void> Synchronizer::synchronize(const Sink::QueryBase &query)
+void Synchronizer::synchronize(const Sink::QueryBase &query)
 {
     SinkTrace() << "Synchronizing";
     mSyncRequestQueue << getSyncRequests(query);
-    return processSyncQueue();
+    processSyncQueue().exec();
 }
 
 void Synchronizer::flush(int commandId, const QByteArray &flushId)
@@ -284,20 +285,48 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
     while (!mSyncRequestQueue.isEmpty()) {
         auto request = mSyncRequestQueue.takeFirst();
         if (request.requestType == Synchronizer::SyncRequest::Synchronization) {
-            job = job.then(synchronizeWithSource(request.query)).syncThen<void>([this] {
+            job = job.syncThen<void>([this, request] {
+                Sink::Notification n;
+                n.id = request.requestId;
+                n.type = Notification::Status;
+                n.message = "Synchronization has started.";
+                n.code = ApplicationDomain::BusyStatus;
+                emit notify(n);
+            }).then(synchronizeWithSource(request.query)).syncThen<void>([this] {
                 //Commit after every request, so implementations only have to commit more if they add a lot of data.
                 commit();
+            }).then<void>([this, request](const KAsync::Error &error) {
+                if (error) {
+                    //Emit notification with error
+                    SinkWarning() << "Synchronization failed: " << error.errorMessage;
+                    Sink::Notification n;
+                    n.id = request.requestId;
+                    n.type = Notification::Status;
+                    n.message = "Synchronization has ended.";
+                    n.code = ApplicationDomain::ErrorStatus;
+                    emit notify(n);
+                    return KAsync::error(error);
+                } else {
+                    SinkLog() << "Done Synchronizing";
+                    Sink::Notification n;
+                    n.id = request.requestId;
+                    n.type = Notification::Status;
+                    n.message = "Synchronization has ended.";
+                    n.code = ApplicationDomain::ConnectedStatus;
+                    emit notify(n);
+                    return KAsync::null();
+                }
             });
         } else if (request.requestType == Synchronizer::SyncRequest::Flush) {
             if (request.flushType == Flush::FlushReplayQueue) {
                 SinkTrace() << "Emitting flush completion.";
                 Sink::Notification n;
                 n.type = Sink::Notification::FlushCompletion;
-                n.id = request.flushId;
+                n.id = request.requestId;
                 emit notify(n);
             } else {
                 flatbuffers::FlatBufferBuilder fbb;
-                auto flushId = fbb.CreateString(request.flushId);
+                auto flushId = fbb.CreateString(request.requestId);
                 auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization));
                 Sink::Commands::FinishFlushBuffer(fbb, location);
                 enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb));
diff --git a/common/synchronizer.h b/common/synchronizer.h
index 989f902..f9b834e 100644
--- a/common/synchronizer.h
+++ b/common/synchronizer.h
@@ -44,7 +44,7 @@ public:
     virtual ~Synchronizer();
 
     void setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback, MessageQueue &messageQueue);
-    KAsync::Job<void> synchronize(const Sink::QueryBase &query);
+    void synchronize(const Sink::QueryBase &query);
     void flush(int commandId, const QByteArray &flushId);
 
     //Read only access to main storage
@@ -123,8 +123,9 @@ protected:
             Flush
         };
 
-        SyncRequest(const Sink::QueryBase &q)
-            : requestType(Synchronization),
+        SyncRequest(const Sink::QueryBase &q, const QByteArray &requestId_ = QByteArray())
+            : requestId(requestId_),
+            requestType(Synchronization),
             query(q)
         {
         }
@@ -134,15 +135,15 @@ protected:
         {
         }
 
-        SyncRequest(RequestType type, int flushType_, const QByteArray &flushId_)
+        SyncRequest(RequestType type, int flushType_, const QByteArray &requestId_)
             : flushType(flushType_),
-            flushId(flushId_),
+            requestId(requestId_),
             requestType(type)
         {
         }
 
         int flushType = 0;
-        QByteArray flushId;
+        QByteArray requestId;
         RequestType requestType;
         Sink::QueryBase query;
     };
diff --git a/tests/mailsynctest.cpp b/tests/mailsynctest.cpp
index 3927f14..72dbfd5 100644
--- a/tests/mailsynctest.cpp
+++ b/tests/mailsynctest.cpp
@@ -25,6 +25,8 @@
 
 #include "store.h"
 #include "resourcecontrol.h"
+#include "notifier.h"
+#include "notification.h"
 #include "log.h"
 #include "test.h"
 
@@ -411,10 +413,21 @@ void MailSyncTest::testFailingSync()
     Sink::Query query;
     query.resourceFilter(resource.identifier());
 
+    bool errorReceived = false;
+
+    //We currently don't get a proper error notification except for the status change
+    auto notifier = QSharedPointer<Sink::Notifier>::create(resource.identifier());
+    notifier->registerHandler([&](const Notification &notification) {
+        SinkTrace() << "Received notification " << notification.type << notification.id;
+        if (notification.code == ApplicationDomain::ErrorStatus) {
+            errorReceived = true;
+            SinkWarning() << "Sync return an error";
+        } 
+    });
+
+    VERIFYEXEC(Store::synchronize(query));
     // Ensure sync fails if resource is misconfigured
-    auto future = Store::synchronize(query).exec();
-    future.waitForFinished();
-    QVERIFY(future.errorCode());
+    QTRY_VERIFY(errorReceived);
 }
 
 #include "mailsynctest.moc"


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic