Session windows in Apache Beam with Python

Problem description:

I have a stream of user events. I've mapped them into KV{ userId, event }, and assigned timestamps.

This is to run in streaming mode. I would like to be able to produce the following input-output result:

Session window gap = 1

  • input: user=1, timestamp=1, event=a
  • input: user=2, timestamp=2, event=a
  • input: user=2, timestamp=3, event=a
  • input: user=1, timestamp=2, event=b
  • time: lwm=3
  • output: user=1, [ { event=a, timestamp=1 }, { event=b, timestamp=2 } ]
  • time: lwm=4
  • output: user=2, [ { event=a, timestamp=2 }, { event=a, timestamp=3 } ]
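The expected grouping above can be modeled in plain Python to make the session semantics explicit. This is only an illustration, not Beam code: `assign_sessions` is a hypothetical helper, each event is given the interval [t, t + gap), and, as an assumption, intervals of the same user are merged when they overlap or merely touch. That is what the expected output implies for gap = 1; Beam's own merge logic may handle touching boundaries differently.

```python
from collections import defaultdict


def assign_sessions(events, gap):
    """Group (user, timestamp, event) tuples into per-user sessions.

    Each event gets the interval [t, t + gap); intervals of the same user are
    merged when they overlap or touch (an assumption, see the lead-in).
    Returns {user: [(start, end, [(event, timestamp), ...]), ...]}.
    """
    by_user = defaultdict(list)
    for user, ts, event in events:
        by_user[user].append((ts, event))

    sessions = {}
    for user, items in by_user.items():
        merged = []
        for ts, event in sorted(items, key=lambda item: item[0]):
            if merged and ts <= merged[-1][1]:
                # Event starts inside (or right at the end of) the current session: extend it.
                start, end, members = merged[-1]
                merged[-1] = (start, max(end, ts + gap), members + [(event, ts)])
            else:
                # Otherwise open a new session for this user.
                merged.append((ts, ts + gap, [(event, ts)]))
        sessions[user] = merged
    return sessions


events = [(1, 1, "a"), (2, 2, "a"), (2, 3, "a"), (1, 2, "b")]
for user, user_sessions in sorted(assign_sessions(events, gap=1).items()):
    for start, end, members in user_sessions:
        print(user, (start, end), members)
# 1 (1, 3) [('a', 1), ('b', 2)]
# 2 (2, 4) [('a', 2), ('a', 3)]
```

User 1's session ends at 3 and user 2's at 4, which lines up with the timeline above where user 1's output appears at lwm=3 and user 2's at lwm=4.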

So that I can write my function to reduce the list of events in the session window for the user, using the start and end time of the session window as well.

How do I write this? (If you answer "look at the examples", that is not a valid answer, because the examples never feed the list of events into the reducer with the window as a parameter.)

If I understand this correctly, this is a follow-up to this question, and it is naturally accomplished by adding the GroupByKey step I propose in my solution there.

So, referring to my previous explanation and focusing here on the changes only, if we have a pipeline like this:

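import apache_beam as beam
from apache_beam.transforms import window

# p, user1_data, user2_data, session_gap and AnalyzeSession are defined in the full code
events = (p
  | 'Create Events' >> beam.Create(user1_data + user2_data)
  | 'Add Timestamps' >> beam.Map(lambda x: window.TimestampedValue(x, x['timestamp']))
  | 'keyed_on_user_id' >> beam.Map(lambda x: (x['user_id'], x))
  | 'user_session_window' >> beam.WindowInto(window.Sessions(session_gap),
                                             timestamp_combiner=window.TimestampCombiner.OUTPUT_AT_EOW)
  # GroupByKey emits one (user_id, events) pair per user and merged session window
  | 'Group' >> beam.GroupByKey()
  | 'analyze_session' >> beam.ParDo(AnalyzeSession()))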
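The `keyed_on_user_id` step keeps the whole event dict as the value, so `user_id` (and `timestamp`) are repeated inside every grouped value. A minimal sketch of a replacement keying function, `key_by_user` (a hypothetical name), that keeps only the fields you ask for. The element timestamp set by the 'Add Timestamps' step is carried by Beam alongside the element, so session windowing does not depend on the dict field; keep `'timestamp'` in the payload only if the reducer needs to order events.

```python
def key_by_user(event, keep=("value",)):
    """Turn an event dict into a (user_id, payload) pair.

    Only the fields named in `keep` are retained in the payload; add
    'timestamp' if the per-session reducer needs to order events.
    """
    payload = {field: event[field] for field in keep if field in event}
    return event["user_id"], payload


print(key_by_user({"timestamp": 1554203778.904401, "user_id": "Groot", "value": "event_0"}))
# ('Groot', {'value': 'event_0'})
```

In the pipeline it would replace the lambda: `| 'keyed_on_user_id' >> beam.Map(key_by_user)`.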

Now the elements are arranged as described in the question, so we can simply log them in AnalyzeSession:

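import logging

import apache_beam as beam

class AnalyzeSession(beam.DoFn):
  """Prints per session information"""
  def process(self, element, window=beam.DoFn.WindowParam):
    # element is (user_id, events) for one user and one session window
    logging.info(element)
    yield element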

which gives the desired result:

INFO:root:('Groot', [{'timestamp': 1554203778.904401, 'user_id': 'Groot', 'value': 'event_0'}, {'timestamp': 1554203780.904401, 'user_id': 'Groot', 'value': 'event_1'}])
INFO:root:('Groot', [{'timestamp': 1554203786.904402, 'user_id': 'Groot', 'value': 'event_2'}])
INFO:root:('Thanos', [{'timestamp': 1554203792.904399, 'user_id': 'Thanos', 'value': 'event_4'}])
INFO:root:('Thanos', [{'timestamp': 1554203784.904398, 'user_id': 'Thanos', 'value': 'event_3'}, {'timestamp': 1554203777.904395, 'user_id': 'Thanos', 'value': 'event_0'}, {'timestamp': 1554203778.904397, 'user_id': 'Thanos', 'value': 'event_1'}, {'timestamp': 1554203780.904398, 'user_id': 'Thanos', 'value': 'event_2'}])
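Note that Thanos's events in the last line are not in timestamp order: `GroupByKey` does not guarantee any ordering of the grouped values. If the reduction depends on order, sort inside `process` first. A minimal sketch, using a hypothetical helper `sorted_session_events` that assumes each value is a dict with a `'timestamp'` key, as in the example data:

```python
def sorted_session_events(element):
    """Return (key, events) with the events ordered by their 'timestamp' field.

    `element` is a (key, iterable_of_dicts) pair as emitted by GroupByKey;
    sorted() materializes the iterable, which need not be a list.
    """
    key, events = element
    return key, sorted(events, key=lambda e: e["timestamp"])


element = ("Thanos", [
    {"timestamp": 1554203784.904398, "user_id": "Thanos", "value": "event_3"},
    {"timestamp": 1554203777.904395, "user_id": "Thanos", "value": "event_0"},
    {"timestamp": 1554203778.904397, "user_id": "Thanos", "value": "event_1"},
    {"timestamp": 1554203780.904398, "user_id": "Thanos", "value": "event_2"},
])
user, ordered = sorted_session_events(element)
print(user, [e["value"] for e in ordered])
# Thanos ['event_0', 'event_1', 'event_2', 'event_3']
```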

If you want to avoid redundant information, such as keeping user_id and timestamp inside the values, you can remove those fields in the Map step. For the complete use case (i.e. reducing the aggregated events per session), we can do things like counting the number of events or computing the session duration with something like this:

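import logging

import apache_beam as beam

class AnalyzeSession(beam.DoFn):
  """Logs the number of events and the duration of each user session"""
  def process(self, element, window=beam.DoFn.WindowParam):
    user, events = element
    # GroupByKey values are an iterable, not necessarily a list, so materialize before len()
    num_events = len(list(events))
    # The merged session window spans [first event, last event + gap)
    window_end = window.end.to_utc_datetime()
    window_start = window.start.to_utc_datetime()
    session_duration = window_end - window_start

    logging.info(">>> User %s had %s event(s) in %s session", user, num_events, session_duration)

    yield element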

which, for my example, will output the following:

INFO:root:>>> User Groot had 2 event(s) in 0:00:07 session
INFO:root:>>> User Groot had 1 event(s) in 0:00:05 session
INFO:root:>>> User Thanos had 4 event(s) in 0:00:12 session
INFO:root:>>> User Thanos had 1 event(s) in 0:00:05 session
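The reduction the question asks for can also live in a plain function that `process` calls with the window bounds converted to seconds (for example via `window.start.micros / 1e6`), which makes it testable without a runner. A minimal sketch; `summarize_session` is a hypothetical name, and the timestamps below are simplified stand-ins for the example data. Each one-event session above lasts 0:00:05, which is consistent with a 5-second gap in the full code (an inference, since the gap is not shown). With that gap, a session whose events are 2 s apart lasts 2 + 5 = 7 s, matching Groot's `0:00:07`.

```python
from datetime import timedelta


def summarize_session(user, events, window_start, window_end):
    """Reduce one user's session to a small summary dict.

    `events` is any iterable of event dicts (GroupByKey values need not be a
    list); `window_start` and `window_end` are the session bounds in seconds.
    """
    events = list(events)
    return {
        "user": user,
        "num_events": len(events),
        "duration": timedelta(seconds=window_end - window_start),
    }


gap = 5  # assumed session gap in seconds, inferred from the one-event sessions in the log
groot = [  # simplified timestamps, 2 s apart like Groot's first two events
    {"timestamp": 100.0, "user_id": "Groot", "value": "event_1"},
    {"timestamp": 98.0, "user_id": "Groot", "value": "event_0"},
]
# Beam supplies the bounds through WindowParam; here they are computed by hand:
# first event time up to last event time + gap.
start = min(e["timestamp"] for e in groot)
end = max(e["timestamp"] for e in groot) + gap
summary = summarize_session("Groot", groot, start, end)
print(summary["user"], summary["num_events"], summary["duration"])
# Groot 2 0:00:07
```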

Full code here