benthos 通过rest api 配置 stream 说明

stream 模式,我们可以通过rest api 进行控制

使用方法

  • 启动
benthos --streams
  • 进行流的配置(rest api)
curl http://localhost:4195/streams/foo -X POST --data-binary @- <<EOF
input:
  type: http_server
buffer:
  type: memory
pipeline:
  threads: 4
  processors:
  - type: jmespath
    jmespath:
      query: "{id: user.id, content: body.content}"
output:
  type: http_server
EOF
  • 查询配置
    benthos 通过rest api 配置 stream 说明
  • 添加另外一个stream 配置
curl http://localhost:4195/streams/bar -X POST --data-binary @- <<EOF
input:
  type: kafka
  kafka:
    addresses:
    - localhost:9092
    topic: my_topic
buffer:
  type: none
pipeline:
  threads: 1
  processors:
  - type: sample
    sample:
      retain: 10
output:
  type: elasticsearch
  elasticsearch:
    urls:
    - http://localhost:9200
EOF
  • 获取单个流配置信息(rest api)
curl http://localhost:4195/streams/foo | jq '.'

benthos 通过rest api 配置 stream 说明

  • 更新配置(api)

    以下移除 memery buffer

curl http://localhost:4195/streams/foo -X PUT --data-binary @- <<EOF
input:
  type: http_server
buffer:
  type: none
pipeline:
  threads: 4
  processors:
  - type: jmespath
    jmespath:
      query: "{id: user.id, content: body.content}"
output:
  type: http_server
EOF
  • 删除配置的stream
curl http://localhost:4195/streams/foo  -X DELETE

参考资料

https://github.com/Jeffail/benthos/blob/HEAD/docs/streams/using_REST_API.md