Skip to content
This repository has been archived by the owner on May 9, 2024. It is now read-only.

Commit

Permalink
feat: implement block delta metric
Browse files Browse the repository at this point in the history
  • Loading branch information
mpetrun5 committed Jun 2, 2023
1 parent d75ae73 commit 81b5362
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 13 deletions.
8 changes: 8 additions & 0 deletions chains/evm/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,14 @@ type ChainClient interface {
LatestBlock() (*big.Int, error)
}

type Metrics interface {
TrackBlockDelta(domainID uint8, head *big.Int, current *big.Int)
}

type EVMListener struct {
client ChainClient
eventHandlers []EventHandler
metrics Metrics

domainID uint8
blockstore *store.BlockStore
Expand All @@ -42,6 +47,7 @@ func NewEVMListener(
client ChainClient,
eventHandlers []EventHandler,
blockstore *store.BlockStore,
metrics Metrics,
domainID uint8,
blockRetryInterval time.Duration,
blockConfirmations *big.Int,
Expand All @@ -50,6 +56,7 @@ func NewEVMListener(
return &EVMListener{
log: logger,
client: client,
metrics: metrics,
eventHandlers: eventHandlers,
blockstore: blockstore,
domainID: domainID,
Expand Down Expand Up @@ -85,6 +92,7 @@ func (l *EVMListener) ListenToEvents(ctx context.Context, startBlock *big.Int, m
continue
}

l.metrics.TrackBlockDelta(l.domainID, head, endBlock)
l.log.Debug().Msgf("Fetching evm events for block range %s-%s", startBlock, endBlock)

for _, handler := range l.eventHandlers {
Expand Down
13 changes: 7 additions & 6 deletions example/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ func Run() error {
}
blockstore := store.NewBlockStore(db)

meter, err := opentelemetry.DefaultMeter(context.Background(), configuration.RelayerConfig.OpenTelemetryCollectorURL)
if err != nil {
panic(err)
}
metrics := opentelemetry.NewOpenTelemetry(meter)

ctx, cancel := context.WithCancel(context.Background())
chains := []relayer.RelayedChain{}
for _, chainConfig := range configuration.ChainConfigs {
Expand Down Expand Up @@ -82,7 +88,7 @@ func Run() error {
eventListener := events.NewListener(client)
eventHandlers := make([]listener.EventHandler, 0)
eventHandlers = append(eventHandlers, listener.NewDepositEventHandler(eventListener, depositHandler, common.HexToAddress(config.Bridge), *config.GeneralChainConfig.Id))
evmListener := listener.NewEVMListener(client, eventHandlers, blockstore, *config.GeneralChainConfig.Id, config.BlockRetryInterval, config.BlockConfirmations, config.BlockInterval)
evmListener := listener.NewEVMListener(client, eventHandlers, blockstore, metrics, *config.GeneralChainConfig.Id, config.BlockRetryInterval, config.BlockConfirmations, config.BlockInterval)

mh := executor.NewEVMMessageHandler(bridgeContract)
mh.RegisterMessageHandler(config.Erc20Handler, executor.ERC20MessageHandler)
Expand All @@ -105,11 +111,6 @@ func Run() error {
}
}

meter, err := opentelemetry.DefaultMeter(context.Background(), configuration.RelayerConfig.OpenTelemetryCollectorURL)
if err != nil {
panic(err)
}
metrics := opentelemetry.NewOpenTelemetry(meter)
r := relayer.NewRelayer(
chains,
metrics,
Expand Down
21 changes: 20 additions & 1 deletion opentelemetry/metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package opentelemetry

import (
"context"
"math/big"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/unit"
)
Expand All @@ -10,11 +14,15 @@ type ChainbridgeMetrics struct {
ExecutionErrorCount metric.Int64Counter
ExecutionLatency metric.Int64Histogram
ExecutionLatencyPerRoute metric.Int64Histogram
BlockDelta metric.Int64GaugeObserver

BlockDeltaMap map[uint8]*big.Int
}

// NewChainbridgeMetrics creates an instance of ChainbridgeMetrics
// with provided OpenTelemetry meter
func NewChainbridgeMetrics(meter metric.Meter) *ChainbridgeMetrics {
func NewChainbridgeMetrics(meter metric.Meter, genericLabels ...attribute.KeyValue) *ChainbridgeMetrics {
blockDeltaMap := make(map[uint8]*big.Int)
return &ChainbridgeMetrics{
DepositEventCount: metric.Must(meter).NewInt64Counter(
"chainbridge.DepositEventCount",
Expand All @@ -33,5 +41,16 @@ func NewChainbridgeMetrics(meter metric.Meter) *ChainbridgeMetrics {
metric.WithDescription("Execution time histogram between indexing event and executing it"),
metric.WithUnit(unit.Milliseconds),
),
BlockDelta: metric.Must(meter).NewInt64GaugeObserver(
"chainbridge.BlockDelta",
func(ctx context.Context, result metric.Int64ObserverResult) {
for domainID, delta := range blockDeltaMap {
labels := append(genericLabels, attribute.Int64("domainID", int64(domainID)))
result.Observe(delta.Int64(), labels...)
}
},
metric.WithDescription("Difference between chain head and current indexed block per domain"),
),
BlockDeltaMap: blockDeltaMap,
}
}
26 changes: 20 additions & 6 deletions opentelemetry/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package opentelemetry

import (
"context"
"math/big"
"net/url"
"time"

Expand Down Expand Up @@ -52,37 +53,50 @@ func DefaultMeter(ctx context.Context, collectorRawURL string) (metric.Meter, er

type OpenTelemetry struct {
metrics *ChainbridgeMetrics
meter metric.Meter
messageEventTime map[string]time.Time
genericLabels []attribute.KeyValue
}

// NewOpenTelemetry initializes OpenTelementry metrics
func NewOpenTelemetry(meter metric.Meter) *OpenTelemetry {
metrics := NewChainbridgeMetrics(meter)
func NewOpenTelemetry(meter metric.Meter, labels ...attribute.KeyValue) *OpenTelemetry {
metrics := NewChainbridgeMetrics(meter, labels...)
return &OpenTelemetry{
metrics: metrics,
meter: meter,
genericLabels: labels,
messageEventTime: make(map[string]time.Time),
}
}

// TrackDepositMessage extracts metrics from deposit message and sends
// them to OpenTelemetry collector
func (t *OpenTelemetry) TrackDepositMessage(m *message.Message) {
t.metrics.DepositEventCount.Add(context.Background(), 1, attribute.Int64("source", int64(m.Source)))
labels := append(t.genericLabels, attribute.Int64("source", int64(m.Source)))
t.metrics.DepositEventCount.Add(context.Background(), 1, labels...)
t.messageEventTime[m.ID()] = time.Now()
}

func (t *OpenTelemetry) TrackExecutionError(m *message.Message) {
t.metrics.ExecutionErrorCount.Add(context.Background(), 1, attribute.Int64("destination", int64(m.Source)))
labels := append(t.genericLabels, attribute.Int64("destination", int64(m.Source)))
t.metrics.ExecutionErrorCount.Add(context.Background(), 1, labels...)
delete(t.messageEventTime, m.ID())
}

func (t *OpenTelemetry) TrackSuccessfulExecution(m *message.Message) {
labels := append(t.genericLabels, attribute.Int64("source", int64(m.Source)))
labels = append(labels, attribute.Int64("destination", int64(m.Destination)))
executionLatency := time.Since(t.messageEventTime[m.ID()]).Milliseconds() / 1000
t.metrics.ExecutionLatency.Record(context.Background(), executionLatency)
t.metrics.ExecutionLatencyPerRoute.Record(
context.Background(),
executionLatency,
attribute.Int64("source", int64(m.Source)),
attribute.Int64("destination", int64(m.Destination)))
labels...,
)
delete(t.messageEventTime, m.ID())
}

func (t *OpenTelemetry) TrackBlockDelta(domainID uint8, head *big.Int, current *big.Int) {
t.metrics.BlockDeltaMap[domainID] = new(big.Int).Sub(head, current)
t.meter.RecordBatch(context.Background(), []attribute.KeyValue{})
}

0 comments on commit 81b5362

Please sign in to comment.