FIT5148 – Big data management and processing¶
Activity: Parallel Join¶
In this activity, we will learn and build different parallel algorithms for join queries. This practice will help you understand how parallel processing of a join operation can significantly improve on the serial join operation, which is considered one of the most expensive operations in relational database processing.
Instructions:
• You will be using Python 3.
• Read the code base and comments carefully
• After understanding the provided function, run the cell right below it to check if the result is correct.
• Read carefully all the Exercise tasks below in each subheading. There are some code blocks that you need to complete yourself.
After this assignment you will:
• Be able to build serial join algorithms
• Be able to build parallel join algorithms
• Be able to compare the performance of the serial and parallel join algorithms
Let’s get started!
What you need to remember:
• Run your cells using SHIFT+ENTER (or “Run cell”)
Dataset¶
In this activity, we use the following two tables R and S to explain and practice different parallel join algorithms.
In [0]:
# R consists of 15 pairs, each comprising two attributes (a nominal value and a numeric join key)
R = [('Adele',8),('Bob',22),('Clement',16),('Dave',23),('Ed',11),
     ('Fung',25),('Goel',3),('Harry',17),('Irene',14),('Joanna',2),
     ('Kelly',6),('Lim',20),('Meng',1),('Noor',5),('Omar',19)]
# S consists of 9 pairs, each comprising two attributes (a nominal value and a numeric join key)
S = [('Arts',8),('Business',15),('CompSc',2),('Dance',12),('Engineering',7),
     ('Finance',21),('Geology',10),('Health',11),('IT',18)]
1. Serial Join Algorithms¶
Let’s first understand serial join algorithms – join algorithms implemented in nonparallel machines. Parallel join algorithms adopt a data partitioning parallelism approach, whereby parallelism is achieved through data partitioning. That is, a join operation implemented on each processor would employ a serial join algorithm. In Section 2, we will learn more about parallel join algorithms.
In this activity, we will consider the following three serial join algorithms:
• Nested-loop join algorithm,
• Sort-merge join algorithm,
• Hash-based join algorithm
1.1 Nested-Loop Join Algorithm¶
Nested-loop join is the simplest form of join algorithm. For each record of the first table, it goes through all records of the second table. This is repeated for all records of the first table. It is called a nested loop because it consists of two levels of loops: inner loop (looping for the second table) and outer loop (looping for the first table).
Exercise: Implement the nested-loop join algorithm using the join attribute – the numeric attribute in two tables R and S. Then, discuss the time complexity of this algorithm as well as its pros and cons.
In [0]:
def NL_join(T1, T2):
    """
    Perform the nested-loop join algorithm.
    The join attribute is the numeric attribute in the input tables T1 & T2
    Arguments:
    T1 & T2 -- Tables to be joined
    Return:
    result -- the joined table
    """
    result = []
    ### START CODE HERE ###
    # For each record of the outer table T1
    for tr1 in T1:
        # For each record of the inner table T2
        for tr2 in T2:
            # If the join attribute values match Then
            if tr1[1] == tr2[1]:
                # Store the joined record into the result list
                result.append({", ".join([tr1[0], str(tr1[1]), tr2[0]])})
    ### END CODE HERE ###
    return result
In [0]:
NL_join(R, S)
Expected Output:
[{'Adele, 8, Arts'}, {'Ed, 11, Health'}, {'Joanna, 2, CompSc'}]
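As an aid for the complexity discussion the exercise asks for, the sketch below (an assumed helper, not part of the exercise solution) counts the key comparisons a nested-loop join performs. The count is always |T1| × |T2| regardless of how many records actually match, i.e. O(n·m):

```python
# Count the record-pair comparisons performed by a nested loop.
# Any iterables stand in for the tables; only pair counting matters here.
def nl_join_comparisons(T1, T2):
    comparisons = 0
    for tr1 in T1:
        for tr2 in T2:
            comparisons += 1  # one comparison per record pair
    return comparisons

# With the 15-record R and 9-record S above: 15 * 9 comparisons
print(nl_join_comparisons(range(15), range(9)))  # 135
```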
1.2 Sort-Merge Join Algorithm¶
Sort-merge join is based on sorting and merging operations. The first step is to sort the two tables on the join attribute in ascending order; the second step is to merge the two sorted tables. If the value of the join attribute in R is smaller than that in S, the algorithm skips to the next value of the join attribute in R. Conversely, if the value of the join attribute in R is greater than that in S, it skips to the next value of the join attribute in S. When the two values match, the two corresponding records are concatenated and placed into the query result.
Exercise: Complete the sort-merge join algorithm based on the above definition by implementing the following code block between ‘### START CODE HERE ###’ and ‘### END CODE HERE ###’. Discuss the time complexity of this algorithm in terms of its efficiency. Also, compare it with the nested-loop join algorithm.
In [0]:
def SM_join(T1, T2):
    """
    Perform the sort-merge join algorithm.
    The join attribute is the numeric attribute in the input tables T1 & T2
    Arguments:
    T1 & T2 -- Tables to be joined
    Return:
    result -- the joined table
    """
    result = []
    # Sort T1 based on the join attribute
    s_T1 = sorted(T1, key=lambda t: t[1])
    # Sort T2 based on the join attribute
    s_T2 = sorted(T2, key=lambda t: t[1])
    ### START CODE HERE ###
    i = j = 0
    while True:
        r = s_T1[i][1]
        s = s_T2[j][1]
        # If join attribute s_T1(i) < join attribute s_T2(j) Then advance i
        if r < s:
            i += 1
        # If join attribute s_T1(i) > join attribute s_T2(j) Then advance j
        elif r > s:
            j += 1
        else:
            # Put records s_T1(i) and s_T2(j) into the result and advance i
            result.append({", ".join([s_T1[i][0], str(s_T1[i][1]), s_T2[j][0]])})
            i += 1
        # If either s_T1 or s_T2 is exhausted Then break
        if (i == len(s_T1)) or (j == len(s_T2)):
            break
    ### END CODE HERE ###
    return result
In [0]:
# Run the sort-merge join
SM_join(R, S)
Expected Output:
[{'Joanna, 2, CompSc'}, {'Adele, 8, Arts'}, {'Ed, 11, Health'}]
1.3 Hash-Based Join Algorithm¶
A hash-based join is basically made up of two processes: hashing and probing. A hash table is created by hashing all records of the first table using a particular hash function. Records from the second table are also hashed with the same hash function and probed. If any match is found, the two records are concatenated and placed in the query result.
A decision must be made about which table is to be hashed and which table is to be probed. Since a hash table has to be created, it would be better to choose the smaller table for hashing and the larger table for probing.
Exercise: Complete the hash-based join algorithm by implementing the following code block between ‘### START CODE HERE ###’ and ‘### END CODE HERE ###’. Discuss the time complexity of this algorithm in terms of its efficiency. Also, compare it with the above two join algorithms.
In [0]:
def H(r):
    """
    A hash function 'H' used in the hashing process. It works by summing
    the digits of the hashed attribute, which in this case is the join
    attribute.
    Arguments:
    r -- a record whose join attribute will be hashed
    Return:
    result -- the hash index of the record r
    """
    # Convert the value of the join attribute into its digits
    digits = [int(d) for d in str(r[1])]
    # Calculate the sum of the elements in digits
    return sum(digits)
In [0]:
H(R[1])
Expected Output:
4
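Note that different join-attribute values can hash to the same index, e.g. 8 and 17 (1 + 7 = 8) both map to index 8 under H. This is why the probing phase below still compares the actual join values rather than trusting the bucket alone. A quick check (re-stating H so the cell is self-contained):

```python
# Digit-sum hash function, as defined above
def H(r):
    return sum(int(d) for d in str(r[1]))

# Two records from R with different keys that collide in the hash table
print(H(('Adele', 8)), H(('Harry', 17)))  # both hash to index 8
```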
In [0]:
def HB_join(T1, T2):
    """
    Perform the hash-based join algorithm.
    The join attribute is the numeric attribute in the input tables T1 & T2
    Arguments:
    T1 & T2 -- Tables to be joined
    Return:
    result -- the joined table
    """
    result = []
    ### START CODE HERE ###
    dic = {}  # We use a dictionary as the hash table
    # For each record in table T2 (hashing)
    for s in T2:
        # Hash the record on its join attribute value using hash function H
        s_key = H(s)
        if s_key in dic:
            dic[s_key].add(s)  # If there is already an entry
        else:
            dic[s_key] = {s}
    # For each record in table T1 (probing)
    for r in T1:
        # Hash the record on its join attribute value using H
        r_key = H(r)
        # If an index entry is found Then
        if r_key in dic:
            # Compare each record in this index entry with the record of table T1
            for item in dic[r_key]:
                if item[1] == r[1]:
                    # Put the joined record into the result
                    result.append({", ".join([r[0], str(r[1]), item[0]])})
    ### END CODE HERE ###
    return result
In [0]:
# Run the hash-based join
HB_join(R, S)
Expected Output:
[{'Adele, 8, Arts'}, {'Ed, 11, Health'}, {'Joanna, 2, CompSc'}]
2 Parallel Join Algorithms¶
Parallelism of join queries is achieved through data parallelism, whereby the same task is applied to different parts of the data. That is, after data partitioning has been completed, each processor will have its own data to work with. Thus, each processor will apply any serial join algorithm. Once this is carried out in each processor, the final results of the join operation are consolidated from the results obtained from different processors.
We now look into the following two parallel join algorithms:
• Divide and Broadcast-Based Parallel Join Algorithms
• Disjoint Partitioning-Based Parallel Join Algorithms
2.1 Divide and Broadcast-Based Parallel Join Algorithms¶
These algorithms consist of two stages: (1) data partitioning using the divide and broadcast method and (2) a local join.
The divide and broadcast data partitioning method consists of dividing one table into multiple disjoint partitions, where each partition is allocated to a processor, and broadcasting the other table to all available processors. Dividing one table may be done simply by equal division, so that each partition has the same size, whereas broadcasting means replicating the content of the second table to all processors. Thus it is preferable for the smaller table to be broadcast and the larger table to be divided.
Exercise: Understand how a divide and broadcast-based parallel join algorithm works given the tables R and S above. We assume that the hash-based join algorithm (i.e. HB_join(.)) is used (see above) and that the round-robin data partitioning function designed for the “Parallel Search” activity (i.e. see below: rr_partition(.)) is used to implement this parallel join algorithm. We also assume a shared-memory architecture, so there is no replication of the broadcast table S.
In [0]:
# Round-robin data partitioning function
def rr_partition(data, n):
    """
    Perform round-robin data partitioning on data
    Arguments:
    data -- an input dataset which is a list
    n -- the number of processors
    Return:
    result -- the partitioned subsets of data
    """
    result = []
    for i in range(n):
        result.append([])
    ### START CODE HERE ###
    # Assign each element to a bin in a round-robin fashion
    for index, element in enumerate(data):
        # Calculate the index of the bin to which the current element is assigned
        index_bin = index % n
        result[index_bin].append(element)
    ### END CODE HERE ###
    return result
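Since there is no sample run for rr_partition above, here is a quick sanity check on a small hypothetical list (the function is re-stated compactly so the cell is self-contained): element i goes to bin i % n, so bin sizes differ by at most one.

```python
# Compact equivalent of rr_partition above
def rr_partition(data, n):
    result = [[] for _ in range(n)]
    for index, element in enumerate(data):
        result[index % n].append(element)
    return result

print(rr_partition(['a', 'b', 'c', 'd', 'e'], 3))
# [['a', 'd'], ['b', 'e'], ['c']]
```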
In [0]:
# Include this package for parallel processing
import multiprocessing as mp

def DDP_join(T1, T2, n_processor):
    """
    Perform a divide and broadcast-based parallel join algorithm.
    The join attribute is the numeric attribute in the input tables T1 & T2
    Arguments:
    T1 & T2 -- Tables to be joined
    n_processor -- the number of parallel processors
    Return:
    results -- the joined table
    """
    results = []
    ### START CODE HERE ###
    # Partition T1 into sub-tables using rr_partition().
    # The number of sub-tables must be equal to n_processor
    T1_subsets = rr_partition(T1, n_processor)
    # Pool: a Python class enabling parallel processing
    pool = mp.Pool(processes=n_processor)
    midResults = []
    for t1 in T1_subsets:
        # Apply a local join on each processor.
        # Note that as we assume a shared-memory architecture, no replication
        # of the broadcast table (in this case the smaller table T2) occurs.
        output = pool.apply_async(HB_join, [t1, T2])
        midResults.append(output)
    for result in midResults:
        results.append(result.get())
    ### END CODE HERE ###
    return results
In [0]:
n_processor = 3
DDP_join(R, S, n_processor)
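DDP_join returns one sub-list per processor rather than a single joined table. With the tables above and 3 round-robin partitions, the per-processor results should look like the list sketched below (the third processor finds no matches); a flat joined table can then be obtained with a list comprehension:

```python
# Expected shape of the DDP_join output: one result list per processor
per_processor = [[{'Adele, 8, Arts'}, {'Joanna, 2, CompSc'}],
                 [{'Ed, 11, Health'}],
                 []]

# Flatten the per-processor lists into a single joined table
flat = [record for sub in per_processor for record in sub]
print(flat)
```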
2.2 Disjoint Partitioning-Based Parallel Join Algorithms¶
These algorithms also consist of two stages: a data partitioning stage using a disjoint partitioning and a local join. For the data partitioning, a disjoint partitioning, such as range partitioning or hash partitioning, may be used.
Exercise: Complete the following disjoint partitioning-based parallel join algorithm.
Use all the three serial join algorithms above, and see whether the joined results are the same or not:
• Nested-loop join algorithm
• Sort-merge join algorithm
• Hash-based join algorithm
As a data partitioning method, use the range partitioning method provided for the “Parallel Search” activity (i.e. range_partition(.)). Assume that we have 3 parallel processors: processor 1 gets records with join attribute values between 1 and 9, processor 2 between 10 and 19, and processor 3 between 20 and 29. Note that you need to modify this function so that it partitions the table on the join attribute.
Note that both tables R and S need to be partitioned based on the join attribute with the same range partitioning function.
In [0]:
# Range data partitioning function (modified as instructed above to
# partition on the join attribute)
def range_partition(data, range_indices):
    """
    Perform range data partitioning on data based on the join attribute
    Arguments:
    data -- an input dataset which is a list
    range_indices -- the list of range boundaries used to split the data
    Return:
    result -- the partitioned subsets of data
    """
    result = []
    ### START CODE HERE ###
    # First, sort the dataset according to the join attribute values
    new_data = sorted(data, key=lambda x: x[1])
    # Calculate the number of range boundaries
    n_bin = len(range_indices)
    # For each range, perform the following
    for i in range(n_bin):
        # Find the elements belonging to this range
        s = [x for x in new_data if x[1] < range_indices[i]]
        # Add the partitioned list to the result
        result.append(s)
        # Keep only the elements that have not yet been assigned
        new_data = [x for x in new_data if x[1] >= range_indices[i]]
    # Append the remaining elements as the last partition
    result.append(new_data)
    ### END CODE HERE ###
    return result
In [0]:
range_partition(R, [10, 20])
Expected Output:
[[('Meng', 1), ('Joanna', 2), ('Goel', 3), ('Noor', 5), ('Kelly', 6), ('Adele', 8)],
 [('Ed', 11), ('Irene', 14), ('Clement', 16), ('Harry', 17), ('Omar', 19)],
 [('Lim', 20), ('Bob', 22), ('Dave', 23), ('Fung', 25)]]
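Partitioning S with the same boundaries keeps matching join keys in the same partition pair, which is what makes the disjoint local joins correct. A quick check (the table and a compact equivalent of the function above are re-stated so the cell is self-contained):

```python
# Table S as defined at the top of the activity
S = [('Arts',8),('Business',15),('CompSc',2),('Dance',12),('Engineering',7),
     ('Finance',21),('Geology',10),('Health',11),('IT',18)]

# Compact equivalent of range_partition above
def range_partition(data, range_indices):
    new_data = sorted(data, key=lambda x: x[1])
    result = []
    for b in range_indices + [float('inf')]:
        result.append([x for x in new_data if x[1] < b])
        new_data = [x for x in new_data if x[1] >= b]
    return result

print(range_partition(S, [10, 20]))  # 3 partitions of sizes 3, 5 and 1
```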
In [0]:
# Include this package for parallel processing
import multiprocessing as mp

def DPBP_join(T1, T2, n_processor):
    """
    Perform a disjoint partitioning-based parallel join algorithm.
    The join attribute is the numeric attribute in the input tables T1 & T2
    Arguments:
    T1 & T2 -- Tables to be joined
    n_processor -- the number of parallel processors
    Return:
    results -- the joined table
    """
    results = []
    ### START CODE HERE ###
    # Partition T1 & T2 into sub-tables using range_partition().
    # The number of sub-tables must be equal to n_processor, and both tables
    # must be partitioned with the same range boundaries
    T1_subsets = range_partition(T1, [10, 20])
    T2_subsets = range_partition(T2, [10, 20])
    # Pool: a Python class enabling parallel processing
    pool = mp.Pool(processes=n_processor)
    midResults = []
    for i in range(len(T1_subsets)):
        # Apply a local join on each processor
        output = pool.apply_async(HB_join, [T1_subsets[i], T2_subsets[i]])
        midResults.append(output)
    for result in midResults:
        results.append(result.get())
    ### END CODE HERE ###
    return results
In [0]:
n_processor = 3
DPBP_join(R, S, n_processor)
Congratulations on finishing this activity!
Wrap up what we’ve learned:
• The join operation is one of the most expensive operations in relational query processing, and hence parallelizing the join operation brings significant benefits.
• Parallel join algorithms are generally formed in two stages: data partitioning and local join.
• Data partitioning can be performed by two approaches – divide and broadcast, or disjoint partitioning.
• We are now able to build different local join operations: nested-loop join, sort-merge join, and hash-based join.
• We are now able to build different parallel join algorithms using different data partitioning methods and serial join operations.