Pig Operators – Pig Input, Output Operators, Pig Relational Operators

Pig operators include input and output operators, relational operators, and the bincond operator. Let us go through each of these, one by one.

Pig Input Output Operators

Pig LOAD Operator (Input)

The first task in any data flow language is to provide the input. The LOAD operator in Pig handles input: it reads data from HDFS or the local file system.

By default, it expects a tab-delimited file.

For example, X = load '/data/hdfs/emp'; will look for the “emp” file in the directory “/data/hdfs/”. If only a relative path is given, Pig resolves it against your home directory on HDFS.

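If you are loading data from another storage system, say HBase, you need to specify the loader function for that storage system. The call below is abbreviated; in practice HBaseStorage typically also takes the list of columns to read as an argument.

X = load 'emp' using HBaseStorage();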

If we do not specify a loader function, Pig uses “PigStorage” by default and assumes the file is tab-delimited.

If the fields are separated by something other than a tab, we need to pass the delimiter explicitly as an argument to the load function. For example:

X = load 'emp' using PigStorage(',');

Pig Latin also allows you to specify the schema of data you are loading by using the “as” clause in the load statement.

X = load 'emp' as (ename,eno,sal,dno);

If we load the data without specifying a schema, the columns are addressed by position as $0, $1, etc. While specifying the schema, we can also give the data type along with each column name. For example:

X = load 'emp' as (ename: chararray, eno: int, sal: float, dno: int);

X = load 'hdfs://localhost:9000/pig_data/emp_data.txt' USING PigStorage(',') as (ename: chararray, eno: int, sal: float, dno: int);

Pig STORE Operator (Output)

Once the data is processed, you want to write the data somewhere. The “store” operator is used for this purpose. By default, Pig stores the processed data into HDFS in tab-delimited format.

store processed into '/data/hdfs/emp';

PigStorage is used as the default store function; otherwise we can explicitly specify a store function suited to the target storage.

store emp into 'emp' using HBaseStorage();

We can also specify the field delimiter while writing the data.

store emp into 'emp' using PigStorage(',');

Pig DUMP Operator (on command window)

If you wish to see the data on the screen or command window (grunt prompt), use the dump operator.

dump emp;

Pig Relational Operators

Pig FOREACH Operator

FOREACH loops through each tuple and generates new tuple(s). Let us suppose we have a file emp.txt kept in an HDFS directory. Sample data of emp.txt is below:

mak,101,5000.0,500.0,10
ronning,102,6000.0,300.0,20
puru,103,6500.0,700.0,10

First, we load the data into Pig under the relation name “emp_details”.

grunt> emp_details = LOAD 'emp' USING PigStorage(',') as (ename: chararray, eno: int, sal: float, bonus: float, dno: int);

Now we need to get the ename, eno and dno for each employee from the relation emp_details and store it into another relation named employee_foreach.

grunt> employee_foreach = FOREACH emp_details GENERATE ename,eno,dno;

Verify the relation “employee_foreach” using the DUMP operator.

grunt> Dump employee_foreach;

Standard arithmetic operations on integers and floating-point numbers are supported in the foreach relational operator.

grunt> emp_details = LOAD 'emp' USING PigStorage(',') as (ename: chararray, eno: int, sal: float, bonus: float, dno: int);

grunt> emp_total_sal = foreach emp_details GENERATE sal+bonus;

grunt> emp_total_sal1 = foreach emp_details GENERATE $2+$3;

emp_total_sal and emp_total_sal1 give the same output. Positional references are useful when the schema is unknown or undeclared. Positions start from 0 and are preceded by the $ symbol.

A range of fields can also be accessed by using the double dot (..). For example:

grunt> emp_details = LOAD 'emp' USING PigStorage(',') as (ename: chararray, eno: int, sal: float, bonus: float, dno: int);

grunt> beginning = FOREACH emp_details GENERATE ..sal;

The above statement generates the values for the columns ename, eno, sal.

grunt> middle = FOREACH emp_details GENERATE eno..bonus;

The above statement generates the values for the columns eno, sal, bonus.

grunt> ending = FOREACH emp_details GENERATE bonus..;

The above statement generates the values for the columns bonus, dno.

Bincond or Boolean Test

The binary conditional operator is also referred to as the “bincond” operator. Let us understand it with the help of an example.

5==5 ? 1:2

It begins with a Boolean test followed by the symbol “?”. If the Boolean condition is true, it returns the value after the “?”; otherwise it returns the value after the “:”. Here the Boolean condition is true, hence the output is “1”.

5==6 ? 1:2

The output, in this case, is “2”.
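
Inside a script, a bincond is normally used within a FOREACH. The following is a minimal sketch, assuming the emp_details schema from the FOREACH section; the relation name labelled and the field name dept_label are made up for illustration.

```pig
-- Assumes the same comma-separated emp data used earlier.
emp_details = LOAD 'emp' USING PigStorage(',')
              AS (ename:chararray, eno:int, sal:float, bonus:float, dno:int);

-- The bincond is wrapped in parentheses; both branches return the same type.
-- dept_label is a hypothetical field name.
labelled = FOREACH emp_details
           GENERATE ename, (dno == 10 ? 'ten' : 'other') AS dept_label;

DUMP labelled;
```

Both values after the “?” should be of the same data type, since they end up in the same output field.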

To reach inside complex data types we have to use a projection operator. For a map, if you reference a key that does not exist, the result is null. For example:

student_details = LOAD 'student' as (sname:chararray, sclass:chararray, rollnum:int, stud:map[]);

avg = FOREACH student_details GENERATE stud#'student_avg';

For maps the projection operator is # (the hash), followed by the name of the key as a string. Here, ‘student_avg’ is the name of the key and ‘stud’ is the name of the column/field.

Pig FILTER Operator

A filter operator allows you to select the required tuples based on a predicate. Let us consider the same emp data, loaded as emp_details. Our requirement is to keep only the records for department number (dno) 10.

grunt> filter_data = FILTER emp_details BY dno == 10;

If you dump the “filter_data” relation, the output on your screen is as below:

(mak,101,5000.0,500.0,10)
(puru,103,6500.0,700.0,10)

We can combine multiple predicates using the Boolean operators “and” and “or”. Pig also supports regular expressions to match values; the pattern has to match the entire value, not just a part of it. For example, if we want all the records whose ename starts with ‘ma’, we can use the expression:

grunt> filter_ma = FILTER emp_details by ename matches 'ma.*';
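
As a small sketch of combining predicates, the statements below assume emp_details has been loaded with the schema shown in the FOREACH section. The relation names on the left and the 5500.0 threshold are illustrative only.

```pig
-- Both conditions must hold.
dept10_high_sal = FILTER emp_details BY dno == 10 AND sal > 5500.0;

-- Either condition may hold.
dept10_or_20 = FILTER emp_details BY dno == 10 OR dno == 20;

-- NOT inverts a predicate, here the regular expression match.
not_ma = FILTER emp_details BY NOT (ename MATCHES 'ma.*');
```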

A filter passes only those records for which the predicate evaluates to ‘true’.

It is important to note that a comparison with null, say z == null, evaluates to null, which is neither true nor false.

Let us suppose x has the values 1, 8 and null. If the filter is x == 8, only the record with 8 is returned. If the filter is x != 8, only the record with 1 is returned.

We can see that the null record is not returned in either case. Therefore, to work with null values we use the ‘is null’ or ‘is not null’ operator.
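
The null handling described above can be sketched as follows. This assumes the emp_details schema from the FOREACH section and that some records might have an empty bonus field; the relation names are hypothetical.

```pig
-- Records where bonus is missing.
no_bonus = FILTER emp_details BY bonus IS NULL;

-- Records where bonus is present.
has_bonus = FILTER emp_details BY bonus IS NOT NULL;

-- bonus != 500.0 alone would silently drop the null records,
-- so they are added back explicitly with IS NULL.
not_500_or_missing = FILTER emp_details BY bonus != 500.0 OR bonus IS NULL;
```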

Pig GROUP Operator

The Pig group operator works fundamentally differently from GROUP BY in SQL.

It collects all records with the same key value together into one bag. In SQL, the group by clause creates groups of values that are fed into one or more aggregate functions, whereas in Pig Latin it just groups the records together and puts them into one bag.

Hence, in Pig Latin there is no direct connection between group and aggregate functions.

grunt> emp_details = LOAD 'emp' USING PigStorage(',') as (ename: chararray, eno: int, sal: float, bonus: float, dno: int);

grunt> grpd = GROUP emp_details BY dno;

grunt> cnt = FOREACH grpd GENERATE group,COUNT(emp_details);
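
Because the grouped bag is an ordinary field, any aggregate function can be applied to it in a later FOREACH. A minimal sketch, reusing grpd from above; the output field names are made up for illustration.

```pig
-- Each record of grpd is (group, {bag of emp_details tuples}).
-- emp_details.sal projects the sal column out of that bag.
dept_stats = FOREACH grpd GENERATE
                 group                AS dno,
                 COUNT(emp_details)   AS headcount,
                 SUM(emp_details.sal) AS total_sal,
                 AVG(emp_details.sal) AS avg_sal;

DUMP dept_stats;
```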

Pig ORDER BY Operator

The Pig ORDER BY operator sorts a relation based on one or more fields. For example:

grunt> Order_by_ename = ORDER emp_details BY ename ASC;
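
Sorting on more than one field might look like the sketch below, which assumes the emp_details schema from the FOREACH section; the relation name is illustrative.

```pig
-- Sort by department first, then by salary from highest to lowest
-- within each department.
by_dept_then_sal = ORDER emp_details BY dno ASC, sal DESC;
```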

Pig DISTINCT Operator

This is used to remove duplicate records from a relation. It does not work on individual fields; rather, it works on entire records.

grunt> unique_records = distinct emp_details;

Pig LIMIT Operator

Limit allows you to limit the number of records returned from a relation.

grunt> emp_details = LOAD 'emp';

grunt> first50 = limit emp_details 50;
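
On its own, LIMIT gives no guarantee about which records come back. A common pattern is to sort first and then limit. The sketch below assumes emp_details is loaded with the schema from the FOREACH section (so that sal is a known field); the relation names are hypothetical.

```pig
emp_details = LOAD 'emp' USING PigStorage(',')
              AS (ename:chararray, eno:int, sal:float, bonus:float, dno:int);

-- Sort by salary, highest first, then keep the first two records.
sorted_by_sal = ORDER emp_details BY sal DESC;
top2 = LIMIT sorted_by_sal 2;
```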

Pig SAMPLE Operator

The sample operator returns a random sample of your data set, i.e. it returns a percentage of the rows. It takes a value between 0 and 1; 0.2, for instance, indicates roughly 20% of the data.

grunt> emp_details = LOAD 'emp';

grunt> sample20 = SAMPLE emp_details 0.2;

Pig PARALLEL

The Pig PARALLEL clause is used for parallel data processing. It sets the number of reducers at the operator level.

We can include the PARALLEL clause wherever we have a reduce phase, such as DISTINCT, JOIN, GROUP, COGROUP, ORDER BY etc.

To set a default for the whole script instead of a single operator, use SET. For example: SET default_parallel 10;

This means that all MapReduce jobs that get launched will use 10 reducers unless an operator specifies its own PARALLEL value.

It is important to note that PARALLEL only sets the reducer parallelism, whereas the mapper parallelism is controlled by the MapReduce engine itself.
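
At the operator level, the clause is appended to the statement that triggers the reduce phase. A minimal sketch, assuming the emp_details relation from the earlier sections; the reducer counts are arbitrary.

```pig
-- Use 10 reducers for this GROUP only.
grpd = GROUP emp_details BY dno PARALLEL 10;

-- Use 4 reducers for this ORDER BY only.
sorted_by_sal = ORDER emp_details BY sal DESC PARALLEL 4;
```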

Pig FLATTEN Operator

Pig FLATTEN removes a level of nesting from tuples as well as bags. For example, suppose we have a tuple of the form (1, (2,3)).

GENERATE $0, flatten($1) transforms the tuple into (1,2,3).

When we un-nest a bag using the flatten operator, it creates new tuples, one per element of the bag. For example, suppose we have the record (1,{(2,3),(4,5)}).

GENERATE $0, flatten($1) then produces the tuples (1,2,3) and (1,4,5).
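
A bag to flatten typically comes out of a GROUP. The sketch below assumes the emp_details relation from the FOREACH section; the relation names are illustrative.

```pig
-- Each record of grpd is (group, {bag of emp_details tuples}).
grpd = GROUP emp_details BY dno;

-- Un-nest the bag of names: one output record per (dno, ename) pair.
names_by_dept = FOREACH grpd
                GENERATE group AS dno, FLATTEN(emp_details.ename) AS ename;

DUMP names_by_dept;
```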

Pig COGROUP Operator

The Pig COGROUP operator works the same way as the GROUP operator. The only difference is that GROUP works with a single relation, whereas COGROUP is used when we have more than one relation.

Let us suppose we have the two relations below with their data sets:

student_details.txt

101,Kum May,29,9010101010,Bangalore
102,Abh Nig,24,9020202020,Delhi
103,Sum Nig,24,9030303030,Delhi

employee_details.txt

101,Nancy,22,London
102,Martin,24,Newyork
103,Romi,23,Tokyo
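
Before the two relations can be grouped, both files have to be loaded. The original does not show this step, so the following is only a sketch: the file paths are assumed to be relative to the working directory, and all field names except age (which the COGROUP statement relies on) are made up for illustration.

```pig
-- phone is read as chararray because the values are too large for an int.
student_details = LOAD 'student_details.txt' USING PigStorage(',')
                  AS (id:int, name:chararray, age:int, phone:chararray, city:chararray);

employee_details = LOAD 'employee_details.txt' USING PigStorage(',')
                   AS (id:int, name:chararray, age:int, city:chararray);
```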

Now, let us try to group the student_details.txt and employee_details.txt records.

grunt> cogroup_final = COGROUP employee_details by age, student_details by age; 

Output as below:

(22,{(101,Nancy,22,London)},{})
(23,{(103,Romi,23,Tokyo)},{})
(24,{(102,Martin,24,Newyork)},{(102,Abh Nig,24,9020202020,Delhi),(103,Sum Nig,24,9030303030,Delhi)})
(29,{},{(101,Kum May,29,9010101010,Bangalore)})

Pig SPLIT Operator

The Pig SPLIT operator is used to split a single relation into more than one relation depending on the conditions you provide.

Let us suppose we have emp_details as one relation. We have to split the relation based on department number (dno). Sample data of emp_details is below:

mak,101,5000.0,500.0,10
ronning,102,6000.0,300.0,20
puru,103,6500.0,700.0,10
jetha,103,6500.0,700.0,30

grunt> SPLIT emp_details into emp_details1 IF dno == 10, emp_details2 IF (dno == 20 OR dno == 30);

grunt> DUMP emp_details1;

(mak,101,5000.0,500.0,10)
(puru,103,6500.0,700.0,10)

grunt> DUMP emp_details2;

(ronning,102,6000.0,300.0,20)
(jetha,103,6500.0,700.0,30)
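
A record that satisfies none of the conditions is dropped by SPLIT, and a record that satisfies several conditions goes to each matching relation. To catch everything that is left over, a catch-all branch can be used. This is a sketch that assumes a Pig release recent enough to support OTHERWISE; the relation names dept10 and other_depts are hypothetical.

```pig
-- dept10 receives the dno == 10 records;
-- other_depts receives every record that matched no earlier condition.
SPLIT emp_details INTO dept10 IF dno == 10, other_depts OTHERWISE;

DUMP other_depts;
```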