Skip to content

Commit

Permalink
cluster/http: add handleNodes
Browse files Browse the repository at this point in the history
  • Loading branch information
gernest committed Feb 25, 2024
1 parent cd3409e commit 47aa735
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 36 deletions.
24 changes: 14 additions & 10 deletions gen/go/vince/v1/auth.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

56 changes: 33 additions & 23 deletions gen/go/vince/v1/raft.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

62 changes: 62 additions & 0 deletions internal/cluster/http/node.go
Original file line number Diff line number Diff line change
@@ -1 +1,63 @@
package http

import (
"context"
"sort"
"time"

v1 "github.com/vinceanalytics/vince/gen/go/vince/v1"
)

// NewNodesFromServers creates a slice of Nodes from a slice of Servers.
func NewNodesFromServers(servers *v1.Server_List) *v1.Node_List {
nodes := &v1.Node_List{
Items: make([]*v1.Node, len(servers.Items)),
}
for i, s := range servers.Items {
nodes.Items[i] = &v1.Node{
Id: s.Id,
Addr: s.Addr,
Voter: s.Suffrage == v1.Server_Voter,
}
}
ls := nodes.Items
sort.Slice(ls, func(i, j int) bool {
return ls[i].Id < ls[j].Id
})
return nodes
}

// Test tests the node's reachability and leadership status. If an error
// occurs, the Error field will be populated.
func TestNode(ctx context.Context, n *v1.Node, ga GetAddresser, leaderAddr string, timeout time.Duration) {
start := time.Now()
n.Time = time.Since(start).Seconds()
n.TimeS = time.Since(start).String()
n.Reachable = false
n.Leader = false
doCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

apiAddr, err := ga.GetNodeAPIAddr(doCtx, n.Addr)
if err != nil {
n.Error = err.Error()
return
}
n.ApiAddr = apiAddr
n.Reachable = true
n.Leader = n.Addr == leaderAddr
}

func Voters(n *v1.Node_List) (o *v1.Node_List) {
o = &v1.Node_List{}
for _, e := range n.Items {
if e.Voter {
o.Items = append(o.Items, e)
}
}
ls := o.Items
sort.Slice(ls, func(i, j int) bool {
return ls[i].Id < ls[j].Id
})
return
}
115 changes: 112 additions & 3 deletions internal/cluster/http/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os"
"runtime"
"strings"
"sync"
"time"

v1 "github.com/vinceanalytics/vince/gen/go/vince/v1"
Expand Down Expand Up @@ -83,7 +84,7 @@ type Cluster interface {

KV(ctx context.Context, nodeAddr string, cred *v1.Credential, req *v1.KV_Request) error
Load(ctx context.Context, nodeAddr string, cred *v1.Credential, req *v1.Load_Request, timeout time.Duration, retries int) error
Remove(ctx context.Context, nodeAddr string, cred *v1.Credential, req *v1.RemoveNode_Request) error
Remove(ctx context.Context, nodeAddr string, cred *v1.Credential, timeout time.Duration, req *v1.RemoveNode_Request) error
Backup(ctx context.Context, br *v1.Backup_Request, nodeAddr string, cred *v1.Credential, timeout time.Duration, dst io.Writer) error

Realtime(ctx context.Context, nodeAddr string, cred *v1.Credential, req *v1.Realtime_Request) (*v1.Realtime_Response, error)
Expand Down Expand Up @@ -289,8 +290,116 @@ func (s *Service) handleApiEvent(w http.ResponseWriter, r *http.Request, params
}
func (s *Service) handleEvent(w http.ResponseWriter, r *http.Request, params QueryParams) {
}
func (s *Service) handleNodes(w http.ResponseWriter, r *http.Request, params QueryParams) {}
func (s *Service) handleRemove(w http.ResponseWriter, r *http.Request, params QueryParams) {}
func (s *Service) handleNodes(w http.ResponseWriter, r *http.Request, params QueryParams) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")

if !s.CheckRequestPerm(r, v1.Credential_STATUS) {
w.WriteHeader(http.StatusUnauthorized)
return
}

if r.Method != "GET" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
ctx := r.Context()
servers, err := s.store.Nodes(ctx)
if err != nil {
statusCode := http.StatusInternalServerError
if err == store.ErrNotOpen {
statusCode = http.StatusServiceUnavailable
}
http.Error(w, fmt.Sprintf("store nodes: %s", err.Error()), statusCode)
return
}
nodes := NewNodesFromServers(servers)
if !params.NonVoters() {
nodes = Voters(nodes)
}
// Now test the nodes
lAddr, err := s.store.LeaderAddr(ctx)
if err != nil {
http.Error(w, fmt.Sprintf("leader address: %s", err.Error()),
http.StatusInternalServerError)
return
}
var wg sync.WaitGroup
timeout := params.Timeout(defaultTimeout)
for _, n := range nodes.Items {
n := n
wg.Add(1)
go func() {
defer wg.Done()
TestNode(ctx, n, s.cluster, lAddr, timeout)
}()
}
wg.Wait()
s.write(w, nodes)
}
func (s *Service) handleRemove(w http.ResponseWriter, r *http.Request, params QueryParams) {
if !s.CheckRequestPerm(r, v1.Credential_REMOVE) {
w.WriteHeader(http.StatusUnauthorized)
return
}

if r.Method != "DELETE" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}

b, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
rn := &v1.RemoveNode_Request{}
err = protojson.Unmarshal(b, rn)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
ctx := r.Context()
err = s.store.Remove(ctx, rn)
if err == nil {
return
}
if !errors.Is(err, store.ErrNotLeader) {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if s.DoRedirect(w, r, params) {
return
}

addr, err := s.store.LeaderAddr(ctx)
if err != nil {
http.Error(w, fmt.Sprintf("leader address: %s", err.Error()),
http.StatusInternalServerError)
return
}
if addr == "" {
http.Error(w, ErrLeaderNotFound.Error(), http.StatusServiceUnavailable)
return
}

username, password, ok := r.BasicAuth()
if !ok {
username = ""
}

w.Header().Add(ServedByHTTPHeader, addr)
removeErr := s.cluster.Remove(ctx, addr, makeCredentials(username, password),
params.Timeout(defaultTimeout), rn)
if err != nil {
if removeErr.Error() == "unauthorized" {
http.Error(w, "remote backup not authorized", http.StatusUnauthorized)
} else {
http.Error(w, removeErr.Error(), http.StatusInternalServerError)
}
return

}
}
func (s *Service) handleBackup(w http.ResponseWriter, r *http.Request, params QueryParams) {
if !s.CheckRequestPerm(r, v1.Credential_BACKUP) {
w.WriteHeader(http.StatusUnauthorized)
Expand Down
1 change: 1 addition & 0 deletions proto/vince/v1/auth.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ message Credential {
READY = 6;
BACKUP = 7;
LOAD = 8;
REMOVE = 9;
}
}
1 change: 1 addition & 0 deletions proto/vince/v1/raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ message Node {
double time = 6;
string time_s = 7;
string error = 8;
bool leader = 9;

message List { repeated Node items = 1; }
}
Expand Down

0 comments on commit 47aa735

Please sign in to comment.