Mirror of https://gitee.com/milvus-io/milvus.git, synced 2026-01-07 19:31:51 +08:00
fix: Sync the sealed and flushed segments to datanode (#34301)
issue: #33696

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
This commit is contained in:
parent b32dae1883
commit 4cf1a358ba
@@ -863,6 +863,10 @@ func isFlush(segment *SegmentInfo) bool {
 	return segment.GetState() == commonpb.SegmentState_Flushed || segment.GetState() == commonpb.SegmentState_Flushing
 }
 
+func needSync(segment *SegmentInfo) bool {
+	return segment.GetState() == commonpb.SegmentState_Flushed || segment.GetState() == commonpb.SegmentState_Flushing || segment.GetState() == commonpb.SegmentState_Sealed
+}
+
 // buckets will be updated inplace
 func (t *compactionTrigger) squeezeSmallSegmentsToBuckets(small []*SegmentInfo, buckets [][]*SegmentInfo, expectedSize int64) (remaining []*SegmentInfo) {
 	for i := len(small) - 1; i >= 0; i-- {
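The hunk above introduces `needSync` as a superset of `isFlush` that also accepts sealed segments. A minimal, runnable sketch of the two predicates, using an illustrative `SegmentState` enum rather than the real commonpb types:

```go
package main

import "fmt"

// SegmentState stands in for commonpb.SegmentState; the values here are
// illustrative only.
type SegmentState int

const (
	Growing SegmentState = iota
	Sealed
	Flushing
	Flushed
)

func (s SegmentState) String() string {
	return [...]string{"Growing", "Sealed", "Flushing", "Flushed"}[s]
}

// isFlush mirrors the existing helper: only Flushed/Flushing qualify.
func isFlush(s SegmentState) bool {
	return s == Flushed || s == Flushing
}

// needSync additionally accepts Sealed, so a segment that is sealed on
// datacoord but already flushed on the datanode is not left out of the view.
func needSync(s SegmentState) bool {
	return isFlush(s) || s == Sealed
}

func main() {
	for _, s := range []SegmentState{Growing, Sealed, Flushing, Flushed} {
		fmt.Printf("%-8s isFlush=%-5v needSync=%v\n", s, isFlush(s), needSync(s))
	}
}
```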
@@ -98,7 +98,7 @@ func (sss *SyncSegmentsScheduler) SyncSegmentsForCollections() {
 			continue
 		}
 		for _, partitionID := range collInfo.Partitions {
-			if err := sss.SyncFlushedSegments(collID, partitionID, channelName, nodeID, pkField.GetFieldID()); err != nil {
+			if err := sss.SyncSegments(collID, partitionID, channelName, nodeID, pkField.GetFieldID()); err != nil {
 				log.Warn("sync segment with channel failed, retry next ticker",
 					zap.Int64("collectionID", collID),
 					zap.Int64("partitionID", partitionID),
@@ -111,11 +111,14 @@ func (sss *SyncSegmentsScheduler) SyncSegmentsForCollections() {
 		}
 	}
 }
 
-func (sss *SyncSegmentsScheduler) SyncFlushedSegments(collectionID, partitionID int64, channelName string, nodeID, pkFieldID int64) error {
+func (sss *SyncSegmentsScheduler) SyncSegments(collectionID, partitionID int64, channelName string, nodeID, pkFieldID int64) error {
 	log := log.With(zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID),
 		zap.String("channelName", channelName), zap.Int64("nodeID", nodeID))
+	// Sync all healthy segments, but only check flushed segments on the datanode,
+	// because growing L0 segments may not be in datacoord's meta. By the time the
+	// datanode receives the SyncSegments request, its segment state may already have
+	// transitioned from Growing/Flushing to Flushed, so the view must include such segments.
 	segments := sss.meta.SelectSegments(WithChannel(channelName), SegmentFilterFunc(func(info *SegmentInfo) bool {
-		return info.GetPartitionID() == partitionID && isFlush(info)
+		return info.GetPartitionID() == partitionID && isSegmentHealthy(info)
 	}))
 	req := &datapb.SyncSegmentsRequest{
 		ChannelName: channelName,
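The rename from `SyncFlushedSegments` to `SyncSegments` reflects the wider selection: every healthy segment on the channel, not just flushed ones. A hedged sketch of the predicate-composition shape of `SelectSegments` (simplified stand-in types, not the datacoord API):

```go
package main

import "fmt"

// SegmentInfo is a pared-down stand-in for datacoord's segment metadata.
type SegmentInfo struct {
	ID          int64
	PartitionID int64
	Channel     string
	Dropped     bool
}

// SegmentFilter composes boolean predicates over segments.
type SegmentFilter func(*SegmentInfo) bool

// WithChannel keeps segments bound to the given virtual channel.
func WithChannel(ch string) SegmentFilter {
	return func(s *SegmentInfo) bool { return s.Channel == ch }
}

// SelectSegments returns segments matching all filters, mirroring the
// shape of meta.SelectSegments in the hunk above.
func SelectSegments(all []*SegmentInfo, filters ...SegmentFilter) []*SegmentInfo {
	var out []*SegmentInfo
	for _, s := range all {
		keep := true
		for _, f := range filters {
			if !f(s) {
				keep = false
				break
			}
		}
		if keep {
			out = append(out, s)
		}
	}
	return out
}

func main() {
	segs := []*SegmentInfo{
		{ID: 1, PartitionID: 2, Channel: "ch1"},
		{ID: 2, PartitionID: 2, Channel: "ch1", Dropped: true},
		{ID: 3, PartitionID: 3, Channel: "ch2"},
	}
	// All healthy (non-dropped) segments on ch1 for partition 2.
	view := SelectSegments(segs, WithChannel("ch1"), func(s *SegmentInfo) bool {
		return s.PartitionID == 2 && !s.Dropped
	})
	for _, s := range view {
		fmt.Println("sync segment", s.ID)
	}
}
```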
@@ -291,8 +291,10 @@ func (c *metaCacheImpl) UpdateSegmentView(partitionID int64,
 	}
 
 	for segID, info := range c.segmentInfos {
-		if info.partitionID != partitionID ||
-			(info.state != commonpb.SegmentState_Flushed && info.state != commonpb.SegmentState_Flushing) {
+		// Only check flushed segments:
+		// 1. a flushing segment may already have been compacted on datacoord;
+		// 2. a growing segment may not have a stats log yet, so it is not included in sync views.
+		if info.partitionID != partitionID || info.state != commonpb.SegmentState_Flushed {
 			continue
 		}
 		if _, ok := allSegments[segID]; !ok {
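On the datanode side, the narrowed condition means only Flushed entries in the metacache are candidates for removal when they are missing from the synced view; Growing/Flushing entries are left untouched. A small self-contained sketch of that pruning rule (illustrative types, not the real metacache):

```go
package main

import "fmt"

type state int

const (
	growing state = iota
	flushing
	flushed
)

type segment struct {
	partitionID int64
	state       state
}

// pruneCompacted drops flushed segments of the target partition that no
// longer appear in the synced view, mirroring the UpdateSegmentView rule
// in the hunk above.
func pruneCompacted(cache map[int64]*segment, partitionID int64, view map[int64]struct{}) {
	for id, info := range cache {
		// Only check flushed segments: a flushing segment may already have
		// been compacted on datacoord, and a growing segment has no stats
		// log yet, so neither appears in the view.
		if info.partitionID != partitionID || info.state != flushed {
			continue
		}
		if _, ok := view[id]; !ok {
			delete(cache, id)
		}
	}
}

func main() {
	// Mirrors the "dc dropped dn flushed" test below: 100 is flushed locally
	// but missing from the view, so it is pruned; 101 is still flushing and kept.
	cache := map[int64]*segment{
		100: {partitionID: 2, state: flushed},
		101: {partitionID: 2, state: flushing},
	}
	view := map[int64]struct{}{102: {}}
	pruneCompacted(cache, 2, view)
	for id := range cache {
		fmt.Println("kept segment", id)
	}
}
```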
@@ -684,6 +684,349 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
 		s.True(exist)
 		s.NotNil(info)
 	})
+
+	s.Run("dc growing/flushing dn flushed", func() {
+		s.SetupTest()
+		cache := metacache.NewMetaCache(&datapb.ChannelWatchInfo{
+			Schema: &schemapb.CollectionSchema{
+				Fields: []*schemapb.FieldSchema{
+					{
+						FieldID:      100,
+						Name:         "pk",
+						IsPrimaryKey: true,
+						Description:  "",
+						DataType:     schemapb.DataType_Int64,
+					},
+				},
+			},
+			Vchan: &datapb.VchannelInfo{},
+		}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
+			return metacache.NewBloomFilterSet()
+		})
+		cache.AddSegment(&datapb.SegmentInfo{
+			ID:            100,
+			CollectionID:  1,
+			PartitionID:   2,
+			InsertChannel: "111",
+			NumOfRows:     0,
+			State:         commonpb.SegmentState_Flushed,
+			Level:         datapb.SegmentLevel_L1,
+		}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
+			return metacache.NewBloomFilterSet()
+		})
+		cache.AddSegment(&datapb.SegmentInfo{
+			ID:            101,
+			CollectionID:  1,
+			PartitionID:   2,
+			InsertChannel: "111",
+			NumOfRows:     0,
+			State:         commonpb.SegmentState_Flushed,
+			Level:         datapb.SegmentLevel_L1,
+		}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
+			return metacache.NewBloomFilterSet()
+		})
+		mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
+		mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
+			Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true)
+		s.node.flowgraphManager = mockFlowgraphManager
+		ctx := context.Background()
+		req := &datapb.SyncSegmentsRequest{
+			ChannelName:  "channel1",
+			PartitionId:  2,
+			CollectionId: 1,
+			SegmentInfos: map[int64]*datapb.SyncSegmentInfo{
+				100: {
+					SegmentId: 100,
+					PkStatsLog: &datapb.FieldBinlog{
+						FieldID: 100,
+						Binlogs: nil,
+					},
+					State:     commonpb.SegmentState_Growing,
+					Level:     datapb.SegmentLevel_L1,
+					NumOfRows: 1024,
+				},
+				101: {
+					SegmentId: 101,
+					PkStatsLog: &datapb.FieldBinlog{
+						FieldID: 100,
+						Binlogs: nil,
+					},
+					State:     commonpb.SegmentState_Flushing,
+					Level:     datapb.SegmentLevel_L1,
+					NumOfRows: 1024,
+				},
+			},
+		}
+
+		status, err := s.node.SyncSegments(ctx, req)
+		s.NoError(err)
+		s.True(merr.Ok(status))
+
+		info, exist := cache.GetSegmentByID(100)
+		s.True(exist)
+		s.NotNil(info)
+
+		info, exist = cache.GetSegmentByID(101)
+		s.True(exist)
+		s.NotNil(info)
+	})
+
+	s.Run("dc flushed dn growing/flushing", func() {
+		s.SetupTest()
+		cache := metacache.NewMetaCache(&datapb.ChannelWatchInfo{
+			Schema: &schemapb.CollectionSchema{
+				Fields: []*schemapb.FieldSchema{
+					{
+						FieldID:      100,
+						Name:         "pk",
+						IsPrimaryKey: true,
+						Description:  "",
+						DataType:     schemapb.DataType_Int64,
+					},
+				},
+			},
+			Vchan: &datapb.VchannelInfo{},
+		}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
+			return metacache.NewBloomFilterSet()
+		})
+		cache.AddSegment(&datapb.SegmentInfo{
+			ID:            100,
+			CollectionID:  1,
+			PartitionID:   2,
+			InsertChannel: "111",
+			NumOfRows:     0,
+			State:         commonpb.SegmentState_Growing,
+			Level:         datapb.SegmentLevel_L1,
+		}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
+			return metacache.NewBloomFilterSet()
+		})
+		cache.AddSegment(&datapb.SegmentInfo{
+			ID:            101,
+			CollectionID:  1,
+			PartitionID:   2,
+			InsertChannel: "111",
+			NumOfRows:     0,
+			State:         commonpb.SegmentState_Flushing,
+			Level:         datapb.SegmentLevel_L1,
+		}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
+			return metacache.NewBloomFilterSet()
+		})
+		mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
+		mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
+			Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true)
+		s.node.flowgraphManager = mockFlowgraphManager
+		ctx := context.Background()
+		req := &datapb.SyncSegmentsRequest{
+			ChannelName:  "channel1",
+			PartitionId:  2,
+			CollectionId: 1,
+			SegmentInfos: map[int64]*datapb.SyncSegmentInfo{
+				100: {
+					SegmentId: 100,
+					PkStatsLog: &datapb.FieldBinlog{
+						FieldID: 100,
+						Binlogs: nil,
+					},
+					State:     commonpb.SegmentState_Flushed,
+					Level:     datapb.SegmentLevel_L1,
+					NumOfRows: 1024,
+				},
+				101: {
+					SegmentId: 101,
+					PkStatsLog: &datapb.FieldBinlog{
+						FieldID: 100,
+						Binlogs: nil,
+					},
+					State:     commonpb.SegmentState_Flushed,
+					Level:     datapb.SegmentLevel_L1,
+					NumOfRows: 1024,
+				},
+			},
+		}
+
+		status, err := s.node.SyncSegments(ctx, req)
+		s.NoError(err)
+		s.True(merr.Ok(status))
+
+		info, exist := cache.GetSegmentByID(100)
+		s.True(exist)
+		s.NotNil(info)
+
+		info, exist = cache.GetSegmentByID(101)
+		s.True(exist)
+		s.NotNil(info)
+	})
+
+	s.Run("dc dropped dn growing/flushing", func() {
+		s.SetupTest()
+		cache := metacache.NewMetaCache(&datapb.ChannelWatchInfo{
+			Schema: &schemapb.CollectionSchema{
+				Fields: []*schemapb.FieldSchema{
+					{
+						FieldID:      100,
+						Name:         "pk",
+						IsPrimaryKey: true,
+						Description:  "",
+						DataType:     schemapb.DataType_Int64,
+					},
+				},
+			},
+			Vchan: &datapb.VchannelInfo{},
+		}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
+			return metacache.NewBloomFilterSet()
+		})
+		cache.AddSegment(&datapb.SegmentInfo{
+			ID:            100,
+			CollectionID:  1,
+			PartitionID:   2,
+			InsertChannel: "111",
+			NumOfRows:     0,
+			State:         commonpb.SegmentState_Growing,
+			Level:         datapb.SegmentLevel_L1,
+		}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
+			return metacache.NewBloomFilterSet()
+		})
+		cache.AddSegment(&datapb.SegmentInfo{
+			ID:            101,
+			CollectionID:  1,
+			PartitionID:   2,
+			InsertChannel: "111",
+			NumOfRows:     0,
+			State:         commonpb.SegmentState_Flushing,
+			Level:         datapb.SegmentLevel_L1,
+		}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
+			return metacache.NewBloomFilterSet()
+		})
+		cache.AddSegment(&datapb.SegmentInfo{
+			ID:            102,
+			CollectionID:  1,
+			PartitionID:   2,
+			InsertChannel: "111",
+			NumOfRows:     0,
+			State:         commonpb.SegmentState_Flushed,
+			Level:         datapb.SegmentLevel_L1,
+		}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
+			return metacache.NewBloomFilterSet()
+		})
+		mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
+		mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
+			Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true)
+		s.node.flowgraphManager = mockFlowgraphManager
+		ctx := context.Background()
+		req := &datapb.SyncSegmentsRequest{
+			ChannelName:  "channel1",
+			PartitionId:  2,
+			CollectionId: 1,
+			SegmentInfos: map[int64]*datapb.SyncSegmentInfo{
+				102: {
+					SegmentId: 102,
+					PkStatsLog: &datapb.FieldBinlog{
+						FieldID: 100,
+						Binlogs: nil,
+					},
+					State:     commonpb.SegmentState_Flushed,
+					Level:     datapb.SegmentLevel_L1,
+					NumOfRows: 1024,
+				},
+			},
+		}
+
+		status, err := s.node.SyncSegments(ctx, req)
+		s.NoError(err)
+		s.True(merr.Ok(status))
+
+		info, exist := cache.GetSegmentByID(100)
+		s.True(exist)
+		s.NotNil(info)
+
+		info, exist = cache.GetSegmentByID(101)
+		s.True(exist)
+		s.NotNil(info)
+
+		info, exist = cache.GetSegmentByID(102)
+		s.True(exist)
+		s.NotNil(info)
+	})
+
+	s.Run("dc dropped dn flushed", func() {
+		s.SetupTest()
+		cache := metacache.NewMetaCache(&datapb.ChannelWatchInfo{
+			Schema: &schemapb.CollectionSchema{
+				Fields: []*schemapb.FieldSchema{
+					{
+						FieldID:      100,
+						Name:         "pk",
+						IsPrimaryKey: true,
+						Description:  "",
+						DataType:     schemapb.DataType_Int64,
+					},
+				},
+			},
+			Vchan: &datapb.VchannelInfo{},
+		}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
+			return metacache.NewBloomFilterSet()
+		})
+		cache.AddSegment(&datapb.SegmentInfo{
+			ID:            100,
+			CollectionID:  1,
+			PartitionID:   2,
+			InsertChannel: "111",
+			NumOfRows:     0,
+			State:         commonpb.SegmentState_Flushed,
+			Level:         datapb.SegmentLevel_L0,
+		}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
+			return metacache.NewBloomFilterSet()
+		})
+		cache.AddSegment(&datapb.SegmentInfo{
+			ID:            101,
+			CollectionID:  1,
+			PartitionID:   2,
+			InsertChannel: "111",
+			NumOfRows:     0,
+			State:         commonpb.SegmentState_Flushing,
+			Level:         datapb.SegmentLevel_L1,
+		}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
+			return metacache.NewBloomFilterSet()
+		})
+		mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
+		mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
+			Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true)
+		s.node.flowgraphManager = mockFlowgraphManager
+		ctx := context.Background()
+		req := &datapb.SyncSegmentsRequest{
+			ChannelName:  "channel1",
+			PartitionId:  2,
+			CollectionId: 1,
+			SegmentInfos: map[int64]*datapb.SyncSegmentInfo{
+				102: {
+					SegmentId: 102,
+					PkStatsLog: &datapb.FieldBinlog{
+						FieldID: 100,
+						Binlogs: nil,
+					},
+					State:     commonpb.SegmentState_Flushed,
+					Level:     datapb.SegmentLevel_L1,
+					NumOfRows: 1025,
+				},
+			},
+		}
+
+		status, err := s.node.SyncSegments(ctx, req)
+		s.NoError(err)
+		s.True(merr.Ok(status))
+
+		info, exist := cache.GetSegmentByID(100)
+		s.False(exist)
+		s.Nil(info)
+
+		info, exist = cache.GetSegmentByID(101)
+		s.True(exist)
+		s.NotNil(info)
+
+		info, exist = cache.GetSegmentByID(102)
+		s.True(exist)
+		s.NotNil(info)
+	})
 }
 
 func (s *DataNodeServicesSuite) TestDropCompactionPlan() {
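The four new subtests enumerate combinations of datacoord view state and datanode cache state. A table-driven condensation of their expected outcomes (field names here are descriptive, not from the source):

```go
package main

import "fmt"

func main() {
	// Expected metacache contents after SyncSegments, per the subtests above.
	cases := []struct {
		name        string
		dcView      string // state carried in the SyncSegmentsRequest, if present
		dnCache     string // state already held in the datanode metacache
		keptInCache bool
	}{
		{"dc growing/flushing dn flushed", "Growing/Flushing", "Flushed", true},
		{"dc flushed dn growing/flushing", "Flushed", "Growing/Flushing", true},
		{"dc dropped dn growing/flushing", "absent from view", "Growing/Flushing", true},
		{"dc dropped dn flushed", "absent from view", "Flushed", false},
	}
	for _, c := range cases {
		fmt.Printf("%-32s view=%-18s cache=%-18s kept=%v\n",
			c.name, c.dcView, c.dnCache, c.keptInCache)
	}
}
```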