From ad36347fb3de0718eee009b2f3bacab3137c84b5 Mon Sep 17 00:00:00 2001 From: SimFG Date: Sat, 22 Feb 2025 12:29:53 +0800 Subject: [PATCH] fix: add BeginTimestamp and EndTimestamp to insert and upsert messages (#40110) - issue: #40109 - caused by: #38656 Signed-off-by: SimFG --- internal/proxy/task_insert.go | 2 ++ internal/proxy/task_upsert.go | 2 ++ 2 files changed, 4 insertions(+) diff --git a/internal/proxy/task_insert.go b/internal/proxy/task_insert.go index cf400b7d06..1fcfc77460 100644 --- a/internal/proxy/task_insert.go +++ b/internal/proxy/task_insert.go @@ -287,6 +287,8 @@ func (it *insertTask) Execute(ctx context.Context) error { return err } it.insertMsg.CollectionID = collID + it.insertMsg.BeginTimestamp = it.BeginTs() + it.insertMsg.EndTimestamp = it.EndTs() getCacheDur := tr.RecordSpan() stream, err := it.chMgr.getOrCreateDmlStream(ctx, collID) diff --git a/internal/proxy/task_upsert.go b/internal/proxy/task_upsert.go index a23a35a1a3..76670dcae5 100644 --- a/internal/proxy/task_upsert.go +++ b/internal/proxy/task_upsert.go @@ -436,6 +436,8 @@ func (it *upsertTask) insertExecute(ctx context.Context, msgPack *msgstream.MsgP return err } it.upsertMsg.InsertMsg.CollectionID = collID + it.upsertMsg.InsertMsg.BeginTimestamp = it.BeginTs() + it.upsertMsg.InsertMsg.EndTimestamp = it.EndTs() log := log.Ctx(ctx).With( zap.Int64("collectionID", collID)) getCacheDur := tr.RecordSpan()