
List:       flume-user
Subject:    Re: Kafka sink deleted, brokerList null
From:       "Hari Shreedharan" <hshreedharan () cloudera ! com>
Date:       2015-03-25 19:44:58
Message-ID: 1427312698014.56845263 () Nodemailer

Parameters that carry the kafka. prefix are the ones passed through to Kafka,
but parameters like brokerList are required by the Flume sink itself to connect
to Kafka in the first place. The basic idea is that the sink reads the
parameters it needs to connect and write to Kafka under their plain names, and
any other Kafka setting can be passed with the kafka. prefix so the Kafka
client can use it.
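
To make the split concrete, here is a minimal sketch in plain Java. It is not the Flume source: the class name, the use of a Map in place of Flume's Context, and the prefix-stripping loop are assumptions for illustration, and "metadata.broker.list" is assumed to be the Kafka key that the brokerList value ends up under. The kafka.producer.type entry is only an example of a pass-through setting.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Illustrative stand-in for the sink's property handling; not the Flume source.
final class KafkaSinkPropsSketch {

    // Key the sink reads for itself (a1.sinks.k1.brokerList in the config).
    static final String BROKER_LIST_FLUME_KEY = "brokerList";
    // Assumed Kafka producer key that the broker list is copied to.
    static final String BROKER_LIST_KEY = "metadata.broker.list";
    // Prefix marking parameters that are handed on to the Kafka client.
    static final String KAFKA_PREFIX = "kafka.";

    static Properties toKafkaProps(Map<String, String> sinkParams) {
        Properties kafkaProps = new Properties();

        // Pass-through: copy every "kafka."-prefixed entry, minus the prefix
        // (assumed behavior, for illustration).
        for (Map.Entry<String, String> e : sinkParams.entrySet()) {
            if (e.getKey().startsWith(KAFKA_PREFIX)) {
                String kafkaKey = e.getKey().substring(KAFKA_PREFIX.length());
                kafkaProps.setProperty(kafkaKey, e.getValue());
            }
        }

        // Required sink-level parameter: looked up under its plain name only,
        // so a "kafka."-prefixed broker list does not satisfy this check.
        String brokerList = sinkParams.get(BROKER_LIST_FLUME_KEY);
        if (brokerList == null) {
            throw new IllegalArgumentException(
                "brokerList must contain at least one Kafka broker");
        }
        kafkaProps.setProperty(BROKER_LIST_KEY, brokerList);
        return kafkaProps;
    }

    public static void main(String[] args) {
        // Shape of the original config: broker list only under the kafka. prefix.
        Map<String, String> original = new LinkedHashMap<>();
        original.put("topic", "test");
        original.put("kafka.metadata.broker.list", "localhost:9091,localhost:9092");
        try {
            toKafkaProps(original);
        } catch (IllegalArgumentException ex) {
            System.out.println("original config rejected: " + ex.getMessage());
        }

        // Shape of the corrected config: brokerList as a sink-level parameter.
        Map<String, String> fixed = new LinkedHashMap<>();
        fixed.put("topic", "test");
        fixed.put("brokerList", "localhost:9091,localhost:9092");
        fixed.put("kafka.producer.type", "sync");
        System.out.println("fixed config accepted: " + toKafkaProps(fixed));
    }
}
```

In this sketch the first map fails the same way the sink did in the log, because the lookup is by the plain name brokerList, while the second map passes and carries the extra kafka. setting through to the client properties.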




Thanks,  Hari

On Wed, Mar 25, 2015 at 12:41 PM, Adam Tannir <atannir@gmail.com> wrote:

> Thanks, Hari. Everything works as it should now.
> I had tried a few other configuration entries previously but they all had
> kafka after the sink name.
> On Wed, Mar 25, 2015 at 3:15 PM, Hari Shreedharan <hshreedharan@cloudera.com> wrote:
> > Set this param: a1.sinks.k1.brokerList = <list of brokers>
> > instead of a1.sinks.k1.kafka.metadata.broker.list = localhost:9091,localhost:9092
> > 
> > 
> > Thanks,
> > Hari
> > 
> > On Wed, Mar 25, 2015 at 12:02 PM, Adam Tannir <atannir@gmail.com> wrote:
> > 
> > > Hello,
> > > 
> > > When running Flume with Kafka as a sink, an error is logged that
> > > "brokerList must contain at least one Kafka broker", but the line
> > > immediately before it shows the host:port entries as they were entered in
> > > the config file and stored in the context.
> > > 
> > > Everything works when I hardcode the host:port into the brokerList string
> > > and skip the failing check, but that is a suboptimal solution. The Kafka
> > > instances are from the Kafka quickstart guide and have no issues.
> > > 
> > > Why isn't the value being selected from the context?
> > > 
> > > 
> > > flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkUtil.java:
> > >  
> > >   private static void addDocumentedKafkaProps(Context context,
> > >                                               Properties kafkaProps)
> > >           throws ConfigurationException {
> > >     String brokerList = context.getString(
> > >         KafkaSinkConstants.BROKER_LIST_FLUME_KEY);
> > >     if (brokerList == null) {
> > >       throw new ConfigurationException("brokerList must contain at least " +
> > >           "one Kafka broker");
> > >     }
> > >     kafkaProps.put(KafkaSinkConstants.BROKER_LIST_KEY, brokerList);
> > > 
> > >     String requiredKey = context.getString(
> > >         KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY);
> > > 
> > >     if (requiredKey != null) {
> > >       kafkaProps.put(KafkaSinkConstants.REQUIRED_ACKS_KEY, requiredKey);
> > >     }
> > >   }
> > > 
> > > 
> > > 
> > > Config:
> > > 
> > > # Describe the sink
> > > a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
> > > a1.sinks.k1.kafka.metadata.broker.list = localhost:9091,localhost:9092
> > > a1.sinks.k1.kafka.zookeeper.connect = localhost:2181
> > > a1.sinks.k1.topic = test
> > > 
> > > logs/flume.log
> > > 
> > > 25 Mar 2015 14:28:52,598 INFO  [conf-file-poller-0] (org.apache.flume.sink.kafka.KafkaSinkUtil.getKafkaProperties:34)  - context={ parameters:{topic=test, kafka.metadata.broker.list=localhost:9091,localhost:9092, kafka.zookeeper.connect=localhost:2181, type=org.apache.flume.sink.kafka.KafkaSink, channel=c1} }
> > > 25 Mar 2015 14:28:52,611 ERROR [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadSinks:427)  - Sink k1 has been removed due to an error during configuration
> > > org.apache.flume.conf.ConfigurationException: brokerList must contain at least one Kafka broker
> > >     at org.apache.flume.sink.kafka.KafkaSinkUtil.addDocumentedKafkaProps(KafkaSinkUtil.java:55)
> > >     at org.apache.flume.sink.kafka.KafkaSinkUtil.getKafkaProperties(KafkaSinkUtil.java:37)
> > >     at org.apache.flume.sink.kafka.KafkaSink.configure(KafkaSink.java:211)
> > >     at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
> > >     at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:413)
> > >     at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:98)
> > >     at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
> > >     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> > >     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
> > >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
> > >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> > >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> > >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> > >     at java.lang.Thread.run(Thread.java:745)
> > > 
> > > 
> > > git clone https://github.com/apache/flume.git
> > > mvn compile install -DskipTests
> > > Version 1.6.0-SNAPSHOT from today
> > > 
> > > Thanks!
> > > 
> > 
> > 

