javazx 发表于 2017-4-17 13:58:22

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.4】

3.4 服务库与事件库
本节介绍服务库和事件库。
3.4.1 服务库
对于生命周期较长的对象, YARN采用了基于服务的对象管理模型对其进行管理, 该模型主要有以下几个特点。
❑将每个被服务化的对象分为4个状态: NOTINITED( 被创建) 、 INITED( 已初始化) 、 STARTED( 已启动) 、
STOPPED( 已停止) 。
❑任何服务状态变化都可以触发另外一些动作。
❑可通过组合的方式对任意服务进行组合, 以便进行统一管理。
YARN中关于服务模型的类图( 位于包org.apache.hadoop.service中) 如图3-13所示。 在这个图中, 我们可以看到, 所有的服务
对象最终均实现了接口Service, 它定义了最基本的服务初始化、 启动、 停止等操作, 而AbstractService类提供了一个最基本的
Service实现。 YARN中所有对象, 如果是非组合服务, 直接继承AbstractService类即可, 否则需继承CompositeService。 比如, 对于
ResourceManager而言, 它是一个组合服务, 它组合了各种服务对象, 包括ClientRMService、 ApplicationMasterLauncher、
ApplicationMasterService等。
图3-13 YARN中服务模型的类图
在YARN中, ResourceManager和NodeManager属于组合服务, 它们内部包含多个单一服务和组合服务, 以实现对内部多种服
务的统一管理。
3.4.2 事件库
YARN采用了基于事件驱动的并发模型, 该模型能够大大增强并发性, 从而提高系统整体性能。 为了构建该模型, YARN将
各种处理逻辑抽象成事件和对应事件调度器, 并将每类事件的处理过程分割成多个步骤, 用有限状态机表示。 YARN中的事件处
理模型可概括为图3-14所示。
图3-14 YARN的事件处理模型
整个处理过程大致为: 处理请求会作为事件进入系统, 由中央异步调度器( Async-Dispatcher) 负责传递给相应事件调度器
( Event Handler) 。 该事件调度器可能将该事件转发给另外一个事件调度器, 也可能交给一个带有有限状态机的事件处理器, 其
处理结果也以事件的形式输出给中央异步调度器。 而新的事件会再次被中央异步调度器转发给下一个事件调度器, 直至处理完成
( 达到终止条件) 。
在YARN中, 所有核心服务实际上都是一个中央异步调度器, 包括ResourceManager、 NodeManager、
MRAppMaster( MapReduce应用程序的ApplicationMaster) 等, 它们维护了事先注册的事件与事件处理器, 并根据接收的事件类型
驱动服务的运行。
YARN中事件与事件处理器类的关系( 位于包org.apache.hadoop.yarn.event中) 如图3-15所示。 当使用YARN事件库时, 通常先
要定义一个中央异步调度器AsyncDispatcher, 负责事件的处理与转发, 然后根据实际业务需求定义一系列事件Event与事件处理器
EventHandler, 并注册到中央异步调度器中以实现事件统一管理和调度。 以MRAppMaster为例, 它内部包含一个中央异步调度器
AsyncDispatcher, 并注册了TaskAttemptEvent/TaskAttemptImpl、 TaskEvent/TaskImpl、 JobEvent/JobImpl等一系列事件/事件处理器, 由
中央异步调度器统一管理和调度。
服务化和事件驱动软件设计思想的引入, 使得YARN具有低耦合、 高内聚的特点, 各个模块只需完成各自功能, 而模块之间
则采用事件联系起来, 系统设计简单且维护方便。
图3-15 事件与事件处理器
3.4.3 YARN服务库和事件库的使用方法
为了说明YARN服务库和事件库的使用方法, 本小节介绍一个简单的实例, 该实例可看做MapReduce
ApplicationMaster( MRAppMaster) 的简化版。 该例子涉及任务和作业两种对象的事件以及一个中央异步调度器。 步骤如下。
1) 定义Task事件。
public class TaskEvent extends AbstractEvent<TaskEventType> {
private String taskID; //Task ID
public TaskEvent(String taskID, TaskEventType type) {
super(type);
this.taskID = taskID;
} p
ublic String getTaskID() {
return taskID;
} 其
中, Task事件类型定义如下:
public enum TaskEventType {
T_KILL,
T_SCHEDULE
}
2) 定义Job事件。
public class JobEvent extends AbstractEvent<JobEventType> {
private String jobID;
public JobEvent(String jobID, JobEventType type) {
super(type);
this.jobID = jobID;
} p
ublic String getJobId() {
return jobID;
}
}
其中, Job事件类型定义如下:
public enum JobEventType {
JOB_KILL,
JOB_INIT,
JOB_START
}
3) 事件调度器。
接下来定义一个中央异步调度器, 它接收Job和Task两种类型事件, 并交给对应的事件处理器处理, 代码如下:
@SuppressWarnings("unchecked")
public class SimpleMRAppMaster extends CompositeService {
private Dispatcher dispatcher; //中央异步调度器
private String jobID;
private int taskNumber; //该作业包含的任务数目
private String[] taskIDs; //该作业内部包含的所有任务
public SimpleMRAppMaster(String name, String jobID, int taskNumber) {
super(name);
this.jobID = jobID;
this.taskNumber = taskNumber;
taskIDs = new String;
for(int i = 0; i < taskNumber; i++) {
taskIDs = new String(jobID + "_task_" + i);
}
} p
ublic void serviceInit(final Configuration conf) throws Exception {
dispatcher = new AsyncDispatcher();//定义一个中央异步调度器
//分别注册Job和Task事件调度器
dispatcher.register(JobEventType.class, new JobEventDispatcher());
dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
addService((Service) dispatcher);
super.serviceInit(conf);
} p
ublic Dispatcher getDispatcher() {
return dispatcher;
} p
rivate class JobEventDispatcher implements EventHandler<JobEvent> {
@Override
public void handle(JobEvent event) {
if(event.getType() == JobEventType.JOB_KILL) {
System.out.println("Receive JOB_KILL event, killing all the tasks");
for(int i = 0; i < taskNumber; i++) {
dispatcher.getEventHandler().handle(new TaskEvent(taskIDs,
TaskEventType.T_KILL));
}
} else if(event.getType() == JobEventType.JOB_INIT) {
System.out.println("Receive JOB_INIT event, scheduling tasks");
for(int i = 0; i < taskNumber; i++) {
dispatcher.getEventHandler().handle(new TaskEvent(taskIDs,
TaskEventType.T_SCHEDULE));
}
}
}
}p
rivate class TaskEventDispatcher implements EventHandler<TaskEvent> {
@Override
public void handle(TaskEvent event) {
if(event.getType() == TaskEventType.T_KILL) {
System.out.println("Receive T_KILL event of task " + event.getTaskID());
} else if(event.getType() == TaskEventType.T_SCHEDULE) {
System.out.println("Receive T_SCHEDULE event of task " + event.getTaskID());
}
}
}
}
4) 测试程序。
@SuppressWarnings("unchecked")
public class SimpleMRAppMasterTest {
public static void main(String[] args) throws Exception {
String jobID = "job_20131215_12";
SimpleMRAppMaster appMaster = new SimpleMRAppMaster("Simple MRAppMaster", jobID, 5);
YarnConfiguration conf = new YarnConfiguration(new Configuration());
appMaster.serviceInit(conf);
appMaster.serviceStart();
appMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID,
JobEventType.JOB_KILL));
appMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID,
JobEventType.JOB_INIT));
}
3.4.4 事件驱动带来的变化
在MRv1中, 对象之间的作用关系是基于函数调用实现的, 当一个对象向另外一个对象传递信息时, 会直接采用函数调用的
方式, 且整个过程是串行的。 比如, 当TaskTracker需要执行一个Task时, 将首先下载Task依赖的文件( JAR包、 二进制文件等、
字典文件等) 、 然后执行Task。 同时在整个过程中会记录一些关键日志, 该过程可用图3-16描述。 在整个过程中, 下载依赖文件
是阻塞式的, 也就是说, 前一个任务未完成文件下载之前, 后一个新任务将一直处于等待状态, 只有在下载完成后, 才会启动一
个独立进程运行该任务。 尽管后来MRv1通过启动过独立线程下载文件 解决了该问题 , 但这种方式不是在大系统中彻底解决
问题之道, 必须引入新的编程模型。
图3-16 基于函数调用的工作流程
基于函数调用的编程模型是低效的, 它隐含着整个过程是串行、 同步进行的。 相比之下, MRv2引入的事件驱动编程模型则
是一种更加高效的方式。 在基于事件驱动的编程模型中, 所有对象被抽象成了事件处理器, 而事件处理器之间通过事件相互关
联。 每种事件处理器处理一种类型的事件, 同时根据需要触发另外一种事件, 该过程如图3-17所示, 当A需要下载文件时, 只需
向中央异步处理器发送一个事件即可( 之后可以继续完成后面的功能而无须等待下载完成) , 该事件会被传递给对应的事件处理
器B, 由B完成具体的下载任务。 一旦B完成下载任务, 便可以通过事件通知A。
图3-17 基于事件驱动的工作流程
相比于基于函数调用的编程模型, 这种编程方式具有异步、 并发等特点, 更加高效, 因此更适合大型分布式系统。
参见网址https://issues.apache.org/jira/browse/MAPREDUCE-2705。


页: [1]
查看完整版本: 《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.4】