Tuesday, 3 January 2017

Core Hadoop Components

The Hadoop Ecosystem:

Hadoop Distributed File System (HDFS) 

          HDFS is the default big data storage layer for Apache Hadoop. HDFS is often called the "secret sauce" of the Apache Hadoop components because users can dump huge datasets into HDFS and the data will sit there reliably until the user wants to leverage it for analysis. HDFS creates several replicas of each data block and distributes them across different nodes of the cluster for reliable and quick data access. HDFS comprises three important components: the NameNode, the DataNode and the Secondary NameNode. HDFS operates on a master-slave architecture model in which the NameNode acts as the master node that keeps track of the storage cluster, and the DataNodes act as slave nodes running on the various machines of the Hadoop cluster and holding the actual data blocks.
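
To make this concrete, here is a minimal sketch (not from the original post) of writing and reading a file through the HDFS Java API; the path and file contents are hypothetical:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataInputStream;
  import org.apache.hadoop.fs.FSDataOutputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class HdfsHelloWorld {
    public static void main(String[] args) throws Exception {
      // Picks up fs.defaultFS and the replication factor from core-site.xml / hdfs-site.xml
      Configuration conf = new Configuration();
      FileSystem fs = FileSystem.get(conf);

      Path path = new Path("/user/demo/hello.txt"); // hypothetical path

      // The NameNode records the file's metadata; the blocks themselves
      // are replicated across DataNodes according to the replication factor.
      try (FSDataOutputStream out = fs.create(path, true)) {
        out.writeUTF("hello HDFS");
      }

      // Reading back: the client asks the NameNode for block locations,
      // then streams the data directly from the DataNodes.
      try (FSDataInputStream in = fs.open(path)) {
        System.out.println(in.readUTF());
      }
    }
  }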

MapReduce

          MapReduce is a Java-based processing framework, based on a programming model originally published by Google, in which the actual data from the HDFS store gets processed efficiently. MapReduce breaks a big data processing job down into smaller tasks and analyses large datasets in parallel before reducing the intermediate results to the final output. In the Hadoop ecosystem, Hadoop MapReduce is a framework based on the YARN architecture. The YARN-based Hadoop architecture supports parallel processing of huge datasets, and MapReduce provides the framework for easily writing applications that run on thousands of nodes while taking fault and failure management into account.
The basic principle of operation behind MapReduce is that the "Map" job sends a query for processing to various nodes in a Hadoop cluster and the "Reduce" job collects all the results and combines them into a single output value. The Map task takes the input data, splits it into independent chunks, and its output becomes the input of the Reduce task. The Reduce task then combines the mapped data tuples into a smaller set of tuples. Both the input and the output of these tasks are stored in the file system, and MapReduce takes care of scheduling jobs, monitoring them and re-executing failed tasks.
The MapReduce framework forms the compute layer while the HDFS file system forms the storage layer. Typically, in the Hadoop architecture, the data nodes and the compute nodes are the same machines, so that computation is moved to the data.
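
As a sketch of what this looks like in code (a standard word-count example, not taken from the original post), the Map and Reduce tasks can be written against the Hadoop MapReduce Java API:

  import java.io.IOException;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Mapper;
  import org.apache.hadoop.mapreduce.Reducer;

  /** "Map" task: split each input line into independent chunks (words) and emit (word, 1) pairs. */
  public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable offset, Text line, Context context)
        throws IOException, InterruptedException {
      for (String token : line.toString().split("\\s+")) {
        if (!token.isEmpty()) {
          word.set(token);
          context.write(word, ONE);
        }
      }
    }
  }

  /** "Reduce" task: combine the mapped tuples for each word into a single count. */
  class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable c : counts) {
        sum += c.get();
      }
      context.write(word, new IntWritable(sum));
    }
  }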
  

YARN-

          YARN forms an integral part of Hadoop 2.0. YARN is a great enabler for dynamic resource utilization on the Hadoop framework, as users can run various Hadoop applications without having to worry about increasing workloads.

  
Pig-
     Pig is a convenient tool developed by Yahoo for analysing huge datasets efficiently and easily. It provides a high-level data flow language, Pig Latin, that is optimized, extensible and easy to use. The most outstanding feature of Pig programs is that their structure is open to considerable parallelization, which makes it easy to handle large datasets.

Hive

       Hive, developed by Facebook, is a data warehouse built on top of Hadoop that provides a simple SQL-like language known as HiveQL for querying, data summarization and analysis. Hive makes querying faster through indexing.
Sqoop
The Sqoop component is used for importing data from external sources into related Hadoop components like HDFS, HBase or Hive. It can also be used for exporting data from Hadoop to other external structured data stores. Sqoop parallelizes data transfer, mitigates excessive loads, and copies data quickly, enabling efficient data imports and analysis.

Flume-

The Flume component is used to gather and aggregate large amounts of data. Apache Flume is used for collecting data from its origin and sending it to its resting location (HDFS). Flume accomplishes this by defining data flows that consist of three primary structures: sources, channels and sinks. The processes that run the data flow in Flume are known as agents, and the units of data that flow through Flume are known as events.

HBase –

HBase is a column-oriented database that uses HDFS for the underlying storage of data. HBase supports random reads as well as batch computations using MapReduce. With the HBase NoSQL database, an enterprise can create large tables with millions of rows and columns on commodity hardware. HBase is the best choice when there is a requirement for random 'read or write' access to big datasets.
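
A short sketch of what random 'read or write' access looks like with the HBase Java client API (the table name, column family and row key below are hypothetical):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.Connection;
  import org.apache.hadoop.hbase.client.ConnectionFactory;
  import org.apache.hadoop.hbase.client.Get;
  import org.apache.hadoop.hbase.client.Put;
  import org.apache.hadoop.hbase.client.Result;
  import org.apache.hadoop.hbase.client.Table;
  import org.apache.hadoop.hbase.util.Bytes;

  public class HBaseRandomAccess {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create(); // reads hbase-site.xml from the classpath
      try (Connection connection = ConnectionFactory.createConnection(conf);
           Table table = connection.getTable(TableName.valueOf("users"))) {

        // Random write: store one cell for the row "user-42"
        Put put = new Put(Bytes.toBytes("user-42"));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("city"), Bytes.toBytes("Chennai"));
        table.put(put);

        // Random read: fetch the same cell back by row key
        Get get = new Get(Bytes.toBytes("user-42"));
        Result result = table.get(get);
        System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("city"))));
      }
    }
  }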

Oozie-

Oozie is a workflow scheduler in which workflows are expressed as Directed Acyclic Graphs. Oozie runs in a Java servlet container (Tomcat) and makes use of a database to store all the running workflow instances, their states and variables, along with the workflow definitions, in order to manage Hadoop jobs (MapReduce, Sqoop, Pig and Hive). The workflows in Oozie are executed based on data and time dependencies.

Zookeeper-

Zookeeper is the king of coordination and provides simple, fast, reliable and ordered operational services for a Hadoop cluster. Zookeeper is responsible for the synchronization service, the distributed configuration service and for providing a naming registry for distributed systems.

Mahout-
Mahout is an important Hadoop component for machine learning, as it provides implementations of various machine learning algorithms. This Hadoop component helps with recommendation (considering user behaviour to provide suggestions), clustering (grouping items into their respective groups), classification (assigning items to categories) and frequent itemset or group mining (determining which items tend to appear together).
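
As an illustration of the recommendation use case, here is a hypothetical sketch using Mahout's "Taste" collaborative-filtering API; the ratings file and the user/item IDs are made up:

  import java.io.File;
  import java.util.List;
  import org.apache.mahout.cf.taste.impl.model.file.FileDataModel;
  import org.apache.mahout.cf.taste.impl.neighborhood.NearestNUserNeighborhood;
  import org.apache.mahout.cf.taste.impl.recommender.GenericUserBasedRecommender;
  import org.apache.mahout.cf.taste.impl.similarity.PearsonCorrelationSimilarity;
  import org.apache.mahout.cf.taste.model.DataModel;
  import org.apache.mahout.cf.taste.neighborhood.UserNeighborhood;
  import org.apache.mahout.cf.taste.recommender.RecommendedItem;
  import org.apache.mahout.cf.taste.recommender.Recommender;
  import org.apache.mahout.cf.taste.similarity.UserSimilarity;

  public class ItemSuggestions {
    public static void main(String[] args) throws Exception {
      // ratings.csv contains lines of the form: userID,itemID,preference (hypothetical file)
      DataModel model = new FileDataModel(new File("ratings.csv"));

      // "Considering user behaviour": find the users whose ratings correlate best with ours
      UserSimilarity similarity = new PearsonCorrelationSimilarity(model);
      UserNeighborhood neighborhood = new NearestNUserNeighborhood(10, similarity, model);

      // Suggest 5 items for user 42 based on what similar users liked
      Recommender recommender = new GenericUserBasedRecommender(model, neighborhood, similarity);
      List<RecommendedItem> suggestions = recommender.recommend(42, 5);
      for (RecommendedItem item : suggestions) {
        System.out.println(item.getItemID() + " -> " + item.getValue());
      }
    }
  }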

Kafka-

Kafka is a distributed publish-subscribe messaging system developed at LinkedIn that is fast, durable and scalable. Just like in other publish-subscribe messaging systems, feeds of messages are maintained in topics.
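
A small sketch of the publishing side, assuming the Kafka Java producer client and a hypothetical topic name:

  import java.util.Properties;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.Producer;
  import org.apache.kafka.clients.producer.ProducerRecord;

  public class EventPublisher {
    public static void main(String[] args) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

      // Feeds of messages are maintained in topics: producers append records to a topic,
      // and any number of subscribed consumers can read the feed at their own pace.
      try (Producer<String, String> producer = new KafkaProducer<>(props)) {
        producer.send(new ProducerRecord<>("page-views", "user-42", "clicked /home"));
      }
    }
  }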

Monday, 2 January 2017

User-Defined Functions (UDFs)


User-defined functions (frequently abbreviated as UDFs) let you code your own application logic for processing column values during an Impala query. For example, a UDF could perform calculations using an external math library, combine several column values into one, do geospatial calculations, or other kinds of tests and transformations that are outside the scope of the built-in SQL operators and functions.

There are two different interfaces you can use for writing UDFs for Apache Hive. One is really simple, the other… not so much.
The simple API (org.apache.hadoop.hive.ql.exec.UDF) can be used so long as your function reads and returns primitive types. By this I mean basic Hadoop & Hive writable types - Text, IntWritable, LongWritable, DoubleWritable, etc.
However, if you plan on writing a UDF that can manipulate embedded data structures, such as Map, List, and Set, then you're stuck using org.apache.hadoop.hive.ql.udf.generic.GenericUDF, which is a little more involved.

UDF versus UDAF
In Hive, you can define two main kinds of custom functions:

UDF

A UDF processes one or several columns of one row and outputs one value. For example:
o    SELECT lower(str) from table
For each row in "table," the "lower" UDF takes one argument, the value of "str", and outputs one value, the lowercase representation of "str".
o    SELECT datediff(date_begin, date_end) from table
For each row in "table," the "datediff" UDF takes two arguments, the value of "date_begin" and "date_end", and outputs one value, the difference in time between these two dates.
Each argument of a UDF can be:
o    A column of the table
o    A constant value
o    The result of another UDF
o    The result of an arithmetic computation
o    SELECT lower(concat(first_name, "-", last_name)) from table
In this (hypothetical) example, the arguments of "concat" are two columns and a constant value, and the argument of "lower" is the result of another UDF.
UDAF
A UDAF processes one or several columns of several input rows and outputs one value. It is commonly used together with the GROUP BY operator. For example:
o    SELECT sum(price) from table GROUP BY customer;
The Hive query executor will group rows by customer and, for each group, call the UDAF with all the price values. The UDAF then outputs one value for the output record (one output record per customer).
o    SELECT total_customer_value(quantity, unit_price, day) from table GROUP BY customer;
For each record of each group, the UDAF will receive the three values of the three selected columns, and it outputs one value per output record.
Simple UDFs
In Hive, you can write both UDF and UDAF in two ways: "simple" and "generic".
"Simple", especially for UDF are truly simple to write. It can be as easy as:
    import org.apache.hadoop.hive.ql.exec.UDF;

    /** A simple UDF to convert Fahrenheit to Celsius */
    public class ConvertToCelsius extends UDF {
      public double evaluate(double value) {
        return (value - 32) / 1.8;
      }
    }
Once compiled, you can invoke a UDF like this:
  hive> add jar my-udf.jar;
  hive> create temporary function fahrenheit_to_celsius as "com.mycompany.hive.udf.ConvertToCelsius";
  hive> SELECT fahrenheit_to_celsius(temp_fahrenheit) from temperature_data;
Simple UDF can also handle multiple types by writing several versions of the "evaluate" method.

  import org.apache.hadoop.hive.ql.exec.UDF;

  /** A simple UDF to get the absolute value of a number */
  public class AbsValue extends UDF {
    public double evaluate(double value) {
      return Math.abs(value);
    }

    public long evaluate(long value) {
      return Math.abs(value);
    }

    public int evaluate(int value) {
      return Math.abs(value);
    }
  }

In short, to write a simple UDF:
o    Extend the org.apache.hadoop.hive.ql.exec.UDF class
o    Write an "evaluate" method that has a signature equivalent to the signature of your UDF in HiveQL.
Types
A simple UDF can accept a large variety of types to represent the column types. Notably, it accepts both Java primitive types and the Hadoop IO (writable) types:

Hive column type        UDF parameter / return types
string                  java.lang.String, org.apache.hadoop.io.Text
int                     int, java.lang.Integer, org.apache.hadoop.io.IntWritable
boolean                 boolean, java.lang.Boolean, org.apache.hadoop.io.BooleanWritable
array<type>             java.util.List<Java type>
map<ktype, vtype>       java.util.Map<Java type for K, Java type for V>
struct                  don't use a simple UDF, use GenericUDF
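
For instance, a hypothetical simple UDF whose signature uses the Hadoop writable types from the table above could look like this:

  import org.apache.hadoop.hive.ql.exec.UDF;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;

  /** A hypothetical simple UDF returning the length of a string column */
  public class StringLength extends UDF {
    public IntWritable evaluate(Text value) {
      if (value == null) {
        return null;
      }
      return new IntWritable(value.toString().length());
    }
  }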

Simple versus Generic
While simple UDFs and UDAFs are... simple, they do have some limitations and shortcomings. The main limitation is related to the handling of complex types. One of Hive's main features is its advanced handling of complex types:
o    Arrays of typed objects
o    Maps of typed keys and values
o    Structs of typed named fields
The system of simple UDFs is based on reflection and method overloading, which cannot handle every case.
For example, if you wanted to write an "array_sum" UDF that returns the sum of the elements in an array, you would write:
import java.util.List;

import org.apache.hadoop.hive.ql.exec.UDF;

public class ArraySum extends UDF {
  public double evaluate(List<Double> value) {
    double sum = 0;
    for (int i = 0; i < value.size(); i++) {
      if (value.get(i) != null) {
        sum += value.get(i);
      }
    }
    return sum;
  }
}
But what happens if you also want to handle arrays of integers? You cannot overload with
public int evaluate(List<Integer> value)
because this is not valid in Java: two methods cannot differ only by their generic type parameters (type erasure). For more details, see the Generics article on Wikipedia.
For this UDF to work, we need to use a Generic UDF.
The following table summarizes the main differences between Simple UDF / UDAF and Generic UDF / UDAF

Simple:
o    Reduced performance due to the use of reflection: each call of the evaluate method is reflective. Furthermore, all arguments are evaluated and parsed.
o    Limited handling of complex types: arrays are handled but suffer from type erasure limitations.
o    Variable numbers of arguments are not supported.
o    Very easy to write.

Generic:
o    Optimal performance: no reflective call, and arguments are parsed lazily.
o    All complex parameters are supported, even nested ones like array<array<int>>.
o    Variable numbers of arguments are supported.
o    Not very difficult to write, but not well documented.


Generic UDF
A generic UDF is written by extending the GenericUDF class.
    public abstract class GenericUDF {
      public abstract ObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException;
      public abstract Object evaluate(DeferredObject[] args) throws HiveException;
      public abstract String getDisplayString(String[] args);
    }
A key concept when working with Generic UDF and UDAF is the ObjectInspector.
In generic UDFs, all objects are passed around using the Object type. Hive is structured this way so that all code handling records and cells is generic, and to avoid the costs of instantiating and deserializing objects when it's not needed.
Therefore, all interaction with the data passed in to UDFs is done via ObjectInspectors. They allow you to read values from a UDF parameter and to write output values.
Object Inspectors belong to one of the following categories:
o    Primitive, for primitive types (all numerical types, string, boolean, …)
o    List, for Hive arrays
o    Map, for Hive maps
o    Struct, for Hive structs
When Hive analyses the query, it computes the actual types of the parameters passed in to the UDF, and calls
public ObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException;
The method receives one object inspector for each of the arguments of the query, and must return an object inspector for the return type. Hive then uses the returned ObjectInspector to know what the UDF returns and to continue analyzing the query.
After that, rows are passed in to the UDF, which must use the ObjectInspectors it received in initialize() to read the deferred objects. A UDF generally stores the ObjectInspectors received and created in initialize() in member variables.
First, here is a very minimal sample of a UDF that takes an integer and returns it multiplied by two. We could easily have implemented this sample as a simple UDF. This is really a minimal sample that lacks some very important type-checking logic, which we'll add later.
  import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
  import org.apache.hadoop.hive.ql.metadata.HiveException;
  import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
  import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
  import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
  import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
  import org.apache.hadoop.io.IntWritable;

  public class UDFMultiplyByTwo extends GenericUDF {
    PrimitiveObjectInspector inputOI;
    PrimitiveObjectInspector outputOI;

    @Override
    public ObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
      // This UDF accepts one argument
      assert (args.length == 1);
      // The first argument is a primitive type
      assert (args[0].getCategory() == Category.PRIMITIVE);

      inputOI = (PrimitiveObjectInspector) args[0];
      // We only support INTEGER type
      assert (inputOI.getPrimitiveCategory() == PrimitiveCategory.INT);

      // And we'll return a type int, so let's return the corresponding object inspector
      outputOI = PrimitiveObjectInspectorFactory.writableIntObjectInspector;

      return outputOI;
    }

    @Override
    public Object evaluate(DeferredObject[] args) throws HiveException {
      if (args.length != 1) return null;

      // Access the deferred value. Hive passes the arguments as "deferred" objects
      // to avoid some computations if we don't actually need some of the values
      Object oin = args[0].get();

      if (oin == null) return null;

      int value = (Integer) inputOI.getPrimitiveJavaObject(oin);

      int output = value * 2;
      return new IntWritable(output);
    }

    @Override
    public String getDisplayString(String[] args) {
      return "Here, write a nice description";
    }
  }
Here is another minimal sample of a UDF that takes an array as its only parameter and returns the first element of this array. We could not have done that with a simple UDF because of type erasure. This is still a minimal sample that lacks some very important type-checking logic, which we'll add later.
  import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
  import org.apache.hadoop.hive.ql.metadata.HiveException;
  import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
  import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;

  public class UDFArrayFirst extends GenericUDF {
    ListObjectInspector listInputObjectInspector;

    @Override
    public ObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
      assert (args.length == 1); // This UDF accepts one argument
      // The first argument is a list
      assert (args[0].getCategory() == Category.LIST);

      listInputObjectInspector = (ListObjectInspector) args[0];

      /* Here comes the real usage for Object Inspectors: we know that our
       * return type is equal to the type of the elements of the input array.
       * We don't need to know in detail what this type is, the
       * ListObjectInspector already has it */
      return listInputObjectInspector.getListElementObjectInspector();
    }

    @Override
    public Object evaluate(DeferredObject[] args) throws HiveException {
      if (args.length != 1) return null;

      // Access the deferred value. Hive passes the arguments as "deferred" objects
      // to avoid some computations if we don't actually need some of the values
      Object oin = args[0].get();

      if (oin == null) return null;

      int nbElements = listInputObjectInspector.getListLength(oin);
      if (nbElements > 0) {
        // The list is not empty, return its head
        return listInputObjectInspector.getListElement(oin, 0);
      } else {
        return null;
      }
    }

    @Override
    public String getDisplayString(String[] args) {
      return "Here, write a nice description";
    }
  }
See? It's not much more complex than a simple UDF, but much more powerful.
Some traps of generic UDF
Everything in the Generic UDF stack is processed through Object, so you will definitely have a hard time grasping the correct object types. Almost no type checking can be done at compile time; you will have to do it all at runtime.
It is important to understand that the Object returned by the evaluate method must be a Writable object. For example, in the "multiply by two" example, we did not return Integer, but IntWritable. Failure to do so will result in cast exceptions.
Debugging generic UDFs is not trivial. You will often need to peek at the execution logs.
o    When running Hive in full map-reduce mode, use the task logs from your JobTracker interface
o    When running Hive in local mode (which I recommend for development purposes), look for the following lines in the Hive output:
       Total MapReduce jobs = 1
       Launching Job 1 out of 1
       Number of reduce tasks is set to 0 since there's no reduce operator
       Execution log at: /tmp/clement/clement_20130219103434_08913263-5a10-496f-8ddd-408b9c2ff0af.log
       Job running in-process (local Hadoop)
       Hadoop job information for null: number of mappers: 0; number of reducers: 0
Here, Hive tells you where the logs for this query will be stored. If the query fails, you'll have the full stack there.
More on Writable versus Primitive
ObjectInspectors, especially primitive ones, exist in two versions: Writable versions and Primitive versions. Some tools like ObjectInspectorUtils.compare are very sensitive to this distinction. Using the wrong kind of ObjectInspector leads to cast exceptions.
To switch between modes, use:
  ObjectInspector before;  // the ObjectInspector you already have
  ObjectInspector after = ObjectInspectorUtils.getStandardObjectInspector(before, ObjectInspectorCopyOption.WRITABLE);
A note about threading
Hive creates one instance of your UDF per mapper, so you may safely store some data as instance variables: each instance of your UDF will only be used in a single thread. However, multiple instances of the UDF can be running concurrently in the same process.
UDAF
Writing a UDAF is slightly more complex, even in the "Simple" variation, and requires understanding how Hive performs aggregations, especially with the GROUP BY operator.
In Hive, the computation of an aggregation must be divisible over the data:
o    given any subset of the input rows, the UDAF must be able to compute a partial result (and in fact, you won't receive all rows at once but one after another, so you must be able to keep a state)
o    given a pair of partial results, the UDAF must be able to merge them into another partial result
o    given a partial result, the UDAF must be able to compute the final result (a single column value)
Furthermore, you must be able to serialize the partial result to an object that MapReduce can read and write. The sketch below illustrates this contract.
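
As an illustration (not from the original article), here is a minimal sketch of a "simple" UDAF computing the mean of a double column, assuming the reflection-based org.apache.hadoop.hive.ql.exec.UDAF / UDAFEvaluator API:

  import org.apache.hadoop.hive.ql.exec.UDAF;
  import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

  public class UDAFMean extends UDAF {

    /** The partial result that Hive serializes and ships between tasks. */
    public static class PartialResult {
      double sum;
      long count;
    }

    public static class MeanEvaluator implements UDAFEvaluator {
      private PartialResult partial;

      public void init() {
        partial = null;
      }

      /** Called once per input row of the current group: keep a running state. */
      public boolean iterate(Double value) {
        if (value == null) return true;
        if (partial == null) partial = new PartialResult();
        partial.sum += value;
        partial.count++;
        return true;
      }

      /** Called to obtain the partial result computed from a subset of the rows. */
      public PartialResult terminatePartial() {
        return partial;
      }

      /** Called to merge a partial result coming from another task into this one. */
      public boolean merge(PartialResult other) {
        if (other == null) return true;
        if (partial == null) partial = new PartialResult();
        partial.sum += other.sum;
        partial.count += other.count;
        return true;
      }

      /** Called at the end to compute the final, single column value. */
      public Double terminate() {
        if (partial == null || partial.count == 0) return null;
        return partial.sum / partial.count;
      }
    }
  }

Once registered, it could be invoked just like a built-in aggregate, for example SELECT my_mean(price) from table GROUP BY customer (with "my_mean" being a hypothetical function name).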
