Invalid collection meta caceh && ingore time stamp if not dd request

Signed-off-by: neza2017 <yefu.chen@zilliz.com>
This commit is contained in:
neza2017 2021-01-23 10:12:41 +08:00 committed by yefu.chen
parent 50e2369000
commit e68d50f29e
4 changed files with 223 additions and 55 deletions

View File

@ -3,6 +3,7 @@ package masterservice
import (
"fmt"
"math/rand"
"regexp"
"testing"
"time"
@ -99,6 +100,12 @@ func TestGrpcService(t *testing.T) {
return 2000, nil
}
collectionMetaCache := make([]string, 0, 16)
core.InvalidateCollectionMetaCache = func(dbName string, collectionName string) error {
collectionMetaCache = append(collectionMetaCache, collectionName)
return nil
}
err = svr.Start()
assert.Nil(t, err)
@ -150,15 +157,60 @@ func TestGrpcService(t *testing.T) {
assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_SUCCESS)
assert.Equal(t, createCollectionArray[0].Base.MsgType, commonpb.MsgType_kCreateCollection)
assert.Equal(t, createCollectionArray[0].CollectionName, "testColl")
req.Base.MsgID = 101
req.Base.Timestamp = 101
req.Base.SourceID = 101
status, err = cli.CreateCollection(req)
assert.Nil(t, err)
assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_UNEXPECTED_ERROR)
req.Base.MsgID = 102
req.Base.Timestamp = 102
req.Base.SourceID = 102
req.CollectionName = "testColl-again"
status, err = cli.CreateCollection(req)
assert.Nil(t, err)
assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_UNEXPECTED_ERROR)
schema.Name = req.CollectionName
sbf, err = proto.Marshal(&schema)
assert.Nil(t, err)
req.Schema = sbf
req.Base.MsgID = 103
req.Base.Timestamp = 103
req.Base.SourceID = 103
status, err = cli.CreateCollection(req)
assert.Nil(t, err)
assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_SUCCESS)
assert.Equal(t, len(createCollectionArray), 2)
assert.Equal(t, createCollectionArray[1].Base.MsgType, commonpb.MsgType_kCreateCollection)
assert.Equal(t, createCollectionArray[1].CollectionName, "testColl-again")
//time stamp go back
schema.Name = "testColl-goback"
sbf, err = proto.Marshal(&schema)
assert.Nil(t, err)
req.CollectionName = schema.Name
req.Schema = sbf
req.Base.MsgID = 103
req.Base.Timestamp = 103
req.Base.SourceID = 103
status, err = cli.CreateCollection(req)
assert.Nil(t, err)
assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_UNEXPECTED_ERROR)
matched, err := regexp.MatchString("input timestamp = [0-9]+, last dd time stamp = [0-9]+", status.Reason)
assert.Nil(t, err)
assert.True(t, matched)
})
t.Run("has collection", func(t *testing.T) {
req := &milvuspb.HasCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kHasCollection,
MsgID: 101,
Timestamp: 101,
SourceID: 101,
MsgID: 110,
Timestamp: 110,
SourceID: 110,
},
DbName: "testDb",
CollectionName: "testColl",
@ -171,9 +223,9 @@ func TestGrpcService(t *testing.T) {
req = &milvuspb.HasCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kHasCollection,
MsgID: 102,
Timestamp: 102,
SourceID: 102,
MsgID: 111,
Timestamp: 111,
SourceID: 111,
},
DbName: "testDb",
CollectionName: "testColl2",
@ -183,20 +235,21 @@ func TestGrpcService(t *testing.T) {
assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_SUCCESS)
assert.Equal(t, rsp.Value, false)
// test time stamp go back
req = &milvuspb.HasCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kHasCollection,
MsgID: 102,
Timestamp: 102,
SourceID: 102,
MsgID: 111,
Timestamp: 111,
SourceID: 111,
},
DbName: "testDb",
CollectionName: "testColl2",
}
rsp, err = cli.HasCollection(req)
assert.Nil(t, err)
assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_UNEXPECTED_ERROR)
assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_SUCCESS)
assert.Equal(t, rsp.Value, false)
})
t.Run("describe collection", func(t *testing.T) {
@ -205,9 +258,9 @@ func TestGrpcService(t *testing.T) {
req := &milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kDescribeCollection,
MsgID: 103,
Timestamp: 103,
SourceID: 103,
MsgID: 120,
Timestamp: 120,
SourceID: 120,
},
DbName: "testDb",
CollectionName: "testColl",
@ -223,26 +276,26 @@ func TestGrpcService(t *testing.T) {
req := &milvuspb.ShowCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kShowCollections,
MsgID: 106,
Timestamp: 106,
SourceID: 106,
MsgID: 130,
Timestamp: 130,
SourceID: 130,
},
DbName: "testDb",
}
rsp, err := cli.ShowCollections(req)
assert.Nil(t, err)
assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_SUCCESS)
assert.Equal(t, rsp.CollectionNames[0], "testColl")
assert.Equal(t, len(rsp.CollectionNames), 1)
assert.ElementsMatch(t, rsp.CollectionNames, []string{"testColl", "testColl-again"})
assert.Equal(t, len(rsp.CollectionNames), 2)
})
t.Run("create partition", func(t *testing.T) {
req := &milvuspb.CreatePartitionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kCreatePartition,
MsgID: 107,
Timestamp: 107,
SourceID: 107,
MsgID: 140,
Timestamp: 140,
SourceID: 140,
},
DbName: "testDb",
CollectionName: "testColl",
@ -264,9 +317,9 @@ func TestGrpcService(t *testing.T) {
req := &milvuspb.HasPartitionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kHasPartition,
MsgID: 108,
Timestamp: 108,
SourceID: 108,
MsgID: 150,
Timestamp: 150,
SourceID: 150,
},
DbName: "testDb",
CollectionName: "testColl",
@ -284,9 +337,9 @@ func TestGrpcService(t *testing.T) {
req := &milvuspb.ShowPartitionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kShowPartitions,
MsgID: 110,
Timestamp: 110,
SourceID: 110,
MsgID: 160,
Timestamp: 160,
SourceID: 160,
},
DbName: "testDb",
CollectionName: "testColl",
@ -320,9 +373,9 @@ func TestGrpcService(t *testing.T) {
req := &milvuspb.ShowSegmentRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kShowSegment,
MsgID: 111,
Timestamp: 111,
SourceID: 111,
MsgID: 170,
Timestamp: 170,
SourceID: 170,
},
CollectionID: coll.ID,
PartitionID: partID,
@ -338,9 +391,9 @@ func TestGrpcService(t *testing.T) {
req := &milvuspb.CreateIndexRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kCreateIndex,
MsgID: 112,
Timestamp: 112,
SourceID: 112,
MsgID: 180,
Timestamp: 180,
SourceID: 180,
},
DbName: "",
CollectionName: "testColl",
@ -366,9 +419,9 @@ func TestGrpcService(t *testing.T) {
req := &milvuspb.DescribeSegmentRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kDescribeSegment,
MsgID: 113,
Timestamp: 113,
SourceID: 113,
MsgID: 190,
Timestamp: 190,
SourceID: 190,
},
CollectionID: coll.ID,
SegmentID: 1000,
@ -383,9 +436,9 @@ func TestGrpcService(t *testing.T) {
req := &milvuspb.DescribeIndexRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kDescribeIndex,
MsgID: 114,
Timestamp: 114,
SourceID: 114,
MsgID: 200,
Timestamp: 200,
SourceID: 200,
},
DbName: "",
CollectionName: "testColl",
@ -422,9 +475,9 @@ func TestGrpcService(t *testing.T) {
req := &milvuspb.DescribeIndexRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kDescribeIndex,
MsgID: 115,
Timestamp: 115,
SourceID: 115,
MsgID: 210,
Timestamp: 210,
SourceID: 210,
},
DbName: "",
CollectionName: "testColl",
@ -444,9 +497,9 @@ func TestGrpcService(t *testing.T) {
req := &milvuspb.DropPartitionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kDropPartition,
MsgID: 199,
Timestamp: 199,
SourceID: 199,
MsgID: 220,
Timestamp: 220,
SourceID: 220,
},
DbName: "testDb",
CollectionName: "testColl",
@ -467,9 +520,9 @@ func TestGrpcService(t *testing.T) {
req := &milvuspb.DropCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kDropCollection,
MsgID: 200,
Timestamp: 200,
SourceID: 200,
MsgID: 230,
Timestamp: 230,
SourceID: 230,
},
DbName: "testDb",
CollectionName: "testColl",
@ -481,13 +534,15 @@ func TestGrpcService(t *testing.T) {
assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_SUCCESS)
assert.Equal(t, dropCollectionArray[0].Base.MsgType, commonpb.MsgType_kDropCollection)
assert.Equal(t, dropCollectionArray[0].CollectionName, "testColl")
assert.Equal(t, len(collectionMetaCache), 1)
assert.Equal(t, collectionMetaCache[0], "testColl")
req = &milvuspb.DropCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kDropCollection,
MsgID: 200,
Timestamp: 200,
SourceID: 200,
MsgID: 231,
Timestamp: 231,
SourceID: 231,
},
DbName: "testDb",
CollectionName: "testColl",

View File

@ -148,6 +148,9 @@ type Core struct {
//TODO, call index builder's client to build index, return build id
BuildIndexReq func(binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair) (typeutil.UniqueID, error)
//TODO, proxy service interface, notify proxy service to drop collection
InvalidateCollectionMetaCache func(dbName string, collectionName string) error
// put create index task into this chan
indexTaskQueue chan *CreateIndexTask
@ -228,6 +231,9 @@ func (c *Core) checkInit() error {
if c.BuildIndexReq == nil {
return errors.Errorf("BuildIndexReq is nil")
}
if c.InvalidateCollectionMetaCache == nil {
return errors.Errorf("InvalidateCollectionMetaCache is nil")
}
if c.indexTaskQueue == nil {
return errors.Errorf("indexTaskQueue is nil")
}
@ -254,13 +260,15 @@ func (c *Core) startDdScheduler() {
task.Notify(err)
break
}
if ts <= c.lastDdTimeStamp {
if !task.IgnoreTimeStamp() && ts <= c.lastDdTimeStamp {
task.Notify(errors.Errorf("input timestamp = %d, last dd time stamp = %d", ts, c.lastDdTimeStamp))
break
}
err = task.Execute()
task.Notify(err)
c.lastDdTimeStamp = ts
if ts > c.lastDdTimeStamp {
c.lastDdTimeStamp = ts
}
}
}
}
@ -523,8 +531,14 @@ func (c *Core) setMsgStreams() error {
segInfoMsg, ok := segm.(*ms.SegmentInfoMsg)
if ok {
c.DataServiceSegmentChan <- segInfoMsg.Segment
} else {
flushMsg, ok := segm.(*ms.SegmentFlushCompletedMsg)
if ok {
c.DataNodeSegmentFlushCompletedChan <- flushMsg.SegmentFlushCompletedMsg.SegmentID
} else {
log.Printf("receiver unexpected msg from data service stream, value = %v", segm)
}
}
//TODO, if data node flush
}
}
}

View File

@ -14,6 +14,7 @@ import (
type reqTask interface {
Type() commonpb.MsgType
Ts() (typeutil.Timestamp, error)
IgnoreTimeStamp() bool
Execute() error
WaitToFinish() error
Notify(err error)
@ -48,10 +49,15 @@ type CreateCollectionReqTask struct {
func (t *CreateCollectionReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
func (t *CreateCollectionReqTask) Ts() (typeutil.Timestamp, error) {
return t.Req.Base.Timestamp, nil
}
func (t *CreateCollectionReqTask) IgnoreTimeStamp() bool {
return false
}
func (t *CreateCollectionReqTask) Execute() error {
var schema schemapb.CollectionSchema
err := proto.Unmarshal(t.Req.Schema, &schema)
@ -59,6 +65,10 @@ func (t *CreateCollectionReqTask) Execute() error {
return err
}
if t.Req.CollectionName != schema.Name {
return errors.Errorf("collection name = %s, schema.Name=%s", t.Req.CollectionName, schema.Name)
}
for idx, field := range schema.Fields {
field.FieldID = int64(idx + StartOfUserFieldID)
}
@ -141,11 +151,18 @@ func (t *DropCollectionReqTask) Ts() (typeutil.Timestamp, error) {
return t.Req.Base.Timestamp, nil
}
func (t *DropCollectionReqTask) IgnoreTimeStamp() bool {
return false
}
func (t *DropCollectionReqTask) Execute() error {
collMeta, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName)
if err != nil {
return err
}
if err = t.core.InvalidateCollectionMetaCache(t.Req.DbName, t.Req.CollectionName); err != nil {
return err
}
err = t.core.MetaTable.DeleteCollection(collMeta.ID)
if err != nil {
return err
@ -182,6 +199,10 @@ func (t *HasCollectionReqTask) Ts() (typeutil.Timestamp, error) {
return t.Req.Base.Timestamp, nil
}
func (t *HasCollectionReqTask) IgnoreTimeStamp() bool {
return true
}
func (t *HasCollectionReqTask) Execute() error {
_, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName)
if err == nil {
@ -206,6 +227,10 @@ func (t *DescribeCollectionReqTask) Ts() (typeutil.Timestamp, error) {
return t.Req.Base.Timestamp, nil
}
func (t *DescribeCollectionReqTask) IgnoreTimeStamp() bool {
return true
}
func (t *DescribeCollectionReqTask) Execute() error {
coll, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName)
if err != nil {
@ -237,6 +262,10 @@ func (t *ShowCollectionReqTask) Ts() (typeutil.Timestamp, error) {
return t.Req.Base.Timestamp, nil
}
func (t *ShowCollectionReqTask) IgnoreTimeStamp() bool {
return true
}
func (t *ShowCollectionReqTask) Execute() error {
coll, err := t.core.MetaTable.ListCollections()
if err != nil {
@ -259,6 +288,10 @@ func (t *CreatePartitionReqTask) Ts() (typeutil.Timestamp, error) {
return t.Req.Base.Timestamp, nil
}
func (t *CreatePartitionReqTask) IgnoreTimeStamp() bool {
return false
}
func (t *CreatePartitionReqTask) Execute() error {
collMeta, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName)
if err != nil {
@ -304,6 +337,10 @@ func (t *DropPartitionReqTask) Ts() (typeutil.Timestamp, error) {
return t.Req.Base.Timestamp, nil
}
func (t *DropPartitionReqTask) IgnoreTimeStamp() bool {
return false
}
func (t *DropPartitionReqTask) Execute() error {
coll, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName)
if err != nil {
@ -345,6 +382,10 @@ func (t *HasPartitionReqTask) Ts() (typeutil.Timestamp, error) {
return t.Req.Base.Timestamp, nil
}
func (t *HasPartitionReqTask) IgnoreTimeStamp() bool {
return true
}
func (t *HasPartitionReqTask) Execute() error {
coll, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName)
if err != nil {
@ -368,6 +409,10 @@ func (t *ShowPartitionReqTask) Ts() (typeutil.Timestamp, error) {
return t.Req.Base.Timestamp, nil
}
func (t *ShowPartitionReqTask) IgnoreTimeStamp() bool {
return true
}
func (t *ShowPartitionReqTask) Execute() error {
coll, err := t.core.MetaTable.GetCollectionByID(t.Req.CollectionID)
if err != nil {
@ -401,6 +446,10 @@ func (t *DescribeSegmentReqTask) Ts() (typeutil.Timestamp, error) {
return t.Req.Base.Timestamp, nil
}
func (t *DescribeSegmentReqTask) IgnoreTimeStamp() bool {
return true
}
func (t *DescribeSegmentReqTask) Execute() error {
coll, err := t.core.MetaTable.GetCollectionByID(t.Req.CollectionID)
if err != nil {
@ -448,6 +497,10 @@ func (t *ShowSegmentReqTask) Ts() (typeutil.Timestamp, error) {
return t.Req.Base.Timestamp, nil
}
func (t *ShowSegmentReqTask) IgnoreTimeStamp() bool {
return true
}
func (t *ShowSegmentReqTask) Execute() error {
coll, err := t.core.MetaTable.GetCollectionByID(t.Req.CollectionID)
if err != nil {
@ -476,6 +529,10 @@ func (t *CreateIndexReqTask) Ts() (typeutil.Timestamp, error) {
return t.Req.Base.Timestamp, nil
}
func (t *CreateIndexReqTask) IgnoreTimeStamp() bool {
return false
}
func (t *CreateIndexReqTask) Execute() error {
segIDs, field, err := t.core.MetaTable.GetNotIndexedSegments(t.Req.CollectionName, t.Req.FieldName, t.Req.ExtraParams)
if err != nil {
@ -511,6 +568,10 @@ func (t *DescribeIndexReqTask) Ts() (typeutil.Timestamp, error) {
return t.Req.Base.Timestamp, nil
}
func (t *DescribeIndexReqTask) IgnoreTimeStamp() bool {
return true
}
func (t *DescribeIndexReqTask) Execute() error {
idx, err := t.core.MetaTable.GetIndexByName(t.Req.CollectionName, t.Req.FieldName, t.Req.IndexName)
if err != nil {

View File

@ -690,3 +690,41 @@ func (sim *SegmentInfoMsg) Unmarshal(input []byte) (TsMsg, error) {
SegmentMsg: segMsg,
}, nil
}
/////////////////////////////////////////SegmentFlushCompletedMsg//////////////////////////////////////////
type SegmentFlushCompletedMsg struct {
BaseMsg
datapb.SegmentFlushCompletedMsg
}
func (sfm *SegmentFlushCompletedMsg) Type() MsgType {
return sfm.Base.MsgType
}
func (sfm *SegmentFlushCompletedMsg) GetMsgContext() context.Context {
return sfm.MsgCtx
}
func (sfm *SegmentFlushCompletedMsg) SetMsgContext(ctx context.Context) {
sfm.MsgCtx = ctx
}
func (sfm *SegmentFlushCompletedMsg) Marshal(input TsMsg) ([]byte, error) {
sfmsg := input.(*SegmentFlushCompletedMsg)
mb, err := proto.Marshal(&sfmsg.SegmentFlushCompletedMsg)
if err != nil {
return nil, err
}
return mb, nil
}
func (sfm *SegmentFlushCompletedMsg) Unmarshal(input []byte) (TsMsg, error) {
sfmsg := datapb.SegmentFlushCompletedMsg{}
err := proto.Unmarshal(input, &sfmsg)
if err != nil {
return nil, err
}
return &SegmentFlushCompletedMsg{
SegmentFlushCompletedMsg: sfmsg,
}, nil
}