Zhen Ye 8ff118a9ff
fix: call IntoMessageProto instead of Payload when rpc (#43678)
issue: #43677

Signed-off-by: chyezh <chyezh@outlook.com>
2025-08-06 14:45:40 +08:00

91 lines
1.8 KiB
Go

//go:build test
// +build test
package walimplstest
import (
"context"
"encoding/json"
"sync"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/util/syncutil"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
var logs = typeutil.NewConcurrentMap[string, *messageLog]()
func getOrCreateLogs(name string) *messageLog {
l := newMessageLog()
l, _ = logs.GetOrInsert(name, l)
return l
}
func newMessageLog() *messageLog {
return &messageLog{
cond: syncutil.NewContextCond(&sync.Mutex{}),
id: 0,
logs: make([][]byte, 0),
}
}
type messageLog struct {
cond *syncutil.ContextCond
id int64
logs [][]byte
}
type entry struct {
ID int64
Payload []byte
Properties map[string]string
}
func (l *messageLog) Append(_ context.Context, msg message.MutableMessage) (message.MessageID, error) {
l.cond.LockAndBroadcast()
defer l.cond.L.Unlock()
id := l.id
pb := msg.IntoMessageProto()
newEntry := entry{
ID: id,
Payload: pb.Payload,
Properties: pb.Properties,
}
data, err := json.Marshal(newEntry)
if err != nil {
return nil, err
}
l.id++
l.logs = append(l.logs, data)
return NewTestMessageID(id), nil
}
func (l *messageLog) ReadAt(ctx context.Context, idx int) (message.ImmutableMessage, error) {
l.cond.L.Lock()
for idx >= len(l.logs) {
if err := l.cond.Wait(ctx); err != nil {
return nil, err
}
}
defer l.cond.L.Unlock()
data := l.logs[idx]
var newEntry entry
if err := json.Unmarshal(data, &newEntry); err != nil {
return nil, err
}
return message.NewImmutableMesasge(
NewTestMessageID(newEntry.ID),
newEntry.Payload,
newEntry.Properties,
), nil
}
func (l *messageLog) Len() int64 {
l.cond.L.Lock()
defer l.cond.L.Unlock()
return int64(len(l.logs))
}