-
Notifications
You must be signed in to change notification settings - Fork 0
/
api.py
117 lines (105 loc) · 4.16 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import logging
from datetime import datetime
from multiprocessing import Process
from asyncio import sleep, new_event_loop, set_event_loop
from kasa import SmartDevice, SmartBulb, SmartDimmer, SmartPlug
from logger import configure_logger
from globals import read_config
# Configuration tables returned by read_config(), unpacked in the order it
# returns them. In this module DEVICE_IPS is indexed by device name to obtain
# the host passed to the kasa device classes, and COLOR_VALUES is indexed by
# color name to obtain a (hue, saturation) pair.
# NOTE(review): SCHEDULES and ROUTINES are not used in the code shown here;
# presumably they are consumed by other modules - confirm before removing.
DEVICE_IPS, COLOR_VALUES, SCHEDULES, ROUTINES = read_config()
# Module-level logger created by the project's configure_logger helper at INFO level.
logger = configure_logger(__name__, logging.INFO)
# Wraps the call_api coroutine so it can be invoked from synchronous code
def call_api_async(routine, device):
    """Run ``call_api(routine, device)`` to completion on a fresh event loop.

    This is the synchronous entry point handed to ``Process`` as its target,
    so each invocation creates its own event loop rather than reusing one.

    Args:
        routine: Routine mapping forwarded unchanged to ``call_api``.
        device: Device name forwarded unchanged to ``call_api``.
    """
    loop = new_event_loop()
    set_event_loop(loop)
    try:
        loop.run_until_complete(call_api(routine, device))
    finally:
        # Always release the loop's resources, even if the API call raised.
        loop.close()
def execute_routine(routine, module="controller"):
    """Run every device call of *routine* in parallel and wait for all of them.

    One child process is created per entry in ``routine["Devices"]``; each
    process runs ``call_api_async(routine, device)``. All processes are
    started before any is joined, so the device calls overlap.

    Args:
        routine: Mapping with a ``"Devices"`` entry that iterates over device
            names. ``routine["Schedule"]`` is additionally read for the log
            line when ``module`` is ``"controller"``.
        module: Identifier of the caller. ``"controller"`` logs the execution;
            ``"webhook"`` makes the function return ``200``.

    Returns:
        ``200`` when ``module == "webhook"``, otherwise ``None``.
    """
    # Pair each process with its device name so failures can be reported.
    workers = [
        (device, Process(target=call_api_async, args=(routine, device)))
        for device in routine["Devices"]
    ]

    if module == "controller":
        logger.info(
            f"Executing Routine {routine['Schedule']} on {datetime.now().strftime('%A at %H:%M')}"
        )

    # Start everything first so the calls run concurrently...
    for _, process in workers:
        process.start()

    # ...then wait for each one and surface failures instead of dropping them.
    for device, process in workers:
        process.join()
        if process.exitcode != 0:
            logger.warning(
                f"API call for {device} exited with code {process.exitcode}"
            )

    if module == "webhook":
        return 200
    return None
# API Calls
async def call_api(routine, device):
call = routine["Devices"][device]
type, colors, brightness, interval = (
call[k] for k in ["Type", "Colors", "Brightness", "Interval"]
)
transition = interval * 1000 # ms to seconds for smooth transition
b = SmartDevice(DEVICE_IPS[device])
await b.update()
if b.model == "HS220(US)":
b = SmartDimmer(DEVICE_IPS[device])
elif b.model == "KL125(US)":
b = SmartBulb(DEVICE_IPS[device])
elif b.model == "EP10(US)":
b = SmartPlug(DEVICE_IPS[device])
await b.update()
match type:
case "set_brightness":
if b.is_on:
await b.set_brightness(brightness=brightness, transition=transition)
logger.info(
f"POST {device}@{DEVICE_IPS[device]} | Set Brightness | Brightness: {brightness}; Interval: {interval}"
)
else:
logger.info(
f"POST {device}@{DEVICE_IPS[device]} | Set Brightness | Device is off - No call issued"
)
pass
case "power_on":
if b.model == "EP10(US)":
await b.turn_on()
if b.is_on:
await b.set_brightness(brightness=brightness, transition=transition)
else:
await b.set_brightness(1)
await b.turn_on()
await b.set_brightness(brightness=brightness, transition=transition)
logger.info(
f"POST {device}@{DEVICE_IPS[device]} | Power On | Interval: {interval}"
)
case "power_off":
await b.turn_off(transition=transition)
logger.info(
f"POST {device}@{DEVICE_IPS[device]} | Power Off | Interval: {interval}"
)
case "toggle_power":
if b.is_on:
await b.turn_off(transition=transition)
logger.info(
f"POST {device}@{DEVICE_IPS[device]} | Toggle Off | Interval: {interval}"
)
else:
if b.model == "EP10(US)":
await b.turn_on()
else:
await b.set_brightness(1)
await b.turn_on()
await b.set_brightness(brightness=brightness, transition=transition)
logger.info(
f"POST {device}@{DEVICE_IPS[device]} | Toggle On | Interval: {interval}"
)
case "smooth_rotate":
for c in colors:
hue, sat = COLOR_VALUES[c]
val = brightness
await b.set_hsv(hue, sat, val, transition=transition)
await sleep(interval)
logger.info(
f"POST {device}@{DEVICE_IPS[device]} | Rotate | Color: {c}; Brightness: {brightness}; Interval: {interval}"
)
case _:
logger.info(
f"POST {device}@{DEVICE_IPS[device]} | THIS DEVICE DID NOT MATCH A VALID TYPE."
)
pass