Spark Joins

PySpark Joins

A join combines two DataFrames based on one or more key columns.

The examples below use the following two DataFrames.

				
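from pyspark.sql import SparkSession
# Create (or reuse) a session; the app name "joins" is arbitrary.
spark = SparkSession.builder.appName("joins").getOrCreate()
sc = spark.sparkContext
rdd1 = sc.parallelize([("Rafferty", 31), ("Jones", 33), ("Heisenberg", 33), ("Robinson", 34), ("Smith", 34), ("Williams", 39)])
rdd2 = sc.parallelize([(31, "Sales"), (33, "Engineering"), (34, "Clerical"), (35, "Marketing")])
employees = rdd1.toDF(["LastName", "DepartmentID"])
departments = rdd2.toDF(["DepartmentID", "DepartmentName"])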

				
			

PySpark Inner Join DataFrame

Inner join is the default join type in PySpark and the most commonly used.
It joins two datasets on the key columns; rows whose keys have no match are dropped from both datasets (employees and departments).

				
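# "inner" is the default join type, so no third argument is needed.
inner_join = employees.join(departments, "DepartmentID")
inner_join.show()
# For several key columns, pass a list of names present in both DataFrames.
# The sample DataFrames share only DepartmentID, so this form is left commented out:
# inner_join_with_Multiple = employees.join(departments, ["DepartmentID", "name"])
# inner_join_with_Multiple.show()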
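
To see which rows survive the inner join without starting Spark, the result can be modelled in plain Python. This is only a sketch of the result set on the sample data, assuming department IDs are unique on the right side; it does not reflect how Spark executes the join, and the row order Spark prints may differ.

```python
# Sample data from the top of the page, as plain tuples.
employees = [("Rafferty", 31), ("Jones", 33), ("Heisenberg", 33),
             ("Robinson", 34), ("Smith", 34), ("Williams", 39)]
departments = [(31, "Sales"), (33, "Engineering"),
               (34, "Clerical"), (35, "Marketing")]

# DepartmentID -> DepartmentName lookup (IDs are unique in this sample).
dept_by_id = dict(departments)

# Model of employees.join(departments, "DepartmentID"):
# keep an employee only if its DepartmentID exists among the departments.
inner_rows = [
    (dept_id, last_name, dept_by_id[dept_id])
    for last_name, dept_id in employees
    if dept_id in dept_by_id
]

for row in inner_rows:
    print(row)
```

Williams (DepartmentID 39) and Marketing (DepartmentID 35) have no partner on the other side, so neither appears in the five resulting rows.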
				
			

PySpark Full Outer Join

A full outer join (join type "outer", "full" or "fullouter") returns all rows from both datasets; where the join expression does not match, the columns of the missing side are null.

				
fullouter = employees.join(departments, "DepartmentID", "fullouter")
fullouter.show()
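
As a rough illustration of where the nulls end up, the full outer join on the sample data can be modelled in plain Python with None standing in for null. This is a sketch of the result rows only, assuming unique department IDs on the right side; Spark does not guarantee this row order.

```python
# Sample data from the top of the page, as plain tuples.
employees = [("Rafferty", 31), ("Jones", 33), ("Heisenberg", 33),
             ("Robinson", 34), ("Smith", 34), ("Williams", 39)]
departments = [(31, "Sales"), (33, "Engineering"),
               (34, "Clerical"), (35, "Marketing")]

dept_by_id = dict(departments)
employee_dept_ids = {dept_id for _, dept_id in employees}

# Every employee row, with None where no department matches.
full_rows = [
    (dept_id, last_name, dept_by_id.get(dept_id))
    for last_name, dept_id in employees
]

# Plus every department that no employee refers to.
full_rows += [
    (dept_id, None, dept_name)
    for dept_id, dept_name in departments
    if dept_id not in employee_dept_ids
]

for row in full_rows:
    print(row)
```

The five matched rows are joined by Williams with no department name and Marketing with no employee, seven rows in total.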
				
			

PySpark Left Outer Join/ Left Join

A left join (also called left outer join) returns all rows from the left dataset regardless of whether a match is found in the right dataset. Where the join expression does not match, the right-side columns are null for that row; rows from the right dataset that have no match are dropped.

				
# "left", "leftouter" and "left_outer" are equivalent join type names.
left = employees.join(departments, "DepartmentID", "left")
left_outer = employees.join(departments, "DepartmentID", "left_outer")
left_outer.show()
				
			

PySpark Right Outer Join/ Right Join

A right join (also called right outer join) is the opposite of a left join: it returns all rows from the right dataset regardless of whether a match is found in the left dataset. Where the join expression does not match, the left-side columns are null for that row; rows from the left dataset that have no match are dropped.

				
					
# "right", "rightouter" and "right_outer" are equivalent join type names.
right = employees.join(departments, "DepartmentID", "right")
right_outer = employees.join(departments, "DepartmentID", "right_outer")
right_outer.show()
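
Row for row, a right join is a left join with the two sides swapped. The sketch below models that in plain Python on the sample data. Note that left_join is a hypothetical helper written for this illustration, not a PySpark function, and it says nothing about how Spark executes the join.

```python
# Sample data from the top of the page, as plain tuples.
employees = [("Rafferty", 31), ("Jones", 33), ("Heisenberg", 33),
             ("Robinson", 34), ("Smith", 34), ("Williams", 39)]
departments = [(31, "Sales"), (33, "Engineering"),
               (34, "Clerical"), (35, "Marketing")]


def left_join(left, right):
    """Model of a left outer join over (key, value) pairs.

    Returns (key, left_value, right_value) tuples; right_value is None
    when the key has no match on the right.
    """
    right_index = {}
    for key, value in right:
        right_index.setdefault(key, []).append(value)
    rows = []
    for key, value in left:
        for match in right_index.get(key, [None]):
            rows.append((key, value, match))
    return rows


# Put the key first so both sides have the shape (DepartmentID, value).
emp_pairs = [(dept_id, name) for name, dept_id in employees]

# Model of employees.join(departments, "DepartmentID", "left").
left_rows = left_join(emp_pairs, departments)

# Model of employees.join(departments, "DepartmentID", "right"):
# a left join with departments on the left, columns put back in order.
right_rows = [
    (dept_id, name, dept)
    for dept_id, dept, name in left_join(departments, emp_pairs)
]

for row in right_rows:
    print(row)
```

The left result keeps Williams with no department name, while the right result drops Williams and keeps Marketing with no employee.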

				
			

Left Semi Join

A leftsemi join is similar to an inner join, except that it returns only the columns of the left dataset and ignores all columns of the right dataset. In other words, it returns the left-dataset columns for the rows that have a match in the right dataset on the join expression; rows without a match are dropped.

A similar result can be obtained by selecting the left columns from the result of an inner join, but the semi join is generally more efficient. The two results are identical only when each left row matches at most one right row, because an inner join emits one row per match while a semi join returns each left row at most once.

				
# "semi", "leftsemi" and "left_semi" are equivalent join type names.
left_semi = employees.join(departments, "DepartmentID", "left_semi")
left_semi.show()
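
The difference between a semi join and an inner join followed by a select only shows up when a key repeats on the right side. The plain-Python sketch below uses a small hypothetical right side in which DepartmentID 33 appears twice (this is not the sample data above) to make that visible.

```python
# Hypothetical data: DepartmentID 33 appears twice on the right side.
employees = [("Rafferty", 31), ("Jones", 33), ("Williams", 39)]
departments = [(31, "Sales"), (33, "Engineering"), (33, "Research")]

right_keys = {dept_id for dept_id, _ in departments}

# Model of a left_semi join: each matching left row once, left columns only.
semi_rows = [
    (dept_id, name)
    for name, dept_id in employees
    if dept_id in right_keys
]

# Model of an inner join followed by a select of the left columns:
# one output row per matching pair, so Jones appears twice.
inner_left_cols = [
    (dept_id, name)
    for name, dept_id in employees
    for right_id, _ in departments
    if right_id == dept_id
]

print(semi_rows)
print(inner_left_cols)
```

With unique keys on the right, as in the sample departments, both lists would contain the same rows.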
				
			

Left Anti Join

A leftanti join does the exact opposite of leftsemi: it returns only the left-dataset columns for rows that have no match in the right dataset.

				
# "anti", "leftanti" and "left_anti" are equivalent join type names.
left_anti = employees.join(departments, "DepartmentID", "left_anti")
left_anti.show()
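
One way to picture the anti join is as the complement of the semi join: together they split the left dataset into matched and unmatched rows. A minimal plain-Python sketch on the sample data, modelling the result rows only:

```python
# Sample data from the top of the page, as plain tuples.
employees = [("Rafferty", 31), ("Jones", 33), ("Heisenberg", 33),
             ("Robinson", 34), ("Smith", 34), ("Williams", 39)]
departments = [(31, "Sales"), (33, "Engineering"),
               (34, "Clerical"), (35, "Marketing")]

right_keys = {dept_id for dept_id, _ in departments}

# left_semi keeps the left rows whose key exists on the right ...
semi_rows = [(dept_id, name) for name, dept_id in employees
             if dept_id in right_keys]

# ... and left_anti keeps exactly the remaining left rows.
anti_rows = [(dept_id, name) for name, dept_id in employees
             if dept_id not in right_keys]

print(anti_rows)  # [(39, 'Williams')]
```

Only Williams has a DepartmentID that is missing from departments, so it is the single row of the anti join.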
				
			

PySpark Self Join

No discussion of joins is complete without a self join. Although there is no dedicated self-join type,
we can use any of the join types explained above to join a DataFrame to itself. The example below uses an inner self join.

				
from pyspark.sql.functions import col

# empDF is assumed to have the columns emp_id, name and superior_emp_id.
(empDF.alias("emp1")
    .join(empDF.alias("emp2"),
          col("emp1.superior_emp_id") == col("emp2.emp_id"), "inner")
    .select(col("emp1.emp_id"), col("emp1.name"),
            col("emp2.emp_id").alias("superior_emp_id"),
            col("emp2.name").alias("superior_emp_name"))
    .show(truncate=False))
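
Since empDF is not defined on this page, here is a plain-Python sketch of what the inner self join returns. The rows are hypothetical (emp_id, name, superior_emp_id) tuples invented for illustration, with None marking an employee who has no superior.

```python
# Hypothetical employee rows: (emp_id, name, superior_emp_id).
emp = [
    (1, "Smith", None),
    (2, "Rose", 1),
    (3, "Williams", 1),
    (4, "Jones", 2),
]

# emp_id -> name lookup plays the role of the second alias ("emp2").
name_by_id = {emp_id: name for emp_id, name, _ in emp}

# Inner self join on emp1.superior_emp_id == emp2.emp_id.
self_join_rows = [
    (emp_id, name, superior_id, name_by_id[superior_id])
    for emp_id, name, superior_id in emp
    if superior_id in name_by_id
]

for row in self_join_rows:
    print(row)
```

Smith has no superior, so the inner join drops that row; a left join would keep it with nulls in the superior columns.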
				
			

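Here, we are joining the emp dataset with itself to find the superior's emp_id and name for each employee. Because the join is inner, employees without a matching superior are dropped from the result.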

Using SQL Expression

Since PySpark SQL supports native SQL syntax, we can also write joins in SQL: register the DataFrames as temporary views, then query those views with spark.sql().

				
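# empDF and deptDF are assumed to contain the columns emp_dept_id and dept_id respectively.
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")

# Implicit join: the join condition is written in the WHERE clause.
# show() returns None, so keep the DataFrame and call show() separately.
joinDF = spark.sql("select * from EMP e, DEPT d where e.emp_dept_id == d.dept_id")
joinDF.show(truncate=False)

# Explicit INNER JOIN ... ON with the same condition.
joinDF2 = spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp_dept_id == d.dept_id")
joinDF2.show(truncate=False)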
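
The two queries above describe the same inner join in two spellings. The sketch below checks that with SQLite from the Python standard library, used here purely as a stand-in for spark.sql() so it runs without a cluster. The table contents are hypothetical, and SQLite is not Spark SQL, so treat it only as an illustration of the query shapes.

```python
import sqlite3

# In-memory database with hypothetical EMP and DEPT tables.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE EMP (emp_id INTEGER, name TEXT, emp_dept_id INTEGER)")
conn.execute("CREATE TABLE DEPT (dept_id INTEGER, dept_name TEXT)")
conn.executemany(
    "INSERT INTO EMP VALUES (?, ?, ?)",
    [(1, "Smith", 10), (2, "Rose", 20), (3, "Brown", 50)],
)
conn.executemany(
    "INSERT INTO DEPT VALUES (?, ?)",
    [(10, "Finance"), (20, "Marketing"), (30, "Sales")],
)

# Implicit join: condition in the WHERE clause.
implicit = conn.execute(
    "SELECT e.name, d.dept_name FROM EMP e, DEPT d "
    "WHERE e.emp_dept_id = d.dept_id ORDER BY e.emp_id"
).fetchall()

# Explicit INNER JOIN ... ON with the same condition.
explicit = conn.execute(
    "SELECT e.name, d.dept_name FROM EMP e INNER JOIN DEPT d "
    "ON e.emp_dept_id = d.dept_id ORDER BY e.emp_id"
).fetchall()

conn.close()

print(implicit)
print(explicit)
```

Both queries return the same two rows; Brown (department 50) and Sales (department 30) have no partner and are left out.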
				
			

PySpark SQL Join on multiple DataFrames

When you need to join more than two tables, you can either use a SQL expression after creating temporary views on the DataFrames,
or join the result of one join with another DataFrame, chaining the calls. For example:

				
					
# df1, df2 and df3 are placeholder DataFrames with key columns id1, id2 and id3.
df1.join(df2, df1.id1 == df2.id2, "inner") \
   .join(df3, df1.id1 == df3.id3, "inner")
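
Chaining means the second join runs on the output of the first, so with inner joins a key must be present in all three inputs to survive. A plain-Python sketch with hypothetical data for df1, df2 and df3 (lists of tuples standing in for DataFrames):

```python
# Hypothetical inputs: (id, value) tuples standing in for DataFrames.
df1 = [(1, "a"), (2, "b"), (3, "c")]   # (id1, v1)
df2 = [(1, "x"), (2, "y")]             # (id2, v2)
df3 = [(2, "p"), (3, "q")]             # (id3, v3)

# First join: df1 with df2 on id1 == id2.
step1 = [
    (id1, v1, id2, v2)
    for id1, v1 in df1
    for id2, v2 in df2
    if id1 == id2
]

# Second join: the result of the first join with df3 on id1 == id3.
step2 = [
    row + (id3, v3)
    for row in step1
    for id3, v3 in df3
    if row[0] == id3
]

print(step1)
print(step2)
```

Only id 2 occurs in all three inputs, so it is the single row left after the second join.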
				
			
