這篇文章主要介紹“Hadoo是怎么將作業(yè)提交給集群的”,在日常操作中,相信很多人在Hadoo是怎么將作業(yè)提交給集群的問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Hadoo是怎么將作業(yè)提交給集群的”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!
專注于為中小企業(yè)提供做網(wǎng)站、網(wǎng)站設(shè)計服務(wù),電腦端+手機端+微信端的三站合一,更高效的管理,為中小企業(yè)壽縣免費做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動了近1000家企業(yè)的穩(wěn)健成長,幫助中小企業(yè)通過網(wǎng)站建設(shè)實現(xiàn)規(guī)模擴充和轉(zhuǎn)變。

通過圖可知主要有三個部分,即: 1) JobClient:作業(yè)客戶端。 2) JobTracker:作業(yè)的跟蹤器。 3) TaskTracker:任務(wù)的跟蹤器。
MapReduce將作業(yè)提交給JobClient,然后JobClient與JobTracker交互,JobTracker再去監(jiān)控與分配TaskTracker,完成具體作業(yè)的處理。
以下分析的是Hadoop2.6.4的源碼。請注意: 源碼與之前Hadoop版本的略有差別,所以有些概念還是與上圖有點差別。
**job.waitForCompletion(true)**
跟蹤waitForCompletion, 注意其中的submit(),如下:
/**
* Submit the job to the cluster and wait for it to finish.
*/
public boolean waitForCompletion(boolean verbose
) throws IOException, InterruptedException,
ClassNotFoundException {
if (state == JobState.DEFINE) {
submit();
}
if (verbose) {
monitorAndPrintJob();
} else {
// get the completion poll interval from the client.
int completionPollIntervalMillis =
Job.getCompletionPollInterval(cluster.getConf());
while (!isComplete()) {
try {
Thread.sleep(completionPollIntervalMillis);
} catch (InterruptedException ie) {
}
}
}
return isSuccessful();
}參數(shù) verbose ,如果想在控制臺打印當前的任務(wù)執(zhí)行進度,則設(shè)為true
**
** 在submit 方法中會把Job提交給對應(yīng)的Cluster,然后不等待Job執(zhí)行結(jié)束就立刻返回
同時會把Job實例的狀態(tài)設(shè)置為JobState.RUNNING,從而來表示Job正在進行中
然后在Job運行過程中,可以調(diào)用getJobState()來獲取Job的運行狀態(tài)
/**
* Submit the job to the cluster and return immediately.
*/
public void submit()
throws IOException, InterruptedException, ClassNotFoundException {
ensureState(JobState.DEFINE);
setUseNewAPI();
connect();
final JobSubmitter submitter =
getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException,
ClassNotFoundException {
return submitter.submitJobInternal(Job.this, cluster);
}
});
state = JobState.RUNNING;
LOG.info("The url to track the job: " + getTrackingURL());
}而在任務(wù)提交前,會先通過connect()方法鏈接集群(Cluster):
private synchronized void connect()
throws IOException, InterruptedException, ClassNotFoundException {
if (cluster == null) {
cluster =
ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
public Cluster run()
throws IOException, InterruptedException,
ClassNotFoundException {
return new Cluster(getConfiguration());
}
});
}
}這是一個線程保護方法。這個方法中根據(jù)配置信息初始化了一個Cluster對象,即代表集群
public Cluster(Configuration conf) throws IOException {
this(null, conf);
}
public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
throws IOException {
this.conf = conf;
this.ugi = UserGroupInformation.getCurrentUser();
initialize(jobTrackAddr, conf);
}
private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
throws IOException {
synchronized (frameworkLoader) {
for (ClientProtocolProvider provider : frameworkLoader) {
LOG.debug("Trying ClientProtocolProvider : "
+ provider.getClass().getName());
ClientProtocol clientProtocol = null;
try {
if (jobTrackAddr == null) {
clientProtocol = provider.create(conf);
} else {
clientProtocol = provider.create(jobTrackAddr, conf);
}
if (clientProtocol != null) {
clientProtocolProvider = provider;
client = clientProtocol;
LOG.debug("Picked " + provider.getClass().getName()
+ " as the ClientProtocolProvider");
break;
}
else {
LOG.debug("Cannot pick " + provider.getClass().getName()
+ " as the ClientProtocolProvider - returned null protocol");
}
}
catch (Exception e) {
LOG.info("Failed to use " + provider.getClass().getName()
+ " due to error: " + e.getMessage());
}
}
}
if (null == clientProtocolProvider || null == client) {
throw new IOException(
"Cannot initialize Cluster. Please check your configuration for "
+ MRConfig.FRAMEWORK_NAME
+ " and the correspond server addresses.");
}
}而在上段代碼之前,
private static ServiceLoader<ClientProtocolProvider> frameworkLoader = ServiceLoader.load(ClientProtocolProvider.class);
可以看出創(chuàng)建客戶端代理階段使用了java.util.ServiceLoader,包含LocalClientProtocolProvider(本地作業(yè))和YarnClientProtocolProvider(yarn作業(yè))(hadoop有一個Yarn參數(shù)mapreduce.framework.name用來控制你選擇的應(yīng)用框架。在MRv2里,mapreduce.framework.name有兩個值:local和yarn),此處會根據(jù)mapreduce.framework.name的配置創(chuàng)建相應(yīng)的客戶端
mapred-site.xml:
<configuration> <property> <name>mapreduce.framework.name</name> <value>yarn</value> </property> </configuration>
submitter.submitJobInternal(Job.this, cluster);
JobStatus submitJobInternal(Job job, Cluster cluster)
throws ClassNotFoundException, InterruptedException, IOException {
//validate the jobs output specs
checkSpecs(job);
Configuration conf = job.getConfiguration();
addMRFrameworkToDistributedCache(conf);
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
//configure the command line options correctly on the submitting dfs
InetAddress ip = InetAddress.getLocalHost();
if (ip != null) {
submitHostAddress = ip.getHostAddress();
submitHostName = ip.getHostName();
conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
}
JobID jobId = submitClient.getNewJobID();
job.setJobID(jobId);
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
JobStatus status = null;
try {
conf.set(MRJobConfig.USER_NAME,
UserGroupInformation.getCurrentUser().getShortUserName());
conf.set("hadoop.http.filter.initializers",
"org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
LOG.debug("Configuring job " + jobId + " with " + submitJobDir
+ " as the submit dir");
// get delegation token for the dir
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
new Path[] { submitJobDir }, conf);
populateTokenCache(conf, job.getCredentials());
// generate a secret to authenticate shuffle transfers
if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
KeyGenerator keyGen;
try {
int keyLen = CryptoUtils.isShuffleEncrypted(conf)
? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
: SHUFFLE_KEY_LENGTH;
keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
keyGen.init(keyLen);
} catch (NoSuchAlgorithmException e) {
throw new IOException("Error generating shuffle secret key", e);
}
SecretKey shuffleKey = keyGen.generateKey();
TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
job.getCredentials());
}
copyAndConfigureFiles(job, submitJobDir);
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
// Create the splits for the job
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
int maps = writeSplits(job, submitJobDir);
conf.setInt(MRJobConfig.NUM_MAPS, maps);
LOG.info("number of splits:" + maps);
// write "queue admins of the queue to which job is being submitted"
// to job file.
String queue = conf.get(MRJobConfig.QUEUE_NAME,
JobConf.DEFAULT_QUEUE_NAME);
AccessControlList acl = submitClient.getQueueAdmins(queue);
conf.set(toFullPropertyName(queue,
QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());
// removing jobtoken referrals before copying the jobconf to HDFS
// as the tasks don't need this setting, actually they may break
// because of it if present as the referral will point to a
// different job.
TokenCache.cleanUpTokenReferral(conf);
if (conf.getBoolean(
MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
// Add HDFS tracking ids
ArrayList<String> trackingIds = new ArrayList<String>();
for (Token<? extends TokenIdentifier> t :
job.getCredentials().getAllTokens()) {
trackingIds.add(t.decodeIdentifier().getTrackingId());
}
conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
trackingIds.toArray(new String[trackingIds.size()]));
}
// Set reservation info if it exists
ReservationId reservationId = job.getReservationId();
if (reservationId != null) {
conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
}
// Write job file to submit dir
writeConf(conf, submitJobFile);
//
// Now, actually submit the job (using the submit name)
//
printTokens(jobId, job.getCredentials());
status = submitClient.submitJob(
jobId, submitJobDir.toString(), job.getCredentials());
if (status != null) {
return status;
} else {
throw new IOException("Could not launch job");
}
} finally {
if (status == null) {
LOG.info("Cleaning up the staging area " + submitJobDir);
if (jtFs != null && submitJobDir != null)
jtFs.delete(submitJobDir, true);
}
}
}通過如下代碼正式提交Job到Y(jié)arn:
status = submitClient.submitJob( jobId, submitJobDir.toString(), job.getCredentials());

到最后,通過RPC的調(diào)用,最終會返回一個JobStatus對象,它的toString方法可以在JobClient端打印運行的相關(guān)日志信息。
if (status != null) {
return status;
}public String toString() {
StringBuffer buffer = new StringBuffer();
buffer.append("job-id : " + jobid);
buffer.append("uber-mode : " + isUber);
buffer.append("map-progress : " + mapProgress);
buffer.append("reduce-progress : " + reduceProgress);
buffer.append("cleanup-progress : " + cleanupProgress);
buffer.append("setup-progress : " + setupProgress);
buffer.append("runstate : " + runState);
buffer.append("start-time : " + startTime);
buffer.append("user-name : " + user);
buffer.append("priority : " + priority);
buffer.append("scheduling-info : " + schedulingInfo);
buffer.append("num-used-slots" + numUsedSlots);
buffer.append("num-reserved-slots" + numReservedSlots);
buffer.append("used-mem" + usedMem);
buffer.append("reserved-mem" + reservedMem);
buffer.append("needed-mem" + neededMem);
return buffer.toString();
}(到這里任務(wù)都給yarn了,這里就只剩下監(jiān)控(如果設(shè)置為true的話)),即:
if (verbose) {
monitorAndPrintJob();
}這只是完成了作業(yè)Job的提交。
到此,關(guān)于“Hadoo是怎么將作業(yè)提交給集群的”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>
文章名稱:Hadoo是怎么將作業(yè)提交給集群的
轉(zhuǎn)載注明:http://www.chinadenli.net/article38/peshpp.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站維護、網(wǎng)站導(dǎo)航、響應(yīng)式網(wǎng)站、微信公眾號、App開發(fā)、網(wǎng)站內(nèi)鏈
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)