From 32bbbadef74474e0012438e11cea037a3ab47020 Mon Sep 17 00:00:00 2001 From: Jay Chung Date: Sat, 17 Dec 2022 20:48:11 +0800 Subject: [PATCH 1/4] [impv] Remove tenant define is workflow current tenant in workflow only work when the first time user do not exist, when user change the tenant in workflow but tenant exist, it will be ignore, so we try to remove it from workflow, and in #40 we try to create both user and tenant vis cli instead of auto create --- docs/source/concept.rst | 7 +- docs/source/config.rst | 2 - examples/yaml_define/tutorial.yaml | 1 - src/pydolphinscheduler/cli/tenants.py | 64 +++++++++++++++++++ src/pydolphinscheduler/cli/users.py | 0 src/pydolphinscheduler/configuration.py | 4 +- .../core/default_config.yaml | 58 ----------------- src/pydolphinscheduler/core/workflow.py | 19 +----- src/pydolphinscheduler/default_config.yaml | 5 +- .../examples/bulk_create_example.py | 4 +- .../examples/multi_resources_example.py | 1 - .../examples/task_condition_example.py | 2 +- .../examples/task_datax_example.py | 1 - .../examples/task_dependent_example.py | 2 - .../examples/task_dvc_example.py | 1 - .../examples/task_flink_example.py | 2 +- .../examples/task_kubernetes_example.py | 1 - .../examples/task_map_reduce_example.py | 2 +- .../examples/task_mlflow_example.py | 1 - .../examples/task_openmldb_example.py | 1 - .../examples/task_pytorch_example.py | 1 - .../examples/task_sagemaker_example.py | 1 - .../examples/task_spark_example.py | 2 +- .../examples/task_switch_example.py | 2 +- src/pydolphinscheduler/examples/tutorial.py | 1 - .../examples/tutorial_decorator.py | 1 - .../examples/tutorial_resource_plugin.py | 1 - src/pydolphinscheduler/java_gateway.py | 2 - src/pydolphinscheduler/models/meta.py | 47 ++++++++++++++ src/pydolphinscheduler/models/tenant.py | 2 +- src/pydolphinscheduler/models/user.py | 2 +- src/pydolphinscheduler/utils/encode.py | 6 ++ 32 files changed, 134 insertions(+), 112 deletions(-) create mode 100644 src/pydolphinscheduler/cli/tenants.py create mode 100644 src/pydolphinscheduler/cli/users.py delete mode 100644 src/pydolphinscheduler/core/default_config.yaml create mode 100644 src/pydolphinscheduler/models/meta.py create mode 100644 src/pydolphinscheduler/utils/encode.py diff --git a/docs/source/concept.rst b/docs/source/concept.rst index 6048ed4..084980c 100644 --- a/docs/source/concept.rst +++ b/docs/source/concept.rst @@ -80,11 +80,12 @@ Tenant ~~~~~~ Tenant is the user who run task command in machine or in virtual machine. it could be assign by simple string. +You should change the tenant value to exists tenant in your host, it config in `config.yaml` in your pydolphinscheduler +``PYDS_HOME``, or via :doc:`CLI ` -.. code-block:: python +.. code-block:: bash - # - workflow = Workflow(name="workflow tenant", tenant="tenant_exists") + pydolphinscheduler config --set default.user.tenant .. note:: diff --git a/docs/source/config.rst b/docs/source/config.rst index c5753cf..70e9845 100644 --- a/docs/source/config.rst +++ b/docs/source/config.rst @@ -101,8 +101,6 @@ All environment variables as below, and you could modify their value via `Bash < +------------------+------------------------------------+---------------------------------------------------------------------------------------------------------------------+ | | ``PYDS_WORKFLOW_PROJECT`` | Default workflow project name, will use its value when workflow does not specify the attribute ``project``. | + +------------------------------------+---------------------------------------------------------------------------------------------------------------------+ -| | ``PYDS_WORKFLOW_TENANT`` | Default workflow tenant, will use its value when workflow does not specify the attribute ``tenant``. | -+ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+ | Default Workflow | ``PYDS_WORKFLOW_USER`` | Default workflow user, will use its value when workflow does not specify the attribute ``user``. | + +------------------------------------+---------------------------------------------------------------------------------------------------------------------+ | | ``PYDS_WORKFLOW_QUEUE`` | Default workflow queue, will use its value when workflow does not specify the attribute ``queue``. | diff --git a/examples/yaml_define/tutorial.yaml b/examples/yaml_define/tutorial.yaml index 40d456b..05a181d 100644 --- a/examples/yaml_define/tutorial.yaml +++ b/examples/yaml_define/tutorial.yaml @@ -20,7 +20,6 @@ workflow: name: "tutorial" schedule: "0 0 0 * * ? *" start_time: "2021-01-01" - tenant: "tenant_exists" release_state: "offline" run: true diff --git a/src/pydolphinscheduler/cli/tenants.py b/src/pydolphinscheduler/cli/tenants.py new file mode 100644 index 0000000..08b9f4b --- /dev/null +++ b/src/pydolphinscheduler/cli/tenants.py @@ -0,0 +1,64 @@ +import click + +from pydolphinscheduler import configuration +from pydolphinscheduler.models.tenant import Tenant + + +@click.help_option() +def tenants() -> click.group: + """Users subcommand group.""" + + +@click.group() +def tenants() -> click.group: + """Users subcommand group.""" + + +@tenants.command() +@click.option( + "-n", "--name", "name", + required=True, + type=str, +) +@click.option( + "-q", "--queue-name", "queue_name", + required=True, + type=str, +) +@click.option( + "-d", "--description", "description", + required=True, + type=str, +) +def create(name, queue_name, description): + tenant = Tenant.get(name) + if tenant: + click.echo(f"Tenant with name {name} already exists.", err=True) + new_tenant = Tenant.create(name, queue_name, description) + click.echo(f"Tenant {new_tenant.name} had been created.") + + +@tenants.command() +@click.option( + "-n", "--name", "name", + required=True, + type=str, +) +def delete(name): + tenant = Tenant.delete(name) + if not tenant: + click.echo(f"Tenant with name {name} not exists.", err=True) + click.echo(f"Tenant: {tenant}.") + + +@tenants.command() +@click.option( + "-n", "--name", "name", + required=True, + type=str, +) +def get(name): + tenant = Tenant.get(name) + if not tenant: + click.echo(f"Tenant with name {name} not exists.", err=True) + click.echo(f"Tenant: {tenant}.") diff --git a/src/pydolphinscheduler/cli/users.py b/src/pydolphinscheduler/cli/users.py new file mode 100644 index 0000000..e69de29 diff --git a/src/pydolphinscheduler/configuration.py b/src/pydolphinscheduler/configuration.py index d12e47c..9d12afe 100644 --- a/src/pydolphinscheduler/configuration.py +++ b/src/pydolphinscheduler/configuration.py @@ -184,6 +184,7 @@ def get_bool(val: Any) -> bool: "PYDS_USER_PASSWORD", configs.get("default.user.password") ) USER_EMAIL = os.environ.get("PYDS_USER_EMAIL", configs.get("default.user.email")) +USER_TENANT = os.environ.get("PYDS_USER_STATE", configs.get("default.user.tenant")) USER_PHONE = str(os.environ.get("PYDS_USER_PHONE", configs.get("default.user.phone"))) USER_STATE = get_int( os.environ.get("PYDS_USER_STATE", configs.get("default.user.state")) @@ -193,9 +194,6 @@ def get_bool(val: Any) -> bool: WORKFLOW_PROJECT = os.environ.get( "PYDS_WORKFLOW_PROJECT", configs.get("default.workflow.project") ) -WORKFLOW_TENANT = os.environ.get( - "PYDS_WORKFLOW_TENANT", configs.get("default.workflow.tenant") -) WORKFLOW_USER = os.environ.get( "PYDS_WORKFLOW_USER", configs.get("default.workflow.user") ) diff --git a/src/pydolphinscheduler/core/default_config.yaml b/src/pydolphinscheduler/core/default_config.yaml deleted file mode 100644 index 5541af7..0000000 --- a/src/pydolphinscheduler/core/default_config.yaml +++ /dev/null @@ -1,58 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -# Setting about Java gateway server -java_gateway: - # The address of Python gateway server start. Set its value to `0.0.0.0` if your Python API run in different - # between Python gateway server. It could be be specific to other address like `127.0.0.1` or `localhost` - address: 127.0.0.1 - - # The port of Python gateway server start. Define which port you could connect to Python gateway server from - # Python API side. - port: 25333 - - # Whether automatically convert Python objects to Java Objects. Default value is ``True``. There is some - # performance lost when set to ``True`` but for now pydolphinscheduler do not handle the convert issue between - # java and Python, mark it as TODO item in the future. - auto_convert: true - -# Setting about dolphinscheduler default value, will use the value set below if property do not set, which -# including ``user``, ``workflow`` -default: - # Default value for dolphinscheduler's user object - user: - name: userPythonGateway - password: userPythonGateway - email: userPythonGateway@dolphinscheduler.com - tenant: tenant_pydolphin - phone: 11111111111 - state: 1 - # Default value for dolphinscheduler's workflow object - workflow: - project: project-pydolphin - tenant: tenant_pydolphin - user: userPythonGateway - queue: queuePythonGateway - worker_group: default - # Release state of workflow, default value is ``online`` which mean setting workflow online when it submits - # to Java gateway, if you want to set workflow offline set its value to ``offline`` - release_state: online - time_zone: Asia/Shanghai - # Warning type of the workflow, default value is ``NONE`` mean do not warn user in any cases of workflow state, - # change to ``FAILURE`` if you want to warn users when workflow failed. All available enum value are - # ``NONE``, ``SUCCESS``, ``FAILURE``, ``ALL`` - warning_type: NONE diff --git a/src/pydolphinscheduler/core/workflow.py b/src/pydolphinscheduler/core/workflow.py index ca25b7c..9f1f3e2 100644 --- a/src/pydolphinscheduler/core/workflow.py +++ b/src/pydolphinscheduler/core/workflow.py @@ -27,7 +27,7 @@ from pydolphinscheduler.core.resource_plugin import ResourcePlugin from pydolphinscheduler.exceptions import PyDSParamException, PyDSTaskNoFoundException from pydolphinscheduler.java_gateway import gateway -from pydolphinscheduler.models import Base, Project, Tenant, User +from pydolphinscheduler.models import Base, Project, User from pydolphinscheduler.utils.date import MAX_DATETIME, conv_from_str, conv_to_schedule @@ -87,7 +87,6 @@ class Workflow(Base): _KEY_ATTR = { "name", "project", - "tenant", "release_state", "param", } @@ -96,7 +95,6 @@ class Workflow(Base): "name", "description", "_project", - "_tenant", "worker_group", "warning_type", "warning_group_id", @@ -120,7 +118,6 @@ def __init__( timezone: Optional[str] = configuration.WORKFLOW_TIME_ZONE, user: Optional[str] = configuration.WORKFLOW_USER, project: Optional[str] = configuration.WORKFLOW_PROJECT, - tenant: Optional[str] = configuration.WORKFLOW_TENANT, worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP, warning_type: Optional[str] = configuration.WORKFLOW_WARNING_TYPE, warning_group_id: Optional[int] = 0, @@ -140,7 +137,6 @@ def __init__( self.timezone = timezone self._user = user self._project = project - self._tenant = tenant self.worker_group = worker_group self.warning_type = warning_type if warning_type.strip().upper() not in ("FAILURE", "SUCCESS", "ALL", "NONE"): @@ -178,16 +174,6 @@ def __enter__(self) -> "Workflow": def __exit__(self, exc_type, exc_val, exc_tb) -> None: WorkflowContext.delete() - @property - def tenant(self) -> Tenant: - """Get attribute tenant.""" - return Tenant(self._tenant) - - @tenant.setter - def tenant(self, tenant: Tenant) -> None: - """Set attribute tenant.""" - self._tenant = tenant.name - @property def project(self) -> Project: """Get attribute project.""" @@ -204,7 +190,7 @@ def user(self) -> User: For now we just get from python models but not from java gateway models, so it may not correct. """ - return User(name=self._user, tenant=self._tenant) + return User(name=self._user) @staticmethod def _parse_datetime(val: Any) -> Any: @@ -438,7 +424,6 @@ def submit(self) -> int: self.execution_type, self.timeout, self.worker_group, - self._tenant, self.release_state, # TODO add serialization function json.dumps(self.task_relation_json), diff --git a/src/pydolphinscheduler/default_config.yaml b/src/pydolphinscheduler/default_config.yaml index 0c51880..c5b9f2e 100644 --- a/src/pydolphinscheduler/default_config.yaml +++ b/src/pydolphinscheduler/default_config.yaml @@ -39,16 +39,15 @@ java_gateway: default: # Default value for dolphinscheduler's user object user: - name: userPythonGateway + name: userPythonGateway1 password: userPythonGateway email: userPythonGateway@dolphinscheduler.com - tenant: tenant_pydolphin + tenant: zhongjiajie phone: 11111111111 state: 1 # Default value for dolphinscheduler's workflow object workflow: project: project-pydolphin - tenant: tenant_pydolphin user: userPythonGateway queue: queuePythonGateway worker_group: default diff --git a/src/pydolphinscheduler/examples/bulk_create_example.py b/src/pydolphinscheduler/examples/bulk_create_example.py index 229811c..1263478 100644 --- a/src/pydolphinscheduler/examples/bulk_create_example.py +++ b/src/pydolphinscheduler/examples/bulk_create_example.py @@ -31,8 +31,6 @@ NUM_WORKFLOWS = 10 NUM_TASKS = 5 -# Make sure your tenant exists in your operator system -TENANT = "exists_tenant" # Whether task should dependent on pre one or not # False will create workflow with independent task, while True task will dependent on pre-task and dependence # link like `pre_task -> current_task -> next_task`, default True @@ -41,7 +39,7 @@ for wf in range(0, NUM_WORKFLOWS): workflow_name = f"workflow:{wf}" - with Workflow(name=workflow_name, tenant=TENANT) as workflow: + with Workflow(name=workflow_name) as workflow: for t in range(0, NUM_TASKS): task_name = f"task:{t}-{workflow_name}" command = f"echo This is task {task_name}" diff --git a/src/pydolphinscheduler/examples/multi_resources_example.py b/src/pydolphinscheduler/examples/multi_resources_example.py index 978a357..1507ab8 100644 --- a/src/pydolphinscheduler/examples/multi_resources_example.py +++ b/src/pydolphinscheduler/examples/multi_resources_example.py @@ -62,7 +62,6 @@ with Workflow( name="multi_resources_example", - tenant="tenant_exists", # [start create_new_resources] resource_list=[ Resource( diff --git a/src/pydolphinscheduler/examples/task_condition_example.py b/src/pydolphinscheduler/examples/task_condition_example.py index 585bc76..4f592b6 100644 --- a/src/pydolphinscheduler/examples/task_condition_example.py +++ b/src/pydolphinscheduler/examples/task_condition_example.py @@ -35,7 +35,7 @@ from pydolphinscheduler.tasks.condition import FAILURE, SUCCESS, And, Condition from pydolphinscheduler.tasks.shell import Shell -with Workflow(name="task_condition_example", tenant="tenant_exists") as workflow: +with Workflow(name="task_condition_example") as workflow: pre_task_1 = Shell(name="pre_task_1", command="echo pre_task_1") pre_task_2 = Shell(name="pre_task_2", command="echo pre_task_2") pre_task_3 = Shell(name="pre_task_3", command="echo pre_task_3") diff --git a/src/pydolphinscheduler/examples/task_datax_example.py b/src/pydolphinscheduler/examples/task_datax_example.py index a1422ef..6fdf779 100644 --- a/src/pydolphinscheduler/examples/task_datax_example.py +++ b/src/pydolphinscheduler/examples/task_datax_example.py @@ -74,7 +74,6 @@ with Workflow( name="task_datax_example", - tenant="tenant_exists", ) as workflow: # This task synchronizes the data in `t_ds_project` # of `first_mysql` database to `target_project` of `second_mysql` database. diff --git a/src/pydolphinscheduler/examples/task_dependent_example.py b/src/pydolphinscheduler/examples/task_dependent_example.py index 648bb5c..a09aaf1 100644 --- a/src/pydolphinscheduler/examples/task_dependent_example.py +++ b/src/pydolphinscheduler/examples/task_dependent_example.py @@ -42,7 +42,6 @@ with Workflow( name="task_dependent_external", - tenant="tenant_exists", ) as workflow: task_1 = Shell(name="task_1", command="echo task 1") task_2 = Shell(name="task_2", command="echo task 2") @@ -51,7 +50,6 @@ with Workflow( name="task_dependent_example", - tenant="tenant_exists", ) as workflow: task = Dependent( name="task_dependent", diff --git a/src/pydolphinscheduler/examples/task_dvc_example.py b/src/pydolphinscheduler/examples/task_dvc_example.py index 98e03f1..ea3b572 100644 --- a/src/pydolphinscheduler/examples/task_dvc_example.py +++ b/src/pydolphinscheduler/examples/task_dvc_example.py @@ -25,7 +25,6 @@ with Workflow( name="task_dvc_example", - tenant="tenant_exists", ) as workflow: init_task = DVCInit(name="init_dvc", repository=repository, store_url="~/dvc_data") upload_task = DVCUpload( diff --git a/src/pydolphinscheduler/examples/task_flink_example.py b/src/pydolphinscheduler/examples/task_flink_example.py index e5084b4..c5e97ef 100644 --- a/src/pydolphinscheduler/examples/task_flink_example.py +++ b/src/pydolphinscheduler/examples/task_flink_example.py @@ -21,7 +21,7 @@ from pydolphinscheduler.core.workflow import Workflow from pydolphinscheduler.tasks.flink import DeployMode, Flink, ProgramType -with Workflow(name="task_flink_example", tenant="tenant_exists") as workflow: +with Workflow(name="task_flink_example") as workflow: task = Flink( name="task_flink", main_class="org.apache.flink.streaming.examples.wordcount.WordCount", diff --git a/src/pydolphinscheduler/examples/task_kubernetes_example.py b/src/pydolphinscheduler/examples/task_kubernetes_example.py index 3f2ee0d..84a497d 100644 --- a/src/pydolphinscheduler/examples/task_kubernetes_example.py +++ b/src/pydolphinscheduler/examples/task_kubernetes_example.py @@ -23,7 +23,6 @@ with Workflow( name="task_kubernetes_example", - tenant="tenant_exists", ) as workflow: task_k8s = Kubernetes( name="task_k8s", diff --git a/src/pydolphinscheduler/examples/task_map_reduce_example.py b/src/pydolphinscheduler/examples/task_map_reduce_example.py index 70a6b2b..a1aa2fa 100644 --- a/src/pydolphinscheduler/examples/task_map_reduce_example.py +++ b/src/pydolphinscheduler/examples/task_map_reduce_example.py @@ -22,7 +22,7 @@ from pydolphinscheduler.core.workflow import Workflow from pydolphinscheduler.tasks.map_reduce import MR -with Workflow(name="task_map_reduce_example", tenant="tenant_exists") as workflow: +with Workflow(name="task_map_reduce_example") as workflow: task = MR( name="task_mr", main_class="wordcount", diff --git a/src/pydolphinscheduler/examples/task_mlflow_example.py b/src/pydolphinscheduler/examples/task_mlflow_example.py index 7030bcf..46d6a86 100644 --- a/src/pydolphinscheduler/examples/task_mlflow_example.py +++ b/src/pydolphinscheduler/examples/task_mlflow_example.py @@ -31,7 +31,6 @@ with Workflow( name="task_mlflow_example", - tenant="tenant_exists", ) as workflow: # run custom mlflow project to train model diff --git a/src/pydolphinscheduler/examples/task_openmldb_example.py b/src/pydolphinscheduler/examples/task_openmldb_example.py index a8186e7..6411916 100644 --- a/src/pydolphinscheduler/examples/task_openmldb_example.py +++ b/src/pydolphinscheduler/examples/task_openmldb_example.py @@ -29,7 +29,6 @@ with Workflow( name="task_openmldb_example", - tenant="tenant_exists", ) as workflow: task_openmldb = OpenMLDB( name="task_openmldb", diff --git a/src/pydolphinscheduler/examples/task_pytorch_example.py b/src/pydolphinscheduler/examples/task_pytorch_example.py index 8cb3a2d..bfb4424 100644 --- a/src/pydolphinscheduler/examples/task_pytorch_example.py +++ b/src/pydolphinscheduler/examples/task_pytorch_example.py @@ -23,7 +23,6 @@ with Workflow( name="task_pytorch_example", - tenant="tenant_exists", ) as workflow: # run project with existing environment diff --git a/src/pydolphinscheduler/examples/task_sagemaker_example.py b/src/pydolphinscheduler/examples/task_sagemaker_example.py index cbebfa7..6024823 100644 --- a/src/pydolphinscheduler/examples/task_sagemaker_example.py +++ b/src/pydolphinscheduler/examples/task_sagemaker_example.py @@ -35,7 +35,6 @@ with Workflow( name="task_sagemaker_example", - tenant="tenant_exists", ) as workflow: task_sagemaker = SageMaker( name="task_sagemaker", diff --git a/src/pydolphinscheduler/examples/task_spark_example.py b/src/pydolphinscheduler/examples/task_spark_example.py index 77ec4ac..e7df697 100644 --- a/src/pydolphinscheduler/examples/task_spark_example.py +++ b/src/pydolphinscheduler/examples/task_spark_example.py @@ -21,7 +21,7 @@ from pydolphinscheduler.core.workflow import Workflow from pydolphinscheduler.tasks.spark import DeployMode, ProgramType, Spark -with Workflow(name="task_spark_example", tenant="tenant_exists") as workflow: +with Workflow(name="task_spark_example") as workflow: task = Spark( name="task_spark", main_class="org.apache.spark.examples.SparkPi", diff --git a/src/pydolphinscheduler/examples/task_switch_example.py b/src/pydolphinscheduler/examples/task_switch_example.py index b5c60f0..0ff0b65 100644 --- a/src/pydolphinscheduler/examples/task_switch_example.py +++ b/src/pydolphinscheduler/examples/task_switch_example.py @@ -35,7 +35,7 @@ from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondition with Workflow( - name="task_switch_example", tenant="tenant_exists", param={"var": "1"} + name="task_switch_example", param={"var": "1"} ) as workflow: parent = Shell(name="parent", command="echo parent") switch_child_1 = Shell(name="switch_child_1", command="echo switch_child_1") diff --git a/src/pydolphinscheduler/examples/tutorial.py b/src/pydolphinscheduler/examples/tutorial.py index 10243fd..74080b8 100644 --- a/src/pydolphinscheduler/examples/tutorial.py +++ b/src/pydolphinscheduler/examples/tutorial.py @@ -45,7 +45,6 @@ name="tutorial", schedule="0 0 0 * * ? *", start_time="2021-01-01", - tenant="tenant_exists", ) as workflow: # [end workflow_declare] # [start task_declare] diff --git a/src/pydolphinscheduler/examples/tutorial_decorator.py b/src/pydolphinscheduler/examples/tutorial_decorator.py index 9e584a6..f3b878b 100644 --- a/src/pydolphinscheduler/examples/tutorial_decorator.py +++ b/src/pydolphinscheduler/examples/tutorial_decorator.py @@ -74,7 +74,6 @@ def task_union(): name="tutorial_decorator", schedule="0 0 0 * * ? *", start_time="2021-01-01", - tenant="tenant_exists", ) as workflow: # [end workflow_declare] diff --git a/src/pydolphinscheduler/examples/tutorial_resource_plugin.py b/src/pydolphinscheduler/examples/tutorial_resource_plugin.py index f336455..0089e5f 100644 --- a/src/pydolphinscheduler/examples/tutorial_resource_plugin.py +++ b/src/pydolphinscheduler/examples/tutorial_resource_plugin.py @@ -41,7 +41,6 @@ name="tutorial_resource_plugin", schedule="0 0 0 * * ? *", start_time="2021-01-01", - tenant="tenant_exists", resource_plugin=Local("/tmp"), ) as workflow: # [end workflow_declare] diff --git a/src/pydolphinscheduler/java_gateway.py b/src/pydolphinscheduler/java_gateway.py index 46a284e..dcf4fe8 100644 --- a/src/pydolphinscheduler/java_gateway.py +++ b/src/pydolphinscheduler/java_gateway.py @@ -258,7 +258,6 @@ def create_or_update_workflow( execution_type: str, timeout: int, worker_group: str, - tenant_code: str, release_state: int, task_relation_json: str, task_definition_json: str, @@ -277,7 +276,6 @@ def create_or_update_workflow( warning_group_id, timeout, worker_group, - tenant_code, release_state, task_relation_json, task_definition_json, diff --git a/src/pydolphinscheduler/models/meta.py b/src/pydolphinscheduler/models/meta.py new file mode 100644 index 0000000..62417cf --- /dev/null +++ b/src/pydolphinscheduler/models/meta.py @@ -0,0 +1,47 @@ +from functools import wraps +from inspect import signature +from typing import Tuple, Dict + +from py4j.java_gateway import JavaObject +from pydolphinscheduler.utils.string import snake2camel + + +class ModelMeta(type): + _FUNC_INIT = "__init__" + _PARAM_SELF = "self" + + def __new__(mcs, name: str, bases: Tuple, attrs: Dict): + + if mcs._FUNC_INIT not in attrs: + raise TypeError("Class with mateclass %s must have %s method", (mcs.__name__, mcs._FUNC_INIT)) + + sig = signature(attrs.get(mcs._FUNC_INIT)) + param = [param.name for name, param in sig.parameters.items() if name != mcs._PARAM_SELF] + + for attr_name, attr_value in attrs.items(): + if isinstance(attr_value, classmethod) and not attr_name.startswith("__"): + attrs[attr_name] = mcs.j2p(attr_value, name, attrs, param) + return super(ModelMeta, mcs).__new__(mcs, name, bases, attrs) + + @classmethod + def j2p(mcs, cm: classmethod, name: str, attrs: Dict, params=None): + @wraps(cm) + def wrapper(*args, **kwargs): + class_ = type(name, (), attrs) + + java_obj = cm.__func__(class_, *args, **kwargs) + assert isinstance(java_obj, JavaObject), "The function %s must return JavaObject" % cm.__func__.__name__ + + obj_init_params = [] + for param in params: + java_func_name = mcs.py4j_attr_func_name(param) + java_func = getattr(java_obj, java_func_name) + obj_init_params.append(java_func()) + + return class_(*obj_init_params) + + return wrapper + + @classmethod + def py4j_attr_func_name(mcs, name: str) -> str: + return snake2camel(f"get_{name}") diff --git a/src/pydolphinscheduler/models/tenant.py b/src/pydolphinscheduler/models/tenant.py index 146aec0..10882c2 100644 --- a/src/pydolphinscheduler/models/tenant.py +++ b/src/pydolphinscheduler/models/tenant.py @@ -29,7 +29,7 @@ class Tenant(BaseSide): def __init__( self, - name: str = configuration.WORKFLOW_TENANT, + name: str = configuration.USER_TENANT, queue: str = configuration.WORKFLOW_QUEUE, description: Optional[str] = None, tenant_id: Optional[int] = None, diff --git a/src/pydolphinscheduler/models/user.py b/src/pydolphinscheduler/models/user.py index e45f98d..e58af46 100644 --- a/src/pydolphinscheduler/models/user.py +++ b/src/pydolphinscheduler/models/user.py @@ -43,7 +43,7 @@ def __init__( password: Optional[str] = configuration.USER_PASSWORD, email: Optional[str] = configuration.USER_EMAIL, phone: Optional[str] = configuration.USER_PHONE, - tenant: Optional[str] = configuration.WORKFLOW_TENANT, + tenant: Optional[str] = configuration.USER_TENANT, queue: Optional[str] = configuration.WORKFLOW_QUEUE, status: Optional[int] = configuration.USER_STATE, ): diff --git a/src/pydolphinscheduler/utils/encode.py b/src/pydolphinscheduler/utils/encode.py new file mode 100644 index 0000000..d893a39 --- /dev/null +++ b/src/pydolphinscheduler/utils/encode.py @@ -0,0 +1,6 @@ +import hashlib + + +def md5(string: str) -> str: + """Encode string with MD5 hashlib.""" + return hashlib.md5(string.encode('utf-8')).hexdigest() From 5cdb8164703c9aa815e3b62c978a990e878498db Mon Sep 17 00:00:00 2001 From: Jay Chung Date: Sat, 17 Dec 2022 21:01:25 +0800 Subject: [PATCH 2/4] style --- src/pydolphinscheduler/cli/tenants.py | 64 ---------------------- src/pydolphinscheduler/cli/users.py | 0 src/pydolphinscheduler/default_config.yaml | 4 +- src/pydolphinscheduler/models/meta.py | 47 ---------------- src/pydolphinscheduler/utils/encode.py | 6 -- tests/core/test_workflow.py | 32 +---------- tests/test_configuration.py | 4 +- tests/utils/test_yaml_parser.py | 1 - 8 files changed, 5 insertions(+), 153 deletions(-) delete mode 100644 src/pydolphinscheduler/cli/tenants.py delete mode 100644 src/pydolphinscheduler/cli/users.py delete mode 100644 src/pydolphinscheduler/models/meta.py delete mode 100644 src/pydolphinscheduler/utils/encode.py diff --git a/src/pydolphinscheduler/cli/tenants.py b/src/pydolphinscheduler/cli/tenants.py deleted file mode 100644 index 08b9f4b..0000000 --- a/src/pydolphinscheduler/cli/tenants.py +++ /dev/null @@ -1,64 +0,0 @@ -import click - -from pydolphinscheduler import configuration -from pydolphinscheduler.models.tenant import Tenant - - -@click.help_option() -def tenants() -> click.group: - """Users subcommand group.""" - - -@click.group() -def tenants() -> click.group: - """Users subcommand group.""" - - -@tenants.command() -@click.option( - "-n", "--name", "name", - required=True, - type=str, -) -@click.option( - "-q", "--queue-name", "queue_name", - required=True, - type=str, -) -@click.option( - "-d", "--description", "description", - required=True, - type=str, -) -def create(name, queue_name, description): - tenant = Tenant.get(name) - if tenant: - click.echo(f"Tenant with name {name} already exists.", err=True) - new_tenant = Tenant.create(name, queue_name, description) - click.echo(f"Tenant {new_tenant.name} had been created.") - - -@tenants.command() -@click.option( - "-n", "--name", "name", - required=True, - type=str, -) -def delete(name): - tenant = Tenant.delete(name) - if not tenant: - click.echo(f"Tenant with name {name} not exists.", err=True) - click.echo(f"Tenant: {tenant}.") - - -@tenants.command() -@click.option( - "-n", "--name", "name", - required=True, - type=str, -) -def get(name): - tenant = Tenant.get(name) - if not tenant: - click.echo(f"Tenant with name {name} not exists.", err=True) - click.echo(f"Tenant: {tenant}.") diff --git a/src/pydolphinscheduler/cli/users.py b/src/pydolphinscheduler/cli/users.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/pydolphinscheduler/default_config.yaml b/src/pydolphinscheduler/default_config.yaml index c5b9f2e..58bdff8 100644 --- a/src/pydolphinscheduler/default_config.yaml +++ b/src/pydolphinscheduler/default_config.yaml @@ -39,10 +39,10 @@ java_gateway: default: # Default value for dolphinscheduler's user object user: - name: userPythonGateway1 + name: userPythonGateway password: userPythonGateway email: userPythonGateway@dolphinscheduler.com - tenant: zhongjiajie + tenant: tenant_pydolphin phone: 11111111111 state: 1 # Default value for dolphinscheduler's workflow object diff --git a/src/pydolphinscheduler/models/meta.py b/src/pydolphinscheduler/models/meta.py deleted file mode 100644 index 62417cf..0000000 --- a/src/pydolphinscheduler/models/meta.py +++ /dev/null @@ -1,47 +0,0 @@ -from functools import wraps -from inspect import signature -from typing import Tuple, Dict - -from py4j.java_gateway import JavaObject -from pydolphinscheduler.utils.string import snake2camel - - -class ModelMeta(type): - _FUNC_INIT = "__init__" - _PARAM_SELF = "self" - - def __new__(mcs, name: str, bases: Tuple, attrs: Dict): - - if mcs._FUNC_INIT not in attrs: - raise TypeError("Class with mateclass %s must have %s method", (mcs.__name__, mcs._FUNC_INIT)) - - sig = signature(attrs.get(mcs._FUNC_INIT)) - param = [param.name for name, param in sig.parameters.items() if name != mcs._PARAM_SELF] - - for attr_name, attr_value in attrs.items(): - if isinstance(attr_value, classmethod) and not attr_name.startswith("__"): - attrs[attr_name] = mcs.j2p(attr_value, name, attrs, param) - return super(ModelMeta, mcs).__new__(mcs, name, bases, attrs) - - @classmethod - def j2p(mcs, cm: classmethod, name: str, attrs: Dict, params=None): - @wraps(cm) - def wrapper(*args, **kwargs): - class_ = type(name, (), attrs) - - java_obj = cm.__func__(class_, *args, **kwargs) - assert isinstance(java_obj, JavaObject), "The function %s must return JavaObject" % cm.__func__.__name__ - - obj_init_params = [] - for param in params: - java_func_name = mcs.py4j_attr_func_name(param) - java_func = getattr(java_obj, java_func_name) - obj_init_params.append(java_func()) - - return class_(*obj_init_params) - - return wrapper - - @classmethod - def py4j_attr_func_name(mcs, name: str) -> str: - return snake2camel(f"get_{name}") diff --git a/src/pydolphinscheduler/utils/encode.py b/src/pydolphinscheduler/utils/encode.py deleted file mode 100644 index d893a39..0000000 --- a/src/pydolphinscheduler/utils/encode.py +++ /dev/null @@ -1,6 +0,0 @@ -import hashlib - - -def md5(string: str) -> str: - """Encode string with MD5 hashlib.""" - return hashlib.md5(string.encode('utf-8')).hexdigest() diff --git a/tests/core/test_workflow.py b/tests/core/test_workflow.py index 43f1ddd..c96b0ec 100644 --- a/tests/core/test_workflow.py +++ b/tests/core/test_workflow.py @@ -28,7 +28,7 @@ from pydolphinscheduler.core.resource import Resource from pydolphinscheduler.core.workflow import Workflow from pydolphinscheduler.exceptions import PyDSParamException -from pydolphinscheduler.models import Project, Tenant, User +from pydolphinscheduler.models import Project, User from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondition from pydolphinscheduler.utils.date import conv_to_schedule from tests.testing.task import Task @@ -51,7 +51,6 @@ def test_workflow_key_attr(func): [ ("timezone", configuration.WORKFLOW_TIME_ZONE), ("project", Project(configuration.WORKFLOW_PROJECT)), - ("tenant", Tenant(configuration.WORKFLOW_TENANT)), ( "user", User( @@ -59,7 +58,7 @@ def test_workflow_key_attr(func): configuration.USER_PASSWORD, configuration.USER_EMAIL, configuration.USER_PHONE, - configuration.WORKFLOW_TENANT, + configuration.USER_TENANT, configuration.WORKFLOW_QUEUE, configuration.USER_STATE, ), @@ -148,7 +147,6 @@ def test_set_release_state_error(value): "set_attr,set_val,get_attr,get_val", [ ("_project", "project", "project", Project("project")), - ("_tenant", "tenant", "tenant", Tenant("tenant")), ("_start_time", "2021-01-01", "start_time", datetime(2021, 1, 1)), ("_end_time", "2021-01-01", "end_time", datetime(2021, 1, 1)), ], @@ -335,7 +333,6 @@ def test_workflow_get_define_without_task(): "name": TEST_WORKFLOW_NAME, "description": None, "project": configuration.WORKFLOW_PROJECT, - "tenant": configuration.WORKFLOW_TENANT, "workerGroup": configuration.WORKFLOW_WORKER_GROUP, "warningType": configuration.WORKFLOW_WARNING_TYPE, "warningGroupId": 0, @@ -458,31 +455,6 @@ def test_workflow_simple_separate(): assert all(["task-" in task.name for task in workflow.task_list]) -@pytest.mark.parametrize( - "user_attrs", - [ - {"tenant": "tenant_specific"}, - ], -) -def test_set_workflow_user_attr(user_attrs): - """Test user with correct attributes if we specific assigned to workflow object.""" - default_value = { - "tenant": configuration.WORKFLOW_TENANT, - } - with Workflow(TEST_WORKFLOW_NAME, **user_attrs) as workflow: - user = workflow.user - for attr in default_value: - # Get assigned attribute if we specific, else get default value - except_attr = ( - user_attrs[attr] if attr in user_attrs else default_value[attr] - ) - # Get actually attribute of user object - actual_attr = getattr(user, attr) - assert ( - except_attr == actual_attr - ), f"Except attribute is {except_attr} but get {actual_attr}" - - def test_schedule_json_none_schedule(): """Test function schedule_json with None as schedule.""" with Workflow( diff --git a/tests/test_configuration.py b/tests/test_configuration.py index a3dd07f..8fabcca 100644 --- a/tests/test_configuration.py +++ b/tests/test_configuration.py @@ -181,10 +181,10 @@ def test_get_configs_build_in(): "userPythonGateway@dolphinscheduler.com", "userPythonGateway@edit.com", ), + ("default.user.tenant", "tenant_pydolphin", "edit_tenant_pydolphin"), ("default.user.phone", 11111111111, 22222222222), ("default.user.state", 1, 0), ("default.workflow.project", "project-pydolphin", "eidt-project-pydolphin"), - ("default.workflow.tenant", "tenant_pydolphin", "edit_tenant_pydolphin"), ("default.workflow.user", "userPythonGateway", "editUserPythonGateway"), ("default.workflow.queue", "queuePythonGateway", "editQueuePythonGateway"), ("default.workflow.worker_group", "default", "specific"), @@ -220,7 +220,6 @@ def test_single_config_get_set_not_exists_key(): ("USER_PHONE", "11111111111"), ("USER_STATE", 1), ("WORKFLOW_PROJECT", "project-pydolphin"), - ("WORKFLOW_TENANT", "tenant_pydolphin"), ("WORKFLOW_USER", "userPythonGateway"), ("WORKFLOW_QUEUE", "queuePythonGateway"), ("WORKFLOW_WORKER_GROUP", "default"), @@ -249,7 +248,6 @@ def test_get_configuration(config_name: str, expect: Any): ("USER_PHONE", "11111111111", "22222222222"), ("USER_STATE", 1, 0), ("WORKFLOW_PROJECT", "project-pydolphin", "env-project-pydolphin"), - ("WORKFLOW_TENANT", "tenant_pydolphin", "env-tenant_pydolphin"), ("WORKFLOW_USER", "userPythonGateway", "envUserPythonGateway"), ("WORKFLOW_QUEUE", "queuePythonGateway", "envQueuePythonGateway"), ("WORKFLOW_WORKER_GROUP", "default", "custom"), diff --git a/tests/utils/test_yaml_parser.py b/tests/utils/test_yaml_parser.py index 6ea8b52..40becc7 100644 --- a/tests/utils/test_yaml_parser.py +++ b/tests/utils/test_yaml_parser.py @@ -58,7 +58,6 @@ "default.user.state": (1, 0), "default.workflow": yaml.load("no need test"), "default.workflow.project": ("project-pydolphin", "project-pydolphinEdit"), - "default.workflow.tenant": ("tenant_pydolphin", "SmithEdit"), "default.workflow.user": ("userPythonGateway", "SmithEdit"), "default.workflow.queue": ("queuePythonGateway", "queueEdit"), "default.workflow.worker_group": ("default", "wgEdit"), From d370f737066555b44005c5f352dba16b759015e5 Mon Sep 17 00:00:00 2001 From: Jay Chung Date: Sat, 17 Dec 2022 21:34:46 +0800 Subject: [PATCH 3/4] add updating --- UPDATING.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/UPDATING.md b/UPDATING.md index 7ca97ea..f6e2097 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -24,6 +24,11 @@ It started after version 2.0.5 released ## Main +* Remove attribute tenant from pydolphinscheduler.core.workflow.workflow ([#54](https://github.com/apache/dolphinscheduler-sdk-python/pull/54)) + and please change tenant name in ``config.yaml`` in ``PYDS_HOME`` + +## 4.0.0 + * Change Task attr ``timeout`` type from int to timedelta and use timeout determine attr ``timeout_flag`` value ([#41](https://github.com/apache/dolphinscheduler-sdk-python/pull/41)) * Remove the spark version of spark task ([#11860](https://github.com/apache/dolphinscheduler/pull/11860)). * Change class name from process definition to workflow ([#26](https://github.com/apache/dolphinscheduler-sdk-python/pull/26)) From 406568c1b8c24be1c8e7f61562b9abc7202ee795 Mon Sep 17 00:00:00 2001 From: Jay Chung Date: Sat, 17 Dec 2022 21:37:52 +0800 Subject: [PATCH 4/4] fix style --- src/pydolphinscheduler/examples/task_switch_example.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/pydolphinscheduler/examples/task_switch_example.py b/src/pydolphinscheduler/examples/task_switch_example.py index 0ff0b65..7cefcdd 100644 --- a/src/pydolphinscheduler/examples/task_switch_example.py +++ b/src/pydolphinscheduler/examples/task_switch_example.py @@ -34,9 +34,7 @@ from pydolphinscheduler.tasks.shell import Shell from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondition -with Workflow( - name="task_switch_example", param={"var": "1"} -) as workflow: +with Workflow(name="task_switch_example", param={"var": "1"}) as workflow: parent = Shell(name="parent", command="echo parent") switch_child_1 = Shell(name="switch_child_1", command="echo switch_child_1") switch_child_2 = Shell(name="switch_child_2", command="echo switch_child_2")