diff --git a/internal/querynode/collection_replica.go b/internal/querynode/collection_replica.go
index 80034c7261..0775785d30 100644
--- a/internal/querynode/collection_replica.go
+++ b/internal/querynode/collection_replica.go
@@ -54,7 +54,7 @@ type ReplicaInterface interface {
 	hasCollection(collectionID UniqueID) bool
 	getCollectionNum() int
 	getPartitionIDs(collectionID UniqueID) ([]UniqueID, error)
-	getVecFieldIDsByCollectionID(collectionID UniqueID) ([]int64, error)
+	getVecFieldIDsByCollectionID(collectionID UniqueID) ([]FieldID, error)
 
 	// partition
 	addPartition(collectionID UniqueID, partitionID UniqueID) error
@@ -93,8 +93,6 @@ type collectionReplica struct {
 	partitions map[UniqueID]*Partition
 	segments   map[UniqueID]*Segment
 
-	loadType
-
 	excludedSegments map[UniqueID][]*datapb.SegmentInfo // map[collectionID]segmentIDs
 
 	etcdKV *etcdkv.EtcdKV
@@ -203,7 +201,7 @@ func (colReplica *collectionReplica) getPartitionIDs(collectionID UniqueID) ([]U
 	return collection.partitionIDs, nil
 }
 
-func (colReplica *collectionReplica) getVecFieldIDsByCollectionID(collectionID UniqueID) ([]int64, error) {
+func (colReplica *collectionReplica) getVecFieldIDsByCollectionID(collectionID UniqueID) ([]FieldID, error) {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
 
@@ -212,7 +210,7 @@ func (colReplica *collectionReplica) getVecFieldIDsByCollectionID(collectionID U
 		return nil, err
 	}
 
-	vecFields := make([]int64, 0)
+	vecFields := make([]FieldID, 0)
 	for _, field := range fields {
 		if field.DataType == schemapb.DataType_BinaryVector || field.DataType == schemapb.DataType_FloatVector {
 			vecFields = append(vecFields, field.FieldID)
diff --git a/internal/querynode/collection_replica_test.go b/internal/querynode/collection_replica_test.go
index 95ffad84a8..eaac572682 100644
--- a/internal/querynode/collection_replica_test.go
+++ b/internal/querynode/collection_replica_test.go
@@ -255,38 +255,6 @@ func TestCollectionReplica_freeAll(t *testing.T) {
 	assert.NoError(t, err)
 }
 
-//func TestReplaceGrowingSegmentBySealedSegment(t *testing.T) {
-//	node := newQueryNodeMock()
-//	collectionID := UniqueID(0)
-//	segmentID := UniqueID(520)
-//	initTestMeta(t, node, collectionID, segmentID)
-//
-//	_, _, segIDs := node.historical.replica.getSegmentsBySegmentType(segmentTypeGrowing)
-//	assert.Equal(t, len(segIDs), 1)
-//
-//	collection, err := node.historical.replica.getCollectionByID(collectionID)
-//	assert.NoError(t, err)
-//	ns := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeSealed, true)
-//	err = node.historical.replica.replaceGrowingSegmentBySealedSegment(ns)
-//	assert.NoError(t, err)
-//
-//	segmentNums := node.historical.replica.getSegmentNum()
-//	assert.Equal(t, segmentNums, 1)
-//
-//	segment, err := node.historical.replica.getSegmentByID(segmentID)
-//	assert.NoError(t, err)
-//
-//	assert.Equal(t, segment.getType(), segmentTypeSealed)
-//
-//	_, _, segIDs = node.historical.replica.getSegmentsBySegmentType(segmentTypeGrowing)
-//	assert.Equal(t, len(segIDs), 0)
-//	_, _, segIDs = node.historical.replica.getSegmentsBySegmentType(segmentTypeSealed)
-//	assert.Equal(t, len(segIDs), 1)
-//
-//	err = node.Stop()
-//	assert.NoError(t, err)
-//}
-
 func TestCollectionReplica_statistic(t *testing.T) {
 	t.Run("test getCollectionIDs", func(t *testing.T) {
 		replica, err := genSimpleReplica()
diff --git a/internal/querynode/data_sync_service_test.go b/internal/querynode/data_sync_service_test.go
index 15c63c6897..6bc55d862e 100644
--- a/internal/querynode/data_sync_service_test.go
+++ b/internal/querynode/data_sync_service_test.go
@@ -65,14 +65,14 @@ func TestDataSyncService_Start(t *testing.T) {
 			Base: &commonpb.MsgBase{
 				MsgType:   commonpb.MsgType_Insert,
 				MsgID:     0,
-				Timestamp: uint64(i + 1000),
+				Timestamp: Timestamp(i + 1000),
 				SourceID:  0,
 			},
 			CollectionID: collectionID,
 			PartitionID:  defaultPartitionID,
-			SegmentID:    int64(0),
+			SegmentID:    UniqueID(0),
 			ChannelID:    "0",
-			Timestamps:   []uint64{uint64(i + 1000), uint64(i + 1000)},
+			Timestamps:   []Timestamp{Timestamp(i + 1000), Timestamp(i + 1000)},
 			RowIDs:       []int64{int64(i), int64(i)},
 			RowData: []*commonpb.Blob{
 				{Value: rawData},
diff --git a/internal/querynode/flow_graph_insert_node.go b/internal/querynode/flow_graph_insert_node.go
index 394a7ac210..1d13bc2c4c 100644
--- a/internal/querynode/flow_graph_insert_node.go
+++ b/internal/querynode/flow_graph_insert_node.go
@@ -12,7 +12,6 @@
 package querynode
 
 import (
-	"context"
 	"sync"
 
 	"github.com/opentracing/opentracing-go"
@@ -30,8 +29,7 @@ type insertNode struct {
 }
 
 type InsertData struct {
-	insertContext    map[int64]context.Context
-	insertIDs        map[UniqueID][]UniqueID
+	insertIDs        map[UniqueID][]int64
 	insertTimestamps map[UniqueID][]Timestamp
 	insertRecords    map[UniqueID][]*commonpb.Blob
 	insertOffset     map[UniqueID]int64
@@ -56,10 +54,10 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 	}
 
 	insertData := InsertData{
-		insertIDs:        make(map[int64][]int64),
-		insertTimestamps: make(map[int64][]uint64),
-		insertRecords:    make(map[int64][]*commonpb.Blob),
-		insertOffset:     make(map[int64]int64),
+		insertIDs:        make(map[UniqueID][]int64),
+		insertTimestamps: make(map[UniqueID][]Timestamp),
+		insertRecords:    make(map[UniqueID][]*commonpb.Blob),
+		insertOffset:     make(map[UniqueID]int64),
 	}
 
 	if iMsg == nil {
@@ -134,7 +132,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 	return []Msg{res}
 }
 
-func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *sync.WaitGroup) {
+func (iNode *insertNode) insert(insertData *InsertData, segmentID UniqueID, wg *sync.WaitGroup) {
 	log.Debug("QueryNode::iNode::insert", zap.Any("SegmentID", segmentID))
 	var targetSegment, err = iNode.replica.getSegmentByID(segmentID)
 	if err != nil {
diff --git a/internal/querynode/index_loader.go b/internal/querynode/index_loader.go
index fd0876ab5a..d75b782161 100644
--- a/internal/querynode/index_loader.go
+++ b/internal/querynode/index_loader.go
@@ -46,7 +46,7 @@ type indexLoader struct {
 	kv kv.BaseKV // minio kv
 }
 
-func (loader *indexLoader) loadIndex(segment *Segment, fieldID int64) error {
+func (loader *indexLoader) loadIndex(segment *Segment, fieldID FieldID) error {
 	// 1. use msg's index paths to get index bytes
 	var err error
 	var indexBuffer [][]byte
diff --git a/internal/querynode/load_index_info.go b/internal/querynode/load_index_info.go
index 7e642da7ea..6631b620b3 100644
--- a/internal/querynode/load_index_info.go
+++ b/internal/querynode/load_index_info.go
@@ -67,7 +67,7 @@ func (li *LoadIndexInfo) appendIndexParam(indexKey string, indexValue string) er
 	return nil
 }
 
-func (li *LoadIndexInfo) appendFieldInfo(fieldID int64) error {
+func (li *LoadIndexInfo) appendFieldInfo(fieldID FieldID) error {
 	cFieldID := C.long(fieldID)
 	status := C.AppendFieldInfo(li.cLoadIndexInfo, cFieldID)
 	errorCode := status.error_code
diff --git a/internal/querynode/plan.go b/internal/querynode/plan.go
index 9be6dba362..0fc3e05c69 100644
--- a/internal/querynode/plan.go
+++ b/internal/querynode/plan.go
@@ -114,7 +114,7 @@ func (pg *searchRequest) delete() {
 
 type RetrievePlan struct {
 	cRetrievePlan C.CRetrievePlan
-	Timestamp     uint64
+	Timestamp     Timestamp
 }
 
 // func createRetrievePlan(col *Collection, msg *segcorepb.RetrieveRequest, timestamp uint64) (*RetrievePlan, error) {
@@ -132,7 +132,7 @@ type RetrievePlan struct {
 //	return plan, nil
 // }
 
-func createRetrievePlanByExpr(col *Collection, expr []byte, timestamp uint64) (*RetrievePlan, error) {
+func createRetrievePlanByExpr(col *Collection, expr []byte, timestamp Timestamp) (*RetrievePlan, error) {
 	var cPlan C.CRetrievePlan
 	status := C.CreateRetrievePlanByExpr(col.collectionPtr, (*C.char)(unsafe.Pointer(&expr[0])),
 		(C.int64_t)(len(expr)), &cPlan)
diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go
index fde1e99c39..fffd4ca723 100644
--- a/internal/querynode/segment.go
+++ b/internal/querynode/segment.go
@@ -85,7 +85,7 @@ type Segment struct {
 	segmentType segmentType
 
 	paramMutex sync.RWMutex // guards index
-	indexInfos map[int64]*indexInfo
+	indexInfos map[FieldID]*indexInfo
 
 	idBinlogRowSizes []int64
 
@@ -167,7 +167,7 @@ func (s *Segment) getVectorFieldInfo(fieldID UniqueID) (*VectorFieldInfo, error)
 	return nil, errors.New("Invalid fieldID " + strconv.Itoa(int(fieldID)))
 }
 
-func newSegment(collection *Collection, segmentID int64, partitionID UniqueID, collectionID UniqueID, vChannelID Channel, segType segmentType, onService bool) *Segment {
+func newSegment(collection *Collection, segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, vChannelID Channel, segType segmentType, onService bool) *Segment {
 	/*
 		CSegmentInterface
 		NewSegment(CCollection collection, uint64_t segment_id, SegmentType seg_type);
diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go
index 9267a1ca2b..451d7c587a 100644
--- a/internal/querynode/segment_loader.go
+++ b/internal/querynode/segment_loader.go
@@ -164,7 +164,7 @@ func (loader *segmentLoader) loadSegmentInternal(collectionID UniqueID, segment
 		}
 	}
 
-	indexedFieldIDs := make([]int64, 0)
+	indexedFieldIDs := make([]FieldID, 0)
 	for _, vecFieldID := range vectorFieldIDs {
 		err = loader.indexLoader.setIndexInfo(collectionID, segment, vecFieldID)
 		if err != nil {
diff --git a/internal/querynode/segment_test.go b/internal/querynode/segment_test.go
index 058bbf0589..c37abac431 100644
--- a/internal/querynode/segment_test.go
+++ b/internal/querynode/segment_test.go
@@ -93,7 +93,7 @@ func TestSegment_getRowCount(t *testing.T) {
 	assert.Equal(t, segmentID, segment.segmentID)
 
 	ids := []int64{1, 2, 3}
-	timestamps := []uint64{0, 0, 0}
+	timestamps := []Timestamp{0, 0, 0}
 
 	const DIM = 16
 	const N = 3
@@ -207,7 +207,7 @@ func TestSegment_retrieve(t *testing.T) {
 				},
 			},
 		},
-		OutputFieldIds: []int64{101},
+		OutputFieldIds: []FieldID{101},
 	}
 	// reqIds := &segcorepb.RetrieveRequest{
 	// 	Ids: &schemapb.IDs{
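Note: the patch above is a type-hygiene change. It swaps raw int64/uint64 parameters, map keys, and slice element types for the package's named aliases (FieldID, UniqueID, Timestamp), drops the insertContext field from InsertData together with the now-unneeded "context" import, removes the loadType field from collectionReplica, and deletes a long commented-out test. Because values like field.FieldID (an int64 from the protobuf schema) are appended to []FieldID with no conversion, these names must be plain type aliases rather than defined types. A minimal sketch of what the alias declarations presumably look like follows; their actual location and exact form are not shown in this diff and are assumptions.

package querynode

// Assumed alias declarations, shown only to illustrate that the patch is a
// readability change from raw integer types to named aliases; the underlying
// representation of IDs and timestamps does not change.
type (
	UniqueID  = int64  // collection, partition, and segment IDs
	FieldID   = int64  // schema field IDs
	Timestamp = uint64 // hybrid logical-clock timestamps
)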