-
Notifications
You must be signed in to change notification settings - Fork 0
/
controller.py
executable file
·82 lines (66 loc) · 2.83 KB
/
controller.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import schedule, time, logging
from datetime import datetime, timedelta
from suntime import Sun
from api import execute_routine
from logger import configure_logger
from globals import read_config
# Module-level logger for this file, obtained from the project's
# configure_logger helper with logging.INFO passed as the level argument.
# NOTE(review): the logger.debug(...) calls below are presumably suppressed
# at this level — confirm against configure_logger.
logger = configure_logger(__name__, logging.INFO)
# Scheduling
def schedule_continuous_routines(routine):
    """Register a daily job for a routine whose schedule has both Start and End.

    The schedule entry is looked up in ``SCHEDULES`` via ``routine["Schedule"]``.
    Its "Start" and "End" values are parsed as ``HH:MM`` strings and the
    difference between them is handed to ``until()`` on the new job.

    Args:
        routine: Mapping with a "Schedule" key naming an entry in ``SCHEDULES``;
            it is forwarded to ``execute_routine`` as the ``routine`` keyword.

    Raises:
        ValueError: If "Start" or "End" does not match ``%H:%M``.
    """
    window = SCHEDULES[routine["Schedule"]]
    start = window["Start"]
    end = window["End"]

    # Parse the end first, then the start, and take the distance between them.
    ends_at = datetime.strptime(end, "%H:%M")
    starts_at = datetime.strptime(start, "%H:%M")
    span = ends_at - starts_at
    logger.debug(f"{routine} Start: {start}; End: {end}, Delta {span}")

    # An End earlier in the day than Start means the window crosses midnight,
    # so push the (negative) span forward by one day.
    if span < timedelta(0):
        span += timedelta(days=1)

    # NOTE(review): a timedelta passed to until() appears to be interpreted
    # relative to the moment of this call, not relative to Start — confirm
    # against the schedule library documentation.
    job = schedule.every().day.at(start).until(span)
    job.do(execute_routine, routine=routine)
def schedule_sun_routine(start, char, routine):
    """Schedule ``routine`` daily at sunrise or sunset, shifted by an optional offset.

    Args:
        start: Schedule start string, e.g. ``"sunrise"``, ``"sunset-1.5"`` or
            ``"Sunset + 2"``. The part before ``char`` names the sun event
            (case-insensitive, surrounding whitespace ignored); the part after
            it is the offset in hours.
        char: ``"+"`` or ``"-"`` when ``start`` carries an offset, ``None``
            when it is a bare event name.
        routine: Forwarded to ``execute_routine`` as the ``routine`` keyword.

    Raises:
        ValueError: If the event is neither "sunrise" nor "sunset", or the
            offset is not a number.
    """
    if char is None:  # No offset requested
        event, offset = start, 0
    else:
        # partition() splits on the first sign only; a malformed remainder
        # still surfaces as ValueError from float().
        event, _, raw_offset = start.partition(char)
        offset = float(raw_offset)
        if char == "-":  # Negate offset if requested
            offset = -offset

    # Normalise so that inputs such as "Sunrise - 1" are recognised.
    event = event.strip().lower()
    if event == "sunrise":
        base = SUNRISE
    elif event == "sunset":
        base = SUNSET
    else:
        # Fail with a clear message instead of an unbound-variable crash.
        raise ValueError(
            f"Unsupported sun event {event!r} in schedule start {start!r}; "
            "expected 'sunrise' or 'sunset'"
        )

    final = (base + timedelta(hours=offset)).strftime("%H:%M")
    schedule.every().day.at(final).do(execute_routine, routine=routine)
    logger.debug(f"{routine} Start: {final}; Offset: {offset}")
def schedule_onetime_routines(routine):
    """Register a once-a-day job for a routine, at a fixed or sun-relative time.

    The "Start" value of the routine's schedule entry decides the path: if it
    contains "sun" (case-insensitive) the work is delegated to
    ``schedule_sun_routine``; otherwise it is used directly as the daily time.

    Args:
        routine: Mapping with a "Schedule" key naming an entry in ``SCHEDULES``;
            it is forwarded to ``execute_routine`` as the ``routine`` keyword.
    """
    start = SCHEDULES[routine["Schedule"]]["Start"]

    # Plain clock time: schedule it as-is and return early.
    if "sun" not in start.lower():
        schedule.every().day.at(start).do(execute_routine, routine=routine)
        logger.debug(f"{routine} Start: {start}")
        return

    # Sun-relative time: find which offset sign, if any, is present.
    # "-" is checked before "+", so it wins when both appear.
    sign = next((mark for mark in ("-", "+") if mark in start), None)
    schedule_sun_routine(start, sign, routine)
# Globals from config: read_config() yields four values, unpacked in this order.
# Of these, only SCHEDULES and ROUTINES are referenced by the code visible in
# this file.
DEVICE_IPS, COLOR_VALUES, SCHEDULES, ROUTINES = read_config()
# Sun calculator built from two hard-coded coordinates.
# NOTE(review): presumably (latitude, longitude) — confirm against suntime.
sun = Sun(30.271041325306694, -97.74181978453979)
# NOTE(review): sunrise and sunset are computed once, when the module is
# loaded, and never refreshed; a long-running service will keep using these
# values on later days — confirm whether that is intended.
SUNRISE = sun.get_local_sunrise_time()
SUNSET = sun.get_local_sunset_time()
def main():
    """Register every routine with ``schedule`` and run the scheduler forever.

    Routines whose schedule entry has no "End" are registered once up front
    via ``schedule_onetime_routines``. Routines with an "End" are passed to
    ``schedule_continuous_routines`` on every pass of the main loop, after
    which pending jobs are run and the loop sleeps for one second.

    This function never returns.
    """
    for routine in ROUTINES:
        if SCHEDULES[routine["Schedule"]]["End"] is None:
            schedule_onetime_routines(routine)

    # next_run() yields None when no job has been registered yet (e.g. every
    # routine has an "End"), so guard it before formatting.
    next_run = schedule.next_run()
    if next_run is not None:
        first_run = next_run.strftime("%A at %H:%M")
    else:
        first_run = "n/a (no jobs scheduled yet)"

    logger.info(
        f"Starting service on {datetime.now().strftime('%c')}\n"
        f"Sunrise: {SUNRISE.strftime('%H:%M')}; Sunset: {SUNSET.strftime('%H:%M')}\n"
        f"First run on {first_run}"
    )

    while True:
        # NOTE(review): this registers a new job for each continuous routine
        # on every pass (about once per second), so the job list keeps growing
        # until jobs run or expire — confirm this is the intended mechanism.
        for routine in ROUTINES:
            if SCHEDULES[routine["Schedule"]]["End"] is not None:
                schedule_continuous_routines(routine)  # Need to test this better
        schedule.run_pending()
        time.sleep(1)


if __name__ == "__main__":
    main()