用相同的线程Jedis订阅多个频道
我有一个使用Redis发布/订阅在Java中使用Jedis客户端在客户端之间传输消息的应用程序.我希望能够在用户键入命令时在运行时订阅频道,但是由于订阅是一项阻塞操作,因为它在调用订阅的线程上进行侦听,所以我不确定以后如何订阅其他频道在原始线程上.
I have an application that uses Redis publish/subscribe to transfer messages between clients using the Jedis client in Java. I want to be able to subscribe to channels at runtime when the user types a command but as subscribe is a blocking operation as it listens on the thread that calls subscribe I'm not sure how to go about subscribing to other channels at a later date on the original thread.
示例:
private PubSubListener psl = new PubSubListener();
public void onCommand(String[] args) {
subscribe(args[0]);
}
public void subscribe(String channel) {
Jedis jedis = jedisPool.getResource();
jedis.subscribe(psl, channel);
}
这将起作用,除了用于调度该命令的线程将用于轮询Redis,而我将无法使用该线程订阅更多的频道.
This would work except the thread that dispatches the command would then be used to poll Redis and I wouldn't be able to subscribe to any more channels with that thread.
我发现了同样的问题,即订阅后订阅线程就会阻塞.为了解决这个问题,我使用 Netty 实现了一个优化的发布/订阅客户端,并将其合并到Jedis分支
I observed the same problem, namely that the subscribing thread blocks once you subscribe. To address this, I implemented an optimized pub/sub client using Netty and incorporated it into a Jedis fork here. It's not a comprehensive solution and I have not had time to really finish it up, but it works for basic channel and pattern subscriptions.The basics are:
使用以下命令获取pubsub实例:
Acquire a pubsub instance using:
public static OptimizedPubSub getInstance(String host, int port, String auth, long timeout)
使用以下方式发布/取消模式订阅:
Issue/Cancel pattern subscriptions using:
public ChannelFuture psubscribe(String... patterns)
public ChannelFuture punsubscribe(String... patterns)
您可以忽略返回的ChannelFuture,除非您想100%确定您的请求通过了(这是紧急的).
you can ignore the returned ChannelFuture unless you want to make 100% certain your request gets through (it's asynch).
使用以下方式发布/取消频道订阅:
Issue/Cancel channel subscriptions using:
public ChannelFuture subscribe(String... channels)
public ChannelFuture unsubscribe(String... channels)
然后实现SubListener实例:
Then implement SubListener instances:
public interface SubListener {
/**
* Callback when a message is published on a subscribed channel
* @param channel The channel the message was received on
* @param message The received message
*/
public void onChannelMessage(String channel, String message);
/**
* Callback when a message is published on a subscribed channel matching a subscribed pattern
* @param pattern The pattern that the channel matched
* @param channel The channel the message was received on
* @param message The received message
*/
public void onPatternMessage(String pattern, String channel, String message);
}
并使用以下命令注册/取消注册侦听器:
and register/unregister the listeners using:
public void registerListener(SubListener listener)
public void unregisterListener(SubListener listener)
OptimizedPubSub从不阻止,并且事件不会异步传递到已注册的SubListeners.
OptimizedPubSub never blocks and events are delivered to the registered SubListeners asynchronously.
fork现在有点旧了,因此它在当前形式中可能对您没有用,但是您可以轻松地从该包中提取源代码并独立构建它.依赖关系是Jedis和Netty.
The fork is a bit old now, so it may not be useful to you in its current form, but you can easily pull the source in that package and build it stand-alone. The dependencies are Jedis and Netty.
对不起,我没有更全面的解决方案.
Sorry I did not have a more comprehensive solution.