Sunday, May 29, 2016

[Big Data] FLAFKA & 2 Topics - 2 HDFS Sinks on a Kerberos-Secured Cluster

Hi All,

After a long time, I started to complete my drafts :)

Here I will show an example of Flafka, which uses Kafka as the topic (message) receiver and writes the data to HDFS via Flume. After that, I will query the logs via a Hive external table.




HINT: For sample message generation, you can use the Kafka console producer (shown below).

>> IN THIS WORK, I ASSUME YOU HAVE KAFKA & FLUME INSTALLED AND RUNNING ON A KERBEROS-SECURED CLOUDERA DISTRIBUTION.

So, let's talk about the Flafka architecture.

Flafka is the combination and joint configuration of Kafka and Flume for end-to-end message delivery.

As you can see in the image from the Cloudera page, Kafka can be placed in front of the Flume agents, and more detailed (and less simple) configurations can be built.
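
Per topic, the data flow in this post is roughly the following (a simplified sketch, using the placeholder names from the configuration below):

Kafka #TOPIC_1#  ->  KafkaSource #SOURCE_FOR_TOPIC_1#  ->  memory channel #CHANNEL_FOR_TOPIC_1#  ->  HDFS sink #HDFS_SINK_FOR_TOPIC_1#  ->  #HDFS_PATH_FOR_TOPIC_1#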

Here, in my example, I will produce two different message streams (topics) at the source and I want them written to different HDFS paths.

Message X must be written to HDFS/hdfs_x_path
Message Y must be written to HDFS/hdfs_y_path

And I want only one agent configuration for this purpose. Let's see how to do the Flafka configuration:

# Naming the components on the current agent
#FLAFKA_AGENT#.sources = #SOURCE_FOR_TOPIC_1# #SOURCE_FOR_TOPIC_2#
#FLAFKA_AGENT#.channels = #CHANNEL_FOR_TOPIC_1# #CHANNEL_FOR_TOPIC_2#
#FLAFKA_AGENT#.sinks = #HDFS_SINK_FOR_TOPIC_1# #HDFS_SINK_FOR_TOPIC_2#

# Describing/Configuring the source 
#FLAFKA_AGENT#.sources.#SOURCE_FOR_TOPIC_1#.type = org.apache.flume.source.kafka.KafkaSource 
#FLAFKA_AGENT#.sources.#SOURCE_FOR_TOPIC_1#.channels = #CHANNEL_FOR_TOPIC_1#
#FLAFKA_AGENT#.sources.#SOURCE_FOR_TOPIC_1#.zookeeperConnect = #ZOOKEPER_HOST_1#:#ZOOKEPER_PORT#,#ZOOKEPER_HOST_2#:#ZOOKEPER_PORT#,#ZOOKEPER_HOST_3#:#ZOOKEPER_PORT#
#FLAFKA_AGENT#.sources.#SOURCE_FOR_TOPIC_1#.topic = #TOPIC_1#
#FLAFKA_AGENT#.sources.#SOURCE_FOR_TOPIC_1#.groupId = #GROUP_FOR_TOPIC_1#

#FLAFKA_AGENT#.sources.#SOURCE_FOR_TOPIC_2#.type = org.apache.flume.source.kafka.KafkaSource 
#FLAFKA_AGENT#.sources.#SOURCE_FOR_TOPIC_2#.channels = #CHANNEL_FOR_TOPIC_2#
#FLAFKA_AGENT#.sources.#SOURCE_FOR_TOPIC_2#.zookeeperConnect = #ZOOKEPER_HOST_1#:#ZOOKEPER_PORT#,#ZOOKEPER_HOST_2#:#ZOOKEPER_PORT#,#ZOOKEPER_HOST_3#:#ZOOKEPER_PORT#
#FLAFKA_AGENT#.sources.#SOURCE_FOR_TOPIC_2#.topic = #TOPIC_2#
#FLAFKA_AGENT#.sources.#SOURCE_FOR_TOPIC_2#.groupId = #GROUP_FOR_TOPIC_2#

# Describing/Configuring the sink 
#FLAFKA_AGENT#.sinks.#HDFS_SINK_FOR_TOPIC_1#.type = hdfs
#FLAFKA_AGENT#.sinks.#HDFS_SINK_FOR_TOPIC_1#.channel = #CHANNEL_FOR_TOPIC_1#
#FLAFKA_AGENT#.sinks.#HDFS_SINK_FOR_TOPIC_1#.hdfs.useLocalTimeStamp=true
#FLAFKA_AGENT#.sinks.#HDFS_SINK_FOR_TOPIC_1#.hdfs.path = #HDFS_PATH_FOR_TOPIC_1#
#FLAFKA_AGENT#.sinks.#HDFS_SINK_FOR_TOPIC_1#.hdfs.rollInterval = 36
#FLAFKA_AGENT#.sinks.#HDFS_SINK_FOR_TOPIC_1#.hdfs.rollSize = 0
#FLAFKA_AGENT#.sinks.#HDFS_SINK_FOR_TOPIC_1#.hdfs.rollCount = 0
#FLAFKA_AGENT#.sinks.#HDFS_SINK_FOR_TOPIC_1#.hdfs.filePrefix = #PREFIX_FOR_TOPIC_1#_%Y%m%d
#FLAFKA_AGENT#.sinks.#HDFS_SINK_FOR_TOPIC_1#.hdfs.fileSuffix = .log
#FLAFKA_AGENT#.sinks.#HDFS_SINK_FOR_TOPIC_1#.hdfs.inUseSuffix = .incomplete
#FLAFKA_AGENT#.sinks.#HDFS_SINK_FOR_TOPIC_1#.hdfs.fileType=DataStream
#FLAFKA_AGENT#.sinks.#HDFS_SINK_FOR_TOPIC_1#.hdfs.writeFormat=Text
#FLAFKA_AGENT#.sinks.#HDFS_SINK_FOR_TOPIC_1#.hdfs.kerberosPrincipal = #KERBEROS_PRINCIPAL#
#FLAFKA_AGENT#.sinks.#HDFS_SINK_FOR_TOPIC_1#.hdfs.kerberosKeytab = #KERBEROS_KEYTAB#
#FLAFKA_AGENT#.sinks.#HDFS_SINK_FOR_TOPIC_1#.hdfs.proxyUser = oracle
                      
                                                                       
#FLAFKA_AGENT#.sinks.#HDFS_SINK_FOR_TOPIC_2#.type = hdfs
#FLAFKA_AGENT#.sinks.#HDFS_SINK_FOR_TOPIC_2#.channel = #CHANNEL_FOR_TOPIC_2#
#FLAFKA_AGENT#.sinks.#HDFS_SINK_FOR_TOPIC_2#.hdfs.useLocalTimeStamp=true
#FLAFKA_AGENT#.sinks.#HDFS_SINK_FOR_TOPIC_2#.hdfs.path = #HDFS_PATH_FOR_TOPIC_2#
#FLAFKA_AGENT#.sinks.#HDFS_SINK_FOR_TOPIC_2#.hdfs.rollInterval = 36
#FLAFKA_AGENT#.sinks.#HDFS_SINK_FOR_TOPIC_2#.hdfs.rollSize = 0
#FLAFKA_AGENT#.sinks.#HDFS_SINK_FOR_TOPIC_2#.hdfs.rollCount = 0
#FLAFKA_AGENT#.sinks.#HDFS_SINK_FOR_TOPIC_2#.hdfs.filePrefix = #PREFIX_FOR_TOPIC_2#_%Y%m%d
#FLAFKA_AGENT#.sinks.#HDFS_SINK_FOR_TOPIC_2#.hdfs.fileSuffix = .log
#FLAFKA_AGENT#.sinks.#HDFS_SINK_FOR_TOPIC_2#.hdfs.inUseSuffix = .incomplete
#FLAFKA_AGENT#.sinks.#HDFS_SINK_FOR_TOPIC_2#.hdfs.fileType=DataStream
#FLAFKA_AGENT#.sinks.#HDFS_SINK_FOR_TOPIC_2#.hdfs.writeFormat=Text
#FLAFKA_AGENT#.sinks.#HDFS_SINK_FOR_TOPIC_2#.hdfs.kerberosPrincipal = #KERBEROS_PRINCIPAL#
#FLAFKA_AGENT#.sinks.#HDFS_SINK_FOR_TOPIC_2#.hdfs.kerberosKeytab = #KERBEROS_KEYTAB#
#FLAFKA_AGENT#.sinks.#HDFS_SINK_FOR_TOPIC_2#.hdfs.proxyUser = oracle


# Describing/Configuring the channel 
#FLAFKA_AGENT#.channels.#CHANNEL_FOR_TOPIC_1#.type = memory 
#FLAFKA_AGENT#.channels.#CHANNEL_FOR_TOPIC_1#.capacity = 1000 
#FLAFKA_AGENT#.channels.#CHANNEL_FOR_TOPIC_1#.transactionCapacity = 100 

#FLAFKA_AGENT#.channels.#CHANNEL_FOR_TOPIC_2#.type = memory 
#FLAFKA_AGENT#.channels.#CHANNEL_FOR_TOPIC_2#.capacity = 1000 
#FLAFKA_AGENT#.channels.#CHANNEL_FOR_TOPIC_2#.transactionCapacity = 100 

# Bind the sources to the channels (the sinks were already bound above via their channel property)
#FLAFKA_AGENT#.sources.#SOURCE_FOR_TOPIC_1#.channels = #CHANNEL_FOR_TOPIC_1#
#FLAFKA_AGENT#.sources.#SOURCE_FOR_TOPIC_2#.channels = #CHANNEL_FOR_TOPIC_2#

The important parts here are:
- Define one source-channel-sink configuration per topic (if a single sink is enough for all topics, you can define only one sink),
- If you are on a Kerberos-secured environment, define the Kerberos principal and keytab on the HDFS sinks,
- Define the source type as the Kafka source (org.apache.flume.source.kafka.KafkaSource).
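
To make the placeholders more concrete, a hypothetical filled-in version of the first topic's pipeline could look like this (the agent, topic, channel and sink names, hosts, paths, and Kerberos values below are made-up examples, not values from this post):

# hypothetical agent "flafka" consuming topics "topic_x" and "topic_y"
flafka.sources = src_x src_y
flafka.channels = ch_x ch_y
flafka.sinks = sink_x sink_y

# topic_x pipeline (repeat the same pattern for topic_y)
flafka.sources.src_x.type = org.apache.flume.source.kafka.KafkaSource
flafka.sources.src_x.zookeeperConnect = zk1:2181,zk2:2181,zk3:2181
flafka.sources.src_x.topic = topic_x
flafka.sources.src_x.groupId = flume_topic_x
flafka.sources.src_x.channels = ch_x

flafka.channels.ch_x.type = memory
flafka.channels.ch_x.capacity = 1000
flafka.channels.ch_x.transactionCapacity = 100

flafka.sinks.sink_x.type = hdfs
flafka.sinks.sink_x.channel = ch_x
flafka.sinks.sink_x.hdfs.path = /user/oracle/topic_x
flafka.sinks.sink_x.hdfs.fileType = DataStream
flafka.sinks.sink_x.hdfs.writeFormat = Text
flafka.sinks.sink_x.hdfs.kerberosPrincipal = flume/my.host@EXAMPLE.COM
flafka.sinks.sink_x.hdfs.kerberosKeytab = /etc/flume-ng/conf/flume.keytab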

Apply this configuration in Cloudera Manager and restart the Flume agent.
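
If you are not using Cloudera Manager, a rough equivalent is to start the agent from the command line (a sketch; flafka.conf is a hypothetical file name holding the configuration above):

$flume-ng agent --conf /etc/flume-ng/conf --conf-file flafka.conf --name #FLAFKA_AGENT# -Dflume.root.logger=INFO,console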

Now let's produce some messages for the Kafka topics and see how it works.
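
If the topics do not exist yet, you can create them first (a sketch; the partition and replication counts are just example values):

$kafka-topics --create --zookeeper #ZOOKEPER_ADDRESS# --replication-factor 1 --partitions 1 --topic #TOPIC_1#
$kafka-topics --create --zookeeper #ZOOKEPER_ADDRESS# --replication-factor 1 --partitions 1 --topic #TOPIC_2#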

For sample message generation we can use the Kafka console producer:

$kafka-console-producer --broker-list #KAFKA_ADDRESSES# --topic #TOPIC_1#
...
sample message
message
asd
dads
sds
..
.
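
Instead of typing interactively, you can also pipe a prepared file of sample lines into the producer (messages.txt is a hypothetical file name):

$cat messages.txt | kafka-console-producer --broker-list #KAFKA_ADDRESSES# --topic #TOPIC_1#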

And we can test whether Kafka properly receives the messages via the Kafka console consumer:
$kafka-console-consumer --zookeeper #ZOOKEPER_ADDRESS# --topic #TOPIC_1#
..
sample message
message
asd
dads
sds
...
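
If you also want to see messages that were produced before the consumer started, you can add the --from-beginning flag:

$kafka-console-consumer --zookeeper #ZOOKEPER_ADDRESS# --topic #TOPIC_1# --from-beginning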

Let's check the HDFS locations to see whether the Flume agent writes the logs or not:


$hadoop fs -ls #HDFS_PATH_FOR_TOPIC_1#
....
-rwxrwxrwx   3 oracle supergroup         47 2016-05-25 18:53 #HDFS_PATH_FOR_TOPIC_1#/#PREFIX_FOR_TOPIC_1#_20160525.1464191590190.log
-rwxrwxrwx   3 oracle supergroup         64 2016-05-25 18:55 #HDFS_PATH_FOR_TOPIC_1#/#PREFIX_FOR_TOPIC_1#_20160525.1464191703561.log
-rwxrwxrwx   3 oracle supergroup         20 2016-05-25 18:59 #HDFS_PATH_FOR_TOPIC_1#/#PREFIX_FOR_TOPIC_1#_20160525.1464191953430.log
-rwxrwxrwx   3 oracle supergroup         10 2016-05-25 19:01 #HDFS_PATH_FOR_TOPIC_1#/#PREFIX_FOR_TOPIC_1#_20160525.1464192095659.log.incomplete
...

Here we see that the operation works well. You can do the same for #TOPIC_2#.
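
To double-check the content, you can also cat one of the closed (non-.incomplete) files; you should see the sample lines produced above:

$hadoop fs -cat #HDFS_PATH_FOR_TOPIC_1#/#PREFIX_FOR_TOPIC_1#_20160525.1464191590190.log
sample message
message
...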

Now let's create a Hive table on top of those files and see what is inside:

For this purpose, connect via Beeline and create an external table:


$beeline
..
0: jdbc:hive2://#HIVE_SERVER#:#HIVE_SERVER_PORT#/default> create external table flafka(item STRING, itemvalue STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '#HDFS_PATH_FOR_TOPIC_1#';
No rows affected (0.038 seconds)

>select * from flafka;
+------------------+------------------+--+
|   flafka.item    | flafka.itemvalue |
+------------------+------------------+--+
| sample message   | NULL             |
| message          | NULL             |
| asd              | NULL             |
| dads             | NULL             |
| sds              | NULL             |
| ...              | ...              |
+------------------+------------------+--+

Here we see that our data can be queried via Hive.

>>> DO NOT FORGET! Create the external table according to the messages you produce; if your data contains ',' characters or numeric values, define a more detailed external table (proper columns, types, and delimiters).
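
For example, if your messages were comma-separated records like id,name,price, a more explicit table could look roughly like this (a sketch; the table name, columns, types, and path are hypothetical):

> create external table flafka_detailed(id INT, name STRING, price DOUBLE) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '#HDFS_PATH_FOR_TOPIC_2#';

If your fields can contain quoted commas, you could also consider Hive's OpenCSVSerde instead of the plain delimited format.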

OK, that is the end of the story. I hope it works for you.

Thanks for reading.

Thanks to Barış Akgün :)

Enjoy & share.
