This repository has been archived by the owner on Sep 19, 2019. It is now read-only.
/
go-ari-proxy.go
370 lines (326 loc) · 11.8 KB
/
go-ari-proxy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
// go-ari-proxy is an implementation of the go-ari-library used to
// connect to the Asterisk REST Interface for delivery of Events and
// Commands across a message bus.
package main
import (
"bytes"
"encoding/json"
"flag"
"github.com/nvisibleinc/go-ari-library"
"golang.org/x/net/websocket"
"io/ioutil"
"log"
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"
)
// Var config contains a Config struct to hold the proxy configuration file.
var (
config Config // main proxy configuration structure
client = &http.Client{} // connection for Commands to ARI
proxyInstances *proxyInstanceMap // maps the per-dialog proxy instances
Debug *log.Logger
Info *log.Logger
Warning *log.Logger
Error *log.Logger
)
// signalCatcher is a function to allows us to stop the application through an
// operating system signal.
func signalCatcher() {
ch := make(chan os.Signal)
signal.Notify(ch, syscall.SIGINT)
sig := <-ch
log.Printf("Signal received: %v", sig)
os.Exit(0)
}
// Init parses the configuration file by unmarshaling it into a Config struct.
func init() {
var err error
// Setup our logging interfaces
Debug = ari.InitLogger(os.Stdout, "DEBUG")
Info = ari.InitLogger(os.Stdout, "INFO")
Warning = ari.InitLogger(os.Stdout, "WARNING")
Error = ari.InitLogger(os.Stderr, "ERROR")
// parse the configuration file and get data from it
Info.Println("Loading configuration for proxy.")
configpath := flag.String("config", "./config.json", "Path to config file")
Debug.Println("Parsing the configuration file.")
flag.Parse()
Debug.Println("Reading in the configuration from the file.")
configfile, err := ioutil.ReadFile(*configpath)
if err != nil {
Error.Fatal(err)
}
// read in the configuration file and unmarshal the json, storing it in 'config'
Debug.Println("Unmarshaling proxy configuration.")
json.Unmarshal(configfile, &config)
Debug.Println(&config)
Debug.Println("Initialize the proxy instance map.")
proxyInstances = NewproxyInstanceMap() // initialize a new proxy instance map
}
func main() {
// Setup a new Event producer and Command consumer for every application
// we've configured in the configuration file.
Info.Println("Initializing the message bus.")
ari.InitBus(config.MessageBus, config.BusConfig)
for _, app := range config.Applications {
/*
Create a new producer which is responsible for the initial topic on the message bus which is used
to signal the setup of per-application instances. All applications listen to this topic in order to
be provided the information to setup the ownership of per dialog application instances.
*/
Info.Printf("Initializing signalling bus for application %s", app)
producer := ari.InitProducer(app) // Initialize a new producer channel using the ari.InitProducer function.
Info.Printf("Starting event handler for application %s", app)
go runEventHandler(app, producer) // create new websocket connection for every application and pass the producer channel
}
go signalCatcher() // listen for os signal to stop the application
select {}
}
// runEventHandler sets up a websocket connection to an ARI application.
func runEventHandler(s string, producer chan []byte) {
// Connect to the websocket backend (ARI)
var ariMessage string
url := strings.Join([]string{config.WebsocketURL, "?app=", s, "&api_key=", config.WSUser, ":", config.WSPassword}, "")
Info.Printf("Attempting to connect to ARI websocket at: %s", url)
ws, err := websocket.Dial(url, "ari", config.Origin)
if err != nil {
log.Fatal(err)
}
// Start the producer loop. Every message received from the websocket is
// passed to the PublishMessage() function.
Info.Printf("Starting producer loop for application %s", s)
for {
err = websocket.Message.Receive(ws, &ariMessage) // accept the message from the websocket
if err != nil {
log.Fatal(err)
}
go PublishMessage(ariMessage, producer) // publish message to the producer channel
}
}
// PublishMessage takes an ARI event from the websocket and places it on the
// producer channel.
// Accepts two arguments:
// * a string containing the ARI message
// * a producer channel
func PublishMessage(ariMessage string, producer chan []byte) {
// unmarshal into an ari.Event so we can append some extra information
var info eventInfo
var message ari.Event
var pi *proxyInstance
var exists bool = false
json.Unmarshal([]byte(ariMessage), &message)
json.Unmarshal([]byte(ariMessage), &info)
message.ServerID = config.ServerID
message.Timestamp = time.Now()
message.ARI_Body = ariMessage
switch {
case info.Type == "StasisStart":
// Check to see if the new channel was already in the map, which means it
// was created by an originate with ID
pi, exists = proxyInstances.Get(info.Channel.ID)
if exists {
break
}
// since we're starting a new application instance, create the proxy side
dialogID := ari.UUID()
Info.Println("New StasisStart found. Created new dialogID of ", dialogID)
as, err := json.Marshal(ari.AppStart{Application: info.Application, DialogID: dialogID, ServerID: config.ServerID})
producer <- as
// TODO: this sleep is required to allow the application time to spin up. In the future we likely want
// to implement some sort of feedback mechanism in order to remove this sleep timer.
time.Sleep(50 * time.Millisecond)
if err != nil {
return
}
Info.Printf("Created new proxy instance mapping for dialog '%s' and channel '%s'", dialogID, info.Channel.ID)
pi = NewProxyInstance(dialogID) // create new proxy instance for the dialog
proxyInstances.Add(info.Channel.ID, pi) // add the dialog to the proxyInstances map to track its life
exists = true
case info.Type == "StasisEnd":
Info.Printf("Ending application instance for channel '%s'", info.Channel.ID)
// on application end, perform clean up checks
pi, exists = proxyInstances.Get(info.Channel.ID)
if exists {
pi.removeAllObjects()
}
case info.Type == "BridgeDestroyed":
pi, exists = proxyInstances.Get(info.Bridge.ID)
if exists {
pi.removeObject(info.Bridge.ID)
}
case info.Type == "ChannelDestroyed":
pi, exists = proxyInstances.Get(info.Channel.ID)
if exists {
pi.removeObject(info.Channel.ID)
}
// check if prefix is part of the minChan{} struct
case strings.HasPrefix(info.Type, "Channel"):
pi, exists = proxyInstances.Get(info.Channel.ID)
// check if prefix is part of the minBridge{} struct
case strings.HasPrefix(message.Type, "Bridge"):
pi, exists = proxyInstances.Get(info.Bridge.ID)
// check if prefix is part of the minPlay{} struct
case strings.HasPrefix(message.Type, "Playback"):
pi, exists = proxyInstances.Get(info.Playback.ID)
// check if prefix is part of the minRec{} struct (this one uses a Name instead of ID for some reason)
case strings.HasPrefix(message.Type, "Recording"):
pi, exists = proxyInstances.Get(info.Recording.Name)
default:
Warning.Println("No handler for event type")
//pi, exists = proxyInstances[]
// if not matching, then we need to perform checks against the
// existing map to determine where to send this ARI message.
}
// marshal the message back into a string
busMessage, err := json.Marshal(message)
if err != nil {
Error.Println(err)
return
}
Debug.Printf("Bus Data:\n%s\n", busMessage)
// push the busMessage onto the producer channel
if exists {
pi.Events <- busMessage
}
}
// Add inserts a proxy instance into the global map of active proxy instances.
func (p *proxyInstanceMap) Add(id string, pi *proxyInstance) {
p.mapLock.Lock()
defer p.mapLock.Unlock()
p.instanceMap[id] = pi
}
// Get returns a proxy instance from the global map of active proxy instances.
// returns nil if not found.
func (p *proxyInstanceMap) Get(id string) (*proxyInstance, bool) {
p.mapLock.RLock()
defer p.mapLock.RUnlock()
pi, ok := p.instanceMap[id]
if !ok {
return nil, false
}
return pi, true
}
// Remove deletes an entry in the global proxyInstanceMap
func (p *proxyInstanceMap) Remove(id string) {
p.mapLock.Lock()
defer p.mapLock.Unlock()
delete(p.instanceMap, id)
}
// shutDown closes the quit channel to signal all of a ProxyInstance's goroutines
// to return
func (p *proxyInstance) shutDown() {
select {
case _, ok := (<-p.quit):
if !ok {
return
}
default:
close(p.quit)
}
}
// addObject adds an object reference to the proxyInstance mapping
func (p *proxyInstance) addObject(id string) {
for i := range p.ariObjects {
if p.ariObjects[i] == id {
//object already is associated with this proxyInstance
return
}
}
p.ariObjects = append(p.ariObjects, id)
proxyInstances.Add(id, p)
}
// removeObject removes an object reference from the proxyInstance mapping
func (p *proxyInstance) removeObject(id string) {
// remove an object from the map.
for i := range p.ariObjects {
if p.ariObjects[i] == id {
// rewrite the p.ariObjects string slice to append all values up to
// the index value of 'i', and all values of 'i'+1 and later.
p.ariObjects = append(p.ariObjects[:i], p.ariObjects[i+1:]...)
}
}
// remove the instance from our tracking map
proxyInstances.Remove(id)
// if there are no more objects, shut'rdown
if len(p.ariObjects) == 0 {
p.shutDown()
}
}
// removeAllObjects will remove all object references from the proxyInstance mapping
func (p *proxyInstance) removeAllObjects() {
// remove all objects from the map as our application is shutting down.
for _, obj := range p.ariObjects {
proxyInstances.Remove(obj)
}
p.shutDown() // destroy the application / proxy instance
}
// runCommandConsumer starts the consumer for accepting Commands from
// applications.
func (p *proxyInstance) runCommandConsumer(dialogID string) {
commandTopic := strings.Join([]string{"commands", dialogID}, "_")
responseTopic := strings.Join([]string{"responses", dialogID}, "_")
Debug.Println("Topics are:", commandTopic, " ", responseTopic)
p.responseChannel = ari.InitProducer(responseTopic)
// waits for the TopicExists function to return a channel
select {
case <-ari.TopicExists(commandTopic):
p.commandChannel = ari.InitConsumer(commandTopic)
case <-time.After(10 * time.Second):
// if the application instance hasn't come up after a period of time, gracefully end the proxy instance
p.removeAllObjects()
return
}
for {
select {
case jsonCommand := <-p.commandChannel:
go p.processCommand(jsonCommand, p.responseChannel)
case <-p.quit:
return
}
}
}
// processCommand processes commands from applications and submits them to the
// REST interface.
func (p *proxyInstance) processCommand(jsonCommand []byte, responseProducer chan []byte) {
var c ari.Command
var r ari.CommandResponse
i := ID{ID: "", Name: ""}
Debug.Printf("jsonCommand is %s\n", string(jsonCommand))
json.Unmarshal(jsonCommand, &c)
//TODO: Try to come up with something that makes me feel less dirty
if c.Method == "POST" && strings.Contains(c.URL, "/channels/") && strings.Count(c.URL, "/") == 2 {
chanID := strings.TrimPrefix(c.URL, "/channels/")
if chanID != "" {
p.addObject(chanID)
}
}
//ENDTODO
fullURL := strings.Join([]string{config.StasisURL, c.URL, "?api_key=", config.WSUser, ":", config.WSPassword}, "")
Debug.Printf("fullURL is %s\n", fullURL)
req, err := http.NewRequest(c.Method, fullURL, bytes.NewBufferString(c.Body))
req.Header.Set("Content-Type", "application/json")
res, err := client.Do(req)
buf := new(bytes.Buffer)
buf.ReadFrom(res.Body)
Debug.Printf("Response body is %s\n", buf.String())
json.Unmarshal(buf.Bytes(), &i)
if i.ID != "" {
p.addObject(i.ID)
} else if i.Name != "" {
p.addObject(i.Name)
}
r.ResponseBody = buf.String()
r.StatusCode = res.StatusCode
r.UniqueID = c.UniqueID // return the Command UID in the response
sendJSON, err := json.Marshal(r)
if err != nil {
Error.Println(err)
}
Debug.Printf("sendJSON is %s\n", string(sendJSON))
responseProducer <- sendJSON
}
// vim: tabstop=4 softtabstop=4 shiftwidth=4 noexpandtab tw=72