2022-10-19 22:23:11 +02:00
package ipfscluster
import (
2022-12-18 20:15:13 +01:00
peer "github.com/libp2p/go-libp2p/core/peer"
2022-10-19 22:23:11 +02:00
// This file gathers allocation logic used when pinning or re-pinning
// to find which peers should be allocated to a Cid. Allocation is constrained
// by ReplicationFactorMin and ReplicationFactorMax parameters obtained
// from the Pin object.
// The allocation process has several steps:
// * Find which peers are pinning a CID
// * Obtain the last values for the configured informer metrics from the
// monitor component
// * Divide the metrics between "current" (peers already pinning the CID)
// and "candidates" (peers that could pin the CID), as long as their metrics
// are valid.
// * Given the candidates:
// * Check if we are overpinning an item
// * Check if there are not enough candidates for the "needed" replication
// factor.
// * If there are enough candidates:
// * Call the configured allocator, which sorts the candidates (and
// may veto some, depending on the allocation strategy).
// * The allocator returns a list of final candidate peers sorted by
// order of preference.
// * Take as many final candidates from the list as we can, until
// ReplicationFactorMax is reached. Error if there are fewer than
// ReplicationFactorMin.
// classifiedMetrics carries the peer metrics once filterMetrics has split
// them into groups. Every group is stored twice: as a MetricsSet keyed by
// metric name, and as the list of peers that contributed to that set.
type classifiedMetrics struct {
// Metrics and peers for the peers found in the current allocations.
current api.MetricsSet
currentPeers []peer.ID
// Metrics and peers classified as candidates for new allocations.
candidate api.MetricsSet
candidatePeers []peer.ID
// Metrics and peers for the peers found in the priority list.
priority api.MetricsSet
priorityPeers []peer.ID
// allocate decides which peers should hold a hash, based on the metrics that
// the configured allocator asks for and that the monitor provides.
// It should only be used with valid replicationFactors (if rplMin and rplMax
// are > 0, then rplMin <= rplMax).
//
// Parameters:
//   - hash: the CID being allocated.
//   - currentPin: the existing pin; when currentPin.Defined() is true its
//     Allocations are taken as the current allocations.
//   - rplMin, rplMax: minimum and maximum replication factors.
//   - blacklist: peers whose metrics must be discarded.
//   - priorityList: peers to be classified as priority peers.
//
// Returns:
//   - an error when rplMin+rplMax == 0, or when obtainAllocations fails;
//   - an empty, non-nil slice when both factors are negative ("allocate
//     everywhere");
//   - the current allocations when obtainAllocations provides no new ones;
//   - otherwise, the allocations provided by obtainAllocations.
//
// Note that allocate() does not take into account if the given CID was
// previously in a "pin everywhere" mode, and will consider such Pins as
// currently unallocated ones, providing new allocations as available.
func (c *Cluster) allocate(ctx context.Context, hash api.Cid, currentPin api.Pin, rplMin, rplMax int, blacklist []peer.ID, priorityList []peer.ID) ([]peer.ID, error) {
ctx, span := trace.StartSpan(ctx, "cluster/allocate")
defer span.End()
// A sum of zero covers the 0/0 case, and also any pair of factors that
// cancel each other out.
if (rplMin + rplMax) == 0 {
return nil, fmt.Errorf("bad replication factors: %d/%d", rplMin, rplMax)
if rplMin < 0 && rplMax < 0 { // allocate everywhere
return []peer.ID{}, nil
// Figure out who is holding the CID
var currentAllocs []peer.ID
if currentPin.Defined() {
currentAllocs = currentPin.Allocations
// Get Metrics that the allocator is interested on: one entry per metric
// name, holding the latest values known to the monitor.
mSet := make(api.MetricsSet)
metrics := c.allocator.Metrics()
for _, metricName := range metrics {
mSet[metricName] = c.monitor.LatestMetrics(ctx, metricName)
// Filter and divide metrics. The resulting sets only have peers that
// have all the metrics needed and are not blacklisted.
classified := c.filterMetrics(
newAllocs, err := c.obtainAllocations(
if err != nil {
return newAllocs, err
// if current allocations are above the minimal threshold,
// obtainAllocations returns nil and we just leave things as they are.
// This is what makes repinning do nothing if items are still above
// rmin.
if newAllocs == nil {
newAllocs = currentAllocs
return newAllocs, nil
// filterMetrics takes the metrics from all informers and splits them into 3
// MetricsSet:
// - Those corresponding to currently allocated peers
// - Those corresponding to priority allocations
// - Those corresponding to "candidate" allocations
// It also returns a slice with the peers in each of those groups.
// Metrics from blacklisted peers are always left out. Metrics from
// untrusted peers are left out when PinOnlyOnTrustedPeers is configured.
// For a peer to be included in a group, it is necessary that it has
// exactly numMetrics metrics, that is, one for every informer.
//
// The switch below tests currentAllocs before priorityList, so a peer that
// appears in both lists is classified as current.
func (c *Cluster) filterMetrics(ctx context.Context, mSet api.MetricsSet, numMetrics int, currentAllocs, priorityList, blacklist []peer.ID) classifiedMetrics {
curPeersMap := make(map[peer.ID][]api.Metric)
candPeersMap := make(map[peer.ID][]api.Metric)
prioPeersMap := make(map[peer.ID][]api.Metric)
// Divide the metric by current/candidate/prio and by peer
for _, metrics := range mSet {
for _, m := range metrics {
switch {
case containsPeer(blacklist, m.Peer):
// discard blacklisted peers
case c.config.PinOnlyOnTrustedPeers && !c.consensus.IsTrustedPeer(ctx, m.Peer):
// discard peer that are not trusted when
// configured.
case containsPeer(currentAllocs, m.Peer):
curPeersMap[m.Peer] = append(curPeersMap[m.Peer], m)
case containsPeer(priorityList, m.Peer):
prioPeersMap[m.Peer] = append(prioPeersMap[m.Peer], m)
// NOTE(review): presumably the fallback branch, for peers matched by none
// of the cases above - confirm.
candPeersMap[m.Peer] = append(candPeersMap[m.Peer], m)
// fillMetricsSet turns a per-peer map into a MetricsSet keyed by metric
// name plus the list of peers it contains. The local mSet shadows the
// function parameter of the same name.
fillMetricsSet := func(peersMap map[peer.ID][]api.Metric) (api.MetricsSet, []peer.ID) {
mSet := make(api.MetricsSet)
peers := make([]peer.ID, 0, len(peersMap))
// Put the metrics in their sets if peers have metrics for all
// informers Record peers. This relies on LatestMetrics
// returning exactly one metric per peer. Thus, a peer with
// all the needed metrics should have exactly numMetrics.
// Otherwise, they are ignored.
// Map iteration order is unspecified in Go, so the resulting peers
// slice has no stable order.
for p, metrics := range peersMap {
if len(metrics) == numMetrics {
for _, m := range metrics {
mSet[m.Name] = append(mSet[m.Name], m)
peers = append(peers, p)
} // otherwise this peer will be ignored.
return mSet, peers
curSet, curPeers := fillMetricsSet(curPeersMap)
candSet, candPeers := fillMetricsSet(candPeersMap)
prioSet, prioPeers := fillMetricsSet(prioPeersMap)
return classifiedMetrics{
current: curSet,
currentPeers: curPeers,
candidate: candSet,
candidatePeers: candPeers,
priority: prioSet,
priorityPeers: prioPeers,
// allocationError logs an allocation error
func allocationError(hash api.Cid, needed, wanted int, candidatesValid []peer.ID) error {
logger.Errorf("Not enough candidates to allocate %s:", hash)
logger.Errorf(" Needed: %d", needed)
logger.Errorf(" Wanted: %d", wanted)
logger.Errorf(" Available candidates: %d:", len(candidatesValid))
for _, c := range candidatesValid {
logger.Errorf(" - %s", c.Pretty())
errorMsg := "not enough peers to allocate CID. "
errorMsg += fmt.Sprintf("Needed at least: %d. ", needed)
errorMsg += fmt.Sprintf("Wanted at most: %d. ", wanted)
errorMsg += fmt.Sprintf("Available candidates: %d. ", len(candidatesValid))
errorMsg += "See logs for more info."
return errors.New(errorMsg)
// obtainAllocations works out the allocations for hash from the classified
// metrics and the replication factors. With nCurrentValid being the number
// of peers in metrics.currentPeers:
//   - if nCurrentValid is above rplMax, it returns the current peers cut
//     down to rplMax entries;
//   - if nCurrentValid is at least rplMin, it returns nil, nil, meaning that
//     no new allocations are provided;
//   - otherwise it asks the allocator and returns the current peers followed
//     by at most "wanted" of the peers that the allocator returned.
//
// It returns an allocationError when fewer than "needed" peers are
// available, either before or after consulting the allocator, and an error
// when the allocator itself fails.
func (c *Cluster) obtainAllocations(
ctx context.Context,
hash api.Cid,
rplMin, rplMax int,
metrics classifiedMetrics,
) ([]peer.ID, error) {
ctx, span := trace.StartSpan(ctx, "cluster/obtainAllocations")
defer span.End()
nCurrentValid := len(metrics.currentPeers)
nAvailableValid := len(metrics.candidatePeers) + len(metrics.priorityPeers)
needed := rplMin - nCurrentValid // The minimum we need
wanted := rplMax - nCurrentValid // The maximum we want
logger.Debugf("obtainAllocations: current: %d", nCurrentValid)
logger.Debugf("obtainAllocations: available: %d", nAvailableValid)
logger.Debugf("obtainAllocations: candidates: %d", len(metrics.candidatePeers))
logger.Debugf("obtainAllocations: priority: %d", len(metrics.priorityPeers))
logger.Debugf("obtainAllocations: Needed: %d", needed)
logger.Debugf("obtainAllocations: Wanted: %d", wanted)
// Reminder: rplMin <= rplMax AND >0
if wanted < 0 { // allocations above maximum threshold: drop some
// This could be done more intelligently by dropping them
// according to the allocator order (i.e. free-ing peers
// with most used space first).
// wanted is negative here, so the upper bound equals rplMax.
return metrics.currentPeers[0 : len(metrics.currentPeers)+wanted], nil
if needed <= 0 { // allocations are above minimal threshold
// We don't provide any new allocations
return nil, nil
if nAvailableValid < needed { // not enough candidates
// NOTE(review): append may write into spare capacity of
// metrics.priorityPeers - confirm nothing reads that slice afterwards.
return nil, allocationError(hash, needed, wanted, append(metrics.priorityPeers, metrics.candidatePeers...))
// We can allocate from this point. Use the allocator to decide
// on the priority of candidates grab as many as "wanted"
// the allocator returns a list of peers ordered by priority
finalAllocs, err := c.allocator.Allocate(
if err != nil {
// NOTE(review): logError is defined elsewhere; presumably it logs the
// message and returns it as an error - verify.
return nil, logError(err.Error())
logger.Debugf("obtainAllocations: allocate(): %s", finalAllocs)
// check that we have enough as the allocator may have returned
// less candidates than provided.
if got := len(finalAllocs); got < needed {
return nil, allocationError(hash, needed, wanted, finalAllocs)
// NOTE(review): minInt is defined elsewhere; presumably it returns the
// smaller of its two arguments - verify.
allocationsToUse := minInt(wanted, len(finalAllocs))
// the final result is the currently valid allocations
// along with the ones provided by the allocator
// NOTE(review): append may reuse the backing array of
// metrics.currentPeers when it has spare capacity - confirm that the
// caller does not use that slice afterwards.
return append(metrics.currentPeers, finalAllocs[0:allocationsToUse]...), nil