FIT5148 – Big data management and processing
Activity: Parallel Search
In this activity, we will learn and build different parallel search algorithms on various data partitioning strategies. This work will help you better understand how parallel search algorithms work and how they can be implemented.
Instructions:
• You will be using Python 3.
• Read the code base and comments carefully
• After understanding the provided function, run the cell right below it to check if the result is correct.
• Read carefully all the Exercise tasks below in each subheading. There are some code blocks that you need to complete yourself.
After this assignment you will:
• Be able to use IPython/Jupyter notebooks
• Be able to build data partitioning strategies
• Be able to build basic search algorithms
• Be able to understand and build parallel search algorithms based on data partitioning techniques and basic search algorithms
Let’s get started!
What you need to remember:
• Run your cells using SHIFT+ENTER (or “Run cell”)
Dataset
In this activity, we use the following example dataset D, a simple array of numbers, to demonstrate data partitioning and parallel searching. Run the cell below:
In [0]:
# Our example dataset D consisting of 30 numeric elements.
D = [55,30,68,39,1,
4,49,90,34,76,
82,56,31,25,78,
56,38,32,88,9,
44,98,11,70,66,
89,99,22,23,26]
print(D)
1. Data Partitioning
Data partitioning is the fundamental step for parallel search algorithms, as parallelism in query and search processing is achieved through data partitioning. In this activity, we will consider the following four partitioning strategies:
• Round-robin data partitioning,
• Hash data partitioning,
• Range data partitioning, and
• Random-unequal data partitioning
1.1 Round-robin data partitioning
Round-robin data partitioning is the simplest data partitioning method in which each record in turn is allocated to a processing element (simply processor). Since it distributes the data evenly among all processors, it is also known as “equal-partitioning”.
Exercise: Understand the following code of round-robin data partitioning.
In [0]:
# Round-robin data partitioning function
def rr_partition(data, n):
    """
    Perform round-robin data partitioning on data

    Arguments:
    data -- an input dataset which is a list
    n -- the number of processors

    Return:
    result -- the partitioned subsets of D
    """
    result = []
    for i in range(n):
        result.append([])

    ### START CODE HERE ###
    # Assign each element, in turn, to the next bin
    for index, element in enumerate(data):
        # The bin index cycles through 0 .. n-1
        index_bin = index % n
        result[index_bin].append(element)
    ### END CODE HERE ###
    return result
In [0]:
# print the partitioned result
rr_partition(D, 3)
1.2 Hash data partitioning
Hash data partitioning partitions the data based on a particular attribute using a hash function. The result of the hash function determines the processor where the record will be placed. Thus, all records within a partition have the same hash value.
Exercise: Understand the following code of hash data partitioning. First, we define a very simple hash function.
In [0]:
# Define a simple hash function.
def s_hash(x, n):
    """
    Define a simple hash function for demonstration

    Arguments:
    x -- an input record
    n -- the number of processors

    Return:
    result -- the hash value of x
    """
    ### START CODE HERE ###
    result = x % n
    ### END CODE HERE ###
    return result
In [0]:
# print a hash value
s_hash(11, 3)
In [0]:
# Hash data partitioning function.
# We will use the "s_hash" function defined above to realise this partitioning
def h_partition(data, n):
    """
    Perform hash data partitioning on data

    Arguments:
    data -- an input dataset which is a list
    n -- the number of processors

    Return:
    dic -- the partitioned subsets of D, keyed by hash value
    """
    ### START CODE HERE ###
    dic = {}  # We will use a dictionary: hash key -> set of records
    for x in data:  # For each data record, perform the following
        h = s_hash(x, n)  # Get the hash key of the input
        if h in dic:  # If the key exists, add the record to its value set
            dic[h].add(x)
        else:  # If the key does not exist, create a new value set for it
            dic[h] = {x}
    ### END CODE HERE ###
    return dic
In [0]:
# print the partitioned result
h_partition(D, 3)
1.3 Range data partitioning
Range data partitioning distributes records based on a given range of the partitioning attribute. For example, a student table may be partitioned on "Last Name" in alphabetical order (i.e. A to Z).
Exercise: Understand the following code of range data partitioning. As our dataset D is represented by numerical values, we partition D according to numeric values.
In [0]:
# Range data partitioning function
def range_partition(data, range_indices):
    """
    Perform range data partitioning on data

    Arguments:
    data -- an input dataset which is a list
    range_indices -- the list of range boundaries to split on

    Return:
    result -- the partitioned subsets of D
    """
    result = []
    ### START CODE HERE ###
    # First, sort the dataset according to its values
    new_data = sorted(data)

    # Calculate the number of bins
    n_bin = len(range_indices)

    # For each boundary, perform the following
    for i in range(n_bin):
        # Find the elements belonging to the current range
        s = [x for x in new_data if x < range_indices[i]]
        # Add the partitioned list to the result
        result.append(s)
        # Remove the partitioned elements from the sorted dataset.
        # Since s is a prefix of new_data, slicing by its length is safe
        # even when the data contains duplicate values.
        new_data = new_data[len(s):]

    # Append the last partition: everything >= the final boundary
    result.append(new_data)
    ### END CODE HERE ###
    return result
In [0]:
# print the partitioned result
range_partition(D, [40, 80])
1.4 Random-unequal data partitioning
In random-unequal data partitioning, the size of each partition is likely to be unequal. The word “random” in the name indicates that the records within each partition are not grouped semantically, but are randomly allocated.
Exercise: Implement random-unequal data partitioning based on your definition. Referring to the function rr_partition(), complete the following code block between "### START CODE HERE ###" and "### END CODE HERE ###".
In [0]:
import random
# Random-unequal data partitioning function
def re_partition(data, n):
    """
    Perform random-unequal data partitioning on data

    Arguments:
    data -- an input dataset which is a list
    n -- the number of processors

    Return:
    result -- the partitioned subsets of D
    """
    result = []
    ### START CODE HERE ###
    for i in range(n):
        result.append([])
    # Assign each record to a randomly chosen processor
    for item in data:
        random_processor = random.randint(0, n - 1)
        result[random_processor].append(item)
    ### END CODE HERE ###
    return result
In [0]:
# print the partitioned result.
# Compare the result with the one obtained from rr_partition(.).
re_partition(D, 3)
Exercise: Generate the partitioned outputs in the form of the “hash data partitioning”. That is, each partition can be represented as “partition id: [list of elements]”. The “partition id” is any unique number or label.
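One possible sketch for this exercise (the helper name label_partitions is our own, not part of the activity): use each partition's position as its unique id, mirroring the dictionary form returned by h_partition().

```python
# Sketch: represent a list-of-lists partitioning result as
# "partition id: [list of elements]", like the output of h_partition().
def label_partitions(partitions):
    # Use the partition's position in the list as its unique id
    return {pid: part for pid, part in enumerate(partitions)}

# Example with a small round-robin-style result
parts = [[55, 39], [30, 1], [68, 4]]
for pid, elements in label_partitions(parts).items():
    print(str(pid) + ": " + str(elements))
```

The same wrapper works unchanged on the outputs of rr_partition() and re_partition(), since both return a list of lists.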
2: Search Algorithms
Before discussing parallel search, it is important to know how searching is done serially. Making use of serial search algorithms with data partitioning will become the basis for parallel search algorithms. In this activity, we will consider the following two serial search algorithms:
• Linear Search
• Binary Search
2.1 Linear Search
Linear search is the simplest approach to searching. Given an unsorted table of records, it scans the entire table to search for a given record. As this is performed for each record one by one until either the desired record is found or the end of the table is reached, this algorithm is also known as an "exhaustive search."
Exercise: We use the dataset D to understand how this algorithm works. Each element in D will be considered as a data record. Let's understand how linear search works on D, and analyse its performance using big-O notation, which is commonly used to measure the complexity of an algorithm.
In [0]:
# Linear search function
def linear_search(data, key):
    """
    Perform linear search on data for the given key

    Arguments:
    data -- an input dataset which is a list or a numpy array
    key -- a query record

    Return:
    position, matched_record -- the position and value of the matched record
    """
    matched_record = None
    position = -1  # not-found position
    ### START CODE HERE ###
    for index, x in enumerate(data):
        if x == key:  # If x matches the key
            matched_record = x
            position = index  # Record the index of x
            break
    ### END CODE HERE ###
    return position, matched_record
In [0]:
linear_search(D, 31)
2.2 Binary Search
Binary search requires that the list already be sorted. It starts by comparing the key with the middle entry of the ordered table. If it finds a matching record, it returns its index; otherwise, the process continues on either the lower or upper half of the table (depending on the value of the key).
Exercise: Build a binary search function by completing the code block below between “### START CODE HERE ###” and “### END CODE HERE ###”. Discuss its complexity by comparing with the linear search algorithm.
In [0]:
# Binary search function
def binary_search(data, key):
    """
    Perform binary search on data (which must be sorted) for the given key

    Arguments:
    data -- an input dataset which is a sorted list
    key -- a query record

    Return:
    position, matched_record -- the position and value of the matched record
    """
    matched_record = None
    position = -1  # not-found position
    lower = 0
    upper = len(data) - 1
    ### START CODE HERE ###
    while lower <= upper:
        # Calculate the middle point between lower and upper
        middle = (lower + upper) // 2
        if key == data[middle]:
            # We found the matching record
            position = middle
            matched_record = data[middle]
            break
        elif key > data[middle]:
            # Continue with the upper half of the list
            lower = middle + 1
        else:
            # Continue with the lower half of the list
            upper = middle - 1
    ### END CODE HERE ###
    return position, matched_record
In [0]:
sortD = list(D) # Copy the dataset
sortD.sort() # Sort the dataset first
binary_search(sortD, 31)
Expected Output:
** binary_search(sortD, 31) **
(9, 31)
Note: Perform the binary search function above with the unsorted data D. What will happen?
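The following self-contained sketch (restating the same binary_search logic as above) shows what happens: on the unsorted D, the search halves discard the part of the list that actually contains the key, so a present record can be missed.

```python
# Self-contained restatement of the binary_search logic above
def binary_search(data, key):
    lower, upper = 0, len(data) - 1
    while lower <= upper:
        middle = (lower + upper) // 2
        if key == data[middle]:
            return middle, data[middle]
        elif key > data[middle]:
            lower = middle + 1
        else:
            upper = middle - 1
    return -1, None  # not found

D = [55, 30, 68, 39, 1, 4, 49, 90, 34, 76, 82, 56, 31, 25, 78,
     56, 38, 32, 88, 9, 44, 98, 11, 70, 66, 89, 99, 22, 23, 26]
print(binary_search(D, 31))          # (-1, None): 31 is in D but is missed
print(binary_search(sorted(D), 31))  # (9, 31): found once the data is sorted
```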
3: Parallel Search Algorithms
Parallel search algorithms have three main elements:
• processor activation or involvement
• local searching method
• key comparison
Processor activation or involvement indicates the number of processors to be used by the algorithm.
The local searching method is the search method applied within each processor. It depends on the data ordering: if the data has already been sorted, a binary search can be used; otherwise, a linear search is conducted.
Searching basically consists of comparing the data from the table with the condition specified by the user. When a match is found, there are two options: continue the comparison process to find more matches, or stop the entire process. The choice of key comparison therefore depends on whether or not the search attribute values are unique.
3.1 Parallel Searching for Exact Match
In this activity, we will understand and practice how parallel searching works for an exact-match search given a query. Note that the number of processors used in parallel searching depends on the data partitioning method. For example, only one processor is needed if the data is already partitioned with range partitioning; in this case, there is no parallelism.
Exercise: We build a parallel search algorithm for exact match. Processor activation will be given by the user as input. As the local searching method, we will use the two search functions above: the linear search function (i.e. linear_search()) and the binary search function (i.e. binary_search()). As the key comparison method, we assume for brevity that we stop when a match is found. As data partitioning methods, we use the four partitioning methods we built above:
• Round-robin data partitioning (i.e. rr_partition()),
• Hash data partitioning (i.e. h_partition()),
• Range data partitioning (i.e. range_partition()), and
• Random-unequal data partitioning (i.e. re_partition())
In [0]:
import multiprocessing as mp # For multiprocessing
# Parallel searching algorithm for exact match
def parallel_search_exact(data, query, n_processor, m_partition, m_search):
    """
    Perform parallel search for an exact match on data for the given query

    Arguments:
    data -- an input dataset which is a list
    query -- a query record
    n_processor -- the number of parallel processors
    m_partition -- a data partitioning method
    m_search -- a search method

    Return:
    results -- the matched record information
    """
    results = []

    # Pool: a Python class enabling parallel processing.
    # We set the number of processes to n_processor, which means that
    # the Pool will allow at most 'n_processor' processes to run at
    # the same time.
    pool = mp.Pool(processes=n_processor)

    ### START CODE HERE ###
    print("data partitioning: " + str(m_partition.__name__))
    print("searching method: " + str(m_search.__name__))

    if m_partition == range_partition:  # for the range partitioning method
        # Perform data partitioning:
        # the 2nd parameter is the list of range boundaries (3 ranges)
        range_indices = [40, 80]  # ideally passed into the function as a parameter
        DD = m_partition(data, range_indices)
        # Select the single partition that can contain the query
        for index, element in enumerate(range_indices):
            if query < element:
                m = DD[index]
                break
        else:  # the query is beyond the last boundary
            m = DD[-1]
        result = pool.apply(m_search, [m, query])
        results.append(result)
    elif m_partition == h_partition:  # for the hash partitioning method
        # Perform data partitioning first
        DD = m_partition(data, n_processor)
        # Each element in DD is a pair (hash key: records); only the
        # partition matching the query's hash value needs to be searched
        query_hash = s_hash(query, n_processor)
        d = list(DD[query_hash])
        result = pool.apply(m_search, [d, query])
        results.append(result)
    else:  # for the round-robin or random-unequal partitioning method
        # Perform data partitioning first
        DD = m_partition(data, n_processor)
        # Dispatch the search on every partition without blocking
        async_results = []
        for d in DD:
            print(d)
            async_results.append(pool.apply_async(m_search, [d, query]))
        # Collect the results once all searches have been dispatched,
        # so that they actually run in parallel
        for r in async_results:
            results.append(r.get())
    """
    The method pool.apply() blocks until the function call is finished.
    The method pool.apply_async() does not block: it returns immediately
    instead of waiting for the result, and each call is allocated to a
    different process, so the searches run in parallel.
    The reason we can use pool.apply() for range- and hash-partitioned data
    is that once we know which partition to search, there is no need to
    search in parallel.
    """
    ### END CODE HERE ###
    return results
Exercise: Test each of the following functions one by one.
In [0]:
# Common input values
data = D # input data
sortD = list(data)
sortD.sort()
query = 31 # query term
n_processor = 3 # number of parallel processors
### parallel searching for exact match
# round-robin partition, linear_search
results = parallel_search_exact(data, query, n_processor, rr_partition, linear_search)
print(results)
# round-robin partition, binary_search
results = parallel_search_exact(sortD, query, n_processor, rr_partition, binary_search)
print(results)
# random-unequal partition, linear_search
results = parallel_search_exact(data, query, n_processor, re_partition, linear_search)
print(results)
# random-unequal partition, binary_search
results = parallel_search_exact(sortD, query, n_processor, re_partition, binary_search)
print(results)
# Hash partition, linear_search
results = parallel_search_exact(data, query, n_processor, h_partition, linear_search)
print(results)
# Hash partition, binary_search
results = parallel_search_exact(sortD, query, n_processor, h_partition, binary_search)
print(results)
# The above function can't find the query term. Can you identify why?
# Range partition, linear_search
results = parallel_search_exact(data, query, n_processor, range_partition, linear_search)
print(results)
# Range partition, binary_search
results = parallel_search_exact(sortD, query, n_processor, range_partition, binary_search)
print(results)
Note: What do we see from the results? We see a set of pairs, each consisting of the position and value of the matched record for a given query. A position of -1 indicates the query was not found; if found, the position is > -1.
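As for why the hash + binary_search combination fails to find the query: h_partition() stores each partition as a set, and list(set) carries no guaranteed ordering, while binary search requires sorted input. A minimal sketch of the fix (restating the binary_search logic so the cell is self-contained) is to sort the partition before searching it:

```python
# Self-contained restatement of the binary_search logic above
def binary_search(data, key):
    lower, upper = 0, len(data) - 1
    while lower <= upper:
        middle = (lower + upper) // 2
        if key == data[middle]:
            return middle, data[middle]
        elif key > data[middle]:
            lower = middle + 1
        else:
            upper = middle - 1
    return -1, None

# list(set) has arbitrary ordering, so binary search may discard the
# half that actually holds the key. Sorting restores the precondition.
partition = list({55, 31, 34, 1, 4, 49, 76})
print(binary_search(sorted(partition), 31))  # (2, 31): found after sorting
```

Applying the same idea inside parallel_search_exact() (sorting d before calling binary_search) would make the hash + binary_search test case succeed.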
3.2 Parallel Searching for Range Selection (Continuous)
In this activity, we will build a parallel search algorithm for range selection (continuous) for a given query, following the instructions below.
Exercise: Build a parallel search algorithm that uses the linear search algorithm (i.e. linear_search()) and is able to work with the hash partitioning method (i.e. h_partition()). Complete the code block between “### START CODE HERE ###” and “### END CODE HERE ###”.
In [0]:
from multiprocessing import Pool
# Parallel searching algorithm for range selection
def parallel_search_range(data, query_range, n_processor):
    """
    Perform parallel search for range selection on data for the given query range

    Arguments:
    data -- the input dataset which is a list
    query_range -- a query in the form of a range (e.g. [30, 50])
    n_processor -- the number of parallel processors

    Return:
    results -- the matched record information
    """
    results = []
    pool = Pool(processes=n_processor)
    ### START CODE HERE ###
    # Perform data partitioning first
    DD = h_partition(data, n_processor)
    # Search for each candidate value in the half-open range [start, end)
    for query in range(query_range[0], query_range[1]):
        # Each element in DD is a pair (hash key: records)
        query_hash = s_hash(query, n_processor)
        d = list(DD[query_hash])
        result = pool.apply(linear_search, [d, query])
        if result[0] != -1:  # keep only the values that were found
            results.append(result)
    ### END CODE HERE ###
    return results
In [0]:
# Range partition, linear_search
results = parallel_search_range(data, [30, 40], n_processor)
print(results)
Expected Output:
** parallel_search_range(data, [30, 40], n_processor) **
[(6, 30), (11, 31), (0, 32), (1, 34), (3, 38), (2, 39)]
Can you interpret the output?
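A hint, sketched below (the functions are restated so the cell is self-contained): each position in the output is the index of the value within its own hash partition, not within D, and since the partitions are sets, that index depends on an unguaranteed list(set) ordering.

```python
def s_hash(x, n):
    return x % n

def h_partition(data, n):
    # Same hash partitioning idea as above, written compactly
    dic = {}
    for x in data:
        dic.setdefault(s_hash(x, n), set()).add(x)
    return dic

def linear_search(data, key):
    for index, x in enumerate(data):
        if x == key:
            return index, x
    return -1, None

D = [55, 30, 68, 39, 1, 4, 49, 90, 34, 76, 82, 56, 31, 25, 78,
     56, 38, 32, 88, 9, 44, 98, 11, 70, 66, 89, 99, 22, 23, 26]
d = list(h_partition(D, 3)[s_hash(30, 3)])  # the partition holding 30
position, record = linear_search(d, 30)
print(position == d.index(30))  # True: the position is partition-local
```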
Congratulations on finishing this activity!
Wrap up what we’ve learned:
• We are now able to build different data partitioning strategies
• We are familiar with basic search algorithms: linear search and binary search
• More importantly, we can now build parallel search algorithms based on various data partitioning strategies and basic search algorithms
• We understand how parallel search algorithms work for exact-match queries and range selection