今天就跟大家聊聊有關(guān)什么是增量索引實現(xiàn)以及投送數(shù)據(jù)到MQ kafka,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

創(chuàng)新互聯(lián)是一家專業(yè)提供海拉爾企業(yè)網(wǎng)站建設(shè),專注與成都做網(wǎng)站、成都網(wǎng)站設(shè)計、HTML5建站、小程序制作等業(yè)務(wù)。10年已為海拉爾眾多企業(yè)、政府機構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專業(yè)網(wǎng)絡(luò)公司優(yōu)惠進行中。
我們將根據(jù)binlog 的數(shù)據(jù)對象,來實現(xiàn)增量數(shù)據(jù)的處理,我們構(gòu)建廣告的增量數(shù)據(jù),其實說白了就是為了在后期能把廣告投放到索引服務(wù),實現(xiàn)增量數(shù)據(jù)到增量索引的生成。
定義一個投遞增量數(shù)據(jù)的接口(接收參數(shù)為我們上一節(jié)定義的binlog日志的轉(zhuǎn)換對象)
/**
* ISender for 投遞增量數(shù)據(jù) 方法定義接口
*
* @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>
*/
public interface ISender {
void sender(MySQLRowData rowData);
}創(chuàng)建增量索引監(jiān)聽器
/**
* IncrementListener for 增量數(shù)據(jù)實現(xiàn)監(jiān)聽
*
* @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>
* @since 2019/6/27
*/
@Slf4j
@Component
public class IncrementListener implements Ilistener {
private final AggregationListener aggregationListener;
@Autowired
public IncrementListener(AggregationListener aggregationListener) {
this.aggregationListener = aggregationListener;
}
//根據(jù)名稱選擇要注入的投遞方式
@Resource(name = "indexSender")
private ISender sender;
/**
* 標(biāo)注為 {@link PostConstruct},
* 即表示在服務(wù)啟動,Bean完成初始化之后,立刻初始化
*/
@Override
@PostConstruct
public void register() {
log.info("IncrementListener register db and table info.");
Constant.table2db.forEach((tb, db) -> aggregationListener.register(db, tb, this));
}
@Override
public void onEvent(BinlogRowData eventData) {
TableTemplate table = eventData.getTableTemplate();
EventType eventType = eventData.getEventType();
//包裝成最后需要投遞的數(shù)據(jù)
MysqlRowData rowData = new MysqlRowData();
rowData.setTableName(table.getTableName());
rowData.setLevel(eventData.getTableTemplate().getLevel());
//將EventType轉(zhuǎn)為OperationTypeEnum
OperationTypeEnum operationType = OperationTypeEnum.convert(eventType);
rowData.setOperationTypeEnum(operationType);
//獲取模版中該操作對應(yīng)的字段列表
List<string> fieldList = table.getOpTypeFieldSetMap().get(operationType);
if (null == fieldList) {
log.warn("{} not support for {}.", operationType, table.getTableName());
return;
}
for (Map<string, string> afterMap : eventData.getAfter()) {
Map<string, string> _afterMap = new HashMap<>();
for (Map.Entry<string, string> entry : afterMap.entrySet()) {
String colName = entry.getKey();
String colValue = entry.getValue();
_afterMap.put(colName, colValue);
}
rowData.getFieldValueMap().add(_afterMap);
}
sender.sender(rowData);
}
}首先來配置監(jiān)聽binlog的數(shù)據(jù)庫連接信息
adconf: mysql: host: 127.0.0.1 port: 3306 username: root password: 12345678 binlogName: "" position: -1 # 從當(dāng)前位置開始監(jiān)聽
編寫配置類:
/**
* BinlogConfig for 定義監(jiān)聽Binlog的配置信息
*
* @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>
*/
@Component
@ConfigurationProperties(prefix = "adconf.mysql")
@Data
@AllArgsConstructor
@NoArgsConstructor
public class BinlogConfig {
private String host;
private Integer port;
private String username;
private String password;
private String binlogName;
private Long position;
}在我們實現(xiàn) 監(jiān)聽binlog那節(jié),我們實現(xiàn)了一個自定義client CustomBinlogClient,需要實現(xiàn)binlog的監(jiān)聽,這個監(jiān)聽的客戶端就必須是一個獨立運行的線程,并且要在程序啟動的時候進行監(jiān)聽,我們來實現(xiàn)運行當(dāng)前client的方式,這里我們會使用到一個新的Runnerorg.springframework.boot.CommandLineRunner,let's code.
@Slf4j
@Component
public class BinlogRunner implements CommandLineRunner {
@Autowired
private CustomBinlogClient binlogClient;
@Override
public void run(String... args) throws Exception {
log.info("BinlogRunner is running...");
binlogClient.connect();
}
}在binlog監(jiān)聽的過程中,我們看到針對于int, String 這類數(shù)據(jù)字段,mysql的記錄是沒有問題的,但是針對于時間類型,它被格式化成了字符串類型:Fri Jun 21 15:07:53 CST 2019。
--------Insert-----------
WriteRowsEventData{tableId=91, includedColumns={0, 1, 2, 3, 4, 5, 6, 7}, rows=[
[10, 11, ad unit test binlog, 1, 0, 1236.7655, Thu Jun 27 08:00:00 CST 2019, Thu Jun 27 08:00:00 CST 2019]
--------Update-----------
UpdateRowsEventData{tableId=81, includedColumnsBeforeUpdate={0, 1, 2, 3, 4, 5}, includedColumns={0, 1, 2, 3, 4, 5}, rows=[
{before=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 0, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019], after=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 1, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019]}對于這個時間格式,我們需要關(guān)注2點信息:
CST,這個時間格式會比我們的時間+ 8h(中國標(biāo)準(zhǔn)時間 China Standard Time UT+8:00)
需要對這個日期進行解釋處理
當(dāng)然,我們也可以通過設(shè)置mysql的日期格式來改變該行為,在此,我們通過編碼來解析該時間格式:
/**
* Thu Jun 27 08:00:00 CST 2019
*/
public static Date parseBinlogString2Date(String dateString) {
try {
DateFormat dateFormat = new SimpleDateFormat(
"EEE MMM dd HH:mm:ss zzz yyyy",
Locale.US
);
return DateUtils.addHours(dateFormat.parse(dateString), -8);
} catch (ParseException ex) {
log.error("parseString2Date error:{}", dateString);
return null;
}
}因為我們在定義索引的時候,是根據(jù)表之間的層級關(guān)系(Level)來設(shè)定的,根據(jù)代碼規(guī)范,不允許出現(xiàn)Magic Number, 因此我們定義一個數(shù)據(jù)層級枚舉,來表達數(shù)據(jù)層級。
/**
* AdDataLevel for 廣告數(shù)據(jù)層級
*
* @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>
*/
@Getter
public enum AdDataLevel {
LEVEL2("2", "level 2"),
LEVEL3("3", "level 3"),
LEVEL4("4", "level 4");
private String level;
private String desc;
AdDataLevel(String level, String desc) {
this.level = level;
this.desc = desc;
}
}因為增量數(shù)據(jù)可以投遞到不同的位置以及用途,我們之前實現(xiàn)了一個投遞接口com.sxzhongf.ad.sender.ISender,接下來我們實現(xiàn)一個投遞類:
@Slf4j
@Component("indexSender")
public class IndexSender implements ISender {
/**
* 根據(jù)廣告級別,投遞Binlog數(shù)據(jù)
*/
@Override
public void sender(MysqlRowData rowData) {
if (AdDataLevel.LEVEL2.getLevel().equals(rowData.getLevel())) {
Level2RowData(rowData);
} else if (AdDataLevel.LEVEL3.getLevel().equals(rowData.getLevel())) {
Level3RowData(rowData);
} else if (AdDataLevel.LEVEL4.getLevel().equals(rowData.getLevel())) {
Level4RowData(rowData);
} else {
log.error("Binlog MysqlRowData error: {}", JSON.toJSONString(rowData));
}
}
private void Level2RowData(MysqlRowData rowData) {
if (rowData.getTableName().equals(Constant.AD_PLAN_TABLE_INFO.TABLE_NAME)) {
List<adplantable> planTables = new ArrayList<>();
for (Map<string, string> fieldValueMap : rowData.getFieldValueMap()) {
AdPlanTable planTable = new AdPlanTable();
//Map的第二種循環(huán)方式
fieldValueMap.forEach((k, v) -> {
switch (k) {
case Constant.AD_PLAN_TABLE_INFO.COLUMN_PLAN_ID:
planTable.setPlanId(Long.valueOf(v));
break;
case Constant.AD_PLAN_TABLE_INFO.COLUMN_USER_ID:
planTable.setUserId(Long.valueOf(v));
break;
case Constant.AD_PLAN_TABLE_INFO.COLUMN_PLAN_STATUS:
planTable.setPlanStatus(Integer.valueOf(v));
break;
case Constant.AD_PLAN_TABLE_INFO.COLUMN_START_DATE:
planTable.setStartDate(CommonUtils.parseBinlogString2Date(v));
break;
case Constant.AD_PLAN_TABLE_INFO.COLUMN_END_DATE:
planTable.setEndDate(CommonUtils.parseBinlogString2Date(v));
break;
}
});
planTables.add(planTable);
}
//投遞推廣計劃
planTables.forEach(p -> AdLevelDataHandler.handleLevel2Index(p, rowData.getOperationTypeEnum()));
} else if (rowData.getTableName().equals(Constant.AD_CREATIVE_TABLE_INFO.TABLE_NAME)) {
List<adcreativetable> creativeTables = new LinkedList<>();
rowData.getFieldValueMap().forEach(afterMap -> {
AdCreativeTable creativeTable = new AdCreativeTable();
afterMap.forEach((k, v) -> {
switch (k) {
case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_CREATIVE_ID:
creativeTable.setAdId(Long.valueOf(v));
break;
case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_TYPE:
creativeTable.setType(Integer.valueOf(v));
break;
case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_MATERIAL_TYPE:
creativeTable.setMaterialType(Integer.valueOf(v));
break;
case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_HEIGHT:
creativeTable.setHeight(Integer.valueOf(v));
break;
case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_WIDTH:
creativeTable.setWidth(Integer.valueOf(v));
break;
case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_AUDIT_STATUS:
creativeTable.setAuditStatus(Integer.valueOf(v));
break;
case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_URL:
creativeTable.setAdUrl(v);
break;
}
});
creativeTables.add(creativeTable);
});
//投遞廣告創(chuàng)意
creativeTables.forEach(c -> AdLevelDataHandler.handleLevel2Index(c, rowData.getOperationTypeEnum()));
}
}
private void Level3RowData(MysqlRowData rowData) {
...
}
/**
* 處理4級廣告
*/
private void Level4RowData(MysqlRowData rowData) {
...
}
}為了我們的數(shù)據(jù)投放更加靈活,方便數(shù)據(jù)統(tǒng)計,分析等系統(tǒng)的需求,我們來實現(xiàn)一個投放到消息中的接口,其他服務(wù)可以訂閱當(dāng)前MQ 的TOPIC來實現(xiàn)數(shù)據(jù)訂閱。
配置文件中配置TOPIC
adconf:
kafka:
topic: ad-search-mysql-data
--------------------------------------
/**
* KafkaSender for 投遞Binlog增量數(shù)據(jù)到kafka消息隊列
*
* @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>
* @since 2019/7/1
*/
@Component(value = "kafkaSender")
public class KafkaSender implements ISender {
@Value("${adconf.kafka.topic}")
private String topic;
@Autowired
private KafkaTemplate kafkaTemplate;
/**
* 發(fā)送數(shù)據(jù)到kafka隊列
*/
@Override
public void sender(MysqlRowData rowData) {
kafkaTemplate.send(
topic, JSON.toJSONString(rowData)
);
}
/**
* 測試消費kafka消息
*/
@KafkaListener(topics = {"ad-search-mysql-data"}, groupId = "ad-search")
public void processMysqlRowData(ConsumerRecord<!--?, ?--> record) {
Optional<!--?--> kafkaMsg = Optional.ofNullable(record.value());
if (kafkaMsg.isPresent()) {
Object message = kafkaMsg.get();
MysqlRowData rowData = JSON.parseObject(
message.toString(),
MysqlRowData.class
);
System.out.println("kafka process MysqlRowData: " + JSON.toJSONString(rowData));
//sender.sender();
}
}
}
```</adcreativetable></string,></adplantable></string,></string,></string,></string>看完上述內(nèi)容,你們對什么是增量索引實現(xiàn)以及投送數(shù)據(jù)到MQ kafka有進一步的了解嗎?如果還想了解更多知識或者相關(guān)內(nèi)容,請關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝大家的支持。
新聞標(biāo)題:什么是增量索引實現(xiàn)以及投送數(shù)據(jù)到MQkafka
文章起源:http://www.chinadenli.net/article0/pesdio.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供用戶體驗、網(wǎng)站收錄、微信小程序、外貿(mào)建站、網(wǎng)頁設(shè)計公司、定制開發(fā)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)