diff --git a/.github/workflows/code-checker.yaml b/.github/workflows/code-checker.yaml
deleted file mode 100644
index 9f14992b99..0000000000
--- a/.github/workflows/code-checker.yaml
+++ /dev/null
@@ -1,53 +0,0 @@
-name: Code Checker
-# TODO: do not trigger action for some document file update
-
-# This workflow is triggered on pushes or pull request to the repository.
-on:
-  push:
-    # file paths to consider in the event. Optional; defaults to all.
-    paths:
-      - 'scripts/**'
-      - 'internal/**'
-      - 'cmd/**'
-      - 'build/**'
-      - '.github/workflows/code-checker.yaml'
-      - '.env'
-      - docker-compose.yml
-      - Makefile
-      - '!**.md'
-  pull_request:
-    # file paths to consider in the event. Optional; defaults to all.
-    paths:
-      - 'scripts/**'
-      - 'internal/**'
-      - 'cmd/**'
-      - 'build/**'
-      - '.github/workflows/code-checker.yaml'
-      - '.env'
-      - docker-compose.yml
-      - Makefile
-      - '!**.md'
-
-jobs:
-  ubuntu:
-    name: AMD64 Ubuntu ${{ matrix.ubuntu }}
-    runs-on: ubuntu-latest
-    timeout-minutes: 60
-    strategy:
-      fail-fast: false
-      matrix:
-        ubuntu: [18.04]
-    env:
-      UBUNTU: ${{ matrix.ubuntu }}
-    steps:
-      - name: Checkout
-        uses: actions/checkout@v2
-      - name: Dockerfile Lint
-        uses: reviewdog/action-hadolint@v1
-        with:
-          github_token: ${{ secrets.GITHUB_TOKEN }}
-          reporter: github-pr-check # Default is github-pr-check
-          hadolint_ignore: DL3008
-      - name: Code Check
-        run: |
-          ./build/builder.sh /bin/bash -c "make check-proto-product && make verifiers"
diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml
index b764b9641c..4522df6653 100644
--- a/.github/workflows/main.yaml
+++ b/.github/workflows/main.yaml
@@ -42,6 +42,12 @@ jobs:
     steps:
       - name: Checkout
         uses: actions/checkout@v2
+      - name: Check Dockerfile
+        uses: reviewdog/action-hadolint@v1
+        with:
+          github_token: ${{ secrets.GITHUB_TOKEN }}
+          reporter: github-pr-check # Default is github-pr-check
+          hadolint_ignore: DL3008
       - name: Cache Docker Volumes
         uses: actions/cache@v1
         with:
@@ -56,4 +62,4 @@ jobs:
       env:
         CHECK_BUILDER: "1"
       run: |
-        ./build/builder.sh /bin/bash -c "make check-proto-product && make unittest"
+        ./build/builder.sh
diff --git a/go.mod b/go.mod
index 6bf3a79b3b..8da99987a9 100644
--- a/go.mod
+++ b/go.mod
@@ -12,6 +12,7 @@ require (
 	github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
 	github.com/golang/protobuf v1.3.2
 	github.com/google/btree v1.0.0
+	github.com/json-iterator/go v1.1.10
 	github.com/klauspost/compress v1.10.11 // indirect
 	github.com/kr/text v0.2.0 // indirect
 	github.com/minio/minio-go/v7 v7.0.5
@@ -25,7 +26,7 @@ require (
 	github.com/pingcap/log v0.0.0-20200828042413-fce0951f1463 // indirect
 	github.com/pivotal-golang/bytefmt v0.0.0-20200131002437-cf55d5288a48
 	github.com/prometheus/client_golang v1.5.1 // indirect
-	github.com/prometheus/common v0.10.0 // indirect
+	github.com/prometheus/common v0.10.0
 	github.com/prometheus/procfs v0.1.3 // indirect
 	github.com/sirupsen/logrus v1.6.0 // indirect
 	github.com/spaolacci/murmur3 v1.1.0
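Note: the new github.com/json-iterator/go dependency is a drop-in replacement for encoding/json. The patch itself does not show a call site; a minimal usage sketch, assuming only jsoniter's standard ConfigCompatibleWithStandardLibrary API:

package main

import (
	"fmt"

	jsoniter "github.com/json-iterator/go"
)

// json behaves like the standard library encoder, so existing
// Marshal/Unmarshal call sites keep working unchanged.
var json = jsoniter.ConfigCompatibleWithStandardLibrary

func main() {
	type kv struct {
		Key   string `json:"key"`
		Value string `json:"value"`
	}
	out, err := json.Marshal(kv{Key: "dim", Value: "16"})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // {"key":"dim","value":"16"}
}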
diff --git a/internal/proxy/paramtable.go b/internal/proxy/paramtable.go
index 0ad0775a2c..f419c72991 100644
--- a/internal/proxy/paramtable.go
+++ b/internal/proxy/paramtable.go
@@ -2,7 +2,6 @@ package proxy
 
 import (
 	"log"
-	"net"
 	"os"
 	"strconv"
 	"strings"
@@ -39,25 +38,6 @@ func (pt *ParamTable) Init() {
 	pt.Save("_proxyID", proxyIDStr)
 }
 
-func (pt *ParamTable) NetWorkAddress() string {
-	addr, err := pt.Load("proxy.network.address")
-	if err != nil {
-		panic(err)
-	}
-	if ip := net.ParseIP(addr); ip == nil {
-		panic("invalid ip proxy.network.address")
-	}
-	port, err := pt.Load("proxy.network.port")
-	if err != nil {
-		panic(err)
-	}
-	_, err = strconv.Atoi(port)
-	if err != nil {
-		panic(err)
-	}
-	return addr + ":" + port
-}
-
 func (pt *ParamTable) MasterAddress() string {
 	ret, err := pt.Load("_MasterAddress")
 	if err != nil {
diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go
index 02e724f821..3b6b3ba223 100644
--- a/internal/proxy/proxy.go
+++ b/internal/proxy/proxy.go
@@ -135,7 +135,8 @@ func (p *Proxy) AddCloseCallback(callbacks ...func()) {
 
 func (p *Proxy) grpcLoop() {
 	defer p.proxyLoopWg.Done()
-	lis, err := net.Listen("tcp", Params.NetWorkAddress())
+	// TODO: use address in config instead
+	lis, err := net.Listen("tcp", ":5053")
 	if err != nil {
 		log.Fatalf("Proxy grpc server fatal error=%v", err)
 	}
diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go
index 09c299bc5f..1c324d2fd1 100644
--- a/internal/proxy/proxy_test.go
+++ b/internal/proxy/proxy_test.go
@@ -6,7 +6,6 @@ import (
 	"log"
 	"os"
 	"strconv"
-	"strings"
 	"sync"
 	"testing"
 
@@ -25,6 +24,7 @@ import (
 
 var ctx context.Context
 var cancel func()
+var proxyAddress = "127.0.0.1:5053"
 var proxyConn *grpc.ClientConn
 var proxyClient servicepb.MilvusServiceClient
 
@@ -81,13 +81,8 @@ func setup() {
 	startMaster(ctx)
 	startProxy(ctx)
 
-	proxyAddr := Params.NetWorkAddress()
-	addr := strings.Split(proxyAddr, ":")
-	if addr[0] == "0.0.0.0" {
-		proxyAddr = "127.0.0.1:" + addr[1]
-	}
 
-	conn, err := grpc.DialContext(ctx, proxyAddr, grpc.WithInsecure(), grpc.WithBlock())
+	conn, err := grpc.DialContext(ctx, proxyAddress, grpc.WithInsecure(), grpc.WithBlock())
 	if err != nil {
 		log.Fatalf("Connect to proxy failed, error= %v", err)
 	}
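Note: the deleted ParamTable.NetWorkAddress validated the configured IP and port before the proxy listened on them; until the TODO above is resolved, the listener is hard-coded to :5053 and the test dials the matching 127.0.0.1:5053. For reference, a sketch of the config-driven validation the TODO would restore, based on the removed code (the helper name and error returns here are illustrative, not the project's API):

package main

import (
	"fmt"
	"net"
	"strconv"
)

// buildListenAddress validates an IP and a port string and joins them
// into host:port, mirroring what the removed NetWorkAddress did with
// the proxy.network.address and proxy.network.port config keys.
func buildListenAddress(addr, port string) (string, error) {
	if ip := net.ParseIP(addr); ip == nil {
		return "", fmt.Errorf("invalid ip: %q", addr)
	}
	if _, err := strconv.Atoi(port); err != nil {
		return "", fmt.Errorf("invalid port: %q", port)
	}
	return net.JoinHostPort(addr, port), nil
}

func main() {
	listen, err := buildListenAddress("127.0.0.1", "5053")
	if err != nil {
		panic(err)
	}
	fmt.Println(listen) // 127.0.0.1:5053
}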
diff --git a/internal/reader/collection_replica.go b/internal/reader/collection_replica.go
index b1d17c33a8..823f200344 100644
--- a/internal/reader/collection_replica.go
+++ b/internal/reader/collection_replica.go
@@ -13,6 +13,7 @@ package reader
 import "C"
 import (
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
+	"log"
 	"strconv"
 	"sync"
 
@@ -37,13 +38,18 @@ type collectionReplica interface {
 	removeCollection(collectionID UniqueID) error
 	getCollectionByID(collectionID UniqueID) (*Collection, error)
 	getCollectionByName(collectionName string) (*Collection, error)
+	hasCollection(collectionID UniqueID) bool
 
 	// partition
 	// Partition tags in different collections are not unique,
 	// so partition api should specify the target collection.
+	getPartitionNum(collectionID UniqueID) (int, error)
 	addPartition(collectionID UniqueID, partitionTag string) error
 	removePartition(collectionID UniqueID, partitionTag string) error
+	addPartitionsByCollectionMeta(colMeta *etcdpb.CollectionMeta) error
+	removePartitionsByCollectionMeta(colMeta *etcdpb.CollectionMeta) error
 	getPartitionByTag(collectionID UniqueID, partitionTag string) (*Partition, error)
+	hasPartition(collectionID UniqueID, partitionTag string) bool
 
 	// segment
 	getSegmentNum() int
@@ -52,8 +58,6 @@ type collectionReplica interface {
 	removeSegment(segmentID UniqueID) error
 	getSegmentByID(segmentID UniqueID) (*Segment, error)
 	hasSegment(segmentID UniqueID) bool
-
-	freeAll()
 }
 
 type collectionReplicaImpl struct {
@@ -142,7 +146,31 @@ func (colReplica *collectionReplicaImpl) getCollectionByName(collectionName stri
 	return nil, errors.New("Cannot found collection: " + collectionName)
 }
 
+func (colReplica *collectionReplicaImpl) hasCollection(collectionID UniqueID) bool {
+	colReplica.mu.RLock()
+	defer colReplica.mu.RUnlock()
+
+	for _, col := range colReplica.collections {
+		if col.ID() == collectionID {
+			return true
+		}
+	}
+	return false
+}
+
 //----------------------------------------------------------------------------------------------------- partition
+func (colReplica *collectionReplicaImpl) getPartitionNum(collectionID UniqueID) (int, error) {
+	collection, err := colReplica.getCollectionByID(collectionID)
+	if err != nil {
+		return -1, err
+	}
+
+	colReplica.mu.RLock()
+	defer colReplica.mu.RUnlock()
+
+	return len(collection.partitions), nil
+}
+
 func (colReplica *collectionReplicaImpl) addPartition(collectionID UniqueID, partitionTag string) error {
 	collection, err := colReplica.getCollectionByID(collectionID)
 	if err != nil {
@@ -182,6 +210,61 @@ func (colReplica *collectionReplicaImpl) removePartition(collectionID UniqueID,
 	return nil
 }
 
+func (colReplica *collectionReplicaImpl) addPartitionsByCollectionMeta(colMeta *etcdpb.CollectionMeta) error {
+	if !colReplica.hasCollection(colMeta.ID) {
+		err := errors.New("Cannot find collection, id = " + strconv.FormatInt(colMeta.ID, 10))
+		return err
+	}
+	pToAdd := make([]string, 0)
+	for _, partitionTag := range colMeta.PartitionTags {
+		if !colReplica.hasPartition(colMeta.ID, partitionTag) {
+			pToAdd = append(pToAdd, partitionTag)
+		}
+	}
+
+	for _, tag := range pToAdd {
+		err := colReplica.addPartition(colMeta.ID, tag)
+		if err != nil {
+			log.Println(err)
+		}
+	}
+
+	return nil
+}
+
+func (colReplica *collectionReplicaImpl) removePartitionsByCollectionMeta(colMeta *etcdpb.CollectionMeta) error {
+	col, err := colReplica.getCollectionByID(colMeta.ID)
+	if err != nil {
+		return err
+	}
+
+	colReplica.mu.Lock()
+
+	pToDel := make([]string, 0)
+	for _, partition := range col.partitions {
+		hasPartition := false
+		for _, tag := range colMeta.PartitionTags {
+			if partition.partitionTag == tag {
+				hasPartition = true
+			}
+		}
+		if !hasPartition {
+			pToDel = append(pToDel, partition.partitionTag)
+		}
+	}
+
+	colReplica.mu.Unlock()
+
+	for _, tag := range pToDel {
+		err := colReplica.removePartition(col.ID(), tag)
+		if err != nil {
+			log.Println(err)
+		}
+	}
+
+	return nil
+}
+
 func (colReplica *collectionReplicaImpl) getPartitionByTag(collectionID UniqueID, partitionTag string) (*Partition, error) {
 	collection, err := colReplica.getCollectionByID(collectionID)
 	if err != nil {
@@ -200,6 +283,25 @@ func (colReplica *collectionReplicaImpl) getPartitionByTag(collectionID UniqueID
 	return nil, errors.New("cannot find partition, tag = " + partitionTag)
 }
 
+func (colReplica *collectionReplicaImpl) hasPartition(collectionID UniqueID, partitionTag string) bool {
+	collection, err := colReplica.getCollectionByID(collectionID)
+	if err != nil {
+		log.Println(err)
+		return false
+	}
+
+	colReplica.mu.RLock()
+	defer colReplica.mu.RUnlock()
+
+	for _, p := range *collection.Partitions() {
+		if p.Tag() == partitionTag {
+			return true
+		}
+	}
+
+	return false
+}
+
 //----------------------------------------------------------------------------------------------------- segment
 func (colReplica *collectionReplicaImpl) getSegmentNum() int {
 	colReplica.mu.RLock()
@@ -303,13 +405,3 @@ func (colReplica *collectionReplicaImpl) hasSegment(segmentID UniqueID) bool {
 
 	return ok
 }
-
-//-----------------------------------------------------------------------------------------------------
-func (colReplica *collectionReplicaImpl) freeAll() {
-	for _, seg := range colReplica.segments {
-		deleteSegment(seg)
-	}
-	for _, col := range colReplica.collections {
-		deleteCollection(col)
-	}
-}
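Note: addPartitionsByCollectionMeta and removePartitionsByCollectionMeta reconcile the replica against incoming collection meta with two set differences: tags present in the meta but missing locally are added, and local partitions absent from the meta are dropped. A standalone sketch of that diffing step (the function and variable names are illustrative, not the replica's API):

package main

import "fmt"

// diffPartitionTags returns the tags that must be added to, and removed
// from, the local state so that it matches the desired tags from meta.
func diffPartitionTags(local, desired []string) (toAdd, toDel []string) {
	localSet := make(map[string]bool, len(local))
	for _, tag := range local {
		localSet[tag] = true
	}
	desiredSet := make(map[string]bool, len(desired))
	for _, tag := range desired {
		desiredSet[tag] = true
		if !localSet[tag] {
			toAdd = append(toAdd, tag) // in meta, not yet in replica
		}
	}
	for _, tag := range local {
		if !desiredSet[tag] {
			toDel = append(toDel, tag) // in replica, dropped from meta
		}
	}
	return toAdd, toDel
}

func main() {
	toAdd, toDel := diffPartitionTags([]string{"p0", "p1"}, []string{"p1", "p2"})
	fmt.Println(toAdd, toDel) // [p2] [p0]
}

The replica does the same with nested loops; note that removePartitionsByCollectionMeta releases the mutex between collecting pToDel and calling removePartition, so a concurrent writer could interleave between the scan and the removals.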
diff --git a/internal/reader/collection_replica_test.go b/internal/reader/collection_replica_test.go
index 864cd38ed6..f69d05ed8e 100644
--- a/internal/reader/collection_replica_test.go
+++ b/internal/reader/collection_replica_test.go
@@ -13,6 +13,57 @@ import (
 )
 
 //----------------------------------------------------------------------------------------------------- collection
+func TestColSegContainer_getCollectionNum(t *testing.T) {
+	ctx := context.Background()
+	node := NewQueryNode(ctx, 0)
+
+	collectionName := "collection0"
+	fieldVec := schemapb.FieldSchema{
+		Name:     "vec",
+		DataType: schemapb.DataType_VECTOR_FLOAT,
+		TypeParams: []*commonpb.KeyValuePair{
+			{
+				Key:   "dim",
+				Value: "16",
+			},
+		},
+	}
+
+	fieldInt := schemapb.FieldSchema{
+		Name:     "age",
+		DataType: schemapb.DataType_INT32,
+		TypeParams: []*commonpb.KeyValuePair{
+			{
+				Key:   "dim",
+				Value: "1",
+			},
+		},
+	}
+
+	schema := schemapb.CollectionSchema{
+		Name: collectionName,
+		Fields: []*schemapb.FieldSchema{
+			&fieldVec, &fieldInt,
+		},
+	}
+
+	collectionMeta := etcdpb.CollectionMeta{
+		ID:            UniqueID(0),
+		Schema:        &schema,
+		CreateTime:    Timestamp(0),
+		SegmentIDs:    []UniqueID{0},
+		PartitionTags: []string{"default"},
+	}
+
+	collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
+	assert.NotEqual(t, "", collectionMetaBlob)
+
+	var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
+	assert.NoError(t, err)
+
+	assert.Equal(t, (*node.replica).getCollectionNum(), 1)
+}
+
 func TestColSegContainer_addCollection(t *testing.T) {
 	ctx := context.Background()
 	node := NewQueryNode(ctx, 0)
@@ -253,7 +304,130 @@ func TestColSegContainer_getCollectionByName(t *testing.T) {
 	assert.Equal(t, targetCollection.meta.ID, UniqueID(0))
 }
 
+func TestColSegContainer_hasCollection(t *testing.T) {
+	ctx := context.Background()
+	node := NewQueryNode(ctx, 0)
+
+	collectionName := "collection0"
+	fieldVec := schemapb.FieldSchema{
+		Name:     "vec",
+		DataType: schemapb.DataType_VECTOR_FLOAT,
+		TypeParams: []*commonpb.KeyValuePair{
+			{
+				Key:   "dim",
+				Value: "16",
+			},
+		},
+	}
+
+	fieldInt := schemapb.FieldSchema{
+		Name:     "age",
+		DataType: schemapb.DataType_INT32,
+		TypeParams: []*commonpb.KeyValuePair{
+			{
+				Key:   "dim",
+				Value: "1",
+			},
+		},
+	}
+
+	schema := schemapb.CollectionSchema{
+		Name: collectionName,
+		Fields: []*schemapb.FieldSchema{
+			&fieldVec, &fieldInt,
+		},
+	}
+
+	collectionMeta := etcdpb.CollectionMeta{
+		ID:            UniqueID(0),
+		Schema:        &schema,
+		CreateTime:    Timestamp(0),
+		SegmentIDs:    []UniqueID{0},
+		PartitionTags: []string{"default"},
+	}
+
+	collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
+	assert.NotEqual(t, "", collectionMetaBlob)
+
+	var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
+	assert.NoError(t, err)
+
+	hasCollection := (*node.replica).hasCollection(UniqueID(0))
+	assert.Equal(t, hasCollection, true)
+	hasCollection = (*node.replica).hasCollection(UniqueID(1))
+	assert.Equal(t, hasCollection, false)
+}
+
 //----------------------------------------------------------------------------------------------------- partition
+func TestColSegContainer_getPartitionNum(t *testing.T) {
+	ctx := context.Background()
+	node := NewQueryNode(ctx, 0)
+
+	collectionName := "collection0"
+	collectionID := UniqueID(0)
+	fieldVec := schemapb.FieldSchema{
+		Name:     "vec",
+		DataType: schemapb.DataType_VECTOR_FLOAT,
+		TypeParams: []*commonpb.KeyValuePair{
+			{
+				Key:   "dim",
+				Value: "16",
+			},
+		},
+	}
+
+	fieldInt := schemapb.FieldSchema{
+		Name:     "age",
+		DataType: schemapb.DataType_INT32,
+		TypeParams: []*commonpb.KeyValuePair{
+			{
+				Key:   "dim",
+				Value: "1",
+			},
+		},
+	}
+
+	schema := schemapb.CollectionSchema{
+		Name: "collection0",
+		Fields: []*schemapb.FieldSchema{
+			&fieldVec, &fieldInt,
+		},
+	}
+
+	collectionMeta := etcdpb.CollectionMeta{
+		ID:            collectionID,
+		Schema:        &schema,
+		CreateTime:    Timestamp(0),
+		SegmentIDs:    []UniqueID{0},
+		PartitionTags: []string{"default"},
+	}
+
+	collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
+	assert.NotEqual(t, "", collectionMetaBlob)
+
+	var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
+	assert.NoError(t, err)
+
+	collection, err := (*node.replica).getCollectionByName(collectionName)
+	assert.NoError(t, err)
+
+	assert.Equal(t, collection.meta.Schema.Name, collectionName)
+	assert.Equal(t, collection.meta.ID, collectionID)
+	assert.Equal(t, (*node.replica).getCollectionNum(), 1)
+
+	for _, tag := range collectionMeta.PartitionTags {
+		err := (*node.replica).addPartition(collectionID, tag)
+		assert.NoError(t, err)
+		partition, err := (*node.replica).getPartitionByTag(collectionID, tag)
+		assert.NoError(t, err)
+		assert.Equal(t, partition.partitionTag, "default")
+	}
+
+	partitionNum, err := (*node.replica).getPartitionNum(UniqueID(0))
+	assert.NoError(t, err)
+	assert.Equal(t, partitionNum, 1)
+}
+
 func TestColSegContainer_addPartition(t *testing.T) {
 	ctx := context.Background()
 	node := NewQueryNode(ctx, 0)
@@ -387,6 +561,151 @@ func TestColSegContainer_removePartition(t *testing.T) {
 	}
 }
 
+func TestColSegContainer_addPartitionsByCollectionMeta(t *testing.T) {
+	ctx := context.Background()
+	node := NewQueryNode(ctx, 0)
+
+	collectionName := "collection0"
+	collectionID := UniqueID(0)
+	fieldVec := schemapb.FieldSchema{
+		Name:     "vec",
+		DataType: schemapb.DataType_VECTOR_FLOAT,
+		TypeParams: []*commonpb.KeyValuePair{
+			{
+				Key:   "dim",
+				Value: "16",
+			},
+		},
+	}
+
+	fieldInt := schemapb.FieldSchema{
+		Name:     "age",
+		DataType: schemapb.DataType_INT32,
+		TypeParams: []*commonpb.KeyValuePair{
+			{
+				Key:   "dim",
+				Value: "1",
+			},
+		},
+	}
+
+	schema := schemapb.CollectionSchema{
+		Name: "collection0",
+		Fields: []*schemapb.FieldSchema{
+			&fieldVec, &fieldInt,
+		},
+	}
+
+	collectionMeta := etcdpb.CollectionMeta{
+		ID:            collectionID,
+		Schema:        &schema,
+		CreateTime:    Timestamp(0),
+		SegmentIDs:    []UniqueID{0},
+		PartitionTags: []string{"p0"},
+	}
+
+	collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
+	assert.NotEqual(t, "", collectionMetaBlob)
+
+	var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
+	assert.NoError(t, err)
+
+	collection, err := (*node.replica).getCollectionByName(collectionName)
+	assert.NoError(t, err)
+
+	assert.Equal(t, collection.meta.Schema.Name, collectionName)
+	assert.Equal(t, collection.meta.ID, collectionID)
+	assert.Equal(t, (*node.replica).getCollectionNum(), 1)
+
+	collectionMeta.PartitionTags = []string{"p0", "p1", "p2"}
+
+	err = (*node.replica).addPartitionsByCollectionMeta(&collectionMeta)
+	assert.NoError(t, err)
+	partitionNum, err := (*node.replica).getPartitionNum(UniqueID(0))
+	assert.NoError(t, err)
+	assert.Equal(t, partitionNum, 3)
+	hasPartition := (*node.replica).hasPartition(UniqueID(0), "p0")
+	assert.Equal(t, hasPartition, true)
+	hasPartition = (*node.replica).hasPartition(UniqueID(0), "p1")
+	assert.Equal(t, hasPartition, true)
+	hasPartition = (*node.replica).hasPartition(UniqueID(0), "p2")
+	assert.Equal(t, hasPartition, true)
+}
+
+func TestColSegContainer_removePartitionsByCollectionMeta(t *testing.T) {
+	ctx := context.Background()
+	node := NewQueryNode(ctx, 0)
+
+	collectionName := "collection0"
+	collectionID := UniqueID(0)
+	fieldVec := schemapb.FieldSchema{
+		Name:     "vec",
+		DataType: schemapb.DataType_VECTOR_FLOAT,
+		TypeParams: []*commonpb.KeyValuePair{
+			{
+				Key:   "dim",
+				Value: "16",
+			},
+		},
+	}
+
+	fieldInt := schemapb.FieldSchema{
+		Name:     "age",
+		DataType: schemapb.DataType_INT32,
+		TypeParams: []*commonpb.KeyValuePair{
+			{
+				Key:   "dim",
+				Value: "1",
+			},
+		},
+	}
+
+	schema := schemapb.CollectionSchema{
+		Name: "collection0",
+		Fields: []*schemapb.FieldSchema{
+			&fieldVec, &fieldInt,
+		},
+	}
+
+	collectionMeta := etcdpb.CollectionMeta{
+		ID:            collectionID,
+		Schema:        &schema,
+		CreateTime:    Timestamp(0),
+		SegmentIDs:    []UniqueID{0},
+		PartitionTags: []string{"p0", "p1", "p2"},
+	}
+
+	collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
+	assert.NotEqual(t, "", collectionMetaBlob)
+
+	var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
+	assert.NoError(t, err)
+
+	collection, err := (*node.replica).getCollectionByName(collectionName)
+	assert.NoError(t, err)
+
+	assert.Equal(t, collection.meta.Schema.Name, collectionName)
+	assert.Equal(t, collection.meta.ID, collectionID)
+	assert.Equal(t, (*node.replica).getCollectionNum(), 1)
+
+	err = (*node.replica).addPartitionsByCollectionMeta(&collectionMeta)
+	assert.NoError(t, err)
+
+	collectionMeta.PartitionTags = []string{"p0"}
+
+	err = (*node.replica).removePartitionsByCollectionMeta(&collectionMeta)
+	assert.NoError(t, err)
+	partitionNum, err := (*node.replica).getPartitionNum(UniqueID(0))
+	assert.NoError(t, err)
+	assert.Equal(t, partitionNum, 1)
+	hasPartition := (*node.replica).hasPartition(UniqueID(0), "p0")
+	assert.Equal(t, hasPartition, true)
+	hasPartition = (*node.replica).hasPartition(UniqueID(0), "p1")
+	assert.Equal(t, hasPartition, false)
+	hasPartition = (*node.replica).hasPartition(UniqueID(0), "p2")
+	assert.Equal(t, hasPartition, false)
+}
+
 func TestColSegContainer_getPartitionByTag(t *testing.T) {
 	ctx := context.Background()
 	node := NewQueryNode(ctx, 0)
@@ -453,6 +772,70 @@ func TestColSegContainer_getPartitionByTag(t *testing.T) {
 	}
 }
 
+func TestColSegContainer_hasPartition(t *testing.T) {
+	ctx := context.Background()
+	node := NewQueryNode(ctx, 0)
+
+	collectionName := "collection0"
+	collectionID := UniqueID(0)
+	fieldVec := schemapb.FieldSchema{
+		Name:     "vec",
+		DataType: schemapb.DataType_VECTOR_FLOAT,
+		TypeParams: []*commonpb.KeyValuePair{
+			{
+				Key:   "dim",
+				Value: "16",
+			},
+		},
+	}
+
+	fieldInt := schemapb.FieldSchema{
+		Name:     "age",
+		DataType: schemapb.DataType_INT32,
+		TypeParams: []*commonpb.KeyValuePair{
+			{
+				Key:   "dim",
+				Value: "1",
+			},
+		},
+	}
+
+	schema := schemapb.CollectionSchema{
+		Name: "collection0",
+		Fields: []*schemapb.FieldSchema{
+			&fieldVec, &fieldInt,
+		},
+	}
+
+	collectionMeta := etcdpb.CollectionMeta{
+		ID:            collectionID,
+		Schema:        &schema,
+		CreateTime:    Timestamp(0),
+		SegmentIDs:    []UniqueID{0},
+		PartitionTags: []string{"default"},
+	}
+
+	collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
+	assert.NotEqual(t, "", collectionMetaBlob)
+
+	var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
+	assert.NoError(t, err)
+
+	collection, err := (*node.replica).getCollectionByName(collectionName)
+	assert.NoError(t, err)
+
+	assert.Equal(t, collection.meta.Schema.Name, collectionName)
+	assert.Equal(t, collection.meta.ID, collectionID)
+	assert.Equal(t, (*node.replica).getCollectionNum(), 1)
+
+	err = (*node.replica).addPartition(collectionID, collectionMeta.PartitionTags[0])
+	assert.NoError(t, err)
+	hasPartition := (*node.replica).hasPartition(UniqueID(0), "default")
+	assert.Equal(t, hasPartition, true)
+	hasPartition = (*node.replica).hasPartition(UniqueID(0), "default1")
+	assert.Equal(t, hasPartition, false)
+}
+
 //----------------------------------------------------------------------------------------------------- segment
 func TestColSegContainer_addSegment(t *testing.T) {
 	ctx := context.Background()
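Note: each new replica test rebuilds the same two-field schema and CollectionMeta inline, roughly forty lines per test. A sketch of a shared helper that would collapse that duplication; newTestCollectionMeta is hypothetical (not part of this patch) and would live in package reader next to the tests, reusing their existing imports:

// newTestCollectionMeta builds the vec/age fixture that the replica tests
// currently repeat inline. Hypothetical helper, shown only to illustrate
// deduplicating the setup.
func newTestCollectionMeta(collectionID UniqueID, partitionTags []string) *etcdpb.CollectionMeta {
	fieldVec := schemapb.FieldSchema{
		Name:     "vec",
		DataType: schemapb.DataType_VECTOR_FLOAT,
		TypeParams: []*commonpb.KeyValuePair{
			{Key: "dim", Value: "16"},
		},
	}
	fieldInt := schemapb.FieldSchema{
		Name:     "age",
		DataType: schemapb.DataType_INT32,
		TypeParams: []*commonpb.KeyValuePair{
			{Key: "dim", Value: "1"},
		},
	}
	return &etcdpb.CollectionMeta{
		ID: collectionID,
		Schema: &schemapb.CollectionSchema{
			Name:   "collection0",
			Fields: []*schemapb.FieldSchema{&fieldVec, &fieldInt},
		},
		CreateTime:    Timestamp(0),
		SegmentIDs:    []UniqueID{0},
		PartitionTags: partitionTags,
	}
}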
diff --git a/internal/reader/data_sync_service.go b/internal/reader/data_sync_service.go
index ef746e6521..ffc24e382c 100644
--- a/internal/reader/data_sync_service.go
+++ b/internal/reader/data_sync_service.go
@@ -30,9 +30,7 @@ func (dsService *dataSyncService) start() {
 }
 
 func (dsService *dataSyncService) close() {
-	if dsService.fg != nil {
-		dsService.fg.Close()
-	}
+	dsService.fg.Close()
 }
 
 func (dsService *dataSyncService) initNodes() {
diff --git a/internal/reader/meta_service.go b/internal/reader/meta_service.go
index 535f45b88d..620cdd181f 100644
--- a/internal/reader/meta_service.go
+++ b/internal/reader/meta_service.go
@@ -214,6 +214,18 @@ func (mService *metaService) processSegmentModify(id string, value string) {
 
 func (mService *metaService) processCollectionModify(id string, value string) {
 	println("Modify Collection: ", id)
+
+	col := mService.collectionUnmarshal(value)
+	if col != nil {
+		err := (*mService.replica).addPartitionsByCollectionMeta(col)
+		if err != nil {
+			log.Println(err)
+		}
+		err = (*mService.replica).removePartitionsByCollectionMeta(col)
+		if err != nil {
+			log.Println(err)
+		}
+	}
 }
 
 func (mService *metaService) processModify(key string, msg string) {
diff --git a/internal/reader/meta_service_test.go b/internal/reader/meta_service_test.go
index 6c0470891a..df80c4a9d5 100644
--- a/internal/reader/meta_service_test.go
+++ b/internal/reader/meta_service_test.go
@@ -452,7 +452,9 @@ func TestMetaService_processCollectionModify(t *testing.T) {
                 >
               >
               segmentIDs: 0
-              partition_tags: "default"
+              partition_tags: "p0"
+              partition_tags: "p1"
+              partition_tags: "p2"
               `
 
 	(*node.metaService).processCollectionCreate(id, value)
@@ -463,7 +465,19 @@ func TestMetaService_processCollectionModify(t *testing.T) {
 	assert.NoError(t, err)
 	assert.Equal(t, collection.ID(), UniqueID(0))
 
-	// TODO: use different index for testing processCollectionModify
+	partitionNum, err := (*node.replica).getPartitionNum(UniqueID(0))
+	assert.NoError(t, err)
+	assert.Equal(t, partitionNum, 3)
+
+	hasPartition := (*node.replica).hasPartition(UniqueID(0), "p0")
+	assert.Equal(t, hasPartition, true)
+	hasPartition = (*node.replica).hasPartition(UniqueID(0), "p1")
+	assert.Equal(t, hasPartition, true)
+	hasPartition = (*node.replica).hasPartition(UniqueID(0), "p2")
+	assert.Equal(t, hasPartition, true)
+	hasPartition = (*node.replica).hasPartition(UniqueID(0), "p3")
+	assert.Equal(t, hasPartition, false)
+
 	newValue := `schema: <
                 name: "test"
                 fields: <
@@ -484,13 +498,28 @@ func TestMetaService_processCollectionModify(t *testing.T) {
                 >
               >
               segmentIDs: 0
-              partition_tags: "default"
+              partition_tags: "p1"
+              partition_tags: "p2"
+              partition_tags: "p3"
               `
 
 	(*node.metaService).processCollectionModify(id, newValue)
 	collection, err = (*node.replica).getCollectionByName("test")
 	assert.NoError(t, err)
 	assert.Equal(t, collection.ID(), UniqueID(0))
+
+	partitionNum, err = (*node.replica).getPartitionNum(UniqueID(0))
+	assert.NoError(t, err)
+	assert.Equal(t, partitionNum, 3)
+
+	hasPartition = (*node.replica).hasPartition(UniqueID(0), "p0")
+	assert.Equal(t, hasPartition, false)
+	hasPartition = (*node.replica).hasPartition(UniqueID(0), "p1")
+	assert.Equal(t, hasPartition, true)
+	hasPartition = (*node.replica).hasPartition(UniqueID(0), "p2")
+	assert.Equal(t, hasPartition, true)
+	hasPartition = (*node.replica).hasPartition(UniqueID(0), "p3")
+	assert.Equal(t, hasPartition, true)
 }
 
 func TestMetaService_processModify(t *testing.T) {
@@ -523,7 +552,9 @@ func TestMetaService_processModify(t *testing.T) {
                 >
               >
               segmentIDs: 0
-              partition_tags: "default"
+              partition_tags: "p0"
+              partition_tags: "p1"
+              partition_tags: "p2"
               `
 
 	(*node.metaService).processCreate(key1, msg1)
@@ -534,8 +565,21 @@ func TestMetaService_processModify(t *testing.T) {
 	assert.NoError(t, err)
 	assert.Equal(t, collection.ID(), UniqueID(0))
 
+	partitionNum, err := (*node.replica).getPartitionNum(UniqueID(0))
+	assert.NoError(t, err)
+	assert.Equal(t, partitionNum, 3)
+
+	hasPartition := (*node.replica).hasPartition(UniqueID(0), "p0")
+	assert.Equal(t, hasPartition, true)
+	hasPartition = (*node.replica).hasPartition(UniqueID(0), "p1")
+	assert.Equal(t, hasPartition, true)
+	hasPartition = (*node.replica).hasPartition(UniqueID(0), "p2")
+	assert.Equal(t, hasPartition, true)
+	hasPartition = (*node.replica).hasPartition(UniqueID(0), "p3")
+	assert.Equal(t, hasPartition, false)
+
 	key2 := "by-dev/segment/0"
-	msg2 := `partition_tag: "default"
+	msg2 := `partition_tag: "p1"
             channel_start: 0
             channel_end: 128
             close_time: 18446744073709551615
@@ -568,7 +612,9 @@ func TestMetaService_processModify(t *testing.T) {
                 >
              >
              segmentIDs: 0
-             partition_tags: "default"
+             partition_tags: "p1"
+             partition_tags: "p2"
+             partition_tags: "p3"
             `
 
 	(*node.metaService).processModify(key1, msg3)
 	collection, err = (*node.replica).getCollectionByName("test")
 	assert.NoError(t, err)
 	assert.Equal(t, collection.ID(), UniqueID(0))
 
-	msg4 := `partition_tag: "default"
+	partitionNum, err = (*node.replica).getPartitionNum(UniqueID(0))
+	assert.NoError(t, err)
+	assert.Equal(t, partitionNum, 3)
+
+	hasPartition = (*node.replica).hasPartition(UniqueID(0), "p0")
+	assert.Equal(t, hasPartition, false)
+	hasPartition = (*node.replica).hasPartition(UniqueID(0), "p1")
+	assert.Equal(t, hasPartition, true)
+	hasPartition = (*node.replica).hasPartition(UniqueID(0), "p2")
+	assert.Equal(t, hasPartition, true)
+	hasPartition = (*node.replica).hasPartition(UniqueID(0), "p3")
+	assert.Equal(t, hasPartition, true)
+
+	msg4 := `partition_tag: "p1"
             channel_start: 0
             channel_end: 128
             close_time: 18446744073709551615
             `
 
-	// TODO: modify segment for testing processCollectionModify
 	(*node.metaService).processModify(key2, msg4)
 	seg, err := (*node.replica).getSegmentByID(UniqueID(0))
 	assert.NoError(t, err)
diff --git a/internal/reader/query_node.go b/internal/reader/query_node.go
index c1d18769cc..a98f69f68a 100644
--- a/internal/reader/query_node.go
+++ b/internal/reader/query_node.go
@@ -69,18 +69,5 @@ func (node *QueryNode) Start() {
 }
 
 func (node *QueryNode) Close() {
-	<-node.ctx.Done()
-	// free collectionReplica
-	(*node.replica).freeAll()
-
-	// close services
-	if node.dataSyncService != nil {
-		(*node.dataSyncService).close()
-	}
-	if node.searchService != nil {
-		(*node.searchService).close()
-	}
-	if node.statsService != nil {
-		(*node.statsService).close()
-	}
+	// TODO: close services
 }
diff --git a/internal/reader/search_service_test.go b/internal/reader/search_service_test.go
index 5d2fd7d911..8e9c4e9285 100644
--- a/internal/reader/search_service_test.go
+++ b/internal/reader/search_service_test.go
@@ -23,6 +23,7 @@ import (
 func TestSearch_Search(t *testing.T) {
 	Params.Init()
 	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
 
 	// init query node
 	pulsarURL, _ := Params.pulsarAddress()
@@ -239,6 +240,6 @@ func TestSearch_Search(t *testing.T) {
 
 	time.Sleep(2 * time.Second)
 
-	cancel()
+	node.searchService.close()
 	node.Close()
 }
diff --git a/internal/reader/stats_service.go b/internal/reader/stats_service.go
index ddc241bd01..859c4786b7 100644
--- a/internal/reader/stats_service.go
+++ b/internal/reader/stats_service.go
@@ -59,10 +59,6 @@ func (sService *statsService) start() {
 	}
 }
 
-func (sService *statsService) close() {
-	(*sService.statsStream).Close()
-}
-
 func (sService *statsService) sendSegmentStatistic() {
 	statisticData := (*sService.replica).getSegmentStatistics()
 
diff --git a/internal/util/flowgraph/flow_graph.go b/internal/util/flowgraph/flow_graph.go
index 83448cd813..79b49242e4 100644
--- a/internal/util/flowgraph/flow_graph.go
+++ b/internal/util/flowgraph/flow_graph.go
@@ -77,6 +77,8 @@ func (fg *TimeTickedFlowGraph) Close() {
 			}
 			(*inStream.inStream).Close()
 		}
+		// close input channels
+		v.Close()
 	}
 }
 
diff --git a/scripts/core_build.sh b/scripts/core_build.sh
index 7edd18ed2e..2448254a67 100755
--- a/scripts/core_build.sh
+++ b/scripts/core_build.sh
@@ -154,12 +154,12 @@ if [[ ${RUN_CPPLINT} == "ON" ]]; then
    echo "clang-format check passed!"

    # clang-tidy check
-   # make check-clang-tidy || true
-   # if [ $? -ne 0 ]; then
-   #    echo "ERROR! clang-tidy check failed"
-   #    exit 1
-   # fi
-   # echo "clang-tidy check passed!"
+   make check-clang-tidy
+   if [ $? -ne 0 ]; then
+       echo "ERROR! clang-tidy check failed"
+       exit 1
+   fi
+   echo "clang-tidy check passed!"
else
    # compile and build
    make -j ${jobs} install || exit 1
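Note: the new v.Close() call in TimeTickedFlowGraph.Close shuts each node's input channels so the node's worker goroutine can drain and exit, while QueryNode.Close is reduced to a TODO pending a reworked service shutdown. The flowgraph node types themselves are not shown in this patch; a generic sketch of the close-the-channel shutdown pattern the comment describes (all names here are illustrative):

package main

import (
	"fmt"
	"sync"
)

// worker consumes its input channel and exits once the channel is closed.
type worker struct {
	input chan int
	wg    sync.WaitGroup
}

func (w *worker) start() {
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		for msg := range w.input { // range ends when input is closed
			fmt.Println("processed", msg)
		}
	}()
}

// Close closes the input channel and waits for the goroutine to drain.
func (w *worker) Close() {
	close(w.input)
	w.wg.Wait()
}

func main() {
	w := &worker{input: make(chan int, 4)}
	w.start()
	w.input <- 1
	w.input <- 2
	w.Close()
}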