Dataframe operations¶
In [1]:
from pyspark.sql import Row
row = Row(name=”Alice”, age=11)
print row
print row[‘name’], row[‘age’]
print row.name, row.age
row = Row(name=”Alice”, age=11, count=1)
print row.count
print row[‘count’]
File “
print row
^
SyntaxError: Missing parentheses in call to ‘print’. Did you mean print(row)?
In [ ]:
# Data file at https://www.cse.ust.hk/msbd5003/data/building.csv
df = spark.read.csv(‘../data/building.csv’, header=True, inferSchema=True)
In [ ]:
# show the content of the dataframe
df.show()
In [ ]:
# Print the dataframe schema in a tree format
df.printSchema()
In [ ]:
# Create an RDD from the dataframe
dfrdd = df.rdd
dfrdd.take(3)
In [2]:
# Retrieve specific columns from the dataframe
df.select(‘BuildingID’, ‘Country’).show()
—————————————————————————
NameError Traceback (most recent call last)
1 # Retrieve specific columns from the dataframe
—-> 2 df.select(‘BuildingID’, ‘Country’).show()
NameError: name ‘df’ is not defined
In [3]:
from pyspark.sql.functions import *
df.where(“Country=’USA'”).select(‘BuildingID’, lit(‘OK’)).show()
—————————————————————————
NameError Traceback (most recent call last)
1 from pyspark.sql.functions import *
2
—-> 3 df.where(“Country=’USA'”).select(‘BuildingID’, lit(‘OK’)).show()
NameError: name ‘df’ is not defined
In [4]:
# Use GroupBy clause with dataframe
df.groupBy(‘HVACProduct’).count().show()
—————————————————————————
NameError Traceback (most recent call last)
1 # Use GroupBy clause with dataframe
—-> 2 df.groupBy(‘HVACProduct’).count().show()
NameError: name ‘df’ is not defined
Rewriting SQL with DataFrame API¶
The data files have been put to a public blob container, which can be accessed as follows
In [1]:
# Load data from csv files
# Data files at https://www.cse.ust.hk/msbd5003/data
dfCustomer = spark.read.csv(‘../data/Customer.csv’, header=True, inferSchema=True)
dfProduct = spark.read.csv(‘../data/Product.csv’, header=True, inferSchema=True)
dfDetail = spark.read.csv(‘../data/SalesOrderDetail.csv’, header=True, inferSchema=True)
dfHeader = spark.read.csv(‘../data/SalesOrderHeader.csv’, header=True, inferSchema=True)
In [14]:
# SELECT ProductID, Name, ListPrice
# FROM Product
# WHERE Color = ‘black’
dfProduct.filter(“Color = ‘Black'”)\
.select(‘ProductID’, ‘Name’, ‘ListPrice’)\
.show(truncate=False)
+———+—————————–+———+
|ProductID|Name |ListPrice|
+———+—————————–+———+
|680 |HL Road Frame – Black, 58 |1431.5 |
|708 |Sport-100 Helmet, Black |34.99 |
|722 |LL Road Frame – Black, 58 |337.22 |
|723 |LL Road Frame – Black, 60 |337.22 |
|724 |LL Road Frame – Black, 62 |337.22 |
|736 |LL Road Frame – Black, 44 |337.22 |
|737 |LL Road Frame – Black, 48 |337.22 |
|738 |LL Road Frame – Black, 52 |337.22 |
|743 |HL Mountain Frame – Black, 42|1349.6 |
|744 |HL Mountain Frame – Black, 44|1349.6 |
|745 |HL Mountain Frame – Black, 48|1349.6 |
|746 |HL Mountain Frame – Black, 46|1349.6 |
|747 |HL Mountain Frame – Black, 38|1349.6 |
|765 |Road-650 Black, 58 |782.99 |
|766 |Road-650 Black, 60 |782.99 |
|767 |Road-650 Black, 62 |782.99 |
|768 |Road-650 Black, 44 |782.99 |
|769 |Road-650 Black, 48 |782.99 |
|770 |Road-650 Black, 52 |782.99 |
|775 |Mountain-100 Black, 38 |3374.99 |
+———+—————————–+———+
only showing top 20 rows
In [4]:
dfProduct.where(dfProduct.Color==’Black’) \
.select(dfProduct.ProductID, dfProduct[‘Name’], (dfProduct.ListPrice * 2).alias(‘Double price’)) \
.show(truncate=False)
+———+—————————–+————+
|ProductID|Name |Double price|
+———+—————————–+————+
|680 |HL Road Frame – Black, 58 |2863.0 |
|708 |Sport-100 Helmet, Black |69.98 |
|722 |LL Road Frame – Black, 58 |674.44 |
|723 |LL Road Frame – Black, 60 |674.44 |
|724 |LL Road Frame – Black, 62 |674.44 |
|736 |LL Road Frame – Black, 44 |674.44 |
|737 |LL Road Frame – Black, 48 |674.44 |
|738 |LL Road Frame – Black, 52 |674.44 |
|743 |HL Mountain Frame – Black, 42|2699.2 |
|744 |HL Mountain Frame – Black, 44|2699.2 |
|745 |HL Mountain Frame – Black, 48|2699.2 |
|746 |HL Mountain Frame – Black, 46|2699.2 |
|747 |HL Mountain Frame – Black, 38|2699.2 |
|765 |Road-650 Black, 58 |1565.98 |
|766 |Road-650 Black, 60 |1565.98 |
|767 |Road-650 Black, 62 |1565.98 |
|768 |Road-650 Black, 44 |1565.98 |
|769 |Road-650 Black, 48 |1565.98 |
|770 |Road-650 Black, 52 |1565.98 |
|775 |Mountain-100 Black, 38 |6749.98 |
+———+—————————–+————+
only showing top 20 rows
In [5]:
dfProduct.where(dfProduct.ListPrice * 2 > 100) \
.select(dfProduct.ProductID, dfProduct[‘Name’], dfProduct.ListPrice * 2) \
.show(truncate=False)
—————————————————————————
NameError Traceback (most recent call last)
—-> 1 dfProduct.where(dfProduct.ListPrice * 2 > 100) \
2 .select(dfProduct.ProductID, dfProduct[‘Name’], dfProduct.ListPrice * 2) \
3 .show(truncate=False)
NameError: name ‘dfProduct’ is not defined
In [14]:
# SELECT ProductID, Name, ListPrice
# FROM Product
# WHERE Color = ‘black’
# ORDER BY ProductID
dfProduct.filter(“Color = ‘Black'”)\
.select(‘ProductID’, ‘Name’, ‘ListPrice’)\
.orderBy(‘ListPrice’)\
.show(truncate=False)
+———+————————–+———+
|ProductID|Name |ListPrice|
+———+————————–+———+
|860 |Half-Finger Gloves, L |24.49 |
|859 |Half-Finger Gloves, M |24.49 |
|858 |Half-Finger Gloves, S |24.49 |
|708 |Sport-100 Helmet, Black |34.99 |
|862 |Full-Finger Gloves, M |37.99 |
|861 |Full-Finger Gloves, S |37.99 |
|863 |Full-Finger Gloves, L |37.99 |
|841 |Men’s Sports Shorts, S |59.99 |
|849 |Men’s Sports Shorts, M |59.99 |
|851 |Men’s Sports Shorts, XL |59.99 |
|850 |Men’s Sports Shorts, L |59.99 |
|815 |LL Mountain Front Wheel |60.745 |
|868 |Women’s Mountain Shorts, M|69.99 |
|869 |Women’s Mountain Shorts, L|69.99 |
|867 |Women’s Mountain Shorts, S|69.99 |
|853 |Women’s Tights, M |74.99 |
|854 |Women’s Tights, L |74.99 |
|852 |Women’s Tights, S |74.99 |
|818 |LL Road Front Wheel |85.565 |
|823 |LL Mountain Rear Wheel |87.745 |
+———+————————–+———+
only showing top 20 rows
In [6]:
# Find all orders and details on black product,
# return the product SalesOrderID, SalesOrderDetailID, Name, UnitPrice, and OrderQty
# SELECT SalesOrderID, SalesOrderDetailID, Name, UnitPrice, OrderQty
# FROM SalesLT.SalesOrderDetail, SalesLT.Product
# WHERE SalesOrderDetail.ProductID = Product.ProductID AND Color = ‘Black’
# SELECT SalesOrderID, SalesOrderDetailID, Name, UnitPrice, OrderQty
# FROM SalesLT.SalesOrderDetail
# JOIN SalesLT.Product ON SalesOrderDetail.ProductID = Product.ProductID
# WHERE Color = ‘Black’
# Spark SQL supports natural joins
dfDetail.join(dfProduct, ‘ProductID’) \
.select(‘SalesOrderID’, ‘SalesOrderDetailID’, ‘Name’, ‘UnitPrice’, ‘OrderQty’) \
.filter(“Color=’Black'”)\
.show()
# If we move the filter to after select, it still works. Why?
+————+——————+——————–+———+——–+
|SalesOrderID|SalesOrderDetailID| Name|UnitPrice|OrderQty|
+————+——————+——————–+———+——–+
| 71780| 110620|Mountain-500 Blac…| 323.994| 1|
| 71780| 110621|LL Mountain Frame…| 149.874| 1|
| 71780| 110622|HL Mountain Frame…| 809.76| 1|
| 71780| 110623|Mountain-200 Blac…| 1376.994| 4|
| 71780| 110627|Women’s Mountain …| 41.994| 6|
| 71780| 110629|Mountain-500 Blac…| 323.994| 2|
| 71780| 110630|Mountain-500 Blac…| 323.994| 3|
| 71780| 110631|Mountain-500 Blac…| 323.994| 1|
| 71780| 110632|Mountain-500 Blac…| 323.994| 2|
| 71780| 110638|Mountain-200 Blac…| 1376.994| 5|
| 71780| 110642|LL Mountain Frame…| 149.874| 1|
| 71780| 110643|Women’s Mountain …| 41.994| 7|
| 71782| 110690|Sport-100 Helmet,…| 20.994| 7|
| 71782| 110697| HL Crankset| 242.994| 2|
| 71782| 110705|Half-Finger Glove…| 14.694| 1|
| 71783| 110710|LL Road Frame – B…| 202.332| 4|
| 71783| 110712| Road-250 Black, 44| 1466.01| 3|
| 71783| 110713| Road-750 Black, 58| 323.994| 4|
| 71783| 110715|Half-Finger Glove…| 14.694| 7|
| 71783| 110725|Half-Finger Glove…| 14.694| 2|
+————+——————+——————–+———+——–+
only showing top 20 rows
In [7]:
# This also works:
d1 = dfDetail.join(dfProduct, ‘ProductID’) \
.select(‘SalesOrderID’, ‘SalesOrderDetailID’, ‘Name’, ‘UnitPrice’, ‘OrderQty’)
d1.show()
d2 = d1.filter(“Color = ‘Black'”)
d2.show()
d2.explain()
+————+——————+——————–+———+——–+
|SalesOrderID|SalesOrderDetailID| Name|UnitPrice|OrderQty|
+————+——————+——————–+———+——–+
| 71774| 110562|ML Road Frame-W -…| 356.898| 1|
| 71774| 110563|ML Road Frame-W -…| 356.898| 1|
| 71776| 110567| Rear Brakes| 63.9| 1|
| 71780| 110616|ML Mountain Frame…| 218.454| 4|
| 71780| 110617|Mountain-400-W Si…| 461.694| 2|
| 71780| 110618|Mountain-500 Silv…| 112.998| 6|
| 71780| 110619|HL Mountain Frame…| 818.7| 2|
| 71780| 110620|Mountain-500 Blac…| 323.994| 1|
| 71780| 110621|LL Mountain Frame…| 149.874| 1|
| 71780| 110622|HL Mountain Frame…| 809.76| 1|
| 71780| 110623|Mountain-200 Blac…| 1376.994| 4|
| 71780| 110624|LL Mountain Frame…| 158.43| 2|
| 71780| 110625|Mountain-200 Silv…| 1391.994| 4|
| 71780| 110626| HL Mountain Pedal| 48.594| 1|
| 71780| 110627|Women’s Mountain …| 41.994| 6|
| 71780| 110628|Mountain-500 Silv…| 112.998| 1|
| 71780| 110629|Mountain-500 Blac…| 323.994| 2|
| 71780| 110630|Mountain-500 Blac…| 323.994| 3|
| 71780| 110631|Mountain-500 Blac…| 323.994| 1|
| 71780| 110632|Mountain-500 Blac…| 323.994| 2|
+————+——————+——————–+———+——–+
only showing top 20 rows
+————+——————+——————–+———+——–+
|SalesOrderID|SalesOrderDetailID| Name|UnitPrice|OrderQty|
+————+——————+——————–+———+——–+
| 71780| 110620|Mountain-500 Blac…| 323.994| 1|
| 71780| 110621|LL Mountain Frame…| 149.874| 1|
| 71780| 110622|HL Mountain Frame…| 809.76| 1|
| 71780| 110623|Mountain-200 Blac…| 1376.994| 4|
| 71780| 110627|Women’s Mountain …| 41.994| 6|
| 71780| 110629|Mountain-500 Blac…| 323.994| 2|
| 71780| 110630|Mountain-500 Blac…| 323.994| 3|
| 71780| 110631|Mountain-500 Blac…| 323.994| 1|
| 71780| 110632|Mountain-500 Blac…| 323.994| 2|
| 71780| 110638|Mountain-200 Blac…| 1376.994| 5|
| 71780| 110642|LL Mountain Frame…| 149.874| 1|
| 71780| 110643|Women’s Mountain …| 41.994| 7|
| 71782| 110690|Sport-100 Helmet,…| 20.994| 7|
| 71782| 110697| HL Crankset| 242.994| 2|
| 71782| 110705|Half-Finger Glove…| 14.694| 1|
| 71783| 110710|LL Road Frame – B…| 202.332| 4|
| 71783| 110712| Road-250 Black, 44| 1466.01| 3|
| 71783| 110713| Road-750 Black, 58| 323.994| 4|
| 71783| 110715|Half-Finger Glove…| 14.694| 7|
| 71783| 110725|Half-Finger Glove…| 14.694| 2|
+————+——————+——————–+———+——–+
only showing top 20 rows
== Physical Plan ==
*Project [SalesOrderID#102, SalesOrderDetailID#103, Name#56, UnitPrice#106, OrderQty#104]
+- *BroadcastHashJoin [ProductID#105], [ProductID#55], Inner, BuildRight
:- *Project [SalesOrderID#102, SalesOrderDetailID#103, OrderQty#104, ProductID#105, UnitPrice#106]
: +- *Filter isnotnull(ProductID#105)
: +- *FileScan csv [SalesOrderID#102,SalesOrderDetailID#103,OrderQty#104,ProductID#105,UnitPrice#106] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/csproject/msbd5003/public_html/data/SalesOrderDetail.csv], PartitionFilters: [], PushedFilters: [IsNotNull(ProductID)], ReadSchema: struct
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
+- *Project [ProductID#55, Name#56]
+- *Filter ((isnotnull(Color#58) && (Color#58 = Black)) && isnotnull(ProductID#55))
+- *FileScan csv [ProductID#55,Name#56,Color#58] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/csproject/msbd5003/public_html/data/Product.csv], PartitionFilters: [], PushedFilters: [IsNotNull(Color), EqualTo(Color,Black), IsNotNull(ProductID)], ReadSchema: struct
In [8]:
# This will report an error:
d1 = dfDetail.join(dfProduct, ‘ProductID’) \
.select(‘SalesOrderID’, ‘SalesOrderDetailID’, ‘Name’, ‘UnitPrice’, ‘OrderQty’)
d1.write.csv(‘temp.csv’, mode = ‘overwrite’, header = True)
d2 = spark.read.csv(‘temp.csv’, header = True, inferSchema = True)
d2.filter(“Color = ‘Black'”).show()
Out[8]:
Name: org.apache.toree.interpreter.broker.BrokerException
Message: Traceback (most recent call last):
File “/tmp/kernel-PySpark-2d222b68-b19a-4db5-8f83-b91a82b73836/pyspark_runner.py”, line 189, in
eval(compiled_code)
File “
File “/csproject/msbd5003/python/pyspark/sql/dataframe.py”, line 1078, in filter
jdf = self._jdf.filter(condition)
File “/csproject/msbd5003/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py”, line 1133, in __call__
answer, self.gateway_client, self.target_id, self.name)
File “/csproject/msbd5003/python/pyspark/sql/utils.py”, line 69, in deco
raise AnalysisException(s.split(‘: ‘, 1)[1], stackTrace)
AnalysisException: u”cannot resolve ‘`Color`’ given input columns: [OrderQty, UnitPrice, SalesOrderID, Name, SalesOrderDetailID]; line 1 pos 0;\n’Filter (‘Color = Black)\n+- Relation[SalesOrderID#435,SalesOrderDetailID#436,Name#437,UnitPrice#438,OrderQty#439] csv\n”
StackTrace: org.apache.toree.interpreter.broker.BrokerState$$anonfun$markFailure$1.apply(BrokerState.scala:163)
org.apache.toree.interpreter.broker.BrokerState$$anonfun$markFailure$1.apply(BrokerState.scala:163)
scala.Option.foreach(Option.scala:257)
org.apache.toree.interpreter.broker.BrokerState.markFailure(BrokerState.scala:162)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:280)
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
py4j.commands.CallCommand.execute(CallCommand.java:79)
py4j.GatewayConnection.run(GatewayConnection.java:214)
java.lang.Thread.run(Thread.java:748)
In [20]:
# Find all orders that include at least one black product,
# return the product SalesOrderID, Name, UnitPrice, and OrderQty
# SELECT DISTINCT SalesOrderID
# FROM SalesLT.SalesOrderDetail
# JOIN SalesLT.Product ON SalesOrderDetail.ProductID = Product.ProductID
# WHERE Color = ‘Black’
dfDetail.join(dfProduct.filter(“Color=’Black'”), ‘ProductID’) \
.select(‘SalesOrderID’) \
.distinct() \
.show()
+————+
|SalesOrderID|
+————+
| 71902|
| 71832|
| 71915|
| 71831|
| 71898|
| 71935|
| 71938|
| 71845|
| 71783|
| 71815|
| 71936|
| 71863|
| 71780|
| 71782|
| 71899|
| 71784|
| 71797|
+————+
In [5]:
# How many colors in the products?
# SELECT COUNT(DISTINCT Color)
# FROM SalesLT.Product
dfProduct.select(‘Color’).distinct().count()
# It’s 1 more than standard SQL. In standard SQL, COUNT() does not count NULLs.
Out[5]:
10
In [17]:
# Find the total price of each order,
# return SalesOrderID and total price (column name should be ‘totalprice’)
# SELECT SalesOrderID, SUM(UnitPrice*OrderQty*(1-UnitPriceDiscount)) AS TotalPrice
# FROM SalesLT.SalesOrderDetail
# GROUP BY SalesOrderID
dfDetail.select(‘*’, (dfDetail.UnitPrice * dfDetail.OrderQty
* (1 – dfDetail.UnitPriceDiscount)).alias(‘netprice’))\
.groupBy(‘SalesOrderID’).sum(‘netprice’) \
.withColumnRenamed(‘sum(netprice)’, ‘TotalPrice’)\
.show()
+————+——————+
|SalesOrderID| TotalPrice|
+————+——————+
| 71867| 858.9|
| 71902|59894.209199999976|
| 71832| 28950.678108|
| 71915|1732.8899999999999|
| 71946| 31.584|
| 71895|221.25600000000003|
| 71816|2847.4079999999994|
| 71831| 1712.946|
| 71923| 96.108824|
| 71858|11528.844000000001|
| 71917| 37.758|
| 71897| 10585.05|
| 71885| 524.664|
| 71856|500.30400000000003|
| 71898| 53248.69200000002|
| 71774| 713.796|
| 71796| 47848.02600000001|
| 71935|5533.8689079999995|
| 71938| 74160.228|
| 71845| 34118.5356|
+————+——————+
only showing top 20 rows
In [21]:
# Find the total price of each order where the total price > 10000
# SELECT SalesOrderID, SUM(UnitPrice*OrderQty*(1-UnitPriceDiscount)) AS TotalPrice
# FROM SalesLT.SalesOrderDetail
# GROUP BY SalesOrderID
# HAVING SUM(UnitPrice*OrderQty*(1-UnitPriceDiscount)) > 10000
dfDetail.select(‘*’, (dfDetail.UnitPrice * dfDetail. OrderQty
* (1 – dfDetail.UnitPriceDiscount)).alias(‘netprice’))\
.groupBy(‘SalesOrderID’).sum(‘netprice’) \
.withColumnRenamed(‘sum(netprice)’, ‘TotalPrice’)\
.where(‘TotalPrice > 10000’)\
.show()
+————+——————+
|SalesOrderID| TotalPrice|
+————+——————+
| 71902|59894.209199999976|
| 71832| 28950.678108|
| 71858|11528.844000000001|
| 71897| 10585.05|
| 71898| 53248.69200000002|
| 71796| 47848.02600000001|
| 71938| 74160.228|
| 71845| 34118.5356|
| 71783| 65683.367986|
| 71936| 79589.61602399996|
| 71780|29923.007999999998|
| 71782| 33319.98600000001|
| 71784| 89869.27631400003|
| 71797| 65123.46341800001|
+————+——————+
In [22]:
# Find the total price on the black products of each order where the total price > 10000
# SELECT SalesOrderID, SUM(UnitPrice*OrderQty*(1-UnitPriceDiscount)) AS TotalPrice
# FROM SalesLT.SalesOrderDetail, SalesLT.Product
# WHERE SalesLT.SalesOrderDetail.ProductID = SalesLT.Product.ProductID AND Color = ‘Black’
# GROUP BY SalesOrderID
# HAVING SUM(UnitPrice*OrderQty*(1-UnitPriceDiscount)) > 10000
dfDetail.select(‘*’, (dfDetail.UnitPrice * dfDetail. OrderQty
* (1 – dfDetail.UnitPriceDiscount)).alias(‘netprice’))\
.join(dfProduct.where(“Color = ‘Black'”), ‘ProductID’) \
.groupBy(‘SalesOrderID’).sum(‘netprice’) \
.withColumnRenamed(‘sum(netprice)’, ‘TotalPrice’)\
.where(‘TotalPrice > 10000’)\
.show()
+————+——————+
|SalesOrderID| TotalPrice|
+————+——————+
| 71902| 26677.884|
| 71832| 16883.748108|
| 71938| 33779.448|
| 71845| 18109.836|
| 71783|15524.117476000003|
| 71936| 44490.29042399999|
| 71780|16964.321999999996|
| 71797| 27581.613792|
+————+——————+
In [23]:
# For each customer, find the total quantity of black products bought.
# Report CustomerID, FirstName, LastName, and total quantity
# select saleslt.customer.customerid, FirstName, LastName, sum(orderqty)
# from saleslt.customer
# left outer join
# (
# saleslt.salesorderheader
# join saleslt.salesorderdetail
# on saleslt.salesorderdetail.salesorderid = saleslt.salesorderheader.salesorderid
# join saleslt.product
# on saleslt.product.productid = saleslt.salesorderdetail.productid and color = ‘black’
# )
# on saleslt.customer.customerid = saleslt.salesorderheader.customerid
# group by saleslt.customer.customerid, FirstName, LastName
# order by sum(orderqty) desc
d1 = dfDetail.join(dfProduct, ‘ProductID’)\
.where(‘Color = “Black”‘) \
.join(dfHeader, ‘SalesOrderID’)\
.groupBy(‘CustomerID’).sum(‘OrderQty’)
dfCustomer.join(d1, ‘CustomerID’, ‘left_outer’)\
.select(‘CustomerID’, ‘FirstName’, ‘LastName’, ‘sum(OrderQty)’)\
.orderBy(‘sum(OrderQty)’, ascending=False)\
.show()
+———-+————+————+————-+
|CustomerID| FirstName| LastName|sum(OrderQty)|
+———-+————+————+————-+
| 30050| Krishna|Sunkammurali| 89|
| 29796| Jon| Grande| 65|
| 29957| Kevin| Liu| 62|
| 29929| Jeffrey| Kurtz| 46|
| 29546| Christopher| Beck| 45|
| 29922| Pamala| Kotc| 34|
| 30113| Raja| Venugopal| 34|
| 29938| Frank| Campbell| 29|
| 29736| Terry| Eminhizer| 23|
| 29485| Catherine| Abel| 10|
| 30019| Matthew| Miller| 9|
| 29932| Rebecca| Laszlo| 7|
| 29975| Walter| Mays| 5|
| 29638| Rosmarie| Carroll| 2|
| 30089|Michael John| Troyer| 1|
| 29568| Donald| Blanton| 1|
| 29531| Cory| Booth| 1|
+———-+————+————+————-+
Embed SQL queries¶
You can also run SQL queries over dataframes once you register them as temporary tables within the SparkSession.
In [10]:
# Register the dataframe as a temporary view called HVAC
df.createOrReplaceTempView(‘HVAC’)
In [11]:
spark.sql(‘SELECT * FROM HVAC WHERE BuildingAge >= 10’).show()
+———-+———–+———–+———–+————+
|BuildingID|BuildingMgr|BuildingAge|HVACproduct| Country|
+———-+———–+———–+———–+————+
| 1| M1| 25| AC1000| USA|
| 2| M2| 27| FN39TG| France|
| 3| M3| 28| JDNS77| Brazil|
| 4| M4| 17| GG1919| Finland|
| 7| M7| 13| FN39TG|South Africa|
| 8| M8| 25| JDNS77| Australia|
| 9| M9| 11| GG1919| Mexico|
| 10| M10| 23| ACMAX22| China|
| 11| M11| 14| AC1000| Belgium|
| 12| M12| 26| FN39TG| Finland|
| 13| M13| 25| JDNS77|Saudi Arabia|
| 14| M14| 17| GG1919| Germany|
| 15| M15| 19| ACMAX22| Israel|
| 16| M16| 23| AC1000| Turkey|
| 17| M17| 11| FN39TG| Egypt|
| 18| M18| 25| JDNS77| Indonesia|
| 19| M19| 14| GG1919| Canada|
| 20| M20| 19| ACMAX22| Argentina|
+———-+———–+———–+———–+————+
In [12]:
# Can even mix DataFrame API with SQL:
df.where(‘BuildingAge >= 10’).createOrReplaceTempView(‘OldBuildings’)
spark.sql(‘SELECT HVACproduct, COUNT(*) FROM OldBuildings GROUP BY HVACproduct’).show()
+———–+——–+
|HVACproduct|count(1)|
+———–+——–+
| ACMAX22| 3|
| AC1000| 3|
| JDNS77| 4|
| FN39TG| 4|
| GG1919| 4|
+———–+——–+
In [13]:
d1 = spark.sql(‘SELECT * FROM HVAC WHERE BuildingAge >= 10’)
d1.groupBy(‘HVACproduct’).count().show()
+———–+—–+
|HVACproduct|count|
+———–+—–+
| ACMAX22| 3|
| AC1000| 3|
| JDNS77| 4|
| FN39TG| 4|
| GG1919| 4|
+———–+—–+
In [24]:
# UDF
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
slen = udf(lambda s: len(s)+2, IntegerType())
df.select(‘*’, slen(df[‘Country’]).alias(‘slen’)).show()
+———-+———–+———–+———–+————+—-+
|BuildingID|BuildingMgr|BuildingAge|HVACproduct| Country|slen|
+———-+———–+———–+———–+————+—-+
| 1| M1| 25| AC1000| USA| 5|
| 2| M2| 27| FN39TG| France| 8|
| 3| M3| 28| JDNS77| Brazil| 8|
| 4| M4| 17| GG1919| Finland| 9|
| 5| M5| 3| ACMAX22| Hong Kong| 11|
| 6| M6| 9| AC1000| Singapore| 11|
| 7| M7| 13| FN39TG|South Africa| 14|
| 8| M8| 25| JDNS77| Australia| 11|
| 9| M9| 11| GG1919| Mexico| 8|
| 10| M10| 23| ACMAX22| China| 7|
| 11| M11| 14| AC1000| Belgium| 9|
| 12| M12| 26| FN39TG| Finland| 9|
| 13| M13| 25| JDNS77|Saudi Arabia| 14|
| 14| M14| 17| GG1919| Germany| 9|
| 15| M15| 19| ACMAX22| Israel| 8|
| 16| M16| 23| AC1000| Turkey| 8|
| 17| M17| 11| FN39TG| Egypt| 7|
| 18| M18| 25| JDNS77| Indonesia| 11|
| 19| M19| 14| GG1919| Canada| 8|
| 20| M20| 19| ACMAX22| Argentina| 11|
+———-+———–+———–+———–+————+—-+
In [15]:
spark.udf.register(‘slen’, lambda s: len(s), IntegerType())
spark.sql(‘SELECT *, slen(Country) AS slen FROM HVAC’).show()
+———-+———–+———–+———–+————+—-+
|BuildingID|BuildingMgr|BuildingAge|HVACproduct| Country|slen|
+———-+———–+———–+———–+————+—-+
| 1| M1| 25| AC1000| USA| 3|
| 2| M2| 27| FN39TG| France| 6|
| 3| M3| 28| JDNS77| Brazil| 6|
| 4| M4| 17| GG1919| Finland| 7|
| 5| M5| 3| ACMAX22| Hong Kong| 9|
| 6| M6| 9| AC1000| Singapore| 9|
| 7| M7| 13| FN39TG|South Africa| 12|
| 8| M8| 25| JDNS77| Australia| 9|
| 9| M9| 11| GG1919| Mexico| 6|
| 10| M10| 23| ACMAX22| China| 5|
| 11| M11| 14| AC1000| Belgium| 7|
| 12| M12| 26| FN39TG| Finland| 7|
| 13| M13| 25| JDNS77|Saudi Arabia| 12|
| 14| M14| 17| GG1919| Germany| 7|
| 15| M15| 19| ACMAX22| Israel| 6|
| 16| M16| 23| AC1000| Turkey| 6|
| 17| M17| 11| FN39TG| Egypt| 5|
| 18| M18| 25| JDNS77| Indonesia| 9|
| 19| M19| 14| GG1919| Canada| 6|
| 20| M20| 19| ACMAX22| Argentina| 9|
+———-+———–+———–+———–+————+—-+
Flexible Data Model¶
Sample data file at
https://www.cse.ust.hk/msbd5003/data/products.json
In [1]:
df = spark.read.json(‘../data/products.json’)
df.printSchema()
root
|– dimensions: struct (nullable = true)
| |– height: double (nullable = true)
| |– length: double (nullable = true)
| |– width: double (nullable = true)
|– id: long (nullable = true)
|– name: string (nullable = true)
|– price: double (nullable = true)
|– tags: array (nullable = true)
| |– element: string (containsNull = true)
|– warehouseLocation: struct (nullable = true)
| |– latitude: double (nullable = true)
| |– longitude: double (nullable = true)
In [2]:
df.show()
+————–+—+—————-+—–+———–+—————–+
| dimensions| id| name|price| tags|warehouseLocation|
+————–+—+—————-+—–+———–+—————–+
|[9.5,7.0,12.0]| 2|An ice sculpture| 12.5|[cold, ice]| [-78.75,20.4]|
| [1.0,3.1,1.0]| 3| A blue mouse| 25.5| null| [54.4,-32.7]|
+————–+—+—————-+—–+———–+—————–+
In [4]:
# Accessing nested fields
df.select(df[‘dimensions.height’]).show()
+——+
|height|
+——+
| 9.5|
| 1.0|
+——+
In [19]:
df.select(‘dimensions.height’).show()
+——+
|height|
+——+
| 9.5|
| 1.0|
+——+
In [20]:
df.select(‘dimensions.height’)\
.filter(“tags[0] = ‘cold’ AND warehouseLocation.latitude < 0")\
.show()
+------+
|height|
+------+
| 9.5|
+------+
In [21]:
df.rdd.take(3)
Out[21]:
[Row(dimensions=Row(height=9.5, length=7.0, width=12.0), id=2, name=u'An ice sculpture', price=12.5, tags=[u'cold', u'ice'], warehouseLocation=Row(latitude=-78.75, longitude=20.4)), Row(dimensions=Row(height=1.0, length=3.1, width=1.0), id=3, name=u'A blue mouse', price=25.5, tags=None, warehouseLocation=Row(latitude=54.4, longitude=-32.7))]
Converting between RDD and DataFrame¶
Sample data file at:
https://www.cse.ust.hk/msbd5003/data/people.txt
In [22]:
# Load a text file and convert each line to a Row.
lines = sc.textFile("../data/people.txt")
def parse(l):
a = l.split(',')
return (a[0], int(a[1]))
rdd = lines.map(parse)
rdd.collect()
Out[22]:
[(u'Michael', 29), (u'Andy', 30), (u'Justin', 19)]
In [23]:
# Create the DataFrame from an RDD of tuples, schema is inferred
df = spark.createDataFrame(rdd)
df.printSchema()
df.show()
root
|-- _1: string (nullable = true)
|-- _2: long (nullable = true)
+-------+---+
| _1| _2|
+-------+---+
|Michael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+
In [24]:
# Create the DataFrame from an RDD of tuples with column names, type is inferred
df = spark.createDataFrame(rdd, ['name', 'age'])
df.printSchema()
df.show()
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
+-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+
In [25]:
# Create the DataFrame from an RDD of Rows, type is given in the Row objects
from pyspark.sql import Row
rdd_rows = rdd.map(lambda p: Row(name = p[0], age = p[1]))
df = spark.createDataFrame(rdd_rows)
df.printSchema()
df.show()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
+---+-------+
|age| name|
+---+-------+
| 29|Michael|
| 30| Andy|
| 19| Justin|
+---+-------+
In [26]:
# Row fields with types incompatible with that of previous rows will be turned into nulls
row1 = Row(name="Alice", age=11)
row2 = Row(name="Bob", age='12')
rdd_rows = sc.parallelize([row1, row2])
df1 = spark.createDataFrame(rdd_rows)
df1.show()
+----+-----+
| age| name|
+----+-----+
| 11|Alice|
|null| Bob|
+----+-----+
In [27]:
# rdd returns the content as an RDD of Rows
teenagers = df.filter('age >= 13 and age <= 19')
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name)
teenNames.collect()
Out[27]:
[u'Name: Justin']
Note:¶
DataFrames are stored using columnar storage with compression
RDDs are stored using row storage without compression
The RDD view of DataFrame just provides an interface, the Row objects are constructed on the fly and do not necessarily represent the internal storage format of the data
Closure in DataFrames¶
In [18]:
data = range(10)
df = spark.createDataFrame(zip(data, data))
df.printSchema()
df.show()
root
|-- _1: long (nullable = true)
|-- _2: long (nullable = true)
+---+---+
| _1| _2|
+---+---+
| 0| 0|
| 1| 1|
| 2| 2|
| 3| 3|
| 4| 4|
| 5| 5|
| 6| 6|
| 7| 7|
| 8| 8|
| 9| 9|
+---+---+
In [19]:
# The 'closure' behaviour in RDD doesn't seem to exist for DataFrames
x = 5
df1 = df.filter(df._1 < x)
df1.show()
x = 3
df1.show()
+---+---+
| _1| _2|
+---+---+
| 0| 0|
| 1| 1|
| 2| 2|
| 3| 3|
| 4| 4|
+---+---+
+---+---+
| _1| _2|
+---+---+
| 0| 0|
| 1| 1|
| 2| 2|
| 3| 3|
| 4| 4|
+---+---+
In [6]:
# Because of the Catalyst optimizer !
df1.explain()
== Physical Plan ==
*Filter (isnotnull(_1#9L) && (_1#9L < 5))
+- Scan ExistingRDD[_1#9L,_2#10L]
In [34]:
def f():
return x/2
x = 5
df1 = df.select(df._1 * 2 + f() + 1 + 1)
df1.explain()
== Physical Plan ==
*Project [((_1#773L * 2) + 4) AS ((((_1 * 2) + 2) + 1) + 1)#794L]
+- Scan ExistingRDD[_1#773L,_2#774L]
In [20]:
rdd = sc.parallelize(range(10))
x = 5
a = rdd.filter(lambda z: z < x)
print a.take(10)
x = 3
print a.take(10)
[0, 1, 2, 3, 4]
[0, 1, 2]
In [1]:
counter = 0
def increment_counter(x):
global counter
counter += 1
df.foreach(increment_counter)
print counter
2
In [ ]: