
Big Graph Analysis
COMP9312_22T2

– Distributed Graph Algorithms


– Graph Database
– Other Graph Problems

Distributed Graph Algorithms

Distributed Processing is Non-Trivial
How to assign tasks to different workers in an efficient way?
What happens if tasks fail?
How do workers exchange results?
How to synchronize distributed tasks allocated to different workers?

Big Data Tools

Apache Hadoop Ecosystem
We focus on the distributed computing model and algorithms. Explore more in COMP9311.

MapReduce for General Big Data Processing
Originated at Google
[OSDI'04] MapReduce: Simplified Data Processing on Large Clusters
Programming model for parallel data processing
For large-scale data processing
• Exploits a large set of commodity computers
• Executes processes in a distributed manner
• Offers high availability
Hadoop MapReduce is an implementation of MapReduce
• MapReduce is a computing paradigm (Google)
• Hadoop MapReduce is open-source software

Typical Big Data Problem
• Iterate over a large number of records
• Extract something of interest from each
• Shuffle and sort intermediate results
• Aggregate intermediate results
• Generate final output
Key idea: provide a functional abstraction for two of these operations: extracting something of interest (Map) and aggregating intermediate results (Reduce).

Idea of MapReduce
Inspired by the map and reduce functions in functional programming
We can view map as a transformation over a dataset
• This transformation is specified by the function f
• Each functional application happens in isolation
• The application of f to each element of a dataset can be parallelized in a straightforward manner
We can view reduce as an aggregation operation
• The aggregation is defined by the function g
• Data locality: elements in the list must be "brought together"
• If we can group elements of the list, the reduce phase can also proceed in parallel
The framework coordinates the map and reduce phases:
• Grouping intermediate results happens in parallel
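To make the two roles concrete, here is a small C++ sketch that expresses map with `std::transform` and reduce with `std::accumulate`. The particular f (squaring) and g (addition), and the function names `map_square` and `reduce_sum`, are illustrative assumptions rather than anything defined by MapReduce.

```cpp
#include <algorithm>
#include <cassert>
#include <numeric>
#include <vector>

// map: apply f to each element independently; no call depends on another,
// so in a real framework these calls could run on different workers.
std::vector<int> map_square(const std::vector<int>& xs) {
    std::vector<int> out(xs.size());
    std::transform(xs.begin(), xs.end(), out.begin(),
                   [](int x) { return x * x; });  // f(x) = x * x
    return out;
}

// reduce: aggregate the mapped values with g; addition is associative,
// so partial sums over different groups could also be combined in parallel.
int reduce_sum(const std::vector<int>& ys) {
    return std::accumulate(ys.begin(), ys.end(), 0);  // g(a, b) = a + b
}
```

For instance, `reduce_sum(map_square({1, 2, 3}))` is 1 + 4 + 9 = 14. Because each squaring is independent and addition is associative, a framework could split both phases across workers.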

MapReduce Data Flow in Hadoop
1. Mappers read from HDFS
2. Map output is partitioned by key and sent to Reducers
3. Reducers sort input by key
4. Reduce output is written to HDFS
Intermediate results are stored on local FS of Map and Reduce workers
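Step 2 (partitioning map output by key) is commonly done by hashing: every occurrence of a key goes to reducer hash(key) mod R, so a single reducer sees all values for that key. Hadoop's default partitioner follows this idea using the key's hash code; the C++ sketch below uses `std::hash` instead, and `partition_for` is a hypothetical name.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>

// Pick the reducer (0 .. num_reducers-1) that receives an intermediate key.
// The same key always hashes to the same reducer within one run, which is
// what allows one reducer to aggregate all values emitted for that key.
std::size_t partition_for(const std::string& key, std::size_t num_reducers) {
    assert(num_reducers > 0);
    return std::hash<std::string>{}(key) % num_reducers;
}
```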

MapReduce Example – WordCount
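The WordCount figure did not survive extraction. As a stand-in, the following single-process C++ sketch walks through the same phases, assuming words are separated by whitespace. The names `wc_map`, `wc_reduce` and `word_count` are hypothetical, and the shuffle/sort phase is simulated with a `std::map`, which groups values by key and keeps the keys sorted.

```cpp
#include <cassert>
#include <map>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

using KV = std::pair<std::string, int>;

// Map: emit (word, 1) for every word in one input line.
std::vector<KV> wc_map(const std::string& line) {
    std::vector<KV> out;
    std::istringstream in(line);
    std::string word;
    while (in >> word) out.emplace_back(word, 1);
    return out;
}

// Reduce: sum all counts emitted for one word.
int wc_reduce(const std::vector<int>& counts) {
    int total = 0;
    for (int c : counts) total += c;
    return total;
}

// Driver: map every line, shuffle (group values by key, sorted by key),
// then call reduce once per distinct word.
std::map<std::string, int> word_count(const std::vector<std::string>& lines) {
    std::map<std::string, std::vector<int>> groups;  // shuffle + sort
    for (const auto& line : lines)
        for (const auto& [word, one] : wc_map(line))
            groups[word].push_back(one);
    std::map<std::string, int> result;
    for (const auto& [word, counts] : groups)
        result[word] = wc_reduce(counts);
    return result;
}
```

In a real Hadoop job the map calls would run on different workers, and the grouped values would travel over the network to the reducers.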

BFS (SSSP for Unweighted Graphs)

Intuition for parallel BFS
We consider unweighted graphs (BFS) below for simplicity.
The solution to the problem can be defined inductively. Here's the intuition:
• DISTANCETO(s) = 0
• For all neighbors p of s, DISTANCETO(p) = 1
• For every node u that is a neighbor of some set of nodes M, $\mathrm{DISTANCETO}(u) = 1 + \min_{m \in M} \mathrm{DISTANCETO}(m)$
[Figure: node u is reached from its neighbors m1, m2, …, which lie at distances d1, d2, … from s]

Visualizing Parallel BFS

From Intuition to Algorithm
Data representation:
• Key: node n
• Value: d (distance from start), adjacency list (list of nodes reachable from n)
• Initialization: for all nodes except the start node, d = ∞
Mapper:
• ∀m ∈ adjacency list: emit(m, d + 1)
Sort/Shuffle:
• Groups distances by reachable nodes
Reducer:
• Selects the minimum distance path for each reachable node
• Additional bookkeeping is needed to keep track of the actual path
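The mapper, shuffle and reducer roles listed above can be sketched in memory as follows. This is a hypothetical single-process C++ illustration (`Node`, `bfs_iteration` and `run_bfs` are made-up names): the adjacency lists simply stay in the `Graph` map, whereas a real MapReduce job would have to emit them alongside the distances. Each node also contributes its own current distance, so a node that has already been reached never loses its value.

```cpp
#include <algorithm>
#include <cassert>
#include <climits>
#include <map>
#include <vector>

const int INF = INT_MAX;  // stands in for "d = ∞" (not yet reached)

struct Node {
    int distance = INF;          // current distance estimate from the start node
    std::vector<int> adjacency;  // nodes reachable from this node in one hop
};

using Graph = std::map<int, Node>;  // key: node id n, value: (d, adjacency list)

// One MapReduce iteration. Map: every node keeps its own distance and, if it
// has been reached, offers d + 1 to each neighbor. Shuffle: candidate
// distances are grouped by node id. Reduce: each node keeps the minimum.
// Returns true if any distance changed.
bool bfs_iteration(Graph& g) {
    std::map<int, std::vector<int>> grouped;  // shuffle: node id -> distances
    for (const auto& [n, node] : g) {
        grouped[n].push_back(node.distance);  // do not lose the current value
        if (node.distance == INF) continue;   // ∞ + 1 is still ∞
        for (int m : node.adjacency) grouped[m].push_back(node.distance + 1);
    }
    bool changed = false;
    for (const auto& [m, dists] : grouped) {
        int dmin = *std::min_element(dists.begin(), dists.end());
        if (dmin < g[m].distance) {
            g[m].distance = dmin;
            changed = true;
        }
    }
    return changed;
}

// Driver: chain iterations (reducer output feeds the next mapper) until
// nothing changes. Returns the number of iterations that changed something.
int run_bfs(Graph& g) {
    int iterations = 0;
    while (bfs_iteration(g)) ++iterations;
    return iterations;
}
```

Each call to `bfs_iteration` corresponds to one MapReduce job, so a graph whose farthest reachable node is k hops from the start needs k productive iterations plus one final check that finds no change.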

BFS in MapReduce
class Mapper
  method Map(nid n, node N)
    d ← N.Distance
    Emit(nid n, N.AdjacencyList)        //Pass along graph structure
    Emit(nid n, d)                      //Keep the node's own current distance
    for all nodeid m ∈ N.AdjacencyList do
      Emit(nid m, d + 1)                //Emit distances to reachable nodes
class Reducer
  method Reduce(nid m, [d1, d2, . . .])
    dmin ← ∞
    M ← ∅
    for all d ∈ [d1, d2, . . .] do
      if IsNode(d) then
        M.AdjacencyList ← d             //Recover graph structure
      else if d < dmin then             //Look for shorter distance
        dmin ← d
    M.Distance ← dmin                   //Update shortest distance
    Emit(nid m, node M)

Multiple Iterations Needed
The input of the Mapper is the output of the Reducer in the previous iteration. Multiple iterations are needed to explore the entire graph.
Preserving graph structure:
• Problem: Where did the adjacency list go?
• Solution: the mapper emits (n, adjacency list) as well

MapReduce for Graphs?
• Graph algorithms are expressed in multiple MR iterations.
• Data must be reloaded and reprocessed at each iteration.
• An extra MR job is needed in each iteration to detect the termination condition.
• Workload imbalance caused by varying vertex degrees.
Graph computing paradigm: for each vertex, do something with its neighbors.

Iterative MapReduce
[Figure: the same data blocks are re-read and rewritten in every iteration]
• Only a subset of the data needs computation in each iteration.
• The system is not optimized for iteration: every iteration pays a startup penalty and a disk penalty.

Pregel for Distributed Graph Processing
• Developed by Google
• Computes in the Bulk Synchronous Parallel (BSP) model
• Computes in a vertex-centric fashion
• Scalable and fault tolerant

Pregel: Think Like a Vertex
Model: Bulk Synchronous Parallel (BSP)
- Computations consist of a sequence of iterations, called supersteps.
- During a superstep, the framework calls the user-defined computation function on every vertex.
- The computation function specifies the behavior at a single vertex V in a superstep.
- Supersteps end with barrier synchronization.
- All communication goes from superstep S to superstep S+1.
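The BSP model described above can be sketched as a small single-process loop in C++. This is not Pregel's API: `bsp_min_label` is a hypothetical function, and the per-vertex logic (adopt the smallest value seen, forward it to neighbors, then vote to halt) is an illustrative choice, the same min-ID idea used by the connected-component program later in these notes. Messages written during superstep S are swapped into the inboxes only at the barrier, so they are read in superstep S+1.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Single-process BSP loop over an adjacency list (illustration only).
// Returns each vertex's final value: the smallest ID it has heard of.
std::vector<int> bsp_min_label(const std::vector<std::vector<int>>& adj) {
    const int n = static_cast<int>(adj.size());
    std::vector<int> value(n);
    for (int v = 0; v < n; ++v) value[v] = v;  // initial value = own ID
    std::vector<bool> active(n, true);         // all vertices start active
    std::vector<std::vector<int>> inbox(n), next(n);

    for (int superstep = 0;; ++superstep) {
        bool any_active = false;
        for (int v = 0; v < n; ++v) {
            if (!inbox[v].empty()) active[v] = true;  // a message reactivates
            if (!active[v]) continue;
            any_active = true;
            int best = value[v];
            for (int m : inbox[v]) best = std::min(best, m);
            if (superstep == 0 || best < value[v]) {
                value[v] = best;
                for (int u : adj[v]) next[u].push_back(best);  // read at S+1
            }
            active[v] = false;  // VoteToHalt()
        }
        // Stop when every vertex has halted and no messages are pending.
        if (!any_active) break;
        // Barrier: outgoing messages become next superstep's inboxes.
        for (int v = 0; v < n; ++v) {
            inbox[v].swap(next[v]);
            next[v].clear();
        }
    }
    return value;
}
```

On an undirected graph, every vertex ends with the smallest vertex ID in its connected component; the loop stops when all vertices have halted and no messages remain.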
Bulk Synchronous Parallel (BSP)
Terminates when all vertices are inactive and no messages remain to be delivered.

Vertex in Pregel
- Can mutate its local value and the values on its outgoing edges.
- Can send an arbitrary number of messages to any other vertices.
- Receives messages from the previous superstep.
- Can mutate the local graph topology.
- All active vertices participate in the computation in a superstep.

Messages in Pregel
• Consist of a message value and a destination vertex.
• Are typically sent along outgoing edges.
• Can be sent to any vertex whose identifier is known.
• Are only available to the receiver at the beginning of the superstep.
• Are guaranteed to be delivered.
• Are guaranteed not to be duplicated.
• Can arrive out of order.

Vertex in Pregel
- Initially, all vertices are active.
- A vertex can deactivate itself by voting to halt.
- Deactivated vertices don't participate in computation.
- Vertices are reactivated upon receiving a message.

The C++ API of Pregel
[Figure: the Vertex class; annotations: "Override this!" on Compute(), "in msgs" on its message-iterator argument, "Modify vertex value" on the value accessor]

Implementation for SSSP

Single-Source Shortest Path (SSSP) in Pregel
[Figure: superstep-by-superstep SSSP example on a weighted graph; 0 is the source vertex and all other distances start at ∞; legend: inactive vertex, active vertex, x = edge weight]

Combiner in Pregel
• Sending messages incurs overhead.
• The system calls Combine() to merge several messages intended for a vertex into a single message containing the combined value.
• There are no guarantees about which messages will be combined or about the order of combination.
• Should only be enabled for commutative and associative operations.
• Not enabled by default.
Combiner in Pregel (Cont.)
Reduces message traffic and disk space.

Combiner in SSSP
class MinIntCombiner : public Combiner<int> {
  virtual void Combine(MessageIterator* msgs) {
    int mindist = INF;
    for (; !msgs->Done(); msgs->Next())
      mindist = min(mindist, msgs->Value());
    Output("combined_source", mindist);
  }
};
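The SSSP program and its min combiner can be simulated in a single process. In this hypothetical C++ sketch (`pregel_sssp` and `Edge` are made-up names, not Pregel's API), each superstep only runs the vertices that have mail, which matches Pregel's behavior because a non-source vertex with no messages would simply vote to halt. Keeping one minimum message per destination plays the role of `MinIntCombiner`; since min is commutative and associative, this does not change the result. The edge weights in the usage check are made up, not taken from the slide's figure.

```cpp
#include <cassert>
#include <climits>
#include <map>
#include <vector>

const int INF = INT_MAX;  // distance of a vertex not yet reached

struct Edge {
    int target;
    int weight;
};

// Pregel-style SSSP over out-edge lists; returns the distance of every vertex.
std::vector<int> pregel_sssp(const std::vector<std::vector<Edge>>& out, int source) {
    const int n = static_cast<int>(out.size());
    assert(source >= 0 && source < n);
    std::vector<int> dist(n, INF);
    std::map<int, int> inbox;  // vertex -> combined (minimum) message
    inbox[source] = 0;         // the source starts with distance 0
    while (!inbox.empty()) {   // halt when no messages are in flight
        std::map<int, int> next;
        for (const auto& [v, mindist] : inbox) {  // only vertices with mail run
            if (mindist < dist[v]) {
                dist[v] = mindist;
                for (const Edge& e : out[v]) {
                    int d = mindist + e.weight;
                    auto it = next.find(e.target);
                    // Combiner: keep only the smallest message per target.
                    if (it == next.end() || d < it->second) next[e.target] = d;
                }
            }
        }
        inbox.swap(next);  // barrier: messages are read in the next superstep
    }
    return dist;
}
```

The loop ends exactly when no messages are in flight, which is Pregel's termination condition once every vertex has voted to halt.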

Others in Pregel
• Aggregator: used for global communication, global data, and monitoring
• Topology mutations
• Fault tolerance: checkpointing, failure detection, recovery . . .

PageRank
Used to determine the importance of a document based on the number of references to it and the importance of the source documents themselves.
A = a given page
T1 … Tn = pages that point to page A (citations)
d = damping factor between 0 and 1 (usually set to 0.85)
C(T) = number of links going out of T
N = total number of pages
PR(A) = the PageRank of page A (initialized as 1/N for each page)
$$PR(A) = \frac{1-d}{N} + d\left(\frac{PR(T_1)}{C(T_1)} + \frac{PR(T_2)}{C(T_2)} + \cdots + \frac{PR(T_n)}{C(T_n)}\right)$$
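The formula can be iterated directly. The C++ sketch below is a minimal, hypothetical implementation (`pagerank` is a made-up name): it starts every page at 1/N, applies the update synchronously, and assumes every page has at least one outgoing link, since dangling pages are not handled. The defaults d = 0.85 and 30 iterations mirror the values used in the Pregel program that follows.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// out[t] lists the pages that page t links to, so C(t) = out[t].size().
// Each round computes PR(A) = (1 - d) / N + d * sum over T -> A of PR(T) / C(T).
std::vector<double> pagerank(const std::vector<std::vector<int>>& out,
                             double d = 0.85, int iterations = 30) {
    const std::size_t n = out.size();
    std::vector<double> pr(n, 1.0 / n);  // initial value 1/N
    for (int it = 0; it < iterations; ++it) {
        std::vector<double> incoming(n, 0.0);
        for (std::size_t t = 0; t < n; ++t) {
            assert(!out[t].empty());  // dangling pages are not handled here
            for (int a : out[t]) incoming[a] += pr[t] / out[t].size();  // PR(T)/C(T)
        }
        for (std::size_t a = 0; a < n; ++a) pr[a] = (1 - d) / n + d * incoming[a];
    }
    return pr;
}
```

On a directed 3-cycle every page keeps rank 1/3, because each page passes its whole rank to exactly one successor.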

PageRank in Pregel
Superstep 0: the value of each vertex is 1/NumVertices().
virtual void Compute(MessageIterator* msgs) {
  if (superstep() >= 1) {
    double sum = 0;
    for (; !msgs->Done(); msgs->Next())
      sum += msgs->Value();
    *MutableValue() = 0.15 / NumVertices() + 0.85 * sum;
  }
  if (superstep() < 30) {
    const int64 n = GetOutEdgeIterator().size();
    SendMessageToAllNeighbors(GetValue() / n);
  } else {
    VoteToHalt();
  }
}

Combiner for PageRank?
virtual void Combine(MessageIterator* msgs) {
  // Messages to a vertex are only ever summed, so a sum combiner applies.
  double sum = 0;
  for (; !msgs->Done(); msgs->Next())
    sum += msgs->Value();
  Output("combined_source", sum);
}

Core Decomposition
For each unvisited vertex u with the lowest degree in G:
  assign core(u) as degree(u);
  mark u as visited;
  decrease by 1 the degree of each unvisited neighbor whose degree is higher than degree(u);
The peeling algorithm for core decomposition is hard to parallelize.

Core Decomposition: Local-view (Converging)
Locality Theorem: Given a vertex and its core number k:
• There exist at least k neighbors with core number at least k;
• There do not exist k+1 neighbors with core number at least k+1.
[Figure: Core(V) = 3 holds, since V has 4 neighbors with core number at least 3 (√); Core(V) = 4 is ruled out, since only 2 neighbors have core number at least 4]
Montresor, Alberto, … Pellegrini, and … "Distributed k-core decomposition." IEEE Transactions on Parallel and Distributed Systems 24.2 (2013): 288-300.
What is the core number of v?

Example: initialize the core number of every vertex to its degree, then iterate until no core number changes (isContinue becomes False).
ID   | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8
Core | 3 | 3 | 4 | 6 | 3 | 5 | 3 | 2 | 1
[Figure: example graph on vertices 0-8; the core numbers above are the initial values, which are updated over iterations 1-4]
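Both views of core decomposition can be compared on a small graph. The C++ sketch below is a hypothetical illustration (`core_by_peeling`, `core_by_locality` and `Adj` are made-up names) and assumes an undirected simple graph. The first function follows the sequential peeling steps with a simple linear scan for the minimum-degree vertex; the second starts from degrees and repeatedly applies the locality theorem, setting each estimate to the largest k such that at least k neighbors have an estimate of at least k, until an iteration changes nothing (the isContinue flag).

```cpp
#include <algorithm>
#include <cassert>
#include <functional>
#include <vector>

using Adj = std::vector<std::vector<int>>;  // undirected, no self-loops

// Sequential peeling, as in the steps above.
std::vector<int> core_by_peeling(const Adj& adj) {
    const int n = static_cast<int>(adj.size());
    std::vector<int> deg(n), core(n, 0);
    std::vector<bool> visited(n, false);
    for (int v = 0; v < n; ++v) deg[v] = static_cast<int>(adj[v].size());
    for (int step = 0; step < n; ++step) {
        int u = -1;  // unvisited vertex with the lowest current degree
        for (int v = 0; v < n; ++v)
            if (!visited[v] && (u == -1 || deg[v] < deg[u])) u = v;
        assert(u != -1);
        core[u] = deg[u];
        visited[u] = true;
        for (int w : adj[u])
            if (!visited[w] && deg[w] > deg[u]) --deg[w];
    }
    return core;
}

// Local view: every vertex only looks at its neighbors' current estimates.
std::vector<int> core_by_locality(const Adj& adj) {
    const int n = static_cast<int>(adj.size());
    std::vector<int> est(n);
    for (int v = 0; v < n; ++v) est[v] = static_cast<int>(adj[v].size());
    bool is_continue = true;
    while (is_continue) {
        is_continue = false;
        std::vector<int> next = est;  // synchronous: read old, write new
        for (int v = 0; v < n; ++v) {
            std::vector<int> nb;
            for (int w : adj[v]) nb.push_back(est[w]);
            std::sort(nb.begin(), nb.end(), std::greater<int>());
            int k = 0;  // largest k with at least k neighbors estimated >= k
            while (k < static_cast<int>(nb.size()) && nb[k] >= k + 1) ++k;
            k = std::min(k, est[v]);  // estimates never increase
            if (k != est[v]) {
                next[v] = k;
                is_continue = true;
            }
        }
        est.swap(next);
    }
    return est;
}
```

The locality version only reads neighbors' values, which is why it maps naturally onto a vertex-centric Pregel program, while the peeling order depends on a global minimum.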
Core Decomposition in Pregel
virtual void Compute(MessageIterator* msgs) {
  oldValue = GetValue();   // the value starts as the vertex degree
  *MutableValue() = compute the core number, based on the locality theorem, from the latest core numbers received from the neighbors (msgs holds only the ones that changed);
  if (superstep() == 0 || GetValue() != oldValue) {
    SendMessageToAllNeighbors(GetValue());
  }
  VoteToHalt();
}

Connected Component Detection in Pregel
A basic distributed algorithm to compute connected components:
virtual void Compute(MessageIterator* msgs) {
  int minID = GetValue();   // the value starts as the vertex's own ID
  for (; !msgs->Done(); msgs->Next())
    minID = min(minID, msgs->Value());
  if (superstep() == 0 || minID < GetValue()) {   // superstep 0: announce own ID
    *MutableValue() = minID;
    SendMessageToAllNeighbors(minID);
  }
  VoteToHalt();
}

Graph Database

Drawbacks of Relational Databases
• Schemas are inflexible
  • Missing values
  • Business requirements change quickly
• Inefficient
  • Consider the e-commerce example
  • What items did a customer buy?
  • Which customers bought this product?
  • A basic query for recommendation: which customers who bought this product also bought that product?
• NoSQL databases face a similar issue

Drawbacks of Relational Databases (Cont.)
BFS in an RDBMS needs many interaction operations:
id | node1 | node2
---+-------+------
0  | 0     | 1
1  | 2     | 4
2  | 1     | 3
3  | 1     | 2
4  | 5     | 2
5  | 3     | 4
...

What is a Graph Database?
• A database consists of entities and their relationships
• An entity is modelled as a node (with an arbitrary number of attributes)
• A relationship is modelled as an edge (possibly with labels or weights)
• No background in graph theory is needed to query a graph database
• More intuitive to understand than a relational database management system (RDBMS)

Why We Care About Graph Databases
• Performance
  - Traditional joins are inefficient
  - Billion-scale data are common, e.g., the Facebook social network, the Google web graph
• Flexibility
  - Real-world entities may not have a fixed schema. It is not feasible to design 1000 attributes for a table.
  - Relationships among entities can be arbitrary. It is not feasible to use 1000 tables to model 1000 types of relationships.
  - Business requirements change over time
  - Today's development practices are agile and test-driven

How a Graph Database Works
• Graph Storage
  - Usually uses the native graph structure, e.g., adjacency lists.
  - Efficient, and makes graph algorithms easy to develop.
• Graph Processing Engine
  - Algorithms and queries are supported based on the graph storage
  - Native graph processing is more efficient

Graph DB vs RDBMS
• An example:
  - Data: a social network of 1,000,000 people, each with approximately 50 friends
  - Query: find friends-of-friends connections to a depth of five degrees
• Efficiency comparison:

Data Modelling: RDBMS vs Graph DB
• An example: in this data center management domain, several data centers support a few applications using infrastructure like virtual machines and load balancers.
• The "whiteboard" form is shown on the right

Data Modelling in RDBMS
• Data model in RDBMS
  - Aim: from the initial whiteboard to relations
  - Step 1: design the schema for each table (consider data redundancy, efficiency, ...)
  - Step 2: design primary keys (PK) and foreign keys (FK)
  - Step 3: insert data into each table following the schema
  - Step 4: query the RDBMS using SQL
  - Needs careful modelling

Data Modelling in Graph DB
• Data model in Graph DB
  - Aim: from the initial whiteboard to the graph DB
  - Step 1: insert data for entities and relationships
  - Step 2: query the graph DB
  - Looks just like what is on the whiteboard
  - No schema, but highly expressive
  - New types of data can be easily integrated
  - We need a query language

Cypher: the graph query language in Neo4j
An example of Cypher: find sushi restaurants in … that my friend Philip likes

Representing Nodes in Cypher
• () //anonymous node (no label or variable); can refer to any node in the database
• (p:Person) //using variable p and label Person
• (:Technology) //no variable, label Technology
• (work:Company) //using variable work and label Company