Reading HBase table data with Spark and implementing a groupByKey-like operation

I. Overview
The environment a program runs in is very important for a test like this.
Without further ado, straight to the requirement. The HBase table contains the following data:

Andy column=baseINFO:age,  value=21
Andy column=baseINFO:gender,  value=0 
Andy column=baseINFO:telphone_number, value=110110110 
Tom  column=baseINFO:age, value=18 
Tom  column=baseINFO:gender, value=1 
Tom  column=baseINFO:telphone_number, value=120120120
As the table above shows, Spark will be used to group these rows by row key, producing:

[Andy, (21, 0, 110110110)]
[Tom, (18, 1, 120120120)]

The requirement itself is simple; the main point is to become familiar with how the program runs.
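Before looking at the Spark version, the target shape can be sketched in plain Java, with no Spark or HBase involved (GroupSketch and its groupByKey helper are illustrative names, not part of the original program):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the grouping effect we want: collapse one
// (rowKey, value) pair per HBase cell into one entry per row key.
public class GroupSketch {

    // Groups flat (key, value) pairs into key -> list of values,
    // preserving first-seen key order and per-key value order.
    public static Map<String, List<String>> groupByKey(List<String[]> cells) {
        Map<String, List<String>> grouped = new LinkedHashMap<>();
        for (String[] cell : cells) {
            grouped.computeIfAbsent(cell[0], k -> new ArrayList<>()).add(cell[1]);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<String[]> cells = Arrays.asList(
                new String[]{"Andy", "21"},
                new String[]{"Andy", "0"},
                new String[]{"Andy", "110110110"},
                new String[]{"Tom", "18"},
                new String[]{"Tom", "1"},
                new String[]{"Tom", "120120120"});
        // One entry per row key, e.g. Andy -> [21, 0, 110110110]
        System.out.println(groupByKey(cells));
    }
}
```

Spark's own groupByKey does the same thing over a distributed pair RDD; the code below builds the per-row lists directly in mapToPair instead, since each HBase Result already holds all cells of one row.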

II. The specific code

package com.union.bigdata.spark.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ReadHbase {

    private static String appName = "ReadTable";

    public static void main(String[] args) {
        // We can also run locally with setMaster("local[3]"); the number 3 means 3 threads.
        SparkConf sparkConf = new SparkConf().setAppName(appName);
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("baseINFO"), Bytes.toBytes("telphone_number"));
        scan.addColumn(Bytes.toBytes("baseINFO"), Bytes.toBytes("age"));
        scan.addColumn(Bytes.toBytes("baseINFO"), Bytes.toBytes("gender"));

        // TableInputFormat expects the Scan serialized as a Base64 string.
        String scanToString = "";
        try {
            ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
            scanToString = Base64.encodeBytes(proto.toByteArray());
        } catch (IOException io) {
            io.printStackTrace();
        }

        conf.set(TableInputFormat.INPUT_TABLE, "VIPUSER");
        conf.set(TableInputFormat.SCAN, scanToString);

        // Get the query Result from the HBase table.
        JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = jsc.newAPIHadoopRDD(
                conf, TableInputFormat.class,
                ImmutableBytesWritable.class, Result.class);

        // Group the columns by row key, e.g. [Andy, (21, 0, 110110110)].
        JavaPairRDD<String, List<Integer>> art_scores = hBaseRDD.mapToPair(
                new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, List<Integer>>() {
                    public Tuple2<String, List<Integer>> call(Tuple2<ImmutableBytesWritable, Result> results) {

                        List<Integer> list = new ArrayList<Integer>();

                        byte[] telphone_number = results._2().getValue(Bytes.toBytes("baseINFO"), Bytes.toBytes("telphone_number"));
                        byte[] age = results._2().getValue(Bytes.toBytes("baseINFO"), Bytes.toBytes("age"));
                        byte[] gender = results._2().getValue(Bytes.toBytes("baseINFO"), Bytes.toBytes("gender"));

                        // HBase stores everything as byte arrays, so convert back to
                        // normal types such as int and String before collecting.
                        list.add(Integer.parseInt(Bytes.toString(age)));
                        list.add(Integer.parseInt(Bytes.toString(gender)));
                        list.add(Integer.parseInt(Bytes.toString(telphone_number)));

                        return new Tuple2<String, List<Integer>>(Bytes.toString(results._1().get()), list);
                    }
                });

        // Build the Cartesian product of the pair RDD with itself.
        JavaPairRDD<Tuple2<String, List<Integer>>, Tuple2<String, List<Integer>>> cart = art_scores.cartesian(art_scores);

        // Use the row key to drop the duplicate pairs produced by the Cartesian product.
        JavaPairRDD<Tuple2<String, List<Integer>>, Tuple2<String, List<Integer>>> cart2 = cart.filter(
                new Function<Tuple2<Tuple2<String, List<Integer>>, Tuple2<String, List<Integer>>>, Boolean>() {
                    public Boolean call(Tuple2<Tuple2<String, List<Integer>>, Tuple2<String, List<Integer>>> pair) throws Exception {
                        return pair._1()._1().compareTo(pair._2()._1()) < 0;
                    }
                });

        System.out.println("Create the List 'collect'...");

        // collect() is an action: it triggers the job and returns the result set.
        List<Tuple2<Tuple2<String, List<Integer>>, Tuple2<String, List<Integer>>>> collect = cart2.collect();
        for (Tuple2<Tuple2<String, List<Integer>>, Tuple2<String, List<Integer>>> pair : collect) {
            System.out.println(pair);
        }

        jsc.stop();
    }
}
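HBase stores every cell as a raw byte array, so the mapToPair function has to convert the bytes back into ordinary types. A plain-Java sketch of that round trip, assuming (as the sample data suggests) that the values were written as UTF-8 strings; ByteRoundTrip is an illustrative name, not part of the original program:

```java
import java.nio.charset.StandardCharsets;

// Sketch of the byte-array round trip the mapToPair function relies on,
// assuming values were written to HBase as strings (as Bytes.toBytes(String) does).
public class ByteRoundTrip {

    // Mirrors Bytes.toBytes(String): encode a String cell value to bytes.
    public static byte[] toBytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // Mirrors Bytes.toString(byte[]) followed by Integer.parseInt.
    public static int toInt(byte[] cell) {
        return Integer.parseInt(new String(cell, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        byte[] stored = toBytes("21");      // what HBase actually keeps on disk
        System.out.println(toInt(stored));  // back to a normal int
    }
}
```

If the writer had instead stored values with Bytes.toBytes(int), the reader would need Bytes.toInt(byte[]); the two sides must agree on the encoding.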
III. Analysis of how the program runs

1. Spark self-test; Driver and executor startup
A SparkContext is instantiated (in Spark 2.x, a SparkSession object is initialized here instead). At this point the SecurityManager thread starts and checks user permissions; once that passes, the sparkDriver thread is created, Spark's underlying remote communication module (implemented with the Akka framework) starts and listens on sparkDriver, and the SparkEnv object then registers the BlockManagerMaster thread, whose implementation-class object monitors the running resources.
2. ZooKeeper and HBase self-test and startup
Once the first step completes, the SparkContext instance launches the program's entry point for accessing HBase. This triggers ZooKeeper to run its own series of self-checks, covering user permissions, the operating system, and the data directories. When everything passes, the client connection object is created, and the HBase ClientCnxn object then establishes a full connection with the master.
3. Spark job execution
The job is triggered when the program calls one of Spark's action methods, such as collect. This process is documented in great detail online; in short, the DAGScheduler does a great deal of work, together with many threads such as TaskSetManager and TaskScheduler, until the job finishes and the result set is returned.
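The point that nothing runs until an action such as collect is called can be illustrated with plain java.util.stream, which behaves analogously (this is only an analogy, not Spark itself; LazyDemo is an illustrative name):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Rough analogy for Spark's lazy evaluation: stream intermediate operations
// (like Spark transformations) do no work until a terminal operation
// (the analogue of an action such as collect) is invoked.
public class LazyDemo {

    public static final List<String> trace = new ArrayList<>();

    public static List<Integer> run() {
        Stream<Integer> doubled = Stream.of(1, 2, 3)
                .map(x -> { trace.add("map " + x); return x * 2; }); // not executed yet
        trace.add("before terminal op");
        return doubled.collect(Collectors.toList()); // triggers the whole pipeline
    }

    public static void main(String[] args) {
        System.out.println(run());
        System.out.println(trace); // "before terminal op" is recorded first
    }
}
```

In the program above the same thing happens: mapToPair, cartesian, and filter only describe the computation, and the HBase scan and the job itself start when cart2.collect() runs.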
4. Program shutdown
After the result set is returned correctly, the SparkContext calls the stop() method via reflection, which in turn triggers a series of stop operations. The main threads involved are BlockManager and ShutdownHookManager, followed by the release of the actors, and so on. Finally everything ends: temporary data and directories are deleted, and resources are released.
Copyright statement: this article is the blogger's original work and may not be reproduced without the blogger's permission.

