fix: [AddField] Broadcast update schema even there is no segment (#41780)

Related to #41744

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2025-05-13 16:02:55 +08:00 committed by GitHub
parent eb1eab8914
commit 186a01eef4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -321,7 +321,7 @@ func (sd *shardDelegator) search(ctx context.Context, req *querypb.SearchRequest
log.Warn("failed to optimize search params", zap.Error(err)) log.Warn("failed to optimize search params", zap.Error(err))
return nil, err return nil, err
} }
tasks, err := organizeSubTask(ctx, req, sealed, growing, sd, sd.modifySearchRequest) tasks, err := organizeSubTask(ctx, req, sealed, growing, sd, true, sd.modifySearchRequest)
if err != nil { if err != nil {
log.Warn("Search organizeSubTask failed", zap.Error(err)) log.Warn("Search organizeSubTask failed", zap.Error(err))
return nil, err return nil, err
@ -508,7 +508,7 @@ func (sd *shardDelegator) QueryStream(ctx context.Context, req *querypb.QueryReq
zap.Int("sealedNum", len(sealed)), zap.Int("sealedNum", len(sealed)),
zap.Int("growingNum", len(growing)), zap.Int("growingNum", len(growing)),
) )
tasks, err := organizeSubTask(ctx, req, sealed, growing, sd, sd.modifyQueryRequest) tasks, err := organizeSubTask(ctx, req, sealed, growing, sd, true, sd.modifyQueryRequest)
if err != nil { if err != nil {
log.Warn("query organizeSubTask failed", zap.Error(err)) log.Warn("query organizeSubTask failed", zap.Error(err))
return err return err
@ -588,7 +588,7 @@ func (sd *shardDelegator) Query(ctx context.Context, req *querypb.QueryRequest)
zap.Int("sealedNum", sealedNum), zap.Int("sealedNum", sealedNum),
zap.Int("growingNum", len(growing)), zap.Int("growingNum", len(growing)),
) )
tasks, err := organizeSubTask(ctx, req, sealed, growing, sd, sd.modifyQueryRequest) tasks, err := organizeSubTask(ctx, req, sealed, growing, sd, true, sd.modifyQueryRequest)
if err != nil { if err != nil {
log.Warn("query organizeSubTask failed", zap.Error(err)) log.Warn("query organizeSubTask failed", zap.Error(err))
return nil, err return nil, err
@ -636,7 +636,7 @@ func (sd *shardDelegator) GetStatistics(ctx context.Context, req *querypb.GetSta
} }
defer sd.distribution.Unpin(version) defer sd.distribution.Unpin(version)
tasks, err := organizeSubTask(ctx, req, sealed, growing, sd, func(req *querypb.GetStatisticsRequest, scope querypb.DataScope, segmentIDs []int64, targetID int64) *querypb.GetStatisticsRequest { tasks, err := organizeSubTask(ctx, req, sealed, growing, sd, true, func(req *querypb.GetStatisticsRequest, scope querypb.DataScope, segmentIDs []int64, targetID int64) *querypb.GetStatisticsRequest {
nodeReq := proto.Clone(req).(*querypb.GetStatisticsRequest) nodeReq := proto.Clone(req).(*querypb.GetStatisticsRequest)
nodeReq.GetReq().GetBase().TargetID = targetID nodeReq.GetReq().GetBase().TargetID = targetID
nodeReq.Scope = scope nodeReq.Scope = scope
@ -670,7 +670,14 @@ type subTask[T any] struct {
worker cluster.Worker worker cluster.Worker
} }
func organizeSubTask[T any](ctx context.Context, req T, sealed []SnapshotItem, growing []SegmentEntry, sd *shardDelegator, modify func(T, querypb.DataScope, []int64, int64) T) ([]subTask[T], error) { func organizeSubTask[T any](ctx context.Context,
req T,
sealed []SnapshotItem,
growing []SegmentEntry,
sd *shardDelegator,
skipEmpty bool,
modify func(T, querypb.DataScope, []int64, int64) T,
) ([]subTask[T], error) {
log := sd.getLogger(ctx) log := sd.getLogger(ctx)
result := make([]subTask[T], 0, len(sealed)+1) result := make([]subTask[T], 0, len(sealed)+1)
@ -678,7 +685,7 @@ func organizeSubTask[T any](ctx context.Context, req T, sealed []SnapshotItem, g
segmentIDs := lo.Map(segments, func(item SegmentEntry, _ int) int64 { segmentIDs := lo.Map(segments, func(item SegmentEntry, _ int) int64 {
return item.SegmentID return item.SegmentID
}) })
if len(segmentIDs) == 0 { if skipEmpty && len(segmentIDs) == 0 {
return nil return nil
} }
// update request // update request
@ -874,6 +881,11 @@ func (sd *shardDelegator) UpdateSchema(ctx context.Context, schema *schemapb.Col
} }
defer sd.distribution.Unpin(version) defer sd.distribution.Unpin(version)
log.Info("update schema targets...",
zap.Int("sealedNum", len(sealed)),
zap.Int("growingNum", len(growing)),
)
tasks, err := organizeSubTask(ctx, &querypb.UpdateSchemaRequest{ tasks, err := organizeSubTask(ctx, &querypb.UpdateSchemaRequest{
Base: commonpbutil.NewMsgBase( Base: commonpbutil.NewMsgBase(
commonpbutil.WithSourceID(paramtable.GetNodeID()), commonpbutil.WithSourceID(paramtable.GetNodeID()),
@ -881,11 +893,16 @@ func (sd *shardDelegator) UpdateSchema(ctx context.Context, schema *schemapb.Col
CollectionID: sd.collectionID, CollectionID: sd.collectionID,
Schema: schema, Schema: schema,
Version: schVersion, Version: schVersion,
}, sealed, growing, sd, func(req *querypb.UpdateSchemaRequest, scope querypb.DataScope, segmentIDs []int64, targetID int64) *querypb.UpdateSchemaRequest { },
nodeReq := typeutil.Clone(req) sealed,
nodeReq.GetBase().TargetID = targetID growing,
return nodeReq sd,
}) false, // don't skip empty
func(req *querypb.UpdateSchemaRequest, scope querypb.DataScope, segmentIDs []int64, targetID int64) *querypb.UpdateSchemaRequest {
nodeReq := typeutil.Clone(req)
nodeReq.GetBase().TargetID = targetID
return nodeReq
})
if err != nil { if err != nil {
return err return err
} }
@ -915,7 +932,7 @@ func (sd *shardDelegator) Close() {
// clean up l0 segment in delete buffer // clean up l0 segment in delete buffer
start := time.Now() start := time.Now()
sd.deleteBuffer.Clear() sd.deleteBuffer.Clear()
log.Info("unregister all l0 segments", zap.Duration("cost", time.Since(start))) log.Info("unregister all l0 segments", zap.Duration("cost", time.Since(start)))
metrics.QueryNodeDeleteBufferSize.DeleteLabelValues(fmt.Sprint(paramtable.GetNodeID()), sd.vchannelName) metrics.QueryNodeDeleteBufferSize.DeleteLabelValues(fmt.Sprint(paramtable.GetNodeID()), sd.vchannelName)
metrics.QueryNodeDeleteBufferRowNum.DeleteLabelValues(fmt.Sprint(paramtable.GetNodeID()), sd.vchannelName) metrics.QueryNodeDeleteBufferRowNum.DeleteLabelValues(fmt.Sprint(paramtable.GetNodeID()), sd.vchannelName)