
List:       flume-user
Subject:    Re: Flume && Kafka Integration
From:       "Mangtani, Kushal" <Kushal.Mangtani@viasat.com>
Date:       2015-03-20 7:44:28
Message-ID: 48F5CF66-DA80-4A5F-B00A-9B9EE8A0074D@viasat.com

Bumping up the thread.

Hari - did you get a chance to look into this issue?

-Kushal

On Mar 17, 2015, at 2:51 PM, Mangtani, Kushal <Kushal.Mangtani@viasat.com> wrote:

flume.conf (cleaned up for security and brevity by removing additional sources, channels, and sinks):

# Name the components on this agent
collector.sources = CustomSource
collector.channels = HdfsChannel KafkaChannel
collector.sinks = HdfsSink KafkaSink

# Describe/configure the custom Avro source
collector.sources.CustomSource.type = com.flume.CustomSource
collector.sources.CustomSource.channels = HdfsChannel KafkaChannel
collector.sources.CustomSource.bind = 0.0.0.0
collector.sources.CustomSource.port = 9898
collector.sources.CustomSource.schemaFolder = /usr/lib/flume-ng/schemas
collector.sources.CustomSource.selector.type = multiplexing
collector.sources.CustomSource.selector.header = recordType
# required channel mappings
collector.sources.CustomSource.selector.mapping.MyRecord = HdfsChannel
# optional channel mappings
collector.sources.CustomSource.selector.optional.MyRecord = KafkaChannel

# HdfsChannel channel config
collector.channels.HdfsChannel.type = file
collector.channels.HdfsChannel.useDualCheckpoints = true
collector.channels.HdfsChannel.checkpointDir = /mnt/persistent/0/flume-ng-data/Hdfsdata/checkpoint
collector.channels.HdfsChannel.backupCheckpointDir = /mnt/persistent/0/flume-ng-data/Hdfsdata/backup-checkpoint
collector.channels.HdfsChannel.dataDirs = /mnt/persistent/0/flume-ng-data/Hdfsdata/logs
collector.channels.HdfsChannel.capacity = 1000000
collector.channels.HdfsChannel.transactionCapacity = 50000
collector.channels.HdfsChannel.write-timeout = 60
collector.channels.HdfsChannel.keep-alive = 30

# HdfsSink sink config
collector.sinks.HdfsSink.type = hdfs
collector.sinks.HdfsSink.channel = HdfsChannel
collector.sinks.HdfsSink.hdfs.fileType = DataStream
collector.sinks.HdfsSink.serializer = CustomSerializer
collector.sinks.HdfsSink.serializer.schemaFolder = /usr/lib/flume-ng/schemas
collector.sinks.HdfsSink.serializer.syncIntervalBytes = 4096000
collector.sinks.HdfsSink.serializer.compressionCodec = snappy
collector.sinks.HdfsSink.hdfs.path = hdfs://namenode/data/%Y/%m/%d/%H00/myrecord
collector.sinks.HdfsSink.hdfs.rollSize = 0
collector.sinks.HdfsSink.hdfs.rollInterval = 1200
collector.sinks.HdfsSink.hdfs.rollCount = 0
collector.sinks.HdfsSink.hdfs.callTimeout = 60000
collector.sinks.HdfsSink.hdfs.batchSize = 10000

# ObjectRecordKafkaChannel channel config
collector.channels.KafkaChannel.type = memory
collector.channels.KafkaChannel.capacity = 1500000
collector.channels.KafkaChannel.transactionCapacity = 50000
collector.channels.KafkaChannel.write-timeout = 60
collector.channels.KafkaChannel.keep-alive = 30

# ObjectRecordKafkaSink sink config
collector.sinks.KafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
collector.sinks.KafkaSink.channel = KafkaChannel
collector.sinks.KafkaSink.zk.connect = zk-1.com,zk-2.com,zk-3.com
collector.sinks.KafkaSink.metadata.broker.list = kafka-1.com:9092,kafka-2.com:9092,kafka-3.com:9092
collector.sinks.KafkaSink.topic = MyRecord
collector.sinks.KafkaSink.batch.num.messages = 1000
collector.sinks.KafkaSink.producer.type = async
collector.sinks.KafkaSink.request.required.acks = 0
collector.sinks.KafkaSink.serializer.class = kafka.serializer.DefaultEncoder
collector.sinks.KafkaSink.key.serializer.class = kafka.serializer.StringEncoder
collector.sinks.KafkaSink.partition.key = keyName
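One detail worth noting about the config above: KafkaChannel is a bounded memory channel, so if the KafkaSink stalls (for example, when the Kafka cluster is down) the channel fills to its configured capacity and further puts start failing. A toy model of that behavior (illustrative Python, not Flume's actual implementation; names are made up for the sketch):

```python
from collections import deque

class MemoryChannel:
    """Toy model of a bounded in-memory channel, capacity playing the
    role of collector.channels.KafkaChannel.capacity."""
    def __init__(self, capacity):
        self.capacity = capacity
        self.queue = deque()

    def put(self, event):
        # Once the sink stops draining, the queue hits capacity and
        # puts fail -- roughly analogous to a "channel full" error.
        if len(self.queue) >= self.capacity:
            raise RuntimeError("channel full")
        self.queue.append(event)

    def take(self):
        return self.queue.popleft() if self.queue else None

# Simulate a stalled sink: events go in, nothing is taken out.
ch = MemoryChannel(capacity=3)
for i in range(3):
    ch.put(f"event-{i}")
try:
    ch.put("event-3")
    overflowed = False
except RuntimeError:
    overflowed = True
print(overflowed)  # True: the fourth put fails once capacity is reached
```

The question in this thread is whether such backpressure from the optional Kafka path should ever reach the required HDFS path.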


-Kushal

On Mar 17, 2015, at 2:31 PM, Hari Shreedharan <hshreedharan@cloudera.com> wrote:

I have seen one other recent report of optional mapping issues. Can you also send your configuration? I'd like to investigate this and figure out what the issue is.

Thanks,
Hari Shreedharan




On Mar 17, 2015, at 2:24 PM, Mangtani, Kushal <Kushal.Mangtani@viasat.com> wrote:

Hello,

We are using Flume in our production environment to ingest data. A while back, we decided to extend the functionality and added Kafka for real-time monitoring. So the Flume source fans out and deposits the data into two separate channels: one is HDFS (required mapping) and the other is Kafka (optional mapping). We have made the Kafka channel an optional selector mapping (http://flume.apache.org/releases/content/1.4.0/FlumeUserGuide.html#fan-out-flow) so that any issue with Kafka should not block the HDFS pipeline. However, I have noticed this never works as intended: any issue with the Kafka cluster eventually brings down the HDFS ingestion as well. So my question is: does the optional channel mapping in the Flume source code not work correctly, or is the kafka-sink/Kafka cluster I am using outdated? Any input will be appreciated.
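For reference, the documented fan-out contract is that a failed write to a required channel propagates back to the source (which rolls back and retries), while a failed write to an optional channel is ignored. A toy sketch of that contract (illustrative Python; Flume's real ChannelProcessor is Java and considerably more involved):

```python
class ChannelFullError(Exception):
    pass

def fan_out(event, required, optional):
    """Deliver one event to required and optional channel lists.

    Required failures propagate (the source must roll back / retry);
    optional failures are swallowed so they cannot block the pipeline.
    """
    for ch in required:
        ch(event)  # any exception here propagates to the caller
    for ch in optional:
        try:
            ch(event)
        except Exception:
            pass  # optional channel problems are ignored

delivered = []

def hdfs_channel(event):
    delivered.append(("hdfs", event))

def broken_kafka_channel(event):
    raise ChannelFullError("kafka channel full")

# A broken optional channel does not stop the required one:
fan_out("evt-1", required=[hdfs_channel], optional=[broken_kafka_channel])
print(delivered)  # [('hdfs', 'evt-1')]
```

If the observed behavior diverges from this (a Kafka outage blocking HDFS), the question above is whether the divergence is in Flume's optional-mapping code path or in the third-party sink.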

Env:

  *   Ubuntu 12.04
  *   CDH 5 flume 1.4
  *   Kafka Src Download - 2.9.1-0.8.1.1
  *   Using Custom Flume-Kafka Sink https://github.com/baniuyao/flume-ng-kafka-sink

- Kushal





