indication_driver

asyncio --> threading
This commit is contained in:
paul-loedige 2023-01-19 04:46:43 +00:00
parent b5dc9e3e73
commit 5197c767b0
3 changed files with 44 additions and 69 deletions

View File

@ -1,7 +1,7 @@
import neopixel import neopixel
from time import sleep from time import sleep
import asyncio
from typing import List from typing import List
from threading import Thread
import sys import sys
sys.path.append('.') sys.path.append('.')
@ -16,14 +16,15 @@ class IndicationDriver:
auto_write=False, auto_write=False,
pixel_order=neopixel.GRB pixel_order=neopixel.GRB
) )
self.clear() self._run_loop = True
self.indication_stop_event = asyncio.Event() self._indication_loop = None
self.stop()
def stop_loop(self): def stop(self):
self.indication_stop_event.set() if self._indication_loop:
self._run_loop = False
def clear(self): self._indication_loop.join()
self.pixels.fill((0,0,0)) self.pixels.fill((0,0,0))
self.pixels.show() self.pixels.show()
@ -35,17 +36,15 @@ class IndicationDriver:
raise ValueError raise ValueError
def indicate(self, trash_type: str): def indicate(self, trash_type: str):
self.clear() self.stop()
for idx in self._get_trash_location(trash_type): location = self._get_trash_location(trash_type)
def _indication_loop(self, location: List[int]):
for idx in location:
self.pixels[idx] = (0, 255, 0) self.pixels[idx] = (0, 255, 0)
self.pixels.show() self.pixels.show()
self._run_loop = True
async def indicate_async(self, trash_type: str): while self._run_loop:
self.clear()
self.indication_stop_event.clear()
self.indicate(trash_type)
location = self._get_trash_location(trash_type)
while not self.indication_stop_event.is_set():
for step in range (4): for step in range (4):
for idx in range(location[0]): for idx in range(location[0]):
if idx % 4 == step: if idx % 4 == step:
@ -53,5 +52,8 @@ class IndicationDriver:
else: else:
self.pixels[idx] = (0, 0, 0) self.pixels[idx] = (0, 0, 0)
self.pixels.show() self.pixels.show()
await asyncio.sleep(.1) sleep(.1)
self.clear()
self._indication_loop = Thread(target = _indication_loop, args = (self, location, ))
self._indication_loop.start()
sleep(1)

View File

@ -1,20 +1,12 @@
from time import sleep from time import sleep
import asyncio
import sys import sys
sys.path.append('..') sys.path.append('..')
from TREx.indication_driver import IndicationDriver from TREx.indication_driver import IndicationDriver
async def __main__():
task = asyncio.create_task(indication_driver.indicate_async("BOTTLE"))
await asyncio.sleep(5)
indication_driver.stop_loop()
await task
indication_driver.clear()
indication_driver = IndicationDriver() indication_driver = IndicationDriver()
asyncio.run(__main__())
sleep(2)
indication_driver.indicate("CAN") indication_driver.indicate("CAN")
sleep(2) sleep(5)
indication_driver.clear() indication_driver.indicate("INCOMBUSTIBLE")
sleep(5)
indication_driver.stop()

View File

@ -1,7 +1,6 @@
from time import sleep from time import sleep
import RPi.GPIO as GPIO import RPi.GPIO as GPIO
from mfrc522 import SimpleMFRC522 from mfrc522 import SimpleMFRC522
import asyncio
import sys import sys
sys.path.append('..') sys.path.append('..')
@ -9,33 +8,15 @@ import TREx.config as config
from TREx.indication_driver import IndicationDriver from TREx.indication_driver import IndicationDriver
from TREx.rfid_driver import RFIDDriver from TREx.rfid_driver import RFIDDriver
reader = SimpleMFRC522()
async def __main__():
rfid_driver = RFIDDriver()
indication_driver = IndicationDriver() indication_driver = IndicationDriver()
indication_task = None
while True: while True:
id = await rfid_driver.get_next_id() id, _ = reader.read()
trash_category = None if id in config.RFID_LOOKUP_TABLE.keys():
try:
if indication_task is not None:
indication_driver.stop_loop()
await indication_task
trash_category = config.RFID_LOOKUP_TABLE[id] trash_category = config.RFID_LOOKUP_TABLE[id]
indication_task = asyncio.create_task(indication_driver.indicate_async(trash_category)) print(f"{id}: {trash_category}")
await asyncio.sleep(1) indication_driver.indicate(trash_category)
except KeyError: else:
indication_driver.stop_loop() indication_driver.stop()
print(id)
asyncio.run(__main__())
# while True:
# id, _ = reader.read()
# if id in config.RFID_LOOKUP_TABLE.keys():
# trash_category = config.RFID_LOOKUP_TABLE[id]
# print(f"{id}: {trash_category}")
# indication_driver.indicate(trash_category)
# else:
# print(id)
# sleep(1)
# GPIO.cleanup()