# standard library packages
import copy
import itertools
import os.path
from timeit import default_timer as timer
# 3rd-party packages
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
# local packages
from spaces.factory import activeSpaces
from robots.factory import activeRobots
from planners.factory import availablePlanners
from factory.builder import Builder
from util.plots import savePlot
from spaces.graph import Graph
##
# @brief This class allows for the creation and running of a motion
# planning simulation based on the simulation type string
#
class Simulation:
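    ##
    # @brief Constructs the simulation runner
    #
    # @param shouldSavePlots Boolean flag indicating whether the generated
    #                        plots should be saved to disk
    # @param basePlotDir     The base directory to save plots under
    #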
def __init__(self, shouldSavePlots, basePlotDir):
self.shouldSavePlots = shouldSavePlots
self.basePlotDir = basePlotDir
    ##
    # @brief Runs a set of simulations based on the simulation type and
    #        outputs the plots to the base plot directory
    #
# @param simType The simulation type string
#
def run(self, simType):
(configNames, configFileNames) = self.getConfigPaths(simType)
        for (file, configName) in zip(configFileNames, configNames):
            print()
            print('===============================')
            print(configName)
            print('===============================')
            baseSaveFName = os.path.join(self.basePlotDir, configName)
            if simType == 'polygonalRobot':
                self.runRobotWithPlanner(robotType='POLYGONALROBOT',
                                         plannerType=None,
                                         runPlannerBenchmarking=False,
                                         configFileName=file,
                                         baseSaveFName=baseSaveFName)
            elif simType == 'gradient':
                self.runRobotWithPlanner(robotType='POINTROBOT',
                                         plannerType='GRADIENT',
                                         runPlannerBenchmarking=False,
                                         configFileName=file,
                                         baseSaveFName=baseSaveFName)
            elif simType == 'wavefront':
                self.runRobotWithPlanner(robotType='POINTROBOT',
                                         plannerType='WAVEFRONT',
                                         runPlannerBenchmarking=False,
                                         configFileName=file,
                                         baseSaveFName=baseSaveFName)
            elif simType == 'manipulator':
                self.runRobotWithPlanner(robotType='MANIPULATOR',
                                         plannerType='WAVEFRONT',
                                         runPlannerBenchmarking=False,
                                         configFileName=file,
                                         baseSaveFName=baseSaveFName)
            elif simType == 'graphSearch':
                self.runGraphSearch(file, baseSaveFName)
            elif simType in ('prmPointRobot', 'prmPointRobotBenchmark'):
                doBench = (simType == 'prmPointRobotBenchmark')
                self.runRobotWithPlanner(robotType='POINTROBOT',
                                         plannerType='PRM',
                                         runPlannerBenchmarking=doBench,
                                         configFileName=file,
                                         baseSaveFName=baseSaveFName)
##
    # @brief Generic function to interface with the classes to run a
    #        motion planner on a robot in a variety of environments
#
# @param robotType The robot type string
# @param plannerType The planner type string
# @param runPlannerBenchmarking Boolean flag indicating whether the
# planner should be benchmarked
# according to the settings in the
# simulation configuration files
# @param configFileName The configuration file name for the
# simulation
# @param baseSaveFName The base save file name for plots
#
def runRobotWithPlanner(self, robotType, plannerType,
runPlannerBenchmarking, configFileName,
baseSaveFName):
start = timer()
ssp = self.shouldSavePlots
confName = configFileName
# the workspace doesn't change for this simulation
currWorkspace = activeSpaces.get(robotSpaceType='WORKSPACE',
configFileName=confName,
shouldSavePlots=ssp,
baseSaveFName=baseSaveFName)
currRobot = activeRobots.get(robotType=robotType,
configFileName=confName,
workspace=currWorkspace,
shouldSavePlots=ssp,
baseSaveFName=baseSaveFName)
# guards against trying to plan for a robot that does not support any
# planners yet
currPlanner = None
if plannerType:
currPlanner = availablePlanners.get(plannerType=plannerType,
cSpace=currRobot.cSpace,
workspace=currWorkspace,
robot=currRobot,
configFileName=confName,
shouldSavePlots=ssp,
baseSaveFName=baseSaveFName)
        # if benchmarking, use the same workspace, cspace, and planner, and
        # just adjust the settings in the planner for each experiment
        if runPlannerBenchmarking:
            data = self.runPlannerBenchmarking(planner=currPlanner,
                                               robot=currRobot,
                                               configFileName=confName)
            (benchMarkingDF, pathValidityDF, benchParams) = data
            plotTitle = plannerType + '_stats'
            self.plotStatistics(benchMarkingDF=benchMarkingDF,
                                pathValidityDF=pathValidityDF,
                                benchParams=benchParams,
                                baseSaveFName=baseSaveFName,
                                plotTitle=plotTitle)
        else:
            # execute the robot with whatever planner is given, even if the
            # planner is still None
            currRobot.runAndPlot(planner=currPlanner, plotTitle='')
finish = timer()
computationTime = finish - start
print('Took', computationTime, 'seconds to complete simulation')
##
# @brief Generic function to run a robot in the same workspace many
# times by varying the planner / planner parameters and
# reporting statistical analysis of the runs
#
# @param planner The initialized planner object for the
# simulation scenario
# @param robot The Robot object to use
# @param configFileName The configuration file name
#
    # @return (pandas data frame with the computation time and path length
    #          of each run for each parametric experimental setting
    #          specified in the configuration file (e.g.):
    #          computationTimeInSeconds  pathLength    n    r  smoothing
    #                      1.702000e-06   23.814193  200  0.5       True
    #                      6.310000e-07   21.638431  200  0.5      False
#
# pandas data frame with the number of valid paths per
# experimental parameter set and the number of times the
# planner tried to find a path:
# numValidPaths n r smoothing
# 0 200 0.5 True
# 0 200 1.0 False
#
# list of strings of the parameters varied in benchmarking)
#
def runPlannerBenchmarking(self, planner, robot, configFileName):
# need to load the config data here to extract simulation benchmarking
# parameters
configData = Builder.loadConfigData(configFileName)
numRunsOfPlannerPerSetting = configData['numRunsOfPlannerPerSetting']
        # this selects which of the planner settings will be varied. We need
        # to create all possible combinations of these settings to use across
        # all of the planner runs. (The config key is spelled
        # 'paramterNamesToVary' in the config files, so it is kept as-is.)
        parametersToVary = configData['paramterNamesToVary']
# builds a dict of variable names and the associated lists of
# parameters to use from the config file
allParams = dict((var, configData[var]) for var in parametersToVary)
        # build a list of all possible combinations of values across the keys
        # in the dictionary of parameter lists
keys, values = zip(*allParams.items())
experiments = [dict(zip(keys, v)) for v in itertools.product(*values)]
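        # e.g. (illustrative values only):
        #   allParams = {'n': [200, 400], 'smoothing': [True, False]}
        # expands to the four experiments
        #   [{'n': 200, 'smoothing': True}, {'n': 200, 'smoothing': False},
        #    {'n': 400, 'smoothing': True}, {'n': 400, 'smoothing': False}]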
        # now that we have a unique dictionary of planner settings for each
        # desired experiment, run each settings combination
        # numRunsOfPlannerPerSetting times and then average the statistics
        # across all runs
data = []
pathValidityData = []
for experiment in experiments:
            print('benchmarking the', planner.plannerType, 'planner',
                  numRunsOfPlannerPerSetting, 'times with:', experiment)
            # count how many times each experiment produces a valid path to
            # the goal in cspace
            numValidPaths = 0
            runInfo = {}
            for _ in range(numRunsOfPlannerPerSetting):
(benchmarkingInfo,
foundPath) = robot.runAndPlot(planner=planner,
plotPlannerOutput=False,
plotTitle='',
shouldBenchmark=True,
plannerConfigData=experiment)
# add the current experiment parameters to the dictionary for
# creating the data frame later
benchmarkingInfo.update(experiment)
data.append(benchmarkingInfo)
if foundPath:
numValidPaths += 1
            # record how many times a valid path was found, along with the
            # experiment parameters, so we can later analyze the efficacy of
            # each experimental setting
            runInfo['numValidPaths'] = numValidPaths
            runInfo['numTimesRun'] = numRunsOfPlannerPerSetting
            runInfo.update(copy.deepcopy(experiment))
pathValidityData.append(runInfo)
# easier to do stat analysis with a dataframe
benchMarkingDF = pd.DataFrame(data)
pathValidityDF = pd.DataFrame(pathValidityData)
return (benchMarkingDF, pathValidityDF, parametersToVary)
##
# @brief Plots the statistics for the dataframes from the benchmark
# and path validity (how often the probabilistic planner is
# able to find a path)
#
# @param benchMarkingDF pandas data frame with the computation time
# and path length of each run for each
# parametric experimental setting specified in
# the configuration file (e.g.):
    #                        computationTimeInSeconds  pathLength    n    r  smoothing
    #                                    1.702000e-06   23.814193  200  0.5       True
    #                                    6.310000e-07   21.638431  200  0.5      False
#
# @param pathValidityDF pandas data frame with the number of valid
# paths per experimental parameter set and the
# number of times the planner tried to find a
# path
# numValidPaths n r smoothing
# 0 200 0.5 True
# 0 200 1.0 False
#
# @param benchParams list of strings of the parameters varied in
# benchmarking
# @param baseSaveFName The base directory file name for output plot
# @param plotTitle The plot title
#
def plotStatistics(self, benchMarkingDF, pathValidityDF, benchParams,
baseSaveFName, plotTitle):
##
# Plotting boxplots
##
boxPlotsToMake = ['computationTimeInSeconds', 'pathLength']
        # need to create a new, merged categorical column for the boxplots
mergedParamsName = ', '.join(benchParams)
benchMarkingDF[mergedParamsName] = benchMarkingDF[benchParams].apply(
lambda x: ', '.join(x.astype(str)), axis=1)
pathValidityDF[mergedParamsName] = pathValidityDF[
benchParams].apply(lambda x: ', '.join(x.astype(str)), axis=1)
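        # e.g. with benchParams = ['n', 'r', 'smoothing'] (illustrative), this
        # adds a single categorical column named 'n, r, smoothing' whose rows
        # hold values like '200, 0.5, True'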
# Usual boxplot for each variable that was benchmarked
for plotVar in boxPlotsToMake:
            # widen the figure to accommodate the long xtick labels
fig = plt.figure(figsize=(10, 5))
plt.style.use("seaborn-darkgrid")
bp = sns.boxplot(data=benchMarkingDF,
x=mergedParamsName, y=plotVar)
sns.swarmplot(x=mergedParamsName, y=plotVar, data=benchMarkingDF,
color="grey")
# for readability of axis labels
bp.set_xticklabels(bp.get_xticklabels(), rotation=45, ha='right')
newPlotTitle = plotVar + '-' + plotTitle
plt.title('Benchmarking of Sampled Planner ' + plotVar)
savePlot(fig=fig, shouldSavePlots=self.shouldSavePlots,
baseSaveFName=baseSaveFName, plotTitle=newPlotTitle)
##
# Plotting path validity bar graph
##
# number of times a valid path was found
fig = plt.figure()
plt.style.use('seaborn-darkgrid')
bp = sns.barplot(x=mergedParamsName, y='numValidPaths',
data=pathValidityDF)
plt.title('Number of Valid Paths Found for Each Parameter Combination')
# for readability of axis labels
bp.set_xticklabels(bp.get_xticklabels(), rotation=45, ha='right')
newPlotTitle = 'numPaths' + '-' + plotTitle
savePlot(fig=fig, shouldSavePlots=self.shouldSavePlots,
baseSaveFName=baseSaveFName, plotTitle=newPlotTitle)
##
# @brief A function to interface with the graph class and demonstrate
# the performance of both the A* and Dijkstra optimal search
# algorithms
#
# @param configFileName The configuration file name for the
# simulation
# @param baseSaveFName The base save file name for plots
#
def runGraphSearch(self, configFileName, baseSaveFName):
# need to load the config data here to extract simulation benchmarking
# parameters
configData = Builder.loadConfigData(configFileName)
nodes = configData['nodes']
adjList = configData['edges']
startNode = configData['startNodeLabel']
goalNode = configData['goalNodeLabel']
        # need to convert the adjacency list given in the config to an edge
        # list of 3-tuples of the form (source, dest, edgeAttrDict)
edgeList = []
for sourceEdge, destEdgesData in adjList.items():
if destEdgesData:
newEdges = [(sourceEdge, destEdgeData[0], destEdgeData[1]) for
destEdgeData in destEdgesData]
edgeList.extend(newEdges)
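        # e.g. (illustrative) an adjacency entry like
        #   {'a': [('b', {'weight': 2.0})], 'b': None}
        # becomes
        #   edgeList = [('a', 'b', {'weight': 2.0})]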
myLittleGraph = Graph(nodes.items(), edges=edgeList)
myLittleGraph.dispEdges()
        # define a generic solution-info printing function
        def printSolutionInfo(method, pathLength, numIter):
            print('|--------', method, '-------|')
            if pathLength:
                print('Path Length:', pathLength)
            else:
                print('No path exists')
            print('Number of Dequeues to Find Path:', numIter)
        # define a generic path plotting function
def plotPath(method, graph, path, pathLength, nIter, shouldSavePlots,
baseSaveFName):
if path:
plotTitle = 'Shortest Path (length = ' + str(pathLength) + \
') Found with ' + method + ' - nIter: ' + \
str(nIter)
else:
plotTitle = 'No Path Found with ' + method + ' - nIter: ' + \
str(nIter)
(fig, _) = graph.plot(path, plotTitle=plotTitle, showAxes=False)
# tight plot layout messes up the graph
saveTitle = method + '_' + 'pathLength' + str(pathLength) + '_' + \
'nIter' + str(nIter)
savePlot(fig=fig, shouldSavePlots=shouldSavePlots,
baseSaveFName=baseSaveFName, plotTitle=saveTitle,
useTightLayout=False)
# now run and compare the performance statistics of A* to plain
# Dijkstra's
path = {}
pathLength = {}
numIter = {}
        ##
        # A*
        ##
        method = 'A star'
(path[method],
pathLength[method],
numIter[method]) = myLittleGraph.findPathToGoal(start=startNode,
goal=goalNode,
method=method)
        printSolutionInfo(method, pathLength[method], numIter[method])
plotPath(method, myLittleGraph, path[method], pathLength[method],
numIter[method], self.shouldSavePlots, baseSaveFName)
        ##
        # Dijkstra
        ##
        method = 'Dijkstra'
(path[method],
pathLength[method],
numIter[method]) = myLittleGraph.findPathToGoal(start=startNode,
goal=goalNode,
method=method)
        printSolutionInfo(method, pathLength[method], numIter[method])
plotPath(method, myLittleGraph, path[method], pathLength[method],
numIter[method], self.shouldSavePlots, baseSaveFName)
##
# @brief Gets the configuration file paths for the given sim type
#
# @param simType The simulation type string
#
    # @return A tuple of (the full configuration names, the configuration
    #         file paths), each a list of strings
#
def getConfigPaths(self, simType):
if simType == 'polygonalRobot':
configNames = ['WO_Rob_triangles', 'WO_Rob_triangles_no_rot']
elif simType == 'gradient':
configNames = ['env1', 'env2', 'env3']
elif simType == 'wavefront':
configNames = ['env2', 'env3']
elif simType == 'manipulator':
configNames = ['env1', 'env2', 'env3']
elif simType == 'graphSearch':
configNames = ['graph1']
        elif simType in ('prmPointRobot', 'prmPointRobotBenchmark'):
            configNames = ['env1', 'env2', 'env3']
        else:
            raise ValueError('unknown simulation type: ' + simType)
        fullConfigNames = [simType + '_' + name for name in configNames]
configDir = 'config'
fType = '.yaml'
configFileNames = [os.path.join(configDir, fName) + fType
for fName in fullConfigNames]
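        # e.g. simType = 'gradient' with configNames = ['env1'] yields
        # fullConfigNames = ['gradient_env1'] and
        # configFileNames = ['config/gradient_env1.yaml']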
return (fullConfigNames, configFileNames)
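# Example usage (a minimal sketch; assumes config files such as
# 'config/gradient_env1.yaml' exist and that 'plots' is the desired output
# directory):
#
#   sim = Simulation(shouldSavePlots=True, basePlotDir='plots')
#   sim.run('gradient')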