yihao.dai 2024-07-13 18:23:38 +08:00 committed by GitHub
parent 59ef1115d2
commit 9e593f8128
10 changed files with 54 additions and 22 deletions


@@ -146,7 +146,10 @@ class OffsetOrderedMap : public OffsetMap {
         seg_offsets.reserve(limit);
         auto it = map_.begin();
         for (; hit_num < limit && it != map_.end(); it++) {
-            for (auto seg_offset : it->second) {
+            // Offsets in the growing segment are ordered by timestamp,
+            // so traverse from back to front to obtain the latest offset.
+            for (int i = it->second.size() - 1; i >= 0; --i) {
+                auto seg_offset = it->second[i];
                 if (seg_offset >= size) {
                     // Frequently concurrent insert/query will cause this case.
                     continue;
@@ -155,9 +158,8 @@ class OffsetOrderedMap : public OffsetMap {
                 if (!(bitset[seg_offset] ^ false_filtered_out)) {
                     seg_offsets.push_back(seg_offset);
                     hit_num++;
-                    if (hit_num >= limit) {
-                        break;
-                    }
+                    // PK hit, no need to continue traversing offsets with the same PK.
+                    break;
                 }
             }
         }
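Taken together, the two hunks change the per-PK scan: offsets for one primary key are stored in timestamp order, so scanning from the back and stopping at the first offset that passes the bitset/size checks yields exactly one offset (the latest) per PK. A minimal Go sketch of that policy, with a plain map standing in for the ordered `map_` and a `valid` callback standing in for the bitset and size checks (both are assumptions, not the real types):

```go
package main

import "fmt"

// latestOffsets returns at most one offset per primary key: the last
// (newest) offset that passes the validity check, mirroring the
// back-to-front loop and early break above. Note the real OffsetMap is
// ordered by PK, while Go map iteration order is not.
func latestOffsets(pkOffsets map[int64][]int64, valid func(int64) bool, limit int) []int64 {
	res := make([]int64, 0, limit)
	for _, offsets := range pkOffsets {
		if len(res) >= limit {
			break
		}
		for i := len(offsets) - 1; i >= 0; i-- {
			if valid(offsets[i]) {
				res = append(res, offsets[i])
				break // PK hit: older offsets of the same PK are stale.
			}
		}
	}
	return res
}

func main() {
	// PK 1 was written at offsets 0, 3 and 5; only 5 should be returned.
	pks := map[int64][]int64{1: {0, 3, 5}, 2: {1, 4}}
	fmt.Println(latestOffsets(pks, func(int64) bool { return true }, 10))
}
```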


@@ -847,7 +847,7 @@ TEST(CApiTest, DeleteRepeatedPksFromGrowingSegment) {
     auto suc = query_result->ParseFromArray(retrieve_result.proto_blob,
                                             retrieve_result.proto_size);
     ASSERT_TRUE(suc);
-    ASSERT_EQ(query_result->ids().int_id().data().size(), 6);
+    ASSERT_EQ(query_result->ids().int_id().data().size(), 3);
     DeleteRetrieveResult(&retrieve_result);

     // delete data pks = {1, 2, 3}


@@ -343,10 +343,10 @@ func (s *importScheduler) processFailed(task ImportTask) {
     if task.GetType() == ImportTaskType {
         segments := task.(*importTask).GetSegmentIDs()
         for _, segment := range segments {
-            err := s.meta.DropSegment(segment)
+            op := UpdateStatusOperator(segment, commonpb.SegmentState_Dropped)
+            err := s.meta.UpdateSegmentsInfo(op)
             if err != nil {
-                log.Warn("drop import segment failed",
-                    WrapTaskLog(task, zap.Int64("segment", segment), zap.Error(err))...)
+                log.Warn("drop import segment failed", WrapTaskLog(task, zap.Int64("segment", segment), zap.Error(err))...)
                 return
             }
         }
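Instead of deleting the segment record outright, the failure path now marks it Dropped through the same operator-style meta update used elsewhere in datacoord, so the segment stays visible in meta with a terminal state. A rough sketch of that operator pattern under simplified, hypothetical types (`SegmentInfo` and `UpdateOperator` here are stand-ins, not the real datapb/datacoord definitions):

```go
package main

import "fmt"

// Simplified stand-ins for the real segment meta types (assumption).
type SegmentState int

const (
	SegmentStateImporting SegmentState = iota
	SegmentStateDropped
)

type SegmentInfo struct {
	ID    int64
	State SegmentState
}

// UpdateOperator mutates segment meta; operators compose into one update.
type UpdateOperator func(meta map[int64]*SegmentInfo) bool

// UpdateStatusOperator returns an operator that moves one segment to
// the given state, analogous to the call in the hunk above.
func UpdateStatusOperator(segmentID int64, state SegmentState) UpdateOperator {
	return func(meta map[int64]*SegmentInfo) bool {
		seg, ok := meta[segmentID]
		if !ok {
			return false // unknown segment: nothing to update
		}
		seg.State = state
		return true
	}
}

func main() {
	meta := map[int64]*SegmentInfo{100: {ID: 100, State: SegmentStateImporting}}
	ok := UpdateStatusOperator(100, SegmentStateDropped)(meta)
	fmt.Println(ok, meta[100].State == SegmentStateDropped) // true true
}
```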


@@ -25,6 +25,7 @@ import (
     "github.com/stretchr/testify/mock"
     "github.com/stretchr/testify/suite"

+    "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
     "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
     "github.com/milvus-io/milvus/internal/metastore/mocks"
     "github.com/milvus-io/milvus/internal/proto/datapb"
@@ -247,7 +248,7 @@ func (s *ImportSchedulerSuite) TestProcessFailed() {
     })
     for _, id := range task.(*importTask).GetSegmentIDs() {
         segment := &SegmentInfo{
-            SegmentInfo: &datapb.SegmentInfo{ID: id, IsImporting: true},
+            SegmentInfo: &datapb.SegmentInfo{ID: id, State: commonpb.SegmentState_Importing, IsImporting: true},
         }
         err = s.meta.AddSegment(context.Background(), segment)
         s.NoError(err)
@@ -258,11 +259,11 @@ func (s *ImportSchedulerSuite) TestProcessFailed() {
     }
     s.cluster.EXPECT().DropImport(mock.Anything, mock.Anything).Return(nil)
-    s.catalog.EXPECT().DropSegment(mock.Anything, mock.Anything).Return(nil)
+    s.catalog.EXPECT().AlterSegments(mock.Anything, mock.Anything).Return(nil)
     s.scheduler.process()
     for _, id := range task.(*importTask).GetSegmentIDs() {
         segment := s.meta.GetSegment(id)
-        s.Nil(segment)
+        s.Equal(commonpb.SegmentState_Dropped, segment.GetState())
     }
     task = s.imeta.GetTask(task.GetTaskID())
     s.Equal(datapb.ImportTaskStateV2_Failed, task.GetState())


@@ -90,7 +90,7 @@ func (wb *l0WriteBuffer) dispatchDeleteMsgs(groups []*inData, deleteMsgs []*msgs
         pksInDeleteMsgs[didx] = pks
         pkTss := delMsg.GetTimestamps()
         partitionSegments := wb.metaCache.GetSegmentsBy(metacache.WithPartitionID(delMsg.PartitionID),
-            metacache.WithSegmentState(commonpb.SegmentState_Growing, commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed))
+            metacache.WithSegmentState(commonpb.SegmentState_Growing, commonpb.SegmentState_Sealed, commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed))
         partitionGroups := lo.Filter(groups, func(inData *inData, _ int) bool {
             return delMsg.GetPartitionID() == common.AllPartitionsID || delMsg.GetPartitionID() == inData.partitionID
         })
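The only change here is adding `SegmentState_Sealed` to the state filter, so deletes are also dispatched to segments that have been sealed but have not yet started flushing; previously a delete arriving in that window matched no segment. A small sketch of such a state-set filter, with simplified types rather than the real metacache API (assumption):

```go
package main

import "fmt"

type segmentState int

const (
	stateGrowing segmentState = iota
	stateSealed
	stateFlushing
	stateFlushed
)

// withSegmentState builds a predicate matching any of the given states,
// similar in spirit to metacache.WithSegmentState above.
func withSegmentState(states ...segmentState) func(segmentState) bool {
	set := make(map[segmentState]struct{}, len(states))
	for _, s := range states {
		set[s] = struct{}{}
	}
	return func(s segmentState) bool {
		_, ok := set[s]
		return ok
	}
}

func main() {
	filter := withSegmentState(stateGrowing, stateSealed, stateFlushing, stateFlushed)
	// A sealed segment now receives deletes instead of being skipped.
	fmt.Println(filter(stateSealed)) // true
}
```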


@@ -96,6 +96,7 @@ func NewDispatcher(ctx context.Context,
         return nil, err
     }
     if position != nil && len(position.MsgID) != 0 {
+        position = typeutil.Clone(position)
         position.ChannelName = funcutil.ToPhysicalChannel(position.ChannelName)
         err = stream.AsConsumer(ctx, []string{pchannel}, subName, mqwrapper.SubscriptionPositionUnknown)
         if err != nil {
@@ -234,7 +235,7 @@ func (d *Dispatcher) work() {
             }
         }
         if err != nil {
-            t.pos = pack.StartPositions[0]
+            t.pos = typeutil.Clone(pack.StartPositions[0])
             // replace the pChannel with vChannel
             t.pos.ChannelName = t.vchannel
             d.lagTargets.Insert(t.vchannel, t)
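Both hunks apply the same fix: clone the position before rewriting its `ChannelName`, because `pack.StartPositions[0]` (and the `position` passed into `NewDispatcher`) can be shared with other dispatchers, and mutating them in place corrupts the channel name for every other holder. A minimal sketch of the hazard, with a plain struct standing in for the real proto position type (assumption):

```go
package main

import "fmt"

// Position stands in for the real message-position proto (assumption).
type Position struct {
	ChannelName string
	MsgID       []byte
}

// clone plays the role of typeutil.Clone: deep-copy before mutating.
func clone(p *Position) *Position {
	cp := *p
	cp.MsgID = append([]byte(nil), p.MsgID...)
	return &cp
}

func main() {
	shared := &Position{ChannelName: "by-dev-dml_0", MsgID: []byte{1}}

	// Unsafe: rewriting shared.ChannelName directly would leak the
	// vchannel name into every other holder of `shared`.

	// Safe: mutate a private copy, as the fixed code does.
	local := clone(shared)
	local.ChannelName = "by-dev-dml_0_123v0"
	fmt.Println(shared.ChannelName, "|", local.ChannelName)
}
```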


@@ -182,7 +182,7 @@ func (c *dispatcherManager) tryMerge() {
     c.mu.Lock()
     defer c.mu.Unlock()

-    if c.mainDispatcher == nil {
+    if c.mainDispatcher == nil || c.mainDispatcher.CurTs() == 0 {
         return
     }
     candidates := make(map[string]struct{})
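The extra condition skips merging until the main dispatcher has actually consumed something: a zero `CurTs()` means there is no timestamp for solo dispatchers to sync up to, which is also why the test changes below set `curTs` explicitly before calling `tryMerge`. A tiny sketch of the guard's intent, with a simplified dispatcher type (assumption):

```go
package main

import "fmt"

type dispatcher struct{ curTs uint64 }

func (d *dispatcher) CurTs() uint64 { return d.curTs }

// canMerge mirrors the guard above: merging needs a non-nil main
// dispatcher with an established timestamp to align candidates against.
func canMerge(main *dispatcher) bool {
	return main != nil && main.CurTs() != 0
}

func main() {
	fmt.Println(canMerge(nil))                      // false
	fmt.Println(canMerge(&dispatcher{curTs: 0}))    // false: nothing consumed yet
	fmt.Println(canMerge(&dispatcher{curTs: 1000})) // true
}
```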


@@ -46,14 +46,16 @@ func TestManager(t *testing.T) {
         r := rand.Intn(10) + 1
         for j := 0; j < r; j++ {
             offset++
-            t.Logf("dyh add, %s", fmt.Sprintf("mock-pchannel-0_vchannel_%d", offset))
-            _, err := c.Add(context.Background(), fmt.Sprintf("mock-pchannel-0_vchannel_%d", offset), nil, mqwrapper.SubscriptionPositionUnknown)
+            vchannel := fmt.Sprintf("mock-pchannel-dml_0_vchannelv%d", offset)
+            t.Logf("add vchannel, %s", vchannel)
+            _, err := c.Add(context.Background(), vchannel, nil, mqwrapper.SubscriptionPositionUnknown)
             assert.NoError(t, err)
             assert.Equal(t, offset, c.Num())
         }
         for j := 0; j < rand.Intn(r); j++ {
-            t.Logf("dyh remove, %s", fmt.Sprintf("mock-pchannel-0_vchannel_%d", offset))
-            c.Remove(fmt.Sprintf("mock-pchannel-0_vchannel_%d", offset))
+            vchannel := fmt.Sprintf("mock-pchannel-dml_0_vchannelv%d", offset)
+            t.Logf("remove vchannel, %s", vchannel)
+            c.Remove(vchannel)
             offset--
             assert.Equal(t, offset, c.Num())
         }
@@ -72,6 +74,12 @@ func TestManager(t *testing.T) {
     _, err = c.Add(ctx, "mock_vchannel_2", nil, mqwrapper.SubscriptionPositionUnknown)
     assert.NoError(t, err)
     assert.Equal(t, 3, c.Num())
+    c.(*dispatcherManager).mainDispatcher.curTs.Store(1000)
+    c.(*dispatcherManager).mu.RLock()
+    for _, d := range c.(*dispatcherManager).soloDispatchers {
+        d.curTs.Store(1000)
+    }
+    c.(*dispatcherManager).mu.RUnlock()
     c.(*dispatcherManager).tryMerge()
     assert.Equal(t, 1, c.Num())
@@ -97,6 +105,12 @@ func TestManager(t *testing.T) {
     _, err = c.Add(ctx, "mock_vchannel_2", nil, mqwrapper.SubscriptionPositionUnknown)
     assert.NoError(t, err)
     assert.Equal(t, 3, c.Num())
+    c.(*dispatcherManager).mainDispatcher.curTs.Store(1000)
+    c.(*dispatcherManager).mu.RLock()
+    for _, d := range c.(*dispatcherManager).soloDispatchers {
+        d.curTs.Store(1000)
+    }
+    c.(*dispatcherManager).mu.RUnlock()
     checkIntervalK := paramtable.Get().MQCfg.MergeCheckInterval.Key
     paramtable.Get().Save(checkIntervalK, "0.01")
@@ -166,7 +180,7 @@ func (suite *SimulationSuite) SetupSuite() {
 }

 func (suite *SimulationSuite) SetupTest() {
-    suite.pchannel = fmt.Sprintf("by-dev-rootcoord-dispatcher-simulation-dml-%d-%d", rand.Int(), time.Now().UnixNano())
+    suite.pchannel = fmt.Sprintf("by-dev-rootcoord-dispatcher-simulation-dml_%d", time.Now().UnixNano())
     producer, err := newMockProducer(suite.factory, suite.pchannel)
     assert.NoError(suite.T(), err)
     suite.producer = producer


@@ -210,8 +210,16 @@ func GetAvailablePort() int {
     return listener.Addr().(*net.TCPAddr).Port
 }

+// IsPhysicalChannel checks if the channel is a physical channel
+func IsPhysicalChannel(channel string) bool {
+    return strings.Count(channel, "_") == 1
+}
+
 // ToPhysicalChannel get physical channel name from virtual channel name
 func ToPhysicalChannel(vchannel string) string {
+    if IsPhysicalChannel(vchannel) {
+        return vchannel
+    }
     index := strings.LastIndex(vchannel, "_")
     if index < 0 {
         return vchannel

@@ -174,11 +174,17 @@ func TestCheckPortAvailable(t *testing.T) {
 }

 func Test_ToPhysicalChannel(t *testing.T) {
-    assert.Equal(t, "abc", ToPhysicalChannel("abc_"))
-    assert.Equal(t, "abc", ToPhysicalChannel("abc_123"))
-    assert.Equal(t, "abc", ToPhysicalChannel("abc_defgsg"))
+    assert.Equal(t, "abc_", ToPhysicalChannel("abc_"))
+    assert.Equal(t, "abc_123", ToPhysicalChannel("abc_123"))
+    assert.Equal(t, "abc_defgsg", ToPhysicalChannel("abc_defgsg"))
+    assert.Equal(t, "abc_123", ToPhysicalChannel("abc_123_456"))
     assert.Equal(t, "abc__", ToPhysicalChannel("abc___defgsg"))
     assert.Equal(t, "abcdef", ToPhysicalChannel("abcdef"))
+    channel := "by-dev-rootcoord-dml_3_449883080965365748v0"
+    for i := 0; i < 10; i++ {
+        channel = ToPhysicalChannel(channel)
+        assert.Equal(t, "by-dev-rootcoord-dml_3", channel)
+    }
 }

 func Test_ConvertChannelName(t *testing.T) {