欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

Flink中怎么自定義Redis的Sink函數(shù)

這期內(nèi)容當(dāng)中小編將會給大家?guī)碛嘘P(guān)Flink中怎么自定義redis的Sink函數(shù),文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

創(chuàng)新互聯(lián)公司專注于企業(yè)全網(wǎng)整合營銷推廣、網(wǎng)站重做改版、肥城網(wǎng)站定制設(shè)計、自適應(yīng)品牌網(wǎng)站建設(shè)、H5網(wǎng)站設(shè)計商城建設(shè)、集團公司官網(wǎng)建設(shè)、外貿(mào)營銷網(wǎng)站建設(shè)、高端網(wǎng)站制作、響應(yīng)式網(wǎng)頁設(shè)計等建站業(yè)務(wù),價格優(yōu)惠性價比高,為肥城等各大城市提供網(wǎng)站開發(fā)制作服務(wù)。

1.添加redis對應(yīng)pom依賴

 <dependency>    <groupId>org.apache.bahir</groupId>    <artifactId>flink-connector-redis_2.11</artifactId>    <version>1.0</version></dependency>

2.主函數(shù)代碼:

package com.hadoop.ljs.flink110.redis;import org.apache.flink.api.common.functions.FilterFunction;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.redis.RedisSink;import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;import scala.Tuple2;/** * @author: Created By lujisen * @company ChinaUnicom Software JiNan * @date: 2020-05-02 10:30 * @version: v1.0 * @description: com.hadoop.ljs.flink110.redis */public class RedisSinkMain {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment senv =StreamExecutionEnvironment.getExecutionEnvironment();
       DataStream<String> source = senv.socketTextStream("localhost", 9000);        DataStream<String> filter = source.filter(new FilterFunction<String>() {            @Override            public boolean filter(String value) throws Exception {                if (null == value || value.split(",").length != 2) {                    return false;                }                return true;            }        });        DataStream<Tuple2<String, String>> keyValue = filter.map(new MapFunction<String, Tuple2<String, String>>() {            @Override            public Tuple2<String, String> map(String value) throws Exception {
               String[] split = value.split(",");
               return new Tuple2<>(split[0], split[1]);            }        });        //創(chuàng)建redis的配置 單機redis用FlinkJedisPoolConfig,集群redis需要用FlinkJedisClusterConfig        FlinkJedisPoolConfig redisConf = new FlinkJedisPoolConfig.Builder().setHost("worker2.hadoop.ljs").setPort(6379).setPassword("123456a?").build();
       keyValue.addSink(new RedisSink<Tuple2<String, String>>(redisConf, new RedisMapper<Tuple2<String, String>>() {            @Override            public RedisCommandDescription getCommandDescription() {                return new RedisCommandDescription(RedisCommand.HSET,"table1");            }            @Override            public String getKeyFromData(Tuple2<String, String> data) {                return data._1;            }            @Override            public String getValueFromData(Tuple2<String, String> data) {                return data._2;            }        }));        /*啟動執(zhí)行*/        senv.execute();    }}

3.函數(shù)測試

1).window端scoket發(fā)送數(shù)據(jù)

Flink中怎么自定義Redis的Sink函數(shù)

2.redis結(jié)果驗證

Flink中怎么自定義Redis的Sink函數(shù)

上述就是小編為大家分享的Flink中怎么自定義Redis的Sink函數(shù)了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關(guān)知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。

本文名稱:Flink中怎么自定義Redis的Sink函數(shù)
標題鏈接:http://www.chinadenli.net/article20/pejojo.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供定制網(wǎng)站移動網(wǎng)站建設(shè)網(wǎng)站設(shè)計公司網(wǎng)站改版外貿(mào)網(wǎng)站建設(shè)建站公司

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

微信小程序開發(fā)