diff --git a/reader/main.go b/reader/main.go index 6b6caa6821..3945c9b2a9 100644 --- a/reader/main.go +++ b/reader/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "flag" "fmt" "github.com/czs007/suvlim/conf" @@ -9,17 +10,19 @@ import ( ) func main() { + ctx, _ := context.WithCancel(context.Background()) + var yamlFile string flag.StringVar(&yamlFile, "yaml", "", "yaml file") flag.Parse() // flag.Usage() fmt.Println("yaml file: ", yamlFile) conf.LoadConfig(yamlFile) - + pulsarAddr := "pulsar://" pulsarAddr += conf.Config.Pulsar.Address pulsarAddr += ":" pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10) - reader.StartQueryNode(pulsarAddr) + reader.StartQueryNode(ctx, pulsarAddr) } diff --git a/reader/message_client/message_client.go b/reader/message_client/message_client.go index d67bc5318f..5b03629486 100644 --- a/reader/message_client/message_client.go +++ b/reader/message_client/message_client.go @@ -14,6 +14,9 @@ import ( ) type MessageClient struct { + // context + ctx context.Context + // timesync timeSyncCfg *timesync.ReaderTimeSyncCfg @@ -22,12 +25,12 @@ type MessageClient struct { key2SegChan chan *msgpb.Key2SegMsg // pulsar - client pulsar.Client + client pulsar.Client //searchResultProducer pulsar.Producer searchResultProducers map[int64]pulsar.Producer segmentsStatisticProducer pulsar.Producer - searchConsumer pulsar.Consumer - key2segConsumer pulsar.Consumer + searchConsumer pulsar.Consumer + key2segConsumer pulsar.Consumer // batch messages InsertOrDeleteMsg []*msgpb.InsertOrDeleteMsg @@ -79,27 +82,45 @@ func (mc *MessageClient) GetSearchChan() <-chan *msgpb.SearchMsg { func (mc *MessageClient) receiveSearchMsg() { for { - searchMsg := msgpb.SearchMsg{} - msg, err := mc.searchConsumer.Receive(context.Background()) - err = proto.Unmarshal(msg.Payload(), &searchMsg) - if err != nil { - log.Fatal(err) + select { + case <-mc.ctx.Done(): + return + default: + searchMsg := msgpb.SearchMsg{} + msg, err := mc.searchConsumer.Receive(mc.ctx) + if err != nil { + log.Println(err) + continue + } + err = proto.Unmarshal(msg.Payload(), &searchMsg) + if err != nil { + log.Fatal(err) + } + mc.searchChan <- &searchMsg + mc.searchConsumer.Ack(msg) } - mc.searchChan <- &searchMsg - mc.searchConsumer.Ack(msg) } } func (mc *MessageClient) receiveKey2SegMsg() { for { - key2SegMsg := msgpb.Key2SegMsg{} - msg, err := mc.key2segConsumer.Receive(context.Background()) - err = proto.Unmarshal(msg.Payload(), &key2SegMsg) - if err != nil { - log.Fatal(err) + select { + case <-mc.ctx.Done(): + return + default: + key2SegMsg := msgpb.Key2SegMsg{} + msg, err := mc.key2segConsumer.Receive(mc.ctx) + if err != nil { + log.Println(err) + continue + } + err = proto.Unmarshal(msg.Payload(), &key2SegMsg) + if err != nil { + log.Fatal(err) + } + mc.key2SegChan <- &key2SegMsg + mc.key2segConsumer.Ack(msg) } - mc.key2SegChan <- &key2SegMsg - mc.key2segConsumer.Ack(msg) } } @@ -141,7 +162,7 @@ func (mc *MessageClient) createClient(url string) pulsar.Client { if conf.Config.Pulsar.Authentication { // create client with Authentication client, err := pulsar.NewClient(pulsar.ClientOptions{ - URL: url, + URL: url, Authentication: pulsar.NewAuthenticationToken(conf.Config.Pulsar.Token), }) @@ -162,7 +183,10 @@ func (mc *MessageClient) createClient(url string) pulsar.Client { return client } -func (mc *MessageClient) InitClient(url string) { +func (mc *MessageClient) InitClient(ctx context.Context, url string) { + // init context + mc.ctx = ctx + //create client mc.client = mc.createClient(url) mc.MessageClientID = conf.Config.Reader.ClientId @@ -185,7 +209,7 @@ func (mc *MessageClient) InitClient(url string) { insertOrDeleteTopicName = "InsertOrDelete-" + conf.Config.Pulsar.User + "-" } - for _, key := range proxyIdList{ + for _, key := range proxyIdList { topic := searchResultTopicName topic = topic + strconv.Itoa(int(key)) mc.searchResultProducers[key] = mc.creatProducer(topic) @@ -217,7 +241,8 @@ func (mc *MessageClient) InitClient(url string) { readSubName := "reader" + strconv.Itoa(mc.MessageClientID) readerQueueSize := timesync.WithReaderQueueSize(conf.Config.Reader.ReaderQueueSize) - timeSync, err := timesync.NewReaderTimeSync(timeSyncTopic, + timeSync, err := timesync.NewReaderTimeSync(ctx, + timeSyncTopic, timeSyncSubName, readTopics, readSubName, @@ -236,14 +261,26 @@ func (mc *MessageClient) InitClient(url string) { } func (mc *MessageClient) Close() { - mc.client.Close() - for key, _ := range mc.searchResultProducers { - mc.searchResultProducers[key].Close() + if mc.client != nil { + mc.client.Close() + } + for key, _ := range mc.searchResultProducers { + if mc.searchResultProducers[key] != nil { + mc.searchResultProducers[key].Close() + } + } + if mc.segmentsStatisticProducer != nil { + mc.segmentsStatisticProducer.Close() + } + if mc.searchConsumer != nil { + mc.searchConsumer.Close() + } + if mc.key2segConsumer != nil { + mc.key2segConsumer.Close() + } + if mc.timeSyncCfg != nil { + mc.timeSyncCfg.Close() } - mc.segmentsStatisticProducer.Close() - mc.searchConsumer.Close() - mc.key2segConsumer.Close() - mc.timeSyncCfg.Close() } type MessageType int diff --git a/reader/read_node/index_test.go b/reader/read_node/index_test.go index 358c998179..7b18a3442e 100644 --- a/reader/read_node/index_test.go +++ b/reader/read_node/index_test.go @@ -1,6 +1,7 @@ package reader import ( + "context" "encoding/binary" "fmt" msgPb "github.com/czs007/suvlim/pkg/master/grpc/message" @@ -12,7 +13,8 @@ import ( func TestIndex_BuildIndex(t *testing.T) { // 1. Construct node, collection, partition and segment - node := NewQueryNode(0, 0) + ctx := context.Background() + node := NewQueryNode(ctx, 0, 0) var collection = node.NewCollection(0, "collection0", "") var partition = collection.NewPartition("partition0") var segment = partition.NewSegment(0) @@ -74,4 +76,5 @@ func TestIndex_BuildIndex(t *testing.T) { partition.DeleteSegment(segment) collection.DeletePartition(partition) node.DeleteCollection(collection) + node.Close() } diff --git a/reader/read_node/meta.go b/reader/read_node/meta.go index 20eca81a7b..ffa5923fd8 100644 --- a/reader/read_node/meta.go +++ b/reader/read_node/meta.go @@ -1,7 +1,6 @@ package reader import ( - "context" "fmt" "log" "path" @@ -274,12 +273,12 @@ func (node *QueryNode) InitFromMeta() error { return nil } -func (node *QueryNode) RunMetaService(ctx context.Context, wg *sync.WaitGroup) { +func (node *QueryNode) RunMetaService(wg *sync.WaitGroup) { //node.InitFromMeta() metaChan := node.kvBase.WatchWithPrefix("") for { select { - case <-ctx.Done(): + case <-node.ctx.Done(): wg.Done() println("DONE!!!!!!") return diff --git a/reader/read_node/query_node.go b/reader/read_node/query_node.go index a0049965a8..2892a3f4d7 100644 --- a/reader/read_node/query_node.go +++ b/reader/read_node/query_node.go @@ -14,6 +14,7 @@ package reader import "C" import ( + "context" "encoding/json" "fmt" "github.com/czs007/suvlim/conf" @@ -87,6 +88,9 @@ type InsertLog struct { } type QueryNode struct { + // context + ctx context.Context + QueryNodeId uint64 Collections []*Collection SegmentsMap map[int64]*Segment @@ -102,7 +106,7 @@ type QueryNode struct { InsertLogs []InsertLog } -func NewQueryNode(queryNodeId uint64, timeSync uint64) *QueryNode { +func NewQueryNode(ctx context.Context, queryNodeId uint64, timeSync uint64) *QueryNode { mc := message_client.MessageClient{} queryNodeTimeSync := &QueryNodeTime{ @@ -129,6 +133,7 @@ func NewQueryNode(queryNodeId uint64, timeSync uint64) *QueryNode { } return &QueryNode{ + ctx: ctx, QueryNodeId: queryNodeId, Collections: nil, SegmentsMap: segmentsMap, @@ -140,11 +145,15 @@ func NewQueryNode(queryNodeId uint64, timeSync uint64) *QueryNode { } func (node *QueryNode) Close() { - node.messageClient.Close() - node.kvBase.Close() + if node.messageClient != nil { + node.messageClient.Close() + } + if node.kvBase != nil { + node.kvBase.Close() + } } -func CreateQueryNode(queryNodeId uint64, timeSync uint64, mc *message_client.MessageClient) *QueryNode { +func CreateQueryNode(ctx context.Context, queryNodeId uint64, timeSync uint64, mc *message_client.MessageClient) *QueryNode { queryNodeTimeSync := &QueryNodeTime{ ReadTimeSyncMin: timeSync, ReadTimeSyncMax: timeSync, @@ -172,6 +181,7 @@ func CreateQueryNode(queryNodeId uint64, timeSync uint64, mc *message_client.Mes } return &QueryNode{ + ctx: ctx, QueryNodeId: queryNodeId, Collections: nil, SegmentsMap: segmentsMap, @@ -269,52 +279,64 @@ func (node *QueryNode) RunInsertDelete(wg *sync.WaitGroup) { if Debug { for { - var msgLen = node.PrepareBatchMsg() - var timeRange = TimeRange{node.messageClient.TimeSyncStart(), node.messageClient.TimeSyncEnd()} - assert.NotEqual(nil, 0, timeRange.timestampMin) - assert.NotEqual(nil, 0, timeRange.timestampMax) + select { + case <-node.ctx.Done(): + wg.Done() + return + default: + var msgLen = node.PrepareBatchMsg() + var timeRange = TimeRange{node.messageClient.TimeSyncStart(), node.messageClient.TimeSyncEnd()} + assert.NotEqual(nil, 0, timeRange.timestampMin) + assert.NotEqual(nil, 0, timeRange.timestampMax) - if node.msgCounter.InsertCounter/CountInsertMsgBaseline != BaselineCounter { - node.WriteQueryLog() - BaselineCounter = node.msgCounter.InsertCounter / CountInsertMsgBaseline - } + if node.msgCounter.InsertCounter/CountInsertMsgBaseline != BaselineCounter { + node.WriteQueryLog() + BaselineCounter = node.msgCounter.InsertCounter / CountInsertMsgBaseline + } - if msgLen[0] == 0 && len(node.buffer.InsertDeleteBuffer) <= 0 { + if msgLen[0] == 0 && len(node.buffer.InsertDeleteBuffer) <= 0 { + node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange) + continue + } + + node.QueryNodeDataInit() + node.MessagesPreprocess(node.messageClient.InsertOrDeleteMsg, timeRange) + //fmt.Println("MessagesPreprocess Done") + node.WriterDelete() + node.PreInsertAndDelete() + //fmt.Println("PreInsertAndDelete Done") + node.DoInsertAndDelete() + //fmt.Println("DoInsertAndDelete Done") node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange) - continue } - - node.QueryNodeDataInit() - node.MessagesPreprocess(node.messageClient.InsertOrDeleteMsg, timeRange) - //fmt.Println("MessagesPreprocess Done") - node.WriterDelete() - node.PreInsertAndDelete() - //fmt.Println("PreInsertAndDelete Done") - node.DoInsertAndDelete() - //fmt.Println("DoInsertAndDelete Done") - node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange) } } else { for { - var msgLen = node.PrepareBatchMsg() - var timeRange = TimeRange{node.messageClient.TimeSyncStart(), node.messageClient.TimeSyncEnd()} - assert.NotEqual(nil, 0, timeRange.timestampMin) - assert.NotEqual(nil, 0, timeRange.timestampMax) + select { + case <-node.ctx.Done(): + wg.Done() + return + default: + var msgLen = node.PrepareBatchMsg() + var timeRange = TimeRange{node.messageClient.TimeSyncStart(), node.messageClient.TimeSyncEnd()} + assert.NotEqual(nil, 0, timeRange.timestampMin) + assert.NotEqual(nil, 0, timeRange.timestampMax) - if msgLen[0] == 0 && len(node.buffer.InsertDeleteBuffer) <= 0 { + if msgLen[0] == 0 && len(node.buffer.InsertDeleteBuffer) <= 0 { + node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange) + continue + } + + node.QueryNodeDataInit() + node.MessagesPreprocess(node.messageClient.InsertOrDeleteMsg, timeRange) + //fmt.Println("MessagesPreprocess Done") + node.WriterDelete() + node.PreInsertAndDelete() + //fmt.Println("PreInsertAndDelete Done") + node.DoInsertAndDelete() + //fmt.Println("DoInsertAndDelete Done") node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange) - continue } - - node.QueryNodeDataInit() - node.MessagesPreprocess(node.messageClient.InsertOrDeleteMsg, timeRange) - //fmt.Println("MessagesPreprocess Done") - node.WriterDelete() - node.PreInsertAndDelete() - //fmt.Println("PreInsertAndDelete Done") - node.DoInsertAndDelete() - //fmt.Println("DoInsertAndDelete Done") - node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange) } } wg.Done() @@ -336,6 +358,9 @@ func (node *QueryNode) TestInsertDelete(timeRange TimeRange) { func (node *QueryNode) RunSearch(wg *sync.WaitGroup) { for { select { + case <-node.ctx.Done(): + wg.Done() + return case msg := <-node.messageClient.GetSearchChan(): node.messageClient.SearchMsg = node.messageClient.SearchMsg[:0] node.messageClient.SearchMsg = append(node.messageClient.SearchMsg, msg) @@ -651,7 +676,6 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status { } } - var entities = msgPb.Entities{ Ids: make([]int64, 0), } diff --git a/reader/read_node/reader.go b/reader/read_node/reader.go index f9c9927dd5..5f365b82d1 100644 --- a/reader/read_node/reader.go +++ b/reader/read_node/reader.go @@ -7,13 +7,12 @@ import ( "sync" ) -func StartQueryNode(pulsarURL string) { +func StartQueryNode(ctx context.Context, pulsarURL string) { mc := message_client.MessageClient{} - mc.InitClient(pulsarURL) + mc.InitClient(ctx, pulsarURL) mc.ReceiveMessage() - qn := CreateQueryNode(0, 0, &mc) - ctx := context.Background() + qn := CreateQueryNode(ctx, 0, 0, &mc) // Segments Services go qn.SegmentManagementService() @@ -28,7 +27,7 @@ func StartQueryNode(pulsarURL string) { } wg.Add(3) - go qn.RunMetaService(ctx, &wg) + go qn.RunMetaService(&wg) go qn.RunInsertDelete(&wg) go qn.RunSearch(&wg) wg.Wait() diff --git a/reader/read_node/reader_test.go b/reader/read_node/reader_test.go index 35db89f951..77782ab58e 100644 --- a/reader/read_node/reader_test.go +++ b/reader/read_node/reader_test.go @@ -1,22 +1,25 @@ package reader import ( + "context" "github.com/czs007/suvlim/conf" "strconv" "testing" + "time" ) +const ctxTimeInMillisecond = 200 + +// NOTE: start pulsar before test func TestReader_startQueryNode(t *testing.T) { - //pulsarURL := "pulsar://localhost:6650" + conf.LoadConfig("config.yaml") + + d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) + ctx, _ := context.WithDeadline(context.Background(), d) + pulsarAddr := "pulsar://" pulsarAddr += conf.Config.Pulsar.Address pulsarAddr += ":" pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10) - println(pulsarAddr) - StartQueryNode(pulsarAddr) - - //go StartQueryNode(pulsarAddr, 0) - //StartQueryNode(pulsarAddr, 1) - - + StartQueryNode(ctx, pulsarAddr) } diff --git a/reader/read_node/result_test.go b/reader/read_node/result_test.go index d82f3559fc..8257558afc 100644 --- a/reader/read_node/result_test.go +++ b/reader/read_node/result_test.go @@ -1,21 +1,39 @@ package reader import ( + "context" + "github.com/czs007/suvlim/conf" + "github.com/czs007/suvlim/reader/message_client" + "strconv" "testing" + "time" masterPb "github.com/czs007/suvlim/pkg/master/grpc/master" msgPb "github.com/czs007/suvlim/pkg/master/grpc/message" ) +// NOTE: start pulsar before test func TestResult_PublishSearchResult(t *testing.T) { + conf.LoadConfig("config.yaml") + + d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) + ctx, _ := context.WithDeadline(context.Background(), d) + + mc := message_client.MessageClient{} + pulsarAddr := "pulsar://" + pulsarAddr += conf.Config.Pulsar.Address + pulsarAddr += ":" + pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10) + mc.InitClient(ctx, pulsarAddr) + + node := CreateQueryNode(ctx, 0, 0, &mc) + // Construct node, collection, partition and segment - node := NewQueryNode(0, 0) var collection = node.NewCollection(0, "collection0", "") var partition = collection.NewPartition("partition0") var segment = partition.NewSegment(0) node.SegmentsMap[0] = segment - // TODO: start pulsar server const N = 10 var entityIDs = msgPb.Entities{ Ids: make([]int64, N), @@ -29,11 +47,26 @@ func TestResult_PublishSearchResult(t *testing.T) { result.Distances = append(result.Distances, float32(i)) } node.PublishSearchResult(&result) + node.Close() } +// NOTE: start pulsar before test func TestResult_PublishFailedSearchResult(t *testing.T) { + conf.LoadConfig("config.yaml") + + d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) + ctx, _ := context.WithDeadline(context.Background(), d) + + mc := message_client.MessageClient{} + pulsarAddr := "pulsar://" + pulsarAddr += conf.Config.Pulsar.Address + pulsarAddr += ":" + pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10) + mc.InitClient(ctx, pulsarAddr) + + node := CreateQueryNode(ctx, 0, 0, &mc) + // Construct node, collection, partition and segment - node := NewQueryNode(0, 0) var collection = node.NewCollection(0, "collection0", "") var partition = collection.NewPartition("partition0") var segment = partition.NewSegment(0) @@ -41,11 +74,27 @@ func TestResult_PublishFailedSearchResult(t *testing.T) { // TODO: start pulsar server node.PublishFailedSearchResult() + + node.Close() } +// NOTE: start pulsar before test func TestResult_PublicStatistic(t *testing.T) { + conf.LoadConfig("config.yaml") + + d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) + ctx, _ := context.WithDeadline(context.Background(), d) + + mc := message_client.MessageClient{} + pulsarAddr := "pulsar://" + pulsarAddr += conf.Config.Pulsar.Address + pulsarAddr += ":" + pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10) + mc.InitClient(ctx, pulsarAddr) + + node := CreateQueryNode(ctx, 0, 0, &mc) + // Construct node, collection, partition and segment - node := NewQueryNode(0, 0) var collection = node.NewCollection(0, "collection0", "") var partition = collection.NewPartition("partition0") var segment = partition.NewSegment(0) @@ -66,4 +115,6 @@ func TestResult_PublicStatistic(t *testing.T) { // TODO: start pulsar server node.PublicStatistic(&statisticData) + + node.Close() } diff --git a/reader/read_node/segment_service.go b/reader/read_node/segment_service.go index b7a2eed148..689b589a7d 100644 --- a/reader/read_node/segment_service.go +++ b/reader/read_node/segment_service.go @@ -35,8 +35,13 @@ func (node *QueryNode) SegmentManagementService() { sleepMillisecondTime := 1000 fmt.Println("do segments management in ", strconv.Itoa(sleepMillisecondTime), "ms") for { - time.Sleep(time.Duration(sleepMillisecondTime) * time.Millisecond) - node.SegmentsManagement() + select { + case <-node.ctx.Done(): + return + default: + time.Sleep(time.Duration(sleepMillisecondTime) * time.Millisecond) + node.SegmentsManagement() + } } } @@ -91,7 +96,12 @@ func (node *QueryNode) SegmentStatisticService() { sleepMillisecondTime := 1000 fmt.Println("do segments statistic in ", strconv.Itoa(sleepMillisecondTime), "ms") for { - time.Sleep(time.Duration(sleepMillisecondTime) * time.Millisecond) - node.SegmentStatistic(sleepMillisecondTime) + select { + case <-node.ctx.Done(): + return + default: + time.Sleep(time.Duration(sleepMillisecondTime) * time.Millisecond) + node.SegmentStatistic(sleepMillisecondTime) + } } } diff --git a/reader/read_node/segment_service_test.go b/reader/read_node/segment_service_test.go index ae0590a867..892659085f 100644 --- a/reader/read_node/segment_service_test.go +++ b/reader/read_node/segment_service_test.go @@ -1,53 +1,93 @@ package reader import ( + "context" + "github.com/czs007/suvlim/conf" + "github.com/czs007/suvlim/reader/message_client" + "strconv" "testing" + "time" ) func TestSegmentManagement_SegmentsManagement(t *testing.T) { // Construct node, collection, partition and segment - node := NewQueryNode(0, 0) + ctx := context.Background() + node := NewQueryNode(ctx, 0, 0) var collection = node.NewCollection(0, "collection0", "") var partition = collection.NewPartition("partition0") var segment = partition.NewSegment(0) node.SegmentsMap[0] = segment - // TODO: fix segment management node.SegmentsManagement() + + node.Close() } func TestSegmentManagement_SegmentService(t *testing.T) { + d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) + ctx, _ := context.WithDeadline(context.Background(), d) + // Construct node, collection, partition and segment - node := NewQueryNode(0, 0) + node := NewQueryNode(ctx, 0, 0) var collection = node.NewCollection(0, "collection0", "") var partition = collection.NewPartition("partition0") var segment = partition.NewSegment(0) node.SegmentsMap[0] = segment - // TODO: fix segment service node.SegmentManagementService() + + node.Close() } +// NOTE: start pulsar before test func TestSegmentManagement_SegmentStatistic(t *testing.T) { + conf.LoadConfig("config.yaml") + ctx, _ := context.WithCancel(context.Background()) + + mc := message_client.MessageClient{} + pulsarAddr := "pulsar://" + pulsarAddr += conf.Config.Pulsar.Address + pulsarAddr += ":" + pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10) + mc.InitClient(ctx, pulsarAddr) + + mc.ReceiveMessage() + node := CreateQueryNode(ctx, 0, 0, &mc) + // Construct node, collection, partition and segment - node := NewQueryNode(0, 0) var collection = node.NewCollection(0, "collection0", "") var partition = collection.NewPartition("partition0") var segment = partition.NewSegment(0) node.SegmentsMap[0] = segment - // TODO: start pulsar server node.SegmentStatistic(1000) + + node.Close() } +// NOTE: start pulsar before test func TestSegmentManagement_SegmentStatisticService(t *testing.T) { + conf.LoadConfig("config.yaml") + d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) + ctx, _ := context.WithDeadline(context.Background(), d) + + mc := message_client.MessageClient{} + pulsarAddr := "pulsar://" + pulsarAddr += conf.Config.Pulsar.Address + pulsarAddr += ":" + pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10) + mc.InitClient(ctx, pulsarAddr) + + mc.ReceiveMessage() + node := CreateQueryNode(ctx, 0, 0, &mc) + // Construct node, collection, partition and segment - node := NewQueryNode(0, 0) var collection = node.NewCollection(0, "collection0", "") var partition = collection.NewPartition("partition0") var segment = partition.NewSegment(0) node.SegmentsMap[0] = segment - // TODO: start pulsar server node.SegmentStatisticService() + + node.Close() } diff --git a/reader/read_node/segment_test.go b/reader/read_node/segment_test.go index 3d83c99d67..de86e9d28e 100644 --- a/reader/read_node/segment_test.go +++ b/reader/read_node/segment_test.go @@ -1,6 +1,7 @@ package reader import ( + "context" "encoding/binary" "fmt" "math" @@ -12,7 +13,8 @@ import ( func TestSegment_ConstructorAndDestructor(t *testing.T) { // 1. Construct node, collection, partition and segment - node := NewQueryNode(0, 0) + ctx := context.Background() + node := NewQueryNode(ctx, 0, 0) var collection = node.NewCollection(0, "collection0", "") var partition = collection.NewPartition("partition0") var segment = partition.NewSegment(0) @@ -21,11 +23,14 @@ func TestSegment_ConstructorAndDestructor(t *testing.T) { partition.DeleteSegment(segment) collection.DeletePartition(partition) node.DeleteCollection(collection) + + node.Close() } func TestSegment_SegmentInsert(t *testing.T) { // 1. Construct node, collection, partition and segment - node := NewQueryNode(0, 0) + ctx := context.Background() + node := NewQueryNode(ctx, 0, 0) var collection = node.NewCollection(0, "collection0", "") var partition = collection.NewPartition("partition0") var segment = partition.NewSegment(0) @@ -66,11 +71,14 @@ func TestSegment_SegmentInsert(t *testing.T) { partition.DeleteSegment(segment) collection.DeletePartition(partition) node.DeleteCollection(collection) + + node.Close() } func TestSegment_SegmentDelete(t *testing.T) { + ctx := context.Background() // 1. Construct node, collection, partition and segment - node := NewQueryNode(0, 0) + node := NewQueryNode(ctx, 0, 0) var collection = node.NewCollection(0, "collection0", "") var partition = collection.NewPartition("partition0") var segment = partition.NewSegment(0) @@ -91,11 +99,14 @@ func TestSegment_SegmentDelete(t *testing.T) { partition.DeleteSegment(segment) collection.DeletePartition(partition) node.DeleteCollection(collection) + + node.Close() } func TestSegment_SegmentSearch(t *testing.T) { + ctx := context.Background() // 1. Construct node, collection, partition and segment - node := NewQueryNode(0, 0) + node := NewQueryNode(ctx, 0, 0) var collection = node.NewCollection(0, "collection0", "") var partition = collection.NewPartition("partition0") var segment = partition.NewSegment(0) @@ -152,11 +163,14 @@ func TestSegment_SegmentSearch(t *testing.T) { partition.DeleteSegment(segment) collection.DeletePartition(partition) node.DeleteCollection(collection) + + node.Close() } func TestSegment_SegmentPreInsert(t *testing.T) { + ctx := context.Background() // 1. Construct node, collection, partition and segment - node := NewQueryNode(0, 0) + node := NewQueryNode(ctx, 0, 0) var collection = node.NewCollection(0, "collection0", "") var partition = collection.NewPartition("partition0") var segment = partition.NewSegment(0) @@ -169,11 +183,14 @@ func TestSegment_SegmentPreInsert(t *testing.T) { partition.DeleteSegment(segment) collection.DeletePartition(partition) node.DeleteCollection(collection) + + node.Close() } func TestSegment_SegmentPreDelete(t *testing.T) { + ctx := context.Background() // 1. Construct node, collection, partition and segment - node := NewQueryNode(0, 0) + node := NewQueryNode(ctx, 0, 0) var collection = node.NewCollection(0, "collection0", "") var partition = collection.NewPartition("partition0") var segment = partition.NewSegment(0) @@ -186,13 +203,16 @@ func TestSegment_SegmentPreDelete(t *testing.T) { partition.DeleteSegment(segment) collection.DeletePartition(partition) node.DeleteCollection(collection) + + node.Close() } // Segment util functions test //////////////////////////////////////////////////////////////////////////// func TestSegment_GetStatus(t *testing.T) { + ctx := context.Background() // 1. Construct node, collection, partition and segment - node := NewQueryNode(0, 0) + node := NewQueryNode(ctx, 0, 0) var collection = node.NewCollection(0, "collection0", "") var partition = collection.NewPartition("partition0") var segment = partition.NewSegment(0) @@ -205,11 +225,14 @@ func TestSegment_GetStatus(t *testing.T) { partition.DeleteSegment(segment) collection.DeletePartition(partition) node.DeleteCollection(collection) + + node.Close() } func TestSegment_Close(t *testing.T) { + ctx := context.Background() // 1. Construct node, collection, partition and segment - node := NewQueryNode(0, 0) + node := NewQueryNode(ctx, 0, 0) var collection = node.NewCollection(0, "collection0", "") var partition = collection.NewPartition("partition0") var segment = partition.NewSegment(0) @@ -222,11 +245,14 @@ func TestSegment_Close(t *testing.T) { partition.DeleteSegment(segment) collection.DeletePartition(partition) node.DeleteCollection(collection) + + node.Close() } func TestSegment_GetRowCount(t *testing.T) { + ctx := context.Background() // 1. Construct node, collection, partition and segment - node := NewQueryNode(0, 0) + node := NewQueryNode(ctx, 0, 0) var collection = node.NewCollection(0, "collection0", "") var partition = collection.NewPartition("partition0") var segment = partition.NewSegment(0) @@ -271,11 +297,14 @@ func TestSegment_GetRowCount(t *testing.T) { partition.DeleteSegment(segment) collection.DeletePartition(partition) node.DeleteCollection(collection) + + node.Close() } func TestSegment_GetDeletedCount(t *testing.T) { + ctx := context.Background() // 1. Construct node, collection, partition and segment - node := NewQueryNode(0, 0) + node := NewQueryNode(ctx, 0, 0) var collection = node.NewCollection(0, "collection0", "") var partition = collection.NewPartition("partition0") var segment = partition.NewSegment(0) @@ -301,11 +330,14 @@ func TestSegment_GetDeletedCount(t *testing.T) { partition.DeleteSegment(segment) collection.DeletePartition(partition) node.DeleteCollection(collection) + + node.Close() } func TestSegment_GetMemSize(t *testing.T) { + ctx := context.Background() // 1. Construct node, collection, partition and segment - node := NewQueryNode(0, 0) + node := NewQueryNode(ctx, 0, 0) var collection = node.NewCollection(0, "collection0", "") var partition = collection.NewPartition("partition0") var segment = partition.NewSegment(0) @@ -344,22 +376,28 @@ func TestSegment_GetMemSize(t *testing.T) { // 6. Get memory usage in bytes var memSize = segment.GetMemSize() - assert.Equal(t, memSize, uint64(1048714)) + assert.Equal(t, memSize, uint64(2785280)) // 7. Destruct collection, partition and segment partition.DeleteSegment(segment) collection.DeletePartition(partition) node.DeleteCollection(collection) + + node.Close() } func TestSegment_RealSchemaTest(t *testing.T) { + ctx := context.Background() // 1. Construct node, collection, partition and segment //var schemaString = "id: 6873737669791618215\nname: \"collection0\"\nschema: \u003c\n " + // "field_metas: \u003c\n field_name: \"age\"\n type: INT32\n dim: 1\n \u003e\n " + // "field_metas: \u003c\n field_name: \"field_1\"\n type: VECTOR_FLOAT\n dim: 16\n \u003e\n" + // "\u003e\ncreate_time: 1600416765\nsegment_ids: 6873737669791618215\npartition_tags: \"default\"\n" - var schemaString = "id: 6875229265736357360\nname: \"collection0\"\nschema: \u003c\n field_metas: \u003c\n field_name: \"field_3\"\n type: INT32\n \u003e\n field_metas: \u003c\n field_name: \"field_vec\"\n type: VECTOR_FLOAT\n \u003e\n\u003e\ncreate_time: 1600764055\nsegment_ids: 6875229265736357360\npartition_tags: \"default\"\n" - node := NewQueryNode(0, 0) + var schemaString = "id: 6875229265736357360\nname: \"collection0\"\nschema: \u003c\n " + + "field_metas: \u003c\n field_name: \"field_3\"\n type: INT32\n dim: 1\n \u003e\n " + + "field_metas: \u003c\n field_name: \"field_vec\"\n type: VECTOR_FLOAT\n dim: 16\n " + + "\u003e\n\u003e\ncreate_time: 1600764055\nsegment_ids: 6875229265736357360\npartition_tags: \"default\"\n" + node := NewQueryNode(ctx, 0, 0) var collection = node.NewCollection(0, "collection0", schemaString) var partition = collection.NewPartition("partition0") var segment = partition.NewSegment(0) @@ -400,4 +438,6 @@ func TestSegment_RealSchemaTest(t *testing.T) { partition.DeleteSegment(segment) collection.DeletePartition(partition) node.DeleteCollection(collection) + + node.Close() } diff --git a/reader/read_node/util_functions_test.go b/reader/read_node/util_functions_test.go index 7f0ae295e4..3ae099795e 100644 --- a/reader/read_node/util_functions_test.go +++ b/reader/read_node/util_functions_test.go @@ -1,18 +1,42 @@ package reader import ( + "context" + "github.com/czs007/suvlim/conf" + "github.com/czs007/suvlim/reader/message_client" + "strconv" "testing" + "time" "github.com/stretchr/testify/assert" ) +// NOTE: start pulsar before test func TestUtilFunctions_GetKey2Segments(t *testing.T) { - // TODO: Add GetKey2Segments test + conf.LoadConfig("config.yaml") + d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) + ctx, _ := context.WithDeadline(context.Background(), d) + + mc := message_client.MessageClient{} + pulsarAddr := "pulsar://" + pulsarAddr += conf.Config.Pulsar.Address + pulsarAddr += ":" + pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10) + mc.InitClient(ctx, pulsarAddr) + + mc.ReceiveMessage() + node := CreateQueryNode(ctx, 0, 0, &mc) + + node.messageClient.PrepareKey2SegmentMsg() + var _, _, _ = node.GetKey2Segments() + + node.Close() } func TestUtilFunctions_GetCollectionByCollectionName(t *testing.T) { + ctx := context.Background() // 1. Construct node, and collections - node := NewQueryNode(0, 0) + node := NewQueryNode(ctx, 0, 0) var _ = node.NewCollection(0, "collection0", "") // 2. Get collection by collectionName @@ -21,12 +45,15 @@ func TestUtilFunctions_GetCollectionByCollectionName(t *testing.T) { assert.Equal(t, c0.CollectionName, "collection0") c0 = node.GetCollectionByID(0) assert.NotNil(t, c0) - assert.Equal(t, c0.CollectionID, 0) + assert.Equal(t, c0.CollectionID, uint64(0)) + + node.Close() } func TestUtilFunctions_GetSegmentBySegmentID(t *testing.T) { + ctx := context.Background() // 1. Construct node, collection, partition and segment - node := NewQueryNode(0, 0) + node := NewQueryNode(ctx, 0, 0) var collection = node.NewCollection(0, "collection0", "") var partition = collection.NewPartition("partition0") var segment = partition.NewSegment(0) @@ -36,4 +63,6 @@ func TestUtilFunctions_GetSegmentBySegmentID(t *testing.T) { var s0, err = node.GetSegmentBySegmentID(0) assert.NoError(t, err) assert.Equal(t, s0.SegmentId, int64(0)) + + node.Close() } diff --git a/timesync/readertimesync.go b/timesync/readertimesync.go index adc7613ec5..ae1205dd34 100644 --- a/timesync/readertimesync.go +++ b/timesync/readertimesync.go @@ -66,7 +66,6 @@ type ReaderTimeSyncCfg struct { revTimesyncFromReader map[uint64]int ctx context.Context - cancel context.CancelFunc InsertLogs []InsertLog RoleType TimeSyncRole } @@ -83,6 +82,7 @@ func toMillisecond(ts *pb.TimeSyncMsg) int { } func NewReaderTimeSync( + ctx context.Context, timeSyncTopic string, timeSyncSubName string, readTopics []string, @@ -133,7 +133,7 @@ func NewReaderTimeSync( r.timesyncMsgChan = make(chan TimeSyncMsg, len(readTopics)*r.readerQueueSize) r.insertOrDeleteChan = make(chan *pb.InsertOrDeleteMsg, len(readTopics)*r.readerQueueSize) r.revTimesyncFromReader = make(map[uint64]int) - r.ctx, r.cancel = context.WithCancel(context.Background()) + r.ctx = ctx var client pulsar.Client var err error @@ -186,13 +186,20 @@ func NewReaderTimeSync( } func (r *ReaderTimeSyncCfg) Close() { - r.cancel() - r.timeSyncConsumer.Close() - r.readerConsumer.Close() - for i := 0; i < len(r.readerProducer); i++ { - r.readerProducer[i].Close() + if r.timeSyncConsumer != nil { + r.timeSyncConsumer.Close() + } + if r.readerConsumer != nil { + r.readerConsumer.Close() + } + for i := 0; i < len(r.readerProducer); i++ { + if r.readerProducer[i] != nil { + r.readerProducer[i].Close() + } + } + if r.pulsarClient != nil { + r.pulsarClient.Close() } - r.pulsarClient.Close() } func (r *ReaderTimeSyncCfg) Start() error { @@ -278,43 +285,48 @@ func (r *ReaderTimeSyncCfg) sendEOFMsg(ctx context.Context, msg *pulsar.Producer } func (r *ReaderTimeSyncCfg) startTimeSync() { + ctx := r.ctx tsm := make([]*pb.TimeSyncMsg, 0, len(r.proxyIdList)*2) - ctx, _ := context.WithCancel(r.ctx) var err error for { - //var start time.Time - for len(tsm) != len(r.proxyIdList) { - tsm = r.alignTimeSync(tsm) - tsm, err = r.readTimeSync(ctx, tsm, len(r.proxyIdList)-len(tsm)) - if err != nil { - if ctx.Err() != nil { - return - } else { - //TODO, log error msg - log.Printf("read time sync error %v", err) + select { + case <-ctx.Done(): + return + default: + //var start time.Time + for len(tsm) != len(r.proxyIdList) { + tsm = r.alignTimeSync(tsm) + tsm, err = r.readTimeSync(ctx, tsm, len(r.proxyIdList)-len(tsm)) + if err != nil { + if ctx.Err() != nil { + return + } else { + //TODO, log error msg + log.Printf("read time sync error %v", err) + } } } - } - ts := tsm[0].Timestamp - for i := 1; i < len(tsm); i++ { - if tsm[i].Timestamp < ts { - ts = tsm[i].Timestamp + ts := tsm[0].Timestamp + for i := 1; i < len(tsm); i++ { + if tsm[i].Timestamp < ts { + ts = tsm[i].Timestamp + } } - } - tsm = tsm[:0] - //send timestamp flag to reader channel - msg := pb.InsertOrDeleteMsg{Timestamp: ts, ClientId: r.readStopFlagClientId} - payload, err := proto.Marshal(&msg) - if err != nil { - //TODO log error - log.Printf("Marshal timesync flag error %v", err) - } else { - wg := sync.WaitGroup{} - wg.Add(len(r.readerProducer)) - for index := range r.readerProducer { - go r.sendEOFMsg(ctx, &pulsar.ProducerMessage{Payload: payload}, index, &wg) + tsm = tsm[:0] + //send timestamp flag to reader channel + msg := pb.InsertOrDeleteMsg{Timestamp: ts, ClientId: r.readStopFlagClientId} + payload, err := proto.Marshal(&msg) + if err != nil { + //TODO log error + log.Printf("Marshal timesync flag error %v", err) + } else { + wg := sync.WaitGroup{} + wg.Add(len(r.readerProducer)) + for index := range r.readerProducer { + go r.sendEOFMsg(ctx, &pulsar.ProducerMessage{Payload: payload}, index, &wg) + } + wg.Wait() } - wg.Wait() } } } @@ -362,7 +374,7 @@ func (r *ReaderTimeSyncCfg) WriteInsertLog() { } func (r *ReaderTimeSyncCfg) startReadTopics() { - ctx, _ := context.WithCancel(r.ctx) + ctx := r.ctx tsm := TimeSyncMsg{Timestamp: 0, NumRecorders: 0} const Debug = true const WriterBaseline = 1000 * 1000