System architecture diagram
Start process activity diagram
MasterServer
MasterServer adopts a distributed and decentralized design concept. MasterServer is mainly responsible for DAG task segmentation, task submission monitoring, and monitoring the health status of other MasterServer and WorkerServer instances at the same time. When the MasterServer service starts, it registers a temporary node with ZooKeeper and performs fault tolerance by monitoring changes to ZooKeeper's temporary nodes. MasterServer provides monitoring services based on netty.
Distributed Quartz is the distributed scheduling component, which is mainly responsible for starting and stopping scheduled tasks. When Quartz triggers a task, a thread pool inside the Master handles the subsequent processing of the task.
MasterSchedulerThread is a scanning thread that regularly scans the command table in the database and runs different business operations according to the command type (a sketch of this scan-and-dispatch loop follows below).
MasterExecThread is mainly responsible for DAG task segmentation, task submission monitoring, and the logical processing of different command types.
MasterTaskExecThread is mainly responsible for the persistence of tasks.
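As an illustration of the scan-and-dispatch loop described for MasterSchedulerThread, here is a minimal, hypothetical sketch. Command, CommandSource, and the pool size are stand-ins for illustration only, not DolphinScheduler's actual classes or configuration:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Illustrative sketch only; Command and CommandSource are hypothetical stand-ins for the DB-backed command table. */
public class MasterSchedulerSketch {

    record Command(int id, String type) {}

    interface CommandSource {
        Command poll();   // returns the next command row, or null if the table is empty
    }

    private final CommandSource commands;
    private final ExecutorService workflowExecutors = Executors.newFixedThreadPool(10);
    private volatile boolean running = true;

    MasterSchedulerSketch(CommandSource commands) {
        this.commands = commands;
    }

    void scanLoop() throws InterruptedException {
        while (running) {
            Command command = commands.poll();       // periodically scan the command table
            if (command == null) {
                Thread.sleep(1_000);                 // nothing queued, back off and rescan
                continue;
            }
            // dispatch to an exec thread that splits the DAG and submits the tasks
            workflowExecutors.submit(() ->
                    System.out.println("handling command " + command.id() + " of type " + command.type()));
        }
    }
}
```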
WorkerServer
WorkerServer also adopts a distributed and decentralized design concept. WorkerServer is mainly responsible for task execution and providing log services.
When the WorkerServer service starts, it registers a temporary node with ZooKeeper and maintains a heartbeat. WorkerServer provides monitoring services based on netty.
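A sketch of how such ephemeral registration might look with Apache Curator; the connect string and registry path here are illustrative assumptions, not DolphinScheduler's actual values:

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class ServerRegistrySketch {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // register an ephemeral node; it disappears automatically when the ZooKeeper session is lost,
        // which is how other servers detect the failure and trigger fault tolerance
        String node = client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath("/dolphinscheduler/workers/worker-host-1");   // illustrative path
        System.out.println("registered: " + node);
    }
}
```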
ZooKeeper
The MasterServer and WorkerServer nodes in the system all use the ZooKeeper service for cluster management and fault tolerance. In addition, the system implements event monitoring and distributed locks based on ZooKeeper.
We have also implemented queues based on Redis, but we hope DolphinScheduler depends on as few components as possible, so we finally removed the Redis implementation.
Task Queue
Provides task queue operations; the current queue is also implemented based on ZooKeeper. Since little information is stored per queue entry, there is no need to worry about excessive data in the queue. In fact, we have tested storing millions of entries in the queue, which had no impact on system stability or performance.
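A minimal sketch of a ZooKeeper-backed queue, assuming Apache Curator and an illustrative queue path; the real DolphinScheduler implementation differs in detail:

```java
import java.util.Collections;
import java.util.List;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

/** Minimal sketch of a ZooKeeper-backed task queue; path and key format are illustrative. */
public class ZkTaskQueueSketch {
    private static final String QUEUE_PATH = "/dolphinscheduler/tasks_queue"; // illustrative path
    private final CuratorFramework client;

    public ZkTaskQueueSketch(String connectString) {
        this.client = CuratorFrameworkFactory.newClient(connectString, new ExponentialBackoffRetry(1000, 3));
        this.client.start();
    }

    /** Enqueue: each task becomes a child node whose name carries its priority key. */
    public void add(String taskKey) throws Exception {
        client.create().creatingParentsIfNeeded()
              .withMode(CreateMode.PERSISTENT)
              .forPath(QUEUE_PATH + "/" + taskKey);
    }

    /** Dequeue: list the children, sort them, take the smallest key, and delete its node. */
    public String poll() throws Exception {
        List<String> children = client.getChildren().forPath(QUEUE_PATH);
        if (children.isEmpty()) {
            return null;
        }
        Collections.sort(children);                 // string comparison picks the head of the queue
        String head = children.get(0);
        client.delete().forPath(QUEUE_PATH + "/" + head);
        return head;
    }
}
```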
Alert
Provides alarm-related interfaces, which mainly include the storage, query, and notification functions for the two types of alarm data. Among the notification functions, there are email notification and SNMP (not yet implemented).
API
The API interface layer is mainly responsible for processing requests from the front-end UI layer. The service uniformly provides RESTful APIs to external callers. Interfaces include workflow creation, definition, query, modification, release, taking offline, manual start, stop, pause, resume, starting execution from a specific node, etc.
UI
The front-end pages of the system provide its various visual operation interfaces; see the Introduction to Functions section for more.
The centralized design concept is relatively simple. The nodes in the distributed cluster are roughly divided into two roles according to their responsibilities: a Master that is responsible for scheduling and distributing work, and Slaves (Workers) that execute it.
The main problem with the centralized design is that the Master becomes a single point of pressure and failure.
DolphinScheduler uses a ZooKeeper distributed lock to ensure that only one Master executes the Scheduler at the same time, or only one Worker executes the submission of tasks.
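The lock pattern can be sketched with Apache Curator's InterProcessMutex; the connect string and lock path below are illustrative assumptions, not DolphinScheduler's actual values:

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

/** Minimal sketch of the ZooKeeper distributed-lock pattern using Curator. */
public class SchedulerLockSketch {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        InterProcessMutex mutex = new InterProcessMutex(client, "/dolphinscheduler/lock/masters"); // illustrative path
        mutex.acquire();                 // blocks until this Master holds the lock
        try {
            // only the lock holder scans the command table and submits workflows here
            System.out.println("this master is the active scheduler");
        } finally {
            mutex.release();             // always release so another Master can take over
        }
    }
}
```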
It seems a bit unsatisfactory to start a new Master to break the deadlock, so we proposed the following three solutions to reduce this risk:
Note: The Master Scheduler thread executes in FIFO order when acquiring Commands.
So we chose the third way to solve the problem of insufficient threads.
Fault tolerance is divided into service downtime fault tolerance and task retry; service downtime fault tolerance is further divided into Master fault tolerance and Worker fault tolerance.
The service fault-tolerance design relies on ZooKeeper's Watcher mechanism; the implementation principle is shown in the fault-tolerance figure.
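As an illustrative sketch of Watcher-based detection, a Curator PathChildrenCache on the registry path can report removed server nodes; the paths and handling below are assumptions, not DolphinScheduler's actual registry code:

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;

/** Illustrative sketch of watching the registry path for removed server nodes. */
public class DeadServerWatcherSketch {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // cache the children of the workers registry path and react to removals
        PathChildrenCache cache = new PathChildrenCache(client, "/dolphinscheduler/workers", true);
        cache.getListenable().addListener((c, event) -> {
            if (event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
                String deadServer = event.getData().getPath();
                // a worker's ephemeral node vanished: start fault tolerance for its task instances
                System.out.println("worker removed, starting fault tolerance for: " + deadServer);
            }
        });
        cache.start();
        Thread.sleep(Long.MAX_VALUE);   // keep the demo process alive to receive events
    }
}
```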
Fault tolerance range: from the perspective of hosts, the Master's fault tolerance range includes its own host and any node host that no longer exists in the registry, and the entire fault tolerance process is performed under a lock;
Fault-tolerant content: the Master's fault-tolerant content includes fault-tolerant process instances and task instances. Before fault tolerance, it compares the start time of the instance with the server start-up time, and skips fault tolerance if the instance started after the server start time;
Fault-tolerant post-processing: after ZooKeeper Master fault tolerance is completed, the Scheduler thread in DolphinScheduler re-schedules and traverses the DAG to find the "running" and "submitted successfully" tasks. For "running" tasks it monitors the status of their task instances; for "submitted successfully" tasks it needs to check whether the task already exists in the task queue. If it exists, it likewise monitors the status of the task instance; otherwise, it resubmits the task instance.
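The post-processing logic above can be summarised in a small, hypothetical sketch; the types, states, and method names are stand-ins, not DolphinScheduler's real classes:

```java
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of the post-fault-tolerance pass over a recovered process instance's tasks. */
public class FaultTolerancePostProcessSketch {

    enum State { RUNNING, SUBMITTED_SUCCESS, OTHER }
    record Task(int id, State state) {}

    static void reprocess(List<Task> tasks, Set<Integer> taskQueueIds) {
        for (Task task : tasks) {
            switch (task.state()) {
                case RUNNING ->
                        System.out.println("monitor running task " + task.id());
                case SUBMITTED_SUCCESS -> {
                    if (taskQueueIds.contains(task.id())) {
                        System.out.println("task " + task.id() + " already in queue, monitor it");
                    } else {
                        System.out.println("task " + task.id() + " missing from queue, resubmit it");
                    }
                }
                default ->
                        System.out.println("task " + task.id() + " handled by normal scheduling");
            }
        }
    }
}
```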
Fault tolerance range: from the perspective of process instances, each Master is only responsible for fault tolerance of its own process instances; it acquires a lock only when calling handleDeadServer;
Fault-tolerant content: when the remove event of a Worker node is received, the Master only performs fault tolerance on task instances. Before fault tolerance, it compares the start time of the instance with the server start-up time, and skips fault tolerance if the instance started after the server start time;
Fault-tolerant post-processing: once the Master Scheduler thread finds a task instance in the "fault-tolerant" state, it takes over the task and resubmits it.
Note: due to "network jitter", a node may briefly lose its heartbeat with ZooKeeper, triggering a remove event for that node. For this situation, we use the simplest approach: once a node's connection to ZooKeeper times out, the Master or Worker service on that node is stopped directly.
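A minimal sketch of this "stop on session loss" behaviour, assuming Apache Curator's connection state listener; the shutdown action here is simplified to System.exit:

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;

/** Illustrative sketch of stopping the server when the ZooKeeper session is lost. */
public class StopOnSessionLossSketch {
    public static void main(String[] args) {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));

        client.getConnectionStateListenable().addListener((c, newState) -> {
            if (newState == ConnectionState.LOST) {
                // the session timed out: rather than risk two servers acting on the same work,
                // stop this Master/Worker process and let fault tolerance take over
                System.err.println("ZooKeeper session lost, shutting down this server");
                System.exit(1);
            }
        });
        client.start();
    }
}
```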
Here we must first distinguish the concepts of task failure retry, process failure recovery, and process failure re-run: task failure retry works at the task level and is performed automatically by the scheduling system, while process failure recovery and process failure re-run work at the process level and are triggered manually.
Back to the main point: we divide the task nodes in the workflow into two types.
One is a business node, which corresponds to an actual script or process command, such as a Shell node, MR node, Spark node, or dependent node.
The other is a logical node, which does not execute an actual script or process command, but only performs logical processing of the overall process flow, such as a sub-process node.
Each business node can be configured with a number of failed retries. When a task node fails, it will automatically retry until it succeeds or exceeds the configured retry count. Logical nodes do not support failure retry, but the tasks inside a logical node do.
If a task in the workflow fails and reaches its maximum retry count, the workflow fails and stops, and the failed workflow can then be manually re-run or recovered.
In the early scheduling design, if there were no priority design and fair scheduling was used, a task submitted first might complete at the same time as a task submitted later, making the priority of a process or task meaningless. So we redesigned this, and the following is our current design:
The specific implementation is to parse the priority according to the JSON of the task instance, and then save the process instance priority_process instance id_task priority_task id information to the ZooKeeper task queue. When obtaining tasks from the queue, the highest-priority task can be found by string comparison (a sketch of this key comparison follows the screenshot below).
- The priority of the process definition takes into account that some processes need to be processed before others. This priority can be configured when the process is started or scheduled to start. There are 5 levels in total: HIGHEST, HIGH, MEDIUM, LOW, and LOWEST, as shown below.
<p align="center">
<img src="https://user-images.githubusercontent.com/10797147/146744784-eb351b14-c94a-4ed6-8ba4-5132c2a3d116.png" alt="Process priority configuration" width="40%" />
</p>
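A small sketch of the priority key and its string comparison, assuming priorities are encoded as ordinal numbers (HIGHEST = 0 ... LOWEST = 4) so that plain lexicographic comparison matches priority order; the field order follows the format described above, and the numbers are illustrative:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Illustrative sketch of the priority key used in the ZooKeeper task queue. */
public class TaskPriorityKeySketch {

    enum Priority { HIGHEST, HIGH, MEDIUM, LOW, LOWEST }   // ordinal() gives 0..4

    static String buildKey(Priority processPriority, int processInstanceId,
                           Priority taskPriority, int taskInstanceId) {
        // processInstancePriority_processInstanceId_taskPriority_taskInstanceId
        // note: a real implementation must also handle ids of different digit lengths
        return processPriority.ordinal() + "_" + processInstanceId + "_"
                + taskPriority.ordinal() + "_" + taskInstanceId;
    }

    public static void main(String[] args) {
        List<String> queue = new ArrayList<>();
        queue.add(buildKey(Priority.MEDIUM, 10, Priority.LOW, 101));
        queue.add(buildKey(Priority.HIGH, 11, Priority.HIGHEST, 102));
        queue.add(buildKey(Priority.HIGH, 11, Priority.MEDIUM, 103));

        Collections.sort(queue);       // lexicographic order == priority order for these keys
        System.out.println("next task: " + queue.get(0));   // the HIGH process / HIGHEST task
    }
}
```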
Since the Web (UI) and the Worker are not necessarily on the same machine, viewing logs cannot be done the same way as querying a local file. There are two options:
Put logs on the ES search engine.
Obtain remote log information through netty communication.
To keep DolphinScheduler as lightweight as possible, gRPC was chosen to achieve remote access to log information.
```java
/**
 * task log appender
 */
public class TaskLogAppender extends FileAppender<ILoggingEvent> {

    ...

    @Override
    protected void append(ILoggingEvent event) {
        if (currentlyActiveFile == null) {
            currentlyActiveFile = getFile();
        }
        String activeFile = currentlyActiveFile;
        // thread name: taskThreadName-processDefineId_processInstanceId_taskInstanceId
        String threadName = event.getThreadName();
        String[] threadNameArr = threadName.split("-");
        // logId = processDefineId_processInstanceId_taskInstanceId
        String logId = threadNameArr[1];
        ...
        super.subAppend(event);
    }
}
```
Logs are generated in the form of /process definition id/process instance id/task instance id.log
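A trivial sketch of composing that log path; the base directory is an assumption for illustration:

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Minimal sketch of composing the task log path described above. */
public class TaskLogPathSketch {
    static Path logPath(String baseDir, int processDefineId, int processInstanceId, int taskInstanceId) {
        // <base>/<process definition id>/<process instance id>/<task instance id>.log
        return Paths.get(baseDir,
                String.valueOf(processDefineId),
                String.valueOf(processInstanceId),
                taskInstanceId + ".log");
    }

    public static void main(String[] args) {
        System.out.println(logPath("/tmp/dolphinscheduler/logs", 1, 1001, 1));
        // -> /tmp/dolphinscheduler/logs/1/1001/1.log
    }
}
```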
Filter to match the thread name starting with TaskLogInfo:
The following shows the TaskLogFilter implementation:
```java
/**
 * task log filter
 */
public class TaskLogFilter extends Filter<ILoggingEvent> {

    @Override
    public FilterReply decide(ILoggingEvent event) {
        if (event.getThreadName().startsWith("TaskLogInfo-")) {
            return FilterReply.ACCEPT;
        }
        return FilterReply.DENY;
    }
}
```
From the perspective of scheduling, this article has given a preliminary introduction to the architecture principles and implementation ideas of DolphinScheduler, a distributed workflow scheduling system for big data. To be continued.