In the previous post, we walked through fitting a straight line or a polynomial using linear regression on continuous data. But what if the output we are trying to predict is a discrete value? For example, what if we are supposed to model a logical AND gate?!
In this case the modeling is quite a bit different. The underlying mechanics are basically the same as linear regression, but we have to change the loss function, and also change the prediction so that it outputs the probability of each possible class.
Sigmoid Function
As mentioned above, we still use the basics of linear regression, but this time, to limit the predicted value to the range between 0 and 1, which represents the probability of each output, we pass the value of w*input through a function called the Sigmoid (or Logistic) function, defined as:
\[Sigmoid(x) = {\frac{1}{1+e^{-x}}}\]
import math
import matplotlib.pyplot as plt

def sigmoid(x):
    return 1.0 / (1 + math.exp(-x))

x = [i * 0.1 for i in range(-100, 100)]
y = [sigmoid(value) for value in x]
plt.scatter(x, y)
plt.show()
So let’s modify our linear regressor class and plug the Sigmoid function into the predict() method as well.
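The full class isn’t reproduced here; a minimal sketch of what the change could look like (assuming the bias term is the last element of coeff, which matches the coefficients used below) is:

import numpy as np

class LogisticRegressor:
    """A minimal sketch: linear regression machinery with a sigmoid on top."""
    def __init__(self, coeff=None):
        self.coeff = coeff

    def predict(self, x):
        # append a constant 1 so the last coefficient acts as the bias
        x = np.append(x, 1)
        # squash the linear combination into a (0, 1) probability
        return 1.0 / (1.0 + np.exp(-x.dot(self.coeff)))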
Now, we can check the logistic regressor with arbitrary coefficients and the AND gate truth table as the prediction inputs.
model = LogisticRegressor(coeff=np.array([2.5, 2.5, -4]))
x = np.array([[0, 0], [0, 1], [1, 0], [1, 1]])
predicted = [model.predict(sample) for sample in x]
print(predicted)
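With the bias ordering assumed in the sketch above, this prints values of roughly [0.018, 0.182, 0.182, 0.731].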
You may think that this result is not an actual discrete 0/1 value, and you are right! But as mentioned before, the output can be read as the probability of the actual value being 1. Therefore, to work around it, we simply consider predicted values over a threshold to be 1 and values below the threshold to be 0.
threshold = 0.5
predicted = [1 if value > threshold else 0 for value in predicted]
print(predicted)
[0, 0, 0, 1]
Cross-Entropy Loss Function
Just like linear regression, in order to assess the performance of our model, we need a measure of how good the model is at predicting. In binary classification, the Cross-Entropy loss function, also called negative log likelihood, does the job for us. This function is defined as below:
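For n training examples with target label y and predicted probability y_p, it reads:

\[CE = -{\frac{1}{n}\sum_{i=1}^{n} \left[\, y \log(y_p) + (1 - y)\log(1 - y_p) \,\right]}\]

which heavily penalizes confident wrong predictions and goes to zero as the predictions approach the true labels.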
Implementing a real-life example of things I’m trying to comprehend is the most exciting part of the learning process for me. It gives me a chance to encounter the dark corners and pitfalls. Accordingly, I’m going to briefly write up a recent project I did on simple audio classification using machine learning. In this project, I will implement a machine learning model to classify fire alarm, vacuum cleaner, music, and silence sounds.
Data Collection
Every machine learning pipeline starts with a data collection and preparation stage. I collected audio samples in .WAV format using the Audacity desktop application on Windows with a 44.1 kHz sample rate. Note that we consider silence a separate class. Each sample is roughly 30 seconds long. For music, fire alarm, and vacuum cleaner, I recorded samples from online audio. For each class, I collected at least 20 samples, in different sessions with different ambient noise and backgrounds, to have a more diverse dataset that helps the model generalize better and stay robust.
For loading audio samples, preprocessing, and feature extraction, I used a popular Python library called Librosa. It seems that installing Librosa using pip is problematic for unknown reasons, so I highly recommend installing it with Conda to avoid inconsistencies and hassle on Windows.
import os
import numpy as np
import matplotlib.pyplot as plt
import librosa
import librosa.display
from scipy.fft import rfft, rfftfreq
import cv2
import pickle
from sklearn.model_selection import cross_val_score
from sklearn.svm import SVC
from sklearn.metrics import classification_report, accuracy_score
from sklearn.metrics import ConfusionMatrixDisplay
ROOT_DIR = 'C:/Users/amiri/Desktop/demo/dataset/'
SAMPLING_RATE = 44100 #it's consistent over the entire dataset recordings
def get_all_directories(root_path):
    dirs = os.listdir(root_path)
    dirs = [dir for dir in dirs if os.path.isdir(root_path + dir)]
    return dirs

def get_all_files(path):
    files = os.listdir(path)
    files = [file for file in files if os.path.isfile(path + file)]
    return files

def load_all_audio_files(root_path, duration=30):
    files = get_all_files(root_path)
    file_samples = []
    for file in files:
        samples, sampling_rate = librosa.load(root_path + file,
                                              sr=None, mono=True, offset=0.0, duration=duration)
        file_samples.append(samples)
    return file_samples

dataset = {}
for audio_class in get_all_directories(ROOT_DIR):
    dataset[audio_class] = load_all_audio_files(ROOT_DIR + audio_class + '/')
    print(f"number of {audio_class} samples: {len(dataset[audio_class])}")
Now let’s visualize the samples and take a look at the audio signals to build some intuition about the different audio shapes and variations.
Based on the sound waves visualized above, we may be able to use some time-domain features such as the number of zero crossings, mean flatness, maximum amplitude, minimum amplitude, kurtosis, and skewness; a sketch of the feature extraction is shown below.
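The exact feature code isn’t shown in this excerpt; a sketch of what the get_time_domain_features helper (which the later code calls) might compute for the features listed above, with the implementation details assumed, is:

from scipy.stats import kurtosis, skew

def get_time_domain_features(samples):
    # hypothetical implementation of the helper used throughout this post
    zero_crossings = np.sum(librosa.zero_crossings(samples))
    mean_flatness = np.mean(librosa.feature.spectral_flatness(y=samples))
    return (zero_crossings,
            mean_flatness,
            np.max(samples),
            np.min(samples),
            kurtosis(samples),
            skew(samples))

# build the time-domain feature set and label list
data = []
labels = []
for audio_class in dataset:
    for audio_sample in dataset[audio_class]:
        data.append(get_time_domain_features(audio_sample))
        labels.append(audio_class)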
Now that we have constructed the feature set, we can go ahead and feed it into different classification methods and see if they can correctly classify the audio recordings!
But why did the model show such a poor classification result? First of all, we should take a step back and look at the dataset itself for a clue.
import pandas as pd
dataframe = pd.DataFrame.from_records(data)
dataframe.describe()
We can clearly see that the features have very different value ranges, which can drastically reduce model accuracy if the machine learning model works based on distances between data points (which our SVM classifier does!).
Accordingly, a StandardScaler is added to the pipeline to normalize data. Let’s see if it can improve the accuracy or not.
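A sketch of the scaled pipeline, using the data and labels arrays built above (the exact cross-validation setup in the original may differ):

from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

# scale each feature to zero mean and unit variance before the SVM
pipeline = make_pipeline(StandardScaler(), SVC())
scores = cross_val_score(pipeline, data, labels, cv=5)
print(f"mean accuracy: {scores.mean():.2f}")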
Well! We improved the model accuracy by about twenty percent just by normalizing the features!
But the result is still not satisfactory. We can try adding more features, or try other models, to see if we can improve the work. To add more features, we can convert the audio samples to the frequency domain and see if we can extract more meaningful information. It’s easy to move audio signals to the frequency domain with the Librosa or SciPy libraries using the Fast Fourier Transform (FFT). The FFT is basically a method to transform discrete time-domain data into a discrete frequency spectrum; in other words, it reveals the frequency distribution of the signal.
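As a sketch of this transformation with the SciPy functions imported earlier, plotting one randomly chosen sample per class (the exact plotting code in the original may differ):

import random

fig, axes = plt.subplots(len(dataset), 1, figsize=(8, 10))
for ax, audio_class in zip(axes, dataset):
    samples = random.choice(dataset[audio_class])
    # magnitude spectrum of the real-valued signal
    magnitudes = np.abs(rfft(samples))
    frequencies = rfftfreq(len(samples), d=1.0 / SAMPLING_RATE)
    ax.plot(frequencies, magnitudes)
    ax.set_title(audio_class)
plt.tight_layout()
plt.show()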
The figure above illustrates the frequencies in one randomly chosen sample of each class. We can see that each audio sample contains different frequencies, which can be leveraged as a new feature. But what we do instead is slide a window of fixed size over the original signal and extract the frequency distribution of each window using the FFT. Eventually, by column-wise concatenation of each FFT output, we get a two-dimensional array of frequencies over time, which is called a Spectrogram. Fortunately, the Librosa library makes our life easier by providing neat functions to extract and visualize the spectrogram of a signal.
As shown in the figure above, a spectrogram is effectively an image containing the changes in frequency over time. Now that we have a rich frequency-domain feature, we are going to flatten the 2-D spectrogram array to a 1-D array and concatenate it with the time-domain features. Also, to reduce the spectrogram size, I treated the spectrogram as an image and resized it using OpenCV.
data = []
labels = []
num_freq_bins = 32
num_time_bins = 256
FFT_SIZE = 1024      # assumed value; the original constant isn't shown in this excerpt
WINDOW_SIZE = 1024   # assumed value
for audio_class in dataset:
    for audio_sample in dataset[audio_class]:
        time_domain_features = list(get_time_domain_features(audio_sample))
        spectro = np.abs(librosa.stft(audio_sample, n_fft=FFT_SIZE, win_length=WINDOW_SIZE))
        power_spectro = librosa.amplitude_to_db(spectro, ref=np.max)
        binned_power_spectro = cv2.resize(power_spectro, (num_time_bins, num_freq_bins))
        feature_set = np.concatenate([binned_power_spectro.ravel(), time_domain_features])
        data.append(feature_set)
        labels.append(audio_class)
data = np.array(data)
labels = np.array(labels)
data.shape
Finally, the dataset is fed into the SVM classifier with results shown below.
Until now, we have used 30-second audio samples. But this duration is too long if we are aiming to use the model in a real-time setting like a smart-home application. Accordingly, we first have to decrease the sample sizes and try to detect events in a smaller time window. One way to do this is to slide a fixed-size window over the original audio samples and create another dataset out of the extracted windows. If we slide the window without any overlap, we can use the same pipeline we developed before. But since we may lose event data that lies across two consecutive frames, it’s better to have at least 50 percent overlap between consecutive frames. This is a bit tricky, though! If we introduce overlapping frames into the dataset, the evaluation process may show inflated results: the test set may contain frames the model has already seen in the training set, which leads to an unreliable performance evaluation.
If we had enough data, we could simply separate the train and test samples before applying the sliding window to them. But here, we utilize a method called BlockingTimeSeriesSplit, which is a well-known method for time-series cross-validation splits (more details here, and here).
OK then, first of all, we need to extract the frames using the function defined below.
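The original implementation isn’t shown in this excerpt; a sketch matching the call site below (the overlap parameter is an assumption based on the discussion above) could be:

def get_windows(wave_sample, dataclass, sampling_rate, window_size_seconds=2, overlap=0.5):
    # split a signal into fixed-size frames with the given overlap,
    # labeling every frame with its parent sample's class
    window_size = int(window_size_seconds * sampling_rate)
    step = max(1, int(window_size * (1 - overlap)))
    sub_samples_data = []
    sub_samples_labels = []
    for start in range(0, len(wave_sample) - window_size + 1, step):
        sub_samples_data.append(wave_sample[start:start + window_size])
        sub_samples_labels.append(dataclass)
    return sub_samples_data, sub_samples_labels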
data = {}
for audio_class in dataset:
    class_sub_features = []
    for audio_sample in dataset[audio_class]:
        sub_samples_data, sub_samples_labels = get_windows(wave_sample=audio_sample,
                                                           dataclass=audio_class,
                                                           sampling_rate=SAMPLING_RATE,
                                                           window_size_seconds=2)
        for i in range(len(sub_samples_data)):
            time_domain_features = list(get_time_domain_features(sub_samples_data[i]))
            spectro = np.abs(librosa.stft(sub_samples_data[i], n_fft=FFT_SIZE,
                                          win_length=WINDOW_SIZE))
            power_spectro = librosa.amplitude_to_db(spectro, ref=np.max)
            binned_power_spectro = cv2.resize(power_spectro, (num_time_bins, num_freq_bins))
            time_feature_set = [time_domain_features]
            freq_feature_set = [binned_power_spectro.ravel()]
            final_feature_set = time_feature_set + freq_feature_set
            feature_set = np.concatenate(final_feature_set)
            class_sub_features.append(feature_set)
    data[audio_class] = class_sub_features
The feature extraction process is the same as before, but here we added an inner loop that slides a window over each audio sample and extracts the features of each frame to build the new dataset.
Then the dataset is split using the BlockingTimeSeriesSplit method. Note that the dataset shouldn’t be shuffled before this process.
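A common implementation of this splitter looks like the following (the 80/20 train/test ratio inside each block is an assumption):

class BlockingTimeSeriesSplit:
    """Cross-validator that keeps each fold's train and test indices
    inside one contiguous, non-overlapping block."""
    def __init__(self, n_splits):
        self.n_splits = n_splits

    def get_n_splits(self, X=None, y=None, groups=None):
        return self.n_splits

    def split(self, X, y=None, groups=None):
        n_samples = len(X)
        fold_size = n_samples // self.n_splits
        indices = np.arange(n_samples)
        for i in range(self.n_splits):
            start = i * fold_size
            stop = start + fold_size
            mid = start + int(0.8 * (stop - start))
            yield indices[start:mid], indices[mid:stop]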
Now let’s use the model in a real-time application and see if it works as expected! To do so, first of all we need to save the trained model using Python’s pickle object-serialization library.
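A sketch of the save/load step (the file name is arbitrary):

# save the trained pipeline to disk
with open('audio_classifier.pkl', 'wb') as model_file:
    pickle.dump(pipeline, model_file)

# later, in the real-time application
with open('audio_classifier.pkl', 'rb') as model_file:
    model = pickle.load(model_file)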
Next, we have to read the microphone data using the PyAudio Python library by instantiating a PyAudio object and opening a stream with the same sampling frequency we recorded the sample audio with.
Note that PyAudio buffers up the samples from the microphone and returns them as a whole whenever it reaches the frames_per_buffer threshold.
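A sketch of opening the input stream (the 2-second chunk size follows the read interval mentioned below):

import pyaudio

CHUNK_SIZE = 2 * SAMPLING_RATE  # one 2-second chunk per read

pa = pyaudio.PyAudio()
input_audio_stream = pa.open(format=pyaudio.paFloat32,
                             channels=1,
                             rate=SAMPLING_RATE,
                             input=True,
                             frames_per_buffer=CHUNK_SIZE)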
Now we can read the last chunk of data in a loop. Note that this method of reading input data, called blocking mode, is discouraged in real-world implementations: if the processing happens in the same thread as the reading, it creates a lag between reads and eventually leads to missing parts of the audio. To work around this issue, data should be read in one thread and processed in another (the typical producer/consumer problem). But for the sake of simplicity, we won’t get into that here.
try:
    while True:
        # read a chunk from PyAudio and convert it to a NumPy array
        # (np.frombuffer replaces the deprecated np.fromstring)
        raw_data_chunk = np.frombuffer(input_audio_stream.read(CHUNK_SIZE), dtype=np.float32)
        cv2.waitKey(1)
except KeyboardInterrupt:
    input_audio_stream.stop_stream()
    input_audio_stream.close()
You might think that if we add the machine learning model right after reading the audio, we should be done! In fact, it will somewhat work, but the problem is that data is read every 2 seconds and discarded after we process it. So what if an event happens across two consecutive audio chunks?
To solve this issue we have to decrease the input chunk size and buffer the input data, just like we did in the training phase. Then we can slide a fixed-size window over the buffer and feed the window content to the machine learning pipeline.
To do so, I have written a class named WindowedBuffer, shown below.
Basically, it’s a fixed-size buffer: you add a chunk of data at its head and it removes the same amount from its tail. Also, by calling the GetWindow function you get the window content with the size and location defined in the object constructor.
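A sketch of that class, with the internals assumed from the description above:

class WindowedBuffer:
    def __init__(self, buffer_size, window_size, window_position=0):
        self.buffer = np.zeros(buffer_size, dtype=np.float32)
        self.window_size = window_size
        self.window_position = window_position

    def AddChunk(self, chunk):
        # push the new chunk in at the head, dropping the same amount from the tail
        self.buffer = np.concatenate([chunk, self.buffer[:-len(chunk)]])

    def GetWindow(self):
        # return the slice defined by the constructor's size and location
        return self.buffer[self.window_position:self.window_position + self.window_size]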
So up to here, we can read raw data, append it to the buffer, get the buffer window, and extract features from it. All we need now is to plug in the trained model and use it for classification.
Note that I have created a three-row plot for visualization purposes. The first two rows are the window signal and the window signal’s spectrogram, respectively. The third row is left empty as a placeholder for showing the classification result. The Matplotlib figure is then converted to an OpenCV image, and the result is printed on the image using OpenCV’s putText function, as sketched below.
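A sketch of that conversion, assuming fig is the three-row figure described above and predicted_class is a hypothetical variable holding the classifier’s output:

# render the Matplotlib figure and grab its pixels as an RGBA array
fig.canvas.draw()
img = np.asarray(fig.canvas.buffer_rgba())
img = cv2.cvtColor(img, cv2.COLOR_RGBA2BGR)

# print the classification result in the empty third row
cv2.putText(img, predicted_class, (50, img.shape[0] - 40),
            cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 2)
cv2.imshow('classification', img)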
Note: In the entire process I just used an SVM model with default parameters (C=1.0, kernel='rbf', degree=3), but generally it’s good practice to try several different machine learning models and choose the one that shows better results in terms of accuracy and generalization performance.
I have watched a lot of tutorials and passed a bunch of courses about machine learning and AI, but other than applying those methods to class assignments, and a few of them to my master’s thesis, I didn’t have the chance to use machine learning, and specifically deep learning, in a long-term practical project. Fortunately, driven by the needs of my research, I have started to study machine learning methods from scratch, and this time more in-depth. According to a famous quote attributed to Albert Einstein:
“You do not really understand something unless you can explain it to your grandmother.”
It is quite important to be able to transfer your knowledge to others using plain and easily understandable descriptions, which also helps solidify your comprehension of the topic. So, I have decided to start a series of blog posts building common machine learning algorithms from scratch, to clarify these methods for myself and make sure I correctly understand the mechanics behind each of them.
Let me be clear from the very beginning: It’s all about fitting a function!
Consider a dataset containing the number of trucks crossing the border from Mexico to the U.S. through the Otay Mesa port. Note that it’s just a subset of the entire inbound crossings dataset available on Kaggle. First of all, it’s always better to plot the data, which may give us some insight. To load the data from a .CSV file, we are going to use Pandas, a well-known data analysis/manipulation Python library. We can then plot the data using Matplotlib (another Python library, for data visualization).
Note that in the original data each value corresponds to a month, so I mapped the date intervals to an integer representation, as in the sketch below.
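A sketch of the loading and plotting step (the file path and column names follow the code used later in this post):

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

df = pd.read_csv('./regression/dataset.csv')
y_training_set = df.Value.to_numpy()
# map the monthly dates to consecutive integers 0, 1, 2, ...
x_training_set = np.arange(0, len(y_training_set), 1)

plt.scatter(x_training_set, y_training_set, marker='.')
plt.xlabel('Month index')
plt.ylabel('Truck crossings')
plt.show()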
What we are observing here is obviously not an exact linear function, but for the sake of simplicity we can model the border crossings using a linear function! As we already know, the equation of a line is as below:
\[f(x) = mx + c\]
where m stands for the slope and c is the intercept on the y-axis. But we can have an infinite number of possible values for these parameters. Let’s look at some arbitrary lines with values [m=70, c=40000], [m=100, c=40000], [m=140, c=40000], represented in orange, green, and red respectively.
But what parameter values should we choose for our linear equation to properly fit the data points?
Analytical Approach to Finding the Regression Parameters
To find the proper fit for our data, we basically have to minimize the average distance of the data points from our arbitrary line. In other words, we measure the difference between the predicted values and the actual training data.
There are two common ways to calculate the error:
Mean Squared Error (MSE): which considers the squared difference of the values.
Mean Absolute Error (MAE): which considers the absolute difference of the values.
Note that we need to sum these error values as positive numbers; otherwise, negative values would compensate for positive ones and make our optimization problem impossible.
Our objective is to find the linear equation parameters m and c that minimize the average error over the whole training set (known as the cost function), defined below:
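For MSE the cost is:

\[J_{MSE}(m,c) = {\frac{1}{n}\sum_{i=1}^{n} ((mx_i + c) - y_i)^2}\]

and the MAE version simply swaps the square for an absolute value:

\[J_{MAE}(m,c) = {\frac{1}{n}\sum_{i=1}^{n} |(mx_i + c) - y_i|}\]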
where n is the number of training examples. Now let’s explore the parameter space and plot the cost function to see what it looks like. For the MSE cost function, we have the parameter space-cost plot below.
# visualize cost function
def line_equation(m, c, x):
    return (m * x) + c

def cost_function(m, c, training_examples_x, training_examples_y):
    sum_of_errors = 0
    for i, x_value in enumerate(training_examples_x):
        predicted_y = line_equation(m, c, x_value)
        sum_of_errors += (predicted_y - training_examples_y[i])**2
        #sum_of_errors += abs(predicted_y - training_examples_y[i])
    mse = sum_of_errors / len(training_examples_x)
    return mse

fig = plt.figure()
fig.set_size_inches(8, 6)
ax = fig.add_subplot(projection='3d')
cost_func_x_points = []
cost_func_y_points = []
cost_func_z_points = []
for m in np.arange(-200, 500, 10):
    for c in np.arange(-10000, 60000, 200):
        cost = cost_function(m, c, x_training_set, y_training_set)
        cost_func_x_points.append(m)
        cost_func_y_points.append(c)
        cost_func_z_points.append(cost)
ax.scatter(cost_func_x_points, cost_func_y_points,
           cost_func_z_points, c=cost_func_z_points, marker='.')
ax.set_xlabel('M')
ax.set_ylabel('C')
ax.set_zlabel('Cost')
plt.show()
and for the MAE we have the plot below:
As you can see, both of these cost functions are convex, with just one minimum point at the bottom of the bowl (the global minimum). Based on calculus, this means that if we find the point where the derivative of this function is zero, we have found the optimal parameters for our model. We can simply use the equation below to find the parameters.
\[\theta = (X^T X)^{-1} (X^T Y)\]
where X is the matrix of training feature vectors, Y is the output vector, and the result, theta, holds the parameters of our regression model.
This equation is called the Normal Equation, and you can find the math behind it here.
So let’s run it on our dataset and see how it works.
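A sketch of that computation with NumPy, using the x_training_set and y_training_set arrays from the plotting step above (theta_list and number_of_training_examples are reused in the evaluation code below):

# design matrix with a leading column of ones for the intercept c
number_of_training_examples = len(x_training_set)
X = np.column_stack([np.ones(number_of_training_examples), x_training_set])

# normal equation: theta = (X^T X)^-1 (X^T y)
theta_list = np.linalg.inv(X.T @ X) @ (X.T @ y_training_set)
print(f"c = {theta_list[0]}, m = {theta_list[1]}")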
Now that we have fit a line on our data, are we done? Nope!
We need to evaluate the model utilizing different metrics and plots to make sure the model we proposed would generalize well on new data.
To evaluate a model there are several metrics we can use.
1- MSE (Mean Squared Error)
\[MSE = {\frac{1}{n}\sum_{i=1}^{n} (y - y_p)^2}\]
2- MAE (Mean Absolute Error)
\[MAE = {\frac{1}{n}\sum_{i=1}^{n} |y - y_p|}\]
These two metrics have the exact same definitions we used for the cost functions. The main difference is that MSE penalizes large prediction errors more heavily than MAE. Generally, we want these scores to be as close to zero as possible.
mse_value = 0
mae_value = 0
m = theta_list[1]
c = theta_list[0]
for sample_x in range(0, number_of_training_examples):
    predicted_y = m * sample_x + c
    sample_y = int(y_training_set[sample_x])
    mse_value += (predicted_y - sample_y)**2
    mae_value += abs(predicted_y - sample_y)
print(f"Mean Squared Error: {mse_value // number_of_training_examples}\n")
print(f"Mean Absolute Error: {mae_value // number_of_training_examples}\n")
3- R-Squared Score
In terms of regression evaluation, the R-squared (R2) score is one of the most useful metrics. It indicates how much of the variance in the data is explained by the model; in other words, how close the data points are to the fitted line. The R2 score is defined as below:
\[R2 = {\frac{Model\ Variance}{Total\ Variance}}\]
\[= {1 - \frac{Sum\ of\ Squared\ Residuals\ (SSR)}{Total\ Sum\ of\ Squares\ (SST)}}\]
\[= {1 - \frac{\sum_{i=1}^{n} (y - y_p)^2}{\sum_{i=1}^{n} (y - y_m)^2}}\]
The R2 score is typically a number between 0 and 1, and in regression modeling we aim to push it as close to 1 as possible. Note that a negative R2 score indicates that something is wrong with the modeling or the implementation, since it means the model performs worse than simply predicting the mean.
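A sketch of this computation on our fitted line, reusing m, c, and the training arrays from above:

y_mean = y_training_set.mean()
predicted = m * x_training_set + c
ss_residuals = np.sum((y_training_set - predicted)**2)
ss_total = np.sum((y_training_set - y_mean)**2)
r2_score = 1 - ss_residuals / ss_total
print(f"R2 score: {r2_score:.3f}")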
4- Residual Plot
It’s also crucial to plot the residuals to see whether linear regression is a good choice for modeling our data. In regression, a residual is the difference between the actual value and the predicted value. Residual points should be distributed randomly and evenly around the horizontal axis; otherwise, the model is not performing well and is probably not reliable enough.
\[residuals = y - y_p\]
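A sketch of the residual plot:

residuals = y_training_set - (m * x_training_set + c)
plt.scatter(x_training_set, residuals, marker='.')
plt.axhline(0, color='gray')
plt.xlabel('Month index')
plt.ylabel('Residual')
plt.show()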
In the figure above, except at the beginning and the end of the plot, where the residuals are larger, the residuals are distributed more or less randomly. Generally, for a good linear regression model, the points in this plot should be as close as possible to the horizontal axis and uniformly distributed along it.
Code Refactoring
Although what we have done so far works, it’s quite a mess in terms of performance and cleanliness. Accordingly, I’m going to wrap the implementation in a class called LinearRegressor as follows.
import numpy as np

class LinearRegressor:
    """Simple linear regression model"""
    def __init__(self, loss='MSE'):
        self.coeff = []

        def MSE(y_predicted, y_target):
            return ((y_target - y_predicted)**2).mean()

        def MAE(y_predicted, y_target):
            return (abs(y_target - y_predicted)).mean()

        if loss == 'MSE': self.calculate_loss = MSE
        elif loss == 'MAE': self.calculate_loss = MAE
        else: self.calculate_loss = MSE

    def __add_bias(self, x):
        one = np.ones((len(x), 1))
        return np.append(one, x, axis=1)

    def fit(self, x, y):
        x = self.__add_bias(x)
        self.coeff = np.linalg.inv(x.T.dot(x)).dot(x.T).dot(y)

    def predict(self, x):
        x = self.__add_bias(x)
        return x.dot(self.coeff)
Test for simple regression
# simple linear regression
x = np.array([1, 2, 3, 4, 5, 6]).reshape(-1, 1)
y = np.array([2, 4, 6, 8, 14, 12])
model = LinearRegressor(loss="MAE")
model.fit(x, y)
predicted_y = model.predict(x)
print(f'Coefficients: {model.coeff}')
print(f'Predicted values: {predicted_y}')
print(f'Loss: {model.calculate_loss(predicted_y, y)}')
Great! Note that to multiply a 1-D NumPy array with another array, we need to use the reshape() function to turn it into a column vector, i.e. an array of single-element arrays.
Also, to account for the bias in this class, we add a column of ones to each data point in the dataset, which is done in both the fit() and predict() functions by calling __add_bias().
Let’s run it on the dataset we prepared at the beginning of the post and see if it leads to the same result.
import pandas as pd
import numpy as np
from linearregressor import LinearRegressor

df = pd.read_csv('./regression/dataset.csv')
x_raw = pd.to_datetime(df.Date, infer_datetime_format=True).to_numpy()
y = df.Value.to_numpy()
x = np.arange(0, len(x_raw), 1)
model = LinearRegressor()
model.fit(x.reshape(-1, 1), y)
predicted_y = model.predict(x.reshape(-1, 1))
print(f'Coefficients: {model.coeff}')
print(f'Loss: {model.calculate_loss(predicted_y, y)}')
Based on the visualizations in the very first steps, we can clearly see that what we are dealing with is not a linear function. Accordingly, we have to try a more flexible function to fit our data. To do so, we can add polynomial features created from the original features.
x = np.arange(0, len(x_raw), 1).reshape(-1, 1)
x2 = x**2
x = np.append(x2, x, axis=1)
print(x.shape)
(279, 2)
Then feed the new feature set to the linear regressor model.
model = LinearRegressor()
model.fit(x, y)
predicted_y = model.predict(x)
print(f'Coefficients: {model.coeff}')
print(f'Loss: {model.calculate_loss(predicted_y, y)}')