This repository has been archived by the owner on May 23, 2023. It is now read-only.
/
legacy_analytics.go
113 lines (93 loc) · 2.95 KB
/
legacy_analytics.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
// Copyright 2018 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package analytics
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"path"
"github.com/apigee/istio-mixer-adapter/adapter/auth"
"istio.io/istio/mixer/pkg/adapter"
)
// axPath is the URL path template for the legacy analytics publisher. Its two
// %s verbs are filled with the organization and the environment, in that order.
const axPath = "/axpublisher/organization/%s/environment/%s"
// legacyAnalytics publishes analytics records over HTTP to the legacy
// axpublisher endpoint.
type legacyAnalytics struct {
	// client executes the outgoing POST issued by SendRecords.
	client *http.Client
}
// Start is a no-op for the legacy publisher: env is ignored and the returned
// error is always nil.
func (oa *legacyAnalytics) Start(env adapter.Env) error {
	return nil
}
// Close is a no-op for the legacy publisher.
func (oa *legacyAnalytics) Close() {
}
// SendRecords posts records as a JSON legacyRequest to the legacy axpublisher
// endpoint for the organization and environment reported by auth.
//
// The target URL is a copy of auth.ApigeeBase() with axPath (formatted with the
// organization and environment) joined onto its path. The request carries HTTP
// basic auth built from auth.Key() and auth.Secret().
//
// When buildRequest produces no payload (nil auth or no records) nothing is
// sent and nil is returned. Errors from building, encoding, or sending the
// request, or from reading the response, are returned as-is. Any response
// status other than 200 is returned as an error that includes the status code
// and the response body.
func (oa *legacyAnalytics) SendRecords(auth *auth.Context, records []Record) error {
	// Build the payload first: buildRequest handles a nil auth by returning
	// (nil, nil), so auth must not be dereferenced before this check.
	request, err := buildRequest(auth, records)
	if request == nil || err != nil {
		return err
	}

	axURL := *auth.ApigeeBase()
	axURL.Path = path.Join(axURL.Path, fmt.Sprintf(axPath, auth.Organization(), auth.Environment()))

	body := new(bytes.Buffer)
	if err := json.NewEncoder(body).Encode(request); err != nil {
		return err
	}

	req, err := http.NewRequest(http.MethodPost, axURL.String(), body)
	if err != nil {
		return err
	}
	req.SetBasicAuth(auth.Key(), auth.Secret())
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Accept", "application/json")

	auth.Log().Debugf("sending %d analytics records to: %s", len(records), axURL.String())
	resp, err := oa.client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// Do not pre-size the buffer from resp.ContentLength: it is -1 when the
	// length is unknown, and a negative capacity makes make() panic.
	var buf bytes.Buffer
	if _, err := buf.ReadFrom(resp.Body); err != nil {
		return err
	}
	respBody := buf.String()

	switch resp.StatusCode {
	case http.StatusOK:
		auth.Log().Debugf("analytics accepted: %v", respBody)
		return nil
	default:
		return fmt.Errorf("analytics rejected. status: %d, body: %s", resp.StatusCode, respBody)
	}
}
// buildRequest assembles the legacyRequest payload for incoming.
//
// It returns (nil, nil) when auth is nil or incoming is empty, and an error
// when auth reports an empty organization or environment. Otherwise every
// record is passed through ensureFields(auth) and the results are returned
// together with the organization and environment taken from auth.
func buildRequest(auth *auth.Context, incoming []Record) (*legacyRequest, error) {
	switch {
	case auth == nil, len(incoming) == 0:
		// Nothing to send.
		return nil, nil
	case auth.Organization() == "", auth.Environment() == "":
		return nil, fmt.Errorf("organization and environment are required in auth: %v", auth)
	}

	// ensureFields is invoked on the per-iteration copy, never on the
	// caller's slice element.
	enriched := make([]Record, len(incoming))
	for i, rec := range incoming {
		enriched[i] = rec.ensureFields(auth)
	}

	out := new(legacyRequest)
	out.Organization = auth.Organization()
	out.Environment = auth.Environment()
	out.Records = enriched
	return out, nil
}
// legacyRequest is the JSON document that SendRecords posts to the axpublisher
// endpoint; buildRequest populates it.
type legacyRequest struct {
	// Organization is serialized under the "organization" key.
	Organization string `json:"organization"`
	// Environment is serialized under the "environment" key.
	Environment string `json:"environment"`
	// Records holds the analytics records, serialized under "records".
	Records []Record `json:"records"`
}
// legacyResponse describes a JSON document with accepted and rejected counts.
// NOTE(review): presumably the reply body of the axpublisher endpoint; nothing
// in the visible part of this file decodes into it — confirm before relying on it.
type legacyResponse struct {
	// Accepted is read from / written to the "accepted" key.
	Accepted int `json:"accepted"`
	// Rejected is read from / written to the "rejected" key.
	Rejected int `json:"rejected"`
}