# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Task datax."""
from __future__ import annotations
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.mixin import WorkerResourceMixin
from pydolphinscheduler.core.task import BatchTask
from pydolphinscheduler.models.datasource import Datasource
[docs]
class CustomDataX(WorkerResourceMixin, BatchTask):
"""Task CustomDatax object, declare behavior for custom DataX task to dolphinscheduler.
You provider json template for DataX, it can synchronize data according to the template you provided.
:param name: task name for this task
:param json: json template string, or json file path for custom DataX task, :class:`CustomDataX` will not
format json template, you should format by yourself.
* Use config string directly instead of json file path
* should use :func:`json.dumps` to format it if your json template is dict
.. code-block:: python
import json
custom = CustomDataX(
name="custom_datax",
json=json.dumps({"job": {"content": [{"reader": {"name": "mysqlreader"}}]}}),
)
* or format it by manual if your json template is native str.
* Use json file path, the format it shows in web UI is depended on your json file content.
.. code-block:: python
import json
custom = CustomDataX(
name="custom_datax",
# web UI datax config will show as json file content
json="/path/to/datax.json",
)
:param xms: jvm param about min memory for task datax running, default is 1g
:param xmx: jvm param about max memory for task datax running, default is 1g
"""
CUSTOM_CONFIG = 1
_task_custom_attr = {"custom_config", "json", "xms", "xmx"}
ext: set = {".json"}
ext_attr: str = "_json"
def __init__(
self,
name: str,
json: str,
xms: int | None = 1,
xmx: int | None = 1,
*args,
**kwargs,
):
self._json = json
super().__init__(name, TaskType.DATAX, *args, **kwargs)
self.custom_config = self.CUSTOM_CONFIG
self.xms = xms
self.xmx = xmx
self.add_attr(**kwargs)
[docs]
class DataX(WorkerResourceMixin, BatchTask):
"""Task DataX object, declare behavior for DataX task to dolphinscheduler.
It should run database datax job in multiply sql link engine, such as:
- MySQL
- Oracle
- Postgresql
- SQLServer
You provider datasource_name and datatarget_name contain connection information, it decisions which
database type and database instance would synchronous data.
:param name: task name for this task
:param datasource_name: source database name for task datax to extract data, it must exist in
dolphinscheduler's datasource center otherwise task datax will raise exception.
:param datatarget_name: target database name for task datax to load data, it must exist in
dolphinscheduler's datasource center otherwise task datax will raise exception.
:param sql: sql statement for task datax to extract data form source database.
:param target_table: target table name for task datax to load data into target database.
:param datasource_type: source database type, dolphinscheduler use it to find :param:``datasource_name``
in datasource center.
:param datasource_type: target database type, dolphinscheduler use it to find :param:``datatarget_name``
in datasource center.
:param job_speed_byte: task datax job speed byte, default is 0. For more detail you can get from
:seealso: https://github.com/alibaba/DataX
:param job_speed_record: task datax job speed record, default is 1000. For more detail you can get from
:seealso: https://github.com/alibaba/DataX
:param pre_statements: task datax job pre statements, it will execute before task datax job start to load.
default is None.
:param post_statements: task datax job post statements, it will execute after task datax job finish load.
default is None.
"""
CUSTOM_CONFIG = 0
_task_custom_attr = {
"custom_config",
"sql",
"target_table",
"job_speed_byte",
"job_speed_record",
"pre_statements",
"post_statements",
"xms",
"xmx",
}
ext: set = {".sql"}
ext_attr: str = "_sql"
def __init__(
self,
name: str,
datasource_name: str,
datatarget_name: str,
sql: str,
target_table: str,
datasource_type: str | None = None,
datatarget_type: str | None = None,
job_speed_byte: int | None = 0,
job_speed_record: int | None = 1000,
pre_statements: list[str] | None = None,
post_statements: list[str] | None = None,
xms: int | None = 1,
xmx: int | None = 1,
*args,
**kwargs,
):
self._sql = sql
super().__init__(name, TaskType.DATAX, *args, **kwargs)
self.custom_config = self.CUSTOM_CONFIG
self.datasource_type = datasource_type
self.datasource_name = datasource_name
self.datatarget_type = datatarget_type
self.datatarget_name = datatarget_name
self.target_table = target_table
self.job_speed_byte = job_speed_byte
self.job_speed_record = job_speed_record
self.pre_statements = pre_statements or []
self.post_statements = post_statements or []
self.xms = xms
self.xmx = xmx
self.add_attr(**kwargs)
@property
def source_params(self) -> dict:
"""Get source params for datax task."""
datasource_task_u = Datasource.get_task_usage_4j(
self.datasource_name, self.datasource_type
)
return {
"dsType": datasource_task_u.type,
"dataSource": datasource_task_u.id,
}
@property
def target_params(self) -> dict:
"""Get target params for datax task."""
datasource_task_u = Datasource.get_task_usage_4j(
self.datatarget_name, self.datatarget_type
)
return {
"dtType": datasource_task_u.type,
"dataTarget": datasource_task_u.id,
}
@property
def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> dict:
"""Override Task.task_params for datax task.
datax task have some specials attribute for task_params, and is odd if we
directly set as python property, so we Override Task.task_params here.
"""
params = super().task_params
params.update(self.source_params)
params.update(self.target_params)
return params