/
exportInfluxDb.go
103 lines (84 loc) · 2.83 KB
/
exportInfluxDb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package main
import (
"context"
"fmt"
"time"
clientinfluxdb "github.com/influxdata/influxdb/client/v2"
nest "github.com/patrickalin/nest-api-go"
)
// client bundles the state needed to export Nest readings to InfluxDB.
type client struct {
// in is the channel from which listen receives nest.Nest values to export.
in chan nest.Nest
// c is the InfluxDB client used to write point batches and run queries.
c clientinfluxdb.Client
// database is the InfluxDB database name used for writes and for createDB.
database string
}
// sendnestToInfluxDB builds one point from the given Nest reading and writes
// it to the database named by c.database.
//
// The point goes to the "nestData" measurement with the tag nest=living and
// second precision. Errors are logged, never returned. When the write fails
// the method attempts to create the database and, if that succeeds, retries
// the same batch once so the reading is not silently dropped.
func (c *client) sendnestToInfluxDB(nest nest.Nest) {
	now := time.Now()
	fmt.Printf("\n%s :> Send nest Data to InfluxDB\n", now.Format(time.RFC850))

	// Create a point and add to batch
	tags := map[string]string{"nest": "living"}
	fields := map[string]interface{}{
		"TargetTemperature":  nest.GetTargetTemperatureC(),
		"AmbientTemperature": nest.GetAmbientTemperatureC(),
		"Humidity":           nest.GetHumidity(),
		"Version":            nest.GetSoftwareVersion(),
		"Status":             nest.GetAway(),
		// "Running" is derived: true while the ambient temperature is below the target.
		"Running": nest.GetAmbientTemperatureF() < nest.GetTargetTemperatureF(),
	}

	// Create a new point batch
	bp, err := clientinfluxdb.NewBatchPoints(clientinfluxdb.BatchPointsConfig{
		Database:  c.database,
		Precision: "s",
	})
	if err != nil {
		// The batch must not be used after an error; stop instead of
		// calling AddPoint on it.
		log.Errorf("Error sent Data to Influx DB : %v", err)
		return
	}

	pt, err := clientinfluxdb.NewPoint("nestData", tags, fields, now)
	if err != nil {
		// Do not add an invalid point to the batch.
		log.Errorf("Error sent Data to Influx DB : %v", err)
		return
	}
	bp.AddPoint(pt)

	// Write the batch
	err = c.c.Write(bp)
	if err == nil {
		return
	}

	// The write failed: try to create the database, then retry once.
	if err2 := c.createDB(c.database); err2 != nil {
		log.Errorf("Check if InfluxData is running or if the database nest exists : %v (create database: %v)", err, err2)
		return
	}
	if err = c.c.Write(bp); err != nil {
		log.Errorf("Error sent Data to Influx DB : %v", err)
	}
}
// createDB issues "CREATE DATABASE <InfluxDBDatabase>" through the InfluxDB
// client and prints progress to standard output.
//
// It returns an error when the query cannot be executed or when the server
// response itself carries an error; otherwise it returns nil.
func (c *client) createDB(InfluxDBDatabase string) error {
	fmt.Println("Create Database nest in InfluxData")

	query := fmt.Sprint("CREATE DATABASE ", InfluxDBDatabase)
	q := clientinfluxdb.NewQuery(query, "", "")

	fmt.Println("Query: ", query)

	resp, err := c.c.Query(q)
	if err == nil && resp != nil {
		// A request can succeed at the transport level while the statement
		// fails; that failure is only visible through the response.
		err = resp.Error()
	}
	if err != nil {
		return fmt.Errorf("Error with : Create database nest, check if InfluxDB is running : %v", err)
	}

	fmt.Println("Database nest created in InfluxDB")
	return nil
}
// initClient builds an InfluxDB HTTP client for http://<server>:<port> with
// the given credentials, wraps it in a *client that reads from messagesnest,
// and then asks the server to create InfluxDatabase.
//
// It returns an error when the HTTP client cannot be constructed. The result
// of the database creation is handed to checkErr (defined elsewhere).
// NOTE(review): whether checkErr aborts on a non-nil error is not visible
// here; if it only reports, the client is still returned with a nil error.
func initClient(messagesnest chan nest.Nest, InfluxDBServer, InfluxDBServerPort, InfluxDBUsername, InfluxDBPassword, InfluxDatabase string) (*client, error) {
	addr := fmt.Sprintf("http://%s:%s", InfluxDBServer, InfluxDBServerPort)

	c, err := clientinfluxdb.NewHTTPClient(
		clientinfluxdb.HTTPConfig{
			Addr:     addr,
			Username: InfluxDBUsername,
			Password: InfluxDBPassword,
		})
	if err != nil {
		// This step only constructs the client; report it as such rather
		// than as a database-creation failure.
		return nil, fmt.Errorf("Error creating InfluxDB client for %s : %v", addr, err)
	}
	if c == nil {
		return nil, fmt.Errorf("Error creating InfluxDB client for %s : no client returned", addr)
	}

	cl := &client{c: c, in: messagesnest, database: InfluxDatabase}

	//need to check how to verify that the db is running
	err = cl.createDB(InfluxDatabase)
	checkErr(err, funcName(), "impossible to create DB", InfluxDatabase)

	return cl, nil
}
// listen starts a background goroutine that receives nest.Nest values from
// c.in and sends each one to InfluxDB with sendnestToInfluxDB.
//
// The goroutine ends when ctx is cancelled or when c.in is closed. A nil ctx
// is tolerated and means the goroutine is never cancelled. listen itself
// returns immediately.
func (c *client) listen(ctx context.Context) {
	// A nil channel blocks forever in select, which keeps the previous
	// "run until the process exits" behaviour for callers passing a nil ctx.
	var done <-chan struct{}
	if ctx != nil {
		done = ctx.Done()
	}

	go func() {
		log.Info("Receive messagesnest to export InfluxDB")
		for {
			select {
			case <-done:
				return
			case msg, ok := <-c.in:
				if !ok {
					// Channel closed: stop instead of exporting zero-value
					// readings in a tight loop.
					return
				}
				c.sendnestToInfluxDB(msg)
			}
		}
	}()
}