DataX
An example of the DataX task type and a dive into its details in PyDolphinScheduler.
Example
"""
A example workflow for task datax.
This example will create a workflow named `task_datax`.
`task_datax` is true workflow define and run task task_datax.
You can create data sources `first_mysql` and `first_mysql` through UI.
It creates a task to synchronize datax from the source database to the target database.
"""
import json
from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.datax import CustomDataX, DataX
# datax json template
JSON_TEMPLATE = {
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "usr",
                        "password": "pwd",
                        "column": ["id", "name", "code", "description"],
                        "splitPk": "id",
                        "connection": [
                            {
                                "table": ["source_table"],
                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/source_db"],
                            }
                        ],
                    },
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "usr",
                        "password": "pwd",
                        "column": ["id", "name"],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/target_db",
                                "table": ["target_table"],
                            }
                        ],
                    },
                },
            }
        ],
        "setting": {
            "errorLimit": {"percentage": 0, "record": 0},
            "speed": {"channel": 1, "record": 1000},
        },
    }
}
with Workflow(
    name="task_datax_example",
) as workflow:
    # This task synchronizes the data in `t_ds_project`
    # of the `first_mysql` database to `target_project` of the `second_mysql` database.
    # You have to make sure the data sources named `first_mysql` and `second_mysql`
    # exist in your environment.
    task1 = DataX(
        name="task_datax",
        datasource_name="first_mysql",
        datatarget_name="second_mysql",
        sql="select id, name, code, description from source_table",
        target_table="target_table",
    )

    # You can customize the DataX json_template to sync data. This task creates a new
    # DataX job that does the same as task1, transferring records from `first_mysql`
    # to `second_mysql`. We have to format the custom JSON config ourselves if we want
    # it to display formatted in the web UI.
    task2 = CustomDataX(
        name="task_custom_datax", json=json.dumps(JSON_TEMPLATE, indent=4)
    )

    # [start resource_limit]
    resource_limit = DataX(
        name="resource_limit",
        datasource_name="first_mysql",
        datatarget_name="second_mysql",
        sql="select id, name, code, description from source_table",
        target_table="target_table",
        cpu_quota=1,
        memory_max=100,
    )
    # [end resource_limit]

    workflow.run()
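DataX tasks also accept optional pre_statements and post_statements (documented in the Dive Into section below) that run around the load step. The following is a minimal sketch, assuming the same `first_mysql` and `second_mysql` data sources as above; the task name and the SQL statements are illustrative only and should be adapted to your own schema:

from pydolphinscheduler.tasks.datax import DataX

pre_post = DataX(
    name="task_datax_pre_post",
    datasource_name="first_mysql",
    datatarget_name="second_mysql",
    sql="select id, name, code, description from source_table",
    target_table="target_table",
    # illustrative statements executed before and after the DataX job loads data
    pre_statements=["truncate table target_table"],
    post_statements=["insert into sync_log(table_name) values('target_table')"],
)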
Resource Limit Example
We can add resource limits, such as CPU quota and maximum memory, by passing the corresponding parameters when declaring tasks.
resource_limit = DataX(
    name="resource_limit",
    datasource_name="first_mysql",
    datatarget_name="second_mysql",
    sql="select id, name, code, description from source_table",
    target_table="target_table",
    cpu_quota=1,
    memory_max=100,
)
Dive Into
Task datax.
- class pydolphinscheduler.tasks.datax.CustomDataX(name: str, json: str, xms: int | None = 1, xmx: int | None = 1, *args, **kwargs)[source]
Bases: WorkerResourceMixin, BatchTask
Task CustomDataX object, declares the behavior of a custom DataX task for dolphinscheduler.
You provide the JSON template for DataX; it synchronizes data according to the template you provide.
- Parameters:
name – task name for this task
json – json template string, or json file path, for the custom DataX task. CustomDataX will not format the json template; you should format it yourself:
  - Use the config string directly instead of a json file path: use json.dumps() to format it if your json template is a dict,

    import json
    custom = CustomDataX(
        name="custom_datax",
        json=json.dumps({"job": {"content": [{"reader": {"name": "mysqlreader"}}]}}),
    )

    or format it manually if your json template is a native str.
  - Use a json file path; the format shown in the web UI depends on your json file content.

    custom = CustomDataX(
        name="custom_datax",
        # the web UI datax config will show the json file content
        json="/path/to/datax.json",
    )
xms – JVM parameter for the minimum memory of the running DataX task, default is 1g
xmx – JVM parameter for the maximum memory of the running DataX task, default is 1g
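The xms and xmx values are applied as JVM memory settings when the DataX process is launched. A minimal sketch, reusing the short template dict from the json parameter above; the task name and the 4g sizing are assumptions for a larger job, not recommended values:

import json

from pydolphinscheduler.tasks.datax import CustomDataX

# assumed sizing: give the DataX JVM a 4g minimum and 4g maximum heap
# instead of the 1g defaults
big_memory = CustomDataX(
    name="custom_datax_big_memory",
    json=json.dumps({"job": {"content": [{"reader": {"name": "mysqlreader"}}]}}),
    xms=4,
    xmx=4,
)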
- CUSTOM_CONFIG = 1
- _downstream_task_codes: set[int]
- _task_custom_attr: set = {'custom_config', 'json', 'xms', 'xmx'}
- _task_relation: set[TaskRelation]
- _timeout: timedelta | int
- _upstream_task_codes: set[int]
- ext: set = {'.json'}
- ext_attr: str = '_json'
- class pydolphinscheduler.tasks.datax.DataX(name: str, datasource_name: str, datatarget_name: str, sql: str, target_table: str, datasource_type: str | None = None, datatarget_type: str | None = None, job_speed_byte: int | None = 0, job_speed_record: int | None = 1000, pre_statements: list[str] | None = None, post_statements: list[str] | None = None, xms: int | None = 1, xmx: int | None = 1, *args, **kwargs)[source]
Bases: WorkerResourceMixin, BatchTask
Task DataX object, declares the behavior of a DataX task for dolphinscheduler.
It can run a database DataX job against multiple SQL engines, such as:
MySQL
Oracle
Postgresql
SQLServer
The datasource_name and datatarget_name you provide contain the connection information, which decides which database types and database instances the data is synchronized between.
- Parameters:
name – task name for this task
datasource_name – source database name for the datax task to extract data from; it must exist in dolphinscheduler’s datasource center, otherwise the datax task will raise an exception.
datatarget_name – target database name for the datax task to load data into; it must exist in dolphinscheduler’s datasource center, otherwise the datax task will raise an exception.
sql – sql statement for the datax task to extract data from the source database.
target_table – target table name for the datax task to load data into in the target database.
datasource_type – source database type, dolphinscheduler uses it to find datasource_name in the datasource center.
datatarget_type – target database type, dolphinscheduler uses it to find datatarget_name in the datasource center.
job_speed_byte – task datax job speed byte, default is 0. For more detail see https://github.com/alibaba/DataX
job_speed_record – task datax job speed record, default is 1000. For more detail see https://github.com/alibaba/DataX
pre_statements – task datax job pre statements; they execute before the datax job starts to load. Default is None.
post_statements – task datax job post statements; they execute after the datax job finishes loading. Default is None.
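The job_speed_byte and job_speed_record values are forwarded to the speed settings of the generated DataX job (see the DataX project linked above for their exact semantics). A minimal sketch, assuming the same first_mysql and second_mysql data sources as the examples above; the task name and limit values are illustrative assumptions:

from pydolphinscheduler.tasks.datax import DataX

speed_tuned = DataX(
    name="task_datax_speed",
    datasource_name="first_mysql",
    datatarget_name="second_mysql",
    sql="select id, name, code, description from source_table",
    target_table="target_table",
    job_speed_byte=1048576,  # assumed byte budget (the documented default is 0)
    job_speed_record=2000,   # assumed record budget (the documented default is 1000)
)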
- CUSTOM_CONFIG = 0
- _downstream_task_codes: set[int]
- _task_custom_attr: set = {'custom_config', 'job_speed_byte', 'job_speed_record', 'post_statements', 'pre_statements', 'sql', 'target_table', 'xms', 'xmx'}
- _task_relation: set[TaskRelation]
- _timeout: timedelta | int
- _upstream_task_codes: set[int]
- ext: set = {'.sql'}
- ext_attr: str = '_sql'
- property source_params: dict
Get source params for datax task.
- property target_params: dict
Get target params for datax task.
- property task_params: dict
Override Task.task_params for datax task.
The datax task has some special attributes in task_params, and it would be odd to set them directly as Python properties, so we override Task.task_params here.
YAML file example
# Define the workflow
workflow:
  name: "DataX"

# Define the tasks within the workflow
tasks:
  - name: task
    task_type: DataX
    datasource_name: db
    datatarget_name: db
    sql: show tables;
    target_table: table_test

  - name: task_custom_config
    task_type: CustomDataX
    json: $FILE{"example_datax.json"}
example_datax.json:
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "usr",
                        "password": "pwd",
                        "column": [
                            "id",
                            "name",
                            "code",
                            "description"
                        ],
                        "splitPk": "id",
                        "connection": [
                            {
                                "table": [
                                    "source_table"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://127.0.0.1:3306/source_db"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "usr",
                        "password": "pwd",
                        "column": [
                            "id",
                            "name"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/target_db",
                                "table": [
                                    "target_table"
                                ]
                            }
                        ]
                    }
                }
            }
        ],
        "setting": {
            "errorLimit": {
                "percentage": 0,
                "record": 0
            },
            "speed": {
                "channel": 1,
                "record": 1000
            }
        }
    }
}