micro python

  • MicroFastApi – A Micro Adventure in Python

    ·

    In the previous post, I covered the very basic implementation of a web API structure using python decorators. But, it was just the beginning of a journey I have started. I decided to challenge myself and gradually develop this library into a real one!

    Accordingly, I will write up the process of design and implementation of features for it.

    MicroPython Threading

    First of all, we have to implement a function running on a separate thread inside the MicroFastApi class to receive HTTP requests and send back responses in the most efficient way possible. However it’s an experimental module, fortunately, MicroPython has a low-level threading module _thread available which saves us a lot of headaches.

    So let’s briefly take a look at a simple example of running a thread in micropython. To create a thread we should call the function start_new_thread() in the _thread module as follows:

    import _thread
    import time
    
    def _http_handler():
        counter=0
        while True:        
            print(counter)
            counter+=1
            time.sleep(0.1) # 100 ms
    
    threadobject = _thread.start_new_thread(_http_handler,())

    Note that the time module provides the function sleep() which receives a floating point as the number of seconds to sleep.

    This is the simplest use case that we are gonna embed inside the MicroFastApi class. Other than that, since the routes dictionary object is accessed in both the main and the dispatcher thread, we are gonna create a lock object using the allocate_lock() function and use it to make sure no race condition is gonna happen. Then we can either use acquire() and release() functions of the lock object explicitly or use python with statement.

    with self.lock:
        self.routes[route] = decorated_get_func

    So the class will become like this:

    import functools #pip install micropython-functools
    import _thread
    import time
    
    class MicroFastApi:
    
        def __init__(self, ip, mac,):
            self.routes = {}
            self.address = ip
            self.mac = mac
            self.lock = _thread.allocate_lock()
            self.thread = _thread.start_new_thread(self._http_handler, ())
    
        def __str__(self):
            server_info = f"""
                device ip = {self.address}
                device mac = {self.mac}
                open the URL provided below in your browser \n http://{self.address}/"""
            return server_info
    
        def get(self,route:str):
            def decorate_get_api(func):
                @functools.wraps(func)
                def decorated_get_func(*args, **kwargs):
                    # pre-processing 
                    ret_val = func(*args, **kwargs)
    
                    return ret_val
                with self.lock:
                    self.routes[route] = decorated_get_func
                return decorated_get_func
            return decorate_get_api
    
        def _http_handler(self):
    
            while True:
    
                with self.lock: # acquire the lock and release it when done.
                    for route in self.routes:
                        print(f"route {route} is mapped to {self.routes[route].__name__}. result:"+ self.routes[route]())
    
                time.sleep(1)

    Handling API Requests

    Now the general idea is listening to a port specified in the constructor, receiving requests in the thread loop, parsing the incoming HTTP requests, calling the corresponding routine defined the in the routes dictionary, and eventually returning the JSON result.

    The most basic way is listening to a TCP connection using the micrpython implementation of the python low-level networking module named socket, and then parsing the receiving packets.

    To do so, we modify the construction function to receive the port number we intend the server to listen to. Next, we create a socket object and bind it to the specified port in the constructor.

    addr = socket.getaddrinfo("0.0.0.0", self.port)[0][-1]
    self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)        
    self.socket.bind(addr)
    self.socket.listen(1)

    Note that passing the “0.0.0.0” IP address to the bind() function means that the server can receive connections from any possible IPv4 addresses. Also, we passed the socket.AF_INET and socket.SOCK_STREAM to the socket() factory function respectively to create an IPV4 socket with a TPC protocol connection. The integer value passed to the listen() function, defines the concurrent connections capacity of the socket.

    After setting up the socket connection, now we can accept() connections in a loop inside the http_handler() function we had before.

    while True:
        try:
            client_socket, client_addr = self.socket.accept()                
            print('client connected from', client_addr)
            testjson = {'userId':'1'}
            response = json.dumps(testjson)
            client_socket.send('HTTP/1.0 200 OK\r\nContent-type: application/json\r\n\r\n')
            client_socket.send(response)
            client_socket.close()
            print("client closed!")
        except OSError as e:
            print(e)

    The accept() function returns a socket to the client along with its address which should be used to send back the results to the client. In this example, we created a dictionary and converted it to a JSON string, and sent the result through the client socket. If we run the project and click on the server URL provided in the terminal, we can expect the following JSON result in the browser:

    API server running on http://10.0.0.67:80
    client connected from ('10.0.0.30', 55780)
    client closed!
    {"userId": "1"}

    This solution is quite clear, but since the MicroPython _thread module is in experimental development, it’s not reliable. If you run the code above, the socket inside the thread function works just once and after that, it will freeze! I was struggling with this issue for several hours today and eventually found out there is an open issue in MicroPython GitHub for this problem. Let’s cross our fingers to see a bug fix in a near future.

    To be able to continue this project, I commented out the threading-related code and added a function called Run(). Within that, called the _http_handler() function which contains the main loop.

    Now, we have a very basic working structure for the providing WebApi on the RasbperyPi Pico, let’s get to next step, which is developing a simple HTTP request parser to be used for our goal.

    A Simple HTTP Request Parser

    First of all we should take a look at the anatomy of a HTTP request. Each request has three main sections as follows.

    • Request LineGenerally, this line is made of HTTP Method + URI + HTTP Protocol. HTTP Method defines the type of a request, whether it is a GETPOSTPUT, or DELETE request. After the type, it contains the URI of the request, which corresoponds to the route we have defined in the server. And Finaly, the HTTP protocol version which can be one of the HTTP/1.1HTTP/2 , or HTTP/3. For example the request line to get the root route would be like:GET / HTTP/1.1\r\n
  • Python in Depth – Decorators

    ·

    I’m planning to do a research project which needs reading an analog sensor using Raspberry Pi Pico W and feed it to a TinyML model, store the results and provide a RESTFUL api for data transfer. To have a faster development process, I have decided to give MicroPython a try for this project.

    There would be several examples out there about implementing a tiny webserver on rasberypi pico. But, unfortunately, I couldn’t find a framework like FastApi, or Django to make web api development easily possible on micropython.

    Accordingly, I decided to start writing a Web API Framework with Micropython on Raspberry Pi Pico W from scratch, and in the meanwhile, tackle a bit more with advanced python software development topics.

    This work is inspired by FastAPI framework which by now is the most popular python framework for RESTFUL api development. So, let’s briefly take a look at a very simple FastAPI example provided in its website:

    from typing import Union
    from fastapi import FastAPI
    
    app = FastAPI()
    
    @app.get("/")
    def read_root():
        return {"Hello": "World"}
    
    @app.get("/items/{item_id}")
    def read_item(item_id: int, q: Union[str, None] = None):
        return {"item_id": item_id, "q": q}

    Ok, what’s going on here?

    FastAPI module is imported, an object from FastAPI class instantiated, two functions are defined to return a dictionary, and the route to those functions are defined by @app.get() decorator. Bravo! Such a neat design!

    What is a Decorator in python?

    The key element that let us design this way, is the decorators in python. In simple words, a decorator is a function that receives a function or a class and change its behavior without modifying the function itself!

    Let’s see what does that mean in action. We have a function that prints the current time:

    from datetime import datetime
    
    def get_time():
    
        print(datetime.now().strftime('%H:%M:%S'))
    
    get_time()
    17:59:25

    Now consider having another function which receives a function as an argument and prints the phrase “current time is:” before calling it.

    def decorate_time(func):
        print("current time is:")
        func()
    
    decorate_time(get_time)
    current time is:
    17:59:25

    It does the job we have expected, but the problem is if we call the get_time() function again it will only show time. What we want this behavior attached to the original function. To work around this issue, we can define a funtion inside the decorate_time() function and return the new function.

    from datetime import datetime
    
    def get_time():
    
        print(datetime.now().strftime('%H:%M:%S'))
    
    def decorate_time(func):
        def add_phrase():
            print("current time is:")
            func()
        return add_phrase
    
    get_time = decorate_time(get_time)
    get_time()
    current time is:
    18:15:02

    Infact, we called the original function in another function’s definition and replaced it with the old definition. What we have done here, is the essence of python decorators. To make the use of decorators more pleasant, in python we can use @ with the decorators name on top the original function definition.

    @decorate_time
    def get_time():
    
        print(datetime.now().strftime('%H:%M:%S'))
    
    get_time()
    current time is:
    18:43:44

    Python decorators with arguments and returning values

    So far so good. But we still have more to do to reach the same design. In the fastapi example, we have seen that the original function could have input parameters. Let’s define the function get_time() to receive an integer value which is then subtracted from the current hour and printed.

    from datetime import datetime, timedelta
    
    def get_time(hours:int):
        new_time = datetime.now() - timedelta(hours=hours) 
        print(new_time.strftime('%H:%M:%S'))
    
    get_time(1)
    18:38:38

    Now if we add the decorator we have defined before, we are gonna encouter with an error:

    TypeError: decorate_time.<locals>.add_phrase() takes 0 positional arguments but 1 was given

    Yes, since the function definition inside the decorator does not receive any arguments in its definition, it led to this error. To solve the issue, we have to define the inner function such that it can receive any number of parameters with any type. It’s done by adding *args, **kwargs to the inner function definition and then pass them to the original function call.

    As a reminder I should remind that * operator is used for packing and unpacking lists in python. In a function definition it means pack the arguments, and in function calls it unpacking the list and passing to the function. In a same way, ** operator is used for packing/unpacking dictionaries in function definition and calling.

    Another thing to be concered of is how the original function name as you can see in the error message is changed now to decorate_time.add_phrase(). To preseve the original function name and docstring, we have to use a built-in python decorator @functools.wraps() available in functools module.

    from datetime import datetime, timedelta
    import functools
    
    def decorate_time(func):
        @functools.wraps(func)
        def add_phrase(*args, **kwargs):
            print("current time is:")
            func(*args, **kwargs)
        return add_phrase
    
    @decorate_time
    def get_time(hours:int):
    
        new_time = datetime.now() - timedelta(hours=hours) 
        print(new_time.strftime('%H:%M:%S'))
    
    get_time(1)
    18:47:08

    Great! Now we can pass parameters to the decorated functions. What if we need to return a value instead of printing it? With the same logic, the decorator inner function should also return the value returned by the original function.

    from datetime import datetime, timedelta
    import functools
    
    def decorate_time(func)            
        @functools.wraps(func):
        def add_phrase(*args, **kwargs):
            print("current time is:")
            ret_val = func(*args, **kwargs)
            return ret_val
        return add_phrase
    
    @decorate_time
    def get_time(hours:int):
    
        return datetime.now() - timedelta(hours=hours) 
    
    newtime = get_time(1) 
    print(newtime.strftime('%H:%M:%S')) 
    19:04:39

    Awesome! What if we need to pass an argument to the decorator itself? I mean like the way we can pass the route to the decorator in fastapi.

    @app.get("/items/{item_id}")

    Let’s give it a try.

    @decorate_time("/")
    def get_time(hours:int):
    
        return datetime.now() - timedelta(hours=hours) 
    
    newtime = get_time(1) 
    print(get_time)   
    TypeError: 'str' object is not callable

    Why the code ends up calling a string object? Infact, the decorator @ syntactic sugar will translate into something like this:

    get_time = decorate_time(get_time)

    thus, if we pass an argument to the decorator python will translate it to:

    get_time = decorate_time("/")(get_time)

    That’s why we got an error. Instead of receiving a function object we passed a string to the decorator and it ended up calling a string object! To work aroud this issue, we wrap the entire decorator in another decorator!

    from datetime import datetime, timedelta
    import functools
    
    def outer_decorator(route):
        print(f"The route is: {route}")
        def decorate_time(func):
            @functools.wraps(func)
            def add_phrase(*args, **kwargs):
                print("current time is:")
                ret_val = func(*args, **kwargs)
                return ret_val
            return add_phrase
        return decorate_time
    
    @outer_decorator("/")
    def get_time(hours:int):
    
        return datetime.now() - timedelta(hours=hours) 
    
    newtime = get_time(1) 
    print(get_time)    

    Now, as before, the decorator actually translate to:

    get_time = outer_decorator("/")(get_time)

    But this time, outer_decorator returns another function which is the previous decorator, and then the it will get called.

    The route is: /
    current time is:
    <function get_time at 0x000002366A209AB0>

    Works like a charm! Note that @functools.wraps is still successfully preserve the original function name and docstring.

    Python decorators and classes

    Now we decorated a function and it’s exactly doing the job it was supposed to do. There is still something missing. In the fastapi example, functions are decorated using the @ with the object name followed by a . and the name of the deocrator.

    We can have different scenarios with decorators and classes. The decorators can easily be used on a class method definition just like we did on global functions. Let’s assume we have class named PyTime as below:

    from datetime import datetime, timedelta
    
    class PyTime:
        def __init__(self):
            pass
    
        def get_time(self, hours:int):
            return datetime.now() - timedelta(hours=hours)
    
    pytime = PyTime()
    newtime = pytime.get_time(1)
    print(newtime)
    2023-01-04 23:35:40.229966

    We can simply decorate it using the decorator we have defined before.

    from datetime import datetime, timedelta
    import functools
    
    def outer_decorator(route):
        print(f"The route is: {route}")
        def decorate_time(func):
            @functools.wraps(func)
            def add_phrase(*args, **kwargs):
                print("current time is:")
                ret_val = func(*args, **kwargs)
                return ret_val
            return add_phrase
        return decorate_time
    
    class PyTime:
        def __init__(self):
            pass
        @outer_decorator("/")
        def get_time(self, hours:int):
            return datetime.now() - timedelta(hours=hours)
    
    pytime = PyTime()
    newtime = pytime.get_time(1)
    print(newtime)
    The route is: /
    current time is:
    2023-01-04 23:38:06.261602

    Note that python also provides several built-in decorators like @property@classmethod, and @staticmethod to be used in decorating class methods.

    The other case is using a class itself as a decorator. Basically, when we instantiate a class, we are calling its constructor fucntion. Now, what if we pass the function we intend to decorate to the class constructor function __init__ ? We can then store the function in the class. Then by implementing __call__ function in the class, it’s possible to make the object callable. That’s all we need to to make a class act as a decorator. The only difference is to preserve the function name and docstring, instead of using the @functools.wraps decorator, we have explicitly call functools.update_wrapper(self, func).

    from datetime import datetime, timedelta
    import functools
    
    class TimeDecorator:    
        def __init__(self, func):
            functools.update_wrapper(self, func)
            self.func = func
    
        def __call__(self, *args, **kwargs):
            print("current time is:")
            return self.func(*args, **kwargs)
    
    @TimeDecorator  # equivalant to obj = TimeDecorator(get_time)
    def get_time(hours:int=0):
            return datetime.now() - timedelta(hours=hours)    
    
    newtime = get_time(1)
    print(newtime)
    current time is:
    2023-01-09 14:46:53.283555

    Using a class method as a decorator

    Yet we couldn’t find the proper design choice for our purpose. FastApi is creating an intance and use a method of that instance to decorate api routines. So, let’s give it a try by writing a class called MicroFastApi as follows.

    class MicroFastApi:
    
        def __init__(self, ip, mac,):
            self.routes = {}
            self.address = ip
            self.mac = mac
    
        def __str__(self):
            server_info = f"""
                device ip = {self.address}
                device mac = {self.mac}
                open the URL provided below in your browser \n http://{self.address}/"""
            return server_info

    then for the http GET method, we add a new function called get. This function should receive a string to the corresponding route it supposed to get called from.

    It’s quite important to comprehend the underlying mechanics. The final goal is to decorate a function called for example index() or sensors(userId:int=None) like this:

    @app.get('/')
    def index():
        return "root"
    
    @app.get('/sensors/')
    def sensors(userId:int=None):
        return f"sensor data for userId={userId}"

    Since we need to feed the top-level decorator with an string input, we have to implement a nested decorator as discussed before in this post. The decorations methods will translate to:

    index = app.get('/')(index)
    sensors = app.get('/sensors/')(sensors)

    Note that the functions could have parameters and return values which means that we have pack/unpack parameters in the inner decorator function, and also return the values of the original fucntions. The decorator function is gonna be something like this:

        def get(self,route:str):
            def decorate_get_api(func):
                @functools.wraps(func)
                def decorated_get_func(*args, **kwargs):
                    # pre-processing 
                    ret_val = func(*args, **kwargs)
                    # post-processing 
                    return ret_val
                self.routes[route] = decorated_get_func
                return decorated_get_func
            return decorate_get_api

    Need to mention that to have @functools in MicroPython you should install micropython-functools using pip:

    pip install micropython-functools

    If you use Thonny for development on MicroPython, you can use the manage packages option from tools menu to easily install any python package.

    The final class is defined a below:

    import functools #pip install micropython-functools
    class MicroFastApi:
    
        def __init__(self, ip, mac,):
            self.routes = {}
            self.address = ip
            self.mac = mac
    
        def __str__(self):
            server_info = f"""
                device ip = {self.address}
                device mac = {self.mac}
                open the URL provided below in your browser \n http://{self.address}/"""
            return server_info
    
        def get(self,route:str):
            def decorate_get_api(func):
                @functools.wraps(func)
                def decorated_get_func(*args, **kwargs):
                    # pre-processing 
                    ret_val = func(*args, **kwargs)
                    # post-processing 
                    return ret_val
                self.routes[route] = decorated_get_func
                return decorated_get_func
            return decorate_get_api
    
        def run():
            pass

    To verify if everything is working as expected:

    from microfastapi import MicroFastApi
    from wifi import Wifi
    from secrets import *
    
    wifihandle = Wifi(ssid = ssid, password = password)
    
    if not wifihandle.connect():
        raise Exception("Connection failed!")
    
    app = MicroFastApi(ip= wifihandle.ip, mac= wifihandle.mac)
    
    @app.get('/')
    def index():
        return "root"    
    
    @app.get('/sensors/')
    def sensors(userId:int=None):
        return f"sensor data for userId={userId}"
    
    print(app)
    for route in app.routes:
        print(f"route {route} is mapped to {app.routes[route].__name__}. result:"+ app.routes[route]())
    Connecting to: 1122 apt 1224
    Waiting for WiFi connection...
    Connected: True
    
                device ip = 10.0.0.65
                device mac = xx:xx:xx:xx:xx:xx
                open the URL provided below in your browser 
     http://10.0.0.65/
    route / is mapped to decorated_get_func. result:root
    route /sensors/ is mapped to decorated_get_func. result:sensor data for userId=None

    Great! Now, we have the basic design of the web server on our tiny shiny Rasbpery PI Pico W!