CAP Theorem – Brewer’s Theorem | Hadoop HBase

In this post, we will understand the CAP theorem, or Brewer's theorem. This theorem was proposed by Eric Brewer of the University of California, Berkeley.

CAP Theorem or Brewer’s Theorem

The CAP theorem, also known as Brewer's theorem, states that it is impossible for a distributed computing system to simultaneously provide all three of the following guarantees: Consistency, Availability, and Partition tolerance.

Therefore, at any point in time, a distributed system can provide only two of consistency, availability, and partition tolerance.

Availability

Even if one of the nodes goes down, we can still access the data.

Consistency

Every read returns the most recent data.

Partition Tolerance

The system keeps working even if the network between the nodes fails, i.e., it tolerates a network partition.

The three guarantees are often drawn as the three vertices of a triangle, and we are free to choose any one side of the triangle.

Therefore, we can choose (Availability and Consistency) or (Availability and Partition Tolerance) or (Consistency and Partition Tolerance).

Please refer to the figure below:

Figure: CAP Theorem

Relational databases such as Oracle and MySQL choose Availability and Consistency, databases such as Cassandra, CouchDB, and DynamoDB choose Availability and Partition Tolerance, and databases such as HBase and MongoDB choose Consistency and Partition Tolerance.

CAP Theorem Example 1:  Consistency and Partition Tolerance

Let us take an example to understand one of these combinations, say Consistency and Partition Tolerance.

These databases usually shard or distribute their data, and they tend to have a master or primary node through which they handle write requests. A good example is MongoDB.

What happens when the master goes down?

In this case, another master usually gets elected, and until that happens the data cannot be read from the other nodes because it may not be consistent. Therefore, availability is sacrificed.

However, if the write operation succeeded and there is a network outage between the nodes, there is no problem, because a secondary node can still serve the data. Therefore, partition tolerance is achieved.

CAP Theorem Example 2: Availability and Partition Tolerance

Let us try to understand an example for Availability and Partition Tolerance.

These databases are also sharded and distributed in nature and are usually masterless, which means every node is equal. Cassandra is a good example of this kind of database.

Let us say we have an overnight batch job that writes data from a mainframe to a Cassandra database, and the same database is read throughout the day. If we read the data at the very moment it is being written, we might get stale data, so consistency is sacrificed.

Since this is a read-heavy, write-once use case, we do not care about reading the data immediately after it is written; we only care that once the write has happened, the data can be read from any of the nodes.

Availability, however, is an important requirement here, because if one of the nodes goes down we can still read the data from another replica node. The system as a whole remains available.

Partition tolerance helps us in case of any network outage between the nodes. If one of the nodes becomes unreachable due to a network issue, another node can take over.

NoSQL databases – Introduction, features, NoSQL vs SQL

NoSQL databases are non-relational database management systems that are cluster-friendly and designed for large volumes of distributed data.

The relational model has long been the de facto standard for database design; it uses primary and foreign key relationships to store and manipulate data.

However, with the growing volume of unstructured data in distributed computing environments, the relational model does not fit well. Relational models were not built to take advantage of the commodity storage and processing power available today.

As the data volume grows, it becomes difficult to store the data on the single-node systems that the relational model assumes. This gave birth to commodity storage, where a large cluster of commodity machines interacts in a distributed fashion.

SQL (Structured Query Language) was designed to work with single-node systems and does not work very well over a large cluster of storage. Therefore, top internet companies such as Google, Facebook, and Amazon started looking for solutions to overcome the drawbacks of the RDBMS.

This inspired a whole new movement of databases: the "NoSQL" movement.

NoSQL databases do not require a fixed schema and typically scale horizontally, i.e., extra commodity machines are added to the resource pool so that the load can be distributed easily.

Sometimes data has several levels of nesting that are complicated to represent in flat tables, for example geo-spatial or molecular modeling data.

Big Data NoSQL databases ease the representation of nested or multi-level hierarchical data using the JSON (JavaScript Object Notation) format.
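
As an illustration only (the field names below are made up), a geo-spatial record with multiple levels of nesting can be stored as a single JSON document:

{
  "store_id": 101,
  "name": "Downtown Branch",
  "location": { "type": "Point", "coordinates": [ -122.42, 37.77 ] },
  "departments": [
    { "name": "electronics", "aisles": [ 1, 2, 3 ] },
    { "name": "grocery", "aisles": [ 4, 5 ] }
  ]
}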

NoSQL Databases Features

Let's go through some of the key NoSQL database features and how they differ from traditional databases.

Schemaless databases

Traditional databases require a pre-defined schema.

A database schema is basically the structure that describes how the data is organized, the relations among database objects, and the constraints applied to the data.

In a NoSQL database, there is no need to define the structure beforehand. This gives us the flexibility to store information without doing upfront schema design.

Therefore, a user of a NoSQL database can store data of different structures in the same database table. That is why these databases are also sometimes referred to as "schema on read" databases.

That means a schema is applied to the data only when it is read, or pulled out, from its stored location.
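
As a small sketch (both records are hypothetical), two records with different structures can sit in the same table or collection, and each application interprets the fields it needs at read time:

{ "id": 1, "name": "Alice", "email": "alice@example.com" }
{ "id": 2, "name": "Bob", "phone": "555-0100", "address": { "city": "Pune" } }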

Non-Relational

NoSQL databases are non-relational in nature. They can store any type of content.

There is no need to apply data modeling techniques such as ER modeling, star schema modeling, etc.

A single record can accommodate transaction details as well as attribute details such as address, account, cost center, etc.

Non-relational data does not have to fit into rows and columns and is mainly designed to handle unstructured data.

Distributed Computing

You can scale your system horizontally by taking advantage of low-end commodity servers.

Distribution of the processing load and scaling of data sets are common features of many NoSQL databases.

Data is automatically distributed over a cluster of commodity servers, and if you need to improve scalability further, you can keep adding commodity servers to the cluster.

Aggregate Data Models

The aggregate data model treats related data as a single unit, which makes it easier to manage data over a cluster.

When a unit of data is retrieved from a NoSQL database, all the related data comes along with it.

Let us say we need to find a Product by Category. In the relational model, we apply normalization and create two tables, Product and Category. Whenever we need to retrieve product details by category, we perform a join operation.

In a NoSQL database, by contrast, we create one document that holds the product as well as the category information:

Product =

{
  "sku": 321342,
  "name": "book",
  "price": 50.00,
  "subject": "mathematics",
  "item_in_stocks": 5000,
  "category": [ { "id": 1, "name": "math5" }, { "id": 2, "name": "math6" } ]
}

Flume Hadoop Agent – Spool directory to HDFS

In the previous post, we saw how to write data from a spooling directory to the console log. In this post, we will learn how a Flume Hadoop agent can write data from a spooling directory to HDFS.

To write the data from the spooling directory to HDFS, we need to edit the properties file and change the sink configuration to HDFS.

spool-to-hdfs.properties

agent1.sources = datasource1
agent1.sinks = datastore1
agent1.channels = ch1

agent1.sources.datasource1.channels = ch1
agent1.sinks.datastore1.channel = ch1

agent1.sources.datasource1.type = spooldir
agent1.sources.datasource1.spoolDir = /usr/kmayank/spooldir

agent1.sinks.datastore1.type = hdfs
agent1.sinks.datastore1.hdfs.path = /temp/flume
agent1.sinks.datastore1.hdfs.filePrefix = events
agent1.sinks.datastore1.hdfs.fileSuffix = .log
agent1.sinks.datastore1.hdfs.inUsePrefix = _
agent1.sinks.datastore1.hdfs.fileType = DataStream

agent1.channels.ch1.type = file

With the above properties file, the files written to HDFS will be named with the prefix "events" and the suffix ".log" (roughly events.<timestamp>.log), and a file that is still being written carries the in-use prefix "_".

How many events read from the source will go into one HDFS file?

The files in HDFS are rolled over every 30 seconds by default. You can change this interval by setting the "hdfs.rollInterval" property, whose value is specified in seconds. You can also roll files over by event count ("hdfs.rollCount") or by cumulative file size ("hdfs.rollSize").
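
For example, the following additional sink properties (the values are only illustrative) roll a file when 10 minutes have passed, 10,000 events have been written, or the file reaches roughly 128 MB, whichever comes first; setting a property to 0 disables that particular trigger:

agent1.sinks.datastore1.hdfs.rollInterval = 600
agent1.sinks.datastore1.hdfs.rollCount = 10000
agent1.sinks.datastore1.hdfs.rollSize = 134217728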

The "hdfs.fileType" property accepts three values: SequenceFile, DataStream (text file), and CompressedStream.

The default is SequenceFile, which is a binary format: the event body is a byte array, and that byte array is written to the sequence file.

It is expected that whoever reads the sequence file knows how to deserialize the binary data back into objects.

DataStream is any uncompressed output such as a text file, while CompressedStream produces output compressed with a codec such as gzip or bzip2.
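
As a sketch, plain-text versus gzip-compressed output would look like this (only one of the two fileType lines would be used at a time; the codec name must be one of the standard Hadoop codecs):

# Plain text output
agent1.sinks.datastore1.hdfs.fileType = DataStream

# Or gzip-compressed output
agent1.sinks.datastore1.hdfs.fileType = CompressedStream
agent1.sinks.datastore1.hdfs.codeC = gzip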

Now, start the Flume agent using the command below:

flume-ng agent \
--conf-file spool-to-hdfs.properties \
--name agent1 \
-Dflume.root.logger=WARN,console

Once the Flume Hadoop agent is ready, start putting files in the spooling directory. This will trigger the agent to process them.

Once you see that the files in the spooling directory have been suffixed with ".COMPLETED", go to HDFS and check whether the files have arrived. Use the command below to list the files in the HDFS directory.

hadoop fs -ls /temp/flume

Use the ‘cat’ command to print the content of the file.

hadoop fs -cat /temp/flume/<file_name>

Apache Flume Agent – Flume Hadoop | Spool directory to logger

In this tutorial, we will learn to set up an Apache Flume agent that is going to monitor a spooling directory.

When log files are added to the directory, the agent will read them and push their contents to the console log.

Here, the source is the spooling directory and the sink is the console log; the Apache Flume agent sits between the two.

Figure: Apache Flume Agent

Configuration of Apache Flume Agent

  1. Set up all the options in a ".properties" file.
  2. Use the command line to start a Flume agent. The command to start a Flume agent is shown below ("ng" stands for next generation):

$flume-ng agent

  3. Set the ".properties" file as below.

spool-to-log.properties

agent1.sources = datasource1
agent1.sinks = datastore1
agent1.channels = ch1

agent1.sources.datasource1.channels = ch1
agent1.sinks.datastore1.channel = ch1

agent1.sources.datasource1.type = spooldir
agent1.sources.datasource1.spoolDir = /usr/kmayank/spooldir

agent1.sinks.datastore1.type = logger
agent1.channels.ch1.type = file

  4. Run the Flume agent using the command below:
flume-ng agent \
--conf-file spool-to-log.properties \
--name agent1 \
-Dflume.root.logger=WARN,console

The last option, "-Dflume.root.logger", is optional. It just specifies what logging should appear on the screen. Here, we have asked for only warnings to be printed to the console.

  5. Place a file into the spooling directory. You will notice that after a few seconds the file name gets suffixed with ".COMPLETED" and the contents of the file are printed to the screen/console.

It is important to note that whenever you place a file in the spooling directory, the file should be a text file, unique in name and immutable.

To read files in any other format, you have to write a custom deserializer in Java and plug it into your properties file.
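
The spooling directory source uses a line-based deserializer by default; a custom one is wired in through the "deserializer" property. A minimal sketch, assuming a hypothetical class com.example.flume.MyDeserializer$Builder is already on the Flume classpath:

# Default: each line of a file becomes one event
# agent1.sources.datasource1.deserializer = LINE

# Hypothetical custom deserializer (class name is an assumption)
agent1.sources.datasource1.deserializer = com.example.flume.MyDeserializer$Builder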

Flume Events in Apache Flume Agent

The data inside an Apache Flume agent is represented as Flume events, which are the basic unit of communication between source, channel, and sink.

A Flume event is a discrete object representing one record of data that needs to be transported from source to sink.

The source reads data in the form of Flume events and sends them to the channel; the sink then reads the events from the channel.

An Apache Flume event consists of a key-value pair representing one record of data. The key is the event header, i.e., metadata that describes how you want to process or route the data to a channel or sink.

The value is the actual data, also called the event body. Each record is therefore represented by one Flume event.

For example, with the default settings, each line of a file placed in the spooling directory becomes one event. The event body is represented as a byte array. When the source writes data to the channel, it can write one event or multiple events at a time.

If an event body exceeds the channel capacity, Apache Flume will not be able to transport that event. For example, if you want to transport a very large file to, say, HDFS, then a direct copy would be a better option than using Flume.

Flume Installation – Apache Flume Agent

Flume installation is a very simple process. Go through the following steps to install and configure Apache Flume.

Steps for Flume Installation

  1. Download Apache Flume from the Apache webpage (Apache Flume download URL).
  2. Extract the folder from the downloaded archive and point to the Flume folder in your bash profile. The bash profile entry makes sure that you can start the Flume agent from any directory. For example: export FLUME_HOME=$HOME/apache-flume-1.6.0-bin
  3. Append the PATH variable with FLUME_HOME. For example: export PATH=$PATH:$FLUME_HOME/bin (a quick verification is shown below).
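
After updating the profile, you can reload it and confirm that the flume-ng script is picked up from any directory (the profile file name may differ on your system):

source $HOME/.bash_profile
flume-ng version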

Flume Agent

A Flume agent is an independent Java daemon (JVM) that receives events (data) from an external source and forwards them to the next destination. The next destination can be another agent or a sink. A Flume agent can connect any number of sources to any number of data stores.

Let’s understand this with an example:

Figure: Apache Flume Agent

Suppose you have two data sources, say DS1 and DS2, and you want to write DS1's data into HDFS and DS2's data into Cassandra.

For this scenario, one Flume agent is enough to complete the job. A Flume agent operates hop by hop, i.e., writing the data of DS1 and DS2 to HDFS and Cassandra is one complete hop.

Now suppose there is another hop for the data, i.e., the data written to HDFS is read by some other application and finally needs to go to another data store, say Hive.

Here, two Flume agents are required since we have two hops of data: one agent to move DS1 and DS2 data to HDFS and Cassandra respectively, and another agent for the hop from HDFS to Hive.
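
When two Flume agents need to be chained directly, the usual pattern is to point an Avro sink of the first agent at an Avro source of the second. A minimal sketch (the host name, port, and component names are made up):

# Agent 1: forward events to the next agent
agent1.sinks.avrosink1.type = avro
agent1.sinks.avrosink1.hostname = agent2-host.example.com
agent1.sinks.avrosink1.port = 4141
agent1.sinks.avrosink1.channel = ch1

# Agent 2: receive events from agent 1
agent2.sources.avrosource1.type = avro
agent2.sources.avrosource1.bind = 0.0.0.0
agent2.sources.avrosource1.port = 4141
agent2.sources.avrosource1.channels = ch1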

There are three basic components of a Flume agent:

  1. The first component receives data. This component is called the source.
  2. The second component buffers data. This component is called the channel.
  3. The third component writes data out. This component is called the sink.

A Flume agent is set up using a configuration file, and within that configuration file we configure the sources and the format in which data arrives at each source.

It is important to configure the channel capacity according to the rates of the source and the sink. There are many types of channels, but two are the most commonly used.

The first is the memory channel, which buffers the data in the memory of the system where the Flume agent is running. The memory channel basically acts like an in-memory queue.

The source will write to the tail of the queue and the sink will read from the head of the queue.

But there are issues with the memory channel. One primary issue is that its capacity is constrained by the amount of memory the system has, and the memory channel is not persistent in case of a crash.

Therefore, all the data present in the buffer might be lost. The file channel is better in this respect because it gives you fault tolerance and non-lossy behavior, i.e., a guarantee of no data loss.

Since the data is buffered on disk for file channels, you can have a larger buffer capacity as per your requirement. Channels are continuously polled by sink components, which write the data to the endpoints.
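
As a rough sketch (all numbers and paths are illustrative), a memory channel and a file channel might be configured like this; the file channel additionally needs directories on disk for its checkpoint and data files:

# Memory channel: fast but bounded by RAM and not persistent
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 10000
agent1.channels.ch1.transactionCapacity = 1000

# File channel: durable, disk-backed buffer
agent1.channels.ch2.type = file
agent1.channels.ch2.checkpointDir = /var/flume/checkpoint
agent1.channels.ch2.dataDirs = /var/flume/data
agent1.channels.ch2.capacity = 1000000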

Multiple sources can write to a single channel, and one source can write to multiple channels, i.e., there is a many-to-many relationship between sources and channels.

However, the channel-to-sink relationship is one-to-one. The channel will not delete data immediately as it is written to the sink; it waits for an acknowledgment from the sink before deleting any data.
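
For example, a single source can replicate its events into two channels, each drained by its own sink, while each sink still reads from exactly one channel. A minimal sketch (component names are illustrative, and the HDFS sink would also need its hdfs.path and related settings, omitted here):

agent1.sources = datasource1
agent1.channels = ch1 ch2
agent1.sinks = hdfssink loggersink

# One source fanning out to two channels (replicating is the default selector)
agent1.sources.datasource1.channels = ch1 ch2
agent1.sources.datasource1.selector.type = replicating

# Each sink reads from exactly one channel
agent1.sinks.hdfssink.type = hdfs
agent1.sinks.hdfssink.channel = ch1
agent1.sinks.loggersink.type = logger
agent1.sinks.loggersink.channel = ch2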