java多线程体味3-实际项目应用
java多线程体验3-实际项目应用
多线程发送短信的实例:
1、生产者,获取待处理数据的类:
2、消费者,发送短信的线程类:
3、短信发送主线程类:
4、Spring的配置:
<!--
  Message sending service. The two int constructor arguments are the buffer size and the
  scan interval; the nested list wires the message channels the service can send through.
-->
<bean id="messageSendService" class="com.csair.cbd.sms.services.impl.MessageSendServiceImpl">
    <constructor-arg type="int" value="100" /><!-- buffer size -->
    <constructor-arg type="int" value="6000" /><!-- scan interval (presumably milliseconds; confirm against the constructor) -->
    <property name="messageChannels">
        <list>
            <bean id="smsChannel" class="com.csair.cbd.sms.services.impl.SMSChannel">
                <property name="url" value="http://10.101.116.12:888/SmsCenterWs/services/ISmsWS"></property> <!-- http://10.101.67.20:80/SmsCenterWs/services/ISmsWS -->
                <!-- property name="username" value="test"></property>
                <property name="password" value="test"></property-->
                <property name="smsUser">
                    <map>
                        <!-- account for the real-time SMS sending interface -->
                        <entry key="sms_user_realtime">
                            <bean class="com.csair.cbd.system.services.dto.SmsUserDto">
                                <property name="userId" value="test"/>
                                <property name="password" value="test"/>
                            </bean>
                        </entry>
                        <!-- account for the non-real-time (delayed) SMS sending interface -->
                        <entry key="sms_user2_delay">
                            <bean class="com.csair.cbd.system.services.dto.SmsUserDto"> </bean>
                        </entry>
                    </map>
                </property>
            </bean>
            <bean id="emailChannel" class="com.csair.cbd.sms.services.impl.EmailChannel">
                <!-- NOTE(review): an orphan closing property tag stood here with no matching opening tag,
                     which made the file not well-formed. The e-mail channel's own properties appear to
                     have been omitted from the listing; add them here. -->
            </bean>
        </list>
    </property>
</bean>
多线程发送短信的实例:
1、生产者,获取待处理数据的类:
private class ScanTask extends TimerTask { @Override public void run() { // if there is no capacity, cancel this scan if (messageQueue.remainingCapacity() <= 0) { return; } // 取出比剩余容量两倍的信息,为下一分钟的扫描间隙准备,put方法可以挂在queue上 List<SysMessageSending> candidate = sysMessageSendingDao.findForSending(messageQueue.remainingCapacity()*2); //System.out.println("取出" + messageQueue.remainingCapacity()*2); for (SysMessageSending msg : candidate ) { if (checkAppoint(msg)) { try { messageQueue.put(msg); } catch (InterruptedException e) { LOG.warn("putting in message Queue is interrupted."); } } else { // invalid message, delete from the message queue SysMessageSendingLog log = new SysMessageSendingLog(); BeanUtils.copyProperties(msg, log); log.setId(null); log.setOid(msg.getId()); //copy the msg's ID as orginal ID log.setSendStatus(SysMessageSendingLog.SEND_STATUS_FAILED); sysMessageSendingLogDao.insert(log); sysMessageSendingDao.delete(msg); } } if (needReception) { LOG.info("start read receptions."); // scan the log table and find the message's whose reception is null List<SysMessageSendingLog> receptWanted = sysMessageSendingLogDao.findMessageLogForReception(); for (SysMessageSendingLog log : receptWanted) { MessageChannel channel = messageChannelMap.get(log.getSendType()); if (channel != null ) { String recept = channel.getReception(log.getSid()); if (recept != null) { sysMessageSendingLogDao.updateReception(log.getSid(), recept); } } } LOG.info("end read receptions."); } } }
2、消费者,发送短信的线程类:
private class MessageSendingThread extends Thread { public MessageSendingThread() { this.setName("MessageSendService.MessageSendingThread"); } /** * write log and delete original message from queue * @param msg * @param success - true: send succefully, false: send failed and expired */ private void afterMessageSending(SysMessageSending msg, boolean success, String sid) { // write success log SysMessageSendingLog log = new SysMessageSendingLog(); BeanUtils.copyProperties(msg, log); log.setId(null); log.setSid(sid); log.setOid(msg.getId()); //copy the msg's ID as orginal ID if (success) { log.setRealDate(new Date()); log.setSendStatus(SysMessageSendingLog.SEND_STATUS_SUCCESS); } else { log.setSendStatus(SysMessageSendingLog.SEND_STATUS_FAILED); } sysMessageSendingLogDao.insert(log); // delete the message from the queue if (msg.getId() != null) { // if it's from the database, delete it sysMessageSendingDao.delete(msg); } } // do job public void run() { LOG.debug("SendingThread is started"); while (true) { try { SysMessageSending msg = messageQueue.take(); LOG.debug("a message was taken from the message queue. ready for sending"); // try to get the correct message channel MessageChannel channel = messageChannelMap.get(msg.getSendType()); if (channel != null ) { try { if (msg.getId() != null && !sysMessageSendingDao.checkForSending(msg.getId())) { // NOTE: just locking the msg and status not synchronized yet! 
continue; // omit this msg, it's has been processed by another thread } // all the message in the queue take them as ready for sending Message message = new Message(msg.getContent()); String sid = channel.sendMessage(msg.getAddress(), message.getSubject(), message.getText(),msg.getSmsUser()); try { if (transactionManager != null) { // 把操作消息队列表和日志表的操作放置在一个事务中 DefaultTransactionDefinition def = new DefaultTransactionDefinition(); def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); TransactionStatus status = transactionManager.getTransaction(def); LOG.debug("sending successfully, start transaction for writing message log"); try { afterMessageSending(msg, true, sid); } catch (RuntimeException e) { // to prevent DAO exception interfering the message sending Exeption LOG.error("dao exception happend when transfering message to message log table."); LOG.error("detail error message:" + e.getMessage()); transactionManager.rollback(status); } // commit the transaction if (!status.isCompleted()) { transactionManager.commit(status); LOG.debug("commit transaction."); } LOG.debug("end transaction for message log."); } else { afterMessageSending(msg, true, sid); } } catch (Exception e) { LOG.error("unknown error happend when transfering message to message log table."); e.printStackTrace(); // print detail unknown error } } catch (Exception e) { LOG.warn("error happended when try to send message to:" + msg.getAddress() + " on message channel:" + channel); LOG.warn("error message:" + e.getMessage()); // check if is it expired? 
if ( new Date().after(msg.getExpireDate()) ) { // write fail message log if (transactionManager != null) { // 把操作消息队列表和日志表的操作放置在一个事务中 DefaultTransactionDefinition def = new DefaultTransactionDefinition(); def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); TransactionStatus status = transactionManager.getTransaction(def); LOG.debug("send failed and message expired, start transaction for writing message log"); try { afterMessageSending(msg, false, null); } catch (RuntimeException re) { LOG.error("dao exception happened and prevent operations on message sending tables. rolling back."); LOG.error("detail error message:" + e.getMessage()); transactionManager.rollback(status); } if (!status.isCompleted()) { transactionManager.commit(status); LOG.debug("commit transaction."); } LOG.debug("end transaction for message log."); } else { afterMessageSending(msg, false, null); } } else { // not expired but failed if (msg.getId() == null) { LOG.info("message sending not succesful now, persist the message to the queue table."); msg.setStatus("0"); // error happend for the first time sysMessageSendingDao.insert(msg); // save the message if it's not persisted yet } else { // increase the error count sysMessageSendingDao.addErrorCount(msg); } } } } else { LOG.warn("ignored this message because cannot find right channel for the send type:" + msg.getSendType()); LOG.info("set the status to FAILED directly for msg which cann't find right channel."); if (msg.getId() == null) { msg.setStatus(SysMessageSending.STATUS_FAILED); sysMessageSendingDao.insert(msg); } else { sysMessageSendingDao.changeMessageStatus(msg.getId(), SysMessageSending.STATUS_FAILED); } } } catch (InterruptedException e) { LOG.warn("taking on the blocking message queue is interrupted."); } } } }
3、短信发送主线程类:
/**
 * Creates the service with a bounded message buffer and, optionally, starts its
 * background workers.
 *
 * @param bufferSize    capacity of the in-memory message queue
 * @param scanInteval   period passed to the timer for repeating the scan task
 * @param needReception stored as-is in the {@code needReception} field
 * @param runThread     when {@code true}, the sending thread is started and the scan task
 *                      is scheduled (first run after 1000, then every {@code scanInteval});
 *                      when {@code false}, only the fields are initialised
 */
public MessageSendServiceImpl(int bufferSize, int scanInteval, boolean needReception, boolean runThread) {
    LOG.debug("initialize message buffer queue size to " + bufferSize);
    this.messageQueue = new ArrayBlockingQueue<SysMessageSending>(bufferSize);
    this.needReception = needReception;

    if (!runThread) {
        return;
    }

    // TODO: one thread per channel could improve throughput and keep channels from
    // waiting on each other.
    MessageSendingThread sender = new MessageSendingThread();
    this.sendingThread = sender;
    sender.start();

    // Schedule the periodic scan that feeds the queue.
    this.timer.schedule(new ScanTask(), 1000, scanInteval);
}
4、Spring的配置:
<!--
  Message sending service. The two int constructor arguments are the buffer size and the
  scan interval; the nested list wires the message channels the service can send through.
-->
<bean id="messageSendService" class="com.csair.cbd.sms.services.impl.MessageSendServiceImpl">
    <constructor-arg type="int" value="100" /><!-- buffer size -->
    <constructor-arg type="int" value="6000" /><!-- scan interval (presumably milliseconds; confirm against the constructor) -->
    <property name="messageChannels">
        <list>
            <bean id="smsChannel" class="com.csair.cbd.sms.services.impl.SMSChannel">
                <property name="url" value="http://10.101.116.12:888/SmsCenterWs/services/ISmsWS"></property> <!-- http://10.101.67.20:80/SmsCenterWs/services/ISmsWS -->
                <!-- property name="username" value="test"></property>
                <property name="password" value="test"></property-->
                <property name="smsUser">
                    <map>
                        <!-- account for the real-time SMS sending interface -->
                        <entry key="sms_user_realtime">
                            <bean class="com.csair.cbd.system.services.dto.SmsUserDto">
                                <property name="userId" value="test"/>
                                <property name="password" value="test"/>
                            </bean>
                        </entry>
                        <!-- account for the non-real-time (delayed) SMS sending interface -->
                        <entry key="sms_user2_delay">
                            <bean class="com.csair.cbd.system.services.dto.SmsUserDto"> </bean>
                        </entry>
                    </map>
                </property>
            </bean>
            <bean id="emailChannel" class="com.csair.cbd.sms.services.impl.EmailChannel">
                <!-- NOTE(review): an orphan closing property tag stood here with no matching opening tag,
                     which made the file not well-formed. The e-mail channel's own properties appear to
                     have been omitted from the listing; add them here. -->
            </bean>
        </list>
    </property>
</bean>