sparkstreaming使用实例:用户行为统计

「这是我参与11月更文挑战的第8天,活动详情查看:2021最后一次更文挑战」。

前面我们已经做好了数据的输入以及数据的输出操作了,现在我们可以开始使用开始 spark streaming 的一些简单的使用了。这次完成的处理是对一个用户行为数据流来做分析。

数据源的数据原本是一个购物网站的数据集,被我做成了重复利用的数据(循环的读取),它共有五个字段。不过这次用到的只有第 3 个字段(代表用户的行为分类)。这个字段的取值范围是 1 到 4 。他们分别代表点击、收藏、加入购物车以及购买这四种行为。

我们每5秒统计一次数据,为此我们先建立一个相应的对象

val ssc = new StreamingContext(context, Seconds(5))
复制代码

其中 context 是一个 SparkContext 对象,另一个参数 5s 则是我们的微批的时间间隔。

然后我们从数据源中读取数据,前面我们已经实现过自己的数据源了,这里我们就直接使用,其他数据源也基本同理。

val customReceiverStream = ssc.receiverStream(new CustomReceiver(sys.env("BUSHOST"),sys.env("BUSPORT").toInt ))
复制代码

这里使用了环境变量来读取服务器的地址,使用命令行传参也可行。主要是为了灵活性。至于谁好谁坏就见仁见智了。

现在开始计算的过程。假设我们把输入的数据映射成这样的结构: ( 用户行为, 1 ) ,这样一来, 这里用户行为的统计其实就是典型的group by 然后组内求和的过程。或者说其实就是经典的单词统计模型。

具体代码如下:

val userBehavior = customReceiverStream
    .map(str => (str.split(",")(2), 1)) //先前说过的只取第三个值
val behaviorCount = userBehavior.reduceByKey(_ + _)
复制代码

有必要的话可能还需要和事实表关联,也就是把用户行为的种类转换到具体的行为,这里我们就不直接关联了,我们用模式匹配在最后输出操作的时候转换,只模拟下效果。

behaviorCount.foreachRDD { (rdd, time) =>
    rdd.foreachPartition { partitionOfRecords =>
        partitionOfRecords.foreach(it => {
                                val behavior_type = it._1 match { // fake join
                                   case "1" => "click"
                                   case "2" => "favorite"
                                   case "3" => "cart"
                                   case "4" => "buy"
                                }
                               //这里输出数据
        })
    }
}
复制代码

如果是redis的话,就可以像前面写的那样处理。
最后程序要运行还需要启动StreamingContext对象

ssc.start()
ssc.awaitTermination()
复制代码

运行之后,我们可以在相应的输出流中进行查看。程序实际上比较简单,主要原因是spark steaming 为我们屏蔽了细节,上面的程序中我们的操作就是时间间隔之内的数据计算,它并不需要我们关注更多无关的部分。

这里只是最基础的内容,他还有更多更强大的功能,我们留到后续再介绍。