Custom Source in Flume
Example:
Sample Input File:
20
50
50
04
17
59
18
43
28
58
27
81
Sample Output File:
20
2050
205050
20505004
2050500417
205050041759
20505004175918
2050500417591843
205050041759184328
20505004175918432858
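Each line is appended to the concatenation of all the lines before it, so the Nth output line is the first N input lines joined together. The source emits one event per line read.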
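To make the rule concrete, here is a minimal, Flume-free sketch of the concatenation step. `PrefixConcatenation` and `eventBodies` are hypothetical names used only for illustration. The method builds the body of every event from the input lines the same way the runner in MySource does with its running string.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative sketch, not the Flume source itself: event i carries lines 0..i joined together.
final class PrefixConcatenation {

  static List<String> eventBodies(List<String> lines) {
    List<String> bodies = new ArrayList<>();
    StringBuilder running = new StringBuilder();
    for (String line : lines) {
      running.append(line);           // append the new line to everything read so far
      bodies.add(running.toString()); // snapshot = body of the next event
    }
    return bodies;
  }

  public static void main(String[] args) {
    List<String> input = Arrays.asList("20", "50", "50", "04");
    // prints 20, 2050, 205050, 20505004 (one per line)
    for (String body : eventBodies(input)) {
      System.out.println(body);
    }
  }
}
```

Because every event carries the whole prefix, event size grows linearly and total volume quadratically: the first ten sample lines (2 characters each) produce bodies of 2, 4, ..., 20 characters, 110 characters in total.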
Here is my Java code.
MySource.java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// MySource pushes events from its own thread, so it is an EventDrivenSource.
// (Declared as a PollableSource, Flume would call process() in a loop, and
// returning null from process() breaks that polling loop.)
public class MySource extends AbstractSource implements Configurable, EventDrivenSource {
  private static final Logger logger = LoggerFactory.getLogger(MySource.class);
  private String filePath;
  private Thread concatThread;

  @Override
  public void configure(Context context) {
    // Value of a1.sources.r1.filepath in the agent configuration
    filePath = context.getString("filepath", "defaultValue");
    logger.info("Path Property==============>" + filePath);
  }

  @Override
  public void start() {
    super.start(); // marks the source as started in Flume's lifecycle
    concatThread = new Thread(new ConcatRunner(), "MySource-concat");
    concatThread.start();
  }

  @Override
  public void stop() {
    // Not synchronized: the runner still calls methods on this source,
    // so joining while holding this object's lock could deadlock.
    if (concatThread != null) {
      concatThread.interrupt(); // wakes the runner if it is sleeping between events
      try {
        concatThread.join();
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
      }
    }
    super.stop();
  }

  private class ConcatRunner implements Runnable {
    @Override
    public void run() {
      StringBuilder finalFlumeString = new StringBuilder();
      // try-with-resources closes the reader on every exit path
      try (BufferedReader br = new BufferedReader(new FileReader(filePath))) {
        String sCurrentLine;
        while ((sCurrentLine = br.readLine()) != null) {
          logger.debug("Read line: " + sCurrentLine);
          finalFlumeString.append(sCurrentLine); // concatenating the lines read so far
          Event e = EventBuilder.withBody(finalFlumeString.toString(), StandardCharsets.UTF_8);
          getChannelProcessor().processEvent(e);
          Thread.sleep(3000); // wait 3 seconds before reading the next line
        }
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt(); // stop() was called: exit quietly
      } catch (IOException | RuntimeException ex) {
        logger.error("Exception in reading file " + filePath, ex);
      }
    }
  } // ConcatRunner over
}
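The runner above reads the file once and stops as soon as `readLine()` returns null, so rows appended to the file afterwards (for example by the optional generator script further down) are never sent. If you need that, one option is to keep polling at end of file, in the spirit of `tail -f`. This is a sketch, not part of Flume's API: `FollowingLineReader`, `follow`, `pollMillis` and `idleMillis` are hypothetical names. It assumes the writer appends whole lines, because `readLine()` returns a partial line if it reaches end of file in the middle of a row.

```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.function.Consumer;

// Hypothetical "follow" reader (like tail -f); not part of Flume.
final class FollowingLineReader {

  private FollowingLineReader() {}

  // Passes each line of `path` to `onLine`. At end of file it polls every
  // `pollMillis` for appended lines and returns once nothing new has
  // arrived for `idleMillis`.
  static void follow(String path, long pollMillis, long idleMillis, Consumer<String> onLine)
      throws IOException, InterruptedException {
    try (BufferedReader br = new BufferedReader(new FileReader(path))) {
      long lastDataAt = System.currentTimeMillis();
      while (true) {
        String line = br.readLine();
        if (line != null) {
          onLine.accept(line);
          lastDataAt = System.currentTimeMillis(); // new data: reset the idle clock
        } else if (System.currentTimeMillis() - lastDataAt >= idleMillis) {
          return; // idle too long: assume the writer is done
        } else {
          Thread.sleep(pollMillis); // at EOF for now; the next readLine() retries the file
        }
      }
    }
  }

  // Usage: java FollowingLineReader <file>
  public static void main(String[] args) throws Exception {
    follow(args[0], 500, 10_000, System.out::println);
  }
}
```

To use this inside ConcatRunner, the body of the while loop would move into the callback. The 3-second sleep would then need its own InterruptedException handling, since a `Consumer` cannot throw checked exceptions.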
FlumeConfig.conf file:
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#source
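# class name of the custom source (use the fully qualified name if MySource is in a package)
a1.sources.r1.type = MySource
a1.sources.r1.restart = true
# read by MySource.configure() through context.getString("filepath", ...)
a1.sources.r1.filepath = /root/input.txt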
#sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.fileType = DataStream
#channel
a1.channels.c1.type = memory
#connect
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Before running the job, package your Java project as a JAR and place it in Flume's lib folder (/usr/lib/flume/lib).
Once you have done that, run the following command from the shell:
flume-ng agent -n a1  -f FlumeConfig.conf
Output of the job: [screenshot]
(This part is optional.)
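Apart from the above, I have written a shell script that generates input for you. It asks for two values: 1. the number of rows and 2. the delay (in seconds) before writing the next row. I used a delay of 2 seconds; change it to suit your needs. The script writes to generatedRandomDataFile in the current directory, so either point a1.sources.r1.filepath at that file or copy it to /root/input.txt. Here is the script: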
#!/bin/sh
echo "Please enter the size in terms of rows you want to generate the random file data"
read rows
#rows=$1
echo "Please enter the delay time needed in between the writing of the rows"
read delayTime
#Delete the tmp & generatedRandomDataFile files if they already exist
rm -f tmp
rm -f generatedRandomDataFile
start=$(date +%s)
for i in $(seq $rows)
do
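LC_ALL=C tr -dc 0-9 < /dev/urandom | head -c 2 > tmp
# Append the two digits plus a newline; '{ print }' keeps every row, whereas '$1=$1' silently dropped rows equal to "00"
gawk '{ print }' tmp >> generatedRandomDataFile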
sleep $delayTime
done
end=$(date +%s)
DIFF=$(( $end - $start ))
echo "File generated is `pwd`/generatedRandomDataFile"
echo "The file generation took $DIFF seconds"
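If you would rather stay in Java, here is a rough port of the generator script. `RandomRowGenerator` and its arguments are hypothetical, not from the original post. It writes two zero-padded random digits per row (00 to 99), overwrites the output file first like the script's `rm -f`, and flushes after each row so a reader following the file sees complete lines.

```java
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.concurrent.ThreadLocalRandom;

// Sketch: a Java port of the generator shell script. Names are hypothetical.
final class RandomRowGenerator {

  // Writes `rows` lines to `path`, each two random digits from 00 to 99,
  // sleeping `delayMillis` after each row (like the script's sleep $delayTime).
  static void generate(String path, int rows, long delayMillis)
      throws IOException, InterruptedException {
    // append=false overwrites any existing file, like rm -f in the script
    try (PrintWriter out = new PrintWriter(new FileWriter(path, false))) {
      for (int i = 0; i < rows; i++) {
        out.printf("%02d\n", ThreadLocalRandom.current().nextInt(100));
        out.flush(); // make the whole row visible to readers immediately
        if (delayMillis > 0) {
          Thread.sleep(delayMillis);
        }
      }
    }
  }

  // Usage: java RandomRowGenerator <rows> <delaySeconds>
  public static void main(String[] args) throws Exception {
    int rows = Integer.parseInt(args[0]);
    long delaySeconds = Long.parseLong(args[1]);
    String path = "generatedRandomDataFile";
    long start = System.currentTimeMillis();
    generate(path, rows, delaySeconds * 1000);
    long seconds = (System.currentTimeMillis() - start) / 1000;
    System.out.println("File generated is " + new File(path).getAbsolutePath());
    System.out.println("The file generation took " + seconds + " seconds");
  }
}
```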
Sample output of the shell script: [screenshot]
Reference:
https://flume.apache.org/FlumeDeveloperGuide.html
Let me know if you have any suggestions.
Cheers!