List:       mesos-commits
Subject:    [mesos] branch master updated: Fixed a bug preventing agent recovery when executor GC is interrupted
From:       asekretenko@apache.org
Date:       2021-04-10 17:39:32
Message-ID: 161807637220.7736.12216350695584418256@gitbox.apache.org

This is an automated email from the ASF dual-hosted git repository.

asekretenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git


The following commit(s) were added to refs/heads/master by this push:
     new f30f10d  Fixed a bug preventing agent recovery when executor GC is interrupted.
f30f10d is described below

commit f30f10d03e7291b60beedc9742f163d393f94d88
Author: Charles-Francois Natali <cf.natali@gmail.com>
AuthorDate: Sat Jan 30 18:41:37 2021 +0000

    Fixed a bug preventing agent recovery when executor GC is interrupted.
    
    If the agent is interrupted after garbage collecting the executor's
    latest run meta directory but before garbage collecting the top-level
    executor meta directory, the "latest" symlink is left dangling, which
    causes executor recovery to fail when the agent restarts.
    Instead, we can safely ignore a dangling "latest" symlink: it is
    always created after the latest run directory it points to, and is
    never deleted until the top-level executor meta directory itself is
    garbage collected.
---
 src/slave/state.cpp                |  14 +++--
 src/tests/slave_recovery_tests.cpp | 124 +++++++++++++++++++++++++++++++++++++
 2 files changed, 133 insertions(+), 5 deletions(-)
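
To make the failure mode concrete: below is a minimal standalone C++17
sketch of the GC ordering that leaves "latest" dangling. It uses
std::filesystem rather than Mesos's stout library, and the paths are
illustrative only, not the agent's actual meta directory layout.

    // Minimal illustration of the bug's precondition: GC removes the run
    // directory that "latest" points to, but the agent dies before GC
    // removes the executor meta directory containing the symlink itself.
    #include <filesystem>
    #include <iostream>
    #include <system_error>

    namespace fs = std::filesystem;

    int main()
    {
      // Illustrative layout only; not the agent's real meta directory tree.
      const fs::path meta = fs::temp_directory_path() / "executor_meta";
      const fs::path run = meta / "runs" / "run-1";
      const fs::path latest = meta / "runs" / "latest";

      fs::create_directories(run);
      fs::create_directory_symlink(run, latest); // "latest" -> run-1

      // GC step 1: remove the latest run directory.
      fs::remove_all(run);

      // ...agent dies here, before GC step 2 removes 'meta'...

      // On recovery, resolving "latest" now fails with ENOENT.
      std::error_code ec;
      fs::canonical(latest, ec);
      std::cout << "dangling: " << std::boolalpha
                << (ec == std::errc::no_such_file_or_directory) << std::endl;

      fs::remove_all(meta); // Clean up the sketch's scratch space.
      return 0;
    }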

diff --git a/src/slave/state.cpp b/src/slave/state.cpp
index 96f22b3..6d065d0 100644
--- a/src/slave/state.cpp
+++ b/src/slave/state.cpp
@@ -417,13 +417,17 @@ Try<ExecutorState> ExecutorState::recover(
   foreach (const string& path, runs.get()) {
     if (Path(path).basename() == paths::LATEST_SYMLINK) {
       const Result<string> latest = os::realpath(path);
-      if (!latest.isSome()) {
+      if (latest.isNone()) {
+        // This can happen if the slave died between garbage collecting the
+        // executor's latest run and garbage collecting the top-level
+        // executor meta directory, which contains the "latest" symlink.
+        LOG(WARNING) << "Dangling 'latest' run symlink of executor '"
+                     << executorId << "'";
+        continue;
+      } else if (latest.isError()) {
         return Error(
             "Failed to find latest run of executor '" +
-            executorId.value() + "': " +
-            (latest.isError()
-             ? latest.error()
-             : "No such file or directory"));
+            executorId.value() + "': " + latest.error());
       }
 
       // Store the ContainerID of the latest executor run.
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 1c5177d..3980114 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -3301,6 +3301,130 @@ TYPED_TEST(SlaveRecoveryTest, GCExecutor)
 }
 
 
+// While the slave is down, we remove the latest run directory but
+// not the "latest" symlink, simulating a slave that died in the
+// middle of gc'ing the executor's run meta directory.
+TYPED_TEST(SlaveRecoveryTest, ExecutorDanglingLatestSymlink)
+{
+  Try<Owned<cluster::Master>> master = this->StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = this->CreateSlaveFlags();
+  flags.strict = true;
+
+  Fetcher fetcher(flags);
+
+  Try<TypeParam*> _containerizer = TypeParam::create(flags, true, &fetcher);
+  ASSERT_SOME(_containerizer);
+  Owned<slave::Containerizer> containerizer(_containerizer.get());
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> slave =
+    this->StartSlave(detector.get(), containerizer.get(), flags);
+  ASSERT_SOME(slave);
+
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer>> offers1;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers1)).WillRepeatedly(Return());
+
+  driver.start();
+
+  AWAIT_READY(offers1);
+  ASSERT_FALSE(offers1->empty());
+
+  TaskInfo task = createTask(offers1.get()[0], SLEEP_COMMAND(1000));
+
+  // Capture the slave and framework ids.
+  SlaveID slaveId = offers1.get()[0].slave_id();
+  FrameworkID frameworkId = offers1.get()[0].framework_id();
+
+  Future<RegisterExecutorMessage> registerExecutor =
+    FUTURE_PROTOBUF(RegisterExecutorMessage(), _, _);
+
+  Future<Nothing> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureSatisfy(&status))
+    .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+  driver.launchTasks(offers1.get()[0].id(), {task});
+
+  // Capture the executor id.
+  AWAIT_READY(registerExecutor);
+  ExecutorID executorId = registerExecutor->executor_id();
+
+  // Wait for TASK_RUNNING update.
+  AWAIT_READY(status);
+
+  // Terminate the slave.
+  slave.get()->terminate();
+
+  // The "latest" symlink should exist.
+  const string latestPath = paths::getExecutorLatestRunPath(
+      paths::getMetaRootDir(flags.work_dir),
+      slaveId,
+      frameworkId,
+      executorId);
+  ASSERT_TRUE(os::exists(latestPath));
+  // And should point to the latest run.
+  const Result<string> path = os::realpath(latestPath);
+  ASSERT_SOME(path);
+  // Delete the target run directory: "latest" now dangles.
+  ASSERT_SOME(os::rmdir(path.get(), true));
+
+  // Recover the state.
+  Result<slave::state::State> recoverState =
+    slave::state::recover(paths::getMetaRootDir(flags.work_dir), true);
+
+  ASSERT_SOME(recoverState);
+  ASSERT_SOME(recoverState->slave);
+
+  // The executor should be recovered without any run.
+  slave::state::FrameworkState frameworkState =
+    recoverState->slave->frameworks.at(frameworkId);
+  ASSERT_EQ(1u, frameworkState.executors.size());
+  slave::state::ExecutorState& executorState =
+    frameworkState.executors.at(executorId);
+  ASSERT_NONE(executorState.latest);
+  ASSERT_TRUE(executorState.runs.empty());
+
+  Future<ReregisterSlaveMessage> reregisterSlaveMessage =
+    FUTURE_PROTOBUF(ReregisterSlaveMessage(), _, _);
+
+  slave = this->StartSlave(detector.get(), containerizer.get(), flags);
+  ASSERT_SOME(slave);
+
+  // The slave should re-register.
+  AWAIT_READY(reregisterSlaveMessage);
+
+  // Wait until the executor's work and meta directories get gc'ed.
+  Clock::pause();
+  Clock::advance(flags.gc_delay);
+  Clock::settle();
+
+  ASSERT_FALSE(os::exists(paths::getExecutorPath(
+      flags.work_dir, slaveId, frameworkId, executorId)));
+  ASSERT_FALSE(os::exists(paths::getExecutorPath(
+      paths::getMetaRootDir(flags.work_dir),
+      slaveId,
+      frameworkId,
+      executorId)));
+
+  driver.stop();
+  driver.join();
+}
+
+
 // The slave is asked to shutdown. When it comes back up, it should
 // reregister as the same agent.
 TYPED_TEST(SlaveRecoveryTest, ShutdownSlave)
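
For reference, the control flow introduced by the fix can be modeled in
a standalone C++17 sketch. The three-state RealpathResult and the
recoverLatest() helper below are hypothetical stand-ins for stout's
Result<std::string> and ExecutorState::recover(), not the actual API:

    // Standalone model of the patched recovery branch: a dangling
    // "latest" symlink (NONE) is skipped with a warning, a genuine
    // filesystem error (ERROR) still aborts recovery.
    #include <filesystem>
    #include <iostream>
    #include <string>
    #include <system_error>

    namespace fs = std::filesystem;

    enum class State { SOME, NONE, ERROR };

    struct RealpathResult   // Hypothetical stand-in for stout's
    {                       // Result<std::string>.
      State state;
      std::string value;    // Resolved path if SOME, error text if ERROR.
    };

    RealpathResult realpath(const fs::path& link)
    {
      std::error_code ec;
      const fs::path resolved = fs::canonical(link, ec);
      if (!ec) {
        return {State::SOME, resolved.string()};
      }
      if (ec == std::errc::no_such_file_or_directory) {
        return {State::NONE, ""}; // Dangling symlink: not a hard error.
      }
      return {State::ERROR, ec.message()};
    }

    // Mirrors the branch structure of the patched ExecutorState::recover().
    bool recoverLatest(const fs::path& link, std::string* containerId)
    {
      const RealpathResult latest = realpath(link);
      if (latest.state == State::NONE) {
        std::cerr << "Dangling 'latest' run symlink" << std::endl;
        return true;  // Skip it; recovery continues without a latest run.
      }
      if (latest.state == State::ERROR) {
        std::cerr << "Failed to find latest run: " << latest.value
                  << std::endl;
        return false; // A real error still fails recovery.
      }
      *containerId = fs::path(latest.value).filename().string();
      return true;
    }

    int main()
    {
      std::string containerId;
      // A nonexistent link resolves to NONE, so recovery proceeds.
      std::cout << std::boolalpha
                << recoverLatest("/tmp/no_such_dir/latest", &containerId)
                << std::endl;
      return 0;
    }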