Den kürzlich begonnenen Webserver, der alle Werte der GPIO Pins in einem einstellbaren Intervall abfragt und dann per REST Schnittstelle zur Verfügung stellt, habe ich erweitert und verbessert.
Zuerst die Sensorenklasse:
# sensors.py
import time
from machine import Pin, Timer
from sensor import Sensor
class Sensors():
def __init__(self):
self._sensors = {}
self.add_sensor("_timeset", 0, 0)
self._timer = Timer(-1)
self.time_set()
def time_set(self):
self.get_sensor("_timeset").set_sensor_value(time.gmtime())
def add_sensor(self, sensor_name, pin_sensor, pin_led):
sensor = Sensor(sensor_name, pin_sensor, pin_led)
self._sensors[sensor_name] = sensor
def exists(self, sensor_name):
return sensor_name in self._sensors
def get_sensors(self):
return self._sensors
def get_sensor(self, sensor_name):
return self._sensors[sensor_name]
def read_sensor(self, sensor):
sensor.read_sensor()
def read_sensors_loop(self, t):
self.time_set()
for sensor_name in self._sensors:
self.read_sensor(self._sensors[sensor_name])
def start(self, intervall):
self._timer.init(freq=intervall, mode=Timer.PERIODIC, callback=self.read_sensors_loop)
Der Sensoren Klasse können alle am ESP32 angeschlossenen Sensoren mit sensors.add_sensor(name, pin, led) hinzugefügt werden. Mit start() kann man in einem einstellbaren Intervall alle registrierten Sensoren abfragen und den aktuellen Wert abspeichern:
from lib.sensors import Sensors
sensors = Sensors()
sensors.add_sensor("rechts", 0, 0)
sensors.add_sensor("links", 0, 0)
sensors.start(100)
Die Sensor Klasse:
# sensor.py
from machine import Pin
import time
class Sensor():
def __init__(self, sensor_name, pin_sensor=0, pin_led = 0):
self._sensor_value = 0
self._sensor_name = sensor_name
self._pin_sensor = pin_sensor
self._pin_led = pin_led
self.blink_led(0)
self.read_sensor()
# kann auch manuell gesetzt werden
def set_sensor_value(self, sensor_value: bool):
self._sensor_value = sensor_value
def get_sensor_name(self) -> str:
return self._sensor_name
def get_sensor_value(self):
return self._sensor_value
def blink_led(self, on_off):
if (self._pin_led > 0):
led=Pin(self._pin_led,Pin.OUT) #create LED object from self._pin_led,Set Pin to output
if on_off > 0:
led.on()
else:
led.off()
def read_sensor(self):
if (self._pin_sensor > 0):
sensor = Pin(self._pin_sensor, Pin.IN)
sensor_value = sensor.value()
self.blink_led(sensor_value)
self.set_sensor_value(sensor_value)
Ein Sensor besitzt einen Namen, eine PIN Nummer und eine LED Nummer, falls man den Status des Sensors mit einer LED anzeigen will, kann diese automatisch mit an / aus reagieren.
Mit 0, 0 können auch weitere Werte, z.B. von anderen Boards veröffentlicht werden, die keinen PIN abfragen.
Der Webserver:
# json_webserver.py
import usocket as socket
import ujson as json
import time
def json_response(c):
data = {}
for sensor_name in c.get_sensors():
data[sensor_name] = c.get_sensor(sensor_name).get_sensor_value()
html = json.dumps(data)
return html
def webserver(c):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(('', 8081))
s.listen(5)
while True:
try:
conn, addr = s.accept()
print('Got a connection from %s' % str(addr))
request = conn.recv(1024)
conn.settimeout(None)
request = str(request)
response = json_response(c)
conn.send('HTTP/1.1 200 OK\n')
conn.send('Content-Type: application/json\n')
conn.send('Connection: close\n\n')
conn.sendall(response)
conn.close()
except OSError as e:
conn.close()
print('Connection closed')
Nun muss man das Ganze noch aufrufen:
from sensors import Sensors
from json_webserver import webserver
sensors = Sensors()
sensors.add_sensor("rechts", 0, 0)
sensors.add_sensor("links", 0, 0)
sensors.start(100)
webserver(sensors)
An den Webserver werden alle registrierten Sensoren übergeben. Alle 100ms aktualisiert der ESP32 die Werte der Sensoren an der Schnittstelle.
Die REST Schnittstelle kann man nun mit:
import requests
import json
r = requests.get("http://espressif:8081/")
r.content
result = json.loads(r.content)
result
abfragen und verarbeiten.
Link: