[prev in list] [next in list] [prev in thread] [next in thread] 

List:       kde-commits
Subject:    [sink/develop] /: Added the flush command.
From:       Christian Mollekopf <chrigi_1 () fastmail ! fm>
Date:       2016-12-02 10:08:34
Message-ID: E1cCkle-0004b8-GG () code ! kde ! org
[Download RAW message or body]

Git commit 22af1ed535b4afc8db3804e72bc5adb1a1b28d60 by Christian Mollekopf.
Committed on 25/11/2016 at 08:23.
Pushed by cmollekopf into branch 'develop'.

Added the flush command.

Instead of trying to actually flush queues, we send a special command
through the same queues as the other commands and can thus guarantee
that the respective commands have been processed without blocking
anything.

M  +2    -0    common/CMakeLists.txt
M  +2    -0    common/commands.cpp
M  +1    -0    common/commands.h
A  +8    -0    common/commands/flush.fbs
M  +0    -2    common/commands/synchronize.fbs
C  +17   -26   common/flush.h [from: common/notification.h - 060% similarity]
M  +57   -0    common/genericresource.cpp
M  +1    -0    common/genericresource.h
M  +7    -12   common/listener.cpp
M  +6    -4    common/notification.h
M  +6    -0    common/query.h
M  +0    -3    common/queuedcommand.fbs
M  +13   -12   common/resourceaccess.cpp
M  +7    -2    common/resourceaccess.h
M  +28   -6    common/resourcecontrol.cpp
M  +4    -0    common/resourcecontrol.h
M  +1    -17   common/store.cpp
M  +23   -0    common/synchronizer.cpp
M  +16   -5    common/synchronizer.h
M  +0    -4    tests/testimplementations.h

https://commits.kde.org/sink/22af1ed535b4afc8db3804e72bc5adb1a1b28d60

diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
index 5ba524b..8a16af4 100644
--- a/common/CMakeLists.txt
+++ b/common/CMakeLists.txt
@@ -95,6 +95,7 @@ generate_flatbuffers(
     commands/notification
     commands/revisionreplayed
     commands/inspection
+    commands/flush
     domain/event
     domain/mail
     domain/folder
@@ -130,6 +131,7 @@ install(FILES
     bufferadaptor.h
     test.h
     log.h
+    flush.h
     ${CMAKE_CURRENT_BINARY_DIR}/sink_export.h
     DESTINATION ${INCLUDE_INSTALL_DIR}/${PROJECT_NAME} COMPONENT Devel
 )
diff --git a/common/commands.cpp b/common/commands.cpp
index 91657b8..c0781f6 100644
--- a/common/commands.cpp
+++ b/common/commands.cpp
@@ -61,6 +61,8 @@ QByteArray name(int commandId)
             return "Inspection";
         case RemoveFromDiskCommand:
             return "RemoveFromDisk";
+        case FlushCommand:
+            return "Flush";
         case CustomCommand:
             return "Custom";
     };
diff --git a/common/commands.h b/common/commands.h
index b97bbc6..0da1b3c 100644
--- a/common/commands.h
+++ b/common/commands.h
@@ -48,6 +48,7 @@ enum CommandIds
     RevisionReplayedCommand,
     InspectionCommand,
     RemoveFromDiskCommand,
+    FlushCommand,
     CustomCommand = 0xffff
 };
 
diff --git a/common/commands/flush.fbs b/common/commands/flush.fbs
new file mode 100644
index 0000000..179f760
--- /dev/null
+++ b/common/commands/flush.fbs
@@ -0,0 +1,8 @@
+namespace Sink.Commands;
+
+table Flush {
+    id: string;
+    type: int; //See flush.h
+}
+
+root_type Flush;
diff --git a/common/commands/synchronize.fbs b/common/commands/synchronize.fbs
index 62f4b2b..7b32305 100644
--- a/common/commands/synchronize.fbs
+++ b/common/commands/synchronize.fbs
@@ -1,8 +1,6 @@
 namespace Sink.Commands;
 
 table Synchronize {
-    sourceSync: bool; //Synchronize with source
-    localSync: bool; //Ensure all queues are processed so the local state is up-to date.
     query: string;
 }
 
diff --git a/common/notification.h b/common/flush.h
similarity index 60%
copy from common/notification.h
copy to common/flush.h
index dcf00a3..3f04608 100644
--- a/common/notification.h
+++ b/common/flush.h
@@ -20,35 +20,26 @@
 #pragma once
 
 #include "sink_export.h"
-#include <QString>
-#include <QDebug>
 
 namespace Sink {
+namespace Flush {
 
-/**
- * A notification
- */
-class SINK_EXPORT Notification
-{
-public:
-    enum NoticationType {
-        Shutdown,
-        Status,
-        Warning,
-        Progress,
-        Inspection,
-        RevisionUpdate
-    };
-    enum InspectionCode {
-        Success,
-        Failure
-    };
-
-    QByteArray id;
-    int type;
-    QString message;
-    int code;
+enum FlushType {
+    /**
+     * Guarantees that any commands issued before this flush are written back to the server once this flush completes.
+     * Note that this does not guarantee the success of writeback, only that an attempt has been made.
+     */
+    FlushReplayQueue,
+    /**
+     * Guarantees that any synchronization request issued before  this flush has been executed and that all entities created by it have been processed once this flush completes.
+     */
+    FlushSynchronization,
+    /**
+     * Guarantees that any modification issued before this flush has been processed once this flush completes.
+     */
+    FlushUserQueue
 };
+
+}
 }
 
-QDebug SINK_EXPORT operator<<(QDebug dbg, const Sink::Notification &n);
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 7c4d4ea..7b83957 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -27,6 +27,7 @@
 #include "deleteentity_generated.h"
 #include "inspection_generated.h"
 #include "notification_generated.h"
+#include "flush_generated.h"
 #include "domainadaptor.h"
 #include "commands.h"
 #include "index.h"
@@ -54,6 +55,7 @@ class CommandProcessor : public QObject
 {
     Q_OBJECT
     typedef std::function<KAsync::Job<void>(void const *, size_t)> \
InspectionFunction; +    typedef std::function<KAsync::Job<void>(void const *, \
size_t)> FlushFunction;  SINK_DEBUG_AREA("commandprocessor")
 
 public:
@@ -75,6 +77,11 @@ public:
         mInspect = f;
     }
 
+    void setFlushCommand(const FlushFunction &f)
+    {
+        mFlush = f;
+    }
+
 signals:
     void error(int errorCode, const QString &errorMessage);
 
@@ -124,6 +131,13 @@ private slots:
                 } else {
                     return KAsync::error<qint64>(-1, "Missing inspection command.");
                 }
+            case Sink::Commands::FlushCommand:
+                if (mFlush) {
+                    return mFlush(queuedCommand->command()->Data(), \
queuedCommand->command()->size()) +                        .syncThen<qint64>([]() { \
return -1; }); +                } else {
+                    return KAsync::error<qint64>(-1, "Missing inspection command.");
+                }
             default:
                 return KAsync::error<qint64>(-1, "Unhandled command");
         }
@@ -219,6 +233,7 @@ private:
     // The lowest revision we no longer need
     qint64 mLowerBoundRevision;
     InspectionFunction mInspect;
+    FlushFunction mFlush;
 };
 
 GenericResource::GenericResource(const ResourceContext &resourceContext, const \
QSharedPointer<Pipeline> &pipeline ) @@ -266,6 +281,26 @@ \
GenericResource::GenericResource(const ResourceContext &resourceContext, const Q  }
         return KAsync::error<void>(-1, "Invalid inspection command.");
     });
+    mProcessor->setFlushCommand([this](void const *command, size_t size) {
+        flatbuffers::Verifier verifier((const uint8_t *)command, size);
+        if (Sink::Commands::VerifyFlushBuffer(verifier)) {
+            auto buffer = Sink::Commands::GetFlush(command);
+            const auto flushType = buffer->type();
+            const auto flushId = BufferUtils::extractBuffer(buffer->id());
+            if (flushType == Sink::Flush::FlushReplayQueue) {
+                SinkTrace() << "Flushing synchronizer ";
+                mSynchronizer->flush(flushType, flushId);
+            } else {
+                SinkTrace() << "Emitting flush completion" << flushId;
+                Sink::Notification n;
+                n.type = Sink::Notification::FlushCompletion;
+                n.id = flushId;
+                emit notify(n);
+            }
+            return KAsync::null<void>();
+        }
+        return KAsync::error<void>(-1, "Invalid flush command.");
+    });
     {
         auto ret = QObject::connect(mProcessor.get(), &CommandProcessor::error, \
[this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });  \
Q_ASSERT(ret); @@ -371,6 +406,10 @@ void GenericResource::enqueueCommand(MessageQueue \
&mq, int commandId, const QByt  
 void GenericResource::processCommand(int commandId, const QByteArray &data)
 {
+    if (commandId == Commands::FlushCommand) {
+        processFlushCommand(data);
+        return;
+    }
     static int modifications = 0;
     mUserQueue.startTransaction();
     enqueueCommand(mUserQueue, commandId, data);
@@ -384,6 +423,24 @@ void GenericResource::processCommand(int commandId, const \
QByteArray &data)  }
 }
 
+void GenericResource::processFlushCommand(const QByteArray &data)
+{
+    flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size());
+    if (Sink::Commands::VerifyFlushBuffer(verifier)) {
+        auto buffer = Sink::Commands::GetFlush(data.constData());
+        const auto flushType = buffer->type();
+        const auto flushId = BufferUtils::extractBuffer(buffer->id());
+        if (flushType == Sink::Flush::FlushSynchronization) {
+            mSynchronizer->flush(flushType, flushId);
+        } else {
+            mUserQueue.startTransaction();
+            enqueueCommand(mUserQueue, Commands::FlushCommand, data);
+            mUserQueue.commit();
+        }
+    }
+
+}
+
 KAsync::Job<void> GenericResource::synchronizeWithSource(const Sink::QueryBase \
&query)  {
     return KAsync::start<void>([this, query] {
diff --git a/common/genericresource.h b/common/genericresource.h
index 3f92e93..9447c8b 100644
--- a/common/genericresource.h
+++ b/common/genericresource.h
@@ -47,6 +47,7 @@ public:
     virtual ~GenericResource();
 
     virtual void processCommand(int commandId, const QByteArray &data) \
Q_DECL_OVERRIDE; +    virtual void processFlushCommand(const QByteArray &data);
     virtual KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query) \
Q_DECL_OVERRIDE;  virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE;
     virtual void setLowerBoundRevision(qint64 revision) Q_DECL_OVERRIDE;
diff --git a/common/listener.cpp b/common/listener.cpp
index c3c6bc2..2ab0333 100644
--- a/common/listener.cpp
+++ b/common/listener.cpp
@@ -33,7 +33,6 @@
 #include "common/synchronize_generated.h"
 #include "common/notification_generated.h"
 #include "common/revisionreplayed_generated.h"
-#include "common/inspection_generated.h"
 
 #include <QLocalServer>
 #include <QLocalSocket>
@@ -244,18 +243,13 @@ void Listener::processCommand(int commandId, uint messageId, \
const QByteArray &c  auto timer = QSharedPointer<QTime>::create();
                 timer->start();
                 auto job = KAsync::null<void>();
-                if (buffer->sourceSync()) {
-                    Sink::QueryBase query;
-                    if (buffer->query()) {
-                        auto data = \
                QByteArray::fromStdString(buffer->query()->str());
-                        QDataStream stream(&data, QIODevice::ReadOnly);
-                        stream >> query;
-                    }
-                    job = loadResource().synchronizeWithSource(query);
-                }
-                if (buffer->localSync()) {
-                    job = job.then<void>(loadResource().processAllMessages());
+                Sink::QueryBase query;
+                if (buffer->query()) {
+                    auto data = QByteArray::fromStdString(buffer->query()->str());
+                    QDataStream stream(&data, QIODevice::ReadOnly);
+                    stream >> query;
                 }
+                job = loadResource().synchronizeWithSource(query);
                 job.then<void>([callback, timer](const KAsync::Error &error) {
                         if (error) {
                             SinkWarning() << "Sync failed: " << error.errorMessage;
@@ -279,6 +273,7 @@ void Listener::processCommand(int commandId, uint messageId, \
const QByteArray &c  case Sink::Commands::DeleteEntityCommand:
         case Sink::Commands::ModifyEntityCommand:
         case Sink::Commands::CreateEntityCommand:
+        case Sink::Commands::FlushCommand:
             SinkTrace() << "Command id  " << messageId << " of type \"" << \
Sink::Commands::name(commandId) << "\" from " << client.name;  \
loadResource().processCommand(commandId, commandBuffer);  break;
diff --git a/common/notification.h b/common/notification.h
index dcf00a3..b1bd290 100644
--- a/common/notification.h
+++ b/common/notification.h
@@ -37,17 +37,19 @@ public:
         Warning,
         Progress,
         Inspection,
-        RevisionUpdate
+        RevisionUpdate,
+        FlushCompletion
     };
     enum InspectionCode {
-        Success,
+        Success = 0,
         Failure
     };
 
     QByteArray id;
-    int type;
+    int type = 0;
     QString message;
-    int code;
+    //A return code. Zero typically indicates success.
+    int code = 0;
 };
 }
 
diff --git a/common/query.h b/common/query.h
index 0bc5141..2adb7e9 100644
--- a/common/query.h
+++ b/common/query.h
@@ -465,6 +465,12 @@ class SyncScope : public QueryBase {
 public:
     using QueryBase::QueryBase;
 
+    SyncScope(const QueryBase &other)
+        : QueryBase(other)
+    {
+
+    }
+
     Query::Filter getResourceFilter() const
     {
         return mResourceFilter;
diff --git a/common/queuedcommand.fbs b/common/queuedcommand.fbs
index 06226d3..114e2cd 100644
--- a/common/queuedcommand.fbs
+++ b/common/queuedcommand.fbs
@@ -3,9 +3,6 @@ namespace Sink;
 table QueuedCommand {
     commandId: int;
     command: [ubyte];
-    // entityId: string;
-    // sourceRevision: ulong;
-    // targetRevision: [ubyte];
 }
 
 root_type QueuedCommand;
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index 822b5cd..b46e8b2 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -31,6 +31,7 @@
 #include "common/deleteentity_generated.h"
 #include "common/revisionreplayed_generated.h"
 #include "common/inspection_generated.h"
+#include "common/flush_generated.h"
 #include "common/entitybuffer.h"
 #include "common/bufferutils.h"
 #include "common/test.h"
@@ -291,16 +292,6 @@ KAsync::Job<void> ResourceAccess::sendCommand(int commandId, \
flatbuffers::FlatBu  });
 }
 
-KAsync::Job<void> ResourceAccess::synchronizeResource(bool sourceSync, bool \
                localSync)
-{
-    SinkTrace() << "Sending synchronize command: " << sourceSync << localSync;
-    flatbuffers::FlatBufferBuilder fbb;
-    auto command = Sink::Commands::CreateSynchronize(fbb, sourceSync, localSync);
-    Sink::Commands::FinishSynchronizeBuffer(fbb, command);
-    open();
-    return sendCommand(Commands::SynchronizeCommand, fbb);
-}
-
 KAsync::Job<void> ResourceAccess::synchronizeResource(const Sink::QueryBase &query)
 {
     flatbuffers::FlatBufferBuilder fbb;
@@ -311,8 +302,6 @@ KAsync::Job<void> ResourceAccess::synchronizeResource(const \
Sink::QueryBase &que  }
     auto q = fbb.CreateString(queryString.toStdString());
     auto builder = Sink::Commands::SynchronizeBuilder(fbb);
-    builder.add_sourceSync(true);
-    builder.add_localSync(false);
     builder.add_query(q);
     Sink::Commands::FinishSynchronizeBuffer(fbb, builder.Finish());
 
@@ -390,6 +379,16 @@ ResourceAccess::sendInspectionCommand(int inspectionType, const \
QByteArray &insp  return sendCommand(Sink::Commands::InspectionCommand, fbb);
 }
 
+KAsync::Job<void> ResourceAccess::sendFlushCommand(int flushType, const QByteArray \
&flushId) +{
+    flatbuffers::FlatBufferBuilder fbb;
+    auto id = fbb.CreateString(flushId.toStdString());
+    auto location = Sink::Commands::CreateFlush(fbb, id, flushType);
+    Sink::Commands::FinishFlushBuffer(fbb, location);
+    open();
+    return sendCommand(Sink::Commands::FlushCommand, fbb);
+}
+
 void ResourceAccess::open()
 {
     if (d->socket && d->socket->isValid()) {
@@ -613,6 +612,8 @@ bool ResourceAccess::processMessageBuffer()
                     [[clang::fallthrough]];
                 case Sink::Notification::Warning:
                     [[clang::fallthrough]];
+                case Sink::Notification::FlushCompletion:
+                    [[clang::fallthrough]];
                 case Sink::Notification::Progress: {
                     auto n = getNotification(buffer);
                     SinkTrace() << "Received notification: " << n.type;
diff --git a/common/resourceaccess.h b/common/resourceaccess.h
index 755c8a7..4229161 100644
--- a/common/resourceaccess.h
+++ b/common/resourceaccess.h
@@ -29,6 +29,7 @@
 
 #include <flatbuffers/flatbuffers.h>
 #include "notification.h"
+#include "flush.h"
 #include "query.h"
 #include "log.h"
 
@@ -50,7 +51,6 @@ public:
     }
     virtual KAsync::Job<void> sendCommand(int commandId) = 0;
     virtual KAsync::Job<void> sendCommand(int commandId, \
                flatbuffers::FlatBufferBuilder &fbb) = 0;
-    virtual KAsync::Job<void> synchronizeResource(bool remoteSync, bool localSync) = \
                0;
     virtual KAsync::Job<void> synchronizeResource(const Sink::QueryBase &filter) = \
0;  
     virtual KAsync::Job<void> sendCreateCommand(const QByteArray &uid, const \
QByteArray &resourceBufferType, const QByteArray &buffer) @@ -75,6 +75,11 @@ public:
         return KAsync::null<void>();
     };
 
+    virtual KAsync::Job<void> sendFlushCommand(int flushType, const QByteArray \
&flushId) +    {
+        return KAsync::null<void>();
+    }
+
     int getResourceStatus() const
     {
         return mResourceStatus;
@@ -108,7 +113,6 @@ public:
 
     KAsync::Job<void> sendCommand(int commandId) Q_DECL_OVERRIDE;
     KAsync::Job<void> sendCommand(int commandId, flatbuffers::FlatBufferBuilder \
                &fbb) Q_DECL_OVERRIDE;
-    KAsync::Job<void> synchronizeResource(bool remoteSync, bool localSync) \
                Q_DECL_OVERRIDE;
     KAsync::Job<void> synchronizeResource(const Sink::QueryBase &filter) \
                Q_DECL_OVERRIDE;
     KAsync::Job<void> sendCreateCommand(const QByteArray &uid, const QByteArray \
&resourceBufferType, const QByteArray &buffer) Q_DECL_OVERRIDE;  KAsync::Job<void>
@@ -117,6 +121,7 @@ public:
     KAsync::Job<void> sendRevisionReplayedCommand(qint64 revision) Q_DECL_OVERRIDE;
     KAsync::Job<void>
     sendInspectionCommand(int inspectionType,const QByteArray &inspectionId, const \
QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const \
QVariant &expecedValue) Q_DECL_OVERRIDE; +    KAsync::Job<void> sendFlushCommand(int \
flushType, const QByteArray &flushId) Q_DECL_OVERRIDE;  /**
      * Tries to connect to server, and returns a connected socket on success.
      */
diff --git a/common/resourcecontrol.cpp b/common/resourcecontrol.cpp
index 3568844..af98b8b 100644
--- a/common/resourcecontrol.cpp
+++ b/common/resourcecontrol.cpp
@@ -85,17 +85,39 @@ KAsync::Job<void> ResourceControl::flushMessageQueue(const \
QByteArrayList &resou  SinkTrace() << "flushMessageQueue" << resourceIdentifier;
     return KAsync::value(resourceIdentifier)
         .template each([](const QByteArray &resource) {
-            SinkTrace() << "Flushing message queue " << resource;
-            auto resourceAccess = \
ResourceAccessFactory::instance().getAccess(resource, \
                ResourceConfig::getResourceType(resource));
-            resourceAccess->open();
-            return resourceAccess->synchronizeResource(false, true)
-                .addToContext(resourceAccess);
+            return flushMessageQueue(resource);
         });
 }
 
 KAsync::Job<void> ResourceControl::flushMessageQueue(const QByteArray \
&resourceIdentifier)  {
-    return flushMessageQueue(QByteArrayList() << resourceIdentifier);
+    return flush(Flush::FlushUserQueue, \
resourceIdentifier).then(flush(Flush::FlushSynchronization, resourceIdentifier)); +}
+
+KAsync::Job<void> ResourceControl::flush(Flush::FlushType type, const QByteArray \
&resourceIdentifier) +{
+    auto resourceAccess = \
ResourceAccessFactory::instance().getAccess(resourceIdentifier, \
ResourceConfig::getResourceType(resourceIdentifier)); +    auto notifier = \
QSharedPointer<Sink::Notifier>::create(resourceAccess); +    auto id = \
QUuid::createUuid().toByteArray(); +    return \
KAsync::start<void>([=](KAsync::Future<void> &future) { +            SinkTrace() << \
"Waiting for notification notification " << id; +            \
notifier->registerHandler([&future, id](const Notification &notification) { +         \
SinkTrace() << "Received notification " << notification.type << notification.id; +    \
if (notification.id == id) { +                    SinkTrace() << "FlushComplete";
+                    if (notification.code) {
+                        SinkWarning() << "Flush return an error";
+                        future.setError(-1, "Flush returned an error: " + \
notification.message); +                    } else {
+                        future.setFinished();
+                    }
+                }
+            });
+            resourceAccess->sendFlushCommand(type, id).onError([&future] (const \
KAsync::Error &error) { +                SinkWarning() << "Failed to send command";
+                future.setError(1, "Failed to send command: " + error.errorMessage);
+            }).exec();
+        });
 }
 
 KAsync::Job<void> ResourceControl::flushReplayQueue(const QByteArrayList \
                &resourceIdentifier)
diff --git a/common/resourcecontrol.h b/common/resourcecontrol.h
index 9e603e4..b910441 100644
--- a/common/resourcecontrol.h
+++ b/common/resourcecontrol.h
@@ -26,6 +26,7 @@
 #include <Async/Async>
 
 #include "inspection.h"
+#include "flush.h"
 
 namespace Sink {
 namespace ResourceControl {
@@ -58,5 +59,8 @@ KAsync::Job<void> SINK_EXPORT flushMessageQueue(const QByteArray \
                &resourceIdenti
  */
 KAsync::Job<void> SINK_EXPORT flushReplayQueue(const QByteArrayList \
&resourceIdentifier);  KAsync::Job<void> SINK_EXPORT flushReplayQueue(const \
QByteArray &resourceIdentifier); +
+KAsync::Job<void> SINK_EXPORT flush(Flush::FlushType, const QByteArray \
&resourceIdentifier); +
 }
 }
diff --git a/common/store.cpp b/common/store.cpp
index 6aae00f..8b8de1f 100644
--- a/common/store.cpp
+++ b/common/store.cpp
@@ -255,23 +255,7 @@ KAsync::Job<void> Store::removeDataFromDisk(const QByteArray \
&identifier)  
 KAsync::Job<void> Store::synchronize(const Sink::Query &query)
 {
-    auto resources = getResources(query.getResourceFilter()).keys();
-    SinkTrace() << "synchronize" << resources;
-    return KAsync::value(resources)
-        .template each([query](const QByteArray &resource) {
-            SinkTrace() << "Synchronizing " << resource;
-            auto resourceAccess = \
ResourceAccessFactory::instance().getAccess(resource, \
                ResourceConfig::getResourceType(resource));
-            return resourceAccess->synchronizeResource(true, false)
-                .addToContext(resourceAccess)
-                .then<void>([](const KAsync::Error &error) {
-                        if (error) {
-                            SinkWarning() << "Error during sync.";
-                            return KAsync::error<void>(error);
-                        }
-                        SinkTrace() << "synced.";
-                        return KAsync::null<void>();
-                    });
-        });
+    return synchronize(Sink::SyncScope{static_cast<Sink::QueryBase>(query)});
 }
 
 KAsync::Job<void> Store::synchronize(const Sink::SyncScope &scope)
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index 5bde597..f7dd816 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -27,6 +27,8 @@
 #include "createentity_generated.h"
 #include "modifyentity_generated.h"
 #include "deleteentity_generated.h"
+#include "flush_generated.h"
+#include "notification_generated.h"
 
 SINK_DEBUG_AREA("synchronizer")
 
@@ -263,6 +265,13 @@ KAsync::Job<void> Synchronizer::synchronize(const \
Sink::QueryBase &query)  return processSyncQueue();
 }
 
+void Synchronizer::flush(int commandId, const QByteArray &flushId)
+{
+    SinkTrace() << "Flushing the synchronization queue";
+    mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::Flush, \
commandId, flushId}; +    processSyncQueue().exec();
+}
+
 KAsync::Job<void> Synchronizer::processSyncQueue()
 {
     if (mSyncRequestQueue.isEmpty() || mSyncInProgress) {
@@ -279,6 +288,20 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
                 //Commit after every request, so implementations only have to commit \
more if they add a lot of data.  commit();
             });
+        } else if (request.requestType == Synchronizer::SyncRequest::Flush) {
+            if (request.flushType == Flush::FlushReplayQueue) {
+                SinkTrace() << "Emitting flush completion.";
+                Sink::Notification n;
+                n.type = Sink::Notification::FlushCompletion;
+                n.id = request.flushId;
+                emit notify(n);
+            } else {
+                flatbuffers::FlatBufferBuilder fbb;
+                auto flushId = fbb.CreateString(request.flushId);
+                auto location = Sink::Commands::CreateFlush(fbb, flushId, \
static_cast<int>(Sink::Flush::FlushSynchronization)); +                \
Sink::Commands::FinishFlushBuffer(fbb, location); +                \
enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); +      \
}  } else {
             job = replayNextRevision();
         }
diff --git a/common/synchronizer.h b/common/synchronizer.h
index 4d5bdd5..99d4877 100644
--- a/common/synchronizer.h
+++ b/common/synchronizer.h
@@ -45,6 +45,7 @@ public:
 
     void setup(const std::function<void(int commandId, const QByteArray &data)> \
&enqueueCommandCallback, MessageQueue &messageQueue);  KAsync::Job<void> \
synchronize(const Sink::QueryBase &query); +    void flush(int commandId, const \
QByteArray &flushId);  
     //Read only access to main storage
     Storage::EntityStore &store();
@@ -57,6 +58,9 @@ public:
 
     bool allChangesReplayed() Q_DECL_OVERRIDE;
 
+signals:
+    void notify(Notification);
+
 public slots:
     virtual void revisionChanged() Q_DECL_OVERRIDE;
 
@@ -115,23 +119,30 @@ protected:
     struct SyncRequest {
         enum RequestType {
             Synchronization,
-            ChangeReplay
+            ChangeReplay,
+            Flush
         };
 
         SyncRequest(const Sink::QueryBase &q)
-            : flushQueue(false),
-            requestType(Synchronization),
+            : requestType(Synchronization),
             query(q)
         {
         }
 
         SyncRequest(RequestType type)
-            : flushQueue(false),
+            : requestType(type)
+        {
+        }
+
+        SyncRequest(RequestType type, int flushType_, const QByteArray &flushId_)
+            : flushType(flushType_),
+            flushId(flushId_),
             requestType(type)
         {
         }
 
-        bool flushQueue;
+        int flushType = 0;
+        QByteArray flushId;
         RequestType requestType;
         Sink::QueryBase query;
     };
diff --git a/tests/testimplementations.h b/tests/testimplementations.h
index 111c884..6fe08f7 100644
--- a/tests/testimplementations.h
+++ b/tests/testimplementations.h
@@ -66,10 +66,6 @@ public:
     {
         return KAsync::null<void>();
     }
-    KAsync::Job<void> synchronizeResource(bool remoteSync, bool localSync) \
                Q_DECL_OVERRIDE
-    {
-        return KAsync::null<void>();
-    }
     KAsync::Job<void> synchronizeResource(const Sink::QueryBase &) Q_DECL_OVERRIDE
     {
         return KAsync::null<void>();


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic