[prev in list] [next in list] [prev in thread] [next in thread]
List: flume-commits
Subject: flume git commit: Fix HttpSink bad response handling
From: bessbd () apache ! org
Date: 2017-07-12 18:58:09
Message-ID: 7e816a776218445489289d457914495e () git ! apache ! org
[Download RAW message or body]
Repository: flume
Updated Branches:
refs/heads/trunk 964bcf56a -> c570a51b3
Fix HttpSink bad response handling
After a bad response, connection.getInputStream() returns null.
This patch adds a check for this.
This closes #139
Reviewers: Bessenyei Balázs Donát
(filippovmn via Bessenyei Balázs Donát)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/c570a51b
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/c570a51b
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/c570a51b
Branch: refs/heads/trunk
Commit: c570a51b3c53e4899d16dd623e19a0d939518dd2
Parents: 964bcf5
Author: filippovmn <mfil_86@mail.ru>
Authored: Wed Jul 12 18:51:26 2017 +0000
Committer: Bessenyei Balázs Donát <bessbd@apache.org>
Committed: Wed Jul 12 18:55:20 2017 +0000
----------------------------------------------------------------------
.../org/apache/flume/sink/http/HttpSink.java | 7 ++++-
.../apache/flume/sink/http/TestHttpSink.java | 17 +++++++++++
.../apache/flume/sink/http/TestHttpSinkIT.java | 32 ++++++++++++++++++++
3 files changed, 55 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/c570a51b/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java \
b/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java
index 9637326..b9c42ed 100644
--- a/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java
+++ b/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java
@@ -218,7 +218,12 @@ public class HttpSink extends AbstractSink implements Configurable {
int httpStatusCode = connection.getResponseCode();
LOG.debug("Got status code : " + httpStatusCode);
- connection.getInputStream().close();
+ if (httpStatusCode < HttpURLConnection.HTTP_BAD_REQUEST) {
+ connection.getInputStream().close();
+ } else {
+ LOG.debug("bad request");
+ connection.getErrorStream().close();
+ }
LOG.debug("Response processed and closed");
if (httpStatusCode >= HTTP_STATUS_CONTINUE) {
http://git-wip-us.apache.org/repos/asf/flume/blob/c570a51b/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java \
b/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java
index 16cb6e8..bee089c 100644
--- a/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java
+++ b/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java
@@ -216,6 +216,22 @@ public class TestHttpSink {
}
@Test
+ public void ensureSingleErrorStatusConfigurationCorrectlyUsed() throws Exception {
+ when(channel.take()).thenReturn(event);
+ when(event.getBody()).thenReturn("something".getBytes());
+
+ Context context = new Context();
+ context.put("defaultRollback", "true");
+ context.put("defaultBackoff", "true");
+ context.put("defaultIncrementMetrics", "false");
+ context.put("rollback.401", "false");
+ context.put("backoff.401", "false");
+ context.put("incrementMetrics.401", "false");
+
+ executeWithMocks(true, Status.READY, false, true, context, HttpURLConnection.HTTP_UNAUTHORIZED);
+ }
+
+ @Test
public void ensureGroupConfigurationCorrectlyUsed() throws Exception {
when(channel.take()).thenReturn(event);
when(event.getBody()).thenReturn("something".getBytes());
@@ -278,6 +294,7 @@ public class TestHttpSink {
when(channel.getTransaction()).thenReturn(transaction);
when(httpURLConnection.getOutputStream()).thenReturn(outputStream);
when(httpURLConnection.getInputStream()).thenReturn(inputStream);
+ when(httpURLConnection.getErrorStream()).thenReturn(inputStream);
when(httpURLConnection.getResponseCode()).thenReturn(httpStatus);
Status actualStatus = httpSink.process();
http://git-wip-us.apache.org/repos/asf/flume/blob/c570a51b/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSinkIT.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSinkIT.java \
b/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSinkIT.java
index 74dcf1d..f4fde57 100644
--- a/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSinkIT.java
+++ b/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSinkIT.java
@@ -71,6 +71,8 @@ public class TestHttpSinkIT {
httpSinkContext.put("contentTypeHeader", "application/json");
httpSinkContext.put("backoff.200", "false");
httpSinkContext.put("rollback.200", "false");
+ httpSinkContext.put("backoff.401", "false");
+ httpSinkContext.put("rollback.401", "false");
httpSinkContext.put("incrementMetrics.200", "true");
Context memoryChannelContext = new Context();
@@ -132,6 +134,36 @@ public class TestHttpSinkIT {
}
@Test
+ public void ensureEventsNotResentOn401Failure() throws Exception {
+ String errorScenario = "Error skip scenario";
+
+ service.stubFor(post(urlEqualTo("/endpoint"))
+ .inScenario(errorScenario)
+ .whenScenarioStateIs(STARTED)
+ .withRequestBody(equalToJson(event("UNAUTHORIZED REQUEST")))
+ .willReturn(aResponse().withStatus(401)
+ .withHeader("Content-Type", "text/plain")
+ .withBody("Not allowed!"))
+ .willSetStateTo("Error Sent"));
+
+ service.stubFor(post(urlEqualTo("/endpoint"))
+ .inScenario(errorScenario)
+ .whenScenarioStateIs("Error Sent")
+ .withRequestBody(equalToJson(event("NEXT EVENT")))
+ .willReturn(aResponse().withStatus(200)));
+
+ addEventToChannel(event("UNAUTHORIZED REQUEST"), Status.READY);
+ addEventToChannel(event("NEXT EVENT"), Status.READY);
+
+ service.verify(1, postRequestedFor(urlEqualTo("/endpoint"))
+ .withRequestBody(equalToJson(event("UNAUTHORIZED REQUEST"))));
+
+ service.verify(1, postRequestedFor(urlEqualTo("/endpoint"))
+ .withRequestBody(equalToJson(event("NEXT EVENT"))));
+
+ }
+
+ @Test
public void ensureEventsResentOnNetworkFailure() throws Exception {
String errorScenario = "Error Scenario";
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic