Flume interceptors are a powerful way of modifying Flume events in-flight. As the name suggests, they intercept events as they pass from the source to the channel. You can also feed inputs to your interceptor through configuration. Flume provides several out-of-the-box interceptors, such as the Timestamp Interceptor, but when you need further capabilities you can develop your own custom interceptor.
In one of my projects I was required to develop a data pipeline between Kafka and HDFS. There were a few challenges I had to overcome to get this data from Kafka to HDFS:
1. The payload on Kafka was packed using MessagePack. Data written to HDFS had to be plain text in comma-separated format so that Hive could read it at a later stage.
2. Data landing on HDFS needed to be structured in a way that Hive could identify it at a later stage, using a timestamp present in the payload as the partition key. (A Hive partition on HDFS is literally a folder structure with the corresponding partition keys.)
3. The partition interval had to be configurable across environments (e.g. 10 minutes for PROD and 30 minutes for DEV).
The Flume Kafka source and HDFS sink provided the basic plumbing from Kafka to HDFS, but in order to address the three challenges above I had to come up with a custom Flume interceptor. I will summarize the steps involved and the sample code used to address these issues.
A custom interceptor provided answers to all three problems:
1. Intercept the event, unpack the body, and generate a comma-separated string.
2. Extract the timestamp during step 1, round it to the partition interval, and introduce it as a new header on the event (a sketch of this rounding follows this list). Flume events consist of two components: the header and the body. Header values can be used by the HDFS sink when creating HDFS paths, which will later correspond to Hive partitions.
3. Define the partition interval as a config value of the custom interceptor.
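Since the actual rounding logic is skipped in the code further down, here is a minimal sketch of what step 2 could look like. It assumes the payload timestamp has already been parsed into epoch milliseconds, and it floors the timestamp to the start of its interval (whether you floor or ceil is a project choice); the class and method names here are my own illustration.

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class PartitionKeys {

    //Floors an epoch-millisecond timestamp to the start of its partition interval,
    //e.g. with a 10-minute interval, 12:07 -> 12:00 and 12:14 -> 12:10.
    public static String partitionFor(long epochMillis, int intervalMinutes) {
        long intervalMillis = intervalMinutes * 60_000L;
        long floored = (epochMillis / intervalMillis) * intervalMillis;
        //Format the result in the same "yyyyMMddHHmmss" style as the
        //dummy partition value used in the interceptor below.
        SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmss");
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        return format.format(new Date(floored));
    }
}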
I am using the Cloudera Flume distribution, so the first thing is to define the Cloudera Maven repository in your POM file.
<repository>
  <id>cloudera</id>
  <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
Then define the Flume dependency in your POM file.
<dependency>
  <groupId>org.apache.flume</groupId>
  <artifactId>flume-ng-core</artifactId>
  <version>1.6.0-cdh5.11.0</version>
</dependency>
Let's look at the custom interceptor now.
The first thing is that you need to implement the org.apache.flume.interceptor.Interceptor interface.
public class CustomFlumeInterceptor implements Interceptor { ...
How do you intercept the event and do your stuff? You need to override the intercept(Event event) method.
@Override
public Event intercept(Event event) {
    try {
        //Read the event body as a byte array.
        byte[] eventBody = event.getBody();
        //Do your stuff here.
        //For me it was unpacking this byte array and creating a comma-separated string,
        //then converting the comma-separated string back to a byte array.
        //Re-set the new event body.
        event.setBody(eventBody);
        //Calculate the partition header.
        //I will skip the actual logic for rounding the timestamp obtained from the payload.
        String partition = "20200714120010"; //Dummy partition value for demonstration
        //Read the existing headers.
        Map<String, String> headers = event.getHeaders();
        //Add the partition value calculated above as a header (key 'partition').
        headers.put("partition", partition);
        //Re-set the new headers.
        event.setHeaders(headers);
    } catch (Exception e) {
        //Log and pass the event through unchanged rather than silently swallowing the error.
        e.printStackTrace();
    }
    return event;
}
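For completeness, here is a minimal sketch of the unpacking part that the comments above gloss over. It assumes the msgpack-core library and a payload packed as a flat MessagePack array; the actual payload layout in my project differed, and the class name PayloadUnpacker is my own illustration.

import java.io.IOException;
import java.util.StringJoiner;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessageUnpacker;
import org.msgpack.value.Value;

public class PayloadUnpacker {

    //Converts a MessagePack-encoded byte array into a comma-separated string.
    public static String toCsv(byte[] packed) throws IOException {
        StringJoiner csv = new StringJoiner(",");
        try (MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(packed)) {
            while (unpacker.hasNext()) {
                //unpackValue reads the next value regardless of its MessagePack type.
                Value value = unpacker.unpackValue();
                csv.add(value.toString());
            }
        }
        return csv.toString();
    }
}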
How do I pass the config values to my interceptor?
It is done through the Builder class, which is responsible for instantiating the interceptor. The configure(Context context) method provides access to properties from the Flume .conf file. I retrieve my partition interval there and use it in the intercept(Event event) method.
As stated above, the overridden build() method in the Builder class is used to instantiate my custom interceptor, CustomFlumeInterceptor.
public static class Builder implements Interceptor.Builder {

    @Override
    public void configure(Context context) {
        //Set the config value on the interceptor's static variable.
        PARTITIONINTERVALMIN = context.getInteger("partitionInterval");
    }

    @Override
    public Interceptor build() {
        //Instantiate my custom interceptor.
        return new CustomFlumeInterceptor();
    }
}
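One small hardening worth considering (not in the original code): Flume's Context.getInteger also has a two-argument overload that takes a default value, so the agent still starts when the property is missing from the .conf file.

//Falls back to 10 minutes when 'partitionInterval' is absent from the agent config.
PARTITIONINTERVALMIN = context.getInteger("partitionInterval", 10);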
Ok, here is my complete code.
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.*;

public class CustomFlumeInterceptor implements Interceptor {

    //Variable to hold the value retrieved from the configs.
    public static int PARTITIONINTERVALMIN = 10;

    @Override
    public Event intercept(Event event) {
        try {
            //Read the event body as a byte array.
            byte[] eventBody = event.getBody();
            //Do your stuff here.
            //For me it was unpacking this byte array and creating a comma-separated string,
            //then converting the comma-separated string back to a byte array.
            //Re-set the new event body.
            event.setBody(eventBody);
            //Calculate the partition header.
            //I will skip the actual logic for rounding the timestamp obtained from the payload.
            String partition = "20200714120010"; //Dummy partition value for demonstration
            //Read the existing headers.
            Map<String, String> headers = event.getHeaders();
            //Add the partition value calculated above as a header (key 'partition').
            headers.put("partition", partition);
            //Re-set the new headers.
            event.setHeaders(headers);
        } catch (Exception e) {
            //Log and pass the event through unchanged rather than silently swallowing the error.
            e.printStackTrace();
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
    }

    @Override
    public void initialize() {
    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public void configure(Context context) {
            PARTITIONINTERVALMIN = context.getInteger("partitionInterval");
        }

        @Override
        public Interceptor build() {
            return new CustomFlumeInterceptor();
        }
    }
}
How do you load your custom interceptor into Flume?
Place your build artifact (jar file) together with any third-party dependency jars (if you had to use any; in my case it was the MessagePack jar) under the "/var/lib/flume-ng/plugins.d/" directory on each host where Flume is running.
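According to the Flume user guide, each plugin under plugins.d gets its own subdirectory, with lib/ holding the plugin jar and libext/ holding its dependencies. A typical layout (the directory and jar names below are made up) would be:

/var/lib/flume-ng/plugins.d/
  custom-interceptor/
    lib/custom-flume-interceptor.jar    <- your build artifact
    libext/msgpack-core.jar             <- third-party dependency jars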
Now you need to tell the Flume agent to use this interceptor and provide the config values it expects. (Replace the agent name, source, interceptor class, and values with ones relevant to your build; note that the property key must literally be partitionInterval, since that is what the Builder reads.)
<agent-name>.sources.<source>.interceptors = i1
<agent-name>.sources.<source>.interceptors.i1.type = <fully.qualified.CustomFlumeInterceptor>$Builder
<agent-name>.sources.<source>.interceptors.i1.partitionInterval = 10
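As a concrete illustration, with a made-up agent named tier1, a Kafka source named kafka-src, and the interceptor living in a hypothetical com.example.flume package, the above would look like:

tier1.sources.kafka-src.interceptors = i1
tier1.sources.kafka-src.interceptors.i1.type = com.example.flume.CustomFlumeInterceptor$Builder
tier1.sources.kafka-src.interceptors.i1.partitionInterval = 10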
How do you create a folder structure on HDFS for supporting Hive partitions?
Define the HDFS path to match your Hive table location and include the header name(s) from your custom interceptor so that they create the folder structure for Hive partitions (key1=value1/key2=value2/...).
<agent-name>.sinks.<sink>.type = hdfs
<agent-name>.sinks.<sink>.hdfs.path = hdfs://<name-node>/<hive-warehouse>/<table>/<partition_key>=%{partition}/...
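Continuing the made-up names from above, and assuming a Hive table whose location is /user/hive/warehouse/events with a single partition column event_time, the sink config could look like:

tier1.sinks.hdfs-sink.type = hdfs
tier1.sinks.hdfs-sink.hdfs.path = hdfs://namenode/user/hive/warehouse/events/event_time=%{partition}

Here %{partition} resolves per event to the header set by the interceptor, so each interval's data lands in its own event_time=... folder, which Hive can then register as a partition (e.g. via MSCK REPAIR TABLE or ALTER TABLE ... ADD PARTITION).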