Spark SQL DataFrame

First, generating the DataFrame object

Spark SQL can generate DataFrame objects from several kinds of data source: other RDD objects, Parquet files, JSON files, Hive tables, and JDBC-accessible relational databases. This article uses a MySQL database as the data source, generates DataFrame objects from it, and then demonstrates the operations available on a DataFrame.
The code for generating the DataFrame in the text is as follows:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object DataFrameOperations {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Spark SQL DataFrame Operations").setMaster("local[2]")
    val sparkContext = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sparkContext)

    val url = "jdbc:mysql://m000:3306/test"
    val jdbcDF = sqlContext.read.format("jdbc").options(
      Map("url" -> url, "user" -> "root", "password" -> "root", "dbtable" -> "spark_sql_test")).load()
    val joinDF1 = sqlContext.read.format("jdbc").options(
      Map("url" -> url, "user" -> "root", "password" -> "root", "dbtable" -> "spark_sql_join1")).load()
    val joinDF2 = sqlContext.read.format("jdbc").options(
      Map("url" -> url, "user" -> "root", "password" -> "root", "dbtable" -> "spark_sql_join2")).load()

    ... ...
  }
}
The subsequent examples all build on this code. The full version is available in the original post:

http://m.blog.csdn.net/article/details?id=52802150

Second, actions on the DataFrame object

1. show: display the data

show outputs the data of jdbcDF in table form, similar in function to select * from spark_sql_test.
There are four ways to call it:
(1) show
Displays only the first 20 records.
Example:
jdbcDF.show
(2) show(numRows: Int)
Displays the first numRows records. Example:
jdbcDF.show(3)
(3) show(truncate: Boolean)
Controls whether strings longer than 20 characters are truncated; the default is true.
Example:
jdbcDF.show(true)
jdbcDF.show(false)
(4) show(numRows: Int, truncate: Boolean)
Combines the previous two: limits the number of records displayed, and controls whether overly long strings are truncated.
Example:
jdbcDF.show(3, false)

2. collect: get all the data as an array

Unlike the show method above, the collect method retrieves all of jdbcDF's data and returns it as an Array object.
jdbcDF.collect()
The returned array contains one entry per record of jdbcDF; each record is a GenericRowWithSchema object, which stores both the field names and the field values.

3. collectAsList: get all the data as a List

Similar in function to collect, but returns the result as a List object instead of an array. Usage:
jdbcDF.collectAsList()

4. describe(cols: String*): summary statistics for the specified fields

This method takes one or more String field names and returns, still as a DataFrame object, summary statistics computed over the DataFrame's numeric fields, such as count, mean, stddev, min, and max.
The usage below assumes field c1 is of character type, c2 is an integer, and c4 is a floating-point number:
jdbcDF.describe("c1", "c2", "c4").show()
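The statistics describe reports can be reproduced by hand. The following plain Scala sketch (not Spark code; the numeric column is hypothetical) computes them with the sample standard deviation, noting that the exact stddev variant has differed between Spark versions:

```scala
// Hand-computed versions of the statistics describe() reports,
// over a hypothetical numeric column.
val c4 = Seq(1.0, 2.0, 3.0, 4.0)
val count = c4.length
val mean = c4.sum / count
// Sample standard deviation; Spark versions differ on which variant describe uses.
val stddev = math.sqrt(c4.map(v => math.pow(v - mean, 2)).sum / (count - 1))
val (min, max) = (c4.min, c4.max)
println(s"count=$count mean=$mean stddev=$stddev min=$min max=$max")
```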

5. first, head, take, takeAsList: get some number of rows

(1) first: gets the first row, as a Row.
(2) head: gets the first row; head(n: Int) gets the first n rows.
(3) take(n: Int): gets the first n rows, as an Array[Row].
(4) takeAsList(n: Int): gets the first n rows and returns them as a List.
first and head behave identically.
The take and takeAsList methods pull the retrieved data back to the driver, so when using them pay attention to the amount of data involved, or the driver may fail with an OutOfMemoryError.
Usage and results are omitted.
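Although usage is omitted above, the semantics are easy to picture: plain Scala collections expose methods with the same names and behavior, as this sketch (not Spark code) shows:

```scala
// first/head/take semantics illustrated on an ordinary Scala collection.
val rows = Seq("row1", "row2", "row3", "row4")
val firstRow = rows.head      // like df.first() / df.head(): the first record
val firstTwo = rows.take(2)   // like df.take(2): the first two records
println(firstRow)             // row1
println(firstTwo)             // List(row1, row2)
```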

Third, conditional queries, joins, and other operations on the DataFrame object

The methods below all return DataFrame objects, so calls can be chained.

1. where-style conditions

(1) where(conditionExpr: String): takes a condition written as it would appear after SQL's where keyword; and and or may be used. Returns a result of DataFrame type.
Example:
jdbcDF.where("id = 1 or c1 = 'b'").show()
(2) filter: filter by fields. Takes a filter condition expression and returns a result of DataFrame type. Used the same way as where:
jdbcDF.filter("id = 1 or c1 = 'b'").show()

2. querying specified fields

(1) select: get the values of the specified fields. Selects fields by the passed String field names and returns a result of DataFrame type. Example:
jdbcDF.select("id", "c3").show(false)
There is also an overloaded select method that takes Column parameters instead of Strings; it can express logic such as select id, id+1 from test.
jdbcDF.select(jdbcDF("id"), jdbcDF("id") + 1).show(false)
Column objects can be obtained via the apply and col methods; using apply is the simpler of the two.
(2) selectExpr: performs special handling on the specified fields; a UDF function can be called directly on a field, or an alias assigned. Takes String parameters and returns a DataFrame object.
For example, query the id field, alias the c3 field as time, and round the c4 field:
jdbcDF.selectExpr("id", "c3 as time", "round(c4)").show(false)
(3) col: get a specified field. Only one field can be fetched; returns an object of Column type.
val idCol = jdbcDF.col("id")
(4) apply: get a specified field. Only one field can be fetched; returns an object of Column type. Example:
val idCol1 = jdbcDF.apply("id")
val idCol2 = jdbcDF("id")
Results omitted.
(5) drop: remove a specified field, keeping the others. Returns a new DataFrame object that no longer contains the removed field; only one field can be removed at a time.
Example:
jdbcDF.drop("id")
jdbcDF.drop(jdbcDF("id"))

3. limit

The limit method returns a new DataFrame object made of the first n rows of the given DataFrame. Unlike take and head, limit is not an action.
jdbcDF.limit(3).show(false)

4. order by

(1) orderBy and sort: sort by the specified fields, ascending by default. Example 1 sorts by a specified Column; prefixing it with - (or calling .desc) means descending order. sort is used exactly like orderBy.
jdbcDF.orderBy(- jdbcDF("c4")).show(false)
// or
jdbcDF.orderBy(jdbcDF("c4").desc).show(false)
Example 2, sorting by a field name string:
jdbcDF.orderBy("c4").show(false)
(2) sortWithinPartitions
Similar to the sort method above; the difference is that sortWithinPartitions returns a DataFrame object sorted within each partition rather than globally.
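The difference between a partition-local sort and a global one can be illustrated with plain Scala collections standing in for partitions (not Spark code; the data is hypothetical):

```scala
// Each inner Seq stands in for one partition of the DataFrame.
val partitions = Seq(Seq(3, 1, 2), Seq(9, 7, 8))
// Like sortWithinPartitions: each partition is sorted independently.
val withinPartitions = partitions.map(_.sorted)
// Like a global orderBy/sort: all rows sorted together.
val global = partitions.flatten.sorted
println(withinPartitions) // List(List(1, 2, 3), List(7, 8, 9))
println(global)           // List(1, 2, 3, 7, 8, 9)
```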

5. group by

(1) groupBy: group by the specified fields. groupBy can be called in two ways: with String field names, or with objects of Column type.
Usage:
jdbcDF.groupBy("c1")
jdbcDF.groupBy(jdbcDF("c1"))
(2) cube and rollup: grouping with cube/rollup, similar in function to SQL's group by cube/rollup; details omitted.
(3) the GroupedData object. The methods above return an object of GroupedData type, whose API provides the aggregation operations used after a group by, for example:
  • max(colNames: String*): the maximum of the specified numeric fields (or of all numeric fields) within each group; applies only to numeric fields.
  • min(colNames: String*): the minimum of the specified numeric fields (or of all numeric fields) within each group; applies only to numeric fields.
  • mean(colNames: String*): the average of the specified numeric fields (or of all numeric fields) within each group; applies only to numeric fields.
  • sum(colNames: String*): the sum of the specified numeric fields (or of all numeric fields) within each group; applies only to numeric fields.
  • count(): the number of elements in each group.
    Example results for count and max omitted (screenshots in the original post).
Two of the remaining methods are more complex: agg, which is covered in the aggregation section below and can be used to aggregate fields, and pivot.
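The group-then-aggregate pattern behind these GroupedData methods can be sketched with plain Scala collections (not Spark code; the (c1, c2) records are hypothetical):

```scala
// Hypothetical (c1, c2) records.
val data = Seq(("a", 1), ("a", 3), ("b", 2))
// Like df.groupBy("c1"): records bucketed by the grouping key.
val grouped = data.groupBy(_._1)
// Like .count(): number of elements in each group.
val counts = grouped.map { case (k, vs) => k -> vs.size }
// Like .max("c2"): maximum of a numeric field within each group.
val maxes = grouped.map { case (k, vs) => k -> vs.map(_._2).max }
println(counts)
println(maxes)
```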

6. distinct

(1) distinct: returns a DataFrame that contains no duplicate records.
It returns only the unique Row records of the current DataFrame, which gives the same result as calling the dropDuplicates() method below with no field arguments.
Example:
jdbcDF.distinct()
(2) dropDuplicates: deduplicate according to the specified fields, similar to select distinct a, b. Example:
jdbcDF.dropDuplicates(Seq("c1"))
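The difference between distinct and dropDuplicates(Seq("c1")) can be illustrated on plain Scala tuples (not Spark code; the (c1, c2) records are hypothetical; distinctBy requires Scala 2.13+):

```scala
val rows = Seq(("a", 1), ("a", 1), ("a", 2), ("b", 1))
// Like df.distinct(): whole records must match to count as duplicates.
val wholeRecord = rows.distinct
// Like df.dropDuplicates(Seq("c1")): only the c1 field is compared (Scala 2.13+).
val byC1 = rows.distinctBy(_._1)
println(wholeRecord) // List((a,1), (a,2), (b,1))
println(byC1)        // List((a,1), (b,1))
```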

7. aggregation

Aggregation is performed by calling the agg method, which has several overloads; it is usually used together with the groupBy method.
The example below is the simplest and most intuitive usage: compute the maximum of the id field and the sum of the c4 field.
jdbcDF.agg("id" -> "max", "c4" -> "sum")

8. union

The unionAll method combines two DataFrames, similar to the UNION ALL operation in SQL.
Example:
jdbcDF.unionAll(jdbcDF.limit(1))

9. join

Now for the key part. Joins are used heavily in the SQL language, and DataFrame provides a join function as well.
The join method has six overloads in the DataFrame API, introduced below.
(1), Cartesian product
joinDF1.join(joinDF2)
(2), using one field. The following join is similar to the a join b using column1 form; it requires the two DataFrames to have a column with the same name.
joinDF1.join(joinDF2, "id")
joinDF1 and joinDF2 are joined on the id field; in the result, the using field appears only once.
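The inner-join-on-a-key behavior can be sketched with plain Scala collections (not Spark code; the (id, value) records are hypothetical):

```scala
val left  = Seq((1, "a"), (2, "b"), (3, "c"))
val right = Seq((1, "x"), (3, "y"))
val rightById = right.toMap
// Keep only rows whose id exists on both sides, pairing their values;
// the join key appears once per output row, as with the using form.
val joined = left.flatMap { case (id, v) => rightById.get(id).map(w => (id, v, w)) }
println(joined) // List((1,a,x), (3,c,y))
```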
(3), using multiple fields. Besides the single-field case above, multiple fields can be used, as follows:
joinDF1.join(joinDF2, Seq("id", "name"))
(4), specifying the join type. Joins between two DataFrames support the types inner, outer, left_outer, right_outer, and leftsemi. Building on the multiple-field example above, a third String parameter specifies the join type, as shown below:
joinDF1.join(joinDF2, Seq("id", "name"), "inner")
(5), joining on a Column expression
If you do not use the using form and want the flexibility to specify the join fields yourself, use the following form:
joinDF1.join(joinDF2, joinDF1("id") === joinDF2("t1_id"))
(6), specifying the join type together with the join Column, as shown below:
joinDF1.join(joinDF2, joinDF1("id") === joinDF2("t1_id"), "inner")

10. statistics on specified fields

The stat method computes statistics over specified fields, such as variance, covariance, and so on. It returns an object of DataFrameStatFunctions type.
The following code looks at field c1 and finds the values that occur in more than 30% of rows. The values of field c1 in jdbcDF are "a, b, a, c, d, b"; a and b each appear with frequency 2/6, which is greater than 0.3.
jdbcDF.stat.freqItems(Seq("c1"), 0.3).show()
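The 30% frequency check above can be reproduced by hand over the quoted c1 values (plain Scala, not Spark code; note that Spark's freqItems uses an approximate algorithm and may report false positives):

```scala
val c1 = Seq("a", "b", "a", "c", "d", "b")
val support = 0.3
// Exact frequency count: keep values appearing in more than 30% of rows.
val frequent = c1.groupBy(identity)
  .collect { case (v, occ) if occ.size.toDouble / c1.size > support => v }
  .toSet
// a and b each appear 2/6 ≈ 0.33 > 0.3; c and d appear only 1/6.
println(frequent)
```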

11. records common to two DataFrames

The intersect method computes the records present in both DataFrames:
jdbcDF.intersect(jdbcDF.limit(1)).show(false)
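intersect behaves like a set intersection over whole records, which a plain Scala sketch (not Spark code; hypothetical records) makes clear:

```scala
// Hypothetical records of two DataFrames.
val df1Rows = Seq("r1", "r2", "r3")
val df2Rows = Seq("r1")
// Like df1.intersect(df2): only records present in both remain.
val common = df1Rows.intersect(df2Rows)
println(common) // List(r1)
```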

12. records present in one DataFrame but not the other

Example:
jdbcDF.except(jdbcDF.limit(1)).show(false)
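except is the complementary set difference, sketched the same way (plain Scala, not Spark code; hypothetical records):

```scala
val allRows = Seq("r1", "r2", "r3")
val toRemove = Seq("r1")
// Like df.except(other): records of the left side not present in the right.
val remaining = allRows.diff(toRemove)
println(remaining) // List(r2, r3)
```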

13. operations on field names

(1) withColumnRenamed: rename a specified field of the DataFrame. If the specified field name does not exist, nothing happens. The following example renames the id field of jdbcDF to idx.
jdbcDF.withColumnRenamed("id", "idx")
(2) withColumn: add a column to the current DataFrame. The withColumn(colName: String, col: Column) method adds a column named colName to the DataFrame; if colName already exists, the existing column is overwritten.
The following code adds a column named id2 to jdbcDF:
jdbcDF.withColumn("id2", jdbcDF("id")).show(false)

14. one row to multiple rows

Sometimes you need to generate multiple rows from the contents of a single field; the explode method does this. The following code splits the contents of the c3 field on spaces and stores the resulting pieces in a new field named c3_, as shown below:
jdbcDF.explode("c3", "c3_") { time: String => time.split(" ") }
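The splitting function passed to explode can be tried on its own. Below, a hypothetical c3 value is split on spaces, and explode would emit one output row per element of the resulting array (plain Scala, not Spark code):

```scala
// Hypothetical c3 value; the field was aliased as "time" earlier in the article.
val c3Value = "2015-10-21 11:23:22"
val pieces = c3Value.split(" ")   // the function given to explode("c3", "c3_")
println(pieces.mkString(", "))    // 2015-10-21, 11:23:22
```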

15. other operations

The API also provides the na, randomSplit, repartition, alias, and as methods, to be covered later.
