Big Graph Analysis
COMP9312_22T2
– Distributed Graph Algorithms
– Graph Database
– Other Graph Problems
Distributed Graph Algorithms
Distributed Processing is Non-Trivial
How to assign tasks to different workers in an efficient way? What happens if tasks fail?
How do workers exchange results?
How to synchronize distributed tasks allocated to different workers?
Big Data Tools
Apache Hadoop Ecosystem
We focus on the distributed computing model and algorithms. Explore more in COMP9311~
MapReduce for General Big Data Processing
Originated at Google
[OSDI’04] MapReduce: Simplified Data Processing on Large Clusters
Programming model for parallel data processing
For large-scale data processing
• Exploits large sets of commodity computers
• Executes processes in a distributed manner
• Offers high availability
Hadoop MapReduce is an implementation of MapReduce
• MapReduce is a computing paradigm (Google)
• Hadoop MapReduce is an open-source software implementation
Typical Big Data Problem
Iterate over a large number of records
Extract something of interest from each (map)
Shuffle and sort intermediate results
Aggregate intermediate results (reduce)
Generate final output
Key idea: provide a functional abstraction for these two operations
Idea of MapReduce
Inspired by the map and reduce functions in functional programming
We can view map as a transformation over a dataset
• This transformation is specified by the function f
• Each functional application happens in isolation
• The application of f to each element of a dataset can be parallelized in a straightforward manner
We can view reduce as an aggregation operation
• The aggregation is defined by the function g
• Data locality: elements in the list must be "brought together"
• If we can group elements of the list, the reduce phase can also proceed in parallel
The framework coordinates the map and reduce phases:
• Grouping intermediate results happens in parallel
MapReduce Data Flow in Hadoop
1. Mappers read from HDFS
2. Map output is partitioned by key and sent to Reducers
3. Reducers sort input by key
4. Reduce output is written to HDFS
Intermediate results are stored on local FS of Map and Reduce workers
MapReduce Example – WordCount
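WordCount can be sketched as a single-machine simulation in Python (for illustration only; a real Hadoop job implements Mapper and Reducer classes in Java). The names map_fn, reduce_fn, and mapreduce are our own:

```python
from collections import defaultdict

def map_fn(document):
    # Map: emit (word, 1) for every word in the input record
    for word in document.split():
        yield (word, 1)

def reduce_fn(word, counts):
    # Reduce: aggregate all counts for one key
    return (word, sum(counts))

def mapreduce(documents):
    # Shuffle/sort: group intermediate pairs by key
    groups = defaultdict(list)
    for doc in documents:
        for key, value in map_fn(doc):
            groups[key].append(value)
    return dict(reduce_fn(k, v) for k, v in groups.items())

docs = ["the quick brown fox", "the lazy dog", "the fox"]
print(mapreduce(docs))  # {'the': 3, 'quick': 1, 'brown': 1, 'fox': 2, 'lazy': 1, 'dog': 1}
```

In the real framework, each map_fn call runs on a different worker and the shuffle happens over the network; the grouping step here stands in for that.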
BFS (SSSP for Unweighted Graphs)
Intuition for parallel BFS
We consider unweighted graphs (BFS) below for simplicity.
The solution to the problem can be defined inductively. Here's the intuition:
• DistanceTo(s) = 0
• For all neighbors p of s, DistanceTo(p) = 1
• For every node u which is a neighbor of some set of nodes M, DistanceTo(u) = 1 + min(DistanceTo(m), m ∈ M)
Visualizing Parallel BFS
From Intuition to Algorithm
Data representation:
• Key: node n
• Value: d (distance from start), adjacency list (list of nodes reachable from n)
• Initialization: for all nodes except the start node, d = ∞
Mapper
• ∀m ∈ adjacency list: emit(m, d + 1)
Sort/Shuffle
• Groups distances by reachable nodes
• Selects minimum distance path for each reachable node
• Additional bookkeeping needed to keep track of actual path
BFS in MapReduce
class Mapper
    method Map(nid n, node N)
        d ← N.Distance
        Emit(nid n, N.AdjacencyList)    // Pass along graph structure
        for all nodeid m ∈ N.AdjacencyList do
            Emit(nid m, d + 1)          // Emit distances to reachable nodes

class Reducer
    method Reduce(nid m, [d1, d2, . . .])
        dmin ← ∞
        M ← ∅
        for all d ∈ [d1, d2, . . .] do
            if IsNode(d) then
                M.AdjacencyList ← d     // Recover graph structure
            else if d < dmin then
                dmin ← d                // Look for a shorter distance
        M.Distance ← dmin               // Update shortest distance
        Emit(nid m, node M)
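The map/reduce pair above can be simulated in Python (a sketch, not Hadoop code). One assumption we add: the mapper also re-emits the node's own current distance, so it survives the shuffle; node values are (distance, adjacency list) pairs:

```python
from collections import defaultdict

INF = float("inf")

def map_fn(nid, node):
    d, adj = node
    yield (nid, adj)              # pass along graph structure
    yield (nid, d)                # keep the node's own current distance
    if d != INF:
        for m in adj:
            yield (m, d + 1)      # tentative distance to each neighbor

def reduce_fn(nid, values):
    dmin, adj = INF, []
    for v in values:
        if isinstance(v, list):
            adj = v               # recover graph structure
        elif v < dmin:
            dmin = v              # keep the shortest distance seen
    return (dmin, adj)

def bfs_iteration(graph):
    groups = defaultdict(list)    # shuffle: group by destination node
    for nid, node in graph.items():
        for k, v in map_fn(nid, node):
            groups[k].append(v)
    return {nid: reduce_fn(nid, vals) for nid, vals in groups.items()}

# start node 0; values are (distance, adjacency list)
graph = {0: (0, [1, 2]), 1: (INF, [3]), 2: (INF, [3]), 3: (INF, [])}
for _ in range(3):                # one MapReduce job per BFS level
    graph = bfs_iteration(graph)
print(graph[3][0])  # 2
```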
Multiple Iterations Needed
The input of the Mapper is the output of the Reducer from the previous iteration. Multiple iterations are needed to explore the entire graph.
Preserving graph structure:
• Problem: Where did the adjacency list go?
• Solution: mapper emits (n, adjacency list) as well
MapReduce for Graphs?
Graph Computing Paradigm:
Graph algorithms are expressed in multiple MR iterations.
Data must be reloaded and reprocessed at each iteration.
Need an extra MR Job for each iteration to detect termination condition.
Workload imbalance caused by varying vertex degrees:
For each vertex:
    do something for each of its neighbors;
Iterative MapReduce
[Figure: iterative MapReduce; the entire dataset flows through map and reduce in every iteration]
Only a subset of data needs computation:
Iterative MapReduce
[Figure: iterative MapReduce where only a subset of the data changes between iterations, yet all of it is reloaded and reprocessed]
! System is not optimized for iteration: every iteration pays a startup penalty and a disk penalty, because intermediate results are written to and re-read from disk and a new job must be launched.
Pregel for Distributed Graph Processing
Developed by Google
Computes in the Bulk Synchronous Parallel (BSP) model
Computes in a vertex-centric fashion
Scalable and fault tolerant
Pregel for Distributed Graph Processing
Think Like a Vertex Model
Bulk Synchronous Parallel (BSP)
- Computations consist of a sequence of iterations, called supersteps.
- During a superstep, the framework calls a user-defined compute function on every vertex.
- The compute function specifies the behavior at a single vertex V in a superstep.
- Supersteps end with barrier synchronization.
- All communication is from superstep S to superstep S+1.
Bulk Synchronous Parallel (BSP)
Terminates when all vertices are inactive and no messages remain to be delivered
Vertex in Pregel
- Can mutate its local value and the values on its outgoing edges.
- Can send an arbitrary number of messages to any other vertex.
- Receives messages sent to it in the previous superstep.
- Can mutate the local graph topology.
- All active vertices participate in the computation in a superstep.
Messages in Pregel
• A message consists of a message value and a destination vertex.
• Typically sent along outgoing edges.
• Can be sent to any vertex whose identifier is known.
• Are only available to the receiver at the beginning of the next superstep.
• Guaranteed to be delivered.
• Guaranteed not to be duplicated.
• May arrive out of order.
Vertex in Pregel
Initially, every vertex is active.
- A vertex can deactivate itself by voting to halt.
- Deactivated vertices don't participate in computation.
- Vertices are reactivated upon receiving a message.
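The superstep/halt/reactivate cycle can be sketched in Python (illustrative only; Pregel's real API is C++). run_supersteps and compute_max are our own names; compute_max is the classic "propagate the maximum value" example:

```python
def run_supersteps(graph, values, compute, max_steps=50):
    """Minimal BSP loop: graph maps vertex -> neighbor list."""
    active = set(graph)                      # initially, every vertex is active
    inbox = {v: [] for v in graph}
    for step in range(max_steps):
        if not active:
            break                            # all halted, no messages: terminate
        outbox = {v: [] for v in graph}
        for v in sorted(active):
            if compute(step, v, values, inbox[v], graph[v], outbox):
                active.discard(v)            # the vertex voted to halt
        inbox = outbox                       # barrier: deliver in next superstep
        active |= {v for v in graph if inbox[v]}  # messages reactivate vertices
    return values

def compute_max(step, v, values, msgs, neighbors, outbox):
    # every vertex learns the maximum value in its connected component
    new = max([values[v]] + msgs)
    if step == 0 or new > values[v]:
        values[v] = new
        for u in neighbors:
            outbox[u].append(new)
        return False                         # stay active
    return True                              # vote to halt

graph = {1: [2], 2: [1, 3], 3: [2, 4], 4: [3]}
values = {1: 3, 2: 6, 3: 2, 4: 1}
print(run_supersteps(graph, values, compute_max))  # {1: 6, 2: 6, 3: 6, 4: 6}
```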
The C++ API of Pregel
Users override the virtual Compute() method; it reads the incoming messages in msgs and may modify the vertex value.
Implementation for SSSP
Single-Source Shortest Path (SSSP) in Pregel
0 is the source vertex; every other vertex is initialized to distance ∞.
Legend: Inactive Vertex / Active Vertex / x = edge weight
[Figure sequence: one slide per superstep. Each active vertex takes the minimum of its incoming messages, updates its tentative distance if that minimum is smaller, sends (distance + edge weight) to its neighbors, and votes to halt; an incoming message reactivates a halted vertex. The run ends when all vertices are inactive and no messages are in flight.]
Combiner in Pregel
• Sending messages incurs overhead.
• The system calls Combine() to merge several messages intended for a vertex into a single message containing the combined value.
• No guarantees which messages will be combined or the order of combination.
• Should be enabled for commutative and associative messages.
• Not enabled by default.
Combiner in Pregel (Cont.)
Reduce message traffic and disk space
Combiner in SSSP
class MinIntCombiner : public Combiner<int> {
    virtual void Combine(MessageIterator* msgs) {
        int mindist = INF;
        for (; !msgs->Done(); msgs->Next())
            mindist = min(mindist, msgs->Value());
        Output("combined_source", mindist);
    }
};
Others in Pregel
• Aggregator
Used for global communication, global data and monitoring
Topology Mutations
Fault Tolerance
Checkpointing, failure detection, recovery . . .
PageRank
Used to determine the importance of a document based on the number of references to it and the importance of the source documents themselves.
A = A given page
T1 …. Tn = Pages that point to page A (citations)
d = Damping factor between 0 and 1 (usually kept as 0.85)
C(T) = number of links going out of T
PR(A) = the PageRank of page A (initialized as 1/N for each page)
PR(A) = (1 − d)/N + d · (PR(T1)/C(T1) + PR(T2)/C(T2) + ⋯ + PR(Tn)/C(Tn))
PageRank in Pregel, superstep 0: the value of each vertex is 1/NumVertices()
virtual void Compute(MessageIterator* msgs) {
    if (superstep() >= 1) {
        double sum = 0;
        for (; !msgs->Done(); msgs->Next())
            sum += msgs->Value();
        *MutableValue() = 0.15 / NumVertices() + 0.85 * sum;
    }
    if (superstep() < 30) {
        const int64 n = GetOutEdgeIterator().size();
        SendMessageToAllNeighbors(GetValue() / n);
    } else {
        VoteToHalt();
    }
}
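The Compute() logic can be checked with a synchronous Python simulation (a sketch; the function name and the three-page example graph are our own):

```python
def pagerank(graph, d=0.85, supersteps=30):
    """Pregel-style PageRank: graph maps vertex -> list of out-neighbors."""
    n = len(graph)
    rank = {v: 1.0 / n for v in graph}          # superstep-0 initialization
    for _ in range(supersteps):
        # each vertex sends rank / out-degree along its out-edges
        incoming = {v: 0.0 for v in graph}
        for v, outs in graph.items():
            for u in outs:
                incoming[u] += rank[v] / len(outs)
        # apply PR(A) = (1 - d)/N + d * sum of incoming contributions
        rank = {v: (1 - d) / n + d * incoming[v] for v in graph}
    return rank

graph = {"A": ["B", "C"], "B": ["C"], "C": ["A"]}
ranks = pagerank(graph)
print(round(sum(ranks.values()), 6))  # 1.0
```

With no dangling vertices the ranks stay a probability distribution (they sum to 1), which is a quick sanity check on the update rule.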
Combiner for Pagerank?
virtual void Combine(MessageIterator* msgs) {
    // PageRank messages are summed; summation is commutative and
    // associative, so a sum combiner is safe here
    double sum = 0;
    for (; !msgs->Done(); msgs->Next())
        sum += msgs->Value();
    Output("combined_source", sum);
}
Core Decomposition
For each unvisited vertex u with the lowest degree in G:
    assign core(u) as degree(u);
    mark u as visited;
    decrease the degree of its unvisited neighbors with higher degree than u by 1;
The peeling algorithm for core decomposition is hard to parallelize.
Core Decomposition: Local-view (Converging)
Locality Theorem:
Given a vertex and its core number k:
There exist at least k neighbors with core number ≥ k; there do not exist k+1 neighbors with core number ≥ k+1.
Core(V) = 3
Core(V) = 4
4 neighbors with core number at least 3 √
Only 2 neighbors with core number at least 4 ✗
Montresor, Alberto, Francesco De Pellegrini, and Daniele Miorandi. "Distributed k-core decomposition." IEEE Transactions on Parallel and Distributed Systems 24.2 (2013): 288-300.
What is the core number of v?
Core Decomposition: Local-view (Converging)
Given a vertex and its core number k:
There exist at least k neighbors with core number ≥ k; there do not exist k+1 neighbors with core number ≥ k+1.
Initialize the core-number estimate of each vertex by its degree:
ID   0 1 2 3 4 5 6 7 8
Core 3 3 4 6 3 5 3 2 1
In each iteration, every vertex recomputes its estimate from its neighbors' current estimates using the locality theorem; isContinue is set to True whenever some estimate decreases. In the example, the estimates converge after four iterations, when no estimate changes (isContinue = False).
Core Decomposition in Pregel
virtual void Compute(MessageIterator* msgs) {
    oldValue = GetValue();
    // compute the new core number from the neighbors' core numbers
    // in msgs based on the locality theorem
    *MutableValue() = ...;
    if (GetValue() != oldValue) {
        SendMessageToAllNeighbors(GetValue());
    }
    VoteToHalt();
}
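A Python sketch of this converging computation, using synchronous rounds instead of Pregel messages (function names and the example graph are our own):

```python
def local_core(neighbor_estimates, own_estimate):
    """Largest k such that at least k neighbor estimates are >= k
    (the locality theorem); estimates only ever decrease."""
    for k in range(own_estimate, 0, -1):
        if sum(1 for e in neighbor_estimates if e >= k) >= k:
            return k
    return 0

def converge(graph):
    est = {v: len(adj) for v, adj in graph.items()}   # initialize by degree
    changed = True
    while changed:                                    # isContinue flag
        changed = False
        new = {}
        for v, adj in graph.items():
            k = local_core([est[u] for u in adj], est[v])
            new[v] = k
            if k != est[v]:
                changed = True
        est = new
    return est

# triangle a-b-c plus a pendant vertex d attached to a
graph = {"a": ["b", "c", "d"], "b": ["a", "c"], "c": ["a", "b"], "d": ["a"]}
print(converge(graph))  # {'a': 2, 'b': 2, 'c': 2, 'd': 1}
```

Because every estimate starts at the degree (an upper bound) and can only decrease toward the true core number, the iteration is guaranteed to terminate, matching the "converging" behavior in the slides.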
Connected Component Detection in Pregel
A basic distributed algorithm to compute connected components:
virtual void Compute(MessageIterator* msgs) {
    int minID = GetValue();
    for (; !msgs->Done(); msgs->Next())
        minID = min(minID, msgs->Value());
    if (minID < GetValue()) {
        *MutableValue() = minID;
        SendMessageToAllNeighbors(minID);
    }
    VoteToHalt();
}
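The min-label propagation above can be simulated round by round in Python (a sketch; the five-vertex, two-component example is our own):

```python
def connected_components(graph):
    """Pregel-style label propagation: every vertex starts with its own ID
    and repeatedly adopts the minimum ID it has seen."""
    label = {v: v for v in graph}
    inbox = {v: [] for v in graph}
    active = set(graph)                          # superstep 0: all active
    first = True
    while active:
        outbox = {v: [] for v in graph}
        for v in active:
            m = min(inbox[v], default=label[v])
            if m < label[v] or first:            # improvement, or superstep 0
                label[v] = min(label[v], m)
                for u in graph[v]:
                    outbox[u].append(label[v])   # propagate smallest known ID
        inbox = outbox
        active = {v for v in graph if inbox[v]}  # messages reactivate vertices
        first = False
    return label

# two components: {0, 1, 2} and {3, 4}
graph = {0: [1], 1: [0, 2], 2: [1], 3: [4], 4: [3]}
print(connected_components(graph))  # {0: 0, 1: 0, 2: 0, 3: 3, 4: 3}
```

At convergence, every vertex carries the minimum vertex ID of its component, so two vertices are connected exactly when their labels agree.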
Graph Database
Drawbacks of Relational Databases
• Schemas are inflexible
• Missing values
• Business requirements change quickly
• Inefficient
• Consider the E-Commerce example
• What items did a customer buy?
• Which customers bought this product?
• A basic query for recommendation: Which
customers buying this product also bought
that product?
• NoSQL databases face similar issues
Drawbacks of Relational Databases (Cont.)
BFS in an RDBMS needs many join operations~
id | node1 | node2
------------------
 0 |   0   |   1
 1 |   2   |   4
 2 |   1   |   3
 3 |   1   |   2
 4 |   5   |   2
 5 |   3   |   4
...
What is a Graph Database?
§ A database consisting of entities and their relationships
§ An entity is modelled as a node (with an arbitrary number of attributes).
§ A relationship is modelled as an edge (possibly with labels or weights).
§ No background in graph theory is needed to query a graph database.
§ More intuitive to understand than a relational database management system (RDBMS).
Why we care about Graph Database~
• Performance
q Traditional Joins are inefficient
q Billion-scale data are common, e.g., the Facebook social network, the Google web graph
• Flexibility
q Real-world entities may not have a fixed schema. It is not feasible to design 1000 attributes for a table.
q Relationships among entities can be arbitrary. It is not feasible to use 1000 tables to model 1000 types of
relationships.
q Business requirements change over time
q Today’s development practices are agile, test-driven
How a graph database works
• Graph Storage
q Usually use the native graph structure, e.g., adjacency lists.
q Efficient and easy to develop graph algorithms.
• Graph Processing Engine
q Algorithms and queries are supported on top of the graph storage
q Native graph processing is more efficient
Graph DB VS RDBMS
• An Example:
q Data: a social network of 1,000,000 people each with approximately 50 friends
q Query: find friends-of-friends connections to a depth of five degrees.
• Efficiency Comparison:
Data Modelling: RDBMS vs Graph DB
§ An Example: In this data center management domain, several data centers support a few applications using infrastructure like virtual machines and load balancers.
§ The “whiteboard” form is shown on the right
Data Modelling in RDBMS
§ Data Model in RDBMS
q Aim: From initial whiteboard to relations
q Step 1: design schema for each table (consider data redundancy, efficiency, ...)
q Step 2: design primary key (PK) and foreign key (FK)
q Step 3: insert data for each table following the schema
q Step 4: query the RDBMS using SQL
q Needs careful modelling
Data Modelling in Graph DB
§ Data Model in Graph DB
q Aim: From initial whiteboard to Graph DB
q Step 1: insert data for entities and relationships
q Step 2: query the Graph DB
q Looks just like the whiteboard drawing
q No schema but highly expressive.
q New types of data can be easily integrated
q We need a query language
Cypher: the graph query language in Neo4j
An example of Cypher:
Find Sushi restaurants that my friend Philip likes
Representing Nodes in Cypher
§ () //anonymous node (no label or variable) can refer to any node in the database
§ (p:Person) //using variable p and label Person
§ (:Technology) //no variable, label Technology
§ (work:Company) //using variable work and label Company