-
Notifications
You must be signed in to change notification settings - Fork 0
/
ambf_raven_recorder.py
187 lines (141 loc) · 5.8 KB
/
ambf_raven_recorder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
"""
Raven II Dual Platform Controller: control software for the Raven II robot. Copyright © 2023-2024 Yun-Hsuan Su,
Natalie Chalfant, Mai Bui, Sean Fabrega, and the Mount Holyoke Intelligent Medical Robotics Laboratory.
This file is a part of Raven II Dual Platform Controller.
Raven II Dual Platform Controller is free software: you can redistribute it and/or modify it under the terms of the
GNU Lesser General Public License as published by the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
Raven II Dual Platform Controller is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License along with Raven II Dual Platform Controller.
If not, see <http://www.gnu.org/licenses/>.
ambf_raven_recorder.py
date: May 13, 2024
author: Natalie Chalfant, Mai Bui, Sean Fabrega
"""
import time
from datetime import datetime
import pandas as pd
class ambf_raven_recorder:
    """
    Records raven status rows and controller inputs into pandas dataframes and
    writes each dataframe out as a CSV file when the recording is stopped.

    Typical use: call record_raven_status() and/or record_controller_inputs()
    to start a recording, call write_raven_status() / write_controller_inputs()
    once per sample, then call stop_recording() to save the data.
    """

    # (prefix, count) pairs for the numbered raven status columns that follow
    # the "runlevel", "sublevel" and "last_seq" columns, in column order
    _RS_ARRAY_FIELDS = (
        ("type", 2),
        ("pos", 6),
        ("ori", 18),
        ("ori_d", 18),
        ("pos_d", 6),
        ("encVals", 16),
        ("dac_vals", 16),
        ("Tau", 16),
        ("mpos", 16),
        ("mvel", 16),
        ("jvel", 16),
        ("mpos_d", 16),
        ("jpos_d", 16),
        ("grasp_d", 2),
        ("encoffsets", 16),
        ("jac_vel", 12),
        ("jac_f", 12),
    )

    def __init__(self):
        # Dataframe holding raven status rows; None while not recording
        self.rs_df = None
        # Dataframe holding controller input rows; None while not recording
        self.ci_df = None
        # writer and file are not used anywhere inside this class
        self.writer = None
        self.file = None
        # First value of the first recorded raven status row; every recorded
        # time is stored relative to it
        self.start_time = None

    def build_rs_headers(self):
        """
        Creates a list of strings to be used as headers for raven state recording

        Returns:
            list of 240 column names: "time", "jpos0".."jpos15", "runlevel",
            "sublevel", "last_seq", followed by the numbered columns listed in
            _RS_ARRAY_FIELDS
        """
        headers = ["time"]
        # Headers for jpos
        headers.extend("jpos" + str(i) for i in range(16))
        # Headers for runlevel, sublevel, and last_seq
        headers.extend(["runlevel", "sublevel", "last_seq"])
        # Headers for every remaining numbered field
        for prefix, count in self._RS_ARRAY_FIELDS:
            headers.extend(prefix + str(i) for i in range(count))
        return headers

    def record_raven_status(self):
        """
        Initializes the dataframe in a format matching the raven status rosbag recorder
        """
        self.rs_df = pd.DataFrame(columns=self.build_rs_headers())
        # A new recording starts with an empty dataframe, so its time reference
        # must be cleared as well; otherwise a recording restarted without
        # stop_recording() would stay relative to the previous recording
        self.start_time = None
        print("Now recording raven status")

    def write_raven_status(self, raven):
        """
        Writes the current raven status to the dataframe

        Args:
            raven : object providing get_raven_status(), which returns one row
                of values whose first entry is the time of the sample
        """
        # Copy the row so the object handed out by raven is never modified
        newline = list(raven.get_raven_status())
        if self.start_time is None:
            self.start_time = newline[0]
        # Store time relative to the first recorded sample
        newline[0] -= self.start_time
        self.rs_df.loc[len(self.rs_df.index)] = newline

    def build_ci_headers(self):
        """
        Creates a list of strings to be used as headers for controller inputs recording
        """
        headers = ["Left Joystick X", "Left Joystick Y", "Left Trigger", "Left Bumper",
                   "Right Joystick X", "Right Joystick Y", "Right Trigger", "Right Bumper",
                   "A Button", "B Button", "X Button", "Y Button", "Back Button", "Start Button"]
        return headers

    def record_controller_inputs(self):
        """
        Initializes the dataframe in a format for recording controller inputs
        """
        self.ci_df = pd.DataFrame(columns=self.build_ci_headers())
        print("Now recording controller inputs")

    def write_controller_inputs(self, controller_inputs):
        """
        Writes the current controller inputs to the dataframe

        Args:
            controller_inputs : a sequence of sequences; the inner sequences are
                concatenated in order to form one row, which must line up with
                the headers from build_ci_headers()
        """
        formatted_controller_inputs = []
        for group in controller_inputs:
            formatted_controller_inputs.extend(group)
        self.ci_df.loc[len(self.ci_df.index)] = formatted_controller_inputs

    def stop_recording(self, filename="test"):
        """
        Stops the recording by writing out the current dataframes as CSVs to the specified file path

        The raven status is saved to "<filename>_<timestamp>_rs.csv" and the
        controller inputs to "<filename>_<timestamp>_ci.csv". Only dataframes
        that are currently recording are written.

        Args:
            filename : the path to/filename prefix of the csv files to be saved
        """
        # Take the timestamp once so both files of one recording share it
        # NOTE: str(datetime.now()) contains a space and colons, which are not
        # valid in file names on every platform (e.g. Windows)
        timestamp = str(datetime.now())
        filename_rs = filename + "_" + timestamp + "_rs.csv"
        filename_ci = filename + "_" + timestamp + "_ci.csv"

        if self.rs_df is not None:
            # The raven status CSV is written with the row index column
            self.rs_df.to_csv(filename_rs, encoding='utf-8', index=True)
            self.rs_df = None

        if self.ci_df is not None:
            # The controller inputs CSV is written without the row index column
            self.ci_df.to_csv(filename_ci, encoding='utf-8', index=False)
            self.ci_df = None

        self.start_time = None