[prev in list] [next in list] [prev in thread] [next in thread]
List: mesos-commits
Subject: (mesos) branch master updated: Added auto calculation of ingress_rate_per_cpu.
From: bmahler () apache ! org
Date: 2024-01-22 22:49:10
Message-ID: 170596375084.2321612.7600090679141258744 () gitbox2-he-fi ! apache ! org
[Download RAW message or body]
This is an automated email from the ASF dual-hosted git repository.
bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
The following commit(s) were added to refs/heads/master by this push:
new f4087b81c Added auto calculation of ingress_rate_per_cpu.
f4087b81c is described below
commit f4087b81c3d19af447fde003decf5590dfb7cdae
Author: Ilya Pronin <ipronin@twitter.com>
AuthorDate: Tue May 1 16:44:30 2018 -0700
Added auto calculation of ingress_rate_per_cpu.
Currently, ingress rate limit scaling requires manual configuration of the per-CPU \
rate limit. While this is more predictable and gives rounder values, it may be \
cumbersome to configure in a heterogeneous environment where hosts with the same \
NIC type have different CPUs.
This commit allows you to pass an "auto" value to the `--ingress_rate_per_cpu` \
flag of the isolator
```
--ingress_rate_per_cpu=auto
```
to automatically calculate the value. The value is calculated by determining the
link speed using the `/sys/class/net/<iface>/speed` interface
(https://www.kernel.org/doc/Documentation/ABI/testing/sysfs-class-net) and \
dividing the speed by the number of CPU resources that are available. \
`--ingress_rate_per_cpu=auto` will fail if there are 0 CPU resources.
An additional flag
```
--network_link_speed=<link speed in bytes/second>
```
is provided to override link speed detection when `--ingress_rate_per_cpu=auto` is used.
---
.../mesos/isolators/network/port_mapping.cpp | 59 +++++++++-
.../mesos/isolators/network/port_mapping.hpp | 5 +-
src/slave/flags.cpp | 7 ++
src/slave/flags.hpp | 3 +-
src/tests/containerizer/port_mapping_tests.cpp | 127 ++++++++++++++++++++-
5 files changed, 192 insertions(+), 9 deletions(-)
diff --git a/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp \
b/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp index \
c0dcb0ae7..c316b7dba 100644
--- a/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp
+++ b/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp
@@ -739,6 +739,7 @@ static Try<Nothing> updateIngressHTB(
}
+// Computes the speed of a link in bytes per second.
static Result<Bytes> getLinkSpeed(const string& link)
{
const string linkSpeedPath = path::join("/sys/class/net", link, "speed");
@@ -2245,6 +2246,50 @@ Try<Isolator*> PortMappingIsolatorProcess::create(const Flags& \
flags) " maximum ingress rate.");
}
+ Option<Bytes> ingressRatePerCpu;
+ if (flags.ingress_rate_per_cpu.isSome()) {
+ if (flags.ingress_rate_per_cpu.get() == "auto") {
+ // Extract the number of CPUs from resources flag.
+ const uint64_t cpus = resources->cpus().getOrElse(0);
+ if (cpus == 0) {
+ return Error(
+ "CPUs resource has to be specified to determine per CPU ingress "
+ "rate limit");
+ }
+
+ // Link speed may be provided by the operator. If not, we can try to read
+ // the self-reported speed.
+ Result<Bytes> speed = flags.network_link_speed;
+ if (speed.isNone()) {
+ speed = getLinkSpeed(eth0.get());
+ if (!speed.isSome()) {
+ return Error(
+ "Failed to determine per CPU ingress rate limit: "
+ "Failed to determine link speed of " + eth0.get() + ": " +
+ (speed.isError() ? speed.error() : "Not supported"));
+ }
+ }
+
+ ingressRatePerCpu = speed.get() / cpus;
+
+ LOG(INFO) << "Using " << ingressRatePerCpu.get()
+ << " per CPU ingress rate limit"
+ << " (" << speed.get() << "/" << cpus << ")";
+ } else {
+ Try<Bytes> limit = Bytes::parse(flags.ingress_rate_per_cpu.get());
+ if (limit.isError()) {
+ return Error(
+ "Bad option for 'ingress_rate_per_cpu' flag: " + limit.error());
+ }
+
+ ingressRatePerCpu = limit.get();
+
+ LOG(INFO) << "Using " << ingressRatePerCpu.get()
+ << " per CPU ingress rate limit";
+ }
+ CHECK_SOME(ingressRatePerCpu);
+ }
+
// If an egress or ingress rate limit is provided, do a sanity check
// that it is not greater than the host physical link speed.
if (flags.egress_rate_limit_per_container.isSome() ||
@@ -2723,7 +2768,8 @@ Try<Isolator*> PortMappingIsolatorProcess::create(const Flags& \
flags) nonEphemeralPorts,
ephemeralPortsAllocator,
freeFlowIds,
- ratesCollector)));
+ ratesCollector,
+ ingressRatePerCpu)));
}
@@ -2741,7 +2787,8 @@ PortMappingIsolatorProcess::PortMappingIsolatorProcess(
const IntervalSet<uint16_t>& _managedNonEphemeralPorts,
const process::Owned<EphemeralPortsAllocator>& _ephemeralPortsAllocator,
const std::set<uint16_t>& _flowIDs,
- const Owned<RatesCollector>& _ratesCollector)
+ const Owned<RatesCollector>& _ratesCollector,
+ const Option<Bytes>& _ingressRatePerCpu)
: ProcessBase(process::ID::generate("mesos-port-mapping-isolator")),
flags(_flags),
bindMountRoot(_bindMountRoot),
@@ -2756,7 +2803,8 @@ PortMappingIsolatorProcess::PortMappingIsolatorProcess(
managedNonEphemeralPorts(_managedNonEphemeralPorts),
ephemeralPortsAllocator(_ephemeralPortsAllocator),
freeFlowIds(_flowIDs),
- ratesCollector(_ratesCollector)
+ ratesCollector(_ratesCollector),
+ ingressRatePerCpu(_ingressRatePerCpu)
{}
@@ -4979,9 +5027,8 @@ Option<htb::cls::Config> \
PortMappingIsolatorProcess::ingressHTBConfig( Bytes rate(0);
if (flags.ingress_rate_limit_per_container.isSome()) {
rate = flags.ingress_rate_limit_per_container.get();
- } else if (flags.ingress_rate_per_cpu.isSome()) {
- rate = flags.ingress_rate_per_cpu.get() *
- floor(resources.cpus().getOrElse(0));
+ } else if (ingressRatePerCpu.isSome()) {
+ rate = ingressRatePerCpu.get() * floor(resources.cpus().getOrElse(0));
} else {
return None();
}
diff --git a/src/slave/containerizer/mesos/isolators/network/port_mapping.hpp \
b/src/slave/containerizer/mesos/isolators/network/port_mapping.hpp index \
df0584b97..e79427d4b 100644
--- a/src/slave/containerizer/mesos/isolators/network/port_mapping.hpp
+++ b/src/slave/containerizer/mesos/isolators/network/port_mapping.hpp
@@ -342,7 +342,8 @@ private:
const IntervalSet<uint16_t>& _managedNonEphemeralPorts,
const process::Owned<EphemeralPortsAllocator>& _ephemeralPortsAllocator,
const std::set<uint16_t>& _flowIDs,
- const process::Owned<RatesCollector>& _ratesCollector);
+ const process::Owned<RatesCollector>& _ratesCollector,
+ const Option<Bytes>& _ingressRatePerCpu);
// Continuations.
Try<Nothing> _cleanup(Info* info, const Option<ContainerID>& containerId);
@@ -422,6 +423,8 @@ private:
hashset<ContainerID> unmanaged;
process::Owned<RatesCollector> ratesCollector;
+
+ const Option<Bytes> ingressRatePerCpu;
};
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index 594dbedf8..fe0b220ac 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -1307,6 +1307,13 @@ mesos::internal::slave::Flags::Flags()
"Amount of data in Bytes that can be received at the higher ceil rate."
"This flag is used by the `network/port_mapping_isolator`.");
+ add(&Flags::network_link_speed,
+ "network_link_speed",
+ "Physical network link speed in Bytes/s. This flag is used only when\n"
+ "--ingress_rate_per_cpu=\'auto\'. This provided link speed overrides\n"
+ "automatic detection of the link speed. This flag is used by the\n"
+ "`network/port_mapping_isolator`.");
+
add(&Flags::network_enable_socket_statistics_summary,
"network_enable_socket_statistics_summary",
"Whether to collect socket statistics summary for each container.\n"
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index 0b6bd91c4..af98c12c0 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -161,11 +161,12 @@ public:
bool egress_unique_flow_per_container;
std::string egress_flow_classifier_parent;
Option<Bytes> ingress_rate_limit_per_container;
- Option<Bytes> ingress_rate_per_cpu;
+ Option<std::string> ingress_rate_per_cpu;
Option<Bytes> minimum_ingress_rate_limit;
Option<Bytes> maximum_ingress_rate_limit;
Option<Bytes> ingress_ceil_limit;
Option<Bytes> ingress_burst;
+ Option<Bytes> network_link_speed;
bool network_enable_socket_statistics_summary;
bool network_enable_socket_statistics_details;
bool network_enable_snmp_statistics;
diff --git a/src/tests/containerizer/port_mapping_tests.cpp \
b/src/tests/containerizer/port_mapping_tests.cpp index 579b8205a..f2c1e9e78 100644
--- a/src/tests/containerizer/port_mapping_tests.cpp
+++ b/src/tests/containerizer/port_mapping_tests.cpp
@@ -1851,7 +1851,132 @@ TEST_F(PortMappingIsolatorTest, ROOT_ScaleIngressWithCPU)
flags.ingress_rate_limit_per_container = None();
const Bytes ingressRatePerCpu = 1000;
- flags.ingress_rate_per_cpu = ingressRatePerCpu;
+ flags.ingress_rate_per_cpu = stringify(ingressRatePerCpu);
+
+ const Bytes minRate = 2000;
+ flags.minimum_ingress_rate_limit = minRate;
+
+ const Bytes maxRate = 4000;
+ flags.maximum_ingress_rate_limit = maxRate;
+
+ // CPU low enough for scaled network ingress to be increased to min
+ // limit: 1 * 1000 < 2000 ==> ingress is 2000.
+ Try<Resources> lowCpu = Resources::parse("cpus:1;mem:1024;disk:1024");
+ ASSERT_SOME(lowCpu);
+
+ // CPU sufficient to be in linear scaling region, greater than min
+ // and less than max: 2000 < 3.1 * 1000 < 4000.
+ Try<Resources> linearCpu = Resources::parse("cpus:3.1;mem:1024;disk:1024");
+ ASSERT_SOME(linearCpu);
+
+ // CPU high enough for scaled network ingress to be reduced to the
+ // max limit: 5 * 1000 > 4000.
+ Try<Resources> highCpu = Resources::parse("cpus:5;mem:1024;disk:1024");
+ ASSERT_SOME(highCpu);
+
+ Try<Isolator*> isolator = PortMappingIsolatorProcess::create(flags);
+ ASSERT_SOME(isolator);
+
+ Try<Launcher*> launcher = LinuxLauncher::create(flags);
+ ASSERT_SOME(launcher);
+
+ ExecutorInfo executorInfo;
+ executorInfo.mutable_resources()->CopyFrom(lowCpu.get());
+
+ ContainerID containerId1;
+ containerId1.set_value(id::UUID::random().toString());
+
+ ContainerConfig containerConfig1;
+ containerConfig1.mutable_executor_info()->CopyFrom(executorInfo);
+
+ Future<Option<ContainerLaunchInfo>> launchInfo1 =
+ isolator.get()->prepare(containerId1, containerConfig1);
+ AWAIT_READY(launchInfo1);
+ ASSERT_SOME(launchInfo1.get());
+ ASSERT_EQ(1, launchInfo1.get()->pre_exec_commands().size());
+
+ int pipes[2];
+ ASSERT_NE(-1, ::pipe(pipes));
+
+ Try<pid_t> pid = launchHelper(
+ launcher.get(),
+ pipes,
+ containerId1,
+ "touch " + container1Ready + " && sleep 1000",
+ launchInfo1.get());
+ ASSERT_SOME(pid);
+
+ // Reap the forked child.
+ Future<Option<int>> status = process::reap(pid.get());
+
+ // Continue in the parent.
+ ::close(pipes[0]);
+
+ // Isolate the forked child.
+ AWAIT_READY(isolator.get()->isolate(containerId1, pid.get()));
+
+ // Signal forked child to continue.
+ char dummy;
+ ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+ ::close(pipes[1]);
+
+ // Wait for command to start to ensure all pre-exec scripts have
+ // executed.
+ ASSERT_TRUE(waitForFileCreation(container1Ready));
+
+ const string veth = slave::PORT_MAPPING_VETH_PREFIX() + stringify(pid.get());
+ const routing::Handle cls(routing::Handle(1, 0), 1);
+
+ Result<htb::cls::Config> config = htb::cls::getConfig(veth, cls);
+ ASSERT_SOME(config);
+ ASSERT_EQ(minRate, config->rate);
+
+ // Increase CPU to get to linear scaling.
+ Future<Nothing> update = isolator.get()->update(
+ containerId1,
+ linearCpu.get());
+ AWAIT_READY(update);
+
+ config = htb::cls::getConfig(veth, cls);
+ ASSERT_SOME(config);
+ ASSERT_EQ(
+ ingressRatePerCpu.bytes() * floor(linearCpu.get().cpus().get()),
+ config->rate);
+
+ // Increase CPU further to hit maximum limit.
+ update = isolator.get()->update(
+ containerId1,
+ highCpu.get());
+ AWAIT_READY(update);
+
+ config = htb::cls::getConfig(veth, cls);
+ ASSERT_SOME(config);
+ ASSERT_EQ(maxRate, config->rate);
+
+ // Kill the container
+ AWAIT_READY(launcher.get()->destroy(containerId1));
+ AWAIT_READY(isolator.get()->cleanup(containerId1));
+}
+
+
+TEST_F(PortMappingIsolatorTest, ROOT_ScaleIngressWithCPUAutoConfig)
+{
+ flags.ingress_rate_limit_per_container = None();
+ flags.network_link_speed = Bytes(10000);
+
+ // Change available CPUs to be 10.
+ vector<string> resources = strings::split(flags.resources.get(), ";");
+ std::replace_if(
+ resources.begin(),
+ resources.end(),
+ [](const string& s) {return strings::startsWith(s, "cpus:");},
+ "cpus:10");
+
+ flags.resources = strings::join(";", resources);
+
+ // Ingress rate limit per CPU should be 10000 / 10 = 1000.
+ const Bytes ingressRatePerCpu = Bytes(1000);
+ flags.ingress_rate_per_cpu = "auto";
const Bytes minRate = 2000;
flags.minimum_ingress_rate_limit = minRate;
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic