Spring Batch实现数据库大数据量读写
1. data-source-context.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Shared infrastructure context: component scanning, the pooled Oracle
  DataSource, a JdbcTemplate and the JDBC transaction manager.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context-3.0.xsd
                           http://www.springframework.org/schema/tx
                           http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">

    <!-- 1) Annotation-driven bean discovery and wiring under this base package. -->
    <context:component-scan base-package="net.etongbao.vasp.ac" />

    <!-- 2) DataSource, JdbcTemplate and transaction manager. -->
    <!-- NOTE(review): the JDBC URL, user and password are hard-coded below;
         consider moving them to an external properties file. -->
    <bean id="dataSource"
          class="com.mchange.v2.c3p0.ComboPooledDataSource"
          destroy-method="close"
          abstract="false"
          scope="singleton">
        <!-- Alternative driver class name: oracle.jdbc.driver.OracleDriver -->
        <property name="driverClass" value="oracle.jdbc.OracleDriver" />
        <property name="jdbcUrl" value="jdbc:oracle:thin:@192.168.1.23:1521:orcl01" />
        <property name="user" value="USR_DEV01" />
        <property name="password" value="2AF0829C" />

        <!-- Pool sizing and timeouts (units as defined by c3p0; verify against its docs). -->
        <property name="checkoutTimeout" value="30000" />
        <property name="maxIdleTime" value="120" />
        <property name="maxPoolSize" value="100" />
        <property name="minPoolSize" value="2" />
        <property name="initialPoolSize" value="2" />
        <property name="maxStatements" value="0" />
        <property name="maxStatementsPerConnection" value="0" />
        <property name="idleConnectionTestPeriod" value="30" />
    </bean>

    <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
        <property name="dataSource" ref="dataSource" />
    </bean>

    <bean id="transactionManager"
          class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource" />
    </bean>

    <!-- Enables @Transactional processing backed by the manager above. -->
    <tx:annotation-driven transaction-manager="transactionManager" />

</beans>
2. quartz-context.xml commit-interval="10000" 表示每次批量处理(一次提交)的数据条数,数值越大效率越高;每个批次在一个事务中执行,可在此处添加事务处理,
出错时每次回滚的条数就是commit-interval的条数
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Batch and scheduling context: defines the single-step ledgerJob
  (cursor reader to ledgerWriter) and the Quartz trigger that invokes
  quartzLedgerJob.execute().
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                           http://www.springframework.org/schema/batch
                           http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context-3.0.xsd
                           http://www.springframework.org/schema/tx
                           http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">

    <import resource="data-source-context.xml" />

    <!-- Job repository: the in-memory (Map-based) implementation is used for this example. -->
    <bean id="jobRepository"
          class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
        <property name="transactionManager" ref="transactionManager" />
    </bean>

    <!-- Batch launcher -->
    <bean id="jobLauncher"
          class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
    </bean>

    <!-- Job definition: a one-step job.
         The listener beans referenced here are not declared in this file;
         presumably they are picked up by component scanning. -->
    <batch:job id="ledgerJob">
        <batch:listeners>
            <batch:listener ref="appJobExecutionListener" />
        </batch:listeners>
        <batch:step id="step1">
            <!-- Exactly one tasklet per step: a tasklet may not be nested inside another tasklet. -->
            <batch:tasklet transaction-manager="transactionManager">
                <!-- Commit once every 10000 items. The chunk element is declared
                     before the listeners, as in the reference documentation. -->
                <batch:chunk reader="ledgerReader"
                             writer="ledgerWriter"
                             commit-interval="10000" />
                <batch:listeners>
                    <batch:listener ref="itemFailureLoggerListener" />
                </batch:listeners>
            </batch:tasklet>
        </batch:step>
    </batch:job>

    <!-- Reader: cursor-based read of the ledger table, mapped row by row. -->
    <bean id="ledgerReader"
          class="org.springframework.batch.item.database.JdbcCursorItemReader">
        <property name="dataSource" ref="dataSource" />
        <property name="sql" value="select * from ledger" />
        <property name="rowMapper" ref="ledgerRowMapper" />
    </bean>

    <!-- A job instance that has completed successfully may not be executed again
         (whether a failed instance may be re-run is controlled by the job's
         restartable attribute, which defaults to true). As a workaround, a
         JobParametersBuilder is provided so that the current time can be added
         as a parameter, which yields a different job instance even when all
         other parameters are equal. -->
    <bean id="jobParameterBulider" class="org.springframework.batch.core.JobParametersBuilder" />

    <!-- Scheduled task: start -->
    <bean id="ledgerJobDetail"
          class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
        <!-- Bean whose method is invoked on schedule -->
        <property name="targetObject">
            <ref bean="quartzLedgerJob" />
        </property>
        <!-- Method invoked on schedule -->
        <property name="targetMethod">
            <value>execute</value>
        </property>
    </bean>

    <bean id="ledgerCronTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">
        <!-- The jobDetail property requires a JobDetail object, so the task bean
             cannot be referenced directly; MethodInvokingJobDetailFactoryBean
             performs the adaptation. -->
        <property name="jobDetail">
            <ref bean="ledgerJobDetail" />
        </property>
        <!-- Fires every day at 10:45:00.
             Alternative, every minute from 15:00 through 15:59:
             <value>0 * 15 * * ?</value> -->
        <property name="cronExpression">
            <value>0 45 10 * * ? *</value>
        </property>
    </bean>

    <!-- Scheduler factory: every trigger listed here is registered with the scheduler. -->
    <bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
        <property name="triggers">
            <list>
                <!-- Additional triggers can be added to this list. -->
                <ref local="ledgerCronTrigger" />
            </list>
        </property>
    </bean>
    <!-- Scheduled task: end -->

</beans>
3.定时调度job类 QuartzLedgerJob.java
package net.etongbao.vasp.ac.quartz;
import java.util.Date; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.util.StopWatch; /** * 定时调度类 * @author Fu Wei * */ @Component("quartzLedgerJob") public class QuartzLedgerJob { private static final Logger LOG = LoggerFactory.getLogger(QuartzLedgerJob.class); @Autowired private JobLauncher jobLauncher; @Autowired private Job ledgerJob; @Autowired JobParametersBuilder jobParameterBulider; private static long counter = 0l; /** * 执行业务方法 * @throws Exception */ public void execute() throws Exception { LOG.debug("start..."); StopWatch sw = new StopWatch(); sw.start(); /* * Spring Batch Job同一个job instance,成功执行后是不允许重新执行的【失败后是否允许重跑, * 可通过配置Job的restartable参数来控制,默认是true】,如果需要重新执行,可以变通处理, * 添加一个JobParameters构建类,以当前时间作为参数,保证其他参数相同的情况下却是不同的job instance */ jobParameterBulider.addDate("date", new Date()); jobLauncher.run(ledgerJob, jobParameterBulider.toJobParameters()); sw.stop(); LOG.debug("Time elapsed:{},Execute quartz ledgerJob:{}", sw.prettyPrint(), ++counter); } }
4.程序启动类 StartQuartz.java
package net.etongbao.vasp.ac.quartz;
import java.io.FileNotFoundException;

import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Program entry point: loads the Spring context that declares the batch job
 * and the Quartz scheduler beans.
 *
 * @author Fu Wei
 */
public class StartQuartz {

    /** Classpath location of the context file to load. */
    private static final String CONTEXT_LOCATION = "/net/etongbao/vasp/ac/resources/quartz-context.xml";

    /**
     * Creates the application context; no further work is done here.
     *
     * @param args unused
     * @throws FileNotFoundException declared by the original signature
     */
    public static void main(String[] args) throws FileNotFoundException {
        new ClassPathXmlApplicationContext(CONTEXT_LOCATION);
    }
}
5.pojo类 Ledger.java
package net.etongbao.vasp.ac.pojo;

import java.io.Serializable;
import java.util.Date;

/**
 * Plain data holder for one ledger row. Every property has a getter and a
 * setter; no validation or copying is performed.
 */
public class Ledger implements Serializable {

    private int id;
    private Date receiptDate;
    private String memberName;
    private String checkNumber;
    private Date checkDate;
    private String paymentType;
    private double depositAmount;
    private double paymentAmount;
    private String comments;

    /** Creates a ledger with all fields at their Java default values. */
    public Ledger() {
    }

    /** Creates a ledger with every field assigned from the matching argument. */
    public Ledger(int id, Date receiptDate, String memberName,
            String checkNumber, Date checkDate, String paymentType,
            double depositAmount, double paymentAmount, String comments) {
        this.id = id;
        this.receiptDate = receiptDate;
        this.memberName = memberName;
        this.checkNumber = checkNumber;
        this.checkDate = checkDate;
        this.paymentType = paymentType;
        this.depositAmount = depositAmount;
        this.paymentAmount = paymentAmount;
        this.comments = comments;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public Date getReceiptDate() {
        return receiptDate;
    }

    public void setReceiptDate(Date receiptDate) {
        this.receiptDate = receiptDate;
    }

    public String getMemberName() {
        return memberName;
    }

    public void setMemberName(String memberName) {
        this.memberName = memberName;
    }

    public String getCheckNumber() {
        return checkNumber;
    }

    public void setCheckNumber(String checkNumber) {
        this.checkNumber = checkNumber;
    }

    public Date getCheckDate() {
        return checkDate;
    }

    public void setCheckDate(Date checkDate) {
        this.checkDate = checkDate;
    }

    public String getPaymentType() {
        return paymentType;
    }

    public void setPaymentType(String paymentType) {
        this.paymentType = paymentType;
    }

    public double getDepositAmount() {
        return depositAmount;
    }

    public void setDepositAmount(double depositAmount) {
        this.depositAmount = depositAmount;
    }

    public double getPaymentAmount() {
        return paymentAmount;
    }

    public void setPaymentAmount(double paymentAmount) {
        this.paymentAmount = paymentAmount;
    }

    public String getComments() {
        return comments;
    }

    public void setComments(String comments) {
        this.comments = comments;
    }
}
6. LedgerDaoImpl.java
package net.etongbao.vasp.ac.dao.impl;
import java.sql.PreparedStatement; import java.sql.SQLException; import net.etongbao.vasp.ac.dao.LedgerDao; import net.etongbao.vasp.ac.pojo.Ledger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.PreparedStatementSetter; import org.springframework.stereotype.Repository; /** * ledger数据操作类 * * @author Fu Wei * */ @Repository public class LedgerDaoImpl implements LedgerDao { private static final String SAVE_SQL = "insert into ledger_temp (rcv_dt, mbr_nm, chk_nbr, chk_dt, pymt_typ, dpst_amt, pymt_amt, comments) values(?,?,?,?,?,?,?,?)"; @Autowired private JdbcTemplate jdbcTemplate; @Override public void save(final Ledger item) { jdbcTemplate.update(SAVE_SQL, new PreparedStatementSetter() { public void setValues(PreparedStatement stmt) throws SQLException { stmt.setDate(1, new java.sql.Date(item.getReceiptDate().getTime())); stmt.setString(2, item.getMemberName()); stmt.setString(3, item.getCheckNumber()); stmt.setDate(4, new java.sql.Date(item.getCheckDate().getTime())); stmt.setString(5, item.getPaymentType()); stmt.setDouble(6, item.getDepositAmount()); stmt.setDouble(7, item.getPaymentAmount()); stmt.setString(8, item.getComments()); } }); } }
7.接口 LedgerDao.java
package net.etongbao.vasp.ac.dao;

import net.etongbao.vasp.ac.pojo.Ledger;

/**
 * Persistence contract for {@link Ledger} objects.
 */
public interface LedgerDao {

    /**
     * Stores a single ledger.
     *
     * @param item the ledger to store
     */
    void save(Ledger item);
}
8. ledgerReader(JdbcCursorItemReader)需要的行映射类 LedgerRowMapper.java
package net.etongbao.vasp.ac.batch.writer;
import java.sql.ResultSet;
import java.sql.SQLException;

import net.etongbao.vasp.ac.pojo.Ledger;

import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Component;

/**
 * Maps one result-set row to a {@link Ledger}.
 *
 * @author Administrator
 */
@Component("ledgerRowMapper")
public class LedgerRowMapper implements RowMapper<Ledger> {

    /**
     * Builds a {@link Ledger} from the current row by reading each column by name.
     *
     * @param rs     result set positioned on the row to map
     * @param rowNum index of the current row (unused)
     * @return a new ledger populated from the row
     * @throws SQLException if a column cannot be read
     */
    public Ledger mapRow(ResultSet rs, int rowNum) throws SQLException {
        Ledger ledger = new Ledger();
        ledger.setId(rs.getInt("id"));
        ledger.setReceiptDate(rs.getDate("rcv_dt"));
        ledger.setMemberName(rs.getString("mbr_nm"));
        ledger.setCheckNumber(rs.getString("chk_nbr"));
        ledger.setCheckDate(rs.getDate("chk_dt"));
        ledger.setPaymentType(rs.getString("pymt_typ"));
        ledger.setDepositAmount(rs.getDouble("dpst_amt"));
        ledger.setPaymentAmount(rs.getDouble("pymt_amt"));
        ledger.setComments(rs.getString("comments"));
        return ledger;
    }
}
9.关键类LedgerWriter.java ,写入数据,负责数据的添加
package net.etongbao.vasp.ac.batch.writer;

import java.util.List;

import net.etongbao.vasp.ac.dao.LedgerDao;
import net.etongbao.vasp.ac.pojo.Ledger;

import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Item writer that hands every ledger it receives to the {@link LedgerDao}.
 *
 * @author Fu Wei
 */
@Component("ledgerWriter")
public class LedgerWriter implements ItemWriter<Ledger> {

    @Autowired
    private LedgerDao ledgerDao;

    /**
     * Saves the given ledgers one at a time, in list order.
     *
     * @param ledgers the items to write
     * @throws Exception declared by the writer contract
     */
    public void write(List<? extends Ledger> ledgers) throws Exception {
        for (final Ledger entry : ledgers) {
            persist(entry);
        }
    }

    /** Delegates a single item to the DAO. */
    private void persist(Ledger entry) {
        ledgerDao.save(entry);
    }
}
classPath:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Eclipse build path: source folder, JRE container, library jars, JUnit container, output folder. -->
<classpath>
    <classpathentry kind="src" path="src"/>
    <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/jrockit-jdk1.6.0_24-R28.1.3-4.0.1"/>

    <!-- General-purpose, pooling, logging and JDBC driver libraries -->
    <classpathentry kind="lib" path="lib/aopalliance-1.0.jar"/>
    <classpathentry kind="lib" path="lib/c3p0-0.9.1.2.jar"/>
    <classpathentry kind="lib" path="lib/commons-collections-3.2.1.jar"/>
    <classpathentry kind="lib" path="lib/commons-lang-2.3.jar"/>
    <classpathentry kind="lib" path="lib/commons-logging-1.1.1.jar"/>
    <classpathentry kind="lib" path="lib/etb-log4j-1.2.16.jar"/>
    <classpathentry kind="lib" path="lib/etb-slf4j-api-1.5.8.jar"/>
    <classpathentry kind="lib" path="lib/etb-slf4j-log4j12-1.5.8.jar"/>
    <classpathentry kind="lib" path="lib/ojdbc6.jar"/>

    <!-- Spring Framework 3.0.5 -->
    <classpathentry kind="lib" path="lib/org.springframework.aop-3.0.5.RELEASE.jar"/>
    <classpathentry kind="lib" path="lib/org.springframework.asm-3.0.5.RELEASE.jar"/>
    <classpathentry kind="lib" path="lib/org.springframework.aspects-3.0.5.RELEASE.jar"/>
    <classpathentry kind="lib" path="lib/org.springframework.beans-3.0.5.RELEASE.jar"/>
    <classpathentry kind="lib" path="lib/org.springframework.context-3.0.5.RELEASE.jar"/>
    <classpathentry kind="lib" path="lib/org.springframework.context.support-3.0.5.RELEASE.jar"/>
    <classpathentry kind="lib" path="lib/org.springframework.core-3.0.5.RELEASE.jar"/>
    <classpathentry kind="lib" path="lib/org.springframework.expression-3.0.5.RELEASE.jar"/>
    <classpathentry kind="lib" path="lib/org.springframework.instrument-3.0.5.RELEASE.jar"/>
    <classpathentry kind="lib" path="lib/org.springframework.instrument.tomcat-3.0.5.RELEASE.jar"/>
    <classpathentry kind="lib" path="lib/org.springframework.jdbc-3.0.5.RELEASE.jar"/>
    <classpathentry kind="lib" path="lib/org.springframework.jms-3.0.5.RELEASE.jar"/>
    <classpathentry kind="lib" path="lib/org.springframework.orm-3.0.5.RELEASE.jar"/>
    <classpathentry kind="lib" path="lib/org.springframework.oxm-3.0.5.RELEASE.jar"/>
    <classpathentry kind="lib" path="lib/org.springframework.test-3.0.5.RELEASE.jar"/>
    <classpathentry kind="lib" path="lib/org.springframework.transaction-3.0.5.RELEASE.jar"/>

    <!-- Quartz and Spring Batch 2.1.6 -->
    <classpathentry kind="lib" path="lib/quartz-all-1.6.5.jar"/>
    <classpathentry kind="lib" path="lib/spring-batch-core-2.1.6.RELEASE.jar"/>
    <classpathentry kind="lib" path="lib/spring-batch-infrastructure-2.1.6.RELEASE.jar"/>
    <classpathentry kind="lib" path="lib/spring-batch-test-2.1.6.RELEASE.jar"/>

    <classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/>
    <classpathentry kind="output" path="bin"/>
</classpath>
总结: 测试数据8万多条,响应时间3分多钟。
关键在于quartz-context.xml 中<bean id="ledgerReader"
class="org.springframework.batch.item.database.JdbcCursorItemReader">
<property name="dataSource" ref="dataSource" />
<property name="sql" value="select * from ledger" />
<property name="rowMapper" ref="ledgerRowMapper" />
</bean> 负责读取数据:在程序执行时只执行一次查询,通过数据库游标逐行读取全部数据(并不是一次性把所有数据加载到内存),再按commit-interval批量地交给LedgerWriter进行写操作。当然也可以使用分页读取JdbcPagingItemReader,但分页数量与写入数量(commit-interval)要大小相同,还可以对分页读出来的数据添加悲观锁
LedgerWriter.java 负责写入数据,每次写入的条数等于commit-interval(本例配置为10000条)。