Spark DataFrame Api’s and Functions

Estimated reading: 8 minutes 374 views

Spark Read CSV file into DataFrame

Using spark.read.format(“csv”).load(“path”)
you can read a CSV file with fields delimited by pipe, comma, tab (and many more) into a Spark DataFrame,
These methods take a file path to read from as an argument.
using header True , it will read header as well.
and using inferSchema as True it will read through Schema

				
					df = spark.read.format("csv").option("header", "true").option("c","true").load("/FileStore/tables/first_test.csv")  
df.show(2)

				
			

getting Schema

using printSchema , u can easily get the Schema of dataframe.

				
					df.printSchema()
				
			

select

using select u can select multiple column

				
					df.select("name","id").show()
df.select(df.name, df.zip + 1).show()

				
			

filter

using filter we can easily do the Filter based upon column conditionwe can use or also and operator.

				
					df.filter(df.id > 3).show()
df.filter((df.id == 1) & (df.zip == 560098)).show()
df.filter((df.id == 1) | (df.zip == 560098)).show()

				
			

groupBy

using filter we can easily do the Filter based upon column conditionwe can use or also and operator.

When we perform groupBy() on Spark Dataframe, it returns RelationalGroupedDataset object which contains below aggregate functions.


count() – Returns the count of rows for each group.

mean() – Returns the mean of values for each group.

max() – Returns the maximum of values for each group.


min() – Returns the minimum of values for each group.

sum() – Returns the total for values for each group.

avg() – Returns the average for values for each group.


agg() – Using agg() function, we can calculate more than one aggregate at a time.

groupBy and aggregate on DataFrame columns

				
					df.groupBy("department").sum("salary").show(false)
df.groupBy("department").count()
df.groupBy("department").min("salary")
df.groupBy("department").max("salary")
df.groupBy("department").mean( "salary") 
//GroupBy on multiple columns
df.groupBy("department","state")
    .sum("salary","bonus")
    .show(false)
				
			

Running more aggregates at a time

Using agg() aggregate function we can calculate many aggregations at a time on a single statement using Spark SQL aggregate functions sum(), avg(), min(), max() mean() e.t.c. In order to use these,
we should import “import org.apache.spark.sql.functions._”

				
					import pyspark.sql.functions as f
df.groupBy("department")
    .agg(
      sum("salary").as("sum_salary"),
      avg("salary").as("avg_salary"),
      sum("bonus").as("sum_bonus"),
      max("bonus").as("max_bonus"))
    .show(false)
				
			

agg

using agg we can perform max,min,count and avg on the given column

				
					df.agg({"id": "max"}).show()
df.agg({"id": "min"}).show()
df.agg({"id": "count"}).show()
df.agg({"id": "avg"}).show()
				
			

where

using where we can filter on the given column with condition.

				
					new_df = df.where(df.id>2).select("id","zip")
new_df.show()

				
			

collect

using the collect() on dataframe we can loop in the dataframe.

				
					for i in df.collect():
  print(i['id'])
  print(i['area'])

				
			

drop

using drop() , we can drop multiple column as shown below.

				
					df2 = df2.drop("id","area")
df2.show()
				
			

union

Dataframe union() – union() method of the DataFrame is used to combine two DataFrame’s of the same structure/schema. If schemas are not the same it returns an error.

DataFrame unionAll() – unionAll() is deprecated since Spark “2.0.0” version and replaced with union().

				
					union_res = df1.union(df2)
union_res.show()

union_all_res = df1.unionAll(df2)  
union_all_res.show()
				
			

intersection

using intersection , we can get the common record between dataframes.

				
					intersection_res = df1.intersect(df2)
intersection_res.show()
				
			

orderBy

PySpark DataFrame also provides orderBy() function to sort on one or more columns. By default, it orders by ascending.

				
					df1.orderBy(f.desc("zip")).show()

				
			

lit

using lit function we can add the value in dataframe as default value.

				
					import pyspark.sql.functions as f
rdd1 = sc.parallelize([("Rafferty",31),("Jones",33),("Heisenberg", 33),("Robinson", 34),("Smith", 34),("Williams", 39)])
employees = rdd1.toDF(["LastName","DepartmentID"])
employees.show()
employees = employees.withColumn("salary", f.lit(7500))
				
			

withColumnRenamed

using withColumnRenamed we can change the columnname.

				
					employees = employees.withColumnRenamed("LastName","LastName_name")
employees.show()
				
			

Typecaste

using pyspark.sql.types we can typecast from one Type to another.

				
					from pyspark.sql.types import IntegerType
file_location1 = "/FileStore/tables/first_test.csv"
df = spark.read.format("csv").option("header", "true").load(file_location1)
#df.show()
df.printSchema()
df = df.withColumn("id", df["id"].cast(IntegerType()))
df = df.withColumn("zip", df["zip"].cast(IntegerType()))
#df.show()
df.printSchema()
				
			

dropping the null values

using .na.drop() ,will remove all record which is having a single null also

				
					df.na.drop().show()
				
			

.na.fill(integervalue)

it will fill the value for those column which is having Number Type(Int or Float)

				
					df.na.fill(100).show()
				
			

na.fill(StringValue)

it will fill the value for those column which is having String Type

				
					df.na.fill("Missing_Number").show()
				
			

PySpark When Otherwise

pySpark when() is SQL function, in order to use this first you should import and this returns a Column type, otherwise() is a function of Column, when otherwise() not used and none of the conditions met it assigns None (Null) value. Usage would be like when(condition).otherwise(default).

when() function take 2 parameters, first param takes a condition and second takes a literal value or Column, if condition evaluates to true then it returns a value from second param.

The below code snippet replaces the value of gender with a new derived value, when conditions not matched, we are assigning “Unknown” as value, for null assigning empty.

				
					from pyspark.sql.functions import when
df2 = df.withColumn("new_gender", when(df.gender == "M","Male")
                                 .when(df.gender == "F","Female")
                                 .when(df.gender.isNull() ,"")
                                 .otherwise(df.gender))
df2.show()
				
			

PySpark SQL Case When on DataFrame.

If you have a SQL background you might have familiar with Case When statement that is used to execute a sequence of conditions and returns a value when the first condition met,
similar to SWITH and IF THEN ELSE statements. Similarly, PySpark SQL Case When statement can be used on DataFrame


Syntax of SQL CASE WHEN ELSE END


CASE
WHEN condition1 THEN result_value1
WHEN condition2 THEN result_value2
—–
—–
ELSE result
END;

CASE is the start of the expression
Clause WHEN takes a condition, if condition true it returns a value from THEN
If the condition is false it goes to the next condition and so on.
If none of the condition matches, it returns a value from the ELSE clause.
END is to end the expression

				
					#Using Case When on withColumn()
df3 = df.withColumn("new_gender", expr("CASE WHEN gender = 'M' THEN 'Male' " + 
               "WHEN gender = 'F' THEN 'Female' WHEN gender IS NULL THEN ''" +
               "ELSE gender END"))
df3.show(truncate=False)
				
			
				
					#Using Case When on select()
df4 = df.select(col("*"), expr("CASE WHEN gender = 'M' THEN 'Male' " +
           "WHEN gender = 'F' THEN 'Female' WHEN gender IS NULL THEN ''" +
           "ELSE gender END").alias("new_gender"))
				
			

Using Case When on SQL Expression

You can also use Case When with SQL statement after creating a temporary view. This returns a similar output as above.

				
					df.createOrReplaceTempView("EMP")
spark.sql("select name, CASE WHEN gender = 'M' THEN 'Male' " + 
               "WHEN gender = 'F' THEN 'Female' WHEN gender IS NULL THEN ''" +
              "ELSE gender END as new_gender from EMP").show()
				
			

Multiple Conditions using & and | operator

We often need to check with multiple conditions,
below is an example of using PySpark When Otherwise with multiple conditions by using and (&) or (|) operators.
To explain this I will use a new set of data to make it simple.

				
					df5.withColumn(“new_column”, when((col(“code”) == “a”) | (col(“code”) == “d”), “A”)
.when((col(“code”) == “b”) & (col(“amt”) == “4”), “B”)
.otherwise(“A1”)).show()
				
			

Etraction year,month,day from date Column

we can convert to date Type Column

				
					from pyspark.sql import functions as f
f.concat_ws
df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/FileStore/tables/Test_Date2.csv")
df.show()
df.printSchema()
modifiedDF = df.withColumn("Date", f.to_date(df.Date, "MM/dd/yyyy"))
				
			

year

using year Function , we can get the year.

				
					modifiedDF = modifiedDF.withColumn("birth_year",f.year(modifiedDF.Date))
				
			

month

using month Function , we can get the month.

				
					modifiedDF = modifiedDF.withColumn("birth_month",f.month(modifiedDF.Date))

				
			

dayofmonth

using dayofmonth Function , we can get the date.

				
					modifiedDF = modifiedDF.withColumn("birth_date",f.dayofmonth(modifiedDF.Date))

				
			

date_add

using date_add Function , we can add and substract as integer.

				
					modifiedDF = modifiedDF.withColumn("fivteen_days_later",f.date_add(modifiedDF.Date, 15))
modifiedDF = modifiedDF.withColumn("twentty_days_Before",f.date_add(modifiedDF.Date, -20))
modifiedDF.show()

				
			

minute

using minute , we can get minute

				
					df = df.withColumn("minute",f.minute(df.DateTime))

				
			

second

using second , we can get second.

				
					df = df.withColumn("second",f.second(df.DateTime))
				
			

hour

using hour , we can get hour.

				
					
df = df.withColumn("hour",f.hour(df.DateTime))
				
			

Leave a Comment

CONTENTS