FIT5148 – Distributed Databases and Big Data¶
Take Home Test – Solution Workbook¶
This test consists of three questions, worth 5% of the final marks in total. The first question is related to Parallel Search Algorithms (1 mark), the second question is related to Parallel Join Algorithms (2 marks) and the third question is related to Parallel Sort and GroupBy Algorithms (2 marks).
Instructions:
• You will be using Python 3.
• Read the instructions, code base and comments carefully.
• There are code blocks that you need to complete yourself as part of the test.
• **Comment each line of code properly such that the tutor can easily understand what you are trying to do in the code.**
Your Details:
• Name:
• StudentID:
• Email:
Let’s get started!
Dataset¶
For this test, we will use the following two tables, R and S, to write the solutions to the three parallel algorithms.
In [0]:
# R consists of 15 pairs, each comprising two attributes (nominal and numeric)
R = [('Adele',8),('Bob',22),('Clement',16),('Dave',23),('Ed',11),
     ('Fung',25),('Goel',3),('Harry',17),('Irene',14),('Joanna',2),
     ('Kelly',6),('Lim',20),('Meng',1),('Noor',5),('Omar',19)]
# S consists of 9 pairs, each comprising two attributes (nominal and numeric)
S = [('Arts',8),('Business',15),('CompSc',2),('Dance',12),('Engineering',7),
     ('Finance',21),('Geology',10),('Health',11),('IT',18)]
1. Parallel Searching Algorithm¶
In this task, you will build a parallel search algorithm for range selection (a continuous range) for a given query. You will implement the particular search algorithm described below.

Implement a parallel search algorithm that uses the linear search algorithm (i.e. linear_search()) and works with the hash partitioning method (i.e. h_partition()). Complete the code block between "### START CODE HERE ###" and "### END CODE HERE ###".
In [0]:
# Linear search function
def linear_search(data, key):
    """
    Perform linear search on data for the given key
    Arguments:
    data -- an input dataset which is a list or a numpy array
    key -- a query key value
    Return:
    matched_records -- the records that match the key
    """
    matched_records = []
    ### START CODE HERE ###
    ### END CODE HERE ###
    return matched_records
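One possible way to complete the body between the markers (a minimal sketch, assuming the numeric attribute at index 1 of each record is the search key):

    for record in data:
        # A record matches when its numeric attribute equals the key
        if record[1] == key:
            matched_records.append(record)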
In [0]:
# Define a simple hash function.
def s_hash(x, n):
    """
    Define a simple hash function for demonstration
    Arguments:
    x -- an input value (the numeric attribute of a record)
    n -- the number of processors
    Return:
    result -- the hash value of x
    """
    result = x % n
    return result
In [0]:
# Hash data partitioning function.
# We will use the "s_hash" function defined above to realise this partitioning
def h_partition(data, n):
    """
    Perform hash data partitioning on data
    Arguments:
    data -- an input dataset which is a list
    n -- the number of processors
    Return:
    partitions -- the partitioned subsets of the data
    """
    partitions = {}
    ### START CODE HERE ###
    ### END CODE HERE ###
    return partitions
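A minimal sketch of the partitioning body (assuming each record is hashed on its numeric attribute with s_hash, and partitions maps each hash value to the list of records that hash to it):

    for record in data:
        # Hash the numeric attribute to decide which partition the record belongs to
        h = s_hash(record[1], n)
        if h in partitions:
            partitions[h].append(record)
        else:
            partitions[h] = [record]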
In [0]:
from multiprocessing import Pool

# Parallel searching algorithm for range selection
def parallel_search_range(data, query_range, n_processor):
    """
    Perform parallel search for range selection on data for the given query range
    Arguments:
    data -- the input dataset which is a list
    query_range -- a query in the form of a continuous range (e.g. [30, 50])
    n_processor -- the number of parallel processors
    Return:
    results -- the matched records
    """
    results = []
    pool = Pool(processes=n_processor)
    ### START CODE HERE ###
    # Perform data partitioning first
    ### END CODE HERE ###
    return results
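One possible completion of the body (a sketch, assuming integer keys so the continuous range can be enumerated). Because hash partitioning does not preserve key order, a range query is answered here by hashing every key in the range to locate the one partition that can contain it, then running linear_search on that partition:

    # Perform data partitioning first
    partitions = h_partition(data, n_processor)
    # For each key in the continuous query range, locate its partition
    for key in range(query_range[0], query_range[1] + 1):
        h = s_hash(key, n_processor)
        if h in partitions:
            # Search only the partition that can contain this key
            matches = pool.apply(linear_search, [partitions[h], key])
            results.extend(matches)
    pool.close()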
In [0]:
n_processor = 3
# Hash partition, linear_search
results = parallel_search_range(R, [5, 20], n_processor)
print(results)
2. Parallel Join Algorithm¶
In this task, you will implement a disjoint-partitioning-based parallel join algorithm. This algorithm consists of two stages: a data partitioning stage using disjoint partitioning, and a local join stage.

As the data partitioning method, use the range partitioning method (i.e. range_partition()). Assume that we have 3 parallel processors: processor 1 will get records with join attribute values between 1 and 9, processor 2 between 10 and 19, and processor 3 between 20 and 29. Note that both tables R and S need to be partitioned on the join attribute with the same range partitioning function.

As the joining technique, use the hash-based join algorithm (i.e. HB_join()). Complete the code block between "### START CODE HERE ###" and "### END CODE HERE ###".
In [0]:
# Range data partitioning function (modify as instructed above)
def range_partition(data, range_indices):
    """
    Perform range data partitioning on data based on the join attribute
    Arguments:
    data -- an input dataset which is a list
    range_indices -- the list of boundary values at which the data is split
    Return:
    result -- the partitioned subsets of the data
    """
    result = []
    ### START CODE HERE ###
    ### END CODE HERE ###
    return result
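A sketch of one way to fill in the body, assuming each value in range_indices is the lower bound of the next partition (so [10, 20] yields the three ranges 1-9, 10-19 and 20-29 required above):

    new_data = list(data)
    # Carve off one sub-table per split point, based on the join attribute
    for index in range_indices:
        subset = [x for x in new_data if x[1] < index]
        result.append(subset)
        new_data = [x for x in new_data if x[1] >= index]
    # Whatever remains belongs to the last range
    result.append(new_data)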
In [0]:
def H(r):
    """
    We define a hash function 'H' for the hashing process that works by
    summing the digits of the hashed attribute, which in this case is the
    join attribute.
    Arguments:
    r -- a record where hashing will be applied on its join attribute
    Return:
    result -- the hash index of the record r
    """
    # Convert the value of the join attribute into its digits
    digits = [int(d) for d in str(r[1])]
    # Calculate the sum of the elements in digits
    return sum(digits)
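For example, using join attribute values that appear in the tables above: H(('Adele', 8)) returns 8 (digits [8]), and H(('Bob', 22)) returns 4 (digits [2, 2]).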
In [0]:
def HB_join(T1, T2):
    """
    Perform the hash-based join algorithm.
    The join attribute is the numeric attribute in the input tables T1 & T2
    Arguments:
    T1 & T2 -- tables to be joined
    Return:
    result -- the joined table
    """
    result = []
    dic = {}  # We will use a dictionary as the hash table
    # Hashing phase: for each record in table T2
    for s in T2:
        # Hash the record on its join attribute value using hash function H
        s_key = H(s)
        if s_key in dic:
            dic[s_key].add(s)  # If there is already an entry, add to it
        else:
            dic[s_key] = {s}   # Otherwise create a new entry
    # Probing phase: for each record in table T1
    for r in T1:
        # Hash the record on its join attribute value using H
        r_key = H(r)
        # If a matching hash table entry is found
        if r_key in dic:
            # Compare each record in that entry with the record r of table T1
            for item in dic[r_key]:
                if item[1] == r[1]:
                    # Put the joined record into the result
                    result.append(", ".join([r[0], str(r[1]), item[0]]))
    return result
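As a quick sanity check using the tables above: the join values shared by R and S are 8, 11 and 2, so HB_join(R, S) should return three joined rows, 'Adele, 8, Arts', 'Ed, 11, Health' and 'Joanna, 2, CompSc', following the probing order of T1.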
In [0]:
# Include this package for parallel processing
import multiprocessing as mp

def DPBP_join(T1, T2, n_processor):
    """
    Perform a disjoint partitioning-based parallel join algorithm.
    The join attribute is the numeric attribute in the input tables T1 & T2
    Arguments:
    T1 & T2 -- tables to be joined
    n_processor -- the number of parallel processors
    Return:
    results -- the joined table
    """
    results = []
    ### START CODE HERE ###
    # Partition T1 & T2 into sub-tables using range_partition().
    # The number of sub-tables must be equal to n_processor
    T1_subsets = range_partition(T1, [10, 20])
    T2_subsets = range_partition(T2, [10, 20])
    ### END CODE HERE ###
    return results
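One possible completion of the remainder of the body (a sketch: each worker runs the local HB_join on the pair of sub-tables covering the same range, since disjoint partitioning guarantees that matching join values land in the same pair):

    # Apply the local hash-based join to each pair of sub-tables in parallel
    pool = mp.Pool(processes=n_processor)
    async_results = []
    for i in range(len(T1_subsets)):
        async_results.append(pool.apply_async(HB_join, [T1_subsets[i], T2_subsets[i]]))
    pool.close()
    pool.join()
    # Collect the local join results into the final joined table
    for ar in async_results:
        results.extend(ar.get())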
In [0]:
n_processor = 3
DPBP_join(R, S, n_processor)
3. Parallel Sorting Algorithm¶
In this task, you will implement the parallel binary-merge sort method. It has the same two phases as the parallel merge-all sort that you learnt in the labs: (1) local sort and (2) final merge. The first phase is the same as in the parallel merge-all sort. The second phase, the merging phase, is pipelined instead of being concentrated on one processor: the sorted results from two processors are merged in one processor, called binary merging. The result of merging each pair of processors' outputs is passed on to the next level, until only one processor (the host) is left.

Complete the code block between "### START CODE HERE ###" and "### END CODE HERE ###". Assume that we use the round-robin partitioning method (i.e. rr_partition()).
In [0]:
# You will have to edit qsort(arr) to make it work.
def qsort(arr):
    """
    Quicksort a list
    Arguments:
    arr -- the input list to be sorted
    Return:
    result -- the sorted arr
    """
    if len(arr) <= 1:
        return arr
    else:
        # Take the first element as the pivot
        pivot = arr[0]
        left_arr = [x for x in arr[1:] if x < pivot]    # Edit is required here
        right_arr = [x for x in arr[1:] if x >= pivot]  # Edit is required here
        # Uncomment this to see what happens at each step
        # print("Left: " + str(left_arr) + " Pivot: " + str(pivot) + " Right: " + str(right_arr))
        value = qsort(left_arr) + [pivot] + qsort(right_arr)
        return value
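If the records are (name, value) tuples and the sort should be on the numeric attribute, one possible edit to the two marked lines is:

        left_arr = [x for x in arr[1:] if x[1] < pivot[1]]
        right_arr = [x for x in arr[1:] if x[1] >= pivot[1]]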
In [0]:
# You will have to edit find_min(records) and k_way_merge(record_sets) to make them work.
import sys

# Find the smallest record
def find_min(records):
    """
    Find the smallest record
    Arguments:
    records -- the input record set
    Return:
    index -- the smallest record's index
    """
    m = records[0]
    index = 0
    for i in range(len(records)):
        if(records[i] < m):  # Edit is required here
            index = i
            m = records[i]
    return index

def k_way_merge(record_sets):
    """
    K-way merging algorithm
    Arguments:
    record_sets -- the set of multiple sorted sub-record sets
    Return:
    result -- the sorted and merged record set
    """
    # indexes keeps the current position within each of the given buffers
    indexes = []
    for x in record_sets:
        indexes.append(0)  # initialisation with 0
    # The final result will be stored in this variable
    result = []
    while(True):
        merged_result = []  # the merging unit (i.e. # of the given buffers)
        # This loop gets the record at the current position of every buffer
        for i in range(len(record_sets)):
            if(indexes[i] >= len(record_sets[i])):
                merged_result.append(sys.maxsize)  # Edit is required here
            else:
                merged_result.append(record_sets[i][indexes[i]])
        # Find the smallest record
        smallest = find_min(merged_result)
        # If we only have sys.maxsize in the merging unit, we have reached the end of every record set
        if(merged_result[smallest] == sys.maxsize):  # Edit is required here
            break
        # This record is the next one on the merged list
        result.append(record_sets[smallest][indexes[smallest]])
        indexes[smallest] += 1
    return result
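With tuple records, the three marked lines need adjusting, since in Python 3 an integer sentinel such as sys.maxsize cannot be compared with a tuple. One possible set of edits, assuming (name, value) records:

• In find_min, compare the numeric attribute: if(records[i][1] < m[1]):
• In k_way_merge, pad an exhausted buffer with a sentinel record instead: merged_result.append(('', sys.maxsize))
• And stop when only sentinel records remain: if(merged_result[smallest][1] == sys.maxsize):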
In [0]:
def serial_sorting(dataset, buffer_size):
    """
    Perform a serial external sorting method based on sort-merge
    The buffer size determines the size of each sub-record set
    Arguments:
    dataset -- the entire record set to be sorted
    buffer_size -- the buffer size determining the size of each sub-record set
    Return:
    result -- the sorted record set
    """
    if (buffer_size <= 2):
        print("Error: buffer size should be greater than 2")
        return
    result = []
    ### START CODE HERE ###
    # --- Sort Phase ---
    sorted_set = []
    # Read buffer_size records at a time into memory,
    # sort them, and write out a sub-record set (i.e. variable: subset)
    start_pos = 0
    N = len(dataset)
    while True:
        if ((N - start_pos) > buffer_size):
            # Read B records from the input, where B = buffer_size
            subset = dataset[start_pos:start_pos + buffer_size]
            # Sort the subset (using the quicksort defined above)
            sorted_subset = qsort(subset)
            sorted_set.append(sorted_subset)
            start_pos += buffer_size
        else:
            # Read the last B records from the input, where B is less than buffer_size
            subset = dataset[start_pos:]
            # Sort the subset (using the quicksort defined above)
            sorted_subset = qsort(subset)
            sorted_set.append(sorted_subset)
            break
    # --- Merge Phase ---
    merge_buffer_size = buffer_size - 1
    dataset = sorted_set
    while True:
        merged_set = []
        N = len(dataset)
        start_pos = 0
        while True:
            if ((N - start_pos) > merge_buffer_size):
                # Read C record sets from the sorted runs, where C = merge_buffer_size
                subset = dataset[start_pos:start_pos + merge_buffer_size]
                merged_set.append(k_way_merge(subset))  # merge the lists in subset
                start_pos += merge_buffer_size
            else:
                # Read the remaining record sets, where the count is less than merge_buffer_size
                subset = dataset[start_pos:]
                merged_set.append(k_way_merge(subset))  # merge the lists in subset
                break
        dataset = merged_set
        if (len(dataset) <= 1):  # if the size of the merged record set is 1, then stop
            result = merged_set
            break
    ### END CODE HERE ###
    return result
In [0]:
# Include this package for parallel processing
import multiprocessing as mp

def parallel_binary_merge_sorting(dataset, n_processor, buffer_size):
    """
    Perform a parallel binary-merge sorting method
    Arguments:
    dataset -- entire record set to be sorted
    n_processor -- number of parallel processors
    buffer_size -- buffer size determining the size of each sub-record set
    Return:
    result -- the merged record set
    """
    if (buffer_size <= 2):
        print("Error: buffer size should be greater than 2")
        return
    result = []
    ### START CODE HERE ###
    ### END CODE HERE ###
    return result
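One possible completion (a sketch, assuming qsort has been edited to sort on the numeric attribute and find_min/k_way_merge have been edited for tuple records, as above; rr_partition() is assumed from the labs, so a minimal version is included to keep the sketch self-contained). Note that buffer_size is unused here because each partition is sorted in memory; a buffer-bounded variant would call serial_sorting for the local sort instead.

    def rr_partition(data, n):
        # Round-robin partitioning (assumed from the labs):
        # deal record i to subset i mod n
        subsets = [[] for _ in range(n)]
        for i, record in enumerate(data):
            subsets[i % n].append(record)
        return subsets

and, between the markers:

    # --- Phase 1: local sort ---
    subsets = rr_partition(dataset, n_processor)
    pool = mp.Pool(processes=n_processor)
    sorted_sets = pool.map(qsort, subsets)
    # --- Phase 2: pipelined binary merge ---
    # Each level merges adjacent pairs of sorted runs (k_way_merge with
    # k = 2); the number of runs halves at every level until only the
    # host's final sorted run remains
    while len(sorted_sets) > 1:
        pending = []
        for i in range(0, len(sorted_sets) - 1, 2):
            pending.append(pool.apply_async(k_way_merge, [[sorted_sets[i], sorted_sets[i + 1]]]))
        next_level = [p.get() for p in pending]
        # An odd run out is passed straight through to the next level
        if len(sorted_sets) % 2 == 1:
            next_level.append(sorted_sets[-1])
        sorted_sets = next_level
    pool.close()
    result = sorted_sets[0]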
In [0]:
result = parallel_binary_merge_sorting(R, 10, 20)
print("Final Result:" + str(result))