Custom Source in Flume

Flume provides a way for you to write your own source. As we know, there are default source types available in Flume such as exec, spooldir, and Twitter. Here I have tried a small demonstration of a custom Flume source. In this example I have written a MySource Java class that reads the input file one line at a time, concatenates each line onto everything read so far, and passes the result to the channel as an event.
Example:
Sample Input File:

20
50
50
04
17
59
18
43
28
58
27
81
Sample Output File:
20
2050
205050
20505004
2050500417
205050041759
20505004175918
2050500417591843
205050041759184328
20505004175918432858
The first line is concatenated with the next, and the process continues in this way.
Here is my Java code.
MySource.java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.nio.charset.Charset;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySource extends AbstractSource implements Configurable, PollableSource {

  private static final Logger logger = LoggerFactory.getLogger(MySource.class);

  private String myProp;        // path of the input file, read from the agent config
  private BufferedReader br;
  private Thread tailThread;

  @Override
  public void configure(Context context) {
    // "filepath" is the property set in FlumeConfig.conf (a1.sources.r1.filepath)
    myProp = context.getString("filepath", "defaultValue");
    logger.info("Path Property==============>" + myProp);
  }

  @Override
  public void start() {
    // Do the reading on a separate thread so start() returns immediately
    tailThread = new Thread(new ConcatRunner());
    tailThread.start();
    super.start();
  }

  @Override
  public void stop() {
    if (tailThread != null) {
      tailThread.interrupt();
    }
    super.stop();
  }

  @Override
  public Status process() throws EventDeliveryException {
    // All the work happens on the ConcatRunner thread, so there is nothing
    // to do here; returning BACKOFF tells the framework not to poll eagerly.
    return Status.BACKOFF;
  }

  private class ConcatRunner implements Runnable {
    @Override
    public void run() {
      String sCurrentLine;
      String finalFlumeString = "";
      try {
        br = new BufferedReader(new FileReader(myProp));
        while ((sCurrentLine = br.readLine()) != null) {
          System.out.println(sCurrentLine);
          // Concatenate the current line onto everything read so far
          finalFlumeString = finalFlumeString + sCurrentLine;
          Event e = EventBuilder.withBody(finalFlumeString, Charset.forName("UTF-8"));
          // Hand the event to the channel processor, which puts it on the channel
          getChannelProcessor().processEvent(e);
          Thread.sleep(3000);
        }
      } catch (Exception ex) {
        System.out.println("Exception in reading file: " + ex.getMessage());
      } finally {
        try {
          if (br != null) br.close();
        } catch (IOException ex) {
          ex.printStackTrace();
        }
      }
    }
  } // ConcatRunner over
}
FlumeConfig.conf File:

a1.sources = r1
a1.channels = c1
a1.sinks = k1

#source
a1.sources.r1.type = MySource
a1.sources.r1.restart = true
a1.sources.r1.filepath = /root/input.txt
#sink

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.fileType = DataStream
#channel
a1.channels.c1.type = memory

#connect
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
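
The memory channel above runs entirely on its defaults. If you want to size it explicitly, the standard memory-channel properties can be set as well; the values below are only illustrative:

a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100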
Before you proceed to run the job, create a jar of your Java project and place it into the lib folder of Flume (/usr/lib/flume/lib).
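For reference, here is a minimal way to build that jar from the command line; it assumes the Flume jars live under /usr/lib/flume/lib, and the jar name is your choice:

# Compile against the Flume jars (also produces MySource$ConcatRunner.class)
javac -cp "/usr/lib/flume/lib/*" MySource.java
# Package all generated classes and drop the jar into Flume's lib folder
jar cf mysource.jar MySource*.class
cp mysource.jar /usr/lib/flume/lib/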
Once you are done with the above, fire the following command from the shell:
flume-ng agent -n a1 -f FlumeConfig.conf
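
While testing, it is handy to see the agent's log on the console. A sketch using Flume's standard logging override (the -c conf directory path is an assumption for your install):

flume-ng agent -n a1 -f FlumeConfig.conf -c /etc/flume/conf -Dflume.root.logger=INFO,console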

Output of the job: the agent prints each line as it reads it, and the concatenated events land in HDFS under /flume/events/.
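To inspect what the HDFS sink actually wrote, the standard HDFS shell works; the path below matches a1.sinks.k1.hdfs.path from the config above:

hdfs dfs -ls /flume/events/
hdfs dfs -cat /flume/events/events-*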
(This part is optional.)
Apart from the above, I have created a shell script that will generate input for you. This script asks for two inputs: 1. the number of rows, and 2. the delay time before generating the next row. I have set the delay time to 2 seconds; you can change it as per your need. Here is the sample shell script:
#!/bin/sh
echo "Please enter the size in terms of rows you want to generate the random file data"
read rows
#rows=$1
echo "Please enter the delay time needed in between the writing of the rows"
read delayTime
#Delete the tmp & generatedRandomDataFile files if they already exist
rm -f tmp
rm -f generatedRandomDataFile
start=$(date +%s)
for i in $(seq $rows)
do
# Grab two random digits from /dev/urandom
tr -dc 0-9 < /dev/urandom | head -c 2 > tmp
# Run the digits through gawk so the row is appended with a trailing newline (head -c does not add one)
gawk '$1=$1' tmp >> generatedRandomDataFile
sleep $delayTime
done
end=$(date +%s)
DIFF=$(( $end - $start ))
echo "File generated is `pwd`/generatedRandomDataFile"
echo "The file generation took $DIFF seconds"
Here is a sample output of the shell script:
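Since the screenshot is not reproduced here, the run below is an illustrative sketch: the prompts and summary lines come straight from the script's echo statements, while the script name, row count, delay, and directory are assumptions:

$ sh generateRandomData.sh
Please enter the size in terms of rows you want to generate the random file data
12
Please enter the delay time needed in between the writing of the rows
2
File generated is /root/generatedRandomDataFile
The file generation took 24 seconds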
Reference:
https://flume.apache.org/FlumeDeveloperGuide.html
Let me know if you have any suggestions.
Cheers!!!
