mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-28 22:45:26 +08:00
Fix segment close strategy
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
parent
1323bdb8f4
commit
a2ae3044b0
@ -2,6 +2,8 @@ package reader
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
msgPb "github.com/czs007/suvlim/pkg/master/grpc/message"
|
||||
"math"
|
||||
"testing"
|
||||
|
||||
@ -23,7 +25,7 @@ func TestIndex_BuildIndex(t *testing.T) {
|
||||
// schema_tmp->AddField("fakeVec", DataType::VECTOR_FLOAT, 16);
|
||||
// schema_tmp->AddField("age", DataType::INT32);
|
||||
const DIM = 16
|
||||
const N = 10000
|
||||
const N = 100
|
||||
var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
|
||||
var rawData []byte
|
||||
for _, ele := range vec {
|
||||
@ -54,17 +56,19 @@ func TestIndex_BuildIndex(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
|
||||
// 7. Do search
|
||||
//var queryJson = "{\"field_name\":\"fakevec\",\"num_queries\":1,\"topK\":10}"
|
||||
//var queryRawData = make([]float32, 0)
|
||||
//for i := 0; i < 16; i++ {
|
||||
// queryRawData = append(queryRawData, float32(i))
|
||||
//}
|
||||
//var vectorRecord = msgPb.VectorRowRecord{
|
||||
// FloatData: queryRawData,
|
||||
//}
|
||||
//var searchRes, searchErr = segment.SegmentSearch(queryJson, timestamps[N/2], &vectorRecord)
|
||||
//assert.NoError(t, searchErr)
|
||||
//fmt.Println(searchRes)
|
||||
var queryJson = "{\"field_name\":\"fakevec\",\"num_queries\":1,\"topK\":10}"
|
||||
var queryRawData = make([]float32, 0)
|
||||
for i := 0; i < 16; i++ {
|
||||
queryRawData = append(queryRawData, float32(i))
|
||||
}
|
||||
var vectorRecord = msgPb.VectorRowRecord{
|
||||
FloatData: queryRawData,
|
||||
}
|
||||
|
||||
query := node.QueryJson2Info(&queryJson)
|
||||
var searchRes, searchErr = segment.SegmentSearch(query, timestamps[N/2], &vectorRecord)
|
||||
assert.NoError(t, searchErr)
|
||||
fmt.Println(searchRes)
|
||||
|
||||
// 8. Destruct node, collection, and segment
|
||||
partition.DeleteSegment(segment)
|
||||
|
||||
@ -116,7 +116,7 @@ func (node *QueryNode) processSegmentCreate(id string, value string) {
|
||||
// start new segment and add it into partition.OpenedSegments
|
||||
newSegment := partition.NewSegment(newSegmentID)
|
||||
newSegment.SegmentStatus = SegmentOpened
|
||||
newSegment.SegmentCloseTime = -1
|
||||
newSegment.SegmentCloseTime = segment.CloseTimeStamp
|
||||
partition.OpenedSegments = append(partition.OpenedSegments, newSegment)
|
||||
node.SegmentsMap[newSegmentID] = newSegment
|
||||
}
|
||||
|
||||
@ -19,7 +19,7 @@ func (node *QueryNode) SegmentsManagement() {
|
||||
for _, partition := range collection.Partitions {
|
||||
for _, oldSegment := range partition.OpenedSegments {
|
||||
// TODO: check segment status
|
||||
if oldSegment.SegmentCloseTime != -1 && timeNow >= oldSegment.SegmentCloseTime {
|
||||
if timeNow >= oldSegment.SegmentCloseTime {
|
||||
// close old segment and move it into partition.ClosedSegments
|
||||
if oldSegment.SegmentStatus != SegmentOpened {
|
||||
log.Println("Never reach here, Opened segment cannot be closed")
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user