Skip to content

💥 🚀 封装sparkstreaming动态调节batch time(有数据就执行计算);🚀 支持运行过程中增删topic;🚀 封装sparkstreaming 1.6 - kafka 010 用以支持 SSL。

Notifications You must be signed in to change notification settings

LinMingQiang/sparkstreaming

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 

Repository files navigation

🎉v1.6.0-0.10


  • 解决了批次计算延迟后出现的任务append导致整体恢复后 计算消费还是跟不上的问题
  • 支持动态调节 streaming 的 批次间隔时间 (不同于sparkstreaming 的 定长的批次间隔,StructuredStreaming中使用trigger实现了。)
  • 支持在streaming过程中 重设 topics,用于生产中动态地增加删减数据源
  • 添加了速率控制,KafkaRateController。用来控制读取速率,由于不是用的sparkstreaming,所有速率控制的一些参数拿不到,得自己去计算。
  • 提供spark-streaming-kafka-0-10_2.10 spark 1.6 来支持 kafka的ssl
  • 支持rdd.updateOffset 来管理偏移量。

🎉 v1.6.0-0.10_ssl


  • 只是结合了 sparkstreaming 1.6 和 kafka 010 。 使低版本的spark能够使用kafka的ssl验证
  • 支持 SSL
  • 支持spark 1.6 和 kafka 0.10 的结合
  • 支持管理offset

🎉v2.0.1-0.10


  • 解决了批次计算延迟后出现的任务append导致整体恢复后 计算消费还是跟不上的问题
  • 支持动态调节 streaming 的 批次间隔时间 (不同于sparkstreaming 的 定长的批次间隔,StructuredStreaming中使用trigger实现了。)
  • 支持在streaming过程中 重设 topics,用于生产中动态地增加删减数据源
  • 提供spark-streaming-kafka-0-10_2.10 spark 1.6 来支持 kafka的ssl
  • 支持rdd.updateOffset 来管理偏移量。
  • 由于kakfa-010 的api的变化,之前的 kafka-08 版本的 spark-kafka 虽然能用,但是他依赖于spark-streaming-kafka-0-8_2.10
    .(可能会导致一些版本问题);所以这次重新写了一个 kafka010 & spark-2.x 版本 ;但是使用方法还是跟之前的差不多,
  • kafka010有两种来管理offset的方式,一种是旧版的用zookeeper来管理,一种是本身自带的。现只提供zookeeper的管理方式
  • 要确保编译的kafka-client的版本和服务器端的版本一致,否则会报 Error reading string of length 27489, only 475 bytes available 等错误
  • 添加了速率控制,KafkaRateController。用来控制读取速率,由于不是用的sparkstreaming,所有速率控制的一些参数拿不到,得自己去计算。

🎉Spark-Util_1.6

POINT

  • spark与其他组件的封装api
  • 支持动态调节 streaming 的 批次间隔时间 (不同于sparkstreaming 的 定长的批次间隔,在Structed Streaming中使用trigger触发机制实现);不使用streamingContext 来实现流式计算,因为streamingContext是严格的时间间隔执行job任务,当job时间远小于batchtime时,会有大量的时间是在sleep等待下一个批次执行的到来(具体可以看看streamingContext的源码);StreamingDynamicContext 的设计借鉴了streamingContext的设计。但是在Job的提交上不使用Queue队列来appending堆积的job。当job执行完后,用户可以自己选择是否立刻执行下一个批次的计算,还是选择继续等待指定时长。
  • 支持在streaming过程中 重设 topics,用于生产中动态地增加删减数据源
  • 添加了速率控制,KafkaRateController。用来控制读取速率,由于不是用的sparkstreaming,所有速率控制的一些参数拿不到,得自己去计算。
  • 提供spark-streaming-kafka-0-10_2.10 spark 1.6 来支持 kafka的ssl
  • 支持rdd.updateOffset 来管理偏移量。
  • 封装 StreamingKafkaContext :你依然可以用 streamingContext来实现流式计算,词Api封装了读取kafka数据。

Support


scala version Kafka version hbase 1.0+ es 2.3.0 kudu 1.3.0 SSL
spark 1.3.x 2.10 0.8 👌 🌟 🍆 NO
spark 1.6.x 2.10 0.8 🐤 🎅 🌽 NO
spark 1.6.x 2.10 0.10+ 🐤 🎅 🌽 YES
spark 2.0.x 2.10/2.11 0.10+ 😃 🍒 🍑 YES

🎃 Table of contents

Spark kafka

  • 封装了StreamingDynamicContext 。动态地调整 streaming的批次间隔时间,不像sparkstreaming的批次间隔时间是固定的(Streaming Kafka DynamicContext is encapsulated. Dynamically adjust the batch interval of streaming, unlike sparkstreaming, where the batch interval is fixed)
  • 使用StreamingDynamicContext 可以让你在流式程序的执行过程中动态的调整你的topic和获取kafkardd的方式。而不需要重新启动程序
  • 添加了 sparkStreaming 1.6 -> kafka 010 的 spark-streaming-kafka-0-10_2.10 。用以支持ssl 。
  • 封装了spark/sparkstreaming direct读取kafka数据的方式;提供rdd.updateOffset方法来手动管理偏移量到zk; 提供配置参数。
    (Encapsulated spark/sparkstreaming to read Kafka with Low level integration (offset in zookeeper)。Provides many configuration parameters to control the way to read Kafka data)
  • 支持topic新增分区
    (Support topic to add new partition)
  • 支持rdd数据写入kafka 的算子
    (Supporting RDD data to write to Kafka)
  • 支持 Kafka SSL (提供spark 1.6 + Kafka 010 的整合api)(sparkstreaming 1.6 with kafka 010 )
    (Support Kafka SSL (0.10+,spark 1.6+))
  • Add parameters : 'kafka.consumer.from' To dynamically decide whether to get Kafka data from last or from consumption point
  • The version support of spark2.x Kafka 0.10+ is provided.(0.8, there is a big change compared to the 0.10 version.)
  • https://github.com/LinMingQiang/spark-util/tree/spark-kafka-0-8_1.6 或者 https://github.com/LinMingQiang/spark-kafka
  val kp = SparkKafkaContext.getKafkaParam(brokers,groupId,"consum","earliest")
  val skc = new SparkKafkaContext(kp,sparkconf)
  val kafkadataRdd = skc.kafkaRDD(topics,last,msgHandle)
  //...do something
  kafkadataRdd.updateOffsets(groupId)//update offset to zk

Spark Hbase

  • 根据scan条件扫描hbase数据成RDD
    (spark scan hbase data to RDD)
    scan -> RDD[T]
  • 根据RDD的数据来批量gethbase
    (spark RDD[T] get from hbase to RDD[U])
    RDD[T] -> Get -> RDD[U]
  • 根据RDD的数据来批量 写入
    spark RDD[T] write to hbase
    RDD[T] -> Put -> Hbase
  • 根据RDD的数据来批量更新rdd数据
    spark RDD[T] update with hbase data
    RDD[T] -> Get -> Combine -> RDD[U]
  • 根据RDD的数据来批量更新rdd数据并写回hbase
    spark RDD[T] update with hbase data then put return to hbase
    RDD[T] -> Get -> Combine -> Put -> Hbase
   val conf = new SparkConf().setMaster("local").setAppName("tets")
   val sc = new SparkContext(conf)
   val hc = new SparkHBaseContext(sc, zk)
   hc.hbaseRDD(tablename, f).foreach { println }
   hc.scanHbaseRDD(tablename, new Scan(), f)

Spark ES Util

sc.esRDD("testindex/testtype", query)

Spark Kudu

Flink kafka

  • 这是一个简单的例子。读取卡夫卡数据,实现WordCount统计并写入HBase
  • This is a simple example. Read Kafka data, implement WordCount statistics and write to HBase

Splunk

  • Splunk是一个日志显示和监视系统
    (Splunk is a log display and monitoring system.)
  • Splunk的安装和使用
    (Installation and use of Splunk)

Kafka Util

  • 操作kafka工具类,提供每天记录主题的偏移量,主要用于日重新计算、小时重新计算等功能。
    Operate the tool class of kafka, provide offset to record topic by day, mainly used for day recalculation, hour recalculation and other functions  

Hbase Util

  • 操作Hbase的工具类,查询HBase表的region信息,用于手动分割过大的region
    The tool class that operates Hbase, inquires the region information of HBase table, used for manual split some excessive region  

database util

  • Provides a connection tool for each database. include: es,hbase,mysql,mongo

Elasticserach shade

  • Resolving conflicts between ES and spark and Hadoop related packages

Rabbitmq util

Releases

No releases published

Packages

No packages published

Languages