mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Add binlog unittest
Signed-off-by: neza2017 <yefu.chen@zilliz.com>
This commit is contained in:
parent
16071d92b5
commit
9cbebc0221
@ -46,7 +46,7 @@ func (reader *BinlogReader) readMagicNumber() (int32, error) {
|
||||
}
|
||||
reader.currentOffset = 4
|
||||
if reader.magicNumber != MagicNumber {
|
||||
return -1, errors.New("parse magic number failed, expected: " + strconv.Itoa(MagicNumber) +
|
||||
return -1, errors.New("parse magic number failed, expected: " + strconv.Itoa(int(MagicNumber)) +
|
||||
", actual: " + strconv.Itoa(int(reader.magicNumber)))
|
||||
}
|
||||
|
||||
|
||||
@ -14,7 +14,6 @@ const (
|
||||
BinlogVersion = 1
|
||||
CommitID = 1
|
||||
ServerVersion = 1
|
||||
HeaderLength = 17
|
||||
)
|
||||
|
||||
type BinlogType int32
|
||||
@ -25,7 +24,7 @@ const (
|
||||
DDLBinlog
|
||||
)
|
||||
const (
|
||||
MagicNumber = 0xfffabc
|
||||
MagicNumber int32 = 0xfffabc
|
||||
)
|
||||
|
||||
type baseBinlogWriter struct {
|
||||
|
||||
@ -243,7 +243,6 @@ func newDescriptorEventData() (*descriptorEventData, error) {
|
||||
}
|
||||
data.PostHeaderLengths = append(data.PostHeaderLengths, uint8(size))
|
||||
}
|
||||
data.HeaderLength = int8(data.GetMemoryUsageInBytes())
|
||||
return &data, nil
|
||||
}
|
||||
|
||||
|
||||
@ -54,8 +54,6 @@ func newDescriptorEventHeader() (*descriptorEventHeader, error) {
|
||||
TypeCode: DescriptorEventType,
|
||||
ServerID: ServerID,
|
||||
}
|
||||
header.EventLength = header.GetMemoryUsageInBytes()
|
||||
header.NextPosition = header.EventLength + 4
|
||||
return &header, nil
|
||||
}
|
||||
|
||||
|
||||
@ -33,7 +33,118 @@ func checkEventHeader(
|
||||
assert.Equal(t, npos, length)
|
||||
}
|
||||
|
||||
func TestEventWriterAndReader(t *testing.T) {
|
||||
func TestDescriptorEvent(t *testing.T) {
|
||||
desc, err := newDescriptorEvent()
|
||||
assert.Nil(t, err)
|
||||
|
||||
var buf bytes.Buffer
|
||||
|
||||
err = desc.Write(&buf)
|
||||
assert.Nil(t, err)
|
||||
|
||||
buffer := buf.Bytes()
|
||||
|
||||
ts := UnsafeReadInt64(buffer, 0)
|
||||
assert.Greater(t, ts, int64(0))
|
||||
curts := time.Now().UnixNano() / int64(time.Millisecond)
|
||||
curts = int64(tsoutil.ComposeTS(curts, 0))
|
||||
assert.GreaterOrEqual(t, curts, ts)
|
||||
|
||||
utc := UnsafeReadInt8(buffer, int(unsafe.Sizeof(ts)))
|
||||
assert.Equal(t, EventTypeCode(utc), DescriptorEventType)
|
||||
usid := UnsafeReadInt32(buffer, int(unsafe.Sizeof(ts)+unsafe.Sizeof(utc)))
|
||||
assert.Equal(t, usid, int32(ServerID))
|
||||
elen := UnsafeReadInt32(buffer, int(unsafe.Sizeof(ts)+unsafe.Sizeof(utc)+unsafe.Sizeof(usid)))
|
||||
assert.Equal(t, elen, int32(len(buffer)))
|
||||
npos := UnsafeReadInt32(buffer, int(unsafe.Sizeof(ts)+unsafe.Sizeof(utc)+unsafe.Sizeof(usid)+unsafe.Sizeof(elen)))
|
||||
assert.GreaterOrEqual(t, npos, int32(binary.Size(MagicNumber)+len(buffer)))
|
||||
t.Logf("next position = %d", npos)
|
||||
|
||||
binVersion := UnsafeReadInt16(buffer, binary.Size(eventHeader{}))
|
||||
assert.Equal(t, binVersion, int16(BinlogVersion))
|
||||
svrVersion := UnsafeReadInt64(buffer, binary.Size(eventHeader{})+int(unsafe.Sizeof(binVersion)))
|
||||
assert.Equal(t, svrVersion, int64(ServerVersion))
|
||||
cmitID := UnsafeReadInt64(buffer, binary.Size(eventHeader{})+int(unsafe.Sizeof(binVersion))+int(unsafe.Sizeof(svrVersion)))
|
||||
assert.Equal(t, cmitID, int64(CommitID))
|
||||
headLen := UnsafeReadInt8(buffer, binary.Size(eventHeader{})+
|
||||
int(unsafe.Sizeof(binVersion))+
|
||||
int(unsafe.Sizeof(svrVersion))+
|
||||
int(unsafe.Sizeof(cmitID)))
|
||||
assert.Equal(t, headLen, int8(binary.Size(eventHeader{})))
|
||||
t.Logf("head len = %d", headLen)
|
||||
collID := UnsafeReadInt64(buffer, binary.Size(eventHeader{})+
|
||||
int(unsafe.Sizeof(binVersion))+
|
||||
int(unsafe.Sizeof(svrVersion))+
|
||||
int(unsafe.Sizeof(cmitID))+
|
||||
int(unsafe.Sizeof(headLen)))
|
||||
assert.Equal(t, collID, int64(-1))
|
||||
partID := UnsafeReadInt64(buffer, binary.Size(eventHeader{})+
|
||||
int(unsafe.Sizeof(binVersion))+
|
||||
int(unsafe.Sizeof(svrVersion))+
|
||||
int(unsafe.Sizeof(cmitID))+
|
||||
int(unsafe.Sizeof(headLen))+
|
||||
int(unsafe.Sizeof(collID)))
|
||||
assert.Equal(t, partID, int64(-1))
|
||||
segID := UnsafeReadInt64(buffer, binary.Size(eventHeader{})+
|
||||
int(unsafe.Sizeof(binVersion))+
|
||||
int(unsafe.Sizeof(svrVersion))+
|
||||
int(unsafe.Sizeof(cmitID))+
|
||||
int(unsafe.Sizeof(headLen))+
|
||||
int(unsafe.Sizeof(collID))+
|
||||
int(unsafe.Sizeof(partID)))
|
||||
assert.Equal(t, segID, int64(-1))
|
||||
startTs := UnsafeReadInt64(buffer, binary.Size(eventHeader{})+
|
||||
int(unsafe.Sizeof(binVersion))+
|
||||
int(unsafe.Sizeof(svrVersion))+
|
||||
int(unsafe.Sizeof(cmitID))+
|
||||
int(unsafe.Sizeof(headLen))+
|
||||
int(unsafe.Sizeof(collID))+
|
||||
int(unsafe.Sizeof(partID))+
|
||||
int(unsafe.Sizeof(segID)))
|
||||
assert.Equal(t, startTs, int64(0))
|
||||
endTs := UnsafeReadInt64(buffer, binary.Size(eventHeader{})+
|
||||
int(unsafe.Sizeof(binVersion))+
|
||||
int(unsafe.Sizeof(svrVersion))+
|
||||
int(unsafe.Sizeof(cmitID))+
|
||||
int(unsafe.Sizeof(headLen))+
|
||||
int(unsafe.Sizeof(collID))+
|
||||
int(unsafe.Sizeof(partID))+
|
||||
int(unsafe.Sizeof(segID))+
|
||||
int(unsafe.Sizeof(startTs)))
|
||||
assert.Equal(t, endTs, int64(0))
|
||||
colType := UnsafeReadInt32(buffer, binary.Size(eventHeader{})+
|
||||
int(unsafe.Sizeof(binVersion))+
|
||||
int(unsafe.Sizeof(svrVersion))+
|
||||
int(unsafe.Sizeof(cmitID))+
|
||||
int(unsafe.Sizeof(headLen))+
|
||||
int(unsafe.Sizeof(collID))+
|
||||
int(unsafe.Sizeof(partID))+
|
||||
int(unsafe.Sizeof(segID))+
|
||||
int(unsafe.Sizeof(startTs))+
|
||||
int(unsafe.Sizeof(endTs)))
|
||||
assert.Equal(t, colType, int32(-1))
|
||||
|
||||
postHeadOffset := binary.Size(eventHeader{}) +
|
||||
int(unsafe.Sizeof(binVersion)) +
|
||||
int(unsafe.Sizeof(svrVersion)) +
|
||||
int(unsafe.Sizeof(cmitID)) +
|
||||
int(unsafe.Sizeof(headLen)) +
|
||||
int(unsafe.Sizeof(collID)) +
|
||||
int(unsafe.Sizeof(partID)) +
|
||||
int(unsafe.Sizeof(segID)) +
|
||||
int(unsafe.Sizeof(startTs)) +
|
||||
int(unsafe.Sizeof(endTs)) +
|
||||
int(unsafe.Sizeof(colType))
|
||||
|
||||
postHeadArray := buffer[postHeadOffset:]
|
||||
for i := DescriptorEventType; i < EventTypeEnd; i++ {
|
||||
hen := postHeadArray[i-DescriptorEventType]
|
||||
size := getEventFixPartSize(i)
|
||||
assert.Equal(t, hen, uint8(size))
|
||||
}
|
||||
}
|
||||
|
||||
func TestInsertEvent(t *testing.T) {
|
||||
insertT := func(t *testing.T,
|
||||
dt schemapb.DataType,
|
||||
ir1 func(w *insertEventWriter) error,
|
||||
@ -281,3 +392,745 @@ func TestEventWriterAndReader(t *testing.T) {
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestDeleteEvent(t *testing.T) {
|
||||
deleteT := func(t *testing.T,
|
||||
dt schemapb.DataType,
|
||||
ir1 func(w *deleteEventWriter) error,
|
||||
ir2 func(w *deleteEventWriter) error,
|
||||
iw func(w *deleteEventWriter) error,
|
||||
ev interface{},
|
||||
) {
|
||||
w, err := newDeleteEventWriter(dt, 0)
|
||||
assert.Nil(t, err)
|
||||
w.SetStartTimestamp(tsoutil.ComposeTS(10, 0))
|
||||
w.SetEndTimestamp(tsoutil.ComposeTS(100, 0))
|
||||
err = ir1(w)
|
||||
assert.Nil(t, err)
|
||||
err = iw(w)
|
||||
assert.NotNil(t, err)
|
||||
err = ir2(w)
|
||||
assert.Nil(t, err)
|
||||
err = w.Finish()
|
||||
assert.Nil(t, err)
|
||||
|
||||
var buf bytes.Buffer
|
||||
err = w.Write(&buf)
|
||||
assert.Nil(t, err)
|
||||
err = w.Close()
|
||||
assert.Nil(t, err)
|
||||
|
||||
wBuf := buf.Bytes()
|
||||
checkEventHeader(t, wBuf, DeleteEventType, ServerID, int32(len(wBuf)))
|
||||
st := UnsafeReadInt64(wBuf, binary.Size(eventHeader{}))
|
||||
assert.Equal(t, Timestamp(st), tsoutil.ComposeTS(10, 0))
|
||||
et := UnsafeReadInt64(wBuf, binary.Size(eventHeader{})+int(unsafe.Sizeof(st)))
|
||||
assert.Equal(t, Timestamp(et), tsoutil.ComposeTS(100, 0))
|
||||
|
||||
payloadOffset := binary.Size(eventHeader{}) + binary.Size(insertEventData{})
|
||||
pBuf := wBuf[payloadOffset:]
|
||||
pR, err := NewPayloadReader(dt, pBuf)
|
||||
assert.Nil(t, err)
|
||||
vals, _, err := pR.GetDataFromPayload()
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, vals, ev)
|
||||
err = pR.Close()
|
||||
assert.Nil(t, err)
|
||||
|
||||
r, err := newEventReader(dt, bytes.NewBuffer(wBuf))
|
||||
assert.Nil(t, err)
|
||||
payload, _, err := r.GetDataFromPayload()
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, payload, ev)
|
||||
|
||||
err = r.Close()
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
t.Run("delete_bool", func(t *testing.T) {
|
||||
deleteT(t, schemapb.DataType_BOOL,
|
||||
func(w *deleteEventWriter) error {
|
||||
return w.AddDataToPayload([]bool{true, false, true})
|
||||
},
|
||||
func(w *deleteEventWriter) error {
|
||||
return w.AddDataToPayload([]bool{false, true, false})
|
||||
},
|
||||
func(w *deleteEventWriter) error {
|
||||
return w.AddDataToPayload([]int{1, 2, 3, 4, 5})
|
||||
},
|
||||
[]bool{true, false, true, false, true, false})
|
||||
})
|
||||
|
||||
t.Run("delete_int8", func(t *testing.T) {
|
||||
deleteT(t, schemapb.DataType_INT8,
|
||||
func(w *deleteEventWriter) error {
|
||||
return w.AddDataToPayload([]int8{1, 2, 3})
|
||||
},
|
||||
func(w *deleteEventWriter) error {
|
||||
return w.AddDataToPayload([]int8{4, 5, 6})
|
||||
},
|
||||
func(w *deleteEventWriter) error {
|
||||
return w.AddDataToPayload([]int{1, 2, 3, 4, 5})
|
||||
},
|
||||
[]int8{1, 2, 3, 4, 5, 6})
|
||||
})
|
||||
|
||||
t.Run("delete_int16", func(t *testing.T) {
|
||||
deleteT(t, schemapb.DataType_INT16,
|
||||
func(w *deleteEventWriter) error {
|
||||
return w.AddDataToPayload([]int16{1, 2, 3})
|
||||
},
|
||||
func(w *deleteEventWriter) error {
|
||||
return w.AddDataToPayload([]int16{4, 5, 6})
|
||||
},
|
||||
func(w *deleteEventWriter) error {
|
||||
return w.AddDataToPayload([]int{1, 2, 3, 4, 5})
|
||||
},
|
||||
[]int16{1, 2, 3, 4, 5, 6})
|
||||
})
|
||||
|
||||
t.Run("delete_int32", func(t *testing.T) {
|
||||
deleteT(t, schemapb.DataType_INT32,
|
||||
func(w *deleteEventWriter) error {
|
||||
return w.AddDataToPayload([]int32{1, 2, 3})
|
||||
},
|
||||
func(w *deleteEventWriter) error {
|
||||
return w.AddDataToPayload([]int32{4, 5, 6})
|
||||
},
|
||||
func(w *deleteEventWriter) error {
|
||||
return w.AddDataToPayload([]int{1, 2, 3, 4, 5})
|
||||
},
|
||||
[]int32{1, 2, 3, 4, 5, 6})
|
||||
})
|
||||
|
||||
t.Run("delete_int64", func(t *testing.T) {
|
||||
deleteT(t, schemapb.DataType_INT64,
|
||||
func(w *deleteEventWriter) error {
|
||||
return w.AddDataToPayload([]int64{1, 2, 3})
|
||||
},
|
||||
func(w *deleteEventWriter) error {
|
||||
return w.AddDataToPayload([]int64{4, 5, 6})
|
||||
},
|
||||
func(w *deleteEventWriter) error {
|
||||
return w.AddDataToPayload([]int{1, 2, 3, 4, 5})
|
||||
},
|
||||
[]int64{1, 2, 3, 4, 5, 6})
|
||||
})
|
||||
|
||||
t.Run("delete_float32", func(t *testing.T) {
|
||||
deleteT(t, schemapb.DataType_FLOAT,
|
||||
func(w *deleteEventWriter) error {
|
||||
return w.AddDataToPayload([]float32{1, 2, 3})
|
||||
},
|
||||
func(w *deleteEventWriter) error {
|
||||
return w.AddDataToPayload([]float32{4, 5, 6})
|
||||
},
|
||||
func(w *deleteEventWriter) error {
|
||||
return w.AddDataToPayload([]int{1, 2, 3, 4, 5})
|
||||
},
|
||||
[]float32{1, 2, 3, 4, 5, 6})
|
||||
})
|
||||
|
||||
t.Run("delete_float64", func(t *testing.T) {
|
||||
deleteT(t, schemapb.DataType_DOUBLE,
|
||||
func(w *deleteEventWriter) error {
|
||||
return w.AddDataToPayload([]float64{1, 2, 3})
|
||||
},
|
||||
func(w *deleteEventWriter) error {
|
||||
return w.AddDataToPayload([]float64{4, 5, 6})
|
||||
},
|
||||
func(w *deleteEventWriter) error {
|
||||
return w.AddDataToPayload([]int{1, 2, 3, 4, 5})
|
||||
},
|
||||
[]float64{1, 2, 3, 4, 5, 6})
|
||||
})
|
||||
|
||||
t.Run("delete_binary_vector", func(t *testing.T) {
|
||||
deleteT(t, schemapb.DataType_VECTOR_BINARY,
|
||||
func(w *deleteEventWriter) error {
|
||||
return w.AddDataToPayload([]byte{1, 2, 3, 4}, 16)
|
||||
},
|
||||
func(w *deleteEventWriter) error {
|
||||
return w.AddDataToPayload([]byte{5, 6, 7, 8}, 16)
|
||||
},
|
||||
func(w *deleteEventWriter) error {
|
||||
return w.AddDataToPayload([]int{1, 2, 3, 4, 5, 6}, 16)
|
||||
},
|
||||
[]byte{1, 2, 3, 4, 5, 6, 7, 8})
|
||||
})
|
||||
|
||||
t.Run("delete_float_vector", func(t *testing.T) {
|
||||
deleteT(t, schemapb.DataType_VECTOR_FLOAT,
|
||||
func(w *deleteEventWriter) error {
|
||||
return w.AddDataToPayload([]float32{1, 2, 3, 4}, 2)
|
||||
},
|
||||
func(w *deleteEventWriter) error {
|
||||
return w.AddDataToPayload([]float32{5, 6, 7, 8}, 2)
|
||||
},
|
||||
func(w *deleteEventWriter) error {
|
||||
return w.AddDataToPayload([]int{1, 2, 3, 4, 5, 6}, 2)
|
||||
},
|
||||
[]float32{1, 2, 3, 4, 5, 6, 7, 8})
|
||||
})
|
||||
|
||||
t.Run("delete_string", func(t *testing.T) {
|
||||
w, err := newDeleteEventWriter(schemapb.DataType_STRING, 0)
|
||||
assert.Nil(t, err)
|
||||
w.SetStartTimestamp(tsoutil.ComposeTS(10, 0))
|
||||
w.SetEndTimestamp(tsoutil.ComposeTS(100, 0))
|
||||
err = w.AddDataToPayload("1234")
|
||||
assert.Nil(t, err)
|
||||
err = w.AddOneStringToPayload("567890")
|
||||
assert.Nil(t, err)
|
||||
err = w.AddOneStringToPayload("abcdefg")
|
||||
assert.Nil(t, err)
|
||||
err = w.AddDataToPayload([]int{1, 2, 3})
|
||||
assert.NotNil(t, err)
|
||||
err = w.Finish()
|
||||
assert.Nil(t, err)
|
||||
|
||||
var buf bytes.Buffer
|
||||
err = w.Write(&buf)
|
||||
assert.Nil(t, err)
|
||||
err = w.Close()
|
||||
assert.Nil(t, err)
|
||||
|
||||
wBuf := buf.Bytes()
|
||||
checkEventHeader(t, wBuf, DeleteEventType, ServerID, int32(len(wBuf)))
|
||||
st := UnsafeReadInt64(wBuf, binary.Size(eventHeader{}))
|
||||
assert.Equal(t, Timestamp(st), tsoutil.ComposeTS(10, 0))
|
||||
et := UnsafeReadInt64(wBuf, binary.Size(eventHeader{})+int(unsafe.Sizeof(st)))
|
||||
assert.Equal(t, Timestamp(et), tsoutil.ComposeTS(100, 0))
|
||||
|
||||
payloadOffset := binary.Size(eventHeader{}) + binary.Size(insertEventData{})
|
||||
pBuf := wBuf[payloadOffset:]
|
||||
pR, err := NewPayloadReader(schemapb.DataType_STRING, pBuf)
|
||||
assert.Nil(t, err)
|
||||
|
||||
s0, err := pR.GetOneStringFromPayload(0)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, s0, "1234")
|
||||
|
||||
s1, err := pR.GetOneStringFromPayload(1)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, s1, "567890")
|
||||
|
||||
s2, err := pR.GetOneStringFromPayload(2)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, s2, "abcdefg")
|
||||
|
||||
err = pR.Close()
|
||||
assert.Nil(t, err)
|
||||
|
||||
r, err := newEventReader(schemapb.DataType_STRING, bytes.NewBuffer(wBuf))
|
||||
assert.Nil(t, err)
|
||||
|
||||
s0, err = r.GetOneStringFromPayload(0)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, s0, "1234")
|
||||
|
||||
s1, err = r.GetOneStringFromPayload(1)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, s1, "567890")
|
||||
|
||||
s2, err = r.GetOneStringFromPayload(2)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, s2, "abcdefg")
|
||||
|
||||
err = r.Close()
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestCreateCollectionEvent(t *testing.T) {
|
||||
t.Run("create_event", func(t *testing.T) {
|
||||
w, err := newCreateCollectionEventWriter(schemapb.DataType_FLOAT, 0)
|
||||
assert.NotNil(t, err)
|
||||
assert.Nil(t, w)
|
||||
})
|
||||
|
||||
t.Run("create_collection_timestamp", func(t *testing.T) {
|
||||
w, err := newCreateCollectionEventWriter(schemapb.DataType_INT64, 0)
|
||||
assert.Nil(t, err)
|
||||
w.SetStartTimestamp(tsoutil.ComposeTS(10, 0))
|
||||
w.SetEndTimestamp(tsoutil.ComposeTS(100, 0))
|
||||
err = w.AddDataToPayload([]int64{1, 2, 3})
|
||||
assert.Nil(t, err)
|
||||
err = w.AddDataToPayload([]int{4, 5, 6})
|
||||
assert.NotNil(t, err)
|
||||
err = w.AddDataToPayload([]int64{4, 5, 6})
|
||||
assert.Nil(t, err)
|
||||
err = w.Finish()
|
||||
assert.Nil(t, err)
|
||||
|
||||
var buf bytes.Buffer
|
||||
err = w.Write(&buf)
|
||||
assert.Nil(t, err)
|
||||
err = w.Close()
|
||||
assert.Nil(t, err)
|
||||
|
||||
wBuf := buf.Bytes()
|
||||
checkEventHeader(t, wBuf, CreateCollectionEventType, ServerID, int32(len(wBuf)))
|
||||
st := UnsafeReadInt64(wBuf, binary.Size(eventHeader{}))
|
||||
assert.Equal(t, Timestamp(st), tsoutil.ComposeTS(10, 0))
|
||||
et := UnsafeReadInt64(wBuf, binary.Size(eventHeader{})+int(unsafe.Sizeof(st)))
|
||||
assert.Equal(t, Timestamp(et), tsoutil.ComposeTS(100, 0))
|
||||
|
||||
payloadOffset := binary.Size(eventHeader{}) + binary.Size(createCollectionEventData{})
|
||||
pBuf := wBuf[payloadOffset:]
|
||||
pR, err := NewPayloadReader(schemapb.DataType_INT64, pBuf)
|
||||
assert.Nil(t, err)
|
||||
vals, _, err := pR.GetDataFromPayload()
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, vals, []int64{1, 2, 3, 4, 5, 6})
|
||||
err = pR.Close()
|
||||
assert.Nil(t, err)
|
||||
|
||||
r, err := newEventReader(schemapb.DataType_INT64, bytes.NewBuffer(wBuf))
|
||||
assert.Nil(t, err)
|
||||
payload, _, err := r.GetDataFromPayload()
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, payload, []int64{1, 2, 3, 4, 5, 6})
|
||||
|
||||
err = r.Close()
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
|
||||
t.Run("create_collection_string", func(t *testing.T) {
|
||||
w, err := newCreateCollectionEventWriter(schemapb.DataType_STRING, 0)
|
||||
assert.Nil(t, err)
|
||||
w.SetStartTimestamp(tsoutil.ComposeTS(10, 0))
|
||||
w.SetEndTimestamp(tsoutil.ComposeTS(100, 0))
|
||||
err = w.AddDataToPayload("1234")
|
||||
assert.Nil(t, err)
|
||||
err = w.AddOneStringToPayload("567890")
|
||||
assert.Nil(t, err)
|
||||
err = w.AddOneStringToPayload("abcdefg")
|
||||
assert.Nil(t, err)
|
||||
err = w.AddDataToPayload([]int{1, 2, 3})
|
||||
assert.NotNil(t, err)
|
||||
err = w.Finish()
|
||||
assert.Nil(t, err)
|
||||
|
||||
var buf bytes.Buffer
|
||||
err = w.Write(&buf)
|
||||
assert.Nil(t, err)
|
||||
err = w.Close()
|
||||
assert.Nil(t, err)
|
||||
|
||||
wBuf := buf.Bytes()
|
||||
checkEventHeader(t, wBuf, CreateCollectionEventType, ServerID, int32(len(wBuf)))
|
||||
st := UnsafeReadInt64(wBuf, binary.Size(eventHeader{}))
|
||||
assert.Equal(t, Timestamp(st), tsoutil.ComposeTS(10, 0))
|
||||
et := UnsafeReadInt64(wBuf, binary.Size(eventHeader{})+int(unsafe.Sizeof(st)))
|
||||
assert.Equal(t, Timestamp(et), tsoutil.ComposeTS(100, 0))
|
||||
|
||||
payloadOffset := binary.Size(eventHeader{}) + binary.Size(insertEventData{})
|
||||
pBuf := wBuf[payloadOffset:]
|
||||
pR, err := NewPayloadReader(schemapb.DataType_STRING, pBuf)
|
||||
assert.Nil(t, err)
|
||||
|
||||
s0, err := pR.GetOneStringFromPayload(0)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, s0, "1234")
|
||||
|
||||
s1, err := pR.GetOneStringFromPayload(1)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, s1, "567890")
|
||||
|
||||
s2, err := pR.GetOneStringFromPayload(2)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, s2, "abcdefg")
|
||||
|
||||
err = pR.Close()
|
||||
assert.Nil(t, err)
|
||||
|
||||
r, err := newEventReader(schemapb.DataType_STRING, bytes.NewBuffer(wBuf))
|
||||
assert.Nil(t, err)
|
||||
|
||||
s0, err = r.GetOneStringFromPayload(0)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, s0, "1234")
|
||||
|
||||
s1, err = r.GetOneStringFromPayload(1)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, s1, "567890")
|
||||
|
||||
s2, err = r.GetOneStringFromPayload(2)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, s2, "abcdefg")
|
||||
|
||||
err = r.Close()
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestDropCollectionEvent(t *testing.T) {
|
||||
t.Run("drop_event", func(t *testing.T) {
|
||||
w, err := newDropCollectionEventWriter(schemapb.DataType_FLOAT, 0)
|
||||
assert.NotNil(t, err)
|
||||
assert.Nil(t, w)
|
||||
})
|
||||
|
||||
t.Run("drop_collection_timestamp", func(t *testing.T) {
|
||||
w, err := newDropCollectionEventWriter(schemapb.DataType_INT64, 0)
|
||||
assert.Nil(t, err)
|
||||
w.SetStartTimestamp(tsoutil.ComposeTS(10, 0))
|
||||
w.SetEndTimestamp(tsoutil.ComposeTS(100, 0))
|
||||
err = w.AddDataToPayload([]int64{1, 2, 3})
|
||||
assert.Nil(t, err)
|
||||
err = w.AddDataToPayload([]int{4, 5, 6})
|
||||
assert.NotNil(t, err)
|
||||
err = w.AddDataToPayload([]int64{4, 5, 6})
|
||||
assert.Nil(t, err)
|
||||
err = w.Finish()
|
||||
assert.Nil(t, err)
|
||||
|
||||
var buf bytes.Buffer
|
||||
err = w.Write(&buf)
|
||||
assert.Nil(t, err)
|
||||
err = w.Close()
|
||||
assert.Nil(t, err)
|
||||
|
||||
wBuf := buf.Bytes()
|
||||
checkEventHeader(t, wBuf, DropCollectionEventType, ServerID, int32(len(wBuf)))
|
||||
st := UnsafeReadInt64(wBuf, binary.Size(eventHeader{}))
|
||||
assert.Equal(t, Timestamp(st), tsoutil.ComposeTS(10, 0))
|
||||
et := UnsafeReadInt64(wBuf, binary.Size(eventHeader{})+int(unsafe.Sizeof(st)))
|
||||
assert.Equal(t, Timestamp(et), tsoutil.ComposeTS(100, 0))
|
||||
|
||||
payloadOffset := binary.Size(eventHeader{}) + binary.Size(createCollectionEventData{})
|
||||
pBuf := wBuf[payloadOffset:]
|
||||
pR, err := NewPayloadReader(schemapb.DataType_INT64, pBuf)
|
||||
assert.Nil(t, err)
|
||||
vals, _, err := pR.GetDataFromPayload()
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, vals, []int64{1, 2, 3, 4, 5, 6})
|
||||
err = pR.Close()
|
||||
assert.Nil(t, err)
|
||||
|
||||
r, err := newEventReader(schemapb.DataType_INT64, bytes.NewBuffer(wBuf))
|
||||
assert.Nil(t, err)
|
||||
payload, _, err := r.GetDataFromPayload()
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, payload, []int64{1, 2, 3, 4, 5, 6})
|
||||
|
||||
err = r.Close()
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
|
||||
t.Run("drop_collection_string", func(t *testing.T) {
|
||||
w, err := newDropCollectionEventWriter(schemapb.DataType_STRING, 0)
|
||||
assert.Nil(t, err)
|
||||
w.SetStartTimestamp(tsoutil.ComposeTS(10, 0))
|
||||
w.SetEndTimestamp(tsoutil.ComposeTS(100, 0))
|
||||
err = w.AddDataToPayload("1234")
|
||||
assert.Nil(t, err)
|
||||
err = w.AddOneStringToPayload("567890")
|
||||
assert.Nil(t, err)
|
||||
err = w.AddOneStringToPayload("abcdefg")
|
||||
assert.Nil(t, err)
|
||||
err = w.AddDataToPayload([]int{1, 2, 3})
|
||||
assert.NotNil(t, err)
|
||||
err = w.Finish()
|
||||
assert.Nil(t, err)
|
||||
|
||||
var buf bytes.Buffer
|
||||
err = w.Write(&buf)
|
||||
assert.Nil(t, err)
|
||||
err = w.Close()
|
||||
assert.Nil(t, err)
|
||||
|
||||
wBuf := buf.Bytes()
|
||||
checkEventHeader(t, wBuf, DropCollectionEventType, ServerID, int32(len(wBuf)))
|
||||
st := UnsafeReadInt64(wBuf, binary.Size(eventHeader{}))
|
||||
assert.Equal(t, Timestamp(st), tsoutil.ComposeTS(10, 0))
|
||||
et := UnsafeReadInt64(wBuf, binary.Size(eventHeader{})+int(unsafe.Sizeof(st)))
|
||||
assert.Equal(t, Timestamp(et), tsoutil.ComposeTS(100, 0))
|
||||
|
||||
payloadOffset := binary.Size(eventHeader{}) + binary.Size(insertEventData{})
|
||||
pBuf := wBuf[payloadOffset:]
|
||||
pR, err := NewPayloadReader(schemapb.DataType_STRING, pBuf)
|
||||
assert.Nil(t, err)
|
||||
|
||||
s0, err := pR.GetOneStringFromPayload(0)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, s0, "1234")
|
||||
|
||||
s1, err := pR.GetOneStringFromPayload(1)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, s1, "567890")
|
||||
|
||||
s2, err := pR.GetOneStringFromPayload(2)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, s2, "abcdefg")
|
||||
|
||||
err = pR.Close()
|
||||
assert.Nil(t, err)
|
||||
|
||||
r, err := newEventReader(schemapb.DataType_STRING, bytes.NewBuffer(wBuf))
|
||||
assert.Nil(t, err)
|
||||
|
||||
s0, err = r.GetOneStringFromPayload(0)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, s0, "1234")
|
||||
|
||||
s1, err = r.GetOneStringFromPayload(1)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, s1, "567890")
|
||||
|
||||
s2, err = r.GetOneStringFromPayload(2)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, s2, "abcdefg")
|
||||
|
||||
err = r.Close()
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestCreatePartitionEvent(t *testing.T) {
|
||||
t.Run("create_event", func(t *testing.T) {
|
||||
w, err := newCreatePartitionEventWriter(schemapb.DataType_FLOAT, 0)
|
||||
assert.NotNil(t, err)
|
||||
assert.Nil(t, w)
|
||||
})
|
||||
|
||||
t.Run("create_partition_timestamp", func(t *testing.T) {
|
||||
w, err := newCreatePartitionEventWriter(schemapb.DataType_INT64, 0)
|
||||
assert.Nil(t, err)
|
||||
w.SetStartTimestamp(tsoutil.ComposeTS(10, 0))
|
||||
w.SetEndTimestamp(tsoutil.ComposeTS(100, 0))
|
||||
err = w.AddDataToPayload([]int64{1, 2, 3})
|
||||
assert.Nil(t, err)
|
||||
err = w.AddDataToPayload([]int{4, 5, 6})
|
||||
assert.NotNil(t, err)
|
||||
err = w.AddDataToPayload([]int64{4, 5, 6})
|
||||
assert.Nil(t, err)
|
||||
err = w.Finish()
|
||||
assert.Nil(t, err)
|
||||
|
||||
var buf bytes.Buffer
|
||||
err = w.Write(&buf)
|
||||
assert.Nil(t, err)
|
||||
err = w.Close()
|
||||
assert.Nil(t, err)
|
||||
|
||||
wBuf := buf.Bytes()
|
||||
checkEventHeader(t, wBuf, CreatePartitionEventType, ServerID, int32(len(wBuf)))
|
||||
st := UnsafeReadInt64(wBuf, binary.Size(eventHeader{}))
|
||||
assert.Equal(t, Timestamp(st), tsoutil.ComposeTS(10, 0))
|
||||
et := UnsafeReadInt64(wBuf, binary.Size(eventHeader{})+int(unsafe.Sizeof(st)))
|
||||
assert.Equal(t, Timestamp(et), tsoutil.ComposeTS(100, 0))
|
||||
|
||||
payloadOffset := binary.Size(eventHeader{}) + binary.Size(createCollectionEventData{})
|
||||
pBuf := wBuf[payloadOffset:]
|
||||
pR, err := NewPayloadReader(schemapb.DataType_INT64, pBuf)
|
||||
assert.Nil(t, err)
|
||||
vals, _, err := pR.GetDataFromPayload()
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, vals, []int64{1, 2, 3, 4, 5, 6})
|
||||
err = pR.Close()
|
||||
assert.Nil(t, err)
|
||||
|
||||
r, err := newEventReader(schemapb.DataType_INT64, bytes.NewBuffer(wBuf))
|
||||
assert.Nil(t, err)
|
||||
payload, _, err := r.GetDataFromPayload()
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, payload, []int64{1, 2, 3, 4, 5, 6})
|
||||
|
||||
err = r.Close()
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
|
||||
t.Run("create_partition_string", func(t *testing.T) {
|
||||
w, err := newCreatePartitionEventWriter(schemapb.DataType_STRING, 0)
|
||||
assert.Nil(t, err)
|
||||
w.SetStartTimestamp(tsoutil.ComposeTS(10, 0))
|
||||
w.SetEndTimestamp(tsoutil.ComposeTS(100, 0))
|
||||
err = w.AddDataToPayload("1234")
|
||||
assert.Nil(t, err)
|
||||
err = w.AddOneStringToPayload("567890")
|
||||
assert.Nil(t, err)
|
||||
err = w.AddOneStringToPayload("abcdefg")
|
||||
assert.Nil(t, err)
|
||||
err = w.AddDataToPayload([]int{1, 2, 3})
|
||||
assert.NotNil(t, err)
|
||||
err = w.Finish()
|
||||
assert.Nil(t, err)
|
||||
|
||||
var buf bytes.Buffer
|
||||
err = w.Write(&buf)
|
||||
assert.Nil(t, err)
|
||||
err = w.Close()
|
||||
assert.Nil(t, err)
|
||||
|
||||
wBuf := buf.Bytes()
|
||||
checkEventHeader(t, wBuf, CreatePartitionEventType, ServerID, int32(len(wBuf)))
|
||||
st := UnsafeReadInt64(wBuf, binary.Size(eventHeader{}))
|
||||
assert.Equal(t, Timestamp(st), tsoutil.ComposeTS(10, 0))
|
||||
et := UnsafeReadInt64(wBuf, binary.Size(eventHeader{})+int(unsafe.Sizeof(st)))
|
||||
assert.Equal(t, Timestamp(et), tsoutil.ComposeTS(100, 0))
|
||||
|
||||
payloadOffset := binary.Size(eventHeader{}) + binary.Size(insertEventData{})
|
||||
pBuf := wBuf[payloadOffset:]
|
||||
pR, err := NewPayloadReader(schemapb.DataType_STRING, pBuf)
|
||||
assert.Nil(t, err)
|
||||
|
||||
s0, err := pR.GetOneStringFromPayload(0)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, s0, "1234")
|
||||
|
||||
s1, err := pR.GetOneStringFromPayload(1)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, s1, "567890")
|
||||
|
||||
s2, err := pR.GetOneStringFromPayload(2)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, s2, "abcdefg")
|
||||
|
||||
err = pR.Close()
|
||||
assert.Nil(t, err)
|
||||
|
||||
r, err := newEventReader(schemapb.DataType_STRING, bytes.NewBuffer(wBuf))
|
||||
assert.Nil(t, err)
|
||||
|
||||
s0, err = r.GetOneStringFromPayload(0)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, s0, "1234")
|
||||
|
||||
s1, err = r.GetOneStringFromPayload(1)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, s1, "567890")
|
||||
|
||||
s2, err = r.GetOneStringFromPayload(2)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, s2, "abcdefg")
|
||||
|
||||
err = r.Close()
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestDropPartitionEvent(t *testing.T) {
|
||||
t.Run("drop_event", func(t *testing.T) {
|
||||
w, err := newDropPartitionEventWriter(schemapb.DataType_FLOAT, 0)
|
||||
assert.NotNil(t, err)
|
||||
assert.Nil(t, w)
|
||||
})
|
||||
|
||||
t.Run("drop_partition_timestamp", func(t *testing.T) {
|
||||
w, err := newDropPartitionEventWriter(schemapb.DataType_INT64, 0)
|
||||
assert.Nil(t, err)
|
||||
w.SetStartTimestamp(tsoutil.ComposeTS(10, 0))
|
||||
w.SetEndTimestamp(tsoutil.ComposeTS(100, 0))
|
||||
err = w.AddDataToPayload([]int64{1, 2, 3})
|
||||
assert.Nil(t, err)
|
||||
err = w.AddDataToPayload([]int{4, 5, 6})
|
||||
assert.NotNil(t, err)
|
||||
err = w.AddDataToPayload([]int64{4, 5, 6})
|
||||
assert.Nil(t, err)
|
||||
err = w.Finish()
|
||||
assert.Nil(t, err)
|
||||
|
||||
var buf bytes.Buffer
|
||||
err = w.Write(&buf)
|
||||
assert.Nil(t, err)
|
||||
err = w.Close()
|
||||
assert.Nil(t, err)
|
||||
|
||||
wBuf := buf.Bytes()
|
||||
checkEventHeader(t, wBuf, DropPartitionEventType, ServerID, int32(len(wBuf)))
|
||||
st := UnsafeReadInt64(wBuf, binary.Size(eventHeader{}))
|
||||
assert.Equal(t, Timestamp(st), tsoutil.ComposeTS(10, 0))
|
||||
et := UnsafeReadInt64(wBuf, binary.Size(eventHeader{})+int(unsafe.Sizeof(st)))
|
||||
assert.Equal(t, Timestamp(et), tsoutil.ComposeTS(100, 0))
|
||||
|
||||
payloadOffset := binary.Size(eventHeader{}) + binary.Size(createCollectionEventData{})
|
||||
pBuf := wBuf[payloadOffset:]
|
||||
pR, err := NewPayloadReader(schemapb.DataType_INT64, pBuf)
|
||||
assert.Nil(t, err)
|
||||
vals, _, err := pR.GetDataFromPayload()
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, vals, []int64{1, 2, 3, 4, 5, 6})
|
||||
err = pR.Close()
|
||||
assert.Nil(t, err)
|
||||
|
||||
r, err := newEventReader(schemapb.DataType_INT64, bytes.NewBuffer(wBuf))
|
||||
assert.Nil(t, err)
|
||||
payload, _, err := r.GetDataFromPayload()
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, payload, []int64{1, 2, 3, 4, 5, 6})
|
||||
|
||||
err = r.Close()
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
|
||||
t.Run("drop_partition_string", func(t *testing.T) {
|
||||
w, err := newDropPartitionEventWriter(schemapb.DataType_STRING, 0)
|
||||
assert.Nil(t, err)
|
||||
w.SetStartTimestamp(tsoutil.ComposeTS(10, 0))
|
||||
w.SetEndTimestamp(tsoutil.ComposeTS(100, 0))
|
||||
err = w.AddDataToPayload("1234")
|
||||
assert.Nil(t, err)
|
||||
err = w.AddOneStringToPayload("567890")
|
||||
assert.Nil(t, err)
|
||||
err = w.AddOneStringToPayload("abcdefg")
|
||||
assert.Nil(t, err)
|
||||
err = w.AddDataToPayload([]int{1, 2, 3})
|
||||
assert.NotNil(t, err)
|
||||
err = w.Finish()
|
||||
assert.Nil(t, err)
|
||||
|
||||
var buf bytes.Buffer
|
||||
err = w.Write(&buf)
|
||||
assert.Nil(t, err)
|
||||
err = w.Close()
|
||||
assert.Nil(t, err)
|
||||
|
||||
wBuf := buf.Bytes()
|
||||
checkEventHeader(t, wBuf, DropPartitionEventType, ServerID, int32(len(wBuf)))
|
||||
st := UnsafeReadInt64(wBuf, binary.Size(eventHeader{}))
|
||||
assert.Equal(t, Timestamp(st), tsoutil.ComposeTS(10, 0))
|
||||
et := UnsafeReadInt64(wBuf, binary.Size(eventHeader{})+int(unsafe.Sizeof(st)))
|
||||
assert.Equal(t, Timestamp(et), tsoutil.ComposeTS(100, 0))
|
||||
|
||||
payloadOffset := binary.Size(eventHeader{}) + binary.Size(insertEventData{})
|
||||
pBuf := wBuf[payloadOffset:]
|
||||
pR, err := NewPayloadReader(schemapb.DataType_STRING, pBuf)
|
||||
assert.Nil(t, err)
|
||||
|
||||
s0, err := pR.GetOneStringFromPayload(0)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, s0, "1234")
|
||||
|
||||
s1, err := pR.GetOneStringFromPayload(1)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, s1, "567890")
|
||||
|
||||
s2, err := pR.GetOneStringFromPayload(2)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, s2, "abcdefg")
|
||||
|
||||
err = pR.Close()
|
||||
assert.Nil(t, err)
|
||||
|
||||
r, err := newEventReader(schemapb.DataType_STRING, bytes.NewBuffer(wBuf))
|
||||
assert.Nil(t, err)
|
||||
|
||||
s0, err = r.GetOneStringFromPayload(0)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, s0, "1234")
|
||||
|
||||
s1, err = r.GetOneStringFromPayload(1)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, s1, "567890")
|
||||
|
||||
s2, err = r.GetOneStringFromPayload(2)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, s2, "abcdefg")
|
||||
|
||||
err = r.Close()
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
@ -5,6 +5,7 @@ import (
|
||||
"encoding/binary"
|
||||
"io"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/errors"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
|
||||
)
|
||||
|
||||
@ -179,6 +180,9 @@ func newDescriptorEvent() (*descriptorEvent, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
header.EventLength = header.GetMemoryUsageInBytes() + data.GetMemoryUsageInBytes()
|
||||
header.NextPosition = int32(binary.Size(MagicNumber)) + header.EventLength
|
||||
data.HeaderLength = int8(binary.Size(eventHeader{}))
|
||||
return &descriptorEvent{
|
||||
descriptorEventHeader: *header,
|
||||
descriptorEventData: *data,
|
||||
@ -242,6 +246,10 @@ func newDeleteEventWriter(dataType schemapb.DataType, offset int32) (*deleteEven
|
||||
return writer, nil
|
||||
}
|
||||
func newCreateCollectionEventWriter(dataType schemapb.DataType, offset int32) (*createCollectionEventWriter, error) {
|
||||
if dataType != schemapb.DataType_STRING && dataType != schemapb.DataType_INT64 {
|
||||
return nil, errors.New("incorrect data type")
|
||||
}
|
||||
|
||||
payloadWriter, err := NewPayloadWriter(dataType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -270,6 +278,10 @@ func newCreateCollectionEventWriter(dataType schemapb.DataType, offset int32) (*
|
||||
return writer, nil
|
||||
}
|
||||
func newDropCollectionEventWriter(dataType schemapb.DataType, offset int32) (*dropCollectionEventWriter, error) {
|
||||
if dataType != schemapb.DataType_STRING && dataType != schemapb.DataType_INT64 {
|
||||
return nil, errors.New("incorrect data type")
|
||||
}
|
||||
|
||||
payloadWriter, err := NewPayloadWriter(dataType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -297,6 +309,10 @@ func newDropCollectionEventWriter(dataType schemapb.DataType, offset int32) (*dr
|
||||
return writer, nil
|
||||
}
|
||||
func newCreatePartitionEventWriter(dataType schemapb.DataType, offset int32) (*createPartitionEventWriter, error) {
|
||||
if dataType != schemapb.DataType_STRING && dataType != schemapb.DataType_INT64 {
|
||||
return nil, errors.New("incorrect data type")
|
||||
}
|
||||
|
||||
payloadWriter, err := NewPayloadWriter(dataType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -325,6 +341,10 @@ func newCreatePartitionEventWriter(dataType schemapb.DataType, offset int32) (*c
|
||||
return writer, nil
|
||||
}
|
||||
func newDropPartitionEventWriter(dataType schemapb.DataType, offset int32) (*dropPartitionEventWriter, error) {
|
||||
if dataType != schemapb.DataType_STRING && dataType != schemapb.DataType_INT64 {
|
||||
return nil, errors.New("incorrect data type")
|
||||
}
|
||||
|
||||
payloadWriter, err := NewPayloadWriter(dataType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user