Writing failed insert rows from a streaming job into BigQuery using the Apache Beam Java SDK?

Problem description:

While running a streaming job, it is always good to have logs of the rows that could not be inserted into BigQuery. Catching those rows and writing them into another BigQuery table gives an idea of what went wrong.

Below are the steps you can try in order to achieve this.

Prerequisites:

  • apache-beam >= 2.10.0 (or a later version)

Using the getFailedInsertsWithErr() function available in the SDK, you can easily catch the failed inserts and push them to another table for performing RCA (root cause analysis). This becomes an important feature for debugging streaming pipelines that run indefinitely.
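
Below is a minimal sketch of how this can be wired up. It assumes an upstream PCollection<TableRow> named rows and a hypothetical destination table; note that getFailedInsertsWithErr() needs extended error info to be enabled on the write via withExtendedErrorInfo().

    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
    import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
    import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
    import org.apache.beam.sdk.values.PCollection;

    // "rows" is assumed to be a PCollection<TableRow> produced earlier in the pipeline.
    WriteResult writeResult =
        rows.apply("WriteToBigQuery",
            BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.my_table") // hypothetical destination table
                .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                // Retry only transient errors; permanently failing rows are emitted
                // on the failed-inserts output instead of blocking the pipeline.
                .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                // Needed so that getFailedInsertsWithErr() can return error details.
                .withExtendedErrorInfo());

    // Rows rejected by BigQuery, together with the reason for the failure.
    PCollection<BigQueryInsertError> failedInserts = writeResult.getFailedInsertsWithErr();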

BigQueryInsertError is the error object returned by BigQuery for a failed TableRow. It contains the following parameters (see the sketch after this list):

  • The row.
  • The error stack trace and error message payload.
  • The table reference object.
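
For reference, each of these is exposed through a getter on BigQueryInsertError (a small sketch; insertError stands for one element of the failed-inserts collection obtained above):

    import com.google.api.services.bigquery.model.TableDataInsertAllResponse.InsertErrors;
    import com.google.api.services.bigquery.model.TableReference;
    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;

    // "insertError" is one element of the PCollection<BigQueryInsertError>.
    TableRow failedRow = insertError.getRow();          // the rejected row
    InsertErrors errorPayload = insertError.getError(); // error reasons/messages from BigQuery
    TableReference table = insertError.getTable();      // the table the row was destined for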

These parameters can be captured and pushed into another BigQuery table. An example schema for the error records:

    "fields": [{
            "name": "timestamp",
            "type": "TIMESTAMP",
            "mode": "REQUIRED"
        },
        {
            "name": "payloadString",
            "type": "STRING",
            "mode": "REQUIRED"
        },
        {
            "name": "errorMessage",
            "type": "STRING",
            "mode": "NULLABLE"
        },
        {
            "name": "stacktrace",
            "type": "STRING",
            "mode": "NULLABLE"
        }
    ]
}
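
Putting the pieces together, a minimal sketch for mapping each BigQueryInsertError onto the schema above and streaming it into a separate, pre-created error table could look like this (the error table name is hypothetical):

    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.SimpleFunction;
    import org.joda.time.Instant;

    failedInserts
        .apply("ToErrorRecord",
            MapElements.via(new SimpleFunction<BigQueryInsertError, TableRow>() {
              @Override
              public TableRow apply(BigQueryInsertError insertError) {
                // Build a row matching the error schema shown above.
                // "stacktrace" is NULLABLE and is omitted here; fill it with any
                // exception details captured in your own code if needed.
                return new TableRow()
                    .set("timestamp", Instant.now().toString())
                    .set("payloadString", insertError.getRow().toString())
                    .set("errorMessage", insertError.getError().toString());
              }
            }))
        .apply("WriteErrorRecords",
            BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.my_error_table") // hypothetical error table
                // The error table is assumed to exist already with the schema above.
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));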