Optimize CreateIndex routine in master service (#5238)

Remove CreateIndexTask struct and indexTaskQueue

Resolves: #5228 

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
This commit is contained in:
Cai Yudong 2021-05-15 18:08:08 +08:00 committed by GitHub
parent cd11c50932
commit 001795ee70
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 121 additions and 146 deletions

View File

@ -23,17 +23,18 @@ import (
"time"
grpcmasterserviceclient "github.com/milvus-io/milvus/internal/distributed/masterservice/client"
"github.com/milvus-io/milvus/internal/types"
"github.com/golang/protobuf/proto"
cms "github.com/milvus-io/milvus/internal/masterservice"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/masterpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/stretchr/testify/assert"
)
@ -132,7 +133,7 @@ func TestGrpcService(t *testing.T) {
var binlogLock sync.Mutex
binlogPathArray := make([]string, 0, 16)
core.BuildIndexReq = func(ctx context.Context, binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair, indexID typeutil.UniqueID, indexName string) (typeutil.UniqueID, error) {
core.BuildIndexReq = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error) {
binlogLock.Lock()
defer binlogLock.Unlock()
binlogPathArray = append(binlogPathArray, binlog...)

View File

@ -30,12 +30,14 @@ import (
ms "github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/masterpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/tso"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/retry"
@ -119,7 +121,7 @@ type Core struct {
GetNumRowsReq func(segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error)
//call index builder's client to build index, return build id
BuildIndexReq func(ctx context.Context, binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair, indexID typeutil.UniqueID, indexName string) (typeutil.UniqueID, error)
BuildIndexReq func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error)
DropIndexReq func(ctx context.Context, indexID typeutil.UniqueID) error
//proxy service interface, notify proxy service to drop collection
@ -128,9 +130,6 @@ type Core struct {
//query service interface, notify query service to release collection
ReleaseCollection func(ctx context.Context, ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error
// put create index task into this chan
indexTaskQueue chan *CreateIndexTask
//dd request scheduler
ddReqQueue chan reqTask //dd request will be push into this chan
lastDdTimeStamp typeutil.Timestamp
@ -234,9 +233,6 @@ func (c *Core) checkInit() error {
if c.InvalidateCollectionMetaCache == nil {
return fmt.Errorf("InvalidateCollectionMetaCache is nil")
}
if c.indexTaskQueue == nil {
return fmt.Errorf("indexTaskQueue is nil")
}
if c.DataNodeSegmentFlushCompletedChan == nil {
return fmt.Errorf("DataNodeSegmentFlushCompletedChan is nil")
}
@ -321,44 +317,23 @@ func (c *Core) startDataServiceSegmentLoop() {
}
}
//create index loop
func (c *Core) startCreateIndexLoop() {
for {
select {
case <-c.ctx.Done():
log.Debug("close create index loop")
return
case t, ok := <-c.indexTaskQueue:
if !ok {
log.Debug("index task chan has closed, exit loop")
return
}
if err := t.BuildIndex(); err != nil {
log.Warn("create index failed", zap.String("error", err.Error()))
} else {
log.Debug("create index", zap.String("index name", t.indexName), zap.String("field name", t.fieldSchema.Name), zap.Int64("segment id", t.segmentID))
}
}
}
}
func (c *Core) startSegmentFlushCompletedLoop() {
for {
select {
case <-c.ctx.Done():
log.Debug("close segment flush completed loop")
return
case seg, ok := <-c.DataNodeSegmentFlushCompletedChan:
case segID, ok := <-c.DataNodeSegmentFlushCompletedChan:
if !ok {
log.Debug("data node segment flush completed chan has closed, exit loop")
}
log.Debug("flush segment", zap.Int64("id", seg))
coll, err := c.MetaTable.GetCollectionBySegmentID(seg)
log.Debug("flush segment", zap.Int64("id", segID))
coll, err := c.MetaTable.GetCollectionBySegmentID(segID)
if err != nil {
log.Warn("GetCollectionBySegmentID error", zap.Error(err))
break
}
err = c.MetaTable.AddFlushedSegment(seg)
err = c.MetaTable.AddFlushedSegment(segID)
if err != nil {
log.Warn("AddFlushedSegment error", zap.Error(err))
}
@ -370,18 +345,17 @@ func (c *Core) startSegmentFlushCompletedLoop() {
}
fieldSch, err := GetFieldSchemaByID(coll, f.FiledID)
if err == nil {
t := &CreateIndexTask{
ctx: c.ctx,
core: c,
segmentID: seg,
indexName: idxInfo.IndexName,
indexID: idxInfo.IndexID,
fieldSchema: fieldSch,
indexParams: idxInfo.IndexParams,
isFromFlushedChan: true,
}
c.indexTaskQueue <- t
if err != nil {
log.Warn("field schema not found", zap.Int64("field id", f.FiledID))
continue
}
if err = c.BuildIndex(segID, fieldSch, idxInfo, true); err != nil {
log.Error("build index fail", zap.String("error", err.Error()))
} else {
log.Debug("build index", zap.String("index name", idxInfo.IndexName),
zap.String("field name", fieldSch.Name),
zap.Int64("segment id", segID))
}
}
}
@ -746,13 +720,13 @@ func (c *Core) SetDataService(ctx context.Context, s types.DataService) error {
}
func (c *Core) SetIndexService(s types.IndexService) error {
c.BuildIndexReq = func(ctx context.Context, binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair, indexID typeutil.UniqueID, indexName string) (typeutil.UniqueID, error) {
c.BuildIndexReq = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error) {
rsp, err := s.BuildIndex(ctx, &indexpb.BuildIndexRequest{
DataPaths: binlog,
TypeParams: typeParams,
IndexParams: indexParams,
IndexID: indexID,
IndexName: indexName,
TypeParams: field.TypeParams,
IndexParams: idxInfo.IndexParams,
IndexID: idxInfo.IndexID,
IndexName: idxInfo.IndexName,
})
if err != nil {
return 0, err
@ -803,6 +777,41 @@ func (c *Core) SetQueryService(s types.QueryService) error {
return nil
}
// BuildIndex will check row num and call build index service
func (c *Core) BuildIndex(segID typeutil.UniqueID, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo, isFlush bool) error {
if c.MetaTable.IsSegmentIndexed(segID, field, idxInfo.IndexParams) {
return nil
}
rows, err := c.GetNumRowsReq(segID, isFlush)
if err != nil {
return err
}
var bldID typeutil.UniqueID
enableIdx := false
if rows < Params.MinSegmentSizeToEnableIndex {
log.Debug("num of rows is less than MinSegmentSizeToEnableIndex", zap.Int64("num rows", rows))
} else {
binlogs, err := c.GetBinlogFilePathsFromDataServiceReq(segID, field.FieldID)
if err != nil {
return err
}
bldID, err = c.BuildIndexReq(c.ctx, binlogs, field, idxInfo)
if err != nil {
return err
}
enableIdx = true
}
seg := etcdpb.SegmentIndexInfo{
SegmentID: segID,
FieldID: field.FieldID,
IndexID: idxInfo.IndexID,
BuildID: bldID,
EnableIndex: enableIdx,
}
err = c.MetaTable.AddIndex(&seg)
return err
}
func (c *Core) Init() error {
var initError error = nil
c.initOnce.Do(func() {
@ -858,7 +867,6 @@ func (c *Core) Init() error {
}
c.ddReqQueue = make(chan reqTask, 1024)
c.indexTaskQueue = make(chan *CreateIndexTask, 1024)
initError = c.setMsgStreams()
})
if initError == nil {
@ -949,7 +957,6 @@ func (c *Core) Start() error {
go c.startDdScheduler()
go c.startTimeTickLoop()
go c.startDataServiceSegmentLoop()
go c.startCreateIndexLoop()
go c.startSegmentFlushCompletedLoop()
go c.tsLoop()
c.stateCode.Store(internalpb.StateCode_Healthy)

View File

@ -25,6 +25,7 @@ import (
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/masterpb"
@ -1696,7 +1697,7 @@ func TestCheckInit(t *testing.T) {
err = c.checkInit()
assert.NotNil(t, err)
c.BuildIndexReq = func(ctx context.Context, binlog []string, typeParams, indexParams []*commonpb.KeyValuePair, indexID typeutil.UniqueID, indexName string) (typeutil.UniqueID, error) {
c.BuildIndexReq = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error) {
return 0, nil
}
err = c.checkInit()
@ -1714,10 +1715,6 @@ func TestCheckInit(t *testing.T) {
err = c.checkInit()
assert.NotNil(t, err)
c.indexTaskQueue = make(chan *CreateIndexTask)
err = c.checkInit()
assert.NotNil(t, err)
c.DataNodeSegmentFlushCompletedChan = make(chan int64)
err = c.checkInit()
assert.NotNil(t, err)

View File

@ -607,47 +607,51 @@ func (mt *metaTable) AddSegment(seg *datapb.SegmentInfo) error {
return nil
}
func (mt *metaTable) AddIndex(seg *pb.SegmentIndexInfo) error {
func (mt *metaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
collID, ok := mt.segID2CollID[seg.SegmentID]
collID, ok := mt.segID2CollID[segIdxInfo.SegmentID]
if !ok {
return fmt.Errorf("segment id = %d not belong to any collection", seg.SegmentID)
return fmt.Errorf("segment id = %d not belong to any collection", segIdxInfo.SegmentID)
}
collMeta, ok := mt.collID2Meta[collID]
if !ok {
return fmt.Errorf("collection id = %d not found", collID)
}
partID, ok := mt.segID2PartitionID[seg.SegmentID]
partID, ok := mt.segID2PartitionID[segIdxInfo.SegmentID]
if !ok {
return fmt.Errorf("segment id = %d not belong to any partition", seg.SegmentID)
return fmt.Errorf("segment id = %d not belong to any partition", segIdxInfo.SegmentID)
}
exist := false
for _, i := range collMeta.FieldIndexes {
if i.IndexID == seg.IndexID {
for _, fidx := range collMeta.FieldIndexes {
if fidx.IndexID == segIdxInfo.IndexID {
exist = true
break
}
}
if !exist {
return fmt.Errorf("index id = %d not found", seg.IndexID)
return fmt.Errorf("index id = %d not found", segIdxInfo.IndexID)
}
segIdxMap, ok := mt.segID2IndexMeta[seg.SegmentID]
segIdxMap, ok := mt.segID2IndexMeta[segIdxInfo.SegmentID]
if !ok {
idxMap := map[typeutil.UniqueID]pb.SegmentIndexInfo{seg.IndexID: *seg}
mt.segID2IndexMeta[seg.SegmentID] = &idxMap
idxMap := map[typeutil.UniqueID]pb.SegmentIndexInfo{segIdxInfo.IndexID: *segIdxInfo}
mt.segID2IndexMeta[segIdxInfo.SegmentID] = &idxMap
} else {
_, ok := (*segIdxMap)[seg.IndexID]
tmpInfo, ok := (*segIdxMap)[segIdxInfo.IndexID]
if ok {
return fmt.Errorf("index id = %d exist", seg.IndexID)
if SegmentIndexInfoEqual(segIdxInfo, &tmpInfo) {
log.Debug("Identical SegmentIndexInfo already exist", zap.Int64("IndexID", segIdxInfo.IndexID))
return nil
}
return fmt.Errorf("index id = %d exist", segIdxInfo.IndexID)
}
}
(*(mt.segID2IndexMeta[seg.SegmentID]))[seg.IndexID] = *seg
k := fmt.Sprintf("%s/%d/%d/%d/%d", SegmentIndexMetaPrefix, collID, seg.IndexID, partID, seg.SegmentID)
v := proto.MarshalTextString(seg)
(*(mt.segID2IndexMeta[segIdxInfo.SegmentID]))[segIdxInfo.IndexID] = *segIdxInfo
k := fmt.Sprintf("%s/%d/%d/%d/%d", SegmentIndexMetaPrefix, collID, segIdxInfo.IndexID, partID, segIdxInfo.SegmentID)
v := proto.MarshalTextString(segIdxInfo)
err := mt.client.Save(k, v)
if err != nil {
@ -655,8 +659,8 @@ func (mt *metaTable) AddIndex(seg *pb.SegmentIndexInfo) error {
return err
}
if _, ok := mt.flushedSegID[seg.SegmentID]; !ok {
mt.flushedSegID[seg.SegmentID] = true
if _, ok := mt.flushedSegID[segIdxInfo.SegmentID]; !ok {
mt.flushedSegID[segIdxInfo.SegmentID] = true
}
return nil

View File

@ -302,15 +302,23 @@ func TestMetaTable(t *testing.T) {
})
t.Run("add segment index", func(t *testing.T) {
seg := pb.SegmentIndexInfo{
segIdxInfo := pb.SegmentIndexInfo{
SegmentID: segID,
FieldID: fieldID,
IndexID: 10000,
BuildID: 201,
}
err := mt.AddIndex(&seg)
err := mt.AddIndex(&segIdxInfo)
assert.Nil(t, err)
assert.NotNil(t, mt.AddIndex(&seg))
// it's legal to add index twice
err = mt.AddIndex(&segIdxInfo)
assert.Nil(t, err)
segIdxInfo.BuildID = 202
err = mt.AddIndex(&segIdxInfo)
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("index id = %d exist", segIdxInfo.IndexID))
})
t.Run("get not indexed segments", func(t *testing.T) {
@ -697,30 +705,30 @@ func TestMetaTable(t *testing.T) {
}
assert.Nil(t, mt.AddSegment(seg))
idx := &pb.SegmentIndexInfo{
segIdxInfo := &pb.SegmentIndexInfo{
SegmentID: 100,
FieldID: 110,
IndexID: 10001,
BuildID: 201,
}
err = mt.AddIndex(idx)
err = mt.AddIndex(segIdxInfo)
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("index id = %d not found", idx.IndexID))
assert.EqualError(t, err, fmt.Sprintf("index id = %d not found", segIdxInfo.IndexID))
mt.segID2PartitionID = make(map[int64]int64)
err = mt.AddIndex(idx)
err = mt.AddIndex(segIdxInfo)
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("segment id = %d not belong to any partition", idx.SegmentID))
assert.EqualError(t, err, fmt.Sprintf("segment id = %d not belong to any partition", segIdxInfo.SegmentID))
mt.collID2Meta = make(map[int64]pb.CollectionInfo)
err = mt.AddIndex(idx)
err = mt.AddIndex(segIdxInfo)
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("collection id = %d not found", collInfo.ID))
mt.segID2CollID = make(map[int64]int64)
err = mt.AddIndex(idx)
err = mt.AddIndex(segIdxInfo)
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("segment id = %d not belong to any collection", idx.SegmentID))
assert.EqualError(t, err, fmt.Sprintf("segment id = %d not belong to any collection", segIdxInfo.SegmentID))
err = mt.reloadFromKV()
assert.Nil(t, err)
@ -730,11 +738,11 @@ func TestMetaTable(t *testing.T) {
assert.Nil(t, err)
assert.Nil(t, mt.AddSegment(seg))
idx.IndexID = 10000
segIdxInfo.IndexID = 10000
mockKV.save = func(key, value string) error {
return fmt.Errorf("save error")
}
err = mt.AddIndex(idx)
err = mt.AddIndex(segIdxInfo)
assert.NotNil(t, err)
assert.EqualError(t, err, "save error")
})

View File

@ -779,19 +779,13 @@ func (t *CreateIndexReqTask) Execute(ctx context.Context) error {
if field.DataType != schemapb.DataType_FloatVector && field.DataType != schemapb.DataType_BinaryVector {
return fmt.Errorf("field name = %s, data type = %s", t.Req.FieldName, schemapb.DataType_name[int32(field.DataType)])
}
for _, seg := range segIDs {
task := CreateIndexTask{
ctx: t.core.ctx,
core: t.core,
segmentID: seg,
indexName: idxInfo.IndexName,
indexID: idxInfo.IndexID,
fieldSchema: &field,
indexParams: t.Req.ExtraParams,
isFromFlushedChan: false,
for _, segID := range segIDs {
if err := t.core.BuildIndex(segID, &field, idxInfo, false); err != nil {
return err
}
t.core.indexTaskQueue <- &task
fmt.Println("create index task enqueue, segID = ", seg)
log.Debug("build index", zap.String("index name", idxInfo.IndexName),
zap.String("field name", field.Name),
zap.Int64("segment id", segID))
}
return nil
}
@ -886,48 +880,3 @@ func (t *DropIndexReqTask) Execute(ctx context.Context) error {
_, _, err = t.core.MetaTable.DropIndex(t.Req.CollectionName, t.Req.FieldName, t.Req.IndexName)
return err
}
type CreateIndexTask struct {
ctx context.Context
core *Core
segmentID typeutil.UniqueID
indexName string
indexID typeutil.UniqueID
fieldSchema *schemapb.FieldSchema
indexParams []*commonpb.KeyValuePair
isFromFlushedChan bool
}
func (t *CreateIndexTask) BuildIndex() error {
if t.core.MetaTable.IsSegmentIndexed(t.segmentID, t.fieldSchema, t.indexParams) {
return nil
}
rows, err := t.core.GetNumRowsReq(t.segmentID, t.isFromFlushedChan)
if err != nil {
return err
}
var bldID typeutil.UniqueID = 0
enableIdx := false
if rows < Params.MinSegmentSizeToEnableIndex {
log.Debug("num of is less than MinSegmentSizeToEnableIndex", zap.Int64("num rows", rows))
} else {
binlogs, err := t.core.GetBinlogFilePathsFromDataServiceReq(t.segmentID, t.fieldSchema.FieldID)
if err != nil {
return err
}
bldID, err = t.core.BuildIndexReq(t.ctx, binlogs, t.fieldSchema.TypeParams, t.indexParams, t.indexID, t.indexName)
if err != nil {
return err
}
enableIdx = true
}
seg := etcdpb.SegmentIndexInfo{
SegmentID: t.segmentID,
FieldID: t.fieldSchema.FieldID,
IndexID: t.indexID,
BuildID: bldID,
EnableIndex: enableIdx,
}
err = t.core.MetaTable.AddIndex(&seg)
return err
}

View File

@ -84,3 +84,12 @@ func EncodeDdOperation(m proto.Message, m1 proto.Message, ddType string) (string
}
return string(ddOpByte), nil
}
// SegmentIndexInfoEqual return true if 2 SegmentIndexInfo are identical
func SegmentIndexInfoEqual(info1 *etcdpb.SegmentIndexInfo, info2 *etcdpb.SegmentIndexInfo) bool {
return info1.SegmentID == info2.SegmentID &&
info1.FieldID == info2.FieldID &&
info1.IndexID == info2.IndexID &&
info1.BuildID == info2.BuildID &&
info1.EnableIndex == info2.EnableIndex
}