Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chenchao dev #424

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
*.tar.gz
*.tgz
tars-protocol
*go.sum
examples/*/go.sum
.DS_Store
36 changes: 36 additions & 0 deletions contrib/middleware/opentelemetry/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
module github.com/TarsCloud/TarsGo/contrib/middleware/opentelemetry

go 1.18

require (
github.com/TarsCloud/TarsGo v1.3.9
go.opentelemetry.io/otel v1.14.0
go.opentelemetry.io/otel/metric v0.37.0
go.opentelemetry.io/otel/trace v1.14.0
)

require (
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/gin-gonic/gin v1.8.1 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-playground/locales v0.14.0 // indirect
github.com/go-playground/universal-translator v0.18.0 // indirect
github.com/go-playground/validator/v10 v10.10.0 // indirect
github.com/goccy/go-json v0.9.7 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.0.1 // indirect
github.com/ugorji/go/codec v1.2.7 // indirect
go.uber.org/automaxprocs v1.5.1 // indirect
golang.org/x/crypto v0.1.0 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
google.golang.org/protobuf v1.28.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
309 changes: 309 additions & 0 deletions contrib/middleware/opentelemetry/go.sum

Large diffs are not rendered by default.

208 changes: 208 additions & 0 deletions contrib/middleware/opentelemetry/opentelemetry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
package opentelemetry

import (
"context"
"fmt"
"net"
"net/http"
"net/http/httptest"
"strings"
"time"

"github.com/TarsCloud/TarsGo/tars"
"github.com/TarsCloud/TarsGo/tars/protocol/res/requestf"
"github.com/TarsCloud/TarsGo/tars/util/current"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
)

const (
instrumentationName = "github.com/TarsCloud/TarsGo/tars/middleware/opentelemetry"
TarsRpcRetKey = attribute.Key("tars.rpc.ret")
)

type Opentelemetry struct {
Propagators propagation.TextMapPropagator
TracerProvider trace.TracerProvider
MeterProvider metric.MeterProvider
}

type Option func(*Opentelemetry)

func WithTracerProvider(tp trace.TracerProvider) Option {
return func(o *Opentelemetry) {
o.TracerProvider = tp
}
}

func WithPropagators(p propagation.TextMapPropagator) Option {
return func(o *Opentelemetry) {
o.Propagators = p
}
}

func New(opts ...Option) *Opentelemetry {
o := &Opentelemetry{
TracerProvider: otel.GetTracerProvider(),
Propagators: otel.GetTextMapPropagator(),
}
for _, opt := range opts {
opt(o)
}
return o
}

func (o *Opentelemetry) BuildServerFilter() tars.ServerFilterMiddleware {
localIp := getOutboundIP()
tracer := o.TracerProvider.Tracer(instrumentationName)
return func(next tars.ServerFilter) tars.ServerFilter {
return func(ctx context.Context, d tars.Dispatch, f interface{}, req *requestf.RequestPacket, resp *requestf.ResponsePacket, withContext bool) (err error) {
ip, _ := current.GetClientIPFromContext(ctx)
port, _ := current.GetClientPortFromContext(ctx)
var span trace.Span
ctx = o.extract(ctx, req)
servants := strings.Split(req.SServantName, ".")
ctx, span = tracer.Start(
ctx,
fmt.Sprintf("%s.%s", servants[2], req.SFuncName),
trace.WithSpanKind(trace.SpanKindServer),
trace.WithAttributes(
attribute.String("tars.interface", req.SServantName),
attribute.String("tars.method", req.SFuncName),
attribute.String("tars.local_ip", localIp),
attribute.Int("tars.request_id", int(req.IRequestId)),
attribute.String("tars.client.ipv4", ip),
attribute.String("tars.client.port", port),
attribute.String("tars.server.version", tars.Version),
),
)
defer span.End()
cfg := tars.GetServerConfig()
if cfg.Enableset {
span.SetAttributes(attribute.String("tars.set_division", cfg.Setdivision))
}
err = next(ctx, d, f, req, resp, withContext)
if err != nil {
span.SetStatus(codes.Error, "server failed")
span.RecordError(err)
span.SetAttributes(TarsRpcRetKey.Int64(int64(codes.Error)))
} else {
span.SetAttributes(TarsRpcRetKey.Int64(int64(resp.IRet)))
}
return err
}
}
}

func (o *Opentelemetry) extract(ctx context.Context, req *requestf.RequestPacket) context.Context {
if req.Status == nil {
req.Status = make(map[string]string)
}
return o.Propagators.Extract(ctx, propagation.MapCarrier(req.Status))
}

func (o *Opentelemetry) BuildHttpHandler() func(next http.Handler) http.Handler {
tracer := o.TracerProvider.Tracer(instrumentationName)
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var span trace.Span
reqCtx := r.Context()
reqCtx = o.Propagators.Extract(reqCtx, propagation.HeaderCarrier(r.Header))
reqCtx, span = tracer.Start(
reqCtx,
r.URL.Path,
trace.WithSpanKind(trace.SpanKindServer),
trace.WithAttributes(
attribute.String("http.method", r.Method),
attribute.String("http.url", r.URL.String()),
attribute.String("http.scheme", r.URL.Scheme),
attribute.String("http.proto", r.Proto),
attribute.String("peer.hostname", r.Host),
attribute.String("peer.address", r.RemoteAddr),
attribute.String("component", "web"),
),
)
defer span.End()

r = r.WithContext(reqCtx)
recorder := httptest.NewRecorder()
next.ServeHTTP(recorder, r)
span.SetAttributes(attribute.Int("http.status", recorder.Code))
for k, v := range recorder.Result().Header {
w.Header()[k] = v
}
w.WriteHeader(recorder.Code)
_, err := w.Write(recorder.Body.Bytes())
if err != nil {
span.SetStatus(codes.Error, "http server write failed")
span.RecordError(err)
}
})
}
}

func (o *Opentelemetry) BuildClientFilter() tars.ClientFilterMiddleware {
localIp := getOutboundIP()
tracer := o.TracerProvider.Tracer(instrumentationName)
return func(next tars.ClientFilter) tars.ClientFilter {
return func(ctx context.Context, msg *tars.Message, invoke tars.Invoke, timeout time.Duration) (err error) {
var span trace.Span
servants := strings.Split(msg.Req.SServantName, ".")
ctx, span = tracer.Start(
ctx,
fmt.Sprintf("%s.%s", servants[2], msg.Req.SFuncName),
trace.WithSpanKind(trace.SpanKindClient),
trace.WithAttributes(
attribute.String("tars.interface", msg.Req.SServantName),
attribute.String("tars.method", msg.Req.SFuncName),
attribute.Int("tars.request_id", int(msg.Req.IRequestId)),
attribute.String("tars.local_ip", localIp),
attribute.String("tars.protocol", "tars"),
attribute.String("tars.client.version", tars.Version),
),
)
ctx = o.inject(ctx, msg)
defer func() {
ip, _ := current.GetServerIPFromContext(ctx)
port, _ := current.GetServerPortFromContext(ctx)
span.SetAttributes(attribute.String("tars.server.ipv4", ip))
span.SetAttributes(attribute.String("tars.server.port", port))
span.End()
}()

err = next(ctx, msg, invoke, timeout)
if err != nil {
span.SetStatus(codes.Error, "client failed")
span.RecordError(err)
span.SetAttributes(TarsRpcRetKey.Int64(int64(codes.Error)))
} else {
span.SetAttributes(TarsRpcRetKey.Int64(int64(msg.Resp.IRet)))
}
return err
}
}
}

func (o *Opentelemetry) inject(ctx context.Context, msg *tars.Message) context.Context {
if msg.Req.Status == nil {
msg.Req.Status = make(map[string]string)
}
o.Propagators.Inject(ctx, propagation.MapCarrier(msg.Req.Status))
return ctx
}

func getOutboundIP() string {
conn, err := net.Dial("udp", "8.8.8.8:80")
if err != nil {
return ""
}
defer conn.Close()

localAddr := conn.LocalAddr().(*net.UDPAddr)
return localAddr.IP.String()
}
9 changes: 5 additions & 4 deletions examples/GinHttpServer/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.0.1 // indirect
github.com/ugorji/go/codec v1.2.7 // indirect
golang.org/x/crypto v0.0.0-20210920023735-84f357641f63 // indirect
golang.org/x/net v0.0.0-20210917221730-978cfadd31cf // indirect
golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069 // indirect
golang.org/x/text v0.3.7 // indirect
go.uber.org/automaxprocs v1.5.1 // indirect
golang.org/x/crypto v0.1.0 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
google.golang.org/protobuf v1.28.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
Expand Down
1 change: 1 addition & 0 deletions examples/OpentelemetryServer/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
OpentelemetryServer
6 changes: 6 additions & 0 deletions examples/OpentelemetryServer/Opentelemetry.tars
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
module StressTest{
interface Opentelemetry {
int Add(int a,int b,out int c); // Some example function
int Sub(int a,int b,out int c); // Some example function
};
};
57 changes: 57 additions & 0 deletions examples/OpentelemetryServer/OpentelemetryImp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package main

import (
"context"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/TarsCloud/TarsGo/tars"
"github.com/TarsCloud/TarsGo/tars/util/current"
)

var name = "OpentelemetryImp"

type OpentelemetryImp struct {
}

func (imp *OpentelemetryImp) Add(ctx context.Context, a int32, b int32, c *int32) (int32, error) {
span := trace.SpanContextFromContext(ctx)
logger := tars.GetLogger("context")
data, err := span.MarshalJSON()
if err != nil {
logger.Errorf("MarshalJSON err:%v", err)
}
logger.Info("span", string(data))
ip, ok := current.GetClientIPFromContext(ctx)
if !ok {
logger.Error("Error getting ip from context")
}
logger.Infof("Get Client Ip : %s from context", ip)
reqContext, ok := current.GetRequestContext(ctx)
if !ok {
logger.Error("Error getting reqcontext from context")
}
logger.Infof("Get context from context: %v", reqContext)
k := make(map[string]string)
k["resp"] = "respform context"
ok = current.SetResponseContext(ctx, k)
if !ok {
logger.Error("error setting respose context")
}
imp.Sub(ctx, a, b, c)
//Doing something in your function
//...
*c = a * b
return 0, nil
}

func (imp *OpentelemetryImp) Sub(ctx context.Context, a int32, b int32, c *int32) (int32, error) {
_, span := otel.Tracer(name).Start(ctx, "Sub")
defer span.End()
span.SetAttributes(attribute.Int64("request.a", int64(a)))
span.SetAttributes(attribute.Int64("request.b", int64(b)))
//...
return 0, nil
}
23 changes: 23 additions & 0 deletions examples/OpentelemetryServer/OpentelemetryServer.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<tars>
<application>
<server>
app=StressTest
server=OpentelemetryServer
local=tcp -h 127.0.0.1 -p 10027 -t 30000
logpath=/tmp
<StressTest.OpentelemetryServer.OpenTelemetryObjAdapter>
allow
endpoint=tcp -h 127.0.0.1 -p 10028 -t 60000
handlegroup=StressTest.OpentelemetryServer.OpenTelemetryObjAdapter
maxconns=200000
protocol=tars
queuecap=10000
queuetimeout=60000
servant=StressTest.OpentelemetryServer.OpenTelemetryObj
shmcap=0
shmkey=0
threads=1
</StressTest.OpentelemetryServer.OpenTelemetryObjAdapter>
</server>
</application>
</tars>
31 changes: 31 additions & 0 deletions examples/OpentelemetryServer/OpentelemetryServer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package main

import (
"context"
"fmt"
"log"

"OpentelemetryServer/tars-protocol/StressTest"
"OpentelemetryServer/tracer"

"github.com/TarsCloud/TarsGo/contrib/middleware/opentelemetry"
"github.com/TarsCloud/TarsGo/tars"
)

func main() {
cfg := tars.GetServerConfig()
serviceNameKey := fmt.Sprintf("%s.%s", cfg.App, cfg.Server)
tp := tracer.NewTracerProvider(serviceNameKey, "")
defer func() {
if err := tp.Shutdown(context.Background()); err != nil {
log.Printf("Error shutting down tracer provider: %v", err)
}
}()
filter := opentelemetry.New()
tars.UseServerFilterMiddleware(filter.BuildServerFilter())
tars.UseClientFilterMiddleware(filter.BuildClientFilter())
imp := new(OpentelemetryImp) //New Imp
app := new(StressTest.ContextTest) //New init the A Tars
app.AddServantWithContext(imp, cfg.App+"."+cfg.Server+".OpenTelemetryObj") //Register Servant
tars.Run()
}
Loading