Spark DataFrame
DSCI 551
Wensheng Wu
1
Create & display dataframes
• country = spark.read.json('country.json') # also supports read.csv(...)
• city = spark.read.json('city.json')
• cl = spark.read.json('countrylanguage.json')
• country.show() # shows the top 20 rows as a table
• Similar to country.head() in Pandas
• country.show(5)
• Also has head(5)/take(5), tail(5), collect()
• These return Row(...) objects instead
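A minimal sketch of the difference (assuming country.json is in the working directory and has a Name column):

country = spark.read.json('country.json')
country.show(5)          # prints the first 5 rows as a formatted table
rows = country.take(5)   # returns a list of Row(...) objects
print(rows[0]['Name'])   # fields of a Row can be accessed by column name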
2
Creating data frames
• df = spark.createDataFrame([('Tom', 80), ('Alice', None)], ["name", "height"])
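For reference, the resulting frame would display roughly as below (the Python None appears as null):

df.show()
+-----+------+
| name|height|
+-----+------+
|  Tom|    80|
|Alice|  null|
+-----+------+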
3
Projection
• Selecting a subset of columns
• country[['Continent', 'Region']] # similar to Pandas
• Alternative:
• country[[country.Continent, country.Region]]
• country.select('Continent', 'Region') # this returns a dataframe
• country.select(country.LifeExpectancy.alias('le'))
• country['Continent', 'Region'] # this returns a dataframe
• country['Continent'] # this returns a column
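A small sketch contrasting the last two forms (a Column cannot be displayed directly, a DataFrame can):

c = country['Continent']                   # Column object; has no .show() method
country[['Continent', 'Region']].show(3)   # DataFrame; can be displayed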
4
Renaming columns
• country.withColumnRenamed('Name', 'CountryName').show()
5
Adding a new column => a new dataframe
• import pyspark.sql.functions as fc
• df = spark.createDataFrame([(1,1), (1,2), (1,3)], ['a', 'b'])
• >>> df.show()
+---+---+
| a| b|
+---+---+
| 1| 1|
| 1| 2|
| 1| 3|
+---+---+
• df1 = df.withColumn('c', fc.col('b'))
• df1.show()
+---+---+---+
| a| b| c|
+---+---+---+
| 1| 1| 1|
| 1| 2| 2|
| 1| 3| 3|
+---+---+---+
6
More examples
• df.withColumn('c', fc.when(df.b > 2, 1).otherwise(-1)).show()
• df.withColumn('c', fc.lit(1)).show()
7
+---+---+---+
| a| b| c|
+---+---+---+
| 1| 1| -1|
| 1| 2| -1|
| 1| 3| 1|
+---+---+---+
+---+---+---+
| a| b| c|
+---+---+---+
| 1| 1| 1|
| 1| 2| 1|
| 1| 3| 1|
+---+---+---+
Selection/filtering
• Selecting a subset of rows
• country[country.GNP > 10000] # similar to Pandas
• Alternative:
• country.filter(country.GNP > 10000)
• country.filter("GNP > 10000")
• country.filter('GNP > 10000 and GNP < 50000') # filter takes a SQL-style where-condition
• country.where('GNP > 10000')
• country[(country.GNP > 10000) & (country.GNP < 50000)] # similar to Pandas
• Other logical operators: |, ~
8
Distinct
• country[['Continent', 'Region']].distinct()
• Alternative:
• country[['Continent', 'Region']].dropDuplicates()
• Similar to drop_duplicates()/unique() in Pandas
9
Groupby without aggregation
• country.groupBy('Continent') or country.groupby('Continent')
• Similar to groupby in Pandas
• Need to aggregate so that we can show the grouping details
• country.groupBy('Continent').count()[['Continent']].show()
• May also use:
• country[['Continent']].distinct()
10
Aggregation w/o groupby
• country.agg({'GNP': 'max'})
• import pyspark.sql.functions as fc
• country.agg(fc.max('GNP').alias('max_gnp'))
• Similar to agg(max_gnp = pd.NamedAgg('GNP', 'max')) in Pandas
• country.agg(fc.max('GNP').alias('max_gnp'), fc.min('GNP').alias('min_gnp')).show()
11
+---------+-------+
|  max_gnp|min_gnp|
+---------+-------+
|8510700.0|    0.0|
+---------+-------+
Group by with aggregation
• import pyspark.sql.functions as fc
• country.groupBy('Continent').agg(fc.max("GNP").alias("max_gnp"), fc.count("*").alias("cnt")).show()
=>
Select Continent, max(GNP) max_gnp, count(*) cnt
From country
Group by Continent
12
Group by with having
• import pyspark.sql.functions as fc
• country.groupBy('Continent').agg(fc.max("GNP").alias("max_gnp"),
fc.count("*").alias("cnt")).filter('cnt > 5').show()
=>
select Continent, max(GNP) max_gnp, count(*) cnt
from country
group by Continent
having cnt > 5
13
Counting w/o group by
• country.count()
• Note: different from Pandas count(), which counts non-null values in each column
=>
select count(*)
from country
14
Aggregating one column
• country.groupBy('Continent').max('GNP').show()
• country.groupBy(['Continent', 'Region']).max('GNP').show()
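Note that the shorthand auto-names the result column (e.g. max(GNP)); a hedged sketch of using agg() with alias() when a custom name is wanted:

country.groupBy('Continent').max('GNP').show()                            # column named max(GNP)
country.groupBy('Continent').agg(fc.max('GNP').alias('max_gnp')).show()   # custom column name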
15
Order by
• import pyspark.sql.functions as fc
• country.orderBy('Continent')
• country.orderBy(fc.desc('Continent'))
• Alternative: country.orderBy(country.Continent.desc())
• country.orderBy(['Continent', 'GNP'], ascending=[True, False])
• Alternatives: replacing orderBy with sort
• country.sort(fc.desc('Continent'), fc.desc('GNP'))
• country.sort(['Continent', 'GNP'], ascending=[True, False])
16
Aggregation function
• count
• max, min
• avg/mean
• sum
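A short sketch combining several of these in a single agg() call (column names taken from the country table):

country.groupBy('Continent').agg(
    fc.count('*').alias('cnt'),
    fc.avg('LifeExpectancy').alias('avg_le'),
    fc.sum('Population').alias('total_pop')
).show()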
17
Example: putting them together
• country[(country.GNP > 1000) & (country.GNP < 10000)].groupBy('Continent', 'Region').agg(fc.mean('LifeExpectancy').alias('avg_le'), fc.count('*').alias('cnt')).filter('cnt > 5').orderBy(fc.desc('avg_le')).show()
=> select Continent, Region, avg(LifeExpectancy) avg_le, count(*) cnt
from country
where GNP > 1000 and GNP < 10000
group by Continent, Region
having cnt > 5
order by avg_le desc
18
Limit
• res = country[(country.GNP > 1000) & (country.GNP < 10000)].groupBy('Continent', 'Region').agg(fc.mean('LifeExpectancy').alias('avg_le'), fc.count('*').alias('cnt'))
• res[res.cnt > 5].orderBy(fc.desc('avg_le')).limit(2)
=>
select Continent, Region, avg(LifeExpectancy) avg_le, count(*) cnt
from country
where GNP > 1000 and GNP < 10000
group by Continent, Region
having cnt > 5
order by avg_le desc
limit 2
19
Join
• country.join(city, country.Capital == city.ID)
• country.join(city, (country.Capital == city.ID) & (country.Population >
city.Population))
• Alternative:
• country.join(city, [country.Capital == city.ID, country.Population >
city.Population])
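After a join, columns from either side can be selected and renamed explicitly, which also avoids ambiguous column names; a sketch assuming the world tables (both have a Name column):

country.join(city, country.Capital == city.ID) \
    .select(country.Name, city.Name.alias('CapitalCity')) \
    .show(5)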
20
Natural join
• cl.join(city, 'CountryCode')
• Equivalent to: cl.join(city, cl.CountryCode == city.CountryCode)
select *
from countrylanguage natural join city
21
Outer join
• country.join(city, country.Capital == city.ID, how='left').filter("ID is null")
=>
select *
from country left outer join city on country.Capital = city.ID
where city.ID is null
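Spark also has an anti-join type that expresses the same "no matching city" query more directly; a hedged alternative sketch:

# countries whose Capital value has no matching city.ID
country.join(city, country.Capital == city.ID, how='left_anti').show(5)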
22
Union
• usa = cl[(cl.CountryCode == 'USA')][['Language', 'IsOfficial']]
• can = cl[(cl.CountryCode == 'CAN')][['Language', 'IsOfficial']]
• usa_can = cl[(cl.CountryCode == 'USA') | (cl.CountryCode == 'CAN')][['Language', 'IsOfficial']]
• Bag union
• usa.union(can) or usa.unionAll(can)
• Set union
• usa.union(can).distinct()
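Note that union()/unionAll() match columns by position; if the two frames might list columns in a different order, unionByName() (matching by column name) is a safer sketch:

usa.unionByName(can)   # bag union, columns matched by name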
23
24
>>> usa.orderBy('Language').show()
+----------+----------+
| Language|IsOfficial|
+----------+----------+
| Chinese| F|
| English| T|
| French| F|
| German| F|
| Italian| F|
| Japanese| F|
| Korean| F|
| Polish| F|
|Portuguese| F|
| Spanish| F|
| Tagalog| F|
|Vietnamese| F|
+----------+----------+
>>> can.orderBy('Language').show()
+----------------+----------+
| Language|IsOfficial|
+----------------+----------+
| Chinese| F|
| Dutch| F|
| English| T|
|Eskimo Languages| F|
| French| T|
| German| F|
| Italian| F|
| Polish| F|
| Portuguese| F|
| Punjabi| F|
| Spanish| F|
| Ukrainian| F|
+----------------+----------+
Intersection
• Bag intersection:
• usa_can.intersectAll(usa_can)
• Set intersection:
• usa_can.intersect(usa_can)
• Note it removes duplicates
25
+----------------+----------+
| Language|IsOfficial|
+----------------+----------+
| Chinese| F|
| Chinese| F|
| Dutch| F|
| English| T|
| English| T|
|Eskimo Languages| F|
| French| F|
| French| T|
| German| F|
| German| F|
| Italian| F|
| Italian| F|
…
+----------------+----------+
| Language|IsOfficial|
+----------------+----------+
| Chinese| F|
| Dutch| F|
| English| T|
|Eskimo Languages| F|
| French| F|
| French| T|
| German| F|
| Italian| F|
…
Subtract (set semantics)
• uc = usa_can.union(usa_can)
• uc.orderBy('Language').show()
• uc.subtract(can).show()
+----------+----------+
| Language|IsOfficial|
+----------+----------+
| Japanese| F|
| Tagalog| F|
| French| F|
|Vietnamese| F|
| Korean| F|
+----------+----------+
+----------------+----------+
| Language|IsOfficial|
+----------------+----------+
| Chinese| F|
| Dutch| F|
| English| T|
|Eskimo Languages| F|
| French| T|
| German| F|
| Italian| F|
| Polish| F|
| Portuguese| F|
| Punjabi| F|
| Spanish| F|
| Ukrainian| F|
+----------------+----------+
CAN
| French| F|
| French| T|
| French| T|
| French| F|
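For bag (multiset) semantics, exceptAll() can be used instead; a hedged sketch:

uc.exceptAll(can).orderBy('Language').show()   # bag difference, duplicates kept according to counts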
26
Spark installation
• http://spark.apache.org/downloads.html
• Choose "pre-built for Hadoop 3.2 and later"
• Direct link (choose version 3.0.2):
• wget https://downloads.apache.org/spark/spark-3.0.2/spark-3.0.2-bin-
hadoop3.2.tgz
27
Spark installation
• tar xvf spark-3.0.2-bin-hadoop3.2.tgz
• This will create the "spark-3.0.2-bin-hadoop3.2" folder
• Containing all the Spark files (scripts, programs, libraries, examples, data)
28
Prerequisites
• Make sure Java is installed & JAVA_HOME is set
• Put these in your ~/.bashrc file
• export JAVA_HOME=/usr/lib/jvm/java
• export PYSPARK_PYTHON=python3.8 # needed to use Python 3
• export PATH=$PATH:/home/ec2-user/spark-3.0.2-bin-hadoop3.2/bin
29
Accessing Spark from Python
• Interactive shell:
• bin/pyspark
• A SparkSession object spark will be automatically created
• bin/pyspark --master local[4]
• This starts Spark on the local host with 4 threads
• "--master" specifies the location of the Spark master node
30
Accessing Spark from Python
• Standalone program
• Executed using spark-submit script
• E.g., bin/spark-submit
• You may find many Python Spark examples under
• examples/src/main/python
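A minimal standalone script sketch (the file name my_app.py and app name are made up for illustration); it would be run as bin/spark-submit my_app.py:

# my_app.py
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('example').getOrCreate()   # create the session explicitly
country = spark.read.json('country.json')                       # assumes country.json is available
country.show(5)
spark.stop()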
31
Dataframe to RDD and back
• df.rdd.toDF()
• df.rdd.map(lambda r: (r['a'], r['b'], 1)).toDF(['a', 'b', 'c']).show()
32
+---+---+---+
| a| b| c|
+---+---+---+
| 1| 1| 1|
| 1| 2| 1|
| 1| 3| 1|
+---+---+---+
Resources
• Important classes of Spark SQL and DataFrames
• https://spark.apache.org/docs/latest/api/python/pyspark.sql.html
• read.csv(...), read.json(...)
• https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader
33