Flink-Zeppelin On FlinkSql
Flink系列文章
-
更多Flink系列文章请点击Flink系列文章
-
更多大数据文章请点击大数据好文推荐
摘要
最近在调研流平台,发现各大公司流平台的Web界面都是自己一个团队开发,相当完备。苦于人力、时间有限,想找现成的能提交FlinkSql的Web代码,没找到合适的开源的。但是想起了之前看过的Zeppelin,现在已经支持Flink 1.0且支持DataStream、Table & SQL 等,遂赶紧尝试。如果能走通,后续计划在原数据这块儿看看有什么好办法管理起来。
1 Zeppelin
1.1 简介
- Apache Zeppelin
- Apache Zeppelin-preview版
- Github Zeppelin
Zeppelin基于界面化的笔记本,可实现数据驱动,使用SQL、Scala(没看错,能指直接写Scala代码!)等进行交互式数据分析。
可视化的笔记本是指:
- 数据摄取
- 数据发现
数据发现,根据阿里云数据治理 数据保护伞 数据发现章节所说,是指通过规则配置,帮助您有效识别组织内的敏感数据,以project等不同维度,为您提供可视化的数据资产展示。 - 数据分析
- 数据可视化、协作
1.2 架构
1.2.1 Zeppelin Server
Zeppelin Server是Zeppelin最主要的服务之一,负责管理Interpreter。可以启动多个Zeppelin Server组成服务状态一致的集群(使用了Raft协议,具体是 Raft算法库Atomix
),共享Notebook、元数据,实现Interpreter负载均衡。
- 集群模式下的ZeppelinServer就是一个Raft节点(Leader/Follower),而Interpreter是Raft客户端
- 集群模式中,每个ZeppelinServer都运行了一个集群管理服务(使用Raft算法库
Atomix
来组件服务状态一致的集群),会复制状态机(ClusterStateMachine
)来在ZeppelinServer集群上维护一致性的集群元数据(包括集群服务和进程状态)。 - 集群模式下,每个ZeppelinServer和Interpreter运行了集群管理客户端,使用Netty连接Raft Cluster Server来维护存储在ClusterStateMachine中的数据,进程停止后元数据信息将被清除。
- 集群模式下每个ZeppelinServer和Interpreter进程中运行了监控模块,周期性发送心跳给Raft Leader。
-
普通ZeppelinServer发送了本节点的CPU和内存资源平均使用情况,用以在创建Interpreter时进行分配。
-
Raft Leader ZeppelinServer会监控所有类型节点上报的心跳数据,发现超时就认为不可用并剔除。
-
- 集群模式下notebook和interpreter改动会自动同步到集群中所有节点
- 因为使用Raft协议,选举中必须过半投票选一个节点才能选出Leader,所以应该部署奇数个节点,比如2N + 1个,此时可容忍其中的 N 个节点挂掉
可通过Nginx等技术进行前端代理。多个用户访问域名时,Nginx根据分布式策略来将用户分配到不同的可用的Zeppelin Server,如上图User1/2。
1.2.2 Interpreter架构
每个Interpreter进程都是一个JVM进程,通过thrift和Zeppelin Server交互。单节点模式时在ZeppelinServer本地创建,集群模式时先从集群元数据中查找是否已经存在所需Interpreter如果有就直接通过元数据中的该Interpreter进程的Thrift IP和端口来将note和该进程建立绑定关系;如果不存在就通过元数据找出最空闲的ZeppelinServer节点,并通过Thrift来告知远程ZeppelinServer创建Interpreter进程。
用户使用的notebook运行在Interpreter上,底层就能跑各种引擎、语言。
Interpreter 进程启动后,将会在 Zeppelin Cluster MetaData 中提交自身的元数据信息,关闭时清除。如果没有正常退出导致元数据没有先清理,ZeppelinServer会主动周期性检查该元数据对应的Interpreter的心跳时间戳来确定对应的进程是否存活。
Zeppelin Server和Interpreter进程的通信手段是Netty。
-
InterpreterGroup
可包含多个Interpreter,比如SparkInterpreter Group 包含了 SparkSqlInterpreter、SparkInterpreter、PySparkInterpreter 等Intepreter。当用户使用 SparkInterpreter Group 创建一个 notebook 时,ZeppelinServer 会创建一个独立的 JVM 进程,进程中的 SparkSqlInterpreter 、 SparkInterpreter 、PySparkInterpreter可以共用一个 SparkContext,因为他们属于同一个InterpreterGroup。
InterpreterGroup和Interpreter关系具体还要取决于Interpreter Binding Mode。
1.2.3 Cluster MetaData
集群中元数据信息,KV键值对格式,包括ZeppelinServer和Interpreter进程元数据,通过State Machine 维护服务状态一致性。
ZeppelinServer和Interpreter都会周期性发送心跳来更新Cluster Metadata中自己的信息,而担任Leader的那个ZeppelinServer会定时检查Cluster Metadata中的时间戳信息,如果有超时的就会清理超时的服务和进程。
- ZeppelinServer元数据
- Interpreter元数据
1.2.3 Notebook
用户工作的平台,包含若干Paragraph。
默认为local即存在本地,集群模式时应该选择所有集群中的Zeppelin节点都能访问的位置,比如HDFS。
集群模式下,会将修改自动同步到所有ZeppelinServer节点。
1.2.4 Paragraph
一个Notebook包含若干Paragraph,可以共享数据。
比如一个FlinkSQL程序,可以在定义3个Paragraph:
- source table ddl
- sink table ddl
- insert into sink select * from source
1.3 Interpreter
1.3.1 概念
1.3.1.1 概述
Zeppelin interpreter是个重要的组件,可将任何语言和数据处理后端以插件化的方式接入Zeppelin,目前支持的技术栈如下:
如果没有你需要的,那还可以自定义一个:
- how to create a new interpreter
- Installing Interpreters
通过Interpreters,我们可以很方便的使用各种语言和数据处理后端,比如可以直接用%flink
来直接在Zeepelin中写scala代码。
1.3.1.2 Interpreter Binding Mode
用来控制Notebook和用户的隔离模式:
- Globally-shared
所有使用该interpreter的notebook/用户共享一个interpreter JVM进程和session,比如用flink on yarn
那就是每次提交的任务都是提交到一个Flink集群执行。此时Note之间可互相访问创建的变量。生产环境不推荐使用。
- Per Note-scoped
每个Note都会创建一个新的interpreter实例且拥有自己的Session,但是在同一个interpreter JVM进程中。此时仍可通过ResourcePool来跨NoteBook交换对象。
- Per Note-isolated
每个Note都会创建一个新的interpreter进程,也拥有独享的Session。此时仍可通过ResourcePool来跨NoteBook交换对象。
1.3.1.3 Interpreter的生命周期管理
Zeppelin 0.8.0以后提供了LifecycleManager
接口来控制interpreter生命周期,0.9.0有两个实现:
- TimeoutLifecycleManager(默认,可通过
zeppelin.interpreter.lifecyclemanager.class
切换)
当interpreter保持空闲一段时间后就会关闭interpreter。默认阈值为1小时,可通过zeppelin.interpreter.lifecyclemanager.timeout.threshold
设置。 - NullLifecycleManager
什么都不做,由用户控制interpreter生命周期
1.3.2 Interpreter架构
点击这里
1.3.3 Interpreter管理
1.3.3.1 创建Interpreter
- interpreter group
创建的时候,最重要的一点是选择interpreter group:
每个interpreter都属于某一个interpreter group
,一个interpreter group
将所有包含的interpreter
运行在一个jvm里,以他为单位进行启动/停止。
1.3.3.2 修改Interpreter
也是跟创建一样点击进入Interpreter界面,搜索后可以修改创建的interpreter。
1.3.3.3 Interpreter配置项目
见interpreter
1.3.3.4 Interpreter全局配置
Interpreter有很多配置,可直接在Zeppelin上Web界面上设置,有两类属性:
- 大写字母代表系统环境变量
比如flink interpreter中设置FLINK_HOME
和HADOOP_CONF_DIR
,则会将配置作为环境变量传递给flink interpreter进程,由flink使用。 - 否则表示普通interpreter属性
关于Context Parameters
- 还可以通过
#{contextParameterName}
来使用解释器上下文中的参数:
如果context参数为null,则将其替换为空字符串。
1.3.3.5 Interpreter细粒度配置(inline configuration)
上面说的都是interpreter下的通用配置,所有使用该interpreter的Notebook都是用该配置。但有些时候我们想每个notebook单独使用某些配置,虽然可创建单独interpreter但很不方便,所以可使用Inline Generic Configuration
。
即在notebook最开始的paragraph里面写:
%flink_chengc.conf
flink.execution.mode yarn
flink.tm.memory 2048
flink.jm.memory 1024
flink.yarn.appName chengc
flink.yarn.queue default
1.3.4 Interpreter on yarn
可通过interpreter 配置zeppelin.interpreter.launcher yarn
来讲interpreter运行在yarn上,然后会和Zeppelin服务端通过Thrift交互。
具体源码在YarnInterpreterLauncher
和YarnRemoteInterpreterProcess
。
可配合flink on yarn使用,则Interpreter在yarn上am 中拉起来后,会去拉起一个flink cluster。相关源码在FlinkScalaInterpreter
、FlinkInterpreter
等。
flink配置文件上传和读取流程如下:
- 先用当前本机环境变量将flink配置文件目录上传
- 再将envs设为ApplicationConstants.Environment.PWD.$(),即yarn上的am container工作目录
- am拉起来后,会去hdfs下载flink配置文件解压到工作目录,并按照 sys.env.getOrElse
去读取配置的工作目录路径下的flink配置文件,这样就能正确读取到我们本机的flink配置文件了
2 安装、配置和部署
2.1 下载
Download Apache Zeppelin
有三种方式:下载完整已编译二进制包、通过网络安装指定interpreter的包或者源码自己编译安装。
图方便就用第一种吧(不过很大,有1.5G)。
2.2 安装
可参考
- Zeppelin Install
必须是JDK 1.8(171)以上
下载完后直接解压zeppelin-0.9.0-preview1-bin-all.tgz
。
tar -zxvf zeppelin-0.9.0-preview1-bin-all.tgz
2.3 配置
2.3.1 常用配置
可参考
- Apache Zeppelin Configuration
可修改两个配置文件,都配置了同key属性时以环境变量文件为准:
- conf/zeppelin-env.sh
一些环境变量 - conf/zeppelin-site.xml
一些java属性
我改了几个属性:
-
zeppelin.server.addr
改为ip,默认127.0.0.1,不然其他机器访问不了 -
zeppelin.server.port
zeepelin启动后的web端口 -
zeppelin.interpreter.lifecyclemanager.class
改为org.apache.zeppelin.interpreter.lifecycle.TimeoutLifecycleManager
,默认是NullLifecycleManager
,不会管interpreter是否空闲。而TimeoutLifecycleManager会在interpreter保持空闲状态超过zeppelin.interpreter.lifecyclemanager.timeout.threshold
毫秒时,干掉interpreter。 -
zeppelin.interpreter.lifecyclemanager.timeout.threshold
改为 3600000 -
zeppelin.interpreter.lifecyclemanager.timeout.checkinterval
改为60000,检测interpreter是否超时的间隔时间 -
zeppelin.recovery.storage.class
指定zeppelin恢复模式(详见https://mp.weixin.qq/s/D02M68HO4Te4cReIoRLMwQ)。改为
org.apache.zeppelin.interpreter.recovery.FileSystemRecoveryStorage
。设定后,关掉Zeppelin主进程不会关掉interpreter进程,重启zeppelin会去重连这些interpreter进程。这个时候如果还想干掉所有interpreter进程,请使用
bin/stop-interpreter.sh
。默认
NullRecoveryStorage
,意味着关掉Zeppelin就关掉了所有运行中的interpreter 进程。 -
zeppelin.recovery.dir
在集群模式下,还应该把此项设为hdfs上路径,如/tmp/zeppelin/recovery
。注意不要加如hdfs://namespace
,否则路径不对!
将Notebook保存在共享存储中 -
zeppelin.notebook.storage
设为org.apache.zeppelin.notebook.repo.FileSystemNotebookRepo
-
zeppelin.notebook.dir
设为hdfs上的目录,如/tmp/zeppelin/notebook
。这里注意不能设为hdfs://namespace/xxx
,否则路径会有问题!
2.3.2 Interpreter配置
2.3.2.1 通用配置
- FLINK_HOME
如/xxx/flink
- flink.execution.mode
默认local
即本地模式,还可用yarn
- HADOOP_CONF_DIR
HADOOP配置文件所在路径,如/xxx/etc/hadoop
2.3.2.2 Flink Interpreter配置
可参考
- Flink interpreter for Apache Zeppelin-Configuration
重要配置如下:
-
flink.interpreter.close.shutdown_cluster
改为false。默认true,即在interpreter关闭时会shutdown应用程序。
注意,在interpreter on yarn+flink on yarn模式下可以将此设置改为true,效果就是只要你需要重启interpreter来改配置就会停止对应的flink app,达成同步。
-
zeppelin.interpreter.close.cancel_job
改为false默认true,即在interpreter关闭时会cancel我们的flink job。
注意,在interpreter on yarn+flink on yarn模式下可以将此设置改为true,效果就是只要你需要重启interpreter来改配置就会停止对应的flink app,达成同步。
如果flink任务依赖一些包,可以有三种方式加入依赖:
-
flink.execution.jars
指定flink job所依赖的普通jar包,所有的jar包都被会load到flink interpreter的classpath,还会被发送到Task Manager。 -
flink.udf.jars
和flink.execution.jars不同的地方在于Zeppelin会自动检测该选项指定的jar包中所包含的UDF class,会把检测到的UDF注册到TableEnvironment中(UDF的名字就是这个class name),以便用户使用。注意:
- 你的UDF Class必须包含一个无参的构造函数。
- 这种方式如果实在全局interpreter配置,则UDF也是全局的
- 还可以在你自己的notebook interpreter inline config里面配置,这种方式就对当前notebook生效
-
flink.execution.packages
类似flink.execution.jars,但不同的是Zeppelin会下载该选项指定的package以及该package的依赖放到flink interpreter的classpath。比如你想使用kafka connector配置如下org.apache.flink:flink-connector-kafka_2.11:1.10.0,org.apache.flink:flink-connector-kafka-base_2.11:1.10.0,org.apache.flink:flink-json:1.10.0
但我实测在测试环境无问题,生产环境因为安全策略不能自动下载,只能去repo1.maven手动下载打好的jar(flink-connector-kafka_2.11-1.10.0.jar、flink-connector-kafka-base_2.11-1.10.0.jar、flink-sql-connector-kafka_2.11-1.10.0.jar、flink-json-1.10.0.jar,如果找不到包还可以放入kafka-clients-2.2.0.jar),并upload到使用的
flink/lib
下,否则会报错如下:WARN [2020-05-28 11:41:34,042] ({SchedulerFactory11} NotebookServer.java[onStatusChange]:1901) - Job paragraph_1590573205576_568821782 is finished, status: ERROR, exception: null, result: %text org.apache.zeppelin.interpreter.InterpreterException: org.apache.zeppelin.interpreter.InterpreterException: java.lang.RuntimeException: [unresolved dependency: org.apache.flink#flink-connector-kafka_2.11;1.10.0: not found, unresolved dependency: org.apache.flink#flink-connector-kafka-base_2.11;1.10.0: not found, unresolved dependency: org.apache.flink#flink-json;1.10.0: not found] at org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:76) at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:668) at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:577) at org.apache.zeppelin.scheduler.Job.run(Job.java:172) at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:130) at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:39) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.zeppelin.interpreter.InterpreterException: java.lang.RuntimeException: [unresolved dependency: org.apache.flink#flink-connector-kafka_2.11;1.10.0: not found, unresolved dependency: org.apache.flink#flink-connector-kafka-base_2.11;1.10.0: not found, unresolved dependency: org.apache.flink#flink-json;1.10.0: not found] at org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:76) at org.apache.zeppelin.interpreter.Interpreter.getInterpreterInTheSameSessionByClassName(Interpreter.java:355) at org.apache.zeppelin.interpreter.Interpreter.getInterpreterInTheSameSessionByClassName(Interpreter.java:366) at org.apache.zeppelin.flink.FlinkSqlInterrpeter.open(FlinkSqlInterrpeter.java:109) at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.open(FlinkStreamSqlInterpreter.java:49) at org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:70) ... 8 more Caused by: java.lang.RuntimeException: [unresolved dependency: org.apache.flink#flink-connector-kafka_2.11;1.10.0: not found, unresolved dependency: org.apache.flink#flink-connector-kafka-base_2.11;1.10.0: not found, unresolved dependency: org.apache.flink#flink-json;1.10.0: not found] at org.apache.zeppelin.flink.util.DependencyUtils$.resolveMavenCoordinates(DependencyUtils.scala:353) at org.apache.zeppelin.flink.util.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:60) at org.apache.zeppelin.flink.FlinkScalaInterpreter.getUserJars(FlinkScalaInterpreter.scala:740) at org.apache.zeppelin.flink.FlinkScalaInterpreter.open(FlinkScalaInterpreter.scala:149) at org.apache.zeppelin.flink.FlinkInterpreter.open(FlinkInterpreter.java:66) at org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:70) ... 13 more
2.3.3 Flink On Yarn + Interpreter On Yarn
2.3.3.1 添加jar包
需要添加一些必要的jar包放在FLINK_HOME//lib
下:
- flink-hadoop-compatibility_{scala_version}-{flink.version}.jar
- flink-shaded-hadoop-2-uber-{hadoop.version}-{flink-shaded.version}.jar
如果要采用Hive来存元数据或访问hive还需要:
- flink-connector-hive_2.11-1.10.0.jar
- hive-exec-2.3.3.jar
2.3.3.2 设置interpreter
2.3.3.2.1 flink on yarn
- flink.execution.mode
yarn
2.3.3.2.2 interpreter on yarn
最新master分支中,还可通过interpreter 配置zeppelin.interpreter.launcher yarn
来讲interpreter运行在yarn上,然后会和Zeppelin服务端通过Thrift交互,并向yarn申请资源来启动flink cluster。详见Flink on Zeppelin (7). Yarn Interpeter 模式。
此模式其他要求:
- 安装Hadoop client (hadoop 2和3都支持),要求能在本机直接运行
hadoop classpath
命令。
zeppelin需要调用此命令将所有hadoop jar放入zeppelin inclasspath。 - 环境变量中配置
USE_HADOOP=true
(也可在zeppelin-env.sh
)、HADOOP_CONF_DIR
interpreter local + flink on yarn:
interpreter on yarn + flink on yarn:
相关interpreter配置如下:
- zeppelin.interpreter.launcher
yarn - zeppelin.interpreter.yarn.resource.memory
默认1GB,单位为MB。指定interpreter进程内存。 - zeppelin.interpreter.yarn.resource.cores
默认1。指定interpreter进程cpu核数。 - zeppelin.interpreter.yarn.queue
默认default,指定interpreter进程提交运行的yarn 队列。 - HADOOP_USER_NAME
指定运行yarn app用户名。会作为环境变量,在Yarn启动container时使用。
这是yarn的配置,可参考identity-on-an-insecure-cluster-hadoop_user_name
2.4 启动
进入安装好的zeppelin目录后,执行
bin/zeppelin-daemon.sh start
随后就可以访问Zeepelin Web界面了:
可以看到,已经有了一些现成的Notebook示例。
2.5 停止
bin/zeppelin-daemon.sh stop
2.6 重启
bin/zeppelin-daemon.sh restart
2.7 集群模式
重要说明:
- 我使用集群模式运行一段时间以后,发现各个节点包括interpreter都报了很多关于Raft协议错误,而且是突然就崩了,导致zeppelin无法正常使用。由于zeppelin关于recover和interpreter on yarn模式已经合并到主分支,而且测试无误,所以我们放弃集群模式。
2.7.1 Zeppelin全局配置
-
必须将Notebook保存在共享存储中
- zeppelin.notebook.storage
设为org.apache.zeppelin.notebook.repo.FileSystemNotebookRepo
- zeppelin.notebook.dir
设为hdfs上的目录,如/tmp/zeppelin/notebook
。这里注意不能设为hdfs://namespace/xxx
,否则路径会有问题!
- zeppelin.notebook.storage
-
zeppelin.recovery.dir
在集群模式下,还应该把此项设为hdfs上路径,如/tmp/zeppelin/recovery
。注意不要加如hdfs://namespace
,否则路径不对! -
zeppelin.cluster.addr
配置集群中的所有ZeppelinServerIP及Raft端口,Raft据此进行Leader选举、元数据维护等。多个地址用逗号分隔。这里需要注意的是,Raft端口号配置不要和
zeppelin.server.port
配置相同,否则会造成冲突!
注意事项:
- 因为使用Raft协议,选举中必须过半投票选一个节点才能选出Leader,所以应该部署奇数个节点,比如2N + 1个,此时可容忍其中的 N 个节点挂掉
2.7.2 Flink Interpreter配置
如果在Zeppelin集群模式下使用FlinkSql on Yarn,需要做如以下配置,否则报错:
- FLINK_CONF_DIR
/FLINK_HOME/conf - FLINK_PLUGINS_DIR
/FLINK_HOME/plugins - FLINK_LIB_DIR
/FLINK_HOME/lib
该错误我已经提交了issue给社区,详情可见:ZEPPELIN-4809
2.8 Hive整合
2.8.1 基本配置
主要是可以让Flink使用Hive Catalog存储Flink SQL 元数据(可参考HiveCatalog,注意这种表只能由Flink读写使用,不要用Hive去读写。可以在Hive命令行中使用DESCRIBE FORMATTED
命令查看表的元数据,如果是is_generic=true
代表是Flink专用表),也可以直接使用Flink读写Hive表数据。
需要将以下包放入$FLINK_HOME/lib
:
- flink-connector-hive_2.11-1.10.0.jar
- hive-exec-2.3.3.jar
然后设置flink interpreter:
- HIVE_CONF_DIR
设为hive-site.xml
所在目录 - zeppelin.flink.enableHive
设为true,启用hive - zeppelin.flink.hive.version
使用的hive 版本号
随后,使用flinksql注册的表会自动保存到hive default库里。
2.9 checkpoint相关
可以使用flink 配置,实现0代码配置checkpoint。
具体请参考flink-checkpoint配置
2.10 权限
zeppelin可采用LDAP做身份认证+shiro做权限控制。
相关内容可参考:
- Apache Shiro Configuration
- Apache Shiro authentication for Apache Zeppelin
- Apache Zeppelin 基于 kerberos 多租户集成
- Zeppelin集成Ldap(FreeIPA)
需要改主机名,会导致hadoop等很多服务不正常 - zeppelin集成openldap,以及admin用户设置
2.11 数据脱敏-Credentials
比如我们有一些ddl中定义了数据库连接信息,这些信息十分敏感不想暴露给其他人,这个时候我们可以用Credentials。
-
Credentials配置
先在interpreter配置injectCredentials true
,也可在notebook界面做配置,比如执行时使用%flink(injectCredential=true)
-
Credentials打开
-
Credentials定义
这里的Entity就相当于是你的Credentials的Key,在访问时使用。需要注意的是,每个人创建的Credentials对其他人都不可见,别人也无法使用。
-
Credentials使用
再次强调,每个人只能使用自己的Credentials。格式为user.EntityName
和password.EntityName
。(因0.9-preview尚不是稳定版本,可能存在一定变化,还可尝试EntityName.user
)。本用户使用Credentials效果:
用户使用其他人的Credentials效果,打出的是原始字符串而不是Credentials:
2.12 Pyflink
2.12.1 概述
-
必须使用版本号3.5-3.7的python
-
pip install apache-flink
如果安装很慢或者超时,可以加参数pip --default-timeout=100000 install -i https://pypi.tuna.tsinghua.edu/simple apache-flink。
如果提示pip需要升级,可执行
pip --default-timeout=100000 install -i https://pypi.tuna.tsinghua.edu/simple --upgrade pip
-
将
flink-python_2.11-1.10.0.jar
从$FLINK_HOME/opt
移动到$FLINK_HOME/lib
:cp opt/flink-python_2.11-1.10.0.jar lib/
-
配置好flink_interpreter的
zeppelin.pyflink.python
为python路径
2.12.2 python UDF
%flink.pyflink
class PythonUpper(ScalarFunction):
def eval(self, s):
return s.upper()
bt_env.register_function("python_upper", udf(PythonUpper(), DataTypes.STRING, DataTypes.STRING()))
3 实用功能
3.1 定时调度
- Running a Notebook on a Given Schedule Automatically
前提是需要将zeppelin的配置zeppelin.notebook.cron.enable
设为true
3.2 数据可视化
3.2.1 概述
已经支持一些基本图标,任意后端输出都可以以图表方式展现!(不用后端人员再去学什么echarts了)
3.2.2 聚合指标运算
直接可以拖拽方式生成。
还想了解更多或二次开发就参考:
- basic display systems
- Angular AP 前端I
- Angular API 后端
3.2.3 动态表格
Dynamic Forms 是Zeppelin的一个高级功能,允许用户在代码中插入UI控件来允许用户定制化你的代码。Jdbc Interpreter支持这一功能,用户可以定制SQL,下面是一个下拉框的例子。
3.2.4 发布Zeppelin笔记本
可以直接将你的Zeppelin笔记本url分享给其他写作者,则大家都可以看到实时更新。
4 Zeppelin高可用
4.1 ZeppelinServer服务不可用
如上图,如果ZeppelinServer1 突然挂掉不可用,如果正确配置了相关配置zeppelin.recovery.storage.class
,则不会影响其上运行的interpreter进程,如果此时这些进程可访问,则其他ZeppelinServer节点可以通过Cluster Metadata读取到这些interpreter进程元数据信息,让用户继续使用这些interpreter进程。
当然Nginx也会发现ZeppelinServer1出现异常,将它视为离线状态。
那么现在本来是用ZeppelinServer1的User1再次启动NoteBook时,Nginx会将请求发送到其他ZeppelinServer节点来使用之前的interpreter进程。
4.2 Zeppelin节点整个不可用
当ZeppelinServer1所在节点整个挂掉时,其他ZeppelinServer会删除无效元数据,并重建interpreter进程。
5 二次开发
5.1 概述
Zeppelin前后端分离架构,可参考:
- Contributing to Documentation
- Contributing to Zeppelin-Web
有很多关于Zeppelin前端开发的详细内容 - Github-Zeppelin Web Application
安装yarn_package和启动zeppelin-web指导 - Contributing to Apache Zeppelin ( Website )
5.2 前端
- 首先要安装npm和nodejs,网上找教程即可。
- 然后参考Github-Zeppelin Web Application安装yarn(打包用的,不是hadoop那个yarn)
- 在本地Zeppelin工程目录下使用常规方式启动Zeppelin-server,默认是8080端扣
- 在本地Zeppelin工程/zeppelin-web 目录下执行
yarn run dev
即可开始调试,会自动连接到8080端口,并启动一个9000端口供访问和调试。现在你在ide里面改动js,便可立即体现在页面上了(当然,需要刷新一次)。 - 如果本地zeppelin-server运行很慢,可以将项目打包后放到运行速度较快的其他机器如测试环境机器,然后启动。本机的zeppelin-web只需修改
base-url.service.js
,假设远程zeppelin-server ip为192.168.1.1,则改动如下:this.getWebsocketUrl = function() { let wsProtocol = location.protocol === 'https:' ? 'wss:' : 'ws:'; return wsProtocol + '//' + "192.168.1.1" + ':' + this.getPort() + skipTrailingSlash(location.pathname) + '/ws'; }; this.getBase = function() { return location.protocol + '//' + "192.168.1.1" + ':' + this.getPort() + location.pathname; };
问题:
- 如果遇到
lint:once
相关语法检测错误,可以把/zeppelin/zeppelin-web/package.json
中的"prebuild": "npm-run-all clean lint:once"
改为prebuild": "npm-run-all clean
,并去掉"lint:once": "eslint src"
。 - 如果遇到一些包卡住半天下不动,那就手动下下来放在那个目录里。
5.3 后端
好文推荐
Apache Zeppelin公众号
钉钉讨论群
官方
- Zeppelin-Jira
- Github-Apache Zeppelin
视频教程
- Flink on Zeppelin: 极致体验(1) 入门 + Batch
- Flink on Zeppelin: 极致体验(2) Streaming + 高级用法
- Flink on Zeppelin 视频教程全集
来自阿里的章剑锋,Zeppelin PMC
综合
- 章剑锋Jeff-Zeppelin专栏
- Flink on Zeppelin (1) - 入门篇
- Flink on Zeppelin (2) - Batch篇
- Flink on Zeppelin (3) - Streaming篇
- Flink on Zeppelin (4) - 高级特性篇
- Flink on Zeppelin (5) - 机器学习篇
- Flink on Zeppelin 极致体验 阿里章剑锋 - 直播回放
- Flink Sql on Zeppelin教程
架构和原理
- Zeppelin 分布式架构设计
- Zeppelin工作机制解析
调研
- Zeppelin调研与数据开发平台
源码
- Zeppelin源码分析
- Apache Zeppelin源码结构分析
- Understanding Zeppelin Interpreters
介绍了除Flink以外的一些Interpreters - Zeppelin求学之路(3)—Zeppelin基本模块介绍和Paragraph源码深入了解以及Note,NoteBook 简介,
- Zeppelin源码阅读之更新notebook的paragraph部分
使用
- Zeppelin: 让大数据插上机器学习的翅膀
网易杭州研究院数据科学中心机器学习开发组负责人 刘勋 - 如何在Apache Zeppelin中玩转Flink
- Hadoop - Zeppelin 使用心得
- Apache Zeppelin主要界面和基本操作
- 可视化分析工具Apache Zeppelin:数据分析从未这样简单
介绍了一些数据源连接配置、可视化插件(地图、热力图等) - Apache Zeppelin 基于 kerberos 多租户集成
- Apache zeppelin binding mode
关于interpreter隔离的讨论。看起来per-note模式中也会发生,不同用户使用不同note对应同一个interpreter,一个用户重启该interpreter会导致全部interpreter重启,对应任务停止!
二次开发
- Zeppelin Notebook Now Has a Stop Button
Notebook界面添加关闭按钮 - Zeppelin在求学之路----在Zeppelin上开发SendMai功
- zeppelin的数据集的优化
参考文档
- Apache Zeppelin
- Flink on Zeppelin (1) - 入门篇
- Flink on Zeppelin (2) - Batch篇
- Flink on Zeppelin (3) - Streaming篇
- Flink on Zeppelin (4). 高级特性篇
- Zeppelin 分布式架构设计
- 如何在Zeppelin里玩转Hive
- Zeppelin 0.8.0 New Features
来自Zeppelin PMC Jeff Zhang - 数据治理 数据保护伞 数据发现
- Zeppelin安全机制之Credentials使用技巧
更多推荐
Flink-Zeppelin On FlinkSql
发布评论