Spark SQL 1.3.0: Introducing DataFrame and Complete Data Write Support

Since its debut in March 2013, Spark SQL has become the largest Spark component other than Spark Core. Besides taking over the baton from Shark and continuing to give Spark users a high-performance SQL-on-Hadoop solution, it also gives Spark general-purpose, efficient, and versatile structured data processing capabilities. The recently released version 1.3.0 shows off both of these aspects to the full.
DataFrame
In terms of ease of use, it is no exaggeration to say that Spark's RDD API is a huge leap over the traditional MapReduce API. However, for newcomers without MapReduce or functional programming experience, the RDD API still has a learning curve. On the other hand, the traditional data frame tools that data scientists know well, such as R and Pandas, offer an intuitive API but are limited to single-machine processing and cannot handle big data workloads. To resolve this tension, Spark SQL 1.3.0 builds on the original SchemaRDD to provide a DataFrame API in the style of R and Pandas. The new DataFrame API not only lowers the learning curve significantly for ordinary developers, but also supports three languages: Scala, Java, and Python. More importantly, because it evolved from SchemaRDD, DataFrame is naturally suited to distributed big data scenarios.
What is a DataFrame?
In Spark, a DataFrame is a distributed dataset built on RDDs, similar to a two-dimensional table in a traditional database. The main difference between a DataFrame and an RDD is that the former carries schema metadata: every column of the two-dimensional table that a DataFrame represents has a name and a type. This gives Spark SQL insight into the structure of the data, so it can apply targeted optimizations both to the data sources behind a DataFrame and to the transformations applied to it, and ultimately improve runtime efficiency substantially. With an RDD, by contrast, Spark Core has no way of knowing the internal structure of the stored elements, so it can only perform simple, generic pipeline optimizations at the stage level.
Create a DataFrame
In Spark SQL, developers can easily convert a wide range of internal and external, standalone and distributed data into DataFrames. The following Python sample code demonstrates the richness and ease of use of DataFrame data sources in Spark SQL 1.3.0:
  # Assumes an existing SQLContext (sqlContext) and SparkContext (sparkContext)

  # Construct a DataFrame from the users table in Hive
  users = sqlContext.table("users")

  # Load a JSON file on S3
  logs = sqlContext.load("s3n://path/to/data.json", "json")

  # Load a Parquet file on HDFS
  clicks = sqlContext.load("hdfs://path/to/data.parquet", "parquet")

  # Access MySQL via JDBC
  comments = sqlContext.jdbc("jdbc:mysql://localhost/comments", "user")

  # Turn an ordinary RDD into a DataFrame
  rdd = sparkContext.textFile("article.txt") \
                    .flatMap(lambda line: line.split()) \
                    .map(lambda word: (word, 1)) \
                    .reduceByKey(lambda a, b: a + b)
  wordCounts = sqlContext.createDataFrame(rdd, ["word", "count"])

  # Turn a local data container into a DataFrame
  data = [("Alice", 21), ("Bob", 24)]
  people = sqlContext.createDataFrame(data, ["name", "age"])

  # Convert an existing Pandas DataFrame (pandasDF) to a Spark DataFrame (unique to the Python API)
  sparkDF = sqlContext.createDataFrame(pandasDF)

As shown, everything from Hive tables, to the various data sources supported by the external data source API (JSON, Parquet, JDBC), to RDDs and even local data collections can be loaded and converted into a DataFrame quickly and easily. These capabilities are also available in Spark SQL's Scala and Java APIs.
Use a DataFrame
Like R and Pandas, Spark DataFrame provides a complete DSL for manipulating data. The DSL is semantically similar to SQL relational queries (which is one of the important reasons Spark SQL can support DataFrame seamlessly). The following example analyzes a set of user data:
  # Create a DataFrame that contains only "young" users
  young = users.filter(users.age < 21)

  # Pandas-style syntax also works
  young = users[users.age < 21]

  # Increase everyone's age by 1
  young.select(young.name, young.age + 1)

  # Count the number of young users by gender
  young.groupBy("gender").count()

  # Join the young users with another DataFrame named logs
  young.join(logs, logs.userId == users.userId, "left_outer")

Besides the DSL, we can of course still process a DataFrame with SQL as usual:
  young.registerTempTable("young")
  sqlContext.sql("SELECT count(*) FROM young")
Finally, once the data analysis logic is written, we can save or display the final result:
  # Append to a Parquet file on HDFS
  young.save(path="hdfs://path/to/data.parquet",
             source="parquet",
             mode="append")

  # Overwrite a JSON file on S3
  young.save(path="s3n://path/to/data.json",
             source="json",
             mode="overwrite")

  # Save as a SQL table
  young.saveAsTable(tableName="young", source="parquet", mode="overwrite")

  # Convert to a Pandas DataFrame (unique to the Python API)
  pandasDF = young.toPandas()

  # Print the output in tabular form
  young.show()

Behind the scenes: the Spark SQL query optimizer and code generation
Just as the transformations on an RDD merely build up an RDD DAG, the transformations on a DataFrame are lazy. They do not compute results directly; instead they are assembled into a logical query plan, which is analogous to the RDD DAG. As mentioned earlier, because a DataFrame carries schema metadata, the Spark SQL query optimizer can see the fine structure of both the data and the computation, and can therefore apply highly targeted optimizations. The optimized logical plan is then translated into a physical execution plan and finally realized as an RDD DAG.

This approach pays off in several ways:
1. Users can express their computation in less, declarative code and leave the choice of physical execution path to Spark SQL. This reduces development cost and also lowers the barrier to entry: in many cases, even when a novice writes an inefficient query, Spark SQL can optimize it effectively with strategies such as predicate pushdown and column pruning. The RDD API cannot offer this.
2. Spark SQL can dynamically generate JVM bytecode for the expressions in a physical execution plan, which enables further low-level optimizations such as eliminating virtual function call overhead and reducing the number of object allocations, so that query execution performance becomes comparable to hand-written code.
3. For PySpark, a DataFrame program only needs to build a small logical execution plan; physical execution is handled entirely on the JVM side, which removes a large amount of unnecessary cross-process communication between the Python interpreter and the JVM. As the figure above shows, in a simple test that aggregates 10 million integers in PySpark, the DataFrame API is nearly five times faster than the RDD API. In addition, PySpark benefits from the performance improvements that Spark SQL makes to the query optimizer on the Scala side.
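To make the idea of a lazy logical plan and its optimization more concrete, here is a toy sketch in plain Python. It is not Spark's optimizer and does not use any Spark API: the `Scan`, `Filter`, and `Project` classes and the `optimize` and `execute` functions are hypothetical names invented for this illustration. Building the plan computes nothing; `optimize` then pushes the predicate and the column list down into the scan.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Tuple

Row = Dict[str, object]


@dataclass(frozen=True)
class Scan:
    """Leaf node: read a table, optionally restricted to some rows and columns."""
    table: str
    columns: Optional[Tuple[str, ...]] = None        # None means "all columns"
    predicate: Optional[Callable[[Row], bool]] = None


@dataclass(frozen=True)
class Filter:
    child: object
    predicate: Callable[[Row], bool]


@dataclass(frozen=True)
class Project:
    child: object
    columns: Tuple[str, ...]


def optimize(plan):
    """Toy rewrite rules: predicate pushdown and column pruning into the scan."""
    if isinstance(plan, Filter):
        child = optimize(plan.child)
        if isinstance(child, Scan) and child.predicate is None:
            return Scan(child.table, child.columns, plan.predicate)
        return Filter(child, plan.predicate)
    if isinstance(plan, Project):
        child = optimize(plan.child)
        if isinstance(child, Scan) and child.columns is None:
            return Scan(child.table, plan.columns, child.predicate)
        return Project(child, plan.columns)
    return plan


def execute(plan, tables: Dict[str, List[Row]]) -> List[Row]:
    """Evaluate a plan; a scan applies its predicate before dropping columns."""
    if isinstance(plan, Scan):
        rows = tables[plan.table]
        if plan.predicate is not None:
            rows = [r for r in rows if plan.predicate(r)]
        if plan.columns is not None:
            rows = [{c: r[c] for c in plan.columns} for r in rows]
        return rows
    if isinstance(plan, Filter):
        return [r for r in execute(plan.child, tables) if plan.predicate(r)]
    if isinstance(plan, Project):
        return [{c: r[c] for c in plan.columns} for r in execute(plan.child, tables)]
    raise TypeError("unknown plan node: %r" % (plan,))


tables = {"users": [
    {"name": "Alice", "age": 19, "gender": "F"},
    {"name": "Bob", "age": 24, "gender": "M"},
    {"name": "Carol", "age": 20, "gender": "F"},
]}


def is_young(row: Row) -> bool:
    return row["age"] < 21


# Building the plan is lazy: nothing is read until execute() is called.
plan = Project(Filter(Scan("users"), is_young), ("name",))
optimized = optimize(plan)
result = execute(optimized, tables)
```

In this sketch the optimized plan collapses to a single `Scan` node that carries both the predicate and the column list, which is the kind of rewrite a schema-aware engine can do and a schema-less pipeline cannot.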
External data source API enhancements
As the preceding sections show, Spark 1.3.0 provides rich data source support for DataFrame. One of the key pieces is the external data source API introduced in Spark 1.2.0. In 1.3.0, we have further enhanced this API.
Data write support
In Spark 1.2.0, the external data source API could only read data from external data sources into Spark; it could not write results back to a data source. In addition, tables imported and registered through a data source could only be temporary tables, and their metadata could not be persisted. In 1.3.0, we provide complete data write support, filling in the last important piece of the puzzle for interoperability among multiple data sources. The free conversion among Hive, Parquet, JSON, Pandas, and other data sources in the earlier examples is a direct result of this enhancement.
From the perspective of a developer of Spark SQL external data sources, the APIs for data write support mainly include:
1. Persistent metadata for data source tables
1.3.0 introduces a new external data source DDL syntax (SQL code snippet):
  CREATE [TEMPORARY] TABLE [IF NOT EXISTS]
    <table-name> [(col-name data-type [, ...])]
    USING <source> [OPTIONS ...]
    [AS <select-query>]

As a result, SQL tables registered from external data can be either temporary tables or tables persisted to the Hive metastore. To support this, the provider of an external data source must extend CreatableRelationProvider in addition to the original RelationProvider.
2. InsertableRelation
A relation class for an external data source that supports data writes must also extend the trait InsertableRelation and implement the data insertion logic in its insert method.
The built-in JSON and Parquet data sources in Spark 1.3.0 implement the APIs described above and can serve as reference examples for developing external data sources.
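The real interfaces are Scala traits, so the following is only an analogy in plain Python that shows the shape of the contract described above: a provider creates a relation, and the relation accepts inserts. The abstract classes are Python stand-ins named after the Scala traits; `InMemoryRelation`, `InMemoryProvider`, the `"table"` parameter, and the exact method signatures are assumptions made for this sketch, not Spark's API.

```python
from abc import ABC, abstractmethod
from typing import Dict, List

Row = Dict[str, object]


class InsertableRelation(ABC):
    """Python stand-in for a relation that supports writes."""

    @abstractmethod
    def insert(self, data: List[Row], overwrite: bool) -> None:
        ...


class CreatableRelationProvider(ABC):
    """Python stand-in for a provider that can create a relation from data."""

    @abstractmethod
    def create_relation(self, mode: str, parameters: Dict[str, str],
                        data: List[Row]) -> InsertableRelation:
        ...


class InMemoryRelation(InsertableRelation):
    """Hypothetical relation that keeps its rows in a Python list."""

    def __init__(self) -> None:
        self.rows: List[Row] = []

    def insert(self, data: List[Row], overwrite: bool) -> None:
        if overwrite:
            self.rows = list(data)      # replace existing contents
        else:
            self.rows.extend(data)      # append to existing contents


class InMemoryProvider(CreatableRelationProvider):
    """Hypothetical provider that tracks relations by table name."""

    def __init__(self) -> None:
        self.tables: Dict[str, InMemoryRelation] = {}

    def create_relation(self, mode, parameters, data):
        name = parameters["table"]      # assumed option name
        relation = self.tables.setdefault(name, InMemoryRelation())
        relation.insert(data, overwrite=(mode == "overwrite"))
        return relation


provider = InMemoryProvider()
relation = provider.create_relation("append", {"table": "young"}, [{"name": "Alice"}])
relation.insert([{"name": "Bob"}], overwrite=False)
after_append = list(relation.rows)
relation.insert([{"name": "Carol"}], overwrite=True)
after_overwrite = list(relation.rows)
```

Separating the provider from the relation keeps table creation and metadata handling apart from the row-level write logic, which is the division of responsibilities the two interfaces suggest.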
Unified load/save API
In Spark 1.2.0, there were few convenient options for saving the results held in a SchemaRDD. Commonly used ones included:

  • rdd.saveAsParquetFile(...)
  • rdd.saveAsTextFile(...)
  • rdd.toJSON.saveAsTextFile(...)
  • rdd.saveAsTable(...)
  • ...

As you can see, each kind of output has its own API. Even more daunting, there was no flexible way to extend support to new output formats.
In response to this problem, 1.3.0 unifies the load/save API so that users can freely choose an external data source as needed. This API includes:
  • SQLContext.table: loads a DataFrame from a SQL table.
  • SQLContext.load: loads a DataFrame from the specified external data source.
  • SQLContext.createExternalTable: saves the data at the specified location as an external SQL table, stores the metadata in the Hive metastore, and returns a DataFrame containing the corresponding data.
  • DataFrame.save: writes a DataFrame to the specified external data source.
  • DataFrame.saveAsTable: saves a DataFrame as a SQL table, with the metadata stored in the Hive metastore and the data written to the specified location.
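The value of a single entry point that takes the format as a parameter can be sketched with the Python standard library alone. The `save` function, the `WRITERS` registry, and the file name below are hypothetical and are not Spark code; the mode names are loosely modeled on the "append" and "overwrite" modes used in the earlier examples, and "error" and "ignore" are assumed here for completeness.

```python
import csv
import json
import os
import tempfile


def _write_json(rows, handle):
    # One JSON object per line.
    for row in rows:
        handle.write(json.dumps(row, sort_keys=True) + "\n")


def _write_csv(rows, handle):
    writer = csv.writer(handle)
    for row in rows:
        writer.writerow([row[key] for key in sorted(row)])


# Supporting a new format only requires registering another writer here.
WRITERS = {"json": _write_json, "csv": _write_csv}
MODES = ("append", "overwrite", "error", "ignore")


def save(rows, path, source, mode="error"):
    """Write rows to path using the writer registered under source."""
    if source not in WRITERS:
        raise ValueError("unknown source: " + source)
    if mode not in MODES:
        raise ValueError("unknown mode: " + mode)
    if os.path.exists(path):
        if mode == "error":
            raise FileExistsError(path)
        if mode == "ignore":
            return                      # leave existing data untouched
    with open(path, "a" if mode == "append" else "w", newline="") as handle:
        WRITERS[source](rows, handle)


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "young.json")
    save([{"name": "Alice", "age": 19}], path, "json", mode="overwrite")
    save([{"name": "Carol", "age": 20}], path, "json", mode="append")
    save([{"name": "Bob", "age": 24}], path, "json", mode="ignore")
    with open(path) as handle:
        saved = [json.loads(line) for line in handle]
```

Because callers only ever name a source and a mode, the calling code stays the same when a new output format is added.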
Parquet data source enhancements
Spark SQL has had built-in support for Parquet, an efficient columnar storage format, from the very beginning. After the external data source API was opened up, the original Parquet support has also been gradually migrating to it. In 1.3.0, the capabilities of the Parquet external data source have been significantly enhanced, including schema merging and automatic partition handling.
1. Schema merging
Like ProtocolBuffer and Thrift, Parquet lets users add new columns over time after a schema has been defined. As long as the metadata of the existing columns is not modified, the old and new schemas remain compatible. This feature lets users add new data columns at any time without worrying about data migration.
2. Partition discovery
Storing the data of a single table in separate directories by partition is a common data layout used by Hive and other systems. The new Parquet data source can automatically discover and infer partition information from the directory structure.
3. Partition pruning
Partitioning effectively provides a coarse-grained index. When a query's conditions involve only some of the partitions, partition pruning skips the partition directories that do not need to be scanned, which can greatly improve query performance.
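Partition discovery and pruning can be illustrated without Spark. The sketch below, in plain Python, parses `key=value` directory components out of file paths and then uses them to decide which files need to be scanned at all. The function names and the `part-0000N.parquet` file names are hypothetical, and the integer inference is a simplification of whatever type inference a real data source performs.

```python
from typing import Callable, Dict, List, Union

PartitionValue = Union[int, str]


def infer_value(raw: str) -> PartitionValue:
    """Simplified type inference: integers if possible, otherwise strings."""
    try:
        return int(raw)
    except ValueError:
        return raw


def discover_partitions(path: str) -> Dict[str, PartitionValue]:
    """Collect every key=value directory component found in a path."""
    values: Dict[str, PartitionValue] = {}
    for component in path.split("/"):
        key, separator, raw = component.partition("=")
        if separator and key:
            values[key] = infer_value(raw)
    return values


def prune(paths: List[str], column: str,
          keep: Callable[[PartitionValue], bool]) -> List[str]:
    """Return only the paths whose partition value satisfies the condition."""
    kept = []
    for path in paths:
        partitions = discover_partitions(path)
        if column in partitions and keep(partitions[column]):
            kept.append(path)
    return kept


paths = [
    "data/test_table/key=1/part-00000.parquet",
    "data/test_table/key=2/part-00000.parquet",
    "data/test_table/key=3/part-00000.parquet",
]

# Only files under key=2 and key=3 need to be opened for a "key >= 2" query.
to_scan = prune(paths, "key", lambda value: value >= 2)
```

The decision is made from path names alone, before any file is opened, which is why a partition layout behaves like a coarse-grained index.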
The following Scala code example demonstrates these capabilities of the Parquet data source in 1.3.0 (Scala code snippet):
  // Assumes an existing SQLContext named sqlContext
  import org.apache.spark.sql.SaveMode
  import sqlContext.implicits._

  // Create two simple DataFrames and store them in two separate partition directories
  val df1 = (1 to 5).map(i => (i, i * 2)).toDF("single", "double")
  df1.save("data/test_table/key=1", "parquet", SaveMode.Append)
  val df2 = (6 to 10).map(i => (i, i * 2)).toDF("single", "double")
  df2.save("data/test_table/key=2", "parquet", SaveMode.Append)
  // Introduce a new column in another DataFrame and store it in another partition directory
  val df3 = (11 to 15).map(i => (i, i * 3)).toDF("single", "triple")
  df3.save("data/test_table/key=3", "parquet", SaveMode.Append)
  // Read the data of the entire partitioned table at once
  val df4 = sqlContext.load("data/test_table", "parquet")
  // Query by partition and display the results
  val df5 = df4.filter($"key" >= 2)
  df5.show()
The result of this code is:
  single  double  triple  key
  6       12      null    2
  7       14      null    2
  8       16      null    2
  9       18      null    2
  10      20      null    2
  11      null    33      3
  12      null    36      3
  13      null    39      3
  14      null    42      3
  15      null    45      3
As shown, the Parquet data source automatically discovered the partition column key from the file paths and correctly merged two different but compatible schemas. Note that the condition in the final query excludes the key=1 partition. Based on the query condition, the Spark SQL query optimizer prunes this partition directory and does not scan the data in it, which improves query performance.
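The null values in the result come from schema merging: rows written with an older or different schema simply have no value for columns they never contained. A minimal sketch of that behavior in plain Python follows; `merge_schemas` and `align` are hypothetical helper names, and schemas are modeled as simple name-to-type dictionaries rather than real Parquet metadata.

```python
from typing import Dict, List, Optional

Schema = Dict[str, str]
Row = Dict[str, Optional[int]]


def merge_schemas(schemas: List[Schema]) -> Schema:
    """Union the columns of all schemas; a shared column must keep its type."""
    merged: Schema = {}
    for schema in schemas:
        for name, dtype in schema.items():
            if name in merged and merged[name] != dtype:
                raise TypeError("incompatible types for column " + name)
            merged.setdefault(name, dtype)
    return merged


def align(row: Row, merged: Schema) -> Row:
    """Fill columns that a row does not have with None (shown as null)."""
    return {name: row.get(name) for name in merged}


schema_double = {"single": "int", "double": "int"}
schema_triple = {"single": "int", "triple": "int"}
merged = merge_schemas([schema_double, schema_triple])

row_from_key2 = align({"single": 6, "double": 12}, merged)
row_from_key3 = align({"single": 11, "triple": 33}, merged)
```

The type check in `merge_schemas` reflects the compatibility condition stated earlier: new columns may be added, but the metadata of existing columns must not change.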
The introduction of the DataFrame API changes the aloof, functional-programming posture of the RDD API and makes Spark more approachable, bringing the development experience of big data analysis ever closer to that of traditional single-machine data analysis. The external data source API, for its part, embodies inclusiveness. At present, in addition to the built-in JSON, Parquet, and JDBC sources, data sources for CSV, Avro, HBase, and others have emerged in the community, and Spark SQL's diverse structured data processing capabilities are gradually being unleashed.
Providing more extension points for developers is one of Spark's themes throughout 2015. We hope these extension APIs will ignite the community's energy and make the Spark ecosystem richer and more diverse.

