depot/packages/networking/ipfs-cluster/ipfsconn/ipfshttp/ipfshttp.go

1240 lines
31 KiB
Go

// Package ipfshttp implements an IPFS Cluster IPFSConnector component. It
// uses the IPFS HTTP API to communicate to IPFS.
package ipfshttp
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/ipfs-cluster/ipfs-cluster/api"
"github.com/ipfs-cluster/ipfs-cluster/observations"
"github.com/tv42/httpunix"
cid "github.com/ipfs/go-cid"
files "github.com/ipfs/go-ipfs-files"
ipfspinner "github.com/ipfs/go-ipfs-pinner"
logging "github.com/ipfs/go-log/v2"
gopath "github.com/ipfs/go-path"
rpc "github.com/libp2p/go-libp2p-gorpc"
peer "github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/multiformats/go-multicodec"
multihash "github.com/multiformats/go-multihash"
"go.opencensus.io/plugin/ochttp"
"go.opencensus.io/plugin/ochttp/propagation/tracecontext"
"go.opencensus.io/stats"
"go.opencensus.io/trace"
)
// DNSTimeout is used when resolving DNS multiaddresses in this module
var DNSTimeout = 5 * time.Second
var logger = logging.Logger("ipfshttp")
// Connector implements the IPFSConnector interface
// and provides a component which is used to perform
// on-demand requests against the configured IPFS daemom
// (such as a pin request).
type Connector struct {
ctx context.Context
cancel func()
config *Config
nodeAddr string
nodeAddrScheme string
rpcClient *rpc.Client
rpcReady chan struct{}
client *http.Client // client to ipfs daemon
updateMetricCount uint64
ipfsPinCount int64
shutdownLock sync.Mutex
shutdown bool
wg sync.WaitGroup
}
type ipfsError struct {
path string
code int
Message string
}
func (ie ipfsError) Error() string {
return fmt.Sprintf(
"IPFS error (%s). Code: %d. Message: %s",
ie.path,
ie.code,
ie.Message,
)
}
type ipfsUnpinnedError ipfsError
func (unpinned ipfsUnpinnedError) Is(target error) bool {
ierr, ok := target.(ipfsError)
if !ok {
return false
}
return strings.HasSuffix(ierr.Message, "not pinned")
}
func (unpinned ipfsUnpinnedError) Error() string {
return ipfsError(unpinned).Error()
}
type ipfsIDResp struct {
ID string
Addresses []string
}
type ipfsResolveResp struct {
Path string
}
type ipfsRepoGCResp struct {
Key cid.Cid
Error string
}
type ipfsPinsResp struct {
Pins []string
Progress int
}
type ipfsSwarmPeersResp struct {
Peers []ipfsPeer
}
type ipfsBlockPutResp struct {
Key api.Cid
Size int
}
type ipfsPeer struct {
Peer string
}
// NewConnector creates the component and leaves it ready to be started
func NewConnector(cfg *Config) (*Connector, error) {
err := cfg.Validate()
if err != nil {
return nil, err
}
nodeMAddr := cfg.NodeAddr
// dns multiaddresses need to be resolved first
if madns.Matches(nodeMAddr) {
ctx, cancel := context.WithTimeout(context.Background(), DNSTimeout)
defer cancel()
resolvedAddrs, err := madns.Resolve(ctx, cfg.NodeAddr)
if err != nil {
logger.Error(err)
return nil, err
}
nodeMAddr = resolvedAddrs[0]
}
_, nodeAddr, err := manet.DialArgs(nodeMAddr)
if err != nil {
return nil, err
}
var c *http.Client
var nodeAddrScheme string
if unixSocketPath, err := nodeMAddr.ValueForProtocol(multiaddr.P_UNIX); err == nil {
u := &httpunix.Transport{}
u.RegisterLocation("ipfsapiunix", unixSocketPath)
nodeAddr = "ipfsapiunix"
t := &http.Transport{}
t.RegisterProtocol(httpunix.Scheme, u)
c = &http.Client{
Transport: t,
}
nodeAddrScheme = "http+unix"
} else {
c = &http.Client{} // timeouts are handled by context timeouts
nodeAddrScheme = "http"
}
if cfg.Tracing {
c.Transport = &ochttp.Transport{
Base: http.DefaultTransport,
Propagation: &tracecontext.HTTPFormat{},
StartOptions: trace.StartOptions{SpanKind: trace.SpanKindClient},
FormatSpanName: func(req *http.Request) string { return req.Host + ":" + req.URL.Path + ":" + req.Method },
NewClientTrace: ochttp.NewSpanAnnotatingClientTrace,
}
}
ctx, cancel := context.WithCancel(context.Background())
ipfs := &Connector{
ctx: ctx,
config: cfg,
cancel: cancel,
nodeAddr: nodeAddr,
nodeAddrScheme: nodeAddrScheme,
rpcReady: make(chan struct{}, 1),
client: c,
}
initializeMetrics(ctx)
go ipfs.run()
return ipfs, nil
}
func initializeMetrics(ctx context.Context) {
// initialize metrics
stats.Record(ctx, observations.PinsIpfsPins.M(0))
stats.Record(ctx, observations.PinsPinAdd.M(0))
stats.Record(ctx, observations.PinsPinAddError.M(0))
stats.Record(ctx, observations.BlocksPut.M(0))
stats.Record(ctx, observations.BlocksAddedSize.M(0))
stats.Record(ctx, observations.BlocksAdded.M(0))
stats.Record(ctx, observations.BlocksAddedError.M(0))
}
// connects all ipfs daemons when
// we receive the rpcReady signal.
func (ipfs *Connector) run() {
<-ipfs.rpcReady
// Do not shutdown while launching threads
// -- prevents race conditions with ipfs.wg.
ipfs.shutdownLock.Lock()
defer ipfs.shutdownLock.Unlock()
if ipfs.config.ConnectSwarmsDelay == 0 {
return
}
// This runs ipfs swarm connect to the daemons of other cluster members
ipfs.wg.Add(1)
go func() {
defer ipfs.wg.Done()
// It does not hurt to wait a little bit. i.e. think cluster
// peers which are started at the same time as the ipfs
// daemon...
tmr := time.NewTimer(ipfs.config.ConnectSwarmsDelay)
defer tmr.Stop()
select {
case <-tmr.C:
// do not hang this goroutine if this call hangs
// otherwise we hang during shutdown
go ipfs.ConnectSwarms(ipfs.ctx)
case <-ipfs.ctx.Done():
return
}
}()
}
// SetClient makes the component ready to perform RPC
// requests.
func (ipfs *Connector) SetClient(c *rpc.Client) {
ipfs.rpcClient = c
ipfs.rpcReady <- struct{}{}
}
// Shutdown stops any listeners and stops the component from taking
// any requests.
func (ipfs *Connector) Shutdown(ctx context.Context) error {
_, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/Shutdown")
defer span.End()
ipfs.shutdownLock.Lock()
defer ipfs.shutdownLock.Unlock()
if ipfs.shutdown {
logger.Debug("already shutdown")
return nil
}
logger.Info("stopping IPFS Connector")
ipfs.cancel()
close(ipfs.rpcReady)
ipfs.wg.Wait()
ipfs.shutdown = true
return nil
}
// ID performs an ID request against the configured
// IPFS daemon. It returns the fetched information.
// If the request fails, or the parsing fails, it
// returns an error.
func (ipfs *Connector) ID(ctx context.Context) (api.IPFSID, error) {
ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/ID")
defer span.End()
ctx, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout)
defer cancel()
body, err := ipfs.postCtx(ctx, "id", "", nil)
if err != nil {
return api.IPFSID{}, err
}
var res ipfsIDResp
err = json.Unmarshal(body, &res)
if err != nil {
return api.IPFSID{}, err
}
pID, err := peer.Decode(res.ID)
if err != nil {
return api.IPFSID{}, err
}
id := api.IPFSID{
ID: pID,
}
mAddrs := make([]api.Multiaddr, len(res.Addresses))
for i, strAddr := range res.Addresses {
mAddr, err := api.NewMultiaddr(strAddr)
if err != nil {
id.Error = err.Error()
return id, err
}
mAddrs[i] = mAddr
}
id.Addresses = mAddrs
return id, nil
}
func pinArgs(maxDepth api.PinDepth) string {
q := url.Values{}
switch {
case maxDepth < 0:
q.Set("recursive", "true")
case maxDepth == 0:
q.Set("recursive", "false")
default:
q.Set("recursive", "true")
q.Set("max-depth", strconv.Itoa(int(maxDepth)))
}
return q.Encode()
}
// Pin performs a pin request against the configured IPFS
// daemon.
func (ipfs *Connector) Pin(ctx context.Context, pin api.Pin) error {
ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/Pin")
defer span.End()
hash := pin.Cid
maxDepth := pin.MaxDepth
pinStatus, err := ipfs.PinLsCid(ctx, pin)
if err != nil {
return err
}
if pinStatus.IsPinned(maxDepth) {
logger.Debug("IPFS object is already pinned: ", hash)
return nil
}
defer ipfs.updateInformerMetric(ctx)
ctx, cancelRequest := context.WithCancel(ctx)
defer cancelRequest()
// If the pin has origins, tell ipfs to connect to a maximum of 10.
bound := len(pin.Origins)
if bound > 10 {
bound = 10
}
for _, orig := range pin.Origins[0:bound] {
// do it in the background, ignoring errors.
go func(o string) {
logger.Debugf("swarm-connect to origin before pinning: %s", o)
_, err := ipfs.postCtx(
ctx,
fmt.Sprintf("swarm/connect?arg=%s", o),
"",
nil,
)
if err != nil {
logger.Debug(err)
return
}
logger.Debugf("swarm-connect success to origin: %s", o)
}(url.QueryEscape(orig.String()))
}
// If we have a pin-update, and the old object
// is pinned recursively, then do pin/update.
// Otherwise do a normal pin.
if from := pin.PinUpdate; from.Defined() {
fromPin := api.PinWithOpts(from, pin.PinOptions)
pinStatus, _ := ipfs.PinLsCid(ctx, fromPin)
if pinStatus.IsPinned(-1) { // pinned recursively.
// As a side note, if PinUpdate == pin.Cid, we are
// somehow pinning an already pinned thing and we'd
// better use update for that
return ipfs.pinUpdate(ctx, from, pin.Cid)
}
}
// Pin request and timeout if there is no progress
outPins := make(chan int)
go func() {
var lastProgress int
lastProgressTime := time.Now()
ticker := time.NewTicker(ipfs.config.PinTimeout)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if time.Since(lastProgressTime) > ipfs.config.PinTimeout {
// timeout request
cancelRequest()
return
}
case p := <-outPins:
// ipfs will send status messages every second
// or so but we need make sure there was
// progress by looking at number of nodes
// fetched.
if p > lastProgress {
lastProgress = p
lastProgressTime = time.Now()
}
case <-ctx.Done():
return
}
}
}()
stats.Record(ipfs.ctx, observations.PinsPinAdd.M(1))
err = ipfs.pinProgress(ctx, hash, maxDepth, outPins)
if err != nil {
stats.Record(ipfs.ctx, observations.PinsPinAddError.M(1))
return err
}
totalPins := atomic.AddInt64(&ipfs.ipfsPinCount, 1)
stats.Record(ipfs.ctx, observations.PinsIpfsPins.M(totalPins))
logger.Info("IPFS Pin request succeeded: ", hash)
return nil
}
// pinProgress pins an item and sends fetched node's progress on a
// channel. Blocks until done or error. pinProgress will always close the out
// channel. pinProgress will not block on sending to the channel if it is full.
func (ipfs *Connector) pinProgress(ctx context.Context, hash api.Cid, maxDepth api.PinDepth, out chan<- int) error {
defer close(out)
ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/pinsProgress")
defer span.End()
pinArgs := pinArgs(maxDepth)
path := fmt.Sprintf("pin/add?arg=%s&%s&progress=true", hash, pinArgs)
res, err := ipfs.doPostCtx(ctx, ipfs.client, ipfs.apiURL(), path, "", nil)
if err != nil {
return err
}
defer res.Body.Close()
_, err = checkResponse(path, res)
if err != nil {
return err
}
dec := json.NewDecoder(res.Body)
for {
var pins ipfsPinsResp
if err := dec.Decode(&pins); err != nil {
// If we canceled the request we should tell the user
// (in case dec.Decode() exited cleanly with an EOF).
select {
case <-ctx.Done():
return ctx.Err()
default:
if err == io.EOF {
return nil // clean exit. Pinned!
}
return err // error decoding
}
}
select {
case out <- pins.Progress:
default:
}
}
}
func (ipfs *Connector) pinUpdate(ctx context.Context, from, to api.Cid) error {
ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/pinUpdate")
defer span.End()
path := fmt.Sprintf("pin/update?arg=%s&arg=%s&unpin=false", from, to)
_, err := ipfs.postCtx(ctx, path, "", nil)
if err != nil {
return err
}
totalPins := atomic.AddInt64(&ipfs.ipfsPinCount, 1)
stats.Record(ipfs.ctx, observations.PinsIpfsPins.M(totalPins))
logger.Infof("IPFS Pin Update request succeeded. %s -> %s (unpin=false)", from, to)
return nil
}
// Unpin performs an unpin request against the configured IPFS
// daemon.
func (ipfs *Connector) Unpin(ctx context.Context, hash api.Cid) error {
ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/Unpin")
defer span.End()
if ipfs.config.UnpinDisable {
return errors.New("ipfs unpinning is disallowed by configuration on this peer")
}
defer ipfs.updateInformerMetric(ctx)
path := fmt.Sprintf("pin/rm?arg=%s", hash)
ctx, cancel := context.WithTimeout(ctx, ipfs.config.UnpinTimeout)
defer cancel()
// We will call unpin in any case, if the CID is not pinned,
// then we ignore the error (although this is a bit flaky).
_, err := ipfs.postCtx(ctx, path, "", nil)
if err != nil {
ipfsErr, ok := err.(ipfsError)
if !ok || ipfsErr.Message != ipfspinner.ErrNotPinned.Error() {
return err
}
logger.Debug("IPFS object is already unpinned: ", hash)
return nil
}
totalPins := atomic.AddInt64(&ipfs.ipfsPinCount, -1)
stats.Record(ipfs.ctx, observations.PinsIpfsPins.M(totalPins))
logger.Info("IPFS Unpin request succeeded:", hash)
return nil
}
// PinLs performs a "pin ls --type typeFilter" request against the configured
// IPFS daemon and sends the results on the given channel. Returns when done.
func (ipfs *Connector) PinLs(ctx context.Context, typeFilters []string, out chan<- api.IPFSPinInfo) error {
defer close(out)
bodies := make([]io.ReadCloser, len(typeFilters))
ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/PinLs")
defer span.End()
ctx, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout)
defer cancel()
var err error
var totalPinCount int64
defer func() {
if err != nil {
atomic.StoreInt64(&ipfs.ipfsPinCount, totalPinCount)
stats.Record(ipfs.ctx, observations.PinsIpfsPins.M(totalPinCount))
}
}()
nextFilter:
for i, typeFilter := range typeFilters {
// Post and read streaming response
path := "pin/ls?stream=true&type=" + typeFilter
bodies[i], err = ipfs.postCtxStreamResponse(ctx, path, "", nil)
if err != nil {
logger.Error("error querying pinset: %s", err)
return err
}
defer bodies[i].Close()
dec := json.NewDecoder(bodies[i])
for {
select {
case <-ctx.Done():
err = fmt.Errorf("aborting pin/ls operation: %w", ctx.Err())
logger.Error(err)
return err
default:
}
var ipfsPin api.IPFSPinInfo
err = dec.Decode(&ipfsPin)
if err == io.EOF {
break nextFilter
}
if err != nil {
err = fmt.Errorf("error decoding ipfs pin: %w", err)
return err
}
select {
case <-ctx.Done():
err = fmt.Errorf("aborting pin/ls operation: %w", ctx.Err())
logger.Error(err)
return err
case out <- ipfsPin:
totalPinCount++
}
}
}
return nil
}
// PinLsCid performs a "pin ls <hash>" request. It will use "type=recursive" or
// "type=direct" (or other) depending on the given pin's MaxDepth setting.
// It returns an api.IPFSPinStatus for that hash.
func (ipfs *Connector) PinLsCid(ctx context.Context, pin api.Pin) (api.IPFSPinStatus, error) {
ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/PinLsCid")
defer span.End()
ctx, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout)
defer cancel()
if !pin.Defined() {
return api.IPFSPinStatusBug, errors.New("calling PinLsCid without a defined CID")
}
pinType := pin.MaxDepth.ToPinMode().String()
lsPath := fmt.Sprintf("pin/ls?stream=true&arg=%s&type=%s", pin.Cid, pinType)
body, err := ipfs.postCtxStreamResponse(ctx, lsPath, "", nil)
if err != nil {
if errors.Is(ipfsUnpinnedError{}, err) {
return api.IPFSPinStatusUnpinned, nil
}
return api.IPFSPinStatusError, err
}
defer body.Close()
var res api.IPFSPinInfo
dec := json.NewDecoder(body)
err = dec.Decode(&res)
if err != nil {
logger.Error("error parsing pin/ls?arg=cid response")
return api.IPFSPinStatusError, err
}
return res.Type, nil
}
func (ipfs *Connector) doPostCtx(ctx context.Context, client *http.Client, apiURL, path string, contentType string, postBody io.Reader) (*http.Response, error) {
logger.Debugf("posting /%s", path)
urlstr := fmt.Sprintf("%s/%s", apiURL, path)
req, err := http.NewRequest("POST", urlstr, postBody)
if err != nil {
logger.Error("error creating POST request:", err)
}
req.Header.Set("Content-Type", contentType)
req = req.WithContext(ctx)
res, err := ipfs.client.Do(req)
if err != nil {
logger.Error("error posting to IPFS:", err)
}
return res, err
}
// checkResponse tries to parse an error message on non StatusOK responses
// from ipfs.
func checkResponse(path string, res *http.Response) ([]byte, error) {
if res.StatusCode == http.StatusOK {
return nil, nil
}
body, err := io.ReadAll(res.Body)
res.Body.Close()
if err == nil {
var ipfsErr ipfsError
if err := json.Unmarshal(body, &ipfsErr); err == nil {
ipfsErr.code = res.StatusCode
ipfsErr.path = path
return body, ipfsErr
}
}
// No error response with useful message from ipfs
return nil, fmt.Errorf(
"IPFS request failed (is it running?) (%s). Code %d: %s",
path,
res.StatusCode,
string(body))
}
// postCtx makes a POST request against
// the ipfs daemon, reads the full body of the response and
// returns it after checking for errors.
func (ipfs *Connector) postCtx(ctx context.Context, path string, contentType string, postBody io.Reader) ([]byte, error) {
rdr, err := ipfs.postCtxStreamResponse(ctx, path, contentType, postBody)
if err != nil {
return nil, err
}
defer rdr.Close()
body, err := io.ReadAll(rdr)
if err != nil {
logger.Errorf("error reading response body: %s", err)
return nil, err
}
return body, nil
}
// postCtxStreamResponse makes a POST request against the ipfs daemon, and
// returns the body reader after checking the request for errros.
func (ipfs *Connector) postCtxStreamResponse(ctx context.Context, path string, contentType string, postBody io.Reader) (io.ReadCloser, error) {
res, err := ipfs.doPostCtx(ctx, ipfs.client, ipfs.apiURL(), path, contentType, postBody)
if err != nil {
return nil, err
}
_, err = checkResponse(path, res)
if err != nil {
return nil, err
}
return res.Body, nil
}
// apiURL is a short-hand for building the url of the IPFS
// daemon API.
func (ipfs *Connector) apiURL() string {
return fmt.Sprintf("%s://%s/api/v0", ipfs.nodeAddrScheme, ipfs.nodeAddr)
}
// ConnectSwarms requests the ipfs addresses of other peers and
// triggers ipfs swarm connect requests
func (ipfs *Connector) ConnectSwarms(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/ConnectSwarms")
defer span.End()
ctx, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout)
defer cancel()
in := make(chan struct{})
close(in)
out := make(chan api.ID)
go func() {
err := ipfs.rpcClient.Stream(
ctx,
"",
"Cluster",
"Peers",
in,
out,
)
if err != nil {
logger.Error(err)
}
}()
for id := range out {
ipfsID := id.IPFS
if id.Error != "" || ipfsID.Error != "" {
continue
}
for _, addr := range ipfsID.Addresses {
// This is a best effort attempt
// We ignore errors which happens
// when passing in a bunch of addresses
_, err := ipfs.postCtx(
ctx,
fmt.Sprintf("swarm/connect?arg=%s", url.QueryEscape(addr.String())),
"",
nil,
)
if err != nil {
logger.Debug(err)
continue
}
logger.Debugf("ipfs successfully connected to %s", addr)
}
}
return nil
}
// ConfigKey fetches the IPFS daemon configuration and retrieves the value for
// a given configuration key. For example, "Datastore/StorageMax" will return
// the value for StorageMax in the Datastore configuration object.
func (ipfs *Connector) ConfigKey(keypath string) (interface{}, error) {
ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout)
defer cancel()
res, err := ipfs.postCtx(ctx, "config/show", "", nil)
if err != nil {
logger.Error(err)
return nil, err
}
var cfg map[string]interface{}
err = json.Unmarshal(res, &cfg)
if err != nil {
logger.Error(err)
return nil, err
}
path := strings.SplitN(keypath, "/", 2)
if len(path) == 0 {
return nil, errors.New("cannot lookup without a path")
}
return getConfigValue(path, cfg)
}
func getConfigValue(path []string, cfg map[string]interface{}) (interface{}, error) {
value, ok := cfg[path[0]]
if !ok {
return nil, errors.New("key not found in configuration")
}
if len(path) == 1 {
return value, nil
}
switch v := value.(type) {
case map[string]interface{}:
return getConfigValue(path[1:], v)
default:
return nil, errors.New("invalid path")
}
}
// RepoStat returns the DiskUsage and StorageMax repo/stat values from the
// ipfs daemon, in bytes, wrapped as an IPFSRepoStat object.
func (ipfs *Connector) RepoStat(ctx context.Context) (api.IPFSRepoStat, error) {
ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/RepoStat")
defer span.End()
ctx, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout)
defer cancel()
res, err := ipfs.postCtx(ctx, "repo/stat?size-only=true", "", nil)
if err != nil {
logger.Error(err)
return api.IPFSRepoStat{}, err
}
var stats api.IPFSRepoStat
err = json.Unmarshal(res, &stats)
if err != nil {
logger.Error(err)
return api.IPFSRepoStat{}, err
}
return stats, nil
}
// RepoGC performs a garbage collection sweep on the cluster peer's IPFS repo.
func (ipfs *Connector) RepoGC(ctx context.Context) (api.RepoGC, error) {
ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/RepoGC")
defer span.End()
ctx, cancel := context.WithTimeout(ctx, ipfs.config.RepoGCTimeout)
defer cancel()
res, err := ipfs.doPostCtx(ctx, ipfs.client, ipfs.apiURL(), "repo/gc?stream-errors=true", "", nil)
if err != nil {
logger.Error(err)
return api.RepoGC{}, err
}
defer res.Body.Close()
dec := json.NewDecoder(res.Body)
repoGC := api.RepoGC{
Keys: []api.IPFSRepoGC{},
}
for {
resp := ipfsRepoGCResp{}
if err := dec.Decode(&resp); err != nil {
// If we canceled the request we should tell the user
// (in case dec.Decode() exited cleanly with an EOF).
select {
case <-ctx.Done():
return repoGC, ctx.Err()
default:
if err == io.EOF {
return repoGC, nil // clean exit
}
logger.Error(err)
return repoGC, err // error decoding
}
}
repoGC.Keys = append(repoGC.Keys, api.IPFSRepoGC{Key: api.NewCid(resp.Key), Error: resp.Error})
}
}
// Resolve accepts ipfs or ipns path and resolves it into a cid
func (ipfs *Connector) Resolve(ctx context.Context, path string) (api.Cid, error) {
ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/Resolve")
defer span.End()
validPath, err := gopath.ParsePath(path)
if err != nil {
logger.Error("could not parse path: " + err.Error())
return api.CidUndef, err
}
if !strings.HasPrefix(path, "/ipns") && validPath.IsJustAKey() {
ci, _, err := gopath.SplitAbsPath(validPath)
return api.NewCid(ci), err
}
ctx, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout)
defer cancel()
res, err := ipfs.postCtx(ctx, "resolve?arg="+url.QueryEscape(path), "", nil)
if err != nil {
logger.Error(err)
return api.CidUndef, err
}
var resp ipfsResolveResp
err = json.Unmarshal(res, &resp)
if err != nil {
logger.Error("could not unmarshal response: " + err.Error())
return api.CidUndef, err
}
ci, _, err := gopath.SplitAbsPath(gopath.FromString(resp.Path))
return api.NewCid(ci), err
}
// SwarmPeers returns the peers currently connected to this ipfs daemon.
func (ipfs *Connector) SwarmPeers(ctx context.Context) ([]peer.ID, error) {
ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/SwarmPeers")
defer span.End()
ctx, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout)
defer cancel()
res, err := ipfs.postCtx(ctx, "swarm/peers", "", nil)
if err != nil {
logger.Error(err)
return nil, err
}
var peersRaw ipfsSwarmPeersResp
err = json.Unmarshal(res, &peersRaw)
if err != nil {
logger.Error(err)
return nil, err
}
swarm := make([]peer.ID, len(peersRaw.Peers))
for i, p := range peersRaw.Peers {
pID, err := peer.Decode(p.Peer)
if err != nil {
logger.Error(err)
return swarm, err
}
swarm[i] = pID
}
return swarm, nil
}
// chanDirectory implements the files.Directory interface
type chanDirectory struct {
iterator files.DirIterator
}
// Close is a no-op and it is not used.
func (cd *chanDirectory) Close() error {
return nil
}
// not implemented, I think not needed for multipart.
func (cd *chanDirectory) Size() (int64, error) {
return 0, nil
}
func (cd *chanDirectory) Entries() files.DirIterator {
return cd.iterator
}
// chanIterator implements the files.DirIterator interface.
type chanIterator struct {
ctx context.Context
blocks <-chan api.NodeWithMeta
current api.NodeWithMeta
peeked api.NodeWithMeta
done bool
err error
seenMu sync.Mutex
seen *multihash.Set
}
func (ci *chanIterator) Name() string {
if !ci.current.Cid.Defined() {
return ""
}
return ci.current.Cid.String()
}
// return NewBytesFile.
func (ci *chanIterator) Node() files.Node {
if !ci.current.Cid.Defined() {
return nil
}
logger.Debugf("it.node(): %s", ci.current.Cid)
ci.seenMu.Lock()
ci.seen.Add(ci.current.Cid.Hash())
ci.seenMu.Unlock()
stats.Record(ci.ctx, observations.BlocksAdded.M(1))
stats.Record(ci.ctx, observations.BlocksAddedSize.M(int64(len(ci.current.Data))))
return files.NewBytesFile(ci.current.Data)
}
func (ci *chanIterator) Seen(c api.Cid) bool {
ci.seenMu.Lock()
has := ci.seen.Has(c.Cid.Hash())
ci.seen.Remove(c.Cid.Hash())
ci.seenMu.Unlock()
return has
}
func (ci *chanIterator) Done() bool {
return ci.done
}
// Peek reads one block from the channel but saves it so that Next also
// returns it.
func (ci *chanIterator) Peek() (api.NodeWithMeta, bool) {
if ci.done {
return api.NodeWithMeta{}, false
}
select {
case <-ci.ctx.Done():
return api.NodeWithMeta{}, false
case next, ok := <-ci.blocks:
if !ok {
return api.NodeWithMeta{}, false
}
ci.peeked = next
return next, true
}
}
func (ci *chanIterator) Next() bool {
if ci.done {
return false
}
if ci.peeked.Cid.Defined() {
ci.current = ci.peeked
ci.peeked = api.NodeWithMeta{}
return true
}
select {
case <-ci.ctx.Done():
ci.done = true
ci.err = ci.ctx.Err()
return false
case next, ok := <-ci.blocks:
if !ok {
ci.done = true
return false
}
logger.Debugf("it.Next() %s", next.Cid)
ci.current = next
return true
}
}
func (ci *chanIterator) Err() error {
return ci.err
}
func blockPutQuery(prefix cid.Prefix) (url.Values, error) {
q := make(url.Values, 3)
codec := multicodec.Code(prefix.Codec).String()
if codec == "" {
return q, fmt.Errorf("cannot find name for the blocks' CID codec: %x", prefix.Codec)
}
mhType, ok := multihash.Codes[prefix.MhType]
if !ok {
return q, fmt.Errorf("cannot find name for the blocks' Multihash type: %x", prefix.MhType)
}
// From go-ipfs 0.13.0 format is deprecated and we use cid-codec
q.Set("cid-codec", codec)
q.Set("mhtype", mhType)
q.Set("mhlen", strconv.Itoa(prefix.MhLength))
q.Set("pin", "false")
q.Set("allow-big-block", "true")
return q, nil
}
// BlockStream performs a multipart request to block/put with the blocks
// received on the channel.
func (ipfs *Connector) BlockStream(ctx context.Context, blocks <-chan api.NodeWithMeta) error {
ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/BlockStream")
defer span.End()
logger.Debug("streaming blocks to IPFS")
defer ipfs.updateInformerMetric(ctx)
it := &chanIterator{
ctx: ctx,
blocks: blocks,
seen: multihash.NewSet(),
}
dir := &chanDirectory{
iterator: it,
}
// We need to pick into the first block to know which Cid prefix we
// are writing blocks with, so that ipfs calculates the expected
// multihash (we select the function used). This means that all blocks
// in a stream should use the same.
peek, ok := it.Peek()
if !ok {
return errors.New("BlockStream: no blocks to peek in blocks channel")
}
q, err := blockPutQuery(peek.Cid.Prefix())
if err != nil {
return err
}
url := "block/put?" + q.Encode()
// Now we stream the blocks to ipfs. In case of error, we return
// directly, but leave a goroutine draining the channel until it is
// closed, which should be soon after returning.
stats.Record(ctx, observations.BlocksPut.M(1))
multiFileR := files.NewMultiFileReader(dir, true)
contentType := "multipart/form-data; boundary=" + multiFileR.Boundary()
body, err := ipfs.postCtxStreamResponse(ctx, url, contentType, multiFileR)
if err != nil {
return err
}
defer body.Close()
dec := json.NewDecoder(body)
for {
var res ipfsBlockPutResp
err = dec.Decode(&res)
if err == io.EOF {
return nil
}
if err != nil {
logger.Error(err)
break
}
logger.Debugf("response block: %s", res.Key)
if !it.Seen(res.Key) {
logger.Warningf("blockPut response CID (%s) does not match the multihash of any blocks sent", res.Key)
}
}
// keep draining blocks channel until closed.
go func() {
for range blocks {
}
}()
if err != nil {
stats.Record(ipfs.ctx, observations.BlocksAddedError.M(1))
}
return err
}
// BlockGet retrieves an ipfs block with the given cid
func (ipfs *Connector) BlockGet(ctx context.Context, c api.Cid) ([]byte, error) {
ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/BlockGet")
defer span.End()
ctx, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout)
defer cancel()
url := "block/get?arg=" + c.String()
return ipfs.postCtx(ctx, url, "", nil)
}
// // FetchRefs asks IPFS to download blocks recursively to the given depth.
// // It discards the response, but waits until it completes.
// func (ipfs *Connector) FetchRefs(ctx context.Context, c api.Cid, maxDepth int) error {
// ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.PinTimeout)
// defer cancel()
// q := url.Values{}
// q.Set("recursive", "true")
// q.Set("unique", "false") // same memory on IPFS side
// q.Set("max-depth", fmt.Sprintf("%d", maxDepth))
// q.Set("arg", c.String())
// url := fmt.Sprintf("refs?%s", q.Encode())
// err := ipfs.postDiscardBodyCtx(ctx, url)
// if err != nil {
// return err
// }
// logger.Debugf("refs for %s successfully fetched", c)
// return nil
// }
// Returns true every updateMetricsMod-th time that we
// call this function.
func (ipfs *Connector) shouldUpdateMetric() bool {
if ipfs.config.InformerTriggerInterval <= 0 {
return false
}
curCount := atomic.AddUint64(&ipfs.updateMetricCount, 1)
if curCount%uint64(ipfs.config.InformerTriggerInterval) == 0 {
atomic.StoreUint64(&ipfs.updateMetricCount, 0)
return true
}
return false
}
// Trigger a broadcast of the local informer metrics.
func (ipfs *Connector) updateInformerMetric(ctx context.Context) error {
_, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/updateInformerMetric")
defer span.End()
ctx = trace.NewContext(ipfs.ctx, span)
if !ipfs.shouldUpdateMetric() {
return nil
}
err := ipfs.rpcClient.GoContext(
ctx,
"",
"Cluster",
"SendInformersMetrics",
struct{}{},
&struct{}{},
nil,
)
if err != nil {
logger.Error(err)
}
return err
}