-
Notifications
You must be signed in to change notification settings - Fork 0
/
gadgeteer.py
109 lines (85 loc) · 2.79 KB
/
gadgeteer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import daemon
import json
import os
import socket
import threading
import time
from sp_usb import GadgetManager
SOCKET_ADDR = os.path.join(
os.path.dirname(os.path.abspath(__file__)), ".gadgeteer.socket"
)
SOCKET_UID = 0
SOCKET_GID = 1000
SOCKET_MODE = 0o770
def bind_socket():
try:
os.unlink(SOCKET_ADDR)
except OSError:
if os.path.exists(SOCKET_ADDR):
raise
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.bind(SOCKET_ADDR)
os.chown(path=SOCKET_ADDR, uid=SOCKET_UID, gid=SOCKET_GID)
os.chmod(path=SOCKET_ADDR, mode=SOCKET_MODE)
return sock
def handle_get(dtarget, gdm=None):
if dtarget == "status":
return json.dumps({"type": "status", "msg": {"active": gdm.active}})
def handle_set(dtarget, dmsg={}, gdm=None):
if dtarget == "active":
if not dmsg:
return json.dumps(
{"type": "error", "msg": {"err_msg": "No msg dict set on request."}}
)
g_type = dmsg.pop("mode") if dmsg.get("mode") else ""
if not g_type or g_type == "none":
gdm.deactivate()
return json.dumps({"type": "status", "msg": {"active": ""}})
kw = dmsg
gdm.run(g_type, **kw)
if gdm.active != g_type:
return json.dumps({"type": "error", "msg": {"err_msg": "Reboot required"}})
return json.dumps({"type": "status", "msg": {"active": gdm.active}})
def controller(data, gdm=None):
dtype = data.get("type")
dtarget = data.get("target")
dmsg = data.get("msg")
if dtype and dtarget:
if dtype == "get":
return handle_get(dtarget, gdm=gdm)
elif dtype == "set":
return handle_set(dtarget, dmsg, gdm=gdm)
return None
def socket_handler(gdm):
while gdm.running:
try:
sock = bind_socket()
sock.listen(1)
while gdm.running:
conn, client_addr = sock.accept()
try:
data_bytes: bytes = conn.recv(1024)
data_string = json.loads(data_bytes)
response = controller(data_string, gdm=gdm)
conn.sendall(response.encode("utf-8"))
except json.decoder.JSONDecodeError:
pass
except ConnectionResetError:
pass
except BrokenPipeError:
pass
finally:
conn.close()
except socket.timeout:
pass
def main():
gdm = GadgetManager()
sh = threading.Thread(target=socket_handler, args=[gdm])
sh.daemon = True
sh.start()
# Threaded all the above so this can manage other tricks
while True:
pass
if __name__ == "__main__":
with daemon.DaemonContext():
main()