ESP32 JSON Webserver

Den kürzlich begonnenen Webserver, der alle Werte der GPIO Pins in einem einstellbaren Intervall abfragt und dann per REST Schnittstelle zur Verfügung stellt, habe ich erweitert und verbessert.

Zuerst die Sensorenklasse:

# sensors.py
import time

from machine import Pin, Timer

from sensor import Sensor

class Sensors():
    
    def __init__(self):
        self._sensors = {}
        self.add_sensor("_timeset", 0, 0)
        self._timer = Timer(-1)
        self.time_set()
        
    def time_set(self):
        self.get_sensor("_timeset").set_sensor_value(time.gmtime())
        
    def add_sensor(self, sensor_name, pin_sensor, pin_led):
        sensor = Sensor(sensor_name, pin_sensor, pin_led)
        self._sensors[sensor_name] = sensor
    
    def exists(self, sensor_name):
        return sensor_name in self._sensors
        
    def get_sensors(self):
        return self._sensors
        
    def get_sensor(self, sensor_name):
        return self._sensors[sensor_name]
      
    def read_sensor(self, sensor):
        sensor.read_sensor()
            
    def read_sensors_loop(self, t):
        self.time_set()
        for sensor_name in self._sensors:
            self.read_sensor(self._sensors[sensor_name])
            
    def start(self, intervall):
        self._timer.init(freq=intervall, mode=Timer.PERIODIC, callback=self.read_sensors_loop)

Der Sensoren Klasse können alle am ESP32 angeschlossenen Sensoren mit sensors.add_sensor(name, pin, led) hinzugefügt werden. Mit start() kann man in einem einstellbaren Intervall alle registrierten Sensoren abfragen und den aktuellen Wert abspeichern:

from lib.sensors import Sensors

sensors = Sensors()
sensors.add_sensor("rechts", 0, 0)
sensors.add_sensor("links", 0, 0)
sensors.start(100)

Die Sensor Klasse:

# sensor.py

from machine import Pin
import time

class Sensor():
    def __init__(self, sensor_name, pin_sensor=0, pin_led = 0):
        self._sensor_value = 0
        self._sensor_name = sensor_name
        self._pin_sensor = pin_sensor
        self._pin_led = pin_led
        self.blink_led(0)
        self.read_sensor()

    # kann auch manuell gesetzt werden    
    def set_sensor_value(self, sensor_value: bool):
        self._sensor_value = sensor_value
    
    def get_sensor_name(self) -> str:
        return self._sensor_name
    
    def get_sensor_value(self):
        return self._sensor_value
    
    def blink_led(self, on_off):
        if (self._pin_led > 0):
            led=Pin(self._pin_led,Pin.OUT) #create LED object from self._pin_led,Set Pin to output
            if on_off > 0:
                led.on()
            else:
                led.off()
    
    def read_sensor(self):
        if (self._pin_sensor > 0):
            sensor = Pin(self._pin_sensor, Pin.IN)
            sensor_value = sensor.value()
            self.blink_led(sensor_value)
            self.set_sensor_value(sensor_value)

Ein Sensor besitzt einen Namen, eine PIN Nummer und eine LED Nummer, falls man den Status des Sensors mit einer LED anzeigen will, kann diese automatisch mit an / aus reagieren.

Mit 0, 0 können auch weitere Werte, z.B. von anderen Boards veröffentlicht werden, die keinen PIN abfragen.

Der Webserver:

# json_webserver.py

import usocket as socket
import ujson as json
import time

def json_response(c):
    data = {}
    for sensor_name in c.get_sensors():
        data[sensor_name] = c.get_sensor(sensor_name).get_sensor_value()
    html = json.dumps(data)
    return html

def webserver(c):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind(('', 8081))
    s.listen(5)

    while True:
        try:
            conn, addr = s.accept()
            print('Got a connection from %s' % str(addr))
            request = conn.recv(1024)
            conn.settimeout(None)
            request = str(request)
            response = json_response(c)
            conn.send('HTTP/1.1 200 OK\n')
            conn.send('Content-Type: application/json\n')
            conn.send('Connection: close\n\n')
            conn.sendall(response)
            conn.close()
        except OSError as e:
            conn.close()
            print('Connection closed')

Nun muss man das Ganze noch aufrufen:

from sensors import Sensors
from json_webserver import webserver

sensors = Sensors()
sensors.add_sensor("rechts", 0, 0)
sensors.add_sensor("links", 0, 0)
sensors.start(100)

webserver(sensors)

An den Webserver werden alle registrierten Sensoren übergeben. Alle 100ms aktualisiert der ESP32 die Werte der Sensoren an der Schnittstelle.

Die REST Schnittstelle kann man nun mit:

import requests
import json

r = requests.get("http://espressif:8081/")

r.content
result = json.loads(r.content)
result

abfragen und verarbeiten.

Link:

Webinterface aktivieren ESP32

Diese Seite verwendet Cookies, um die Nutzerfreundlichkeit zu verbessern. Mit der weiteren Verwendung stimmen Sie dem zu.

Datenschutzerklärung