import sys
import time
from threading import Thread
from typing import List

import neopixel

# ``config`` is a project-local module; '.' is added to the import path so it
# can be found relative to the current working directory.
sys.path.append('.')
import config


class IndicationDriver:
    """Show on a NeoPixel strip where a given trash category belongs.

    ``indicate`` lights the five pixels around the configured location of a
    trash category in green and runs a blue chase animation on the pixels
    with lower indices. The animation runs on a background thread until
    ``stop`` is called or the timeout elapses, after which the strip is
    cleared.

    Attributes:
        pixels: The ``neopixel.NeoPixel`` strip, created with
            ``auto_write=False`` so changes only appear on ``show()``.
        trash_category: Category currently being indicated, or ``""`` when
            the strip is cleared.
    """

    # Wall-clock seconds after which an indication ends on its own.
    _INDICATION_TIMEOUT = 10
    # Seconds between two animation frames.
    _FRAME_DELAY = .1
    # Every ``_CHASE_PERIOD``-th pixel is lit in each animation frame.
    _CHASE_PERIOD = 4

    _TARGET_COLOR = (0, 255, 0)
    _CHASE_COLOR = (0, 0, 255)
    _OFF = (0, 0, 0)

    def __init__(self):
        """Create the pixel strip from ``config`` and start with it cleared."""
        self.pixels = neopixel.NeoPixel(
            config.PIXEL_PIN,
            config.NUM_PIXELS,
            brightness=config.NEOPIXEL_BRIGHTNESS,
            auto_write=False,
            pixel_order=neopixel.GRB,
        )
        self._run_loop = True
        self._indication_loop = None
        self.trash_category = ""
        self.stop()

    def _clear(self):
        """Turn every pixel off and reset ``trash_category``."""
        self.pixels.fill(self._OFF)
        self.pixels.show()
        self.trash_category = ""

    def stop(self):
        """Stop a running indication (if any) and clear the strip.

        Blocks until the animation thread has finished.
        """
        worker = self._indication_loop
        if worker:
            self._run_loop = False
            worker.join()
            self._indication_loop = None
        self._clear()

    def _get_trash_location(self, trash_type: str) -> List[int]:
        """Return the five pixel indices centred on ``trash_type``'s location.

        Raises:
            ValueError: If ``trash_type`` is not a key of
                ``config.TRASH_LOCATION``.
        """
        try:
            center_location = config.TRASH_LOCATION[trash_type]
        except KeyError:
            raise ValueError(f"unknown trash type: {trash_type!r}") from None
        return list(range(center_location - 2, center_location + 3))

    def _run_indication(self, location: List[int]):
        """Animation worker: mark ``location`` and chase towards it.

        Runs until ``_run_loop`` is cleared or the timeout elapses, then
        clears the strip.
        """
        for idx in location:
            self.pixels[idx] = self._TARGET_COLOR
        self.pixels.show()

        # Use a wall clock: this loop spends nearly all of its time sleeping,
        # and CPU-time clocks such as time.process_time() do not advance
        # during sleep, so the timeout would effectively never expire.
        deadline = time.monotonic() + self._INDICATION_TIMEOUT
        while self._run_loop and time.monotonic() < deadline:
            for step in range(self._CHASE_PERIOD):
                # Check between frames so stop() does not have to wait for a
                # whole chase cycle to finish.
                if not self._run_loop:
                    break
                # Only the pixels below the first target index are animated.
                for idx in range(location[0]):
                    if idx % self._CHASE_PERIOD == step:
                        self.pixels[idx] = self._CHASE_COLOR
                    else:
                        self.pixels[idx] = self._OFF
                self.pixels.show()
                time.sleep(self._FRAME_DELAY)
        self._clear()

    def indicate(self, trash_type: str):
        """Start indicating the location for ``trash_type``.

        Any indication already running is stopped first. The animation runs
        on a background thread; this method returns shortly after starting it.

        Raises:
            ValueError: If ``trash_type`` has no configured location. The
                previous indication has already been stopped at that point.
        """
        self.stop()
        # Validate before recording the category so ``trash_category`` never
        # names a category that is not actually being shown.
        location = self._get_trash_location(trash_type)
        self.trash_category = trash_type

        # Set the flag before the thread starts; setting it inside the worker
        # could overwrite a stop() request that arrived in between.
        self._run_loop = True
        self._indication_loop = Thread(
            target=self._run_indication, args=(location,)
        )
        self._indication_loop.start()
        # NOTE(review): delay kept from the original implementation;
        # presumably it lets the worker draw its first frame before the
        # caller continues - confirm whether it is still required.
        time.sleep(.1)