package ipfscluster

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"mime/multipart"
	"sync"
	"time"

	"github.com/coreos/go-systemd/v22/daemon"

	"github.com/ipfs-cluster/ipfs-cluster/adder"
	"github.com/ipfs-cluster/ipfs-cluster/adder/sharding"
	"github.com/ipfs-cluster/ipfs-cluster/adder/single"
	"github.com/ipfs-cluster/ipfs-cluster/api"
	"github.com/ipfs-cluster/ipfs-cluster/pstoremgr"
	"github.com/ipfs-cluster/ipfs-cluster/rpcutil"
	"github.com/ipfs-cluster/ipfs-cluster/state"
	"github.com/ipfs-cluster/ipfs-cluster/version"

	"go.uber.org/multierr"

	ds "github.com/ipfs/go-datastore"
	host "github.com/libp2p/go-libp2p-core/host"
	peer "github.com/libp2p/go-libp2p-core/peer"
	peerstore "github.com/libp2p/go-libp2p-core/peerstore"
	rpc "github.com/libp2p/go-libp2p-gorpc"
	dual "github.com/libp2p/go-libp2p-kad-dht/dual"
	mdns "github.com/libp2p/go-libp2p/p2p/discovery/mdns"
	ma "github.com/multiformats/go-multiaddr"

	ocgorpc "github.com/lanzafame/go-libp2p-ocgorpc"
	trace "go.opencensus.io/trace"
)

// ReadyTimeout specifies the time before giving up
// during startup (waiting for consensus to be ready).
// It may need adjustment according to timeouts in the
// consensus layer.
var ReadyTimeout = 30 * time.Second

const (
	pingMetricName      = "ping"
	bootstrapCount      = 3
	reBootstrapInterval = 30 * time.Second
	mdnsServiceTag      = "_ipfs-cluster-discovery._udp"
	maxAlerts           = 1000
)

var errFollowerMode = errors.New("this peer is configured to be in follower mode. Write operations are disabled")

// Cluster is the main IPFS cluster component. It provides
// the go-API for it and orchestrates the components that make up the system.
type Cluster struct {
	ctx    context.Context
	cancel func()

	id          peer.ID
	config      *Config
	host        host.Host
	dht         *dual.DHT
	discovery   mdns.Service
	datastore   ds.Datastore
	rpcServer   *rpc.Server
	rpcClient   *rpc.Client
	peerManager *pstoremgr.Manager

	consensus Consensus
	apis      []API
	ipfs      IPFSConnector
	tracker   PinTracker
	monitor   PeerMonitor
	allocator PinAllocator
	informers []Informer
	tracer    Tracer

	alerts    []api.Alert
	alertsMux sync.Mutex

	doneCh  chan struct{}
	readyCh chan struct{}
	readyB  bool
	wg      sync.WaitGroup

	// peerAdd
	paMux sync.Mutex

	// shutdown function and related variables
	shutdownLock sync.Mutex
	shutdownB    bool
	removed      bool

	curPingVal pingValue
}

// NewCluster builds a new IPFS Cluster peer. It initializes a LibP2P host,
// creates an RPC Server and client and sets up all components.
//
// The new cluster peer may still be performing initialization tasks when
// this call returns (consensus may still be bootstrapping). Use Cluster.Ready()
// if you need to wait until the peer is fully up.
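//
// A minimal construction sketch (illustrative only: the host, DHT,
// datastore and component values shown are assumptions of the caller,
// not defaults provided by this package):
//
//	c, err := NewCluster(ctx, h, dht, cfg, store, consensus, apis,
//		ipfs, tracker, monitor, allocator, informers, tracer)
//	if err != nil {
//		return err
//	}
//	<-c.Ready()          // optionally block until consensus has caught up
//	defer c.Shutdown(ctx)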
func NewCluster( ctx context.Context, host host.Host, dht *dual.DHT, cfg *Config, datastore ds.Datastore, consensus Consensus, apis []API, ipfs IPFSConnector, tracker PinTracker, monitor PeerMonitor, allocator PinAllocator, informers []Informer, tracer Tracer, ) (*Cluster, error) { err := cfg.Validate() if err != nil { return nil, err } if host == nil { return nil, errors.New("cluster host is nil") } if len(informers) == 0 { return nil, errors.New("no informers are passed") } ctx, cancel := context.WithCancel(ctx) listenAddrs := "" for _, addr := range host.Addrs() { listenAddrs += fmt.Sprintf(" %s/p2p/%s\n", addr, host.ID().Pretty()) } logger.Infof("IPFS Cluster v%s listening on:\n%s\n", version.Version, listenAddrs) peerManager := pstoremgr.New(ctx, host, cfg.GetPeerstorePath()) var mdnsSvc mdns.Service if cfg.MDNSInterval > 0 { mdnsSvc = mdns.NewMdnsService(host, mdnsServiceTag, peerManager) err = mdnsSvc.Start() if err != nil { logger.Warnf("mDNS could not be started: %s", err) } } c := &Cluster{ ctx: ctx, cancel: cancel, id: host.ID(), config: cfg, host: host, dht: dht, discovery: mdnsSvc, datastore: datastore, consensus: consensus, apis: apis, ipfs: ipfs, tracker: tracker, monitor: monitor, allocator: allocator, informers: informers, tracer: tracer, alerts: []api.Alert{}, peerManager: peerManager, shutdownB: false, removed: false, doneCh: make(chan struct{}), readyCh: make(chan struct{}), readyB: false, } // Import known cluster peers from peerstore file and config. Set // a non permanent TTL. c.peerManager.ImportPeersFromPeerstore(false, peerstore.AddressTTL) c.peerManager.ImportPeers(c.config.PeerAddresses, false, peerstore.AddressTTL) // Attempt to connect to some peers (up to bootstrapCount) connectedPeers := c.peerManager.Bootstrap(bootstrapCount) // We cannot warn when count is low as this as this is normal if going // to Join() later. logger.Debugf("bootstrap count %d", len(connectedPeers)) // Log a ping metric for every connected peer. This will make them // visible as peers without having to wait for them to send one. for _, p := range connectedPeers { if err := c.logPingMetric(ctx, p); err != nil { logger.Warn(err) } } // After setupRPC components can do their tasks with a fully operative // routed libp2p host with some connections and a working DHT (hopefully). err = c.setupRPC() if err != nil { c.Shutdown(ctx) return nil, err } c.setupRPCClients() // Note: It is very important to first call Add() once in a non-racy // place c.wg.Add(1) go func() { defer c.wg.Done() c.ready(ReadyTimeout) c.run() }() return c, nil } func (c *Cluster) setupRPC() error { rpcServer, err := newRPCServer(c) if err != nil { return err } c.rpcServer = rpcServer var rpcClient *rpc.Client if c.config.Tracing { csh := &ocgorpc.ClientHandler{} rpcClient = rpc.NewClientWithServer( c.host, version.RPCProtocol, rpcServer, rpc.WithClientStatsHandler(csh), ) } else { rpcClient = rpc.NewClientWithServer(c.host, version.RPCProtocol, rpcServer) } c.rpcClient = rpcClient return nil } func (c *Cluster) setupRPCClients() { c.ipfs.SetClient(c.rpcClient) c.tracker.SetClient(c.rpcClient) for _, api := range c.apis { api.SetClient(c.rpcClient) } c.consensus.SetClient(c.rpcClient) c.monitor.SetClient(c.rpcClient) c.allocator.SetClient(c.rpcClient) for _, informer := range c.informers { informer.SetClient(c.rpcClient) } } // watchPinset triggers recurrent operations that loop on the pinset. 
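//
// A rough sketch of the loop's cadence under assumed example settings
// (StateSyncInterval=10m, PinRecoverInterval=1h; both come from the
// configuration and are not defaults of this package):
//
//	t0      RecoverAllLocal (the recover timer fires immediately)
//	t0+10m  StateSync
//	t0+20m  StateSync
//	...
//	t0+1h   RecoverAllLocal, then again every PinRecoverInterval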
func (c *Cluster) watchPinset() { ctx, span := trace.StartSpan(c.ctx, "cluster/watchPinset") defer span.End() stateSyncTimer := time.NewTimer(c.config.StateSyncInterval) // Upon start, every item in the state that is not pinned will appear // as PinError when doing a Status, we should proceed to recover // (try pinning) all of those right away. recoverTimer := time.NewTimer(0) // 0 so that it does an initial recover right away // This prevents doing an StateSync while doing a RecoverAllLocal, // which is intended behavior as for very large pinsets for { select { case <-stateSyncTimer.C: logger.Debug("auto-triggering StateSync()") c.StateSync(ctx) stateSyncTimer.Reset(c.config.StateSyncInterval) case <-recoverTimer.C: logger.Debug("auto-triggering RecoverAllLocal()") out := make(chan api.PinInfo, 1024) go func() { for range out { } }() err := c.RecoverAllLocal(ctx, out) if err != nil { logger.Error(err) } recoverTimer.Reset(c.config.PinRecoverInterval) case <-c.ctx.Done(): if !stateSyncTimer.Stop() { <-stateSyncTimer.C } if !recoverTimer.Stop() { <-recoverTimer.C } return } } } // returns the smallest ttl from the metrics pushed by the informer. func (c *Cluster) sendInformerMetrics(ctx context.Context, informer Informer) (time.Duration, error) { ctx, span := trace.StartSpan(ctx, "cluster/sendInformerMetric") defer span.End() var minTTL time.Duration var errors error metrics := informer.GetMetrics(ctx) if len(metrics) == 0 { logger.Errorf("informer %s produced no metrics", informer.Name()) return minTTL, nil } for _, metric := range metrics { if metric.Discard() { // do not publish invalid metrics // the tags informer creates an invalid metric // when no tags are defined. continue } metric.Peer = c.id ttl := metric.GetTTL() if ttl > 0 && (ttl < minTTL || minTTL == 0) { minTTL = ttl } err := c.monitor.PublishMetric(ctx, metric) if multierr.AppendInto(&errors, err) { logger.Warnf("error sending metric %s: %s", metric.Name, err) } } return minTTL, errors } func (c *Cluster) sendInformersMetrics(ctx context.Context) error { ctx, span := trace.StartSpan(ctx, "cluster/sendInformersMetrics") defer span.End() var errors error for _, informer := range c.informers { _, err := c.sendInformerMetrics(ctx, informer) if multierr.AppendInto(&errors, err) { logger.Warnf("informer %s did not send all metrics", informer.Name()) } } return errors } // pushInformerMetrics loops and publishes informers metrics using the // cluster monitor. Metrics are pushed normally at a TTL/2 rate. If an error // occurs, they are pushed at a TTL/4 rate. func (c *Cluster) pushInformerMetrics(ctx context.Context, informer Informer) { ctx, span := trace.StartSpan(ctx, "cluster/pushInformerMetrics") defer span.End() timer := time.NewTimer(0) // fire immediately first // retries counts how many retries we have made retries := 0 // retryWarnMod controls how often do we log // "error broadcasting metric". // It will do it in the first error, and then on every // 10th. 
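	// As an illustration: with a minTTL of 30s (the fallback used below
	// when informers report no TTL), metrics are re-published every 15s
	// (TTL/2) while broadcasts succeed, and retried after ~7.5s (TTL/4)
	// after a failed broadcast.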
retryWarnMod := 10 for { select { case <-ctx.Done(): return case <-timer.C: // wait } minTTL, err := c.sendInformerMetrics(ctx, informer) if minTTL == 0 { minTTL = 30 * time.Second } if err != nil { if (retries % retryWarnMod) == 0 { logger.Errorf("error broadcasting metric: %s", err) retries++ } // retry sooner timer.Reset(minTTL / 4) continue } retries = 0 // send metric again in TTL/2 timer.Reset(minTTL / 2) } } func (c *Cluster) sendPingMetric(ctx context.Context) (api.Metric, error) { ctx, span := trace.StartSpan(ctx, "cluster/sendPingMetric") defer span.End() id := c.ID(ctx) newPingVal := pingValue{ Peername: id.Peername, IPFSID: id.IPFS.ID, IPFSAddresses: publicIPFSAddresses(id.IPFS.Addresses), } if c.curPingVal.Valid() && !newPingVal.Valid() { // i.e. ipfs down newPingVal = c.curPingVal // use last good value } c.curPingVal = newPingVal v, err := json.Marshal(newPingVal) if err != nil { logger.Error(err) // continue anyways } metric := api.Metric{ Name: pingMetricName, Peer: c.id, Valid: true, Value: string(v), } metric.SetTTL(c.config.MonitorPingInterval * 2) return metric, c.monitor.PublishMetric(ctx, metric) } // logPingMetric logs a ping metric as if it had been sent from PID. It is // used to make peers appear available as soon as we connect to them (without // having to wait for them to broadcast a metric). // // We avoid specifically sending a metric to a peer when we "connect" to it // because: a) this requires an extra. OPEN RPC endpoint (LogMetric) that can // be called by everyone b) We have no way of verifying that the peer ID in a // metric pushed is actually the issuer of the metric (something the regular // "pubsub" way of pushing metrics allows (by verifying the signature on the // message). Thus, this reduces chances of abuse until we have something // better. func (c *Cluster) logPingMetric(ctx context.Context, pid peer.ID) error { m := api.Metric{ Name: pingMetricName, Peer: pid, Valid: true, } m.SetTTL(c.config.MonitorPingInterval * 2) return c.monitor.LogMetric(ctx, m) } func (c *Cluster) pushPingMetrics(ctx context.Context) { ctx, span := trace.StartSpan(ctx, "cluster/pushPingMetrics") defer span.End() ticker := time.NewTicker(c.config.MonitorPingInterval) for { select { case <-ctx.Done(): return default: } c.sendPingMetric(ctx) select { case <-ctx.Done(): return case <-ticker.C: } } } // Alerts returns the last alerts recorded by this cluster peer with the most // recent first. func (c *Cluster) Alerts() []api.Alert { c.alertsMux.Lock() alerts := make([]api.Alert, len(c.alerts)) { total := len(alerts) for i, a := range c.alerts { alerts[total-1-i] = a } } c.alertsMux.Unlock() return alerts } // read the alerts channel from the monitor and triggers repins func (c *Cluster) alertsHandler() { for { select { case <-c.ctx.Done(): return case alrt := <-c.monitor.Alerts(): // Follower peers do not care about alerts. // They can do nothing about them. if c.config.FollowerMode { continue } logger.Warnf("metric alert for %s: Peer: %s.", alrt.Name, alrt.Peer) c.alertsMux.Lock() { if len(c.alerts) > maxAlerts { c.alerts = c.alerts[:0] } c.alerts = append(c.alerts, alrt) } c.alertsMux.Unlock() if alrt.Name != pingMetricName { continue // only handle ping alerts } if c.config.DisableRepinning { logger.Debugf("repinning is disabled. 
Will not re-allocate pins on alerts") return } cState, err := c.consensus.State(c.ctx) if err != nil { logger.Warn(err) return } distance, err := c.distances(c.ctx, alrt.Peer) if err != nil { logger.Warn(err) return } pinCh := make(chan api.Pin, 1024) go func() { err = cState.List(c.ctx, pinCh) if err != nil { logger.Warn(err) } }() for pin := range pinCh { if containsPeer(pin.Allocations, alrt.Peer) && distance.isClosest(pin.Cid) { c.repinFromPeer(c.ctx, alrt.Peer, pin) } } } } } // detects any changes in the peerset and saves the configuration. When it // detects that we have been removed from the peerset, it shuts down this peer. func (c *Cluster) watchPeers() { ticker := time.NewTicker(c.config.PeerWatchInterval) defer ticker.Stop() for { select { case <-c.ctx.Done(): return default: } select { case <-c.ctx.Done(): return case <-ticker.C: //logger.Debugf("%s watching peers", c.id) hasMe := false peers, err := c.consensus.Peers(c.ctx) if err != nil { logger.Error(err) continue } for _, p := range peers { if p == c.id { hasMe = true break } } if !hasMe { c.shutdownLock.Lock() defer c.shutdownLock.Unlock() logger.Info("peer no longer in peerset. Initiating shutdown") c.removed = true go c.Shutdown(c.ctx) return } } } } // reBootstrap regularly attempts to bootstrap (re-connect to peers from the // peerstore). This should ensure that we auto-recover from situations in // which the network was completely gone and we lost all peers. func (c *Cluster) reBootstrap() { ticker := time.NewTicker(reBootstrapInterval) defer ticker.Stop() for { select { case <-c.ctx.Done(): return case <-ticker.C: connected := c.peerManager.Bootstrap(bootstrapCount) for _, p := range connected { logger.Infof("reconnected to %s", p) } } } } // find all Cids pinned to a given peer and triggers re-pins on them. func (c *Cluster) vacatePeer(ctx context.Context, p peer.ID) { ctx, span := trace.StartSpan(ctx, "cluster/vacatePeer") defer span.End() if c.config.DisableRepinning { logger.Warnf("repinning is disabled. Will not re-allocate cids from %s", p.Pretty()) return } cState, err := c.consensus.State(ctx) if err != nil { logger.Warn(err) return } pinCh := make(chan api.Pin, 1024) go func() { err = cState.List(ctx, pinCh) if err != nil { logger.Warn(err) } }() for pin := range pinCh { if containsPeer(pin.Allocations, p) { c.repinFromPeer(ctx, p, pin) } } } // repinFromPeer triggers a repin on a given pin object blacklisting one of the // allocations. func (c *Cluster) repinFromPeer(ctx context.Context, p peer.ID, pin api.Pin) { ctx, span := trace.StartSpan(ctx, "cluster/repinFromPeer") defer span.End() pin.Allocations = nil // force re-allocations // note that pin() should not result in different allocations // if we are not under the replication-factor min. 
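	// Illustrative flow (peer names are hypothetical): a pin allocated
	// to [A, B, C] where B is the peer being vacated is passed to pin()
	// with Allocations cleared and blacklist=[B]; allocate() then picks
	// a replacement set that avoids B, e.g. [A, C, D].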
_, ok, err := c.pin(ctx, pin, []peer.ID{p}) if ok && err == nil { logger.Infof("repinned %s out of %s", pin.Cid, p.Pretty()) } } // run launches some go-routines which live throughout the cluster's life func (c *Cluster) run() { c.wg.Add(1) go func() { defer c.wg.Done() c.watchPinset() }() c.wg.Add(1) go func() { defer c.wg.Done() c.pushPingMetrics(c.ctx) }() c.wg.Add(len(c.informers)) for _, informer := range c.informers { go func(inf Informer) { defer c.wg.Done() c.pushInformerMetrics(c.ctx, inf) }(informer) } c.wg.Add(1) go func() { defer c.wg.Done() c.watchPeers() }() c.wg.Add(1) go func() { defer c.wg.Done() c.alertsHandler() }() c.wg.Add(1) go func() { defer c.wg.Done() c.reBootstrap() }() } func (c *Cluster) ready(timeout time.Duration) { ctx, span := trace.StartSpan(c.ctx, "cluster/ready") defer span.End() // We bootstrapped first because with dirty state consensus // may have a peerset and not find a leader so we cannot wait // for it. timer := time.NewTimer(timeout) select { case <-timer.C: logger.Error("***** ipfs-cluster consensus start timed out (tips below) *****") logger.Error(` ************************************************** This peer was not able to become part of the cluster. This might be due to one or several causes: - Check the logs above this message for errors - Check that there is connectivity to the "peers" multiaddresses - Check that all cluster peers are using the same "secret" - Check that this peer is reachable on its "listen_multiaddress" by all peers - Check that the current cluster is healthy (has a leader). Otherwise make sure to start enough peers so that a leader election can happen. - Check that the peer(s) you are trying to connect to is running the same version of IPFS-cluster. ************************************************** `) c.Shutdown(ctx) return case <-c.consensus.Ready(ctx): // Consensus ready means the state is up to date. case <-c.ctx.Done(): return } // Cluster is ready. peers, err := c.consensus.Peers(ctx) if err != nil { logger.Error(err) c.Shutdown(ctx) return } logger.Info("Cluster Peers (without including ourselves):") if len(peers) == 1 { logger.Info(" - No other peers") } for _, p := range peers { if p != c.id { logger.Infof(" - %s", p.Pretty()) } } close(c.readyCh) c.shutdownLock.Lock() c.readyB = true c.shutdownLock.Unlock() logger.Info("** IPFS Cluster is READY **") daemon.SdNotify(false, daemon.SdNotifyReady) } // Ready returns a channel which signals when this peer is // fully initialized (including consensus). func (c *Cluster) Ready() <-chan struct{} { return c.readyCh } // Shutdown performs all the necessary operations to shutdown // the IPFS Cluster peer: // * Save peerstore with the current peers // * Remove itself from consensus when LeaveOnShutdown is set // * It Shutdowns all the components // * Collects all goroutines // // Shutdown does not close the libp2p host, the DHT, the datastore or // generally anything that Cluster did not create. func (c *Cluster) Shutdown(ctx context.Context) error { _, span := trace.StartSpan(ctx, "cluster/Shutdown") defer span.End() ctx = trace.NewContext(c.ctx, span) c.shutdownLock.Lock() defer c.shutdownLock.Unlock() if c.shutdownB { logger.Debug("Cluster is already shutdown") return nil } logger.Info("shutting down Cluster") // Shutdown APIs first, avoids more requests coming through. for _, api := range c.apis { if err := api.Shutdown(ctx); err != nil { logger.Errorf("error stopping API: %s", err) return err } } // Cancel discovery service (this shutdowns announcing). 
Handling // entries is canceled along with the context below. if c.discovery != nil { c.discovery.Close() } // Try to store peerset file for all known peers whatsoever // if we got ready (otherwise, don't overwrite anything) if c.readyB { // Ignoring error since it's a best-effort c.peerManager.SavePeerstoreForPeers(c.host.Peerstore().Peers()) } // Only attempt to leave if: // - consensus is initialized // - cluster was ready (no bootstrapping error) // - We are not removed already (means watchPeers() called us) if c.consensus != nil && c.config.LeaveOnShutdown && c.readyB && !c.removed { c.removed = true _, err := c.consensus.Peers(ctx) if err == nil { // best effort logger.Warn("attempting to leave the cluster. This may take some seconds") err := c.consensus.RmPeer(ctx, c.id) if err != nil { logger.Error("leaving cluster: " + err.Error()) } } } if con := c.consensus; con != nil { if err := con.Shutdown(ctx); err != nil { logger.Errorf("error stopping consensus: %s", err) return err } } // We left the cluster or were removed. Remove any consensus-specific // state. if c.removed && c.readyB { err := c.consensus.Clean(ctx) if err != nil { logger.Error("cleaning consensus: ", err) } } if err := c.monitor.Shutdown(ctx); err != nil { logger.Errorf("error stopping monitor: %s", err) return err } if err := c.ipfs.Shutdown(ctx); err != nil { logger.Errorf("error stopping IPFS Connector: %s", err) return err } if err := c.tracker.Shutdown(ctx); err != nil { logger.Errorf("error stopping PinTracker: %s", err) return err } for _, inf := range c.informers { if err := inf.Shutdown(ctx); err != nil { logger.Errorf("error stopping informer: %s", err) return err } } if err := c.tracer.Shutdown(ctx); err != nil { logger.Errorf("error stopping Tracer: %s", err) return err } c.cancel() c.wg.Wait() c.shutdownB = true close(c.doneCh) return nil } // Done provides a way to learn if the Peer has been shutdown // (for example, because it has been removed from the Cluster) func (c *Cluster) Done() <-chan struct{} { return c.doneCh } // ID returns information about the Cluster peer func (c *Cluster) ID(ctx context.Context) api.ID { _, span := trace.StartSpan(ctx, "cluster/ID") defer span.End() ctx = trace.NewContext(c.ctx, span) // ignore error since it is included in response object ipfsID, err := c.ipfs.ID(ctx) if err != nil { ipfsID = api.IPFSID{ Error: err.Error(), } } var addrs []api.Multiaddr mAddrs, err := peer.AddrInfoToP2pAddrs(&peer.AddrInfo{ID: c.id, Addrs: c.host.Addrs()}) if err == nil { for _, mAddr := range mAddrs { addrs = append(addrs, api.NewMultiaddrWithValue(mAddr)) } } peers := []peer.ID{} // This method might get called very early by a remote peer // and might catch us when consensus is not set if c.consensus != nil { peers, _ = c.consensus.Peers(ctx) } clusterPeerInfos := c.peerManager.PeerInfos(peers) addresses := []api.Multiaddr{} for _, pinfo := range clusterPeerInfos { addrs, err := peer.AddrInfoToP2pAddrs(&pinfo) if err != nil { continue } for _, a := range addrs { addresses = append(addresses, api.NewMultiaddrWithValue(a)) } } id := api.ID{ ID: c.id, // PublicKey: c.host.Peerstore().PubKey(c.id), Addresses: addrs, ClusterPeers: peers, ClusterPeersAddresses: addresses, Version: version.Version.String(), RPCProtocolVersion: version.RPCProtocol, IPFS: ipfsID, Peername: c.config.Peername, } if err != nil { id.Error = err.Error() } return id } // PeerAdd adds a new peer to this Cluster. 
// // For it to work well, the new peer should be discoverable // (part of our peerstore or connected to one of the existing peers) // and reachable. Since PeerAdd allows to add peers which are // not running, or reachable, it is recommended to call Join() from the // new peer instead. // // The new peer ID will be passed to the consensus // component to be added to the peerset. func (c *Cluster) PeerAdd(ctx context.Context, pid peer.ID) (*api.ID, error) { _, span := trace.StartSpan(ctx, "cluster/PeerAdd") defer span.End() ctx = trace.NewContext(c.ctx, span) // starting 10 nodes on the same box for testing // causes deadlock and a global lock here // seems to help. c.paMux.Lock() defer c.paMux.Unlock() logger.Debugf("peerAdd called with %s", pid.Pretty()) // Let the consensus layer be aware of this peer err := c.consensus.AddPeer(ctx, pid) if err != nil { logger.Error(err) id := &api.ID{ID: pid, Error: err.Error()} return id, err } logger.Info("Peer added ", pid.Pretty()) addedID, err := c.getIDForPeer(ctx, pid) if err != nil { return addedID, err } if !containsPeer(addedID.ClusterPeers, c.id) { addedID.ClusterPeers = append(addedID.ClusterPeers, c.id) } return addedID, nil } // PeerRemove removes a peer from this Cluster. // // The peer will be removed from the consensus peerset. // This may first trigger repinnings for all content if not disabled. func (c *Cluster) PeerRemove(ctx context.Context, pid peer.ID) error { _, span := trace.StartSpan(ctx, "cluster/PeerRemove") defer span.End() ctx = trace.NewContext(c.ctx, span) // We need to repin before removing the peer, otherwise, it won't // be able to submit the pins. logger.Infof("re-allocating all CIDs directly associated to %s", pid) c.vacatePeer(ctx, pid) err := c.consensus.RmPeer(ctx, pid) if err != nil { logger.Error(err) return err } logger.Info("Peer removed ", pid.Pretty()) return nil } // Join adds this peer to an existing cluster by bootstrapping to a // given multiaddress. It works by calling PeerAdd on the destination // cluster and making sure that the new peer is ready to discover and contact // the rest. func (c *Cluster) Join(ctx context.Context, addr ma.Multiaddr) error { _, span := trace.StartSpan(ctx, "cluster/Join") defer span.End() ctx = trace.NewContext(c.ctx, span) logger.Debugf("Join(%s)", addr) // Add peer to peerstore so we can talk to it pid, err := c.peerManager.ImportPeer(addr, false, peerstore.PermanentAddrTTL) if err != nil { return err } if pid == c.id { return nil } // Note that PeerAdd() on the remote peer will // figure out what our real address is (obviously not // ListenAddr). var myID api.ID err = c.rpcClient.CallContext( ctx, pid, "Cluster", "PeerAdd", c.id, &myID, ) if err != nil { logger.Error(err) return err } // Log a fake but valid metric from the peer we are // contacting. This will signal a CRDT component that // we know that peer since we have metrics for it without // having to wait for the next metric round. if err := c.logPingMetric(ctx, pid); err != nil { logger.Warn(err) } // Broadcast our metrics to the world err = c.sendInformersMetrics(ctx) if err != nil { logger.Warn(err) } _, err = c.sendPingMetric(ctx) if err != nil { logger.Warn(err) } // We need to trigger a DHT bootstrap asap for this peer to not be // lost if the peer it bootstrapped to goes down. We do this manually // by triggering 1 round of bootstrap in the background. // Note that our regular bootstrap process is still running in the // background since we created the cluster. 
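	// The one-off refresh below queries both the LAN and the WAN DHT
	// routing tables once; their errors are only logged at debug level
	// because single-peer clusters produce them routinely.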
c.wg.Add(1) go func() { defer c.wg.Done() select { case err := <-c.dht.LAN.RefreshRoutingTable(): if err != nil { // this error is quite chatty // on single peer clusters logger.Debug(err) } case <-c.ctx.Done(): return } select { case err := <-c.dht.WAN.RefreshRoutingTable(): if err != nil { // this error is quite chatty // on single peer clusters logger.Debug(err) } case <-c.ctx.Done(): return } }() // ConnectSwarms in the background after a while, when we have likely // received some metrics. time.AfterFunc(c.config.MonitorPingInterval, func() { c.ipfs.ConnectSwarms(ctx) }) // wait for leader and for state to catch up // then sync err = c.consensus.WaitForSync(ctx) if err != nil { logger.Error(err) return err } // Start pinning items in the state that are not on IPFS yet. out := make(chan api.PinInfo, 1024) // discard outputs go func() { for range out { } }() go c.RecoverAllLocal(ctx, out) logger.Infof("%s: joined %s's cluster", c.id.Pretty(), pid.Pretty()) return nil } // Distances returns a distance checker using current trusted peers. // It can optionally receive a peer ID to exclude from the checks. func (c *Cluster) distances(ctx context.Context, exclude peer.ID) (*distanceChecker, error) { trustedPeers, err := c.getTrustedPeers(ctx, exclude) if err != nil { logger.Error("could not get trusted peers:", err) return nil, err } return &distanceChecker{ local: c.id, otherPeers: trustedPeers, cache: make(map[peer.ID]distance, len(trustedPeers)+1), }, nil } // StateSync performs maintenance tasks on the global state that require // looping through all the items. It is triggered automatically on // StateSyncInterval. Currently it: // * Sends unpin for expired items for which this peer is "closest" // (skipped for follower peers) func (c *Cluster) StateSync(ctx context.Context) error { _, span := trace.StartSpan(ctx, "cluster/StateSync") defer span.End() logger.Debug("StateSync") ctx = trace.NewContext(c.ctx, span) if c.config.FollowerMode { return nil } cState, err := c.consensus.State(ctx) if err != nil { return err } timeNow := time.Now() // Only trigger pin operations if we are the closest with respect to // other trusted peers. We cannot know if our peer ID is trusted by // other peers in the Cluster. This assumes yes. Setting FollowerMode // is a way to assume the opposite and skip this completely. distance, err := c.distances(ctx, "") if err != nil { return err // could not list peers } clusterPins := make(chan api.Pin, 1024) go func() { err = cState.List(ctx, clusterPins) if err != nil { logger.Error(err) } }() // Unpin expired items when we are the closest peer to them. for p := range clusterPins { if p.ExpiredAt(timeNow) && distance.isClosest(p.Cid) { logger.Infof("Unpinning %s: pin expired at %s", p.Cid, p.ExpireAt) if _, err := c.Unpin(ctx, p.Cid); err != nil { logger.Error(err) } } } return nil } // StatusAll returns the GlobalPinInfo for all tracked Cids in all peers on // the out channel. This is done by broacasting a StatusAll to all peers. If // an error happens, it is returned. This method blocks until it finishes. The // operation can be aborted by canceling the context. 
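//
// A minimal caller sketch (the channel size and the filter value are
// arbitrary choices for illustration):
//
//	out := make(chan api.GlobalPinInfo, 1024)
//	go func() {
//		for gpi := range out {
//			fmt.Println(gpi.Cid)
//		}
//	}()
//	err := c.StatusAll(ctx, api.TrackerStatusPinned, out)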
func (c *Cluster) StatusAll(ctx context.Context, filter api.TrackerStatus, out chan<- api.GlobalPinInfo) error {
	_, span := trace.StartSpan(ctx, "cluster/StatusAll")
	defer span.End()
	ctx = trace.NewContext(c.ctx, span)

	in := make(chan api.TrackerStatus, 1)
	in <- filter
	close(in)
	return c.globalPinInfoStream(ctx, "PinTracker", "StatusAll", in, out)
}

// StatusAllLocal returns the PinInfo for all the tracked Cids in this peer on
// the out channel. It blocks until finished.
func (c *Cluster) StatusAllLocal(ctx context.Context, filter api.TrackerStatus, out chan<- api.PinInfo) error {
	_, span := trace.StartSpan(ctx, "cluster/StatusAllLocal")
	defer span.End()
	ctx = trace.NewContext(c.ctx, span)

	return c.tracker.StatusAll(ctx, filter, out)
}

// Status returns the GlobalPinInfo for a given Cid as fetched from all
// current peers. If an error happens, the GlobalPinInfo should contain
// as much information as could be fetched from the other peers.
func (c *Cluster) Status(ctx context.Context, h api.Cid) (api.GlobalPinInfo, error) {
	_, span := trace.StartSpan(ctx, "cluster/Status")
	defer span.End()
	ctx = trace.NewContext(c.ctx, span)

	return c.globalPinInfoCid(ctx, "PinTracker", "Status", h)
}

// StatusLocal returns this peer's PinInfo for a given Cid.
func (c *Cluster) StatusLocal(ctx context.Context, h api.Cid) api.PinInfo {
	_, span := trace.StartSpan(ctx, "cluster/StatusLocal")
	defer span.End()
	ctx = trace.NewContext(c.ctx, span)

	return c.tracker.Status(ctx, h)
}

// used for RecoverLocal and SyncLocal.
func (c *Cluster) localPinInfoOp(
	ctx context.Context,
	h api.Cid,
	f func(context.Context, api.Cid) (api.PinInfo, error),
) (pInfo api.PinInfo, err error) {
	ctx, span := trace.StartSpan(ctx, "cluster/localPinInfoOp")
	defer span.End()

	cids, err := c.cidsFromMetaPin(ctx, h)
	if err != nil {
		return api.PinInfo{}, err
	}

	for _, ci := range cids {
		pInfo, err = f(ctx, ci)
		if err != nil {
			logger.Error("tracker.SyncCid() returned with error: ", err)
			logger.Error("Is the ipfs daemon running?")
			break
		}
	}
	// return the last pInfo/err, should be the root Cid if everything ok
	return pInfo, err
}

// RecoverAll triggers a RecoverAllLocal operation on all peers and returns
// GlobalPinInfo objects for all recovered items. This method blocks until
// finished. The operation can be aborted by canceling the context.
func (c *Cluster) RecoverAll(ctx context.Context, out chan<- api.GlobalPinInfo) error {
	_, span := trace.StartSpan(ctx, "cluster/RecoverAll")
	defer span.End()
	ctx = trace.NewContext(c.ctx, span)

	return c.globalPinInfoStream(ctx, "Cluster", "RecoverAllLocal", nil, out)
}

// RecoverAllLocal triggers a RecoverLocal operation for all Cids tracked
// by this peer.
//
// Recover operations ask IPFS to pin or unpin items in error state. Recover
// is faster than calling Pin on the same CID as it avoids committing an
// identical pin to the consensus layer.
//
// It returns the list of pins that were re-queued for pinning on the out
// channel. It blocks until done.
//
// RecoverAllLocal is called automatically every PinRecoverInterval.
func (c *Cluster) RecoverAllLocal(ctx context.Context, out chan<- api.PinInfo) error {
	_, span := trace.StartSpan(ctx, "cluster/RecoverAllLocal")
	defer span.End()
	ctx = trace.NewContext(c.ctx, span)

	return c.tracker.RecoverAll(ctx, out)
}

// Recover triggers a recover operation for a given Cid in all
// cluster peers.
//
// Recover operations ask IPFS to pin or unpin items in error state.
Recover // is faster than calling Pin on the same CID as it avoids committing an // identical pin to the consensus layer. func (c *Cluster) Recover(ctx context.Context, h api.Cid) (api.GlobalPinInfo, error) { _, span := trace.StartSpan(ctx, "cluster/Recover") defer span.End() ctx = trace.NewContext(c.ctx, span) return c.globalPinInfoCid(ctx, "PinTracker", "Recover", h) } // RecoverLocal triggers a recover operation for a given Cid in this peer only. // It returns the updated PinInfo, after recovery. // // Recover operations ask IPFS to pin or unpin items in error state. Recover // is faster than calling Pin on the same CID as it avoids committing an // identical pin to the consensus layer. func (c *Cluster) RecoverLocal(ctx context.Context, h api.Cid) (api.PinInfo, error) { _, span := trace.StartSpan(ctx, "cluster/RecoverLocal") defer span.End() ctx = trace.NewContext(c.ctx, span) return c.localPinInfoOp(ctx, h, c.tracker.Recover) } // Pins sends pins on the given out channel as it iterates the full // pinset (current global state). This is the source of truth as to which pins // are managed and their allocation, but does not indicate if the item is // successfully pinned. For that, use the Status*() methods. // // The operation can be aborted by canceling the context. This methods blocks // until the operation has completed. func (c *Cluster) Pins(ctx context.Context, out chan<- api.Pin) error { _, span := trace.StartSpan(ctx, "cluster/Pins") defer span.End() ctx = trace.NewContext(c.ctx, span) cState, err := c.consensus.State(ctx) if err != nil { logger.Error(err) return err } return cState.List(ctx, out) } // pinsSlice returns the list of Cids managed by Cluster and which are part // of the current global state. This is the source of truth as to which // pins are managed and their allocation, but does not indicate if // the item is successfully pinned. For that, use StatusAll(). // // It is recommended to use PinsChannel(), as this method is equivalent to // loading the full pinset in memory! func (c *Cluster) pinsSlice(ctx context.Context) ([]api.Pin, error) { out := make(chan api.Pin, 1024) var err error go func() { err = c.Pins(ctx, out) }() var pins []api.Pin for pin := range out { pins = append(pins, pin) } return pins, err } // PinGet returns information for a single Cid managed by Cluster. // The information is obtained from the current global state. The // returned api.Pin provides information about the allocations // assigned for the requested Cid, but does not indicate if // the item is successfully pinned. For that, use Status(). PinGet // returns an error if the given Cid is not part of the global state. func (c *Cluster) PinGet(ctx context.Context, h api.Cid) (api.Pin, error) { _, span := trace.StartSpan(ctx, "cluster/PinGet") defer span.End() ctx = trace.NewContext(c.ctx, span) st, err := c.consensus.State(ctx) if err != nil { return api.Pin{}, err } pin, err := st.Get(ctx, h) if err != nil { return api.Pin{}, err } return pin, nil } // Pin makes the cluster Pin a Cid. This implies adding the Cid // to the IPFS Cluster peers shared-state. Depending on the cluster // pinning strategy, the PinTracker may then request the IPFS daemon // to pin the Cid. // // Pin returns the Pin as stored in the global state (with the given // allocations and an error if the operation could not be persisted. Pin does // not reflect the success or failure of underlying IPFS daemon pinning // operations which happen in async fashion. 
// // If the options UserAllocations are non-empty then these peers are pinned // with priority over other peers in the cluster. If the max repl factor is // less than the size of the specified peerset then peers are chosen from this // set in allocation order. If the minimum repl factor is greater than the // size of this set then the remaining peers are allocated in order from the // rest of the cluster. Priority allocations are best effort. If any priority // peers are unavailable then Pin will simply allocate from the rest of the // cluster. // // If the Update option is set, the pin options (including allocations) will // be copied from an existing one. This is equivalent to running PinUpdate. func (c *Cluster) Pin(ctx context.Context, h api.Cid, opts api.PinOptions) (api.Pin, error) { _, span := trace.StartSpan(ctx, "cluster/Pin") defer span.End() ctx = trace.NewContext(c.ctx, span) pin := api.PinWithOpts(h, opts) result, _, err := c.pin(ctx, pin, []peer.ID{}) return result, err } // sets the default replication factor in a pin when it's set to 0 func (c *Cluster) setupReplicationFactor(pin api.Pin) (api.Pin, error) { rplMin := pin.ReplicationFactorMin rplMax := pin.ReplicationFactorMax if rplMin == 0 { rplMin = c.config.ReplicationFactorMin pin.ReplicationFactorMin = rplMin } if rplMax == 0 { rplMax = c.config.ReplicationFactorMax pin.ReplicationFactorMax = rplMax } // When pinning everywhere, remove all allocations. // Allocations may have been preset by the adder // for the cases when the replication factor is > -1. // Fixes part of #1319: allocations when adding // are kept. if pin.IsPinEverywhere() { pin.Allocations = nil } return pin, isReplicationFactorValid(rplMin, rplMax) } // basic checks on the pin type to check it's well-formed. func checkPinType(pin api.Pin) error { switch pin.Type { case api.DataType: if pin.Reference != nil { return errors.New("data pins should not reference other pins") } case api.ShardType: if pin.MaxDepth != 1 { return errors.New("must pin shards go depth 1") } // FIXME: indirect shard pins could have max-depth 2 // FIXME: repinning a shard type will overwrite replication // factor from previous: // if existing.ReplicationFactorMin != rplMin || // existing.ReplicationFactorMax != rplMax { // return errors.New("shard update with wrong repl factors") //} case api.ClusterDAGType: if pin.MaxDepth != 0 { return errors.New("must pin roots directly") } if pin.Reference == nil { return errors.New("clusterDAG pins should reference a Meta pin") } case api.MetaType: if len(pin.Allocations) != 0 { return errors.New("meta pin should not specify allocations") } if pin.Reference == nil { return errors.New("metaPins should reference a ClusterDAG") } default: return errors.New("unrecognized pin type") } return nil } // setupPin ensures that the Pin object is fit for pinning. We check // and set the replication factors and ensure that the pinType matches the // metadata consistently. 
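//
// For example (cluster configuration values assumed, not defaults): a
// pin submitted with ReplicationFactorMin/Max left at 0 on a cluster
// configured with min=2 and max=3 ends up with factors 2 and 3, while
// a pin with both factors set to -1 ("pin everywhere") additionally
// has any preset Allocations cleared.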
func (c *Cluster) setupPin(ctx context.Context, pin, existing api.Pin) (api.Pin, error) { _, span := trace.StartSpan(ctx, "cluster/setupPin") defer span.End() var err error pin, err = c.setupReplicationFactor(pin) if err != nil { return pin, err } if !pin.ExpireAt.IsZero() && pin.ExpireAt.Before(time.Now()) { return pin, errors.New("pin.ExpireAt set before current time") } if !existing.Defined() { return pin, nil } // If an pin CID is already pin, we do a couple more checks if existing.Type != pin.Type { msg := "cannot repin CID with different tracking method, " msg += "clear state with pin rm to proceed. " msg += "New: %s. Was: %s" return pin, fmt.Errorf(msg, pin.Type, existing.Type) } if existing.Mode == api.PinModeRecursive && pin.Mode != api.PinModeRecursive { msg := "cannot repin a CID which is already pinned in " msg += "recursive mode (new pin is pinned as %s). Unpin it first." return pin, fmt.Errorf(msg, pin.Mode) } return pin, checkPinType(pin) } // pin performs the actual pinning and supports a blacklist to be able to // evacuate a node and returns the pin object that it tried to pin, whether // the pin was submitted to the consensus layer or skipped (due to error or to // the fact that it was already valid) and error. // // This is the method called by the Cluster.Pin RPC endpoint. func (c *Cluster) pin( ctx context.Context, pin api.Pin, blacklist []peer.ID, ) (api.Pin, bool, error) { ctx, span := trace.StartSpan(ctx, "cluster/pin") defer span.End() if c.config.FollowerMode { return api.Pin{}, false, errFollowerMode } if !pin.Cid.Defined() { return pin, false, errors.New("bad pin object") } // Handle pin updates when the option is set if update := pin.PinUpdate; update.Defined() && !update.Equals(pin.Cid) { pin, err := c.PinUpdate(ctx, update, pin.Cid, pin.PinOptions) return pin, true, err } existing, err := c.PinGet(ctx, pin.Cid) if err != nil && err != state.ErrNotFound { return pin, false, err } pin, err = c.setupPin(ctx, pin, existing) if err != nil { return pin, false, err } // Set the Pin timestamp to now(). This is not an user-controllable // "option". pin.Timestamp = time.Now() if pin.Type == api.MetaType { return pin, true, c.consensus.LogPin(ctx, pin) } // We did not change ANY options and the pin exists so we just repin // what there is without doing new allocations. While this submits // pins to the consensus layer even if they are, this should trigger the // pin tracker and allows users to get re-pin operations by re-adding // without having to use recover, which is naturally expected. // // blacklist is set on repinFromPeer having any blacklisted peers // means we are repinning and need to trigger allocate(), therefore we // can't overwrite the incoming pin (which has Allocations set to // nil). if existing.Defined() && pin.PinOptions.Equals(existing.PinOptions) && len(blacklist) == 0 { pin = existing } // Usually allocations are unset when pinning normally, however, the // allocations may have been preset by the adder in which case they // need to be respected. Whenever allocations are set. We don't // re-allocate. repinFromPeer() unsets allocations for this reason. // allocate() will check which peers are currently allocated // and try to respect them. if len(pin.Allocations) == 0 { // If replication factor is -1, this will return empty // allocations. 
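	// allocate() is given the pin's CID, the existing pin (if any), the
	// replication window, the blacklist and any user-priority
	// allocations, and returns the chosen peer set.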
allocs, err := c.allocate( ctx, pin.Cid, existing, pin.ReplicationFactorMin, pin.ReplicationFactorMax, blacklist, pin.UserAllocations, ) if err != nil { return pin, false, err } pin.Allocations = allocs } // If this is true, replication factor should be -1. if len(pin.Allocations) == 0 { logger.Infof("pinning %s everywhere:", pin.Cid) } else { logger.Infof("pinning %s on %s:", pin.Cid, pin.Allocations) } return pin, true, c.consensus.LogPin(ctx, pin) } // Unpin removes a previously pinned Cid from Cluster. It returns // the global state Pin object as it was stored before removal, or // an error if it was not possible to update the global state. // // Unpin does not reflect the success or failure of underlying IPFS daemon // unpinning operations, which happen in async fashion. func (c *Cluster) Unpin(ctx context.Context, h api.Cid) (api.Pin, error) { _, span := trace.StartSpan(ctx, "cluster/Unpin") defer span.End() ctx = trace.NewContext(c.ctx, span) if c.config.FollowerMode { return api.Pin{}, errFollowerMode } logger.Info("IPFS cluster unpinning:", h) pin, err := c.PinGet(ctx, h) if err != nil { return api.Pin{}, err } switch pin.Type { case api.DataType: return pin, c.consensus.LogUnpin(ctx, pin) case api.ShardType: err := "cannot unpin a shard directly. Unpin content root CID instead" return pin, errors.New(err) case api.MetaType: // Unpin cluster dag and referenced shards err := c.unpinClusterDag(pin) if err != nil { return pin, err } return pin, c.consensus.LogUnpin(ctx, pin) case api.ClusterDAGType: err := "cannot unpin a Cluster DAG directly. Unpin content root CID instead" return pin, errors.New(err) default: return pin, errors.New("unrecognized pin type") } } // unpinClusterDag unpins the clusterDAG metadata node and the shard metadata // nodes that it references. It handles the case where multiple parents // reference the same metadata node, only unpinning those nodes without // existing references func (c *Cluster) unpinClusterDag(metaPin api.Pin) error { ctx, span := trace.StartSpan(c.ctx, "cluster/unpinClusterDag") defer span.End() cids, err := c.cidsFromMetaPin(ctx, metaPin.Cid) if err != nil { return err } // TODO: FIXME: potentially unpinning shards which are referenced // by other clusterDAGs. for _, ci := range cids { err = c.consensus.LogUnpin(ctx, api.PinCid(ci)) if err != nil { return err } } return nil } // PinUpdate pins a new CID based on an existing cluster Pin. The allocations // and most pin options (replication factors) are copied from the existing // Pin. The options object can be used to set the Name for the new pin and // might support additional options in the future. // // The from pin is NOT unpinned upon completion. The new pin might take // advantage of efficient pin/update operation on IPFS-side (if the // IPFSConnector supports it - the default one does). This may offer // significant speed when pinning items which are similar to previously pinned // content. func (c *Cluster) PinUpdate(ctx context.Context, from api.Cid, to api.Cid, opts api.PinOptions) (api.Pin, error) { existing, err := c.PinGet(ctx, from) if err != nil { // including when the existing pin is not found return api.Pin{}, err } // Hector: I am not sure whether it has any point to update something // like a MetaType. 
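	// For now only DataType pins can be updated; shard, meta and
	// cluster-DAG pins are rejected just below.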
if existing.Type != api.DataType { return api.Pin{}, errors.New("this pin type cannot be updated") } existing.Cid = to existing.PinUpdate = from existing.Timestamp = time.Now() if opts.Name != "" { existing.Name = opts.Name } if !opts.ExpireAt.IsZero() && opts.ExpireAt.After(time.Now()) { existing.ExpireAt = opts.ExpireAt } return existing, c.consensus.LogPin(ctx, existing) } // PinPath pins an CID resolved from its IPFS Path. It returns the resolved // Pin object. func (c *Cluster) PinPath(ctx context.Context, path string, opts api.PinOptions) (api.Pin, error) { _, span := trace.StartSpan(ctx, "cluster/PinPath") defer span.End() ctx = trace.NewContext(c.ctx, span) ci, err := c.ipfs.Resolve(ctx, path) if err != nil { return api.Pin{}, err } return c.Pin(ctx, ci, opts) } // UnpinPath unpins a CID resolved from its IPFS Path. If returns the // previously pinned Pin object. func (c *Cluster) UnpinPath(ctx context.Context, path string) (api.Pin, error) { _, span := trace.StartSpan(ctx, "cluster/UnpinPath") defer span.End() ctx = trace.NewContext(c.ctx, span) ci, err := c.ipfs.Resolve(ctx, path) if err != nil { return api.Pin{}, err } return c.Unpin(ctx, ci) } // AddFile adds a file to the ipfs daemons of the cluster. The ipfs importer // pipeline is used to DAGify the file. Depending on input parameters this // DAG can be added locally to the calling cluster peer's ipfs repo, or // sharded across the entire cluster. func (c *Cluster) AddFile(ctx context.Context, reader *multipart.Reader, params api.AddParams) (api.Cid, error) { // TODO: add context param and tracing var dags adder.ClusterDAGService if params.Shard { dags = sharding.New(ctx, c.rpcClient, params, nil) } else { dags = single.New(ctx, c.rpcClient, params, params.Local) } add := adder.New(dags, params, nil) return add.FromMultipart(ctx, reader) } // Version returns the current IPFS Cluster version. func (c *Cluster) Version() string { return version.Version.String() } // Peers returns the IDs of the members of this Cluster on the out channel. // This method blocks until it has finished. func (c *Cluster) Peers(ctx context.Context, out chan<- api.ID) { _, span := trace.StartSpan(ctx, "cluster/Peers") defer span.End() ctx = trace.NewContext(c.ctx, span) peers, err := c.consensus.Peers(ctx) if err != nil { logger.Error(err) logger.Error("an empty list of peers will be returned") close(out) return } c.peersWithFilter(ctx, peers, out) } // requests IDs from a given number of peers. func (c *Cluster) peersWithFilter(ctx context.Context, peers []peer.ID, out chan<- api.ID) { defer close(out) // We should be done relatively quickly with this call. Otherwise // report errors. timeout := 15 * time.Second ctxCall, cancel := context.WithTimeout(ctx, timeout) defer cancel() in := make(chan struct{}) close(in) idsOut := make(chan api.ID, len(peers)) errCh := make(chan []error, 1) go func() { defer close(errCh) errCh <- c.rpcClient.MultiStream( ctxCall, peers, "Cluster", "IDStream", in, idsOut, ) }() // Unfortunately, we need to use idsOut as intermediary channel // because it is closed when MultiStream ends and we cannot keep // adding things on it (the errors below). for id := range idsOut { select { case <-ctx.Done(): logger.Errorf("Peers call aborted: %s", ctx.Err()) return case out <- id: } } // ErrCh will always be closed on context cancellation too. 
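	// MultiStream returns one error slot per contacted peer (nil on
	// success); the non-nil, non-authorization entries are converted
	// below into api.ID values carrying the error string.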
errs := <-errCh for i, err := range errs { if err == nil { continue } if rpc.IsAuthorizationError(err) { continue } select { case <-ctx.Done(): logger.Errorf("Peers call aborted: %s", ctx.Err()) case out <- api.ID{ ID: peers[i], Error: err.Error(), }: } } } // getTrustedPeers gives listed of trusted peers except the current peer and // the excluded peer if provided. func (c *Cluster) getTrustedPeers(ctx context.Context, exclude peer.ID) ([]peer.ID, error) { peers, err := c.consensus.Peers(ctx) if err != nil { return nil, err } trustedPeers := make([]peer.ID, 0, len(peers)) for _, p := range peers { if p == c.id || p == exclude || !c.consensus.IsTrustedPeer(ctx, p) { continue } trustedPeers = append(trustedPeers, p) } return trustedPeers, nil } func (c *Cluster) setTrackerStatus(gpin *api.GlobalPinInfo, h api.Cid, peers []peer.ID, status api.TrackerStatus, pin api.Pin, t time.Time) { for _, p := range peers { pv := pingValueFromMetric(c.monitor.LatestForPeer(c.ctx, pingMetricName, p)) gpin.Add(api.PinInfo{ Cid: h, Name: pin.Name, Allocations: pin.Allocations, Origins: pin.Origins, Created: pin.Timestamp, Metadata: pin.Metadata, Peer: p, PinInfoShort: api.PinInfoShort{ PeerName: pv.Peername, IPFS: pv.IPFSID, IPFSAddresses: pv.IPFSAddresses, Status: status, TS: t, }, }) } } func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h api.Cid) (api.GlobalPinInfo, error) { ctx, span := trace.StartSpan(ctx, "cluster/globalPinInfoCid") defer span.End() // The object we will return gpin := api.GlobalPinInfo{} // allocated peers, we will contact them through rpc var dests []peer.ID // un-allocated peers, we will set remote status var remote []peer.ID timeNow := time.Now() // If pin is not part of the pinset, mark it unpinned pin, err := c.PinGet(ctx, h) if err != nil && err != state.ErrNotFound { logger.Error(err) return api.GlobalPinInfo{}, err } // When NotFound return directly with an unpinned // status. if err == state.ErrNotFound { var members []peer.ID if c.config.FollowerMode { members = []peer.ID{c.host.ID()} } else { members, err = c.consensus.Peers(ctx) if err != nil { logger.Error(err) return api.GlobalPinInfo{}, err } } c.setTrackerStatus( &gpin, h, members, api.TrackerStatusUnpinned, api.PinCid(h), timeNow, ) return gpin, nil } // The pin exists. gpin.Cid = h gpin.Name = pin.Name // Make the list of peers that will receive the request. if c.config.FollowerMode { // during follower mode return only local status. dests = []peer.ID{c.host.ID()} remote = []peer.ID{} } else { members, err := c.consensus.Peers(ctx) if err != nil { logger.Error(err) return api.GlobalPinInfo{}, err } if !pin.IsPinEverywhere() { dests = pin.Allocations remote = peersSubtract(members, dests) } else { dests = members remote = []peer.ID{} } } // set status remote on un-allocated peers c.setTrackerStatus(&gpin, h, remote, api.TrackerStatusRemote, pin, timeNow) lenDests := len(dests) replies := make([]api.PinInfo, lenDests) // a globalPinInfo type of request should be relatively fast. We // cannot block response indefinitely due to an unresponsive node. timeout := 15 * time.Second ctxs, cancels := rpcutil.CtxsWithTimeout(ctx, lenDests, timeout) defer rpcutil.MultiCancel(cancels) errs := c.rpcClient.MultiCall( ctxs, dests, comp, method, h, rpcutil.CopyPinInfoToIfaces(replies), ) for i, r := range replies { e := errs[i] // No error. 
Parse and continue if e == nil { gpin.Add(r) continue } if rpc.IsAuthorizationError(e) { logger.Debug("rpc auth error:", e) continue } // Deal with error cases (err != nil): wrap errors in PinInfo logger.Errorf("%s: error in broadcast response from %s: %s ", c.id, dests[i], e) pv := pingValueFromMetric(c.monitor.LatestForPeer(ctx, pingMetricName, dests[i])) gpin.Add(api.PinInfo{ Cid: h, Name: pin.Name, Peer: dests[i], Allocations: pin.Allocations, Origins: pin.Origins, Created: pin.Timestamp, Metadata: pin.Metadata, PinInfoShort: api.PinInfoShort{ PeerName: pv.Peername, IPFS: pv.IPFSID, IPFSAddresses: pv.IPFSAddresses, Status: api.TrackerStatusClusterError, TS: timeNow, Error: e.Error(), }, }) } return gpin, nil } func (c *Cluster) globalPinInfoStream(ctx context.Context, comp, method string, inChan interface{}, out chan<- api.GlobalPinInfo) error { defer close(out) ctx, span := trace.StartSpan(ctx, "cluster/globalPinInfoStream") defer span.End() if inChan == nil { emptyChan := make(chan struct{}) close(emptyChan) inChan = emptyChan } fullMap := make(map[api.Cid]api.GlobalPinInfo) var members []peer.ID var err error if c.config.FollowerMode { members = []peer.ID{c.host.ID()} } else { members, err = c.consensus.Peers(ctx) if err != nil { logger.Error(err) return err } } // We don't have a good timeout proposal for this. Depending on the // size of the state and the peformance of IPFS and the network, this // may take moderately long. // If we did, this is the place to put it. ctx, cancel := context.WithCancel(ctx) defer cancel() msOut := make(chan api.PinInfo) errsCh := make(chan []error, 1) go func() { defer close(errsCh) errsCh <- c.rpcClient.MultiStream( ctx, members, comp, method, inChan, msOut, ) }() setPinInfo := func(p api.PinInfo) { if !p.Defined() { return } info, ok := fullMap[p.Cid] if !ok { info = api.GlobalPinInfo{} } info.Add(p) // Set the new/updated info fullMap[p.Cid] = info } // make the big collection. for pin := range msOut { setPinInfo(pin) } // This WAITs until MultiStream is DONE. 
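	// Peers that returned an error are collected below and merged into
	// the result: every CID we did receive information for gets an
	// additional TrackerStatusClusterError entry for each peer that
	// failed to answer.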
erroredPeers := make(map[peer.ID]string) errs, ok := <-errsCh if ok { for i, err := range errs { if err == nil { continue } if rpc.IsAuthorizationError(err) { logger.Debug("rpc auth error", err) continue } logger.Errorf("%s: error in broadcast response from %s: %s ", c.id, members[i], err) erroredPeers[members[i]] = err.Error() } } // Merge any errors for p, msg := range erroredPeers { pv := pingValueFromMetric(c.monitor.LatestForPeer(ctx, pingMetricName, p)) for c := range fullMap { setPinInfo(api.PinInfo{ Cid: c, Name: "", Peer: p, Allocations: nil, Origins: nil, // Created: // leave unitialized Metadata: nil, PinInfoShort: api.PinInfoShort{ PeerName: pv.Peername, IPFS: pv.IPFSID, IPFSAddresses: pv.IPFSAddresses, Status: api.TrackerStatusClusterError, TS: time.Now(), Error: msg, }, }) } } for _, v := range fullMap { select { case <-ctx.Done(): err := fmt.Errorf("%s.%s aborted: %w", comp, method, ctx.Err()) logger.Error(err) return err case out <- v: } } return nil } func (c *Cluster) getIDForPeer(ctx context.Context, pid peer.ID) (*api.ID, error) { ctx, span := trace.StartSpan(ctx, "cluster/getIDForPeer") defer span.End() var id api.ID err := c.rpcClient.CallContext( ctx, pid, "Cluster", "ID", struct{}{}, &id, ) if err != nil { logger.Error(err) id.ID = pid id.Error = err.Error() } return &id, err } // cidsFromMetaPin expands a meta-pin and returns a list of Cids that // Cluster handles for it: the ShardPins, the ClusterDAG and the MetaPin, in // that order (the MetaPin is the last element). // It returns a slice with only the given Cid if it's not a known Cid or not a // MetaPin. func (c *Cluster) cidsFromMetaPin(ctx context.Context, h api.Cid) ([]api.Cid, error) { ctx, span := trace.StartSpan(ctx, "cluster/cidsFromMetaPin") defer span.End() cState, err := c.consensus.State(ctx) if err != nil { return nil, err } list := []api.Cid{h} pin, err := cState.Get(ctx, h) if err != nil { return nil, err } if pin.Type != api.MetaType { return list, nil } if pin.Reference == nil { return nil, errors.New("metaPin.Reference is unset") } list = append([]api.Cid{*pin.Reference}, list...) clusterDagPin, err := c.PinGet(ctx, *pin.Reference) if err != nil { return list, fmt.Errorf("could not get clusterDAG pin from state. Malformed pin?: %s", err) } clusterDagBlock, err := c.ipfs.BlockGet(ctx, clusterDagPin.Cid) if err != nil { return list, fmt.Errorf("error reading clusterDAG block from ipfs: %s", err) } clusterDagNode, err := sharding.CborDataToNode(clusterDagBlock, "cbor") if err != nil { return list, fmt.Errorf("error parsing clusterDAG block: %s", err) } for _, l := range clusterDagNode.Links() { list = append([]api.Cid{api.NewCid(l.Cid)}, list...) 
} return list, nil } // // diffPeers returns the peerIDs added and removed from peers2 in relation to // // peers1 // func diffPeers(peers1, peers2 []peer.ID) (added, removed []peer.ID) { // m1 := make(map[peer.ID]struct{}) // m2 := make(map[peer.ID]struct{}) // added = make([]peer.ID, 0) // removed = make([]peer.ID, 0) // if peers1 == nil && peers2 == nil { // return // } // if peers1 == nil { // added = peers2 // return // } // if peers2 == nil { // removed = peers1 // return // } // for _, p := range peers1 { // m1[p] = struct{}{} // } // for _, p := range peers2 { // m2[p] = struct{}{} // } // for k := range m1 { // _, ok := m2[k] // if !ok { // removed = append(removed, k) // } // } // for k := range m2 { // _, ok := m1[k] // if !ok { // added = append(added, k) // } // } // return // } // RepoGC performs garbage collection sweep on all peers' IPFS repo. func (c *Cluster) RepoGC(ctx context.Context) (api.GlobalRepoGC, error) { _, span := trace.StartSpan(ctx, "cluster/RepoGC") defer span.End() ctx = trace.NewContext(c.ctx, span) members, err := c.consensus.Peers(ctx) if err != nil { logger.Error(err) return api.GlobalRepoGC{}, err } // to club `RepoGCLocal` responses of all peers into one globalRepoGC := api.GlobalRepoGC{PeerMap: make(map[string]api.RepoGC)} for _, member := range members { var repoGC api.RepoGC err = c.rpcClient.CallContext( ctx, member, "Cluster", "RepoGCLocal", struct{}{}, &repoGC, ) if err == nil { globalRepoGC.PeerMap[peer.Encode(member)] = repoGC continue } if rpc.IsAuthorizationError(err) { logger.Debug("rpc auth error:", err) continue } logger.Errorf("%s: error in broadcast response from %s: %s ", c.id, member, err) pv := pingValueFromMetric(c.monitor.LatestForPeer(c.ctx, pingMetricName, member)) globalRepoGC.PeerMap[peer.Encode(member)] = api.RepoGC{ Peer: member, Peername: pv.Peername, Keys: []api.IPFSRepoGC{}, Error: err.Error(), } } return globalRepoGC, nil } // RepoGCLocal performs garbage collection only on the local IPFS deamon. func (c *Cluster) RepoGCLocal(ctx context.Context) (api.RepoGC, error) { _, span := trace.StartSpan(ctx, "cluster/RepoGCLocal") defer span.End() ctx = trace.NewContext(c.ctx, span) resp, err := c.ipfs.RepoGC(ctx) if err != nil { return api.RepoGC{}, err } resp.Peer = c.id resp.Peername = c.config.Peername return resp, nil }
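
// A minimal RepoGC usage sketch (illustrative; "c" is an initialized
// *Cluster and error handling is elided):
//
//	res, _ := c.RepoGC(ctx)
//	for p, gc := range res.PeerMap {
//		if gc.Error != "" {
//			logger.Warnf("gc on %s failed: %s", p, gc.Error)
//			continue
//		}
//		logger.Infof("%s: removed %d keys", p, len(gc.Keys))
//	}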