From 54b3e509bc409341af6db74ccb545a5aae70ebe3 Mon Sep 17 00:00:00 2001 From: xgc Date: Thu, 6 Jun 2024 17:55:17 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E8=A7=A3=E5=86=B3=E9=BB=98=E8=AE=A4?= =?UTF-8?q?=E7=94=9F=E6=88=90=E4=B8=BB=E9=94=AE=E6=97=A0=E6=B3=95=E6=9B=B4?= =?UTF-8?q?=E6=96=B0=E7=9A=84=E5=AE=98=E6=96=B9bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../milvus/plus/cache/ConversionCache.java | 3 +- .../plus/converter/MilvusConverter.java | 5 +- .../milvus/plus/util/IdWorkerUtils.java | 135 ++++++++++++++++++ 3 files changed, 141 insertions(+), 2 deletions(-) create mode 100644 milvus-plus-core/src/main/java/org/dromara/milvus/plus/util/IdWorkerUtils.java diff --git a/milvus-plus-core/src/main/java/org/dromara/milvus/plus/cache/ConversionCache.java b/milvus-plus-core/src/main/java/org/dromara/milvus/plus/cache/ConversionCache.java index 0132cec..c332ad5 100644 --- a/milvus-plus-core/src/main/java/org/dromara/milvus/plus/cache/ConversionCache.java +++ b/milvus-plus-core/src/main/java/org/dromara/milvus/plus/cache/ConversionCache.java @@ -1,7 +1,7 @@ package org.dromara.milvus.plus.cache; -import org.dromara.milvus.plus.model.MilvusEntity; import lombok.Data; +import org.dromara.milvus.plus.model.MilvusEntity; /** * @author xgc **/ @@ -10,5 +10,6 @@ public class ConversionCache { private String collectionName; private PropertyCache propertyCache; private MilvusEntity milvusEntity; + private boolean autoID; } diff --git a/milvus-plus-core/src/main/java/org/dromara/milvus/plus/converter/MilvusConverter.java b/milvus-plus-core/src/main/java/org/dromara/milvus/plus/converter/MilvusConverter.java index 189c68e..3431eb2 100644 --- a/milvus-plus-core/src/main/java/org/dromara/milvus/plus/converter/MilvusConverter.java +++ b/milvus-plus-core/src/main/java/org/dromara/milvus/plus/converter/MilvusConverter.java @@ -48,6 +48,7 @@ public class MilvusConverter { return cache.getMilvusEntity(); } MilvusEntity milvus = new MilvusEntity(); + boolean autoID=false; // 集合名称 MilvusCollection collectionAnnotation = entityClass.getAnnotation(MilvusCollection.class); if (Objects.isNull(collectionAnnotation)) { @@ -97,7 +98,8 @@ public class MilvusConverter { .isPrimaryKey(fieldAnnotation.isPrimaryKey()) .isPartitionKey(fieldAnnotation.isPartitionKey()) .elementType(fieldAnnotation.elementType()) - .autoID(fieldAnnotation.autoID()); + .autoID(false); + autoID=autoID?autoID:fieldAnnotation.autoID(); // 描述 Optional.of(fieldAnnotation.description()) .filter(StringUtils::isNotEmpty).ifPresent(builder::description); @@ -124,6 +126,7 @@ public class MilvusConverter { conversionCache.setMilvusEntity(milvus); conversionCache.setCollectionName(collectionName); conversionCache.setPropertyCache(propertyCache); + conversionCache.setAutoID(autoID); MilvusCache.milvusCache.put(entityClass.getName(), conversionCache); return milvus; diff --git a/milvus-plus-core/src/main/java/org/dromara/milvus/plus/util/IdWorkerUtils.java b/milvus-plus-core/src/main/java/org/dromara/milvus/plus/util/IdWorkerUtils.java new file mode 100644 index 0000000..3e31327 --- /dev/null +++ b/milvus-plus-core/src/main/java/org/dromara/milvus/plus/util/IdWorkerUtils.java @@ -0,0 +1,135 @@ +package org.dromara.milvus.plus.util; + +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.util.Collections; +import java.util.List; + +public class IdWorkerUtils { + + // 时间起始标记点,作为基准,一般取系统的最近时间(一旦确定不能变动) + private static final long twepoch = 1288834974657L; + + // 机器标识位数 + private static final long workerIdBits = 5L; + // 数据中心标识位数 + private static final long datacenterIdBits = 5L; + + // 机器ID最大值 + private static final long maxWorkerId = -1L ^ (-1L << workerIdBits); + // 数据中心ID最大值 + private static final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits); + + // 毫秒内自增位 + private static final long sequenceBits = 12L; + + // 机器ID偏左移12位 + private static final long workerIdShift = sequenceBits; + // 数据中心ID左移17位 + private static final long datacenterIdShift = sequenceBits + workerIdBits; + // 时间毫秒左移22位 + private static final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits; + + private static final long sequenceMask = -1L ^ (-1L << sequenceBits); + /* 上次生产id时间戳 */ + private static long lastTimestamp = -1L; + // 0,并发控制 + private static long sequence = 0L; + + // 数据标识id部分 + private static long datacenterId; + // 机器ID + private static long workerId; + + static { + datacenterId = getDatacenterId(maxDatacenterId); + workerId = getMaxWorkerId(); + } + + /** + * 获取下一个ID + * + * @return the next ID + */ + public static synchronized long nextId() { + long timestamp = timeGen(); + if (timestamp < lastTimestamp) { + throw new RuntimeException(String.format( + "Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp)); + } + + if (lastTimestamp == timestamp) { + sequence = (sequence + 1) & sequenceMask; + if (sequence == 0) { + timestamp = tilNextMillis(lastTimestamp); + } + } else { + sequence = 0L; + } + lastTimestamp = timestamp; + // ID偏移组合生成最终的ID,并返回ID + long nextId = ((timestamp - twepoch) << timestampLeftShift) + | (datacenterId << datacenterIdShift) + | (workerId << workerIdShift) | sequence; + return nextId; + } + + private static long tilNextMillis(final long lastTimestamp) { + long timestamp = timeGen(); + while (timestamp <= lastTimestamp) { + timestamp = timeGen(); + } + return timestamp; + } + + private static long timeGen() { + return System.currentTimeMillis(); + } + + /** + *

+ * 获取 maxWorkerId + *

+ */ + private static long getMaxWorkerId() { + long maxWorkerId = 0L; + try { + List networkInterfaces = Collections.list(NetworkInterface.getNetworkInterfaces()); + for (NetworkInterface networkInterface : networkInterfaces) { + if (networkInterface.getName().contains("eth") || networkInterface.getName().contains("wlan")) { + byte[] macBytes = networkInterface.getHardwareAddress(); + if (macBytes != null) { + maxWorkerId |= ((macBytes[macBytes.length - 1] & 0xff) | ((macBytes[macBytes.length - 2] & 0xff) << 8)) & 0xffff; + } + } + } + } catch (Exception e) { + e.printStackTrace(); + } + return (maxWorkerId >> 10) & 0x3ff; + } + + /** + *

+ * 数据标识id部分 + *

+ */ + private static long getDatacenterId(long maxDatacenterId) { + long id = 0L; + try { + InetAddress ip = InetAddress.getLocalHost(); + NetworkInterface network = NetworkInterface.getByInetAddress(ip); + if (network == null) { + id = 1L; + } else { + byte[] mac = network.getHardwareAddress(); + id = ((0x000000FF & (long) mac[mac.length - 1]) + | (0x0000FF00 & (((long) mac[mac.length - 2]) << 8))) >> 6; + id = id % (maxDatacenterId + 1); + } + } catch (Exception e) { + System.out.println(" getDatacenterId: " + e.getMessage()); + } + return id; + } +} \ No newline at end of file From 610fdf1688e09a1e898e20c508bb1dd8d3530f0a Mon Sep 17 00:00:00 2001 From: xgc Date: Thu, 6 Jun 2024 17:56:40 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E8=A7=A3=E5=86=B3=E6=9B=B4=E6=96=B0?= =?UTF-8?q?=E5=BF=85=E9=A1=BB=E9=9C=80=E8=A6=81=E5=85=A8=E9=83=A8=E5=AD=97?= =?UTF-8?q?=E6=AE=B5=E6=95=B0=E6=8D=AE=E7=9A=84bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/conditions/LambdaInsertWrapper.java | 8 ++ .../core/conditions/LambdaUpdateWrapper.java | 103 ++++++++++++------ .../service/AbstractMilvusClientBuilder.java | 2 +- 3 files changed, 76 insertions(+), 37 deletions(-) diff --git a/milvus-plus-core/src/main/java/org/dromara/milvus/plus/core/conditions/LambdaInsertWrapper.java b/milvus-plus-core/src/main/java/org/dromara/milvus/plus/core/conditions/LambdaInsertWrapper.java index 7c35ffa..83061a6 100644 --- a/milvus-plus-core/src/main/java/org/dromara/milvus/plus/core/conditions/LambdaInsertWrapper.java +++ b/milvus-plus-core/src/main/java/org/dromara/milvus/plus/core/conditions/LambdaInsertWrapper.java @@ -10,10 +10,13 @@ import lombok.Data; import lombok.EqualsAndHashCode; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.dromara.milvus.plus.cache.CollectionToPrimaryCache; import org.dromara.milvus.plus.cache.ConversionCache; +import org.dromara.milvus.plus.cache.MilvusCache; import org.dromara.milvus.plus.cache.PropertyCache; import org.dromara.milvus.plus.core.FieldFunction; import org.dromara.milvus.plus.model.vo.MilvusResp; +import org.dromara.milvus.plus.util.IdWorkerUtils; import java.util.*; @@ -90,7 +93,9 @@ public class LambdaInsertWrapper extends AbstractChainWrapper implements return insert(iterator); } public MilvusResp insert(Iterator iterator) throws MilvusException { + ConversionCache conversionCache = MilvusCache.milvusCache.get(entityType.getName()); PropertyCache propertyCache = conversionCache.getPropertyCache(); + String pk = CollectionToPrimaryCache.collectionToPrimary.get(collectionName); List jsonObjects=new ArrayList<>(); while (iterator.hasNext()) { T t1 = iterator.next(); @@ -102,6 +107,9 @@ public class LambdaInsertWrapper extends AbstractChainWrapper implements String tk = propertyCache.functionToPropertyMap.get(key); jsonObject.put(tk,value); } + if(conversionCache.isAutoID()){ + jsonObject.put(pk, IdWorkerUtils.nextId()); + } jsonObjects.add(jsonObject); } return insert(jsonObjects); diff --git a/milvus-plus-core/src/main/java/org/dromara/milvus/plus/core/conditions/LambdaUpdateWrapper.java b/milvus-plus-core/src/main/java/org/dromara/milvus/plus/core/conditions/LambdaUpdateWrapper.java index c74403c..84dcc77 100644 --- a/milvus-plus-core/src/main/java/org/dromara/milvus/plus/core/conditions/LambdaUpdateWrapper.java +++ b/milvus-plus-core/src/main/java/org/dromara/milvus/plus/core/conditions/LambdaUpdateWrapper.java @@ -4,9 +4,9 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import io.milvus.exception.MilvusException; import io.milvus.v2.client.MilvusClientV2; -import io.milvus.v2.service.vector.request.SearchReq; +import io.milvus.v2.service.vector.request.QueryReq; import io.milvus.v2.service.vector.request.UpsertReq; -import io.milvus.v2.service.vector.response.SearchResp; +import io.milvus.v2.service.vector.response.QueryResp; import io.milvus.v2.service.vector.response.UpsertResp; import lombok.Data; import lombok.EqualsAndHashCode; @@ -340,12 +340,12 @@ public class LambdaUpdateWrapper extends AbstractChainWrapper implements W * * @return 搜索请求对象 */ - private SearchResp build() { + private QueryResp build() { String filterStr = buildFilters(); if (filterStr != null && !filterStr.isEmpty()) { - SearchReq.SearchReqBuilder builder = SearchReq.builder() + QueryReq.QueryReqBuilder builder = QueryReq.builder() .collectionName(collectionName).filter(filterStr); - return client.search(builder.build()); + return client.query(builder.build()); } else { return null; } @@ -356,43 +356,74 @@ public class LambdaUpdateWrapper extends AbstractChainWrapper implements W * * @return 更新响应对象 */ - public MilvusResp update(T t) throws MilvusException { - List jsonObjects = new ArrayList<>(); - SearchResp searchResp = build(); - List ids = new ArrayList<>(); - if (searchResp != null) { - for (List searchResult : searchResp.getSearchResults()) { - for (SearchResp.SearchResult result : searchResult) { - ids.add(result.getId()); + public MilvusResp update(T entity) throws MilvusException { + // 获取主键字段 + String primaryKeyField = CollectionToPrimaryCache.collectionToPrimary.get(collectionName); + // 将实体转换为属性映射 + Map propertiesMap = getPropertiesMap(entity); + PropertyCache propertyCache = conversionCache.getPropertyCache(); + // 初始化主键标识和主键值 + boolean hasPrimaryKey = false; + Object primaryKeyValue = null; + + // 准备更新的数据列表 + List updateDataList = new ArrayList<>(); + + // 构建单个更新对象 + JSONObject updateObject = new JSONObject(); + for (Map.Entry entry : propertiesMap.entrySet()) { + String field = entry.getKey(); + Object value = entry.getValue(); + String tableNameColumn = propertyCache.functionToPropertyMap.get(field); + // 检查是否为主键字段 + if (primaryKeyField.equals(tableNameColumn)) { + hasPrimaryKey = true; + primaryKeyValue = value; + } + // 添加到更新对象 + updateObject.put(tableNameColumn, value); + } + + // 检查是否需要构建查询条件 + boolean needBuildQuery = !hasPrimaryKey; + if (hasPrimaryKey) { + for (Map.Entry property : propertyCache.functionToPropertyMap.entrySet()) { + if (updateObject.get(property.getValue()) == null) { + needBuildQuery = true; + eq(primaryKeyField,primaryKeyValue); + break; } } } - Map propertiesMap = getPropertiesMap(t); - PropertyCache propertyCache = conversionCache.getPropertyCache(); - String pk = CollectionToPrimaryCache.collectionToPrimary.get(collectionName); - boolean havePk = false; - JSONObject jsonObject = new JSONObject(); - for (Map.Entry entry : propertiesMap.entrySet()) { - String key = entry.getKey(); - Object value = entry.getValue(); - String tk = propertyCache.functionToPropertyMap.get(key); - if (pk.equals(tk)) { - havePk = true; + + // 如果需要构建查询条件,则执行查询并准备更新数据 + if (needBuildQuery) { + QueryResp queryResp = build(); + if (queryResp != null) { + for (QueryResp.QueryResult result : queryResp.getQueryResults()) { + Map existingEntity = result.getEntity(); + JSONObject existingData = new JSONObject(); + + for (Map.Entry existingEntry : existingEntity.entrySet()) { + String existingField = existingEntry.getKey(); + Object existingValue = existingEntry.getValue(); + Object updateValue = updateObject.get(existingField); + existingData.put(existingField, updateValue != null ? updateValue : existingValue); + } + + updateDataList.add(existingData); + } } - jsonObject.put(tk, value); - } - if (!havePk && ids.isEmpty()) { - throw new MilvusException("not find primary key", 400); - } - if (havePk) { - jsonObjects.add(jsonObject); } else { - for (Object id : ids) { - jsonObject.put(pk, id); - jsonObjects.add(jsonObject); - } + updateDataList.add(updateObject); } - return update(jsonObjects); + + // 检查是否有数据需要更新 + if (updateDataList.isEmpty()) { + return new MilvusResp<>(); + } + // 执行更新操作 + return update(updateDataList); } private MilvusResp update(List jsonObjects) { diff --git a/milvus-plus-core/src/main/java/org/dromara/milvus/plus/service/AbstractMilvusClientBuilder.java b/milvus-plus-core/src/main/java/org/dromara/milvus/plus/service/AbstractMilvusClientBuilder.java index 1b72179..2ba5eb4 100644 --- a/milvus-plus-core/src/main/java/org/dromara/milvus/plus/service/AbstractMilvusClientBuilder.java +++ b/milvus-plus-core/src/main/java/org/dromara/milvus/plus/service/AbstractMilvusClientBuilder.java @@ -121,7 +121,7 @@ public abstract class AbstractMilvusClientBuilder implements MilvusClientBuilder } private void aliasProcess(MilvusEntity milvusEntity) { - if (StringUtils.isBlank(milvusEntity.getCollectionName()) || milvusEntity.getAlias().isEmpty()) { + if (StringUtils.isBlank(milvusEntity.getCollectionName()) || milvusEntity.getAlias()==null|| milvusEntity.getAlias().isEmpty()) { return; } ListAliasResp listAliasResp = listAliases(milvusEntity);