Spark SQL DataFrame
Overview
Spark SQL is a Spark component for processing structured data. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine.
DataFrames
A DataFrame is a distributed collection of data organized into named columns. A DataFrame can be thought of as a table in a relational database, or as a data frame in R/Python.
DataFrames can be constructed from a variety of sources, such as structured data files, Hive tables, external databases, RDDs produced by existing Spark computations, and so on.
The DataFrame API is available in four languages: Scala, Java, Python, and R.
Entry Point: SQLContext
The entry point for a Spark SQL program is the SQLContext class or one of its subclasses. To create a basic SQLContext, all you need is a SparkContext. Example code:
- Scala
```scala
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
```
- Java
```java
JavaSparkContext sc = ...; // An existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
```
In addition to the basic SQLContext, you can also create a HiveContext. The differences and relationship between SQLContext and HiveContext are:
- SQLContext currently supports only the SQL parser (SQL-92 syntax).
- HiveContext supports both the SQL parser and the HiveQL parser. It defaults to the HiveQL parser; users can switch to the SQL parser via configuration in order to run syntax that HiveQL does not support.
- With a HiveContext you can use Hive UDFs, read and write Hive table data, and perform other Hive operations. SQLContext cannot operate on Hive.
- Future Spark SQL versions will continue to enrich the functionality of SQLContext, bringing it up to par with HiveContext; eventually the two will be unified into a single Context.
HiveContext packages the Hive dependencies. Keeping HiveContext separate means a basic Spark deployment does not need the Hive dependency packages; only when you need to use HiveContext do you add the various Hive dependency packages.
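For reference, a minimal sketch of creating a HiveContext (assuming sc is an existing SparkContext and the Hive dependency packages are on the classpath):

```scala
// Requires the spark-hive dependencies on the classpath.
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
```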
The SQL parser can be selected with the spark.sql.dialect parameter. In a SQLContext, only the "sql" dialect provided by Spark SQL can be used. In a HiveContext the default dialect is "hiveql", and the "sql" dialect is also supported.
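For example, a minimal sketch of switching dialects (assuming the hiveContext from the sketch above):

```scala
// Switch from the default "hiveql" dialect to the plain "sql" dialect.
hiveContext.setConf("spark.sql.dialect", "sql")

// Switch back to the HiveQL parser.
hiveContext.setConf("spark.sql.dialect", "hiveql")
```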
Creating DataFrames
With a SQLContext, a Spark application can create DataFrames from an existing RDD, a Hive table, JSON data, and other data sources. The following example creates a DataFrame from a JSON file:
- Scala
```scala
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val df = sqlContext.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
```
- Java
```java
JavaSparkContext sc = ...; // An existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");

// Displays the content of the DataFrame to stdout
df.show();
```
DataFrame Operations
DataFrames provide operation interfaces in Scala, Java, and Python. Here are a few examples of operations in Scala and Java:
- Scala
```scala
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create the DataFrame
val df = sqlContext.read.json("examples/src/main/resources/people.json")

// Show the content of the DataFrame
df.show()
// age  name
// null Michael
// 30   Andy
// 19   Justin

// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// name
// Michael
// Andy
// Justin

// Select everybody, but increment the age by 1
df.select(df("name"), df("age") + 1).show()
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20

// Select people older than 21
df.filter(df("age") > 21).show()
// age name
// 30  Andy

// Count people by age
df.groupBy("age").count().show()
// age  count
// null 1
// 19   1
// 30   1
```
- Java
```java
JavaSparkContext sc; // An existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

// Create the DataFrame
DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");

// Show the content of the DataFrame
df.show();
// age  name
// null Michael
// 30   Andy
// 19   Justin

// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show();
// name
// Michael
// Andy
// Justin

// Select everybody, but increment the age by 1
df.select(df.col("name"), df.col("age").plus(1)).show();
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20

// Select people older than 21
df.filter(df.col("age").gt(21)).show();
// age name
// 30  Andy

// Count people by age
df.groupBy("age").count().show();
// age  count
// null 1
// 19   1
// 30   1
```
For more information about the DataFrame API, please refer to the API Documentation.
In addition to simple column references and expressions, DataFrames also provide a rich function library, including string operations, date operations, common math operations, and so on. For more information, please refer to the DataFrame Function Reference.
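For example, a short sketch of a few built-in functions from org.apache.spark.sql.functions (assuming df is the people DataFrame created above):

```scala
import org.apache.spark.sql.functions._

// String operation: upper-case the name column.
df.select(upper(df("name")).as("upper_name")).show()

// Common aggregate math operations on the age column.
df.select(avg(df("age")), max(df("age"))).show()
```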
Running SQL Queries Programmatically
A Spark application can run SQL queries with SQLContext's sql() method, which returns the query result as a DataFrame. The code is as follows:
- Scala
```scala
val sqlContext = ... // An existing SQLContext
val df = sqlContext.sql("SELECT * FROM table")
```
- Java
```java
SQLContext sqlContext = ...; // An existing SQLContext
DataFrame df = sqlContext.sql("SELECT * FROM table");
```
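Note that sql() can only query tables that have already been registered. A minimal Scala sketch, assuming the people DataFrame df from the earlier JSON example:

```scala
// Register the DataFrame as a temporary table so it can be queried by name.
df.registerTempTable("people")

val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
teenagers.show()
```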
Interoperability between DataFrames and RDDs (Interoperating with RDDs)
Spark SQL supports two ways to convert RDDs to DataFrames:
- Infer the schema using reflection
  - When the schema is known in advance (the RDD contains objects of a known class), this reflection-based approach makes the code more concise and works well.
- Specify the schema through the programming interface
  - Constructing the schema through the Spark SQL programming interface makes the code more verbose.
  - The advantage of this approach is that the schema can be generated dynamically when the columns and their types are known only at runtime.
Inferring the Schema Using Reflection
Spark SQL supports automatically converting an RDD of JavaBeans into a DataFrame. Basic information about the bean is obtained via reflection and used to define the schema. The current Spark SQL version (Spark 1.5.2) does not support nested JavaBeans or complex data types (such as List or Array).
To create a JavaBean, define a class that implements the Serializable interface and has getters and setters for all of its fields. You assign a schema to an RDD by calling createDataFrame and passing the Class object of the JavaBean. Examples are as follows:
```java
public static class Person implements Serializable {
  private String name;
  private int age;

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public int getAge() {
    return age;
  }

  public void setAge(int age) {
    this.age = age;
  }
}
```
```java
// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

// Load a text file and convert each line to a JavaBean.
JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").map(
  new Function<String, Person>() {
    public Person call(String line) throws Exception {
      String[] parts = line.split(",");

      Person person = new Person();
      person.setName(parts[0]);
      person.setAge(Integer.parseInt(parts[1].trim()));

      return person;
    }
  });

// Apply a schema to an RDD of JavaBeans and register it as a table.
DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);
schemaPeople.registerTempTable("people");

// SQL can be run over RDDs that have been registered as tables.
DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}).collect();
```
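For comparison, a sketch of the same reflection-based approach in Scala (Scala infers the schema from a case class instead of a JavaBean; assumes the same people.txt file):

```scala
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Needed for the implicit rdd.toDF() conversion.
import sqlContext.implicits._

// In Scala the schema is inferred by reflection over a case class.
case class Person(name: String, age: Int)

val people = sc.textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1).trim.toInt))
  .toDF()
people.registerTempTable("people")

// SQL can be run over DataFrames registered as tables.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
```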
Programmatically Specifying the Schema
When a JavaBean cannot be defined in advance, creating a DataFrame programmatically consists of three steps:
- Create an RDD of Rows from the original RDD
- Create a StructType matching the structure of the Rows in the RDD; this StructType represents the RDD's schema
- Apply the schema to the RDD of Rows via the createDataFrame method provided by SQLContext
Examples are as follows:
```java
import org.apache.spark.api.java.function.Function;
// Import factory methods provided by DataTypes.
import org.apache.spark.sql.types.DataTypes;
// Import StructType and StructField
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructField;
// Import Row.
import org.apache.spark.sql.Row;
// Import RowFactory.
import org.apache.spark.sql.RowFactory;

// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

// Load a text file and convert each line to a JavaBean.
JavaRDD<String> people = sc.textFile("examples/src/main/resources/people.txt");

// The schema is encoded in a string
String schemaString = "name age";

// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<StructField>();
for (String fieldName : schemaString.split(" ")) {
  fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
}
StructType schema = DataTypes.createStructType(fields);

// Convert records of the RDD (people) to Rows.
JavaRDD<Row> rowRDD = people.map(
  new Function<String, Row>() {
    public Row call(String record) throws Exception {
      String[] fields = record.split(",");
      return RowFactory.create(fields[0], fields[1].trim());
    }
  });

// Apply the schema to the RDD.
DataFrame peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema);

// Register the DataFrame as a table.
peopleDataFrame.registerTempTable("people");

// SQL can be run over RDDs that have been registered as tables.
DataFrame results = sqlContext.sql("SELECT name FROM people");

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> names = results.javaRDD().map(new Function<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}).collect();
```
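For comparison, a sketch of the same three steps in Scala (assuming the same people.txt file):

```scala
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Import Row and Spark SQL data types.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}

// Create an RDD of lines.
val people = sc.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string.
val schemaString = "name age"

// Generate the schema based on the string of schema.
val schema = StructType(
  schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// Convert records of the RDD (people) to Rows.
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// Apply the schema to the RDD of Rows.
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

// Register the DataFrame as a table and query it.
peopleDataFrame.registerTempTable("people")
val results = sqlContext.sql("SELECT name FROM people")
results.map(t => "Name: " + t(0)).collect().foreach(println)
```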