FIT5148 – Big data management and processing¶
Activity: Parallel Sort and GroupBy¶
This activity consists of two parts. In the first part, we will learn and build different serial/parallel sorting algorithms for the case where the volume of data to be sorted is large and stored in a database. In the second part, we focus on implementing serial/parallel GroupBy queries. GroupBy queries involving aggregates are very common in database processing, especially in Online Analytical Processing (OLAP) and data warehousing.
This activity will help you to learn how parallel sorting and GroupBy operations can be implemented for parallel database systems.
Instructions:
• You will be using Python 3.
• Read the code base and comments carefully
• After understanding the provided function, run the cell right below it to check if the result is correct.
• Read carefully all the Exercise tasks below. There are some code blocks that you need to complete yourself.
After this assignment you will:
• Be able to build serial/parallel sorting algorithms
• Be able to build serial/parallel GroupBy operations
Let’s get started!
What you need to remember:
• Run your cells using SHIFT+ENTER (or “Run cell”)
Dataset¶
In this activity, we use the following dataset R, which consists of numbers for simplicity. In the real world, each number would represent a record. R is our entire experimental record set, containing unordered numbers ranging from 1 to 16.
In [0]:
R = [8, 12, 16, 4, 11, 15, 3, 7, 14, 2, 6, 10, 1, 5, 9, 13]
Quicksort Algorithm¶
Throughout this activity, as an internal sorting method, we will use the quicksort method. In internal sorting, sorting takes place totally within main memory. The data to be sorted is assumed to be small and fits into main memory. This sorting method will be commonly used in the serial/parallel external sorting methods below.
In [0]:
def qsort(arr):
    """
    Quicksort a list
    Arguments:
    arr -- the input list to be sorted
    Return:
    result -- the sorted arr
    """
    if len(arr) <= 1:
        return arr
    else:
        # take the first element as the pivot
        pivot = arr[0]
        left_arr = [x for x in arr[1:] if x < pivot]
        right_arr = [x for x in arr[1:] if x >= pivot]
        # uncomment this to see the intermediate partitions
        # print("Left: " + str(left_arr) + " Pivot: " + str(pivot) + " Right: " + str(right_arr))
        value = qsort(left_arr) + [pivot] + qsort(right_arr)
        return value
In [0]:
qsort(R)
Out[0]:
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
Expected Output:
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
1. Serial External Sorting based on Sort-Merge¶
The serial sorting method we consider is serial external sorting, that is, external sorting in a uniprocessor environment. The most common serial external sorting algorithm is based on sort-merge. The underlying idea is that we (1) break the given record set into unsorted sub-record sets, (2) sort the sub-record sets, and (3) merge them into larger and larger sorted sub-record sets until the entire record set is sorted. In the real world, each sub-record set would be stored in a file.
Note¶
It is important to determine the size of each sub-record set to be sorted. Each sub-record set must be small enough to fit into the main memory. The size of these sub-record sets is determined by the buffer size in main memory, which is to be used for sorting each sub-record set internally.
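To get a feel for the cost, here is a quick back-of-the-envelope sketch (illustrative only, not part of the exercise; the values of N and B below are simply the ones used in this activity): with N records and a buffer of B records, the sort phase produces ceil(N/B) sorted runs, and each merge pass combines up to B - 1 runs at a time, since one buffer page is reserved for output.
In [0]:
import math

# Illustrative only: run/pass arithmetic for sort-merge,
# assuming N = 16 records and a buffer of B = 4 records, with more than one run
N, B = 16, 4
runs = math.ceil(N / B)                    # sort phase produces 4 sorted runs
passes = math.ceil(math.log(runs, B - 1))  # merge passes using (B-1)-way merging
print(runs, "runs,", passes, "merge passes")  # 4 runs, 2 merge passes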
Exercise: Understand and run the following serial external sorting algorithm. Then, discuss the time complexity of this algorithm as well as its pros and cons.
In [0]:
# Let's first look at the 'k-way merging algorithm' that will be used
# to merge sub-record sets in our external sorting algorithm.
import sys

# Find the smallest record
def find_min(records):
    """
    Find the smallest record
    Arguments:
    records -- the input record set
    Return:
    result -- the smallest record's index
    """
    m = records[0]
    index = 0
    for i in range(len(records)):
        if (records[i] < m):
            index = i
            m = records[i]
    return index

def k_way_merge(record_sets):
    """
    K-way merging algorithm
    Arguments:
    record_sets -- the set of multiple sorted sub-record sets
    Return:
    result -- the sorted and merged record set
    """
    # indexes keeps the current position within each of the given buffers
    indexes = []
    for x in record_sets:
        indexes.append(0)  # initialisation with 0

    # the final result will be stored in this variable
    result = []
    while True:
        merged_result = []  # the merging unit (i.e. one record from each of the given buffers)

        # This loop gets the record at the current position of every buffer
        for i in range(len(record_sets)):
            if (indexes[i] >= len(record_sets[i])):
                # this buffer is exhausted; use a sentinel value
                merged_result.append(sys.maxsize)
            else:
                merged_result.append(record_sets[i][indexes[i]])

        # find the smallest record
        smallest = find_min(merged_result)

        # if we only have sys.maxsize in the list, we have reached the end of every record set
        if (merged_result[smallest] == sys.maxsize):
            break

        # This record is the next one on the merged list
        result.append(record_sets[smallest][indexes[smallest]])
        indexes[smallest] += 1
    return result
In [0]:
# Test k-way merging method
buffers = [[1, 2, 3, 4, 8, 13], [5, 6, 7, 11, 12], [9, 10, 14, 15, 16]]
result = k_way_merge(buffers)
print(result)
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
Expected Output:
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
In [0]:
def serial_sorting(dataset, buffer_size):
    """
    Perform a serial external sorting method based on sort-merge
    The buffer size determines the size of each sub-record set
    Arguments:
    dataset -- the entire record set to be sorted
    buffer_size -- the buffer size determining the size of each sub-record set
    Return:
    result -- the sorted record set
    """
    if (buffer_size <= 2):
        print("Error: buffer size should be greater than 2")
        return

    result = []

    ### START CODE HERE ###
    # --- Sort Phase ---
    sorted_set = []

    # Read buffer_size records at a time into memory,
    # sort them, and write out a sorted sub-record set (variable: sorted_subset)
    start_pos = 0
    N = len(dataset)
    while True:
        if ((N - start_pos) > buffer_size):
            # read B records from the input, where B = buffer_size
            subset = dataset[start_pos:start_pos + buffer_size]
            # sort the subset (using quicksort defined above)
            sorted_subset = qsort(subset)
            sorted_set.append(sorted_subset)
            start_pos += buffer_size
        else:
            # read the remaining records from the input (fewer than buffer_size)
            subset = dataset[start_pos:]
            # sort the subset (using quicksort defined above)
            sorted_subset = qsort(subset)
            sorted_set.append(sorted_subset)
            break

    # --- Merge Phase ---
    # one buffer page is reserved for output, so we merge (buffer_size - 1) runs at a time
    merge_buffer_size = buffer_size - 1
    dataset = sorted_set
    while True:
        merged_set = []
        N = len(dataset)
        start_pos = 0
        while True:
            if ((N - start_pos) > merge_buffer_size):
                # read C sorted runs, where C = merge_buffer_size
                subset = dataset[start_pos:start_pos + merge_buffer_size]
                merged_set.append(k_way_merge(subset))  # merge the runs in subset
                start_pos += merge_buffer_size
            else:
                # read the remaining runs (fewer than merge_buffer_size)
                subset = dataset[start_pos:]
                merged_set.append(k_way_merge(subset))  # merge the runs in subset
                break
        dataset = merged_set
        if (len(dataset) <= 1):  # stop when a single merged record set remains
            result = merged_set
            break
    ### END CODE HERE ###
    return result
In [0]:
result = serial_sorting(R, 4)
print("final sorting result:" + str(result))
final sorting result:[[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]]
Expected Output:
final sorting result:[[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]]
2. Algorithms for Parallel External Sort¶
Having practiced how serial external sorting works, let's move on to building parallel sorting methods. In the lectures, you have learned a number of different parallel sorting methods. For this activity, we focus on two widely used parallel external sorting methods: (1) parallel merge-all sort, and (2) parallel binary-merge sort.
2.1 Parallel Merge-All Sort¶
The parallel merge-all sort method is a traditional approach composed of two phases: (1) local sort and (2) final merge. The first phase is carried out independently in each processor; local sorting in each processor is performed as per a normal serial external sorting mechanism. In the final merge phase, the results from the local sort phase are merged. This phase is carried out by one processor, namely the host, using k-way merging (see the function k_way_merge() above).
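Before parallelising anything, the overall shape of the method can be previewed serially. The following is a minimal sketch (illustrative only; it uses qsort as a stand-in for the local external sort, and hard-codes the round-robin partitions since rr_partition() is only defined in the next subsection):
In [0]:
# Serial sketch of merge-all sort: phase 1 sorts each partition locally;
# phase 2 merges all sorted runs at the host in a single k-way merge.
partitions = [[8, 11, 14, 1], [12, 15, 2, 5], [16, 3, 6, 9], [4, 7, 10, 13]]
locally_sorted = [qsort(p) for p in partitions]  # phase 1: one sort per "processor"
print(k_way_merge(locally_sorted))               # phase 2: final merge at the host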
Exercise: Complete the parallel merge-all sort algorithm by implementing the following code block between '### START CODE HERE ###' and '### END CODE HERE ###'. Assume that we use the serial sorting method defined above (see serial_sorting()) and the round-robin data partitioning method designed in the "Parallel Search" activity (see rr_partition() below). Further, discuss the pros and cons of this algorithm, and compare it with the serial external sorting algorithm above.
Use the round-robin data partitioning method¶
As a prerequisite step, we first need to partition the given data into a number of subsets according to the number of parallel processors available. As mentioned above, let's assume that we use the round-robin partitioning method. Refer to the "Parallel Search" activity and copy the rr_partition() function below.
In [0]:
# Round-robin data partitioning function
def rr_partition(data, n):
    """
    Perform data partitioning on data
    Arguments:
    data -- an input dataset which is a list
    n -- the number of processors
    Return:
    result -- the partitioned subsets of data
    """
    result = []
    for i in range(n):
        result.append([])

    ### START CODE HERE ###
    # Assign each element to a bin in a round-robin fashion
    for index, element in enumerate(data):
        # the bin index cycles through 0 .. n-1
        index_bin = index % n
        result[index_bin].append(element)
    ### END CODE HERE ###
    return result
In [0]:
# Test the round-robin partitioning function
result = rr_partition(R, 4)
print(result)
[[8, 11, 14, 1], [12, 15, 2, 5], [16, 3, 6, 9], [4, 7, 10, 13]]
Expected Output:
[[8, 11, 14, 1], [12, 15, 2, 5], [16, 3, 6, 9], [4, 7, 10, 13]]
In [0]:
# Include this package for parallel processing
import multiprocessing as mp

def parallel_merge_all_sorting(dataset, n_processor, buffer_size):
    """
    Perform a parallel merge-all sorting method
    Arguments:
    dataset -- entire record set to be sorted
    n_processor -- number of parallel processors
    buffer_size -- buffer size determining the size of each sub-record set
    Return:
    result -- the merged record set
    """
    if (buffer_size <= 2):
        print("Error: buffer size should be greater than 2")
        return

    result = []

    ### START CODE HERE ###
    # Pre-requisite: perform data partitioning using round-robin partitioning
    subsets = rr_partition(dataset, n_processor)

    # Pool: a Python class enabling parallel processing
    pool = mp.Pool(processes = n_processor)

    # ----- Sort phase -----
    sorted_set = []
    for s in subsets:
        # call the serial_sorting method above; it returns a nested list
        # (e.g. [[...]]), so we unpack it with * before appending
        sorted_set.append(*pool.apply_async(serial_sorting, [s, buffer_size]).get())
    pool.close()

    # ----- Final merge phase -----
    print("sorted entire set:" + str(sorted_set))
    result = k_way_merge(sorted_set)
    ### END CODE HERE ###
    return result
In [0]:
result = parallel_merge_all_sorting(R, 4, 4)
print("final result:" + str(result))
sorted entire set:[[1, 8, 11, 14], [2, 5, 12, 15], [3, 6, 9, 16], [4, 7, 10, 13]]
final result:[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
Expected Output:
sorted entire set:[[1, 8, 11, 14], [2, 5, 12, 15], [3, 6, 9, 16], [4, 7, 10, 13]]
final result:[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
2.2 Parallel Binary-Merge Sort¶
The parallel binary-merge sort method, like the parallel merge-all sort, consists of two phases: (1) local sort and (2) final merge. The first phase is the same as in the parallel merge-all sort. The second phase, the merging phase, is pipelined instead of concentrated in one processor. In this phase, we take the results from two processors and merge them in one processor, which is called binary merging. The result of merging between two processors is passed on to the next level, until only one processor (the host) is left.
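The merging pattern can be previewed serially before adding multiprocessing. A minimal sketch (illustrative only; in the exercise below each pairwise merge at a level is assigned to its own process): at every level, adjacent pairs of sorted runs are merged, halving the number of runs until one remains.
In [0]:
# Serial sketch of binary merging: merge adjacent pairs of runs level by
# level; an unpaired run at the end of a level is carried up unchanged.
runs = [[1, 8, 11, 14], [2, 5, 12, 15], [3, 6, 9, 16], [4, 7, 10, 13]]
while len(runs) > 1:
    runs = [k_way_merge(runs[i:i + 2]) for i in range(0, len(runs), 2)]
print(runs[0])  # the fully merged, sorted record set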
Exercise: Complete the parallel binary-merge sort algorithm by implementing the following code block between '### START CODE HERE ###' and '### END CODE HERE ###'. Assume that we use the same partitioning method as the parallel merge-all sort (i.e. rr_partition()). Further, discuss the pros and cons of this algorithm, comparing it with the parallel merge-all sort method.
In [0]:
# Include this package for parallel processing
import multiprocessing as mp

def parallel_binary_merge_sorting(dataset, n_processor, buffer_size):
    """
    Perform a parallel binary-merge sorting method
    Arguments:
    dataset -- entire record set to be sorted
    n_processor -- number of parallel processors
    buffer_size -- buffer size determining the size of each sub-record set
    Return:
    result -- the merged record set
    """
    if (buffer_size <= 2):
        print("Error: buffer size should be greater than 2")
        return

    result = []

    ### START CODE HERE ###
    # Pre-requisite: perform data partitioning using round-robin partitioning
    subsets = rr_partition(dataset, n_processor)

    # Pool: a Python class enabling parallel processing
    pool = mp.Pool(processes = n_processor)

    # ----- Sort phase -----
    sorted_set = []
    for s in subsets:
        # call the serial_sorting method above; it returns a nested list,
        # so we unpack it with * before appending
        sorted_set.append(*pool.apply_async(serial_sorting, [s, buffer_size]).get())
    pool.close()

    # ----- Final merge phase -----
    print("sorted entire set:" + str(sorted_set))
    dataset = sorted_set
    while True:
        merged_set = []
        N = len(dataset)
        start_pos = 0
        pool = mp.Pool(processes = N // 2)
        while True:
            if ((N - start_pos) > 2):
                # merge the next pair of sorted runs in one processor
                subset = dataset[start_pos:start_pos + 2]
                merged_set.append(pool.apply(k_way_merge, [subset]))
                start_pos += 2
            else:
                # merge the remaining run(s)
                subset = dataset[start_pos:]
                merged_set.append(pool.apply(k_way_merge, [subset]))
                break
        pool.close()
        dataset = merged_set
        if (len(dataset) == 1):  # stop when a single merged record set remains
            result = merged_set
            break
    ### END CODE HERE ###
    return result
In [0]:
result = parallel_binary_merge_sorting(R, 4, 3)
print("final result:" + str(result))
sorted entire set:[[1, 8, 11, 14], [2, 5, 12, 15], [3, 6, 9, 16], [4, 7, 10, 13]]
final result:[[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]]
Expected Output:
sorted entire set:[[1, 8, 11, 14], [2, 5, 12, 15], [3, 6, 9, 16], [4, 7, 10, 13]]
final result:[[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]]
3. Parallel Algorithms for GroupBy Queries¶
Parallel aggregate processing is very similar to parallel sorting. Building on the lessons learned from parallel sorting, we focus on one parallel aggregate query algorithm: the traditional merge-all method.
3.1 Merge-All GroupBy Method¶
This method takes two phases: (1) a local aggregation step, and (2) a global aggregation step. In the first step, each processor groups local records according to the designated group-by attribute and performs the aggregate function. The second step is a global aggregation step, in which all the temporary results obtained in each node are passed to the host for consolidation in order to produce the global aggregate values.
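A brief aside on why these two steps compose correctly (an observation, not part of the exercise): SUM is distributive, so adding up the per-processor partial sums yields exactly the global sum. COUNT, MIN, and MAX behave the same way, whereas AVG would instead require shipping (sum, count) pairs to the host. A tiny sketch with made-up partial results:
In [0]:
# Illustrative only: partial SUMs from two processors combine at the host
partials = [{'A': 11, 'B': 22, 'C': 33}, {'A': 44, 'B': 55, 'C': 66}]
merged = {}
for p in partials:
    for key, val in p.items():
        merged[key] = merged.get(key, 0) + val
print(merged)  # {'A': 55, 'B': 77, 'C': 99}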
The dataset for the GroupBy implementation¶
Let's assume that we have two datasets, D1 and D2, where each dataset will be handled by one processor in the local aggregation step. In the second, global aggregation step, the aggregated results will be handled by the host. Each record is represented by a nominal key and a numeric value. Note that duplicate keys exist within D1 and D2. For our GroupBy implementation, we aggregate pairs of keys and values according to the key attribute.
In [0]:
D1 = [('A', 1), ('B', 2), ('C', 3), ('A', 10), ('B', 20), ('C', 30)]
D2 = [('A', 4), ('B', 5), ('C', 6), ('A', 40), ('B', 50), ('C', 60)]
Exercise: Understand and run the first phase of the parallel merge-all GroupBy method.
In [0]:
# The first step in the merge-all groupby method
def local_groupby(dataset):
    """
    Perform a local groupby method
    Arguments:
    dataset -- the record set to be aggregated locally
    Return:
    result -- the aggregated records as a dictionary keyed by the group-by attribute
    """
    result = {}  # use 'result' to avoid shadowing the built-in name 'dict'
    for record in dataset:
        key = record[0]
        val = record[1]
        if key not in result:
            result[key] = 0
        result[key] += val
    return result
In [0]:
result = local_groupby(D1)
print(result)
result = local_groupby(D2)
print(result)
{'A': 11, 'B': 22, 'C': 33}
{'A': 44, 'B': 55, 'C': 66}
Expected Output:
{'A': 11, 'B': 22, 'C': 33}
{'A': 44, 'B': 55, 'C': 66}
Exercise: Complete the parallel merge-all groupby algorithm by implementing the following code block between '### START CODE HERE ###' and '### END CODE HERE ###'. You need to use the local aggregation method defined above (i.e. local_groupby()).
In [0]:
import multiprocessing as mp

def parallel_merge_all_groupby(dataset):
    """
    Perform a parallel merge-all groupby method
    Arguments:
    dataset -- the set of sub-datasets, each to be aggregated by one processor
    Return:
    result -- the aggregated record dictionary according to the group-by attribute
    """
    result = {}

    ### START CODE HERE ###
    # Define the number of parallel processors: the number of sub-datasets
    n_processor = len(dataset)

    # Pool: a Python class enabling parallel processing
    pool = mp.Pool(processes = n_processor)

    # ----- Local aggregation step -----
    local_result = []
    for s in dataset:
        # call the local aggregation method on each sub-dataset
        local_result.append(pool.apply(local_groupby, [s]))
    pool.close()

    # ----- Global aggregation step -----
    # Let's assume that the global aggregate function is SUM.
    for r in local_result:
        for key, val in r.items():
            if key not in result:
                result[key] = 0
            result[key] += val
    ### END CODE HERE ###
    return result
In [0]:
E = [D1, D2]
result = parallel_merge_all_groupby(E)
print(result)
{'A': 55, 'B': 77, 'C': 99}
Expected Output:
{'A': 55, 'B': 77, 'C': 99}
Congratulations on finishing this activity!
Wrap up what we've learned:
• Internal sorting takes place entirely within main memory; the data to be sorted is assumed to be small enough to fit into main memory. External sorting, on the other hand, is used when the volume of data to be sorted is large and resides in secondary memory. Thus external sorting is also known as file sorting.
• We practiced how serial external sorting can be implemented using the k-way merge operation.
• We are now able to build parallel external sorting methods on top of the serial external sorting method: (1) parallel merge-all sort, and (2) parallel binary-merge sort.
• We are now able to build a parallel groupby method consisting of two phases: (1) a local aggregation step, and (2) a global aggregation step.