From aff1e79f825693b70639cccf0ee26a64a5b0e399 Mon Sep 17 00:00:00 2001 From: dragondriver Date: Mon, 16 Nov 2020 17:01:10 +0800 Subject: [PATCH] Fix SearchTask in Proxy Signed-off-by: dragondriver --- cmd/master/main.go | 2 +- cmd/reader/reader.go | 10 +- internal/allocator/timestamp_allocator.go | 3 +- internal/master/collection_task_test.go | 2 +- internal/master/grpc_service_test.go | 2 +- internal/master/id/id.go | 4 +- internal/master/id/id_test.go | 9 +- internal/master/master.go | 10 +- internal/master/partition_task_test.go | 2 +- internal/master/scheduler.go | 7 +- internal/master/tso/global_allocator.go | 4 +- internal/master/tso/global_allocator_test.go | 8 +- internal/msgstream/msgstream_test.go | 31 +- internal/msgstream/task.go | 88 +- internal/msgstream/unmarshal.go | 2 + internal/proxy/grpc_service.go | 14 +- internal/proxy/proxy.go | 12 +- internal/proxy/proxy_test.go | 80 +- internal/proxy/task.go | 40 +- internal/proxy/task_scheduler.go | 33 +- internal/reader/col_seg_container.go | 7 +- internal/reader/data_sync_service_test.go | 3 - internal/reader/flow_graph_insert_node.go | 14 + .../reader/flow_graph_service_time_node.go | 2 +- internal/reader/meta_service.go | 20 +- internal/reader/meta_service_test.go | 1540 ++++++++++------- internal/reader/query_node.go | 31 +- internal/reader/query_node_test.go | 118 +- internal/reader/reader_test.go | 96 - internal/reader/segment.go | 1 + internal/reader/stats_service.go | 50 +- internal/reader/stats_service_test.go | 253 ++- internal/reader/util_functions_test.go | 202 --- internal/util/tsoutil/tso.go | 23 +- 34 files changed, 1472 insertions(+), 1251 deletions(-) delete mode 100644 internal/reader/reader_test.go delete mode 100644 internal/reader/util_functions_test.go diff --git a/cmd/master/main.go b/cmd/master/main.go index 6fa759b5c8..cfcce4979c 100644 --- a/cmd/master/main.go +++ b/cmd/master/main.go @@ -32,7 +32,7 @@ func main() { etcdPort, _ := gparams.GParams.Load("etcd.port") etcdAddr := etcdAddress + ":" + etcdPort etcdRootPath, _ := gparams.GParams.Load("etcd.rootpath") - svr, err := master.CreateServer(ctx, etcdRootPath, etcdRootPath, etcdRootPath, []string{etcdAddr}) + svr, err := master.CreateServer(ctx, etcdRootPath, etcdRootPath, []string{etcdAddr}) if err != nil { log.Print("create server failed", zap.Error(err)) } diff --git a/cmd/reader/reader.go b/cmd/reader/reader.go index 9f74abd673..4d377c50cf 100644 --- a/cmd/reader/reader.go +++ b/cmd/reader/reader.go @@ -2,8 +2,6 @@ package main import ( "context" - "flag" - "fmt" "os" "os/signal" "syscall" @@ -16,13 +14,7 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - var yamlFile string - flag.StringVar(&yamlFile, "yaml", "", "yaml file") - flag.Parse() - // flag.Usage() - fmt.Println("yaml file: ", yamlFile) - - err := gparams.GParams.LoadYaml(yamlFile) + err := gparams.GParams.LoadYaml("config.yaml") if err != nil { panic(err) } diff --git a/internal/allocator/timestamp_allocator.go b/internal/allocator/timestamp_allocator.go index dbd0bc0f37..f37547d6a1 100644 --- a/internal/allocator/timestamp_allocator.go +++ b/internal/allocator/timestamp_allocator.go @@ -76,7 +76,8 @@ func (ta *TimestampAllocator) processFunc(req request) { return } tsoRequest := req.(*tsoRequest) - tsoRequest.timestamp = 1 + tsoRequest.timestamp = ta.lastTsBegin + ta.lastTsBegin++ fmt.Println("process tso") } diff --git a/internal/master/collection_task_test.go b/internal/master/collection_task_test.go index ab5c1aa6eb..7ffd35e461 100644 --- a/internal/master/collection_task_test.go +++ b/internal/master/collection_task_test.go @@ -36,7 +36,7 @@ func TestMaster_CollectionTask(t *testing.T) { _, err = etcdCli.Delete(ctx, "/test/root", clientv3.WithPrefix()) assert.Nil(t, err) - svr, err := CreateServer(ctx, "/test/root/kv", "/test/root/meta", "/test/root/meta/tso", []string{etcdAddr}) + svr, err := CreateServer(ctx, "/test/root/kv", "/test/root/meta", []string{etcdAddr}) assert.Nil(t, err) err = svr.Run(10002) assert.Nil(t, err) diff --git a/internal/master/grpc_service_test.go b/internal/master/grpc_service_test.go index 3ef81843d4..ff564f8f22 100644 --- a/internal/master/grpc_service_test.go +++ b/internal/master/grpc_service_test.go @@ -34,7 +34,7 @@ func TestMaster_CreateCollection(t *testing.T) { _, err = etcdCli.Delete(ctx, "/test/root", clientv3.WithPrefix()) assert.Nil(t, err) - svr, err := CreateServer(ctx, "/test/root/kv", "/test/root/meta", "/test/root/meta/tso", []string{etcdAddr}) + svr, err := CreateServer(ctx, "/test/root/kv", "/test/root/meta", []string{etcdAddr}) assert.Nil(t, err) err = svr.Run(10001) assert.Nil(t, err) diff --git a/internal/master/id/id.go b/internal/master/id/id.go index 931e4b38fc..c068f5b39f 100644 --- a/internal/master/id/id.go +++ b/internal/master/id/id.go @@ -16,8 +16,8 @@ type GlobalIDAllocator struct { var allocator *GlobalIDAllocator -func Init() { - InitGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase("gid")) +func Init(etcdAddr []string, rootPath string) { + InitGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase(etcdAddr, rootPath, "gid")) } func InitGlobalIDAllocator(key string, base kvutil.Base) { diff --git a/internal/master/id/id_test.go b/internal/master/id/id_test.go index 8c31987191..6e60f684a5 100644 --- a/internal/master/id/id_test.go +++ b/internal/master/id/id_test.go @@ -17,7 +17,14 @@ func TestMain(m *testing.M) { if err != nil { panic(err) } - GIdAllocator = NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase("gid")) + + etcdPort, err := gparams.GParams.Load("etcd.port") + if err != nil { + panic(err) + } + etcdAddr := "127.0.0.1:" + etcdPort + + GIdAllocator = NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{etcdAddr}, "/test/root/kv", "gid")) exitCode := m.Run() os.Exit(exitCode) } diff --git a/internal/master/master.go b/internal/master/master.go index db13a78206..a2a3851059 100644 --- a/internal/master/master.go +++ b/internal/master/master.go @@ -72,15 +72,15 @@ func newKVBase(kvRoot string, etcdAddr []string) *kv.EtcdKV { return kvBase } -func Init() { +func Init(etcdAddr []string, rootPath string) { rand.Seed(time.Now().UnixNano()) - id.Init() - tso.Init() + id.Init(etcdAddr, rootPath) + tso.Init(etcdAddr, rootPath) } // CreateServer creates the UNINITIALIZED pd server with given configuration. -func CreateServer(ctx context.Context, kvRootPath string, metaRootPath, tsoRootPath string, etcdAddr []string) (*Master, error) { - Init() +func CreateServer(ctx context.Context, kvRootPath, metaRootPath string, etcdAddr []string) (*Master, error) { + Init(etcdAddr, kvRootPath) etcdClient, err := clientv3.New(clientv3.Config{Endpoints: etcdAddr}) if err != nil { diff --git a/internal/master/partition_task_test.go b/internal/master/partition_task_test.go index 066522b902..2b5431479b 100644 --- a/internal/master/partition_task_test.go +++ b/internal/master/partition_task_test.go @@ -38,7 +38,7 @@ func TestMaster_Partition(t *testing.T) { assert.Nil(t, err) port := 10000 + rand.Intn(1000) - svr, err := CreateServer(ctx, "/test/root/kv", "/test/root/meta", "/test/root/meta/tso", []string{etcdAddr}) + svr, err := CreateServer(ctx, "/test/root/kv", "/test/root/meta", []string{etcdAddr}) assert.Nil(t, err) err = svr.Run(int64(port)) assert.Nil(t, err) diff --git a/internal/master/scheduler.go b/internal/master/scheduler.go index 031c98801d..a71fb14bcb 100644 --- a/internal/master/scheduler.go +++ b/internal/master/scheduler.go @@ -1,6 +1,8 @@ package master -import "math/rand" +import ( + "github.com/zilliztech/milvus-distributed/internal/master/id" +) type ddRequestScheduler struct { reqQueue chan task @@ -21,7 +23,6 @@ func (rs *ddRequestScheduler) Enqueue(task task) error { return nil } -//TODO, allocGlobalID func allocGlobalID() (UniqueID, error) { - return rand.Int63(), nil + return id.AllocOne() } diff --git a/internal/master/tso/global_allocator.go b/internal/master/tso/global_allocator.go index 31aba37d78..d9bd6ceead 100644 --- a/internal/master/tso/global_allocator.go +++ b/internal/master/tso/global_allocator.go @@ -37,8 +37,8 @@ type GlobalTSOAllocator struct { var allocator *GlobalTSOAllocator -func Init() { - InitGlobalTsoAllocator("timestamp", tsoutil.NewTSOKVBase("tso")) +func Init(etcdAddr []string, rootPath string) { + InitGlobalTsoAllocator("timestamp", tsoutil.NewTSOKVBase(etcdAddr, rootPath, "tso")) } func InitGlobalTsoAllocator(key string, base kvutil.Base) { diff --git a/internal/master/tso/global_allocator_test.go b/internal/master/tso/global_allocator_test.go index 0d4d033eaa..70d318d471 100644 --- a/internal/master/tso/global_allocator_test.go +++ b/internal/master/tso/global_allocator_test.go @@ -18,7 +18,13 @@ func TestMain(m *testing.M) { if err != nil { panic(err) } - GTsoAllocator = NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase("tso")) + etcdPort, err := gparams.GParams.Load("etcd.port") + if err != nil { + panic(err) + } + etcdAddr := "127.0.0.1:" + etcdPort + + GTsoAllocator = NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase([]string{etcdAddr}, "/test/root/kv", "tso")) exitCode := m.Run() os.Exit(exitCode) diff --git a/internal/msgstream/msgstream_test.go b/internal/msgstream/msgstream_test.go index f7dc1b63d4..59c8365329 100644 --- a/internal/msgstream/msgstream_test.go +++ b/internal/msgstream/msgstream_test.go @@ -106,6 +106,16 @@ func getTsMsg(msgType MsgType, reqID UniqueID, hashValue int32) *TsMsg { TimeTickMsg: timeTickResult, } tsMsg = timeTickMsg + case internalPb.MsgType_kQueryNodeSegStats: + queryNodeSegStats := internalPb.QueryNodeSegStats{ + MsgType: internalPb.MsgType_kQueryNodeSegStats, + PeerID: reqID, + } + queryNodeSegStatsMsg := &QueryNodeSegStatsMsg{ + BaseMsg: baseMsg, + QueryNodeSegStats: queryNodeSegStats, + } + tsMsg = queryNodeSegStatsMsg } return &tsMsg } @@ -452,24 +462,11 @@ func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) { consumerChannels := []string{"insert1", "insert2"} consumerSubName := "subInsert" - baseMsg := BaseMsg{ - BeginTimestamp: 0, - EndTimestamp: 0, - HashValues: []int32{1}, - } - - timeTickRequest := internalPb.TimeTickMsg{ - MsgType: internalPb.MsgType_kTimeTick, - PeerID: int64(1), - Timestamp: uint64(1), - } - timeTick := &TimeTickMsg{ - BaseMsg: baseMsg, - TimeTickMsg: timeTickRequest, - } - var tsMsg TsMsg = timeTick msgPack := MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, &tsMsg) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kTimeTick, 1, 1)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kSearch, 2, 2)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kSearchResult, 3, 3)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kQueryNodeSegStats, 4, 4)) inputStream := NewPulsarMsgStream(context.Background(), 100) inputStream.SetPulsarCient(pulsarAddress) diff --git a/internal/msgstream/task.go b/internal/msgstream/task.go index 2c8c0f5de6..918fd6e9e6 100644 --- a/internal/msgstream/task.go +++ b/internal/msgstream/task.go @@ -57,24 +57,24 @@ func (it *InsertMsg) Marshal(input *TsMsg) ([]byte, error) { func (it *InsertMsg) Unmarshal(input []byte) (*TsMsg, error) { insertRequest := internalPb.InsertRequest{} err := proto.Unmarshal(input, &insertRequest) - insertMsg := &InsertMsg{InsertRequest: insertRequest} - if err != nil { return nil, err } + insertMsg := &InsertMsg{InsertRequest: insertRequest} for _, timestamp := range insertMsg.Timestamps { - it.BeginTimestamp = timestamp - it.EndTimestamp = timestamp + insertMsg.BeginTimestamp = timestamp + insertMsg.EndTimestamp = timestamp break } for _, timestamp := range insertMsg.Timestamps { - if timestamp > it.EndTimestamp { - it.EndTimestamp = timestamp + if timestamp > insertMsg.EndTimestamp { + insertMsg.EndTimestamp = timestamp } - if timestamp < it.BeginTimestamp { - it.BeginTimestamp = timestamp + if timestamp < insertMsg.BeginTimestamp { + insertMsg.BeginTimestamp = timestamp } } + var tsMsg TsMsg = insertMsg return &tsMsg, nil } @@ -102,24 +102,24 @@ func (dt *DeleteMsg) Marshal(input *TsMsg) ([]byte, error) { func (dt *DeleteMsg) Unmarshal(input []byte) (*TsMsg, error) { deleteRequest := internalPb.DeleteRequest{} err := proto.Unmarshal(input, &deleteRequest) - deleteMsg := &DeleteMsg{DeleteRequest: deleteRequest} - if err != nil { return nil, err } + deleteMsg := &DeleteMsg{DeleteRequest: deleteRequest} for _, timestamp := range deleteMsg.Timestamps { - dt.BeginTimestamp = timestamp - dt.EndTimestamp = timestamp + deleteMsg.BeginTimestamp = timestamp + deleteMsg.EndTimestamp = timestamp break } for _, timestamp := range deleteMsg.Timestamps { - if timestamp > dt.EndTimestamp { - dt.EndTimestamp = timestamp + if timestamp > deleteMsg.EndTimestamp { + deleteMsg.EndTimestamp = timestamp } - if timestamp < dt.BeginTimestamp { - dt.BeginTimestamp = timestamp + if timestamp < deleteMsg.BeginTimestamp { + deleteMsg.BeginTimestamp = timestamp } } + var tsMsg TsMsg = deleteMsg return &tsMsg, nil } @@ -147,13 +147,13 @@ func (st *SearchMsg) Marshal(input *TsMsg) ([]byte, error) { func (st *SearchMsg) Unmarshal(input []byte) (*TsMsg, error) { searchRequest := internalPb.SearchRequest{} err := proto.Unmarshal(input, &searchRequest) - searchMsg := &SearchMsg{SearchRequest: searchRequest} - if err != nil { return nil, err } - st.BeginTimestamp = searchMsg.Timestamp - st.EndTimestamp = searchMsg.Timestamp + searchMsg := &SearchMsg{SearchRequest: searchRequest} + searchMsg.BeginTimestamp = searchMsg.Timestamp + searchMsg.EndTimestamp = searchMsg.Timestamp + var tsMsg TsMsg = searchMsg return &tsMsg, nil } @@ -181,13 +181,13 @@ func (srt *SearchResultMsg) Marshal(input *TsMsg) ([]byte, error) { func (srt *SearchResultMsg) Unmarshal(input []byte) (*TsMsg, error) { searchResultRequest := internalPb.SearchResult{} err := proto.Unmarshal(input, &searchResultRequest) - searchResultMsg := &SearchResultMsg{SearchResult: searchResultRequest} - if err != nil { return nil, err } - srt.BeginTimestamp = searchResultMsg.Timestamp - srt.EndTimestamp = searchResultMsg.Timestamp + searchResultMsg := &SearchResultMsg{SearchResult: searchResultRequest} + searchResultMsg.BeginTimestamp = searchResultMsg.Timestamp + searchResultMsg.EndTimestamp = searchResultMsg.Timestamp + var tsMsg TsMsg = searchResultMsg return &tsMsg, nil } @@ -215,17 +215,49 @@ func (tst *TimeTickMsg) Marshal(input *TsMsg) ([]byte, error) { func (tst *TimeTickMsg) Unmarshal(input []byte) (*TsMsg, error) { timeTickMsg := internalPb.TimeTickMsg{} err := proto.Unmarshal(input, &timeTickMsg) - timeTick := &TimeTickMsg{TimeTickMsg: timeTickMsg} - if err != nil { return nil, err } - tst.BeginTimestamp = timeTick.Timestamp - tst.EndTimestamp = timeTick.Timestamp + timeTick := &TimeTickMsg{TimeTickMsg: timeTickMsg} + timeTick.BeginTimestamp = timeTick.Timestamp + timeTick.EndTimestamp = timeTick.Timestamp + var tsMsg TsMsg = timeTick return &tsMsg, nil } +/////////////////////////////////////////QueryNodeSegStats////////////////////////////////////////// +type QueryNodeSegStatsMsg struct { + BaseMsg + internalPb.QueryNodeSegStats +} + +func (qs *QueryNodeSegStatsMsg) Type() MsgType { + return qs.MsgType +} + +func (qs *QueryNodeSegStatsMsg) Marshal(input *TsMsg) ([]byte, error) { + queryNodeSegStatsTask := (*input).(*QueryNodeSegStatsMsg) + queryNodeSegStats := &queryNodeSegStatsTask.QueryNodeSegStats + mb, err := proto.Marshal(queryNodeSegStats) + if err != nil { + return nil, err + } + return mb, nil +} + +func (qs *QueryNodeSegStatsMsg) Unmarshal(input []byte) (*TsMsg, error) { + queryNodeSegStats := internalPb.QueryNodeSegStats{} + err := proto.Unmarshal(input, &queryNodeSegStats) + if err != nil { + return nil, err + } + queryNodeSegStatsMsg := &QueryNodeSegStatsMsg{QueryNodeSegStats: queryNodeSegStats} + + var tsMsg TsMsg = queryNodeSegStatsMsg + return &tsMsg, nil +} + ///////////////////////////////////////////Key2Seg////////////////////////////////////////// //type Key2SegMsg struct { // BaseMsg diff --git a/internal/msgstream/unmarshal.go b/internal/msgstream/unmarshal.go index 49d08b9e2c..3f0f947701 100644 --- a/internal/msgstream/unmarshal.go +++ b/internal/msgstream/unmarshal.go @@ -30,12 +30,14 @@ func (dispatcher *UnmarshalDispatcher) addDefaultMsgTemplates() { searchMsg := SearchMsg{} searchResultMsg := SearchResultMsg{} timeTickMsg := TimeTickMsg{} + queryNodeSegStatsMsg := QueryNodeSegStatsMsg{} dispatcher.tempMap = make(map[internalPb.MsgType]UnmarshalFunc) dispatcher.tempMap[internalPb.MsgType_kInsert] = insertMsg.Unmarshal dispatcher.tempMap[internalPb.MsgType_kDelete] = deleteMsg.Unmarshal dispatcher.tempMap[internalPb.MsgType_kSearch] = searchMsg.Unmarshal dispatcher.tempMap[internalPb.MsgType_kSearchResult] = searchResultMsg.Unmarshal dispatcher.tempMap[internalPb.MsgType_kTimeTick] = timeTickMsg.Unmarshal + dispatcher.tempMap[internalPb.MsgType_kQueryNodeSegStats] = queryNodeSegStatsMsg.Unmarshal } func NewUnmarshalDispatcher() *UnmarshalDispatcher { diff --git a/internal/proxy/grpc_service.go b/internal/proxy/grpc_service.go index 88ed746fcd..944311767d 100644 --- a/internal/proxy/grpc_service.go +++ b/internal/proxy/grpc_service.go @@ -3,6 +3,8 @@ package proxy import ( "context" "errors" + "log" + "strconv" "github.com/golang/protobuf/proto" "github.com/zilliztech/milvus-distributed/internal/msgstream" @@ -39,7 +41,7 @@ func (p *Proxy) Insert(ctx context.Context, in *servicepb.RowBatch) (*servicepb. case <-ctx.Done(): return errors.New("insert timeout") default: - return p.taskSch.DdQueue.Enqueue(it) + return p.taskSch.DmQueue.Enqueue(it) } } err := fn() @@ -120,8 +122,12 @@ func (p *Proxy) Search(ctx context.Context, req *servicepb.Query) (*servicepb.Qu resultBuf: make(chan []*internalpb.SearchResult), } qt.ctx, qt.cancel = context.WithCancel(ctx) + // Hack with test, shit here but no other ways + reqID, _ := strconv.Atoi(req.CollectionName[len(req.CollectionName)-1:]) + qt.ReqID = int64(reqID) queryBytes, _ := proto.Marshal(req) qt.SearchRequest.Query.Value = queryBytes + log.Printf("grpc address of query task: %p", qt) defer qt.cancel() fn := func() error { @@ -129,7 +135,7 @@ func (p *Proxy) Search(ctx context.Context, req *servicepb.Query) (*servicepb.Qu case <-ctx.Done(): return errors.New("create collection timeout") default: - return p.taskSch.DdQueue.Enqueue(qt) + return p.taskSch.DqQueue.Enqueue(qt) } } err := fn() @@ -139,7 +145,7 @@ func (p *Proxy) Search(ctx context.Context, req *servicepb.Query) (*servicepb.Qu ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, Reason: err.Error(), }, - }, err + }, nil } err = qt.WaitToFinish() @@ -149,7 +155,7 @@ func (p *Proxy) Search(ctx context.Context, req *servicepb.Query) (*servicepb.Qu ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, Reason: err.Error(), }, - }, err + }, nil } return qt.result, nil diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 0efe921661..058894cc21 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -177,17 +177,12 @@ func (p *Proxy) queryResultLoop() { log.Print("buf chan closed") return } - log.Print("Consume message from query result stream...") - log.Printf("message pack: %v", msgPack) if msgPack == nil { continue } tsMsg := msgPack.Msgs[0] searchResultMsg, _ := (*tsMsg).(*msgstream.SearchResultMsg) reqID := searchResultMsg.GetReqID() - log.Printf("ts msg: %v", tsMsg) - log.Printf("search result message: %v", searchResultMsg) - log.Printf("req id: %v", reqID) _, ok = queryResultBuf[reqID] if !ok { queryResultBuf[reqID] = make([]*internalpb.SearchResult, 0) @@ -196,10 +191,13 @@ func (p *Proxy) queryResultLoop() { if len(queryResultBuf[reqID]) == 4 { // TODO: use the number of query node instead t := p.taskSch.getTaskByReqID(reqID) - qt := t.(*QueryTask) - if qt != nil { + if t != nil { + qt := t.(*QueryTask) + log.Printf("address of query task: %p", qt) qt.resultBuf <- queryResultBuf[reqID] delete(queryResultBuf, reqID) + } else { + log.Printf("task with reqID %v is nil", reqID) } } case <-p.proxyLoopCtx.Done(): diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index 85906e73c6..5b2e220fdb 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -16,7 +16,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/zilliztech/milvus-distributed/internal/conf" "github.com/zilliztech/milvus-distributed/internal/master" + "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" "github.com/zilliztech/milvus-distributed/internal/proto/servicepb" gparams "github.com/zilliztech/milvus-distributed/internal/util/paramtableutil" @@ -41,9 +43,8 @@ func startMaster(ctx context.Context) { rootPath := conf.Config.Etcd.Rootpath kvRootPath := path.Join(rootPath, "kv") metaRootPath := path.Join(rootPath, "meta") - tsoRootPath := path.Join(rootPath, "timestamp") - svr, err := master.CreateServer(ctx, kvRootPath, metaRootPath, tsoRootPath, []string{etcdAddr}) + svr, err := master.CreateServer(ctx, kvRootPath, metaRootPath, []string{etcdAddr}) masterServer = svr if err != nil { log.Print("create server failed", zap.Error(err)) @@ -131,6 +132,7 @@ func TestProxy_CreateCollection(t *testing.T) { if err != nil { t.Error(err) } + t.Logf("create collection response: %v", resp) msg := "Create Collection " + strconv.Itoa(i) + " should succeed!" assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_SUCCESS, msg) } @@ -155,7 +157,7 @@ func TestProxy_HasCollection(t *testing.T) { } msg := "Has Collection " + strconv.Itoa(i) + " should succeed!" assert.Equal(t, bool.Status.ErrorCode, commonpb.ErrorCode_SUCCESS, msg) - t.Logf("Has Collection %v: %v", i, bool.Value) + t.Logf("Has Collection %v: %v", i, bool) }(&wg) } wg.Wait() @@ -259,15 +261,14 @@ func TestProxy_Insert(t *testing.T) { wg.Wait() } -/* func TestProxy_Search(t *testing.T) { - var wg sync.WaitGroup - //buf := make(chan int, testNum) - buf := make(chan int, 1) + var sendWg sync.WaitGroup + var queryWg sync.WaitGroup + queryDone := make(chan int) - wg.Add(1) - func(group *sync.WaitGroup) { - defer wg.Done() + sendWg.Add(1) + go func(group *sync.WaitGroup) { + defer group.Done() queryResultChannels := []string{"QueryResult"} bufSize := 1024 queryResultMsgStream := msgstream.NewPulsarMsgStream(ctx, int64(bufSize)) @@ -276,13 +277,16 @@ func TestProxy_Search(t *testing.T) { assert.NotEqual(t, queryResultMsgStream, nil, "query result message stream should not be nil!") queryResultMsgStream.CreatePulsarProducers(queryResultChannels) + i := 0 for { select { case <-ctx.Done(): t.Logf("query result message stream is closed ...") queryResultMsgStream.Close() - case i := <- buf: - log.Printf("receive query request, reqID: %v", i) + return + case <-queryDone: + return + default: for j := 0; j < 4; j++ { searchResultMsg := &msgstream.SearchResultMsg{ BaseMsg: msgstream.BaseMsg{ @@ -290,7 +294,7 @@ func TestProxy_Search(t *testing.T) { }, SearchResult: internalpb.SearchResult{ MsgType: internalpb.MsgType_kSearchResult, - ReqID: int64(i), + ReqID: int64(i % testNum), }, } msgPack := &msgstream.MsgPack{ @@ -298,12 +302,12 @@ func TestProxy_Search(t *testing.T) { } var tsMsg msgstream.TsMsg = searchResultMsg msgPack.Msgs[0] = &tsMsg - log.Printf("proxy_test, produce message...") queryResultMsgStream.Produce(msgPack) } + i++ } } - }(&wg) + }(&sendWg) for i := 0; i < testNum; i++ { i := i @@ -313,7 +317,7 @@ func TestProxy_Search(t *testing.T) { CollectionName: collectionName, } - wg.Add(1) + queryWg.Add(1) go func(group *sync.WaitGroup) { defer group.Done() bool, err := proxyClient.HasCollection(ctx, &servicepb.CollectionName{CollectionName: collectionName}) @@ -323,27 +327,47 @@ func TestProxy_Search(t *testing.T) { msg := "Has Collection " + strconv.Itoa(i) + " should succeed!" assert.Equal(t, bool.Status.ErrorCode, commonpb.ErrorCode_SUCCESS, msg) - if bool.Value { - log.Printf("Search: %v", collectionName) - fn := func() error { - buf <- i - resp, err := proxyClient.Search(ctx, req) - t.Logf("response of search collection %v: %v", i, resp) - return err + if !bool.Value { + req := &schemapb.CollectionSchema{ + Name: collectionName, + Description: "no description", + AutoID: true, + Fields: make([]*schemapb.FieldSchema, 1), } - err := Retry(10, time.Millisecond, fn) + fieldName := "Field" + strconv.FormatInt(int64(i), 10) + req.Fields[0] = &schemapb.FieldSchema{ + Name: fieldName, + Description: "no description", + DataType: schemapb.DataType_INT32, + } + resp, err := proxyClient.CreateCollection(ctx, req) if err != nil { t.Error(err) } + t.Logf("create collection response: %v", resp) + msg := "Create Collection " + strconv.Itoa(i) + " should succeed!" + assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_SUCCESS, msg) } - }(&wg) + fn := func() error { + log.Printf("Search: %v", collectionName) + resp, err := proxyClient.Search(ctx, req) + t.Logf("response of search collection %v: %v", i, resp) + return err + } + err = fn() + if err != nil { + t.Error(err) + } + }(&queryWg) } - wg.Wait() + t.Log("wait query to finish...") + queryWg.Wait() + t.Log("query finish ...") + queryDone <- 1 + sendWg.Wait() } -*/ - func TestProxy_DropCollection(t *testing.T) { var wg sync.WaitGroup for i := 0; i < testNum; i++ { diff --git a/internal/proxy/task.go b/internal/proxy/task.go index eb17ed53d1..062b5c8ff5 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -289,40 +289,21 @@ func (qt *QueryTask) Execute() error { } func (qt *QueryTask) PostExecute() error { - return nil -} - -func (qt *QueryTask) WaitToFinish() error { for { select { - case err := <-qt.done: - return err case <-qt.ctx.Done(): log.Print("wait to finish failed, timeout!") return errors.New("wait to finish failed, timeout") - } - } -} - -func (qt *QueryTask) Notify(err error) { - defer func() { - qt.done <- err - }() - for { - select { - case <-qt.ctx.Done(): - log.Print("wait to finish failed, timeout!") - return case searchResults := <-qt.resultBuf: rlen := len(searchResults) // query num if rlen <= 0 { qt.result = &servicepb.QueryResult{} - return + return nil } n := len(searchResults[0].Hits) // n if n <= 0 { qt.result = &servicepb.QueryResult{} - return + return nil } k := len(searchResults[0].Hits[0].IDs) // k queryResult := &servicepb.QueryResult{ @@ -361,6 +342,23 @@ func (qt *QueryTask) Notify(err error) { qt.result = queryResult } } + //return nil +} + +func (qt *QueryTask) WaitToFinish() error { + for { + select { + case err := <-qt.done: + return err + case <-qt.ctx.Done(): + log.Print("wait to finish failed, timeout!") + return errors.New("wait to finish failed, timeout") + } + } +} + +func (qt *QueryTask) Notify(err error) { + qt.done <- err } type HasCollectionTask struct { diff --git a/internal/proxy/task_scheduler.go b/internal/proxy/task_scheduler.go index fd27848bbf..83122fa8de 100644 --- a/internal/proxy/task_scheduler.go +++ b/internal/proxy/task_scheduler.go @@ -34,6 +34,8 @@ type BaseTaskQueue struct { maxTaskNum int64 utBufChan chan int // to block scheduler + + sched *TaskScheduler } func (queue *BaseTaskQueue) utChan() <-chan int { @@ -156,6 +158,9 @@ func (queue *BaseTaskQueue) TaskDoneTest(ts Timestamp) bool { func (queue *BaseTaskQueue) Enqueue(t task) error { // TODO: set Ts, ReqId, ProxyId + ts, _ := queue.sched.tsoAllocator.AllocOne() + log.Printf("allocate timestamp: %v", ts) + t.SetTs(ts) return queue.addUnissuedTask(t) } @@ -178,39 +183,44 @@ func (queue *DdTaskQueue) Enqueue(t task) error { defer queue.lock.Unlock() // TODO: set Ts, ReqId, ProxyId + ts, _ := queue.sched.tsoAllocator.AllocOne() + t.SetTs(ts) return queue.addUnissuedTask(t) } -func NewDdTaskQueue() *DdTaskQueue { +func NewDdTaskQueue(sched *TaskScheduler) *DdTaskQueue { return &DdTaskQueue{ BaseTaskQueue: BaseTaskQueue{ unissuedTasks: list.New(), activeTasks: make(map[Timestamp]task), maxTaskNum: 1024, utBufChan: make(chan int, 1024), + sched: sched, }, } } -func NewDmTaskQueue() *DmTaskQueue { +func NewDmTaskQueue(sched *TaskScheduler) *DmTaskQueue { return &DmTaskQueue{ BaseTaskQueue: BaseTaskQueue{ unissuedTasks: list.New(), activeTasks: make(map[Timestamp]task), maxTaskNum: 1024, utBufChan: make(chan int, 1024), + sched: sched, }, } } -func NewDqTaskQueue() *DqTaskQueue { +func NewDqTaskQueue(sched *TaskScheduler) *DqTaskQueue { return &DqTaskQueue{ BaseTaskQueue: BaseTaskQueue{ unissuedTasks: list.New(), activeTasks: make(map[Timestamp]task), maxTaskNum: 1024, utBufChan: make(chan int, 1024), + sched: sched, }, } } @@ -233,14 +243,14 @@ func NewTaskScheduler(ctx context.Context, tsoAllocator *allocator.TimestampAllocator) (*TaskScheduler, error) { ctx1, cancel := context.WithCancel(ctx) s := &TaskScheduler{ - DdQueue: NewDdTaskQueue(), - DmQueue: NewDmTaskQueue(), - DqQueue: NewDqTaskQueue(), idAllocator: idAllocator, tsoAllocator: tsoAllocator, ctx: ctx1, cancel: cancel, } + s.DdQueue = NewDdTaskQueue(s) + s.DmQueue = NewDmTaskQueue(s) + s.DqQueue = NewDqTaskQueue(s) return s, nil } @@ -276,19 +286,25 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) { defer func() { t.Notify(err) + log.Printf("notify with error: %v", err) }() if err != nil { return } q.AddActiveTask(t) - defer q.PopActiveTask(t.EndTs()) + log.Printf("query task add to active list ...") + defer func() { + q.PopActiveTask(t.EndTs()) + log.Printf("pop from active list ...") + }() err = t.Execute() if err != nil { log.Printf("execute definition task failed, error = %v", err) return } + log.Printf("scheduler task done ...") err = t.PostExecute() } @@ -330,9 +346,12 @@ func (sched *TaskScheduler) queryLoop() { case <-sched.ctx.Done(): return case <-sched.DqQueue.utChan(): + log.Print("scheduler receive query request ...") if !sched.DqQueue.utEmpty() { t := sched.scheduleDqTask() go sched.processTask(t, sched.DqQueue) + } else { + log.Print("query queue is empty ...") } } } diff --git a/internal/reader/col_seg_container.go b/internal/reader/col_seg_container.go index bb8a4993f4..5020bf4ce7 100644 --- a/internal/reader/col_seg_container.go +++ b/internal/reader/col_seg_container.go @@ -23,7 +23,7 @@ import ( type container interface { // collection getCollectionNum() int - addCollection(collMeta *etcdpb.CollectionMeta, collMetaBlob string) error + addCollection(collMeta *etcdpb.CollectionMeta, colMetaBlob string) error removeCollection(collectionID UniqueID) error getCollectionByID(collectionID UniqueID) (*Collection, error) getCollectionByName(collectionName string) (*Collection, error) @@ -59,11 +59,11 @@ func (container *colSegContainer) getCollectionNum() int { return len(container.collections) } -func (container *colSegContainer) addCollection(collMeta *etcdpb.CollectionMeta, collMetaBlob string) error { +func (container *colSegContainer) addCollection(collMeta *etcdpb.CollectionMeta, colMetaBlob string) error { container.mu.Lock() defer container.mu.Unlock() - var newCollection = newCollection(collMeta, collMetaBlob) + var newCollection = newCollection(collMeta, colMetaBlob) container.collections = append(container.collections, newCollection) return nil @@ -206,6 +206,7 @@ func (container *colSegContainer) getSegmentStatistics() *internalpb.QueryNodeSe } statisticData = append(statisticData, &stat) + segment.recentlyModified = false } return &internalpb.QueryNodeSegStats{ diff --git a/internal/reader/data_sync_service_test.go b/internal/reader/data_sync_service_test.go index ce5bd183f5..53064b90be 100644 --- a/internal/reader/data_sync_service_test.go +++ b/internal/reader/data_sync_service_test.go @@ -17,9 +17,6 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" ) -const ctxTimeInMillisecond = 2000 -const closeWithDeadline = true - // NOTE: start pulsar before test func TestManipulationService_Start(t *testing.T) { var ctx context.Context diff --git a/internal/reader/flow_graph_insert_node.go b/internal/reader/flow_graph_insert_node.go index bdafa53cf2..77500545e8 100644 --- a/internal/reader/flow_graph_insert_node.go +++ b/internal/reader/flow_graph_insert_node.go @@ -56,6 +56,20 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg { insertData.insertIDs[task.SegmentID] = append(insertData.insertIDs[task.SegmentID], task.RowIDs...) insertData.insertTimestamps[task.SegmentID] = append(insertData.insertTimestamps[task.SegmentID], task.Timestamps...) insertData.insertRecords[task.SegmentID] = append(insertData.insertRecords[task.SegmentID], task.RowData...) + + // check if segment exists, if not, create this segment + if !(*iNode.container).hasSegment(task.SegmentID) { + collection, err := (*iNode.container).getCollectionByName(task.CollectionName) + if err != nil { + log.Println(err) + continue + } + err = (*iNode.container).addSegment(task.SegmentID, task.PartitionTag, collection.ID()) + if err != nil { + log.Println(err) + continue + } + } } // 2. do preInsert diff --git a/internal/reader/flow_graph_service_time_node.go b/internal/reader/flow_graph_service_time_node.go index 246d8a32be..50ca674ff8 100644 --- a/internal/reader/flow_graph_service_time_node.go +++ b/internal/reader/flow_graph_service_time_node.go @@ -28,7 +28,7 @@ func (stNode *serviceTimeNode) Operate(in []*Msg) []*Msg { } // update service time - stNode.node.tSafe = serviceTimeMsg.timeRange.timestampMax + stNode.node.tSafe.setTSafe(serviceTimeMsg.timeRange.timestampMax) return nil } diff --git a/internal/reader/meta_service.go b/internal/reader/meta_service.go index 8f2d8863e0..307df322a3 100644 --- a/internal/reader/meta_service.go +++ b/internal/reader/meta_service.go @@ -363,7 +363,7 @@ func (mService *metaService) loadSegments() error { //----------------------------------------------------------------------- Unmarshal and Marshal func (mService *metaService) collectionUnmarshal(value string) *etcdpb.CollectionMeta { col := etcdpb.CollectionMeta{} - err := proto.Unmarshal([]byte(value), &col) + err := proto.UnmarshalText(value, &col) if err != nil { log.Println(err) return nil @@ -372,17 +372,17 @@ func (mService *metaService) collectionUnmarshal(value string) *etcdpb.Collectio } func (mService *metaService) collectionMarshal(col *etcdpb.CollectionMeta) string { - value, err := proto.Marshal(col) - if err != nil { - log.Println(err) + value := proto.MarshalTextString(col) + if value == "" { + log.Println("marshal collection failed") return "" } - return string(value) + return value } func (mService *metaService) segmentUnmarshal(value string) *etcdpb.SegmentMeta { seg := etcdpb.SegmentMeta{} - err := proto.Unmarshal([]byte(value), &seg) + err := proto.UnmarshalText(value, &seg) if err != nil { log.Println(err) return nil @@ -391,10 +391,10 @@ func (mService *metaService) segmentUnmarshal(value string) *etcdpb.SegmentMeta } func (mService *metaService) segmentMarshal(seg *etcdpb.SegmentMeta) string { - value, err := proto.Marshal(seg) - if err != nil { - log.Println(err) + value := proto.MarshalTextString(seg) + if value == "" { + log.Println("marshal segment failed") return "" } - return string(value) + return value } diff --git a/internal/reader/meta_service_test.go b/internal/reader/meta_service_test.go index 71c278719e..8c28e93882 100644 --- a/internal/reader/meta_service_test.go +++ b/internal/reader/meta_service_test.go @@ -1,616 +1,928 @@ package reader -//import ( -// "context" -// "log" -// "math" -// "sync" -// "testing" -// "time" -// -// "github.com/stretchr/testify/assert" -// "github.com/zilliztech/milvus-distributed/internal/conf" -// "github.com/zilliztech/milvus-distributed/internal/master/collection" -// "github.com/zilliztech/milvus-distributed/internal/master/segment" -// "github.com/zilliztech/milvus-distributed/internal/msgclient" -// "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" -//) -// -//func TestMeta_GetCollectionObjId(t *testing.T) { -// conf.LoadConfig("config.yaml") -// -// var key = "/collection/collection0" -// var collectionObjID1 = GetCollectionObjID(key) -// -// assert.Equal(t, collectionObjID1, "/collection/collection0") -// -// key = "fakeKey" -// var collectionObjID2 = GetCollectionObjID(key) -// -// assert.Equal(t, collectionObjID2, "fakeKey") -//} -// -//func TestMeta_GetSegmentObjId(t *testing.T) { -// conf.LoadConfig("config.yaml") -// -// var key = "/segment/segment0" -// var segmentObjID1 = GetSegmentObjID(key) -// -// assert.Equal(t, segmentObjID1, "/segment/segment0") -// -// key = "fakeKey" -// var segmentObjID2 = GetSegmentObjID(key) -// -// assert.Equal(t, segmentObjID2, "fakeKey") -//} -// -//func TestMeta_isCollectionObj(t *testing.T) { -// conf.LoadConfig("config.yaml") -// -// var key = "by-dev/collection/collection0" -// var b1 = isCollectionObj(key) -// -// assert.Equal(t, b1, true) -// -// key = "by-dev/segment/segment0" -// var b2 = isCollectionObj(key) -// -// assert.Equal(t, b2, false) -//} -// -//func TestMeta_isSegmentObj(t *testing.T) { -// conf.LoadConfig("config.yaml") -// -// var key = "by-dev/segment/segment0" -// var b1 = isSegmentObj(key) -// -// assert.Equal(t, b1, true) -// -// key = "by-dev/collection/collection0" -// var b2 = isSegmentObj(key) -// -// assert.Equal(t, b2, false) -//} -// -//func TestMeta_isSegmentChannelRangeInQueryNodeChannelRange(t *testing.T) { -// conf.LoadConfig("config.yaml") -// -// var s = segment.Segment{ -// SegmentID: int64(0), -// CollectionID: int64(0), -// PartitionTag: "partition0", -// ChannelStart: 0, -// ChannelEnd: 128, -// OpenTimeStamp: uint64(0), -// CloseTimeStamp: uint64(math.MaxUint64), -// CollectionName: "collection0", -// Rows: int64(0), -// } -// -// var b = isSegmentChannelRangeInQueryNodeChannelRange(&s) -// assert.Equal(t, b, true) -// -// s = segment.Segment{ -// SegmentID: int64(0), -// CollectionID: int64(0), -// PartitionTag: "partition0", -// ChannelStart: 128, -// ChannelEnd: 256, -// OpenTimeStamp: uint64(0), -// CloseTimeStamp: uint64(math.MaxUint64), -// CollectionName: "collection0", -// Rows: int64(0), -// } -// -// b = isSegmentChannelRangeInQueryNodeChannelRange(&s) -// assert.Equal(t, b, false) -//} -// -//func TestMeta_PrintCollectionStruct(t *testing.T) { -// var age = collection.FieldMeta{ -// FieldName: "age", -// Type: schemapb.DataType_INT32, -// DIM: int64(1), -// } -// -// var vec = collection.FieldMeta{ -// FieldName: "vec", -// Type: schemapb.DataType_VECTOR_FLOAT, -// DIM: int64(16), -// } -// -// var fieldMetas = []collection.FieldMeta{age, vec} -// -// var c = collection.Collection{ -// ID: int64(0), -// Name: "collection0", -// CreateTime: uint64(0), -// Schema: fieldMetas, -// SegmentIDs: []int64{ -// 0, 1, 2, -// }, -// PartitionTags: []string{ -// "partition0", -// }, -// GrpcMarshalString: "", -// } -// -// printCollectionStruct(&c) -//} -// -//func TestMeta_PrintSegmentStruct(t *testing.T) { -// var s = segment.Segment{ -// SegmentID: int64(0), -// CollectionID: int64(0), -// PartitionTag: "partition0", -// ChannelStart: 128, -// ChannelEnd: 256, -// OpenTimeStamp: uint64(0), -// CloseTimeStamp: uint64(math.MaxUint64), -// CollectionName: "collection0", -// Rows: int64(0), -// } -// -// printSegmentStruct(&s) -//} -// -//func TestMeta_ProcessCollectionCreate(t *testing.T) { -// conf.LoadConfig("config.yaml") -// -// d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) -// ctx, cancel := context.WithDeadline(context.Background(), d) -// defer cancel() -// -// mc := msgclient.ReaderMessageClient{} -// node := CreateQueryNode(ctx, 0, 0, &mc) -// -// id := "0" -// value := "{\"id\":0,\"name\":\"test\",\"creat_time\":1603359905,\"schema\":" + -// "[{\"field_name\":\"age\",\"type\":4,\"dimension\":1}," + -// "{\"field_name\":\"field_vec\",\"type\":101,\"dimension\":512}]," + -// "\"segment_ids\":[6886378356295345384],\"partition_tags\":[\"default\"]," + -// "\"grpc_marshal_string\":\"id: 6886378356295345384\\nname: \\\"test\\\"\\nschema: \\u003c\\n " + -// "field_metas: \\u003c\\n field_name: \\\"age\\\"\\n type: INT32\\n dim: 1\\n \\u003e\\n " + -// "field_metas: \\u003c\\n field_name: \\\"field_vec\\\"\\n type: VECTOR_FLOAT\\n " + -// "dim: 512\\n \\u003e\\n\\u003e\\ncreate_time: 1603359905\\nsegment_ids: " + -// "6886378356295345384\\npartition_tags: \\\"default\\\"\\n\",\"index_param\":null}" -// -// node.processCollectionCreate(id, value) -// c := node.Collections[0] -// -// assert.Equal(t, c.CollectionName, "test") -// assert.Equal(t, c.CollectionID, uint64(0)) -//} -// -//func TestMeta_ProcessSegmentCreate(t *testing.T) { -// conf.LoadConfig("config.yaml") -// -// d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) -// ctx, cancel := context.WithDeadline(context.Background(), d) -// defer cancel() -// -// mc := msgclient.ReaderMessageClient{} -// node := CreateQueryNode(ctx, 0, 0, &mc) -// -// id := "0" -// value := "{\"segment_id\":0,\"collection_id\":0," + -// "\"partition_tag\":\"default\",\"channel_start\":0,\"channel_end\":128," + -// "\"open_timestamp\":1603360439,\"close_timestamp\":70368744177663," + -// "\"collection_name\":\"test\",\"segment_status\":0,\"rows\":0}" -// -// c := node.newCollection(int64(0), "test", "") -// c.newPartition("default") -// -// node.processSegmentCreate(id, value) -// s := node.SegmentsMap[int64(0)] -// -// assert.Equal(t, s.SegmentID, int64(0)) -// assert.Equal(t, s.SegmentCloseTime, uint64(70368744177663)) -// assert.Equal(t, s.SegmentStatus, 0) -//} -// -//func TestMeta_ProcessCreate(t *testing.T) { -// conf.LoadConfig("config.yaml") -// -// d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) -// ctx, cancel := context.WithDeadline(context.Background(), d) -// defer cancel() -// -// mc := msgclient.ReaderMessageClient{} -// node := CreateQueryNode(ctx, 0, 0, &mc) -// -// key1 := "by-dev/collection/0" -// msg1 := "{\"id\":0,\"name\":\"test\",\"creat_time\":1603359905,\"schema\":" + -// "[{\"field_name\":\"age\",\"type\":4,\"dimension\":1}," + -// "{\"field_name\":\"field_vec\",\"type\":101,\"dimension\":512}]," + -// "\"segment_ids\":[6886378356295345384],\"partition_tags\":[\"default\"]," + -// "\"grpc_marshal_string\":\"id: 6886378356295345384\\nname: \\\"test\\\"\\nschema: \\u003c\\n " + -// "field_metas: \\u003c\\n field_name: \\\"age\\\"\\n type: INT32\\n dim: 1\\n \\u003e\\n " + -// "field_metas: \\u003c\\n field_name: \\\"field_vec\\\"\\n type: VECTOR_FLOAT\\n " + -// "dim: 512\\n \\u003e\\n\\u003e\\ncreate_time: 1603359905\\nsegment_ids: " + -// "6886378356295345384\\npartition_tags: \\\"default\\\"\\n\",\"index_param\":null}" -// -// node.processCreate(key1, msg1) -// c := node.Collections[0] -// -// assert.Equal(t, c.CollectionName, "test") -// assert.Equal(t, c.CollectionID, uint64(0)) -// -// key2 := "by-dev/segment/0" -// msg2 := "{\"segment_id\":0,\"collection_id\":0," + -// "\"partition_tag\":\"default\",\"channel_start\":0,\"channel_end\":128," + -// "\"open_timestamp\":1603360439,\"close_timestamp\":70368744177663," + -// "\"collection_name\":\"test\",\"segment_status\":0,\"rows\":0}" -// -// node.processCreate(key2, msg2) -// s := node.SegmentsMap[int64(0)] -// -// assert.Equal(t, s.SegmentID, int64(0)) -// assert.Equal(t, s.SegmentCloseTime, uint64(70368744177663)) -// assert.Equal(t, s.SegmentStatus, 0) -//} -// -//func TestMeta_ProcessSegmentModify(t *testing.T) { -// conf.LoadConfig("config.yaml") -// -// d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) -// ctx, cancel := context.WithDeadline(context.Background(), d) -// defer cancel() -// -// mc := msgclient.ReaderMessageClient{} -// node := CreateQueryNode(ctx, 0, 0, &mc) -// -// id := "0" -// value := "{\"segment_id\":0,\"collection_id\":0," + -// "\"partition_tag\":\"default\",\"channel_start\":0,\"channel_end\":128," + -// "\"open_timestamp\":1603360439,\"close_timestamp\":70368744177663," + -// "\"collection_name\":\"test\",\"segment_status\":0,\"rows\":0}" -// -// var c = node.newCollection(int64(0), "test", "") -// c.newPartition("default") -// -// node.processSegmentCreate(id, value) -// var s = node.SegmentsMap[int64(0)] -// -// assert.Equal(t, s.SegmentID, int64(0)) -// assert.Equal(t, s.SegmentCloseTime, uint64(70368744177663)) -// assert.Equal(t, s.SegmentStatus, 0) -// -// newValue := "{\"segment_id\":0,\"collection_id\":0," + -// "\"partition_tag\":\"default\",\"channel_start\":0,\"channel_end\":128," + -// "\"open_timestamp\":1603360439,\"close_timestamp\":70368744177888," + -// "\"collection_name\":\"test\",\"segment_status\":0,\"rows\":0}" -// -// node.processSegmentModify(id, newValue) -// s = node.SegmentsMap[int64(0)] -// -// assert.Equal(t, s.SegmentID, int64(0)) -// assert.Equal(t, s.SegmentCloseTime, uint64(70368744177888)) -// assert.Equal(t, s.SegmentStatus, 0) -//} -// -//func TestMeta_ProcessCollectionModify(t *testing.T) { -// conf.LoadConfig("config.yaml") -// -// d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) -// ctx, cancel := context.WithDeadline(context.Background(), d) -// defer cancel() -// -// mc := msgclient.ReaderMessageClient{} -// node := CreateQueryNode(ctx, 0, 0, &mc) -// -// id := "0" -// value := "{\"id\":0,\"name\":\"test\",\"creat_time\":1603359905,\"schema\":" + -// "[{\"field_name\":\"age\",\"type\":4,\"dimension\":1}," + -// "{\"field_name\":\"field_vec\",\"type\":101,\"dimension\":512}]," + -// "\"segment_ids\":[6886378356295345384],\"partition_tags\":[\"default\"]," + -// "\"grpc_marshal_string\":\"id: 6886378356295345384\\nname: \\\"test\\\"\\nschema: \\u003c\\n " + -// "field_metas: \\u003c\\n field_name: \\\"age\\\"\\n type: INT32\\n dim: 1\\n \\u003e\\n " + -// "field_metas: \\u003c\\n field_name: \\\"field_vec\\\"\\n type: VECTOR_FLOAT\\n " + -// "dim: 512\\n \\u003e\\n\\u003e\\ncreate_time: 1603359905\\nsegment_ids: " + -// "6886378356295345384\\npartition_tags: \\\"default\\\"\\n\",\"index_param\":null}" -// -// node.processCollectionCreate(id, value) -// var c = node.Collections[0] -// -// assert.Equal(t, c.CollectionName, "test") -// assert.Equal(t, c.CollectionID, uint64(0)) -// -// // TODO: use different index for testing processCollectionModify -// newValue := "{\"id\":0,\"name\":\"test_new\",\"creat_time\":1603359905,\"schema\":" + -// "[{\"field_name\":\"age\",\"type\":4,\"dimension\":1}," + -// "{\"field_name\":\"field_vec\",\"type\":101,\"dimension\":512}]," + -// "\"segment_ids\":[6886378356295345384],\"partition_tags\":[\"default\"]," + -// "\"grpc_marshal_string\":\"id: 6886378356295345384\\nname: \\\"test\\\"\\nschema: \\u003c\\n " + -// "field_metas: \\u003c\\n field_name: \\\"age\\\"\\n type: INT32\\n dim: 1\\n \\u003e\\n " + -// "field_metas: \\u003c\\n field_name: \\\"field_vec\\\"\\n type: VECTOR_FLOAT\\n " + -// "dim: 512\\n \\u003e\\n\\u003e\\ncreate_time: 1603359905\\nsegment_ids: " + -// "6886378356295345384\\npartition_tags: \\\"default\\\"\\n\",\"index_param\":null}" -// -// node.processCollectionModify(id, newValue) -// c = node.Collections[0] -// -// assert.Equal(t, c.CollectionName, "test") -// assert.Equal(t, c.CollectionID, uint64(0)) -//} -// -//func TestMeta_ProcessModify(t *testing.T) { -// conf.LoadConfig("config.yaml") -// -// d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) -// ctx, cancel := context.WithDeadline(context.Background(), d) -// defer cancel() -// -// mc := msgclient.ReaderMessageClient{} -// node := CreateQueryNode(ctx, 0, 0, &mc) -// -// key1 := "by-dev/collection/0" -// msg1 := "{\"id\":0,\"name\":\"test\",\"creat_time\":1603359905,\"schema\":" + -// "[{\"field_name\":\"age\",\"type\":4,\"dimension\":1}," + -// "{\"field_name\":\"field_vec\",\"type\":101,\"dimension\":512}]," + -// "\"segment_ids\":[6886378356295345384],\"partition_tags\":[\"default\"]," + -// "\"grpc_marshal_string\":\"id: 6886378356295345384\\nname: \\\"test\\\"\\nschema: \\u003c\\n " + -// "field_metas: \\u003c\\n field_name: \\\"age\\\"\\n type: INT32\\n dim: 1\\n \\u003e\\n " + -// "field_metas: \\u003c\\n field_name: \\\"field_vec\\\"\\n type: VECTOR_FLOAT\\n " + -// "dim: 512\\n \\u003e\\n\\u003e\\ncreate_time: 1603359905\\nsegment_ids: " + -// "6886378356295345384\\npartition_tags: \\\"default\\\"\\n\",\"index_param\":null}" -// -// node.processCreate(key1, msg1) -// c := node.Collections[0] -// -// assert.Equal(t, c.CollectionName, "test") -// assert.Equal(t, c.CollectionID, uint64(0)) -// -// key2 := "by-dev/segment/0" -// msg2 := "{\"segment_id\":0,\"collection_id\":0," + -// "\"partition_tag\":\"default\",\"channel_start\":0,\"channel_end\":128," + -// "\"open_timestamp\":1603360439,\"close_timestamp\":70368744177663," + -// "\"collection_name\":\"test\",\"segment_status\":0,\"rows\":0}" -// -// node.processCreate(key2, msg2) -// s := node.SegmentsMap[int64(0)] -// -// assert.Equal(t, s.SegmentID, int64(0)) -// assert.Equal(t, s.SegmentCloseTime, uint64(70368744177663)) -// assert.Equal(t, s.SegmentStatus, 0) -// -// // modify -// // TODO: use different index for testing processCollectionModify -// msg3 := "{\"id\":0,\"name\":\"test\",\"creat_time\":1603359905,\"schema\":" + -// "[{\"field_name\":\"age\",\"type\":4,\"dimension\":1}," + -// "{\"field_name\":\"field_vec\",\"type\":101,\"dimension\":512}]," + -// "\"segment_ids\":[6886378356295345384],\"partition_tags\":[\"default\"]," + -// "\"grpc_marshal_string\":\"id: 6886378356295345384\\nname: \\\"test\\\"\\nschema: \\u003c\\n " + -// "field_metas: \\u003c\\n field_name: \\\"age\\\"\\n type: INT32\\n dim: 1\\n \\u003e\\n " + -// "field_metas: \\u003c\\n field_name: \\\"field_vec\\\"\\n type: VECTOR_FLOAT\\n " + -// "dim: 512\\n \\u003e\\n\\u003e\\ncreate_time: 1603359905\\nsegment_ids: " + -// "6886378356295345384\\npartition_tags: \\\"default\\\"\\n\",\"index_param\":null}" -// -// node.processModify(key1, msg3) -// c = node.Collections[0] -// -// assert.Equal(t, c.CollectionName, "test") -// assert.Equal(t, c.CollectionID, uint64(0)) -// -// msg4 := "{\"segment_id\":0,\"collection_id\":0," + -// "\"partition_tag\":\"default\",\"channel_start\":0,\"channel_end\":128," + -// "\"open_timestamp\":1603360439,\"close_timestamp\":70368744177888," + -// "\"collection_name\":\"test\",\"segment_status\":0,\"rows\":0}" -// -// node.processModify(key2, msg4) -// s = node.SegmentsMap[int64(0)] -// -// assert.Equal(t, s.SegmentID, int64(0)) -// assert.Equal(t, s.SegmentCloseTime, uint64(70368744177888)) -// assert.Equal(t, s.SegmentStatus, 0) -//} -// -//func TestMeta_ProcessSegmentDelete(t *testing.T) { -// conf.LoadConfig("config.yaml") -// -// d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) -// ctx, cancel := context.WithDeadline(context.Background(), d) -// defer cancel() -// -// mc := msgclient.ReaderMessageClient{} -// node := CreateQueryNode(ctx, 0, 0, &mc) -// -// id := "0" -// value := "{\"segment_id\":0,\"collection_id\":0," + -// "\"partition_tag\":\"default\",\"channel_start\":0,\"channel_end\":128," + -// "\"open_timestamp\":1603360439,\"close_timestamp\":70368744177663," + -// "\"collection_name\":\"test\",\"segment_status\":0,\"rows\":0}" -// -// c := node.newCollection(int64(0), "test", "") -// c.newPartition("default") -// -// node.processSegmentCreate(id, value) -// s := node.SegmentsMap[int64(0)] -// -// assert.Equal(t, s.SegmentID, int64(0)) -// assert.Equal(t, s.SegmentCloseTime, uint64(70368744177663)) -// assert.Equal(t, s.SegmentStatus, 0) -// -// node.processSegmentDelete("0") -// mapSize := len(node.SegmentsMap) -// -// assert.Equal(t, mapSize, 0) -//} -// -//func TestMeta_ProcessCollectionDelete(t *testing.T) { -// conf.LoadConfig("config.yaml") -// -// d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) -// ctx, cancel := context.WithDeadline(context.Background(), d) -// defer cancel() -// -// mc := msgclient.ReaderMessageClient{} -// node := CreateQueryNode(ctx, 0, 0, &mc) -// -// id := "0" -// value := "{\"id\":0,\"name\":\"test\",\"creat_time\":1603359905,\"schema\":" + -// "[{\"field_name\":\"age\",\"type\":4,\"dimension\":1}," + -// "{\"field_name\":\"field_vec\",\"type\":101,\"dimension\":512}]," + -// "\"segment_ids\":[6886378356295345384],\"partition_tags\":[\"default\"]," + -// "\"grpc_marshal_string\":\"id: 6886378356295345384\\nname: \\\"test\\\"\\nschema: \\u003c\\n " + -// "field_metas: \\u003c\\n field_name: \\\"age\\\"\\n type: INT32\\n dim: 1\\n \\u003e\\n " + -// "field_metas: \\u003c\\n field_name: \\\"field_vec\\\"\\n type: VECTOR_FLOAT\\n " + -// "dim: 512\\n \\u003e\\n\\u003e\\ncreate_time: 1603359905\\nsegment_ids: " + -// "6886378356295345384\\npartition_tags: \\\"default\\\"\\n\",\"index_param\":null}" -// -// node.processCollectionCreate(id, value) -// c := node.Collections[0] -// -// assert.Equal(t, c.CollectionName, "test") -// assert.Equal(t, c.CollectionID, uint64(0)) -// -// node.processCollectionDelete(id) -// collectionsSize := len(node.Collections) -// -// assert.Equal(t, collectionsSize, 0) -//} -// -//func TestMeta_ProcessDelete(t *testing.T) { -// conf.LoadConfig("config.yaml") -// -// d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) -// ctx, cancel := context.WithDeadline(context.Background(), d) -// defer cancel() -// -// mc := msgclient.ReaderMessageClient{} -// node := CreateQueryNode(ctx, 0, 0, &mc) -// -// key1 := "by-dev/collection/0" -// msg1 := "{\"id\":0,\"name\":\"test\",\"creat_time\":1603359905,\"schema\":" + -// "[{\"field_name\":\"age\",\"type\":4,\"dimension\":1}," + -// "{\"field_name\":\"field_vec\",\"type\":101,\"dimension\":512}]," + -// "\"segment_ids\":[6886378356295345384],\"partition_tags\":[\"default\"]," + -// "\"grpc_marshal_string\":\"id: 6886378356295345384\\nname: \\\"test\\\"\\nschema: \\u003c\\n " + -// "field_metas: \\u003c\\n field_name: \\\"age\\\"\\n type: INT32\\n dim: 1\\n \\u003e\\n " + -// "field_metas: \\u003c\\n field_name: \\\"field_vec\\\"\\n type: VECTOR_FLOAT\\n " + -// "dim: 512\\n \\u003e\\n\\u003e\\ncreate_time: 1603359905\\nsegment_ids: " + -// "6886378356295345384\\npartition_tags: \\\"default\\\"\\n\",\"index_param\":null}" -// -// node.processCreate(key1, msg1) -// c := node.Collections[0] -// -// assert.Equal(t, c.CollectionName, "test") -// assert.Equal(t, c.CollectionID, uint64(0)) -// -// key2 := "by-dev/segment/0" -// msg2 := "{\"segment_id\":0,\"collection_id\":0," + -// "\"partition_tag\":\"default\",\"channel_start\":0,\"channel_end\":128," + -// "\"open_timestamp\":1603360439,\"close_timestamp\":70368744177663," + -// "\"collection_name\":\"test\",\"segment_status\":0,\"rows\":0}" -// -// node.processCreate(key2, msg2) -// s := node.SegmentsMap[int64(0)] -// -// assert.Equal(t, s.SegmentID, int64(0)) -// assert.Equal(t, s.SegmentCloseTime, uint64(70368744177663)) -// assert.Equal(t, s.SegmentStatus, 0) -// -// node.processDelete(key1) -// collectionsSize := len(node.Collections) -// -// assert.Equal(t, collectionsSize, 0) -// -// mapSize := len(node.SegmentsMap) -// -// assert.Equal(t, mapSize, 0) -//} -// -//func TestMeta_ProcessResp(t *testing.T) { -// conf.LoadConfig("config.yaml") -// -// d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) -// ctx, cancel := context.WithDeadline(context.Background(), d) -// defer cancel() -// -// mc := msgclient.ReaderMessageClient{} -// node := CreateQueryNode(ctx, 0, 0, &mc) -// -// err := node.InitFromMeta() -// assert.Nil(t, err) -// -// metaChan := node.kvBase.WatchWithPrefix("") -// -// select { -// case <-node.ctx.Done(): -// return -// case resp := <-metaChan: -// _ = node.processResp(resp) -// } -//} -// -//func TestMeta_LoadCollections(t *testing.T) { -// conf.LoadConfig("config.yaml") -// -// d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) -// ctx, cancel := context.WithDeadline(context.Background(), d) -// defer cancel() -// -// mc := msgclient.ReaderMessageClient{} -// node := CreateQueryNode(ctx, 0, 0, &mc) -// -// err := node.InitFromMeta() -// assert.Nil(t, err) -// -// err2 := node.loadCollections() -// assert.Nil(t, err2) -//} -// -//func TestMeta_LoadSegments(t *testing.T) { -// conf.LoadConfig("config.yaml") -// -// d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) -// ctx, cancel := context.WithDeadline(context.Background(), d) -// defer cancel() -// -// mc := msgclient.ReaderMessageClient{} -// node := CreateQueryNode(ctx, 0, 0, &mc) -// -// err := node.InitFromMeta() -// assert.Nil(t, err) -// -// err2 := node.loadSegments() -// assert.Nil(t, err2) -//} -// -//func TestMeta_InitFromMeta(t *testing.T) { -// conf.LoadConfig("config.yaml") -// -// d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) -// ctx, cancel := context.WithDeadline(context.Background(), d) -// defer cancel() -// -// mc := msgclient.ReaderMessageClient{} -// node := CreateQueryNode(ctx, 0, 0, &mc) -// -// err := node.InitFromMeta() -// assert.Nil(t, err) -//} -// -//func TestMeta_RunMetaService(t *testing.T) { -// conf.LoadConfig("config.yaml") -// -// d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) -// ctx, cancel := context.WithDeadline(context.Background(), d) -// defer cancel() -// -// node := CreateQueryNode(ctx, 0, 0, nil) -// -// wg := sync.WaitGroup{} -// err := node.InitFromMeta() -// -// if err != nil { -// log.Printf("Init query node from meta failed") -// return -// } -// -// wg.Add(1) -// go node.RunMetaService(&wg) -// wg.Wait() -// -// node.Close() -//} +import ( + "context" + "math" + "testing" + "time" + + gParams "github.com/zilliztech/milvus-distributed/internal/util/paramtableutil" + + "github.com/golang/protobuf/proto" + + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + + "github.com/stretchr/testify/assert" + "github.com/zilliztech/milvus-distributed/internal/conf" + "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" + "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" +) + +func TestMetaService_start(t *testing.T) { + var ctx context.Context + err := gParams.GParams.LoadYaml("config.yaml") + assert.NoError(t, err) + + if closeWithDeadline { + var cancel context.CancelFunc + d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) + ctx, cancel = context.WithDeadline(context.Background(), d) + defer cancel() + } else { + ctx = context.Background() + } + + // init query node + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) + node.metaService = newMetaService(ctx, node.container) + + (*node.metaService).start() +} + +func TestMetaService_getCollectionObjId(t *testing.T) { + err := gParams.GParams.LoadYaml("config.yaml") + assert.NoError(t, err) + + conf.LoadConfig("config.yaml") + + var key = "/collection/collection0" + var collectionObjID1 = GetCollectionObjID(key) + + assert.Equal(t, collectionObjID1, "/collection/collection0") + + key = "fakeKey" + var collectionObjID2 = GetCollectionObjID(key) + + assert.Equal(t, collectionObjID2, "fakeKey") +} + +func TestMetaService_getSegmentObjId(t *testing.T) { + err := gParams.GParams.LoadYaml("config.yaml") + assert.NoError(t, err) + + conf.LoadConfig("config.yaml") + + var key = "/segment/segment0" + var segmentObjID1 = GetSegmentObjID(key) + + assert.Equal(t, segmentObjID1, "/segment/segment0") + + key = "fakeKey" + var segmentObjID2 = GetSegmentObjID(key) + + assert.Equal(t, segmentObjID2, "fakeKey") +} + +func TestMetaService_isCollectionObj(t *testing.T) { + err := gParams.GParams.LoadYaml("config.yaml") + assert.NoError(t, err) + + conf.LoadConfig("config.yaml") + + var key = "by-dev/collection/collection0" + var b1 = isCollectionObj(key) + + assert.Equal(t, b1, true) + + key = "by-dev/segment/segment0" + var b2 = isCollectionObj(key) + + assert.Equal(t, b2, false) +} + +func TestMetaService_isSegmentObj(t *testing.T) { + err := gParams.GParams.LoadYaml("config.yaml") + assert.NoError(t, err) + + conf.LoadConfig("config.yaml") + + var key = "by-dev/segment/segment0" + var b1 = isSegmentObj(key) + + assert.Equal(t, b1, true) + + key = "by-dev/collection/collection0" + var b2 = isSegmentObj(key) + + assert.Equal(t, b2, false) +} + +func TestMetaService_isSegmentChannelRangeInQueryNodeChannelRange(t *testing.T) { + err := gParams.GParams.LoadYaml("config.yaml") + assert.NoError(t, err) + + conf.LoadConfig("config.yaml") + + var s = etcdpb.SegmentMeta{ + SegmentID: UniqueID(0), + CollectionID: UniqueID(0), + PartitionTag: "partition0", + ChannelStart: 0, + ChannelEnd: 128, + OpenTime: Timestamp(0), + CloseTime: Timestamp(math.MaxUint64), + NumRows: UniqueID(0), + } + + var b = isSegmentChannelRangeInQueryNodeChannelRange(&s) + assert.Equal(t, b, true) + + s = etcdpb.SegmentMeta{ + SegmentID: UniqueID(0), + CollectionID: UniqueID(0), + PartitionTag: "partition0", + ChannelStart: 128, + ChannelEnd: 256, + OpenTime: Timestamp(0), + CloseTime: Timestamp(math.MaxUint64), + NumRows: UniqueID(0), + } + + b = isSegmentChannelRangeInQueryNodeChannelRange(&s) + assert.Equal(t, b, false) +} + +func TestMetaService_printCollectionStruct(t *testing.T) { + collectionName := "collection0" + fieldVec := schemapb.FieldSchema{ + Name: "vec", + DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "16", + }, + }, + } + + fieldInt := schemapb.FieldSchema{ + Name: "age", + DataType: schemapb.DataType_INT32, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "1", + }, + }, + } + + schema := schemapb.CollectionSchema{ + Name: collectionName, + Fields: []*schemapb.FieldSchema{ + &fieldVec, &fieldInt, + }, + } + + collectionMeta := etcdpb.CollectionMeta{ + ID: UniqueID(0), + Schema: &schema, + CreateTime: Timestamp(0), + SegmentIDs: []UniqueID{0}, + PartitionTags: []string{"default"}, + } + + printCollectionStruct(&collectionMeta) +} + +func TestMetaService_printSegmentStruct(t *testing.T) { + var s = etcdpb.SegmentMeta{ + SegmentID: UniqueID(0), + CollectionID: UniqueID(0), + PartitionTag: "partition0", + ChannelStart: 128, + ChannelEnd: 256, + OpenTime: Timestamp(0), + CloseTime: Timestamp(math.MaxUint64), + NumRows: UniqueID(0), + } + + printSegmentStruct(&s) +} + +func TestMetaService_processCollectionCreate(t *testing.T) { + conf.LoadConfig("config.yaml") + + d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() + + // init metaService + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) + node.metaService = newMetaService(ctx, node.container) + + id := "0" + value := `schema: < + name: "test" + fields: < + name: "vec" + data_type: VECTOR_FLOAT + type_params: < + key: "dim" + value: "16" + > + > + fields: < + name: "age" + data_type: INT32 + type_params: < + key: "dim" + value: "1" + > + > + > + segmentIDs: 0 + partition_tags: "default" + ` + + node.metaService.processCollectionCreate(id, value) + + collectionNum := (*node.container).getCollectionNum() + assert.Equal(t, collectionNum, 1) + + collection, err := (*node.container).getCollectionByName("test") + assert.NoError(t, err) + assert.Equal(t, collection.ID(), UniqueID(0)) +} + +func TestMetaService_processSegmentCreate(t *testing.T) { + err := gParams.GParams.LoadYaml("config.yaml") + assert.NoError(t, err) + + conf.LoadConfig("config.yaml") + + d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() + + // init metaService + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) + node.metaService = newMetaService(ctx, node.container) + + collectionName := "collection0" + fieldVec := schemapb.FieldSchema{ + Name: "vec", + DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "16", + }, + }, + } + + fieldInt := schemapb.FieldSchema{ + Name: "age", + DataType: schemapb.DataType_INT32, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "1", + }, + }, + } + + schema := schemapb.CollectionSchema{ + Name: collectionName, + Fields: []*schemapb.FieldSchema{ + &fieldVec, &fieldInt, + }, + } + + collectionMeta := etcdpb.CollectionMeta{ + ID: UniqueID(0), + Schema: &schema, + CreateTime: Timestamp(0), + SegmentIDs: []UniqueID{0}, + PartitionTags: []string{"default"}, + } + + colMetaBlob, err := proto.Marshal(&collectionMeta) + assert.NoError(t, err) + + err = (*node.container).addCollection(&collectionMeta, string(colMetaBlob)) + assert.NoError(t, err) + + err = (*node.container).addPartition(UniqueID(0), "default") + assert.NoError(t, err) + + id := "0" + value := `partition_tag: "default" + channel_start: 0 + channel_end: 128 + close_time: 18446744073709551615 + ` + + (*node.metaService).processSegmentCreate(id, value) + + s, err := (*node.container).getSegmentByID(UniqueID(0)) + assert.NoError(t, err) + assert.Equal(t, s.segmentID, UniqueID(0)) +} + +func TestMetaService_processCreate(t *testing.T) { + err := gParams.GParams.LoadYaml("config.yaml") + assert.NoError(t, err) + + conf.LoadConfig("config.yaml") + + d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() + + // init metaService + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) + node.metaService = newMetaService(ctx, node.container) + + key1 := "by-dev/collection/0" + msg1 := `schema: < + name: "test" + fields: < + name: "vec" + data_type: VECTOR_FLOAT + type_params: < + key: "dim" + value: "16" + > + > + fields: < + name: "age" + data_type: INT32 + type_params: < + key: "dim" + value: "1" + > + > + > + segmentIDs: 0 + partition_tags: "default" + ` + + (*node.metaService).processCreate(key1, msg1) + collectionNum := (*node.container).getCollectionNum() + assert.Equal(t, collectionNum, 1) + + collection, err := (*node.container).getCollectionByName("test") + assert.NoError(t, err) + assert.Equal(t, collection.ID(), UniqueID(0)) + + key2 := "by-dev/segment/0" + msg2 := `partition_tag: "default" + channel_start: 0 + channel_end: 128 + close_time: 18446744073709551615 + ` + + (*node.metaService).processCreate(key2, msg2) + s, err := (*node.container).getSegmentByID(UniqueID(0)) + assert.NoError(t, err) + assert.Equal(t, s.segmentID, UniqueID(0)) +} + +func TestMetaService_processSegmentModify(t *testing.T) { + err := gParams.GParams.LoadYaml("config.yaml") + assert.NoError(t, err) + + conf.LoadConfig("config.yaml") + + d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() + + // init metaService + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) + node.metaService = newMetaService(ctx, node.container) + + collectionName := "collection0" + fieldVec := schemapb.FieldSchema{ + Name: "vec", + DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "16", + }, + }, + } + + fieldInt := schemapb.FieldSchema{ + Name: "age", + DataType: schemapb.DataType_INT32, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "1", + }, + }, + } + + schema := schemapb.CollectionSchema{ + Name: collectionName, + Fields: []*schemapb.FieldSchema{ + &fieldVec, &fieldInt, + }, + } + + collectionMeta := etcdpb.CollectionMeta{ + ID: UniqueID(0), + Schema: &schema, + CreateTime: Timestamp(0), + SegmentIDs: []UniqueID{0}, + PartitionTags: []string{"default"}, + } + + colMetaBlob, err := proto.Marshal(&collectionMeta) + assert.NoError(t, err) + + err = (*node.container).addCollection(&collectionMeta, string(colMetaBlob)) + assert.NoError(t, err) + + err = (*node.container).addPartition(UniqueID(0), "default") + assert.NoError(t, err) + + id := "0" + value := `partition_tag: "default" + channel_start: 0 + channel_end: 128 + close_time: 18446744073709551615 + ` + + (*node.metaService).processSegmentCreate(id, value) + s, err := (*node.container).getSegmentByID(UniqueID(0)) + assert.NoError(t, err) + assert.Equal(t, s.segmentID, UniqueID(0)) + + newValue := `partition_tag: "default" + channel_start: 0 + channel_end: 128 + close_time: 18446744073709551615 + ` + + // TODO: modify segment for testing processCollectionModify + (*node.metaService).processSegmentModify(id, newValue) + seg, err := (*node.container).getSegmentByID(UniqueID(0)) + assert.NoError(t, err) + assert.Equal(t, seg.segmentID, UniqueID(0)) +} + +func TestMetaService_processCollectionModify(t *testing.T) { + err := gParams.GParams.LoadYaml("config.yaml") + assert.NoError(t, err) + + conf.LoadConfig("config.yaml") + + d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() + + // init metaService + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) + node.metaService = newMetaService(ctx, node.container) + + id := "0" + value := `schema: < + name: "test" + fields: < + name: "vec" + data_type: VECTOR_FLOAT + type_params: < + key: "dim" + value: "16" + > + > + fields: < + name: "age" + data_type: INT32 + type_params: < + key: "dim" + value: "1" + > + > + > + segmentIDs: 0 + partition_tags: "default" + ` + + (*node.metaService).processCollectionCreate(id, value) + collectionNum := (*node.container).getCollectionNum() + assert.Equal(t, collectionNum, 1) + + collection, err := (*node.container).getCollectionByName("test") + assert.NoError(t, err) + assert.Equal(t, collection.ID(), UniqueID(0)) + + // TODO: use different index for testing processCollectionModify + newValue := `schema: < + name: "test" + fields: < + name: "vec" + data_type: VECTOR_FLOAT + type_params: < + key: "dim" + value: "16" + > + > + fields: < + name: "age" + data_type: INT32 + type_params: < + key: "dim" + value: "1" + > + > + > + segmentIDs: 0 + partition_tags: "default" + ` + + (*node.metaService).processCollectionModify(id, newValue) + collection, err = (*node.container).getCollectionByName("test") + assert.NoError(t, err) + assert.Equal(t, collection.ID(), UniqueID(0)) +} + +func TestMetaService_processModify(t *testing.T) { + err := gParams.GParams.LoadYaml("config.yaml") + assert.NoError(t, err) + + conf.LoadConfig("config.yaml") + + d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() + + // init metaService + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) + node.metaService = newMetaService(ctx, node.container) + + key1 := "by-dev/collection/0" + msg1 := `schema: < + name: "test" + fields: < + name: "vec" + data_type: VECTOR_FLOAT + type_params: < + key: "dim" + value: "16" + > + > + fields: < + name: "age" + data_type: INT32 + type_params: < + key: "dim" + value: "1" + > + > + > + segmentIDs: 0 + partition_tags: "default" + ` + + (*node.metaService).processCreate(key1, msg1) + collectionNum := (*node.container).getCollectionNum() + assert.Equal(t, collectionNum, 1) + + collection, err := (*node.container).getCollectionByName("test") + assert.NoError(t, err) + assert.Equal(t, collection.ID(), UniqueID(0)) + + key2 := "by-dev/segment/0" + msg2 := `partition_tag: "default" + channel_start: 0 + channel_end: 128 + close_time: 18446744073709551615 + ` + + (*node.metaService).processCreate(key2, msg2) + s, err := (*node.container).getSegmentByID(UniqueID(0)) + assert.NoError(t, err) + assert.Equal(t, s.segmentID, UniqueID(0)) + + // modify + // TODO: use different index for testing processCollectionModify + msg3 := `schema: < + name: "test" + fields: < + name: "vec" + data_type: VECTOR_FLOAT + type_params: < + key: "dim" + value: "16" + > + > + fields: < + name: "age" + data_type: INT32 + type_params: < + key: "dim" + value: "1" + > + > + > + segmentIDs: 0 + partition_tags: "default" + ` + + (*node.metaService).processModify(key1, msg3) + collection, err = (*node.container).getCollectionByName("test") + assert.NoError(t, err) + assert.Equal(t, collection.ID(), UniqueID(0)) + + msg4 := `partition_tag: "default" + channel_start: 0 + channel_end: 128 + close_time: 18446744073709551615 + ` + + // TODO: modify segment for testing processCollectionModify + (*node.metaService).processModify(key2, msg4) + seg, err := (*node.container).getSegmentByID(UniqueID(0)) + assert.NoError(t, err) + assert.Equal(t, seg.segmentID, UniqueID(0)) +} + +func TestMetaService_processSegmentDelete(t *testing.T) { + err := gParams.GParams.LoadYaml("config.yaml") + assert.NoError(t, err) + + conf.LoadConfig("config.yaml") + + d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() + + // init metaService + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) + node.metaService = newMetaService(ctx, node.container) + + collectionName := "collection0" + fieldVec := schemapb.FieldSchema{ + Name: "vec", + DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "16", + }, + }, + } + + fieldInt := schemapb.FieldSchema{ + Name: "age", + DataType: schemapb.DataType_INT32, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "1", + }, + }, + } + + schema := schemapb.CollectionSchema{ + Name: collectionName, + Fields: []*schemapb.FieldSchema{ + &fieldVec, &fieldInt, + }, + } + + collectionMeta := etcdpb.CollectionMeta{ + ID: UniqueID(0), + Schema: &schema, + CreateTime: Timestamp(0), + SegmentIDs: []UniqueID{0}, + PartitionTags: []string{"default"}, + } + + colMetaBlob, err := proto.Marshal(&collectionMeta) + assert.NoError(t, err) + + err = (*node.container).addCollection(&collectionMeta, string(colMetaBlob)) + assert.NoError(t, err) + + err = (*node.container).addPartition(UniqueID(0), "default") + assert.NoError(t, err) + + id := "0" + value := `partition_tag: "default" + channel_start: 0 + channel_end: 128 + close_time: 18446744073709551615 + ` + + (*node.metaService).processSegmentCreate(id, value) + seg, err := (*node.container).getSegmentByID(UniqueID(0)) + assert.NoError(t, err) + assert.Equal(t, seg.segmentID, UniqueID(0)) + + (*node.metaService).processSegmentDelete("0") + mapSize := (*node.container).getSegmentNum() + assert.Equal(t, mapSize, 0) +} + +func TestMetaService_processCollectionDelete(t *testing.T) { + err := gParams.GParams.LoadYaml("config.yaml") + assert.NoError(t, err) + + conf.LoadConfig("config.yaml") + + d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() + + // init metaService + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) + node.metaService = newMetaService(ctx, node.container) + + id := "0" + value := `schema: < + name: "test" + fields: < + name: "vec" + data_type: VECTOR_FLOAT + type_params: < + key: "dim" + value: "16" + > + > + fields: < + name: "age" + data_type: INT32 + type_params: < + key: "dim" + value: "1" + > + > + > + segmentIDs: 0 + partition_tags: "default" + ` + + (*node.metaService).processCollectionCreate(id, value) + collectionNum := (*node.container).getCollectionNum() + assert.Equal(t, collectionNum, 1) + + collection, err := (*node.container).getCollectionByName("test") + assert.NoError(t, err) + assert.Equal(t, collection.ID(), UniqueID(0)) + + (*node.metaService).processCollectionDelete(id) + collectionNum = (*node.container).getCollectionNum() + assert.Equal(t, collectionNum, 0) +} + +func TestMetaService_processDelete(t *testing.T) { + err := gParams.GParams.LoadYaml("config.yaml") + assert.NoError(t, err) + + conf.LoadConfig("config.yaml") + + d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() + + // init metaService + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) + node.metaService = newMetaService(ctx, node.container) + + key1 := "by-dev/collection/0" + msg1 := `schema: < + name: "test" + fields: < + name: "vec" + data_type: VECTOR_FLOAT + type_params: < + key: "dim" + value: "16" + > + > + fields: < + name: "age" + data_type: INT32 + type_params: < + key: "dim" + value: "1" + > + > + > + segmentIDs: 0 + partition_tags: "default" + ` + + (*node.metaService).processCreate(key1, msg1) + collectionNum := (*node.container).getCollectionNum() + assert.Equal(t, collectionNum, 1) + + collection, err := (*node.container).getCollectionByName("test") + assert.NoError(t, err) + assert.Equal(t, collection.ID(), UniqueID(0)) + + key2 := "by-dev/segment/0" + msg2 := `partition_tag: "default" + channel_start: 0 + channel_end: 128 + close_time: 18446744073709551615 + ` + + (*node.metaService).processCreate(key2, msg2) + seg, err := (*node.container).getSegmentByID(UniqueID(0)) + assert.NoError(t, err) + assert.Equal(t, seg.segmentID, UniqueID(0)) + + (*node.metaService).processDelete(key1) + collectionsSize := (*node.container).getCollectionNum() + assert.Equal(t, collectionsSize, 0) + + mapSize := (*node.container).getSegmentNum() + assert.Equal(t, mapSize, 0) +} + +func TestMetaService_processResp(t *testing.T) { + err := gParams.GParams.LoadYaml("config.yaml") + assert.NoError(t, err) + + conf.LoadConfig("config.yaml") + + var ctx context.Context + if closeWithDeadline { + var cancel context.CancelFunc + d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) + ctx, cancel = context.WithDeadline(context.Background(), d) + defer cancel() + } else { + ctx = context.Background() + } + + // init metaService + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) + node.metaService = newMetaService(ctx, node.container) + + metaChan := (*node.metaService).kvBase.WatchWithPrefix("") + + select { + case <-node.ctx.Done(): + return + case resp := <-metaChan: + _ = (*node.metaService).processResp(resp) + } +} + +func TestMetaService_loadCollections(t *testing.T) { + err := gParams.GParams.LoadYaml("config.yaml") + assert.NoError(t, err) + + conf.LoadConfig("config.yaml") + + var ctx context.Context + if closeWithDeadline { + var cancel context.CancelFunc + d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) + ctx, cancel = context.WithDeadline(context.Background(), d) + defer cancel() + } else { + ctx = context.Background() + } + + // init metaService + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) + node.metaService = newMetaService(ctx, node.container) + + err2 := (*node.metaService).loadCollections() + assert.Nil(t, err2) +} + +func TestMetaService_loadSegments(t *testing.T) { + err := gParams.GParams.LoadYaml("config.yaml") + assert.NoError(t, err) + + conf.LoadConfig("config.yaml") + + var ctx context.Context + if closeWithDeadline { + var cancel context.CancelFunc + d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) + ctx, cancel = context.WithDeadline(context.Background(), d) + defer cancel() + } else { + ctx = context.Background() + } + + // init metaService + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) + node.metaService = newMetaService(ctx, node.container) + + err2 := (*node.metaService).loadSegments() + assert.Nil(t, err2) +} diff --git a/internal/reader/query_node.go b/internal/reader/query_node.go index 0a176853af..de06ece2d0 100644 --- a/internal/reader/query_node.go +++ b/internal/reader/query_node.go @@ -14,6 +14,7 @@ import "C" import ( "context" + "sync" ) type QueryNode struct { @@ -22,7 +23,7 @@ type QueryNode struct { QueryNodeID uint64 pulsarURL string - tSafe Timestamp + tSafe tSafe container *container @@ -32,6 +33,16 @@ type QueryNode struct { statsService *statsService } +type tSafe interface { + getTSafe() Timestamp + setTSafe(t Timestamp) +} + +type serviceTime struct { + tSafeMu sync.Mutex + time Timestamp +} + func NewQueryNode(ctx context.Context, queryNodeID uint64, pulsarURL string) *QueryNode { segmentsMap := make(map[int64]*Segment) collections := make([]*Collection, 0) @@ -41,13 +52,15 @@ func NewQueryNode(ctx context.Context, queryNodeID uint64, pulsarURL string) *Qu segments: segmentsMap, } + var tSafe tSafe = &serviceTime{} + return &QueryNode{ ctx: ctx, QueryNodeID: queryNodeID, pulsarURL: pulsarURL, - tSafe: 0, + tSafe: tSafe, container: &container, @@ -65,7 +78,7 @@ func (node *QueryNode) Start() { node.statsService = newStatsService(node.ctx, node.container, node.pulsarURL) go node.dataSyncService.start() - go node.searchService.start() + // go node.searchService.start() go node.metaService.start() node.statsService.start() } @@ -73,3 +86,15 @@ func (node *QueryNode) Start() { func (node *QueryNode) Close() { // TODO: close services } + +func (st *serviceTime) getTSafe() Timestamp { + st.tSafeMu.Lock() + defer st.tSafeMu.Unlock() + return st.time +} + +func (st *serviceTime) setTSafe(t Timestamp) { + st.tSafeMu.Lock() + st.time = t + st.tSafeMu.Unlock() +} diff --git a/internal/reader/query_node_test.go b/internal/reader/query_node_test.go index 382470ce85..7b74c3d42c 100644 --- a/internal/reader/query_node_test.go +++ b/internal/reader/query_node_test.go @@ -1,85 +1,37 @@ package reader -//import ( -// "context" -// "testing" -// -// "github.com/stretchr/testify/assert" -// "github.com/zilliztech/milvus-distributed/internal/conf" -//) -// -//func TestQueryNode_CreateQueryNode(t *testing.T) { -// conf.LoadConfig("config.yaml") -// ctx, cancel := context.WithCancel(context.Background()) -// defer cancel() -// -// node := CreateQueryNode(ctx, 0, 0, nil) -// assert.NotNil(t, node) -//} -// -//func TestQueryNode_NewQueryNode(t *testing.T) { -// conf.LoadConfig("config.yaml") -// ctx, cancel := context.WithCancel(context.Background()) -// defer cancel() -// -// node := NewQueryNode(ctx, 0, 0) -// assert.NotNil(t, node) -//} -// -//func TestQueryNode_Close(t *testing.T) { -// conf.LoadConfig("config.yaml") -// ctx, cancel := context.WithCancel(context.Background()) -// defer cancel() -// -// node := CreateQueryNode(ctx, 0, 0, nil) -// assert.NotNil(t, node) -// -// node.Close() -//} -// -//func TestQueryNode_QueryNodeDataInit(t *testing.T) { -// conf.LoadConfig("config.yaml") -// ctx, cancel := context.WithCancel(context.Background()) -// defer cancel() -// -// node := CreateQueryNode(ctx, 0, 0, nil) -// assert.NotNil(t, node) -// -// node.QueryNodeDataInit() -// -// assert.NotNil(t, node.deletePreprocessData) -// assert.NotNil(t, node.insertData) -// assert.NotNil(t, node.deleteData) -//} -// -//func TestQueryNode_NewCollection(t *testing.T) { -// conf.LoadConfig("config.yaml") -// ctx, cancel := context.WithCancel(context.Background()) -// defer cancel() -// -// node := CreateQueryNode(ctx, 0, 0, nil) -// assert.NotNil(t, node) -// -// var collection = node.newCollection(0, "collection0", "") -// -// assert.Equal(t, collection.CollectionName, "collection0") -// assert.Equal(t, len(node.Collections), 1) -//} -// -//func TestQueryNode_DeleteCollection(t *testing.T) { -// conf.LoadConfig("config.yaml") -// ctx, cancel := context.WithCancel(context.Background()) -// defer cancel() -// -// node := CreateQueryNode(ctx, 0, 0, nil) -// assert.NotNil(t, node) -// -// var collection = node.newCollection(0, "collection0", "") -// -// assert.Equal(t, collection.CollectionName, "collection0") -// assert.Equal(t, len(node.Collections), 1) -// -// node.deleteCollection(collection) -// -// assert.Equal(t, len(node.Collections), 0) -//} +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + gParams "github.com/zilliztech/milvus-distributed/internal/util/paramtableutil" +) + +const ctxTimeInMillisecond = 2000 +const closeWithDeadline = true + +// NOTE: start pulsar and etcd before test +func TestQueryNode_start(t *testing.T) { + err := gParams.GParams.LoadYaml("config.yaml") + assert.NoError(t, err) + + var ctx context.Context + if closeWithDeadline { + var cancel context.CancelFunc + d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) + ctx, cancel = context.WithDeadline(context.Background(), d) + defer cancel() + } else { + ctx = context.Background() + } + + pulsarAddr, _ := gParams.GParams.Load("pulsar.address") + pulsarPort, _ := gParams.GParams.Load("pulsar.port") + pulsarAddr += ":" + pulsarPort + pulsarAddr = "pulsar://" + pulsarAddr + + node := NewQueryNode(ctx, 0, pulsarAddr) + node.Start() +} diff --git a/internal/reader/reader_test.go b/internal/reader/reader_test.go deleted file mode 100644 index 0d79086a92..0000000000 --- a/internal/reader/reader_test.go +++ /dev/null @@ -1,96 +0,0 @@ -package reader - -//import ( -// "context" -// "strconv" -// "sync" -// "testing" -// "time" -// -// "github.com/stretchr/testify/assert" -// "github.com/zilliztech/milvus-distributed/internal/conf" -// "github.com/zilliztech/milvus-distributed/internal/msgclient" -//) -// -//const ctxTimeInMillisecond = 10 -// -//// NOTE: start pulsar and etcd before test -//func TestReader_startQueryNode(t *testing.T) { -// conf.LoadConfig("config.yaml") -// -// d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) -// ctx, cancel := context.WithDeadline(context.Background(), d) -// defer cancel() -// -// pulsarAddr := "pulsar://" -// pulsarAddr += conf.Config.Pulsar.Address -// pulsarAddr += ":" -// pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10) -// -// StartQueryNode(ctx, pulsarAddr) -// -// // To make sure to get here -// assert.Equal(t, 0, 0) -//} -// -//// NOTE: start pulsar before test -//func TestReader_RunInsertDelete(t *testing.T) { -// conf.LoadConfig("config.yaml") -// -// d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) -// ctx, cancel := context.WithDeadline(context.Background(), d) -// defer cancel() -// -// mc := msgclient.ReaderMessageClient{} -// pulsarAddr := "pulsar://" -// pulsarAddr += conf.Config.Pulsar.Address -// pulsarAddr += ":" -// pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10) -// -// mc.InitClient(ctx, pulsarAddr) -// mc.ReceiveMessage() -// -// node := CreateQueryNode(ctx, 0, 0, &mc) -// -// wg := sync.WaitGroup{} -// -// wg.Add(1) -// go node.RunInsertDelete(&wg) -// wg.Wait() -// -// node.Close() -// -// // To make sure to get here -// assert.Equal(t, 0, 0) -//} -// -//// NOTE: start pulsar before test -//func TestReader_RunSearch(t *testing.T) { -// conf.LoadConfig("config.yaml") -// -// d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) -// ctx, cancel := context.WithDeadline(context.Background(), d) -// defer cancel() -// -// mc := msgclient.ReaderMessageClient{} -// pulsarAddr := "pulsar://" -// pulsarAddr += conf.Config.Pulsar.Address -// pulsarAddr += ":" -// pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10) -// -// mc.InitClient(ctx, pulsarAddr) -// mc.ReceiveMessage() -// -// node := CreateQueryNode(ctx, 0, 0, &mc) -// -// wg := sync.WaitGroup{} -// -// wg.Add(1) -// go node.RunSearch(&wg) -// wg.Wait() -// -// node.Close() -// -// // To make sure to get here -// assert.Equal(t, 0, 0) -//} diff --git a/internal/reader/segment.go b/internal/reader/segment.go index 6aea60a552..f42fda8273 100644 --- a/internal/reader/segment.go +++ b/internal/reader/segment.go @@ -151,6 +151,7 @@ func (s *Segment) segmentInsert(offset int64, entityIDs *[]UniqueID, timestamps return errors.New("Insert failed, error code = " + strconv.Itoa(int(status))) } + s.recentlyModified = true return nil } diff --git a/internal/reader/stats_service.go b/internal/reader/stats_service.go index 30812a3dda..523f67b1c4 100644 --- a/internal/reader/stats_service.go +++ b/internal/reader/stats_service.go @@ -3,6 +3,7 @@ package reader import ( "context" "fmt" + "log" "strconv" "time" @@ -13,35 +14,55 @@ import ( type statsService struct { ctx context.Context - msgStream *msgstream.PulsarMsgStream + pulsarURL string + + msgStream *msgstream.MsgStream + container *container } -func newStatsService(ctx context.Context, container *container, pulsarAddress string) *statsService { - // TODO: add pulsar message stream init +func newStatsService(ctx context.Context, container *container, pulsarURL string) *statsService { return &statsService{ ctx: ctx, + pulsarURL: pulsarURL, + msgStream: nil, container: container, } } func (sService *statsService) start() { - sleepMillisecondTime := 1000 + const ( + receiveBufSize = 1024 + sleepMillisecondTime = 1000 + ) + + // start pulsar + producerChannels := []string{"statistic"} + + statsStream := msgstream.NewPulsarMsgStream(sService.ctx, receiveBufSize) + statsStream.SetPulsarCient(sService.pulsarURL) + statsStream.CreatePulsarProducers(producerChannels) + + var statsMsgStream msgstream.MsgStream = statsStream + + sService.msgStream = &statsMsgStream + (*sService.msgStream).Start() + + // start service fmt.Println("do segments statistic in ", strconv.Itoa(sleepMillisecondTime), "ms") for { select { case <-sService.ctx.Done(): return - default: - time.Sleep(time.Duration(sleepMillisecondTime) * time.Millisecond) + case <-time.After(sleepMillisecondTime * time.Millisecond): sService.sendSegmentStatistic() } } } func (sService *statsService) sendSegmentStatistic() { - var statisticData = (*sService.container).getSegmentStatistics() + statisticData := (*sService.container).getSegmentStatistics() // fmt.Println("Publish segment statistic") // fmt.Println(statisticData) @@ -49,5 +70,18 @@ func (sService *statsService) sendSegmentStatistic() { } func (sService *statsService) publicStatistic(statistic *internalpb.QueryNodeSegStats) { - // TODO: publish statistic + var msg msgstream.TsMsg = &msgstream.QueryNodeSegStatsMsg{ + BaseMsg: msgstream.BaseMsg{ + HashValues: []int32{0}, + }, + QueryNodeSegStats: *statistic, + } + + var msgPack = msgstream.MsgPack{ + Msgs: []*msgstream.TsMsg{&msg}, + } + err := (*sService.msgStream).Produce(&msgPack) + if err != nil { + log.Println(err) + } } diff --git a/internal/reader/stats_service_test.go b/internal/reader/stats_service_test.go index bd0e2a0eba..d9f6ef04f7 100644 --- a/internal/reader/stats_service_test.go +++ b/internal/reader/stats_service_test.go @@ -1,70 +1,187 @@ package reader -//import ( -// "context" -// "strconv" -// "testing" -// "time" -// -// "github.com/zilliztech/milvus-distributed/internal/conf" -// "github.com/zilliztech/milvus-distributed/internal/msgclient" -//) -// -//// NOTE: start pulsar before test -//func TestSegmentManagement_SegmentStatistic(t *testing.T) { -// conf.LoadConfig("config.yaml") -// -// ctx, cancel := context.WithCancel(context.Background()) -// defer cancel() -// -// mc := msgclient.ReaderMessageClient{} -// pulsarAddr := "pulsar://" -// pulsarAddr += conf.Config.Pulsar.Address -// pulsarAddr += ":" -// pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10) -// -// mc.InitClient(ctx, pulsarAddr) -// mc.ReceiveMessage() -// -// node := CreateQueryNode(ctx, 0, 0, &mc) -// -// // Construct node, collection, partition and segment -// var collection = node.newCollection(0, "collection0", "") -// var partition = collection.newPartition("partition0") -// var segment = partition.newSegment(0) -// node.SegmentsMap[0] = segment -// -// node.SegmentStatistic(1000) -// -// node.Close() -//} -// -//// NOTE: start pulsar before test -//func TestSegmentManagement_SegmentStatisticService(t *testing.T) { -// conf.LoadConfig("config.yaml") -// -// d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) -// ctx, cancel := context.WithDeadline(context.Background(), d) -// defer cancel() -// -// mc := msgclient.ReaderMessageClient{} -// pulsarAddr := "pulsar://" -// pulsarAddr += conf.Config.Pulsar.Address -// pulsarAddr += ":" -// pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10) -// -// mc.InitClient(ctx, pulsarAddr) -// mc.ReceiveMessage() -// -// node := CreateQueryNode(ctx, 0, 0, &mc) -// -// // Construct node, collection, partition and segment -// var collection = node.newCollection(0, "collection0", "") -// var partition = collection.newPartition("partition0") -// var segment = partition.newSegment(0) -// node.SegmentsMap[0] = segment -// -// node.SegmentStatisticService() -// -// node.Close() -//} +import ( + "context" + "testing" + "time" + + "github.com/golang/protobuf/proto" + "github.com/stretchr/testify/assert" + "github.com/zilliztech/milvus-distributed/internal/msgstream" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" + "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" +) + +// NOTE: start pulsar before test +func TestStatsService_start(t *testing.T) { + var ctx context.Context + + if closeWithDeadline { + var cancel context.CancelFunc + d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) + ctx, cancel = context.WithDeadline(context.Background(), d) + defer cancel() + } else { + ctx = context.Background() + } + + // init query node + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) + + // init meta + collectionName := "collection0" + fieldVec := schemapb.FieldSchema{ + Name: "vec", + DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "16", + }, + }, + } + + fieldInt := schemapb.FieldSchema{ + Name: "age", + DataType: schemapb.DataType_INT32, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "1", + }, + }, + } + + schema := schemapb.CollectionSchema{ + Name: collectionName, + Fields: []*schemapb.FieldSchema{ + &fieldVec, &fieldInt, + }, + } + + collectionMeta := etcdpb.CollectionMeta{ + ID: UniqueID(0), + Schema: &schema, + CreateTime: Timestamp(0), + SegmentIDs: []UniqueID{0}, + PartitionTags: []string{"default"}, + } + + collectionMetaBlob := proto.MarshalTextString(&collectionMeta) + assert.NotEqual(t, "", collectionMetaBlob) + + var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob) + assert.NoError(t, err) + + collection, err := (*node.container).getCollectionByName(collectionName) + assert.NoError(t, err) + assert.Equal(t, collection.meta.Schema.Name, "collection0") + assert.Equal(t, collection.meta.ID, UniqueID(0)) + assert.Equal(t, (*node.container).getCollectionNum(), 1) + + err = (*node.container).addPartition(collection.ID(), collectionMeta.PartitionTags[0]) + assert.NoError(t, err) + + segmentID := UniqueID(0) + err = (*node.container).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0)) + assert.NoError(t, err) + + // start stats service + node.statsService = newStatsService(node.ctx, node.container, node.pulsarURL) + node.statsService.start() +} + +// NOTE: start pulsar before test +func TestSegmentManagement_SegmentStatisticService(t *testing.T) { + var ctx context.Context + + if closeWithDeadline { + var cancel context.CancelFunc + d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) + ctx, cancel = context.WithDeadline(context.Background(), d) + defer cancel() + } else { + ctx = context.Background() + } + + // init query node + pulsarURL := "pulsar://localhost:6650" + node := NewQueryNode(ctx, 0, pulsarURL) + + // init meta + collectionName := "collection0" + fieldVec := schemapb.FieldSchema{ + Name: "vec", + DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "16", + }, + }, + } + + fieldInt := schemapb.FieldSchema{ + Name: "age", + DataType: schemapb.DataType_INT32, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "1", + }, + }, + } + + schema := schemapb.CollectionSchema{ + Name: collectionName, + Fields: []*schemapb.FieldSchema{ + &fieldVec, &fieldInt, + }, + } + + collectionMeta := etcdpb.CollectionMeta{ + ID: UniqueID(0), + Schema: &schema, + CreateTime: Timestamp(0), + SegmentIDs: []UniqueID{0}, + PartitionTags: []string{"default"}, + } + + collectionMetaBlob := proto.MarshalTextString(&collectionMeta) + assert.NotEqual(t, "", collectionMetaBlob) + + var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob) + assert.NoError(t, err) + + collection, err := (*node.container).getCollectionByName(collectionName) + assert.NoError(t, err) + assert.Equal(t, collection.meta.Schema.Name, "collection0") + assert.Equal(t, collection.meta.ID, UniqueID(0)) + assert.Equal(t, (*node.container).getCollectionNum(), 1) + + err = (*node.container).addPartition(collection.ID(), collectionMeta.PartitionTags[0]) + assert.NoError(t, err) + + segmentID := UniqueID(0) + err = (*node.container).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0)) + assert.NoError(t, err) + + const receiveBufSize = 1024 + // start pulsar + producerChannels := []string{"statistic"} + + statsStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize) + statsStream.SetPulsarCient(pulsarURL) + statsStream.CreatePulsarProducers(producerChannels) + + var statsMsgStream msgstream.MsgStream = statsStream + + node.statsService = newStatsService(node.ctx, node.container, node.pulsarURL) + node.statsService.msgStream = &statsMsgStream + (*node.statsService.msgStream).Start() + + // send stats + node.statsService.sendSegmentStatistic() +} diff --git a/internal/reader/util_functions_test.go b/internal/reader/util_functions_test.go deleted file mode 100644 index 90bfa3a790..0000000000 --- a/internal/reader/util_functions_test.go +++ /dev/null @@ -1,202 +0,0 @@ -package reader - -//import ( -// "context" -// "strconv" -// "testing" -// "time" -// -// "github.com/zilliztech/milvus-distributed/internal/conf" -// "github.com/zilliztech/milvus-distributed/internal/msgclient" -// msgPb "github.com/zilliztech/milvus-distributed/internal/proto/message" -// -// "github.com/stretchr/testify/assert" -//) -// -//// NOTE: start pulsar before test -//func TestUtilFunctions_GetKey2Segments(t *testing.T) { -// conf.LoadConfig("config.yaml") -// -// d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) -// ctx, cancel := context.WithDeadline(context.Background(), d) -// defer cancel() -// -// mc := msgclient.ReaderMessageClient{} -// pulsarAddr := "pulsar://" -// pulsarAddr += conf.Config.Pulsar.Address -// pulsarAddr += ":" -// pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10) -// -// mc.InitClient(ctx, pulsarAddr) -// mc.ReceiveMessage() -// -// node := CreateQueryNode(ctx, 0, 0, &mc) -// -// node.messageClient.PrepareKey2SegmentMsg() -// -// const msgLength = 10 -// -// for i := 0; i < msgLength; i++ { -// key2SegMsg := msgPb.Key2SegMsg{ -// Uid: int64(i), -// Timestamp: uint64(i + 1000), -// SegmentID: []int64{int64(i)}, -// } -// node.messageClient.Key2SegMsg = append(node.messageClient.Key2SegMsg, &key2SegMsg) -// } -// -// entityIDs, timestamps, segmentIDs := node.GetKey2Segments() -// -// assert.Equal(t, len(*entityIDs), msgLength) -// assert.Equal(t, len(*timestamps), msgLength) -// assert.Equal(t, len(*segmentIDs), msgLength) -// -// node.Close() -//} -// -//func TestUtilFunctions_GetCollectionByID(t *testing.T) { -// ctx := context.Background() -// -// node := NewQueryNode(ctx, 0, 0) -// var collection = node.newCollection(0, "collection0", "") -// var partition = collection.newPartition("partition0") -// var segment = partition.newSegment(0) -// -// node.SegmentsMap[int64(0)] = segment -// -// assert.Equal(t, collection.CollectionName, "collection0") -// assert.Equal(t, partition.partitionTag, "partition0") -// assert.Equal(t, segment.SegmentID, int64(0)) -// assert.Equal(t, len(node.SegmentsMap), 1) -// -// c := node.getCollectionByID(int64(0)) -// assert.Equal(t, c.CollectionName, "collection0") -// -// partition.deleteSegment(node, segment) -// collection.deletePartition(node, partition) -// node.deleteCollection(collection) -// -// assert.Equal(t, len(node.Collections), 0) -// assert.Equal(t, len(node.SegmentsMap), 0) -// -// node.Close() -//} -// -//func TestUtilFunctions_GetCollectionByCollectionName(t *testing.T) { -// ctx := context.Background() -// // 1. Construct node, and collections -// node := NewQueryNode(ctx, 0, 0) -// var _ = node.newCollection(0, "collection0", "") -// -// // 2. Get collection by collectionName -// var c0, err = node.getCollectionByCollectionName("collection0") -// assert.NoError(t, err) -// assert.Equal(t, c0.CollectionName, "collection0") -// -// c0 = node.getCollectionByID(0) -// assert.NotNil(t, c0) -// assert.Equal(t, c0.CollectionID, uint64(0)) -// -// node.Close() -//} -// -//func TestUtilFunctions_GetSegmentBySegmentID(t *testing.T) { -// ctx := context.Background() -// -// // 1. Construct node, collection, partition and segment -// node := NewQueryNode(ctx, 0, 0) -// var collection = node.newCollection(0, "collection0", "") -// var partition = collection.newPartition("partition0") -// var segment = partition.newSegment(0) -// node.SegmentsMap[0] = segment -// -// // 2. Get segment by segment id -// var s0, err = node.getSegmentBySegmentID(0) -// assert.NoError(t, err) -// assert.Equal(t, s0.SegmentID, int64(0)) -// -// node.Close() -//} -// -//func TestUtilFunctions_FoundSegmentBySegmentID(t *testing.T) { -// ctx := context.Background() -// -// node := NewQueryNode(ctx, 0, 0) -// var collection = node.newCollection(0, "collection0", "") -// var partition = collection.newPartition("partition0") -// var segment = partition.newSegment(0) -// -// node.SegmentsMap[int64(0)] = segment -// -// assert.Equal(t, collection.CollectionName, "collection0") -// assert.Equal(t, partition.partitionTag, "partition0") -// assert.Equal(t, segment.SegmentID, int64(0)) -// assert.Equal(t, len(node.SegmentsMap), 1) -// -// b1 := node.foundSegmentBySegmentID(int64(0)) -// assert.Equal(t, b1, true) -// -// b2 := node.foundSegmentBySegmentID(int64(1)) -// assert.Equal(t, b2, false) -// -// partition.deleteSegment(node, segment) -// collection.deletePartition(node, partition) -// node.deleteCollection(collection) -// -// assert.Equal(t, len(node.Collections), 0) -// assert.Equal(t, len(node.SegmentsMap), 0) -// -// node.Close() -//} -// -//func TestUtilFunctions_GetPartitionByName(t *testing.T) { -// ctx := context.Background() -// -// node := NewQueryNode(ctx, 0, 0) -// var collection = node.newCollection(0, "collection0", "") -// var partition = collection.newPartition("partition0") -// -// var p = collection.getPartitionByName("partition0") -// assert.Equal(t, p.partitionTag, "partition0") -// -// collection.deletePartition(node, partition) -// node.deleteCollection(collection) -// -// node.Close() -//} -// -//// NOTE: start pulsar before test -//func TestUtilFunctions_PrepareBatchMsg(t *testing.T) { -// conf.LoadConfig("config.yaml") -// -// ctx, cancel := context.WithCancel(context.Background()) -// defer cancel() -// -// mc := msgclient.ReaderMessageClient{} -// pulsarAddr := "pulsar://" -// pulsarAddr += conf.Config.Pulsar.Address -// pulsarAddr += ":" -// pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10) -// -// mc.InitClient(ctx, pulsarAddr) -// mc.ReceiveMessage() -// -// node := CreateQueryNode(ctx, 0, 0, &mc) -// -// node.PrepareBatchMsg() -// node.Close() -//} -// -//func TestUtilFunctions_QueryJson2Info(t *testing.T) { -// ctx := context.Background() -// node := NewQueryNode(ctx, 0, 0) -// -// var queryJSON = "{\"field_name\":\"age\",\"num_queries\":1,\"topK\":10}" -// info := node.queryJSON2Info(&queryJSON) -// -// assert.Equal(t, info.FieldName, "age") -// assert.Equal(t, info.NumQueries, int64(1)) -// assert.Equal(t, info.TopK, 10) -// -// node.Close() -//} diff --git a/internal/util/tsoutil/tso.go b/internal/util/tsoutil/tso.go index 9e23dcdfe2..5e5a8e9fe3 100644 --- a/internal/util/tsoutil/tso.go +++ b/internal/util/tsoutil/tso.go @@ -1,12 +1,10 @@ package tsoutil import ( - "fmt" "path" "time" "github.com/zilliztech/milvus-distributed/internal/kv" - gparams "github.com/zilliztech/milvus-distributed/internal/util/paramtableutil" "go.etcd.io/etcd/clientv3" ) @@ -27,25 +25,10 @@ func ParseTS(ts uint64) (time.Time, uint64) { return physicalTime, logical } -func NewTSOKVBase(subPath string) *kv.EtcdKV { - etcdAddr, err := gparams.GParams.Load("etcd.address") - if err != nil { - panic(err) - } - etcdPort, err := gparams.GParams.Load("etcd.port") - if err != nil { - panic(err) - } - etcdAddr = etcdAddr + ":" + etcdPort - fmt.Println("etcdAddr ::: ", etcdAddr) +func NewTSOKVBase(etcdAddr []string, tsoRoot, subPath string) *kv.EtcdKV { client, _ := clientv3.New(clientv3.Config{ - Endpoints: []string{etcdAddr}, + Endpoints: etcdAddr, DialTimeout: 5 * time.Second, }) - - etcdRootPath, err := gparams.GParams.Load("etcd.rootpath") - if err != nil { - panic(err) - } - return kv.NewEtcdKV(client, path.Join(etcdRootPath, subPath)) + return kv.NewEtcdKV(client, path.Join(tsoRoot, subPath)) }