In [1]:
#import pyspark
#from pyspark import SparkContext
#sc = SparkContext('local')
lines = sc.textFile('/Users/kenny/Desktop/Spark/data/adj_noun_pairs.txt', 8)
In [2]:
lines.count()
Out[2]:
3162692
In [3]:
lines.getNumPartitions()
Out[3]:
8
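Note: the second argument to textFile() (8 above) is the minPartitions hint; Spark splits the input into at least that many partitions, which getNumPartitions() confirms here.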
In [4]:
lines.take(5)
Out[4]:
['early radical',
'french revolution',
'pejorative way',
'violent means',
'positive label']
In [5]:
# Converting lines into word pairs.
# Data is dirty: some lines have more than 2 words, so filter them out.
pairs = lines.map(lambda l: tuple(l.split())).filter(lambda p: len(p)==2)
pairs.cache()
Out[5]:
PythonRDD[4] at RDD at PythonRDD.scala:53
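As a quick sanity check of what this map/filter does, here is a minimal local sketch in plain Python (the three-word line is a hypothetical example of a malformed record, not taken from the file):

sample = ['early radical', 'french revolution violent means']
parsed = [tuple(l.split()) for l in sample]
kept = [p for p in parsed if len(p) == 2]   # [('early', 'radical')]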
In [6]:
pairs.take(5)
Out[6]:
[('early', 'radical'),
('french', 'revolution'),
('pejorative', 'way'),
('violent', 'means'),
('positive', 'label')]
In [7]:
N = pairs.count()
In [8]:
N
Out[8]:
3162674
In [9]:
# Compute the frequency of each pair.
# Ignore pairs that are not frequent enough (fewer than 100 occurrences).
pair_freqs = pairs.map(lambda p: (p,1)).reduceByKey(lambda f1, f2: f1 + f2) \
.filter(lambda pf: pf[1] >= 100)
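The same count-and-threshold pattern on a tiny hand-made RDD (illustrative only; the threshold is lowered to 2 so the toy input survives the filter):

toy = sc.parallelize([('a', 'b'), ('a', 'b'), ('c', 'd')])
toy.map(lambda p: (p, 1)).reduceByKey(lambda x, y: x + y) \
   .filter(lambda pf: pf[1] >= 2).collect()   # [(('a', 'b'), 2)]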
In [10]:
pair_freqs.take(5)
Out[10]:
[(('16th', 'century'), 950),
(('civil', 'war'), 2236),
(('social', 'class'), 155),
(('sixteenth', 'century'), 137),
(('late', '1970'), 444)]
In [11]:
# Computing the frequencies of the adjectives and the nouns
a_freqs = pairs.map(lambda p: (p[0],1)).reduceByKey(lambda x,y: x+y)
n_freqs = pairs.map(lambda p: (p[1],1)).reduceByKey(lambda x,y: x+y)
In [12]:
a_freqs.take(5)
Out[12]:
[('social', 7413),
('free', 6635),
('stoic', 72),
('good', 7499),
('other', 75260)]
In [13]:
n_freqs.take(5)
Out[13]:
[('revolution', 2305),
('wealth', 363),
('idea', 2852),
('anarchism', 67),
('something', 744)]
In [14]:
# Broadcasting the adjective and noun frequencies.
# Broadcast variables can hold any picklable Python data structure (here plain dicts
# built with collectAsMap()); do not broadcast an RDD itself.
n_dict = sc.broadcast(n_freqs.collectAsMap())
a_dict = sc.broadcast(a_freqs.collectAsMap())
# Alternative without broadcast variables: keep the plain dicts on the driver and let
# Spark ship them inside each task's closure:
# n_dict = n_freqs.collectAsMap()
# a_dict = a_freqs.collectAsMap()
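With sc.broadcast, the two frequency dictionaries are shipped to each executor once and cached there, rather than being re-serialized into the closure of every task; workers read them through the .value attribute, as pmi_score does below. A quick access check on the driver (using words that appear in the sample outputs above):

a_dict.value['violent'], n_dict.value['revolution']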
In [15]:
from math import log
# Computing the PMI for a pair.
# (An equivalent non-broadcast version would index the plain driver-side dicts directly:
#  pmi = log(float(f)*N/(a_dict[w1]*n_dict[w2]), 2) )
def pmi_score(pair_freq):
    w1, w2 = pair_freq[0]
    f = pair_freq[1]
    pmi = log(float(f)*N/(a_dict.value[w1]*n_dict.value[w2]), 2)
    return pmi, (w1, w2)
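For reference, this is the standard pointwise mutual information under the empirical distribution: PMI(a, n) = log2(P(a, n) / (P(a) * P(n))) = log2(f(a, n) * N / (f(a) * f(n))), where f(a, n) is the pair count, f(a) and f(n) are the adjective and noun counts computed above, and N is the total number of pairs.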
In [16]:
# Computing the PMI for all pairs.
scored_pairs = pair_freqs.map(pmi_score)
In [17]:
# Printing the most strongly associated pairs.
scored_pairs.top(10)  # top() uses the natural tuple ordering, so pairs are ranked by PMI score
Out[17]:
[(14.41018838546462, ('magna', 'carta')),
(13.071365888694997, ('polish-lithuanian', 'Commonwealth')),
(12.990597616733414, ('nitrous', 'oxide')),
(12.64972604311254, ('latter-day', 'Saints')),
(12.50658937509916, ('stainless', 'steel')),
(12.482331020687814, ('pave', 'runway')),
(12.19140721768055, ('corporal', 'punishment')),
(12.183248694293388, ('capital', 'punishment')),
(12.147015483562537, ('rush', 'yard')),
(12.109945794428935, ('globular', 'cluster'))]
In [18]:
[(14.41018838546462, (‘magna’, ‘carta’)),
(13.071365888694997, (‘polish-lithuanian’, ‘Commonwealth’)),
(12.990597616733414, (‘nitrous’, ‘oxide’)),
(12.64972604311254, (‘latter-day’, ‘Saints’)),
(12.50658937509916, (‘stainless’, ‘steel’)),
(12.482331020687814, (‘pave’, ‘runway’)),
(12.19140721768055, (‘corporal’, ‘punishment’)),
(12.183248694293388, (‘capital’, ‘punishment’)),
(12.147015483562537, (‘rush’, ‘yard’)),
(12.109945794428935, (‘globular’, ‘cluster’))]
Out[18]:
[(14.41018838546462, (‘magna’, ‘carta’)),
(13.071365888694997, (‘polish-lithuanian’, ‘Commonwealth’)),
(12.990597616733414, (‘nitrous’, ‘oxide’)),
(12.64972604311254, (‘latter-day’, ‘Saints’)),
(12.50658937509916, (‘stainless’, ‘steel’)),
(12.482331020687814, (‘pave’, ‘runway’)),
(12.19140721768055, (‘corporal’, ‘punishment’)),
(12.183248694293388, (‘capital’, ‘punishment’)),
(12.147015483562537, (‘rush’, ‘yard’)),
(12.109945794428935, (‘globular’, ‘cluster’))]