diff --git a/internal/core/src/common/type_c.h b/internal/core/src/common/type_c.h index 4d83e0293c..ff226a6399 100644 --- a/internal/core/src/common/type_c.h +++ b/internal/core/src/common/type_c.h @@ -19,6 +19,7 @@ enum SegmentType { Invalid = 0, Growing = 1, Sealed = 2, + Indexing = 3, }; typedef enum SegmentType SegmentType; diff --git a/internal/core/src/segcore/segment_c.cpp b/internal/core/src/segcore/segment_c.cpp index 84d10709cc..e972237447 100644 --- a/internal/core/src/segcore/segment_c.cpp +++ b/internal/core/src/segcore/segment_c.cpp @@ -36,6 +36,7 @@ NewSegment(CCollection collection, uint64_t segment_id, SegmentType seg_type) { segment = milvus::segcore::CreateGrowingSegment(col->get_schema()); break; case Sealed: + case Indexing: segment = milvus::segcore::CreateSealedSegment(col->get_schema()); break; default: diff --git a/internal/querynode/collection_replica.go b/internal/querynode/collection_replica.go index 3c45dc7a86..a8957bcaef 100644 --- a/internal/querynode/collection_replica.go +++ b/internal/querynode/collection_replica.go @@ -67,6 +67,7 @@ type collectionReplica interface { getSegmentByID(segmentID UniqueID) (*Segment, error) hasSegment(segmentID UniqueID) bool getVecFieldsBySegmentID(segmentID UniqueID) (map[int64]string, error) + getSealedSegments() ([]UniqueID, []UniqueID) freeAll() } @@ -425,11 +426,11 @@ func (colReplica *collectionReplicaImpl) getSegmentStatistics() []*internalpb2.S SegmentID: segmentID, MemorySize: currentMemSize, NumRows: segmentNumOfRows, - RecentlyModified: segment.GetRecentlyModified(), + RecentlyModified: segment.getRecentlyModified(), } statisticData = append(statisticData, &stat) - segment.SetRecentlyModified(false) + segment.setRecentlyModified(false) } return statisticData @@ -560,6 +561,22 @@ func (colReplica *collectionReplicaImpl) getVecFieldsBySegmentID(segmentID Uniqu return vecFields, nil } +func (colReplica *collectionReplicaImpl) getSealedSegments() ([]UniqueID, []UniqueID) { + colReplica.mu.RLock() + defer colReplica.mu.RUnlock() + + collectionIDs := make([]UniqueID, 0) + segmentIDs := make([]UniqueID, 0) + for k, v := range colReplica.segments { + if v.getType() == segTypeSealed { + collectionIDs = append(collectionIDs, v.collectionID) + segmentIDs = append(segmentIDs, k) + } + } + + return collectionIDs, segmentIDs +} + //----------------------------------------------------------------------------------------------------- func (colReplica *collectionReplicaImpl) freeAll() { colReplica.mu.Lock() diff --git a/internal/querynode/load_index_service.go b/internal/querynode/load_service.go similarity index 75% rename from internal/querynode/load_index_service.go rename to internal/querynode/load_service.go index 8c95915bba..8f40b4ed30 100644 --- a/internal/querynode/load_index_service.go +++ b/internal/querynode/load_service.go @@ -20,12 +20,15 @@ import ( "github.com/zilliztech/milvus-distributed/internal/storage" ) -type loadIndexService struct { +const indexCheckInterval = 1 + +type loadService struct { ctx context.Context cancel context.CancelFunc client *minioKV.MinIOKV - replica collectionReplica + queryNodeID UniqueID + replica collectionReplica fieldIndexes map[string][]*internalpb2.IndexStats fieldStatsChan chan []*internalpb2.FieldStats @@ -33,63 +36,10 @@ type loadIndexService struct { loadIndexReqChan chan []msgstream.TsMsg loadIndexMsgStream msgstream.MsgStream - queryNodeID UniqueID + segManager *segmentManager } -func newLoadIndexService(ctx context.Context, replica collectionReplica) *loadIndexService { - ctx1, cancel := context.WithCancel(ctx) - - option := &minioKV.Option{ - Address: Params.MinioEndPoint, - AccessKeyID: Params.MinioAccessKeyID, - SecretAccessKeyID: Params.MinioSecretAccessKey, - UseSSL: Params.MinioUseSSLStr, - CreateBucket: true, - BucketName: Params.MinioBucketName, - } - - // TODO: load bucketName from config - MinioKV, err := minioKV.NewMinIOKV(ctx1, option) - if err != nil { - panic(err) - } - - // init msgStream - receiveBufSize := Params.LoadIndexReceiveBufSize - pulsarBufSize := Params.LoadIndexPulsarBufSize - - msgStreamURL := Params.PulsarAddress - - consumeChannels := Params.LoadIndexChannelNames - consumeSubName := Params.MsgChannelSubName - - loadIndexStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize) - loadIndexStream.SetPulsarClient(msgStreamURL) - unmarshalDispatcher := util.NewUnmarshalDispatcher() - loadIndexStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize) - - var stream msgstream.MsgStream = loadIndexStream - - // init index load requests channel size by message receive buffer size - indexLoadChanSize := receiveBufSize - - return &loadIndexService{ - ctx: ctx1, - cancel: cancel, - client: MinioKV, - - replica: replica, - fieldIndexes: make(map[string][]*internalpb2.IndexStats), - fieldStatsChan: make(chan []*internalpb2.FieldStats, 1), - - loadIndexReqChan: make(chan []msgstream.TsMsg, indexLoadChanSize), - loadIndexMsgStream: stream, - - queryNodeID: Params.QueryNodeID, - } -} - -func (lis *loadIndexService) consume() { +func (lis *loadService) consume() { for { select { case <-lis.ctx.Done(): @@ -105,9 +55,37 @@ func (lis *loadIndexService) consume() { } } -func (lis *loadIndexService) start() { +func (lis *loadService) indexListener() { + for { + select { + case <-lis.ctx.Done(): + return + case <-time.After(indexCheckInterval * time.Second): + collectionIDs, segmentIDs := lis.replica.getSealedSegments() + for i := range collectionIDs { + // we don't need index id yet + _, buildID, err := lis.segManager.getIndexInfo(collectionIDs[i], segmentIDs[i]) + if err != nil { + indexPaths, err := lis.segManager.getIndexPaths(buildID) + if err != nil { + log.Println(err) + continue + } + err = lis.segManager.loadIndex(segmentIDs[i], indexPaths) + if err != nil { + log.Println(err) + continue + } + } + } + } + } +} + +func (lis *loadService) start() { lis.loadIndexMsgStream.Start() go lis.consume() + go lis.indexListener() for { select { @@ -132,7 +110,7 @@ func (lis *loadIndexService) start() { } } -func (lis *loadIndexService) execute(msg msgstream.TsMsg) error { +func (lis *loadService) execute(msg msgstream.TsMsg) error { indexMsg, ok := msg.(*msgstream.LoadIndexMsg) if !ok { return errors.New("type assertion failed for LoadIndexMsg") @@ -174,21 +152,21 @@ func (lis *loadIndexService) execute(msg msgstream.TsMsg) error { return nil } -func (lis *loadIndexService) close() { +func (lis *loadService) close() { if lis.loadIndexMsgStream != nil { lis.loadIndexMsgStream.Close() } lis.cancel() } -func (lis *loadIndexService) printIndexParams(index []*commonpb.KeyValuePair) { +func (lis *loadService) printIndexParams(index []*commonpb.KeyValuePair) { fmt.Println("=================================================") for i := 0; i < len(index); i++ { fmt.Println(index[i]) } } -func (lis *loadIndexService) indexParamsEqual(index1 []*commonpb.KeyValuePair, index2 []*commonpb.KeyValuePair) bool { +func (lis *loadService) indexParamsEqual(index1 []*commonpb.KeyValuePair, index2 []*commonpb.KeyValuePair) bool { if len(index1) != len(index2) { return false } @@ -204,11 +182,11 @@ func (lis *loadIndexService) indexParamsEqual(index1 []*commonpb.KeyValuePair, i return true } -func (lis *loadIndexService) fieldsStatsIDs2Key(collectionID UniqueID, fieldID UniqueID) string { +func (lis *loadService) fieldsStatsIDs2Key(collectionID UniqueID, fieldID UniqueID) string { return strconv.FormatInt(collectionID, 10) + "/" + strconv.FormatInt(fieldID, 10) } -func (lis *loadIndexService) fieldsStatsKey2IDs(key string) (UniqueID, UniqueID, error) { +func (lis *loadService) fieldsStatsKey2IDs(key string) (UniqueID, UniqueID, error) { ids := strings.Split(key, "/") if len(ids) != 2 { return 0, 0, errors.New("illegal fieldsStatsKey") @@ -224,7 +202,7 @@ func (lis *loadIndexService) fieldsStatsKey2IDs(key string) (UniqueID, UniqueID, return collectionID, fieldID, nil } -func (lis *loadIndexService) updateSegmentIndexStats(indexParams indexParam, indexMsg *msgstream.LoadIndexMsg) error { +func (lis *loadService) updateSegmentIndexStats(indexParams indexParam, indexMsg *msgstream.LoadIndexMsg) error { targetSegment, err := lis.replica.getSegmentByID(indexMsg.SegmentID) if err != nil { return err @@ -265,12 +243,10 @@ func (lis *loadIndexService) updateSegmentIndexStats(indexParams indexParam, ind }) } } - targetSegment.setIndexParam(indexMsg.FieldID, indexMsg.IndexParams) - - return nil + return targetSegment.setIndexParam(indexMsg.FieldID, indexMsg.IndexParams) } -func (lis *loadIndexService) loadIndex(indexPath []string) ([][]byte, indexParam, error) { +func (lis *loadService) loadIndex(indexPath []string) ([][]byte, indexParam, error) { index := make([][]byte, 0) var indexParams indexParam @@ -303,7 +279,7 @@ func (lis *loadIndexService) loadIndex(indexPath []string) ([][]byte, indexParam return index, indexParams, nil } -func (lis *loadIndexService) updateSegmentIndex(indexParams indexParam, bytesIndex [][]byte, loadIndexMsg *msgstream.LoadIndexMsg) error { +func (lis *loadService) updateSegmentIndex(indexParams indexParam, bytesIndex [][]byte, loadIndexMsg *msgstream.LoadIndexMsg) error { segment, err := lis.replica.getSegmentByID(loadIndexMsg.SegmentID) if err != nil { return err @@ -328,15 +304,10 @@ func (lis *loadIndexService) updateSegmentIndex(indexParams indexParam, bytesInd if err != nil { return err } - err = segment.updateSegmentIndex(loadIndexInfo) - if err != nil { - return err - } - - return nil + return segment.updateSegmentIndex(loadIndexInfo) } -func (lis *loadIndexService) sendQueryNodeStats() error { +func (lis *loadService) sendQueryNodeStats() error { resultFieldsStats := make([]*internalpb2.FieldStats, 0) for fieldStatsKey, indexStats := range lis.fieldIndexes { colID, fieldID, err := lis.fieldsStatsKey2IDs(fieldStatsKey) @@ -356,7 +327,7 @@ func (lis *loadIndexService) sendQueryNodeStats() error { return nil } -func (lis *loadIndexService) checkIndexReady(indexParams indexParam, loadIndexMsg *msgstream.LoadIndexMsg) (bool, error) { +func (lis *loadService) checkIndexReady(indexParams indexParam, loadIndexMsg *msgstream.LoadIndexMsg) (bool, error) { segment, err := lis.replica.getSegmentByID(loadIndexMsg.SegmentID) if err != nil { return false, err @@ -367,3 +338,60 @@ func (lis *loadIndexService) checkIndexReady(indexParams indexParam, loadIndexMs return true, nil } + +func newLoadService(ctx context.Context, masterClient MasterServiceInterface, dataClient DataServiceInterface, indexClient IndexServiceInterface, replica collectionReplica, dmStream msgstream.MsgStream) *loadService { + ctx1, cancel := context.WithCancel(ctx) + + option := &minioKV.Option{ + Address: Params.MinioEndPoint, + AccessKeyID: Params.MinioAccessKeyID, + SecretAccessKeyID: Params.MinioSecretAccessKey, + UseSSL: Params.MinioUseSSLStr, + CreateBucket: true, + BucketName: Params.MinioBucketName, + } + + MinioKV, err := minioKV.NewMinIOKV(ctx1, option) + if err != nil { + panic(err) + } + + // init msgStream + receiveBufSize := Params.LoadIndexReceiveBufSize + pulsarBufSize := Params.LoadIndexPulsarBufSize + + msgStreamURL := Params.PulsarAddress + + consumeChannels := Params.LoadIndexChannelNames + consumeSubName := Params.MsgChannelSubName + + loadIndexStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize) + loadIndexStream.SetPulsarClient(msgStreamURL) + unmarshalDispatcher := util.NewUnmarshalDispatcher() + loadIndexStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize) + + var stream msgstream.MsgStream = loadIndexStream + + // init index load requests channel size by message receive buffer size + indexLoadChanSize := receiveBufSize + + // init segment manager + loadIndexReqChan := make(chan []msgstream.TsMsg, indexLoadChanSize) + manager := newSegmentManager(ctx1, masterClient, dataClient, indexClient, replica, dmStream, loadIndexReqChan) + + return &loadService{ + ctx: ctx1, + cancel: cancel, + client: MinioKV, + + replica: replica, + queryNodeID: Params.QueryNodeID, + fieldIndexes: make(map[string][]*internalpb2.IndexStats), + fieldStatsChan: make(chan []*internalpb2.FieldStats, 1), + + loadIndexReqChan: loadIndexReqChan, + loadIndexMsgStream: stream, + + segManager: manager, + } +} diff --git a/internal/querynode/load_index_service_test.go b/internal/querynode/load_service_test.go similarity index 98% rename from internal/querynode/load_index_service_test.go rename to internal/querynode/load_service_test.go index c2377f416b..2c63746e75 100644 --- a/internal/querynode/load_index_service_test.go +++ b/internal/querynode/load_service_test.go @@ -25,13 +25,13 @@ import ( "github.com/zilliztech/milvus-distributed/internal/storage" ) -func TestLoadIndexService_FloatVector(t *testing.T) { +func TestLoadService_LoadIndex_FloatVector(t *testing.T) { node := newQueryNodeMock() collectionID := rand.Int63n(1000000) segmentID := rand.Int63n(1000000) initTestMeta(t, node, "collection0", collectionID, segmentID) - // loadIndexService and statsService + // loadService and statsService suffix := "-test-search" + strconv.FormatInt(rand.Int63n(1000000), 10) oldSearchChannelNames := Params.SearchChannelNames newSearchChannelNames := makeNewChannelNames(oldSearchChannelNames, suffix) @@ -321,7 +321,7 @@ func TestLoadIndexService_FloatVector(t *testing.T) { params := indexStats0.IndexParams // sort index params by key sort.Slice(indexParamsKV, func(i, j int) bool { return indexParamsKV[i].Key < indexParamsKV[j].Key }) - indexEqual := node.loadIndexService.indexParamsEqual(params, indexParamsKV) + indexEqual := node.loadService.indexParamsEqual(params, indexParamsKV) assert.Equal(t, indexEqual, true) } @@ -354,13 +354,13 @@ func TestLoadIndexService_FloatVector(t *testing.T) { node.Stop() } -func TestLoadIndexService_BinaryVector(t *testing.T) { +func TestLoadService_LoadIndex_BinaryVector(t *testing.T) { node := newQueryNodeMock() collectionID := rand.Int63n(1000000) segmentID := rand.Int63n(1000000) initTestMeta(t, node, "collection0", collectionID, segmentID, true) - // loadIndexService and statsService + // loadService and statsService suffix := "-test-search-binary" + strconv.FormatInt(rand.Int63n(1000000), 10) oldSearchChannelNames := Params.SearchChannelNames newSearchChannelNames := makeNewChannelNames(oldSearchChannelNames, suffix) @@ -640,7 +640,7 @@ func TestLoadIndexService_BinaryVector(t *testing.T) { params := indexStats0.IndexParams // sort index params by key sort.Slice(indexParamsKV, func(i, j int) bool { return indexParamsKV[i].Key < indexParamsKV[j].Key }) - indexEqual := node.loadIndexService.indexParamsEqual(params, indexParamsKV) + indexEqual := node.loadService.indexParamsEqual(params, indexParamsKV) assert.Equal(t, indexEqual, true) } diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index a9cc453234..edc553e1fc 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -56,13 +56,11 @@ type QueryNode struct { replica collectionReplica // internal services - dataSyncService *dataSyncService - metaService *metaService - searchService *searchService - loadIndexService *loadIndexService - statsService *statsService - - segManager *segmentManager + dataSyncService *dataSyncService + metaService *metaService + searchService *searchService + loadService *loadService + statsService *statsService //opentracing tracer opentracing.Tracer @@ -86,7 +84,6 @@ func NewQueryNode(ctx context.Context, queryNodeID uint64) *QueryNode { metaService: nil, searchService: nil, statsService: nil, - segManager: nil, } var err error @@ -128,7 +125,6 @@ func NewQueryNodeWithoutID(ctx context.Context) *QueryNode { metaService: nil, searchService: nil, statsService: nil, - segManager: nil, } var err error @@ -167,10 +163,6 @@ func Init() { func (node *QueryNode) Init() error { Params.Init() - return nil -} - -func (node *QueryNode) Start() error { registerReq := &queryPb.RegisterNodeRequest{ Address: &commonpb.Address{ Ip: Params.QueryNodeIP, @@ -189,6 +181,10 @@ func (node *QueryNode) Start() error { Params.QueryNodeID = response.InitParams.NodeID fmt.Println("QueryNodeID is", Params.QueryNodeID) + if node.masterClient == nil { + log.Println("WARN: null master service detected") + } + if node.indexClient == nil { log.Println("WARN: null index service detected") } @@ -197,20 +193,22 @@ func (node *QueryNode) Start() error { log.Println("WARN: null data service detected") } - // todo add connectMaster logic + return nil +} + +func (node *QueryNode) Start() error { // init services and manager node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.replica) node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica) node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica) - node.loadIndexService = newLoadIndexService(node.queryNodeLoopCtx, node.replica) - node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, node.loadIndexService.fieldStatsChan) - node.segManager = newSegmentManager(node.queryNodeLoopCtx, node.masterClient, node.dataClient, node.indexClient, node.replica, node.dataSyncService.dmStream, node.loadIndexService.loadIndexReqChan) + node.loadService = newLoadService(node.queryNodeLoopCtx, node.masterClient, node.dataClient, node.indexClient, node.replica, node.dataSyncService.dmStream) + node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, node.loadService.fieldStatsChan) // start services go node.dataSyncService.start() go node.searchService.start() go node.metaService.start() - go node.loadIndexService.start() + go node.loadService.start() go node.statsService.start() node.stateCode.Store(internalpb2.StateCode_HEALTHY) @@ -232,8 +230,8 @@ func (node *QueryNode) Stop() error { if node.searchService != nil { node.searchService.close() } - if node.loadIndexService != nil { - node.loadIndexService.close() + if node.loadService != nil { + node.loadService.close() } if node.statsService != nil { node.statsService.close() @@ -457,7 +455,7 @@ func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.S if in.LastSegmentState.State == datapb.SegmentState_SegmentGrowing { segmentNum := len(segmentIDs) positions := in.LastSegmentState.StartPositions - err = node.segManager.seekSegment(positions) + err = node.loadService.segManager.seekSegment(positions) if err != nil { status := &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, @@ -468,7 +466,7 @@ func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.S segmentIDs = segmentIDs[:segmentNum-1] } - err = node.segManager.loadSegment(collectionID, partitionID, segmentIDs, fieldIDs) + err = node.loadService.segManager.loadSegment(collectionID, partitionID, segmentIDs, fieldIDs) if err != nil { status := &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, @@ -493,7 +491,7 @@ func (node *QueryNode) ReleaseSegments(in *queryPb.ReleaseSegmentRequest) (*comm // release all fields in the segments for _, id := range in.SegmentIDs { - err := node.segManager.releaseSegment(id) + err := node.loadService.segManager.releaseSegment(id) if err != nil { status := &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go index c617f8f277..56da38bff0 100644 --- a/internal/querynode/segment.go +++ b/internal/querynode/segment.go @@ -23,17 +23,18 @@ import ( ) const ( - segTypeInvalid = C.Invalid - segTypeGrowing = C.Growing - segTypeSealed = C.Sealed + segTypeInvalid = C.Invalid + segTypeGrowing = C.Growing + segTypeSealed = C.Sealed + segTypeIndexing = C.Indexing ) type segmentType = C.SegmentType type indexParam = map[string]string type Segment struct { - segmentPtr C.CSegmentInterface - segmentType C.SegmentType + segmentPtr C.CSegmentInterface + segmentID UniqueID partitionTag string // TODO: use partitionID partitionID UniqueID @@ -44,6 +45,9 @@ type Segment struct { rmMutex sync.Mutex // guards recentlyModified recentlyModified bool + typeMu sync.Mutex // guards builtIndex + segmentType C.SegmentType + paramMutex sync.RWMutex // guards indexParam indexParam map[int64]indexParam } @@ -53,22 +57,30 @@ func (s *Segment) ID() UniqueID { return s.segmentID } -func (s *Segment) Type() segmentType { - return s.segmentType -} - -func (s *Segment) SetRecentlyModified(modify bool) { +func (s *Segment) setRecentlyModified(modify bool) { s.rmMutex.Lock() defer s.rmMutex.Unlock() s.recentlyModified = modify } -func (s *Segment) GetRecentlyModified() bool { +func (s *Segment) getRecentlyModified() bool { s.rmMutex.Lock() defer s.rmMutex.Unlock() return s.recentlyModified } +func (s *Segment) setType(segType segmentType) { + s.typeMu.Lock() + defer s.typeMu.Unlock() + s.segmentType = segType +} + +func (s *Segment) getType() segmentType { + s.typeMu.Lock() + defer s.typeMu.Unlock() + return s.segmentType +} + func newSegment2(collection *Collection, segmentID int64, partitionTag string, collectionID UniqueID, segType segmentType) *Segment { /* CSegmentInterface @@ -195,7 +207,7 @@ func (s *Segment) fillTargetEntry(plan *Plan, return nil } -// segment, err := loadIndexService.replica.getSegmentByID(segmentID) +// segment, err := loadService.replica.getSegmentByID(segmentID) func (s *Segment) updateSegmentIndex(loadIndexInfo *LoadIndexInfo) error { var status C.CStatus @@ -215,6 +227,8 @@ func (s *Segment) updateSegmentIndex(loadIndexInfo *LoadIndexInfo) error { return errors.New("updateSegmentIndex failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg) } + s.setType(segTypeIndexing) + return nil } @@ -324,7 +338,7 @@ func (s *Segment) segmentInsert(offset int64, entityIDs *[]UniqueID, timestamps return errors.New("Insert failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg) } - s.SetRecentlyModified(true) + s.setRecentlyModified(true) return nil } diff --git a/internal/querynode/segment_manager.go b/internal/querynode/segment_manager.go index 7a7f29b574..1e17d18580 100644 --- a/internal/querynode/segment_manager.go +++ b/internal/querynode/segment_manager.go @@ -41,7 +41,6 @@ func (s *segmentManager) seekSegment(positions []*internalPb.MsgPosition) error return nil } -//TODO, index params func (s *segmentManager) getIndexInfo(collectionID UniqueID, segmentID UniqueID) (UniqueID, UniqueID, error) { req := &milvuspb.DescribeSegmentRequest{ Base: &commonpb.MsgBase{ @@ -73,7 +72,7 @@ func (s *segmentManager) loadSegment(collectionID UniqueID, partitionID UniqueID // we don't need index id yet _, buildID, err := s.getIndexInfo(collectionID, segmentID) if err == nil { - // we don't need load vector fields + // we don't need load to vector fields vectorFields, err := s.replica.getVecFieldsBySegmentID(segmentID) if err != nil { return err @@ -229,11 +228,11 @@ func (s *segmentManager) loadSegmentFieldsData(segmentID UniqueID, targetFields numRows = fieldData.NumRows data = fieldData.Data case *storage.FloatVectorFieldData: - // segment to be loaded doesn't need vector field, - // so we ignore the type of vector field data - continue + numRows = fieldData.NumRows + data = fieldData.Data case *storage.BinaryVectorFieldData: - continue + numRows = fieldData.NumRows + data = fieldData.Data default: return errors.New("unexpected field data type") } @@ -282,7 +281,7 @@ func (s *segmentManager) loadIndex(segmentID UniqueID, indexPaths []string) erro return err } for id, name := range vecFieldIDs { - // non-blocking send + // non-blocking sending go s.sendLoadIndex(indexPaths, segmentID, id, name) } diff --git a/internal/querynode/segment_manager_test.go b/internal/querynode/segment_manager_test.go index 56ec50bb10..02afa508ad 100644 --- a/internal/querynode/segment_manager_test.go +++ b/internal/querynode/segment_manager_test.go @@ -406,9 +406,8 @@ func TestSegmentManager_load_release_and_search(t *testing.T) { defer node.Stop() ctx := node.queryNodeLoopCtx - node.loadIndexService = newLoadIndexService(ctx, node.replica) - node.segManager = newSegmentManager(ctx, nil, nil, nil, node.replica, nil, node.loadIndexService.loadIndexReqChan) - go node.loadIndexService.start() + node.loadService = newLoadService(ctx, nil, nil, nil, node.replica, nil) + go node.loadService.start() collectionName := "collection0" initTestMeta(t, node, collectionName, collectionID, 0) @@ -422,16 +421,16 @@ func TestSegmentManager_load_release_and_search(t *testing.T) { paths, srcFieldIDs, err := generateInsertBinLog(collectionID, partitionID, segmentID, keyPrefix) assert.NoError(t, err) - fieldsMap := node.segManager.getTargetFields(paths, srcFieldIDs, fieldIDs) + fieldsMap := node.loadService.segManager.getTargetFields(paths, srcFieldIDs, fieldIDs) assert.Equal(t, len(fieldsMap), 2) - err = node.segManager.loadSegmentFieldsData(segmentID, fieldsMap) + err = node.loadService.segManager.loadSegmentFieldsData(segmentID, fieldsMap) assert.NoError(t, err) indexPaths, err := generateIndex(segmentID) assert.NoError(t, err) - err = node.segManager.loadIndex(segmentID, indexPaths) + err = node.loadService.segManager.loadIndex(segmentID, indexPaths) assert.NoError(t, err) // do search