From a2ce70d252565f7c534888356965a4cf5886ecc3 Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Tue, 4 Nov 2025 22:25:33 +0800 Subject: [PATCH] fix: ddl framework bug patch (#45290) issue: #45080, #45274, #45285 - LoadCollection doesn't ignore the ignorable request, for false field array. - CreateIndex doesn't ignore the ignorable request, for wrong index. - index meta is not thread safe. - missing parameter check of DDL. - DDL Ack scheduler may get stuck and DDL is blocked until the next incoming DDL. - missing parameter checker of ddl --------- Signed-off-by: chyezh --- internal/datacoord/index_meta.go | 11 +++++++++-- internal/datacoord/index_meta_test.go | 16 ++++++++-------- internal/datacoord/index_service.go | 9 +++++++++ internal/querycoordv2/job/job_load.go | 2 +- .../ddl_callbacks_alter_collection_name.go | 6 +++++- .../ddl_callbacks_alter_collection_properties.go | 8 ++++++++ .../rootcoord/ddl_callbacks_alter_database.go | 5 +++-- .../server/broadcaster/ack_callback_scheduler.go | 13 +++++++++++++ .../server/broadcaster/tombstone_scheduler.go | 2 ++ 9 files changed, 58 insertions(+), 14 deletions(-) diff --git a/internal/datacoord/index_meta.go b/internal/datacoord/index_meta.go index b6181b7e24..8e33036e43 100644 --- a/internal/datacoord/index_meta.go +++ b/internal/datacoord/index_meta.go @@ -53,6 +53,8 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) +var errIndexOperationIgnored = errors.New("index operation ignored") + type indexMeta struct { ctx context.Context catalog metastore.DataCoordCatalog @@ -375,7 +377,7 @@ func (m *indexMeta) canCreateIndex(req *indexpb.CreateIndexRequest, isJson bool) if req.IndexName == index.IndexName { if req.FieldID == index.FieldID && checkParams(index, req) && /*only check json params when it is json index*/ (!isJson || checkIdenticalJson(index, req)) { - return index.IndexID, nil + return index.IndexID, errIndexOperationIgnored } errMsg := "at most one distinct index is allowed per field" log.Warn(errMsg, @@ -1060,12 
+1062,17 @@ func (m *indexMeta) CheckCleanSegmentIndex(buildID UniqueID) (bool, *model.Segme func (m *indexMeta) getSegmentsIndexStates(collectionID UniqueID, segmentIDs []UniqueID) map[int64]map[int64]*indexpb.SegmentIndexState { ret := make(map[int64]map[int64]*indexpb.SegmentIndexState, 0) m.fieldIndexLock.RLock() - fieldIndexes, ok := m.indexes[collectionID] + fieldIndexesMap, ok := m.indexes[collectionID] if !ok { m.fieldIndexLock.RUnlock() return ret } + fieldIndexes := make(map[UniqueID]*model.Index, len(fieldIndexesMap)) + for id, index := range fieldIndexesMap { + fieldIndexes[id] = index + } m.fieldIndexLock.RUnlock() + for _, segID := range segmentIDs { ret[segID] = make(map[int64]*indexpb.SegmentIndexState) segIndexInfos, ok := m.segmentIndexes.Get(segID) diff --git a/internal/datacoord/index_meta_test.go b/internal/datacoord/index_meta_test.go index 89c13ac322..7132b087b9 100644 --- a/internal/datacoord/index_meta_test.go +++ b/internal/datacoord/index_meta_test.go @@ -134,8 +134,8 @@ func TestMeta_ScalarAutoIndex(t *testing.T) { }, } tmpIndexID, err := m.CanCreateIndex(req, false) - assert.NoError(t, err) - assert.Equal(t, int64(indexID), tmpIndexID) + assert.ErrorIs(t, err, errIndexOperationIgnored) + assert.Zero(t, tmpIndexID) }) t.Run("user index params not consistent", func(t *testing.T) { @@ -203,8 +203,8 @@ func TestMeta_ScalarAutoIndex(t *testing.T) { }, } tmpIndexID, err := m.CanCreateIndex(req, false) - assert.NoError(t, err) - assert.Equal(t, int64(indexID), tmpIndexID) + assert.ErrorIs(t, err, errIndexOperationIgnored) + assert.Zero(t, tmpIndexID) newIndexParams := req.GetIndexParams() assert.Equal(t, len(newIndexParams), 1) assert.Equal(t, newIndexParams[0].Key, common.IndexTypeKey) @@ -287,8 +287,8 @@ func TestMeta_CanCreateIndex(t *testing.T) { assert.NoError(t, err) tmpIndexID, err = m.CanCreateIndex(req, false) - assert.NoError(t, err) - assert.Equal(t, indexID, tmpIndexID) + assert.ErrorIs(t, err, errIndexOperationIgnored) + 
assert.Zero(t, tmpIndexID) }) t.Run("params not consistent", func(t *testing.T) { @@ -326,8 +326,8 @@ func TestMeta_CanCreateIndex(t *testing.T) { req.UserIndexParams = []*commonpb.KeyValuePair{{Key: common.IndexTypeKey, Value: "AUTOINDEX"}, {Key: common.MetricTypeKey, Value: "COSINE"}} req.UserAutoindexMetricTypeSpecified = false tmpIndexID, err = m.CanCreateIndex(req, false) - assert.NoError(t, err) - assert.Equal(t, indexID, tmpIndexID) + assert.ErrorIs(t, err, errIndexOperationIgnored) + assert.Zero(t, tmpIndexID) // req should follow the meta assert.Equal(t, "L2", req.GetUserIndexParams()[1].Value) assert.Equal(t, "L2", req.GetIndexParams()[1].Value) diff --git a/internal/datacoord/index_service.go b/internal/datacoord/index_service.go index 1071381b35..3fc48aaa95 100644 --- a/internal/datacoord/index_service.go +++ b/internal/datacoord/index_service.go @@ -21,6 +21,7 @@ import ( "fmt" "math" + "github.com/cockroachdb/errors" "github.com/samber/lo" "go.uber.org/zap" @@ -211,6 +212,14 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques indexID, err := s.meta.indexMeta.CanCreateIndex(req, isJson) if err != nil { + if errors.Is(err, errIndexOperationIgnored) { + log.Info("index already exists", + zap.Int64("collectionID", req.GetCollectionID()), + zap.Int64("fieldID", req.GetFieldID()), + zap.String("indexName", req.GetIndexName())) + metrics.IndexRequestCounter.WithLabelValues(metrics.SuccessLabel).Inc() + return merr.Success(), nil + } log.Error("Check CanCreateIndex fail", zap.Error(err)) metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc() return merr.Status(err), nil diff --git a/internal/querycoordv2/job/job_load.go b/internal/querycoordv2/job/job_load.go index dae75ab6ec..5f1fd29495 100644 --- a/internal/querycoordv2/job/job_load.go +++ b/internal/querycoordv2/job/job_load.go @@ -101,7 +101,7 @@ func (job *LoadCollectionJob) Execute() error { // 2. 
put load info meta fieldIndexIDs := make(map[int64]int64, len(req.GetLoadFields())) - fieldIDs := make([]int64, len(req.GetLoadFields())) + fieldIDs := make([]int64, 0, len(req.GetLoadFields())) for _, loadField := range req.GetLoadFields() { if loadField.GetIndexId() != 0 { fieldIndexIDs[loadField.GetFieldId()] = loadField.GetIndexId() diff --git a/internal/rootcoord/ddl_callbacks_alter_collection_name.go b/internal/rootcoord/ddl_callbacks_alter_collection_name.go index 1acdde532b..3a025ca72f 100644 --- a/internal/rootcoord/ddl_callbacks_alter_collection_name.go +++ b/internal/rootcoord/ddl_callbacks_alter_collection_name.go @@ -104,8 +104,12 @@ func (c *Core) broadcastAlterCollectionForRenameCollection(ctx context.Context, } func (c *Core) validateEncryption(ctx context.Context, oldDBName string, newDBName string) error { - // old and new DB names are filled in Prepare, shouldn't be empty here + if oldDBName == newDBName { + return nil + } + // Check if renaming across databases with encryption enabled + // old and new DB names are filled in Prepare, shouldn't be empty here originalDB, err := c.meta.GetDatabaseByName(ctx, oldDBName, typeutil.MaxTimestamp) if err != nil { return fmt.Errorf("failed to get original database: %w", err) diff --git a/internal/rootcoord/ddl_callbacks_alter_collection_properties.go b/internal/rootcoord/ddl_callbacks_alter_collection_properties.go index 42e7d1e4ee..3355ddddd5 100644 --- a/internal/rootcoord/ddl_callbacks_alter_collection_properties.go +++ b/internal/rootcoord/ddl_callbacks_alter_collection_properties.go @@ -26,6 +26,10 @@ import ( // broadcastAlterCollectionForAlterCollection broadcasts the put collection message for alter collection. 
func (c *Core) broadcastAlterCollectionForAlterCollection(ctx context.Context, req *milvuspb.AlterCollectionRequest) error { + if req.GetCollectionName() == "" { + return merr.WrapErrParameterInvalidMsg("alter collection failed, collection name does not exists") + } + if len(req.GetProperties()) == 0 && len(req.GetDeleteKeys()) == 0 { return merr.WrapErrParameterInvalidMsg("no properties or delete keys provided") } @@ -38,6 +42,10 @@ func (c *Core) broadcastAlterCollectionForAlterCollection(ctx context.Context, r return merr.WrapErrParameterInvalidMsg("can not alter cipher related properties") } + if funcutil.SliceContain(req.GetDeleteKeys(), common.EnableDynamicSchemaKey) { + return merr.WrapErrParameterInvalidMsg("cannot delete key %s, dynamic field schema could support set to true/false", common.EnableDynamicSchemaKey) + } + // Validate timezone tz, exist := funcutil.TryGetAttrByKeyFromRepeatedKV(common.TimezoneKey, req.GetProperties()) if exist && !funcutil.IsTimezoneValid(tz) { diff --git a/internal/rootcoord/ddl_callbacks_alter_database.go b/internal/rootcoord/ddl_callbacks_alter_database.go index 81c2b63d9d..6a3ba1b12d 100644 --- a/internal/rootcoord/ddl_callbacks_alter_database.go +++ b/internal/rootcoord/ddl_callbacks_alter_database.go @@ -18,7 +18,6 @@ package rootcoord import ( "context" - "strings" "github.com/cockroachdb/errors" "github.com/samber/lo" @@ -39,7 +38,9 @@ import ( ) func (c *Core) broadcastAlterDatabase(ctx context.Context, req *rootcoordpb.AlterDatabaseRequest) error { - req.DbName = strings.TrimSpace(req.DbName) + if req.GetDbName() == "" { + return merr.WrapErrParameterInvalidMsg("alter database failed, database name does not exists") + } if req.GetProperties() == nil && req.GetDeleteKeys() == nil { return merr.WrapErrParameterInvalidMsg("alter database with empty properties and delete keys, expected to set either properties or delete keys") } diff --git a/internal/streamingcoord/server/broadcaster/ack_callback_scheduler.go 
b/internal/streamingcoord/server/broadcaster/ack_callback_scheduler.go index f7b0a3e6ae..f96b220975 100644 --- a/internal/streamingcoord/server/broadcaster/ack_callback_scheduler.go +++ b/internal/streamingcoord/server/broadcaster/ack_callback_scheduler.go @@ -83,14 +83,27 @@ func (s *ackCallbackScheduler) background() { }() s.Logger().Info("ack scheduler background start") + // it's weird to find that FastLock may fail even if there's no resource-key locked, + // also see: #45285 + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + var triggerTicker <-chan time.Time for { s.triggerAckCallback() + if len(s.pendingAckedTasks) > 0 { + // if there are pending tasks, trigger the ack callback after a delay. + triggerTicker = ticker.C + } else { + triggerTicker = nil + } select { case <-s.notifier.Context().Done(): return case task := <-s.pending: s.addBroadcastTask(task) case <-s.triggerChan: + case <-triggerTicker: } } } diff --git a/internal/streamingcoord/server/broadcaster/tombstone_scheduler.go b/internal/streamingcoord/server/broadcaster/tombstone_scheduler.go index efa42aea12..e86e162afa 100644 --- a/internal/streamingcoord/server/broadcaster/tombstone_scheduler.go +++ b/internal/streamingcoord/server/broadcaster/tombstone_scheduler.go @@ -121,4 +121,6 @@ func (s *tombstoneScheduler) triggerGCTombstone() { return } } + // all the tombstones are dropped, reset the tombstones. + s.tombstones = make([]tombstoneItem, 0) }