diff --git a/internal/distributed/masterservice/masterservice_test.go b/internal/distributed/masterservice/masterservice_test.go index be0a108563..d19d383e49 100644 --- a/internal/distributed/masterservice/masterservice_test.go +++ b/internal/distributed/masterservice/masterservice_test.go @@ -23,17 +23,18 @@ import ( "time" grpcmasterserviceclient "github.com/milvus-io/milvus/internal/distributed/masterservice/client" - "github.com/milvus-io/milvus/internal/types" "github.com/golang/protobuf/proto" cms "github.com/milvus-io/milvus/internal/masterservice" "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/masterpb" "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/proto/schemapb" + "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/typeutil" "github.com/stretchr/testify/assert" ) @@ -132,7 +133,7 @@ func TestGrpcService(t *testing.T) { var binlogLock sync.Mutex binlogPathArray := make([]string, 0, 16) - core.BuildIndexReq = func(ctx context.Context, binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair, indexID typeutil.UniqueID, indexName string) (typeutil.UniqueID, error) { + core.BuildIndexReq = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error) { binlogLock.Lock() defer binlogLock.Unlock() binlogPathArray = append(binlogPathArray, binlog...) diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index d5aea20160..0660113412 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -30,12 +30,14 @@ import ( ms "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/masterpb" "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/proto/proxypb" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/milvus-io/milvus/internal/tso" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/retry" @@ -119,7 +121,7 @@ type Core struct { GetNumRowsReq func(segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error) //call index builder's client to build index, return build id - BuildIndexReq func(ctx context.Context, binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair, indexID typeutil.UniqueID, indexName string) (typeutil.UniqueID, error) + BuildIndexReq func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error) DropIndexReq func(ctx context.Context, indexID typeutil.UniqueID) error //proxy service interface, notify proxy service to drop collection @@ -128,9 +130,6 @@ type Core struct { //query service interface, notify query service to release collection ReleaseCollection func(ctx context.Context, ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error - // put create index task into this chan - indexTaskQueue chan *CreateIndexTask - //dd request scheduler ddReqQueue chan reqTask //dd request will be push into this chan lastDdTimeStamp typeutil.Timestamp @@ -234,9 +233,6 @@ func (c *Core) checkInit() error { if c.InvalidateCollectionMetaCache == nil { return fmt.Errorf("InvalidateCollectionMetaCache is nil") } - if c.indexTaskQueue == nil { - return fmt.Errorf("indexTaskQueue is nil") - } if c.DataNodeSegmentFlushCompletedChan == nil { return fmt.Errorf("DataNodeSegmentFlushCompletedChan is nil") } @@ -321,44 +317,23 @@ func (c *Core) startDataServiceSegmentLoop() { } } -//create index loop -func (c *Core) startCreateIndexLoop() { - for { - select { - case <-c.ctx.Done(): - log.Debug("close create index loop") - return - case t, ok := <-c.indexTaskQueue: - if !ok { - log.Debug("index task chan has closed, exit loop") - return - } - if err := t.BuildIndex(); err != nil { - log.Warn("create index failed", zap.String("error", err.Error())) - } else { - log.Debug("create index", zap.String("index name", t.indexName), zap.String("field name", t.fieldSchema.Name), zap.Int64("segment id", t.segmentID)) - } - } - } -} - func (c *Core) startSegmentFlushCompletedLoop() { for { select { case <-c.ctx.Done(): log.Debug("close segment flush completed loop") return - case seg, ok := <-c.DataNodeSegmentFlushCompletedChan: + case segID, ok := <-c.DataNodeSegmentFlushCompletedChan: if !ok { log.Debug("data node segment flush completed chan has closed, exit loop") } - log.Debug("flush segment", zap.Int64("id", seg)) - coll, err := c.MetaTable.GetCollectionBySegmentID(seg) + log.Debug("flush segment", zap.Int64("id", segID)) + coll, err := c.MetaTable.GetCollectionBySegmentID(segID) if err != nil { log.Warn("GetCollectionBySegmentID error", zap.Error(err)) break } - err = c.MetaTable.AddFlushedSegment(seg) + err = c.MetaTable.AddFlushedSegment(segID) if err != nil { log.Warn("AddFlushedSegment error", zap.Error(err)) } @@ -370,18 +345,17 @@ func (c *Core) startSegmentFlushCompletedLoop() { } fieldSch, err := GetFieldSchemaByID(coll, f.FiledID) - if err == nil { - t := &CreateIndexTask{ - ctx: c.ctx, - core: c, - segmentID: seg, - indexName: idxInfo.IndexName, - indexID: idxInfo.IndexID, - fieldSchema: fieldSch, - indexParams: idxInfo.IndexParams, - isFromFlushedChan: true, - } - c.indexTaskQueue <- t + if err != nil { + log.Warn("field schema not found", zap.Int64("field id", f.FiledID)) + continue + } + + if err = c.BuildIndex(segID, fieldSch, idxInfo, true); err != nil { + log.Error("build index fail", zap.String("error", err.Error())) + } else { + log.Debug("build index", zap.String("index name", idxInfo.IndexName), + zap.String("field name", fieldSch.Name), + zap.Int64("segment id", segID)) } } } @@ -746,13 +720,13 @@ func (c *Core) SetDataService(ctx context.Context, s types.DataService) error { } func (c *Core) SetIndexService(s types.IndexService) error { - c.BuildIndexReq = func(ctx context.Context, binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair, indexID typeutil.UniqueID, indexName string) (typeutil.UniqueID, error) { + c.BuildIndexReq = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error) { rsp, err := s.BuildIndex(ctx, &indexpb.BuildIndexRequest{ DataPaths: binlog, - TypeParams: typeParams, - IndexParams: indexParams, - IndexID: indexID, - IndexName: indexName, + TypeParams: field.TypeParams, + IndexParams: idxInfo.IndexParams, + IndexID: idxInfo.IndexID, + IndexName: idxInfo.IndexName, }) if err != nil { return 0, err @@ -803,6 +777,41 @@ func (c *Core) SetQueryService(s types.QueryService) error { return nil } +// BuildIndex will check row num and call build index service +func (c *Core) BuildIndex(segID typeutil.UniqueID, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo, isFlush bool) error { + if c.MetaTable.IsSegmentIndexed(segID, field, idxInfo.IndexParams) { + return nil + } + rows, err := c.GetNumRowsReq(segID, isFlush) + if err != nil { + return err + } + var bldID typeutil.UniqueID + enableIdx := false + if rows < Params.MinSegmentSizeToEnableIndex { + log.Debug("num of rows is less than MinSegmentSizeToEnableIndex", zap.Int64("num rows", rows)) + } else { + binlogs, err := c.GetBinlogFilePathsFromDataServiceReq(segID, field.FieldID) + if err != nil { + return err + } + bldID, err = c.BuildIndexReq(c.ctx, binlogs, field, idxInfo) + if err != nil { + return err + } + enableIdx = true + } + seg := etcdpb.SegmentIndexInfo{ + SegmentID: segID, + FieldID: field.FieldID, + IndexID: idxInfo.IndexID, + BuildID: bldID, + EnableIndex: enableIdx, + } + err = c.MetaTable.AddIndex(&seg) + return err +} + func (c *Core) Init() error { var initError error = nil c.initOnce.Do(func() { @@ -858,7 +867,6 @@ func (c *Core) Init() error { } c.ddReqQueue = make(chan reqTask, 1024) - c.indexTaskQueue = make(chan *CreateIndexTask, 1024) initError = c.setMsgStreams() }) if initError == nil { @@ -949,7 +957,6 @@ func (c *Core) Start() error { go c.startDdScheduler() go c.startTimeTickLoop() go c.startDataServiceSegmentLoop() - go c.startCreateIndexLoop() go c.startSegmentFlushCompletedLoop() go c.tsLoop() c.stateCode.Store(internalpb.StateCode_Healthy) diff --git a/internal/masterservice/master_service_test.go b/internal/masterservice/master_service_test.go index 4101bb9e3e..5d73172b22 100644 --- a/internal/masterservice/master_service_test.go +++ b/internal/masterservice/master_service_test.go @@ -25,6 +25,7 @@ import ( "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/masterpb" @@ -1696,7 +1697,7 @@ func TestCheckInit(t *testing.T) { err = c.checkInit() assert.NotNil(t, err) - c.BuildIndexReq = func(ctx context.Context, binlog []string, typeParams, indexParams []*commonpb.KeyValuePair, indexID typeutil.UniqueID, indexName string) (typeutil.UniqueID, error) { + c.BuildIndexReq = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error) { return 0, nil } err = c.checkInit() @@ -1714,10 +1715,6 @@ func TestCheckInit(t *testing.T) { err = c.checkInit() assert.NotNil(t, err) - c.indexTaskQueue = make(chan *CreateIndexTask) - err = c.checkInit() - assert.NotNil(t, err) - c.DataNodeSegmentFlushCompletedChan = make(chan int64) err = c.checkInit() assert.NotNil(t, err) diff --git a/internal/masterservice/meta_table.go b/internal/masterservice/meta_table.go index 402a3ac377..3fd3bf0ea0 100644 --- a/internal/masterservice/meta_table.go +++ b/internal/masterservice/meta_table.go @@ -607,47 +607,51 @@ func (mt *metaTable) AddSegment(seg *datapb.SegmentInfo) error { return nil } -func (mt *metaTable) AddIndex(seg *pb.SegmentIndexInfo) error { +func (mt *metaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo) error { mt.ddLock.Lock() defer mt.ddLock.Unlock() - collID, ok := mt.segID2CollID[seg.SegmentID] + collID, ok := mt.segID2CollID[segIdxInfo.SegmentID] if !ok { - return fmt.Errorf("segment id = %d not belong to any collection", seg.SegmentID) + return fmt.Errorf("segment id = %d not belong to any collection", segIdxInfo.SegmentID) } collMeta, ok := mt.collID2Meta[collID] if !ok { return fmt.Errorf("collection id = %d not found", collID) } - partID, ok := mt.segID2PartitionID[seg.SegmentID] + partID, ok := mt.segID2PartitionID[segIdxInfo.SegmentID] if !ok { - return fmt.Errorf("segment id = %d not belong to any partition", seg.SegmentID) + return fmt.Errorf("segment id = %d not belong to any partition", segIdxInfo.SegmentID) } exist := false - for _, i := range collMeta.FieldIndexes { - if i.IndexID == seg.IndexID { + for _, fidx := range collMeta.FieldIndexes { + if fidx.IndexID == segIdxInfo.IndexID { exist = true break } } if !exist { - return fmt.Errorf("index id = %d not found", seg.IndexID) + return fmt.Errorf("index id = %d not found", segIdxInfo.IndexID) } - segIdxMap, ok := mt.segID2IndexMeta[seg.SegmentID] + segIdxMap, ok := mt.segID2IndexMeta[segIdxInfo.SegmentID] if !ok { - idxMap := map[typeutil.UniqueID]pb.SegmentIndexInfo{seg.IndexID: *seg} - mt.segID2IndexMeta[seg.SegmentID] = &idxMap + idxMap := map[typeutil.UniqueID]pb.SegmentIndexInfo{segIdxInfo.IndexID: *segIdxInfo} + mt.segID2IndexMeta[segIdxInfo.SegmentID] = &idxMap } else { - _, ok := (*segIdxMap)[seg.IndexID] + tmpInfo, ok := (*segIdxMap)[segIdxInfo.IndexID] if ok { - return fmt.Errorf("index id = %d exist", seg.IndexID) + if SegmentIndexInfoEqual(segIdxInfo, &tmpInfo) { + log.Debug("Identical SegmentIndexInfo already exist", zap.Int64("IndexID", segIdxInfo.IndexID)) + return nil + } + return fmt.Errorf("index id = %d exist", segIdxInfo.IndexID) } } - (*(mt.segID2IndexMeta[seg.SegmentID]))[seg.IndexID] = *seg - k := fmt.Sprintf("%s/%d/%d/%d/%d", SegmentIndexMetaPrefix, collID, seg.IndexID, partID, seg.SegmentID) - v := proto.MarshalTextString(seg) + (*(mt.segID2IndexMeta[segIdxInfo.SegmentID]))[segIdxInfo.IndexID] = *segIdxInfo + k := fmt.Sprintf("%s/%d/%d/%d/%d", SegmentIndexMetaPrefix, collID, segIdxInfo.IndexID, partID, segIdxInfo.SegmentID) + v := proto.MarshalTextString(segIdxInfo) err := mt.client.Save(k, v) if err != nil { @@ -655,8 +659,8 @@ func (mt *metaTable) AddIndex(seg *pb.SegmentIndexInfo) error { return err } - if _, ok := mt.flushedSegID[seg.SegmentID]; !ok { - mt.flushedSegID[seg.SegmentID] = true + if _, ok := mt.flushedSegID[segIdxInfo.SegmentID]; !ok { + mt.flushedSegID[segIdxInfo.SegmentID] = true } return nil diff --git a/internal/masterservice/meta_table_test.go b/internal/masterservice/meta_table_test.go index 542cce6b14..bf442ecebe 100644 --- a/internal/masterservice/meta_table_test.go +++ b/internal/masterservice/meta_table_test.go @@ -302,15 +302,23 @@ func TestMetaTable(t *testing.T) { }) t.Run("add segment index", func(t *testing.T) { - seg := pb.SegmentIndexInfo{ + segIdxInfo := pb.SegmentIndexInfo{ SegmentID: segID, FieldID: fieldID, IndexID: 10000, BuildID: 201, } - err := mt.AddIndex(&seg) + err := mt.AddIndex(&segIdxInfo) assert.Nil(t, err) - assert.NotNil(t, mt.AddIndex(&seg)) + + // it's legal to add index twice + err = mt.AddIndex(&segIdxInfo) + assert.Nil(t, err) + + segIdxInfo.BuildID = 202 + err = mt.AddIndex(&segIdxInfo) + assert.NotNil(t, err) + assert.EqualError(t, err, fmt.Sprintf("index id = %d exist", segIdxInfo.IndexID)) }) t.Run("get not indexed segments", func(t *testing.T) { @@ -697,30 +705,30 @@ func TestMetaTable(t *testing.T) { } assert.Nil(t, mt.AddSegment(seg)) - idx := &pb.SegmentIndexInfo{ + segIdxInfo := &pb.SegmentIndexInfo{ SegmentID: 100, FieldID: 110, IndexID: 10001, BuildID: 201, } - err = mt.AddIndex(idx) + err = mt.AddIndex(segIdxInfo) assert.NotNil(t, err) - assert.EqualError(t, err, fmt.Sprintf("index id = %d not found", idx.IndexID)) + assert.EqualError(t, err, fmt.Sprintf("index id = %d not found", segIdxInfo.IndexID)) mt.segID2PartitionID = make(map[int64]int64) - err = mt.AddIndex(idx) + err = mt.AddIndex(segIdxInfo) assert.NotNil(t, err) - assert.EqualError(t, err, fmt.Sprintf("segment id = %d not belong to any partition", idx.SegmentID)) + assert.EqualError(t, err, fmt.Sprintf("segment id = %d not belong to any partition", segIdxInfo.SegmentID)) mt.collID2Meta = make(map[int64]pb.CollectionInfo) - err = mt.AddIndex(idx) + err = mt.AddIndex(segIdxInfo) assert.NotNil(t, err) assert.EqualError(t, err, fmt.Sprintf("collection id = %d not found", collInfo.ID)) mt.segID2CollID = make(map[int64]int64) - err = mt.AddIndex(idx) + err = mt.AddIndex(segIdxInfo) assert.NotNil(t, err) - assert.EqualError(t, err, fmt.Sprintf("segment id = %d not belong to any collection", idx.SegmentID)) + assert.EqualError(t, err, fmt.Sprintf("segment id = %d not belong to any collection", segIdxInfo.SegmentID)) err = mt.reloadFromKV() assert.Nil(t, err) @@ -730,11 +738,11 @@ func TestMetaTable(t *testing.T) { assert.Nil(t, err) assert.Nil(t, mt.AddSegment(seg)) - idx.IndexID = 10000 + segIdxInfo.IndexID = 10000 mockKV.save = func(key, value string) error { return fmt.Errorf("save error") } - err = mt.AddIndex(idx) + err = mt.AddIndex(segIdxInfo) assert.NotNil(t, err) assert.EqualError(t, err, "save error") }) diff --git a/internal/masterservice/task.go b/internal/masterservice/task.go index d6021f52c9..4b8f9dac2c 100644 --- a/internal/masterservice/task.go +++ b/internal/masterservice/task.go @@ -779,19 +779,13 @@ func (t *CreateIndexReqTask) Execute(ctx context.Context) error { if field.DataType != schemapb.DataType_FloatVector && field.DataType != schemapb.DataType_BinaryVector { return fmt.Errorf("field name = %s, data type = %s", t.Req.FieldName, schemapb.DataType_name[int32(field.DataType)]) } - for _, seg := range segIDs { - task := CreateIndexTask{ - ctx: t.core.ctx, - core: t.core, - segmentID: seg, - indexName: idxInfo.IndexName, - indexID: idxInfo.IndexID, - fieldSchema: &field, - indexParams: t.Req.ExtraParams, - isFromFlushedChan: false, + for _, segID := range segIDs { + if err := t.core.BuildIndex(segID, &field, idxInfo, false); err != nil { + return err } - t.core.indexTaskQueue <- &task - fmt.Println("create index task enqueue, segID = ", seg) + log.Debug("build index", zap.String("index name", idxInfo.IndexName), + zap.String("field name", field.Name), + zap.Int64("segment id", segID)) } return nil } @@ -886,48 +880,3 @@ func (t *DropIndexReqTask) Execute(ctx context.Context) error { _, _, err = t.core.MetaTable.DropIndex(t.Req.CollectionName, t.Req.FieldName, t.Req.IndexName) return err } - -type CreateIndexTask struct { - ctx context.Context - core *Core - segmentID typeutil.UniqueID - indexName string - indexID typeutil.UniqueID - fieldSchema *schemapb.FieldSchema - indexParams []*commonpb.KeyValuePair - isFromFlushedChan bool -} - -func (t *CreateIndexTask) BuildIndex() error { - if t.core.MetaTable.IsSegmentIndexed(t.segmentID, t.fieldSchema, t.indexParams) { - return nil - } - rows, err := t.core.GetNumRowsReq(t.segmentID, t.isFromFlushedChan) - if err != nil { - return err - } - var bldID typeutil.UniqueID = 0 - enableIdx := false - if rows < Params.MinSegmentSizeToEnableIndex { - log.Debug("num of is less than MinSegmentSizeToEnableIndex", zap.Int64("num rows", rows)) - } else { - binlogs, err := t.core.GetBinlogFilePathsFromDataServiceReq(t.segmentID, t.fieldSchema.FieldID) - if err != nil { - return err - } - bldID, err = t.core.BuildIndexReq(t.ctx, binlogs, t.fieldSchema.TypeParams, t.indexParams, t.indexID, t.indexName) - if err != nil { - return err - } - enableIdx = true - } - seg := etcdpb.SegmentIndexInfo{ - SegmentID: t.segmentID, - FieldID: t.fieldSchema.FieldID, - IndexID: t.indexID, - BuildID: bldID, - EnableIndex: enableIdx, - } - err = t.core.MetaTable.AddIndex(&seg) - return err -} diff --git a/internal/masterservice/util.go b/internal/masterservice/util.go index 4fbd5be69d..f32dfa35f7 100644 --- a/internal/masterservice/util.go +++ b/internal/masterservice/util.go @@ -84,3 +84,12 @@ func EncodeDdOperation(m proto.Message, m1 proto.Message, ddType string) (string } return string(ddOpByte), nil } + +// SegmentIndexInfoEqual return true if 2 SegmentIndexInfo are identical +func SegmentIndexInfoEqual(info1 *etcdpb.SegmentIndexInfo, info2 *etcdpb.SegmentIndexInfo) bool { + return info1.SegmentID == info2.SegmentID && + info1.FieldID == info2.FieldID && + info1.IndexID == info2.IndexID && + info1.BuildID == info2.BuildID && + info1.EnableIndex == info2.EnableIndex +}