[prev in list] [next in list] [prev in thread] [next in thread] 

List:       mesos-commits
Subject:    (mesos) branch master updated: Added auto calculation of ingress_rate_per_cpu.
From:       bmahler () apache ! org
Date:       2024-01-22 22:49:10
Message-ID: 170596375084.2321612.7600090679141258744 () gitbox2-he-fi ! apache ! org
[Download RAW message or body]

This is an automated email from the ASF dual-hosted git repository.

bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git


The following commit(s) were added to refs/heads/master by this push:
     new f4087b81c Added auto calculation of ingress_rate_per_cpu.
f4087b81c is described below

commit f4087b81c3d19af447fde003decf5590dfb7cdae
Author: Ilya Pronin <ipronin@twitter.com>
AuthorDate: Tue May 1 16:44:30 2018 -0700

    Added auto calculation of ingress_rate_per_cpu.
    
    Currently, ingress rate limit scaling requires manual configuration of the per-CPU \
rate limit. While manual configuration is more predictable and gives rounder values, in a \
heterogeneous environment where hosts with the same NIC type have different CPUs it can \
be cumbersome to configure.
    This commit allows you to pass an "auto" value to the `--ingress_rate_per_cpu` \
flag  of the isolator
    ```
    --ingress_rate_per_cpu=auto
    ```
    to automatically calculate the value. The value is calculated by determining the
    link speed using the `/sys/class/net/<iface>/speed` interface
    (https://www.kernel.org/doc/Documentation/ABI/testing/sysfs-class-net) and \
dividing the speed by the number of CPU resources that are available. \
`--ingress_rate_per_cpu=auto` will fail if there are 0 CPU resources.
    
    An additional flag
    ```
    --network_link_speed=<link speed in bytes/second>
    ```
    is provided to override link speed detection when `--ingress_rate_per_cpu=auto` is set.
---
 .../mesos/isolators/network/port_mapping.cpp       |  59 +++++++++-
 .../mesos/isolators/network/port_mapping.hpp       |   5 +-
 src/slave/flags.cpp                                |   7 ++
 src/slave/flags.hpp                                |   3 +-
 src/tests/containerizer/port_mapping_tests.cpp     | 127 ++++++++++++++++++++-
 5 files changed, 192 insertions(+), 9 deletions(-)

diff --git a/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp \
b/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp index \
                c0dcb0ae7..c316b7dba 100644
--- a/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp
+++ b/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp
@@ -739,6 +739,7 @@ static Try<Nothing> updateIngressHTB(
 }
 
 
+// Computes the speed of a link in bytes per second. 
 static Result<Bytes> getLinkSpeed(const string& link)
 {
   const string linkSpeedPath = path::join("/sys/class/net", link, "speed");
@@ -2245,6 +2246,50 @@ Try<Isolator*> PortMappingIsolatorProcess::create(const Flags& \
flags)  " maximum ingress rate.");
   }
 
+  Option<Bytes> ingressRatePerCpu;
+  if (flags.ingress_rate_per_cpu.isSome()) {
+    if (flags.ingress_rate_per_cpu.get() == "auto") {
+      // Extract the number of CPUs from resources flag.
+      const uint64_t cpus = resources->cpus().getOrElse(0);
+      if (cpus == 0) {
+        return Error(
+            "CPUs resource has to be specified to determine per CPU ingress "
+            "rate limit");
+      }
+
+      // Link speed may be provided by the operator. If not, we can try to read
+      // the self-reported speed.
+      Result<Bytes> speed = flags.network_link_speed;
+      if (speed.isNone()) {
+        speed = getLinkSpeed(eth0.get());
+        if (!speed.isSome()) {
+          return Error(
+              "Failed to determine per CPU ingress rate limit: "
+              "Failed to determine link speed of " + eth0.get() + ": " +
+              (speed.isError() ? speed.error() : "Not supported"));
+        }
+      }
+
+      ingressRatePerCpu = speed.get() / cpus;
+
+      LOG(INFO) << "Using " << ingressRatePerCpu.get()
+                << " per CPU ingress rate limit"
+                << " (" << speed.get() << "/" << cpus << ")";
+    } else {
+      Try<Bytes> limit = Bytes::parse(flags.ingress_rate_per_cpu.get());
+      if (limit.isError()) {
+        return Error(
+            "Bad option for 'ingress_rate_per_cpu' flag: " + limit.error());
+      }
+
+      ingressRatePerCpu = limit.get();
+
+      LOG(INFO) << "Using " << ingressRatePerCpu.get()
+                << " per CPU ingress rate limit";
+    }
+    CHECK_SOME(ingressRatePerCpu);
+  }
+
   // If an egress or ingress rate limit is provided, do a sanity check
   // that it is not greater than the host physical link speed.
   if (flags.egress_rate_limit_per_container.isSome() ||
@@ -2723,7 +2768,8 @@ Try<Isolator*> PortMappingIsolatorProcess::create(const Flags& \
flags)  nonEphemeralPorts,
           ephemeralPortsAllocator,
           freeFlowIds,
-          ratesCollector)));
+          ratesCollector,
+          ingressRatePerCpu)));
 }
 
 
@@ -2741,7 +2787,8 @@ PortMappingIsolatorProcess::PortMappingIsolatorProcess(
     const IntervalSet<uint16_t>& _managedNonEphemeralPorts,
     const process::Owned<EphemeralPortsAllocator>& _ephemeralPortsAllocator,
     const std::set<uint16_t>& _flowIDs,
-    const Owned<RatesCollector>& _ratesCollector)
+    const Owned<RatesCollector>& _ratesCollector,
+    const Option<Bytes>& _ingressRatePerCpu)
   : ProcessBase(process::ID::generate("mesos-port-mapping-isolator")),
     flags(_flags),
     bindMountRoot(_bindMountRoot),
@@ -2756,7 +2803,8 @@ PortMappingIsolatorProcess::PortMappingIsolatorProcess(
     managedNonEphemeralPorts(_managedNonEphemeralPorts),
     ephemeralPortsAllocator(_ephemeralPortsAllocator),
     freeFlowIds(_flowIDs),
-    ratesCollector(_ratesCollector)
+    ratesCollector(_ratesCollector),
+    ingressRatePerCpu(_ingressRatePerCpu)
 {}
 
 
@@ -4979,9 +5027,8 @@ Option<htb::cls::Config> \
PortMappingIsolatorProcess::ingressHTBConfig(  Bytes rate(0);
   if (flags.ingress_rate_limit_per_container.isSome()) {
     rate = flags.ingress_rate_limit_per_container.get();
-  } else if (flags.ingress_rate_per_cpu.isSome()) {
-    rate = flags.ingress_rate_per_cpu.get() *
-           floor(resources.cpus().getOrElse(0));
+  } else if (ingressRatePerCpu.isSome()) {
+    rate = ingressRatePerCpu.get() * floor(resources.cpus().getOrElse(0));
   } else {
     return None();
   }
diff --git a/src/slave/containerizer/mesos/isolators/network/port_mapping.hpp \
b/src/slave/containerizer/mesos/isolators/network/port_mapping.hpp index \
                df0584b97..e79427d4b 100644
--- a/src/slave/containerizer/mesos/isolators/network/port_mapping.hpp
+++ b/src/slave/containerizer/mesos/isolators/network/port_mapping.hpp
@@ -342,7 +342,8 @@ private:
       const IntervalSet<uint16_t>& _managedNonEphemeralPorts,
       const process::Owned<EphemeralPortsAllocator>& _ephemeralPortsAllocator,
       const std::set<uint16_t>& _flowIDs,
-      const process::Owned<RatesCollector>& _ratesCollector);
+      const process::Owned<RatesCollector>& _ratesCollector,
+      const Option<Bytes>& _ingressRatePerCpu);
 
   // Continuations.
   Try<Nothing> _cleanup(Info* info, const Option<ContainerID>& containerId);
@@ -422,6 +423,8 @@ private:
   hashset<ContainerID> unmanaged;
 
   process::Owned<RatesCollector> ratesCollector;
+
+  const Option<Bytes> ingressRatePerCpu;
 };
 
 
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index 594dbedf8..fe0b220ac 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -1307,6 +1307,13 @@ mesos::internal::slave::Flags::Flags()
       "Amount of data in Bytes that can be received at the higher ceil rate."
       "This flag is used by the `network/port_mapping_isolator`.");
 
+  add(&Flags::network_link_speed,
+      "network_link_speed",
+      "Physical network link speed in Bytes/s. This flag is used only when\n"
+      "--ingress_rate_per_cpu=\'auto\'. This provided link speed overrides\n"
+      "automatic detection of the link speed. This flag is used by the\n"
+      "`network/port_mapping_isolator`.");
+
   add(&Flags::network_enable_socket_statistics_summary,
       "network_enable_socket_statistics_summary",
       "Whether to collect socket statistics summary for each container.\n"
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index 0b6bd91c4..af98c12c0 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -161,11 +161,12 @@ public:
   bool egress_unique_flow_per_container;
   std::string egress_flow_classifier_parent;
   Option<Bytes> ingress_rate_limit_per_container;
-  Option<Bytes> ingress_rate_per_cpu;
+  Option<std::string> ingress_rate_per_cpu;
   Option<Bytes> minimum_ingress_rate_limit;
   Option<Bytes> maximum_ingress_rate_limit;
   Option<Bytes> ingress_ceil_limit;
   Option<Bytes> ingress_burst;
+  Option<Bytes> network_link_speed;
   bool network_enable_socket_statistics_summary;
   bool network_enable_socket_statistics_details;
   bool network_enable_snmp_statistics;
diff --git a/src/tests/containerizer/port_mapping_tests.cpp \
b/src/tests/containerizer/port_mapping_tests.cpp index 579b8205a..f2c1e9e78 100644
--- a/src/tests/containerizer/port_mapping_tests.cpp
+++ b/src/tests/containerizer/port_mapping_tests.cpp
@@ -1851,7 +1851,132 @@ TEST_F(PortMappingIsolatorTest, ROOT_ScaleIngressWithCPU)
   flags.ingress_rate_limit_per_container = None();
 
   const Bytes ingressRatePerCpu = 1000;
-  flags.ingress_rate_per_cpu = ingressRatePerCpu;
+  flags.ingress_rate_per_cpu = stringify(ingressRatePerCpu);
+
+  const Bytes minRate = 2000;
+  flags.minimum_ingress_rate_limit = minRate;
+
+  const Bytes maxRate = 4000;
+  flags.maximum_ingress_rate_limit = maxRate;
+
+  // CPU low enough for scaled network ingress to be increased to min
+  // limit: 1 * 1000 < 2000 ==> ingress is 2000.
+  Try<Resources> lowCpu = Resources::parse("cpus:1;mem:1024;disk:1024");
+  ASSERT_SOME(lowCpu);
+
+  // CPU sufficient to be in linear scaling region, greater than min
+  // and less than max: 2000 < 3.1 * 1000 < 4000.
+  Try<Resources> linearCpu = Resources::parse("cpus:3.1;mem:1024;disk:1024");
+  ASSERT_SOME(linearCpu);
+
+  // CPU high enough for scaled network ingress to be reduced to the
+  // max limit: 5 * 1000 > 4000.
+  Try<Resources> highCpu = Resources::parse("cpus:5;mem:1024;disk:1024");
+  ASSERT_SOME(highCpu);
+
+  Try<Isolator*> isolator = PortMappingIsolatorProcess::create(flags);
+  ASSERT_SOME(isolator);
+
+  Try<Launcher*> launcher = LinuxLauncher::create(flags);
+  ASSERT_SOME(launcher);
+
+  ExecutorInfo executorInfo;
+  executorInfo.mutable_resources()->CopyFrom(lowCpu.get());
+
+  ContainerID containerId1;
+  containerId1.set_value(id::UUID::random().toString());
+
+  ContainerConfig containerConfig1;
+  containerConfig1.mutable_executor_info()->CopyFrom(executorInfo);
+
+  Future<Option<ContainerLaunchInfo>> launchInfo1 =
+    isolator.get()->prepare(containerId1, containerConfig1);
+  AWAIT_READY(launchInfo1);
+  ASSERT_SOME(launchInfo1.get());
+  ASSERT_EQ(1, launchInfo1.get()->pre_exec_commands().size());
+
+  int pipes[2];
+  ASSERT_NE(-1, ::pipe(pipes));
+
+  Try<pid_t> pid = launchHelper(
+      launcher.get(),
+      pipes,
+      containerId1,
+      "touch " + container1Ready + " && sleep 1000",
+      launchInfo1.get());
+  ASSERT_SOME(pid);
+
+  // Reap the forked child.
+  Future<Option<int>> status = process::reap(pid.get());
+
+  // Continue in the parent.
+  ::close(pipes[0]);
+
+  // Isolate the forked child.
+  AWAIT_READY(isolator.get()->isolate(containerId1, pid.get()));
+
+  // Signal forked child to continue.
+  char dummy;
+  ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+  ::close(pipes[1]);
+
+  // Wait for command to start to ensure all pre-exec scripts have
+  // executed.
+  ASSERT_TRUE(waitForFileCreation(container1Ready));
+
+  const string veth = slave::PORT_MAPPING_VETH_PREFIX() + stringify(pid.get());
+  const routing::Handle cls(routing::Handle(1, 0), 1);
+
+  Result<htb::cls::Config> config = htb::cls::getConfig(veth, cls);
+  ASSERT_SOME(config);
+  ASSERT_EQ(minRate, config->rate);
+
+  // Increase CPU to get to linear scaling.
+  Future<Nothing> update = isolator.get()->update(
+      containerId1,
+      linearCpu.get());
+  AWAIT_READY(update);
+
+  config = htb::cls::getConfig(veth, cls);
+  ASSERT_SOME(config);
+  ASSERT_EQ(
+      ingressRatePerCpu.bytes() * floor(linearCpu.get().cpus().get()),
+      config->rate);
+
+  // Increase CPU further to hit maximum limit.
+  update = isolator.get()->update(
+      containerId1,
+      highCpu.get());
+  AWAIT_READY(update);
+
+  config = htb::cls::getConfig(veth, cls);
+  ASSERT_SOME(config);
+  ASSERT_EQ(maxRate, config->rate);
+
+  // Kill the container
+  AWAIT_READY(launcher.get()->destroy(containerId1));
+  AWAIT_READY(isolator.get()->cleanup(containerId1));
+}
+
+
+TEST_F(PortMappingIsolatorTest, ROOT_ScaleIngressWithCPUAutoConfig)
+{
+  flags.ingress_rate_limit_per_container = None();
+  flags.network_link_speed = Bytes(10000);
+
+  // Change available CPUs to be 10.
+  vector<string> resources = strings::split(flags.resources.get(), ";");
+  std::replace_if(
+      resources.begin(),
+      resources.end(),
+      [](const string& s) {return strings::startsWith(s, "cpus:");},
+      "cpus:10");
+
+  flags.resources = strings::join(";", resources);
+
+  // Ingress rate limit per CPU should be 10000 / 10 = 1000.
+  const Bytes ingressRatePerCpu = Bytes(1000);
+  flags.ingress_rate_per_cpu = "auto";
 
   const Bytes minRate = 2000;
   flags.minimum_ingress_rate_limit = minRate;


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic