### Joining Spark RDD's

Hi Friends,

Today I will be demonstrating, how you can perform joins on Spark RDD's. We are going to focus on three basic join operations.

1. Join (Inner)
2.Left Outer Join
3. Right Outer Join

Lets take standard Employee+Department example and create a two RDDs;one holding employee data and another holding department data.

Employee Table :
Eid EName LName
101 Sam Flam
102 Scot Rut
103 Jass Tez

```` val EmpRDD = sc.parallelize(Seq((101,"Sam","Flam"),(102,"Scot","Rut"),(103,"Jas","Tez")))  ````
``````
````Array[(Int, String, String)] = Array((101,Sam,Flam), (102,Scot,Rut), (103,Jas,Tez)) // output ````
``````

Department Table :
DeptId DepartmentName Eid
D01 Computer 101
D02 Electronic 104
D03 Civil 102

```` val DeptRDD = sc.parallelize(Seq(("D01","Computer",101),("D02","Electronic",104),("D03","Civil",102)))  ````
``````
````Array[(String, String, Int)] = Array((D01,Computer,101), (D02,Electronic,104), (D03,Civil,102)) // output````
``````

Now as we know for joining two table, we need a key. So lets format our RDD to have key for performing join operation.

`` val EmpWithKeyRDD = EmpRDD.keyBy(x => x._1)  // Forms a new RDD with key as Eid` `
``````  val DetpWithKeyRDD = DeptRDD.keyBy(x => x._3)
``````

```Array[(Int, (Int, String, String))] = Array((101,(101,Sam,Flam)), (102,(102,Scot,Rut)), (103,(103,Jas,Tez)))

Array[(Int, (String, String, Int))] = Array((101,(D01,Computer,101)), (104,(D02,Electronic,104)), (102,(D03,Civil,102)))
```

We are good to go now. Lets perform join operation on two RDD's

1. Inner Join
``````val EmpDeptJoin =  EmpWithKeyRDD.join(DetpWithKeyRDD)
``````
``EmpDeptJoin .collect()  // to Check Result``

Output :
``` Array[(Int, ((Int, String, String), (String, String, Int)))] = Array((101,((101,Sam,Flam),(D01,Computer,101))), (102,((102,Scot,Rut),(D03,Civil,102))))```

### `2.Left Outer Join`

``````val EmpDeptLeftJoin = EmpWithKeyRDD.leftOuterJoin(DetpWithKeyRDD)
``````
``EmpDeptLeftJoin.collect()  // to Check Result``

### `Output :`

```Array[(Int, ((Int, String, String), Option[(String, String, Int)]))] = Array((101,((101,Sam,Flam),Some((D01,Computer,101)))), (102,((102,Scot,Rut),Some((D03,Civil,102)))), (103,((103,Jas,Tez),None)))```

### ``` 3.Left Outer Join```

``` ```

``````val EmpDeptRightJoin = EmpWithKeyRDD.rightOuterJoin(DetpWithKeyRDD)
``````
``EmpDeptRightJoin .collect()  // to Check Result``

``` ```

Output :
Array[(Int, (Option[(Int, String, String)], (String, String, Int)))] = Array((101,(Some((101,Sam,Flam)),(D01,Computer,101))), (104,(None,(D02,Electronic,104))), (102,(Some((102,Scot,Rut)),(D03,Civil,102))))

We can perform formatting on data that we received as output from each action. Lets take output from inner join and bring in proper format.

`val EmpDeptJoinFormatted =  EmpDeptJoin.map(x => (x._1,x._2._1._2,x._2._1._3,x._2._2._1,x._2._2._2))`

Output :

Array[(Int, String, String, String, String)] = Array((101,Sam,Flam,D01,Computer), (102,Scot,Rut,D03,Civil))
Explanation of operation performed using map function on each element of RDD.

Looking at diagram , it is clear that how each element is formatted for creating final RDD.

### Complete Code

```package org.com.td.sparkdemo.spark
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object JoinRDD {

val conf = new SparkConf()
.setAppName("JoinRDD")
.setMaster("local")
val sc = new SparkContext(conf)

val EmpRDD = sc.parallelize(Seq((101,"Sam","Flam"),(102,"Scot","Rut"),(103,"Jas","Tez")))
val DeptRDD = sc.parallelize(Seq(("D01","Computer",101),("D02","Electronic",104),("D03","Civil",102)))

val EmpWithKeyRDD  = EmpRDD.keyBy(x => x._1)
val DetpWithKeyRDD = DeptRDD.keyBy(x => x._3)

val EmpDeptJoin =  EmpWithKeyRDD.join(DetpWithKeyRDD)
val EmpDeptLeftJoin = EmpWithKeyRDD.leftOuterJoin(DetpWithKeyRDD)
val EmpDeptRightJoin = EmpWithKeyRDD.rightOuterJoin(DetpWithKeyRDD)

val EmpDeptJoinFormatted =  EmpDeptJoin.map(x => (x._1,x._2._1._2,x._2._1._3,x._2._2._1,x._2._2._2))

EmpDeptJoinFormatted.saveAsTextFile("/path/to/your/output/directory")

}
```

Enjoy...Happy Coding...!!!!!!