Before explaining the architecture of the scheduling system, let's first go over the terms commonly used in it.
System architecture diagram
Start process activity diagram
MasterServer
MasterServer adopts a distributed, decentralized design. It is mainly responsible for splitting DAGs into tasks, submitting and monitoring tasks, and monitoring the health of the other MasterServers and WorkerServers. When the MasterServer service starts, it registers an ephemeral node with ZooKeeper and performs fault tolerance by watching for changes to ZooKeeper's ephemeral nodes. MasterServer provides its monitoring service based on netty.
- **MasterSchedulerService** is a scanning thread that periodically scans the command table in the database, generates workflow instances, and performs different business operations depending on the command type.
- **WorkflowExecuteThread** is mainly responsible for DAG task splitting, task submission, the logic for each command type, and processing task-state and workflow-state events.
- **EventExecuteService** handles all state-change events of the workflow instances that the master is responsible for, using a thread pool to process the workflows' state events.
- **StateWheelExecuteThread** handles timed state updates for dependent tasks and timeout tasks.
WorkerServer
WorkerServer also adopts a distributed, decentralized design. It supports custom task plug-ins and is mainly responsible for task execution and providing log services.
When the WorkerServer service starts, it registers an ephemeral node with ZooKeeper and maintains a heartbeat.
- **WorkerManagerThread** mainly receives tasks sent by the master through netty and, via **TaskExecuteThread**, calls the executor corresponding to each task type.
- **RetryReportTaskStatusThread** mainly reports task status to the master through netty. If a report fails, it keeps retrying until the report succeeds.
- **LoggerServer** is a log service that provides log fragment viewing, refreshing, and downloading.
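A minimal sketch of the "retry until the report succeeds" behavior described for **RetryReportTaskStatusThread**. It assumes a hypothetical `send` callback that returns `true` once the master acknowledges the report, plus a fixed back-off between attempts. This is an illustration, not DolphinScheduler's implementation; `RetryingReporter` and `reportUntilAcked` are made-up names.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical sketch: keep re-sending a task-status report until the master acknowledges it. */
final class RetryingReporter {
    private final long backoffMillis;

    RetryingReporter(long backoffMillis) {
        this.backoffMillis = backoffMillis;
    }

    /**
     * Calls {@code send} until it returns true (report acknowledged) and returns
     * the number of attempts made, or -1 if the thread was interrupted while waiting.
     */
    int reportUntilAcked(BooleanSupplier send) {
        int attempts = 0;
        while (true) {
            attempts++;
            if (send.getAsBoolean()) {
                return attempts;
            }
            try {
                // The report failed (e.g. master unreachable): back off, then retry.
                Thread.sleep(backoffMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return -1;
            }
        }
    }
}
```

The loop has no upper bound on attempts, matching the text's "always retried". A real worker would also need to stop retrying on shutdown, which the interrupt check above only partly covers.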
Registry
The registry is implemented as a plug-in, and ZooKeeper is supported by default. The MasterServer and WorkerServer nodes use the registry for cluster management and fault tolerance. The system also uses the registry for event monitoring and distributed locks.
Alert
Provides alarm-related functions and only supports stand-alone deployment. Custom alarm plug-ins are supported.
API
The API interface layer is mainly responsible for processing requests from the front-end UI layer. The service exposes a unified RESTful API to external callers. Interfaces include workflow creation, definition, query, modification, bringing online, taking offline, manual start, stop, pause, resume, starting execution from a given node, and so on.
UI
The front-end pages provide the system's various visual operation interfaces. See the Introduction to Functions section for more details.
The centralized design concept is relatively simple: the nodes in the distributed cluster are divided by role, roughly into two roles:
Problems with the centralized design:
In a decentralized design there is usually no concept of Master/Slave: all roles are the same and have equal status. The global Internet is a typical decentralized distributed system; if any device connected to the network goes down, only a small range of functions is affected.
The core design of decentralized design is that there is no "manager" different from other nodes in the entire distributed system, so there is no single point of failure. However, because there is no "manager" node, each node needs to communicate with other nodes to obtain the necessary machine information, and the unreliability of distributed system communication greatly increases the difficulty of implementing the above functions.
In fact, truly decentralized distributed systems are rare. Instead, dynamically centralized distributed systems keep emerging. In this architecture, the managers in the cluster are elected dynamically rather than preset, and when the cluster fails, its nodes automatically hold a "meeting" to elect a new "manager" to take over. The most typical examples are ZooKeeper and Etcd, which is implemented in Go.
DolphinScheduler achieves decentralization by registering Masters and Workers in ZooKeeper, so that neither the Master cluster nor the Worker cluster has a central node. A sharding mechanism distributes workflows fairly across the masters for execution, and tasks are sent to workers through different dispatch strategies. Specific task
DolphinScheduler uses a sharding algorithm: it takes each command's id modulo the number of masters and assigns the command according to the masters' sort ids. The master converts each command it receives into a workflow instance and processes workflow instances with a thread pool.
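A minimal sketch of the modulo sharding described above. It assumes commands have numeric ids and that each master knows its position (slot) in the sorted list of live masters as well as the total master count. `CommandSharding`, `ownedBy`, and `selectForMaster` are hypothetical names. The same condition could equally be applied in the query that scans the command table instead of in memory.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of modulo-based command sharding across masters. */
final class CommandSharding {
    private CommandSharding() {}

    /**
     * True if the command belongs to the master at position {@code masterSlot}
     * in the sorted list of {@code masterCount} live masters.
     */
    static boolean ownedBy(long commandId, int masterSlot, int masterCount) {
        if (masterCount <= 0 || masterSlot < 0 || masterSlot >= masterCount) {
            throw new IllegalArgumentException("invalid master slot/count");
        }
        // floorMod keeps the result non-negative even for unexpected negative ids.
        return Math.floorMod(commandId, (long) masterCount) == masterSlot;
    }

    /** Filters a batch of scanned command ids down to those this master should process. */
    static List<Long> selectForMaster(List<Long> commandIds, int masterSlot, int masterCount) {
        List<Long> mine = new ArrayList<>();
        for (long id : commandIds) {
            if (ownedBy(id, masterSlot, masterCount)) {
                mine.add(id);
            }
        }
        return mine;
    }
}
```

Every master applies the same rule to the same sorted master list, so each command is picked up by exactly one master as long as the masters agree on cluster membership. When a master joins or leaves, the count and slots change and the shares rebalance.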
DolphinScheduler's workflow processing flow:
Starting a new Master just to break the deadlock is unsatisfactory, so we proposed the following three solutions to reduce this risk:
Note: The Master Scheduler thread acquires Commands in FIFO order.
So we chose the third way to solve the problem of insufficient threads.
Fault tolerance is divided into service-downtime fault tolerance and task retry; service-downtime fault tolerance is further divided into master fault tolerance and worker fault tolerance.
The service fault-tolerance design relies on ZooKeeper's Watcher mechanism, and the implementation principle is shown in the figure:
Once the Master Scheduler thread finds that the task instance is in the "fault-tolerant" state, it takes over the task and resubmits it.
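The watcher-driven flow above can be sketched in memory: a node's ephemeral registration disappears, its running tasks are marked as needing fault tolerance, and a scheduler pass then takes them over and resubmits them. This is an illustration only. The `onNodeRemoved` callback, the state names, and the `TaskInstance` fields are assumptions. A real implementation would receive the removal event from a ZooKeeper watcher rather than from a direct method call.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: react to a node's ephemeral registration disappearing, then resubmit its tasks. */
final class FailoverSketch {
    /** Illustrative task states. */
    enum State { RUNNING_EXECUTION, NEED_FAULT_TOLERANCE, SUBMITTED_SUCCESS }

    static final class TaskInstance {
        final int id;
        final String host;
        State state;

        TaskInstance(int id, String host, State state) {
            this.id = id;
            this.host = host;
            this.state = state;
        }
    }

    private final List<TaskInstance> tasks = new ArrayList<>();

    void add(TaskInstance t) {
        tasks.add(t);
    }

    /** Called when the registry reports that the node at {@code host} was removed. */
    void onNodeRemoved(String host) {
        for (TaskInstance t : tasks) {
            if (host.equals(t.host) && t.state == State.RUNNING_EXECUTION) {
                t.state = State.NEED_FAULT_TOLERANCE; // mark for takeover
            }
        }
    }

    /** One scheduler pass: take over every fault-tolerant task and resubmit it. Returns resubmitted ids. */
    List<Integer> resubmitFaultTolerant() {
        List<Integer> resubmitted = new ArrayList<>();
        for (TaskInstance t : tasks) {
            if (t.state == State.NEED_FAULT_TOLERANCE) {
                t.state = State.SUBMITTED_SUCCESS;
                resubmitted.add(t.id);
            }
        }
        return resubmitted;
    }
}
```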
Note: Because of network jitter, a node may briefly lose its heartbeat with ZooKeeper, which can trigger a remove event for that node. We handle this case in the simplest way: as soon as a node's connection to ZooKeeper times out, the Master or Worker service is stopped directly.
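A sketch of the "stop on connection timeout" rule. It assumes a hypothetical registry client that reports connection-state changes to a listener. ZooKeeper clients such as Apache Curator expose similar notifications, but the enum below is illustrative and is not Curator's API. The server is stopped at most once, even if the lost state is reported repeatedly.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch: stop the Master/Worker as soon as its registry session is lost. */
final class RegistryConnectionGuard {
    /** Illustrative connection states a registry client might report. */
    enum ConnectionState { CONNECTED, SUSPENDED, RECONNECTED, LOST }

    private final Runnable stopServer;
    private final AtomicBoolean stopped = new AtomicBoolean(false);

    RegistryConnectionGuard(Runnable stopServer) {
        this.stopServer = stopServer;
    }

    /** Invoked by the registry client whenever the connection state changes. */
    void onStateChanged(ConnectionState state) {
        // Other states are ignored; once the session is LOST (e.g. after a
        // session timeout) the server stops itself, and only the first LOST counts.
        if (state == ConnectionState.LOST && stopped.compareAndSet(false, true)) {
            stopServer.run();
        }
    }

    boolean isStopped() {
        return stopped.get();
    }
}
```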
Here we must first distinguish the concepts of task failure retry, process failure recovery, and process failure rerun:
Back to the topic: we divide the task nodes in a workflow into two types.
One is the business node, which corresponds to an actual script or processing statement, such as a Shell node, MR node, Spark node, or dependent node.
The other is the logical node, which does not process an actual script or statement but only handles the logic of the overall process flow, such as a sub-process node.
Each business node can be configured with a number of failure retries. When the task node fails, it is retried automatically until it succeeds or exceeds the configured number of retries. Logical nodes do not support failure retry, but the tasks inside a logical node do.
If a task in the workflow still fails after reaching its maximum number of retries, the workflow fails and stops. The failed workflow can then be rerun manually or recovered through a process recovery operation.
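Per-node failure retry for business nodes can be sketched as a bounded loop: one initial attempt plus at most `maxRetries` retries, stopping at the first success. The `IntPredicate` stand-in for a task run (it receives the attempt number and returns `true` on success) is an assumption for illustration. Retry intervals and the logical-node exemption are not modeled.

```java
import java.util.function.IntPredicate;

/** Hypothetical sketch of per-task failure retry for business nodes. */
final class TaskRetry {
    private TaskRetry() {}

    /**
     * Runs {@code attempt} once, then retries it up to {@code maxRetries} more times.
     * The predicate receives the attempt number (0 = first run) and returns true on success.
     */
    static boolean runWithRetries(IntPredicate attempt, int maxRetries) {
        for (int i = 0; i <= maxRetries; i++) {
            if (attempt.test(i)) {
                return true;
            }
        }
        // Retries exhausted: the task fails, and with it the workflow.
        return false;
    }
}
```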
The early scheduling design had no priority mechanism and used fair scheduling. As a result, a task submitted first might finish at the same time as a task submitted later, and process or task priorities could not be set. We therefore redesigned this; the current design is as follows:
Concretely, the priority is parsed from the task instance's JSON, and a key of the form process instance priority_process instance id_task priority_task id is saved in the ZooKeeper task queue. When tasks are taken from the queue, comparing these keys as strings yields the task that should be executed first.
Process definition priority exists because some processes need to be handled before others. It can be configured when a process is started manually or scheduled. There are 5 levels in total: HIGHEST, HIGH, MEDIUM, LOW, and LOWEST, as shown below:
Task priority is also divided into 5 levels, in order: HIGHEST, HIGH, MEDIUM, LOW, and LOWEST, as shown below:
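The queue ordering implied by the key layout described above (process priority, process instance id, task priority, task id) can be sketched with a `PriorityQueue`. The sketch makes two assumptions. First, priorities are encoded by their position in the five-level list (HIGHEST = 0). Second, keys are compared field by field as numbers rather than as raw strings, because a raw string comparison would place instance id 10 before 9. That second point is a deliberate variant of the string comparison the text describes. All names are hypothetical.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

/** Hypothetical sketch of priority ordering for queued tasks. */
final class TaskPriorityQueueSketch {
    /** The five levels from the text; ordinal 0 (HIGHEST) sorts first. */
    enum Priority { HIGHEST, HIGH, MEDIUM, LOW, LOWEST }

    /** Builds a key of the form processPriority_processInstanceId_taskPriority_taskId. */
    static String key(Priority processPriority, long processInstanceId, Priority taskPriority, long taskId) {
        return processPriority.ordinal() + "_" + processInstanceId + "_" + taskPriority.ordinal() + "_" + taskId;
    }

    /** Compares keys field by field as numbers, avoiding "10" < "9" string-order surprises. */
    static final Comparator<String> BY_KEY = (a, b) -> {
        String[] x = a.split("_");
        String[] y = b.split("_");
        for (int i = 0; i < 4; i++) {
            int c = Long.compare(Long.parseLong(x[i]), Long.parseLong(y[i]));
            if (c != 0) {
                return c;
            }
        }
        return 0;
    };

    static PriorityQueue<String> newQueue() {
        return new PriorityQueue<>(BY_KEY);
    }
}
```

With this ordering, a task from a HIGHEST-priority process is dequeued before any task of a MEDIUM-priority process. Within the same process priority, the process instance with the smaller id comes first, and only then does task priority decide.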
Since the Web (UI) server and the Worker are not necessarily on the same machine, viewing a log is not as simple as reading a local file. There are two options:
- Put logs into the Elasticsearch (ES) search engine
- Obtain remote log information through netty communication

To keep DolphinScheduler as lightweight as possible, we chose the second option and fetch remote log information through netty communication.
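The "log fragment viewing" capability of LoggerServer comes down to returning a window of lines from a task's log file. The netty request and response that carry the fragment to the API server are omitted here. This is a hypothetical sketch; `LogFragmentReader`, `readFragment`, and its `skipLines`/`limit` parameters are assumptions.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical server-side helper: return one fragment of a task log file. */
final class LogFragmentReader {
    private LogFragmentReader() {}

    /** Returns at most {@code limit} lines, starting after the first {@code skipLines} lines. */
    static List<String> readFragment(Path logFile, int skipLines, int limit) {
        // Files.lines streams the file lazily, so large logs are not loaded whole.
        try (Stream<String> lines = Files.lines(logFile)) {
            return lines.skip(skipLines).limit(limit).collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Paging by line offset lets the UI fetch a log incrementally ("refreshing") without transferring the whole file each time.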
- TaskLogAppender, a custom Logback FileAppender, is implemented as follows:

```java
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.FileAppender;

/**
 * task log appender
 */
public class TaskLogAppender extends FileAppender<ILoggingEvent> {
    ...
    @Override
    protected void append(ILoggingEvent event) {
        if (currentlyActiveFile == null) {
            currentlyActiveFile = getFile();
        }
        String activeFile = currentlyActiveFile;
        // thread name: taskThreadName-processDefineId_processInstanceId_taskInstanceId
        String threadName = event.getThreadName();
        String[] threadNameArr = threadName.split("-");
        // logId = processDefineId_processInstanceId_taskInstanceId
        String logId = threadNameArr[1];
        ...
        super.subAppend(event);
    }
}
```
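Outside of Logback, mapping a task thread's name to its per-task log file can be sketched as follows. The sketch assumes the thread name format given in the appender's comments and an arbitrary base log directory; `TaskLogPaths` and `logPath` are hypothetical names.

```java
import java.nio.file.Path;

/** Hypothetical sketch: derive a task's log file path from the logging thread's name. */
final class TaskLogPaths {
    private TaskLogPaths() {}

    /**
     * Expects a thread name of the form
     * taskThreadName-processDefineId_processInstanceId_taskInstanceId
     * and returns baseDir/processDefineId/processInstanceId/taskInstanceId.log.
     */
    static Path logPath(Path baseDir, String threadName) {
        int dash = threadName.indexOf('-');
        if (dash < 0) {
            throw new IllegalArgumentException("not a task thread: " + threadName);
        }
        // logId = processDefineId_processInstanceId_taskInstanceId
        String[] ids = threadName.substring(dash + 1).split("_");
        if (ids.length != 3) {
            throw new IllegalArgumentException("unexpected log id in: " + threadName);
        }
        return baseDir.resolve(ids[0]).resolve(ids[1]).resolve(ids[2] + ".log");
    }
}
```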
- Logs are generated in the form /process definition id/process instance id/task instance id.log
- A filter accepts only log events whose thread name starts with TaskLogInfo.
- TaskLogFilter is implemented as follows:
```java
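import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.filter.Filter;
import ch.qos.logback.core.spi.FilterReply;

/**
 * task log filter: accepts only events logged from task threads
 */
public class TaskLogFilter extends Filter<ILoggingEvent> {
    @Override
    public FilterReply decide(ILoggingEvent event) {
        if (event.getThreadName().startsWith("TaskLogInfo-")) {
            return FilterReply.ACCEPT;
        }
        return FilterReply.DENY;
    }
}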