[prev in list] [next in list] [prev in thread] [next in thread] 

List:       flume-commits
Subject:    flume git commit: Fix HttpSink bad response handling
From:       bessbd () apache ! org
Date:       2017-07-12 18:58:09
Message-ID: 7e816a776218445489289d457914495e () git ! apache ! org
[Download RAW message or body]

Repository: flume
Updated Branches:
  refs/heads/trunk 964bcf56a -> c570a51b3


Fix HttpSink bad response handling

After a bad response, connection.getInputStream() returns null.
This patch adds a check for this.

This closes #139

Reviewers: Bessenyei Balázs Donát

(filippovmn via Bessenyei Balázs Donát)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/c570a51b
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/c570a51b
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/c570a51b

Branch: refs/heads/trunk
Commit: c570a51b3c53e4899d16dd623e19a0d939518dd2
Parents: 964bcf5
Author: filippovmn <mfil_86@mail.ru>
Authored: Wed Jul 12 18:51:26 2017 +0000
Committer: Bessenyei Balázs Donát <bessbd@apache.org>
Committed: Wed Jul 12 18:55:20 2017 +0000

----------------------------------------------------------------------
 .../org/apache/flume/sink/http/HttpSink.java    |  7 ++++-
 .../apache/flume/sink/http/TestHttpSink.java    | 17 +++++++++++
 .../apache/flume/sink/http/TestHttpSinkIT.java  | 32 ++++++++++++++++++++
 3 files changed, 55 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/c570a51b/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java b/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java
index 9637326..b9c42ed 100644
--- a/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java
+++ b/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java
@@ -218,7 +218,12 @@ public class HttpSink extends AbstractSink implements Configurable {
           int httpStatusCode = connection.getResponseCode();
           LOG.debug("Got status code : " + httpStatusCode);
 
-          connection.getInputStream().close();
+          if (httpStatusCode < HttpURLConnection.HTTP_BAD_REQUEST) {
+            connection.getInputStream().close();
+          } else {
+            LOG.debug("bad request");
+            connection.getErrorStream().close();
+          }
           LOG.debug("Response processed and closed");
 
           if (httpStatusCode >= HTTP_STATUS_CONTINUE) {

http://git-wip-us.apache.org/repos/asf/flume/blob/c570a51b/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java b/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java
index 16cb6e8..bee089c 100644
--- a/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java
+++ b/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java
@@ -216,6 +216,22 @@ public class TestHttpSink {
   }
 
   @Test
+  public void ensureSingleErrorStatusConfigurationCorrectlyUsed() throws Exception {
+    when(channel.take()).thenReturn(event);
+    when(event.getBody()).thenReturn("something".getBytes());
+
+    Context context = new Context();
+    context.put("defaultRollback", "true");
+    context.put("defaultBackoff", "true");
+    context.put("defaultIncrementMetrics", "false");
+    context.put("rollback.401", "false");
+    context.put("backoff.401", "false");
+    context.put("incrementMetrics.401", "false");
+
+    executeWithMocks(true, Status.READY, false, true, context, HttpURLConnection.HTTP_UNAUTHORIZED);
+  }
+
+  @Test
   public void ensureGroupConfigurationCorrectlyUsed() throws Exception {
     when(channel.take()).thenReturn(event);
     when(event.getBody()).thenReturn("something".getBytes());
@@ -278,6 +294,7 @@ public class TestHttpSink {
     when(channel.getTransaction()).thenReturn(transaction);
     when(httpURLConnection.getOutputStream()).thenReturn(outputStream);
     when(httpURLConnection.getInputStream()).thenReturn(inputStream);
+    when(httpURLConnection.getErrorStream()).thenReturn(inputStream);
     when(httpURLConnection.getResponseCode()).thenReturn(httpStatus);
 
     Status actualStatus = httpSink.process();

http://git-wip-us.apache.org/repos/asf/flume/blob/c570a51b/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSinkIT.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSinkIT.java b/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSinkIT.java
index 74dcf1d..f4fde57 100644
--- a/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSinkIT.java
+++ b/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSinkIT.java
@@ -71,6 +71,8 @@ public class TestHttpSinkIT {
       httpSinkContext.put("contentTypeHeader", "application/json");
       httpSinkContext.put("backoff.200", "false");
       httpSinkContext.put("rollback.200", "false");
+      httpSinkContext.put("backoff.401", "false");
+      httpSinkContext.put("rollback.401", "false");
       httpSinkContext.put("incrementMetrics.200", "true");
 
       Context memoryChannelContext = new Context();
@@ -132,6 +134,36 @@ public class TestHttpSinkIT {
   }
 
   @Test
+  public void ensureEventsNotResentOn401Failure() throws Exception {
+    String errorScenario = "Error skip scenario";
+
+    service.stubFor(post(urlEqualTo("/endpoint"))
+            .inScenario(errorScenario)
+            .whenScenarioStateIs(STARTED)
+            .withRequestBody(equalToJson(event("UNAUTHORIZED REQUEST")))
+            .willReturn(aResponse().withStatus(401)
+            .withHeader("Content-Type", "text/plain")
+            .withBody("Not allowed!"))
+            .willSetStateTo("Error Sent"));
+
+    service.stubFor(post(urlEqualTo("/endpoint"))
+            .inScenario(errorScenario)
+            .whenScenarioStateIs("Error Sent")
+            .withRequestBody(equalToJson(event("NEXT EVENT")))
+            .willReturn(aResponse().withStatus(200)));
+
+    addEventToChannel(event("UNAUTHORIZED REQUEST"), Status.READY);
+    addEventToChannel(event("NEXT EVENT"), Status.READY);
+
+    service.verify(1, postRequestedFor(urlEqualTo("/endpoint"))
+            .withRequestBody(equalToJson(event("UNAUTHORIZED REQUEST"))));
+
+    service.verify(1, postRequestedFor(urlEqualTo("/endpoint"))
+            .withRequestBody(equalToJson(event("NEXT EVENT"))));
+
+  }
+
+  @Test
   public void ensureEventsResentOnNetworkFailure() throws Exception {
     String errorScenario = "Error Scenario";
 


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic