ESP32 Webserver

Nach der Einrichtung des ESP32 und dem Sammeln erster Erfahrungen, will ich einen Versuch starten die Sensordaten des ESP32 per REST Service im JSON Format bereit zu stellen.

Für Micro Python existieren hierfür bereits einige fertige Bibliotheken:

Das Problem ist hier jedoch, dass diese Bibliotheken sich im besten Fall mehr als 500ms gönnten, bis der Request / Response Zyklus abgeschlossen war. Das mag für eine Tweet-Wall oder ein Dashboard ausreichend sein. Bei der Überwachung von Fahrdaten eines Roboterfahrzeugs ist das jedoch unbrauchbar.

Daher soll für die Ausgabe der Sensordaten des ESP32 ein einfacher GET Request für alles auf einmal dienen.

Zunächst jedoch für die angeschlossenen Sensoren (Zwei Infrarot Abstandssensoren jeweils links und rechts), die zwei Sensoren Klassen:

# sensor.py

from machine import Pin

class Sensor():
    def __init__(self, sensor_name, pin_sensor, pin_led):
        self._sensor_value = 0
        self._sensor_name = sensor_name
        self._pin_sensor = pin_sensor
        self._pin_led = pin_led
        self.blink_led(0)
        self.update_sensor()
        
    def set_sensor_value(self, sensor_value: bool):
        self._sensor_value = sensor_value
    
    def get_sensor_name(self) -> str:
        return self._sensor_name
    
    def get_sensor_value(self) -> bool:
        return self._sensor_value
    
    def blink_led(self, on_off: bool):
        led=Pin(self._pin_led,Pin.OUT) #create LED object from self._pin_led,Set Pin to output
        led.value(int(on_off)) #Set led turn to: on_off
    
    def update_sensor(self):
        sensor = Pin(self._pin_sensor, Pin.IN)
        sensor_value = bool(abs(sensor.value()-1))
        self.blink_led(sensor_value)
        self.set_sensor_value(sensor_value)
# sensors.py

import time

from machine import Pin, Timer

from sensor import Sensor

class Sensors():
    
    def __init__(self):
        self._sensors = {}
        self._timer = Timer(-1)
        
    def add_sensor(self, sensor_name, pin_sensor, pin_led):
        sensor = Sensor(sensor_name, pin_sensor, pin_led)
        self._sensors[sensor_name] = sensor
    
    def exists(self, sensor_name):
        return sensor_name in self._sensors
        
    def get_sensors(self):
        return self._sensors
        
    def get_sensor(self, sensor_name):
        return self._sensors[sensor_name]
      
    def update_sensor(self, sensor):
        sensor.update_sensor()
            
    def update_loop(self, t):
        for sensor_name in self._sensors:
            self.update_sensor(self._sensors[sensor_name])
            
    def start(self, intervall):
        self._timer.init(freq=intervall, mode=Timer.PERIODIC, callback=self.update_loop)

Die Klassen dienen dazu die Sensoren regelmäßig im Intervall auszulesen:

from lib.sensors import Sensors

c = Sensors()
c.add_sensor("rechts", 35, 32)
c.add_sensor("links", 34, 33)
c.start(100) # alle 100 ms die Sensoren auslesen.

Damit kann man beliebig viele Sensoren an das Board anschließen und auswerten.

Der Webserver soll diese nun anderen Nodes im Roboterfahrzeug zur Verfügung stellen:

# home.py

import usocket as socket
import ujson as json
from lib.sensors import Sensors

c = Sensors()
c.add_sensor("rechts", 35, 32)
c.add_sensor("links", 34, 33)
c.start(100)

def json_response():
    data = {}
    for sensor_name in c.get_sensors():
        data[sensor_name] = c.get_sensor(sensor_name).get_sensor_value()
    html = json.dumps(data)
    return html

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(('', 8081))
s.listen(5)

while True:
    try:
        conn, addr = s.accept()
        print('Got a connection from %s' % str(addr))
        request = conn.recv(1024)
        conn.settimeout(None)
        request = str(request)
        response = json_response()
        conn.send('HTTP/1.1 200 OK\n')
        conn.send('Content-Type: application/json\n')
        conn.send('Connection: close\n\n')
        conn.sendall(response)
        conn.close()
    except OSError as e:
        conn.close()
        print('Connection closed')

Die Einrichtung des ESP32 für das lokale Netzwerk habe ich bereits hier beschrieben.

Im Jupyter Notebook kann das Ergebnis schließlich noch überprüft werden:

import requests
import json

r = requests.get("http://espressif:8081")

r.content
result = json.loads(r.content)
result

{‚rechts‘: False, ‚links‘: False}

Response Zeit liegt unter 0,1 Sekunden:

Je nach Typ des Sensors muss die Sensor Klasse natürlich noch angepasst werden.

Diese Seite verwendet Cookies, um die Nutzerfreundlichkeit zu verbessern. Mit der weiteren Verwendung stimmen Sie dem zu.

Datenschutzerklärung