[prev in list] [next in list] [prev in thread] [next in thread] 

List:       mesos-commits
Subject:    (mesos) branch master updated: Added auto calculation of egress_rate_per_cpu.
From:       bmahler () apache ! org
Date:       2024-01-23 20:16:27
Message-ID: 170604098774.787716.8277327397169752926 () gitbox2-he-fi ! apache ! org
[Download RAW message or body]

This is an automated email from the ASF dual-hosted git repository.

bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git


The following commit(s) were added to refs/heads/master by this push:
     new b607bca7e Added auto calculation of egress_rate_per_cpu.
b607bca7e is described below

commit b607bca7ed6470403ef35728efa73fb7a6d4e3fc
Author: Ilya Pronin <ipronin@twitter.com>
AuthorDate: Fri Aug 30 14:38:15 2019 -0700

    Added auto calculation of egress_rate_per_cpu.
    
    Currently, egress rate limit scaling requires manual configuration of the
    per-CPU rate limit. While that is more predictable and gives rounder values,
    it may be cumbersome to configure in a heterogeneous environment where hosts
    with one NIC type have different CPUs.
    
    This commit allows you to pass an "auto" value to the --egress_rate_per_cpu flag
    of the isolator
    ```
    --egress_rate_per_cpu=auto
    ```
    to automatically calculate the value. The value is calculated by determining the
    link speed using the /sys/class/net/<iface>/speed interface
    (https://www.kernel.org/doc/Documentation/ABI/testing/sysfs-class-net) and
    dividing the speed by the number of CPU resources that are available.
    --egress_rate_per_cpu=auto will fail if there are 0 CPU resources.
    
    The `network_link_speed` flag
    ```
    --network_link_speed=<link speed in bytes/second>
    ```
    can be used to override link speed detection when --egress_rate_per_cpu=auto.
---
 .../mesos/isolators/network/port_mapping.cpp       |  52 ++++++++-
 .../mesos/isolators/network/port_mapping.hpp       |   2 +
 src/slave/flags.cpp                                |  11 +-
 src/slave/flags.hpp                                |   2 +-
 src/tests/containerizer/port_mapping_tests.cpp     | 123 ++++++++++++++++++++-
 5 files changed, 181 insertions(+), 9 deletions(-)

diff --git a/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp \
b/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp index \
                c944e8509..9d21d1f52 100644
--- a/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp
+++ b/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp
@@ -2284,6 +2284,50 @@ Try<Isolator*> PortMappingIsolatorProcess::create(const Flags& \
flags)  " maximum ingress rate.");
   }
 
+  Option<Bytes> egressRatePerCpu;
+  if (flags.egress_rate_per_cpu.isSome()) {
+    if (flags.egress_rate_per_cpu.get() == "auto") {
+      // Extract the number of CPUs from the resources flag.
+      const uint64_t cpus = resources->cpus().getOrElse(0);
+      if (cpus == 0) {
+        return Error(
+            "CPUs resource has to be specified to determine per CPU egress "
+            "rate limit");
+      }
+
+      // Link speed may be provided by the operator. If not, we can try to read
+      // the self-reported speed.
+      Result<Bytes> speed = flags.network_link_speed;
+      if (speed.isNone()) {
+        speed = getLinkSpeed(eth0.get());
+        if (!speed.isSome()) {
+          return Error(
+              "Failed to determine per CPU egress rate limit: "
+              "Failed to determine link speed of " + eth0.get() + ": " +
+              (speed.isError() ? speed.error() : "Not supported"));
+        }
+      }
+
+      egressRatePerCpu = speed.get() / cpus;
+
+      LOG(INFO) << "Using " << egressRatePerCpu.get()
+                << " per CPU egress rate limit"
+                << " (" << speed.get() << "/" << cpus << ")";
+    } else {
+      Try<Bytes> limit = Bytes::parse(flags.egress_rate_per_cpu.get());
+      if (limit.isError()) {
+        return Error(
+            "Bad option for 'egress_rate_per_cpu' flag: " + limit.error());
+      }
+
+      egressRatePerCpu = limit.get();
+
+      LOG(INFO) << "Using " << egressRatePerCpu.get()
+                << " per CPU egress rate limit";
+    }
+    CHECK_SOME(egressRatePerCpu);
+  }
+
   Option<Bytes> ingressRatePerCpu;
   if (flags.ingress_rate_per_cpu.isSome()) {
     if (flags.ingress_rate_per_cpu.get() == "auto") {
@@ -2807,6 +2851,7 @@ Try<Isolator*> PortMappingIsolatorProcess::create(const Flags& \
flags)  ephemeralPortsAllocator,
           freeFlowIds,
           ratesCollector,
+          egressRatePerCpu,
           ingressRatePerCpu)));
 }
 
@@ -2826,6 +2871,7 @@ PortMappingIsolatorProcess::PortMappingIsolatorProcess(
     const process::Owned<EphemeralPortsAllocator>& _ephemeralPortsAllocator,
     const std::set<uint16_t>& _flowIDs,
     const Owned<RatesCollector>& _ratesCollector,
+    const Option<Bytes>& _egressRatePerCpu,
     const Option<Bytes>& _ingressRatePerCpu)
   : ProcessBase(process::ID::generate("mesos-port-mapping-isolator")),
     flags(_flags),
@@ -2842,6 +2888,7 @@ PortMappingIsolatorProcess::PortMappingIsolatorProcess(
     ephemeralPortsAllocator(_ephemeralPortsAllocator),
     freeFlowIds(_flowIDs),
     ratesCollector(_ratesCollector),
+    egressRatePerCpu(_egressRatePerCpu),
     ingressRatePerCpu(_ingressRatePerCpu)
 {}
 
@@ -5018,9 +5065,8 @@ Option<htb::cls::Config> \
PortMappingIsolatorProcess::egressHTBConfig(  Bytes rate(0);
   if (flags.egress_rate_limit_per_container.isSome()) {
     rate = flags.egress_rate_limit_per_container.get();
-  } else if (flags.egress_rate_per_cpu.isSome()) {
-    rate = flags.egress_rate_per_cpu.get() *
-           floor(resources.cpus().getOrElse(0));
+  } else if (egressRatePerCpu.isSome()) {
+    rate = egressRatePerCpu.get() * floor(resources.cpus().getOrElse(0));
   } else {
     return None();
   }
diff --git a/src/slave/containerizer/mesos/isolators/network/port_mapping.hpp \
b/src/slave/containerizer/mesos/isolators/network/port_mapping.hpp index \
                e79427d4b..219cf5dc3 100644
--- a/src/slave/containerizer/mesos/isolators/network/port_mapping.hpp
+++ b/src/slave/containerizer/mesos/isolators/network/port_mapping.hpp
@@ -343,6 +343,7 @@ private:
       const process::Owned<EphemeralPortsAllocator>& _ephemeralPortsAllocator,
       const std::set<uint16_t>& _flowIDs,
       const process::Owned<RatesCollector>& _ratesCollector,
+      const Option<Bytes>& _egressRatePerCpu,
       const Option<Bytes>& _ingressRatePerCpu);
 
   // Continuations.
@@ -424,6 +425,7 @@ private:
 
   process::Owned<RatesCollector> ratesCollector;
 
+  const Option<Bytes> egressRatePerCpu;
   const Option<Bytes> ingressRatePerCpu;
 };
 
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index 5c776d3cc..fbead7f1e 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -1232,6 +1232,9 @@ mesos::internal::slave::Flags::Flags()
       "egress_rate_per_cpu Bytes/s for each whole unit of CPU resource,\n"
       "i.e., floor(CPU), subject to the values of the\n"
       "minimum_egress_rate_limit and maximum_egress_rate_limit flags."
+      "If set to 'auto' the rate limit is automatically calculated\n"
+      "by determining the link speed and dividing by the number of available\n"
+      "CPU resources.\n" 
       "This flag is used by the `network/port_mapping` isolator,");
 
   add(&Flags::minimum_egress_rate_limit,
@@ -1284,7 +1287,7 @@ mesos::internal::slave::Flags::Flags()
       "ingress_rate_per_cpu Bytes/s for each whole unit of CPU resource,\n"
       "i.e., floor(CPU), subject to the values of the\n"
       "minimum_ingress_rate_limit and maximum_ingress_rate_limit flags."
-      "This flag is used by the `network/port_mapping` isolator,");
+      "This flag is used by the `network/port_mapping` isolator.");
 
   add(&Flags::minimum_ingress_rate_limit,
       "minimum_ingress_rate_limit",
@@ -1322,9 +1325,9 @@ mesos::internal::slave::Flags::Flags()
   add(&Flags::network_link_speed,
       "network_link_speed",
       "Physical network link speed in Bytes/s. This flag is used only when\n"
-      "--ingress_rate_per_cpu=\'auto\'. This provided link speed overrides\n"
-      "automatic detection of the link speed. This flag is used by the\n"
-      "`network/port_mapping_isolator`.");
+      "--ingress_rate_per_cpu=\'auto\' or --egress_rate_per_cpu=\'auto\'.\n"
+      "This provided link speed overrides automatic detection of the link\n"
+      "speed. This flag is used by the `network/port_mapping_isolator`.");
 
   add(&Flags::network_enable_socket_statistics_summary,
       "network_enable_socket_statistics_summary",
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index e2a421d91..f4c413a06 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -153,7 +153,7 @@ public:
   Option<std::string> eth0_name;
   Option<std::string> lo_name;
   Option<Bytes> egress_rate_limit_per_container;
-  Option<Bytes> egress_rate_per_cpu;
+  Option<std::string> egress_rate_per_cpu;
   Option<Bytes> minimum_egress_rate_limit;
   Option<Bytes> maximum_egress_rate_limit;
   Option<Bytes> egress_ceil_limit;
diff --git a/src/tests/containerizer/port_mapping_tests.cpp \
b/src/tests/containerizer/port_mapping_tests.cpp index 970109b7c..92e26e46a 100644
--- a/src/tests/containerizer/port_mapping_tests.cpp
+++ b/src/tests/containerizer/port_mapping_tests.cpp
@@ -1742,7 +1742,7 @@ TEST_F(PortMappingIsolatorTest, ROOT_ScaleEgressWithCPU)
   flags.egress_rate_limit_per_container = None();
 
   const Bytes egressRatePerCpu = 1000;
-  flags.egress_rate_per_cpu = egressRatePerCpu;
+  flags.egress_rate_per_cpu = stringify(egressRatePerCpu);
 
   const Bytes minRate = 2000;
   flags.minimum_egress_rate_limit = minRate;
@@ -1847,6 +1847,127 @@ TEST_F(PortMappingIsolatorTest, ROOT_ScaleEgressWithCPU)
 }
 
 
+TEST_F(PortMappingIsolatorTest, ROOT_ScaleEgressWithCPUAutoConfig)
+{
+  flags.egress_rate_limit_per_container = None();
+  flags.network_link_speed = Bytes(10000);
+
+  // Change available CPUs to be 10.
+  vector<string> resources = strings::split(flags.resources.get(), ";");
+  std::replace_if(
+      resources.begin(),
+      resources.end(),
+      [](const string& s) {return strings::startsWith(s, "cpus:");},
+      "cpus:10");
+  flags.resources = strings::join(";", resources);
+
+  // Egress rate limit per CPU should be 10000 / 10 = 1000.
+  const Bytes egressRatePerCpu = Bytes(1000);
+  flags.egress_rate_per_cpu = "auto";
+
+  const Bytes minRate = 2000;
+  flags.minimum_egress_rate_limit = minRate;
+
+  const Bytes maxRate = 4000;
+  flags.maximum_egress_rate_limit = maxRate;
+
+  // CPU low enough for scaled network egress to be increased to min limit:
+  // 1 * 1000 < 2000 ==> egress is 2000.
+  Try<Resources> lowCpu = Resources::parse("cpus:1;mem:1024;disk:1024");
+  ASSERT_SOME(lowCpu);
+
+  // CPU sufficient to be in linear scaling region, greater than min and less
+  // than max: 2000 < floor(3.1) * 1000 = 3000 < 4000.
+  Try<Resources> linearCpu = Resources::parse("cpus:3.1;mem:1024;disk:1024");
+  ASSERT_SOME(linearCpu);
+
+  // CPU high enough for scaled network egress to be reduced to the max limit:
+  // 5 * 1000 > 4000.
+  Try<Resources> highCpu = Resources::parse("cpus:5;mem:1024;disk:1024");
+  ASSERT_SOME(highCpu);
+
+  Try<Isolator*> isolator = PortMappingIsolatorProcess::create(flags);
+  ASSERT_SOME(isolator);
+
+  Try<Launcher*> launcher = LinuxLauncher::create(flags);
+  ASSERT_SOME(launcher);
+
+  ExecutorInfo executorInfo;
+  executorInfo.mutable_resources()->CopyFrom(lowCpu.get());
+
+  ContainerID containerId1;
+  containerId1.set_value(id::UUID::random().toString());
+
+  ContainerConfig containerConfig1;
+  containerConfig1.mutable_executor_info()->CopyFrom(executorInfo);
+
+  Future<Option<ContainerLaunchInfo>> launchInfo1 =
+    isolator.get()->prepare(containerId1, containerConfig1);
+  AWAIT_READY(launchInfo1);
+  ASSERT_SOME(launchInfo1.get());
+  ASSERT_EQ(1, launchInfo1.get()->pre_exec_commands().size());
+
+  int pipes[2];
+  ASSERT_NE(-1, ::pipe(pipes));
+
+  Try<pid_t> pid = launchHelper(
+      launcher.get(),
+      pipes,
+      containerId1,
+      "touch " + container1Ready + " && sleep 1000",
+      launchInfo1.get());
+  ASSERT_SOME(pid);
+
+  // Reap the forked child.
+  Future<Option<int>> status = process::reap(pid.get());
+
+  // Continue in the parent.
+  ::close(pipes[0]);
+
+  // Isolate the forked child.
+  AWAIT_READY(isolator.get()->isolate(containerId1, pid.get()));
+
+  // Signal forked child to continue.
+  char dummy;
+  ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+  ::close(pipes[1]);
+
+  // Wait for command to start to ensure all pre-exec scripts have
+  // executed.
+  ASSERT_TRUE(waitForFileCreation(container1Ready));
+
+  // The container should start with the minimum limit.
+  Result<htb::cls::Config> config = recoverHTBConfig(pid.get(), eth0, flags);
+  ASSERT_SOME(config);
+  ASSERT_EQ(minRate, config->rate);
+
+  // Increase CPU to get to linear scaling.
+  Future<Nothing> update = isolator.get()->update(
+      containerId1, linearCpu.get());
+  AWAIT_READY(update);
+
+  config = recoverHTBConfig(pid.get(), eth0, flags);
+  ASSERT_SOME(config);
+  ASSERT_EQ(egressRatePerCpu.bytes() * floor(linearCpu->cpus().get()),
+            config->rate);
+
+  // Increase CPU further to hit maximum limit.
+  update = isolator.get()->update(containerId1, highCpu.get());
+  AWAIT_READY(update);
+
+  config = recoverHTBConfig(pid.get(), eth0, flags);
+  ASSERT_SOME(config);
+  ASSERT_EQ(maxRate, config->rate);
+
+  // Kill the container.
+  AWAIT_READY(launcher.get()->destroy(containerId1));
+  AWAIT_READY(isolator.get()->cleanup(containerId1));
+
+  delete launcher.get();
+  delete isolator.get();
+}
+
+
 TEST_F(PortMappingIsolatorTest, ROOT_ScaleIngressWithCPU)
 {
   flags.ingress_rate_limit_per_container = None();


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic