2302 lines
61 KiB
Go
2302 lines
61 KiB
Go
package ipfscluster
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"mime/multipart"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/coreos/go-systemd/v22/daemon"
|
|
"github.com/ipfs-cluster/ipfs-cluster/adder"
|
|
"github.com/ipfs-cluster/ipfs-cluster/adder/sharding"
|
|
"github.com/ipfs-cluster/ipfs-cluster/adder/single"
|
|
"github.com/ipfs-cluster/ipfs-cluster/api"
|
|
"github.com/ipfs-cluster/ipfs-cluster/pstoremgr"
|
|
"github.com/ipfs-cluster/ipfs-cluster/rpcutil"
|
|
"github.com/ipfs-cluster/ipfs-cluster/state"
|
|
"github.com/ipfs-cluster/ipfs-cluster/version"
|
|
"go.uber.org/multierr"
|
|
|
|
ds "github.com/ipfs/go-datastore"
|
|
host "github.com/libp2p/go-libp2p-core/host"
|
|
peer "github.com/libp2p/go-libp2p-core/peer"
|
|
peerstore "github.com/libp2p/go-libp2p-core/peerstore"
|
|
rpc "github.com/libp2p/go-libp2p-gorpc"
|
|
dual "github.com/libp2p/go-libp2p-kad-dht/dual"
|
|
mdns "github.com/libp2p/go-libp2p/p2p/discovery/mdns"
|
|
ma "github.com/multiformats/go-multiaddr"
|
|
|
|
ocgorpc "github.com/lanzafame/go-libp2p-ocgorpc"
|
|
trace "go.opencensus.io/trace"
|
|
)
|
|
|
|
// ReadyTimeout specifies the time before giving up
|
|
// during startup (waiting for consensus to be ready)
|
|
// It may need adjustment according to timeouts in the
|
|
// consensus layer.
|
|
var ReadyTimeout = 30 * time.Second
|
|
|
|
const (
|
|
pingMetricName = "ping"
|
|
bootstrapCount = 3
|
|
reBootstrapInterval = 30 * time.Second
|
|
mdnsServiceTag = "_ipfs-cluster-discovery._udp"
|
|
maxAlerts = 1000
|
|
)
|
|
|
|
var errFollowerMode = errors.New("this peer is configured to be in follower mode. Write operations are disabled")
|
|
|
|
// Cluster is the main IPFS cluster component. It provides
|
|
// the go-API for it and orchestrates the components that make up the system.
|
|
type Cluster struct {
|
|
ctx context.Context
|
|
cancel func()
|
|
|
|
id peer.ID
|
|
config *Config
|
|
host host.Host
|
|
dht *dual.DHT
|
|
discovery mdns.Service
|
|
datastore ds.Datastore
|
|
|
|
rpcServer *rpc.Server
|
|
rpcClient *rpc.Client
|
|
peerManager *pstoremgr.Manager
|
|
|
|
consensus Consensus
|
|
apis []API
|
|
ipfs IPFSConnector
|
|
tracker PinTracker
|
|
monitor PeerMonitor
|
|
allocator PinAllocator
|
|
informers []Informer
|
|
tracer Tracer
|
|
|
|
alerts []api.Alert
|
|
alertsMux sync.Mutex
|
|
|
|
doneCh chan struct{}
|
|
readyCh chan struct{}
|
|
readyB bool
|
|
wg sync.WaitGroup
|
|
|
|
// peerAdd
|
|
paMux sync.Mutex
|
|
|
|
// shutdown function and related variables
|
|
shutdownLock sync.Mutex
|
|
shutdownB bool
|
|
removed bool
|
|
|
|
curPingVal pingValue
|
|
}
|
|
|
|
// NewCluster builds a new IPFS Cluster peer. It initializes a LibP2P host,
|
|
// creates and RPC Server and client and sets up all components.
|
|
//
|
|
// The new cluster peer may still be performing initialization tasks when
|
|
// this call returns (consensus may still be bootstrapping). Use Cluster.Ready()
|
|
// if you need to wait until the peer is fully up.
|
|
func NewCluster(
|
|
ctx context.Context,
|
|
host host.Host,
|
|
dht *dual.DHT,
|
|
cfg *Config,
|
|
datastore ds.Datastore,
|
|
consensus Consensus,
|
|
apis []API,
|
|
ipfs IPFSConnector,
|
|
tracker PinTracker,
|
|
monitor PeerMonitor,
|
|
allocator PinAllocator,
|
|
informers []Informer,
|
|
tracer Tracer,
|
|
) (*Cluster, error) {
|
|
err := cfg.Validate()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if host == nil {
|
|
return nil, errors.New("cluster host is nil")
|
|
}
|
|
|
|
if len(informers) == 0 {
|
|
return nil, errors.New("no informers are passed")
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
|
|
listenAddrs := ""
|
|
for _, addr := range host.Addrs() {
|
|
listenAddrs += fmt.Sprintf(" %s/p2p/%s\n", addr, host.ID().Pretty())
|
|
}
|
|
|
|
logger.Infof("IPFS Cluster v%s listening on:\n%s\n", version.Version, listenAddrs)
|
|
|
|
peerManager := pstoremgr.New(ctx, host, cfg.GetPeerstorePath())
|
|
|
|
var mdnsSvc mdns.Service
|
|
if cfg.MDNSInterval > 0 {
|
|
mdnsSvc = mdns.NewMdnsService(host, mdnsServiceTag, peerManager)
|
|
err = mdnsSvc.Start()
|
|
if err != nil {
|
|
logger.Warnf("mDNS could not be started: %s", err)
|
|
}
|
|
}
|
|
|
|
c := &Cluster{
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
id: host.ID(),
|
|
config: cfg,
|
|
host: host,
|
|
dht: dht,
|
|
discovery: mdnsSvc,
|
|
datastore: datastore,
|
|
consensus: consensus,
|
|
apis: apis,
|
|
ipfs: ipfs,
|
|
tracker: tracker,
|
|
monitor: monitor,
|
|
allocator: allocator,
|
|
informers: informers,
|
|
tracer: tracer,
|
|
alerts: []api.Alert{},
|
|
peerManager: peerManager,
|
|
shutdownB: false,
|
|
removed: false,
|
|
doneCh: make(chan struct{}),
|
|
readyCh: make(chan struct{}),
|
|
readyB: false,
|
|
}
|
|
|
|
// Import known cluster peers from peerstore file and config. Set
|
|
// a non permanent TTL.
|
|
c.peerManager.ImportPeersFromPeerstore(false, peerstore.AddressTTL)
|
|
c.peerManager.ImportPeers(c.config.PeerAddresses, false, peerstore.AddressTTL)
|
|
// Attempt to connect to some peers (up to bootstrapCount)
|
|
connectedPeers := c.peerManager.Bootstrap(bootstrapCount)
|
|
// We cannot warn when count is low as this as this is normal if going
|
|
// to Join() later.
|
|
logger.Debugf("bootstrap count %d", len(connectedPeers))
|
|
// Log a ping metric for every connected peer. This will make them
|
|
// visible as peers without having to wait for them to send one.
|
|
for _, p := range connectedPeers {
|
|
if err := c.logPingMetric(ctx, p); err != nil {
|
|
logger.Warn(err)
|
|
}
|
|
}
|
|
|
|
// After setupRPC components can do their tasks with a fully operative
|
|
// routed libp2p host with some connections and a working DHT (hopefully).
|
|
err = c.setupRPC()
|
|
if err != nil {
|
|
c.Shutdown(ctx)
|
|
return nil, err
|
|
}
|
|
c.setupRPCClients()
|
|
|
|
// Note: It is very important to first call Add() once in a non-racy
|
|
// place
|
|
c.wg.Add(1)
|
|
go func() {
|
|
defer c.wg.Done()
|
|
c.ready(ReadyTimeout)
|
|
c.run()
|
|
}()
|
|
|
|
return c, nil
|
|
}
|
|
|
|
func (c *Cluster) setupRPC() error {
|
|
rpcServer, err := newRPCServer(c)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
c.rpcServer = rpcServer
|
|
|
|
var rpcClient *rpc.Client
|
|
if c.config.Tracing {
|
|
csh := &ocgorpc.ClientHandler{}
|
|
rpcClient = rpc.NewClientWithServer(
|
|
c.host,
|
|
version.RPCProtocol,
|
|
rpcServer,
|
|
rpc.WithClientStatsHandler(csh),
|
|
)
|
|
} else {
|
|
rpcClient = rpc.NewClientWithServer(c.host, version.RPCProtocol, rpcServer)
|
|
}
|
|
c.rpcClient = rpcClient
|
|
return nil
|
|
}
|
|
|
|
func (c *Cluster) setupRPCClients() {
|
|
c.ipfs.SetClient(c.rpcClient)
|
|
c.tracker.SetClient(c.rpcClient)
|
|
for _, api := range c.apis {
|
|
api.SetClient(c.rpcClient)
|
|
}
|
|
c.consensus.SetClient(c.rpcClient)
|
|
c.monitor.SetClient(c.rpcClient)
|
|
c.allocator.SetClient(c.rpcClient)
|
|
for _, informer := range c.informers {
|
|
informer.SetClient(c.rpcClient)
|
|
}
|
|
}
|
|
|
|
// watchPinset triggers recurrent operations that loop on the pinset.
|
|
func (c *Cluster) watchPinset() {
|
|
ctx, span := trace.StartSpan(c.ctx, "cluster/watchPinset")
|
|
defer span.End()
|
|
|
|
stateSyncTimer := time.NewTimer(c.config.StateSyncInterval)
|
|
|
|
// Upon start, every item in the state that is not pinned will appear
|
|
// as PinError when doing a Status, we should proceed to recover
|
|
// (try pinning) all of those right away.
|
|
recoverTimer := time.NewTimer(0) // 0 so that it does an initial recover right away
|
|
|
|
// This prevents doing an StateSync while doing a RecoverAllLocal,
|
|
// which is intended behavior as for very large pinsets
|
|
for {
|
|
select {
|
|
case <-stateSyncTimer.C:
|
|
logger.Debug("auto-triggering StateSync()")
|
|
c.StateSync(ctx)
|
|
stateSyncTimer.Reset(c.config.StateSyncInterval)
|
|
case <-recoverTimer.C:
|
|
logger.Debug("auto-triggering RecoverAllLocal()")
|
|
|
|
out := make(chan api.PinInfo, 1024)
|
|
go func() {
|
|
for range out {
|
|
}
|
|
}()
|
|
err := c.RecoverAllLocal(ctx, out)
|
|
if err != nil {
|
|
logger.Error(err)
|
|
}
|
|
recoverTimer.Reset(c.config.PinRecoverInterval)
|
|
case <-c.ctx.Done():
|
|
if !stateSyncTimer.Stop() {
|
|
<-stateSyncTimer.C
|
|
}
|
|
if !recoverTimer.Stop() {
|
|
<-recoverTimer.C
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// returns the smallest ttl from the metrics pushed by the informer.
|
|
func (c *Cluster) sendInformerMetrics(ctx context.Context, informer Informer) (time.Duration, error) {
|
|
ctx, span := trace.StartSpan(ctx, "cluster/sendInformerMetric")
|
|
defer span.End()
|
|
|
|
var minTTL time.Duration
|
|
var errors error
|
|
metrics := informer.GetMetrics(ctx)
|
|
if len(metrics) == 0 {
|
|
logger.Errorf("informer %s produced no metrics", informer.Name())
|
|
return minTTL, nil
|
|
}
|
|
|
|
for _, metric := range metrics {
|
|
if metric.Discard() { // do not publish invalid metrics
|
|
// the tags informer creates an invalid metric
|
|
// when no tags are defined.
|
|
continue
|
|
}
|
|
metric.Peer = c.id
|
|
ttl := metric.GetTTL()
|
|
if ttl > 0 && (ttl < minTTL || minTTL == 0) {
|
|
minTTL = ttl
|
|
}
|
|
err := c.monitor.PublishMetric(ctx, metric)
|
|
|
|
if multierr.AppendInto(&errors, err) {
|
|
logger.Warnf("error sending metric %s: %s", metric.Name, err)
|
|
}
|
|
}
|
|
return minTTL, errors
|
|
}
|
|
|
|
func (c *Cluster) sendInformersMetrics(ctx context.Context) error {
|
|
ctx, span := trace.StartSpan(ctx, "cluster/sendInformersMetrics")
|
|
defer span.End()
|
|
|
|
var errors error
|
|
for _, informer := range c.informers {
|
|
_, err := c.sendInformerMetrics(ctx, informer)
|
|
if multierr.AppendInto(&errors, err) {
|
|
logger.Warnf("informer %s did not send all metrics", informer.Name())
|
|
}
|
|
}
|
|
return errors
|
|
}
|
|
|
|
// pushInformerMetrics loops and publishes informers metrics using the
|
|
// cluster monitor. Metrics are pushed normally at a TTL/2 rate. If an error
|
|
// occurs, they are pushed at a TTL/4 rate.
|
|
func (c *Cluster) pushInformerMetrics(ctx context.Context, informer Informer) {
|
|
ctx, span := trace.StartSpan(ctx, "cluster/pushInformerMetrics")
|
|
defer span.End()
|
|
|
|
timer := time.NewTimer(0) // fire immediately first
|
|
|
|
// retries counts how many retries we have made
|
|
retries := 0
|
|
// retryWarnMod controls how often do we log
|
|
// "error broadcasting metric".
|
|
// It will do it in the first error, and then on every
|
|
// 10th.
|
|
retryWarnMod := 10
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-timer.C:
|
|
// wait
|
|
}
|
|
|
|
minTTL, err := c.sendInformerMetrics(ctx, informer)
|
|
if minTTL == 0 {
|
|
minTTL = 30 * time.Second
|
|
}
|
|
if err != nil {
|
|
if (retries % retryWarnMod) == 0 {
|
|
logger.Errorf("error broadcasting metric: %s", err)
|
|
retries++
|
|
}
|
|
// retry sooner
|
|
timer.Reset(minTTL / 4)
|
|
continue
|
|
}
|
|
|
|
retries = 0
|
|
// send metric again in TTL/2
|
|
timer.Reset(minTTL / 2)
|
|
}
|
|
}
|
|
|
|
func (c *Cluster) sendPingMetric(ctx context.Context) (api.Metric, error) {
|
|
ctx, span := trace.StartSpan(ctx, "cluster/sendPingMetric")
|
|
defer span.End()
|
|
|
|
id := c.ID(ctx)
|
|
newPingVal := pingValue{
|
|
Peername: id.Peername,
|
|
IPFSID: id.IPFS.ID,
|
|
IPFSAddresses: publicIPFSAddresses(id.IPFS.Addresses),
|
|
}
|
|
if c.curPingVal.Valid() &&
|
|
!newPingVal.Valid() { // i.e. ipfs down
|
|
newPingVal = c.curPingVal // use last good value
|
|
}
|
|
c.curPingVal = newPingVal
|
|
|
|
v, err := json.Marshal(newPingVal)
|
|
if err != nil {
|
|
logger.Error(err)
|
|
// continue anyways
|
|
}
|
|
|
|
metric := api.Metric{
|
|
Name: pingMetricName,
|
|
Peer: c.id,
|
|
Valid: true,
|
|
Value: string(v),
|
|
}
|
|
metric.SetTTL(c.config.MonitorPingInterval * 2)
|
|
return metric, c.monitor.PublishMetric(ctx, metric)
|
|
}
|
|
|
|
// logPingMetric logs a ping metric as if it had been sent from PID. It is
|
|
// used to make peers appear available as soon as we connect to them (without
|
|
// having to wait for them to broadcast a metric).
|
|
//
|
|
// We avoid specifically sending a metric to a peer when we "connect" to it
|
|
// because: a) this requires an extra. OPEN RPC endpoint (LogMetric) that can
|
|
// be called by everyone b) We have no way of verifying that the peer ID in a
|
|
// metric pushed is actually the issuer of the metric (something the regular
|
|
// "pubsub" way of pushing metrics allows (by verifying the signature on the
|
|
// message). Thus, this reduces chances of abuse until we have something
|
|
// better.
|
|
func (c *Cluster) logPingMetric(ctx context.Context, pid peer.ID) error {
|
|
m := api.Metric{
|
|
Name: pingMetricName,
|
|
Peer: pid,
|
|
Valid: true,
|
|
}
|
|
m.SetTTL(c.config.MonitorPingInterval * 2)
|
|
return c.monitor.LogMetric(ctx, m)
|
|
}
|
|
|
|
func (c *Cluster) pushPingMetrics(ctx context.Context) {
|
|
ctx, span := trace.StartSpan(ctx, "cluster/pushPingMetrics")
|
|
defer span.End()
|
|
|
|
ticker := time.NewTicker(c.config.MonitorPingInterval)
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
c.sendPingMetric(ctx)
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
}
|
|
}
|
|
}
|
|
|
|
// Alerts returns the last alerts recorded by this cluster peer with the most
|
|
// recent first.
|
|
func (c *Cluster) Alerts() []api.Alert {
|
|
c.alertsMux.Lock()
|
|
alerts := make([]api.Alert, len(c.alerts))
|
|
{
|
|
total := len(alerts)
|
|
for i, a := range c.alerts {
|
|
alerts[total-1-i] = a
|
|
}
|
|
}
|
|
c.alertsMux.Unlock()
|
|
|
|
return alerts
|
|
}
|
|
|
|
// read the alerts channel from the monitor and triggers repins
|
|
func (c *Cluster) alertsHandler() {
|
|
for {
|
|
select {
|
|
case <-c.ctx.Done():
|
|
return
|
|
case alrt := <-c.monitor.Alerts():
|
|
// Follower peers do not care about alerts.
|
|
// They can do nothing about them.
|
|
if c.config.FollowerMode {
|
|
continue
|
|
}
|
|
|
|
logger.Warnf("metric alert for %s: Peer: %s.", alrt.Name, alrt.Peer)
|
|
c.alertsMux.Lock()
|
|
{
|
|
if len(c.alerts) > maxAlerts {
|
|
c.alerts = c.alerts[:0]
|
|
}
|
|
|
|
c.alerts = append(c.alerts, alrt)
|
|
}
|
|
c.alertsMux.Unlock()
|
|
|
|
if alrt.Name != pingMetricName {
|
|
continue // only handle ping alerts
|
|
}
|
|
|
|
if c.config.DisableRepinning {
|
|
logger.Debugf("repinning is disabled. Will not re-allocate pins on alerts")
|
|
return
|
|
}
|
|
|
|
cState, err := c.consensus.State(c.ctx)
|
|
if err != nil {
|
|
logger.Warn(err)
|
|
return
|
|
}
|
|
|
|
distance, err := c.distances(c.ctx, alrt.Peer)
|
|
if err != nil {
|
|
logger.Warn(err)
|
|
return
|
|
}
|
|
|
|
pinCh := make(chan api.Pin, 1024)
|
|
go func() {
|
|
err = cState.List(c.ctx, pinCh)
|
|
if err != nil {
|
|
logger.Warn(err)
|
|
}
|
|
}()
|
|
|
|
for pin := range pinCh {
|
|
if containsPeer(pin.Allocations, alrt.Peer) && distance.isClosest(pin.Cid) {
|
|
c.repinFromPeer(c.ctx, alrt.Peer, pin)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// detects any changes in the peerset and saves the configuration. When it
|
|
// detects that we have been removed from the peerset, it shuts down this peer.
|
|
func (c *Cluster) watchPeers() {
|
|
ticker := time.NewTicker(c.config.PeerWatchInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-c.ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
select {
|
|
case <-c.ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
//logger.Debugf("%s watching peers", c.id)
|
|
hasMe := false
|
|
peers, err := c.consensus.Peers(c.ctx)
|
|
if err != nil {
|
|
logger.Error(err)
|
|
continue
|
|
}
|
|
for _, p := range peers {
|
|
if p == c.id {
|
|
hasMe = true
|
|
break
|
|
}
|
|
}
|
|
|
|
if !hasMe {
|
|
c.shutdownLock.Lock()
|
|
defer c.shutdownLock.Unlock()
|
|
logger.Info("peer no longer in peerset. Initiating shutdown")
|
|
c.removed = true
|
|
go c.Shutdown(c.ctx)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// reBootstrap regularly attempts to bootstrap (re-connect to peers from the
|
|
// peerstore). This should ensure that we auto-recover from situations in
|
|
// which the network was completely gone and we lost all peers.
|
|
func (c *Cluster) reBootstrap() {
|
|
ticker := time.NewTicker(reBootstrapInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-c.ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
connected := c.peerManager.Bootstrap(bootstrapCount)
|
|
for _, p := range connected {
|
|
logger.Infof("reconnected to %s", p)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// find all Cids pinned to a given peer and triggers re-pins on them.
|
|
func (c *Cluster) vacatePeer(ctx context.Context, p peer.ID) {
|
|
ctx, span := trace.StartSpan(ctx, "cluster/vacatePeer")
|
|
defer span.End()
|
|
|
|
if c.config.DisableRepinning {
|
|
logger.Warnf("repinning is disabled. Will not re-allocate cids from %s", p.Pretty())
|
|
return
|
|
}
|
|
|
|
cState, err := c.consensus.State(ctx)
|
|
if err != nil {
|
|
logger.Warn(err)
|
|
return
|
|
}
|
|
|
|
pinCh := make(chan api.Pin, 1024)
|
|
go func() {
|
|
err = cState.List(ctx, pinCh)
|
|
if err != nil {
|
|
logger.Warn(err)
|
|
}
|
|
}()
|
|
|
|
for pin := range pinCh {
|
|
if containsPeer(pin.Allocations, p) {
|
|
c.repinFromPeer(ctx, p, pin)
|
|
}
|
|
}
|
|
}
|
|
|
|
// repinFromPeer triggers a repin on a given pin object blacklisting one of the
|
|
// allocations.
|
|
func (c *Cluster) repinFromPeer(ctx context.Context, p peer.ID, pin api.Pin) {
|
|
ctx, span := trace.StartSpan(ctx, "cluster/repinFromPeer")
|
|
defer span.End()
|
|
|
|
pin.Allocations = nil // force re-allocations
|
|
// note that pin() should not result in different allocations
|
|
// if we are not under the replication-factor min.
|
|
_, ok, err := c.pin(ctx, pin, []peer.ID{p})
|
|
if ok && err == nil {
|
|
logger.Infof("repinned %s out of %s", pin.Cid, p.Pretty())
|
|
}
|
|
}
|
|
|
|
// run launches some go-routines which live throughout the cluster's life
|
|
func (c *Cluster) run() {
|
|
c.wg.Add(1)
|
|
go func() {
|
|
defer c.wg.Done()
|
|
c.watchPinset()
|
|
}()
|
|
|
|
c.wg.Add(1)
|
|
go func() {
|
|
defer c.wg.Done()
|
|
c.pushPingMetrics(c.ctx)
|
|
}()
|
|
|
|
c.wg.Add(len(c.informers))
|
|
for _, informer := range c.informers {
|
|
go func(inf Informer) {
|
|
defer c.wg.Done()
|
|
c.pushInformerMetrics(c.ctx, inf)
|
|
}(informer)
|
|
}
|
|
|
|
c.wg.Add(1)
|
|
go func() {
|
|
defer c.wg.Done()
|
|
c.watchPeers()
|
|
}()
|
|
|
|
c.wg.Add(1)
|
|
go func() {
|
|
defer c.wg.Done()
|
|
c.alertsHandler()
|
|
}()
|
|
|
|
c.wg.Add(1)
|
|
go func() {
|
|
defer c.wg.Done()
|
|
c.reBootstrap()
|
|
}()
|
|
}
|
|
|
|
func (c *Cluster) ready(timeout time.Duration) {
|
|
ctx, span := trace.StartSpan(c.ctx, "cluster/ready")
|
|
defer span.End()
|
|
|
|
// We bootstrapped first because with dirty state consensus
|
|
// may have a peerset and not find a leader so we cannot wait
|
|
// for it.
|
|
timer := time.NewTimer(timeout)
|
|
select {
|
|
case <-timer.C:
|
|
logger.Error("***** ipfs-cluster consensus start timed out (tips below) *****")
|
|
logger.Error(`
|
|
**************************************************
|
|
This peer was not able to become part of the cluster.
|
|
This might be due to one or several causes:
|
|
- Check the logs above this message for errors
|
|
- Check that there is connectivity to the "peers" multiaddresses
|
|
- Check that all cluster peers are using the same "secret"
|
|
- Check that this peer is reachable on its "listen_multiaddress" by all peers
|
|
- Check that the current cluster is healthy (has a leader). Otherwise make
|
|
sure to start enough peers so that a leader election can happen.
|
|
- Check that the peer(s) you are trying to connect to is running the
|
|
same version of IPFS-cluster.
|
|
**************************************************
|
|
`)
|
|
c.Shutdown(ctx)
|
|
return
|
|
case <-c.consensus.Ready(ctx):
|
|
// Consensus ready means the state is up to date.
|
|
case <-c.ctx.Done():
|
|
return
|
|
}
|
|
|
|
// Cluster is ready.
|
|
|
|
peers, err := c.consensus.Peers(ctx)
|
|
if err != nil {
|
|
logger.Error(err)
|
|
c.Shutdown(ctx)
|
|
return
|
|
}
|
|
|
|
logger.Info("Cluster Peers (without including ourselves):")
|
|
if len(peers) == 1 {
|
|
logger.Info(" - No other peers")
|
|
}
|
|
|
|
for _, p := range peers {
|
|
if p != c.id {
|
|
logger.Infof(" - %s", p.Pretty())
|
|
}
|
|
}
|
|
|
|
close(c.readyCh)
|
|
c.shutdownLock.Lock()
|
|
c.readyB = true
|
|
c.shutdownLock.Unlock()
|
|
logger.Info("** IPFS Cluster is READY **")
|
|
daemon.SdNotify(false, daemon.SdNotifyReady)
|
|
}
|
|
|
|
// Ready returns a channel which signals when this peer is
|
|
// fully initialized (including consensus).
|
|
func (c *Cluster) Ready() <-chan struct{} {
|
|
return c.readyCh
|
|
}
|
|
|
|
// Shutdown performs all the necessary operations to shutdown
|
|
// the IPFS Cluster peer:
|
|
// * Save peerstore with the current peers
|
|
// * Remove itself from consensus when LeaveOnShutdown is set
|
|
// * It Shutdowns all the components
|
|
// * Collects all goroutines
|
|
//
|
|
// Shutdown does not close the libp2p host, the DHT, the datastore or
|
|
// generally anything that Cluster did not create.
|
|
func (c *Cluster) Shutdown(ctx context.Context) error {
|
|
_, span := trace.StartSpan(ctx, "cluster/Shutdown")
|
|
defer span.End()
|
|
ctx = trace.NewContext(c.ctx, span)
|
|
|
|
c.shutdownLock.Lock()
|
|
defer c.shutdownLock.Unlock()
|
|
|
|
if c.shutdownB {
|
|
logger.Debug("Cluster is already shutdown")
|
|
return nil
|
|
}
|
|
|
|
logger.Info("shutting down Cluster")
|
|
|
|
// Shutdown APIs first, avoids more requests coming through.
|
|
for _, api := range c.apis {
|
|
if err := api.Shutdown(ctx); err != nil {
|
|
logger.Errorf("error stopping API: %s", err)
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Cancel discovery service (this shutdowns announcing). Handling
|
|
// entries is canceled along with the context below.
|
|
if c.discovery != nil {
|
|
c.discovery.Close()
|
|
}
|
|
|
|
// Try to store peerset file for all known peers whatsoever
|
|
// if we got ready (otherwise, don't overwrite anything)
|
|
if c.readyB {
|
|
// Ignoring error since it's a best-effort
|
|
c.peerManager.SavePeerstoreForPeers(c.host.Peerstore().Peers())
|
|
}
|
|
|
|
// Only attempt to leave if:
|
|
// - consensus is initialized
|
|
// - cluster was ready (no bootstrapping error)
|
|
// - We are not removed already (means watchPeers() called us)
|
|
if c.consensus != nil && c.config.LeaveOnShutdown && c.readyB && !c.removed {
|
|
c.removed = true
|
|
_, err := c.consensus.Peers(ctx)
|
|
if err == nil {
|
|
// best effort
|
|
logger.Warn("attempting to leave the cluster. This may take some seconds")
|
|
err := c.consensus.RmPeer(ctx, c.id)
|
|
if err != nil {
|
|
logger.Error("leaving cluster: " + err.Error())
|
|
}
|
|
}
|
|
}
|
|
|
|
if con := c.consensus; con != nil {
|
|
if err := con.Shutdown(ctx); err != nil {
|
|
logger.Errorf("error stopping consensus: %s", err)
|
|
return err
|
|
}
|
|
}
|
|
|
|
// We left the cluster or were removed. Remove any consensus-specific
|
|
// state.
|
|
if c.removed && c.readyB {
|
|
err := c.consensus.Clean(ctx)
|
|
if err != nil {
|
|
logger.Error("cleaning consensus: ", err)
|
|
}
|
|
}
|
|
|
|
if err := c.monitor.Shutdown(ctx); err != nil {
|
|
logger.Errorf("error stopping monitor: %s", err)
|
|
return err
|
|
}
|
|
|
|
if err := c.ipfs.Shutdown(ctx); err != nil {
|
|
logger.Errorf("error stopping IPFS Connector: %s", err)
|
|
return err
|
|
}
|
|
|
|
if err := c.tracker.Shutdown(ctx); err != nil {
|
|
logger.Errorf("error stopping PinTracker: %s", err)
|
|
return err
|
|
}
|
|
|
|
for _, inf := range c.informers {
|
|
if err := inf.Shutdown(ctx); err != nil {
|
|
logger.Errorf("error stopping informer: %s", err)
|
|
return err
|
|
}
|
|
}
|
|
|
|
if err := c.tracer.Shutdown(ctx); err != nil {
|
|
logger.Errorf("error stopping Tracer: %s", err)
|
|
return err
|
|
}
|
|
|
|
c.cancel()
|
|
c.wg.Wait()
|
|
|
|
c.shutdownB = true
|
|
close(c.doneCh)
|
|
return nil
|
|
}
|
|
|
|
// Done provides a way to learn if the Peer has been shutdown
|
|
// (for example, because it has been removed from the Cluster)
|
|
func (c *Cluster) Done() <-chan struct{} {
|
|
return c.doneCh
|
|
}
|
|
|
|
// ID returns information about the Cluster peer
|
|
func (c *Cluster) ID(ctx context.Context) api.ID {
|
|
_, span := trace.StartSpan(ctx, "cluster/ID")
|
|
defer span.End()
|
|
ctx = trace.NewContext(c.ctx, span)
|
|
|
|
// ignore error since it is included in response object
|
|
ipfsID, err := c.ipfs.ID(ctx)
|
|
if err != nil {
|
|
ipfsID = api.IPFSID{
|
|
Error: err.Error(),
|
|
}
|
|
}
|
|
|
|
var addrs []api.Multiaddr
|
|
mAddrs, err := peer.AddrInfoToP2pAddrs(&peer.AddrInfo{ID: c.id, Addrs: c.host.Addrs()})
|
|
if err == nil {
|
|
for _, mAddr := range mAddrs {
|
|
addrs = append(addrs, api.NewMultiaddrWithValue(mAddr))
|
|
}
|
|
}
|
|
|
|
peers := []peer.ID{}
|
|
// This method might get called very early by a remote peer
|
|
// and might catch us when consensus is not set
|
|
if c.consensus != nil {
|
|
peers, _ = c.consensus.Peers(ctx)
|
|
}
|
|
|
|
clusterPeerInfos := c.peerManager.PeerInfos(peers)
|
|
addresses := []api.Multiaddr{}
|
|
for _, pinfo := range clusterPeerInfos {
|
|
addrs, err := peer.AddrInfoToP2pAddrs(&pinfo)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
for _, a := range addrs {
|
|
addresses = append(addresses, api.NewMultiaddrWithValue(a))
|
|
}
|
|
}
|
|
|
|
id := api.ID{
|
|
ID: c.id,
|
|
// PublicKey: c.host.Peerstore().PubKey(c.id),
|
|
Addresses: addrs,
|
|
ClusterPeers: peers,
|
|
ClusterPeersAddresses: addresses,
|
|
Version: version.Version.String(),
|
|
RPCProtocolVersion: version.RPCProtocol,
|
|
IPFS: ipfsID,
|
|
Peername: c.config.Peername,
|
|
}
|
|
if err != nil {
|
|
id.Error = err.Error()
|
|
}
|
|
|
|
return id
|
|
}
|
|
|
|
// PeerAdd adds a new peer to this Cluster.
|
|
//
|
|
// For it to work well, the new peer should be discoverable
|
|
// (part of our peerstore or connected to one of the existing peers)
|
|
// and reachable. Since PeerAdd allows to add peers which are
|
|
// not running, or reachable, it is recommended to call Join() from the
|
|
// new peer instead.
|
|
//
|
|
// The new peer ID will be passed to the consensus
|
|
// component to be added to the peerset.
|
|
func (c *Cluster) PeerAdd(ctx context.Context, pid peer.ID) (*api.ID, error) {
|
|
_, span := trace.StartSpan(ctx, "cluster/PeerAdd")
|
|
defer span.End()
|
|
ctx = trace.NewContext(c.ctx, span)
|
|
|
|
// starting 10 nodes on the same box for testing
|
|
// causes deadlock and a global lock here
|
|
// seems to help.
|
|
c.paMux.Lock()
|
|
defer c.paMux.Unlock()
|
|
logger.Debugf("peerAdd called with %s", pid.Pretty())
|
|
|
|
// Let the consensus layer be aware of this peer
|
|
err := c.consensus.AddPeer(ctx, pid)
|
|
if err != nil {
|
|
logger.Error(err)
|
|
id := &api.ID{ID: pid, Error: err.Error()}
|
|
return id, err
|
|
}
|
|
|
|
logger.Info("Peer added ", pid.Pretty())
|
|
addedID, err := c.getIDForPeer(ctx, pid)
|
|
if err != nil {
|
|
return addedID, err
|
|
}
|
|
if !containsPeer(addedID.ClusterPeers, c.id) {
|
|
addedID.ClusterPeers = append(addedID.ClusterPeers, c.id)
|
|
}
|
|
return addedID, nil
|
|
}
|
|
|
|
// PeerRemove removes a peer from this Cluster.
|
|
//
|
|
// The peer will be removed from the consensus peerset.
|
|
// This may first trigger repinnings for all content if not disabled.
|
|
func (c *Cluster) PeerRemove(ctx context.Context, pid peer.ID) error {
|
|
_, span := trace.StartSpan(ctx, "cluster/PeerRemove")
|
|
defer span.End()
|
|
ctx = trace.NewContext(c.ctx, span)
|
|
|
|
// We need to repin before removing the peer, otherwise, it won't
|
|
// be able to submit the pins.
|
|
logger.Infof("re-allocating all CIDs directly associated to %s", pid)
|
|
c.vacatePeer(ctx, pid)
|
|
|
|
err := c.consensus.RmPeer(ctx, pid)
|
|
if err != nil {
|
|
logger.Error(err)
|
|
return err
|
|
}
|
|
logger.Info("Peer removed ", pid.Pretty())
|
|
return nil
|
|
}
|
|
|
|
// Join adds this peer to an existing cluster by bootstrapping to a
|
|
// given multiaddress. It works by calling PeerAdd on the destination
|
|
// cluster and making sure that the new peer is ready to discover and contact
|
|
// the rest.
|
|
func (c *Cluster) Join(ctx context.Context, addr ma.Multiaddr) error {
|
|
_, span := trace.StartSpan(ctx, "cluster/Join")
|
|
defer span.End()
|
|
ctx = trace.NewContext(c.ctx, span)
|
|
|
|
logger.Debugf("Join(%s)", addr)
|
|
|
|
// Add peer to peerstore so we can talk to it
|
|
pid, err := c.peerManager.ImportPeer(addr, false, peerstore.PermanentAddrTTL)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if pid == c.id {
|
|
return nil
|
|
}
|
|
|
|
// Note that PeerAdd() on the remote peer will
|
|
// figure out what our real address is (obviously not
|
|
// ListenAddr).
|
|
var myID api.ID
|
|
err = c.rpcClient.CallContext(
|
|
ctx,
|
|
pid,
|
|
"Cluster",
|
|
"PeerAdd",
|
|
c.id,
|
|
&myID,
|
|
)
|
|
if err != nil {
|
|
logger.Error(err)
|
|
return err
|
|
}
|
|
|
|
// Log a fake but valid metric from the peer we are
|
|
// contacting. This will signal a CRDT component that
|
|
// we know that peer since we have metrics for it without
|
|
// having to wait for the next metric round.
|
|
if err := c.logPingMetric(ctx, pid); err != nil {
|
|
logger.Warn(err)
|
|
}
|
|
|
|
// Broadcast our metrics to the world
|
|
err = c.sendInformersMetrics(ctx)
|
|
if err != nil {
|
|
logger.Warn(err)
|
|
}
|
|
|
|
_, err = c.sendPingMetric(ctx)
|
|
if err != nil {
|
|
logger.Warn(err)
|
|
}
|
|
|
|
// We need to trigger a DHT bootstrap asap for this peer to not be
|
|
// lost if the peer it bootstrapped to goes down. We do this manually
|
|
// by triggering 1 round of bootstrap in the background.
|
|
// Note that our regular bootstrap process is still running in the
|
|
// background since we created the cluster.
|
|
c.wg.Add(1)
|
|
go func() {
|
|
defer c.wg.Done()
|
|
select {
|
|
case err := <-c.dht.LAN.RefreshRoutingTable():
|
|
if err != nil {
|
|
// this error is quite chatty
|
|
// on single peer clusters
|
|
logger.Debug(err)
|
|
}
|
|
case <-c.ctx.Done():
|
|
return
|
|
}
|
|
|
|
select {
|
|
case err := <-c.dht.WAN.RefreshRoutingTable():
|
|
if err != nil {
|
|
// this error is quite chatty
|
|
// on single peer clusters
|
|
logger.Debug(err)
|
|
}
|
|
case <-c.ctx.Done():
|
|
return
|
|
}
|
|
}()
|
|
|
|
// ConnectSwarms in the background after a while, when we have likely
|
|
// received some metrics.
|
|
time.AfterFunc(c.config.MonitorPingInterval, func() {
|
|
c.ipfs.ConnectSwarms(ctx)
|
|
})
|
|
|
|
// wait for leader and for state to catch up
|
|
// then sync
|
|
err = c.consensus.WaitForSync(ctx)
|
|
if err != nil {
|
|
logger.Error(err)
|
|
return err
|
|
}
|
|
|
|
// Start pinning items in the state that are not on IPFS yet.
|
|
out := make(chan api.PinInfo, 1024)
|
|
// discard outputs
|
|
go func() {
|
|
for range out {
|
|
}
|
|
}()
|
|
go c.RecoverAllLocal(ctx, out)
|
|
|
|
logger.Infof("%s: joined %s's cluster", c.id.Pretty(), pid.Pretty())
|
|
return nil
|
|
}
|
|
|
|
// Distances returns a distance checker using current trusted peers.
|
|
// It can optionally receive a peer ID to exclude from the checks.
|
|
func (c *Cluster) distances(ctx context.Context, exclude peer.ID) (*distanceChecker, error) {
|
|
trustedPeers, err := c.getTrustedPeers(ctx, exclude)
|
|
if err != nil {
|
|
logger.Error("could not get trusted peers:", err)
|
|
return nil, err
|
|
}
|
|
|
|
return &distanceChecker{
|
|
local: c.id,
|
|
otherPeers: trustedPeers,
|
|
cache: make(map[peer.ID]distance, len(trustedPeers)+1),
|
|
}, nil
|
|
}
|
|
|
|
// StateSync performs maintenance tasks on the global state that require
|
|
// looping through all the items. It is triggered automatically on
|
|
// StateSyncInterval. Currently it:
|
|
// * Sends unpin for expired items for which this peer is "closest"
|
|
// (skipped for follower peers)
|
|
func (c *Cluster) StateSync(ctx context.Context) error {
|
|
_, span := trace.StartSpan(ctx, "cluster/StateSync")
|
|
defer span.End()
|
|
logger.Debug("StateSync")
|
|
|
|
ctx = trace.NewContext(c.ctx, span)
|
|
|
|
if c.config.FollowerMode {
|
|
return nil
|
|
}
|
|
|
|
cState, err := c.consensus.State(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
timeNow := time.Now()
|
|
|
|
// Only trigger pin operations if we are the closest with respect to
|
|
// other trusted peers. We cannot know if our peer ID is trusted by
|
|
// other peers in the Cluster. This assumes yes. Setting FollowerMode
|
|
// is a way to assume the opposite and skip this completely.
|
|
distance, err := c.distances(ctx, "")
|
|
if err != nil {
|
|
return err // could not list peers
|
|
}
|
|
|
|
clusterPins := make(chan api.Pin, 1024)
|
|
go func() {
|
|
err = cState.List(ctx, clusterPins)
|
|
if err != nil {
|
|
logger.Error(err)
|
|
}
|
|
}()
|
|
|
|
// Unpin expired items when we are the closest peer to them.
|
|
for p := range clusterPins {
|
|
if p.ExpiredAt(timeNow) && distance.isClosest(p.Cid) {
|
|
logger.Infof("Unpinning %s: pin expired at %s", p.Cid, p.ExpireAt)
|
|
if _, err := c.Unpin(ctx, p.Cid); err != nil {
|
|
logger.Error(err)
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// StatusAll returns the GlobalPinInfo for all tracked Cids in all peers on
|
|
// the out channel. This is done by broacasting a StatusAll to all peers. If
|
|
// an error happens, it is returned. This method blocks until it finishes. The
|
|
// operation can be aborted by canceling the context.
|
|
func (c *Cluster) StatusAll(ctx context.Context, filter api.TrackerStatus, out chan<- api.GlobalPinInfo) error {
|
|
_, span := trace.StartSpan(ctx, "cluster/StatusAll")
|
|
defer span.End()
|
|
ctx = trace.NewContext(c.ctx, span)
|
|
|
|
in := make(chan api.TrackerStatus, 1)
|
|
in <- filter
|
|
close(in)
|
|
return c.globalPinInfoStream(ctx, "PinTracker", "StatusAll", in, out)
|
|
}
|
|
|
|
// StatusAllLocal returns the PinInfo for all the tracked Cids in this peer on
|
|
// the out channel. It blocks until finished.
|
|
func (c *Cluster) StatusAllLocal(ctx context.Context, filter api.TrackerStatus, out chan<- api.PinInfo) error {
|
|
_, span := trace.StartSpan(ctx, "cluster/StatusAllLocal")
|
|
defer span.End()
|
|
ctx = trace.NewContext(c.ctx, span)
|
|
|
|
return c.tracker.StatusAll(ctx, filter, out)
|
|
}
|
|
|
|
// Status returns the GlobalPinInfo for a given Cid as fetched from all
|
|
// current peers. If an error happens, the GlobalPinInfo should contain
|
|
// as much information as could be fetched from the other peers.
|
|
func (c *Cluster) Status(ctx context.Context, h api.Cid) (api.GlobalPinInfo, error) {
|
|
_, span := trace.StartSpan(ctx, "cluster/Status")
|
|
defer span.End()
|
|
ctx = trace.NewContext(c.ctx, span)
|
|
|
|
return c.globalPinInfoCid(ctx, "PinTracker", "Status", h)
|
|
}
|
|
|
|
// StatusLocal returns this peer's PinInfo for a given Cid.
|
|
func (c *Cluster) StatusLocal(ctx context.Context, h api.Cid) api.PinInfo {
|
|
_, span := trace.StartSpan(ctx, "cluster/StatusLocal")
|
|
defer span.End()
|
|
ctx = trace.NewContext(c.ctx, span)
|
|
|
|
return c.tracker.Status(ctx, h)
|
|
}
|
|
|
|
// used for RecoverLocal and SyncLocal.
|
|
func (c *Cluster) localPinInfoOp(
|
|
ctx context.Context,
|
|
h api.Cid,
|
|
f func(context.Context, api.Cid) (api.PinInfo, error),
|
|
) (pInfo api.PinInfo, err error) {
|
|
ctx, span := trace.StartSpan(ctx, "cluster/localPinInfoOp")
|
|
defer span.End()
|
|
|
|
cids, err := c.cidsFromMetaPin(ctx, h)
|
|
if err != nil {
|
|
return api.PinInfo{}, err
|
|
}
|
|
|
|
for _, ci := range cids {
|
|
pInfo, err = f(ctx, ci)
|
|
if err != nil {
|
|
logger.Error("tracker.SyncCid() returned with error: ", err)
|
|
logger.Error("Is the ipfs daemon running?")
|
|
break
|
|
}
|
|
}
|
|
// return the last pInfo/err, should be the root Cid if everything ok
|
|
return pInfo, err
|
|
}
|
|
|
|
// RecoverAll triggers a RecoverAllLocal operation on all peers and returns
|
|
// GlobalPinInfo objets for all recovered items. This method blocks until
|
|
// finished. Operation can be aborted by canceling the context.
|
|
func (c *Cluster) RecoverAll(ctx context.Context, out chan<- api.GlobalPinInfo) error {
|
|
_, span := trace.StartSpan(ctx, "cluster/RecoverAll")
|
|
defer span.End()
|
|
ctx = trace.NewContext(c.ctx, span)
|
|
|
|
return c.globalPinInfoStream(ctx, "Cluster", "RecoverAllLocal", nil, out)
|
|
}
|
|
|
|
// RecoverAllLocal triggers a RecoverLocal operation for all Cids tracked
|
|
// by this peer.
|
|
//
|
|
// Recover operations ask IPFS to pin or unpin items in error state. Recover
|
|
// is faster than calling Pin on the same CID as it avoids committing an
|
|
// identical pin to the consensus layer.
|
|
//
|
|
// It returns the list of pins that were re-queued for pinning on the out
|
|
// channel. It blocks until done.
|
|
//
|
|
// RecoverAllLocal is called automatically every PinRecoverInterval.
|
|
func (c *Cluster) RecoverAllLocal(ctx context.Context, out chan<- api.PinInfo) error {
|
|
_, span := trace.StartSpan(ctx, "cluster/RecoverAllLocal")
|
|
defer span.End()
|
|
ctx = trace.NewContext(c.ctx, span)
|
|
|
|
return c.tracker.RecoverAll(ctx, out)
|
|
}
|
|
|
|
// Recover triggers a recover operation for a given Cid in all
|
|
// cluster peers.
|
|
//
|
|
// Recover operations ask IPFS to pin or unpin items in error state. Recover
|
|
// is faster than calling Pin on the same CID as it avoids committing an
|
|
// identical pin to the consensus layer.
|
|
func (c *Cluster) Recover(ctx context.Context, h api.Cid) (api.GlobalPinInfo, error) {
|
|
_, span := trace.StartSpan(ctx, "cluster/Recover")
|
|
defer span.End()
|
|
ctx = trace.NewContext(c.ctx, span)
|
|
|
|
return c.globalPinInfoCid(ctx, "PinTracker", "Recover", h)
|
|
}
|
|
|
|
// RecoverLocal triggers a recover operation for a given Cid in this peer only.
|
|
// It returns the updated PinInfo, after recovery.
|
|
//
|
|
// Recover operations ask IPFS to pin or unpin items in error state. Recover
|
|
// is faster than calling Pin on the same CID as it avoids committing an
|
|
// identical pin to the consensus layer.
|
|
func (c *Cluster) RecoverLocal(ctx context.Context, h api.Cid) (api.PinInfo, error) {
|
|
_, span := trace.StartSpan(ctx, "cluster/RecoverLocal")
|
|
defer span.End()
|
|
ctx = trace.NewContext(c.ctx, span)
|
|
|
|
return c.localPinInfoOp(ctx, h, c.tracker.Recover)
|
|
}
|
|
|
|
// Pins sends pins on the given out channel as it iterates the full
|
|
// pinset (current global state). This is the source of truth as to which pins
|
|
// are managed and their allocation, but does not indicate if the item is
|
|
// successfully pinned. For that, use the Status*() methods.
|
|
//
|
|
// The operation can be aborted by canceling the context. This methods blocks
|
|
// until the operation has completed.
|
|
func (c *Cluster) Pins(ctx context.Context, out chan<- api.Pin) error {
|
|
_, span := trace.StartSpan(ctx, "cluster/Pins")
|
|
defer span.End()
|
|
ctx = trace.NewContext(c.ctx, span)
|
|
|
|
cState, err := c.consensus.State(ctx)
|
|
if err != nil {
|
|
logger.Error(err)
|
|
return err
|
|
}
|
|
return cState.List(ctx, out)
|
|
}
|
|
|
|
// pinsSlice returns the list of Cids managed by Cluster and which are part
|
|
// of the current global state. This is the source of truth as to which
|
|
// pins are managed and their allocation, but does not indicate if
|
|
// the item is successfully pinned. For that, use StatusAll().
|
|
//
|
|
// It is recommended to use PinsChannel(), as this method is equivalent to
|
|
// loading the full pinset in memory!
|
|
func (c *Cluster) pinsSlice(ctx context.Context) ([]api.Pin, error) {
|
|
out := make(chan api.Pin, 1024)
|
|
var err error
|
|
go func() {
|
|
err = c.Pins(ctx, out)
|
|
}()
|
|
|
|
var pins []api.Pin
|
|
for pin := range out {
|
|
pins = append(pins, pin)
|
|
}
|
|
return pins, err
|
|
}
|
|
|
|
// PinGet returns information for a single Cid managed by Cluster.
|
|
// The information is obtained from the current global state. The
|
|
// returned api.Pin provides information about the allocations
|
|
// assigned for the requested Cid, but does not indicate if
|
|
// the item is successfully pinned. For that, use Status(). PinGet
|
|
// returns an error if the given Cid is not part of the global state.
|
|
func (c *Cluster) PinGet(ctx context.Context, h api.Cid) (api.Pin, error) {
|
|
_, span := trace.StartSpan(ctx, "cluster/PinGet")
|
|
defer span.End()
|
|
ctx = trace.NewContext(c.ctx, span)
|
|
|
|
st, err := c.consensus.State(ctx)
|
|
if err != nil {
|
|
return api.Pin{}, err
|
|
}
|
|
pin, err := st.Get(ctx, h)
|
|
if err != nil {
|
|
return api.Pin{}, err
|
|
}
|
|
return pin, nil
|
|
}
|
|
|
|
// Pin makes the cluster Pin a Cid. This implies adding the Cid
|
|
// to the IPFS Cluster peers shared-state. Depending on the cluster
|
|
// pinning strategy, the PinTracker may then request the IPFS daemon
|
|
// to pin the Cid.
|
|
//
|
|
// Pin returns the Pin as stored in the global state (with the given
|
|
// allocations and an error if the operation could not be persisted. Pin does
|
|
// not reflect the success or failure of underlying IPFS daemon pinning
|
|
// operations which happen in async fashion.
|
|
//
|
|
// If the options UserAllocations are non-empty then these peers are pinned
|
|
// with priority over other peers in the cluster. If the max repl factor is
|
|
// less than the size of the specified peerset then peers are chosen from this
|
|
// set in allocation order. If the minimum repl factor is greater than the
|
|
// size of this set then the remaining peers are allocated in order from the
|
|
// rest of the cluster. Priority allocations are best effort. If any priority
|
|
// peers are unavailable then Pin will simply allocate from the rest of the
|
|
// cluster.
|
|
//
|
|
// If the Update option is set, the pin options (including allocations) will
|
|
// be copied from an existing one. This is equivalent to running PinUpdate.
|
|
func (c *Cluster) Pin(ctx context.Context, h api.Cid, opts api.PinOptions) (api.Pin, error) {
|
|
_, span := trace.StartSpan(ctx, "cluster/Pin")
|
|
defer span.End()
|
|
|
|
ctx = trace.NewContext(c.ctx, span)
|
|
pin := api.PinWithOpts(h, opts)
|
|
|
|
result, _, err := c.pin(ctx, pin, []peer.ID{})
|
|
return result, err
|
|
}
|
|
|
|
// sets the default replication factor in a pin when it's set to 0
|
|
func (c *Cluster) setupReplicationFactor(pin api.Pin) (api.Pin, error) {
|
|
rplMin := pin.ReplicationFactorMin
|
|
rplMax := pin.ReplicationFactorMax
|
|
if rplMin == 0 {
|
|
rplMin = c.config.ReplicationFactorMin
|
|
pin.ReplicationFactorMin = rplMin
|
|
}
|
|
if rplMax == 0 {
|
|
rplMax = c.config.ReplicationFactorMax
|
|
pin.ReplicationFactorMax = rplMax
|
|
}
|
|
|
|
// When pinning everywhere, remove all allocations.
|
|
// Allocations may have been preset by the adder
|
|
// for the cases when the replication factor is > -1.
|
|
// Fixes part of #1319: allocations when adding
|
|
// are kept.
|
|
if pin.IsPinEverywhere() {
|
|
pin.Allocations = nil
|
|
}
|
|
|
|
return pin, isReplicationFactorValid(rplMin, rplMax)
|
|
}
|
|
|
|
// basic checks on the pin type to check it's well-formed.
|
|
func checkPinType(pin api.Pin) error {
|
|
switch pin.Type {
|
|
case api.DataType:
|
|
if pin.Reference != nil {
|
|
return errors.New("data pins should not reference other pins")
|
|
}
|
|
case api.ShardType:
|
|
if pin.MaxDepth != 1 {
|
|
return errors.New("must pin shards go depth 1")
|
|
}
|
|
// FIXME: indirect shard pins could have max-depth 2
|
|
// FIXME: repinning a shard type will overwrite replication
|
|
// factor from previous:
|
|
// if existing.ReplicationFactorMin != rplMin ||
|
|
// existing.ReplicationFactorMax != rplMax {
|
|
// return errors.New("shard update with wrong repl factors")
|
|
//}
|
|
case api.ClusterDAGType:
|
|
if pin.MaxDepth != 0 {
|
|
return errors.New("must pin roots directly")
|
|
}
|
|
if pin.Reference == nil {
|
|
return errors.New("clusterDAG pins should reference a Meta pin")
|
|
}
|
|
case api.MetaType:
|
|
if len(pin.Allocations) != 0 {
|
|
return errors.New("meta pin should not specify allocations")
|
|
}
|
|
if pin.Reference == nil {
|
|
return errors.New("metaPins should reference a ClusterDAG")
|
|
}
|
|
|
|
default:
|
|
return errors.New("unrecognized pin type")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// setupPin ensures that the Pin object is fit for pinning. We check
|
|
// and set the replication factors and ensure that the pinType matches the
|
|
// metadata consistently.
|
|
func (c *Cluster) setupPin(ctx context.Context, pin, existing api.Pin) (api.Pin, error) {
|
|
_, span := trace.StartSpan(ctx, "cluster/setupPin")
|
|
defer span.End()
|
|
var err error
|
|
|
|
pin, err = c.setupReplicationFactor(pin)
|
|
if err != nil {
|
|
return pin, err
|
|
}
|
|
|
|
if !pin.ExpireAt.IsZero() && pin.ExpireAt.Before(time.Now()) {
|
|
return pin, errors.New("pin.ExpireAt set before current time")
|
|
}
|
|
|
|
if !existing.Defined() {
|
|
return pin, nil
|
|
}
|
|
|
|
// If an pin CID is already pin, we do a couple more checks
|
|
if existing.Type != pin.Type {
|
|
msg := "cannot repin CID with different tracking method, "
|
|
msg += "clear state with pin rm to proceed. "
|
|
msg += "New: %s. Was: %s"
|
|
return pin, fmt.Errorf(msg, pin.Type, existing.Type)
|
|
}
|
|
|
|
if existing.Mode == api.PinModeRecursive && pin.Mode != api.PinModeRecursive {
|
|
msg := "cannot repin a CID which is already pinned in "
|
|
msg += "recursive mode (new pin is pinned as %s). Unpin it first."
|
|
return pin, fmt.Errorf(msg, pin.Mode)
|
|
}
|
|
|
|
return pin, checkPinType(pin)
|
|
}
|
|
|
|
// pin performs the actual pinning and supports a blacklist to be able to
|
|
// evacuate a node and returns the pin object that it tried to pin, whether
|
|
// the pin was submitted to the consensus layer or skipped (due to error or to
|
|
// the fact that it was already valid) and error.
|
|
//
|
|
// This is the method called by the Cluster.Pin RPC endpoint.
|
|
func (c *Cluster) pin(
|
|
ctx context.Context,
|
|
pin api.Pin,
|
|
blacklist []peer.ID,
|
|
) (api.Pin, bool, error) {
|
|
ctx, span := trace.StartSpan(ctx, "cluster/pin")
|
|
defer span.End()
|
|
|
|
if c.config.FollowerMode {
|
|
return api.Pin{}, false, errFollowerMode
|
|
}
|
|
|
|
if !pin.Cid.Defined() {
|
|
return pin, false, errors.New("bad pin object")
|
|
}
|
|
|
|
// Handle pin updates when the option is set
|
|
if update := pin.PinUpdate; update.Defined() && !update.Equals(pin.Cid) {
|
|
pin, err := c.PinUpdate(ctx, update, pin.Cid, pin.PinOptions)
|
|
return pin, true, err
|
|
}
|
|
|
|
existing, err := c.PinGet(ctx, pin.Cid)
|
|
if err != nil && err != state.ErrNotFound {
|
|
return pin, false, err
|
|
}
|
|
|
|
pin, err = c.setupPin(ctx, pin, existing)
|
|
if err != nil {
|
|
return pin, false, err
|
|
}
|
|
|
|
// Set the Pin timestamp to now(). This is not an user-controllable
|
|
// "option".
|
|
pin.Timestamp = time.Now()
|
|
|
|
if pin.Type == api.MetaType {
|
|
return pin, true, c.consensus.LogPin(ctx, pin)
|
|
}
|
|
|
|
// We did not change ANY options and the pin exists so we just repin
|
|
// what there is without doing new allocations. While this submits
|
|
// pins to the consensus layer even if they are, this should trigger the
|
|
// pin tracker and allows users to get re-pin operations by re-adding
|
|
// without having to use recover, which is naturally expected.
|
|
//
|
|
// blacklist is set on repinFromPeer having any blacklisted peers
|
|
// means we are repinning and need to trigger allocate(), therefore we
|
|
// can't overwrite the incoming pin (which has Allocations set to
|
|
// nil).
|
|
if existing.Defined() &&
|
|
pin.PinOptions.Equals(existing.PinOptions) &&
|
|
len(blacklist) == 0 {
|
|
pin = existing
|
|
}
|
|
|
|
// Usually allocations are unset when pinning normally, however, the
|
|
// allocations may have been preset by the adder in which case they
|
|
// need to be respected. Whenever allocations are set. We don't
|
|
// re-allocate. repinFromPeer() unsets allocations for this reason.
|
|
// allocate() will check which peers are currently allocated
|
|
// and try to respect them.
|
|
if len(pin.Allocations) == 0 {
|
|
// If replication factor is -1, this will return empty
|
|
// allocations.
|
|
allocs, err := c.allocate(
|
|
ctx,
|
|
pin.Cid,
|
|
existing,
|
|
pin.ReplicationFactorMin,
|
|
pin.ReplicationFactorMax,
|
|
blacklist,
|
|
pin.UserAllocations,
|
|
)
|
|
if err != nil {
|
|
return pin, false, err
|
|
}
|
|
pin.Allocations = allocs
|
|
}
|
|
|
|
// If this is true, replication factor should be -1.
|
|
if len(pin.Allocations) == 0 {
|
|
logger.Infof("pinning %s everywhere:", pin.Cid)
|
|
} else {
|
|
logger.Infof("pinning %s on %s:", pin.Cid, pin.Allocations)
|
|
}
|
|
|
|
return pin, true, c.consensus.LogPin(ctx, pin)
|
|
}
|
|
|
|
// Unpin removes a previously pinned Cid from Cluster. It returns
|
|
// the global state Pin object as it was stored before removal, or
|
|
// an error if it was not possible to update the global state.
|
|
//
|
|
// Unpin does not reflect the success or failure of underlying IPFS daemon
|
|
// unpinning operations, which happen in async fashion.
|
|
func (c *Cluster) Unpin(ctx context.Context, h api.Cid) (api.Pin, error) {
|
|
_, span := trace.StartSpan(ctx, "cluster/Unpin")
|
|
defer span.End()
|
|
ctx = trace.NewContext(c.ctx, span)
|
|
|
|
if c.config.FollowerMode {
|
|
return api.Pin{}, errFollowerMode
|
|
}
|
|
|
|
logger.Info("IPFS cluster unpinning:", h)
|
|
pin, err := c.PinGet(ctx, h)
|
|
if err != nil {
|
|
return api.Pin{}, err
|
|
}
|
|
|
|
switch pin.Type {
|
|
case api.DataType:
|
|
return pin, c.consensus.LogUnpin(ctx, pin)
|
|
case api.ShardType:
|
|
err := "cannot unpin a shard directly. Unpin content root CID instead"
|
|
return pin, errors.New(err)
|
|
case api.MetaType:
|
|
// Unpin cluster dag and referenced shards
|
|
err := c.unpinClusterDag(pin)
|
|
if err != nil {
|
|
return pin, err
|
|
}
|
|
return pin, c.consensus.LogUnpin(ctx, pin)
|
|
case api.ClusterDAGType:
|
|
err := "cannot unpin a Cluster DAG directly. Unpin content root CID instead"
|
|
return pin, errors.New(err)
|
|
default:
|
|
return pin, errors.New("unrecognized pin type")
|
|
}
|
|
}
|
|
|
|
// unpinClusterDag unpins the clusterDAG metadata node and the shard metadata
|
|
// nodes that it references. It handles the case where multiple parents
|
|
// reference the same metadata node, only unpinning those nodes without
|
|
// existing references
|
|
func (c *Cluster) unpinClusterDag(metaPin api.Pin) error {
|
|
ctx, span := trace.StartSpan(c.ctx, "cluster/unpinClusterDag")
|
|
defer span.End()
|
|
|
|
cids, err := c.cidsFromMetaPin(ctx, metaPin.Cid)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// TODO: FIXME: potentially unpinning shards which are referenced
|
|
// by other clusterDAGs.
|
|
for _, ci := range cids {
|
|
err = c.consensus.LogUnpin(ctx, api.PinCid(ci))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// PinUpdate pins a new CID based on an existing cluster Pin. The allocations
|
|
// and most pin options (replication factors) are copied from the existing
|
|
// Pin. The options object can be used to set the Name for the new pin and
|
|
// might support additional options in the future.
|
|
//
|
|
// The from pin is NOT unpinned upon completion. The new pin might take
|
|
// advantage of efficient pin/update operation on IPFS-side (if the
|
|
// IPFSConnector supports it - the default one does). This may offer
|
|
// significant speed when pinning items which are similar to previously pinned
|
|
// content.
|
|
func (c *Cluster) PinUpdate(ctx context.Context, from api.Cid, to api.Cid, opts api.PinOptions) (api.Pin, error) {
|
|
existing, err := c.PinGet(ctx, from)
|
|
if err != nil { // including when the existing pin is not found
|
|
return api.Pin{}, err
|
|
}
|
|
|
|
// Hector: I am not sure whether it has any point to update something
|
|
// like a MetaType.
|
|
if existing.Type != api.DataType {
|
|
return api.Pin{}, errors.New("this pin type cannot be updated")
|
|
}
|
|
|
|
existing.Cid = to
|
|
existing.PinUpdate = from
|
|
existing.Timestamp = time.Now()
|
|
if opts.Name != "" {
|
|
existing.Name = opts.Name
|
|
}
|
|
if !opts.ExpireAt.IsZero() && opts.ExpireAt.After(time.Now()) {
|
|
existing.ExpireAt = opts.ExpireAt
|
|
}
|
|
return existing, c.consensus.LogPin(ctx, existing)
|
|
}
|
|
|
|
// PinPath pins an CID resolved from its IPFS Path. It returns the resolved
|
|
// Pin object.
|
|
func (c *Cluster) PinPath(ctx context.Context, path string, opts api.PinOptions) (api.Pin, error) {
|
|
_, span := trace.StartSpan(ctx, "cluster/PinPath")
|
|
defer span.End()
|
|
|
|
ctx = trace.NewContext(c.ctx, span)
|
|
ci, err := c.ipfs.Resolve(ctx, path)
|
|
if err != nil {
|
|
return api.Pin{}, err
|
|
}
|
|
|
|
return c.Pin(ctx, ci, opts)
|
|
}
|
|
|
|
// UnpinPath unpins a CID resolved from its IPFS Path. If returns the
|
|
// previously pinned Pin object.
|
|
func (c *Cluster) UnpinPath(ctx context.Context, path string) (api.Pin, error) {
|
|
_, span := trace.StartSpan(ctx, "cluster/UnpinPath")
|
|
defer span.End()
|
|
|
|
ctx = trace.NewContext(c.ctx, span)
|
|
ci, err := c.ipfs.Resolve(ctx, path)
|
|
if err != nil {
|
|
return api.Pin{}, err
|
|
}
|
|
|
|
return c.Unpin(ctx, ci)
|
|
}
|
|
|
|
// AddFile adds a file to the ipfs daemons of the cluster. The ipfs importer
|
|
// pipeline is used to DAGify the file. Depending on input parameters this
|
|
// DAG can be added locally to the calling cluster peer's ipfs repo, or
|
|
// sharded across the entire cluster.
|
|
func (c *Cluster) AddFile(ctx context.Context, reader *multipart.Reader, params api.AddParams) (api.Cid, error) {
|
|
// TODO: add context param and tracing
|
|
|
|
var dags adder.ClusterDAGService
|
|
if params.Shard {
|
|
dags = sharding.New(ctx, c.rpcClient, params, nil)
|
|
} else {
|
|
dags = single.New(ctx, c.rpcClient, params, params.Local)
|
|
}
|
|
add := adder.New(dags, params, nil)
|
|
return add.FromMultipart(ctx, reader)
|
|
}
|
|
|
|
// Version returns the current IPFS Cluster version.
|
|
func (c *Cluster) Version() string {
|
|
return version.Version.String()
|
|
}
|
|
|
|
// Peers returns the IDs of the members of this Cluster on the out channel.
|
|
// This method blocks until it has finished.
|
|
func (c *Cluster) Peers(ctx context.Context, out chan<- api.ID) {
|
|
_, span := trace.StartSpan(ctx, "cluster/Peers")
|
|
defer span.End()
|
|
ctx = trace.NewContext(c.ctx, span)
|
|
|
|
peers, err := c.consensus.Peers(ctx)
|
|
if err != nil {
|
|
logger.Error(err)
|
|
logger.Error("an empty list of peers will be returned")
|
|
close(out)
|
|
return
|
|
}
|
|
c.peersWithFilter(ctx, peers, out)
|
|
}
|
|
|
|
// requests IDs from a given number of peers.
|
|
func (c *Cluster) peersWithFilter(ctx context.Context, peers []peer.ID, out chan<- api.ID) {
|
|
defer close(out)
|
|
|
|
// We should be done relatively quickly with this call. Otherwise
|
|
// report errors.
|
|
timeout := 15 * time.Second
|
|
ctxCall, cancel := context.WithTimeout(ctx, timeout)
|
|
defer cancel()
|
|
|
|
in := make(chan struct{})
|
|
close(in)
|
|
idsOut := make(chan api.ID, len(peers))
|
|
errCh := make(chan []error, 1)
|
|
|
|
go func() {
|
|
defer close(errCh)
|
|
|
|
errCh <- c.rpcClient.MultiStream(
|
|
ctxCall,
|
|
peers,
|
|
"Cluster",
|
|
"IDStream",
|
|
in,
|
|
idsOut,
|
|
)
|
|
}()
|
|
|
|
// Unfortunately, we need to use idsOut as intermediary channel
|
|
// because it is closed when MultiStream ends and we cannot keep
|
|
// adding things on it (the errors below).
|
|
for id := range idsOut {
|
|
select {
|
|
case <-ctx.Done():
|
|
logger.Errorf("Peers call aborted: %s", ctx.Err())
|
|
return
|
|
case out <- id:
|
|
}
|
|
}
|
|
|
|
// ErrCh will always be closed on context cancellation too.
|
|
errs := <-errCh
|
|
for i, err := range errs {
|
|
if err == nil {
|
|
continue
|
|
}
|
|
if rpc.IsAuthorizationError(err) {
|
|
continue
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
logger.Errorf("Peers call aborted: %s", ctx.Err())
|
|
case out <- api.ID{
|
|
ID: peers[i],
|
|
Error: err.Error(),
|
|
}:
|
|
}
|
|
}
|
|
}
|
|
|
|
// getTrustedPeers gives listed of trusted peers except the current peer and
|
|
// the excluded peer if provided.
|
|
func (c *Cluster) getTrustedPeers(ctx context.Context, exclude peer.ID) ([]peer.ID, error) {
|
|
peers, err := c.consensus.Peers(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
trustedPeers := make([]peer.ID, 0, len(peers))
|
|
|
|
for _, p := range peers {
|
|
if p == c.id || p == exclude || !c.consensus.IsTrustedPeer(ctx, p) {
|
|
continue
|
|
}
|
|
trustedPeers = append(trustedPeers, p)
|
|
}
|
|
|
|
return trustedPeers, nil
|
|
}
|
|
|
|
func (c *Cluster) setTrackerStatus(gpin *api.GlobalPinInfo, h api.Cid, peers []peer.ID, status api.TrackerStatus, pin api.Pin, t time.Time) {
|
|
for _, p := range peers {
|
|
pv := pingValueFromMetric(c.monitor.LatestForPeer(c.ctx, pingMetricName, p))
|
|
gpin.Add(api.PinInfo{
|
|
Cid: h,
|
|
Name: pin.Name,
|
|
Allocations: pin.Allocations,
|
|
Origins: pin.Origins,
|
|
Created: pin.Timestamp,
|
|
Metadata: pin.Metadata,
|
|
Peer: p,
|
|
PinInfoShort: api.PinInfoShort{
|
|
PeerName: pv.Peername,
|
|
IPFS: pv.IPFSID,
|
|
IPFSAddresses: pv.IPFSAddresses,
|
|
Status: status,
|
|
TS: t,
|
|
},
|
|
})
|
|
}
|
|
}
|
|
|
|
func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h api.Cid) (api.GlobalPinInfo, error) {
|
|
ctx, span := trace.StartSpan(ctx, "cluster/globalPinInfoCid")
|
|
defer span.End()
|
|
|
|
// The object we will return
|
|
gpin := api.GlobalPinInfo{}
|
|
|
|
// allocated peers, we will contact them through rpc
|
|
var dests []peer.ID
|
|
// un-allocated peers, we will set remote status
|
|
var remote []peer.ID
|
|
|
|
timeNow := time.Now()
|
|
|
|
// If pin is not part of the pinset, mark it unpinned
|
|
pin, err := c.PinGet(ctx, h)
|
|
if err != nil && err != state.ErrNotFound {
|
|
logger.Error(err)
|
|
return api.GlobalPinInfo{}, err
|
|
}
|
|
|
|
// When NotFound return directly with an unpinned
|
|
// status.
|
|
if err == state.ErrNotFound {
|
|
var members []peer.ID
|
|
if c.config.FollowerMode {
|
|
members = []peer.ID{c.host.ID()}
|
|
} else {
|
|
members, err = c.consensus.Peers(ctx)
|
|
if err != nil {
|
|
logger.Error(err)
|
|
return api.GlobalPinInfo{}, err
|
|
}
|
|
}
|
|
|
|
c.setTrackerStatus(
|
|
&gpin,
|
|
h,
|
|
members,
|
|
api.TrackerStatusUnpinned,
|
|
api.PinCid(h),
|
|
timeNow,
|
|
)
|
|
return gpin, nil
|
|
}
|
|
|
|
// The pin exists.
|
|
gpin.Cid = h
|
|
gpin.Name = pin.Name
|
|
|
|
// Make the list of peers that will receive the request.
|
|
if c.config.FollowerMode {
|
|
// during follower mode return only local status.
|
|
dests = []peer.ID{c.host.ID()}
|
|
remote = []peer.ID{}
|
|
} else {
|
|
members, err := c.consensus.Peers(ctx)
|
|
if err != nil {
|
|
logger.Error(err)
|
|
return api.GlobalPinInfo{}, err
|
|
}
|
|
|
|
if !pin.IsPinEverywhere() {
|
|
dests = pin.Allocations
|
|
remote = peersSubtract(members, dests)
|
|
} else {
|
|
dests = members
|
|
remote = []peer.ID{}
|
|
}
|
|
}
|
|
|
|
// set status remote on un-allocated peers
|
|
c.setTrackerStatus(&gpin, h, remote, api.TrackerStatusRemote, pin, timeNow)
|
|
|
|
lenDests := len(dests)
|
|
replies := make([]api.PinInfo, lenDests)
|
|
|
|
// a globalPinInfo type of request should be relatively fast. We
|
|
// cannot block response indefinitely due to an unresponsive node.
|
|
timeout := 15 * time.Second
|
|
ctxs, cancels := rpcutil.CtxsWithTimeout(ctx, lenDests, timeout)
|
|
defer rpcutil.MultiCancel(cancels)
|
|
|
|
errs := c.rpcClient.MultiCall(
|
|
ctxs,
|
|
dests,
|
|
comp,
|
|
method,
|
|
h,
|
|
rpcutil.CopyPinInfoToIfaces(replies),
|
|
)
|
|
|
|
for i, r := range replies {
|
|
e := errs[i]
|
|
|
|
// No error. Parse and continue
|
|
if e == nil {
|
|
gpin.Add(r)
|
|
continue
|
|
}
|
|
|
|
if rpc.IsAuthorizationError(e) {
|
|
logger.Debug("rpc auth error:", e)
|
|
continue
|
|
}
|
|
|
|
// Deal with error cases (err != nil): wrap errors in PinInfo
|
|
logger.Errorf("%s: error in broadcast response from %s: %s ", c.id, dests[i], e)
|
|
|
|
pv := pingValueFromMetric(c.monitor.LatestForPeer(ctx, pingMetricName, dests[i]))
|
|
gpin.Add(api.PinInfo{
|
|
Cid: h,
|
|
Name: pin.Name,
|
|
Peer: dests[i],
|
|
Allocations: pin.Allocations,
|
|
Origins: pin.Origins,
|
|
Created: pin.Timestamp,
|
|
Metadata: pin.Metadata,
|
|
PinInfoShort: api.PinInfoShort{
|
|
PeerName: pv.Peername,
|
|
IPFS: pv.IPFSID,
|
|
IPFSAddresses: pv.IPFSAddresses,
|
|
Status: api.TrackerStatusClusterError,
|
|
TS: timeNow,
|
|
Error: e.Error(),
|
|
},
|
|
})
|
|
}
|
|
|
|
return gpin, nil
|
|
}
|
|
|
|
func (c *Cluster) globalPinInfoStream(ctx context.Context, comp, method string, inChan interface{}, out chan<- api.GlobalPinInfo) error {
|
|
defer close(out)
|
|
|
|
ctx, span := trace.StartSpan(ctx, "cluster/globalPinInfoStream")
|
|
defer span.End()
|
|
|
|
if inChan == nil {
|
|
emptyChan := make(chan struct{})
|
|
close(emptyChan)
|
|
inChan = emptyChan
|
|
}
|
|
|
|
fullMap := make(map[api.Cid]api.GlobalPinInfo)
|
|
|
|
var members []peer.ID
|
|
var err error
|
|
if c.config.FollowerMode {
|
|
members = []peer.ID{c.host.ID()}
|
|
} else {
|
|
members, err = c.consensus.Peers(ctx)
|
|
if err != nil {
|
|
logger.Error(err)
|
|
return err
|
|
}
|
|
}
|
|
|
|
// We don't have a good timeout proposal for this. Depending on the
|
|
// size of the state and the peformance of IPFS and the network, this
|
|
// may take moderately long.
|
|
// If we did, this is the place to put it.
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
msOut := make(chan api.PinInfo)
|
|
errsCh := make(chan []error, 1)
|
|
go func() {
|
|
defer close(errsCh)
|
|
errsCh <- c.rpcClient.MultiStream(
|
|
ctx,
|
|
members,
|
|
comp,
|
|
method,
|
|
inChan,
|
|
msOut,
|
|
)
|
|
}()
|
|
|
|
setPinInfo := func(p api.PinInfo) {
|
|
if !p.Defined() {
|
|
return
|
|
}
|
|
info, ok := fullMap[p.Cid]
|
|
if !ok {
|
|
info = api.GlobalPinInfo{}
|
|
}
|
|
info.Add(p)
|
|
// Set the new/updated info
|
|
fullMap[p.Cid] = info
|
|
}
|
|
|
|
// make the big collection.
|
|
for pin := range msOut {
|
|
setPinInfo(pin)
|
|
}
|
|
|
|
// This WAITs until MultiStream is DONE.
|
|
erroredPeers := make(map[peer.ID]string)
|
|
errs, ok := <-errsCh
|
|
if ok {
|
|
for i, err := range errs {
|
|
if err == nil {
|
|
continue
|
|
}
|
|
if rpc.IsAuthorizationError(err) {
|
|
logger.Debug("rpc auth error", err)
|
|
continue
|
|
}
|
|
logger.Errorf("%s: error in broadcast response from %s: %s ", c.id, members[i], err)
|
|
erroredPeers[members[i]] = err.Error()
|
|
}
|
|
}
|
|
|
|
// Merge any errors
|
|
for p, msg := range erroredPeers {
|
|
pv := pingValueFromMetric(c.monitor.LatestForPeer(ctx, pingMetricName, p))
|
|
for c := range fullMap {
|
|
setPinInfo(api.PinInfo{
|
|
Cid: c,
|
|
Name: "",
|
|
Peer: p,
|
|
Allocations: nil,
|
|
Origins: nil,
|
|
// Created: // leave unitialized
|
|
Metadata: nil,
|
|
PinInfoShort: api.PinInfoShort{
|
|
PeerName: pv.Peername,
|
|
IPFS: pv.IPFSID,
|
|
IPFSAddresses: pv.IPFSAddresses,
|
|
Status: api.TrackerStatusClusterError,
|
|
TS: time.Now(),
|
|
Error: msg,
|
|
},
|
|
})
|
|
}
|
|
}
|
|
|
|
for _, v := range fullMap {
|
|
select {
|
|
case <-ctx.Done():
|
|
err := fmt.Errorf("%s.%s aborted: %w", comp, method, ctx.Err())
|
|
logger.Error(err)
|
|
return err
|
|
case out <- v:
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Cluster) getIDForPeer(ctx context.Context, pid peer.ID) (*api.ID, error) {
|
|
ctx, span := trace.StartSpan(ctx, "cluster/getIDForPeer")
|
|
defer span.End()
|
|
|
|
var id api.ID
|
|
err := c.rpcClient.CallContext(
|
|
ctx,
|
|
pid,
|
|
"Cluster",
|
|
"ID",
|
|
struct{}{},
|
|
&id,
|
|
)
|
|
if err != nil {
|
|
logger.Error(err)
|
|
id.ID = pid
|
|
id.Error = err.Error()
|
|
}
|
|
return &id, err
|
|
}
|
|
|
|
// cidsFromMetaPin expands a meta-pin and returns a list of Cids that
|
|
// Cluster handles for it: the ShardPins, the ClusterDAG and the MetaPin, in
|
|
// that order (the MetaPin is the last element).
|
|
// It returns a slice with only the given Cid if it's not a known Cid or not a
|
|
// MetaPin.
|
|
func (c *Cluster) cidsFromMetaPin(ctx context.Context, h api.Cid) ([]api.Cid, error) {
|
|
ctx, span := trace.StartSpan(ctx, "cluster/cidsFromMetaPin")
|
|
defer span.End()
|
|
|
|
cState, err := c.consensus.State(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
list := []api.Cid{h}
|
|
|
|
pin, err := cState.Get(ctx, h)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if pin.Type != api.MetaType {
|
|
return list, nil
|
|
}
|
|
|
|
if pin.Reference == nil {
|
|
return nil, errors.New("metaPin.Reference is unset")
|
|
}
|
|
list = append([]api.Cid{*pin.Reference}, list...)
|
|
clusterDagPin, err := c.PinGet(ctx, *pin.Reference)
|
|
if err != nil {
|
|
return list, fmt.Errorf("could not get clusterDAG pin from state. Malformed pin?: %s", err)
|
|
}
|
|
|
|
clusterDagBlock, err := c.ipfs.BlockGet(ctx, clusterDagPin.Cid)
|
|
if err != nil {
|
|
return list, fmt.Errorf("error reading clusterDAG block from ipfs: %s", err)
|
|
}
|
|
|
|
clusterDagNode, err := sharding.CborDataToNode(clusterDagBlock, "cbor")
|
|
if err != nil {
|
|
return list, fmt.Errorf("error parsing clusterDAG block: %s", err)
|
|
}
|
|
for _, l := range clusterDagNode.Links() {
|
|
list = append([]api.Cid{api.NewCid(l.Cid)}, list...)
|
|
}
|
|
|
|
return list, nil
|
|
}
|
|
|
|
// // diffPeers returns the peerIDs added and removed from peers2 in relation to
|
|
// // peers1
|
|
// func diffPeers(peers1, peers2 []peer.ID) (added, removed []peer.ID) {
|
|
// m1 := make(map[peer.ID]struct{})
|
|
// m2 := make(map[peer.ID]struct{})
|
|
// added = make([]peer.ID, 0)
|
|
// removed = make([]peer.ID, 0)
|
|
// if peers1 == nil && peers2 == nil {
|
|
// return
|
|
// }
|
|
// if peers1 == nil {
|
|
// added = peers2
|
|
// return
|
|
// }
|
|
// if peers2 == nil {
|
|
// removed = peers1
|
|
// return
|
|
// }
|
|
|
|
// for _, p := range peers1 {
|
|
// m1[p] = struct{}{}
|
|
// }
|
|
// for _, p := range peers2 {
|
|
// m2[p] = struct{}{}
|
|
// }
|
|
// for k := range m1 {
|
|
// _, ok := m2[k]
|
|
// if !ok {
|
|
// removed = append(removed, k)
|
|
// }
|
|
// }
|
|
// for k := range m2 {
|
|
// _, ok := m1[k]
|
|
// if !ok {
|
|
// added = append(added, k)
|
|
// }
|
|
// }
|
|
// return
|
|
// }
|
|
|
|
// RepoGC performs garbage collection sweep on all peers' IPFS repo.
|
|
func (c *Cluster) RepoGC(ctx context.Context) (api.GlobalRepoGC, error) {
|
|
_, span := trace.StartSpan(ctx, "cluster/RepoGC")
|
|
defer span.End()
|
|
ctx = trace.NewContext(c.ctx, span)
|
|
|
|
members, err := c.consensus.Peers(ctx)
|
|
if err != nil {
|
|
logger.Error(err)
|
|
return api.GlobalRepoGC{}, err
|
|
}
|
|
|
|
// to club `RepoGCLocal` responses of all peers into one
|
|
globalRepoGC := api.GlobalRepoGC{PeerMap: make(map[string]api.RepoGC)}
|
|
|
|
for _, member := range members {
|
|
var repoGC api.RepoGC
|
|
err = c.rpcClient.CallContext(
|
|
ctx,
|
|
member,
|
|
"Cluster",
|
|
"RepoGCLocal",
|
|
struct{}{},
|
|
&repoGC,
|
|
)
|
|
if err == nil {
|
|
globalRepoGC.PeerMap[peer.Encode(member)] = repoGC
|
|
continue
|
|
}
|
|
|
|
if rpc.IsAuthorizationError(err) {
|
|
logger.Debug("rpc auth error:", err)
|
|
continue
|
|
}
|
|
|
|
logger.Errorf("%s: error in broadcast response from %s: %s ", c.id, member, err)
|
|
|
|
pv := pingValueFromMetric(c.monitor.LatestForPeer(c.ctx, pingMetricName, member))
|
|
|
|
globalRepoGC.PeerMap[peer.Encode(member)] = api.RepoGC{
|
|
Peer: member,
|
|
Peername: pv.Peername,
|
|
Keys: []api.IPFSRepoGC{},
|
|
Error: err.Error(),
|
|
}
|
|
}
|
|
|
|
return globalRepoGC, nil
|
|
}
|
|
|
|
// RepoGCLocal performs garbage collection only on the local IPFS deamon.
|
|
func (c *Cluster) RepoGCLocal(ctx context.Context) (api.RepoGC, error) {
|
|
_, span := trace.StartSpan(ctx, "cluster/RepoGCLocal")
|
|
defer span.End()
|
|
ctx = trace.NewContext(c.ctx, span)
|
|
|
|
resp, err := c.ipfs.RepoGC(ctx)
|
|
if err != nil {
|
|
return api.RepoGC{}, err
|
|
}
|
|
resp.Peer = c.id
|
|
resp.Peername = c.config.Peername
|
|
return resp, nil
|
|
}
|