From 5197c767b009c73a576ce629705d16dc49876dca Mon Sep 17 00:00:00 2001 From: ploedige Date: Thu, 19 Jan 2023 04:46:43 +0000 Subject: [PATCH] indication_driver asyncio --> threading --- indication_driver.py | 56 ++++++++++++++++++++------------------- test_scripts/led_test.py | 16 +++-------- test_scripts/rfid_test.py | 41 ++++++++-------------------- 3 files changed, 44 insertions(+), 69 deletions(-) diff --git a/indication_driver.py b/indication_driver.py index aa718d1..eeb1f4c 100644 --- a/indication_driver.py +++ b/indication_driver.py @@ -1,7 +1,7 @@ import neopixel from time import sleep -import asyncio from typing import List +from threading import Thread import sys sys.path.append('.') @@ -16,17 +16,18 @@ class IndicationDriver: auto_write=False, pixel_order=neopixel.GRB ) - self.clear() - self.indication_stop_event = asyncio.Event() + self._run_loop = True + self._indication_loop = None + self.stop() - def stop_loop(self): - self.indication_stop_event.set() - - def clear(self): + def stop(self): + if self._indication_loop: + self._run_loop = False + self._indication_loop.join() self.pixels.fill((0,0,0)) self.pixels.show() - + def _get_trash_location(self, trash_type: str) -> List[int]: if trash_type in config.TRASH_LOCATION: center_location = config.TRASH_LOCATION[trash_type] @@ -35,23 +36,24 @@ class IndicationDriver: raise ValueError def indicate(self, trash_type: str): - self.clear() - for idx in self._get_trash_location(trash_type): - self.pixels[idx] = (0, 255, 0) - self.pixels.show() - - async def indicate_async(self, trash_type: str): - self.clear() - self.indication_stop_event.clear() - self.indicate(trash_type) + self.stop() location = self._get_trash_location(trash_type) - while not self.indication_stop_event.is_set(): - for step in range (4): - for idx in range(location[0]): - if idx % 4 == step: - self.pixels[idx] = (0, 0, 255) - else: - self.pixels[idx] = (0, 0, 0) - self.pixels.show() - await asyncio.sleep(.1) - self.clear() \ No newline at end of file + + def _indication_loop(self, location: List[int]): + for idx in location: + self.pixels[idx] = (0, 255, 0) + self.pixels.show() + self._run_loop = True + while self._run_loop: + for step in range (4): + for idx in range(location[0]): + if idx % 4 == step: + self.pixels[idx] = (0, 0, 255) + else: + self.pixels[idx] = (0, 0, 0) + self.pixels.show() + sleep(.1) + + self._indication_loop = Thread(target = _indication_loop, args = (self, location, )) + self._indication_loop.start() + sleep(1) \ No newline at end of file diff --git a/test_scripts/led_test.py b/test_scripts/led_test.py index 0575a06..79a54b6 100644 --- a/test_scripts/led_test.py +++ b/test_scripts/led_test.py @@ -1,20 +1,12 @@ from time import sleep -import asyncio import sys sys.path.append('..') from TREx.indication_driver import IndicationDriver -async def __main__(): - task = asyncio.create_task(indication_driver.indicate_async("BOTTLE")) - await asyncio.sleep(5) - indication_driver.stop_loop() - await task - indication_driver.clear() - indication_driver = IndicationDriver() -asyncio.run(__main__()) -sleep(2) indication_driver.indicate("CAN") -sleep(2) -indication_driver.clear() \ No newline at end of file +sleep(5) +indication_driver.indicate("INCOMBUSTIBLE") +sleep(5) +indication_driver.stop() \ No newline at end of file diff --git a/test_scripts/rfid_test.py b/test_scripts/rfid_test.py index e835e1c..b2ce7ba 100644 --- a/test_scripts/rfid_test.py +++ b/test_scripts/rfid_test.py @@ -1,7 +1,6 @@ from time import sleep import RPi.GPIO as GPIO from mfrc522 import SimpleMFRC522 -import asyncio import sys sys.path.append('..') @@ -9,33 +8,15 @@ import TREx.config as config from TREx.indication_driver import IndicationDriver from TREx.rfid_driver import RFIDDriver +reader = SimpleMFRC522() +indication_driver = IndicationDriver() -async def __main__(): - rfid_driver = RFIDDriver() - indication_driver = IndicationDriver() - indication_task = None - while True: - id = await rfid_driver.get_next_id() - trash_category = None - try: - if indication_task is not None: - indication_driver.stop_loop() - await indication_task - trash_category = config.RFID_LOOKUP_TABLE[id] - indication_task = asyncio.create_task(indication_driver.indicate_async(trash_category)) - await asyncio.sleep(1) - except KeyError: - indication_driver.stop_loop() - -asyncio.run(__main__()) - -# while True: -# id, _ = reader.read() -# if id in config.RFID_LOOKUP_TABLE.keys(): -# trash_category = config.RFID_LOOKUP_TABLE[id] -# print(f"{id}: {trash_category}") -# indication_driver.indicate(trash_category) -# else: -# print(id) -# sleep(1) -# GPIO.cleanup() \ No newline at end of file +while True: + id, _ = reader.read() + if id in config.RFID_LOOKUP_TABLE.keys(): + trash_category = config.RFID_LOOKUP_TABLE[id] + print(f"{id}: {trash_category}") + indication_driver.indicate(trash_category) + else: + indication_driver.stop() + print(id) \ No newline at end of file