博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
spark中的广播变量broadcast
阅读量:6438 次
发布时间:2019-06-23

本文共 4788 字,大约阅读时间需要 15 分钟。

Spark中的Broadcast处理

首先先来看一看broadcast的使用代码:

val values = List[Int](1,2,3)

val broadcastValues = sparkContext.broadcast(values)

rdd.mapPartitions(iter => {

  broadcastValues.getValue.foreach(println)

})

 

在上面的代码中,首先生成了一个集合变量,把这个变量通过sparkContext的broadcast函数进行广播,

最后在rdd的每个partition的迭代时,使用这个广播变量.

 

接下来看看广播变量的生成与数据的读取实现部分:

def broadcast[T: ClassTag](value: T): Broadcast[T] = {

  assertNotStopped()
  if (classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass)) {

这里要注意,使用broadcast时,不能直接对RDD进行broadcast的操作.

    // This is a warning instead of an exception in order to avoid breaking

//       user programs that

    // might have created RDD broadcast variables but not used them:
    logWarning("Can not directly broadcast RDDs; instead, call collect() and "
      "broadcast the result (see SPARK-5063)")
  }

 

通过broadcastManager中的newBroadcast函数来进行广播.

  val bc = env.broadcastManager.newBroadcast[T](valueisLocal)
  val callSite = getCallSite
  logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
  cleaner.foreach(_.registerBroadcastForCleanup(bc))
  bc
}

 

在BroadcastManager中生成广播变量的函数,这个函数直接使用的broadcastFactory的相应函数.

broadcastFactory的实例通过配置spark.broadcast.factory,

     默认是TorrentBroadcastFactory.

def newBroadcast[T: ClassTag](value_ : TisLocal: Boolean): Broadcast[T] = {

  broadcastFactory.newBroadcast[T](value_isLocal

       nextBroadcastId.getAndIncrement())

}

 

在TorrentBroadcastFactory中生成广播变量的函数:

在这里面,直接生成了一个TorrentBroadcast的实例.

override def newBroadcast[T: ClassTag](value_ : TisLocal: Boolean, id: Long)

: Broadcast[T] = {

  new TorrentBroadcast[T](value_id)
}

 

TorrentBroadcast实例生成时的处理流程:

这里基本的代码部分是直接写入这个要广播的变量,返回的值是这个变量所占用的block的个数.

Broadcast的block的大小通过spark.broadcast.blockSize配置.默认是4MB,

Broadcast的压缩是否通过spark.broadcast.compress配置,默认是true表示启用,默认情况下使用snappy的压缩.

 

private val broadcastId BroadcastBlockId(id)

/** Total number of blocks this broadcast variable contains. */
private val numBlocksInt = writeBlocks(obj)

 

接下来生成一个lazy的属性,这个属性仅仅有在详细的使用时,才会运行,在实例生成时不运行(上面的演示样例中的getValue.foreach时运行).

@transient private lazy val _value= readBroadcastBlock()

override protected def getValue() = {

  _value
}

 

看看实例生成时的writeBlocks的函数:

private def writeBlocks(value: T): Int = {

这里先把这个广播变量保存一份到当前的task的storage中,这样做是保证在读取时,假设要使用这个广播变量的task就是本地的task时,直接从blockManager中本地读取.

  SparkEnv.get.blockManager.putSingle(broadcastIdvalue

StorageLevel.MEMORY_AND_DISK,

    tellMaster = false)

 

这里依据block的设置大小,对value进行序列化/压缩分块,每个块的大小为blocksize的大小,

  val blocks =
    TorrentBroadcast.blockifyObject(valueblockSizeSparkEnv.get.serializer

    compressionCodec)

 

这里把序列化并压缩分块后的blocks进行迭代,存储到blockManager中,

  blocks.zipWithIndex.foreach { case (blocki) =>
    SparkEnv.get.blockManager.putBytes(
      BroadcastBlockId(id"piece" + i),
      block,
      StorageLevel.MEMORY_AND_DISK_SER,
      tellMaster = true)
  }

这个函数的返回值是一个int类型的值,这个值就是序列化压缩存储后block的个数.

  blocks.length
}

 

在我们的演示样例中,使用getValue时,会运行实例初始化时定义的lazy的函数readBroadcastBlock:

private def readBroadcastBlock(): = Utils.tryOrIOException {

  TorrentBroadcast.synchronized {
    setConf(SparkEnv.get.conf)

这里先从local端的blockmanager中直接读取storage中相应此广播变量的内容,假设能读取到,表示这个广播变量已经读取过来或者说这个task就是广播的本地executor.

    SparkEnv.get.blockManager.getLocal(broadcastId).map(_.data.next()) match {
      case Some(x) =>
        x.asInstanceOf[T]

以下这部分运行时,表示这个广播变量在当前的executor中是第一次读取,通过readBlocks函数去读取这个广播变量的全部的blocks,反序列化后,直接把这个广播变量存储到本地的blockManager中,下次读取时,就能够直接从本地进行读取.

      case None =>
        logInfo("Started reading broadcast variable " + id)
        val startTimeMs = System.currentTimeMillis()
        val blocks = readBlocks()
        logInfo("Reading broadcast variable " + id + " took" 

              Utils.getUsedTimeMs(startTimeMs))

        val obj = TorrentBroadcast.unBlockifyObject[T](
          blocksSparkEnv.get.serializercompressionCodec)
        // Store the merged copy in BlockManager so other tasks on this executor don't
        // need to re-fetch it.
        SparkEnv.get.blockManager.putSingle(
          broadcastIdobjStorageLevel.MEMORY_AND_DISKtellMaster = false)
        obj
    }
  }
}

 

最后再看看readBlocks函数的处理流程:

private def readBlocks(): Array[ByteBuffer] = {

这里定义的变量用于存储读取到的block的信息,numBlocks是广播变量序列化后所占用的block的个数.

  val blocks = new Array[ByteBuffer](numBlocks)
  val bm = SparkEnv.get.blockManager

这里開始迭代读取每个block的内容,这里的读取是先从local中进行读取,假设local中没有读取到数据时,通过blockManager读取远端的数据,通过读取这个block相应的location从这个location去读取这个block的内容,并存储到本地的blockManager中.最后,这个函数返回读取到的blocks的集合.

  for (pid <- Random.shuffle(Seq.range(0numBlocks))) {
    val pieceId = BroadcastBlockId(id"piece" + pid)
    logDebug(s"Reading piece $pieceId of $broadcastId")
    def getLocal: Option[ByteBuffer] = bm.getLocalBytes(pieceId)
    def getRemote: Option[ByteBuffer] = bm.getRemoteBytes(pieceId).map { block =>
      SparkEnv.get.blockManager.putBytes(
        pieceId,
        block,
        StorageLevel.MEMORY_AND_DISK_SER,
        tellMaster = true)
      block
    }
    val block: ByteBuffer = getLocal.orElse(getRemote).getOrElse(
      throw new SparkException(s"Failed to get $pieceId of $broadcastId"))
    blocks(pid) = block
  }
  blocks
}

转载地址:http://dlzwo.baihongyu.com/

你可能感兴趣的文章
Maven实战(六)--- dependencies与dependencyManagement的区别
查看>>
创业者应该有的5个正常心态(转)
查看>>
php模式设计之 注册树模式
查看>>
【Android UI设计与开发】3.引导界面(三)实现应用程序只启动一次引导界面
查看>>
_ENV和_G
查看>>
别做操之过急的”无效将军”,做实实在在的”日拱一卒” 纵使一年不将军,不可一日不拱卒...
查看>>
Oracle Grid Infrastructure: Understanding Split-Brain Node Eviction (文档 ID 1546004.1)
查看>>
Linux改变进程优先级的nice命令
查看>>
**16.app后端如何保证通讯安全--url签名
查看>>
win32窗口机制之CreateWindow
查看>>
C/C++ 一段代码区分数组指针|指针数组|函数指针|函数指针数组
查看>>
awakeFromNib小总结
查看>>
java知识大全积累篇
查看>>
善于总结所做所学的内容
查看>>
Lua-简洁、轻量、可扩展的脚本语言
查看>>
org.hibernate.MappingException: entity class not found hbm可以解析,但是实体类不能解析...
查看>>
Android -- Drag&&Drop
查看>>
Extjs4:改变Grid单元格背景色(转载)
查看>>
中医无绝症[转载]
查看>>
ZendStudio10.6.1如何安装最新的集成svn小工具?
查看>>