// Package sharding implements a sharding ClusterDAGService places // content in different shards while it's being added, creating // a final Cluster DAG and pinning it. package sharding import ( "context" "errors" "fmt" "time" "github.com/ipfs-cluster/ipfs-cluster/adder" "github.com/ipfs-cluster/ipfs-cluster/api" humanize "github.com/dustin/go-humanize" cid "github.com/ipfs/go-cid" ipld "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log/v2" peer "github.com/libp2p/go-libp2p/core/peer" rpc "github.com/libp2p/go-libp2p-gorpc" ) var logger = logging.Logger("shardingdags") // DAGService is an implementation of a ClusterDAGService which // shards content while adding among several IPFS Cluster peers, // creating a Cluster DAG to track and pin that content selectively // in the IPFS daemons allocated to it. type DAGService struct { adder.BaseDAGService ctx context.Context rpcClient *rpc.Client addParams api.AddParams output chan<- api.AddedOutput addedSet *cid.Set // Current shard being built currentShard *shard // Last flushed shard CID previousShard cid.Cid // shard tracking shards map[string]cid.Cid startTime time.Time totalSize uint64 } // New returns a new ClusterDAGService, which uses the given rpc client to perform // Allocate, IPFSStream and Pin requests to other cluster components. func New(ctx context.Context, rpc *rpc.Client, opts api.AddParams, out chan<- api.AddedOutput) *DAGService { // use a default value for this regardless of what is provided. opts.Mode = api.PinModeRecursive return &DAGService{ ctx: ctx, rpcClient: rpc, addParams: opts, output: out, addedSet: cid.NewSet(), shards: make(map[string]cid.Cid), startTime: time.Now(), } } // Add puts the given node in its corresponding shard and sends it to the // destination peers. func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error { // FIXME: This will grow in memory if !dgs.addedSet.Visit(node.Cid()) { return nil } return dgs.ingestBlock(ctx, node) } // Finalize finishes sharding, creates the cluster DAG and pins it along // with the meta pin for the root node of the content. func (dgs *DAGService) Finalize(ctx context.Context, dataRoot api.Cid) (api.Cid, error) { lastCid, err := dgs.flushCurrentShard(ctx) if err != nil { return api.NewCid(lastCid), err } if !lastCid.Equals(dataRoot.Cid) { logger.Warnf("the last added CID (%s) is not the IPFS data root (%s). This is only normal when adding a single file without wrapping in directory.", lastCid, dataRoot) } clusterDAGNodes, err := makeDAG(ctx, dgs.shards) if err != nil { return dataRoot, err } // PutDAG to ourselves blocks := make(chan api.NodeWithMeta, 256) go func() { defer close(blocks) for _, n := range clusterDAGNodes { select { case <-ctx.Done(): logger.Error(ctx.Err()) return //abort case blocks <- adder.IpldNodeToNodeWithMeta(n): } } }() // Stream these blocks and wait until we are done. bs := adder.NewBlockStreamer(ctx, dgs.rpcClient, []peer.ID{""}, blocks) select { case <-ctx.Done(): return dataRoot, ctx.Err() case <-bs.Done(): } if err := bs.Err(); err != nil { return dataRoot, err } clusterDAG := clusterDAGNodes[0].Cid() dgs.sendOutput(api.AddedOutput{ Name: fmt.Sprintf("%s-clusterDAG", dgs.addParams.Name), Cid: api.NewCid(clusterDAG), Size: dgs.totalSize, Allocations: nil, }) // Pin the ClusterDAG clusterDAGPin := api.PinWithOpts(api.NewCid(clusterDAG), dgs.addParams.PinOptions) clusterDAGPin.ReplicationFactorMin = -1 clusterDAGPin.ReplicationFactorMax = -1 clusterDAGPin.MaxDepth = 0 // pin direct clusterDAGPin.Name = fmt.Sprintf("%s-clusterDAG", dgs.addParams.Name) clusterDAGPin.Type = api.ClusterDAGType clusterDAGPin.Reference = &dataRoot // Update object with response. err = adder.Pin(ctx, dgs.rpcClient, clusterDAGPin) if err != nil { return dataRoot, err } // Pin the META pin metaPin := api.PinWithOpts(dataRoot, dgs.addParams.PinOptions) metaPin.Type = api.MetaType ref := api.NewCid(clusterDAG) metaPin.Reference = &ref metaPin.MaxDepth = 0 // irrelevant. Meta-pins are not pinned err = adder.Pin(ctx, dgs.rpcClient, metaPin) if err != nil { return dataRoot, err } // Log some stats dgs.logStats(metaPin.Cid, clusterDAGPin.Cid) // Consider doing this? Seems like overkill // // // Amend ShardPins to reference clusterDAG root hash as a Parent // shardParents := cid.NewSet() // shardParents.Add(clusterDAG) // for shardN, shard := range dgs.shardNodes { // pin := api.PinWithOpts(shard, dgs.addParams) // pin.Name := fmt.Sprintf("%s-shard-%s", pin.Name, shardN) // pin.Type = api.ShardType // pin.Parents = shardParents // // FIXME: We don't know anymore the shard pin maxDepth // // so we'd need to get the pin first. // err := dgs.pin(pin) // if err != nil { // return err // } // } return dataRoot, nil } // Allocations returns the current allocations for the current shard. func (dgs *DAGService) Allocations() []peer.ID { // FIXME: this is probably not safe in concurrency? However, there is // no concurrent execution of any code in the DAGService I think. if dgs.currentShard != nil { return dgs.currentShard.Allocations() } return nil } // ingests a block to the current shard. If it get's full, it // Flushes the shard and retries with a new one. func (dgs *DAGService) ingestBlock(ctx context.Context, n ipld.Node) error { shard := dgs.currentShard // if we have no currentShard, create one if shard == nil { logger.Infof("new shard for '%s': #%d", dgs.addParams.Name, len(dgs.shards)) var err error // important: shards use the DAGService context. shard, err = newShard(dgs.ctx, ctx, dgs.rpcClient, dgs.addParams.PinOptions) if err != nil { return err } dgs.currentShard = shard } logger.Debugf("ingesting block %s in shard %d (%s)", n.Cid(), len(dgs.shards), dgs.addParams.Name) // this is not same as n.Size() size := uint64(len(n.RawData())) // add the block to it if it fits and return if shard.Size()+size < shard.Limit() { shard.AddLink(ctx, n.Cid(), size) return dgs.currentShard.sendBlock(ctx, n) } logger.Debugf("shard %d full: block: %d. shard: %d. limit: %d", len(dgs.shards), size, shard.Size(), shard.Limit(), ) // ------- // Below: block DOES NOT fit in shard // Flush and retry // if shard is empty, error if shard.Size() == 0 { return errors.New("block doesn't fit in empty shard: shard size too small?") } _, err := dgs.flushCurrentShard(ctx) if err != nil { return err } return dgs.ingestBlock(ctx, n) // <-- retry ingest } func (dgs *DAGService) logStats(metaPin, clusterDAGPin api.Cid) { duration := time.Since(dgs.startTime) seconds := uint64(duration) / uint64(time.Second) var rate string if seconds == 0 { rate = "∞ B" } else { rate = humanize.Bytes(dgs.totalSize / seconds) } statsFmt := `sharding session successful: CID: %s ClusterDAG: %s Total shards: %d Total size: %s Total time: %s Ingest Rate: %s/s ` logger.Infof( statsFmt, metaPin, clusterDAGPin, len(dgs.shards), humanize.Bytes(dgs.totalSize), duration, rate, ) } func (dgs *DAGService) sendOutput(ao api.AddedOutput) { if dgs.output != nil { dgs.output <- ao } } // flushes the dgs.currentShard and returns the LastLink() func (dgs *DAGService) flushCurrentShard(ctx context.Context) (cid.Cid, error) { shard := dgs.currentShard if shard == nil { return cid.Undef, errors.New("cannot flush a nil shard") } lens := len(dgs.shards) shardCid, err := shard.Flush(ctx, lens, dgs.previousShard) if err != nil { return shardCid, err } dgs.totalSize += shard.Size() dgs.shards[fmt.Sprintf("%d", lens)] = shardCid dgs.previousShard = shardCid dgs.currentShard = nil dgs.sendOutput(api.AddedOutput{ Name: fmt.Sprintf("shard-%d", lens), Cid: api.NewCid(shardCid), Size: shard.Size(), Allocations: shard.Allocations(), }) return shard.LastLink(), nil } // AddMany calls Add for every given node. func (dgs *DAGService) AddMany(ctx context.Context, nodes []ipld.Node) error { for _, node := range nodes { err := dgs.Add(ctx, node) if err != nil { return err } } return nil }