diff --git a/internal/datanode/compaction/clustering_compactor.go b/internal/datanode/compaction/clustering_compactor.go index 7b19bf9370..248e08be9a 100644 --- a/internal/datanode/compaction/clustering_compactor.go +++ b/internal/datanode/compaction/clustering_compactor.go @@ -197,6 +197,11 @@ func (t *clusteringCompactionTask) init() error { return merr.WrapErrIllegalCompactionPlan("empty schema in compactionPlan") } for _, field := range t.plan.Schema.Fields { + // todo(wayblink): support null in clustering compact + if field.GetNullable() { + return merr.WrapErrParameterInvalidMsg(fmt.Sprintf("clustering compaction can't be trigger in field(%s) which set nullable == true", field.GetName())) + } + if field.GetIsPrimaryKey() && field.GetFieldID() >= 100 && typeutil.IsPrimaryFieldType(field.GetDataType()) { pkField = field } diff --git a/internal/datanode/compaction/clustering_compactor_test.go b/internal/datanode/compaction/clustering_compactor_test.go index 31973712c2..8ed753410e 100644 --- a/internal/datanode/compaction/clustering_compactor_test.go +++ b/internal/datanode/compaction/clustering_compactor_test.go @@ -167,6 +167,22 @@ func (s *ClusteringCompactionTaskSuite) TestCompactionInit() { s.Equal(8, s.task.getWorkerPoolSize()) s.Equal(8, s.task.mappingPool.Cap()) s.Equal(8, s.task.flushPool.Cap()) + + s.task.plan.Schema = genCollectionSchema() + s.task.plan.Schema.Fields = append(s.task.plan.Schema.Fields, &schemapb.FieldSchema{ + FieldID: 104, + Name: "nullableFid", + DataType: schemapb.DataType_Int64, + Nullable: true, + }) + s.task.plan.ClusteringKeyField = 100 + s.task.plan.SegmentBinlogs = []*datapb.CompactionSegmentBinlogs{ + { + SegmentID: 100, + }, + } + err = s.task.init() + s.Require().Error(err) } func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() {