• Data Mining using Spark and Python

    ·

    Recently, I was doing a project with relatively high volume of data. Which made me curious about the possibility of using Spark to process data in a distributed setup. Accordingly, I decided to setup Spark and get my hands dirty with PySpark (python interface for Spark). In this post, I will delve into the steps to set up Spark and the basic concepts of MapReduce paradigm which is a model introduced by Google to process large distributed datasets in an efficient scalable parallel way.

    Setting Up Apache Spark on Windows

    First of all we should install the requirements including PythonCondaJDK (Java Development Kit).

    Create a new environment variable in system settings with JAVA_HOME as name, and provide the JDK installation address (in my case: JAVA_HOME=C:\Program Files\Java\jdk-19) as the value.

    Download the latest Spark binaries along with the Winutils and extract them in a corresponding folder.

    Create an environment variable as SPARK_HOME = “Spark binaries address”. In my case it is SPARK_HOME = “C:\bigdatalocal\spark”

    Create an environment variable as HADOOP_HOME = “Winutils binary address”. example : HADOOP_HOME = “C:\bigdatalocal\hadoop”. Note that you should create a folder named bin and copy winutils.exe in it.

    Then “%SPARK_HOME%\bin” and “%HADOOP_HOME%\bin” to the path.

    Now, to confirm installation, open command prompt and enter spark-shell the following result should be shown:

    C:\Windows\System32>spark-shell
    
    Spark context Web UI available at http://host.docker.internal:4040
    Spark context available as 'sc' (master = local[*], app id = local-1677192682852).
    Spark session available as 'spark'.
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 3.3.2
          /_/
    

    To be able to run PySpark code in a Jupyter notebook, we need to take some extra steps.

    First, create a new Conda environment and install the necessary python packages.

    conda create --name py39 python==3.9
    conda activate py39

    Note that to avoid weird errors and exceptions, it’s better running all the commands in Anaconda Command Prompt with administer permissions. Also, it’s better to check the PySpark library and choose the proper python version to create the environment with. Otherwise, you might end up struggling with an error indication the python version running the Jupyter notebook is not the same as the PySpark version.

    Next, we should install PySpark, Py4J, Notebook, and FindSpark by running the following commands.

    pip install pyspark
    pip install findspark
    pip install py4j
    pip install notebook

    Next, we should create another environment variable PYSPARK_PYTHON and refer it to the created Conda environment’s python binary like PYSPARK_PYTHON=C:\ProgramData\Anaconda3\envs\py39\python.exe

    Now, if you open command prompt and enter pyspark the following result will be shown:

     Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 3.3.2
          /_/
    
    Using Python version 3.9.16 (main, Jan 11 2023 16:16:36)
    Spark context Web UI available at http://host.docker.internal:4040
    Spark context available as 'sc' (master = local[*], app id = local-1677191027145).
    SparkSession available as 'spark'.

    The last step is to add following two environment variables which runs the PySpark bundled with Jupyter notebook on port 4050.

    PYSPARK_DRIVER_PYTHON = jupyter

    PYSPARK_DRIVER_PYTHON_OPTS=notebook –no-browser –port=4050

    Finally, running the pyspark command will result in running a jupyter notebook on indicated port 4050.

    (py39) C:\Users\amiri>pyspark
    [W 18:37:58.491 NotebookApp] Loading JupyterLab as a classic notebook (v6) extension.
    
    [I 18:37:58.494 NotebookApp] Serving notebooks from local directory: C:\Users\amiri
    [I 18:37:58.494 NotebookApp] Jupyter Notebook 6.5.2 is running at:
    [I 18:37:58.494 NotebookApp] http://localhost:4050/?token=2f203d7e89781e656d1283c0e7a26c9fe45b3ca848468e11
    [I 18:37:58.495 NotebookApp]  or http://127.0.0.1:4050/?token=2f203d7e89781e656d1283c0e7a26c9fe45b3ca848468e11
        Or copy and paste one of these URLs:
            http://localhost:4050/?token=2f203d7e89781e656d1283c0e7a26c9fe45b3ca848468e11
         or http://127.0.0.1:4050/?token=2f203d7e89781e656d1283c0e7a26c9fe45b3ca848468e11

    Open the provided URL in your browser and run the following lines of code:

    from pyspark import SparkContext
    sc = SparkContext.getOrCreate()
    
    data = sc.parallelize(range(10))
    print(data.collect())
    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
  • Machine Learning From Scratch – Logistic Regression

    ·

    In the previous post, we have walked through the process of fitting a straight line or a polynomial line using linear regression on continues data. But, what if the output we are trying to predict is a discrete value. For example what if we are supposed to model a logical AND gate?!

    In this case the modeling is quite a bit different. The underlying mechanics are basically the same as linear regression, but we should change the loss function, and also change the prediction in a way that it shows the probability of each possible output.

    Sigmoid Function

    As mentioned above, we still use the basics of the linear regression but this time, to be able to limit the predicted value between 0 and 1 which represents the probability of each output, we should pass the value of w*input to a function called Sigmoid or Logistic defined as:

    \[Sigmoid(x) = {\frac{1}{1+e^{-x}}}\]
    import math
    import matplotlib.pyplot as plt
    
    def sigmoid(x):
        return 1.0/(1 + math.exp(-x))
    
    x=[x * 0.1 for x in range(-100, 100)]
    y=[sigmoid(x) for x in x]
    
    plt.scatter(x, y)
    plt.show()

    So let’s modify our linear regressor class and plug in the Sigmoid function in the predict() function as well.

    import numpy as np
    import math
    
    def sigmoid(x):
        return 1/(1 + math.exp(-x))
    
    class LogisticRegressor:
        """Logistic regression model"""
        def __init__(self, coeff=None):
            self.coeff = coeff
    
        def __add_bias(self, x):
            return np.append(x, 1)
    
        def predict(self, x):
            x = self.__add_bias(x)
            return sigmoid(self.coeff.dot(x.T))

    Now, we can check the logistic regressor with arbitrary coefficients and the AND gate truth table as the prediction inputs.

    model = LogisticRegressor(coeff=np.array([2.5,2.5,-4]))
    x = np.array([[0,0],[0,1],[1,0],[1,1]])
    predicted = [model.predict(input) for input in x ]
    print(predicted))))
    [0.01798620996209156, 0.18242552380635635, 0.18242552380635635, 0.7310585786300049]

    You may think that this result is not actual discrete 0, 1 value and you are right! but as we mentioned before, the output could be considered as the probability of the actual value being 1. Therefore, to work around it we should simply consider predicted values over a threshold to be 1 and values less than the threshold are 0.

    threshold = 0.5
    predicted = [1 if value > threshold else 0 for value in predicted]
    print(predicted)
    [0, 0, 0, 1]
    Cross-Entropy Loss Function

    Just like the linear regression, in order to be able to assess the performance of our model, we need a measure of how good the model in predicting. In binary classification, Cross-Entropy loss function which is also called negative log likelihood does the job for us. This function is defined as below:

    \[CrossEntropyLoss(t,p) = {-(t*log(p)+(1-t)*log(1-p))}\]
  • MicroFastApi – A Micro Adventure in Python

    ·

    In the previous post, I covered the very basic implementation of a web API structure using python decorators. But, it was just the beginning of a journey I have started. I decided to challenge myself and gradually develop this library into a real one!

    Accordingly, I will write up the process of design and implementation of features for it.

    MicroPython Threading

    First of all, we have to implement a function running on a separate thread inside the MicroFastApi class to receive HTTP requests and send back responses in the most efficient way possible. However it’s an experimental module, fortunately, MicroPython has a low-level threading module _thread available which saves us a lot of headaches.

    So let’s briefly take a look at a simple example of running a thread in micropython. To create a thread we should call the function start_new_thread() in the _thread module as follows:

    import _thread
    import time
    
    def _http_handler():
        counter=0
        while True:        
            print(counter)
            counter+=1
            time.sleep(0.1) # 100 ms
    
    threadobject = _thread.start_new_thread(_http_handler,())

    Note that the time module provides the function sleep() which receives a floating point as the number of seconds to sleep.

    This is the simplest use case that we are gonna embed inside the MicroFastApi class. Other than that, since the routes dictionary object is accessed in both the main and the dispatcher thread, we are gonna create a lock object using the allocate_lock() function and use it to make sure no race condition is gonna happen. Then we can either use acquire() and release() functions of the lock object explicitly or use python with statement.

    with self.lock:
        self.routes[route] = decorated_get_func

    So the class will become like this:

    import functools #pip install micropython-functools
    import _thread
    import time
    
    class MicroFastApi:
    
        def __init__(self, ip, mac,):
            self.routes = {}
            self.address = ip
            self.mac = mac
            self.lock = _thread.allocate_lock()
            self.thread = _thread.start_new_thread(self._http_handler, ())
    
        def __str__(self):
            server_info = f"""
                device ip = {self.address}
                device mac = {self.mac}
                open the URL provided below in your browser \n http://{self.address}/"""
            return server_info
    
        def get(self,route:str):
            def decorate_get_api(func):
                @functools.wraps(func)
                def decorated_get_func(*args, **kwargs):
                    # pre-processing 
                    ret_val = func(*args, **kwargs)
    
                    return ret_val
                with self.lock:
                    self.routes[route] = decorated_get_func
                return decorated_get_func
            return decorate_get_api
    
        def _http_handler(self):
    
            while True:
    
                with self.lock: # acquire the lock and release it when done.
                    for route in self.routes:
                        print(f"route {route} is mapped to {self.routes[route].__name__}. result:"+ self.routes[route]())
    
                time.sleep(1)

    Handling API Requests

    Now the general idea is listening to a port specified in the constructor, receiving requests in the thread loop, parsing the incoming HTTP requests, calling the corresponding routine defined the in the routes dictionary, and eventually returning the JSON result.

    The most basic way is listening to a TCP connection using the micrpython implementation of the python low-level networking module named socket, and then parsing the receiving packets.

    To do so, we modify the construction function to receive the port number we intend the server to listen to. Next, we create a socket object and bind it to the specified port in the constructor.

    addr = socket.getaddrinfo("0.0.0.0", self.port)[0][-1]
    self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)        
    self.socket.bind(addr)
    self.socket.listen(1)

    Note that passing the “0.0.0.0” IP address to the bind() function means that the server can receive connections from any possible IPv4 addresses. Also, we passed the socket.AF_INET and socket.SOCK_STREAM to the socket() factory function respectively to create an IPV4 socket with a TPC protocol connection. The integer value passed to the listen() function, defines the concurrent connections capacity of the socket.

    After setting up the socket connection, now we can accept() connections in a loop inside the http_handler() function we had before.

    while True:
        try:
            client_socket, client_addr = self.socket.accept()                
            print('client connected from', client_addr)
            testjson = {'userId':'1'}
            response = json.dumps(testjson)
            client_socket.send('HTTP/1.0 200 OK\r\nContent-type: application/json\r\n\r\n')
            client_socket.send(response)
            client_socket.close()
            print("client closed!")
        except OSError as e:
            print(e)

    The accept() function returns a socket to the client along with its address which should be used to send back the results to the client. In this example, we created a dictionary and converted it to a JSON string, and sent the result through the client socket. If we run the project and click on the server URL provided in the terminal, we can expect the following JSON result in the browser:

    API server running on http://10.0.0.67:80
    client connected from ('10.0.0.30', 55780)
    client closed!
    {"userId": "1"}

    This solution is quite clear, but since the MicroPython _thread module is in experimental development, it’s not reliable. If you run the code above, the socket inside the thread function works just once and after that, it will freeze! I was struggling with this issue for several hours today and eventually found out there is an open issue in MicroPython GitHub for this problem. Let’s cross our fingers to see a bug fix in a near future.

    To be able to continue this project, I commented out the threading-related code and added a function called Run(). Within that, called the _http_handler() function which contains the main loop.

    Now, we have a very basic working structure for the providing WebApi on the RasbperyPi Pico, let’s get to next step, which is developing a simple HTTP request parser to be used for our goal.

    A Simple HTTP Request Parser

    First of all we should take a look at the anatomy of a HTTP request. Each request has three main sections as follows.

    • Request LineGenerally, this line is made of HTTP Method + URI + HTTP Protocol. HTTP Method defines the type of a request, whether it is a GETPOSTPUT, or DELETE request. After the type, it contains the URI of the request, which corresoponds to the route we have defined in the server. And Finaly, the HTTP protocol version which can be one of the HTTP/1.1HTTP/2 , or HTTP/3. For example the request line to get the root route would be like:GET / HTTP/1.1\r\n
  • Python in Depth – Decorators

    ·

    I’m planning to do a research project which needs reading an analog sensor using Raspberry Pi Pico W and feed it to a TinyML model, store the results and provide a RESTFUL api for data transfer. To have a faster development process, I have decided to give MicroPython a try for this project.

    There would be several examples out there about implementing a tiny webserver on rasberypi pico. But, unfortunately, I couldn’t find a framework like FastApi, or Django to make web api development easily possible on micropython.

    Accordingly, I decided to start writing a Web API Framework with Micropython on Raspberry Pi Pico W from scratch, and in the meanwhile, tackle a bit more with advanced python software development topics.

    This work is inspired by FastAPI framework which by now is the most popular python framework for RESTFUL api development. So, let’s briefly take a look at a very simple FastAPI example provided in its website:

    from typing import Union
    from fastapi import FastAPI
    
    app = FastAPI()
    
    @app.get("/")
    def read_root():
        return {"Hello": "World"}
    
    @app.get("/items/{item_id}")
    def read_item(item_id: int, q: Union[str, None] = None):
        return {"item_id": item_id, "q": q}

    Ok, what’s going on here?

    FastAPI module is imported, an object from FastAPI class instantiated, two functions are defined to return a dictionary, and the route to those functions are defined by @app.get() decorator. Bravo! Such a neat design!

    What is a Decorator in python?

    The key element that let us design this way, is the decorators in python. In simple words, a decorator is a function that receives a function or a class and change its behavior without modifying the function itself!

    Let’s see what does that mean in action. We have a function that prints the current time:

    from datetime import datetime
    
    def get_time():
    
        print(datetime.now().strftime('%H:%M:%S'))
    
    get_time()
    17:59:25

    Now consider having another function which receives a function as an argument and prints the phrase “current time is:” before calling it.

    def decorate_time(func):
        print("current time is:")
        func()
    
    decorate_time(get_time)
    current time is:
    17:59:25

    It does the job we have expected, but the problem is if we call the get_time() function again it will only show time. What we want this behavior attached to the original function. To work around this issue, we can define a funtion inside the decorate_time() function and return the new function.

    from datetime import datetime
    
    def get_time():
    
        print(datetime.now().strftime('%H:%M:%S'))
    
    def decorate_time(func):
        def add_phrase():
            print("current time is:")
            func()
        return add_phrase
    
    get_time = decorate_time(get_time)
    get_time()
    current time is:
    18:15:02

    Infact, we called the original function in another function’s definition and replaced it with the old definition. What we have done here, is the essence of python decorators. To make the use of decorators more pleasant, in python we can use @ with the decorators name on top the original function definition.

    @decorate_time
    def get_time():
    
        print(datetime.now().strftime('%H:%M:%S'))
    
    get_time()
    current time is:
    18:43:44

    Python decorators with arguments and returning values

    So far so good. But we still have more to do to reach the same design. In the fastapi example, we have seen that the original function could have input parameters. Let’s define the function get_time() to receive an integer value which is then subtracted from the current hour and printed.

    from datetime import datetime, timedelta
    
    def get_time(hours:int):
        new_time = datetime.now() - timedelta(hours=hours) 
        print(new_time.strftime('%H:%M:%S'))
    
    get_time(1)
    18:38:38

    Now if we add the decorator we have defined before, we are gonna encouter with an error:

    TypeError: decorate_time.<locals>.add_phrase() takes 0 positional arguments but 1 was given

    Yes, since the function definition inside the decorator does not receive any arguments in its definition, it led to this error. To solve the issue, we have to define the inner function such that it can receive any number of parameters with any type. It’s done by adding *args, **kwargs to the inner function definition and then pass them to the original function call.

    As a reminder I should remind that * operator is used for packing and unpacking lists in python. In a function definition it means pack the arguments, and in function calls it unpacking the list and passing to the function. In a same way, ** operator is used for packing/unpacking dictionaries in function definition and calling.

    Another thing to be concered of is how the original function name as you can see in the error message is changed now to decorate_time.add_phrase(). To preseve the original function name and docstring, we have to use a built-in python decorator @functools.wraps() available in functools module.

    from datetime import datetime, timedelta
    import functools
    
    def decorate_time(func):
        @functools.wraps(func)
        def add_phrase(*args, **kwargs):
            print("current time is:")
            func(*args, **kwargs)
        return add_phrase
    
    @decorate_time
    def get_time(hours:int):
    
        new_time = datetime.now() - timedelta(hours=hours) 
        print(new_time.strftime('%H:%M:%S'))
    
    get_time(1)
    18:47:08

    Great! Now we can pass parameters to the decorated functions. What if we need to return a value instead of printing it? With the same logic, the decorator inner function should also return the value returned by the original function.

    from datetime import datetime, timedelta
    import functools
    
    def decorate_time(func)            
        @functools.wraps(func):
        def add_phrase(*args, **kwargs):
            print("current time is:")
            ret_val = func(*args, **kwargs)
            return ret_val
        return add_phrase
    
    @decorate_time
    def get_time(hours:int):
    
        return datetime.now() - timedelta(hours=hours) 
    
    newtime = get_time(1) 
    print(newtime.strftime('%H:%M:%S')) 
    19:04:39

    Awesome! What if we need to pass an argument to the decorator itself? I mean like the way we can pass the route to the decorator in fastapi.

    @app.get("/items/{item_id}")

    Let’s give it a try.

    @decorate_time("/")
    def get_time(hours:int):
    
        return datetime.now() - timedelta(hours=hours) 
    
    newtime = get_time(1) 
    print(get_time)   
    TypeError: 'str' object is not callable

    Why the code ends up calling a string object? Infact, the decorator @ syntactic sugar will translate into something like this:

    get_time = decorate_time(get_time)

    thus, if we pass an argument to the decorator python will translate it to:

    get_time = decorate_time("/")(get_time)

    That’s why we got an error. Instead of receiving a function object we passed a string to the decorator and it ended up calling a string object! To work aroud this issue, we wrap the entire decorator in another decorator!

    from datetime import datetime, timedelta
    import functools
    
    def outer_decorator(route):
        print(f"The route is: {route}")
        def decorate_time(func):
            @functools.wraps(func)
            def add_phrase(*args, **kwargs):
                print("current time is:")
                ret_val = func(*args, **kwargs)
                return ret_val
            return add_phrase
        return decorate_time
    
    @outer_decorator("/")
    def get_time(hours:int):
    
        return datetime.now() - timedelta(hours=hours) 
    
    newtime = get_time(1) 
    print(get_time)    

    Now, as before, the decorator actually translate to:

    get_time = outer_decorator("/")(get_time)

    But this time, outer_decorator returns another function which is the previous decorator, and then the it will get called.

    The route is: /
    current time is:
    <function get_time at 0x000002366A209AB0>

    Works like a charm! Note that @functools.wraps is still successfully preserve the original function name and docstring.

    Python decorators and classes

    Now we decorated a function and it’s exactly doing the job it was supposed to do. There is still something missing. In the fastapi example, functions are decorated using the @ with the object name followed by a . and the name of the deocrator.

    We can have different scenarios with decorators and classes. The decorators can easily be used on a class method definition just like we did on global functions. Let’s assume we have class named PyTime as below:

    from datetime import datetime, timedelta
    
    class PyTime:
        def __init__(self):
            pass
    
        def get_time(self, hours:int):
            return datetime.now() - timedelta(hours=hours)
    
    pytime = PyTime()
    newtime = pytime.get_time(1)
    print(newtime)
    2023-01-04 23:35:40.229966

    We can simply decorate it using the decorator we have defined before.

    from datetime import datetime, timedelta
    import functools
    
    def outer_decorator(route):
        print(f"The route is: {route}")
        def decorate_time(func):
            @functools.wraps(func)
            def add_phrase(*args, **kwargs):
                print("current time is:")
                ret_val = func(*args, **kwargs)
                return ret_val
            return add_phrase
        return decorate_time
    
    class PyTime:
        def __init__(self):
            pass
        @outer_decorator("/")
        def get_time(self, hours:int):
            return datetime.now() - timedelta(hours=hours)
    
    pytime = PyTime()
    newtime = pytime.get_time(1)
    print(newtime)
    The route is: /
    current time is:
    2023-01-04 23:38:06.261602

    Note that python also provides several built-in decorators like @property@classmethod, and @staticmethod to be used in decorating class methods.

    The other case is using a class itself as a decorator. Basically, when we instantiate a class, we are calling its constructor fucntion. Now, what if we pass the function we intend to decorate to the class constructor function __init__ ? We can then store the function in the class. Then by implementing __call__ function in the class, it’s possible to make the object callable. That’s all we need to to make a class act as a decorator. The only difference is to preserve the function name and docstring, instead of using the @functools.wraps decorator, we have explicitly call functools.update_wrapper(self, func).

    from datetime import datetime, timedelta
    import functools
    
    class TimeDecorator:    
        def __init__(self, func):
            functools.update_wrapper(self, func)
            self.func = func
    
        def __call__(self, *args, **kwargs):
            print("current time is:")
            return self.func(*args, **kwargs)
    
    @TimeDecorator  # equivalant to obj = TimeDecorator(get_time)
    def get_time(hours:int=0):
            return datetime.now() - timedelta(hours=hours)    
    
    newtime = get_time(1)
    print(newtime)
    current time is:
    2023-01-09 14:46:53.283555

    Using a class method as a decorator

    Yet we couldn’t find the proper design choice for our purpose. FastApi is creating an intance and use a method of that instance to decorate api routines. So, let’s give it a try by writing a class called MicroFastApi as follows.

    class MicroFastApi:
    
        def __init__(self, ip, mac,):
            self.routes = {}
            self.address = ip
            self.mac = mac
    
        def __str__(self):
            server_info = f"""
                device ip = {self.address}
                device mac = {self.mac}
                open the URL provided below in your browser \n http://{self.address}/"""
            return server_info

    then for the http GET method, we add a new function called get. This function should receive a string to the corresponding route it supposed to get called from.

    It’s quite important to comprehend the underlying mechanics. The final goal is to decorate a function called for example index() or sensors(userId:int=None) like this:

    @app.get('/')
    def index():
        return "root"
    
    @app.get('/sensors/')
    def sensors(userId:int=None):
        return f"sensor data for userId={userId}"

    Since we need to feed the top-level decorator with an string input, we have to implement a nested decorator as discussed before in this post. The decorations methods will translate to:

    index = app.get('/')(index)
    sensors = app.get('/sensors/')(sensors)

    Note that the functions could have parameters and return values which means that we have pack/unpack parameters in the inner decorator function, and also return the values of the original fucntions. The decorator function is gonna be something like this:

        def get(self,route:str):
            def decorate_get_api(func):
                @functools.wraps(func)
                def decorated_get_func(*args, **kwargs):
                    # pre-processing 
                    ret_val = func(*args, **kwargs)
                    # post-processing 
                    return ret_val
                self.routes[route] = decorated_get_func
                return decorated_get_func
            return decorate_get_api

    Need to mention that to have @functools in MicroPython you should install micropython-functools using pip:

    pip install micropython-functools

    If you use Thonny for development on MicroPython, you can use the manage packages option from tools menu to easily install any python package.

    The final class is defined a below:

    import functools #pip install micropython-functools
    class MicroFastApi:
    
        def __init__(self, ip, mac,):
            self.routes = {}
            self.address = ip
            self.mac = mac
    
        def __str__(self):
            server_info = f"""
                device ip = {self.address}
                device mac = {self.mac}
                open the URL provided below in your browser \n http://{self.address}/"""
            return server_info
    
        def get(self,route:str):
            def decorate_get_api(func):
                @functools.wraps(func)
                def decorated_get_func(*args, **kwargs):
                    # pre-processing 
                    ret_val = func(*args, **kwargs)
                    # post-processing 
                    return ret_val
                self.routes[route] = decorated_get_func
                return decorated_get_func
            return decorate_get_api
    
        def run():
            pass

    To verify if everything is working as expected:

    from microfastapi import MicroFastApi
    from wifi import Wifi
    from secrets import *
    
    wifihandle = Wifi(ssid = ssid, password = password)
    
    if not wifihandle.connect():
        raise Exception("Connection failed!")
    
    app = MicroFastApi(ip= wifihandle.ip, mac= wifihandle.mac)
    
    @app.get('/')
    def index():
        return "root"    
    
    @app.get('/sensors/')
    def sensors(userId:int=None):
        return f"sensor data for userId={userId}"
    
    print(app)
    for route in app.routes:
        print(f"route {route} is mapped to {app.routes[route].__name__}. result:"+ app.routes[route]())
    Connecting to: 1122 apt 1224
    Waiting for WiFi connection...
    Connected: True
    
                device ip = 10.0.0.65
                device mac = xx:xx:xx:xx:xx:xx
                open the URL provided below in your browser 
     http://10.0.0.65/
    route / is mapped to decorated_get_func. result:root
    route /sensors/ is mapped to decorated_get_func. result:sensor data for userId=None

    Great! Now, we have the basic design of the web server on our tiny shiny Rasbpery PI Pico W!

  • Offline Image Synchronization in Python

    ·

    During the past few weeks I was working on a project to detect an object from two or more cameras mounted in different perspectives. The initial intuitive idea is to save images with a timestamp and then match the images with a time interval of less than a defined threshold.

    Accordingly, I have written a piece of code which simply captures images from camera using OpenCv python library an save them in a specified directory as follows.

    At first we have to create an environment and install opencv-python lib:

    pip install opencv-python

    Then a video capture is created with the camera index=0 and the desired input image size is defined camera object set function. Note if you have multiple cameras you should specify the the index for each one.

    import cv2 as cv 
    from datetime import datetime
    
    camera_id = 0
    camera = cv.VideoCapture(camera_id)
    camera.set(cv.CAP_PROP_FRAME_WIDTH, 1280)
    camera.set(cv.CAP_PROP_FRAME_HEIGHT, 720)

    Then, all we need is calling the opencv read() function in a loop and save the captured image in the specified directory.

    directory = "./images/"
    while True:
        result, image = camera.read()
        if result:
            timestamp = datetime.now().strftime('%Y_%m_%d_%H_%M_%S.%f')[:-3]
            cv.imwrite(f"{directory}{timestamp}.jpg", image)
        cv.waitKey(1)

    To measure the recording FPS we can also use the cv.getTickCount() function to get the current CPU tick and call it again at the buttom of the loop and find the time interval and then devide the interval by the CPU frequency to get the time interval in seconds. Finally divide the number of elapsed seconds by the number of frames captured to have the actual FPS. The final code is as follows:

    import cv2 as cv 
    from datetime import datetime
    
    camera_id = 0
    camera = cv.VideoCapture(camera_id)
    camera.set(cv.CAP_PROP_FRAME_WIDTH, 1280)
    camera.set(cv.CAP_PROP_FRAME_HEIGHT, 720)
    
    start_tick = cv.getTickCount()
    numberofframes = 0
    
    directory = "./images/"
    while True:
        result, image = camera.read()
        if result:
            timestamp = datetime.now().strftime('%Y_%m_%d_%H_%M_%S.%f')[:-3]
            cv.imwrite(f"{directory}{timestamp}.jpg", image)
            numberofframes+=1
    
        end_tick = cv.getTickCount()
        time_taken = (end_tick - start_tick) / cv.getTickFrequency() 
        fps  = numberofframes / time_taken
        print(fps)
        cv.waitKey(1)

    Sounds good! But due to an unknown reason the recording speed did not exceed 12 frames per second, which does not seem to be right. Therefor, I decided to dig deeper to find a way to increase opencv python low recording FPS. At first I was spectical about the opencv python wrapper itself. I mean it’s python, and it is well-established that it is not as fast as C++ (However it is using native opencv library implementation in the underlying layers).

    By the way, I decided to try FFMPEG, which is the swiss army knife of the multimedia related operations, to record images with time-stamps and also verify if the problem is related to the opencv python not anything else.

    Recoding Time-Stamped Images with FFMPEG

    At first, download the FFMPEG command line tool excutable, and add its address to the enviroment variables to make it easily accessible.

    Then use the command below to have a list of connected devices:

    ffmpeg -list_devices true -f dshow -i dummy

    FFMPEG, based on the operating system it runs on, offers its users several different libraries for hanling different operations. One of them which is deprecated on windows based on the documentation is the vfw library. We can record videos using this library as below:

    ffmpeg -y -f vfwcap -r 30 -i 0 out.mp4

    This command will record an .mp4 file from the camera with id=0 with 30 frames per second. As shown in figure below the maximum FPS it could reach was about 15 frames per second which is almost the same as OpenCv.

    The second and updated version is to say FFMPEG to use Microsoft built-in library for media communications called DirectShow.

    ffmpeg.exe -f dshow -video_size 1280x720 -t 00:00:30 -i video="Integrated Webcam" out.mp4

    This command is telling FFMPEG to record a 30 seconds 1280×720 video from the device with name “Integrated Webcam” using the DirectShow library.

    Recording video using DirectShow increased fps to roughly 30 frames per second. It seems that FFMPEG coupled with DirectShow is fast enough in recording video streams.

    Great! Now, we should try to save time-stamped images to be able to perform the image synchronization process. It is possible to record images with FFMPEG using the following command:

    ffmpeg.exe -f dshow -video_size 1280x720 -t 00:00:59 -i video="Integrated Webcam" '%04d.png'

    With this command FFMPEG will record images and save them as digital number file names which is not useful in our image synchronizing case.

    To save images with timestamp file name, FFMPEG provides the STRFTIME library. We just need to add the arugument -strftime 1 in the command followed by the datetime format string.

    ffmpeg.exe -f dshow -video_size 1280x720 -t 00:00:59 -i video="Integrated Webcam" -strftime 1 '%Y-%m-%d_%H-%M-%S_test.png'

    Unfortunately, STRFTIME library does not provide milliseconds precision which is not possible to be used in our project.

    Fixing OpenCv-Python Low Recording Rate

    At least, we have a clue! We have seen that leveraging DirectShow library would help us increase the recording rate with FFMPEG. After looking OpenCv documention, I have found that it is possible to use different libraries(so called backends) for I/O operations. The problem is, OpenCv is not contained DirectShow by default which means that we have to compile it from the source code with DirectShow included to be able to use it as the backend.

    Ok then, let’s compile OpenCv form source code. I will briefly go through the steps:

    • Make sure you have Visual Studio Community and CMAKE installed.
    • Download the latest OpenCV release source code.
    • Extract the .zip file.
    • Launch cmake-Gui and provide it the source code folder path and the build destination path.
    • press configure and select the visual studio version(the one you just installed) you wish the project generate for.
    • go through the list of red checkboxes and make sure check whatever feature you need. In our case we have to check OPENCV_PYTHON3_VERSION, BUILD_opencv_python3 and WITH_DSHOW flags.
    • It will automatically find python binaries if you already installed them, otherwise make sure you have provided python binary files. Note that you can download and provide the contrib extra modules if you need any of those libraries and functionalies.
    • press generate and wait for the OpenCV visual studio solution to get generated.
    • press open project or open the .sln file generated in the destination path.
    • In the opened solution select build type as release , open CmakeTargets, right click on ALL-BUILD. (Recharge with a coffee since it might take a while to build)
    • right click on INSTALL and built it too.
    • opencv-python should be installed automatically. Otherwise you can find the opencv-python library with .pyd extension in the build path /lib/python3 directory and put a copy of it inside your python environment packages directory.

    Python Offline Image Synchronization

    Let’s see if we can gain higher FPS with capturing time-stamped images.

    import cv2 as cv 
    from datetime import datetime
    
    camera_id = 0
    camera = cv.VideoCapture(camera_id)
    camera.set(cv.CAP_PROP_FRAME_WIDTH, 1280)
    camera.set(cv.CAP_PROP_FRAME_HEIGHT, 720)
    
    start_tick = cv.getTickCount()
    numberofframes = 0
    
    directory = "./images/"
    while True:
        result, image = camera.read()
        if result:
            timestamp = datetime.now().strftime('%Y_%m_%d_%H_%M_%S.%f')[:-3]
            cv.imwrite(f"{directory}{timestamp}.jpg", image)
            numberofframes+=1
    
        end_tick = cv.getTickCount()
        time_taken = (end_tick - start_tick) / cv.getTickFrequency() 
        fps  = numberofframes / time_taken
        print(fps)
        cv.waitKey(1)
    29.558924959655464
    29.568654656146848
    29.573374653446777
    29.548251172473567

    Great! We could reach about 30 frames per second.

    Now let’s get back to the main objective, capturing time-stamped images! The code provided above is naive and it was only for testing the output FPS. Therefor, I have added the functionalities we need and wrapped up the entire code in a class for ease of use and further development.

    from itertools import count
    from threading import Thread, Semaphore
    import numpy as np
    import cv2 as cv
    import time 
    import random
    from datetime import datetime
    import os
    
    class Camera(Thread):
    
        def __init__(self, camId:int, frameWidth:int, frameHeight:int,visualize:bool = False, video_output:bool = False, time_stamped_output:bool = False, root_path:str="",fileName:str="",fileName_postfix:str="", autofocus:bool=False):
    
            Thread.__init__(self)
    
            self.camId = camId
            self.width = frameWidth
            self.height = frameHeight
            self.outputfile = fileName
            self.fileName_postfix = fileName_postfix
            self.done = False
            self.image_format = '.jpeg'
            self.autofocus = autofocus
            self.fourcc =  cv.VideoWriter.fourcc('M','J','P','G')
            self.calibration_frames_number = 200
            self.time_stamped_output = time_stamped_output
            self.video_output = video_output
            self.root_path = root_path
            
            if not self.outputfile:
                self.outputfile = self.get_time_stamp()+"_cam"+str(self.camId)+self.fileName_postfix+".mp4"
    
            self.visualize = visualize
    
        def connect(self):
            # make sure build and install opencv from source to work with DirectShow on windows
            self.videostreamcapture = cv.VideoCapture(self.camId, cv.CAP_DSHOW)
            self.input_fps  = self.videostreamcapture.get(cv.CAP_PROP_FPS)
            self.recording_fps = self.input_fps
            
            self.videostreamcapture.set(cv.CAP_PROP_FRAME_WIDTH, self.width)
            self.videostreamcapture.set(cv.CAP_PROP_FRAME_HEIGHT, self.height)
    
            if not self.autofocus:
                self.videostreamcapture.set(cv.CAP_PROP_AUTOFOCUS, 0) # turn the autofocus off
            else:
                self.videostreamcapture.set(cv.CAP_PROP_AUTOFOCUS, 1) # turn the autofocus on
    
    
        def calculate_camera_fps(self)->int:
              
            start_tick = cv.getTickCount()
            for i in range(0, self.calibration_frames_number) :
                ret, frame = self.videostreamcapture.read()
            end_tick = cv.getTickCount()
            time_taken = (end_tick - start_tick) / cv.getTickFrequency() 
            fps  = self.calibration_frames_number / time_taken
            return fps
        
        def calculate_recording_fps(self)->int:
             
            outputstreamwriter = cv.VideoWriter(self.root_path+self.get_time_stamp()+"_calibration_"+str(self.camId)+self.fileName_postfix+".mp4",  self.fourcc, 30, (self.width, self.height), isColor=True)
            start_tick = cv.getTickCount()
            for i in range(0, self.calibration_frames_number) :
                ret, frame = self.videostreamcapture.read()
                outputstreamwriter.write(frame)
                cv.waitKey(1)
            end_tick = cv.getTickCount()
            time_taken = (end_tick - start_tick) / cv.getTickFrequency() 
            fps  = self.calibration_frames_number / time_taken
            outputstreamwriter.release()
            return fps
            
        def get_time_stamp(self):
            timestamp = datetime.now().strftime('%Y_%m_%d_%H_%M_%S.%f')[:-3]
            return timestamp
    
        def run(self):
    
            camera_fps = self.calculate_camera_fps()
            print("camera fps: " + str(camera_fps))
    
            if self.time_stamped_output:
                date_path=self.root_path + datetime.now().strftime('%Y-%m-%d')
                if not os.path.exists(date_path):
                    os.mkdir(date_path) 
    
            if self.video_output:
                recording_fps = self.calculate_recording_fps()           
                print("recording fps: " + str(recording_fps))
                self.outputstreamwriter = cv.VideoWriter(self.root_path + self.outputfile, self.fourcc, recording_fps, (self.width, self.height), isColor=True)
    
            while not self.done:
                ret, frame = self.videostreamcapture.read()
                if not ret:
                    print("Can't receive frame (stream end?). Exiting ...")
                    break  
                
                if self.time_stamped_output:
                    timestamp = self.get_time_stamp()
                    cv.imwrite(date_path + "/" + timestamp +"_"+str(cv.getTickCount())+"_"+ "cam" +str(self.camId)+self.fileName_postfix+ self.image_format, frame)
                
                if self.video_output:
                    self.outputstreamwriter.write(frame)
    
                if self.visualize:  
                    cv.imshow('Camera Index:'+ str(self.camId), frame)
                
                cv.waitKey(1)
    
            self.videostreamcapture.release()
            if self.video_output:
                self.outputstreamwriter.release()
            cv.destroyAllWindows()
    
        def stopit(self):
            self.done = True

    First of all, note that this class is supposed to be general class for camera operations, including capturing timestamped image. To avoid any delay or performance issues while recording with several cameras, each camera I/O should run in a seperate thread. Accordingly, the Camera class is derived from the thread class. To have some info about the actual FPS I have implemented two functions to capture a few frames and calcluate FPS both in recording mode and only capturing mode. This info could be used when we are directly recording video files. The construcor function provides several options as follows.

    def __init__(self, camId:int, frameWidth:int, frameHeight:int,visualize:bool = False, video_output:bool = False, time_stamped_output:bool = False, root_path:str="",fileName:str="",fileName_postfix:str="", autofocus:bool=False):
    • camID: Each camera object should be assigned with an ID, and the ID is used for saving recorded files.
    • frameWidth and frameHeight: Size of frames to capture.
    • visualize: Shows the live camera output if True
    • video_output: Saves a .mp4 video output if True
    • time_stamped_output: Record time-stampped images if True
    • root_path: Output root path
    • fileName: File name for video recordings
    • fileName_postfix: File name postfix string
    • autofocus: Turn Autofocus on/off
  • Machine Learning Case Study – Audio Classification

    ·

    Implementing a real-life example of things that I’m trying to comprehend, is the most exciting part of the learning process for me. It gives me a chance to encounter with dark corners and pitfalls. Accordingly, I’m going to briefly write up about a recent project I have done on simple audio classification using machine learning. In this project, I will implement a machine learning model to classify fire alarm, vacuum cleaner, music, and silence sounds.

    Data Collection

    Every machine learning pipeline starts with a data collection and prepration stage. I have collected audio samples in .WAV format using the Audacity desktop application on the windows with 44.1 khz sample rate. Note that we consider silence as a seperate class. Each sample is roughly 30 seconds. For music, fire alarm, and vacuum cleaner, I recorded samples from online audio. For each class, I collected at least 20 samples, in different sessions with different ambient noise and background to have a more diverse dataset which helps our model to be more generalized and robust.

    For loading audio samples, preprocessing, and feature extraction I have used a popular python library called Librosa. It seems that intalling Librosa using pip is problematic due to unknown reasons. Accordingly, I highly recommend to install it using Conda to avoid any inconsistency and hassles on Windows.

    import os
    import numpy as np
    import matplotlib.pyplot as plt
    import librosa
    import librosa.display
    from scipy.fft import rfft, rfftfreq
    import cv2
    import pickle
    
    from sklearn.model_selection import cross_val_score
    from sklearn.svm import SVC
    from sklearn.metrics import classification_report, accuracy_score
    from sklearn.metrics import ConfusionMatrixDisplay
    
    ROOT_DIR = 'C:/Users/amiri/Desktop/demo/dataset/'
    SAMPLING_RATE = 44100 #it's consistent over the entire dataset recordings
    
    def get_all_directories(root_path):
        dirs = os.listdir(root_path)
        dirs = [dir for dir in dirs if os.path.isdir(root_path+dir)]
        return dirs
    
    def get_all_files(path):
        files = os.listdir(path)
        files = [file for file in files if os.path.isfile(path+file)]
        return files
       
    def load_all_audio_files(root_path, duration=30):
        files = get_all_files(root_path)
        file_samples = []
        for file in files:
                samples, sampling_rate = librosa.load(root_path+file,
                 sr=None, mono=True, offset=0.0, duration=duration)
                file_samples.append(samples)
        return file_samples
    
    dataset = {}
    for audio_class in get_all_directories(ROOT_DIR):
        dataset[audio_class] = load_all_audio_files(ROOT_DIR + audio_class+'/')
        print(f"number of {audio_class} samples: {len(dataset[audio_class])}") 

    Now let’s visualize the samples and take a look at the audio signals to have an intuition about different audio shapes and variations.

    fig, axs = plt.subplots(4,figsize=(8, 5), sharex=True,
    constrained_layout = True) 
    fig.suptitle('Time-Amplitude Visualizaition')
    
    ax_index = 0
    sample_index = 0
    for audio_class in dataset:
        axs[ax_index].title.set_text(f'{audio_class} Audio Sample \n')
        librosa.display.waveshow(dataset[audio_class][sample_index],
         sr=SAMPLING_RATE, ax = axs[ax_index])
        ax_index+=1
    plt.show()

    Domain Specific Features

    Based on the sound waves visualized above. We may be able to use some time-domain features like: number of zero crossings, mean flatness, maximum amplitude, minimum amplitude, kurtosis and skewness.

    def get_zero_crossing_features(wave_sample):
        crossings = np.nonzero(librosa.zero_crossings(wave_sample))[0]
        number_of_zero_crossings = len(crossings)
        mean_zero_crossing_rate = librosa.feature.zero_crossing_rate(wave_sample).mean()
        
        return number_of_zero_crossings, mean_zero_crossing_rate
    
    def get_mean_flatness(audio_sample):
        return librosa.feature.spectral_flatness(y=audio_sample).mean()
    
    def get_time_domain_features(audio_sample):
        
        mean_flatness = get_mean_flatness(audio_sample)
        number_of_zero_crossings , mean_zero_crossing_rate = get_zero_crossing_features(audio_sample)
        max_ampl = audio_sample.max()
        min_ampl = audio_sample.min()
        variance_ampl = np.var(audio_sample)
        kurtosis = sc.stats.kurtosis(audio_sample)
        skewness = sc.stats.skew(audio_sample)        
        return number_of_zero_crossings, mean_zero_crossing_rate,
         max_ampl, min_ampl, variance_ampl, mean_flatness,kurtosis, skewness
    
    data = []
    labels = []
    for audio_class in dataset:
        for audio_sample in dataset[audio_class]:
            time_domain_features = list(get_time_domain_features(audio_sample))
            feature_set = np.concatenate([time_domain_features])
            labels.append(audio_class)
            data.append(feature_set)
    
    data = np.array(data)
    labels = np.array(labels)

    Now that we constructed the feature set, we can go ahead and feed them into different classification methods and see if they can correctly classify audio recordings!

    xtrain, xtest, ytrain, ytest = train_test_split(data, labels, test_size=0.3, shuffle=True)
    svm_rbf = SVC()
    svm_rbf.fit(xtrain, ytrain)
    svm_rbf_scores = cross_val_score(svm_rbf, xtrain, ytrain, cv=10)
    print('Average Cross Validation Score from Training:',
     svm_rbf_scores.mean(), sep='\n', end='\n\n\n')
    
    
    svm_rbf_ypred = svm_rbf.predict(xtest)
    svm_rbf_cr = classification_report(ytest, svm_rbf_ypred)
    
    print('Test Statistics:', svm_rbf_cr, sep='\n', end='\n\n\n')
    
    svm_rbf_accuracy = accuracy_score(ytest, svm_rbf_ypred)
    print('Testing Accuracy:', svm_rbf_accuracy)
    
    fig, ax = plt.subplots(figsize=(5,5))
    ConfusionMatrixDisplay.from_estimator(svm_rbf, xtest, ytest,
     ax = ax, cmap='RdYlGn')
    plt.show()

    and Voila!

    Disappointing! Isn’t it?!

    But, why the model showed such a poor classification result? First of all, we should take a step back and look at the dataset itself to find a clue.

    import pandas as pd
    dataframe = pd.DataFrame.from_records(data)
    dataframe.describe()

    We can clearly see that the features have different value ranges, which could drastically reduce the model accuracy if the machine learning model works based on distance between data points! (which in our case SVM classifier does.)

    Accordingly, a StandardScaler is added to the pipeline to normalize data. Let’s see if it can improve the accuracy or not.

    from sklearn.model_selection import train_test_split
    from sklearn.pipeline import Pipeline
    from sklearn.svm import SVC
    from sklearn.preprocessing import StandardScaler
    from sklearn.metrics import balanced_accuracy_score
    
    xtrain, xtest, ytrain, ytest = train_test_split(data, labels, test_size=0.3, shuffle=True)
    svm_rbf = Pipeline([('scaler', StandardScaler()), ('svc', SVC())])
    svm_rbf.fit(xtrain, ytrain)
    svm_rbf_scores = cross_val_score(svm_rbf, xtrain, ytrain, cv=10)
    print('Average Cross Validation Score from Training:',
     svm_rbf_scores.mean(), sep='\n', end='\n\n\n')
    
    
    svm_rbf_ypred = svm_rbf.predict(xtest)
    svm_rbf_cr = classification_report(ytest, svm_rbf_ypred)
    
    print('Test Statistics:', svm_rbf_cr, sep='\n', end='\n\n\n')
    
    svm_rbf_accuracy = accuracy_score(ytest, svm_rbf_ypred)
    print('Testing Accuracy:', svm_rbf_accuracy)
    
    balanced_accuracy = balanced_accuracy_score(ytest, svm_rbf_ypred)
    print('Balanced Testing Accuracy:', balanced_accuracy)
    
    
    fig, ax = plt.subplots(figsize=(5,5))
    ConfusionMatrixDisplay.from_estimator(svm_rbf, xtest, ytest,
     ax = ax, cmap='RdYlGn')
    plt.show()

    Well! We could improve the model accuracy about twenty percent just by normalizing features!

    But, the result is still not satisfactory. We can try adding more features, or trying other models to see if we can improve the work. To add more features we can convert the audio samples to frequency-domain and see if we can extract more meaningful information and features. It’s easy to change audio signals to frequency-domain in Librosa and also Scipy libraries using Fast Fourier Transform. It’s basically a method to transform discrete spacial data to a discrete frequency histogram. In other words it reveals the frequencies distribution of the signal.

    from scipy.fft import rfft, rfftfreq
    
    def Get_RFFT(audio_sample):
        N = len(audio_sample)
        yf = rfft(audio_sample)
        xf = rfftfreq(N, 1 / SAMPLING_RATE)
        return xf, yf
    
    fig, axs = plt.subplots(4,figsize=(8, 5), sharex=True,
    constrained_layout = True) 
    fig.suptitle('Frequency-Domain Visualizaition')
    
    ax_index = 0
    sample_index = 0
    for audio_class in dataset:
        audio_sample = dataset[audio_class][sample_index]
        axs[ax_index].title.set_text(f'{audio_class} \n')
        audio_sample_xf, audio_sample_yf = Get_RFFT(audio_sample)
        axs[ax_index].plot(audio_sample_xf, np.abs(audio_sample_yf))
        ax_index+=1
    
    plt.show()

    The figure above, illustrates the frequencies in one randomely chosen sample of each class. We can see each audio sample contains different frequencies which we can be leveraged as a new feature. But, what we do instead, is sliding a window of a fixed size on the original signal, and extract frequency distribution on each window using FFT. Eventualy, by column-wise concatenation of each FFT output, we will have a two-dimentional array of frequencies in time, which is called a Spectrogram. Fortunately, Librosa library made our life easier and provided neat functions to extract and visualize the spectrogram of a signal.

    import matplotlib.pyplot as plt
    
    FFT_SIZE = 1024*4
    WINDOW_SIZE = FFT_SIZE//2
    
    fig, axs = plt.subplots(4,figsize=(15, 15), sharex=True,
    constrained_layout = True) 
    fig.suptitle('Frequency-Domain Visualizaition')
    
    
    ax_index = 0
    sample_index = 15
    for audio_class in dataset:
        audio_sample = dataset[audio_class][sample_index]
        S = np.abs(librosa.stft(audio_sample,n_fft=FFT_SIZE,win_length=WINDOW_SIZE))
        power_fft = librosa.amplitude_to_db(S,ref=np.max)
        axs[ax_index].title.set_text(f'{audio_class} \n')
        img = librosa.display.specshow(power_fft, y_axis='log', x_axis='time', 
                                       ax=axs[ax_index],sr = SAMPLING_RATE)
        axs[ax_index].set_title(f'{audio_class} Power spectrogram')
        fig.colorbar(img, ax=axs[ax_index], format="%+2.0f dB")
        ax_index+=1
    
    plt.show()

    As shown in the figure above, Spectrogram is actually an image containing the changes in frequency over time. Now, we have a rich feature in frequency-domain, we are gonna flatten the 2-D sepctrogram array to a 1-d array and concatenate them to the time-domain features. Also, to reduce the spectrogram size, I have treated the spectrogram as an image and resized it using OpenCv.

    data = []
    labels = []
    num_freq_bins=32
    num_time_bins=256
    for audio_class in dataset:
        for audio_sample in dataset[audio_class]:
        
            time_domain_features = list(get_time_domain_features(audio_sample))
            
            spectro = np.abs(librosa.stft(audio_sample,n_fft=FFT_SIZE,win_length=WINDOW_SIZE))
            power_spectro = librosa.amplitude_to_db(spectro,ref=np.max)
            binned_power_spectro = cv2.resize(power_spectro[:,:],(num_time_bins,num_freq_bins))
            
            feature_set = np.concatenate([binned_power_spectro.ravel(), time_domain_features])
            data.append(feature_set)
            labels.append(audio_class)
    
    
    data = np.array(data)
    labels = np.array(labels)
    data.shape
    

    Finally, the dataset is fed into the SVM classifier with results shown below.

    from sklearn.model_selection import train_test_split
    from sklearn.pipeline import Pipeline
    from sklearn.svm import SVC
    from sklearn.preprocessing import StandardScaler
    from sklearn.metrics import balanced_accuracy_score
    print('\n')
          
    xtrain, xtest, ytrain, ytest = train_test_split(data, labels, test_size=0.3, shuffle=True)
    svm_rbf = Pipeline([('scaler', StandardScaler()),('svc', SVC())])
    svm_rbf.fit(xtrain, ytrain)
    svm_rbf_scores = cross_val_score(svm_rbf, xtrain, ytrain, cv=10)
    print(f'10-Fold CV Score :{svm_rbf_scores.mean():.3f} ± {svm_rbf_scores.std():.3f}',
          sep='\n', end='\n\n\n')
    
    
    
    svm_rbf_ypred = svm_rbf.predict(xtest)
    svm_rbf_cr = classification_report(ytest, svm_rbf_ypred)
    
    print('Test Statistics:', svm_rbf_cr, sep='\n', end='\n')
    
    svm_rbf_accuracy = accuracy_score(ytest, svm_rbf_ypred)
    print('Testing Accuracy:', svm_rbf_accuracy)
    
    balanced_accuracy = balanced_accuracy_score(ytest, svm_rbf_ypred)
    print('Balanced Testing Accuracy:', balanced_accuracy)
    
    
    fig, ax = plt.subplots(figsize=(5,5))
    ConfusionMatrixDisplay.from_estimator(svm_rbf, xtest, ytest,
     ax = ax, cmap='RdYlGn')
    plt.show()

    Real-Time Audio Classification

    Till now, we have used 30-second audio samples. But this duration is too much if we are aiming to use the model in a real-time situation like smart-home applications. Accordingly, first of all we have to decrease the sample sizes and try to detect the events in a smaller time window. One way around it is to slide a fixed window size on the original audio samples and create another dataset out of the extracted windows. If we slide the window without any overlap then we can use the same pipeline as we have developed before. Since we may lose some of the event data which lies between two consecutive frames, it’s better to have at least a 50 percent overlap between consecutive frames. But, it’s a bit tricky! Because if we introduce overlapping frames in the dataset, then the evaluation process may shows a bloated result. In other words, the test may contains the frames which have seen before by the model in the training set! Accordingly, it will lead to an unreliable model performance evaluation.

    If we had enough data, then we could simply seperate train and test samples before applying the sliding window on them. But here, we utilize a method called BlockingTimeSeriesSplit which is a well-known method for time-series cross-validation split. more details here ,and here.

    Ok then, first of all, we need to extract the frames using function defined below.

    def get_windows(wave_sample, dataclass, sampling_rate, window_size_seconds=1, overlap = 0.5):
        sub_samples_data = []
        labels = []
        samples_per_window = int(sampling_rate) * window_size_seconds
        window_shift_by = int((1-overlap) * samples_per_window)
        window_index = 0 
    
        while not ((window_index + samples_per_window)>wave_sample.shape[0]):
            sub_samples_data.append(wave_sample[window_index: window_index + samples_per_window])
            window_index+=window_shift_by
            
        labels.extend([dataclass]  * len(sub_samples_data))
        return sub_samples_data, labels 
    
    data = {}
    
    for audio_class in dataset:
        class_sub_features = []
        for audio_sample in dataset[audio_class]:
            sub_samples_data, sub_samples_labels = get_windows(wave_sample = audio_sample,
                                                   dataclass = audio_class,
                                                   sampling_rate = SAMPLING_RATE
                                                  , window_size_seconds=2)
            for i in range(len(sub_samples_data)):
                time_domain_features = list(get_time_domain_features(sub_samples_data[i]))
                
                
                spectro = np.abs(librosa.stft(sub_samples_data[i],n_fft=FFT_SIZE,
                win_length=WINDOW_SIZE))
                power_spectro = librosa.amplitude_to_db(spectro,ref=np.max)
                binned_power_spectro = cv2.resize(power_spectro[:,:],(num_time_bins,num_freq_bins))
                
                time_feauture_set = [time_domain_features,]
                freq_feature_set = [binned_power_spectro.ravel(),]
            
                
                final_feature_set =  time_feauture_set + freq_feature_set
                feature_set = np.concatenate(final_feature_set)
                class_sub_features.append(feature_set)
        data[audio_class] = class_sub_features
    

    Feature extraction process is the same as before, but here we added an inner loop to slide a window on each audio sample and extract the features for each frame to build the new dataset.

    Then the dataset is splitted using the Blocking TimeSeries Split method. Note that the dataset shouldn’t get shuffled before this process.

    class BlockingTimeSeriesSplit():
        def __init__(self, n_splits, test_size=0.3, hop=2):
            self.n_splits = n_splits
            self.test_size = test_size
            self.hop = hop
            
        def get_n_splits(self, X, y, groups):
            return self.n_splits
        
        def split(self, X, y=None, groups=None):
            n_samples = len(X)
            k_fold_size = n_samples // self.n_splits
            indices = np.arange(n_samples)
    
            for i in range(self.n_splits):
                start = i * k_fold_size
                stop = start + k_fold_size
                mid = int((1-self.test_size) * (stop - start)) + start
                yield indices[start: mid], indices[mid + self.hop: stop]
    
    blocking_timeseries_split = BlockingTimeSeriesSplit(n_splits=30, test_size=0.3, hop=2)
    
    x_test_groups = []
    x_train_groups = []
    
    y_test_groups = []
    y_train_groups = []
    for audio_class in data:
        class_data = np.array(data[audio_class]) 
        for train_index, test_index in blocking_timeseries_split.split(class_data):
            
           #print("TRAIN:", train_index, "TEST:", test_index)
           x_train, x_test = class_data[train_index], class_data[test_index]
           y_train = [audio_class]  * len(x_train)
           y_test = [audio_class]  * len(x_test)
           x_test_groups.extend(x_test)
           x_train_groups.extend(x_train)
    
           y_test_groups.extend(y_test)
           y_train_groups.extend(y_train)
    
    x_test_groups = np.array(x_test_groups)
    x_train_groups = np.array(x_train_groups)
    
    y_test_groups = np.array(y_test_groups)
    y_train_groups = np.array(y_train_groups)
    

    Finally, the training set and testing set are fed to the machine learning pipeline for training and evaluation.

    model = Pipeline([('scaler', StandardScaler()),('svc', SVC())])
    
    model.fit(x_train_groups, y_train_groups)
    model_scores = cross_val_score(model, x_train_groups, y_train_groups,
     cv=10,scoring="balanced_accuracy")
    
    print(f'10-Fold CV Score :{model_scores.mean():.3f} ± {model_scores.std():.3f}',
     sep='\n', end='\n\n')
    
    model_ypred = model.predict(x_test_groups)
    model_cr = classification_report(y_test_groups, model_ypred)
    
    print('Test Statistics:', model_cr, sep='\n', end='\n\n')
    
    model_accuracy = accuracy_score(y_test_groups, model_ypred)
    print(f'Testing Accuracy:{model_accuracy:.3f}')
    
    
    balanced_model_accuracy = balanced_accuracy_score(y_test_groups, model_ypred)
    print(f'Balanced Testing Accuracy:{balanced_model_accuracy:.3f}')
    
    fig, ax = plt.subplots(figsize=(5,5))
    ax.grid(False)
    ConfusionMatrixDisplay.from_estimator(model, x_test_groups, y_test_groups,ax = ax,
     cmap='Greens')
    plt.show()

    Sounds Great!

    Real-Time Audio Classification Live Demo

    Now let’s use the model in a real-time application and see if it works as expected! To do so, first of all we need to save the trained model using the python pickle object serialization library.

    import pickle
    filename = 'audio_classifier.ml'
    pickle.dump(model, open(filename, 'wb'))
    

    Next, we have to read the microphone data using pyaudio python library by instantiating a PyAudio object and open it with same sampling frequency we recorded the sample audio with.

    import pyaudio 
    SAMPLING_RATE  = 41000
    CHUNK_SIZE   = SAMPLING_RATE // 10
    
    py_audio = pyaudio.PyAudio()
    input_audio_stream = py_audio.open(format=pyaudio.paFloat32,
    channels=1, rate=SAMPLING_RATE, input=True,
    frames_per_buffer=CHUNK_SIZE)
    

    Note that the pyaudio buffers up the samples from the microphone and returns them whenever it reached the threshold parameter frames_per_buffer as a whole.

    Now we can read the last chunk of data in a loop. Note that this method of reading input data which is called Blocking, is deprecated in real-world implementation since if the processing happens in a same thread as reading, it will create a lag between each read and eventauly will led to missing parts of the actual audio. To work around this issue, data should be read in a seperate thread and process in a different thread (typical producer/consumer problem). But for the sake of simplicity we don’t get into that here.

    try:
        while True:
            # reading and converting pyaudio input to np array
            raw_data_chunk = np.fromstring(input_audio_stream.read(CHUNK_SIZE),dtype=np.float32) 
            
    
            cv2.waitKey(1)
    
    except KeyboardInterrupt:
        input_audio_stream.stop_stream()
        input_audio_stream.close()
    

    You might think if we add the machine learning model after reading audio, we should be done! In fact it will somehow work, but the problem is data is read every 2 seconds and discarded after we have processed it! So, what if an event happened between two consecutive audio signal chunks?

    To solve this issue we have to decrease the input chunk size and buffer the input data, just like we did in training phase. Then we can slide a fixed-size window on the buffer and feed the window content to the machine learning pipeline.

    To do so, I have written a class name WindowedBuffer as below:

    import numpy as np 
    
    class WindowedBuffer1D:
    
        def __init__(self, buffer_size:int, window_size:int, window_loc = "center"):
            
            self.buffer_size = buffer_size
            self.window_size = window_size
            self.buffer = None
            self.chunk_number = 0
            if window_loc == "center" or None:
                self.window_start_loc = self.buffer_size//2 - self.window_size//2
                self.window_end_loc = self.window_start_loc + self.window_size
    
            elif window_loc == "head":
                self.window_start_loc = self.buffer_size - self.window_size
                self.window_end_loc = self.buffer_size
    
            elif window_loc == "tail":
                self.window_start_loc = 0
                self.window_end_loc = self.window_size
    
            
        def Append(self, data):
            # Todo: sanity check here before concat
            if self.buffer is None:
                self.buffer = np.zeros(self.buffer_size)
    
            self.buffer = np.concatenate((self.buffer, data), axis=-1)
            self.buffer = self.buffer[data.shape[0]:]
            self.chunk_number+=1
    
        def GetWindow(self):
            return self.buffer[self.window_start_loc:self.window_end_loc]
    

    Basically, It’s a fixed-size buffer that you can add a chunk a data to its head and it will remove the same amount from the tail. Also, by calling the GetWindow function you will get the window content with the size and location defined in the object constructor.

    win_buffer.Append(raw_data_chunk)
    window_samples = win_buffer.GetWindow()
            
    window_fft = librosa.amplitude_to_db(np.abs(librosa.stft(y= window_samples,n_fft=FFT_SIZE)),
     ref=np.max)
            
    extracted_features = getfeatures(window_samples)
    

    So unitl here, we could read raw data, append it to the buffer, get the buffer window and extract features from it. All we need to plugging in the trained model and use it for classification.

    filename = 'audio_classifier.ml'
    classification_model = pickle.load(open(filename, 'rb'))
    

    Finally this is the code for the real-time audio classification

    from time import time
    import pickle
    import numpy as np
    import scipy as sc 
    import librosa
    import librosa.display
    import pyaudio 
    import cv2
    import PIL
    import matplotlib.pyplot as plt
    import matplotlib.patches as patches
    import warnings
    from windowedbuffer1d import WindowedBuffer1D
    
    warnings.simplefilter("ignore", DeprecationWarning)
    
    
    import numpy as np
    import matplotlib.pyplot as plt
    
    
    SAMPLING_RATE  = 41000
    CHUNK_SIZE   = SAMPLING_RATE//4
    CLASSIFICATION_WINDOW_SIZE   = SAMPLING_RATE * 2 # 2 seconds
    BUFFER_DURATION = SAMPLING_RATE * 3 # 3 seconds
    FFT_SIZE = 1024*4
    FFT_WINDOW_SIZE = FFT_SIZE//2
    num_freq_bins=32
    num_time_bins=256
    
    def get_zero_crossing_features(wave_sample):
        crossings = np.nonzero(librosa.zero_crossings(wave_sample))[0]
        number_of_zero_crossings = len(crossings)
        mean_zero_crossing_rate = librosa.feature.zero_crossing_rate(wave_sample).mean()
        
        return number_of_zero_crossings, mean_zero_crossing_rate
    
    def get_mean_flatness(audio_sample):
        return librosa.feature.spectral_flatness(y=audio_sample).mean()
    
    def get_time_domain_features(audio_sample):
        
        mean_flatness = get_mean_flatness(audio_sample)
        number_of_zero_crossings , mean_zero_crossing_rate = get_zero_crossing_features(audio_sample)
        max_ampl = audio_sample.max()
        min_ampl = audio_sample.min()
        variance_ampl = np.var(audio_sample)
        kurtosis = sc.stats.kurtosis(audio_sample)
        skewness = sc.stats.skew(audio_sample)        
        return number_of_zero_crossings, mean_zero_crossing_rate, max_ampl, min_ampl, variance_ampl,
         mean_flatness,kurtosis, skewness
    
    
    def getfeatures(signal):
        time_domain_features = list(get_time_domain_features(signal))
              
        spectro = np.abs(librosa.stft(signal,
        n_fft=FFT_SIZE,win_length=FFT_WINDOW_SIZE))
        power_spectro = librosa.amplitude_to_db(spectro,ref=np.max)
        binned_power_spectro = cv2.resize(power_spectro[:,:],
        (num_time_bins,num_freq_bins))
                
        time_feauture_set = [time_domain_features,]
        freq_feature_set = [binned_power_spectro.ravel(),]
                  
        final_feature_set =  time_feauture_set + freq_feature_set
        feature_set = np.concatenate(final_feature_set)
        return feature_set
    
    # load the model from disk
    filename = 'audio_classifier.ml'
    classification_model = pickle.load(open(filename, 'rb'))
    
    py_audio = pyaudio.PyAudio()
    input_audio_stream = py_audio.open(format=pyaudio.paFloat32,
    channels=1, rate=SAMPLING_RATE, input=True,
    frames_per_buffer=CHUNK_SIZE)
    
    win_buffer = None
    try:
        while True:
            raw_data_chunk = np.fromstring(input_audio_stream.read(CHUNK_SIZE),dtype=np.float32)
             
            if win_buffer is None:
                win_buffer = WindowedBuffer1D(buffer_size=BUFFER_DURATION,
                 window_size=CLASSIFICATION_WINDOW_SIZE)
            
            win_buffer.Append(raw_data_chunk)
            window_samples = win_buffer.GetWindow()
            
            window_fft = librosa.amplitude_to_db(
                np.abs(librosa.stft(y= window_samples,n_fft=FFT_SIZE)), ref=np.max)
            
            extracted_features = getfeatures(window_samples)
    
            if not np.isnan(extracted_features).any():
                classification_result = classification_model.predict
                (extracted_features.reshape(1, -1))
    
                fig, ax = plt.subplots(nrows=3,figsize=(8, 6))       
    
                librosa.display.waveshow(window_samples, sr=SAMPLING_RATE, ax=ax[0])
                ax[0].set_ylim(-0.1,0.1)
            
                img = librosa.display.specshow(window_fft, y_axis='linear', x_axis='time',
    
                                   sr=SAMPLING_RATE, ax=ax[1])
                fig.canvas.draw()
                visualization_figure = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
                visualization_figure = visualization_figure.reshape(fig.canvas.get_width_height()
                [::-1] + (3,))
                
                font = cv2.FONT_HERSHEY_SIMPLEX
                cv2.putText(visualization_figure, str(classification_result[0]), (150,500), 
                font, 3, (0, 100, 0), 10, cv2.LINE_AA)
                 
                cv2.imshow("Input", visualization_figure)
    
            cv2.waitKey(1)
    
    except KeyboardInterrupt:
        input_audio_stream.stop_stream()
        input_audio_stream.close()
    

    Note that I have created a three-row plot for visualization purpose. The first two rows are the window signal and window signal spectrogram respectively. The third row is left empty as a spaceholder for showing the classification result. Then the matplot figure is converted to OpenCv image using the following lines and eventually result is printed on the image using opencv puttext function.

    visualization_figure = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
    visualization_figure = visualization_figure.reshape(fig.canvas.get_width_height()[::-1] + (3,))

    Note: In the entire process I just SVM model with default parameters (C=1.0, kernel=’rbf’, degree=3), but generally it’s a good practice to try several different machine learning models and choose the one which showed better results in terms of accuracy and generalization performance.

  • Machine Learning From Scratch – Linear Regression

    ·

    I have watched a lot of tutorials and passed a bunch of courses about machine learning and AI before, but other than utilizing those methods on class assignments and a few of them for my master’s thesis, I didn’t have the chance to use machine learning and specifically deep learning in a long-term practical project. Fortunately, due to the need, I have in my research, I have started to study machine learning methods from scratch and this time more in-depth. According to a famous quote by Albert Einstein

    “You do not really understand something unless you can explain it to your grandmother.”

    It is quite important to be able to transfer your knowledge to others using plain and easily understandable descriptions which also helps to solidify your comprehension of the topic. So, I have decided to start a series of blog posts to build common machine learning algorithms from scratch in order to clarify these methods for myself and make sure I correctly understand the mechanics behind each of them.

    Let me be clear from the very beginning: It’s all about fitting a function!

    Consider we have a dataset containing the number of trucks crossing the border from Mexico to the U.S. through Ottay Messa port. Note that it’s just a subset of the entire inbound crossings dataset available on Kaggle. First of all, it’s always better to plot the data which may help us have some insight. To load the data from a .CSV file, we are going to use Pandas which is a well-known data analysis/manipulation python library. We can then plot the data using Matplotlib (another python library for data visualization).

    df = pd.read_csv('./regression/dataset.csv')
    x_training_set = pd.to_datetime(df.Date, infer_datetime_format=True).to_numpy()
    y_training_set = df.Value.to_numpy()
    number_of_training_examples = len(x_training_set)
    
    #plot the raw data
    fig, ax = plt.subplots(figsize=(8, 6))
    year_locator = mdates.YearLocator(2)
    year_month_formatter = mdates.DateFormatter("%Y-%m")
    ax.xaxis.set_major_locator(year_locator)
    ax.xaxis.set_minor_locator(mdates.YearLocator())
    ax.xaxis.set_major_formatter(year_month_formatter)
    ax.plot(x_training_set,y_training_set, ".")
    fig.autofmt_xdate()
    plt.show()

    Note that in the original data, each value is corresponding to a month, so I mapped the date intervals into an integer representation.

    What we are observing here obviously is not an exact linear function, but for the sake of simplicity, we can model border crossings using a linear function! As we already know, the equation of a line is a below:

    \[f(x) = mx + c\]

    where m stands for the slope and c is the intercept on the y-axis. But we can have an infinite number of possible values for these parameters. Let’s look at some possible arbitrary lines with values [m=70,c=40000], [m=100,c=40000], [m=140,c=40000] represented with orange, green, and red colors respectively.

    #plot arbitrary lines
    def plot_line(ax, m,c,xMax):
        y0 = (1*m)+c
        ymax = (xMax*m)+c
        ax.plot([x_training_set[0],x_training_set[xMax-1]],[y0,ymax])
    
    fig, ax = plt.subplots(figsize=(8, 6))
    year_locator = mdates.YearLocator(2)
    year_month_formatter = mdates.DateFormatter("%Y-%m")
    ax.xaxis.set_major_locator(year_locator)
    ax.xaxis.set_minor_locator(mdates.YearLocator())
    ax.xaxis.set_major_formatter(year_month_formatter)
    ax.plot(x_training_set,y_training_set, ".")
    fig.autofmt_xdate()
    
    plot_line(ax,70,40000,number_of_training_examples) 
    plot_line(ax,100,40000,number_of_training_examples) 
    plot_line(ax,140,40000,number_of_training_examples) 
    plt.show()

    But, What are the parameter values we are choosing for our linear equation to properly fit our data points?

    Analytical Approach to find the regression parameters

    To find the proper fit for our data, basically, we have to minimize the average distance of the data points to our arbitrary line. In other words, we are finding the difference between the predicted value with the actual training data.

    there are two ways to calculate the error:

    Mean Squared Error (MSE): which considers the squared difference of the values.

    Mean Absolute Error (MAE): which considers the absolute difference of the values.

    Note that we need to sum up these error values as positive numbers. Otherwise negative values will compensate for positive ones which will make our optimization problem impossible.

    Our objective is to find our linear equation parameters m and c to minimize average error over all training set data (known as the cost function) defined below:

    \[Cost Function(m,c) = {\frac{1}{n}\sum_{i=1}^{n} (f(x_i)-y_i)^2}\]

    where n is the number of training examples. Now let’s explore the parameters space and plot the cost function to see what it looks like. for the MSE cost function, we have the parameters space-cost plot below

    # visualize cost function
    def line_equation(m,c,x):
        return (m*x)+ c
    
    def cost_function(m,c,training_examples_x,training_examples_y):
        sum_of_errors = 0
        item_index = 0
        for example in training_examples_x:
            predicted_y = line_equation(m,c,item_index)
            sum_of_errors += (predicted_y - training_examples_y[item_index])**2
            #sum_of_errors += abs(predicted_y - training_examples_y[item_index])
            item_index+=1
        mse = (sum_of_errors / len(training_examples_x))
        return mse
    
    fig = plt.figure()
    fig.set_size_inches(8, 6)
    ax = fig.add_subplot(projection='3d')
    cost_func_x_points = []
    cost_func_y_points = []
    cost_func_z_points = []
    for m in np.arange(-200,500,10):
        for c in np.arange(-10000,60000,200):
             cost = cost_function(m,c,x_training_set,y_training_set)
             cost_func_x_points.append(m)
             cost_func_y_points.append(c)
             cost_func_z_points.append(cost)
    ax.scatter(cost_func_x_points, cost_func_y_points,
    cost_func_z_points,c=cost_func_z_points,marker='.')
    ax.set_xlabel('M')
    ax.set_ylabel('C')
    ax.set_zlabel('Cost')
    plt.show()

    and for the MAE we have the plot below:

    As you can see both these cost functions are convex and they just have one minimum point at the bottom of the slope(global minima). Then based on calculus, it means that if we could find the point that the derivative of this function is zero, we have found the optimal parameters for our model. We can simply use the equation below to find the parameters.

    \[theta = (X^T.X)^-1.(X^T.Y)\]

    where X is the training features sample vector. Y is the output vector. and the result(theta) are the parameters for our regression model.

    This equation is called a Normal Equation and you can find the math behind it here.

    So let’s run it on our dataset and see how it works.

    #linear equation using numpy
    x_training_set_numbers = np.arange(0,len(x_training_set),1)
    ones_vector = np.ones((len(x_training_set), 1))
    x_training_set_numbers = np.reshape(x_training_set_numbers, (len(x_training_set_numbers), 1))
    x_training_set_numbers = np.append(ones_vector, x_training_set_numbers, axis=1)
    theta_list = np.linalg.inv(x_training_set_numbers.T.dot(x_training_set_numbers)) \
    .dot(x_training_set_numbers.T).dot(y_training_set)
    print(theta_list)
    
    #visualize raw data with fitted line
    fig, ax = plt.subplots(figsize=(8, 6))
    year_locator = mdates.YearLocator(2)
    year_month_formatter = mdates.DateFormatter("%Y-%m")
    ax.xaxis.set_major_locator(year_locator)
    ax.xaxis.set_minor_locator(mdates.YearLocator())
    ax.xaxis.set_major_formatter(year_month_formatter)
    ax.plot(x_training_set,y_training_set, ".")
    fig.autofmt_xdate()
    plot_line(ax,theta_list[1],theta_list[0],number_of_training_examples) 
    plt.show()
    [48326.215617      97.55941938]

    Model Evaluation

    Now that we have fit a line on our data, are we done? Nope!

    We need to evaluate the model utilizing different metrics and plots to make sure the model we proposed would generalize well on new data.

    To evaluate a model there are several metrics we can use.

    1- MSE (Mean Squared Error)

    \[MSE = {\frac{1}{n}\sum_{i=1}^{n} (y – y_p)^2}\]

    2- MAE (Mean Absolute Error)

    \[MAE = {\frac{1}{n}\sum_{i=1}^{n} (|y – y_p|)}\]

    These two metrics mentioned above, have the exact same definition we had in defining the cost functions. The main difference between these two is MSE penalizes prediction errors heavily in comparison to MAE. Generally, we want these scores to be as close as possible to zero.

    mse_value = 0
    mae_value = 0
    m  = theta_list[1]
    c = theta_list[0]
    for sample_x in range(0,number_of_training_examples):
        predicted_y =  m * sample_x + c 
        sample_y = int(y_training_set[sample_x])
        mse_value += (predicted_y - sample_y)**2
        mae_value += abs(predicted_y - sample_y)
    print(f"Mean Squared Error: {mse_value//number_of_training_examples}\n")
    print(f"Mean Absolute Error: {mae_value//number_of_training_examples}\n")    

    3- R-Squared Score

    In terms of regression evaluation, the R-Square score or R2 score is one of the most useful ones. It indicates how much variance of the data is explained by the model. In other words, it indicates how close is the data point to the fitted line. R2 score is defined as below:

    \[R2 = {\frac{ModelVariance}{TotalVariance}}\] \[= {1 – \frac{Sum Of Squared Regression(SSR)}{Total Sum Of Squares(SST)}}\] \[= {1 – \frac{\sum_{i=1}^{n} (y – y_p)^2}{\sum_{i=1}^{n} (y – y_m)^2}}\]

    R2 score must be a number between 0 and 1. In regression modeling, we aim to maximize the R2 score toward 1 as much as possible. Note that if the R2 score is a negative value, it’s indicating that something is wrong with the modeling or implementations.

    4- Residual Plot

    Also, it’s crucial to plot residuals to see if linear regression is a good choice to model our data. In regression, residual is the difference between the actual value and the predicted value. Residual points have to distribute equally along the horizontal axis. Otherwise, the model is not performing well and probably it’s not reliable enough.

    \[residuals = y – y_p\]

    In the figure above, except at the beginning and the end of the plot, in which residuals have greater values, residuals are somehow randomly distributed. Generally, for a good linear regression model, data points in this plot must be as close as possible to the horizontal axis, and also they have to be uniformly distributed along the plot.

    Code Refactoring

    although what we have done till here is working, but it’s quite a mess in terms of performance and cleanness. Accordingly, I’m gonna wrap the implementation in a class called LinearRegressor as follows.

    import numpy as np
    
    class LinearRegressor:
        """Simple linear regression model"""
        def __init__(self, loss='MSE'):
            self.coeff = []
    
            def MSE(y_predicted, y_target):
                return ((y_target-y_predicted)**2).mean()
    
            def MAE(y_predicted, y_target):
                return (abs(y_target-y_predicted)).mean()
    
            if loss == 'MSE': self.calculate_loss = MSE
            elif loss == 'MAE': self.calculate_loss = MAE
            else: self.calculate_loss = MSE
    
        def __add_bias(self, x):
            one = np.ones((len(x),1))
            return np.append(one, x, axis=1)
    
        def fit(self, x, y):
            x = self.__add_bias(x)
            self.coeff = np.linalg.inv(x.T.dot(x)).dot(x.T).dot(y)
    
        def predict(self, x):
            x = self.__add_bias(x)
            return x.dot(self.coeff)

    Test for simple regression

    # simple linear regression 
    x = np.array([1,2,3,4,5,6]).reshape(-1, 1)
    y = np.array([2,4,6,8,14,12]) 
    
    model = LinearRegressor(loss="MAE")
    model.fit(x,y)
    predicted_y = model.predict(x)
    print(f'Coefficients: {model.coeff}')
    print(f'Predicted values: {predicted_y}')
    print(f'Loss: {model.calculate_loss(predicted_y,y)}')
    Coefficients: [-0.53333333  2.34285714]
    Predicted values: [ 1.80952381  4.15238095  6.4952381   8.83809524 11.18095238 13.52380952]
    Loss: 1.00317460317459907

    Test for multiple regression

    #multiple regression 
    x = np.array([[1,2],[3,4],[5,6],[7,6],[9,8],[11,6]])
    y = np.array([2,4,6,8,14,12]) 
    
    model = LinearRegressor(loss="MAE")
    model.fit(x,y)
    predicted_y = model.predict(x)
    print(f'Coefficients: {model.coeff}')
    print(f'Predicted values: {predicted_y}')
    print(f'Loss: {model.calculate_loss(predicted_y,y)}')
    Coefficients: [-1.36363636  0.81818182  0.77272727]
    Predicted values: [ 1.          4.18181818  7.36363636  9.         12.18181818 12.27272727]
    Loss: 0.939393939393931822

    Great! Note that to be able to multiply a 1-d NumPy array with another array we need to use reshape() function, or change the array to an array of 1×1 arrays!

    Also, to consider biases in this class, we have to add a column with value of 1 to each datapoint in the dataset which is done in both fit() and predict() function by calling the __add_bias() function.

    Let’s run it on the dataset we prepared in beginning of the post and see if it leads to the same result.

    import pandas as pd
    import numpy as np
    from linearregressor import LinearRegressor 
    
    df = pd.read_csv('./regression/dataset.csv')
    x_raw = pd.to_datetime(df.Date, infer_datetime_format=True).to_numpy()
    y = df.Value.to_numpy()
    
    x = np.arange(0,len(x_raw),1)
    
    model = LinearRegressor()
    model.fit(x.reshape(-1, 1),y)
    predicted_y = model.predict(x)
    
    print(f'Coefficients: {model.coeff}')
    print(f'Loss: {model.calculate_loss(predicted_y,y)}')
    [48326.215617      97.55941938]
    Loss: 35035066.896423854

    Polynomial Regression

    Based on the visualizations we had in the very first steps, we could clearly see that what we are dealing with is not a linear function. Accordingly, we have to try a more flexible function to fit to our data. To do so, what we can do is adding polynomial features created from the original features.

    x = np.arange(0,len(x_raw),1).reshape(-1, 1)
    x2 = (x)**(2)
    x = np.append(x2, x , axis=1) 
    print(x.shape)
    (279, 2)

    Then feed the new feature set to the linear regressor model.

    model = LinearRegressor()
    model.fit(x,y)
    predicted_y = model.predict(x)
    
    print(f'Coefficients: {model.coeff}')
    print(f'Loss: {model.calculate_loss(predicted_y,y)}')
    Coefficients: [4.94483798e+04 8.74345493e-02 7.32526147e+01]
    Loss: 34777741.55225808

    Good! it could decrease the loss value! By the same way we can add higher order polynomials.

    x = np.arange(0,len(x_raw),1).reshape(-1, 1)
    x2 = (x)**(2)
    x3 = (x)**(3)
    x4 = (x)**(4)
    x = np.append(x2, x , axis=1)
    x = np.append(x3, x , axis=1)
    x = np.append(x4, x , axis=1)
    print(x.shape)
    
    model = LinearRegressor()
    model.fit(x,y)
    predicted_y = model.predict(x)
    
    print(f'Coefficients: {model.coeff}')
    print(f'Loss: {model.calculate_loss(predicted_y,y)}')
    279, 4)
    Coefficients: [ 4.13050845e+04 -4.22457826e-07  7.56166532e-03 -3.07680331e+00
      4.26784643e+02]
    Loss: 24975813.83882587
  • Both PyTorch and TensorFlow can utilize GPUs to train neural networks faster. To make them work, it’s important to properly set up the whole mechanics. I have done a few projects using classic machine learning methods and I also watched some tutorials and books here and there, but I never had the chance to use deep learning on a practical project. So, I’m kinda new to deep learning. Recently, I was trying to train different deep learning models for an instance segmentation task, since I had zero knowledge about how to make things work, I ended up struggling with various challenges for hours. I decided to write about the steps, tricks, and solutions for the issues I encountered. I was trying to train models on a local computer equipped with Nvidia GPUs (lambda workstation).

    The Cuda toolkit is another software layer on top of the Nvidia Driver. As is mentioned on the Nvidia website, different Cuda toolkit versions are mostly forward-compatible with Cuda drivers. It means that if you already have nvidia-driver-515, which is a fairly new version, it is compatible with cuda-toolkit-11-2.

    Installing Nvidia Driver

    Nvidia driver is the underlying library necessary for making the operating system (In our case Ubuntu 20.04) work with the graphical processors. To install drivers you can simply run the following command in the terminal:

    sudo ubuntu-drivers autoinstall

    or the following command in case you need a specific version:

    sudo apt install nvidia-driver-470

    it is also possible to install the drivers using Ubuntu Software Center

    to verify successful installation you should run the command below:

    nvidia-smi

    If you look at the TensorFlow installation instructions, the compatible cuda-toolkit version is mentioned in Conda install command.

    To make TensorFlow work, we should install the cuda-toolkit 11.2. basically, we assume that Conda will take care of the Cuda toolkit installation, but it DOESN’T. It is necessary to install the Cuda toolkit separately on the system. We can either install the Cuda toolkit using the run file provided by Nvidia or install it using an apt command.

    wget https://developer.download.nvidia.com/compute/cuda/11.2.0/local_installers/cuda_11.2.0_460.27.04_linux.run
    sudo sh cuda_11.2.0_460.27.04_linux.run

    Apt installation:

    sudo apt install nvidia-cuda-toolkit-11-2

    Note: it is necessary to set the Cuda toolkit path to environment variables to make TensorFlow and PyTorch able to find libraries and tools. you can add the following commands to your ~/.bashrc to pick the Cuda toolkit automatically in every new bash.

    export CUDA_HOME=/usr/local/cuda
    export PATH=/usr/local/cuda/bin:$PATH
    export CPATH=/usr/local/cuda/include:$CPATH
    export LIBRARY_PATH=/usr/local/cuda/lib64:$LIBRARY_PATH
    export LD_LIBRARY_PATH=/usr/local/cuda/lib64:/usr/local/cuda/extras/CUPTI/lib64:$LD_LIBRARY_PATH

    To verify the cuda toolkit installation you should run:

    nvcc --version

    Setting up PyTorch(GPU):

    As it is provided by the PyTorch website, it’s possible to set up different versions of PyTorch using Conda. It is supporting the Cuda toolkit 10.7, 11.3, and 11.6.

    Note that any other versions of the Cuda toolkit will not work with PyTorch, so make sure the installed version of the cuda toolkit is among the versions mentioned in PyTorch installation instructions!

    Setting up TensorFlow(GPU):

    After setting up the Cuda driver and toolkit, we are ready to install TensorFlow using the commands below:

    conda install -c conda-forge cudatoolkit=11.2 cudnn=8.1.0
    export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$CONDA_PREFIX/lib/
    python3 -m pip install tensorflow
    # Verify install:
    python3 -c "import tensorflow as tf; print(tf.config.list_physical_devices('GPU'))"

    Note that before using TensorFlow, each time a new terminal is opened, it is necessary to define the Cuda toolkit path in the environment with the command below. It is also possible to add the Cuda toolkit path permanently using the commands below,

    mkdir -p $CONDA_PREFIX/etc/conda/activate.d
    echo 'export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$CONDA_PREFIX/lib/' > $CONDA_PREFIX/etc/conda/activate.d/env_vars.sh

    This way Conda will automatically load the cuda toolkit path each time activating an environment.

    Setting up TensorFlow(GPU) with Docker:

    One way to avoid problems with the cuda-toolkit and also a way to be able to use different versions of TensorFlow is running Tensorflow docker containers which DIRECTLYinteract with the Cuda driver. There are different containers of TensorFlow each integrated with Jupiter lab. After running the container, TensorFlow will be accessible through Jupyter lab.

    after installing Docker you should install nvidia-docker2 using the following command:

    sudo apt-get install -y nvidia-docker2

    pull the TensorFlow(bundled with Jupyterlab) container:

    docker pull tensorflow/tensorflow:latest-gpu-jupyter

    then run the container and access Jupyter lab through your browser:

    docker run --gpus all -it -p 8888:8888 tensorflow/tensorflow:latest-gpu-jupyter

    make sure to add the –gpus all flag to the command to give docker access to GPUs. you can then access the jupyter notebook through http://yourip:8888/ (use localhost if you are running it on a local computer). Also, you can add –restart unless-stopped flag to make the docker container run after each restart(unless stopped manually).

    docker run --restart unless-stopped --gpus all -it -p 8888:8888 tensorflow/tensorflow:latest-gpu-jupyter

    To verify if GPU devices are available in TensorFlow run the following code in a jupyter notebook cell

    import tensorflow as tf
    print(tf.config.list_physical_devices('GPU'))