作者 by aigle / 2024-01-12 / 暂无评论
DolphinScheduler安装部署
DolphinScheduler 作为一款开源分布式工作流任务调度系统,可以很好地部署和运行在 Intel 架构服务器及主流虚拟化环境下,并支持主流的Linux操作系统环境
| 操作系统 | 版本 |
|---|---|
| Red Hat Enterprise Linux | 7.0 及以上 |
| CentOS | 7.0 及以上 |
| Oracle Enterprise Linux | 7.0 及以上 |
| Ubuntu LTS | 16.04 及以上 |
注意: 以上 Linux 操作系统可运行在物理服务器以及 VMware、KVM、XEN 主流虚拟化环境上
DolphinScheduler 支持运行在 Intel x86-64 架构的 64 位通用硬件服务器平台。对生产环境的服务器硬件配置有以下建议:
生产环境
| CPU | 内存 | 硬盘类型 | 网络 | 实例数量 |
|---|---|---|---|---|
| 4核+ | 8 GB+ | SAS | 千兆网卡 | 1+ |
注意:
- 以上建议配置为部署 DolphinScheduler 的最低配置,生产环境强烈推荐使用更高的配置
- 硬盘大小配置建议 50GB+ ,系统盘和数据盘分开
DolphinScheduler正常运行提供如下的网络端口配置:
| 组件 | 默认端口 | 说明 |
|---|---|---|
| MasterServer | 5678 | 非通信端口,只需本机端口不冲突即可 |
| WorkerServer | 1234 | 非通信端口,只需本机端口不冲突即可 |
| ApiApplicationServer | 12345 | 提供后端通信端口 |
注意:
- MasterServer 和 WorkerServer 不需要开启网络间通信,只需本机端口不冲突即可
- 管理员可根据实际环境中 DolphinScheduler 组件部署方案,在网络侧和主机侧开放相关端口
DolphinScheduler 推荐 Chrome 以及使用 Chromium 内核的较新版本浏览器访问前端可视化操作界面
DolphinScheduler提供了4种安装部署方式:
注意:
1、Standalone仅建议20个以下工作流使用,因为其采用内存式的H2 Database, Zookeeper Testing Server,任务过多可能导致不稳定,并且如果重启或者停止standalone-server会导致内存中数据库里的数据清空。 如果您要连接外部数据库,比如mysql或者postgresql。
2、Kubernetes部署先决条件:Helm3.1.0+ ;Kubernetes1.12+;PV 供应(需要基础设施支持)
Standalone 仅适用于 DolphinScheduler 的快速体验.
二进制包:在下载页面下载 DolphinScheduler 二进制包
前置准备工作
JAVA_HOME 环境变量,并将其下的 bin 目录追加到 PATH 环境变量中。如果你的环境中已存在,可以跳过这步。二进制压缩包中有 standalone 启动的脚本,解压后即可快速启动。切换到有sudo权限的用户,运行脚本
# 解压并运行 Standalone Server
[root@qianfeng01 soft]# cd /opt/soft
[root@qianfeng01 soft]# tar -zxvf apache-dolphinscheduler-3.1.4-bin.tar.gz
[root@qianfeng01 soft]# cd ./apache-dolphinscheduler-3.1.4-bin
[root@qianfeng01 apache-dolphinscheduler-3.1.4-bin]# ./bin/dolphinscheduler-daemon.sh start standalone-server
#查询dolphinscheduler的单机服务
[root@qianfeng01 apache-dolphinscheduler-3.1.4-bin]# jps
18688 Jps
18665 StandaloneServer浏览器访问地址 http://qianfeng01:12345/dolphinscheduler/ui 即可登录系统UI。默认的用户名和密码是 admin/dolphinscheduler123

登录成功如下图所示:

脚本 ./bin/dolphinscheduler-daemon.sh 除了可以快捷启动 standalone 外,还能停止服务运行,全部命令如下
# 启动 Standalone Server 服务
[root@qianfeng01 apache-dolphinscheduler-3.1.4-bin]# ./bin/dolphinscheduler-daemon.sh start standalone-server
# 停止 Standalone Server 服务
[root@qianfeng01 apache-dolphinscheduler-3.1.4-bin]# ./bin/dolphinscheduler-daemon.sh stop standalone-serverStandalone server 使用 H2 数据库作为其元数据存储数据,这是为了上手简单,用户在启动服务器之前不需要启动数据库。但是如果用户想将元数据库存储在 MySQL 或 PostgreSQL 等其他数据库中,他们必须更改一些配置。请参考 数据源配置 Standalone 切换元数据库 创建并初始化数据库。
单机版到此为止即可。
集群模式下,可配置多个Master及多个Worker。通常可配置2~3个Master,若干个Worker。由于集群资源有限,此处配置一个Master,三个Worker,集群规划如下。
| 主机名 | ip | 服务 | 备注 |
|---|---|---|---|
| qianfeng01 | 192.168.10.101 | master,worker | |
| qianfeng02 | 192.168.10.102 | master,worker | 该服务器也可以安装master |
| qianfeng03 | 192.168.10.103 | worker |
数据库:本文使用的是MySQL 8.0.26版本,也可以使用5.7版本及以上,或者是使用PostgreSQL数据库(8.2.15+),两者任选其一即可,如 MySQL 则需要 JDBC Driver 8.0.16
╭liyadong at ~
╰$ scp ~/Desktop/mysql-connector-java-8.0.16.jar qianfeng01:/opt/soft创建部署用户,并为该用户配置免登录,以创建dolphinscheduler用户为例(准备执行DS安装程序的服务器上创建即可)
# 创建用户需使用root登录
useradd dolphinscheduler
# 添加密码
echo "dolphinscheduler" | passwd --stdin dolphinscheduler
# 配置sudo(系统管理命令)免密
sed -i '$adolphinscheduler ALL=(ALL) NOPASSWD: NOPASSWD: ALL' /etc/sudoers
sed -i 's/Defaults requirett/#Defaults requirett/g' /etc/sudoers注意:
- 因为任务执行服务是以
sudo -u {linux-user}切换不同 linux 用户的方式来实现多租户运行作业,所以部署用户需要有 sudo 权限,而且是免密的。初学习者不理解的话,完全可以暂时忽略这一点- 如果发现
/etc/sudoers文件中有 "Defaults requirett" 这行,也请注释掉
由于安装的时候需要向不同机器发送资源,所以要求各台机器间能实现SSH免密登陆。配置免密登陆的步骤如下
su dolphinscheduler
ssh-copy-id qianfeng01
ssh-copy-id qianfeng02
ssh-copy-id qianfeng03注意:
配置完成后,可以通过运行命令
ssh localhost判断是否成功,如果不需要输入密码就能ssh登陆则证明成功
启动zookeeper集群(所有安装Zookeper的服务器均执行)
# 启动 zookeeper
./bin/zkServer.sh startDolphinScheduler 元数据存储在关系型数据库中,目前支持 PostgreSQL 和 MySQL。下面分别介绍如何使用 MySQL 初始化数据库。
创建数据库、用户和授权
-- 进入MySQL命令行
[root@qianfeng01 soft]# mysql -uroot -p123456
-- 创建dolphinscheduler的数据库用户和密码,并限定登陆范围
mysql> CREATE USER 'dolphinscheduler'@'%' IDENTIFIED BY 'QF-Dolphinscheduler123!';
-- 创建dolphinscheduler的元数据库,并指定编码
mysql> CREATE DATABASE dolphinscheduler DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;
-- 为dolphinscheduler数据库授权
mysql> grant all privileges on dolphinscheduler.* to 'dolphinscheduler'@'%' ;
-- 刷新权限
mysql> flush privileges;上传DolphinScheduler安装包到qianfeng01节点的/opt/software目录,并解压安装包到该目录。
#上传安装包到服务器
$ scp ~/Desktop/apache-dolphinscheduler-3.1.4-bin.tar.gz qianfeng01:/opt/soft
#解压
[root@qianfeng01 soft]# tar -zxvf ./apache-dolphinscheduler-3.1.4-bin.tar.gz
# 修改目录权限,使得部署用户对二进制包解压后的 apache-dolphinscheduler-*-bin 目录有操作权限
[root@qianfeng01 soft]# chown -R dolphinscheduler:dolphinscheduler apache-dolphinscheduler-3.1.4-bin需要手动下载 对应的mysql-connector-java驱动(8.0.16)并移动到 DolphinScheduler 的每个模块的 libs 目录下,其中包括 api-server/libs 和 alert-server/libs 和 master-server/libs 和 worker-server/libs和<font color='red'>tools/libs</font>
#复制mysql的驱动到对应libs目录中
[root@qianfeng01 soft]# cp ./mysql-connector-java-8.0.16.jar ./apache-dolphinscheduler-3.1.4-bin/alert-server/libs/
[root@qianfeng01 soft]# cp ./mysql-connector-java-8.0.16.jar ./apache-dolphinscheduler-3.1.4-bin/master-server/libs/
[root@qianfeng01 soft]# cp ./mysql-connector-java-8.0.16.jar ./apache-dolphinscheduler-3.1.4-bin/worker-server/libs/
[root@qianfeng01 soft]# cp ./mysql-connector-java-8.0.16.jar ./apache-dolphinscheduler-3.1.4-bin/api-server/libs/
[root@qianfeng01 soft]# cp ./mysql-connector-java-8.0.16.jar ./apache-dolphinscheduler-3.1.4-bin/tools/libs/dolphinscheduler_env.sh配置文件文件 ./bin/env/dolphinscheduler_env.sh 描述了下列配置:
JAVA_HOME 和 SPARK_HOME都是在这里定义的zookeeper如果您不使用某些任务类型,您可以忽略任务外部依赖项,但您必须根据您的环境更改 JAVA_HOME、注册中心和数据库相关配置。
#修改内容如下
[root@qianfeng01 soft]# cd ./apache-dolphinscheduler-3.1.4-bin
[root@qianfeng01 apache-dolphinscheduler-3.1.4-bin]# vim ./bin/env/dolphinscheduler_env.sh
# JAVA_HOME, will use it to start DolphinScheduler server
# 改为自己的JDK路径
export JAVA_HOME=${JAVA_HOME:-/usr/local/jdk1.8.0_321}
# Database related configuration, set database type, username and password
# MySQL数据库连接信息
export DATABASE=${DATABASE:-mysql}
export SPRING_PROFILES_ACTIVE=${DATABASE}
export SPRING_DATASOURCE_URL="jdbc:mysql://qianfeng01:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8&useSSL=false&allowPublicKeyRetrieval=true"
export SPRING_DATASOURCE_USERNAME=${SPRING_DATASOURCE_USERNAME:-"dolphinscheduler"}
export SPRING_DATASOURCE_PASSWORD=${SPRING_DATASOURCE_PASSWORD:-"QF-Dolphinscheduler123!"}
# DolphinScheduler server related configuration
# 不用修改
export SPRING_CACHE_TYPE=${SPRING_CACHE_TYPE:-none}
export SPRING_JACKSON_TIME_ZONE=${SPRING_JACKSON_TIME_ZONE:-UTC}
export MASTER_FETCH_COMMAND_NUM=${MASTER_FETCH_COMMAND_NUM:-10}
# Registry center configuration, determines the type and link of the registry center
# zookeeper集群信息
export REGISTRY_TYPE=${REGISTRY_TYPE:-zookeeper}
export REGISTRY_ZOOKEEPER_CONNECT_STRING=${REGISTRY_ZOOKEEPER_CONNECT_STRING:-qianfeng01:2181,qianfeng02:2181,qianfeng03:2181}
# Tasks related configurations, need to change the configuration if you use the related tasks.
# 对已有可以正常配置,没有的保持默认即可
export HADOOP_HOME=${HADOOP_HOME:-/usr/local/hadoop-3.3.1}
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/usr/local/hadoop-3.3.1/etc/hadoop}
export SPARK_HOME1=${SPARK_HOME1:-/usr/local/spark-3.1.2}
export SPARK_HOME2=${SPARK_HOME2:-/usr/local/spark-3.1.2}
export PYTHON_HOME=${PYTHON_HOME:-/usr/bin/python}
export HIVE_HOME=${HIVE_HOME:-/usr/local/hive-3.1.2}
export FLINK_HOME=${FLINK_HOME:-/usr/local/flink-1.14.3}
export DATAX_HOME=${DATAX_HOME:-/usr/local/datax}
export SEATUNNEL_HOME=${SEATUNNEL_HOME:-/opt/soft/seatunnel}
export CHUNJUN_HOME=${CHUNJUN_HOME:-/opt/soft/chunjun}
export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$PATH初始化元数据
# 切换到apache-dolphinscheduler-3.1.4-bin目录下,执行命令
[root@qianfeng01 apache-dolphinscheduler-3.1.4-bin]# sh ./tools/bin/upgrade-schema.sh完成基础环境和元数据库初始化的准备后,需要根据你的机器环境修改配置文件。配置文件可以在目录 bin/env 中找到,他们分别是 install_env.sh 和 dolphinscheduler_env.sh。
install_env.sh 文件文件 install_env.sh 描述了哪些机器将被安装 DolphinScheduler 以及每台机器对应安装哪些服务。您可以在路径 bin/env/install_env.sh 中找到此文件,可通过以下方式更改env变量,export <ENV_NAME>=,配置详情如下。
#修改配置如下
[root@qianfeng01 apache-dolphinscheduler-3.1.4-bin]# vim ./bin/env/install_env.sh
# ---------------------------------------------------------
# INSTALL MACHINE
# ---------------------------------------------------------
# Due to the master, worker, and API server being deployed on a single node, the IP of the server is the machine IP or localhost
ips="qianfeng01,qianfeng02,qianfeng03"
sshPort="22"
# masters可以是一台,也可以是多台,根据自己机器配置决定
masters="qianfeng01,qianfeng02"
workers="qianfeng01:default,qianfeng02:default,qianfeng03:default"
alertServer="qianfeng03"
apiServers="qianfeng01"
# DolphinScheduler installation path, it will auto-create if not exists
# 最终的安装路径
installPath=${installPath:-"/usr/local/dolphinscheduler-3.1.4"}
# Deploy user, use the user you create in section **Configure machine SSH password-free login**
deployUser="dolphinscheduler"
# The root of zookeeper, for now DolphinScheduler default registry server is zookeeper.
zkRoot=${zkRoot:-"/dolphinscheduler"}使用上面创建的部署用户运行以下命令完成部署,部署后的运行日志将存放在 安装目录下的logs 文件夹内
[root@qianfeng01 apache-dolphinscheduler-3.1.4-bin]# sh ./bin/install.sh注意:
第一次部署的话,可能出现 5 次sh: bin/dolphinscheduler-daemon.sh: No such file or directory相关信息,为非重要信息直接忽略即可。
第一次安装后会自动启动所有服务的,如有服务问题或者后续需要启停,命令如下。下面的操作脚本都在dolphinScheduler安装目录bin下。
# 一键停止集群所有服务
sh /usr/local/dolphinscheduler-3.1.4/bin/stop-all.sh
# 一键开启集群所有服务
sh /usr/local/dolphinscheduler-3.1.4/bin/start-all.sh
# 启停 Master
sh /usr/local/dolphinscheduler-3.1.4/bin/dolphinscheduler-daemon.sh stop master-server
sh /usr/local/dolphinscheduler-3.1.4/bin/dolphinscheduler-daemon.sh start master-server
# 启停 Worker
sh /usr/local/dolphinscheduler-3.1.4/bin/dolphinscheduler-daemon.sh start worker-server
sh /usr/local/dolphinscheduler-3.1.4/bin/dolphinscheduler-daemon.sh stop worker-server
# 启停 Api
sh /usr/local/dolphinscheduler-3.1.4/bin/dolphinscheduler-daemon.sh start api-server
sh /usr/local/dolphinscheduler-3.1.4/bin/dolphinscheduler-daemon.sh stop api-server
# 启停 Logger
sh /usr/local/dolphinscheduler-3.1.4/bin/dolphinscheduler-daemon.sh start logger-server
sh /usr/local/dolphinscheduler-3.1.4/bin/dolphinscheduler-daemon.sh stop logger-server
# 启停 Alert
sh /usr/local/dolphinscheduler-3.1.4/bin/dolphinscheduler-daemon.sh start alert-server
sh /usr/local/dolphinscheduler-3.1.4/bin/dolphinscheduler-daemon.sh stop alert-server
# 启停 Python Gateway
sh /usr/local/dolphinscheduler-3.1.4/bin/dolphinscheduler-daemon.sh start python-gateway-server
sh /usr/local/dolphinscheduler-3.1.4/bin/dolphinscheduler-daemon.sh stop python-gateway-server注意:
- 每个服务在路径
<service>/conf/dolphinscheduler_env.sh中都有dolphinscheduler_env.sh文件,这是可以为微 服务需求提供便利。意味着您可以基于不同的环境变量来启动各个服务,只需要在对应服务中配置<service>/conf/dolphinscheduler_env.sh然后通过<service>/bin/start.sh命令启动即可。但是如果您使用命令/bin/dolphinscheduler-daemon.sh start <service>启动服务器,它将会用文件bin/env/dolphinscheduler_env.sh覆盖<service>/conf/dolphinscheduler_env.sh然后启动服务,目的是为了减少用户修改配置的成本.- 服务用途请具体参见《系统架构设计》小节。Python gateway service 默认与 api-server 一起启动,如果您不想启动 Python gateway service 请通过更改 api-server 配置文件
api-server/conf/application.yaml中的python-gateway.enabled : false来禁用它。
浏览器访问地址 http://qianfeng01:12345/dolphinscheduler/ui 即可登录系统UI。默认的用户名和密码是 admin/dolphinscheduler123

登录如下图所示:

去监控中心查看Masters,如下图所示:

去监控中心查看workers,如下图所示:

到此为止,DolphinScheduler集群安装部署就完成了。

点击顶部导航项目管理--->项目创建--->填写如下信息


在项目列表中--->点击项目名称链接--->进入项目首页--->如下图:

项目首页包含该项目的任务状态统计、流程状态统计、工作流定义统计。这几个指标的说明如下:
租户就是操作系统的实际用户,ds需要租户来真正执行。

租户创建完成。
创建工作流按钮,进入工作流DAG编辑页面,如下图所示:
到画板中,新增一个Shell任务,如下图所示:
添加 Shell 任务的参数设置:
节点名称,描述,脚本字段;运行标志勾选正常,若勾选禁止执行,运行工作流不会执行该任务;任务优先级:当 worker 线程数不足时,级别高的任务在执行队列中会优先执行,相同优先级的任务按照先进先出的顺序执行;超时时长,当任务执行时间超过超时时长,会发送告警邮件并且任务超时失败;test.sh,脚本中调用资源命令为 sh test.sh。==注意==调用需要使用资源的全路径;确定按钮,保存任务设置。
==注:==上图中Node_B和Node_C是根据Node_A创建方式创建即可,后续的不同类型的工作流的创建也不在一步一步创建。
保存按钮,弹出基本信息弹框,如下图所示,输入工作流定义名称,工作流定义描述,设置全局变量(选填,参考全局参数),点击确定按钮,工作流定义创建成功。
其他类型任务,请参考 任务节点类型和参数设置。
执行策略
并行:如果对于同一个工作流定义,同时有多个工作流实例,则并行执行工作流实例。串行等待:如果对于同一个工作流定义,同时有多个工作流实例,则并行执行工作流实例。串行抛弃:如果对于同一个工作流定义,同时有多个工作流实例,则抛弃后生成的工作流实例并杀掉正在跑的实例。串行优先:如果对于同一个工作流定义,同时有多个工作流实例,则按照优先级串行执行工作流实例。虚线,在执行工作流实例的时候会跳过实时任务的执行。
删除图标,删除任务间的依赖关系。
工作流定义列表的操作功能如下:
下线的工作流定义。工作流DAG编辑同创建工作流定义。下线时,上线工作流,只有上线状态的工作流能运行,但不能编辑。上线时,下线工作流,下线状态的工作流可以编辑,但不能运行。
上线按钮,上线工作流。
运行按钮,弹出启动参数设置弹框,如下图所示,设置启动参数,点击弹框中的运行按钮,工作流开始运行,工作流实例页面生成一条工作流实例。
工作流运行参数说明:
继续表示:某一任务失败后,其他任务节点正常执行;结束表示:终止所有正在执行的任务,并终止整个流程。最高(HIGHEST),高(HIGH),中(MEDIUM),低(LOW),最低(LOWEST)。当 master 线程数不足时,级别高的流程在执行队列中会优先执行,相同优先级的流程按照先进先出的顺序执行。补数:指运行指定日期范围内的工作流定义,根据补数策略生成对应的工作流实例,补数策略包括串行补数、并行补数 2 种模式。
日期可以通过页面选择或者手动输入,日期范围是左关右关区间(startDate <= N <= endDate)
12月 1号到12月20号依次执行,依次在流程实例页面生成两条流程实例。
调度日期手动输入,在弹出的输入框中输入如下即可:
2024-1-09 00:00:00,2022-12-20 00:00:00

注意:
节点执行类型的选择。
定时按钮,弹出定时参数设置弹框,如下图所示:

创建按钮,创建定时成功,此时定时状态为"下线",定时需上线才生效。定时管理按钮,进入定时管理页面,点击上线按钮,定时状态变为上线,如下图所示,工作流定时生效。

工作流定义列表,选中操作中的导出,如下图:

点击导出后,工作流文件将会被下载到电脑本地。
点击项目管理->工作流->工作流定义,进入工作流定义页面,点击导入工作流按钮,导入本地工作流文件,工作流定义列表显示导入的工作流,状态为下线。

到此为止,工作流相关的操作就结束了。
工作流复制

复制完成的结果:

注意
复制不包含上下线的状态。





查看变量,查看工作流实例的全局参数和局部参数;点击左上角启动参数,查看工作流实例的启动参数,如下图所示
点击项目管理->工作流->工作流实例,进入工作流实例页面,如下图所示:

成功/失败/停止 状态的流程。点击编辑按钮或工作流实例名称进入 DAG 编辑页面,编辑后点击保存按钮,弹出保存 DAG 弹框,如下图所示,修改流程定义信息,在弹框中勾选是否更新工作流定义,保存后则将实例修改的信息更新到工作流定义;若不勾选,则不更新工作流定义。
kill worker 进程,再执行 kill -9 操作。批量任务定义允许您在基于任务级别而不是在工作流中操作修改任务。再此之前,我们已经有了工作流级别的任务编辑器,你可以在工作流定义 单击特定的工作流,然后编辑任务的定义。当您想编辑特定的任务定义但不记得它属于哪个工作流时,这是令人沮丧的。所以我们决定在 任务 菜单下添加 任务定义 视图。

在该视图中:
操作 列中的相关按钮来进行编辑、移动、查看版本和删除任务。任务类型 或 工作流程名称 进行查询。项目管理->任务->任务定义->批量任务->创建任务,创建任务试图如下所示:

注意:
上线状态的工作流是不能定义,即你定义也不能保存。
实时任务是在工作流定义中创建Flink_Stream类型的工作流定义,则会在实时任务列表中显示。
修改和执行。



点击顶部导航数据源中心--->创建数据源--->数据源选择MySQL--->填写如下信息--->测试连接--->成功--->确定

注意:
数据源中心支持列出、查看、修改、删除等操作。
点击顶部导航数据源中心--->创建数据源--->数据源选择HIVE/UMPALA--->写如下信息--->测试连接--->成功--->确定

注意:如果您希望在同一个会话中执行多个 HIVE SQL,您可以修改配置文件
common.properties中的配置,设置support.hive.oneSession = true。 这对运行 HIVE SQL 前设置环境变量的场景会很有帮助。参数support.hive.oneSession默认值为false,多条 SQL 将在不同的会话中运行。
Hiveserver2连接错误
通常是在使用hiveserver2或者dolphinscheduler连接hive(hiveserver2服务)的时候会报错如下:
2023-04-09 00:05:47,328 WARN [main] jdbc.HiveConnection (HiveConnection.java:<init>(237)) - Failed to connect to qianfeng01:10000
Error: Could not open client transport with JDBC Uri: jdbc:hive2://qianfeng01:10000: Failed to open new session: java.lang.RuntimeException: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.authorize.AuthorizationException): User: root is not allowed to impersonate root (state=08S01,code=0)解决办法:
<property>
<name>hadoop.proxyuser.root.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.root.groups</name>
<value>*</value>
</property>注意:这里登陆的是root用户。若登录的是hadoop用户,则配置文件中的root需要修改为hadoop
当上述文件配置不正确时会可能会引起beeline连接Hive时报如下异常
User: hadoop is not allowed to impersonate root), serverProtocolVersion:null) (state=08S01,code=0)
先部署安装使用的是hadoop用户,故而需要修改core-site.xml配置文件
点击顶部导航数据源中心--->创建数据源--->数据源选择SPARK--->写如下信息--->测试连接--->成功--->确定

注意:如果开启了kerberos,则需要填写 Principal
资源中心通常用于上传文件、UDF 函数和任务组管理。那资源上传到那儿呢?具体如下:
集群 模式或者 伪集群 模式部署DolphinScheduler,您需要对以下路径的文件进行配置:==api-server/conf/common.properties 和 worker-server/conf/common.properties==;若您以 单机 模式部署DolphinScheduler,您只需要配置 standalone-server/conf/common.properties,具体配置如下:
resource.storage.upload.base.path 改为本地存储路径,请确保部署 DolphinScheduler 的用户拥有读写权限,例如:resource.storage.upload.base.path=/tmp/dolphinscheduler。当路径不存在时会自动创建文件夹resource.storage.type=HDFS 和 resource.hdfs.fs.defaultFS=file:///。注意:如果您不想用默认值作为资源中心的基础路径,请修改
resource.storage.upload.base.path的值。
当需要使用资源中心进行相关文件的创建或者上传操作时,所有的文件和资源都会被存储在分布式文件系统HDFS或者远端的对象存储,如S3上。所以需要进行以下配置:
vim api-server/conf/common.properties和worker-server/conf/common.properties文件均修改如下:
# user data local directory path, please make sure the directory exists and have read write permissions
data.basedir.path=/opt/install/dolphinscheduler-3.1.4/data
# resource view suffixs
#resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js
# resource storage type: HDFS, S3, OSS, NONE
resource.storage.type=HDFS
# resource store on HDFS/S3 path, resource file will store to this base path, self configuration, please make sure the directory exists on hdfs and have read write permissions. "/dolphinscheduler" is recommended
resource.storage.upload.base.path=/dolphinscheduler
# if resource.storage.type=HDFS, the user must have the permission to create directories under the HDFS root path
resource.hdfs.root.user=hdfs
# if resource.storage.type=S3, the value like: s3a://dolphinscheduler; if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir
resource.hdfs.fs.defaultFS=hdfs://qianfeng01:9000
# resourcemanager port, the default value is 8088 if not specified
resource.manager.httpaddress.port=8088
# if resourcemanager HA is enabled, please set the HA IPs; if resourcemanager is single, keep this value empty
yarn.resourcemanager.ha.rm.ids=
# if resourcemanager HA is enabled or not use resourcemanager, please keep the default value; If resourcemanager is single, you only need to replace ds1 to actual resourcemanager hostname
yarn.application.status.address=http://qianfeng01:%s/ws/v1/cluster/apps/%s
# job history status url when application number threshold is reached(default 10000, maybe it was set to 1000)
yarn.job.history.status.address=http://qianfeng01:19888/ws/v1/history/mapreduce/jobs/%s将修改的文件分发到其它服务器
#将./api-server/conf/common.properties分发到02和03服务器
[root@qianfeng01 dolphinscheduler-3.1.4]# scp ./api-server/conf/common.properties qianfeng02:/usr/local/dolphinscheduler-3.1.4/api-server/conf/
[root@qianfeng01 dolphinscheduler-3.1.4]# scp ./api-server/conf/common.properties qianfeng03:/usr/local/dolphinscheduler-3.1.4/api-server/conf/
#将./worker-server/conf/common.properties分发到02和03服务器
[root@qianfeng01 dolphinscheduler-3.1.4]# scp ./worker-server/conf/common.properties qianfeng02:/usr/local/dolphinscheduler-3.1.4/worker-server/conf/
[root@qianfeng01 dolphinscheduler-3.1.4]# scp ./worker-server/conf/common.properties qianfeng03:/usr/local/dolphinscheduler-3.1.4/worker-server/conf/如Hadoop为HA,则需将Hadoop的core-site.xml和hdfs-site.xml拷贝到worker-server/conf 以及 api-server/conf中
#本地拷贝到api-server/conf/
[root@bj-zjk-001 dolphinscheduler-3.1.4]# cp /etc/hadoop/conf/core-site.xml /etc/hadoop/conf/hdfs-site.xml /opt/install/dolphinscheduler-3.1.4/api-server/conf/
#远程分发到02和03服务器的api-server/conf/
[root@bj-zjk-001 dolphinscheduler-3.1.4]# scp /etc/hadoop/conf/core-site.xml /etc/hadoop/conf/hdfs-site.xml bj-zjk-002:/opt/install/dolphinscheduler-3.1.4/api-server/conf/
[root@bj-zjk-001 dolphinscheduler-3.1.4]# scp /etc/hadoop/conf/core-site.xml /etc/hadoop/conf/hdfs-site.xml bj-zjk-003:/opt/install/dolphinscheduler-3.1.4/api-server/conf/
#拷贝到worker-server/conf/,一定要有该步骤,否则,将资源中心的资源应用到工作流就会报空指针错误
[root@bj-zjk-001 dolphinscheduler-3.1.4]# cp /etc/hadoop/conf/core-site.xml /etc/hadoop/conf/hdfs-site.xml /opt/install/dolphinscheduler-3.1.4/worker-server/conf/
#远程分发到02和03服务器的worker-server/conf/
[root@bj-zjk-001 dolphinscheduler-3.1.4]# scp /etc/hadoop/conf/core-site.xml /etc/hadoop/conf/hdfs-site.xml bj-zjk-002:/opt/install/dolphinscheduler-3.1.4/worker-server/conf/
[root@bj-zjk-001 dolphinscheduler-3.1.4]# scp /etc/hadoop/conf/core-site.xml /etc/hadoop/conf/hdfs-site.xml bj-zjk-003:/opt/install/dolphinscheduler-3.1.4/worker-server/conf/创建对应目录
#在HDFS上创建目录
[root@bj-zjk-001 dolphinscheduler-3.1.4]# hdfs dfs -mkdir /dolphinscheduler
#本地每一台服务器创建目录
[root@bj-zjk-001 ~]# mkdir /opt/install/dolphinscheduler-3.1.4/data
[root@bj-zjk-002 ~]# mkdir /opt/install/dolphinscheduler-3.1.4/data
[root@bj-zjk-003 ~]# mkdir /opt/install/dolphinscheduler-3.1.4/data重新启动DolphinScheduler服务
#停止ds服务
[root@bj-zjk-001 dolphinscheduler-3.1.4]# sh /opt/install/dolphinscheduler-3.1.4/bin/stop-all.sh
#启动ds服务
[root@bj-zjk-001 dolphinscheduler-3.1.4]# sh /opt/install/dolphinscheduler-3.1.4/bin/start-all.sh注意:
- 如果只配置了
api-server/conf/common.properties的文件,则只是开启了资源上传的操作,并不能满足正常使用。如果想要在工作流中执行相关文件则需要额外配置worker-server/conf/common.properties。- 如果用到资源上传的功能,那么安装部署中,部署用户需要有这部分的操作权限。
- 如果 Hadoop 集群的 NameNode 配置了 HA 的话,需要开启 HDFS 类型的资源上传,同时需要将 Hadoop 集群下的
core-site.xml和hdfs-site.xml复制到worker-server/conf以及api-server/conf,非 NameNode HA 跳过次步骤。
当在调度过程中需要使用到第三方的 jar 或者用户需要自定义脚本的情况,可以通过在该页面完成相关操作。可创建的文件类型包括:txt/log/sh/conf/py/java 等。并且可以对文件进行编辑、重命名、下载和删除等操作。
注意:
- 当您以
admin身份等入并操作文件时,需要先给admin设置租户
基础操作

文件格式支持以下几种类型:txt、log、sh、conf、cfg、py、java、sql、xml、hql、properties。选择创建文件->如下图所示:


注意:
- 上传、创建、重命名文件时,文件名和源文件名(上传时)均不能带有
.以及/特殊符号。- 上述案例中,关于删除、查看、重命名文件都不再掩饰。
该案例主要通过上传一个简单的 shell 脚本,来演示如何在工作流定义中使用资源中心的文件。像 MR、Spark 等任务需要用到 jar 包或者py文件,也是同理。
在项目管理的工作流定义模块,创建一个新的工作流,使用 Shell 任务。

sh rs_test.shrs_test.sh
注意:
- 脚本中选择资源文件时文件名称需要保持和所选择资源全路径一致: 例如:资源路径为
/resource/hello.sh则脚本中调用需要使用/resource/hello.sh全路径
DolphinScheduler任务插件有一些公共参数,我们将这些公共参数列在文档中供您查阅。每种任务都有如下的所有或者部分默认参数:
| 任务参数 | 描述 |
|---|---|
| 任务名称 | 任务的名称,同一个工作流定义中的节点名称不能重复。 |
| 运行标志 | 标识这个节点是否需要调度执行,如果不需要执行,可以打开禁止执行开关。 |
| 描述 | 当前节点的功能描述。 |
| 任务优先级 | worker线程数不足时,根据优先级从高到低依次执行任务,优先级一样时根据先到先得原则执行。 |
| Worker分组 | 设置分组后,任务会被分配给worker组的机器机执行。若选择Default,则会随机选择一个worker执行。 |
| 任务组名称 | 任务资源组,未配置则不生效。 |
| 组内优先级 | 一个任务组内此任务的优先级。 |
| 环境名称 | 配置任务执行的环境。 |
| 失败重试次数 | 任务失败重新提交的次数,可以在下拉菜单中选择或者手动填充。 |
| 失败重试间隔 | 任务失败重新提交任务的时间间隔,可以在下拉菜单中选择或者手动填充。 |
| CPU 配额 | 为执行的任务分配指定的CPU时间配额,单位为百分比,默认-1代表不限制,例如1个核心的CPU满载是100%,16个核心的是1600%。 task.resource.limit.state |
| 最大内存 | 为执行的任务分配指定的内存大小,超过会触发OOM被Kill同时不会进行自动重试,单位MB,默认-1代表不限制。该功能由 task.resource.limit.state 控制。 |
| 超时告警 | 设置超时告警、超时失败。当任务超过"超时时长"后,会发送告警邮件并且任务执行失败。该功能由 task.resource.limit.state 控制。 |
| 资源 | 任务执行时所需资源文件 |
| 前置任务 | 设置当前任务的前置(上游)任务。 |
| 延时执行时间 | 任务延迟执行的时间,以分为单位 |
Shell 任务类型,用于创建 Shell 类型的任务并执行一系列的 Shell 脚本。worker 执行该任务的时候,会生成一个临时 shell 脚本,并使用与租户同名的 linux 用户执行这个脚本。
到画板中,即可完成创建。该样例模拟了常见的简单任务,这些任务只需要简单的一两行命令就能运行起来。我们以打印一行日志为例,该任务仅会在日志文件中打印一行 "This is a demo of shell task"

该样例模拟了自定义参数任务,为了更方便的复用已有的任务,或者面对动态的需求时,我们会使用变量保证脚本的复用性。本例中,我们先在自定义脚本 中定义了参数 "param_key",并将他的值设置为 "param_val"。接着在"脚本"中声明了 echo 命令,将参数 "param_key" 打印了出来。当我们保存 并运行任务后,在日志中会看到将参数 "param_key" 对应的值 "param_val" 打印出来。

上线任务,运行后如下:

注意事项
Shell 任务类型通过解析任务日志是否包含
application_xxx_xxx的内容来判断是否 Yarn 任务,如果是则会将相应的application_id的状态作为当前 Shell 节点的运行状态判断,此时如果操作停止工作流则会 Kill 相应的application_id如果 Shell 任务中需要使用到用户自定义的脚本,可通过资源中心来上传对应的文件然后在 Shell 任务中引用他们
SQL任务类型,用于连接数据库并执行相应SQL。
需要提前创建,请参考数据源中心)SQL类型:支持查询和非查询两种。
DML select 类型的命令,是有结果集返回的,可以指定邮件通知为表格、附件或表格附件三种模板;非查询:支持DDL全部命令 和DML update、delete、insert三种类型的命令;
1.当数据源选择Hive数据源时,不需要填写此参数。因为Hive数据源本身支持一次执行多段SQL语句;
2.当数据源选择MySQL数据源时,并且要执行多段SQL语句时,需要填写此参数为分号 ;。因为MySQL数据源不支持一次执行多段SQL语句;

MySQL的操作就完成了。
该样例向Hive中创建临时表tmp_user并写入一行数据。选择SQL类型为非查询,在创建临时表之前需要确保该表不存在,所以我们使用自定义参数,在每次运行时获取当天时间作为表名后缀,这样这个任务就可以每天运行。创建的表名格式为:tmp_user_{yyyyMMdd}。


上线运行测试,成功如下图所示。

注意:
- biz_date是自定义参数名称;
- ${system.biz.curdate}是系统内置的当前日期;
- ${biz_date}是获取自定义参数biz_date的值。
- 注意SQL类型的选择,如果是INSERT等操作需要选择非查询类型。
- 为了兼容长会话情况,UDF函数的创建是通过CREATE OR REPLACE语句
查询Hive中结果,登录集群使用hive命令或使用beeline、JDBC、Hue等方式连接apache hive进行查询,查询SQL为select * from tmp_hello_world_{yyyyMMdd},请将{yyyyMMdd}替换为运行当天的日期,查询结果如下图:

同理,去操作MySQL的SQL语句即可。
使用Hive Cli任务插件创建Hive Cli类型的任务执行SQL脚本语句或者SQL任务文件。 执行任务的worker会通过hive -e命令执行hive SQL脚本语句或者通过hive -f命令执行资源中心中的Hive SQL文件。
在DolphinScheduler中,我们有Hive CLI任务插件和使用Hive数据源的SQL插件提供用户在不同场景下使用,您可以根据需要进行选择。
Hive CLI任务插件直接连接HDFS和Hive Metastore来执行hive类型的任务,所以需要能够访问到对应的服务。 执行任务的worker节点需要有相应的Hive jar包以及Hive和HDFS的配置文件。 但是在生产调度中,Hive CLI任务插件能够提供更可靠的稳定性。使用Hive数据源的SQL插件不需要您在worker节点上有相应的Hive jar包以及Hive和HDFS的配置文件,而且支持 Kerberos认证。 但是在生产调度中,若调度压力很大,使用这种方式可能会遇到HiveServer2服务过载失败等问题。Hive小蜜蜂Logo 到画板中,即可完成创建。| 任务参数 | 描述 |
|---|---|
| Hive Cli 任务类型 | Hive Cli任务执行方式,可以选择FROM_SCRIPT或者FROM_FILE。 |
| Hive SQL 脚本 | 手动填入Hive SQL脚本语句。 |
| Hive Cli 选项 | Hive Cli的其他选项,如--verbose。 |
| 资源 | 如果您选择FROM_FILE作为Hive Cli任务类型,您需要在资源中选择Hive SQL文件。 |
因为我们Hive通常不是每一台服务器都部署得有,所以将装有Hive的服务器放到一个worker分组中,在运行工作流的时候,就会将任务跑到指定worker组的服务器上,就不会导致hive命令或者相关依赖找不到的问题。

worker分组就创建成功了。
Hive CLI任务节点执行Hive SQL脚本语句:
上线并运行:

测试结果如下:

Hive CLI任务节点从资源中心的Hive SQL去资源中心创建文件:

创建hive cli的from_file类型的任务:

上线并运行:

运行状态如下图:

具体每个任务的日志,自己看看即可。
Python 任务类型,用于创建 Python 类型的任务并执行一系列的 Python 脚本。worker 执行该任务的时候,会生成一个临时Python脚本, 并使用与租户同名的 Linux 用户执行这个脚本。
| 任务参数 | 描述 |
|---|---|
| 脚本 | 用户开发的PYTHON程序 |
| 自定义参数 | 是PYTHON局部的用户自定义参数,会替换脚本中以${变量}的内容 |
该样例模拟了自定义参数任务,为了更方便的复用已有的任务,或者面对动态的需求时,我们会使用变量保证脚本的复用性。本例中,我们先在自定义脚本 中定义了参数 "param_key",并将他的值设置为 "param_val"。接着在"脚本"中使用了 print 函数,将参数 "param_key" 打印了出来。当我们保存 并运行任务后,在日志中会看到将参数 "param_key" 对应的值 "param_val" 打印出来。
print("this is a deamo of python task")
print("${param_key}")
将上述任务保存,然后上线,执行如下图:

Spark 任务类型用于执行 Spark 应用。对于 Spark 节点,Worker 支持两个不同类型的 Spark 命令提交任务:
spark submit 方式提交任务。spark sql 方式提交任务。默认任务参数一栏。--jar、--files、--archives、--conf 格式。本案例为创建一个视图表 terms 并写入三行数据和一个格式为 parquet 的表 wc 并判断该表是否存在。程序类型为 SQL。将视图表 terms 的数据插入到格式为 parquet 的表 wc。

SQL代码:
create or replace temporary view terms
as
select * from values("spark hadoop hadoop flink flink ds hadoop")
as tab(content);
create table if not exists wc(word string,cnt int)
using parquet options(path="/tmp/wc.parquet");
insert overwrite table wc
select term,count(term) from (
select explode(split(content,"")) as term
from terms
)
group by term
;
select * from wc;运行的时候,确保是否配置了Spark的环境变量;
DataX 任务类型,用于执行 DataX 程序。对于 DataX 节点,worker 会通过执行 ${DATAX_HOME}/bin/datax.py 来解析传入的 json 文件。
| 任务参数 | 描述 |
| -------------- | ------------------------------------------------------------ |
| json | DataX 同步的 json 配置文件 |
| 资源 | 在使用自定义json中如果集群开启了kerberos认证后,datax读取或者写入hdfs、hbase等插件时需要使用相关的keytab,xml文件等,则可使用改选项。资源中心-文件管理上传或创建的文件 |
| 自定义参数 | sql 任务类型,而存储过程是自定义参数顺序的给方法设置值自定义参数类型和数据类型同存储过程任务类型一样。区别在于SQL任务类型自定义参数会替换 sql 语句中 ${变量} |
| 数据源 | 选择抽取数据的数据源 |
| sql 语句 | 目标库抽取数据的 sql 语句,节点执行时自动解析 sql 查询列名,映射为目标表同步列名,源表和目标表列名不一致时,可以通过列别名(as)转换 |
| 目标库 | 选择数据同步的目标库 |
| 目标库前置 | 前置 sql 在 sql 语句之前执行(目标库执行) |
| 目标库后置 | 后置 sql 在 sql 语句之后执行(目标库执行) |
| 限流(字节数) | 限制查询的字节数 |
| 限流(记录数) | 限制查询的记录数 |
该样例演示为从 MySQL数据导入到 HDFS 中。
若生产环境中要是使用到 DataX 任务类型,则需要先配置好所需的环境。配置文件如下:/opt/install/dolphinscheduler-3.1.4/bin/env/dolphinscheduler_env.sh。
vim /opt/install/dolphinscheduler-3.1.4/bin/env/dolphinscheduler_env.sh
export HADOOP_HOME=${HADOOP_HOME:-/usr/local/hadoop-3.3.1}
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/usr/local/hadoop-3.3.1/etc/hadoop}
export SPARK_HOME1=${SPARK_HOME1:-/usr/local/spark-3.1.2}
export SPARK_HOME2=${SPARK_HOME2:-/usr/local/spark-3.1.2}
export PYTHON_HOME=${PYTHON_HOME:-/usr/bin/python}
export HIVE_HOME=${HIVE_HOME:-/usr/local/hive-3.1.2}
export FLINK_HOME=${FLINK_HOME:-/usr/local/flink-1.14.3}
export DATAX_HOME=${DATAX_HOME:-/usr/local/datax}
export SEATUNNEL_HOME=${SEATUNNEL_HOME:-/opt/soft/seatunnel}
export CHUNJUN_HOME=${CHUNJUN_HOME:-/opt/soft/chunjun}
export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$SEATUNNEL_HOME/bin:$CHUNJUN_HOME/bin:$PATH当环境配置完成之后,需要重启 DolphinScheduler。
配置从MySQL中将数据抽取到Hive表(对应HDFS目录)中,具体DataX文件如下:
{
"job": {
"setting": {
"speed": {
"channel": 1
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"connection": [
{
"querySql": ["select * from sexinfo where sid > 0;"],
"jdbcUrl": ["jdbc:mysql://qianfeng01:3306/test"]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://qianfeng01:9000",
"path": "/user/hive/warehouse/sexinfo",
"fileName": "sexdata",
"column": [
{"name": "sid", "type": "int"},
{"name": "stag", "type": "string"}
],
"fileType": "text",
"writeMode": "append",
"fieldDelimiter": ","
}
}
}
]
}
}具体DS中DataX的任务核心配置如下:

注意:
如果我们将
hdfs-site.xml、core-site.xml和hive-site.xml三个配置文件放到hdfswriter-0.0.1-SNAPSHOT.jar中,然后替换服务器该jar包即可,则DataX任务配置中如下这一段json可以去掉:"hadoopConfig":{ "dfs.nameservices": "qianfengns", "dfs.ha.namenodes.qianfeng": "namenode85,namenode111", "dfs.namenode.rpc-address.qianfengns.namenode85": "bj-zjk-001:8020", "dfs.namenode.rpc-address.qianfengns.namenode111": "bj-zjk-002:8020", "dfs.client.failover.proxy.provider.qianfengns": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" }


将DataX任务调度,其运行成功状态如下图所示:


注意:
- DolphinScheudler整合Hadoop、DataX等,服务所占用内存大概需要8G左右,如果单机内存少于8G,估计DataX类型的任务(该任务也耗内存)不能执行。
- 我的qianfeng01虚拟机的内存是12G。可以参考
子流程节点,就是把外部的某个工作流定义当做一个节点去执行。
任务节点到画板中。| 任务参数 | 描述 |
|---|---|
| 子节点 | 是选择子流程的工作流定义,右上角进入该子节点可以跳转到所选子流程的工作流定义 |
该样例模拟了常见的任务类型,这里我们使用子结点任务调用 Shell 打印出 ”hello world“。即将一个 shell 任务当作子结点来执行。
创建一个 shell 任务,用于打印 “hello”。该工作流定义为 test_dag01。

在使用 sub_process 的过程中,需要创建所需的子结点任务,也就是我们第一步所创建的 shell 任务。然后如下图所示,在 ⑤ 的位置选择对应的子结点即可。

创建 sub_process 完成之后,再创建一个对应的 shell 任务,用于打印 “world”,并将二者连接起来。

保存当前工作流,并上线运行当前工作流

查看运行状态:

注意事项
在使用 sub_process 调用子结点任务的时候,需要保证定义的子结点为上线状态,否则 sub_process 的工作流无法正常运行。
Dependent 节点,就是依赖检查节点。比如 A 流程依赖昨天的 B 流程执行成功,依赖节点会去检查 B 流程在昨天是否有执行成功的实例。
Dependent 节点提供了逻辑判断功能,可以按照逻辑来检测所依赖节点的执行情况。
例如,A 流程为周报任务,B、C 流程为天任务,A 任务需要 B、C 任务在当天都执行成功。具体流程:
本步骤不再操作,我这儿创建的shell任务,并且需要提前创建好工作流。

Test_dagC:


注意
点击一下上诉的==且==边上的白色按钮就可以变成==或==关系。

更多的依赖配置可参考官网。
Conditions 是一个条件节点,根据上游任务运行状态,判断应该运行哪个下游任务。截止目前 Conditions 支持多个上游任务,但只支持两个下游任务。当上游任务数超过一个时,可以通过且以及或操作符实现复杂上游依赖。
任务节点到画板中。| 任务参数 | 描述 |
| ------------ | ------------------------------------------------------------ |
| 下游任务选择 | 根据前置任务的状态来跳转到对应的分支:成功分支 - 当上游运行成功时,运行成功选择的分支;失败分支 - 当上游运行失败时,运行失败选择的分支 |
| 上游条件选择 | 可以为 Conditions 任务选择一个或多个上游任务:增加上游依赖 - 通过选择第一个参数选择对应的任务名称,通过第二个参数选择触发的 Conditions 任务的状态;上游任务关系选择 - 当有多个上游任务时,可以通过且以及或操作符实现任务的复杂关系。 |
switch:Condition节点主要依据上游节点的执行状态(成功、失败)执行对应分支。Switch节点主要依据全局变量的值和用户所编写的表达式判断结果执行对应分支
该样例通过使用 Shell 任务来演示 Condition 任务的操作流程。
进入工作流定义页面,然后分别创建如下任务节点:

condition_test任务具体配置如下:

注意
分支流转的选择,必须要先有下游依赖任务节点,并且要和下游连好线之后才能选择分支流。
当完成创建工作流之后,可以上线运行该工作流。在工作流实例页面可以查看到各个任务的执行状态。如下图所示:

注意:
- Conditions 任务==支持多个上游任务,但只支持两个下游任务==。
- Conditions 任务以及包含该任务的工作流不支持复制操作。
- Conditions 的前置任务不能连接其分支节点,会造成逻辑混乱,不符合 DAG 调度。如下图所示的情况是错误的。
Switch 是一个条件判断节点,依据全局变量的值和用户所编写的表达式判断结果执行对应分支。 注意使用 javax.script.ScriptEngine.eval 执行表达式。
点击项目管理 -> 项目名称 -> 工作流定义,点击"创建工作流"按钮,进入 DAG 编辑页面。 拖动工具栏中的
任务节点到画板中即能完成任务创建。
注意
switch 任务创建后,要先配置上下游,才能配置任务分支的参数。
默认参数说明请参考DolphinScheduler任务参数附录。
| 任务参数 | 描述 |
|---|---|
| 条件 | 可以为 switch 任务配置多个条件,当条件满足时,就会执行指定的分支,可以配置多个不同的条件来满足不同的业务,使用字符串判断时需要使用"" |
| 分支流转 | 默认的流转内容,当条件中的内容为全部不符合要求时,则运行分支流转中指定的分支 |
这里使用一个 switch 任务以及三个 shell 任务来演示。
新建 switch 任务,以及下游的三个 shell 任务。shell 任务没有要求。 switch 任务需要和下游任务连线配置关系后,才可以进行下游任务的选择。

配置条件和默认分支,满足条件会走指定分支,都不满足则走默认分支。 图中如果变量的值为 "A" 则执行分支 taskA,如果变量的值为 "B" 则执行分支 taskB ,都不满足则执行 default。

条件使用了全局变量,请参考全局变量。 这里配置全局变量的值为 A。

如果执行正确,那么 taskA 会被正确执行。
上线并执行,查看是否符合预期。可以看到符合预期,执行了指定的下游任务 taskA。

内置参数,故名思义,是DolphinScheduler系统自带的参数,几乎都是时间相关的参数。分为基础内置和衍生内置参数两类。
| 变量名 | 声明方式 | 含义 |
| ------------------ | --------------------- | ------------------------------------------------- |
| system.biz.date | ${system.biz.date} | 日常调度实例定时的定时时间前一天,格式为 yyyyMMdd |
| system.biz.curdate | ${system.biz.curdate} | 日常调度实例定时的定时时间,格式为 yyyyMMdd |
| system.datetime | ${system.datetime} | 日常调度实例定时的定时时间,格式为 yyyyMMddHHmmss |
衍生内置参数
也可以通过以下两种方式:
使用add_months()函数,该函数用于加减月份, 第一个入口参数为[yyyyMMdd],表示返回时间的格式 第二个入口参数为月份偏移量,表示加减多少个月
直接加减数字 在自定义格式后直接“+/-”数字
全局参数是指针对整个工作流的所有任务节点都有效的参数,在工作流定义页面配置。
具体的使用方式可结合实际的生产情况而定,这里演示为使用 Shell 任务打印出前一天的日期。
创建一个 Shell 任务,并在脚本内容中输入 echo ${dt}。此时 dt 则为我们需要声明的全局参数。如下图所示:

全局参数配置方式如下:在工作流定义页面,点击“设置全局”右边的加号,填写对应的变量名称和对应的值,保存即可。
如下图所示:

注意:
- 这里定义的 dt 参数可以被其它任一节点的局部参数引用。
- 传递参数时,使用$[......]符号
- 全局变量的值,出了时间,还可以设置别的类型值。
进入任务实例页面,可以通过查看日志,验证任务的执行结果,判断参数是否有效。

在任务定义页面配置的参数,默认作用域仅限该任务,如果配置了参数传递则可将该参数作用到下游任务中。
本地参数配置方式如下:在任务定义页面,点击“自定义参数”右边的加号,填写对应的变量名称和对应的值,保存即可。
如果要在任务中使用配置参数并在下游任务中使用它们:
setValue 和自定义参数 export 本地参数setValue 和自定义参数 export 本地参数setValue 和 Bash 环境变量参数 export 本地参数任务案例
本样例展示了如何使用本地参数,打印输出当前日期。创建一个 Shell 任务,并编写脚本内容为 echo ${dt}。点击配置栏中的自定义参数,配置如下图所示:

参数说明:
保存工作流并运行,查看 Shell 任务输出日志。

注意:
本地参数可以在当前任务节点的工作流中,设置其为 OUT 则可以传递给下游的工作流使用,可以参考:参数传递
如果你想简单 export 参数然后在下游任务中使用它们,你可以在你的任务中使用 setValue,你可以将参数统一在一个任务中管理。在 Shell 任务中使用语法 echo '${setValue(set_val=123)}'(不要忘记单引号) 并添加新的 OUT 自定义参数来 export 它。

你可以在下游任务中使用语法 echo '${set_val}' 在获取设定的值。
如果你想用自定义参数而不是常量值来实现参数 export,并下游任务中使用它们,你可以在通过 setValue 和 自定义参数实现,当你想改变参数的值时可以直接改变 “自定义参数”模块中的值,这让程序更加容易维护。您可以在 Shell 任务中使用语法 echo '#{setValue(set_val_param=${param})}'(如果你想要将任何 变量赋值给 setValue 请不要忘记使用单引号)并添加新的 IN 自定义参数用于输入变量 val 和 OUT 自定义参数用于 export 参数 set_val_param。

你可以在下游任务中使用语法 echo '${set_val_param}' 在获取设定的值。
如果你想用 bash 变量而不是常量值 export 参数,并在下游任务中使用它们,你可以在通过 setValue 和 Bash 变量实现,它更灵活,例如你动态获取现有的本地 或 HTTP 资源获取设定变量。 您可以使用类似的语法
lines_num=$(wget https://raw.githubusercontent.com/apache/dolphinscheduler/dev/README.md -q -O - | wc -l | xargs)
echo "#{setValue(set_val_var=${lines_num})}"在 Shell 任务中(**如果你想要将任何变量赋值给 `setValue`** 请不要忘记使用双引号)和 `OUT` 自定义参数用于 export 参数 `set_val_var`。

你可以在下游任务中使用语法 `echo '${set_val_var}'` 在获取设定的值。
DolphinScheduler 提供参数间相互引用的能力,包括:本地参数引用全局参数、上下游参数传递。因为有引用的存在,就涉及当参数名相同时,参数的优先级问题。
本地任务引用全局参数的前提是,你已经定义了全局参数,使用方式和本地参数中的使用方式类似,但是参数的值需要配置成全局参数中的 key。
DolphinScheduler 允许在任务间进行参数传递,目前传递方向仅支持上游单向传递给下游。目前支持这个特性的任务类型有:
当定义上游节点时,如果有需要将该节点的结果传递给有依赖关系的下游节点,需要在【当前节点设置】的【自定义参数】设置一个方向是 OUT 的变量。目前我们主要针对 SQL 和 SHELL 节点做了可以向下传递参数的功能。
注意:
若节点之间没有依赖关系,则局部参数无法通过上游传递。
展示了如何使用参数传递的功能,通过 SHELL 任务来创建本地参数并赋值传递给下游,SQL 任务通过获得上游任务的参数完成查询操作。
创建Shell任务,并设置参数
用户需要传递参数,在定义 SHELL 脚本时,需要输出格式为 ${setValue(key=value)} 的语句,key 为对应参数的 prop,value 为该参数的值。

参数说明:
'${setValue(output=1)}' 赋值,并传递给下游参数SHELL 节点定义时当日志检测到 ${setValue(output=1)} 的格式时,会将 1 赋值给 output,下游节点便可以直接使用变量 output 的值。同样,您可以在【工作流实例】页面,找到对应的节点实例,便可以查看该变量的值。

创建SQL并使用参数
完成上述的 SHELL 任务之后,我们可以使用上游所传递的 output 作为 SQL 的查询对象。其中将所查询的 id 重命名为 ID,作为参数输出。

注意:
- 如果 SQL 节点的结果只有一行,一个或多个字段,参数的名字需要和字段名称一致。数据类型可选择为除 LIST 以外的其他类型。变量会选择 SQL 查询结果中的列名中与该变量名称相同的列对应的值。
- 如果 SQL 节点的结果为多行,一个或多个字段,参数的名字需要和字段名称一致。数据类型选择为 LIST。获取到 SQL 查询结果后会将对应列转化为 LIST,并将该结果转化为 JSON 后作为对应变量的值。
DolphinScheduler 中所涉及的参数值的定义可能来自三种类型:
因为参数的值存在多个来源,当参数名相同时,就需要会存在参数优先级的问题。DolphinScheduler 参数的优先级从高到低为:本地参数 > 上游任务传递的参数 > 全局参数
在上游任务传递的参数中,由于上游可能存在多个任务向下游传递参数,当上游传递的参数名称相同时:
下面案例向你展示如何使用任务参数传递的优先级问题
节点 【useParam】可以使用到节点【createParam】中设置的变量。而节点 【useParam】与节点【noUseParam】中并没有依赖关系,所以并不会获取到节点【noUseParam】的变量。下图中只是以 shell 节点作为例子,其他类型节点具有相同的使用规则。

NoUseParam配置:

CreateParam配置:

UseParam配置:

保存工作流:

上线运行结果依次如下:



到此为止,参数优先级案例就完成了,可以适当修改提交上线,观察结果。
告警就是相关任务或者工作流成功、失败等发出告警,具体支持的告警为:
登陆地址:https://mail.163.com/,输入用户名密码,登陆后进入邮箱主界面



点击继续开启,我使用扫描二维码,然后会弹出发送短信(内容和发送号码都已经自动生成好),点击发送短信即可,发完两三秒后,就可以点击下图的2步骤(我已发送):

成功验证,即识别到你发送的短信后,会弹出如下图,包含授权码,相当于密码:

如果需要使用Email进行告警,请在告警实例管理中创建告警实例,并选择Email插件。 下面显示了 Email 配置示例:


到此为止,邮件的告警实例配置完成。


点击群机器人后,如下图:

点击添加机器人:

第一步:添加自定义机器人进群

第二步:配置webhook(每个机器人都有一个webhook)

点击完成,然后弹窗中点击我知道了,到此则设置完成了飞书的群机器人。

到此为止,feishu告警实例已经创建成功。
-自己建立


配置机器人:

复制webhook即可:

创建dingTalk实例:

至于其他的告警实例,大家可以根据自己企业的需要,选择用对应的插件,创建对应的告警实例即可。
告警组可以包含多个告警实例,这样就可以将任务或者工作流运行的信息发送给对应的告警组。




到此为止,告警组都创建成功。
注意:
Default admin warning group为默认的告警组,不能删除。
将以前的任何一个作业提交上线,然后选择刚才配置的告警组即可,具体如下图所示:

告警的核心应用配置是第4、5步骤,告警结果如下:

工作流上线运行并配置钉钉告警组:

查看钉钉群聊和飞书群聊中,是否有任务执行信息:

到此为止,我们的告警组测试就完成了。
服务管理主要是对系统中的各个服务的健康状况和基本信息的监控和显示。
主要是 master 的相关信息。

主要是 worker 的相关信息。相关图片不用再截图。
主要是 DB 的健康状况。

主要是相关命令数指标统计。

审计日志的记录提供了有关谁访问了系统,以及他或她在给定时间段内执行了哪些操作的信息,他对于维护安全都很有用。

队列参数时使用的。YARN调度器的资源队列,队列概念只对跑在YARN上的任务类型有效。此处创建出的队列,可供后续任务进行选择。此处可不创建队列。
注意:
- 在DolphinScheduler中创建队列,并不会影响到YARN调度器的队列配置。
- 该处创建的队列暂时不支持删除操作。
worker.properties 配置文件中参数 worker.tenant.auto.create=true 实现当 Linux 用户不存在时自动创建该用户。worker.tenant.auto.create=true 参数会要求 worker 可以免密运行 sudo 命令。
租户可以进行修改和删除操作。
用户分为管理员用户和普通用户

用户管理可以进行授权、编辑和删除用户等信息。
注意:
- 如果该用户切换了租户,则该用户所在租户下所有资源将复制到切换的新租户下。
- 默认admin就是管理员,不能删除。

选择授权项目:

资源、数据源、UDF 函数和命名空间等授权同项目授权类似。

调用示例:
/**
* test token
*/
public void doPOSTParam()throws Exception{
// create HttpClient
CloseableHttpClient httpclient = HttpClients.createDefault();
// create http post request
HttpPost httpPost = new HttpPost("http://127.0.0.1:12345/escheduler/projects/create");
httpPost.setHeader("token", "5ba791ca49155158e540b01313b181ec");
// set parameters
List<NameValuePair> parameters = new ArrayList<NameValuePair>();
parameters.add(new BasicNameValuePair("projectName", "qzw"));
parameters.add(new BasicNameValuePair("desc", "qzw"));
UrlEncodedFormEntity formEntity = new UrlEncodedFormEntity(parameters);
httpPost.setEntity(formEntity);
CloseableHttpResponse response = null;
try {
// execute
response = httpclient.execute(httpPost);
// response status code 200
if (response.getStatusLine().getStatusCode() == 200) {
String content = EntityUtils.toString(response.getEntity(), "UTF-8");
System.out.println(content);
}
} finally {
if (response != null) {
response.close();
}
httpclient.close();
}
}
注意:
普通用户自己对这个token是没有编辑、更新、删除等权限,只有查看和应用权限。
每个 worker 节点都会归属于自己的 worker 分组,默认分组为 default。
在任务执行时,可以将任务分配给指定 worker 分组,最终由该组中的 worker 节点执行该任务。

直接点击worker分组后的删除按钮即可,如下图所示:

注意
- default是默认分组,包含所有的worker,可以通过配置进行更新。
- default分组不支持删除。
- 关于相同的Worker被多个分组共享时,他们的资源应用情况也会被进行共享。
创建/更新 环境
export HADOOP_HOME=${HADOOP_HOME:-/usr/local/hadoop-3.3.1}
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/usr/local/hadoop-3.3.1/etc/hadoop}
export SPARK_HOME1=${SPARK_HOME1:-/usr/local/spark-3.1.2}
export SPARK_HOME2=${SPARK_HOME2:-/usr/local/spark-3.1.2}
export PYTHON_HOME=${PYTHON_HOME:-/usr/bin/python}
export HIVE_HOME=${HIVE_HOME:-/usr/local/hive-3.1.2}
export FLINK_HOME=${FLINK_HOME:-/usr/local/flink-1.14.3}
export DATAX_HOME=${DATAX_HOME:-/usr/local/datax}
export SEATUNNEL_HOME=${SEATUNNEL_HOME:-/opt/soft/seatunnel}
export CHUNJUN_HOME=${CHUNJUN_HOME:-/opt/soft/chunjun}
export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$SEATUNNEL_HOME/bin:$CHUNJUN_HOME/bin:$PATH创建如下图所示:

在工作流定义中创建任务节点选择 worker 分组和 worker 分组对应的环境,任务执行时 worker 会先执行环境在执行任务。

注意:
当无法在任务定义或工作流运行对话框中使用你想要使用的环境时,请检查您已经选择worker,并且您要使用的环境已经关联到您选择的worker中。
每个工作流可以绑定零到若干个集群用来支持多集群,目前先用于k8s。

创建和授权后,k8s命名空间和工作流会增加关联集群的功能。每一个集群会有独立的工作流和任务实例独立运行。
先把k8s集群连接的配置录入 database 的表 t_ds_k8s给批次使用后续移除,namespace的创建现在通过下拉选择集群。

创建和授权后,在相关k8s任务选择命名空间时下拉可选,如果k8s集群名字是ds_null_k8s是测试模式,不会真正操作集群。
数据质量任务是用于检查数据在集成、处理过程中的数据准确性。本版本的数据质量任务包括单表检查、单表自定义SQL检查、多表准确性以及两表值比对。数据质量任务的运行环境为Spark2.4.0,其他版本尚未进行过验证,用户可自行验证。
TaskParam中 运行任务时,Master会解析TaskParam,封装DataQualityTask所需要的参数下发至Worker。 Worker运行数据质量任务,数据质量任务在运行结束之后将统计结果写入到指定的存储引擎中,当前数据质量任务结果存储在dolphinscheduler的t_ds_dq_execute_result表中 Worker发送任务结果给Master。Master收到Worker的TaskResponse之后会判断任务类型是否为DataQualityTask,如果是的话会根据taskInstanceId从t_ds_dq_execute_result中读取相应的结果,然后根据用户配置好的检查方式,操作符和阈值进行结果判断,如果结果为失败的话,会根据用户配置好的的失败策略进行相应的操作,告警或者中断。编辑vim ./worker-server/conf/common.properties配置文件,修改如下:
data-quality.jar.name=dolphinscheduler-data-quality-3.1.4.jar编辑vim ./master-server/conf/common.properties配置文件,修改如下:
resource.storage.type=HDFS
resource.hdfs.root.user=root
resource.hdfs.fs.defaultFS=hdfs://qianfeng01:9000注意事项
添加配置信息:
<server-name>/conf/common.propertiesdata-quality.jar.name=dolphinscheduler-data-quality-dev-SNAPSHOT.jar- 这里的
data-quality.jar.name请根据实际打包的名称来填写。- 如果单独打包
data-quality的话,记得修改包名和data-quality.jar.name一致。- 如果是老版本升级使用,运行之前需要先执行
SQL更新脚本进行数据库初始化。- 当前
dolphinscheduler-data-quality-dev-SNAPSHOT.jar是瘦包,不包含任何JDBC驱动。 如果有JDBC驱动需要,可以在节点设置选项参数处设置--jars参数, 如:--jars /lib/jars/mysql-connector-java-8.0.16.jar。- 当前只测试了
MySQL、PostgreSQL和HIVE数据源,其他数据源暂时未测试过。Spark需要配置好读取Hive元数据,Spark不是采用JDBC的方式读取Hive。
校验方式:
期望值类型
例子
假设实际值为10,操作符为 >, 期望值为9,那么结果 10 -9 > 0 为真,那就意味列为空的行数据已经超过阈值,任务被判定为失败。
表行数校验的目标是检查表的行数是否达到预期的值,如果行数未达标,则会判断任务为失败
任务参数介绍


--conf spark.yarn.maxAppAttempts=1 \
--jars /usr/local/dolphinscheduler-3.1.4/worker-server/libs/mysql-connector-java-8.0.16.jar上面任务创建好后,保存工作流即可。



空值检查的目标是检查出指定列为空的行数,可将为空的行数与总行数或者指定阈值进行比较,如果大于某个阈值则判定为失败
任务参数介绍


不赘诉


任务参数解释


不赘诉


评论已关闭