zhenshan.cao 2025-11-04 16:51:33 +08:00 committed by GitHub
parent 9e4975bdfa
commit 6327c9a514
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
45 changed files with 2362 additions and 1117 deletions

View File

@ -458,6 +458,7 @@ IsPrimitiveType(proto::schema::DataType type) {
case proto::schema::DataType::Double:
case proto::schema::DataType::String:
case proto::schema::DataType::VarChar:
case proto::schema::DataType::Timestamptz:
return true;
default:
return false;
@ -1045,6 +1046,9 @@ struct fmt::formatter<milvus::proto::schema::DataType>
case milvus::proto::schema::DataType::Geometry:
name = "Geometry";
break;
case milvus::proto::schema::DataType::Timestamptz:
name = "Timestamptz";
break;
case milvus::proto::schema::DataType::Text:
name = "Text";
break;

View File

@ -60,6 +60,10 @@ PhyNullExpr::Eval(EvalCtx& context, VectorPtr& result) {
result = ExecVisitorImpl<double>(input);
break;
}
case DataType::TIMESTAMPTZ: {
result = ExecVisitorImpl<int64_t>(input);
break;
}
case DataType::VARCHAR: {
if (segment_->type() == SegmentType::Growing &&
!storage::MmapManager::GetInstance()

View File

@ -58,7 +58,7 @@ PhyTimestamptzArithCompareExpr::ExecCompareVisitorImplForAll(
zeroRightOperand.set_int64_val(0);
auto helperExpr =
std::make_shared<milvus::expr::BinaryArithOpEvalRangeExpr>(
expr_->timestamp_column_,
expr_->column_,
expr_->compare_op_,
proto::plan::ArithOpType::Add,
expr_->compare_value_,

View File

@ -37,8 +37,8 @@ class PhyTimestamptzArithCompareExpr : public SegmentExpr {
name,
op_ctx,
segment,
expr->timestamp_column_.field_id_,
expr->timestamp_column_.nested_path_,
expr->column_.field_id_,
expr->column_.nested_path_,
DataType::TIMESTAMPTZ,
active_count,
batch_size,
@ -57,6 +57,11 @@ class PhyTimestamptzArithCompareExpr : public SegmentExpr {
return true;
}
// Returns a copy of the column descriptor (`column_`) held by the logical
// TimestamptzArithCompareExpr that this physical expression wraps.
// The result is always engaged; the optional is only the declared return
// type of the overridden method.
std::optional<milvus::expr::ColumnInfo>
GetColumnInfo() const override {
    const auto& column = expr_->column_;
    return column;
}
private:
template <typename T>
VectorPtr

View File

@ -109,7 +109,7 @@ IsMaterializedViewSupported(const DataType& data_type) {
data_type == DataType::INT16 || data_type == DataType::INT32 ||
data_type == DataType::INT64 || data_type == DataType::FLOAT ||
data_type == DataType::DOUBLE || data_type == DataType::VARCHAR ||
data_type == DataType::STRING;
data_type == DataType::TIMESTAMPTZ || data_type == DataType::STRING;
}
struct ColumnInfo {
@ -686,7 +686,7 @@ class TimestamptzArithCompareExpr : public ITypeFilterExpr {
const proto::plan::Interval& interval,
const proto::plan::OpType compare_op,
const proto::plan::GenericValue& compare_value)
: timestamp_column_(timestamp_column),
: column_(timestamp_column),
arith_op_(arith_op),
interval_(interval),
compare_op_(compare_op),
@ -696,8 +696,7 @@ class TimestamptzArithCompareExpr : public ITypeFilterExpr {
std::string
ToString() const override {
std::stringstream ss;
ss << "TimestamptzArithCompareExpr:[Column: "
<< timestamp_column_.ToString()
ss << "TimestamptzArithCompareExpr:[Column: " << column_.ToString()
<< ", ArithOp: " << milvus::proto::plan::ArithOpType_Name(arith_op_)
<< ", Interval: " << interval_.ShortDebugString()
<< ", CompareOp: " << milvus::proto::plan::OpType_Name(compare_op_)
@ -706,7 +705,7 @@ class TimestamptzArithCompareExpr : public ITypeFilterExpr {
}
public:
const ColumnInfo timestamp_column_;
const ColumnInfo column_;
const proto::plan::ArithOpType arith_op_;
const proto::plan::Interval interval_;
const proto::plan::OpType compare_op_;

View File

@ -434,6 +434,11 @@ class ChunkedColumn : public ChunkedColumnBase {
op_ctx, dst, offsets, count);
break;
}
case DataType::TIMESTAMPTZ: {
BulkPrimitiveValueAtImpl<int64_t, int64_t>(
op_ctx, dst, offsets, count);
break;
}
case DataType::FLOAT: {
BulkPrimitiveValueAtImpl<float, float>(
op_ctx, dst, offsets, count);

View File

@ -469,6 +469,11 @@ class ProxyChunkColumn : public ChunkedColumnInterface {
op_ctx, dst, offsets, count);
break;
}
case DataType::TIMESTAMPTZ: {
BulkPrimitiveValueAtImpl<int64_t, int64_t>(
op_ctx, dst, offsets, count);
break;
}
case DataType::FLOAT: {
BulkPrimitiveValueAtImpl<float, float>(
op_ctx, dst, offsets, count);

View File

@ -205,7 +205,8 @@ class ChunkedColumnInterface {
return data_type == DataType::INT8 || data_type == DataType::INT16 ||
data_type == DataType::INT32 || data_type == DataType::INT64 ||
data_type == DataType::FLOAT || data_type == DataType::DOUBLE ||
data_type == DataType::BOOL;
data_type == DataType::BOOL ||
data_type == DataType::TIMESTAMPTZ;
}
static bool

View File

@ -81,6 +81,9 @@ DefaultValueChunkTranslator::estimated_byte_size_of_cell(
case milvus::DataType::INT64:
value_size = sizeof(int64_t);
break;
case milvus::DataType::TIMESTAMPTZ:
value_size = sizeof(int64_t);
break;
case milvus::DataType::FLOAT:
value_size = sizeof(float);
break;

View File

@ -118,7 +118,7 @@ func MarshalFieldModel(field *Field) *schemapb.FieldSchema {
IsPartitionKey: field.IsPartitionKey,
IsClusteringKey: field.IsClusteringKey,
IsFunctionOutput: field.IsFunctionOutput,
DefaultValue: field.DefaultValue,
DefaultValue: proto.Clone(field.DefaultValue).(*schemapb.ValueField),
ElementType: field.ElementType,
Nullable: field.Nullable,
}

View File

@ -1,7 +1,8 @@
grammar Plan;
expr:
Identifier (op1=(ADD | SUB) INTERVAL interval_string=StringLiteral)? op2=(LT | LE | GT | GE | EQ | NE) ISO compare_string=StringLiteral # TimestamptzCompare
Identifier (op1=(ADD | SUB) INTERVAL interval_string=StringLiteral)? op2=(LT | LE | GT | GE | EQ | NE) ISO compare_string=StringLiteral # TimestamptzCompareForward
| ISO compare_string=StringLiteral op2=(LT | LE | GT | GE | EQ | NE) Identifier (op1=(ADD | SUB) INTERVAL interval_string=StringLiteral)? # TimestamptzCompareReverse
| IntegerConstant # Integer
| FloatingConstant # Floating
| BooleanConstant # Boolean

File diff suppressed because one or more lines are too long

View File

@ -71,6 +71,10 @@ func (v *BasePlanVisitor) VisitBoolean(ctx *BooleanContext) interface{} {
return v.VisitChildren(ctx)
}
func (v *BasePlanVisitor) VisitTimestamptzCompareReverse(ctx *TimestamptzCompareReverseContext) interface{} {
return v.VisitChildren(ctx)
}
func (v *BasePlanVisitor) VisitSTDWithin(ctx *STDWithinContext) interface{} {
return v.VisitChildren(ctx)
}
@ -79,6 +83,10 @@ func (v *BasePlanVisitor) VisitShift(ctx *ShiftContext) interface{} {
return v.VisitChildren(ctx)
}
func (v *BasePlanVisitor) VisitTimestamptzCompareForward(ctx *TimestamptzCompareForwardContext) interface{} {
return v.VisitChildren(ctx)
}
func (v *BasePlanVisitor) VisitCall(ctx *CallContext) interface{} {
return v.VisitChildren(ctx)
}
@ -147,10 +155,6 @@ func (v *BasePlanVisitor) VisitUnary(ctx *UnaryContext) interface{} {
return v.VisitChildren(ctx)
}
func (v *BasePlanVisitor) VisitTimestamptzCompare(ctx *TimestamptzCompareContext) interface{} {
return v.VisitChildren(ctx)
}
func (v *BasePlanVisitor) VisitInteger(ctx *IntegerContext) interface{} {
return v.VisitChildren(ctx)
}

File diff suppressed because it is too large Load Diff

View File

@ -55,12 +55,18 @@ type PlanVisitor interface {
// Visit a parse tree produced by PlanParser#Boolean.
VisitBoolean(ctx *BooleanContext) interface{}
// Visit a parse tree produced by PlanParser#TimestamptzCompareReverse.
VisitTimestamptzCompareReverse(ctx *TimestamptzCompareReverseContext) interface{}
// Visit a parse tree produced by PlanParser#STDWithin.
VisitSTDWithin(ctx *STDWithinContext) interface{}
// Visit a parse tree produced by PlanParser#Shift.
VisitShift(ctx *ShiftContext) interface{}
// Visit a parse tree produced by PlanParser#TimestamptzCompareForward.
VisitTimestamptzCompareForward(ctx *TimestamptzCompareForwardContext) interface{}
// Visit a parse tree produced by PlanParser#Call.
VisitCall(ctx *CallContext) interface{}
@ -112,9 +118,6 @@ type PlanVisitor interface {
// Visit a parse tree produced by PlanParser#Unary.
VisitUnary(ctx *UnaryContext) interface{}
// Visit a parse tree produced by PlanParser#TimestamptzCompare.
VisitTimestamptzCompare(ctx *TimestamptzCompareContext) interface{}
// Visit a parse tree produced by PlanParser#Integer.
VisitInteger(ctx *IntegerContext) interface{}

View File

@ -12,11 +12,12 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
parser "github.com/milvus-io/milvus/internal/parser/planparserv2/generated"
"github.com/milvus-io/milvus/pkg/v2/proto/planpb"
"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
type ParserVisitorArgs struct {
TimezonePreference []string
Timezone string
}
type ParserVisitor struct {
@ -1920,7 +1921,7 @@ func (v *ParserVisitor) VisitSTDWithin(ctx *parser.STDWithinContext) interface{}
}
}
func (v *ParserVisitor) VisitTimestamptzCompare(ctx *parser.TimestamptzCompareContext) interface{} {
func (v *ParserVisitor) VisitTimestamptzCompareForward(ctx *parser.TimestamptzCompareForwardContext) interface{} {
colExpr, err := v.translateIdentifier(ctx.Identifier().GetText())
identifier := ctx.Identifier().Accept(v)
if err != nil {
@ -1952,7 +1953,7 @@ func (v *ParserVisitor) VisitTimestamptzCompare(ctx *parser.TimestamptzCompareCo
compareOp := cmpOpMap[ctx.GetOp2().GetTokenType()]
timestamptzInt64, err := parseISOWithTimezone(unquotedCompareStr, v.args.TimezonePreference)
timestamptzInt64, err := funcutil.ValidateAndReturnUnixMicroTz(unquotedCompareStr, v.args.Timezone)
if err != nil {
return err
}
@ -1976,3 +1977,86 @@ func (v *ParserVisitor) VisitTimestamptzCompare(ctx *parser.TimestamptzCompareCo
dataType: schemapb.DataType_Bool,
}
}
// VisitTimestamptzCompareReverse builds the plan expression for the reversed
// timestamptz comparison form, in which the ISO literal is written first:
//
//	ISO '<timestamp>' <cmp> <identifier> [(+|-) INTERVAL '<duration>']
//
// The identifier must resolve to a Timestamptz field. The comparison operator
// is mirrored with reverseCompareOp so the predicate can be stored with the
// column (plus optional interval arithmetic) on the left and the literal on
// the right. The literal is converted with funcutil.ValidateAndReturnUnixMicroTz,
// passing the visitor's configured timezone.
//
// Returns an *ExprWithType of Bool type on success; on any failure the error
// itself is returned as the visitor result.
func (v *ParserVisitor) VisitTimestamptzCompareReverse(ctx *parser.TimestamptzCompareReverseContext) interface{} {
	// Resolve the column and make sure it really is a timestamptz field.
	identifier := ctx.Identifier().GetText()
	colExpr, err := v.translateIdentifier(identifier)
	if err != nil {
		return fmt.Errorf("can not translate identifier: %s", identifier)
	}
	if colExpr.dataType != schemapb.DataType_Timestamptz {
		return fmt.Errorf("field '%s' is not a timestamptz datatype", identifier)
	}

	// Optional "+/- INTERVAL '<duration>'" applied to the column. When it is
	// absent the arithmetic op stays Unknown and the interval stays empty.
	arithOp := planpb.ArithOpType_Unknown
	interval := &planpb.Interval{}
	if ctx.GetOp1() != nil {
		arithOp = arithExprMap[ctx.GetOp1().GetTokenType()]
		rawIntervalStr := ctx.GetInterval_string().GetText()
		unquotedIntervalStr, err := convertEscapeSingle(rawIntervalStr)
		if err != nil {
			return fmt.Errorf("can not convert interval string: %s", rawIntervalStr)
		}
		interval, err = parseISODuration(unquotedIntervalStr)
		if err != nil {
			return err
		}
	}

	rawCompareStr := ctx.GetCompare_string().GetText()
	unquotedCompareStr, err := convertEscapeSingle(rawCompareStr)
	if err != nil {
		return fmt.Errorf("can not convert compare string: %s", rawCompareStr)
	}

	// The literal is on the left in the source text, so the operator has to be
	// mirrored. A token missing from cmpOpMap yields the map's zero value;
	// reject every Invalid result instead of letting it reach the plan.
	compareOp := reverseCompareOp(cmpOpMap[ctx.GetOp2().GetTokenType()])
	if compareOp == planpb.OpType_Invalid {
		return fmt.Errorf("unsupported comparison operator for reverse Timestamptz: %s", ctx.GetOp2().GetText())
	}

	timestamptzInt64, err := funcutil.ValidateAndReturnUnixMicroTz(unquotedCompareStr, v.args.Timezone)
	if err != nil {
		return err
	}

	newExpr := &planpb.Expr{
		Expr: &planpb.Expr_TimestamptzArithCompareExpr{
			TimestamptzArithCompareExpr: &planpb.TimestamptzArithCompareExpr{
				TimestamptzColumn: toColumnInfo(colExpr),
				ArithOp:           arithOp,
				Interval:          interval,
				CompareOp:         compareOp,
				CompareValue: &planpb.GenericValue{
					Val: &planpb.GenericValue_Int64Val{Int64Val: timestamptzInt64},
				},
			},
		},
	}
	return &ExprWithType{
		expr:     newExpr,
		dataType: schemapb.DataType_Bool,
	}
}
// reverseCompareOp returns the operator that keeps a comparison's meaning when
// its two operands are swapped (e.g. "a < b" becomes "b > a"). Equality and
// inequality are symmetric and map to themselves. Any operator outside the six
// listed below yields planpb.OpType_Invalid.
func reverseCompareOp(op planpb.OpType) planpb.OpType {
	switch op {
	case planpb.OpType_Equal, planpb.OpType_NotEqual:
		// Symmetric operators are unaffected by swapping operands.
		return op
	case planpb.OpType_LessThan:
		return planpb.OpType_GreaterThan
	case planpb.OpType_GreaterThan:
		return planpb.OpType_LessThan
	case planpb.OpType_LessEqual:
		return planpb.OpType_GreaterEqual
	case planpb.OpType_GreaterEqual:
		return planpb.OpType_LessEqual
	}
	return planpb.OpType_Invalid
}

View File

@ -5,7 +5,6 @@ import (
"regexp"
"strconv"
"strings"
"time"
"unicode"
"github.com/cockroachdb/errors"
@ -173,6 +172,11 @@ func getTargetType(lDataType, rDataType schemapb.DataType) (schemapb.DataType, e
return schemapb.DataType_Geometry, nil
}
}
if typeutil.IsTimestamptzType(lDataType) {
if typeutil.IsTimestamptzType(rDataType) {
return schemapb.DataType_Timestamptz, nil
}
}
if typeutil.IsFloatingType(lDataType) {
if typeutil.IsJSONType(rDataType) || typeutil.IsArithmetic(rDataType) {
return schemapb.DataType_Double, nil
@ -241,6 +245,9 @@ func castValue(dataType schemapb.DataType, value *planpb.GenericValue) (*planpb.
if typeutil.IsStringType(dataType) && IsString(value) {
return value, nil
}
if typeutil.IsTimestamptzType(dataType) {
return value, nil
}
if typeutil.IsBoolType(dataType) && IsBool(value) {
return value, nil
@ -630,7 +637,8 @@ func canArithmetic(left, leftElement, right, rightElement schemapb.DataType, rev
func canConvertToIntegerType(dataType, elementType schemapb.DataType) bool {
return typeutil.IsIntegerType(dataType) || typeutil.IsJSONType(dataType) ||
(typeutil.IsArrayType(dataType) && typeutil.IsIntegerType(elementType))
(typeutil.IsArrayType(dataType) && typeutil.IsIntegerType(elementType)) ||
typeutil.IsTimestamptzType(dataType)
}
func isIntegerColumn(col *planpb.ColumnInfo) bool {
@ -864,31 +872,3 @@ func parseISODuration(durationStr string) (*planpb.Interval, error) {
return interval, nil
}
func parseISOWithTimezone(isoString string, preferredZones []string) (int64, error) {
timeZoneOffsetRegex := regexp.MustCompile(`([+-]\d{2}:\d{2}|Z)$`)
// layout for timestamp string without timezone
const layoutForNaiveTime = "2025-01-02T15:04:05.999999999"
if timeZoneOffsetRegex.MatchString(isoString) { // has timezone
t, err := time.Parse(time.RFC3339Nano, isoString)
if err != nil {
return 0, fmt.Errorf("failed to parse timezone-aware string '%s': %w", isoString, err)
}
return t.UnixMicro(), nil
}
for _, zoneName := range preferredZones {
loc, err := time.LoadLocation(zoneName)
if err != nil {
continue
}
t, err := time.ParseInLocation(layoutForNaiveTime, isoString, loc)
if err == nil {
return t.UnixMicro(), nil
}
}
t, err := time.ParseInLocation(layoutForNaiveTime, isoString, time.UTC)
if err != nil {
return 0, fmt.Errorf("failed to parse naive time string '%s' even with UTC fallback: %w", isoString, err)
}
return t.UnixMicro(), nil
}

View File

@ -633,14 +633,6 @@ func parseRankParams(rankParamsPair []*commonpb.KeyValuePair, schema *schemapb.C
}, nil
}
func parseTimezone(params []*commonpb.KeyValuePair) string {
timezone, err := funcutil.GetAttrByKeyFromRepeatedKV(TimezoneKey, params)
if err != nil {
return ""
}
return timezone
}
func parseTimeFields(params []*commonpb.KeyValuePair) []string {
timeFields, err := funcutil.GetAttrByKeyFromRepeatedKV(TimefieldsKey, params)
if err != nil {

View File

@ -73,7 +73,6 @@ const (
OffsetKey = "offset"
LimitKey = "limit"
// key for timestamptz translation
TimezoneKey = "timezone"
TimefieldsKey = "time_fields"
SearchIterV2Key = "search_iter_v2"
@ -417,6 +416,12 @@ func (t *createCollectionTask) PreExecute(ctx context.Context) error {
return err
}
// Validate timezone
tz, exist := funcutil.TryGetAttrByKeyFromRepeatedKV(common.TimezoneKey, t.GetProperties())
if exist && !funcutil.IsTimezoneValid(tz) {
return merr.WrapErrParameterInvalidMsg("unknown or invalid IANA Time Zone ID: %s", tz)
}
// validate clustering key
if err := t.validateClusteringKey(ctx); err != nil {
return err
@ -1203,9 +1208,9 @@ func (t *alterCollectionTask) PreExecute(ctx context.Context) error {
}
}
// Check the validation of timezone
err := checkTimezone(t.Properties...)
if err != nil {
return err
userDefinedTimezone, exist := funcutil.TryGetAttrByKeyFromRepeatedKV(common.TimezoneKey, t.Properties)
if exist && !funcutil.IsTimezoneValid(userDefinedTimezone) {
return merr.WrapErrParameterInvalidMsg("unknown or invalid IANA Time Zone ID: %s", userDefinedTimezone)
}
} else if len(t.GetDeleteKeys()) > 0 {
key := hasPropInDeletekeys(t.DeleteKeys)

View File

@ -13,6 +13,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb"
"github.com/milvus-io/milvus/pkg/v2/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
@ -68,7 +69,15 @@ func (cdt *createDatabaseTask) OnEnqueue() error {
}
func (cdt *createDatabaseTask) PreExecute(ctx context.Context) error {
return ValidateDatabaseName(cdt.GetDbName())
err := ValidateDatabaseName(cdt.GetDbName())
if err != nil {
return err
}
tz, exist := funcutil.TryGetAttrByKeyFromRepeatedKV(common.TimezoneKey, cdt.GetProperties())
if exist && !funcutil.IsTimezoneValid(tz) {
return merr.WrapErrParameterInvalidMsg("unknown or invalid IANA Time Zone ID: %s", tz)
}
return nil
}
func (cdt *createDatabaseTask) Execute(ctx context.Context) error {
@ -267,9 +276,9 @@ func (t *alterDatabaseTask) OnEnqueue() error {
func (t *alterDatabaseTask) PreExecute(ctx context.Context) error {
if len(t.GetProperties()) > 0 {
// Check the validation of timezone
err := checkTimezone(t.Properties...)
if err != nil {
return err
userDefinedTimezone, exist := funcutil.TryGetAttrByKeyFromRepeatedKV(common.TimezoneKey, t.Properties)
if exist && !funcutil.IsTimezoneValid(userDefinedTimezone) {
return merr.WrapErrParameterInvalidMsg("unknown or invalid IANA Time Zone ID: %s", userDefinedTimezone)
}
}
_, ok := common.GetReplicateID(t.Properties)

View File

@ -301,10 +301,8 @@ func (dr *deleteRunner) Init(ctx context.Context) error {
if err != nil {
return ErrWithLog(log, "Failed to get collection info", err)
}
_, dbTimezone := getDbTimezone(db)
_, colTimezone := getColTimezone(colInfo)
timezonePreference := []string{colTimezone, dbTimezone}
visitorArgs := &planparserv2.ParserVisitorArgs{TimezonePreference: timezonePreference}
colTimezone := getColTimezone(colInfo)
visitorArgs := &planparserv2.ParserVisitorArgs{Timezone: colTimezone}
start := time.Now()
dr.plan, err = planparserv2.CreateRetrievePlanArgs(dr.schema.schemaHelper, dr.req.GetExpr(), dr.req.GetExprTemplateValues(), visitorArgs)

View File

@ -228,6 +228,8 @@ func (cit *createIndexTask) parseIndexParams(ctx context.Context) error {
return Params.AutoIndexConfig.ScalarIntIndexType.GetValue()
} else if typeutil.IsFloatingType(dataType) {
return Params.AutoIndexConfig.ScalarFloatIndexType.GetValue()
} else if typeutil.IsTimestamptzType(dataType) {
return Params.AutoIndexConfig.ScalarTimestampTzIndexType.GetValue()
}
return Params.AutoIndexConfig.ScalarVarcharIndexType.GetValue()
}
@ -427,7 +429,7 @@ func (cit *createIndexTask) parseIndexParams(ctx context.Context) error {
}
}
// auto fill json path with field name if not specified for json index
// autofill json path with field name if not specified for json index
if typeutil.IsJSONType(cit.fieldSchema.DataType) {
if _, exist := indexParamsMap[common.JSONPathKey]; !exist {
indexParamsMap[common.JSONPathKey] = cit.req.FieldName

View File

@ -243,13 +243,6 @@ func (it *insertTask) PreExecute(ctx context.Context) error {
return err
}
// trans timestamptz data
_, colTimezone := getColTimezone(colInfo)
err = timestamptzIsoStr2Utc(it.insertMsg.GetFieldsData(), colTimezone)
if err != nil {
return err
}
partitionKeyMode, err := isPartitionKeyMode(ctx, it.insertMsg.GetDbName(), collectionName)
if err != nil {
log.Warn("check partition key mode failed", zap.String("collectionName", collectionName), zap.Error(err))

View File

@ -79,6 +79,7 @@ type queryTask struct {
allQueryCnt int64
totalRelatedDataSize int64
mustUsePartitionKey bool
resolvedTimezoneStr string
storageCost segcore.StorageCost
}
@ -233,9 +234,9 @@ func parseQueryParams(queryParamsPair []*commonpb.KeyValuePair) (*queryParams, e
}
}
timezoneStr, err := funcutil.GetAttrByKeyFromRepeatedKV(TimezoneKey, queryParamsPair)
if err == nil {
timezone = timezoneStr
timezone, _ = funcutil.TryGetAttrByKeyFromRepeatedKV(common.TimezoneKey, queryParamsPair)
if (timezone != "") && !funcutil.IsTimezoneValid(timezone) {
return nil, merr.WrapErrParameterInvalidMsg("unknown or invalid IANA Time Zone ID: %s", timezone)
}
extractTimeFieldsStr, err := funcutil.GetAttrByKeyFromRepeatedKV(TimefieldsKey, queryParamsPair)
@ -457,9 +458,16 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
t.request.Expr = IDs2Expr(pkField, t.ids)
}
_, colTimezone := getColTimezone(colInfo)
timezonePreference := []string{t.queryParams.timezone, colTimezone}
if err := t.createPlanArgs(ctx, &planparserv2.ParserVisitorArgs{TimezonePreference: timezonePreference}); err != nil {
if t.queryParams.timezone != "" {
// validated in queryParams, no need to validate again
t.resolvedTimezoneStr = t.queryParams.timezone
log.Debug("determine timezone from request", zap.String("user defined timezone", t.resolvedTimezoneStr))
} else {
t.resolvedTimezoneStr = getColTimezone(colInfo)
log.Debug("determine timezone from collection", zap.Any("collection timezone", t.resolvedTimezoneStr))
}
if err := t.createPlanArgs(ctx, &planparserv2.ParserVisitorArgs{Timezone: t.resolvedTimezoneStr}); err != nil {
return err
}
t.plan.Node.(*planpb.PlanNode_Query).Query.Limit = t.RetrieveRequest.Limit
@ -666,31 +674,17 @@ func (t *queryTask) PostExecute(ctx context.Context) error {
// first page for iteration, need to set up sessionTs for iterator
t.result.SessionTs = getMaxMvccTsFromChannels(t.channelsMvcc, t.BeginTs())
}
// Translate timestamp to ISO string
collName := t.request.GetCollectionName()
dbName := t.request.GetDbName()
collID, err := globalMetaCache.GetCollectionID(context.Background(), dbName, collName)
if err != nil {
log.Warn("fail to get collection id", zap.Error(err))
return err
}
colInfo, err := globalMetaCache.GetCollectionInfo(ctx, dbName, collName, collID)
if err != nil {
log.Warn("fail to get collection info", zap.Error(err))
return err
}
_, colTimezone := getColTimezone(colInfo)
if !t.reQuery {
if len(t.queryParams.extractTimeFields) > 0 {
log.Debug("extracting fields for timestamptz", zap.Strings("fields", t.queryParams.extractTimeFields))
err = extractFieldsFromResults(t.result.GetFieldsData(), []string{t.queryParams.timezone, colTimezone}, t.queryParams.extractTimeFields)
err = extractFieldsFromResults(t.result.GetFieldsData(), t.resolvedTimezoneStr, t.queryParams.extractTimeFields)
if err != nil {
log.Warn("fail to extract fields for timestamptz", zap.Error(err))
return err
}
} else {
log.Debug("translate timestamp to ISO string", zap.String("user define timezone", t.queryParams.timezone))
err = timestamptzUTC2IsoStr(t.result.GetFieldsData(), t.queryParams.timezone, colTimezone)
err = timestamptzUTC2IsoStr(t.result.GetFieldsData(), t.resolvedTimezoneStr)
if err != nil {
log.Warn("fail to translate timestamp", zap.Error(err))
return err

View File

@ -96,6 +96,8 @@ type searchTask struct {
functionScore *rerank.FunctionScore
rankParams *rankParams
resolvedTimezoneStr string
isIterator bool
// we always remove pk field from output fields, as search result already contains pk field.
// if the user explicitly set pk field in output fields, we add it back to the result.
@ -286,6 +288,19 @@ func (t *searchTask) PreExecute(ctx context.Context) error {
}
}
timezone, exist := funcutil.TryGetAttrByKeyFromRepeatedKV(common.TimezoneKey, t.request.SearchParams)
if exist {
if !funcutil.IsTimezoneValid(timezone) {
log.Info("get invalid timezone from request", zap.String("timezone", timezone))
return merr.WrapErrParameterInvalidMsg("unknown or invalid IANA Time Zone ID: %s", timezone)
}
log.Debug("determine timezone from request", zap.String("user defined timezone", timezone))
} else {
timezone = getColTimezone(collectionInfo)
log.Debug("determine timezone from collection", zap.Any("collection timezone", timezone))
}
t.resolvedTimezoneStr = timezone
t.resultBuf = typeutil.NewConcurrentSet[*internalpb.SearchResults]()
if err = ValidateTask(t); err != nil {
@ -878,32 +893,16 @@ func (t *searchTask) PostExecute(ctx context.Context) error {
metrics.ProxyReduceResultLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.SearchLabel).Observe(float64(tr.RecordSpan().Milliseconds()))
// Translate timestamp to ISO string
collName := t.request.GetCollectionName()
dbName := t.request.GetDbName()
collID, err := globalMetaCache.GetCollectionID(context.Background(), dbName, collName)
if err != nil {
log.Warn("fail to get collection id", zap.Error(err))
return err
}
colInfo, err := globalMetaCache.GetCollectionInfo(ctx, dbName, collName, collID)
if err != nil {
log.Warn("fail to get collection info", zap.Error(err))
return err
}
_, colTimezone := getColTimezone(colInfo)
timeFields := parseTimeFields(t.request.SearchParams)
timezoneUserDefined := parseTimezone(t.request.SearchParams)
if timeFields != nil {
log.Debug("extracting fields for timestamptz", zap.Strings("fields", timeFields))
err = extractFieldsFromResults(t.result.GetResults().GetFieldsData(), []string{timezoneUserDefined, colTimezone}, timeFields)
err = extractFieldsFromResults(t.result.GetResults().GetFieldsData(), t.resolvedTimezoneStr, timeFields)
if err != nil {
log.Warn("fail to extract fields for timestamptz", zap.Error(err))
return err
}
} else {
log.Debug("translate timstamp to ISO string", zap.String("user define timezone", timezoneUserDefined))
err = timestamptzUTC2IsoStr(t.result.GetResults().GetFieldsData(), timezoneUserDefined, colTimezone)
err = timestamptzUTC2IsoStr(t.result.GetResults().GetFieldsData(), t.resolvedTimezoneStr)
if err != nil {
log.Warn("fail to translate timestamp", zap.Error(err))
return err

View File

@ -1068,17 +1068,6 @@ func (it *upsertTask) PreExecute(ctx context.Context) error {
}
}
// trans timestamptz data
_, colTimezone := getColTimezone(colInfo)
err = timestamptzIsoStr2Utc(it.insertFieldData, colTimezone)
if err != nil {
return err
}
err = timestamptzIsoStr2Utc(it.req.GetFieldsData(), colTimezone)
if err != nil {
return err
}
it.upsertMsg = &msgstream.UpsertMsg{
InsertMsg: &msgstream.InsertMsg{
InsertRequest: &msgpb.InsertRequest{

View File

@ -2740,143 +2740,54 @@ func reconstructStructFieldDataForSearch(results *milvuspb.SearchResults, schema
results.Results.OutputFields = outputFields
}
func hasTimestamptzField(schema *schemapb.CollectionSchema) bool {
for _, field := range schema.Fields {
if field.GetDataType() == schemapb.DataType_Timestamptz {
return true
}
func getColTimezone(colInfo *collectionInfo) string {
timezone, _ := funcutil.TryGetAttrByKeyFromRepeatedKV(common.TimezoneKey, colInfo.properties)
if timezone == "" {
timezone = common.DefaultTimezone
}
return false
return timezone
}
func getDefaultTimezoneVal(props ...*commonpb.KeyValuePair) (bool, string) {
for _, p := range props {
// used in collection or database
if p.GetKey() == common.DatabaseDefaultTimezone || p.GetKey() == common.CollectionDefaultTimezone {
return true, p.Value
}
}
return false, ""
}
func checkTimezone(props ...*commonpb.KeyValuePair) error {
hasTImezone, timezoneStr := getDefaultTimezoneVal(props...)
if hasTImezone {
_, err := time.LoadLocation(timezoneStr)
if err != nil {
return merr.WrapErrParameterInvalidMsg("invalid timezone, should be a IANA timezone name: %s", err.Error())
}
}
return nil
}
func getColTimezone(colInfo *collectionInfo) (bool, string) {
return getDefaultTimezoneVal(colInfo.properties...)
}
func getDbTimezone(dbInfo *databaseInfo) (bool, string) {
return getDefaultTimezoneVal(dbInfo.properties...)
}
func timestamptzIsoStr2Utc(columns []*schemapb.FieldData, colTimezone string) error {
naiveLayouts := []string{
"2006-01-02T15:04:05.999999999",
"2006-01-02T15:04:05",
"2006-01-02 15:04:05.999999999",
"2006-01-02 15:04:05",
}
for _, fieldData := range columns {
if fieldData.GetType() != schemapb.DataType_Timestamptz {
continue
}
scalarField := fieldData.GetScalars()
if scalarField == nil || scalarField.GetStringData() == nil {
log.Warn("field data is not string data", zap.String("fieldName", fieldData.GetFieldName()))
return merr.WrapErrParameterInvalidMsg("field data is not string data")
}
stringData := scalarField.GetStringData().GetData()
utcTimestamps := make([]int64, len(stringData))
for i, isoStr := range stringData {
var t time.Time
var err error
// parse directly
t, err = time.Parse(time.RFC3339Nano, isoStr)
if err == nil {
utcTimestamps[i] = t.UnixMicro()
continue
}
// no timezone, try to find timezone in collecion -> database level
defaultTZ := "UTC"
if colTimezone != "" {
defaultTZ = colTimezone
}
location, err := time.LoadLocation(defaultTZ)
if err != nil {
log.Error("invalid timezone", zap.String("timezone", defaultTZ), zap.Error(err))
return merr.WrapErrParameterInvalidMsg("got invalid default timezone: %s", defaultTZ)
}
var parsed bool
for _, layout := range naiveLayouts {
t, err = time.ParseInLocation(layout, isoStr, location)
if err == nil {
parsed = true
break
}
}
if !parsed {
log.Warn("Can not parse timestamptz string", zap.String("timestamp_string", isoStr))
return merr.WrapErrParameterInvalidMsg("got invalid timestamptz string: %s", isoStr)
}
utcTimestamps[i] = t.UnixMicro()
}
// Replace data in place
fieldData.GetScalars().Data = &schemapb.ScalarField_TimestamptzData{
TimestamptzData: &schemapb.TimestamptzArray{
Data: utcTimestamps,
},
}
}
return nil
}
func timestamptzUTC2IsoStr(results []*schemapb.FieldData, userDefineTimezone string, colTimezone string) error {
// Determine the target timezone based on priority: collection -> database -> UTC.
defaultTZ := "UTC"
if userDefineTimezone != "" {
defaultTZ = userDefineTimezone
} else if colTimezone != "" {
defaultTZ = colTimezone
}
location, err := time.LoadLocation(defaultTZ)
// timestamptzUTC2IsoStr converts Timestamptz (Unix Microsecond) data
// within FieldData results into ISO-8601 strings, applying the correct
// timezone offset and using the optimized format (microsecond precision, no trailing zeros).
func timestamptzUTC2IsoStr(results []*schemapb.FieldData, colTimezone string) error {
location, err := time.LoadLocation(colTimezone)
if err != nil {
log.Error("invalid timezone", zap.String("timezone", defaultTZ), zap.Error(err))
return merr.WrapErrParameterInvalidMsg("got invalid default timezone: %s", defaultTZ)
log.Error("invalid timezone", zap.String("timezone", colTimezone), zap.Error(err))
return merr.WrapErrParameterInvalidMsg("got invalid default timezone: %s", colTimezone)
}
for _, fieldData := range results {
if fieldData.GetType() != schemapb.DataType_Timestamptz {
continue
}
scalarField := fieldData.GetScalars()
// Guard against nil scalars or missing timestamp data
if scalarField == nil || scalarField.GetTimestamptzData() == nil {
if longData := scalarField.GetLongData(); longData != nil && len(longData.GetData()) > 0 {
log.Warn("field data is not Timestamptz data", zap.String("fieldName", fieldData.GetFieldName()))
return merr.WrapErrParameterInvalidMsg("field data for '%s' is not Timestamptz data", fieldData.GetFieldName())
}
// Handle the case of an empty field (e.g., all nulls), skip if no data to process.
continue
}
utcTimestamps := scalarField.GetTimestamptzData().GetData()
isoStrings := make([]string, len(utcTimestamps))
// CORE CHANGE: Use the optimized formatting function
for i, ts := range utcTimestamps {
// 1. Convert Unix Microsecond (UTC) to a time.Time object (still in UTC).
t := time.UnixMicro(ts).UTC()
// 2. Adjust the time object to the target location.
localTime := t.In(location)
isoStrings[i] = localTime.Format(time.RFC3339Nano)
// 3. Format using the optimized logic (max 6 digits, no trailing zeros)
isoStrings[i] = funcutil.FormatTimeMicroWithoutTrailingZeros(localTime)
}
// Replace the TimestamptzData with the new StringData in place.
@ -2918,23 +2829,11 @@ func extractFields(t time.Time, fieldList []string) ([]int64, error) {
return extractedValues, nil
}
func extractFieldsFromResults(results []*schemapb.FieldData, precedenceTimezone []string, fieldList []string) error {
var targetLocation *time.Location
for _, tz := range precedenceTimezone {
if tz != "" {
loc, err := time.LoadLocation(tz)
if err != nil {
log.Error("invalid timezone provided in precedence list", zap.String("timezone", tz), zap.Error(err))
return merr.WrapErrParameterInvalidMsg("got invalid timezone: %s", tz)
}
targetLocation = loc
break // Use the first valid timezone found.
}
}
if targetLocation == nil {
targetLocation = time.UTC
func extractFieldsFromResults(results []*schemapb.FieldData, timezone string, fieldList []string) error {
targetLocation, err := time.LoadLocation(timezone)
if err != nil {
log.Error("invalid timezone", zap.String("timezone", timezone), zap.Error(err))
return merr.WrapErrParameterInvalidMsg("got invalid timezone: %s", timezone)
}
for _, fieldData := range results {

View File

@ -189,11 +189,12 @@ func (v *validateUtil) Validate(data []*schemapb.FieldData, helper *typeutil.Sch
case schemapb.DataType_ArrayOfStruct:
panic("unreachable, array of struct should have been flattened")
case schemapb.DataType_Timestamptz:
// TODO: Add check logic for timestamptz data
if err := v.checkTimestamptzFieldData(field, helper.GetTimezone()); err != nil {
return err
}
default:
}
}
err := v.fillWithValue(data, helper, int(numRows))
if err != nil {
return err
@ -465,13 +466,11 @@ func FillWithNullValue(field *schemapb.FieldData, fieldSchema *schemapb.FieldSch
if err != nil {
return err
}
case *schemapb.ScalarField_StringData:
sd.StringData.Data, err = fillWithNullValueImpl(sd.StringData.Data, field.GetValidData())
if err != nil {
return err
}
case *schemapb.ScalarField_ArrayData:
sd.ArrayData.Data, err = fillWithNullValueImpl(sd.ArrayData.Data, field.GetValidData())
if err != nil {
@ -568,11 +567,32 @@ func FillWithDefaultValue(field *schemapb.FieldData, fieldSchema *schemapb.Field
}
case *schemapb.ScalarField_TimestamptzData:
// Basic validation: Check if the length of the validity mask matches the number of rows.
if len(field.GetValidData()) != numRows {
msg := fmt.Sprintf("the length of valid_data of field(%s) is wrong", field.GetFieldName())
return merr.WrapErrParameterInvalid(numRows, len(field.GetValidData()), msg)
}
// Retrieve the default value, which is usually stored as int64 (UTC microseconds).
defaultValue := fieldSchema.GetDefaultValue().GetTimestamptzData()
// If the int64 default value is 0 (which might happen if it was not fully persisted
// or if the underlying storage is being checked), attempt to fall back to the string value.
if defaultValue == 0 {
strDefaultValue := fieldSchema.GetDefaultValue().GetStringData()
// If a non-empty string default value exists, perform conversion.
if len(strDefaultValue) != 0 {
// NOTE: The strDefaultValue is guaranteed to be a valid ISO 8601 timestamp string,
// as it was validated during collection schema creation (by checkAndRewriteTimestampTzDefaultValue).
//
// Since the string either contains a UTC offset (e.g., '+08:00') or should be treated
// as UTC/the collection's primary timezone, the 'common.DefaultTimezone' passed here
// as the fallback timezone is generally inconsequential (negligible)
// for the final conversion result in this specific context.
defaultValue, _ = funcutil.ValidateAndReturnUnixMicroTz(strDefaultValue, common.DefaultTimezone)
}
}
sd.TimestamptzData.Data, err = fillWithDefaultValueImpl(sd.TimestamptzData.Data, defaultValue, field.GetValidData())
if err != nil {
return nil
@ -939,16 +959,6 @@ func (v *validateUtil) checkDoubleFieldData(field *schemapb.FieldData, fieldSche
return nil
}
func (v *validateUtil) checkTimestamptzFieldData(field *schemapb.FieldData, fieldSchema *schemapb.FieldSchema) error {
data := field.GetScalars().GetTimestamptzData().GetData()
if data == nil && fieldSchema.GetDefaultValue() == nil && !fieldSchema.GetNullable() {
msg := fmt.Sprintf("field '%v' is illegal, array type mismatch", field.GetFieldName())
return merr.WrapErrParameterInvalid("need long int array", "got nil", msg)
}
// TODO: Additional checks?
return nil
}
func (v *validateUtil) checkArrayElement(array *schemapb.ArrayArray, field *schemapb.FieldSchema) error {
switch field.GetElementType() {
case schemapb.DataType_Bool:
@ -1140,6 +1150,43 @@ func (v *validateUtil) checkArrayOfVectorFieldData(field *schemapb.FieldData, fi
}
}
// checkTimestamptzFieldData validates the string payload of a Timestamptz field,
// converts every entry to Unix microseconds (int64) and swaps the field's scalar
// payload from StringData to TimestamptzData in place.
//
// Parameters:
//   - field: the incoming column; its scalars must carry a string array.
//   - timezone: zone name handed to funcutil.ParseTimeTz for strings that carry
//     no explicit offset.
//
// Returns a parameter-invalid error when the payload is not a string array or
// when any entry cannot be parsed. On error the field is left untouched because
// the replacement only happens after the whole array has been converted.
func (v *validateUtil) checkTimestamptzFieldData(field *schemapb.FieldData, timezone string) error {
	// 1. Structural check: data must be present and must be a string array.
	scalarField := field.GetScalars()
	if scalarField == nil || scalarField.GetStringData() == nil {
		log.Warn("timestamptz field data is not string array", zap.String("fieldName", field.GetFieldName()))
		return merr.WrapErrParameterInvalidMsg("timestamptz field data must be a string array")
	}

	stringData := scalarField.GetStringData().GetData()
	utcTimestamps := make([]int64, len(stringData))

	// 2. Validation and conversion loop.
	for i, isoStr := range stringData {
		t, err := funcutil.ParseTimeTz(isoStr, timezone)
		if err != nil {
			log.Warn("cannot parse timestamptz string", zap.String("timestamp_string", isoStr), zap.Error(err))
			// Surface the parser's own reason (bad format, out-of-range offset,
			// unknown fallback zone) instead of always blaming the timezone name.
			return merr.WrapErrParameterInvalidMsg("got invalid timestamptz string '%s': %s", isoStr, err.Error())
		}
		utcTimestamps[i] = t.UnixMicro()
	}

	// 3. In-place replacement: StringData -> TimestamptzData (int64).
	field.GetScalars().Data = &schemapb.ScalarField_TimestamptzData{
		TimestamptzData: &schemapb.TimestamptzArray{
			Data: utcTimestamps,
		},
	}
	return nil
}
func verifyLengthPerRow[E interface{ ~string | ~[]byte }](strArr []E, maxLength int64) (int, bool) {
for i, s := range strArr {
if int64(len(s)) > maxLength {

View File

@ -153,6 +153,70 @@ func checkGeometryDefaultValue(value string) error {
return nil
}
// checkAndRewriteTimestampTzDefaultValue validates the default value of every
// TIMESTAMPTZ field in the schema and rewrites it from its string form to an
// int64 stored in the LongData arm of the ValueField oneof.
//
// The timezone applied to strings without an explicit offset is read from the
// schema properties under common.TimezoneKey; common.DefaultTimezone is used
// when that property is absent.
//
// Fields are skipped when they are not TIMESTAMPTZ, have no default value, or
// have an empty string default. The first string that fails
// funcutil.ValidateAndReturnUnixMicroTz aborts the walk and its error is
// returned unchanged.
//
// NOTE(review): another fragment of this change reads a TIMESTAMPTZ default via
// GetTimestamptzData()/GetStringData(); confirm that reader copes with a default
// stored as LongData.
func checkAndRewriteTimestampTzDefaultValue(schema *schemapb.CollectionSchema) error {
	collectionTz, found := funcutil.TryGetAttrByKeyFromRepeatedKV(common.TimezoneKey, schema.GetProperties())
	if !found {
		collectionTz = common.DefaultTimezone
	}

	for _, field := range schema.GetFields() {
		if field.GetDataType() != schemapb.DataType_Timestamptz {
			continue
		}

		current := field.GetDefaultValue()
		if current == nil {
			continue
		}

		// Only string defaults need converting; anything else is left as is.
		raw := current.GetStringData()
		if raw == "" {
			continue
		}

		micros, convErr := funcutil.ValidateAndReturnUnixMicroTz(raw, collectionTz)
		if convErr != nil {
			return convErr
		}

		// Assigning the oneof wrapper replaces the previous string arm.
		current.Data = &schemapb.ValueField_LongData{LongData: micros}
	}

	return nil
}
func hasSystemFields(schema *schemapb.CollectionSchema, systemFields []string) bool {
for _, f := range schema.GetFields() {
if funcutil.SliceContain(systemFields, f.GetName()) {
@ -174,6 +238,11 @@ func (t *createCollectionTask) validateSchema(ctx context.Context, schema *schem
return err
}
// Validate default
if err := checkAndRewriteTimestampTzDefaultValue(schema); err != nil {
return err
}
if err := checkStructArrayFieldSchema(schema.GetStructArrayFields()); err != nil {
return err
}
@ -383,6 +452,12 @@ func (t *createCollectionTask) prepareSchema(ctx context.Context) error {
return err
}
// Validate timezone
tz, exist := funcutil.TryGetAttrByKeyFromRepeatedKV(common.TimezoneKey, t.Req.GetProperties())
if exist && !funcutil.IsTimezoneValid(tz) {
return merr.WrapErrParameterInvalidMsg("unknown or invalid IANA Time Zone ID: %s", tz)
}
// Set properties for persistent
t.body.CollectionSchema.Properties = t.Req.GetProperties()
t.body.CollectionSchema.Version = 0
@ -472,13 +547,13 @@ func (t *createCollectionTask) Prepare(ctx context.Context) error {
}
// set collection timezone
properties := t.Req.GetProperties()
ok, _ := getDefaultTimezoneVal(properties...)
_, ok := funcutil.TryGetAttrByKeyFromRepeatedKV(common.TimezoneKey, properties)
if !ok {
ok, defaultTz := getDefaultTimezoneVal(db.Properties...)
if !ok {
defaultTz = "UTC"
dbTz, ok2 := funcutil.TryGetAttrByKeyFromRepeatedKV(common.TimezoneKey, db.Properties)
if !ok2 {
dbTz = common.DefaultTimezone
}
timezoneKV := &commonpb.KeyValuePair{Key: common.CollectionDefaultTimezone, Value: defaultTz}
timezoneKV := &commonpb.KeyValuePair{Key: common.CollectionDefaultTimezone, Value: dbTz}
t.Req.Properties = append(properties, timezoneKV)
}

View File

@ -19,6 +19,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/proto/querypb"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message/ce"
"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
@ -37,6 +38,12 @@ func (c *Core) broadcastAlterCollectionForAlterCollection(ctx context.Context, r
return merr.WrapErrParameterInvalidMsg("can not alter cipher related properties")
}
// Validate timezone
tz, exist := funcutil.TryGetAttrByKeyFromRepeatedKV(common.TimezoneKey, req.GetProperties())
if exist && !funcutil.IsTimezoneValid(tz) {
return merr.WrapErrParameterInvalidMsg("unknown or invalid IANA Time Zone ID: %s", tz)
}
isEnableDynamicSchema, targetValue, err := common.IsEnableDynamicSchema(req.GetProperties())
if err != nil {
return merr.WrapErrParameterInvalidMsg("invalid dynamic schema property value: %s", req.GetProperties()[0].GetValue())

View File

@ -33,6 +33,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message/ce"
"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
@ -51,6 +52,12 @@ func (c *Core) broadcastAlterDatabase(ctx context.Context, req *rootcoordpb.Alte
return merr.WrapErrParameterInvalidMsg("can not alter cipher related properties")
}
// Validate timezone
tz, exist := funcutil.TryGetAttrByKeyFromRepeatedKV(common.TimezoneKey, req.GetProperties())
if exist && !funcutil.IsTimezoneValid(tz) {
return merr.WrapErrParameterInvalidMsg("unknown or invalid IANA Time Zone ID: %s", tz)
}
broadcaster, err := startBroadcastWithDatabaseLock(ctx, req.GetDbName())
if err != nil {
return err

View File

@ -27,9 +27,12 @@ import (
"github.com/milvus-io/milvus/internal/distributed/streaming"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/util/hookutil"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/proto/etcdpb"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message/ce"
"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
)
func (c *Core) broadcastCreateDatabase(ctx context.Context, req *milvuspb.CreateDatabaseRequest) error {
@ -54,7 +57,10 @@ func (c *Core) broadcastCreateDatabase(ctx context.Context, req *milvuspb.Create
if err != nil {
return errors.Wrap(err, "failed to tidy database cipher properties")
}
tz, exist := funcutil.TryGetAttrByKeyFromRepeatedKV(common.TimezoneKey, properties)
if exist && !funcutil.IsTimezoneValid(tz) {
return merr.WrapErrParameterInvalidMsg("unknown or invalid IANA Time Zone ID: %s", tz)
}
msg := message.NewCreateDatabaseMessageBuilderV2().
WithHeader(&message.CreateDatabaseMessageHeader{
DbName: req.GetDbName(),

View File

@ -66,6 +66,11 @@ func (t *describeCollectionTask) Execute(ctx context.Context) (err error) {
return err
}
t.Rsp = convertModelToDesc(coll, aliases, db.Name)
// NEW STEP: Convert TIMESTAMPTZ default values back to string format for the user.
err = rewriteTimestampTzDefaultValueToString(t.Rsp)
if err != nil {
return err
}
t.Rsp.RequestTime = t.ts
return nil
}

View File

@ -1069,10 +1069,68 @@ func convertModelToDesc(collInfo *model.Collection, aliases []string, dbName str
resp.DbId = collInfo.DBID
resp.UpdateTimestamp = collInfo.UpdateTimestamp
resp.UpdateTimestampStr = strconv.FormatUint(collInfo.UpdateTimestamp, 10)
return resp
}
// rewriteTimestampTzDefaultValueToString converts the default_value of every
// TIMESTAMPTZ field in the DescribeCollectionResponse from the int64 LongData
// form to an RFC3339Nano string rendered in the collection timezone.
//
// The timezone is read from the schema properties under common.TimezoneKey and
// falls back to common.DefaultTimezone. Fields that are not TIMESTAMPTZ, have no
// default value, or whose default is not stored as LongData are left untouched.
//
// Returns nil when the response has no schema. Returns a wrapped error naming
// the offending field when the conversion fails.
func rewriteTimestampTzDefaultValueToString(resp *milvuspb.DescribeCollectionResponse) error {
	schema := resp.GetSchema()
	if schema == nil {
		return nil
	}

	// 1. Determine the target timezone for display.
	timezone, exist := funcutil.TryGetAttrByKeyFromRepeatedKV(common.TimezoneKey, schema.GetProperties())
	if !exist {
		timezone = common.DefaultTimezone
	}

	// 2. Walk the fields; only TIMESTAMPTZ defaults stored as LongData are rewritten.
	for _, fieldSchema := range schema.GetFields() {
		if fieldSchema.GetDataType() != schemapb.DataType_Timestamptz {
			continue
		}

		defaultValue := fieldSchema.GetDefaultValue()
		if defaultValue == nil {
			continue
		}

		longValue, ok := defaultValue.GetData().(*schemapb.ValueField_LongData)
		if !ok {
			continue
		}

		// 3. Convert the int64 microsecond value to a timezone-aware string.
		tzString, err := funcutil.ConvertUnixMicroToTimezoneString(longValue.LongData, timezone)
		if err != nil {
			// Wrap once with context; the cause is already carried by err.
			return errors.Wrap(err, fmt.Sprintf("failed to convert default value of timestamptz field %s to string", fieldSchema.GetName()))
		}

		// 4. Install a fresh ValueField rather than mutating the existing one.
		// NOTE(review): the existing ValueField may be shared with the collection
		// model this response was built from; replacing the pointer keeps that
		// object in its int64 form. Confirm against convertModelToDesc.
		fieldSchema.DefaultValue = &schemapb.ValueField{
			Data: &schemapb.ValueField_StringData{
				StringData: tzString,
			},
		}
	}
	return nil
}
func (c *Core) describeCollectionImpl(ctx context.Context, in *milvuspb.DescribeCollectionRequest, allowUnavailable bool) (*milvuspb.DescribeCollectionResponse, error) {
if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
return &milvuspb.DescribeCollectionResponse{

View File

@ -399,6 +399,8 @@ func checkFieldSchema(fieldSchemas []*schemapb.FieldSchema) error {
if dtype == schemapb.DataType_Geometry {
return checkGeometryDefaultValue(fieldSchema.GetDefaultValue().GetStringData())
}
log.Info("czsHaha111", zap.String("type", dtype.String()), zap.String("name", fieldSchema.GetName()), zap.Any("fieldSchema", fieldSchema),
zap.Any("defaultValue", fieldSchema.GetDefaultValue()), zap.Any("type", dtype.String()))
errTypeMismatch := func(fieldName, fieldType, defaultValueType string) error {
msg := fmt.Sprintf("type (%s) of field (%s) is not equal to the type(%s) of default_value", fieldType, fieldName, defaultValueType)
return merr.WrapErrParameterInvalidMsg(msg)
@ -440,17 +442,22 @@ func checkFieldSchema(fieldSchemas []*schemapb.FieldSchema) error {
return errTypeMismatch(fieldSchema.GetName(), dtype.String(), "DataType_Timestamptz")
}
case *schemapb.ValueField_StringData:
if dtype != schemapb.DataType_VarChar {
return errTypeMismatch(fieldSchema.GetName(), dtype.String(), "DataType_VarChar")
if dtype != schemapb.DataType_VarChar && dtype != schemapb.DataType_Timestamptz {
if dtype != schemapb.DataType_VarChar {
return errTypeMismatch(fieldSchema.GetName(), dtype.String(), "DataType_VarChar")
}
return errTypeMismatch(fieldSchema.GetName(), dtype.String(), "DataType_Timestamptz")
}
maxLength, err := parameterutil.GetMaxLength(fieldSchema)
if err != nil {
return err
}
defaultValueLength := len(fieldSchema.GetDefaultValue().GetStringData())
if int64(defaultValueLength) > maxLength {
msg := fmt.Sprintf("the length (%d) of string exceeds max length (%d)", defaultValueLength, maxLength)
return merr.WrapErrParameterInvalid("valid length string", "string length exceeds max length", msg)
if dtype == schemapb.DataType_VarChar {
maxLength, err := parameterutil.GetMaxLength(fieldSchema)
if err != nil {
return err
}
defaultValueLength := len(fieldSchema.GetDefaultValue().GetStringData())
if int64(defaultValueLength) > maxLength {
msg := fmt.Sprintf("the length (%d) of string exceeds max length (%d)", defaultValueLength, maxLength)
return merr.WrapErrParameterInvalid("valid length string", "string length exceeds max length", msg)
}
}
case *schemapb.ValueField_BytesData:
if dtype != schemapb.DataType_JSON {
@ -568,13 +575,3 @@ func nextFieldID(coll *model.Collection) int64 {
}
return maxFieldID + 1
}
func getDefaultTimezoneVal(props ...*commonpb.KeyValuePair) (bool, string) {
for _, p := range props {
// used in collection or database
if p.GetKey() == common.DatabaseDefaultTimezone || p.GetKey() == common.CollectionDefaultTimezone {
return true, p.Value
}
}
return false, ""
}

View File

@ -1600,7 +1600,6 @@ func fillMissingFields(schema *schemapb.CollectionSchema, insertData *InsertData
batchRows := int64(insertData.GetRowNum())
allFields := typeutil.GetAllFieldSchemas(schema)
for _, field := range allFields {
// Skip function output fields and system fields
if field.GetIsFunctionOutput() || field.GetFieldID() < 100 {

View File

@ -215,7 +215,7 @@ func (c *FieldReader) Next(count int64) (any, any, error) {
vectors := lo.Flatten(arrayData.([][]int8))
return vectors, nil, nil
case schemapb.DataType_Array:
// array has not support default_value
// array has not supported default_value
if c.field.GetNullable() {
return ReadNullableArrayData(c, count)
}

View File

@ -19,8 +19,9 @@ func (c *STLSORTChecker) CheckTrain(dataType schemapb.DataType, elementType sche
}
func (c *STLSORTChecker) CheckValidDataType(indexType IndexType, field *schemapb.FieldSchema) error {
if !typeutil.IsArithmetic(field.GetDataType()) && !typeutil.IsStringType(field.GetDataType()) {
return errors.New(fmt.Sprintf("STL_SORT are only supported on numeric or varchar field, got %s", field.GetDataType()))
dataType := field.GetDataType()
if !typeutil.IsArithmetic(dataType) && !typeutil.IsStringType(dataType) && !typeutil.IsTimestamptzType(dataType) {
return errors.New(fmt.Sprintf("STL_SORT are only supported on numeric, varchar or timestamptz field, got %s", field.GetDataType()))
}
return nil
}

View File

@ -95,6 +95,8 @@ const (
CurrentScalarIndexEngineVersion = int32(2)
)
const DefaultTimezone = "UTC"
// Endian is type alias of binary.LittleEndian.
// Milvus uses little endian by default.
var Endian = binary.LittleEndian
@ -246,6 +248,7 @@ const (
NamespaceEnabledKey = "namespace.enabled"
// timezone related
TimezoneKey = "timezone"
DatabaseDefaultTimezone = "database.timezone"
CollectionDefaultTimezone = "collection.timezone"
AllowInsertAutoIDKey = "allow_insert_auto_id"

View File

@ -0,0 +1,200 @@
package funcutil
import (
"bytes"
"fmt"
"strings"
"time"
)
// MaxOffsetSeconds and MinOffsetSeconds bound the explicit UTC offset accepted
// by ParseTimeTz, expressed in seconds. They are exported for external checks.
const (
	MaxOffsetSeconds = 14 * 3600  // +14:00
	MinOffsetSeconds = -12 * 3600 // -12:00
)

// NaiveTzLayouts is a list of common timestamp formats that lack timezone information.
var NaiveTzLayouts = []string{
	"2006-01-02T15:04:05.999999999",
	"2006-01-02T15:04:05",
	"2006-01-02 15:04:05.999999999",
	"2006-01-02 15:04:05",
}

// ParseTimeTz parses a timestamp string that is either TZ-aware (RFC3339Nano)
// or naive (one of NaiveTzLayouts).
//
// Parameters:
//   - inputStr: the timestamp text to parse.
//   - defaultTimezoneStr: zone name passed to time.LoadLocation; it is the zone
//     assigned to naive inputs.
//
// Returns the parsed time, or an error when:
//   - defaultTimezoneStr cannot be loaded (checked for every input, including
//     TZ-aware ones, so a misconfigured zone never goes unnoticed);
//   - a TZ-aware input carries an offset outside
//     [MinOffsetSeconds, MaxOffsetSeconds];
//   - the input matches neither RFC3339Nano nor any naive layout.
func ParseTimeTz(inputStr string, defaultTimezoneStr string) (time.Time, error) {
	// Resolve the fallback zone up front. Doing it only on the naive path would
	// let an invalid zone name slip through whenever the input has an offset.
	loc, err := time.LoadLocation(defaultTimezoneStr)
	if err != nil {
		return time.Time{}, fmt.Errorf("invalid default timezone string '%s': %w", defaultTimezoneStr, err)
	}

	// 1. Primary parsing: TZ-aware string (RFC3339Nano) with strict offset validation.
	if t, parseErr := time.Parse(time.RFC3339Nano, inputStr); parseErr == nil {
		_, offsetSeconds := t.Zone()
		if offsetSeconds > MaxOffsetSeconds || offsetSeconds < MinOffsetSeconds {
			offsetHours := offsetSeconds / 3600
			return time.Time{}, fmt.Errorf("UTC offset hour %d is out of the valid range [%d, %d]", offsetHours, MinOffsetSeconds/3600, MaxOffsetSeconds/3600)
		}
		return t, nil
	}

	// 2. Fallback parsing: naive string interpreted in the default location.
	// No offset check is needed here because the zone came from LoadLocation.
	for _, layout := range NaiveTzLayouts {
		if t, parseErr := time.ParseInLocation(layout, inputStr, loc); parseErr == nil {
			return t, nil
		}
	}

	return time.Time{}, fmt.Errorf("invalid timestamp string: '%s'. Does not match any known format", inputStr)
}
// ValidateTimestampTz reports whether inputStr is accepted by ParseTimeTz with
// the given default timezone. It returns ParseTimeTz's error unchanged, or nil
// when parsing succeeds; the parsed time itself is discarded.
func ValidateTimestampTz(inputStr string, defaultTimezoneStr string) error {
	if _, parseErr := ParseTimeTz(inputStr, defaultTimezoneStr); parseErr != nil {
		return parseErr
	}
	return nil
}
// ValidateAndNormalizeTimestampTz parses inputStr with ParseTimeTz and renders
// the result as an RFC3339Nano string, so the output always carries a timezone
// designator. On a parse failure it returns the empty string together with
// ParseTimeTz's error.
func ValidateAndNormalizeTimestampTz(inputStr string, defaultTimezoneStr string) (string, error) {
	parsed, parseErr := ParseTimeTz(inputStr, defaultTimezoneStr)
	if parseErr == nil {
		return parsed.Format(time.RFC3339Nano), nil
	}
	return "", parseErr
}
// ValidateAndReturnUnixMicroTz parses inputStr with ParseTimeTz and returns the
// instant as Unix microseconds. On a parse failure it returns 0 together with
// ParseTimeTz's error.
func ValidateAndReturnUnixMicroTz(inputStr string, defaultTimezoneStr string) (int64, error) {
	parsed, parseErr := ParseTimeTz(inputStr, defaultTimezoneStr)
	if parseErr == nil {
		return parsed.UnixMicro(), nil
	}
	return 0, parseErr
}
// CompareUnixMicroTz reports whether two timestamp strings denote the same
// instant at Unix-microsecond precision.
//
// Both strings are parsed with ParseTimeTz using defaultTimezoneStr for naive
// inputs. If either string fails to parse, the result is false and the error
// identifies which of the two inputs was rejected.
func CompareUnixMicroTz(ts1 string, ts2 string, defaultTimezoneStr string) (bool, error) {
	first, firstErr := ParseTimeTz(ts1, defaultTimezoneStr)
	if firstErr != nil {
		return false, fmt.Errorf("error parsing first timestamp '%s': %w", ts1, firstErr)
	}

	second, secondErr := ParseTimeTz(ts2, defaultTimezoneStr)
	if secondErr != nil {
		return false, fmt.Errorf("error parsing second timestamp '%s': %w", ts2, secondErr)
	}

	// Epoch-based values compare the instants independently of the zone each
	// string was written in.
	sameInstant := first.UnixMicro() == second.UnixMicro()
	return sameInstant, nil
}
// ConvertUnixMicroToTimezoneString renders a Unix-microsecond timestamp as an
// RFC3339Nano string expressed in the zone named by targetTimezoneStr.
//
// It returns an error, and an empty string, when time.LoadLocation rejects the
// zone name.
func ConvertUnixMicroToTimezoneString(ts int64, targetTimezoneStr string) (string, error) {
	targetLoc, loadErr := time.LoadLocation(targetTimezoneStr)
	if loadErr != nil {
		return "", fmt.Errorf("invalid target timezone string '%s': %w", targetTimezoneStr, loadErr)
	}

	// The instant is fixed by the epoch value; In only changes how it is displayed.
	rendered := time.UnixMicro(ts).In(targetLoc).Format(time.RFC3339Nano)
	return rendered, nil
}
// formatTimeMicroWithoutTrailingZeros is an optimized function to format a time.Time
// object. It first truncates the time to microsecond precision (6 digits) and then
// removes all trailing zeros from the fractional seconds part.
//
// Example 1: 2025-03-20T10:30:00.123456000Z -> 2025-03-20T10:30:00.123456Z
// Example 2: 2025-03-20T10:30:00.123000000Z -> 2025-03-20T10:30:00.123Z
// Example 3: 2025-03-20T10:30:00.000000000Z -> 2025-03-20T10:30:00Z
func FormatTimeMicroWithoutTrailingZeros(t time.Time) string {
// 1. Truncate to Microsecond (6 digits max) to ensure we don't exceed the required precision.
tMicro := t.Truncate(time.Microsecond)
// 2. Format the time using the standard high precision format (RFC3339Nano).
// This results in exactly 9 fractional digits, padded with trailing zeros if necessary.
s := tMicro.Format(time.RFC3339Nano)
// 3. Locate the key delimiters ('.' and the Timezone marker 'Z' or '+/-').
dotIndex := strings.LastIndexByte(s, '.')
// Find the Timezone marker index (Z, +, or -)
tzIndex := len(s) - 1
for ; tzIndex >= 0; tzIndex-- {
if s[tzIndex] == 'Z' || s[tzIndex] == '+' || s[tzIndex] == '-' {
break
}
}
// If the format is unexpected, return the original string.
if dotIndex == -1 || tzIndex == -1 {
return s
}
// 4. Extract and efficiently trim the fractional part using bytes.TrimRight.
// Slice the fractional part (e.g., "123456000")
fractionalPart := s[dotIndex+1 : tzIndex]
// Use bytes.TrimRight for efficient removal of trailing '0' characters.
trimmedBytes := bytes.TrimRight([]byte(fractionalPart), "0")
// 5. Reconstruct the final string based on the trimming result.
// Case A: The fractional part was entirely zeros (e.g., .000000000)
if len(trimmedBytes) == 0 {
// Remove the '.' and the fractional part, keep the Timezone marker.
// Result: "2025-03-20T10:30:00Z"
return s[:dotIndex] + s[tzIndex:]
}
// Case B: Fractional part remains (e.g., .123, .123456)
// Recombine: [Time Body] + "." + [Trimmed Fraction] + [Timezone Marker]
// The dot (s[:dotIndex+1]) must be retained here.
return s[:dotIndex+1] + string(trimmedBytes) + s[tzIndex:]
}
// IsTimezoneValid reports whether tz is a non-empty name that
// time.LoadLocation can resolve (e.g. "Asia/Shanghai" or "UTC").
// The empty string is rejected explicitly before LoadLocation is consulted.
func IsTimezoneValid(tz string) bool {
	if len(tz) == 0 {
		return false
	}
	if _, loadErr := time.LoadLocation(tz); loadErr != nil {
		return false
	}
	return true
}

View File

@ -0,0 +1,629 @@
package funcutil
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
// TestValidateAndNormalizeTimestampTz exercises ValidateAndNormalizeTimestampTz
// with TZ-aware inputs, naive inputs resolved through a default timezone, and
// inputs that must be rejected (bad format, bad default zone, offset out of range).
func TestValidateAndNormalizeTimestampTz(t *testing.T) {
	cases := []struct {
		name      string
		input     string
		fallback  string
		want      string
		wantErr   bool
		errSubstr string
	}{
		{
			name:     "Case 1: TZ-aware (UTC)",
			input:    "2024-10-23T15:30:00Z",
			fallback: "Asia/Tokyo",
			want:     "2024-10-23T15:30:00Z",
		},
		{
			name:     "Case 2: TZ-aware (Offset)",
			input:    "2024-10-23T15:30:00+05:30",
			fallback: "UTC",
			want:     "2024-10-23T15:30:00+05:30",
		},
		{
			name:     "Case 3: Naive (Apply Default TZ Shanghai)",
			input:    "2024-10-23 15:30:00",
			fallback: "Asia/Shanghai", // Shanghai is UTC+08:00
			want:     "2024-10-23T15:30:00+08:00",
		},
		{
			name:     "Case 4: Naive (Apply Default TZ LA)",
			input:    "2024-10-23 15:30:00.123456",
			fallback: "America/Los_Angeles", // LA is UTC-07:00 (PDT for Oct)
			want:     "2024-10-23T15:30:00.123456-07:00",
		},
		{
			name:      "Case 5: Invalid Format",
			input:     "23-10-2024 15:30",
			fallback:  "UTC",
			wantErr:   true,
			errSubstr: "invalid timestamp string",
		},
		{
			name:      "Case 6: Invalid Default Timezone",
			input:     "2024-10-23T15:30:00Z",
			fallback:  "Invalid/TZ",
			wantErr:   true,
			errSubstr: "invalid default timezone string",
		},
		{
			name:      "Case 7: Offset Too High (+15:00)",
			input:     "2024-10-23T15:30:00+15:00",
			fallback:  "UTC",
			wantErr:   true,
			errSubstr: "UTC offset hour 15 is out of the valid range",
		},
		{
			name:      "Case 8: Offset Too Low (-13:00)",
			input:     "2024-10-23T15:30:00-13:00",
			fallback:  "UTC",
			wantErr:   true,
			errSubstr: "UTC offset hour -13 is out of the valid range",
		},
	}

	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			got, err := ValidateAndNormalizeTimestampTz(c.input, c.fallback)

			if !c.wantErr {
				assert.NoError(t, err)
				// Re-render the expectation through the same layout, truncated to
				// microseconds, so both sides use one canonical form.
				wantTime, _ := time.Parse(time.RFC3339Nano, c.want)
				assert.Equal(t, wantTime.Truncate(time.Microsecond).Format(time.RFC3339Nano), got)
				return
			}

			// Only inspect the message when an error was actually returned.
			if assert.Error(t, err) {
				assert.Contains(t, err.Error(), c.errSubstr)
			}
		})
	}
}
// TestValidateAndReturnUnixMicroTz checks that ValidateAndReturnUnixMicroTz maps
// different spellings of the same instant to one Unix-microsecond value, and
// that a rejected input yields an error together with 0.
func TestValidateAndReturnUnixMicroTz(t *testing.T) {
	// 2024-10-23T15:30:00.123456Z expressed as Unix microseconds; every passing
	// case below describes this same instant.
	const wantMicro = int64(1729697400123456)

	cases := []struct {
		name     string
		input    string
		fallback string
		want     int64
		wantErr  bool
	}{
		{
			name:     "Case 1: TZ-aware (UTC)",
			input:    "2024-10-23T15:30:00.123456Z",
			fallback: "Asia/Tokyo",
			want:     wantMicro,
		},
		{
			name:     "Case 2: TZ-aware (Offset)",
			input:    "2024-10-23T23:30:00.123456+08:00", // +8h is 15:30 UTC
			fallback: "UTC",
			want:     wantMicro,
		},
		{
			name:     "Case 3: Naive (Apply Default TZ Shanghai)",
			input:    "2024-10-23 23:30:00.123456", // 23:30 Shanghai (UTC+08:00) is 15:30 UTC
			fallback: "Asia/Shanghai",
			want:     wantMicro,
		},
		{
			name:     "Case 4: Naive (Apply Default TZ LA)",
			input:    "2024-10-23 08:30:00.123456", // 08:30 LA (PDT, UTC-07:00) is 15:30 UTC
			fallback: "America/Los_Angeles",
			want:     wantMicro,
		},
		{
			name:     "Case 5: Invalid Offset",
			input:    "2024-10-23T15:30:00+15:00",
			fallback: "UTC",
			want:     0,
			wantErr:  true,
		},
	}

	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			got, err := ValidateAndReturnUnixMicroTz(c.input, c.fallback)

			if !c.wantErr {
				assert.NoError(t, err)
				assert.Equal(t, c.want, got)
				return
			}

			assert.Error(t, err)
			assert.Equal(t, int64(0), got)
		})
	}
}
// TestCompareUnixMicroTz verifies that CompareUnixMicroTz reports two timestamp
// strings as equal exactly when they denote the same instant (naive strings are
// resolved in the supplied default timezone), and that it returns an error when
// either string fails validation.
func TestCompareUnixMicroTz(t *testing.T) {
	// Three spellings of 2025-10-23T15:30:00.000000Z. The naive spelling only
	// denotes that instant when the default timezone is UTC.
	sameTime1 := "2025-10-23T15:30:00Z"
	sameTime2 := "2025-10-23T23:30:00+08:00" // 23:30 at UTC+08:00 -> 15:30 UTC
	sameTime3 := "2025-10-23 15:30:00"       // naive: resolved in defaultTZ
	// One microsecond after the instant above.
	differentTime := "2025-10-23T15:30:00.000001Z"

	testCases := []struct {
		name        string
		ts1         string
		ts2         string
		defaultTZ   string
		expectedCmp bool
		expectError bool
	}{
		{
			name:        "Case 1: Same moment, different explicit TZ format",
			ts1:         sameTime1, // 15:30 UTC
			ts2:         sameTime2, // 15:30 UTC
			defaultTZ:   "UTC",
			expectedCmp: true,
			expectError: false,
		},
		{
			name:        "Case 2: Same moment, one naive, UTC default TZ needed",
			ts1:         sameTime1, // 15:30 UTC
			ts2:         sameTime3, // naive 15:30 read as UTC -> 15:30 UTC
			defaultTZ:   "UTC",
			expectedCmp: true,
			expectError: false,
		},
		{
			// On 2025-10-23 New York observes EDT (UTC-04:00), so a naive
			// 19:30 resolved in America/New_York is the same instant as the
			// explicit 19:30-04:00, namely 23:30 UTC.
			name:        "Case 3: Same moment, DST-aware comparison for naive string",
			ts1:         "2025-10-23T19:30:00-04:00", // 19:30 EDT -> 23:30 UTC
			ts2:         "2025-10-23 19:30:00",       // naive 19:30 New York (EDT) -> 23:30 UTC
			defaultTZ:   "America/New_York",
			expectedCmp: true,
			expectError: false,
		},
		{
			name:        "Case 4: Different moment (1 microsecond difference)",
			ts1:         sameTime1,
			ts2:         differentTime,
			defaultTZ:   "UTC",
			expectedCmp: false,
			expectError: false,
		},
		{
			// Los Angeles observes PDT (UTC-07:00) on this date.
			name:        "Case 5: Different naive times under the same default TZ",
			ts1:         "2025-10-23 10:00:00", // naive 10:00 LA -> 17:00 UTC
			ts2:         "2025-10-23 11:00:00", // naive 11:00 LA -> 18:00 UTC
			defaultTZ:   "America/Los_Angeles",
			expectedCmp: false, // 17:00 UTC != 18:00 UTC
			expectError: false,
		},
		{
			// Identical naive strings resolve identically and must compare equal.
			name:        "Case 5B: Identical naive strings must be equal",
			ts1:         "2025-10-23 10:00:00", // naive 10:00 LA -> 17:00 UTC
			ts2:         "2025-10-23 10:00:00", // naive 10:00 LA -> 17:00 UTC
			defaultTZ:   "America/Los_Angeles",
			expectedCmp: true,
			expectError: false,
		},
		{
			name:        "Case 6: Invalid TS1 (Offset Too High)",
			ts1:         "2025-10-23T15:30:00+15:00", // expected to fail the offset range check
			ts2:         sameTime1,
			defaultTZ:   "UTC",
			expectedCmp: false,
			expectError: true,
		},
		{
			name:        "Case 7: Invalid TS2 (Bad Format)",
			ts1:         sameTime1,
			ts2:         "not a timestamp",
			defaultTZ:   "UTC",
			expectedCmp: false,
			expectError: true,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			result, err := CompareUnixMicroTz(tc.ts1, tc.ts2, tc.defaultTZ)
			if tc.expectError {
				// Only the error is checked; the boolean result is unspecified here.
				assert.Error(t, err)
			} else {
				assert.NoError(t, err)
				assert.Equal(t, tc.expectedCmp, result)
			}
		})
	}
}
// TestValidateTimestampTz exercises ValidateTimestampTz, which only reports
// whether a timestamp string is acceptable: well-formed inputs must yield a nil
// error and malformed or out-of-range inputs a non-nil one.
func TestValidateTimestampTz(t *testing.T) {
	type scenario struct {
		name     string
		input    string
		fallback string // default timezone passed alongside the input
		wantErr  bool
	}
	scenarios := []scenario{
		{name: "Case 1: Valid TZ-aware", input: "2024-10-23T15:30:00Z", fallback: "UTC"},
		{name: "Case 2: Valid Naive", input: "2024-10-23 15:30:00", fallback: "Asia/Shanghai"},
		{name: "Case 3: Invalid Format", input: "Invalid", fallback: "UTC", wantErr: true},
		{name: "Case 4: Invalid Offset", input: "2024-10-23T15:30:00+15:00", fallback: "UTC", wantErr: true},
	}

	for _, sc := range scenarios {
		t.Run(sc.name, func(t *testing.T) {
			err := ValidateTimestampTz(sc.input, sc.fallback)
			if !sc.wantErr {
				assert.NoError(t, err)
				return
			}
			assert.Error(t, err)
		})
	}
}
// TestConvertUnixMicroToTimezoneString2 checks that
// ConvertUnixMicroToTimezoneString renders a Unix-microsecond value as an
// RFC 3339 string in the requested timezone, that the rendered string parses
// back to the same microsecond value, and that an unknown timezone name is
// reported as an error.
func TestConvertUnixMicroToTimezoneString2(t *testing.T) {
	// 2025-03-20T10:30:00.123456Z. US daylight saving time is already in effect
	// on this date, which is why Case 3 expects Los Angeles at UTC-07:00.
	baseTimeUTC := time.Date(2025, time.March, 20, 10, 30, 0, 123456000, time.UTC)
	baseUnixMicro := baseTimeUTC.UnixMicro() // 1742466600123456

	// 2025-10-23T15:30:00.000000Z. New York is on EDT (UTC-04:00) on this date.
	dstTimeUTC := time.Date(2025, time.October, 23, 15, 30, 0, 0, time.UTC)
	dstUnixMicro := dstTimeUTC.UnixMicro() // 1761233400000000

	testCases := []struct {
		name             string
		unixMicro        int64
		targetTZ         string
		expectedOutput   string
		expectError      bool
		errorContainsMsg string
	}{
		// --- Basic functionality and microsecond precision ---
		{
			name:           "Case 1: Standard conversion to UTC",
			unixMicro:      baseUnixMicro,
			targetTZ:       "UTC",
			expectedOutput: "2025-03-20T10:30:00.123456Z",
			expectError:    false,
		},
		{
			name:           "Case 2: Conversion to Asia/Shanghai (+08:00)",
			unixMicro:      baseUnixMicro,
			targetTZ:       "Asia/Shanghai", // UTC+08:00
			expectedOutput: "2025-03-20T18:30:00.123456+08:00",
			expectError:    false,
		},
		{
			name:      "Case 3: Conversion to America/Los_Angeles (PDT on Mar 20)",
			unixMicro: baseUnixMicro,
			targetTZ:  "America/Los_Angeles",
			// 10:30 UTC minus 7 hours (PDT) is 03:30 local time.
			expectedOutput: "2025-03-20T03:30:00.123456-07:00",
			expectError:    false,
		},
		// --- Daylight saving time (2025-10-23) ---
		{
			name:           "Case 4: DST active (America/New_York) - UTC-04:00",
			unixMicro:      dstUnixMicro,
			targetTZ:       "America/New_York",          // EDT = UTC-04:00 on this date
			expectedOutput: "2025-10-23T11:30:00-04:00", // 15:30 UTC -> 11:30 local time
			expectError:    false,
		},
		// --- Error handling ---
		{
			name:             "Case 5: Invalid Timezone String",
			unixMicro:        baseUnixMicro,
			targetTZ:         "Invalid/TZ_Name",
			expectedOutput:   "",
			expectError:      true,
			errorContainsMsg: "invalid target timezone string",
		},
		// --- Boundary value ---
		{
			name:           "Case 6: UnixMicro 0 (Epoch)",
			unixMicro:      0,
			targetTZ:       "UTC",
			expectedOutput: "1970-01-01T00:00:00Z",
			expectError:    false,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			result, err := ConvertUnixMicroToTimezoneString(tc.unixMicro, tc.targetTZ)
			if tc.expectError {
				assert.Error(t, err)
				// Guard against a nil dereference when the error assertion failed.
				if err != nil {
					assert.Contains(t, err.Error(), tc.errorContainsMsg)
				}
			} else {
				assert.NoError(t, err)
				// The output must be valid RFC 3339 ...
				tParsed, parseErr := time.Parse(time.RFC3339Nano, result)
				assert.NoError(t, parseErr)
				// ... match the expected text exactly ...
				assert.Equal(t, tc.expectedOutput, result)
				// ... and round-trip to the original microsecond value.
				assert.Equal(t, tc.unixMicro, tParsed.UnixMicro())
			}
		})
	}
}
// TestFormatTimeMicroWithoutTrailingZeros feeds FormatTimeMicroWithoutTrailingZeros
// times with varying sub-second parts and fixed UTC offsets, and checks that the
// output has at most microsecond precision, carries no trailing fractional zeros,
// and still parses back to the same microsecond instant.
func TestFormatTimeMicroWithoutTrailingZeros(t *testing.T) {
	type fixture struct {
		name        string
		nanos       int    // nanosecond component of the input time
		offsetHours int    // fixed zone offset, in whole hours
		want        string // exact expected rendering
	}
	fixtures := []fixture{
		{
			// .123456 — nothing to strip.
			name:        "Case 1: Full Microsecond Precision (No trailing zeros to remove)",
			nanos:       123456000,
			offsetHours: 8,
			want:        "2025-10-23T00:00:00.123456+08:00",
		},
		{
			// .123 — only three significant fractional digits remain.
			name:        "Case 2: Millisecond Precision (Remove 6 trailing zeros)",
			nanos:       123000000,
			offsetHours: 8,
			want:        "2025-10-23T00:00:00.123+08:00",
		},
		{
			// Zero fraction — the decimal point disappears as well.
			name:        "Case 3: Second Precision (Remove all 9 fractional zeros)",
			nanos:       0,
			offsetHours: 8,
			want:        "2025-10-23T00:00:00+08:00",
		},
		{
			// .1 at a zero offset, which is rendered as "Z".
			name:        "Case 4: Single-digit precision (Remove 8 trailing zeros)",
			nanos:       100000000,
			offsetHours: 0,
			want:        "2025-10-23T00:00:00.1Z",
		},
		{
			// .123456789 — digits below one microsecond are dropped, not rounded.
			name:        "Case 5: Precision beyond microsecond (Truncate first, then remove zeros)",
			nanos:       123456789,
			offsetHours: -7,
			want:        "2025-10-23T00:00:00.123456-07:00",
		},
	}

	for _, fx := range fixtures {
		t.Run(fx.name, func(t *testing.T) {
			// Midnight on a fixed date in a synthetic fixed-offset zone.
			zone := time.FixedZone("TEST_TZ", fx.offsetHours*3600)
			input := time.Date(2025, time.October, 23, 0, 0, 0, fx.nanos, zone)

			got := FormatTimeMicroWithoutTrailingZeros(input)
			assert.Equal(t, fx.want, got)

			// Round trip: the rendering must parse and keep microsecond precision.
			reparsed, parseErr := time.Parse(time.RFC3339Nano, got)
			assert.NoError(t, parseErr)
			wantMicro := input.Truncate(time.Microsecond).UnixMicro()
			assert.Equal(t, wantMicro, reparsed.UnixMicro())
		})
	}
}
// TestConvertUnixMicroToTimezoneString checks that ConvertUnixMicroToTimezoneString
// renders a Unix-microsecond value as an RFC 3339 string in the target timezone
// with trailing fractional zeros removed, that the result round-trips to the
// same microsecond value, and that an unknown timezone name yields an error.
func TestConvertUnixMicroToTimezoneString(t *testing.T) {
	// Base time 1: 2025-03-20T10:30:00.123456Z (fraction uses all six digits).
	baseTimeUTC := time.Date(2025, time.March, 20, 10, 30, 0, 123456000, time.UTC)
	baseUnixMicro := baseTimeUTC.UnixMicro() // 1742466600123456

	// Base time 2: 2025-10-23T15:30:00Z (no fractional part).
	dstTimeUTC := time.Date(2025, time.October, 23, 15, 30, 0, 0, time.UTC)
	dstUnixMicro := dstTimeUTC.UnixMicro() // 1761233400000000

	testCases := []struct {
		name             string
		unixMicro        int64
		targetTZ         string
		expectedOutput   string // exact rendering, without trailing fractional zeros
		expectError      bool
		errorContainsMsg string
	}{
		// --- Basic functionality (baseUnixMicro, fraction .123456) ---
		{
			name:           "Case 1: Standard conversion to UTC (Microsecond precision)",
			unixMicro:      baseUnixMicro,
			targetTZ:       "UTC",
			expectedOutput: "2025-03-20T10:30:00.123456Z",
			expectError:    false,
		},
		{
			name:           "Case 2: Conversion to Asia/Shanghai (+08:00)",
			unixMicro:      baseUnixMicro,
			targetTZ:       "Asia/Shanghai", // UTC+08:00
			expectedOutput: "2025-03-20T18:30:00.123456+08:00",
			expectError:    false,
		},
		{
			// 10:30 UTC minus 7 hours (PDT) is 03:30 local time.
			name:           "Case 3: Conversion to America/Los_Angeles (PDT on Mar 20)",
			unixMicro:      baseUnixMicro,
			targetTZ:       "America/Los_Angeles", // PDT = UTC-07:00
			expectedOutput: "2025-03-20T03:30:00.123456-07:00",
			expectError:    false,
		},
		// --- Daylight saving time (dstUnixMicro, zero fraction) ---
		{
			name:           "Case 4: DST active (America/New_York) - UTC-04:00 (Second precision)",
			unixMicro:      dstUnixMicro,
			targetTZ:       "America/New_York",          // EDT = UTC-04:00 on this date
			expectedOutput: "2025-10-23T11:30:00-04:00", // no fraction and no decimal point
			expectError:    false,
		},
		// --- Millisecond-only fraction ---
		{
			name: "Case 5: Millisecond precision input",
			// Whole seconds of the base time plus 123000 microseconds,
			// i.e. 2025-03-20T10:30:00.123Z.
			unixMicro:      (baseUnixMicro / 1000000 * 1000000) + 123000,
			targetTZ:       "UTC",
			expectedOutput: "2025-03-20T10:30:00.123Z",
			expectError:    false,
		},
		// --- Error handling ---
		{
			name:             "Case 6: Invalid Timezone String",
			unixMicro:        baseUnixMicro,
			targetTZ:         "Invalid/TZ_Name",
			expectedOutput:   "",
			expectError:      true,
			errorContainsMsg: "invalid target timezone string",
		},
		// --- Boundary value ---
		{
			name:           "Case 7: UnixMicro 0 (Epoch) - Second precision",
			unixMicro:      0,
			targetTZ:       "UTC",
			expectedOutput: "1970-01-01T00:00:00Z", // no fraction and no decimal point
			expectError:    false,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			result, err := ConvertUnixMicroToTimezoneString(tc.unixMicro, tc.targetTZ)
			if tc.expectError {
				assert.Error(t, err)
				// Guard against a nil dereference when the error assertion failed.
				if err != nil {
					assert.Contains(t, err.Error(), tc.errorContainsMsg)
				}
			} else {
				assert.NoError(t, err)
				assert.Equal(t, tc.expectedOutput, result)
				// Round trip: dropping trailing zeros must not lose any
				// significant digit, so parsing yields the original value.
				tParsed, parseErr := time.Parse(time.RFC3339Nano, result)
				assert.NoError(t, parseErr)
				assert.Equal(t, tc.unixMicro, tParsed.UnixMicro())
			}
		})
	}
}
// TestIsTimezoneValid runs IsTimezoneValid over a list of names that must be
// accepted and a list that must be rejected, one subtest per name.
func TestIsTimezoneValid(t *testing.T) {
	// Names the function is expected to accept.
	accepted := []string{
		"UTC",
		"Local",
		"Asia/Shanghai",
		"America/New_York",
		"Europe/London",
		"Australia/Sydney",
		"Etc/GMT+10",
	}
	// Names the function is expected to reject.
	rejected := []string{
		// NOTE(review): time.LoadLocation("") returns UTC, so rejecting the
		// empty string presumably needs an explicit check in IsTimezoneValid.
		"",
		"GMT+8",
		"CST",
		"PST",
		"Invalid/Zone",
		"Asia/Beijingg",
		"99:00",
		// The String() of a FixedZone is just its ad-hoc name ("MyZone"),
		// which is not a name the timezone lookup knows about.
		time.FixedZone("MyZone", 3600).String(),
	}

	for _, name := range accepted {
		t.Run("Valid_"+name, func(t *testing.T) {
			if ok := IsTimezoneValid(name); !ok {
				t.Errorf("IsTimezoneValid(\"%s\") expected true, but got false", name)
			}
		})
	}

	for _, name := range rejected {
		t.Run("Invalid_"+name, func(t *testing.T) {
			if ok := IsTimezoneValid(name); ok {
				t.Errorf("IsTimezoneValid(\"%s\") expected false, but got true", name)
			}
		})
	}
}

View File

@ -49,15 +49,16 @@ type AutoIndexConfig struct {
AutoIndexSearchConfig ParamItem `refreshable:"true"`
AutoIndexTuningConfig ParamGroup `refreshable:"true"`
ScalarAutoIndexEnable ParamItem `refreshable:"true"`
ScalarAutoIndexParams ParamItem `refreshable:"true"`
ScalarNumericIndexType ParamItem `refreshable:"true"`
ScalarIntIndexType ParamItem `refreshable:"true"`
ScalarVarcharIndexType ParamItem `refreshable:"true"`
ScalarBoolIndexType ParamItem `refreshable:"true"`
ScalarFloatIndexType ParamItem `refreshable:"true"`
ScalarJSONIndexType ParamItem `refreshable:"true"`
ScalarGeometryIndexType ParamItem `refreshable:"true"`
ScalarAutoIndexEnable ParamItem `refreshable:"true"`
ScalarAutoIndexParams ParamItem `refreshable:"true"`
ScalarNumericIndexType ParamItem `refreshable:"true"`
ScalarIntIndexType ParamItem `refreshable:"true"`
ScalarVarcharIndexType ParamItem `refreshable:"true"`
ScalarBoolIndexType ParamItem `refreshable:"true"`
ScalarFloatIndexType ParamItem `refreshable:"true"`
ScalarJSONIndexType ParamItem `refreshable:"true"`
ScalarGeometryIndexType ParamItem `refreshable:"true"`
ScalarTimestampTzIndexType ParamItem `refreshable:"true"`
BitmapCardinalityLimit ParamItem `refreshable:"true"`
}
@ -197,7 +198,7 @@ func (p *AutoIndexConfig) init(base *BaseTable) {
p.ScalarAutoIndexParams = ParamItem{
Key: "scalarAutoIndex.params.build",
Version: "2.4.0",
DefaultValue: `{"int": "HYBRID","varchar": "HYBRID","bool": "BITMAP", "float": "INVERTED", "json": "INVERTED", "geometry": "RTREE"}`,
DefaultValue: `{"int": "HYBRID","varchar": "HYBRID","bool": "BITMAP", "float": "INVERTED", "json": "INVERTED", "geometry": "RTREE", "timestamptz": "STL_SORT"}`,
}
p.ScalarAutoIndexParams.Init(base.mgr)
@ -262,6 +263,18 @@ func (p *AutoIndexConfig) init(base *BaseTable) {
}
p.ScalarGeometryIndexType.Init(base.mgr)
p.ScalarTimestampTzIndexType = ParamItem{
Version: "2.6.0",
Formatter: func(v string) string {
m := p.ScalarAutoIndexParams.GetAsJSONMap()
if m == nil {
return ""
}
return m["timestamptz"]
},
}
p.ScalarTimestampTzIndexType.Init(base.mgr)
p.BitmapCardinalityLimit = ParamItem{
Key: "scalarAutoIndex.params.bitmapCardinalityLimit",
Version: "2.5.0",

View File

@ -344,8 +344,9 @@ type SchemaHelper struct {
partitionKeyOffset int
clusteringKeyOffset int
dynamicFieldOffset int
// include sub fields in StructArrayField
// include sub-fields in StructArrayField
allFields []*schemapb.FieldSchema
timezone string
}
// CreateSchemaHelper returns a new SchemaHelper object
@ -403,9 +404,26 @@ func CreateSchemaHelper(schema *schemapb.CollectionSchema) (*SchemaHelper, error
schemaHelper.dynamicFieldOffset = offset
}
}
found := false
for _, kv := range schema.GetProperties() {
if kv.Key == common.TimezoneKey {
schemaHelper.timezone = kv.Value
found = true
break
}
}
if !found {
schemaHelper.timezone = common.DefaultTimezone
}
return &schemaHelper, nil
}
// GetTimezone returns the timezone string stored on the helper.
// CreateSchemaHelper fills it from the schema property whose key is
// common.TimezoneKey, and falls back to common.DefaultTimezone when the schema
// has no such property. The value is returned as stored.
func (helper *SchemaHelper) GetTimezone() string {
return helper.timezone
}
// GetPrimaryKeyField returns the schema of the primary key
func (helper *SchemaHelper) GetPrimaryKeyField() (*schemapb.FieldSchema, error) {
if helper.primaryKeyOffset == -1 {
@ -608,8 +626,7 @@ func IsVectorArrayType(dataType schemapb.DataType) bool {
func IsIntegerType(dataType schemapb.DataType) bool {
switch dataType {
case schemapb.DataType_Int8, schemapb.DataType_Int16,
schemapb.DataType_Int32, schemapb.DataType_Int64,
schemapb.DataType_Timestamptz:
schemapb.DataType_Int32, schemapb.DataType_Int64:
return true
default:
return false
@ -624,6 +641,10 @@ func IsGeometryType(dataType schemapb.DataType) bool {
return dataType == schemapb.DataType_Geometry
}
// IsTimestamptzType reports whether dataType is schemapb.DataType_Timestamptz.
func IsTimestamptzType(dataType schemapb.DataType) bool {
	switch dataType {
	case schemapb.DataType_Timestamptz:
		return true
	default:
		return false
	}
}
// IsArrayType reports whether dataType is schemapb.DataType_Array.
func IsArrayType(dataType schemapb.DataType) bool {
	switch dataType {
	case schemapb.DataType_Array:
		return true
	default:
		return false
	}
}
@ -677,7 +698,7 @@ func IsVariableDataType(dataType schemapb.DataType) bool {
}
func IsPrimitiveType(dataType schemapb.DataType) bool {
return IsArithmetic(dataType) || IsStringType(dataType) || IsBoolType(dataType)
return IsArithmetic(dataType) || IsStringType(dataType) || IsBoolType(dataType) || IsTimestamptzType(dataType)
}
// PrepareResultFieldData construct this slice fo FieldData for final result reduce

View File

@ -471,7 +471,7 @@ func TestCreateSortedScalarIndex(t *testing.T) {
if field.DataType == entity.FieldTypeBool ||
field.DataType == entity.FieldTypeJSON || field.DataType == entity.FieldTypeArray {
_, err := mc.CreateIndex(ctx, client.NewCreateIndexOption(schema.CollectionName, field.Name, idx))
require.ErrorContains(t, err, "STL_SORT are only supported on numeric or varchar field")
require.ErrorContains(t, err, "STL_SORT are only supported on numeric, varchar or timestamptz field")
} else {
idxTask, err := mc.CreateIndex(ctx, client.NewCreateIndexOption(schema.CollectionName, field.Name, idx))
common.CheckErr(t, err, true)
@ -623,7 +623,7 @@ func TestCreateIndexJsonField(t *testing.T) {
errMsg string
}
inxError := []scalarIndexError{
{index.NewSortedIndex(), "STL_SORT are only supported on numeric or varchar field"},
{index.NewSortedIndex(), "STL_SORT are only supported on numeric, varchar or timestamptz field"},
{index.NewTrieIndex(), "TRIE are only supported on varchar field"},
}
for _, idxErr := range inxError {
@ -649,7 +649,7 @@ func TestCreateUnsupportedIndexArrayField(t *testing.T) {
errMsg string
}
inxError := []scalarIndexError{
{index.NewSortedIndex(), "STL_SORT are only supported on numeric or varchar field"},
{index.NewSortedIndex(), "STL_SORT are only supported on numeric, varchar or timestamptz field"},
{index.NewTrieIndex(), "TRIE are only supported on varchar field"},
}

View File

@ -851,7 +851,7 @@ class TestMilvusClientJsonPathIndexInvalid(TestMilvusClientV2Base):
supported_field_type = "varchar"
got_json_suffix = ""
if not_supported_varchar_scalar_index == "STL_SORT":
supported_field_type = "numeric or varchar"
supported_field_type = "numeric, varchar or timestamptz"
got_json_suffix = ", got JSON"
if not_supported_varchar_scalar_index == "BITMAP":
supported_field_type = "bool, int, string and array"
@ -1814,4 +1814,4 @@ class TestMilvusClientJsonPathIndexValid(TestMilvusClientV2Base):
"json_path": f"{json_field_name}['a'][0]",
"index_type": supported_varchar_scalar_index,
"field_name": json_field_name,
"index_name": index_name + '/a/0'})
"index_name": index_name + '/a/0'})