milvus/internal/datanode/syncmgr/key_lock_dispatcher_test.go
yihao.dai b1e74dc7cb
enhance: [cherry-pick] Decouple compaction from shard (#34157)
This PR cherry-picks the following commits:

- Implement task limit control logic in datanode.
https://github.com/milvus-io/milvus/pull/32881
- Load bf from storage instead of memory during L0 compaction.
https://github.com/milvus-io/milvus/pull/32913
- Remove dependencies on shards (e.g. SyncSegments, injection).
https://github.com/milvus-io/milvus/pull/33138
- Rename Compaction interface to CompactionV2.
https://github.com/milvus-io/milvus/pull/33858
- Remove the unused residual compaction logic.
https://github.com/milvus-io/milvus/pull/33932

issue: https://github.com/milvus-io/milvus/issues/32809

pr: https://github.com/milvus-io/milvus/pull/32881,
https://github.com/milvus-io/milvus/pull/32913,
https://github.com/milvus-io/milvus/pull/33138,
https://github.com/milvus-io/milvus/pull/33858,
https://github.com/milvus-io/milvus/pull/33932

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
2024-06-25 20:22:03 +08:00

103 lines
1.9 KiB
Go

package syncmgr
import (
"testing"
"time"
"github.com/stretchr/testify/suite"
"go.uber.org/atomic"
)
/*
type mockTask struct {
targetID int64
ch chan struct{}
err error
}
func (t *mockTask) done() {
close(t.ch)
}
func (t *mockTask) SegmentID() int64 { panic("no implementation") }
func (t *mockTask) Checkpoint() *msgpb.MsgPosition { panic("no implementation") }
func (t *mockTask) StartPosition() *msgpb.MsgPosition { panic("no implementation") }
func (t *mockTask) ChannelName() string { panic("no implementation") }
func (t *mockTask) Run() error {
<-t.ch
return t.err
}
func newMockTask(err error) *mockTask {
return &mockTask{
err: err,
ch: make(chan struct{}),
}
}*/
type KeyLockDispatcherSuite struct {
suite.Suite
}
func (s *KeyLockDispatcherSuite) TestKeyLock() {
d := newKeyLockDispatcher[int64](2)
done := make(chan struct{})
t1 := NewMockTask(s.T())
t1.EXPECT().Run().Run(func() {
<-done
}).Return(nil)
t2 := NewMockTask(s.T())
t2.EXPECT().Run().Return(nil)
sig := atomic.NewBool(false)
d.Submit(1, t1)
go func() {
d.Submit(1, t2)
sig.Store(true)
}()
s.False(sig.Load(), "task 2 will never be submit before task 1 done")
close(done)
s.Eventually(sig.Load, time.Second, time.Millisecond*100)
}
func (s *KeyLockDispatcherSuite) TestCap() {
d := newKeyLockDispatcher[int64](1)
t1 := NewMockTask(s.T())
t2 := NewMockTask(s.T())
done := make(chan struct{})
t1.EXPECT().Run().Run(func() {
<-done
}).Return(nil)
t2.EXPECT().Run().Return(nil)
sig := atomic.NewBool(false)
d.Submit(1, t1)
go func() {
// defer t2.done()
d.Submit(2, t2)
sig.Store(true)
}()
s.False(sig.Load(), "task 2 will never be submit before task 1 done")
close(done)
s.Eventually(sig.Load, time.Second, time.Millisecond*100)
}
func TestKeyLockDispatcher(t *testing.T) {
suite.Run(t, new(KeyLockDispatcherSuite))
}