implemented digital API including error handling when writing
This commit is contained in:
parent
011147bc4f
commit
37cb64a2ab
31
app.py
31
app.py
@ -4,26 +4,35 @@ from flask import request
|
|||||||
from flask_api import FlaskAPI
|
from flask_api import FlaskAPI
|
||||||
import RPi.GPIO as GPIO
|
import RPi.GPIO as GPIO
|
||||||
|
|
||||||
LEDS = {"green": 16, "red": 18}
|
from xml_reader import Xml_reader
|
||||||
GPIO.setmode(GPIO.BOARD)
|
from interface_handler import Interface_handler
|
||||||
GPIO.setup(LEDS["green"], GPIO.OUT)
|
|
||||||
GPIO.setup(LEDS["red"], GPIO.OUT)
|
xml_reader = Xml_reader('XML/config.xml','XML/config.xsd')
|
||||||
|
interface_handler = Interface_handler(xml_reader)
|
||||||
|
|
||||||
|
device_names = xml_reader.get_device_names()
|
||||||
|
|
||||||
app = FlaskAPI(__name__)
|
app = FlaskAPI(__name__)
|
||||||
|
|
||||||
@app.route('/', methods=["GET"])
|
@app.route('/', methods=["GET"])
|
||||||
def api_root():
|
def api_root():
|
||||||
return {
|
return {
|
||||||
"led_url": request.url + "led/(green | red)/",
|
"device_url": request.url + "device/<device_name>",
|
||||||
"led_url_POST": {"state": "(0 | 1)"}
|
"device_url_POST": {"output": "<value>"}
|
||||||
}
|
}
|
||||||
|
|
||||||
@app.route('/led/<color>/', methods=["GET", "POST"])
|
@app.route('/device/<device_name>/', methods=["GET", "POST"])
|
||||||
def api_leds_control(color):
|
def api_leds_control(device_name):
|
||||||
if request.method == "POST":
|
if request.method == "POST":
|
||||||
if color in LEDS:
|
# if color in LEDS:
|
||||||
GPIO.output(LEDS[color], int(request.data.get("state")))
|
# GPIO.output(LEDS[color], int(request.data.get("state")))
|
||||||
return {color: GPIO.input(LEDS[color])}
|
try:
|
||||||
|
response = interface_handler.write(device_name,request.data.get("output"))
|
||||||
|
return {'output': response}
|
||||||
|
except Exception as e:
|
||||||
|
return {'error': str(e)}
|
||||||
|
# return {color: GPIO.input(LEDS[color])}
|
||||||
|
return {'state': interface_handler.read(device_name)}
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
app.run(port=8080,host="0.0.0.0")
|
app.run(port=8080,host="0.0.0.0")
|
||||||
|
@ -95,9 +95,9 @@ class Interface_handler:
|
|||||||
pins = port_info["pins"]
|
pins = port_info["pins"]
|
||||||
|
|
||||||
for pin in self.xml_reader.get_port(device_name)["pins"]:
|
for pin in self.xml_reader.get_port(device_name)["pins"]:
|
||||||
if protocol == "DI":
|
if protocol == "DI" or "DO":
|
||||||
returnValues.update({
|
returnValues.update({
|
||||||
pin:GPIO.input(self.Pin[pin].value)
|
pin : GPIO.input(self.Pin[pin].value)
|
||||||
})
|
})
|
||||||
else: #throw error for protocols without write functionality
|
else: #throw error for protocols without write functionality
|
||||||
raise ValueError("you can not read from device %s" % device_name)
|
raise ValueError("you can not read from device %s" % device_name)
|
||||||
@ -105,19 +105,21 @@ class Interface_handler:
|
|||||||
|
|
||||||
|
|
||||||
def write(self, device_name:str, value):
|
def write(self, device_name:str, value):
|
||||||
"""writes a value to all the pins of a device
|
"""writes an output to a given device
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
device_name (string): name of the device
|
device_name (str): the name of the relevant device
|
||||||
value (float): the value to be written
|
value (<type depends on protocol>): the value to be written
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
NameError: device not found in config.xml
|
NameError: the device name could not be found in the config.xml
|
||||||
TypeError: value has wrong type
|
TypeError: the given Value has the wrong type for the protocol
|
||||||
|
ValueError: the value cannot be written to the device
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
value if everything went well, None otherwise
|
<type depends on protocol>: value if everything worked correctly
|
||||||
"""
|
"""
|
||||||
|
|
||||||
#check device_name is valid
|
#check device_name is valid
|
||||||
if self.xml_reader.get_port(device_name) == None:
|
if self.xml_reader.get_port(device_name) == None:
|
||||||
raise NameError('the device with the name %s is unknown' % device_name)
|
raise NameError('the device with the name %s is unknown' % device_name)
|
||||||
@ -129,10 +131,11 @@ class Interface_handler:
|
|||||||
for pin in self.xml_reader.get_port(device_name)["pins"]:
|
for pin in self.xml_reader.get_port(device_name)["pins"]:
|
||||||
if protocol == "DO":
|
if protocol == "DO":
|
||||||
#type check input value
|
#type check input value
|
||||||
if not value in [GPIO.HIGH,GPIO.LOW]:
|
if not value in ['0','1']:
|
||||||
raise ValueError("value must be GPIO.HIGH or GPIO.LOW")
|
raise TypeError("value must be '0' or '1'")
|
||||||
|
value = GPIO.HIGH if value == '1' else GPIO.LOW
|
||||||
GPIO.output(self.Pin[pin].value,value)
|
GPIO.output(self.Pin[pin].value,value)
|
||||||
return value
|
return GPIO.input(self.Pin[pin].value)
|
||||||
else: #throw error for protocols without write functionality
|
else: #throw error for protocols without write functionality
|
||||||
raise ValueError("you cannot write to device %s" % device_name)
|
raise ValueError("you cannot write to device %s" % device_name)
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user