diff --git a/internal/distributed/rootcoord/service_test.go b/internal/distributed/rootcoord/service_test.go index 7a27dd2fac..b16feb9ef1 100644 --- a/internal/distributed/rootcoord/service_test.go +++ b/internal/distributed/rootcoord/service_test.go @@ -313,6 +313,29 @@ func TestGrpcService(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode) }) + t.Run("update channel timetick", func(t *testing.T) { + req := &internalpb.ChannelTimeTickMsg{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_TimeTick, + }, + } + status, err := svr.UpdateChannelTimeTick(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode) + }) + + t.Run("release DQL msg stream", func(t *testing.T) { + req := &proxypb.ReleaseDQLMessageStreamRequest{} + assert.Panics(t, func() { svr.ReleaseDQLMessageStream(ctx, req) }) + }) + + t.Run("get metrics", func(t *testing.T) { + req := &milvuspb.GetMetricsRequest{} + rsp, err := svr.GetMetrics(ctx, req) + assert.Nil(t, err) + assert.NotEqual(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode) + }) + t.Run("create collection", func(t *testing.T) { schema := schemapb.CollectionSchema{ Name: collName, diff --git a/internal/rootcoord/param_table.go b/internal/rootcoord/param_table.go index 0b5468758c..bd7a621f85 100644 --- a/internal/rootcoord/param_table.go +++ b/internal/rootcoord/param_table.go @@ -29,17 +29,14 @@ type ParamTable struct { Address string Port int - PulsarAddress string - RocksmqPath string - RocksmqRetentionSizeInMinutes int64 - RocksmqRetentionSizeInMB int64 - EtcdEndpoints []string - MetaRootPath string - KvRootPath string - MsgChannelSubName string - TimeTickChannel string - StatisticsChannel string - DmlChannelName string + PulsarAddress string + EtcdEndpoints []string + MetaRootPath string + KvRootPath string + MsgChannelSubName string + TimeTickChannel string + StatisticsChannel string + DmlChannelName string DmlChannelNum int64 MaxPartitionNum int64 @@ -65,7 +62,6 @@ func (p *ParamTable) Init() { } p.initPulsarAddress() - p.initRocksmqPath() p.initEtcdEndpoints() p.initMetaRootPath() p.initKvRootPath() @@ -97,18 +93,6 @@ func (p *ParamTable) initPulsarAddress() { p.PulsarAddress = addr } -func (p *ParamTable) initRocksmqPath() { - path, err := p.Load("_RocksmqPath") - if err != nil { - panic(err) - } - p.RocksmqPath = path -} - -func (p *ParamTable) initRocksmqRetentionTimeInMinutes() { - p.RocksmqRetentionSizeInMinutes = p.ParseInt64("rootcoord.RocksmqRetentionSizeInMinutes") -} - func (p *ParamTable) initEtcdEndpoints() { endpoints, err := p.Load("_EtcdEndpoints") if err != nil { diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index c5e1f2def4..2df5cccc07 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -585,16 +585,13 @@ func (c *Core) SetDataCoord(ctx context.Context, s types.DataCoord) error { c.CallGetBinlogFilePathsService = func(ctx context.Context, segID typeutil.UniqueID, fieldID typeutil.UniqueID) (retFiles []string, retErr error) { defer func() { if err := recover(); err != nil { - retFiles = nil retErr = fmt.Errorf("get bin log file paths panic, msg = %v", err) } }() <-initCh //wait connect to data coord ts, err := c.TSOAllocator(1) if err != nil { - retFiles = nil - retErr = err - return + return nil, err } binlog, err := s.GetInsertBinlogPaths(ctx, &datapb.GetInsertBinlogPathsRequest{ Base: &commonpb.MsgBase{ @@ -606,41 +603,29 @@ func (c *Core) SetDataCoord(ctx context.Context, s types.DataCoord) error { SegmentID: segID, }) if err != nil { - retFiles = nil - retErr = err - return + return nil, err } if binlog.Status.ErrorCode != commonpb.ErrorCode_Success { - retFiles = nil - retErr = fmt.Errorf("GetInsertBinlogPaths from data service failed, error = %s", binlog.Status.Reason) - return + return nil, fmt.Errorf("GetInsertBinlogPaths from data service failed, error = %s", binlog.Status.Reason) } for i := range binlog.FieldIDs { if binlog.FieldIDs[i] == fieldID { - retFiles = binlog.Paths[i].Values - retErr = nil - return + return binlog.Paths[i].Values, nil } } - retFiles = nil - retErr = fmt.Errorf("binlog file not exist, segment id = %d, field id = %d", segID, fieldID) - return + return nil, fmt.Errorf("binlog file not exist, segment id = %d, field id = %d", segID, fieldID) } c.CallGetNumRowsService = func(ctx context.Context, segID typeutil.UniqueID, isFromFlushedChan bool) (retRows int64, retErr error) { defer func() { if err := recover(); err != nil { - retRows = 0 retErr = fmt.Errorf("get num rows panic, msg = %v", err) - return } }() <-initCh ts, err := c.TSOAllocator(1) if err != nil { - retRows = 0 - retErr = err - return + return retRows, err } segInfo, err := s.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{ Base: &commonpb.MsgBase{ @@ -652,36 +637,26 @@ func (c *Core) SetDataCoord(ctx context.Context, s types.DataCoord) error { SegmentIDs: []typeutil.UniqueID{segID}, }) if err != nil { - retRows = 0 - retErr = err - return + return retRows, err } if segInfo.Status.ErrorCode != commonpb.ErrorCode_Success { - return 0, fmt.Errorf("GetSegmentInfo from data service failed, error = %s", segInfo.Status.Reason) + return retRows, fmt.Errorf("GetSegmentInfo from data service failed, error = %s", segInfo.Status.Reason) } if len(segInfo.Infos) != 1 { log.Debug("get segment info empty") - retRows = 0 - retErr = nil - return + return retRows, nil } if !isFromFlushedChan && segInfo.Infos[0].State != commonpb.SegmentState_Flushed { log.Debug("segment id not flushed", zap.Int64("segment id", segID)) - retRows = 0 - retErr = nil - return + return retRows, nil } - retRows = segInfo.Infos[0].NumOfRows - retErr = nil - return + return segInfo.Infos[0].NumOfRows, nil } c.CallGetFlushedSegmentsService = func(ctx context.Context, collID, partID typeutil.UniqueID) (retSegIDs []typeutil.UniqueID, retErr error) { defer func() { if err := recover(); err != nil { - retSegIDs = []typeutil.UniqueID{} retErr = fmt.Errorf("get flushed segments from data coord panic, msg = %v", err) - return } }() <-initCh @@ -697,18 +672,12 @@ func (c *Core) SetDataCoord(ctx context.Context, s types.DataCoord) error { } rsp, err := s.GetFlushedSegments(ctx, req) if err != nil { - retSegIDs = []typeutil.UniqueID{} - retErr = err - return + return retSegIDs, err } if rsp.Status.ErrorCode != commonpb.ErrorCode_Success { - retSegIDs = []typeutil.UniqueID{} - retErr = fmt.Errorf("get flushed segments from data coord failed, reason = %s", rsp.Status.Reason) - return + return retSegIDs, fmt.Errorf("get flushed segments from data coord failed, reason = %s", rsp.Status.Reason) } - retSegIDs = rsp.Segments - retErr = nil - return + return rsp.Segments, nil } return nil @@ -732,9 +701,7 @@ func (c *Core) SetIndexCoord(s types.IndexCoord) error { c.CallBuildIndexService = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (retID typeutil.UniqueID, retErr error) { defer func() { if err := recover(); err != nil { - retID = 0 retErr = fmt.Errorf("build index panic, msg = %v", err) - return } }() <-initCh @@ -746,25 +713,18 @@ func (c *Core) SetIndexCoord(s types.IndexCoord) error { IndexName: idxInfo.IndexName, }) if err != nil { - retID = 0 - retErr = err - return + return retID, err } if rsp.Status.ErrorCode != commonpb.ErrorCode_Success { - retID = 0 - retErr = fmt.Errorf("BuildIndex from index service failed, error = %s", rsp.Status.Reason) - return + return retID, fmt.Errorf("BuildIndex from index service failed, error = %s", rsp.Status.Reason) } - retID = rsp.IndexBuildID - retErr = nil - return + return rsp.IndexBuildID, nil } c.CallDropIndexService = func(ctx context.Context, indexID typeutil.UniqueID) (retErr error) { defer func() { if err := recover(); err != nil { retErr = fmt.Errorf("drop index from index service panic, msg = %v", err) - return } }() <-initCh @@ -772,15 +732,12 @@ func (c *Core) SetIndexCoord(s types.IndexCoord) error { IndexID: indexID, }) if err != nil { - retErr = err - return + return err } if rsp.ErrorCode != commonpb.ErrorCode_Success { - retErr = fmt.Errorf(rsp.Reason) - return + return fmt.Errorf(rsp.Reason) } - retErr = nil - return + return nil } return nil @@ -804,7 +761,6 @@ func (c *Core) SetQueryCoord(s types.QueryCoord) error { defer func() { if err := recover(); err != nil { retErr = fmt.Errorf("release collection from query service panic, msg = %v", err) - return } }() <-initCh @@ -820,15 +776,12 @@ func (c *Core) SetQueryCoord(s types.QueryCoord) error { } rsp, err := s.ReleaseCollection(ctx, req) if err != nil { - retErr = err - return + return err } if rsp.ErrorCode != commonpb.ErrorCode_Success { - retErr = fmt.Errorf("ReleaseCollection from query service failed, error = %s", rsp.Reason) - return + return fmt.Errorf("ReleaseCollection from query service failed, error = %s", rsp.Reason) } - retErr = nil - return + return nil } c.CallReleasePartitionService = func(ctx context.Context, ts typeutil.Timestamp, dbID, collectionID typeutil.UniqueID, partitionIDs []typeutil.UniqueID) (retErr error) { defer func() { @@ -850,15 +803,12 @@ func (c *Core) SetQueryCoord(s types.QueryCoord) error { } rsp, err := s.ReleasePartitions(ctx, req) if err != nil { - retErr = err - return + return err } if rsp.ErrorCode != commonpb.ErrorCode_Success { - retErr = fmt.Errorf("ReleasePartitions from query service failed, error = %s", rsp.Reason) - return + return fmt.Errorf("ReleasePartitions from query service failed, error = %s", rsp.Reason) } - retErr = nil - return + return nil } return nil } @@ -1012,11 +962,13 @@ func (c *Core) Init() error { return initError } -func (c *Core) reSendDdMsg(ctx context.Context) error { - flag, err := c.MetaTable.client.Load(DDMsgSendPrefix, 0) - if err != nil || flag == "true" { - log.Debug("No un-successful DdMsg") - return nil +func (c *Core) reSendDdMsg(ctx context.Context, force bool) error { + if !force { + flag, err := c.MetaTable.client.Load(DDMsgSendPrefix, 0) + if err != nil || flag == "true" { + log.Debug("No un-successful DdMsg") + return nil + } } ddOpStr, err := c.MetaTable.client.Load(DDOperationPrefix, 0) @@ -1029,6 +981,10 @@ func (c *Core) reSendDdMsg(ctx context.Context) error { return err } + var invalidateCache bool + var ts typeutil.Timestamp + var dbName, collName string + switch ddOp.Type { case CreateCollectionDDType: var ddReq = internalpb.CreateCollectionRequest{} @@ -1042,11 +998,14 @@ func (c *Core) reSendDdMsg(ctx context.Context) error { if err = c.SendDdCreateCollectionReq(ctx, &ddReq, collInfo.PhysicalChannelNames); err != nil { return err } + invalidateCache = false case DropCollectionDDType: var ddReq = internalpb.DropCollectionRequest{} if err = proto.UnmarshalText(ddOp.Body, &ddReq); err != nil { return err } + ts = ddReq.Base.Timestamp + dbName, collName = ddReq.DbName, ddReq.CollectionName collInfo, err := c.MetaTable.GetCollectionByName(ddReq.CollectionName, 0) if err != nil { return err @@ -1054,66 +1013,59 @@ func (c *Core) reSendDdMsg(ctx context.Context) error { if err = c.SendDdDropCollectionReq(ctx, &ddReq, collInfo.PhysicalChannelNames); err != nil { return err } - req := proxypb.InvalidateCollMetaCacheRequest{ - Base: &commonpb.MsgBase{ - MsgType: 0, //TODO, msg type - MsgID: 0, //TODO, msg id - Timestamp: ddReq.Base.Timestamp, - SourceID: c.session.ServerID, - }, - DbName: ddReq.DbName, - CollectionName: ddReq.CollectionName, - } - c.proxyClientManager.InvalidateCollectionMetaCache(c.ctx, &req) - + invalidateCache = true case CreatePartitionDDType: var ddReq = internalpb.CreatePartitionRequest{} if err = proto.UnmarshalText(ddOp.Body, &ddReq); err != nil { return err } + ts = ddReq.Base.Timestamp + dbName, collName = ddReq.DbName, ddReq.CollectionName collInfo, err := c.MetaTable.GetCollectionByName(ddReq.CollectionName, 0) if err != nil { return err } + if _, err = c.MetaTable.GetPartitionByName(collInfo.ID, ddReq.PartitionName, 0); err == nil { + return fmt.Errorf("partition %s already created", ddReq.PartitionName) + } if err = c.SendDdCreatePartitionReq(ctx, &ddReq, collInfo.PhysicalChannelNames); err != nil { return err } - req := proxypb.InvalidateCollMetaCacheRequest{ - Base: &commonpb.MsgBase{ - MsgType: 0, //TODO, msg type - MsgID: 0, //TODO, msg id - Timestamp: ddReq.Base.Timestamp, - SourceID: c.session.ServerID, - }, - DbName: ddReq.DbName, - CollectionName: ddReq.CollectionName, - } - c.proxyClientManager.InvalidateCollectionMetaCache(c.ctx, &req) + invalidateCache = true case DropPartitionDDType: var ddReq = internalpb.DropPartitionRequest{} if err = proto.UnmarshalText(ddOp.Body, &ddReq); err != nil { return err } + ts = ddReq.Base.Timestamp + dbName, collName = ddReq.DbName, ddReq.CollectionName collInfo, err := c.MetaTable.GetCollectionByName(ddReq.CollectionName, 0) if err != nil { return err } + if _, err = c.MetaTable.GetPartitionByName(collInfo.ID, ddReq.PartitionName, 0); err != nil { + return err + } if err = c.SendDdDropPartitionReq(ctx, &ddReq, collInfo.PhysicalChannelNames); err != nil { return err } + invalidateCache = true + default: + return fmt.Errorf("Invalid DdOperation %s", ddOp.Type) + } + + if invalidateCache { req := proxypb.InvalidateCollMetaCacheRequest{ Base: &commonpb.MsgBase{ MsgType: 0, //TODO, msg type MsgID: 0, //TODO, msg id - Timestamp: ddReq.Base.Timestamp, + Timestamp: ts, SourceID: c.session.ServerID, }, - DbName: ddReq.DbName, - CollectionName: ddReq.CollectionName, + DbName: dbName, + CollectionName: collName, } c.proxyClientManager.InvalidateCollectionMetaCache(c.ctx, &req) - default: - return fmt.Errorf("Invalid DdOperation %s", ddOp.Type) } // Update DDOperation in etcd @@ -1134,7 +1086,7 @@ func (c *Core) Start() error { log.Debug("RootCoord Start WatchProxy failed", zap.Error(err)) return } - if err := c.reSendDdMsg(c.ctx); err != nil { + if err := c.reSendDdMsg(c.ctx, false); err != nil { log.Debug("RootCoord Start reSendDdMsg failed", zap.Error(err)) return } diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index 0616c3f464..ecf35a9529 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -68,6 +68,13 @@ func (p *proxyMock) GetCollArray() []string { return ret } +func (p *proxyMock) ReleaseDQLMessageStream(ctx context.Context, request *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error) { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, nil +} + type dataMock struct { types.DataCoord randVal int @@ -272,6 +279,7 @@ func TestRootCoord(t *testing.T) { Params.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.MetaRootPath) Params.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.KvRootPath) Params.MsgChannelSubName = fmt.Sprintf("subname-%d", randVal) + Params.DmlChannelName = fmt.Sprintf("dml-%d", randVal) err = core.Register() assert.Nil(t, err) @@ -418,7 +426,6 @@ func TestRootCoord(t *testing.T) { dmlStream.AsConsumer([]string{pChan[0]}, Params.MsgChannelSubName) dmlStream.Start() - // get CreateCollectionMsg // get CreateCollectionMsg msgs := getNotTtMsg(ctx, 1, dmlStream.Chan()) assert.Equal(t, 1, len(msgs)) @@ -502,6 +509,9 @@ func TestRootCoord(t *testing.T) { status, err = core.CreateCollection(ctx, req) assert.Nil(t, err) assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode) + + err = core.reSendDdMsg(core.ctx, true) + assert.Nil(t, err) }) t.Run("has collection", func(t *testing.T) { @@ -641,6 +651,9 @@ func TestRootCoord(t *testing.T) { assert.Nil(t, err) assert.Equal(t, collMeta.ID, ddReq.CollectionID) assert.Equal(t, collMeta.PartitionIDs[1], ddReq.PartitionID) + + err = core.reSendDdMsg(core.ctx, true) + assert.NotNil(t, err) }) t.Run("has partition", func(t *testing.T) { @@ -973,6 +986,25 @@ func TestRootCoord(t *testing.T) { assert.Nil(t, err) assert.Equal(t, collMeta.ID, ddReq.CollectionID) assert.Equal(t, dropPartID, ddReq.PartitionID) + + err = core.reSendDdMsg(core.ctx, true) + assert.NotNil(t, err) + }) + + t.Run("remove DQL msgstream", func(t *testing.T) { + collMeta, err := core.MetaTable.GetCollectionByName(collName, 0) + assert.Nil(t, err) + + req := &proxypb.ReleaseDQLMessageStreamRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_RemoveQueryChannels, + SourceID: core.session.ServerID, + }, + CollectionID: collMeta.ID, + } + status, err := core.ReleaseDQLMessageStream(core.ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode) }) t.Run("drop collection", func(t *testing.T) { @@ -1043,6 +1075,9 @@ func TestRootCoord(t *testing.T) { err = proto.UnmarshalText(ddOp.Body, &ddReq) assert.Nil(t, err) assert.Equal(t, collMeta.ID, ddReq.CollectionID) + + err = core.reSendDdMsg(core.ctx, true) + assert.NotNil(t, err) }) t.Run("context_cancel", func(t *testing.T) { @@ -1417,7 +1452,6 @@ func TestRootCoord(t *testing.T) { p1 := sessionutil.Session{ ServerID: 100, } - p2 := sessionutil.Session{ ServerID: 101, } @@ -1428,9 +1462,11 @@ func TestRootCoord(t *testing.T) { s2, err := json.Marshal(&p2) assert.Nil(t, err) - _, err = core.etcdCli.Put(ctx2, path.Join(sessKey, typeutil.ProxyRole)+"-1", string(s1)) + proxy1 := path.Join(sessKey, typeutil.ProxyRole) + "-1" + proxy2 := path.Join(sessKey, typeutil.ProxyRole) + "-2" + _, err = core.etcdCli.Put(ctx2, proxy1, string(s1)) assert.Nil(t, err) - _, err = core.etcdCli.Put(ctx2, path.Join(sessKey, typeutil.ProxyRole)+"-2", string(s2)) + _, err = core.etcdCli.Put(ctx2, proxy2, string(s2)) assert.Nil(t, err) time.Sleep(100 * time.Millisecond) @@ -1481,6 +1517,11 @@ func TestRootCoord(t *testing.T) { // add 3 proxy channels assert.Equal(t, 3, core.chanTimeTick.GetChanNum()-numChan) + + _, err = core.etcdCli.Delete(ctx2, proxy1) + assert.Nil(t, err) + _, err = core.etcdCli.Delete(ctx2, proxy2) + assert.Nil(t, err) }) t.Run("get metrics", func(t *testing.T) {