Skip to content
This repository has been archived by the owner on May 9, 2024. It is now read-only.

Commit

Permalink
fix: replaced additional map with TraceID to structure in for monitor…
Browse files Browse the repository at this point in the history
…ed transaction
  • Loading branch information
P1sar committed Aug 29, 2023
1 parent 88dda81 commit 77199fc
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 47 deletions.
67 changes: 28 additions & 39 deletions chains/evm/calls/transactor/monitored/monitored.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ type RawTx struct {
creationTime time.Time
}

type RawTxWithTraceID struct {
RawTx
traceID traceapi.TraceID
}

type MonitoredTransactor struct {
txFabric calls.TxFabric
gasPriceClient calls.GasPricer
Expand All @@ -37,8 +42,7 @@ type MonitoredTransactor struct {
maxGasPrice *big.Int
increasePercentage *big.Int

pendingTxns map[common.Hash]RawTx
pendingTxnsTrace map[common.Hash]traceapi.TraceID
pendingTxns map[common.Hash]RawTxWithTraceID

txLock sync.Mutex
}
Expand All @@ -60,30 +64,27 @@ func NewMonitoredTransactor(
client: client,
gasPriceClient: gasPriceClient,
txFabric: txFabric,
pendingTxns: make(map[common.Hash]RawTx),
pendingTxnsTrace: make(map[common.Hash]traceapi.TraceID),
pendingTxns: make(map[common.Hash]RawTxWithTraceID),
maxGasPrice: maxGasPrice,
increasePercentage: increasePercentage,
}
}

func (t *MonitoredTransactor) Transact(ctx context.Context, to *common.Address, data []byte, opts transactor.TransactOptions) (*common.Hash, error) {
_, span := otel.Tracer("relayer-core").Start(ctx, "relayer.core.EVMListener.ListenToEvents")

_, span := otel.Tracer("relayer-core").Start(ctx, "relayer.core.evm.transactor.Monitor")
defer span.End()
t.client.LockNonce()
defer t.client.UnlockNonce()

n, err := t.client.UnsafeNonce()
if err != nil {
span.RecordError(fmt.Errorf("unable to get unsafe nonce with err: %w", err))
span.End()
return &common.Hash{}, err
}

err = transactor.MergeTransactionOptions(&opts, &transactor.DefaultTransactionOptions)
if err != nil {
span.RecordError(fmt.Errorf("unable to merge transaction options with err: %w", err))
span.End()
return &common.Hash{}, err
}

Expand All @@ -100,39 +101,41 @@ func (t *MonitoredTransactor) Transact(ctx context.Context, to *common.Address,
span.AddEvent("Calculated GasPrice", traceapi.WithAttributes(attribute.String("tx.gp", gp[0].String())))
}

rawTx := RawTx{
to: to,
nonce: n.Uint64(),
value: opts.Value,
gasLimit: opts.GasLimit,
gasPrice: gp,
data: data,
submitTime: time.Now(),
creationTime: time.Now(),
rawTx := RawTxWithTraceID{
RawTx{
to: to,
nonce: n.Uint64(),
value: opts.Value,
gasLimit: opts.GasLimit,
gasPrice: gp,
data: data,
submitTime: time.Now(),
creationTime: time.Now(),
},
span.SpanContext().TraceID(),
}
tx, err := t.txFabric(rawTx.nonce, rawTx.to, rawTx.value, rawTx.gasLimit, rawTx.gasPrice, rawTx.data)
if err != nil {
span.RecordError(fmt.Errorf("unable to call TxFabric with err: %w", err))
span.End()
return &common.Hash{}, err
}

h, err := t.client.SignAndSendTransaction(context.TODO(), tx)
if err != nil {
span.RecordError(fmt.Errorf("unable to SignAndSendTransaction with err: %w", err))
span.End()
span.SetStatus(codes.Error, "unable to SignAndSendTransaction")
return &common.Hash{}, err
}
span.AddEvent("Executed transaction", traceapi.WithAttributes(attribute.String("tx.hash", h.String())))

t.txLock.Lock()
t.pendingTxns[h] = rawTx
t.pendingTxnsTrace[h] = span.SpanContext().TraceID()
t.txLock.Unlock()

err = t.client.UnsafeIncreaseNonce()
if err != nil {
span.RecordError(fmt.Errorf("unable to UnsafeIncreaseNonce with err: %w", err))
span.End()
span.SetStatus(codes.Error, "unable to UnsafeIncreaseNonce")
return &common.Hash{}, err
}

Expand All @@ -154,27 +157,17 @@ func (t *MonitoredTransactor) Monitor(
case <-ticker.C:
{
t.txLock.Lock()
pendingTxCopy := make(map[common.Hash]RawTx, len(t.pendingTxns))
pendingTxCopy := make(map[common.Hash]RawTxWithTraceID, len(t.pendingTxns))
for k, v := range t.pendingTxns {
pendingTxCopy[k] = v
}
pendingTxTraceIDCopy := make(map[common.Hash]traceapi.TraceID, len(t.pendingTxnsTrace))
for k, v := range t.pendingTxnsTrace {
pendingTxTraceIDCopy[k] = v
}
t.txLock.Unlock()

for oldHash, tx := range pendingTxCopy {
if time.Since(tx.submitTime) < tooNewTransaction {
continue
}
tID, ok := pendingTxTraceIDCopy[oldHash]
if ok {
// Creating span context with existing TraceID
spanCtx := traceapi.NewSpanContext(traceapi.SpanContextConfig{TraceID: tID, Remote: true})
ctx = traceapi.ContextWithSpanContext(ctx, spanCtx)
}
ctx, span := otel.Tracer("relayer-sygma").Start(ctx, "relayer.sygma.evm.transactor.Monitor")
txContextWithSpan, span := otel.Tracer("relayer-core").Start(traceapi.ContextWithSpanContext(ctx, traceapi.NewSpanContext(traceapi.SpanContextConfig{TraceID: tx.traceID})), "relayer.core.evm.transactor.Monitor")
logger := log.With().Str("dd.trace_id", span.SpanContext().TraceID().String()).Logger()

receipt, err := t.client.TransactionReceipt(context.Background(), oldHash)
Expand All @@ -191,7 +184,6 @@ func (t *MonitoredTransactor) Monitor(
span.End()
}
delete(t.pendingTxns, oldHash)
delete(t.pendingTxnsTrace, oldHash)
continue
}

Expand All @@ -200,11 +192,10 @@ func (t *MonitoredTransactor) Monitor(
span.RecordError(fmt.Errorf("transaction has timed out"), traceapi.WithAttributes(attribute.String("tx.hash", oldHash.String()), attribute.Int64("tx.nonce", int64(tx.nonce))))
span.End()
delete(t.pendingTxns, oldHash)
delete(t.pendingTxnsTrace, oldHash)
continue
}

hash, err := t.resendTransaction(ctx, &tx)
hash, err := t.resendTransaction(txContextWithSpan, &tx.RawTx)
if err != nil {
span.RecordError(fmt.Errorf("error resending transaction %w", err), traceapi.WithAttributes(attribute.String("tx.hash", oldHash.String()), attribute.Int64("tx.nonce", int64(tx.nonce))))
logger.Warn().Uint64("nonce", tx.nonce).Err(err).Msgf("Failed resending transaction %s", oldHash)
Expand All @@ -214,9 +205,7 @@ func (t *MonitoredTransactor) Monitor(
span.End()

delete(t.pendingTxns, oldHash)
delete(t.pendingTxnsTrace, oldHash)
t.pendingTxns[hash] = tx
t.pendingTxnsTrace[hash] = tID
}
}
}
Expand Down Expand Up @@ -246,7 +235,7 @@ func (t *MonitoredTransactor) resendTransaction(ctx context.Context, tx *RawTx)
// would be 11 (it floors the value). In case the gas price didn't
// change it increases it by 1.
func (t *MonitoredTransactor) IncreaseGas(ctx context.Context, oldGp []*big.Int) []*big.Int {
_, span := otel.Tracer("relayer-core").Start(ctx, "relayer.sygma.evm.transactor.Monitor.IncreaseGas")
_, span := otel.Tracer("relayer-core").Start(ctx, "relayer.core.evm.transactor.Monitor.IncreaseGas")
newGp := make([]*big.Int, len(oldGp))
for i, gp := range oldGp {
percentIncreaseValue := new(big.Int).Div(new(big.Int).Mul(gp, t.increasePercentage), big.NewInt(100))
Expand Down
9 changes: 1 addition & 8 deletions chains/evm/calls/transactor/signAndSend/signAndSend.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,19 @@ func NewSignAndSendTransactor(txFabric calls.TxFabric, gasPriceClient calls.GasP

func (t *signAndSendTransactor) Transact(ctx context.Context, to *common.Address, data []byte, opts transactor.TransactOptions) (*common.Hash, error) {
_, span := otel.Tracer("relayer-core").Start(ctx, "relayer.core.Transactor.signAndSendTransactor.Transact")
defer span.End()
t.client.LockNonce()
n, err := t.client.UnsafeNonce()
if err != nil {
t.client.UnlockNonce()
span.RecordError(fmt.Errorf("unable to get unsafe nonce with err: %w", err))
span.End()
return &common.Hash{}, err
}

err = transactor.MergeTransactionOptions(&opts, &transactor.DefaultTransactionOptions)
if err != nil {
t.client.UnlockNonce()
span.RecordError(fmt.Errorf("unable to merge transaction options with err: %w", err))
span.End()
return &common.Hash{}, err
}

Expand All @@ -55,7 +54,6 @@ func (t *signAndSendTransactor) Transact(ctx context.Context, to *common.Address
if err != nil {
t.client.UnlockNonce()
span.RecordError(fmt.Errorf("unable to define gas price with err: %w", err))
span.End()
return &common.Hash{}, err
}
}
Expand All @@ -70,33 +68,28 @@ func (t *signAndSendTransactor) Transact(ctx context.Context, to *common.Address
if err != nil {
t.client.UnlockNonce()
span.RecordError(fmt.Errorf("unable to call TxFabric with err: %w", err))
span.End()
return &common.Hash{}, err
}

h, err := t.client.SignAndSendTransaction(context.TODO(), tx)
if err != nil {
t.client.UnlockNonce()
span.RecordError(fmt.Errorf("unable to SignAndSendTransaction with err: %w", err))
span.End()
return &common.Hash{}, err
}

err = t.client.UnsafeIncreaseNonce()
t.client.UnlockNonce()
if err != nil {
span.RecordError(fmt.Errorf("unable to UnsafeIncreaseNonce with err: %w", err))
span.End()
return &common.Hash{}, err
}

_, err = t.client.WaitAndReturnTxReceipt(h)
if err != nil {
span.RecordError(fmt.Errorf("unable to WaitAndReturnTxReceipt with err: %w", err))
span.End()
return &common.Hash{}, err
}
span.SetStatus(codes.Ok, "Transaction sent")
span.End()
return &h, nil
}

0 comments on commit 77199fc

Please sign in to comment.