028-86922220

建站动态

根据您的个性需求进行定制 先人一步 抢占小程序红利时代

9.sparkcore之共享变量

简介

  spark执行操作时,可以使用驱动器程序Driver中定义的变量,但有时这种默认的使用方式却并不理想。

创新互联一直秉承“诚信做人,踏实做事”的原则,不欺瞒客户,是我们最起码的底线! 以服务为基础,以质量求生存,以技术求发展,成交一个客户多一个朋友!为您提供成都网站设计、成都网站建设、成都网页设计、微信小程序开发、成都网站开发、成都网站制作、成都软件开发、重庆APP开发是成都本地专业的网站建设和网站设计公司,等你一起来见证!

  spark为了解决这两个问题,提供了两种类型的共享变量:广播变量(broadcast variable)和累加器(accumulator)。

广播变量

原理

9.spark core之共享变量

用法

# 将呼号前缀(国家代码)作为广播变量
signPrefixes = sc.broadcast(loadCallSignTable())

def processSignCount(sign_count, signPrefixes):
    country = lookupCountry(sign_count[0], signPrefixes.value)
    count = sign_count[1]
    return (country, count)

countryContactCounts = (contactCounts.map(processSignCount).reduceByKey((lambda x, y: x+y)))

countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")
scala
// 将呼号前缀(国家代码)作为广播变量
val signPrefixes = sc.broadcast(loadCallSignTable())

def processSignCount(sign_count, signPrefixes):
    country = lookupCountry(sign_count[0], signPrefixes.value)
    count = sign_count[1]
    return (country, count)

val countryContactCounts = contactCounts.map{case (sign, count) => {
    val country = lookupInArray(sign, signPrefixes.value)
    (country, count)
    }}.reduceByKey((x, y) => x+y)

countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")
java
// 将呼号前缀(国家代码)作为广播变量
final Broadcast signPrefixes = sc.broadcast(loadCallSignTable());

JavaPairRDD countryContactCounts = contactCounts.mapToPair(new PairFunction, String, Integer>() {
    public Tuple2 call(Tuple2 callSignCount) {
        String sign = callSignCount._1();
        String country = lookupCountry(sign, signPrefixes.value());
        return new Tuple2(country, callSignCount._2()); 
    }
}).reduceByKey(new SumInts());

countryContactCounts.saveAsTextFile(outputDir + "/countries.txt");

累加器

原理

9.spark core之共享变量

用法

实例

  累加空行

python
file = sc.textFile(inputFile)
# 创建Accumulator[Int]并初始化为0
blankLines = sc.accumulator(0)

def extractCallSigns(line):
    global blankLines # 访问全局变量
    if (line == ""):
        blankLines += 1
    return line.split(" ")

callSigns = file.flatMap(extractCallSigns)
callSigns.saveAsTextFile(outputDir + "/callsigns")
print "Blank lines: %d" % blankLines.value
scala
val file = sc.textFile("file.txt")
val blankLines = sc.accumulator(0) //创建Accumulator[Int]并初始化为0

val callSigns = file.flatMap(line => {
    if (line == "") {
        blankLines += 1 //累加器加1
    }
    line.split(" ")
})

callSigns.saveAsTextFile("output.txt")
println("Blank lines:" + blankLines.value)
java
JavaRDD rdd = sc.textFile(args[1]);

final Accumulator blankLines = sc.accumulator(0);

JavaRDD callSigns = rdd.flatMap(new FlatMapFunction() {
    public Iterable call(String line) {
        if ("".equals(line)) {
            blankLines.add(1);
        }
        return Arrays.asList(line.split(" "));
    }
});

callSigns.saveAsTextFile("output.text");
System.out.println("Blank lines:" + blankLines.value());

忠于技术,热爱分享。欢迎关注公众号:java大数据编程,了解更多技术内容。

9.spark core之共享变量


分享题目:9.sparkcore之共享变量
URL地址:http://www.tsicrk.com/article/jpjode.html

其他资讯

让你的专属顾问为你服务

0.6489s