Skip to content
This repository has been archived by the owner on May 9, 2024. It is now read-only.

Commit

Permalink
Merge branch 'main' into dependabot/go_modules/golang.org/x/net-0.7.0
Browse files Browse the repository at this point in the history
  • Loading branch information
MakMuftic committed May 3, 2023
2 parents 00899fa + 2ce7357 commit 5f9b10d
Show file tree
Hide file tree
Showing 16 changed files with 199 additions and 85 deletions.
4 changes: 3 additions & 1 deletion chains/evm/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (c *EVMChain) PollEvents(ctx context.Context, sysErr chan<- error, msgChan
go c.listener.ListenToEvents(ctx, startBlock, msgChan, sysErr)
}

func (c *EVMChain) Write(msg []*message.Message) {
func (c *EVMChain) Write(msg []*message.Message) error {
for _, msg := range msg {
go func(msg *message.Message) {
err := c.writer.Execute(msg)
Expand All @@ -73,6 +73,8 @@ func (c *EVMChain) Write(msg []*message.Message) {
}
}(msg)
}

return nil
}

func (c *EVMChain) DomainID() uint8 {
Expand Down
2 changes: 1 addition & 1 deletion chains/evm/executor/message-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (mh *EVMMessageHandler) RegisterMessageHandler(address string, handler Mess
mh.handlers = make(map[common.Address]MessageHandlerFunc)
}

log.Info().Msgf("Registered message handler for address %s", address)
log.Debug().Msgf("Registered message handler for address %s", address)

mh.handlers[common.HexToAddress(address)] = handler
}
Expand Down
2 changes: 1 addition & 1 deletion chains/evm/listener/deposit-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (e *ETHDepositHandler) RegisterDepositHandler(handlerAddress string, handle
return
}

log.Info().Msgf("Registered deposit handler for address %s", handlerAddress)
log.Debug().Msgf("Registered deposit handler for address %s", handlerAddress)
e.depositHandlers[common.HexToAddress(handlerAddress)] = handler
}

Expand Down
13 changes: 10 additions & 3 deletions chains/evm/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/ChainSafe/chainbridge-core/relayer/message"
"github.com/ChainSafe/chainbridge-core/store"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)

Expand All @@ -31,6 +32,8 @@ type EVMListener struct {
blockRetryInterval time.Duration
blockConfirmations *big.Int
blockInterval *big.Int

log zerolog.Logger
}

// NewEVMListener creates an EVMListener that listens to deposit events on chain
Expand All @@ -43,7 +46,9 @@ func NewEVMListener(
blockRetryInterval time.Duration,
blockConfirmations *big.Int,
blockInterval *big.Int) *EVMListener {
logger := log.With().Uint8("domainID", domainID).Logger()
return &EVMListener{
log: logger,
client: client,
eventHandlers: eventHandlers,
blockstore: blockstore,
Expand All @@ -65,7 +70,7 @@ func (l *EVMListener) ListenToEvents(ctx context.Context, startBlock *big.Int, m
default:
head, err := l.client.LatestBlock()
if err != nil {
log.Error().Err(err).Msg("Unable to get latest block")
l.log.Error().Err(err).Msg("Unable to get latest block")
time.Sleep(l.blockRetryInterval)
continue
}
Expand All @@ -80,18 +85,20 @@ func (l *EVMListener) ListenToEvents(ctx context.Context, startBlock *big.Int, m
continue
}

l.log.Debug().Msgf("Fetching evm events for block range %s-%s", startBlock, endBlock)

for _, handler := range l.eventHandlers {
err := handler.HandleEvent(startBlock, new(big.Int).Sub(endBlock, big.NewInt(1)), msgChan)
if err != nil {
log.Error().Err(err).Str("DomainID", string(l.domainID)).Msgf("Unable to handle events")
l.log.Error().Err(err).Msgf("Unable to handle events")
continue
}
}

//Write to block store. Not a critical operation, no need to retry
err = l.blockstore.StoreBlock(endBlock, l.domainID)
if err != nil {
log.Error().Str("block", endBlock.String()).Err(err).Msg("Failed to write latest block to blockstore")
l.log.Error().Str("block", endBlock.String()).Err(err).Msg("Failed to write latest block to blockstore")
}

startBlock.Add(startBlock, l.blockInterval)
Expand Down
7 changes: 6 additions & 1 deletion example/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,14 @@ func Run() error {
}
}

meter, err := opentelemetry.DefaultMeter(context.Background(), configuration.RelayerConfig.OpenTelemetryCollectorURL)
if err != nil {
panic(err)
}
metrics := opentelemetry.NewOpenTelemetry(meter)
r := relayer.NewRelayer(
chains,
&opentelemetry.ConsoleTelemetry{},
metrics,
)

errChn := make(chan error)
Expand Down
4 changes: 3 additions & 1 deletion example/cfg/config_evm-evm_1.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
{
"relayer": {},
"relayer": {
"opentelemetryCollectorURL": "http://otel-collector:4318"
},
"chains": [
{
"id": 1,
Expand Down
4 changes: 3 additions & 1 deletion example/cfg/config_evm-evm_2.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
{
"relayer": {},
"relayer": {
"opentelemetryCollectorURL": "http://otel-collector:4318"
},
"chains": [
{
"id": 1,
Expand Down
4 changes: 3 additions & 1 deletion example/cfg/config_evm-evm_3.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
{
"relayer": {},
"relayer": {
"opentelemetryCollectorURL": "http://otel-collector:4318"
},
"chains": [
{
"id": 1,
Expand Down
20 changes: 20 additions & 0 deletions example/cfg/otel-collector-config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
receivers:
otlp:
protocols:
grpc:
http:

exporters:
prometheus:
endpoint: 0.0.0.0:8889
namespace: default

extensions:
health_check:

service:
extensions: [health_check]
pipelines:
metrics:
exporters: [prometheus]
receivers: [otlp]
25 changes: 9 additions & 16 deletions example/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ services:
container_name: evm1-1
logging:
driver: none
networks:
- evm1
ports:
- "8545:8545"
- "8546:8546"
Expand All @@ -29,8 +27,6 @@ services:
container_name: evm2-1
logging:
driver: none
networks:
- evm2
ports:
- "8547:8545"
- "8548:8546"
Expand All @@ -49,9 +45,6 @@ services:
depends_on:
- evm1-1
- evm2-1
networks:
- evm1
- evm2
restart: "no"

relayer1:
Expand All @@ -63,9 +56,6 @@ services:
depends_on:
- evm1-1
- evm2-1
networks:
- evm1
- evm2
volumes:
- ./cfg:/cfg
restart: always
Expand All @@ -79,9 +69,6 @@ services:
depends_on:
- evm1-1
- evm2-1
networks:
- evm1
- evm2
volumes:
- ./cfg:/cfg
restart: always
Expand All @@ -95,13 +82,19 @@ services:
depends_on:
- evm1-1
- evm2-1
networks:
- evm1
- evm2
volumes:
- ./cfg:/cfg
restart: always

otel-collector:
container_name: otel-collector
image: otel/opentelemetry-collector
command: ["--config=/etc/otel-collector-config.yml"]
volumes:
- ./cfg/otel-collector-config.yml:/etc/otel-collector-config.yml
ports:
- "8889:8889" # Prometheus exporter metrics

networks:
evm2:
evm1:
55 changes: 19 additions & 36 deletions opentelemetry/metrics.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
package opentelemetry

import (
"context"

"go.opentelemetry.io/otel/exporters/otlp/otlpmetric"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/global"
export "go.opentelemetry.io/otel/sdk/export/metric"
controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
"go.opentelemetry.io/otel/metric/unit"
)

type ChainbridgeMetrics struct {
DepositEventCount metric.Int64Counter
DepositEventCount metric.Int64Counter
ExecutionErrorCount metric.Int64Counter
ExecutionLatency metric.Int64Histogram
ExecutionLatencyPerRoute metric.Int64Histogram
}

// NewChainbridgeMetrics creates an instance of ChainbridgeMetrics
Expand All @@ -23,32 +18,20 @@ func NewChainbridgeMetrics(meter metric.Meter) *ChainbridgeMetrics {
return &ChainbridgeMetrics{
DepositEventCount: metric.Must(meter).NewInt64Counter(
"chainbridge.DepositEventCount",
metric.WithDescription("Number of deposit events across all chains"),
metric.WithDescription("Number of deposit events per domain"),
),
ExecutionErrorCount: metric.Must(meter).NewInt64Counter(
"chainbridge.ExecutionErrorCount",
metric.WithDescription("Number of executions that failed"),
),
ExecutionLatencyPerRoute: metric.Must(meter).NewInt64Histogram(
"chainbridge.ExecutionLatencyPerRoute",
metric.WithDescription("Execution time histogram between indexing event and executing it per route"),
),
ExecutionLatency: metric.Must(meter).NewInt64Histogram(
"chainbridge.ExecutionLatency",
metric.WithDescription("Execution time histogram between indexing event and executing it"),
metric.WithUnit(unit.Milliseconds),
),
}
}

func initOpenTelemetryMetrics(opts ...otlpmetrichttp.Option) (*ChainbridgeMetrics, error) {
ctx := context.Background()

client := otlpmetrichttp.NewClient(opts...)
exp, err := otlpmetric.New(ctx, client)
if err != nil {
return nil, err
}

selector := simple.NewWithInexpensiveDistribution()
proc := processor.NewFactory(selector, export.CumulativeExportKindSelector())
cont := controller.New(proc, controller.WithExporter(exp))
global.SetMeterProvider(cont)

err = cont.Start(ctx)
if err != nil {
return nil, err
}

meter := cont.Meter("chainbridge")
metrics := NewChainbridgeMetrics(meter)

return metrics, nil
}
Loading

0 comments on commit 5f9b10d

Please sign in to comment.