From 72ed65d7f8682dba915c8b46cfb677e108a8dbf5 Mon Sep 17 00:00:00 2001 From: quicksilver Date: Tue, 17 Nov 2020 18:36:03 +0800 Subject: [PATCH] Check builder images Signed-off-by: quicksilver --- .github/workflows/main.yaml | 2 + build/builder.sh | 6 + cmd/reader/reader.go | 4 +- internal/master/collection_task.go | 4 +- internal/master/meta_table.go | 11 + internal/master/meta_table_test.go | 19 +- internal/master/task_test.go | 302 ------------------ ...ection_replica.go => col_seg_container.go} | 155 ++++----- ...lica_test.go => col_seg_container_test.go} | 151 +++++---- internal/reader/collection_test.go | 11 +- internal/reader/data_sync_service.go | 48 ++- internal/reader/data_sync_service_test.go | 15 +- internal/reader/flow_graph_insert_node.go | 18 +- .../flow_graph_msg_stream_input_nodes.go | 10 +- .../reader/flow_graph_service_time_node.go | 8 +- internal/reader/meta_service.go | 26 +- internal/reader/meta_service_test.go | 129 ++++---- internal/reader/param_table_test.go | 25 -- .../reader/{param_table.go => paramtable.go} | 17 - internal/reader/partition_test.go | 13 +- internal/reader/query_node.go | 47 ++- internal/reader/query_node_test.go | 6 +- internal/reader/reader.go | 4 +- internal/reader/search_service.go | 20 +- internal/reader/search_service_test.go | 17 +- internal/reader/stats_service.go | 32 +- internal/reader/stats_service_test.go | 179 ++++++----- internal/util/flowgraph/flow_graph.go | 10 - 28 files changed, 493 insertions(+), 796 deletions(-) delete mode 100644 internal/master/task_test.go rename internal/reader/{collection_replica.go => col_seg_container.go} (52%) rename internal/reader/{collection_replica_test.go => col_seg_container_test.go} (74%) delete mode 100644 internal/reader/param_table_test.go rename internal/reader/{param_table.go => paramtable.go} (64%) diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml index ca20e836f5..e48cb1efc8 100644 --- a/.github/workflows/main.yaml +++ b/.github/workflows/main.yaml @@ -59,5 +59,7 @@ jobs: run: | cd ${GITHUB_WORKSPACE}/deployments/docker && docker-compose up -d - name: Build and UnitTest + env: + CHECK_BUILDER: "1" run: | ./build/builder.sh diff --git a/build/builder.sh b/build/builder.sh index 2e88404661..a830a7be0b 100755 --- a/build/builder.sh +++ b/build/builder.sh @@ -18,6 +18,9 @@ if [ "${1-}" = "gdbserver" ]; then chmod -R 777 "${DOCKER_VOLUME_DIRECTORY:-.docker}" docker-compose pull --ignore-pull-failures gdbserver + if [ "${CHECK_BUILDER:-}" == "1" ]; then + docker-compose build gdbserver + fi docker-compose up -d gdbserver exit 0 fi @@ -39,6 +42,9 @@ mkdir -p "${DOCKER_VOLUME_DIRECTORY:-.docker}/amd64-ubuntu18.04-cache" chmod -R 777 "${DOCKER_VOLUME_DIRECTORY:-.docker}" docker-compose pull --ignore-pull-failures ubuntu +if [ "${CHECK_BUILDER:-}" == "1" ]; then + docker-compose build ubuntu +fi docker-compose run --rm -u "$uid:$gid" ubuntu "$@" popd diff --git a/cmd/reader/reader.go b/cmd/reader/reader.go index 386ed51539..cc8a48a01e 100644 --- a/cmd/reader/reader.go +++ b/cmd/reader/reader.go @@ -27,8 +27,8 @@ func main() { sig = <-sc cancel() }() - - reader.StartQueryNode(ctx) + pulsarAddress, _ := reader.Params.PulsarAddress() + reader.StartQueryNode(ctx, pulsarAddress) switch sig { case syscall.SIGTERM: diff --git a/internal/master/collection_task.go b/internal/master/collection_task.go index 54cf7c7ab8..dc63df5ccb 100644 --- a/internal/master/collection_task.go +++ b/internal/master/collection_task.go @@ -167,7 +167,7 @@ func (t *describeCollectionTask) Ts() (Timestamp, 
error) { if t.req == nil { return 0, errors.New("null request") } - return t.req.Timestamp, nil + return Timestamp(t.req.Timestamp), nil } func (t *describeCollectionTask) Execute() error { @@ -199,7 +199,7 @@ func (t *showCollectionsTask) Ts() (Timestamp, error) { if t.req == nil { return 0, errors.New("null request") } - return t.req.Timestamp, nil + return Timestamp(t.req.Timestamp), nil } func (t *showCollectionsTask) Execute() error { diff --git a/internal/master/meta_table.go b/internal/master/meta_table.go index 9c7dbfdae6..3d4bcdf100 100644 --- a/internal/master/meta_table.go +++ b/internal/master/meta_table.go @@ -132,6 +132,17 @@ func (mt *metaTable) saveSegmentMeta(seg *pb.SegmentMeta) error { return mt.client.Save("/segment/"+strconv.FormatInt(seg.SegmentID, 10), string(segBytes)) } +// mt.ddLock.Lock() before call this function +func (mt *metaTable) deleteSegmentMeta(segID UniqueID) error { + _, ok := mt.segID2Meta[segID] + + if ok { + delete(mt.segID2Meta, segID) + } + + return mt.client.Remove("/segment/" + strconv.FormatInt(segID, 10)) +} + // mt.ddLock.Lock() before call this function func (mt *metaTable) saveCollectionAndDeleteSegmentsMeta(coll *pb.CollectionMeta, segIDs []UniqueID) error { segIDStrs := make([]string, 0, len(segIDs)) diff --git a/internal/master/meta_table_test.go b/internal/master/meta_table_test.go index df02a27ea5..cc2e532a1e 100644 --- a/internal/master/meta_table_test.go +++ b/internal/master/meta_table_test.go @@ -73,7 +73,7 @@ func TestMetaTable_Collection(t *testing.T) { Name: "coll2", }, CreateTime: 0, - SegmentIDs: []UniqueID{}, + SegmentIDs: []UniqueID{1}, PartitionTags: []string{"1"}, } segID1 := pb.SegmentMeta{ @@ -121,16 +121,11 @@ func TestMetaTable_Collection(t *testing.T) { assert.Nil(t, err) err = meta.AddSegment(&segID3) assert.Nil(t, err) - getColMeta, err := meta.GetCollectionByName("coll5") - assert.NotNil(t, err) - assert.Nil(t, getColMeta) - getColMeta, err = meta.GetCollectionByName(colMeta.Schema.Name) + getColMeta, err := meta.GetCollectionByName(colMeta.Schema.Name) assert.Nil(t, err) assert.Equal(t, 3, len(getColMeta.SegmentIDs)) err = meta.DeleteCollection(colMeta.ID) assert.Nil(t, err) - err = meta.DeleteCollection(500) - assert.NotNil(t, err) hasCollection = meta.HasCollection(colMeta.ID) assert.False(t, hasCollection) _, err = meta.GetSegmentByID(segID1.SegmentID) @@ -198,14 +193,10 @@ func TestMetaTable_DeletePartition(t *testing.T) { } err = meta.AddCollection(&colMeta) assert.Nil(t, err) - err = meta.AddPartition(500, "p1") - assert.NotNil(t, err) err = meta.AddPartition(colMeta.ID, "p1") assert.Nil(t, err) err = meta.AddPartition(colMeta.ID, "p2") assert.Nil(t, err) - err = meta.AddPartition(colMeta.ID, "p2") - assert.NotNil(t, err) err = meta.AddSegment(&segID1) assert.Nil(t, err) err = meta.AddSegment(&segID2) @@ -218,8 +209,6 @@ func TestMetaTable_DeletePartition(t *testing.T) { assert.Equal(t, 3, len(afterCollMeta.SegmentIDs)) err = meta.DeletePartition(100, "p1") assert.Nil(t, err) - err = meta.DeletePartition(500, "p1") - assert.NotNil(t, err) afterCollMeta, err = meta.GetCollectionByName("coll1") assert.Nil(t, err) assert.Equal(t, 1, len(afterCollMeta.PartitionTags)) @@ -296,16 +285,12 @@ func TestMetaTable_Segment(t *testing.T) { assert.Equal(t, &segMeta, getSegMeta) err = meta.CloseSegment(segMeta.SegmentID, Timestamp(11), 111) assert.Nil(t, err) - err = meta.CloseSegment(1000, Timestamp(11), 111) - assert.NotNil(t, err) getSegMeta, err = meta.GetSegmentByID(segMeta.SegmentID) assert.Nil(t, err) 
assert.Equal(t, getSegMeta.NumRows, int64(111)) assert.Equal(t, getSegMeta.CloseTime, uint64(11)) err = meta.DeleteSegment(segMeta.SegmentID) assert.Nil(t, err) - err = meta.DeleteSegment(1000) - assert.NotNil(t, err) getSegMeta, err = meta.GetSegmentByID(segMeta.SegmentID) assert.Nil(t, getSegMeta) assert.NotNil(t, err) diff --git a/internal/master/task_test.go b/internal/master/task_test.go deleted file mode 100644 index 3753fd6181..0000000000 --- a/internal/master/task_test.go +++ /dev/null @@ -1,302 +0,0 @@ -package master - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" -) - -func TestMaster_CreateCollectionTask(t *testing.T) { - req := internalpb.CreateCollectionRequest{ - MsgType: internalpb.MsgType_kCreateCollection, - ReqID: 1, - Timestamp: 11, - ProxyID: 1, - Schema: nil, - } - var collectionTask task = &createCollectionTask{ - req: &req, - baseTask: baseTask{}, - } - assert.Equal(t, internalpb.MsgType_kCreateCollection, collectionTask.Type()) - ts, err := collectionTask.Ts() - assert.Equal(t, uint64(11), ts) - assert.Nil(t, err) - - collectionTask = &createCollectionTask{ - req: nil, - baseTask: baseTask{}, - } - - assert.Equal(t, internalpb.MsgType_kNone, collectionTask.Type()) - ts, err = collectionTask.Ts() - assert.Equal(t, uint64(0), ts) - assert.NotNil(t, err) - err = collectionTask.Execute() - assert.NotNil(t, err) -} - -func TestMaster_DropCollectionTask(t *testing.T) { - req := internalpb.DropCollectionRequest{ - MsgType: internalpb.MsgType_kDropPartition, - ReqID: 1, - Timestamp: 11, - ProxyID: 1, - CollectionName: nil, - } - var collectionTask task = &dropCollectionTask{ - req: &req, - baseTask: baseTask{}, - } - assert.Equal(t, internalpb.MsgType_kDropPartition, collectionTask.Type()) - ts, err := collectionTask.Ts() - assert.Equal(t, uint64(11), ts) - assert.Nil(t, err) - - collectionTask = &dropCollectionTask{ - req: nil, - baseTask: baseTask{}, - } - - assert.Equal(t, internalpb.MsgType_kNone, collectionTask.Type()) - ts, err = collectionTask.Ts() - assert.Equal(t, uint64(0), ts) - assert.NotNil(t, err) - err = collectionTask.Execute() - assert.NotNil(t, err) -} - -func TestMaster_HasCollectionTask(t *testing.T) { - req := internalpb.HasCollectionRequest{ - MsgType: internalpb.MsgType_kHasCollection, - ReqID: 1, - Timestamp: 11, - ProxyID: 1, - CollectionName: nil, - } - var collectionTask task = &hasCollectionTask{ - req: &req, - baseTask: baseTask{}, - } - assert.Equal(t, internalpb.MsgType_kHasCollection, collectionTask.Type()) - ts, err := collectionTask.Ts() - assert.Equal(t, uint64(11), ts) - assert.Nil(t, err) - - collectionTask = &hasCollectionTask{ - req: nil, - baseTask: baseTask{}, - } - - assert.Equal(t, internalpb.MsgType_kNone, collectionTask.Type()) - ts, err = collectionTask.Ts() - assert.Equal(t, uint64(0), ts) - assert.NotNil(t, err) - err = collectionTask.Execute() - assert.NotNil(t, err) -} - -func TestMaster_ShowCollectionTask(t *testing.T) { - req := internalpb.ShowCollectionRequest{ - MsgType: internalpb.MsgType_kShowCollections, - ReqID: 1, - Timestamp: 11, - ProxyID: 1, - } - var collectionTask task = &showCollectionsTask{ - req: &req, - baseTask: baseTask{}, - } - assert.Equal(t, internalpb.MsgType_kShowCollections, collectionTask.Type()) - ts, err := collectionTask.Ts() - assert.Equal(t, uint64(11), ts) - assert.Nil(t, err) - - collectionTask = &showCollectionsTask{ - req: nil, - baseTask: baseTask{}, - } - - assert.Equal(t, internalpb.MsgType_kNone, 
collectionTask.Type()) - ts, err = collectionTask.Ts() - assert.Equal(t, uint64(0), ts) - assert.NotNil(t, err) - err = collectionTask.Execute() - assert.NotNil(t, err) -} - -func TestMaster_DescribeCollectionTask(t *testing.T) { - req := internalpb.DescribeCollectionRequest{ - MsgType: internalpb.MsgType_kDescribeCollection, - ReqID: 1, - Timestamp: 11, - ProxyID: 1, - CollectionName: nil, - } - var collectionTask task = &describeCollectionTask{ - req: &req, - baseTask: baseTask{}, - } - assert.Equal(t, internalpb.MsgType_kDescribeCollection, collectionTask.Type()) - ts, err := collectionTask.Ts() - assert.Equal(t, uint64(11), ts) - assert.Nil(t, err) - - collectionTask = &describeCollectionTask{ - req: nil, - baseTask: baseTask{}, - } - - assert.Equal(t, internalpb.MsgType_kNone, collectionTask.Type()) - ts, err = collectionTask.Ts() - assert.Equal(t, uint64(0), ts) - assert.NotNil(t, err) - err = collectionTask.Execute() - assert.NotNil(t, err) -} - -func TestMaster_CreatePartitionTask(t *testing.T) { - req := internalpb.CreatePartitionRequest{ - MsgType: internalpb.MsgType_kCreatePartition, - ReqID: 1, - Timestamp: 11, - ProxyID: 1, - PartitionName: nil, - } - var partitionTask task = &createPartitionTask{ - req: &req, - baseTask: baseTask{}, - } - assert.Equal(t, internalpb.MsgType_kCreatePartition, partitionTask.Type()) - ts, err := partitionTask.Ts() - assert.Equal(t, uint64(11), ts) - assert.Nil(t, err) - - partitionTask = &createPartitionTask{ - req: nil, - baseTask: baseTask{}, - } - - assert.Equal(t, internalpb.MsgType_kNone, partitionTask.Type()) - ts, err = partitionTask.Ts() - assert.Equal(t, uint64(0), ts) - assert.NotNil(t, err) - err = partitionTask.Execute() - assert.NotNil(t, err) -} -func TestMaster_DropPartitionTask(t *testing.T) { - req := internalpb.DropPartitionRequest{ - MsgType: internalpb.MsgType_kDropPartition, - ReqID: 1, - Timestamp: 11, - ProxyID: 1, - PartitionName: nil, - } - var partitionTask task = &dropPartitionTask{ - req: &req, - baseTask: baseTask{}, - } - assert.Equal(t, internalpb.MsgType_kDropPartition, partitionTask.Type()) - ts, err := partitionTask.Ts() - assert.Equal(t, uint64(11), ts) - assert.Nil(t, err) - - partitionTask = &dropPartitionTask{ - req: nil, - baseTask: baseTask{}, - } - - assert.Equal(t, internalpb.MsgType_kNone, partitionTask.Type()) - ts, err = partitionTask.Ts() - assert.Equal(t, uint64(0), ts) - assert.NotNil(t, err) - err = partitionTask.Execute() - assert.NotNil(t, err) -} -func TestMaster_HasPartitionTask(t *testing.T) { - req := internalpb.HasPartitionRequest{ - MsgType: internalpb.MsgType_kHasPartition, - ReqID: 1, - Timestamp: 11, - ProxyID: 1, - PartitionName: nil, - } - var partitionTask task = &hasPartitionTask{ - req: &req, - baseTask: baseTask{}, - } - assert.Equal(t, internalpb.MsgType_kHasPartition, partitionTask.Type()) - ts, err := partitionTask.Ts() - assert.Equal(t, uint64(11), ts) - assert.Nil(t, err) - - partitionTask = &hasPartitionTask{ - req: nil, - baseTask: baseTask{}, - } - - assert.Equal(t, internalpb.MsgType_kNone, partitionTask.Type()) - ts, err = partitionTask.Ts() - assert.Equal(t, uint64(0), ts) - assert.NotNil(t, err) - err = partitionTask.Execute() - assert.NotNil(t, err) -} -func TestMaster_DescribePartitionTask(t *testing.T) { - req := internalpb.DescribePartitionRequest{ - MsgType: internalpb.MsgType_kDescribePartition, - ReqID: 1, - Timestamp: 11, - ProxyID: 1, - PartitionName: nil, - } - var partitionTask task = &describePartitionTask{ - req: &req, - baseTask: baseTask{}, - } - 
assert.Equal(t, internalpb.MsgType_kDescribePartition, partitionTask.Type()) - ts, err := partitionTask.Ts() - assert.Equal(t, uint64(11), ts) - assert.Nil(t, err) - - partitionTask = &describePartitionTask{ - req: nil, - baseTask: baseTask{}, - } - - assert.Equal(t, internalpb.MsgType_kNone, partitionTask.Type()) - ts, err = partitionTask.Ts() - assert.Equal(t, uint64(0), ts) - assert.NotNil(t, err) - err = partitionTask.Execute() - assert.NotNil(t, err) -} -func TestMaster_ShowPartitionTask(t *testing.T) { - req := internalpb.ShowPartitionRequest{ - MsgType: internalpb.MsgType_kShowPartitions, - ReqID: 1, - Timestamp: 11, - ProxyID: 1, - } - var partitionTask task = &showPartitionTask{ - req: &req, - baseTask: baseTask{}, - } - assert.Equal(t, internalpb.MsgType_kShowPartitions, partitionTask.Type()) - ts, err := partitionTask.Ts() - assert.Equal(t, uint64(11), ts) - assert.Nil(t, err) - - partitionTask = &showPartitionTask{ - req: nil, - baseTask: baseTask{}, - } - - assert.Equal(t, internalpb.MsgType_kNone, partitionTask.Type()) - ts, err = partitionTask.Ts() - assert.Equal(t, uint64(0), ts) - assert.NotNil(t, err) - err = partitionTask.Execute() - assert.NotNil(t, err) -} diff --git a/internal/reader/collection_replica.go b/internal/reader/col_seg_container.go similarity index 52% rename from internal/reader/collection_replica.go rename to internal/reader/col_seg_container.go index 0de8917cbf..5020bf4ce7 100644 --- a/internal/reader/collection_replica.go +++ b/internal/reader/col_seg_container.go @@ -20,19 +20,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" ) -/* - * collectionReplica contains a in-memory local copy of persistent collections. - * In common cases, the system has multiple query nodes. Data of a collection will be - * distributed across all the available query nodes, and each query node's collectionReplica - * will maintain its own share (only part of the collection). - * Every replica tracks a value called tSafe which is the maximum timestamp that the replica - * is up-to-date. 
- */ -type collectionReplica interface { - // tSafe - getTSafe() Timestamp - setTSafe(t Timestamp) - +type container interface { // collection getCollectionNum() int addCollection(collMeta *etcdpb.CollectionMeta, colMetaBlob string) error @@ -56,51 +44,36 @@ type collectionReplica interface { hasSegment(segmentID UniqueID) bool } -type collectionReplicaImpl struct { - tSafeMu sync.Mutex - tSafe Timestamp - +// TODO: rename +type colSegContainer struct { mu sync.RWMutex collections []*Collection segments map[UniqueID]*Segment } -//----------------------------------------------------------------------------------------------------- tSafe -func (colReplica *collectionReplicaImpl) getTSafe() Timestamp { - colReplica.tSafeMu.Lock() - defer colReplica.tSafeMu.Unlock() - return colReplica.tSafe -} - -func (colReplica *collectionReplicaImpl) setTSafe(t Timestamp) { - colReplica.tSafeMu.Lock() - colReplica.tSafe = t - colReplica.tSafeMu.Unlock() -} - //----------------------------------------------------------------------------------------------------- collection -func (colReplica *collectionReplicaImpl) getCollectionNum() int { - colReplica.mu.RLock() - defer colReplica.mu.RUnlock() +func (container *colSegContainer) getCollectionNum() int { + container.mu.RLock() + defer container.mu.RUnlock() - return len(colReplica.collections) + return len(container.collections) } -func (colReplica *collectionReplicaImpl) addCollection(collMeta *etcdpb.CollectionMeta, colMetaBlob string) error { - colReplica.mu.Lock() - defer colReplica.mu.Unlock() +func (container *colSegContainer) addCollection(collMeta *etcdpb.CollectionMeta, colMetaBlob string) error { + container.mu.Lock() + defer container.mu.Unlock() var newCollection = newCollection(collMeta, colMetaBlob) - colReplica.collections = append(colReplica.collections, newCollection) + container.collections = append(container.collections, newCollection) return nil } -func (colReplica *collectionReplicaImpl) removeCollection(collectionID UniqueID) error { - collection, err := colReplica.getCollectionByID(collectionID) +func (container *colSegContainer) removeCollection(collectionID UniqueID) error { + collection, err := container.getCollectionByID(collectionID) - colReplica.mu.Lock() - defer colReplica.mu.Unlock() + container.mu.Lock() + defer container.mu.Unlock() if err != nil { return err @@ -109,11 +82,11 @@ func (colReplica *collectionReplicaImpl) removeCollection(collectionID UniqueID) deleteCollection(collection) tmpCollections := make([]*Collection, 0) - for _, col := range colReplica.collections { + for _, col := range container.collections { if col.ID() == collectionID { for _, p := range *col.Partitions() { for _, s := range *p.Segments() { - delete(colReplica.segments, s.ID()) + delete(container.segments, s.ID()) } } } else { @@ -121,15 +94,15 @@ func (colReplica *collectionReplicaImpl) removeCollection(collectionID UniqueID) } } - colReplica.collections = tmpCollections + container.collections = tmpCollections return nil } -func (colReplica *collectionReplicaImpl) getCollectionByID(collectionID UniqueID) (*Collection, error) { - colReplica.mu.RLock() - defer colReplica.mu.RUnlock() +func (container *colSegContainer) getCollectionByID(collectionID UniqueID) (*Collection, error) { + container.mu.RLock() + defer container.mu.RUnlock() - for _, collection := range colReplica.collections { + for _, collection := range container.collections { if collection.ID() == collectionID { return collection, nil } @@ -138,11 +111,11 @@ func (colReplica 
*collectionReplicaImpl) getCollectionByID(collectionID UniqueID return nil, errors.New("cannot find collection, id = " + strconv.FormatInt(collectionID, 10)) } -func (colReplica *collectionReplicaImpl) getCollectionByName(collectionName string) (*Collection, error) { - colReplica.mu.RLock() - defer colReplica.mu.RUnlock() +func (container *colSegContainer) getCollectionByName(collectionName string) (*Collection, error) { + container.mu.RLock() + defer container.mu.RUnlock() - for _, collection := range colReplica.collections { + for _, collection := range container.collections { if collection.Name() == collectionName { return collection, nil } @@ -152,14 +125,14 @@ func (colReplica *collectionReplicaImpl) getCollectionByName(collectionName stri } //----------------------------------------------------------------------------------------------------- partition -func (colReplica *collectionReplicaImpl) addPartition(collectionID UniqueID, partitionTag string) error { - collection, err := colReplica.getCollectionByID(collectionID) +func (container *colSegContainer) addPartition(collectionID UniqueID, partitionTag string) error { + collection, err := container.getCollectionByID(collectionID) if err != nil { return err } - colReplica.mu.Lock() - defer colReplica.mu.Unlock() + container.mu.Lock() + defer container.mu.Unlock() var newPartition = newPartition(partitionTag) @@ -167,20 +140,20 @@ func (colReplica *collectionReplicaImpl) addPartition(collectionID UniqueID, par return nil } -func (colReplica *collectionReplicaImpl) removePartition(collectionID UniqueID, partitionTag string) error { - collection, err := colReplica.getCollectionByID(collectionID) +func (container *colSegContainer) removePartition(collectionID UniqueID, partitionTag string) error { + collection, err := container.getCollectionByID(collectionID) if err != nil { return err } - colReplica.mu.Lock() - defer colReplica.mu.Unlock() + container.mu.Lock() + defer container.mu.Unlock() var tmpPartitions = make([]*Partition, 0) for _, p := range *collection.Partitions() { if p.Tag() == partitionTag { for _, s := range *p.Segments() { - delete(colReplica.segments, s.ID()) + delete(container.segments, s.ID()) } } else { tmpPartitions = append(tmpPartitions, p) @@ -191,14 +164,14 @@ func (colReplica *collectionReplicaImpl) removePartition(collectionID UniqueID, return nil } -func (colReplica *collectionReplicaImpl) getPartitionByTag(collectionID UniqueID, partitionTag string) (*Partition, error) { - collection, err := colReplica.getCollectionByID(collectionID) +func (container *colSegContainer) getPartitionByTag(collectionID UniqueID, partitionTag string) (*Partition, error) { + collection, err := container.getCollectionByID(collectionID) if err != nil { return nil, err } - colReplica.mu.RLock() - defer colReplica.mu.RUnlock() + container.mu.RLock() + defer container.mu.RUnlock() for _, p := range *collection.Partitions() { if p.Tag() == partitionTag { @@ -210,17 +183,17 @@ func (colReplica *collectionReplicaImpl) getPartitionByTag(collectionID UniqueID } //----------------------------------------------------------------------------------------------------- segment -func (colReplica *collectionReplicaImpl) getSegmentNum() int { - colReplica.mu.RLock() - defer colReplica.mu.RUnlock() +func (container *colSegContainer) getSegmentNum() int { + container.mu.RLock() + defer container.mu.RUnlock() - return len(colReplica.segments) + return len(container.segments) } -func (colReplica *collectionReplicaImpl) getSegmentStatistics() 
*internalpb.QueryNodeSegStats { +func (container *colSegContainer) getSegmentStatistics() *internalpb.QueryNodeSegStats { var statisticData = make([]*internalpb.SegmentStats, 0) - for segmentID, segment := range colReplica.segments { + for segmentID, segment := range container.segments { currentMemSize := segment.getMemSize() segment.lastMemSize = currentMemSize segmentNumOfRows := segment.getRowCount() @@ -242,36 +215,36 @@ func (colReplica *collectionReplicaImpl) getSegmentStatistics() *internalpb.Quer } } -func (colReplica *collectionReplicaImpl) addSegment(segmentID UniqueID, partitionTag string, collectionID UniqueID) error { - collection, err := colReplica.getCollectionByID(collectionID) +func (container *colSegContainer) addSegment(segmentID UniqueID, partitionTag string, collectionID UniqueID) error { + collection, err := container.getCollectionByID(collectionID) if err != nil { return err } - partition, err := colReplica.getPartitionByTag(collectionID, partitionTag) + partition, err := container.getPartitionByTag(collectionID, partitionTag) if err != nil { return err } - colReplica.mu.Lock() - defer colReplica.mu.Unlock() + container.mu.Lock() + defer container.mu.Unlock() var newSegment = newSegment(collection, segmentID) - colReplica.segments[segmentID] = newSegment + container.segments[segmentID] = newSegment *partition.Segments() = append(*partition.Segments(), newSegment) return nil } -func (colReplica *collectionReplicaImpl) removeSegment(segmentID UniqueID) error { - colReplica.mu.Lock() - defer colReplica.mu.Unlock() +func (container *colSegContainer) removeSegment(segmentID UniqueID) error { + container.mu.Lock() + defer container.mu.Unlock() var targetPartition *Partition var segmentIndex = -1 - for _, col := range colReplica.collections { + for _, col := range container.collections { for _, p := range *col.Partitions() { for i, s := range *p.Segments() { if s.ID() == segmentID { @@ -282,7 +255,7 @@ func (colReplica *collectionReplicaImpl) removeSegment(segmentID UniqueID) error } } - delete(colReplica.segments, segmentID) + delete(container.segments, segmentID) if targetPartition != nil && segmentIndex > 0 { targetPartition.segments = append(targetPartition.segments[:segmentIndex], targetPartition.segments[segmentIndex+1:]...) 
@@ -291,11 +264,11 @@ func (colReplica *collectionReplicaImpl) removeSegment(segmentID UniqueID) error return nil } -func (colReplica *collectionReplicaImpl) getSegmentByID(segmentID UniqueID) (*Segment, error) { - colReplica.mu.RLock() - defer colReplica.mu.RUnlock() +func (container *colSegContainer) getSegmentByID(segmentID UniqueID) (*Segment, error) { + container.mu.RLock() + defer container.mu.RUnlock() - targetSegment, ok := colReplica.segments[segmentID] + targetSegment, ok := container.segments[segmentID] if !ok { return nil, errors.New("cannot found segment with id = " + strconv.FormatInt(segmentID, 10)) @@ -304,11 +277,11 @@ func (colReplica *collectionReplicaImpl) getSegmentByID(segmentID UniqueID) (*Se return targetSegment, nil } -func (colReplica *collectionReplicaImpl) hasSegment(segmentID UniqueID) bool { - colReplica.mu.RLock() - defer colReplica.mu.RUnlock() +func (container *colSegContainer) hasSegment(segmentID UniqueID) bool { + container.mu.RLock() + defer container.mu.RUnlock() - _, ok := colReplica.segments[segmentID] + _, ok := container.segments[segmentID] return ok } diff --git a/internal/reader/collection_replica_test.go b/internal/reader/col_seg_container_test.go similarity index 74% rename from internal/reader/collection_replica_test.go rename to internal/reader/col_seg_container_test.go index 864cd38ed6..0636997c1e 100644 --- a/internal/reader/collection_replica_test.go +++ b/internal/reader/col_seg_container_test.go @@ -15,7 +15,8 @@ import ( //----------------------------------------------------------------------------------------------------- collection func TestColSegContainer_addCollection(t *testing.T) { ctx := context.Background() - node := NewQueryNode(ctx, 0) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) collectionName := "collection0" fieldVec := schemapb.FieldSchema{ @@ -58,19 +59,20 @@ func TestColSegContainer_addCollection(t *testing.T) { collectionMetaBlob := proto.MarshalTextString(&collectionMeta) assert.NotEqual(t, "", collectionMetaBlob) - var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) + var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob) assert.NoError(t, err) - collection, err := (*node.replica).getCollectionByName(collectionName) + collection, err := (*node.container).getCollectionByName(collectionName) assert.NoError(t, err) assert.Equal(t, collection.meta.Schema.Name, collectionName) assert.Equal(t, collection.meta.ID, UniqueID(0)) - assert.Equal(t, (*node.replica).getCollectionNum(), 1) + assert.Equal(t, (*node.container).getCollectionNum(), 1) } func TestColSegContainer_removeCollection(t *testing.T) { ctx := context.Background() - node := NewQueryNode(ctx, 0) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) collectionName := "collection0" collectionID := UniqueID(0) @@ -114,24 +116,25 @@ func TestColSegContainer_removeCollection(t *testing.T) { collectionMetaBlob := proto.MarshalTextString(&collectionMeta) assert.NotEqual(t, "", collectionMetaBlob) - var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) + var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob) assert.NoError(t, err) - collection, err := (*node.replica).getCollectionByName(collectionName) + collection, err := (*node.container).getCollectionByName(collectionName) assert.NoError(t, err) assert.Equal(t, collection.meta.Schema.Name, collectionName) assert.Equal(t, collection.meta.ID, UniqueID(0)) - 
assert.Equal(t, (*node.replica).getCollectionNum(), 1) + assert.Equal(t, (*node.container).getCollectionNum(), 1) - err = (*node.replica).removeCollection(collectionID) + err = (*node.container).removeCollection(collectionID) assert.NoError(t, err) - assert.Equal(t, (*node.replica).getCollectionNum(), 0) + assert.Equal(t, (*node.container).getCollectionNum(), 0) } func TestColSegContainer_getCollectionByID(t *testing.T) { ctx := context.Background() - node := NewQueryNode(ctx, 0) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) collectionName := "collection0" fieldVec := schemapb.FieldSchema{ @@ -174,17 +177,17 @@ func TestColSegContainer_getCollectionByID(t *testing.T) { collectionMetaBlob := proto.MarshalTextString(&collectionMeta) assert.NotEqual(t, "", collectionMetaBlob) - var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) + var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob) assert.NoError(t, err) - collection, err := (*node.replica).getCollectionByName(collectionName) + collection, err := (*node.container).getCollectionByName(collectionName) assert.NoError(t, err) assert.Equal(t, collection.meta.Schema.Name, collectionName) assert.Equal(t, collection.meta.ID, UniqueID(0)) - assert.Equal(t, (*node.replica).getCollectionNum(), 1) + assert.Equal(t, (*node.container).getCollectionNum(), 1) - targetCollection, err := (*node.replica).getCollectionByID(UniqueID(0)) + targetCollection, err := (*node.container).getCollectionByID(UniqueID(0)) assert.NoError(t, err) assert.NotNil(t, targetCollection) assert.Equal(t, targetCollection.meta.Schema.Name, "collection0") @@ -193,7 +196,8 @@ func TestColSegContainer_getCollectionByID(t *testing.T) { func TestColSegContainer_getCollectionByName(t *testing.T) { ctx := context.Background() - node := NewQueryNode(ctx, 0) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) collectionName := "collection0" fieldVec := schemapb.FieldSchema{ @@ -236,17 +240,17 @@ func TestColSegContainer_getCollectionByName(t *testing.T) { collectionMetaBlob := proto.MarshalTextString(&collectionMeta) assert.NotEqual(t, "", collectionMetaBlob) - var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) + var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob) assert.NoError(t, err) - collection, err := (*node.replica).getCollectionByName(collectionName) + collection, err := (*node.container).getCollectionByName(collectionName) assert.NoError(t, err) assert.Equal(t, collection.meta.Schema.Name, collectionName) assert.Equal(t, collection.meta.ID, UniqueID(0)) - assert.Equal(t, (*node.replica).getCollectionNum(), 1) + assert.Equal(t, (*node.container).getCollectionNum(), 1) - targetCollection, err := (*node.replica).getCollectionByName("collection0") + targetCollection, err := (*node.container).getCollectionByName("collection0") assert.NoError(t, err) assert.NotNil(t, targetCollection) assert.Equal(t, targetCollection.meta.Schema.Name, "collection0") @@ -256,7 +260,8 @@ func TestColSegContainer_getCollectionByName(t *testing.T) { //----------------------------------------------------------------------------------------------------- partition func TestColSegContainer_addPartition(t *testing.T) { ctx := context.Background() - node := NewQueryNode(ctx, 0) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) collectionName := "collection0" collectionID := UniqueID(0) @@ -300,20 +305,20 @@ func 
TestColSegContainer_addPartition(t *testing.T) { collectionMetaBlob := proto.MarshalTextString(&collectionMeta) assert.NotEqual(t, "", collectionMetaBlob) - var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) + var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob) assert.NoError(t, err) - collection, err := (*node.replica).getCollectionByName(collectionName) + collection, err := (*node.container).getCollectionByName(collectionName) assert.NoError(t, err) assert.Equal(t, collection.meta.Schema.Name, collectionName) assert.Equal(t, collection.meta.ID, collectionID) - assert.Equal(t, (*node.replica).getCollectionNum(), 1) + assert.Equal(t, (*node.container).getCollectionNum(), 1) for _, tag := range collectionMeta.PartitionTags { - err := (*node.replica).addPartition(collectionID, tag) + err := (*node.container).addPartition(collectionID, tag) assert.NoError(t, err) - partition, err := (*node.replica).getPartitionByTag(collectionID, tag) + partition, err := (*node.container).getPartitionByTag(collectionID, tag) assert.NoError(t, err) assert.Equal(t, partition.partitionTag, "default") } @@ -321,7 +326,8 @@ func TestColSegContainer_addPartition(t *testing.T) { func TestColSegContainer_removePartition(t *testing.T) { ctx := context.Background() - node := NewQueryNode(ctx, 0) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) collectionName := "collection0" collectionID := UniqueID(0) @@ -366,30 +372,31 @@ func TestColSegContainer_removePartition(t *testing.T) { collectionMetaBlob := proto.MarshalTextString(&collectionMeta) assert.NotEqual(t, "", collectionMetaBlob) - var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) + var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob) assert.NoError(t, err) - collection, err := (*node.replica).getCollectionByName(collectionName) + collection, err := (*node.container).getCollectionByName(collectionName) assert.NoError(t, err) assert.Equal(t, collection.meta.Schema.Name, collectionName) assert.Equal(t, collection.meta.ID, collectionID) - assert.Equal(t, (*node.replica).getCollectionNum(), 1) + assert.Equal(t, (*node.container).getCollectionNum(), 1) for _, tag := range collectionMeta.PartitionTags { - err := (*node.replica).addPartition(collectionID, tag) + err := (*node.container).addPartition(collectionID, tag) assert.NoError(t, err) - partition, err := (*node.replica).getPartitionByTag(collectionID, tag) + partition, err := (*node.container).getPartitionByTag(collectionID, tag) assert.NoError(t, err) assert.Equal(t, partition.partitionTag, partitionTag) - err = (*node.replica).removePartition(collectionID, partitionTag) + err = (*node.container).removePartition(collectionID, partitionTag) assert.NoError(t, err) } } func TestColSegContainer_getPartitionByTag(t *testing.T) { ctx := context.Background() - node := NewQueryNode(ctx, 0) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) collectionName := "collection0" collectionID := UniqueID(0) @@ -433,20 +440,20 @@ func TestColSegContainer_getPartitionByTag(t *testing.T) { collectionMetaBlob := proto.MarshalTextString(&collectionMeta) assert.NotEqual(t, "", collectionMetaBlob) - var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) + var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob) assert.NoError(t, err) - collection, err := (*node.replica).getCollectionByName(collectionName) + collection, err := 
(*node.container).getCollectionByName(collectionName) assert.NoError(t, err) assert.Equal(t, collection.meta.Schema.Name, collectionName) assert.Equal(t, collection.meta.ID, collectionID) - assert.Equal(t, (*node.replica).getCollectionNum(), 1) + assert.Equal(t, (*node.container).getCollectionNum(), 1) for _, tag := range collectionMeta.PartitionTags { - err := (*node.replica).addPartition(collectionID, tag) + err := (*node.container).addPartition(collectionID, tag) assert.NoError(t, err) - partition, err := (*node.replica).getPartitionByTag(collectionID, tag) + partition, err := (*node.container).getPartitionByTag(collectionID, tag) assert.NoError(t, err) assert.Equal(t, partition.partitionTag, "default") assert.NotNil(t, partition) @@ -456,7 +463,8 @@ func TestColSegContainer_getPartitionByTag(t *testing.T) { //----------------------------------------------------------------------------------------------------- segment func TestColSegContainer_addSegment(t *testing.T) { ctx := context.Background() - node := NewQueryNode(ctx, 0) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) collectionName := "collection0" collectionID := UniqueID(0) @@ -500,24 +508,24 @@ func TestColSegContainer_addSegment(t *testing.T) { collectionMetaBlob := proto.MarshalTextString(&collectionMeta) assert.NotEqual(t, "", collectionMetaBlob) - var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) + var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob) assert.NoError(t, err) - collection, err := (*node.replica).getCollectionByName(collectionName) + collection, err := (*node.container).getCollectionByName(collectionName) assert.NoError(t, err) assert.Equal(t, collection.meta.Schema.Name, collectionName) assert.Equal(t, collection.meta.ID, UniqueID(0)) - assert.Equal(t, (*node.replica).getCollectionNum(), 1) + assert.Equal(t, (*node.container).getCollectionNum(), 1) - err = (*node.replica).addPartition(collectionID, collectionMeta.PartitionTags[0]) + err = (*node.container).addPartition(collectionID, collectionMeta.PartitionTags[0]) assert.NoError(t, err) const segmentNum = 3 for i := 0; i < segmentNum; i++ { - err := (*node.replica).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID) + err := (*node.container).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID) assert.NoError(t, err) - targetSeg, err := (*node.replica).getSegmentByID(UniqueID(i)) + targetSeg, err := (*node.container).getSegmentByID(UniqueID(i)) assert.NoError(t, err) assert.Equal(t, targetSeg.segmentID, UniqueID(i)) } @@ -525,7 +533,8 @@ func TestColSegContainer_addSegment(t *testing.T) { func TestColSegContainer_removeSegment(t *testing.T) { ctx := context.Background() - node := NewQueryNode(ctx, 0) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) collectionName := "collection0" collectionID := UniqueID(0) @@ -569,34 +578,35 @@ func TestColSegContainer_removeSegment(t *testing.T) { collectionMetaBlob := proto.MarshalTextString(&collectionMeta) assert.NotEqual(t, "", collectionMetaBlob) - var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) + var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob) assert.NoError(t, err) - collection, err := (*node.replica).getCollectionByName(collectionName) + collection, err := (*node.container).getCollectionByName(collectionName) assert.NoError(t, err) assert.Equal(t, collection.meta.Schema.Name, collectionName) 
assert.Equal(t, collection.meta.ID, UniqueID(0)) - assert.Equal(t, (*node.replica).getCollectionNum(), 1) + assert.Equal(t, (*node.container).getCollectionNum(), 1) - err = (*node.replica).addPartition(collectionID, collectionMeta.PartitionTags[0]) + err = (*node.container).addPartition(collectionID, collectionMeta.PartitionTags[0]) assert.NoError(t, err) const segmentNum = 3 for i := 0; i < segmentNum; i++ { - err := (*node.replica).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID) + err := (*node.container).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID) assert.NoError(t, err) - targetSeg, err := (*node.replica).getSegmentByID(UniqueID(i)) + targetSeg, err := (*node.container).getSegmentByID(UniqueID(i)) assert.NoError(t, err) assert.Equal(t, targetSeg.segmentID, UniqueID(i)) - err = (*node.replica).removeSegment(UniqueID(i)) + err = (*node.container).removeSegment(UniqueID(i)) assert.NoError(t, err) } } func TestColSegContainer_getSegmentByID(t *testing.T) { ctx := context.Background() - node := NewQueryNode(ctx, 0) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) collectionName := "collection0" collectionID := UniqueID(0) @@ -640,24 +650,24 @@ func TestColSegContainer_getSegmentByID(t *testing.T) { collectionMetaBlob := proto.MarshalTextString(&collectionMeta) assert.NotEqual(t, "", collectionMetaBlob) - var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) + var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob) assert.NoError(t, err) - collection, err := (*node.replica).getCollectionByName(collectionName) + collection, err := (*node.container).getCollectionByName(collectionName) assert.NoError(t, err) assert.Equal(t, collection.meta.Schema.Name, collectionName) assert.Equal(t, collection.meta.ID, UniqueID(0)) - assert.Equal(t, (*node.replica).getCollectionNum(), 1) + assert.Equal(t, (*node.container).getCollectionNum(), 1) - err = (*node.replica).addPartition(collectionID, collectionMeta.PartitionTags[0]) + err = (*node.container).addPartition(collectionID, collectionMeta.PartitionTags[0]) assert.NoError(t, err) const segmentNum = 3 for i := 0; i < segmentNum; i++ { - err := (*node.replica).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID) + err := (*node.container).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID) assert.NoError(t, err) - targetSeg, err := (*node.replica).getSegmentByID(UniqueID(i)) + targetSeg, err := (*node.container).getSegmentByID(UniqueID(i)) assert.NoError(t, err) assert.Equal(t, targetSeg.segmentID, UniqueID(i)) } @@ -665,7 +675,8 @@ func TestColSegContainer_getSegmentByID(t *testing.T) { func TestColSegContainer_hasSegment(t *testing.T) { ctx := context.Background() - node := NewQueryNode(ctx, 0) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) collectionName := "collection0" collectionID := UniqueID(0) @@ -709,29 +720,29 @@ func TestColSegContainer_hasSegment(t *testing.T) { collectionMetaBlob := proto.MarshalTextString(&collectionMeta) assert.NotEqual(t, "", collectionMetaBlob) - var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) + var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob) assert.NoError(t, err) - collection, err := (*node.replica).getCollectionByName(collectionName) + collection, err := (*node.container).getCollectionByName(collectionName) assert.NoError(t, err) assert.Equal(t, 
collection.meta.Schema.Name, collectionName) assert.Equal(t, collection.meta.ID, UniqueID(0)) - assert.Equal(t, (*node.replica).getCollectionNum(), 1) + assert.Equal(t, (*node.container).getCollectionNum(), 1) - err = (*node.replica).addPartition(collectionID, collectionMeta.PartitionTags[0]) + err = (*node.container).addPartition(collectionID, collectionMeta.PartitionTags[0]) assert.NoError(t, err) const segmentNum = 3 for i := 0; i < segmentNum; i++ { - err := (*node.replica).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID) + err := (*node.container).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID) assert.NoError(t, err) - targetSeg, err := (*node.replica).getSegmentByID(UniqueID(i)) + targetSeg, err := (*node.container).getSegmentByID(UniqueID(i)) assert.NoError(t, err) assert.Equal(t, targetSeg.segmentID, UniqueID(i)) - hasSeg := (*node.replica).hasSegment(UniqueID(i)) + hasSeg := (*node.container).hasSegment(UniqueID(i)) assert.Equal(t, hasSeg, true) - hasSeg = (*node.replica).hasSegment(UniqueID(i + 100)) + hasSeg = (*node.container).hasSegment(UniqueID(i + 100)) assert.Equal(t, hasSeg, false) } } diff --git a/internal/reader/collection_test.go b/internal/reader/collection_test.go index 58d8747a2e..01cae1211b 100644 --- a/internal/reader/collection_test.go +++ b/internal/reader/collection_test.go @@ -13,7 +13,8 @@ import ( func TestCollection_Partitions(t *testing.T) { ctx := context.Background() - node := NewQueryNode(ctx, 0) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) collectionName := "collection0" fieldVec := schemapb.FieldSchema{ @@ -56,18 +57,18 @@ func TestCollection_Partitions(t *testing.T) { collectionMetaBlob := proto.MarshalTextString(&collectionMeta) assert.NotEqual(t, "", collectionMetaBlob) - var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) + var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob) assert.NoError(t, err) - collection, err := (*node.replica).getCollectionByName(collectionName) + collection, err := (*node.container).getCollectionByName(collectionName) assert.NoError(t, err) assert.Equal(t, collection.meta.Schema.Name, "collection0") assert.Equal(t, collection.meta.ID, UniqueID(0)) - assert.Equal(t, (*node.replica).getCollectionNum(), 1) + assert.Equal(t, (*node.container).getCollectionNum(), 1) for _, tag := range collectionMeta.PartitionTags { - err := (*node.replica).addPartition(collection.ID(), tag) + err := (*node.container).addPartition(collection.ID(), tag) assert.NoError(t, err) } diff --git a/internal/reader/data_sync_service.go b/internal/reader/data_sync_service.go index ffc24e382c..1f2dba1f4e 100644 --- a/internal/reader/data_sync_service.go +++ b/internal/reader/data_sync_service.go @@ -4,23 +4,33 @@ import ( "context" "log" + "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/util/flowgraph" ) type dataSyncService struct { - ctx context.Context - fg *flowgraph.TimeTickedFlowGraph + ctx context.Context + pulsarURL string + fg *flowgraph.TimeTickedFlowGraph - replica *collectionReplica + // input streams + dmStream *msgstream.MsgStream + // ddStream *msgstream.MsgStream + // k2sStream *msgstream.MsgStream + + node *QueryNode } -func newDataSyncService(ctx context.Context, replica *collectionReplica) *dataSyncService { +func newDataSyncService(ctx context.Context, node *QueryNode, pulsarURL string) *dataSyncService { return &dataSyncService{ - ctx: ctx, 
- fg: nil, + ctx: ctx, + pulsarURL: pulsarURL, + fg: nil, - replica: replica, + dmStream: nil, + + node: node, } } @@ -31,6 +41,7 @@ func (dsService *dataSyncService) start() { func (dsService *dataSyncService) close() { dsService.fg.Close() + (*dsService.dmStream).Close() } func (dsService *dataSyncService) initNodes() { @@ -38,10 +49,10 @@ func (dsService *dataSyncService) initNodes() { dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx) - var dmStreamNode Node = newDmInputNode(dsService.ctx) + var dmStreamNode Node = newDmInputNode(dsService.ctx, dsService.pulsarURL) var filterDmNode Node = newFilteredDmNode() - var insertNode Node = newInsertNode(dsService.replica) - var serviceTimeNode Node = newServiceTimeNode(dsService.replica) + var insertNode Node = newInsertNode(dsService.node.container) + var serviceTimeNode Node = newServiceTimeNode(dsService.node) dsService.fg.AddNode(&dmStreamNode) dsService.fg.AddNode(&filterDmNode) @@ -79,4 +90,21 @@ func (dsService *dataSyncService) initNodes() { if err != nil { log.Fatal("set edges failed in node:", serviceTimeNode.Name()) } + + dsService.setDmStream(&dmStreamNode) } + +func (dsService *dataSyncService) setDmStream(node *Node) { + if (*node).IsInputNode() { + inStream, ok := (*node).(*InputNode) + dsService.dmStream = inStream.InStream() + if !ok { + log.Fatal("Invalid inputNode") + } + } else { + log.Fatal("stream set failed") + } +} + +func (dsService *dataSyncService) setDdStream(node *Node) {} +func (dsService *dataSyncService) setK2sStream(node *Node) {} diff --git a/internal/reader/data_sync_service_test.go b/internal/reader/data_sync_service_test.go index a3dbaa9b6e..4284b74727 100644 --- a/internal/reader/data_sync_service_test.go +++ b/internal/reader/data_sync_service_test.go @@ -19,7 +19,6 @@ import ( // NOTE: start pulsar before test func TestManipulationService_Start(t *testing.T) { - Params.Init() var ctx context.Context if closeWithDeadline { @@ -33,7 +32,7 @@ func TestManipulationService_Start(t *testing.T) { // init query node pulsarURL := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0) + node := NewQueryNode(ctx, 0, pulsarURL) // init meta collectionName := "collection0" @@ -77,20 +76,20 @@ func TestManipulationService_Start(t *testing.T) { collectionMetaBlob := proto.MarshalTextString(&collectionMeta) assert.NotEqual(t, "", collectionMetaBlob) - var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) + var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob) assert.NoError(t, err) - collection, err := (*node.replica).getCollectionByName(collectionName) + collection, err := (*node.container).getCollectionByName(collectionName) assert.NoError(t, err) assert.Equal(t, collection.meta.Schema.Name, "collection0") assert.Equal(t, collection.meta.ID, UniqueID(0)) - assert.Equal(t, (*node.replica).getCollectionNum(), 1) + assert.Equal(t, (*node.container).getCollectionNum(), 1) - err = (*node.replica).addPartition(collection.ID(), collectionMeta.PartitionTags[0]) + err = (*node.container).addPartition(collection.ID(), collectionMeta.PartitionTags[0]) assert.NoError(t, err) segmentID := UniqueID(0) - err = (*node.replica).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0)) + err = (*node.container).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0)) assert.NoError(t, err) // test data generate @@ -169,7 +168,7 @@ func TestManipulationService_Start(t *testing.T) { assert.NoError(t, err) // dataSync - node.dataSyncService = 
newDataSyncService(node.ctx, node.replica) + node.dataSyncService = newDataSyncService(node.ctx, node, node.pulsarURL) go node.dataSyncService.start() node.Close() diff --git a/internal/reader/flow_graph_insert_node.go b/internal/reader/flow_graph_insert_node.go index cb8c9b29ab..77500545e8 100644 --- a/internal/reader/flow_graph_insert_node.go +++ b/internal/reader/flow_graph_insert_node.go @@ -10,7 +10,7 @@ import ( type insertNode struct { BaseNode - replica *collectionReplica + container *container } type InsertData struct { @@ -58,13 +58,13 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg { insertData.insertRecords[task.SegmentID] = append(insertData.insertRecords[task.SegmentID], task.RowData...) // check if segment exists, if not, create this segment - if !(*iNode.replica).hasSegment(task.SegmentID) { - collection, err := (*iNode.replica).getCollectionByName(task.CollectionName) + if !(*iNode.container).hasSegment(task.SegmentID) { + collection, err := (*iNode.container).getCollectionByName(task.CollectionName) if err != nil { log.Println(err) continue } - err = (*iNode.replica).addSegment(task.SegmentID, task.PartitionTag, collection.ID()) + err = (*iNode.container).addSegment(task.SegmentID, task.PartitionTag, collection.ID()) if err != nil { log.Println(err) continue @@ -74,7 +74,7 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg { // 2. do preInsert for segmentID := range insertData.insertRecords { - var targetSegment, err = (*iNode.replica).getSegmentByID(segmentID) + var targetSegment, err = (*iNode.container).getSegmentByID(segmentID) if err != nil { log.Println("preInsert failed") // TODO: add error handling @@ -102,7 +102,7 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg { } func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *sync.WaitGroup) { - var targetSegment, err = (*iNode.replica).getSegmentByID(segmentID) + var targetSegment, err = (*iNode.container).getSegmentByID(segmentID) if err != nil { log.Println("cannot find segment:", segmentID) // TODO: add error handling @@ -125,13 +125,13 @@ func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *syn wg.Done() } -func newInsertNode(replica *collectionReplica) *insertNode { +func newInsertNode(container *container) *insertNode { baseNode := BaseNode{} baseNode.SetMaxQueueLength(maxQueueLength) baseNode.SetMaxParallelism(maxParallelism) return &insertNode{ - BaseNode: baseNode, - replica: replica, + BaseNode: baseNode, + container: container, } } diff --git a/internal/reader/flow_graph_msg_stream_input_nodes.go b/internal/reader/flow_graph_msg_stream_input_nodes.go index 5eec6306c0..d00574c078 100644 --- a/internal/reader/flow_graph_msg_stream_input_nodes.go +++ b/internal/reader/flow_graph_msg_stream_input_nodes.go @@ -2,28 +2,22 @@ package reader import ( "context" - "log" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/util/flowgraph" ) -func newDmInputNode(ctx context.Context) *flowgraph.InputNode { +func newDmInputNode(ctx context.Context, pulsarURL string) *flowgraph.InputNode { const ( receiveBufSize = 1024 pulsarBufSize = 1024 ) - msgStreamURL, err := Params.PulsarAddress() - if err != nil { - log.Fatal(err) - } - consumeChannels := []string{"insert"} consumeSubName := "insertSub" insertStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize) - insertStream.SetPulsarCient(msgStreamURL) + insertStream.SetPulsarCient(pulsarURL) unmarshalDispatcher := msgstream.NewUnmarshalDispatcher() 
insertStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize) diff --git a/internal/reader/flow_graph_service_time_node.go b/internal/reader/flow_graph_service_time_node.go index 25c6d74028..50ca674ff8 100644 --- a/internal/reader/flow_graph_service_time_node.go +++ b/internal/reader/flow_graph_service_time_node.go @@ -6,7 +6,7 @@ import ( type serviceTimeNode struct { BaseNode - replica *collectionReplica + node *QueryNode } func (stNode *serviceTimeNode) Name() string { @@ -28,17 +28,17 @@ func (stNode *serviceTimeNode) Operate(in []*Msg) []*Msg { } // update service time - (*stNode.replica).setTSafe(serviceTimeMsg.timeRange.timestampMax) + stNode.node.tSafe.setTSafe(serviceTimeMsg.timeRange.timestampMax) return nil } -func newServiceTimeNode(replica *collectionReplica) *serviceTimeNode { +func newServiceTimeNode(node *QueryNode) *serviceTimeNode { baseNode := BaseNode{} baseNode.SetMaxQueueLength(maxQueueLength) baseNode.SetMaxParallelism(maxParallelism) return &serviceTimeNode{ BaseNode: baseNode, - replica: replica, + node: node, } } diff --git a/internal/reader/meta_service.go b/internal/reader/meta_service.go index 2b98161b2a..8af38ca830 100644 --- a/internal/reader/meta_service.go +++ b/internal/reader/meta_service.go @@ -24,12 +24,12 @@ const ( ) type metaService struct { - ctx context.Context - kvBase *kv.EtcdKV - replica *collectionReplica + ctx context.Context + kvBase *kv.EtcdKV + container *container } -func newMetaService(ctx context.Context, replica *collectionReplica) *metaService { +func newMetaService(ctx context.Context, container *container) *metaService { ETCDAddr, err := Params.EtcdAddress() if err != nil { panic(err) @@ -46,9 +46,9 @@ func newMetaService(ctx context.Context, replica *collectionReplica) *metaServic }) return &metaService{ - ctx: ctx, - kvBase: kv.NewEtcdKV(cli, ETCDRootPath), - replica: replica, + ctx: ctx, + kvBase: kv.NewEtcdKV(cli, ETCDRootPath), + container: container, } } @@ -164,12 +164,12 @@ func (mService *metaService) processCollectionCreate(id string, value string) { col := mService.collectionUnmarshal(value) if col != nil { - err := (*mService.replica).addCollection(col, value) + err := (*mService.container).addCollection(col, value) if err != nil { log.Println(err) } for _, partitionTag := range col.PartitionTags { - err = (*mService.replica).addPartition(col.ID, partitionTag) + err = (*mService.container).addPartition(col.ID, partitionTag) if err != nil { log.Println(err) } @@ -187,7 +187,7 @@ func (mService *metaService) processSegmentCreate(id string, value string) { // TODO: what if seg == nil? 
We need to notify master and return rpc request failed if seg != nil { - err := (*mService.replica).addSegment(seg.SegmentID, seg.PartitionTag, seg.CollectionID) + err := (*mService.container).addSegment(seg.SegmentID, seg.PartitionTag, seg.CollectionID) if err != nil { log.Println(err) return @@ -216,7 +216,7 @@ func (mService *metaService) processSegmentModify(id string, value string) { } if seg != nil { - targetSegment, err := (*mService.replica).getSegmentByID(seg.SegmentID) + targetSegment, err := (*mService.container).getSegmentByID(seg.SegmentID) if err != nil { log.Println(err) return @@ -251,7 +251,7 @@ func (mService *metaService) processSegmentDelete(id string) { log.Println("Cannot parse segment id:" + id) } - err = (*mService.replica).removeSegment(segmentID) + err = (*mService.container).removeSegment(segmentID) if err != nil { log.Println(err) return @@ -266,7 +266,7 @@ func (mService *metaService) processCollectionDelete(id string) { log.Println("Cannot parse collection id:" + id) } - err = (*mService.replica).removeCollection(collectionID) + err = (*mService.container).removeCollection(collectionID) if err != nil { log.Println(err) return diff --git a/internal/reader/meta_service_test.go b/internal/reader/meta_service_test.go index b2c908aca3..4c366b3535 100644 --- a/internal/reader/meta_service_test.go +++ b/internal/reader/meta_service_test.go @@ -27,8 +27,9 @@ func TestMetaService_start(t *testing.T) { } // init query node - node := NewQueryNode(ctx, 0) - node.metaService = newMetaService(ctx, node.replica) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) + node.metaService = newMetaService(ctx, node.container) (*node.metaService).start() } @@ -186,8 +187,9 @@ func TestMetaService_processCollectionCreate(t *testing.T) { defer cancel() // init metaService - node := NewQueryNode(ctx, 0) - node.metaService = newMetaService(ctx, node.replica) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) + node.metaService = newMetaService(ctx, node.container) id := "0" value := `schema: < @@ -215,10 +217,10 @@ func TestMetaService_processCollectionCreate(t *testing.T) { node.metaService.processCollectionCreate(id, value) - collectionNum := (*node.replica).getCollectionNum() + collectionNum := (*node.container).getCollectionNum() assert.Equal(t, collectionNum, 1) - collection, err := (*node.replica).getCollectionByName("test") + collection, err := (*node.container).getCollectionByName("test") assert.NoError(t, err) assert.Equal(t, collection.ID(), UniqueID(0)) } @@ -231,8 +233,9 @@ func TestMetaService_processSegmentCreate(t *testing.T) { defer cancel() // init metaService - node := NewQueryNode(ctx, 0) - node.metaService = newMetaService(ctx, node.replica) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) + node.metaService = newMetaService(ctx, node.container) collectionName := "collection0" fieldVec := schemapb.FieldSchema{ @@ -275,10 +278,10 @@ func TestMetaService_processSegmentCreate(t *testing.T) { colMetaBlob, err := proto.Marshal(&collectionMeta) assert.NoError(t, err) - err = (*node.replica).addCollection(&collectionMeta, string(colMetaBlob)) + err = (*node.container).addCollection(&collectionMeta, string(colMetaBlob)) assert.NoError(t, err) - err = (*node.replica).addPartition(UniqueID(0), "default") + err = (*node.container).addPartition(UniqueID(0), "default") assert.NoError(t, err) id := "0" @@ -290,7 +293,7 @@ func TestMetaService_processSegmentCreate(t *testing.T) { 
(*node.metaService).processSegmentCreate(id, value) - s, err := (*node.replica).getSegmentByID(UniqueID(0)) + s, err := (*node.container).getSegmentByID(UniqueID(0)) assert.NoError(t, err) assert.Equal(t, s.segmentID, UniqueID(0)) } @@ -303,8 +306,9 @@ func TestMetaService_processCreate(t *testing.T) { defer cancel() // init metaService - node := NewQueryNode(ctx, 0) - node.metaService = newMetaService(ctx, node.replica) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) + node.metaService = newMetaService(ctx, node.container) key1 := "by-dev/collection/0" msg1 := `schema: < @@ -331,10 +335,10 @@ func TestMetaService_processCreate(t *testing.T) { ` (*node.metaService).processCreate(key1, msg1) - collectionNum := (*node.replica).getCollectionNum() + collectionNum := (*node.container).getCollectionNum() assert.Equal(t, collectionNum, 1) - collection, err := (*node.replica).getCollectionByName("test") + collection, err := (*node.container).getCollectionByName("test") assert.NoError(t, err) assert.Equal(t, collection.ID(), UniqueID(0)) @@ -346,7 +350,7 @@ func TestMetaService_processCreate(t *testing.T) { ` (*node.metaService).processCreate(key2, msg2) - s, err := (*node.replica).getSegmentByID(UniqueID(0)) + s, err := (*node.container).getSegmentByID(UniqueID(0)) assert.NoError(t, err) assert.Equal(t, s.segmentID, UniqueID(0)) } @@ -359,8 +363,9 @@ func TestMetaService_processSegmentModify(t *testing.T) { defer cancel() // init metaService - node := NewQueryNode(ctx, 0) - node.metaService = newMetaService(ctx, node.replica) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) + node.metaService = newMetaService(ctx, node.container) collectionName := "collection0" fieldVec := schemapb.FieldSchema{ @@ -403,10 +408,10 @@ func TestMetaService_processSegmentModify(t *testing.T) { colMetaBlob, err := proto.Marshal(&collectionMeta) assert.NoError(t, err) - err = (*node.replica).addCollection(&collectionMeta, string(colMetaBlob)) + err = (*node.container).addCollection(&collectionMeta, string(colMetaBlob)) assert.NoError(t, err) - err = (*node.replica).addPartition(UniqueID(0), "default") + err = (*node.container).addPartition(UniqueID(0), "default") assert.NoError(t, err) id := "0" @@ -417,7 +422,7 @@ func TestMetaService_processSegmentModify(t *testing.T) { ` (*node.metaService).processSegmentCreate(id, value) - s, err := (*node.replica).getSegmentByID(UniqueID(0)) + s, err := (*node.container).getSegmentByID(UniqueID(0)) assert.NoError(t, err) assert.Equal(t, s.segmentID, UniqueID(0)) @@ -429,7 +434,7 @@ func TestMetaService_processSegmentModify(t *testing.T) { // TODO: modify segment for testing processCollectionModify (*node.metaService).processSegmentModify(id, newValue) - seg, err := (*node.replica).getSegmentByID(UniqueID(0)) + seg, err := (*node.container).getSegmentByID(UniqueID(0)) assert.NoError(t, err) assert.Equal(t, seg.segmentID, UniqueID(0)) } @@ -442,8 +447,9 @@ func TestMetaService_processCollectionModify(t *testing.T) { defer cancel() // init metaService - node := NewQueryNode(ctx, 0) - node.metaService = newMetaService(ctx, node.replica) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) + node.metaService = newMetaService(ctx, node.container) id := "0" value := `schema: < @@ -470,10 +476,10 @@ func TestMetaService_processCollectionModify(t *testing.T) { ` (*node.metaService).processCollectionCreate(id, value) - collectionNum := (*node.replica).getCollectionNum() + collectionNum := 
(*node.container).getCollectionNum() assert.Equal(t, collectionNum, 1) - collection, err := (*node.replica).getCollectionByName("test") + collection, err := (*node.container).getCollectionByName("test") assert.NoError(t, err) assert.Equal(t, collection.ID(), UniqueID(0)) @@ -502,7 +508,7 @@ func TestMetaService_processCollectionModify(t *testing.T) { ` (*node.metaService).processCollectionModify(id, newValue) - collection, err = (*node.replica).getCollectionByName("test") + collection, err = (*node.container).getCollectionByName("test") assert.NoError(t, err) assert.Equal(t, collection.ID(), UniqueID(0)) } @@ -515,8 +521,9 @@ func TestMetaService_processModify(t *testing.T) { defer cancel() // init metaService - node := NewQueryNode(ctx, 0) - node.metaService = newMetaService(ctx, node.replica) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) + node.metaService = newMetaService(ctx, node.container) key1 := "by-dev/collection/0" msg1 := `schema: < @@ -543,10 +550,10 @@ func TestMetaService_processModify(t *testing.T) { ` (*node.metaService).processCreate(key1, msg1) - collectionNum := (*node.replica).getCollectionNum() + collectionNum := (*node.container).getCollectionNum() assert.Equal(t, collectionNum, 1) - collection, err := (*node.replica).getCollectionByName("test") + collection, err := (*node.container).getCollectionByName("test") assert.NoError(t, err) assert.Equal(t, collection.ID(), UniqueID(0)) @@ -558,7 +565,7 @@ func TestMetaService_processModify(t *testing.T) { ` (*node.metaService).processCreate(key2, msg2) - s, err := (*node.replica).getSegmentByID(UniqueID(0)) + s, err := (*node.container).getSegmentByID(UniqueID(0)) assert.NoError(t, err) assert.Equal(t, s.segmentID, UniqueID(0)) @@ -588,7 +595,7 @@ func TestMetaService_processModify(t *testing.T) { ` (*node.metaService).processModify(key1, msg3) - collection, err = (*node.replica).getCollectionByName("test") + collection, err = (*node.container).getCollectionByName("test") assert.NoError(t, err) assert.Equal(t, collection.ID(), UniqueID(0)) @@ -600,7 +607,7 @@ func TestMetaService_processModify(t *testing.T) { // TODO: modify segment for testing processCollectionModify (*node.metaService).processModify(key2, msg4) - seg, err := (*node.replica).getSegmentByID(UniqueID(0)) + seg, err := (*node.container).getSegmentByID(UniqueID(0)) assert.NoError(t, err) assert.Equal(t, seg.segmentID, UniqueID(0)) } @@ -613,8 +620,9 @@ func TestMetaService_processSegmentDelete(t *testing.T) { defer cancel() // init metaService - node := NewQueryNode(ctx, 0) - node.metaService = newMetaService(ctx, node.replica) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) + node.metaService = newMetaService(ctx, node.container) collectionName := "collection0" fieldVec := schemapb.FieldSchema{ @@ -657,10 +665,10 @@ func TestMetaService_processSegmentDelete(t *testing.T) { colMetaBlob, err := proto.Marshal(&collectionMeta) assert.NoError(t, err) - err = (*node.replica).addCollection(&collectionMeta, string(colMetaBlob)) + err = (*node.container).addCollection(&collectionMeta, string(colMetaBlob)) assert.NoError(t, err) - err = (*node.replica).addPartition(UniqueID(0), "default") + err = (*node.container).addPartition(UniqueID(0), "default") assert.NoError(t, err) id := "0" @@ -671,12 +679,12 @@ func TestMetaService_processSegmentDelete(t *testing.T) { ` (*node.metaService).processSegmentCreate(id, value) - seg, err := (*node.replica).getSegmentByID(UniqueID(0)) + seg, err := 
(*node.container).getSegmentByID(UniqueID(0)) assert.NoError(t, err) assert.Equal(t, seg.segmentID, UniqueID(0)) (*node.metaService).processSegmentDelete("0") - mapSize := (*node.replica).getSegmentNum() + mapSize := (*node.container).getSegmentNum() assert.Equal(t, mapSize, 0) } @@ -688,8 +696,9 @@ func TestMetaService_processCollectionDelete(t *testing.T) { defer cancel() // init metaService - node := NewQueryNode(ctx, 0) - node.metaService = newMetaService(ctx, node.replica) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) + node.metaService = newMetaService(ctx, node.container) id := "0" value := `schema: < @@ -716,15 +725,15 @@ func TestMetaService_processCollectionDelete(t *testing.T) { ` (*node.metaService).processCollectionCreate(id, value) - collectionNum := (*node.replica).getCollectionNum() + collectionNum := (*node.container).getCollectionNum() assert.Equal(t, collectionNum, 1) - collection, err := (*node.replica).getCollectionByName("test") + collection, err := (*node.container).getCollectionByName("test") assert.NoError(t, err) assert.Equal(t, collection.ID(), UniqueID(0)) (*node.metaService).processCollectionDelete(id) - collectionNum = (*node.replica).getCollectionNum() + collectionNum = (*node.container).getCollectionNum() assert.Equal(t, collectionNum, 0) } @@ -736,8 +745,9 @@ func TestMetaService_processDelete(t *testing.T) { defer cancel() // init metaService - node := NewQueryNode(ctx, 0) - node.metaService = newMetaService(ctx, node.replica) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) + node.metaService = newMetaService(ctx, node.container) key1 := "by-dev/collection/0" msg1 := `schema: < @@ -764,10 +774,10 @@ func TestMetaService_processDelete(t *testing.T) { ` (*node.metaService).processCreate(key1, msg1) - collectionNum := (*node.replica).getCollectionNum() + collectionNum := (*node.container).getCollectionNum() assert.Equal(t, collectionNum, 1) - collection, err := (*node.replica).getCollectionByName("test") + collection, err := (*node.container).getCollectionByName("test") assert.NoError(t, err) assert.Equal(t, collection.ID(), UniqueID(0)) @@ -779,15 +789,15 @@ func TestMetaService_processDelete(t *testing.T) { ` (*node.metaService).processCreate(key2, msg2) - seg, err := (*node.replica).getSegmentByID(UniqueID(0)) + seg, err := (*node.container).getSegmentByID(UniqueID(0)) assert.NoError(t, err) assert.Equal(t, seg.segmentID, UniqueID(0)) (*node.metaService).processDelete(key1) - collectionsSize := (*node.replica).getCollectionNum() + collectionsSize := (*node.container).getCollectionNum() assert.Equal(t, collectionsSize, 0) - mapSize := (*node.replica).getSegmentNum() + mapSize := (*node.container).getSegmentNum() assert.Equal(t, mapSize, 0) } @@ -805,8 +815,9 @@ func TestMetaService_processResp(t *testing.T) { } // init metaService - node := NewQueryNode(ctx, 0) - node.metaService = newMetaService(ctx, node.replica) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) + node.metaService = newMetaService(ctx, node.container) metaChan := (*node.metaService).kvBase.WatchWithPrefix("") @@ -832,8 +843,9 @@ func TestMetaService_loadCollections(t *testing.T) { } // init metaService - node := NewQueryNode(ctx, 0) - node.metaService = newMetaService(ctx, node.replica) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) + node.metaService = newMetaService(ctx, node.container) err2 := (*node.metaService).loadCollections() assert.Nil(t, 
err2) @@ -853,8 +865,9 @@ func TestMetaService_loadSegments(t *testing.T) { } // init metaService - node := NewQueryNode(ctx, 0) - node.metaService = newMetaService(ctx, node.replica) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) + node.metaService = newMetaService(ctx, node.container) err2 := (*node.metaService).loadSegments() assert.Nil(t, err2) diff --git a/internal/reader/param_table_test.go b/internal/reader/param_table_test.go deleted file mode 100644 index e8acc50487..0000000000 --- a/internal/reader/param_table_test.go +++ /dev/null @@ -1,25 +0,0 @@ -package reader - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestParamTable_QueryNodeID(t *testing.T) { - Params.InitParamTable() - id := Params.QueryNodeID() - assert.Equal(t, id, 0) -} - -func TestParamTable_TopicStart(t *testing.T) { - Params.InitParamTable() - topicStart := Params.TopicStart() - assert.Equal(t, topicStart, 0) -} - -func TestParamTable_TopicEnd(t *testing.T) { - Params.InitParamTable() - topicEnd := Params.TopicEnd() - assert.Equal(t, topicEnd, 128) -} diff --git a/internal/reader/param_table.go b/internal/reader/paramtable.go similarity index 64% rename from internal/reader/param_table.go rename to internal/reader/paramtable.go index ba13e99391..c60b5b2f87 100644 --- a/internal/reader/param_table.go +++ b/internal/reader/paramtable.go @@ -16,23 +16,6 @@ func (p *ParamTable) InitParamTable() { p.Init() } -func (p *ParamTable) PulsarAddress() (string, error) { - url, err := p.Load("_PulsarAddress") - if err != nil { - panic(err) - } - return "pulsar://" + url, nil -} - -func (p *ParamTable) QueryNodeID() int { - queryNodeID, _ := p.Load("reader.clientid") - id, err := strconv.Atoi(queryNodeID) - if err != nil { - panic(err) - } - return id -} - func (p *ParamTable) TopicStart() int { topicStart, _ := p.Load("reader.topicstart") topicStartNum, err := strconv.Atoi(topicStart) diff --git a/internal/reader/partition_test.go b/internal/reader/partition_test.go index 9c19eb0852..5311a21448 100644 --- a/internal/reader/partition_test.go +++ b/internal/reader/partition_test.go @@ -13,7 +13,8 @@ import ( func TestPartition_Segments(t *testing.T) { ctx := context.Background() - node := NewQueryNode(ctx, 0) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) collectionName := "collection0" fieldVec := schemapb.FieldSchema{ @@ -56,17 +57,17 @@ func TestPartition_Segments(t *testing.T) { collectionMetaBlob := proto.MarshalTextString(&collectionMeta) assert.NotEqual(t, "", collectionMetaBlob) - var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) + var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob) assert.NoError(t, err) - collection, err := (*node.replica).getCollectionByName(collectionName) + collection, err := (*node.container).getCollectionByName(collectionName) assert.NoError(t, err) assert.Equal(t, collection.meta.Schema.Name, "collection0") assert.Equal(t, collection.meta.ID, UniqueID(0)) - assert.Equal(t, (*node.replica).getCollectionNum(), 1) + assert.Equal(t, (*node.container).getCollectionNum(), 1) for _, tag := range collectionMeta.PartitionTags { - err := (*node.replica).addPartition(collection.ID(), tag) + err := (*node.container).addPartition(collection.ID(), tag) assert.NoError(t, err) } @@ -77,7 +78,7 @@ func TestPartition_Segments(t *testing.T) { const segmentNum = 3 for i := 0; i < segmentNum; i++ { - err := (*node.replica).addSegment(UniqueID(i), 
targetPartition.partitionTag, collection.ID()) + err := (*node.container).addSegment(UniqueID(i), targetPartition.partitionTag, collection.ID()) assert.NoError(t, err) } diff --git a/internal/reader/query_node.go b/internal/reader/query_node.go index 464fe35908..de06ece2d0 100644 --- a/internal/reader/query_node.go +++ b/internal/reader/query_node.go @@ -14,14 +14,18 @@ import "C" import ( "context" + "sync" ) type QueryNode struct { ctx context.Context QueryNodeID uint64 + pulsarURL string - replica *collectionReplica + tSafe tSafe + + container *container dataSyncService *dataSyncService metaService *metaService @@ -29,21 +33,36 @@ type QueryNode struct { statsService *statsService } -func NewQueryNode(ctx context.Context, queryNodeID uint64) *QueryNode { +type tSafe interface { + getTSafe() Timestamp + setTSafe(t Timestamp) +} + +type serviceTime struct { + tSafeMu sync.Mutex + time Timestamp +} + +func NewQueryNode(ctx context.Context, queryNodeID uint64, pulsarURL string) *QueryNode { segmentsMap := make(map[int64]*Segment) collections := make([]*Collection, 0) - var replica collectionReplica = &collectionReplicaImpl{ + var container container = &colSegContainer{ collections: collections, segments: segmentsMap, } + var tSafe tSafe = &serviceTime{} + return &QueryNode{ ctx: ctx, QueryNodeID: queryNodeID, + pulsarURL: pulsarURL, - replica: &replica, + tSafe: tSafe, + + container: &container, dataSyncService: nil, metaService: nil, @@ -53,10 +72,10 @@ func NewQueryNode(ctx context.Context, queryNodeID uint64) *QueryNode { } func (node *QueryNode) Start() { - node.dataSyncService = newDataSyncService(node.ctx, node.replica) - node.searchService = newSearchService(node.ctx, node.replica) - node.metaService = newMetaService(node.ctx, node.replica) - node.statsService = newStatsService(node.ctx, node.replica) + node.dataSyncService = newDataSyncService(node.ctx, node, node.pulsarURL) + node.searchService = newSearchService(node.ctx, node, node.pulsarURL) + node.metaService = newMetaService(node.ctx, node.container) + node.statsService = newStatsService(node.ctx, node.container, node.pulsarURL) go node.dataSyncService.start() // go node.searchService.start() @@ -67,3 +86,15 @@ func (node *QueryNode) Start() { func (node *QueryNode) Close() { // TODO: close services } + +func (st *serviceTime) getTSafe() Timestamp { + st.tSafeMu.Lock() + defer st.tSafeMu.Unlock() + return st.time +} + +func (st *serviceTime) setTSafe(t Timestamp) { + st.tSafeMu.Lock() + st.time = t + st.tSafeMu.Unlock() +} diff --git a/internal/reader/query_node_test.go b/internal/reader/query_node_test.go index dcae19f3fa..b7de266460 100644 --- a/internal/reader/query_node_test.go +++ b/internal/reader/query_node_test.go @@ -23,6 +23,10 @@ func TestQueryNode_start(t *testing.T) { ctx = context.Background() } - node := NewQueryNode(ctx, 0) + pulsarAddr, err := Params.PulsarAddress() + if err != nil { + panic(err) + } + node := NewQueryNode(ctx, 0, "pulsar://"+pulsarAddr) node.Start() } diff --git a/internal/reader/reader.go b/internal/reader/reader.go index 91852eac13..d4d6b7a5b6 100644 --- a/internal/reader/reader.go +++ b/internal/reader/reader.go @@ -8,8 +8,8 @@ func Init() { Params.Init() } -func StartQueryNode(ctx context.Context) { - node := NewQueryNode(ctx, 0) +func StartQueryNode(ctx context.Context, pulsarURL string) { + node := NewQueryNode(ctx, 0, pulsarURL) node.Start() } diff --git a/internal/reader/search_service.go b/internal/reader/search_service.go index 0a08039bd4..332c1c8c81 100644 --- 
a/internal/reader/search_service.go +++ b/internal/reader/search_service.go @@ -5,7 +5,6 @@ import ( "context" "errors" "fmt" - "log" "sort" "github.com/golang/protobuf/proto" @@ -20,7 +19,7 @@ type searchService struct { ctx context.Context cancel context.CancelFunc - replica *collectionReplica + node *QueryNode searchMsgStream *msgstream.MsgStream searchResultMsgStream *msgstream.MsgStream } @@ -32,29 +31,24 @@ type SearchResult struct { ResultDistances []float32 } -func newSearchService(ctx context.Context, replica *collectionReplica) *searchService { +func newSearchService(ctx context.Context, node *QueryNode, pulsarURL string) *searchService { const ( //TODO:: read config file receiveBufSize = 1024 pulsarBufSize = 1024 ) - msgStreamURL, err := Params.PulsarAddress() - if err != nil { - log.Fatal(err) - } - consumeChannels := []string{"search"} consumeSubName := "subSearch" searchStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize) - searchStream.SetPulsarCient(msgStreamURL) + searchStream.SetPulsarCient(pulsarURL) unmarshalDispatcher := msgstream.NewUnmarshalDispatcher() searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize) var inputStream msgstream.MsgStream = searchStream producerChannels := []string{"searchResult"} searchResultStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize) - searchResultStream.SetPulsarCient(msgStreamURL) + searchResultStream.SetPulsarCient(pulsarURL) searchResultStream.CreatePulsarProducers(producerChannels) var outputStream msgstream.MsgStream = searchResultStream @@ -63,7 +57,7 @@ func newSearchService(ctx context.Context, replica *collectionReplica) *searchSe ctx: searchServiceCtx, cancel: searchServiceCancel, - replica: replica, + node: node, searchMsgStream: &inputStream, searchResultMsgStream: &outputStream, } @@ -126,7 +120,7 @@ func (ss *searchService) search(searchMessages []msgstream.TsMsg) error { } collectionName := query.CollectionName partitionTags := query.PartitionTags - collection, err := (*ss.replica).getCollectionByName(collectionName) + collection, err := (*ss.node.container).getCollectionByName(collectionName) if err != nil { return err } @@ -156,7 +150,7 @@ func (ss *searchService) search(searchMessages []msgstream.TsMsg) error { // 3. 
Do search in all segments for _, partitionTag := range partitionTags { - partition, err := (*ss.replica).getPartitionByTag(collectionID, partitionTag) + partition, err := (*ss.node.container).getPartitionByTag(collectionID, partitionTag) if err != nil { return err } diff --git a/internal/reader/search_service_test.go b/internal/reader/search_service_test.go index b8e9fcffcb..42ab772b78 100644 --- a/internal/reader/search_service_test.go +++ b/internal/reader/search_service_test.go @@ -21,13 +21,12 @@ import ( ) func TestSearch_Search(t *testing.T) { - Params.Init() ctx, cancel := context.WithCancel(context.Background()) defer cancel() // init query node pulsarURL := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0) + node := NewQueryNode(ctx, 0, pulsarURL) // init meta collectionName := "collection0" @@ -71,20 +70,20 @@ func TestSearch_Search(t *testing.T) { collectionMetaBlob := proto.MarshalTextString(&collectionMeta) assert.NotEqual(t, "", collectionMetaBlob) - var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) + var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob) assert.NoError(t, err) - collection, err := (*node.replica).getCollectionByName(collectionName) + collection, err := (*node.container).getCollectionByName(collectionName) assert.NoError(t, err) assert.Equal(t, collection.meta.Schema.Name, "collection0") assert.Equal(t, collection.meta.ID, UniqueID(0)) - assert.Equal(t, (*node.replica).getCollectionNum(), 1) + assert.Equal(t, (*node.container).getCollectionNum(), 1) - err = (*node.replica).addPartition(collection.ID(), collectionMeta.PartitionTags[0]) + err = (*node.container).addPartition(collection.ID(), collectionMeta.PartitionTags[0]) assert.NoError(t, err) segmentID := UniqueID(0) - err = (*node.replica).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0)) + err = (*node.container).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0)) assert.NoError(t, err) // test data generate @@ -163,7 +162,7 @@ func TestSearch_Search(t *testing.T) { assert.NoError(t, err) // dataSync - node.dataSyncService = newDataSyncService(node.ctx, node.replica) + node.dataSyncService = newDataSyncService(node.ctx, node, node.pulsarURL) go node.dataSyncService.start() time.Sleep(2 * time.Second) @@ -234,7 +233,7 @@ func TestSearch_Search(t *testing.T) { err = searchMsgStream.Produce(&msgPackSearch) assert.NoError(t, err) - node.searchService = newSearchService(node.ctx, node.replica) + node.searchService = newSearchService(node.ctx, node, node.pulsarURL) go node.searchService.start() time.Sleep(2 * time.Second) diff --git a/internal/reader/stats_service.go b/internal/reader/stats_service.go index 9f118e1dd1..1487ceee4a 100644 --- a/internal/reader/stats_service.go +++ b/internal/reader/stats_service.go @@ -13,17 +13,21 @@ import ( ) type statsService struct { - ctx context.Context - statsStream *msgstream.MsgStream - replica *collectionReplica + ctx context.Context + pulsarURL string + + msgStream *msgstream.MsgStream + + container *container } -func newStatsService(ctx context.Context, replica *collectionReplica) *statsService { +func newStatsService(ctx context.Context, container *container, pulsarURL string) *statsService { return &statsService{ - ctx: ctx, - statsStream: nil, - replica: replica, + ctx: ctx, + pulsarURL: pulsarURL, + msgStream: nil, + container: container, } } @@ -34,20 +38,16 @@ func (sService *statsService) start() { ) // start pulsar - msgStreamURL, err := Params.PulsarAddress() - if err 
!= nil { - log.Fatal(err) - } producerChannels := []string{"statistic"} statsStream := msgstream.NewPulsarMsgStream(sService.ctx, receiveBufSize) - statsStream.SetPulsarCient(msgStreamURL) + statsStream.SetPulsarCient(sService.pulsarURL) statsStream.CreatePulsarProducers(producerChannels) var statsMsgStream msgstream.MsgStream = statsStream - sService.statsStream = &statsMsgStream - (*sService.statsStream).Start() + sService.msgStream = &statsMsgStream + (*sService.msgStream).Start() // start service fmt.Println("do segments statistic in ", strconv.Itoa(sleepMillisecondTime), "ms") @@ -62,7 +62,7 @@ func (sService *statsService) start() { } func (sService *statsService) sendSegmentStatistic() { - statisticData := (*sService.replica).getSegmentStatistics() + statisticData := (*sService.container).getSegmentStatistics() // fmt.Println("Publish segment statistic") // fmt.Println(statisticData) @@ -80,7 +80,7 @@ func (sService *statsService) publicStatistic(statistic *internalpb.QueryNodeSeg var msgPack = msgstream.MsgPack{ Msgs: []msgstream.TsMsg{msg}, } - err := (*sService.statsStream).Produce(&msgPack) + err := (*sService.msgStream).Produce(&msgPack) if err != nil { log.Println(err) } diff --git a/internal/reader/stats_service_test.go b/internal/reader/stats_service_test.go index 7e1dfb23fe..d9f6ef04f7 100644 --- a/internal/reader/stats_service_test.go +++ b/internal/reader/stats_service_test.go @@ -15,87 +15,6 @@ import ( // NOTE: start pulsar before test func TestStatsService_start(t *testing.T) { - Params.Init() - var ctx context.Context - - if closeWithDeadline { - var cancel context.CancelFunc - d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) - ctx, cancel = context.WithDeadline(context.Background(), d) - defer cancel() - } else { - ctx = context.Background() - } - - // init query node - node := NewQueryNode(ctx, 0) - - // init meta - collectionName := "collection0" - fieldVec := schemapb.FieldSchema{ - Name: "vec", - DataType: schemapb.DataType_VECTOR_FLOAT, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: "dim", - Value: "16", - }, - }, - } - - fieldInt := schemapb.FieldSchema{ - Name: "age", - DataType: schemapb.DataType_INT32, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: "dim", - Value: "1", - }, - }, - } - - schema := schemapb.CollectionSchema{ - Name: collectionName, - Fields: []*schemapb.FieldSchema{ - &fieldVec, &fieldInt, - }, - } - - collectionMeta := etcdpb.CollectionMeta{ - ID: UniqueID(0), - Schema: &schema, - CreateTime: Timestamp(0), - SegmentIDs: []UniqueID{0}, - PartitionTags: []string{"default"}, - } - - collectionMetaBlob := proto.MarshalTextString(&collectionMeta) - assert.NotEqual(t, "", collectionMetaBlob) - - var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) - assert.NoError(t, err) - - collection, err := (*node.replica).getCollectionByName(collectionName) - assert.NoError(t, err) - assert.Equal(t, collection.meta.Schema.Name, "collection0") - assert.Equal(t, collection.meta.ID, UniqueID(0)) - assert.Equal(t, (*node.replica).getCollectionNum(), 1) - - err = (*node.replica).addPartition(collection.ID(), collectionMeta.PartitionTags[0]) - assert.NoError(t, err) - - segmentID := UniqueID(0) - err = (*node.replica).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0)) - assert.NoError(t, err) - - // start stats service - node.statsService = newStatsService(node.ctx, node.replica) - node.statsService.start() -} - -// NOTE: start pulsar before test -func TestSegmentManagement_SegmentStatisticService(t 
*testing.T) { - Params.Init() var ctx context.Context if closeWithDeadline { @@ -109,7 +28,7 @@ func TestSegmentManagement_SegmentStatisticService(t *testing.T) { // init query node pulsarURL := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0) + node := NewQueryNode(ctx, 0, pulsarURL) // init meta collectionName := "collection0" @@ -153,20 +72,100 @@ func TestSegmentManagement_SegmentStatisticService(t *testing.T) { collectionMetaBlob := proto.MarshalTextString(&collectionMeta) assert.NotEqual(t, "", collectionMetaBlob) - var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) + var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob) assert.NoError(t, err) - collection, err := (*node.replica).getCollectionByName(collectionName) + collection, err := (*node.container).getCollectionByName(collectionName) assert.NoError(t, err) assert.Equal(t, collection.meta.Schema.Name, "collection0") assert.Equal(t, collection.meta.ID, UniqueID(0)) - assert.Equal(t, (*node.replica).getCollectionNum(), 1) + assert.Equal(t, (*node.container).getCollectionNum(), 1) - err = (*node.replica).addPartition(collection.ID(), collectionMeta.PartitionTags[0]) + err = (*node.container).addPartition(collection.ID(), collectionMeta.PartitionTags[0]) assert.NoError(t, err) segmentID := UniqueID(0) - err = (*node.replica).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0)) + err = (*node.container).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0)) + assert.NoError(t, err) + + // start stats service + node.statsService = newStatsService(node.ctx, node.container, node.pulsarURL) + node.statsService.start() +} + +// NOTE: start pulsar before test +func TestSegmentManagement_SegmentStatisticService(t *testing.T) { + var ctx context.Context + + if closeWithDeadline { + var cancel context.CancelFunc + d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) + ctx, cancel = context.WithDeadline(context.Background(), d) + defer cancel() + } else { + ctx = context.Background() + } + + // init query node + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) + + // init meta + collectionName := "collection0" + fieldVec := schemapb.FieldSchema{ + Name: "vec", + DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "16", + }, + }, + } + + fieldInt := schemapb.FieldSchema{ + Name: "age", + DataType: schemapb.DataType_INT32, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "1", + }, + }, + } + + schema := schemapb.CollectionSchema{ + Name: collectionName, + Fields: []*schemapb.FieldSchema{ + &fieldVec, &fieldInt, + }, + } + + collectionMeta := etcdpb.CollectionMeta{ + ID: UniqueID(0), + Schema: &schema, + CreateTime: Timestamp(0), + SegmentIDs: []UniqueID{0}, + PartitionTags: []string{"default"}, + } + + collectionMetaBlob := proto.MarshalTextString(&collectionMeta) + assert.NotEqual(t, "", collectionMetaBlob) + + var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob) + assert.NoError(t, err) + + collection, err := (*node.container).getCollectionByName(collectionName) + assert.NoError(t, err) + assert.Equal(t, collection.meta.Schema.Name, "collection0") + assert.Equal(t, collection.meta.ID, UniqueID(0)) + assert.Equal(t, (*node.container).getCollectionNum(), 1) + + err = (*node.container).addPartition(collection.ID(), collectionMeta.PartitionTags[0]) + assert.NoError(t, err) + + segmentID := UniqueID(0) + err = 
(*node.container).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0)) assert.NoError(t, err) const receiveBufSize = 1024 @@ -179,9 +178,9 @@ func TestSegmentManagement_SegmentStatisticService(t *testing.T) { var statsMsgStream msgstream.MsgStream = statsStream - node.statsService = newStatsService(node.ctx, node.replica) - node.statsService.statsStream = &statsMsgStream - (*node.statsService.statsStream).Start() + node.statsService = newStatsService(node.ctx, node.container, node.pulsarURL) + node.statsService.msgStream = &statsMsgStream + (*node.statsService.msgStream).Start() // send stats node.statsService.sendSegmentStatistic() diff --git a/internal/util/flowgraph/flow_graph.go b/internal/util/flowgraph/flow_graph.go index 79b49242e4..822842c66c 100644 --- a/internal/util/flowgraph/flow_graph.go +++ b/internal/util/flowgraph/flow_graph.go @@ -2,7 +2,6 @@ package flowgraph import ( "context" - "log" "sync" "github.com/zilliztech/milvus-distributed/internal/errors" @@ -69,15 +68,6 @@ func (fg *TimeTickedFlowGraph) Start() { func (fg *TimeTickedFlowGraph) Close() { for _, v := range fg.nodeCtx { - // close message stream - if (*v.node).IsInputNode() { - inStream, ok := (*v.node).(*InputNode) - if !ok { - log.Fatal("Invalid inputNode") - } - (*inStream.inStream).Close() - } - // close input channels v.Close() } }
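
Illustrative sketch of the mutex-guarded tSafe accessor that serviceTimeNode now writes through and that search paths can read from. This is a standalone example, not code from the patch: the package name, the Timestamp alias (assumed uint64), and the main() driver are assumptions added so the snippet compiles on its own.

package main

import (
	"fmt"
	"sync"
)

// Timestamp mirrors the reader package's Timestamp alias (assumed uint64 here).
type Timestamp = uint64

// serviceTime guards the query node's service time with a mutex,
// following the getTSafe/setTSafe pattern added in query_node.go.
type serviceTime struct {
	tSafeMu sync.Mutex
	time    Timestamp
}

func (st *serviceTime) getTSafe() Timestamp {
	st.tSafeMu.Lock()
	defer st.tSafeMu.Unlock()
	return st.time
}

func (st *serviceTime) setTSafe(t Timestamp) {
	st.tSafeMu.Lock()
	st.time = t
	st.tSafeMu.Unlock()
}

func main() {
	st := &serviceTime{}

	var wg sync.WaitGroup

	// Writer: stands in for the service-time node advancing tSafe
	// as time-tick messages flow through the data-sync graph.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for ts := Timestamp(1); ts <= 5; ts++ {
			st.setTSafe(ts)
		}
	}()

	// Reader: stands in for a search path polling the current tSafe.
	wg.Add(1)
	go func() {
		defer wg.Done()
		_ = st.getTSafe()
	}()

	wg.Wait()
	fmt.Println("final tSafe:", st.getTSafe())
}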