[prev in list] [next in list] [prev in thread] [next in thread]
List: mesos-commits
Subject: (mesos) branch master updated: Added auto calculation of egress_rate_per_cpu.
From: bmahler () apache ! org
Date: 2024-01-23 20:16:27
Message-ID: 170604098774.787716.8277327397169752926 () gitbox2-he-fi ! apache ! org
[Download RAW message or body]
This is an automated email from the ASF dual-hosted git repository.
bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
The following commit(s) were added to refs/heads/master by this push:
new b607bca7e Added auto calculation of egress_rate_per_cpu.
b607bca7e is described below
commit b607bca7ed6470403ef35728efa73fb7a6d4e3fc
Author: Ilya Pronin <ipronin@twitter.com>
AuthorDate: Fri Aug 30 14:38:15 2019 -0700
Added auto calculation of egress_rate_per_cpu.
Currently, egress rate limit scaling requires manual configuration of the per-CPU
rate limit. While manual configuration is more predictable and gives rounder
values, it may be cumbersome in a heterogeneous environment where hosts with the
same NIC type have different CPUs.
This commit allows you to pass an "auto" value to the --egress_rate_per_cpu flag
of the isolator
```
--egress_rate_per_cpu=auto
```
to automatically calculate the value. The value is calculated by determining the
link speed using the /sys/class/net/<iface>/speed interface
(https://www.kernel.org/doc/Documentation/ABI/testing/sysfs-class-net) and
dividing the speed by the number of CPU resources that are available.
--egress_rate_per_cpu=auto will fail if there are 0 CPU resources.
The `network_link_speed` flag
```
--network_link_speed=<link speed in bytes/second>
```
can be used to override link speed detection when --egress_rate_per_cpu=auto.
---
.../mesos/isolators/network/port_mapping.cpp | 52 ++++++++-
.../mesos/isolators/network/port_mapping.hpp | 2 +
src/slave/flags.cpp | 11 +-
src/slave/flags.hpp | 2 +-
src/tests/containerizer/port_mapping_tests.cpp | 123 ++++++++++++++++++++-
5 files changed, 181 insertions(+), 9 deletions(-)
diff --git a/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp \
b/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp index \
c944e8509..9d21d1f52 100644
--- a/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp
+++ b/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp
@@ -2284,6 +2284,50 @@ Try<Isolator*> PortMappingIsolatorProcess::create(const Flags& \
flags) " maximum ingress rate.");
}
+ Option<Bytes> egressRatePerCpu;
+ if (flags.egress_rate_per_cpu.isSome()) {
+ if (flags.egress_rate_per_cpu.get() == "auto") {
+ // Extract the number of CPUs from the resources flag.
+ const uint64_t cpus = resources->cpus().getOrElse(0);
+ if (cpus == 0) {
+ return Error(
+ "CPUs resource has to be specified to determine per CPU egress "
+ "rate limit");
+ }
+
+ // Link speed may be provided by the operator. If not, we can try to read
+ // the self-reported speed.
+ Result<Bytes> speed = flags.network_link_speed;
+ if (speed.isNone()) {
+ speed = getLinkSpeed(eth0.get());
+ if (!speed.isSome()) {
+ return Error(
+ "Failed to determine per CPU egress rate limit: "
+ "Failed to determine link speed of " + eth0.get() + ": " +
+ (speed.isError() ? speed.error() : "Not supported"));
+ }
+ }
+
+ egressRatePerCpu = speed.get() / cpus;
+
+ LOG(INFO) << "Using " << egressRatePerCpu.get()
+ << " per CPU egress rate limit"
+ << " (" << speed.get() << "/" << cpus << ")";
+ } else {
+ Try<Bytes> limit = Bytes::parse(flags.egress_rate_per_cpu.get());
+ if (limit.isError()) {
+ return Error(
+ "Bad option for 'egress_rate_per_cpu' flag: " + limit.error());
+ }
+
+ egressRatePerCpu = limit.get();
+
+ LOG(INFO) << "Using " << egressRatePerCpu.get()
+ << " per CPU egress rate limit";
+ }
+ CHECK_SOME(egressRatePerCpu);
+ }
+
Option<Bytes> ingressRatePerCpu;
if (flags.ingress_rate_per_cpu.isSome()) {
if (flags.ingress_rate_per_cpu.get() == "auto") {
@@ -2807,6 +2851,7 @@ Try<Isolator*> PortMappingIsolatorProcess::create(const Flags& \
flags) ephemeralPortsAllocator,
freeFlowIds,
ratesCollector,
+ egressRatePerCpu,
ingressRatePerCpu)));
}
@@ -2826,6 +2871,7 @@ PortMappingIsolatorProcess::PortMappingIsolatorProcess(
const process::Owned<EphemeralPortsAllocator>& _ephemeralPortsAllocator,
const std::set<uint16_t>& _flowIDs,
const Owned<RatesCollector>& _ratesCollector,
+ const Option<Bytes>& _egressRatePerCpu,
const Option<Bytes>& _ingressRatePerCpu)
: ProcessBase(process::ID::generate("mesos-port-mapping-isolator")),
flags(_flags),
@@ -2842,6 +2888,7 @@ PortMappingIsolatorProcess::PortMappingIsolatorProcess(
ephemeralPortsAllocator(_ephemeralPortsAllocator),
freeFlowIds(_flowIDs),
ratesCollector(_ratesCollector),
+ egressRatePerCpu(_egressRatePerCpu),
ingressRatePerCpu(_ingressRatePerCpu)
{}
@@ -5018,9 +5065,8 @@ Option<htb::cls::Config> \
PortMappingIsolatorProcess::egressHTBConfig( Bytes rate(0);
if (flags.egress_rate_limit_per_container.isSome()) {
rate = flags.egress_rate_limit_per_container.get();
- } else if (flags.egress_rate_per_cpu.isSome()) {
- rate = flags.egress_rate_per_cpu.get() *
- floor(resources.cpus().getOrElse(0));
+ } else if (egressRatePerCpu.isSome()) {
+ rate = egressRatePerCpu.get() * floor(resources.cpus().getOrElse(0));
} else {
return None();
}
diff --git a/src/slave/containerizer/mesos/isolators/network/port_mapping.hpp \
b/src/slave/containerizer/mesos/isolators/network/port_mapping.hpp index \
e79427d4b..219cf5dc3 100644
--- a/src/slave/containerizer/mesos/isolators/network/port_mapping.hpp
+++ b/src/slave/containerizer/mesos/isolators/network/port_mapping.hpp
@@ -343,6 +343,7 @@ private:
const process::Owned<EphemeralPortsAllocator>& _ephemeralPortsAllocator,
const std::set<uint16_t>& _flowIDs,
const process::Owned<RatesCollector>& _ratesCollector,
+ const Option<Bytes>& _egressRatePerCpu,
const Option<Bytes>& _ingressRatePerCpu);
// Continuations.
@@ -424,6 +425,7 @@ private:
process::Owned<RatesCollector> ratesCollector;
+ const Option<Bytes> egressRatePerCpu;
const Option<Bytes> ingressRatePerCpu;
};
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index 5c776d3cc..fbead7f1e 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -1232,6 +1232,9 @@ mesos::internal::slave::Flags::Flags()
"egress_rate_per_cpu Bytes/s for each whole unit of CPU resource,\n"
"i.e., floor(CPU), subject to the values of the\n"
"minimum_egress_rate_limit and maximum_egress_rate_limit flags."
+ "If set to 'auto' the rate limit is automatically calculated\n"
+ "by determining the link speed and dividing by the number of available\n"
+ "CPU resources.\n"
"This flag is used by the `network/port_mapping` isolator,");
add(&Flags::minimum_egress_rate_limit,
@@ -1284,7 +1287,7 @@ mesos::internal::slave::Flags::Flags()
"ingress_rate_per_cpu Bytes/s for each whole unit of CPU resource,\n"
"i.e., floor(CPU), subject to the values of the\n"
"minimum_ingress_rate_limit and maximum_ingress_rate_limit flags."
- "This flag is used by the `network/port_mapping` isolator,");
+ "This flag is used by the `network/port_mapping` isolator.");
add(&Flags::minimum_ingress_rate_limit,
"minimum_ingress_rate_limit",
@@ -1322,9 +1325,9 @@ mesos::internal::slave::Flags::Flags()
add(&Flags::network_link_speed,
"network_link_speed",
"Physical network link speed in Bytes/s. This flag is used only when\n"
- "--ingress_rate_per_cpu=\'auto\'. This provided link speed overrides\n"
- "automatic detection of the link speed. This flag is used by the\n"
- "`network/port_mapping_isolator`.");
+ "--ingress_rate_per_cpu=\'auto\' or --egress_rate_per_cpu=\'auto\'.\n"
+ "This provided link speed overrides automatic detection of the link\n"
+ "speed. This flag is used by the `network/port_mapping_isolator`.");
add(&Flags::network_enable_socket_statistics_summary,
"network_enable_socket_statistics_summary",
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index e2a421d91..f4c413a06 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -153,7 +153,7 @@ public:
Option<std::string> eth0_name;
Option<std::string> lo_name;
Option<Bytes> egress_rate_limit_per_container;
- Option<Bytes> egress_rate_per_cpu;
+ Option<std::string> egress_rate_per_cpu;
Option<Bytes> minimum_egress_rate_limit;
Option<Bytes> maximum_egress_rate_limit;
Option<Bytes> egress_ceil_limit;
diff --git a/src/tests/containerizer/port_mapping_tests.cpp \
b/src/tests/containerizer/port_mapping_tests.cpp index 970109b7c..92e26e46a 100644
--- a/src/tests/containerizer/port_mapping_tests.cpp
+++ b/src/tests/containerizer/port_mapping_tests.cpp
@@ -1742,7 +1742,7 @@ TEST_F(PortMappingIsolatorTest, ROOT_ScaleEgressWithCPU)
flags.egress_rate_limit_per_container = None();
const Bytes egressRatePerCpu = 1000;
- flags.egress_rate_per_cpu = egressRatePerCpu;
+ flags.egress_rate_per_cpu = stringify(egressRatePerCpu);
const Bytes minRate = 2000;
flags.minimum_egress_rate_limit = minRate;
@@ -1847,6 +1847,127 @@ TEST_F(PortMappingIsolatorTest, ROOT_ScaleEgressWithCPU)
}
+TEST_F(PortMappingIsolatorTest, ROOT_ScaleEgressWithCPUAutoConfig)
+{
+ flags.egress_rate_limit_per_container = None();
+ flags.network_link_speed = Bytes(10000);
+
+ // Change available CPUs to be 10 so that 'auto' divides the link speed by 10.
+ vector<string> resources = strings::split(flags.resources.get(), ";");
+ std::replace_if(
+ resources.begin(),
+ resources.end(),
+ [](const string& s) {return strings::startsWith(s, "cpus:");},
+ "cpus:10");
+ flags.resources = strings::join(";", resources);
+
+ // Egress rate limit per CPU should be 10000 / 10 = 1000 (link speed / CPUs).
+ const Bytes egressRatePerCpu = Bytes(1000);
+ flags.egress_rate_per_cpu = "auto";
+
+ const Bytes minRate = 2000;
+ flags.minimum_egress_rate_limit = minRate;
+
+ const Bytes maxRate = 4000;
+ flags.maximum_egress_rate_limit = maxRate;
+
+ // CPU low enough for scaled network egress to be increased to min limit:
+ // 1 * 1000 < 2000 ==> egress is 2000.
+ Try<Resources> lowCpu = Resources::parse("cpus:1;mem:1024;disk:1024");
+ ASSERT_SOME(lowCpu);
+
+ // CPU sufficient to be in linear scaling region, greater than min and less
+ // than max: 2000 < floor(3.1) * 1000 < 4000.
+ Try<Resources> linearCpu = Resources::parse("cpus:3.1;mem:1024;disk:1024");
+ ASSERT_SOME(linearCpu);
+
+ // CPU high enough for scaled network egress to be reduced to the max limit:
+ // 5 * 1000 > 4000.
+ Try<Resources> highCpu = Resources::parse("cpus:5;mem:1024;disk:1024");
+ ASSERT_SOME(highCpu);
+
+ Try<Isolator*> isolator = PortMappingIsolatorProcess::create(flags);
+ ASSERT_SOME(isolator);
+
+ Try<Launcher*> launcher = LinuxLauncher::create(flags);
+ ASSERT_SOME(launcher);
+
+ ExecutorInfo executorInfo;
+ executorInfo.mutable_resources()->CopyFrom(lowCpu.get());
+
+ ContainerID containerId1;
+ containerId1.set_value(id::UUID::random().toString());
+
+ ContainerConfig containerConfig1;
+ containerConfig1.mutable_executor_info()->CopyFrom(executorInfo);
+
+ Future<Option<ContainerLaunchInfo>> launchInfo1 =
+ isolator.get()->prepare(containerId1, containerConfig1);
+ AWAIT_READY(launchInfo1);
+ ASSERT_SOME(launchInfo1.get());
+ ASSERT_EQ(1, launchInfo1.get()->pre_exec_commands().size());
+
+ int pipes[2];
+ ASSERT_NE(-1, ::pipe(pipes));
+
+ Try<pid_t> pid = launchHelper(
+ launcher.get(),
+ pipes,
+ containerId1,
+ "touch " + container1Ready + " && sleep 1000",
+ launchInfo1.get());
+ ASSERT_SOME(pid);
+
+ // Reap the forked child.
+ Future<Option<int>> status = process::reap(pid.get());
+
+ // Continue in the parent.
+ ::close(pipes[0]);
+
+ // Isolate the forked child.
+ AWAIT_READY(isolator.get()->isolate(containerId1, pid.get()));
+
+ // Signal forked child to continue.
+ char dummy;
+ ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+ ::close(pipes[1]);
+
+ // Wait for command to start to ensure all pre-exec scripts have
+ // executed.
+ ASSERT_TRUE(waitForFileCreation(container1Ready));
+
+ // The container should start with the minimum limit since 1 * 1000 < 2000.
+ Result<htb::cls::Config> config = recoverHTBConfig(pid.get(), eth0, flags);
+ ASSERT_SOME(config);
+ ASSERT_EQ(minRate, config->rate);
+
+ // Increase CPU to get to linear scaling.
+ Future<Nothing> update = isolator.get()->update(
+ containerId1, linearCpu.get());
+ AWAIT_READY(update);
+
+ config = recoverHTBConfig(pid.get(), eth0, flags);
+ ASSERT_SOME(config);
+ ASSERT_EQ(egressRatePerCpu.bytes() * floor(linearCpu->cpus().get()),
+ config->rate);
+
+ // Increase CPU further to hit maximum limit.
+ update = isolator.get()->update(containerId1, highCpu.get());
+ AWAIT_READY(update);
+
+ config = recoverHTBConfig(pid.get(), eth0, flags);
+ ASSERT_SOME(config);
+ ASSERT_EQ(maxRate, config->rate);
+
+ // Kill the container.
+ AWAIT_READY(launcher.get()->destroy(containerId1));
+ AWAIT_READY(isolator.get()->cleanup(containerId1));
+
+ delete launcher.get();
+ delete isolator.get();
+}
+
+
TEST_F(PortMappingIsolatorTest, ROOT_ScaleIngressWithCPU)
{
flags.ingress_rate_limit_per_container = None();
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic