Efficient Parallel Set-Similarity Joins Using MapReduce
Rares Vernica, Michael J. Carey, Chen Li
Department of Computer Science
University of California, Irvine

ABSTRACT
In this paper we study how to efficiently perform set-similarity joins in parallel using the popular MapReduce framework. We propose a 3-stage approach for end-to-end set-similarity joins. We take as input a set of records and output a set of joined records based on a set-similarity condition. We efficiently partition the data across nodes in order to balance the workload and minimize the need for replication. We study both self-join and R-S join cases, and show how to carefully control the amount of data kept in main memory on each node. We also propose solutions for the case where, even if we use the most fine-grained partitioning, the data still does not fit in the main memory of a node. We report results from extensive experiments on real datasets, synthetically increased in size, to evaluate the speedup and scaleup properties of the proposed algorithms using Hadoop.
Categories and Subject Descriptors
H.2.4 [Database Management]: Systems—query processing, parallel databases
General Terms
Algorithms, Performance
1. INTRODUCTION
There are many applications that require detecting similar pairs of records where the records contain string or set-based data. A list of possible applications includes: detecting near-duplicate web pages in web crawling [14], document clustering [5], plagiarism detection [15], master data management, making recommendations to users based on their similarity to other users in query refinement [22], mining in social networking sites [25], and identifying coalitions of click fraudsters in online advertising [20]. For example, in master-data-management applications, a system has to identify that the names "John W. Smith", "Smith, John", and "William Smith" are potentially referring to the same person.
As another example, when mining social networking sites where users' preferences are stored as bit vectors (where a "1" bit means interest in a certain domain), applications want to use the fact that a user with preference bit vector "[1,0,0,1,1,0,1,0,0,1]" possibly has similar interests to a user with preferences "[1,0,0,0,1,0,1,0,1,1]".
Detecting such similar pairs is challenging today, as there is an increasing trend of applications being expected to deal with vast amounts of data that usually do not fit in the main memory of one machine. For example, the Google N-gram dataset [27] has 1 trillion records; the GenBank dataset [11] contains 100 million records and has a size of 416 GB. Applications with such datasets usually make use of clusters of machines and employ parallel algorithms in order to efficiently deal with this vast amount of data. For data-intensive applications, the MapReduce [7] paradigm has recently received a lot of attention for being a scalable parallel shared-nothing data-processing platform. The framework is able to scale to thousands of nodes [7]. In this paper, we use MapReduce as the parallel data-processing paradigm for finding similar pairs of records.
When dealing with a very large amount of data, detecting similar pairs of records becomes a challenging problem, even if a large computational cluster is available. Parallel data-processing paradigms rely on data partitioning and redistribution for efficient query execution. Partitioning records for finding similar pairs of records is challenging for string or set-based data as hash-based partitioning using the entire string or set does not suffice. The contributions of this paper are as follows:
• We describe efficient ways to partition a large dataset across nodes in order to balance the workload and minimize the need for replication. Compared to the equi-join case, the set-similarity join case requires "partitioning" the data based on set contents.
• We describe efficient solutions that exploit the MapReduce framework. We show how to efficiently deal with problems such as partitioning, replication, and multiple inputs by manipulating the keys used to route the data in the framework.
• We present methods for controlling the amount of data kept in memory during a join by exploiting the properties of the data that needs to be joined.
• We provide algorithms for answering set-similarity self-join queries end-to-end, where we start from records containing more than just the join attribute and end with actual pairs of joined records.
• We show how our set-similarity self-join algorithms can be extended to answer set-similarity R-S join queries.
• We present strategies for exceptional situations where, even if we use the finest-granularity partitioning method, the data that needs to be held in the main memory of one node is too large to fit.
The rest of the paper is structured as follows. In Section 2 we introduce the problem and present the main idea of our algorithms. In Section 3 we present set-similarity join algorithms for the self-join case, while in Section 4 we show how the algorithms can be extended to the R-S join case. Next, in Section 5, we present strategies for handling the insufficient-memory case. A performance evaluation is presented in Section 6. Finally, we discuss related work in Section 7 and conclude in Section 8. A longer technical report on this work is available in [26].
2. PRELIMINARIES
In this work we focus on the following set-similarity join application: identifying similar records based on string similarity. Our results can be generalized to other set-similarity join applications.
Problem statement: Given two files of records, R and S, a set-similarity function, sim, and a similarity threshold τ, we define the set-similarity join of R and S on R.a and S.a as finding and combining all pairs of records from R and S where sim(R.a, S.a) ≥ τ .
We map strings into sets by tokenizing them. Examples of tokens are words or q-grams (overlapping sub-strings of fixed length). For example, the string "I will call back" can be tokenized into the word set [I, will, call, back]. In order to measure the similarity between strings, we use a set-similarity function such as the Jaccard or Tanimoto coefficient, cosine coefficient, etc.1 For example, the Jaccard similarity function for two sets x and y is defined as jaccard(x, y) = |x ∩ y| / |x ∪ y|. Thus, the Jaccard similarity between the strings "I will call back" and "I will call you soon" is 3/6 = 0.5.
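As a quick sanity check of the example above, here is a minimal Python sketch of word tokenization and Jaccard similarity (the function names are ours, not from any particular library):

```python
def tokenize(s):
    # Word-level tokenization; q-gram tokenization would instead slide a
    # fixed-length window over the string.
    return set(s.split())

def jaccard(x, y):
    # |x ∩ y| / |x ∪ y|
    return len(x & y) / len(x | y)

a = tokenize("I will call back")       # {I, will, call, back}
b = tokenize("I will call you soon")   # {I, will, call, you, soon}
print(jaccard(a, b))                   # 3 / 6 = 0.5
```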
In the remainder of the section, we provide an introduction to the MapReduce paradigm, present the main idea of our parallel set-similarity join algorithms, and provide an overview of filtering methods for detecting set-similar pairs.
2.1 MapReduce
MapReduce [7] is a popular paradigm for data-intensive parallel computation in shared-nothing clusters. Example applications for the MapReduce paradigm include processing crawled documents, Web request logs, etc. In the open-source community, Hadoop [1] is a popular implementation of this paradigm. In MapReduce, data is initially partitioned across the nodes of a cluster and stored in a distributed file system (DFS). Data is represented as (key, value) pairs. The computation is expressed using two functions:
map (k1,v1) → list(k2,v2); reduce (k2,list(v2)) → list(k3,v3).
Figure 1 shows the data flow in a MapReduce computation. The computation starts with a map phase in which the map functions are applied in parallel on different partitions
1The techniques described in this paper can also be used for approximate string search using the edit or Levenshtein distance [13].
Figure 1: Data flow in a MapReduce computation.
of the input data. The (key, value) pairs output by each map function are hash-partitioned on the key. For each partition the pairs are sorted by their key and then sent across the cluster in a shuffle phase. At each receiving node, all the received partitions are merged in a sorted order by their key. All the pair values that share a certain key are passed to a single reduce call. The output of each reduce function is written to a distributed file in the DFS.
Besides the map and reduce functions, the framework also allows the user to provide a combine function that is executed on the same nodes as the mappers right after the map functions have finished. This function acts as a local reducer, operating on the local (key, value) pairs. It allows the user to decrease the amount of data sent through the network. The signature of the combine function is:
combine (k2,list(v2)) → list(k2,list(v2)).
Finally, the framework also allows the user to provide initialization and tear-down functions for each MapReduce function, and to customize the hashing and comparison functions used when partitioning and sorting the keys. From Figure 1 one can notice the similarity between the MapReduce approach and query-processing techniques for parallel DBMSs [8, 21].
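As a concrete, deliberately framework-free illustration of these signatures, the following Python sketch simulates the map, combine, and reduce steps of a token-counting job; it only mimics the data flow of Figure 1 and does not use the Hadoop API.

```python
from collections import defaultdict

# map (k1, v1) -> list(k2, v2): one (token, 1) pair per token in the value.
def map_fn(rid, value):
    return [(token, 1) for token in value.split()]

# combine (k2, list(v2)) -> list(k2, list(v2)): local partial aggregation,
# run on the mapper's node to reduce the data shuffled over the network.
def combine_fn(token, values):
    return [(token, [sum(values)])]

# reduce (k2, list(v2)) -> list(k3, v3): global aggregation per key.
def reduce_fn(token, values):
    return [(token, sum(values))]

def run(records):
    shuffled = defaultdict(list)
    for rid, value in records:
        # Map, then combine locally (here: per record, for simplicity).
        local = defaultdict(list)
        for k, v in map_fn(rid, value):
            local[k].append(v)
        for k, vs in local.items():
            for k2, vs2 in combine_fn(k, vs):
                shuffled[k2].extend(vs2)      # hash-partition + shuffle on key
    # Reduce: all values sharing a key are passed to a single reduce call.
    out = []
    for k in sorted(shuffled):
        out.extend(reduce_fn(k, shuffled[k]))
    return out

print(run([(1, "A B C"), (2, "A B")]))        # [('A', 2), ('B', 2), ('C', 1)]
```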
2.2 Parallel Set-Similarity Joins
One of the main issues when answering set-similarity joins using the MapReduce paradigm is to decide how data should be partitioned and replicated. The main idea of our algorithms is the following. The framework hash-partitions the data across the network based on keys; data items with the same key are grouped together. In our case, the join-attribute value cannot be directly used as a partitioning key. Instead, we use (possibly multiple) signatures generated from the value as partitioning keys. Signatures are defined such that similar attribute values have at least one signature in common. Possible example signatures include: the list of word tokens of a string and ranges of similar string lengths. For instance, the string "I will call back" would have 4 word-based signatures: "I", "will", "call", and "back".
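A tiny sketch of the signature idea with word tokens as signatures (the dictionary of partitions stands in for the framework's hash partitioning on the key; everything here is illustrative):

```python
from collections import defaultdict

def signatures(value):
    # Word tokens as signatures: two values whose token sets overlap
    # share at least one word and therefore at least one signature.
    return set(value.split())

# Replicate each record's ID to the partition of every one of its signatures.
partitions = defaultdict(set)
for rid, value in [(1, "I will call back"), (2, "I will call you soon")]:
    for sig in signatures(value):
        partitions[sig].add(rid)

# Records 1 and 2 meet in the partitions keyed by "I", "will", and "call".
print({sig: rids for sig, rids in partitions.items() if len(rids) > 1})
```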
We divide the processing into three stages:
• Stage 1: Computes data statistics in order to generate good signatures. The techniques in later stages utilize these statistics.
• Stage 2: Extracts the record IDs ("RID") and the join-attribute value from each record and distributes the RID and the join-attribute value pairs so that the pairs sharing a signature go to at least one common reducer. The reducers compute the similarity of the join-attribute values and output RID pairs of similar records.
• Stage 3: Generates actual pairs of joined records. It uses the list of RID pairs from the second stage and the original data to build the pairs of similar records.
An alternative to using the second and third stages is to use one stage in which we let key-value pairs carry complete records, instead of projecting records on their RIDs and join-attribute values. We implemented this alternative and noticed a much worse performance, so we do not consider this option in this paper.
2.3 Set-Similarity Filtering
Efficient set-similarity join algorithms rely on effective filters, which can decrease the number of candidate pairs whose similarity needs to be verified. In the past few years, there have been several studies involving a technique called prefix filtering [6, 4, 29], which is based on the pigeonhole principle and works as follows. The tokens of strings are ordered based on a global token ordering. For each string, we define its prefix of length n as the first n tokens of the ordered set of tokens. The required length of the prefix depends on the size of the token set, the similarity function, and the similarity threshold. For example, given the string s = "I will call back" and the global token ordering {back, call, will, I}, the prefix of length 2 of s is [back, call]. The prefix-filtering principle states that similar strings need to share at least one common token in their prefixes. Using this principle, the records of one relation are organized based on the tokens in their prefixes. Then, using the prefix tokens of the records in the second relation, we can probe the first relation and generate candidate pairs. The prefix-filtering principle gives a necessary condition for similar records, so the generated candidate pairs need to be verified. Good performance can be achieved when the global token ordering corresponds to the increasing order of token frequencies, since fewer candidate pairs will be generated.
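To make the prefix computation concrete, here is a small sketch. The prefix-length formula it uses for Jaccard, |x| − ⌈τ·|x|⌉ + 1, is the one commonly used in the prefix-filtering literature and is an assumption of this sketch rather than a formula stated in this section.

```python
import math

def jaccard_prefix(tokens, token_rank, tau):
    """Order a record's tokens by the global ordering and keep its prefix."""
    ordered = sorted(set(tokens), key=lambda t: token_rank[t])
    n = len(ordered)
    # Commonly used prefix length for Jaccard: n - ceil(tau * n) + 1.
    return ordered[: n - math.ceil(tau * n) + 1]

# Global token ordering {back, call, will, I} from the running example.
rank = {"back": 0, "call": 1, "will": 2, "I": 3}
# With tau = 0.75 this yields the length-2 prefix [back, call] from the text.
print(jaccard_prefix(["I", "will", "call", "back"], rank, 0.75))
```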
A state-of-the-art algorithm in the set-similarity join liter- ature is the PPJoin+ technique presented in [29]. It uses the prefix filter along with a length filter (similar strings need to have similar lengths [3]). It also proposed two other filters: a positional filter and a suffix filter. The PPJoin+ technique provides a good solution for answering such queries on one node. One of our approaches is to use PPJoin+ in parallel on multiple nodes.
3. SELF-JOIN CASE
In this section we present techniques for the set-similarity self-join case. As outlined in the previous section, the solution is divided into three stages. The first stage builds the global token ordering necessary to apply the prefix filter.2 It scans the data, computes the frequency of each token, and sorts the tokens based on frequency. The second stage uses the prefix-filtering principle to produce a list of similar-RID pairs. The algorithm extracts the RID and join-attribute value of each record, and replicates and re-partitions the records based on their prefix tokens. The MapReduce framework groups the RID and join-attribute value pairs based on the prefix tokens. It is worth noting that using the infrequent prefix tokens to redistribute the data helps us avoid
2An alternative would be to apply the length filter. We explored this alternative but the performance was not good because it suffered from the skewed distribution of string lengths.
unbalanced workload due to token-frequency skew. Each group represents a set of candidates that are cross paired and verified. The third stage uses the list of similar-RID pairs and the original data to generate pairs of similar records.
3.1 Stage 1: Token Ordering
We consider two methods for ordering the tokens in the first stage. Both approaches take as input the original records and produce a list of the tokens that appear in the join-attribute values, ordered by increasing frequency.
3.1.1 Basic Token Ordering (BTO)
Our first approach, called Basic Token Ordering ("BTO"), relies on two MapReduce phases. The first phase computes the frequency of each token and the second phase sorts the tokens based on their frequencies. In the first phase, the map function gets as input the original records. For each record, the function extracts the value of the join attribute and tokenizes it. Each token produces a (token, 1) pair. To minimize the network traffic between the map and reduce functions, we use a combine function to aggregate the 1's output by the map function into partial counts. Figure 2(a) shows the data flow for an example dataset, self-joined on an attribute called "a". In the figure, for the record with RID 1, the join-attribute value is "A B C", which is tokenized as "A",
“B”, and “C”. Subsequently, the reduce function computes the total count for each token and outputs (token, count) pairs, where “count” is the total frequency for the token.
The second phase uses MapReduce to sort the pairs of tokens and frequencies from the first phase. The map function swaps the input keys and values so that the input pairs of the reduce function are sorted based on their frequencies. This phase uses exactly one reducer so that the result is a totally ordered list of tokens. The pseudo-code of this algorithm and of the other algorithms presented is available in [26].
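A compact sketch of the two BTO phases in plain Python (standing in for the two Hadoop jobs; the function names and the toy records are ours):

```python
from collections import Counter

def phase1_token_counts(records, join_attr):
    # Phase 1: map emits (token, 1) per token of the join attribute;
    # combine/reduce sum the 1's into (token, count) pairs.
    counts = Counter()
    for record in records:
        for token in record[join_attr].split():
            counts[token] += 1
    return counts

def phase2_sort_by_frequency(counts):
    # Phase 2: map swaps (token, count) into (count, token) so the framework
    # sorts on frequency; a single reducer emits the total token order.
    return [token for _count, token in sorted(
        (count, token) for token, count in counts.items())]

records = [{"a": "A B C"}, {"a": "B C D"}, {"a": "C D"}]
print(phase2_sort_by_frequency(phase1_token_counts(records, "a")))
# ['A', 'B', 'D', 'C'] -- tokens in increasing order of frequency
```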
3.1.2 Using One Phase to Order Tokens (OPTO)
An alternative approach to token ordering is to use one MapReduce phase. This approach, called One-Phase Token Ordering ("OPTO"), exploits the fact that the list of tokens could be much smaller than the original data size. Instead of using MapReduce to sort the tokens, we can explicitly sort the tokens in memory. We use the same map and combine functions as in the first phase of the BTO algorithm. Similar to BTO, we use only one reducer. Figure 2(b) shows the data flow of this approach for our example dataset. The reduce function in OPTO gets as input a list of tokens and their partial counts. For each token, the function computes its total count and stores the information locally. When there is no more input for the reduce function, the reducer calls a tear-down function to sort the tokens based on their counts and to output the tokens in increasing order of their counts.
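The OPTO reducer's behavior can be sketched as follows: accumulate totals as (token, partial counts) groups arrive, then sort once in the tear-down step. This is an in-memory stand-in for the single reducer; the class and method names are illustrative.

```python
class OptoReducer:
    """Single reducer: total the partial counts, sort once at tear-down."""

    def __init__(self):
        self.totals = {}

    def reduce(self, token, partial_counts):
        # The partial counts come from the map-side combiners.
        self.totals[token] = self.totals.get(token, 0) + sum(partial_counts)

    def teardown(self):
        # Emit the tokens in increasing order of their total counts.
        return [t for t, _ in sorted(self.totals.items(), key=lambda kv: kv[1])]

r = OptoReducer()
r.reduce("C", [2, 1])
r.reduce("B", [2])
r.reduce("A", [1])
print(r.teardown())   # ['A', 'B', 'C']
```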
3.2 Stage 2: RID-Pair Generation
The second stage of the join, called "Kernel", scans the original input data and extracts the prefix of each record using the token order computed by the first stage. In general, the list of unique tokens is much smaller than the list of records and grows much more slowly. We thus assume that the list of tokens fits in memory. Based on the prefix tokens, we extract the RID and the join-attribute value of each record, and distribute these record projections to reducers.
(a) Basic Token Ordering (BTO) (b) One-Phase Token Ordering (OPTO)
Figure 2: Example data flow of Stage 1. (Token ordering for a self-join on attribute “a”.)
The join-attribute values that share at least one prefix token are verified at a reducer.
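The verification done in a reduce call can be sketched as a nested loop over the record projections grouped under one routing key, using the Jaccard check from Section 2 (illustrative code, not the paper's actual kernel):

```python
from itertools import combinations

def verify_group(projections, tau):
    # projections: (RID, join-attribute value) pairs that share one routing key.
    similar = []
    for (rid1, v1), (rid2, v2) in combinations(projections, 2):
        x, y = set(v1.split()), set(v2.split())
        if len(x & y) / len(x | y) >= tau:
            similar.append((rid1, rid2))
    return similar

group = [(1, "I will call back"), (2, "I will call you soon"), (3, "call me")]
print(verify_group(group, 0.5))   # [(1, 2)]
```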
Routing Records to Reducers. We first take a look at two possible ways to generate (key, value) pairs in the map function. (1) Using Individual Tokens: This method treats each prefix token as a key. Thus, for each record, we generate a (key, value) pair for each of its prefix tokens; a record projection is replicated as many times as the number of its prefix tokens. For example, if the record value is "A B C D" and the prefix tokens are "A", "B", and "C", we would output three (key, value) pairs, corresponding to the three tokens. In the reducer, as the values get grouped by prefix tokens, all the values passed in a reduce call share the same prefix token.
(2) Using Grouped Tokens: This method maps multiple tokens to one synthetic key, and can thus map different tokens to the same key. For each record, the map function generates one (key, value) pair for each of the groups of its prefix tokens. In our running example of a record "A B C D", if tokens "A" and "B" belong to one group (denoted by "X"), and token "C" belongs to another group (denoted by "Y"), we output two (key, value) pairs, one for key "X" and one for key "Y". Two records that share the same token group do not necessarily share any prefix token. Continuing our running example, for a record "E F G", if its prefix token "E" belongs to group "Y", then the records "A B C D" and "E F G" share token group "Y" but do not share any prefix token. So, in the reducer, as the values get grouped by their token group, two values in the same group do not necessarily share a prefix token. This method can help us have fewer replications of record projections. One way to define the token groups in order to balance data across reducers is the following. We use the token ordering produced in the first stage and assign the tokens to groups in a round-robin order. In this way we balance the sum of token frequencies across the groups. We study the effect of the number of groups in Section 6. For both routing strategies, since two records might share more than one prefix token or token group, the same pair of records might be verified at more than one reducer.
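The two routing strategies can be sketched as alternative map-side emit functions: one (key, value) pair per prefix token versus one per token group, with groups assigned round-robin over the globally ordered token list (illustrative names and grouping, not the paper's exact code):

```python
def route_by_token(rid, value, prefix_tokens):
    # Individual tokens: one replica of the record projection per prefix token.
    return [(token, (rid, value)) for token in prefix_tokens]

def route_by_group(rid, value, prefix_tokens, group_of):
    # Grouped tokens: one replica per distinct group of the prefix tokens,
    # which can mean fewer replicas than routing by individual tokens.
    return [(group, (rid, value))
            for group in sorted({group_of[t] for t in prefix_tokens})]

# Round-robin assignment of the globally ordered tokens to 2 groups.
ordered_tokens = ["A", "B", "C", "D", "E", "F", "G"]
group_of = {token: i % 2 for i, token in enumerate(ordered_tokens)}

print(route_by_token(1, "A B C D", ["A", "B", "C"]))            # 3 replicas
print(route_by_group(1, "A B C D", ["A", "B", "C"], group_of))  # 2 replicas
```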