From a0226c09f04ac2d3eb56255b9db75862a9ab2d6e Mon Sep 17 00:00:00 2001 From: dragondriver Date: Wed, 27 Jan 2021 13:52:01 +0800 Subject: [PATCH] Change the logic node id allocator in Proxy Signed-off-by: dragondriver --- configs/advanced/data_service.yaml | 2 +- internal/dataservice/cluster.go | 8 +- internal/proxyservice/nodeid_allocator.go | 21 ++- internal/queryservice/queryservice.go | 33 ++-- internal/queryservice/queryservice_test.go | 169 +++++++++++++++++++++ 5 files changed, 193 insertions(+), 40 deletions(-) diff --git a/configs/advanced/data_service.yaml b/configs/advanced/data_service.yaml index bc6b41f7e4..49c405e793 100644 --- a/configs/advanced/data_service.yaml +++ b/configs/advanced/data_service.yaml @@ -10,4 +10,4 @@ dataservice: # old name: segmentExpireDuration: 2000 IDAssignExpiration: 2000 # ms insertChannelNumPerCollection: 4 - dataNodeNum: 1 \ No newline at end of file + dataNodeNum: 2 \ No newline at end of file diff --git a/internal/dataservice/cluster.go b/internal/dataservice/cluster.go index 0ffd225149..ef74df537c 100644 --- a/internal/dataservice/cluster.go +++ b/internal/dataservice/cluster.go @@ -41,7 +41,7 @@ func newDataNodeCluster(finishCh chan struct{}) *dataNodeCluster { func (c *dataNodeCluster) Register(ip string, port int64, id int64) { c.mu.Lock() defer c.mu.Unlock() - if c.checkDataNodeNotExist(ip, port) { + if !c.checkDataNodeNotExist(ip, port) { c.nodes = append(c.nodes, &dataNode{ id: id, address: struct { @@ -50,9 +50,9 @@ func (c *dataNodeCluster) Register(ip string, port int64, id int64) { }{ip: ip, port: port}, channelNum: 0, }) - if len(c.nodes) == Params.DataNodeNum { - close(c.finishCh) - } + } + if len(c.nodes) == Params.DataNodeNum { + close(c.finishCh) } } diff --git a/internal/proxyservice/nodeid_allocator.go b/internal/proxyservice/nodeid_allocator.go index f4cc9cf82a..aa22db2823 100644 --- a/internal/proxyservice/nodeid_allocator.go +++ b/internal/proxyservice/nodeid_allocator.go @@ -1,7 +1,7 @@ package proxyservice import ( - "context" + "sync" "github.com/zilliztech/milvus-distributed/internal/allocator" @@ -17,22 +17,21 @@ type NodeIDAllocator interface { type NaiveNodeIDAllocatorImpl struct { impl *allocator.IDAllocator + now UniqueID + mtx sync.Mutex } func (allocator *NaiveNodeIDAllocatorImpl) AllocOne() UniqueID { - id, err := allocator.impl.AllocOne() - if err != nil { - panic(err) - } - return id + allocator.mtx.Lock() + defer func() { + allocator.now++ + allocator.mtx.Unlock() + }() + return allocator.now } func NewNodeIDAllocator() NodeIDAllocator { - impl, err := allocator.NewIDAllocator(context.Background(), Params.MasterAddress()) - if err != nil { - panic(err) - } return &NaiveNodeIDAllocatorImpl{ - impl: impl, + now: 0, } } diff --git a/internal/queryservice/queryservice.go b/internal/queryservice/queryservice.go index 25cb8a3864..942407fcbd 100644 --- a/internal/queryservice/queryservice.go +++ b/internal/queryservice/queryservice.go @@ -77,26 +77,6 @@ func (qs *QueryService) Stop() error { return nil } -//func (qs *QueryService) SetDataService(d querynode.DataServiceInterface) error { -// for _, v := range qs.queryNodeClient { -// err := v.SetDataService(d) -// if err != nil { -// return err -// } -// } -// return nil -//} -// -//func (qs *QueryService) SetIndexService(i querynode.IndexServiceInterface) error { -// for _, v := range qs.queryNodeClient { -// err := v.SetIndexService(i) -// if err != nil { -// return err -// } -// } -// return nil -//} - func (qs *QueryService) GetComponentStates() (*internalpb2.ComponentStates, error) { serviceComponentInfo := &internalpb2.ComponentInfo{ NodeID: Params.QueryServiceID, @@ -134,6 +114,7 @@ func (qs *QueryService) GetStatisticsChannel() (string, error) { // TODO:: do addWatchDmChannel to query node after registerNode func (qs *QueryService) RegisterNode(req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) { fmt.Println("register query node =", req.Address) + // TODO:: add mutex allocatedID := qs.numRegisterNode qs.numRegisterNode++ @@ -158,6 +139,7 @@ func (qs *QueryService) RegisterNode(req *querypb.RegisterNodeRequest) (*querypb } qs.queryNodes = append(qs.queryNodes, node) + // TODO:: watch dm channels return &querypb.RegisterNodeResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, @@ -289,7 +271,7 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm segmentIDs := showSegmentResponse.SegmentIDs segmentStates := make(map[UniqueID]*datapb.SegmentStatesResponse) channel2id := make(map[string]int) - id2channels := make(map[int][]string) + //id2channels := make(map[int][]string) id2segs := make(map[int][]UniqueID) offset := 0 @@ -306,13 +288,16 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm for i, str := range state.StartPositions { flatChannelName += str.ChannelName channelNames = append(channelNames, str.ChannelName) - if i < len(state.StartPositions) { + if i+1 < len(state.StartPositions) { flatChannelName += "/" } } + if flatChannelName == "" { + log.Fatal("segmentState's channel name is empty") + } if _, ok := channel2id[flatChannelName]; !ok { channel2id[flatChannelName] = offset - id2channels[offset] = channelNames + //id2channels[offset] = channelNames id2segs[offset] = make([]UniqueID, 0) id2segs[offset] = append(id2segs[offset], segmentID) offset++ @@ -329,7 +314,7 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm if segmentStates[v].State == datapb.SegmentState_SegmentFlushed { selectedSegs = append(selectedSegs, v) } else { - if i > 0 && segmentStates[v-1].State != datapb.SegmentState_SegmentFlushed { + if i > 0 && segmentStates[selectedSegs[i-1]].State != datapb.SegmentState_SegmentFlushed { break } selectedSegs = append(selectedSegs, v) diff --git a/internal/queryservice/queryservice_test.go b/internal/queryservice/queryservice_test.go index 49038ff7ab..d6319cf5c5 100644 --- a/internal/queryservice/queryservice_test.go +++ b/internal/queryservice/queryservice_test.go @@ -2,11 +2,148 @@ package queryservice import ( "context" + "strconv" "testing" "github.com/stretchr/testify/assert" + + "github.com/zilliztech/milvus-distributed/internal/errors" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/datapb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" + "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" + "github.com/zilliztech/milvus-distributed/internal/proto/querypb" ) +type masterMock struct { + collectionIDs []UniqueID + col2partition map[UniqueID][]UniqueID + partition2segment map[UniqueID][]UniqueID +} + +func newMasterMock() *masterMock { + collectionIDs := make([]UniqueID, 0) + collectionIDs = append(collectionIDs, 1) + + col2partition := make(map[UniqueID][]UniqueID) + partitionIDs := make([]UniqueID, 0) + partitionIDs = append(partitionIDs, 1) + col2partition[1] = partitionIDs + + partition2segment := make(map[UniqueID][]UniqueID) + segmentIDs := make([]UniqueID, 0) + segmentIDs = append(segmentIDs, 1) + segmentIDs = append(segmentIDs, 2) + segmentIDs = append(segmentIDs, 3) + segmentIDs = append(segmentIDs, 4) + segmentIDs = append(segmentIDs, 5) + segmentIDs = append(segmentIDs, 6) + partition2segment[1] = segmentIDs + + return &masterMock{ + collectionIDs: collectionIDs, + col2partition: col2partition, + partition2segment: partition2segment, + } +} + +func (master *masterMock) ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) { + collectionID := in.CollectionID + partitionIDs := make([]UniqueID, 0) + for _, id := range master.collectionIDs { + if id == collectionID { + partitions := master.col2partition[collectionID] + partitionIDs = append(partitionIDs, partitions...) + } + } + response := &milvuspb.ShowPartitionResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + PartitionIDs: partitionIDs, + } + + return response, nil +} + +func (master *masterMock) ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error) { + collectionID := in.CollectionID + partitionID := in.PartitionID + + for _, id := range master.collectionIDs { + if id == collectionID { + partitions := master.col2partition[collectionID] + for _, partition := range partitions { + if partition == partitionID { + return &milvuspb.ShowSegmentResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + SegmentIDs: master.partition2segment[partition], + }, nil + } + } + } + } + + return nil, errors.New("collection id or partition id not found") +} + +type dataMock struct { + segmentIDs []UniqueID + segmentStates map[UniqueID]*datapb.SegmentStatesResponse +} + +func newDataMock() *dataMock { + positions1 := make([]*internalpb2.MsgPosition, 0) + positions2 := make([]*internalpb2.MsgPosition, 0) + positions1 = append(positions1, &internalpb2.MsgPosition{ChannelName: "insertChannel-" + strconv.FormatInt(1, 10)}) + positions1 = append(positions1, &internalpb2.MsgPosition{ChannelName: "insertChannel-" + strconv.FormatInt(2, 10)}) + positions2 = append(positions2, &internalpb2.MsgPosition{ChannelName: "insertChannel-" + strconv.FormatInt(3, 10)}) + positions2 = append(positions2, &internalpb2.MsgPosition{ChannelName: "insertChannel-" + strconv.FormatInt(4, 10)}) + + segmentIDs := make([]UniqueID, 0) + segmentIDs = append(segmentIDs, 1) + segmentIDs = append(segmentIDs, 2) + segmentIDs = append(segmentIDs, 3) + segmentIDs = append(segmentIDs, 4) + segmentIDs = append(segmentIDs, 5) + segmentIDs = append(segmentIDs, 6) + + fillStates := func(time uint64, position []*internalpb2.MsgPosition, state datapb.SegmentState) *datapb.SegmentStatesResponse { + return &datapb.SegmentStatesResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + State: state, + CreateTime: time, + StartPositions: position, + } + } + segmentStates := make(map[UniqueID]*datapb.SegmentStatesResponse) + segmentStates[1] = fillStates(1, positions1, datapb.SegmentState_SegmentFlushed) + segmentStates[2] = fillStates(2, positions2, datapb.SegmentState_SegmentFlushed) + segmentStates[3] = fillStates(3, positions1, datapb.SegmentState_SegmentFlushed) + segmentStates[4] = fillStates(4, positions2, datapb.SegmentState_SegmentFlushed) + segmentStates[5] = fillStates(5, positions1, datapb.SegmentState_SegmentGrowing) + segmentStates[6] = fillStates(6, positions2, datapb.SegmentState_SegmentGrowing) + + return &dataMock{ + segmentIDs: segmentIDs, + segmentStates: segmentStates, + } +} + +func (data *dataMock) GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error) { + segmentID := req.SegmentID + for _, id := range data.segmentIDs { + if segmentID == id { + return data.segmentStates[id], nil + } + } + return nil, errors.New("segment id not found") +} + func TestQueryService_Init(t *testing.T) { service, err := NewQueryService(context.Background()) assert.Nil(t, err) @@ -34,3 +171,35 @@ func TestQueryService_Init(t *testing.T) { service.Stop() } + +func TestQueryService_load(t *testing.T) { + service, err := NewQueryService(context.Background()) + assert.Nil(t, err) + service.Init() + service.Start() + service.SetMasterService(newMasterMock()) + service.SetDataService(newDataMock()) + registerNodeRequest := &querypb.RegisterNodeRequest{ + Address: &commonpb.Address{}, + } + service.RegisterNode(registerNodeRequest) + + t.Run("Test LoadSegment", func(t *testing.T) { + loadCollectionRequest := &querypb.LoadCollectionRequest{ + CollectionID: 1, + } + response, err := service.LoadCollection(loadCollectionRequest) + assert.Nil(t, err) + assert.Equal(t, response.ErrorCode, commonpb.ErrorCode_SUCCESS) + }) + + t.Run("Test LoadPartition", func(t *testing.T) { + loadPartitionRequest := &querypb.LoadPartitionRequest{ + CollectionID: 1, + PartitionIDs: []UniqueID{1}, + } + response, err := service.LoadPartitions(loadPartitionRequest) + assert.Nil(t, err) + assert.Equal(t, response.ErrorCode, commonpb.ErrorCode_SUCCESS) + }) +}