如何以安全的方式进行 Xtrim(修剪 redis 流)

问题描述:

Redis Xtrim 是保留一些记录并删除旧的.

Redis Xtrim is to keep a number of records and remove the older ones.

假设我有 100 条记录,我想保留最新的 10 条记录.(假设 java/scala API)

suppose I am having 100 records, I want to keep the newest 10 ones. (suppose java/scala API)

redis = new Jedis(...)
redis.xlen("mystream") // returns 100
// new records arrive stream at here
redis.xtrim("mystream", 10)

然而,新记录在 xlenxtrim 的执行之间到达.例如现在有 105 条记录,它会保留 95 - 105,但是我想保留从 90 开始.所以它也会修剪 90 - 95,这很糟糕.

however, new records arrive between the execution of xlen and xtrim. So for example now there are 105 records, it will keep 95 - 105, however I want to keep start from 90. So it also trim 90 - 95, which is bad.

有什么想法吗?

选项 1:如果您始终只想保留 10 个最新的,请使用:

Option 1: If you always want to keep only the 10 newest ones, use:

XADD mystream MAXLEN 10 * field value ...

通过将修剪移至加法,您可以将其始终保持为 10.您可以忘记单独的修剪逻辑.

By moving trimming to the addition, you keep it always at 10. You can forget about separate logic to trim.

在绝地武士中,

xadd(String key, StreamEntryID id, Map<String,String> hash, long maxLen, boolean approximateLength)

选项 2:乐观,使用交易.

WATCH mystream 
XLEN mystream
MULTI
XTRIM mystream MAXLEN 10
EXEC

这里,如果流被修改,事务将失败,EXEC返回null,然后重试.

Here, if the stream has been modified, the transaction will fail, EXEC returns null, you then retry.

Jedis 交易示例

选项 3:

使用 Lua 脚本 调整您的 XTRIM 目标编号.有了这个,你可以执行 XLEN,做你的逻辑来决定你想要的修剪,然后做 XTRIM,所有 Redis 服务器端,原子地,所以没有您的直播有机会在此期间发生变化.

Use a Lua Script to adjust your XTRIM target number. With this, you can execute the XLEN, do your logic to decide on the trimming you want, and then do XTRIM, all Redis-server-side, atomically, so no chance for your stream changing in-between.

这里有一些 eval 示例 与绝地武士.

Here some eval examples with Jedis.

更新:

像下面这样的 Lua 脚本应该可以解决问题:原子地计算 myStreams 条目的当前数量,然后相应地调用 XTRIM.

A Lua Script like the following should do the trick: atomically evaluate the current number of myStreams entries and then call XTRIM accordingly.

EVAL "local streamLen = redis.call('XLEN', KEYS[1]) \n if streamLen >= tonumber(ARGV[1]) then \n return redis.call('XTRIM', KEYS[1], 'MAXLEN', streamLen - tonumber(ARGV[1])) \n else return redis.error_reply(KEYS[1]..' has less than '..ARGV[1]..' items') end" 1 myStream 90

我们来看看 Lua 脚本:

Let's take a look at the Lua script:

local streamLen = redis.call('XLEN', KEYS[1])
if streamLen >= tonumber(ARGV[1]) then 
    return redis.call('XTRIM', KEYS[1], 'MAXLEN', streamLen - tonumber(ARGV[1]))
else
    return redis.error_reply(KEYS[1]..' has less than '..ARGV[1]..' items')
end

我在 如何从 Redis 流中删除或移除给定数量的条目?