# Files
# SumasenTrainer/trainer.py
# 2021-12-15 09:34:59 +05:30
#
# 267 lines
# 11 KiB
# Python

from logging import exception
from keras.backend import constant
from tensorflow import keras
from tensorflow.keras import layers
from keras.preprocessing.image import load_img, ImageDataGenerator, array_to_img, img_to_array
from tensorflow.keras.models import load_model
from tensorflow.keras.applications.vgg19 import VGG19, preprocess_input, decode_predictions
from tensorflow.keras.models import Model
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import streamlit as st
from tensorflow.python.keras.callbacks import History
from CustomCallback import CustomCallback
import os, shutil
from os import path
import splitfolders
import time
########## Augmentation with VGG19 #############
def tranferLearningVGG(model, train_dir, test_dir):
    """Train a new classification head on top of a frozen VGG19 base.

    Args:
        model: Ignored. Kept so existing callers keep working; a fresh VGG19
            base is always built inside this function.
        train_dir: Directory with one sub-folder per class, used for training
            (augmented with flips, rotations and shifts).
        test_dir: Directory with one sub-folder per class, used as validation
            data during fitting.

    Returns:
        The history object returned by ``Model.fit``.
    """
    train_datagen = ImageDataGenerator(
        rescale=1/255,
        horizontal_flip=True,
        rotation_range=20,
        width_shift_range=0.2,
        height_shift_range=0.2,
    )
    test_datagen = ImageDataGenerator(rescale=1/255)
    training_iterater = train_datagen.flow_from_directory(train_dir, batch_size=64, target_size=(224, 224))
    test_iterater = test_datagen.flow_from_directory(test_dir, batch_size=64, target_size=(224, 224))

    # Frozen convolutional base: only the new dense head below is trained.
    base_model = VGG19(include_top=False, input_shape=(224, 224, 3))
    for layer in base_model.layers:
        layer.trainable = False

    # Size the softmax layer from the data instead of assuming five classes.
    num_classes = len(training_iterater.class_indices)
    flatten_layer = layers.Flatten()(base_model.output)
    dense_layer = layers.Dense(512, activation='relu')(flatten_layer)
    softmax_layer = layers.Dense(num_classes, activation='softmax')(dense_layer)
    model = Model(inputs=base_model.inputs, outputs=softmax_layer)

    model.compile(loss="categorical_crossentropy", metrics=['accuracy'], optimizer='adam')
    history = model.fit(training_iterater, validation_data=test_iterater, epochs=4)

    # Save only after fitting so the file on disk holds the trained weights.
    os.makedirs('models', exist_ok=True)
    model.save('models/vgg19_tl.h5')
    return history
############ Model creation ################
def buildSimpleModel(cats, input_shape=(100, 100, 3)):
    """Build an uncompiled three-stage CNN classifier.

    Args:
        cats: Number of output categories (width of the softmax layer).
        input_shape: Shape of one input image as (height, width, channels).
            Defaults to the previously hard-coded (100, 100, 3).

    Returns:
        A ``keras.Sequential`` model: three Conv2D + MaxPooling2D stages
        (32, 64, 128 filters), then Flatten, Dense(512) and a softmax layer.
    """
    return keras.Sequential([
        layers.Conv2D(32, (3, 3), activation='relu', input_shape=tuple(input_shape)),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(64, (3, 3), activation='relu'),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(128, (3, 3), activation='relu'),
        layers.MaxPooling2D((2, 2)),
        layers.Flatten(),
        layers.Dense(512, activation='relu'),
        layers.Dense(cats, activation='softmax'),
    ])
###### prepare iterators ########
def prepareIterater(folder_path, batch_size, img_size):
    """Create directory iterators for the 'train' and 'test' sub-folders.

    Args:
        folder_path: Folder that contains the 'train' and 'test' directories.
        batch_size: Number of images per batch.
        img_size: Target size the images are resized to.

    Returns:
        A ``(training_iterator, test_iterator)`` tuple.
    """
    def _flow(subset):
        # rescale=1 leaves the pixel values unchanged.
        generator = ImageDataGenerator(rescale=1)
        subset_dir = os.path.join(folder_path, subset)
        return generator.flow_from_directory(subset_dir, batch_size=batch_size, target_size=img_size)

    return _flow('train'), _flow('test')
########## start training ##############
def trainSimplaeModel(model, epochs, tran_iterater, test_iterater, model_name, text_output, progrss_bar, graph):
    """Compile, fit and save *model*, reporting progress through Streamlit.

    Args:
        model: Keras model to train.
        epochs: Number of epochs to fit.
        tran_iterater: Iterator yielding training batches.
        test_iterater: Iterator used as validation data.
        model_name: File name the model is saved under, inside the session's
            'output_folder'.
        text_output, progrss_bar, graph: Objects handed to CustomCallback.

    Returns:
        The history object returned by ``model.fit``.
    """
    model.compile(optimizer='adam', loss="categorical_crossentropy", metrics=['accuracy'])
    ui_callback = CustomCallback(text_output, progrss_bar, graph)
    fit_history = model.fit(
        tran_iterater,
        validation_data=test_iterater,
        epochs=epochs,
        callbacks=[ui_callback],
    )

    output_dir = st.session_state['output_folder']
    model_path = os.path.join(output_dir, model_name)
    # Record where the model was saved in the 'model_folder' session key.
    st.session_state['model_folder'] = st.session_state["output_folder"]
    st.markdown(f'##### Model output : {model_path}')
    model.save(model_path)
    return fit_history
######### plot #########
def plotHistory(history):
    """Plot training and validation loss per epoch and show the figure.

    Args:
        history: Object whose ``history`` mapping has 'loss' and 'val_loss'.
    """
    curves = history.history
    for series_name in ('loss', 'val_loss'):
        plt.plot(curves[series_name])
    plt.title("model loss")
    plt.ylabel("loss")
    plt.xlabel("no of epochs")
    plt.legend(['training', 'testing'], loc='upper left')
    plt.show()
####### print class labels #######
def printLabels(training_iterater):
    """Print the ``class_indices`` attribute of *training_iterater*."""
    print(training_iterater.class_indices)
####### predict ########
def predict(modelname, class_labels=None, image_path='dataset/flowers/sunflower.jpeg'):
    """Classify one image with a saved model and print the result.

    Prints the raw prediction array, the index of the highest score and,
    when class labels are available, the matching label name.

    Args:
        modelname: File name of the model inside the 'models' directory.
        class_labels: Optional mapping (or sequence) of label names ordered by
            class index. When omitted, a module-level ``class_labels`` is used
            if one exists; otherwise the label line is skipped.
        image_path: Image to classify; it is resized to 100x100.
    """
    mdl = load_model(os.path.join('models', modelname))
    img = load_img(image_path, target_size=(100, 100))
    batch = img_to_array(img).reshape(1, 100, 100, 3)
    res = mdl.predict(batch)
    best_index = int(np.argmax(res))
    print(res)
    print(best_index)

    # The original read an undefined local name here; fall back to the module
    # global only if it exists so the call can no longer raise NameError.
    if class_labels is None:
        class_labels = globals().get('class_labels')
    if class_labels:
        print(list(class_labels)[best_index])
def getModelNames(model_folder):
    """Return the names of the ``.h5`` entries directly inside *model_folder*.

    Args:
        model_folder: Directory to scan (not recursive).

    Returns:
        A sorted list of entry names ending in '.h5' (case-insensitive).
        Empty when *model_folder* is empty or is not an existing directory.
    """
    if not model_folder or not os.path.isdir(model_folder):
        return []
    # The context manager closes the directory handle promptly; sorting gives
    # the UI a stable order, since scandir order is arbitrary.
    with os.scandir(model_folder) as entries:
        return sorted(
            entry.name for entry in entries
            if entry.name.lower().endswith('.h5')
        )
##### prepare image folders ######
def prepareFolders(folder_path, output_folder):
    """Recreate *output_folder* as a split copy of the images in *folder_path*.

    Anything already at *output_folder* (file, link or directory tree) is
    deleted first, then ``splitfolders.ratio`` is run with a fixed seed and an
    80/10/10 ratio. Failures are shown with ``st.error`` instead of being
    silently ignored.

    Args:
        folder_path: Folder holding one sub-folder per category.
        output_folder: Destination folder. WARNING: its existing contents are
            removed.
    """
    try:
        if os.path.isfile(output_folder) or os.path.islink(output_folder):
            os.unlink(output_folder)
        elif os.path.isdir(output_folder):
            shutil.rmtree(output_folder)
        splitfolders.ratio(folder_path, output=output_folder, seed=1337, ratio=(.8, 0.1, 0.1))
    except Exception as e:
        # UI boundary: surface the problem to the user rather than hiding it.
        st.error(f'Could not arrange images: {e}')
##### handle change on image input folder #####
def handlerImageFolderChanged():
    """Look up 'output_folder' in the Streamlit session state.

    The value is read but neither used nor returned.
    """
    current_output = st.session_state["output_folder"]
def handlePrepare():
    """Click handler for the "arrange images" button.

    Splits the image folder the user entered (session key
    'selected_image_folder') into the session's 'output_folder'. Falls back to
    the module-level ``raw_image_folder`` when no folder has been stored yet.
    """
    source_folder = st.session_state.get("selected_image_folder", raw_image_folder)
    prepareFolders(source_folder, st.session_state['output_folder'])
def handSelectModel():
    """Write the literal text '#####' to the Streamlit page."""
    st.write('#####')
# Hard-coded image source folder referenced by handlePrepare.
raw_image_folder = "/Users/mohamednouffer/workspace/akira_san/image_classifier/dataset/raw_data"
# Square image sizes offered by both the prediction and the training UI.
input_shape_options = [(100, 100), (150, 150), (200, 200), (240, 240), (300, 300)]


def _images_are_arranged(folder):
    """Return True when *folder* has both a 'train' and a 'test' sub-directory."""
    if not folder or not os.path.isdir(folder):
        return False
    with os.scandir(folder) as entries:
        sub_dirs = {entry.name for entry in entries if entry.is_dir()}
    return {"train", "test"} <= sub_dirs


def _show_prediction_ui(model_folder, model_name):
    """Render the upload widget and classify the uploaded image with the chosen model."""
    st.write('Selected models is :', model_name)
    st.subheader("Predict a image")
    input_shape = st.selectbox('Select image input shape', input_shape_options)
    file = st.file_uploader("Upload image", ['jpeg', 'jpg', 'png'])
    if not file:
        return
    # basename() keeps a crafted upload name from escaping the model folder.
    upload_path = os.path.join(model_folder, os.path.basename(file.name))
    with open(upload_path, "wb") as f:
        f.write(file.getbuffer())
    mdl = load_model(os.path.join(model_folder, model_name))
    img = load_img(upload_path, target_size=input_shape)
    st.image(img)
    batch = img_to_array(img).reshape(1, input_shape[0], input_shape[1], 3)
    res = mdl.predict(batch)
    st.markdown('## Category is {}'.format(np.argmax(res)))


st.title("Sumasen AI")
st.sidebar.header("Sumasen Trainer")
# Reset on every rerun; set again below once the split folders are found.
st.session_state["image_arranged"] = False

options = ("New", "Existing model")
mode = st.sidebar.empty()
model_empty = st.sidebar.empty()
if "training_mode" not in st.session_state:
    st.session_state['training_mode'] = "New"
training_mode = mode.radio("Training mode:", options, 0)
st.session_state['training_mode'] = training_mode

if training_mode == "Existing model":
    if 'model_folder' in st.session_state:
        model_folder = model_empty.text_input("model folder", st.session_state['model_folder'])
    else:
        model_folder = model_empty.text_input("Enter model folder", '')

    # st.button is True only during the rerun triggered by the click, so the
    # click is remembered in session state; otherwise the model selectbox (and
    # its 'model_name' value) would disappear on the next interaction.
    if st.sidebar.button("Load models"):
        st.session_state['models_loaded'] = True
        st.session_state['model_folder'] = model_folder

    if st.session_state.get('models_loaded'):
        if not os.path.isdir(model_folder):
            st.sidebar.error("Need a valid model folder")
        else:
            selected_model = st.sidebar.selectbox(
                "Select a Model",
                getModelNames(model_folder),
                on_change=handSelectModel,
                key='model_name',
            )
            st.session_state['selected_model'] = selected_model
            # selectbox returns None when the folder holds no models.
            if selected_model:
                _show_prediction_ui(model_folder, selected_model)
            else:
                st.sidebar.warning("No .h5 models found in this folder")
else:
    # Reserve the banner position above the inputs; it is filled in once the
    # current output folder is known.
    ready_banner = st.empty()
    selected_image_folder = st.text_input("Enter image folder (make sure only category folders in this directory)", "Enter images folder ...")
    output_folder = os.path.join(selected_image_folder, "arranged")
    output_folder = st.text_input("Enter a folder to prepare images", output_folder)
    input_shape = st.selectbox('Select image input shape', input_shape_options)
    batch_size = st.slider('Batch size', 1, 1000, 40)

    if _images_are_arranged(output_folder):
        ready_banner.info("Images are ready for training")
        st.session_state["image_arranged"] = True

    if selected_image_folder in ("Enter images folder ...", ""):
        st.error("Need a valid image folder")
    else:
        try:
            # handlePrepare reads both of these keys when the button is clicked.
            st.session_state["selected_image_folder"] = selected_image_folder
            st.session_state["output_folder"] = output_folder
            st.button("arrange images", on_click=handlePrepare)
            if st.session_state["image_arranged"]:
                traing_iterater, test_iterater = prepareIterater(st.session_state['output_folder'], batch_size, input_shape)
                class_labels = traing_iterater.class_indices
                st.write('class labels', class_labels)
                model_name = st.text_input('model name:', 'mymodel.h5')
                epochs = st.slider("Epochs", 1, 500, 2)
                if st.button('begin train') and epochs and batch_size:
                    # NOTE(review): the model is built with buildSimpleModel's
                    # default input size, while the iterators use the size
                    # selected above -- confirm sizes other than (100, 100)
                    # train correctly.
                    mdl = buildSimpleModel(len(class_labels))
                    text_output = st.empty()
                    graph = st.empty()
                    my_bar = st.empty()
                    history = trainSimplaeModel(mdl, epochs, traing_iterater, test_iterater, model_name, text_output, my_bar, graph)
                    his_df = pd.DataFrame(history.history)
                    st.line_chart(his_df)
                    st.markdown('## Training completed, plese check the output folder for saved model.')
        except Exception as ex:
            # Top-level UI boundary: show the failure instead of crashing the page.
            st.error(ex)