Skip to content
This repository has been archived by the owner on May 9, 2024. It is now read-only.

Commit

Permalink
feat: refactoring of the traces usage with traceutils
Browse files Browse the repository at this point in the history
  • Loading branch information
P1sar committed Sep 8, 2023
1 parent 77199fc commit fe75ae0
Show file tree
Hide file tree
Showing 19 changed files with 297 additions and 214 deletions.
2 changes: 1 addition & 1 deletion chains/evm/calls/contracts/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (c *Contract) ExecuteTransaction(ctx context.Context, method string, opts t
Str("txHash", h.String()).
Str("contract", c.contractAddress.String()).
Msgf("method %s executed", method)
return h, err
return h, nil
}

func (c *Contract) CallContract(method string, args ...interface{}) ([]interface{}, error) {
Expand Down
10 changes: 9 additions & 1 deletion chains/evm/calls/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package events
import (
"fmt"

"github.com/status-im/keycard-go/hexutils"

"github.com/ChainSafe/chainbridge-core/types"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
Expand Down Expand Up @@ -42,5 +44,11 @@ type Deposit struct {
}

func (d *Deposit) TraceEventAttributes() []attribute.KeyValue {
return []attribute.KeyValue{attribute.Int("deposit.dstdomain", int(d.DestinationDomainID)), attribute.String("deposit.rID", fmt.Sprintf("%x", d.ResourceID)), attribute.String("tx.sender", d.SenderAddress.Hex())}
return []attribute.KeyValue{attribute.Int("deposit.dstdomain", int(d.DestinationDomainID)), attribute.String("deposit.rID", fmt.Sprintf("%x", d.ResourceID)), attribute.String("deposit.sender", d.SenderAddress.Hex())}
}

func (d *Deposit) String() string {
return fmt.Sprintf(
`Destination: %d,DepositNonce: %d,ResourceID: %x,Sender: %s, Data: %s, HandlerResponse: %s`,
d.DestinationDomainID, d.DepositNonce, d.ResourceID, d.SenderAddress.Hex(), hexutils.BytesToHex(d.Data), hexutils.BytesToHex(d.HandlerResponse))
}
27 changes: 11 additions & 16 deletions chains/evm/calls/events/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,16 @@ package events

import (
"context"
"fmt"
"math/big"
"strings"

"github.com/ChainSafe/chainbridge-core/chains/evm/calls/consts"
"github.com/ChainSafe/chainbridge-core/observability"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
ethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/rs/zerolog/log"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
traceapi "go.opentelemetry.io/otel/trace"
)

type ChainClient interface {
Expand All @@ -34,32 +32,29 @@ func NewListener(client ChainClient) *Listener {
}

func (l *Listener) FetchDeposits(ctx context.Context, contractAddress common.Address, startBlock *big.Int, endBlock *big.Int) ([]*Deposit, error) {
ctxWithSpan, span := otel.Tracer("relayer-core").Start(ctx, "relayer.core.Listener.FetchDeposits")
ctx, span, logger := observability.CreateSpanAndLoggerFromContext(ctx, "relayer-core", "relayer.core.Listener.FetchDeposits", attribute.String("startBlock", startBlock.String()), attribute.String("endBlock", endBlock.String()))
defer span.End()
span.SetAttributes(attribute.String("startBlock", startBlock.String()), attribute.String("endBlock", endBlock.String()))
logger := log.With().Str("dd.trace_id", span.SpanContext().TraceID().String()).Logger()

logs, err := l.client.FetchEventLogs(ctxWithSpan, contractAddress, string(DepositSig), startBlock, endBlock)
logs, err := l.client.FetchEventLogs(ctx, contractAddress, string(DepositSig), startBlock, endBlock)
if err != nil {
span.SetStatus(codes.Error, err.Error())
return nil, err
return nil, observability.LogAndRecordErrorWithStatus(&logger, span, err, "failed FetchEventLogs")
}
deposits := make([]*Deposit, 0)

for _, dl := range logs {
d, err := l.UnpackDeposit(l.abi, dl.Data)
if err != nil {
logger.Error().Msgf("failed unpacking deposit event log: %v", err)
span.RecordError(err)
_ = observability.LogAndRecordError(&logger, span, err, "failed unpacking deposit event log")
continue
}

d.SenderAddress = common.BytesToAddress(dl.Topics[1].Bytes())
logger.Debug().Msgf("Found deposit log in block: %d, TxHash: %s, contractAddress: %s, sender: %s", dl.BlockNumber, dl.TxHash, dl.Address, d.SenderAddress)
span.AddEvent("Found deposit", traceapi.WithAttributes(append(d.TraceEventAttributes(), attribute.String("tx.hash", dl.TxHash.Hex()))...))
deposits = append(deposits, d)
observability.LogAndEvent(
logger.Debug(),
span,
fmt.Sprintf("Found deposit log in block: %d, TxHash: %s, contractAddress: %s, sender: %s", dl.BlockNumber, dl.TxHash, dl.Address, d.SenderAddress),
append(d.TraceEventAttributes(), attribute.String("tx.hash", dl.TxHash.Hex()))...)
}
span.SetStatus(codes.Ok, "Deposits fetched")
return deposits, nil
}

Expand Down
74 changes: 30 additions & 44 deletions chains/evm/calls/transactor/monitored/monitored.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,13 @@ import (
"sync"
"time"

"github.com/ChainSafe/chainbridge-core/observability"

"github.com/ChainSafe/chainbridge-core/chains/evm/calls"
"github.com/ChainSafe/chainbridge-core/chains/evm/calls/transactor"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/rs/zerolog/log"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
traceapi "go.opentelemetry.io/otel/trace"
)

Expand Down Expand Up @@ -71,21 +70,20 @@ func NewMonitoredTransactor(
}

func (t *MonitoredTransactor) Transact(ctx context.Context, to *common.Address, data []byte, opts transactor.TransactOptions) (*common.Hash, error) {
_, span := otel.Tracer("relayer-core").Start(ctx, "relayer.core.evm.transactor.Monitor")
_, span, _ := observability.CreateSpanAndLoggerFromContext(ctx, "relayer-core", "relayer.core.evm.monitoredTransactor.Transact")
defer span.End()

t.client.LockNonce()
defer t.client.UnlockNonce()

n, err := t.client.UnsafeNonce()
if err != nil {
span.RecordError(fmt.Errorf("unable to get unsafe nonce with err: %w", err))
return &common.Hash{}, err
return &common.Hash{}, observability.LogAndRecordErrorWithStatus(nil, span, err, "failed to call UnsafeNonce")
}

err = transactor.MergeTransactionOptions(&opts, &transactor.DefaultTransactionOptions)
if err != nil {
span.RecordError(fmt.Errorf("unable to merge transaction options with err: %w", err))
return &common.Hash{}, err
return &common.Hash{}, observability.LogAndRecordErrorWithStatus(nil, span, err, "failed to MergeTransactionOptions")
}

gp := []*big.Int{opts.GasPrice}
Expand Down Expand Up @@ -116,29 +114,23 @@ func (t *MonitoredTransactor) Transact(ctx context.Context, to *common.Address,
}
tx, err := t.txFabric(rawTx.nonce, rawTx.to, rawTx.value, rawTx.gasLimit, rawTx.gasPrice, rawTx.data)
if err != nil {
span.RecordError(fmt.Errorf("unable to call TxFabric with err: %w", err))
return &common.Hash{}, err
return &common.Hash{}, observability.LogAndRecordErrorWithStatus(nil, span, err, "unable to call TxFabric")
}

h, err := t.client.SignAndSendTransaction(context.TODO(), tx)
if err != nil {
span.RecordError(fmt.Errorf("unable to SignAndSendTransaction with err: %w", err))
span.SetStatus(codes.Error, "unable to SignAndSendTransaction")
return &common.Hash{}, err
return &common.Hash{}, observability.LogAndRecordErrorWithStatus(nil, span, err, "unable to SignAndSendTransaction")
}
span.AddEvent("Executed transaction", traceapi.WithAttributes(attribute.String("tx.hash", h.String())))
span.AddEvent("Transaction sent", traceapi.WithAttributes(attribute.String("tx.hash", h.String())))

t.txLock.Lock()
t.pendingTxns[h] = rawTx
t.txLock.Unlock()

err = t.client.UnsafeIncreaseNonce()
if err != nil {
span.RecordError(fmt.Errorf("unable to UnsafeIncreaseNonce with err: %w", err))
span.SetStatus(codes.Error, "unable to UnsafeIncreaseNonce")
return &common.Hash{}, err
return &common.Hash{}, observability.LogAndRecordErrorWithStatus(nil, span, err, "unable to UnsafeIncreaseNonce")
}

return &h, nil
}

Expand Down Expand Up @@ -167,29 +159,25 @@ func (t *MonitoredTransactor) Monitor(
if time.Since(tx.submitTime) < tooNewTransaction {
continue
}
txContextWithSpan, span := otel.Tracer("relayer-core").Start(traceapi.ContextWithSpanContext(ctx, traceapi.NewSpanContext(traceapi.SpanContextConfig{TraceID: tx.traceID})), "relayer.core.evm.transactor.Monitor")
logger := log.With().Str("dd.trace_id", span.SpanContext().TraceID().String()).Logger()

txContextWithSpan, span, logger := observability.CreateSpanAndLoggerFromContext(
traceapi.ContextWithSpanContext(ctx, traceapi.NewSpanContext(traceapi.SpanContextConfig{TraceID: tx.traceID})),
"relayer-core",
"relayer.core.evm.transactor.Monitor",
attribute.String("tx.hash", oldHash.String()), attribute.Int64("tx.nonce", int64(tx.nonce)))
receipt, err := t.client.TransactionReceipt(context.Background(), oldHash)
if err == nil {
if receipt.Status == types.ReceiptStatusSuccessful {
logger.Info().Uint64("nonce", tx.nonce).Msgf("Executed transaction %s with nonce %d", oldHash, tx.nonce)
span.AddEvent("Executed transaction", traceapi.WithAttributes(attribute.String("tx.hash", oldHash.String()), attribute.Int64("tx.nonce", int64(tx.nonce))))
span.SetStatus(codes.Ok, "Executed transaction")
span.End()
observability.LogAndEvent(logger.Info(), span, fmt.Sprintf("Executed transaction %s with nonce %d", oldHash, tx.nonce))
} else {
logger.Error().Uint64("nonce", tx.nonce).Msgf("Transaction %s failed on chain", oldHash)
span.RecordError(fmt.Errorf("transaction execution failed on chain with error %w", err), traceapi.WithAttributes(attribute.String("tx.hash", oldHash.String()), attribute.Int64("tx.nonce", int64(tx.nonce))))
span.SetStatus(codes.Error, "Transaction execution failed on chain")
span.End()
_ = observability.LogAndRecordErrorWithStatus(&logger, span, fmt.Errorf("on-chain execution fail"), fmt.Sprintf("transaction %s failed on chain", oldHash))
}
span.End()
delete(t.pendingTxns, oldHash)
continue
}

if time.Since(tx.creationTime) > txTimeout {
logger.Error().Uint64("nonce", tx.nonce).Msgf("Transaction %s has timed out", oldHash)
span.RecordError(fmt.Errorf("transaction has timed out"), traceapi.WithAttributes(attribute.String("tx.hash", oldHash.String()), attribute.Int64("tx.nonce", int64(tx.nonce))))
_ = observability.LogAndRecordErrorWithStatus(&logger, span, fmt.Errorf("transaction has timed out"), fmt.Sprintf("transaction %s failed on chain", oldHash))
span.End()
delete(t.pendingTxns, oldHash)
continue
Expand All @@ -199,9 +187,10 @@ func (t *MonitoredTransactor) Monitor(
if err != nil {
span.RecordError(fmt.Errorf("error resending transaction %w", err), traceapi.WithAttributes(attribute.String("tx.hash", oldHash.String()), attribute.Int64("tx.nonce", int64(tx.nonce))))
logger.Warn().Uint64("nonce", tx.nonce).Err(err).Msgf("Failed resending transaction %s", oldHash)
_ = observability.LogAndRecordError(&logger, span, err, "failed resending transaction")
continue
}
span.AddEvent("Resending transaction", traceapi.WithAttributes(attribute.String("tx.newHash", hash.String())))
span.AddEvent("Transaction resent", traceapi.WithAttributes(attribute.String("tx.newHash", hash.String())))
span.End()

delete(t.pendingTxns, oldHash)
Expand All @@ -213,7 +202,14 @@ func (t *MonitoredTransactor) Monitor(
}

func (t *MonitoredTransactor) resendTransaction(ctx context.Context, tx *RawTx) (common.Hash, error) {
tx.gasPrice = t.IncreaseGas(ctx, tx.gasPrice)
ctx, span, logger := observability.CreateSpanAndLoggerFromContext(ctx, "relayer-core", "relayer.core.evm.transactor.Monitor.resendTransaction")
defer span.End()
tx.gasPrice = t.IncreaseGas(tx.gasPrice)
if len(tx.gasPrice) > 1 {
observability.LogAndEvent(logger.Debug(), span, "Calculated GasPrice", attribute.String("tx.gasTipCap", tx.gasPrice[0].String()), attribute.String("tx.gasFeeCap", tx.gasPrice[1].String()))
} else {
observability.LogAndEvent(logger.Debug(), span, "Calculated GasPrice", attribute.String("tx.gp", tx.gasPrice[0].String()))
}
newTx, err := t.txFabric(tx.nonce, tx.to, tx.value, tx.gasLimit, tx.gasPrice, tx.data)
if err != nil {
return common.Hash{}, err
Expand All @@ -223,9 +219,6 @@ func (t *MonitoredTransactor) resendTransaction(ctx context.Context, tx *RawTx)
if err != nil {
return common.Hash{}, err
}

log.Debug().Uint64("nonce", tx.nonce).Msgf("Resent transaction with hash %s", hash)

return hash, nil
}

Expand All @@ -234,8 +227,7 @@ func (t *MonitoredTransactor) resendTransaction(ctx context.Context, tx *RawTx)
// If gas was 10 and the increaseFactor is 15 the new gas price
// would be 11 (it floors the value). In case the gas price didn't
// change it increases it by 1.
func (t *MonitoredTransactor) IncreaseGas(ctx context.Context, oldGp []*big.Int) []*big.Int {
_, span := otel.Tracer("relayer-core").Start(ctx, "relayer.core.evm.transactor.Monitor.IncreaseGas")
func (t *MonitoredTransactor) IncreaseGas(oldGp []*big.Int) []*big.Int {
newGp := make([]*big.Int, len(oldGp))
for i, gp := range oldGp {
percentIncreaseValue := new(big.Int).Div(new(big.Int).Mul(gp, t.increasePercentage), big.NewInt(100))
Expand All @@ -250,11 +242,5 @@ func (t *MonitoredTransactor) IncreaseGas(ctx context.Context, oldGp []*big.Int)
newGp[i] = increasedGp
}
}
if len(newGp) > 1 {
span.AddEvent("Calculated GasPrice", traceapi.WithAttributes(attribute.String("tx.gasTipCap", newGp[0].String()), attribute.String("tx.gasFeeCap", newGp[1].String())))
} else {
span.AddEvent("Calculated GasPrice", traceapi.WithAttributes(attribute.String("tx.gp", newGp[0].String())))
}
span.End()
return newGp
}
4 changes: 2 additions & 2 deletions chains/evm/calls/transactor/monitored/monitored_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (s *TransactorTestSuite) TestTransactor_IncreaseGas_15PercentIncrease() {
big.NewInt(150),
big.NewInt(15))

newGas := t.IncreaseGas(context.Background(), []*big.Int{big.NewInt(1), big.NewInt(10), big.NewInt(100)})
newGas := t.IncreaseGas([]*big.Int{big.NewInt(1), big.NewInt(10), big.NewInt(100)})

s.Equal(newGas, []*big.Int{big.NewInt(2), big.NewInt(11), big.NewInt(115)})
}
Expand All @@ -259,7 +259,7 @@ func (s *TransactorTestSuite) TestTransactor_IncreaseGas_MaxGasReached() {
big.NewInt(15),
big.NewInt(15))

newGas := t.IncreaseGas(context.Background(), []*big.Int{big.NewInt(1), big.NewInt(10), big.NewInt(100)})
newGas := t.IncreaseGas([]*big.Int{big.NewInt(1), big.NewInt(10), big.NewInt(100)})

s.Equal(newGas, []*big.Int{big.NewInt(2), big.NewInt(11), big.NewInt(15)})
}
37 changes: 15 additions & 22 deletions chains/evm/calls/transactor/signAndSend/signAndSend.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@ package signAndSend

import (
"context"
"fmt"
"math/big"

"go.opentelemetry.io/otel/codes"
"github.com/ChainSafe/chainbridge-core/observability"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
traceapi "go.opentelemetry.io/otel/trace"

Expand All @@ -31,65 +29,60 @@ func NewSignAndSendTransactor(txFabric calls.TxFabric, gasPriceClient calls.GasP
}

func (t *signAndSendTransactor) Transact(ctx context.Context, to *common.Address, data []byte, opts transactor.TransactOptions) (*common.Hash, error) {
_, span := otel.Tracer("relayer-core").Start(ctx, "relayer.core.Transactor.signAndSendTransactor.Transact")
ctx, span, logger := observability.CreateSpanAndLoggerFromContext(ctx, "relayer-core", "relayer.core.Transactor.signAndSendTransactor.Transact")
defer span.End()

t.client.LockNonce()
n, err := t.client.UnsafeNonce()
if err != nil {
t.client.UnlockNonce()
span.RecordError(fmt.Errorf("unable to get unsafe nonce with err: %w", err))
return &common.Hash{}, err
return &common.Hash{}, observability.LogAndRecordErrorWithStatus(nil, span, err, "failed to call UnsafeNonce")
}

err = transactor.MergeTransactionOptions(&opts, &transactor.DefaultTransactionOptions)
if err != nil {
t.client.UnlockNonce()
span.RecordError(fmt.Errorf("unable to merge transaction options with err: %w", err))
return &common.Hash{}, err
return &common.Hash{}, observability.LogAndRecordErrorWithStatus(nil, span, err, "failed to MergeTransactionOptions")
}

gp := []*big.Int{opts.GasPrice}
if opts.GasPrice.Cmp(big.NewInt(0)) == 0 {
gp, err = t.gasPriceClient.GasPrice(&opts.Priority)
if err != nil {
t.client.UnlockNonce()
span.RecordError(fmt.Errorf("unable to define gas price with err: %w", err))
return &common.Hash{}, err
return &common.Hash{}, observability.LogAndRecordErrorWithStatus(nil, span, err, "failed to define gas price")
}
}

if len(gp) > 1 {
span.AddEvent("Calculated GasPrice", traceapi.WithAttributes(attribute.String("tx.gasTipCap", gp[0].String()), attribute.String("tx.gasFeeCap", gp[1].String())))
observability.LogAndEvent(logger.Debug(), span, "Calculated GasPrice", attribute.String("tx.gasTipCap", gp[0].String()), attribute.String("tx.gasFeeCap", gp[1].String()))
} else {
span.AddEvent("Calculated GasPrice", traceapi.WithAttributes(attribute.String("tx.gp", gp[0].String())))
observability.LogAndEvent(logger.Debug(), span, "Calculated GasPrice", attribute.String("tx.gp", gp[0].String()))
}

tx, err := t.TxFabric(n.Uint64(), to, opts.Value, opts.GasLimit, gp, data)
if err != nil {
t.client.UnlockNonce()
span.RecordError(fmt.Errorf("unable to call TxFabric with err: %w", err))
return &common.Hash{}, err
return &common.Hash{}, observability.LogAndRecordErrorWithStatus(nil, span, err, "unable to call TxFabric")
}

h, err := t.client.SignAndSendTransaction(context.TODO(), tx)
h, err := t.client.SignAndSendTransaction(ctx, tx)
if err != nil {
t.client.UnlockNonce()
span.RecordError(fmt.Errorf("unable to SignAndSendTransaction with err: %w", err))
return &common.Hash{}, err
return &common.Hash{}, observability.LogAndRecordErrorWithStatus(nil, span, err, "unable to SignAndSendTransaction")
}

span.AddEvent("Transaction sent", traceapi.WithAttributes(attribute.String("tx.hash", h.String())))

err = t.client.UnsafeIncreaseNonce()
t.client.UnlockNonce()
if err != nil {
span.RecordError(fmt.Errorf("unable to UnsafeIncreaseNonce with err: %w", err))
return &common.Hash{}, err
return &common.Hash{}, observability.LogAndRecordErrorWithStatus(nil, span, err, "unable to UnsafeIncreaseNonce")
}

_, err = t.client.WaitAndReturnTxReceipt(h)
if err != nil {
span.RecordError(fmt.Errorf("unable to WaitAndReturnTxReceipt with err: %w", err))
return &common.Hash{}, err
return &common.Hash{}, observability.LogAndRecordErrorWithStatus(nil, span, err, "failed to WaitAndReturnTxReceipt")
}
span.SetStatus(codes.Ok, "Transaction sent")
return &h, nil
}
4 changes: 2 additions & 2 deletions chains/evm/cli/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"os"
"time"

"github.com/ChainSafe/chainbridge-core/logger"
"github.com/ChainSafe/chainbridge-core/observability"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/spf13/pflag"
Expand Down Expand Up @@ -43,5 +43,5 @@ func LoggerMetadata(cmdName string, flagSet *pflag.FlagSet) {
// PartsExclude - omit log level and execution time from final log
logConsoleWriter := zerolog.ConsoleWriter{Out: os.Stdout, PartsExclude: []string{"level", "time"}}
logFileWriter := zerolog.ConsoleWriter{Out: file, PartsExclude: []string{"level", "time"}}
logger.ConfigureLogger(zerolog.DebugLevel, logConsoleWriter, logFileWriter)
observability.ConfigureLogger(zerolog.DebugLevel, logConsoleWriter, logFileWriter)
}
Loading

0 comments on commit fe75ae0

Please sign in to comment.