mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 01:28:27 +08:00
fix: not set nullable when stream writer write headers (#35799)
#35802 Signed-off-by: lixinguo <xinguo.li@zilliz.com> Co-authored-by: lixinguo <xinguo.li@zilliz.com>
This commit is contained in:
parent
d10aa4626f
commit
a3f2f044d6
@ -29,7 +29,7 @@ PayloadReader::PayloadReader(const uint8_t* data,
|
|||||||
int length,
|
int length,
|
||||||
DataType data_type,
|
DataType data_type,
|
||||||
bool nullable)
|
bool nullable)
|
||||||
: column_type_(data_type), nullable(nullable) {
|
: column_type_(data_type), nullable_(nullable) {
|
||||||
auto input = std::make_shared<arrow::io::BufferReader>(data, length);
|
auto input = std::make_shared<arrow::io::BufferReader>(data, length);
|
||||||
init(input);
|
init(input);
|
||||||
}
|
}
|
||||||
@ -73,7 +73,8 @@ PayloadReader::init(std::shared_ptr<arrow::io::BufferReader> input) {
|
|||||||
st = arrow_reader->GetRecordBatchReader(&rb_reader);
|
st = arrow_reader->GetRecordBatchReader(&rb_reader);
|
||||||
AssertInfo(st.ok(), "get record batch reader");
|
AssertInfo(st.ok(), "get record batch reader");
|
||||||
|
|
||||||
field_data_ = CreateFieldData(column_type_, nullable, dim_, total_num_rows);
|
field_data_ =
|
||||||
|
CreateFieldData(column_type_, nullable_, dim_, total_num_rows);
|
||||||
for (arrow::Result<std::shared_ptr<arrow::RecordBatch>> maybe_batch :
|
for (arrow::Result<std::shared_ptr<arrow::RecordBatch>> maybe_batch :
|
||||||
*rb_reader) {
|
*rb_reader) {
|
||||||
AssertInfo(maybe_batch.ok(), "get batch record success");
|
AssertInfo(maybe_batch.ok(), "get batch record success");
|
||||||
|
|||||||
@ -29,7 +29,7 @@ class PayloadReader {
|
|||||||
explicit PayloadReader(const uint8_t* data,
|
explicit PayloadReader(const uint8_t* data,
|
||||||
int length,
|
int length,
|
||||||
DataType data_type,
|
DataType data_type,
|
||||||
bool nullable);
|
bool nullable_);
|
||||||
|
|
||||||
~PayloadReader() = default;
|
~PayloadReader() = default;
|
||||||
|
|
||||||
@ -44,7 +44,7 @@ class PayloadReader {
|
|||||||
private:
|
private:
|
||||||
DataType column_type_;
|
DataType column_type_;
|
||||||
int dim_;
|
int dim_;
|
||||||
bool nullable;
|
bool nullable_;
|
||||||
FieldDataPtr field_data_;
|
FieldDataPtr field_data_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@ -319,6 +319,7 @@ func (bsw *BinlogStreamWriter) writeBinlogHeaders(w io.Writer) error {
|
|||||||
de.PayloadDataType = bsw.fieldSchema.DataType
|
de.PayloadDataType = bsw.fieldSchema.DataType
|
||||||
de.FieldID = bsw.fieldSchema.FieldID
|
de.FieldID = bsw.fieldSchema.FieldID
|
||||||
de.descriptorEventData.AddExtra(originalSizeKey, strconv.Itoa(bsw.memorySize))
|
de.descriptorEventData.AddExtra(originalSizeKey, strconv.Itoa(bsw.memorySize))
|
||||||
|
de.descriptorEventData.AddExtra(nullableKey, bsw.fieldSchema.Nullable)
|
||||||
if err := de.Write(w); err != nil {
|
if err := de.Write(w); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@ -20,6 +20,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/suite"
|
"github.com/stretchr/testify/suite"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
@ -175,6 +176,32 @@ func (s *NullDataSuite) run() {
|
|||||||
|
|
||||||
s.WaitForIndexBuilt(ctx, collectionName, fVecColumn.FieldName)
|
s.WaitForIndexBuilt(ctx, collectionName, fVecColumn.FieldName)
|
||||||
|
|
||||||
|
desCollResp, err := c.Proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
|
||||||
|
CollectionName: collectionName,
|
||||||
|
})
|
||||||
|
s.NoError(err)
|
||||||
|
s.Equal(desCollResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
|
||||||
|
|
||||||
|
compactResp, err := c.Proxy.ManualCompaction(ctx, &milvuspb.ManualCompactionRequest{
|
||||||
|
CollectionID: desCollResp.GetCollectionID(),
|
||||||
|
})
|
||||||
|
|
||||||
|
s.NoError(err)
|
||||||
|
s.Equal(compactResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
|
||||||
|
|
||||||
|
compacted := func() bool {
|
||||||
|
resp, err := c.Proxy.GetCompactionState(ctx, &milvuspb.GetCompactionStateRequest{
|
||||||
|
CompactionID: compactResp.GetCompactionID(),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return resp.GetState() == commonpb.CompactionState_Completed
|
||||||
|
}
|
||||||
|
for !compacted() {
|
||||||
|
time.Sleep(3 * time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
// load
|
// load
|
||||||
loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
|
loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
|
||||||
DbName: dbName,
|
DbName: dbName,
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user