兼容性
默认的,Spark 1.0.x版本绑定的是Tachyon 0.4.1版本.如果你的Tachyon版本不同,那需要在这个文件:
spark/core/pom.xml
中指定Tachyon的版本,然后重新编译正确的版本的Spark.
与Tachyon进行数据IO
这一部分的额外前提是Spark(0.6 或者后续版本).另外,假设你在运行Tachyon`site`.`TACHYON_RELEASED_VERSION` 或后续版本的时候通过查阅资料已经搭建好Tachyon和Hadoop 的本地模式或者集群模式.
如果运行的Spark版本低于1.0.0,那麻烦你把如下内容添加到这个文件:
spark/conf/spark-env.sh
中:
export SPARK_CLASSPATH=/pathToTachyon/client/target/tachyon-client-`site`.`TACHYON_RELEASED_VERSION`-jar-with-dependencies.jar:$SPARK_CLASSPATH
如果是跑在 hadoop 1.x 版本的集群上,那么需要创建一个新的文件:
spark/conf/core-site.xml
并把如下内容添加进去:
<configuration>
<property>
<name>fs.tachyon.impl</name>
<value>tachyon.hadoop.TFS</value>
</property>
</configuration>
将文件X写入HDFS,然后执行如下Spark Shell:
$ ./spark-shell
$ val s = sc.textFile("tachyon://localhost:19998/X")
$ s.count()
$ s.saveAsTextFile("tachyon://localhost:19998/Y")
查看 的话会发现会有个输出文件 Y 包含有 输入文件 X 内单词的数量(一个单词一行?).
将文件X写入HDFS,然后执行如下Spark Shell:
如果你通过sbt 或者 其他利用sbt的框架调用 Spark 的job:
val conf = new SparkConf()
val sc = new SparkContext(conf)
sc.hadoopConfiguration.set("fs.tachyon.impl", "tachyon.hadoop.TFS")
如果你的hadoop 集群版本为1.x 并且 以Zookeeper的容错模式 运行Tachyon,需要在之前创建的xml文件: spark/conf/core-site.xml 中添加如下实体(属性):
<property>
<name>fs.tachyon-ft.impl</name>
<value>tachyon.hadoop.TFS</value>
</property>
在这个文件 spark/conf/spark-env.sh 中添加如下内容:
export SPARK_JAVA_OPTS="
-Dtachyon.zookeeper.address=zookeeperHost1:2181,zookeeperHost2:2181
-Dtachyon.usezookeeper=true
$SPARK_JAVA_OPTS
"
将文件 X 写入HDFS. 当运行Spark Shell的时候你可以指定任何的Tachyon Master:
$ ./spark-shell
$ val s = sc.textFile("tachyon-ft://stanbyHost:19998/X")
$ s.count()
$ s.saveAsTextFile("tachyon-ft://activeHost:19998/Y")
将 Spark RDDs 在 Tachyon中持久化
为满足这个特点,你需要运行 (1.0 或后续版本) 以及 Tachyon (0.4.1 或后续版本). 详细优点可查阅 .
Spark 应用程序需要设置两个参数: spark.tachyonStore.url 以及 spark.tachyonStore.baseDir.
spark.tachyonStore.url (默认值为 tachyon://localhost:19998) 是处于TachyonStore的Tachyon 文件系统的URL. spark.tachyonStore.baseDir (默认值为 java.io.tmpdir) 存储RDDs 的Tachyon 文件系统的根目录. 他可以为一个由逗号隔开的list,这个list由Tachyon众多文件夹组成 .
为了将Spark RDDs 在 Tachyon中持久化, 你需要用到这个参数: StorageLevel.OFF_HEAP . 使用Spark Shell的例子如下:
$ ./spark-shell
$ val rdd = sc.textFile(inputPath)
$ rdd.persist(StorageLevel.OFF_HEAP)
当Spark 应用程序在运行的时候,你可以在Tachyon 的Web界面(默认 URI 为)查看 spark.tachyonStore.baseDir . 你会发现有很多文件在那,他们是RDD块.通常来讲,当Spark 应用程序 运行结束的时候这些文件会被清理干净.你也可在Spark 应用中,将Tachyon 作为输入输出源来使用
#这是我第一次翻译,如有不足,请各位大神们支持^ ^
#By Lucosax
Yang