当前位置: 首页 > news >正文

山东省建设厅网站维护电话电子商务培训

山东省建设厅网站维护电话,电子商务培训,深圳成立公司,网站seo谷歌一、创建Spark Streaming 环境 二、读取数据(监听端口) 三、任务处理 四、启动程序 我这里写的是简单的单词数量统计 import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.{SparkConf, SparkConte…

一、创建Spark Streaming 环境

二、读取数据(监听端口)

三、任务处理

四、启动程序

我这里写的是简单的单词数量统计

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Durations, StreamingContext}object Demo1WordCount {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setMaster("local[2]")conf.setAppName("wc")val sc = new SparkContext(conf)//1、创建spark streaming环境//指定处理的间隔时间val ssc = new StreamingContext(sc, Durations.seconds(5))//2、读取数据//nc -lk 8888//yum install ncval linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)//统计单词的数量val countDS: DStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _)countDS.print()//启动spark streaming程序ssc.start()ssc.awaitTermination()}
}

UpdateStateByKey(有状态算子)能统计之前的单词数量,可做实时更新 

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}object Demo2UpdateStateByKey {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setMaster("local[2]")conf.setAppName("wc")val sc = new SparkContext(conf)//1、创建spark streaming环境//指定处理的间隔时间val ssc = new StreamingContext(sc, Durations.seconds(5))//设置checkpoint路径//用于保存状态ssc.checkpoint("data/checkpoint")//2、读取数据//nc -lk 8888//yum install ncval linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)//统计单词的数量val kvDS: DStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1))//updateStateByKey(有状态算子): 每一次计算更新每一个key的状态(单词的数量)val countDS: DStream[(String, Int)] = kvDS.updateStateByKey {/*** seq: 当前批次一个key所有value* state: 之前的结果(状态:之前的单词的数量)*/case (seq: Seq[Int], state: Option[Int]) =>println(seq)println(state)//计算当前批次单词的数量val sum: Int = seq.sum//获取之前单词的数量val count: Int = state match {case Some(count) => countcase None => 0}//计算新的单词的数量并返回Option(sum + count)}countDS.print()//启动spark streaming程序ssc.start()ssc.awaitTermination()}
}

http://www.ds6.com.cn/news/83760.html

相关文章:

  • 中企动力做的网站信息流推广主要具有哪两大优势
  • 个人网站做商城会怎样一起来看在线观看免费
  • 游乐场网站开发推广形式有哪几种
  • 专门做网站手机建立一个免费网站
  • 网站建设多长时间能学会百度网首页官网登录
  • 合肥建设网官方网站营销网站的宣传、推广与运作
  • 本地网站建设教程手机上制作网页
  • 网站怎么挂广告郑州整站网站优化
  • 百度app下载安装自己怎么优化网站
  • wordpress调节宽度东莞seo网站排名优化公司
  • 网站建设多少钱济南seo快速霸屏
  • 网站开发与软件开发区别关键词你们懂的
  • 先有域名才可以做网站吗企业营销平台
  • java做网站用什么工具公司网站优化
  • 网建设门户网站百度推广如何获取精准的客户
  • 北京网站建设批发舟山百度seo
  • 受欢迎的建网站哪家好网络优化工程师招聘信息
  • 开发公众号 微网站开发淘宝美工培训
  • wordpress软件网站模板下载失败线上销售怎么做
  • 秀洲住房与建设局网站建立网站需要什么技术
  • 昆仑万维做网站网站推广的基本手段有哪些
  • 电商网站的特点哪些行业适合做seo
  • 网站更换域名注意事项seo积分优化
  • 做网站常用哪种语言深圳推广公司哪家好
  • 郑州网官网谷歌seo推广
  • 做建筑材料哪个网站好一点需要优化的地方
  • 网站开发技术培训企业微信营销管理软件
  • 2019一个网站开发要多少钱免费seo免费培训
  • 深圳住房和建设局网站办事大厅网络营销策划案
  • 品牌网站建设教程重庆网站排名提升