System Architecture Design

Before explaining the architecture of the scheduling system, let's first go over its commonly used terms.

1.Glossary

DAG: Short for Directed Acyclic Graph. The tasks in a workflow are assembled as a directed acyclic graph, which is traversed topologically starting from the nodes with an in-degree of zero until no successor nodes remain. An example follows:

dag example
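
The traversal described above can be sketched with a queue of zero in-degree nodes (Kahn's algorithm). This is only an illustration: the class name `DagTraversalSketch` and the map-based graph representation are assumptions made for the example, not DolphinScheduler's own data structures.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper, not part of DolphinScheduler: topological traversal of a DAG. */
class DagTraversalSketch {

    /**
     * @param edges map from a task node to the nodes that directly depend on it
     * @return the nodes in an order where each node comes after all of its predecessors
     */
    static List<String> topologicalOrder(Map<String, List<String>> edges) {
        // Count the incoming edges (in-degree) of every node.
        Map<String, Integer> inDegree = new HashMap<>();
        for (Map.Entry<String, List<String>> e : edges.entrySet()) {
            inDegree.putIfAbsent(e.getKey(), 0);
            for (String next : e.getValue()) {
                inDegree.merge(next, 1, Integer::sum);
            }
        }
        // Start from the nodes with an in-degree of zero.
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> e : inDegree.entrySet()) {
            if (e.getValue() == 0) {
                ready.add(e.getKey());
            }
        }
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String node = ready.poll();
            order.add(node);
            // "Removing" the node lowers the in-degree of each successor.
            for (String next : edges.getOrDefault(node, Collections.emptyList())) {
                if (inDegree.merge(next, -1, Integer::sum) == 0) {
                    ready.add(next);
                }
            }
        }
        if (order.size() != inDegree.size()) {
            throw new IllegalArgumentException("graph contains a cycle");
        }
        return order;
    }
}
```

If some nodes are never emitted, the graph has a cycle and is not a valid DAG, which is why the sketch rejects that case.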

Process definition: The visual DAG formed by dragging task nodes and establishing associations between them.

Process instance: An instantiation of a process definition, generated either by a manual start or by scheduled triggering. Each run of a process definition generates one process instance.

Task instance: An instantiation of a task node in a process definition, which records the execution status of that specific task.

Task type: Currently supports SHELL, SQL, SUB_PROCESS (sub-process), PROCEDURE, MR, SPARK, PYTHON, and DEPENDENT (dependency), with dynamic plug-in expansion planned. Note: a SUB_PROCESS is itself a separate process definition that can be started and executed on its own.

Scheduling method: The system supports scheduled triggering based on cron expressions as well as manual scheduling. Supported command types: start workflow, start execution from current node, resume fault-tolerant workflow, resume paused process, start execution from failed node, complement, timing, rerun, pause, stop, resume waiting thread. Of these, "resume fault-tolerant workflow" and "resume waiting thread" are used internally by the scheduler and cannot be called from outside.

Scheduled: The system uses the Quartz distributed scheduler and supports visual generation of cron expressions.

Dependency: The system supports not only simple DAG dependencies between predecessor and successor nodes but also task dependent nodes, which support dependencies between processes.

Priority: Priorities are supported for both process instances and task instances. If no priority is set, the default is first-in-first-out.

Email alert: Supports emailing SQL task query results, email alerts for process instance run results, and fault tolerance alert notifications.

Failure strategy: For tasks running in parallel, two strategies are provided for handling a task failure. Continue means the other parallel tasks keep running regardless of the failure, and the process is marked as failed when it ends. End means that as soon as a failed task is found, the tasks running in parallel are killed and the process fails and ends.

Complement: Backfills historical data. Two complement methods are supported over a date interval: parallel and serial.

2.System Structure

2.1 System architecture diagram

System architecture diagram

2.2 Start process activity diagram

Start process activity diagram

2.3 Architecture description

  • MasterServer

    MasterServer adopts a distributed, decentralized design. It is mainly responsible for DAG task segmentation and for monitoring task submission, and it also monitors the health of the other MasterServers and the WorkerServers. When the MasterServer service starts, it registers a temporary (ephemeral) node with ZooKeeper and performs fault tolerance by watching for changes to ZooKeeper's temporary nodes. MasterServer provides monitoring services based on netty.

    The service mainly includes:
    • A distributed Quartz scheduling component, mainly responsible for starting and stopping scheduled tasks. When Quartz triggers a task, a thread pool inside the Master handles the task's follow-up operations

    • MasterSchedulerThread is a scanning thread that regularly scans the command table in the database and performs different business operations according to different command types

    • MasterExecThread is mainly responsible for DAG task segmentation, task submission monitoring, and logical processing of various command types

    • MasterTaskExecThread is mainly responsible for the persistence of tasks

  • WorkerServer

    WorkerServer also adopts a distributed and decentralized design concept. WorkerServer is mainly responsible for task execution and providing log services.

    When the WorkerServer service starts, it registers a temporary node with ZooKeeper and maintains a heartbeat. WorkerServer provides monitoring services based on netty.

    The service mainly includes:
    • FetchTaskThread is mainly responsible for continuously fetching tasks from the Task Queue and calling the TaskScheduleThread executor that corresponds to each task type.
  • ZooKeeper

    The MasterServer and WorkerServer nodes in the system all use ZooKeeper for cluster management and fault tolerance. In addition, the system relies on ZooKeeper for event monitoring and distributed locks.

    We also implemented queues based on Redis, but because we want DolphinScheduler to depend on as few components as possible, we eventually removed the Redis implementation.

  • Task Queue

    Provides task queue operations; the current queue is also implemented on ZooKeeper. Because each queue entry stores little information, there is no need to worry about too much data in the queue. In fact, we have tested queues holding millions of entries, with no impact on system stability or performance.

  • Alert

    Provides alarm-related interfaces, which mainly cover the storage, query, and notification of alarm data. There are two types of notification: email and SNMP (not yet implemented).

  • API

    The API interface layer is mainly responsible for processing requests from the front-end UI layer. The service exposes RESTful APIs to external callers. The interfaces include workflow creation, definition, query, modification, release, taking offline, manual start, stop, pause, resume, starting execution from a given node, and so on.

  • UI

    The front-end pages of the system provide its various visual operation interfaces. See the Introduction to Functions section for more.

2.4 Architecture design ideas

One. Decentralization vs. centralization
Centralized thinking

The centralized design concept is relatively simple. The nodes in the distributed cluster are divided by role, roughly into two kinds:

master-slave character

  • The master is mainly responsible for distributing tasks and monitoring the health of the slaves, and can dynamically balance tasks across the slaves so that no slave node is either overloaded or sitting idle.
  • The slave (Worker) is mainly responsible for executing tasks and maintaining a heartbeat with the master, so that the master can assign tasks to it.

Problems in centralized thought design:

  • Once the Master fails, the cluster is left leaderless and collapses entirely. To solve this problem, most Master/Slave architectures adopt an active/standby Master design, which can be hot or cold standby and can switch automatically or manually. More and more new systems are able to elect and switch the Master automatically to improve availability.
  • Another problem is where the Scheduler runs. If it is on the Master, different tasks in one DAG can run on different machines, but the Master becomes overloaded. If it is on the Slave, all tasks in one DAG can only be submitted from a single machine, and with many parallel tasks the pressure on that Slave may be high.
Decentralized

Decentralization

  • In a decentralized design there is usually no Master/Slave concept: all roles are the same and all nodes have equal status. The global Internet is a typical decentralized distributed system; when any node connected to the network goes down, only a small range of functions is affected.

  • The core of decentralized design is that no "manager" distinct from the other nodes exists anywhere in the distributed system, so there is no single point of failure. However, because there is no "manager" node, each node must communicate with other nodes to obtain the machine information it needs, and the unreliability of communication in a distributed system greatly increases the difficulty of implementing the functions above.

  • In fact, truly decentralized distributed systems are rare. Instead, dynamically centralized distributed systems keep emerging. Under this architecture the managers of the cluster are elected dynamically rather than preset, and when the cluster fails, its nodes automatically hold a "meeting" to elect a new "manager" to preside over the work. The most typical examples are ZooKeeper and Etcd, which is implemented in Go.

  • The decentralization of DolphinScheduler means that the Masters and Workers register themselves in ZooKeeper, neither the Master cluster nor the Worker cluster has a center, and a ZooKeeper distributed lock is used to elect one Master or Worker as the "manager" that performs the task.

Two. Distributed lock practice

DolphinScheduler uses a ZooKeeper distributed lock to ensure that only one Master runs the Scheduler at a time, or that only one Worker performs task submission at a time.

  1. The core process algorithm for acquiring distributed locks is as follows:

Obtain distributed lock process

  2. Flow chart of the Scheduler thread's distributed lock implementation in DolphinScheduler:

Obtain distributed lock process
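
The acquire, run, release pattern behind "only one Master runs the Scheduler at a time" might look roughly like the sketch below. To stay self-contained it uses a local `ReentrantLock` as a stand-in for the ZooKeeper distributed lock, so it only illustrates the control flow inside one JVM; the class and method names are hypothetical.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch: a local lock stands in for the ZooKeeper distributed lock. */
class SchedulerLockSketch {
    // Assumption: in a real cluster this would be a ZooKeeper-backed lock shared by all Masters.
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * Runs the scheduling step only if the lock is obtained within the timeout.
     * @return true if the step ran, false if the lock could not be obtained
     */
    boolean runExclusively(Runnable schedulingStep, long timeoutMillis) {
        boolean acquired;
        try {
            acquired = lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
        if (!acquired) {
            return false; // another holder is scheduling; try again on the next scan
        }
        try {
            schedulingStep.run();
            return true;
        } finally {
            lock.unlock(); // always release, even if the step throws
        }
    }
}
```

Releasing in `finally` matters here: a holder that fails mid-step would otherwise block every other candidate from scheduling.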

Three. Circular waiting caused by insufficient threads
  • If a DAG contains no sub-processes and the number of entries in the Command table exceeds the threshold set by the thread pool, the process simply waits or fails.
  • If a large DAG nests many sub-processes, a "dead" state can arise, as shown in the following figure:

Insufficient threads waiting loop problem

In the figure above, MainFlowThread waits for SubFlowThread1 to end, SubFlowThread1 waits for SubFlowThread2, SubFlowThread2 waits for SubFlowThread3, and SubFlowThread3 waits for a new thread from the thread pool. The whole DAG process can therefore never end, and none of its threads can be released. This creates a circular wait between child and parent processes. At this point, unless a new Master is started to add threads and break the "stalemate", the scheduling cluster can no longer be used.

Starting a new Master to break the deadlock is not a satisfying answer, so we proposed the following three solutions to reduce this risk:

  1. Calculate the total number of threads across all Masters, then calculate the number of threads each DAG requires, that is, pre-calculate before the DAG process is executed. Because the thread pools are spread over multiple Masters, the total thread count is unlikely to be obtainable in real time.
  2. Check the thread pool of the single Master. If the pool is full, let the thread fail directly.
  3. Add a Command type for insufficient resources. If the thread pool is exhausted, suspend the main process. Once new threads become available in the pool, the process suspended for lack of resources is woken up and executed again.

Note: The Master Scheduler thread acquires Commands in FIFO order.

So we chose the third way to solve the problem of insufficient threads.
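
A minimal sketch of the third option, under simplifying assumptions: the command table is modeled as an in-memory FIFO queue, the thread pool as a counter of free threads, and all names (`ThreadShortageSketch`, `RESUME_WAITING_THREAD`, and so on) are illustrative and not taken from the DolphinScheduler code base.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical sketch of the "insufficient resources" command; names are illustrative. */
class ThreadShortageSketch {
    enum CommandType { START_PROCESS, RESUME_WAITING_THREAD }

    static final class Command {
        final CommandType type;
        final int processId;
        Command(CommandType type, int processId) {
            this.type = type;
            this.processId = processId;
        }
    }

    private final Deque<Command> commands = new ArrayDeque<>(); // stand-in for the command table (FIFO)
    private final List<Integer> running = new ArrayList<>();
    private int freeThreads;

    ThreadShortageSketch(int poolSize) {
        this.freeThreads = poolSize;
    }

    void submit(int processId) {
        commands.addLast(new Command(CommandType.START_PROCESS, processId));
    }

    /** One scheduler scan: take the oldest command and run it, or park it if no thread is free. */
    void scanOnce() {
        Command c = commands.pollFirst();
        if (c == null) {
            return;
        }
        if (freeThreads == 0) {
            // Instead of blocking or failing, write a "waiting" command to be picked up later.
            commands.addLast(new Command(CommandType.RESUME_WAITING_THREAD, c.processId));
            return;
        }
        freeThreads--;
        running.add(c.processId);
    }

    /** Called when a process finishes and hands its thread back to the pool. */
    void finish(int processId) {
        if (running.remove(Integer.valueOf(processId))) {
            freeThreads++;
        }
    }

    List<Integer> running() { return running; }

    int pendingCommands() { return commands.size(); }
}
```

With a pool of one thread and two submitted processes, the second process is parked as a waiting command and only starts on a scan that happens after the first process has finished.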

Four. Fault-tolerant design

Fault tolerance is divided into service downtime fault tolerance and task retry, and service downtime fault tolerance is divided into master fault tolerance and worker fault tolerance.

1. Downtime fault tolerance

The service fault-tolerance design relies on ZooKeeper's Watcher mechanism, and the implementation principle is shown in the figure:

DolphinScheduler fault-tolerant design

The Master monitors the directories of the other Masters and of the Workers. When a remove event is detected, fault tolerance is performed on the process instance or task instance according to the specific business logic.
  • Master fault tolerance:

failover-master

Fault tolerance range: From the host's perspective, the Master's fault tolerance range covers its own host plus any node host that no longer exists in the registry. The entire fault tolerance process is locked.

Fault-tolerant content: The Master's fault tolerance covers process instances and task instances. Before performing fault tolerance, it compares the instance's start time with the server's start-up time and skips the instance if it started after the server did.

Fault-tolerant post-processing: After the ZooKeeper-driven Master fault tolerance completes, the process is rescheduled by the Scheduler thread in DolphinScheduler, which traverses the DAG to find tasks in the "running" and "submit successful" states. For "running" tasks it monitors the status of their task instances. For "submit successful" tasks it must check whether the task already exists in the task queue: if it does, the status of the task instance is likewise monitored; if it does not, the task instance is resubmitted.

  • Worker fault tolerance:

failover-worker

Fault tolerance range: From the process instance's perspective, each Master is responsible only for the fault tolerance of its own process instances; it takes the lock only in handleDeadServer.

Fault-tolerant content: When the remove event of a Worker node is received, the Master performs fault tolerance only on task instances. Before doing so, it compares the instance's start time with the server's start-up time and skips the instance if it started after the server did.

Fault-tolerant post-processing: Once the Master Scheduler thread finds a task instance in the "fault-tolerant" state, it takes over the task and resubmits it.

Note: Because of network jitter, a node may briefly lose its heartbeat with ZooKeeper, which triggers the node's remove event. We handle this in the simplest way: once a node's connection to ZooKeeper times out, the Master or Worker service is stopped directly.
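
The start-time check used by both Master and Worker fault tolerance can be sketched as follows. The class name, the method name, and the use of `java.time.Instant` are assumptions made for illustration; the text only states that instances started after the server's start-up time are skipped.

```java
import java.time.Instant;

/** Hypothetical sketch of the start-time comparison made before fault tolerance. */
class FailoverCheckSketch {

    /**
     * @param instanceStart when the process or task instance started
     * @param serverStart   when the server was (re)started
     * @return true if the instance should go through fault tolerance
     */
    static boolean needsFailover(Instant instanceStart, Instant serverStart) {
        // An instance that started after the server came up is skipped.
        return !instanceStart.isAfter(serverStart);
    }
}
```

One plausible reading of this rule is that an instance started after the server came back belongs to the restarted server and is still being handled normally, so failing it over would duplicate work.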

2. Task failure retry

Here we must first distinguish the concepts of task failure retry, process failure recovery, and process failure rerun:

  • Task failure retry is at the task level and is performed automatically by the scheduling system. For example, if a Shell task is configured with 3 retries, the system will run it again up to 3 times after it fails.
  • Process failure recovery is at the process level and is performed manually. Recovery can only be performed from the failed node or from the current node.
  • Process failure rerun is also at the process level and is performed manually. A rerun starts from the start node.

Returning to the topic, we divide the task nodes in a workflow into two types.

  • One is a business node, which corresponds to an actual script or processing statement, such as a Shell node, MR node, Spark node, or dependent node.

  • The other is a logical node, which performs no actual script or statement processing and only handles the logic of the overall process flow, such as a sub-process node.

Each business node can be configured with a number of failure retries. When the task node fails, it is retried automatically until it succeeds or the configured number of retries is exceeded. Logical nodes do not support failure retry, but the tasks inside a logical node do.

If a task in the workflow fails and has reached its maximum number of retries, the workflow fails and stops. The failed workflow can then be manually rerun or recovered.
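
The task-level retry rule (one initial run plus up to the configured number of retries) can be sketched as a simple loop. The class `TaskRetrySketch` and the use of `BooleanSupplier` to model a task that reports success or failure are assumptions made for this example.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of task failure retry; not DolphinScheduler's implementation. */
class TaskRetrySketch {

    /**
     * Runs the task once, then retries it up to maxRetries more times while it keeps failing.
     *
     * @param task       returns true on success and false on failure
     * @param maxRetries configured number of failure retries
     * @return true if any attempt succeeded, false if every attempt failed
     */
    static boolean runWithRetry(BooleanSupplier task, int maxRetries) {
        // Attempt 0 is the initial run; attempts 1..maxRetries are the retries.
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            if (task.getAsBoolean()) {
                return true;
            }
        }
        return false; // at this point the workflow would fail and stop
    }
}
```

With `maxRetries` set to 3, a task that always fails is executed four times in total before the failure is reported.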

Five. Task priority design

In the early scheduling design, there was no priority design and fair scheduling was used, so a task submitted first might finish at the same time as a task submitted later, and neither process nor task priority could be set. We therefore redesigned this, and the current design is as follows:

  • Tasks are processed from high to low in the following order of precedence: the priority of different process instances comes before the priority within the same process instance, which comes before the priority of tasks within the same process, which comes before the submission order of tasks within the same process.
    • Concretely, the priority is parsed from the JSON of the task instance, and the information "process instance priority_process instance id_task priority_task id" is saved in the ZooKeeper task queue. When tasks are fetched from the queue, string comparison yields the task that needs to be executed first.

      • Process definition priority exists because some processes need to be handled before others. It can be configured when the process is started manually or scheduled. There are 5 levels: HIGHEST, HIGH, MEDIUM, LOW, and LOWEST, as shown below.

        Process priority configuration

      • Task priority is likewise divided into 5 levels: HIGHEST, HIGH, MEDIUM, LOW, and LOWEST, as shown below.

        Task priority configuration
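
The queue key described above can be sketched as a string whose natural ordering matches the processing order. Two details here are assumptions, not taken from the text: priorities are encoded by enum ordinal (HIGHEST = 0), and ids are zero-padded so that plain string comparison also orders them numerically.

```java
import java.util.Locale;

/** Hypothetical sketch of the task queue key; the encoding details are assumptions. */
class TaskPriorityKeySketch {
    enum Priority { HIGHEST, HIGH, MEDIUM, LOW, LOWEST }

    /** Builds "processInstancePriority_processInstanceId_taskPriority_taskId". */
    static String key(Priority processPriority, int processInstanceId,
                      Priority taskPriority, int taskId) {
        // A smaller ordinal means a higher priority, so higher priorities sort first as strings.
        // Zero-padding keeps ids with different digit counts in numeric order.
        return String.format(Locale.ROOT, "%d_%010d_%d_%010d",
                processPriority.ordinal(), processInstanceId,
                taskPriority.ordinal(), taskId);
    }
}
```

Putting such keys into a `java.util.PriorityQueue<String>` and polling it returns tasks of higher-priority process instances first, then higher-priority tasks within the same process instance, then lower task ids.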

Six. Log access implemented with Logback and netty
  • Since the Web (UI) and the Worker are not necessarily on the same machine, viewing a log is not as simple as reading a local file. There are two options:

  • Put logs on the ES search engine

  • Obtain remote log information through netty communication

  • To keep DolphinScheduler as lightweight as possible, gRPC was chosen to provide remote access to log information.

grpc remote access

  • We use a custom Logback FileAppender and Filter so that each task instance generates its own log file.
  • The FileAppender is mainly implemented as follows:
/**
 * task log appender
 */
public class TaskLogAppender extends FileAppender<ILoggingEvent> {

    ...

   @Override
   protected void append(ILoggingEvent event) {

       if (currentlyActiveFile == null){
           currentlyActiveFile = getFile();
       }
       String activeFile = currentlyActiveFile;
       // thread name: taskThreadName-processDefineId_processInstanceId_taskInstanceId
       String threadName = event.getThreadName();
       String[] threadNameArr = threadName.split("-");
       // logId = processDefineId_processInstanceId_taskInstanceId
       String logId = threadNameArr[1];
       ...
       super.subAppend(event);
   }
}


Generate logs in the form of /process definition id/process instance id/task instance id.log
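
As an illustration of how the logId taken from the thread name could map to that path, here is a minimal sketch. The helper class `TaskLogPathSketch` is hypothetical, and the base log directory is left out because the text does not specify it.

```java
/** Hypothetical helper: derives the relative log path from a task thread name. */
class TaskLogPathSketch {

    /**
     * @param threadName expected form: taskThreadName-processDefineId_processInstanceId_taskInstanceId
     * @return /processDefineId/processInstanceId/taskInstanceId.log
     */
    static String logPath(String threadName) {
        String[] nameParts = threadName.split("-");
        if (nameParts.length < 2) {
            throw new IllegalArgumentException("no logId in thread name: " + threadName);
        }
        // logId = processDefineId_processInstanceId_taskInstanceId
        String[] ids = nameParts[1].split("_");
        if (ids.length != 3) {
            throw new IllegalArgumentException("unexpected logId: " + nameParts[1]);
        }
        return "/" + ids[0] + "/" + ids[1] + "/" + ids[2] + ".log";
    }
}
```

For a thread named `TaskLogInfo-1_2_3`, the sketch yields `/1/2/3.log`.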

- Filter to match the thread name starting with TaskLogInfo:

- TaskLogFilter is implemented as follows:

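```java
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.filter.Filter;
import ch.qos.logback.core.spi.FilterReply;

/**
 * task log filter
 */
public class TaskLogFilter extends Filter<ILoggingEvent> {

    @Override
    public FilterReply decide(ILoggingEvent event) {
        // Accept only events logged by task threads, whose names start with "TaskLogInfo-"
        if (event.getThreadName().startsWith("TaskLogInfo-")) {
            return FilterReply.ACCEPT;
        }
        return FilterReply.DENY;
    }
}
```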

### 3.Module introduction
- dolphinscheduler-alert: alarm module, providing the AlertServer service.

- dolphinscheduler-api: web application module, providing the ApiServer service.

- dolphinscheduler-common: general constants and enumerations, utility classes, data structures, and base classes.

- dolphinscheduler-dao: provides operations such as database access.

- dolphinscheduler-remote: client and server based on netty.

- dolphinscheduler-server: the MasterServer and WorkerServer services.

- dolphinscheduler-service: service module, including Quartz, ZooKeeper, and the log client access service, for use by the server and api modules.

- dolphinscheduler-ui: front-end module.

### Summary
From the perspective of scheduling, this article gives a preliminary introduction to the architecture principles and implementation ideas of DolphinScheduler, a distributed workflow scheduling system for big data. To be continued.