
Data Processing using PySpark¶

In this notebook we will go through how to process data: selecting columns, filtering rows, aggregating, and generating new columns.
If you are familiar with working with DataFrames in previous versions of Spark (e.g. Spark 1.x), you will notice that in Spark 2.0 we use SparkSession instead of SQLContext. The various Spark contexts (e.g. HiveContext, SQLContext, StreamingContext, and SparkContext) have been merged into SparkSession.
In [1]:
#import SparkSession
from pyspark.sql import SparkSession

The first step is to create a SparkSession. The choice of appName is completely up to you.
In [2]:
#create spark session object
spark=SparkSession.builder.appName('data_processing').getOrCreate()
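As a quick illustration of the merger described above (our own addition, not a numbered cell of the original notebook), the single SparkSession object exposes what previously required separate contexts:

#the merged entry point: the old contexts are reachable from the SparkSession
print(spark.version)        #version of the running Spark session
print(spark.sparkContext)   #the underlying SparkContext is still available if needed
#spark.read and spark.sql() now cover what SQLContext/HiveContext used to provide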
In [3]:
rdd1=spark.sparkContext.parallelize([(1,2,3),(4,5,6),(7,8,9)])
df_rdd1=rdd1.toDF(["a","b","c"])
df_rdd1.show()

+---+---+---+
| a| b| c|
+---+---+---+
| 1| 2| 3|
| 4| 5| 6|
| 7| 8| 9|
+---+---+---+
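The same small DataFrame can also be built directly with createDataFrame(), without going through an RDD first. A minimal sketch (df_direct is our own name for illustration):

df_direct=spark.createDataFrame([(1,2,3),(4,5,6),(7,8,9)],["a","b","c"])
df_direct.show()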

In [4]:
df_range=spark.range(8).toDF("numbers").show()

+-------+
|numbers|
+-------+
| 0|
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
+-------+

In [5]:
df_txt=spark.read.text("city.txt").show()

+--------------------+
| value|
+--------------------+
| THE SOCIAL CITY|
| |
|Of course, people…|
| |
|Although they’re …|
| |
|To create walkabl…|
| |
|Read more at http…|
+--------------------+

In [6]:
!head city.txt

THE SOCIAL CITY

Of course, people navigate using many different types of social wayfinding during the course of their walk. Apps such as Google Maps or Citymapper can also be used in a social way.

Although they’re typically designed with a single navigator in mind, in reality it’s not unusual for two or more people to be using a device at the same time, passing it around, discussing the instructions and jointly making decisions about where to go.

To create walkable cities, of course it’s important for planners and city leaders to understand what sort of physical features encourage people to walk more. But acknowledging how social interactions influence people’s choices about when and where to walk would give leaders a much more realistic understanding of people’s behaviour – and put them in a better position to encourage walking as a means of getting around.

Read more at https://www.channelnewsasia.com/news/commentary/how-to-make-cities-more-walkable-11258064
In [7]:
df1 = spark.read.option("multiline",True).json("a.json")
df1.select("Competition","FullName","DateOfBirth","Team","Year").show(4)

+-----------+-----------------+-----------+---------+----+
|Competition| FullName|DateOfBirth| Team|Year|
+-----------+-----------------+-----------+---------+----+
| World Cup| Ángel Bossio| 1905-5-5|Argentina|1930|
| World Cup| Juan Botasso| 1908-10-23|Argentina|1930|
| World Cup| Roberto Cherro| 1907-2-23|Argentina|1930|
| World Cup|Alberto Chividini| 1907-2-23|Argentina|1930|
+-----------+-----------------+-----------+---------+----+
only showing top 4 rows

In [8]:
!head a.json

The inferSchema parameter is set to True, which means Spark will automatically determine the data type (string, integer, etc.) of each column of structured data in your CSV file.
In [9]:
# Load csv Dataset
df=spark.read.csv('sample_data.csv',inferSchema=True,header=True)
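Instead of relying on schema inference, the schema can also be supplied explicitly, which avoids an extra pass over the data and guarantees the column types. This is only a sketch (df_explicit is our own name; the columns and types match the printSchema() output shown further below):

from pyspark.sql.types import StructType,StructField,IntegerType,DoubleType,StringType

schema=StructType([
    StructField("ratings",IntegerType(),True),
    StructField("age",IntegerType(),True),
    StructField("experience",DoubleType(),True),
    StructField("family",IntegerType(),True),
    StructField("mobile",StringType(),True)
])
df_explicit=spark.read.csv('sample_data.csv',schema=schema,header=True)
df_explicit.printSchema()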

Without displaying the entire contents of the CSV file, we can find out the names of the columns by using the columns attribute.
In [10]:
#columns of dataframe
df.columns
Out[10]:
['ratings', 'age', 'experience', 'family', 'mobile']

We can simply use Python's built-in len() function to determine the number of columns in the df DataFrame. There are 5 columns.
In [11]:
#check number of columns
len(df.columns)
Out[11]:
5

We can use the count() method to determine the number of rows (records) in the df DataFrame. There are 33 rows.
In [12]:
#number of records or rows in dataframe
df.count()
Out[12]:
33

When you are asked for the shape of a dataset or DataFrame, you are expected to report the number of rows followed by the number of columns. The order of reporting is important, so please stick to it.
In [13]:
#shape of dataset
print((df.count(),len(df.columns)))

(33, 5)

A database schema is a collection of metadata that describes the relations of objects and information within a database.
As you can see below, we can use the printSchema() method to obtain information (or metadata, i.e. data about data) about each column. The information includes the column name, the data type, and whether null/empty values are allowed.
• Note: The double data type is a double precision floating-point data type.
In [14]:
#printSchema
df.printSchema()

root
 |-- ratings: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- experience: double (nullable = true)
 |-- family: integer (nullable = true)
 |-- mobile: string (nullable = true)

Sometimes, we simply want a quick overview of what the records inside the DataFrame look like, without displaying all the rows. For this, we can use the show() method with an integer argument that decides how many rows of data will be displayed.
• Note: This is just for choosing the number of rows to display. The data in your DataFrame is not changed or removed in any way.
• What is displayed to you will depend on how the data in your DataFrame was originally sorted, assuming you did not sort it after reading the CSV file into the DataFrame.
In [15]:
#first few rows of dataframe
df.show(8)

+-------+---+----------+------+-------+
|ratings|age|experience|family| mobile|
+-------+---+----------+------+-------+
| 3| 32| 9.0| 3| Vivo|
| 3| 27| 13.0| 3| Apple|
| 4| 22| 2.5| 0|Samsung|
| 4| 37| 16.5| 4| Apple|
| 5| 27| 9.0| 1| MI|
| 4| 27| 9.0| 0| Oppo|
| 5| 37| 23.0| 5| Vivo|
| 5| 37| 23.0| 5|Samsung|
+-------+---+----------+------+-------+
only showing top 8 rows

In [16]:
#select only 2 particular columns
df.select('age','mobile').show(5)

+---+-------+
|age| mobile|
+---+-------+
| 32| Vivo|
| 27| Apple|
| 22|Samsung|
| 37| Apple|
| 27| MI|
+---+-------+
only showing top 5 rows

We can also use the describe() method together with the show() method to calculate and display several descriptive statistics for each numerical column. For a categorical column such as mobile, the mean and standard deviation are not meaningful and are shown as null.
As shown below:
• count
• mean
• standard deviation
• minimum
• maximum
In [17]:
#info about dataframe
df.describe().show()

+-------+------------------+------------------+------------------+------------------+------+
|summary| ratings| age| experience| family|mobile|
+-------+------------------+------------------+------------------+------------------+------+
| count| 33| 33| 33| 33| 33|
| mean|3.5757575757575757|30.484848484848484|10.303030303030303|1.8181818181818181| null|
| stddev|1.1188806636071336| 6.18527087180309| 6.770731351213326|1.8448330794164254| null|
| min| 1| 22| 2.5| 0| Apple|
| max| 5| 42| 23.0| 5| Vivo|
+-------+------------------+------------------+------------------+------------------+------+

In [18]:
from pyspark.sql.types import StringType,DoubleType,IntegerType

We can generate a new column using the contents of the other columns in the DataFrame. This process of generating new columns is often called feature engineering.
To create a new column, we use the withColumn() method.
In [19]:
#with column
df.withColumn("age_after_10_yrs",(df["age"]+10)).show(10,False)

+-------+---+----------+------+-------+----------------+
|ratings|age|experience|family|mobile |age_after_10_yrs|
+-------+---+----------+------+-------+----------------+
|3 |32 |9.0 |3 |Vivo |42 |
|3 |27 |13.0 |3 |Apple |37 |
|4 |22 |2.5 |0 |Samsung|32 |
|4 |37 |16.5 |4 |Apple |47 |
|5 |27 |9.0 |1 |MI |37 |
|4 |27 |9.0 |0 |Oppo |37 |
|5 |37 |23.0 |5 |Vivo |47 |
|5 |37 |23.0 |5 |Samsung|47 |
|3 |22 |2.5 |0 |Apple |32 |
|3 |27 |6.0 |0 |MI |37 |
+-------+---+----------+------+-------+----------------+
only showing top 10 rows
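New columns can also be derived with the built-in column functions in pyspark.sql.functions instead of plain arithmetic. A minimal sketch (the column names exp_per_year and experienced are our own, purely for illustration):

from pyspark.sql.functions import when, round as spark_round

df.withColumn("exp_per_year",spark_round(df["experience"]/df["age"],2))\
  .withColumn("experienced",when(df["experience"]>10,"yes").otherwise("no"))\
  .show(5,False)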

Sometimes, we need to change the data type of the new column that we generate. The proper term for such data type conversion is type casting, hence we use the cast() method to achieve that.
• Note: The double data type is a double precision floating-point data type.
In [20]:
df.withColumn('age_double',df['age'].cast(DoubleType())).show(10,False)

+-------+---+----------+------+-------+----------+
|ratings|age|experience|family|mobile |age_double|
+-------+---+----------+------+-------+----------+
|3 |32 |9.0 |3 |Vivo |32.0 |
|3 |27 |13.0 |3 |Apple |27.0 |
|4 |22 |2.5 |0 |Samsung|22.0 |
|4 |37 |16.5 |4 |Apple |37.0 |
|5 |27 |9.0 |1 |MI |27.0 |
|4 |27 |9.0 |0 |Oppo |27.0 |
|5 |37 |23.0 |5 |Vivo |37.0 |
|5 |37 |23.0 |5 |Samsung|37.0 |
|3 |22 |2.5 |0 |Apple |22.0 |
|3 |27 |6.0 |0 |MI |27.0 |
+-------+---+----------+------+-------+----------+
only showing top 10 rows

Filtering¶
We usually use the term filter when we are choosing subsets of the rows. On the other hand, we usually use the term select when we are choosing subsets of columns.
As shown below, we are using the filter() method to display only those rows that satisfy a specific criterion.
In [21]:
#filter the records
df.filter(df['mobile']=='Vivo').show()

+-------+---+----------+------+------+
|ratings|age|experience|family|mobile|
+-------+---+----------+------+------+
| 3| 32| 9.0| 3| Vivo|
| 5| 37| 23.0| 5| Vivo|
| 4| 37| 6.0| 0| Vivo|
| 5| 37| 13.0| 1| Vivo|
| 4| 37| 6.0| 0| Vivo|
+-------+---+----------+------+------+

Filtering using SQL query¶
We can rewrite the above using an SQL query instead. But we first need to register the DataFrame as a temporary table using the registerTempTable() method. (In Spark 2.x, createOrReplaceTempView() is the preferred, non-deprecated replacement; see the sketch after the query below.)
In [22]:
df.registerTempTable('df_table')
spark.sql("SELECT * FROM df_table WHERE mobile = 'Vivo'").show()

+-------+---+----------+------+------+
|ratings|age|experience|family|mobile|
+-------+---+----------+------+------+
| 3| 32| 9.0| 3| Vivo|
| 5| 37| 23.0| 5| Vivo|
| 4| 37| 6.0| 0| Vivo|
| 5| 37| 13.0| 1| Vivo|
| 4| 37| 6.0| 0| Vivo|
+-------+---+----------+------+------+
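For reference, the same query using the non-deprecated API (a sketch, equivalent to the cell above):

#register a temporary view and filter it with SQL
df.createOrReplaceTempView('df_table')
spark.sql("SELECT * FROM df_table WHERE mobile = 'Vivo'").show()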

In [23]:
#filter the records, select columns
df.filter(df['mobile']=='Vivo').select('age','ratings','mobile').show()

+---+-------+------+
|age|ratings|mobile|
+---+-------+------+
| 32| 3| Vivo|
| 37| 5| Vivo|
| 37| 4| Vivo|
| 37| 5| Vivo|
| 37| 4| Vivo|
+---+-------+------+

Filtering using multiple conditions¶
We can chain several filter() methods together to achieve the effect of filtering using multiple conditions.
In [24]:
#filter the multiple conditions
df.filter(df['mobile']=='Vivo').filter(df['experience'] >10).show()

+-------+---+----------+------+------+
|ratings|age|experience|family|mobile|
+-------+---+----------+------+------+
| 5| 37| 23.0| 5| Vivo|
| 5| 37| 13.0| 1| Vivo|
+-------+---+----------+------+------+

Using a different syntax¶
Below we show another syntax for filtering on multiple conditions.
In [25]:
#filter the multiple conditions
df.filter((df['mobile']=='Vivo') & (df['experience'] >10)).show()

+-------+---+----------+------+------+
|ratings|age|experience|family|mobile|
+-------+---+----------+------+------+
| 5| 37| 23.0| 5| Vivo|
| 5| 37| 13.0| 1| Vivo|
+-------+---+----------+------+------+

In [26]:
#filter the multiple conditions
df.filter((df['mobile']=='Vivo') | (df['experience'] >10)).show()

+-------+---+----------+------+-------+
|ratings|age|experience|family| mobile|
+-------+---+----------+------+-------+
| 3| 32| 9.0| 3| Vivo|
| 3| 27| 13.0| 3| Apple|
| 4| 37| 16.5| 4| Apple|
| 5| 37| 23.0| 5| Vivo|
| 5| 37| 23.0| 5|Samsung|
| 3| 37| 16.5| 5| Apple|
| 1| 37| 23.0| 5| MI|
| 2| 42| 23.0| 2| Oppo|
| 4| 37| 6.0| 0| Vivo|
| 3| 37| 16.5| 5| Apple|
| 3| 42| 23.0| 5| MI|
| 5| 37| 13.0| 1| Vivo|
| 2| 32| 16.5| 2| Oppo|
| 4| 37| 6.0| 0| Vivo|
+-------+---+----------+------+-------+

In [27]:
#filter the multiple conditions
df.filter((df['mobile']=='Vivo') | ~(df['experience'] >10)).show()

+-------+---+----------+------+-------+
|ratings|age|experience|family| mobile|
+-------+---+----------+------+-------+
| 3| 32| 9.0| 3| Vivo|
| 4| 22| 2.5| 0|Samsung|
| 5| 27| 9.0| 1| MI|
| 4| 27| 9.0| 0| Oppo|
| 5| 37| 23.0| 5| Vivo|
| 3| 22| 2.5| 0| Apple|
| 3| 27| 6.0| 0| MI|
| 2| 27| 6.0| 2| Oppo|
| 5| 27| 6.0| 2|Samsung|
| 5| 27| 6.0| 0| MI|
| 4| 22| 6.0| 1| Oppo|
| 4| 37| 9.0| 2|Samsung|
| 4| 27| 6.0| 1| Apple|
| 4| 37| 6.0| 0| Vivo|
| 5| 22| 2.5| 0|Samsung|
| 2| 27| 9.0| 2|Samsung|
| 4| 27| 6.0| 1| Apple|
| 5| 27| 2.5| 0| MI|
| 2| 27| 6.0| 2| Oppo|
| 5| 37| 13.0| 1| Vivo|
+-------+---+----------+------+-------+
only showing top 20 rows

Pay close attention to the effect of adding the ~ sign: it negates (reverses) the original condition. In other words, it has the same effect as adding a "not" to the original condition.
In [28]:
#filter the multiple conditions
df.filter((df['mobile']=='Vivo') & ~(df['experience'] >10)).show()

+-------+---+----------+------+------+
|ratings|age|experience|family|mobile|
+-------+---+----------+------+------+
| 3| 32| 9.0| 3| Vivo|
| 4| 37| 6.0| 0| Vivo|
| 4| 37| 6.0| 0| Vivo|
+-------+---+----------+------+------+

Using a SQL Query¶
We can rewrite the above using an SQL query. Pay attention to how the multiple conditions are specified in the WHERE clause. Remember to create the temporary table using the registerTempTable() method first.
In [29]:
#filter the multiple conditions
df.registerTempTable('df_table')
spark.sql("SELECT * FROM df_table WHERE mobile = 'Vivo' AND experience<10").show()

+-------+---+----------+------+------+
|ratings|age|experience|family|mobile|
+-------+---+----------+------+------+
| 3| 32| 9.0| 3| Vivo|
| 4| 37| 6.0| 0| Vivo|
| 4| 37| 6.0| 0| Vivo|
+-------+---+----------+------+------+

Showing only unique values¶
We use the distinct() method together with the show() method to display only the unique values in a particular column.
In [30]:
#Distinct Values in a column
df.select('mobile').distinct().show()

+-------+
| mobile|
+-------+
| MI|
| Oppo|
|Samsung|
| Vivo|
| Apple|
+-------+

In [31]:
#distinct value count
df.select('mobile').distinct().count()
Out[31]:
5

Using groupBy()¶
Next, we use the groupBy() method together with the count() method to tabulate the number of items for each category.
In [32]:
# Value counts
df.groupBy('mobile').count().show()

+-------+-----+
| mobile|count|
+-------+-----+
| MI| 8|
| Oppo| 7|
|Samsung| 6|
| Vivo| 5|
| Apple| 7|
+-------+-----+

We can sort the rows of records using the orderBy() method. The default is ascending=True.
In [33]:
# sorting
df.groupBy('mobile').count().orderBy('count',ascending=False).show()

+-------+-----+
| mobile|count|
+-------+-----+
| MI| 8|
| Oppo| 7|
| Apple| 7|
|Samsung| 6|
| Vivo| 5|
+-------+-----+

In [34]:
# Means
df.groupBy('mobile').mean().show(5,False)

+-------+------------------+------------------+------------------+------------------+
|mobile |avg(ratings) |avg(age) |avg(experience) |avg(family) |
+-------+------------------+------------------+------------------+------------------+
|MI |3.5 |30.125 |10.1875 |1.375 |
|Oppo |2.857142857142857 |28.428571428571427|10.357142857142858|1.4285714285714286|
|Samsung|4.166666666666667 |28.666666666666668|8.666666666666666 |1.8333333333333333|
|Vivo |4.2 |36.0 |11.4 |1.8 |
|Apple |3.4285714285714284|30.571428571428573|11.0 |2.7142857142857144|
+-------+------------------+------------------+------------------+------------------+

In [35]:
df.groupBy('mobile').sum().show(5,False)

+-------+------------+--------+---------------+-----------+
|mobile |sum(ratings)|sum(age)|sum(experience)|sum(family)|
+-------+------------+--------+---------------+-----------+
|MI |28 |241 |81.5 |11 |
|Oppo |20 |199 |72.5 |10 |
|Samsung|25 |172 |52.0 |11 |
|Vivo |21 |180 |57.0 |9 |
|Apple |24 |214 |77.0 |19 |
+-------+------------+--------+---------------+-----------+

In [36]:
# Maximums
df.groupBy('mobile').max().show(5,False)

+-------+------------+--------+---------------+-----------+
|mobile |max(ratings)|max(age)|max(experience)|max(family)|
+-------+------------+--------+---------------+-----------+
|MI |5 |42 |23.0 |5 |
|Oppo |4 |42 |23.0 |2 |
|Samsung|5 |37 |23.0 |5 |
|Vivo |5 |37 |23.0 |5 |
|Apple |4 |37 |16.5 |5 |
+-------+------------+--------+---------------+-----------+

In [37]:
# Minimums
df.groupBy('mobile').min().show(5,False)

+-------+------------+--------+---------------+-----------+
|mobile |min(ratings)|min(age)|min(experience)|min(family)|
+-------+------------+--------+---------------+-----------+
|MI |1 |27 |2.5 |0 |
|Oppo |2 |22 |6.0 |0 |
|Samsung|2 |22 |2.5 |0 |
|Vivo |3 |32 |6.0 |0 |
|Apple |3 |22 |2.5 |0 |
+-------+------------+--------+---------------+-----------+
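As a side note (our own addition, not part of the original notebook), several named aggregations can also be computed in a single pass with agg() and the functions in pyspark.sql.functions; the alias names below are purely illustrative:

from pyspark.sql.functions import count, avg, sum as spark_sum

df.groupBy('mobile').agg(
    count('mobile').alias('n'),
    avg('experience').alias('avg_experience'),
    spark_sum('ratings').alias('total_ratings')
).orderBy('n',ascending=False).show()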
Aggregations¶
Aggregation functions are used to perform operations across an entire column. We can use the agg() method together with the groupBy() method to take, for example, the sum of experience for each mobile brand.
In [38]:
#Aggregation
df.groupBy('mobile').agg({'experience':'sum'}).show(5,False)

+-------+---------------+
|mobile |sum(experience)|
+-------+---------------+
|MI |81.5 |
|Oppo |72.5 |
|Samsung|52.0 |
|Vivo |57.0 |
|Apple |77.0 |
+-------+---------------+

In [39]:
#Aggregation
df.groupBy('mobile').agg({'experience':'avg'}).show(5,False)

+-------+------------------+
|mobile |avg(experience) |
+-------+------------------+
|MI |10.1875 |
|Oppo |10.357142857142858|
|Samsung|8.666666666666666 |
|Vivo |11.4 |
|Apple |11.0 |
+-------+------------------+

In [40]:
#Aggregation
df.groupBy('mobile').agg({'experience':'min'}).show(5,False)

+-------+---------------+
|mobile |min(experience)|
+-------+---------------+
|MI |2.5 |
|Oppo |6.0 |
|Samsung|2.5 |
|Vivo |6.0 |
|Apple |2.5 |
+-------+---------------+

In [41]:
#Aggregation
df.groupBy('mobile').agg({'experience':'max'}).show(5,False)

+-------+---------------+
|mobile |max(experience)|
+-------+---------------+
|MI |23.0 |
|Oppo |23.0 |
|Samsung|23.0 |
|Vivo |23.0 |
|Apple |16.5 |
+-------+---------------+

User-defined Functions (UDFs)¶
There are two types of UDFs:
• Conventional UDFs
• Pandas (Vectorized) UDFs
There will definitely be times when you cannot find any built-in (pre-written) method that solves the problem you have at hand. In that case, the only way forward is for you, the user, to write the function yourself.
In [42]:
# UDF
from pyspark.sql.functions import udf

The following function is just an example of something that you can define or write. Nothing exceptionally difficult to understand here. Just take note of the data type of the return value; you will need to specify it in the udf() call later.
Conventional UDF¶
In [43]:
#normal function
def price_range(brand):
    if brand in ['Samsung','Apple']:
        return 'High Price'
    elif brand =='MI':
        return 'Mid Price'
    else:
        return 'Low Price'
In [44]:
#create udf using python function
brand_udf=udf(price_range,StringType())
#apply udf on dataframe
df.withColumn('price_range',brand_udf(df['mobile'])).show(10,False)

+-------+---+----------+------+-------+-----------+
|ratings|age|experience|family|mobile |price_range|
+-------+---+----------+------+-------+-----------+
|3 |32 |9.0 |3 |Vivo |Low Price |
|3 |27 |13.0 |3 |Apple |High Price |
|4 |22 |2.5 |0 |Samsung|High Price |
|4 |37 |16.5 |4 |Apple |High Price |
|5 |27 |9.0 |1 |MI |Mid Price |
|4 |27 |9.0 |0 |Oppo |Low Price |
|5 |37 |23.0 |5 |Vivo |Low Price |
|5 |37 |23.0 |5 |Samsung|High Price |
|3 |22 |2.5 |0 |Apple |High Price |
|3 |27 |6.0 |0 |MI |Mid Price |
+-------+---+----------+------+-------+-----------+
only showing top 10 rows

Using lambda function¶
In [45]:
#using lambda function
age_udf = udf(lambda age: "young" if age <= 30 else "senior", StringType())
#apply udf on dataframe
df.withColumn("age_group", age_udf(df.age)).show(10,False)

+-------+---+----------+------+-------+---------+
|ratings|age|experience|family|mobile |age_group|
+-------+---+----------+------+-------+---------+
|3 |32 |9.0 |3 |Vivo |senior |
|3 |27 |13.0 |3 |Apple |young |
|4 |22 |2.5 |0 |Samsung|young |
|4 |37 |16.5 |4 |Apple |senior |
|5 |27 |9.0 |1 |MI |young |
|4 |27 |9.0 |0 |Oppo |young |
|5 |37 |23.0 |5 |Vivo |senior |
|5 |37 |23.0 |5 |Samsung|senior |
|3 |22 |2.5 |0 |Apple |young |
|3 |27 |6.0 |0 |MI |young |
+-------+---+----------+------+-------+---------+
only showing top 10 rows
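A side note (our own addition): simple row-wise logic like price_range can usually be expressed with the built-in when()/otherwise() column functions, which avoids calling back into Python for every row and lets Spark optimize the expression. A minimal sketch of the same logic:

from pyspark.sql.functions import when

df.withColumn('price_range',
    when(df['mobile'].isin('Samsung','Apple'),'High Price')
    .when(df['mobile']=='MI','Mid Price')
    .otherwise('Low Price')).show(5,False)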
Pandas UDF (Vectorized UDF)¶
In [46]:
#pandas udf
from pyspark.sql.functions import pandas_udf, PandasUDFType
In [47]:
#create python function
def remaining_yrs(age):
    yrs_left=100-age
    return yrs_left
In [48]:
from pyspark.sql.types import IntegerType
#create udf using python function
length_udf = pandas_udf(remaining_yrs, IntegerType())
#apply pandas udf on dataframe
df.withColumn("yrs_left", length_udf(df['age'])).show(10,False)

+-------+---+----------+------+-------+--------+
|ratings|age|experience|family|mobile |yrs_left|
+-------+---+----------+------+-------+--------+
|3 |32 |9.0 |3 |Vivo |68 |
|3 |27 |13.0 |3 |Apple |73 |
|4 |22 |2.5 |0 |Samsung|78 |
|4 |37 |16.5 |4 |Apple |63 |
|5 |27 |9.0 |1 |MI |73 |
|4 |27 |9.0 |0 |Oppo |73 |
|5 |37 |23.0 |5 |Vivo |63 |
|5 |37 |23.0 |5 |Samsung|63 |
|3 |22 |2.5 |0 |Apple |78 |
|3 |27 |6.0 |0 |MI |73 |
+-------+---+----------+------+-------+--------+
only showing top 10 rows

In [49]:
#udf using two columns
def prod(rating,exp):
    x=rating*exp
    return x
In [50]:
#create udf using python function
prod_udf = pandas_udf(prod, DoubleType())
#apply pandas udf on multiple columns of dataframe
df.withColumn("product", prod_udf(df['ratings'],df['experience'])).show(10,False)

+-------+---+----------+------+-------+-------+
|ratings|age|experience|family|mobile |product|
+-------+---+----------+------+-------+-------+
|3 |32 |9.0 |3 |Vivo |27.0 |
|3 |27 |13.0 |3 |Apple |39.0 |
|4 |22 |2.5 |0 |Samsung|10.0 |
|4 |37 |16.5 |4 |Apple |66.0 |
|5 |27 |9.0 |1 |MI |45.0 |
|4 |27 |9.0 |0 |Oppo |36.0 |
|5 |37 |23.0 |5 |Vivo |115.0 |
|5 |37 |23.0 |5 |Samsung|115.0 |
|3 |22 |2.5 |0 |Apple |7.5 |
|3 |27 |6.0 |0 |MI |18.0 |
+-------+---+----------+------+-------+-------+
only showing top 10 rows

In [51]:
# count the number of rows
df.count()
Out[51]:
33

Data Cleaning: Working with missing or bad data¶
The data that we have access to in the real world is far from ideal. Datasets almost always come with some issues; the most common ones include missing data (empty cells) and duplicated values. Before we can use any dataset for building machine learning models, we need to clean the data. In the next few cells, we will see how to handle these common data issues using PySpark.
In [52]:
# Load csv Dataset
dfbad=spark.read.csv('bad_data.csv',inferSchema=True,header=True)

As you can see below, the dataset that we have just read in from the CSV file is not clean. The most obvious issue is the presence of multiple null values or empty cells. Later, we will also notice that this dataset contains duplicated rows.
In [53]:
dfbad.show(8)

+-------+----+----------+------+------+
|ratings| age|experience|family|mobile|
+-------+----+----------+------+------+
| null| 32| null| 3| Vivo|
| 3|null| 13.0| 3| null|
| 4| 22| 2.5| 0| null|
| 4|null| 16.5| 4| null|
| 5| 27| null| 1| MI|
| 4|null| 9.0| 0| Oppo|
| 5| 37| 23.0| 5| Vivo|
| 5| 37| 23.0| 5| null|
+-------+----+----------+------+------+
only showing top 8 rows

We will use the na.drop() method to remove the rows that have any empty cells.
• Note: If you do not pass any parameters to na.drop(), the default is 'any'.
In [54]:
dfbad.na.drop().show(8)
#dfbad.na.drop('any').show()  #this is allowed as well

+-------+---+----------+------+-------+
|ratings|age|experience|family| mobile|
+-------+---+----------+------+-------+
| 5| 37| 23.0| 5| Vivo|
| 3| 22| 2.5| 0| Apple|
| 3| 27| 6.0| 0| MI|
| 5| 27| 6.0| 2|Samsung|
| 3| 37| 16.5| 5| Apple|
| 5| 27| 6.0| 0| MI|
| 4| 22| 6.0| 1| Oppo|
| 4| 27| 6.0| 1| Apple|
+-------+---+----------+------+-------+
only showing top 8 rows
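As a middle ground between 'any' and 'all' (our own addition, not part of the original notebook), na.drop() also accepts a thresh parameter that keeps a row only if it contains at least that many non-null values:

#keep rows with at least 4 non-null cells out of the 5 columns
dfbad.na.drop(thresh=4).show(8)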
Naturally, the default 'any' criterion may be too aggressive: it removes a row as soon as a single cell is empty, so we may accidentally remove more rows than we intended to. In the following cell, we use the alternative setting 'all', which is far more conservative, since it removes a row only when every cell of that row is empty.
In [55]:
dfbad.na.drop('all').show(8)

+-------+----+----------+------+------+
|ratings| age|experience|family|mobile|
+-------+----+----------+------+------+
| null| 32| null| 3| Vivo|
| 3|null| 13.0| 3| null|
| 4| 22| 2.5| 0| null|
| 4|null| 16.5| 4| null|
| 5| 27| null| 1| MI|
| 4|null| 9.0| 0| Oppo|
| 5| 37| 23.0| 5| Vivo|
| 5| 37| 23.0| 5| null|
+-------+----+----------+------+------+
only showing top 8 rows

Sometimes, we only want to drop rows that have empty cells in a specific column. In the following cell, rows with empty cells in the mobile column are dropped. Notice that there are still many null values in the other columns, but there are no longer any empty cells in the mobile column.
In [56]:
dfbad.na.drop(subset="mobile").show(8)

+-------+----+----------+------+-------+
|ratings| age|experience|family| mobile|
+-------+----+----------+------+-------+
| null| 32| null| 3| Vivo|
| 5| 27| null| 1| MI|
| 4|null| 9.0| 0| Oppo|
| 5| 37| 23.0| 5| Vivo|
| 3| 22| 2.5| 0| Apple|
| 3| 27| 6.0| 0| MI|
| 5| 27| 6.0| 2|Samsung|
| 3| 37| 16.5| 5| Apple|
+-------+----+----------+------+-------+
only showing top 8 rows

Filling empty cells with values¶
An alternative approach to handling null or empty cells is to fill them with specific values. The most common choice is the mean of the column that the empty cell is in. We can also fill empty cells with a friendlier remark such as "Not available".
In [57]:
from pyspark.sql.functions import mean

mean = dfbad.select(mean(dfbad['experience'])).collect()
print(type(mean))
print(mean)
print(type(mean[0]))
print(mean[0])
print(mean[0][0])
print(type(mean[0][0]))
<class 'list'>
[Row(avg(experience)=10.387096774193548)]
<class 'pyspark.sql.types.Row'>
Row(avg(experience)=10.387096774193548)
10.387096774193548
<class 'float'>

In [58]:
from pyspark.sql.functions import mean

mean = dfbad.select(mean(dfbad['experience'])).collect()
print(mean[0][0])

mean_exp = mean[0][0]
dfbad.na.fill(mean_exp,['experience']).show(8)

10.387096774193548
+-------+----+------------------+------+------+
|ratings| age| experience|family|mobile|
+-------+----+------------------+------+------+
| null| 32|10.387096774193548| 3| Vivo|
| 3|null| 13.0| 3| null|
| 4| 22| 2.5| 0| null|
| 4|null| 16.5| 4| null|
| 5| 27|10.387096774193548| 1| MI|
| 4|null| 9.0| 0| Oppo|
| 5| 37| 23.0| 5| Vivo|
| 5| 37| 23.0| 5| null|
+-------+----+------------------+------+------+
only showing top 8 rows

In [59]:
dfbad.na.fill('Not available', subset=['mobile']).show(8)

+-------+----+----------+------+-------------+
|ratings| age|experience|family| mobile|
+-------+----+----------+------+-------------+
| null| 32| null| 3| Vivo|
| 3|null| 13.0| 3|Not available|
| 4| 22| 2.5| 0|Not available|
| 4|null| 16.5| 4|Not available|
| 5| 27| null| 1| MI|
| 4|null| 9.0| 0| Oppo|
| 5| 37| 23.0| 5| Vivo|
| 5| 37| 23.0| 5|Not available|
+-------+----+----------+------+-------------+
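Both replacements can also be done in a single pass by passing a dictionary of column names and fill values to na.fill(). A minimal sketch, reusing the mean_exp value computed above:

#fill the numeric column with its mean and the categorical column with a placeholder
dfbad.na.fill({'experience':mean_exp,'mobile':'Not available'}).show(8)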
only showing top 8 rows

Dealing with duplicated rows¶
In [60]:
#validate new count
dfbad.count()
Out[60]:
33
In [61]:
#show every row
dfbad.show(dfbad.count())

+-------+----+----------+------+-------+
|ratings| age|experience|family| mobile|
+-------+----+----------+------+-------+
| null| 32| null| 3| Vivo|
| 3|null| 13.0| 3| null|
| 4| 22| 2.5| 0| null|
| 4|null| 16.5| 4| null|
| 5| 27| null| 1| MI|
| 4|null| 9.0| 0| Oppo|
| 5| 37| 23.0| 5| Vivo|
| 5| 37| 23.0| 5| null|
| 3| 22| 2.5| 0| Apple|
| 3| 27| 6.0| 0| MI|
| 2| 27| 6.0| 2| null|
| 5| 27| 6.0| 2|Samsung|
| 3| 37| 16.5| 5| Apple|
| 5| 27| 6.0| 0| MI|
| 4| 22| 6.0| 1| Oppo|
| 4|null| 9.0| 2|Samsung|
| 4| 27| 6.0| 1| Apple|
| 1| 37| 23.0| 5| MI|
| 2| 42| 23.0| 2| Oppo|
| 4| 37| 6.0| 0| Vivo|
| 5| 22| 2.5| 0|Samsung|
| 3| 37| 16.5| 5| Apple|
| 3| 42| 23.0| 5| MI|
| 2| 27| 9.0| 2|Samsung|
| 4| 27| 6.0| 1| Apple|
| 5| 27| 2.5| 0| MI|
| 2| 27| 6.0| 2| Oppo|
| 5| 37| 13.0| 1| Vivo|
| 2| 32| 16.5| 2| Oppo|
| 3| 27| 6.0| 0| MI|
| 3| 27| 6.0| 0| MI|
| 4| 22| 6.0| 1| Oppo|
| 4| 37| 6.0| 0| Vivo|
+-------+----+----------+------+-------+

In the following cell, we demonstrate how to drop rows that have duplicated values in a specific column. The resulting DataFrame shows only a single row for each unique value of the mobile column; the other rows have been removed.
In [62]:
#drop rows that have duplicate values in a specific column
dfbad.dropDuplicates(["mobile"]).show()

+-------+----+----------+------+-------+
|ratings| age|experience|family| mobile|
+-------+----+----------+------+-------+
| 3|null| 13.0| 3| null|
| 5| 27| null| 1| MI|
| 4|null| 9.0| 0| Oppo|
| 5| 27| 6.0| 2|Samsung|
| null| 32| null| 3| Vivo|
| 3| 22| 2.5| 0| Apple|
+-------+----+----------+------+-------+

In the following cell, we will remove rows that are exact duplicates, i.e. rows whose contents are identical to those of another row in the same DataFrame.
• Note: we need to assign the result to a named DataFrame in order to preserve our changes. Otherwise, the dropping and filtering that we have done above will not be stored and will only be transient.
In [63]:
#drop rows that have duplicate values in all columns, that means exact duplicates.
dfgood=dfbad.dropDuplicates()
In [64]:
#new count removing exact duplicates
dfgood.count()
Out[64]:
27

Next, we complete the final step of our data cleaning process by removing all the rows that have empty cells in any of the columns, and we re-assign the result to the dfgood DataFrame. Note that this cell starts from dfbad again, so only the rows containing null values are removed in this step.
In [65]:
dfgood=dfbad.na.drop()
dfgood.count()
Out[65]:
24

It is often good to have a quick visual inspection of the resulting DataFrame in order to confirm that everything went smoothly as we intended.
In [66]:
#show every row
dfgood.show(dfgood.count())

+-------+---+----------+------+-------+
|ratings|age|experience|family| mobile|
+-------+---+----------+------+-------+
| 5| 37| 23.0| 5| Vivo|
| 3| 22| 2.5| 0| Apple|
| 3| 27| 6.0| 0| MI|
| 5| 27| 6.0| 2|Samsung|
| 3| 37| 16.5| 5| Apple|
| 5| 27| 6.0| 0| MI|
| 4| 22| 6.0| 1| Oppo|
| 4| 27| 6.0| 1| Apple|
| 1| 37| 23.0| 5| MI|
| 2| 42| 23.0| 2| Oppo|
| 4| 37| 6.0| 0| Vivo|
| 5| 22| 2.5| 0|Samsung|
| 3| 37| 16.5| 5| Apple|
| 3| 42| 23.0| 5| MI|
| 2| 27| 9.0| 2|Samsung|
| 4| 27| 6.0| 1| Apple|
| 5| 27| 2.5| 0| MI|
| 2| 27| 6.0| 2| Oppo|
| 5| 37| 13.0| 1| Vivo|
| 2| 32| 16.5| 2| Oppo|
| 3| 27| 6.0| 0| MI|
| 3| 27| 6.0| 0| MI|
| 4| 22| 6.0| 1| Oppo|
| 4| 37| 6.0| 0| Vivo|
+-------+---+----------+------+-------+

Dropping an entire column¶
Due to various reasons, we may need to remove or drop an entire column. To do that, we use the drop() method.
In the following cell, we demonstrate how to remove the column named mobile and store the result in a new DataFrame named df_new.
There are various reasons why we might need to drop an entire column; the most obvious ones include exact column duplicates, completely empty columns, and irrelevant columns.
In [67]:
#drop column of dataframe
df_new=df.drop('mobile')
In [68]:
df_new.show(10)

+-------+---+----------+------+
|ratings|age|experience|family|
+-------+---+----------+------+
| 3| 32| 9.0| 3|
| 3| 27| 13.0| 3|
| 4| 22| 2.5| 0|
| 4| 37| 16.5| 4|
| 5| 27| 9.0| 1|
| 4| 27| 9.0| 0|
| 5| 37| 23.0| 5|
| 5| 37| 23.0| 5|
| 3| 22| 2.5| 0|
| 3| 27| 6.0| 0|
+-------+---+----------+------+
only showing top 10 rows

Saving files¶
In [69]:
#current working directory
!pwd

/Users/macuser/Downloads

Save in CSV format¶

Next, we will save the df DataFrame to disk in CSV format, where the columns of data are separated from one another by commas. This is one of the most common data file formats in use today; its main advantage is that it is a non-proprietary, plain-text format.
Please note that the target path given in the cell below is relative to the folder containing this notebook. If you want the output to be written somewhere else, provide the path to that location instead.
The header option has been set to true, which means Spark will write the column names as the first row of the output file.
Because a Spark DataFrame is split into partitions, the output is normally written as a directory containing one part file per partition; calling coalesce(1) first merges the data into a single partition so that only one CSV part file is produced.
In [70]:
#target directory
write_uri='./df_csv'
In [71]:
#save the dataframe as single csv
df.coalesce(1).write.format("csv").option("header","true").save(write_uri)
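Two small follow-up notes (our own addition, a sketch rather than part of the original notebook): by default the save fails if the target directory already exists, so re-runs usually need mode("overwrite"); and reading the result back is an easy way to verify the write.

#overwrite any existing output on re-run, then read it back to verify
df.coalesce(1).write.mode("overwrite").option("header","true").csv(write_uri)
spark.read.csv(write_uri,inferSchema=True,header=True).show(5)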