fix: refuse schedule compaction tasks if there is no slot (#37809)

See #37621


pr: #37589

---------

Signed-off-by: Ted Xu <ted.xu@zilliz.com>
Signed-off-by: Yinzuo Jiang <jiangyinzuo@foxmail.com>
Signed-off-by: yangxuan <xuan.yang@zilliz.com>
Signed-off-by: Wei Liu <wei.liu@zilliz.com>
Co-authored-by: Yinzuo Jiang <jiangyinzuo@foxmail.com>
Co-authored-by: yangxuan <xuan.yang@zilliz.com>
Co-authored-by: wei liu <wei.liu@zilliz.com>
This commit is contained in:
Ted Xu 2024-11-25 14:02:34 +08:00 committed by GitHub
parent 370f39db67
commit e928e15bfc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 196 additions and 58 deletions

8
go.mod
View File

@ -21,7 +21,7 @@ require (
github.com/go-playground/validator/v10 v10.14.0
github.com/gofrs/flock v0.8.1
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/protobuf v1.5.4
github.com/google/btree v1.1.2
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.17.9
@ -65,7 +65,7 @@ require github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d7
require (
github.com/bits-and-blooms/bitset v1.10.0
github.com/bytedance/sonic v1.9.1
github.com/bytedance/sonic v1.12.4
github.com/greatroar/blobloom v0.8.0
github.com/jolestar/go-commons-pool/v2 v2.1.2
github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000
@ -95,11 +95,13 @@ require (
github.com/benbjohnson/clock v1.1.0 // indirect
github.com/benesch/cgosymbolizer v0.0.0-20190515212042-bec6fe6e597b // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bytedance/sonic/loader v0.2.0 // indirect
github.com/campoy/embedmd v1.0.0 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
github.com/cilium/ebpf v0.11.0 // indirect
github.com/cloudwego/base64x v0.1.4 // indirect
github.com/cloudwego/iasm v0.2.0 // indirect
github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f // indirect
github.com/cockroachdb/redact v1.1.3 // indirect
github.com/confluentinc/confluent-kafka-go v1.9.1 // indirect

22
go.sum
View File

@ -130,9 +130,11 @@ github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM
github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ=
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q=
github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM=
github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s=
github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U=
github.com/bytedance/sonic v1.12.4 h1:9Csb3c9ZJhfUWeMtpCDCq6BUoH5ogfDFLUgQ/jG+R0k=
github.com/bytedance/sonic v1.12.4/go.mod h1:B8Gt/XvtZ3Fqj+iSKMypzymZxw/FVwgIGKzMzT9r/rk=
github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU=
github.com/bytedance/sonic/loader v0.2.0 h1:zNprn+lsIP06C/IqCHs3gPQIvnvpKbbxyXQP1iU4kWM=
github.com/bytedance/sonic/loader v0.2.0/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU=
github.com/campoy/embedmd v1.0.0 h1:V4kI2qTJJLf4J29RzI/MAt2c3Bl4dQSYPuflzwFH2hY=
github.com/campoy/embedmd v1.0.0/go.mod h1:oxyr9RCiSXg0M3VJ3ks0UGfp98BpSSGr0kpiX3MzVl8=
github.com/casbin/casbin/v2 v2.0.0/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
@ -150,15 +152,20 @@ github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghf
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY=
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams=
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/cilium/ebpf v0.11.0 h1:V8gS/bTCCjX9uUnkUFUpPsksM8n1lXBAvHcpiFk1X2Y=
github.com/cilium/ebpf v0.11.0/go.mod h1:WE7CZAnqOL2RouJ4f1uyNhqr2P4CCvXFIqdRDUgWsVs=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y=
github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w=
github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg=
github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY=
github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y=
github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w=
github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg=
github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
@ -541,6 +548,7 @@ github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa02
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM=
github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
@ -1003,7 +1011,6 @@ go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
go.uber.org/zap v1.20.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw=
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
golang.org/x/arch v0.3.0 h1:02VY4/ZcO/gBOH6PUaoiptASxtXU10jazRCP865E97k=
golang.org/x/arch v0.3.0/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
@ -1525,6 +1532,7 @@ honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
k8s.io/apimachinery v0.28.6 h1:RsTeR4z6S07srPg6XYrwXpTJVMXsjPXn0ODakMytSW0=
k8s.io/apimachinery v0.28.6/go.mod h1:QFNX/kCl/EMT2WTSz8k4WLCv2XnkOLMaL8GAVRMdpsA=
nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=

View File

@ -197,6 +197,21 @@ func newCompactionPlanHandler(cluster Cluster, sessions SessionManager, cm Chann
}
func (c *compactionPlanHandler) schedule() []CompactionTask {
selected := make([]CompactionTask, 0)
if c.queueTasks.Len() == 0 {
return selected
}
var (
parallelism = Params.DataCoordCfg.CompactionMaxParallelTasks.GetAsInt()
slots map[int64]int64
)
c.executingGuard.Lock()
if len(c.executingTasks) >= parallelism {
return selected
}
c.executingGuard.Unlock()
l0ChannelExcludes := typeutil.NewSet[string]()
mixChannelExcludes := typeutil.NewSet[string]()
clusterChannelExcludes := typeutil.NewSet[string]()
@ -225,21 +240,20 @@ func (c *compactionPlanHandler) schedule() []CompactionTask {
c.queueTasks.Enqueue(t)
}
}()
selected := make([]CompactionTask, 0)
p := getPrioritizer()
if &c.queueTasks.prioritizer != &p {
c.queueTasks.UpdatePrioritizer(p)
}
c.executingGuard.Lock()
tasksToGo := Params.DataCoordCfg.CompactionMaxParallelTasks.GetAsInt() - len(c.executingTasks)
c.executingGuard.Unlock()
for len(selected) < tasksToGo && c.queueTasks.Len() > 0 {
// The schedule loop will stop if either:
// 1. no more task to schedule (the task queue is empty)
// 2. the parallelism of running tasks is reached
// 3. no avaiable slots
for {
t, err := c.queueTasks.Dequeue()
if err != nil {
// Will never go here
return selected
break // 1. no more task to schedule
}
switch t.GetType() {
@ -271,11 +285,28 @@ func (c *compactionPlanHandler) schedule() []CompactionTask {
selected = append(selected, t)
}
if t.NeedReAssignNodeID() {
if slots == nil {
slots = c.cluster.QuerySlots()
}
id := assignNodeID(slots, t)
if id == NullNodeID {
log.RatedWarn(10, "not enough slots for compaction task", zap.Int64("planID", t.GetPlanID()))
selected = selected[:len(selected)-1]
excluded = append(excluded, t)
break // 3. no avaiable slots
}
}
c.executingGuard.Lock()
c.executingTasks[t.GetPlanID()] = t
if len(c.executingTasks) >= parallelism {
c.executingGuard.Unlock()
break // 2. the parallelism of running tasks is reached
}
c.executingGuard.Unlock()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Pending).Dec()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Executing).Inc()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetNodeID()), t.GetType().String(), metrics.Executing).Inc()
}
return selected
}
@ -597,49 +628,51 @@ func (c *compactionPlanHandler) createCompactTask(t *datapb.CompactionTask) (Com
return task, nil
}
func (c *compactionPlanHandler) assignNodeIDs(tasks []CompactionTask) {
slots := c.cluster.QuerySlots()
func assignNodeID(slots map[int64]int64, t CompactionTask) int64 {
if len(slots) == 0 {
return
return NullNodeID
}
for _, t := range tasks {
nodeID, useSlot := c.pickAnyNode(slots, t)
if nodeID == NullNodeID {
log.Info("compactionHandler cannot find datanode for compaction task",
zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()), zap.String("vchannel", t.GetChannel()))
continue
}
err := t.SetNodeID(nodeID)
if err != nil {
log.Info("compactionHandler assignNodeID failed",
zap.Int64("planID", t.GetPlanID()), zap.String("vchannel", t.GetChannel()), zap.Error(err))
} else {
// update the input nodeSlots
slots[nodeID] = slots[nodeID] - useSlot
log.Info("compactionHandler assignNodeID success",
zap.Int64("planID", t.GetPlanID()), zap.String("vchannel", t.GetChannel()), zap.Any("nodeID", nodeID))
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Executing).Dec()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetNodeID()), t.GetType().String(), metrics.Executing).Inc()
}
nodeID, useSlot := pickAnyNode(slots, t)
if nodeID == NullNodeID {
log.Info("compactionHandler cannot find datanode for compaction task",
zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()), zap.String("vchannel", t.GetChannel()))
return NullNodeID
}
err := t.SetNodeID(nodeID)
if err != nil {
log.Info("compactionHandler assignNodeID failed",
zap.Int64("planID", t.GetPlanID()), zap.String("vchannel", t.GetChannel()), zap.Error(err))
return NullNodeID
}
// update the input nodeSlots
slots[nodeID] = slots[nodeID] - useSlot
log.Info("compactionHandler assignNodeID success",
zap.Int64("planID", t.GetPlanID()), zap.String("vchannel", t.GetChannel()), zap.Any("nodeID", nodeID))
return nodeID
}
func (c *compactionPlanHandler) checkCompaction() error {
// Get executing executingTasks before GetCompactionState from DataNode to prevent false failure,
// for DC might add new task while GetCompactionState.
var needAssignIDTasks []CompactionTask
// Assign node id if needed
var slots map[int64]int64
c.executingGuard.RLock()
for _, t := range c.executingTasks {
if t.NeedReAssignNodeID() {
needAssignIDTasks = append(needAssignIDTasks, t)
if slots == nil {
slots = c.cluster.QuerySlots()
}
id := assignNodeID(slots, t)
if id == NullNodeID {
break
}
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Executing).Dec()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetNodeID()), t.GetType().String(), metrics.Executing).Inc()
}
}
c.executingGuard.RUnlock()
if len(needAssignIDTasks) > 0 {
c.assignNodeIDs(needAssignIDTasks)
}
var finishedTasks []CompactionTask
c.executingGuard.RLock()
@ -663,7 +696,7 @@ func (c *compactionPlanHandler) checkCompaction() error {
return nil
}
func (c *compactionPlanHandler) pickAnyNode(nodeSlots map[int64]int64, task CompactionTask) (nodeID int64, useSlot int64) {
func pickAnyNode(nodeSlots map[int64]int64, task CompactionTask) (nodeID int64, useSlot int64) {
nodeID = NullNodeID
var maxSlots int64 = -1

View File

@ -47,6 +47,9 @@ func (t *mixCompactionTask) processPipelining() bool {
err = t.sessions.Compaction(context.TODO(), t.GetNodeID(), t.GetPlan())
if err != nil {
// Compaction tasks may be refused by DataNode because of slot limit. In this case, the node id is reset
// to enable a retry in compaction.checkCompaction().
// This is tricky, we should remove the reassignment here.
log.Warn("mixCompactionTask failed to notify compaction tasks to DataNode", zap.Error(err))
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
return false

View File

@ -45,11 +45,12 @@ type CompactionPlanHandlerSuite struct {
mockCm *MockChannelManager
mockSessMgr *MockSessionManager
handler *compactionPlanHandler
cluster Cluster
cluster *MockCluster
}
func (s *CompactionPlanHandlerSuite) SetupTest() {
s.mockMeta = NewMockCompactionMeta(s.T())
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Maybe()
s.mockAlloc = NewNMockAllocator(s.T())
s.mockCm = NewMockChannelManager(s.T())
s.mockSessMgr = NewMockSessionManager(s.T())
@ -224,6 +225,100 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWith1ParallelTask() {
}
}
func (s *CompactionPlanHandlerSuite) TestScheduleWithSlotLimit() {
tests := []struct {
description string
tasks []CompactionTask
plans []*datapb.CompactionPlan
expectedOut []UniqueID // planID
}{
{
"2 L0 tasks, only 1 can be scheduled",
[]CompactionTask{
&l0CompactionTask{
CompactionTask: &datapb.CompactionTask{
PlanID: 10,
Type: datapb.CompactionType_Level0DeleteCompaction,
State: datapb.CompactionTaskState_pipelining,
Channel: "ch-10",
},
plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-10", Type: datapb.CompactionType_Level0DeleteCompaction},
sessions: s.mockSessMgr,
meta: s.mockMeta,
},
&l0CompactionTask{
CompactionTask: &datapb.CompactionTask{
PlanID: 10,
Type: datapb.CompactionType_Level0DeleteCompaction,
State: datapb.CompactionTaskState_pipelining,
Channel: "ch-11",
},
plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_Level0DeleteCompaction},
sessions: s.mockSessMgr,
meta: s.mockMeta,
},
},
[]*datapb.CompactionPlan{
{PlanID: 10, Channel: "ch-10", Type: datapb.CompactionType_Level0DeleteCompaction},
{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction},
},
[]UniqueID{10},
},
{
"2 Mix tasks, only 1 can be scheduled",
[]CompactionTask{
&mixCompactionTask{
CompactionTask: &datapb.CompactionTask{
PlanID: 14,
Type: datapb.CompactionType_MixCompaction,
State: datapb.CompactionTaskState_pipelining,
Channel: "ch-2",
},
plan: &datapb.CompactionPlan{PlanID: 14, Channel: "ch-2", Type: datapb.CompactionType_MixCompaction},
sessions: s.mockSessMgr,
meta: s.mockMeta,
},
&mixCompactionTask{
CompactionTask: &datapb.CompactionTask{
PlanID: 13,
Type: datapb.CompactionType_MixCompaction,
State: datapb.CompactionTaskState_pipelining,
Channel: "ch-11",
},
plan: &datapb.CompactionPlan{PlanID: 13, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction},
sessions: s.mockSessMgr,
meta: s.mockMeta,
},
},
[]*datapb.CompactionPlan{
{PlanID: 14, Channel: "ch-2", Type: datapb.CompactionType_MixCompaction},
{PlanID: 13, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction},
},
[]UniqueID{13},
},
}
for _, test := range tests {
s.Run(test.description, func() {
s.SetupTest()
s.cluster.EXPECT().QuerySlots().Return(map[int64]int64{
101: 8,
}).Maybe()
s.generateInitTasksForSchedule()
// submit the testing tasks
for _, t := range test.tasks {
s.handler.submitTask(t)
}
gotTasks := s.handler.schedule()
s.Equal(test.expectedOut, lo.Map(gotTasks, func(t CompactionTask, _ int) int64 {
return t.GetPlanID()
}))
})
}
}
func (s *CompactionPlanHandlerSuite) TestScheduleNodeWithL0Executing() {
// dataNode 102's paralleTasks has running L0 tasks
// nothing of the same channel will be able to schedule
@ -383,7 +478,7 @@ func (s *CompactionPlanHandlerSuite) TestPickAnyNode() {
100: 16,
101: 23,
}
node, useSlot := s.handler.pickAnyNode(nodeSlots, &mixCompactionTask{
node, useSlot := pickAnyNode(nodeSlots, &mixCompactionTask{
CompactionTask: &datapb.CompactionTask{
Type: datapb.CompactionType_MixCompaction,
},
@ -391,7 +486,7 @@ func (s *CompactionPlanHandlerSuite) TestPickAnyNode() {
s.Equal(int64(101), node)
nodeSlots[node] = nodeSlots[node] - useSlot
node, useSlot = s.handler.pickAnyNode(nodeSlots, &mixCompactionTask{
node, useSlot = pickAnyNode(nodeSlots, &mixCompactionTask{
CompactionTask: &datapb.CompactionTask{
Type: datapb.CompactionType_MixCompaction,
},
@ -399,7 +494,7 @@ func (s *CompactionPlanHandlerSuite) TestPickAnyNode() {
s.Equal(int64(100), node)
nodeSlots[node] = nodeSlots[node] - useSlot
node, useSlot = s.handler.pickAnyNode(nodeSlots, &mixCompactionTask{
node, useSlot = pickAnyNode(nodeSlots, &mixCompactionTask{
CompactionTask: &datapb.CompactionTask{
Type: datapb.CompactionType_MixCompaction,
},
@ -407,7 +502,7 @@ func (s *CompactionPlanHandlerSuite) TestPickAnyNode() {
s.Equal(int64(101), node)
nodeSlots[node] = nodeSlots[node] - useSlot
node, useSlot = s.handler.pickAnyNode(map[int64]int64{}, &mixCompactionTask{})
node, useSlot = pickAnyNode(map[int64]int64{}, &mixCompactionTask{})
s.Equal(int64(NullNodeID), node)
}
@ -430,7 +525,7 @@ func (s *CompactionPlanHandlerSuite) TestPickAnyNodeForClusteringTask() {
},
}
s.handler.executingTasks = executingTasks
node, useSlot := s.handler.pickAnyNode(nodeSlots, &clusteringCompactionTask{
node, useSlot := pickAnyNode(nodeSlots, &clusteringCompactionTask{
CompactionTask: &datapb.CompactionTask{
Type: datapb.CompactionType_ClusteringCompaction,
},
@ -438,7 +533,7 @@ func (s *CompactionPlanHandlerSuite) TestPickAnyNodeForClusteringTask() {
s.Equal(int64(101), node)
nodeSlots[node] = nodeSlots[node] - useSlot
node, useSlot = s.handler.pickAnyNode(nodeSlots, &clusteringCompactionTask{
node, useSlot = pickAnyNode(nodeSlots, &clusteringCompactionTask{
CompactionTask: &datapb.CompactionTask{
Type: datapb.CompactionType_ClusteringCompaction,
},
@ -617,7 +712,6 @@ func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() {
func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() {
s.SetupTest()
s.mockMeta.EXPECT().CheckAndSetSegmentsCompacting(mock.Anything).Return(true, true).Maybe()
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
handler := newCompactionPlanHandler(nil, s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc, nil, nil)
task := &datapb.CompactionTask{
@ -745,7 +839,6 @@ func (s *CompactionPlanHandlerSuite) TestCheckCompaction() {
// s.mockSessMgr.EXPECT().SyncSegments(int64(111), mock.Anything).Return(nil)
// s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything).Return(nil)
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
s.mockMeta.EXPECT().CompleteCompactionMutation(mock.Anything, mock.Anything).RunAndReturn(
func(t *datapb.CompactionTask, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) {
if t.GetPlanID() == 2 {
@ -827,7 +920,6 @@ func (s *CompactionPlanHandlerSuite) TestProcessCompleteCompaction() {
s.SetupTest()
// s.mockSessMgr.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(nil).Once()
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything).Return().Once()
segment := NewSegmentInfo(&datapb.SegmentInfo{ID: 100})
s.mockMeta.EXPECT().CompleteCompactionMutation(mock.Anything, mock.Anything).Return(

View File

@ -1522,7 +1522,7 @@ func TestSearchV2(t *testing.T) {
queryTestCases = append(queryTestCases, requestBodyTestCase{
path: SearchAction,
requestBody: []byte(`{"collectionName": "book", "data": [["0.1", "0.2"]], "filter": "book_id in [2, 4, 6, 8]", "limit": 4, "outputFields": ["word_count"], "params": {"radius":0.9, "range_filter": 0.1}, "groupingField": "test"}`),
errMsg: "can only accept json format request, error: json: cannot unmarshal string into Go value of type float32: invalid parameter[expected=FloatVector][actual=[\"0.1\", \"0.2\"]]",
errMsg: "can only accept json format request, error: Mismatch type float32 with value string \"at index 8: mismatched type with value\\n\\n\\t[\\\"0.1\\\", \\\"0.2\\\"]\\n\\t........^.....\\n\": invalid parameter[expected=FloatVector][actual=[\"0.1\", \"0.2\"]]",
errCode: 1801,
})
queryTestCases = append(queryTestCases, requestBodyTestCase{
@ -1564,7 +1564,7 @@ func TestSearchV2(t *testing.T) {
queryTestCases = append(queryTestCases, requestBodyTestCase{
path: SearchAction,
requestBody: []byte(`{"collectionName": "book", "data": [[0.1, 0.2]], "annsField": "binaryVector", "filter": "book_id in [2, 4, 6, 8]", "limit": 4, "outputFields": ["word_count"]}`),
errMsg: "can only accept json format request, error: json: cannot unmarshal number 0.1 into Go value of type uint8: invalid parameter[expected=BinaryVector][actual=[[0.1, 0.2]]]",
errMsg: "can only accept json format request, error: Mismatch type uint8 with value number \"at index 7: mismatched type with value\\n\\n\\t[[0.1, 0.2]]\\n\\t.......^....\\n\": invalid parameter[expected=BinaryVector][actual=[[0.1, 0.2]]]",
errCode: 1801,
})
queryTestCases = append(queryTestCases, requestBodyTestCase{