-
Notifications
You must be signed in to change notification settings - Fork 54
/
node.go
174 lines (149 loc) · 3.51 KB
/
node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
package configer
import (
"context"
"encoding/json"
"fmt"
"os"
"sync"
tp "github.com/henrylee2cn/teleport"
"github.com/xiaoenai/tp-micro/model/etcd"
)
// InitNode sets up the package-level node registry.
//
// It publishes a fresh, empty Nodes value as globalNodes and then opens an
// etcd session on etcdClient for it. If the session cannot be created the
// failure is reported through tp.Fatalf.
func InitNode(etcdClient *etcd.Client) {
	nodes := &Nodes{nodeMap: map[string]*Node{}}
	globalNodes = nodes

	session, err := etcd.NewSession(etcdClient)
	nodes.etcdSession = session
	if err != nil {
		tp.Fatalf("Initialization of the global node failed: %s", err.Error())
	}
}
// SyncNode registers the configuration template cfg for the given service
// and version in the global node registry.
//
// The registration is delegated to globalNodes.mustAdd. In this file
// globalNodes is only assigned by InitNode, so InitNode has to run first.
func SyncNode(service, version string, cfg Config) {
	registry := globalNodes
	registry.mustAdd(service, version, cfg)
}
// Nodes is the registry of configuration nodes managed by this package.
type Nodes struct {
// nodeMap holds the registered nodes, indexed by the key that add builds
// with NewKey(service, version).
nodeMap map[string]*Node
// etcdSession is the etcd session created in InitNode; add uses it to
// create per-node lockers and to obtain the etcd client.
etcdSession *etcd.Session
// rwMutex is write-locked by add for the whole duration of a registration.
rwMutex sync.RWMutex
}
// globalNodes is the package-level node registry. It is assigned by InitNode
// and used by SyncNode.
var globalNodes *Nodes
// mustAdd registers cfg through add and hands the resulting error to must.
// NOTE(review): must is defined outside this file; presumably it panics on a
// non-nil error - confirm.
func (n *Nodes) mustAdd(service, version string, cfg Config) {
	err := n.add(service, version, cfg)
	must(err)
}
// add registers the configuration template cfg under the key built from
// service and version, and blocks until the node has been initialized.
//
// Steps:
//  1. Reject a key that is already present in nodeMap.
//  2. Serialize cfg with MarshalJSON and store the new node in nodeMap.
//  3. Take the etcd lock for the key and read the key from etcd.
//  4. If a value exists, bind it; when that value marks the node as
//     initialized, start watching and return the bind result.
//  5. If no value exists, write the node (template, not initialized) to etcd.
//  6. Otherwise start watching and wait on waitInit until a watched update
//     initializes the node.
//
// Both n.rwMutex and the node's etcd lock stay held until add returns,
// including the time spent in waitInit.
//
// It returns an error when the key is already registered, when cfg cannot be
// serialized, when an etcd request fails, when the etcd locker panics, or
// when the initializing bind reports an error.
func (n *Nodes) add(service, version string, cfg Config) (err error) {
	n.rwMutex.Lock()
	defer n.rwMutex.Unlock()

	key := NewKey(service, version)
	if _, ok := n.nodeMap[key]; ok {
		return fmt.Errorf("Repeat the registration configuration: %s", key)
	}

	// Fail before any state is touched: a template that cannot be serialized
	// must not be registered locally or written to etcd as an empty string.
	cfgBytes, err := cfg.MarshalJSON()
	if err != nil {
		return fmt.Errorf("marshal configuration template %s: %v", key, err)
	}

	node := &Node{
		key:         key,
		object:      cfg,
		etcdMutex:   etcd.NewLocker(n.etcdSession, key),
		Initialized: false,
		Config:      string(cfgBytes),
		doInitCh:    make(chan error, 1),
		nodes:       n,
	}
	n.nodeMap[key] = node

	// Registered before taking the etcd lock so that a panic raised by the
	// locker is converted into an ordinary error for the caller.
	defer func() {
		if p := recover(); p != nil {
			err = fmt.Errorf("etcd concurrency lock fail: %v", p)
		}
	}()
	node.etcdMutex.Lock()
	defer node.etcdMutex.Unlock()

	client := n.etcdSession.Client()
	resp, err := client.Get(context.TODO(), key)
	if err != nil {
		return err
	}

	if len(resp.Kvs) > 0 {
		err = node.bind(resp.Kvs[0].Value)
		if node.Initialized {
			go node.watch(client)
			return err
		}
		if err != nil {
			// The stored value could not be applied and the node is still
			// uninitialized. Keep waiting for a usable value, but do not
			// lose the reason the existing one was rejected.
			tp.Warnf("Binding existing configuration from etcd failed: %s: %v", key, err)
		}
	} else {
		// Nothing stored yet: publish the template so it can be filled in.
		_, err = client.Put(context.TODO(), key, node.String())
		if err != nil {
			return err
		}
	}

	tp.Warnf("Wait for the configuration in the ETCD to be set: %s", key)
	go node.watch(client)
	return node.waitInit()
}
// Node is the handler of a single configuration entry. Only the exported
// fields carry json tags; they form the value stored in etcd via String.
type Node struct {
// key is the etcd key of this node, built by NewKey(service, version) in add.
key string
// object is the Config that receives the configuration content: bind calls
// UnmarshalJSON on it before the node is initialized and Reload afterwards.
object Config
// Config is the configuration content as a string. add fills it with the
// output of cfg.MarshalJSON; bind overwrites it from the etcd value.
Config string `json:"config"`
// Initialized reports whether the configuration has been marked as
// initialized; it is false for a freshly added node and is updated from the
// etcd value by bind.
Initialized bool `json:"initialized"`
// doInitCh carries the result of the initializing bind to waitInit. add
// creates it with a buffer of one; bind sends on it without blocking.
doInitCh chan error
// etcdMutex is the locker created by etcd.NewLocker for this node's key;
// add holds it while registering the node.
etcdMutex sync.Locker
// nodes points back to the owning registry; bind uses it to call archive.
nodes *Nodes
}
// archive writes a JSON snapshot of nodeMap to ./config/archive, creating the
// ./config directory if needed and replacing any previous archive.
//
// Archiving is best-effort: every failure is logged as a warning and nothing
// is returned to the caller.
//
// archive does not take n.rwMutex. add calls bind (and therefore archive)
// while holding that lock, so acquiring it here would deadlock on that path.
func (n *Nodes) archive() {
	// Encode first: opening the file truncates it, so encoding afterwards
	// would wipe the previous archive whenever encoding failed.
	data, err := json.Marshal(n.nodeMap)
	if err != nil {
		tp.Warnf("Archive config error: %v", err)
		return
	}

	// An already existing directory is the normal case, not an error.
	if err = os.Mkdir("./config", 0755); err != nil && !os.IsExist(err) {
		tp.Warnf("Archive config error: %v", err)
		return
	}

	f, err := os.OpenFile("./config/archive", os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
	if err != nil {
		tp.Warnf("Archive config error: %v", err)
		return
	}
	if _, err = f.Write(data); err != nil {
		tp.Warnf("Archive config error: %v", err)
	}
	// Close is checked as well: a write error can surface only at close time.
	if err = f.Close(); err != nil {
		tp.Warnf("Archive config error: %v", err)
	}
}
// bind applies a raw etcd value to the node.
//
// data is decoded as JSON into the node itself, which updates Config and
// Initialized, and the registry is archived afterwards. The new Config is
// then handed to the wrapped object: through Reload when the node was already
// initialized before this call, through UnmarshalJSON otherwise. In the
// latter case, if the decoded value marks the node as initialized, the result
// is offered to doInitCh without blocking so that waitInit can return.
//
// It returns the decode error, or the error from Reload / UnmarshalJSON.
func (n *Node) bind(data []byte) error {
	// Remember the state from before decoding; Unmarshal may flip it.
	wasInitialized := n.Initialized

	if err := json.Unmarshal(data, n); err != nil {
		return err
	}
	n.nodes.archive()

	raw := []byte(n.Config)
	if wasInitialized {
		return n.object.Reload(raw)
	}

	err := n.object.UnmarshalJSON(raw)
	if !n.Initialized {
		return err
	}

	// Non-blocking send: the result is dropped when the buffer is full.
	select {
	case n.doInitCh <- err:
	default:
	}
	return err
}
// waitInit blocks until a value arrives on doInitCh and returns it. The value
// is the result that bind sent for the initializing configuration.
func (n *Node) waitInit() error {
	result := <-n.doInitCh
	return result
}
// String returns the node encoded as JSON text. Only the tagged exported
// fields (config, initialized) are part of the encoding; an encoding error
// is discarded.
func (n *Node) String() string {
	encoded, _ := json.Marshal(n)
	return string(encoded)
}
// watch follows the node's key in etcd and binds every value that is put
// there. Events of any other type are skipped, and a failed bind is logged
// without stopping the loop.
//
// The watch is opened with context.TODO(), so it is not cancelled from here;
// the method returns only when the watch channel is closed.
func (n *Node) watch(etcdClient *etcd.Client) {
	updates := etcdClient.Watch(context.TODO(), n.key)
	for resp := range updates {
		for _, ev := range resp.Events {
			if ev.Type == etcd.EventTypePut {
				if err := n.bind(ev.Kv.Value); err != nil {
					tp.Errorf("Binding configuration from etcd failed: %s", err)
				}
			}
		}
	}
}