How to develop a custom Flume interceptor

Flume interceptors are a powerful way of modifying Flume events in flight. As the name suggests, they intercept events between a source and its channel. You can also feed inputs to your interceptor through configuration. Flume ships with several out-of-the-box interceptors, such as the Timestamp Interceptor, but when you need further capabilities you can develop your own custom interceptors.

In one of my projects I was required to build a data pipeline between Kafka and HDFS. There were a couple of challenges I had to overcome to get the data from Kafka to HDFS.

1. The payload on Kafka was packed using MessagePack. Data written to HDFS had to be plain text in comma-separated format so that Hive could read it at a later stage.

2. Data landing on HDFS needed to be structured so that it could be identified by Hive at a later stage, using a timestamp present in the payload as the partition key. (A Hive partition on HDFS is literally a folder structure named after the corresponding partition keys.)

3. The partition interval had to be configurable across environments (e.g. 10 minutes for PROD and 30 minutes for DEV).

The Flume Kafka source and HDFS sink provided the basic plumbing from Kafka to HDFS, but to address the three challenges above I had to come up with a custom Flume interceptor. I will summarize the steps involved and the sample code used to address these issues.

A custom interceptor provided answers to all three problems:

1. Intercept each event, unpack the body, and generate a comma-separated string.

2. Extract the timestamp during step 1, round it down to the start of its partition interval, and add it as a new header on the event. (Flume events consist of two components: headers and a body. Header values can be used by the HDFS sink to build HDFS paths, which will later correspond to Hive partitions.)

3. Define the partition interval as a configuration property of the custom interceptor.

I am using the Cloudera Flume distribution, so the first thing to do is define the Cloudera Maven repository in your POM file:

<repository>
    <id>cloudera</id>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>

Then define the Flume dependency in your POM file:

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.6.0-cdh5.11.0</version>
</dependency>

Let's look at the custom interceptor now.

First, your class needs to implement the org.apache.flume.interceptor.Interceptor interface.

public class CustomFlumeInterceptor implements Interceptor {
  ...

How do you intercept events and do your processing? You override the intercept(Event event) method:

@Override
public Event intercept(Event event)
{
  try {
    byte[] eventBody = event.getBody();  // Read the event body as a byte array.
    // Do your processing here.
    // For me it was unpacking this byte array into a comma-separated string,
    // then converting the comma-separated string back to a byte array.

    // Set the new event body.
    event.setBody(eventBody);

    // Calculate the partition header.
    // I will skip the actual logic for rounding the timestamp obtained from the payload.
    String partition = "20200714120010"; // Dummy partition value for demonstration.

    Map<String, String> headers = event.getHeaders(); // Read the existing headers.

    // Add the partition value calculated above as a header (key 'partition').
    headers.put("partition", partition);

    // Set the new headers.
    event.setHeaders(headers);

  } catch (Exception e) {
    // Log and pass the event through unchanged rather than dropping it.
    e.printStackTrace();
  }
  return event;
}
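The rounding logic I skipped above can be sketched as follows. This is a minimal, self-contained example; the timestamp format (yyyyMMddHHmmss, matching the dummy value above), the UTC time zone, and the class name PartitionRounder are my assumptions for illustration:

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class PartitionRounder {

    // Floors an epoch-millisecond timestamp down to the start of its
    // partition interval and formats it as yyyyMMddHHmmss, the format
    // used for the partition header value above.
    public static String roundToPartition(long epochMillis, int intervalMinutes) {
        long intervalMillis = intervalMinutes * 60_000L;
        long floored = epochMillis - (epochMillis % intervalMillis);
        SimpleDateFormat fmt = new SimpleDateFormat("yyyyMMddHHmmss");
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        return fmt.format(new Date(floored));
    }

    public static void main(String[] args) {
        // 2020-07-14 12:07:42 UTC, floored to a 10-minute boundary.
        long ts = 1594728462000L;
        System.out.println(roundToPartition(ts, 10)); // prints 20200714120000
    }
}
```

Because every timestamp inside the same interval floors to the same value, all events in that window end up with the same partition header and therefore in the same HDFS folder.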

How do I pass the config values to my interceptor?

This is done through the Builder class, which is responsible for instantiating the interceptor. Its configure(Context context) method provides access to properties from the Flume configuration file. I retrieve my partition interval there and use it in the intercept(Event event) method.

As stated above, overriding the build() method of the Builder class is what instantiates my custom interceptor, CustomFlumeInterceptor:

public static class Builder implements Interceptor.Builder
{
  @Override
  public void configure(Context context) {
    // Set the config value on the interceptor's static field,
    // falling back to the default when the property is absent.
    PARTITIONINTERVALMIN = context.getInteger("partitionInterval", PARTITIONINTERVALMIN);
  }

  @Override
  public Interceptor build() {
    // Instantiate my custom interceptor.
    return new CustomFlumeInterceptor();
  }
}

Ok, here is my complete code.

import java.util.List;
import java.util.Map;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.*;

public class CustomFlumeInterceptor implements Interceptor {
  
  // Variable holding the value retrieved from the configs (default 10 minutes).
  public static int PARTITIONINTERVALMIN = 10;

  @Override
  public Event intercept(Event event)
  {
    try {
      byte[] eventBody = event.getBody();  // Read the event body as a byte array.
      // Do your processing here.
      // For me it was unpacking this byte array into a comma-separated string,
      // then converting the comma-separated string back to a byte array.

      // Set the new event body.
      event.setBody(eventBody);

      // Calculate the partition header.
      // I will skip the actual logic for rounding the timestamp obtained from the payload.
      String partition = "20200714120010"; // Dummy partition value for demonstration.

      Map<String, String> headers = event.getHeaders(); // Read the existing headers.

      // Add the partition value calculated above as a header (key 'partition').
      headers.put("partition", partition);

      // Set the new headers.
      event.setHeaders(headers);

    } catch (Exception e) {
      // Log and pass the event through unchanged rather than dropping it.
      e.printStackTrace();
    }
    return event;
  }

  @Override
  public List<Event> intercept(List<Event> events)
  {
    for (Event event : events){
       intercept(event);
    }
    return events;
  }


  @Override
  public void close()
  {

  }

  @Override
  public void initialize()
  {

  }

  public static class Builder implements Interceptor.Builder
  {
    @Override
    public void configure(Context context) {
      PARTITIONINTERVALMIN = context.getInteger("partitionInterval", PARTITIONINTERVALMIN);
    }

    @Override
    public Interceptor build() {
      return new CustomFlumeInterceptor();
    }
  }
}

How do you load your custom interceptor into Flume?

Place your build artifact (jar file), together with any third-party dependency jars (if you had to use any: in my case it was the MessagePack jar), under the "/var/lib/flume-ng/plugins.d/" directory on each host where Flume is running. Flume expects each plugin to live in its own subdirectory there, with your jar in lib/ and dependency jars in libext/.
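The layout looks like this (the plugin directory and jar names are illustrative):

```
/var/lib/flume-ng/plugins.d/
└── custom-interceptor/
    ├── lib/
    │   └── custom-flume-interceptor.jar   # your build artifact
    └── libext/
        └── msgpack-core.jar               # third-party dependency jars
```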

Now you need to tell the Flume agents to use this interceptor and provide the config values it expects. (Replace the agent name, source, interceptor class, and config name with values relevant to your build.)

<agent-name>.sources.<source>.interceptors = i1
<agent-name>.sources.<source>.interceptors.i1.type = <package>.CustomFlumeInterceptor$Builder
<agent-name>.sources.<source>.interceptors.i1.partitionInterval = 10
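For example, with an agent named a1, a Kafka source named kafka-src, and the interceptor class in a package com.example (all of these names are illustrative), the filled-in configuration would be:

```
a1.sources.kafka-src.interceptors = i1
a1.sources.kafka-src.interceptors.i1.type = com.example.CustomFlumeInterceptor$Builder
a1.sources.kafka-src.interceptors.i1.partitionInterval = 10
```

Note that the type must be the fully qualified name of the nested Builder class, which is why it ends in $Builder.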

How do you create a folder structure on HDFS that supports Hive partitions?

Define the HDFS path to match your Hive table location and include the header name(s) set by your custom interceptor so that they create the folder structure for Hive partitions (key1=value1/key2=value2/...).

<agent-name>.sinks.<sink>.type = hdfs
<agent-name>.sinks.<sink>.hdfs.path = hdfs://<name-node>/<hive-warehouse>/<table>/<partition_key>=%{partition}/...
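For illustration, suppose the agent is named a1, the sink hdfs-sink, and the table lives at a typical warehouse path (all hypothetical names):

```
a1.sinks.hdfs-sink.type = hdfs
a1.sinks.hdfs-sink.hdfs.path = hdfs://namenode/user/hive/warehouse/events/partition=%{partition}
```

Flume substitutes %{partition} with the header value the interceptor set, so an event with header partition=20200714120000 lands under .../events/partition=20200714120000/, which Hive can pick up as a partition of the table.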