diff --git a/internal/core/src/storage/PayloadReader.cpp b/internal/core/src/storage/PayloadReader.cpp index f468bd343d..b7fe5117ed 100644 --- a/internal/core/src/storage/PayloadReader.cpp +++ b/internal/core/src/storage/PayloadReader.cpp @@ -29,7 +29,7 @@ PayloadReader::PayloadReader(const uint8_t* data, int length, DataType data_type, bool nullable) - : column_type_(data_type), nullable(nullable) { + : column_type_(data_type), nullable_(nullable) { auto input = std::make_shared(data, length); init(input); } @@ -73,7 +73,8 @@ PayloadReader::init(std::shared_ptr input) { st = arrow_reader->GetRecordBatchReader(&rb_reader); AssertInfo(st.ok(), "get record batch reader"); - field_data_ = CreateFieldData(column_type_, nullable, dim_, total_num_rows); + field_data_ = + CreateFieldData(column_type_, nullable_, dim_, total_num_rows); for (arrow::Result> maybe_batch : *rb_reader) { AssertInfo(maybe_batch.ok(), "get batch record success"); diff --git a/internal/core/src/storage/PayloadReader.h b/internal/core/src/storage/PayloadReader.h index 39aa6420fd..1e75dcd8cb 100644 --- a/internal/core/src/storage/PayloadReader.h +++ b/internal/core/src/storage/PayloadReader.h @@ -29,7 +29,7 @@ class PayloadReader { explicit PayloadReader(const uint8_t* data, int length, DataType data_type, - bool nullable); + bool nullable_); ~PayloadReader() = default; @@ -44,7 +44,7 @@ class PayloadReader { private: DataType column_type_; int dim_; - bool nullable; + bool nullable_; FieldDataPtr field_data_; }; diff --git a/internal/storage/serde_events.go b/internal/storage/serde_events.go index c812cda026..a53196d1a7 100644 --- a/internal/storage/serde_events.go +++ b/internal/storage/serde_events.go @@ -319,6 +319,7 @@ func (bsw *BinlogStreamWriter) writeBinlogHeaders(w io.Writer) error { de.PayloadDataType = bsw.fieldSchema.DataType de.FieldID = bsw.fieldSchema.FieldID de.descriptorEventData.AddExtra(originalSizeKey, strconv.Itoa(bsw.memorySize)) + de.descriptorEventData.AddExtra(nullableKey, bsw.fieldSchema.Nullable) if err := de.Write(w); err != nil { return err } diff --git a/tests/integration/null_data/null_data_test.go b/tests/integration/null_data/null_data_test.go index afaf7f355e..5a34341fa6 100644 --- a/tests/integration/null_data/null_data_test.go +++ b/tests/integration/null_data/null_data_test.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "testing" + "time" "github.com/stretchr/testify/suite" "go.uber.org/zap" @@ -175,6 +176,32 @@ func (s *NullDataSuite) run() { s.WaitForIndexBuilt(ctx, collectionName, fVecColumn.FieldName) + desCollResp, err := c.Proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{ + CollectionName: collectionName, + }) + s.NoError(err) + s.Equal(desCollResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success) + + compactResp, err := c.Proxy.ManualCompaction(ctx, &milvuspb.ManualCompactionRequest{ + CollectionID: desCollResp.GetCollectionID(), + }) + + s.NoError(err) + s.Equal(compactResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success) + + compacted := func() bool { + resp, err := c.Proxy.GetCompactionState(ctx, &milvuspb.GetCompactionStateRequest{ + CompactionID: compactResp.GetCompactionID(), + }) + if err != nil { + return false + } + return resp.GetState() == commonpb.CompactionState_Completed + } + for !compacted() { + time.Sleep(3 * time.Second) + } + // load loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ DbName: dbName,