Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add traces across relayer #205

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,16 @@ import (
substrateExecutor "github.com/ChainSafe/sygma-relayer/chains/substrate/executor"
substrate_listener "github.com/ChainSafe/sygma-relayer/chains/substrate/listener"
substrate_pallet "github.com/ChainSafe/sygma-relayer/chains/substrate/pallet"
"github.com/ChainSafe/sygma-relayer/metrics"
"github.com/centrifuge/go-substrate-rpc-client/v4/signature"

"github.com/ChainSafe/sygma-relayer/comm/elector"
"github.com/ChainSafe/sygma-relayer/comm/p2p"
"github.com/ChainSafe/sygma-relayer/config"
"github.com/ChainSafe/sygma-relayer/health"
"github.com/ChainSafe/sygma-relayer/jobs"
"github.com/ChainSafe/sygma-relayer/keyshare"
"github.com/ChainSafe/sygma-relayer/metrics"
"github.com/ChainSafe/sygma-relayer/topology"
"github.com/ChainSafe/sygma-relayer/tss"
"github.com/centrifuge/go-substrate-rpc-client/v4/signature"
"github.com/ethereum/go-ethereum/common"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -132,7 +131,11 @@ func Run() error {
exitLock := &sync.RWMutex{}
defer exitLock.Lock()

mp, err := opentelemetry.InitMetricProvider(context.Background(), configuration.RelayerConfig.OpenTelemetryCollectorURL)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

OTLPResource := opentelemetry.InitResource(fmt.Sprintf("Relayer-%s", configuration.RelayerConfig.Id), configuration.RelayerConfig.Env)
mp, err := opentelemetry.InitMetricProvider(ctx, OTLPResource, configuration.RelayerConfig.OpenTelemetryCollectorURL)
if err != nil {
panic(err)
}
Expand All @@ -146,8 +149,16 @@ func Run() error {
panic(err)
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tp, err := opentelemetry.InitTracesProvider(ctx, OTLPResource, configuration.RelayerConfig.OpenTelemetryCollectorURL)
if err != nil {
panic(err)
}
defer func() {
if err := tp.Shutdown(context.Background()); err != nil {
log.Error().Msgf("Error shutting down tracer provider: %v", err)
}
}()

chains := []relayer.RelayedChain{}
for _, chainConfig := range configuration.ChainConfigs {
switch chainConfig["type"] {
Expand Down
1 change: 1 addition & 0 deletions chains/evm/calls/contracts/bridge/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ func (c *BridgeContract) ExecuteProposal(
}

func (c *BridgeContract) ExecuteProposals(
ctx context.Context,
proposals []*chains.Proposal,
signature []byte,
opts transactor.TransactOptions,
Expand Down
7 changes: 3 additions & 4 deletions chains/evm/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

// BatchProposalExecutor describes a component that can execute a batch of
// relayer messages and reports failure through the returned error.
// EVMChain.Write calls Execute on c.executor, which presumably has this type.
//
// NOTE(review): this block comes from a rendered diff, so the removed and the
// added form of Execute appear back to back. The context-taking form is
// presumably the one that remains after the change, since Write also has a
// variant calling Execute(ctx, msgs) — confirm against the merged file.
type BatchProposalExecutor interface {
// Execute without a context (presumably the pre-change signature).
Execute(msgs []*message.Message) error
// Execute with a caller-supplied context as its first parameter.
Execute(ctx context.Context, msgs []*message.Message) error
}

type EVMChain struct {
Expand Down Expand Up @@ -53,13 +53,12 @@ func NewEVMChain(
}
}

// Write hands msgs to the chain's executor. If the executor returns an error,
// Write logs it together with the chain's domainID and the messages, then
// returns that same error; otherwise it returns nil.
//
// NOTE(review): this block comes from a rendered diff. The first
// signature/call pair takes no context; the second pair accepts ctx and
// forwards it to the executor. Presumably the second pair is the post-change
// code — confirm against the merged file.
func (c *EVMChain) Write(msgs []*message.Message) error {
err := c.executor.Execute(msgs)
func (c *EVMChain) Write(ctx context.Context, msgs []*message.Message) error {
err := c.executor.Execute(ctx, msgs)
if err != nil {
// NOTE(review): the error is both logged here and returned to the caller.
// NOTE(review): string(c.domainID) is a type conversion; if domainID is an
// integer type this produces a one-rune string, not its decimal text — verify.
log.Err(err).Str("domainID", string(c.domainID)).Msgf("error writing messages %+v", msgs)
return err
}

return nil
}

Expand Down
67 changes: 45 additions & 22 deletions chains/evm/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,22 @@ import (
"sync"
"time"

"github.com/binance-chain/tss-lib/common"
"github.com/sourcegraph/conc/pool"

ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/libp2p/go-libp2p/core/host"
"github.com/rs/zerolog/log"

"github.com/ChainSafe/chainbridge-core/chains/evm/calls/transactor"
"github.com/ChainSafe/chainbridge-core/chains/evm/executor/proposal"
"github.com/ChainSafe/chainbridge-core/relayer/message"
"github.com/ChainSafe/sygma-relayer/chains"
"github.com/ChainSafe/sygma-relayer/comm"
"github.com/ChainSafe/sygma-relayer/tss"
"github.com/ChainSafe/sygma-relayer/tss/signing"
"github.com/binance-chain/tss-lib/common"
ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/libp2p/go-libp2p/core/host"
"github.com/rs/zerolog/log"
"github.com/sourcegraph/conc/pool"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
traceapi "go.opentelemetry.io/otel/trace"
)

var (
Expand All @@ -37,7 +39,7 @@ type MessageHandler interface {

// BridgeContract lists the bridge operations the executor calls on e.bridge:
// checking whether a proposal was already executed, hashing a batch of
// proposals, and submitting a batch for execution together with a signature.
//
// NOTE(review): this block comes from a rendered diff, so ExecuteProposals is
// listed twice — once without and once with a leading context.Context.
// executeProposal has a call passing ctx first, so the context-taking form is
// presumably the post-change signature — confirm against the merged file.
type BridgeContract interface {
// IsProposalExecuted reports whether p has already been executed.
IsProposalExecuted(p *chains.Proposal) (bool, error)
// ExecuteProposals without a context (presumably the pre-change signature).
ExecuteProposals(proposals []*chains.Proposal, signature []byte, opts transactor.TransactOptions) (*ethCommon.Hash, error)
// ExecuteProposals submits proposals with signature and opts and returns a
// hash; executeProposal records that hash as "tx.hash" on its span.
ExecuteProposals(ctx context.Context, proposals []*chains.Proposal, signature []byte, opts transactor.TransactOptions) (*ethCommon.Hash, error)
// ProposalsHash returns the bytes the executor turns into the session ID and
// into the big.Int message handed to signing.
ProposalsHash(proposals []*chains.Proposal) ([]byte, error)
}

Expand Down Expand Up @@ -72,26 +74,34 @@ func NewExecutor(
}

// Execute starts a signing process and executes proposals when signature is generated
func (e *Executor) Execute(msgs []*message.Message) error {
func (e *Executor) Execute(ctx context.Context, msgs []*message.Message) error {
e.exitLock.RLock()
defer e.exitLock.RUnlock()
ctx, span := otel.Tracer("relayer-sygma").Start(ctx, "relayer.sygma.evm.Execute")
defer span.End()
logger := log.With().Str("dd.trace_id", span.SpanContext().TraceID().String()).Logger()

proposals := make([]*chains.Proposal, 0)
for _, m := range msgs {
logger.Debug().Str("msg.id", m.ID()).Msgf("Message to execute %s", m.String())
span.AddEvent("Message to execute received", traceapi.WithAttributes(attribute.String("msg.id", m.ID()), attribute.String("msg.full", m.String())))
prop, err := e.mh.HandleMessage(m)
if err != nil {
return err
return fmt.Errorf("failed to handle message %s with error: %w", m.String(), err)
}
evmProposal := chains.NewProposal(prop.Source, prop.Destination, prop.DepositNonce, prop.ResourceId, prop.Data, prop.Metadata)
isExecuted, err := e.bridge.IsProposalExecuted(evmProposal)
if err != nil {
span.SetStatus(codes.Error, err.Error())
return err
}
if isExecuted {
log.Info().Msgf("Prop %p already executed", prop)
logger.Info().Str("msg.id", m.ID()).Msgf("Message already executed %s", m.String())
span.AddEvent("Message already executed", traceapi.WithAttributes(attribute.String("msg.id", m.ID()), attribute.String("msg.full", m.String())))
continue
}

logger.Info().Str("msg.id", m.ID()).Msgf("Executing message %s", m.String())
span.AddEvent("Executing message", traceapi.WithAttributes(attribute.String("msg.id", m.ID()), attribute.String("msg.full", m.String())))
proposals = append(proposals, evmProposal)
}
if len(proposals) == 0 {
Expand All @@ -100,10 +110,14 @@ func (e *Executor) Execute(msgs []*message.Message) error {

propHash, err := e.bridge.ProposalsHash(proposals)
if err != nil {
span.SetStatus(codes.Error, err.Error())
return err
}

sessionID := e.sessionID(propHash)

span.AddEvent("SessionID created", traceapi.WithAttributes(attribute.String("tss.session.id", sessionID)))

msg := big.NewInt(0)
msg.SetBytes(propHash)
signing, err := signing.NewSigning(
Expand All @@ -113,26 +127,29 @@ func (e *Executor) Execute(msgs []*message.Message) error {
e.comm,
e.fetcher)
if err != nil {
span.SetStatus(codes.Error, err.Error())
return err
}

sigChn := make(chan interface{})
executionContext, cancelExecution := context.WithCancel(context.Background())
watchContext, cancelWatch := context.WithCancel(context.Background())
executionContext, cancelExecution := context.WithCancel(ctx)
watchContext, cancelWatch := context.WithCancel(ctx)
pool := pool.New().WithErrors()
pool.Go(func() error {
err := e.coordinator.Execute(executionContext, signing, sigChn)
if err != nil {
cancelWatch()
}

return err
})
pool.Go(func() error { return e.watchExecution(watchContext, cancelExecution, proposals, sigChn, sessionID) })
return pool.Wait()
}

func (e *Executor) watchExecution(ctx context.Context, cancelExecution context.CancelFunc, proposals []*chains.Proposal, sigChn chan interface{}, sessionID string) error {
ctx, span := otel.Tracer("relayer-sygma").Start(ctx, "relayer.sygma.evm.watchExecution")
defer span.End()
logger := log.With().Str("dd.trace_id", span.SpanContext().TraceID().String()).Logger()
ticker := time.NewTicker(executionCheckPeriod)
timeout := time.NewTicker(signingTimeout)
defer ticker.Stop()
Expand All @@ -149,25 +166,28 @@ func (e *Executor) watchExecution(ctx context.Context, cancelExecution context.C
}

signatureData := sigResult.(*common.SignatureData)
hash, err := e.executeProposal(proposals, signatureData)
hash, err := e.executeProposal(ctx, proposals, signatureData)
if err != nil {
_ = e.comm.Broadcast(e.host.Peerstore().Peers(), []byte{}, comm.TssFailMsg, sessionID)
_ = e.comm.Broadcast(ctx, e.host.Peerstore().Peers(), []byte{}, comm.TssFailMsg, sessionID)
span.SetStatus(codes.Error, fmt.Errorf("executing proposel has failed %w", err).Error())
return err
}

log.Info().Str("SessionID", sessionID).Msgf("Sent proposals execution with hash: %s", hash)
logger.Debug().Str("SessionID", sessionID).Msgf("Sent proposals execution with hash: %s", hash)
P1sar marked this conversation as resolved.
Show resolved Hide resolved
}
case <-ticker.C:
{
if !e.areProposalsExecuted(proposals, sessionID) {
continue
}

log.Info().Str("SessionID", sessionID).Msgf("Successfully executed proposals")
logger.Debug().Str("SessionID", sessionID).Msgf("Successfully executed proposals")
span.AddEvent("Proposals executed", traceapi.WithAttributes(attribute.String("tss.session.id", sessionID)))
return nil
}
case <-timeout.C:
{
span.SetStatus(codes.Error, fmt.Errorf("execution timed out in %s", signingTimeout).Error())
return fmt.Errorf("execution timed out in %s", signingTimeout)
}
case <-ctx.Done():
Expand All @@ -178,7 +198,9 @@ func (e *Executor) watchExecution(ctx context.Context, cancelExecution context.C
}
}

func (e *Executor) executeProposal(proposals []*chains.Proposal, signatureData *common.SignatureData) (*ethCommon.Hash, error) {
func (e *Executor) executeProposal(ctx context.Context, proposals []*chains.Proposal, signatureData *common.SignatureData) (*ethCommon.Hash, error) {
ctx, span := otel.Tracer("relayer-sygma").Start(ctx, "relayer.sygma.evm.executeProposal")
defer span.End()
sig := []byte{}
sig = append(sig[:], ethCommon.LeftPadBytes(signatureData.R, 32)...)
sig = append(sig[:], ethCommon.LeftPadBytes(signatureData.S, 32)...)
Expand All @@ -191,13 +213,14 @@ func (e *Executor) executeProposal(proposals []*chains.Proposal, signatureData *
gasLimit = l.(uint64)
}

hash, err := e.bridge.ExecuteProposals(proposals, sig, transactor.TransactOptions{
hash, err := e.bridge.ExecuteProposals(ctx, proposals, sig, transactor.TransactOptions{
GasLimit: gasLimit,
})
if err != nil {
span.SetStatus(codes.Error, err.Error())
return nil, err
}

span.AddEvent("Deposit execution sent", traceapi.WithAttributes(attribute.String("tx.hash", hash.String())))
P1sar marked this conversation as resolved.
Show resolved Hide resolved
return hash, err
}

Expand Down
Loading
Loading