最具影响力的数字化技术在线社区

168大数据

 找回密码
 立即注册

QQ登录

只需一步,快速开始

1 2 3 4 5
打印 上一主题 下一主题
开启左侧

spark双流join

[复制链接]
跳转到指定楼层
楼主
发表于 2021-2-8 21:28:32 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式

马上注册,结交更多数据大咖,获取更多知识干货,轻松玩转大数据

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
本帖最后由 168主编 于 2021-2-8 21:30 编辑

flink流的join原理不同的是,Spark双流join是对俩个流做满外连接 ,因为网络延迟等关系,不能保证每个窗口中的数据key都能匹配上,这样势必会出现三种情况:(some,some),(None,some),(Some,None),根据这三种情况,下面做一下详细解析:

(some,some)—— 1号流和2号流中key能正常进行逻辑运算,但是考虑到2号流后续可能会有剩下的数据到来,所以需要将1号流中的key保存到redis,以等待接下来的数据
(None,Some)—— 找不到1号流中对应key的数据,需要去redis中查找1号流的缓存,如果找不到,则缓存起来,等待1号流
(Some,None)—— 找不到2号流中的数据,需要将key保存到redis,以等待接下来的数据,并且去reids中找2号流的缓存,如果有,则join,然后删除2号流的缓存

代码示例
[AppleScript] 纯文本查看 复制代码
def fullJoin(orderInfoStream: DStream[OrderInfo], orderDetailStream: DStream[OrderDetail]) = {
        val orderIdAndOrderInfo: DStream[(String, OrderInfo)] =
            orderInfoStream.map(info => (info.id, info))
        val orderIdAndOrderDetail: DStream[(String, OrderDetail)] =
            orderDetailStream.map(info => (info.order_id, info))
        
        orderIdAndOrderInfo
            .fullOuterJoin(orderIdAndOrderDetail)
            .mapPartitions((it: Iterator[(String, (Option[OrderInfo], Option[OrderDetail]))]) => {
                // 获取redis客户端
                val client: Jedis = RedisUtil.getClient
                // 读写操作
                val result: Iterator[SaleDetail] = it.flatMap {
                    // order_info有数据, order_detail有数据
                    case (orderId, (Some(orderInfo), Some(orderDetail))) =>
                        println("Some(orderInfo)   Some(orderDetail)")
                        // 1. 把order_info信息写入到缓存(因为order_detail信息有部分信息可能迟到)
                        cacheOrderInfo(orderInfo, client)
                        // 2. 把信息join到一起(其实就是放入一个样例类中)  (缺少用户信息, 后面再专门补充)
                        val saleDetail = SaleDetail().mergeOrderInfo(orderInfo).mergeOrderDetail(orderDetail)
                        // 3. 去order_detail的缓存找数据, 进行join
                        // 3.1 先获取这个order_id对应的所有的order_detail的key
                        import scala.collection.JavaConversions._
                        val keys: List[String] = client.keys("order_detail:" + orderInfo.id + ":*").toList // 转成scala集合
                        val saleDetails: List[SaleDetail] = keys.map(key => {
                            val orderDetail: OrderDetail = JSON.parseObject(client.get(key), classOf[OrderDetail])
                            // 删除对应的key, 如果不删, 有可能造成数据重复
                            client.del(key)
                            SaleDetail().mergeOrderInfo(orderInfo).mergeOrderDetail(orderDetail)
                        })
                        saleDetail :: saleDetails
                    case (orderId, (Some(orderInfo), None)) =>
                        println("Some(orderInfo), None")
                        // 1. 把order_info信息写入到缓存(因为order_detail信息有部分信息可能迟到)
                        cacheOrderInfo(orderInfo, client)
                        // 3. 去order_detail的缓存找数据, 进行join
                        // 3.1 先获取这个order_id对应的所有的order_detail的key
                        import scala.collection.JavaConversions._
                        val keys: List[String] = client.keys("order_detail:" + orderInfo.id + ":*").toList // 转成scala集合
                        val saleDetails: List[SaleDetail] = keys.map(key => {
                            val orderDetail: OrderDetail = JSON.parseObject(client.get(key), classOf[OrderDetail])
                            // 删除对应的key, 如果不删, 有可能造成数据重复
                            client.del(key)
                            SaleDetail().mergeOrderInfo(orderInfo).mergeOrderDetail(orderDetail)
                        })
                        saleDetails
                    case (orderId, (None, Some(orderDetail))) =>
                        println("None, Some(orderDetail)")
                        // 1. 去order_info的缓存中查找
                        val orderInfoJson = client.get("order_info:" + orderDetail.order_id)
                        if (orderInfoJson == null) {
                            // 3. 如果不存在, 则order_detail缓存
                            cacheOrderDetail(orderDetail, client)
                            Nil
                        } else {
                            // 2. 如果存在, 则join
                            val orderInfo = JSON.parseObject(orderInfoJson, classOf[OrderInfo])
                            SaleDetail().mergeOrderInfo(orderInfo).mergeOrderDetail(orderDetail) :: Nil
                        }
                }
                
                // 关闭redis客户端
                client.close()
                
                result
            })
        
    }

本文作者Sheep Sun
本文链接https://www.cnblogs.com/yangxusun9/p/13137592.html

楼主热帖
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 转播转播 分享分享 分享淘帖 赞 踩

168大数据 - 论坛版权1.本主题所有言论和图片纯属网友个人见解,与本站立场无关
2.本站所有主题由网友自行投稿发布。若为首发或独家,该帖子作者与168大数据享有帖子相关版权。
3.其他单位或个人使用、转载或引用本文时必须同时征得该帖子作者和168大数据的同意,并添加本文出处。
4.本站所收集的部分公开资料来源于网络,转载目的在于传递价值及用于交流学习,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。
5.任何通过此网页连接而得到的资讯、产品及服务,本站概不负责,亦不负任何法律责任。
6.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源,若标注有误或遗漏而侵犯到任何版权问题,请尽快告知,本站将及时删除。
7.168大数据管理员和版主有权不事先通知发贴者而删除本文。

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

站长推荐上一条 /1 下一条

关于我们|小黑屋|Archiver|168大数据 ( 京ICP备14035423号|申请友情链接

GMT+8, 2024-6-18 19:30

Powered by BI168大数据社区

© 2012-2014 168大数据

快速回复 返回顶部 返回列表