[prev in list] [next in list] [prev in thread] [next in thread] 

List:       flume-user
Subject:    Re: Flume log rolling when you need to do rollups for multiple time zones
From:       Hari Shreedharan <hshreedharan () apache ! org>
Date:       2014-07-29 22:08:20
Message-ID: CAL=PKNBr8wzShq2ZShCeG3Ymyj6qg0hNLExqwx6dQBzXb_Gfrw () mail ! gmail ! com
[Download RAW message or body]

The issue seems to be that it is already the next day when the event
arrives at the agent. You can either move the timestamp interceptor to the
first Flume agent in the pipeline - which reduces the time window in which
this can occur or insert a timestamp header when you create the event (the
header name needs to be "timestamp" with value being
System.currentTimeMillis()). That will guarantee that the event will be in
the correct bucket.

Also, use idleTimeout in the hdfs sink config to reduce the time period a
file waits before it is closed.


On Tue, Jul 29, 2014 at 9:01 AM, Gary Malouf <malouf.gary@gmail.com> wrote:

> Hi Hari,
>
> Below is the config for one of our source-channel-sink combos.  In
> hadoop/spark world, how do you then handle the events that arrive late to
> the bucket?  That is, events for July 15 UTC end up in the July 16 bucket.
>  The ugly way I have been handling this to date is that for any query for a
> particular day I attempt to grab the next UTC day in addition to the
> current one (per my path config in previous email).
>
>
> agent-1.channels.imp-ch1.type = file
> agent-1.channels.imp-ch1.checkpointDir = /opt/flume/imp-ch1/checkpoint
> agent-1.channels.imp-ch1.dataDirs = /opt/flume/imp-ch1/data
> agent-1.channels.imp-ch1.capacity = 100000000
>
> agent-1.sources.avro-imp-source1.channels = imp-ch1
> agent-1.sources.avro-imp-source1.type = avro
> agent-1.sources.avro-imp-source1.bind = 0.0.0.0
> agent-1.sources.avro-imp-source1.port = 41414
>
> agent-1.sources.avro-imp-source1.interceptors = host1 timestamp1
> agent-1.sources.avro-imp-source1.interceptors.host1.type = host
> agent-1.sources.avro-imp-source1.interceptors.host1.useIP = false
> agent-1.sources.avro-imp-source1.interceptors.timestamp1.type = timestamp
>
>
> agent-1.sinks.hdfs-imp-sink1.channel = imp-ch1
> agent-1.sinks.hdfs-imp-sink1.type = hdfs
> agent-1.sinks.hdfs-imp-sink1.hdfs.path =
> hdfs://nn-01:8020/impressions/yr=%Y/mo=%m/d=%d/
> agent-1.sinks.hdfs-imp-sink1.hdfs.filePrefix = %{host}
> agent-1.sinks.hdfs-imp-sink1.hdfs.batchSize = 25
> agent-1.sinks.hdfs-imp-sink1.hdfs.rollInterval = 3600
> agent-1.sinks.hdfs-imp-sink1.hdfs.rollCount = 0
> agent-1.sinks.hdfs-imp-sink1.hdfs.rollSize = 0
>
>
>
>
> On Tue, Jul 29, 2014 at 11:17 AM, Hari Shreedharan <
> hshreedharan@cloudera.com> wrote:
>
>> Can you send your config? There are a couple of params that allow the
>> files to be rolled faster - idleTimeout and rollInterval. I am assuming you
>> are using rollInterval already. idleTimeout will close a file when it is
>> not written to for the configured time. That might help with the rolling.
>> Remember though that if events arrive "late" for a bucket due to failures
>> or network issues, new files will be opened in that bucket if none are
>> currently open.
>>
>>
>> On Tue, Jul 29, 2014 at 7:29 AM, Gary Malouf <malouf.gary@gmail.com>
>> wrote:
>>
>>> We are an ad tech company that buys and sells digital media.  To date,
>>> we have been using Apache Flume 1.4.x to ingest all of our bid request,
>>> response, impression and attribution data.
>>>
>>> The logs currently 'roll' hourly for each data type, meaning that at
>>> some point during each hour (if Flume is behaving) the tmp file in HDFS is
>>> closed/renamed with a new one being opened.  This is done for each of 5
>>> running Flume instances.
>>>
>>> One problem that has been a challenge to date is effectively bounding
>>> our data queries to make sure we capture all of the data for a given
>>> interval without pulling in the world.  To date, our structure (all in UTC)
>>> for each data type is:
>>>
>>> /datatype/yr=2014/mo=06/d=15/{files}
>>>
>>> The challenge for us is that Flume is not perfect.
>>>
>>> 1) It can and will often write data that came in on the new UTC day into
>>> the previous one if that log file has not rolled yet.
>>>
>>> 2) Since it does not roll perfectly at the top of each hour, we are
>>> having trouble determining the best way to tightly bound a query for data
>>> that is within a few [3-6] hour window properly.
>>>
>>> 3) When we are doing data rollups in timezones other than UTC, we end up
>>> reading in all of the data for both UTC containing that data to be on the
>>> safe-side.  It would be nice to bound this as described in (2).
>>>
>>>
>>> One of the major problems affecting the first two cases is that Flume
>>> sometimes gets 'stuck' - that is, the data will hang out in the file
>>> channel for longer than we anticipate.
>>>
>>> Anyway, I was just wondering how others have approached these problems
>>> to date.  If not for the edge cases when data can get stuck in Flume, I
>>> think this would be straightforward.
>>>
>>>
>>
>

[Attachment #3 (text/html)]

<div dir="ltr"><div>The issue seems to be that it is already the next day when the \
event arrives at the agent. You can either move the timestamp interceptor to the \
first Flume agent in the pipeline - which reduces the time window in which this can \
occur or insert a timestamp header when you create the event (the header name needs \
to be &quot;timestamp&quot; with value being System.currentTimeMillis()). That will \
guarantee that the event will be in the correct bucket.<br> \
</div><div><br></div><div>Also, use idleTimeout in the hdfs sink config to reduce the \
time period a file waits before it is closed.</div><div \
class="gmail_extra"><br><br><div class="gmail_quote">On Tue, Jul 29, 2014 at 9:01 AM, \
Gary Malouf <span dir="ltr">&lt;<a href="mailto:malouf.gary@gmail.com" \
target="_blank">malouf.gary@gmail.com</a>&gt;</span> wrote:<br> <blockquote \
class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc \
solid;padding-left:1ex"><div dir="ltr">Hi Hari,<div><br></div><div>Below is the \
config for one of our source-channel-sink combos.   In hadoop/spark world, how do you \
then handle the events that arrive late to the bucket?   That is, events for July 15 \
UTC end up in the July 16 bucket.   The ugly way I have been handling this to date is \
that for any query for a particular day I attempt to grab the next UTC day in \
addition to the current one (per my path config in previous email).</div>

<div><br></div><div><br></div><div><div>agent-1.channels.imp-ch1.type = \
file</div><div>agent-1.channels.imp-ch1.checkpointDir = \
/opt/flume/imp-ch1/checkpoint</div><div>agent-1.channels.imp-ch1.dataDirs = \
/opt/flume/imp-ch1/data</div>

<div>agent-1.channels.imp-ch1.capacity = \
100000000</div><div><br></div><div>agent-1.sources.avro-imp-source1.channels = \
imp-ch1</div><div>agent-1.sources.avro-imp-source1.type = \
avro</div><div>agent-1.sources.avro-imp-source1.bind = 0.0.0.0</div>

<div>agent-1.sources.avro-imp-source1.port = \
41414</div><div><br></div><div>agent-1.sources.avro-imp-source1.interceptors = host1 \
timestamp1</div><div>agent-1.sources.avro-imp-source1.interceptors.host1.type = \
host</div> <div>
agent-1.sources.avro-imp-source1.interceptors.host1.useIP = \
false</div><div>agent-1.sources.avro-imp-source1.interceptors.timestamp1.type = \
timestamp</div><div><br></div><div><br></div><div>agent-1.sinks.hdfs-imp-sink1.channel \
= imp-ch1</div>

<div>agent-1.sinks.hdfs-imp-sink1.type = \
hdfs</div><div>agent-1.sinks.hdfs-imp-sink1.hdfs.path = \
hdfs://nn-01:8020/impressions/yr=%Y/mo=%m/d=%d/</div><div>agent-1.sinks.hdfs-imp-sink1.hdfs.filePrefix \
= %{host}</div><div> agent-1.sinks.hdfs-imp-sink1.hdfs.batchSize = 25</div>
<div>agent-1.sinks.hdfs-imp-sink1.hdfs.rollInterval = \
3600</div><div>agent-1.sinks.hdfs-imp-sink1.hdfs.rollCount = \
0</div><div>agent-1.sinks.hdfs-imp-sink1.hdfs.rollSize = \
0</div></div><div><br></div><div><br></div></div> <div class="HOEnZb"><div \
class="h5"> <div class="gmail_extra"><br><br><div class="gmail_quote">On Tue, Jul 29, \
2014 at 11:17 AM, Hari Shreedharan <span dir="ltr">&lt;<a \
href="mailto:hshreedharan@cloudera.com" \
target="_blank">hshreedharan@cloudera.com</a>&gt;</span> wrote:<br>

<blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc \
solid;padding-left:1ex"><div dir="ltr">Can you send your config? There are a couple \
of params that allow the files to be rolled faster - idleTimeout and rollInterval. I \
am assuming you are using rollInterval already. idleTimeout will close a file when it \
is not written to for the configured time. That might help with the rolling. Remember \
though that if events arrive &quot;late&quot; for a bucket due to failures or network \
issues, new files will be opened in that bucket if none are currently open.</div>

<div><div>

<div class="gmail_extra"><br><br><div class="gmail_quote">On Tue, Jul 29, 2014 at \
7:29 AM, Gary Malouf <span dir="ltr">&lt;<a href="mailto:malouf.gary@gmail.com" \
target="_blank">malouf.gary@gmail.com</a>&gt;</span> wrote:<br>



<blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc \
solid;padding-left:1ex"><div dir="ltr">We are an ad tech company that buys and sells \
digital media.   To date, we have been using Apache Flume 1.4.x to ingest all of our \
bid request, response, impression and attribution data.   <div>



<br></div><div>
The logs currently &#39;roll&#39; hourly for each data type, meaning that at some \
point during each hour (if Flume is behaving) the tmp file in HDFS is closed/renamed \
with a new one being opened.   This is done for each of 5 running Flume \
instances.</div>




<div><br></div><div>One problem that has been a challenge to date is effectively \
bounding our data queries to make sure we capture all of the data for a given \
interval without pulling in the world.   To date, our structure (all in UTC) for each \
data type is:</div>




<div><br></div><div>/datatype/yr=2014/mo=06/d=15/{files}</div><div><br></div><div>The \
challenge for us is that Flume is not perfect.   </div><div><br></div><div>1) It can \
and will often write data that came in on the new UTC day into the previous one if \
that log file has not rolled yet.</div>




<div><br></div><div>2) Since it does not roll perfectly at the top of each hour, we \
are having trouble determining the best way to tightly bound a query for data that is \
within a few [3-6] hour window properly.   </div><div>




<br></div><div>3) When we are doing data rollups in timezones other than UTC, we end \
up reading in all of the data for both UTC containing that data to be on the \
safe-side.   It would be nice to bound this as described in (2).</div>




<div><br></div><div><br></div><div>One of the major problems affecting the first two \
cases is that Flume sometimes gets &#39;stuck&#39; - that is, the data will hang out \
in the file channel for longer than we anticipate.</div>




<div><br></div><div>Anyway, I was just wondering how others have approached these \
problems to date.   If not for the edge cases when data can get stuck in Flume, I \
think this would be straightforward.</div><div><br></div>



</div>
</blockquote></div><br></div>
</div></div></blockquote></div><br></div>
</div></div></blockquote></div><br></div></div>



[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic