[prev in list] [next in list] [prev in thread] [next in thread]
List: mesos-commits
Subject: (mesos) branch master updated: Added rates collector to network/port_mapping isolator.
From: bmahler () apache ! org
Date: 2024-01-22 18:04:15
Message-ID: 170594665551.1759884.17093311887344796345 () gitbox2-he-fi ! apache ! org
[Download RAW message or body]
This is an automated email from the ASF dual-hosted git repository.
bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
The following commit(s) were added to refs/heads/master by this push:
new e437a922a Added rates collector to network/port_mapping isolator.
e437a922a is described below
commit e437a922aa2542299c499f8136dde23bebff2775
Author: Ilya Pronin <ipronin@twitter.com>
AuthorDate: Thu Jan 11 16:56:28 2018 -0800
Added rates collector to network/port_mapping isolator.
Introduces a RatesCollector class to network/port_mapping to expose (new)
RatePercentiles statistics on a variety of rates.
Statistics are derived from:
- https://docs.kernel.org/networking/statistics.html#c.rtnl_link_stats64
Queried through `rtnl_link_get_stat` in `src/linux/routing/link/link.cpp`.
---
include/mesos/mesos.proto | 48 +++
include/mesos/v1/mesos.proto | 48 +++
.../mesos/isolators/network/port_mapping.cpp | 363 ++++++++++++++++++++-
.../mesos/isolators/network/port_mapping.hpp | 75 ++++-
src/slave/flags.cpp | 18 +
src/slave/flags.hpp | 3 +
src/tests/containerizer/port_mapping_tests.cpp | 258 +++++++++++++++
7 files changed, 797 insertions(+), 16 deletions(-)
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 5db138f8d..5d3f2232e 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -1891,6 +1891,54 @@ message ResourceStatistics {
optional uint64 net_rx_burst_rate_limit = 50;
optional uint64 net_rx_burst_size = 51;
+ // Summary of a single rate time series: the smallest and largest
+ // sampled rate, selected percentiles, and the number of samples
+ // the summary was computed from.
+ message RatePercentiles {
+ optional uint64 min = 1;
+ optional uint64 max = 2;
+ optional uint64 p50 = 3;
+ optional uint64 p90 = 4;
+ optional uint64 p95 = 5;
+ optional uint64 p99 = 6;
+ optional uint64 p999 = 7;
+ optional uint64 p9999 = 8;
+ // Number of samples the values above were computed from.
+ optional uint64 samples = 9;
+ }
+
+ // Network rate statistics measured in bytes per second
+ // or packets per second.
+ //
+ // Rates are sampled every {sampling_interval_secs}. A
+ // time series is created out of the samples taken over
+ // a moving sampling window of {sampling_window_secs}.
+ // Percentiles for each time series are exposed through
+ // RatePercentiles.
+ //
+ // Linux documentation for more information:
+ // https://docs.kernel.org/networking/statistics.html#c.rtnl_link_stats64
+ message RateStatistics {
+ // Bytes received per second.
+ optional RatePercentiles rx_rate = 1;
+ // Packets received per second.
+ optional RatePercentiles rx_packet_rate = 2;
+ // Received packets dropped per second.
+ optional RatePercentiles rx_drop_rate = 3;
+ // Receiving packet errors per second.
+ optional RatePercentiles rx_error_rate = 4;
+ // Bytes sent per second.
+ optional RatePercentiles tx_rate = 5;
+ // Packets sent per second.
+ optional RatePercentiles tx_packet_rate = 6;
+ // Sent packets dropped per second.
+ optional RatePercentiles tx_drop_rate = 7;
+ // Sending packet errors per second.
+ optional RatePercentiles tx_error_rate = 8;
+ // Duration of the sliding time series window, in seconds.
+ optional double sampling_window_secs = 9;
+ // The delay between rate samples, in seconds.
+ optional double sampling_interval_secs = 10;
+ }
+
+ // Percentile summaries of the container's network rates.
+ // NOTE: tag 51 is already used by `net_rx_burst_size` above; field
+ // numbers must be unique within a message, so this field takes 52.
+ optional RateStatistics net_rate_statistics = 52;
+
// The kernel keeps track of RTT (round-trip time) for its TCP
// sockets. RTT is a way to tell the latency of a container.
optional double net_tcp_rtt_microsecs_p50 = 22;
diff --git a/include/mesos/v1/mesos.proto b/include/mesos/v1/mesos.proto
index 4a71b3474..98c9a6693 100644
--- a/include/mesos/v1/mesos.proto
+++ b/include/mesos/v1/mesos.proto
@@ -1855,6 +1855,54 @@ message ResourceStatistics {
optional uint64 net_rx_burst_rate_limit = 50;
optional uint64 net_rx_burst_size = 51;
+ // Summary of a single rate time series: the smallest and largest
+ // sampled rate, selected percentiles, and the number of samples
+ // the summary was computed from.
+ message RatePercentiles {
+ optional uint64 min = 1;
+ optional uint64 max = 2;
+ optional uint64 p50 = 3;
+ optional uint64 p90 = 4;
+ optional uint64 p95 = 5;
+ optional uint64 p99 = 6;
+ optional uint64 p999 = 7;
+ optional uint64 p9999 = 8;
+ // Number of samples the values above were computed from.
+ optional uint64 samples = 9;
+ }
+
+ // Network rate statistics measured in bytes per second
+ // or packets per second.
+ //
+ // Rates are sampled every {sampling_interval_secs}. A
+ // time series is created out of the samples taken over
+ // a moving sampling window of {sampling_window_secs}.
+ // Percentiles for each time series are exposed through
+ // RatePercentiles.
+ //
+ // Linux documentation for more information:
+ // https://docs.kernel.org/networking/statistics.html#c.rtnl_link_stats64
+ message RateStatistics {
+ // Bytes received per second.
+ optional RatePercentiles rx_rate = 1;
+ // Packets received per second.
+ optional RatePercentiles rx_packet_rate = 2;
+ // Received packets dropped per second.
+ optional RatePercentiles rx_drop_rate = 3;
+ // Receiving packet errors per second.
+ optional RatePercentiles rx_error_rate = 4;
+ // Bytes sent per second.
+ optional RatePercentiles tx_rate = 5;
+ // Packets sent per second.
+ optional RatePercentiles tx_packet_rate = 6;
+ // Sent packets dropped per second.
+ optional RatePercentiles tx_drop_rate = 7;
+ // Sending packet errors per second.
+ optional RatePercentiles tx_error_rate = 8;
+ // Duration of the sliding time series window, in seconds.
+ optional double sampling_window_secs = 9;
+ // The delay between rate samples, in seconds.
+ optional double sampling_interval_secs = 10;
+ }
+
+ // Percentile summaries of the container's network rates.
+ // NOTE: tag 51 is already used by `net_rx_burst_size` above; field
+ // numbers must be unique within a message, so this field takes 52.
+ optional RateStatistics net_rate_statistics = 52;
+
// The kernel keeps track of RTT (round-trip time) for its TCP
// sockets. RTT is a way to tell the latency of a container.
optional double net_tcp_rtt_microsecs_p50 = 22;
diff --git a/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp \
b/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp index \
3ed6863bc..f47a7bbb5 100644
--- a/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp
+++ b/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp
@@ -1483,6 +1483,279 @@ int PortMappingHTBConfig::execute()
}
+/////////////////////////////////////////////////
+// Implementation of RatesCollector.
+/////////////////////////////////////////////////
+
+// Sets up a collector for the veth link derived from `_executorPid`.
+// Every rate time series covers `_window` and is sized to hold one
+// sample per `_interval` within that window.
+PercentileRatesCollector::PercentileRatesCollector(
+    pid_t _executorPid,
+    const Duration& _window,
+    const Duration& _interval)
+  : link(veth(_executorPid))
+{
+  // Number of samples that fit into one window (fraction truncated).
+  const size_t maxSamples =
+    static_cast<size_t>(_window.secs() / _interval.secs());
+
+  // All eight series share the same window and capacity.
+  const auto makeSeries = [&]() {
+    return TimeSeries<uint64_t>(_window, maxSamples);
+  };
+
+  rxRates = makeSeries();
+  rxPackets = makeSeries();
+  rxDrops = makeSeries();
+  rxErrors = makeSeries();
+
+  txRates = makeSeries();
+  txPackets = makeSeries();
+  txDrops = makeSeries();
+  txErrors = makeSeries();
+}
+
+
+// Rate accessors. Each one summarizes the corresponding time series
+// via `Statistics<uint64_t>::from()`. The unit test added in this
+// change expects `None` while no sample has been registered.
+
+// Received bytes per second.
+Option<Statistics<uint64_t>> PercentileRatesCollector::rxRate() const
+{
+ return Statistics<uint64_t>::from(rxRates);
+}
+
+
+// Received packets per second.
+Option<Statistics<uint64_t>> PercentileRatesCollector::rxPacketRate() const
+{
+ return Statistics<uint64_t>::from(rxPackets);
+}
+
+
+// Dropped received packets per second.
+Option<Statistics<uint64_t>> PercentileRatesCollector::rxDropRate() const
+{
+ return Statistics<uint64_t>::from(rxDrops);
+}
+
+
+// Receive errors per second.
+Option<Statistics<uint64_t>> PercentileRatesCollector::rxErrorRate() const
+{
+ return Statistics<uint64_t>::from(rxErrors);
+}
+
+
+// Sent bytes per second.
+Option<Statistics<uint64_t>> PercentileRatesCollector::txRate() const
+{
+ return Statistics<uint64_t>::from(txRates);
+}
+
+
+// Sent packets per second.
+Option<Statistics<uint64_t>> PercentileRatesCollector::txPacketRate() const
+{
+ return Statistics<uint64_t>::from(txPackets);
+}
+
+
+// Dropped sent packets per second.
+Option<Statistics<uint64_t>> PercentileRatesCollector::txDropRate() const
+{
+ return Statistics<uint64_t>::from(txDrops);
+}
+
+
+// Send errors per second.
+Option<Statistics<uint64_t>> PercentileRatesCollector::txErrorRate() const
+{
+ return Statistics<uint64_t>::from(txErrors);
+}
+
+
+// Reads the current counters of the link and registers them as a
+// sample taken at the current clock time. Does nothing if the
+// counters cannot be obtained.
+void PercentileRatesCollector::sample()
+{
+  // Capture the time before querying the link.
+  const Time now = Clock::now();
+
+  Result<hashmap<string, uint64_t>> counters = link::statistics(link);
+  if (!counters.isSome()) {
+    return;
+  }
+
+  sample(now, std::move(counters.get()));
+}
+
+
+// Registers the counters `statistics` observed at time `ts`. Rates
+// are only recorded when an earlier sample with a strictly older
+// timestamp exists; in every case the given sample becomes the new
+// baseline for the next call.
+void PercentileRatesCollector::sample(
+    const Time& ts, hashmap<string, uint64_t>&& statistics)
+{
+  const bool hasOlderBaseline = previous.isSome() && previous.get() < ts;
+
+  if (hasOlderBaseline) {
+    const double elapsedSecs = ts.secs() - previous->secs();
+
+    // The counters come from the host end of the veth pair, so what
+    // the host transmits is what the container receives ...
+    sampleRate(statistics, "tx_bytes", ts, elapsedSecs, rxRates);
+    sampleRate(statistics, "tx_packets", ts, elapsedSecs, rxPackets);
+    sampleRate(statistics, "tx_dropped", ts, elapsedSecs, rxDrops);
+    sampleRate(statistics, "tx_errors", ts, elapsedSecs, rxErrors);
+
+    // ... and what the host receives is what the container sends.
+    sampleRate(statistics, "rx_bytes", ts, elapsedSecs, txRates);
+    sampleRate(statistics, "rx_packets", ts, elapsedSecs, txPackets);
+    sampleRate(statistics, "rx_dropped", ts, elapsedSecs, txDrops);
+    sampleRate(statistics, "rx_errors", ts, elapsedSecs, txErrors);
+  }
+
+  previousStatistics = std::move(statistics);
+  previous = ts;
+}
+
+
+// Appends to `rates` the per-second rate of `metric` between the
+// previously stored sample and `statistics`.
+//
+// `timeDelta` is the time in seconds between the two samples and
+// `timestamp` is the time the new rate is recorded at. Nothing is
+// recorded if the metric is missing from either sample, or if the
+// counter went backwards.
+void PercentileRatesCollector::sampleRate(
+    const hashmap<string, uint64_t>& statistics,
+    const string& metric,
+    const Time& timestamp,
+    double timeDelta,
+    TimeSeries<uint64_t>& rates)
+{
+  const Option<uint64_t> previousValue = previousStatistics.get(metric);
+  const Option<uint64_t> value = statistics.get(metric);
+  if (previousValue.isNone() || value.isNone()) {
+    return;
+  }
+
+  // The counters are unsigned: if the new value is smaller than the
+  // previous one the subtraction below would wrap around and record
+  // an enormous bogus rate that skews max and the high percentiles
+  // for the whole window. Skip such a sample instead.
+  if (value.get() < previousValue.get()) {
+    return;
+  }
+
+  rates.set(
+      static_cast<uint64_t>((value.get() - previousValue.get()) / timeDelta),
+      timestamp);
+}
+
+
+// Process that owns one PercentileRatesCollector per container and
+// samples all of them every `interval`.
+class RatesCollectorProcess : public Process<RatesCollectorProcess>
+{
+public:
+  explicit RatesCollectorProcess(
+      const Duration& _interval,
+      const Duration& _window)
+    : ProcessBase(ID::generate("mesos-port-mapping-rates-collector")),
+      interval(_interval),
+      window(_window)
+  {
+    CHECK_GT(window, interval);
+  }
+
+  // Starts the periodic sampling loop.
+  void initialize() override
+  {
+    schedule();
+  }
+
+  // Returns the rate statistics collected so far for the container.
+  // For an unknown container a warning is logged and empty
+  // statistics are returned.
+  Future<ResourceStatistics> usage(const ContainerID& containerId)
+  {
+    ResourceStatistics statistics;
+
+    if (collectors.contains(containerId)) {
+      const PercentileRatesCollector& collector = collectors.at(containerId);
+
+      ResourceStatistics::RateStatistics* rates =
+        statistics.mutable_net_rate_statistics();
+
+      // A rate is only set if its time series yields a summary.
+      const Option<Statistics<uint64_t>> rxRate = collector.rxRate();
+      if (rxRate.isSome()) {
+        copyRate(rxRate.get(), rates->mutable_rx_rate());
+      }
+
+      const Option<Statistics<uint64_t>> rxPacketRate =
+        collector.rxPacketRate();
+      if (rxPacketRate.isSome()) {
+        copyRate(rxPacketRate.get(), rates->mutable_rx_packet_rate());
+      }
+
+      const Option<Statistics<uint64_t>> rxDropRate = collector.rxDropRate();
+      if (rxDropRate.isSome()) {
+        copyRate(rxDropRate.get(), rates->mutable_rx_drop_rate());
+      }
+
+      const Option<Statistics<uint64_t>> rxErrorRate = collector.rxErrorRate();
+      if (rxErrorRate.isSome()) {
+        copyRate(rxErrorRate.get(), rates->mutable_rx_error_rate());
+      }
+
+      const Option<Statistics<uint64_t>> txRate = collector.txRate();
+      if (txRate.isSome()) {
+        copyRate(txRate.get(), rates->mutable_tx_rate());
+      }
+
+      const Option<Statistics<uint64_t>> txPacketRate =
+        collector.txPacketRate();
+      if (txPacketRate.isSome()) {
+        copyRate(txPacketRate.get(), rates->mutable_tx_packet_rate());
+      }
+
+      const Option<Statistics<uint64_t>> txDropRate = collector.txDropRate();
+      if (txDropRate.isSome()) {
+        copyRate(txDropRate.get(), rates->mutable_tx_drop_rate());
+      }
+
+      const Option<Statistics<uint64_t>> txErrorRate = collector.txErrorRate();
+      if (txErrorRate.isSome()) {
+        copyRate(txErrorRate.get(), rates->mutable_tx_error_rate());
+      }
+
+      rates->set_sampling_window_secs(window.secs());
+      rates->set_sampling_interval_secs(interval.secs());
+    } else {
+      LOG(WARNING) << "Unknown container " << containerId;
+    }
+
+    return statistics;
+  }
+
+  // Starts collecting metrics for the container. An already
+  // registered container keeps its existing collector.
+  Future<Nothing> add(const ContainerID& containerId, pid_t executorPid)
+  {
+    PercentileRatesCollector collector(executorPid, window, interval);
+    collectors.emplace(containerId, std::move(collector));
+    return Nothing();
+  }
+
+  // Stops collecting metrics for the container and drops its data.
+  Future<Nothing> remove(const ContainerID& containerId)
+  {
+    collectors.erase(containerId);
+    return Nothing();
+  }
+
+private:
+  // Samples every registered collector, then re-arms itself to run
+  // again after `interval`.
+  void schedule()
+  {
+    foreachvalue (PercentileRatesCollector& collector, collectors) {
+      collector.sample();
+    }
+
+    delay(interval, self(), &Self::schedule);
+  }
+
+  // Copies a time series summary into its protobuf representation.
+  void copyRate(
+      const Statistics<uint64_t>& statistics,
+      ResourceStatistics::RatePercentiles* rate)
+  {
+    rate->set_samples(statistics.count);
+
+    rate->set_min(statistics.min);
+    rate->set_max(statistics.max);
+
+    rate->set_p50(statistics.p50);
+    rate->set_p90(statistics.p90);
+    rate->set_p95(statistics.p95);
+    rate->set_p99(statistics.p99);
+    rate->set_p999(statistics.p999);
+    rate->set_p9999(statistics.p9999);
+  }
+
+  const Duration interval;
+  const Duration window;
+
+  hashmap<ContainerID, PercentileRatesCollector> collectors;
+};
+
+
+// Wrapper that owns a RatesCollectorProcess: spawns it on
+// construction, terminates and deletes it on destruction, and
+// forwards every call to it via `dispatch`.
+class RatesCollector
+{
+public:
+  explicit RatesCollector(const Duration& interval, const Duration& window)
+    : process(new RatesCollectorProcess(interval, window))
+  {
+    spawn(process);
+  }
+
+  // The process is owned through a raw pointer, so a copy would
+  // terminate and delete it twice. Copying is therefore disabled.
+  RatesCollector(const RatesCollector&) = delete;
+  RatesCollector& operator=(const RatesCollector&) = delete;
+
+  ~RatesCollector()
+  {
+    terminate(process);
+    wait(process);
+    delete process;
+  }
+
+  // Returns the rate statistics collected for the container.
+  Future<ResourceStatistics> usage(const ContainerID& containerId)
+  {
+    return dispatch(process, &RatesCollectorProcess::usage, containerId);
+  }
+
+  // Starts collecting rates for the container with the given pid.
+  Future<Nothing> add(const ContainerID& containerId, pid_t pid)
+  {
+    return dispatch(process, &RatesCollectorProcess::add, containerId, pid);
+  }
+
+  // Stops collecting rates for the container.
+  Future<Nothing> remove(const ContainerID& containerId)
+  {
+    return dispatch(process, &RatesCollectorProcess::remove, containerId);
+  }
+
+private:
+  RatesCollectorProcess* process;
+};
+
+
/////////////////////////////////////////////////
// Implementation for the isolator.
/////////////////////////////////////////////////
@@ -1995,6 +2268,31 @@ Try<Isolator*> PortMappingIsolatorProcess::create(const Flags& \
flags) }
}
+ Owned<RatesCollector> ratesCollector;
+ if (flags.network_enable_rate_statistics) {
+ if (flags.network_rate_statistics_window.isNone() ||
+ flags.network_rate_statistics_interval.isNone()) {
+ return Error("Window size and sampling interval for rate statistics "
+ "are required");
+ }
+
+ if (flags.network_rate_statistics_window.get() <=
+ flags.network_rate_statistics_interval.get()) {
+ return Error("Rate statistics window size should be bigger than "
+ "the sampling interval");
+ }
+
+ const Duration minInterval = Milliseconds(20);
+ if (flags.network_rate_statistics_interval.get() < minInterval) {
+ return Error("Rate statistics interval should not be smaller than "
+ + stringify(minInterval));
+ }
+
+ ratesCollector.reset(new RatesCollector(
+ flags.network_rate_statistics_interval.get(),
+ flags.network_rate_statistics_window.get()));
+ }
+
// Get the host IP network, MAC and default gateway.
Result<net::IP::Network> hostIPNetwork =
net::IP::Network::fromLinkDevice(eth0.get(), AF_INET);
@@ -2380,10 +2678,44 @@ Try<Isolator*> PortMappingIsolatorProcess::create(const \
Flags& flags) hostNetworkConfigurations,
nonEphemeralPorts,
ephemeralPortsAllocator,
- freeFlowIds)));
+ freeFlowIds,
+ ratesCollector)));
}
+// Stores every argument in the member of the same name. The
+// definition lives here rather than in the header, where
+// `RatesCollector` is only forward-declared. `_ratesCollector` holds
+// no collector when rate statistics are disabled; callers in this
+// file check `ratesCollector.get()` before using it.
+PortMappingIsolatorProcess::PortMappingIsolatorProcess(
+ const Flags& _flags,
+ const std::string& _bindMountRoot,
+ const std::string& _eth0,
+ const std::string& _lo,
+ const net::MAC& _hostMAC,
+ const net::IP::Network& _hostIPNetwork,
+ const size_t _hostEth0MTU,
+ const net::IP& _hostDefaultGateway,
+ const routing::Handle& _hostTxFqCodelHandle,
+ const hashmap<std::string, std::string>& _hostNetworkConfigurations,
+ const IntervalSet<uint16_t>& _managedNonEphemeralPorts,
+ const process::Owned<EphemeralPortsAllocator>& _ephemeralPortsAllocator,
+ const std::set<uint16_t>& _flowIDs,
+ const Owned<RatesCollector>& _ratesCollector)
+ : ProcessBase(process::ID::generate("mesos-port-mapping-isolator")),
+ flags(_flags),
+ bindMountRoot(_bindMountRoot),
+ eth0(_eth0),
+ lo(_lo),
+ hostMAC(_hostMAC),
+ hostIPNetwork(_hostIPNetwork),
+ hostEth0MTU(_hostEth0MTU),
+ hostDefaultGateway(_hostDefaultGateway),
+ hostTxFqCodelHandle(_hostTxFqCodelHandle),
+ hostNetworkConfigurations(_hostNetworkConfigurations),
+ managedNonEphemeralPorts(_managedNonEphemeralPorts),
+ ephemeralPortsAllocator(_ephemeralPortsAllocator),
+ freeFlowIds(_flowIDs),
+ ratesCollector(_ratesCollector)
+{}
+
+
Result<htb::cls::Config> recoverHTBConfig(
pid_t pid,
const std::string& eth0,
@@ -2657,6 +2989,10 @@ Future<Nothing> PortMappingIsolatorProcess::recover(
infos[containerId] = recover.get();
+ if (ratesCollector.get()) {
+ ratesCollector->add(containerId, pid);
+ }
+
// Remove the successfully recovered pid.
pids.erase(pid);
}
@@ -3391,6 +3727,10 @@ Future<Nothing> PortMappingIsolatorProcess::isolate(
return Failure("Not expecting " + veth(pid) + " to be missing");
}
+ if (ratesCollector.get()) {
+ ratesCollector->add(containerId, pid);
+ }
+
return Nothing();
}
@@ -3839,12 +4179,14 @@ Future<ResourceStatistics> PortMappingIsolatorProcess::usage(
PID<PortMappingIsolatorProcess>(this),
&PortMappingIsolatorProcess::_usage,
result,
+ containerId,
s.get()));
}
Future<ResourceStatistics> PortMappingIsolatorProcess::_usage(
const ResourceStatistics& result,
+ const ContainerID& containerId,
const Subprocess& s)
{
CHECK_READY(s.status());
@@ -3865,12 +4207,14 @@ Future<ResourceStatistics> \
PortMappingIsolatorProcess::_usage( PID<PortMappingIsolatorProcess>(this),
&PortMappingIsolatorProcess::__usage,
result,
+ containerId,
lambda::_1));
}
Future<ResourceStatistics> PortMappingIsolatorProcess::__usage(
ResourceStatistics result,
+ const ContainerID& containerId,
const Future<string>& out)
{
CHECK_READY(out);
@@ -3902,6 +4246,19 @@ Future<ResourceStatistics> \
PortMappingIsolatorProcess::__usage( // will overwrite the timestamp set in the \
containerizer. result.clear_timestamp();
+ if (ratesCollector.get()) {
+ return ratesCollector->usage(containerId)
+ .then([result](const Future<ResourceStatistics>& usage) mutable {
+ if (!usage.isReady()) {
+ LOG(WARNING) << "Failed to retrieve rates from the collector: "
+ << (usage.isFailed() ? usage.failure() : "discarded");
+ } else {
+ result.MergeFrom(usage.get());
+ }
+ return result;
+ });
+ }
+
return result;
}
@@ -3919,6 +4276,10 @@ Future<Nothing> PortMappingIsolatorProcess::cleanup(
return Nothing();
}
+ if (ratesCollector.get()) {
+ ratesCollector->remove(containerId);
+ }
+
Info* info = CHECK_NOTNULL(infos[containerId]);
// For a normally exited container, we take its info pointer off the
diff --git a/src/slave/containerizer/mesos/isolators/network/port_mapping.hpp \
b/src/slave/containerizer/mesos/isolators/network/port_mapping.hpp index \
888b5806e..c073c6d9c 100644
--- a/src/slave/containerizer/mesos/isolators/network/port_mapping.hpp
+++ b/src/slave/containerizer/mesos/isolators/network/port_mapping.hpp
@@ -21,6 +21,7 @@
#include <sys/types.h>
+#include <memory>
#include <set>
#include <string>
#include <vector>
@@ -155,6 +156,59 @@ Result<routing::queueing::htb::cls::Config> recoverHTBConfig(
const std::string& eth0,
const Flags& flags);
+
+// Turns successive counter samples of one link into per-second rate
+// time series and exposes a percentile summary for each of them.
+// RX/TX are reported from the container's point of view.
+class PercentileRatesCollector final
+{
+public:
+ // `_window` is the duration covered by each time series and
+ // `_interval` the expected delay between samples; together they
+ // determine the capacity of the series.
+ explicit PercentileRatesCollector(
+ pid_t _executorPid, const Duration& _window, const Duration& _interval);
+
+ // Summaries of the receive-side rates (bytes, packets, drops,
+ // errors per second).
+ Option<process::Statistics<uint64_t>> rxRate() const;
+ Option<process::Statistics<uint64_t>> rxPacketRate() const;
+ Option<process::Statistics<uint64_t>> rxDropRate() const;
+ Option<process::Statistics<uint64_t>> rxErrorRate() const;
+
+ // Summaries of the send-side rates (bytes, packets, drops, errors
+ // per second).
+ Option<process::Statistics<uint64_t>> txRate() const;
+ Option<process::Statistics<uint64_t>> txPacketRate() const;
+ Option<process::Statistics<uint64_t>> txDropRate() const;
+ Option<process::Statistics<uint64_t>> txErrorRate() const;
+
+ // Sample statistics from the interface.
+ void sample();
+
+ // Register the statistics sample. Exposed for testing.
+ void sample(const process::Time& ts, hashmap<std::string, uint64_t>&& stats);
+
+private:
+ // Records the rate of `metric` between the previous sample and
+ // `statistics` into `rates`, if both samples contain the metric.
+ void sampleRate(
+ const hashmap<std::string, uint64_t>& statistics,
+ const std::string& metric,
+ const process::Time& timestamp,
+ double timeDelta,
+ process::TimeSeries<uint64_t>& rates);
+
+ // Name of the link to sample metrics from.
+ const std::string link;
+
+ process::TimeSeries<uint64_t> rxRates;
+ process::TimeSeries<uint64_t> rxPackets;
+ process::TimeSeries<uint64_t> rxDrops;
+ process::TimeSeries<uint64_t> rxErrors;
+
+ process::TimeSeries<uint64_t> txRates;
+ process::TimeSeries<uint64_t> txPackets;
+ process::TimeSeries<uint64_t> txDrops;
+ process::TimeSeries<uint64_t> txErrors;
+
+ // Previous sample and its timestamp for calculating rates.
+ Option<process::Time> previous;
+ hashmap<std::string, uint64_t> previousStatistics;
+};
+
+
+class RatesCollector;
+
+
// Provides network isolation using port mapping. Each container is
// assigned a fixed set of ports (including ephemeral ports). The
// isolator will set up filters on the host such that network traffic
@@ -290,21 +344,8 @@ private:
const hashmap<std::string, std::string>& _hostNetworkConfigurations,
const IntervalSet<uint16_t>& _managedNonEphemeralPorts,
const process::Owned<EphemeralPortsAllocator>& _ephemeralPortsAllocator,
- const std::set<uint16_t>& _flowIDs)
- : ProcessBase(process::ID::generate("mesos-port-mapping-isolator")),
- flags(_flags),
- bindMountRoot(_bindMountRoot),
- eth0(_eth0),
- lo(_lo),
- hostMAC(_hostMAC),
- hostIPNetwork(_hostIPNetwork),
- hostEth0MTU(_hostEth0MTU),
- hostDefaultGateway(_hostDefaultGateway),
- hostTxFqCodelHandle(_hostTxFqCodelHandle),
- hostNetworkConfigurations(_hostNetworkConfigurations),
- managedNonEphemeralPorts(_managedNonEphemeralPorts),
- ephemeralPortsAllocator(_ephemeralPortsAllocator),
- freeFlowIds(_flowIDs) {}
+ const std::set<uint16_t>& _flowIDs,
+ const process::Owned<RatesCollector>& _ratesCollector);
// Continuations.
Try<Nothing> _cleanup(Info* info, const Option<ContainerID>& containerId);
@@ -316,10 +357,12 @@ private:
process::Future<ResourceStatistics> _usage(
const ResourceStatistics& result,
+ const ContainerID& containerId,
const process::Subprocess& s);
process::Future<ResourceStatistics> __usage(
ResourceStatistics result,
+ const ContainerID& containerId,
const process::Future<std::string>& out);
// Helper functions.
@@ -380,6 +423,8 @@ private:
// Recovered containers from a previous run that weren't managed by
// the network isolator.
hashset<ContainerID> unmanaged;
+
+ process::Owned<RatesCollector> ratesCollector;
};
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index 90cf32e71..594dbedf8 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -1327,6 +1327,24 @@ mesos::internal::slave::Flags::Flags()
"isolator.",
false);
+  // Opt-in switch for per-container rate statistics. When enabled,
+  // the isolator also requires the window and interval flags below.
+  add(&Flags::network_enable_rate_statistics,
+      "network_enable_rate_statistics",
+      "Whether to collect rate statistics for each container. Note that\n"
+      "proper sampling window and interval configuration is required to get\n"
+      "meaningful percentiles. This flag is used for the\n"
+      "'network/port_mapping' isolator.",
+      false);
+
+ add(&Flags::network_rate_statistics_window,
+ "network_rate_statistics_window",
+ "Window size for rate statistics time series. This flag is used for the\n"
+ "'network/port_mapping' isolator.");
+
+ add(&Flags::network_rate_statistics_interval,
+ "network_rate_statistics_interval",
+ "Interval with which to sample rate statistics. This flag is used for\n"
+ "the 'network/port_mapping' isolator.");
+
#endif // ENABLE_PORT_MAPPING_ISOLATOR
#ifdef ENABLE_NETWORK_PORTS_ISOLATOR
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index 12c2404f9..0b6bd91c4 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -169,6 +169,9 @@ public:
bool network_enable_socket_statistics_summary;
bool network_enable_socket_statistics_details;
bool network_enable_snmp_statistics;
+ bool network_enable_rate_statistics;
+ Option<Duration> network_rate_statistics_window;
+ Option<Duration> network_rate_statistics_interval;
#endif // ENABLE_PORT_MAPPING_ISOLATOR
#ifdef ENABLE_NETWORK_PORTS_ISOLATOR
diff --git a/src/tests/containerizer/port_mapping_tests.cpp \
b/src/tests/containerizer/port_mapping_tests.cpp index c8ae69853..579b8205a 100644
--- a/src/tests/containerizer/port_mapping_tests.cpp
+++ b/src/tests/containerizer/port_mapping_tests.cpp
@@ -2157,6 +2157,264 @@ TEST_F(PortMappingIsolatorTest, \
ROOT_NC_PortMappingStatistics) }
+// Verify that rate statistics can be returned properly from
+// 'usage()'. This test is very similar to SmallIngressLimitTest in
+// setup.
+TEST_F(PortMappingIsolatorTest, ROOT_NC_PortMappingRateStatistics)
+{
+ // Ingress rate limit applied to the container and the amount of
+ // data sent to it below.
+ const Bytes rate = 2000;
+ const Bytes size = 20480;
+
+ flags.ingress_rate_limit_per_container = rate;
+ flags.minimum_ingress_rate_limit = 0;
+ flags.network_enable_rate_statistics = true;
+ flags.network_rate_statistics_window = Seconds(5);
+ flags.network_rate_statistics_interval = Milliseconds(50);
+
+ Try<Isolator*> isolator = PortMappingIsolatorProcess::create(flags);
+ ASSERT_SOME(isolator);
+
+ Try<Launcher*> launcher = LinuxLauncher::create(flags);
+ ASSERT_SOME(launcher);
+
+ ExecutorInfo executorInfo;
+ executorInfo.mutable_resources()->CopyFrom(
+ Resources::parse(container1Ports).get());
+
+ ContainerID containerId;
+ containerId.set_value(id::UUID::random().toString());
+
+ Try<string> dir = os::mkdtemp(path::join(os::getcwd(), "XXXXXX"));
+ ASSERT_SOME(dir);
+
+ ContainerConfig containerConfig;
+ containerConfig.mutable_executor_info()->CopyFrom(executorInfo);
+ containerConfig.mutable_resources()->CopyFrom(executorInfo.resources());
+ containerConfig.set_directory(dir.get());
+
+ Future<Option<ContainerLaunchInfo>> launchInfo = isolator.get()->prepare(
+ containerId, containerConfig);
+ AWAIT_READY(launchInfo);
+ ASSERT_SOME(launchInfo.get());
+ ASSERT_EQ(1, launchInfo.get()->pre_exec_commands().size());
+
+ // The container marks itself ready and then listens on a valid port.
+ ostringstream cmd1;
+ cmd1 << "touch " << container1Ready << " && ";
+ cmd1 << "nc -l -k localhost " << validPort << " > /dev/null";
+
+ int pipes[2];
+ ASSERT_NE(-1, ::pipe(pipes));
+
+ Try<pid_t> pid = launchHelper(
+ launcher.get(),
+ pipes,
+ containerId,
+ cmd1.str(),
+ launchInfo.get());
+
+ ASSERT_SOME(pid);
+
+ // Reap the forked child.
+ Future<Option<int>> reap = process::reap(pid.get());
+
+ // Continue in the parent.
+ ::close(pipes[0]);
+
+ // Isolate the forked child.
+ AWAIT_READY(isolator.get()->isolate(containerId, pid.get()));
+
+ // Now signal the child to continue.
+ char dummy;
+ ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+ ::close(pipes[1]);
+
+ // Wait for the container to create its ready file.
+ ASSERT_TRUE(waitForFileCreation(container1Ready));
+
+ const string data(size.bytes(), 'a');
+
+ // Send `size` bytes to the container from the host.
+ ostringstream cmd2;
+ cmd2 << "echo " << data << " | nc localhost " << validPort;
+
+ Stopwatch stopwatch;
+ stopwatch.start();
+ ASSERT_SOME(os::shell(cmd2.str()));
+ Duration time = stopwatch.elapsed();
+
+ // Allow the time to deviate up to 1sec here to compensate for burstiness.
+ Duration expectedTime = Seconds(size.bytes() / rate.bytes() - 1);
+ ASSERT_GE(time, expectedTime);
+
+ // Number of samples that should fit into the window.
+ const size_t samples = flags.network_rate_statistics_window->secs() /
+ flags.network_rate_statistics_interval->secs();
+
+ // Verify that TX and RX rates have been returned with resource
+ // statistics. It's hard to verify actual values here because of
+ // burstiness.
+ Future<ResourceStatistics> usage = isolator.get()->usage(containerId);
+ AWAIT_READY(usage);
+ EXPECT_TRUE(usage->has_net_rate_statistics());
+
+ const ResourceStatistics::RateStatistics& rates =
+ usage->net_rate_statistics();
+ EXPECT_TRUE(rates.has_tx_rate());
+ EXPECT_TRUE(rates.tx_rate().has_p90());
+ EXPECT_TRUE(rates.tx_rate().has_samples());
+ EXPECT_GE(samples, rates.tx_rate().samples());
+ EXPECT_TRUE(rates.has_tx_packet_rate());
+ EXPECT_TRUE(rates.tx_packet_rate().has_p90());
+ EXPECT_TRUE(rates.has_tx_drop_rate());
+ EXPECT_TRUE(rates.tx_drop_rate().has_p90());
+ EXPECT_TRUE(rates.has_tx_error_rate());
+ EXPECT_TRUE(rates.tx_error_rate().has_p90());
+ EXPECT_TRUE(rates.has_rx_rate());
+ EXPECT_TRUE(rates.rx_rate().has_p90());
+ EXPECT_TRUE(rates.has_rx_packet_rate());
+ EXPECT_TRUE(rates.rx_packet_rate().has_p90());
+ EXPECT_TRUE(rates.has_rx_drop_rate());
+ EXPECT_TRUE(rates.rx_drop_rate().has_p90());
+ EXPECT_TRUE(rates.has_rx_error_rate());
+ EXPECT_TRUE(rates.rx_error_rate().has_p90());
+
+ // The configured window and interval are reported back unchanged.
+ EXPECT_TRUE(rates.has_sampling_window_secs());
+ EXPECT_EQ(
+ flags.network_rate_statistics_window->secs(),
+ rates.sampling_window_secs());
+ EXPECT_TRUE(rates.has_sampling_interval_secs());
+ EXPECT_EQ(
+ flags.network_rate_statistics_interval->secs(),
+ rates.sampling_interval_secs());
+
+ // Ensure all processes are killed.
+ AWAIT_READY(launcher.get()->destroy(containerId));
+
+ // Let the isolator clean up.
+ AWAIT_READY(isolator.get()->cleanup(containerId));
+
+ delete isolator.get();
+ delete launcher.get();
+}
+
+
+// Verify that PercentileRatesCollector calculates rates correctly.
+TEST(RatesCollectorTest, PercentileRatesCollector)
+{
+ Clock::pause();
+ const Duration window = Seconds(10);
+ const Duration interval = Seconds(1);
+ const Time now = Clock::now();
+
+ // Samples are injected by hand below, so the pid is irrelevant.
+ PercentileRatesCollector collector(0, window, interval);
+
+ // No samples.
+ EXPECT_NONE(collector.txRate());
+ EXPECT_NONE(collector.txPacketRate());
+ EXPECT_NONE(collector.txDropRate());
+ EXPECT_NONE(collector.txErrorRate());
+ EXPECT_NONE(collector.rxRate());
+ EXPECT_NONE(collector.rxPacketRate());
+ EXPECT_NONE(collector.rxDropRate());
+ EXPECT_NONE(collector.rxErrorRate());
+
+ // Builds a counter sample as seen on the host end of the veth
+ // pair. The collector swaps directions, so the tx_* counters here
+ // feed the container's RX rates and vice versa.
+ const auto createSample = [](
+ uint64_t txBytes,
+ uint64_t txPackets,
+ uint64_t txDropped,
+ uint64_t txErrors,
+ uint64_t rxBytes,
+ uint64_t rxPackets,
+ uint64_t rxDropped,
+ uint64_t rxErrors) -> hashmap<string, uint64_t> {
+ return {{"tx_bytes", txBytes},
+ {"tx_packets", txPackets},
+ {"tx_dropped", txDropped},
+ {"tx_errors", txErrors},
+ {"rx_bytes", rxBytes},
+ {"rx_packets", rxPackets},
+ {"rx_dropped", rxDropped},
+ {"rx_errors", rxErrors}};
+ };
+
+ // Simulate ingress traffic burst at 100 B/s for the first 2 sec and
+ // steady 50 B/s rate for 3 sec after that. Egress traffic rate was
+ // 10 B/s for the first 2 sec and 0 for the rest of the time.
+ // The first sample only establishes the baseline, so the six
+ // samples below yield five rate values per series.
+ collector.sample(now, createSample(0, 0, 0, 0, 0, 0, 0, 0));
+ collector.sample(now + Seconds(1), createSample(100, 10, 5, 1, 10, 1, 1, 1));
+ collector.sample(now + Seconds(2), createSample(200, 20, 10, 2, 20, 2, 2, 2));
+ collector.sample(now + Seconds(3), createSample(250, 25, 11, 3, 20, 2, 2, 2));
+ collector.sample(now + Seconds(4), createSample(300, 30, 12, 4, 20, 2, 2, 2));
+ collector.sample(now + Seconds(5), createSample(350, 35, 13, 5, 20, 2, 2, 2));
+
+ Option<Statistics<uint64_t>> rxRate = collector.rxRate();
+ ASSERT_SOME(rxRate);
+ EXPECT_EQ(5u, rxRate->count); // Number of statistics samples.
+ EXPECT_EQ(100u, rxRate->max); // Max seen byte rate.
+ EXPECT_EQ(50u, rxRate->min); // Min seen byte rate.
+ EXPECT_EQ(100u, rxRate->p90); // p90 is 4.5th sample here.
+ EXPECT_EQ(50u, rxRate->p50); // p50 is 2.5th sample here.
+
+ Option<Statistics<uint64_t>> rxPacketRate = collector.rxPacketRate();
+ ASSERT_SOME(rxPacketRate);
+ EXPECT_EQ(5u, rxPacketRate->count);
+ EXPECT_EQ(10u, rxPacketRate->max);
+ EXPECT_EQ(5u, rxPacketRate->min);
+ EXPECT_EQ(10u, rxPacketRate->p90);
+ EXPECT_EQ(5u, rxPacketRate->p50);
+
+ Option<Statistics<uint64_t>> rxDropRate = collector.rxDropRate();
+ ASSERT_SOME(rxDropRate);
+ EXPECT_EQ(5u, rxDropRate->count);
+ EXPECT_EQ(5u, rxDropRate->max);
+ EXPECT_EQ(1u, rxDropRate->min);
+ EXPECT_EQ(5u, rxDropRate->p90);
+ EXPECT_EQ(1u, rxDropRate->p50);
+
+ // RX error rate is constantly 1 here.
+ Option<Statistics<uint64_t>> rxErrorRate = collector.rxErrorRate();
+ ASSERT_SOME(rxErrorRate);
+ EXPECT_EQ(5u, rxErrorRate->count);
+ EXPECT_EQ(1u, rxErrorRate->max);
+ EXPECT_EQ(1u, rxErrorRate->min);
+ EXPECT_EQ(1u, rxErrorRate->p90);
+ EXPECT_EQ(1u, rxErrorRate->p50);
+
+ Option<Statistics<uint64_t>> txRate = collector.txRate();
+ ASSERT_SOME(txRate);
+ EXPECT_EQ(5u, txRate->count);
+ EXPECT_EQ(10u, txRate->max);
+ EXPECT_EQ(0u, txRate->min);
+ EXPECT_EQ(10u, txRate->p90);
+ EXPECT_EQ(0u, txRate->p50);
+
+ Option<Statistics<uint64_t>> txPacketRate = collector.txPacketRate();
+ ASSERT_SOME(txPacketRate);
+ EXPECT_EQ(5u, txPacketRate->count);
+ EXPECT_EQ(1u, txPacketRate->max);
+ EXPECT_EQ(0u, txPacketRate->min);
+ EXPECT_EQ(1u, txPacketRate->p90);
+ EXPECT_EQ(0u, txPacketRate->p50);
+
+ Option<Statistics<uint64_t>> txDropRate = collector.txDropRate();
+ ASSERT_SOME(txDropRate);
+ EXPECT_EQ(5u, txDropRate->count);
+ EXPECT_EQ(1u, txDropRate->max);
+ EXPECT_EQ(0u, txDropRate->min);
+ EXPECT_EQ(1u, txDropRate->p90);
+ EXPECT_EQ(0u, txDropRate->p50);
+
+ Option<Statistics<uint64_t>> txErrorRate = collector.txErrorRate();
+ ASSERT_SOME(txErrorRate);
+ EXPECT_EQ(5u, txErrorRate->count);
+ EXPECT_EQ(1u, txErrorRate->max);
+ EXPECT_EQ(0u, txErrorRate->min);
+ EXPECT_EQ(1u, txErrorRate->p90);
+ EXPECT_EQ(0u, txErrorRate->p50);
+
+ Clock::resume();
+}
+
+
static uint16_t roundUpToPow2(uint16_t x)
{
uint16_t r = 1 << static_cast<uint16_t>(std::log2(x));
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic