Skip to content

Commit

Permalink
cluster/http: complete service handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
gernest committed Feb 26, 2024
1 parent d6412be commit c6fdd92
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 25 deletions.
54 changes: 29 additions & 25 deletions gen/go/vince/v1/auth.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

95 changes: 95 additions & 0 deletions internal/cluster/http/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"sync"
"time"

"github.com/bufbuild/protovalidate-go"
v1 "github.com/vinceanalytics/vince/gen/go/vince/v1"
"github.com/vinceanalytics/vince/internal/cluster/rtls"
"github.com/vinceanalytics/vince/internal/cluster/store"
Expand All @@ -31,8 +32,14 @@ import (
var (
// ErrLeaderNotFound is returned when a node cannot locate a leader
ErrLeaderNotFound = errors.New("leader not found")

validator *protovalidate.Validator
)

func init() {
validator, _ = protovalidate.New(protovalidate.WithFailFast(true))
}

type ResultsError interface {
Error() string
IsAuthorized() bool
Expand All @@ -51,6 +58,8 @@ type Database interface {
type Store interface {
Database

Queue(ctx context.Context, event *v1.Event)

Committed(ctx context.Context) (uint64, error)

// Remove removes the node from the cluster.
Expand Down Expand Up @@ -566,9 +575,95 @@ func (s *Service) handleBreakdown(w http.ResponseWriter, r *http.Request, params
s.write(w, res)
}
func (s *Service) handleApiEvent(w http.ResponseWriter, r *http.Request, params QueryParams) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
if !s.CheckRequestPerm(r, v1.Credential_EVENT) {
w.WriteHeader(http.StatusUnauthorized)
return
}
if r.Method != http.MethodPost {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
b, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
var ev v1.Event
err = protojson.Unmarshal(b, &ev)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
err = validator.Validate(&ev)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
s.store.Queue(r.Context(), &ev)
}
func (s *Service) handleEvent(w http.ResponseWriter, r *http.Request, params QueryParams) {
w.Header().Set("Access-Control-Allow-Origin", r.Header.Get("Origin"))
w.Header().Set("Access-Control-Allow-Methods", http.MethodPost)
w.Header().Add("Access-Control-Allow-Headers", "Content-Type")
if !s.guard.Allow() {
w.WriteHeader(http.StatusTooManyRequests)
return
}
b, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
var ev v1.Event
err = protojson.Unmarshal(b, &ev)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
if !s.guard.Accept(ev.D) {
w.Header().Set("x-vince-dropped", "1")
w.WriteHeader(http.StatusOK)
return
}
ev.Ip = remoteIP(r)
ev.Ua = r.UserAgent()
err = validator.Validate(&ev)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
s.store.Queue(r.Context(), &ev)
w.WriteHeader(http.StatusAccepted)
}

var remoteIPHeaders = []string{
"X-Real-IP", "X-Forwarded-For", "X-Client-IP",
}

func remoteIP(r *http.Request) string {
var raw string
for _, v := range remoteIPHeaders {
if raw = r.Header.Get(v); raw != "" {
break
}
}
if raw == "" && r.RemoteAddr != "" {
raw = r.RemoteAddr
}
var host string
host, _, err := net.SplitHostPort(raw)
if err != nil {
host = raw
}

ip := net.ParseIP(host)
if ip == nil {
return "-"
}
return ip.String()
}

func (s *Service) handleNodes(w http.ResponseWriter, r *http.Request, params QueryParams) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")

Expand Down
4 changes: 4 additions & 0 deletions internal/request/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ func Get(ctx context.Context) *protovalidate.Validator {
return ctx.Value(validatorKey{}).(*protovalidate.Validator)
}

func ValidateMsg() {

}

func Read(w http.ResponseWriter, r *http.Request, o proto.Message) bool {
ctx := r.Context()
if r.Header.Get("Content-Type") != "application/json" {
Expand Down
1 change: 1 addition & 0 deletions proto/vince/v1/auth.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ message Credential {
BACKUP = 7;
LOAD = 8;
REMOVE = 9;
EVENT = 10;
}
}

0 comments on commit c6fdd92

Please sign in to comment.