Thursday, July 30, 2015

Real-time LOG streaming to HDFS with FLUME

Hello friends,

I'm continuing to learn Hadoop through examples :)

In this post I will show how to push streaming data into HDFS using the FLUME tool. With Flume we can quickly move streaming data, for example the real-time contents of a LOG file, into HDFS. Afterwards we can view the data with query tools and analyze it with Map/Reduce jobs.


First, I would like to say a few words about FLUME.

Flume's logic is quite simple: it makes it easy to take data coming from a source and deliver it to a target location that we define as a sink. The source can be anything that produces streaming data, for example a web server log, social media data, network data, and so on. Incoming data is transferred in bulk to the location defined as the sink. A sink's output does not have to be HDFS; by building a custom setup we can even feed it into another source.

You can explore what can be used as a source, channel, and sink in the Flume User Guide (linked at the end of this post).
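
As a tiny sketch of this source -> channel -> sink wiring (separate from the HDFS example later in this post; the agent and component names here are made up purely for illustration), a config like the following would read lines arriving on a TCP port via the netcat source and simply print them through the logger sink:

# Minimal sketch: netcat source -> memory channel -> logger sink
demoagent.sources = src1
demoagent.channels = ch1
demoagent.sinks = snk1

# netcat source: listens on a TCP port and turns each incoming line into an event
demoagent.sources.src1.type = netcat
demoagent.sources.src1.bind = localhost
demoagent.sources.src1.port = 44444

# memory channel buffers events between source and sink
demoagent.channels.ch1.type = memory

# logger sink just logs each event (handy for testing)
demoagent.sinks.snk1.type = logger

# wire the source and sink to the channel
demoagent.sources.src1.channels = ch1
demoagent.sinks.snk1.channel = ch1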

Flume command line:

$flume-ng help
Usage: /opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/bin/../lib/flume-ng/bin/flume-ng <command> [options]...

commands:
  help                      display this help text
  agent                     run a Flume agent
  avro-client               run an avro Flume client
  version                   show Flume version info

global options:
  --conf,-c <conf>          use configs in <conf> directory
  --classpath,-C <cp>       append to the classpath
  --dryrun,-d               do not actually start Flume, just print the command
  --plugins-path <dirs>     colon-separated list of plugins.d directories. See the
                            plugins.d section in the user guide for more details.
                            Default: $FLUME_HOME/plugins.d
  -Dproperty=value          sets a Java system property value
  -Xproperty=value          sets a Java -X option

agent options:
  --name,-n <name>          the name of this agent (required)
  --conf-file,-f <file>     specify a config file (required if -z missing)
  --zkConnString,-z <str>   specify the ZooKeeper connection to use (required if -f missing)
  --zkBasePath,-p <path>    specify the base path in ZooKeeper for agent configs
  --no-reload-conf          do not reload config file if changed
  --help,-h                 display help text

avro-client options:
  --rpcProps,-P <file>   RPC client properties file with server connection params
  --host,-H <host>       hostname to which events will be sent
  --port,-p <port>       port of the avro source
  --dirname <dir>        directory to stream to avro source
  --filename,-F <file>   text file to stream to avro source (default: std input)
  --headerFile,-R <file> File containing event headers as key/value pairs on each new line
  --help,-h              display help text

  Either --rpcProps or both --host and --port must be specified.

Note that if <conf> directory is specified, then it is always included first
in the classpath.

To keep the example simple, I will demonstrate shipping lines appended to a text file into HDFS through a Flume agent. Creating the configuration file for Flume is quite straightforward. The conf-file we will use:

$cat exampleFlumeTestHdfs.conf 

# Name the components on this agent
testagent.sources = source1
testagent.sinks = sink1
testagent.channels = channel1

# Describe/configure the source
testagent.sources.source1.type = exec
testagent.sources.source1.command = tail -f test.txt

# Describe the sink
testagent.sinks.sink1.type = hdfs
testagent.sinks.sink1.hdfs.path=hdfs://DATANODE#/user/oracle/flumetest
testagent.sinks.sink1.hdfs.fileType=DataStream


# Use a channel which buffers events in memory
testagent.channels.channel1.type = memory
testagent.channels.channel1.capacity = 1000
testagent.channels.channel1.transactionCapacity = 100

# Bind the source and sink to the channel
testagent.sources.source1.channels = channel1
testagent.sinks.sink1.channel = channel1
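
One note on the sink settings above: I am relying on the HDFS sink's default roll behavior here (as far as I know it rolls to a new file roughly every 30 seconds, 1024 bytes, or 10 events), which is why the agent ends up producing many small FlumeData files later in this post. If larger, less frequent files are preferred, the roll parameters can be tuned; the values below are only illustrative:

# roll a new HDFS file every 10 minutes or ~128 MB, and never by event count
testagent.sinks.sink1.hdfs.rollInterval = 600
testagent.sinks.sink1.hdfs.rollSize = 134217728
testagent.sinks.sink1.hdfs.rollCount = 0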

Now we start the agent. A lot of library loading will scroll past on the screen, don't let it scare you :)

$flume-ng agent --conf conf --conf-file exampleFlumeTestHdfs.conf --name testagent 

...
Info: Including Hadoop libraries found via (/usr/bin/hadoop) for HDFS access
Info: Excluding /opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/lib/hadoop/libexec/../../hadoop/lib/slf4j-api-1.7.5.jar from classpath
Info: Excluding /opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/lib/hadoop/libexec/../../hadoop/lib/slf4j-log4j12.jar from classpath
Info: Including HBASE libraries found via (/usr/bin/hbase) for HBASE access
Info: Excluding /opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/lib/hbase/bin/../lib/slf4j-api-1.7.5.jar from classpath
Info: Excluding /opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/lib/hbase/bin/../lib/slf4j-log4j12.jar from classpath
Info: Excluding /opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/lib/hadoop/libexec/../../hadoop/lib/slf4j-api-1.7.5.jar from classpath

Info: Excluding /opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/lib/hadoop/libexec/../../hadoop/lib/slf4j-log4j12.jar from classpath
.....
...
.....
15/07/30 16:49:54 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
15/07/30 16:49:54 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:exampleFlumeTestHdfs.conf
15/07/30 16:49:54 INFO conf.FlumeConfiguration: Processing:sink1
15/07/30 16:49:54 INFO conf.FlumeConfiguration: Processing:sink1
15/07/30 16:49:54 INFO conf.FlumeConfiguration: Added sinks: sink1 Agent: testagent
15/07/30 16:49:54 INFO conf.FlumeConfiguration: Processing:sink1
15/07/30 16:49:55 INFO conf.FlumeConfiguration: Processing:sink1
15/07/30 16:49:55 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [testagent]
15/07/30 16:49:55 INFO node.AbstractConfigurationProvider: Creating channels
15/07/30 16:49:55 INFO channel.DefaultChannelFactory: Creating instance of channel channel1 type memory
15/07/30 16:49:55 INFO node.AbstractConfigurationProvider: Created channel channel1
15/07/30 16:49:55 INFO source.DefaultSourceFactory: Creating instance of source source1, type exec
15/07/30 16:49:55 INFO sink.DefaultSinkFactory: Creating instance of sink: sink1, type: hdfs
15/07/30 16:49:55 INFO node.AbstractConfigurationProvider: Channel channel1 connected to [source1, sink1]
15/07/30 16:49:55 INFO node.Application: Starting new configuration:{ sourceRunners:{source1=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:source1,state:IDLE} }} sinkRunners:{sink1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@7ae62ad4 counterGroup:{ name:null counters:{} } }} channels:{channel1=org.apache.flume.channel.MemoryChannel{name: channel1}} }
15/07/30 16:49:55 INFO node.Application: Starting Channel channel1
15/07/30 16:49:55 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: channel1: Successfully registered new MBean.
15/07/30 16:49:55 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: channel1 started
15/07/30 16:49:55 INFO node.Application: Starting Sink sink1
15/07/30 16:49:55 INFO node.Application: Starting Source source1
15/07/30 16:49:55 INFO source.ExecSource: Exec source starting with command:tail -f test.txt
15/07/30 16:49:55 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: sink1: Successfully registered new MBean.
15/07/30 16:49:55 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: sink1 started
15/07/30 16:49:55 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: source1: Successfully registered new MBean.
15/07/30 16:49:55 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: source1 started
15/07/30 16:49:55 INFO source.ExecSource: Command [tail -f test.txt] exited with 1

At this point Flume just sits and waits. In a separate session we can append data to test.txt and watch it being tailed into the agent. One thing I noticed during this test: if test.txt does not exist beforehand, Flume does not pick it up when it is created later (which is probably why the log above ends with the tail command exiting with status 1). So test.txt should be created before the Flume agent is started.
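
In the second session I fed test.txt with a few simple echo lines; the content here is only an example:

$echo "first test line" >> test.txt
$echo "second test line" >> test.txt
$echo "third test line" >> test.txt

As a possible workaround for the file-must-exist behavior (I have not tried it here), GNU tail's -F option, unlike -f, keeps retrying until the file appears, so the source command could be changed to:

# possible alternative (untested here): follow by name and retry until the file exists
testagent.sources.source1.command = tail -F test.txt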

As we push data in, the following happens on the Flume side:

15/07/30 16:54:49 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: sink1: Successfully registered new MBean.
15/07/30 16:54:49 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: sink1 started
15/07/30 16:54:49 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: source1: Successfully registered new MBean.
15/07/30 16:54:49 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: source1 started
15/07/30 16:54:53 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
15/07/30 16:54:53 INFO hdfs.BucketWriter: Creating hdfs://DATANODE#/user/oracle/flumetest/FlumeData.1438264493696.tmp
15/07/30 16:55:07 INFO hdfs.BucketWriter: Closing hdfs://DATANODE#/user/oracle/flumetest/FlumeData.1438264493696.tmp
15/07/30 16:55:07 INFO hdfs.BucketWriter: Renaming hdfs://DATANODE#/user/oracle/flumetest/FlumeData.1438264493696.tmp to hdfs://DATANODE#/user/oracle/flumetest/FlumeData.1438264493696
15/07/30 16:55:07 INFO hdfs.BucketWriter: Creating hdfs://DATANODE#/user/oracle/flumetest/FlumeData.1438264493697.tmp
15/07/30 16:55:10 INFO hdfs.BucketWriter: Closing hdfs://DATANODE#/user/oracle/flumetest/FlumeData.1438264493697.tmp
15/07/30 16:55:10 INFO hdfs.BucketWriter: Renaming hdfs://DATANODE#/user/oracle/flumetest/FlumeData.1438264493697.tmp to hdfs://DATANODE#/user/oracle/flumetest/FlumeData.1438264493697
15/07/30 16:55:10 INFO hdfs.BucketWriter: Creating hdfs://DATANODE#/user/oracle/flumetest/FlumeData.1438264493698.tmp
15/07/30 16:55:13 INFO hdfs.BucketWriter: Closing hdfs://DATANODE#/user/oracle/flumetest/FlumeData.1438264493698.tmp
15/07/30 16:55:13 INFO hdfs.BucketWriter: Renaming hdfs://DATANODE#/user/oracle/flumetest/FlumeData.1438264493698.tmp to hdfs://DATANODE#/user/oracle/flumetest/FlumeData.1438264493698
15/07/30 16:55:13 INFO hdfs.BucketWriter: Creating hdfs://DATANODE#/user/oracle/flumetest/FlumeData.1438264493699.tmp
....


Meanwhile, when we look at HDFS we can see the data being written. (Note in the log above that each file is first created with a .tmp suffix and is renamed to its final name when it is closed.)

$hadoop fs -ls /user/oracle/flumetest

-rw-r--r--   3 oracle hadoop        142 2015-07-30 16:55 /user/oracle/flumetest/FlumeData.1438264493696
-rw-r--r--   3 oracle hadoop        150 2015-07-30 16:55 /user/oracle/flumetest/FlumeData.1438264493697
-rw-r--r--   3 oracle hadoop        150 2015-07-30 16:55 /user/oracle/flumetest/FlumeData.1438264493698
-rw-r--r--   3 oracle hadoop         15 2015-07-30 16:55 /user/oracle/flumetest/FlumeData.1438264493699
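
To take a quick look at one of these files (the timestamps in the file names will of course differ in your run):

$hadoop fs -cat /user/oracle/flumetest/FlumeData.1438264493696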

Once the data has landed in HDFS, the contents can be viewed with cat, as in the example above. Flume lets us use many streaming sources (Avro, Thrift, Syslog, Netcat, ...), and sinks such as Hive, HBase, and logger are also available. I will try to work through examples of these in upcoming posts.

Source:
https://flume.apache.org/FlumeUserGuide.html

Have a good day.



