diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 13b74c3b43..e1fea8985c 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -882,6 +882,39 @@ func (c *Core) BuildIndex(ctx context.Context, segID typeutil.UniqueID, field *s return bldID, nil } +// RemoveIndex will call drop index service +func (c *Core) RemoveIndex(ctx context.Context, collName string, indexName string) error { + _, indexInfos, err := c.MetaTable.GetIndexByName(collName, indexName) + if err != nil { + log.Error("GetIndexByName failed,", zap.String("collection name", collName), + zap.String("index name", indexName), zap.Error(err)) + return err + } + for _, indexInfo := range indexInfos { + if err = c.CallDropIndexService(ctx, indexInfo.IndexID); err != nil { + log.Error("CallDropIndexService failed,", zap.String("collection name", collName), zap.Error(err)) + return err + } + } + return nil +} + +// ExpireMetaCache will call invalidate collection meta cache +func (c *Core) ExpireMetaCache(ctx context.Context, collNames []string, ts typeutil.Timestamp) { + for _, collName := range collNames { + req := proxypb.InvalidateCollMetaCacheRequest{ + Base: &commonpb.MsgBase{ + MsgType: 0, //TODO, msg type + MsgID: 0, //TODO, msg id + Timestamp: ts, + SourceID: c.session.ServerID, + }, + CollectionName: collName, + } + c.proxyClientManager.InvalidateCollectionMetaCache(ctx, &req) + } +} + // Register register rootcoord at etcd func (c *Core) Register() error { c.session = sessionutil.NewSession(c.ctx, Params.MetaRootPath, Params.EtcdEndpoints) diff --git a/internal/rootcoord/task.go b/internal/rootcoord/task.go index 3cb8c90212..e00f4cc3b0 100644 --- a/internal/rootcoord/task.go +++ b/internal/rootcoord/task.go @@ -22,7 +22,6 @@ import ( "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/milvuspb" - "github.com/milvus-io/milvus/internal/proto/proxypb" "github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/milvus-io/milvus/internal/util/tsoutil" "github.com/milvus-io/milvus/internal/util/typeutil" @@ -296,7 +295,10 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error { return fmt.Errorf("encodeDdOperation fail, error = %w", err) } - aliases := t.core.MetaTable.ListAliases(collMeta.ID) + // drop all indices + if err = t.core.RemoveIndex(ctx, t.Req.CollectionName, ""); err != nil { + return err + } // use lambda function here to guarantee all resources to be released dropCollectionFn := func() error { @@ -346,36 +348,14 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error { //notify query service to release collection if err = t.core.CallReleaseCollectionService(t.core.ctx, ts, 0, collMeta.ID); err != nil { - log.Error("Failed to CallReleaseCollectionService", zap.String("error", err.Error())) + log.Error("Failed to CallReleaseCollectionService", zap.Error(err)) return err } - req := proxypb.InvalidateCollMetaCacheRequest{ - Base: &commonpb.MsgBase{ - MsgType: 0, //TODO, msg type - MsgID: 0, //TODO, msg id - Timestamp: ts, - SourceID: t.core.session.ServerID, - }, - DbName: t.Req.DbName, - CollectionName: t.Req.CollectionName, - } - // error doesn't matter here - t.core.proxyClientManager.InvalidateCollectionMetaCache(ctx, &req) + t.core.ExpireMetaCache(ctx, []string{t.Req.CollectionName}, ts) - for _, alias := range aliases { - req = proxypb.InvalidateCollMetaCacheRequest{ - Base: &commonpb.MsgBase{ - MsgType: 0, //TODO, msg type - MsgID: 0, //TODO, msg id - Timestamp: ts, - SourceID: t.core.session.ServerID, - }, - DbName: t.Req.DbName, - CollectionName: alias, - } - t.core.proxyClientManager.InvalidateCollectionMetaCache(ctx, &req) - } + aliases := t.core.MetaTable.ListAliases(collMeta.ID) + t.core.ExpireMetaCache(ctx, aliases, ts) // Update DDOperation in etcd return t.core.setDdMsgSendFlag(true) @@ -566,18 +546,7 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error { return err } - req := proxypb.InvalidateCollMetaCacheRequest{ - Base: &commonpb.MsgBase{ - MsgType: 0, //TODO, msg type - MsgID: 0, //TODO, msg id - Timestamp: ts, - SourceID: t.core.session.ServerID, - }, - DbName: t.Req.DbName, - CollectionName: t.Req.CollectionName, - } - // error doesn't matter here - t.core.proxyClientManager.InvalidateCollectionMetaCache(ctx, &req) + t.core.ExpireMetaCache(ctx, []string{t.Req.CollectionName}, ts) // Update DDOperation in etcd return t.core.setDdMsgSendFlag(true) @@ -662,22 +631,11 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error { return err } - req := proxypb.InvalidateCollMetaCacheRequest{ - Base: &commonpb.MsgBase{ - MsgType: 0, //TODO, msg type - MsgID: 0, //TODO, msg id - Timestamp: ts, - SourceID: t.core.session.ServerID, - }, - DbName: t.Req.DbName, - CollectionName: t.Req.CollectionName, - } - // error doesn't matter here - t.core.proxyClientManager.InvalidateCollectionMetaCache(ctx, &req) + t.core.ExpireMetaCache(ctx, []string{t.Req.CollectionName}, ts) //notify query service to release partition if err = t.core.CallReleasePartitionService(t.core.ctx, ts, 0, collInfo.ID, []typeutil.UniqueID{partID}); err != nil { - log.Error("Failed to CallReleaseCollectionService", zap.String("error", err.Error())) + log.Error("Failed to CallReleaseCollectionService", zap.Error(err)) return err } @@ -969,22 +927,10 @@ func (t *DropIndexReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_DropIndex { return fmt.Errorf("drop index, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) } - _, info, err := t.core.MetaTable.GetIndexByName(t.Req.CollectionName, t.Req.IndexName) - if err != nil { - log.Warn("GetIndexByName failed,", zap.String("collection name", t.Req.CollectionName), zap.String("field name", t.Req.FieldName), zap.String("index name", t.Req.IndexName), zap.Error(err)) + if err := t.core.RemoveIndex(ctx, t.Req.CollectionName, t.Req.IndexName); err != nil { return err } - if len(info) == 0 { - return nil - } - if len(info) != 1 { - return fmt.Errorf("len(index) = %d", len(info)) - } - err = t.core.CallDropIndexService(ctx, info[0].IndexID) - if err != nil { - return err - } - _, _, err = t.core.MetaTable.DropIndex(t.Req.CollectionName, t.Req.FieldName, t.Req.IndexName) + _, _, err := t.core.MetaTable.DropIndex(t.Req.CollectionName, t.Req.FieldName, t.Req.IndexName) return err } @@ -1043,17 +989,7 @@ func (t *DropAliasReqTask) Execute(ctx context.Context) error { return fmt.Errorf("meta table drop alias failed, error = %w", err) } - req := proxypb.InvalidateCollMetaCacheRequest{ - Base: &commonpb.MsgBase{ - MsgType: 0, //TODO, msg type - MsgID: 0, //TODO, msg id - Timestamp: ts, - SourceID: t.core.session.ServerID, - }, - CollectionName: t.Req.Alias, - } - // error doesn't matter here - t.core.proxyClientManager.InvalidateCollectionMetaCache(ctx, &req) + t.core.ExpireMetaCache(ctx, []string{t.Req.Alias}, ts) return nil } @@ -1084,17 +1020,7 @@ func (t *AlterAliasReqTask) Execute(ctx context.Context) error { return fmt.Errorf("meta table alter alias failed, error = %w", err) } - req := proxypb.InvalidateCollMetaCacheRequest{ - Base: &commonpb.MsgBase{ - MsgType: 0, //TODO, msg type - MsgID: 0, //TODO, msg id - Timestamp: ts, - SourceID: t.core.session.ServerID, - }, - CollectionName: t.Req.Alias, - } - // error doesn't matter here - t.core.proxyClientManager.InvalidateCollectionMetaCache(ctx, &req) + t.core.ExpireMetaCache(ctx, []string{t.Req.Alias}, ts) return nil }