增加SpringBatch的支持的测试demo

This commit is contained in:
zhj149 2025-04-28 15:13:52 +08:00
parent ea03a5f3df
commit 336e938471
6 changed files with 370 additions and 0 deletions

View File

@ -69,6 +69,12 @@
<artifactId>mysql-connector-j</artifactId>
</dependency>
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-ui</artifactId>
<version>1.8.0</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.h2database</groupId>-->

View File

@ -0,0 +1,50 @@
package com.mybatisflex.test.batch;
import com.mybatisflex.spring.FlexTransactionManager;
import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
/**
* 配置batch
*/
@Configuration
@EnableBatchProcessing
public class BatchConfiguration extends DefaultBatchConfigurer {
/**
* springbatch默认配置;使用内存库
*
* @return
*/
@Override
protected JobRepository createJobRepository() throws Exception {
MapJobRepositoryFactoryBean factory = new MapJobRepositoryFactoryBean();
return factory.getObject();
}
@Override
protected JobLauncher createJobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor()); //转换为异步任务
jobLauncher.setJobRepository(this.getJobRepository());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
/**
* @return
*/
@Override
public PlatformTransactionManager getTransactionManager() {
FlexTransactionManager mybatisTransactionManager = new FlexTransactionManager();
return mybatisTransactionManager;
}
}

View File

@ -0,0 +1,180 @@
package com.mybatisflex.test.batch;
import com.mybatisflex.core.query.QueryWrapper;
import com.mybatisflex.spring.batch.MybatisFlexBatchItemWriter;
import com.mybatisflex.spring.batch.MybatisFlexPagingItemReader;
import com.mybatisflex.spring.batch.builder.MyBatisFlexBatchItemWriterBuilder;
import com.mybatisflex.spring.batch.builder.MyBatisFlexPagingItemReaderBuilder;
import com.mybatisflex.test.mapper.MyAccountMapper;
import com.mybatisflex.test.model.Account;
import com.mybatisflex.test.model.table.AccountTableDef;
import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
/**
* 生成batch demo
*/
@Configuration
public class BatchJobConfiguration implements StepExecutionListener, JobExecutionListener {
/**
* 帐户信息
*/
@Autowired
@Lazy
private MyAccountMapper accountMapper;
/**
* 帐户导入信息
*
* @param jobBuilders
* @return
*/
@Bean(name = "testImportJob")
public Job testImportJob(JobBuilderFactory jobBuilders,
@Qualifier("accountStep") Step accountStep) {
return jobBuilders.get("testImportJob")
.start(accountStep).listener(this)
.build();
}
/**
* 帐户执行阶段
*
* @param stepBuilders
* @return
*/
@Bean
public Step accountStep(StepBuilderFactory stepBuilders,
@Qualifier("accountReader") MybatisFlexPagingItemReader accountReader,
@Qualifier("accountProcessor") ItemProcessor accountProcessor,
@Qualifier("accountWriter") MybatisFlexBatchItemWriter<Account> accountWriter) {
TaskletStep step = stepBuilders.get("创建帐户")
.<Account, Account>chunk(10)
.reader(accountReader)
.processor(accountProcessor)
.writer(accountWriter)
.build();
step.registerStepExecutionListener(this);
return step;
}
/**
* 帐户读取
* @return
*/
@Bean
@StepScope
public MybatisFlexPagingItemReader accountReader() {
QueryWrapper query = QueryWrapper.create();
query.select(AccountTableDef.ACCOUNT.ALL_COLUMNS)
.from(AccountTableDef.ACCOUNT);
MyBatisFlexPagingItemReaderBuilder builder = new MyBatisFlexPagingItemReaderBuilder();
MybatisFlexPagingItemReader reader = builder.mapper(accountMapper)
.pageSize(10)
.queryWrapper(query)
.build();
return reader;
}
/**
* 数据转换
* @return
*/
@Bean
@StepScope
public ItemProcessor<Account,Account> accountProcessor() {
ItemProcessor<Account,Account> processor = new ItemProcessor<Account, Account>() {
@Override
public Account process(Account account) throws Exception {
Account entity = new Account();
BeanUtils.copyProperties(account, entity);
entity.setUserName(account.getUserName() + "_1");
entity.setId(null);
return entity;
}
};
return processor;
}
/**
* 帐户写入
*
* @return
*/
@Bean
@StepScope
public MybatisFlexBatchItemWriter<Account> accountWriter() {
MybatisFlexBatchItemWriter writer = new MyBatisFlexBatchItemWriterBuilder()
.mapper(accountMapper)
.build();
return writer;
}
//============以下是事件监听==============
/**
* Initialize the state of the listener with the {@link StepExecution} from
* the current scope.
*
* @param stepExecution instance of {@link StepExecution}.
*/
@Override
public void beforeStep(StepExecution stepExecution) {
}
/**
* Give a listener a chance to modify the exit status from a step. The value
* returned will be combined with the normal exit status using
* {@link ExitStatus#and(ExitStatus)}.
* <p>
* Called after execution of step's processing logic (both successful or
* failed). Throwing exception in this method has no effect, it will only be
* logged.
*
* @param stepExecution {@link StepExecution} instance.
* @return an {@link ExitStatus} to combine with the normal value. Return
* {@code null} to leave the old value unchanged.
*/
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
System.out.println(stepExecution.getStepName() + " 共计导入:" + stepExecution.getWriteCount() + "行数据");
return stepExecution.getExitStatus();
}
/**
* Callback before a job executes.
*
* @param jobExecution the current {@link JobExecution}
*/
@Override
public void beforeJob(JobExecution jobExecution) {
}
/**
* Callback after completion of a job. Called after both both successful and
* failed executions. To perform logic on a particular status, use
* "if (jobExecution.getStatus() == BatchStatus.X)".
*
* @param jobExecution the current {@link JobExecution}
*/
@Override
public void afterJob(JobExecution jobExecution) {
}
}

View File

@ -0,0 +1,37 @@
package com.mybatisflex.test.config;
import io.swagger.v3.oas.models.OpenAPI;
import io.swagger.v3.oas.models.info.Info;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* swagger配置
*
* @author sam
*/
@Configuration
public class OpenApiConfig {
/**
* 本系统端口
*/
@Value("${spring-boot.version:2.7.11}")
private String version;
/**
* swagger2 http://localhost:8080/swagger-ui.html
* swagger3 http://localhost:8080/swagger-ui/index.html
*
* @return
*/
@Bean
public OpenAPI myOpenAPI() {
return new OpenAPI()
.info(new Info()
.title("mybatis-flex测试")
.description("mybatis-flex")
.version(version));
}
}

View File

@ -0,0 +1,77 @@
package com.mybatisflex.test.controller;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Lazy;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
/**
* 测试SpringBatch功能的controller
*/
@RestController
@CrossOrigin
@RequestMapping("job")
@Tag(name = "SpringBatch测试", description = "SpringBatch测试")
public class BatchController {
/**
* job执行器
*/
@Autowired
@Lazy
private JobLauncher jobLauncher;
/**
* springbatch job
*/
@Autowired
@Lazy
@Qualifier("testImportJob")
private Job testImportJob;
/**
* springbatch job 测试
*
* @return
*/
@GetMapping("testImportJob")
public Map<String, Object> testImportJob() {
try {
JobParametersBuilder parameters = new JobParametersBuilder();
//为了防止任务无法重复创建的bug
parameters.addDate("createDate", new Date());
JobParameters jobParameters = parameters.toJobParameters();
JobExecution run1 = jobLauncher.run(testImportJob, jobParameters);
run1.setStartTime(new Date());
while (run1.isRunning()) {
Thread.sleep(500);
}
Map<String, Object> map = new HashMap<>();
map.put("code", 200);
map.put("success", true);
map.put("message", "job completed");
return map;
} catch (Exception ex) {
Map<String, Object> map = new HashMap<>();
map.put("code", 500);
map.put("success", false);
map.put("message", ex.getMessage());
return map;
}
}
}

View File

@ -36,6 +36,26 @@
# url: jdbc:mysql://localhost:3306/flex_test
# username: root
# password: 12345678
#禁止项目启动自动运行job
spring:
batch:
job:
enabled: false
#springdoc-openapi项目配置
springdoc:
swagger-ui:
path: /swagger-ui.html
tags-sorter: alpha
operations-sorter: alpha
api-docs:
path: /v3/api-docs
group-configs:
- group: 'default'
paths-to-match: '/**'
packages-to-scan: com.mybatisflex.test.controller
mybatis-flex:
datasource:
ds1: