Spark DataFrame APIs and Functions
Spark Read CSV file into DataFrame
Using spark.read.format("csv").load("path")
you can read a CSV file with fields delimited by pipe, comma, tab (and many more) into a Spark DataFrame.
These methods take the file path to read from as an argument.
Setting the header option to "true" reads the first line as column names,
and setting inferSchema to "true" makes Spark infer each column's type from the data.
df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/FileStore/tables/first_test.csv")
df.show(2)
Getting the Schema
Using printSchema, you can easily get the schema of a DataFrame.
df.printSchema()
select
Using select you can select one or more columns.
df.select("name","id").show()
df.select(df.name, df.zip + 1).show()
filter
Using filter we can easily filter rows based on a column condition; we can also combine conditions with the and (&) and or (|) operators.
df.filter(df.id > 3).show()
df.filter((df.id == 1) & (df.zip == 560098)).show()
df.filter((df.id == 1) | (df.zip == 560098)).show()
groupBy
Using groupBy we can group rows by one or more columns and then apply aggregate functions on each group.
When we perform groupBy() on Spark Dataframe, it returns RelationalGroupedDataset object which contains below aggregate functions.
count() – Returns the count of rows for each group.
mean() – Returns the mean of values for each group.
max() – Returns the maximum of values for each group.
min() – Returns the minimum of values for each group.
sum() – Returns the total for values for each group.
avg() – Returns the average for values for each group.
agg() – Using agg() function, we can calculate more than one aggregate at a time.
groupBy and aggregate on DataFrame columns
df.groupBy("department").sum("salary").show(truncate=False)
df.groupBy("department").count().show()
df.groupBy("department").min("salary").show()
df.groupBy("department").max("salary").show()
df.groupBy("department").mean("salary").show()
#groupBy on multiple columns
df.groupBy("department","state") \
    .sum("salary","bonus") \
    .show(truncate=False)
Running more aggregates at a time
Using the agg() aggregate function we can calculate many aggregations at a time in a single statement, using Spark SQL aggregate functions such as sum(), avg(), min(), max(), mean(), etc. In order to use these,
we should import pyspark.sql.functions:
import pyspark.sql.functions as f
df.groupBy("department") \
    .agg(f.sum("salary").alias("sum_salary"),
         f.avg("salary").alias("avg_salary"),
         f.sum("bonus").alias("sum_bonus"),
         f.max("bonus").alias("max_bonus")) \
    .show(truncate=False)
agg
Using agg we can perform max, min, count, and avg on a given column.
df.agg({"id": "max"}).show()
df.agg({"id": "min"}).show()
df.agg({"id": "count"}).show()
df.agg({"id": "avg"}).show()
where
Using where we can filter rows on a given column with a condition.
new_df = df.where(df.id>2).select("id","zip")
new_df.show()
collect
Using collect() on a DataFrame we can loop over its rows. Note that collect() brings every row to the driver, so use it only on small DataFrames.
for i in df.collect():
    print(i['id'])
    print(i['area'])
drop
Using drop(), we can drop multiple columns as shown below.
df2 = df2.drop("id","area")
df2.show()
union
DataFrame union() – the union() method of DataFrame is used to combine two DataFrames of the same structure/schema. If the schemas are not the same it returns an error. Note that union() keeps duplicates (like SQL UNION ALL); chain .distinct() after it if you need SQL UNION semantics.
DataFrame unionAll() – unionAll() is deprecated since Spark 2.0.0 and replaced with union().
union_res = df1.union(df2)
union_res.show()
union_all_res = df1.unionAll(df2)
union_all_res.show()
intersection
Using intersect(), we can get the records common to both DataFrames.
intersection_res = df1.intersect(df2)
intersection_res.show()
orderBy
PySpark DataFrame also provides the orderBy() function to sort on one or more columns. By default, it sorts in ascending order.
df1.orderBy(f.desc("zip")).show()
lit
Using the lit function we can add a new column with a constant (literal) value to a DataFrame.
import pyspark.sql.functions as f
rdd1 = sc.parallelize([("Rafferty",31),("Jones",33),("Heisenberg", 33),("Robinson", 34),("Smith", 34),("Williams", 39)])
employees = rdd1.toDF(["LastName","DepartmentID"])
employees.show()
employees = employees.withColumn("salary", f.lit(7500))
withColumnRenamed
Using withColumnRenamed we can change a column name.
employees = employees.withColumnRenamed("LastName","LastName_name")
employees.show()
Typecast
Using the types in pyspark.sql.types we can cast a column from one type to another.
from pyspark.sql.types import IntegerType
file_location1 = "/FileStore/tables/first_test.csv"
df = spark.read.format("csv").option("header", "true").load(file_location1)
#df.show()
df.printSchema()
df = df.withColumn("id", df["id"].cast(IntegerType()))
df = df.withColumn("zip", df["zip"].cast(IntegerType()))
#df.show()
df.printSchema()
Dropping null values
Using .na.drop() removes every record that contains even a single null value.
df.na.drop().show()
.na.fill(integerValue)
It fills the given value into the null cells of numeric columns (Int or Float).
df.na.fill(100).show()
.na.fill(stringValue)
It fills the given value into the null cells of string columns.
df.na.fill("Missing_Number").show()
PySpark When Otherwise
PySpark when() is a SQL function; in order to use it you should first import it, and it returns a Column type. otherwise() is a function of Column; when otherwise() is not used and none of the conditions are met, it assigns None (null). Usage looks like when(condition).otherwise(default).
when() takes 2 parameters: the first takes a condition and the second a literal value or Column. If the condition evaluates to true, it returns the value from the second parameter.
The code snippet below derives a new_gender value from gender: for null we assign an empty string, and when no condition matches the original gender value is kept.
from pyspark.sql.functions import when
df2 = df.withColumn("new_gender", when(df.gender == "M","Male")
.when(df.gender == "F","Female")
.when(df.gender.isNull() ,"")
.otherwise(df.gender))
df2.show()
PySpark SQL Case When on DataFrame
If you have a SQL background you might be familiar with the CASE WHEN statement, which evaluates a sequence of conditions and returns a value when the first condition is met,
similar to SWITCH and IF-THEN-ELSE statements. Similarly, a SQL CASE WHEN statement can be used on a PySpark DataFrame.
Syntax of SQL CASE WHEN ELSE END
CASE
WHEN condition1 THEN result_value1
WHEN condition2 THEN result_value2
—–
—–
ELSE result
END;
CASE starts the expression
The WHEN clause takes a condition; if the condition is true it returns the value from THEN
If the condition is false, it moves on to the next condition, and so on.
If none of the conditions match, it returns the value from the ELSE clause.
END ends the expression
#Using Case When on withColumn()
from pyspark.sql.functions import expr
df3 = df.withColumn("new_gender", expr("CASE WHEN gender = 'M' THEN 'Male' " +
    "WHEN gender = 'F' THEN 'Female' WHEN gender IS NULL THEN '' " +
    "ELSE gender END"))
df3.show(truncate=False)
#Using Case When on select()
from pyspark.sql.functions import col, expr
df4 = df.select(col("*"), expr("CASE WHEN gender = 'M' THEN 'Male' " +
    "WHEN gender = 'F' THEN 'Female' WHEN gender IS NULL THEN '' " +
    "ELSE gender END").alias("new_gender"))
Using Case When on SQL Expression
You can also use Case When with SQL statement after creating a temporary view. This returns a similar output as above.
df.createOrReplaceTempView("EMP")
spark.sql("select name, CASE WHEN gender = 'M' THEN 'Male' " +
    "WHEN gender = 'F' THEN 'Female' WHEN gender IS NULL THEN '' " +
    "ELSE gender END as new_gender from EMP").show()
Multiple Conditions using & and | operators
We often need to check multiple conditions;
below is an example of using PySpark when/otherwise with multiple conditions combined using the and (&) and or (|) operators.
To keep it simple, this example uses a new set of data.
df5.withColumn("new_column", when((col("code") == "a") | (col("code") == "d"), "A")
    .when((col("code") == "b") & (col("amt") == "4"), "B")
    .otherwise("A1")).show()
Extracting year, month, day from a date Column
We can first convert the string column to a date-type column using to_date.
from pyspark.sql import functions as f
df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/FileStore/tables/Test_Date2.csv")
df.show()
df.printSchema()
modifiedDF = df.withColumn("Date", f.to_date(df.Date, "MM/dd/yyyy"))
year
Using the year function, we can get the year.
modifiedDF = modifiedDF.withColumn("birth_year",f.year(modifiedDF.Date))
month
Using the month function, we can get the month.
modifiedDF = modifiedDF.withColumn("birth_month",f.month(modifiedDF.Date))
dayofmonth
Using the dayofmonth function, we can get the day of the month.
modifiedDF = modifiedDF.withColumn("birth_date",f.dayofmonth(modifiedDF.Date))
date_add
Using the date_add function, we can add or subtract days by passing an integer (negative to subtract).
modifiedDF = modifiedDF.withColumn("fifteen_days_later", f.date_add(modifiedDF.Date, 15))
modifiedDF = modifiedDF.withColumn("twenty_days_before", f.date_add(modifiedDF.Date, -20))
modifiedDF.show()
minute
Using minute, we can get the minute from a timestamp column.
df = df.withColumn("minute",f.minute(df.DateTime))
second
Using second, we can get the second from a timestamp column.
df = df.withColumn("second",f.second(df.DateTime))
hour
Using hour, we can get the hour from a timestamp column.
df = df.withColumn("hour",f.hour(df.DateTime))