In my previous post I explained how to develop a custom Flume interceptor that can be used to create an HDFS folder structure suitable for Hive partitioning. However, having the folder structure on HDFS is not enough for Hive to identify the partitions. We need to update the Hive metastore so that it recognizes these HDFS folders as partitions.
Hive provides a built-in capability to detect new partitions on HDFS by executing the following HiveQL.
MSCK [REPAIR] TABLE table_name [ADD/DROP/SYNC PARTITIONS]
But when you have a very large number of nested partitions, this becomes inefficient and time consuming, since Hive runs a full scan of the table location on HDFS to identify each and every partition.
In my case, HDFS is partitioned based on timestamp, which lets me predict the partitions that will be added in the future.
Suppose I have the following directory structure in the Hive warehouse (HDFS).
+--- ORDERS
|    +--- YEAR=2020
|    |    +--- MONTH=202007
|    |    |    +--- DAY=20200715
|    |    |    +--- DAY=20200716
|    |    +--- MONTH=202006
|    |    |    +--- DAY=20200601
My ORDERS data is now structured on HDFS to support three partitions in Hive, i.e. YEAR, MONTH and DAY.
I will now create the Hive table using the following HiveQL.
CREATE EXTERNAL TABLE ORDERS (ORDERID string, ORDERDATE string, TOTAL double)
PARTITIONED BY (YEAR INT, MONTH INT, DAY INT)
CLUSTERED BY (ORDERID) INTO 2 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/user/hive/warehouse/ORDERS';
Now we have created a table in Hive pointing to our data on HDFS, yet Hive won't read any data because it is unaware of the actual partitions available on HDFS.
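You can confirm this from the Hive shell; at this point the partition list comes back empty:

SHOW PARTITIONS ORDERS;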
At this point we could run MSCK REPAIR TABLE, but it would be really inefficient given my large number of partitions, and since I have to run this daily it is just a waste of resources. (At some point it is going to take an unreasonably long time to complete.)
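For reference, the straightforward (but expensive) form of that command for this table would be:

MSCK REPAIR TABLE ORDERS;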
As I can predict the partitions that are going to be created, I have developed the following HiveServer2 client, which adds individual partitions to the table instead of scanning the whole external table on HDFS.
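For example, for 2020-07-15 the client issues a statement equivalent to the following (ADD IF NOT EXISTS makes it safe to re-run):

ALTER TABLE ORDERS ADD IF NOT EXISTS PARTITION (YEAR=2020, MONTH=202007, DAY=20200715);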
I am using the Cloudera Flume distribution, so the first thing is to define the Cloudera Maven repository in your POM file.
<repository>
  <id>cloudera</id>
  <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
Define the following dependency in your POM file.
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>2.6.0</version>
</dependency>
Even though the above dependency is enough to build the source, you still need the following jars on your classpath when you run the client.
hadoop-auth-2.6.0-cdh5.8.0.jar
hive-jdbc-1.1.0-cdh5.8.0-standalone.jar
Let's look at the client now.
This example will add partitions from a predefined start date to a predefined end date.
HiveServer2 is secured with Kerberos, therefore you need to set up UserGroupInformation with a keytab before creating the connection.
Refer to my previous posts for more information on keytab creation and Java HDFS Kerberos clients.
I am simply iterating from the start date to the end date and adding a partition for each day to the Hive metastore. You don't necessarily need the underlying HDFS folders for each day to exist: if the folders are not yet available on HDFS, Hive will create the folder structure and update its metadata with empty partitions. If you add data to these empty HDFS folders at a later stage, Hive will automatically pick it up when you query those partitions.
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class HMSSync {

    //HIVE JDBC Driver
    private static String driverName = "org.apache.hive.jdbc.HiveDriver";

    //FORMATTERS for PARTITION values
    private static final SimpleDateFormat PARTITION_YEAR = new SimpleDateFormat("yyyy");
    private static final SimpleDateFormat PARTITION_MONTH = new SimpleDateFormat("yyyyMM");
    private static final SimpleDateFormat PARTITION_DAY = new SimpleDateFormat("yyyyMMdd");

    public static void main(String[] args) {
        try {
            //Load the driver
            Class.forName(driverName);

            //Change the following to match your requirements/env.
            String jdbcURL = "jdbc:hive2://h2server:port;principal=hive/h2server@yourdomain.com";
            String table = "ORDERS";
            String start_date = "2010-12-20"; //Partitions will be added starting from this date
            String end_date = "2010-12-26";   //End date for the partitions (exclusive)
            String principal = "you@yourdomain.com";
            String keyTab = "path-to-your-keytab.file";

            //Set Kerberos
            Configuration conf = new Configuration();
            conf.set("hadoop.security.authentication", "Kerberos");
            UserGroupInformation.setConfiguration(conf);

            //Login using the keytab
            UserGroupInformation.loginUserFromKeytab(principal, keyTab);

            //Create JDBC connection
            Connection con = DriverManager.getConnection(jdbcURL);

            //Setting up date iterator
            SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
            Calendar start = Calendar.getInstance();
            start.setTime(formatter.parse(start_date));
            Calendar end = Calendar.getInstance();
            end.setTime(formatter.parse(end_date));

            for (Date date = start.getTime(); start.before(end); start.add(Calendar.DATE, 1), date = start.getTime()) {
                //Generate alter table SQL to add PARTITIONS.
                String sql = "ALTER TABLE " + table + " ADD IF NOT EXISTS PARTITION (YEAR=" + PARTITION_YEAR.format(date)
                        + ", MONTH=" + PARTITION_MONTH.format(date) + ", DAY=" + PARTITION_DAY.format(date) + ")";
                Statement stmt = con.createStatement();
                int result = stmt.executeUpdate(sql);
                if (result == 0) {
                    System.out.println("Partition Added Successfully.");
                } else {
                    System.out.println("Error occurred.");
                }
                stmt.close();
            }
            con.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Running the client.
Make sure you have the additional jars mentioned above on your classpath. You also need to disable the useSubjectCredsOnly option for your JRE, so the full command looks something like the following.
java -Djavax.security.auth.useSubjectCredsOnly=false -cp .:hadoop-auth-2.6.0-cdh5.8.0.jar:hive-jdbc-1.1.0-cdh5.8.0-standalone.jar HMSSync
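Once the client has run, you can verify that the partitions were registered in the metastore (the exact list depends on your date range):

SHOW PARTITIONS ORDERS;

Queries that filter on YEAR, MONTH and DAY will now read only the relevant folders, and data added later to these folders is picked up automatically.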