[prev in list] [next in list] [prev in thread] [next in thread]
List: kde-commits
Subject: [sink/develop] /: The synchronization call can be sync.
From: Christian Mollekopf <chrigi_1 () fastmail ! fm>
Date: 2016-12-02 10:08:34
Message-ID: E1cCkle-0004b8-Qp () code ! kde ! org
[Download RAW message or body]
Git commit 6a072b2dcf23cbcdb210f2bd5c273ea0f425b188 by Christian Mollekopf.
Committed on 29/11/2016 at 10:27.
Pushed by cmollekopf into branch 'develop'.
The synchronization call can be sync.
... because we really just enqueue the request and then wait for the
notification.
M +5 -43 common/commandprocessor.cpp
M +0 -1 common/commandprocessor.h
M +2 -1 common/genericresource.cpp
M +5 -0 common/notifier.cpp
M +1 -0 common/notifier.h
M +36 -7 common/synchronizer.cpp
M +7 -6 common/synchronizer.h
M +16 -3 tests/mailsynctest.cpp
https://commits.kde.org/sink/6a072b2dcf23cbcdb210f2bd5c273ea0f425b188
diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp
index fccff22..8eb0ef1 100644
--- a/common/commandprocessor.cpp
+++ b/common/commandprocessor.cpp
@@ -128,17 +128,7 @@ void CommandProcessor::processSynchronizeCommand(const \
QByteArray &data) QDataStream stream(&data, QIODevice::ReadOnly);
stream >> query;
}
- synchronizeWithSource(query)
- .then<void>([timer](const KAsync::Error &error) {
- if (error) {
- SinkWarning() << "Sync failed: " << error.errorMessage;
- return KAsync::error(error);
- } else {
- SinkTrace() << "Sync took " << \
Sink::Log::TraceTime(timer->elapsed());
- return KAsync::null();
- }
- })
- .exec();
+ mSynchronizer->synchronize(query);
} else {
SinkWarning() << "received invalid command";
}
@@ -156,34 +146,6 @@ void CommandProcessor::processSynchronizeCommand(const \
QByteArray &data) // loadResource().setLowerBoundRevision(lowerBoundRevision());
// }
-KAsync::Job<void> CommandProcessor::synchronizeWithSource(const Sink::QueryBase \
&query)
-{
- return KAsync::start<void>([this, query] {
- Sink::Notification n;
- n.id = "sync";
- n.type = Sink::Notification::Status;
- n.message = "Synchronization has started.";
- n.code = Sink::ApplicationDomain::BusyStatus;
- emit notify(n);
-
- SinkLog() << " Synchronizing";
- return mSynchronizer->synchronize(query)
- .then<void>([this](const KAsync::Error &error) {
- if (!error) {
- SinkLog() << "Done Synchronizing";
- Sink::Notification n;
- n.id = "sync";
- n.type = Sink::Notification::Status;
- n.message = "Synchronization has ended.";
- n.code = Sink::ApplicationDomain::ConnectedStatus;
- emit notify(n);
- return KAsync::null();
- }
- return KAsync::error(error);
- });
- });
-}
-
void CommandProcessor::setOldestUsedRevision(qint64 revision)
{
mLowerBoundRevision = revision;
@@ -337,17 +299,17 @@ void CommandProcessor::setSynchronizer(const \
QSharedPointer<Synchronizer> &synch
QObject::connect(mSynchronizer.data(), &Synchronizer::replayingChanges, [this]() \
{ Sink::Notification n;
n.id = "changereplay";
- n.type = Sink::Notification::Status;
+ n.type = Notification::Status;
n.message = "Replaying changes.";
- n.code = Sink::ApplicationDomain::BusyStatus;
+ n.code = ApplicationDomain::BusyStatus;
emit notify(n);
});
QObject::connect(mSynchronizer.data(), &Synchronizer::changesReplayed, [this]() \
{ Sink::Notification n;
n.id = "changereplay";
- n.type = Sink::Notification::Status;
+ n.type = Notification::Status;
n.message = "All changes have been replayed.";
- n.code = Sink::ApplicationDomain::ConnectedStatus;
+ n.code = ApplicationDomain::ConnectedStatus;
emit notify(n);
});
diff --git a/common/commandprocessor.h b/common/commandprocessor.h
index a807f46..81f93e5 100644
--- a/common/commandprocessor.h
+++ b/common/commandprocessor.h
@@ -78,7 +78,6 @@ private:
// void processRevisionReplayedCommand(const QByteArray &data);
KAsync::Job<void> flush(void const *command, size_t size);
- KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query);
Sink::Pipeline *mPipeline;
MessageQueue mUserQueue;
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 5819a07..c11e899 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -100,7 +100,8 @@ void GenericResource::processCommand(int commandId, const \
QByteArray &data)
KAsync::Job<void> GenericResource::synchronizeWithSource(const Sink::QueryBase \
&query) {
- return mSynchronizer->synchronize(query);
+ mSynchronizer->synchronize(query);
+ return KAsync::null<void>();
}
KAsync::Job<void> GenericResource::processAllMessages()
diff --git a/common/notifier.cpp b/common/notifier.cpp
index 94ac84e..53db5be 100644
--- a/common/notifier.cpp
+++ b/common/notifier.cpp
@@ -23,6 +23,7 @@
#include <functional>
#include "resourceaccess.h"
+#include "resourceconfig.h"
#include "log.h"
using namespace Sink;
@@ -60,6 +61,10 @@ Notifier::Notifier(const QByteArray &instanceIdentifier, const \
QByteArray &resou d->resourceAccess << resourceAccess;
}
+Notifier::Notifier(const QByteArray &instanceIdentifier) : \
Notifier(instanceIdentifier, ResourceConfig::getResourceType(instanceIdentifier)) +{
+}
+
void Notifier::registerHandler(std::function<void(const Notification &)> handler)
{
d->handler << handler;
diff --git a/common/notifier.h b/common/notifier.h
index 3d61e95..df8f34b 100644
--- a/common/notifier.h
+++ b/common/notifier.h
@@ -36,6 +36,7 @@ class SINK_EXPORT Notifier
{
public:
Notifier(const QSharedPointer<ResourceAccess> &resourceAccess);
+ Notifier(const QByteArray &resourceInstanceIdentifier);
Notifier(const QByteArray &resourceInstanceIdentifier, const QByteArray \
&resourceType); void registerHandler(std::function<void(const Notification &)>);
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index 11c7caf..6483cdf 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -38,7 +38,8 @@ Synchronizer::Synchronizer(const Sink::ResourceContext &context)
: ChangeReplay(context),
mResourceContext(context),
mEntityStore(Storage::EntityStore::Ptr::create(mResourceContext)),
- mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + \
".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite) + \
mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + \
".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite), + \
mSyncInProgress(false) {
SinkTrace() << "Starting synchronizer: " << mResourceContext.resourceType << \
mResourceContext.instanceId(); }
@@ -254,15 +255,15 @@ void Synchronizer::modify(const DomainType &entity)
QList<Synchronizer::SyncRequest> Synchronizer::getSyncRequests(const Sink::QueryBase \
&query) {
QList<Synchronizer::SyncRequest> list;
- list << Synchronizer::SyncRequest{query};
+ list << Synchronizer::SyncRequest{query, "sync"};
return list;
}
-KAsync::Job<void> Synchronizer::synchronize(const Sink::QueryBase &query)
+void Synchronizer::synchronize(const Sink::QueryBase &query)
{
SinkTrace() << "Synchronizing";
mSyncRequestQueue << getSyncRequests(query);
- return processSyncQueue();
+ processSyncQueue().exec();
}
void Synchronizer::flush(int commandId, const QByteArray &flushId)
@@ -284,20 +285,48 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
while (!mSyncRequestQueue.isEmpty()) {
auto request = mSyncRequestQueue.takeFirst();
if (request.requestType == Synchronizer::SyncRequest::Synchronization) {
- job = job.then(synchronizeWithSource(request.query)).syncThen<void>([this] \
{ + job = job.syncThen<void>([this, request] {
+ Sink::Notification n;
+ n.id = request.requestId;
+ n.type = Notification::Status;
+ n.message = "Synchronization has started.";
+ n.code = ApplicationDomain::BusyStatus;
+ emit notify(n);
+ }).then(synchronizeWithSource(request.query)).syncThen<void>([this] {
//Commit after every request, so implementations only have to commit \
more if they add a lot of data. commit();
+ }).then<void>([this, request](const KAsync::Error &error) {
+ if (error) {
+ //Emit notification with error
+ SinkWarning() << "Synchronization failed: " << \
error.errorMessage; + Sink::Notification n;
+ n.id = request.requestId;
+ n.type = Notification::Status;
+ n.message = "Synchronization has ended.";
+ n.code = ApplicationDomain::ErrorStatus;
+ emit notify(n);
+ return KAsync::error(error);
+ } else {
+ SinkLog() << "Done Synchronizing";
+ Sink::Notification n;
+ n.id = request.requestId;
+ n.type = Notification::Status;
+ n.message = "Synchronization has ended.";
+ n.code = ApplicationDomain::ConnectedStatus;
+ emit notify(n);
+ return KAsync::null();
+ }
});
} else if (request.requestType == Synchronizer::SyncRequest::Flush) {
if (request.flushType == Flush::FlushReplayQueue) {
SinkTrace() << "Emitting flush completion.";
Sink::Notification n;
n.type = Sink::Notification::FlushCompletion;
- n.id = request.flushId;
+ n.id = request.requestId;
emit notify(n);
} else {
flatbuffers::FlatBufferBuilder fbb;
- auto flushId = fbb.CreateString(request.flushId);
+ auto flushId = fbb.CreateString(request.requestId);
auto location = Sink::Commands::CreateFlush(fbb, flushId, \
static_cast<int>(Sink::Flush::FlushSynchronization)); \
Sink::Commands::FinishFlushBuffer(fbb, location);
enqueueCommand(Sink::Commands::FlushCommand, \
BufferUtils::extractBuffer(fbb));
diff --git a/common/synchronizer.h b/common/synchronizer.h
index 989f902..f9b834e 100644
--- a/common/synchronizer.h
+++ b/common/synchronizer.h
@@ -44,7 +44,7 @@ public:
virtual ~Synchronizer();
void setup(const std::function<void(int commandId, const QByteArray &data)> \
&enqueueCommandCallback, MessageQueue &messageQueue);
- KAsync::Job<void> synchronize(const Sink::QueryBase &query);
+ void synchronize(const Sink::QueryBase &query);
void flush(int commandId, const QByteArray &flushId);
//Read only access to main storage
@@ -123,8 +123,9 @@ protected:
Flush
};
- SyncRequest(const Sink::QueryBase &q)
- : requestType(Synchronization),
+ SyncRequest(const Sink::QueryBase &q, const QByteArray &requestId_ = \
QByteArray()) + : requestId(requestId_),
+ requestType(Synchronization),
query(q)
{
}
@@ -134,15 +135,15 @@ protected:
{
}
- SyncRequest(RequestType type, int flushType_, const QByteArray &flushId_)
+ SyncRequest(RequestType type, int flushType_, const QByteArray &requestId_)
: flushType(flushType_),
- flushId(flushId_),
+ requestId(requestId_),
requestType(type)
{
}
int flushType = 0;
- QByteArray flushId;
+ QByteArray requestId;
RequestType requestType;
Sink::QueryBase query;
};
diff --git a/tests/mailsynctest.cpp b/tests/mailsynctest.cpp
index 3927f14..72dbfd5 100644
--- a/tests/mailsynctest.cpp
+++ b/tests/mailsynctest.cpp
@@ -25,6 +25,8 @@
#include "store.h"
#include "resourcecontrol.h"
+#include "notifier.h"
+#include "notification.h"
#include "log.h"
#include "test.h"
@@ -411,10 +413,21 @@ void MailSyncTest::testFailingSync()
Sink::Query query;
query.resourceFilter(resource.identifier());
+ bool errorReceived = false;
+
+ //We currently don't get a proper error notification except for the status \
change + auto notifier = \
QSharedPointer<Sink::Notifier>::create(resource.identifier()); + \
notifier->registerHandler([&](const Notification &notification) { + \
SinkTrace() << "Received notification " << notification.type << notification.id; + \
if (notification.code == ApplicationDomain::ErrorStatus) { + errorReceived \
= true; + SinkWarning() << "Sync return an error";
+ }
+ });
+
+ VERIFYEXEC(Store::synchronize(query));
// Ensure sync fails if resource is misconfigured
- auto future = Store::synchronize(query).exec();
- future.waitForFinished();
- QVERIFY(future.errorCode());
+ QTRY_VERIFY(errorReceived);
}
#include "mailsynctest.moc"
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic