[prev in list] [next in list] [prev in thread] [next in thread]
List: mesos-commits
Subject: [1/3] mesos git commit: Updated slave to send total amount of oversubscribed resources.
From: vinodkone () apache ! org
Date: 2015-05-29 1:02:45
Message-ID: 4e302ed1e9294260bec62c72e46c17e5 () git ! apache ! org
[Download RAW message or body]
Repository: mesos
Updated Branches:
refs/heads/master 5c9529777 -> fbf5c7e70
Updated slave to send total amount of oversubscribed resources.
Review: https://reviews.apache.org/r/34729
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0df7bb09
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0df7bb09
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0df7bb09
Branch: refs/heads/master
Commit: 0df7bb09894235cac0dbf1dfdb0a23d2799d62e9
Parents: 5c95297
Author: Vinod Kone <vinodkone@gmail.com>
Authored: Wed May 20 19:10:52 2015 -0700
Committer: Vinod Kone <vinodkone@gmail.com>
Committed: Thu May 28 17:11:01 2015 -0700
----------------------------------------------------------------------
src/messages/messages.proto | 7 +++--
src/slave/flags.cpp | 8 +++---
src/slave/flags.hpp | 2 +-
src/slave/slave.cpp | 46 +++++++++++++++++++++----------
src/slave/slave.hpp | 8 ++++--
src/tests/oversubscription_tests.cpp | 14 +++++-----
6 files changed, 53 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/0df7bb09/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 39dac72..1c8d79e 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -334,10 +334,11 @@ message CheckpointResourcesMessage {
// This message is sent by the slave to the master to inform the
-// master about the currently oversubscribable resources.
-message OversubscribeResourcesMessage {
+// master about the total amount of oversubscribed (allocated and
+// allocatable) resources.
+message UpdateSlaveMessage {
required SlaveID slave_id = 1;
- repeated Resource resources = 2;
+ repeated Resource oversubscribed_resources = 2;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/0df7bb09/src/slave/flags.cpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index a8c7c49..6b7c61e 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -467,10 +467,10 @@ mesos::internal::slave::Flags::Flags()
"resource_estimator",
"The name of the resource estimator to use for oversubscription.");
- add(&Flags::oversubscribe_resources_interval,
- "oversubscribe_resources_interval",
+ add(&Flags::oversubscribed_resources_interval,
+ "oversubscribed_resources_interval",
"The slave periodically updates the master with the current estimation\n"
- "about the maximum amount of resources that can be oversubscribed. The\n"
- "interval between updates is controlled by this flag.",
+ "about the total amount of oversubscribed resources that are allocated\n"
+ "and available. The interval between updates is controlled by this flag.",
Seconds(15));
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/0df7bb09/src/slave/flags.hpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index 6ca59dc..944ed79 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -101,7 +101,7 @@ public:
std::string authenticatee;
Option<std::string> hooks;
Option<std::string> resource_estimator;
- Duration oversubscribe_resources_interval;
+ Duration oversubscribed_resources_interval;
};
} // namespace slave {
http://git-wip-us.apache.org/repos/asf/mesos/blob/0df7bb09/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index b4d2029..fdaaea4 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -3982,7 +3982,7 @@ void Slave::__recover(const Future<Nothing>& future)
// forward the estimations to the master.
resourceEstimator->oversubscribable()
.onAny(defer(self(), &Self::updateOversubscribableResources, lambda::_1))
- .onAny(defer(self(), &Self::forwardOversubscribableResources));
+ .onAny(defer(self(), &Self::forwardOversubscribedResources));
// Start detecting masters.
detection = detector->detect()
@@ -4090,34 +4090,50 @@ void Slave::updateOversubscribableResources(const Future<Resources>& future)
}
-void Slave::forwardOversubscribableResources()
+void Slave::forwardOversubscribedResources()
{
if (state != RUNNING) {
- delay(Seconds(1), self(), &Self::forwardOversubscribableResources);
+ delay(Seconds(1), self(), &Self::forwardOversubscribedResources);
return;
}
- // We only forward updates after the first estimation is received.
- if (oversubscribableResources.isNone()) {
- delay(Seconds(1), self(), &Self::forwardOversubscribableResources);
- return;
+ // Calculate the latest allocation of oversubscribed resources.
+ // Note that this allocation value might be different from the
+ // master's view because new task/executor might be in flight from
+ // the master or pending on the slave etc. This is ok because the
+ // allocator only considers the slave's view of allocation when
+ // calculating the available oversubscribed resources to offer.
+ Resources oversubscribed;
+ foreachvalue (Framework* framework, frameworks) {
+ foreachvalue (Executor* executor, framework->executors) {
+ oversubscribed += executor->resources.revocable();
+ }
}
- CHECK_SOME(master);
- CHECK_SOME(oversubscribableResources);
+ // Add oversubscribable resources to the total.
+ oversubscribed += oversubscribableResources;
- LOG(INFO) << "Forwarding oversubscribable resources "
- << oversubscribableResources.get();
+ if (oversubscribed == oversubscribedResources) {
+ VLOG(1) << "Not forwarding total oversubscribed resources because the"
+ << " previous estimate " << oversubscribed << " hasn't changed";
+ return;
+ }
+
+ LOG(INFO) << "Forwarding total oversubscribed resources " << oversubscribed;
- OversubscribeResourcesMessage message;
+ UpdateSlaveMessage message;
message.mutable_slave_id()->CopyFrom(info.id());
- message.mutable_resources()->CopyFrom(oversubscribableResources.get());
+ message.mutable_oversubscribed_resources()->CopyFrom(oversubscribed);
+ CHECK_SOME(master);
send(master.get(), message);
- delay(flags.oversubscribe_resources_interval,
+ delay(flags.oversubscribed_resources_interval,
self(),
- &Self::forwardOversubscribableResources);
+ &Self::forwardOversubscribedResources);
+
+ // Update the estimate.
+ oversubscribedResources = oversubscribed;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/0df7bb09/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 0207eaf..245ea06 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -434,7 +434,7 @@ private:
const Executor* executor);
void updateOversubscribableResources(const Future<Resources>& future);
- void forwardOversubscribableResources();
+ void forwardOversubscribedResources();
const Flags flags;
@@ -510,7 +510,11 @@ private:
// The most recent estimation about the maximum amount of resources
// that can be oversubscribed on the slave.
- Option<Resources> oversubscribableResources;
+ Resources oversubscribableResources;
+
+ // The total amount of oversubscribed (allocated and
+ // oversubscribable) resources.
+ Resources oversubscribedResources;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/0df7bb09/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index 75c25b0..36a6793 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -48,8 +48,8 @@ class OversubscriptionSlaveTest : public MesosTest {};
// This test verifies that slave will forward the estimation of the
-// oversubscribable resources to the master.
-TEST_F(OversubscriptionSlaveTest, ForwardOversubcribableResourcesMessage)
+// oversubscribed resources to the master.
+TEST_F(OversubscriptionSlaveTest, ForwardUpdateSlaveMessage)
{
Try<PID<Master>> master = StartMaster();
ASSERT_SOME(master);
@@ -66,13 +66,13 @@ TEST_F(OversubscriptionSlaveTest, ForwardOversubcribableResourcesMessage)
AWAIT_READY(slaveRegistered);
- Future<OversubscribeResourcesMessage> update =
- FUTURE_PROTOBUF(OversubscribeResourcesMessage(), _, _);
+ Future<UpdateSlaveMessage> update =
+ FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
Clock::pause();
Clock::settle();
- Clock::advance(flags.oversubscribe_resources_interval);
+ Clock::advance(flags.oversubscribed_resources_interval);
ASSERT_FALSE(update.isReady());
@@ -81,10 +81,10 @@ TEST_F(OversubscriptionSlaveTest, ForwardOversubcribableResourcesMessage)
resourceEstimator.estimate(resources);
Clock::settle();
- Clock::advance(flags.oversubscribe_resources_interval);
+ Clock::advance(flags.oversubscribed_resources_interval);
AWAIT_READY(update);
- EXPECT_EQ(Resources(update.get().resources()), resources);
+ EXPECT_EQ(Resources(update.get().oversubscribed_resources()), resources);
Shutdown();
}
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic