软件世界网 购物 网址 三丰软件 | 小说 美女秀 图库大全 游戏 笑话 | 下载 开发知识库 新闻 开发 图片素材
多播视频美女直播
↓电视,电影,美女直播,迅雷资源↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
移动开发 架构设计 编程语言 Web前端 互联网
开发杂谈 系统运维 研发管理 数据库 云计算 Android开发资料
  软件世界网 -> 云计算 -> Spark闭包与序列化 -> 正文阅读

[云计算]Spark闭包与序列化


本文原文出处: http://blog.csdn.net/bluishglc/article/details/50945032 严禁任何形式的转载,否则将委托CSDN官方维护权益!
在Spark的官方文档再三强调那些将要作用到RDD上的操作,都会被分发到各个worker节点上去执行,我们都知道,这些操作实际上就是一些函数和涉及的变量组成的闭包,这里显然涉及到一个容易被忽视的问题:闭包的“序列化”。显然,闭包是有状态的,这主要是指它牵涉到的那些自由变量以及自由变量的依赖到的其他变量,所以,在将一个简单的函数或者一段简短的操作(就是闭包)传递给类似RDD.map等函数时,Spark需要检索闭包内所有的涉及到的变量(包括传递依赖到的变量),正确地把这些变量序列化之后才能传递到worker节点并反序列化去执行。如果在涉及到的所有的变量中有任何不支持序列化或没有指明如何序列化自己时,你就会遇到这样的错误:
org.apache.spark.SparkException: Task not serializable

在下面的例子中,我们从kafka中持续地接收json消息,并在spark-streaming中将字符串解析成对应的实体:
object App {
    private val config = ConfigFactory.load("my-streaming.conf")
    case class Person (firstName: String,lastName: String)
    def main(args: Array[String]) {
        val zkQuorum = config.getString("kafka.zkQuorum")
        val myTopic = config.getString("kafka.myTopic")
        val myGroup = config.getString("kafka.myGroup")
        val conf = new SparkConf().setAppName("my-streaming")
        val ssc = new StreamingContext(conf, Seconds(1))
        val lines = KafkaUtils.createStream(ssc, zkQuorum, myGroup, Map(myTopic -> 1))
        //this val is a part of closure, and it's not serializable!
        implicit val formats = DefaultFormats
        def parser(json: String) = parse(json).extract[Person].firstName
        lines.map(_._2).map(parser).print
        ....
        ssc.start()
        ssc.awaitTerminationOrTimeout(2)
        ssc.stop()
    }

}

这段代码在执行时就会报如下错误:
org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: org.json4s.DefaultFormats$

问题的症结就是闭包没有办法序列化引起的。在这个例子里,闭包的范围是:函数parser以及它所依赖的一个隐式参数: formats , 而问题就出在这个隐式参数上, 它的类型是DefaultFormats,这个类没有提供序列化和反序列自身的说明,所以Spark无法序列化formats,进而无法将task推送到远端执行。
隐式参数formats是为extract准备的,它的参数列表如下:
org.json4s.ExtractableJsonAstNode#extract[A](implicit formats: Formats, mf: scala.reflect.Manifest[A]): A = ...

找到问题的根源之后就好解决了。实际上我们根本不需要序列化formats, 对我们来说,它是无状态的。所以,我们只需要把它声明为一个全局静态的变量就可以绕过序列化。所以改动的方法就是简单地把implicit val formats = DefaultFormats的声明从方法内部迁移到App Object的字段位置上即可。
object App {
    private val config = ConfigFactory.load("my-streaming.conf")
    case class Person (firstName: String,lastName: String)
    //As Object field, global, static, no need to serialize
    implicit val formats = DefaultFormats

    def main(args: Array[String]) {
        val zkQuorum = config.getString("kafka.zkQuorum")
        val myTopic = config.getString("kafka.myTopic")
        val myGroup = config.getString("kafka.myGroup")
        val conf = new SparkConf().setAppName("my-streaming")
        val ssc = new StreamingContext(conf, Seconds(1))
        val lines = KafkaUtils.createStream(ssc, zkQuorum, myGroup, Map(myTopic -> 1))
        def parser(json: String) = parse(json).extract[Person].firstName
        lines..map(_._2).map(parser).print
        ....
        ssc.start()
        ssc.awaitTerminationOrTimeout(2)
        ssc.stop()
    }

}

最后我们来总结一下应该如何正确的处理Spark Task闭包的序列化问题。首先你需要对Task涉及的闭包的边界要有一个清晰的认识,要尽量地控制闭包的范围,和牵涉到的自由变量,一个非常值得警惕的地方是:尽量不要在闭包中直接引用一个类的成员变量和函数,这样会导致整个类实例被序列化。这样例子在Spark文档中也有提及,如下:
class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}

然后,一个好的组织代码的方式是:除了那些很短小的函数,尽量把复杂的操作封装到全局单一的函数体:全局静态方法或者函数对象
如果确实需要某个类的实例参与到计算过程中,则要作好相关的序列化工作。
......显示全文...
    点击查看全文


上一篇文章      下一篇文章      查看所有文章
2016-03-21 22:10:28  
云计算 最新文章
CentOS7上安装Zabbix(快速安装监控工具Zab
十分钟搭建NeuralStyle服务
solr入门之拼写纠错深入研究及代码Demo
3个netty5的例子,简单介绍netty的用法
RedhatOpenshift云平台注册使用
Akka框架——第一节:并发编程简介
Hadoop实战:Linux报tmp磁盘存储不足
linux安装thrift
感觉快更快规划计划高考韩国
solr相似匹配
360图书馆 软件开发资料 文字转语音 购物精选 软件下载 美食菜谱 新闻资讯 电影视频 小游戏 Chinese Culture 股票 租车
生肖星座 三丰软件 视频 开发 短信 中国文化 网文精选 搜图网 美图 阅读网 多播 租车 短信 看图 日历 万年历 2018年1日历
2018-1-20 7:29:54
多播视频美女直播
↓电视,电影,美女直播,迅雷资源↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  软件世界网 --