Skip to content

Commit

Permalink
Allow running without physical hardware
Browse files Browse the repository at this point in the history
  • Loading branch information
pzbitskiy committed Aug 6, 2024
1 parent c594082 commit 28592fa
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 40 deletions.
46 changes: 46 additions & 0 deletions src/media_center_kb/gpio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""GPIO interface"""

from abc import ABC, abstractmethod


class GPioIf(ABC):
"""Base abstract class for GPIO operations"""

@property
@abstractmethod
def HIGH(self) -> int: # pylint: disable=invalid-name
"""HIGH level"""

@property
@abstractmethod
def LOW(self) -> int: # pylint: disable=invalid-name
"""LOW level"""

@abstractmethod
def input(self, pin: int) -> bool:
"""Read pin and return logical HIGH/LOW level"""

@abstractmethod
def output(self, pin: int, signal: int):
"""Set pin to HIGH/LOW logical level"""


class GPioNoOp(GPioIf):
"""GPIO noop"""

def __init__(self, pins):
self.pins = pins

@property
def HIGH(self):
return 1

@property
def LOW(self):
return 0

def input(self, _: int) -> bool:
return False

def output(self, _: int, __: int):
return
63 changes: 47 additions & 16 deletions src/media_center_kb/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,17 @@
from ysp4000.ysp import Ysp4000

from media_center_kb.control import Controller
from media_center_kb.gpio import GPioNoOp
from media_center_kb.ha import ha_loop, SmartOutletHaDevice
from media_center_kb.kb import kb_event_loop
from media_center_kb.relays import RelayModule, Pins
from media_center_kb.rpi import GPio

try:
from media_center_kb.rpi import GPio

RAISED = None
except RuntimeError as ex:
RAISED = ex


def init_logging(level=None, **kwargs):
Expand Down Expand Up @@ -56,11 +63,6 @@ def shutdown(loop):
task.cancel()


async def noop_loop():
"""Noop implementation when no HA stuff available or needed"""
return


async def main():
"""init dependencies and run kb read loop"""
parser = argparse.ArgumentParser(description="App manager")
Expand All @@ -85,6 +87,30 @@ async def main():
type=argparse.FileType("rt", encoding="utf8"),
help="Verbose output",
)
parser.add_argument(
"--no-gpio",
dest="no_gpio",
action="store_true",
help="No GPIO device. Useful when tunning without physical relays/control board",
)
parser.add_argument(
"--no-kb",
dest="no_keyboard",
action="store_true",
help="No keyboard. Useful when tunning without physical controls",
)
parser.add_argument(
"--no-serial",
dest="no_serial",
action="store_true",
help="No serial port. Useful when tunning without connected serial port",
)
parser.add_argument(
"--no-ha",
dest="no_ha",
action="store_true",
help="No MQTT. Useful when tunning without MQTT+HA integration",
)
args = parser.parse_args()

verbose = False
Expand All @@ -102,22 +128,27 @@ async def main():
for signame in ("SIGINT", "SIGTERM"):
loop.add_signal_handler(getattr(signal, signame), lambda: shutdown(loop))

if not args.no_gpio and RAISED:
raise RAISED

try:
gpio = GPio(Pins)
gpio = GPio(Pins) if not args.no_gpio else GPioNoOp(Pins)
relays = RelayModule(gpio, logging.getLogger("rly"))
ysp = Ysp4000(verbose=verbose)
shell = RestrictedShell()
controller = Controller(relays, ysp, shell)

ysp_coro = ysp.get_async_coro(loop)

extra_loop = noop_loop()
if mqtt_settings:
extra_loop = ha_loop(SmartOutletHaDevice(controller.devices, mqtt_settings))

await asyncio.gather(
kb_event_loop(controller.kb_handlers()), ysp_coro, extra_loop
)
coros = []
if not args.no_keyboard:
coros.append(kb_event_loop(controller.kb_handlers()))
if not args.no_serial:
coros.append(ysp.get_async_coro(loop))
if mqtt_settings and not args.no_ha:
coros.append(
ha_loop(SmartOutletHaDevice(controller.devices, mqtt_settings))
)

await asyncio.gather(*coros)
except asyncio.CancelledError:
logger.info("exiting main on cancel")
finally:
Expand Down
25 changes: 2 additions & 23 deletions src/media_center_kb/relays.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
"""
Relays related functionality:
- GPIO abstraction
- Relays abstraction
"""

from abc import ABC, abstractmethod
from enum import Enum
from types import SimpleNamespace

from media_center_kb.gpio import GPioIf


class _RelayPins(Enum):
RELAY1_PIN = 6
Expand All @@ -28,28 +29,6 @@ class _RelayPins(Enum):
Pins = _PIN_TO_RELAY.keys()


class GPioIf(ABC):
"""Base abstract class for GPIO operations"""

@property
@abstractmethod
def HIGH(self) -> int: # pylint: disable=invalid-name
"""HIGH level"""

@property
@abstractmethod
def LOW(self) -> int: # pylint: disable=invalid-name
"""LOW level"""

@abstractmethod
def input(self, pin: int) -> bool:
"""Read pin and return logical HIGH/LOW level"""

@abstractmethod
def output(self, pin: int, signal: int):
"""Set pin to HIGH/LOW logical level"""


class RelayIf(ABC):
"""Single relay interface"""

Expand Down
2 changes: 1 addition & 1 deletion src/media_center_kb/rpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import RPi.GPIO as GPIO # pylint: disable=consider-using-from-import

from .relays import GPioIf
from media_center_kb.gpio import GPioIf


# pylint: disable=no-member
Expand Down

0 comments on commit 28592fa

Please sign in to comment.