[prev in list] [next in list] [prev in thread] [next in thread] 

List:       flume-commits
Subject:    svn commit: r1187126 - in /incubator/flume/branches/flume-728/flume-ng-core/src:
From:       esammer () apache ! org
Date:       2011-10-20 23:03:12
Message-ID: 20111020230313.1017823888CD () eris ! apache ! org
[Download RAW message or body]

Author: esammer
Date: Thu Oct 20 23:03:12 2011
New Revision: 1187126

URL: http://svn.apache.org/viewvc?rev=1187126&view=rev
Log:
FLUME-803: support re-entrant transactions for the memory channel

- Modified configure() and the tests of the memory channel to handle
  configuration values as strings rather than ints, so that all config
  providers work at runtime.

(Prasad Mujumdar via E. Sammer)

Modified:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
  incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java


Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
                
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src \
/main/java/org/apache/flume/channel/MemoryChannel.java?rev=1187126&r1=1187125&r2=1187126&view=diff
 ==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java \
                (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java \
Thu Oct 20 23:03:12 2011 @@ -81,6 +81,7 @@ public class MemoryChannel implements Ch
     private LinkedList<StampedEvent> undoTakeList;
     private LinkedList<StampedEvent> undoPutList;
     private TransactionState txnState;
+    private int refCount;
 
     public MemTransaction() {
       txnState = TransactionState.Closed;
@@ -93,6 +94,9 @@ public class MemoryChannel implements Ch
      *  set transaction state to Started
      */
     public void begin() {
+      if (++refCount > 1) {
+        return;
+      }
       undoTakeList = new LinkedList<StampedEvent>();
       undoPutList = new LinkedList<StampedEvent>();
       putStamp = 0;
@@ -109,6 +113,10 @@ public class MemoryChannel implements Ch
     public void commit() {
       Preconditions.checkArgument(txnState == TransactionState.Started,
           "transaction not started");
+      if (--refCount > 0) {
+        return;
+      }
+
       // if the txn put any events, then update the channel's stamp and
       // signal for availability of committed data in the queue
       if (putStamp != 0) {
@@ -134,6 +142,7 @@ public class MemoryChannel implements Ch
       undoPut(this);
       undoTake(this);
       txnState = TransactionState.RolledBack;
+      refCount = 0;
     }
 
     @Override
@@ -150,6 +159,10 @@ public class MemoryChannel implements Ch
       forgetTransaction(this);
     }
 
+    public TransactionState getState() {
+      return txnState;
+    }
+
     protected int lastTakeStamp() {
       return takeStamp;
     }
@@ -215,10 +228,12 @@ public class MemoryChannel implements Ch
       capacity = Integer.parseInt(strCapacity);
     }
 
-    keepAlive = context.get("keep-alive", Integer.class);
+    String strKeepAlive = context.get("keep-alive", String.class);
 
-    if (keepAlive == null) {
+    if (strKeepAlive == null) {
       keepAlive = defaultKeepAlive;
+    } else {
+      keepAlive = Integer.parseInt(strKeepAlive);
     }
 
     queue = new LinkedBlockingDeque<StampedEvent>(capacity);

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java
                
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src \
/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java?rev=1187126&r1=1187125&r2=1187126&view=diff
 ==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java \
                (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java \
Thu Oct 20 23:03:12 2011 @@ -5,6 +5,8 @@ import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.Transaction;
+import org.apache.flume.Transaction.TransactionState;
+import org.apache.flume.channel.MemoryChannel.MemTransaction;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
 import org.junit.Assert;
@@ -12,6 +14,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 public class TestMemoryChannelTransaction {
+
   private Channel channel;
 
   @Before
@@ -21,33 +24,152 @@ public class TestMemoryChannelTransactio
 
   @Test
   public void testCommit() throws InterruptedException, EventDeliveryException {
-    
+
+    Event event, event2;
+    Context context = new Context();
+    int putCounter = 0;
+
+    context.put("keep-alive", "1");
+    Configurables.configure(channel, context);
+
+    Transaction transaction = channel.getTransaction();
+    Assert.assertNotNull(transaction);
+
+    transaction.begin();
+    for (putCounter = 0; putCounter < 10; putCounter++) {
+      event = EventBuilder.withBody(("test event" + putCounter).getBytes());
+      channel.put(event);
+    }
+    transaction.commit();
+    transaction.close();
+
+    transaction = channel.getTransaction();
+    Assert.assertNotNull(transaction);
+
+    transaction = channel.getTransaction();
+    transaction.begin();
+    for (int i = 0; i < 10; i++) {
+      event2 = channel.take();
+      Assert.assertNotNull("lost an event", event2);
+      Assert.assertArrayEquals(event2.getBody(), ("test event" + i).getBytes());
+      // System.out.println(event2.toString());
+    }
+    event2 = channel.take();
+    Assert.assertNull("extra event found", event2);
+
+    transaction.commit();
+    transaction.close();
+  }
+
+  @Test
+  public void testRollBack() throws InterruptedException,
+      EventDeliveryException {
+
     Event event, event2;
     Context context = new Context();
     int putCounter = 0;
 
+    context.put("keep-alive", "1");
     Configurables.configure(channel, context);
 
     Transaction transaction = channel.getTransaction();
     Assert.assertNotNull(transaction);
 
+    // add events and rollback txn
+    transaction.begin();
+    for (putCounter = 0; putCounter < 10; putCounter++) {
+      event = EventBuilder.withBody(("test event" + putCounter).getBytes());
+      channel.put(event);
+    }
+    transaction.rollback();
+    transaction.close();
+
+    // verify that no events are stored due to rollback
+    transaction = channel.getTransaction();
+    transaction.begin();
+    event2 = channel.take();
+    Assert.assertNull("extra event found", event2);
+    transaction.commit();
+    transaction.close();
+
+    // add events and commit
+    transaction = channel.getTransaction();
+    transaction.begin();
+    for (putCounter = 0; putCounter < 10; putCounter++) {
+      event = EventBuilder.withBody(("test event" + putCounter).getBytes());
+      channel.put(event);
+    }
+    transaction.commit();
+    transaction.close();
+
+    transaction = channel.getTransaction();
+    Assert.assertNotNull(transaction);
+
+    // verify events are there, then rollback the take
+    transaction.begin();
+    for (int i = 0; i < 10; i++) {
+      event2 = channel.take();
+      Assert.assertNotNull("lost an event", event2);
+      Assert.assertArrayEquals(event2.getBody(), ("test event" + i).getBytes());
+    }
+    event2 = channel.take();
+    Assert.assertNull("extra event found", event2);
+
+    transaction.rollback();
+    transaction.close();
+
+    // verify that the events were left in there due to rollback
+    transaction = channel.getTransaction();
     transaction.begin();
+    for (int i = 0; i < 10; i++) {
+      event2 = channel.take();
+      Assert.assertNotNull("lost an event", event2);
+      Assert.assertArrayEquals(event2.getBody(), ("test event" + i).getBytes());
+    }
+    event2 = channel.take();
+    Assert.assertNull("extra event found", event2);
+
+    transaction.rollback();
+    transaction.close();
+  }
+
+  @Test
+  public void testReEntTxn() throws InterruptedException,
+      EventDeliveryException {
+
+    Event event, event2;
+    Context context = new Context();
+    int putCounter = 0;
+
+    context.put("keep-alive", "1");
+    Configurables.configure(channel, context);
+
+    Transaction transaction = channel.getTransaction();
+    Assert.assertNotNull(transaction);
+
+    transaction.begin(); // first begin
     for (putCounter = 0; putCounter < 10; putCounter++) {
+      transaction.begin(); // inner begin
       event = EventBuilder.withBody(("test event" + putCounter).getBytes());
       channel.put(event);
+      transaction.commit(); // inner commit
+      Assert.assertEquals(((MemTransaction) transaction).getState(),
+          TransactionState.Started);
     }
     transaction.commit();
+    Assert.assertEquals(((MemTransaction) transaction).getState(),
+        TransactionState.Committed);
     transaction.close();
 
     transaction = channel.getTransaction();
     Assert.assertNotNull(transaction);
 
     transaction.begin();
-    for (int i = 0; i < 10; i++ ) {
+    for (int i = 0; i < 10; i++) {
       event2 = channel.take();
       Assert.assertNotNull("lost an event", event2);
       Assert.assertArrayEquals(event2.getBody(), ("test event" + i).getBytes());
-//      System.out.println(event2.toString());
+      // System.out.println(event2.toString());
     }
     event2 = channel.take();
     Assert.assertNull("extra event found", event2);
@@ -56,12 +178,14 @@ public class TestMemoryChannelTransactio
     transaction.close();
   }
 
-  public void testRollBack() throws InterruptedException, EventDeliveryException {
-    
+  @Test
+  public void testReEntTxnRollBack() throws InterruptedException,
+      EventDeliveryException {
     Event event, event2;
     Context context = new Context();
     int putCounter = 0;
 
+    context.put("keep-alive", "1");
     Configurables.configure(channel, context);
 
     Transaction transaction = channel.getTransaction();
@@ -77,6 +201,7 @@ public class TestMemoryChannelTransactio
     transaction.close();
 
     // verify that no events are stored due to rollback
+    transaction = channel.getTransaction();
     transaction.begin();
     event2 = channel.take();
     Assert.assertNull("extra event found", event2);
@@ -84,6 +209,7 @@ public class TestMemoryChannelTransactio
     transaction.close();
 
     // add events and commit
+    transaction = channel.getTransaction();
     transaction.begin();
     for (putCounter = 0; putCounter < 10; putCounter++) {
       event = EventBuilder.withBody(("test event" + putCounter).getBytes());
@@ -91,27 +217,33 @@ public class TestMemoryChannelTransactio
     }
     transaction.commit();
     transaction.close();
-    
+
     transaction = channel.getTransaction();
     Assert.assertNotNull(transaction);
 
     // verify events are there, then rollback the take
     transaction.begin();
-    for (int i = 0; i < 10; i++ ) {
+    for (int i = 0; i < 10; i++) {
+      transaction.begin(); // inner begin
       event2 = channel.take();
       Assert.assertNotNull("lost an event", event2);
       Assert.assertArrayEquals(event2.getBody(), ("test event" + i).getBytes());
+      transaction.commit(); // inner commit
+      Assert.assertEquals(((MemTransaction) transaction).getState(),
+          TransactionState.Started);
     }
     event2 = channel.take();
     Assert.assertNull("extra event found", event2);
 
     transaction.rollback();
+    Assert.assertEquals(((MemTransaction) transaction).getState(),
+        TransactionState.RolledBack);
     transaction.close();
-    
 
     // verify that the events were left in there due to rollback
+    transaction = channel.getTransaction();
     transaction.begin();
-    for (int i = 0; i < 10; i++ ) {
+    for (int i = 0; i < 10; i++) {
       event2 = channel.take();
       Assert.assertNotNull("lost an event", event2);
       Assert.assertArrayEquals(event2.getBody(), ("test event" + i).getBytes());


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic