[prev in list] [next in list] [prev in thread] [next in thread] 

List:       mesos-commits
Subject:    (mesos) branch master updated: Added rates collector to network/port_mapping isolator.
From:       bmahler () apache ! org
Date:       2024-01-22 18:04:15
Message-ID: 170594665551.1759884.17093311887344796345 () gitbox2-he-fi ! apache ! org
[Download RAW message or body]

This is an automated email from the ASF dual-hosted git repository.

bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git


The following commit(s) were added to refs/heads/master by this push:
     new e437a922a Added rates collector to network/port_mapping isolator.
e437a922a is described below

commit e437a922aa2542299c499f8136dde23bebff2775
Author: Ilya Pronin <ipronin@twitter.com>
AuthorDate: Thu Jan 11 16:56:28 2018 -0800

    Added rates collector to network/port_mapping isolator.
    
    Introduces a RatesCollector class to network/port_mapping to expose
    new RatePercentiles statistics on a variety of rates.
    
    Statistics are derived from:
    - https://docs.kernel.org/networking/statistics.html#c.rtnl_link_stats64
    
    Queried through `rtnl_link_get_stat` in `src/linux/routing/link/link.cpp`.
---
 include/mesos/mesos.proto                          |  48 +++
 include/mesos/v1/mesos.proto                       |  48 +++
 .../mesos/isolators/network/port_mapping.cpp       | 363 ++++++++++++++++++++-
 .../mesos/isolators/network/port_mapping.hpp       |  75 ++++-
 src/slave/flags.cpp                                |  18 +
 src/slave/flags.hpp                                |   3 +
 src/tests/containerizer/port_mapping_tests.cpp     | 258 +++++++++++++++
 7 files changed, 797 insertions(+), 16 deletions(-)

diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 5db138f8d..5d3f2232e 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -1891,6 +1891,54 @@ message ResourceStatistics {
   optional uint64 net_rx_burst_rate_limit = 50;
   optional uint64 net_rx_burst_size = 51;
 
+  // Summary of a rate time series. All values except `samples` are in
+  // the unit of the rate (bytes or packets per second); `samples` is
+  // the number of rate samples the summary was computed from.
+  message RatePercentiles {
+    optional uint64 min = 1;
+    optional uint64 max = 2;
+    optional uint64 p50 = 3;
+    optional uint64 p90 = 4;
+    optional uint64 p95 = 5;
+    optional uint64 p99 = 6;
+    optional uint64 p999 = 7;
+    optional uint64 p9999 = 8;
+    optional uint64 samples = 9;
+  }
+
+  // Network rate statistics measured in bytes per second
+  // or packets per second.
+  //
+  // Rates are sampled every {sampling_interval_secs}. A
+  // time series is created out of the samples taken over
+  // a moving sampling window of {sampling_window_secs}.
+  // Percentiles for each time series are exposed through
+  // RatePercentiles.
+  //
+  // Linux documentation for more information:
+  // https://docs.kernel.org/networking/statistics.html#c.rtnl_link_stats64
+  message RateStatistics {
+    // Bytes received per second.
+    optional RatePercentiles rx_rate = 1;
+    // Packets received per second.
+    optional RatePercentiles rx_packet_rate = 2;
+    // Received packets dropped per second.
+    optional RatePercentiles rx_drop_rate = 3;
+    // Receiving packet errors per second.
+    optional RatePercentiles rx_error_rate = 4;
+    // Bytes sent per second.
+    optional RatePercentiles tx_rate = 5;
+    // Packets sent per second.
+    optional RatePercentiles tx_packet_rate = 6;
+    // Sent packets dropped per second.
+    optional RatePercentiles tx_drop_rate = 7;
+    // Sending packet errors per second.
+    optional RatePercentiles tx_error_rate = 8;
+    // Duration of the sliding time series window.
+    optional double sampling_window_secs = 9;
+    // The delay between rate samples.
+    optional double sampling_interval_secs = 10;
+  }
+
+  // Field number 51 is already used by `net_rx_burst_size`.
+  optional RateStatistics net_rate_statistics = 52;
+
   // The kernel keeps track of RTT (round-trip time) for its TCP
   // sockets. RTT is a way to tell the latency of a container.
   optional double net_tcp_rtt_microsecs_p50 = 22;
diff --git a/include/mesos/v1/mesos.proto b/include/mesos/v1/mesos.proto
index 4a71b3474..98c9a6693 100644
--- a/include/mesos/v1/mesos.proto
+++ b/include/mesos/v1/mesos.proto
@@ -1855,6 +1855,54 @@ message ResourceStatistics {
   optional uint64 net_rx_burst_rate_limit = 50;
   optional uint64 net_rx_burst_size = 51;
 
+  // Summary of a rate time series. All values except `samples` are in
+  // the unit of the rate (bytes or packets per second); `samples` is
+  // the number of rate samples the summary was computed from.
+  message RatePercentiles {
+    optional uint64 min = 1;
+    optional uint64 max = 2;
+    optional uint64 p50 = 3;
+    optional uint64 p90 = 4;
+    optional uint64 p95 = 5;
+    optional uint64 p99 = 6;
+    optional uint64 p999 = 7;
+    optional uint64 p9999 = 8;
+    optional uint64 samples = 9;
+  }
+
+  // Network rate statistics measured in bytes per second
+  // or packets per second.
+  //
+  // Rates are sampled every {sampling_interval_secs}. A
+  // time series is created out of the samples taken over
+  // a moving sampling window of {sampling_window_secs}.
+  // Percentiles for each time series are exposed through
+  // RatePercentiles.
+  //
+  // Linux documentation for more information:
+  // https://docs.kernel.org/networking/statistics.html#c.rtnl_link_stats64
+  message RateStatistics {
+    // Bytes received per second.
+    optional RatePercentiles rx_rate = 1;
+    // Packets received per second.
+    optional RatePercentiles rx_packet_rate = 2;
+    // Received packets dropped per second.
+    optional RatePercentiles rx_drop_rate = 3;
+    // Receiving packet errors per second.
+    optional RatePercentiles rx_error_rate = 4;
+    // Bytes sent per second.
+    optional RatePercentiles tx_rate = 5;
+    // Packets sent per second.
+    optional RatePercentiles tx_packet_rate = 6;
+    // Sent packets dropped per second.
+    optional RatePercentiles tx_drop_rate = 7;
+    // Sending packet errors per second.
+    optional RatePercentiles tx_error_rate = 8;
+    // Duration of the sliding time series window.
+    optional double sampling_window_secs = 9;
+    // The delay between rate samples.
+    optional double sampling_interval_secs = 10;
+  }
+
+  // Field number 51 is already used by `net_rx_burst_size`.
+  optional RateStatistics net_rate_statistics = 52;
+
   // The kernel keeps track of RTT (round-trip time) for its TCP
   // sockets. RTT is a way to tell the latency of a container.
   optional double net_tcp_rtt_microsecs_p50 = 22;
diff --git a/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp \
b/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp index \
                3ed6863bc..f47a7bbb5 100644
--- a/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp
+++ b/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp
@@ -1483,6 +1483,279 @@ int PortMappingHTBConfig::execute()
 }
 
 
+/////////////////////////////////////////////////
+// Implementation of PercentileRatesCollector and RatesCollector.
+/////////////////////////////////////////////////
+
+// Creates a collector for the host end of the veth pair that belongs to
+// the executor `_executorPid`. Every rate series spans `_window` and is
+// given a capacity equal to the number of `_interval`-spaced samples
+// that fit into that window.
+PercentileRatesCollector::PercentileRatesCollector(
+    pid_t _executorPid,
+    const Duration& _window,
+    const Duration& _interval)
+  : link(veth(_executorPid))
+{
+  const size_t capacity =
+    static_cast<size_t>(_window.secs() / _interval.secs());
+
+  TimeSeries<uint64_t>* const series[] = {
+    &rxRates, &rxPackets, &rxDrops, &rxErrors,
+    &txRates, &txPackets, &txDrops, &txErrors,
+  };
+
+  for (TimeSeries<uint64_t>* rates : series) {
+    *rates = TimeSeries<uint64_t>(_window, capacity);
+  }
+}
+
+
+// The accessors below summarize one rate series each, in the direction
+// seen from inside the container. A result may be None, e.g. before
+// anything has been sampled.
+
+// Bytes per second received by the container.
+Option<Statistics<uint64_t>> PercentileRatesCollector::rxRate() const
+{
+  return Statistics<uint64_t>::from(this->rxRates);
+}
+
+
+// Packets per second received by the container.
+Option<Statistics<uint64_t>> PercentileRatesCollector::rxPacketRate() const
+{
+  return Statistics<uint64_t>::from(this->rxPackets);
+}
+
+
+// Received packets dropped per second.
+Option<Statistics<uint64_t>> PercentileRatesCollector::rxDropRate() const
+{
+  return Statistics<uint64_t>::from(this->rxDrops);
+}
+
+
+// Receive errors per second.
+Option<Statistics<uint64_t>> PercentileRatesCollector::rxErrorRate() const
+{
+  return Statistics<uint64_t>::from(this->rxErrors);
+}
+
+
+// Bytes per second sent by the container.
+Option<Statistics<uint64_t>> PercentileRatesCollector::txRate() const
+{
+  return Statistics<uint64_t>::from(this->txRates);
+}
+
+
+// Packets per second sent by the container.
+Option<Statistics<uint64_t>> PercentileRatesCollector::txPacketRate() const
+{
+  return Statistics<uint64_t>::from(this->txPackets);
+}
+
+
+// Sent packets dropped per second.
+Option<Statistics<uint64_t>> PercentileRatesCollector::txDropRate() const
+{
+  return Statistics<uint64_t>::from(this->txDrops);
+}
+
+
+// Send errors per second.
+Option<Statistics<uint64_t>> PercentileRatesCollector::txErrorRate() const
+{
+  return Statistics<uint64_t>::from(this->txErrors);
+}
+
+
+// Reads the current counters of `link` and records them as a sample.
+// Sampling is best-effort: if the counters cannot be read, this tick
+// is dropped (and logged verbosely, since this runs every interval)
+// and the previous sample remains the baseline for the next one.
+void PercentileRatesCollector::sample()
+{
+  // Timestamp the sample just before reading the counters.
+  const Time ts = Clock::now();
+
+  Result<hashmap<string, uint64_t>> stats = link::statistics(link);
+  if (stats.isError()) {
+    VLOG(1) << "Failed to get statistics for link '" << link
+            << "': " << stats.error();
+    return;
+  }
+
+  if (stats.isNone()) {
+    // NOTE(review): presumably the link does not exist (e.g. while the
+    // container is being torn down) -- confirm against link::statistics.
+    VLOG(1) << "No statistics for link '" << link << "'";
+    return;
+  }
+
+  sample(ts, std::move(stats.get()));
+}
+
+
+// Records the counter snapshot `statistics` taken at `ts`. If an
+// earlier snapshot exists, one rate per metric is derived from the
+// difference between the two and appended to the matching series.
+// The snapshot then becomes the baseline for the next call.
+void PercentileRatesCollector::sample(
+    const Time& ts, hashmap<string, uint64_t>&& statistics)
+{
+  const bool haveEarlierSample = previous.isSome() && previous.get() < ts;
+
+  if (haveEarlierSample) {
+    const double deltaT = ts.secs() - previous->secs();
+
+    // Counters are read on the host end of the veth pair, so the
+    // host's TX is the container's RX and vice versa.
+    struct Mapping
+    {
+      const char* metric;
+      TimeSeries<uint64_t>* rates;
+    };
+
+    const Mapping mappings[] = {
+      {"tx_bytes", &rxRates},
+      {"tx_packets", &rxPackets},
+      {"tx_dropped", &rxDrops},
+      {"tx_errors", &rxErrors},
+      {"rx_bytes", &txRates},
+      {"rx_packets", &txPackets},
+      {"rx_dropped", &txDrops},
+      {"rx_errors", &txErrors},
+    };
+
+    for (const Mapping& mapping : mappings) {
+      sampleRate(statistics, mapping.metric, ts, deltaT, *mapping.rates);
+    }
+  }
+
+  previous = ts;
+  previousStatistics = std::move(statistics);
+}
+
+
+// Appends to `rates` the per-second rate of `metric`: the change of the
+// counter since the previous snapshot divided by `timeDelta` seconds.
+// Does nothing if either snapshot lacks the metric.
+//
+// If the counter went backwards (e.g. it was reset or wrapped), the
+// sample is skipped. Otherwise the unsigned subtraction would wrap to an
+// enormous value, and converting a double that does not fit into
+// uint64_t is undefined behavior.
+void PercentileRatesCollector::sampleRate(
+    const hashmap<string, uint64_t>& statistics,
+    const string& metric,
+    const Time& timestamp,
+    double timeDelta,
+    TimeSeries<uint64_t>& rates)
+{
+  const Option<uint64_t> previousValue = previousStatistics.get(metric);
+  const Option<uint64_t> value = statistics.get(metric);
+  if (previousValue.isNone() || value.isNone()) {
+    return;
+  }
+
+  if (value.get() < previousValue.get()) {
+    return;
+  }
+
+  const double rate = (value.get() - previousValue.get()) / timeDelta;
+  rates.set(static_cast<uint64_t>(rate), timestamp);
+}
+
+
+// libprocess actor that owns one PercentileRatesCollector per container,
+// samples all of them every `interval`, and answers usage queries. The
+// RatesCollector wrapper reaches it through dispatch().
+class RatesCollectorProcess : public Process<RatesCollectorProcess>
+{
+public:
+  explicit RatesCollectorProcess(
+      const Duration& _interval,
+      const Duration& _window)
+    : ProcessBase(ID::generate("mesos-port-mapping-rates-collector")),
+      interval(_interval),
+      window(_window)
+  {
+    // The window has to be strictly longer than the sampling interval.
+    CHECK_GT(window, interval);
+  }
+
+  void initialize() override
+  {
+    // Start the periodic sampling loop; schedule() re-arms itself.
+    schedule();
+  }
+
+  // Builds a ResourceStatistics that carries only the rate percentiles
+  // for `containerId`. An unknown container yields an empty message.
+  Future<ResourceStatistics> usage(const ContainerID& containerId)
+  {
+    ResourceStatistics statistics;
+
+    const auto found = collectors.find(containerId);
+    if (found == collectors.end()) {
+      LOG(WARNING) << "Unknown container " << containerId;
+      return statistics;
+    }
+
+    const PercentileRatesCollector& collector = found->second;
+
+    ResourceStatistics::RateStatistics* rates =
+      statistics.mutable_net_rate_statistics();
+
+    using RateGetter =
+      Option<Statistics<uint64_t>> (PercentileRatesCollector::*)() const;
+    using RateField =
+      ResourceStatistics::RatePercentiles*
+        (ResourceStatistics::RateStatistics::*)();
+
+    // Pairs each collector accessor with the protobuf field it fills. A
+    // field is only created (and thus marked present) when the rate is
+    // available.
+    struct RateMapping
+    {
+      RateGetter getter;
+      RateField field;
+    };
+
+    const RateMapping mappings[] = {
+      {&PercentileRatesCollector::rxRate,
+       &ResourceStatistics::RateStatistics::mutable_rx_rate},
+      {&PercentileRatesCollector::rxPacketRate,
+       &ResourceStatistics::RateStatistics::mutable_rx_packet_rate},
+      {&PercentileRatesCollector::rxDropRate,
+       &ResourceStatistics::RateStatistics::mutable_rx_drop_rate},
+      {&PercentileRatesCollector::rxErrorRate,
+       &ResourceStatistics::RateStatistics::mutable_rx_error_rate},
+      {&PercentileRatesCollector::txRate,
+       &ResourceStatistics::RateStatistics::mutable_tx_rate},
+      {&PercentileRatesCollector::txPacketRate,
+       &ResourceStatistics::RateStatistics::mutable_tx_packet_rate},
+      {&PercentileRatesCollector::txDropRate,
+       &ResourceStatistics::RateStatistics::mutable_tx_drop_rate},
+      {&PercentileRatesCollector::txErrorRate,
+       &ResourceStatistics::RateStatistics::mutable_tx_error_rate},
+    };
+
+    for (const RateMapping& mapping : mappings) {
+      const Option<Statistics<uint64_t>> rate = (collector.*mapping.getter)();
+      if (rate.isSome()) {
+        copyRate(rate.get(), (rates->*mapping.field)());
+      }
+    }
+
+    rates->set_sampling_window_secs(window.secs());
+    rates->set_sampling_interval_secs(interval.secs());
+
+    return statistics;
+  }
+
+  // Starts collecting rates for the container whose executor runs as
+  // `executorPid`. A container that is already tracked is left as is.
+  Future<Nothing> add(const ContainerID& containerId, pid_t executorPid)
+  {
+    collectors.emplace(
+        containerId, PercentileRatesCollector(executorPid, window, interval));
+    return Nothing();
+  }
+
+  // Stops collecting rates for the container; unknown ids are ignored.
+  Future<Nothing> remove(const ContainerID& containerId)
+  {
+    collectors.erase(containerId);
+    return Nothing();
+  }
+
+private:
+  // Takes one sample for every tracked container, then re-arms itself to
+  // run again after `interval`.
+  void schedule()
+  {
+    for (auto& entry : collectors) {
+      entry.second.sample();
+    }
+
+    delay(interval, self(), &Self::schedule);
+  }
+
+  // Copies the summary of one rate series into its protobuf form.
+  void copyRate(
+      const Statistics<uint64_t>& statistics,
+      ResourceStatistics::RatePercentiles* target)
+  {
+    target->set_min(statistics.min);
+    target->set_max(statistics.max);
+    target->set_p50(statistics.p50);
+    target->set_p90(statistics.p90);
+    target->set_p95(statistics.p95);
+    target->set_p99(statistics.p99);
+    target->set_p999(statistics.p999);
+    target->set_p9999(statistics.p9999);
+    target->set_samples(statistics.count);
+  }
+
+  const Duration interval;
+  const Duration window;
+
+  // One collector per tracked container.
+  hashmap<ContainerID, PercentileRatesCollector> collectors;
+};
+
+
+// Owning handle for a RatesCollectorProcess. It spawns the actor on
+// construction, terminates, waits for and deletes it on destruction,
+// and forwards every call to it via dispatch().
+class RatesCollector
+{
+public:
+  explicit RatesCollector(const Duration& interval, const Duration& window)
+    : process(new RatesCollectorProcess(interval, window))
+  {
+    spawn(process);
+  }
+
+  // The destructor terminates and deletes `process`. A copy would do the
+  // same to the same pointer (double terminate/delete), so copying is
+  // forbidden.
+  RatesCollector(const RatesCollector&) = delete;
+  RatesCollector& operator=(const RatesCollector&) = delete;
+
+  ~RatesCollector()
+  {
+    terminate(process);
+    wait(process);
+    delete process;
+  }
+
+  // Rate percentiles currently known for `containerId`.
+  Future<ResourceStatistics> usage(const ContainerID& containerId)
+  {
+    return dispatch(process, &RatesCollectorProcess::usage, containerId);
+  }
+
+  // Starts sampling the container whose executor runs as `pid`.
+  Future<Nothing> add(const ContainerID& containerId, pid_t pid)
+  {
+    return dispatch(process, &RatesCollectorProcess::add, containerId, pid);
+  }
+
+  // Stops sampling the container.
+  Future<Nothing> remove(const ContainerID& containerId)
+  {
+    return dispatch(process, &RatesCollectorProcess::remove, containerId);
+  }
+
+private:
+  RatesCollectorProcess* process;
+};
+
+
 /////////////////////////////////////////////////
 // Implementation for the isolator.
 /////////////////////////////////////////////////
@@ -1995,6 +2268,31 @@ Try<Isolator*> PortMappingIsolatorProcess::create(const Flags& \
flags)  }
   }
 
+  Owned<RatesCollector> ratesCollector;
+  if (flags.network_enable_rate_statistics) {
+    if (flags.network_rate_statistics_window.isNone() ||
+        flags.network_rate_statistics_interval.isNone()) {
+      return Error("Window size and sampling interval for rate statistics "
+                   "are required");
+    }
+
+    if (flags.network_rate_statistics_window.get() <=
+        flags.network_rate_statistics_interval.get()) {
+      return Error("Rate statistics window size should be bigger than "
+                   "the sampling interval");
+    }
+
+    const Duration minInterval = Milliseconds(20);
+    if (flags.network_rate_statistics_interval.get() < minInterval) {
+      return Error("Rate statistics interval should not be smaller than "
+                   + stringify(minInterval));
+    }
+
+    ratesCollector.reset(new RatesCollector(
+          flags.network_rate_statistics_interval.get(),
+          flags.network_rate_statistics_window.get()));
+  }
+
   // Get the host IP network, MAC and default gateway.
   Result<net::IP::Network> hostIPNetwork =
     net::IP::Network::fromLinkDevice(eth0.get(), AF_INET);
@@ -2380,10 +2678,44 @@ Try<Isolator*> PortMappingIsolatorProcess::create(const \
Flags& flags)  hostNetworkConfigurations,
           nonEphemeralPorts,
           ephemeralPortsAllocator,
-          freeFlowIds)));
+          freeFlowIds,
+          ratesCollector)));
 }
 
 
+// Stores the host network configuration gathered by create().
+// `_ratesCollector` is only set when rate statistics are enabled
+// (flags.network_enable_rate_statistics); otherwise it is empty, and
+// callers check `ratesCollector.get()` before using it.
+// NOTE: initializers must stay in member declaration order.
+PortMappingIsolatorProcess::PortMappingIsolatorProcess(
+    const Flags& _flags,
+    const std::string& _bindMountRoot,
+    const std::string& _eth0,
+    const std::string& _lo,
+    const net::MAC& _hostMAC,
+    const net::IP::Network& _hostIPNetwork,
+    const size_t _hostEth0MTU,
+    const net::IP& _hostDefaultGateway,
+    const routing::Handle& _hostTxFqCodelHandle,
+    const hashmap<std::string, std::string>& _hostNetworkConfigurations,
+    const IntervalSet<uint16_t>& _managedNonEphemeralPorts,
+    const process::Owned<EphemeralPortsAllocator>& _ephemeralPortsAllocator,
+    const std::set<uint16_t>& _flowIDs,
+    const Owned<RatesCollector>& _ratesCollector)
+  : ProcessBase(process::ID::generate("mesos-port-mapping-isolator")),
+    flags(_flags),
+    bindMountRoot(_bindMountRoot),
+    eth0(_eth0),
+    lo(_lo),
+    hostMAC(_hostMAC),
+    hostIPNetwork(_hostIPNetwork),
+    hostEth0MTU(_hostEth0MTU),
+    hostDefaultGateway(_hostDefaultGateway),
+    hostTxFqCodelHandle(_hostTxFqCodelHandle),
+    hostNetworkConfigurations(_hostNetworkConfigurations),
+    managedNonEphemeralPorts(_managedNonEphemeralPorts),
+    ephemeralPortsAllocator(_ephemeralPortsAllocator),
+    freeFlowIds(_flowIDs),
+    ratesCollector(_ratesCollector)
+{}
+
+
 Result<htb::cls::Config> recoverHTBConfig(
     pid_t pid,
     const std::string& eth0,
@@ -2657,6 +2989,10 @@ Future<Nothing> PortMappingIsolatorProcess::recover(
 
     infos[containerId] = recover.get();
 
+    if (ratesCollector.get()) {
+      ratesCollector->add(containerId, pid);
+    }
+
     // Remove the successfully recovered pid.
     pids.erase(pid);
   }
@@ -3391,6 +3727,10 @@ Future<Nothing> PortMappingIsolatorProcess::isolate(
     return Failure("Not expecting " + veth(pid) + " to be missing");
   }
 
+  if (ratesCollector.get()) {
+    ratesCollector->add(containerId, pid);
+  }
+
   return Nothing();
 }
 
@@ -3839,12 +4179,14 @@ Future<ResourceStatistics> PortMappingIsolatorProcess::usage(
         PID<PortMappingIsolatorProcess>(this),
         &PortMappingIsolatorProcess::_usage,
         result,
+        containerId,
         s.get()));
 }
 
 
 Future<ResourceStatistics> PortMappingIsolatorProcess::_usage(
     const ResourceStatistics& result,
+    const ContainerID& containerId,
     const Subprocess& s)
 {
   CHECK_READY(s.status());
@@ -3865,12 +4207,14 @@ Future<ResourceStatistics> \
PortMappingIsolatorProcess::_usage(  PID<PortMappingIsolatorProcess>(this),
         &PortMappingIsolatorProcess::__usage,
         result,
+        containerId,
         lambda::_1));
 }
 
 
 Future<ResourceStatistics> PortMappingIsolatorProcess::__usage(
     ResourceStatistics result,
+    const ContainerID& containerId,
     const Future<string>& out)
 {
   CHECK_READY(out);
@@ -3902,6 +4246,19 @@ Future<ResourceStatistics> \
PortMappingIsolatorProcess::__usage(  // will overwrite the timestamp set in the \
containerizer.  result.clear_timestamp();
 
+  if (ratesCollector.get()) {
+    return ratesCollector->usage(containerId)
+      .then([result](const Future<ResourceStatistics>& usage) mutable {
+        if (!usage.isReady()) {
+          LOG(WARNING) << "Failed to retrieve rates from the collector: "
+                       << (usage.isFailed() ? usage.failure() : "discarded");
+        } else {
+          result.MergeFrom(usage.get());
+        }
+        return result;
+      });
+  }
+
   return result;
 }
 
@@ -3919,6 +4276,10 @@ Future<Nothing> PortMappingIsolatorProcess::cleanup(
     return Nothing();
   }
 
+  if (ratesCollector.get()) {
+    ratesCollector->remove(containerId);
+  }
+
   Info* info = CHECK_NOTNULL(infos[containerId]);
 
   // For a normally exited container, we take its info pointer off the
diff --git a/src/slave/containerizer/mesos/isolators/network/port_mapping.hpp \
b/src/slave/containerizer/mesos/isolators/network/port_mapping.hpp index \
                888b5806e..c073c6d9c 100644
--- a/src/slave/containerizer/mesos/isolators/network/port_mapping.hpp
+++ b/src/slave/containerizer/mesos/isolators/network/port_mapping.hpp
@@ -21,6 +21,7 @@
 
 #include <sys/types.h>
 
+#include <memory>
 #include <set>
 #include <string>
 #include <vector>
@@ -155,6 +156,59 @@ Result<routing::queueing::htb::cls::Config> recoverHTBConfig(
     const std::string& eth0,
     const Flags& flags);
 
+
+// Samples the counters of one container's link (the host end of its
+// veth pair) and keeps time series of the derived per-second rates.
+// Percentile statistics over those series are computed on demand.
+class PercentileRatesCollector final
+{
+public:
+  // `_executorPid` selects the link to sample; `_window` and `_interval`
+  // size the rate time series.
+  explicit PercentileRatesCollector(
+      pid_t _executorPid, const Duration& _window, const Duration& _interval);
+
+  // Rates as seen from inside the container. Each returns None when no
+  // statistics can be computed (e.g. nothing has been sampled yet).
+  Option<process::Statistics<uint64_t>> rxRate() const;
+  Option<process::Statistics<uint64_t>> rxPacketRate() const;
+  Option<process::Statistics<uint64_t>> rxDropRate() const;
+  Option<process::Statistics<uint64_t>> rxErrorRate() const;
+
+  Option<process::Statistics<uint64_t>> txRate() const;
+  Option<process::Statistics<uint64_t>> txPacketRate() const;
+  Option<process::Statistics<uint64_t>> txDropRate() const;
+  Option<process::Statistics<uint64_t>> txErrorRate() const;
+
+  // Sample statistics from the interface.
+  void sample();
+
+  // Register the statistics sample. Exposed for testing.
+  // `stats` is a counter snapshot taken at `ts`; rates are derived from
+  // its difference with the previously registered snapshot.
+  void sample(const process::Time& ts, hashmap<std::string, uint64_t>&& stats);
+
+private:
+  // Appends the rate of one counter (`metric`) to `rates`.
+  void sampleRate(
+      const hashmap<std::string, uint64_t>& statistics,
+      const std::string& metric,
+      const process::Time& timestamp,
+      double timeDelta,
+      process::TimeSeries<uint64_t>& rates);
+
+  // Name of the link to sample metrics from.
+  const std::string link;
+
+  // Container-side receive rates (derived from host-side TX counters).
+  process::TimeSeries<uint64_t> rxRates;
+  process::TimeSeries<uint64_t> rxPackets;
+  process::TimeSeries<uint64_t> rxDrops;
+  process::TimeSeries<uint64_t> rxErrors;
+
+  // Container-side transmit rates (derived from host-side RX counters).
+  process::TimeSeries<uint64_t> txRates;
+  process::TimeSeries<uint64_t> txPackets;
+  process::TimeSeries<uint64_t> txDrops;
+  process::TimeSeries<uint64_t> txErrors;
+
+  // Previous sample and its timestamp for calculating rates.
+  Option<process::Time> previous;
+  hashmap<std::string, uint64_t> previousStatistics;
+};
+
+
+class RatesCollector;
+
+
 // Provides network isolation using port mapping. Each container is
 // assigned a fixed set of ports (including ephemeral ports). The
 // isolator will set up filters on the host such that network traffic
@@ -290,21 +344,8 @@ private:
       const hashmap<std::string, std::string>& _hostNetworkConfigurations,
       const IntervalSet<uint16_t>& _managedNonEphemeralPorts,
       const process::Owned<EphemeralPortsAllocator>& _ephemeralPortsAllocator,
-      const std::set<uint16_t>& _flowIDs)
-    : ProcessBase(process::ID::generate("mesos-port-mapping-isolator")),
-      flags(_flags),
-      bindMountRoot(_bindMountRoot),
-      eth0(_eth0),
-      lo(_lo),
-      hostMAC(_hostMAC),
-      hostIPNetwork(_hostIPNetwork),
-      hostEth0MTU(_hostEth0MTU),
-      hostDefaultGateway(_hostDefaultGateway),
-      hostTxFqCodelHandle(_hostTxFqCodelHandle),
-      hostNetworkConfigurations(_hostNetworkConfigurations),
-      managedNonEphemeralPorts(_managedNonEphemeralPorts),
-      ephemeralPortsAllocator(_ephemeralPortsAllocator),
-      freeFlowIds(_flowIDs) {}
+      const std::set<uint16_t>& _flowIDs,
+      const process::Owned<RatesCollector>& _ratesCollector);
 
   // Continuations.
   Try<Nothing> _cleanup(Info* info, const Option<ContainerID>& containerId);
@@ -316,10 +357,12 @@ private:
 
   process::Future<ResourceStatistics> _usage(
       const ResourceStatistics& result,
+      const ContainerID& containerId,
       const process::Subprocess& s);
 
   process::Future<ResourceStatistics> __usage(
       ResourceStatistics result,
+      const ContainerID& containerId,
       const process::Future<std::string>& out);
 
   // Helper functions.
@@ -380,6 +423,8 @@ private:
   // Recovered containers from a previous run that weren't managed by
   // the network isolator.
   hashset<ContainerID> unmanaged;
+
+  process::Owned<RatesCollector> ratesCollector;
 };
 
 
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index 90cf32e71..594dbedf8 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -1327,6 +1327,24 @@ mesos::internal::slave::Flags::Flags()
       "isolator.",
       false);
 
+  add(&Flags::network_enable_rate_statistics,
+      "network_enable_rate_statistics",
+      "Whether to collect rate statistics for each container. Note that\n"
+      "proper sampling window and interval configuration is required to get\n"
+      "meaningful percentiles. This flag is used for the\n"
+      "'network/port_mapping'\n isolator.",
+      false);
+
+  add(&Flags::network_rate_statistics_window,
+      "network_rate_statistics_window",
+      "Window size for rate statistics time series. This flag is used for the\n"
+      "'network/port_mapping' isolator.");
+
+  add(&Flags::network_rate_statistics_interval,
+      "network_rate_statistics_interval",
+      "Interval with which to sample rate statistics. This flag is used for\n"
+      "the 'network/port_mapping' isolator.");
+
 #endif // ENABLE_PORT_MAPPING_ISOLATOR
 
 #ifdef ENABLE_NETWORK_PORTS_ISOLATOR
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index 12c2404f9..0b6bd91c4 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -169,6 +169,9 @@ public:
   bool network_enable_socket_statistics_summary;
   bool network_enable_socket_statistics_details;
   bool network_enable_snmp_statistics;
+  bool network_enable_rate_statistics;
+  Option<Duration> network_rate_statistics_window;
+  Option<Duration> network_rate_statistics_interval;
 #endif // ENABLE_PORT_MAPPING_ISOLATOR
 
 #ifdef ENABLE_NETWORK_PORTS_ISOLATOR
diff --git a/src/tests/containerizer/port_mapping_tests.cpp \
b/src/tests/containerizer/port_mapping_tests.cpp index c8ae69853..579b8205a 100644
--- a/src/tests/containerizer/port_mapping_tests.cpp
+++ b/src/tests/containerizer/port_mapping_tests.cpp
@@ -2157,6 +2157,264 @@ TEST_F(PortMappingIsolatorTest, \
ROOT_NC_PortMappingStatistics)  }
 
 
+// Verify that rate statistics can be returned properly from
+// 'usage()'. This test is very similar to SmallIngressLimitTest in
+// setup: it rate-limits ingress, pushes data through `nc`, and then
+// checks that every rate field is populated in the usage report.
+TEST_F(PortMappingIsolatorTest, ROOT_NC_PortMappingRateStatistics)
+{
+  // Limit ingress to `rate` so that receiving `size` bytes takes
+  // several seconds (see `expectedTime` below).
+  const Bytes rate = 2000;
+  const Bytes size = 20480;
+
+  flags.ingress_rate_limit_per_container = rate;
+  flags.minimum_ingress_rate_limit = 0;
+  // Sample rates every 50ms over a 5s window.
+  flags.network_enable_rate_statistics = true;
+  flags.network_rate_statistics_window = Seconds(5);
+  flags.network_rate_statistics_interval = Milliseconds(50);
+
+  Try<Isolator*> isolator = PortMappingIsolatorProcess::create(flags);
+  ASSERT_SOME(isolator);
+
+  Try<Launcher*> launcher = LinuxLauncher::create(flags);
+  ASSERT_SOME(launcher);
+
+  ExecutorInfo executorInfo;
+  executorInfo.mutable_resources()->CopyFrom(
+      Resources::parse(container1Ports).get());
+
+  ContainerID containerId;
+  containerId.set_value(id::UUID::random().toString());
+
+  Try<string> dir = os::mkdtemp(path::join(os::getcwd(), "XXXXXX"));
+  ASSERT_SOME(dir);
+
+  ContainerConfig containerConfig;
+  containerConfig.mutable_executor_info()->CopyFrom(executorInfo);
+  containerConfig.mutable_resources()->CopyFrom(executorInfo.resources());
+  containerConfig.set_directory(dir.get());
+
+  Future<Option<ContainerLaunchInfo>> launchInfo = isolator.get()->prepare(
+      containerId, containerConfig);
+  AWAIT_READY(launchInfo);
+  ASSERT_SOME(launchInfo.get());
+  ASSERT_EQ(1, launchInfo.get()->pre_exec_commands().size());
+
+  // The container signals readiness and then listens on `validPort`,
+  // discarding whatever it receives.
+  ostringstream cmd1;
+  cmd1 << "touch " << container1Ready << " && ";
+  cmd1 << "nc -l -k localhost " << validPort << " > /dev/null";
+
+  int pipes[2];
+  ASSERT_NE(-1, ::pipe(pipes));
+
+  Try<pid_t> pid = launchHelper(
+      launcher.get(),
+      pipes,
+      containerId,
+      cmd1.str(),
+      launchInfo.get());
+
+  ASSERT_SOME(pid);
+
+  // Reap the forked child.
+  Future<Option<int>> reap = process::reap(pid.get());
+
+  // Continue in the parent.
+  ::close(pipes[0]);
+
+  // Isolate the forked child.
+  AWAIT_READY(isolator.get()->isolate(containerId, pid.get()));
+
+  // Now signal the child to continue.
+  char dummy;
+  ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+  ::close(pipes[1]);
+
+  // Wait for the container to signal readiness by creating
+  // `container1Ready` (done right before `nc` starts listening).
+  ASSERT_TRUE(waitForFileCreation(container1Ready));
+
+  const string data(size.bytes(), 'a');
+
+  ostringstream cmd2;
+  cmd2 << "echo " << data << " | nc localhost " << validPort;
+
+  Stopwatch stopwatch;
+  stopwatch.start();
+  ASSERT_SOME(os::shell(cmd2.str()));
+  Duration time = stopwatch.elapsed();
+
+  // Allow the time to deviate up to 1sec here to compensate for burstiness.
+  Duration expectedTime = Seconds(size.bytes() / rate.bytes() - 1);
+  ASSERT_GE(time, expectedTime);
+
+  // Number of samples that should fit into the window.
+  const size_t samples = flags.network_rate_statistics_window->secs() /
+    flags.network_rate_statistics_interval->secs();
+
+  // Verify that TX and RX rates have been returned with resource
+  // statistics. It's hard to verify actual values here because of
+  // burstiness.
+  Future<ResourceStatistics> usage = isolator.get()->usage(containerId);
+  AWAIT_READY(usage);
+  EXPECT_TRUE(usage->has_net_rate_statistics());
+
+  const ResourceStatistics::RateStatistics& rates =
+    usage->net_rate_statistics();
+  EXPECT_TRUE(rates.has_tx_rate());
+  EXPECT_TRUE(rates.tx_rate().has_p90());
+  EXPECT_TRUE(rates.tx_rate().has_samples());
+  EXPECT_GE(samples, rates.tx_rate().samples());
+  EXPECT_TRUE(rates.has_tx_packet_rate());
+  EXPECT_TRUE(rates.tx_packet_rate().has_p90());
+  EXPECT_TRUE(rates.has_tx_drop_rate());
+  EXPECT_TRUE(rates.tx_drop_rate().has_p90());
+  EXPECT_TRUE(rates.has_tx_error_rate());
+  EXPECT_TRUE(rates.tx_error_rate().has_p90());
+  EXPECT_TRUE(rates.has_rx_rate());
+  EXPECT_TRUE(rates.rx_rate().has_p90());
+  EXPECT_TRUE(rates.has_rx_packet_rate());
+  EXPECT_TRUE(rates.rx_packet_rate().has_p90());
+  EXPECT_TRUE(rates.has_rx_drop_rate());
+  EXPECT_TRUE(rates.rx_drop_rate().has_p90());
+  EXPECT_TRUE(rates.has_rx_error_rate());
+  EXPECT_TRUE(rates.rx_error_rate().has_p90());
+
+  // The configured window and interval are echoed back as-is.
+  EXPECT_TRUE(rates.has_sampling_window_secs());
+  EXPECT_EQ(
+      flags.network_rate_statistics_window->secs(),
+      rates.sampling_window_secs());
+  EXPECT_TRUE(rates.has_sampling_interval_secs());
+  EXPECT_EQ(
+      flags.network_rate_statistics_interval->secs(),
+      rates.sampling_interval_secs());
+
+  // Ensure all processes are killed.
+  AWAIT_READY(launcher.get()->destroy(containerId));
+
+  // Let the isolator clean up.
+  AWAIT_READY(isolator.get()->cleanup(containerId));
+
+  delete isolator.get();
+  delete launcher.get();
+}
+
+
+// Verify that PercentileRatesCollector calculates rates correctly.
+// Counter snapshots are fed in directly with explicit timestamps, so
+// no real link is queried.
+TEST(RatesCollectorTest, PercentileRatesCollector)
+{
+  Clock::pause();
+
+  const Duration window = Seconds(10);
+  const Duration interval = Seconds(1);
+  const Time start = Clock::now();
+
+  PercentileRatesCollector collector(0, window, interval);
+
+  // Nothing has been sampled yet, so no statistics are available.
+  EXPECT_NONE(collector.txRate());
+  EXPECT_NONE(collector.txPacketRate());
+  EXPECT_NONE(collector.txDropRate());
+  EXPECT_NONE(collector.txErrorRate());
+  EXPECT_NONE(collector.rxRate());
+  EXPECT_NONE(collector.rxPacketRate());
+  EXPECT_NONE(collector.rxDropRate());
+  EXPECT_NONE(collector.rxErrorRate());
+
+  // Builds a counter snapshot in the form consumed by sample().
+  const auto counters = [](
+      uint64_t txBytes,
+      uint64_t txPackets,
+      uint64_t txDropped,
+      uint64_t txErrors,
+      uint64_t rxBytes,
+      uint64_t rxPackets,
+      uint64_t rxDropped,
+      uint64_t rxErrors) -> hashmap<string, uint64_t> {
+    hashmap<string, uint64_t> snapshot;
+    snapshot["tx_bytes"] = txBytes;
+    snapshot["tx_packets"] = txPackets;
+    snapshot["tx_dropped"] = txDropped;
+    snapshot["tx_errors"] = txErrors;
+    snapshot["rx_bytes"] = rxBytes;
+    snapshot["rx_packets"] = rxPackets;
+    snapshot["rx_dropped"] = rxDropped;
+    snapshot["rx_errors"] = rxErrors;
+    return snapshot;
+  };
+
+  // Host-side counters; the collector swaps directions, so host TX is
+  // container RX. Ingress runs at 100 B/s for the first 2 seconds and
+  // at 50 B/s for the following 3 seconds. Egress runs at 10 B/s for
+  // the first 2 seconds and is idle afterwards.
+  collector.sample(start, counters(0, 0, 0, 0, 0, 0, 0, 0));
+  collector.sample(start + Seconds(1), counters(100, 10, 5, 1, 10, 1, 1, 1));
+  collector.sample(start + Seconds(2), counters(200, 20, 10, 2, 20, 2, 2, 2));
+  collector.sample(start + Seconds(3), counters(250, 25, 11, 3, 20, 2, 2, 2));
+  collector.sample(start + Seconds(4), counters(300, 30, 12, 4, 20, 2, 2, 2));
+  collector.sample(start + Seconds(5), counters(350, 35, 13, 5, 20, 2, 2, 2));
+
+  // Six snapshots yield five rate samples per series.
+  const auto expectRate = [](
+      const Statistics<uint64_t>& rate,
+      uint64_t expectedMax,
+      uint64_t expectedMin,
+      uint64_t expectedP90,
+      uint64_t expectedP50) {
+    EXPECT_EQ(5u, rate.count);
+    EXPECT_EQ(expectedMax, rate.max);
+    EXPECT_EQ(expectedMin, rate.min);
+    EXPECT_EQ(expectedP90, rate.p90);
+    EXPECT_EQ(expectedP50, rate.p50);
+  };
+
+  // Sorted byte rates are {50, 50, 50, 100, 100}.
+  const Option<Statistics<uint64_t>> rxRate = collector.rxRate();
+  ASSERT_SOME(rxRate);
+  expectRate(rxRate.get(), 100u, 50u, 100u, 50u);
+
+  const Option<Statistics<uint64_t>> rxPacketRate = collector.rxPacketRate();
+  ASSERT_SOME(rxPacketRate);
+  expectRate(rxPacketRate.get(), 10u, 5u, 10u, 5u);
+
+  const Option<Statistics<uint64_t>> rxDropRate = collector.rxDropRate();
+  ASSERT_SOME(rxDropRate);
+  expectRate(rxDropRate.get(), 5u, 1u, 5u, 1u);
+
+  // RX error rate is constantly 1 here.
+  const Option<Statistics<uint64_t>> rxErrorRate = collector.rxErrorRate();
+  ASSERT_SOME(rxErrorRate);
+  expectRate(rxErrorRate.get(), 1u, 1u, 1u, 1u);
+
+  const Option<Statistics<uint64_t>> txRate = collector.txRate();
+  ASSERT_SOME(txRate);
+  expectRate(txRate.get(), 10u, 0u, 10u, 0u);
+
+  const Option<Statistics<uint64_t>> txPacketRate = collector.txPacketRate();
+  ASSERT_SOME(txPacketRate);
+  expectRate(txPacketRate.get(), 1u, 0u, 1u, 0u);
+
+  const Option<Statistics<uint64_t>> txDropRate = collector.txDropRate();
+  ASSERT_SOME(txDropRate);
+  expectRate(txDropRate.get(), 1u, 0u, 1u, 0u);
+
+  const Option<Statistics<uint64_t>> txErrorRate = collector.txErrorRate();
+  ASSERT_SOME(txErrorRate);
+  expectRate(txErrorRate.get(), 1u, 0u, 1u, 0u);
+
+  Clock::resume();
+}
+
+
 static uint16_t roundUpToPow2(uint16_t x)
 {
   uint16_t r = 1 << static_cast<uint16_t>(std::log2(x));


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic