mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-08 01:58:34 +08:00
Avoid Sync when the segment open (#21397)
Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
This commit is contained in:
parent
0bcedbd2bf
commit
3667bb09b8
@ -30,6 +30,7 @@ import (
|
|||||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||||
"github.com/milvus-io/milvus/internal/storage"
|
"github.com/milvus-io/milvus/internal/storage"
|
||||||
"github.com/milvus-io/milvus/internal/types"
|
"github.com/milvus-io/milvus/internal/types"
|
||||||
|
"github.com/milvus-io/milvus/internal/util/tsoutil"
|
||||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||||
"github.com/samber/lo"
|
"github.com/samber/lo"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
@ -214,7 +215,7 @@ func (c *ChannelMeta) addSegment(req addSegmentReq) error {
|
|||||||
historyInsertBuf: make([]*BufferData, 0),
|
historyInsertBuf: make([]*BufferData, 0),
|
||||||
historyDeleteBuf: make([]*DelDataBuf, 0),
|
historyDeleteBuf: make([]*DelDataBuf, 0),
|
||||||
startPos: req.startPos,
|
startPos: req.startPos,
|
||||||
lastSyncTs: req.recoverTs,
|
lastSyncTs: tsoutil.GetCurrentTime(),
|
||||||
}
|
}
|
||||||
seg.setType(req.segType)
|
seg.setType(req.segType)
|
||||||
// Set up pk stats
|
// Set up pk stats
|
||||||
|
|||||||
@ -84,7 +84,7 @@ func (p *proxyManager) WatchProxy() error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
log.Debug("succeed to init sessions on etcd", zap.Any("sessions", sessions), zap.Int64("revision", rev))
|
log.Info("succeed to init sessions on etcd", zap.Any("sessions", sessions), zap.Int64("revision", rev))
|
||||||
// all init function should be clear meta firstly.
|
// all init function should be clear meta firstly.
|
||||||
for _, f := range p.initSessionsFunc {
|
for _, f := range p.initSessionsFunc {
|
||||||
f(sessions)
|
f(sessions)
|
||||||
@ -103,7 +103,7 @@ func (p *proxyManager) WatchProxy() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *proxyManager) startWatchEtcd(ctx context.Context, eventCh clientv3.WatchChan) {
|
func (p *proxyManager) startWatchEtcd(ctx context.Context, eventCh clientv3.WatchChan) {
|
||||||
log.Debug("start to watch etcd")
|
log.Info("start to watch etcd")
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
@ -194,7 +194,7 @@ func (p *proxyManager) getSessionsOnEtcd(ctx context.Context) ([]*sessionutil.Se
|
|||||||
for _, v := range resp.Kvs {
|
for _, v := range resp.Kvs {
|
||||||
session, err := p.parseSession(v.Value)
|
session, err := p.parseSession(v.Value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Debug("failed to unmarshal session", zap.Error(err))
|
log.Warn("failed to unmarshal session", zap.Error(err))
|
||||||
return nil, 0, err
|
return nil, 0, err
|
||||||
}
|
}
|
||||||
sessions = append(sessions, session)
|
sessions = append(sessions, session)
|
||||||
|
|||||||
@ -120,7 +120,7 @@ func TestProxyManager_ErrCompacted(t *testing.T) {
|
|||||||
Params.EtcdCfg.EtcdTLSMinVersion.GetValue())
|
Params.EtcdCfg.EtcdTLSMinVersion.GetValue())
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
defer etcdCli.Close()
|
defer etcdCli.Close()
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
sessKey := path.Join(Params.EtcdCfg.MetaRootPath.GetValue(), sessionutil.DefaultServiceRoot)
|
sessKey := path.Join(Params.EtcdCfg.MetaRootPath.GetValue(), sessionutil.DefaultServiceRoot)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user