欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

flink中的聚合算子是什么

這篇文章主要講解了“flink中的聚合算子是什么”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“flink中的聚合算子是什么”吧!

創(chuàng)新互聯(lián)建站從2013年創(chuàng)立,先為察隅等服務(wù)建站,察隅等地企業(yè),進(jìn)行企業(yè)商務(wù)咨詢服務(wù)。為察隅企業(yè)網(wǎng)站制作PC+手機(jī)+微官網(wǎng)三網(wǎng)同步一站式服務(wù)解決您的所有建站問(wèn)題。

前言

flink中的一個(gè)接口org.apache.flink.api.common.functions.AggregateFunction,這個(gè)類(lèi)可以接在window流之后,做窗口內(nèi)的統(tǒng)計(jì)計(jì)算。

注意:除了這個(gè)接口AggregateFunction,flink中還有一個(gè)抽象類(lèi)AggregateFunction:org.apache.flink.table.functions.AggregateFunction,大家不要把這個(gè)弄混淆了,接口AggregateFunction我們可以理解為flink中的一個(gè)算子,和MapFunction、FlatMapFunction等是同級(jí)別的,而抽象類(lèi)AggregateFunction是用于用戶自定義聚合函數(shù)的,和max、min之類(lèi)的函數(shù)是同級(jí)的。

 

原理解析

比如我們想實(shí)現(xiàn)一個(gè)類(lèi)似sql的功能:

select TUMBLE_START(proctime,INTERVAL '2' SECOND)  as starttime,user,count(*) from logs group by user,TUMBLE(proctime,INTERVAL '2' SECOND)
 

這個(gè)sql就是來(lái)統(tǒng)計(jì)一下每?jī)擅腌姷幕瑒?dòng)窗口內(nèi)每個(gè)人出現(xiàn)的次數(shù),今天我們就以這個(gè)簡(jiǎn)單的sql的功能為例講解一下flink的aggregate算子,其實(shí)就是我們用程序來(lái)實(shí)現(xiàn)這個(gè)sql的功能。

首先看一下聚合函數(shù)的接口:


@PublicEvolving
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
 ACC createAccumulator();
 ACC add(IN value, ACC accumulator);
 ACC merge(ACC a, ACC b);
 OUT getResult(ACC accumulator);
}

 

這個(gè)接口AggregateFunction里面有4個(gè)方法,我們分別來(lái)講解一下。

  1. AggregateFunction這個(gè)類(lèi)是一個(gè)泛型類(lèi),這里面有三個(gè)參數(shù),IN, ACC, OUT。IN就是聚合函數(shù)的輸入類(lèi)型,ACC是存儲(chǔ)中間結(jié)果的類(lèi)型,OUT是聚合函數(shù)的輸出類(lèi)型。
  2. createAccumulator    
    這個(gè)方法首先要?jiǎng)?chuàng)建一個(gè)累加器,要進(jìn)行一些初始化的工作,比如我們要進(jìn)行count計(jì)數(shù)操作,就要給累加器一個(gè)初始值。
  3. add    
    add方法就是我們要做聚合的時(shí)候的核心邏輯,比如我們做count累加,其實(shí)就是來(lái)一個(gè)數(shù),然后就加一。    
    類(lèi)似上面的sql的邏輯,我們?cè)趯?xiě)業(yè)務(wù)邏輯的時(shí)候,可以這么想,進(jìn)入這方法數(shù)的數(shù)據(jù)都是屬于某一個(gè)用戶的,系統(tǒng)在調(diào)用這個(gè)方法之前會(huì)先進(jìn)行hash分組,然后不同的用戶會(huì)重復(fù)調(diào)用這個(gè)方法。所以這個(gè)函數(shù)的入?yún)⑹荌N類(lèi)型,返回值是ACC類(lèi)型
  4. merge    
    因?yàn)閒link是一個(gè)分布式計(jì)算框架,可能計(jì)算是分布在很多節(jié)點(diǎn)上同時(shí)進(jìn)行的,比如上述的add操作,可能同一個(gè)用戶在不同的節(jié)點(diǎn)上分別調(diào)用了add方法在本地節(jié)點(diǎn)對(duì)本地的數(shù)據(jù)進(jìn)行了聚合操作,但是我們要的是整個(gè)結(jié)果,整個(gè)時(shí)候,我們就需要把每個(gè)用戶各個(gè)節(jié)點(diǎn)上的聚合結(jié)果merge一下,整個(gè)merge方法就是做這個(gè)工作的,所以它的入?yún)⒑统鰠⒌念?lèi)型都是中間結(jié)果類(lèi)型ACC。
  5. getResult    
    這個(gè)方法就是將每個(gè)用戶最后聚合的結(jié)果經(jīng)過(guò)處理之后,按照OUT的類(lèi)型返回,返回的結(jié)果也就是聚合函數(shù)的輸出結(jié)果了。
 

實(shí)例講解 

自定義source

首先我們自定義source生成用戶的信息

 public static class MySource implements SourceFunction<Tuple2<String,Long>>{

  private volatile boolean isRunning = true;

  String userids[] = {
    "4760858d-2bec-483c-a535-291de04b2247", "67088699-d4f4-43f2-913c-481bff8a2dc5",
    "72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb", "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
    "aabbaa50-72f4-495c-b3a1-70383ee9d6a4", "3218bbb9-5874-4d37-a82d-3e35e52d1702",
    "3ebfb9602ac07779||3ebfe9612a007979", "aec20d52-c2eb-4436-b121-c29ad4097f6c",
    "e7e896cd939685d7||e7e8e6c1930689d7", "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"
  };

  @Override
  public void run(SourceContext<Tuple2<String,Long>> ctx) throws Exception{
   while (isRunning){
    Thread.sleep(10);
    String userid = userids[(int) (Math.random() * (userids.length - 1))];
    ctx.collect(Tuple2.of(userid, System.currentTimeMillis()));
   }
  }

  @Override
  public void cancel(){
   isRunning = false;
  }
 }
   

自定義聚合函數(shù)


 public static class CountAggregate
   implements AggregateFunction<Tuple2<String,Long>,Integer,Integer>{

  @Override
  public Integer createAccumulator(){
   return 0;
  }

  @Override
  public Integer add(Tuple2<String,Long> value, Integer accumulator){
   return ++accumulator;
  }

  @Override
  public Integer getResult(Integer accumulator){
   return accumulator;
  }

  @Override
  public Integer merge(Integer a, Integer b){
   return a + b;
  }
 }
   

自定義結(jié)果輸出函數(shù)


 /**
  * 這個(gè)是為了將聚合結(jié)果輸出
  */
 public static class WindowResult
   implements WindowFunction<Integer,Tuple3<String,Date,Integer>,Tuple,TimeWindow>{

  @Override
  public void apply(
    Tuple key,
    TimeWindow window,
    Iterable<Integer> input,
    Collector<Tuple3<String,Date,Integer>> out) throws Exception{

   String k = ((Tuple1<String>) key).f0;
   long windowStart = window.getStart();
   int result = input.iterator().next();
   out.collect(Tuple3.of(k, new Date(windowStart), result));

  }
 }
   

主流程


 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  DataStream<Tuple2<String,Long>> dataStream = env.addSource(new MySource());

  dataStream.keyBy(0).window(TumblingProcessingTimeWindows.of(Time.seconds(2)))
            .aggregate(new CountAggregate(), new WindowResult()
            ).print();

  env.execute();

感謝各位的閱讀,以上就是“flink中的聚合算子是什么”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)flink中的聚合算子是什么這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!

名稱(chēng)欄目:flink中的聚合算子是什么
網(wǎng)站鏈接:http://www.chinadenli.net/article14/pieode.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供外貿(mào)建站網(wǎng)站維護(hù)微信公眾號(hào)App開(kāi)發(fā)服務(wù)器托管

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

營(yíng)銷(xiāo)型網(wǎng)站建設(shè)