Wednesday, February 8, 2017

Query/parse massive JSON data on Hadoop/Hive

Parsing massive amounts of semi-structured data with a traditional parser is painful, and making that data queryable is an additional task on top of it.

Problem with the traditional approach: 

If you have massive data, or expect your data to grow huge, you are restricted by your hardware (storage and processing).

Overview of traditional approach:

  1.  Store the data on a server. (Beware: servers have limited space and CPUs.)
  2.  Write a parser program to parse the JSON. Rewrite the code whenever the structure of the JSON changes.
  3.  Store it in an RDBMS. (Again, an RDBMS is also constrained by storage and processing capacity.)
  4.  Query the RDBMS.


Limitations:
The above approach has worked well with limited amounts of data. But in this age of data flood, where data is generated by almost every device, it falls short on:

  1.  Scalable storage: scale your storage as the need arises.
  2.  Scalable processing: add CPUs as the data grows.
  3.  Fault tolerance: if the server fails after a few hours of processing, you have to start from the beginning, and recovery needs manual intervention.


All the above drawbacks/limitations are addressed if you use a big data platform like Hadoop.

Here is how you can query massive amounts of data on Hadoop/Hive.

Hadoop-based approach overview:

  1. Store your JSON data on HDFS.
  2. Create external tables in Hive and use a JSON SerDe to map the JSON attributes to the columns of your table.
  3. Query your data using HiveQL.


Example implementation:

Step 1: Store the JSON files on HDFS

The JSON SerDe used in Step 2 expects one JSON document per line, so make sure each record sits on its own line. Copy the files from the local filesystem into HDFS with hdfs dfs -put:

hdfs dfs -put /user/mik/jsondata/*.json /user/mik/data/

Step 2: Create an external table in Hive using a JSON SerDe


> Download the JSON SerDe jar:
http://www.congiu.net/hive-json-serde/1.3.7/cdh5/json-serde-1.3.7-jar-with-dependencies.jar

> Keep the jar on the node where you run the Hive CLI; you can also store a copy in your HDFS home directory:
$ hdfs dfs -put json-serde-1.3.7-jar-with-dependencies.jar /user/mik/

> Create the Hive external table and map its columns to the JSON attribute names using the JSON SerDe

$ hive

Logging initialized using configuration in jar:file:/opt/cloudera/parcels/CDH-5.4.5-1.cdh5.4.5.p1357.1177/jars/hive-common-1.1.0-cdh5.4.5.jar!/hive-log4j.properties
WARNING: Hive CLI is deprecated and migration to Beeline is recommended.

hive> add jar json-serde-1.3.7-jar-with-dependencies.jar;
Added [json-serde-1.3.7-jar-with-dependencies.jar] to class path
Added resources: [json-serde-1.3.7-jar-with-dependencies.jar]

hive> CREATE EXTERNAL TABLE tb_countrycode_json
    > (
    >         countryName             STRING,
    >         countryCode             STRING
    > )
    > ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
    > LOCATION '/user/mik/data'
    > ;
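
The SerDe also lets you rename columns or dodge Hive reserved words through a mapping property. A minimal sketch, using a hypothetical events table and HDFS path (not part of the country-code example), where a JSON attribute called timestamp is exposed as a time_stamp column:

-- Hypothetical table and path, shown only to illustrate the mapping property
CREATE EXTERNAL TABLE tb_events_json
(
        countryCode             STRING,
        time_stamp              STRING
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES ( "mapping.time_stamp" = "timestamp" )
LOCATION '/user/mik/eventdata';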

Step 3: Query your data using HiveQL

hive> SELECT * FROM tb_countrycode_json;
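
Any other HiveQL works the same way, for example:

-- 'IN' is just an assumed sample value; use a code that exists in your data
SELECT countryName FROM tb_countrycode_json WHERE countryCode = 'IN';

SELECT COUNT(DISTINCT countryCode) FROM tb_countrycode_json;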

That's it.

Tuesday, February 7, 2017

Insert, update, delete on records now possible on Hadoop...!!!

With the release of Apache Kudu along with CDH 5.10 GA, we are a bit more confident about Kudu being production ready.

Kudu fills the gap of Hadoop not being able to insert, update, or delete records in Hive tables. Kudu allows insert, update, and delete on tables in collaboration with Impala. This also eases the pain point of incremental updates on fast-moving/changing data loads.

Kudu is a columnar storage manager developed for the Apache Hadoop platform. Kudu shares the common technical properties of Hadoop ecosystem applications: it runs on commodity hardware, is horizontally scalable, and supports highly available operation.

Some of Kudu’s benefits:

  • Fast processing of OLAP workloads.
  • Integration with MapReduce, Spark and other Hadoop ecosystem components.
  • Tight integration with Apache Impala (incubating), making it a good, mutable alternative to using HDFS with Apache Parquet.
  • Strong but flexible consistency model, allowing you to choose consistency requirements on a per-request basis, including the option for strict-serializable consistency.
  • Strong performance for running sequential and random workloads simultaneously.



If interested in reading more, go to: https://kudu.apache.org/

After this brief description, let's go back to our basic requirement:

Insert, update, and delete operations at the record level.

We can achieve insert, update, and delete operations by using Kudu along with Impala (http://impala.apache.org/).

The syntax to support record-level inserts/updates/deletes is provided by Impala, while the actual storage and modification/deletion of records is managed by Kudu.
Some of the features of Kudu with Impala:

CREATE/ALTER/DROP TABLE
Impala supports creating, altering, and dropping tables using Kudu as the persistence layer. The tables follow the same internal / external approach as other tables in Impala, allowing for flexible data ingestion and querying.
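
As a rough sketch (the table and column names below are made up for illustration; they are not from this post's dataset):

-- Hypothetical managed Kudu table created through Impala
CREATE TABLE positions_kudu (
  tagid BIGINT,
  event_time BIGINT,
  longitude FLOAT,
  latitude FLOAT,
  PRIMARY KEY (tagid, event_time)
)
PARTITION BY HASH (tagid) PARTITIONS 4
STORED AS KUDU;

-- ALTER and DROP follow the usual Impala syntax, e.g.:
--   ALTER TABLE positions_kudu RENAME TO positions_kudu_v2;
--   DROP TABLE positions_kudu_v2;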

INSERT
Data can be inserted into Kudu tables in Impala using the same syntax as any other Impala table like those using HDFS or HBase for persistence.
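
For instance, against the hypothetical positions_kudu table sketched above (positions_staging is an assumed HDFS-backed table with compatible column types):

-- Single row
INSERT INTO positions_kudu
VALUES (1234, 1486425600, CAST(120.098 AS FLOAT), CAST(120.023 AS FLOAT));

-- Batch insert from another Impala table
INSERT INTO positions_kudu
SELECT tagid, event_time, longitude, latitude FROM positions_staging;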

UPDATE / DELETE
Impala supports the UPDATE and DELETE SQL commands to modify existing data in a Kudu table row-by-row or as a batch. The syntax of the SQL commands is chosen to be as compatible as possible with existing standards. In addition to simple DELETE or UPDATE commands, you can specify complex joins with a FROM clause in a subquery.
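
A sketch of both forms, again using the hypothetical positions_kudu table and an assumed stale_tags lookup table:

-- Row-level delete with a simple predicate
DELETE FROM positions_kudu WHERE tagid = 1234;

-- Delete driven by a join with another table
DELETE p FROM positions_kudu p JOIN stale_tags s ON p.tagid = s.tagid;

-- Update with a FROM clause and a join
UPDATE positions_kudu
SET latitude = 0
FROM positions_kudu JOIN stale_tags s ON positions_kudu.tagid = s.tagid;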

Flexible Partitioning
Similar to partitioning of tables in Hive, Kudu allows you to dynamically pre-split tables by hash or range into a predefined number of tablets, in order to distribute writes and queries evenly across your cluster. You can partition by any number of primary key columns, by any number of hashes, and an optional list of split rows. See the Kudu Schema Design documentation for details.
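
For example, a hypothetical table could hash on the tag for even writes and range-partition on time so that old data can be pruned (the Unix-timestamp boundaries are assumed sample values):

CREATE TABLE positions_by_month (
  tagid BIGINT,
  event_time BIGINT,
  speed FLOAT,
  PRIMARY KEY (tagid, event_time)
)
PARTITION BY HASH (tagid) PARTITIONS 4,
             RANGE (event_time)
(
  PARTITION VALUES < 1485907200,                -- before Feb 2017
  PARTITION 1485907200 <= VALUES < 1488326400,  -- Feb 2017
  PARTITION 1488326400 <= VALUES                -- Mar 2017 onwards
)
STORED AS KUDU;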

Parallel Scan
To achieve the highest possible performance on modern hardware, the Kudu client used by Impala parallelizes scans across multiple tablets.

High-efficiency queries
Where possible, Impala pushes down predicate evaluation to Kudu, so that predicates are evaluated as close as possible to the data. Query performance is comparable to Parquet in many workloads.
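
For example, in a query like the one below against the hypothetical positions_kudu table, the tagid predicate can be evaluated by Kudu itself rather than in Impala:

SELECT COUNT(*) FROM positions_kudu WHERE tagid = 1234;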


Example of record-level operations. Here we will use location data.

Step 1: Create a table with Impala (optional).
CREATE EXTERNAL TABLE hdoopgig_demo (
 version int,
 time string,
 tagid int,
 longitude float,
 latitude float,
 speed float,
 heading float
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION ''
TBLPROPERTIES ('skip.header.line.count'='1'); -- only needed if the first line of your data is a header; otherwise omit this clause

Step 2: Check that the data is loaded into the table

SELECT count(*) FROM hdoopgig_demo;

Step 3: Create the table in Kudu

CREATE TABLE hdoopgig_kudu
PRIMARY KEY (time, tagid)
PARTITION BY HASH(time) PARTITIONS 8
STORED AS KUDU
AS SELECT
  UNIX_TIMESTAMP(time,  'MM/dd/yyyy HH:mm:ss') AS time,
  tagid,
  longitude,
  latitude,
  speed,
  heading
FROM hdoopgig_demo;
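
As a quick sanity check, the row count of the new Kudu table should match the count from Step 2 (rows whose time string failed to parse would be rejected, since time is part of the primary key):

SELECT COUNT(*) FROM hdoopgig_kudu;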

Step 4: Try deleting a record (which is not possible on plain Hive tables to date)

SELECT * FROM hdoopgig_kudu LIMIT 1; -- pick a record that you want to delete

DELETE FROM hdoopgig_kudu WHERE tagid = 1234;

Step 5: Insert single rows

The time column in hdoopgig_kudu is a Unix timestamp (BIGINT), so convert the date string on the way in; the other columns are FLOAT, hence the casts. Note that every row must have a unique primary key (time, tagid).

INSERT INTO hdoopgig_kudu
VALUES (unix_timestamp('02/05/2017 00:00:00', 'MM/dd/yyyy HH:mm:ss'), 1234, CAST(120.098 AS FLOAT), CAST(120.023 AS FLOAT), CAST(100.20 AS FLOAT), CAST(10.00 AS FLOAT));

Three rows using a single statement:

INSERT INTO hdoopgig_kudu VALUES
  (unix_timestamp('02/05/2017 00:01:00', 'MM/dd/yyyy HH:mm:ss'), 1234, CAST(120.098 AS FLOAT), CAST(120.023 AS FLOAT), CAST(100.20 AS FLOAT), CAST(10.00 AS FLOAT)),
  (unix_timestamp('02/05/2017 00:02:00', 'MM/dd/yyyy HH:mm:ss'), 1235, CAST(120.098 AS FLOAT), CAST(120.023 AS FLOAT), CAST(100.20 AS FLOAT), CAST(10.00 AS FLOAT)),
  (unix_timestamp('02/05/2017 00:03:00', 'MM/dd/yyyy HH:mm:ss'), 1236, CAST(120.098 AS FLOAT), CAST(120.023 AS FLOAT), CAST(100.20 AS FLOAT), CAST(10.00 AS FLOAT));

Step 6: Update records

UPDATE hdoopgig_kudu SET heading = 50 WHERE heading = 49;
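
Impala 2.8 (the version shipped with CDH 5.10) also adds an UPSERT statement for Kudu tables: it inserts the row if the primary key is new and overwrites the non-key columns if the key already exists, which fits the incremental-load scenario mentioned earlier. A quick sketch against the same table (the values are just sample data):

UPSERT INTO hdoopgig_kudu
VALUES (unix_timestamp('02/05/2017 00:01:00', 'MM/dd/yyyy HH:mm:ss'), 1234,
        CAST(121.000 AS FLOAT), CAST(121.000 AS FLOAT), CAST(90.00 AS FLOAT), CAST(15.00 AS FLOAT));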

Will post more updates on my blog with more exciting features.