阻塞隊(duì)列本質(zhì)上還是一個(gè)隊(duì)列,但是在隊(duì)列的基礎(chǔ)上加入了阻塞功能,并且線程安全。
那么它的阻塞功能體現(xiàn)在兩方面
1.當(dāng)隊(duì)列為空時(shí),進(jìn)行出隊(duì)列操作,就進(jìn)入阻塞狀態(tài)
2.當(dāng)隊(duì)列滿了時(shí),進(jìn)行入隊(duì)列操作,也進(jìn)入阻塞狀態(tài)。
1.2消息隊(duì)列消息隊(duì)列是基于阻塞隊(duì)列的基礎(chǔ)上增加了“消息的類型”,并按照指定類型進(jìn)行先進(jìn)先出(類似于優(yōu)先級(jí)隊(duì)列)
1.3生產(chǎn)者消費(fèi)者模型生產(chǎn)者消費(fèi)者模型也是基于阻塞隊(duì)列去完成的。
生產(chǎn)者消費(fèi)者模型的第一個(gè)好處:可以實(shí)現(xiàn)發(fā)送方和接收方的解耦,也就是降低了發(fā)送方和接收方的耦合程度
舉一個(gè)例子:
在開發(fā)中,經(jīng)常要進(jìn)行服務(wù)器之間的交流,如下
上圖中,客戶端調(diào)用了服務(wù)器A,服務(wù)器A需要去調(diào)用服務(wù)器B去完成一些任務(wù),此時(shí)服務(wù)器A就務(wù)必要清楚服務(wù)器B的存在,那么服務(wù)器A中的代碼就要和服務(wù)器B有關(guān)聯(lián)。
此時(shí)如果再插入一個(gè)服務(wù)器C,也需要A去傳給它任務(wù),那么此時(shí)A的代碼就要去修改、添加服務(wù)器C的元素,這樣的操作會(huì)增加很多不必要的開發(fā)負(fù)擔(dān)。
而生產(chǎn)者消費(fèi)者模型就是將服務(wù)器與B、C之間的溝通變成了阻塞隊(duì)列,A將任務(wù)放入相對(duì)應(yīng)的阻塞隊(duì)列中去,B、C各自去獲取各自阻塞隊(duì)列中的任務(wù),這樣一來(lái),A中的代碼就不會(huì)因?yàn)槠渌?wù)器的原因而去修改自己本身
生產(chǎn)者消費(fèi)者模型的第二個(gè)好處:可以避免“削峰填谷”,保證系統(tǒng)的穩(wěn)定性
這個(gè)好處也是和阻塞隊(duì)列有關(guān)的。
前面提到了:阻塞隊(duì)列的特點(diǎn)是 空了阻塞,滿了阻塞
加入某個(gè)時(shí)刻用戶發(fā)來(lái)了大量的請(qǐng)求,如果不加以控制會(huì)容易讓服務(wù)器崩潰
而在生產(chǎn)者消費(fèi)者模型里面?zhèn)鬟f請(qǐng)求要先進(jìn)入阻塞隊(duì)列,如果請(qǐng)求太多導(dǎo)致隊(duì)列滿了,那么其余的請(qǐng)求就會(huì)進(jìn)入阻塞狀態(tài),當(dāng)服務(wù)器在隊(duì)列中去獲取了新任務(wù)隊(duì)列中有空位后,新的請(qǐng)求才能進(jìn)入隊(duì)列。
1.4使用阻塞隊(duì)列阻塞隊(duì)列主要的功能有兩個(gè),一個(gè)是入隊(duì)(put),一個(gè)是出隊(duì)(take)
使用方法很簡(jiǎn)單,如下
public static void main(String[] args) throws InterruptedException {
BlockingQueueblockingQueue = new LinkedBlockingQueue<>();
blockingQueue.put("hello");
String res = blockingQueue.take();
System.out.println(res);
}
1.5實(shí)現(xiàn)阻塞隊(duì)列知道了阻塞隊(duì)列的特性,那么實(shí)現(xiàn)起來(lái)也很方便,首先要構(gòu)造出一個(gè)隊(duì)列,然后再對(duì)立面加入構(gòu)成阻塞的元素
構(gòu)造普通隊(duì)列:
class MyBlockingQueue {
private int[] items = new int[1000];
private int head = 0;
private int tail = 0;
private int size = 0;
public Integer take() throws InterruptedException {
int result = 0;
if(size == 0) {
? ? ? ? ? ? return null;
? ? ? ? }
result = items[head];
head++;
if (head >= items.length) {
head = 0;
}
size--;
return result;
}
public void put(int value) throws InterruptedException {
? ? ? ? if(size >= items.length) {
? ? ? ? ? ? return;
? ? ? ? }
items[tail] = value;
tail++;
if (tail >= items.length) {
tail = 0;
}
size++;
}
}
大致思路是創(chuàng)建兩個(gè)指針,分別指向隊(duì)頭和隊(duì)尾,入隊(duì)讓隊(duì)尾++,出隊(duì)讓隊(duì)頭++,當(dāng)隊(duì)頭指針或者隊(duì)尾指針達(dá)到數(shù)組的長(zhǎng)度時(shí),讓其賦值到0,形成一個(gè)循環(huán)數(shù)組。
然后再隊(duì)列中加入阻塞元素
當(dāng)take時(shí)發(fā)現(xiàn)隊(duì)列為空,則讓其進(jìn)入等待狀態(tài),解除的條件是:執(zhí)行完一次put(因?yàn)閜ut會(huì)入隊(duì)一個(gè)新元素)
當(dāng)put時(shí)發(fā)現(xiàn)隊(duì)列滿了,則讓其進(jìn)入等待狀態(tài),解除的條件是:執(zhí)行完一次take(因?yàn)閠ake會(huì)出隊(duì)一個(gè)元素)
修改后如下(也是完整代碼):
class MyBlockingQueue {
private int[] items = new int[1000];
private int head = 0;
private int tail = 0;
private int size = 0;
public Integer take() throws InterruptedException {
int result = 0;
synchronized(this) {
while (size == 0) {
//隊(duì)列為空 進(jìn)入阻塞
this.wait();
}
result = items[head];
head++;
if (head >= items.length) {
head = 0;
}
size--;
//喚醒put中的阻塞
this.notify();
}
return result;
}
public void put(int value) throws InterruptedException {
synchronized(this) {
while (size == items.length) {
//隊(duì)列滿了,進(jìn)入阻塞
this.wait();
}
items[tail] = value;
tail++;
if (tail >= items.length) {
tail = 0;
}
size++;
//喚醒take中的阻塞
this.notify();
}
}
}
2.定時(shí)器
2.1定時(shí)器的工作原理定時(shí)器會(huì)將傳入的任務(wù)和與其相對(duì)應(yīng)的等待時(shí)間存放到帶有阻塞的優(yōu)先級(jí)隊(duì)列中,當(dāng)時(shí)間間隔最短的任務(wù)到了執(zhí)行時(shí)間,就對(duì)其進(jìn)行出隊(duì)操作,并執(zhí)行任務(wù)里面的內(nèi)容。
2.2 定時(shí)器的使用調(diào)用定時(shí)器的schedule方法,在里面?zhèn)魅胍粋€(gè)任務(wù)和時(shí)間,如下:
public static void main(String[] args) {
System.out.println("程序啟動(dòng)");
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("執(zhí)行定時(shí)器任務(wù)1");
}
},3000);
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("執(zhí)行定時(shí)器任務(wù)2");
}
},2000);
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("執(zhí)行定時(shí)器任務(wù)3");
}
},1000);
}
上面的打印順序是3 ,2 ,1
2.3 實(shí)現(xiàn)定時(shí)器定時(shí)器中有一個(gè)優(yōu)先級(jí)隊(duì)列,里面要存放任務(wù)和與其對(duì)應(yīng)的時(shí)間,所以我們先創(chuàng)建一個(gè)任務(wù)類將任務(wù)和時(shí)間包裝在一起,并且這個(gè)類要帶有比較功能(可以實(shí)現(xiàn)Comparable 接口)
class MyTask implements Comparable{
private Runnable runnable;
private long time;
public MyTask(Runnable runnable, long time) {
this.runnable = runnable;
this.time = time;
}
//獲取任務(wù)的時(shí)間
public long getTime() {
return time;
}
//執(zhí)行任務(wù)
public void run() {
runnable.run();
}
@Override
public int compareTo(MyTask o) {
return (int)(this.time - o.time);
}
}
在定時(shí)器中要有一個(gè)掃描線程去時(shí)刻監(jiān)視隊(duì)列中任務(wù)的情況(查看是否到了執(zhí)行時(shí)間),我們選擇在構(gòu)造方法中去設(shè)計(jì)和運(yùn)行掃描線程。
這個(gè)線程的任務(wù)很簡(jiǎn)單,就是不斷將隊(duì)首元素(時(shí)間最短的元素)取出,判斷是否到達(dá)時(shí)間,然候根據(jù)情況選擇是將任務(wù)放回還是執(zhí)行,這個(gè)需要一個(gè)循環(huán)去不聽的執(zhí)行。
但是如果任務(wù)的執(zhí)行時(shí)間和當(dāng)前時(shí)間相差很多,不斷的取出、判斷、放回 會(huì)額外占用很多cpu資源,所以我們對(duì)其考慮進(jìn)行一個(gè)睡眠操作,睡眠的時(shí)間就是 -- 任務(wù)要執(zhí)行的時(shí)間減去當(dāng)前的時(shí)間,但是,如果有新的任務(wù)傳了進(jìn)來(lái),并且時(shí)間比當(dāng)前的最短時(shí)間要短就可能出現(xiàn)新任務(wù)沒(méi)有被執(zhí)行的情況,所以,上面的睡眠操作是不可取的,而是應(yīng)該使用wait,在wait中設(shè)置大的等待時(shí)間,當(dāng)有新的任務(wù)傳進(jìn)來(lái)時(shí)將wait喚醒,然后重新判斷,具體實(shí)現(xiàn)如下:
class MyTimer {
//掃描線程
private Thread t = null;
private PriorityBlockingQueuequeue = new PriorityBlockingQueue<>();
public MyTimer() {
t = new Thread(() ->{
while(true) {
try {
synchronized(this) {
MyTask myTask = queue.take();
long curTime = System.currentTimeMillis();
if(curTime< myTask.getTime()) {
queue.put(myTask);
this.wait(myTask.getTime() - curTime);
} else {
myTask.run();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t.start();
}
public void schedule(Runnable runnable,long after) {
//時(shí)間戳需要進(jìn)行換算
MyTask myTask = new MyTask(runnable,System.currentTimeMillis() + after);
queue.put(myTask);
synchronized(this) {
this.notify();
}
}
}
這里要注意,鎖一定要包括掃描線程while里面的全部?jī)?nèi)容,因?yàn)槿绻麙呙杈€程計(jì)算完需要等待的時(shí)間之后wait之前,掃描線程被切走,此時(shí)有一個(gè)新任務(wù)傳了進(jìn)來(lái),執(zhí)行了notify之后掃描線程才開始進(jìn)行工作,那么掃描線程就沒(méi)有掃描到新的任務(wù),如果新的任務(wù)的時(shí)間更短,那么新的任務(wù)就沒(méi)有被執(zhí)行。
完整代碼如下:
class MyTask implements Comparable{
private Runnable runnable;
private long time;
public MyTask(Runnable runnable, long time) {
this.runnable = runnable;
this.time = time;
}
//獲取任務(wù)的時(shí)間
public long getTime() {
return time;
}
//執(zhí)行任務(wù)
public void run() {
runnable.run();
}
@Override
public int compareTo(MyTask o) {
return (int)(this.time - o.time);
}
}
class MyTimer {
//掃描線程
private Thread t = null;
private PriorityBlockingQueuequeue = new PriorityBlockingQueue<>();
public MyTimer() {
t = new Thread(() ->{
while(true) {
try {
synchronized(this) {
MyTask myTask = queue.take();
long curTime = System.currentTimeMillis();
if(curTime< myTask.getTime()) {
queue.put(myTask);
this.wait(myTask.getTime() - curTime);
} else {
myTask.run();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t.start();
}
public void schedule(Runnable runnable,long after) {
//時(shí)間戳需要進(jìn)行換算
MyTask myTask = new MyTask(runnable,System.currentTimeMillis() + after);
queue.put(myTask);
synchronized(this) {
this.notify();
}
}
}
3.線程池
3.1 線程池存在的意義(優(yōu)點(diǎn))我們知道線程是系統(tǒng)調(diào)度的最小單位,線程的存在是因?yàn)檫M(jìn)程太重了,線程的創(chuàng)建和銷毀都比進(jìn)程更高效,因此,很多時(shí)候可以使用多線程去代替多進(jìn)程來(lái)完成并發(fā)編程。
但是隨著并發(fā)程度的提高和性能要求的提高,我們發(fā)現(xiàn),線程的創(chuàng)建和銷毀好像也沒(méi)有那么輕量,當(dāng)進(jìn)行大量的創(chuàng)建、銷毀線程時(shí)開銷也很大,此時(shí)就引入了線程池。
在池子中有一些創(chuàng)建好的線程,當(dāng)調(diào)用線程時(shí)就從池中去取,用完了在還給線程池,相比于創(chuàng)建和銷毀,調(diào)用和歸還就更加輕量了。
并且,調(diào)用和歸還的操作可以由程序員去自己設(shè)計(jì),而不用全部聽從系統(tǒng)內(nèi)核的調(diào)度,使得程序更加的可控。
3.2 線程池的使用常見的線程池創(chuàng)建有四種:
//可以設(shè)置線程數(shù)量
ExecutorService pool = Executors.newFixedThreadPool(10);
//根據(jù)任務(wù)的數(shù)量 去動(dòng)態(tài)變化線程的數(shù)量
ExecutorService pool = Executors.newCachedThreadPool();
//只有一個(gè)線程
ExecutorService pool = Executors.newSingleThreadExecutor();
//類似定時(shí)器,讓任務(wù)延時(shí)進(jìn)行
ExecutorService pool = Executors.newScheduledThreadPool(10);
使用的方法很簡(jiǎn)單,就是調(diào)用里面的submit方法,在里面?zhèn)饕粋€(gè)任務(wù)就可以了,如下:
創(chuàng)建1000個(gè)任務(wù)讓線程池執(zhí)行
public static void main(String[] args) {
//可以設(shè)置線程數(shù)量
ExecutorService pool = Executors.newFixedThreadPool(10);
for(int i = 0;i< 1000;i++) {
int n = i;
pool.submit(new Runnable() {
@Override
public void run() {
System.out.println("hello" + n);
}
});
}
}
3.3 線程池的原理
3.3.1 工廠模式上面的使用我們可以看到,線程池的創(chuàng)建時(shí)沒(méi)有使用new的,而是調(diào)用了一個(gè)方法,而真正的new操作在方法里面進(jìn)行,這樣的設(shè)計(jì)模式叫做工廠模式。
工廠模式作用是什么呢?
在Java中,重載的規(guī)則是在 方法名(可以省略,因?yàn)橹剌d主要就是需要方法名相同)、參數(shù)個(gè)數(shù)、參數(shù)類型 其中至少有一項(xiàng)不同,否則就無(wú)法達(dá)成重載。
那么如果有兩種構(gòu)造方法,他們想要構(gòu)成重載,但是又達(dá)不到重載的條件,此時(shí),就可以使用工廠模式,去根據(jù)不同的需求去構(gòu)造。
舉個(gè)例子,假如有一個(gè)類,它的用途是構(gòu)造出一個(gè)坐標(biāo)系上的點(diǎn),構(gòu)造這樣的點(diǎn)可以傳入x、y坐標(biāo),也可以傳入距原點(diǎn)的半徑長(zhǎng)和角度大小r、a(極坐標(biāo)),這四個(gè)參數(shù)都需要double類型,數(shù)量相同,方法名也相同,無(wú)法達(dá)成重載,此時(shí)就可以使用工廠模式去創(chuàng)建。
如下所示:
Point point1 = newXYPoint(1,1);
Point point2 = newRAPoint(1,1);
使用工廠模式就很好的解決了上面的問(wèn)題。
3.3.2 ThreadPoolExecutor類在使用中我們提到了四種創(chuàng)建方法,這四種創(chuàng)建方法本質(zhì)上都是通過(guò)包裝ThreadPoolExecutor類來(lái)實(shí)現(xiàn)的。
ThreadPoolExecutor的構(gòu)造方法有7個(gè)參數(shù)分別是
(int corePoolSize , int maximumPoolSize , long keepAliveTime , TimeUnit unit , BlockingQueue
int corePoolSize
是核心線程數(shù),也就是線程池中固定的線程數(shù)量
int maximumPoolSize
是大線程數(shù),是線程池中可以包含的大線程數(shù)量
大線程數(shù)和核心線程數(shù)的差值屬于臨時(shí)線程,也就是可以允許被回收掉的線程,比如當(dāng)前任務(wù)量很多,那么就可以多創(chuàng)建幾個(gè)臨時(shí)線程去執(zhí)行任務(wù),當(dāng)任務(wù)量比較少的時(shí)候,這些臨時(shí)線程沒(méi)有事情干就可以被回收掉。
long keepAliveTime 和 TimeUnit unit(時(shí)間單位:s、ms、分鐘...)
這兩個(gè)參數(shù)描述了臨時(shí)線程可以最長(zhǎng)的“摸魚”時(shí)間,如果臨時(shí)線程沒(méi)有工作并且時(shí)間達(dá)到了最長(zhǎng)時(shí)間,那么此時(shí)就會(huì)被回收掉。
BlockingQueue
線程池的任務(wù)隊(duì)列,可以看得到是用一個(gè)阻塞隊(duì)列來(lái) 接收、取出 任務(wù)。
ThreadFactory threadFactory
用于創(chuàng)建線程。
RejectedExecutionHandler handler
它描述了線程池的拒絕策略。
標(biāo)準(zhǔn)庫(kù)中提供了四種線程池的拒絕策略,如下:
第一種:如果隊(duì)列滿了,那么直接報(bào)異常
第二種:如果隊(duì)列滿了,那么多出來(lái)的任務(wù),是誰(shuí)加進(jìn)來(lái)的就由誰(shuí)處理
第三種:如果隊(duì)列滿了,就丟棄最早的任務(wù)
第四種:如果隊(duì)列滿了,就丟棄最新的任務(wù)
3.4 實(shí)現(xiàn)線程池我們實(shí)現(xiàn)一個(gè)線程池的簡(jiǎn)單版本,也就是前面第一種創(chuàng)建
我們知道線程池中有一個(gè)阻塞隊(duì)列去存取任務(wù),有一個(gè)構(gòu)造方法去創(chuàng)建線程,
你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機(jī)房具備T級(jí)流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級(jí)服務(wù)器適合批量采購(gòu),新人活動(dòng)首月15元起,快前往官網(wǎng)查看詳情吧
本文題目:多線程案例-創(chuàng)新互聯(lián)
鏈接URL:http://www.chinadenli.net/article20/dcigjo.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供建站公司、網(wǎng)站排名、響應(yīng)式網(wǎng)站、品牌網(wǎng)站設(shè)計(jì)、網(wǎng)頁(yè)設(shè)計(jì)公司、全網(wǎng)營(yíng)銷推廣
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)