depot/packages/networking/ipfs-cluster/api/rest/client/methods.go

699 lines
18 KiB
Go

package client
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net/url"
"os"
"path"
"path/filepath"
"strings"
"time"
"github.com/ipfs-cluster/ipfs-cluster/api"
files "github.com/ipfs/go-ipfs-files"
gopath "github.com/ipfs/go-path"
peer "github.com/libp2p/go-libp2p/core/peer"
"go.opencensus.io/trace"
)
// ID returns information about the cluster Peer.
func (c *defaultClient) ID(ctx context.Context) (api.ID, error) {
ctx, span := trace.StartSpan(ctx, "client/ID")
defer span.End()
var id api.ID
err := c.do(ctx, "GET", "/id", nil, nil, &id)
return id, err
}
// Peers requests ID information for all cluster peers.
func (c *defaultClient) Peers(ctx context.Context, out chan<- api.ID) error {
defer close(out)
ctx, span := trace.StartSpan(ctx, "client/Peers")
defer span.End()
handler := func(dec *json.Decoder) error {
var obj api.ID
err := dec.Decode(&obj)
if err != nil {
return err
}
out <- obj
return nil
}
return c.doStream(ctx, "GET", "/peers", nil, nil, handler)
}
type peerAddBody struct {
PeerID string `json:"peer_id"`
}
// PeerAdd adds a new peer to the cluster.
func (c *defaultClient) PeerAdd(ctx context.Context, pid peer.ID) (api.ID, error) {
ctx, span := trace.StartSpan(ctx, "client/PeerAdd")
defer span.End()
body := peerAddBody{pid.String()}
var buf bytes.Buffer
enc := json.NewEncoder(&buf)
enc.Encode(body)
var id api.ID
err := c.do(ctx, "POST", "/peers", nil, &buf, &id)
return id, err
}
// PeerRm removes a current peer from the cluster
func (c *defaultClient) PeerRm(ctx context.Context, id peer.ID) error {
ctx, span := trace.StartSpan(ctx, "client/PeerRm")
defer span.End()
return c.do(ctx, "DELETE", fmt.Sprintf("/peers/%s", id.Pretty()), nil, nil, nil)
}
// Pin tracks a Cid with the given replication factor and a name for
// human-friendliness.
func (c *defaultClient) Pin(ctx context.Context, ci api.Cid, opts api.PinOptions) (api.Pin, error) {
ctx, span := trace.StartSpan(ctx, "client/Pin")
defer span.End()
query, err := opts.ToQuery()
if err != nil {
return api.Pin{}, err
}
var pin api.Pin
err = c.do(
ctx,
"POST",
fmt.Sprintf(
"/pins/%s?%s",
ci.String(),
query,
),
nil,
nil,
&pin,
)
return pin, err
}
// Unpin untracks a Cid from cluster.
func (c *defaultClient) Unpin(ctx context.Context, ci api.Cid) (api.Pin, error) {
ctx, span := trace.StartSpan(ctx, "client/Unpin")
defer span.End()
var pin api.Pin
err := c.do(ctx, "DELETE", fmt.Sprintf("/pins/%s", ci.String()), nil, nil, &pin)
return pin, err
}
// PinPath allows to pin an element by the given IPFS path.
func (c *defaultClient) PinPath(ctx context.Context, path string, opts api.PinOptions) (api.Pin, error) {
ctx, span := trace.StartSpan(ctx, "client/PinPath")
defer span.End()
var pin api.Pin
ipfspath, err := gopath.ParsePath(path)
if err != nil {
return api.Pin{}, err
}
query, err := opts.ToQuery()
if err != nil {
return api.Pin{}, err
}
err = c.do(
ctx,
"POST",
fmt.Sprintf(
"/pins%s?%s",
ipfspath.String(),
query,
),
nil,
nil,
&pin,
)
return pin, err
}
// UnpinPath allows to unpin an item by providing its IPFS path.
// It returns the unpinned api.Pin information of the resolved Cid.
func (c *defaultClient) UnpinPath(ctx context.Context, p string) (api.Pin, error) {
ctx, span := trace.StartSpan(ctx, "client/UnpinPath")
defer span.End()
var pin api.Pin
ipfspath, err := gopath.ParsePath(p)
if err != nil {
return api.Pin{}, err
}
err = c.do(ctx, "DELETE", fmt.Sprintf("/pins%s", ipfspath.String()), nil, nil, &pin)
return pin, err
}
// Allocations returns the consensus state listing all tracked items and
// the peers that should be pinning them.
func (c *defaultClient) Allocations(ctx context.Context, filter api.PinType, out chan<- api.Pin) error {
defer close(out)
ctx, span := trace.StartSpan(ctx, "client/Allocations")
defer span.End()
types := []api.PinType{
api.DataType,
api.MetaType,
api.ClusterDAGType,
api.ShardType,
}
var strFilter []string
if filter == api.AllType {
strFilter = []string{"all"}
} else {
for _, t := range types {
if t&filter > 0 { // the filter includes this type
strFilter = append(strFilter, t.String())
}
}
}
handler := func(dec *json.Decoder) error {
var obj api.Pin
err := dec.Decode(&obj)
if err != nil {
return err
}
out <- obj
return nil
}
f := url.QueryEscape(strings.Join(strFilter, ","))
return c.doStream(
ctx,
"GET",
fmt.Sprintf("/allocations?filter=%s", f),
nil,
nil,
handler)
}
// Allocation returns the current allocations for a given Cid.
func (c *defaultClient) Allocation(ctx context.Context, ci api.Cid) (api.Pin, error) {
ctx, span := trace.StartSpan(ctx, "client/Allocation")
defer span.End()
var pin api.Pin
err := c.do(ctx, "GET", fmt.Sprintf("/allocations/%s", ci.String()), nil, nil, &pin)
return pin, err
}
// Status returns the current ipfs state for a given Cid. If local is true,
// the information affects only the current peer, otherwise the information
// is fetched from all cluster peers.
func (c *defaultClient) Status(ctx context.Context, ci api.Cid, local bool) (api.GlobalPinInfo, error) {
ctx, span := trace.StartSpan(ctx, "client/Status")
defer span.End()
var gpi api.GlobalPinInfo
err := c.do(
ctx,
"GET",
fmt.Sprintf("/pins/%s?local=%t", ci.String(), local),
nil,
nil,
&gpi,
)
return gpi, err
}
// StatusCids returns Status() information for the given Cids. If local is
// true, the information affects only the current peer, otherwise the
// information is fetched from all cluster peers.
func (c *defaultClient) StatusCids(ctx context.Context, cids []api.Cid, local bool, out chan<- api.GlobalPinInfo) error {
return c.statusAllWithCids(ctx, api.TrackerStatusUndefined, cids, local, out)
}
// StatusAll gathers Status() for all tracked items. If a filter is
// provided, only entries matching the given filter statuses
// will be returned. A filter can be built by merging TrackerStatuses with
// a bitwise OR operation (st1 | st2 | ...). A "0" filter value (or
// api.TrackerStatusUndefined), means all.
func (c *defaultClient) StatusAll(ctx context.Context, filter api.TrackerStatus, local bool, out chan<- api.GlobalPinInfo) error {
return c.statusAllWithCids(ctx, filter, nil, local, out)
}
func (c *defaultClient) statusAllWithCids(ctx context.Context, filter api.TrackerStatus, cids []api.Cid, local bool, out chan<- api.GlobalPinInfo) error {
defer close(out)
ctx, span := trace.StartSpan(ctx, "client/StatusAll")
defer span.End()
filterStr := ""
if filter != api.TrackerStatusUndefined { // undefined filter means "all"
filterStr = filter.String()
if filterStr == "" {
return errors.New("invalid filter value")
}
}
cidsStr := make([]string, len(cids))
for i, c := range cids {
cidsStr[i] = c.String()
}
handler := func(dec *json.Decoder) error {
var obj api.GlobalPinInfo
err := dec.Decode(&obj)
if err != nil {
return err
}
out <- obj
return nil
}
return c.doStream(
ctx,
"GET",
fmt.Sprintf("/pins?local=%t&filter=%s&cids=%s",
local, url.QueryEscape(filterStr), strings.Join(cidsStr, ",")),
nil,
nil,
handler,
)
}
// Recover retriggers pin or unpin ipfs operations for a Cid in error state.
// If local is true, the operation is limited to the current peer, otherwise
// it happens on every cluster peer.
func (c *defaultClient) Recover(ctx context.Context, ci api.Cid, local bool) (api.GlobalPinInfo, error) {
ctx, span := trace.StartSpan(ctx, "client/Recover")
defer span.End()
var gpi api.GlobalPinInfo
err := c.do(ctx, "POST", fmt.Sprintf("/pins/%s/recover?local=%t", ci.String(), local), nil, nil, &gpi)
return gpi, err
}
// RecoverAll triggers Recover() operations on all tracked items. If local is
// true, the operation is limited to the current peer. Otherwise, it happens
// everywhere.
func (c *defaultClient) RecoverAll(ctx context.Context, local bool, out chan<- api.GlobalPinInfo) error {
defer close(out)
ctx, span := trace.StartSpan(ctx, "client/RecoverAll")
defer span.End()
handler := func(dec *json.Decoder) error {
var obj api.GlobalPinInfo
err := dec.Decode(&obj)
if err != nil {
return err
}
out <- obj
return nil
}
return c.doStream(
ctx,
"POST",
fmt.Sprintf("/pins/recover?local=%t", local),
nil,
nil,
handler)
}
// Alerts returns information health events in the cluster (expired metrics
// etc.).
func (c *defaultClient) Alerts(ctx context.Context) ([]api.Alert, error) {
ctx, span := trace.StartSpan(ctx, "client/Alert")
defer span.End()
var alerts []api.Alert
err := c.do(ctx, "GET", "/health/alerts", nil, nil, &alerts)
return alerts, err
}
// Version returns the ipfs-cluster peer's version.
func (c *defaultClient) Version(ctx context.Context) (api.Version, error) {
ctx, span := trace.StartSpan(ctx, "client/Version")
defer span.End()
var ver api.Version
err := c.do(ctx, "GET", "/version", nil, nil, &ver)
return ver, err
}
// GetConnectGraph returns an ipfs-cluster connection graph.
// The serialized version, strings instead of pids, is returned
func (c *defaultClient) GetConnectGraph(ctx context.Context) (api.ConnectGraph, error) {
ctx, span := trace.StartSpan(ctx, "client/GetConnectGraph")
defer span.End()
var graph api.ConnectGraph
err := c.do(ctx, "GET", "/health/graph", nil, nil, &graph)
return graph, err
}
// Metrics returns a map with the latest valid metrics of the given name
// for the current cluster peers.
func (c *defaultClient) Metrics(ctx context.Context, name string) ([]api.Metric, error) {
ctx, span := trace.StartSpan(ctx, "client/Metrics")
defer span.End()
if name == "" {
return nil, errors.New("bad metric name")
}
var metrics []api.Metric
err := c.do(ctx, "GET", fmt.Sprintf("/monitor/metrics/%s", name), nil, nil, &metrics)
return metrics, err
}
// MetricNames lists names of all metrics.
func (c *defaultClient) MetricNames(ctx context.Context) ([]string, error) {
ctx, span := trace.StartSpan(ctx, "client/MetricNames")
defer span.End()
var metricsNames []string
err := c.do(ctx, "GET", "/monitor/metrics", nil, nil, &metricsNames)
return metricsNames, err
}
// RepoGC runs garbage collection on IPFS daemons of cluster peers and
// returns collected CIDs. If local is true, it would garbage collect
// only on contacted peer, otherwise on all peers' IPFS daemons.
func (c *defaultClient) RepoGC(ctx context.Context, local bool) (api.GlobalRepoGC, error) {
ctx, span := trace.StartSpan(ctx, "client/RepoGC")
defer span.End()
var repoGC api.GlobalRepoGC
err := c.do(
ctx,
"POST",
fmt.Sprintf("/ipfs/gc?local=%t", local),
nil,
nil,
&repoGC,
)
return repoGC, err
}
// WaitFor is a utility function that allows for a caller to wait until a CID
// status target is reached (as given in StatusFilterParams).
// It returns the final status for that CID and an error, if there was one.
//
// WaitFor works by calling Status() repeatedly and checking that returned
// peers have transitioned to the target TrackerStatus. It immediately returns
// an error when the an error is among the statuses (and an empty
// GlobalPinInfo).
//
// A special case exists for TrackerStatusPinned targets: in this case,
// TrackerStatusRemote statuses are ignored, so WaitFor will return when
// all Statuses are Pinned or Remote by default.
//
// The Limit parameter allows to specify finer-grained control to, for
// example, only wait until a number of peers reaches a status.
func WaitFor(ctx context.Context, c Client, fp StatusFilterParams) (api.GlobalPinInfo, error) {
ctx, span := trace.StartSpan(ctx, "client/WaitFor")
defer span.End()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
sf := newStatusFilter()
go sf.pollStatus(ctx, c, fp)
go sf.filter(ctx, fp)
var status api.GlobalPinInfo
for {
select {
case <-ctx.Done():
return status, ctx.Err()
case err := <-sf.Err:
return status, err
case st, ok := <-sf.Out:
if !ok { // channel closed
return status, nil
}
status = st
}
}
}
// StatusFilterParams contains the parameters required
// to filter a stream of status results.
type StatusFilterParams struct {
Cid api.Cid
Local bool // query status from the local peer only
Target api.TrackerStatus
Limit int // wait for N peers reaching status. 0 == all
CheckFreq time.Duration
}
type statusFilter struct {
In, Out chan api.GlobalPinInfo
Done chan struct{}
Err chan error
}
func newStatusFilter() *statusFilter {
return &statusFilter{
In: make(chan api.GlobalPinInfo),
Out: make(chan api.GlobalPinInfo),
Done: make(chan struct{}),
Err: make(chan error),
}
}
func (sf *statusFilter) filter(ctx context.Context, fp StatusFilterParams) {
defer close(sf.Done)
defer close(sf.Out)
for {
select {
case <-ctx.Done():
sf.Err <- ctx.Err()
return
case gblPinInfo, more := <-sf.In:
if !more {
return
}
ok, err := statusReached(fp.Target, gblPinInfo, fp.Limit)
if err != nil {
sf.Err <- err
return
}
sf.Out <- gblPinInfo
if !ok {
continue
}
return
}
}
}
func (sf *statusFilter) pollStatus(ctx context.Context, c Client, fp StatusFilterParams) {
ticker := time.NewTicker(fp.CheckFreq)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
sf.Err <- ctx.Err()
return
case <-ticker.C:
gblPinInfo, err := c.Status(ctx, fp.Cid, fp.Local)
if err != nil {
sf.Err <- err
return
}
logger.Debugf("pollStatus: status: %#v", gblPinInfo)
sf.In <- gblPinInfo
case <-sf.Done:
close(sf.In)
return
}
}
}
func statusReached(target api.TrackerStatus, gblPinInfo api.GlobalPinInfo, limit int) (bool, error) {
// Specific case: return error if there are errors
for _, pinInfo := range gblPinInfo.PeerMap {
switch pinInfo.Status {
case api.TrackerStatusUndefined,
api.TrackerStatusClusterError,
api.TrackerStatusPinError,
api.TrackerStatusUnpinError:
return false, fmt.Errorf("error has occurred while attempting to reach status: %s", target.String())
}
}
// Specific case: when limit it set, just count how many targets we
// reached.
if limit > 0 {
total := 0
for _, pinInfo := range gblPinInfo.PeerMap {
if pinInfo.Status == target {
total++
}
}
return total >= limit, nil
}
// General case: all statuses should be the target.
// Specific case: when looking for Pinned, ignore status remote.
for _, pinInfo := range gblPinInfo.PeerMap {
if pinInfo.Status == api.TrackerStatusRemote && target == api.TrackerStatusPinned {
continue
}
if pinInfo.Status == target {
continue
}
return false, nil
}
// All statuses are the target, as otherwise we would have returned
// false.
return true, nil
}
// logic drawn from go-ipfs-cmds/cli/parse.go: appendFile
func makeSerialFile(fpath string, params api.AddParams) (string, files.Node, error) {
if fpath == "." {
cwd, err := os.Getwd()
if err != nil {
return "", nil, err
}
cwd, err = filepath.EvalSymlinks(cwd)
if err != nil {
return "", nil, err
}
fpath = cwd
}
fpath = filepath.ToSlash(filepath.Clean(fpath))
stat, err := os.Lstat(fpath)
if err != nil {
return "", nil, err
}
if stat.IsDir() {
if !params.Recursive {
return "", nil, fmt.Errorf("%s is a directory, but Recursive option is not set", fpath)
}
}
sf, err := files.NewSerialFile(fpath, params.Hidden, stat)
return path.Base(fpath), sf, err
}
// Add imports files to the cluster from the given paths. A path can
// either be a local filesystem location or an web url (http:// or https://).
// In the latter case, the destination will be downloaded with a GET request.
// The AddParams allow to control different options, like enabling the
// sharding the resulting DAG across the IPFS daemons of multiple cluster
// peers. The output channel will receive regular updates as the adding
// process progresses.
func (c *defaultClient) Add(
ctx context.Context,
paths []string,
params api.AddParams,
out chan<- api.AddedOutput,
) error {
ctx, span := trace.StartSpan(ctx, "client/Add")
defer span.End()
addFiles := make([]files.DirEntry, len(paths))
for i, p := range paths {
u, err := url.Parse(p)
if err != nil {
close(out)
return fmt.Errorf("error parsing path: %s", err)
}
var name string
var addFile files.Node
if strings.HasPrefix(u.Scheme, "http") {
addFile = files.NewWebFile(u)
name = path.Base(u.Path)
} else {
if params.NoCopy {
close(out)
return fmt.Errorf("nocopy option is only valid for URLs")
}
name, addFile, err = makeSerialFile(p, params)
if err != nil {
close(out)
return err
}
}
addFiles[i] = files.FileEntry(name, addFile)
}
sliceFile := files.NewSliceDirectory(addFiles)
// If `form` is set to true, the multipart data will have
// a Content-Type of 'multipart/form-data', if `form` is false,
// the Content-Type will be 'multipart/mixed'.
return c.AddMultiFile(ctx, files.NewMultiFileReader(sliceFile, true), params, out)
}
// AddMultiFile imports new files from a MultiFileReader. See Add().
func (c *defaultClient) AddMultiFile(
ctx context.Context,
multiFileR *files.MultiFileReader,
params api.AddParams,
out chan<- api.AddedOutput,
) error {
ctx, span := trace.StartSpan(ctx, "client/AddMultiFile")
defer span.End()
defer close(out)
headers := make(map[string]string)
headers["Content-Type"] = "multipart/form-data; boundary=" + multiFileR.Boundary()
// This method must run with StreamChannels set.
params.StreamChannels = true
queryStr, err := params.ToQueryString()
if err != nil {
return err
}
// our handler decodes an AddedOutput and puts it
// in the out channel.
handler := func(dec *json.Decoder) error {
if out == nil {
return nil
}
var obj api.AddedOutput
err := dec.Decode(&obj)
if err != nil {
return err
}
out <- obj
return nil
}
err = c.doStream(ctx,
"POST",
"/add?"+queryStr,
headers,
multiFileR,
handler,
)
return err
}