如何将数据从dynamodb推送到流
下面是json文件
[
{
"year": 2013,
"title": "Rush",
"actors": [
"Daniel Bruhl",
"Chris Hemsworth",
"Olivia Wilde"
]
},
{
"year": 2013,
"title": "Prisoners",
"actors": [
"Hugh Jackman",
"Jake Gyllenhaal",
"Viola Davis"
]
}
]
下面是推送到dynamodb的代码.我创建了testjsonbucket
存储桶名称,moviedataten.json
是文件名并保存在json上方.
主排序键作为标题(字符串).
Below is the code to push to dynamodb. I have created testjsonbucket
bucket name, moviedataten.json
is the filename and saved above json.Create a dynamodb with Primary partition key as year (Number) and
Primary sort key as title (String).
import json
from decimal import Decimal
import json
import boto3
s3 = boto3.resource('s3')
obj = s3.Object('testjsonbucket', 'moviedataten.json')
body = obj.json
#def lambda_handler(event,context):
# print (body)
def load_movies(movies, dynamodb=None):
if not dynamodb:
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('Movies')
for movie in movies:
year = int(movie['year'])
title = movie['title']
print("Adding movie:", year, title)
table.put_item(Item=movie)
def lambda_handler(event, context):
movie_list = json.loads(body, parse_float=Decimal)
load_movies(movie_list)
- 我想从dynamodb推送到ElasticSearch.
- 我已经创建了一个弹性域
https://xx.x.x.com/testelas
- 我已经通过链接
- I want to push in to ElasticSearch from dynamodb.
- I have created a Elastic Domain
https://xx.x.x.com/testelas
- I have gone through the link https://aws.amazon.com/blogs/compute/indexing-amazon-dynamodb-content-with-amazon-elasticsearch-service-using-aws-lambda/
- I clicked Managestream also
我的要求:
Dynamodb的任何变化都必须反映在Elasticsearch中?
Any change in Dynamodb has to reflect in the Elasticsearch?
此lambda只是将文档写入DynamoDb,因此,我不建议在此lambda中添加代码以将同一对象推送到Elastic search,因为lambda函数应执行一项任务,并将同一文档推送至ELK应作为
This lambda just writing the document to DynamoDb, and I will not recommend adding the code in this lambda to push the same object to Elastic search, as lambda function should perform a single task and pushing the same document to ELK should be managed as a DynamoDB stream.
- 如果ELK关闭或不可用怎么办,您将如何在lambda中进行管理?
- 如果您以后想禁用此功能怎么办?您将需要修改lambda而不是通过AWS API或AWS控制台进行控制,您只需要在不需要对上述lambda边代码进行任何更改的情况下禁用流即可.
- 如果您只想将修改或TTL项目移动到弹性搜索该怎么办?
因此,创建Dyanodb Stream,将文档推送到另一个负责将文档推送到ELK的Lambda,使用此选项,您还可以同时推送新旧项目.
So create Dyanodb Stream that pushes the document to another Lambda that is responsible to push the document to ELK, with this option you can also push old and new both items.
You can look into this article too that describe another approach data-streaming-from-dynamodb-to-elasticsearch
对于上述方法,请查看此GitHub项目 dynamodb-stream-elasticsearch .
For above approach look into this GitHub project dynamodb-stream-elasticsearch.
const { pushStream } = require('dynamodb-stream-elasticsearch');
const { ES_ENDPOINT, INDEX, TYPE } = process.env;
function myHandler(event, context, callback) {
console.log('Received event:', JSON.stringify(event, null, 2));
pushStream({ event, endpoint: ES_ENDPOINT, index: INDEX, type: TYPE })
.then(() => {
callback(null, `Successfully processed ${event.Records.length} records.`);
})
.catch((e) => {
callback(`Error ${e}`, null);
});
}
exports.handler = myHandler;