Using UDFs in Spark SQL

Some computations in Spark SQL cannot be expressed with the built-in functions; in those cases you can implement the logic yourself and register it as a UDF (user-defined function).

Passing multiple parameters

A UDF does not support a variable number of arguments (e.g. String*), but you can work around this by passing the values as an array.
Define the UDF; this function merges multiple fields into one, joined by a separator:

def allInOne(seq: Seq[Any], sep: String): String = seq.mkString(sep)
Register and use it in SQL:
sqlContext.udf.register("allInOne", allInOne _)

// merge col1, col2 and col3 into one column, separated by ','
val sql =
    """
      |select allInOne(array(col1,col2,col3),",") as col
      |from tableName
    """.stripMargin
sqlContext.sql(sql).show()
 
Use it with the DataFrame API:
import org.apache.spark.sql.functions.{udf, array, lit}

val myFunc = udf(allInOne _)
val cols = array("col1", "col2", "col3")
val sep = lit(",")
df.select(myFunc(cols, sep).alias("col")).show()

Some simple examples

1. Counting items in a field

The table is as follows; count the number of each person's hobbies.
Name   Hobbies
Alice  Jogging, coding, cooking
Lina   Travel, dance
Result:

Name   Hobbies                   Hobby_num
Alice  Jogging, coding, cooking  3
Lina   Travel, dance             2
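A minimal sketch of one way to do this, assuming the hobbies are stored as a single comma-separated string and the table is registered as `people` (both names are placeholders):

```scala
// Count comma-separated items; treat a null field as 0 hobbies
sqlContext.udf.register("hobby_num", (s: String) =>
  if (s == null) 0 else s.split(',').length
)

sqlContext.sql(
  "select Name, Hobbies, hobby_num(Hobbies) as Hobby_num from people"
).show()
```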

2. Filling null values

The table structure is as follows:

A       B
null    123456
234234  234234
sqlContext.udf.register("combine", (s1: String, s2: String) => if (s1 == null) s2 else s1)
sqlContext.sql("select combine(A, B) as A from table").show()

A
123456
234234  

3. Type conversion

Convert a String column to Int:

sqlContext.udf.register("str2Int", (s: String) => s.toInt)

Or use cast directly:

sqlContext.sql("select cast(a as int) from table")

Note that s.toInt throws a NumberFormatException on non-numeric input, whereas cast yields null instead.

4. A combined example

Raw data: ID (user name) and loginIP (the IP address the account logged in from).

ID     loginIP
alice  ip1
lina   ip2
sven   ip3
alice  ip1
sven   ip2
alice  ip4

For each user, list the IPs they have logged in from and count them:

ID     ip_list   loginIP_num
alice  ip1,ip4   2
lina   ip2       1
sven   ip2,ip3   2

sqlContext.udf.register("list_size", (s: String) => s.split(',').length)
val sql =
    """select ID, ip_list, list_size(ip_list) as loginIP_num
      |from (select ID, concat_ws(',', collect_set(loginIP)) as ip_list
      |      from table
      |      group by ID) t
    """.stripMargin
sqlContext.sql(sql).show()
 

Reference:

[Spark UDF with varargs](http://stackoverflow.com/questions/33151866/spark-udf-with-varargs)
 
