Before explaining the architecture of the scheduling system, let's first understand the terms commonly used in it
DAG: Short for Directed Acyclic Graph. Tasks in a workflow are assembled as a directed acyclic graph, and a topological traversal is performed starting from the nodes with zero in-degree until there are no successor nodes (a minimal traversal sketch follows this glossary). An example is shown below:
dag example
Process definition: the visual DAG formed by dragging task nodes onto the canvas and establishing associations between them
Process instance: an instantiation of a process definition, generated either by a manual start or by scheduled triggering. Each run of a process definition produces one process instance
Task instance: an instantiation of a task node in a process definition, identifying the execution status of a specific task
Task type: currently SHELL, SQL, SUB_PROCESS (sub-process), PROCEDURE, MR, SPARK, PYTHON and DEPENDENT (dependency) are supported, and dynamic plug-in expansion is planned. Note: a SUB_PROCESS is itself a separate process definition that can be started and executed on its own
Scheduling method: the system supports scheduled scheduling based on cron expressions as well as manual scheduling. Supported command types: start workflow, start execution from current node, resume fault-tolerant workflow, resume paused process, start execution from failed node, complement, timing, rerun, pause, stop, resume waiting thread. Among them, resume fault-tolerant workflow and resume waiting thread are used internally by the scheduler and cannot be called from the outside
Scheduled: the system adopts the Quartz distributed scheduler and supports visual generation of cron expressions
Dependency: the system not only supports simple dependencies between predecessor and successor nodes in the DAG, but also provides task-dependent nodes, supporting dependencies between processes
Priority: supports setting the priority of process instances and task instances; if no priority is set, the default is first-in, first-out
Email alert: supports sending SQL task query results by email, as well as email alerts for process instance run results and fault-tolerance alert notifications
Failure strategy: for tasks running in parallel, if one task fails, two failure handling strategies are provided. Continue means the process keeps running, regardless of the status of the parallel tasks, until it ends. End means that once a failed task is found, the tasks running in parallel are killed as well and the process ends in failure
Complement: supplements (backfills) historical data, supporting both parallel and serial complement over a date interval
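The in-degree based traversal mentioned in the DAG entry can be sketched as follows. This is a minimal, generic example of Kahn's algorithm using plain Java collections; the node names and the traverse method are illustrative placeholders, not DolphinScheduler's actual DAG classes.

import java.util.*;

public class TopologicalTraversal {

    /**
     * Visit the nodes of a DAG in topological order: repeatedly take a node whose
     * in-degree is zero, emit it, and decrease the in-degree of its successors.
     * Returns an empty list if the graph contains a cycle (which a DAG forbids).
     */
    public static List<String> traverse(Map<String, List<String>> successors) {
        Map<String, Integer> inDegree = new HashMap<>();
        successors.keySet().forEach(n -> inDegree.putIfAbsent(n, 0));
        successors.values().forEach(list ->
                list.forEach(n -> inDegree.merge(n, 1, Integer::sum)));

        Deque<String> zeroInDegree = new ArrayDeque<>();
        inDegree.forEach((node, degree) -> { if (degree == 0) zeroInDegree.add(node); });

        List<String> order = new ArrayList<>();
        while (!zeroInDegree.isEmpty()) {
            String node = zeroInDegree.poll();
            order.add(node);
            for (String next : successors.getOrDefault(node, Collections.emptyList())) {
                if (inDegree.merge(next, -1, Integer::sum) == 0) {
                    zeroInDegree.add(next);
                }
            }
        }
        return order.size() == inDegree.size() ? order : Collections.emptyList();
    }

    public static void main(String[] args) {
        Map<String, List<String>> dag = new HashMap<>();
        dag.put("A", Arrays.asList("B", "C"));
        dag.put("B", Collections.singletonList("D"));
        dag.put("C", Collections.singletonList("D"));
        dag.put("D", Collections.emptyList());
        System.out.println(traverse(dag)); // e.g. [A, B, C, D]
    }
}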
System architecture diagram
Start process activity diagram
MasterServer
MasterServer adopts a distributed, decentralized design concept. MasterServer is mainly responsible for DAG task segmentation, task submission and monitoring, and at the same time monitors the health status of other MasterServer and WorkerServer instances. When the MasterServer service starts, it registers a temporary node with ZooKeeper and performs fault tolerance by monitoring changes to ZooKeeper's temporary nodes. MasterServer provides monitoring services based on netty.
Distributed Quartz: the distributed scheduling component, mainly responsible for starting and stopping scheduled tasks. When Quartz triggers a task, a thread pool inside the Master handles the subsequent processing of the task
MasterSchedulerThread: a scanning thread that periodically scans the command table in the database and performs different business operations according to the command type (see the sketch after this list)
MasterExecThread: mainly responsible for DAG task segmentation, task submission and monitoring, and the logical processing of the various command types
MasterTaskExecThread: mainly responsible for task persistence
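To make the scanning behaviour described above more concrete, here is a minimal sketch of a command-polling loop. CommandDao, Command and handleCommand are hypothetical placeholders used only for illustration; this is not the actual MasterSchedulerThread implementation.

import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a scheduler loop that scans a command table
// and dispatches work by command type.
public class CommandScannerSketch implements Runnable {

    interface CommandDao {                    // placeholder for the DAO layer
        List<Command> findWaitingCommands();
    }

    static class Command {                    // placeholder command record
        String type;                          // e.g. START_PROCESS, REPEAT_RUNNING ...
    }

    private final CommandDao commandDao;
    private volatile boolean running = true;

    CommandScannerSketch(CommandDao commandDao) {
        this.commandDao = commandDao;
    }

    @Override
    public void run() {
        while (running) {
            try {
                // periodically scan the command table
                for (Command command : commandDao.findWaitingCommands()) {
                    handleCommand(command);   // perform different business logic per type
                }
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                running = false;
            }
        }
    }

    private void handleCommand(Command command) {
        // in the real system this would split the DAG, create a process instance, etc.
        System.out.println("handling command of type " + command.type);
    }
}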
WorkerServer
WorkerServer also adopts a distributed and decentralized design concept. WorkerServer is mainly responsible for task execution and providing log services.
When the WorkerServer service starts, it registers a temporary node with ZooKeeper and maintains a heartbeat. WorkerServer provides monitoring services based on netty.
FetchTaskThread: mainly responsible for continuously fetching tasks from the Task Queue and calling TaskScheduleThread to run the executor corresponding to the task type (see the sketch after this list).
LoggerServer: an RPC service that provides functions such as viewing, refreshing and downloading log fragments
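The fetch-and-dispatch loop described above can be sketched as follows. The blocking queue, the TaskExecutor interface and resolveExecutor are simplified stand-ins for the real FetchTaskThread / TaskScheduleThread interaction, not DolphinScheduler code.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;

// Hypothetical sketch of a worker loop that keeps taking tasks from a queue
// and hands each one to an executor chosen by task type.
public class FetchTaskLoopSketch implements Runnable {

    interface TaskExecutor { void execute(String taskJson); }

    private final BlockingQueue<String> taskQueue;   // stand-in for the ZooKeeper task queue
    private final ExecutorService workerPool;        // pool running the task executors
    private volatile boolean running = true;

    FetchTaskLoopSketch(BlockingQueue<String> taskQueue, ExecutorService workerPool) {
        this.taskQueue = taskQueue;
        this.workerPool = workerPool;
    }

    @Override
    public void run() {
        while (running) {
            try {
                String taskJson = taskQueue.take();                // block until a task arrives
                TaskExecutor executor = resolveExecutor(taskJson); // pick executor by task type
                workerPool.submit(() -> executor.execute(taskJson));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                running = false;
            }
        }
    }

    private TaskExecutor resolveExecutor(String taskJson) {
        // in the real system the task type (SHELL, SQL, SPARK ...) decides the executor
        return json -> System.out.println("executing task: " + json);
    }
}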
ZooKeeper
ZooKeeper service: the MasterServer and WorkerServer nodes in the system all use ZooKeeper for cluster management and fault tolerance. In addition, the system performs event monitoring and distributed locking based on ZooKeeper.
We have also implemented queues based on Redis, but we hope that DolphinScheduler depends on as few components as possible, so we finally removed the Redis implementation.
Task Queue
Provides task queue operations; the current queue is also implemented on ZooKeeper. Because the queue stores only a small amount of information per entry, there is no need to worry about too much data accumulating in it. In fact, we have tested the queue with millions of entries, with no impact on system stability or performance.
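As an illustration of how a queue can be built on ZooKeeper nodes, the sketch below uses Apache Curator: adding a task creates a child znode, and polling lists the children, sorts them, and deletes the chosen one. The znode path and class names are assumptions for illustration, not the actual DolphinScheduler queue implementation, and concurrent consumers are ignored for brevity.

import java.util.Collections;
import java.util.List;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

// Hypothetical ZooKeeper-backed task queue: each queued task is a child znode.
public class ZkTaskQueueSketch {

    private static final String QUEUE_ROOT = "/dolphinscheduler/tasks_queue"; // illustrative path
    private final CuratorFramework client;

    public ZkTaskQueueSketch(String zkAddress) {
        this.client = CuratorFrameworkFactory.newClient(zkAddress, new ExponentialBackoffRetry(1000, 3));
        this.client.start();
    }

    /** add a task: the node name itself carries the queue value */
    public void add(String taskValue) throws Exception {
        client.create()
              .creatingParentsIfNeeded()
              .withMode(CreateMode.PERSISTENT)
              .forPath(QUEUE_ROOT + "/" + taskValue, new byte[0]);
    }

    /** poll the "smallest" task: list children, sort, remove and return the first */
    public String poll() throws Exception {
        List<String> children = client.getChildren().forPath(QUEUE_ROOT);
        if (children.isEmpty()) {
            return null;
        }
        Collections.sort(children);            // string order decides which task comes first
        String head = children.get(0);
        client.delete().forPath(QUEUE_ROOT + "/" + head);
        return head;
    }
}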
Alert
Provides alarm-related interfaces, mainly covering the storage, querying, and notification of the two types of alarm data. Notification channels include email and SNMP (not yet implemented).
API
The API interface layer is mainly responsible for processing requests from the front-end UI layer. The service uniformly provides RESTful APIs to the outside world. The interfaces include workflow creation, definition, query, modification, release, taking offline, manual start, stop, pause, resume, start execution from a given node, and so on.
UI
The front-end pages of the system provide various visual operation interfaces of the system; see the System User Manual section for more details.
The centralized design concept is relatively simple: the nodes in the distributed cluster are divided by role, roughly into two roles:
Problems with the centralized design:
In a decentralized design, there is usually no concept of Master/Slave: all roles are the same and have equal status. The global Internet is a typical decentralized distributed system; if any node device connected to the network goes down, only a small range of functions is affected.
The core of a decentralized design is that there is no "manager" distinct from the other nodes in the distributed system, so there is no single point of failure. However, because there is no "manager" node, each node needs to communicate with other nodes to obtain the necessary machine information, and the unreliability of communication in a distributed system greatly increases the difficulty of implementing the functions above.
In fact, truly decentralized distributed systems are rare. Instead, dynamically centralized distributed systems keep emerging. Under this architecture, the managers in the cluster are dynamically elected rather than preset, and when the cluster fails, the nodes of the cluster automatically hold "meetings" to elect new "managers" to preside over the work. The most typical cases are ZooKeeper, and Etcd implemented in the Go language.
The decentralization of DolphinScheduler means that the Masters/Workers register themselves in ZooKeeper, so that the Master cluster and the Worker cluster are leaderless, and a ZooKeeper distributed lock is used to elect one of the Masters or Workers as the "manager" to perform the task.
DolphinScheduler uses a ZooKeeper distributed lock to ensure that only one Master executes the Scheduler at a time, or that only one Worker performs task submission at a time.
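A minimal sketch of this mutual exclusion using Curator's InterProcessMutex recipe is shown below; the ZooKeeper address, the lock path and the work done inside the critical section are illustrative assumptions, not the actual DolphinScheduler code.

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

// Hypothetical sketch: only the Master that holds the ZooKeeper lock runs the scheduler step.
public class MasterLockSketch {

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // illustrative lock path; every Master competes for the same path
        InterProcessMutex lock = new InterProcessMutex(client, "/dolphinscheduler/lock/masters");
        lock.acquire();                         // blocks until this Master holds the lock
        try {
            // critical section: e.g. take the next command and split its DAG
            System.out.println("this master is the current 'manager'");
        } finally {
            lock.release();                     // let another Master take over
        }
        client.close();
    }
}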
Starting a new Master to break the deadlock seems somewhat unsatisfactory, so we proposed the following three solutions to reduce this risk:
Note: the Master Scheduler thread acquires Commands in FIFO order.
So we chose the third way to solve the problem of insufficient threads.
Fault tolerance is divided into service-downtime fault tolerance and task retry; service-downtime fault tolerance is further divided into Master fault tolerance and Worker fault tolerance.
The service fault-tolerance design relies on ZooKeeper's Watcher mechanism, and the implementation principle is shown in the figure:
Once the Master Scheduler thread finds that the task instance is in the "fault tolerant" state, it takes over the task and resubmits it.
Note: due to "network jitter", a node may briefly lose its heartbeat with ZooKeeper, which triggers a remove event for that node. For this situation we use the simplest approach: once a node's connection to ZooKeeper times out, the Master or Worker service is stopped directly.
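The Watcher-based detection can be sketched with Curator's PathChildrenCache: when a registered temporary node disappears, a CHILD_REMOVED event fires and fault-tolerance handling can start. The watched path and the reaction below are illustrative assumptions, not DolphinScheduler's actual fault-tolerance code.

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;

// Hypothetical sketch: watch the znodes registered by Master/Worker services and
// trigger fault-tolerance handling when one of them is removed (service went down).
public class NodeRemovalWatcherSketch {

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        String watchedPath = "/dolphinscheduler/nodes/worker";   // illustrative path
        PathChildrenCache cache = new PathChildrenCache(client, watchedPath, true);
        cache.getListenable().addListener((c, event) -> {
            if (event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
                // a temporary node disappeared: mark its tasks as "fault tolerant"
                // so the Master Scheduler thread can take them over and resubmit them
                System.out.println("node down: " + event.getData().getPath());
            }
        });
        cache.start();
        Thread.sleep(Long.MAX_VALUE);   // keep the demo process alive
    }
}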
Here we must first distinguish the concepts of task failure retry, process failure recovery, and process failure rerun:
Back to the topic: we divide the task nodes in the workflow into two types.
One is the business node, which corresponds to an actual script or processing statement, such as the Shell node, MR node, Spark node, and dependent node.
The other is the logical node, which does not execute an actual script or statement but only performs logical processing of the overall process flow, such as the sub-process node.
Each business node can be configured with a number of failure retries. When the task node fails, it automatically retries until it succeeds or the configured number of retries is exceeded. Logical nodes do not support failure retry, but the tasks inside a logical node do support retry.
If a task in the workflow fails and reaches its maximum number of retries, the workflow fails and stops, and the failed workflow can then be rerun manually or recovered from the point of failure.
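The retry rule for business nodes can be summarized in a small sketch: run the task and, on failure, retry until it succeeds or the configured retry count is exhausted. The Task interface here is a placeholder, not a DolphinScheduler type.

// Hypothetical sketch of the per-task retry rule for business nodes.
public class TaskRetrySketch {

    interface Task { boolean run(); }   // placeholder: returns true on success

    /** run the task, retrying on failure until success or maxRetryTimes is exhausted */
    static boolean runWithRetry(Task task, int maxRetryTimes) {
        int attempt = 0;
        while (true) {
            if (task.run()) {
                return true;                   // task succeeded
            }
            attempt++;
            if (attempt > maxRetryTimes) {
                return false;                  // exceeded configured retries: task fails
            }
        }
    }

    public static void main(String[] args) {
        boolean ok = runWithRetry(() -> Math.random() > 0.7, 3);
        System.out.println(ok ? "task succeeded" : "task failed after retries");
    }
}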
In the early scheduling design, there was no priority design and fair scheduling was used: a task submitted first might finish at the same time as a task submitted later, and neither process nor task priority could be set. So we redesigned this, and the current design is as follows:
The specific implementation is to parse the priority from the JSON of the task instance, and then save the information process instance priority_process instance id_task priority_task id in the ZooKeeper task queue; when fetching from the task queue, a simple string comparison yields the tasks that need to be executed first.
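To illustrate the key format described above, the sketch below builds queue entries as processInstancePriority_processInstanceId_taskPriority_taskId strings and sorts them with a plain string comparison, assuming that lower numbers mean higher priority (e.g. HIGHEST = 0). The exact key layout in DolphinScheduler may differ; this only demonstrates the ordering idea.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration of ordering queue entries by plain string comparison.
// Key layout: processInstancePriority_processInstanceId_taskPriority_taskId
// (assuming HIGHEST = 0 ... LOWEST = 4; a pure string comparison also assumes
// the compared fields line up, e.g. single-digit priorities).
public class PriorityKeySketch {

    static String buildKey(int processPriority, int processInstanceId, int taskPriority, int taskId) {
        return processPriority + "_" + processInstanceId + "_" + taskPriority + "_" + taskId;
    }

    public static void main(String[] args) {
        List<String> queue = new ArrayList<>();
        queue.add(buildKey(2, 11, 3, 101));   // MEDIUM process, LOW task
        queue.add(buildKey(0, 12, 1, 102));   // HIGHEST process, HIGH task
        queue.add(buildKey(2, 11, 0, 103));   // MEDIUM process, HIGHEST task

        Collections.sort(queue);              // lexicographic order == execution order here
        System.out.println(queue);
        // [0_12_1_102, 2_11_0_103, 2_11_3_101] -> the HIGHEST-priority process runs first,
        // and within the same process the higher-priority task comes first
    }
}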
The priority of the process definition takes into account that some processes need to be handled before other processes; it can be configured when the process is started or scheduled to start. There are 5 levels in total: HIGHEST, HIGH, MEDIUM, LOW, and LOWEST. As shown below
The priority of a task is also divided into 5 levels: HIGHEST, HIGH, MEDIUM, LOW, LOWEST. As shown below
Since the Web (UI) and the Worker are not necessarily on the same machine, viewing the logs is not like querying a local file. There are two options:
Put logs on ES search engine
Obtain remote log information through netty communication
In order to keep DolphinScheduler as lightweight as possible, gRPC was chosen to achieve remote access to log information.
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.FileAppender;

/**
 * task log appender
 */
public class TaskLogAppender extends FileAppender<ILoggingEvent> {

    ...

    @Override
    protected void append(ILoggingEvent event) {
        if (currentlyActiveFile == null) {
            currentlyActiveFile = getFile();
        }
        String activeFile = currentlyActiveFile;
        // thread name: taskThreadName-processDefineId_processInstanceId_taskInstanceId
        String threadName = event.getThreadName();
        String[] threadNameArr = threadName.split("-");
        // logId = processDefineId_processInstanceId_taskInstanceId
        String logId = threadNameArr[1];
        ...
        super.subAppend(event);
    }
}
Logs are generated in the form /process definition id/process instance id/task instance id.log
The filter matches thread names that start with TaskLogInfo:
TaskLogFilter is implemented as follows:
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.filter.Filter;
import ch.qos.logback.core.spi.FilterReply;

/**
 * task log filter: accept only events from threads whose name starts with "TaskLogInfo-"
 */
public class TaskLogFilter extends Filter<ILoggingEvent> {

    @Override
    public FilterReply decide(ILoggingEvent event) {
        if (event.getThreadName().startsWith("TaskLogInfo-")) {
            return FilterReply.ACCEPT;
        }
        return FilterReply.DENY;
    }
}
dolphinscheduler-alert: alarm module, providing the AlertServer service.
dolphinscheduler-api: web application module, providing the ApiServer service.
dolphinscheduler-common: general constants, enumerations, utility classes, data structures and base classes.
dolphinscheduler-dao: provides operations such as database access.
dolphinscheduler-remote: netty-based client and server.
dolphinscheduler-server: MasterServer and WorkerServer services.
dolphinscheduler-service: service module, including Quartz, ZooKeeper and log client access services, making it easy for the server module and api module to call them.
dolphinscheduler-ui: front-end module.
From the perspective of scheduling, this article has given a preliminary introduction to the architecture principles and implementation ideas of the big data distributed workflow scheduling system DolphinScheduler. To be continued.