028-86922220

建站动态

根据您的个性需求进行定制 先人一步 抢占小程序红利时代

flink1.2版本时间、水位线的介绍和用法

本篇内容主要讲解“flink1.2版本时间、水位线的介绍和用法”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“flink1.2版本时间、水位线的介绍和用法”吧!

创新互联建站是一家企业级云计算解决方案提供商,超15年IDC数据中心运营经验。主营GPU显卡服务器,站群服务器,服务器托管,海外高防服务器,机柜大带宽租用·托管,动态拨号VPS,海外云手机,海外云服务器,海外服务器租用托管等。

水位线

种类

顺序事件中的Watermarks

乱序事件中的Watermarks

并行数据流中的Watermarks

时间概念

Event Time

watermark
// 1、创建flink运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3); // 设置并行度
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);  //处理模式设定:流或批

// 生成 watermark 的时间间隔(每 n 毫秒),设置周期性的产生水位线的时间间隔。当数据流很大的时候,如果每个事件都产生水位线,会影响性能。
//env.getConfig().setAutoWatermarkInterval(1000); // 自动水印时间间隔 12版本不用设置,有默认

指定Timestamps

SingleOutputStreamOperator> formatData =text.map(new MapFunction>() { 
  // 数据格式转换
  private static final long serialVersionUID = 1L;
  @Override
  public Tuple3 map(String value) throws Exception {
    Tuple3 data = new Tuple3();
    String[] dataTmp = value.split("\\|");

    data.f0 = dataTmp[0];
    data.f1 = dataTmp[1];
    data.f2 = Integer.parseInt(dataTmp[2]);
    return data;
  }
});
SingleOutputStreamOperator> orderDSWithWatemark=formatData
    .assignTimestampsAndWatermarks( // 设置watermark  watemark = 最大事件时间 - 最大延迟或乱序时间
    WatermarkStrategy.>forBoundedOutOfOrderness(Duration.ofSeconds(3)) //指定maxOutOfOrderness最大无序度时间即最大延迟时间/乱序时间
    .withTimestampAssigner((data,timestamp) -> Long.parseLong(DateUtil.dateToUTC(data.f0))*1000)  //时间为毫秒级

);
SingleOutputStreamOperator> result=orderDSWithWatemark.keyBy(one -> one.f1)
      .window(TumblingEventTimeWindows.of(Time.seconds(10))) // 设定窗口大小
//		.allowedLateness(Time.seconds(1))  //延时处理时间
//		.sideOutputLateData(lateOutputTag)   //侧输出
.reduce(new ReduceFunction>() { // 处理逻辑
    private static final long serialVersionUID = -6695049408336015245L;

    @Override
    public Tuple3 reduce(Tuple3 value1,
        Tuple3 value2) throws Exception {
      Tuple3 data = new Tuple3();
      data.f0 = value2.f0;
      data.f1 = value1.f1;
      data.f2 = value1.f2 + value2.f2;
      System.out.println(data);
      return data;
    }
  });
result.print("滚动事件时间");
env.execute();

总结

到此,相信大家对“flink1.2版本时间、水位线的介绍和用法”有了更深的了解,不妨来实际操作一番吧!这里是创新互联网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!


分享标题:flink1.2版本时间、水位线的介绍和用法
文章链接:http://www.tsicrk.com/article/ggdoeh.html

其他资讯

让你的专属顾问为你服务

1.4784s