Recently, I was working on a project with a relatively high volume of data, which made me curious about the possibility of using Spark to process data in a distributed setup. Accordingly, I decided to set up Spark and get my hands dirty with PySpark (the Python interface for Spark). In this post, I will delve into the steps to set up Spark and the basic concepts of the MapReduce paradigm, a model introduced by Google to process large distributed datasets in an efficient, scalable, parallel way.
Setting Up Apache Spark on Windows
First of all, we should install the requirements, including Python, Conda, and the JDK (Java Development Kit).
Create a new environment variable in the system settings named JAVA_HOME, and set its value to the JDK installation path (in my case: JAVA_HOME=C:\Program Files\Java\jdk-19).
Download the latest Spark binaries along with Winutils and extract them into a folder.
Create an environment variable SPARK_HOME pointing to the Spark binaries. In my case it is SPARK_HOME = "C:\bigdatalocal\spark".
Create an environment variable HADOOP_HOME pointing to the Winutils binaries, e.g. HADOOP_HOME = "C:\bigdatalocal\hadoop". Note that you should create a folder named bin inside it and copy winutils.exe there.
Then add "%SPARK_HOME%\bin" and "%HADOOP_HOME%\bin" to the Path variable.
Now, to confirm the installation, open a command prompt and enter spark-shell; the following result should be shown:
C:\Windows\System32>spark-shell
Spark context Web UI available at http://host.docker.internal:4040
Spark context available as 'sc' (master = local[*], app id = local-1677192682852).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.2
      /_/
To be able to run PySpark code in a Jupyter notebook, we need to take some extra steps.
First, create a new Conda environment and install the necessary Python packages.
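For example (the environment name and Python version here are illustrative choices, matching the paths used later in this post):

conda create -n py39 python=3.9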
Note that to avoid weird errors and exceptions, it’s better to run all the commands in the Anaconda Command Prompt with administrator permissions. Also, it’s better to check the PySpark library first and create the environment with a compatible Python version. Otherwise, you might end up struggling with an error indicating that the Python version running the Jupyter notebook is not the same as the one PySpark expects.
Next, we should install PySpark, Py4J, Notebook, and FindSpark by running the following commands.
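Assuming the py39 environment from above is active, the standard PyPI packages can be installed with:

conda activate py39
pip install pyspark py4j notebook findspark

(py4j is also pulled in automatically as a PySpark dependency, but installing it explicitly does no harm.)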
Next, we should create another environment variable, PYSPARK_PYTHON, and point it to the Python binary of the Conda environment we created, e.g. PYSPARK_PYTHON=C:\ProgramData\Anaconda3\envs\py39\python.exe.
Now, if you open a command prompt and enter pyspark, the following result will be shown:
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.3.2
      /_/
Using Python version 3.9.16 (main, Jan 11 2023 16:16:36)
Spark context Web UI available at http://host.docker.internal:4040
Spark context available as 'sc' (master = local[*], app id = local-1677191027145).
SparkSession available as 'spark'.
The last step is to add the following two environment variables, which make the pyspark command launch a Jupyter notebook, here on port 4050.
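These are the standard PySpark driver variables; with the port used below they would be:

PYSPARK_DRIVER_PYTHON=jupyter
PYSPARK_DRIVER_PYTHON_OPTS=notebook --port=4050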
Finally, running the pyspark command will start a Jupyter notebook on the indicated port 4050.
(py39) C:\Users\amiri>pyspark
[W 18:37:58.491 NotebookApp] Loading JupyterLab as a classic notebook (v6) extension.
[I 18:37:58.494 NotebookApp] Serving notebooks from local directory: C:\Users\amiri
[I 18:37:58.494 NotebookApp] Jupyter Notebook 6.5.2 is running at:
[I 18:37:58.494 NotebookApp] http://localhost:4050/?token=2f203d7e89781e656d1283c0e7a26c9fe45b3ca848468e11
[I 18:37:58.495 NotebookApp] or http://127.0.0.1:4050/?token=2f203d7e89781e656d1283c0e7a26c9fe45b3ca848468e11
Or copy and paste one of these URLs:
http://localhost:4050/?token=2f203d7e89781e656d1283c0e7a26c9fe45b3ca848468e11
or http://127.0.0.1:4050/?token=2f203d7e89781e656d1283c0e7a26c9fe45b3ca848468e11
Open the provided URL in your browser and run the following lines of code:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
data = sc.parallelize(range(10))
print(data.collect())
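If everything is set up correctly, the collect() call gathers the partitions of the distributed RDD back to the driver and prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9].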
In the previous post, we walked through the process of fitting a straight line or a polynomial using linear regression on continuous data. But what if the output we are trying to predict is a discrete value? For example, what if we are supposed to model a logical AND gate?!
In this case, the modeling is a bit different. The underlying mechanics are basically the same as in linear regression, but we have to change the loss function, and also change the prediction so that it expresses the probability of each possible output.
Sigmoid Function
As mentioned above, we still use the basics of linear regression, but this time, to limit the predicted value to between 0 and 1 so that it represents the probability of each output, we pass the value of w·input through a function called the Sigmoid (or Logistic) function, defined as:
\[Sigmoid(x) = {\frac{1}{1+e^{-x}}}\]
import math
import matplotlib.pyplot as plt

def sigmoid(x):
    return 1.0 / (1 + math.exp(-x))

x = [i * 0.1 for i in range(-100, 100)]
y = [sigmoid(value) for value in x]
plt.scatter(x, y)
plt.show()
So let’s modify our linear regressor class and plug the Sigmoid function into its predict() function.
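The modified class boils down to something like the following sketch; storing the bias as the last coefficient is an assumption here, chosen to match the coeff=[2.5, 2.5, -4] call below:

import numpy as np

class LogisticRegressor:
    """Linear model whose predictions are squashed through the Sigmoid."""
    def __init__(self, coeff):
        # weights followed by the bias term, e.g. [w1, w2, b]
        self.coeff = np.asarray(coeff, dtype=float)

    def __sigmoid(self, x):
        return 1.0 / (1.0 + np.exp(-x))

    def predict(self, x):
        # append a constant 1 so the last coefficient acts as the bias
        x = np.append(np.asarray(x, dtype=float), 1.0)
        return self.__sigmoid(x.dot(self.coeff))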
Now we can check the logistic regressor with arbitrary coefficients, using the AND gate truth table as the prediction inputs.
model = LogisticRegressor(coeff=np.array([2.5, 2.5, -4]))
x = np.array([[0, 0], [0, 1], [1, 0], [1, 1]])
predicted = [model.predict(input) for input in x]
print(predicted)
You may think that this result is not an actual discrete 0/1 value, and you are right! But as we mentioned before, the output can be interpreted as the probability of the actual value being 1. Therefore, to work around it, we simply map predicted values above a threshold to 1 and values below the threshold to 0.
threshold = 0.5
predicted = [1 if value > threshold else 0 for value in predicted]
print(predicted)
[0, 0, 0, 1]
Cross-Entropy Loss Function
Just like with linear regression, in order to assess the performance of our model, we need a measure of how good the model is at predicting. In binary classification, the Cross-Entropy loss function, which is also called negative log likelihood, does the job for us. This function is defined as below:
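\[CE = {-\frac{1}{n}\sum_{i=1}^{n} \left( y \log(y_p) + (1 - y) \log(1 - y_p) \right)}\]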
During the past few weeks, I was working on a project to detect an object from two or more cameras mounted at different perspectives. The initial, intuitive idea is to save images with a timestamp and then match images whose time difference is below a defined threshold.
Accordingly, I wrote a piece of code which simply captures images from the camera using the OpenCV Python library and saves them in a specified directory, as follows.
First, we have to create an environment and install the opencv-python lib:
pip install opencv-python
Then a video capture is created with camera index 0, and the desired input image size is set via the camera object's set() function. Note that if you have multiple cameras, you should specify the index for each one.
import cv2 as cv
from datetime import datetime
camera_id = 0
camera = cv.VideoCapture(camera_id)
camera.set(cv.CAP_PROP_FRAME_WIDTH, 1280)
camera.set(cv.CAP_PROP_FRAME_HEIGHT, 720)
Then, all we need is to call the OpenCV read() function in a loop and save each captured image in the specified directory.
directory = "./images/"
while True:
    result, image = camera.read()
    if result:
        timestamp = datetime.now().strftime('%Y_%m_%d_%H_%M_%S.%f')[:-3]
        cv.imwrite(f"{directory}{timestamp}.jpg", image)
    cv.waitKey(1)
To measure the recording FPS, we can also use the cv.getTickCount() function to get the current CPU tick, call it again at the bottom of the loop to find the tick interval, and then divide that interval by the CPU tick frequency to get the elapsed time in seconds. Finally, divide the number of captured frames by the elapsed seconds to get the actual FPS. The final code is as follows:
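A sketch of that measurement, continuing from the capture setup above (the 100-frame window is an arbitrary choice):

frame_count = 0
start_tick = cv.getTickCount()
while frame_count < 100:  # measure over a fixed number of frames
    result, image = camera.read()
    if result:
        timestamp = datetime.now().strftime('%Y_%m_%d_%H_%M_%S.%f')[:-3]
        cv.imwrite(f"{directory}{timestamp}.jpg", image)
        frame_count += 1
    cv.waitKey(1)
# ticks / ticks-per-second = elapsed seconds
elapsed = (cv.getTickCount() - start_tick) / cv.getTickFrequency()
print(f"{frame_count} frames in {elapsed:.2f}s -> {frame_count / elapsed:.1f} FPS")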
Sounds good! But for an unknown reason, the recording speed did not exceed 12 frames per second, which does not seem right. Therefore, I decided to dig deeper and find a way to increase OpenCV Python's low recording FPS. At first, I was skeptical about the OpenCV Python wrapper itself. I mean, it's Python, and it is well established that it is not as fast as C++ (although it uses the native OpenCV library implementation in the underlying layers).
Anyway, I decided to try FFMPEG, the Swiss Army knife of multimedia operations, to record time-stamped images and also to verify whether the problem was related to OpenCV Python or something else.
Recording Time-Stamped Images with FFMPEG
First, download the FFMPEG command-line executable, and add its location to the environment variables to make it easily accessible.
Then use the command below to have a list of connected devices:
ffmpeg -list_devices true -f dshow -i dummy
FFMPEG, depending on the operating system it runs on, offers several different libraries for handling different operations. One of them, which according to the documentation is deprecated on Windows, is the VfW (Video for Windows) library. We can record video using this library as below:
ffmpeg -y -f vfwcap -r 30 -i 0 out.mp4
This command records an .mp4 file from the camera with id=0 at 30 frames per second. As shown in the figure below, the maximum FPS it could reach was about 15 frames per second, which is almost the same as OpenCV.
The second, more up-to-date option is to tell FFMPEG to use Microsoft's built-in library for media communication, called DirectShow.
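A command along these lines does it (the device name comes from the -list_devices output above):

ffmpeg -f dshow -video_size 1280x720 -i video="Integrated Webcam" -t 30 out.mp4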
This command tells FFMPEG to record a 30-second 1280×720 video from the device named "Integrated Webcam" using the DirectShow library.
Recording video using DirectShow increased the FPS to roughly 30 frames per second. It seems that FFMPEG coupled with DirectShow is fast enough at recording video streams.
Great! Now we should try to save time-stamped images so we can perform the image synchronization. It is possible to record images with FFMPEG using the following command:
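A typical form of it, reusing the DirectShow input from above (the exact options may vary):

ffmpeg -f dshow -video_size 1280x720 -i video="Integrated Webcam" -r 30 img_%04d.jpg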
With this command, FFMPEG records images and saves them under sequentially numbered file names (img_0001.jpg, img_0002.jpg, ...), which is not useful in our image synchronization case.
To save images with timestamped file names, FFMPEG provides the strftime option. We just need to add the argument -strftime 1 to the command, and use a datetime format string in the output file name.
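For example, again assuming the same DirectShow input:

ffmpeg -f dshow -video_size 1280x720 -i video="Integrated Webcam" -r 30 -strftime 1 "%Y_%m_%d_%H_%M_%S.jpg"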
Unfortunately, strftime does not provide millisecond precision, so it cannot be used in our project.
Fixing OpenCV-Python's Low Recording Rate
At least we have a clue! We have seen that leveraging the DirectShow library helps increase the recording rate with FFMPEG. After looking through the OpenCV documentation, I found that it is possible to use different libraries (so-called backends) for I/O operations. The problem is that OpenCV does not include DirectShow by default, which means we have to compile it from source with DirectShow enabled to be able to use it as the backend.
OK then, let’s compile OpenCV from source. I will briefly go through the steps:
Make sure you have Visual Studio Community and CMake installed.
Launch the CMake GUI and provide the source code folder path and the build destination path.
Press Configure and select the Visual Studio version (the one you just installed) you wish to generate the project for.
Go through the list of red checkboxes and check whatever features you need. In our case, we have to check the OPENCV_PYTHON3_VERSION, BUILD_opencv_python3, and WITH_DSHOW flags.
CMake will automatically find the Python binaries if you have already installed them; otherwise, make sure you provide the Python binary paths. Note that you can also download and provide the contrib extra modules if you need any of their libraries and functionalities.
Press Generate and wait for the OpenCV Visual Studio solution to be generated.
Press Open Project, or open the .sln file generated in the destination path.
In the opened solution, select the build type Release, open CMakeTargets, right-click on ALL_BUILD and build it. (Recharge with a coffee since it might take a while.)
Right-click on INSTALL and build it too.
opencv-python should be installed automatically. Otherwise, you can find the opencv-python library with the .pyd extension in the build path's /lib/python3 directory and put a copy of it inside your Python environment's site-packages directory.
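Once the custom build is installed, selecting DirectShow from Python is just a matter of passing the backend flag to VideoCapture; a quick sanity check might look like:

import cv2 as cv

# the second argument is the API preference; CAP_DSHOW requests DirectShow
camera = cv.VideoCapture(0, cv.CAP_DSHOW)
print(camera.getBackendName())  # should report DSHOW when the backend is active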
Python Offline Image Synchronization
Now let’s get back to the main objective, capturing time-stamped images, and see if we can achieve a higher FPS. The code provided above was naive and only meant for testing the output FPS. Therefore, I have added the functionality we need and wrapped the entire code in a class for ease of use and further development.
First of all, note that this class is supposed to be a general class for camera operations, including capturing timestamped images. To avoid delays or performance issues while recording with several cameras, each camera's I/O should run in a separate thread. Accordingly, the Camera class is derived from the Thread class. To get some information about the actual FPS, I have implemented two functions that capture a few frames and calculate the FPS, both in recording mode and in capture-only mode. This information can be used when we are directly recording video files. The constructor provides several options, as follows.
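Below is a minimal sketch of the idea: a Camera derived from Python's threading.Thread that grabs and saves time-stamped frames in its own thread. The constructor parameters and the use_dshow flag are illustrative assumptions, not the original signature.

import threading
import cv2 as cv
from datetime import datetime

class Camera(threading.Thread):
    """Captures time-stamped frames from a single camera in its own thread."""
    def __init__(self, camera_id=0, width=1280, height=720,
                 directory="./images/", use_dshow=False):
        super().__init__(daemon=True)
        self.directory = directory
        self.running = False
        # CAP_DSHOW selects the DirectShow backend discussed above
        backend = cv.CAP_DSHOW if use_dshow else cv.CAP_ANY
        self.capture = cv.VideoCapture(camera_id, backend)
        self.capture.set(cv.CAP_PROP_FRAME_WIDTH, width)
        self.capture.set(cv.CAP_PROP_FRAME_HEIGHT, height)

    def run(self):
        # grab frames until stop() is called, naming each file by capture time
        self.running = True
        while self.running:
            result, image = self.capture.read()
            if result:
                timestamp = datetime.now().strftime('%Y_%m_%d_%H_%M_%S.%f')[:-3]
                cv.imwrite(f"{self.directory}{timestamp}.jpg", image)

    def stop(self):
        self.running = False
        # caller should join() the thread and then release self.capture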
I have watched a lot of tutorials and passed a bunch of courses on machine learning and AI before, but other than applying those methods to class assignments and a few of them to my master's thesis, I didn't have the chance to use machine learning, and specifically deep learning, in a long-term practical project. Fortunately, due to a need I have in my research, I have started to study machine learning methods from scratch, and this time more in-depth. As a famous quote attributed to Albert Einstein goes:
“You do not really understand something unless you can explain it to your grandmother.”
It is quite important to be able to transfer your knowledge to others using plain, easily understandable descriptions, which also helps to solidify your own comprehension of the topic. So, I have decided to start a series of blog posts building common machine learning algorithms from scratch, to clarify these methods for myself and make sure I correctly understand the mechanics behind each of them.
Let me be clear from the very beginning: It’s all about fitting a function!
Suppose we have a dataset containing the number of trucks crossing the border from Mexico to the U.S. through the Otay Mesa port. Note that it's just a subset of the entire inbound crossings dataset available on Kaggle. First of all, it's always better to plot the data, which may help us gain some insight. To load the data from a .CSV file, we are going to use Pandas, a well-known data analysis/manipulation Python library. We can then plot the data using Matplotlib (another Python library, for data visualization).
Note that in the original data, each value corresponds to a month, so I mapped the date intervals to an integer representation.
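A minimal version of that loading/plotting step might look like this (the file path and column names follow the snippet used near the end of this post):

import pandas as pd
import matplotlib.pyplot as plt

df = pd.read_csv('./regression/dataset.csv')
y = df.Value                 # monthly truck crossings
x = range(len(y))            # each month mapped to an integer index

plt.scatter(x, y, marker='.')
plt.xlabel('Month index')
plt.ylabel('Truck crossings')
plt.show()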
What we are observing here is obviously not an exact linear function, but for the sake of simplicity, we can model the border crossings using a linear function! As we already know, the equation of a line is as below:
\[f(x) = mx + c\]
where m stands for the slope and c is the intercept on the y-axis. But there are an infinite number of possible values for these parameters. Let's look at some arbitrary lines with [m=70, c=40000], [m=100, c=40000], and [m=140, c=40000], represented in orange, green, and red respectively.
But what parameter values should we choose for our linear equation to properly fit our data points?
Analytical Approach to Find the Regression Parameters
To find the proper fit for our data, we basically have to minimize the average distance between the data points and our line. In other words, we measure the difference between the predicted values and the actual training data.
There are two common ways to calculate the error:
Mean Squared Error (MSE): which considers the squared difference of the values.
Mean Absolute Error (MAE): which considers the absolute difference of the values.
Note that we need to sum these error values as positive numbers; otherwise, negative values would cancel out positive ones and make our optimization problem impossible.
Our objective is to find the linear equation parameters m and c that minimize the average error over the whole training set (known as the cost function), defined below:
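\[J(m, c) = {\frac{1}{n}\sum_{i=1}^{n} (mx + c - y)^2}\]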
where n is the number of training examples. Now let's explore the parameter space and plot the cost function to see what it looks like. For the MSE cost function, we get the parameter space-cost plot below:
# visualize the cost function over the (m, c) parameter space
import numpy as np
import matplotlib.pyplot as plt

def line_equation(m, c, x):
    return (m * x) + c

def cost_function(m, c, training_examples_x, training_examples_y):
    sum_of_errors = 0
    item_index = 0
    for example in training_examples_x:
        predicted_y = line_equation(m, c, item_index)
        sum_of_errors += (predicted_y - training_examples_y[item_index])**2
        # sum_of_errors += abs(predicted_y - training_examples_y[item_index])
        item_index += 1
    mse = sum_of_errors / len(training_examples_x)
    return mse
fig = plt.figure()
fig.set_size_inches(8, 6)
ax = fig.add_subplot(projection='3d')
cost_func_x_points = []
cost_func_y_points = []
cost_func_z_points = []
for m in np.arange(-200, 500, 10):
    for c in np.arange(-10000, 60000, 200):
        cost = cost_function(m, c, x_training_set, y_training_set)
        cost_func_x_points.append(m)
        cost_func_y_points.append(c)
        cost_func_z_points.append(cost)
ax.scatter(cost_func_x_points, cost_func_y_points,
           cost_func_z_points, c=cost_func_z_points, marker='.')
ax.set_xlabel('M')
ax.set_ylabel('C')
ax.set_zlabel('Cost')
plt.show()
and for the MAE we have the plot below:
As you can see, both of these cost functions are convex and have a single minimum point at the bottom of the slope (the global minimum). Based on calculus, this means that if we find the point where the derivative of the function is zero, we have found the optimal parameters for our model. We can simply use the equation below to find them.
\[\theta = (X^TX)^{-1}X^TY\]
where X is the matrix of training feature vectors, Y is the output vector, and the result, theta, holds the parameters of our regression model.
This equation is called the Normal Equation, and you can find the math behind it here.
So let’s run it on our dataset and see how it works.
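A sketch of that computation with NumPy, reusing x_training_set and y_training_set from the plotting code above (assuming x_training_set holds the integer month indices; with the bias column first, theta_list[0] is c and theta_list[1] is m):

import numpy as np

# design matrix: a column of ones for the intercept, then the month indices
X = np.column_stack([np.ones(len(x_training_set)), x_training_set])
theta_list = np.linalg.inv(X.T.dot(X)).dot(X.T).dot(y_training_set)
print(theta_list)  # [c, m]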
Now that we have fit a line on our data, are we done? Nope!
We need to evaluate the model utilizing different metrics and plots to make sure the model we proposed would generalize well on new data.
To evaluate a model there are several metrics we can use.
1- MSE (Mean Squared Error)
\[MSE = {\frac{1}{n}\sum_{i=1}^{n} (y - y_p)^2}\]
2- MAE (Mean Absolute Error)
\[MAE = {\frac{1}{n}\sum_{i=1}^{n} (|y - y_p|)}\]
These two metrics have the exact same definitions we used for the cost functions. The main difference between them is that MSE penalizes large prediction errors more heavily than MAE. Generally, we want these scores to be as close to zero as possible.
mse_value = 0
mae_value = 0
m = theta_list[1]
c = theta_list[0]
for sample_x in range(0, number_of_training_examples):
    predicted_y = m * sample_x + c
    sample_y = int(y_training_set[sample_x])
    mse_value += (predicted_y - sample_y)**2
    mae_value += abs(predicted_y - sample_y)
print(f"Mean Squared Error: {mse_value//number_of_training_examples}\n")
print(f"Mean Absolute Error: {mae_value//number_of_training_examples}\n")
3- R-Squared Score
In terms of regression evaluation, the R-squared (R2) score is one of the most useful metrics. It indicates how much of the variance in the data is explained by the model; in other words, how close the data points are to the fitted line. The R2 score is defined as below:
\[R2 = {\frac{Model Variance}{Total Variance}}\]
\[= {1 - \frac{Sum Of Squared Residuals (SSR)}{Total Sum Of Squares (SST)}}\]
\[= {1 - \frac{\sum_{i=1}^{n} (y - y_p)^2}{\sum_{i=1}^{n} (y - y_m)^2}}\]
The R2 score should be a number between 0 and 1, and in regression modeling we aim to push it as close to 1 as possible. Note that if the R2 score comes out negative, something is wrong with the modeling or the implementation.
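As a quick sketch, assuming y_training_set is a NumPy array and m, c come from the normal equation above:

import numpy as np

# predictions over the whole training range, using m and c from above
predicted_y = m * np.arange(number_of_training_examples) + c
ss_res = ((y_training_set - predicted_y)**2).sum()            # sum of squared residuals
ss_tot = ((y_training_set - y_training_set.mean())**2).sum()  # total sum of squares
r2_score = 1 - ss_res / ss_tot
print(f"R2 score: {r2_score:.3f}")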
4- Residual Plot
Also, it’s crucial to plot the residuals to see whether linear regression is a good choice for modeling our data. In regression, a residual is the difference between the actual value and the predicted value. Residual points should be distributed evenly along the horizontal axis; otherwise, the model is not performing well and is probably not reliable enough.
\[residuals = y - y_p\]
In the figure above, except at the beginning and the end of the plot, where the residuals are larger, the residuals are more or less randomly distributed. Generally, for a good linear regression model, the points in this plot must be as close as possible to the horizontal axis and uniformly distributed along it.
Code Refactoring
Although what we have done so far works, it's quite a mess in terms of performance and cleanliness. Accordingly, I'm going to wrap the implementation in a class called LinearRegressor, as follows.
import numpy as np

class LinearRegressor:
    """Simple linear regression model"""
    def __init__(self, loss='MSE'):
        self.coeff = []

        def MSE(y_predicted, y_target):
            return ((y_target - y_predicted)**2).mean()

        def MAE(y_predicted, y_target):
            return (abs(y_target - y_predicted)).mean()

        if loss == 'MSE': self.calculate_loss = MSE
        elif loss == 'MAE': self.calculate_loss = MAE
        else: self.calculate_loss = MSE

    def __add_bias(self, x):
        one = np.ones((len(x), 1))
        return np.append(one, x, axis=1)

    def fit(self, x, y):
        x = self.__add_bias(x)
        self.coeff = np.linalg.inv(x.T.dot(x)).dot(x.T).dot(y)

    def predict(self, x):
        x = self.__add_bias(x)
        return x.dot(self.coeff)
Test for simple regression
# simple linear regression
x = np.array([1, 2, 3, 4, 5, 6]).reshape(-1, 1)
y = np.array([2, 4, 6, 8, 14, 12])
model = LinearRegressor(loss="MAE")
model.fit(x, y)
predicted_y = model.predict(x)
print(f'Coefficients: {model.coeff}')
print(f'Predicted values: {predicted_y}')
print(f'Loss: {model.calculate_loss(predicted_y, y)}')
Great! Note that to be able to multiply a 1-D NumPy array with another array, we need to use the reshape() function, i.e., change the array into an array of single-element arrays (a column vector)!
Also, to account for the bias in this class, we add a column of 1s to each data point in the dataset, which is done in both the fit() and predict() functions by calling __add_bias().
Let’s run it on the dataset we prepared at the beginning of the post and see if it leads to the same result.
import pandas as pd
import numpy as np
from linearregressor import LinearRegressor
df = pd.read_csv('./regression/dataset.csv')
x_raw = pd.to_datetime(df.Date, infer_datetime_format=True).to_numpy()
y = df.Value.to_numpy()
x = np.arange(0,len(x_raw),1)
model = LinearRegressor()
model.fit(x.reshape(-1, 1),y)
predicted_y = model.predict(x)
print(f'Coefficients: {model.coeff}')
print(f'Loss: {model.calculate_loss(predicted_y,y)}')
Based on the visualizations in the very first steps, we can clearly see that what we are dealing with is not a linear function. Accordingly, we have to try a more flexible function to fit our data. To do so, we can add polynomial features created from the original features.
x = np.arange(0, len(x_raw), 1).reshape(-1, 1)
x2 = x**2
x = np.append(x2, x, axis=1)
print(x.shape)
(279, 2)
Then feed the new feature set to the linear regressor model.
model = LinearRegressor()
model.fit(x,y)
predicted_y = model.predict(x)
print(f'Coefficients: {model.coeff}')
print(f'Loss: {model.calculate_loss(predicted_y,y)}')