mirror of
https://gitee.com/dromara/easy-es.git
synced 2025-12-06 17:18:57 +08:00
0.9.9版本 自动托管索引功能 支持多种策略
This commit is contained in:
parent
87a111e9f4
commit
6441351b13
@ -5,7 +5,7 @@
|
||||
<parent>
|
||||
<artifactId>easy-es-parent</artifactId>
|
||||
<groupId>io.github.xpc1024</groupId>
|
||||
<version>0.9.8</version>
|
||||
<version>0.9.9</version>
|
||||
<relativePath>../easy-es-parent</relativePath>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
@ -21,12 +21,12 @@
|
||||
<dependency>
|
||||
<groupId>io.github.xpc1024</groupId>
|
||||
<artifactId>easy-es-core</artifactId>
|
||||
<version>0.9.8</version>
|
||||
<version>0.9.9</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.github.xpc1024</groupId>
|
||||
<artifactId>easy-es-extension</artifactId>
|
||||
<version>0.9.8</version>
|
||||
<version>0.9.9</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
|
||||
@ -57,7 +57,7 @@ public class EsAutoConfiguration implements InitializingBean, EnvironmentAware,
|
||||
@ConditionalOnMissingBean
|
||||
public RestHighLevelClient restHighLevelClient() {
|
||||
// 处理地址
|
||||
String address = environment.getProperty(ADDRESS);
|
||||
String address = esConfigProperties.getAddress();
|
||||
if (StringUtils.isEmpty(address)) {
|
||||
throw ExceptionUtils.eee("please config the es address");
|
||||
}
|
||||
|
||||
@ -6,26 +6,18 @@ package com.xpc.easyes.autoconfig.constants;
|
||||
* Copyright © 2021 xpc1024 All Rights Reserved
|
||||
**/
|
||||
public interface PropertyKeyConstants {
|
||||
/**
|
||||
* es 地址
|
||||
*/
|
||||
String ADDRESS = "easy-es.address";
|
||||
/**
|
||||
* 属性
|
||||
*/
|
||||
String SCHEMA = "easy-es.schema";
|
||||
/**
|
||||
* es用户名
|
||||
*/
|
||||
String USERNAME = "easy-es.username";
|
||||
/**
|
||||
* es密码
|
||||
*/
|
||||
String PASSWORD = "easy-es.password";
|
||||
/**
|
||||
* 框架banner是否展示
|
||||
*/
|
||||
String PRINT_DSL = "easy-es.global-config.print-dsl";
|
||||
/**
|
||||
* 是否开启索引自动托管
|
||||
*/
|
||||
String OPEN_AUTO_PROCESS_INDEX = "easy-es.global-config.open-auto-process-index";
|
||||
/**
|
||||
* 自动托管索引模式
|
||||
*/
|
||||
String AUTO_PROCESS_INDEX_MODE = "easy-es.global-config..auto_process_index_mode";
|
||||
/**
|
||||
* es索引前缀
|
||||
*/
|
||||
|
||||
@ -0,0 +1,71 @@
|
||||
package com.xpc.easyes.autoconfig.factory;
|
||||
|
||||
|
||||
import com.xpc.easyes.autoconfig.service.AutoProcessIndexService;
|
||||
import com.xpc.easyes.core.toolkit.ExceptionUtils;
|
||||
import org.elasticsearch.client.RestHighLevelClient;
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationContextAware;
|
||||
import org.springframework.context.EnvironmentAware;
|
||||
import org.springframework.core.env.Environment;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
import static com.xpc.easyes.autoconfig.constants.PropertyKeyConstants.OPEN_AUTO_PROCESS_INDEX;
|
||||
|
||||
/**
|
||||
* 自动托管索引策略工厂
|
||||
* <p>
|
||||
* Copyright © 2022 xpc1024 All Rights Reserved
|
||||
**/
|
||||
@Component
|
||||
@ConditionalOnClass(RestHighLevelClient.class)
|
||||
@ConditionalOnProperty(prefix = "easy-es", name = {"enable"}, havingValue = "true", matchIfMissing = true)
|
||||
public class IndexStrategyFactory implements ApplicationContextAware, InitializingBean, EnvironmentAware {
|
||||
/**
|
||||
* 预估初始策略工厂容量
|
||||
*/
|
||||
private static final Integer DEFAULT_SIZE = 4;
|
||||
private ApplicationContext applicationContext;
|
||||
private Environment environment;
|
||||
/**
|
||||
* 策略容器
|
||||
*/
|
||||
private static final Map<Integer, AutoProcessIndexService> SERVICE_MAP = new HashMap<>(DEFAULT_SIZE);
|
||||
|
||||
@Override
|
||||
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
||||
this.applicationContext = applicationContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
// 默认开启
|
||||
boolean isOpen = Optional.ofNullable(environment.getProperty(OPEN_AUTO_PROCESS_INDEX))
|
||||
.map(Boolean::parseBoolean)
|
||||
.orElse(Boolean.TRUE);
|
||||
if (isOpen) {
|
||||
applicationContext.getBeansOfType(AutoProcessIndexService.class)
|
||||
.values()
|
||||
.forEach(v -> SERVICE_MAP.putIfAbsent(v.getStrategyType(), v));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setEnvironment(Environment environment) {
|
||||
this.environment = environment;
|
||||
}
|
||||
|
||||
public AutoProcessIndexService getByStrategyType(Integer strategyType) {
|
||||
return Optional.ofNullable(SERVICE_MAP.get(strategyType))
|
||||
.orElseThrow(() -> ExceptionUtils.eee("no such service strategyType:{}", strategyType));
|
||||
|
||||
}
|
||||
}
|
||||
@ -1,16 +1,13 @@
|
||||
package com.xpc.easyes.autoconfig.register;
|
||||
|
||||
import com.xpc.easyes.autoconfig.config.EsConfigProperties;
|
||||
import com.xpc.easyes.autoconfig.factory.IndexStrategyFactory;
|
||||
import com.xpc.easyes.autoconfig.service.AutoProcessIndexService;
|
||||
import com.xpc.easyes.core.cache.BaseCache;
|
||||
import com.xpc.easyes.core.common.EntityFieldInfo;
|
||||
import com.xpc.easyes.core.common.EntityInfo;
|
||||
import com.xpc.easyes.core.enums.Analyzer;
|
||||
import com.xpc.easyes.core.params.EsIndexParam;
|
||||
import com.xpc.easyes.core.params.IndexParam;
|
||||
import com.xpc.easyes.core.cache.GlobalConfigCache;
|
||||
import com.xpc.easyes.core.config.GlobalConfig;
|
||||
import com.xpc.easyes.core.enums.AutoProcessIndexStrategyEnum;
|
||||
import com.xpc.easyes.core.proxy.EsMapperProxy;
|
||||
import com.xpc.easyes.core.toolkit.CollectionUtils;
|
||||
import com.xpc.easyes.core.toolkit.EntityInfoHelper;
|
||||
import com.xpc.easyes.core.toolkit.IndexUtils;
|
||||
import com.xpc.easyes.core.toolkit.TypeUtils;
|
||||
import com.xpc.easyes.extension.anno.Intercepts;
|
||||
import com.xpc.easyes.extension.plugins.Interceptor;
|
||||
@ -21,12 +18,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
|
||||
import java.lang.reflect.Proxy;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
/**
|
||||
* 代理类
|
||||
@ -34,8 +26,6 @@ import java.util.logging.Logger;
|
||||
* Copyright © 2021 xpc1024 All Rights Reserved
|
||||
**/
|
||||
public class MapperFactoryBean<T> implements FactoryBean<T> {
|
||||
private final static Logger log = Logger.getAnonymousLogger();
|
||||
|
||||
private Class<T> mapperInterface;
|
||||
|
||||
@Autowired
|
||||
@ -47,6 +37,9 @@ public class MapperFactoryBean<T> implements FactoryBean<T> {
|
||||
@Autowired
|
||||
private EsConfigProperties esConfigProperties;
|
||||
|
||||
@Autowired
|
||||
private IndexStrategyFactory indexStrategyFactory;
|
||||
|
||||
public MapperFactoryBean() {
|
||||
}
|
||||
|
||||
@ -59,8 +52,11 @@ public class MapperFactoryBean<T> implements FactoryBean<T> {
|
||||
|
||||
EsMapperProxy<T> esMapperProxy = new EsMapperProxy<>(mapperInterface);
|
||||
|
||||
// 获取实体类
|
||||
Class<?> entityClass = TypeUtils.getInterfaceT(mapperInterface, 0);
|
||||
|
||||
// 初始化缓存
|
||||
BaseCache.initCache(mapperInterface, client);
|
||||
BaseCache.initCache(mapperInterface, entityClass, client);
|
||||
|
||||
// 创建代理
|
||||
T t = (T) Proxy.newProxyInstance(mapperInterface.getClassLoader(), new Class[]{mapperInterface}, esMapperProxy);
|
||||
@ -69,15 +65,11 @@ public class MapperFactoryBean<T> implements FactoryBean<T> {
|
||||
InterceptorChain interceptorChain = this.initInterceptorChain();
|
||||
|
||||
// 异步处理索引创建/更新/数据迁移等
|
||||
CompletableFuture.supplyAsync(this::processIndexAsync)
|
||||
.whenCompleteAsync((isSuccess, throwable) -> {
|
||||
if (isSuccess) {
|
||||
log.info("===> Congratulations auto process index by Easy-Es is done !");
|
||||
} else {
|
||||
Optional.ofNullable(throwable).ifPresent(Throwable::printStackTrace);
|
||||
}
|
||||
});
|
||||
|
||||
GlobalConfig.DbConfig dbConfig = GlobalConfigCache.getGlobalConfig().getDbConfig();
|
||||
if (!AutoProcessIndexStrategyEnum.MANUAL.getStrategyType().equals(dbConfig.getAutoProcessIndexMode())) {
|
||||
AutoProcessIndexService autoProcessIndexService = indexStrategyFactory.getByStrategyType(dbConfig.getAutoProcessIndexMode());
|
||||
autoProcessIndexService.processIndexAsync(entityClass, client);
|
||||
}
|
||||
return interceptorChain.pluginAll(t);
|
||||
}
|
||||
|
||||
@ -108,63 +100,4 @@ public class MapperFactoryBean<T> implements FactoryBean<T> {
|
||||
return esConfigProperties.getInterceptorChain();
|
||||
}
|
||||
|
||||
private boolean processIndexAsync() {
|
||||
Class<?> entityClass = TypeUtils.getInterfaceT(mapperInterface, 0);
|
||||
EntityInfo entityInfo = EntityInfoHelper.getEntityInfo(entityClass);
|
||||
boolean existsIndex = IndexUtils.existsIndex(client, entityInfo.getIndexName());
|
||||
if (existsIndex) {
|
||||
return doUpdateIndex(entityInfo);
|
||||
} else {
|
||||
return doCreateIndex(entityInfo);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean doUpdateIndex(EntityInfo entityInfo) {
|
||||
// 是否存在别名
|
||||
|
||||
// 是否有内容变化
|
||||
// if (nothingChanged){
|
||||
// return Boolean.TRUE;
|
||||
// }
|
||||
|
||||
// 创建新索引
|
||||
|
||||
// 迁移数据
|
||||
|
||||
// 原子操作 通过别名切换老索引至新索引并删除老索引
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private boolean doCreateIndex(EntityInfo entityInfo) {
|
||||
// 封装字段信息参数
|
||||
List<EsIndexParam> esIndexParamList = new ArrayList<>();
|
||||
List<EntityFieldInfo> fieldList = entityInfo.getFieldList();
|
||||
if (CollectionUtils.isNotEmpty(fieldList)) {
|
||||
fieldList.forEach(field -> {
|
||||
EsIndexParam esIndexParam = new EsIndexParam();
|
||||
String esFieldType = IndexUtils.getEsFieldType(field.getFieldType(), field.getColumnType());
|
||||
esIndexParam.setFieldType(esFieldType);
|
||||
esIndexParam.setFieldName(field.getMappingColumn());
|
||||
if (!Analyzer.NONE.equals(field.getAnalyzer())) {
|
||||
esIndexParam.setAnalyzer(field.getAnalyzer());
|
||||
}
|
||||
if (!Analyzer.NONE.equals(field.getSearchAnalyzer())) {
|
||||
esIndexParam.setSearchAnalyzer(field.getSearchAnalyzer());
|
||||
}
|
||||
esIndexParamList.add(esIndexParam);
|
||||
});
|
||||
}
|
||||
|
||||
// 设置创建参数
|
||||
IndexParam indexParam = new IndexParam();
|
||||
indexParam.setEsIndexParamList(esIndexParamList);
|
||||
indexParam.setIndexName(entityInfo.getIndexName());
|
||||
indexParam.setAliasName(entityInfo.getAliasName());
|
||||
indexParam.setShardsNum(entityInfo.getShardsNum());
|
||||
indexParam.setReplicasNum(entityInfo.getReplicasNum());
|
||||
|
||||
// 执行创建
|
||||
return IndexUtils.createIndex(client, indexParam);
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,25 @@
|
||||
package com.xpc.easyes.autoconfig.service;
|
||||
|
||||
import org.elasticsearch.client.RestHighLevelClient;
|
||||
|
||||
/**
|
||||
* 自动托管索引接口
|
||||
* <p>
|
||||
* Copyright © 2022 xpc1024 All Rights Reserved
|
||||
**/
|
||||
public interface AutoProcessIndexService {
|
||||
/**
|
||||
* 获取当前策略类型
|
||||
*
|
||||
* @return 策略类型
|
||||
*/
|
||||
Integer getStrategyType();
|
||||
|
||||
/**
|
||||
* 异步处理索引
|
||||
*
|
||||
* @param entityClass 实体类
|
||||
* @param client restHighLevelClient
|
||||
*/
|
||||
void processIndexAsync(Class<?> entityClass, RestHighLevelClient client);
|
||||
}
|
||||
@ -0,0 +1,77 @@
|
||||
package com.xpc.easyes.autoconfig.service.impl;
|
||||
|
||||
import com.xpc.easyes.autoconfig.service.AutoProcessIndexService;
|
||||
import com.xpc.easyes.core.common.EntityInfo;
|
||||
import com.xpc.easyes.core.enums.AutoProcessIndexStrategyEnum;
|
||||
import com.xpc.easyes.core.params.CreateIndexParam;
|
||||
import com.xpc.easyes.core.params.EsIndexInfo;
|
||||
import com.xpc.easyes.core.toolkit.EntityInfoHelper;
|
||||
import com.xpc.easyes.core.toolkit.IndexUtils;
|
||||
import org.elasticsearch.client.RestHighLevelClient;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
* 自动非平滑托管索引实现类, 重建索引时原索引数据会被删除
|
||||
* <p>
|
||||
* Copyright © 2022 xpc1024 All Rights Reserved
|
||||
**/
|
||||
@Service
|
||||
@ConditionalOnClass(RestHighLevelClient.class)
|
||||
@ConditionalOnProperty(prefix = "easy-es", name = {"enable"}, havingValue = "true", matchIfMissing = true)
|
||||
public class AutoProcessIndexNotSmoothlyServiceImpl implements AutoProcessIndexService {
|
||||
|
||||
@Override
|
||||
public Integer getStrategyType() {
|
||||
return AutoProcessIndexStrategyEnum.NOT_SMOOTHLY.getStrategyType();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void processIndexAsync(Class<?> entityClass, RestHighLevelClient client) {
|
||||
IndexUtils.supplyAsync(this::process, entityClass, client);
|
||||
}
|
||||
|
||||
private boolean process(Class<?> entityClass, RestHighLevelClient client) {
|
||||
EntityInfo entityInfo = EntityInfoHelper.getEntityInfo(entityClass);
|
||||
// 是否存在索引
|
||||
boolean existsIndex = IndexUtils.existsIndexWithRetryAndSetActiveIndex(entityInfo, client);
|
||||
if (existsIndex) {
|
||||
// 更新
|
||||
return doUpdateIndex(entityInfo, client);
|
||||
} else {
|
||||
// 新建
|
||||
return doCreateIndex(entityInfo, client);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean doUpdateIndex(EntityInfo entityInfo, RestHighLevelClient client) {
|
||||
// 获取索引信息
|
||||
EsIndexInfo esIndexInfo = IndexUtils.getIndex(client, entityInfo.getIndexName());
|
||||
|
||||
// 索引是否有变化 若有则直接删除旧索引,创建新索引 若无则直接返回托管成功
|
||||
boolean isIndexNeedChange = IndexUtils.isIndexNeedChange(esIndexInfo, entityInfo);
|
||||
if (!isIndexNeedChange) {
|
||||
return Boolean.TRUE;
|
||||
}
|
||||
|
||||
// 直接删除旧索引
|
||||
IndexUtils.deleteIndex(client, entityInfo.getIndexName());
|
||||
|
||||
// 初始化创建索引参数
|
||||
CreateIndexParam createIndexParam = IndexUtils.getCreateIndexParam(entityInfo);
|
||||
|
||||
// 执行创建
|
||||
return IndexUtils.createIndex(client, createIndexParam);
|
||||
}
|
||||
|
||||
private boolean doCreateIndex(EntityInfo entityInfo, RestHighLevelClient client) {
|
||||
// 初始化创建索引参数
|
||||
CreateIndexParam createIndexParam = IndexUtils.getCreateIndexParam(entityInfo);
|
||||
|
||||
// 执行创建
|
||||
return IndexUtils.createIndex(client, createIndexParam);
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,124 @@
|
||||
package com.xpc.easyes.autoconfig.service.impl;
|
||||
|
||||
import com.xpc.easyes.autoconfig.service.AutoProcessIndexService;
|
||||
import com.xpc.easyes.core.common.EntityInfo;
|
||||
import com.xpc.easyes.core.enums.AutoProcessIndexStrategyEnum;
|
||||
import com.xpc.easyes.core.params.CreateIndexParam;
|
||||
import com.xpc.easyes.core.params.EsIndexInfo;
|
||||
import com.xpc.easyes.core.toolkit.EntityInfoHelper;
|
||||
import com.xpc.easyes.core.toolkit.IndexUtils;
|
||||
import org.elasticsearch.client.RestHighLevelClient;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import static com.xpc.easyes.core.constants.BaseEsConstants.S1_SUFFIX;
|
||||
import static com.xpc.easyes.core.constants.BaseEsConstants.SO_SUFFIX;
|
||||
|
||||
/**
|
||||
* 自动平滑托管索引实现类,本框架默认模式,过程零停机,数据会自动转移至新索引
|
||||
* <p>
|
||||
* Copyright © 2022 xpc1024 All Rights Reserved
|
||||
**/
|
||||
@Service
|
||||
@ConditionalOnClass(RestHighLevelClient.class)
|
||||
@ConditionalOnProperty(prefix = "easy-es", name = {"enable"}, havingValue = "true", matchIfMissing = true)
|
||||
public class AutoProcessIndexSmoothlyServiceImpl implements AutoProcessIndexService {
|
||||
@Override
|
||||
public Integer getStrategyType() {
|
||||
return AutoProcessIndexStrategyEnum.SMOOTHLY.getStrategyType();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processIndexAsync(Class<?> entityClass, RestHighLevelClient client) {
|
||||
IndexUtils.supplyAsync(this::process, entityClass, client);
|
||||
}
|
||||
|
||||
|
||||
private synchronized boolean process(Class<?> entityClass, RestHighLevelClient client) {
|
||||
EntityInfo entityInfo = EntityInfoHelper.getEntityInfo(entityClass);
|
||||
|
||||
// 索引是否已存在
|
||||
boolean existsIndex = IndexUtils.existsIndexWithRetryAndSetActiveIndex(entityInfo, client);
|
||||
if (existsIndex) {
|
||||
// 更新
|
||||
return doUpdateIndex(entityInfo, client);
|
||||
} else {
|
||||
// 新建
|
||||
return doCreateIndex(entityInfo, client);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private boolean doUpdateIndex(EntityInfo entityInfo, RestHighLevelClient client) {
|
||||
// 获取索引信息
|
||||
EsIndexInfo esIndexInfo = IndexUtils.getIndex(client, entityInfo.getIndexName());
|
||||
|
||||
// 是否存在默认别名,若无则给添加
|
||||
if (!esIndexInfo.getHasDefaultAlias()) {
|
||||
IndexUtils.addDefaultAlias(client, entityInfo.getIndexName());
|
||||
}
|
||||
|
||||
// 索引是否有变化 若有则创建新索引并无感迁移, 若无则直接返回托管成功
|
||||
boolean isIndexNeedChange = IndexUtils.isIndexNeedChange(esIndexInfo, entityInfo);
|
||||
if (!isIndexNeedChange) {
|
||||
return Boolean.TRUE;
|
||||
}
|
||||
|
||||
// 创建新索引
|
||||
String releaseIndexName = generateReleaseIndexName(entityInfo.getIndexName());
|
||||
entityInfo.setReleaseIndexName(releaseIndexName);
|
||||
boolean isCreateIndexSuccess = doCreateIndex(entityInfo, client);
|
||||
if (!isCreateIndexSuccess) {
|
||||
return Boolean.FALSE;
|
||||
}
|
||||
|
||||
// 迁移数据至新创建的索引
|
||||
boolean isDataMigrationSuccess = doDataMigration(entityInfo.getIndexName(), releaseIndexName, client);
|
||||
if (!isDataMigrationSuccess) {
|
||||
return Boolean.FALSE;
|
||||
}
|
||||
|
||||
// 原子操作 切换别名:将默认别名关联至新索引,并将旧索引的默认别名移除
|
||||
boolean isChangeAliasSuccess = IndexUtils.changeAliasAtomic(client, entityInfo.getIndexName(), releaseIndexName);
|
||||
if (!isChangeAliasSuccess) {
|
||||
return Boolean.FALSE;
|
||||
}
|
||||
|
||||
// 删除旧索引
|
||||
boolean isDeletedIndexSuccess = IndexUtils.deleteIndex(client, entityInfo.getIndexName());
|
||||
if (!isDeletedIndexSuccess) {
|
||||
return Boolean.FALSE;
|
||||
}
|
||||
|
||||
// 用最新索引覆盖缓存中的老索引
|
||||
entityInfo.setIndexName(releaseIndexName);
|
||||
|
||||
// done.
|
||||
return Boolean.TRUE;
|
||||
}
|
||||
|
||||
private String generateReleaseIndexName(String oldIndexName) {
|
||||
if (oldIndexName.endsWith(SO_SUFFIX)) {
|
||||
return oldIndexName.split(SO_SUFFIX)[0] + S1_SUFFIX;
|
||||
} else if (oldIndexName.endsWith(S1_SUFFIX)) {
|
||||
return oldIndexName.split(S1_SUFFIX)[0] + SO_SUFFIX;
|
||||
} else {
|
||||
return oldIndexName + SO_SUFFIX;
|
||||
}
|
||||
}
|
||||
|
||||
private boolean doDataMigration(String oldIndexName, String releaseIndexName, RestHighLevelClient client) {
|
||||
return IndexUtils.reindex(client, oldIndexName, releaseIndexName);
|
||||
}
|
||||
|
||||
|
||||
private boolean doCreateIndex(EntityInfo entityInfo, RestHighLevelClient client) {
|
||||
// 初始化创建索引参数
|
||||
CreateIndexParam createIndexParam = IndexUtils.getCreateIndexParam(entityInfo);
|
||||
|
||||
// 执行创建
|
||||
return IndexUtils.createIndex(client, createIndexParam);
|
||||
}
|
||||
|
||||
}
|
||||
@ -1,3 +1,6 @@
|
||||
# Auto Configure
|
||||
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
|
||||
com.xpc.easyes.autoconfig.config.EsAutoConfiguration
|
||||
com.xpc.easyes.autoconfig.config.EsAutoConfiguration,\
|
||||
com.xpc.easyes.autoconfig.factory.IndexStrategyFactory,\
|
||||
com.xpc.easyes.autoconfig.service.impl.AutoProcessIndexSmoothlyServiceImpl,\
|
||||
com.xpc.easyes.autoconfig.service.impl.AutoProcessIndexNotSmoothlyServiceImpl
|
||||
|
||||
@ -7,13 +7,13 @@
|
||||
<parent>
|
||||
<groupId>io.github.xpc1024</groupId>
|
||||
<artifactId>easy-es-parent</artifactId>
|
||||
<version>0.9.8</version>
|
||||
<version>0.9.9</version>
|
||||
<relativePath>../easy-es-parent</relativePath>
|
||||
</parent>
|
||||
|
||||
<groupId>io.github.xpc1024</groupId>
|
||||
<artifactId>easy-es-core</artifactId>
|
||||
<version>0.9.8</version>
|
||||
<version>0.9.9</version>
|
||||
|
||||
<properties>
|
||||
<maven.compiler.source>8</maven.compiler.source>
|
||||
|
||||
@ -3,7 +3,6 @@ package com.xpc.easyes.core.cache;
|
||||
import com.xpc.easyes.core.conditions.BaseEsMapperImpl;
|
||||
import com.xpc.easyes.core.toolkit.ExceptionUtils;
|
||||
import com.xpc.easyes.core.toolkit.FieldUtils;
|
||||
import com.xpc.easyes.core.toolkit.TypeUtils;
|
||||
import org.elasticsearch.client.RestHighLevelClient;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
@ -35,11 +34,10 @@ public class BaseCache {
|
||||
* @param mapperInterface mapper接口
|
||||
* @param client es客户端
|
||||
*/
|
||||
public static void initCache(Class<?> mapperInterface, RestHighLevelClient client) {
|
||||
public static void initCache(Class<?> mapperInterface, Class<?> entityClass, RestHighLevelClient client) {
|
||||
// 初始化baseEsMapper的所有实现类实例
|
||||
BaseEsMapperImpl baseEsMapper = new BaseEsMapperImpl();
|
||||
baseEsMapper.setClient(client);
|
||||
Class<?> entityClass = TypeUtils.getInterfaceT(mapperInterface, 0);
|
||||
|
||||
baseEsMapper.setEntityClass(entityClass);
|
||||
baseEsMapper.setGlobalConfig(GlobalConfigCache.getGlobalConfig());
|
||||
@ -90,7 +88,7 @@ public class BaseCache {
|
||||
* @param methodName 方法名
|
||||
* @return 执行方法
|
||||
*/
|
||||
public static Method setterMethod(Class<?> entityClass, String methodName){
|
||||
public static Method setterMethod(Class<?> entityClass, String methodName) {
|
||||
return Optional.ofNullable(baseEsEntityMethodMap.get(entityClass))
|
||||
.map(b -> b.get(SET_FUNC_PREFIX + FieldUtils.firstToUpperCase(methodName)))
|
||||
.orElseThrow(() -> ExceptionUtils.eee("no such method:", entityClass, methodName));
|
||||
|
||||
@ -3,6 +3,7 @@ package com.xpc.easyes.core.cache;
|
||||
import com.xpc.easyes.core.config.GlobalConfig;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
@ -15,7 +16,13 @@ public class GlobalConfigCache {
|
||||
private static final Map<Class<?>, GlobalConfig> globalConfigMap = new ConcurrentHashMap<>(1);
|
||||
|
||||
public static GlobalConfig getGlobalConfig() {
|
||||
return globalConfigMap.get(GlobalConfig.class);
|
||||
return Optional.ofNullable(globalConfigMap.get(GlobalConfig.class))
|
||||
.orElseGet(() -> {
|
||||
GlobalConfig globalConfig = new GlobalConfig();
|
||||
GlobalConfig.DbConfig dbConfig = new GlobalConfig.DbConfig();
|
||||
globalConfig.setDbConfig(dbConfig);
|
||||
return globalConfig;
|
||||
});
|
||||
}
|
||||
|
||||
public static void setGlobalConfig(GlobalConfig globalConfig) {
|
||||
|
||||
@ -3,7 +3,7 @@ package com.xpc.easyes.core.common;
|
||||
import com.alibaba.fastjson.PropertyNamingStrategy;
|
||||
import com.alibaba.fastjson.parser.deserializer.ExtraProcessor;
|
||||
import com.alibaba.fastjson.serializer.SerializeFilter;
|
||||
import com.xpc.easyes.core.enums.FieldType;
|
||||
import com.xpc.easyes.core.constants.BaseEsConstants;
|
||||
import com.xpc.easyes.core.enums.IdType;
|
||||
import lombok.Data;
|
||||
import lombok.experimental.Accessors;
|
||||
@ -33,9 +33,13 @@ public class EntityInfo {
|
||||
*/
|
||||
private Class<?> idClass;
|
||||
/**
|
||||
* 索引名称
|
||||
* 索引名称(原索引名)
|
||||
*/
|
||||
private String indexName;
|
||||
/**
|
||||
* 新索引名(由EE在更新索引时自动创建)
|
||||
*/
|
||||
private String releaseIndexName;
|
||||
/**
|
||||
* 表映射结果集
|
||||
*/
|
||||
@ -53,13 +57,13 @@ public class EntityInfo {
|
||||
*/
|
||||
private String keyColumn;
|
||||
/**
|
||||
* 分片数
|
||||
* 分片数 默认为1
|
||||
*/
|
||||
private Integer shardsNum;
|
||||
private Integer shardsNum = BaseEsConstants.ONE;
|
||||
/**
|
||||
* 副本数
|
||||
* 副本数 默认为1
|
||||
*/
|
||||
private Integer replicasNum;
|
||||
private Integer replicasNum = BaseEsConstants.ONE;
|
||||
/**
|
||||
* 索引别名
|
||||
*/
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
package com.xpc.easyes.core.config;
|
||||
|
||||
import com.xpc.easyes.core.enums.AutoProcessIndexStrategyEnum;
|
||||
import com.xpc.easyes.core.enums.FieldStrategy;
|
||||
import com.xpc.easyes.core.enums.IdType;
|
||||
import lombok.Data;
|
||||
@ -38,5 +39,17 @@ public class GlobalConfig {
|
||||
* 字段验证策略 (默认 NOT NULL)
|
||||
*/
|
||||
private FieldStrategy fieldStrategy = FieldStrategy.NOT_NULL;
|
||||
/**
|
||||
* 是否开启自动托管索引 默认开启
|
||||
*/
|
||||
private boolean openAutoProcessIndex = true;
|
||||
/**
|
||||
* 自动托管索引模式 默认为平滑迁移
|
||||
*/
|
||||
private Integer autoProcessIndexMode = AutoProcessIndexStrategyEnum.SMOOTHLY.getStrategyType();
|
||||
/**
|
||||
* 是否分布式环境
|
||||
*/
|
||||
private boolean isDistributed = false;
|
||||
}
|
||||
}
|
||||
|
||||
@ -114,5 +114,40 @@ public interface BaseEsConstants {
|
||||
* count DSL语句前缀
|
||||
*/
|
||||
String COUNT_DSL_PREFIX = "===> Execute Count DSL By Easy-Es(Note that size does not affect the total count): ";
|
||||
|
||||
/**
|
||||
* 分片数key
|
||||
*/
|
||||
String SHARDS_NUM_KEY = "index.number_of_shards";
|
||||
/**
|
||||
* 副本数key
|
||||
*/
|
||||
String REPLICAS_NUM_KEY = "index.number_of_replicas";
|
||||
/**
|
||||
* 默认迁移操作规则
|
||||
*/
|
||||
String DEFAULT_DEST_OP_TYPE = "create";
|
||||
/**
|
||||
* 默认冲突处理
|
||||
*/
|
||||
String DEFAULT_CONFLICTS = "proceed";
|
||||
/**
|
||||
* 更新索引时自动创建的索引后缀s 灵感来源于jvm young区s0,s1垃圾回收
|
||||
*/
|
||||
String S_SUFFIX = "_s";
|
||||
/**
|
||||
* 更新索引时自动创建的索引后缀s0
|
||||
*/
|
||||
String SO_SUFFIX = "_s0";
|
||||
/**
|
||||
* 更新索引时自动创建的索引后缀s1
|
||||
*/
|
||||
String S1_SUFFIX = "_s1";
|
||||
/**
|
||||
* 分布式锁提示内容
|
||||
*/
|
||||
String DISTRIBUTED_LOCK_TIP_JSON = "{\"tip\":\"Do not delete unless deadlock occurs\"}";
|
||||
/**
|
||||
* 获取/释放 分布式锁 最大失败重试次数
|
||||
*/
|
||||
Integer LOCK_MAX_RETRY = 5;
|
||||
}
|
||||
|
||||
@ -0,0 +1,27 @@
|
||||
package com.xpc.easyes.core.enums;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
|
||||
/**
|
||||
* 索引策略枚举
|
||||
* <p>
|
||||
* Copyright © 2022 xpc1024 All Rights Reserved
|
||||
**/
|
||||
@AllArgsConstructor
|
||||
public enum AutoProcessIndexStrategyEnum {
|
||||
/**
|
||||
* 平滑迁移策略,零停机 默认策略
|
||||
*/
|
||||
SMOOTHLY(1),
|
||||
/**
|
||||
* 非平滑迁移策略 简单粗暴 备选
|
||||
*/
|
||||
NOT_SMOOTHLY(2),
|
||||
/**
|
||||
* 用户手动调用API处理索引
|
||||
*/
|
||||
MANUAL(3);
|
||||
@Getter
|
||||
private Integer strategyType;
|
||||
}
|
||||
@ -0,0 +1,39 @@
|
||||
package com.xpc.easyes.core.params;
|
||||
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 创建索引参数
|
||||
* <p>
|
||||
* Copyright © 2022 xpc1024 All Rights Reserved
|
||||
**/
|
||||
@Data
|
||||
public class CreateIndexParam {
|
||||
/**
|
||||
* 实体类
|
||||
*/
|
||||
private Class<?> entityClass;
|
||||
/**
|
||||
* 索引名
|
||||
*/
|
||||
private String indexName;
|
||||
/**
|
||||
* 别名
|
||||
*/
|
||||
private String aliasName;
|
||||
/**
|
||||
* 分片数
|
||||
*/
|
||||
private Integer shardsNum;
|
||||
/**
|
||||
* 副本数
|
||||
*/
|
||||
private Integer replicasNum;
|
||||
/**
|
||||
* 索引字段及类型分词器等信息
|
||||
*/
|
||||
private List<EsIndexParam> esIndexParamList;
|
||||
}
|
||||
@ -0,0 +1,33 @@
|
||||
package com.xpc.easyes.core.params;
|
||||
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.experimental.Accessors;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* es索引信息
|
||||
* <p>
|
||||
* Copyright © 2022 xpc1024 All Rights Reserved
|
||||
**/
|
||||
@Data
|
||||
@Accessors(chain = true)
|
||||
public class EsIndexInfo {
|
||||
/**
|
||||
* 是否存在默认别名
|
||||
*/
|
||||
private Boolean hasDefaultAlias;
|
||||
/**
|
||||
* 分片数
|
||||
*/
|
||||
private Integer shardsNum;
|
||||
/**
|
||||
* 副本数
|
||||
*/
|
||||
private Integer replicasNum;
|
||||
/**
|
||||
* 索引字段信息
|
||||
*/
|
||||
private Map<String, Object> mapping;
|
||||
}
|
||||
@ -1,18 +0,0 @@
|
||||
package com.xpc.easyes.core.params;
|
||||
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.experimental.Accessors;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Data
|
||||
@Accessors(chain = true)
|
||||
public class IndexParam {
|
||||
private Class<?> entityClass;
|
||||
private String indexName;
|
||||
private String aliasName;
|
||||
private Integer shardsNum;
|
||||
private Integer replicasNum;
|
||||
private List<EsIndexParam> esIndexParamList;
|
||||
}
|
||||
@ -1,36 +1,60 @@
|
||||
package com.xpc.easyes.core.toolkit;
|
||||
|
||||
import com.xpc.easyes.core.cache.GlobalConfigCache;
|
||||
import com.xpc.easyes.core.common.EntityFieldInfo;
|
||||
import com.xpc.easyes.core.common.EntityInfo;
|
||||
import com.xpc.easyes.core.config.GlobalConfig;
|
||||
import com.xpc.easyes.core.constants.BaseEsConstants;
|
||||
import com.xpc.easyes.core.enums.Analyzer;
|
||||
import com.xpc.easyes.core.enums.FieldType;
|
||||
import com.xpc.easyes.core.enums.JdkDataTypeEnum;
|
||||
import com.xpc.easyes.core.params.CreateIndexParam;
|
||||
import com.xpc.easyes.core.params.EsIndexInfo;
|
||||
import com.xpc.easyes.core.params.EsIndexParam;
|
||||
import com.xpc.easyes.core.params.IndexParam;
|
||||
import lombok.AccessLevel;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.SneakyThrows;
|
||||
import org.elasticsearch.action.admin.indices.alias.Alias;
|
||||
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
|
||||
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.client.RequestOptions;
|
||||
import org.elasticsearch.client.RestHighLevelClient;
|
||||
import org.elasticsearch.client.indices.CreateIndexRequest;
|
||||
import org.elasticsearch.client.indices.CreateIndexResponse;
|
||||
import org.elasticsearch.client.indices.GetIndexRequest;
|
||||
import org.elasticsearch.client.indices.GetIndexResponse;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.reindex.BulkByScrollResponse;
|
||||
import org.elasticsearch.index.reindex.ReindexRequest;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.function.BiFunction;
|
||||
|
||||
import static com.xpc.easyes.core.constants.BaseEsConstants.*;
|
||||
|
||||
/**
|
||||
* 索引工具类
|
||||
* <p>
|
||||
* Copyright © 2022 xpc1024 All Rights Reserved
|
||||
**/
|
||||
@NoArgsConstructor(access = AccessLevel.PRIVATE)
|
||||
public class IndexUtils {
|
||||
|
||||
@SneakyThrows
|
||||
public static boolean existsIndex(RestHighLevelClient client, String indexName) {
|
||||
GetIndexRequest request = new GetIndexRequest(indexName);
|
||||
return client.indices().exists(request, RequestOptions.DEFAULT);
|
||||
try {
|
||||
return client.indices().exists(request, RequestOptions.DEFAULT);
|
||||
} catch (IOException e) {
|
||||
throw ExceptionUtils.eee("existsIndex exception", indexName, e);
|
||||
}
|
||||
}
|
||||
|
||||
public static boolean createIndex(RestHighLevelClient client, IndexParam indexParam) {
|
||||
|
||||
public static boolean createIndex(RestHighLevelClient client, CreateIndexParam indexParam) {
|
||||
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexParam.getIndexName());
|
||||
|
||||
// 分片个副本信息
|
||||
@ -60,45 +84,106 @@ public class IndexUtils {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 初始化索引mapping
|
||||
*
|
||||
* @param indexParamList 索引参数列表
|
||||
* @return 索引mapping
|
||||
*/
|
||||
private static Map<String, Object> initMapping(List<EsIndexParam> indexParamList) {
|
||||
Map<String, Object> mapping = new HashMap<>(1);
|
||||
Map<String, Object> properties = new HashMap<>(indexParamList.size());
|
||||
GlobalConfig.DbConfig dbConfig = Optional.ofNullable(GlobalConfigCache.getGlobalConfig())
|
||||
.map(GlobalConfig::getDbConfig)
|
||||
.orElse(new GlobalConfig.DbConfig());
|
||||
|
||||
indexParamList.forEach(indexParam -> {
|
||||
Map<String, Object> info = new HashMap<>();
|
||||
info.put(BaseEsConstants.TYPE, indexParam.getFieldType());
|
||||
// 设置分词器
|
||||
if (FieldType.TEXT.getType().equals(indexParam.getFieldType())) {
|
||||
Optional.ofNullable(indexParam.getAnalyzer())
|
||||
.ifPresent(analyzer ->
|
||||
info.put(BaseEsConstants.ANALYZER, indexParam.getAnalyzer().toString().toLowerCase()));
|
||||
Optional.ofNullable(indexParam.getSearchAnalyzer())
|
||||
.ifPresent(searchAnalyzer ->
|
||||
info.put(BaseEsConstants.SEARCH_ANALYZER, indexParam.getSearchAnalyzer().toString().toLowerCase()));
|
||||
}
|
||||
|
||||
// 驼峰处理
|
||||
String fieldName = indexParam.getFieldName();
|
||||
if (dbConfig.isMapUnderscoreToCamelCase()) {
|
||||
fieldName = StringUtils.camelToUnderline(fieldName);
|
||||
}
|
||||
properties.put(fieldName, info);
|
||||
});
|
||||
|
||||
mapping.put(BaseEsConstants.PROPERTIES, properties);
|
||||
return mapping;
|
||||
public static boolean createEmptyIndex(RestHighLevelClient client, String indexName) {
|
||||
CreateIndexRequest request = new CreateIndexRequest(indexName);
|
||||
CreateIndexResponse createIndexResponse;
|
||||
try {
|
||||
createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
|
||||
} catch (IOException e) {
|
||||
System.out.println("already created");
|
||||
return Boolean.TRUE;
|
||||
}
|
||||
return createIndexResponse.isAcknowledged();
|
||||
}
|
||||
|
||||
public static EsIndexInfo getIndex(RestHighLevelClient client, String indexName) {
|
||||
GetIndexRequest request = new GetIndexRequest(indexName);
|
||||
GetIndexResponse getIndexResponse;
|
||||
try {
|
||||
getIndexResponse = client.indices().get(request, RequestOptions.DEFAULT);
|
||||
} catch (IOException e) {
|
||||
throw ExceptionUtils.eee("getIndex exception", indexName, e);
|
||||
}
|
||||
return parseGetIndexResponse(getIndexResponse, indexName);
|
||||
}
|
||||
|
||||
public static void addDefaultAlias(RestHighLevelClient client, String indexName) {
|
||||
IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest();
|
||||
IndicesAliasesRequest.AliasActions aliasActions =
|
||||
new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD);
|
||||
aliasActions.index(indexName);
|
||||
aliasActions.alias(DEFAULT_ALIAS);
|
||||
indicesAliasesRequest.addAliasAction(aliasActions);
|
||||
try {
|
||||
client.indices().updateAliases(indicesAliasesRequest, RequestOptions.DEFAULT);
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static boolean reindex(RestHighLevelClient client, String oldIndexName, String releaseIndexName) {
|
||||
ReindexRequest reindexRequest = new ReindexRequest();
|
||||
reindexRequest.setSourceIndices(oldIndexName);
|
||||
reindexRequest.setDestIndex(releaseIndexName);
|
||||
reindexRequest.setDestOpType(DEFAULT_DEST_OP_TYPE);
|
||||
reindexRequest.setConflicts(DEFAULT_CONFLICTS);
|
||||
reindexRequest.setRefresh(Boolean.TRUE);
|
||||
try {
|
||||
BulkByScrollResponse response = client.reindex(reindexRequest, RequestOptions.DEFAULT);
|
||||
List<BulkItemResponse.Failure> bulkFailures = response.getBulkFailures();
|
||||
if (CollectionUtils.isEmpty(bulkFailures)) {
|
||||
return Boolean.TRUE;
|
||||
}
|
||||
return Boolean.FALSE;
|
||||
} catch (IOException e) {
|
||||
throw ExceptionUtils.eee("reindex exception", oldIndexName, releaseIndexName, e);
|
||||
}
|
||||
}
|
||||
|
||||
public static EsIndexInfo parseGetIndexResponse(GetIndexResponse getIndexResponse, String indexName) {
|
||||
EsIndexInfo esIndexInfo = new EsIndexInfo();
|
||||
|
||||
// 设置是否已存在默认别名
|
||||
esIndexInfo.setHasDefaultAlias(Boolean.FALSE);
|
||||
Optional.ofNullable(getIndexResponse.getAliases())
|
||||
.flatMap(aliases -> Optional.ofNullable(aliases.get(indexName)))
|
||||
.ifPresent(aliasMetadataList ->
|
||||
aliasMetadataList.forEach(aliasMetadata -> {
|
||||
if (DEFAULT_ALIAS.equals(aliasMetadata.alias())) {
|
||||
esIndexInfo.setHasDefaultAlias(Boolean.TRUE);
|
||||
}
|
||||
}));
|
||||
|
||||
// 设置分片及副本数
|
||||
Optional.ofNullable(getIndexResponse.getSettings())
|
||||
.flatMap(settingsMap -> Optional.ofNullable(settingsMap.get(indexName)))
|
||||
.ifPresent(p -> {
|
||||
String shardsNumStr = p.get(SHARDS_NUM_KEY);
|
||||
Optional.ofNullable(shardsNumStr)
|
||||
.ifPresent(s -> esIndexInfo.setShardsNum(Integer.parseInt(s)));
|
||||
String replicasNumStr = p.get(REPLICAS_NUM_KEY);
|
||||
Optional.ofNullable(replicasNumStr)
|
||||
.ifPresent(r -> esIndexInfo.setReplicasNum(Integer.parseInt(r)));
|
||||
});
|
||||
|
||||
// 设置mapping信息
|
||||
Optional.ofNullable(getIndexResponse.getMappings())
|
||||
.flatMap(stringMappingMetadataMap -> Optional.ofNullable(stringMappingMetadataMap.get(indexName))
|
||||
.flatMap(mappingMetadata -> Optional.ofNullable(mappingMetadata.getSourceAsMap())))
|
||||
.ifPresent(esIndexInfo::setMapping);
|
||||
|
||||
return esIndexInfo;
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据注解/字段类型名称获取在es中的索引类型
|
||||
*
|
||||
* @param fieldType 注解中指定的es索引类型
|
||||
* @param typeName 字段类型
|
||||
* @return 推断出的es中的索引类型
|
||||
*/
|
||||
public static String getEsFieldType(FieldType fieldType, String typeName) {
|
||||
if (Objects.nonNull(fieldType) && !FieldType.NONE.equals(fieldType)) {
|
||||
// 如果用户有自定义字段类型,则使用该类型
|
||||
@ -146,4 +231,175 @@ public class IndexUtils {
|
||||
}
|
||||
return type;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 初始化索引mapping
|
||||
*
|
||||
* @param indexParamList 索引参数列表
|
||||
* @return 索引mapping
|
||||
*/
|
||||
public static Map<String, Object> initMapping(List<EsIndexParam> indexParamList) {
|
||||
Map<String, Object> mapping = new HashMap<>(1);
|
||||
Map<String, Object> properties = new HashMap<>(indexParamList.size());
|
||||
GlobalConfig.DbConfig dbConfig = Optional.ofNullable(GlobalConfigCache.getGlobalConfig())
|
||||
.map(GlobalConfig::getDbConfig)
|
||||
.orElse(new GlobalConfig.DbConfig());
|
||||
|
||||
indexParamList.forEach(indexParam -> {
|
||||
Map<String, Object> info = new HashMap<>();
|
||||
info.put(BaseEsConstants.TYPE, indexParam.getFieldType());
|
||||
// 设置分词器
|
||||
if (FieldType.TEXT.getType().equals(indexParam.getFieldType())) {
|
||||
Optional.ofNullable(indexParam.getAnalyzer())
|
||||
.ifPresent(analyzer ->
|
||||
info.put(BaseEsConstants.ANALYZER, indexParam.getAnalyzer().toString().toLowerCase()));
|
||||
Optional.ofNullable(indexParam.getSearchAnalyzer())
|
||||
.ifPresent(searchAnalyzer ->
|
||||
info.put(BaseEsConstants.SEARCH_ANALYZER, indexParam.getSearchAnalyzer().toString().toLowerCase()));
|
||||
}
|
||||
|
||||
// 驼峰处理
|
||||
String fieldName = indexParam.getFieldName();
|
||||
if (dbConfig.isMapUnderscoreToCamelCase()) {
|
||||
fieldName = StringUtils.camelToUnderline(fieldName);
|
||||
}
|
||||
properties.put(fieldName, info);
|
||||
});
|
||||
|
||||
mapping.put(BaseEsConstants.PROPERTIES, properties);
|
||||
return mapping;
|
||||
}
|
||||
|
||||
public static boolean changeAliasAtomic(RestHighLevelClient client, String oldIndexName, String releaseIndexName) {
|
||||
IndicesAliasesRequest.AliasActions addIndexAction = new IndicesAliasesRequest.AliasActions(
|
||||
IndicesAliasesRequest.AliasActions.Type.ADD).index(releaseIndexName).alias(DEFAULT_ALIAS);
|
||||
IndicesAliasesRequest.AliasActions removeAction = new IndicesAliasesRequest.AliasActions(
|
||||
IndicesAliasesRequest.AliasActions.Type.REMOVE).index(oldIndexName).alias(DEFAULT_ALIAS);
|
||||
|
||||
IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest();
|
||||
indicesAliasesRequest.addAliasAction(addIndexAction);
|
||||
indicesAliasesRequest.addAliasAction(removeAction);
|
||||
try {
|
||||
AcknowledgedResponse acknowledgedResponse = client.indices().updateAliases(indicesAliasesRequest,
|
||||
RequestOptions.DEFAULT);
|
||||
return acknowledgedResponse.isAcknowledged();
|
||||
} catch (IOException e) {
|
||||
throw ExceptionUtils.eee("changeAlias exception", oldIndexName, releaseIndexName, e);
|
||||
}
|
||||
}
|
||||
|
||||
public static boolean deleteIndex(RestHighLevelClient client, String indexName) {
|
||||
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
|
||||
deleteIndexRequest.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
|
||||
try {
|
||||
AcknowledgedResponse acknowledgedResponse = client.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
|
||||
return acknowledgedResponse.isAcknowledged();
|
||||
} catch (IOException e) {
|
||||
throw ExceptionUtils.eee("deleteIndex exception", indexName, e);
|
||||
}
|
||||
}
|
||||
|
||||
public static CreateIndexParam getCreateIndexParam(EntityInfo entityInfo) {
|
||||
// 初始化字段信息参数
|
||||
List<EntityFieldInfo> fieldList = entityInfo.getFieldList();
|
||||
List<EsIndexParam> esIndexParamList = initIndexParam(fieldList);
|
||||
|
||||
// 设置创建参数
|
||||
CreateIndexParam createIndexParam = new CreateIndexParam();
|
||||
createIndexParam.setEsIndexParamList(esIndexParamList);
|
||||
createIndexParam.setAliasName(entityInfo.getAliasName());
|
||||
createIndexParam.setShardsNum(entityInfo.getShardsNum());
|
||||
createIndexParam.setReplicasNum(entityInfo.getReplicasNum());
|
||||
createIndexParam.setIndexName(entityInfo.getIndexName());
|
||||
|
||||
// 如果有设置新索引名称,则用新索引名覆盖原索引名进行创建
|
||||
Optional.ofNullable(entityInfo.getReleaseIndexName()).ifPresent(createIndexParam::setIndexName);
|
||||
return createIndexParam;
|
||||
}
|
||||
|
||||
public static List<EsIndexParam> initIndexParam(List<EntityFieldInfo> fieldList) {
|
||||
List<EsIndexParam> esIndexParamList = new ArrayList<>();
|
||||
if (CollectionUtils.isNotEmpty(fieldList)) {
|
||||
fieldList.forEach(field -> {
|
||||
EsIndexParam esIndexParam = new EsIndexParam();
|
||||
String esFieldType = IndexUtils.getEsFieldType(field.getFieldType(), field.getColumnType());
|
||||
esIndexParam.setFieldType(esFieldType);
|
||||
esIndexParam.setFieldName(field.getMappingColumn());
|
||||
if (!Analyzer.NONE.equals(field.getAnalyzer())) {
|
||||
esIndexParam.setAnalyzer(field.getAnalyzer());
|
||||
}
|
||||
if (!Analyzer.NONE.equals(field.getSearchAnalyzer())) {
|
||||
esIndexParam.setSearchAnalyzer(field.getSearchAnalyzer());
|
||||
}
|
||||
esIndexParamList.add(esIndexParam);
|
||||
});
|
||||
}
|
||||
return esIndexParamList;
|
||||
}
|
||||
|
||||
public static boolean isIndexNeedChange(EsIndexInfo esIndexInfo, EntityInfo entityInfo) {
|
||||
if (!entityInfo.getShardsNum().equals(esIndexInfo.getShardsNum())) {
|
||||
return Boolean.TRUE;
|
||||
}
|
||||
if (!entityInfo.getReplicasNum().equals(esIndexInfo.getReplicasNum())) {
|
||||
return Boolean.TRUE;
|
||||
}
|
||||
|
||||
// 根据当前实体类及自定义注解配置, 生成最新的Mapping信息
|
||||
List<EsIndexParam> esIndexParamList = IndexUtils.initIndexParam(entityInfo.getFieldList());
|
||||
Map<String, Object> mapping = IndexUtils.initMapping(esIndexParamList);
|
||||
|
||||
// 与查询到的已知index对比是否发生改变
|
||||
Map<String, Object> esIndexInfoMapping = Objects.isNull(esIndexInfo.getMapping())
|
||||
? new HashMap<>(0) : esIndexInfo.getMapping();
|
||||
return !mapping.equals(esIndexInfoMapping);
|
||||
}
|
||||
|
||||
public static boolean existsIndexWithRetryAndSetActiveIndex(EntityInfo entityInfo, RestHighLevelClient client) {
|
||||
boolean exists = IndexUtils.existsIndex(client, entityInfo.getIndexName());
|
||||
if (exists) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// 重试 看加了后缀的_s0 和_s1是否存在
|
||||
for (int i = 0; i <= 1; i++) {
|
||||
String retryIndexName = entityInfo.getIndexName() + S_SUFFIX + i;
|
||||
exists = IndexUtils.existsIndex(client, retryIndexName);
|
||||
if (exists) {
|
||||
entityInfo.setIndexName(retryIndexName);
|
||||
break;
|
||||
}
|
||||
}
|
||||
return exists;
|
||||
}
|
||||
|
||||
public static void supplyAsync(BiFunction<Class<?>, RestHighLevelClient, Boolean> biFunction, Class<?> entityClass, RestHighLevelClient client) {
|
||||
CompletableFuture.supplyAsync(() -> {
|
||||
GlobalConfig.DbConfig dbConfig = GlobalConfigCache.getGlobalConfig().getDbConfig();
|
||||
if (!dbConfig.isDistributed()) {
|
||||
// 非分布式项目, 直接处理
|
||||
return biFunction.apply(entityClass, client);
|
||||
}
|
||||
try {
|
||||
// 尝试获取分布式锁
|
||||
boolean lock = LockUtils.tryLock(client, entityClass.getSimpleName().toLowerCase(), LOCK_MAX_RETRY);
|
||||
if (!lock) {
|
||||
return Boolean.FALSE;
|
||||
}
|
||||
return biFunction.apply(entityClass, client);
|
||||
} finally {
|
||||
LockUtils.release(client, entityClass.getSimpleName().toLowerCase(), LOCK_MAX_RETRY);
|
||||
}
|
||||
}).whenCompleteAsync((status, throwable) -> {
|
||||
if (status) {
|
||||
LogUtils.info("===> Congratulations auto process index by Easy-Es is done !");
|
||||
} else {
|
||||
LogUtils.info("===> Unfortunately, auto process index by Easy-Es failed, please check your configuration");
|
||||
}
|
||||
Optional.ofNullable(throwable).ifPresent(Throwable::printStackTrace);
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -0,0 +1,121 @@
|
||||
package com.xpc.easyes.core.toolkit;
|
||||
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.client.RequestOptions;
|
||||
import org.elasticsearch.client.RestHighLevelClient;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static com.xpc.easyes.core.constants.BaseEsConstants.*;
|
||||
|
||||
/**
|
||||
* 基于es写的轻量级分布式锁,仅供框架内部使用,可避免引入redis/zk等其它依赖
|
||||
* <p>
|
||||
* Copyright © 2022 xpc1024 All Rights Reserved
|
||||
**/
|
||||
public class LockUtils {
|
||||
/**
|
||||
* 锁所在索引
|
||||
*/
|
||||
private final static String LOCK_INDEX = "ee-distribute-lock";
|
||||
/**
|
||||
* id字段名
|
||||
*/
|
||||
private final static String ID_FIELD = "_id";
|
||||
/**
|
||||
* 重试等待时间
|
||||
*/
|
||||
private final static Integer WAIT_SECONDS = 60;
|
||||
|
||||
public static synchronized boolean tryLock(RestHighLevelClient client, String idValue, Integer maxRetry) {
|
||||
boolean existsIndex = IndexUtils.existsIndex(client, LOCK_INDEX);
|
||||
if (!existsIndex) {
|
||||
IndexUtils.createEmptyIndex(client, LOCK_INDEX);
|
||||
}
|
||||
|
||||
if (maxRetry <= ZERO) {
|
||||
return Boolean.FALSE;
|
||||
}
|
||||
|
||||
if (getCount(client, idValue) > ZERO) {
|
||||
try {
|
||||
Thread.sleep(WAIT_SECONDS / maxRetry);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return tryLock(client, idValue, --maxRetry);
|
||||
} else {
|
||||
return createLock(client, idValue);
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean createLock(RestHighLevelClient client, String idValue) {
|
||||
IndexRequest indexRequest = new IndexRequest(LOCK_INDEX);
|
||||
indexRequest.id(idValue);
|
||||
indexRequest.source(DISTRIBUTED_LOCK_TIP_JSON, XContentType.JSON);
|
||||
IndexResponse response;
|
||||
try {
|
||||
response = client.index(indexRequest, RequestOptions.DEFAULT);
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
return Boolean.FALSE;
|
||||
}
|
||||
return response.status().equals(RestStatus.CREATED);
|
||||
}
|
||||
|
||||
public synchronized static boolean release(RestHighLevelClient client, String idValue, Integer maxRetry) {
|
||||
DeleteRequest deleteRequest = new DeleteRequest(LOCK_INDEX);
|
||||
deleteRequest.id(idValue);
|
||||
if (maxRetry <= ZERO) {
|
||||
return Boolean.FALSE;
|
||||
}
|
||||
|
||||
DeleteResponse response;
|
||||
try {
|
||||
response = client.delete(deleteRequest, RequestOptions.DEFAULT);
|
||||
System.out.println(response.status());
|
||||
} catch (IOException e) {
|
||||
return retryRelease(client, idValue, --maxRetry);
|
||||
}
|
||||
if (RestStatus.OK.equals(response.status())) {
|
||||
return Boolean.TRUE;
|
||||
} else {
|
||||
return retryRelease(client, idValue, maxRetry);
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean retryRelease(RestHighLevelClient client, String idValue, Integer maxRetry) {
|
||||
try {
|
||||
Thread.sleep(WAIT_SECONDS / maxRetry);
|
||||
} catch (InterruptedException interruptedException) {
|
||||
interruptedException.printStackTrace();
|
||||
}
|
||||
return release(client, idValue, --maxRetry);
|
||||
}
|
||||
|
||||
private static Integer getCount(RestHighLevelClient client, String idValue) {
|
||||
SearchRequest searchRequest = new SearchRequest();
|
||||
searchRequest.indices(LOCK_INDEX);
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
||||
searchSourceBuilder.query(QueryBuilders.termQuery(ID_FIELD, idValue));
|
||||
searchRequest.source(searchSourceBuilder);
|
||||
SearchResponse response;
|
||||
try {
|
||||
response = client.search(searchRequest, RequestOptions.DEFAULT);
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
return ONE;
|
||||
}
|
||||
return (int) response.getHits().getTotalHits().value;
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,16 @@
|
||||
package com.xpc.easyes.core.toolkit;
|
||||
|
||||
import java.util.logging.Logger;
|
||||
|
||||
/**
|
||||
* 日志工具类,用于需要打印日志的地方,可避免引入其它日志框架依赖
|
||||
* <p>
|
||||
* Copyright © 2022 xpc1024 All Rights Reserved
|
||||
**/
|
||||
public class LogUtils {
|
||||
private final static Logger log = Logger.getAnonymousLogger();
|
||||
|
||||
public static void info(String... params) {
|
||||
log.info(String.join(",", params));
|
||||
}
|
||||
}
|
||||
@ -6,7 +6,7 @@
|
||||
|
||||
<groupId>io.github.xpc1024</groupId>
|
||||
<artifactId>easy-es-parent</artifactId>
|
||||
<version>0.9.8</version>
|
||||
<version>0.9.9</version>
|
||||
|
||||
<name>easy-es-parent</name>
|
||||
<description>easy use for elastic search</description>
|
||||
|
||||
@ -9,7 +9,7 @@
|
||||
<parent>
|
||||
<artifactId>easy-es</artifactId>
|
||||
<groupId>io.github.xpc1024</groupId>
|
||||
<version>0.9.8</version>
|
||||
<version>0.9.9</version>
|
||||
</parent>
|
||||
|
||||
<properties>
|
||||
@ -21,7 +21,7 @@
|
||||
<dependency>
|
||||
<groupId>io.github.xpc1024</groupId>
|
||||
<artifactId>easy-es-boot-starter</artifactId>
|
||||
<version>0.9.8</version>
|
||||
<version>0.9.9</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
|
||||
@ -2,8 +2,9 @@ package com.xpc.easyes.sample.entity;
|
||||
|
||||
import com.xpc.easyes.core.anno.HighLightMappingField;
|
||||
import com.xpc.easyes.core.anno.TableField;
|
||||
import com.xpc.easyes.core.anno.TableName;
|
||||
import com.xpc.easyes.core.enums.Analyzer;
|
||||
import com.xpc.easyes.core.enums.FieldStrategy;
|
||||
import com.xpc.easyes.core.enums.FieldType;
|
||||
import lombok.Data;
|
||||
import lombok.experimental.Accessors;
|
||||
|
||||
@ -16,7 +17,6 @@ import java.time.LocalDateTime;
|
||||
**/
|
||||
@Data
|
||||
@Accessors(chain = true)
|
||||
@TableName("my_doc")
|
||||
public class Document {
|
||||
/**
|
||||
* es中的唯一id
|
||||
@ -29,6 +29,7 @@ public class Document {
|
||||
/**
|
||||
* 文档内容
|
||||
*/
|
||||
@TableField(fieldType = FieldType.TEXT, analyzer = Analyzer.IK_SMART, searchAnalyzer = Analyzer.IK_SMART)
|
||||
private String content;
|
||||
/**
|
||||
* 作者 加@TableField注解,并指明strategy = FieldStrategy.NOT_EMPTY 表示更新的时候的策略为 创建者不为空字符串时才更新
|
||||
@ -55,7 +56,7 @@ public class Document {
|
||||
/**
|
||||
* 自定义字段
|
||||
*/
|
||||
@TableField(value = "wu-la")
|
||||
@TableField(value = "u-la")
|
||||
private String customField;
|
||||
|
||||
/**
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user