最具影响力的数字化技术在线社区

168大数据

 找回密码
 立即注册

QQ登录

只需一步,快速开始

1 2 3 4 5
打印 上一主题 下一主题
开启左侧

Spark技术内幕: Shuffle详解(二)

[复制链接]
跳转到指定楼层
楼主
发表于 2015-5-20 15:34:12 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式

马上注册,结交更多数据大咖,获取更多知识干货,轻松玩转大数据

您需要 登录 才可以下载或查看,没有帐号?立即注册

x

本文主要关注ShuffledRDD的Shuffle Read是如何从其他的node上读取数据的。

上文讲到了获取如何获取的策略都在org.apache.Spark.storage.BlockFetcherIterator.BasicBlockFetcherIterator#splitLocalRemoteBlocks中。可以见注释。


    protected def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
      // Make remote requests at most maxBytesInFlight / 5 in length; the reason to keep them
      // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5
      // nodes, rather than blocking on reading output from one node.
      // 为了快速的得到数据,每次都会启动5个线程去最多5个node上取数据;
      // 每次请求的数据不会超过spark.reducer.maxMbInFlight(默认值为48MB) / 5。
      // 这样做的原因有几个:
      // 1. 避免占用目标机器的过多带宽,在千兆网卡为主流的今天,带宽还是比较重要的。
      //    如果一个连接将要占用48M的带宽,这个Network IO可能会成为瓶颈。
      // 2. 请求数据可以平行化,这样请求数据的时间可以大大减少。请求数据的总时间就是那个请求最长的。
      //    如果不是并行请求,那么总时间将是所有的请求时间之和。
      // 而设置spark.reducer.maxMbInFlight,也是为了不要占用过多的内存
      val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
      logInfo("maxBytesInFlight: " + maxBytesInFlight + ", targetRequestSize: " + targetRequestSize)


      // Split local and remote blocks. Remote blocks are further split into FetchRequests of size
      // at most maxBytesInFlight in order to limit the amount of data in flight.
      val remoteRequests = new ArrayBuffer[FetchRequest]
      var totalBlocks = 0
      for ((address, blockInfos) <- blocksByAddress) { //  address实际上是executor_id
        totalBlocks += blockInfos.size
        if (address == blockManagerId) { //数据在本地,那么直接走local read
          // Filter out zero-sized blocks
          localBlocksToFetch ++= blockInfos.filter(_._2 != 0).map(_._1)
          _numBlocksToFetch += localBlocksToFetch.size
        } else {
          val iterator = blockInfos.iterator
          var curRequestSize = 0L
          var curBlocks = new ArrayBuffer[(BlockId, Long)]
          while (iterator.hasNext) {
          // blockId 是org.apache.spark.storage.ShuffleBlockId,
          // 格式:"shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
            val (blockId, size) = iterator.next()
            // Skip empty blocks
            if (size > 0) { //过滤掉为大小为0的文件
              curBlocks += ((blockId, size))
              remoteBlocksToFetch += blockId
              _numBlocksToFetch += 1
              curRequestSize += size
            } else if (size < 0) {
              throw new BlockException(blockId, "Negative block size " + size)
            }
            if (curRequestSize >= targetRequestSize) { // 避免一次请求的数据量过大
              // Add this FetchRequest
              remoteRequests += new FetchRequest(address, curBlocks)
              curBlocks = new ArrayBuffer[(BlockId, Long)]
              logDebug(s"Creating fetch request of $curRequestSize at $address")
              curRequestSize = 0
            }
          }
          // Add in the final request
          if (!curBlocks.isEmpty) { // 将剩余的请求放到最后一个request中。
            remoteRequests += new FetchRequest(address, curBlocks)
          }
        }
      }
      logInfo("Getting " + _numBlocksToFetch + " non-empty blocks out of " +
        totalBlocks + " blocks")
      remoteRequests
    }


来自群组: Spark精英汇
楼主热帖
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 转播转播 分享分享 分享淘帖 赞 踩

168大数据 - 论坛版权1.本主题所有言论和图片纯属网友个人见解,与本站立场无关
2.本站所有主题由网友自行投稿发布。若为首发或独家,该帖子作者与168大数据享有帖子相关版权。
3.其他单位或个人使用、转载或引用本文时必须同时征得该帖子作者和168大数据的同意,并添加本文出处。
4.本站所收集的部分公开资料来源于网络,转载目的在于传递价值及用于交流学习,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。
5.任何通过此网页连接而得到的资讯、产品及服务,本站概不负责,亦不负任何法律责任。
6.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源,若标注有误或遗漏而侵犯到任何版权问题,请尽快告知,本站将及时删除。
7.168大数据管理员和版主有权不事先通知发贴者而删除本文。

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

站长推荐上一条 /1 下一条

关于我们|小黑屋|Archiver|168大数据 ( 京ICP备14035423号|申请友情链接

GMT+8, 2024-6-27 03:37

Powered by BI168大数据社区

© 2012-2014 168大数据

快速回复 返回顶部 返回列表