Tuesday, 1 August 2017

Hive Basics

Hive CLI

  • Support for SQL to perform ad-hoc queries
  • Support for map reduce, custom mappers and reducer support with UDF (user defined function in Hive)
  • Limitation – Support for single user at a time
  • No Authentication support provided
  • Hive Cl is simple to use and widely used interface, still in production use


HiveServer1
  • Hive server is a server client model service
  • Allow users to connect using Hive CLI interface and using thrift client.
  • Support for remote client connection but only one client can connect at a time.
  • No session management support.
  • Because of thrift no concurrency control due to thrift API.



HiveServer2

  • Hive server 2 is also a client and server model.
  • It allows to connect many different clients like thrift
  • HiveServer2 gives multi client support where many clients can connect at the same time
  • Authentication is much better using Kerberos.
  • Support for JDBC and ODBC driver connection.
  • Beeline cli is used for connecting to HiveServer2





Beeline CLI
  • Beeline is a command line interface for HiveServer2
  • This is based on SQL Line CLI.
  • It gives better support for JDBC/ODBC.
  • This is not compatible with old HiveServer1
  • To configure beeline over HiveServer2 you need some extra configuration


Hive Mode : 

Embedded mode – In this mode beeline cli starts embedded hive which is similar like regular Hive CLI ( Remember – Hive CLI and HiveServer1 is two different things )

Remote mode – In remote mode you are connecting using thrift API to another HiveServer2 process running on separate machine/server.




Hive Auto Map Joins : 

Auto Map-Join is a very useful feature when joining a big table with a small table. if we enable this feature, the small table will be saved in the local cache on each node, and then joined with the big table in the Map phase. Enabling Auto Map Join provides two advantages. First, loading a small table into cache will save read time on each data node. Second, it avoids skew joins in the Hive query, since the join operation has been already done in the Map phase for each block of data.



Hive Skew Joins

We can enable optimization of skew joins, i.e. imbalanced joins by setting hive.optimize.skewjoin property to true either via SET command in hive shell or hive-site.xml file. Below are the list of properties that can be fine tuned to better optimize the skew joins.

Example : 

A join of 2 large data tables is done by a set of MapReduce jobs which first sorts the tables based on the join key and then joins them. The Mapper gives all rows with a particular key to the same Reducer.
e.g., Suppose we have table A with a key column, "id" which has values 1, 2, 3 and 4, and table B with a similar column, which has values 1, 2 and 3.
We want to do a join corresponding to the following query
select A.id from A join B on A.id = B.id
A set of Mappers read the tables and gives them to Reducers based on the keys. e.g., rows with key 1 go to Reducer R1, rows with key 2 go to Reducer R2 and so on. These Reducers do a cross product of the values from A and B, and write the output. The Reducer R4 gets rows from A, but will not produce any results.
Now let's assume that A was highly skewed in favor of id = 1. Reducers R2 and R3 will complete quickly but R1 will continue for a long time, thus becoming the bottleneck. If the user has information about the skew, the bottleneck can be avoided manually as follows:
Do two separate queries
select A.id from A join B on A.id = B.id where A.id <> 1;
select A.id from A join B on A.id = B.id where A.id = 1 and B.id = 1;
The first query will not have any skew, so all the Reducers will finish at roughly the same time. If we assume that B has only few rows with B.id = 1, then it will fit into memory. So the join can be done efficiently by storing the B values in an in-memory hash table. This way, the join can be done by the Mapper itself and the data do not have to go to a Reducer. The partial results of the two queries can then be merged to get the final results.
Advantages
If a small number of skewed keys make up for a significant percentage of the data, they will not become bottlenecks.
Disadvantages
The tables A and B have to be read and processed twice.
Because of the partial results, the results also have to be read and written twice.
The user needs to be aware of the skew in the data and manually do the above process.
We can improve this further by trying to reduce the processing of skewed keys. First read B and store the rows with key 1 in an in-memory hash table. Now run a set of mappers to read A and do the following:
If it has key 1, then use the hashed version of B to compute the result.
For all other keys, send it to a reducer which does the join. This reducer will get rows of B also from a mapper.
This way, we end up reading only B twice. The skewed keys in A are only read and processed by the Mapper, and not sent to the reducer. The rest of the keys in A go through only a single Map/Reduce.
The assumption is that B has few rows with keys which are skewed in A. So these rows can be loaded into the memory.


Hive  Vectorization

Vectorization feature is introduced into hive for the first time in hive-0.13.1 release only. By vectorized query execution, we can improve performance of operations like scans, aggregations, filters and joins, by performing them in batches of 1024 rows at once instead of single row each time.

Vectorized query execution is a Hive feature that greatly reduces the CPU usage for typical query operations like scans, filters, aggregates, and joins. A standard query execution system processes one row at a time. This involves long code paths and significant metadata interpretation in the inner loop of execution. Vectorized query execution streamlines operations by processing a block of 1024 rows at a time. Within the block, each column is stored as a vector (an array of a primitive data type). Simple operations like arithmetic and comparisons are done by quickly iterating through the vectors in a tight loop, with no or very few function calls or conditional branches inside the loop. These loops compile in a streamlined way that uses relatively few instructions and finishes each instruction in fewer clock cycles, on average, by effectively using the processor pipeline and cache memory.

Use vectorized query execution, you must store your data in ORC format, and set the following variable as shown in Hive SQL (see Configuring Hive):
set hive.vectorized.execution.enabled = true;

Vectorized execution is off by default, so your queries only utilize it if this variable is turned on. To disable vectorized execution and go back to standard execution, do the following:
set hive.vectorized.execution.enabled = false;








Apache Pig


What is difference between pig and sql?
Answer: Pig latin is procedural version of SQl.pig has certainly similarities,more difference from sql.sql is a query language for user asking question in query form.sql makes answer for given but dont tell how to answer the given question.suppose ,if user want to do multiple operations on tables,we have write multiple queries and also use temporary table for storing,sql is support for subqueries but intermediate we have to use temporary tables,SQL users find subqueries confusing and difficult to form properly.using sub-queries creates an inside-out design where the first step in the data pipeline is the innermost query .pig is designed with a long series of data operations in mind, so there is no need to write the data pipeline in an inverted set of subqueries or to worry about storing data in temporary tables.


Explain different execution modes available in Pig.
Three different execution modes available in Pig they are,
  1. Interactive mode or Grunt mode.
  2. Batch mode or Script mode.
  3. Embedded mode
    Interactive mode or grunt mode: Pig’s interactive shell is known as grunt shell. If no file is specified to run in Pig it will start.
  4. grunt> run scriptfile.pig
grunt> exec scriptfile.pig
Batch mode or Script mode : Pig executes the specified commands in the script file.
Embedded mode : We can embed Pig programs in Java and we can run the programs from Java.



Differentiate between the physical plan and logical plan in Pig script.
Both plans are created while to execute the pig script.
Physical plan : It is a series of MapReduce jobs while creating the physical plan.It’s divided into three physical operators such as Local Rearrange, Global Rearrange, and package. It illustrates the physical operators Pig will use to execute the script without referring to how they will execute in MapReduce Loading and storing functions are resolved in physical plan.
Logical plan : The Logical plan is a plan which is created for each line in the Pig scripts. It is produced after semantic checking and basic parsing. With every line, the logical plan for that particular program becomes extended and larger because each and every statement has its own logical plan.Loading and storing function are not resolved in logical plan.



 What are the advantages of pig language?
Answer: The pig is easy to learn: Pig is easy to learn, it overcomes the need for writing complex MapReduce programs to some extent. Pig works in a step by step manner. So it is easy to write, and even better, it is easy to read.
It can handle heterogeneous data: Pig can handle all types of data – structured, semi-structured, or unstructured.
Pig is Faster: Pig’s multi-query approach combines certain types of operations together in a single pipeline, reducing the number of times data is scanned.
Pig does more with less: Pig provides the common data operations (filters, joins, ordering, etc.) And nested data types (e.g. Tuples, bags, and maps) which can be used in processing data.
Pig is Extensible: Pig is easily extensible by UDFs – including Python, Java, JavaScript, and Ruby so you can use them to load, aggregate and analysis. Pig insulates your code from changes to the Hadoop Java API.



Why do we need Pig?
Answer: Pig is a high level scripting language that is used with Apache Hadoop. Pig excels at describing data analysis problems as data flows. Pig is complete in that you can do all the required data manipulations in Apache Hadoop with Pig. In addition through the User Defined Functions(UDF) facility in Pig you can have Pig invoke code in many languages like JRuby, Jython and Java. Conversely you can execute Pig scripts in other languages. The result is that you can use Pig as a component to build larger and more complex applications that tackle real business problems.


What Is Difference Between Map reduce and Pig ?
Answer:
  • In MR Need to write entire logic for operations like join,group,filter,sum etc ..
  • In Pig Built in functions are available
  • In MR Number of lines of code required is too much even for a simple functionality
  • In Pig 10 lines of pig Latin equal to 200 lines of java
  • In MR Time of effort in coding is high
  • In Pig What took 4hrs to write in java took 15 mins in pig Latin (approx)
  • In MR Less productivity
  • In PIG High Productivity


Explain the uses of Map Reduce in Pig.
  • Apache Pig programs are written in Pig Latin query language which is similar to the SQL query language. To execute this queries, there requires an execution engine. The Pig engine enables to convert the queries into MapReduce jobs and thus MapReduce acts as the execution engine and is designed to run the programs as per the requirements.
  • Pigs’ operators are using Hadoops’ API depending upon the configurations the job is executed in local mode or Hadoop cluster. Pig is never passes any outputs to Hadoop instead set the inputs and data locations for map-reduce.
  • Pig Latin provides a set of standard Data-processing operations, such as join, filter, group by, order by, union, etc which are mapped to do the map-reduce tasks. A Pig Latin script describes a (DAG) directed acyclic graph, where the edges are data flows and the nodes are operators that process the data.


State the usage of ‘filters’, ‘group’ , ‘orderBy’, ‘distinct’ keywords in pig scripts.
Filters : Filters has the similar functionality as where clause in SQL. Filters contain predicate and if it evaluates true for a given record, then that record will be passed down the pipeline. Otherwise, it will not predicate the results and thus contains different operators like ==,>=, <=,!=.so,== and != which is been applied in creating maps and tuples.
A= load ‘inputs’ as (name,address)
B=filter A by symbol matches ‘CM.*';
GroupBy : The group statement collects various records with the same key. In SQL database GroupBy creates a group which feeds directly to one or more aggregate functions. But in Pig Latin has no direct connection between group and aggregate functions.
Input 2 = load ‘daily’ as(exchanges,stocks);
grpds = group input2 by stocks;
Order : The Order statement sorts the data producing a total order of output data. The Order syntax is similar to Group. Give a key or set of keys to order your data as per requirement. The following are the examples for the same:
Input 2 = load ‘daily’ as(exchanges,stocks);
grpds = order input2 by exchanges;
Distinct : The distinct statement is very simple to understand and implement. It removes duplicate records and the original data will be secured. It is implemented only on entire records, not on individual fields. Consider the below examples which explains the same:
Input 2 = load ‘daily’ as(exchanges,stocks);
grpds = distinct exchanges;






What is difference between GROUP and COGROUP?
The GROUP and COGROUP operators are identical. Both operators work with one or more relations. For readability GROUP is used in statements involving one relation and COGROUP is used in statements involving two or more relations. We can COGROUP up to
but no more than 127 relations at a time.

COGROUP Two Tables
Where COGROUP gets fancy is that you can COGROUP on two tables at once. Pig will group the two tables and then join the two tables on the grouped column. For example, assume we also had a data set of pet names:

Given this table, we could compare for example all the people with a given animal to all the names of that animal. The COGROUP command is:
owners = LOAD 'owners.csv'
    USING PigStorage(',')
    AS (owner:chararray,animal:chararray);

pets = LOAD 'pets.csv'
    USING PigStorage(',')
    AS (name:chararray,animal:chararray);

grouped = COGROUP owners BY animal, pets by animal;
DUMP grouped;
This will group each table based on the animal column. For each animal, it will create a bag of matching rows from both tables. For this example, we get:
group
owners
pets
cat
{(adam,cat),(alice,cat)}
{(paws,cat),(wiskers,cat)}
dog
{(adam,dog),(steve,dog)}
{(fido,dog),(rex,dog)}
fish
{(alex,fish)}
{(nemo,fish)}
In summary, you can use COGROUP when you need to group two tables by a column and then join on the grouped column.




 What is bag?
Answer: A bag is one of the data models present in Pig. It is an unordered collection of tuples with possible duplicates. Bags are used to store collections while grouping. The size of bag is the size of the local disk, this means that the size of the bag is limited. When the bag is full, then Pig will spill this bag into local disk and keep only some parts of the bag in memory. There is no necessity that the complete bag should fit into memory. We represent bags with “{}”.



What is UDF in Pig?
Answer: The pig has wide-ranging inbuilt functions, but occasionally we need to write complex business logic, which may not be implemented using primitive functions. Thus, Pig provides support to allow writing User Defined Functions (UDFs) as a way to stipulate custom processing.
Pig UDFs can presently be implemented in Java, Python, JavaScript, Ruby and Groovy. The most far-reaching support is provided for Java functions. You can customize all parts of the processing, including data load/store, column transformation, and aggregation. Java functions are also additional efficient because they are implemented in the same language as Pig and because additional interfaces are supported. Such as the Algebraic Interface and the Accumulator Interface. Limited support is provided for Python, JavaScript, Ruby and Groovy functions.


What is the difference between store and dumps commands?
Answer: Dump Command after process the data displayed on the terminal, but it’s not stored anywhere. Whereas store store in local file system or HDFS and output execute in a folder. In the protection environment most often Hadoop developer used ‘store’ command to store data in the HDFS.

Tuesday, 18 July 2017

Hadoop Common Things


Number of Inputsplits in Mapreduce : 


An inputsplit is a chunk of the input data allocated to a map task for processing.  FileInputFormat
generates inputsplits (and divides the same into records) - one inputsplit for each file, unless the
file spans more than a HDFS block at which point it factors in the configured values of minimum split
size, maximimum split size and block size in determining the split size.

Here's the formula, from Hadoop the definitive guide-

Split size = max( minimumSplitSize, min( maximumSplitSize, HDFSBlockSize))

So, if we go with the default values, the split size = HDFSBlockSize for files spanning more than an
HDFS block.



Problem with mapreduce processing of small files :


We all know that Hadoop works best with large files;  But the reality is that we still have to deal
with small files.  When you want to process many small files in a mapreduce job, by default, each file
is processed by a map task (So, 1000 small files = 1000 map tasks).  Having too many tasks that
finish in a matter of seconds is inefficient. 

Increasing the minimum split size, to reduce the number of map tasks, to handle such a situation, is
not the right solution as it will be at the potential cost of locality.

Solution

CombineFileInputFormat packs many files into a split, providing more data for a map task to process.
It factors in node and rack locality so performance is not compromised.


Layers of Big Data

Six Layers of Big Data:



1: First, the Data Ingestion layer takes in all kinds of data including: structured data; unstructured data; data from devices, sensors, logs, click streams, and applications; and data from both cloud and on premises sources.


2: Next, the Processing and Persistence layer is performed by cloud based systems such as Hadoop and Spark.

3: The Orchestration layer handles transformation and cleansing.

4: The Data Discovery layer is the critical next step, because it solves the silo problem, and it does that using a mixture of data modeling, data preparation, data curation, and data virtualization. Data virtualization creates a combined, virtual view of the data across two or more silos, which can be accessed by consumers in real-time as if the disparate silos were part of the same dataset.


5: The Data Management and Intelligence layer provides security and governance across the other five layers.

6: Finally, the Data Access layer delivers the data directly to analysts or to applications, tools, and dashboards.

Tuesday, 4 July 2017

Hive Conditional function

Hive Conditional function  

Hive supports three types of conditional functions. These functions are listed below:


IF( Test Condition, True Value, False Value ) 

The IF condition evaluates the “Test Condition” and if the “Test Condition” is true, then it returns the “True Value”. Otherwise, it returns the False Value.

Example: IF(1=1, 'working', 'not working') returns 'working'


COALESCE( value1,value2,... )

The COALESCE function returns the fist not NULL value from the list of values. If all the values in the list are NULL, then it returns NULL.

Example: COALESCE(NULL,NULL,5,NULL,4) returns 5

CASE Statement

The syntax for the case statement is:

All the conditions must be of same datatype. Conditions are evaluated in the order listed. Once a condition is found to be true, the case statement will return the result and not evaluate the conditions any further

CASE
       WHEN Fruit = 'APPLE' THEN 'The owner is APPLE'
       WHEN Fruit = 'ORANGE' THEN 'The owner is ORANGE'
       ELSE 'It is another Fruit'
END

Hive Functions

Hive is mostly used Structured Data Processing.

Below are some general function used in Day to day work.

LENGTH( string str )

The LENGTH function returns the number of characters in a string
Example: LENGTH('Bharat') returns 5
Query: Select length(name) from table1;

LTRIM( string str )

The LTRIM function removes all the trailing spaces from the string.
LTRIM('   Bharat') returns 'Bharat'

RTRIM( string str )

The RTRIM function removes all the ending spaces from the string.
LTRIM('Bharat    ') returns 'Bharat'

TRIM( string str )

The TRIM function removes all the starting/ending spaces from the string.
LTRIM('       Bharat    ') returns 'Bharat'


SPLIT( string str, string pat ) 

The SPLIT function splits the string around the pattern pat and returns an array of strings. You can specify regular expressions as patterns.
SPLIT('Bharat:Manoj:Amol',':') returns ["Bharat","Manoj",""Amol]

SUBSTR( string source_str, int start_position [,int length]  )

The SUBSTR function returns a part of the source string from the start position with the specified length of characters. If the length is not given, then it returns from the start position to the end of the string.

SUBSTR('Bharatkumar',7) returns 'kumar'
SUBSTR('Bharatkumar',6,5) returns 'tkuma'
Note : String start with position 1, First Example string start with 7 postion to end. second example substring of postion 6 to 5 character.


regexp_extract(string, pattern) 

Returns the first substring matched by the regular expression pattern in string:
SELECT regexp_extract('1a 2b 14m', '\d+'); -- 1


regexp_replace(string, pattern)


Removes every instance of the substring matched by the regular expression pattern from string:
SELECT regexp_replace('1a 2b 14m', '\d+[ab] '); -- '14m'

ROUND( double value [, int n] )

The ROUND function returns the value rounded to n integer places.
Example: ROUND(123.456,2) Retrun 123.46

UNIX_TIMESTAMP( string date, string pattern )

This function converts the date to the specified date format and returns the number of seconds between the specified date and Unix epoch. If it fails, then it returns 0.
UNIX_TIMESTAMP('2000-01-01 10:20:30','yyyy-MM-dd') returns 946713600


FROM_UNIXTIME( bigint number_of_seconds  [, string format] )

The FROM_UNIX function converts the specified number of seconds from Unix epoch and returns the date in the format 'yyyy-MM-dd HH:mm:ss'.
FROM_UNIXTIME( UNIX_TIMESTAMP() ) returns the current date including the time. This is equivalent to the SYSDATE in oracle.


TO_DATE( string timestamp )

The TO_DATE function returns the date part of the timestamp in the format 'yyyy-MM-dd'.
Example: TO_DATE('2000-01-01 10:20:30') returns '2000-01-01'


DATEDIFF( string date1, string date2 )
The DATEDIFF function returns the number of days between the two given dates.
Example: DATEDIFF('2000-03-01', '2000-01-10')  returns 51


DATE_ADD( string date, int days ) 

The DATE_ADD function adds the number of days to the specified date
Example: DATE_ADD('2000-03-01', 5) returns '2000-03-06'

DATE_SUB( string date, int days )

The DATE_SUB function subtracts the number of days to the specified date
Example: DATE_SUB('2000-03-01', 5) returns ‘2000-02-25’

Tuesday, 3 January 2017

Core Hadoop Components

The Hadoop Ecosystem:

Hadoop Distributed File System (HDFS) 

          The default big data storage layer for Apache Hadoop is HDFS. HDFS is the “Secret Sauce” of Apache Hadoop components as users can dump huge datasets into HDFS and the data will sit there nicely until the user wants to leverage it for analysis. HDFS component creates several replicas of the data block to be distributed across different clusters for reliable and quick data access. HDFS comprises of 3 important components-NameNode, DataNode and Secondary NameNode. HDFS operates on a Master-Slave architecture model where the NameNode acts as the master node for keeping a track of the storage cluster and the DataNode acts as a slave node summing up to the various systems within a Hadoop cluster.

MapReduce

          MapReduce is a Java-based system created by Google where the actual data from the HDFS store gets processed efficiently. MapReduce breaks down a big data processing job into smaller tasks. MapReduce is responsible for the analysing large datasets in parallel before reducing it to find the results. In the Hadoop ecosystem, Hadoop MapReduce is a framework based on YARN architecture. YARN based Hadoop architecture, supports parallel processing of huge data sets and MapReduce provides the framework for easily writing applications on thousands of nodes, considering fault and failure management.
The basic principle of operation behind MapReduce is that the “Map” job sends a query for processing to various nodes in a Hadoop cluster and the “Reduce” job collects all the results to output into a single value. Map Task in the Hadoop ecosystem takes input data and splits into independent chunks and output of this task will be the input for Reduce Task. In The same Hadoop ecosystem Reduce task combines Mapped data tuples into smaller set of tuples. Meanwhile, both input and output of tasks are stored in a file system. MapReduce takes care of scheduling jobs, monitoring jobs and re-executes the failed task.
MapReduce framework forms the compute node while the HDFS file system forms the data node. Typically in the Hadoop ecosystem architecture both data node and compute node are considered to be the same.
  

YARN-

          YARN forms an integral part of Hadoop 2.0.YARN is great enabler for dynamic resource utilization on Hadoop framework as users can run various Hadoop applications without having to bother about increasing workloads.

  
Pig-
     Pig is a convenient tools developed by Yahoo for analysing huge data sets efficiently and easily. It provides a high level data flow language Pig Latin that is optimized, extensible and easy to use. The most outstanding feature of Pig programs is that their structure is open to considerable parallelization making it easy for handling large data sets.

Hive

       Hive developed by Facebook is a data warehouse built on top of Hadoop and provides a simple language known as HiveQL similar to SQL for querying, data summarization and analysis. Hive makes querying faster through indexing.
Sqoop
Sqoop component is used for importing data from external sources into related Hadoop components like HDFS, HBase or Hive. It can also be used for exporting data from Hadoop o other external structured data stores. Sqoop parallelized data transfer, mitigates excessive loads, allows data imports, efficient data analysis and copies data quickly.

Flume-

Flume component is used to gather and aggregate large amounts of data. Apache Flume is used for collecting data from its origin and sending it back to the resting location (HDFS).Flume accomplishes this by outlining data flows that consist of 3 primary structures channels, sources and sinks. The processes that run the dataflow with flume are known as agents and the bits of data that flow via flume are known as events.

HBase –

HBase is a column-oriented database that uses HDFS for underlying storage of data. HBase supports random reads and also batch computations using MapReduce. With HBase NoSQL database enterprise can create large tables with millions of rows and columns on hardware machine. The best practice to use HBase is when there is a requirement for random ‘read or write’ access to big datasets.

Oozie-

Oozie is a workflow scheduler where the workflows are expressed as Directed Acyclic Graphs. Oozie runs in a Java servlet container Tomcat and makes use of a database to store all the running workflow instances, their states ad variables along with the workflow definitions to manage Hadoop jobs (MapReduce, Sqoop, Pig and Hive).The workflows in Oozie are executed based on data and time dependencies.

Zookeeper-

Zookeeper is the king of coordination and provides simple, fast, reliable and ordered operational services for a Hadoop cluster. Zookeeper is responsible for synchronization service, distributed configuration service and for providing a naming registry for distributed systems.

MAHOUT -
Mahout is an important Hadoop component for machine learning, this provides implementation of various machine learning algorithms. This Hadoop component helps with considering user behavior in providing suggestions, categorizing the items to its respective group, classifying items based on the categorization and supporting in implementation group mining or itemset mining, to determine items which appear in group

Kafka-

A distributed public-subscribe message  developed by LinkedIn that is fast, durable and scalable.Just like other Public-Subscribe messaging systems ,feeds of messages are maintained in topics

Monday, 2 January 2017

User-Defined Functions (UDF)

User-Defined Functions (UDFs)


User-defined functions (frequently abbreviated as UDFs) let you code your own application logic for processing column values during an Impala query. For example, a UDF could perform calculations using an external math library, combine several column values into one, do geospatial calculations, or other kinds of tests and transformations that are outside the scope of the built-in SQL operators and functions.

There are two different interfaces you can use for writing UDFs for Apache Hive. One is really simple, the other… not so much.
The simple API (org.apache.hadoop.hive.ql.exec.UDF) can be used so long as your function reads and returns primitive types. By this I mean basic Hadoop & Hive writable types - Text, IntWritable, LongWritable, DoubleWritable, etc.
However, if you plan on writing a UDF that can manipulate embedded data structures, such as MapList, and Set, then you’re stuck using org.apache.hadoop.hive.ql.udf.generic.GenericUDF, which is a little more involved.

UDF versus UDAF
In Hive, you can define two main kinds of custom functions:

UDF

A UDF processes one or several columns of one row and outputs one value. For example :
o    SELECT lower(str) from table
For each row in "table," the "lower" UDF takes one argument, the value of "str", and outputs one value, the lowercase representation of "str".
o    SELECT datediff(date_begin, date_end) from table
For each row in "table," the "datediff" UDF takes two arguments, the value of "date_begin" and "date_end", and outputs one value, the difference in time between these two dates.
Each argument of a UDF can be:
o    A column of the table
o    A constant value
o    The result of another UDF
o    The result of an arithmetic computation
TODO : Example
UDAF
An UDAF processes one or several columns of several input rows and outputs one value. It is commonly used together with the GROUP operator. For example:
o    SELECT sum(price) from table GROUP by customer;
The Hive Query executor will group rows by customer, and for each group, call the UDAF with all price values. The UDAF then outputs one value for the output record (one output record per customer);
o    SELECT total_customer_value(quantity, unit_price, day) from table group by customer;
For each record of each group, the UDAF will receive the three values of the three selected column, and output one value of the output record.
Simple UDFs
In Hive, you can write both UDF and UDAF in two ways: "simple" and "generic".
"Simple", especially for UDF are truly simple to write. It can be as easy as:
    /** A simple UDF to convert Celcius to Fahrenheit */
    public class ConvertToCelcius extends UDF {
    public double evaluate(double value) {
    return (value - 32) / 1.8;
  }
}
Once compiled, you can invoke an UDF like that:
  hive> addjar my-udf.jar
  hive> create temporary function fahrenheit_to_celcius using "com.mycompany.hive.udf.ConvertToCelcius";
  hive> SELECT fahrenheit_to_celcius(temp_fahrenheit) from temperature_data;
Simple UDF can also handle multiple types by writing several versions of the "evaluate" method.

  /** A simple UDF to get the absolute value of a number */
  public class AbsValue extends UDF {
  public double evaluate(double value) {
  return Math.abs(value);
}

public long evaluate(long value) {
return Math.abs(value);
}

public int evalute(int value) {
return Math.abs(value);
}
}

In short, to write a simple UDF:
o    Extend the org.apache.hadoop.hive.ql.exec.UDF class
o    Write an "evaluate" method that has a signature equivalent to the signature of your UDF in HiveQL.
Types
Simple UDF can accept a large variety of types to represent the column types. Notably, it accepts both Java primitive types and Hadoop IO types
Hive column type
UDF types
string
java.lang.String, org.apache.hadoop.io.Text
Int            
int, java.lang.Integer, org.apache.hadoop.io.IntWritable
boolean
bool, java.lang.Boolean, org.apache.hadoop.io.BooleanWritable
array<type>
java.util.List<Java type>
map<ktype, vtype>
java.util.Map<Java type for K, Java type for V>
struct
Don't use Simple UDF, use GenericUDF

Simple versus Generic
While Simple UDF and UDAF are .. simple, they do have some limitations and shortcomings. The main limitation is related to handling of complex types. One of Hive's main feature is its advanced handling of advanced types:
o    Arrays of typed objects
o    Maps of typed keys and values
o    Structs of typed named fields
The system of simple UDFs is based on reflection and method overloading, which cannot accept everything.
For example, if you wanted to write an "array_sum" UDF, that would return the sum of elements in an array, you would write
public class ArraySum extends UDF {
  public double evaluate(List<Double> value) {
    double sum = 0;
    for (int i = 0; i < value.size(); i++) {
      if (value.get(i) != null) {
        sum += value.get(i);
      }
    }
    return sum;
  }
}
view rawArraySum.java hosted with by GitHub
But what happens if you would also handle arrays of integers ? You cannot overload with
public int evaluate(List<Integer> value)
because this is not valid in Java : the types of a method cannot differ by only the generic type. For more details, see the Generics article on Wikipedia
For this UDF to work, we need to use a Generic UDF.
The following table summarizes the main differences between Simple UDF / UDAF and Generic UDF / UDAF

Simple
Generic
Reduced performance due to use of reflection: each call of the evaluate method is reflective. Furthermore, all arguments are evaluated and parsed.
Optimal performance: no reflective call, and arguments are parsed lazily
Limited handling of complex types. Arrays are handled but suffer from type erasure limitations
All complex parameters are supported (even nested ones like array<array>
Variable number of arguments are not supported
Variable number of arguments are supported
Very easy to write
Not very difficult, but not well documented


Generic UDF
A generic UDF is written by extending the GenericUDF class.
    public interface GenericUDF {
    public Object evaluate(DeferredObject[] args) throws HiveException;
    public String getDisplayString(String[] args);
    public ObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException;
  }
A key concept when working with Generic UDF and UDAF is the ObjectInspector.
In generic UDFs, all objects are passed around using the Object type. Hive is structured this way so that all code handling records and cells is generic, and to avoid the costs of instantiating and deserializing objects when it's not needed.
Therefefore, all interaction with the data passed in to UDFs is done via ObjectInspectors. They allow you to read values from an UDF parameter, and to write output values.
Object Inspectors belong to one of the following categories:
o    Primitive, for primitive types (all numerical types, string, boolean, …)
o    List, for Hive arrays
o    Map, for Hive maps
o    Struct, for Hive structs
When Hive analyses the query, it computes the actual types of the parameters passed in to the UDF, and calls
public ObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException;
The method receives one object inspector for each of the arguments of the query, and must return an object inspector for the return type. Hive then uses the returned ObjectInspector to know what the UDF returns and to continue analyzing the query.
After that, rows are passed in to the UDF, which must use the ObjectInspectors it received in initialize() to read the deferred objects. UDFs generally stores the ObjectInspectors received and created in initialize() in member variables.
First, here is a very minimal sample of an UDF that takes an integer, and returns it multiplied by two. We could have easily implemented this sample as a simple UDF. This is really a minimal sample, that lacks some very important type checking logic, which we'll add later
  import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
  import org.apache.hadoop.hive.ql.metadata.HiveException;
  import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
  import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
  import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
  import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
  import org.apache.hadoop.io.IntWritable;

  public class UDFMultiplyByTwo extends GenericUDF {
  PrimitiveObjectInspector inputOI;
  PrimitiveObjectInspector outputOI;

  public ObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
  // This UDF accepts one argument
  assert (args.length == 1);
  // The first argument is a primitive type
  assert(args[0].getCategory() == Category.PRIMITIVE);

  inputOI  = (PrimitiveObjectInspector)args[0];
  /* We only support INTEGER type */
  assert(inputOI.getPrimitiveCategory() == PrimitiveCategory.INT);

  /* And we'll return a type int, so let's return the corresponding object inspector */
  outputOI = PrimitiveObjectInspectorFactory.writableIntObjectInspector;

  return outputOI;
}

public Object evaluate(DeferredObject[] args) throws HiveException {
if (args.length != 1) return null;

// Access the deferred value. Hive passes the arguments as "deferred" objects
// to avoid some computations if we don't actually need some of the values
Object oin = args[0].get();

if (oin == null) return null;

int value = (Integer) inputOI.getPrimitiveJavaObject(oin);

int output = value * 2;
return new IntWritable(output);
}

@Override
public String getDisplayString(String[] args) {
return "Here, write a nice description";
}
}
Here is another minimal sample of an UDF that takes an array as only parameter, and returns the first element of this array. We could not have done that with a simple UDF, because of type erasure. This is still a minimal sample, that lacks some very important type checking logic, which we'll add later
  import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
  import org.apache.hadoop.hive.ql.metadata.HiveException;
  import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
  import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;

  public class UDFArrayFirst extends GenericUDF {
  ListObjectInspector listInputObjectInspector;

  public ObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
  assert (args.length == 1); // This UDF accepts one argument
  // The first argument is a list
  assert(args[0].getCategory() == Category.LIST);

  listInputObjectInspector = (ListObjectInspector)args[0];

  /* Here comes the real usage for Object Inspectors : we know that our
  * return type is equal to the type of the elements of the input array.
  * We don't need to know in details what this type is, the
  * ListObjectInspector already has it */
  return listInputObjectInspector.getListElementObjectInspector();
}

public Object evaluate(DeferredObject[] args) throws HiveException {
if (args.length != 1) return null;

// Access the deferred value. Hive passes the arguments as "deferred" objects
// to avoid some computations if we don't actually need some of the values
Object oin = args[0].get();

if (oin == null) return null;

int nbElements = listInputObjectInspector.getListLength(oin);
if (nbElements > 0) {
// The list is not empty, return its head
return  listInputObjectInspector.getListElement(oin, 0);
} else {
return null;
}
}

@Override
public String getDisplayString(String[] args) {
return "Here, write a nice description";
}
}
See? It's not much more complex than a simple UDF, but much more powerful.
Some traps of generic UDF
Everything in the Generic UDF stack is processed through Object, so you will definitely have a hard time grasping the correct object types. Almost no type checking can be done at compile time, you will have to do it all at Runtime.
It is important to understand that the Object returned by the evaluate method must be a Writable object. For example, in the "multiply by two" example, we did not return Integer, but IntWritable. Failure to do so will result in cast exceptions.
Debugging generic UDFs is not trivial. You will often need to peek at the execution logs.
o    When running Hive in full map-reduce mode, use the task logs from your Jobtracker interface
o    When running Hive in local mode (which I recommend for development purposes), look for the following lines in the Hive output
o          Total MapReduce jobs = 1
o          Launching Job 1 out of 1
o          Number of reduce tasks is set to 0 since there's no reduce operator
o          Execution log at: /tmp/clement/clement_20130219103434_08913263-5a10-496f-8ddd-408b9c2ff0af.log
o          Job running in-process (local Hadoop)
o          Hadoop job information for null: number of mappers: 0; number of reducers: 0
   
Here, Hive tells you where the logs for this query will be stored. If the query fails, you'll have the full stack there.
More on Writable versus Primitive
ObjectInspectors, especially primitive ones, exist in two versions: Writable versions and Primitive versions. Some tools like ObjectInspectorUtils.compare are very sensitive to this distinction. Using the wrong kind of ObjectInspector leads to cast exceptions.
To switch between modes, use:
  ObjectInspector before;
  after = ObjectInspectorUtils.getStandardObjectInspector(before, ObjectInspectorCopyOption.WRITABLE)
A note about threading
Hive creates one instance of your UDF per mapper, so you may store some data as instance variables safely : each instance of your UDF will only be used in a single thread. However, multiple instances of the UDF can be running concurrently in the same process.
UDAF
Writing a UDAF is slightly more complex, even in the "Simple" variation, and requires understanding how Hive performs aggregations, especially with the GROUP BY operator.
In Hive, the computation of an aggregation must be divisible over the data :
o    given any subset of the input rows, the UDAF must be able to compute a partial result (and actually, you won't receive all rows at once, but one after another, and must be able to keep a state)
o    given a pair of partial results, the UDAF must be able to merge them in another partial result
o    given a partial result, the UDAF must be able to compute the final result (a single column value)
Furthermore, you must be able to serialize the partial result to an Object that MapReduce can read and write.

o