mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 01:28:27 +08:00
fix: Set current partition stats version to 0 by default when not present (#37299)
issue: #37156 1. Still need to record the current stats version. 2. Set it to 0 when the current stats version is not found. Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
This commit is contained in:
parent
b8492498ac
commit
4d98833bc3
@ -485,6 +485,12 @@ func (t *clusteringCompactionTask) completeTask() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = t.meta.GetPartitionStatsMeta().SaveCurrentPartitionStatsVersion(t.GetTaskProto().GetCollectionID(),
|
||||||
|
t.GetTaskProto().GetPartitionID(), t.GetTaskProto().GetChannel(), t.GetTaskProto().GetPlanID())
|
||||||
|
if err != nil {
|
||||||
|
return merr.WrapErrClusteringCompactionMetaError("SaveCurrentPartitionStatsVersion", err)
|
||||||
|
}
|
||||||
|
|
||||||
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed))
|
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -121,8 +121,6 @@ func (psm *partitionStatsMeta) SavePartitionStatsInfo(info *datapb.PartitionStat
|
|||||||
}
|
}
|
||||||
|
|
||||||
psm.partitionStatsInfos[info.GetVChannel()][info.GetPartitionID()].infos[info.GetVersion()] = info
|
psm.partitionStatsInfos[info.GetVChannel()][info.GetPartitionID()].infos[info.GetVersion()] = info
|
||||||
// after v2.5.0, the current version will be updated when updating the partition stats info, so there’s no need to split it into two steps
|
|
||||||
psm.partitionStatsInfos[info.GetVChannel()][info.GetPartitionID()].currentVersion = info.Version
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -42,6 +42,7 @@ func (s *PartitionStatsMetaSuite) SetupTest() {
|
|||||||
catalog := mocks.NewDataCoordCatalog(s.T())
|
catalog := mocks.NewDataCoordCatalog(s.T())
|
||||||
catalog.EXPECT().SavePartitionStatsInfo(mock.Anything, mock.Anything).Return(nil).Maybe()
|
catalog.EXPECT().SavePartitionStatsInfo(mock.Anything, mock.Anything).Return(nil).Maybe()
|
||||||
catalog.EXPECT().ListPartitionStatsInfos(mock.Anything).Return(nil, nil).Maybe()
|
catalog.EXPECT().ListPartitionStatsInfos(mock.Anything).Return(nil, nil).Maybe()
|
||||||
|
catalog.EXPECT().SaveCurrentPartitionStatsVersion(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
|
||||||
s.catalog = catalog
|
s.catalog = catalog
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -74,6 +75,9 @@ func (s *PartitionStatsMetaSuite) TestGetPartitionStats() {
|
|||||||
ps := partitionStatsMeta.GetPartitionStats(1, 2, "ch-1", 100)
|
ps := partitionStatsMeta.GetPartitionStats(1, 2, "ch-1", 100)
|
||||||
s.NotNil(ps)
|
s.NotNil(ps)
|
||||||
|
|
||||||
|
err = partitionStatsMeta.SaveCurrentPartitionStatsVersion(1, 2, "ch-1", 100)
|
||||||
|
s.NoError(err)
|
||||||
|
|
||||||
currentVersion := partitionStatsMeta.GetCurrentPartitionStatsVersion(1, 2, "ch-1")
|
currentVersion := partitionStatsMeta.GetCurrentPartitionStatsVersion(1, 2, "ch-1")
|
||||||
s.Equal(int64(100), currentVersion)
|
s.Equal(int64(100), currentVersion)
|
||||||
|
|
||||||
|
|||||||
@ -23,6 +23,7 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/cockroachdb/errors"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"golang.org/x/exp/maps"
|
"golang.org/x/exp/maps"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
@ -43,6 +44,7 @@ import (
|
|||||||
"github.com/milvus-io/milvus/pkg/metrics"
|
"github.com/milvus-io/milvus/pkg/metrics"
|
||||||
"github.com/milvus-io/milvus/pkg/util"
|
"github.com/milvus-io/milvus/pkg/util"
|
||||||
"github.com/milvus-io/milvus/pkg/util/etcd"
|
"github.com/milvus-io/milvus/pkg/util/etcd"
|
||||||
|
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -930,6 +932,9 @@ func (kc *Catalog) GetCurrentPartitionStatsVersion(ctx context.Context, collID,
|
|||||||
key := buildCurrentPartitionStatsVersionPath(collID, partID, vChannel)
|
key := buildCurrentPartitionStatsVersionPath(collID, partID, vChannel)
|
||||||
valueStr, err := kc.MetaKv.Load(key)
|
valueStr, err := kc.MetaKv.Load(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if errors.Is(err, merr.ErrIoKeyNotFound) {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user