mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Fix a crash bug for backup/restore (#24529)
Signed-off-by: yhmo <yihua.mo@zilliz.com>
This commit is contained in:
parent
0d0e97ab7f
commit
5358da2a10
@ -619,6 +619,19 @@ func (p *BinlogAdapter) getShardingListByPrimaryVarchar(primaryKeys []string,
|
||||
return shardList, nil
|
||||
}
|
||||
|
||||
// Sometimes the fieldID doesn't exist in the memoryData in the following case:
|
||||
// Use an old backup tool(v0.2.2) to backup a collection of milvus v2.2.9, use a new backup tool to restore the collection
|
||||
func (p *BinlogAdapter) verifyField(fieldID storage.FieldID, memoryData []map[storage.FieldID]storage.FieldData) error {
|
||||
for _, fields := range memoryData {
|
||||
_, ok := fields[fieldID]
|
||||
if !ok {
|
||||
log.Error("Binlog adapter: the field ID doesn't exist in collection schema", zap.Int64("fieldID", fieldID))
|
||||
return fmt.Errorf("the field ID %d doesn't exist in collection schema", fieldID)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// readInsertlog method reads an insert log, and split the data into different shards according to a shard list
|
||||
// The shardList is a list to tell which row belong to which shard, returned by getShardingListByPrimaryXXX()
|
||||
// For deleted rows, we say its shard id is -1.
|
||||
@ -630,6 +643,12 @@ func (p *BinlogAdapter) getShardingListByPrimaryVarchar(primaryKeys []string,
|
||||
// Note: the row count of insert log need to be equal to length of shardList
|
||||
func (p *BinlogAdapter) readInsertlog(fieldID storage.FieldID, logPath string,
|
||||
memoryData []map[storage.FieldID]storage.FieldData, shardList []int32) error {
|
||||
err := p.verifyField(fieldID, memoryData)
|
||||
if err != nil {
|
||||
log.Error("Binlog adapter: could not read binlog file", zap.String("logPath", logPath), zap.Error(err))
|
||||
return fmt.Errorf("could not read binlog file %s, error: %w", logPath, err)
|
||||
}
|
||||
|
||||
// open the insert log file
|
||||
binlogFile, err := NewBinlogFile(p.chunkManager)
|
||||
if err != nil {
|
||||
|
||||
@ -1033,6 +1033,29 @@ func Test_BinlogAdapterDispatch(t *testing.T) {
|
||||
assert.Equal(t, 0, segmentsData[2][fieldID].RowNum())
|
||||
}
|
||||
|
||||
func Test_BinlogAdapterVerifyField(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
segmentsData := make([]map[storage.FieldID]storage.FieldData, 0, 1)
|
||||
segmentData := initSegmentData(sampleSchema())
|
||||
segmentsData = append(segmentsData, segmentData)
|
||||
|
||||
flushFunc := func(fields map[storage.FieldID]storage.FieldData, shardID int) error {
|
||||
return nil
|
||||
}
|
||||
adapter, err := NewBinlogAdapter(ctx, sampleSchema(), 2, 1024, 2048, &MockChunkManager{}, flushFunc, 0, math.MaxUint64)
|
||||
assert.NotNil(t, adapter)
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = adapter.verifyField(103, segmentsData)
|
||||
assert.NoError(t, err)
|
||||
err = adapter.verifyField(999999, segmentsData)
|
||||
assert.Error(t, err)
|
||||
|
||||
err = adapter.readInsertlog(999999, "dummy", segmentsData, []int32{1})
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
func Test_BinlogAdapterReadInsertlog(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user