Monday, 2 January 2017

User-Defined Functions (UDFs)


User-defined functions (frequently abbreviated as UDFs) let you code your own application logic for processing column values during a Hive query. For example, a UDF could perform calculations using an external math library, combine several column values into one, do geospatial calculations, or perform other kinds of tests and transformations that are outside the scope of the built-in SQL operators and functions.

There are two different interfaces you can use for writing UDFs for Apache Hive. One is really simple, the other… not so much.
The simple API (org.apache.hadoop.hive.ql.exec.UDF) can be used as long as your function reads and returns primitive types. By this I mean basic Hadoop and Hive writable types: Text, IntWritable, LongWritable, DoubleWritable, etc.
However, if you plan on writing a UDF that manipulates embedded data structures, such as Map, List, and Set, then you're stuck using org.apache.hadoop.hive.ql.udf.generic.GenericUDF, which is a little more involved.

UDF versus UDAF
In Hive, you can define two main kinds of custom functions:

UDF

A UDF processes one or several columns of one row and outputs one value. For example:
o    SELECT lower(str) from table
For each row in "table", the "lower" UDF takes one argument, the value of "str", and outputs one value, the lowercase representation of "str".
o    SELECT datediff(date_begin, date_end) from table
For each row in "table", the "datediff" UDF takes two arguments, the values of "date_begin" and "date_end", and outputs one value, the difference in time between these two dates.
Each argument of a UDF can be:
o    A column of the table
o    A constant value
o    The result of another UDF
o    The result of an arithmetic computation
o    SELECT concat(lower(first_name), ' ', lower(last_name)) from table
Here, the arguments of "concat" are the results of another UDF ("lower") and a constant value (the space character).
UDAF
A UDAF processes one or several columns of several input rows and outputs one value. It is commonly used together with the GROUP BY operator. For example:
o    SELECT sum(price) from table GROUP BY customer;
The Hive query executor will group rows by customer and, for each group, call the UDAF with all price values. The UDAF then outputs one value for the output record (one output record per customer).
o    SELECT total_customer_value(quantity, unit_price, day) from table GROUP BY customer;
For each record of each group, the UDAF will receive the three values of the three selected columns, and output one value per output record.
Simple UDFs
In Hive, you can write both UDFs and UDAFs in two ways: "simple" and "generic".
"Simple" functions, especially UDFs, are truly simple to write. It can be as easy as:

  /** A simple UDF to convert Fahrenheit to Celsius */
  public class ConvertToCelsius extends UDF {
    public double evaluate(double value) {
      return (value - 32) / 1.8;
    }
  }

Once compiled, you can invoke a UDF like this:

  hive> add jar my-udf.jar;
  hive> create temporary function fahrenheit_to_celsius as "com.mycompany.hive.udf.ConvertToCelsius";
  hive> SELECT fahrenheit_to_celsius(temp_fahrenheit) from temperature_data;
A simple UDF can also handle multiple types by providing several versions of the "evaluate" method.

  /** A simple UDF to get the absolute value of a number */
  public class AbsValue extends UDF {
    public double evaluate(double value) {
      return Math.abs(value);
    }

    public long evaluate(long value) {
      return Math.abs(value);
    }

    public int evaluate(int value) {
      return Math.abs(value);
    }
  }

In short, to write a simple UDF:
o    Extend the org.apache.hadoop.hive.ql.exec.UDF class
o    Write an "evaluate" method that has a signature equivalent to the signature of your UDF in HiveQL.
Types
A simple UDF can accept a large variety of types to represent the column types. Notably, it accepts both Java primitive types and Hadoop IO types:

  Hive column type     UDF types
  string               java.lang.String, org.apache.hadoop.io.Text
  int                  int, java.lang.Integer, org.apache.hadoop.io.IntWritable
  boolean              boolean, java.lang.Boolean, org.apache.hadoop.io.BooleanWritable
  array<type>          java.util.List<Java type>
  map<ktype, vtype>    java.util.Map<Java type for K, Java type for V>
  struct               Not supported by simple UDFs; use GenericUDF

Simple versus Generic
While simple UDFs and UDAFs are... simple, they do have some limitations and shortcomings. The main limitation is the handling of complex types. One of Hive's main features is its advanced handling of complex types:
o    Arrays of typed objects
o    Maps of typed keys and values
o    Structs of typed named fields
The simple UDF system is based on reflection and method overloading, which cannot accept everything.
For example, if you wanted to write an "array_sum" UDF that returns the sum of the elements in an array, you would write:
public class ArraySum extends UDF {
  public double evaluate(List<Double> value) {
    double sum = 0;
    for (int i = 0; i < value.size(); i++) {
      if (value.get(i) != null) {
        sum += value.get(i);
      }
    }
    return sum;
  }
}
But what happens if you also want to handle arrays of integers? You cannot overload with
  public int evaluate(List<Integer> value)
because this is not valid in Java: two methods cannot differ only by their generic type parameters. For more details, see the Generics article on Wikipedia.
For this UDF to work, we need to use a generic UDF.
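The type erasure problem can be demonstrated in plain Java, outside of Hive (the class name here is just for illustration):

```java
import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {
    // These two overloads would NOT compile in the same class:
    //   public double evaluate(List<Double> value)  { ... }
    //   public int    evaluate(List<Integer> value) { ... }
    // because after type erasure both have the signature evaluate(List).
    public static void main(String[] args) {
        List<Double> doubles = new ArrayList<>();
        List<Integer> ints = new ArrayList<>();
        // At runtime, both lists have exactly the same class (java.util.ArrayList):
        // the generic type parameter is erased, so reflection-based dispatch
        // cannot tell them apart either.
        System.out.println(doubles.getClass() == ints.getClass()); // prints "true"
    }
}
```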
The following list summarizes the main differences between simple and generic UDFs / UDAFs:
o    Performance: simple UDFs pay a reflection cost (each call of the "evaluate" method is reflective, and all arguments are eagerly evaluated and parsed); generic UDFs make no reflective calls and parse arguments lazily.
o    Complex types: simple UDFs have limited handling (arrays work but suffer from type erasure limitations); generic UDFs support all complex parameters, even nested ones such as arrays of arrays.
o    Variable number of arguments: not supported by simple UDFs; supported by generic UDFs.
o    Ease of writing: simple UDFs are very easy to write; generic UDFs are not very difficult, but not well documented.

Generic UDF
A generic UDF is written by extending the GenericUDF abstract class and implementing its three methods:

  public abstract class GenericUDF {
    public abstract Object evaluate(DeferredObject[] args) throws HiveException;
    public abstract String getDisplayString(String[] args);
    public abstract ObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException;
  }
A key concept when working with Generic UDF and UDAF is the ObjectInspector.
In generic UDFs, all objects are passed around using the Object type. Hive is structured this way so that all code handling records and cells is generic, and to avoid the cost of instantiating and deserializing objects when it's not needed.
Therefore, all interaction with the data passed to UDFs is done via ObjectInspectors. They allow you to read values from a UDF parameter, and to write output values.
Object Inspectors belong to one of the following categories:
o    Primitive, for primitive types (all numerical types, string, boolean, …)
o    List, for Hive arrays
o    Map, for Hive maps
o    Struct, for Hive structs
When Hive analyses the query, it computes the actual types of the parameters passed in to the UDF, and calls
public ObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException;
The method receives one object inspector for each of the arguments of the query, and must return an object inspector for the return type. Hive then uses the returned ObjectInspector to know what the UDF returns and to continue analyzing the query.
After that, rows are passed to the UDF, which must use the ObjectInspectors it received in initialize() to read the deferred objects. UDFs generally store the ObjectInspectors received and created in initialize() in member variables.
First, here is a very minimal sample of a UDF that takes an integer and returns it multiplied by two. We could easily have implemented this sample as a simple UDF. It is really a minimal sample that lacks some very important type checking logic, which we'll add later.
  import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
  import org.apache.hadoop.hive.ql.metadata.HiveException;
  import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
  import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
  import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
  import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
  import org.apache.hadoop.io.IntWritable;

  public class UDFMultiplyByTwo extends GenericUDF {
    PrimitiveObjectInspector inputOI;
    PrimitiveObjectInspector outputOI;

    @Override
    public ObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
      // This UDF accepts one argument
      assert (args.length == 1);
      // The first argument is a primitive type
      assert (args[0].getCategory() == Category.PRIMITIVE);

      inputOI = (PrimitiveObjectInspector) args[0];
      /* We only support the INT type */
      assert (inputOI.getPrimitiveCategory() == PrimitiveCategory.INT);

      /* And we'll return an int, so let's return the corresponding object inspector */
      outputOI = PrimitiveObjectInspectorFactory.writableIntObjectInspector;

      return outputOI;
    }

    @Override
    public Object evaluate(DeferredObject[] args) throws HiveException {
      if (args.length != 1) return null;

      // Access the deferred value. Hive passes the arguments as "deferred" objects
      // to avoid some computations if we don't actually need some of the values
      Object oin = args[0].get();

      if (oin == null) return null;

      int value = (Integer) inputOI.getPrimitiveJavaObject(oin);

      int output = value * 2;
      return new IntWritable(output);
    }

    @Override
    public String getDisplayString(String[] args) {
      return "Here, write a nice description";
    }
  }
Here is another minimal sample: a UDF that takes an array as its only parameter and returns the first element of this array. We could not have done that with a simple UDF, because of type erasure. This is still a minimal sample that lacks some very important type checking logic, which we'll add later.
  import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
  import org.apache.hadoop.hive.ql.metadata.HiveException;
  import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
  import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;

  public class UDFArrayFirst extends GenericUDF {
    ListObjectInspector listInputObjectInspector;

    @Override
    public ObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
      assert (args.length == 1); // This UDF accepts one argument
      // The first argument is a list
      assert (args[0].getCategory() == Category.LIST);

      listInputObjectInspector = (ListObjectInspector) args[0];

      /* Here comes the real usage of Object Inspectors: we know that our
       * return type is equal to the type of the elements of the input array.
       * We don't need to know in detail what this type is, the
       * ListObjectInspector already has it */
      return listInputObjectInspector.getListElementObjectInspector();
    }

    @Override
    public Object evaluate(DeferredObject[] args) throws HiveException {
      if (args.length != 1) return null;

      // Access the deferred value. Hive passes the arguments as "deferred" objects
      // to avoid some computations if we don't actually need some of the values
      Object oin = args[0].get();

      if (oin == null) return null;

      int nbElements = listInputObjectInspector.getListLength(oin);
      if (nbElements > 0) {
        // The list is not empty, return its head
        return listInputObjectInspector.getListElement(oin, 0);
      } else {
        return null;
      }
    }

    @Override
    public String getDisplayString(String[] args) {
      return "Here, write a nice description";
    }
  }
See? It's not much more complex than a simple UDF, but much more powerful.
Some traps of generic UDFs
Everything in the generic UDF stack is processed through Object, so you will definitely have a hard time getting the object types right. Almost no type checking can be done at compile time; you will have to do it all at runtime.
It is important to understand that the Object returned by the evaluate method must be a Writable object. For example, in the "multiply by two" example, we did not return Integer, but IntWritable. Failure to do so will result in cast exceptions.
Debugging generic UDFs is not trivial. You will often need to peek at the execution logs.
o    When running Hive in full map-reduce mode, use the task logs from your JobTracker interface
o    When running Hive in local mode (which I recommend for development purposes), look for the following lines in the Hive output:
o          Total MapReduce jobs = 1
o          Launching Job 1 out of 1
o          Number of reduce tasks is set to 0 since there's no reduce operator
o          Execution log at: /tmp/clement/clement_20130219103434_08913263-5a10-496f-8ddd-408b9c2ff0af.log
o          Job running in-process (local Hadoop)
o          Hadoop job information for null: number of mappers: 0; number of reducers: 0
   
Here, Hive tells you where the logs for this query will be stored. If the query fails, you'll have the full stack there.
More on Writable versus Primitive
ObjectInspectors, especially primitive ones, exist in two versions: Writable versions and Primitive versions. Some tools like ObjectInspectorUtils.compare are very sensitive to this distinction. Using the wrong kind of ObjectInspector leads to cast exceptions.
To switch between modes, use:

  ObjectInspector before;
  ObjectInspector after = ObjectInspectorUtils.getStandardObjectInspector(before, ObjectInspectorCopyOption.WRITABLE);
A note about threading
Hive creates one instance of your UDF per mapper, so you may safely store some data as instance variables: each instance of your UDF will only be used in a single thread. However, multiple instances of the UDF can be running concurrently in the same process.
UDAF
Writing a UDAF is slightly more complex, even in the "Simple" variation, and requires understanding how Hive performs aggregations, especially with the GROUP BY operator.
In Hive, the computation of an aggregation must be divisible over the data:
o    given any subset of the input rows, the UDAF must be able to compute a partial result (in fact, you won't receive all rows at once, but one after another, and must be able to keep a state)
o    given a pair of partial results, the UDAF must be able to merge them in another partial result
o    given a partial result, the UDAF must be able to compute the final result (a single column value)
Furthermore, you must be able to serialize the partial result to an Object that MapReduce can read and write.
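The contract above can be sketched in plain Java. This is not the actual Hive UDAF API; the class and method names below only loosely mirror it, for illustration:

```java
/** Illustrative sum aggregator following the divisible-aggregation contract:
 *  iterate over rows of a subset, merge partial results, terminate to a final value. */
class SumAggregation {
    // The partial result ("aggregation buffer"); in real Hive it must be
    // serializable to an object MapReduce can read and write.
    double partial = 0;

    // Called once per input row of a subset.
    void iterate(double value) { partial += value; }

    // Called to combine two partial results computed on different subsets.
    void merge(SumAggregation other) { partial += other.partial; }

    // Called once at the end to produce the final single column value.
    double terminate() { return partial; }
}
```

A subset of rows is fed through iterate(), partial results from different subsets are combined with merge(), and terminate() produces the final value, mirroring how Hive distributes the aggregation across mappers and reducers.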


