263 lines
9.5 KiB
Python
263 lines
9.5 KiB
Python
from enum import Enum
|
|
import RPi.GPIO as GPIO
|
|
import sys
|
|
import threading
|
|
import time
|
|
|
|
#local imports
|
|
sys.path.append('/home/pi/aped_device')
|
|
from xml_reader import Xml_reader
|
|
from ring_buffer import Ring_buffer
|
|
|
|
# Pause, in seconds, that the buffer-thread code hands to time.sleep().
# NOTE(review): despite the name this is a duration, not a rate.
BUFFER_SPEED = 0.01 #seconds
|
|
|
|
class Interface_handler:
    """Handles the access to the various interfaces (e.g. GPIO).

    Devices, their protocol ("DI", "DO" or "PWM"), their pins and their
    optional buffer size are taken from the given Xml_reader. Devices with
    a non-zero buffer size are sampled by a background thread into one
    Ring_buffer per pin.
    """

    class Pin(Enum):
        """List of all usable pins.

        The member names are the pin names used in the configuration; the
        values are the channel numbers handed to RPi.GPIO, which is
        switched to GPIO.BOARD numbering in init_gpio().
        Enables restricting methods to these pins.
        """
        GPIO_2 = 3
        GPIO_3 = 5
        GPIO_4 = 7
        GPIO_5 = 29
        GPIO_6 = 31
        GPIO_7 = 26
        GPIO_8 = 24
        GPIO_9 = 21
        GPIO_10 = 19
        GPIO_11 = 23
        GPIO_12 = 32
        GPIO_13 = 33
        GPIO_14 = 8
        GPIO_15 = 10
        GPIO_16 = 36
        GPIO_17 = 11
        GPIO_18 = 12
        GPIO_19 = 35
        GPIO_20 = 38
        GPIO_21 = 40
        GPIO_22 = 15
        GPIO_23 = 16
        GPIO_24 = 18
        GPIO_25 = 22
        GPIO_26 = 37
        GPIO_27 = 13

    # Per-pin state lives in lists indexed directly by Pin.value; the
    # largest value is 40, so 41 slots are needed (index 0 stays unused).
    _SLOT_COUNT = 41

    def __init__(self, xml_reader: "Xml_reader"):
        """Initializes the interface_handler.

        Sets up the GPIO pins and starts the buffer thread (if any device
        is configured with a buffer).

        Args:
            xml_reader (Xml_reader): reader giving access to the config

        Raises:
            TypeError: xml_reader is not an Xml_reader instance
        """
        # type check xml_reader
        if not isinstance(xml_reader, Xml_reader):
            raise TypeError('xml_reader must be an instance of XML_reader')

        # initialise interface_handler
        self.xml_reader = xml_reader
        self.pwms = [None] * self._SLOT_COUNT
        self.pwm_dutycycles = [None] * self._SLOT_COUNT
        self.buffers = [None] * self._SLOT_COUNT
        self.buffer_device_names = []
        self.buffer_thread = None
        self.run_buffer = False
        self.init_gpio()
        self.init_buffer()

    def init_gpio(self):
        """Initializes the GPIO pins.

        Stops every PWM created by a previous call, then configures each
        pin of each configured device according to the device protocol.

        Raises:
            NotImplementedError: a device uses an unknown protocol
        """
        # stop and forget PWMs left over from a previous initialisation
        for pwm in self.pwms:
            if pwm is not None:
                pwm.stop()
        # NOTE(review): dropping the old references so a re-init does not
        # keep stale PWM objects around - confirm against RPi.GPIO docs.
        self.pwms = [None] * self._SLOT_COUNT

        GPIO.setmode(GPIO.BOARD)
        for device_name in self.xml_reader.get_device_names():
            port_info = self.xml_reader.get_port(device_name)
            protocol = port_info["protocol"]
            # an empty frequency entry falls back to 1
            if port_info["frequency"] != '':
                frequency = int(port_info["frequency"])
            else:
                frequency = 1

            # setup pins
            for pin in port_info["pins"]:
                channel = self.Pin[pin].value
                if protocol == "DI":
                    GPIO.setup(channel, GPIO.IN)
                elif protocol == "DO":
                    GPIO.setup(channel, GPIO.OUT)
                elif protocol == "PWM":
                    GPIO.setup(channel, GPIO.OUT)
                    self.pwms[channel] = GPIO.PWM(channel, frequency)
                else:
                    raise NotImplementedError(
                        'the protocol of the device %s is not implemented'
                        % device_name)

    def init_buffer(self):
        """Initializes the buffer thread.

        Stops a running buffer thread, rebuilds the ring buffers from the
        configuration and starts a new thread when at least one device
        has a non-zero buffer size.
        """
        # stop the buffer thread and wait until it has finished
        self.run_buffer = False
        if self.buffer_thread is not None:
            self.buffer_thread.join()
        self.run_buffer = True

        # start from a clean slate so that repeated initialisation neither
        # lists a device twice nor keeps buffers of removed devices
        self.buffers = [None] * self._SLOT_COUNT
        self.buffer_device_names = []

        # create the buffers
        for device_name in self.xml_reader.get_device_names():
            pins = self.xml_reader.get_port(device_name)["pins"]
            buffer_size = int(self.xml_reader.get_buffer_size(device_name))
            if buffer_size == 0:
                continue
            for pin in pins:
                self.buffers[self.Pin[pin].value] = Ring_buffer(buffer_size)
            self.buffer_device_names.append(device_name)

        # start the buffer thread
        if self.buffer_device_names:
            self.buffer_thread = threading.Thread(target=self.buffering)
            self.buffer_thread.start()

    def read_buffer(self, device_name: str) -> dict:
        """Reads the current buffer content for a given device.

        Args:
            device_name (str): the name of the device

        Raises:
            NameError: the device cannot be found in the config.xml
            ValueError: the device has no buffer

        Returns:
            dict: pin name -> str() of the buffer belonging to that pin
        """
        port_info = self.xml_reader.get_port(device_name)
        if port_info is None:
            raise NameError('the device with the name %s is unknown'
                            % device_name)

        contents = {}
        for pin in port_info["pins"]:
            ring_buffer = self.buffers[self.Pin[pin].value]
            if ring_buffer is None:  # throw error for devices without buffer
                raise ValueError("device %s does not have a buffer"
                                 % device_name)
            contents[pin] = str(ring_buffer)
        return contents

    def read(self, device_name: str) -> dict:
        """Reads the current value from the pins of one device.

        For "DI"/"DO" devices the pin level is read through GPIO.input;
        for "PWM" devices the last duty cycle stored by write() is
        reported (None if nothing was written yet).

        Args:
            device_name (str): the name of the device

        Raises:
            NameError: the device cannot be found in the config.xml
            ValueError: the device does not support reading

        Returns:
            dict: {'pin': 'value'} with every value converted to str
        """
        # check device_name is valid
        port_info = self.xml_reader.get_port(device_name)
        if port_info is None:
            raise NameError('the device with the name %s is unknown'
                            % device_name)

        protocol = port_info["protocol"]
        values = {}
        for pin in port_info["pins"]:
            channel = self.Pin[pin].value
            if protocol in ("DO", "DI"):
                values[pin] = str(GPIO.input(channel))
            elif protocol == "PWM":
                values[pin] = str(self.pwm_dutycycles[channel])
            else:  # throw error for protocols without read functionality
                raise ValueError("you can not read from device %s"
                                 % device_name)
        return values

    def write(self, device_name: str, output):
        """Writes an output to a given device.

        The whole request is validated before any pin is touched, so a
        rejected request leaves the hardware and the stored duty cycles
        unchanged.

        Args:
            device_name (str): the name of the relevant device
            output (dict): pin name -> value; '0' or '1' for "DO" devices,
                a whole number from 0 to 100 (int() is applied) for "PWM"

        Raises:
            NameError: the device name could not be found in the config.xml
            TypeError: output is not a dict, or a value is not allowed for
                the protocol ('0'/'1' for DO, 0..100 for PWM)
            ValueError: the device is not writable, a pin does not belong
                to the device, or a PWM value cannot be converted to int

        Returns:
            dict: output, if everything worked correctly
        """
        # check device_name is valid
        port_info = self.xml_reader.get_port(device_name)
        if port_info is None:
            raise NameError('the device with the name %s is unknown'
                            % device_name)

        protocol = port_info["protocol"]
        pins = port_info["pins"]
        if protocol == "DO":
            convert = self._to_level
        elif protocol == "PWM":
            convert = self._to_dutycycle
        else:  # throw error for protocols without write functionality
            raise ValueError("you cannot write to device %s" % device_name)

        # type check input value
        if not isinstance(output, dict):
            raise TypeError("value must be a dictionary of GPIOs and values")

        # first pass: validate everything, nothing is written yet
        converted = {}
        for pin in output:
            if pin not in pins:
                raise ValueError("pin %s is not a part of device %s"
                                 % (pin, device_name))
            converted[pin] = convert(output[pin])

        # second pass: apply the validated values
        for pin, value in converted.items():
            channel = self.Pin[pin].value
            if protocol == "DO":
                GPIO.output(channel, GPIO.HIGH if value else GPIO.LOW)
            else:
                pwm = self.pwms[channel]
                pwm.stop()
                pwm.start(value)
                # remembered so that read() can report it
                self.pwm_dutycycles[channel] = value
        return output

    @staticmethod
    def _to_level(raw):
        """Validates a digital value and returns True for '1', False for '0'."""
        if raw not in ('0', '1'):
            raise TypeError("value must be 1 or 0")
        return raw == '1'

    @staticmethod
    def _to_dutycycle(raw):
        """Converts a PWM value with int() and checks it lies in 0..100."""
        dutycycle = int(raw)
        if not 0 <= dutycycle <= 100:
            raise TypeError(
                "value must be a whole number between 0 and 100")
        return dutycycle

    def buffering(self):
        """Method that buffers the device outputs.

        Target of the buffer thread: as long as run_buffer is set, reads
        every buffered device, appends each pin value to the ring buffer
        of that pin and pauses for BUFFER_SPEED seconds per device.
        """
        while self.run_buffer:  # loop
            for device_name in self.buffer_device_names:
                # read() returns one entry per pin of the device
                for pin, value in self.read(device_name).items():
                    self.buffers[self.Pin[pin].value].append(value)
                time.sleep(BUFFER_SPEED)
|
|
|
|
|
|
# ##test
|
|
# interface_handler = Interface_handler(Xml_reader('XML/config.xml','XML/config.xsd'))
|
|
# print(interface_handler.read('example'))
|
|
# print(interface_handler.read('sensorarray'))
|
|
# # print(interface_handler.write('example',"test")) #should throw not writable error