CentOS6.8中简介ActiveMQ跟Zookeeper的Java运用
ActiveMQ简介
ActiveMQ是Apache出品,流行的、功能强大的即时通讯和集成模式的开源服务器。ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。提供客户端支持跨语言和协议,带有易于在充分支持JMS1.1和1.4使用J2EE企业集成模式和许多先进的功能。
特性
多种语言和协议编写客户端。语言: Java、C、C++、C#、Ruby、Perl、Python、PHP。应用协议:OpenWire、Stomp REST、WS Notification、XMPP、AMQP。完全支持JMS1.1和J2EE1.4规范(持久化,XA消息,事务)。对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性。支持多种传送协议:in-VM、TCP、SSL、NIO、UDP、JGroups、JXTA。支持通过JDBC和journal提供高速的消息持久化。从设计上保证了高性能的集群,客户端-服务器,点对点,支持Ajax。
ActiviteMQ消息的3种形式
JMS 公共 |
点对点域 |
发布/订阅域 |
ConnectFactory |
QueueConnectionFactory |
TopicConnectionFactory |
Connection |
QueueConnection |
TopicConnection |
Destination |
Queue |
Topic |
Session |
QueueSession |
TopicSession |
MessageProducer |
QueueSender |
TopicPublisher |
MessageConsumer |
QueueReceiver |
TopicSubscriber |
1.点对点方式(point-to-point)
点对点的消息发送方式主要建立在Message Queue,Sender,Reciever上,Message Queue存贮消息,Sender发送消息,Receive接收消息。具体点就是Sender Client发送Message Queue ,而Receiver Client从Queue中接收消息和”发送消息已接受”到Queue,确认消息接收。消息发送客户端与接收客户端没有时间上的依赖,发送客户端可以在任何时刻发送信息到Queue,而不需要知道接收客户端是不是在运行
接收和发送消息基本流程
发送方:
(1).创建连接使用的工厂类JMS ConnectionFactory
(2).使用管理对象JMS ConnectionFactory建立连接Connection,并启动
(3).使用连接Connection 建立会话Session
(4).使用会话Session和管理对象Destination创建消息生产者MessageSender
(5).使用消息生产者MessageSender发送消息
接收方:
(1).创建连接使用的工厂类JMS ConnectionFactory
(2).使用管理对象JMS ConnectionFactory建立连接Connection,并启动
(3).使用连接Connection 建立会话Session
(4).使用会话Session和管理对象Destination创建消息接收者MessageReceiver
(5).使用消息接收者MessageReceiver接受消息,需要用setMessageListener将MessageListener接口绑定到MessageReceiver。消息接收者必须实现了MessageListener接口,需要定义onMessage事件方法。
2.发布/订阅方式(publish/subscriber messaging)
发布/订阅方式用于多接收客户端的方式。作为发布订阅的方式,可能存在多个接收客户端,并且接收端客户端与发送客户端存在时间上的依赖。一个接收端只能接收它创建以后发送客户端发送的信息。作为subscriber,在接收消息时有两种方法,destination的receive方法,和实现message listener接口的onMessage 方法。
Pub/Sub模式(Topic)
Pub/Sub模式与P2P模式不同, P2P模式Sender发送后会保存在容器中, 直到Receiver来取走, 消息抛弃; Pub/Sub模式过程如下:
ActiveMQ启动后发布消息1,如果没有消费者启动着,也就是没有消费者进行了订阅。那么这个消息就被抛弃了。
消费者1启动了,连接了activemq,进行了订阅,在等待消息activemq发布消息2,OK,消费者1收到,并进行处理。消息抛弃。
消费者2也启动了,连接了activemq,进行了订阅,在等待消息activemq发布消息3,OK,消费者1,消费者2都收到,并进行处理。消息抛弃。
消费者1关掉了。activemq发布消息4,OK,消费者2收到,并进行处理。消息抛弃。
消费者1又启动了。activemq发布消息5,OK,消费者1,消费者2都收到,并进行处理。消息抛弃。
安装ActiveMQ
说明:ActiveMQ需要预先安装JDK
下载activemq
cd
wget http://apache.fayea.com/activemq/5.14.4/apache-activemq-5.14.4-bin.tar.gz
tar -zxvf apache-activemq-5.14.4-bin.tar.gz
mv apache-activemq-5.14.4-bin /usr/local/activemq-5.14.4
2.在/etc/init.d/目录增加activemq文件,文件内容为:
cd /etc/init.d/
vim activemq
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
#!/bin/sh export CATALINA_HOME=/usr/local/activemq-5.14.4 case $1 in start) sh $CATALINA_HOME/bin/activemq start ;; stop) sh $CATALINA_HOME/bin/activemq stop ;; restart) sh $CATALINA_HOME/bin/activemq stop sleep 1 sh $CATALINA_HOME/bin/activemq start ;;
esac exit 0 |
3.对activemq文件授予权限。
chmod 777 activemq
4.设置开机启动并启动activemq
chkconfig activemq on
service activemq start
访问地址:http://IP地址:8161/。
访问成功,ActiveMQ安装完毕。默认用户名密码为:admin/admin。
5.其他
查看activemq状态:service activemq status
其他和关闭activemq服务:service activemq start,service activemq stop
设置开机启动或不启动activemq服务:
chkconfig activemq on,chkconfig activemq off
启动成功就可以访问管理员界面:http://localhost:8161/admin,默认用户名和密码admin/admin。如果你想修改用户名和密码的话,在\conf\jetty-realm.properties中修改即可。
怎么在Java中使用请参考附件
ZooKeeper安装部署
安装部署
鉴于阿里云服务器数量有限,此处实现zk server的伪集群安装。
1.1 下载解压
cd
wget http://apache.fayea.com/zookeeper/zookeeper-3.4.9/zookeeper-3.4.9.tar.gz
tar -zxvf zookeeper-3.4.9.tar.gz
mv zookeeper-3.4.9.tar /usr/local/zookeeper1
cd /usr/local/zookeeper1
mkdir data
mkdir log
cd /usr/local/zookeeper1/conf
cp zoo_sample.cfg zoo.cfg
cd /usr/local
cp -rf zookeeper1 zookeeper2
cp -rf zookeeper1 zookeeper3
1.2 设置每个目录下conf/zoo.cfg配置文件
vim /usr/local/zookeeper1/conf/zoo.cfg
修改以下内容:
dataDir=/usr/local/zookeeper1/data
dataLogDir=/usr/local/zookeeper1/log
clientPort=2181
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890
同理设置zookeeper2和zookeeper3,但是clientPort需要一次加1为:2182,2183。
最后在每个zk server配置文件的dataDir所对应的目录下必须创建一个名为myid的文件,其中的内容必须与zoo.cfg中server.x 中的x相同,即:
/usr/local/zookeeper1/data/myid 中的内容为1,对应server.1中的1
cd /usr/local/zookeeper1/data/
echo 1 >> myid(此处注意这里的空格与>的区别)
同理设置zookeeper2和zookeeper3的myid的值如下:
/usr/local/zookeeper2/data/myid 中的内容为2,对应server.2中的2
/usr/local/zookeeper3/data/myid 中的内容为3,对应server.3中的3
1.3 启动验证
/usr/local/zookeeper1/bin/zkServer.sh start
/usr/local/zookeeper2/bin/zkServer.sh start
/usr/local/zookeeper3/bin/zkServer.sh start
启用成功后,输入jps看下进程
17011 Jps
10885 activemq.jar
16776 QuorumPeerMain
16410 QuorumPeerMain
16717 QuorumPeerMain
启动客户端测试:
/usr/local/zookeeper1/bin/zkCli.sh -server localhost:2181
(如果是远程连接,把localhost换成指定的IP即可)
成功后会进到提示符下:
[zk: localhost:2181(CONNECTED) 0]
然后就可以用一些基础命令,比如 ls ,create ,delete ,get 来测试
二、java 与 zk的连接示例
2.1 maven项目的pom.xml中先添加以下依赖项
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
2.2 最基本的示例程序(IP表示IP地址)
import java.io.IOException;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
public class ZooKeeperHello {
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
ZooKeeper zk = new ZooKeeper("IP:2181", 300000, new DemoWatcher());
//连接zk server
String node = "/app1";
Stat stat = zk.exists(node, false);//检测/app1是否存在
if (stat == null) {
//创建节点
String createResult = zk.create(node, "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(createResult);
}
//获取节点的值
byte[] b = zk.getData(node, false, stat);
System.out.println(new String(b));
zk.close();
}
static class DemoWatcher implements Watcher {
@Override
public void process(WatchedEvent event) {
System.out.println("----------->");
System.out.println("path:" + event.getPath());
System.out.println("type:" + event.getType());
System.out.println("stat:" + event.getState());
System.out.println("<-----------");
}
}
}
2.3 与zk集群的连接
Zookeeper的优点之一就是高可用性,集群的连接:
ZooKeeper zk = new ZooKeeper("IP:2181,IP:2182,IP:2183", 300000, new DemoWatcher());
集群存在极罕见的故障,比如以上代码执行时,刚初始化完成的时候正准备连接IP1时,因为网络故障IP1对应的server挂了,仍然会报错(此时zk还来不及选出新leader)。问题详见http://segmentfault.com/q/1010000002506725/a-1020000002507402,修改如下:
ZooKeeper zk = new ZooKeeper("IP:2181,IP:2182,IP:2183", 300000, new DemoWatcher());
if (!zk.getState().equals(ZooKeeper.States.CONNECTED)) {
while (true) {
if (zk.getState().equals(ZooKeeper.States.CONNECTED)) {
break;
}
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
用开源的zkClient,官方地址: https://github.com/sgroschupf/zkclient,使用方法:
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
<version>0.1</version>
</dependency>
@Test
public void testZkClient() {
ZkClient zkClient = new ZkClient("IP:2181,IP:2182,IP:2183");
String node = "/app2";
if (!zkClient.exists(node)) {
zkClient.createPersistent(node, "hello zk");
}
System.out.println(zkClient.readData(node));
}