Flume Hadoop Agent – Spool directory to HDFS

In the previous post, we saw how to write data from a spooling directory to the console log. In this post, we will learn how a Flume agent can write data from a spooling directory to HDFS.

To write the data from the spooling directory to HDFS, we only need to edit the properties file and change the sink configuration to HDFS.

spool-to-hdfs.properties

agent1.sources = datasource1
agent1.sinks = datastore1
agent1.channels = ch1

agent1.sources.datasource1.channels = ch1
agent1.sinks.datastore1.channel = ch1

agent1.sources.datasource1.type = spooldir
agent1.sources.datasource1.spoolDir = /usr/kmayank/spooldir
agent1.sinks.datastore1.type = hdfs
agent1.sinks.datastore1.hdfs.path = /temp/flume
agent1.sinks.datastore1.hdfs.filePrefix = events
agent1.sinks.datastore1.hdfs.fileSuffix = .log
agent1.sinks.datastore1.hdfs.inUsePrefix = _
agent1.sinks.datastore1.hdfs.fileType = DataStream

agent1.channels.ch1.type = file

With the above properties file, files written to HDFS will be named events.<counter>.log: Flume inserts a timestamp-based counter between the configured prefix and suffix. While a file is still being written, it additionally carries the in-use prefix _ so that downstream jobs can ignore incomplete files.

How many events read from the source go into one HDFS file?

By default, the HDFS sink rolls over to a new file every 30 seconds. You can change the interval by setting the hdfs.rollInterval property, whose value is specified in seconds. You can also roll files over by event count (hdfs.rollCount) or by cumulative file size (hdfs.rollSize).
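
For example, here is a minimal sketch of roll settings for the sink defined above; the values are illustrative, not recommendations:

# roll a new file every 10 minutes, after 10,000 events,
# or at roughly 128 MB, whichever comes first
agent1.sinks.datastore1.hdfs.rollInterval = 600
agent1.sinks.datastore1.hdfs.rollCount = 10000
agent1.sinks.datastore1.hdfs.rollSize = 134217728

Setting any of these properties to 0 disables that particular roll trigger.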

The hdfs.fileType property can take three values: SequenceFile, DataStream, or CompressedStream.

The default is SequenceFile, a binary format. The event body is a byte array, and that byte array is written to the sequence file.

It is expected that whoever reads the sequence file knows how to deserialize the binary data back into objects.

DataStream writes the events uncompressed, for example as a plain text file, whereas CompressedStream compresses the output with a codec such as gzip or bzip2.
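
As a small, hedged example, switching the sink above to compressed output involves the hdfs.fileType and hdfs.codeC properties; the gzip codec here is just one possible choice:

agent1.sinks.datastore1.hdfs.fileType = CompressedStream
agent1.sinks.datastore1.hdfs.codeC = gzip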

Now, start the Flume agent using the below command:

$ flume-ng agent \
    --conf-file spool-to-hdfs.properties \
    --name agent1 \
    -Dflume.root.logger=WARN,console

Once the Flume agent is running, start placing files in the spooling directory. The agent will pick them up and write their contents to HDFS.

Once you see that the files in the spooling directory have been renamed with the .COMPLETED suffix, go to HDFS and check whether the files have arrived. Use the below command to list the files in the HDFS directory.

hadoop fs -ls /temp/flume

Use the ‘cat’ command to print the content of the file.

hadoop fs -cat /temp/flume/<file_name>

Apache Flume Agent – Spool directory to logger

In this tutorial, we will learn to set up an Apache Flume agent that monitors a spooling directory.

When log files are added to the directory, the agent reads them and pushes their contents to the console log.

Here the source is the spooling directory and the sink is the console logger; the Apache Flume agent sits between the two.


Configuration of Apache Flume Agent

  1. We need to set up all the options in a “.properties” file.
  2. Use the command line to start a Flume agent.
  3. The command to start a Flume agent is shown below (“ng” stands for next generation).

$ flume-ng agent

  4. Set up the “.properties” file as below.

spool-to-log.properties

agent1.sources = datasource1
agent1.sinks = datastore1
agent1.channels = ch1

agent1.sources.datasource1.channels = ch1
agent1.sinks.datastore1.channel = ch1

agent1.sources.datasource1.type = spooldir
agent1.sources.datasource1.spoolDir = /usr/kmayank/spooldir

agent1.sinks.datastore1.type = logger
agent1.channels.ch1.type = file
  5. Run the Flume agent using the below command:

$ flume-ng agent \
    --conf-file spool-to-log.properties \
    --name agent1 \
    -Dflume.root.logger=WARN,console

The last option, -Dflume.root.logger, is optional. It only controls what logging appears on the screen; here, only warnings (and more severe messages) are printed to the console.

  6. Place a file into the spooling directory. You will notice that after a few seconds the file name gets appended with the .COMPLETED suffix and the data in the file is printed to the screen/console.

It is important to note that any file placed in the spooling directory should be a text file, uniquely named, and immutable (not modified after it is dropped in).

To read files in any other format, you have to write a custom deserializer in Java and plug it into your properties file.
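
For reference, here is a hedged sketch of the relevant spooling directory source properties: the default line-based deserializer can be tuned, or replaced with your own class (the class name below is a hypothetical placeholder):

# default behaviour: one event per line, lines truncated at maxLineLength
agent1.sources.datasource1.deserializer = LINE
agent1.sources.datasource1.deserializer.maxLineLength = 4096

# for a custom format, point to your own EventDeserializer.Builder
# (hypothetical class name, for illustration only)
# agent1.sources.datasource1.deserializer = com.example.MyDeserializer$Builder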

Flume Events in Apache Flume Agent

The data inside an Apache Flume agent is represented as Flume events, the base unit of communication between source, channel, and sink.

A Flume event is a discrete object that represents one record of data that needs to be transported from source to sink.

The source reads the data in the form of Flume events and sends them to the channel, and the sink then reads the events back from the channel.

A Flume event consists of a header and a body, which together represent one record of data. The header carries metadata (key-value pairs) that determines how the event is processed or routed to a channel or sink.

The body is the actual data. Each record is therefore represented by one Flume event.

For example, when the spooling directory source reads a file, each line of the file becomes one event by default. The event body is represented by a byte array. When the source writes this data to the channel, it can write one event or a batch of events at a time.

If the event body exceeds what the channel can hold, Apache Flume won't be able to transport that event. For example, if you want to move a single very large file into HDFS, a direct copy (for instance with hadoop fs -put) would be a better option than Flume.

Flume Installation – Apache Flume Agent

Flume installation is a very simple process. Go through the following steps to install and configure Apache Flume.

Steps for Flume Installation

  1. Download Apache Flume from the Apache webpage. (Apache Flume Download Url)
  2. Extract the downloaded archive and point to the Flume folder in your bash profile. The bash profile entry makes sure that you can start the Flume agent from any directory. For example: export FLUME_HOME=$HOME/apache-flume-1.6.0-bin
  3. Append FLUME_HOME to the PATH variable. For example: export PATH=$PATH:$FLUME_HOME/bin
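
As a quick sanity check (assuming the exports above were added to ~/.bash_profile), you should now be able to run Flume from any directory:

$ source ~/.bash_profile
$ flume-ng version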

Flume Agent

A Flume agent is an independent Java daemon (a JVM process) that receives events (data) from an external source and forwards them to the next destination. The next destination can be another agent or a sink. A Flume agent can connect any number of sources to any number of data stores in big data.


Let’s understand this with an example:


Suppose you have two data sources, say DS1 and DS2, and you want to write DS1 data into HDFS and DS2 data into Cassandra.

For this scenario, one Flume agent is enough to complete the job. A Flume agent performs hop-by-hop transport, and writing the DS1 and DS2 data to HDFS and Cassandra is one complete hop.

Now suppose the data has another hop: the data written to HDFS is read by some other application and finally needs to go to another data store, say Hive.

Here, two Flume agents are required since there are two hops of data: one agent for moving DS1 and DS2 data to HDFS and Cassandra respectively, and another agent for moving data from HDFS to Hive.

There are three basic components of a Flume agent:

  1. The first component receives data. This component is called the source.
  2. The second component buffers data. This component is called the channel.
  3. The third component writes data. This component is called the sink.

A Flume agent is set up using a configuration file, and within that configuration file we configure the sources and the format in which data arrives at each source.

It is important to configure the channel capacity according to the rates of the source and the sink. There are many types of channels, but two of them are most commonly used.

The first is the in-memory channel, which buffers the data in the memory of the system where the Flume agent is running. The in-memory channel basically acts like an in-memory queue.

The source will write to the tail of the queue and the sink will read from the head of the queue.

But there are issues with the memory channel. One primary issue is that its capacity is constrained by the amount of memory the system has. The memory channel is also not persistent, so in the case of a crash,

all the data present in the buffer may be lost. The file channel is better in this respect because it gives you fault tolerance and non-lossy delivery, i.e. a guarantee of no data loss.

Since the data is buffered on disk, a file channel can have a larger buffer capacity as per your requirement. Channels are continuously polled by sink components, which write the data to the endpoints.
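
As a hedged sketch, the two channel types are configured roughly as follows; the channel names, directories, and capacity numbers are illustrative placeholders, not recommendations:

# in-memory channel: fast, but bounded by RAM and lost on a crash
agent1.channels.memch.type = memory
agent1.channels.memch.capacity = 10000
agent1.channels.memch.transactionCapacity = 1000

# file channel: buffered on disk, survives an agent restart
agent1.channels.filech.type = file
agent1.channels.filech.checkpointDir = /usr/kmayank/flume/checkpoint
agent1.channels.filech.dataDirs = /usr/kmayank/flume/data

In practice, only one of these channels would be wired between a given source and sink.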

Multiple sources can write to a single channel, and one source can also write to multiple channels, i.e. there is a many-to-many relationship between sources and channels.

However, each sink reads from exactly one channel. The channel does not delete data immediately as it is written to the sink; it waits for an acknowledgment from the sink before deleting anything.

Apache Flume and Apache Sqoop – Need and Importance

This tutorial outlines the need for and importance of Apache Flume and Apache Sqoop. There are quite a few data stores in the market today; among the most popular are HDFS, Hive, HBase, MongoDB, and Cassandra.

Advantages of Distributed Data Stores

All of them are open source technologies, but the key question is: why are these data stores so popular?

All of these data stores are distributed in nature, i.e. they use a cluster of machines to scale linearly.

The other advantage of these data stores is that you can use one single system for both transactional and analytical data.

But the main question here is: where does this data come from? Generally, the data comes from two kinds of sources.

The first is an application that produces the data. Applications such as user notifications, weblogs, and sales systems produce a large amount of data on a regular basis.

Therefore, we need a datastore to store this kind of data.

The other kind of data source is a traditional RDBMS, such as Oracle, MySQL, IBM DB2, or SQL Server.

Steps to port RDBMS data to HDFS

Let us suppose we have a requirement to port archived RDBMS data to an HDFS data store. The important thing to understand is how we get the RDBMS or application data into HDFS.

Normally, the Hadoop ecosystem exposes Java APIs to write data into HDFS and other data stores. Therefore, we have Java APIs for HDFS, HBase, Cassandra, etc.

We can use these APIs directly to write data to HDFS, HBase, Cassandra, and so on. But there are problems when you are streaming data from an application or bulk transferring data from tables in an RDBMS.

Let us suppose we have an application that sends “user notifications” for a professional network and then tracks metrics such as the number of views or number of clicks on user notifications.

A number of events produce data, for example:

  1. Creating a notification
  2. A user reading the notification
  3. A user clicking on the notification

This data needs to be stored as the events occur: the moment the data is generated, it needs to be sent to the data store. This is called streaming data.

Let us suppose we want to write this data to HDFS. First, we need to integrate the application with the Java API of the HDFS data store.

Second, we need to devise a mechanism to buffer this streaming data. It is important to note that HDFS stores data in the form of files distributed across a cluster of commodity servers in blocks of, say, 64 MB or 128 MB.

The metadata about these blocks is kept on one high-end server called the NameNode. If there are many small files, we create extra overhead for the NameNode, which has to keep metadata for a larger number of files.

And if the files hold only a small amount of data, we are not taking advantage of HDFS, which is designed for large files spread across a cluster of commodity servers.

Now we have a challenge: how do we create large files out of many small pieces of streaming data? One way is to keep an HDFS file open and keep writing into the same file until it is big enough.

But this is not a good idea, because if you keep a file open for a long duration there is a high chance of the file becoming corrupt or data being lost if there is a failure.

Therefore, the solution is a buffer: either an in-memory buffer or an intermediate file, before we write to HDFS.

It is not just a buffering mechanism for producing a single large file that we need; the buffer layer should also be fault tolerant and non-lossy.

Therefore, writing an application that buffers the data and integrates with the Java APIs of the various data stores is a time-consuming and costly process.

Similar challenges occur when you want to port your RDBMS data to HDFS.

So, directly using the Java APIs is a very tedious process. The solution is Apache Flume and Apache Sqoop, technologies developed to isolate and abstract the transport of data between a source and a data store.

Apache Flume

Apache Flume and Apache Sqoop are open source technologies developed and maintained by the Apache Software Foundation.

Apache Flume acts as a buffer between your streaming application and the data store. It can read data from different types of sources, such as HTTP requests, a file directory, or syslog messages, and write to many types of sinks, e.g. HDFS, HBase, or Cassandra.

Once we define the source and the sink, we put Apache Flume in between to transport the data between the application and the data store.

Apache Flume buffers the data according to the requirements of the source and the sink.

The source may write data at one rate, and the sink may read it at a different rate.

Different sources may produce data at different rates and in different formats. Flume can read from several sources at the same time, so it is able to deal with these different rates and formats.

A sink might require data to be written in a particular format at a particular rate. Apache Flume comes with built-in handlers for common sources and sinks.
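
For instance, here is a minimal sketch of pointing an agent at Flume's built-in HTTP source instead of the spooling directory, reusing the agent1/datasource1/ch1 names from the earlier examples; the port number is an arbitrary placeholder:

agent1.sources.datasource1.type = http
agent1.sources.datasource1.port = 44444
agent1.sources.datasource1.channels = ch1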

Apache Flume takes care of fault tolerance and guarantees no data loss for certain configurations. It is a push-based system, meaning Flume decides when the data is written to the sink.

Apache Sqoop

In a pull-based system, by contrast, the sink subscribes to the system that produces the data and pulls the data at a regular frequency.

Whenever you add more sinks, the Flume configuration must change, whereas in a pull-based system the configuration of the system that writes the data does not change; a new consumer simply starts subscribing when it is added as a sink. Apache Kafka is an example of a pull-based system.


Apache Sqoop is a command-line tool that imports data from an RDBMS directly into the Hadoop layer (and can export it back).

Therefore, Sqoop is a pull-based system used for bulk imports, not for streaming data.

Sqoop comes with connectors for many popular RDBMSs. With Sqoop, you can import entire tables or the results of specific SQL queries, and you can schedule periodic imports as Sqoop jobs.
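
As a hedged illustration (the JDBC URL, credentials, table name, target directory, and mapper count below are placeholders), a basic Sqoop table import looks roughly like this:

$ sqoop import \
    --connect jdbc:mysql://dbhost:3306/sales \
    --username dbuser \
    --password-file /user/kmayank/db.password \
    --table orders \
    --target-dir /temp/sqoop/orders \
    --num-mappers 4

The target directory is an HDFS path, and each mapper imports a slice of the table in parallel.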