APED_Device/interface_handler.py

143 lines
4.4 KiB
Python

from enum import Enum
import RPi.GPIO as GPIO
import sys
sys.path.append('/home/pi/aped_device')
from xml_reader import Xml_reader
class Interface_handler:
"""handles the access to the various interfaces (e.g. GPIO)
"""
class Pin(Enum):
"""list of all usable pins
enables restricting methods to these pins
"""
GPIO_2 = 3
GPIO_3 = 5
GPIO_4 = 7
GPIO_5 = 29
GPIO_6 = 31
GPIO_7 = 26
GPIO_8 = 24
GPIO_9 = 21
GPIO_10 = 19
GPIO_11 = 23
GPIO_12 = 32
GPIO_13 = 33
GPIO_14 = 8
GPIO_15 = 10
GPIO_16 = 36
GPIO_17 = 11
GPIO_18 = 12
GPIO_19 = 35
GPIO_20 = 38
GPIO_21 = 40
GPIO_22 = 15
GPIO_23 = 16
GPIO_24 = 18
GPIO_25 = 22
GPIO_26 = 37
GPIO_27 = 13
def __init__(self, xml_reader: Xml_reader):
"""inits the interface_handler
"""
#type check xml_reader
if not isinstance(xml_reader, Xml_reader):
raise TypeError('xml_reader must be an instance of XML_reader')
#initialise interface_handler
self.xml_reader = xml_reader
self.init_gpio()
def init_gpio(self):
"""initialises the GPIO pins
"""
GPIO.setmode(GPIO.BOARD)
for device_name in self.xml_reader.get_device_names():
port_info = self.xml_reader.get_port(device_name)
protocol = port_info["protocol"]
pins = port_info["pins"]
#setup pins
for pin in pins:
if protocol == "DI":
GPIO.setup(self.Pin[pin].value,GPIO.IN)
elif protocol == "DO":
GPIO.setup(self.Pin[pin].value,GPIO.OUT)
else:
raise NotImplementedError('the protocol of the device %s is not implemented' % device_name)
def read(self, device_name:str) -> dict:
"""reads the current value from the pins of one device
Args:
device_name (string): the name of the device
Raises:
NameError: the device cannot be found in the config.xml
Returns:
dict: {'pin': 'value'}
"""
#check device_name is valid
if self.xml_reader.get_port(device_name) == None:
raise NameError('the device with the name %s is unknown' % device_name)
returnValues = dict()
port_info = self.xml_reader.get_port(device_name)
protocol = port_info["protocol"]
pins = port_info["pins"]
for pin in self.xml_reader.get_port(device_name)["pins"]:
if protocol == "DI":
returnValues.update({
pin:GPIO.input(self.Pin[pin].value)
})
else: #throw error for protocols without write functionality
raise ValueError("you can not read from device %s" % device_name)
return returnValues
def write(self, device_name:str, value):
"""writes a value to all the pins of a device
Args:
device_name (string): name of the device
value (float): the value to be written
Raises:
NameError: device not found in config.xml
TypeError: value has wrong type
Returns:
value if everything went well, None otherwise
"""
#check device_name is valid
if self.xml_reader.get_port(device_name) == None:
raise NameError('the device with the name %s is unknown' % device_name)
port_info = self.xml_reader.get_port(device_name)
protocol = port_info["protocol"]
pins = port_info["pins"]
for pin in self.xml_reader.get_port(device_name)["pins"]:
if protocol == "DO":
#type check input value
if not value in [GPIO.HIGH,GPIO.LOW]:
raise ValueError("value must be GPIO.HIGH or GPIO.LOW")
GPIO.output(self.Pin[pin].value,value)
return value
else: #throw error for protocols without write functionality
raise ValueError("you cannot write to device %s" % device_name)
# ##test
# interface_handler = Interface_handler(Xml_reader('XML/config.xml','XML/config.xsd'))
# print(interface_handler.read('example'))
# print(interface_handler.read('sensorarray'))
# print(interface_handler.write('example',"test")) #shuld throw not writable error