[prev in list] [next in list] [prev in thread] [next in thread]
List: hadoop-commits
Subject: hadoop git commit: YARN-7822. Constraint satisfaction checker support for composite OR and AND const
From: asuresh () apache ! org
Date: 2018-01-31 16:12:23
Message-ID: 455bd0ac399b4099854f76685c3a8442 () git ! apache ! org
[Download RAW message or body]
Repository: hadoop
Updated Branches:
refs/heads/trunk 8d1e2c640 -> d48134478
YARN-7822. Constraint satisfaction checker support for composite OR and AND \
constraints. (Weiwei Yang via asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d4813447
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d4813447
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d4813447
Branch: refs/heads/trunk
Commit: d4813447831770446399f2d6501860141551ff33
Parents: 8d1e2c6
Author: Arun Suresh <asuresh@apache.org>
Authored: Tue Jan 30 10:15:33 2018 -0800
Committer: Arun Suresh <asuresh@apache.org>
Committed: Wed Jan 31 01:51:08 2018 -0800
----------------------------------------------------------------------
.../TestPlacementConstraintTransformations.java | 2 +-
.../constraint/PlacementConstraintsUtil.java | 53 +++-
.../TestPlacementConstraintsUtil.java | 278 ++++++++++++++++---
.../constraint/TestPlacementProcessor.java | 159 ++++++++++-
4 files changed, 444 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4813447/hadoop-yarn-project/hadoop \
-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintTransformations.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintTransformations.java \
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintTransformations.java
index 62da092..aa92d7a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintTransformations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintTransformations.java
@@ -156,7 +156,7 @@ public class TestPlacementConstraintTransformations {
SingleConstraintTransformer singleTransformer =
new SingleConstraintTransformer(specConstraint);
PlacementConstraint simConstraint = singleTransformer.transform();
- Assert.assertTrue(constraintExpr instanceof Or);
+ Assert.assertTrue(simConstraint.getConstraintExpr() instanceof Or);
Or simOrExpr = (Or) specConstraint.getConstraintExpr();
for (AbstractConstraint child : simOrExpr.getChildren()) {
Assert.assertTrue(child instanceof SingleConstraint);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4813447/hadoop-yarn-project/hadoop \
-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/h \
adoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-res \
ourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java \
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanage \
r/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
index 199dd62..6396e57 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcema \
nager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcema \
nager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint.Or;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.SingleConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression.TargetType;
@@ -149,6 +151,48 @@ public final class PlacementConstraintsUtil {
return true;
}
+ /**
+ * Returns true if all child constraints are satisfied.
+ * @param appId application id
+ * @param constraint Or constraint
+ * @param node node
+ * @param atm allocation tags manager
+ * @return true if all child constraints are satisfied, false otherwise
+ * @throws InvalidAllocationTagsQueryException
+ */
+ private static boolean canSatisfyAndConstraint(ApplicationId appId,
+ And constraint, SchedulerNode node, AllocationTagsManager atm)
+ throws InvalidAllocationTagsQueryException {
+ // Iterate over the constraints tree, if found any child constraint
+ // isn't satisfied, return false.
+ for (AbstractConstraint child : constraint.getChildren()) {
+ if(!canSatisfyConstraints(appId, child.build(), node, atm)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Returns true as long as any of child constraint is satisfied.
+ * @param appId application id
+ * @param constraint Or constraint
+ * @param node node
+ * @param atm allocation tags manager
+ * @return true if any child constraint is satisfied, false otherwise
+ * @throws InvalidAllocationTagsQueryException
+ */
+ private static boolean canSatisfyOrConstraint(ApplicationId appId,
+ Or constraint, SchedulerNode node, AllocationTagsManager atm)
+ throws InvalidAllocationTagsQueryException {
+ for (AbstractConstraint child : constraint.getChildren()) {
+ if (canSatisfyConstraints(appId, child.build(), node, atm)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
private static boolean canSatisfyConstraints(ApplicationId appId,
PlacementConstraint constraint, SchedulerNode node,
AllocationTagsManager atm)
@@ -167,9 +211,16 @@ public final class PlacementConstraintsUtil {
if (sConstraintExpr instanceof SingleConstraint) {
SingleConstraint single = (SingleConstraint) sConstraintExpr;
return canSatisfySingleConstraint(appId, single, node, atm);
+ } else if (sConstraintExpr instanceof And) {
+ And and = (And) sConstraintExpr;
+ return canSatisfyAndConstraint(appId, and, node, atm);
+ } else if (sConstraintExpr instanceof Or) {
+ Or or = (Or) sConstraintExpr;
+ return canSatisfyOrConstraint(appId, or, node, atm);
} else {
throw new InvalidAllocationTagsQueryException(
- "Unsupported type of constraint.");
+ "Unsupported type of constraint: "
+ + sConstraintExpr.getClass().getSimpleName());
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4813447/hadoop-yarn-project/hadoop \
-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/h \
adoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-res \
ourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java \
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanage \
r/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java
index a5460c2..5135f63 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcema \
nager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcema \
nager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java
@@ -21,7 +21,12 @@ import static \
org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE; import static \
org.apache.hadoop.yarn.api.resource.PlacementConstraints.RACK; import static \
org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn; import static \
org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn; +import static \
org.apache.hadoop.yarn.api.resource.PlacementConstraints.maxCardinality; +import \
static org.apache.hadoop.yarn.api.resource.PlacementConstraints.and; +import static \
org.apache.hadoop.yarn.api.resource.PlacementConstraints.or; import static \
org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.util.AbstractMap;
import java.util.Arrays;
@@ -34,6 +39,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -44,12 +50,11 @@ import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert;
import org.junit.Before;
@@ -66,9 +71,10 @@ public class TestPlacementConstraintsUtil {
private RMContext rmContext;
private static final int GB = 1024;
private ApplicationId appId1;
- private PlacementConstraint c1, c2, c3, c4;
+ private PlacementConstraint c1, c2, c3, c4, c5, c6, c7;
private Set<String> sourceTag1, sourceTag2;
- private Map<Set<String>, PlacementConstraint> constraintMap1, constraintMap2;
+ private Map<Set<String>, PlacementConstraint> constraintMap1,
+ constraintMap2, constraintMap3, constraintMap4;
private AtomicLong requestID = new AtomicLong(0);
@Before
@@ -92,6 +98,16 @@ public class TestPlacementConstraintsUtil {
.build(targetNotIn(NODE, allocationTag("hbase-m")));
c4 = PlacementConstraints
.build(targetNotIn(RACK, allocationTag("hbase-rs")));
+ c5 = PlacementConstraints
+ .build(and(targetNotIn(NODE, allocationTag("hbase-m")),
+ maxCardinality(NODE, 3, "spark")));
+ c6 = PlacementConstraints
+ .build(or(targetIn(NODE, allocationTag("hbase-m")),
+ targetIn(NODE, allocationTag("hbase-rs"))));
+ c7 = PlacementConstraints
+ .build(or(targetIn(NODE, allocationTag("hbase-m")),
+ and(targetIn(NODE, allocationTag("hbase-rs")),
+ targetIn(NODE, allocationTag("spark")))));
sourceTag1 = new HashSet<>(Arrays.asList("spark"));
sourceTag2 = new HashSet<>(Arrays.asList("zk"));
@@ -106,6 +122,15 @@ public class TestPlacementConstraintsUtil {
new AbstractMap.SimpleEntry<>(sourceTag2, c4))
.collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey,
AbstractMap.SimpleEntry::getValue));
+ constraintMap3 = Stream
+ .of(new AbstractMap.SimpleEntry<>(sourceTag1, c5))
+ .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey,
+ AbstractMap.SimpleEntry::getValue));
+ constraintMap4 = Stream
+ .of(new AbstractMap.SimpleEntry<>(sourceTag1, c6),
+ new AbstractMap.SimpleEntry<>(sourceTag2, c7))
+ .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey,
+ AbstractMap.SimpleEntry::getValue));
}
private SchedulingRequest createSchedulingRequest(Set<String> allocationTags,
@@ -124,6 +149,20 @@ public class TestPlacementConstraintsUtil {
return createSchedulingRequest(allocationTags, null);
}
+ private ContainerId newContainerId(ApplicationId appId) {
+ return ContainerId.newContainerId(
+ ApplicationAttemptId.newInstance(appId, 0), 0);
+ }
+
+ private SchedulerNode newSchedulerNode(String hostname, String rackName,
+ NodeId nodeId) {
+ SchedulerNode node = mock(SchedulerNode.class);
+ when(node.getNodeName()).thenReturn(hostname);
+ when(node.getRackName()).thenReturn(rackName);
+ when(node.getNodeID()).thenReturn(nodeId);
+ return node;
+ }
+
@Test
public void testNodeAffinityAssignment()
throws InvalidAllocationTagsQueryException {
@@ -137,8 +176,9 @@ public class TestPlacementConstraintsUtil {
Iterator<RMNode> nodeIterator = rmNodes.iterator();
while (nodeIterator.hasNext()) {
RMNode currentNode = nodeIterator.next();
- FiCaSchedulerNode schedulerNode = TestUtils.getMockNode(
- currentNode.getHostName(), currentNode.getRackName(), 123, 4 * GB);
+ SchedulerNode schedulerNode =newSchedulerNode(currentNode.getHostName(),
+ currentNode.getRackName(), currentNode.getNodeID());
+
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
@@ -153,14 +193,15 @@ public class TestPlacementConstraintsUtil {
RMNode n1_r1 = rmNodes.get(1);
RMNode n2_r2 = rmNodes.get(2);
RMNode n3_r2 = rmNodes.get(3);
- FiCaSchedulerNode schedulerNode0 = TestUtils
- .getMockNode(n0_r1.getHostName(), n0_r1.getRackName(), 123, 4 * GB);
- FiCaSchedulerNode schedulerNode1 = TestUtils
- .getMockNode(n1_r1.getHostName(), n1_r1.getRackName(), 123, 4 * GB);
- FiCaSchedulerNode schedulerNode2 = TestUtils
- .getMockNode(n2_r2.getHostName(), n2_r2.getRackName(), 123, 4 * GB);
- FiCaSchedulerNode schedulerNode3 = TestUtils
- .getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB);
+ SchedulerNode schedulerNode0 =newSchedulerNode(n0_r1.getHostName(),
+ n0_r1.getRackName(), n0_r1.getNodeID());
+ SchedulerNode schedulerNode1 =newSchedulerNode(n1_r1.getHostName(),
+ n1_r1.getRackName(), n1_r1.getNodeID());
+ SchedulerNode schedulerNode2 =newSchedulerNode(n2_r2.getHostName(),
+ n2_r2.getRackName(), n2_r2.getNodeID());
+ SchedulerNode schedulerNode3 =newSchedulerNode(n3_r2.getHostName(),
+ n3_r2.getRackName(), n3_r2.getNodeID());
+
// 1 Containers on node 0 with allocationTag 'hbase-m'
ContainerId hbase_m = ContainerId
.newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
@@ -200,14 +241,15 @@ public class TestPlacementConstraintsUtil {
.newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-rs"));
- FiCaSchedulerNode schedulerNode0 = TestUtils
- .getMockNode(n0_r1.getHostName(), n0_r1.getRackName(), 123, 4 * GB);
- FiCaSchedulerNode schedulerNode1 = TestUtils
- .getMockNode(n1_r1.getHostName(), n1_r1.getRackName(), 123, 4 * GB);
- FiCaSchedulerNode schedulerNode2 = TestUtils
- .getMockNode(n2_r2.getHostName(), n2_r2.getRackName(), 123, 4 * GB);
- FiCaSchedulerNode schedulerNode3 = TestUtils
- .getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB);
+ SchedulerNode schedulerNode0 =newSchedulerNode(n0_r1.getHostName(),
+ n0_r1.getRackName(), n0_r1.getNodeID());
+ SchedulerNode schedulerNode1 =newSchedulerNode(n1_r1.getHostName(),
+ n1_r1.getRackName(), n1_r1.getNodeID());
+ SchedulerNode schedulerNode2 =newSchedulerNode(n2_r2.getHostName(),
+ n2_r2.getRackName(), n2_r2.getNodeID());
+ SchedulerNode schedulerNode3 =newSchedulerNode(n3_r2.getHostName(),
+ n3_r2.getRackName(), n3_r2.getNodeID());
+
// 'zk' placement on Rack1 should now SUCCEED
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag2), schedulerNode0, pcm, tm));
@@ -238,14 +280,16 @@ public class TestPlacementConstraintsUtil {
RMNode n1_r1 = rmNodes.get(1);
RMNode n2_r2 = rmNodes.get(2);
RMNode n3_r2 = rmNodes.get(3);
- FiCaSchedulerNode schedulerNode0 = TestUtils
- .getMockNode(n0_r1.getHostName(), n0_r1.getRackName(), 123, 4 * GB);
- FiCaSchedulerNode schedulerNode1 = TestUtils
- .getMockNode(n1_r1.getHostName(), n1_r1.getRackName(), 123, 4 * GB);
- FiCaSchedulerNode schedulerNode2 = TestUtils
- .getMockNode(n2_r2.getHostName(), n2_r2.getRackName(), 123, 4 * GB);
- FiCaSchedulerNode schedulerNode3 = TestUtils
- .getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB);
+
+ SchedulerNode schedulerNode0 =newSchedulerNode(n0_r1.getHostName(),
+ n0_r1.getRackName(), n0_r1.getNodeID());
+ SchedulerNode schedulerNode1 =newSchedulerNode(n1_r1.getHostName(),
+ n1_r1.getRackName(), n1_r1.getNodeID());
+ SchedulerNode schedulerNode2 =newSchedulerNode(n2_r2.getHostName(),
+ n2_r2.getRackName(), n2_r2.getNodeID());
+ SchedulerNode schedulerNode3 =newSchedulerNode(n3_r2.getHostName(),
+ n3_r2.getRackName(), n3_r2.getNodeID());
+
// 1 Containers on node 0 with allocationTag 'hbase-m'
ContainerId hbase_m = ContainerId
.newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
@@ -285,14 +329,14 @@ public class TestPlacementConstraintsUtil {
.newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-rs"));
- FiCaSchedulerNode schedulerNode0 = TestUtils
- .getMockNode(n0_r1.getHostName(), n0_r1.getRackName(), 123, 4 * GB);
- FiCaSchedulerNode schedulerNode1 = TestUtils
- .getMockNode(n1_r1.getHostName(), n1_r1.getRackName(), 123, 4 * GB);
- FiCaSchedulerNode schedulerNode2 = TestUtils
- .getMockNode(n2_r2.getHostName(), n2_r2.getRackName(), 123, 4 * GB);
- FiCaSchedulerNode schedulerNode3 = TestUtils
- .getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB);
+ SchedulerNode schedulerNode0 =newSchedulerNode(n0_r1.getHostName(),
+ n0_r1.getRackName(), n0_r1.getNodeID());
+ SchedulerNode schedulerNode1 =newSchedulerNode(n1_r1.getHostName(),
+ n1_r1.getRackName(), n1_r1.getNodeID());
+ SchedulerNode schedulerNode2 =newSchedulerNode(n2_r2.getHostName(),
+ n2_r2.getRackName(), n2_r2.getNodeID());
+ SchedulerNode schedulerNode3 =newSchedulerNode(n3_r2.getHostName(),
+ n3_r2.getRackName(), n3_r2.getNodeID());
// 'zk' placement on Rack1 should FAIL
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
@@ -306,4 +350,162 @@ public class TestPlacementConstraintsUtil {
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag2), schedulerNode3, pcm, tm));
}
+
+ @Test
+ public void testORConstraintAssignment()
+ throws InvalidAllocationTagsQueryException {
+ AllocationTagsManager tm = new AllocationTagsManager(rmContext);
+ PlacementConstraintManagerService pcm =
+ new MemoryPlacementConstraintManager();
+ // Register App1 with anti-affinity constraint map.
+ pcm.registerApplication(appId1, constraintMap4);
+ RMNode n0r1 = rmNodes.get(0);
+ RMNode n1r1 = rmNodes.get(1);
+ RMNode n2r2 = rmNodes.get(2);
+ RMNode n3r2 = rmNodes.get(3);
+
+ /**
+ * Place container:
+ * n0: hbase-m(1)
+ * n1: ""
+ * n2: hbase-rs(1)
+ * n3: ""
+ */
+ tm.addContainer(n0r1.getNodeID(),
+ newContainerId(appId1), ImmutableSet.of("hbase-m"));
+ tm.addContainer(n2r2.getNodeID(),
+ newContainerId(appId1), ImmutableSet.of("hbase-rs"));
+ Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n0r1.getNodeID())
+ .get("hbase-m").longValue());
+ Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n2r2.getNodeID())
+ .get("hbase-rs").longValue());
+
+ SchedulerNode schedulerNode0 =newSchedulerNode(n0r1.getHostName(),
+ n0r1.getRackName(), n0r1.getNodeID());
+ SchedulerNode schedulerNode1 =newSchedulerNode(n1r1.getHostName(),
+ n1r1.getRackName(), n1r1.getNodeID());
+ SchedulerNode schedulerNode2 =newSchedulerNode(n2r2.getHostName(),
+ n2r2.getRackName(), n2r2.getNodeID());
+ SchedulerNode schedulerNode3 =newSchedulerNode(n3r2.getHostName(),
+ n3r2.getRackName(), n3r2.getNodeID());
+
+ // n0 and n2 should be qualified for allocation as
+ // they either have hbase-m or hbase-rs tag
+ Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ createSchedulingRequest(sourceTag1), schedulerNode0, pcm, tm));
+ Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ createSchedulingRequest(sourceTag1), schedulerNode1, pcm, tm));
+ Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ createSchedulingRequest(sourceTag1), schedulerNode2, pcm, tm));
+ Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm));
+
+ /**
+ * Place container:
+ * n0: hbase-m(1)
+ * n1: ""
+ * n2: hbase-rs(1)
+ * n3: hbase-rs(1)
+ */
+ tm.addContainer(n3r2.getNodeID(),
+ newContainerId(appId1), ImmutableSet.of("hbase-rs"));
+ // n3 is qualified now because it is allocated with hbase-rs tag
+ Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm));
+
+ /**
+ * Place container:
+ * n0: hbase-m(1)
+ * n1: ""
+ * n2: hbase-rs(1), spark(1)
+ * n3: hbase-rs(1)
+ */
+ // Place
+ tm.addContainer(n2r2.getNodeID(),
+ newContainerId(appId1), ImmutableSet.of("spark"));
+ // According to constraint, "zk" is allowed to be placed on a node
+ // has "hbase-m" tag OR a node has both "hbase-rs" and "spark" tags.
+ Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ createSchedulingRequest(sourceTag2), schedulerNode0, pcm, tm));
+ Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ createSchedulingRequest(sourceTag2), schedulerNode1, pcm, tm));
+ Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ createSchedulingRequest(sourceTag2), schedulerNode2, pcm, tm));
+ Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ createSchedulingRequest(sourceTag2), schedulerNode3, pcm, tm));
+ }
+
+ @Test
+ public void testANDConstraintAssignment()
+ throws InvalidAllocationTagsQueryException {
+ AllocationTagsManager tm = new AllocationTagsManager(rmContext);
+ PlacementConstraintManagerService pcm =
+ new MemoryPlacementConstraintManager();
+ // Register App1 with anti-affinity constraint map.
+ pcm.registerApplication(appId1, constraintMap3);
+ RMNode n0r1 = rmNodes.get(0);
+ RMNode n1r1 = rmNodes.get(1);
+ RMNode n2r2 = rmNodes.get(2);
+ RMNode n3r2 = rmNodes.get(3);
+
+ /**
+ * Place container:
+ * n0: hbase-m(1)
+ * n1: ""
+ * n2: hbase-m(1)
+ * n3: ""
+ */
+ tm.addContainer(n0r1.getNodeID(),
+ newContainerId(appId1), ImmutableSet.of("hbase-m"));
+ tm.addContainer(n2r2.getNodeID(),
+ newContainerId(appId1), ImmutableSet.of("hbase-m"));
+ Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n0r1.getNodeID())
+ .get("hbase-m").longValue());
+ Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n2r2.getNodeID())
+ .get("hbase-m").longValue());
+
+ SchedulerNode schedulerNode0 =newSchedulerNode(n0r1.getHostName(),
+ n0r1.getRackName(), n0r1.getNodeID());
+ SchedulerNode schedulerNode1 =newSchedulerNode(n1r1.getHostName(),
+ n1r1.getRackName(), n1r1.getNodeID());
+ SchedulerNode schedulerNode2 =newSchedulerNode(n2r2.getHostName(),
+ n2r2.getRackName(), n2r2.getNodeID());
+ SchedulerNode schedulerNode3 =newSchedulerNode(n3r2.getHostName(),
+ n3r2.getRackName(), n3r2.getNodeID());
+
+ // Anti-affinity with hbase-m so it should not be able to be placed
+ // onto n0 and n2 as they already have hbase-m allocated.
+ Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ createSchedulingRequest(sourceTag1), schedulerNode0, pcm, tm));
+ Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ createSchedulingRequest(sourceTag1), schedulerNode1, pcm, tm));
+ Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ createSchedulingRequest(sourceTag1), schedulerNode2, pcm, tm));
+ Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm));
+
+ /**
+ * Place container:
+ * n0: hbase-m(1)
+ * n1: spark(3)
+ * n2: hbase-m(1)
+ * n3: ""
+ */
+ for (int i=0; i<4; i++) {
+ tm.addContainer(n1r1.getNodeID(),
+ newContainerId(appId1), ImmutableSet.of("spark"));
+ }
+ Assert.assertEquals(4L, tm.getAllocationTagsWithCount(n1r1.getNodeID())
+ .get("spark").longValue());
+
+ // Violate cardinality constraint
+ Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ createSchedulingRequest(sourceTag1), schedulerNode0, pcm, tm));
+ Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ createSchedulingRequest(sourceTag1), schedulerNode1, pcm, tm));
+ Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ createSchedulingRequest(sourceTag1), schedulerNode2, pcm, tm));
+ Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm));
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4813447/hadoop-yarn-project/hadoop \
-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/h \
adoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-res \
ourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java \
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanage \
r/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
index 698c17b..a530230 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcema \
nager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcema \
nager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
@@ -60,6 +60,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import static java.lang.Thread.sleep;
+import static org.apache.hadoop.yarn.api.records.RejectionReason.COULD_NOT_PLACE_ON_NODE;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetCardinality;
@@ -142,7 +143,8 @@ public class TestPlacementProcessor {
allocatedContainers.addAll(allocResponse.getAllocatedContainers());
// kick the scheduler
- waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 4);
+ waitForContainerAllocation(nodes.values(), am1,
+ allocatedContainers, new ArrayList<>(), 4);
Assert.assertEquals(4, allocatedContainers.size());
Set<NodeId> nodeIds = allocatedContainers.stream().map(x -> x.getNodeId())
@@ -195,7 +197,8 @@ public class TestPlacementProcessor {
allocatedContainers.addAll(allocResponse.getAllocatedContainers());
// kick the scheduler
- waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 5);
+ waitForContainerAllocation(nodes.values(), am1,
+ allocatedContainers, new ArrayList<>(), 5);
Assert.assertEquals(5, allocatedContainers.size());
Set<NodeId> nodeIds = allocatedContainers.stream().map(x -> x.getNodeId())
@@ -244,7 +247,8 @@ public class TestPlacementProcessor {
allocatedContainers.addAll(allocResponse.getAllocatedContainers());
// kick the scheduler
- waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 8);
+ waitForContainerAllocation(nodes.values(), am1,
+ allocatedContainers, new ArrayList<>(), 8);
Assert.assertEquals(8, allocatedContainers.size());
Map<NodeId, Long> nodeIdContainerIdMap =
@@ -294,7 +298,8 @@ public class TestPlacementProcessor {
allocatedContainers.addAll(allocResponse.getAllocatedContainers());
// kick the scheduler
- waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 5);
+ waitForContainerAllocation(nodes.values(), am1,
+ allocatedContainers, new ArrayList<>(), 5);
Assert.assertEquals(5, allocatedContainers.size());
Set<NodeId> nodeIds = allocatedContainers.stream().map(x -> x.getNodeId())
@@ -347,7 +352,8 @@ public class TestPlacementProcessor {
allocatedContainers.addAll(allocResponse.getAllocatedContainers());
// kick the scheduler
- waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 6);
+ waitForContainerAllocation(nodes.values(), am1,
+ allocatedContainers, new ArrayList<>(), 6);
Assert.assertEquals(6, allocatedContainers.size());
Map<NodeId, Long> nodeIdContainerIdMap =
@@ -584,7 +590,7 @@ public class TestPlacementProcessor {
// Ensure unique nodes
Assert.assertEquals(4, nodeIds.size());
RejectedSchedulingRequest rej = rejectedReqs.get(0);
- Assert.assertEquals(RejectionReason.COULD_NOT_PLACE_ON_NODE,
+ Assert.assertEquals(COULD_NOT_PLACE_ON_NODE,
rej.getReason());
QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
@@ -592,9 +598,145 @@ public class TestPlacementProcessor {
verifyMetrics(metrics, 11264, 11, 5120, 5, 5);
}
+ @Test(timeout = 300000)
+ public void testAndOrPlacement() throws Exception {
+ HashMap<NodeId, MockNM> nodes = new HashMap<>();
+ MockNM nm1 = new MockNM("h1:1234", 40960, 100,
+ rm.getResourceTrackerService());
+ nodes.put(nm1.getNodeId(), nm1);
+ MockNM nm2 = new MockNM("h2:1234", 40960, 100,
+ rm.getResourceTrackerService());
+ nodes.put(nm2.getNodeId(), nm2);
+ MockNM nm3 = new MockNM("h3:1234", 40960, 100,
+ rm.getResourceTrackerService());
+ nodes.put(nm3.getNodeId(), nm3);
+ MockNM nm4 = new MockNM("h4:1234", 40960, 100,
+ rm.getResourceTrackerService());
+ nodes.put(nm4.getNodeId(), nm4);
+ nm1.registerNode();
+ nm2.registerNode();
+ nm3.registerNode();
+ nm4.registerNode();
+
+ RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+
+ // Register app1 with following constraints
+ // 1) foo anti-affinity with foo on node
+ // 2) bar anti-affinity with foo on node AND maxCardinality = 2
+ // 3) moo affinity with foo OR bar
+ Map<Set<String>, PlacementConstraint> app1Constraints = new HashMap<>();
+ app1Constraints.put(Collections.singleton("foo"),
+ PlacementConstraints.build(
+ PlacementConstraints.targetNotIn(NODE, allocationTag("foo"))));
+ app1Constraints.put(Collections.singleton("bar"),
+ PlacementConstraints.build(
+ PlacementConstraints.and(
+ PlacementConstraints.targetNotIn(NODE, allocationTag("foo")),
+ PlacementConstraints.maxCardinality(NODE, 2, "bar"))));
+ app1Constraints.put(Collections.singleton("moo"),
+ PlacementConstraints.build(
+ PlacementConstraints.or(
+ PlacementConstraints.targetIn(NODE, allocationTag("foo")),
+ PlacementConstraints.targetIn(NODE, allocationTag("bar")))));
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2, app1Constraints);
+
+ // Allocates 3 foo containers on 3 different nodes,
+ // in anti-affinity fashion.
+ am1.addSchedulingRequest(
+ Arrays.asList(
+ schedulingRequest(1, 1, 1, 512, "foo"),
+ schedulingRequest(1, 2, 1, 512, "foo"),
+ schedulingRequest(1, 3, 1, 512, "foo")
+ ));
+ List<Container> allocatedContainers = new ArrayList<>();
+ waitForContainerAllocation(nodes.values(), am1,
+ allocatedContainers, new ArrayList<>(), 3);
+ printTags(nodes.values(), rm.getRMContext().getAllocationTagsManager());
+ Assert.assertEquals(3, allocatedContainers.size());
+
+ /** Testing AND placement constraint**/
+ // Now allocates a bar container, as restricted by the AND constraint,
+ // bar could be only allocated to the node without foo
+ am1.addSchedulingRequest(
+ Arrays.asList(
+ schedulingRequest(1, 1, 1, 512, "bar")
+ ));
+ allocatedContainers.clear();
+ waitForContainerAllocation(nodes.values(), am1,
+ allocatedContainers, new ArrayList<>(), 1);
+ printTags(nodes.values(), rm.getRMContext().getAllocationTagsManager());
+ Assert.assertEquals(1, allocatedContainers.size());
+ NodeId barNode = allocatedContainers.get(0).getNodeId();
+
+ // Sends another 3 bar request, 2 of them can be allocated
+ // as maxCardinality is 2, for placed containers, they should be all
+ // on the node where the last bar was placed.
+ allocatedContainers.clear();
+ List<RejectedSchedulingRequest> rejectedContainers = new ArrayList<>();
+ am1.addSchedulingRequest(
+ Arrays.asList(
+ schedulingRequest(1, 2, 1, 512, "bar"),
+ schedulingRequest(1, 3, 1, 512, "bar"),
+ schedulingRequest(1, 4, 1, 512, "bar")
+ ));
+ waitForContainerAllocation(nodes.values(), am1,
+ allocatedContainers, rejectedContainers, 2);
+ printTags(nodes.values(), rm.getRMContext().getAllocationTagsManager());
+ Assert.assertEquals(2, allocatedContainers.size());
+ Assert.assertTrue(allocatedContainers.stream().allMatch(
+ container -> container.getNodeId().equals(barNode)));
+
+ // The third request could not be satisfied because it violates
+ // the cardinality constraint. Validate rejected request correctly
+ // capture this.
+ Assert.assertEquals(1, rejectedContainers.size());
+ Assert.assertEquals(COULD_NOT_PLACE_ON_NODE,
+ rejectedContainers.get(0).getReason());
+
+ /** Testing OR placement constraint**/
+ // Register one more NM for testing
+ MockNM nm5 = new MockNM("h5:1234", 4096, 100,
+ rm.getResourceTrackerService());
+ nodes.put(nm5.getNodeId(), nm5);
+ nm5.registerNode();
+ nm5.nodeHeartbeat(true);
+
+ List<SchedulingRequest> mooRequests = new ArrayList<>();
+ for (int i=5; i<25; i++) {
+ mooRequests.add(schedulingRequest(1, i, 1, 100, "moo"));
+ }
+ am1.addSchedulingRequest(mooRequests);
+ allocatedContainers.clear();
+ waitForContainerAllocation(nodes.values(), am1,
+ allocatedContainers, new ArrayList<>(), 20);
+
+ // All 20 containers should be allocated onto nodes besides nm5,
+ // because moo affinity to foo or bar which only exists on rest of nodes.
+ Assert.assertEquals(20, allocatedContainers.size());
+ for (Container mooContainer : allocatedContainers) {
+ // nm5 has no moo allocated containers.
+ Assert.assertFalse(mooContainer.getNodeId().equals(nm5.getNodeId()));
+ }
+ }
+
+ private static void printTags(Collection<MockNM> nodes,
+ AllocationTagsManager atm){
+ for (MockNM nm : nodes) {
+ Map<String, Long> nmTags = atm
+ .getAllocationTagsWithCount(nm.getNodeId());
+ StringBuffer sb = new StringBuffer();
+ if (nmTags != null) {
+ nmTags.forEach((tag, count) ->
+ sb.append(tag + "(" + count + "),"));
+ LOG.info("nm_" + nm.getNodeId() + ": " + sb.toString());
+ }
+ }
+ }
+
private static void waitForContainerAllocation(Collection<MockNM> nodes,
- MockAM am, List<Container> allocatedContainers, int containerNum)
- throws Exception {
+ MockAM am, List<Container> allocatedContainers,
+ List<RejectedSchedulingRequest> rejectedRequests,
+ int containerNum) throws Exception {
int attemptCount = 10;
while (allocatedContainers.size() < containerNum && attemptCount > 0) {
for (MockNM node : nodes) {
@@ -605,6 +747,7 @@ public class TestPlacementProcessor {
sleep(1000);
AllocateResponse allocResponse = am.schedule();
allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+ rejectedRequests.addAll(allocResponse.getRejectedSchedulingRequests());
attemptCount--;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic