Add chain exchange and partial message metrics (#852)
Add metrics that measure various aspects of chain exchange and partial
messages (a sketch for inspecting them locally follows this summary),
including:
 * The number of partial messages pending fulfilment.
 * The total number of instances with pending partial messages.
 * The chain exchange broadcast count.
 * The length of the broadcast chain.
 * The time spent on chain exchange validation.
 * Qualification of the existing GPBFT validation metrics with a partial
   message attribute.
 * The number of duplicate partial messages, qualified with an
   equivocation attribute.

Fixes: #812
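
As a rough illustration of how the new instruments can be surfaced, here is a minimal sketch assuming the standard OpenTelemetry Go SDK (go.opentelemetry.io/otel/sdk/metric); the provider and reader wiring below is hypothetical and not part of this change:

package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func main() {
	// Route the global meter provider through a ManualReader so instruments
	// registered via otel.Meter (e.g. f3/chainexchange) can be read back.
	reader := sdkmetric.NewManualReader()
	otel.SetMeterProvider(sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)))

	// ... run go-f3 here so the chain exchange and partial message metrics get recorded ...

	var rm metricdata.ResourceMetrics
	if err := reader.Collect(context.Background(), &rm); err != nil {
		panic(err)
	}
	for _, scope := range rm.ScopeMetrics {
		for _, m := range scope.Metrics {
			// e.g. "f3/chainexchange f3_chainexchange_broadcasts"
			fmt.Println(scope.Scope.Name, m.Name)
		}
	}
}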
masih authored Jan 29, 2025
1 parent 1e488a2 commit 81059b1
Showing 6 changed files with 146 additions and 38 deletions.
44 changes: 44 additions & 0 deletions chainexchange/metrics.go
@@ -0,0 +1,44 @@
package chainexchange

import (
"github.com/filecoin-project/go-f3/internal/measurements"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

var (
meter = otel.Meter("f3/chainexchange")

attrKindWanted = attribute.String("kind", "wanted")
attrKindDiscovered = attribute.String("kind", "discovered")

metrics = struct {
chains metric.Int64Counter
broadcasts metric.Int64Counter
broadcastChainLen metric.Int64Gauge
notifications metric.Int64Counter
instances metric.Int64UpDownCounter
validatedMessages metric.Int64Counter
validationTime metric.Float64Histogram
}{
chains: measurements.Must(meter.Int64Counter("f3_chainexchange_chains", metric.WithDescription("Number of chains engaged in chainexchange by status."))),
broadcasts: measurements.Must(meter.Int64Counter("f3_chainexchange_broadcasts", metric.WithDescription("Number of chain broadcasts made by chainexchange."))),
broadcastChainLen: measurements.Must(meter.Int64Gauge("f3_chainexchange_broadcast_chain_length", metric.WithDescription("The latest length of the broadcast chain."))),
notifications: measurements.Must(meter.Int64Counter("f3_chainexchange_notifications", metric.WithDescription("Number of chain discovery notifications made by chainexchange."))),
instances: measurements.Must(meter.Int64UpDownCounter("f3_chainexchange_instances", metric.WithDescription("Number of instances engaged in chainexchange."))),
validatedMessages: measurements.Must(meter.Int64Counter("f3_chainexchange_validated_messages", metric.WithDescription("Number of pubsub messages validated, tagged by result."))),
validationTime: measurements.Must(meter.Float64Histogram("f3_chainexchange_validation_time",
metric.WithDescription("Histogram of time spent validating chainexchange messages in seconds."),
metric.WithExplicitBucketBoundaries(0.001, 0.002, 0.003, 0.005, 0.01, 0.02, 0.03, 0.04, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 1.0, 10.0),
metric.WithUnit("s"),
)),
}
)

func attrFromWantedDiscovered(wanted, discovered bool) attribute.Set {
return attribute.NewSet(
attribute.Bool("wanted", wanted),
attribute.Bool("discovered", discovered),
)
}
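
For orientation, the wanted and discovered boolean attributes recorded on f3_chainexchange_chains take three combinations at the call sites in pubsub.go below. An illustrative sketch within package chainexchange; the variable names are hypothetical and not part of the commit:

// Illustrative only; mirrors the combinations recorded in pubsub.go below.
var (
	// A chain we want but have not discovered yet: a placeholder was cached.
	attrWantedNotDiscovered = attrFromWantedDiscovered(true, false)
	// A wanted chain fulfilled by a broadcast or a discovered message.
	attrWantedAndDiscovered = attrFromWantedDiscovered(true, true)
	// A chain seen on the wire that no caller has asked for yet.
	attrDiscoveredNotWanted = attrFromWantedDiscovered(false, true)
)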
60 changes: 49 additions & 11 deletions chainexchange/pubsub.go
@@ -8,11 +8,13 @@ import (

"github.com/filecoin-project/go-f3/gpbft"
"github.com/filecoin-project/go-f3/internal/encoding"
"github.com/filecoin-project/go-f3/internal/measurements"
"github.com/filecoin-project/go-f3/internal/psutil"
lru "github.com/hashicorp/golang-lru/v2"
logging "github.com/ipfs/go-log/v2"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
"go.opentelemetry.io/otel/metric"
)

var (
@@ -134,21 +136,24 @@ func (p *PubSubChainExchange) GetChainByInstance(ctx context.Context, instance u

// Check wanted keys first.

wanted := p.getChainsWantedAt(instance)
wanted := p.getChainsWantedAt(ctx, instance)
if portion, found := wanted.Get(key); found && !portion.IsPlaceholder() {
return portion.chain, true
}

// Check if the chain for the key is discovered.
discovered := p.getChainsDiscoveredAt(instance)
discovered := p.getChainsDiscoveredAt(ctx, instance)
if portion, found := discovered.Get(key); found {
// Add it to the wanted cache and remove it from the discovered cache.
wanted.Add(key, portion)
metrics.chains.Add(ctx, 1, metric.WithAttributeSet(
attrFromWantedDiscovered(true, true)))
discovered.Remove(key)

chain := portion.chain
if p.listener != nil {
p.listener.NotifyChainDiscovered(ctx, instance, chain)
metrics.notifications.Add(ctx, 1)
}
// TODO: Do we want to pull all the suffixes of the chain into wanted cache?
return chain, true
@@ -157,27 +162,31 @@ func (p *PubSubChainExchange) GetChainByInstance(ctx context.Context, instance u
// Otherwise, add a placeholder for the wanted key as a way to prioritise its
// retention via LRU recent-ness.
wanted.ContainsOrAdd(key, chainPortionPlaceHolder)
metrics.chains.Add(ctx, 1, metric.WithAttributeSet(
attrFromWantedDiscovered(true, false)))
return nil, false
}

func (p *PubSubChainExchange) getChainsWantedAt(instance uint64) *lru.Cache[gpbft.ECChainKey, *chainPortion] {
func (p *PubSubChainExchange) getChainsWantedAt(ctx context.Context, instance uint64) *lru.Cache[gpbft.ECChainKey, *chainPortion] {
p.mu.Lock()
defer p.mu.Unlock()
wanted, exists := p.chainsWanted[instance]
if !exists {
wanted = p.newChainPortionCache(p.maxWantedChainsPerInstance)
p.chainsWanted[instance] = wanted
metrics.instances.Add(ctx, +1, metric.WithAttributes(attrKindWanted))
}
return wanted
}

func (p *PubSubChainExchange) getChainsDiscoveredAt(instance uint64) *lru.Cache[gpbft.ECChainKey, *chainPortion] {
func (p *PubSubChainExchange) getChainsDiscoveredAt(ctx context.Context, instance uint64) *lru.Cache[gpbft.ECChainKey, *chainPortion] {
p.mu.Lock()
defer p.mu.Unlock()
discovered, exists := p.chainsDiscovered[instance]
if !exists {
discovered = p.newChainPortionCache(p.maxDiscoveredChainsPerInstance)
p.chainsDiscovered[instance] = discovered
metrics.instances.Add(ctx, +1, metric.WithAttributes(attrKindDiscovered))
}
return discovered
}
@@ -193,7 +202,13 @@ func (p *PubSubChainExchange) newChainPortionCache(capacity int) *lru.Cache[gpbf
return cache
}

func (p *PubSubChainExchange) validatePubSubMessage(_ context.Context, _ peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
func (p *PubSubChainExchange) validatePubSubMessage(ctx context.Context, _ peer.ID, msg *pubsub.Message) (_result pubsub.ValidationResult) {
defer func(start time.Time) {
attr := measurements.AttrFromPubSubValidationResult(_result)
metrics.validatedMessages.Add(ctx, 1, metric.WithAttributes(attr))
metrics.validationTime.Record(ctx, time.Since(start).Seconds(), metric.WithAttributes(attr))
}(time.Now())

var cmsg Message
if err := p.encoding.Decode(msg.Data, &cmsg); err != nil {
log.Debugw("failed to decode message", "from", msg.GetFrom(), "err", err)
@@ -230,8 +245,8 @@ func (p *PubSubChainExchange) validatePubSubMessage(_ context.Context, _ peer.ID

func (p *PubSubChainExchange) cacheAsDiscoveredChain(ctx context.Context, cmsg Message) {

wanted := p.getChainsDiscoveredAt(cmsg.Instance)
discovered := p.getChainsDiscoveredAt(cmsg.Instance)
wanted := p.getChainsDiscoveredAt(ctx, cmsg.Instance)
discovered := p.getChainsDiscoveredAt(ctx, cmsg.Instance)

for offset := cmsg.Chain.Len(); offset >= 0 && ctx.Err() == nil; offset-- {
// TODO: Expose internals of merkle.go so that keys can be generated
@@ -241,15 +256,21 @@ func (p *PubSubChainExchange) cacheAsDiscoveredChain(ctx context.Context, cmsg M
if portion, found := wanted.Peek(key); !found {
// Not a wanted key; add it to discovered chains if they are not there already,
// i.e. without modifying the recent-ness of any of the discovered values.
discovered.ContainsOrAdd(key, &chainPortion{
existed, _ := discovered.ContainsOrAdd(key, &chainPortion{
chain: prefix,
})
if !existed {
metrics.chains.Add(ctx, 1, metric.WithAttributeSet(
attrFromWantedDiscovered(false, true)))
}
} else if portion.IsPlaceholder() {
// It is a wanted key with a placeholder; replace the placeholder with the actual
// discovery.
wanted.Add(key, &chainPortion{
chain: prefix,
})
metrics.chains.Add(ctx, 1, metric.WithAttributeSet(
attrFromWantedDiscovered(true, true)))
}
// Nothing to do; the discovered value is already in the wanted chains with
// discovered value.
@@ -260,7 +281,11 @@ func (p *PubSubChainExchange) cacheAsDiscoveredChain(ctx context.Context, cmsg M
}
}

func (p *PubSubChainExchange) Broadcast(ctx context.Context, msg Message) error {
func (p *PubSubChainExchange) Broadcast(ctx context.Context, msg Message) (_err error) {
defer func() {
metrics.broadcasts.Add(ctx, 1, metric.WithAttributes(measurements.Status(ctx, _err)))
metrics.broadcastChainLen.Record(ctx, int64(msg.Chain.Len()))
}()

// Optimistically cache the broadcast chain and all of its prefixes as wanted.
select {
@@ -288,7 +313,7 @@ type discovery struct {

func (p *PubSubChainExchange) cacheAsWantedChain(ctx context.Context, cmsg Message) {
var notifications []discovery
wanted := p.getChainsWantedAt(cmsg.Instance)
wanted := p.getChainsWantedAt(ctx, cmsg.Instance)
for offset := cmsg.Chain.Len(); offset >= 0 && ctx.Err() == nil; offset-- {
// TODO: Expose internals of merkle.go so that keys can be generated
// cumulatively for a more efficient prefix chain key generation.
@@ -298,6 +323,12 @@ func (p *PubSubChainExchange) cacheAsWantedChain(ctx context.Context, cmsg Messa
wanted.Add(key, &chainPortion{
chain: prefix,
})

if portion.IsPlaceholder() {
metrics.chains.Add(ctx, 1, metric.WithAttributeSet(
attrFromWantedDiscovered(true, true)))
}

if p.listener != nil {
notifications = append(notifications, discovery{
instance: cmsg.Instance,
@@ -315,22 +346,29 @@ func (p *PubSubChainExchange) cacheAsWantedChain(ctx context.Context, cmsg Messa
for _, notification := range notifications {
p.listener.NotifyChainDiscovered(ctx, notification.instance, notification.chain)
}
metrics.notifications.Add(ctx, int64(len(notifications)))
}
}

func (p *PubSubChainExchange) RemoveChainsByInstance(_ context.Context, instance uint64) error {
func (p *PubSubChainExchange) RemoveChainsByInstance(ctx context.Context, instance uint64) error {
p.mu.Lock()
defer p.mu.Unlock()
for i := range p.chainsWanted {
if i < instance {
delete(p.chainsWanted, i)
metrics.instances.Add(ctx, -1, metric.WithAttributes(attrKindWanted))
}
}
for i := range p.chainsDiscovered {
if i < instance {
delete(p.chainsDiscovered, i)
metrics.instances.Add(ctx, -1, metric.WithAttributes(attrKindDiscovered))
}
}
// TODO: Do we want to precisely count the number of "wanted but not discovered
// chains" per instance? If so, then the LRU caches per instance need to be
// wrapped to keep a count of placeholders, etc. For now, we can approximate
// this by the number of partial messages never fulfilled.
return nil
}
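
One possible shape for the precise count mentioned in the TODO at the end of RemoveChainsByInstance is a thin wrapper over the per-instance LRU that tracks its placeholder entries. A rough in-package sketch with hypothetical names; eviction handling and locking are omitted, and this is not part of the commit:

// Hypothetical wrapper that counts wanted-but-not-yet-discovered entries.
type countedPortionCache struct {
	cache        *lru.Cache[gpbft.ECChainKey, *chainPortion]
	placeholders int64
}

func (c *countedPortionCache) Add(key gpbft.ECChainKey, portion *chainPortion) {
	prev, found := c.cache.Peek(key)
	switch {
	case !found && portion.IsPlaceholder():
		c.placeholders++ // newly wanted, not yet discovered
	case found && prev.IsPlaceholder() && !portion.IsPlaceholder():
		c.placeholders-- // a placeholder was fulfilled
	}
	c.cache.Add(key, portion)
}

func (c *countedPortionCache) Placeholders() int64 { return c.placeholders }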

4 changes: 3 additions & 1 deletion host.go
@@ -538,8 +538,9 @@ func (h *gpbftRunner) rebroadcastMessage(msg *gpbft.GMessage) error {
var _ pubsub.ValidatorEx = (*gpbftRunner)(nil).validatePubsubMessage

func (h *gpbftRunner) validatePubsubMessage(ctx context.Context, _ peer.ID, msg *pubsub.Message) (_result pubsub.ValidationResult) {
var partiallyValidated bool
defer func(start time.Time) {
recordValidationTime(ctx, start, _result)
recordValidationTime(ctx, start, _result, partiallyValidated)
}(time.Now())

var pgmsg PartialGMessage
@@ -555,6 +556,7 @@ func (h *gpbftRunner) validatePubsubMessage(ctx context.Context, _ peer.ID, msg
if result == pubsub.ValidationAccept {
msg.ValidatorData = partiallyValidatedMessage
}
partiallyValidated = true
return result
}

16 changes: 16 additions & 0 deletions internal/measurements/attributes.go
@@ -6,6 +6,7 @@ import (
"os"

"github.com/ipfs/go-datastore"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.opentelemetry.io/otel/attribute"
)

@@ -37,3 +38,18 @@ func Status(ctx context.Context, err error) attribute.KeyValue {
return AttrStatusError
}
}

func AttrFromPubSubValidationResult(result pubsub.ValidationResult) attribute.KeyValue {
var v string
switch result {
case pubsub.ValidationAccept:
v = "accepted"
case pubsub.ValidationReject:
v = "rejected"
case pubsub.ValidationIgnore:
v = "ignored"
default:
v = "unknown"
}
return attribute.String("result", v)
}
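
A hypothetical table-driven test, not included in this commit, could pin down the result attribute produced for each pubsub validation outcome:

package measurements

import (
	"testing"

	pubsub "github.com/libp2p/go-libp2p-pubsub"
)

// Hypothetical sketch: verifies the attribute value for each validation result.
func TestAttrFromPubSubValidationResult(t *testing.T) {
	cases := map[pubsub.ValidationResult]string{
		pubsub.ValidationAccept:      "accepted",
		pubsub.ValidationReject:      "rejected",
		pubsub.ValidationIgnore:      "ignored",
		pubsub.ValidationResult(-42): "unknown",
	}
	for result, want := range cases {
		if got := AttrFromPubSubValidationResult(result).Value.AsString(); got != want {
			t.Errorf("result %v: got %q, want %q", result, got, want)
		}
	}
}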
40 changes: 20 additions & 20 deletions metrics.go
@@ -37,13 +37,16 @@ var samples = struct {
}

var metrics = struct {
headDiverged metric.Int64Counter
reconfigured metric.Int64Counter
manifestsReceived metric.Int64Counter
validationTime metric.Float64Histogram
proposalFetchTime metric.Float64Histogram
committeeFetchTime metric.Float64Histogram
validatedMessages metric.Int64Counter
headDiverged metric.Int64Counter
reconfigured metric.Int64Counter
manifestsReceived metric.Int64Counter
validationTime metric.Float64Histogram
proposalFetchTime metric.Float64Histogram
committeeFetchTime metric.Float64Histogram
validatedMessages metric.Int64Counter
partialMessages metric.Int64UpDownCounter
partialMessageDuplicates metric.Int64Counter
partialMessageInstances metric.Int64UpDownCounter
}{
headDiverged: measurements.Must(meter.Int64Counter("f3_head_diverged", metric.WithDescription("Number of times we encountered the head has diverged from base scenario."))),
reconfigured: measurements.Must(meter.Int64Counter("f3_reconfigured", metric.WithDescription("Number of times we reconfigured due to new manifest being delivered."))),
@@ -65,6 +68,12 @@ var metrics = struct {
)),
validatedMessages: measurements.Must(meter.Int64Counter("f3_validated_messages",
metric.WithDescription("Number of validated GPBFT messages."))),
partialMessages: measurements.Must(meter.Int64UpDownCounter("f3_partial_messages",
metric.WithDescription("Number of partial GPBFT messages pending fulfilment."))),
partialMessageDuplicates: measurements.Must(meter.Int64Counter("f3_partial_message_duplicates",
metric.WithDescription("Number of partial GPBFT messages recieved that already have an unfulfilled message for the same instance, sender, round and phase."))),
partialMessageInstances: measurements.Must(meter.Int64UpDownCounter("f3_partial_message_instances",
metric.WithDescription("Number of instances with partial GPBFT messages pending fulfilment."))),
}

func recordValidatedMessage(ctx context.Context, msg gpbft.ValidatedMessage) {
@@ -88,22 +97,13 @@ func recordValidatedMessage(ctx context.Context, msg gpbft.ValidatedMessage) {
metrics.validatedMessages.Add(ctx, 1, metric.WithAttributes(attrs...))
}

func recordValidationTime(ctx context.Context, start time.Time, result pubsub.ValidationResult) {
var v string
switch result {
case pubsub.ValidationAccept:
v = "accepted"
case pubsub.ValidationReject:
v = "rejected"
case pubsub.ValidationIgnore:
v = "ignored"
default:
v = "unknown"
}
func recordValidationTime(ctx context.Context, start time.Time, result pubsub.ValidationResult, partiallyValidated bool) {
metrics.validationTime.Record(
ctx,
time.Since(start).Seconds(),
metric.WithAttributes(attribute.KeyValue{Key: "result", Value: attribute.StringValue(v)}))
metric.WithAttributes(
measurements.AttrFromPubSubValidationResult(result),
attribute.Bool("partially_validated", partiallyValidated)))
}

// attrStatusFromErr returns an attribute with key "status" and value set to "success" if