Add segmentLoadDeletedRecord in querynode (#10449)

Signed-off-by: fishpenguin <kun.yu@zilliz.com>
This commit is contained in:
yukun 2021-10-22 18:51:14 +08:00 committed by GitHub
parent d49aeda094
commit f7c0f5b3da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 51 additions and 1 deletions

View File

@ -84,6 +84,9 @@ PreDelete(CSegmentInterface c_segment, int64_t size);
CStatus
LoadFieldData(CSegmentInterface c_segment, CLoadFieldDataInfo load_field_data_info);
CStatus
LoadDeletedRecord(CSegmentInterface c_segment, CLoadDeletedRecordInfo deleted_record_info);
CStatus
UpdateSealedSegmentIndex(CSegmentInterface c_segment, CLoadIndexInfo c_load_index_info);

View File

@ -789,7 +789,7 @@ func (s *Segment) segmentLoadFieldData(fieldID int64, rowCount int, data interfa
return nil
}
func (s *Segment) LoadDeletedRecord(primaryKeys []IntPrimaryKey) error {
func (s *Segment) segmentLoadDeletedRecord(primaryKeys []IntPrimaryKey, timestamps []Timestamp, rowCount int64) error {
s.segPtrMu.RLock()
defer s.segPtrMu.RUnlock() // thread safe guaranteed by segCore, use RLock
if s.segmentPtr == nil {
@ -799,7 +799,26 @@ func (s *Segment) LoadDeletedRecord(primaryKeys []IntPrimaryKey) error {
errMsg := fmt.Sprintln("segmentLoadFieldData failed, illegal segment type ", s.segmentType, "segmentID = ", s.ID())
return errors.New(errMsg)
}
loadInfo := C.CLoadDeletedRecordInfo{
timestamps: unsafe.Pointer(&timestamps[0]),
primary_keys: unsafe.Pointer(&primaryKeys[0]),
row_count: C.int64_t(rowCount),
}
/*
CStatus
LoadDeletedRecord(CSegmentInterface c_segment, CLoadDeletedRecordInfo deleted_record_info)
*/
var status = C.LoadDeletedRecord(s.segmentPtr, loadInfo)
errorCode := status.error_code
if errorCode != 0 {
errorMsg := C.GoString(status.error_msg)
defer C.free(unsafe.Pointer(status.error_msg))
return errors.New("LoadDeletedRecord failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg)
}
log.Debug("load deleted record done",
zap.Int64("row count", rowCount),
zap.Int64("segmentID", s.ID()))
return nil
}

View File

@ -644,6 +644,34 @@ func TestSegment_segmentPreDelete(t *testing.T) {
deleteCollection(collection)
}
func TestSegment_segmentLoadDeletedRecord(t *testing.T) {
fieldParam := constFieldParam{
id: 100,
dataType: schemapb.DataType_Int64,
}
field := genPKField(fieldParam)
schema := &schemapb.CollectionSchema{
Name: defaultCollectionName,
AutoID: false,
Fields: []*schemapb.FieldSchema{
field,
},
}
seg := newSegment(newCollection(defaultCollectionID, schema),
defaultSegmentID,
defaultPartitionID,
defaultCollectionID,
defaultVChannel,
segmentTypeSealed,
true)
pks := []IntPrimaryKey{1, 2, 3}
timestamps := []Timestamp{10, 10, 10}
var rowCount int64 = 3
error := seg.segmentLoadDeletedRecord(pks, timestamps, rowCount)
assert.NoError(t, error)
}
func TestSegment_segmentLoadFieldData(t *testing.T) {
genSchemas := func(dataType schemapb.DataType) (*schemapb.CollectionSchema, *schemapb.CollectionSchema) {
constField := constFieldParam{