improved code style

This commit is contained in:
p.loedige 2021-01-07 19:28:34 +01:00
parent 80afbd6e47
commit a295ac644b
3 changed files with 71 additions and 25 deletions

55
app.py
View File

@ -6,26 +6,34 @@ import RPi.GPIO as GPIO
import shutil import shutil
import os import os
#local imports
from xml_reader import Xml_reader from xml_reader import Xml_reader
from interface_handler import Interface_handler from interface_handler import Interface_handler
# setup xml_reader and interface_handler
dir = os.path.dirname(os.path.abspath(__file__)) dir = os.path.dirname(os.path.abspath(__file__))
xml_reader = Xml_reader(str(dir + '/XML/config.xml'),str(dir + '/XML/config.xsd')) xml_reader = Xml_reader(str(dir + '/XML/config.xml'),
str(dir + '/XML/config.xsd'))
interface_handler = Interface_handler(xml_reader) interface_handler = Interface_handler(xml_reader)
device_names = xml_reader.get_device_names()
app = FlaskAPI(__name__) app = FlaskAPI(__name__)
@app.route('/', methods=["GET"]) @app.route('/', methods=["GET"])
def api_root(): def api_root():
return { """method for the root of the API
"device_url": request.url + "device/<device_name>",
"device_url_POST": {"output": "<value>"} Returns:
} html: a welcom message
"""
return "<h1>Success. You have reached the root of the APED API.</h1>"
@app.route('/XML/', methods=["GET","POST"]) @app.route('/XML/', methods=["GET","POST"])
def xml_transfer(): def xml_transfer():
"""method for transfering the config.xml files to and from the raspberry
Returns:
the config.xml or the occured errors
"""
try: try:
#return the current config.xml #return the current config.xml
if request.method == "GET": if request.method == "GET":
@ -40,9 +48,12 @@ def xml_transfer():
tmp_file.write(data) tmp_file.write(data)
tmp_file.close() tmp_file.close()
#check config.xml against the XSD file #check config.xml against the XSD file
if xml_reader.validate('/tmp/aped_device/config.xml',str(dir + '/XML/config.xsd')): if xml_reader.validate('/tmp/aped_device/config.xml',
shutil.move(str(dir + '/XML/config.xml'),str(dir + '/XML/config_old.xml')) str(dir + '/XML/config.xsd')):
shutil.move('/tmp/aped_device/config.xml',str(dir + '/XML/config.xml')) shutil.move(str(dir + '/XML/config.xml'),
str(dir + '/XML/config_old.xml'))
shutil.move('/tmp/aped_device/config.xml',
str(dir + '/XML/config.xml'))
xml_reader.set_root(str(dir + '/XML/config.xml')) xml_reader.set_root(str(dir + '/XML/config.xml'))
interface_handler.init_gpio() interface_handler.init_gpio()
else: else:
@ -50,13 +61,33 @@ def xml_transfer():
return {'error': ''} return {'error': ''}
except Exception as e: except Exception as e:
return {'error' : str(e)} return {'error' : str(e)}
@app.route('/device/', methods=["GET"])
def api_general_device_info():
"""method to read all the available device names
Returns:
JSON: the available device names
"""
return {"device_names": str(xml_reader.get_device_names())}
@app.route('/device/<device_name>/', defaults={'buffering': None}, methods=["GET", "POST"]) @app.route('/device/<device_name>/', defaults={'buffering': None},
methods=["GET", "POST"])
@app.route('/device/<device_name>/<buffering>/', methods=["GET", "POST"]) @app.route('/device/<device_name>/<buffering>/', methods=["GET", "POST"])
def api_devices(device_name: str, buffering: str): def api_devices(device_name: str, buffering: str):
"""the method for reading and writing to and from the connected devices
Args:
device_name (str): the name of the relevant device
buffering (str): whether or not the buffer of the device is requested
Returns:
the relevant information about the device in JSON format
"""
try: try:
if request.method == "POST": if request.method == "POST":
response = interface_handler.write(device_name,request.data.get("output")) response = interface_handler.write(device_name,
request.data.get("output"))
if buffering == 'buffer': if buffering == 'buffer':
return {'buffer': interface_handler.read_buffer(device_name)} return {'buffer': interface_handler.read_buffer(device_name)}
else: else:

View File

@ -74,7 +74,10 @@ class Interface_handler:
port_info = self.xml_reader.get_port(device_name) port_info = self.xml_reader.get_port(device_name)
protocol = port_info["protocol"] protocol = port_info["protocol"]
pins = port_info["pins"] pins = port_info["pins"]
frequency = int(port_info["frequency"]) if port_info["frequency"] != '' else 1 if port_info["frequency"] != '':
frequency = int(port_info["frequency"])
else:
frequency = 1
#setup pins #setup pins
for pin in pins: for pin in pins:
@ -87,7 +90,8 @@ class Interface_handler:
pwm = GPIO.PWM(self.Pin[pin].value,frequency) pwm = GPIO.PWM(self.Pin[pin].value,frequency)
self.pwms[self.Pin[pin].value] = pwm self.pwms[self.Pin[pin].value] = pwm
else: else:
raise NotImplementedError('the protocol of the device %s is not implemented' % device_name) raise NotImplementedError('the protocol of the'
+ 'device %s is not implemented' % device_name)
def init_buffer(self): def init_buffer(self):
"""initializes the buffer thread """initializes the buffer thread
@ -123,7 +127,8 @@ class Interface_handler:
dict: the buffer of the device dict: the buffer of the device
""" """
if self.xml_reader.get_port(device_name) == None: if self.xml_reader.get_port(device_name) == None:
raise NameError('the device with the name %s is unknown' % device_name) raise NameError('the device with the name %s is unknown'
% device_name)
returnValues = dict() returnValues = dict()
port_info = self.xml_reader.get_port(device_name) port_info = self.xml_reader.get_port(device_name)
protocol = port_info["protocol"] protocol = port_info["protocol"]
@ -134,7 +139,8 @@ class Interface_handler:
pin : str(self.buffers[self.Pin[pin].value]) pin : str(self.buffers[self.Pin[pin].value])
}) })
else: #throw error for devices without buffer else: #throw error for devices without buffer
raise ValueError("device %s does not have a buffer" % device_name) raise ValueError("device %s does not have a buffer"
% device_name)
return returnValues return returnValues
@ -153,7 +159,8 @@ class Interface_handler:
""" """
#check device_name is valid #check device_name is valid
if self.xml_reader.get_port(device_name) == None: if self.xml_reader.get_port(device_name) == None:
raise NameError('the device with the name %s is unknown' % device_name) raise NameError('the device with the name %s is unknown'
% device_name)
returnValues = dict() returnValues = dict()
port_info = self.xml_reader.get_port(device_name) port_info = self.xml_reader.get_port(device_name)
protocol = port_info["protocol"] protocol = port_info["protocol"]
@ -168,7 +175,8 @@ class Interface_handler:
pin : str(self.pwm_dutycycles[self.Pin[pin].value]) pin : str(self.pwm_dutycycles[self.Pin[pin].value])
}) })
else: #throw error for protocols without read functionality else: #throw error for protocols without read functionality
raise ValueError("you can not read from device %s" % device_name) raise ValueError("you can not read from device %s"
% device_name)
return returnValues return returnValues
@ -190,7 +198,8 @@ class Interface_handler:
#check device_name is valid #check device_name is valid
if self.xml_reader.get_port(device_name) == None: if self.xml_reader.get_port(device_name) == None:
raise NameError('the device with the name %s is unknown' % device_name) raise NameError('the device with the name %s is unknown'
% device_name)
port_info = self.xml_reader.get_port(device_name) port_info = self.xml_reader.get_port(device_name)
protocol = port_info["protocol"] protocol = port_info["protocol"]
pins = port_info["pins"] pins = port_info["pins"]
@ -198,10 +207,12 @@ class Interface_handler:
if protocol == "DO": if protocol == "DO":
#type check input value #type check input value
if not isinstance(output,dict): if not isinstance(output,dict):
raise TypeError("value must be a dictionary of GPIOs and values") raise TypeError(
"value must be a dictionary of GPIOs and values")
for pin in output: for pin in output:
if not pin in self.xml_reader.get_port(device_name)["pins"]: if not pin in self.xml_reader.get_port(device_name)["pins"]:
raise ValueError(str("pin %s is not a part of device %s",pin,device_name)) raise ValueError(str("pin %s is not a part of device %s",
pin,device_name))
if not output[pin] in ['0','1']: if not output[pin] in ['0','1']:
raise TypeError("value must be 1 or 0") raise TypeError("value must be 1 or 0")
value = GPIO.HIGH if output[pin] == '1' else GPIO.LOW value = GPIO.HIGH if output[pin] == '1' else GPIO.LOW
@ -210,14 +221,17 @@ class Interface_handler:
#PWM write #PWM write
elif protocol == "PWM": elif protocol == "PWM":
if not isinstance(output,dict): if not isinstance(output,dict):
raise TypeError("value must be a dictionary of GPIOs and values") raise TypeError(
"value must be a dictionary of GPIOs and values")
for pin in output: for pin in output:
if not pin in self.xml_reader.get_port(device_name)["pins"]: if not pin in self.xml_reader.get_port(device_name)["pins"]:
raise ValueError(str("pin %s is not a part of device %s",pin,device_name)) raise ValueError(str("pin %s is not a part of device %s",
pin,device_name))
dutycycle = int(output[pin]) dutycycle = int(output[pin])
self.pwm_dutycycles[self.Pin[pin].value] = dutycycle self.pwm_dutycycles[self.Pin[pin].value] = dutycycle
if not dutycycle in range(0,101,1): if not dutycycle in range(0,101,1):
raise TypeError("value must be a whole number between 0 and 100") raise TypeError(
"value must be a whole number between 0 and 100")
self.pwms[self.Pin[pin].value].stop() self.pwms[self.Pin[pin].value].stop()
self.pwms[self.Pin[pin].value].start(dutycycle) self.pwms[self.Pin[pin].value].start(dutycycle)
return output return output

View File

@ -82,7 +82,8 @@ class Xml_reader:
"Device[@name='%s']/Port" % device_name) "Device[@name='%s']/Port" % device_name)
return { return {
'protocol': port.get('protocol'), 'protocol': port.get('protocol'),
'frequency': port.get('frequency') if port.get('frequency') != None else '', 'frequency':
port.get('frequency') if port.get('frequency') != None else '',
'pins': [ 'pins': [
pin.text for pin in self.root.findall( pin.text for pin in self.root.findall(
"Device[@name='%s']/Port/Pin" % device_name "Device[@name='%s']/Port/Pin" % device_name