depot/packages/networking/ipfs-cluster/adder/sharding/shard.go

166 lines
4.4 KiB
Go

package sharding
import (
"context"
"fmt"
ipld "github.com/ipfs/go-ipld-format"
"github.com/ipfs-cluster/ipfs-cluster/adder"
"github.com/ipfs-cluster/ipfs-cluster/api"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-core/peer"
rpc "github.com/libp2p/go-libp2p-gorpc"
humanize "github.com/dustin/go-humanize"
)
// A shard is a set of blocks (a bucket) that has been assigned peers to be
// block-put to. All of its blocks will be part of the same shard in the
// cluster DAG.
type shard struct {
	// ctx is the context given as globalCtx when the shard was created.
	ctx context.Context
	// rpc is the client used for allocating, streaming blocks and pinning.
	rpc *rpc.Client

	// pinOptions are the options the shard pin is created with on Flush.
	pinOptions api.PinOptions
	// allocations are the peers obtained from adder.BlockAllocate. They are
	// handed to the block streamer and set as the shard pin's allocations.
	allocations []peer.ID

	// blocks carries the nodes sent by sendBlock. It is closed by Flush.
	blocks chan api.NodeWithMeta
	// bs is the block streamer created with the blocks channel above.
	bs *adder.BlockStreamer

	// dagNode represents a node with links and will be converted
	// to Cbor. Keys are link names (the decimal index of the link, in
	// insertion order) and values are the linked Cids.
	dagNode map[string]cid.Cid
	// currentSize is the sum of the sizes passed to AddLink so far.
	currentSize uint64

	// sizeLimit is taken from the pin options' ShardSize on creation.
	sizeLimit uint64
}
// newShard requests allocations for a new shard and returns a shard that is
// ready to have links added and blocks sent to it.
//
// ctx is only used for the allocation request (adder.BlockAllocate).
// globalCtx is stored in the shard and given to the block streamer.
//
// An error is returned when the allocation request fails, or when it
// succeeds with no allocations even though opts.ReplicationFactorMin is
// positive. A negative opts.ReplicationFactorMin only produces a warning.
func newShard(globalCtx context.Context, ctx context.Context, rpc *rpc.Client, opts api.PinOptions) (*shard, error) {
	allocs, err := adder.BlockAllocate(ctx, rpc, opts)
	if err != nil {
		return nil, err
	}

	if opts.ReplicationFactorMin > 0 && len(allocs) == 0 {
		// This would mean that the empty cid is part of the shared state
		// somehow. It is reported through the error result rather than with
		// a panic, so that the caller decides how to fail.
		return nil, fmt.Errorf(
			"allocations for new shard cannot be empty without error (replication factor min: %d)",
			opts.ReplicationFactorMin,
		)
	}

	if opts.ReplicationFactorMin < 0 {
		logger.Warn("Shard is set to replicate everywhere ,which doesn't make sense for sharding")
	}

	// TODO (hector): get latest metrics for allocations, adjust sizeLimit
	// to minimum. This can be done later.

	// The same channel is shared by the shard (sender side) and the block
	// streamer it is handed to.
	blocks := make(chan api.NodeWithMeta, 256)

	return &shard{
		ctx:         globalCtx,
		rpc:         rpc,
		allocations: allocs,
		pinOptions:  opts,
		bs:          adder.NewBlockStreamer(globalCtx, rpc, allocs, blocks),
		blocks:      blocks,
		dagNode:     make(map[string]cid.Cid),
		currentSize: 0,
		sizeLimit:   opts.ShardSize,
	}, nil
}
// AddLink records c as the next link of this shard and adds s to the shard's
// current size. Links are named after their position: the first one is "0",
// the second "1" and so on.
//
// No check against the size limit is made here and nothing is returned.
// ctx is currently unused.
func (sh *shard) AddLink(ctx context.Context, c cid.Cid, s uint64) {
	// The next free index is the number of links stored so far.
	name := fmt.Sprintf("%d", len(sh.dagNode))
	logger.Debugf("shard: add link: %s", name)

	sh.currentSize += s
	sh.dagNode[name] = c
}
// Allocations returns the peer IDs on which blocks are put for this shard.
// When the allocations consist of exactly one empty peer ID, nil is returned
// instead.
func (sh *shard) Allocations() []peer.ID {
	allocs := sh.allocations
	if len(allocs) != 1 || allocs[0] != "" {
		return allocs
	}
	// A single empty peer ID is reported as no allocations.
	return nil
}
// sendBlock converts n with adder.IpldNodeToNodeWithMeta and sends it on the
// shard's blocks channel. It blocks until the send goes through or ctx is
// done, in which case ctx.Err() is returned.
func (sh *shard) sendBlock(ctx context.Context, n ipld.Node) error {
	block := adder.IpldNodeToNodeWithMeta(n)

	select {
	case sh.blocks <- block:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
// Flush completes this shard. It builds the shard DAG out of the links added
// so far (makeDAG), sends every resulting node through the blocks channel,
// closes the channel, waits for the block streamer to be done and finally
// pins the shard in cluster.
//
// shardN is the shard number, used in the pin name and in log messages. prev
// is set as the Reference of the shard pin.
//
// It returns the Cid of the first node returned by makeDAG, used as the
// shard root. cid.Undef is returned along with the error for any failure
// that happens before pinning. If pinning itself fails, the root Cid is
// still returned together with the error.
func (sh *shard) Flush(ctx context.Context, shardN int, prev cid.Cid) (cid.Cid, error) {
	logger.Debugf("shard %d: flush", shardN)

	nodes, err := makeDAG(ctx, sh.dagNode)
	if err != nil {
		return cid.Undef, err
	}

	// Stop sending at the first failure. The channel is closed regardless of
	// whether all nodes could be sent.
	var sendErr error
	for _, node := range nodes {
		if sendErr = sh.sendBlock(ctx, node); sendErr != nil {
			break
		}
	}
	close(sh.blocks)
	if sendErr != nil {
		return cid.Undef, sendErr
	}

	// Wait until the block streamer is done, unless ctx finishes first.
	select {
	case <-ctx.Done():
		return cid.Undef, ctx.Err()
	case <-sh.bs.Done():
	}

	if streamErr := sh.bs.Err(); streamErr != nil {
		return cid.Undef, streamErr
	}

	root := nodes[0].Cid()
	linkCount := len(sh.dagNode)

	pin := api.PinWithOpts(api.NewCid(root), sh.pinOptions)
	pin.Name = fmt.Sprintf("%s-shard-%d", sh.pinOptions.Name, shardN)
	pin.Type = api.ShardType
	// this sets allocations as priority allocation
	pin.Allocations = sh.allocations

	ref := api.NewCid(prev)
	pin.Reference = &ref

	// use current size, not the limit
	pin.ShardSize = sh.currentSize

	// More nodes than one per link plus the root: using an indirect graph,
	// which needs one more level of depth.
	if len(nodes) > linkCount+1 {
		pin.MaxDepth = 2
	} else {
		pin.MaxDepth = 1
	}

	logger.Infof("shard #%d (%s) completed. Total size: %s. Links: %d",
		shardN,
		root,
		humanize.Bytes(sh.currentSize),
		linkCount,
	)

	pinErr := adder.Pin(ctx, sh.rpc, pin)
	return root, pinErr
}
// Size returns this shard's current size: the running total of the sizes
// given to AddLink.
func (sh *shard) Size() uint64 {
return sh.currentSize
}
// Limit returns this shard's size limit, which newShard takes from the
// ShardSize of the pin options.
func (sh *shard) Limit() uint64 {
return sh.sizeLimit
}
// LastLink returns the last added link. When finishing sharding,
// the last link of the last shard is the data root for the
// full sharded DAG (the CID that would have resulted from
// adding the content to a single IPFS daemon).
//
// If no link has been added yet, the lookup finds nothing and the zero
// cid.Cid is returned.
func (sh *shard) LastLink() cid.Cid {
	// Links are named by position, so the last one sits at index len-1.
	lastName := fmt.Sprintf("%d", len(sh.dagNode)-1)
	return sh.dagNode[lastName]
}