Spark SQL DataFrame

Overview

Spark SQL is the Spark component for processing structured data. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine.

DataFrames

A DataFrame is a distributed collection of data organized into named columns. It can be thought of as a table in a relational database, or a data frame in R / Python. DataFrames can be constructed from a wide variety of sources, such as structured data files, Hive tables, external databases, or RDDs produced during Spark computations.
The DataFrame API is available in four languages: Scala, Java, Python, and R.

Entry Point: SQLContext (Starting Point: SQLContext)

The main entry point for Spark SQL programs is the SQLContext class or one of its subclasses. To create a basic SQLContext, all you need is a SparkContext. Example code follows:
  • Scala
 val sc: SparkContext // An existing SparkContext.
 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  • Java
 JavaSparkContext sc = ...; // An existing JavaSparkContext.
 SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
In addition to the basic SQLContext, you can also create a HiveContext. The differences and relationship between SQLContext and HiveContext are:
  • SQLContext currently supports only the SQL parser (SQL-92 syntax).
  • HiveContext supports both the SQL parser and the HiveQL parser. It defaults to the HiveQL parser, and the user can switch to the SQL parser via configuration in order to run syntax that HiveQL does not support.
  • With HiveContext you can use Hive UDFs, read and write Hive tables, and perform other Hive operations. SQLContext cannot operate on Hive.
  • Future versions of Spark SQL will continue to enrich SQLContext so that it matches HiveContext in functionality, and eventually the two will be unified into a single context.
HiveContext is packaged separately because it pulls in Hive's dependencies: a basic Spark deployment does not need the Hive dependency packages, and they only have to be added when HiveContext is used.
The SQL parser is selected with the spark.sql.dialect parameter. A SQLContext can only use the "sql" dialect provided by Spark SQL; in a HiveContext the default dialect is "hiveql", and "sql" is also supported.
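As a minimal illustration (a sketch added here, not part of the original text), the Scala snippet below creates a HiveContext and switches its dialect via spark.sql.dialect; it assumes the spark-hive module and the Hive dependency packages are on the classpath:
 import org.apache.spark.sql.hive.HiveContext

 val sc: SparkContext // An existing SparkContext.
 val hiveContext = new HiveContext(sc)

 // The default dialect in a HiveContext is "hiveql".
 hiveContext.setConf("spark.sql.dialect", "sql")    // switch to the plain SQL parser
 hiveContext.setConf("spark.sql.dialect", "hiveql") // switch back to HiveQL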

Create DataFrames (Creating DataFrames)

Using a SQLContext, a Spark application can create DataFrames from an existing RDD, a Hive table, JSON data, and other data sources. The following example creates a DataFrame from a JSON file:
  • Scala
 val sc: SparkContext // An existing SparkContext.
 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
 val df = sqlContext.read.json("examples/src/main/resources/people.json")

 // Displays the content of the DataFrame to stdout
 df.show()
  • Java
 JavaSparkContext sc = ...; // An existing JavaSparkContext.
 SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
 DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");

 // Displays the content of the DataFrame to stdout
 df.show();

DataFrame Operations (DataFrame Operations)

DataFrames provide operation APIs in Scala, Java, and Python. Here are a few examples in Scala and Java:
  • Scala
 val sc: SparkContext // An existing SparkContext.
 val sqlContext = new org.apache.spark.sql.SQLContext(sc)

 // Create the DataFrame
 val df = sqlContext.read.json("examples/src/main/resources/people.json")

 // Show the content of the DataFrame
 df.show()
 // age  name
 // null Michael
 // 30   Andy
 // 19   Justin

 // Print the schema in a tree format
 df.printSchema()
 // root
 // |-- age: long (nullable = true)
 // |-- name: string (nullable = true)

 // Select only the "name" column
 df.select("name").show()
 // name
 // Michael
 // Andy
 // Justin

 // Select everybody, but increment the age by 1
 df.select(df("name"), df("age") + 1).show()
 // name    (age + 1)
 // Michael null
 // Andy    31
 // Justin  20

 // Select people older than 21
 df.filter(df("age") > 21).show()
 // age name
 // 30  Andy

 // Count people by age
 df.groupBy("age").count().show()
 // age  count
 // null 1
 // 19   1
 // 30   1
  • Java
 JavaSparkContext sc = ...; // An existing JavaSparkContext.
 SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

 // Create the DataFrame
 DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");

 // Show the content of the DataFrame
 df.show();
 // age  name
 // null Michael
 // 30   Andy
 // 19   Justin

 // Print the schema in a tree format
 df.printSchema();
 // root
 // |-- age: long (nullable = true)
 // |-- name: string (nullable = true)

 // Select only the "name" column
 df.select("name").show();
 // name
 // Michael
 // Andy
 // Justin

 // Select everybody, but increment the age by 1
 df.select(df.col("name"), df.col("age").plus(1)).show();
 // name    (age + 1)
 // Michael null
 // Andy    31
 // Justin  20

 // Select people older than 21
 df.filter(df.col("age").gt(21)).show();
 // age name
 // 30  Andy

 // Count people by age
 df.groupBy("age").count().show();
 // age  count
 // null 1
 // 19   1
 // 30   1
For more information about the DataFrame API, please refer to the API Documentation.
In addition to simple column references and expressions, DataFrames also provide a rich function library, including string operations, date operations, common math operations, and more. For more information, please refer to the DataFrame Function Reference.
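As a quick illustration (a sketch added here, not from the original), the following Scala snippet applies a few helpers from the org.apache.spark.sql.functions package to the same people.json DataFrame used above; it assumes the sqlContext from the earlier examples:
 import org.apache.spark.sql.functions._

 val df = sqlContext.read.json("examples/src/main/resources/people.json")

 // upper() is a string function, abs() a math function, current_date() a date function (Spark 1.5+).
 df.select(upper(df("name")), abs(df("age") - 21), current_date()).show()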

Run SQL Queries in a Program (Running SQL Queries Programmatically)

A Spark application can run SQL queries with the sql() method of SQLContext; sql() returns the query results as a DataFrame. The code is as follows:
  • Scala
 val sqlContext = ... // An existing SQLContext
 val df = sqlContext.sql("SELECT * FROM table")
  • Java
 SQLContext sqlContext = ... // An existing SQLContext
 DataFrame df = sqlContext.sql("SELECT * FROM table")
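Note that the table referenced in the query must first be registered. Below is a minimal end-to-end sketch in Scala (added for illustration), reusing the people.json file from the earlier examples:
 val sqlContext = new org.apache.spark.sql.SQLContext(sc) // sc: an existing SparkContext

 // Register a DataFrame as a temporary table so SQL can reference it by name.
 val people = sqlContext.read.json("examples/src/main/resources/people.json")
 people.registerTempTable("people")

 // sql() returns the query result as a DataFrame.
 val adults = sqlContext.sql("SELECT name, age FROM people WHERE age > 20")
 adults.show()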

Interoperability between DataFrames and RDDs (Interoperating with RDDs)

Spark SQL supports two ways to convert RDDs to DataFrames:
  • Use reflection to infer the schema of the RDD
    • When the schema is known in advance (from a known class), this reflection-based approach makes the code more concise and works well.
  • Specify the schema through the programming interface
    • Constructing the schema through the Spark SQL API makes the code more verbose.
    • The advantage of this approach is that when the columns and their types are only known at runtime, the schema can be generated dynamically.

Infer the Schema with Reflection (Inferring the Schema Using Reflection)

Spark SQL supports automatically converting an RDD of JavaBeans into a DataFrame. The basic information about the bean is obtained through reflection and used to define the schema. The current Spark SQL version (Spark 1.5.2) does not support nested JavaBeans or complex data types (such as List or Array). To create a JavaBean, define a class that implements the Serializable interface and provides getters and setters for all of its fields. A schema is applied to an RDD by calling createDataFrame and providing the Class object of the JavaBean. Examples are as follows:
 public static class Person implements Serializable {
   private String name;
   private int age;

   public String getName() {
     return name;
   }

   public void setName(String name) {
     this.name = name;
   }

   public int getAge() {
     return age;
   }

   public void setAge(int age) {
     this.age = age;
   }
 }
 // sc is an existing JavaSparkContext.
 SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

 // Load a text file and convert each line to a JavaBean.
 JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").map(
   new Function<String, Person>() {
     public Person call(String line) throws Exception {
       String[] parts = line.split(",");
       Person person = new Person();
       person.setName(parts[0]);
       person.setAge(Integer.parseInt(parts[1].trim()));
       return person;
     }
   });

 // Apply a schema to an RDD of JavaBeans and register it as a table.
 DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);
 schemaPeople.registerTempTable("people");

 // SQL can be run over RDDs that have been registered as tables.
 DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");

 // The results of SQL queries are DataFrames and support all the normal RDD operations.
 // The columns of a row in the result can be accessed by ordinal.
 List<String> teenagerNames = teenagers.javaRDD().map(
   new Function<Row, String>() {
     public String call(Row row) {
       return "Name: " + row.getString(0);
     }
   }).collect();
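For comparison, here is a minimal Scala sketch of the same reflection-based approach (added here for illustration): in Scala the schema is inferred from a case class, and an implicit conversion provides toDF():
 // sc is an existing SparkContext.
 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
 // Brings in the implicit RDD-to-DataFrame conversion (toDF).
 import sqlContext.implicits._

 // The schema is inferred by reflection from the case class fields.
 case class Person(name: String, age: Int)

 val people = sc.textFile("examples/src/main/resources/people.txt")
   .map(_.split(","))
   .map(p => Person(p(0), p(1).trim.toInt))
   .toDF()
 people.registerTempTable("people")

 // SQL can be run over DataFrames that have been registered as tables.
 val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
 teenagers.map(t => "Name: " + t(0)).collect().foreach(println)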

Specify the Schema Through the Programming Interface (Programmatically Specifying the Schema)

When a JavaBean cannot be defined in advance, a DataFrame is created programmatically in three steps:
  • Create an RDD of Rows from the original RDD
  • Create a StructType that matches the structure of the Rows in the RDD; this StructType is the schema of the RDD
  • Apply the schema to the RDD of Rows via the createDataFrame method provided by SQLContext
Examples are as follows:
 import org.apache.spark.api.java.function.Function;
 // Import factory methods provided by DataTypes.
 import org.apache.spark.sql.types.DataTypes;
 // Import StructType and StructField
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.types.StructField;
 // Import Row.
 import org.apache.spark.sql.Row;
 // Import RowFactory.
 import org.apache.spark.sql.RowFactory;

 // sc is an existing JavaSparkContext.
 SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

 // Load a text file; each line is a raw String record.
 JavaRDD<String> people = sc.textFile("examples/src/main/resources/people.txt");

 // The schema is encoded in a string
 String schemaString = "name age";

 // Generate the schema based on the string of schema
 List<StructField> fields = new ArrayList<StructField>();
 for (String fieldName : schemaString.split(" ")) {
   fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
 }
 StructType schema = DataTypes.createStructType(fields);

 // Convert records of the RDD (people) to Rows.
 JavaRDD<Row> rowRDD = people.map(
   new Function<String, Row>() {
     public Row call(String record) throws Exception {
       String[] fields = record.split(",");
       return RowFactory.create(fields[0], fields[1].trim());
     }
   });

 // Apply the schema to the RDD.
 DataFrame peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema);

 // Register the DataFrame as a table.
 peopleDataFrame.registerTempTable("people");

 // SQL can be run over RDDs that have been registered as tables.
 DataFrame results = sqlContext.sql("SELECT name FROM people");

 // The results of SQL queries are DataFrames and support all the normal RDD operations.
 // The columns of a row in the result can be accessed by ordinal.
 List<String> names = results.javaRDD().map(
   new Function<Row, String>() {
     public String call(Row row) {
       return "Name: " + row.getString(0);
     }
   }).collect();
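The same three steps can be written more compactly in Scala; the sketch below (added for illustration) mirrors the Java example above:
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{StructType, StructField, StringType}

 // sc is an existing SparkContext.
 val sqlContext = new org.apache.spark.sql.SQLContext(sc)

 val people = sc.textFile("examples/src/main/resources/people.txt")

 // The schema is encoded in a string.
 val schemaString = "name age"

 // Step 1: convert records of the original RDD to Rows.
 val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

 // Step 2: build a StructType that matches the structure of the Rows.
 val schema = StructType(
   schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

 // Step 3: apply the schema to the RDD of Rows with createDataFrame.
 val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
 peopleDataFrame.registerTempTable("people")

 val results = sqlContext.sql("SELECT name FROM people")
 results.map(t => "Name: " + t(0)).collect().foreach(println)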
