Spark Windowing Function

Estimated reading: 3 minutes 369 views

Python Window Function.

PySpark Window functions are used to calculate results such as the rank, row number e.t.c over a range of input rows. In this article,
I’ve explained the concept of window functions, syntax, and finally how to use them with PySpark SQL and PySpark DataFrame API.
These come in handy when we need to make aggregate operations in a specific window frame on DataFrame columns.

Preparing the data

				
					from pyspark.sql import functions as f
df = sc.parallelize([("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100)]).toDF(["employee_name", "department", "salary"])
df.show()

				
			

row_number Window Function

row_number() window function is used to give the sequential row number starting from 1 to the result of each window partition.

				
					from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
windowSpec  = Window.partitionBy("department").orderBy("salary")
res = df.withColumn("row_number",row_number().over(windowSpec))
res.show()
				
			

rank()

window function is used to provide a rank to the result within a window partition. This function leaves gaps in rank when there are ties.

				
					from pyspark.sql.functions import rank
res1 = df.withColumn("rank",rank().over(windowSpec))
res1.show()
				
			

dense_rank()

window function is used to get the result with rank of rows within a window partition without any gaps.
This is similar to rank() function difference being rank function leaves gaps in rank when there are ties.

				
					from pyspark.sql.functions import dense_rank
res3=df.withColumn("dense_rank",dense_rank().over(windowSpec))
res3.show()
				
			

lag()

PySpark allows the user to query on more than one row of a table returning the previous row in the table.

				
					from pyspark.sql.functions import lag       
df.withColumn("lag",lag("salary",1).over(windowSpec)).show()   

				
			

lead()

returns the value that is `offset` rows after the current row, and `null` if there is less than `offset` rows after the current row.

				
					from pyspark.sql.functions import lead    
df.withColumn("lead",lead("salary",1).over(windowSpec)).show()

				
			

cume_dist()

Returns the cumulative distribution of values within a window partition

				
					""" cume_dist """
from pyspark.sql.functions import cume_dist   

df.withColumn("cume_dist",cume_dist().over(windowSpec)).show()

				
			

Leave a Comment

CONTENTS