[prev in list] [next in list] [prev in thread] [next in thread]
List: kde-commits
Subject: [sink/develop] /: Added the flush command.
From: Christian Mollekopf <chrigi_1 () fastmail ! fm>
Date: 2016-12-02 10:08:34
Message-ID: E1cCkle-0004b8-GG () code ! kde ! org
[Download RAW message or body]
Git commit 22af1ed535b4afc8db3804e72bc5adb1a1b28d60 by Christian Mollekopf.
Committed on 25/11/2016 at 08:23.
Pushed by cmollekopf into branch 'develop'.
Added the flush command.
Instead of trying to actually flush queues, we send a special command
through the same queues as the other commands and can thus guarantee
that the respective commands have been processed without blocking
anything.
M +2 -0 common/CMakeLists.txt
M +2 -0 common/commands.cpp
M +1 -0 common/commands.h
A +8 -0 common/commands/flush.fbs
M +0 -2 common/commands/synchronize.fbs
C +17 -26 common/flush.h [from: common/notification.h - 060% similarity]
M +57 -0 common/genericresource.cpp
M +1 -0 common/genericresource.h
M +7 -12 common/listener.cpp
M +6 -4 common/notification.h
M +6 -0 common/query.h
M +0 -3 common/queuedcommand.fbs
M +13 -12 common/resourceaccess.cpp
M +7 -2 common/resourceaccess.h
M +28 -6 common/resourcecontrol.cpp
M +4 -0 common/resourcecontrol.h
M +1 -17 common/store.cpp
M +23 -0 common/synchronizer.cpp
M +16 -5 common/synchronizer.h
M +0 -4 tests/testimplementations.h
https://commits.kde.org/sink/22af1ed535b4afc8db3804e72bc5adb1a1b28d60
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
index 5ba524b..8a16af4 100644
--- a/common/CMakeLists.txt
+++ b/common/CMakeLists.txt
@@ -95,6 +95,7 @@ generate_flatbuffers(
commands/notification
commands/revisionreplayed
commands/inspection
+ commands/flush
domain/event
domain/mail
domain/folder
@@ -130,6 +131,7 @@ install(FILES
bufferadaptor.h
test.h
log.h
+ flush.h
${CMAKE_CURRENT_BINARY_DIR}/sink_export.h
DESTINATION ${INCLUDE_INSTALL_DIR}/${PROJECT_NAME} COMPONENT Devel
)
diff --git a/common/commands.cpp b/common/commands.cpp
index 91657b8..c0781f6 100644
--- a/common/commands.cpp
+++ b/common/commands.cpp
@@ -61,6 +61,8 @@ QByteArray name(int commandId)
return "Inspection";
case RemoveFromDiskCommand:
return "RemoveFromDisk";
+ case FlushCommand:
+ return "Flush";
case CustomCommand:
return "Custom";
};
diff --git a/common/commands.h b/common/commands.h
index b97bbc6..0da1b3c 100644
--- a/common/commands.h
+++ b/common/commands.h
@@ -48,6 +48,7 @@ enum CommandIds
RevisionReplayedCommand,
InspectionCommand,
RemoveFromDiskCommand,
+ FlushCommand,
CustomCommand = 0xffff
};
diff --git a/common/commands/flush.fbs b/common/commands/flush.fbs
new file mode 100644
index 0000000..179f760
--- /dev/null
+++ b/common/commands/flush.fbs
@@ -0,0 +1,8 @@
+namespace Sink.Commands;
+
+table Flush {
+ id: string;
+ type: int; //See flush.h
+}
+
+root_type Flush;
diff --git a/common/commands/synchronize.fbs b/common/commands/synchronize.fbs
index 62f4b2b..7b32305 100644
--- a/common/commands/synchronize.fbs
+++ b/common/commands/synchronize.fbs
@@ -1,8 +1,6 @@
namespace Sink.Commands;
table Synchronize {
- sourceSync: bool; //Synchronize with source
- localSync: bool; //Ensure all queues are processed so the local state is up-to \
date. query: string;
}
diff --git a/common/notification.h b/common/flush.h
similarity index 60%
copy from common/notification.h
copy to common/flush.h
index dcf00a3..3f04608 100644
--- a/common/notification.h
+++ b/common/flush.h
@@ -20,35 +20,26 @@
#pragma once
#include "sink_export.h"
-#include <QString>
-#include <QDebug>
namespace Sink {
+namespace Flush {
-/**
- * A notification
- */
-class SINK_EXPORT Notification
-{
-public:
- enum NoticationType {
- Shutdown,
- Status,
- Warning,
- Progress,
- Inspection,
- RevisionUpdate
- };
- enum InspectionCode {
- Success,
- Failure
- };
-
- QByteArray id;
- int type;
- QString message;
- int code;
+enum FlushType {
+ /**
+ * Guarantees that any commands issued before this flush are written back to the \
server once this flush completes. + * Note that this does not guarantee the \
success of writeback, only that an attempt has been made. + */
+ FlushReplayQueue,
+ /**
+ * Guarantees that any synchronization request issued before this flush has \
been executed and that all entities created by it have been processed once this flush \
completes. + */
+ FlushSynchronization,
+ /**
+ * Guarantees that any modification issued before this flush has been processed \
once this flush completes. + */
+ FlushUserQueue
};
+
+}
}
-QDebug SINK_EXPORT operator<<(QDebug dbg, const Sink::Notification &n);
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 7c4d4ea..7b83957 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -27,6 +27,7 @@
#include "deleteentity_generated.h"
#include "inspection_generated.h"
#include "notification_generated.h"
+#include "flush_generated.h"
#include "domainadaptor.h"
#include "commands.h"
#include "index.h"
@@ -54,6 +55,7 @@ class CommandProcessor : public QObject
{
Q_OBJECT
typedef std::function<KAsync::Job<void>(void const *, size_t)> \
InspectionFunction; + typedef std::function<KAsync::Job<void>(void const *, \
size_t)> FlushFunction; SINK_DEBUG_AREA("commandprocessor")
public:
@@ -75,6 +77,11 @@ public:
mInspect = f;
}
+ void setFlushCommand(const FlushFunction &f)
+ {
+ mFlush = f;
+ }
+
signals:
void error(int errorCode, const QString &errorMessage);
@@ -124,6 +131,13 @@ private slots:
} else {
return KAsync::error<qint64>(-1, "Missing inspection command.");
}
+ case Sink::Commands::FlushCommand:
+ if (mFlush) {
+ return mFlush(queuedCommand->command()->Data(), \
queuedCommand->command()->size()) + .syncThen<qint64>([]() { \
return -1; }); + } else {
+ return KAsync::error<qint64>(-1, "Missing inspection command.");
+ }
default:
return KAsync::error<qint64>(-1, "Unhandled command");
}
@@ -219,6 +233,7 @@ private:
// The lowest revision we no longer need
qint64 mLowerBoundRevision;
InspectionFunction mInspect;
+ FlushFunction mFlush;
};
GenericResource::GenericResource(const ResourceContext &resourceContext, const \
QSharedPointer<Pipeline> &pipeline ) @@ -266,6 +281,26 @@ \
GenericResource::GenericResource(const ResourceContext &resourceContext, const Q }
return KAsync::error<void>(-1, "Invalid inspection command.");
});
+ mProcessor->setFlushCommand([this](void const *command, size_t size) {
+ flatbuffers::Verifier verifier((const uint8_t *)command, size);
+ if (Sink::Commands::VerifyFlushBuffer(verifier)) {
+ auto buffer = Sink::Commands::GetFlush(command);
+ const auto flushType = buffer->type();
+ const auto flushId = BufferUtils::extractBuffer(buffer->id());
+ if (flushType == Sink::Flush::FlushReplayQueue) {
+ SinkTrace() << "Flushing synchronizer ";
+ mSynchronizer->flush(flushType, flushId);
+ } else {
+ SinkTrace() << "Emitting flush completion" << flushId;
+ Sink::Notification n;
+ n.type = Sink::Notification::FlushCompletion;
+ n.id = flushId;
+ emit notify(n);
+ }
+ return KAsync::null<void>();
+ }
+ return KAsync::error<void>(-1, "Invalid flush command.");
+ });
{
auto ret = QObject::connect(mProcessor.get(), &CommandProcessor::error, \
[this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); \
Q_ASSERT(ret); @@ -371,6 +406,10 @@ void GenericResource::enqueueCommand(MessageQueue \
&mq, int commandId, const QByt
void GenericResource::processCommand(int commandId, const QByteArray &data)
{
+ if (commandId == Commands::FlushCommand) {
+ processFlushCommand(data);
+ return;
+ }
static int modifications = 0;
mUserQueue.startTransaction();
enqueueCommand(mUserQueue, commandId, data);
@@ -384,6 +423,24 @@ void GenericResource::processCommand(int commandId, const \
QByteArray &data) }
}
+void GenericResource::processFlushCommand(const QByteArray &data)
+{
+ flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size());
+ if (Sink::Commands::VerifyFlushBuffer(verifier)) {
+ auto buffer = Sink::Commands::GetFlush(data.constData());
+ const auto flushType = buffer->type();
+ const auto flushId = BufferUtils::extractBuffer(buffer->id());
+ if (flushType == Sink::Flush::FlushSynchronization) {
+ mSynchronizer->flush(flushType, flushId);
+ } else {
+ mUserQueue.startTransaction();
+ enqueueCommand(mUserQueue, Commands::FlushCommand, data);
+ mUserQueue.commit();
+ }
+ }
+
+}
+
KAsync::Job<void> GenericResource::synchronizeWithSource(const Sink::QueryBase \
&query) {
return KAsync::start<void>([this, query] {
diff --git a/common/genericresource.h b/common/genericresource.h
index 3f92e93..9447c8b 100644
--- a/common/genericresource.h
+++ b/common/genericresource.h
@@ -47,6 +47,7 @@ public:
virtual ~GenericResource();
virtual void processCommand(int commandId, const QByteArray &data) \
Q_DECL_OVERRIDE; + virtual void processFlushCommand(const QByteArray &data);
virtual KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query) \
Q_DECL_OVERRIDE; virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE;
virtual void setLowerBoundRevision(qint64 revision) Q_DECL_OVERRIDE;
diff --git a/common/listener.cpp b/common/listener.cpp
index c3c6bc2..2ab0333 100644
--- a/common/listener.cpp
+++ b/common/listener.cpp
@@ -33,7 +33,6 @@
#include "common/synchronize_generated.h"
#include "common/notification_generated.h"
#include "common/revisionreplayed_generated.h"
-#include "common/inspection_generated.h"
#include <QLocalServer>
#include <QLocalSocket>
@@ -244,18 +243,13 @@ void Listener::processCommand(int commandId, uint messageId, \
const QByteArray &c auto timer = QSharedPointer<QTime>::create();
timer->start();
auto job = KAsync::null<void>();
- if (buffer->sourceSync()) {
- Sink::QueryBase query;
- if (buffer->query()) {
- auto data = \
QByteArray::fromStdString(buffer->query()->str());
- QDataStream stream(&data, QIODevice::ReadOnly);
- stream >> query;
- }
- job = loadResource().synchronizeWithSource(query);
- }
- if (buffer->localSync()) {
- job = job.then<void>(loadResource().processAllMessages());
+ Sink::QueryBase query;
+ if (buffer->query()) {
+ auto data = QByteArray::fromStdString(buffer->query()->str());
+ QDataStream stream(&data, QIODevice::ReadOnly);
+ stream >> query;
}
+ job = loadResource().synchronizeWithSource(query);
job.then<void>([callback, timer](const KAsync::Error &error) {
if (error) {
SinkWarning() << "Sync failed: " << error.errorMessage;
@@ -279,6 +273,7 @@ void Listener::processCommand(int commandId, uint messageId, \
const QByteArray &c case Sink::Commands::DeleteEntityCommand:
case Sink::Commands::ModifyEntityCommand:
case Sink::Commands::CreateEntityCommand:
+ case Sink::Commands::FlushCommand:
SinkTrace() << "Command id " << messageId << " of type \"" << \
Sink::Commands::name(commandId) << "\" from " << client.name; \
loadResource().processCommand(commandId, commandBuffer); break;
diff --git a/common/notification.h b/common/notification.h
index dcf00a3..b1bd290 100644
--- a/common/notification.h
+++ b/common/notification.h
@@ -37,17 +37,19 @@ public:
Warning,
Progress,
Inspection,
- RevisionUpdate
+ RevisionUpdate,
+ FlushCompletion
};
enum InspectionCode {
- Success,
+ Success = 0,
Failure
};
QByteArray id;
- int type;
+ int type = 0;
QString message;
- int code;
+ //A return code. Zero typically indicates success.
+ int code = 0;
};
}
diff --git a/common/query.h b/common/query.h
index 0bc5141..2adb7e9 100644
--- a/common/query.h
+++ b/common/query.h
@@ -465,6 +465,12 @@ class SyncScope : public QueryBase {
public:
using QueryBase::QueryBase;
+ SyncScope(const QueryBase &other)
+ : QueryBase(other)
+ {
+
+ }
+
Query::Filter getResourceFilter() const
{
return mResourceFilter;
diff --git a/common/queuedcommand.fbs b/common/queuedcommand.fbs
index 06226d3..114e2cd 100644
--- a/common/queuedcommand.fbs
+++ b/common/queuedcommand.fbs
@@ -3,9 +3,6 @@ namespace Sink;
table QueuedCommand {
commandId: int;
command: [ubyte];
- // entityId: string;
- // sourceRevision: ulong;
- // targetRevision: [ubyte];
}
root_type QueuedCommand;
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index 822b5cd..b46e8b2 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -31,6 +31,7 @@
#include "common/deleteentity_generated.h"
#include "common/revisionreplayed_generated.h"
#include "common/inspection_generated.h"
+#include "common/flush_generated.h"
#include "common/entitybuffer.h"
#include "common/bufferutils.h"
#include "common/test.h"
@@ -291,16 +292,6 @@ KAsync::Job<void> ResourceAccess::sendCommand(int commandId, \
flatbuffers::FlatBu });
}
-KAsync::Job<void> ResourceAccess::synchronizeResource(bool sourceSync, bool \
localSync)
-{
- SinkTrace() << "Sending synchronize command: " << sourceSync << localSync;
- flatbuffers::FlatBufferBuilder fbb;
- auto command = Sink::Commands::CreateSynchronize(fbb, sourceSync, localSync);
- Sink::Commands::FinishSynchronizeBuffer(fbb, command);
- open();
- return sendCommand(Commands::SynchronizeCommand, fbb);
-}
-
KAsync::Job<void> ResourceAccess::synchronizeResource(const Sink::QueryBase &query)
{
flatbuffers::FlatBufferBuilder fbb;
@@ -311,8 +302,6 @@ KAsync::Job<void> ResourceAccess::synchronizeResource(const \
Sink::QueryBase &que }
auto q = fbb.CreateString(queryString.toStdString());
auto builder = Sink::Commands::SynchronizeBuilder(fbb);
- builder.add_sourceSync(true);
- builder.add_localSync(false);
builder.add_query(q);
Sink::Commands::FinishSynchronizeBuffer(fbb, builder.Finish());
@@ -390,6 +379,16 @@ ResourceAccess::sendInspectionCommand(int inspectionType, const \
QByteArray &insp return sendCommand(Sink::Commands::InspectionCommand, fbb);
}
+KAsync::Job<void> ResourceAccess::sendFlushCommand(int flushType, const QByteArray \
&flushId) +{
+ flatbuffers::FlatBufferBuilder fbb;
+ auto id = fbb.CreateString(flushId.toStdString());
+ auto location = Sink::Commands::CreateFlush(fbb, id, flushType);
+ Sink::Commands::FinishFlushBuffer(fbb, location);
+ open();
+ return sendCommand(Sink::Commands::FlushCommand, fbb);
+}
+
void ResourceAccess::open()
{
if (d->socket && d->socket->isValid()) {
@@ -613,6 +612,8 @@ bool ResourceAccess::processMessageBuffer()
[[clang::fallthrough]];
case Sink::Notification::Warning:
[[clang::fallthrough]];
+ case Sink::Notification::FlushCompletion:
+ [[clang::fallthrough]];
case Sink::Notification::Progress: {
auto n = getNotification(buffer);
SinkTrace() << "Received notification: " << n.type;
diff --git a/common/resourceaccess.h b/common/resourceaccess.h
index 755c8a7..4229161 100644
--- a/common/resourceaccess.h
+++ b/common/resourceaccess.h
@@ -29,6 +29,7 @@
#include <flatbuffers/flatbuffers.h>
#include "notification.h"
+#include "flush.h"
#include "query.h"
#include "log.h"
@@ -50,7 +51,6 @@ public:
}
virtual KAsync::Job<void> sendCommand(int commandId) = 0;
virtual KAsync::Job<void> sendCommand(int commandId, \
flatbuffers::FlatBufferBuilder &fbb) = 0;
- virtual KAsync::Job<void> synchronizeResource(bool remoteSync, bool localSync) = \
0;
virtual KAsync::Job<void> synchronizeResource(const Sink::QueryBase &filter) = \
0;
virtual KAsync::Job<void> sendCreateCommand(const QByteArray &uid, const \
QByteArray &resourceBufferType, const QByteArray &buffer) @@ -75,6 +75,11 @@ public:
return KAsync::null<void>();
};
+ virtual KAsync::Job<void> sendFlushCommand(int flushType, const QByteArray \
&flushId) + {
+ return KAsync::null<void>();
+ }
+
int getResourceStatus() const
{
return mResourceStatus;
@@ -108,7 +113,6 @@ public:
KAsync::Job<void> sendCommand(int commandId) Q_DECL_OVERRIDE;
KAsync::Job<void> sendCommand(int commandId, flatbuffers::FlatBufferBuilder \
&fbb) Q_DECL_OVERRIDE;
- KAsync::Job<void> synchronizeResource(bool remoteSync, bool localSync) \
Q_DECL_OVERRIDE;
KAsync::Job<void> synchronizeResource(const Sink::QueryBase &filter) \
Q_DECL_OVERRIDE;
KAsync::Job<void> sendCreateCommand(const QByteArray &uid, const QByteArray \
&resourceBufferType, const QByteArray &buffer) Q_DECL_OVERRIDE; KAsync::Job<void>
@@ -117,6 +121,7 @@ public:
KAsync::Job<void> sendRevisionReplayedCommand(qint64 revision) Q_DECL_OVERRIDE;
KAsync::Job<void>
sendInspectionCommand(int inspectionType,const QByteArray &inspectionId, const \
QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const \
QVariant &expecedValue) Q_DECL_OVERRIDE; + KAsync::Job<void> sendFlushCommand(int \
flushType, const QByteArray &flushId) Q_DECL_OVERRIDE; /**
* Tries to connect to server, and returns a connected socket on success.
*/
diff --git a/common/resourcecontrol.cpp b/common/resourcecontrol.cpp
index 3568844..af98b8b 100644
--- a/common/resourcecontrol.cpp
+++ b/common/resourcecontrol.cpp
@@ -85,17 +85,39 @@ KAsync::Job<void> ResourceControl::flushMessageQueue(const \
QByteArrayList &resou SinkTrace() << "flushMessageQueue" << resourceIdentifier;
return KAsync::value(resourceIdentifier)
.template each([](const QByteArray &resource) {
- SinkTrace() << "Flushing message queue " << resource;
- auto resourceAccess = \
ResourceAccessFactory::instance().getAccess(resource, \
ResourceConfig::getResourceType(resource));
- resourceAccess->open();
- return resourceAccess->synchronizeResource(false, true)
- .addToContext(resourceAccess);
+ return flushMessageQueue(resource);
});
}
KAsync::Job<void> ResourceControl::flushMessageQueue(const QByteArray \
&resourceIdentifier) {
- return flushMessageQueue(QByteArrayList() << resourceIdentifier);
+ return flush(Flush::FlushUserQueue, \
resourceIdentifier).then(flush(Flush::FlushSynchronization, resourceIdentifier)); +}
+
+KAsync::Job<void> ResourceControl::flush(Flush::FlushType type, const QByteArray \
&resourceIdentifier) +{
+ auto resourceAccess = \
ResourceAccessFactory::instance().getAccess(resourceIdentifier, \
ResourceConfig::getResourceType(resourceIdentifier)); + auto notifier = \
QSharedPointer<Sink::Notifier>::create(resourceAccess); + auto id = \
QUuid::createUuid().toByteArray(); + return \
KAsync::start<void>([=](KAsync::Future<void> &future) { + SinkTrace() << \
"Waiting for notification notification " << id; + \
notifier->registerHandler([&future, id](const Notification &notification) { + \
SinkTrace() << "Received notification " << notification.type << notification.id; + \
if (notification.id == id) { + SinkTrace() << "FlushComplete";
+ if (notification.code) {
+ SinkWarning() << "Flush return an error";
+ future.setError(-1, "Flush returned an error: " + \
notification.message); + } else {
+ future.setFinished();
+ }
+ }
+ });
+ resourceAccess->sendFlushCommand(type, id).onError([&future] (const \
KAsync::Error &error) { + SinkWarning() << "Failed to send command";
+ future.setError(1, "Failed to send command: " + error.errorMessage);
+ }).exec();
+ });
}
KAsync::Job<void> ResourceControl::flushReplayQueue(const QByteArrayList \
&resourceIdentifier)
diff --git a/common/resourcecontrol.h b/common/resourcecontrol.h
index 9e603e4..b910441 100644
--- a/common/resourcecontrol.h
+++ b/common/resourcecontrol.h
@@ -26,6 +26,7 @@
#include <Async/Async>
#include "inspection.h"
+#include "flush.h"
namespace Sink {
namespace ResourceControl {
@@ -58,5 +59,8 @@ KAsync::Job<void> SINK_EXPORT flushMessageQueue(const QByteArray \
&resourceIdenti
*/
KAsync::Job<void> SINK_EXPORT flushReplayQueue(const QByteArrayList \
&resourceIdentifier); KAsync::Job<void> SINK_EXPORT flushReplayQueue(const \
QByteArray &resourceIdentifier); +
+KAsync::Job<void> SINK_EXPORT flush(Flush::FlushType, const QByteArray \
&resourceIdentifier); +
}
}
diff --git a/common/store.cpp b/common/store.cpp
index 6aae00f..8b8de1f 100644
--- a/common/store.cpp
+++ b/common/store.cpp
@@ -255,23 +255,7 @@ KAsync::Job<void> Store::removeDataFromDisk(const QByteArray \
&identifier)
KAsync::Job<void> Store::synchronize(const Sink::Query &query)
{
- auto resources = getResources(query.getResourceFilter()).keys();
- SinkTrace() << "synchronize" << resources;
- return KAsync::value(resources)
- .template each([query](const QByteArray &resource) {
- SinkTrace() << "Synchronizing " << resource;
- auto resourceAccess = \
ResourceAccessFactory::instance().getAccess(resource, \
ResourceConfig::getResourceType(resource));
- return resourceAccess->synchronizeResource(true, false)
- .addToContext(resourceAccess)
- .then<void>([](const KAsync::Error &error) {
- if (error) {
- SinkWarning() << "Error during sync.";
- return KAsync::error<void>(error);
- }
- SinkTrace() << "synced.";
- return KAsync::null<void>();
- });
- });
+ return synchronize(Sink::SyncScope{static_cast<Sink::QueryBase>(query)});
}
KAsync::Job<void> Store::synchronize(const Sink::SyncScope &scope)
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index 5bde597..f7dd816 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -27,6 +27,8 @@
#include "createentity_generated.h"
#include "modifyentity_generated.h"
#include "deleteentity_generated.h"
+#include "flush_generated.h"
+#include "notification_generated.h"
SINK_DEBUG_AREA("synchronizer")
@@ -263,6 +265,13 @@ KAsync::Job<void> Synchronizer::synchronize(const \
Sink::QueryBase &query) return processSyncQueue();
}
+void Synchronizer::flush(int commandId, const QByteArray &flushId)
+{
+ SinkTrace() << "Flushing the synchronization queue";
+ mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::Flush, \
commandId, flushId}; + processSyncQueue().exec();
+}
+
KAsync::Job<void> Synchronizer::processSyncQueue()
{
if (mSyncRequestQueue.isEmpty() || mSyncInProgress) {
@@ -279,6 +288,20 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
//Commit after every request, so implementations only have to commit \
more if they add a lot of data. commit();
});
+ } else if (request.requestType == Synchronizer::SyncRequest::Flush) {
+ if (request.flushType == Flush::FlushReplayQueue) {
+ SinkTrace() << "Emitting flush completion.";
+ Sink::Notification n;
+ n.type = Sink::Notification::FlushCompletion;
+ n.id = request.flushId;
+ emit notify(n);
+ } else {
+ flatbuffers::FlatBufferBuilder fbb;
+ auto flushId = fbb.CreateString(request.flushId);
+ auto location = Sink::Commands::CreateFlush(fbb, flushId, \
static_cast<int>(Sink::Flush::FlushSynchronization)); + \
Sink::Commands::FinishFlushBuffer(fbb, location); + \
enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); + \
} } else {
job = replayNextRevision();
}
diff --git a/common/synchronizer.h b/common/synchronizer.h
index 4d5bdd5..99d4877 100644
--- a/common/synchronizer.h
+++ b/common/synchronizer.h
@@ -45,6 +45,7 @@ public:
void setup(const std::function<void(int commandId, const QByteArray &data)> \
&enqueueCommandCallback, MessageQueue &messageQueue); KAsync::Job<void> \
synchronize(const Sink::QueryBase &query); + void flush(int commandId, const \
QByteArray &flushId);
//Read only access to main storage
Storage::EntityStore &store();
@@ -57,6 +58,9 @@ public:
bool allChangesReplayed() Q_DECL_OVERRIDE;
+signals:
+ void notify(Notification);
+
public slots:
virtual void revisionChanged() Q_DECL_OVERRIDE;
@@ -115,23 +119,30 @@ protected:
struct SyncRequest {
enum RequestType {
Synchronization,
- ChangeReplay
+ ChangeReplay,
+ Flush
};
SyncRequest(const Sink::QueryBase &q)
- : flushQueue(false),
- requestType(Synchronization),
+ : requestType(Synchronization),
query(q)
{
}
SyncRequest(RequestType type)
- : flushQueue(false),
+ : requestType(type)
+ {
+ }
+
+ SyncRequest(RequestType type, int flushType_, const QByteArray &flushId_)
+ : flushType(flushType_),
+ flushId(flushId_),
requestType(type)
{
}
- bool flushQueue;
+ int flushType = 0;
+ QByteArray flushId;
RequestType requestType;
Sink::QueryBase query;
};
diff --git a/tests/testimplementations.h b/tests/testimplementations.h
index 111c884..6fe08f7 100644
--- a/tests/testimplementations.h
+++ b/tests/testimplementations.h
@@ -66,10 +66,6 @@ public:
{
return KAsync::null<void>();
}
- KAsync::Job<void> synchronizeResource(bool remoteSync, bool localSync) \
Q_DECL_OVERRIDE
- {
- return KAsync::null<void>();
- }
KAsync::Job<void> synchronizeResource(const Sink::QueryBase &) Q_DECL_OVERRIDE
{
return KAsync::null<void>();
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic