国产99久久精品_欧美日本韩国一区二区_激情小说综合网_欧美一级二级视频_午夜av电影_日本久久精品视频

最新文章專題視頻專題問答1問答10問答100問答1000問答2000關(guān)鍵字專題1關(guān)鍵字專題50關(guān)鍵字專題500關(guān)鍵字專題1500TAG最新視頻文章推薦1 推薦3 推薦5 推薦7 推薦9 推薦11 推薦13 推薦15 推薦17 推薦19 推薦21 推薦23 推薦25 推薦27 推薦29 推薦31 推薦33 推薦35 推薦37視頻文章20視頻文章30視頻文章40視頻文章50視頻文章60 視頻文章70視頻文章80視頻文章90視頻文章100視頻文章120視頻文章140 視頻2關(guān)鍵字專題關(guān)鍵字專題tag2tag3文章專題文章專題2文章索引1文章索引2文章索引3文章索引4文章索引5123456789101112131415文章專題3
問答文章1 問答文章501 問答文章1001 問答文章1501 問答文章2001 問答文章2501 問答文章3001 問答文章3501 問答文章4001 問答文章4501 問答文章5001 問答文章5501 問答文章6001 問答文章6501 問答文章7001 問答文章7501 問答文章8001 問答文章8501 問答文章9001 問答文章9501
當(dāng)前位置: 首頁 - 科技 - 知識百科 - 正文

Giraph源碼分析啟動ZooKeeper服務(wù)

來源:懂視網(wǎng) 責(zé)編:小采 時間:2020-11-09 15:12:56
文檔

Giraph源碼分析啟動ZooKeeper服務(wù)

Giraph源碼分析啟動ZooKeeper服務(wù):說明: (1) 實驗環(huán)境. 三臺服務(wù)器:test165、test62、test63。test165同時是JobTracker和TaskTracker. 測試?yán)樱汗倬W(wǎng)自帶的SSSP程序,數(shù)據(jù)是自己模擬生成。 運(yùn)行命令:hadoop jar giraph-examples-1.0.0-for-hadoop-0.2
推薦度:
導(dǎo)讀Giraph源碼分析啟動ZooKeeper服務(wù):說明: (1) 實驗環(huán)境. 三臺服務(wù)器:test165、test62、test63。test165同時是JobTracker和TaskTracker. 測試?yán)樱汗倬W(wǎng)自帶的SSSP程序,數(shù)據(jù)是自己模擬生成。 運(yùn)行命令:hadoop jar giraph-examples-1.0.0-for-hadoop-0.2

說明: (1) 實驗環(huán)境. 三臺服務(wù)器:test165、test62、test63。test165同時是JobTracker和TaskTracker. 測試?yán)樱汗倬W(wǎng)自帶的SSSP程序,數(shù)據(jù)是自己模擬生成。 運(yùn)行命令:hadoop jar giraph-examples-1.0.0-for-hadoop-0.20.203.0-jar-with-dependencies.jar o

說明:

(1) 實驗環(huán)境.

三臺服務(wù)器:test165、test62、test63。test165同時是JobTracker和TaskTracker.

測試?yán)樱汗倬W(wǎng)自帶的SSSP程序,數(shù)據(jù)是自己模擬生成。

運(yùn)行命令:hadoop jar giraph-examples-1.0.0-for-hadoop-0.20.203.0-jar-with-dependencies.jar org.apache.giraph.GiraphRunner org.apache.giraph.examples.SimpleShortestPathsVertex -vif org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat -vip /user/giraph/SSSP -of org.apache.giraph.io.formats.IdWithValueTextOutputFormat -op /user/giraph/output-sssp-debug-7 -w 5

(2). 為節(jié)約空間,下文中所有代碼均為核心代碼片段。

(3). core-site.xml中hadoop.tmp.dir的路徑設(shè)為:/home/hadoop/hadooptmp

(4).寫本文是多次調(diào)試完成的,故文中的JobID不一樣,讀者可理解為同一JobID.

(5). 后續(xù)文章也遵循上述規(guī)則。

1. org.apache.giraph.graph.GraphMapper類

Giraph中自定義org.apache.giraph.graph.GraphMapper類來繼承Hadoop中的 org.apache.hadoop.mapreduce.Mapper類,覆寫了setup()、map()、cleanup()和run()方法。GraphMapper類的說明如下:

This mapper that will execute the BSP graph tasks alloted to this worker. All tasks will be performed by calling the GraphTaskManager object managed by this GraphMapper wrapper classs. Since this mapper will not be passing data by key-value pairs through the MR framework, the Mapper parameter types are irrelevant, and set to Object type.

BSP的運(yùn)算邏輯被封裝在GraphMapper類中,其擁有一GraphTaskManager對象,用來管理Job的tasks。每個GraphMapper對象都相當(dāng)于BSP中的一個計算節(jié)點(diǎn)(compute node)。

在GraphMapper類中的setup()方法中,創(chuàng)建GraphTaskManager對象并調(diào)用其setup()方法進(jìn)行一些初始化工作。如下:

 @Override
 public void setup(Context context)
 throws IOException, InterruptedException {
 // Execute all Giraph-related role(s) assigned to this compute node.
 // Roles can include "master," "worker," "zookeeper," or . . . ?
 graphTaskManager = new GraphTaskManager(context);
 graphTaskManager.setup(
 DistributedCache.getLocalCacheArchives(context.getConfiguration()));
 }
map()方法為空,因為所有操作都被封裝在了GraphTaskManager類中。在run()方法中調(diào)用GraphTaskManager對象的execute()方法進(jìn)行BSP迭代計算。
@Override
 public void run(Context context) throws IOException, InterruptedException {
 // Notify the master quicker if there is worker failure rather than
 // waiting for ZooKeeper to timeout and delete the ephemeral znodes
 try {
 setup(context);
 while (context.nextKeyValue()) {
 graphTaskManager.execute();
 }
 cleanup(context);
 // Checkstyle exception due to needing to dump ZooKeeper failure
 } catch (RuntimeException e) {
 graphTaskManager.zooKeeperCleanup();
 graphTaskManager.workerFailureCleanup();
 }
 }

2. org.apache.giraph.graph.GraphTaskManager 類

功能:The Giraph-specific business logic for a single BSP compute node in whatever underlying type of cluster our Giraph job will run on. Owning object will provide the glue into the underlying cluster framework and will call this object to perform Giraph work.

下面講述setup()方法,代碼如下。

 /**
 * Called by owner of this GraphTaskManager on each compute node
 * @param zkPathList the path to the ZK jars we need to run the job
 */
 public void setup(Path[] zkPathList) throws IOException, InterruptedException {
 context.setStatus("setup: Initializing Zookeeper services.");
 locateZookeeperClasspath(zkPathList);
 serverPortList = conf.getZookeeperList();
 if (serverPortList == null && startZooKeeperManager()) {
 return; // ZK connect/startup failed
 }
 if (zkManager != null && zkManager.runsZooKeeper()) {
 LOG.info("setup: Chosen to run ZooKeeper...");
 }
 context.setStatus("setup: Connected to Zookeeper service " +serverPortList);
 this.graphFunctions = determineGraphFunctions(conf, zkManager);
 instantiateBspService(serverPortList, sessionMsecTimeout);
 }
依次介紹每個方法的功能:

1) locateZookeeperClasspath(zkPathList):找到ZK jar的本地副本,其路徑為:/home/hadoop/hadooptmp/mapred/local/taskTracker/root/jobcache/job_201403270456_0001/jars/job.jar ,用于啟動ZooKeeper服務(wù)。
2) startZooKeeperManager(),初始化和配置ZooKeeperManager。定義如下,

 /**
 * Instantiate and configure ZooKeeperManager for this job. This will
 * result in a Giraph-owned Zookeeper instance, a connection to an
 * existing quorum as specified in the job configuration, or task failure
 * @return true if this task should terminate
 */
 private boolean startZooKeeperManager()
 throws IOException, InterruptedException {
 zkManager = new ZooKeeperManager(context, conf);
 context.setStatus("setup: Setting up Zookeeper manager.");
 zkManager.setup();
 if (zkManager.computationDone()) {
 done = true;
 return true;
 }
 zkManager.onlineZooKeeperServers();
 serverPortList = zkManager.getZooKeeperServerPortString();
 return false;
 }

org.apache.giraph.zk.ZooKeeperManager 類,功能:Manages the election of ZooKeeper servers, starting/stopping the services, etc.

ZooKeeperManager類的setup()定義如下:

/**
 * Create the candidate stamps and decide on the servers to start if
 * you are partition 0.
 */
 public void setup() throws IOException, InterruptedException {
 createCandidateStamp();
 getZooKeeperServerList();
 }
createCandidateStamp()方法在 HDFS上 的_bsp/_defaultZkManagerDir/job_201403301409_0006/_task 目錄下為每個task創(chuàng)建一個文件,文件內(nèi)容為空。文件名為本機(jī)的Hostname+taskPartition,如下截圖:

\

運(yùn)行時指定了5個workers(-w 5),再加上一個master,所有上面有6個task。

getZooKeeperServerList()方法中,taskPartition為0的task會調(diào)用createZooKeeperServerList()方法創(chuàng)建ZooKeeper server List,也是創(chuàng)建一個空文件,通過文件名來描述Zookeeper servers。

\

createZooKeeperServerList核心代碼如下:

/**
 * Task 0 will call this to create the ZooKeeper server list. The result is
 * a file that describes the ZooKeeper servers through the filename.
 */
 private void createZooKeeperServerList() throws IOException,
 InterruptedException {
 Map hostnameTaskMap = Maps.newTreeMap();
 while (true) {
 FileStatus [] fileStatusArray = fs.listStatus(taskDirectory);
 hostnameTaskMap.clear();
 if (fileStatusArray.length > 0) {
 for (FileStatus fileStatus : fileStatusArray) { 
 String[] hostnameTaskArray =
 fileStatus.getPath().getName().split(HOSTNAME_TASK_SEPARATOR);
 
 if (!hostnameTaskMap.containsKey(hostnameTaskArray[0])) {
 hostnameTaskMap.put(hostnameTaskArray[0],
 new Integer(hostnameTaskArray[1]));
 }
 }
 if (hostnameTaskMap.size() >= serverCount) {
 break;
 }
 Thread.sleep(pollMsecs);
 }
 }
 }
首先獲取taskDirectory(_bsp/_defaultZkManagerDir/job_201403301409_0006/_task)目錄下文件,如果當(dāng)前目錄下有文件,則把文件名(Hostname+taskPartition)中的Hostname和taskPartition存入到hostNameTaskMap中。掃描taskDirectory目錄后,若hostNameTaskMap的size大于serverCount(等于GiraphConstants.java中的ZOOKEEPER_SERVER_COUNT變量,定義為1),就停止外層的循環(huán)。外層循環(huán)的目的是:因為taskDirectory下的文件每個task文件時多個task在分布式條件下創(chuàng)建的,有可能task 0在此創(chuàng)建server List時,別的task還沒有生成后task文件。Giraph默認(rèn)為每個Job啟動一個ZooKeeper服務(wù),也就是說只有一個task會啟動ZooKeeper服務(wù)。

經(jīng)過多次測試,task 0總是被選為ZooKeeper Server ,因為在同一進(jìn)程中,掃描taskDirectory時,只有它對應(yīng)的task 文件(其他task的文件還沒有生成好),然后退出for循環(huán),發(fā)現(xiàn)hostNameTaskMap的size等于1,直接退出while循環(huán)。那么此處就選了test162 0。

最后,創(chuàng)建了文件:_bsp/_defaultZkManagerDir/job_201403301409_0006/zkServerList_test162 0

\

onlineZooKeeperServers(),根據(jù)zkServerList_test162 0文件,Task 0 先生成zoo.cfg配置文件,使用ProcessBuilder來創(chuàng)建ZooKeeper服務(wù)進(jìn)程,然后Task 0 再通過socket連接到ZooKeeper服務(wù)進(jìn)程上,最后創(chuàng)建文件 _bsp/_defaultZkManagerDir/job_201403301409_0006/_zkServer/test162 0 來標(biāo)記master任務(wù)已完成。worker一直在進(jìn)行循環(huán)檢測master是否生成好 _bsp/_defaultZkManagerDir/job_201403301409_0006/_zkServer/test162 0,即worker等待直到master上的ZooKeeper服務(wù)已經(jīng)啟動完成。

啟動ZooKeeper服務(wù)的命令如下:

\

3) determineGraphFunctions()。

GraphTaskManager類中有CentralizedServiceMaster對象和CentralizedServiceWorker 對象,分別對應(yīng)于master和worker。每個BSP compute node扮演的角色判定邏輯如下:

a) If not split master, everyone does the everything and/or running ZooKeeper.

b) If split master/worker, masters also run ZooKeeper

c) If split master/worker == true and giraph.zkList is set, the master will not instantiate a ZK instance, but will assume a quorum is already active on the cluster for Giraph to use.

該判定在GraphTaskManager 類中的靜態(tài)方法determineGraphFunctions()中定義,片段代碼如下:

 private static GraphFunctions determineGraphFunctions(
 ImmutableClassesGiraphConfiguration conf,
 ZooKeeperManager zkManager) {
 // What functions should this mapper do?
 if (!splitMasterWorker) {
 if ((zkManager != null) && zkManager.runsZooKeeper()) {
 functions = GraphFunctions.ALL;
 } else {
 functions = GraphFunctions.ALL_EXCEPT_ZOOKEEPER;
 }
 } else {
 if (zkAlreadyProvided) {
 int masterCount = conf.getZooKeeperServerCount();
 if (taskPartition < masterCount) {
 functions = GraphFunctions.MASTER_ONLY;
 } else {
 functions = GraphFunctions.WORKER_ONLY;
 }
 } else {
 if ((zkManager != null) && zkManager.runsZooKeeper()) {
 functions = GraphFunctions.MASTER_ZOOKEEPER_ONLY;
 } else {
 functions = GraphFunctions.WORKER_ONLY;
 }
 }
 }
 return functions;
 }

默認(rèn)的,Giraph會區(qū)分master和worker。會在master上面啟動zookeeper服務(wù),不會在worker上啟動ZooKeeper服務(wù)。那么Task 0 就是master+ZooKeeper,其他Tasks就是workers。

聲明:本網(wǎng)頁內(nèi)容旨在傳播知識,若有侵權(quán)等問題請及時與本網(wǎng)聯(lián)系,我們將在第一時間刪除處理。TEL:177 7030 7066 E-MAIL:11247931@qq.com

文檔

Giraph源碼分析啟動ZooKeeper服務(wù)

Giraph源碼分析啟動ZooKeeper服務(wù):說明: (1) 實驗環(huán)境. 三臺服務(wù)器:test165、test62、test63。test165同時是JobTracker和TaskTracker. 測試?yán)樱汗倬W(wǎng)自帶的SSSP程序,數(shù)據(jù)是自己模擬生成。 運(yùn)行命令:hadoop jar giraph-examples-1.0.0-for-hadoop-0.2
推薦度:
標(biāo)簽: 服務(wù) 啟動 分析
  • 熱門焦點(diǎn)

最新推薦

猜你喜歡

熱門推薦

專題
Top
主站蜘蛛池模板: 欧美亚洲另类在线观看 | 亚洲精品98久久久久久中文字幕 | 久久精品2| 成人国内精品久久久久影院 | 国产偷窥在线观看 | 国产精品自拍第一页 | 国产精品日本 | 91欧洲在线视精品在亚洲 | 欧美在线观看网站 | 在线播放一区 | 日韩 国产 欧美 精品 在线 | 亚洲欧美日韩成人 | 国产精品不卡视频 | 中文国产成人精品少久久 | 亚洲日韩图片专区第1页 | 午夜看一级特黄a大片黑 | 国产在线一区视频 | 欧美日韩中字 | 久久se精品一区二区国产 | 国产精品免费在线播放 | 99热只有精品一区二区 | 中文在线第一页 | 免费一区二区 | 国产一区二区精品久久91 | 青青草国产在线视频 | 性插动态图 | 国产成人三级经典中文 | 在线免费黄色网址 | 午夜精品久久久久久91 | 日韩欧美专区 | 日韩 亚洲 制服 欧美 综合 | 亚洲一级毛片 | 美女视频黄全免费的 | 九九久久精品国产 | 欧美 日韩 国产 色 欧美 日韩 中文 | 国产成人免费高清激情明星 | 另类区| 中文字幕欧美在线观看 | 成人欧美一区二区三区视频 | 久久婷婷色一区二区三区 | 国产精品99久久久久久www |