Spark Interpreter for Apache Zeppelin
Overview
Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. Apache Spark is supported in Zeppelin with the Spark interpreter group, which consists of the six interpreters below.
Name | Class | Description |
---|---|---|
%spark | SparkInterpreter | Creates a SparkContext/SparkSession and provides a Scala environment |
%spark.pyspark | PySparkInterpreter | Provides a Python environment |
%spark.ipyspark | IPySparkInterpreter | Provides an IPython environment |
%spark.r | SparkRInterpreter | Provides an R environment with SparkR support |
%spark.sql | SparkSQLInterpreter | Provides a SQL environment |
%spark.kotlin | KotlinSparkInterpreter | Provides a Kotlin environment |
Configuration
The Spark interpreter can be configured with properties provided by Zeppelin. You can also set other Spark properties which are not listed in the table. For a list of additional properties, refer to Spark Available Properties.
Property | Default | Description |
---|---|---|
SPARK_HOME | | Location of the Spark distribution |
spark.master | local[*] | Spark master uri. e.g. spark://masterhost:7077 |
spark.submit.deployMode | | The deploy mode of the Spark driver program, either "client" or "cluster", which means launching the driver program locally ("client") or remotely ("cluster") on one of the nodes inside the cluster. |
spark.app.name | Zeppelin | The name of the Spark application. |
spark.driver.cores | 1 | Number of cores to use for the driver process, only in cluster mode. |
spark.driver.memory | 1g | Amount of memory to use for the driver process, i.e. where SparkContext is initialized, in the same format as JVM memory strings with a size unit suffix ("k", "m", "g" or "t") (e.g. 512m, 2g). |
spark.executor.cores | 1 | The number of cores to use on each executor |
spark.executor.memory | 1g | Executor memory per worker instance. e.g. 512m, 32g |
spark.executor.instances | 2 | The number of executors for static allocation |
spark.files | | Comma-separated list of files to be placed in the working directory of each executor. Globs are allowed. |
spark.jars | | Comma-separated list of jars to include on the driver and executor classpaths. Globs are allowed. |
spark.jars.packages | | Comma-separated list of Maven coordinates of jars to include on the driver and executor classpaths. The coordinates should be groupId:artifactId:version. If spark.jars.ivySettings is given, artifacts will be resolved according to the configuration in the file; otherwise artifacts will be searched for in the local Maven repo, then Maven Central and finally any additional remote repositories given by the command-line option --repositories. |
PYSPARK_PYTHON | python | Python binary executable to use for PySpark in both driver and executors (default is python). The property spark.pyspark.python takes precedence if it is set. |
PYSPARK_DRIVER_PYTHON | python | Python binary executable to use for PySpark in the driver only (default is PYSPARK_PYTHON). The property spark.pyspark.driver.python takes precedence if it is set. |
zeppelin.pyspark.useIPython | false | Whether to use IPython in %spark.pyspark when the IPython prerequisites are met. |
zeppelin.R.cmd | R | R binary executable path. |
zeppelin.spark.concurrentSQL | false | Execute multiple SQL statements concurrently if set to true. |
zeppelin.spark.concurrentSQL.max | 10 | Max number of SQL statements executed concurrently. |
zeppelin.spark.maxResult | 1000 | Max number of rows of a Spark SQL result to display. |
zeppelin.spark.printREPLOutput | true | Print Scala REPL output. |
zeppelin.spark.useHiveContext | true | Use HiveContext instead of SQLContext if set to true, i.e. enable Hive support for SparkSession. |
zeppelin.spark.enableSupportedVersionCheck | true | Do not change - developer only setting, not for production use |
zeppelin.spark.sql.interpolation | false | Enable ZeppelinContext variable interpolation into Spark SQL. |
zeppelin.spark.uiWebUrl | | Overrides the Spark UI default URL. The value should be a full URL (e.g. http://{hostName}/{uniquePath}). In Kubernetes mode, the value can be a Jinja template string with 3 template variables 'PORT', 'SERVICENAME' and 'SERVICEDOMAIN' (e.g. http://{{PORT}}-{{SERVICENAME}}.{{SERVICEDOMAIN}}). |
spark.webui.yarn.useProxy | false | Whether to use the YARN proxy URL as the Spark web UI URL, e.g. http://localhost:8088/proxy/application_1583396598068_0004 |
spark.repl.target | jvm-1.6 | Manually specify the Java version of the Spark interpreter's Scala REPL. Available options: scala-compile 2.10.1 to 2.10.6 supports "jvm-1.5, jvm-1.6, jvm-1.7" (default jvm-1.6); scala-compile 2.10.7 to 2.11.12 supports "jvm-1.5, jvm-1.6, jvm-1.7 and jvm-1.8" (default jvm-1.6); scala-compile 2.12.x only supports jvm-1.8 (default jvm-1.8). |
Without any configuration, the Spark interpreter works out of the box in local mode. But if you want to connect to your Spark cluster, you'll need to follow the two simple steps below.
Export SPARK_HOME
There are several options for setting SPARK_HOME.

- Set SPARK_HOME in zeppelin-env.sh
- Set SPARK_HOME in the Interpreter setting page
- Set SPARK_HOME via inline generic configuration
1. Set SPARK_HOME in zeppelin-env.sh
If you work with only one version of Spark, then you can set SPARK_HOME in zeppelin-env.sh, because any setting in zeppelin-env.sh is applied globally.
e.g.
export SPARK_HOME=/usr/lib/spark
You can optionally set more environment variables in zeppelin-env.sh
# set hadoop conf dir
export HADOOP_CONF_DIR=/usr/lib/hadoop
2. Set SPARK_HOME in the Interpreter setting page
If you want to use multiple versions of Spark, then you need to create multiple Spark interpreters and set SPARK_HOME for each of them. e.g.

- Create a new Spark interpreter spark24 for Spark 2.4 and set SPARK_HOME in the interpreter setting page.
- Create a new Spark interpreter spark16 for Spark 1.6 and set SPARK_HOME in the interpreter setting page.
3. Set SPARK_HOME via inline generic configuration
Besides setting SPARK_HOME in the interpreter setting page, you can also use inline generic configuration to keep the configuration together with the code for more flexibility. e.g.
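For example, the following paragraph sets SPARK_HOME through the generic inline configuration interpreter. This is only a sketch: the path is illustrative, and a %spark.conf paragraph must run before the first Spark code paragraph in the note.

%spark.conf

SPARK_HOME /usr/lib/spark
spark.executor.memory 4g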
Set master in Interpreter menu
After starting Zeppelin, go to Interpreter menu and edit spark.master property in your Spark interpreter setting. The value may vary depending on your Spark cluster deployment type.
For example,
- local[*] in local mode
- spark://master:7077 in standalone cluster
- yarn-client in YARN client mode (not supported in Spark 3.x; see below for how to configure YARN client mode in Spark 3.x)
- yarn-cluster in YARN cluster mode (not supported in Spark 3.x; see below for how to configure YARN cluster mode in Spark 3.x)
- mesos://host:5050 in Mesos cluster
That's it. With this approach, Zeppelin will work with any version of Spark and any deployment type without rebuilding Zeppelin. For further information about Spark & Zeppelin version compatibility, please refer to the "Available Interpreters" section on the Zeppelin download page.
Note that without exporting SPARK_HOME, Zeppelin runs in local mode with the included version of Spark. The included version may vary depending on the build profile.

YARN client mode and local mode run the driver on the same machine as the Zeppelin server, which is dangerous for production because Zeppelin may run out of memory when many Spark interpreters are running at the same time. So we suggest you allow only yarn-cluster mode by setting zeppelin.spark.only_yarn_cluster in zeppelin-site.xml.
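A sketch of the corresponding entry in conf/zeppelin-site.xml, assuming the property takes a boolean value:

<property>
  <name>zeppelin.spark.only_yarn_cluster</name>
  <value>true</value>
</property>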
Configure yarn mode for Spark 3.x
Specifying yarn-client or yarn-cluster in spark.master is no longer supported in Spark 3.x; instead you need to use spark.master and spark.submit.deployMode together.
Mode | spark.master | spark.submit.deployMode |
---|---|---|
Yarn Client | yarn | client |
Yarn Cluster | yarn | cluster |
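For example, with inline generic configuration you could select YARN cluster mode like this (a sketch; the same two properties can instead be set in the interpreter setting page):

%spark.conf

spark.master yarn
spark.submit.deployMode cluster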
SparkContext, SQLContext, SparkSession, ZeppelinContext
SparkContext, SQLContext, SparkSession (for Spark 2.x) and ZeppelinContext are automatically created and exposed as the variables sc, sqlContext, spark and z, respectively, in the Scala, Kotlin, Python and R environments.

Note that the Scala/Python/R environments share the same SparkContext, SQLContext, SparkSession and ZeppelinContext instances.
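A minimal Scala sketch using the injected variables (no manual creation of a SparkSession is needed):

%spark
// spark, sc and z are injected by Zeppelin; do not create your own SparkSession
val df = spark.range(0, 10).toDF("id")   // use the injected SparkSession
println(sc.applicationId)                // use the injected SparkContext
z.show(df)                               // render the DataFrame via ZeppelinContext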
YARN Mode
Zeppelin supports both YARN client and YARN cluster mode (YARN cluster mode is supported from 0.8.0). For YARN mode, you must specify SPARK_HOME & HADOOP_CONF_DIR.

Usually you have only one Hadoop cluster, so you can set HADOOP_CONF_DIR in zeppelin-env.sh, which is applied to all Spark interpreters. If you want to use Spark against multiple Hadoop clusters, then you need to define HADOOP_CONF_DIR in the interpreter setting or via inline generic configuration.
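For the common single-cluster case, the relevant entries in conf/zeppelin-env.sh might look like this (paths are illustrative):

export SPARK_HOME=/usr/lib/spark
export HADOOP_CONF_DIR=/etc/hadoop/conf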
Dependency Management
For the Spark interpreter, it is not recommended to use Zeppelin's Dependency Management for managing third-party dependencies (%spark.dep is removed from Zeppelin 0.9 as well). Instead you should set the standard Spark properties.
Spark Property | Spark Submit Argument | Description |
---|---|---|
spark.files | --files | Comma-separated list of files to be placed in the working directory of each executor. Globs are allowed. |
spark.jars | --jars | Comma-separated list of jars to include on the driver and executor classpaths. Globs are allowed. |
spark.jars.packages | --packages | Comma-separated list of Maven coordinates of jars to include on the driver and executor classpaths. The coordinates should be groupId:artifactId:version. If spark.jars.ivySettings is given artifacts will be resolved according to the configuration in the file, otherwise artifacts will be searched for in the local maven repo, then maven central and finally any additional remote repositories given by the command-line option --repositories. |
You can either set Spark properties in interpreter setting page or set Spark submit arguments in zeppelin-env.sh
via environment variable SPARK_SUBMIT_OPTIONS
.
For example:
export SPARK_SUBMIT_OPTIONS="--files <my_file> --jars <my_jar> --packages <my_package>"
However, it is not recommended to set them in SPARK_SUBMIT_OPTIONS, because it is shared by all Spark interpreters, which means you cannot set different dependencies for different users.
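If you need per-note dependencies, one option is to set the Spark properties through inline generic configuration instead. A sketch (the Maven coordinate is just an example):

%spark.conf

spark.jars.packages org.apache.commons:commons-csv:1.8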
PySpark
There are two ways to use PySpark in Zeppelin:
- Vanilla PySpark
- IPySpark
Vanilla PySpark (Not Recommended)
The vanilla PySpark interpreter is almost the same as the vanilla Python interpreter, except that Zeppelin injects SparkContext, SQLContext and SparkSession via the variables sc, sqlContext and spark.

By default, Zeppelin uses IPython in %spark.pyspark when IPython is available; otherwise it falls back to the original PySpark implementation. If you don't want to use IPython, then you can set zeppelin.pyspark.useIPython to false in the interpreter setting. For the IPython features, you can refer to the doc Python Interpreter.
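A minimal PySpark sketch using the injected variables:

%spark.pyspark
# sc, sqlContext, spark and z are injected by Zeppelin
df = spark.createDataFrame([("jeff", 23), ("andy", 20)], ["name", "age"])
df.show()     # print via Spark
z.show(df)    # render as a Zeppelin table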
IPySpark (Recommended)
You can use IPySpark explicitly via %spark.ipyspark. The IPySpark interpreter is almost the same as the IPython interpreter, except that Zeppelin injects SparkContext, SQLContext and SparkSession via the variables sc, sqlContext and spark.

For the IPython features, you can refer to the doc Python Interpreter.
SparkR
Zeppelin supports SparkR via %spark.r. Here is the configuration for the SparkR interpreter.
Spark Property | Default | Description |
---|---|---|
zeppelin.R.cmd | R | R binary executable path. |
zeppelin.R.knitr | true | Whether to use knitr or not. (It is recommended to install knitr and use it in Zeppelin.) |
zeppelin.R.image.width | 100% | R plotting image width. |
zeppelin.R.render.options | out.format = 'html', comment = NA, echo = FALSE, results = 'asis', message = F, warning = F, fig.retina = 2 | R plotting options. |
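A minimal SparkR sketch (the SparkR session is already created by Zeppelin; faithful is a built-in R dataset):

%spark.r
# create a SparkDataFrame from the built-in faithful dataset
df <- as.DataFrame(faithful)
head(df)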
SparkSql
The Spark SQL interpreter shares the same SparkContext/SparkSession with the other Spark interpreters. That means any table registered in Scala, Python or R code can be accessed by Spark SQL. For example:
%spark
case class People(name: String, age: Int)
val df = spark.createDataFrame(List(People("jeff", 23), People("andy", 20)))
df.createOrReplaceTempView("people")
%spark.sql
select * from people
By default, each SQL statement runs sequentially in %spark.sql. But you can run them concurrently with the following setup.
- Set zeppelin.spark.concurrentSQL to true to enable the concurrent SQL feature; underneath, Zeppelin will switch Spark to the fair scheduler. Also set zeppelin.spark.concurrentSQL.max to control the max number of SQL statements running concurrently.
- Configure pools by creating fairscheduler.xml under your SPARK_CONF_DIR; check the official Spark doc Configuring Pool Properties. A sketch of such a file is shown below.
- Set the pool property via a paragraph property, e.g.

%spark(pool=pool1)
sql statement
This pool feature is also available for all versions of Scala Spark and PySpark. For SparkR, it is only available from Spark 2.3.0.
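A sketch of a fairscheduler.xml defining the pool1 pool referenced above (the pool name and values are illustrative):

<?xml version="1.0"?>
<allocations>
  <pool name="pool1">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
</allocations>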
Interpreter Setting Option
You can choose one of the shared, scoped and isolated options when you configure the Spark interpreter.
e.g.
- In scoped per-user mode, Zeppelin creates a separate Scala compiler for each user but shares a single SparkContext.
- In isolated per-user mode, Zeppelin creates a separate SparkContext for each user.
ZeppelinContext
Zeppelin automatically injects ZeppelinContext
as variable z
in your Scala/Python environment. ZeppelinContext
provides some additional functions and utilities.
See Zeppelin-Context for more details.
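A small Scala sketch of ZeppelinContext in use, combining a dynamic input form with table output (it assumes the people view registered earlier on this page):

%spark
// z.input creates a dynamic text-input form; z.show renders a DataFrame as a table
val maxAge = z.input("maxAge", "25").toString.toInt
z.show(spark.sql(s"select * from people where age < $maxAge"))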
Setting up Zeppelin with Kerberos
Logical setup with Zeppelin, Kerberos Key Distribution Center (KDC), and Spark on YARN:
There are several ways to make Spark work with a Kerberos-enabled Hadoop cluster in Zeppelin.
- Share one single Hadoop cluster. In this case you just need to specify zeppelin.server.kerberos.keytab and zeppelin.server.kerberos.principal in zeppelin-site.xml; the Spark interpreter will use these settings by default.
- Work with multiple Hadoop clusters. In this case you can specify spark.yarn.keytab and spark.yarn.principal to override zeppelin.server.kerberos.keytab and zeppelin.server.kerberos.principal.
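For the single-cluster case, the zeppelin-site.xml entries might look like this (the principal and keytab path are illustrative):

<property>
  <name>zeppelin.server.kerberos.principal</name>
  <value>zeppelin@EXAMPLE.COM</value>
</property>
<property>
  <name>zeppelin.server.kerberos.keytab</name>
  <value>/etc/security/keytabs/zeppelin.keytab</value>
</property>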
User Impersonation
In YARN mode, the user who launches the Zeppelin server is used to launch the Spark YARN application. This is not good practice. Most of the time, you will enable Shiro in Zeppelin and would like to use the login user to submit the Spark YARN app. For this purpose, you need to enable user impersonation for more security control. To enable user impersonation, follow the steps below.
Step 1. Enable user impersonation in Hadoop's core-site.xml. E.g. if you are using user zeppelin to launch Zeppelin, then add the following to core-site.xml and restart both HDFS and YARN.
<property>
<name>hadoop.proxyuser.zeppelin.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.zeppelin.hosts</name>
<value>*</value>
</property>
Step 2. Enable user impersonation in the Spark interpreter's setting. (Enable Shiro first, of course.)
Step 3 (Optional). If you are using a Kerberos cluster, then you need to set zeppelin.server.kerberos.keytab and zeppelin.server.kerberos.principal in zeppelin-site.xml to the user (i.e. the user in Step 1) you want to impersonate.
Deprecate Spark 2.2 and earlier versions
Starting from 0.9, Zeppelin deprecates Spark 2.2 and earlier versions, so you will see a warning message when you use them.
You can get rid of this message by setting zeppelin.spark.deprecatedMsg.show
to false
.
Configuration Setup
On the server where Zeppelin is installed, install the Kerberos client modules and configuration (krb5.conf). This is to make the server communicate with the KDC.
Add the two properties below to the Spark configuration ([SPARK_HOME]/conf/spark-defaults.conf):

spark.yarn.principal
spark.yarn.keytab
NOTE: If you do not have permission to access the above spark-defaults.conf file, you can optionally add the above properties to the Spark interpreter setting through the Interpreter tab in the Zeppelin UI.
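For example, the spark-defaults.conf entries might look like this (the principal and keytab path are illustrative):

spark.yarn.principal    zeppelin@EXAMPLE.COM
spark.yarn.keytab       /etc/security/keytabs/zeppelin.keytab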
That's it. Play with Zeppelin!