Thursday, August 27, 2015

Incremental updates in Hive

Incremental Updates

Hadoop and Hive are quickly evolving to outgrow previous limitations for integration and data access.
On the near-term development roadmap, we expect to see Hive supporting full CRUD operations (Insert, Select, Update, Delete). As we wait for these advancements, there is still a need to work with the current options (OVERWRITE or APPEND) for Hive table integration.
The OVERWRITE option requires moving the complete record set from source to Hadoop. While this approach may work for smaller data sets, it may be prohibitive at scale.
The APPEND option can limit data movement only to new or updated records. As true Inserts and Updates are not yet available in Hive, we need to consider a process of preventing duplicate records as Updates are appended to the cumulative record set.
In this blog, we will look at a four-step strategy for appending Updates and Inserts from delimited and RDBMS sources to existing Hive table definitions. While there are several options within the Hadoop platform for achieving this goal, our focus will be on a process that uses standard SQL within the Hive toolset.
IMPORTANT: For this blog, we assume that each source table will have a unique single or multi-key identifier and that a “modified_date” field is maintained for each record – either defined as part of the original source table or added as part of the ingest process.

Hive Table Definition Options: External, Local and View

External Tables are the combination of Hive table definitions and HDFS managed folders and files. The table definition exists independent from the data, so that, if the table is dropped, the HDFS folders and files remain in their original state.
Local Tables (known in Hive as managed or internal tables) are Hive tables that are directly tied to the source data. The data is physically tied to the table definition and will be deleted if the table is dropped.
Views, as with traditional RDBMS, are stored SQL queries that support the same READ interaction as HIVE tables, yet they do not store any data of their own. Instead, the data are stored and sourced from the HIVE tables referenced in the stored SQL query.
The following process outlines a workflow that leverages all of the above in four steps:
  • Ingest. Complete (base_table) table movement followed by Change (incremental_table) records only.
  • Reconcile. Creating a Single View of Base + Change records (reconcile_view) to reflect the most up-to-date record set.
  • Compact. Creating a Reporting table (reporting_table) from the reconciled view.
  • Purge. Replacing the Base table with Reporting table contents and deleting any previously processed Change records before the next Data Ingestion cycle.
The tables and views that will be a part of the Incremental Update Workflow are:
  • base_table: A HIVE Local table that initially holds all records from the source system. After the initial processing cycle, it will maintain a copy of the most up-to-date synchronized record set from the source. At the end of each processing cycle, it is overwritten by the reporting_table (as explained in Step 4: Purge).
  • incremental_table: A HIVE External table that holds the incremental change records (INSERTS and UPDATES) from the source system. At the end of each processing cycle, it is cleared of content (as explained in Step 4: Purge).
  • reconcile_view: A HIVE View that combines and reduces the base_table and incremental_table content to show only the most up-to-date records. It is used to populate the reporting_table (as explained in Step 3: Compact).
  • reporting_table: A HIVE Local table that holds the most up-to-date records for reporting purposes. It is also used to overwrite the base_table at the end of each processing run.

Step 1: Ingest

Depending on whether direct access is available to the RDBMS source system, you may opt for either a File Processing method (when no direct access is available) or RDBMS Processing (when database client access is available).
Regardless of the ingest option, the processing workflow in this article requires:
  1. One-time, initial load to move all data from source table to HIVE.
  2. On-going, “Change Only” data loads from the source table to HIVE.
Below, both File Processing and Database-direct (SQOOP) ingest will be discussed.

File Processing

For this blog, we assume that a file or set of files within a folder will have a delimited format and will have been generated from a relational system (i.e. records have unique keys or identifiers).
Files will need to be moved into HDFS using standard ingest options:
  • WebHDFS: Primarily used when integrating with applications, a Web URL provides an Upload end-point into a designated HDFS folder.
  • NFS: Appears as a standard network drive and allows end-users to use standard Copy-Paste operations to move files from standard file systems into HDFS.
Once the initial set of records is moved into HDFS, subsequent scheduled events can move files containing only new Inserts and Updates.

RDBMS Processing

SQOOP is the JDBC-based utility for integrating with traditional databases. A SQOOP Import allows for the movement of data into either HDFS (a delimited format can be defined as part of the Import definition) or directly into a Hive table.
The entire source table can be moved into HDFS or Hive using the “--table” parameter:
sqoop import --connect jdbc:teradata://{host name or ip address}/Database=retail --connection-manager org.apache.sqoop.teradata.TeradataConnManager --username dbc --password dbc --table SOURCE_TBL --target-dir /user/hive/incremental_table -m 1
After the initial import, subsequent imports can leverage SQOOP’s native support for “Incremental Import” by using the “check-column”, “incremental” and “last-value” parameters.
sqoop import --connect jdbc:teradata://{host name or ip address}/Database=retail --connection-manager org.apache.sqoop.teradata.TeradataConnManager --username dbc --password dbc --table SOURCE_TBL --target-dir /user/hive/incremental_table -m 1 --check-column modified_date --incremental lastmodified --last-value {last_import_date}
Alternately, you can leverage the “query” parameter, and have SQL select statements limit the import to new or changed records only.
sqoop import --connect jdbc:teradata://{host name or ip address}/Database=retail --connection-manager org.apache.sqoop.teradata.TeradataConnManager --username dbc --password dbc --target-dir /user/hive/incremental_table -m 1 --query 'select * from SOURCE_TBL where modified_date > {last_import_date} AND $CONDITIONS'
Note: For the initial load, substitute “base_table” for “incremental_table”. For all subsequent loads, use “incremental_table”.
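The difference between the initial and incremental imports comes down to three extra parameters and the target directory, so the two commands can be assembled from one place. The following is a minimal sketch in Python, not part of SQOOP itself: the helper name build_sqoop_import and the host name "dbhost" are hypothetical, and only the flags already shown above are used. Building the command as a list of arguments (for example, to hand to subprocess.run) also avoids spacing and quoting slips between flags.

```python
# Hypothetical helper: assembles the sqoop argument list shown in this section.
TERADATA_MANAGER = "org.apache.sqoop.teradata.TeradataConnManager"


def build_sqoop_import(jdbc_url, username, password, table, target_dir,
                       last_value=None):
    """Build a sqoop import argument list.

    last_value=None yields the one-time full load; any other value appends
    the incremental parameters (check-column, incremental, last-value).
    """
    args = [
        "sqoop", "import",
        "--connect", jdbc_url,
        "--connection-manager", TERADATA_MANAGER,
        "--username", username,
        "--password", password,
        "--table", table,
        "--target-dir", target_dir,
        "-m", "1",
    ]
    if last_value is not None:
        args += [
            "--check-column", "modified_date",
            "--incremental", "lastmodified",
            "--last-value", last_value,
        ]
    return args


# "dbhost" is a placeholder host name, not a value from the original post.
URL = "jdbc:teradata://dbhost/Database=retail"

# Initial load goes to the base_table folder, later loads to incremental_table.
initial = build_sqoop_import(URL, "dbc", "dbc", "SOURCE_TBL",
                             "/user/hive/base_table")
incremental = build_sqoop_import(URL, "dbc", "dbc", "SOURCE_TBL",
                                 "/user/hive/incremental_table",
                                 last_value="2014-07-14 00:00:00")

print(" ".join(incremental))
```

The last_value shown is an invented timestamp; in practice it would be recorded at the end of each successful import and fed into the next one.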

Step 2: Reconcile

In order to support an on-going reconciliation between current records in HIVE and new change records, two tables should be defined: base_table and incremental_table.

base_table

The example below shows DDL for the Hive table “base_table” that will include any delimited files located in HDFS under the ‘/user/hive/base_table’ directory. This table will house the initial, complete record load from the source system. After the first processing run, it will house the on-going, most up-to-date set of records from the source system:
CREATE TABLE base_table (
    id string,
    field1 string,
    field2 string,
    field3 string,
    field4 string,
    field5 string,
    modified_date string
)
 ROW FORMAT DELIMITED
   FIELDS TERMINATED BY ','
LOCATION '/user/hive/base_table';

incremental_table

The DDL below shows an external Hive table “incremental_table” that will include any delimited files with incremental change records, located in HDFS under the ‘/user/hive/incremental_table’ directory:
CREATE EXTERNAL TABLE incremental_table (
    id string,
    field1 string,
    field2 string,
    field3 string,
    field4 string,
    field5 string,
    modified_date string
)
 ROW FORMAT DELIMITED
   FIELDS TERMINATED BY ','
LOCATION '/user/hive/incremental_table';

reconcile_view

This view combines the record sets from the Base (base_table) and Change (incremental_table) tables and reduces them to the most recent record for each unique “id”.
It is defined as follows:
CREATE VIEW reconcile_view AS
SELECT t1.* FROM
(SELECT * FROM base_table
    UNION ALL
    SELECT * FROM incremental_table) t1
JOIN
    (SELECT id, max(modified_date) max_modified FROM
        (SELECT * FROM base_table
        UNION ALL
        SELECT * FROM incremental_table) t2
    GROUP BY id) s
ON t1.id = s.id AND t1.modified_date = s.max_modified;

Example

The sample data below represents the UNION of both the base_table and incremental_table. Note, there are new updates for “id” values 1 and 2, which are found as the last two records in the table. The record for “id” 3 remains unchanged.
[Figure: sample rows from the UNION of base_table and incremental_table]
The reconcile_view should only show one record for each unique “id”, based on the latest “modified_date” field value.
The result of the query “select * from reconcile_view” shows only three records, one per unique “id”, each carrying the latest “modified_date”.
[Figure: result of “select * from reconcile_view”]
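To make the reconcile behaviour easy to try without a cluster, here is a small sketch that runs the same view definition against an in-memory SQLite database from Python. SQLite stands in for Hive here purely for illustration; the rows are invented (they are not the values from the original screenshots), the tables are cut down to three columns, and the helper name reconcile is hypothetical.

```python
import sqlite3

# Same view logic as reconcile_view above, with the tables reduced to
# three columns to keep the sketch short.
SCHEMA = """
CREATE TABLE base_table (id TEXT, field1 TEXT, modified_date TEXT);
CREATE TABLE incremental_table (id TEXT, field1 TEXT, modified_date TEXT);
CREATE VIEW reconcile_view AS
SELECT t1.* FROM
    (SELECT * FROM base_table
     UNION ALL
     SELECT * FROM incremental_table) t1
JOIN
    (SELECT id, max(modified_date) max_modified FROM
        (SELECT * FROM base_table
         UNION ALL
         SELECT * FROM incremental_table) t2
     GROUP BY id) s
ON t1.id = s.id AND t1.modified_date = s.max_modified;
"""


def reconcile(base_rows, incremental_rows):
    """Load both tables and return the rows exposed by reconcile_view."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    conn.executemany("INSERT INTO base_table VALUES (?, ?, ?)", base_rows)
    conn.executemany("INSERT INTO incremental_table VALUES (?, ?, ?)",
                     incremental_rows)
    rows = conn.execute("SELECT * FROM reconcile_view ORDER BY id").fetchall()
    conn.close()
    return rows


# Invented sample data: ids 1 and 2 receive updates, id 3 is unchanged.
base = [("1", "a", "2014-07-01"),
        ("2", "b", "2014-07-01"),
        ("3", "c", "2014-07-01")]
changes = [("1", "a2", "2014-07-14"),
           ("2", "b2", "2014-07-14")]

latest = reconcile(base, changes)
for row in latest:
    print(row)
```

Two details are worth noting. Because modified_date is declared as a string, max() compares text, so the dates need a sortable format such as YYYY-MM-DD. And if two records for the same “id” share the same latest modified_date, the join keeps both, so the timestamp should be fine-grained enough to break ties.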

Step 3: Compact

The reconcile_view now contains the most up-to-date set of records and is now synchronized with changes from the RDBMS source system. For BI Reporting and Analytical tools, a reporting_table can be generated from the reconcile_view. Before creating this table, any previous instances of the table should be dropped as in the example below.

reporting_table

DROP TABLE IF EXISTS reporting_table;
CREATE TABLE reporting_table AS
SELECT * FROM reconcile_view;
Moving the Reconciled View (reconcile_view) to a Reporting Table (reporting_table) reduces the amount of processing needed for reporting queries.
Further, the data stored in the Reporting Table will also be static, unchanged until the next processing cycle. This provides consistency in reporting between processing cycles. In contrast, the Reconciled View (reconcile_view) is dynamic and will change as soon as new files (holding change records) are added to or removed from the Change table (incremental_table) folder /user/hive/incremental_table.
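The static-versus-dynamic distinction can be seen in a few lines. The sketch below again uses an in-memory SQLite database from Python as a stand-in for Hive, with invented rows and three-column tables; inserting a row into incremental_table plays the role of a new change file landing in the folder.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE base_table (id TEXT, field1 TEXT, modified_date TEXT);
CREATE TABLE incremental_table (id TEXT, field1 TEXT, modified_date TEXT);
INSERT INTO base_table VALUES ('1', 'a', '2014-07-01');
INSERT INTO base_table VALUES ('2', 'b', '2014-07-01');
INSERT INTO base_table VALUES ('3', 'c', '2014-07-01');
INSERT INTO incremental_table VALUES ('1', 'a2', '2014-07-14');

CREATE VIEW reconcile_view AS
SELECT t1.* FROM
    (SELECT * FROM base_table
     UNION ALL
     SELECT * FROM incremental_table) t1
JOIN
    (SELECT id, max(modified_date) max_modified FROM
        (SELECT * FROM base_table
         UNION ALL
         SELECT * FROM incremental_table) t2
     GROUP BY id) s
ON t1.id = s.id AND t1.modified_date = s.max_modified;

-- Step 3: Compact. Materialize the view into a static table.
DROP TABLE IF EXISTS reporting_table;
CREATE TABLE reporting_table AS
SELECT * FROM reconcile_view;
""")


def fetch(name):
    """Return all rows of a table or view, ordered by id."""
    return conn.execute(f"SELECT * FROM {name} ORDER BY id").fetchall()


snapshot_before = fetch("reporting_table")

# A new change record arrives after the compaction.
conn.execute("INSERT INTO incremental_table VALUES ('3', 'c2', '2014-07-15')")

view_after = fetch("reconcile_view")          # reflects the new record
snapshot_after = fetch("reporting_table")     # unchanged until next Compact

print(view_after)
print(snapshot_after)
```

The view picks up the new record for “id” 3 immediately, while reporting_table keeps the earlier values until the next Compact step rebuilds it.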

Step 4: Purge

To prepare for the next series of incremental records from the source, replace the Base table (base_table) with only the most up-to-date records (reporting_table). Also, delete the previously imported Change record content (incremental_table) by deleting the files located in the external table location (‘/user/hive/incremental_table’).
From a HIVE client:
DROP TABLE base_table;
CREATE TABLE base_table AS
SELECT * FROM reporting_table;
From an HDFS client:
hadoop fs -rm -r /user/hive/incremental_table/*
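As a rough illustration of what the Purge step leaves behind, the sketch below replays it against an in-memory SQLite database from Python. This is a stand-in for Hive and HDFS, not the real thing: the rows are invented, the helper name purge is hypothetical, and the DELETE statement only simulates removing the files under the external table location.

```python
import sqlite3


def purge(conn):
    """Replace base_table with reporting_table and clear the change records."""
    conn.executescript("""
        DROP TABLE base_table;
        CREATE TABLE base_table AS
        SELECT * FROM reporting_table;
    """)
    # Stand-in for removing the files in the incremental_table folder
    # with the hadoop fs command shown above.
    conn.execute("DELETE FROM incremental_table")


conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE base_table (id TEXT, field1 TEXT, modified_date TEXT);
CREATE TABLE incremental_table (id TEXT, field1 TEXT, modified_date TEXT);
CREATE TABLE reporting_table (id TEXT, field1 TEXT, modified_date TEXT);

-- State at the end of Step 3: base is stale, reporting is reconciled.
INSERT INTO base_table VALUES ('1', 'a', '2014-07-01');
INSERT INTO base_table VALUES ('2', 'b', '2014-07-01');
INSERT INTO incremental_table VALUES ('1', 'a2', '2014-07-14');
INSERT INTO reporting_table VALUES ('1', 'a2', '2014-07-14');
INSERT INTO reporting_table VALUES ('2', 'b', '2014-07-01');
""")

purge(conn)

base_rows = conn.execute("SELECT * FROM base_table ORDER BY id").fetchall()
reporting_rows = conn.execute(
    "SELECT * FROM reporting_table ORDER BY id").fetchall()
pending_changes = conn.execute(
    "SELECT count(*) FROM incremental_table").fetchone()[0]

print(base_rows, pending_changes)
```

After the purge, base_table matches reporting_table and no change records remain. Since the change records are discarded here, it seems prudent to run Purge only once the Compact step has completed successfully.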

Final Thoughts

While there are several possible approaches to supporting incremental data feeds into Hive, this example has a few key advantages:
  1. By maintaining an External Table for updates only, the table contents can be refreshed by simply adding files to, or deleting files from, that folder.
  2. The four steps in the processing cycle (Ingest, Reconcile, Compact and Purge) can be coordinated in a single OOZIE workflow. The OOZIE workflow can be a scheduled event that corresponds to the data freshness SLA (e.g. Daily, Weekly, Monthly).
  3. In addition to supporting INSERT and UPDATE synchronization, DELETES can be synchronized by adding either a DELETE_FLAG or DELETE_DATE field to the import source. Then, use this field as a filter in the Hive reporting table to hide deleted records. For example,
    CREATE VIEW reconcile_view AS
       SELECT t1.* FROM
          (SELECT * FROM base_table
          UNION ALL
          SELECT * FROM incremental_table) t1
       JOIN
          (SELECT id, max(modified_date) max_modified FROM
             (SELECT * FROM base_table
             UNION ALL
             SELECT * FROM incremental_table) t2
          GROUP BY id) s
       ON t1.id = s.id AND t1.modified_date = s.max_modified
       AND t1.delete_date IS NULL;
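To check how the delete filter behaves, the following sketch runs a view of this shape against an in-memory SQLite database from Python. SQLite is only a stand-in for Hive, the rows are invented, the tables are reduced to four columns, and the helper name visible_rows is hypothetical. It assumes the source system sends a deleted record as a change record with delete_date populated and a newer modified_date.

```python
import sqlite3

SCHEMA = """
CREATE TABLE base_table
    (id TEXT, field1 TEXT, modified_date TEXT, delete_date TEXT);
CREATE TABLE incremental_table
    (id TEXT, field1 TEXT, modified_date TEXT, delete_date TEXT);
CREATE VIEW reconcile_view AS
SELECT t1.* FROM
    (SELECT * FROM base_table
     UNION ALL
     SELECT * FROM incremental_table) t1
JOIN
    (SELECT id, max(modified_date) max_modified FROM
        (SELECT * FROM base_table
         UNION ALL
         SELECT * FROM incremental_table) t2
     GROUP BY id) s
ON t1.id = s.id AND t1.modified_date = s.max_modified
AND t1.delete_date IS NULL;
"""


def visible_rows(base_rows, incremental_rows):
    """Return the rows reconcile_view exposes once deletes are filtered."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    conn.executemany("INSERT INTO base_table VALUES (?, ?, ?, ?)", base_rows)
    conn.executemany("INSERT INTO incremental_table VALUES (?, ?, ?, ?)",
                     incremental_rows)
    rows = conn.execute("SELECT * FROM reconcile_view ORDER BY id").fetchall()
    conn.close()
    return rows


# Invented sample data: id 2 is deleted in the source on 2014-07-14.
base = [("1", "a", "2014-07-01", None),
        ("2", "b", "2014-07-01", None),
        ("3", "c", "2014-07-01", None)]
changes = [("2", "b", "2014-07-14", "2014-07-14")]

remaining = visible_rows(base, changes)
print(remaining)
```

The older, undeleted copy of “id” 2 does not reappear, because only the record with the latest modified_date is considered and that record is filtered out. This depends on the delete marker arriving with a newer modified_date than the record it supersedes.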
In essence, this four-step strategy enables incremental updates, as we await the near-term development of Hive support for full CRUD operations (Insert, Select, Update, Delete).
