“””
CSCC11 – Introduction to Machine Learning, Winter 2021, Assignment 4
B. Chan, S. Wei, D. Fleet
This is a test script for clustering methods.
“””
import _pickle as pickle
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import os
from functools import partial
from gmm import GMM
from kmeans import KMeans
def test_all(base_path, tests, test_method, visualize=False):
    assert test_method in ["kmeans", "gmm", "all"], f"Supported methods: kmeans, gmm, all. Got: {test_method}"
run_experiment = partial(run_test, visualize=visualize, test_method=test_method)
for test in tests:
data_path = os.path.join(base_path, test)
assert os.path.isfile(data_path)
run_experiment(data_path=data_path)
def run_test(data_path, test_method, visualize=False):
with open(data_path, “rb”) as f:
test_data = pickle.load(f)
kmeans_enabled = False
gmm_enabled = False
num_plots = 1
if test_method in [‘kmeans’, ‘all’]:
test_kmeans(test_data)
kmeans_labels = test_data[“kmeans_labels”].flatten()
kmeans_enabled = True
num_plots += 1
if test_method in [‘gmm’, ‘all’]:
test_gmm(test_data)
gmm_labels = test_data[“gmm_labels”].flatten()
gmm_enabled = True
num_plots += 1
if visualize:
K = test_data[“init_centers”].shape[0]
fig = plt.figure(figsize=(5, 10))
ax = fig.add_subplot(num_plots, 1, 1)
        ax.set_title("Original")
        ax.scatter(test_data['data'][:, 0], test_data['data'][:, 1])
        ax.scatter(test_data["gmm_centers"][:, 0], test_data["gmm_centers"][:, 1], c="black")
        ax.scatter(test_data["kmeans_centers"][:, 0], test_data["kmeans_centers"][:, 1], c="black", marker="x")
        plot_i = 2
        if kmeans_enabled:
            ax = fig.add_subplot(num_plots, 1, plot_i)
            ax.set_title("KMeans")
            for cluster_i in range(K):
                ax.scatter(test_data['data'][kmeans_labels == cluster_i, 0],
                           test_data['data'][kmeans_labels == cluster_i, 1])
            ax.scatter(test_data["kmeans_centers"][:, 0], test_data["kmeans_centers"][:, 1], c="black")
            plot_i += 1
        if gmm_enabled:
            ax = fig.add_subplot(num_plots, 1, plot_i)
            ax.set_title("GMM")
            for cluster_i in range(K):
                ax.scatter(test_data['data'][gmm_labels == cluster_i, 0],
                           test_data['data'][gmm_labels == cluster_i, 1])
            ax.scatter(test_data["gmm_centers"][:, 0], test_data["gmm_centers"][:, 1], c="black")
plt.show()
def test_kmeans(test_data):
model = KMeans(test_data[“init_centers”])
labels = model.train(test_data[“data”])
assert np.allclose(model.centers, test_data[“kmeans_centers”])
assert np.allclose(labels, test_data[“kmeans_labels”])
def test_gmm(test_data):
model = GMM(test_data[“init_centers”])
labels = model.train(test_data[“data”])
assert np.allclose(model.centers, test_data[“gmm_centers”])
assert np.allclose(model.covariances, test_data[“gmm_covariances”])
assert np.allclose(model.mixture_proportions, test_data[“gmm_mixture_proportions”])
assert np.allclose(labels, test_data[“gmm_labels”])
if __name__ == “__main__”:
base_path = “../data/”
tests = [f”test_{i}.pkl” for i in range(1, 6)]
# Test methods: kmeans, gmm, all
test_method = “kmeans”
# Whether or not to visualize clusters
visualize = True
test_all(base_path, tests, test_method, visualize)
“””
CSCC11 – Introduction to Machine Learning, Winter 2021, Assignment 4
B. Chan, S. Wei, D. Fleet
This file clusters a document data set consisting of a few thousand BBC articles,
each represented by a word-frequency vector, using the K-Means algorithm.
NOTE: You can try using GMM, but you will quickly find that it is not a good idea at this dimensionality.
“””
import _pickle as pickle
import numpy as np
import os
from center_initializations import (random_init, kmeans_pp)
from kmeans import KMeans
def get_data(norm_flag, diffuse):
“”” This function preprocesses the data given the flags.
If the data is already cached, simply load and return the cache.
Args:
– norm_flag (bool): Whether or not to normalize the feature vectors.
– diffuse (int): Number of random walk steps to take. NOTE: diffuse >= 0.
Output:
– data (ndarray (Shape: (N, D))): The preprocessed data.
– terms (ndarray (Shape: (D, 1))): The terms corresponding to each feature.
“””
data_path = “../data/BBC_data.pkl”
transition_matrix_path = “../data/word_transition.pkl”
cache_path = f”../data/BBC_cache_{norm_flag}_{diffuse}.pkl”
terms = pickle.load(open(data_path, “rb”))[‘terms’]
# Load from cache file if it exists
if os.path.isfile(cache_path):
with open(cache_path, “rb”) as f:
data = pickle.load(f)
else:
with open(data_path, “rb”) as f:
data = pickle.load(f)[“data”]
if not norm_flag and diffuse <= 0:
return data, terms
# Normalize documents
data = data / np.sum(data, axis=1, keepdims=True)
if diffuse > 0:
# Perform diffusion to obtain less-sparse vectors
with open(transition_matrix_path, “rb”) as f:
transition_matrix = pickle.load(f)
for _ in range(diffuse):
data = (transition_matrix @ data.T).T
# Save the cache
with open(cache_path, “wb”) as f:
pickle.dump(data, f)
return data, terms
def main(seed, num_trials, center_init, K, norm_flag, diffuse, max_iterations=1000):
assert num_trials > 0, f”Must run the experiment at least once. Got: {num_trials}”
    assert center_init in ("kmeans_pp", "random"), f"Supported initializations: kmeans_pp, random. Got: {center_init}"
assert K > 1, f”Must have at least 2 clusters. Got: {K}”
assert diffuse >= 0, f”Diffusion must be at least 0. Got: {diffuse}”
# Create directory if it doesn’t exist
result_dir = f”results/seed_{seed}-init_{center_init}-K_{K}-norm_{norm_flag}-diffuse_{diffuse}-max_iter_{max_iterations}”
if not os.path.isdir(result_dir):
os.makedirs(result_dir, exist_ok=True)
center_init_mapping = {
“kmeans_pp”: kmeans_pp,
“random”: random_init
}
data, terms = get_data(norm_flag, diffuse)
# Run multiple trials
errors = []
for trial_i in range(num_trials):
curr_dir = os.path.join(result_dir, str(trial_i))
if not os.path.isdir(curr_dir):
os.makedirs(curr_dir, exist_ok=True)
init_centers = center_init_mapping[center_init](K=K, train_X=data)
        print(f"Trial: {trial_i} - Initial Centers: {init_centers}")
model = KMeans(init_centers=init_centers)
labels = model.train(train_X=data, max_iterations=max_iterations)
total_error = 0
        mean_centers = np.abs(model.centers - np.mean(model.centers, axis=0))
mean_centers = mean_centers / np.sum(mean_centers, axis=1, keepdims=True)
for cluster_i in range(K):
# Only record the words away from the mean
word_idxes = np.where(mean_centers[cluster_i] != 0)[0]
word_counts = np.round(mean_centers[cluster_i] * 500)
# Save results
with open(os.path.join(curr_dir, f”centers_{cluster_i}.txt”), “w”) as f:
for word_idx in word_idxes:
for _ in range(int(word_counts[word_idx])):
f.write(terms[word_idx][0].item())
# Compute error
            total_error += np.sum(np.linalg.norm(data[labels.flatten() == cluster_i] - model.centers[cluster_i], ord=2, axis=1))
errors.append(total_error)
error_avg = np.mean(errors)
error_var = np.var(errors)
print(f”ERRORS: {errors}”)
print(f”Average: {error_avg}”)
print(f”Variance: {error_var}”)
with open(os.path.join(result_dir, “errors.pkl”), “wb”) as f:
pickle.dump({“errors”: errors,
“average”: error_avg,
“variance”: error_var}, f)
if __name__ == “__main__”:
# Set random seed
seed = 2
np.random.seed(seed)
# Center Initialization method: kmeans_pp or random
center_init = “random”
# Number of clusters. NOTE: K > 1
K = 5
# Normalize data
norm_flag = False
# Amount of diffusion
diffuse = 0
# Number of trials
num_trials = 1
    # Maximum number of iterations for the K-Means algorithm
max_iterations = 1000
main(seed=seed,
num_trials=num_trials,
center_init=center_init,
K=K,
norm_flag=norm_flag,
diffuse=diffuse,
max_iterations=max_iterations)
“””
CSCC11 – Introduction to Machine Learning, Winter 2021, Assignment 4
B. Chan, S. Wei, D. Fleet
“””
import numpy as np
class KMeans:
def __init__(self, init_centers):
“”” This class represents the K-means model.
TODO: You will need to implement the methods of this class:
– train: ndarray, int -> ndarray
Implementation description will be provided under each method.
For the following:
– N: Number of samples.
– D: Dimension of input features.
– K: Number of centers.
NOTE: K > 1
Args:
– init_centers (ndarray (shape: (K, D))): A KxD matrix consisting K D-dimensional centers.
“””
assert len(init_centers.shape) == 2, f”init_centers should be a KxD matrix. Got: {init_centers.shape}”
(self.K, self.D) = init_centers.shape
assert self.K > 1, f”There must be at least 2 clusters. Got: {self.K}”
# Shape: K x D
self.centers = np.copy(init_centers)
def train(self, train_X, max_iterations=1000):
“”” This method trains the K-means model.
NOTE: This method updates self.centers
The algorithm is the following:
        - Assign each data point to the closest cluster center.
        - Re-compute the cluster centers based on the data points assigned to them.
        - Update the labels array to contain the index of the cluster center each point is assigned to.
        - Stop when the labels do not change from one iteration to the next.
Args:
– train_X (ndarray (shape: (N, D))): A NxD matrix consisting N D-dimensional input data.
– max_iterations (int): Maximum number of iterations.
Output:
– labels (ndarray (shape: (N, 1))): A N-column vector consisting N labels of input data.
“””
assert len(train_X.shape) == 2 and train_X.shape[1] == self.D, f”train_X should be a NxD matrix. Got: {train_X.shape}”
assert max_iterations > 0, f”max_iterations must be positive. Got: {max_iterations}”
N = train_X.shape[0]
        labels = np.empty(shape=(N, 1), dtype=int)
distances = np.empty(shape=(N, self.K))
for _ in range(max_iterations):
old_labels = labels
# ====================================================
# TODO: Implement your solution within the box
# ====================================================
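            # A hedged sketch of one possible implementation of the step described in
            # the docstring (not necessarily the intended solution); it uses only
            # names already defined in this scope, and loop variables introduced here
            # (k, members) are illustrative.
            # Assign each point to its nearest center (squared Euclidean distance).
            for k in range(self.K):
                distances[:, k] = np.sum((train_X - self.centers[k]) ** 2, axis=1)
            labels = np.argmin(distances, axis=1).reshape((N, 1))
            # Re-compute each center as the mean of its assigned points, keeping the
            # previous center if a cluster has no assigned points.
            for k in range(self.K):
                members = train_X[labels.flatten() == k]
                if members.shape[0] > 0:
                    self.centers[k] = np.mean(members, axis=0)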
# Check convergence
if np.allclose(old_labels, labels):
break
return labels
“””
CSCC11 – Introduction to Machine Learning, Winter 2021, Assignment 4
B. Chan, S. Wei, D. Fleet
“””
import numpy as np
def kmeans_pp(K, train_X):
“”” This function runs K-means++ algorithm to choose the centers.
Args:
– K (int): Number of centers.
– train_X (ndarray (shape: (N, D))): A NxD matrix consisting N D-dimensional input data.
Output:
– centers (ndarray (shape: (K, D))): A KxD matrix consisting K D-dimensional centers.
“””
centers = np.empty(shape=(K, train_X.shape[1]))
# ====================================================
# TODO: Implement your solution within the box
# ====================================================
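    # A hedged sketch of K-means++ seeding (one possible implementation, not
    # necessarily the intended solution): choose the first center uniformly at
    # random, then choose each remaining center with probability proportional to
    # the squared distance to the nearest center chosen so far.
    N = train_X.shape[0]
    centers[0] = train_X[np.random.randint(N)]
    for k in range(1, K):
        sq_dists = np.min(np.sum((train_X[:, None, :] - centers[None, :k, :]) ** 2, axis=2), axis=1)
        probs = sq_dists / np.sum(sq_dists)
        centers[k] = train_X[np.random.choice(N, p=probs)]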
return centers
def random_init(K, train_X):
“”” This function randomly chooses K data points as centers.
Args:
– K (int): Number of centers.
– train_X (ndarray (shape: (N, D))): A NxD matrix consisting N D-dimensional input data.
Output:
– centers (ndarray (shape: (K, D))): A KxD matrix consisting K D-dimensional centers.
“””
centers = train_X[np.random.randint(train_X.shape[0], size=K)]
return centers
“””
CSCC11 – Introduction to Machine Learning, Winter 2021, Assignment 4 – Clustering
B. Chan, S. Wei, D. Fleet
“””
%%%%%%%%%%
%% Step 0
%%%%%%%%%%
1) What is the average sparsity of input vectors? (in [0, 1])
2) Find the 10 most common terms, find the 10 least common terms. (list, separated by commas)
3) What is the average frequency of non-zero vector entries in any document?
%%%%%%%%%%
%% Step 1
%%%%%%%%%%
1) Can you categorize the topic for each cluster? (list, comma separated)
2) What factors make clustering difficult?
3) Will we get better results with a lucky initial guess for cluster centers?
(yes/no and a short explanation of why)
%%%%%%%%%%
%% Step 2
%%%%%%%%%%
1) What problem from step 1 is solved now?
2) What are the topics for clusters?
3) Is the result better or worse than step 1? (give a short explanation as well)
%%%%%%%%%%
%% Step 3
%%%%%%%%%%
1) What are the topics for clusters?
2) Why is the clustering better now?
3) What is the general lesson you learned in clustering sparse, high-dimensional
data?
%%%%%%%%%%
%% Step 5
%%%%%%%%%%
1) What is the total error difference between K-Means++ and random center initialization?
2) What is the mean and variance of total errors after running K-Means++ 5 times?
3) Do the training errors appear to be more consistent?
4) Do the topics appear to be more meaningful?
%%%%%%%%%%
%% K-Means vs GMM
%%%%%%%%%%
1) Under what scenarios do the methods find drastically different clusters? Why?
2) What happens to GMM as we increase the dimensionality of the input features? Does K-Means suffer from the same problem?
“””
CSCC11 – Introduction to Machine Learning, Winter 2021, Assignment 4
B. Chan, S. Wei, D. Fleet
“””
import numpy as np
from functools import partial
class GMM:
def __init__(self, init_centers):
“”” This class represents the GMM model.
TODO: You will need to implement the methods of this class:
– _e_step: ndarray, ndarray -> ndarray
– _m_step: ndarray, ndarray -> None
Implementation description will be provided under each method.
For the following:
– N: Number of samples.
– D: Dimension of input features.
– K: Number of Gaussians.
NOTE: K > 1
Args:
– init_centers (ndarray (shape: (K, D))): A KxD matrix consisting K D-dimensional centers, each for a Gaussian.
“””
assert len(init_centers.shape) == 2, f”init_centers should be a KxD matrix. Got: {init_centers.shape}”
(self.K, self.D) = init_centers.shape
assert self.K > 1, f”There must be at least 2 clusters. Got: {self.K}”
# Shape: K x D
self.centers = np.copy(init_centers)
# Shape: K x D x D
self.covariances = np.tile(np.eye(self.D), reps=(self.K, 1, 1))
# Shape: K x 1
self.mixture_proportions = np.ones(shape=(self.K, 1)) / self.K
def _e_step(self, train_X):
“”” This method performs the E-step of the EM algorithm.
Args:
– train_X (ndarray (shape: (N, D))): A NxD matrix consisting N D-dimensional input data.
Output:
– probability_matrix_updated (ndarray (shape: (N, K))): A NxK matrix consisting N conditional probabilities of p(z_k|x_i) (i.e. the responsibilities).
“””
(N, D) = train_X.shape
probability_matrix = np.empty(shape=(N, self.K))
# ====================================================
# TODO: Implement your solution within the box
# ====================================================
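        # A hedged sketch of the E-step (one possible implementation, not necessarily
        # the intended solution). It evaluates each Gaussian density directly; a more
        # numerically robust version would work in log space.
        for k in range(self.K):
            diff = train_X - self.centers[k]                                  # (N, D)
            cov_inv = np.linalg.inv(self.covariances[k])
            exponent = -0.5 * np.sum((diff @ cov_inv) * diff, axis=1)         # (N,)
            norm_const = np.sqrt(((2 * np.pi) ** D) * np.linalg.det(self.covariances[k]))
            probability_matrix[:, k] = self.mixture_proportions[k, 0] * np.exp(exponent) / norm_const
        # Normalize each row so the responsibilities sum to one.
        probability_matrix /= np.sum(probability_matrix, axis=1, keepdims=True)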
assert probability_matrix.shape == (train_X.shape[0], self.K), f”probability_matrix shape mismatch. Expected: {(train_X.shape[0], self.K)}. Got: {probability_matrix.shape}”
return probability_matrix
def _m_step(self, train_X, probability_matrix):
“”” This method performs the M-step of the EM algorithm.
NOTE: This method updates self.centers, self.covariances, and self.mixture_proportions
Args:
– train_X (ndarray (shape: (N, D))): A NxD matrix consisting N D-dimensional input data.
– probability_matrix (ndarray (shape: (N, K))): A NxK matrix consisting N conditional probabilities of p(z_k|x_i) (i.e. the responsibilities).
Output:
– centers (ndarray (shape: (K, D))): A KxD matrix consisting K D-dimensional means for each Gaussian component.
– covariances (ndarray (shape: (K, D, D))): A KxDxD tensor consisting K DxD covariance matrix for each Gaussian component.
        - mixture_proportions (ndarray (shape: (K, 1))): A K-column vector consisting of the mixture proportion for each Gaussian component.
“””
(N, D) = train_X.shape
centers = np.empty(shape=(self.K, self.D))
covariances = np.empty(shape=(self.K, self.D, self.D))
mixture_proportions = np.empty(shape=(self.K, 1))
# ====================================================
# TODO: Implement your solution within the box
#
# ====================================================
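        # A hedged sketch of the M-step updates (one possible implementation, not
        # necessarily the intended solution); helper names here (resp, N_k, diff)
        # are illustrative.
        for k in range(self.K):
            resp = probability_matrix[:, [k]]                                 # (N, 1)
            N_k = np.sum(resp)
            centers[k] = np.sum(resp * train_X, axis=0) / N_k
            diff = train_X - centers[k]                                       # (N, D)
            covariances[k] = (resp * diff).T @ diff / N_k
            mixture_proportions[k, 0] = N_k / N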
assert centers.shape == (self.K, self.D), f”centers shape mismatch. Expected: {(self.K, self.D)}. Got: {centers.shape}”
assert covariances.shape == (self.K, self.D, self.D), f”covariances shape mismatch. Expected: {(self.K, self.D, self.D)}. Got: {covariances.shape}”
assert mixture_proportions.shape == (self.K, 1), f”mixture_proportions shape mismatch. Expected: {(self.K, 1)}. Got: {mixture_proportions.shape}”
return centers, covariances, mixture_proportions
def train(self, train_X, max_iterations=1000):
“”” This method trains the GMM model using EM algorithm.
NOTE: This method updates self.centers, self.covariances, and self.mixture_proportions
Args:
– train_X (ndarray (shape: (N, D))): A NxD matrix consisting N D-dimensional input data.
– max_iterations (int): Maximum number of iterations.
Output:
– labels (ndarray (shape: (N, 1))): A N-column vector consisting N labels of input data.
“””
assert len(train_X.shape) == 2 and train_X.shape[1] == self.D, f”train_X should be a NxD matrix. Got: {train_X.shape}”
assert max_iterations > 0, f”max_iterations must be positive. Got: {max_iterations}”
N = train_X.shape[0]
e_step = partial(self._e_step, train_X=train_X)
m_step = partial(self._m_step, train_X=train_X)
        labels = np.empty(shape=(N, 1), dtype=int)
for _ in range(max_iterations):
old_labels = labels
# E-Step
probability_matrix = e_step()
# Reassign labels
labels = np.argmax(probability_matrix, axis=1).reshape((N, 1))
# Check convergence
if np.allclose(old_labels, labels):
break
# M-Step
self.centers, self.covariances, self.mixture_proportions = m_step(probability_matrix=probability_matrix)
return labels
#!/usr/bin/env python
“””
Derived from the minimal word cloud plotting example from
the word cloud GIT repo:
https://github.com/amueller/word_cloud
WordCloud by Andreas Mueller
“””
from os import path
from wordcloud import WordCloud
d = path.dirname(__file__)
# Read the whole text.
for cc in range(5):
name=”centers_”+str(cc)+”.txt”
text = open(path.join(d, name)).read()
# Generate a word cloud image
wordcloud = WordCloud(collocations=False).generate(text)
# Display the generated image:
# the matplotlib way:
import matplotlib.pyplot as plt
# Display with unbound font size
plt.imshow(wordcloud, interpolation=’bilinear’)
plt.axis(“off”)
# lower max_font_size
# wordcloud = WordCloud(max_font_size=40).generate(text)
# plt.figure()
# plt.imshow(wordcloud, interpolation=”bilinear”)
# plt.axis(“off”)
plt.show()
“””
CSCC11 – Introduction to Machine Learning, Winter 2021, Assignment 4
B. Chan, S. Wei, D. Fleet
“””
import numpy as np
from pca import PCA
from utils import gram_schmidt
class EMPCA():
def __init__(self, Y, K):
“”” This class represents EM-PCA with components given by data.
TODO: You will need to implement the methods of this class:
– _e_step: ndarray, ndarray -> ndarray
– _m_step: ndarray, ndarray -> ndarray
Implementation description will be provided under each method.
For the following:
– N: Number of samples.
– D: Dimension of observation features.
– K: Dimension of state (subspace) features.
NOTE: K >= 1
Args:
– Y (ndarray (shape: (D, N))): A DxN matrix consisting N D-dimensional observation data.
– K (int): Number of dimensions for the state data.
“””
# Mean of each row, shape: (D, )
self.mean = np.mean(Y, axis=1, keepdims=True)
self.V, self.w = self._compute_components(Y, K)
def _e_step(self, Y, A):
“”” This method runs the E-step of the EM algorithm.
Args:
– Y (ndarray (shape: (D, N))): A DxN matrix consisting N D-dimensional observation data.
– A (ndarray (shape: (D, K))): The estimated state (subspace) basis matrix.
Output:
– X (ndarray (shape: (K, N))): The estimated state data.
“””
K, N = A.shape[1], Y.shape[1]
# ====================================================
# TODO: Implement your solution within the box
# ====================================================
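        # A hedged sketch of the E-step (one possible implementation, not necessarily
        # the intended solution): least-squares estimate of the subspace coordinates,
        # X = (A^T A)^{-1} A^T Y.
        X = np.linalg.solve(A.T @ A, A.T @ Y)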
assert X.shape == (K, N), f”X shape mismatch. Expected: {(K, N)}. Got: {X.shape}”
return X
def _m_step(self, X, Y):
“”” This method runs the M-step of the EM algorithm.
Args:
– Y (ndarray (shape: (D, N))): A DxN matrix consisting N D-dimensional observation data.
        - X (ndarray (shape: (K, N))): A KxN matrix consisting N K-dimensional state (subspace) data.
Output:
– A (ndarray (shape: (D, K))): The estimated state (subspace) basis matrix.
“””
D, K = Y.shape[0], X.shape[0]
# ====================================================
# TODO: Implement your solution within the box
# ====================================================
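        # A hedged sketch of the M-step (one possible implementation, not necessarily
        # the intended solution): least-squares estimate of the basis,
        # A = Y X^T (X X^T)^{-1}.
        A = (Y @ X.T) @ np.linalg.inv(X @ X.T)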
assert A.shape == (D, K), f”A shape mismatch. Expected: {(D, K)}. Got: {A.shape}”
return A
def _compute_components(self, Y, K):
“”” This method computes the state (subspace) basis using EM-PCA.
Args:
– Y (ndarray (shape: (D, N))): A DxN matrix consisting N D-dimensional observation data.
– K (int): Number of dimensions for the state data.
Output:
– V (ndarray (shape: (D, K))): The matrix of top K PCA directions (one per column) sorted in descending order.
– w (ndarray (shape: (D, ))): The vector of eigenvalues corresponding to the eigenvectors.
“””
assert len(Y.shape) == 2, f”Y must be a DxN matrix. Got: {Y.shape}”
(D, N) = Y.shape
# Randomly initialize basis A
A = np.random.randn(D, K)
iter_i = 0
while True:
X = self._e_step(Y, A)
old_A = A
A = self._m_step(X, Y)
iter_i += 1
if np.allclose(old_A, A, atol=1e-8, rtol=1e-8):
print(“Break at iteration {}”.format(iter_i))
break
# Apply Gram Schmidt process to orthogonalize A.
A = gram_schmidt(A)
X = self._e_step(Y, A)
# Diagonalize state data to align principal directions
pca = PCA(X)
V = A @ pca.V
w = pca.w
assert V.shape == (D, K), f”V shape mismatch. Expected: {(D, K)}. Got: {V.shape}”
return V, w
def inference(self, Y):
“”” This method estimates state data X from observation data Y using the precomputed mean and components.
Args:
– Y (ndarray (shape: (D, N))): A DxN matrix consisting N D-dimensional observation data.
Output:
– X (ndarray (shape: (K, N))): The estimated state data.
“””
assert len(Y.shape) == 2, f”Y must be a DxN matrix. Got: {Y.shape}”
(D, N) = Y.shape
K = self.V.shape[1]
assert D > 0, f”dimensionality of observation representation must be at least 1. Got: {D}”
assert K > 0, f”dimensionality of state representation must be at least 1. Got: {K}”
        X = self.V.T @ (Y - self.mean)
assert X.shape == (K, N), f”X shape mismatch. Expected: {(K, N)}. Got: {X.shape}”
return X
def reconstruct(self, X):
“”” This method estimates observation data Y from state data X using the precomputed mean and components.
NOTE: The K is implicitly defined by X.
Args:
        - X (ndarray (shape: (K, N))): A KxN matrix consisting N K-dimensional state (subspace) data.
Output:
– Y (ndarray (shape: (D, N))): A DxN matrix consisting N D-dimensional reconstructed observation data.
“””
        assert len(X.shape) == 2, f"X must be a KxN matrix. Got: {X.shape}"
(K, N) = X.shape
assert K > 0, f”dimensionality of state representation must be at least 1. Got: {K}”
D = self.mean.shape[0]
Y = self.V @ X + self.mean
assert Y.shape == (D, N), f”Y shape mismatch. Expected: {(D, N)}. Got: {Y.shape}”
return Y
if __name__ == “__main__”:
    Y = np.arange(11)[None, :] - 5
Y = np.vstack((Y, Y, Y))
print(f”Original observations: \n{Y}”)
test_pca = EMPCA(Y, 1)
print(f”V: \n{test_pca.V}”)
est_X = test_pca.inference(Y)
print(f”Estimated states: \n{est_X}”)
est_Y = test_pca.reconstruct(est_X)
print(f”Estimated observations from estimated states: \n{est_Y}”)
“””
CSCC11 – Introduction to Machine Learning, Winter 2021, Assignment 4
B. Chan, S. Wei, D. Fleet
This file visualizes the document dataset by reducing the dimensionality with PCA
“””
import _pickle as pickle
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import timeit
import os
from mpl_toolkits.mplot3d import Axes3D
from em_pca import EMPCA
from pca import PCA
def main(dataset, algo):
K = 3
    documents = dataset['data'].astype(float).T
pca_tic = timeit.default_timer()
if algo == “pca”:
pca = PCA(documents)
pca_X = pca.inference(documents, K)
elif algo == “empca”:
pca = EMPCA(documents, K)
pca_X = pca.inference(documents)
else:
raise NotImplementedError
pca_toc = timeit.default_timer()
classes = np.unique(dataset[‘labels’])
fig = plt.figure()
ax = fig.add_subplot(211, projection=”3d”)
    ax.set_ylabel(f"{algo} Reconstruction\nTook: {pca_toc - pca_tic:.2f}s", size='large')
for class_i in classes:
class_i_data = pca_X[:, dataset[‘labels’].flatten() == class_i]
ax.scatter(class_i_data[0, :],
class_i_data[1, :],
class_i_data[2, :],
s=1)
plt.tight_layout()
plt.show()
if not os.path.isdir(“results”):
os.mkdir(“results”)
with open(f”results/{algo}_compressed_data.pkl”, “wb”) as f:
pickle.dump({
“est_X”: pca_X,
}, f)
with open(f”results/{algo}.pkl”, “wb”) as f:
pickle.dump({
“V”: pca.V,
“mean”: pca.mean
}, f)
if __name__ == “__main__”:
dataset = pickle.load(open(“../data/BBC_data.pkl”, “rb”))
algo = “pca”
assert algo in (“pca”, “empca”)
main(dataset, algo)
“””
CSCC11 – Introduction to Machine Learning, Winter 2021, Assignment 4
B. Chan, S. Wei, D. Fleet
“””
import numpy as np
def gram_schmidt(basis):
    """ This function applies the Gram-Schmidt process to orthonormalize the given basis matrix.
Args:
– basis (ndarray (shape: (D, K))): A DxK basis matrix
Output:
    - orth_basis (ndarray (shape: (D, K))): The orthonormal basis matrix of "basis"
“””
orth_basis = np.empty(basis.shape)
orth_basis[:, 0] = basis[:, 0] / np.linalg.norm(basis[:, 0])
for ii in range(1, basis.shape[1]):
        orth_basis[:, [ii]] = basis[:, [ii]] - np.sum((basis[:, [ii]].T @ orth_basis[:, :ii]) * orth_basis[:, :ii], axis=1, keepdims=True)
orth_basis[:, [ii]] = orth_basis[:, [ii]] / np.linalg.norm(orth_basis[:, ii])
return orth_basis
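if __name__ == "__main__":
    # A small hedged self-check (illustrative addition, not part of the original
    # handout; the name example_basis is hypothetical): an orthonormalized basis Q
    # should satisfy Q^T Q = I.
    example_basis = np.random.randn(5, 3)
    Q = gram_schmidt(example_basis)
    print(np.allclose(Q.T @ Q, np.eye(3)))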
“””
CSCC11 – Introduction to Machine Learning, Winter 2021, Assignment 4 – Dimensionality Reduction
B. Chan, S. Wei, D. Fleet
“””
Answer The Following Questions:
Understanding Eigenvalues:
1. How do the eigenvalues relate to the diagonal variances used in generating the data?
2. How does the additive noise affect the curves?
3. Based on the plots, can you hypothesize the ways to choose the number of subspace dimensions?
PCA on document data:
1. How big is the covariance matrix used in the PCA algorithm?
2. How long does PCA algorithm take?
3. Do the points from different classes look reasonably separable in this space?
EM-PCA vs. PCA:
1. After running visualize_documents.py, compare the parameters you’ve estimated using PCA and EM-PCA. Are they identical and if not, how do they differ?
2. Which algorithm takes more space?
3. How long does EM-PCA algorithm take to run compared to PCA?
“””
CSCC11 – Introduction to Machine Learning, Winter 2021, Assignment 4
B. Chan, S. Wei, D. Fleet
“””
import matplotlib.pyplot as plt
import numpy as np
import os
class PCA:
def __init__(self, Y):
“”” This class represents PCA with components and mean given by data.
For the following:
– N: Number of samples.
– D: Dimension of observation features.
– K: Dimension of state features.
NOTE: K >= 1
Args:
– Y (ndarray (shape: (D, N))): A DxN matrix consisting N D-dimensional observation data.
“””
self.D = Y.shape[0]
# Mean of each row, shape: (D, )
self.mean = np.mean(Y, axis=1, keepdims=True)
self.V, self.w = self._compute_components(Y)
def _compute_components(self, Y):
“”” This method computes the PCA directions (one per column) given data.
Args:
– Y (ndarray (shape: (D, N))): A DxN matrix consisting N D-dimensional observation data.
Output:
– V (ndarray (shape: (D, D))): The matrix of PCA directions (one per column) sorted in descending order.
– w (ndarray (shape: (D, ))): The vector of eigenvalues corresponding to the eigenvectors.
“””
assert len(Y.shape) == 2, f”Y must be a DxN matrix. Got: {Y.shape}”
(D, N) = Y.shape
        data_shifted = Y - self.mean
data_cov = np.cov(data_shifted)
        # NumPy collapses the ndarray into a scalar when the covariance matrix is 1x1.
if D == 1:
data_cov = np.array([[data_cov]])
w, V = np.linalg.eigh(data_cov)
w = np.flip(w)
V = np.flip(V, axis=1)
assert V.shape == (D, D), f”V shape mismatch. Expected: {(D, D)}. Got: {V.shape}”
return V, w
def inference(self, Y, K):
“”” This method estimates state data X from observation data Y using the precomputed mean and components.
Args:
– Y (ndarray (shape: (D, N))): A DxN matrix consisting N D-dimensional observation data.
– K (int): Number of dimensions for the state data.
Output:
– X (ndarray (shape: (K, N))): The estimated state data.
“””
assert len(Y.shape) == 2, f”Y must be a DxN matrix. Got: {Y.shape}”
(D, N) = Y.shape
assert D > 0, f”dimensionality of observation representation must be at least 1. Got: {D}”
assert K > 0, f”dimensionality of state representation must be at least 1. Got: {K}”
        X = self.V[:, :K].T @ (Y - self.mean)
assert X.shape == (K, N), f”X shape mismatch. Expected: {(K, N)}. Got: {X.shape}”
return X
def reconstruct(self, X):
“”” This method estimates observation data Y from state data X using the precomputed mean and components.
NOTE: The K is implicitly defined by X.
Args:
        - X (ndarray (shape: (K, N))): A KxN matrix consisting N K-dimensional state (subspace) data.
Output:
– Y (ndarray (shape: (D, N))): A DxN matrix consisting N D-dimensional reconstructed observation data.
“””
        assert len(X.shape) == 2, f"X must be a KxN matrix. Got: {X.shape}"
(K, N) = X.shape
assert K > 0, f”dimensionality of state representation must be at least 1. Got: {K}”
D = self.mean.shape[0]
Y = self.V[:, :K] @ X + self.mean
assert Y.shape == (D, N), f”Y shape mismatch. Expected: {(D, N)}. Got: {Y.shape}”
return Y
def plot_eigenvalues(self, savefig=False):
“”” This function plots the eigenvalues captured by each subspace dimension from 1 to D.
Output:
– eigenvalues (ndarray (shape: (D,))): D-column vector corresponding to the eigenvalues captured by each subspace dimension.
“””
# ====================================================
# TODO: Implement your solution within the box
# ====================================================
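        # A hedged sketch (one possible implementation, not necessarily the intended
        # solution): plot the eigenvalue captured by each subspace dimension,
        # largest first, using the precomputed spectrum self.w.
        eigenvalues = self.w
        plt.plot(np.arange(1, self.D + 1), eigenvalues)
        plt.xlabel("Subspace dimension")
        plt.ylabel("Eigenvalue")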
plt.title(“Eigenvalues”)
if savefig:
if not os.path.isdir(“results”):
os.mkdir(“results”)
plt.savefig(f”results/eigenvalues.eps”, format=”eps”)
else:
plt.show()
plt.clf()
assert eigenvalues.shape == (self.D,), f”eigenvalues shape mismatch. Expected: {(self.D,)}. Got: {eigenvalues.shape}”
return eigenvalues
def plot_subspace_variance(self, savefig=False):
“”” This function plots the fractions of the total variance in the data from 1 to D.
NOTE: Include the case when K=0.
Output:
        - fractions (ndarray (shape: (D + 1,))): Vector of fractions of the total variance captured by the first K subspace dimensions, for K = 0, ..., D.
“””
# ====================================================
# TODO: Implement your solution within the box
# ====================================================
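        # A hedged sketch (one possible implementation, not necessarily the intended
        # solution): fraction of the total variance captured by the first K subspace
        # dimensions, for K = 0, ..., D (hence D + 1 values).
        fractions = np.concatenate(([0.0], np.cumsum(self.w) / np.sum(self.w)))
        plt.plot(np.arange(self.D + 1), fractions)
        plt.xlabel("Number of subspace dimensions (K)")
        plt.ylabel("Fraction of total variance")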
plt.title(“Fractions of Total Variance”)
if savefig:
if not os.path.isdir(“results”):
os.mkdir(“results”)
plt.savefig(f”results/fraction_variance.eps”, format=”eps”)
else:
plt.show()
plt.clf()
assert fractions.shape == (self.D + 1,), f”fractions shape mismatch. Expected: {(self.D + 1,)}. Got: {fractions.shape}”
return fractions
if __name__ == “__main__”:
    Y = np.arange(11)[None, :] - 5
Y = np.vstack((Y, Y, Y))
print(f”Original observations: \n{Y}”)
test_pca = PCA(Y)
print(f”V: \n{test_pca.V}”)
est_X = test_pca.inference(Y, 1)
print(f”Estimated states: \n{est_X}”)
est_Y = test_pca.reconstruct(est_X)
print(f”Estimated observations from estimated states: \n{est_Y}”)
import numpy as np
from em_pca import EMPCA
from pca import PCA
from utils import gram_schmidt
def generate_orthonormal_basis(obs_dim, state_dim):
a = np.random.randn(obs_dim, state_dim)
return gram_schmidt(a)
if __name__ == “__main__”:
seed = 1
plot_eigenvalues = True
run_em_pca = False
np.random.seed(seed)
# Observation space (data) dimension
obs_dim = 10
# State space (subspace) dimension
state_dim = 2
# Standard deviations of an independent Gaussian in subspace
s = [4.0, 2.0]
# Noise
noise = 5e-1
# Number of samples
num_samples = 15
# Generate basis for the subspace
gt_basis = generate_orthonormal_basis(obs_dim, state_dim)
# Generate subspace coordinates from isotropic Gaussian,
# then scale by standard deviations along subspace axes
state_data = np.diag(s) @ np.random.randn(state_dim, num_samples)
# For simplicity, assume the data is zero mean
obs_data = gt_basis @ state_data + noise * np.random.randn(obs_dim, num_samples)
# PCA
pca = PCA(obs_data)
pca_state_basis = pca.V[:, :state_dim]
if plot_eigenvalues:
pca.plot_eigenvalues(savefig=False)
pca.plot_subspace_variance(savefig=False)
print(“True subspace basis: \n{}”.format(gt_basis))
print(“PCA Estimated subspace basis: \n{}”.format(pca_state_basis))
# EM-PCA
if run_em_pca:
em_pca = EMPCA(obs_data, state_dim)
em_pca_state_basis = em_pca.V
print(“EM-PCA Estimated subspace basis: \n{}”.format(em_pca_state_basis))
A4/clustering/test_clustering.py
A4/clustering/document_clustering.py
A4/clustering/kmeans.py
A4/clustering/center_initializations.py
A4/clustering/questions.txt
A4/clustering/gmm.py
A4/clustering/mkWordClouds.sh
A4/pca/em_pca.py
A4/pca/visualize_documents.py
A4/pca/utils.py
A4/pca/questions.txt
A4/pca/pca.py
A4/pca/test_pca.py
A4/data/test_2.pkl
A4/data/BBC_data.pkl
A4/data/eye_image_data.pkl
A4/data/test_3.pkl
A4/data/word_transition.pkl
A4/data/test_4.pkl
A4/data/test_5.pkl
A4/data/test_1.pkl