Skip to content

Commit

Permalink
cluster/http: add handleLoad
Browse files Browse the repository at this point in the history
  • Loading branch information
gernest committed Feb 25, 2024
1 parent cf988c1 commit be9e3bb
Showing 1 changed file with 118 additions and 2 deletions.
120 changes: 118 additions & 2 deletions internal/cluster/http/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

v1 "github.com/vinceanalytics/vince/gen/go/vince/v1"
"github.com/vinceanalytics/vince/internal/cluster/rtls"
"github.com/vinceanalytics/vince/internal/cluster/store"
"github.com/vinceanalytics/vince/version"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -81,7 +82,7 @@ type Cluster interface {
GetAddresser

KV(ctx context.Context, nodeAddr string, cred *v1.Credential, req *v1.KV_Request) error
Load(ctx context.Context, nodeAddr string, cred *v1.Credential, req *v1.Load_Request) error
Load(ctx context.Context, nodeAddr string, cred *v1.Credential, req *v1.Load_Request, timeout time.Duration, retries int) error
Remove(ctx context.Context, nodeAddr string, cred *v1.Credential, req *v1.RemoveNode_Request) error
Backup(ctx context.Context, br *v1.Backup_Request, nodeAddr string, cred *v1.Credential, dst io.Writer) error

Expand All @@ -100,6 +101,7 @@ type CredentialStore interface {
}

const (
defaultTimeout = 30 * time.Second

// VersionHTTPHeader is the HTTP header key for the version.
VersionHTTPHeader = "X-RQLITE-VERSION"
Expand Down Expand Up @@ -288,9 +290,70 @@ func (s *Service) handleApiEvent(w http.ResponseWriter, r *http.Request, params
func (s *Service) handleEvent(w http.ResponseWriter, r *http.Request, params QueryParams) {
}
func (s *Service) handleBackup(w http.ResponseWriter, r *http.Request, params QueryParams) {}
func (s *Service) handleLoad(w http.ResponseWriter, r *http.Request, params QueryParams) {}
func (s *Service) handleNodes(w http.ResponseWriter, r *http.Request, params QueryParams) {}
func (s *Service) handleRemove(w http.ResponseWriter, r *http.Request, params QueryParams) {}
func (s *Service) handleLoad(w http.ResponseWriter, r *http.Request, params QueryParams) {
if !s.CheckRequestPerm(r, v1.Credential_LOAD) {
w.WriteHeader(http.StatusUnauthorized)
return
}

if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
b, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
ctx := r.Context()
lr := &v1.Load_Request{Data: b}
err = s.store.Load(ctx, lr)
if err == nil {
return
}
if !errors.Is(err, store.ErrNotLeader) {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if s.DoRedirect(w, r, params) {
return
}

addr, err := s.store.LeaderAddr(ctx)
if err != nil {
http.Error(w, fmt.Sprintf("leader address: %s", err.Error()),
http.StatusInternalServerError)
return
}
if addr == "" {
http.Error(w, ErrLeaderNotFound.Error(), http.StatusServiceUnavailable)
return
}

username, password, ok := r.BasicAuth()
if !ok {
username = ""
}

w.Header().Add(ServedByHTTPHeader, addr)
loadErr := s.cluster.Load(ctx, addr, makeCredentials(username, password), lr,
params.Timeout(defaultTimeout), params.Retries(0))
if loadErr != nil {
if loadErr.Error() == "unauthorized" {
http.Error(w, "remote load not authorized", http.StatusUnauthorized)
} else {
http.Error(w, loadErr.Error(), http.StatusInternalServerError)
}
return
}
}

func (s *Service) doCtx(ctx context.Context) (context.Context, context.CancelFunc) {
return context.WithTimeout(ctx, defaultTimeout)
}

func (s *Service) handleBoot(w http.ResponseWriter, r *http.Request, params QueryParams) {
if !s.CheckRequestPerm(r, v1.Credential_LOAD) {
w.WriteHeader(http.StatusUnauthorized)
Expand Down Expand Up @@ -341,6 +404,52 @@ func (s *Service) write(w http.ResponseWriter, msg proto.Message) {

}

// DoRedirect checks if the request is a redirect, and if so, performs the redirect.
// Returns true caller can consider the request handled. Returns false if the request
// was not a redirect and the caller should continue processing the request.
func (s *Service) DoRedirect(w http.ResponseWriter, r *http.Request, qp QueryParams) bool {
if !qp.Redirect() {
return false
}

rd, err := s.FormRedirect(r)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
} else {
http.Redirect(w, r, rd, http.StatusMovedPermanently)
}
return true
}

// FormRedirect returns the value for the "Location" header for a 301 response.
func (s *Service) FormRedirect(r *http.Request) (string, error) {
leaderAPIAddr := s.LeaderAPIAddr(r.Context())
if leaderAPIAddr == "" {
return "", ErrLeaderNotFound
}

rq := r.URL.RawQuery
if rq != "" {
rq = fmt.Sprintf("?%s", rq)
}
return fmt.Sprintf("%s%s%s", leaderAPIAddr, r.URL.Path, rq), nil
}

// LeaderAPIAddr returns the API address of the leader, as known by this node.
func (s *Service) LeaderAPIAddr(ctx context.Context) string {
nodeAddr, err := s.store.LeaderAddr(ctx)
if err != nil {
return ""
}
callCtx, cancel := s.doCtx(ctx)
defer cancel()
apiAddr, err := s.cluster.GetNodeAPIAddr(callCtx, nodeAddr)
if err != nil {
return ""
}
return apiAddr
}

func (s *Service) CheckRequestPerm(r *http.Request, perm v1.Credential_Permission) (b bool) {
// No auth store set, so no checking required.
if s.creds == nil {
Expand Down Expand Up @@ -436,3 +545,10 @@ func (s *Service) addAllowHeaders(w http.ResponseWriter) {
w.Header().Add(AllowCredentialsHeader, "true")
}
}

func makeCredentials(username, password string) *v1.Credential {
return &v1.Credential{
Username: username,
Password: password,
}
}

0 comments on commit be9e3bb

Please sign in to comment.