diff --git a/internal/core/src/segcore/InsertRecord.h b/internal/core/src/segcore/InsertRecord.h index 7d62e303ee..c924945110 100644 --- a/internal/core/src/segcore/InsertRecord.h +++ b/internal/core/src/segcore/InsertRecord.h @@ -146,7 +146,10 @@ class OffsetOrderedMap : public OffsetMap { seg_offsets.reserve(limit); auto it = map_.begin(); for (; hit_num < limit && it != map_.end(); it++) { - for (auto seg_offset : it->second) { + // Offsets in the growing segment are ordered by timestamp, + // so traverse from back to front to obtain the latest offset. + for (int i = it->second.size() - 1; i >= 0; --i) { + auto seg_offset = it->second[i]; if (seg_offset >= size) { // Frequently concurrent insert/query will cause this case. continue; @@ -155,9 +158,8 @@ class OffsetOrderedMap : public OffsetMap { if (!(bitset[seg_offset] ^ false_filtered_out)) { seg_offsets.push_back(seg_offset); hit_num++; - if (hit_num >= limit) { - break; - } + // PK hit, no need to continue traversing offsets with the same PK. + break; } } } diff --git a/internal/core/unittest/test_c_api.cpp b/internal/core/unittest/test_c_api.cpp index 3b7991abee..2d439d792c 100644 --- a/internal/core/unittest/test_c_api.cpp +++ b/internal/core/unittest/test_c_api.cpp @@ -847,7 +847,7 @@ TEST(CApiTest, DeleteRepeatedPksFromGrowingSegment) { auto suc = query_result->ParseFromArray(retrieve_result.proto_blob, retrieve_result.proto_size); ASSERT_TRUE(suc); - ASSERT_EQ(query_result->ids().int_id().data().size(), 6); + ASSERT_EQ(query_result->ids().int_id().data().size(), 3); DeleteRetrieveResult(&retrieve_result); // delete data pks = {1, 2, 3} diff --git a/internal/datacoord/import_scheduler.go b/internal/datacoord/import_scheduler.go index c75b40fb88..9d6b9d4f24 100644 --- a/internal/datacoord/import_scheduler.go +++ b/internal/datacoord/import_scheduler.go @@ -343,10 +343,10 @@ func (s *importScheduler) processFailed(task ImportTask) { if task.GetType() == ImportTaskType { segments := task.(*importTask).GetSegmentIDs() for _, segment := range segments { - err := s.meta.DropSegment(segment) + op := UpdateStatusOperator(segment, commonpb.SegmentState_Dropped) + err := s.meta.UpdateSegmentsInfo(op) if err != nil { - log.Warn("drop import segment failed", - WrapTaskLog(task, zap.Int64("segment", segment), zap.Error(err))...) + log.Warn("drop import segment failed", WrapTaskLog(task, zap.Int64("segment", segment), zap.Error(err))...) return } } diff --git a/internal/datacoord/import_scheduler_test.go b/internal/datacoord/import_scheduler_test.go index 0f9acf578f..2f20131451 100644 --- a/internal/datacoord/import_scheduler_test.go +++ b/internal/datacoord/import_scheduler_test.go @@ -25,6 +25,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/metastore/mocks" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -247,7 +248,7 @@ func (s *ImportSchedulerSuite) TestProcessFailed() { }) for _, id := range task.(*importTask).GetSegmentIDs() { segment := &SegmentInfo{ - SegmentInfo: &datapb.SegmentInfo{ID: id, IsImporting: true}, + SegmentInfo: &datapb.SegmentInfo{ID: id, State: commonpb.SegmentState_Importing, IsImporting: true}, } err = s.meta.AddSegment(context.Background(), segment) s.NoError(err) @@ -258,11 +259,11 @@ func (s *ImportSchedulerSuite) TestProcessFailed() { } s.cluster.EXPECT().DropImport(mock.Anything, mock.Anything).Return(nil) - s.catalog.EXPECT().DropSegment(mock.Anything, mock.Anything).Return(nil) + s.catalog.EXPECT().AlterSegments(mock.Anything, mock.Anything).Return(nil) s.scheduler.process() for _, id := range task.(*importTask).GetSegmentIDs() { segment := s.meta.GetSegment(id) - s.Nil(segment) + s.Equal(commonpb.SegmentState_Dropped, segment.GetState()) } task = s.imeta.GetTask(task.GetTaskID()) s.Equal(datapb.ImportTaskStateV2_Failed, task.GetState()) diff --git a/internal/datanode/writebuffer/l0_write_buffer.go b/internal/datanode/writebuffer/l0_write_buffer.go index 7759a0deeb..cbcb5e9887 100644 --- a/internal/datanode/writebuffer/l0_write_buffer.go +++ b/internal/datanode/writebuffer/l0_write_buffer.go @@ -90,7 +90,7 @@ func (wb *l0WriteBuffer) dispatchDeleteMsgs(groups []*inData, deleteMsgs []*msgs pksInDeleteMsgs[didx] = pks pkTss := delMsg.GetTimestamps() partitionSegments := wb.metaCache.GetSegmentsBy(metacache.WithPartitionID(delMsg.PartitionID), - metacache.WithSegmentState(commonpb.SegmentState_Growing, commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed)) + metacache.WithSegmentState(commonpb.SegmentState_Growing, commonpb.SegmentState_Sealed, commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed)) partitionGroups := lo.Filter(groups, func(inData *inData, _ int) bool { return delMsg.GetPartitionID() == common.AllPartitionsID || delMsg.GetPartitionID() == inData.partitionID }) diff --git a/pkg/mq/msgdispatcher/dispatcher.go b/pkg/mq/msgdispatcher/dispatcher.go index 4d0ab3e2c6..64f6a606e4 100644 --- a/pkg/mq/msgdispatcher/dispatcher.go +++ b/pkg/mq/msgdispatcher/dispatcher.go @@ -96,6 +96,7 @@ func NewDispatcher(ctx context.Context, return nil, err } if position != nil && len(position.MsgID) != 0 { + position = typeutil.Clone(position) position.ChannelName = funcutil.ToPhysicalChannel(position.ChannelName) err = stream.AsConsumer(ctx, []string{pchannel}, subName, mqwrapper.SubscriptionPositionUnknown) if err != nil { @@ -234,7 +235,7 @@ func (d *Dispatcher) work() { } } if err != nil { - t.pos = pack.StartPositions[0] + t.pos = typeutil.Clone(pack.StartPositions[0]) // replace the pChannel with vChannel t.pos.ChannelName = t.vchannel d.lagTargets.Insert(t.vchannel, t) diff --git a/pkg/mq/msgdispatcher/manager.go b/pkg/mq/msgdispatcher/manager.go index 4f88fd5521..f2fe6a1750 100644 --- a/pkg/mq/msgdispatcher/manager.go +++ b/pkg/mq/msgdispatcher/manager.go @@ -182,7 +182,7 @@ func (c *dispatcherManager) tryMerge() { c.mu.Lock() defer c.mu.Unlock() - if c.mainDispatcher == nil { + if c.mainDispatcher == nil || c.mainDispatcher.CurTs() == 0 { return } candidates := make(map[string]struct{}) diff --git a/pkg/mq/msgdispatcher/manager_test.go b/pkg/mq/msgdispatcher/manager_test.go index 51c7790b40..34c2a11468 100644 --- a/pkg/mq/msgdispatcher/manager_test.go +++ b/pkg/mq/msgdispatcher/manager_test.go @@ -46,14 +46,16 @@ func TestManager(t *testing.T) { r := rand.Intn(10) + 1 for j := 0; j < r; j++ { offset++ - t.Logf("dyh add, %s", fmt.Sprintf("mock-pchannel-0_vchannel_%d", offset)) - _, err := c.Add(context.Background(), fmt.Sprintf("mock-pchannel-0_vchannel_%d", offset), nil, mqwrapper.SubscriptionPositionUnknown) + vchannel := fmt.Sprintf("mock-pchannel-dml_0_vchannelv%d", offset) + t.Logf("add vchannel, %s", vchannel) + _, err := c.Add(context.Background(), vchannel, nil, mqwrapper.SubscriptionPositionUnknown) assert.NoError(t, err) assert.Equal(t, offset, c.Num()) } for j := 0; j < rand.Intn(r); j++ { - t.Logf("dyh remove, %s", fmt.Sprintf("mock-pchannel-0_vchannel_%d", offset)) - c.Remove(fmt.Sprintf("mock-pchannel-0_vchannel_%d", offset)) + vchannel := fmt.Sprintf("mock-pchannel-dml_0_vchannelv%d", offset) + t.Logf("remove vchannel, %s", vchannel) + c.Remove(vchannel) offset-- assert.Equal(t, offset, c.Num()) } @@ -72,6 +74,12 @@ func TestManager(t *testing.T) { _, err = c.Add(ctx, "mock_vchannel_2", nil, mqwrapper.SubscriptionPositionUnknown) assert.NoError(t, err) assert.Equal(t, 3, c.Num()) + c.(*dispatcherManager).mainDispatcher.curTs.Store(1000) + c.(*dispatcherManager).mu.RLock() + for _, d := range c.(*dispatcherManager).soloDispatchers { + d.curTs.Store(1000) + } + c.(*dispatcherManager).mu.RUnlock() c.(*dispatcherManager).tryMerge() assert.Equal(t, 1, c.Num()) @@ -97,6 +105,12 @@ func TestManager(t *testing.T) { _, err = c.Add(ctx, "mock_vchannel_2", nil, mqwrapper.SubscriptionPositionUnknown) assert.NoError(t, err) assert.Equal(t, 3, c.Num()) + c.(*dispatcherManager).mainDispatcher.curTs.Store(1000) + c.(*dispatcherManager).mu.RLock() + for _, d := range c.(*dispatcherManager).soloDispatchers { + d.curTs.Store(1000) + } + c.(*dispatcherManager).mu.RUnlock() checkIntervalK := paramtable.Get().MQCfg.MergeCheckInterval.Key paramtable.Get().Save(checkIntervalK, "0.01") @@ -166,7 +180,7 @@ func (suite *SimulationSuite) SetupSuite() { } func (suite *SimulationSuite) SetupTest() { - suite.pchannel = fmt.Sprintf("by-dev-rootcoord-dispatcher-simulation-dml-%d-%d", rand.Int(), time.Now().UnixNano()) + suite.pchannel = fmt.Sprintf("by-dev-rootcoord-dispatcher-simulation-dml_%d", time.Now().UnixNano()) producer, err := newMockProducer(suite.factory, suite.pchannel) assert.NoError(suite.T(), err) suite.producer = producer diff --git a/pkg/util/funcutil/func.go b/pkg/util/funcutil/func.go index 0d89f4ec40..218d4082cc 100644 --- a/pkg/util/funcutil/func.go +++ b/pkg/util/funcutil/func.go @@ -210,8 +210,16 @@ func GetAvailablePort() int { return listener.Addr().(*net.TCPAddr).Port } +// IsPhysicalChannel checks if the channel is a physical channel +func IsPhysicalChannel(channel string) bool { + return strings.Count(channel, "_") == 1 +} + // ToPhysicalChannel get physical channel name from virtual channel name func ToPhysicalChannel(vchannel string) string { + if IsPhysicalChannel(vchannel) { + return vchannel + } index := strings.LastIndex(vchannel, "_") if index < 0 { return vchannel diff --git a/pkg/util/funcutil/func_test.go b/pkg/util/funcutil/func_test.go index f39fd99477..8ab5bb4988 100644 --- a/pkg/util/funcutil/func_test.go +++ b/pkg/util/funcutil/func_test.go @@ -174,11 +174,17 @@ func TestCheckPortAvailable(t *testing.T) { } func Test_ToPhysicalChannel(t *testing.T) { - assert.Equal(t, "abc", ToPhysicalChannel("abc_")) - assert.Equal(t, "abc", ToPhysicalChannel("abc_123")) - assert.Equal(t, "abc", ToPhysicalChannel("abc_defgsg")) + assert.Equal(t, "abc_", ToPhysicalChannel("abc_")) + assert.Equal(t, "abc_123", ToPhysicalChannel("abc_123")) + assert.Equal(t, "abc_defgsg", ToPhysicalChannel("abc_defgsg")) + assert.Equal(t, "abc_123", ToPhysicalChannel("abc_123_456")) assert.Equal(t, "abc__", ToPhysicalChannel("abc___defgsg")) assert.Equal(t, "abcdef", ToPhysicalChannel("abcdef")) + channel := "by-dev-rootcoord-dml_3_449883080965365748v0" + for i := 0; i < 10; i++ { + channel = ToPhysicalChannel(channel) + assert.Equal(t, "by-dev-rootcoord-dml_3", channel) + } } func Test_ConvertChannelName(t *testing.T) {