package ipfscluster

import (
	"context"
	"encoding/hex"
	"time"

	config "github.com/ipfs-cluster/ipfs-cluster/config"
	fd "github.com/ipfs-cluster/ipfs-cluster/internal/fd"
	"github.com/ipfs-cluster/ipfs-cluster/observations"

humanize "github.com/dustin/go-humanize"
ipns "github.com/ipfs/boxo/ipns"
ds "github.com/ipfs/go-datastore"
namespace "github.com/ipfs/go-datastore/namespace"
libp2p "github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
dual "github.com/libp2p/go-libp2p-kad-dht/dual"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
record "github.com/libp2p/go-libp2p-record"
crypto "github.com/libp2p/go-libp2p/core/crypto"
host "github.com/libp2p/go-libp2p/core/host"
metrics "github.com/libp2p/go-libp2p/core/metrics"
network "github.com/libp2p/go-libp2p/core/network"
peer "github.com/libp2p/go-libp2p/core/peer"
corepnet "github.com/libp2p/go-libp2p/core/pnet"
routing "github.com/libp2p/go-libp2p/core/routing"
autorelay "github.com/libp2p/go-libp2p/p2p/host/autorelay"
p2pbhost "github.com/libp2p/go-libp2p/p2p/host/basic"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
connmgr "github.com/libp2p/go-libp2p/p2p/net/connmgr"
identify "github.com/libp2p/go-libp2p/p2p/protocol/identify"
noise "github.com/libp2p/go-libp2p/p2p/security/noise"
libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls"
libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic"
tcp "github.com/libp2p/go-libp2p/p2p/transport/tcp"
websocket "github.com/libp2p/go-libp2p/p2p/transport/websocket"
mafilter "github.com/libp2p/go-maddr-filter"
ma "github.com/multiformats/go-multiaddr"
memory "github.com/pbnjay/memory"
mamask "github.com/whyrusleeping/multiaddr-filter"
)

const dhtNamespace = "dht"

var _ = libp2pquic.NewTransport

func init() {
	// Cluster peers should advertise their public IPs as soon as they
	// learn about them. The default threshold is 4, which prevents
	// clusters with fewer than 4 peers from advertising an external
	// address they know of, so other peers cannot remember it as soon
	// as possible. This mostly affects dockerized setups. Lowering the
	// threshold may announce non-dialable NATed addresses too eagerly,
	// but those should progressively be cleaned up.
	identify.ActivationThresh = 1
}

// NewClusterHost creates a fully-featured libp2p Host with the options from
// the provided cluster configuration. Using that host, it creates pubsub and
// DHT instances (persisting to the given datastore) for shared use by all
// cluster components. The returned host uses the DHT for routing. Relay and
// NATService are additionally set up for this host.
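//
// A minimal usage sketch (assuming an already-loaded identity, cluster
// configuration and datastore; variable names are illustrative):
//
//	h, bwc, psub, dht, err := NewClusterHost(ctx, ident, cfg, store)
//	if err != nil {
//		return err
//	}
//	defer h.Close()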
func NewClusterHost(
	ctx context.Context,
	ident *config.Identity,
	cfg *Config,
	ds ds.Datastore,
) (host.Host, metrics.Reporter, *pubsub.PubSub, *dual.DHT, error) {
	// Set the default dial timeout for all libp2p connections. It is
	// not ideal to touch this global variable here, but the alternative
	// is to use a modified context everywhere, even when the user
	// supplies one.
	network.DialPeerTimeout = cfg.DialPeerTimeout
connman, err := connmgr.NewConnManager(cfg.ConnMgr.LowWater, cfg.ConnMgr.HighWater, connmgr.WithGracePeriod(cfg.ConnMgr.GracePeriod))
if err != nil {
return nil, nil, nil, nil, err
}
rmgr, err := makeResourceMgr(cfg.ResourceMgr.Enabled, cfg.ResourceMgr.MemoryLimitBytes, cfg.ResourceMgr.FileDescriptorsLimit, cfg.ConnMgr.HighWater)
if err != nil {
return nil, nil, nil, nil, err
}
var h host.Host
var idht *dual.DHT
// a channel to wait until these variables have been set
// (or left unset on errors). Mostly to avoid reading while writing.
hostAndDHTReady := make(chan struct{})
defer close(hostAndDHTReady)
hostGetter := func() host.Host {
<-hostAndDHTReady // closed when we finish NewClusterHost
return h
}
dhtGetter := func() *dual.DHT {
<-hostAndDHTReady // closed when we finish NewClusterHost
return idht
}
addrsFactory, err := makeAddrsFactory(cfg.AnnounceAddr, cfg.NoAnnounceAddr)
if err != nil {
return nil, nil, nil, nil, err
}
bwc := metrics.NewBandwidthCounter()
opts := []libp2p.Option{
libp2p.ListenAddrs(cfg.ListenAddr...),
libp2p.AddrsFactory(addrsFactory),
libp2p.NATPortMap(),
libp2p.ConnectionManager(connman),
libp2p.ResourceManager(rmgr),
libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
idht, err = newDHT(ctx, h, ds)
return idht, err
}),
libp2p.EnableNATService(),
libp2p.EnableRelay(),
libp2p.EnableAutoRelayWithPeerSource(newPeerSource(hostGetter, dhtGetter)),
libp2p.EnableHolePunching(),
libp2p.PrometheusRegisterer(observations.PromRegistry),
libp2p.BandwidthReporter(bwc),
}
if cfg.EnableRelayHop {
opts = append(opts, libp2p.EnableRelayService())
}
h, err = newHost(
ctx,
cfg.Secret,
ident.PrivateKey,
opts...,
)
if err != nil {
return nil, nil, nil, nil, err
}
psub, err := newPubSub(ctx, cfg, h)
if err != nil {
h.Close()
return nil, nil, nil, nil, err
}
return h, bwc, psub, idht, nil
}

// newHost creates a base cluster host without DHT, pubsub, relay or NAT
// services. It is mostly used for testing.
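//
// An illustrative test-style usage sketch (hypothetical; it generates an
// ad-hoc key and uses no private-network secret):
//
//	priv, _, _ := crypto.GenerateKeyPair(crypto.Ed25519, -1)
//	h, err := newHost(context.Background(), nil, priv)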
func newHost(ctx context.Context, psk corepnet.PSK, priv crypto.PrivKey, opts ...libp2p.Option) (host.Host, error) {
finalOpts := []libp2p.Option{
libp2p.Identity(priv),
}
finalOpts = append(finalOpts, baseOpts(psk)...)
finalOpts = append(finalOpts, opts...)
h, err := libp2p.New(
finalOpts...,
)
if err != nil {
return nil, err
}
return h, nil
}
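
// baseOpts returns the libp2p options shared by all cluster hosts: private
// network protection with the given pre-shared key, NAT service, Noise and
// TLS security, and TCP and WebSocket transports (QUIC is left out because
// it does not support private networks).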
func baseOpts(psk corepnet.PSK) []libp2p.Option {
return []libp2p.Option{
libp2p.PrivateNetwork(psk),
libp2p.EnableNATService(),
libp2p.Security(noise.ID, noise.New),
libp2p.Security(libp2ptls.ID, libp2ptls.New),
// TODO: quic does not support private networks
// libp2p.DefaultTransports,
libp2p.NoTransports,
libp2p.Transport(tcp.NewTCPTransport),
libp2p.Transport(websocket.New),
}
}
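
// newDHT creates a dual (LAN/WAN) DHT for the given host, registering the
// "pk" and "ipns" record validators and, when the provided datastore
// supports batching, persisting DHT records to it under the "dht" namespace.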
func newDHT(ctx context.Context, h host.Host, store ds.Datastore, extraopts ...dual.Option) (*dual.DHT, error) {
opts := []dual.Option{
dual.DHTOption(dht.NamespacedValidator("pk", record.PublicKeyValidator{})),
dual.DHTOption(dht.NamespacedValidator("ipns", ipns.Validator{KeyBook: h.Peerstore()})),
dual.DHTOption(dht.Concurrency(10)),
}
opts = append(opts, extraopts...)
if batchingDs, ok := store.(ds.Batching); ok {
dhtDatastore := namespace.Wrap(batchingDs, ds.NewKey(dhtNamespace))
opts = append(opts, dual.DHTOption(dht.Datastore(dhtDatastore)))
logger.Debug("enabling DHT record persistence to datastore")
}
return dual.New(ctx, h, opts...)
}

// hashMsgID reduces the size of pubsub message IDs to about 24 bytes.
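// It does so by concatenating the first 16 bytes of the sender's peer ID
// with the message sequence number (typically an 8-byte counter) instead of
// hashing the payload.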
func hashMsgID(m *pubsub_pb.Message) string {
//hash := blake2b.Sum256(m.Data)
id := string(m.GetFrom()[0:16]) + string(m.GetSeqno())
return id
}
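
// newPubSub creates the GossipSub instance shared by cluster components,
// tuned via the PubSub section of the configuration for potentially very
// large, bandwidth- and processing-constrained clusters.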
func newPubSub(ctx context.Context, cfg *Config, h host.Host) (*pubsub.PubSub, error) {
gossipParams := pubsub.DefaultGossipSubParams()
	// Let's configure pubsub in an attempt to support networks with
	// thousands of peers which may be bandwidth and processing
	// constrained. The heartbeat should be rather slow, as some peers
	// may simply not manage to handle all incoming publications, and a
	// slow heartbeat also means we do not have to deal with additional
	// IHAVEs too often.
	gossipParams.HeartbeatInterval = cfg.PubSub.HeartbeatInterval
	// We increase the gap between history gossip and history length to
	// give peers more time to grab messages, while expiring
	// announcements rather quickly.
	gossipParams.HistoryLength = cfg.PubSub.HistoryLength // x * heartbeat seconds of availability of messages for IWANTs (def 5)
	gossipParams.HistoryGossip = cfg.PubSub.HistoryGossip // x * heartbeat seconds of announcement of messages via IHAVEs (def 3)
	// Longer heartbeat intervals mean more messages queue up, so IHAVE
	// lists may carry more message IDs at a given moment; increase this
	// default accordingly.
	// Note: it may be better to reduce this to a minimum and reduce the
	// heartbeat interval, but then again this is a list of IDs, so
	// 10000 x 16 bytes x D is not too much to transfer out, is it?
	gossipParams.MaxIHaveLength = 10000 // 10000 message IDs. (def 5000)
	// The default mesh parameters are a bit low for our taste. We
	// already have connections to a bunch of peers thanks to the DHT
	// etc., and a tiny mesh only makes more gossip-based retransmission
	// necessary. Thus, allow increasing the defaults 4x.
	gossipParams.D = cfg.PubSub.DFactor * 6      // default 6
	gossipParams.Dlo = cfg.PubSub.DFactor * 5    // default 5
	gossipParams.Dhi = cfg.PubSub.DFactor * 12   // default 12
	gossipParams.Dscore = cfg.PubSub.DFactor * 4 // default 4
	gossipParams.Dout = cfg.PubSub.DFactor * 2   // default 2
	gossipParams.Dlazy = cfg.PubSub.DFactor * 6  // default 6
return pubsub.NewGossipSub(
ctx,
h,
pubsub.WithMessageSigning(true),
pubsub.WithStrictSignatureVerification(true),
pubsub.WithPeerExchange(!cfg.FollowerMode),
// FloodPublish is disabled as every peer publishes and we
// prefer messages to just follow the mesh rather than risking
// that small peers saturate themselves every time they
// publish a metric.
pubsub.WithFloodPublish(cfg.PubSub.FloodPublish),
// Custom hash function reduces size of messages and thus traffic.
pubsub.WithMessageIdFn(hashMsgID),
		pubsub.WithPeerOutboundQueueSize(128), // default is 32. Give more leeway to large clusters.
		pubsub.WithValidateQueueSize(128),     // default is also 32.
pubsub.WithGossipSubParams(gossipParams),
		// Keep a long cache of seen messages. Otherwise, if they
		// don't propagate within two minutes, they are re-requested
		// and re-broadcast. 1000 peers * 3 metrics * 1-minute
		// interval * 30 minutes = 90000 messages * 16 bytes ≈ 1.4MiB
		// of memory needed, tops.
		pubsub.WithSeenMessagesTTL(30*time.Minute), // default 120 seconds
// future work
//pubsub.WithDefaultValidator(
// pubsub.NewBasicSeqnoValidator(h.Peerstore())),
)
}

// Inspired by Kubo's
// https://github.com/ipfs/go-ipfs/blob/9327ee64ce96ca6da29bb2a099e0e0930b0d9e09/core/node/libp2p/relay.go#L79-L103
// and https://github.com/ipfs/go-ipfs/blob/9327ee64ce96ca6da29bb2a099e0e0930b0d9e09/core/node/libp2p/routing.go#L242-L317
// but simplified and adapted:
//   - Every time we need peers for relays we do a DHT lookup.
//   - We return the peers from that lookup.
//   - There is no need to do it asynchronously, since we have to wait for
//     the full lookup to return anyway. We put them on a buffered channel
//     and are done.
func newPeerSource(hostGetter func() host.Host, dhtGetter func() *dual.DHT) autorelay.PeerSource {
return func(ctx context.Context, numPeers int) <-chan peer.AddrInfo {
		// Make a channel to return, put up to numPeers items on it, and
		// then close it.
r := make(chan peer.AddrInfo, numPeers)
defer close(r)
// Because the Host, DHT are initialized after relay, we need to
// obtain them indirectly this way.
h := hostGetter()
if h == nil { // context canceled etc.
return r
}
idht := dhtGetter()
if idht == nil { // context canceled etc.
return r
}
// length of closest peers is K.
closestPeers, err := idht.WAN.GetClosestPeers(ctx, h.ID().String())
if err != nil { // Bail out. Usually a "no peers found".
return r
}
		logger.Debugf("peerSource: %d closestPeers for %d requested", len(closestPeers), numPeers)
for _, p := range closestPeers {
addrs := h.Peerstore().Addrs(p)
if len(addrs) == 0 {
continue
}
dhtPeer := peer.AddrInfo{ID: p, Addrs: addrs}
// Attempt to put peers on r if we have space,
// otherwise return (we reached numPeers)
select {
case r <- dhtPeer:
case <-ctx.Done():
return r
default:
return r
}
}
		// We are here if numPeers > len(closestPeers).
return r
}
}

// EncodeProtectorKey converts a byte slice to its hex string representation.
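//
// For example, EncodeProtectorKey([]byte{0xde, 0xad}) returns "dead", so a
// 32-byte cluster secret becomes a 64-character hex string.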
func EncodeProtectorKey(secretBytes []byte) string {
return hex.EncodeToString(secretBytes)
}
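
// makeAddrsFactory builds the address factory used by the libp2p host: if
// announce addresses are configured they replace the detected ones, and any
// address matching the no-announce list (exact multiaddrs or /ipcidr masks)
// is filtered out of what the host advertises.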
func makeAddrsFactory(announce []ma.Multiaddr, noAnnounce []ma.Multiaddr) (p2pbhost.AddrsFactory, error) {
filters := mafilter.NewFilters()
noAnnAddrs := map[string]bool{}
for _, addr := range multiAddrstoStrings(noAnnounce) {
f, err := mamask.NewMask(addr)
if err == nil {
filters.AddFilter(*f, mafilter.ActionDeny)
continue
}
maddr, err := ma.NewMultiaddr(addr)
if err != nil {
return nil, err
}
noAnnAddrs[string(maddr.Bytes())] = true
}
return func(allAddrs []ma.Multiaddr) []ma.Multiaddr {
var addrs []ma.Multiaddr
if len(announce) > 0 {
addrs = announce
} else {
addrs = allAddrs
}
var out []ma.Multiaddr
for _, maddr := range addrs {
// check for exact matches
ok := noAnnAddrs[string(maddr.Bytes())]
// check for /ipcidr matches
if !ok && !filters.AddrBlocked(maddr) {
out = append(out, maddr)
}
}
return out
}, nil
}

// makeResourceMgr is mostly copy/pasted from
// https://github.com/ipfs/rainbow/blob/main/rcmgr.go, which is itself
// copy-pasted from Kubo, because libp2p does not have a sane way of doing
// this.
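//
// NewClusterHost invokes it roughly as:
//
//	rmgr, err := makeResourceMgr(cfg.ResourceMgr.Enabled,
//		cfg.ResourceMgr.MemoryLimitBytes,
//		cfg.ResourceMgr.FileDescriptorsLimit,
//		cfg.ConnMgr.HighWater)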
func makeResourceMgr(enabled bool, maxMemory, maxFD uint64, connMgrHighWater int) (network.ResourceManager, error) {
if !enabled {
rmgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(rcmgr.InfiniteLimits))
logger.Infof("go-libp2p Resource Manager DISABLED")
return rmgr, err
}
	// Auto-scaled limits based on available memory/fds.
	if maxMemory == 0 {
		maxMemory = uint64((float64(memory.TotalMemory()) * 0.25))
		if maxMemory < 1<<30 { // 1 GiB
			maxMemory = 1 << 30
		}
	}
if maxFD == 0 {
maxFD = fd.GetNumFDs() / 2
}
infiniteResourceLimits := rcmgr.InfiniteLimits.ToPartialLimitConfig().System
maxMemoryMB := maxMemory / (1024 * 1024)
	// At least as of 2023-01-25, it's possible to open a connection that
	// doesn't ask for any memory usage with the libp2p Resource Manager/Accountant
	// (see https://github.com/libp2p/go-libp2p/issues/2010#issuecomment-1404280736).
	// As a result, we can't currently rely on Memory limits to fully protect us.
	// Until https://github.com/libp2p/go-libp2p/issues/2010 is addressed,
	// we use the proxy of restricting to 1 inbound connection per MB.
	// Note: this is more generous than go-libp2p's default autoscaled limits,
	// which allow 64 connections per 1GB
	// (see https://github.com/libp2p/go-libp2p/blob/master/p2p/host/resource-manager/limit_defaults.go#L357 ).
systemConnsInbound := int(1 * maxMemoryMB)
partialLimits := rcmgr.PartialLimitConfig{
System: rcmgr.ResourceLimits{
Memory: rcmgr.LimitVal64(maxMemory),
FD: rcmgr.LimitVal(maxFD),
Conns: rcmgr.Unlimited,
ConnsInbound: rcmgr.LimitVal(systemConnsInbound),
ConnsOutbound: rcmgr.Unlimited,
Streams: rcmgr.Unlimited,
StreamsOutbound: rcmgr.Unlimited,
StreamsInbound: rcmgr.Unlimited,
},
		// Transient connections won't cause any memory to be accounted for by the resource manager/accountant.
		// Only established connections do.
		// As a result, we can't rely on System.Memory to protect us from a bunch of transient connections being opened.
		// We limit the same resources as the System scope, but only allow the Transient scope to take 25% of what is allowed for the System scope.
Transient: rcmgr.ResourceLimits{
Memory: rcmgr.LimitVal64(maxMemory / 4),
FD: rcmgr.LimitVal(maxFD / 4),
Conns: rcmgr.Unlimited,
ConnsInbound: rcmgr.LimitVal(systemConnsInbound / 4),
ConnsOutbound: rcmgr.Unlimited,
Streams: rcmgr.Unlimited,
StreamsInbound: rcmgr.Unlimited,
StreamsOutbound: rcmgr.Unlimited,
},
		// Let's get out of the way of the allowlist functionality.
		// If someone specified "Swarm.ResourceMgr.Allowlist" we should let it go through.
AllowlistedSystem: infiniteResourceLimits,
AllowlistedTransient: infiniteResourceLimits,
// Keep it simple by not having Service, ServicePeer, Protocol, ProtocolPeer, Conn, or Stream limits.
ServiceDefault: infiniteResourceLimits,
ServicePeerDefault: infiniteResourceLimits,
ProtocolDefault: infiniteResourceLimits,
ProtocolPeerDefault: infiniteResourceLimits,
Conn: infiniteResourceLimits,
Stream: infiniteResourceLimits,
		// Limit the resources consumed by a peer.
		// This doesn't protect us against intentional DoS attacks since an attacker can easily spin up multiple peers.
		// We specify this limit against unintentional DoS attacks (e.g., a peer that has a bug and is unintentionally sending too much traffic).
		// In that case we want to keep that peer's resource consumption contained.
		// To keep this simple, we only constrain inbound connections and streams.
PeerDefault: rcmgr.ResourceLimits{
Memory: rcmgr.Unlimited64,
FD: rcmgr.Unlimited,
Conns: rcmgr.Unlimited,
ConnsInbound: rcmgr.DefaultLimit,
ConnsOutbound: rcmgr.Unlimited,
Streams: rcmgr.Unlimited,
StreamsInbound: rcmgr.DefaultLimit,
StreamsOutbound: rcmgr.Unlimited,
},
}
scalingLimitConfig := rcmgr.DefaultLimits
libp2p.SetDefaultServiceLimits(&scalingLimitConfig)
// Anything set above in partialLimits that had a value of rcmgr.DefaultLimit will be overridden.
// Anything in scalingLimitConfig that wasn't defined in partialLimits above will be added (e.g., libp2p's default service limits).
partialLimits = partialLimits.Build(scalingLimitConfig.Scale(int64(maxMemory), int(maxFD))).ToPartialLimitConfig()
// Simple checks to override autoscaling ensuring limits make sense versus the connmgr values.
// There are ways to break this, but this should catch most problems already.
// We might improve this in the future.
// See: https://github.com/ipfs/kubo/issues/9545
if partialLimits.System.ConnsInbound > rcmgr.DefaultLimit {
maxInboundConns := int(partialLimits.System.ConnsInbound)
if connmgrHighWaterTimesTwo := connMgrHighWater * 2; maxInboundConns < connmgrHighWaterTimesTwo {
maxInboundConns = connmgrHighWaterTimesTwo
}
if maxInboundConns < 800 {
maxInboundConns = 800
}
// Scale System.StreamsInbound as well, but use the existing ratio of StreamsInbound to ConnsInbound
if partialLimits.System.StreamsInbound > rcmgr.DefaultLimit {
partialLimits.System.StreamsInbound = rcmgr.LimitVal(int64(maxInboundConns) * int64(partialLimits.System.StreamsInbound) / int64(partialLimits.System.ConnsInbound))
}
partialLimits.System.ConnsInbound = rcmgr.LimitVal(maxInboundConns)
}
logger.Infof("go-libp2p Resource Manager ENABLED: Limits based on max_memory: %s, max_fds: %d", humanize.Bytes(maxMemory), maxFD)
// We already have a complete value thus pass in an empty ConcreteLimitConfig.
limitCfg := partialLimits.Build(rcmgr.ConcreteLimitConfig{})
mgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(limitCfg))
if err != nil {
return nil, err
}
return mgr, nil
}