diff --git a/internal/master/segment_manager_test.go b/internal/master/segment_manager_test.go index 28e16ec7ec..847ef6b29a 100644 --- a/internal/master/segment_manager_test.go +++ b/internal/master/segment_manager_test.go @@ -126,7 +126,7 @@ func TestSegmentManager_AssignSegment(t *testing.T) { } } - time.Sleep(time.Duration(Params.SegIDAssignExpiration)) + time.Sleep(time.Duration(Params.SegIDAssignExpiration) * time.Millisecond) timestamp, err := globalTsoAllocator() assert.Nil(t, err) err = mt.UpdateSegment(&pb.SegmentMeta{ @@ -156,3 +156,122 @@ func TestSegmentManager_AssignSegment(t *testing.T) { assert.Nil(t, err) assert.NotEqualValues(t, 0, segMeta.CloseTime) } + +func TestSegmentManager_SycnWritenode(t *testing.T) { + ctx, cancelFunc := context.WithCancel(context.TODO()) + defer cancelFunc() + + Init() + Params.TopicNum = 5 + Params.QueryNodeNum = 3 + Params.SegmentSize = 536870912 / 1024 / 1024 + Params.SegmentSizeFactor = 0.75 + Params.DefaultRecordSize = 1024 + Params.MinSegIDAssignCnt = 1048576 / 1024 + Params.SegIDAssignExpiration = 2000 + etcdAddress := Params.EtcdAddress + cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}}) + assert.Nil(t, err) + rootPath := "/test/root" + _, err = cli.Delete(ctx, rootPath, clientv3.WithPrefix()) + assert.Nil(t, err) + + kvBase := etcdkv.NewEtcdKV(cli, rootPath) + defer kvBase.Close() + mt, err := NewMetaTable(kvBase) + assert.Nil(t, err) + + collName := "segmgr_test_coll" + var collID int64 = 1001 + partitionTag := "test_part" + schema := &schemapb.CollectionSchema{ + Name: collName, + Fields: []*schemapb.FieldSchema{ + {FieldID: 1, Name: "f1", IsPrimaryKey: false, DataType: schemapb.DataType_INT32}, + {FieldID: 2, Name: "f2", IsPrimaryKey: false, DataType: schemapb.DataType_VECTOR_FLOAT, TypeParams: []*commonpb.KeyValuePair{ + {Key: "dim", Value: "128"}, + }}, + }, + } + err = mt.AddCollection(&pb.CollectionMeta{ + ID: collID, + Schema: schema, + CreateTime: 0, + SegmentIDs: []UniqueID{}, + PartitionTags: []string{}, + }) + assert.Nil(t, err) + err = mt.AddPartition(collID, partitionTag) + assert.Nil(t, err) + + var cnt int64 + globalIDAllocator := func() (UniqueID, error) { + val := atomic.AddInt64(&cnt, 1) + return val, nil + } + globalTsoAllocator := func() (Timestamp, error) { + val := atomic.AddInt64(&cnt, 1) + phy := time.Now().UnixNano() / int64(time.Millisecond) + ts := tsoutil.ComposeTS(phy, val) + return ts, nil + } + syncWriteChan := make(chan *msgstream.TimeTickMsg) + syncProxyChan := make(chan *msgstream.TimeTickMsg) + + segAssigner := NewSegmentAssigner(ctx, mt, globalTsoAllocator, syncProxyChan) + mockScheduler := &MockFlushScheduler{} + segManager, err := NewSegmentManager(ctx, mt, globalIDAllocator, globalTsoAllocator, syncWriteChan, mockScheduler, segAssigner) + assert.Nil(t, err) + + segManager.Start() + defer segManager.Close() + sizePerRecord, err := typeutil.EstimateSizePerRecord(schema) + assert.Nil(t, err) + maxCount := uint32(Params.SegmentSize * 1024 * 1024 / float64(sizePerRecord)) + + req := []*internalpb.SegIDRequest{ + {Count: maxCount, ChannelID: 1, CollName: collName, PartitionTag: partitionTag}, + {Count: maxCount, ChannelID: 2, CollName: collName, PartitionTag: partitionTag}, + {Count: maxCount, ChannelID: 3, CollName: collName, PartitionTag: partitionTag}, + } + assignSegment, err := segManager.AssignSegment(req) + assert.Nil(t, err) + timestamp, err := globalTsoAllocator() + assert.Nil(t, err) + for i := 0; i < len(assignSegment); i++ { + assert.EqualValues(t, maxCount, assignSegment[i].Count) + assert.EqualValues(t, i+1, assignSegment[i].ChannelID) + + err = mt.UpdateSegment(&pb.SegmentMeta{ + SegmentID: assignSegment[i].SegID, + CollectionID: collID, + PartitionTag: partitionTag, + ChannelStart: 0, + ChannelEnd: 1, + CloseTime: timestamp, + NumRows: int64(maxCount), + MemSize: 500000, + }) + assert.Nil(t, err) + } + + time.Sleep(time.Duration(Params.SegIDAssignExpiration) * time.Millisecond) + + timestamp, err = globalTsoAllocator() + assert.Nil(t, err) + tsMsg := &msgstream.TimeTickMsg{ + BaseMsg: msgstream.BaseMsg{ + BeginTimestamp: timestamp, EndTimestamp: timestamp, HashValues: []uint32{}, + }, + TimeTickMsg: internalpb.TimeTickMsg{ + MsgType: internalpb.MsgType_kTimeTick, + PeerID: 1, + Timestamp: timestamp, + }, + } + syncWriteChan <- tsMsg + time.Sleep(300 * time.Millisecond) + + status := segManager.collStatus[collID] + assert.Empty(t, status.segments) +}