028-86922220

建站动态

根据您的个性需求进行定制 先人一步 抢占小程序红利时代

8.sparkcore之读写数据

  spark支持多种数据源,从总体来分分为两大部分:文件系统和数据库。

创新互联公司专注于企业营销型网站建设、网站重做改版、安福网站定制设计、自适应品牌网站建设、H5建站商城系统网站开发、集团公司官网建设、外贸网站制作、高端网站制作、响应式网页设计等建站业务,价格优惠性价比高,为安福等各大城市提供网站开发制作服务。

文件系统

  文件系统主要有本地文件系统、Amazon S3、HDFS等。

  文件系统中存储的文件有多种存储格式。spark支持的一些常见格式有:

格式名称结构化说明
文件文件 普通文件文件,每行一条记录
JSON 半结构化 常见的基于文本的半结构化数据
CSV 常见的基于文本的格式,在电子表格应用中使用
SequenceFiles 一种用于键值对数据的常见Hadoop文件格式

文本文件

  输出文本文件时,可使用saveAsTextFile()方法接收一个目录,将RDD中的内容输出到目录中的多个文件中。

```
result.saveAsTextFile(outputFile)
```

JSON

CSV与TSV

  CSV与TSV文件每行都有固定的字段,字段之间使用分隔符(CSV使用逗号;tsv使用制表符)分隔。

SequenceFile

  SequenceFile是键值对形式的常用Hadoop数据格式。由于Hadoop使用一套自定义的序列化框架,因此SequenceFile的键值对类型需实现Hadoop的Writable接口。

数据库

  数据库主要分为关系型数据库(MySQL、PostgreSQL等)和非关系型数据库(HBase、ElasticSearch等)。

JDBC数据库连接

  spark使用JDBC访问关系型数据库(MySQL、PostgreSQL等),只需要构建一个org.apache.spark.rdd.JdbcRDD即可。

def createConnection() = {
    Class.forName("com.mysql.jdbc.Driver").newInstance()
    DriverManager.getConnection("jdbc:mysql://localhost/test", "root", "root")
}

def extractValues(r: ResultSet) = {
    (r.getInt(1), r.getString(2))
}

val data = new JdbcRDD(sc, createConnection, 
                "SELECT * FROM panda WHERE id >= ? AND id <= ?"),
                lowerBound = 1, upperBound = 3, 
                numPartitions = 2, mapRow = extractValues)
println(data.collect().toList)

HBase

  spark通过Hadoop输入格式(org.apache.hadoop.hbase.mapreduce.TableInputFormat)访问HBase。这种格式返回键值对数据,键类型为org.apache.hadoop.hbase.io.ImmutableBytesWritable,值类型为org.apache.hadoop.hbase.client.Result。

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "tablename")

val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], ClassOf[Result])

ElasticSearch

  spark使用ElasticSearch-Hadoop连接器从ElasticSearch中读写数据。ElasticSearch连接器依赖于SparkContext设置的配置项。ElasticSearch连接器也没有用到Spark封装的类型,而使用saveAsHadoopDataSet。

def mapWritableToInput(in: MapWritable): Map[String, String] = {
    in.map{case (k, v) => (k.toString, v.toString)}.toMap
}

val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set(ConfigurationOptions.ES_RESOURCE_READ, args[1])
jobConf.set(ConfigurationOptions.ES_NODES, args[2])

val currentTweets = sc.hadoopRDD(jobConf, classOf[EsInputFormat[Object, MapWritable]], classOf[Object], ClassOf[MapWritable])

val tweets = currentTweets.map{ case (key, value) => mapWritableToInput(value) }
val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set("mapred.output.format.class", "org.elasticsearch.hadoop.mr.EsOutFormat")
jobConf.setOutputCommitter(classOf[FileOutputCommitter])
jobConf.set(ConfigurationOptions.ES_RESOURCE_WRITE, "twitter/tweets")
jobConf.set(ConfigurationOptions.ES_NODES, "localhost")
FileOutputFormat.setOutputPath(jobConf, new Path("-"))
output.saveAsHadoopDataset(jobConf)

忠于技术,热爱分享。欢迎关注公众号:java大数据编程,了解更多技术内容。

8.spark core之读写数据


网站名称:8.sparkcore之读写数据
网页URL:http://www.tsicrk.com/article/jcphcs.html

其他资讯

让你的专属顾问为你服务

2.3788s