DataX

An example of the DataX task type in PyDolphinScheduler, plus a dive into its API.

Example

"""
An example workflow for task DataX.

This example creates a workflow named `task_datax_example` that defines and runs
DataX tasks. You can create the data sources `first_mysql` and `second_mysql`
through the web UI. Each task synchronizes data from the source database to the
target database via DataX.
"""

import json

from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.datax import CustomDataX, DataX

# datax json template
JSON_TEMPLATE = {
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "usr",
                        "password": "pwd",
                        "column": ["id", "name", "code", "description"],
                        "splitPk": "id",
                        "connection": [
                            {
                                "table": ["source_table"],
                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/source_db"],
                            }
                        ],
                    },
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "usr",
                        "password": "pwd",
                        "column": ["id", "name"],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/target_db",
                                "table": ["target_table"],
                            }
                        ],
                    },
                },
            }
        ],
        "setting": {
            "errorLimit": {"percentage": 0, "record": 0},
            "speed": {"channel": 1, "record": 1000},
        },
    }
}

with Workflow(
    name="task_datax_example",
) as workflow:
    # This task synchronizes data from `source_table` in the `first_mysql`
    # data source to `target_table` in the `second_mysql` data source.
    # You have to make sure data sources named `first_mysql` and `second_mysql`
    # exist in your environment.
    task1 = DataX(
        name="task_datax",
        datasource_name="first_mysql",
        datatarget_name="second_mysql",
        sql="select id, name, code, description from source_table",
        target_table="target_table",
    )

    # You can customize the DataX JSON template to sync data. This task creates
    # a DataX job equivalent to task1, transferring records from `first_mysql`
    # to `second_mysql`. Format the custom JSON config (e.g. with json.dumps)
    # if you want it pretty-printed in the web UI.
    task2 = CustomDataX(
        name="task_custom_datax", json=json.dumps(JSON_TEMPLATE, indent=4)
    )

    # [start resource_limit]
    resource_limit = DataX(
        name="resource_limit",
        datasource_name="first_mysql",
        datatarget_name="second_mysql",
        sql="select id, name, code, description from source_table",
        target_table="target_table",
        cpu_quota=1,
        memory_max=100,
    )
    # [end resource_limit]
    workflow.run()
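The JSON_TEMPLATE above is hard-coded for one table pair. When several tables with the same connection details need syncing, a small helper can generate one config per table. The helper below is a stdlib-only sketch, not part of pydolphinscheduler; the connection URLs and credentials are placeholders you would replace with your own:

```python
import json


def build_datax_job(source_table: str, target_table: str, columns: list) -> dict:
    """Build a minimal DataX job config for a MySQL-to-MySQL sync.

    Connection details and credentials below are placeholders.
    """
    return {
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader",
                        "parameter": {
                            "username": "usr",
                            "password": "pwd",
                            "column": columns,
                            "connection": [
                                {
                                    "table": [source_table],
                                    "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/source_db"],
                                }
                            ],
                        },
                    },
                    "writer": {
                        "name": "mysqlwriter",
                        "parameter": {
                            "writeMode": "insert",
                            "username": "usr",
                            "password": "pwd",
                            "column": columns,
                            "connection": [
                                {
                                    "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/target_db",
                                    "table": [target_table],
                                }
                            ],
                        },
                    },
                }
            ],
            "setting": {"speed": {"channel": 1, "record": 1000}},
        }
    }


job = build_datax_job("source_table", "target_table", ["id", "name"])
# Format the dict so it is pretty-printed in the web UI, then pass it
# to CustomDataX(name=..., json=json_config).
json_config = json.dumps(job, indent=4)
```

Each generated config could then feed its own CustomDataX task inside the same workflow.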

Resource Limit Example

We can add resource limits, such as a CPU quota and a maximum memory, by passing parameters when declaring tasks.

    resource_limit = DataX(
        name="resource_limit",
        datasource_name="first_mysql",
        datatarget_name="second_mysql",
        sql="select id, name, code, description from source_table",
        target_table="target_table",
        cpu_quota=1,
        memory_max=100,
    )

Dive Into

Task datax.

class pydolphinscheduler.tasks.datax.CustomDataX(name: str, json: str, xms: int | None = 1, xmx: int | None = 1, *args, **kwargs)[source]

Bases: WorkerResourceMixin, BatchTask

Task CustomDataX object, declaring the behavior of a custom DataX task for dolphinscheduler.

You provide a JSON template for DataX, and it synchronizes data according to the template you provided.

Parameters:
  • name – task name for this task

  • json

    JSON template string, or JSON file path, for the custom DataX task. CustomDataX will not format the JSON template; you should format it yourself.

    • Use the config string directly instead of a JSON file path:

      • use json.dumps() to format it if your JSON template is a dict

      import json
      
      custom = CustomDataX(
          name="custom_datax",
          json=json.dumps({"job": {"content": [{"reader": {"name": "mysqlreader"}}]}}),
      )
      
      • or format it manually if your JSON template is a native str.

    • Use a JSON file path; how the config shows in the web UI depends on your JSON file's content.

      import json
      
      custom = CustomDataX(
          name="custom_datax",
          # web UI datax config will show as json file content
          json="/path/to/datax.json",
      )
      

  • xms – JVM parameter for the minimum memory (in GB) of the running DataX task; default is 1.

  • xmx – JVM parameter for the maximum memory (in GB) of the running DataX task; default is 1.

CUSTOM_CONFIG = 1
_downstream_task_codes: set[int]
_task_custom_attr: set = {'custom_config', 'json', 'xms', 'xmx'}
_task_relation: set[TaskRelation]
_timeout: timedelta | int
_upstream_task_codes: set[int]
ext: set = {'.json'}
ext_attr: str = '_json'
class pydolphinscheduler.tasks.datax.DataX(name: str, datasource_name: str, datatarget_name: str, sql: str, target_table: str, datasource_type: str | None = None, datatarget_type: str | None = None, job_speed_byte: int | None = 0, job_speed_record: int | None = 1000, pre_statements: list[str] | None = None, post_statements: list[str] | None = None, xms: int | None = 1, xmx: int | None = 1, *args, **kwargs)[source]

Bases: WorkerResourceMixin, BatchTask

Task DataX object, declare behavior for DataX task to dolphinscheduler.

It runs a database DataX job across multiple SQL engines, such as:

  • MySQL

  • Oracle

  • PostgreSQL

  • SQLServer

The datasource_name and datatarget_name you provide contain the connection information; they determine which database types and database instances the data is synchronized between.

Parameters:
  • name – task name for this task

  • datasource_name – source database name for the DataX task to extract data from; it must exist in dolphinscheduler’s datasource center, otherwise the DataX task will raise an exception.

  • datatarget_name – target database name for the DataX task to load data into; it must exist in dolphinscheduler’s datasource center, otherwise the DataX task will raise an exception.

  • sql – SQL statement the DataX task uses to extract data from the source database.

  • target_table – table name the DataX task loads data into in the target database.

  • datasource_type – source database type; dolphinscheduler uses it to find :param:datasource_name in the datasource center.

  • datatarget_type – target database type; dolphinscheduler uses it to find :param:datatarget_name in the datasource center.

  • job_speed_byte – DataX job speed limit in bytes, default is 0. For more detail, see https://github.com/alibaba/DataX.

  • job_speed_record – DataX job speed limit in records, default is 1000. For more detail, see https://github.com/alibaba/DataX.

  • pre_statements – SQL statements executed before the DataX job starts loading; default is None.

  • post_statements – SQL statements executed after the DataX job finishes loading; default is None.
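pre_statements and post_statements are plain SQL strings run on the target database around the transfer. A minimal sketch of a common pattern (the table and statements below are illustrative, not from the library):

```python
# SQL run on the target database before the DataX transfer starts:
# here, empty the target table so the job can be re-run safely.
pre_statements = ["TRUNCATE TABLE target_table"]

# SQL run on the target database after the transfer finishes:
# here, refresh the table's optimizer statistics.
post_statements = ["ANALYZE TABLE target_table"]

# These lists are passed straight through when declaring the task, e.g.
# DataX(name=..., sql=..., target_table=...,
#       pre_statements=pre_statements, post_statements=post_statements)
```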

CUSTOM_CONFIG = 0
_downstream_task_codes: set[int]
_task_custom_attr: set = {'custom_config', 'job_speed_byte', 'job_speed_record', 'post_statements', 'pre_statements', 'sql', 'target_table', 'xms', 'xmx'}
_task_relation: set[TaskRelation]
_timeout: timedelta | int
_upstream_task_codes: set[int]
ext: set = {'.sql'}
ext_attr: str = '_sql'
property source_params: dict

Get source params for datax task.

property target_params: dict

Get target params for datax task.

property task_params: dict

Override Task.task_params for datax task.

The DataX task has some special attributes for task_params, and it would be awkward to set them directly as Python properties, so we override Task.task_params here.
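Conceptually, the override combines the DataX-specific attributes with the resolved source and target connection parameters into a single payload. The sketch below is a simplified stdlib illustration; the helper name and the key names are hypothetical, not the library's actual internals:

```python
def merged_task_params(custom_attrs: dict, source_params: dict, target_params: dict) -> dict:
    """Sketch: merge DataX-specific attributes (sql, target_table, speed
    settings, ...) with the resolved source/target datasource parameters.
    Key names are illustrative only."""
    params = dict(custom_attrs)
    params["sourceParams"] = source_params
    params["targetParams"] = target_params
    return params


merged = merged_task_params(
    {"sql": "select id, name from source_table", "targetTable": "target_table"},
    {"type": "MYSQL", "datasourceId": 1},
    {"type": "MYSQL", "datasourceId": 2},
)
```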

YAML file example

# Define the workflow
workflow:
  name: "DataX"

# Define the tasks within the workflow
tasks:
  - name: task
    task_type: DataX
    datasource_name: db
    datatarget_name: db
    sql: show tables;
    target_table: table_test

  - name: task_custom_config
    task_type: CustomDataX
    json: $FILE{"example_datax.json"}

example_datax.json:

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "usr",
            "password": "pwd",
            "column": [
              "id",
              "name",
              "code",
              "description"
            ],
            "splitPk": "id",
            "connection": [
              {
                "table": [
                  "source_table"
                ],
                "jdbcUrl": [
                  "jdbc:mysql://127.0.0.1:3306/source_db"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "writeMode": "insert",
            "username": "usr",
            "password": "pwd",
            "column": [
              "id",
              "name"
            ],
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/target_db",
                "table": [
                  "target_table"
                ]
              }
            ]
          }
        }
      }
    ],
    "setting": {
      "errorLimit": {
        "percentage": 0,
        "record": 0
      },
      "speed": {
        "channel": 1,
        "record": 1000
      }
    }
  }
}