未加星标

聊聊flink的Sliding Window

字体大小 | |
[系统(windows) 所属分类 系统(windows) | 发布者 店小二05 | 时间 2019 | 作者 红领巾 ] 0人收藏点击收藏

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimewindows.java

@PublicEvolving public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> { private static final long serialVersionUID = 1L; private final long size; private final long slide; private final long offset; protected SlidingEventTimeWindows(long size, long slide, long offset) { if (offset < 0 || offset >= slide || size <= 0) { throw new IllegalArgumentException("SlidingEventTimeWindows parameters must satisfy 0 <= offset < slide and size > 0"); } this.size = size; this.slide = slide; this.offset = offset; } @Override public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) { if (timestamp > Long.MIN_VALUE) { List<TimeWindow> windows = new ArrayList<>((int) (size / slide)); long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide); for (long start = lastStart; start > timestamp - size; start -= slide) { windows.add(new TimeWindow(start, start + size)); } return windows; } else { throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " + "Is the time characteristic set to 'ProcessingTime', or did you forget to call " + "'DataStream.assignTimestampsAndWatermarks(...)'?"); } } public long getSize() { return size; } public long getSlide() { return slide; } @Override public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return EventTimeTrigger.create(); } @Override public String toString() { return "SlidingEventTimeWindows(" + size + ", " + slide + ")"; } public static SlidingEventTimeWindows of(Time size, Time slide) { return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0); } public static SlidingEventTimeWindows of(Time size, Time slide, Time offset) { return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), offset.toMilliseconds() % slide.toMilliseconds()); } @Override public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) { return new TimeWindow.Serializer(); } @Override public boolean isEventTime() { return true; } } 复制代码 SlidingEventTimeWindows继承了Window,其中元素类型为Object,而窗口类型为TimeWindow;它有三个参数,一个是size,一个是slide,一个是offset,其中offset必须大于等于0,offset必须大于slide,size必须大于0 assignWindows方法以slide作为size通过TimeWindow.getWindowStartWithOffset(timestamp, offset, slide)计算lastStart,然后以为start + size > timestamp为循环条件,每次对start减去slide,挨个计算TimeWindow(start, start + size);getDefaultTrigger方法返回的是EventTimeTrigger;getWindowSerializer方法返回的是TimeWindow.Serializer();isEventTime返回的为true SlidingEventTimeWindows提供了of静态工厂方法,可以指定size、slide及offset参数,它对于传入的offset参数转为毫秒然后与slide.toMilliseconds()取余作为最后的offset值 SlidingProcessingTimeWindows

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java

public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> { private static final long serialVersionUID = 1L; private final long size; private final long offset; private final long slide; private SlidingProcessingTimeWindows(long size, long slide, long offset) { if (offset < 0 || offset >= slide || size <= 0) { throw new IllegalArgumentException("SlidingProcessingTimeWindows parameters must satisfy 0 <= offset < slide and size > 0"); } this.size = size; this.slide = slide; this.offset = offset; } @Override public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) { timestamp = context.getCurrentProcessingTime(); List<TimeWindow> windows = new ArrayList<>((int) (size / slide)); long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide); for (long start = lastStart; start > timestamp - size; start -= slide) { windows.add(new TimeWindow(start, start + size)); } return windows; } public long getSize() { return size; } public long getSlide() { return slide; } @Override public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return ProcessingTimeTrigger.create(); } @Override public String toString() { return "SlidingProcessingTimeWindows(" + size + ", " + slide + ")"; } public static SlidingProcessingTimeWindows of(Time size, Time slide) { return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0); } public static SlidingProcessingTimeWindows of(Time size, Time slide, Time offset) { return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), offset.toMilliseconds() % slide.toMilliseconds()); } @Override public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) { return new TimeWindow.Serializer(); } @Override public boolean isEventTime() { return false; } } 复制代码

与SlidingEventTimeWindows不同的是SlidingProcessingTimeWindows的这个方法里头使用context.getCurrentProcessingTime()值重置了timestamp

本文系统(windows)相关术语:三级网络技术 计算机三级网络技术 网络技术基础 计算机网络技术

分页:12
转载请注明
本文标题:聊聊flink的Sliding Window
本站链接:https://www.codesec.net/view/628334.html


1.凡CodeSecTeam转载的文章,均出自其它媒体或其他官网介绍,目的在于传递更多的信息,并不代表本站赞同其观点和其真实性负责;
2.转载的文章仅代表原创作者观点,与本站无关。其原创性以及文中陈述文字和内容未经本站证实,本站对该文以及其中全部或者部分内容、文字的真实性、完整性、及时性,不作出任何保证或承若;
3.如本站转载稿涉及版权等问题,请作者及时联系本站,我们会及时处理。
登录后可拥有收藏文章、关注作者等权限...
技术大类 技术大类 | 系统(windows) | 评论(0) | 阅读(14)