SODBASE CEP学习进阶篇(6):实现反压和流限速
前面文章介绍过流数据输入速率要和处理能力相匹配,短时数据爆发由内部缓冲队列来缓冲。如果确实存在某个时间点持续数据爆发,可以考虑采取反压限流的方法。
1. 示例操作步骤
(1)下载SODBASE Studio2.0.22(sp1)以上版本,解压,打开configuration/global.properties,将引擎的缓冲队列长度设的较小,作以下配置
maxqueuelength=100
warnqueuelength=20
(2)下载示例EPL模型backpress01.sod、backpress02.sod
backpress01功能:CPU满负荷不停的产生模拟数据,打印到屏幕。因为严格地讲,屏幕打印的速度也是跟不上CPU满负荷产生数据速度的。因此容易达到队列报警值20。
backpress02功能:将system.sys_warn的报警事件反接到backpress01的控制Socket端口上。
(3)打开SODBASE Studio,导入backpress01.sod、backpress02.sod
先测试运行backpress02,再将backpress01也测试运行起来。
(4)结果输出
可以看到如下的输出结果,并且发现隔一段时间,数据就慢下来,这就是反压限速的原因。
2. 工作原理
2.1可控制输入适配器
上面示例能够实现反压限速,是因为编写backpress01的输入适配器时,继承了SODBASE CEP一类可控制输入适配器。这类可控制输入适配器,通过Socket监听控制事件,并在接收到控制事件后调用回调函数callback(PrimitiveEvent e),在回调函数中可以让输入流sleep一段时间。
读者要实现同样的功能,只需集成实现com.sodbase.inputadaptor.controllable.ControllableInputAdaptorI类。前3个参数默认为数据流名、控制监听端口、控制监听端口超时时间(ms)。读者要添加参数,可以从第4个参数开始添加。示例中的输入适配器代码如下
public class ControllableTestInputAdaptor extends ControllableInputAdaptorI { private boolean running=true; private long suspendtime=0; @Override public void setUp() { //必须调用super.setUp()启动控制监听端口 super.setUp(); } @Override public void callback(PrimitiveEvent primitiveEvent) { if(primitiveEvent.getAttributeValueType("cause").getValue().equals(Constants.causecode1)) suspendtime=5000; } @Override public boolean isRunning() { return running; } @Override public void stopInputStream() { //必须调用super.stopInputStream()关闭控制监听端口 super.stopInputStream(); this.running=false; } public void run() { int count = 1; while (running) { try { if(suspendtime>0) { Thread.sleep(suspendtime); suspendtime=0; } } catch (InterruptedException e) { e.printStackTrace(); } PrimitiveEvent primitiveEvent = new PrimitiveEvent(); primitiveEvent.getAttributeMap().put("id", new ValueType(String.valueOf(count),"string")); count++; Date d = new Date(); long time = d.getTime(); primitiveEvent.setStart_ts(time); primitiveEvent.setEnd_ts(time); this.putEventToStream(primitiveEvent); } } }
2.2 系统报警事件
SODBASE CEP允许通过warnqueuelength配置缓冲队列长度的报警长度,可针对报警采取一些运维管理措施。如果不配置,默认为最大缓冲队列长度maxqueuelength的80%。
system.sys_warning和system.sys_error是系统内置报警流,通常会含3个字段
(1)cause:报警和报错的原因
(2)queryname:引起报警和报错的EPL语句名称
(3)message:消息提示
如cause=’warnqueuelengthexceeded’时,即超过了缓冲队列的报警阈值。
2.3 注意事项
(1)多用户环境下,报警流queryname字段会加前缀"用户名."。
(2)设计系统时,不要过度依赖反压限速功能,因为反压限速会增加输入端负载,也会给系统带来新的问题。正确方式是在系统架构初期,采用模拟数据和最大输入速率配置好缓冲区大小,设计好处理方式并留出余量,让处理能力和输入速率相匹配。
SODBASE CEP用于轻松、高效实施数据监测、监控类、实时交易类项目。嵌入式方式编程参见运行第一个EPL例子。与Storm集成参见EPL与Storm集成。缓存扩展参见与分布式缓存集成。
版权声明:本文为博主原创文章,未经博主允许不得转载。