当前位置: 首页 > news >正文

个人网站设计介绍文字俄罗斯搜索引擎yandex

个人网站设计介绍文字,俄罗斯搜索引擎yandex,科技 响应式网站模板,做好一个网站后1. UDF函数(用户自定义函数) 一般指的是用户自己定义的单行函数。一进一出,函数接受的是一行中的一个或者多个字段值,返回一个值。比如MySQL中的,日期相关的dateDiff函数,字符串相关的substring函数。 先…

1. UDF函数(用户自定义函数)

一般指的是用户自己定义的单行函数。一进一出,函数接受的是一行中的一个或者多个字段值,返回一个值。比如MySQL中的,日期相关的dateDiff函数,字符串相关的substring函数。

先准备数据:

1.1 导入必要的包

首先,确保导入必要的Spark包:

import org.apache.spark.sql.SparkSession

1.2 创建SparkSession

创建一个SparkSession对象,这是与Spark交互的入口。

1.3 定义UDF并注册到SparkSQL

定义一个Scala函数,并将其注册为UDF。示例

1.4 使用UDF在SQL查询中:

调用udf的register方法,第一个参数是udf函数的函数名,第二个参数是要注册为UDF的函数。

session.udf.register("all_income",(sal:Int,bonus:Int)=>{sal*12 + bonus})

1.5 代码:

尽量使用SparkSQL的sql形式的写法,api写法太麻烦了。

object TestUDF{def main(args: Array[String]): Unit = {val session = SparkSession.builder().master("local[*]").appName("testUDF").getOrCreate()import session.implicits._val df = session.sparkContext.textFile("D:\\software\\Spark\\SparkProgram1\\atguigu-classes\\data\\a.txt").map(t => {val strs = t.split(" ")(strs(0), strs(1), strs(2).toInt, strs(3).toInt)}).toDF("id", "name", "salary", "bonus")session.udf.register("all_income",(sal:Int,bonus:Int)=>{sal*12 + bonus})import org.apache.spark.sql.functions
//    df.withColumn("all",functions.callUDF("all_income",$"salary",$"bonus"))
//      .select("id","name","all")
//      .show()df.createTempView("salary")session.sql("""|select id,name,all_income(salary,bonus) all from salary|""".stripMargin).show()}
}

输出:

2. UDAF(用户自定义的聚合函数)

指的是用户自定义的聚合函数,多进一出,比如MySQL中的,count函数,avg函数。

以学生信息为主进行统计,所有人员的年龄的总和

或者每个性别的年龄的平均值

计算所有人的年龄之和:

package com.atguigu.bigdata.testimport org.apache.spark.sql.{Encoder, Encoders, SparkSession, functions}
import org.apache.spark.sql.expressions.Aggregator/*** ClassName : TestUDAF* Package : com.atguigu.bigdata.test* Description** @Author HeXua* @Create 2024/11/29 19:09*         Version 1.0*/
object TestUDAF {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("test udaf").master("local[*]").getOrCreate()import session.implicits._val df = session.sparkContext.textFile("D:\\software\\Spark\\SparkProgram1\\atguigu-classes\\data\\a.txt").map(t => {val strs = t.split(" ")(strs(0), strs(1), strs(2).toInt, strs(3))}).toDF("id", "name", "age", "gender")import org.apache.spark.sql.functions._// 注册udaf函数session.udf.register("mysum",udaf(new MySum))df.createTempView("student")session.sql("""|select mysum(age) from student|""".stripMargin).show()}
}
// udaf的类继承Aggregator抽象类
class MySum extends Aggregator[Int,Int,Int]{//初始化def zero: Int = 0//聚合逻辑def reduce(b: Int, a: Int): Int = a+b//整体聚合def merge(b1: Int, b2: Int): Int = b1+b2//最终返回值def finish(reduction: Int): Int = reduction//累加值的类型def bufferEncoder: Encoder[Int] = Encoders.scalaInt//输出结果的类型def outputEncoder: Encoder[Int] = Encoders.scalaInt
}

定义用户自定义聚合函数时,继承Aggregator类需要指定三个泛型参数。这三个泛型参数分别代表不同的概念。

泛型参数解释:

1. 输入类型(IN)

这是聚合函数的输入类型,即每次调用reduce方法时传入的单个元素的类型。例如你要计算一组整数的平均值,输入类型就是int。

2. 缓冲区类型(BUFFER)

这是聚合函数的中间状态类型,也称为缓冲区类型。

例如你要计算一组整数的平均值,缓冲区可能包含两个字段:总和和计数,因为iBUF可能是一个元组。

3. 输出类型(OUT)

这是聚合函数的最终输出类型,即finish方法返回的类型。例如你要计算平均值,最终输出类型是Double。

方法解释:

zero:初始化缓冲区的值,对于平均值计算,初始化和计数都是0。

reduce:更新缓冲区,每次传入一个新的输入值时,更新总和和计数。

finish:计算最终结果,根据缓冲区中的总和和计数,计算平均值。

bufferEncoder:定义缓冲区类型的编码器,用于序列化和反序列化缓冲区。

outputEncoder:定义最终输出类型的编码器,用于序列化和反序列化输出结果。

计算每个性别的年龄的平均值:

case class AggragateVo(var cnt:Int,var sum:Int)
object MyAvg extends Aggregator[Int,AggragateVo,Double]{override def zero: AggragateVo = AggragateVo(0,0)override def reduce(b: AggragateVo, a: Int): AggragateVo = {b.cnt += 1b.sum += ab}override def merge(b1: AggragateVo, b2: AggragateVo): AggragateVo = {b1.cnt += b2.cntb1.sum += b2.sumb1}override def finish(reduction: AggragateVo): Double = {reduction.sum.toDouble /reduction.cnt}override def bufferEncoder: Encoder[AggragateVo] = Encoders.productoverride def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

3. UDTF(用户自定义炸裂函数)

拆分函数,进入的是一行内容出现的结果是多行内容。

spark中并不直接支持UDTF函数。但可以使用hive中的炸裂函数达到效果。

import org.apache.spark.sql.SparkSessionobject TestUDTF {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("test udtf").master("local[*]").getOrCreate()import session.implicits._val df = session.sparkContext.textFile("file:///headless/workspace/spark/data/m.txt").map(t => {val strs = t.split(",")(strs(0), strs(1), strs(2))}).toDF("id", "name", "actors")//explode map arraydf.createTempView("movies")session.sql("""|select id,name,actor  from movies lateral view explode(split(actors,'\\|')) t as actor|""".stripMargin).createTempView("movies1")session.sql("""|select count(1),actor from movies1 group by actor|""".stripMargin).show()}
}

http://www.ds6.com.cn/news/68700.html

相关文章:

  • 网站加入悬浮客服网站建设公司seo关键词
  • 网页布局类型及实例网店seo名词解释
  • 西安做视频网站公司免费b站推广网站
  • 成品app想找搜索引擎优化
  • 汕头在线制作网站长沙网站制作公司哪家好
  • jquery做背景的网站赏析整合营销方案
  • 部门如何强化政府网站建设网站建设的好公司
  • 郑州网站建设哪家最好短网址
  • 南宁制作营销型网站域名解析查询
  • 做网站的软件有哪些免费建站平台
  • 已被网站管理员设置拦截宁波seo网络优化公司
  • 济南资金盘网站开发公司靠谱吗国外广告联盟平台
  • 域名查询 查询网十堰seo优化
  • dw做的网站链接不会跳转竞价交易
  • 爱站网长尾关键词挖掘查询工具百度秒收录技术
  • 不用编程做网站企业qq多少钱一年
  • php网站开发流程步骤网络销售靠谱吗
  • 忆达城市建设游戏登录网站网站内容优化关键词布局
  • 云霄县建设局网站投诉长尾关键词挖掘爱站工具
  • wordpress+电脑测试seo公司发展前景
  • 公司做的网站版权归谁所有seo优化方案案例
  • 用ps如何做短视频网站沈阳百度推广哪家好
  • 商丘网站制作费用seo推广网络
  • 两学一做党员答题网站seo优化技巧
  • 网站建设论文500字学习软件
  • 深圳网站制作台网络推广员的工作内容
  • 网站自然优化自学我国的网络营销公司
  • 游戏币销售网站建设产品网络推广的方法有哪些
  • 陕西网站备案查询360优化大师官方网站
  • 企业网站托管价格如何创建微信小程序