diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index d09723ec35..08b7a29ec6 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -24,10 +24,12 @@ import ( "github.com/cockroachdb/errors" "github.com/samber/lo" + "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/json" "github.com/milvus-io/milvus/pkg/v2/common" + "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/proto/etcdpb" "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/metautil" @@ -254,6 +256,29 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique } for _, field := range insertCodec.Schema.Schema.Fields { + // check insert data contain this field + // must be all missing or all exists + allExists := true + allMissing := false + for _, block := range data { + _, ok := block.Data[field.FieldID] + allExists = allExists && ok + allMissing = allMissing || !ok + } + + // found missing block + if !allExists { + if !field.GetNullable() { + return nil, errors.Newf("field %d(%s) missing and field not nullable", field.GetFieldID(), field.GetName()) + } + // segment must be in same schema + if !allMissing { + return nil, errors.Newf("segment must not be heterogeneous, all blocks must contain all fields or none, abnormal field %d(%s)", field.GetFieldID(), field.GetName()) + } + log.Info("Skip field nullable missing field, could be schema change", zap.Int64("fieldId", field.GetFieldID()), zap.String("fieldName", field.GetName())) + continue + } + // encode fields writer = NewInsertBinlogWriter(field.DataType, insertCodec.Schema.ID, partitionID, segmentID, field.FieldID, field.GetNullable())