added some documentation
This commit is contained in:
parent
682cea9b59
commit
80afbd6e47
6
API
6
API
@ -1,6 +0,0 @@
|
|||||||
Beispielaufrufe:
|
|
||||||
|
|
||||||
.../ # gibt xml config zurück
|
|
||||||
.../ file=config.xml # xml config hochladen
|
|
||||||
.../temp1
|
|
||||||
.../motor1 activate=true
|
|
@ -4,8 +4,8 @@ import sys
|
|||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
#local imports
|
||||||
sys.path.append('/home/pi/aped_device')
|
sys.path.append('/home/pi/aped_device')
|
||||||
|
|
||||||
from xml_reader import Xml_reader
|
from xml_reader import Xml_reader
|
||||||
from ring_buffer import Ring_buffer
|
from ring_buffer import Ring_buffer
|
||||||
|
|
||||||
@ -48,7 +48,7 @@ class Interface_handler:
|
|||||||
GPIO_27 = 13
|
GPIO_27 = 13
|
||||||
|
|
||||||
def __init__(self, xml_reader: Xml_reader):
|
def __init__(self, xml_reader: Xml_reader):
|
||||||
"""inits the interface_handler
|
"""initializes the interface_handler
|
||||||
"""
|
"""
|
||||||
#type check xml_reader
|
#type check xml_reader
|
||||||
if not isinstance(xml_reader, Xml_reader):
|
if not isinstance(xml_reader, Xml_reader):
|
||||||
@ -63,9 +63,8 @@ class Interface_handler:
|
|||||||
self.init_gpio()
|
self.init_gpio()
|
||||||
self.init_buffer()
|
self.init_buffer()
|
||||||
|
|
||||||
|
|
||||||
def init_gpio(self):
|
def init_gpio(self):
|
||||||
"""initialises the GPIO pins
|
"""initializes the GPIO pins
|
||||||
"""
|
"""
|
||||||
for pwm in self.pwms:
|
for pwm in self.pwms:
|
||||||
if pwm != None:
|
if pwm != None:
|
||||||
@ -91,8 +90,12 @@ class Interface_handler:
|
|||||||
raise NotImplementedError('the protocol of the device %s is not implemented' % device_name)
|
raise NotImplementedError('the protocol of the device %s is not implemented' % device_name)
|
||||||
|
|
||||||
def init_buffer(self):
|
def init_buffer(self):
|
||||||
|
"""initializes the buffer thread
|
||||||
|
"""
|
||||||
|
#stop the buffer thread
|
||||||
if self.buffer_thread != None:
|
if self.buffer_thread != None:
|
||||||
self.buffer_thread.stop()
|
self.buffer_thread.stop()
|
||||||
|
#create the buffers
|
||||||
for device_name in self.xml_reader.get_device_names():
|
for device_name in self.xml_reader.get_device_names():
|
||||||
port_info = self.xml_reader.get_port(device_name)
|
port_info = self.xml_reader.get_port(device_name)
|
||||||
pins = port_info["pins"]
|
pins = port_info["pins"]
|
||||||
@ -101,27 +104,37 @@ class Interface_handler:
|
|||||||
for pin in pins:
|
for pin in pins:
|
||||||
self.buffers[self.Pin[pin].value] = Ring_buffer(buffer_size)
|
self.buffers[self.Pin[pin].value] = Ring_buffer(buffer_size)
|
||||||
self.buffer_device_names += [device_name]
|
self.buffer_device_names += [device_name]
|
||||||
|
#start the buffer thread
|
||||||
if len(self.buffer_device_names) > 0:
|
if len(self.buffer_device_names) > 0:
|
||||||
self.buffer_thread = threading.Thread(target=self.buffering)
|
self.buffer_thread = threading.Thread(target=self.buffering)
|
||||||
self.buffer_thread.start()
|
self.buffer_thread.start()
|
||||||
|
|
||||||
def read_buffer(self, device_name:str) -> dict:
|
def read_buffer(self, device_name:str) -> dict:
|
||||||
|
"""reads the current buffer content for a given device
|
||||||
|
|
||||||
|
Args:
|
||||||
|
device_name (str): the name of the device
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
NameError: the device cannot be found in the config.xml
|
||||||
|
ValueError: the device has no buffer
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict: the buffer of the device
|
||||||
|
"""
|
||||||
if self.xml_reader.get_port(device_name) == None:
|
if self.xml_reader.get_port(device_name) == None:
|
||||||
raise NameError('the device with the name %s is unknown' % device_name)
|
raise NameError('the device with the name %s is unknown' % device_name)
|
||||||
|
|
||||||
returnValues = dict()
|
returnValues = dict()
|
||||||
|
|
||||||
port_info = self.xml_reader.get_port(device_name)
|
port_info = self.xml_reader.get_port(device_name)
|
||||||
protocol = port_info["protocol"]
|
protocol = port_info["protocol"]
|
||||||
pins = port_info["pins"]
|
pins = port_info["pins"]
|
||||||
|
|
||||||
for pin in self.xml_reader.get_port(device_name)["pins"]:
|
for pin in self.xml_reader.get_port(device_name)["pins"]:
|
||||||
if protocol in ["DO","DI","PWM"]:
|
if self.buffers[self.Pin[pin].value] != None:
|
||||||
returnValues.update({
|
returnValues.update({
|
||||||
pin : str(self.buffers[self.Pin[pin].value])
|
pin : str(self.buffers[self.Pin[pin].value])
|
||||||
})
|
})
|
||||||
else: #throw error for protocols without write functionality
|
else: #throw error for devices without buffer
|
||||||
raise ValueError("you can not read from device %s" % device_name)
|
raise ValueError("device %s does not have a buffer" % device_name)
|
||||||
return returnValues
|
return returnValues
|
||||||
|
|
||||||
|
|
||||||
@ -133,6 +146,7 @@ class Interface_handler:
|
|||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
NameError: the device cannot be found in the config.xml
|
NameError: the device cannot be found in the config.xml
|
||||||
|
ValueError: the device does not support reading
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
dict: {'pin': 'value'}
|
dict: {'pin': 'value'}
|
||||||
@ -140,13 +154,10 @@ class Interface_handler:
|
|||||||
#check device_name is valid
|
#check device_name is valid
|
||||||
if self.xml_reader.get_port(device_name) == None:
|
if self.xml_reader.get_port(device_name) == None:
|
||||||
raise NameError('the device with the name %s is unknown' % device_name)
|
raise NameError('the device with the name %s is unknown' % device_name)
|
||||||
|
|
||||||
returnValues = dict()
|
returnValues = dict()
|
||||||
|
|
||||||
port_info = self.xml_reader.get_port(device_name)
|
port_info = self.xml_reader.get_port(device_name)
|
||||||
protocol = port_info["protocol"]
|
protocol = port_info["protocol"]
|
||||||
pins = port_info["pins"]
|
pins = port_info["pins"]
|
||||||
|
|
||||||
for pin in self.xml_reader.get_port(device_name)["pins"]:
|
for pin in self.xml_reader.get_port(device_name)["pins"]:
|
||||||
if protocol in ["DO","DI"]:
|
if protocol in ["DO","DI"]:
|
||||||
returnValues.update({
|
returnValues.update({
|
||||||
@ -156,7 +167,7 @@ class Interface_handler:
|
|||||||
returnValues.update({
|
returnValues.update({
|
||||||
pin : str(self.pwm_dutycycles[self.Pin[pin].value])
|
pin : str(self.pwm_dutycycles[self.Pin[pin].value])
|
||||||
})
|
})
|
||||||
else: #throw error for protocols without write functionality
|
else: #throw error for protocols without read functionality
|
||||||
raise ValueError("you can not read from device %s" % device_name)
|
raise ValueError("you can not read from device %s" % device_name)
|
||||||
return returnValues
|
return returnValues
|
||||||
|
|
||||||
@ -180,11 +191,10 @@ class Interface_handler:
|
|||||||
#check device_name is valid
|
#check device_name is valid
|
||||||
if self.xml_reader.get_port(device_name) == None:
|
if self.xml_reader.get_port(device_name) == None:
|
||||||
raise NameError('the device with the name %s is unknown' % device_name)
|
raise NameError('the device with the name %s is unknown' % device_name)
|
||||||
|
|
||||||
port_info = self.xml_reader.get_port(device_name)
|
port_info = self.xml_reader.get_port(device_name)
|
||||||
protocol = port_info["protocol"]
|
protocol = port_info["protocol"]
|
||||||
pins = port_info["pins"]
|
pins = port_info["pins"]
|
||||||
|
#digital write
|
||||||
if protocol == "DO":
|
if protocol == "DO":
|
||||||
#type check input value
|
#type check input value
|
||||||
if not isinstance(output,dict):
|
if not isinstance(output,dict):
|
||||||
@ -197,6 +207,7 @@ class Interface_handler:
|
|||||||
value = GPIO.HIGH if output[pin] == '1' else GPIO.LOW
|
value = GPIO.HIGH if output[pin] == '1' else GPIO.LOW
|
||||||
GPIO.output(self.Pin[pin].value,value)
|
GPIO.output(self.Pin[pin].value,value)
|
||||||
return output
|
return output
|
||||||
|
#PWM write
|
||||||
elif protocol == "PWM":
|
elif protocol == "PWM":
|
||||||
if not isinstance(output,dict):
|
if not isinstance(output,dict):
|
||||||
raise TypeError("value must be a dictionary of GPIOs and values")
|
raise TypeError("value must be a dictionary of GPIOs and values")
|
||||||
@ -214,7 +225,9 @@ class Interface_handler:
|
|||||||
raise ValueError("you cannot write to device %s" % device_name)
|
raise ValueError("you cannot write to device %s" % device_name)
|
||||||
|
|
||||||
def buffering(self):
|
def buffering(self):
|
||||||
while True:
|
"""method that buffers the device outputs
|
||||||
|
"""
|
||||||
|
while True: #loop forever
|
||||||
for device_name in self.buffer_device_names:
|
for device_name in self.buffer_device_names:
|
||||||
output = self.read(device_name)
|
output = self.read(device_name)
|
||||||
port_info = self.xml_reader.get_port(device_name)
|
port_info = self.xml_reader.get_port(device_name)
|
||||||
|
@ -1,13 +1,31 @@
|
|||||||
class Ring_buffer:
|
class Ring_buffer:
|
||||||
def __init__(self,size):
|
"""manages a list in a ring buffer like manner
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, size: int):
|
||||||
|
"""initializes the ring buffer
|
||||||
|
|
||||||
|
Args:
|
||||||
|
size (int): the size of the ring buffer
|
||||||
|
"""
|
||||||
self.size = size
|
self.size = size
|
||||||
self.data = []
|
self.data = []
|
||||||
|
|
||||||
def append(self, data):
|
def append(self, data):
|
||||||
|
"""appends an element to the ring buffer
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data ([type]): the element to be added
|
||||||
|
"""
|
||||||
self.data.append(data)
|
self.data.append(data)
|
||||||
if len(self.data) == self.size:
|
if len(self.data) == self.size:
|
||||||
self.data.pop(0)
|
self.data.pop(0)
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self) -> str:
|
||||||
|
"""string conversion method for the ring buffer
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: the content of the ring buffer as a string (JSON style)
|
||||||
|
"""
|
||||||
return_str = str(self.data)
|
return_str = str(self.data)
|
||||||
return return_str
|
return return_str
|
@ -1,4 +1,5 @@
|
|||||||
from lxml import etree
|
from lxml import etree
|
||||||
|
|
||||||
class Xml_reader:
|
class Xml_reader:
|
||||||
""" reader for the XML configuration """
|
""" reader for the XML configuration """
|
||||||
|
|
||||||
@ -12,12 +13,16 @@ class Xml_reader:
|
|||||||
Raises:
|
Raises:
|
||||||
SyntaxError: validity of XML file is checked with the XSD file
|
SyntaxError: validity of XML file is checked with the XSD file
|
||||||
"""
|
"""
|
||||||
#check the XML for validity using the XSD file
|
if not self.validate(xml_path,xsd_path): #validate XML file
|
||||||
if not self.validate(xml_path,xsd_path):
|
|
||||||
raise SyntaxError('the XML config file has an invalid syntax')
|
raise SyntaxError('the XML config file has an invalid syntax')
|
||||||
self.set_root(xml_path)
|
self.set_root(xml_path)
|
||||||
|
|
||||||
def set_root(self, xml_path: str):
|
def set_root(self, xml_path: str):
|
||||||
|
"""method to read and store the root element from the XML file
|
||||||
|
|
||||||
|
Args:
|
||||||
|
xml_path (str): the path to the XML file
|
||||||
|
"""
|
||||||
self.root = etree.parse(xml_path).getroot()
|
self.root = etree.parse(xml_path).getroot()
|
||||||
|
|
||||||
def validate(self, xml_path: str, xsd_path: str) -> bool:
|
def validate(self, xml_path: str, xsd_path: str) -> bool:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user