import numpy as np
import time
import random
import matplotlib.pyplot as plt


# save theta to p6_params.npz that can be used by easynn
def save_theta(theta):
    c1_W, c1_b, c2_W, c2_b, f_W, f_b = theta
    np.savez_compressed("p6_params.npz", **{
        "c1.weight": c1_W, "c1.bias": c1_b,
        "c2.weight": c2_W, "c2.bias": c2_b,
        "f.weight": f_W, "f.bias": f_b})


# initialize theta using uniform distribution [-bound, bound]
# return theta as (c1_W, c1_b, c2_W, c2_b, f_W, f_b)
def initialize_theta(bound):
    c1_W = np.random.uniform(-bound, bound, (8, 1, 5, 5))
    c1_b = np.random.uniform(-bound, bound, 8)
    c2_W = np.random.uniform(-bound, bound, (8, 8, 5, 5))
    c2_b = np.random.uniform(-bound, bound, 8)
    f_W = np.random.uniform(-bound, bound, (10, 128))
    f_b = np.random.uniform(-bound, bound, 10)
    return (c1_W, c1_b, c2_W, c2_b, f_W, f_b)


# return out_nchw
def Conv2d(in_nchw, kernel_W, kernel_b):
    OC, IC, KH, KW = kernel_W.shape
    N, C, H, W = in_nchw.shape
    if C != IC or kernel_b.shape != (OC,):
        raise Exception("Conv2d size mismatch: %s @ (%s, %s)" % (
            in_nchw.shape, kernel_W.shape, kernel_b.shape))
    # view in_nchw as a 6D tensor of all KHxKW patches
    shape = (N, IC, H-KH+1, W-KW+1, KH, KW)
    strides = in_nchw.strides+in_nchw.strides[2:]
    data = np.lib.stride_tricks.as_strided(in_nchw,
        shape = shape, strides = strides, writeable = False)
    # np.einsum("nihwyx,oiyx->nohw", data, kernel_W)
    nhwo = np.tensordot(data, kernel_W, ((1,4,5), (1,2,3)))
    return nhwo.transpose(0,3,1,2)+kernel_b.reshape((1, OC, 1, 1))


# return p_b for the whole batch
def Conv2d_backprop_b(p_out, in_nchw, kernel_W, kernel_b):
    OC, IC, KH, KW = kernel_W.shape
    N, C, H, W = in_nchw.shape
    if C != IC or kernel_b.shape != (OC,) or p_out.shape != (N, OC, H-KH+1, W-KW+1):
        raise Exception("Conv2d_backprop_b size mismatch: %s = %s @ (%s, %s)" % (
            p_out.shape, in_nchw.shape, kernel_W.shape, kernel_b.shape))
    return np.einsum("nohw->o", p_out, optimize = "optimal")/N


# return p_W for the whole batch
def Conv2d_backprop_W(p_out, in_nchw, kernel_W, kernel_b):
    OC, IC, KH, KW = kernel_W.shape
    N, C, H, W = in_nchw.shape
    if C != IC or kernel_b.shape != (OC,) or p_out.shape != (N, OC, H-KH+1, W-KW+1):
        raise Exception("Conv2d_backprop_W size mismatch: %s = %s @ (%s, %s)" % (
            p_out.shape, in_nchw.shape, kernel_W.shape, kernel_b.shape))
    # view in_nchw as a 6D tensor
    shape = (N, IC, KH, KW, H-KH+1, W-KW+1)
    strides = in_nchw.strides+in_nchw.strides[2:]
    data = np.lib.stride_tricks.as_strided(in_nchw,
        shape = shape, strides = strides, writeable = False)
    # np.einsum("nohw,niyxhw->oiyx", p_out, data)
    return np.tensordot(p_out, data, ((0,2,3), (0,4,5)))/N


# return p_in for the whole batch
def Conv2d_backprop_in(p_out, in_nchw, kernel_W, kernel_b):
    OC, IC, KH, KW = kernel_W.shape
    N, C, H, W = in_nchw.shape
    if C != IC or kernel_b.shape != (OC,) or p_out.shape != (N, OC, H-KH+1, W-KW+1):
        raise Exception("Conv2d_backprop_in size mismatch: %s = %s @ (%s, %s)" % (
            p_out.shape, in_nchw.shape, kernel_W.shape, kernel_b.shape))
    # view p_out as a padded 6D tensor; note the view has OC channels, not IC
    padded = np.zeros((N, OC, H+KH-1, W+KW-1))
    padded[:, :, KH-1:-KH+1, KW-1:-KW+1] = p_out
    shape = (N, OC, H, W, KH, KW)
    strides = padded.strides+padded.strides[2:]
    data = np.lib.stride_tricks.as_strided(padded,
        shape = shape, strides = strides, writeable = False)
    # the input gradient is a full correlation with the 180-degree-rotated kernel:
    # np.einsum("nohwyx,oiyx->nihw", data, kernel_W[:, :, ::-1, ::-1])
    nhwi = np.tensordot(data, kernel_W[:, :, ::-1, ::-1], ((1,4,5), (0,2,3)))
    return nhwi.transpose((0,3,1,2))


# return out_nchw
def MaxPool_2by2(in_nchw):
    N, C, H, W = in_nchw.shape
    shape = (N, C, H//2, 2, W//2, 2)
    return np.nanmax(in_nchw.reshape(shape), axis = (3,5))
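

# Optional sanity check (not part of the original training flow): a minimal
# loop-based reference for the same valid cross-correlation that Conv2d
# computes, useful for spot-checking the strided-view/tensordot version on
# small random tensors. The helper names and test shapes below are
# illustrative only.
def _conv2d_naive(in_nchw, kernel_W, kernel_b):
    OC, IC, KH, KW = kernel_W.shape
    N, C, H, W = in_nchw.shape
    out = np.zeros((N, OC, H-KH+1, W-KW+1))
    for n in range(N):
        for o in range(OC):
            for h in range(H-KH+1):
                for w in range(W-KW+1):
                    # multiply one KHxKW patch by one output filter
                    patch = in_nchw[n, :, h:h+KH, w:w+KW]
                    out[n, o, h, w] = np.sum(patch*kernel_W[o]) + kernel_b[o]
    return out


def _check_conv2d():
    x = np.random.randn(2, 3, 8, 8)
    W = np.random.randn(4, 3, 5, 5)
    b = np.random.randn(4)
    assert np.allclose(Conv2d(x, W, b), _conv2d_naive(x, W, b))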

# return p_in for the whole batch
def MaxPool_2by2_backprop(p_out, out_nchw, in_nchw):
    p_in = np.zeros(in_nchw.shape)
    p_in[:, :, 0::2, 0::2] = p_out*(out_nchw == in_nchw[:, :, 0::2, 0::2])
    p_in[:, :, 0::2, 1::2] = p_out*(out_nchw == in_nchw[:, :, 0::2, 1::2])
    p_in[:, :, 1::2, 0::2] = p_out*(out_nchw == in_nchw[:, :, 1::2, 0::2])
    p_in[:, :, 1::2, 1::2] = p_out*(out_nchw == in_nchw[:, :, 1::2, 1::2])
    return p_in


# forward:
#   x = NCHW(images)
#   cx = Conv2d_c1(x)
#   rx = ReLU(cx)
#   y = MaxPool(rx)
#   cy = Conv2d_c2(y)
#   ry = ReLU(cy)
#   g = MaxPool(ry)
#   h = Flatten(g)
#   z = Linear_f(h)
# return (z, h, g, ry, cy, y, rx, cx, x)
def forward(images, theta):
    # number of samples
    N = images.shape[0]
    # unpack theta into c1, c2, and f
    c1_W, c1_b, c2_W, c2_b, f_W, f_b = theta

    # x = NCHW(images)
    x = images.astype(float).transpose(0,3,1,2)
    # cx = Conv2d_c1(x)
    cx = Conv2d(x, c1_W, c1_b)
    # rx = ReLU(cx)
    rx = cx*(cx > 0)
    # y = MaxPool(rx)
    y = MaxPool_2by2(rx)
    # cy = Conv2d_c2(y)
    cy = Conv2d(y, c2_W, c2_b)
    # ry = ReLU(cy)
    ry = cy*(cy > 0)
    # g = MaxPool(ry)
    g = MaxPool_2by2(ry)
    # h = Flatten(g)
    h = g.reshape((N, -1))
    # z = Linear_f(h)
    z = np.zeros((N, f_b.shape[0]))
    for i in range(N):
        z[i, :] = np.matmul(f_W, h[i])+f_b

    return (z, h, g, ry, cy, y, rx, cx, x)


def backprop(labels, theta, z, h, g, ry, cy, y, rx, cx, x):
    # number of samples
    N = labels.shape[0]
    # unpack theta into c1, c2, and f
    c1_W, c1_b, c2_W, c2_b, f_W, f_b = theta

    # sample-by-sample from z to h
    p_f_W = np.zeros(f_W.shape)
    p_f_b = np.zeros(f_b.shape)
    p_h = np.zeros(h.shape)

    # Step 1: backprop through softmax cross-entropy and the linear layer
    for i in range(N):
        # gradient of cross-entropy w.r.t. z[i]: softmax(z[i]) - onehot(labels[i])
        expz = np.exp(z[i] - np.max(z[i]))
        softmax = expz / np.sum(expz)
        softmax[labels[i]] -= 1
        p_z = softmax
        # backprop through the linear layer z = f_W @ h + f_b
        p_h[i, :] = np.dot(f_W.T, p_z)
        p_f_W += np.outer(p_z, h[i, :])
        p_f_b += p_z
    # average the linear-layer gradients over the batch
    # (the Conv2d_backprop_W/_b helpers below already divide by N themselves)
    p_f_W /= N
    p_f_b /= N

    # Step 2: backprop through the flatten operation
    p_g = p_h.reshape(g.shape)
    # Step 3: backprop through the second MaxPool layer
    p_ry = MaxPool_2by2_backprop(p_g, g, ry)
    # Step 4: backprop through the second ReLU layer
    p_cy = (ry > 0) * p_ry
    # Step 5: backprop through the second convolutional layer
    p_c2_W = Conv2d_backprop_W(p_cy, y, c2_W, c2_b)
    p_c2_b = Conv2d_backprop_b(p_cy, y, c2_W, c2_b)
    p_y = Conv2d_backprop_in(p_cy, y, c2_W, c2_b)
    # Step 6: backprop through the first MaxPool layer
    p_rx = MaxPool_2by2_backprop(p_y, y, rx)
    # Step 7: backprop through the first ReLU layer
    p_cx = (rx > 0) * p_rx
    # Step 8: backprop through the first convolutional layer
    p_c1_W = Conv2d_backprop_W(p_cx, x, c1_W, c1_b)
    p_c1_b = Conv2d_backprop_b(p_cx, x, c1_W, c1_b)

    return (p_c1_W, p_c1_b, p_c2_W, p_c2_b, p_f_W, p_f_b)
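

# Optional numerical gradient check (not part of the original training flow):
# a minimal sketch that compares one entry of backprop's f_b gradient against
# a central finite difference of the mean cross-entropy loss. The loss helper
# is an assumption added here to match the softmax/cross-entropy gradient used
# in backprop; the function names are illustrative only.
def _mean_cross_entropy(images, labels, theta):
    z = forward(images, theta)[0]
    z = z - z.max(axis=1, keepdims=True)
    log_softmax = z - np.log(np.exp(z).sum(axis=1, keepdims=True))
    return -np.mean(log_softmax[np.arange(labels.shape[0]), labels])


def _check_f_b_gradient(images, labels, theta, eps=1e-6):
    outputs = forward(images, theta)
    p_f_b = backprop(labels, theta, *outputs)[5]
    c1_W, c1_b, c2_W, c2_b, f_W, f_b = theta
    # perturb the first coordinate of f_b in both directions
    f_b_plus, f_b_minus = f_b.copy(), f_b.copy()
    f_b_plus[0] += eps
    f_b_minus[0] -= eps
    theta_plus = (c1_W, c1_b, c2_W, c2_b, f_W, f_b_plus)
    theta_minus = (c1_W, c1_b, c2_W, c2_b, f_W, f_b_minus)
    numeric = (_mean_cross_entropy(images, labels, theta_plus)
               - _mean_cross_entropy(images, labels, theta_minus))/(2*eps)
    print("analytic %.6g, numeric %.6g" % (p_f_b[0], numeric))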

# apply SGD to update theta by nabla_J and the learning rate epsilon
# return updated theta
def update_theta(theta, nabla_J, epsilon):
    # unpack the current parameters and their corresponding gradients
    c1_W, c1_b, c2_W, c2_b, f_W, f_b = theta
    p_c1_W, p_c1_b, p_c2_W, p_c2_b, p_f_W, p_f_b = nabla_J

    # update each parameter by subtracting the gradient times the learning rate
    c1_W_updated = c1_W - epsilon * p_c1_W
    c1_b_updated = c1_b - epsilon * p_c1_b
    c2_W_updated = c2_W - epsilon * p_c2_W
    c2_b_updated = c2_b - epsilon * p_c2_b
    f_W_updated = f_W - epsilon * p_f_W
    f_b_updated = f_b - epsilon * p_f_b

    # return the updated parameters as a tuple
    updated_theta = (c1_W_updated, c1_b_updated, c2_W_updated, c2_b_updated,
                     f_W_updated, f_b_updated)
    return updated_theta


def sample_attempts(accuracies, num_samples=10):
    total_attempts = len(accuracies)
    if total_attempts <= num_samples:
        return accuracies

    # always include the first and last attempts
    sampled_indices = [0, total_attempts - 1]
    # sample additional attempts, excluding the first and last
    additional_samples = random.sample(range(1, total_attempts - 1), num_samples - 2)
    sampled_indices.extend(additional_samples)
    # sort the indices to maintain chronological order
    sampled_indices.sort()

    # get the sampled attempts
    sampled_accuracies = [accuracies[i] for i in sampled_indices]
    return sampled_accuracies


# hyperparameters
# default params
#bound = 1           # initial weight range
#epsilon = 0.00001   # learning rate
#batch_size = 4


# start training
def start_training(mnist_train, bound, epsilon, batch_size):
    print(f"bound: {bound}, epsilon: {epsilon}, batch_size: {batch_size}")

    validation_images = mnist_train["images"][:1000]
    validation_labels = mnist_train["labels"][:1000]
    training_images = mnist_train["images"][1000:21000]
    training_labels = mnist_train["labels"][1000:21000]
    #training_images = mnist_train["images"][1000:11000]
    #training_labels = mnist_train["labels"][1000:11000]

    start = time.time()
    theta = initialize_theta(bound)
    batches = training_images.shape[0]//batch_size
    accuracies = []
    for epoch in range(5):
        indices = np.arange(training_images.shape[0])
        np.random.shuffle(indices)
        for i in range(batches):
            batch_images = training_images[indices[i*batch_size:(i+1)*batch_size]]
            batch_labels = training_labels[indices[i*batch_size:(i+1)*batch_size]]
            z, h, g, ry, cy, y, rx, cx, x = forward(batch_images, theta)
            nabla_J = backprop(batch_labels, theta, z, h, g, ry, cy, y, rx, cx, x)
            theta = update_theta(theta, nabla_J, epsilon)

        # check accuracy using validation examples
        z, _, _, _, _, _, _, _, _ = forward(validation_images, theta)
        pred_labels = z.argmax(axis = 1)
        count = np.sum(pred_labels == validation_labels)
        accuracy = count/validation_images.shape[0]
        accuracies.append(accuracy)
        print("epoch %d, accuracy %.3f, time %.2f" % (
            epoch, accuracy, time.time()-start))

    # save the weights to be submitted
    save_theta(theta)
    return accuracies


def plot_accuracies(accuracies):
    plt.figure(figsize=(12, 6))
    for attempt, acc_list in accuracies:
        plt.plot(acc_list, label=f'Attempt {attempt+1}')
    plt.title('Model Accuracy over Epochs for Sampled Attempts')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.grid(True)
    plt.show()
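

# Optional evaluation sketch (not called by main): reload the parameters that
# save_theta wrote to p6_params.npz, rebuild theta in the tuple order the rest
# of this file expects, and measure accuracy on the same validation slice used
# during training. Assumes p6_params.npz and mnist_train.npz sit next to this
# script; the function name is illustrative only.
def evaluate_saved_theta():
    params = np.load("p6_params.npz")
    theta = (params["c1.weight"], params["c1.bias"],
             params["c2.weight"], params["c2.bias"],
             params["f.weight"], params["f.bias"])
    mnist_train = np.load("mnist_train.npz")
    validation_images = mnist_train["images"][:1000]
    validation_labels = mnist_train["labels"][:1000]
    # forward returns (z, h, g, ry, cy, y, rx, cx, x); only the logits z are needed
    z = forward(validation_images, theta)[0]
    pred_labels = z.argmax(axis=1)
    accuracy = np.sum(pred_labels == validation_labels)/validation_labels.shape[0]
    print("validation accuracy of saved theta: %.3f" % accuracy)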


def main():
    # list of (attempt, accuracies_for_run)
    accuracies = []

    # ToDo: set numpy random seed to the last 8 digits of your CWID
    np.random.seed(20497299)

    # load training data once
    mnist_train = np.load("mnist_train.npz")

    max_accuracy = 0.93    # target accuracy
    max_attempts = 999999  # maximum number of attempts to reach the target accuracy

    # initial starting values
    # notes from experimentation: reached 90 percent accuracy on attempt 36
    # when increasing bound/epsilon by *= 1.01 per attempt
    bound = 0.01
    epsilon = 0.0001
    batch_size = 1

    # 90 percent accuracy if we start at the initial values above (on the 36th attempt).
    # note: starting directly at the values below does not hit 90;
    # only starting at 0.01/0.0001 and multiplying by 1.01 per attempt does.
    #bound = 0.014166027560312683
    #epsilon = 0.0001416602756031268
    #batch_size = 1

    attempt = 0
    while attempt < max_attempts:
        print(f"Attempt: {attempt} out of: {max_attempts}")
        accuracies_for_run = start_training(mnist_train, bound, epsilon, batch_size)
        accuracies.append((attempt, accuracies_for_run))

        if any(i >= max_accuracy for i in accuracies_for_run):
            print(f"Reached Max Accuracy: {max_accuracy}")
            print(f"bound: {bound}")
            print(f"epsilon: {epsilon}")
            print(f"batch_size: {batch_size}")
            sampled_accuracies = sample_attempts(accuracies)
            plot_accuracies(sampled_accuracies)
            break

        bound *= 1.0005    # increase by 0.05%
        epsilon *= 1.0005  # increase by 0.05%
        #bound *= 1.01     # increase by 1%
        #epsilon *= 1.01   # increase by 1%
        attempt += 1


if __name__ == "__main__":
    main()