diff --git a/internal/util/importutil/binlog_adapter.go b/internal/util/importutil/binlog_adapter.go index b4661626c0..286e6fb4c5 100644 --- a/internal/util/importutil/binlog_adapter.go +++ b/internal/util/importutil/binlog_adapter.go @@ -619,6 +619,19 @@ func (p *BinlogAdapter) getShardingListByPrimaryVarchar(primaryKeys []string, return shardList, nil } +// Sometimes the fieldID doesn't exist in the memoryData in the following case: +// Use an old backup tool(v0.2.2) to backup a collection of milvus v2.2.9, use a new backup tool to restore the collection +func (p *BinlogAdapter) verifyField(fieldID storage.FieldID, memoryData []map[storage.FieldID]storage.FieldData) error { + for _, fields := range memoryData { + _, ok := fields[fieldID] + if !ok { + log.Error("Binlog adapter: the field ID doesn't exist in collection schema", zap.Int64("fieldID", fieldID)) + return fmt.Errorf("the field ID %d doesn't exist in collection schema", fieldID) + } + } + return nil +} + // readInsertlog method reads an insert log, and split the data into different shards according to a shard list // The shardList is a list to tell which row belong to which shard, returned by getShardingListByPrimaryXXX() // For deleted rows, we say its shard id is -1. @@ -630,6 +643,12 @@ func (p *BinlogAdapter) getShardingListByPrimaryVarchar(primaryKeys []string, // Note: the row count of insert log need to be equal to length of shardList func (p *BinlogAdapter) readInsertlog(fieldID storage.FieldID, logPath string, memoryData []map[storage.FieldID]storage.FieldData, shardList []int32) error { + err := p.verifyField(fieldID, memoryData) + if err != nil { + log.Error("Binlog adapter: could not read binlog file", zap.String("logPath", logPath), zap.Error(err)) + return fmt.Errorf("could not read binlog file %s, error: %w", logPath, err) + } + // open the insert log file binlogFile, err := NewBinlogFile(p.chunkManager) if err != nil { diff --git a/internal/util/importutil/binlog_adapter_test.go b/internal/util/importutil/binlog_adapter_test.go index a4486b85e4..abb481a6e3 100644 --- a/internal/util/importutil/binlog_adapter_test.go +++ b/internal/util/importutil/binlog_adapter_test.go @@ -1033,6 +1033,29 @@ func Test_BinlogAdapterDispatch(t *testing.T) { assert.Equal(t, 0, segmentsData[2][fieldID].RowNum()) } +func Test_BinlogAdapterVerifyField(t *testing.T) { + ctx := context.Background() + + segmentsData := make([]map[storage.FieldID]storage.FieldData, 0, 1) + segmentData := initSegmentData(sampleSchema()) + segmentsData = append(segmentsData, segmentData) + + flushFunc := func(fields map[storage.FieldID]storage.FieldData, shardID int) error { + return nil + } + adapter, err := NewBinlogAdapter(ctx, sampleSchema(), 2, 1024, 2048, &MockChunkManager{}, flushFunc, 0, math.MaxUint64) + assert.NotNil(t, adapter) + assert.Nil(t, err) + + err = adapter.verifyField(103, segmentsData) + assert.NoError(t, err) + err = adapter.verifyField(999999, segmentsData) + assert.Error(t, err) + + err = adapter.readInsertlog(999999, "dummy", segmentsData, []int32{1}) + assert.Error(t, err) +} + func Test_BinlogAdapterReadInsertlog(t *testing.T) { ctx := context.Background()