depot/packages/networking/ipfs-cluster/adder/sharding/dag_service.go

315 lines
8.1 KiB
Go

// Package sharding implements a sharding ClusterDAGService places
// content in different shards while it's being added, creating
// a final Cluster DAG and pinning it.
package sharding
import (
"context"
"errors"
"fmt"
"time"
"github.com/ipfs-cluster/ipfs-cluster/adder"
"github.com/ipfs-cluster/ipfs-cluster/api"
humanize "github.com/dustin/go-humanize"
cid "github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log/v2"
peer "github.com/libp2p/go-libp2p-core/peer"
rpc "github.com/libp2p/go-libp2p-gorpc"
)
var logger = logging.Logger("shardingdags")
// DAGService is an implementation of a ClusterDAGService which
// shards content while adding among several IPFS Cluster peers,
// creating a Cluster DAG to track and pin that content selectively
// in the IPFS daemons allocated to it.
type DAGService struct {
adder.BaseDAGService
ctx context.Context
rpcClient *rpc.Client
addParams api.AddParams
output chan<- api.AddedOutput
addedSet *cid.Set
// Current shard being built
currentShard *shard
// Last flushed shard CID
previousShard cid.Cid
// shard tracking
shards map[string]cid.Cid
startTime time.Time
totalSize uint64
}
// New returns a new ClusterDAGService, which uses the given rpc client to perform
// Allocate, IPFSStream and Pin requests to other cluster components.
func New(ctx context.Context, rpc *rpc.Client, opts api.AddParams, out chan<- api.AddedOutput) *DAGService {
// use a default value for this regardless of what is provided.
opts.Mode = api.PinModeRecursive
return &DAGService{
ctx: ctx,
rpcClient: rpc,
addParams: opts,
output: out,
addedSet: cid.NewSet(),
shards: make(map[string]cid.Cid),
startTime: time.Now(),
}
}
// Add puts the given node in its corresponding shard and sends it to the
// destination peers.
func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error {
// FIXME: This will grow in memory
if !dgs.addedSet.Visit(node.Cid()) {
return nil
}
return dgs.ingestBlock(ctx, node)
}
// Finalize finishes sharding, creates the cluster DAG and pins it along
// with the meta pin for the root node of the content.
func (dgs *DAGService) Finalize(ctx context.Context, dataRoot api.Cid) (api.Cid, error) {
lastCid, err := dgs.flushCurrentShard(ctx)
if err != nil {
return api.NewCid(lastCid), err
}
if !lastCid.Equals(dataRoot.Cid) {
logger.Warnf("the last added CID (%s) is not the IPFS data root (%s). This is only normal when adding a single file without wrapping in directory.", lastCid, dataRoot)
}
clusterDAGNodes, err := makeDAG(ctx, dgs.shards)
if err != nil {
return dataRoot, err
}
// PutDAG to ourselves
blocks := make(chan api.NodeWithMeta, 256)
go func() {
defer close(blocks)
for _, n := range clusterDAGNodes {
select {
case <-ctx.Done():
logger.Error(ctx.Err())
return //abort
case blocks <- adder.IpldNodeToNodeWithMeta(n):
}
}
}()
// Stream these blocks and wait until we are done.
bs := adder.NewBlockStreamer(ctx, dgs.rpcClient, []peer.ID{""}, blocks)
select {
case <-ctx.Done():
return dataRoot, ctx.Err()
case <-bs.Done():
}
if err := bs.Err(); err != nil {
return dataRoot, err
}
clusterDAG := clusterDAGNodes[0].Cid()
dgs.sendOutput(api.AddedOutput{
Name: fmt.Sprintf("%s-clusterDAG", dgs.addParams.Name),
Cid: api.NewCid(clusterDAG),
Size: dgs.totalSize,
Allocations: nil,
})
// Pin the ClusterDAG
clusterDAGPin := api.PinWithOpts(api.NewCid(clusterDAG), dgs.addParams.PinOptions)
clusterDAGPin.ReplicationFactorMin = -1
clusterDAGPin.ReplicationFactorMax = -1
clusterDAGPin.MaxDepth = 0 // pin direct
clusterDAGPin.Name = fmt.Sprintf("%s-clusterDAG", dgs.addParams.Name)
clusterDAGPin.Type = api.ClusterDAGType
clusterDAGPin.Reference = &dataRoot
// Update object with response.
err = adder.Pin(ctx, dgs.rpcClient, clusterDAGPin)
if err != nil {
return dataRoot, err
}
// Pin the META pin
metaPin := api.PinWithOpts(dataRoot, dgs.addParams.PinOptions)
metaPin.Type = api.MetaType
ref := api.NewCid(clusterDAG)
metaPin.Reference = &ref
metaPin.MaxDepth = 0 // irrelevant. Meta-pins are not pinned
err = adder.Pin(ctx, dgs.rpcClient, metaPin)
if err != nil {
return dataRoot, err
}
// Log some stats
dgs.logStats(metaPin.Cid, clusterDAGPin.Cid)
// Consider doing this? Seems like overkill
//
// // Amend ShardPins to reference clusterDAG root hash as a Parent
// shardParents := cid.NewSet()
// shardParents.Add(clusterDAG)
// for shardN, shard := range dgs.shardNodes {
// pin := api.PinWithOpts(shard, dgs.addParams)
// pin.Name := fmt.Sprintf("%s-shard-%s", pin.Name, shardN)
// pin.Type = api.ShardType
// pin.Parents = shardParents
// // FIXME: We don't know anymore the shard pin maxDepth
// // so we'd need to get the pin first.
// err := dgs.pin(pin)
// if err != nil {
// return err
// }
// }
return dataRoot, nil
}
// Allocations returns the current allocations for the current shard.
func (dgs *DAGService) Allocations() []peer.ID {
// FIXME: this is probably not safe in concurrency? However, there is
// no concurrent execution of any code in the DAGService I think.
if dgs.currentShard != nil {
return dgs.currentShard.Allocations()
}
return nil
}
// ingests a block to the current shard. If it get's full, it
// Flushes the shard and retries with a new one.
func (dgs *DAGService) ingestBlock(ctx context.Context, n ipld.Node) error {
shard := dgs.currentShard
// if we have no currentShard, create one
if shard == nil {
logger.Infof("new shard for '%s': #%d", dgs.addParams.Name, len(dgs.shards))
var err error
// important: shards use the DAGService context.
shard, err = newShard(dgs.ctx, ctx, dgs.rpcClient, dgs.addParams.PinOptions)
if err != nil {
return err
}
dgs.currentShard = shard
}
logger.Debugf("ingesting block %s in shard %d (%s)", n.Cid(), len(dgs.shards), dgs.addParams.Name)
// this is not same as n.Size()
size := uint64(len(n.RawData()))
// add the block to it if it fits and return
if shard.Size()+size < shard.Limit() {
shard.AddLink(ctx, n.Cid(), size)
return dgs.currentShard.sendBlock(ctx, n)
}
logger.Debugf("shard %d full: block: %d. shard: %d. limit: %d",
len(dgs.shards),
size,
shard.Size(),
shard.Limit(),
)
// -------
// Below: block DOES NOT fit in shard
// Flush and retry
// if shard is empty, error
if shard.Size() == 0 {
return errors.New("block doesn't fit in empty shard: shard size too small?")
}
_, err := dgs.flushCurrentShard(ctx)
if err != nil {
return err
}
return dgs.ingestBlock(ctx, n) // <-- retry ingest
}
func (dgs *DAGService) logStats(metaPin, clusterDAGPin api.Cid) {
duration := time.Since(dgs.startTime)
seconds := uint64(duration) / uint64(time.Second)
var rate string
if seconds == 0 {
rate = "∞ B"
} else {
rate = humanize.Bytes(dgs.totalSize / seconds)
}
statsFmt := `sharding session successful:
CID: %s
ClusterDAG: %s
Total shards: %d
Total size: %s
Total time: %s
Ingest Rate: %s/s
`
logger.Infof(
statsFmt,
metaPin,
clusterDAGPin,
len(dgs.shards),
humanize.Bytes(dgs.totalSize),
duration,
rate,
)
}
func (dgs *DAGService) sendOutput(ao api.AddedOutput) {
if dgs.output != nil {
dgs.output <- ao
}
}
// flushes the dgs.currentShard and returns the LastLink()
func (dgs *DAGService) flushCurrentShard(ctx context.Context) (cid.Cid, error) {
shard := dgs.currentShard
if shard == nil {
return cid.Undef, errors.New("cannot flush a nil shard")
}
lens := len(dgs.shards)
shardCid, err := shard.Flush(ctx, lens, dgs.previousShard)
if err != nil {
return shardCid, err
}
dgs.totalSize += shard.Size()
dgs.shards[fmt.Sprintf("%d", lens)] = shardCid
dgs.previousShard = shardCid
dgs.currentShard = nil
dgs.sendOutput(api.AddedOutput{
Name: fmt.Sprintf("shard-%d", lens),
Cid: api.NewCid(shardCid),
Size: shard.Size(),
Allocations: shard.Allocations(),
})
return shard.LastLink(), nil
}
// AddMany calls Add for every given node.
func (dgs *DAGService) AddMany(ctx context.Context, nodes []ipld.Node) error {
for _, node := range nodes {
err := dgs.Add(ctx, node)
if err != nil {
return err
}
}
return nil
}