mirror of
https://gitee.com/dromara/MilvusPlus.git
synced 2025-12-06 17:08:27 +08:00
commit
1564b28b9a
@ -1,7 +1,7 @@
|
|||||||
package org.dromara.milvus.plus.cache;
|
package org.dromara.milvus.plus.cache;
|
||||||
|
|
||||||
import org.dromara.milvus.plus.model.MilvusEntity;
|
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
|
import org.dromara.milvus.plus.model.MilvusEntity;
|
||||||
/**
|
/**
|
||||||
* @author xgc
|
* @author xgc
|
||||||
**/
|
**/
|
||||||
@ -10,5 +10,6 @@ public class ConversionCache {
|
|||||||
private String collectionName;
|
private String collectionName;
|
||||||
private PropertyCache propertyCache;
|
private PropertyCache propertyCache;
|
||||||
private MilvusEntity milvusEntity;
|
private MilvusEntity milvusEntity;
|
||||||
|
private boolean autoID;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -48,6 +48,7 @@ public class MilvusConverter {
|
|||||||
return cache.getMilvusEntity();
|
return cache.getMilvusEntity();
|
||||||
}
|
}
|
||||||
MilvusEntity milvus = new MilvusEntity();
|
MilvusEntity milvus = new MilvusEntity();
|
||||||
|
boolean autoID=false;
|
||||||
// 集合名称
|
// 集合名称
|
||||||
MilvusCollection collectionAnnotation = entityClass.getAnnotation(MilvusCollection.class);
|
MilvusCollection collectionAnnotation = entityClass.getAnnotation(MilvusCollection.class);
|
||||||
if (Objects.isNull(collectionAnnotation)) {
|
if (Objects.isNull(collectionAnnotation)) {
|
||||||
@ -97,7 +98,8 @@ public class MilvusConverter {
|
|||||||
.isPrimaryKey(fieldAnnotation.isPrimaryKey())
|
.isPrimaryKey(fieldAnnotation.isPrimaryKey())
|
||||||
.isPartitionKey(fieldAnnotation.isPartitionKey())
|
.isPartitionKey(fieldAnnotation.isPartitionKey())
|
||||||
.elementType(fieldAnnotation.elementType())
|
.elementType(fieldAnnotation.elementType())
|
||||||
.autoID(fieldAnnotation.autoID());
|
.autoID(false);
|
||||||
|
autoID=autoID?autoID:fieldAnnotation.autoID();
|
||||||
// 描述
|
// 描述
|
||||||
Optional.of(fieldAnnotation.description())
|
Optional.of(fieldAnnotation.description())
|
||||||
.filter(StringUtils::isNotEmpty).ifPresent(builder::description);
|
.filter(StringUtils::isNotEmpty).ifPresent(builder::description);
|
||||||
@ -124,6 +126,7 @@ public class MilvusConverter {
|
|||||||
conversionCache.setMilvusEntity(milvus);
|
conversionCache.setMilvusEntity(milvus);
|
||||||
conversionCache.setCollectionName(collectionName);
|
conversionCache.setCollectionName(collectionName);
|
||||||
conversionCache.setPropertyCache(propertyCache);
|
conversionCache.setPropertyCache(propertyCache);
|
||||||
|
conversionCache.setAutoID(autoID);
|
||||||
MilvusCache.milvusCache.put(entityClass.getName(), conversionCache);
|
MilvusCache.milvusCache.put(entityClass.getName(), conversionCache);
|
||||||
|
|
||||||
return milvus;
|
return milvus;
|
||||||
|
|||||||
@ -10,10 +10,13 @@ import lombok.Data;
|
|||||||
import lombok.EqualsAndHashCode;
|
import lombok.EqualsAndHashCode;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.dromara.milvus.plus.cache.CollectionToPrimaryCache;
|
||||||
import org.dromara.milvus.plus.cache.ConversionCache;
|
import org.dromara.milvus.plus.cache.ConversionCache;
|
||||||
|
import org.dromara.milvus.plus.cache.MilvusCache;
|
||||||
import org.dromara.milvus.plus.cache.PropertyCache;
|
import org.dromara.milvus.plus.cache.PropertyCache;
|
||||||
import org.dromara.milvus.plus.core.FieldFunction;
|
import org.dromara.milvus.plus.core.FieldFunction;
|
||||||
import org.dromara.milvus.plus.model.vo.MilvusResp;
|
import org.dromara.milvus.plus.model.vo.MilvusResp;
|
||||||
|
import org.dromara.milvus.plus.util.IdWorkerUtils;
|
||||||
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
|
|
||||||
@ -90,7 +93,9 @@ public class LambdaInsertWrapper<T> extends AbstractChainWrapper<T> implements
|
|||||||
return insert(iterator);
|
return insert(iterator);
|
||||||
}
|
}
|
||||||
public MilvusResp<InsertResp> insert(Iterator<T> iterator) throws MilvusException {
|
public MilvusResp<InsertResp> insert(Iterator<T> iterator) throws MilvusException {
|
||||||
|
ConversionCache conversionCache = MilvusCache.milvusCache.get(entityType.getName());
|
||||||
PropertyCache propertyCache = conversionCache.getPropertyCache();
|
PropertyCache propertyCache = conversionCache.getPropertyCache();
|
||||||
|
String pk = CollectionToPrimaryCache.collectionToPrimary.get(collectionName);
|
||||||
List<JSONObject> jsonObjects=new ArrayList<>();
|
List<JSONObject> jsonObjects=new ArrayList<>();
|
||||||
while (iterator.hasNext()) {
|
while (iterator.hasNext()) {
|
||||||
T t1 = iterator.next();
|
T t1 = iterator.next();
|
||||||
@ -102,6 +107,9 @@ public class LambdaInsertWrapper<T> extends AbstractChainWrapper<T> implements
|
|||||||
String tk = propertyCache.functionToPropertyMap.get(key);
|
String tk = propertyCache.functionToPropertyMap.get(key);
|
||||||
jsonObject.put(tk,value);
|
jsonObject.put(tk,value);
|
||||||
}
|
}
|
||||||
|
if(conversionCache.isAutoID()){
|
||||||
|
jsonObject.put(pk, IdWorkerUtils.nextId());
|
||||||
|
}
|
||||||
jsonObjects.add(jsonObject);
|
jsonObjects.add(jsonObject);
|
||||||
}
|
}
|
||||||
return insert(jsonObjects);
|
return insert(jsonObjects);
|
||||||
|
|||||||
@ -4,9 +4,9 @@ import com.alibaba.fastjson.JSON;
|
|||||||
import com.alibaba.fastjson.JSONObject;
|
import com.alibaba.fastjson.JSONObject;
|
||||||
import io.milvus.exception.MilvusException;
|
import io.milvus.exception.MilvusException;
|
||||||
import io.milvus.v2.client.MilvusClientV2;
|
import io.milvus.v2.client.MilvusClientV2;
|
||||||
import io.milvus.v2.service.vector.request.SearchReq;
|
import io.milvus.v2.service.vector.request.QueryReq;
|
||||||
import io.milvus.v2.service.vector.request.UpsertReq;
|
import io.milvus.v2.service.vector.request.UpsertReq;
|
||||||
import io.milvus.v2.service.vector.response.SearchResp;
|
import io.milvus.v2.service.vector.response.QueryResp;
|
||||||
import io.milvus.v2.service.vector.response.UpsertResp;
|
import io.milvus.v2.service.vector.response.UpsertResp;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.EqualsAndHashCode;
|
import lombok.EqualsAndHashCode;
|
||||||
@ -340,12 +340,12 @@ public class LambdaUpdateWrapper<T> extends AbstractChainWrapper<T> implements W
|
|||||||
*
|
*
|
||||||
* @return 搜索请求对象
|
* @return 搜索请求对象
|
||||||
*/
|
*/
|
||||||
private SearchResp build() {
|
private QueryResp build() {
|
||||||
String filterStr = buildFilters();
|
String filterStr = buildFilters();
|
||||||
if (filterStr != null && !filterStr.isEmpty()) {
|
if (filterStr != null && !filterStr.isEmpty()) {
|
||||||
SearchReq.SearchReqBuilder<?, ?> builder = SearchReq.builder()
|
QueryReq.QueryReqBuilder<?, ?> builder = QueryReq.builder()
|
||||||
.collectionName(collectionName).filter(filterStr);
|
.collectionName(collectionName).filter(filterStr);
|
||||||
return client.search(builder.build());
|
return client.query(builder.build());
|
||||||
} else {
|
} else {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
@ -356,43 +356,74 @@ public class LambdaUpdateWrapper<T> extends AbstractChainWrapper<T> implements W
|
|||||||
*
|
*
|
||||||
* @return 更新响应对象
|
* @return 更新响应对象
|
||||||
*/
|
*/
|
||||||
public MilvusResp<UpsertResp> update(T t) throws MilvusException {
|
public MilvusResp<UpsertResp> update(T entity) throws MilvusException {
|
||||||
List<JSONObject> jsonObjects = new ArrayList<>();
|
// 获取主键字段
|
||||||
SearchResp searchResp = build();
|
String primaryKeyField = CollectionToPrimaryCache.collectionToPrimary.get(collectionName);
|
||||||
List<Object> ids = new ArrayList<>();
|
// 将实体转换为属性映射
|
||||||
if (searchResp != null) {
|
Map<String, Object> propertiesMap = getPropertiesMap(entity);
|
||||||
for (List<SearchResp.SearchResult> searchResult : searchResp.getSearchResults()) {
|
PropertyCache propertyCache = conversionCache.getPropertyCache();
|
||||||
for (SearchResp.SearchResult result : searchResult) {
|
// 初始化主键标识和主键值
|
||||||
ids.add(result.getId());
|
boolean hasPrimaryKey = false;
|
||||||
|
Object primaryKeyValue = null;
|
||||||
|
|
||||||
|
// 准备更新的数据列表
|
||||||
|
List<JSONObject> updateDataList = new ArrayList<>();
|
||||||
|
|
||||||
|
// 构建单个更新对象
|
||||||
|
JSONObject updateObject = new JSONObject();
|
||||||
|
for (Map.Entry<String, Object> entry : propertiesMap.entrySet()) {
|
||||||
|
String field = entry.getKey();
|
||||||
|
Object value = entry.getValue();
|
||||||
|
String tableNameColumn = propertyCache.functionToPropertyMap.get(field);
|
||||||
|
// 检查是否为主键字段
|
||||||
|
if (primaryKeyField.equals(tableNameColumn)) {
|
||||||
|
hasPrimaryKey = true;
|
||||||
|
primaryKeyValue = value;
|
||||||
|
}
|
||||||
|
// 添加到更新对象
|
||||||
|
updateObject.put(tableNameColumn, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 检查是否需要构建查询条件
|
||||||
|
boolean needBuildQuery = !hasPrimaryKey;
|
||||||
|
if (hasPrimaryKey) {
|
||||||
|
for (Map.Entry<String, String> property : propertyCache.functionToPropertyMap.entrySet()) {
|
||||||
|
if (updateObject.get(property.getValue()) == null) {
|
||||||
|
needBuildQuery = true;
|
||||||
|
eq(primaryKeyField,primaryKeyValue);
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Map<String, Object> propertiesMap = getPropertiesMap(t);
|
|
||||||
PropertyCache propertyCache = conversionCache.getPropertyCache();
|
// 如果需要构建查询条件,则执行查询并准备更新数据
|
||||||
String pk = CollectionToPrimaryCache.collectionToPrimary.get(collectionName);
|
if (needBuildQuery) {
|
||||||
boolean havePk = false;
|
QueryResp queryResp = build();
|
||||||
JSONObject jsonObject = new JSONObject();
|
if (queryResp != null) {
|
||||||
for (Map.Entry<String, Object> entry : propertiesMap.entrySet()) {
|
for (QueryResp.QueryResult result : queryResp.getQueryResults()) {
|
||||||
String key = entry.getKey();
|
Map<String, Object> existingEntity = result.getEntity();
|
||||||
Object value = entry.getValue();
|
JSONObject existingData = new JSONObject();
|
||||||
String tk = propertyCache.functionToPropertyMap.get(key);
|
|
||||||
if (pk.equals(tk)) {
|
for (Map.Entry<String, Object> existingEntry : existingEntity.entrySet()) {
|
||||||
havePk = true;
|
String existingField = existingEntry.getKey();
|
||||||
|
Object existingValue = existingEntry.getValue();
|
||||||
|
Object updateValue = updateObject.get(existingField);
|
||||||
|
existingData.put(existingField, updateValue != null ? updateValue : existingValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
updateDataList.add(existingData);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
jsonObject.put(tk, value);
|
|
||||||
}
|
|
||||||
if (!havePk && ids.isEmpty()) {
|
|
||||||
throw new MilvusException("not find primary key", 400);
|
|
||||||
}
|
|
||||||
if (havePk) {
|
|
||||||
jsonObjects.add(jsonObject);
|
|
||||||
} else {
|
} else {
|
||||||
for (Object id : ids) {
|
updateDataList.add(updateObject);
|
||||||
jsonObject.put(pk, id);
|
|
||||||
jsonObjects.add(jsonObject);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return update(jsonObjects);
|
|
||||||
|
// 检查是否有数据需要更新
|
||||||
|
if (updateDataList.isEmpty()) {
|
||||||
|
return new MilvusResp<>();
|
||||||
|
}
|
||||||
|
// 执行更新操作
|
||||||
|
return update(updateDataList);
|
||||||
}
|
}
|
||||||
|
|
||||||
private MilvusResp<UpsertResp> update(List<JSONObject> jsonObjects) {
|
private MilvusResp<UpsertResp> update(List<JSONObject> jsonObjects) {
|
||||||
|
|||||||
@ -121,7 +121,7 @@ public abstract class AbstractMilvusClientBuilder implements MilvusClientBuilder
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void aliasProcess(MilvusEntity milvusEntity) {
|
private void aliasProcess(MilvusEntity milvusEntity) {
|
||||||
if (StringUtils.isBlank(milvusEntity.getCollectionName()) || milvusEntity.getAlias().isEmpty()) {
|
if (StringUtils.isBlank(milvusEntity.getCollectionName()) || milvusEntity.getAlias()==null|| milvusEntity.getAlias().isEmpty()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
ListAliasResp listAliasResp = listAliases(milvusEntity);
|
ListAliasResp listAliasResp = listAliases(milvusEntity);
|
||||||
|
|||||||
@ -0,0 +1,135 @@
|
|||||||
|
package org.dromara.milvus.plus.util;
|
||||||
|
|
||||||
|
import java.net.InetAddress;
|
||||||
|
import java.net.NetworkInterface;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class IdWorkerUtils {
|
||||||
|
|
||||||
|
// 时间起始标记点,作为基准,一般取系统的最近时间(一旦确定不能变动)
|
||||||
|
private static final long twepoch = 1288834974657L;
|
||||||
|
|
||||||
|
// 机器标识位数
|
||||||
|
private static final long workerIdBits = 5L;
|
||||||
|
// 数据中心标识位数
|
||||||
|
private static final long datacenterIdBits = 5L;
|
||||||
|
|
||||||
|
// 机器ID最大值
|
||||||
|
private static final long maxWorkerId = -1L ^ (-1L << workerIdBits);
|
||||||
|
// 数据中心ID最大值
|
||||||
|
private static final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);
|
||||||
|
|
||||||
|
// 毫秒内自增位
|
||||||
|
private static final long sequenceBits = 12L;
|
||||||
|
|
||||||
|
// 机器ID偏左移12位
|
||||||
|
private static final long workerIdShift = sequenceBits;
|
||||||
|
// 数据中心ID左移17位
|
||||||
|
private static final long datacenterIdShift = sequenceBits + workerIdBits;
|
||||||
|
// 时间毫秒左移22位
|
||||||
|
private static final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;
|
||||||
|
|
||||||
|
private static final long sequenceMask = -1L ^ (-1L << sequenceBits);
|
||||||
|
/* 上次生产id时间戳 */
|
||||||
|
private static long lastTimestamp = -1L;
|
||||||
|
// 0,并发控制
|
||||||
|
private static long sequence = 0L;
|
||||||
|
|
||||||
|
// 数据标识id部分
|
||||||
|
private static long datacenterId;
|
||||||
|
// 机器ID
|
||||||
|
private static long workerId;
|
||||||
|
|
||||||
|
static {
|
||||||
|
datacenterId = getDatacenterId(maxDatacenterId);
|
||||||
|
workerId = getMaxWorkerId();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取下一个ID
|
||||||
|
*
|
||||||
|
* @return the next ID
|
||||||
|
*/
|
||||||
|
public static synchronized long nextId() {
|
||||||
|
long timestamp = timeGen();
|
||||||
|
if (timestamp < lastTimestamp) {
|
||||||
|
throw new RuntimeException(String.format(
|
||||||
|
"Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (lastTimestamp == timestamp) {
|
||||||
|
sequence = (sequence + 1) & sequenceMask;
|
||||||
|
if (sequence == 0) {
|
||||||
|
timestamp = tilNextMillis(lastTimestamp);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
sequence = 0L;
|
||||||
|
}
|
||||||
|
lastTimestamp = timestamp;
|
||||||
|
// ID偏移组合生成最终的ID,并返回ID
|
||||||
|
long nextId = ((timestamp - twepoch) << timestampLeftShift)
|
||||||
|
| (datacenterId << datacenterIdShift)
|
||||||
|
| (workerId << workerIdShift) | sequence;
|
||||||
|
return nextId;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static long tilNextMillis(final long lastTimestamp) {
|
||||||
|
long timestamp = timeGen();
|
||||||
|
while (timestamp <= lastTimestamp) {
|
||||||
|
timestamp = timeGen();
|
||||||
|
}
|
||||||
|
return timestamp;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static long timeGen() {
|
||||||
|
return System.currentTimeMillis();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* 获取 maxWorkerId
|
||||||
|
* </p>
|
||||||
|
*/
|
||||||
|
private static long getMaxWorkerId() {
|
||||||
|
long maxWorkerId = 0L;
|
||||||
|
try {
|
||||||
|
List<NetworkInterface> networkInterfaces = Collections.list(NetworkInterface.getNetworkInterfaces());
|
||||||
|
for (NetworkInterface networkInterface : networkInterfaces) {
|
||||||
|
if (networkInterface.getName().contains("eth") || networkInterface.getName().contains("wlan")) {
|
||||||
|
byte[] macBytes = networkInterface.getHardwareAddress();
|
||||||
|
if (macBytes != null) {
|
||||||
|
maxWorkerId |= ((macBytes[macBytes.length - 1] & 0xff) | ((macBytes[macBytes.length - 2] & 0xff) << 8)) & 0xffff;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
return (maxWorkerId >> 10) & 0x3ff;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* 数据标识id部分
|
||||||
|
* </p>
|
||||||
|
*/
|
||||||
|
private static long getDatacenterId(long maxDatacenterId) {
|
||||||
|
long id = 0L;
|
||||||
|
try {
|
||||||
|
InetAddress ip = InetAddress.getLocalHost();
|
||||||
|
NetworkInterface network = NetworkInterface.getByInetAddress(ip);
|
||||||
|
if (network == null) {
|
||||||
|
id = 1L;
|
||||||
|
} else {
|
||||||
|
byte[] mac = network.getHardwareAddress();
|
||||||
|
id = ((0x000000FF & (long) mac[mac.length - 1])
|
||||||
|
| (0x0000FF00 & (((long) mac[mac.length - 2]) << 8))) >> 6;
|
||||||
|
id = id % (maxDatacenterId + 1);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
System.out.println(" getDatacenterId: " + e.getMessage());
|
||||||
|
}
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user