CentOS6.8中简介ActiveMQ跟Zookeeper的Java运用

CentOS6.8中简介ActiveMQ和Zookeeper的Java运用

ActiveMQ简介

ActiveMQ是Apache出品,流行的、功能强大的即时通讯和集成模式的开源服务器。ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。提供客户端支持跨语言和协议,带有易于在充分支持JMS1.1和1.4使用J2EE企业集成模式和许多先进的功能。

特性

多种语言和协议编写客户端。语言: Java、C、C++、C#、Ruby、Perl、Python、PHP。应用协议:OpenWire、Stomp REST、WS Notification、XMPP、AMQP。完全支持JMS1.1和J2EE1.4规范(持久化,XA消息,事务)。对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性。支持多种传送协议:in-VM、TCP、SSL、NIO、UDP、JGroups、JXTA。支持通过JDBC和journal提供高速的消息持久化。从设计上保证了高性能的集群,客户端-服务器,点对点,支持Ajax。

ActiviteMQ消息的3种形式

JMS 公共

点对点域

发布/订阅域

ConnectFactory

QueueConnectionFactory

TopicConnectionFactory

Connection

QueueConnection

TopicConnection

Destination

Queue

Topic

Session

QueueSession

TopicSession

MessageProducer

QueueSender

TopicPublisher

MessageConsumer

QueueReceiver

TopicSubscriber

1.点对点方式(point-to-point)

点对点的消息发送方式主要建立在Message Queue,Sender,Reciever上,Message Queue存贮消息,Sender发送消息,Receive接收消息。具体点就是Sender Client发送Message Queue ,而Receiver Client从Queue中接收消息和”发送消息已接受”到Queue,确认消息接收。消息发送客户端与接收客户端没有时间上的依赖,发送客户端可以在任何时刻发送信息到Queue,而不需要知道接收客户端是不是在运行

接收和发送消息基本流程

发送方:

(1).创建连接使用的工厂类JMS ConnectionFactory

(2).使用管理对象JMS ConnectionFactory建立连接Connection,并启动

(3).使用连接Connection 建立会话Session

(4).使用会话Session和管理对象Destination创建消息生产者MessageSender

(5).使用消息生产者MessageSender发送消息

接收方:

(1).创建连接使用的工厂类JMS ConnectionFactory

(2).使用管理对象JMS ConnectionFactory建立连接Connection,并启动

(3).使用连接Connection 建立会话Session

(4).使用会话Session和管理对象Destination创建消息接收者MessageReceiver

(5).使用消息接收者MessageReceiver接受消息,需要用setMessageListener将MessageListener接口绑定到MessageReceiver。消息接收者必须实现了MessageListener接口,需要定义onMessage事件方法。

2.发布/订阅方式(publish/subscriber messaging)

发布/订阅方式用于多接收客户端的方式。作为发布订阅的方式,可能存在多个接收客户端,并且接收端客户端与发送客户端存在时间上的依赖。一个接收端只能接收它创建以后发送客户端发送的信息。作为subscriber,在接收消息时有两种方法,destination的receive方法,和实现message listener接口的onMessage 方法。

Pub/Sub模式(Topic)

Pub/Sub模式与P2P模式不同, P2P模式Sender发送后会保存在容器中, 直到Receiver来取走, 消息抛弃; Pub/Sub模式过程如下:

ActiveMQ启动后发布消息1,如果没有消费者启动着,也就是没有消费者进行了订阅。那么这个消息就被抛弃了。

消费者1启动了,连接了activemq,进行了订阅,在等待消息activemq发布消息2,OK,消费者1收到,并进行处理。消息抛弃。

消费者2也启动了,连接了activemq,进行了订阅,在等待消息activemq发布消息3,OK,消费者1,消费者2都收到,并进行处理。消息抛弃。

消费者1关掉了。activemq发布消息4,OK,消费者2收到,并进行处理。消息抛弃。

消费者1又启动了。activemq发布消息5,OK,消费者1,消费者2都收到,并进行处理。消息抛弃。

安装ActiveMQ

说明:ActiveMQ需要预先安装JDK

下载activemq

cd

wget http://apache.fayea.com/activemq/5.14.4/apache-activemq-5.14.4-bin.tar.gz

tar -zxvf apache-activemq-5.14.4-bin.tar.gz

mv apache-activemq-5.14.4-bin /usr/local/activemq-5.14.4

2.在/etc/init.d/目录增加activemq文件,文件内容为:

cd /etc/init.d/
vim activemq

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

#!/bin/sh

export CATALINA_HOME=/usr/local/activemq-5.14.4

case $1 in

    start)

        sh $CATALINA_HOME/bin/activemq start

    ;;

    stop)

        sh $CATALINA_HOME/bin/activemq stop

    ;;

    restart)

        sh $CATALINA_HOME/bin/activemq stop

        sleep 1

        sh $CATALINA_HOME/bin/activemq start

    ;;

 

esac

exit 0

3.对activemq文件授予权限。

chmod 777 activemq

4.设置开机启动并启动activemq

chkconfig activemq on
service activemq start

访问地址:http://IP地址:8161/。

访问成功,ActiveMQ安装完毕。默认用户名密码为:admin/admin。

5.其他

查看activemq状态:service activemq status

其他和关闭activemq服务:service activemq start,service activemq stop

设置开机启动或不启动activemq服务:

chkconfig activemq on,chkconfig activemq off

启动成功就可以访问管理员界面:http://localhost:8161/admin,默认用户名和密码admin/admin。如果你想修改用户名和密码的话,在\conf\jetty-realm.properties中修改即可。

怎么在Java中使用请参考附件 

ZooKeeper安装部署

安装部署
鉴于阿里云服务器数量有限,此处实现zk server的伪集群安装。
1.1 下载解压

cd

wget http://apache.fayea.com/zookeeper/zookeeper-3.4.9/zookeeper-3.4.9.tar.gz
tar -zxvf zookeeper-3.4.9.tar.gz

mv zookeeper-3.4.9.tar /usr/local/zookeeper1

cd /usr/local/zookeeper1

mkdir data

mkdir log

cd /usr/local/zookeeper1/conf

cp zoo_sample.cfg zoo.cfg

cd /usr/local

cp -rf zookeeper1 zookeeper2

cp -rf zookeeper1 zookeeper3
1.2 设置每个目录下conf/zoo.cfg配置文件

vim /usr/local/zookeeper1/conf/zoo.cfg
修改以下内容:
dataDir=/usr/local/zookeeper1/data
dataLogDir=/usr/local/zookeeper1/log
clientPort=2181
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890

同理设置zookeeper2和zookeeper3,但是clientPort需要一次加1为:2182,2183。
最后在每个zk server配置文件的dataDir所对应的目录下必须创建一个名为myid的文件,其中的内容必须与zoo.cfg中server.x 中的x相同,即:

/usr/local/zookeeper1/data/myid 中的内容为1,对应server.1中的1

cd /usr/local/zookeeper1/data/

echo 1 >> myid(此处注意这里的空格与>的区别)
同理设置zookeeper2和zookeeper3的myid的值如下:
/usr/local/zookeeper2/data/myid 中的内容为2,对应server.2中的2
/usr/local/zookeeper3/data/myid 中的内容为3,对应server.3中的3
1.3 启动验证 
/usr/local/zookeeper1/bin/zkServer.sh start
/usr/local/zookeeper2/bin/zkServer.sh start
/usr/local/zookeeper3/bin/zkServer.sh start
启用成功后,输入jps看下进程
17011 Jps

10885 activemq.jar

16776 QuorumPeerMain

16410 QuorumPeerMain

16717 QuorumPeerMain
启动客户端测试:
/usr/local/zookeeper1/bin/zkCli.sh -server localhost:2181
(如果是远程连接,把localhost换成指定的IP即可)
成功后会进到提示符下:
[zk: localhost:2181(CONNECTED) 0]  
然后就可以用一些基础命令,比如 ls ,create ,delete ,get 来测试

二、java 与 zk的连接示例

2.1 maven项目的pom.xml中先添加以下依赖项
<dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>3.4.6</version>
</dependency>
2.2 最基本的示例程序(IP表示IP地址)
import java.io.IOException;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
public class ZooKeeperHello {
  public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
    ZooKeeper zk = new ZooKeeper("IP:2181", 300000, new DemoWatcher());

      //连接zk server
    String node = "/app1";
    Stat stat = zk.exists(node, false);//检测/app1是否存在
    if (stat == null) {
     //创建节点
     String createResult = zk.create(node, "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
      System.out.println(createResult);
    }
    //获取节点的值
    byte[] b = zk.getData(node, false, stat);
    System.out.println(new String(b));
    zk.close();
  }
  static class DemoWatcher implements Watcher {
    @Override
    public void process(WatchedEvent event) {
      System.out.println("----------->");
      System.out.println("path:" + event.getPath());
      System.out.println("type:" + event.getType());
      System.out.println("stat:" + event.getState());
      System.out.println("<-----------");
    }
  }
}
2.3 与zk集群的连接
Zookeeper的优点之一就是高可用性,集群的连接:
ZooKeeper zk = new ZooKeeper("IP:2181,IP:2182,IP:2183", 300000, new DemoWatcher());
集群存在极罕见的故障,比如以上代码执行时,刚初始化完成的时候正准备连接IP1时,因为网络故障IP1对应的server挂了,仍然会报错(此时zk还来不及选出新leader)。问题详见http://segmentfault.com/q/1010000002506725/a-1020000002507402,修改如下:
ZooKeeper zk = new ZooKeeper("IP:2181,IP:2182,IP:2183", 300000, new DemoWatcher());

if (!zk.getState().equals(ZooKeeper.States.CONNECTED)) {
  while (true) {
    if (zk.getState().equals(ZooKeeper.States.CONNECTED)) {
      break;
    }
    try {
      TimeUnit.SECONDS.sleep(5);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}
用开源的zkClient,官方地址: https://github.com/sgroschupf/zkclient,使用方法:
<dependency>
  <groupId>com.github.sgroschupf</groupId>
  <artifactId>zkclient</artifactId>
  <version>0.1</version>
</dependency>
@Test
public void testZkClient() {
  ZkClient zkClient = new ZkClient("IP:2181,IP:2182,IP:2183");
  String node = "/app2";
  if (!zkClient.exists(node)) {
    zkClient.createPersistent(node, "hello zk");
  }
  System.out.println(zkClient.readData(node));
}