264 lines
10 KiB
Python
264 lines
10 KiB
Python
from logging import exception
|
|
from keras.backend import constant
|
|
from tensorflow import keras
|
|
from tensorflow.keras import layers
|
|
from keras.preprocessing.image import load_img, ImageDataGenerator, array_to_img, img_to_array
|
|
from tensorflow.keras.models import load_model
|
|
from tensorflow.keras.applications.vgg19 import VGG19, preprocess_input, decode_predictions
|
|
from tensorflow.keras.models import Model
|
|
import matplotlib.pyplot as plt
|
|
import numpy as np
|
|
import pandas as pd
|
|
import streamlit as st
|
|
from tensorflow.python.keras.callbacks import History
|
|
from CustomCallback import CustomCallback
|
|
import os, shutil
|
|
from os import path
|
|
import splitfolders
|
|
import time
|
|
|
|
|
|
|
|
########## Augmentation with VGG19 #############
|
|
def tranferLearningVGG(model, train_dir, test_dir):
|
|
model = VGG19(include_top=False, input_shape=(224,224,3))
|
|
|
|
train_datagen = ImageDataGenerator(
|
|
rescale=1/255,
|
|
horizontal_flip=True,
|
|
rotation_range=20,
|
|
width_shift_range=0.2,
|
|
height_shift_range=0.2
|
|
)
|
|
test_datagen = ImageDataGenerator(
|
|
rescale=1/255,
|
|
)
|
|
|
|
for layer in model.layers:
|
|
layer.trainable = False
|
|
flatten_layer = layers.Flatten()(model.output)
|
|
|
|
flatten_fully_connected_layer = layers.Dense(512, activation='relu')(flatten_layer)
|
|
flatten_fully_connected_softmax_layer =layers.Dense(5, activation='softmax')(flatten_fully_connected_layer)
|
|
model = Model(inputs=model.inputs, outputs=flatten_fully_connected_softmax_layer)
|
|
|
|
training_iterater = train_datagen.flow_from_directory(train_dir, batch_size=64, target_size=(224,224))
|
|
test_iterater = test_datagen.flow_from_directory(test_dir, batch_size=64, target_size=(224,224))
|
|
model.compile(loss="categorical_crossentropy", metrics=['accuracy'], optimizer='adam')
|
|
model.save('models/vgg19_tl.h5')
|
|
history = model.fit(training_iterater, validation_data=test_iterater, epochs=4)
|
|
return history
|
|
|
|
|
|
############ Model creation ################
|
|
def buildSimpleModel():
|
|
model = keras.Sequential()
|
|
model.add(layers.Conv2D(32, (3,3), activation='relu', input_shape=(100,100,3)))
|
|
model.add(layers.MaxPooling2D((2,2)))
|
|
|
|
model.add(layers.Conv2D(64, (3,3), activation='relu'))
|
|
model.add(layers.MaxPooling2D((2,2)))
|
|
|
|
model.add(layers.Conv2D(128, (3,3), activation='relu'))
|
|
model.add(layers.MaxPooling2D((2,2)))
|
|
|
|
model.add(layers.Flatten())
|
|
|
|
model.add(layers.Dense(512, activation='relu'))
|
|
model.add(layers.Dense(5, activation='softmax'))
|
|
return model
|
|
|
|
###### preapre iterater ########
|
|
def prepareIterater(folder_path, batch_size, img_size):
|
|
train_datagen = ImageDataGenerator(rescale=1)
|
|
test_datagen = ImageDataGenerator(rescale=1)
|
|
training_iterater = train_datagen.flow_from_directory(os.path.join(folder_path, 'train'), batch_size=batch_size, target_size=img_size)
|
|
test_iterater = test_datagen.flow_from_directory(os.path.join(folder_path, 'test'), batch_size=batch_size, target_size=img_size)
|
|
return training_iterater, test_iterater
|
|
|
|
|
|
########## start trining ##############
|
|
def trainSimplaeModel(model, epochs, tran_iterater, test_iterater, model_name, text_output, progrss_bar, graph):
|
|
|
|
# train_datagen = ImageDataGenerator(rescale=1)
|
|
# test_datagen = ImageDataGenerator(rescale=1)
|
|
# training_iterater = train_datagen.flow_from_directory(tran_dir, batch_size=64, target_size=(100,100))
|
|
# test_iterater = test_datagen.flow_from_directory(test_dir, batch_size=64, target_size=(100,100))
|
|
model.compile(loss="categorical_crossentropy", metrics=['accuracy'], optimizer='adam')
|
|
history = model.fit(tran_iterater, validation_data=test_iterater, epochs=epochs, callbacks=[CustomCallback(text_output, progrss_bar, graph)],)
|
|
model_path = os.path.join(st.session_state['output_folder'], model_name)
|
|
st.session_state['model_folder'] = st.session_state["output_folder"]
|
|
#st.write(model_path)
|
|
model.save(model_path)
|
|
return history
|
|
|
|
|
|
######### plot #########
|
|
def plotHistory(history):
|
|
plt.plot(history.history['loss'])
|
|
plt.plot(history.history['val_loss'])
|
|
plt.title("model loss")
|
|
plt.ylabel("loss")
|
|
plt.xlabel("no of epochs")
|
|
plt.legend(['training', 'testing'], loc='upper left')
|
|
plt.show()
|
|
|
|
|
|
####### print class labels #######
|
|
def printLabels(training_iterater):
|
|
class_labels = training_iterater.class_indices
|
|
print(class_labels)
|
|
|
|
|
|
####### predict ########
|
|
def predict(modelname):
|
|
mdl = load_model('models/'+ modelname)
|
|
img = load_img('dataset/flowers/sunflower.jpeg', target_size=(100,100))
|
|
img = img_to_array(img)
|
|
img = img.reshape(1,100,100,3)
|
|
res = mdl.predict(img)
|
|
print(res)
|
|
print(np.argmax(res))
|
|
print([key for key in class_labels][np.argmax(res)])
|
|
|
|
def getModelNames(model_folder):
|
|
if model_folder == "":
|
|
return []
|
|
param = []
|
|
length =len([name for name in os.listdir('.') if os.path.isfile(model_folder)])
|
|
for entry in os.scandir(model_folder):
|
|
if entry.name.lower().endswith('.h5'):
|
|
param.append(entry.name)
|
|
return param
|
|
|
|
|
|
|
|
|
|
|
|
|
|
##### prepare image folders ######
|
|
def prepareFolders(folder_path, output_folder):
|
|
file_path = st.session_state['output_folder']
|
|
try:
|
|
if os.path.isfile(file_path) or os.path.islink(file_path):
|
|
os.unlink(file_path)
|
|
elif os.path.isdir(file_path):
|
|
shutil.rmtree(file_path)
|
|
splitfolders.ratio(folder_path, output=output_folder, seed=1337, ratio=(.8, 0.1,0.1))
|
|
except Exception as e:
|
|
pass
|
|
|
|
##### handle change on image input folder #####
|
|
def handlerImageFolderChanged():
|
|
output_folder = st.session_state["output_folder"]
|
|
|
|
def handlePrepare():
|
|
prepareFolders(raw_image_folder, st.session_state['output_folder'])
|
|
|
|
def handSelectModel():
|
|
st.write('#####')
|
|
# if 'model_name' in st.session_state:
|
|
# st.write(st.session_state['model_name'])
|
|
|
|
|
|
raw_image_folder = "/Users/mohamednouffer/workspace/akira_san/image_classifier/dataset/raw_data"
|
|
#output_folder = "/Users/mohamednouffer/workspace/akira_san/image_classifier/dataset/output"
|
|
#model_folder = '/Users/mohamednouffer/workspace/akira_san/image_classifier/models'
|
|
|
|
#train_folder = os.path.join(output_folder, "train")
|
|
#test_folder = os.path.join(output_folder, "test")
|
|
|
|
st.title("Sumasen AI")
|
|
st.sidebar.header("Sumasen Trainer")
|
|
st.session_state["image_arranged"] = False
|
|
#model_folder = ""
|
|
|
|
|
|
|
|
options = ("New", "Existing model")
|
|
|
|
mode = st.sidebar.empty()
|
|
model_empty = st.sidebar.empty()
|
|
|
|
|
|
if "training_mode" not in st.session_state:
|
|
st.session_state['training_mode'] = "New"
|
|
|
|
training_mode = mode.radio("Training mode:", options,0)
|
|
st.session_state['training_mode'] = training_mode
|
|
|
|
if training_mode == "Existing model":
|
|
|
|
if 'model_folder' in st.session_state:
|
|
model_folder = model_empty.text_input("model folder", st.session_state['model_folder'])
|
|
if st.sidebar.button("Load models"):
|
|
selected_model = st.sidebar.selectbox("Select a Model", getModelNames(model_folder), on_change=handSelectModel, key='model_name')
|
|
st.session_state['selected_model'] = selected_model
|
|
else:
|
|
model_folder = model_empty.text_input("Enter model folder",'')
|
|
if st.sidebar.button("Load models"):
|
|
selected_model = st.sidebar.selectbox("Select a Model", getModelNames(model_folder))
|
|
st.session_state['model_folder'] = model_folder
|
|
st.session_state['selected_model'] = selected_model
|
|
|
|
if 'model_name' in st.session_state:
|
|
st.write('Selected models is :', st.session_state['model_name'])
|
|
st.subheader("Predict a image")
|
|
file = st.file_uploader("Upload image",['jpeg', 'jpg', 'png'])
|
|
if file:
|
|
bytes_data = file.getvalue()
|
|
with open(os.path.join(st.session_state['model_folder'],file.name),"wb") as f:
|
|
f.write(file.getbuffer())
|
|
model_file = os.path.join(st.session_state['model_folder'], st.session_state['model_name'])
|
|
mdl = load_model(model_file)
|
|
img = load_img(os.path.join(st.session_state['model_folder'], file.name), target_size=(100,100))
|
|
st.image(img)
|
|
img = img_to_array(img)
|
|
img = img.reshape(1,100,100,3)
|
|
res = mdl.predict(img)
|
|
st.write(np.argmax(res))
|
|
#st.write([key for key in class_labels][np.argmax(res)])
|
|
|
|
else:
|
|
if "output_folder" in st.session_state:
|
|
for fls in os.scandir(st.session_state["output_folder"]):
|
|
if os.path.isdir(fls.path) and (fls.name == "tran" or fls.name == "test" or fls.name == "val"):
|
|
st.info("Images are ready for training")
|
|
st.session_state["image_arranged"] = True
|
|
|
|
|
|
selected_image_folder = st.text_input("Enter image folder (make sure only category folders in this directory)", "Enter images folder ...")
|
|
output_folder = os.path.join(selected_image_folder, "arranged")
|
|
output_folder = st.text_input("Enter a folder to prepare images", output_folder)
|
|
input_shape = st.selectbox('Select image input shape',[(100,100), [150, 150], (200,200), (240,240), (300, 300)])
|
|
batch_size = st.slider('Batch size', 1, 1000, 40)
|
|
if selected_image_folder == "Enter images folder ..." or selected_image_folder == "":
|
|
st.error("Need a valid image folder")
|
|
else:
|
|
try:
|
|
st.session_state["selected_image_folder"] = selected_image_folder
|
|
st.session_state["output_folder"] = output_folder
|
|
arranged = st.button("arrange images", on_click=handlePrepare)
|
|
if st.session_state["image_arranged"] == True:
|
|
traing_iterater, test_iterater = prepareIterater(st.session_state['output_folder'],batch_size, input_shape)
|
|
class_labels = traing_iterater.class_indices
|
|
st.write('class labels',class_labels)
|
|
model_name = st.text_input('model name:', 'mymodel.h5')
|
|
epochs = st.slider("Epochs",1,500, 2)
|
|
if st.button('begin train') and epochs and batch_size:
|
|
mdl = buildSimpleModel()
|
|
text_output = st.empty()
|
|
graph = st.empty()
|
|
my_bar = st.empty()
|
|
history = trainSimplaeModel(mdl, epochs, traing_iterater, test_iterater, model_name, text_output, my_bar, graph)
|
|
his_df = pd.DataFrame(history.history)
|
|
st.line_chart(his_df)
|
|
training_mode = "Existing model"
|
|
|
|
except Exception as ex:
|
|
st.error(ex)
|
|
|
|
|
|
|
|
|