Analyze Big Data using Apache Spark SQL
Apache Spark SQL is a powerful data processing engine and in-memory computing framework for processing and analyzing large volumes of data quickly. We load the elements of an RDD into a Spark SQL table and then query that table. Only SELECT queries can be written against a Spark SQL table; no other SQL operations are supported, and a SELECT query always returns another RDD. Spark SQL offers rich APIs in three languages: Java, Scala and Python.
We use Spark SQL extensively to perform ETL on Big Data, where it lets us avoid writing complex Spark code.
Working with Spark SQL:
As in core Spark, the first step with Spark SQL is to get the data into an RDD (Resilient Distributed Dataset). Once the RDD is available, we create a Spark SQL table whose records are the desired RDD elements; in the Java API this is done through the SQL context (JavaSQLContext). We then implement the business logic by writing appropriate SELECT queries against the Spark SQL tables. The output of a query is another RDD, and its elements can be saved as a text file or an object file, as needed. The core sequence of calls is sketched below; the full, runnable example follows later.
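In outline, and assuming sc is an existing JavaSparkContext and personRdd is a JavaRDD<Person> already built from the input (the full example below shows how), the core calls look like this:

// Compact sketch of the core Spark SQL calls (Java API, Spark 1.2).
// personRdd and Person are placeholders matching the full example below.
JavaSQLContext sqlContext = new JavaSQLContext(sc);
JavaSchemaRDD table = sqlContext.applySchema(personRdd, Person.class); // turn the RDD into a schema RDD
table.registerTempTable("Person_Data");                                // expose it as a queryable table
JavaSchemaRDD result = sqlContext.sql("SELECT name, age FROM Person_Data"); // SELECT returns another RDD
// result holds Row objects; map them back to beans and save them, as in the full example.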
Sample of Apache Spark SQL:
Since Spark SQL provides a Java API, let us start with a sample example written in Java.
To try out the code below, ensure the following are ready on your machine.
For the development:
Download Spark 1.2 and Spark SQL 1.2 jars
Eclipse IDE
To run the code:
Cloudera CDH 5.3.*
No additional configuration is needed, since CDH ships with Spark SQL fully set up.
We can also run Spark SQL programs from the development IDE by changing the file path, and I would suggest this approach while developing. Once development is complete, run the program in cluster mode to process large (big data) inputs.
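The example maps each input line to a Person JavaBean. That class is not part of the original listing; a minimal sketch, assuming just name and age fields and a toString() that matches the comma-separated output shown later, could look like this:

Person.java (hypothetical helper bean):
package com.bimarian.sparksql.spark_proj;

import java.io.Serializable;

// Simple JavaBean used as the schema for the Spark SQL table.
// Spark needs getters and setters so applySchema() can infer the columns.
public class Person implements Serializable {
    private String name;
    private int age;

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }

    @Override
    public String toString() {
        // Matches the "name,age" format seen in the sample output
        return name + "," + age;
    }
}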
SparkSqlExample.java:
package com.bimarian.sparksql.spark_proj;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.api.java.Row;

public class SparkSqlExample {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("Spark_Sql").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        JavaSQLContext sqlContext = new JavaSQLContext(sc);
        String inputPath = args[0];

        // Parse each "name,age" line of the input file into a Person bean
        JavaRDD<Person> rdd = sc.textFile(inputPath).map(new Function<String, Person>() {
            public Person call(String line) throws Exception {
                String[] parts = line.split(",");
                Person person = new Person();
                person.setName(parts[0]);
                person.setAge(Integer.parseInt(parts[1]));
                return person;
            }
        });

        // Registering the RDD as a table using a schema RDD
        JavaSchemaRDD schemaRDD = sqlContext.applySchema(rdd, Person.class);
        schemaRDD.registerTempTable("Person_Data");

        // Querying the "Person_Data" table using the sqlContext.sql() method
        String sql = "SELECT name, age FROM Person_Data";
        JavaSchemaRDD output = sqlContext.sql(sql);

        /* The sql() method returns a schema RDD of Row objects, one per record.
           Here we read the fields out of each Row and convert it back into a Person object. */
        JavaRDD<Person> finalOutput = output.map(new Function<Row, Person>() {
            public Person call(Row row) throws Exception {
                Person bean = new Person();
                bean.setName(row.getString(0));
                bean.setAge(row.getInt(1));
                return bean;
            }
        });

        System.out.println(finalOutput.collect());
        finalOutput.saveAsTextFile("SparkSQL_wordcount.csv");

        sc.stop();
    }
}
Now build the jar file and execute it, passing the input file as an argument. Command to run the above program:
spark-submit --class <class_name> --master <master_URL> <jar_name> <input_file> 2
spark-submit --class com.bimarian.sparksql.spark_proj.SparkSqlExample --master local SparkSQLwordcount-0.0.1-SNAPSHOT.jar input.txt 2
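For reference, the input file is a plain list of comma-separated name,age pairs. Judging from the output shown below, input.txt would look something like this (sample data inferred from that output, not part of the original post):

james,31
ronald,28
stella,22
david,38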
Output:
Output of the System.out.println statement:
[james,31, ronald,28, stella,22, david,38]
The same output can be observed as Hadoop part files in the SparkSQL_wordcount.csv folder, since we called finalOutput.saveAsTextFile("SparkSQL_wordcount.csv") in the code above; the output is written to that folder.