Spark SQL Catalyst source code analysis (6): UDF

In the SQL world, besides the commonly used built-in functions, engines generally provide an extensible interface for user-defined functions, and this has become a de facto standard.
The earlier article in this series on the core flow of Spark SQL introduced the role of the Catalyst Analyzer, which contains the ResolveFunctions resolution rule. With the release of Spark 1.1, however, Spark SQL gained many new features, so the code differs somewhat from my earlier analysis based on the 1.0 source, for example in its support for UDFs:
The Spark 1.0 (and earlier) implementation:
  protected[sql] lazy val catalog: Catalog = new SimpleCatalog
  @transient
  protected[sql] lazy val analyzer: Analyzer =
    new Analyzer(catalog, EmptyFunctionRegistry, caseSensitive = true) // EmptyFunctionRegistry
  @transient
  protected[sql] val optimizer = Optimizer

The Spark 1.1 (and later) implementation:
  protected[sql] lazy val functionRegistry: FunctionRegistry = new SimpleFunctionRegistry // SimpleFunctionRegistry implementation, supports simple UDFs

  @transient
  protected[sql] lazy val analyzer: Analyzer =
    new Analyzer(catalog, functionRegistry, caseSensitive = true)
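To see why this swap matters, here is a minimal plain-Scala sketch. The `Registry`, `EmptyRegistry`, and `SimpleRegistry` names and types are hypothetical simplifications, not the real Catalyst interfaces: an empty registry can never resolve a UDF name, while the simple registry consults a mutable map.

```scala
import scala.collection.mutable

// Hypothetical simplified types (not the real Catalyst interfaces): why
// Spark 1.0's EmptyFunctionRegistry could not resolve any UDF, while
// 1.1's SimpleFunctionRegistry can.
trait Registry { def lookup(name: String): Option[Seq[Any] => Any] }

object EmptyRegistry extends Registry {
  def lookup(name: String) = None // nothing is ever registered: every UDF lookup fails
}

class SimpleRegistry extends Registry {
  private val fns = mutable.HashMap[String, Seq[Any] => Any]()
  def register(name: String, f: Seq[Any] => Any): Unit = fns.put(name, f)
  def lookup(name: String) = fns.get(name)
}

val r = new SimpleRegistry
r.register("len", args => args.head.asInstanceOf[String].length)
val lenResult = r.lookup("len").map(_(Seq("spark")))
```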

First, the primer:
A function in a SQL statement is parsed by SqlParser into an UnresolvedFunction, which is then resolved by the Analyzer.
Besides the built-in functions, you can also define custom functions, and the SQL parser will resolve them the same way.
  ident ~ "(" ~ repsep(expression, ",") <~ ")" ^^ {
    case udfName ~ _ ~ exprs => UnresolvedFunction(udfName, exprs)
  }
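The combinator above captures the function name and its comma-separated argument expressions. As a rough illustration of that shape (this regex-based `parseFunctionCall` is a hypothetical toy, not the real SqlParser), the same split can be sketched as:

```scala
// Toy illustration (not the real SqlParser): split a SQL function call such
// as "len(name)" into the udf name and its argument strings, mirroring what
// `ident ~ "(" ~ repsep(expression, ",") <~ ")"` produces.
val FunctionCall = """(\w+)\(([^)]*)\)""".r

def parseFunctionCall(sql: String): Option[(String, Seq[String])] =
  sql.trim match {
    case FunctionCall(udfName, args) =>
      Some((udfName, args.split(",").map(_.trim).toSeq))
    case _ => None
  }
```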

SqlParser wraps the udfName and exprs it parsed into a case class, UnresolvedFunction, which inherits from Expression.
This expression's dataType, a number of other properties, and its eval method are all inaccessible; forcing access throws an exception, because it is not yet resolved. It is just a carrier.
  case class UnresolvedFunction(name: String, children: Seq[Expression]) extends Expression {
    override def dataType = throw new UnresolvedException(this, "dataType")
    override def foldable = throw new UnresolvedException(this, "foldable")
    override def nullable = throw new UnresolvedException(this, "nullable")
    override lazy val resolved = false

    // Unresolved functions are transient at compile time and don't get evaluated during execution
    override def eval(input: Row = null): EvaluatedType =
      throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")

    override def toString = s"'$name(${children.mkString(",")})"
  }

When the Analyzer is initialized, it needs a Catalog, which maintains the metadata for databases and tables, and a FunctionRegistry, which maintains the mapping from UDF names to UDF implementations; here SimpleFunctionRegistry is used.
  /**
   * Replaces [[UnresolvedFunction]]s with concrete [[catalyst.expressions.Expression Expressions]].
   */
  object ResolveFunctions extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
      case q: LogicalPlan =>
        q transformExpressions { // transforms the expressions of the current LogicalPlan
          case u @ UnresolvedFunction(name, children) if u.childrenResolved => // when the traversal hits an UnresolvedFunction
            registry.lookupFunction(name, children) // look up the udf in the UDF metadata table
        }
    }
  }
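The rule above can be mimicked in a few lines of standalone Scala. All types here (`Expr`, `UnresolvedFunc`, `ResolvedFunc`) are hypothetical stand-ins for the Catalyst classes: walk the tree, and wherever an unresolved function node is found, replace it with a concrete node built from the registry.

```scala
// Minimal sketch with hypothetical stand-in types (not Catalyst's): a rule
// that walks an expression tree and replaces unresolved function nodes with
// concrete ones looked up from a registry, as ResolveFunctions does.
sealed trait Expr
case class Literal(value: Any) extends Expr
case class UnresolvedFunc(name: String, children: Seq[Expr]) extends Expr
case class ResolvedFunc(name: String, children: Seq[Expr], impl: Seq[Any] => Any) extends Expr

val registry = Map[String, Seq[Any] => Any](
  "len" -> { args => args.head.asInstanceOf[String].length }
)

def resolveFunctions(e: Expr): Expr = e match {
  case UnresolvedFunc(name, children) if registry.contains(name) =>
    ResolvedFunc(name, children.map(resolveFunctions), registry(name)) // swap in the concrete node
  case other => other
}

val resolved = resolveFunctions(UnresolvedFunc("len", Seq(Literal("spark"))))
```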

Second, UDF registration
2.1 UDFRegistration

registerFunction("len", (x: String) => x.length)
registerFunction is a method of UDFRegistration. SQLContext now mixes in the UDFRegistration trait, so once you import an SQLContext you can use the udf functionality.
The core method of UDFRegistration is registerFunction:
registerFunction's signature: def registerFunction[T: TypeTag](name: String, func: Function1[_, T]): Unit
It accepts a udfName and a FunctionN, which can be anything from Function1 to Function22; that is, a udf can only take 1 to 22 parameters. (A pain point of Scala.)
Internally, a builder constructs an Expression via ScalaUdf, where ScalaUdf inherits from Expression (you can simply think of this simple UDF as a Catalyst expression). The scala function is passed in as the UDF implementation, and reflection checks whether the field types are allowed; see ScalaReflection.
  def registerFunction[T: TypeTag](name: String, func: Function1[_, T]): Unit = {
    def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e) // constructs the Expression
    functionRegistry.registerFunction(name, builder) // registers it with SQLContext's functionRegistry, which maintains a HashMap of udf mappings
  }

2.2 registerFunction:
Note: FunctionBuilder here is type FunctionBuilder = Seq[Expression] => Expression
  class SimpleFunctionRegistry extends FunctionRegistry {
    val functionBuilders = new mutable.HashMap[String, FunctionBuilder]() // maintains the udf mapping [udfName, FunctionBuilder]

    def registerFunction(name: String, builder: FunctionBuilder) = { // put the builder into the map
      functionBuilders.put(name, builder)
    }

    override def lookupFunction(name: String, children: Seq[Expression]): Expression = {
      functionBuilders(name)(children) // find the udf and return an Expression
    }
  }
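The curried call `functionBuilders(name)(children)` is the essence of lookupFunction: first fetch the builder, then apply it to the children. A standalone sketch with stand-in types (here `Expression` is just an alias for String, purely for illustration):

```scala
// Sketch of the FunctionBuilder pattern with hypothetical stand-in types
// (the real FunctionBuilder is Seq[Expression] => Expression over Catalyst
// expressions, not strings).
type Expression = String // stand-in for Catalyst's Expression
type FunctionBuilder = Seq[Expression] => Expression

val functionBuilders = scala.collection.mutable.HashMap.empty[String, FunctionBuilder]

def registerFunction(name: String, builder: FunctionBuilder): Unit =
  functionBuilders.put(name, builder)

def lookupFunction(name: String, children: Seq[Expression]): Expression =
  functionBuilders(name)(children) // curried: resolve the builder, then apply it to the children

registerFunction("len", exprs => s"ScalaUdf(len, ${exprs.mkString(", ")})")
val expr = lookupFunction("len", Seq("name"))
```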

At this point, we have registered a scala function as a Catalyst Expression; this is spark's simple udf.

Third, UDF calculation:
The UDF has already been encapsulated as an Expression node in the Catalyst tree, so computing it means calling ScalaUdf's eval method.
The function's parameters are first computed from the Row via the child expressions, and then the udf itself is computed by invoking the function.
ScalaUdf inherits from Expression:

ScalaUdf accepts a function, a dataType, and a series of child expressions.
It is fairly simple; the comments should be enough:
  case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expression])
    extends Expression {

    type EvaluatedType = Any

    def nullable = true

    override def toString = s"scalaUDF(${children.mkString(",")})"

    override def eval(input: Row): Any = {
      val result = children.size match {
        case 0 => function.asInstanceOf[() => Any]()
        case 1 => function.asInstanceOf[(Any) => Any](children(0).eval(input)) // cast and invoke the function
        case 2 =>
          function.asInstanceOf[(Any, Any) => Any](
            children(0).eval(input), // evaluate each child expression as a parameter
            children(1).eval(input))
        case 3 =>
          function.asInstanceOf[(Any, Any, Any) => Any](
            children(0).eval(input),
            children(1).eval(input),
            children(2).eval(input))
        case 4 =>
          ...
        case 22 => // scala functions support at most 22 parameters, enumerated here
          function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any](
            children(0).eval(input),
            children(1).eval(input),
            children(2).eval(input),
            children(3).eval(input),
            children(4).eval(input),
            children(5).eval(input),
            children(6).eval(input),
            children(7).eval(input),
            children(8).eval(input),
            children(9).eval(input),
            children(10).eval(input),
            children(11).eval(input),
            children(12).eval(input),
            children(13).eval(input),
            children(14).eval(input),
            children(15).eval(input),
            children(16).eval(input),
            children(17).eval(input),
            children(18).eval(input),
            children(19).eval(input),
            children(20).eval(input),
            children(21).eval(input))
      }
      result
    }
  }
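The heart of eval is the `asInstanceOf` cast: the scala function was stored as an AnyRef at registration time, and is cast back to a FunctionN of the matching arity before being applied to the evaluated children. That trick works on its own in plain Scala:

```scala
// The registered udf is held as AnyRef, like ScalaUdf's `function` field.
val function: AnyRef = (x: String) => x.length

// Arity 1: cast to (Any) => Any and apply the (already evaluated) argument.
// Thanks to erasure this works as long as the runtime argument types match.
val result1 = function.asInstanceOf[(Any) => Any]("spark")

val concat: AnyRef = (a: String, b: String) => a + b
// Arity 2: cast to (Any, Any) => Any, mirroring the `case 2 =>` branch.
val result2 = concat.asInstanceOf[(Any, Any) => Any]("foo", "bar")
```

Passing an argument of the wrong runtime type would only fail with a ClassCastException inside the function body, which is why registerFunction records the types via ScalaReflection up front.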

Fourth, summary
Spark's current UDF is really just a scala function: the scala function is encapsulated as a Catalyst Expression, and when the sql is computed, its eval method is invoked on the current input Row.
Writing a spark udf is very simple: just give the UDF a name and pass in a scala function. Thanks to scala's functional expressiveness, writing a scala udf is quite easy, and compared with a hive udf it is easier to understand.

