import numpy as np
import time
import random
import matplotlib.pyplot as plt


# save theta to p6_params.npz so that it can be used by easynn
def save_theta(theta):
    c1_W, c1_b, c2_W, c2_b, f_W, f_b = theta

    np.savez_compressed("p6_params.npz", **{
        "c1.weight": c1_W,
        "c1.bias": c1_b,
        "c2.weight": c2_W,
        "c2.bias": c2_b,
        "f.weight": f_W,
        "f.bias": f_b
    })


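# note: the saved file can be loaded back with np.load("p6_params.npz"),
# which returns a dict-like object keyed by the names used above

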
# initialize theta using uniform distribution [-bound, bound]
# return theta as (c1_W, c1_b, c2_W, c2_b, f_W, f_b)
def initialize_theta(bound):
    c1_W = np.random.uniform(-bound, bound, (8, 1, 5, 5))
    c1_b = np.random.uniform(-bound, bound, 8)
    c2_W = np.random.uniform(-bound, bound, (8, 8, 5, 5))
    c2_b = np.random.uniform(-bound, bound, 8)
    f_W = np.random.uniform(-bound, bound, (10, 128))
    f_b = np.random.uniform(-bound, bound, 10)
    return (c1_W, c1_b, c2_W, c2_b, f_W, f_b)


# return out_nchw
def Conv2d(in_nchw, kernel_W, kernel_b):
    OC, IC, KH, KW = kernel_W.shape
    N, C, H, W = in_nchw.shape
    if C != IC or kernel_b.shape != (OC,):
        raise Exception("Conv2d size mismatch: %s @ (%s, %s)" % (
            in_nchw.shape, kernel_W.shape, kernel_b.shape))

    # view in_nchw as a 6D tensor of sliding KHxKW windows
    shape = (N, IC, H-KH+1, W-KW+1, KH, KW)
    strides = in_nchw.strides+in_nchw.strides[2:]
    data = np.lib.stride_tricks.as_strided(in_nchw,
        shape=shape, strides=strides, writeable=False)
    # np.einsum("nihwyx,oiyx->nohw", data, kernel_W)
    nhwo = np.tensordot(data, kernel_W, ((1,4,5), (1,2,3)))
    return nhwo.transpose(0,3,1,2)+kernel_b.reshape((1, OC, 1, 1))


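# A quick shape sanity check for Conv2d (an illustrative sketch, not called
# anywhere during training; the sizes are assumed to mirror the MNIST
# pipeline below, where 28x28 inputs and 5x5 kernels give 24x24 outputs):
def _check_conv2d_shape():
    x = np.zeros((2, 1, 28, 28))
    out = Conv2d(x, np.zeros((8, 1, 5, 5)), np.zeros(8))
    assert out.shape == (2, 8, 24, 24)  # valid convolution: 28-5+1 = 24

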
# return p_b for the whole batch
def Conv2d_backprop_b(p_out, in_nchw, kernel_W, kernel_b):
    OC, IC, KH, KW = kernel_W.shape
    N, C, H, W = in_nchw.shape
    if C != IC or kernel_b.shape != (OC,) or p_out.shape != (N, OC, H-KH+1, W-KW+1):
        raise Exception("Conv2d_backprop_b size mismatch: %s = %s @ (%s, %s)" % (
            p_out.shape, in_nchw.shape, kernel_W.shape, kernel_b.shape))

    return np.einsum("nohw->o", p_out, optimize="optimal")/N


# return p_W for the whole batch
def Conv2d_backprop_W(p_out, in_nchw, kernel_W, kernel_b):
    OC, IC, KH, KW = kernel_W.shape
    N, C, H, W = in_nchw.shape
    if C != IC or kernel_b.shape != (OC,) or p_out.shape != (N, OC, H-KH+1, W-KW+1):
        raise Exception("Conv2d_backprop_W size mismatch: %s = %s @ (%s, %s)" % (
            p_out.shape, in_nchw.shape, kernel_W.shape, kernel_b.shape))

    # view in_nchw as a 6D tensor
    shape = (N, IC, KH, KW, H-KH+1, W-KW+1)
    strides = in_nchw.strides+in_nchw.strides[2:]
    data = np.lib.stride_tricks.as_strided(in_nchw,
        shape=shape, strides=strides, writeable=False)
    # np.einsum("nohw,niyxhw->oiyx", p_out, data)
    return np.tensordot(p_out, data, ((0,2,3), (0,4,5)))/N


# return p_in for the whole batch
def Conv2d_backprop_in(p_out, in_nchw, kernel_W, kernel_b):
    OC, IC, KH, KW = kernel_W.shape
    N, C, H, W = in_nchw.shape
    if C != IC or kernel_b.shape != (OC,) or p_out.shape != (N, OC, H-KH+1, W-KW+1):
        raise Exception("Conv2d_backprop_in size mismatch: %s = %s @ (%s, %s)" % (
            p_out.shape, in_nchw.shape, kernel_W.shape, kernel_b.shape))

    # view p_out as a zero-padded 6D tensor; since out[h] = sum_y in[h+y]*W[y],
    # the input gradient is p_in[h] = sum_y p_out[h-y]*W[y], i.e. a full
    # correlation of the padded p_out with the spatially flipped kernel
    padded = np.zeros((N, OC, H+KH-1, W+KW-1))
    padded[:, :, KH-1:-KH+1, KW-1:-KW+1] = p_out
    # the channel axis of the windows must be OC (the channels of padded)
    shape = (N, OC, H, W, KH, KW)
    strides = padded.strides+padded.strides[2:]
    data = np.lib.stride_tricks.as_strided(padded,
        shape=shape, strides=strides, writeable=False)
    # np.einsum("nohwyx,oiyx->nihw", data, kernel_W[:, :, ::-1, ::-1])
    nhwi = np.tensordot(data, kernel_W[:, :, ::-1, ::-1], ((1,4,5), (0,2,3)))
    return nhwi.transpose((0,3,1,2))


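# An optional finite-difference spot check for the three Conv2d gradient
# helpers (an illustrative sketch; it is not called anywhere by this script,
# and the tiny tensor sizes and eps are assumptions chosen only for the check):
def _check_conv2d_grads(eps=1e-6):
    rng = np.random.default_rng(0)
    N = 2
    x = rng.standard_normal((N, 3, 8, 8))
    W = rng.standard_normal((4, 3, 5, 5))
    b = rng.standard_normal(4)
    p_out = rng.standard_normal((N, 4, 4, 4))

    # J is a linear probe whose batch-averaged gradients are exactly what the
    # backprop helpers return
    def J(x_, W_, b_):
        return np.sum(p_out * Conv2d(x_, W_, b_)) / N

    db = np.zeros_like(b)
    db[0] = eps
    num = (J(x, W, b + db) - J(x, W, b - db)) / (2 * eps)
    assert abs(num - Conv2d_backprop_b(p_out, x, W, b)[0]) < 1e-4

    dW = np.zeros_like(W)
    dW[0, 0, 0, 0] = eps
    num = (J(x, W + dW, b) - J(x, W - dW, b)) / (2 * eps)
    assert abs(num - Conv2d_backprop_W(p_out, x, W, b)[0, 0, 0, 0]) < 1e-4

    dx = np.zeros_like(x)
    dx[0, 0, 4, 4] = eps
    num = (J(x + dx, W, b) - J(x - dx, W, b)) / (2 * eps)
    # Conv2d_backprop_in returns per-sample gradients (no /N), hence the /N here
    assert abs(num - Conv2d_backprop_in(p_out, x, W, b)[0, 0, 4, 4] / N) < 1e-4

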
# return out_nchw
def MaxPool_2by2(in_nchw):
    N, C, H, W = in_nchw.shape
    shape = (N, C, H//2, 2, W//2, 2)
    return np.nanmax(in_nchw.reshape(shape), axis=(3,5))


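# Example (an illustrative sketch, not called anywhere): pooling one 4x4 map
# keeps the largest value of each non-overlapping 2x2 window, halving both
# spatial dimensions:
#   MaxPool_2by2(np.arange(16).reshape(1, 1, 4, 4))
#   -> array([[[[ 5,  7],
#               [13, 15]]]])

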
# return p_in for the whole batch
def MaxPool_2by2_backprop(p_out, out_nchw, in_nchw):
    # route each gradient to the argmax position of its 2x2 window; if a
    # window has ties for the max, the gradient is sent to every tied position
    p_in = np.zeros(in_nchw.shape)
    p_in[:, :, 0::2, 0::2] = p_out*(out_nchw == in_nchw[:, :, 0::2, 0::2])
    p_in[:, :, 0::2, 1::2] = p_out*(out_nchw == in_nchw[:, :, 0::2, 1::2])
    p_in[:, :, 1::2, 0::2] = p_out*(out_nchw == in_nchw[:, :, 1::2, 0::2])
    p_in[:, :, 1::2, 1::2] = p_out*(out_nchw == in_nchw[:, :, 1::2, 1::2])
    return p_in


# forward:
# x = NCHW(images)
# cx = Conv2d_c1(x)
# rx = ReLU(cx)
# y = MaxPool(rx)
# cy = Conv2d_c2(y)
# ry = ReLU(cy)
# g = MaxPool(ry)
# h = Flatten(g)
# z = Linear_f(h)
# return (z, h, g, ry, cy, y, rx, cx, x)
def forward(images, theta):
    # number of samples
    N = images.shape[0]

    # unpack theta into c1, c2, and f
    c1_W, c1_b, c2_W, c2_b, f_W, f_b = theta

    # x = NCHW(images)
    x = images.astype(float).transpose(0,3,1,2)

    # cx = Conv2d_c1(x)
    cx = Conv2d(x, c1_W, c1_b)

    # rx = ReLU(cx)
    rx = cx*(cx > 0)

    # y = MaxPool(rx)
    y = MaxPool_2by2(rx)

    # cy = Conv2d_c2(y)
    cy = Conv2d(y, c2_W, c2_b)

    # ry = ReLU(cy)
    ry = cy*(cy > 0)

    # g = MaxPool(ry)
    g = MaxPool_2by2(ry)

    # h = Flatten(g)
    h = g.reshape((N, -1))

    # z = Linear_f(h), computed for the whole batch at once
    z = np.matmul(h, f_W.T)+f_b

    return (z, h, g, ry, cy, y, rx, cx, x)


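# Shape trace for one batch of MNIST images through forward (28x28 inputs):
#   x: (N,1,28,28) -> cx/rx: (N,8,24,24) -> y: (N,8,12,12)
#   -> cy/ry: (N,8,8,8) -> g: (N,8,4,4) -> h: (N,128) -> z: (N,10)
# which is why f_W has shape (10, 128): 8 channels * 4 * 4 = 128 features

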
def backprop(labels, theta, z, h, g, ry, cy, y, rx, cx, x):
    # number of samples
    N = labels.shape[0]

    # unpack theta into c1, c2, and f
    c1_W, c1_b, c2_W, c2_b, f_W, f_b = theta

    # initialize gradients for the linear layer
    p_f_W, p_f_b = np.zeros_like(f_W), np.zeros_like(f_b)
    p_h = np.zeros(h.shape)

    # Step 1: backprop through softmax cross-entropy and the linear layer,
    # sample by sample from z to h
    for i in range(N):
        # softmax and cross-entropy gradient: p_z = softmax(z[i]) - onehot(label)
        expz = np.exp(z[i] - np.max(z[i]))
        softmax = expz / np.sum(expz)
        softmax[labels[i]] -= 1
        # no /N here: the batch average is taken exactly once below, just as
        # the Conv2d_backprop_* helpers divide by N themselves
        p_z = softmax

        # z[i] = f_W @ h[i] + f_b, so p_h[i] = f_W.T @ p_z and dJ/df_W
        # accumulates the outer product of p_z with the input h[i]
        p_h[i, :] = np.dot(f_W.T, p_z)
        p_f_W += np.outer(p_z, h[i])
        p_f_b += p_z

    # average the linear-layer gradients over the batch
    p_f_W /= N
    p_f_b /= N

    # Step 2: backprop through the flatten operation
    p_g = p_h.reshape(g.shape)

    # Step 3: backprop through the second MaxPool layer
    p_ry = MaxPool_2by2_backprop(p_g, g, ry)

    # Step 4: backprop through the second ReLU layer
    p_cy = (ry > 0) * p_ry

    # Step 5: backprop through the second convolutional layer
    p_c2_W = Conv2d_backprop_W(p_cy, y, c2_W, c2_b)
    p_c2_b = Conv2d_backprop_b(p_cy, y, c2_W, c2_b)
    p_y = Conv2d_backprop_in(p_cy, y, c2_W, c2_b)

    # Step 6: backprop through the first MaxPool layer
    p_rx = MaxPool_2by2_backprop(p_y, y, rx)

    # Step 7: backprop through the first ReLU layer
    p_cx = (rx > 0) * p_rx

    # Step 8: backprop through the first convolutional layer
    p_c1_W = Conv2d_backprop_W(p_cx, x, c1_W, c1_b)
    p_c1_b = Conv2d_backprop_b(p_cx, x, c1_W, c1_b)

    return (p_c1_W, p_c1_b, p_c2_W, p_c2_b, p_f_W, p_f_b)


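# For reference, the Step 1 gradient above is the gradient of the
# batch-averaged cross-entropy loss
#   J = -(1/N) * sum_i log(softmax(z[i])[labels[i]])
# A direct way to compute J itself (an illustrative helper; it is not needed
# or called by the training loop):
def cross_entropy_loss(z, labels):
    # numerically stable log-softmax, one row per sample
    shifted = z - z.max(axis=1, keepdims=True)
    log_probs = shifted - np.log(np.exp(shifted).sum(axis=1, keepdims=True))
    return -log_probs[np.arange(len(labels)), labels].mean()

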
# apply SGD to update theta by nabla_J and the learning rate epsilon
# return updated theta
def update_theta(theta, nabla_J, epsilon):
    # unpack the current parameters and their corresponding gradients
    c1_W, c1_b, c2_W, c2_b, f_W, f_b = theta
    p_c1_W, p_c1_b, p_c2_W, p_c2_b, p_f_W, p_f_b = nabla_J

    # update each parameter by subtracting the gradient times the learning rate
    c1_W_updated = c1_W - epsilon * p_c1_W
    c1_b_updated = c1_b - epsilon * p_c1_b
    c2_W_updated = c2_W - epsilon * p_c2_W
    c2_b_updated = c2_b - epsilon * p_c2_b
    f_W_updated = f_W - epsilon * p_f_W
    f_b_updated = f_b - epsilon * p_f_b

    # return the updated parameters as a tuple
    return (c1_W_updated, c1_b_updated, c2_W_updated, c2_b_updated,
            f_W_updated, f_b_updated)


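# Since theta and nabla_J share the same layout, the same step could also be
# written more compactly (an equivalent sketch, not used above):
#   return tuple(p - epsilon * g for p, g in zip(theta, nabla_J))

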
def sample_attempts(accuracies, num_samples=10):
    total_attempts = len(accuracies)
    if total_attempts <= num_samples:
        return accuracies

    # Always include first and last attempts
    sampled_indices = [0, total_attempts - 1]

    # Sample additional attempts, excluding the first and last
    additional_samples = random.sample(range(1, total_attempts - 1), num_samples - 2)
    sampled_indices.extend(additional_samples)

    # Sort the indices to maintain chronological order
    sampled_indices.sort()

    # Get the sampled attempts
    sampled_accuracies = [accuracies[i] for i in sampled_indices]
    return sampled_accuracies


# hyperparameters
# default params
#bound = 1 # initial weight range
#epsilon = 0.00001 # learning rate
#batch_size = 4


# start training
def start_training(mnist_train, bound, epsilon, batch_size):
    print(f"bound: {bound}, epsilon: {epsilon}, batch_size: {batch_size}")

    # hold out the first 1000 examples for validation
    validation_images = mnist_train["images"][:1000]
    validation_labels = mnist_train["labels"][:1000]
    training_images = mnist_train["images"][1000:21000]
    training_labels = mnist_train["labels"][1000:21000]
    #training_images = mnist_train["images"][1000:11000]
    #training_labels = mnist_train["labels"][1000:11000]

    start = time.time()
    theta = initialize_theta(bound)
    batches = training_images.shape[0]//batch_size

    accuracies = []

    for epoch in range(5):
        indices = np.arange(training_images.shape[0])
        np.random.shuffle(indices)
        for i in range(batches):
            batch_images = training_images[indices[i*batch_size:(i+1)*batch_size]]
            batch_labels = training_labels[indices[i*batch_size:(i+1)*batch_size]]

            z, h, g, ry, cy, y, rx, cx, x = forward(batch_images, theta)
            nabla_J = backprop(batch_labels, theta, z, h, g, ry, cy, y, rx, cx, x)
            theta = update_theta(theta, nabla_J, epsilon)

        # check accuracy using validation examples
        z, _, _, _, _, _, _, _, _ = forward(validation_images, theta)
        pred_labels = z.argmax(axis=1)
        count = np.sum(pred_labels == validation_labels)
        accuracy = count/validation_images.shape[0]
        accuracies.append(accuracy)
        print("epoch %d, accuracy %.3f, time %.2f" % (
            epoch, accuracy, time.time()-start))

    # save the weights to be submitted
    save_theta(theta)
    return accuracies


def plot_accuracies(accuracies):
    plt.figure(figsize=(12, 6))

    for attempt, acc_list in accuracies:
        plt.plot(acc_list, label=f'Attempt {attempt+1}')

    plt.title('Model Accuracy over Epochs for Sampled Attempts')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.grid(True)
    plt.show()


def main():
    # list of (attempt, accuracies_for_run)
    accuracies = []

    # numpy random seed set to the last 8 digits of my CWID
    np.random.seed(20497299)

    # load training data once
    mnist_train = np.load("mnist_train.npz")

    max_accuracy = 0.93  # target accuracy
    max_attempts = 999999  # maximum number of attempts to reach the target accuracy

    # initial starting values
    # notes from experimentation:
    # - reached 90 percent accuracy on attempt 36 when starting from
    #   bound = 0.01, epsilon = 0.0001 and increasing both by 1% per attempt
    # - starting directly at the attempt-36 values below does not reach 90;
    #   only the 0.01/0.0001 start with the *= 1.01 schedule does
    bound = 0.01
    epsilon = 0.0001
    batch_size = 1

    #bound = 0.014166027560312683
    #epsilon = 0.0001416602756031268
    #batch_size = 1

    attempt = 0
    while attempt < max_attempts:
        print(f"Attempt: {attempt} out of: {max_attempts}")
        accuracies_for_run = start_training(mnist_train, bound, epsilon, batch_size)
        accuracies.append((attempt, accuracies_for_run))

        if any(i >= max_accuracy for i in accuracies_for_run):
            print(f"Reached Max Accuracy: {max_accuracy}")
            print(f"bound: {bound}")
            print(f"epsilon: {epsilon}")
            print(f"batch_size: {batch_size}")
            sampled_accuracies = sample_attempts(accuracies)
            plot_accuracies(sampled_accuracies)
            break

        bound *= 1.0005  # increase by 0.05%
        epsilon *= 1.0005  # increase by 0.05%

        #bound *= 1.01  # increase by 1%
        #epsilon *= 1.01  # increase by 1%

        attempt += 1


if __name__ == "__main__":
    main()