[prev in list] [next in list] [prev in thread] [next in thread]
List: flume-commits
Subject: svn commit: r1187126 - in /incubator/flume/branches/flume-728/flume-ng-core/src:
From: esammer () apache ! org
Date: 2011-10-20 23:03:12
Message-ID: 20111020230313.1017823888CD () eris ! apache ! org
[Download RAW message or body]
Author: esammer
Date: Thu Oct 20 23:03:12 2011
New Revision: 1187126
URL: http://svn.apache.org/viewvc?rev=1187126&view=rev
Log:
FLUME-803: support re-entrant transaction for memory channel
- Modified configure() and tests of memory channel to deal with strings
rather than ints so all config providers work at runtime.
(Prasad Mujumdar via E. Sammer)
Modified:
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src \
/main/java/org/apache/flume/channel/MemoryChannel.java?rev=1187126&r1=1187125&r2=1187126&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java \
(original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java \
Thu Oct 20 23:03:12 2011 @@ -81,6 +81,7 @@ public class MemoryChannel implements Ch
private LinkedList<StampedEvent> undoTakeList;
private LinkedList<StampedEvent> undoPutList;
private TransactionState txnState;
+ private int refCount;
public MemTransaction() {
txnState = TransactionState.Closed;
@@ -93,6 +94,9 @@ public class MemoryChannel implements Ch
* set transaction state to Started
*/
public void begin() {
+ if (++refCount > 1) {
+ return;
+ }
undoTakeList = new LinkedList<StampedEvent>();
undoPutList = new LinkedList<StampedEvent>();
putStamp = 0;
@@ -109,6 +113,10 @@ public class MemoryChannel implements Ch
public void commit() {
Preconditions.checkArgument(txnState == TransactionState.Started,
"transaction not started");
+ if (--refCount > 0) {
+ return;
+ }
+
// if the txn put any events, then update the channel's stamp and
// signal for availability of committed data in the queue
if (putStamp != 0) {
@@ -134,6 +142,7 @@ public class MemoryChannel implements Ch
undoPut(this);
undoTake(this);
txnState = TransactionState.RolledBack;
+ refCount = 0;
}
@Override
@@ -150,6 +159,10 @@ public class MemoryChannel implements Ch
forgetTransaction(this);
}
+ public TransactionState getState() {
+ return txnState;
+ }
+
protected int lastTakeStamp() {
return takeStamp;
}
@@ -215,10 +228,12 @@ public class MemoryChannel implements Ch
capacity = Integer.parseInt(strCapacity);
}
- keepAlive = context.get("keep-alive", Integer.class);
+ String strKeepAlive = context.get("keep-alive", String.class);
- if (keepAlive == null) {
+ if (strKeepAlive == null) {
keepAlive = defaultKeepAlive;
+ } else {
+ keepAlive = Integer.parseInt(strKeepAlive);
}
queue = new LinkedBlockingDeque<StampedEvent>(capacity);
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src \
/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java?rev=1187126&r1=1187125&r2=1187126&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java \
(original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java \
Thu Oct 20 23:03:12 2011 @@ -5,6 +5,8 @@ import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
+import org.apache.flume.Transaction.TransactionState;
+import org.apache.flume.channel.MemoryChannel.MemTransaction;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
import org.junit.Assert;
@@ -12,6 +14,7 @@ import org.junit.Before;
import org.junit.Test;
public class TestMemoryChannelTransaction {
+
private Channel channel;
@Before
@@ -21,33 +24,152 @@ public class TestMemoryChannelTransactio
@Test
public void testCommit() throws InterruptedException, EventDeliveryException {
-
+
+ Event event, event2;
+ Context context = new Context();
+ int putCounter = 0;
+
+ context.put("keep-alive", "1");
+ Configurables.configure(channel, context);
+
+ Transaction transaction = channel.getTransaction();
+ Assert.assertNotNull(transaction);
+
+ transaction.begin();
+ for (putCounter = 0; putCounter < 10; putCounter++) {
+ event = EventBuilder.withBody(("test event" + putCounter).getBytes());
+ channel.put(event);
+ }
+ transaction.commit();
+ transaction.close();
+
+ transaction = channel.getTransaction();
+ Assert.assertNotNull(transaction);
+
+ transaction = channel.getTransaction();
+ transaction.begin();
+ for (int i = 0; i < 10; i++) {
+ event2 = channel.take();
+ Assert.assertNotNull("lost an event", event2);
+ Assert.assertArrayEquals(event2.getBody(), ("test event" + i).getBytes());
+ // System.out.println(event2.toString());
+ }
+ event2 = channel.take();
+ Assert.assertNull("extra event found", event2);
+
+ transaction.commit();
+ transaction.close();
+ }
+
+ @Test
+ public void testRollBack() throws InterruptedException,
+ EventDeliveryException {
+
Event event, event2;
Context context = new Context();
int putCounter = 0;
+ context.put("keep-alive", "1");
Configurables.configure(channel, context);
Transaction transaction = channel.getTransaction();
Assert.assertNotNull(transaction);
+ // add events and rollback txn
+ transaction.begin();
+ for (putCounter = 0; putCounter < 10; putCounter++) {
+ event = EventBuilder.withBody(("test event" + putCounter).getBytes());
+ channel.put(event);
+ }
+ transaction.rollback();
+ transaction.close();
+
+ // verify that no events are stored due to rollback
+ transaction = channel.getTransaction();
+ transaction.begin();
+ event2 = channel.take();
+ Assert.assertNull("extra event found", event2);
+ transaction.commit();
+ transaction.close();
+
+ // add events and commit
+ transaction = channel.getTransaction();
+ transaction.begin();
+ for (putCounter = 0; putCounter < 10; putCounter++) {
+ event = EventBuilder.withBody(("test event" + putCounter).getBytes());
+ channel.put(event);
+ }
+ transaction.commit();
+ transaction.close();
+
+ transaction = channel.getTransaction();
+ Assert.assertNotNull(transaction);
+
+ // verify events are there, then rollback the take
+ transaction.begin();
+ for (int i = 0; i < 10; i++) {
+ event2 = channel.take();
+ Assert.assertNotNull("lost an event", event2);
+ Assert.assertArrayEquals(event2.getBody(), ("test event" + i).getBytes());
+ }
+ event2 = channel.take();
+ Assert.assertNull("extra event found", event2);
+
+ transaction.rollback();
+ transaction.close();
+
+ // verify that the events were left in there due to rollback
+ transaction = channel.getTransaction();
transaction.begin();
+ for (int i = 0; i < 10; i++) {
+ event2 = channel.take();
+ Assert.assertNotNull("lost an event", event2);
+ Assert.assertArrayEquals(event2.getBody(), ("test event" + i).getBytes());
+ }
+ event2 = channel.take();
+ Assert.assertNull("extra event found", event2);
+
+ transaction.rollback();
+ transaction.close();
+ }
+
+ @Test
+ public void testReEntTxn() throws InterruptedException,
+ EventDeliveryException {
+
+ Event event, event2;
+ Context context = new Context();
+ int putCounter = 0;
+
+ context.put("keep-alive", "1");
+ Configurables.configure(channel, context);
+
+ Transaction transaction = channel.getTransaction();
+ Assert.assertNotNull(transaction);
+
+ transaction.begin(); // first begin
for (putCounter = 0; putCounter < 10; putCounter++) {
+ transaction.begin(); // inner begin
event = EventBuilder.withBody(("test event" + putCounter).getBytes());
channel.put(event);
+ transaction.commit(); // inner commit
+ Assert.assertEquals(((MemTransaction) transaction).getState(),
+ TransactionState.Started);
}
transaction.commit();
+ Assert.assertEquals(((MemTransaction) transaction).getState(),
+ TransactionState.Committed);
transaction.close();
transaction = channel.getTransaction();
Assert.assertNotNull(transaction);
transaction.begin();
- for (int i = 0; i < 10; i++ ) {
+ for (int i = 0; i < 10; i++) {
event2 = channel.take();
Assert.assertNotNull("lost an event", event2);
Assert.assertArrayEquals(event2.getBody(), ("test event" + i).getBytes());
-// System.out.println(event2.toString());
+ // System.out.println(event2.toString());
}
event2 = channel.take();
Assert.assertNull("extra event found", event2);
@@ -56,12 +178,14 @@ public class TestMemoryChannelTransactio
transaction.close();
}
- public void testRollBack() throws InterruptedException, EventDeliveryException {
-
+ @Test
+ public void testReEntTxnRollBack() throws InterruptedException,
+ EventDeliveryException {
Event event, event2;
Context context = new Context();
int putCounter = 0;
+ context.put("keep-alive", "1");
Configurables.configure(channel, context);
Transaction transaction = channel.getTransaction();
@@ -77,6 +201,7 @@ public class TestMemoryChannelTransactio
transaction.close();
// verify that no events are stored due to rollback
+ transaction = channel.getTransaction();
transaction.begin();
event2 = channel.take();
Assert.assertNull("extra event found", event2);
@@ -84,6 +209,7 @@ public class TestMemoryChannelTransactio
transaction.close();
// add events and commit
+ transaction = channel.getTransaction();
transaction.begin();
for (putCounter = 0; putCounter < 10; putCounter++) {
event = EventBuilder.withBody(("test event" + putCounter).getBytes());
@@ -91,27 +217,33 @@ public class TestMemoryChannelTransactio
}
transaction.commit();
transaction.close();
-
+
transaction = channel.getTransaction();
Assert.assertNotNull(transaction);
// verify events are there, then rollback the take
transaction.begin();
- for (int i = 0; i < 10; i++ ) {
+ for (int i = 0; i < 10; i++) {
+ transaction.begin(); // inner begin
event2 = channel.take();
Assert.assertNotNull("lost an event", event2);
Assert.assertArrayEquals(event2.getBody(), ("test event" + i).getBytes());
+ transaction.commit(); // inner commit
+ Assert.assertEquals(((MemTransaction) transaction).getState(),
+ TransactionState.Started);
}
event2 = channel.take();
Assert.assertNull("extra event found", event2);
transaction.rollback();
+ Assert.assertEquals(((MemTransaction) transaction).getState(),
+ TransactionState.RolledBack);
transaction.close();
-
// verify that the events were left in there due to rollback
+ transaction = channel.getTransaction();
transaction.begin();
- for (int i = 0; i < 10; i++ ) {
+ for (int i = 0; i < 10; i++) {
event2 = channel.take();
Assert.assertNotNull("lost an event", event2);
Assert.assertArrayEquals(event2.getBody(), ("test event" + i).getBytes());
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic