Skip to content
This repository has been archived by the owner on May 9, 2024. It is now read-only.

Commit

Permalink
propagete context to many functions, add tracer to logger, add tracer…
Browse files Browse the repository at this point in the history
… to some more functions
  • Loading branch information
P1sar committed Jul 27, 2023
1 parent c70371b commit 3f18e63
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 22 deletions.
2 changes: 1 addition & 1 deletion chains/evm/calls/events/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func NewListener(client ChainClient) *Listener {

func (l *Listener) FetchDeposits(ctx context.Context, contractAddress common.Address, startBlock *big.Int, endBlock *big.Int) ([]*Deposit, error) {
tp := otel.GetTracerProvider()
ctxWithSpan, span := tp.Tracer("relayer-core-tracer").Start(ctx, "relayer.core.Listener.FetchDeposits")
ctxWithSpan, span := tp.Tracer("relayer-events").Start(ctx, "relayer.core.Listener.FetchDeposits")
defer span.End()
span.SetAttributes(attribute.String("startBlock", startBlock.String()), attribute.String("endBlock", endBlock.String()))

Expand Down
6 changes: 3 additions & 3 deletions chains/evm/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type EventListener interface {
}

type ProposalExecutor interface {
Execute(message *message.Message) error
Execute(ctx context.Context, message *message.Message) error
}

// EVMChain is struct that aggregates all data required for
Expand Down Expand Up @@ -64,10 +64,10 @@ func (c *EVMChain) PollEvents(ctx context.Context, sysErr chan<- error, msgChan
go c.listener.ListenToEvents(ctx, startBlock, msgChan, sysErr)
}

func (c *EVMChain) Write(msg []*message.Message) error {
func (c *EVMChain) Write(ctx context.Context, msg []*message.Message) error {
for _, msg := range msg {
go func(msg *message.Message) {
err := c.writer.Execute(msg)
err := c.writer.Execute(ctx, msg)
if err != nil {
log.Err(err).Msgf("Failed writing message %v", msg.String())
}
Expand Down
2 changes: 1 addition & 1 deletion chains/evm/executor/message-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (mh *EVMMessageHandler) HandleMessage(m *message.Message) (*proposal.Propos
if err != nil {
return nil, err
}
log.Info().Str("type", string(m.Type)).Uint8("src", m.Source).Uint8("dst", m.Destination).Uint64("nonce", m.DepositNonce).Str("resourceID", fmt.Sprintf("%x", m.ResourceId)).Msg("Handling new message")
log.Info().Str("msg_id", m.ID()).Msg("Handling new message")
prop, err := handleMessage(m, addr, *mh.handlerMatcher.ContractAddress())
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion chains/evm/executor/voter.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func NewVoter(mh MessageHandler, client ChainClient, bridgeContract BridgeContra

// Execute checks if relayer already voted and is threshold
// satisfied and casts a vote if it isn't.
func (v *EVMVoter) Execute(m *message.Message) error {
func (v *EVMVoter) Execute(ctx context.Context, m *message.Message) error {
prop, err := v.mh.HandleMessage(m)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions chains/evm/listener/event-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ func NewDepositEventHandler(eventListener EventListener, depositHandler DepositH
}
}

func (eh *DepositEventHandler) HandleEvent(startBlock *big.Int, endBlock *big.Int, msgChan chan []*message.Message) error {
func (eh *DepositEventHandler) HandleEvent(ctx context.Context, startBlock *big.Int, endBlock *big.Int, msgChan chan []*message.Message) error {
tp := otel.GetTracerProvider()
ctxWithSpan, span := tp.Tracer("relayer-core-tracer").Start(context.Background(), "relayer.core.DepositEventHandler.HandleEvent")
ctxWithSpan, span := tp.Tracer("relayer-listener").Start(ctx, "relayer.core.DepositEventHandler.HandleEvent")
defer span.End()
span.SetAttributes(attribute.String("startBlock", startBlock.String()), attribute.String("endBlock", endBlock.String()))
logger := log.With().Str("trace_id", span.SpanContext().TraceID().String()).Logger()
Expand Down
27 changes: 20 additions & 7 deletions chains/evm/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,20 @@ package listener

import (
"context"
"fmt"
"math/big"
"time"

"github.com/ChainSafe/chainbridge-core/relayer/message"
"github.com/ChainSafe/chainbridge-core/store"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
)

type EventHandler interface {
HandleEvent(startBlock *big.Int, endBlock *big.Int, msgChan chan []*message.Message) error
HandleEvent(ctx context.Context, startBlock *big.Int, endBlock *big.Int, msgChan chan []*message.Message) error
}

type ChainClient interface {
Expand Down Expand Up @@ -75,10 +77,15 @@ func (l *EVMListener) ListenToEvents(ctx context.Context, startBlock *big.Int, m
case <-ctx.Done():
return
default:
tp := otel.GetTracerProvider()
ctxWithSpan, span := tp.Tracer("relayer-listener").Start(ctx, "relayer.core.EVMListener.ListenToEvents")
logger := l.log.With().Str("trace_id", span.SpanContext().TraceID().String()).Logger()
head, err := l.client.LatestBlock()
if err != nil {
l.log.Error().Err(err).Msg("Unable to get latest block")
logger.Error().Err(err).Msg("Unable to get latest block")
time.Sleep(l.blockRetryInterval)
span.RecordError(fmt.Errorf("unable to get latest block with err: %w", err))
span.End()
continue
}
if startBlock == nil {
Expand All @@ -89,27 +96,33 @@ func (l *EVMListener) ListenToEvents(ctx context.Context, startBlock *big.Int, m
// Sleep if the difference is less than needed block confirmations; (latest - current) < BlockDelay
if new(big.Int).Sub(head, endBlock).Cmp(l.blockConfirmations) == -1 {
time.Sleep(l.blockRetryInterval)
span.AddEvent("The block difference is too low")
span.SetStatus(codes.Ok, "The block difference is too low")
span.End()
continue
}

l.metrics.TrackBlockDelta(l.domainID, head, endBlock)
l.log.Debug().Msgf("Fetching evm events for block range %s-%s", startBlock, endBlock)
logger.Debug().Msgf("Fetching evm events for block range %s-%s", startBlock, endBlock)

for _, handler := range l.eventHandlers {
err := handler.HandleEvent(startBlock, new(big.Int).Sub(endBlock, big.NewInt(1)), msgChan)
err := handler.HandleEvent(ctxWithSpan, startBlock, new(big.Int).Sub(endBlock, big.NewInt(1)), msgChan)
if err != nil {
l.log.Error().Err(err).Msgf("Unable to handle events")
logger.Error().Err(err).Msgf("Unable to handle events")
continue
}
}

//Write to block store. Not a critical operation, no need to retry
err = l.blockstore.StoreBlock(endBlock, l.domainID)
if err != nil {
l.log.Error().Str("block", endBlock.String()).Err(err).Msg("Failed to write latest block to blockstore")
logger.Error().Str("block", endBlock.String()).Err(err).Msg("Failed to write latest block to blockstore")
span.RecordError(fmt.Errorf("failed to write latest block to blockstore: %w", err))
}

startBlock.Add(startBlock, l.blockInterval)
span.SetStatus(codes.Ok, "Listened to events")
span.End()
}
}
}
24 changes: 20 additions & 4 deletions relayer/message/message_processors.go
Original file line number Diff line number Diff line change
@@ -1,52 +1,68 @@
package message

import (
"context"
"errors"
"math/big"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"

"go.opentelemetry.io/otel"

"github.com/rs/zerolog/log"
)

type MessageProcessor func(message *Message) error
type MessageProcessor func(ctx context.Context, message *Message) error

// AdjustDecimalsForERC20AmountMessageProcessor is a function, that accepts message and map[domainID uint8]{decimal uint}
// using this params processor converts amount for one chain to another for provided decimals with floor rounding
func AdjustDecimalsForERC20AmountMessageProcessor(args ...interface{}) MessageProcessor {
return func(m *Message) error {
return func(ctx context.Context, m *Message) error {
tp := otel.GetTracerProvider()
_, span := tp.Tracer("relayer-route").Start(ctx, "relayer.core.MessageProcessor.AdjustDecimalsForERC20AmountMessageProcessor")
span.SetAttributes(attribute.String("msg_id", m.ID()), attribute.String("msg_type", string(m.Type)))
defer span.End()
if len(args) == 0 {
span.SetStatus(codes.Error, "processor requires 1 argument")
return errors.New("processor requires 1 argument")
}
decimalsMap, ok := args[0].(map[uint8]uint64)
if !ok {
span.SetStatus(codes.Error, "no decimals map found in args")
return errors.New("no decimals map found in args")
}
sourceDecimal, ok := decimalsMap[m.Source]
if !ok {
span.SetStatus(codes.Error, "no source decimals found at decimalsMap")
return errors.New("no source decimals found at decimalsMap")
}
destDecimal, ok := decimalsMap[m.Destination]
if !ok {
span.SetStatus(codes.Error, "no destination decimals found at decimalsMap")
return errors.New("no destination decimals found at decimalsMap")
}
amountByte, ok := m.Payload[0].([]byte)
if !ok {
span.SetStatus(codes.Error, "could not cast interface to byte slice")
return errors.New("could not cast interface to byte slice")
}
amount := new(big.Int).SetBytes(amountByte)
if sourceDecimal > destDecimal {
diff := sourceDecimal - destDecimal
roundedAmount := big.NewInt(0)
roundedAmount.Div(amount, big.NewInt(0).Exp(big.NewInt(10), big.NewInt(0).SetUint64(diff), nil))
log.Info().Msgf("amount %s rounded to %s from chain %v to chain %v", amount.String(), roundedAmount.String(), m.Source, m.Destination)
log.Info().Str("msg_id", m.ID()).Msgf("amount %s rounded to %s from chain %v to chain %v", amount.String(), roundedAmount.String(), m.Source, m.Destination)
m.Payload[0] = roundedAmount.Bytes()
span.SetStatus(codes.Ok, "msg processed")
return nil
}
if sourceDecimal < destDecimal {
diff := destDecimal - sourceDecimal
roundedAmount := big.NewInt(0)
roundedAmount.Mul(amount, big.NewInt(0).Exp(big.NewInt(10), big.NewInt(0).SetUint64(diff), nil))
m.Payload[0] = roundedAmount.Bytes()
log.Info().Msgf("amount %s rounded to %s from chain %v to chain %v", amount.String(), roundedAmount.String(), m.Source, m.Destination)
log.Info().Str("msg_id", m.ID()).Msgf("amount %s rounded to %s from chain %v to chain %v", amount.String(), roundedAmount.String(), m.Source, m.Destination)
}
return nil
}
Expand Down
18 changes: 15 additions & 3 deletions relayer/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,13 @@ import (
"context"
"fmt"

"go.opentelemetry.io/otel/codes"

"github.com/ChainSafe/chainbridge-core/relayer/message"
"github.com/rs/zerolog/log"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
traceapi "go.opentelemetry.io/otel/trace"
)

type DepositMeter interface {
Expand All @@ -19,7 +24,7 @@ type DepositMeter interface {

type RelayedChain interface {
PollEvents(ctx context.Context, sysErr chan<- error, msgChan chan []*message.Message)
Write(messages []*message.Message) error
Write(ctx context.Context, messages []*message.Message) error
DomainID() uint8
}

Expand Down Expand Up @@ -59,25 +64,31 @@ func (r *Relayer) Start(ctx context.Context, sysErr chan error) {

// Route function runs destination writer by mapping DestinationID from message to registered writer.
func (r *Relayer) route(msgs []*message.Message) {
tp := otel.GetTracerProvider()
ctxWithSpan, span := tp.Tracer("relayer-route").Start(context.Background(), "relayer.core.Route")
defer span.End()

destChain, ok := r.registry[msgs[0].Destination]
if !ok {
log.Error().Msgf("no resolver for destID %v to send message registered", msgs[0].Destination)
span.SetStatus(codes.Error, fmt.Sprintf("no resolver for destID %v to send message registered", msgs[0].Destination))
return
}

log.Debug().Msgf("Routing %d messages to destination %d", len(msgs), destChain.DomainID())
for _, m := range msgs {
span.AddEvent("Routing message", traceapi.WithAttributes(attribute.String("msg_id", m.ID()), attribute.String("msg_type", string(m.Type))))
log.Debug().Str("msg_id", m.ID()).Msgf("Routing message %+v", m.String())
r.metrics.TrackDepositMessage(m)
for _, mp := range r.messageProcessors {
if err := mp(m); err != nil {
if err := mp(ctxWithSpan, m); err != nil {
log.Error().Str("msg_id", m.ID()).Err(fmt.Errorf("error %w processing message %v", err, m.String()))
return
}
}
}

err := destChain.Write(msgs)
err := destChain.Write(ctxWithSpan, msgs)
if err != nil {
for _, m := range msgs {
log.Err(err).Str("msg_id", m.ID()).Msgf("Failed sending message %s to destination %v", m.String(), destChain.DomainID())
Expand All @@ -89,6 +100,7 @@ func (r *Relayer) route(msgs []*message.Message) {
for _, m := range msgs {
r.metrics.TrackSuccessfulExecutionLatency(m)
}
span.SetStatus(codes.Ok, "messages routed")
}

func (r *Relayer) addRelayedChain(c RelayedChain) {
Expand Down

0 comments on commit 3f18e63

Please sign in to comment.