from logging import exception from keras.backend import constant from tensorflow import keras from tensorflow.keras import layers from keras.preprocessing.image import load_img, ImageDataGenerator, array_to_img, img_to_array from tensorflow.keras.models import load_model from tensorflow.keras.applications.vgg19 import VGG19, preprocess_input, decode_predictions from tensorflow.keras.models import Model import matplotlib.pyplot as plt import numpy as np import pandas as pd import streamlit as st from tensorflow.python.keras.callbacks import History from CustomCallback import CustomCallback import os, shutil from os import path import splitfolders import time ########## Augmentation with VGG19 ############# def tranferLearningVGG(model, train_dir, test_dir): model = VGG19(include_top=False, input_shape=(224,224,3)) train_datagen = ImageDataGenerator( rescale=1/255, horizontal_flip=True, rotation_range=20, width_shift_range=0.2, height_shift_range=0.2 ) test_datagen = ImageDataGenerator( rescale=1/255, ) for layer in model.layers: layer.trainable = False flatten_layer = layers.Flatten()(model.output) flatten_fully_connected_layer = layers.Dense(512, activation='relu')(flatten_layer) flatten_fully_connected_softmax_layer =layers.Dense(5, activation='softmax')(flatten_fully_connected_layer) model = Model(inputs=model.inputs, outputs=flatten_fully_connected_softmax_layer) training_iterater = train_datagen.flow_from_directory(train_dir, batch_size=64, target_size=(224,224)) test_iterater = test_datagen.flow_from_directory(test_dir, batch_size=64, target_size=(224,224)) model.compile(loss="categorical_crossentropy", metrics=['accuracy'], optimizer='adam') model.save('models/vgg19_tl.h5') history = model.fit(training_iterater, validation_data=test_iterater, epochs=4) return history ############ Model creation ################ def buildSimpleModel(cats): model = keras.Sequential() model.add(layers.Conv2D(32, (3,3), activation='relu', input_shape=(100,100,3))) model.add(layers.MaxPooling2D((2,2))) model.add(layers.Conv2D(64, (3,3), activation='relu')) model.add(layers.MaxPooling2D((2,2))) model.add(layers.Conv2D(128, (3,3), activation='relu')) model.add(layers.MaxPooling2D((2,2))) model.add(layers.Flatten()) model.add(layers.Dense(512, activation='relu')) model.add(layers.Dense(cats, activation='softmax')) return model ###### preapre iterater ######## def prepareIterater(folder_path, batch_size, img_size): train_datagen = ImageDataGenerator(rescale=1) test_datagen = ImageDataGenerator(rescale=1) training_iterater = train_datagen.flow_from_directory(os.path.join(folder_path, 'train'), batch_size=batch_size, target_size=img_size) test_iterater = test_datagen.flow_from_directory(os.path.join(folder_path, 'test'), batch_size=batch_size, target_size=img_size) return training_iterater, test_iterater ########## start trining ############## def trainSimplaeModel(model, epochs, tran_iterater, test_iterater, model_name, text_output, progrss_bar, graph): # train_datagen = ImageDataGenerator(rescale=1) # test_datagen = ImageDataGenerator(rescale=1) # training_iterater = train_datagen.flow_from_directory(tran_dir, batch_size=64, target_size=(100,100)) # test_iterater = test_datagen.flow_from_directory(test_dir, batch_size=64, target_size=(100,100)) model.compile(loss="categorical_crossentropy", metrics=['accuracy'], optimizer='adam') history = model.fit(tran_iterater, validation_data=test_iterater, epochs=epochs, callbacks=[CustomCallback(text_output, progrss_bar, graph)],) model_path = os.path.join(st.session_state['output_folder'], model_name) st.session_state['model_folder'] = st.session_state["output_folder"] st.markdown(f'##### Model output : {model_path}') #st.write(f'Output folder is : {model_path}') model.save(model_path) return history ######### plot ######### def plotHistory(history): plt.plot(history.history['loss']) plt.plot(history.history['val_loss']) plt.title("model loss") plt.ylabel("loss") plt.xlabel("no of epochs") plt.legend(['training', 'testing'], loc='upper left') plt.show() ####### print class labels ####### def printLabels(training_iterater): class_labels = training_iterater.class_indices print(class_labels) ####### predict ######## def predict(modelname): mdl = load_model('models/'+ modelname) img = load_img('dataset/flowers/sunflower.jpeg', target_size=(100,100)) img = img_to_array(img) img = img.reshape(1,100,100,3) res = mdl.predict(img) print(res) print(np.argmax(res)) print([key for key in class_labels][np.argmax(res)]) def getModelNames(model_folder): if model_folder == "": return [] param = [] length =len([name for name in os.listdir('.') if os.path.isfile(model_folder)]) for entry in os.scandir(model_folder): if entry.name.lower().endswith('.h5'): param.append(entry.name) return param ##### prepare image folders ###### def prepareFolders(folder_path, output_folder): file_path = st.session_state['output_folder'] try: if os.path.isfile(file_path) or os.path.islink(file_path): os.unlink(file_path) elif os.path.isdir(file_path): shutil.rmtree(file_path) splitfolders.ratio(folder_path, output=output_folder, seed=1337, ratio=(.8, 0.1,0.1)) except Exception as e: pass ##### handle change on image input folder ##### def handlerImageFolderChanged(): output_folder = st.session_state["output_folder"] def handlePrepare(): prepareFolders(raw_image_folder, st.session_state['output_folder']) def handSelectModel(): st.write('#####') # if 'model_name' in st.session_state: # st.write(st.session_state['model_name']) raw_image_folder = "/Users/mohamednouffer/workspace/akira_san/image_classifier/dataset/raw_data" #output_folder = "/Users/mohamednouffer/workspace/akira_san/image_classifier/dataset/output" #model_folder = '/Users/mohamednouffer/workspace/akira_san/image_classifier/models' #train_folder = os.path.join(output_folder, "train") #test_folder = os.path.join(output_folder, "test") st.title("Sumasen AI") st.sidebar.header("Sumasen Trainer") st.session_state["image_arranged"] = False #model_folder = "" options = ("New", "Existing model") mode = st.sidebar.empty() model_empty = st.sidebar.empty() if "training_mode" not in st.session_state: st.session_state['training_mode'] = "New" training_mode = mode.radio("Training mode:", options,0) st.session_state['training_mode'] = training_mode if training_mode == "Existing model": if 'model_folder' in st.session_state: model_folder = model_empty.text_input("model folder", st.session_state['model_folder']) if st.sidebar.button("Load models"): selected_model = st.sidebar.selectbox("Select a Model", getModelNames(model_folder), on_change=handSelectModel, key='model_name') st.session_state['selected_model'] = selected_model else: model_folder = model_empty.text_input("Enter model folder",'') if st.sidebar.button("Load models"): selected_model = st.sidebar.selectbox("Select a Model", getModelNames(model_folder)) st.session_state['model_folder'] = model_folder st.session_state['selected_model'] = selected_model if 'model_name' in st.session_state: st.write('Selected models is :', st.session_state['model_name']) st.subheader("Predict a image") input_shape = st.selectbox('Select image input shape',[(100,100), [150, 150], (200,200), (240,240), (300, 300)]) file = st.file_uploader("Upload image",['jpeg', 'jpg', 'png']) if file: bytes_data = file.getvalue() with open(os.path.join(st.session_state['model_folder'],file.name),"wb") as f: f.write(file.getbuffer()) model_file = os.path.join(st.session_state['model_folder'], st.session_state['model_name']) mdl = load_model(model_file) img = load_img(os.path.join(st.session_state['model_folder'], file.name), target_size=input_shape) st.image(img) img = img_to_array(img) img = img.reshape(1,input_shape[0],input_shape[1],3) res = mdl.predict(img) st.markdown('## Category is {}'.format(np.argmax(res))) #st.write([key for key in class_labels][np.argmax(res)]) else: if "output_folder" in st.session_state: for fls in os.scandir(st.session_state["output_folder"]): if os.path.isdir(fls.path) and (fls.name == "tran" or fls.name == "test" or fls.name == "val"): st.info("Images are ready for training") st.session_state["image_arranged"] = True selected_image_folder = st.text_input("Enter image folder (make sure only category folders in this directory)", "Enter images folder ...") output_folder = os.path.join(selected_image_folder, "arranged") output_folder = st.text_input("Enter a folder to prepare images", output_folder) input_shape = st.selectbox('Select image input shape',[(100,100), [150, 150], (200,200), (240,240), (300, 300)]) batch_size = st.slider('Batch size', 1, 1000, 40) if selected_image_folder == "Enter images folder ..." or selected_image_folder == "": st.error("Need a valid image folder") else: try: st.session_state["selected_image_folder"] = selected_image_folder st.session_state["output_folder"] = output_folder arranged = st.button("arrange images", on_click=handlePrepare) if st.session_state["image_arranged"] == True: traing_iterater, test_iterater = prepareIterater(st.session_state['output_folder'],batch_size, input_shape) class_labels = traing_iterater.class_indices st.write('class labels',class_labels) model_name = st.text_input('model name:', 'mymodel.h5') epochs = st.slider("Epochs",1,500, 2) if st.button('begin train') and epochs and batch_size: mdl = buildSimpleModel(len(class_labels)) text_output = st.empty() graph = st.empty() my_bar = st.empty() history = trainSimplaeModel(mdl, epochs, traing_iterater, test_iterater, model_name, text_output, my_bar, graph) his_df = pd.DataFrame(history.history) st.line_chart(his_df) training_mode = "Existing model" st.markdown('## Training completed, plese check the output folder for saved model.') except Exception as ex: st.error(ex)