From e928e15bfcecea5b535b9212a31632882cc302c8 Mon Sep 17 00:00:00 2001 From: Ted Xu Date: Mon, 25 Nov 2024 14:02:34 +0800 Subject: [PATCH] fix: refuse schedule compaction tasks if there is no slot (#37809) See #37621 pr: #37589 --------- Signed-off-by: Ted Xu Signed-off-by: Yinzuo Jiang Signed-off-by: yangxuan Signed-off-by: Wei Liu Co-authored-by: Yinzuo Jiang Co-authored-by: yangxuan Co-authored-by: wei liu --- go.mod | 8 +- go.sum | 22 ++-- internal/datacoord/compaction.go | 105 ++++++++++------ internal/datacoord/compaction_task_mix.go | 3 + internal/datacoord/compaction_test.go | 112 ++++++++++++++++-- .../proxy/httpserver/handler_v2_test.go | 4 +- 6 files changed, 196 insertions(+), 58 deletions(-) diff --git a/go.mod b/go.mod index a5b4e214a1..3572e27641 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/go-playground/validator/v10 v10.14.0 github.com/gofrs/flock v0.8.1 github.com/gogo/protobuf v1.3.2 // indirect - github.com/golang/protobuf v1.5.4 // indirect + github.com/golang/protobuf v1.5.4 github.com/google/btree v1.1.2 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/klauspost/compress v1.17.9 @@ -65,7 +65,7 @@ require github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d7 require ( github.com/bits-and-blooms/bitset v1.10.0 - github.com/bytedance/sonic v1.9.1 + github.com/bytedance/sonic v1.12.4 github.com/greatroar/blobloom v0.8.0 github.com/jolestar/go-commons-pool/v2 v2.1.2 github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000 @@ -95,11 +95,13 @@ require ( github.com/benbjohnson/clock v1.1.0 // indirect github.com/benesch/cgosymbolizer v0.0.0-20190515212042-bec6fe6e597b // indirect github.com/beorn7/perks v1.0.1 // indirect + github.com/bytedance/sonic/loader v0.2.0 // indirect github.com/campoy/embedmd v1.0.0 // indirect github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect - github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect github.com/cilium/ebpf v0.11.0 // indirect + github.com/cloudwego/base64x v0.1.4 // indirect + github.com/cloudwego/iasm v0.2.0 // indirect github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f // indirect github.com/cockroachdb/redact v1.1.3 // indirect github.com/confluentinc/confluent-kafka-go v1.9.1 // indirect diff --git a/go.sum b/go.sum index cf279bd693..f8a17a1a0a 100644 --- a/go.sum +++ b/go.sum @@ -130,9 +130,11 @@ github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ= github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q= github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= -github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= -github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s= -github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U= +github.com/bytedance/sonic v1.12.4 h1:9Csb3c9ZJhfUWeMtpCDCq6BUoH5ogfDFLUgQ/jG+R0k= +github.com/bytedance/sonic v1.12.4/go.mod h1:B8Gt/XvtZ3Fqj+iSKMypzymZxw/FVwgIGKzMzT9r/rk= +github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= +github.com/bytedance/sonic/loader v0.2.0 h1:zNprn+lsIP06C/IqCHs3gPQIvnvpKbbxyXQP1iU4kWM= +github.com/bytedance/sonic/loader v0.2.0/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= github.com/campoy/embedmd v1.0.0 h1:V4kI2qTJJLf4J29RzI/MAt2c3Bl4dQSYPuflzwFH2hY= github.com/campoy/embedmd v1.0.0/go.mod h1:oxyr9RCiSXg0M3VJ3ks0UGfp98BpSSGr0kpiX3MzVl8= github.com/casbin/casbin/v2 v2.0.0/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= @@ -150,15 +152,20 @@ github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghf github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY= -github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams= -github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/cilium/ebpf v0.11.0 h1:V8gS/bTCCjX9uUnkUFUpPsksM8n1lXBAvHcpiFk1X2Y= github.com/cilium/ebpf v0.11.0/go.mod h1:WE7CZAnqOL2RouJ4f1uyNhqr2P4CCvXFIqdRDUgWsVs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y= +github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= +github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg= +github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY= +github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y= +github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= +github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg= +github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= @@ -541,6 +548,7 @@ github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa02 github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM= github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= @@ -1003,7 +1011,6 @@ go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= go.uber.org/zap v1.20.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= -golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= golang.org/x/arch v0.3.0 h1:02VY4/ZcO/gBOH6PUaoiptASxtXU10jazRCP865E97k= golang.org/x/arch v0.3.0/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= @@ -1525,6 +1532,7 @@ honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9 honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= k8s.io/apimachinery v0.28.6 h1:RsTeR4z6S07srPg6XYrwXpTJVMXsjPXn0ODakMytSW0= k8s.io/apimachinery v0.28.6/go.mod h1:QFNX/kCl/EMT2WTSz8k4WLCv2XnkOLMaL8GAVRMdpsA= +nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index 6739b42ae1..1fbe93d9d2 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -197,6 +197,21 @@ func newCompactionPlanHandler(cluster Cluster, sessions SessionManager, cm Chann } func (c *compactionPlanHandler) schedule() []CompactionTask { + selected := make([]CompactionTask, 0) + if c.queueTasks.Len() == 0 { + return selected + } + var ( + parallelism = Params.DataCoordCfg.CompactionMaxParallelTasks.GetAsInt() + slots map[int64]int64 + ) + + c.executingGuard.Lock() + if len(c.executingTasks) >= parallelism { + return selected + } + c.executingGuard.Unlock() + l0ChannelExcludes := typeutil.NewSet[string]() mixChannelExcludes := typeutil.NewSet[string]() clusterChannelExcludes := typeutil.NewSet[string]() @@ -225,21 +240,20 @@ func (c *compactionPlanHandler) schedule() []CompactionTask { c.queueTasks.Enqueue(t) } }() - selected := make([]CompactionTask, 0) p := getPrioritizer() if &c.queueTasks.prioritizer != &p { c.queueTasks.UpdatePrioritizer(p) } - c.executingGuard.Lock() - tasksToGo := Params.DataCoordCfg.CompactionMaxParallelTasks.GetAsInt() - len(c.executingTasks) - c.executingGuard.Unlock() - for len(selected) < tasksToGo && c.queueTasks.Len() > 0 { + // The schedule loop will stop if either: + // 1. no more task to schedule (the task queue is empty) + // 2. the parallelism of running tasks is reached + // 3. no avaiable slots + for { t, err := c.queueTasks.Dequeue() if err != nil { - // Will never go here - return selected + break // 1. no more task to schedule } switch t.GetType() { @@ -271,11 +285,28 @@ func (c *compactionPlanHandler) schedule() []CompactionTask { selected = append(selected, t) } + if t.NeedReAssignNodeID() { + if slots == nil { + slots = c.cluster.QuerySlots() + } + id := assignNodeID(slots, t) + if id == NullNodeID { + log.RatedWarn(10, "not enough slots for compaction task", zap.Int64("planID", t.GetPlanID())) + selected = selected[:len(selected)-1] + excluded = append(excluded, t) + break // 3. no avaiable slots + } + } + c.executingGuard.Lock() c.executingTasks[t.GetPlanID()] = t + if len(c.executingTasks) >= parallelism { + c.executingGuard.Unlock() + break // 2. the parallelism of running tasks is reached + } c.executingGuard.Unlock() metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Pending).Dec() - metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Executing).Inc() + metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetNodeID()), t.GetType().String(), metrics.Executing).Inc() } return selected } @@ -597,49 +628,51 @@ func (c *compactionPlanHandler) createCompactTask(t *datapb.CompactionTask) (Com return task, nil } -func (c *compactionPlanHandler) assignNodeIDs(tasks []CompactionTask) { - slots := c.cluster.QuerySlots() +func assignNodeID(slots map[int64]int64, t CompactionTask) int64 { if len(slots) == 0 { - return + return NullNodeID } - for _, t := range tasks { - nodeID, useSlot := c.pickAnyNode(slots, t) - if nodeID == NullNodeID { - log.Info("compactionHandler cannot find datanode for compaction task", - zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()), zap.String("vchannel", t.GetChannel())) - continue - } - err := t.SetNodeID(nodeID) - if err != nil { - log.Info("compactionHandler assignNodeID failed", - zap.Int64("planID", t.GetPlanID()), zap.String("vchannel", t.GetChannel()), zap.Error(err)) - } else { - // update the input nodeSlots - slots[nodeID] = slots[nodeID] - useSlot - log.Info("compactionHandler assignNodeID success", - zap.Int64("planID", t.GetPlanID()), zap.String("vchannel", t.GetChannel()), zap.Any("nodeID", nodeID)) - metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Executing).Dec() - metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetNodeID()), t.GetType().String(), metrics.Executing).Inc() - } + nodeID, useSlot := pickAnyNode(slots, t) + if nodeID == NullNodeID { + log.Info("compactionHandler cannot find datanode for compaction task", + zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()), zap.String("vchannel", t.GetChannel())) + return NullNodeID } + err := t.SetNodeID(nodeID) + if err != nil { + log.Info("compactionHandler assignNodeID failed", + zap.Int64("planID", t.GetPlanID()), zap.String("vchannel", t.GetChannel()), zap.Error(err)) + return NullNodeID + } + // update the input nodeSlots + slots[nodeID] = slots[nodeID] - useSlot + log.Info("compactionHandler assignNodeID success", + zap.Int64("planID", t.GetPlanID()), zap.String("vchannel", t.GetChannel()), zap.Any("nodeID", nodeID)) + return nodeID } func (c *compactionPlanHandler) checkCompaction() error { // Get executing executingTasks before GetCompactionState from DataNode to prevent false failure, // for DC might add new task while GetCompactionState. - var needAssignIDTasks []CompactionTask + // Assign node id if needed + var slots map[int64]int64 c.executingGuard.RLock() for _, t := range c.executingTasks { if t.NeedReAssignNodeID() { - needAssignIDTasks = append(needAssignIDTasks, t) + if slots == nil { + slots = c.cluster.QuerySlots() + } + id := assignNodeID(slots, t) + if id == NullNodeID { + break + } + metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Executing).Dec() + metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetNodeID()), t.GetType().String(), metrics.Executing).Inc() } } c.executingGuard.RUnlock() - if len(needAssignIDTasks) > 0 { - c.assignNodeIDs(needAssignIDTasks) - } var finishedTasks []CompactionTask c.executingGuard.RLock() @@ -663,7 +696,7 @@ func (c *compactionPlanHandler) checkCompaction() error { return nil } -func (c *compactionPlanHandler) pickAnyNode(nodeSlots map[int64]int64, task CompactionTask) (nodeID int64, useSlot int64) { +func pickAnyNode(nodeSlots map[int64]int64, task CompactionTask) (nodeID int64, useSlot int64) { nodeID = NullNodeID var maxSlots int64 = -1 diff --git a/internal/datacoord/compaction_task_mix.go b/internal/datacoord/compaction_task_mix.go index 0b21224c3d..3085d4e7e8 100644 --- a/internal/datacoord/compaction_task_mix.go +++ b/internal/datacoord/compaction_task_mix.go @@ -47,6 +47,9 @@ func (t *mixCompactionTask) processPipelining() bool { err = t.sessions.Compaction(context.TODO(), t.GetNodeID(), t.GetPlan()) if err != nil { + // Compaction tasks may be refused by DataNode because of slot limit. In this case, the node id is reset + // to enable a retry in compaction.checkCompaction(). + // This is tricky, we should remove the reassignment here. log.Warn("mixCompactionTask failed to notify compaction tasks to DataNode", zap.Error(err)) t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) return false diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index e38c47cf34..a4b7e8a7fb 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -45,11 +45,12 @@ type CompactionPlanHandlerSuite struct { mockCm *MockChannelManager mockSessMgr *MockSessionManager handler *compactionPlanHandler - cluster Cluster + cluster *MockCluster } func (s *CompactionPlanHandlerSuite) SetupTest() { s.mockMeta = NewMockCompactionMeta(s.T()) + s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Maybe() s.mockAlloc = NewNMockAllocator(s.T()) s.mockCm = NewMockChannelManager(s.T()) s.mockSessMgr = NewMockSessionManager(s.T()) @@ -224,6 +225,100 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWith1ParallelTask() { } } +func (s *CompactionPlanHandlerSuite) TestScheduleWithSlotLimit() { + tests := []struct { + description string + tasks []CompactionTask + plans []*datapb.CompactionPlan + expectedOut []UniqueID // planID + }{ + { + "2 L0 tasks, only 1 can be scheduled", + []CompactionTask{ + &l0CompactionTask{ + CompactionTask: &datapb.CompactionTask{ + PlanID: 10, + Type: datapb.CompactionType_Level0DeleteCompaction, + State: datapb.CompactionTaskState_pipelining, + Channel: "ch-10", + }, + plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-10", Type: datapb.CompactionType_Level0DeleteCompaction}, + sessions: s.mockSessMgr, + meta: s.mockMeta, + }, + &l0CompactionTask{ + CompactionTask: &datapb.CompactionTask{ + PlanID: 10, + Type: datapb.CompactionType_Level0DeleteCompaction, + State: datapb.CompactionTaskState_pipelining, + Channel: "ch-11", + }, + plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_Level0DeleteCompaction}, + sessions: s.mockSessMgr, + meta: s.mockMeta, + }, + }, + []*datapb.CompactionPlan{ + {PlanID: 10, Channel: "ch-10", Type: datapb.CompactionType_Level0DeleteCompaction}, + {PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction}, + }, + []UniqueID{10}, + }, + { + "2 Mix tasks, only 1 can be scheduled", + []CompactionTask{ + &mixCompactionTask{ + CompactionTask: &datapb.CompactionTask{ + PlanID: 14, + Type: datapb.CompactionType_MixCompaction, + State: datapb.CompactionTaskState_pipelining, + Channel: "ch-2", + }, + plan: &datapb.CompactionPlan{PlanID: 14, Channel: "ch-2", Type: datapb.CompactionType_MixCompaction}, + sessions: s.mockSessMgr, + meta: s.mockMeta, + }, + + &mixCompactionTask{ + CompactionTask: &datapb.CompactionTask{ + PlanID: 13, + Type: datapb.CompactionType_MixCompaction, + State: datapb.CompactionTaskState_pipelining, + Channel: "ch-11", + }, + plan: &datapb.CompactionPlan{PlanID: 13, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction}, + sessions: s.mockSessMgr, + meta: s.mockMeta, + }, + }, + []*datapb.CompactionPlan{ + {PlanID: 14, Channel: "ch-2", Type: datapb.CompactionType_MixCompaction}, + {PlanID: 13, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction}, + }, + []UniqueID{13}, + }, + } + + for _, test := range tests { + s.Run(test.description, func() { + s.SetupTest() + s.cluster.EXPECT().QuerySlots().Return(map[int64]int64{ + 101: 8, + }).Maybe() + s.generateInitTasksForSchedule() + // submit the testing tasks + for _, t := range test.tasks { + s.handler.submitTask(t) + } + + gotTasks := s.handler.schedule() + s.Equal(test.expectedOut, lo.Map(gotTasks, func(t CompactionTask, _ int) int64 { + return t.GetPlanID() + })) + }) + } +} + func (s *CompactionPlanHandlerSuite) TestScheduleNodeWithL0Executing() { // dataNode 102's paralleTasks has running L0 tasks // nothing of the same channel will be able to schedule @@ -383,7 +478,7 @@ func (s *CompactionPlanHandlerSuite) TestPickAnyNode() { 100: 16, 101: 23, } - node, useSlot := s.handler.pickAnyNode(nodeSlots, &mixCompactionTask{ + node, useSlot := pickAnyNode(nodeSlots, &mixCompactionTask{ CompactionTask: &datapb.CompactionTask{ Type: datapb.CompactionType_MixCompaction, }, @@ -391,7 +486,7 @@ func (s *CompactionPlanHandlerSuite) TestPickAnyNode() { s.Equal(int64(101), node) nodeSlots[node] = nodeSlots[node] - useSlot - node, useSlot = s.handler.pickAnyNode(nodeSlots, &mixCompactionTask{ + node, useSlot = pickAnyNode(nodeSlots, &mixCompactionTask{ CompactionTask: &datapb.CompactionTask{ Type: datapb.CompactionType_MixCompaction, }, @@ -399,7 +494,7 @@ func (s *CompactionPlanHandlerSuite) TestPickAnyNode() { s.Equal(int64(100), node) nodeSlots[node] = nodeSlots[node] - useSlot - node, useSlot = s.handler.pickAnyNode(nodeSlots, &mixCompactionTask{ + node, useSlot = pickAnyNode(nodeSlots, &mixCompactionTask{ CompactionTask: &datapb.CompactionTask{ Type: datapb.CompactionType_MixCompaction, }, @@ -407,7 +502,7 @@ func (s *CompactionPlanHandlerSuite) TestPickAnyNode() { s.Equal(int64(101), node) nodeSlots[node] = nodeSlots[node] - useSlot - node, useSlot = s.handler.pickAnyNode(map[int64]int64{}, &mixCompactionTask{}) + node, useSlot = pickAnyNode(map[int64]int64{}, &mixCompactionTask{}) s.Equal(int64(NullNodeID), node) } @@ -430,7 +525,7 @@ func (s *CompactionPlanHandlerSuite) TestPickAnyNodeForClusteringTask() { }, } s.handler.executingTasks = executingTasks - node, useSlot := s.handler.pickAnyNode(nodeSlots, &clusteringCompactionTask{ + node, useSlot := pickAnyNode(nodeSlots, &clusteringCompactionTask{ CompactionTask: &datapb.CompactionTask{ Type: datapb.CompactionType_ClusteringCompaction, }, @@ -438,7 +533,7 @@ func (s *CompactionPlanHandlerSuite) TestPickAnyNodeForClusteringTask() { s.Equal(int64(101), node) nodeSlots[node] = nodeSlots[node] - useSlot - node, useSlot = s.handler.pickAnyNode(nodeSlots, &clusteringCompactionTask{ + node, useSlot = pickAnyNode(nodeSlots, &clusteringCompactionTask{ CompactionTask: &datapb.CompactionTask{ Type: datapb.CompactionType_ClusteringCompaction, }, @@ -617,7 +712,6 @@ func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() { func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() { s.SetupTest() s.mockMeta.EXPECT().CheckAndSetSegmentsCompacting(mock.Anything).Return(true, true).Maybe() - s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil) handler := newCompactionPlanHandler(nil, s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc, nil, nil) task := &datapb.CompactionTask{ @@ -745,7 +839,6 @@ func (s *CompactionPlanHandlerSuite) TestCheckCompaction() { // s.mockSessMgr.EXPECT().SyncSegments(int64(111), mock.Anything).Return(nil) // s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything).Return(nil) - s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil) s.mockMeta.EXPECT().CompleteCompactionMutation(mock.Anything, mock.Anything).RunAndReturn( func(t *datapb.CompactionTask, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) { if t.GetPlanID() == 2 { @@ -827,7 +920,6 @@ func (s *CompactionPlanHandlerSuite) TestProcessCompleteCompaction() { s.SetupTest() // s.mockSessMgr.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(nil).Once() - s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil) s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything).Return().Once() segment := NewSegmentInfo(&datapb.SegmentInfo{ID: 100}) s.mockMeta.EXPECT().CompleteCompactionMutation(mock.Anything, mock.Anything).Return( diff --git a/internal/distributed/proxy/httpserver/handler_v2_test.go b/internal/distributed/proxy/httpserver/handler_v2_test.go index e23e035e9e..87e83c815c 100644 --- a/internal/distributed/proxy/httpserver/handler_v2_test.go +++ b/internal/distributed/proxy/httpserver/handler_v2_test.go @@ -1522,7 +1522,7 @@ func TestSearchV2(t *testing.T) { queryTestCases = append(queryTestCases, requestBodyTestCase{ path: SearchAction, requestBody: []byte(`{"collectionName": "book", "data": [["0.1", "0.2"]], "filter": "book_id in [2, 4, 6, 8]", "limit": 4, "outputFields": ["word_count"], "params": {"radius":0.9, "range_filter": 0.1}, "groupingField": "test"}`), - errMsg: "can only accept json format request, error: json: cannot unmarshal string into Go value of type float32: invalid parameter[expected=FloatVector][actual=[\"0.1\", \"0.2\"]]", + errMsg: "can only accept json format request, error: Mismatch type float32 with value string \"at index 8: mismatched type with value\\n\\n\\t[\\\"0.1\\\", \\\"0.2\\\"]\\n\\t........^.....\\n\": invalid parameter[expected=FloatVector][actual=[\"0.1\", \"0.2\"]]", errCode: 1801, }) queryTestCases = append(queryTestCases, requestBodyTestCase{ @@ -1564,7 +1564,7 @@ func TestSearchV2(t *testing.T) { queryTestCases = append(queryTestCases, requestBodyTestCase{ path: SearchAction, requestBody: []byte(`{"collectionName": "book", "data": [[0.1, 0.2]], "annsField": "binaryVector", "filter": "book_id in [2, 4, 6, 8]", "limit": 4, "outputFields": ["word_count"]}`), - errMsg: "can only accept json format request, error: json: cannot unmarshal number 0.1 into Go value of type uint8: invalid parameter[expected=BinaryVector][actual=[[0.1, 0.2]]]", + errMsg: "can only accept json format request, error: Mismatch type uint8 with value number \"at index 7: mismatched type with value\\n\\n\\t[[0.1, 0.2]]\\n\\t.......^....\\n\": invalid parameter[expected=BinaryVector][actual=[[0.1, 0.2]]]", errCode: 1801, }) queryTestCases = append(queryTestCases, requestBodyTestCase{