This article explains how to build topology code in Storm. The explanation is kept simple, and the sections below walk through each component of the example in turn.

1. Building the topology code
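The listing in this section wires the local aggregation bolt with fieldsGrouping on "area_id", which means tuples with the same area_id always reach the same task. The sketch below only illustrates that idea with a hash of the grouping values taken modulo the task count. The class FieldsGroupingSketch and the method chooseTask are hypothetical names, and Storm's actual implementation may differ in detail.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative only: mimics the idea behind fieldsGrouping, not Storm's actual code.
class FieldsGroupingSketch {

    // Map the grouping-field values to one of numTasks task indexes.
    static int chooseTask(List<Object> groupingValues, int numTasks) {
        // floorMod keeps the index non-negative even for negative hash codes.
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 2; // parallelism given to "areabolt" in the topology
        for (String areaId : Arrays.asList("1", "2", "3", "1", "2")) {
            int task = chooseTask(Arrays.<Object>asList(areaId), numTasks);
            System.out.println("area_id=" + areaId + " -> task " + task);
        }
    }
}
```

Because the task is a pure function of the grouping values, each task can keep its own in-memory totals for the areas it owns, which is what the aggregation bolt relies on.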
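package demo;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class AreaAmtTopo {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        // Spout reading orders from Kafka (OrderBaseSpout and KafkaProperties are not shown in this article)
        builder.setSpout("spout", new OrderBaseSpout(KafkaProperties.Order_topic), 5);
        // First-level filter: parses each raw order line
        builder.setBolt("filter", new AreaFilterBolt(), 5).shuffleGrouping("spout");
        // Group by area_id so that each area's totals accumulate in a single task
        builder.setBolt("areabolt", new AreaAmtBolt(), 2).fieldsGrouping("filter", new Fields("area_id"));
        // A single task collects the totals and writes them to HBase
        builder.setBolt("rsltbolt", new AreaRsltBolt(), 1).shuffleGrouping("areabolt");
        // The original listing never submits the topology; running it on a local
        // cluster under the name "area-amt-topo" is an assumption made here.
        Config conf = new Config();
        new LocalCluster().submitTopology("area-amt-topo", conf, builder.createTopology());
    }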
}

2. First-level filter bolt
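The filter bolt in this section splits a tab-separated order line and reorders its fields. The parsing step can be sketched without any Storm types, as below. OrderLineParser is a hypothetical helper, and taking the first ten characters of create_time is a simplification that stands in for the article's DateFmt.getCountDate call.

```java
// Illustrative only: the parsing step of the filter bolt without any Storm types.
class OrderLineParser {

    // Assumed input layout: order_id \t order_amt \t create_time \t area_id
    // Returns {area_id, order_amt, date}, or null when the line is unusable.
    static String[] parse(String line) {
        if (line == null) {
            return null;
        }
        String[] parts = line.split("\t");
        if (parts.length < 4) {
            return null; // malformed record: skip it instead of failing
        }
        String createTime = parts[2].trim();
        // Keep only the leading yyyy-MM-dd part when it is present.
        String date = createTime.length() >= 10 ? createTime.substring(0, 10) : createTime;
        return new String[] { parts[3].trim(), parts[1].trim(), date };
    }

    public static void main(String[] args) {
        String[] fields = parse("1001\t25.5\t2015-09-08 09:09:08\t3");
        System.out.println(String.join(",", fields));
    }
}
```

Returning null for short lines keeps one bad record from throwing ArrayIndexOutOfBoundsException inside execute.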
package demo;

import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// First-level filter bolt
public class AreaFilterBolt implements IBasicBolt {
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Names of the values carried by each emitted tuple
        declarer.declare(new Fields("area_id", "order_amt", "create_time"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // Input line layout: order_id, order_amt, create_time, area_id (tab-separated)
        String order = input.getString(0); // first value of the incoming tuple
        if (order != null) {
            String[] orderArr = order.split("\\t");
            if (orderArr.length >= 4) { // skip malformed lines rather than throw
                // Emit area_id, order_amt, create_time (normalized to yyyy-MM-dd)
                collector.emit(new Values(orderArr[3], orderArr[1],
                        DateFmt.getCountDate(orderArr[2], DateFmt.date_short)));
            }
        }
    }

    @Override
    public void prepare(Map arg0, TopologyContext arg1) {
    }
}

3. Local aggregation bolt (totals by date and area)
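The aggregation bolt in this section keeps a running total per date and area in a HashMap. A minimal sketch of that accumulation, outside Storm, might look like the following. AreaAmountAccumulator is a hypothetical class, and Map.merge is used here as a compact alternative to the get, null-check and put sequence in the listing.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: the running-total logic of the aggregation bolt.
class AreaAmountAccumulator {
    private final Map<String, Double> totals = new HashMap<>();

    // Adds one order amount to the total for (date, areaId) and returns the new total.
    double add(String date, String areaId, double amount) {
        String key = date + "_" + areaId;
        // merge stores amount if the key is new, otherwise stores old + amount.
        return totals.merge(key, amount, Double::sum);
    }

    public static void main(String[] args) {
        AreaAmountAccumulator acc = new AreaAmountAccumulator();
        acc.add("2015-09-08", "3", 10.0);
        System.out.println(acc.add("2015-09-08", "3", 2.5));
    }
}
```

The state lives only in the memory of one task, so it is lost if that task restarts; the sketch does not address recovery.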
package demo;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// Local aggregation
public class AreaAmtBolt implements IBasicBolt {
    Map<String, Double> countsMap = null;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("date_area", "amt"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public void prepare(Map paramMap, TopologyContext paramTopologyContext) {
        countsMap = new HashMap<String, Double>();
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // If the spout has no data it may send empty values, so check before forwarding
        if (input != null) {
            String area_id = input.getString(0);
            // AreaFilterBolt emits order_amt as a String, so parse it instead of calling getDouble
            double order_amt = Double.parseDouble(input.getString(1));
            // AreaFilterBolt declares the date field as "create_time", not "order_date"
            String order_date = input.getStringByField("create_time");
            // Key order follows the declared field name "date_area", e.g. 2014-05-05_1
            String key = order_date + "_" + area_id;
            Double count = countsMap.get(key);
            if (count == null) {
                count = 0.0;
            }
            count += order_amt;
            countsMap.put(key, count);
            System.err.println("areaAmtBolt " + key + "=" + count);
            collector.emit(new Values(key, count));
        }
    }

    @Override
    public void cleanup() {
        countsMap.clear();
    }
}

4. Writing the final results to HBase
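The result bolt in this section buffers the latest total per key and writes everything out once five seconds have passed. The sketch below isolates that pattern with the clock passed in as a parameter so it can be exercised deterministically. TimedFlushBuffer is a hypothetical class, and the BiConsumer sink is only a stand-in for the HBase write.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Illustrative only: buffer the latest value per key and flush on a time interval.
class TimedFlushBuffer {
    private final Map<String, Double> latest = new HashMap<>();
    private final long intervalMillis;
    private final BiConsumer<String, Double> sink; // stand-in for the HBase write
    private long lastFlush;

    TimedFlushBuffer(long intervalMillis, long startMillis, BiConsumer<String, Double> sink) {
        this.intervalMillis = intervalMillis;
        this.lastFlush = startMillis;
        this.sink = sink;
    }

    // Records the newest total for a key and flushes all keys once the
    // interval has elapsed. Returns true when a flush happened.
    boolean put(String key, double total, long nowMillis) {
        latest.put(key, total);
        if (nowMillis - lastFlush < intervalMillis) {
            return false;
        }
        latest.forEach(sink);
        lastFlush = nowMillis;
        return true;
    }

    public static void main(String[] args) {
        TimedFlushBuffer buffer = new TimedFlushBuffer(5000, 0,
                (key, amt) -> System.out.println("write " + key + "=" + amt));
        buffer.put("2015-09-08_3", 10.0, 1000); // buffered only
        buffer.put("2015-09-08_3", 12.5, 6000); // interval elapsed: latest value is written
    }
}
```

As in the listing, the interval is only checked when a new value arrives, so nothing is written while the stream is idle.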
package demo;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

// Bolt that periodically writes the results to HBase
public class AreaRsltBolt implements IBasicBolt {
    Map<String, Double> countsMap = null;
    long beginTime = System.currentTimeMillis();
    long endTime = 0L;
    HBaseDao dao = null; // HBaseDao and HBaseDAOImp are not shown in this article

    @Override
    public void declareOutputFields(OutputFieldsDeclarer paramOutputFieldsDeclarer) {
        // Last bolt in the topology: nothing is emitted
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public void prepare(Map paramMap, TopologyContext paramTopologyContext) {
        countsMap = new HashMap<String, Double>();
        dao = new HBaseDAOImp();
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector paramBasicOutputCollector) {
        String date_areaid = input.getString(0);
        double order_amt = input.getDouble(1);
        countsMap.put(date_areaid, order_amt); // keep only the latest running total per key
        endTime = System.currentTimeMillis();
        if (endTime - beginTime >= 5 * 1000) { // flush every 5 seconds
            for (String key : countsMap.keySet()) {
                // Write to HBase; key looks like 2014-05-05_1 and the value is the amount.
                // As written, key is not passed to insert(), so check the DAO's
                // signature to see how the row key is supplied.
                dao.insert("area_order", "cf", "order_amt", countsMap.get(key));
                System.err.println("rsltBolt put hbase: key=" + key + "; order_amt=" + countsMap.get(key));
            }
            beginTime = System.currentTimeMillis();
        }
    }

    @Override
    public void cleanup() {
    }
}

5. DateFmt code
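The DateFmt helper in this section is built on SimpleDateFormat, which is not thread-safe when an instance is shared. As an alternative sketch, assuming Java 8 or later is available, the same timestamp-to-day conversion can be written with the immutable java.time formatters. DayKey and toDay are hypothetical names and are not part of the article's code.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Illustrative only: thread-safe conversion of a timestamp to its day.
class DayKey {
    private static final DateTimeFormatter LONG_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final DateTimeFormatter SHORT_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd");

    // Converts "yyyy-MM-dd HH:mm:ss" (surrounding whitespace allowed) to "yyyy-MM-dd".
    // Throws DateTimeParseException on input that does not match the pattern.
    static String toDay(String timestamp) {
        return LocalDateTime.parse(timestamp.trim(), LONG_FORMAT).format(SHORT_FORMAT);
    }

    public static void main(String[] args) {
        System.out.println(toDay("2015-09-08 09:09:08 "));
    }
}
```

Unlike getCountDate, this sketch fails loudly on unparseable input instead of falling back to the current date; which behavior is preferable depends on how bad records should be handled.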
package demo;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

public class DateFmt {
    public static final String date_long = "yyyy-MM-dd HH:mm:ss";
    public static final String date_short = "yyyy-MM-dd";
    // SimpleDateFormat is not thread-safe; avoid sharing this instance across threads
    public static SimpleDateFormat sdf = new SimpleDateFormat(date_short);

    // Parses date with the given pattern and formats it back with the same pattern.
    // If date is null or cannot be parsed, the current time is used instead.
    public static String getCountDate(String date, String pattern) {
        SimpleDateFormat fmt = new SimpleDateFormat(pattern);
        Calendar cal = Calendar.getInstance();
        if (date != null) {
            try {
                cal.setTime(fmt.parse(date));
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
        return fmt.format(cal.getTime());
    }

    public static Date parseDate(String dateStr) throws Exception {
        return sdf.parse(dateStr);
    }

    public static void main(String[] args) {
        System.out.println(DateFmt.getCountDate("2015-09-08 09:09:08 ", DateFmt.date_long));
    }
}

That covers how to build topology code in Storm. How the example behaves in practice still needs to be verified by running it yourself.