depot/packages/networking/ipfs-cluster/pintracker/stateless/stateless_test.go

576 lines
13 KiB
Go
Raw Permalink Normal View History

2022-10-19 23:23:11 +03:00
package stateless
import (
"context"
"errors"
"testing"
"time"
"github.com/ipfs-cluster/ipfs-cluster/api"
"github.com/ipfs-cluster/ipfs-cluster/datastore/inmem"
"github.com/ipfs-cluster/ipfs-cluster/state"
"github.com/ipfs-cluster/ipfs-cluster/state/dsstate"
"github.com/ipfs-cluster/ipfs-cluster/test"
peer "github.com/libp2p/go-libp2p/core/peer"
2022-10-19 23:23:11 +03:00
rpc "github.com/libp2p/go-libp2p-gorpc"
)
// Fixtures shared by the tests in this file.
var (
// pinCancelCid is the CID for which the mock Pin call returns
// errPinCancelCid, i.e. a pin request for it must never reach IPFS.
pinCancelCid = test.Cid3
// unpinCancelCid is the CID for which the mock Unpin call returns
// errUnpinCancelCid, i.e. an unpin request for it must never reach IPFS.
unpinCancelCid = test.Cid2
// pinErrCid is the CID for which the mock Pin and Unpin calls always
// return an error.
pinErrCid = test.ErrorCid
// errPinCancelCid is returned by the mock when an unwanted pin is received.
errPinCancelCid = errors.New("should not have received rpc.IPFSPin operation")
// errUnpinCancelCid is returned by the mock when an unwanted unpin is received.
errUnpinCancelCid = errors.New("should not have received rpc.IPFSUnpin operation")
// pinOpts are the pin options used for every pin created in these tests.
// NOTE(review): -1 presumably means "replicate everywhere" — confirm
// against the api package.
pinOpts = api.PinOptions{
ReplicationFactorMax: -1,
ReplicationFactorMin: -1,
}
)
// func TestMain(m *testing.M) {
// logging.SetLogLevel("pintracker", "debug")
// os.Exit(m.Run())
// }
// mockIPFS is the RPC service registered under the "IPFSConnector" name by
// mockRPCClient. It overrides the Pin and Unpin methods of the normal mock in
// order to return special errors when unwanted operations have been
// triggered, and reports a fixed set of pins from PinLs and PinLsCid.
type mockIPFS struct{}
// Pin simulates an IPFS pin request. The outcome depends on the CID:
//   - pinCancelCid: fails with errPinCancelCid (the request was unwanted).
//   - test.SlowCid1: succeeds after blocking for one second.
//   - pinErrCid: fails with a generic pinning error.
//
// Every other CID succeeds immediately.
func (mock *mockIPFS) Pin(ctx context.Context, in api.Pin, out *struct{}) error {
	if in.Cid == pinCancelCid {
		return errPinCancelCid
	} else if in.Cid == test.SlowCid1 {
		// Keep the operation in flight long enough for tests to observe it.
		time.Sleep(time.Second)
	} else if in.Cid == pinErrCid {
		return errors.New("error pinning")
	}
	return nil
}
// Unpin simulates an IPFS unpin request. The outcome depends on the CID:
//   - unpinCancelCid: fails with errUnpinCancelCid (the request was unwanted).
//   - test.SlowCid1: succeeds after blocking for one second.
//   - pinErrCid: fails with a generic unpinning error.
//
// Every other CID succeeds immediately.
func (mock *mockIPFS) Unpin(ctx context.Context, in api.Pin, out *struct{}) error {
	if in.Cid == unpinCancelCid {
		return errUnpinCancelCid
	} else if in.Cid == test.SlowCid1 {
		// Keep the operation in flight long enough for tests to observe it.
		time.Sleep(time.Second)
	} else if in.Cid == pinErrCid {
		return errors.New("error unpinning")
	}
	return nil
}
// PinLs streams the pins that the mocked IPFS daemon holds: test.Cid1 and
// test.Cid2, in that order, both recursively pinned. The output channel is
// closed once both entries have been sent. The input channel is not read.
func (mock *mockIPFS) PinLs(ctx context.Context, in <-chan []string, out chan<- api.IPFSPinInfo) error {
	defer close(out)

	pinned := []api.Cid{
		api.Cid(test.Cid1),
		api.Cid(test.Cid2),
	}
	for _, c := range pinned {
		out <- api.IPFSPinInfo{
			Cid:  c,
			Type: api.IPFSPinStatusRecursive,
		}
	}
	return nil
}
// PinLsCid reports the IPFS pin status of a single CID: test.Cid1 and
// test.Cid2 are recursively pinned, anything else is unpinned. It never
// returns an error.
func (mock *mockIPFS) PinLsCid(ctx context.Context, in api.Pin, out *api.IPFSPinStatus) error {
	status := api.IPFSPinStatusUnpinned
	if in.Cid == test.Cid1 || in.Cid == test.Cid2 {
		status = api.IPFSPinStatusRecursive
	}
	*out = status
	return nil
}
// mockCluster is the RPC service registered under the "Cluster" name by
// mockRPCClient. It only implements the IPFSID call.
type mockCluster struct{}
// IPFSID returns a fixed IPFS identity: the ID is test.PeerID1 and the only
// address is the local loopback multiaddress for that peer. The requested
// peer ID is ignored.
//
// An error is returned (and out is left untouched) if the multiaddress
// cannot be built, rather than silently handing back a zero-value address.
func (mock *mockCluster) IPFSID(ctx context.Context, in peer.ID, out *api.IPFSID) error {
	addr, err := api.NewMultiaddr("/ip4/127.0.0.1/tcp/4001/p2p/" + test.PeerID1.Pretty())
	if err != nil {
		return err
	}
	*out = api.IPFSID{
		ID:        test.PeerID1,
		Addresses: []api.Multiaddr{addr},
	}
	return nil
}
// mockRPCClient builds an in-process RPC client whose server exposes the
// mockIPFS service as "IPFSConnector" and the mockCluster service as
// "Cluster". Any registration failure aborts the calling test.
func mockRPCClient(t testing.TB) *rpc.Client {
	t.Helper()

	server := rpc.NewServer(nil, "mock")
	client := rpc.NewClientWithServer(nil, "mock", server)

	if err := server.RegisterName("IPFSConnector", &mockIPFS{}); err != nil {
		t.Fatal(err)
	}
	if err := server.RegisterName("Cluster", &mockCluster{}); err != nil {
		t.Fatal(err)
	}
	return client
}
// getStateFunc creates an in-memory state pre-populated with the given pins
// and returns a getter that always hands back that same state with a nil
// error. Failing to create the state or to add a pin aborts the calling test.
func getStateFunc(t testing.TB, items ...api.Pin) func(context.Context) (state.ReadOnly, error) {
	t.Helper()

	bg := context.Background()
	st, err := dsstate.New(bg, inmem.New(), "", dsstate.DefaultHandle())
	if err != nil {
		t.Fatal(err)
	}

	for i := range items {
		if addErr := st.Add(bg, items[i]); addErr != nil {
			t.Fatal(addErr)
		}
	}

	getter := func(context.Context) (state.ReadOnly, error) {
		return st, nil
	}
	return getter
}
// testStatelessPinTracker returns a Tracker for test.PeerID1 wired to the
// mock RPC client and backed by a state holding the given pins. The default
// configuration is adjusted to a single concurrent pin, a 10 second priority
// pin max age and one priority pin retry.
func testStatelessPinTracker(t testing.TB, pins ...api.Pin) *Tracker {
	t.Helper()

	conf := new(Config)
	conf.Default()
	conf.ConcurrentPins = 1
	conf.PriorityPinMaxAge = 10 * time.Second
	conf.PriorityPinMaxRetries = 1

	tracker := New(conf, test.PeerID1, test.PeerName1, getStateFunc(t, pins...))
	tracker.SetClient(mockRPCClient(t))
	return tracker
}
// TestStatelessPinTracker_New checks that a tracker can be constructed and
// shut down without panicking.
func TestStatelessPinTracker_New(t *testing.T) {
	tracker := testStatelessPinTracker(t)
	defer tracker.Shutdown(context.Background())
}
// TestStatelessPinTracker_Shutdown checks that Shutdown succeeds and that
// calling it a second time on the same tracker succeeds as well.
func TestStatelessPinTracker_Shutdown(t *testing.T) {
	ctx := context.Background()
	spt := testStatelessPinTracker(t)

	for attempt := 0; attempt < 2; attempt++ {
		if err := spt.Shutdown(ctx); err != nil {
			t.Fatal(err)
		}
	}
}
// TestUntrackTrack tracks a regular pin, gives it half a second to be
// processed and then untracks it, expecting both calls to succeed.
func TestUntrackTrack(t *testing.T) {
	ctx := context.Background()
	spt := testStatelessPinTracker(t)
	defer spt.Shutdown(ctx)

	target := test.Cid1

	// LocalPin
	pin := api.PinWithOpts(target, pinOpts)
	if err := spt.Track(ctx, pin); err != nil {
		t.Fatal(err)
	}

	time.Sleep(time.Second / 2)

	if err := spt.Untrack(ctx, target); err != nil {
		t.Fatal(err)
	}
}
// TestTrackUntrackWithCancel tracks a slow pin and untracks it while it is
// still pinning. The context of the in-flight operation is expected to be
// canceled within 100ms of the untrack request.
func TestTrackUntrackWithCancel(t *testing.T) {
	ctx := context.Background()
	spt := testStatelessPinTracker(t)
	defer spt.Shutdown(ctx)

	slowPinCid := test.SlowCid1

	// LocalPin
	slowPin := api.PinWithOpts(slowPinCid, pinOpts)

	err := spt.Track(ctx, slowPin)
	if err != nil {
		t.Fatal(err)
	}

	time.Sleep(100 * time.Millisecond) // let pinning start

	pInfo := spt.optracker.Get(ctx, slowPin.Cid, api.IPFSID{})
	if pInfo.Status == api.TrackerStatusUnpinned {
		t.Fatal("slowPin should be tracked")
	}

	if pInfo.Status != api.TrackerStatusPinning {
		t.Error("slowPin should be pinning and is:", pInfo.Status)
		return
	}

	// The goroutine hands its result back over a channel instead of writing
	// the outer err variable or calling t.Error itself: the test must not
	// finish while the goroutine may still report a failure.
	untrackErr := make(chan error, 1)
	go func() {
		untrackErr <- spt.Untrack(ctx, slowPinCid)
	}()

	select {
	case <-spt.optracker.OpContext(ctx, slowPinCid).Done():
	case <-time.After(100 * time.Millisecond): // one-shot timeout
		t.Errorf("operation context should have been canceled by now")
	}

	if err := <-untrackErr; err != nil {
		t.Error(err)
	}
}
// TestTrackUntrackWithNoCancel tracks a slow CID and then a fast/normal one.
// While the slow CID is being pinned, the fast one stays queued. The fast
// one is then untracked: since it never reached "pinning", it should simply
// be unqueued (or ignored) and no pin operation gets canceled (unlike in the
// WithCancel variant). The fast CID is pinCancelCid, so the mock errors out
// if a pin request for it is ever received.
func TestTrackUntrackWithNoCancel(t *testing.T) {
	ctx := context.Background()
	spt := testStatelessPinTracker(t)
	defer spt.Shutdown(ctx)

	// SlowLocalPin
	slowPin := api.PinWithOpts(test.SlowCid1, pinOpts)
	// LocalPin
	fastPinCid := pinCancelCid
	fastPin := api.PinWithOpts(fastPinCid, pinOpts)

	if err := spt.Track(ctx, slowPin); err != nil {
		t.Fatal(err)
	}

	// Otherwise fails when running with -race
	time.Sleep(300 * time.Millisecond)

	if err := spt.Track(ctx, fastPin); err != nil {
		t.Fatal(err)
	}

	// fastPin should be queued because slow pin is pinning
	queued := spt.optracker.Get(ctx, fastPin.Cid, api.IPFSID{})
	if queued.Status == api.TrackerStatusUnpinned {
		t.Fatal("fastPin should be tracked")
	}

	if queued.Status != api.TrackerStatusPinQueued {
		t.Errorf("fastPin should be queued to pin but is %s", queued.Status)
	} else {
		if err := spt.Untrack(ctx, fastPinCid); err != nil {
			t.Fatal(err)
		}
		// pi := spt.get(fastPinCid)
		// if pi.Error == ErrPinCancelCid.Error() {
		// 	t.Fatal(ErrPinCancelCid)
		// }
	}

	// NOTE(review): the failure message talks about removal, but the
	// condition fires when the returned Cid is NOT defined — confirm the
	// intended assertion against optracker.Get semantics.
	after := spt.optracker.Get(ctx, fastPin.Cid, api.IPFSID{})
	if !after.Cid.Defined() {
		t.Error("fastPin should have been removed from tracker")
	}
}
// TestUntrackTrackWithCancel tracks a slow pin, untracks it and, while the
// unpin is in progress, tracks it again. The context of the in-flight unpin
// operation is expected to be canceled within 100ms of the track request.
func TestUntrackTrackWithCancel(t *testing.T) {
	ctx := context.Background()
	spt := testStatelessPinTracker(t)
	defer spt.Shutdown(ctx)

	slowPinCid := test.SlowCid1

	// LocalPin
	slowPin := api.PinWithOpts(slowPinCid, pinOpts)

	err := spt.Track(ctx, slowPin)
	if err != nil {
		t.Fatal(err)
	}

	time.Sleep(time.Second / 2)

	// Untrack should cancel the ongoing request
	// and unpin right away
	err = spt.Untrack(ctx, slowPinCid)
	if err != nil {
		t.Fatal(err)
	}

	time.Sleep(100 * time.Millisecond)

	pi := spt.optracker.Get(ctx, slowPin.Cid, api.IPFSID{})
	if !pi.Cid.Defined() {
		t.Fatal("expected slowPin to be tracked")
	}

	if pi.Status != api.TrackerStatusUnpinning {
		t.Error("slowPin should be in unpinning")
		return
	}

	// The goroutine hands its result back over a channel instead of writing
	// the outer err variable or calling t.Error itself: the test must not
	// finish while the goroutine may still report a failure.
	trackErr := make(chan error, 1)
	go func() {
		trackErr <- spt.Track(ctx, slowPin)
	}()

	select {
	case <-spt.optracker.OpContext(ctx, slowPinCid).Done():
	case <-time.After(100 * time.Millisecond): // one-shot timeout
		t.Errorf("operation context should have been canceled by now")
	}

	if err := <-trackErr; err != nil {
		t.Error(err)
	}
}
// TestUntrackTrackWithNoCancel tracks a slow pin and a fast pin, waits three
// seconds and then untracks both. The fast pin's unpin is expected to be
// queued behind the slow one; tracking the fast pin again at that point must
// succeed. The fast CID is unpinCancelCid, so the mock errors out if an
// unpin request for it is ever received.
func TestUntrackTrackWithNoCancel(t *testing.T) {
	ctx := context.Background()
	spt := testStatelessPinTracker(t)
	defer spt.Shutdown(ctx)

	// SlowLocalPin
	slowPin := api.PinWithOpts(test.SlowCid1, pinOpts)
	// LocalPin
	fastPin := api.PinWithOpts(unpinCancelCid, pinOpts)

	for _, p := range []api.Pin{slowPin, fastPin} {
		if err := spt.Track(ctx, p); err != nil {
			t.Fatal(err)
		}
	}

	time.Sleep(3 * time.Second)

	if err := spt.Untrack(ctx, slowPin.Cid); err != nil {
		t.Fatal(err)
	}
	if err := spt.Untrack(ctx, fastPin.Cid); err != nil {
		t.Fatal(err)
	}

	info := spt.optracker.Get(ctx, fastPin.Cid, api.IPFSID{})
	if !info.Cid.Defined() {
		t.Fatal("c untrack operation should be tracked")
	}

	if info.Status != api.TrackerStatusUnpinQueued {
		t.Error("c should be queued to unpin")
		return
	}

	if err := spt.Track(ctx, fastPin); err != nil {
		t.Fatal(err)
	}
	// pi := spt.get(fastPinCid)
	// if pi.Error == ErrUnpinCancelCid.Error() {
	// 	t.Fatal(ErrUnpinCancelCid)
	// }
}
// TestStatusAll checks that StatusAll correctly reports tracked
// items and mismatches between what's on IPFS and on the state.
func TestStatusAll(t *testing.T) {
	ctx := context.Background()

	normalPin := api.PinWithOpts(test.Cid1, pinOpts)
	normalPin2 := api.PinWithOpts(test.Cid4, pinOpts)

	// - Build a state with three pins (Cid1, Cid4, SlowCid1)
	// - The IPFS Mock reports Cid1 and Cid2
	// - Track the SlowCid additionally
	slowPin := api.PinWithOpts(test.SlowCid1, pinOpts)
	spt := testStatelessPinTracker(t, normalPin, normalPin2, slowPin)
	defer spt.Shutdown(ctx)

	err := spt.Track(ctx, slowPin)
	if err != nil {
		t.Fatal(err)
	}

	time.Sleep(time.Second / 2)

	// Needs to return:
	// * A slow CID pinning
	// * Cid1 is pinned
	// * Cid4 should be unexpectedly_unpinned (it's in the state but not on IPFS)
	stAll := make(chan api.PinInfo, 10)
	err = spt.StatusAll(ctx, api.TrackerStatusUndefined, stAll)
	if err != nil {
		t.Fatal(err)
	}

	n := 0
	for pi := range stAll {
		n++
		switch pi.Cid {
		case test.Cid1:
			if pi.Status != api.TrackerStatusPinned {
				t.Error(test.Cid1, " should be pinned")
			}
		case test.Cid4:
			if pi.Status != api.TrackerStatusUnexpectedlyUnpinned {
				// Report the CID actually being checked (was test.Cid2).
				t.Error(test.Cid4, " should be in unexpectedly_unpinned status")
			}
		case test.SlowCid1:
			if pi.Status != api.TrackerStatusPinning {
				t.Error("slowCid1 should be pinning")
			}
		default:
			t.Error("Unexpected pin:", pi.Cid)
		}
		if pi.IPFS == "" {
			t.Error("IPFS field should be set")
		}
	}
	if n != 3 {
		t.Errorf("wrong status length. Expected 3, got: %d", n)
	}
}
// TestStatus checks that the Status call correctly reports tracked
// items and mismatches between what's on IPFS and on the state.
func TestStatus(t *testing.T) {
	ctx := context.Background()

	normalPin := api.PinWithOpts(test.Cid1, pinOpts)
	normalPin2 := api.PinWithOpts(test.Cid4, pinOpts)

	// - Build a state with two pins (Cid1, Cid4)
	// - The IPFS Mock reports Cid1 and Cid2
	// - Track a SlowCid additionally
	spt := testStatelessPinTracker(t, normalPin, normalPin2)
	defer spt.Shutdown(ctx)

	slowPin := api.PinWithOpts(test.SlowCid1, pinOpts)
	err := spt.Track(ctx, slowPin)
	if err != nil {
		t.Fatal(err)
	}

	time.Sleep(time.Second / 2)

	// Status needs to return:
	// * For slowCid1: A slow CID pinning
	// * For Cid1: pinned
	// * For Cid4: unexpectedly_unpinned
	st := spt.Status(ctx, test.Cid1)
	if st.Status != api.TrackerStatusPinned {
		t.Error("cid1 should be pinned")
	}

	st = spt.Status(ctx, test.Cid4)
	if st.Status != api.TrackerStatusUnexpectedlyUnpinned {
		// Name the CID actually being checked (message used to say cid2).
		t.Error("cid4 should be in unexpectedly_unpinned status")
	}

	st = spt.Status(ctx, test.SlowCid1)
	if st.Status != api.TrackerStatusPinning {
		t.Error("slowCid1 should be pinning")
	}

	if st.IPFS == "" {
		t.Error("IPFS field should be set")
	}
}
// TestAttemptCountAndPriority checks how the AttemptCount and PriorityPin
// fields reported by Status evolve for a pin that always fails: the count
// grows with every Track/Recover attempt, the pin is reported as priority
// after the first retry and no longer after the second one (the tracker is
// configured with PriorityPinMaxRetries = 1), and the count starts over
// when the operation switches to unpinning.
func TestAttemptCountAndPriority(t *testing.T) {
	ctx := context.Background()

	normalPin := api.PinWithOpts(test.Cid1, pinOpts)
	normalPin2 := api.PinWithOpts(test.Cid4, pinOpts)
	errPin := api.PinWithOpts(pinErrCid, pinOpts)

	spt := testStatelessPinTracker(t, normalPin, normalPin2, errPin)
	defer spt.Shutdown(ctx)

	st := spt.Status(ctx, test.Cid1)
	if st.AttemptCount != 0 {
		// The status queried above is Cid1's, not errPin's.
		t.Errorf("Cid1 should have 0 attempts as it was already pinned: %+v", st)
	}

	err := spt.Track(ctx, errPin)
	if err != nil {
		t.Fatal(err)
	}
	time.Sleep(200 * time.Millisecond) // let the pin be applied
	st = spt.Status(ctx, pinErrCid)
	if st.AttemptCount != 1 {
		t.Errorf("errPin should have 1 attempt count: %+v", st)
	}

	// Retry 1
	_, err = spt.Recover(ctx, pinErrCid)
	if err != nil {
		t.Fatal(err)
	}
	time.Sleep(200 * time.Millisecond) // let the pin be applied
	st = spt.Status(ctx, pinErrCid)
	if st.AttemptCount != 2 || !st.PriorityPin {
		t.Errorf("errPin should have 2 attempt counts and be priority: %+v", st)
	}

	// Retry 2
	_, err = spt.Recover(ctx, pinErrCid)
	if err != nil {
		t.Fatal(err)
	}
	time.Sleep(200 * time.Millisecond) // let the pin be applied
	st = spt.Status(ctx, pinErrCid)
	if st.AttemptCount != 3 || st.PriorityPin {
		t.Errorf("errPin should have 3 attempts and not be priority: %+v", st)
	}

	// Check the Untrack error right away instead of after sleeping.
	err = spt.Untrack(ctx, pinErrCid)
	if err != nil {
		t.Fatal(err)
	}
	time.Sleep(200 * time.Millisecond) // let the unpin be applied
	st = spt.Status(ctx, pinErrCid)
	if st.AttemptCount != 1 {
		t.Errorf("errPin should have 1 attempt count to unpin: %+v", st)
	}

	err = spt.Untrack(ctx, pinErrCid)
	if err != nil {
		t.Fatal(err)
	}
	time.Sleep(200 * time.Millisecond) // let the unpin be applied
	st = spt.Status(ctx, pinErrCid)
	if st.AttemptCount != 2 {
		t.Errorf("errPin should have 2 attempt counts to unpin: %+v", st)
	}
}