From 1323bdb8f4149c1de7bc470fde9a40f33ae91ee5 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Tue, 22 Sep 2020 15:27:50 +0800 Subject: [PATCH] Update segment close time Signed-off-by: bigsheeper --- reader/read_node/meta.go | 21 +++++++++++++++++++++ reader/read_node/segment_service.go | 2 +- 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/reader/read_node/meta.go b/reader/read_node/meta.go index 3cfee81dc0..27b7545d98 100644 --- a/reader/read_node/meta.go +++ b/reader/read_node/meta.go @@ -3,6 +3,7 @@ package reader import ( "context" "fmt" + "log" "path" "reflect" "strconv" @@ -48,6 +49,15 @@ func isSegmentObj(key string) bool { return index == 0 } +func isSegmentChannelRangeInQueryNodeChannelRange(segment *mock.Segment) bool { + if segment.ChannelStart > segment.ChannelEnd { + log.Printf("Illegal segment channel range") + return false + } + // TODO: add query node channel range check + return true +} + func printCollectionStruct(obj *mock.Collection) { v := reflect.ValueOf(obj) v = reflect.Indirect(v) @@ -93,6 +103,11 @@ func (node *QueryNode) processSegmentCreate(id string, value string) { println(err.Error()) } printSegmentStruct(segment) + + if !isSegmentChannelRangeInQueryNodeChannelRange(segment) { + return + } + collection := node.GetCollectionByID(segment.CollectionID) if collection != nil { partition := collection.GetPartitionByName(segment.PartitionTag) @@ -101,6 +116,7 @@ func (node *QueryNode) processSegmentCreate(id string, value string) { // start new segment and add it into partition.OpenedSegments newSegment := partition.NewSegment(newSegmentID) newSegment.SegmentStatus = SegmentOpened + newSegment.SegmentCloseTime = -1 partition.OpenedSegments = append(partition.OpenedSegments, newSegment) node.SegmentsMap[newSegmentID] = newSegment } @@ -130,6 +146,11 @@ func (node *QueryNode) processSegmentModify(id string, value string) { println(err.Error()) } printSegmentStruct(segment) + + if !isSegmentChannelRangeInQueryNodeChannelRange(segment) { + return + } + seg, err := node.GetSegmentBySegmentID(int64(segment.SegmentID)) // todo change to uint64 if seg != nil { seg.SegmentCloseTime = segment.CloseTimeStamp diff --git a/reader/read_node/segment_service.go b/reader/read_node/segment_service.go index 1e5aa004b8..32d3d03067 100644 --- a/reader/read_node/segment_service.go +++ b/reader/read_node/segment_service.go @@ -19,7 +19,7 @@ func (node *QueryNode) SegmentsManagement() { for _, partition := range collection.Partitions { for _, oldSegment := range partition.OpenedSegments { // TODO: check segment status - if timeNow >= oldSegment.SegmentCloseTime { + if oldSegment.SegmentCloseTime != -1 && timeNow >= oldSegment.SegmentCloseTime { // close old segment and move it into partition.ClosedSegments if oldSegment.SegmentStatus != SegmentOpened { log.Println("Never reach here, Opened segment cannot be closed")