设为首页 - 加入收藏 武汉站长网 (http://www.027zz.com)- 国内知名站长资讯网站,提供最新最全的站长资讯,创业经验,网站建设等!
热搜: 什么 系统 手机 区块
当前位置: 首页 > 运营中心 > 网站设计 > 教程 > 正文

Apache Spark Delta Lake写新万博app新万博app_西甲联赛预测 万博app_万博体育app 登陆不进去使用及实现原理代码解析

发布时间:2019-10-03 02:01 所属栏目:[教程] 来源:明惠
导读:Delta Lake 写新万博app新万博app_西甲联赛预测 万博app_万博体育app 登陆不进去是其最基本的功能,而且其使用和现有的 Spark 写 Parquet 文件基本一致,在介绍 Delta Lake 实现原理之前先来看看如何使用它,具体使用如下: df.write.format(delta).save(/data/yangping.wyp/delta/test/) //新万博app新万博app_西甲联赛预测 万博app_万博体育app 登陆不进去按照dt分区 df.write.f

Apache Spark Delta Lake 写新万博app新万博app_西甲联赛预测 万博app_万博体育app 登陆不进去使用及实现原理代码解析

Delta Lake 写新万博app新万博app_西甲联赛预测 万博app_万博体育app 登陆不进去是其最基本的功能,而且其使用和现有的 Spark 写 Parquet 文件基本一致,在介绍 Delta Lake 实现原理之前先来看看如何使用它,具体使用如下:

  1. df.write.format("delta").save("/data/yangping.wyp/delta/test/")?
  2. ??
  3. //新万博app新万博app_西甲联赛预测 万博app_万博体育app 登陆不进去按照?dt?分区?
  4. df.write.format("delta").partitionBy("dt").save("/data/yangping.wyp/delta/test/")?
  5. ??
  6. //?覆盖之前的新万博app新万博app_西甲联赛预测 万博app_万博体育app 登陆不进去?
  7. df.write.format("delta").mode(SaveMode.Overwrite).save("/data/yangping.wyp/delta/test/")?

大家可以看出,使用写 Delta 新万博app新万博app_西甲联赛预测 万博app_万博体育app 登陆不进去是非常简单的,这也是 Delte Lake 介绍的 100% 兼容 Spark。

Delta Lake 写新万博app新万博app_西甲联赛预测 万博app_万博体育app 登陆不进去原理

前面简单了解了如何使用 Delta Lake 来写新万博app新万博app_西甲联赛预测 万博app_万博体育app 登陆不进去,本小结我们将深入介绍 Delta Lake 是如何保证写新万博app新万博app_西甲联赛预测 万博app_万博体育app 登陆不进去的基本原理以及如何保证事务性。

得益于 Apache Spark 强大的新万博app新万博app_西甲联赛预测 万博app_万博体育app 登陆不进去源 API,我们可以很方便的给 Spark 添加任何新万博app新万博app_西甲联赛预测 万博app_万博体育app 登陆不进去源,Delta Lake 也不例外。Delta Lake 就是使用 DataSource V1 版本的 API 实现的一种新的新万博app新万博app_西甲联赛预测 万博app_万博体育app 登陆不进去源,我们调用 df.write.format("delta") 其实底层调用的是 org.apache.spark.sql.delta.sources.DeltaDataSource 类。为了简单起见,本文介绍的是 Delta Lake 批量写的实现,实时流写 Delta Lake 本文不涉及,后面有机会再介绍。 Delta Lake 批量写扩展了 org.apache.spark.sql.sources.CreatableRelationProvider 特质,并实现了其中的方法。我们调用上面的写新万博app新万博app_西甲联赛预测 万博app_万博体育app 登陆不进去方法首先会调用 DeltaDataSource 类的 createRelation 方法,它的具体实现如下:

  1. override?def?createRelation(?
  2. ????sqlContext:?SQLContext,?
  3. ????mode:?SaveMode,?
  4. ????parameters:?Map[String,?String],?
  5. ????data:?DataFrame):?BaseRelation?=?{?
  6. ??
  7. ??//?写新万博app新万博app_西甲联赛预测 万博app_万博体育app 登陆不进去的路径?
  8. ??val?path?=?parameters.getOrElse("path",?{?
  9. ????throw?DeltaErrors.pathNotSpecifiedException?
  10. ??})?
  11. ??
  12. ??//?分区字段?
  13. ??val?partitionColumns?=?parameters.get(DeltaSourceUtils.PARTITIONING_COLUMNS_KEY)?
  14. ????.map(DeltaDataSource.decodePartitioningColumns)?
  15. ????.getOrElse(Nil)?
  16. ??
  17. ??
  18. ??//?事务日志对象?
  19. ??val?deltaLog?=?DeltaLog.forTable(sqlContext.sparkSession,?path)?
  20. ??
  21. ??//?真正的写操作过程?
  22. ??WriteIntoDelta(?
  23. ????deltaLog?=?deltaLog,?
  24. ????mode?=?mode,?
  25. ????new?DeltaOptions(parameters,?sqlContext.sparkSession.sessionState.conf),?
  26. ????partitionColumns?=?partitionColumns,?
  27. ????configuration?=?Map.empty,?
  28. ????data?=?data).run(sqlContext.sparkSession)?
  29. ??
  30. ??deltaLog.createRelation()?
  31. }?

其中 mode 就是保持新万博app新万博app_西甲联赛预测 万博app_万博体育app 登陆不进去的模式,支持 Append、Overwrite、ErrorIfExists 以及 Ignore 等。parameters 这个传递的参数,比如分区字段、新万博app新万博app_西甲联赛预测 万博app_万博体育app 登陆不进去保存路径以及 Delta 支持的一些参数(replaceWhere、mergeSchema、overwriteSchema 等,具体参见 org.apache.spark.sql.delta.DeltaOptions);data 就是我们需要保存的新万博app新万博app_西甲联赛预测 万博app_万博体育app 登陆不进去。

createRelation 方法紧接着就是获取新万博app新万博app_西甲联赛预测 万博app_万博体育app 登陆不进去保存的路径,分区字段等信息。然后初始化 deltaLog,deltaLog 的初始化会做很多事情,比如会读取磁盘所有的事务日志(_delta_log 目录下),并构建最新事务日志的最新快照,里面可以拿到最新新万博app新万博app_西甲联赛预测 万博app_万博体育app 登陆不进去的版本。由于 deltaLog 的初始化成本比较高,所以 deltaLog 初始化完之后会缓存到 deltaLogCache 中,这是一个使用 Guava 的 CacheBuilder 类实现的一个缓存,缓存的新万博app新万博app_西甲联赛预测 万博app_万博体育app 登陆不进去保持一小时,缓存大小可以通过 delta.log.cacheSize 参数进行设置。只要写新万博app新万博app_西甲联赛预测 万博app_万博体育app 登陆不进去的路径是一样的,就只需要初始化一次 deltaLog,后面直接从缓存中拿即可。除非之前缓存的 deltaLog 被清理了,或者无效才会再次初始化。DeltaLog 类是 Delta Lake 中最重要的类之一,涉及的内容非常多,所以我们会单独使用一篇文章进行介绍。

紧接着初始化 WriteIntoDelta,WriteIntoDelta 扩展自 RunnableCommand,Delta Lake 中的更新、删除、合并都是扩展这个类的。初始化完 WriteIntoDelta 之后,就会调用 run 方法执行真正的写新万博app新万博app_西甲联赛预测 万博app_万博体育app 登陆不进去操作。WriteIntoDelta 的 run 方法实现如下:

  1. override?def?run(sparkSession:?SparkSession):?Seq[Row]?=?{?
  2. ????deltaLog.withNewTransaction?{?txn?=>?
  3. ??????val?actions?=?write(txn,?sparkSession)?
  4. ??????val?operation?=?DeltaOperations.Write(mode,?Option(partitionColumns),?options.replaceWhere)?
  5. ??????txn.commit(actions,?operation)?
  6. ????}?
  7. ????Seq.empty?
  8. }?

【免责声明】本站内容转载自互联网,其相关言论仅代表作者个人观点绝非权威,不代表本站立场。如您发现内容存在版权问题,请提交相关链接至邮箱:bqsm@foxmail.com,我们将及时予以处理。

网友评论
推荐文章