DataX

An example of the DataX task type, followed by a closer look at its PyDolphinScheduler API.

Example

"""
An example workflow for the DataX task.

This example creates a workflow named `task_datax_example`, which defines and
runs the tasks `task_datax` and `task_custom_datax`.
You can create the data sources `first_mysql` and `second_mysql` through the UI.
Each task synchronizes data from the source database to the target database.
"""

from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.tasks.datax import CustomDataX, DataX

# datax json template
JSON_TEMPLATE = {
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "usr",
                        "password": "pwd",
                        "column": ["id", "name", "code", "description"],
                        "splitPk": "id",
                        "connection": [
                            {
                                "table": ["source_table"],
                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/source_db"],
                            }
                        ],
                    },
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "usr",
                        "password": "pwd",
                        "column": ["id", "name"],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/target_db",
                                "table": ["target_table"],
                            }
                        ],
                    },
                },
            }
        ],
        "setting": {
            "errorLimit": {"percentage": 0, "record": 0},
            "speed": {"channel": 1, "record": 1000},
        },
    }
}

with ProcessDefinition(
    name="task_datax_example",
    tenant="tenant_exists",
) as pd:
    # This task synchronizes the rows of `source_table` in the `first_mysql`
    # data source to `target_table` in the `second_mysql` data source.
    # Make sure data sources named `first_mysql` and `second_mysql` exist
    # in your environment.
    task1 = DataX(
        name="task_datax",
        datasource_name="first_mysql",
        datatarget_name="second_mysql",
        sql="select id, name, code, description from source_table",
        target_table="target_table",
    )

    # You can also supply a custom DataX JSON template. This task creates a
    # DataX job equivalent to task1, with the connection details taken from the template.
    task2 = CustomDataX(name="task_custom_datax", json=str(JSON_TEMPLATE))
    pd.run()
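
A note on `json=str(JSON_TEMPLATE)` in the example: calling `str()` on a dict produces Python's repr, with single-quoted keys, which a strict JSON parser rejects. Whether the DataX side tolerates that form is not stated here, so serializing with the standard library's `json.dumps` is likely the safer choice. A minimal sketch, using a trimmed-down stand-in for the template:

```python
import json

# Trimmed-down stand-in for JSON_TEMPLATE; only the shape matters here.
template = {"job": {"setting": {"speed": {"channel": 1, "record": 1000}}}}

as_repr = str(template)         # Python repr: keys wrapped in single quotes
as_json = json.dumps(template)  # strict JSON: keys wrapped in double quotes


def is_strict_json(text: str) -> bool:
    """Return True if `text` parses with the standard-library JSON parser."""
    try:
        json.loads(text)
    except json.JSONDecodeError:
        return False
    return True


print(is_strict_json(as_repr))
print(is_strict_json(as_json))
```

Applied to the example, this would mean adding `import json` and passing `json=json.dumps(JSON_TEMPLATE)`. The keyword argument name `json` does not clash with the module of the same name.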

Dive Into

Task datax.

class pydolphinscheduler.tasks.datax.CustomDataX(name: str, json: str, xms: int | None = 1, xmx: int | None = 1, *args, **kwargs)[source]

Bases: Task

Task object for CustomDataX; declares the behavior of a custom DataX task for DolphinScheduler.

You provide a JSON template for DataX, and the task synchronizes data according to that template.

CUSTOM_CONFIG = 1
_downstream_task_codes: Set[int]
_task_custom_attr: set = {'custom_config', 'json', 'xms', 'xmx'}
_task_relation: Set[TaskRelation]
_upstream_task_codes: Set[int]
class pydolphinscheduler.tasks.datax.DataX(name: str, datasource_name: str, datatarget_name: str, sql: str, target_table: str, job_speed_byte: int | None = 0, job_speed_record: int | None = 1000, pre_statements: List[str] | None = None, post_statements: List[str] | None = None, xms: int | None = 1, xmx: int | None = 1, *args, **kwargs)[source]

Bases: Task

Task object for DataX; declares the behavior of a DataX task for DolphinScheduler.

It runs a DataX job against any of several SQL database engines, such as:

- MySQL
- Oracle
- PostgreSQL
- SQLServer

You provide datasource_name and datatarget_name, which carry the connection information and determine which database types and instances the data is synchronized between.
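
The earlier example leaves the optional arguments at their defaults. The sketch below fills in a few of them, using only parameter names from the signature above. All values are placeholders, and the reading of `pre_statements` and `post_statements` as SQL run before and after the synchronization is an assumption based on their names. The workflow part needs the same environment as the earlier example, so the imports are deferred into a function.

```python
# Keyword arguments named after the DataX signature; every value is a placeholder.
datax_kwargs = {
    "name": "task_datax_with_hooks",
    "datasource_name": "first_mysql",
    "datatarget_name": "second_mysql",
    "sql": "select id, name from source_table",
    "target_table": "target_table",
    "job_speed_record": 500,
    # Assumed meaning: statements run before / after the synchronization.
    "pre_statements": ["delete from target_table"],
    "post_statements": ["analyze table target_table"],
}


def build_and_run_workflow():
    """Declare the task inside a workflow and run it, as the earlier example does."""
    # Imported lazily so the dictionary above can be inspected without the package.
    from pydolphinscheduler.core.process_definition import ProcessDefinition
    from pydolphinscheduler.tasks.datax import DataX

    with ProcessDefinition(name="task_datax_with_hooks", tenant="tenant_exists") as pd:
        DataX(**datax_kwargs)
        pd.run()
```

Keeping the arguments in a plain dictionary makes it easy to review or log them before the task is declared.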

CUSTOM_CONFIG = 0
_downstream_task_codes: Set[int]
_task_custom_attr: set = {'custom_config', 'job_speed_byte', 'job_speed_record', 'post_statements', 'pre_statements', 'sql', 'target_table', 'xms', 'xmx'}
_task_relation: Set[TaskRelation]
_upstream_task_codes: Set[int]
property task_params: Dict

Override Task.task_params for the DataX task.

The DataX task has some special attributes in task_params that would be awkward to set directly as Python properties, so Task.task_params is overridden here.
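
The general pattern of overriding a parameters property can be illustrated with toy classes. `BaseTask`, `ToyDataX`, the `_SOURCES` lookup table, and the `source_type` / `source_id` keys are all invented for this sketch; it is not PyDolphinScheduler's implementation, only an example of a subclass adding derived values on top of a default mapping.

```python
class BaseTask:
    """Toy stand-in for a task base class; not PyDolphinScheduler's Task."""

    _task_custom_attr: set = set()

    def __init__(self, **attrs):
        for key, value in attrs.items():
            setattr(self, key, value)

    @property
    def task_params(self) -> dict:
        # Default behaviour: copy every declared custom attribute verbatim.
        return {name: getattr(self, name) for name in sorted(self._task_custom_attr)}


class ToyDataX(BaseTask):
    """Toy subclass whose parameters include values that are not plain attributes."""

    _task_custom_attr = {"sql", "target_table"}

    # Hypothetical lookup table standing in for a data source registry.
    _SOURCES = {"first_mysql": {"type": "MYSQL", "id": 1}}

    @property
    def task_params(self) -> dict:
        params = super().task_params  # start from the default mapping
        source = self._SOURCES[self.datasource_name]
        params["source_type"] = source["type"]  # derived, not stored on the task
        params["source_id"] = source["id"]
        return params


task = ToyDataX(sql="select 1", target_table="t", datasource_name="first_mysql")
print(task.task_params)
```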

YAML file example

# Define the workflow
workflow:
  name: "DataX"

# Define the tasks under the workflow
tasks:
  - name: task
    task_type: DataX
    datasource_name: db
    datatarget_name: db
    sql: show tables;
    target_table: table_test

  - name: task_custom_config
    task_type: CustomDataX
    json: $FILE{"example_datax.json"}

example_datax.json:

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "usr",
            "password": "pwd",
            "column": [
              "id",
              "name",
              "code",
              "description"
            ],
            "splitPk": "id",
            "connection": [
              {
                "table": [
                  "source_table"
                ],
                "jdbcUrl": [
                  "jdbc:mysql://127.0.0.1:3306/source_db"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "writeMode": "insert",
            "username": "usr",
            "password": "pwd",
            "column": [
              "id",
              "name"
            ],
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/target_db",
                "table": [
                  "target_table"
                ]
              }
            ]
          }
        }
      }
    ],
    "setting": {
      "errorLimit": {
        "percentage": 0,
        "record": 0
      },
      "speed": {
        "channel": 1,
        "record": 1000
      }
    }
  }
}
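
Before a workflow references a template file like the one above, it can help to confirm that the file parses as JSON and contains a reader and a writer. The helper below is a minimal sketch; `check_template` is a hypothetical name, and it checks only the keys visible in this example rather than the full DataX job format.

```python
import json


def check_template(text: str) -> list:
    """Parse a DataX job template and return its (reader, writer) plugin names."""
    job = json.loads(text)["job"]  # raises on invalid JSON or a missing "job" key
    pairs = []
    for item in job["content"]:
        pairs.append((item["reader"]["name"], item["writer"]["name"]))
    if not pairs:
        raise ValueError("template has no reader/writer pair under job.content")
    return pairs


# Inline sample with the same shape as example_datax.json, parameters omitted.
sample = (
    '{"job": {"content": [{"reader": {"name": "mysqlreader"},'
    ' "writer": {"name": "mysqlwriter"}}]}}'
)
print(check_template(sample))
```

For the file itself, the text could be read with `pathlib.Path("example_datax.json").read_text()` and passed to the same function.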