diff --git a/.golangci.yml b/.golangci.yml index 5b6ef1927a..a16fa8c568 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -6,6 +6,7 @@ run: - docs - scripts - internal/core + - internal/proto linters-settings: golint: diff --git a/Makefile b/Makefile index 8c8b572679..ef15f22e67 100644 --- a/Makefile +++ b/Makefile @@ -36,9 +36,9 @@ fmt: lint: @echo "Running $@ check" @GO111MODULE=on ${GOPATH}/bin/golangci-lint cache clean - @GO111MODULE=on ${GOPATH}/bin/golangci-lint run --timeout=1m --config ./.golangci.yml ./internal/... || true - @GO111MODULE=on ${GOPATH}/bin/golangci-lint run --timeout=1m --config ./.golangci.yml ./cmd/... || true - @GO111MODULE=on ${GOPATH}/bin/golangci-lint run --timeout=1m --config ./.golangci.yml ./test/... || true + @GO111MODULE=on ${GOPATH}/bin/golangci-lint run --timeout=1m --config ./.golangci.yml ./internal/... + @GO111MODULE=on ${GOPATH}/bin/golangci-lint run --timeout=1m --config ./.golangci.yml ./cmd/... + @GO111MODULE=on ${GOPATH}/bin/golangci-lint run --timeout=1m --config ./.golangci.yml ./test/... ruleguard: @echo "Running $@ check" diff --git a/docker-compose.yml b/docker-compose.yml index 7ac9b4b148..a30b70e891 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -25,4 +25,4 @@ services: working_dir: "/milvus-distributed" command: &ubuntu-command > /bin/bash -c " - make verifiers && make unittest" + make build-cpp && make verifiers && make unittest" diff --git a/internal/kv/etcd_kv_test.go b/internal/kv/etcd_kv_test.go index 9cfafe4f58..f3f84b7102 100644 --- a/internal/kv/etcd_kv_test.go +++ b/internal/kv/etcd_kv_test.go @@ -2,19 +2,19 @@ package kv import ( "context" - "github.com/zilliztech/milvus-distributed/internal/conf" "path" "strconv" "testing" "github.com/stretchr/testify/assert" + "github.com/zilliztech/milvus-distributed/internal/conf" "go.etcd.io/etcd/clientv3" ) func TestEtcdKV_Load(t *testing.T) { conf.LoadConfig("config.yaml") - etcd_port := strconv.Itoa(int(conf.Config.Etcd.Port)) - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:" + etcd_port}}) + etcdPort := strconv.Itoa(int(conf.Config.Etcd.Port)) + cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:" + etcdPort}}) assert.Nil(t, err) rootpath := "/etcd/test/root" kv := NewEtcdKV(cli, rootpath) @@ -69,8 +69,8 @@ func TestEtcdKV_Load(t *testing.T) { func TestEtcdKV_MultiSave(t *testing.T) { conf.LoadConfig("config.yaml") - etcd_port := strconv.Itoa(int(conf.Config.Etcd.Port)) - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:" + etcd_port}}) + etcdPort := strconv.Itoa(int(conf.Config.Etcd.Port)) + cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:" + etcdPort}}) assert.Nil(t, err) rootpath := "/etcd/test/root" kv := NewEtcdKV(cli, rootpath) @@ -98,8 +98,8 @@ func TestEtcdKV_MultiSave(t *testing.T) { func TestEtcdKV_Remove(t *testing.T) { conf.LoadConfig("config.yaml") - etcd_port := strconv.Itoa(int(conf.Config.Etcd.Port)) - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:" + etcd_port}}) + etcdPort := strconv.Itoa(int(conf.Config.Etcd.Port)) + cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:" + etcdPort}}) assert.Nil(t, err) rootpath := "/etcd/test/root" kv := NewEtcdKV(cli, rootpath) @@ -167,8 +167,8 @@ func TestEtcdKV_Remove(t *testing.T) { func TestEtcdKV_MultiSaveAndRemove(t *testing.T) { conf.LoadConfig("config.yaml") - etcd_port := strconv.Itoa(int(conf.Config.Etcd.Port)) - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:" + etcd_port}}) + etcdPort := strconv.Itoa(int(conf.Config.Etcd.Port)) + cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:" + etcdPort}}) assert.Nil(t, err) rootpath := "/etcd/test/root" kv := NewEtcdKV(cli, rootpath) diff --git a/internal/master/collection_task.go b/internal/master/collection_task.go index 55b50d1708..0c0d682baa 100644 --- a/internal/master/collection_task.go +++ b/internal/master/collection_task.go @@ -74,7 +74,7 @@ func (t *createCollectionTask) Execute() error { return err } - collectionID, err := allocGlobalId() + collectionID, err := allocGlobalID() if err != nil { return err } diff --git a/internal/master/grpc_service.go b/internal/master/grpc_service.go index ec14cb0a08..096c03af59 100644 --- a/internal/master/grpc_service.go +++ b/internal/master/grpc_service.go @@ -2,11 +2,11 @@ package master import ( "context" - "github.com/zilliztech/milvus-distributed/internal/master/tso" "time" "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/master/id" + "github.com/zilliztech/milvus-distributed/internal/master/tso" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/zilliztech/milvus-distributed/internal/proto/servicepb" diff --git a/internal/master/grpc_service_test.go b/internal/master/grpc_service_test.go index 8fdbb4bb99..112e0c916e 100644 --- a/internal/master/grpc_service_test.go +++ b/internal/master/grpc_service_test.go @@ -2,12 +2,12 @@ package master import ( "context" - "github.com/zilliztech/milvus-distributed/internal/conf" "strconv" "testing" "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" + "github.com/zilliztech/milvus-distributed/internal/conf" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" @@ -21,15 +21,15 @@ func TestMaster_CreateCollection(t *testing.T) { ctx, cancel := context.WithCancel(context.TODO()) defer cancel() - etcd_port := strconv.Itoa(int(conf.Config.Etcd.Port)) - etcd_addr := "127.0.0.1:" + etcd_port + etcdPort := strconv.Itoa(int(conf.Config.Etcd.Port)) + etcdAddr := "127.0.0.1:" + etcdPort - etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcd_addr}}) + etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) assert.Nil(t, err) _, err = etcdCli.Delete(ctx, "/test/root", clientv3.WithPrefix()) assert.Nil(t, err) - svr, err := CreateServer(ctx, "/test/root/kv", "/test/root/meta", "/test/root/meta/tso", []string{etcd_addr}) + svr, err := CreateServer(ctx, "/test/root/kv", "/test/root/meta", "/test/root/meta/tso", []string{etcdAddr}) assert.Nil(t, err) err = svr.Run(10001) assert.Nil(t, err) @@ -96,7 +96,7 @@ func TestMaster_CreateCollection(t *testing.T) { }, }, } - schema_bytes, err := proto.Marshal(&sch) + schemaBytes, err := proto.Marshal(&sch) assert.Nil(t, err) req := internalpb.CreateCollectionRequest{ @@ -104,44 +104,44 @@ func TestMaster_CreateCollection(t *testing.T) { ReqId: 1, Timestamp: 11, ProxyId: 1, - Schema: &commonpb.Blob{Value: schema_bytes}, + Schema: &commonpb.Blob{Value: schemaBytes}, } st, err := cli.CreateCollection(ctx, &req) assert.Nil(t, err) assert.Equal(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS) - coll_meta, err := svr.mt.GetCollectionByName(sch.Name) + collMeta, err := svr.mt.GetCollectionByName(sch.Name) assert.Nil(t, err) - t.Logf("collection id = %d", coll_meta.ID) - assert.Equal(t, coll_meta.CreateTime, uint64(11)) - assert.Equal(t, coll_meta.Schema.Name, "col1") - assert.Equal(t, coll_meta.Schema.AutoId, false) - assert.Equal(t, len(coll_meta.Schema.Fields), 2) - assert.Equal(t, coll_meta.Schema.Fields[0].Name, "col1_f1") - assert.Equal(t, coll_meta.Schema.Fields[1].Name, "col1_f2") - assert.Equal(t, coll_meta.Schema.Fields[0].DataType, schemapb.DataType_VECTOR_FLOAT) - assert.Equal(t, coll_meta.Schema.Fields[1].DataType, schemapb.DataType_VECTOR_BINARY) - assert.Equal(t, len(coll_meta.Schema.Fields[0].TypeParams), 2) - assert.Equal(t, len(coll_meta.Schema.Fields[0].IndexParams), 2) - assert.Equal(t, len(coll_meta.Schema.Fields[1].TypeParams), 2) - assert.Equal(t, len(coll_meta.Schema.Fields[1].IndexParams), 2) - assert.Equal(t, coll_meta.Schema.Fields[0].TypeParams[0].Key, "col1_f1_tk1") - assert.Equal(t, coll_meta.Schema.Fields[0].TypeParams[1].Key, "col1_f1_tk2") - assert.Equal(t, coll_meta.Schema.Fields[0].TypeParams[0].Value, "col1_f1_tv1") - assert.Equal(t, coll_meta.Schema.Fields[0].TypeParams[1].Value, "col1_f1_tv2") - assert.Equal(t, coll_meta.Schema.Fields[0].IndexParams[0].Key, "col1_f1_ik1") - assert.Equal(t, coll_meta.Schema.Fields[0].IndexParams[1].Key, "col1_f1_ik2") - assert.Equal(t, coll_meta.Schema.Fields[0].IndexParams[0].Value, "col1_f1_iv1") - assert.Equal(t, coll_meta.Schema.Fields[0].IndexParams[1].Value, "col1_f1_iv2") + t.Logf("collection id = %d", collMeta.ID) + assert.Equal(t, collMeta.CreateTime, uint64(11)) + assert.Equal(t, collMeta.Schema.Name, "col1") + assert.Equal(t, collMeta.Schema.AutoId, false) + assert.Equal(t, len(collMeta.Schema.Fields), 2) + assert.Equal(t, collMeta.Schema.Fields[0].Name, "col1_f1") + assert.Equal(t, collMeta.Schema.Fields[1].Name, "col1_f2") + assert.Equal(t, collMeta.Schema.Fields[0].DataType, schemapb.DataType_VECTOR_FLOAT) + assert.Equal(t, collMeta.Schema.Fields[1].DataType, schemapb.DataType_VECTOR_BINARY) + assert.Equal(t, len(collMeta.Schema.Fields[0].TypeParams), 2) + assert.Equal(t, len(collMeta.Schema.Fields[0].IndexParams), 2) + assert.Equal(t, len(collMeta.Schema.Fields[1].TypeParams), 2) + assert.Equal(t, len(collMeta.Schema.Fields[1].IndexParams), 2) + assert.Equal(t, collMeta.Schema.Fields[0].TypeParams[0].Key, "col1_f1_tk1") + assert.Equal(t, collMeta.Schema.Fields[0].TypeParams[1].Key, "col1_f1_tk2") + assert.Equal(t, collMeta.Schema.Fields[0].TypeParams[0].Value, "col1_f1_tv1") + assert.Equal(t, collMeta.Schema.Fields[0].TypeParams[1].Value, "col1_f1_tv2") + assert.Equal(t, collMeta.Schema.Fields[0].IndexParams[0].Key, "col1_f1_ik1") + assert.Equal(t, collMeta.Schema.Fields[0].IndexParams[1].Key, "col1_f1_ik2") + assert.Equal(t, collMeta.Schema.Fields[0].IndexParams[0].Value, "col1_f1_iv1") + assert.Equal(t, collMeta.Schema.Fields[0].IndexParams[1].Value, "col1_f1_iv2") - assert.Equal(t, coll_meta.Schema.Fields[1].TypeParams[0].Key, "col1_f2_tk1") - assert.Equal(t, coll_meta.Schema.Fields[1].TypeParams[1].Key, "col1_f2_tk2") - assert.Equal(t, coll_meta.Schema.Fields[1].TypeParams[0].Value, "col1_f2_tv1") - assert.Equal(t, coll_meta.Schema.Fields[1].TypeParams[1].Value, "col1_f2_tv2") - assert.Equal(t, coll_meta.Schema.Fields[1].IndexParams[0].Key, "col1_f2_ik1") - assert.Equal(t, coll_meta.Schema.Fields[1].IndexParams[1].Key, "col1_f2_ik2") - assert.Equal(t, coll_meta.Schema.Fields[1].IndexParams[0].Value, "col1_f2_iv1") - assert.Equal(t, coll_meta.Schema.Fields[1].IndexParams[1].Value, "col1_f2_iv2") + assert.Equal(t, collMeta.Schema.Fields[1].TypeParams[0].Key, "col1_f2_tk1") + assert.Equal(t, collMeta.Schema.Fields[1].TypeParams[1].Key, "col1_f2_tk2") + assert.Equal(t, collMeta.Schema.Fields[1].TypeParams[0].Value, "col1_f2_tv1") + assert.Equal(t, collMeta.Schema.Fields[1].TypeParams[1].Value, "col1_f2_tv2") + assert.Equal(t, collMeta.Schema.Fields[1].IndexParams[0].Key, "col1_f2_ik1") + assert.Equal(t, collMeta.Schema.Fields[1].IndexParams[1].Key, "col1_f2_ik2") + assert.Equal(t, collMeta.Schema.Fields[1].IndexParams[0].Value, "col1_f2_iv1") + assert.Equal(t, collMeta.Schema.Fields[1].IndexParams[1].Value, "col1_f2_iv2") req.Timestamp = Timestamp(10) st, err = cli.CreateCollection(ctx, &req) diff --git a/internal/master/master.go b/internal/master/master.go index b84e93b1e9..132fd91ca0 100644 --- a/internal/master/master.go +++ b/internal/master/master.go @@ -73,7 +73,7 @@ func Init() { } // CreateServer creates the UNINITIALIZED pd server with given configuration. -func CreateServer(ctx context.Context, kv_root_path string, meta_root_path, tso_root_path string, etcdAddr []string) (*Master, error) { +func CreateServer(ctx context.Context, kvRootPath string, metaRootPath, tsoRootPath string, etcdAddr []string) (*Master, error) { rand.Seed(time.Now().UnixNano()) Init() @@ -81,7 +81,7 @@ func CreateServer(ctx context.Context, kv_root_path string, meta_root_path, tso_ if err != nil { return nil, err } - etcdkv := kv.NewEtcdKV(etcdClient, meta_root_path) + etcdkv := kv.NewEtcdKV(etcdClient, metaRootPath) metakv, err := NewMetaTable(etcdkv) if err != nil { return nil, err @@ -90,7 +90,7 @@ func CreateServer(ctx context.Context, kv_root_path string, meta_root_path, tso_ m := &Master{ ctx: ctx, startTimestamp: time.Now().Unix(), - kvBase: newKVBase(kv_root_path, etcdAddr), + kvBase: newKVBase(kvRootPath, etcdAddr), scheduler: NewDDRequestScheduler(), mt: metakv, ssChan: make(chan internalpb.SegmentStatistics, 10), diff --git a/internal/master/meta_table_test.go b/internal/master/meta_table_test.go index 1be5bdd1cb..b7cf6fb3ae 100644 --- a/internal/master/meta_table_test.go +++ b/internal/master/meta_table_test.go @@ -2,11 +2,11 @@ package master import ( "context" - "github.com/zilliztech/milvus-distributed/internal/conf" "strconv" "testing" "github.com/stretchr/testify/assert" + "github.com/zilliztech/milvus-distributed/internal/conf" "github.com/zilliztech/milvus-distributed/internal/kv" pb "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" @@ -15,8 +15,8 @@ import ( func TestMetaTable_Collection(t *testing.T) { conf.LoadConfig("config.yaml") - etcd_port := strconv.Itoa(int(conf.Config.Etcd.Port)) - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:" + etcd_port}}) + etcdPort := strconv.Itoa(int(conf.Config.Etcd.Port)) + cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:" + etcdPort}}) assert.Nil(t, err) etcdKV := kv.NewEtcdKV(cli, "/etcd/test/root") @@ -138,8 +138,8 @@ func TestMetaTable_Collection(t *testing.T) { func TestMetaTable_DeletePartition(t *testing.T) { conf.LoadConfig("config.yaml") - etcd_port := strconv.Itoa(int(conf.Config.Etcd.Port)) - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:" + etcd_port}}) + etcdPort := strconv.Itoa(int(conf.Config.Etcd.Port)) + cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:" + etcdPort}}) assert.Nil(t, err) etcdKV := kv.NewEtcdKV(cli, "/etcd/test/root") @@ -221,8 +221,8 @@ func TestMetaTable_DeletePartition(t *testing.T) { func TestMetaTable_Segment(t *testing.T) { conf.LoadConfig("config.yaml") - etcd_port := strconv.Itoa(int(conf.Config.Etcd.Port)) - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:" + etcd_port}}) + etcdPort := strconv.Itoa(int(conf.Config.Etcd.Port)) + cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:" + etcdPort}}) assert.Nil(t, err) etcdKV := kv.NewEtcdKV(cli, "/etcd/test/root") diff --git a/internal/master/scheduler.go b/internal/master/scheduler.go index 6dcc3bc1df..031c98801d 100644 --- a/internal/master/scheduler.go +++ b/internal/master/scheduler.go @@ -21,7 +21,7 @@ func (rs *ddRequestScheduler) Enqueue(task task) error { return nil } -//TODO, allocGlobalId -func allocGlobalId() (UniqueID, error) { +//TODO, allocGlobalID +func allocGlobalID() (UniqueID, error) { return rand.Int63(), nil } diff --git a/internal/master/timesync/time_snyc_producer_test.go b/internal/master/timesync/time_snyc_producer_test.go index 4c46137fea..2d8687690c 100644 --- a/internal/master/timesync/time_snyc_producer_test.go +++ b/internal/master/timesync/time_snyc_producer_test.go @@ -2,11 +2,12 @@ package timesync import ( "context" - "github.com/stretchr/testify/assert" - ms "github.com/zilliztech/milvus-distributed/internal/msgstream" "log" "testing" "time" + + "github.com/stretchr/testify/assert" + ms "github.com/zilliztech/milvus-distributed/internal/msgstream" ) type ( @@ -24,15 +25,8 @@ func (ttBarrier *TestTickBarrier) GetTimeTick() (Timestamp, error) { func (ttBarrier *TestTickBarrier) Start() error { go func(ctx context.Context) { - for { - select { - case <-ctx.Done(): - { - log.Printf("barrier context done, exit") - return - } - } - } + <-ctx.Done() + log.Printf("barrier context done, exit") }(ttBarrier.ctx) return nil } @@ -90,7 +84,8 @@ func TestStream_PulsarMsgStream_TimeTick(t *testing.T) { producerChannels := []string{"proxyTtBarrier"} consumerChannels := []string{"proxyTtBarrier"} consumerSubName := "proxyTtBarrier" - ctx, _ := context.WithTimeout(context.Background(), 30*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() proxyTtInputStream, proxyTtOutputStream := initTestPulsarStream(ctx, pulsarAddress, producerChannels, consumerChannels, consumerSubName) producerChannels = []string{"writeNodeBarrier"} @@ -107,10 +102,10 @@ func TestStream_PulsarMsgStream_TimeTick(t *testing.T) { (*writeNodeOutputStream).Start() timeSyncProducer.Start() expected := []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} - result_1 := receiveMsg(proxyTtOutputStream) - assert.Equal(t, expected, result_1) - result_2 := receiveMsg(writeNodeOutputStream) - assert.Equal(t, expected, result_2) + result1 := receiveMsg(proxyTtOutputStream) + assert.Equal(t, expected, result1) + result2 := receiveMsg(writeNodeOutputStream) + assert.Equal(t, expected, result2) timeSyncProducer.Close() } diff --git a/internal/master/timesync/time_sync_producer.go b/internal/master/timesync/time_sync_producer.go index da95734802..c4629179db 100644 --- a/internal/master/timesync/time_sync_producer.go +++ b/internal/master/timesync/time_sync_producer.go @@ -1,12 +1,12 @@ package timesync import ( + "context" + "log" + "github.com/zilliztech/milvus-distributed/internal/errors" ms "github.com/zilliztech/milvus-distributed/internal/msgstream" internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" - "log" - - "context" ) type timeSyncMsgProducer struct { diff --git a/internal/master/timesync/timesync.go b/internal/master/timesync/timesync.go index 9955d64616..09a4de643a 100644 --- a/internal/master/timesync/timesync.go +++ b/internal/master/timesync/timesync.go @@ -76,7 +76,7 @@ func (ttBarrier *softTimeTickBarrier) Start() error { log.Printf("[softTimeTickBarrier] peer(%d)=%d\n", ttmsg.PeerId, ttmsg.Timestamp) if !ok { - log.Printf("[softTimeTickBarrier] Warning: peerId %d not exist\n", ttmsg.PeerId) + log.Printf("[softTimeTickBarrier] Warning: peerID %d not exist\n", ttmsg.PeerId) continue } @@ -108,7 +108,7 @@ func NewSoftTimeTickBarrier(ctx context.Context, minTtInterval Timestamp) *softTimeTickBarrier { if len(peerIds) <= 0 { - log.Printf("[NewSoftTimeTickBarrier] Error: peerIds is emtpy!\n") + log.Printf("[NewSoftTimeTickBarrier] Error: peerIds is empty!\n") return nil } @@ -193,7 +193,7 @@ func (ttBarrier *hardTimeTickBarrier) Start() error { oldT, ok := ttBarrier.peer2Tt[ttmsg.PeerId] if !ok { - log.Printf("[hardTimeTickBarrier] Warning: peerId %d not exist\n", ttmsg.PeerId) + log.Printf("[hardTimeTickBarrier] Warning: peerID %d not exist\n", ttmsg.PeerId) continue } @@ -233,7 +233,7 @@ func NewHardTimeTickBarrier(ctx context.Context, peerIds []UniqueID) *hardTimeTickBarrier { if len(peerIds) <= 0 { - log.Printf("[NewSoftTimeTickBarrier] Error: peerIds is emtpy!") + log.Printf("[NewSoftTimeTickBarrier] Error: peerIds is empty!") return nil } @@ -259,5 +259,4 @@ func (ttBarrier *hardTimeTickBarrier) Close() { ttBarrier.closeCh <- struct{}{} } ttBarrier.closed = true - return } diff --git a/internal/master/timesync/timesync_test.go b/internal/master/timesync/timesync_test.go index 6deb874e08..a7f9208f5b 100644 --- a/internal/master/timesync/timesync_test.go +++ b/internal/master/timesync/timesync_test.go @@ -9,18 +9,19 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + ms "github.com/zilliztech/milvus-distributed/internal/msgstream" internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" ) -func getTtMsg(msgType internalPb.MsgType, peerId UniqueID, timeStamp uint64) *ms.TsMsg { +func getTtMsg(msgType internalPb.MsgType, peerID UniqueID, timeStamp uint64) *ms.TsMsg { var tsMsg ms.TsMsg baseMsg := ms.BaseMsg{ - HashValues: []int32{int32(peerId)}, + HashValues: []int32{int32(peerID)}, } timeTickResult := internalPb.TimeTickMsg{ MsgType: internalPb.MsgType_kTimeTick, - PeerId: peerId, + PeerId: peerID, Timestamp: timeStamp, } timeTickMsg := &ms.TimeTickMsg{ @@ -393,6 +394,8 @@ func TestTt_HardTtBarrierGetTimeTick(t *testing.T) { // This will stuck ts, err = sttbarrierStuck.GetTimeTick() + assert.NotNil(t, err) + assert.Equal(t, Timestamp(0), ts) // ---------------------context cancel------------------------ channelsCancel := []string{"HardTtBarrierGetTimeTickCancel"} @@ -424,5 +427,6 @@ func TestTt_HardTtBarrierGetTimeTick(t *testing.T) { // This will stuck ts, err = sttbarrierCancel.GetTimeTick() - + assert.NotNil(t, err) + assert.Equal(t, Timestamp(0), ts) } diff --git a/internal/msgstream/msgstream_test.go b/internal/msgstream/msgstream_test.go index c168961cc5..5ade8b6ff7 100644 --- a/internal/msgstream/msgstream_test.go +++ b/internal/msgstream/msgstream_test.go @@ -464,7 +464,7 @@ func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) { Timestamp: uint64(1), } timeTick := &TimeTickMsg{ - BaseMsg: baseMsg, + BaseMsg: baseMsg, TimeTickMsg: timeTickRequest, } var tsMsg TsMsg = timeTick diff --git a/internal/proxy/task_scheduler.go b/internal/proxy/task_scheduler.go index dc64ab975c..0e65beee93 100644 --- a/internal/proxy/task_scheduler.go +++ b/internal/proxy/task_scheduler.go @@ -74,11 +74,11 @@ func (queue *BaseTaskQueue) PopActiveTask(ts Timestamp) task { return nil } -func (queue *BaseTaskQueue) getTaskByReqID(reqId UniqueID) task { +func (queue *BaseTaskQueue) getTaskByReqID(reqID UniqueID) task { queue.utLock.Lock() defer queue.utLock.Lock() for e := queue.unissuedTasks.Front(); e != nil; e = e.Next() { - if e.Value.(task).ID() == reqId { + if e.Value.(task).ID() == reqID { return e.Value.(task) } } @@ -86,7 +86,7 @@ func (queue *BaseTaskQueue) getTaskByReqID(reqId UniqueID) task { queue.atLock.Lock() defer queue.atLock.Unlock() for ats := range queue.activeTasks { - if queue.activeTasks[ats].ID() == reqId { + if queue.activeTasks[ats].ID() == reqID { return queue.activeTasks[ats] } } @@ -216,14 +216,14 @@ func (sched *TaskScheduler) scheduleDqTask() task { return sched.DqQueue.PopUnissuedTask() } -func (sched *TaskScheduler) getTaskByReqID(reqID UniqueID) task { - if t := sched.DdQueue.getTaskByReqID(reqID); t != nil { +func (sched *TaskScheduler) getTaskByReqID(collMeta UniqueID) task { + if t := sched.DdQueue.getTaskByReqID(collMeta); t != nil { return t } - if t := sched.DmQueue.getTaskByReqID(reqID); t != nil { + if t := sched.DmQueue.getTaskByReqID(collMeta); t != nil { return t } - if t := sched.DqQueue.getTaskByReqID(reqID); t != nil { + if t := sched.DqQueue.getTaskByReqID(collMeta); t != nil { return t } return nil diff --git a/internal/proxy/timetick.go b/internal/proxy/timetick.go index 3778c25053..69198f2bc8 100644 --- a/internal/proxy/timetick.go +++ b/internal/proxy/timetick.go @@ -61,7 +61,7 @@ func (tt *timeTick) tick() error { tt.currentTick = ts } - if tt.areRequestsDelivered(tt.currentTick) == false { + if !tt.areRequestsDelivered(tt.currentTick) { return nil } msgPack := msgstream.MsgPack{} diff --git a/internal/reader/col_seg_container_test.go b/internal/reader/col_seg_container_test.go index f6362062f7..458a03f606 100644 --- a/internal/reader/col_seg_container_test.go +++ b/internal/reader/col_seg_container_test.go @@ -14,8 +14,8 @@ import ( //----------------------------------------------------------------------------------------------------- collection func TestColSegContainer_addCollection(t *testing.T) { ctx := context.Background() - pulsarUrl := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarUrl) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) fieldVec := schemapb.FieldSchema{ Name: "vec", @@ -66,8 +66,8 @@ func TestColSegContainer_addCollection(t *testing.T) { func TestColSegContainer_removeCollection(t *testing.T) { ctx := context.Background() - pulsarUrl := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarUrl) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) fieldVec := schemapb.FieldSchema{ Name: "vec", @@ -122,8 +122,8 @@ func TestColSegContainer_removeCollection(t *testing.T) { func TestColSegContainer_getCollectionByID(t *testing.T) { ctx := context.Background() - pulsarUrl := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarUrl) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) fieldVec := schemapb.FieldSchema{ Name: "vec", @@ -180,8 +180,8 @@ func TestColSegContainer_getCollectionByID(t *testing.T) { func TestColSegContainer_getCollectionByName(t *testing.T) { ctx := context.Background() - pulsarUrl := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarUrl) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) fieldVec := schemapb.FieldSchema{ Name: "vec", @@ -239,8 +239,8 @@ func TestColSegContainer_getCollectionByName(t *testing.T) { //----------------------------------------------------------------------------------------------------- partition func TestColSegContainer_addPartition(t *testing.T) { ctx := context.Background() - pulsarUrl := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarUrl) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) fieldVec := schemapb.FieldSchema{ Name: "vec", @@ -297,8 +297,8 @@ func TestColSegContainer_addPartition(t *testing.T) { func TestColSegContainer_removePartition(t *testing.T) { ctx := context.Background() - pulsarUrl := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarUrl) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) fieldVec := schemapb.FieldSchema{ Name: "vec", @@ -357,8 +357,8 @@ func TestColSegContainer_removePartition(t *testing.T) { func TestColSegContainer_getPartitionByTag(t *testing.T) { ctx := context.Background() - pulsarUrl := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarUrl) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) fieldVec := schemapb.FieldSchema{ Name: "vec", @@ -420,8 +420,8 @@ func TestColSegContainer_getPartitionByTag(t *testing.T) { //----------------------------------------------------------------------------------------------------- segment func TestColSegContainer_addSegment(t *testing.T) { ctx := context.Background() - pulsarUrl := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarUrl) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) fieldVec := schemapb.FieldSchema{ Name: "vec", @@ -482,8 +482,8 @@ func TestColSegContainer_addSegment(t *testing.T) { func TestColSegContainer_removeSegment(t *testing.T) { ctx := context.Background() - pulsarUrl := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarUrl) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) fieldVec := schemapb.FieldSchema{ Name: "vec", @@ -546,8 +546,8 @@ func TestColSegContainer_removeSegment(t *testing.T) { func TestColSegContainer_getSegmentByID(t *testing.T) { ctx := context.Background() - pulsarUrl := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarUrl) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) fieldVec := schemapb.FieldSchema{ Name: "vec", @@ -611,8 +611,8 @@ func TestColSegContainer_getSegmentByID(t *testing.T) { func TestColSegContainer_hasSegment(t *testing.T) { ctx := context.Background() - pulsarUrl := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarUrl) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) fieldVec := schemapb.FieldSchema{ Name: "vec", diff --git a/internal/reader/collection_test.go b/internal/reader/collection_test.go index 7db8392c05..313784a62f 100644 --- a/internal/reader/collection_test.go +++ b/internal/reader/collection_test.go @@ -13,8 +13,8 @@ import ( func TestCollection_Partitions(t *testing.T) { ctx := context.Background() - pulsarUrl := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarUrl) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) fieldVec := schemapb.FieldSchema{ Name: "vec", diff --git a/internal/reader/data_sync_service_test.go b/internal/reader/data_sync_service_test.go index 14cb0119b2..6907ce3bca 100644 --- a/internal/reader/data_sync_service_test.go +++ b/internal/reader/data_sync_service_test.go @@ -3,13 +3,14 @@ package reader import ( "context" "encoding/binary" + "math" + "testing" + "time" + "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" - "math" - "testing" - "time" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" @@ -33,8 +34,8 @@ func TestManipulationService_Start(t *testing.T) { } // init query node - pulsarUrl := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarUrl) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) // init meta fieldVec := schemapb.FieldSchema{ @@ -157,7 +158,7 @@ func TestManipulationService_Start(t *testing.T) { producerChannels := []string{"insert"} insertStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize) - insertStream.SetPulsarCient(pulsarUrl) + insertStream.SetPulsarCient(pulsarURL) insertStream.CreatePulsarProducers(producerChannels) var insertMsgStream msgstream.MsgStream = insertStream @@ -171,10 +172,5 @@ func TestManipulationService_Start(t *testing.T) { node.Close() - for { - select { - case <-ctx.Done(): - return - } - } + <-ctx.Done() } diff --git a/internal/reader/partition_test.go b/internal/reader/partition_test.go index a9ee41a0c4..a21b5556a0 100644 --- a/internal/reader/partition_test.go +++ b/internal/reader/partition_test.go @@ -13,8 +13,8 @@ import ( func TestPartition_Segments(t *testing.T) { ctx := context.Background() - pulsarUrl := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarUrl) + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) fieldVec := schemapb.FieldSchema{ Name: "vec", diff --git a/internal/reader/segment_test.go b/internal/reader/segment_test.go index 8198800eb8..96eaf71e83 100644 --- a/internal/reader/segment_test.go +++ b/internal/reader/segment_test.go @@ -539,8 +539,8 @@ func TestSegment_segmentDelete(t *testing.T) { //func TestSegment_segmentSearch(t *testing.T) { // ctx := context.Background() // // 1. Construct node, collection, partition and segment -// pulsarUrl := "pulsar://localhost:6650" -// node := NewQueryNode(ctx, 0, pulsarUrl) +// pulsarURL := "pulsar://localhost:6650" +// node := NewQueryNode(ctx, 0, pulsarURL) // var collection = node.newCollection(0, "collection0", "") // var partition = collection.newPartition("partition0") // var segment = partition.newSegment(0) diff --git a/internal/util/flowgraph/input_node.go b/internal/util/flowgraph/input_node.go index 82e802c73d..c6882a2036 100644 --- a/internal/util/flowgraph/input_node.go +++ b/internal/util/flowgraph/input_node.go @@ -1,8 +1,9 @@ package flowgraph import ( - "github.com/zilliztech/milvus-distributed/internal/msgstream" "log" + + "github.com/zilliztech/milvus-distributed/internal/msgstream" ) type InputNode struct {