In [1]:
from pyspark import SparkContext
# sc = SparkContext()  # `sc` is assumed to be provided by the pyspark shell/kernel
# Ship the GraphFrames jar to the executors so `import graphframes` works.
# (Original had typographic quotes, which are a SyntaxError in Python.)
sc.addPyFile("./graphframes-0.5.0-spark2.1-s_2.11.jar")
In [2]:
from graphframes import *
from pyspark.sql.functions import *
In [45]:
# Run every time before starting: pyspark --packages graphframes:graphframes:0.5.0-spark2.1-s_2.11
In [3]:
# Vertics DataFrame
# Vertices DataFrame: one row per person (id, name, age)
v = spark.createDataFrame([
    ("a", "Alice", 34),
    ("b", "Bob", 36),
    ("c", "Charlie", 37),
    ("d", "David", 29),
    ("e", "Esther", 32),
    ("f", "Fanny", 38),
    ("g", "Gabby", 60)
], ["id", "name", "age"])
# Edges DataFrame: directed edges with a relationship label
e = spark.createDataFrame([
    ("a", "b", "friend"),
    ("b", "c", "follow"),
    ("c", "b", "follow"),
    ("f", "c", "follow"),
    ("e", "f", "follow"),
    ("e", "d", "friend"),
    ("d", "a", "friend"),
    ("a", "e", "friend"),
    ("g", "e", "follow")
], ["src", "dst", "relationship"])
# Create a GraphFrame from the vertex and edge DataFrames
g = GraphFrame(v, e)
g.vertices.show()
g.edges.show()
+—+——-+—+
| id| name|age|
+—+——-+—+
| a| Alice| 34|
| b| Bob| 36|
| c|Charlie| 37|
| d| David| 29|
| e| Esther| 32|
| f| Fanny| 38|
| g| Gabby| 60|
+—+——-+—+
+—+—+————+
|src|dst|relationship|
+—+—+————+
| a| b| friend|
| b| c| follow|
| c| b| follow|
| f| c| follow|
| e| f| follow|
| e| d| friend|
| d| a| friend|
| a| e| friend|
| g| e| follow|
+—+—+————+
In [4]:
# g.vertices and g.edges are just DataFrames
# You can use any DataFrame API on them
g.edges.filter("src = 'a'").show()
+—+—+————+
|src|dst|relationship|
+—+—+————+
| a| b| friend|
| a| e| friend|
+—+—+————+
In [5]:
g.edges.filter(“src = ‘a'”).count()
Out[5]:
2
In [6]:
# Count the number of followers of c.
# This queries the edge DataFrame.
print(g.edges.filter("relationship = 'follow' and dst = 'c'").count())
2
In [7]:
# A GraphFrame has additional attributes.
# outDegrees: DataFrame (id, outDegree) with one row per vertex that has
# at least one outgoing edge.
g.outDegrees.show()
+—+———+
| id|outDegree|
+—+———+
| g| 1|
| f| 1|
| e| 2|
| d| 1|
| c| 1|
| b| 1|
| a| 2|
+—+———+
In [8]:
g.inDegrees.show()
+—+——–+
| id|inDegree|
+—+——–+
| f| 1|
| e| 2|
| d| 1|
| c| 2|
| b| 2|
| a| 1|
+—+——–+
In [9]:
g.inDegrees.explain()
== Physical Plan ==
*(2) HashAggregate(keys=[dst#7], functions=[count(1)])
+- Exchange hashpartitioning(dst#7, 200)
+- *(1) HashAggregate(keys=[dst#7], functions=[partial_count(1)])
+- *(1) Project [dst#7]
+- Scan ExistingRDD[src#6,dst#7,relationship#8]
In [10]:
# Reproduce g.inDegrees by hand: group edges by destination and count
myInDegrees = g.edges.groupBy('dst').count() \
    .withColumnRenamed('dst', 'id').withColumnRenamed('count', 'inDegree')
myInDegrees.show()
+—+——–+
| id|inDegree|
+—+——–+
| f| 1|
| e| 2|
| d| 1|
| c| 2|
| b| 2|
| a| 1|
+—+——–+
In [11]:
myInDegrees.explain()
== Physical Plan ==
*(2) HashAggregate(keys=[dst#7], functions=[count(1)])
+- Exchange hashpartitioning(dst#7, 200)
+- *(1) HashAggregate(keys=[dst#7], functions=[partial_count(1)])
+- *(1) Project [dst#7]
+- Scan ExistingRDD[src#6,dst#7,relationship#8]
In [12]:
print(g.inDegrees.storageLevel)
Serialized 1x Replicated
In [13]:
g.inDegrees.cache()
Out[13]:
DataFrame[id: string, inDegree: int]
In [14]:
print(g.inDegrees.storageLevel)
Disk Memory Deserialized 1x Replicated
In [15]:
print(g.vertices.storageLevel)
Serialized 1x Replicated
In [16]:
g.cache()
Out[16]:
GraphFrame(v:[id: string, name: string … 1 more field], e:[src: string, dst: string … 1 more field])
In [17]:
# Both DataFrames now report the cached storage level
print(g.vertices.storageLevel)
print(g.edges.storageLevel)
Disk Memory Deserialized 1x Replicated
Disk Memory Deserialized 1x Replicated
In [18]:
# A triplet view of the graph:
# each row is (source vertex, edge, destination vertex)
g.triplets.show()
+—————-+————–+—————-+
| src| edge| dst|
+—————-+————–+—————-+
| [e, Esther, 32]|[e, f, follow]| [f, Fanny, 38]|
| [g, Gabby, 60]|[g, e, follow]| [e, Esther, 32]|
| [a, Alice, 34]|[a, e, friend]| [e, Esther, 32]|
| [e, Esther, 32]|[e, d, friend]| [d, David, 29]|
| [f, Fanny, 38]|[f, c, follow]|[c, Charlie, 37]|
| [b, Bob, 36]|[b, c, follow]|[c, Charlie, 37]|
|[c, Charlie, 37]|[c, b, follow]| [b, Bob, 36]|
| [a, Alice, 34]|[a, b, friend]| [b, Bob, 36]|
| [d, David, 29]|[d, a, friend]| [a, Alice, 34]|
+—————-+————–+—————-+
Motif Finding¶
In [19]:
# Search for pairs of vertices with edges in both directions between them.
motifs = g.find(“(a)-[]->(b); (b)-[]->(a)”).filter(‘a.id < b.id')
motifs.show()
+------------+----------------+
| a| b|
+------------+----------------+
|[b, Bob, 36]|[c, Charlie, 37]|
+------------+----------------+
In [20]:
# Find triangles (directed 3-cycles).
# Requiring a to have the smallest id keeps one canonical rotation per cycle,
# so each triangle is reported exactly once.
triangles = g.find("(a)-[]->(b); (b)-[]->(c); (c)-[]->(a)")
triangles = triangles.filter("a.id < b.id AND a.id < c.id")
triangles.show()
+--------------+---------------+--------------+
| a| b| c|
+--------------+---------------+--------------+
|[a, Alice, 34]|[e, Esther, 32]|[d, David, 29]|
+--------------+---------------+--------------+
In [21]:
triangles.explain()
== Physical Plan ==
*(6) Project [a#362, b#364, c#389]
+- *(6) BroadcastHashJoin [c#389.id, a#362.id], [__tmp-6526019406657860729#419.src, __tmp-6526019406657860729#419.dst], Inner, BuildRight
:- *(6) Project [a#362, b#364, c#389]
: +- *(6) BroadcastHashJoin [__tmp-430217833014886237#387.dst], [c#389.id], Inner, BuildRight, (a#362.id < c#389.id)
: :- *(6) BroadcastHashJoin [b#364.id], [__tmp-430217833014886237#387.src], Inner, BuildRight
: : :- *(6) Project [a#362, b#364]
: : : +- *(6) BroadcastHashJoin [__tmp-1043886091038848698#360.dst], [b#364.id], Inner, BuildRight, (a#362.id < b#364.id)
: : : :- *(6) BroadcastHashJoin [__tmp-1043886091038848698#360.src], [a#362.id], Inner, BuildRight
: : : : :- *(6) Project [named_struct(src, src#6, dst, dst#7, relationship, relationship#8) AS __tmp-1043886091038848698#360]
: : : : : +- InMemoryTableScan [dst#7, relationship#8, src#6]
: : : : : +- InMemoryRelation [src#6, dst#7, relationship#8], StorageLevel(disk, memory, deserialized, 1 replicas)
: : : : : +- Scan ExistingRDD[src#6,dst#7,relationship#8]
: : : : +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct
: : : : +- *(1) Project [named_struct(id, id#0, name, name#1, age, age#2L) AS a#362]
: : : : +- InMemoryTableScan [age#2L, id#0, name#1]
: : : : +- InMemoryRelation [id#0, name#1, age#2L], StorageLevel(disk, memory, deserialized, 1 replicas)
: : : : +- Scan ExistingRDD[id#0,name#1,age#2L]
: : : +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct
: : : +- *(2) Project [named_struct(id, id#0, name, name#1, age, age#2L) AS b#364]
: : : +- InMemoryTableScan [age#2L, id#0, name#1]
: : : +- InMemoryRelation [id#0, name#1, age#2L], StorageLevel(disk, memory, deserialized, 1 replicas)
: : : +- Scan ExistingRDD[id#0,name#1,age#2L]
: : +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct
: : +- *(3) Project [named_struct(src, src#6, dst, dst#7, relationship, relationship#8) AS __tmp-430217833014886237#387]
: : +- InMemoryTableScan [dst#7, relationship#8, src#6]
: : +- InMemoryRelation [src#6, dst#7, relationship#8], StorageLevel(disk, memory, deserialized, 1 replicas)
: : +- Scan ExistingRDD[src#6,dst#7,relationship#8]
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct
: +- *(4) Project [named_struct(id, id#0, name, name#1, age, age#2L) AS c#389]
: +- InMemoryTableScan [age#2L, id#0, name#1]
: +- InMemoryRelation [id#0, name#1, age#2L], StorageLevel(disk, memory, deserialized, 1 replicas)
: +- Scan ExistingRDD[id#0,name#1,age#2L]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct
+- *(5) Project [named_struct(src, src#6, dst, dst#7, relationship, relationship#8) AS __tmp-6526019406657860729#419]
+- InMemoryTableScan [dst#7, relationship#8, src#6]
+- InMemoryRelation [src#6, dst#7, relationship#8], StorageLevel(disk, memory, deserialized, 1 replicas)
+- Scan ExistingRDD[src#6,dst#7,relationship#8]
In [22]:
# Negation: edges (a)->(b) that have no reverse edge (b)->(a)
oneway = g.find("(a)-[]->(b); !(b)-[]->(a)")
oneway.show()
+—————+—————-+
| a| b|
+—————+—————-+
| [a, Alice, 34]| [e, Esther, 32]|
|[e, Esther, 32]| [d, David, 29]|
| [a, Alice, 34]| [b, Bob, 36]|
| [g, Gabby, 60]| [e, Esther, 32]|
|[e, Esther, 32]| [f, Fanny, 38]|
| [f, Fanny, 38]|[c, Charlie, 37]|
| [d, David, 29]| [a, Alice, 34]|
+—————+—————-+
In [23]:
# Find vertices without incoming edges. This is wrong:
# NOTE(review): this motif contains no negation, so the code does not match
# the surrounding comments — verify against the original notebook.
g.find('(a)-[]->(b); (b)-[]->(c)').filter('a.name = "Alice"').select('c.name')
# Because negation is implemented as a subtraction
Out[23]:
DataFrame[name: string]
In [24]:
# Still doesn't work: inDegrees has no rows for zero-in-degree vertices,
# so an inner join can never produce inDegree = 0.
g.vertices.join(g.inDegrees, 'id').filter('inDegree=0').show()
+—+—-+—+——–+
| id|name|age|inDegree|
+—+—-+—+——–+
+—+—-+—+——–+
In [25]:
# Why? Because inDegree is computed by a groupBy followed by a count
# Vertex g (no incoming edges) is simply absent from this result
g.inDegrees.show()
+—+——–+
| id|inDegree|
+—+——–+
| f| 1|
| e| 2|
| d| 1|
| c| 2|
| b| 2|
| a| 1|
+—+——–+
In [26]:
# Correct way: a left outer join keeps every vertex; vertices with no
# incoming edges get a null inDegree, which we filter for.
g.vertices.join(g.inDegrees, 'id', 'left_outer').filter('inDegree is null').show()
+—+—–+—+——–+
| id| name|age|inDegree|
+—+—–+—+——–+
| g|Gabby| 60| null|
+—+—–+—+——–+
In [27]:
# Or use subtract: ids missing from inDegrees, joined back to recover attributes
g.vertices.select('id').subtract(g.inDegrees.select('id')).join(g.vertices, 'id').show()
+—+—–+—+
| id| name|age|
+—+—–+—+
| g|Gabby| 60|
+—+—–+—+
In [28]:
# More meaningful queries can be expressed by applying filters.
# Question: where is this filter applied?
g.find("(a)-[]->(b); (b)-[]->(a)").filter("b.age > 36").show()
+————+—————-+
| a| b|
+————+—————-+
|[b, Bob, 36]|[c, Charlie, 37]|
+————+—————-+
In [29]:
g.find(“(a)-[]->(b); (b)-[]->(a)”).filter(“b.age > 36”).explain()
== Physical Plan ==
*(4) Project [a#1271, b#1273]
+- *(4) BroadcastHashJoin [b#1273.id, a#1271.id], [__tmp-548034949987229701#1296.src, __tmp-548034949987229701#1296.dst], Inner, BuildRight
:- *(4) Project [a#1271, b#1273]
: +- *(4) BroadcastHashJoin [__tmp2506060614762666678#1269.dst], [b#1273.id], Inner, BuildRight
: :- *(4) BroadcastHashJoin [__tmp2506060614762666678#1269.src], [a#1271.id], Inner, BuildRight
: : :- *(4) Project [named_struct(src, src#6, dst, dst#7, relationship, relationship#8) AS __tmp2506060614762666678#1269]
: : : +- InMemoryTableScan [dst#7, relationship#8, src#6]
: : : +- InMemoryRelation [src#6, dst#7, relationship#8], StorageLevel(disk, memory, deserialized, 1 replicas)
: : : +- Scan ExistingRDD[src#6,dst#7,relationship#8]
: : +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct
: : +- *(1) Project [named_struct(id, id#0, name, name#1, age, age#2L) AS a#1271]
: : +- InMemoryTableScan [age#2L, id#0, name#1]
: : +- InMemoryRelation [id#0, name#1, age#2L], StorageLevel(disk, memory, deserialized, 1 replicas)
: : +- Scan ExistingRDD[id#0,name#1,age#2L]
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct
: +- *(2) Project [named_struct(id, id#0, name, name#1, age, age#2L) AS b#1273]
: +- *(2) Filter (isnotnull(age#2L) && (age#2L > 36))
: +- InMemoryTableScan [age#2L, id#0, name#1], [isnotnull(age#2L), (age#2L > 36)]
: +- InMemoryRelation [id#0, name#1, age#2L], StorageLevel(disk, memory, deserialized, 1 replicas)
: +- Scan ExistingRDD[id#0,name#1,age#2L]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct
+- *(3) Project [named_struct(src, src#6, dst, dst#7, relationship, relationship#8) AS __tmp-548034949987229701#1296]
+- InMemoryTableScan [dst#7, relationship#8, src#6]
+- InMemoryRelation [src#6, dst#7, relationship#8], StorageLevel(disk, memory, deserialized, 1 replicas)
+- Scan ExistingRDD[src#6,dst#7,relationship#8]
In [30]:
# Find chains of 4 vertices such that at least 2 of the 3 edges are "friend"
# relationships. The `when` function is similar to CASE WHEN in SQL.
chain4 = g.find("(a)-[e1]->(b); (b)-[e2]->(c); (c)-[e3]->(d)").where('a != d')
# Map each edge to 1 if it is a friendship, else 0
friendTo1 = lambda e: when(e['relationship'] == 'friend', 1).otherwise(0)
chain4.select('*', friendTo1(chain4['e1']).alias('f1'),
              friendTo1(chain4['e2']).alias('f2'),
              friendTo1(chain4['e3']).alias('f3')) \
    .where('f1 + f2 + f3 >= 2').select('a', 'b', 'c', 'd').show()
+—————+—————+—————+—————-+
| a| b| c| d|
+—————+—————+—————+—————-+
|[e, Esther, 32]| [d, David, 29]| [a, Alice, 34]| [b, Bob, 36]|
| [d, David, 29]| [a, Alice, 34]|[e, Esther, 32]| [f, Fanny, 38]|
| [d, David, 29]| [a, Alice, 34]| [b, Bob, 36]|[c, Charlie, 37]|
| [g, Gabby, 60]|[e, Esther, 32]| [d, David, 29]| [a, Alice, 34]|
+—————+—————+—————+—————-+
Subgraphs¶
In [31]:
# Select subgraph of users older than 30, and edges of type "friend"
v2 = g.vertices.filter("age > 30")
e2 = g.edges.filter("relationship = 'friend'")
g2 = GraphFrame(v2, e2)
g2.vertices.show()
g2.edges.show()
# GraphFrames does not check if a vertex is isolated (which is OK)
# or if an edge connects two existing vertices (which could cause bugs)
+—+——-+—+
| id| name|age|
+—+——-+—+
| a| Alice| 34|
| b| Bob| 36|
| c|Charlie| 37|
| e| Esther| 32|
| f| Fanny| 38|
| g| Gabby| 60|
+—+——-+—+
+—+—+————+
|src|dst|relationship|
+—+—+————+
| a| b| friend|
| e| d| friend|
| d| a| friend|
| a| e| friend|
+—+—+————+
In [32]:
g2.inDegrees.show()
+—+——–+
| id|inDegree|
+—+——–+
| e| 1|
| d| 1|
| b| 1|
| a| 1|
+—+——–+
In [33]:
# Only keeping edges that connect existing vertices:
# a left_semi join keeps an edge iff a matching vertex exists at each end
e3 = e2.join(v2, e2['src'] == v2['id'], 'left_semi') \
    .join(v2, e2['dst'] == v2['id'], 'left_semi')
g3 = GraphFrame(v2, e3)
In [34]:
g3.edges.show()
+—+—+————+
|src|dst|relationship|
+—+—+————+
| a| b| friend|
| a| e| friend|
+—+—+————+
In [35]:
# Select subgraph based on edges of type "follow"
# pointing from an older user to a younger user.
e4 = g.find("(a)-[e]->(b)") \
    .filter("e.relationship = 'follow'") \
    .filter("a.age > b.age") \
    .select("e.*")
e4.show()
# Only keeping vertices that appear in the edges
# (left_semi on src, unioned with left_semi on dst, then dedup)
v4 = g.vertices.join(e4, g.vertices['id'] == e4['src'], 'left_semi') \
    .union(g.vertices.join(e4, g.vertices['id'] == e4['dst'], 'left_semi')) \
    .distinct()
# Construct the subgraph
g4 = GraphFrame(v4, e4)
g4.vertices.show()
+—+—+————+
|src|dst|relationship|
+—+—+————+
| c| b| follow|
| f| c| follow|
| g| e| follow|
+—+—+————+
+—+——-+—+
| id| name|age|
+—+——-+—+
| g| Gabby| 60|
| e| Esther| 32|
| b| Bob| 36|
| f| Fanny| 38|
| c|Charlie| 37|
+—+——-+—+
In [36]:
g4.triplets.show()
+—————-+————–+—————-+
| src| edge| dst|
+—————-+————–+—————-+
|[c, Charlie, 37]|[c, b, follow]| [b, Bob, 36]|
| [f, Fanny, 38]|[f, c, follow]|[c, Charlie, 37]|
| [g, Gabby, 60]|[g, e, follow]| [e, Esther, 32]|
+—————-+————–+—————-+
BFS¶
In [37]:
# Manual BFS. Starting vertex is 'a'; each iteration adds one layer.
layers = [g.vertices.select('id').where("id = 'a'")]
visited = layers[0]
while layers[-1].count() > 0:
    # From the current layer, get all the one-hop neighbors
    d1 = layers[-1].join(g.edges, layers[-1]['id'] == g.edges['src'])
    # Rename the column as 'id', and remove visited vertices and duplicates
    d2 = d1.select(d1['dst'].alias('id')) \
        .subtract(visited).distinct()
    layers += [d2]
    visited = visited.union(layers[-1])
In [38]:
layers[0].show()
+—+
| id|
+—+
| a|
+—+
In [39]:
layers[1].show()
+—+
| id|
+—+
| e|
| b|
+—+
In [40]:
layers[2].show()
+—+
| id|
+—+
| f|
| d|
| c|
+—+
In [41]:
layers[3].show()
+—+
| id|
+—+
+—+
In [42]:
# GraphFrames provides its own BFS:
# shortest paths from vertices matching "id = 'a'" to ones matching "age > 36"
paths = g.bfs("id = 'a'", "age > 36")
paths.show()
+————–+————–+—————+————–+—————-+
| from| e0| v1| e1| to|
+————–+————–+—————+————–+—————-+
|[a, Alice, 34]|[a, b, friend]| [b, Bob, 36]|[b, c, follow]|[c, Charlie, 37]|
|[a, Alice, 34]|[a, e, friend]|[e, Esther, 32]|[e, f, follow]| [f, Fanny, 38]|
+————–+————–+—————+————–+—————-+
List Ranking¶
In [43]:
# List ranking setup. Edges form a linked list; -1 denotes end of list.
data = [(0, 5), (1, 0), (3, 4), (4, 6), (5, -1), (6, 1)]
e = spark.createDataFrame(data, ['src', 'dst'])
# d = distance contribution: 0 for the last real node, 1 for all others
v = e.select(col('src').alias('id'), when(e.dst == -1, 0).otherwise(1).alias('d'))
# Add the sentinel vertex -1 with distance 0
v1 = spark.createDataFrame([(-1, 0)], ['id', 'd'])
v = v.union(v1)
v.show()
e.show()
+—+—+
| id| d|
+—+—+
| 0| 1|
| 1| 1|
| 3| 1|
| 4| 1|
| 5| 0|
| 6| 1|
| -1| 0|
+—+—+
+—+—+
|src|dst|
+—+—+
| 0| 5|
| 1| 0|
| 3| 4|
| 4| 6|
| 5| -1|
| 6| 1|
+—+—+
In [44]:
# Pointer jumping: each round, every node adds its successor's distance and
# every edge is shortcut to point two hops ahead, halving the list each time.
while e.filter('dst != -1').count() > 0:
    g = GraphFrame(v, e)
    g.cache()
    # New distance = own distance + successor's distance (re-add sentinel)
    v = g.triplets.select(col('src.id').alias('id'),
                          (col('src.d') + col('dst.d')).alias('d')) \
        .union(v1)
    # Shortcut edges two hops ahead; keep terminal edges (dst = -1) as-is
    e = g.find('(a)-[]->(b); (b)-[]->(c)') \
        .select(col('a.id').alias('src'), col('c.id').alias('dst')) \
        .union(e.filter('dst = -1'))
v.show()
+—+—+
| id| d|
+—+—+
| 0| 1|
| 1| 2|
| 3| 5|
| 4| 4|
| 5| 0|
| 6| 3|
| -1| 0|
+—+—+
In [ ]: