站浏览量 站访问人数
目录
  1. 1. hdfs简介
  2. 2. Flink操作hdfs
  3. 3. spark下操作hdfs文件

讲述Flink读取、写入hdfs文件。

hdfs简介

hdfs(Hadoop Distributed File System),Hadoop的分布式文件系统,由google开源实现的。存储文件大,容错性能高,是当前集群环境下文件存储的必备系统,许多计算框架都用此。
具体可参考google的开源论文,详解其中架构。

Flink操作hdfs

Flink是分布式计算框架,可以从hdfs操作文件。Flink对hdfs的操作基于hadoop下的hdfs包。Flink读取hdfs文件与读取本地文件没有差别,均可采用同一种方法,只是在执行时,注意hdfs的文件地址与本地的不同。

1
2
3
4
5
6
7
8
9
10
val env = ExecutionEnvironment.getExecutionEnvironment
val file = args(0)
#本地,直接传入本地文件地址
#如果是hdfs,即打包到集群上执行,文件地址需填写绝对路径,
#hdfs的绝对路径填写: hdfs:///***/**/****
#而spark下的绝对路径是,/**/***/
val data = env.readTextFile(file).filter(!_.startsWith("#")).map { x =>
...
}.name("origin data")
env.execute("read from hdfs.")

Flink写入数据进hdfs如下:

1
word2vecSim.map{x=> x._1+"="+x._2.mkString(">")}.writeAsText(args(1))

一般Flink写入hdfs,采取的是DataSet[String],以便后续操作,即可以写入hive表里。这里需注意,Flink在写时可以采用覆盖操作,WriteMode.OVERWRITE

1
word2vecSim.map{x=> x._1+"="+x._2.mkString(">")}.writeAsText(args(1),WriteMode.OVERWRITE)

spark下操作hdfs文件

在Spark下,可以对hdfs文件进行删除操作,但是在Flink下,实践多次未成功,或许是运行的流程不同导致的。在spark下,可以用以下操作来对存在的hdfs文件进行删除操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import org.apache.spark.SparkContext
import org.apache.hadoop.fs.Path

def deleteHdfsFile(sc: SparkContext,files:List[String]):Unit={
val hadoopConf = sc.hadoopConfiguration
val hdfs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
files.foreach { file => /*删除保存的文件*/
val path1 = new Path(file)
if(hdfs.exists(path1)){
hdfs.delete(path1,true)
}
}
}

def deleteHdfsFile(sc: SparkContext,file:String):Unit={
val hadoopConf = sc.hadoopConfiguration
val hdfs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
val path1 = new Path(file)
if(hdfs.exists(path1)){
hdfs.delete(path1,true)
}
}