Thursday, 24 November 2016

MapReduce Job Life Cycle

This section briefly sketches the life cycle of a MapReduce job and the roles of the primary actors in the life cycle. The full life cycle is much more complex. For details, refer to the documentation for your Hadoop distribution or the Apache Hadoop MapReduce documentation.
Though other configurations are possible, a common Hadoop cluster configuration is a single master node where the Job Tracker runs, and multiple worker nodes, each running a Task Tracker. The Job Tracker node can also be a worker node.
When the user submits a MapReduce job to Hadoop:
  1. The local Job Client prepares the job for submission and hands it off to the Job Tracker.
  2. The Job Tracker schedules the job and distributes the map work among the Task Trackers for parallel processing.
  3. Each Task Tracker spawns a Map Task. The Job Tracker receives progress information from the Task Trackers.
  4. As map results become available, the Job Tracker distributes the reduce work among the Task Trackers for parallel processing.
  5. Each Task Tracker spawns a Reduce Task to perform the work. The Job Tracker receives progress information from the Task Trackers.
Not all map tasks have to complete before reduce tasks begin running. Reduce tasks can begin as soon as map tasks begin completing. Thus, the map and reduce steps often overlap.

Job Client


The Job Client prepares a job for execution. When you submit a MapReduce job to Hadoop, the local Job Client:
  1. Validates the job configuration.
  2. Generates the input splits. 
  3. Copies the job resources (configuration, job JAR file, input splits) to a shared location, such as an HDFS directory, where they are accessible to the Job Tracker and Task Trackers.
  4. Submits the job to the Job Tracker.

Job Tracker


The Job Tracker is responsible for scheduling jobs, dividing a job into map and reduce tasks, distributing map and reduce tasks among worker nodes, task failure recovery, and tracking the job status. Job scheduling and failure recovery are not discussed here; see the documentation for your Hadoop distribution or the Apache Hadoop MapReduce documentation.
When preparing to run a job, the Job Tracker:
  1. Fetches input splits from the shared location where the Job Client placed the information.
  2. Creates a map task for each split.
  3. Assigns each map task to a Task Tracker (worker node).
The Job Tracker monitors the health of the Task Trackers and the progress of the job. As map tasks complete and results become available, the Job Tracker:
  1. Creates reduce tasks up to the maximum allowed by the job configuration.
  2. Assigns each map result partition to a reduce task.
  3. Assigns each reduce task to a Task Tracker.
A job is complete when all map and reduce tasks successfully complete, or, if there is no reduce step, when all map tasks successfully complete.
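
The two planning steps above can be sketched in a few lines of Python. This is only an illustration, not how Hadoop implements scheduling: the function names are made up for this post, and the round-robin assignment is an assumption (a real Job Tracker also considers things like where the data lives and which nodes are healthy).

```python
from itertools import cycle


def plan_map_tasks(splits, trackers):
    """Create one map task per input split and assign it to a Task Tracker.

    Round-robin assignment is an assumption made for this sketch.
    """
    assignment = {tracker: [] for tracker in trackers}
    for split, tracker in zip(splits, cycle(trackers)):
        assignment[tracker].append(f"map-{split}")
    return assignment


def plan_reduce_tasks(partitions, max_reduce_tasks):
    """Create at most max_reduce_tasks reduce tasks and give each partition to one."""
    num_tasks = min(max_reduce_tasks, len(partitions))
    tasks = {f"reduce-{i}": [] for i in range(num_tasks)}
    for index, partition in enumerate(partitions):
        tasks[f"reduce-{index % num_tasks}"].append(partition)
    return tasks


if __name__ == "__main__":
    print(plan_map_tasks(["split0", "split1", "split2"], ["tt-a", "tt-b"]))
    print(plan_reduce_tasks(["p0", "p1", "p2", "p3"], max_reduce_tasks=2))
```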

Task Tracker


A Task Tracker manages the tasks of one worker node and reports status to the Job Tracker. Often, the Task Tracker runs on the associated worker node, but it is not required to be on the same host.
When the Job Tracker assigns a map or reduce task to a Task Tracker, the Task Tracker:
  1. Fetches job resources locally.
  2. Spawns a child JVM on the worker node to execute the map or reduce task.
  3. Reports status to the Job Tracker.
The task spawned by the Task Tracker runs the job's map or reduce functions.

Map Task


The Hadoop MapReduce framework creates a map task to process each input split. The map task:
  1. Uses the InputFormat to fetch the input data locally and create input key-value pairs.
  2. Applies the job-supplied map function to each key-value pair.
  3. Performs local sorting and aggregation of the results.
  4. If the job includes a Combiner, runs the Combiner for further aggregation.
  5. Stores the results locally, in memory and on the local file system.
  6. Communicates progress and status to the Task Tracker.
Map task results undergo a local sort by key to prepare the data for consumption by reduce tasks. If a Combiner is configured for the job, it also runs in the map task. A Combiner consolidates the data in an application-specific way, reducing the amount of data that must be transferred to reduce tasks. For example, a Combiner might compute a local maximum value for a key and discard the rest of the values. The details of how map tasks manage, sort, and shuffle results are not covered here. See the documentation for your Hadoop distribution or the Apache Hadoop MapReduce documentation.
When a map task notifies the Task Tracker of completion, the Task Tracker notifies the Job Tracker. The Job Tracker then makes the results available to reduce tasks.
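
To make the map-side steps concrete, here is a small Python sketch of a map task that applies a map function, sorts locally by key, and then runs a Combiner that keeps only the local maximum per key (the example mentioned above). The function names and the "city,temperature" input format are hypothetical and chosen just for this illustration; real map tasks also spill to disk and partition their output, which is left out here.

```python
from itertools import groupby
from operator import itemgetter


def run_map_task(records, map_fn, combiner=None):
    """Apply map_fn to each input pair, sort by key, and optionally combine."""
    # Apply the job-supplied map function to each input key-value pair.
    mapped = [pair for key, value in records for pair in map_fn(key, value)]
    # Local sort by key, so equal keys sit next to each other.
    mapped.sort(key=itemgetter(0))
    if combiner is None:
        return mapped
    # Run the Combiner once per key over that key's local values.
    combined = []
    for key, group in groupby(mapped, key=itemgetter(0)):
        combined.append((key, combiner([value for _, value in group])))
    return combined


def temperature_map(_offset, line):
    """Hypothetical map function: 'city,temperature' -> (city, temperature)."""
    city, temperature = line.split(",")
    yield city, int(temperature)


if __name__ == "__main__":
    split = [(0, "oslo,3"), (7, "rome,18"), (15, "oslo,7")]
    print(run_map_task(split, temperature_map, combiner=max))
```

With the Combiner in place, only one pair per city leaves the map task, which is exactly the reduction in transferred data that a Combiner is meant to give.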

Reduce Task


The reduce phase aggregates the results from the map phase into final results. Usually, the final result set is smaller than the input set, but this is application dependent. The reduction is carried out by parallel reduce tasks. The reduce input keys and values need not have the same type as the output keys and values. The reduce phase is optional. You may configure a job to stop after the map phase completes.
Reduce is carried out in three phases: copy, sort, and reduce. A reduce task:
  1. Fetches job resources locally.
  2. Enters the copy phase to fetch local copies of all the assigned map results from the map worker nodes.
  3. When the copy phase completes, executes the sort phase to merge the copied results into a single sorted set of (key, value-list) pairs.
  4. When the sort phase completes, executes the reduce phase, invoking the job-supplied reduce function on each (key, value-list) pair.
  5. Saves the final results to the output destination, such as HDFS.
The input to a reduce function is key-value pairs where the value is a list of values sharing the same key. For example, if one map task produces a key-value pair ('eat', 2) and another map task produces the pair ('eat', 1), then these pairs are consolidated into ('eat', (2, 1)) for input to the reduce function. If the purpose of the reduce phase is to compute a sum of all the values for each key, then the final output key-value pair for this input is ('eat', 3).
Output from the reduce phase is saved to the destination configured for the job, such as HDFS or MarkLogic Server. Reduce tasks use an OutputFormat subclass to record results. The Hadoop API provides OutputFormat subclasses for using HDFS as the output destination. The MarkLogic Connector for Hadoop provides OutputFormat subclasses for using a MarkLogic Server database as the destination. The connector also provides classes for defining key and value types.
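
The ('eat', 2) and ('eat', 1) example from this section can be reproduced with a short Python sketch. It models only the merge into (key, value-list) pairs and the call to the reduce function; the function names are invented for illustration, and copying results over the network and writing through an OutputFormat are not modelled.

```python
from collections import defaultdict


def merge_map_outputs(map_outputs):
    """Sort phase: merge copied map results into sorted (key, value-list) pairs."""
    grouped = defaultdict(list)
    for output in map_outputs:
        for key, value in output:
            grouped[key].append(value)
    return sorted(grouped.items())


def run_reduce(map_outputs, reduce_fn):
    """Reduce phase: call the job-supplied reduce function once per key."""
    return [(key, reduce_fn(key, values))
            for key, values in merge_map_outputs(map_outputs)]


def sum_values(_key, values):
    """Example reduce function: add up all values for a key."""
    return sum(values)


if __name__ == "__main__":
    outputs = [[("eat", 2)], [("eat", 1)]]  # results from two map tasks
    print(merge_map_outputs(outputs))
    print(run_reduce(outputs, sum_values))
```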

How Hadoop Partitions Map Input Data

When you submit a job, the MapReduce framework divides the input data set into chunks called splits using the org.apache.hadoop.mapreduce.InputFormat subclass supplied in the job configuration. Splits are created by the local Job Client and included in the job information made available to the Job Tracker.
The Job Tracker creates a map task for each split. Each map task uses a RecordReader provided by the InputFormat subclass to transform the split into input key-value pairs. The diagram below shows how the input data is broken down for analysis during the map phase:
The Hadoop API provides InputFormat subclasses for using HDFS as an input source. The MarkLogic Connector for Hadoop provides InputFormat subclasses for using MarkLogic Server as an input source. For a list of available MarkLogic-specific subclasses, see InputFormat Subclasses.
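
The split and RecordReader idea can be sketched in Python as follows. This is a simplified picture under stated assumptions: the input is an in-memory list of lines, splits are cut by line count, and the key is the line number. File-based InputFormat subclasses in Hadoop work on byte ranges of files instead, and the function names here are made up for this post.

```python
def make_splits(lines, lines_per_split):
    """Divide the input into splits of at most lines_per_split lines each.

    Each split remembers where it starts so keys stay unique across splits.
    """
    return [
        (start, lines[start:start + lines_per_split])
        for start in range(0, len(lines), lines_per_split)
    ]


def record_reader(split):
    """Turn one split into (key, value) pairs: key = line number, value = text."""
    start, lines = split
    for index, line in enumerate(lines):
        yield start + index, line


if __name__ == "__main__":
    data = ["to be", "or not", "to be"]
    for split in make_splits(data, lines_per_split=2):
        # One map task would be created per split.
        print(list(record_reader(split)))
```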

Tuesday, 3 May 2016

Hive Query Language

HiveQL is an SQL-like query language for Hive. It mostly mimics SQL syntax for creating tables, loading data into tables, and querying the tables. HiveQL also allows users to embed their custom map-reduce scripts. These scripts can be written in any language using a simple row-based streaming interface: read rows from standard input and write out rows to standard output. This flexibility comes at the cost of a performance hit caused by converting rows from and to strings. However, we have seen that users do not mind this given that they can implement their scripts in the language of their choice. Another feature unique to HiveQL is multi-table insert. In this construct, users can perform multiple queries on the same input data using a single HiveQL query. Hive optimizes these queries to share the scan of the input data, thus increasing the throughput of these queries by several orders of magnitude. We omit more details due to lack of space.
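
As a rough idea of what such a streaming script looks like, here is a minimal Python sketch. It assumes each row arrives as one line of tab-separated text with two hypothetical columns (user, url) and writes back (user, length of url). The column names and the transformation are invented for illustration. The demo at the bottom feeds the script from an in-memory string; when the script is plugged into a query it would call main(sys.stdin, sys.stdout) instead.

```python
import io
import sys


def transform(rows):
    """Read tab-separated (user, url) rows and emit (user, url_length) rows."""
    for row in rows:
        user, url = row.rstrip("\n").split("\t")
        # Everything crosses the interface as strings, hence the conversion cost.
        yield f"{user}\t{len(url)}"


def main(stdin, stdout):
    """Row-based streaming loop: one input row in, one output row out."""
    for out_row in transform(stdin):
        stdout.write(out_row + "\n")


if __name__ == "__main__":
    # Simulate the rows that would arrive on the script's standard input.
    fake_stdin = io.StringIO("alice\thttp://a.example\nbob\thttp://bb.example\n")
    main(fake_stdin, sys.stdout)
```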
Phases of Hive
Compiler
·         Parser – Transform a query string to a parse tree representation.
·         Semantic Analyser – Transform the parse tree to an internal query representation, which is still block based and not an operator tree. As part of this step, the column names are verified and expansions like * are performed. Type-checking and any implicit type conversions are also performed at this stage. If the table under consideration is a partitioned table, which is the common scenario, all the expressions for that table are collected so that they can be later used to prune the partitions which are not needed. If the query has specified sampling, that is also collected to be used later on.
·         Logical Plan Generator – Convert the internal query representation to a logical plan, which consists of a tree of operators. Some of the operators are relational algebra operators like 'filter', 'join' etc. But some of the operators are Hive specific and are used later on to convert this plan into a series of map-reduce jobs. One such operator is a reduceSink operator which occurs at the map-reduce boundary. This step also includes the optimizer to transform the plan to improve performance – some of those transformations include: converting a series of joins into a single multi-way join, performing a map-side partial aggregation for a group-by, performing a group-by in 2 stages to avoid the scenario when a single reducer can become a bottleneck in presence of skewed data for the grouping key. Each operator comprises a descriptor which is a serializable object.
·         Query Plan Generator – Convert the logical plan to a series of map-reduce tasks. The operator tree is recursively traversed, to be broken up into a series of map-reduce serializable tasks which can be submitted later on to the map-reduce framework for the Hadoop distributed file system. The reduceSink operator is the map-reduce boundary, whose descriptor contains the reduction keys. The reduction keys in the reduceSink descriptor are used as the reduction keys in the map-reduce boundary. The plan consists of the required samples/partitions if the query specified so. The plan is serialized and written to a file.
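
The idea of cutting the plan at reduceSink boundaries can be shown with a toy Python sketch. It assumes a simple linear chain of operator names instead of a real operator tree, and the function and operator names are only illustrative; Hive's actual plan generator is considerably more involved.

```python
def split_into_stages(operators):
    """Cut a linear operator chain into map-reduce stages at each reduceSink.

    Returns a list of stages, each with its map-side and reduce-side operators.
    """
    stages = []
    current = {"map": [], "reduce": []}
    side = "map"
    for op in operators:
        if op == "reduceSink":
            if side == "reduce":
                # A second boundary: close this stage and start a new one.
                stages.append(current)
                current = {"map": [], "reduce": []}
            # The reduceSink is the last operator on the map side of its stage.
            current["map"].append(op)
            side = "reduce"
        else:
            current[side].append(op)
    stages.append(current)
    return stages


if __name__ == "__main__":
    plan = ["tableScan", "filter", "reduceSink", "groupBy",
            "reduceSink", "groupBy", "fileSink"]
    for number, stage in enumerate(split_into_stages(plan), start=1):
        print(number, stage)
```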
Optimizer
More plan transformations are performed by the optimizer. The optimizer is an evolving component. As of 2011, it was rule-based and performed the following: column pruning and predicate pushdown. However, the infrastructure was in place, and there was work in progress to include other optimizations like map-side join.

Metastore
The Metastore provides two important but often overlooked features of a data warehouse: data abstraction and data discovery. Without the data abstractions provided in Hive, a user has to provide information about data formats, extractors and loaders along with the query. In Hive, this information is given during table creation and reused every time the table is referenced. This is very similar to the traditional warehousing systems. The second functionality, data discovery, enables users to discover and explore relevant and specific data in the warehouse.



Metadata Objects
·         Database – is a namespace for tables. It can be used as an administrative unit in the future. The database 'default' is used for tables with no user-supplied database name.
·         Table – Metadata for a table contains a list of columns, owner, storage and SerDe information. It can also contain any user-supplied key and value data. Storage information includes location of the underlying data, file input and output formats and bucketing information. SerDe metadata includes the implementation class of serializer and deserializer and any supporting information required by the implementation. All of this information can be provided during creation of the table.
·         Partition – Each partition can have its own columns and SerDe and storage information. This facilitates schema changes without affecting older partitions.
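
One way to picture these metadata objects is as plain Python data classes. The class and field names below are hypothetical and are not the Metastore's real API; the sketch only shows why keeping a copy of the columns and storage information on each partition lets the table schema change without touching older partitions.

```python
from dataclasses import dataclass, field, replace
from typing import Dict, List


@dataclass
class StorageInfo:
    location: str
    input_format: str
    output_format: str
    serde_class: str


@dataclass
class Partition:
    spec: Dict[str, str]   # partition column values, e.g. {"ds": "2016-05-01"}
    columns: List[str]     # the partition's own copy of the column list
    storage: StorageInfo


@dataclass
class Table:
    name: str
    owner: str
    columns: List[str]
    storage: StorageInfo
    database: str = "default"  # used when no database name is supplied
    parameters: Dict[str, str] = field(default_factory=dict)
    partitions: List[Partition] = field(default_factory=list)

    def add_partition(self, spec, location):
        """Snapshot the table's current columns and storage into a new partition."""
        storage = replace(self.storage, location=location)
        partition = Partition(dict(spec), list(self.columns), storage)
        self.partitions.append(partition)
        return partition


if __name__ == "__main__":
    table = Table("page_views", "etl", ["user", "url"],
                  StorageInfo("/warehouse/page_views", "text", "text", "delimited"))
    old = table.add_partition({"ds": "2016-05-01"}, "/warehouse/page_views/ds=2016-05-01")
    table.columns.append("referrer")  # schema change on the table
    new = table.add_partition({"ds": "2016-05-02"}, "/warehouse/page_views/ds=2016-05-02")
    print(old.columns, new.columns)
```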

External table in HIVE
(Stores data on HDFS) 

·         An external table stores its files on the HDFS server, but the table is not completely linked to the source file. (I explain below what I mean by completely.)
·         If you delete an external table the file still remains on the HDFS server.
·         The link between the file and the table exists, but it is read only.
·         As an example if you create an external table called “amandeep_test” in HIVE using HIVE-QL and link the table to file “flat_file.txt”, then deleting “amandeep_test” from HIVE will not delete “flat_file.txt” from HDFS.
·         External table files are accessible to anyone who has access to HDFS file structure and therefore security needs to be managed at the HDFS file/folder level.
·         Metadata is maintained on the master node, and deleting an external table from HIVE deletes only the metadata, not the data/file.
Use external table if you:
·         Want to manage the data outside HIVE e.g. you are planning to use an ETL tool to load/merge data files etc.
·         Want to load the latest information to the table but still want to retain old dataset in a file on HDFS for regulatory/legal purposes.
·         Are not planning to create a table from another table schema e.g. Create table1 as (Select * from table2)
Internal table in HIVE
(Stores data on HDFS but in a kind of restricted area)

Internal tables are stored in a directory determined by the following configuration property: hive.metastore.warehouse.dir
By default, internal tables are stored in the directory “/user/hive/warehouse”. You can change it by updating the value of the property mentioned above.
For example, in the following screenshot the internal table “tbl_batting” is stored as a file on HDFS in the warehouse folder.

·         Deleting the table deletes both the metadata from the master node and the data from HDFS.
·         Security needs to be managed within HIVE, probably at the schema level (this varies from organisation to organisation). HDFS security is out of scope in this case.
Use internal table if you:
·         Want to store the data temporarily.
·         Want to use HIVE to manage the lifecycle of tables and data.
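
The difference between dropping an external table and dropping an internal table can be simulated with a tiny Python model. This is a toy, not Hive code: the ToyWarehouse class is invented for this post, the metadata is a dictionary, and HDFS is just a set of paths. The external table path /data/flat_file.txt is a made-up location for the “flat_file.txt” example above.

```python
class ToyWarehouse:
    """Toy model of table metadata plus the files it points to on HDFS."""

    def __init__(self):
        self.metadata = {}  # table name -> (location, is_external)
        self.hdfs = set()   # paths that currently exist on "HDFS"

    def create_table(self, name, location, external):
        self.metadata[name] = (location, external)
        self.hdfs.add(location)

    def drop_table(self, name):
        # The metadata is always removed.
        location, external = self.metadata.pop(name)
        # The data is removed only for internal tables.
        if not external:
            self.hdfs.discard(location)


if __name__ == "__main__":
    warehouse = ToyWarehouse()
    warehouse.create_table("amandeep_test", "/data/flat_file.txt", external=True)
    warehouse.create_table("tbl_batting", "/user/hive/warehouse/tbl_batting", external=False)
    warehouse.drop_table("amandeep_test")
    warehouse.drop_table("tbl_batting")
    print(sorted(warehouse.hdfs))  # only the external table's file is left
```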







Monday, 2 May 2016

Welcome

Hi All!
Welcome to the Hadoop Basics Tutorial!
Keep following the blog for easy tutorials on Hadoop technologies.