mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-08 10:08:42 +08:00
Deprecated old RootCoord methods in Bulk Load (#19449)
/kind improvement Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com> Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>
This commit is contained in:
parent
3fb8e80147
commit
67b95597ed
@ -7,6 +7,7 @@ import (
|
|||||||
"github.com/milvus-io/milvus/internal/log"
|
"github.com/milvus-io/milvus/internal/log"
|
||||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||||
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
||||||
|
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -66,19 +67,18 @@ func NewImportFactory(c *Core) ImportFactory {
|
|||||||
|
|
||||||
func GetCollectionNameWithCore(c *Core) GetCollectionNameFunc {
|
func GetCollectionNameWithCore(c *Core) GetCollectionNameFunc {
|
||||||
return func(collID, partitionID UniqueID) (string, string, error) {
|
return func(collID, partitionID UniqueID) (string, string, error) {
|
||||||
colName, err := c.meta.GetCollectionNameByID(collID)
|
colInfo, err := c.meta.GetCollectionByID(c.ctx, collID, typeutil.MaxTimestamp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Core failed to get collection name by id", zap.Int64("ID", collID), zap.Error(err))
|
log.Error("Core failed to get collection name by id", zap.Int64("ID", collID), zap.Error(err))
|
||||||
return "", "", err
|
return "", "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
partName, err := c.meta.GetPartitionNameByID(collID, partitionID, 0)
|
partName, err := c.meta.GetPartitionNameByID(collID, partitionID, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Core failed to get partition name by id", zap.Int64("ID", partitionID), zap.Error(err))
|
log.Error("Core failed to get partition name by id", zap.Int64("ID", partitionID), zap.Error(err))
|
||||||
return colName, "", err
|
return colInfo.Name, "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
return colName, partName, nil
|
return colInfo.Name, partName, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -91,10 +91,9 @@ type IMetaTable interface {
|
|||||||
ListAliasesByID(collID UniqueID) []string
|
ListAliasesByID(collID UniqueID) []string
|
||||||
|
|
||||||
// TODO: better to accept ctx.
|
// TODO: better to accept ctx.
|
||||||
// TODO: should GetCollectionNameByID & GetCollectionIDByName also accept ts?
|
GetCollectionNameByID(collID UniqueID) (string, error) // [Deprecated].
|
||||||
GetCollectionNameByID(collID UniqueID) (string, error) // serve for bulk load.
|
|
||||||
GetPartitionNameByID(collID UniqueID, partitionID UniqueID, ts Timestamp) (string, error) // serve for bulk load.
|
GetPartitionNameByID(collID UniqueID, partitionID UniqueID, ts Timestamp) (string, error) // serve for bulk load.
|
||||||
GetCollectionIDByName(name string) (UniqueID, error) // serve for bulk load.
|
GetCollectionIDByName(name string) (UniqueID, error) // [Deprecated].
|
||||||
GetPartitionByName(collID UniqueID, partitionName string, ts Timestamp) (UniqueID, error) // serve for bulk load.
|
GetPartitionByName(collID UniqueID, partitionName string, ts Timestamp) (UniqueID, error) // serve for bulk load.
|
||||||
|
|
||||||
// TODO: better to accept ctx.
|
// TODO: better to accept ctx.
|
||||||
@ -580,6 +579,7 @@ func (mt *MetaTable) ListAliasesByID(collID UniqueID) []string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetCollectionNameByID serve for bulk load. TODO: why this didn't accept ts?
|
// GetCollectionNameByID serve for bulk load. TODO: why this didn't accept ts?
|
||||||
|
// [Deprecated]
|
||||||
func (mt *MetaTable) GetCollectionNameByID(collID UniqueID) (string, error) {
|
func (mt *MetaTable) GetCollectionNameByID(collID UniqueID) (string, error) {
|
||||||
mt.ddLock.RLock()
|
mt.ddLock.RLock()
|
||||||
defer mt.ddLock.RUnlock()
|
defer mt.ddLock.RUnlock()
|
||||||
@ -625,6 +625,7 @@ func (mt *MetaTable) GetPartitionNameByID(collID UniqueID, partitionID UniqueID,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetCollectionIDByName serve for bulk load. TODO: why this didn't accept ts?
|
// GetCollectionIDByName serve for bulk load. TODO: why this didn't accept ts?
|
||||||
|
// [Deprecated]
|
||||||
func (mt *MetaTable) GetCollectionIDByName(name string) (UniqueID, error) {
|
func (mt *MetaTable) GetCollectionIDByName(name string) (UniqueID, error) {
|
||||||
mt.ddLock.RLock()
|
mt.ddLock.RLock()
|
||||||
defer mt.ddLock.RUnlock()
|
defer mt.ddLock.RUnlock()
|
||||||
|
|||||||
@ -1471,14 +1471,15 @@ func (c *Core) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvus
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Get collection/partition ID from collection/partition name.
|
// Get collection/partition ID from collection/partition name.
|
||||||
var cID UniqueID
|
var colInfo *model.Collection
|
||||||
var err error
|
var err error
|
||||||
if cID, err = c.meta.GetCollectionIDByName(req.GetCollectionName()); err != nil {
|
if colInfo, err = c.meta.GetCollectionByName(ctx, req.GetCollectionName(), typeutil.MaxTimestamp); err != nil {
|
||||||
log.Error("failed to find collection ID from its name",
|
log.Error("failed to find collection ID from its name",
|
||||||
zap.String("collection name", req.GetCollectionName()),
|
zap.String("collection name", req.GetCollectionName()),
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
cID := colInfo.CollectionID
|
||||||
req.ChannelNames = c.meta.GetCollectionVirtualChannels(cID)
|
req.ChannelNames = c.meta.GetCollectionVirtualChannels(cID)
|
||||||
if req.GetPartitionName() == "" {
|
if req.GetPartitionName() == "" {
|
||||||
req.PartitionName = Params.CommonCfg.DefaultPartitionName
|
req.PartitionName = Params.CommonCfg.DefaultPartitionName
|
||||||
|
|||||||
@ -896,6 +896,9 @@ func TestCore_Import(t *testing.T) {
|
|||||||
meta.GetCollectionIDByNameFunc = func(name string) (UniqueID, error) {
|
meta.GetCollectionIDByNameFunc = func(name string) (UniqueID, error) {
|
||||||
return 0, errors.New("error mock GetCollectionIDByName")
|
return 0, errors.New("error mock GetCollectionIDByName")
|
||||||
}
|
}
|
||||||
|
meta.GetCollectionByNameFunc = func(ctx context.Context, collectionName string, ts Timestamp) (*model.Collection, error) {
|
||||||
|
return nil, errors.New("collection name not found")
|
||||||
|
}
|
||||||
_, err := c.Import(ctx, &milvuspb.ImportRequest{
|
_, err := c.Import(ctx, &milvuspb.ImportRequest{
|
||||||
CollectionName: "a-bad-name",
|
CollectionName: "a-bad-name",
|
||||||
})
|
})
|
||||||
@ -934,6 +937,10 @@ func TestCore_Import(t *testing.T) {
|
|||||||
meta.GetPartitionByNameFunc = func(collID UniqueID, partitionName string, ts Timestamp) (UniqueID, error) {
|
meta.GetPartitionByNameFunc = func(collID UniqueID, partitionName string, ts Timestamp) (UniqueID, error) {
|
||||||
return 101, nil
|
return 101, nil
|
||||||
}
|
}
|
||||||
|
coll := &model.Collection{Name: "a-good-name"}
|
||||||
|
meta.GetCollectionByNameFunc = func(ctx context.Context, collectionName string, ts Timestamp) (*model.Collection, error) {
|
||||||
|
return coll.Clone(), nil
|
||||||
|
}
|
||||||
_, err := c.Import(ctx, &milvuspb.ImportRequest{
|
_, err := c.Import(ctx, &milvuspb.ImportRequest{
|
||||||
CollectionName: "a-good-name",
|
CollectionName: "a-good-name",
|
||||||
})
|
})
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user