# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Task datax."""
from typing import Dict, List, Optional

from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.database import Database
from pydolphinscheduler.core.task import Task
class CustomDataX(Task):
"""Task CustomDatax object, declare behavior for custom DataX task to dolphinscheduler.
You provider json template for DataX, it can synchronize data according to the template you provided.
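
    A minimal usage sketch (the task name and JSON template below are hypothetical;
    a real template must be a complete DataX job definition)::

        custom = CustomDataX(
            name="custom_datax_example",
            json='{"job": {"setting": {}, "content": [{"reader": {}, "writer": {}}]}}',
        )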
"""
CUSTOM_CONFIG = 1
_task_custom_attr = {"custom_config", "json", "xms", "xmx"}
ext: set = {".json"}
ext_attr: str = "_json"
def __init__(
self,
name: str,
json: str,
xms: Optional[int] = 1,
xmx: Optional[int] = 1,
*args,
**kwargs
):
self._json = json
super().__init__(name, TaskType.DATAX, *args, **kwargs)
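        # custom_config = 1 tells dolphinscheduler to use the user-supplied JSON
        # template instead of generating the DataX job from SQL/datasource settings.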
self.custom_config = self.CUSTOM_CONFIG
self.xms = xms
self.xmx = xmx
class DataX(Task):
"""Task DataX object, declare behavior for DataX task to dolphinscheduler.

    It runs a database synchronization job across multiple SQL database engines,
    such as:

    - MySQL
    - Oracle
    - PostgreSQL
    - SQLServer

    The ``datasource_name`` and ``datatarget_name`` you provide contain the
    connection information; they decide which database type and which database
    instance the data is synchronized between.
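
    A minimal usage sketch (the datasource, table, and task names below are
    hypothetical and must already be registered in dolphinscheduler)::

        task = DataX(
            name="datax_example",
            datasource_name="mysql_source_db",
            datatarget_name="mysql_target_db",
            sql="SELECT id, name FROM source_table",
            target_table="target_table",
        )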
"""
CUSTOM_CONFIG = 0
_task_custom_attr = {
"custom_config",
"sql",
"target_table",
"job_speed_byte",
"job_speed_record",
"pre_statements",
"post_statements",
"xms",
"xmx",
}
ext: set = {".sql"}
ext_attr: str = "_sql"
def __init__(
self,
name: str,
datasource_name: str,
datatarget_name: str,
sql: str,
target_table: str,
job_speed_byte: Optional[int] = 0,
job_speed_record: Optional[int] = 1000,
pre_statements: Optional[List[str]] = None,
post_statements: Optional[List[str]] = None,
xms: Optional[int] = 1,
xmx: Optional[int] = 1,
*args,
**kwargs
):
self._sql = sql
super().__init__(name, TaskType.DATAX, *args, **kwargs)
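        # custom_config = 0 tells dolphinscheduler to generate the DataX job from
        # the SQL statement and the datasource/datatarget settings below.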
self.custom_config = self.CUSTOM_CONFIG
self.datasource_name = datasource_name
self.datatarget_name = datatarget_name
self.target_table = target_table
self.job_speed_byte = job_speed_byte
self.job_speed_record = job_speed_record
self.pre_statements = pre_statements or []
self.post_statements = post_statements or []
self.xms = xms
self.xmx = xmx
@property
    def task_params(self) -> Dict:
        """Override Task.task_params for the DataX task.

        The DataX task has some special attributes in task_params, and it would be
        awkward to set them directly as Python properties, so we override
        Task.task_params here.
        """
params = super().task_params
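        # Merge the source connection info into the parameters: Database(name, type_key,
        # database_key) looks up the named datasource and exposes it under the
        # DataX-specific keys "dsType" / "dataSource".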
datasource = Database(self.datasource_name, "dsType", "dataSource")
params.update(datasource)
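        # Likewise merge the target connection info under "dtType" / "dataTarget".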
datatarget = Database(self.datatarget_name, "dtType", "dataTarget")
params.update(datatarget)
return params