/
database.go
185 lines (162 loc) · 4.36 KB
/
database.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
package manager
import (
"bytes"
"encoding/json"
"strings"
"time"
"github.com/sirupsen/logrus"
"github.com/libopenstorage/openstorage/api"
"github.com/libopenstorage/openstorage/cluster"
"github.com/portworx/kvdb"
)
// ClusterDBKey is the kvdb key under which the cluster info is stored.
const ClusterDBKey = "cluster/database"
// updateCallbackFn is the callback invoked by lockAndUpdateDB with a pointer
// to the cluster info just read from kvdb. It may modify the info in place.
// The boolean result tells the caller whether the info should be written back;
// a non-nil error aborts the update.
type updateCallbackFn func(db *cluster.ClusterInfo) (bool, error)
// snapAndReadClusterInfo starts a kvdb updates collector, snapshots the given
// prefixes and decodes the cluster database found in that snapshot.
//
// Starting the collector and taking the snapshot is attempted up to three
// times; the error of the last attempt is returned if none succeeds.
//
// On success the returned state holds the decoded cluster info (or an empty
// one when the key is missing or contains "{}"), the snapshot, the snapshot
// version and the running collector. If the snapshot cannot be read the
// collector is stopped and a nil state is returned. If the stored value cannot
// be parsed, the state is still returned alongside the error.
func snapAndReadClusterInfo(snapshotPrefixes []string) (*cluster.ClusterInitState, error) {
	kv := kvdb.Instance()

	// To work-around a kvdb issue with watches, try snapshot in a loop
	var (
		collector kvdb.UpdatesCollector
		err       error
		version   uint64
		snap      kvdb.Kvdb
	)
	for i := 0; i < 3; i++ {
		if i > 0 {
			logrus.Infof("Retrying snapshot")
		}

		// Start the watch before the snapshot
		collector, err = kvdb.NewUpdatesCollector(kv, "", 0)
		if err != nil {
			logrus.Errorf("Failed to start collector for cluster db: %v", err)
			collector = nil
			continue
		}

		// Create the snapshot
		snap, version, err = kv.Snapshot(snapshotPrefixes, true)
		if err == nil {
			break
		}
		logrus.Errorf("Snapshot failed for cluster db: %v", err)
		// Each retry starts a fresh collector, so release this one.
		collector.Stop()
		collector = nil
	}
	if err != nil {
		return nil, err
	}
	logrus.Infof("Cluster db snapshot at: %v", version)

	clusterDB, err := snap.Get(ClusterDBKey)
	if err != nil && !strings.Contains(err.Error(), "Key not found") {
		logrus.Warnln("Warning, could not read cluster database")
		// No state is handed back on this path, so the caller has no way to
		// stop the collector; stop it here to avoid leaking it.
		collector.Stop()
		return nil, err
	}

	db := emptyClusterInfo()
	state := &cluster.ClusterInitState{
		ClusterInfo: &db,
		InitDb:      snap,
		Version:     version,
		Collector:   collector,
	}

	if clusterDB == nil || bytes.Equal(clusterDB.Value, []byte("{}")) {
		logrus.Infoln("Cluster is uninitialized...")
		return state, nil
	}
	if err := json.Unmarshal(clusterDB.Value, &db); err != nil {
		// Log the pair that failed to parse rather than the kvdb instance.
		logrus.Warnln("Fatal, Could not parse cluster database ", clusterDB)
		return state, err
	}

	return state, nil
}
// emptyClusterInfo builds a ClusterInfo in the STATUS_INIT state with an
// allocated, empty NodeEntries map.
func emptyClusterInfo() cluster.ClusterInfo {
	var info cluster.ClusterInfo
	info.Status = api.Status_STATUS_INIT
	info.NodeEntries = make(map[string]cluster.NodeEntry)
	return info
}
// unmarshalClusterInfo decodes the cluster database stored in kv.
//
// It returns the decoded info together with kv.ModifiedIndex (0 when kv is
// nil). A nil pair or a value of "{}" is treated as an uninitialized cluster
// and yields an empty ClusterInfo with no error. A JSON decoding failure is
// logged and returned along with whatever was decoded so far.
func unmarshalClusterInfo(kv *kvdb.KVPair) (cluster.ClusterInfo, uint64, error) {
	info := emptyClusterInfo()

	if kv == nil {
		logrus.Infoln("Cluster is uninitialized...")
		return info, 0, nil
	}

	modIndex := kv.ModifiedIndex
	if bytes.Equal(kv.Value, []byte("{}")) {
		logrus.Infoln("Cluster is uninitialized...")
		return info, modIndex, nil
	}

	err := json.Unmarshal(kv.Value, &info)
	if err != nil {
		logrus.Warnln("Fatal, Could not parse cluster database ", kv)
	}
	return info, modIndex, err
}
// readClusterInfo fetches ClusterDBKey from the kvdb instance and decodes it
// with unmarshalClusterInfo. An error whose text contains "Key not found" is
// not treated as a failure; any other read error is logged and returned with
// an empty ClusterInfo and version 0.
func readClusterInfo() (cluster.ClusterInfo, uint64, error) {
	pair, err := kvdb.Instance().Get(ClusterDBKey)
	if err == nil || strings.Contains(err.Error(), "Key not found") {
		return unmarshalClusterInfo(pair)
	}

	logrus.Warnln("Warning, could not read cluster database")
	return emptyClusterInfo(), 0, err
}
// lockAndUpdateDB runs cb against the current cluster info while holding the
// cluster lock and, if cb asks for it, writes the result back to kvdb.
//
// fn is only used to label the log message when the lock cannot be taken;
// lockID is passed to the kvdb lock call. The lock is released when the
// function returns.
//
// It returns (nil, nil) when cb reports that no update is needed. When the
// info read had a non-zero version the write is a compare-and-set on that
// modified index; otherwise the key is created, and kvdb.ErrExist from the
// create is reported as kvdb.ErrModified.
func lockAndUpdateDB(fn, lockID string, cb updateCallbackFn) (*kvdb.KVPair, error) {
	store := kvdb.Instance()

	lock, err := store.LockWithTimeout(clusterLockKey, lockID, 10*time.Minute, store.GetLockTimeout())
	if err != nil {
		logrus.Warnf("Unable to obtain cluster lock for %v op: %v", fn, err)
		return nil, err
	}
	defer store.Unlock(lock)

	info, version, err := readClusterInfo()
	if err != nil {
		return nil, err
	}

	changed, err := cb(&info)
	switch {
	case err != nil:
		return nil, err
	case !changed:
		return nil, nil
	}

	if version == 0 {
		// key does not exist yet, call create
		created, createErr := store.Create(ClusterDBKey, info, 0)
		if createErr == kvdb.ErrExist {
			// key created, we may have lost the lock, retry
			createErr = kvdb.ErrModified
		}
		return created, createErr
	}

	payload, err := json.Marshal(info)
	if err != nil {
		logrus.Warnf("Fatal, Could not marshal cluster database to JSON: %v", err)
		return nil, err
	}

	pair := &kvdb.KVPair{
		Key:           ClusterDBKey,
		Value:         payload,
		ModifiedIndex: version,
	}
	return store.CompareAndSet(pair, kvdb.KVModifiedIndex, nil)
}
// updateDB applies cb to the cluster database via updateAndGetDB with retry
// enabled, discarding the resulting key-value pair and returning only the
// error.
func updateDB(fn, lockID string, cb updateCallbackFn) error {
	if _, err := updateAndGetDB(fn, lockID, cb, true); err != nil {
		return err
	}
	return nil
}
// updateAndGetDB calls lockAndUpdateDB and returns its result. When retry is
// true, the call is repeated for as long as it fails with kvdb.ErrModified
// (i.e. the compare-and-set or create lost against a concurrent write).
func updateAndGetDB(
	fn string,
	lockID string,
	cb updateCallbackFn,
	retry bool,
) (*kvdb.KVPair, error) {
	pair, err := lockAndUpdateDB(fn, lockID, cb)
	for retry && err == kvdb.ErrModified {
		// compare and set failed, retry
		pair, err = lockAndUpdateDB(fn, lockID, cb)
	}
	return pair, err
}