diff --git a/internal/streamingnode/server/wal/interceptors/shard/shard_interceptor.go b/internal/streamingnode/server/wal/interceptors/shard/shard_interceptor.go index 01ce0f3277..3b9fe9de7a 100644 --- a/internal/streamingnode/server/wal/interceptors/shard/shard_interceptor.go +++ b/internal/streamingnode/server/wal/interceptors/shard/shard_interceptor.go @@ -16,6 +16,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/proto/messagespb" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message" + "github.com/milvus-io/milvus/pkg/v2/streaming/util/message/messageutil" "github.com/milvus-io/milvus/pkg/v2/util/funcutil" ) @@ -251,10 +252,15 @@ func (impl *shardInterceptor) handleSchemaChange(ctx context.Context, msg messag func (impl *shardInterceptor) handleAlterCollection(ctx context.Context, msg message.MutableMessage, appendOp interceptors.Append) (message.MessageID, error) { putCollectionMsg := message.MustAsMutableAlterCollectionMessageV2(msg) header := putCollectionMsg.Header() - segmentIDs, err := impl.shardManager.FlushAndFenceSegmentAllocUntil(header.GetCollectionId(), msg.TimeTick()) - if err != nil { - return nil, status.NewUnrecoverableError(err.Error()) + var segmentIDs []int64 + var err error + if messageutil.IsSchemaChange(header) { + segmentIDs, err = impl.shardManager.FlushAndFenceSegmentAllocUntil(header.GetCollectionId(), msg.TimeTick()) + if err != nil { + return nil, status.NewUnrecoverableError(err.Error()) + } } + header.FlushedSegmentIds = segmentIDs putCollectionMsg.OverwriteHeader(header) return appendOp(ctx, msg)