如何使用Apache Flink實現(xiàn)自定義Sink,相信很多沒有經(jīng)驗的人對此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個問題。

創(chuàng)新互聯(lián)長期為上千多家客戶提供的網(wǎng)站建設(shè)服務(wù),團隊從業(yè)經(jīng)驗10年,關(guān)注不同地域、不同群體,并針對不同對象提供差異化的產(chǎn)品和服務(wù);打造開放共贏平臺,與合作伙伴共同營造健康的互聯(lián)網(wǎng)生態(tài)環(huán)境。為源城企業(yè)提供專業(yè)的成都做網(wǎng)站、成都網(wǎng)站制作、成都外貿(mào)網(wǎng)站建設(shè),源城網(wǎng)站改版等技術(shù)服務(wù)。擁有十余年豐富建站經(jīng)驗和眾多成功案例,為您定制開發(fā)。
socket發(fā)送過來的數(shù)據(jù),把String類型轉(zhuǎn)成對象,然后把Java對象保存到MySQL數(shù)據(jù)庫中。
創(chuàng)建數(shù)據(jù)庫和表
create database imooc_flink; create table student( id int(11) NOT NULL AUTO_INCREMENT, name varchar(25), age int(10), primary key(id) )
導(dǎo)入mysql依賴:
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.15</version> </dependency>
創(chuàng)建POJO Student
package com.vincent.course05;
public class Student {
private int id;
private String name;
private int age;
@Override
public String toString() {
return "Student{" +
"id=" + id +
", name='" + name + '\'' +
", age=" + age +
'}';
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}然后創(chuàng)建連接,SinkToMySQL.java
package com.vincent.course05;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
public class SinkToMySQL extends RichSinkFunction<Student> {
PreparedStatement ps;
private Connection connection;
/**
* open() 方法中建立連接,這樣不用每次 invoke 的時候都要建立連接和釋放連接
*
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
connection = getConnection();
String sql = "insert into student(id, name, age) values(?, ?, ?);";
ps = this.connection.prepareStatement(sql);
}
@Override
public void close() throws Exception {
super.close();
//關(guān)閉連接和釋放資源
if (connection != null) {
connection.close();
}
if (ps != null) {
ps.close();
}
}
/**
* 每條數(shù)據(jù)的插入都要調(diào)用一次 invoke() 方法
*
* @param value
* @param context
* @throws Exception
*/
@Override
public void invoke(Student value, Context context) throws Exception {
//組裝數(shù)據(jù),執(zhí)行插入操作
ps.setInt(1, value.getId());
ps.setString(2, value.getName());
ps.setInt(3, value.getAge());
ps.executeUpdate();
}
private static Connection getConnection() {
Connection con = null;
try {
Class.forName("com.mysql.cj.jdbc.Driver");
con = DriverManager.getConnection("jdbc:mysql://192.168.152.45:3306/imooc_flink?useUnicode=true&characterEncoding=UTF-8", "root", "123456");
} catch (Exception e) {
e.printStackTrace();
System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());
}
return con;
}
}main方法:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = environment.socketTextStream("192.168.152.45", 9999);
SingleOutputStreamOperator<Student> studentStream = source.map(new MapFunction<String, Student>() {
@Override
public Student map(String value) throws Exception {
String[] splits = value.split(",");
Student student = new Student();
student.setId(Integer.parseInt(splits[0]));
student.setName(splits[1]);
student.setAge(Integer.parseInt(splits[2]));
return student;
}
});
studentStream.addSink(new SinkToMySQL());
environment.execute("JavaCustomSinkToMysql");
}從socket中獲取數(shù)據(jù),數(shù)據(jù)格式使用逗號分割,在控制臺中輸入:
nc -lk 9999 1,tom,23
檢查數(shù)據(jù)庫,數(shù)據(jù)庫中多了一條數(shù)據(jù)
mysql> select * from student; +----+------+------+ | id | name | age | +----+------+------+ | 1 | tom | 23 | +----+------+------+ 1 row in set (0.00 sec)
這樣就很方便的使用自定義的sink,寫入到MySQL中去。
總結(jié):
第一步:繼承RichSinkFunction<T> T就是想要寫入的對象類型
第二步:重寫方法 open/close生命周期方法,invoke每條記錄執(zhí)行一次
默認情況下open方法的并行度不是1,跟具體的電腦有關(guān)系。
看完上述內(nèi)容,你們掌握如何使用Apache Flink實現(xiàn)自定義Sink的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!
當(dāng)前名稱:如何使用ApacheFlink實現(xiàn)自定義Sink
轉(zhuǎn)載源于:http://www.chinadenli.net/article36/ishdsg.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供ChatGPT、用戶體驗、移動網(wǎng)站建設(shè)、品牌網(wǎng)站設(shè)計、網(wǎng)站排名、域名注冊
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)