SparkStreaming Development Example

2024-03-08

# 1. Developing the Spark Job

# 1.1. Configure pom.xml

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <scala.version>2.12.18</scala.version>
        <spark.version>3.5.0</spark.version>
        <hadoop.version>3.3.3</hadoop.version>
    </properties>

    <dependencies>
        <!-- Tencent COS filesystem support (the cosn:// scheme used for output below) -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-cos</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- YARN cluster support -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-yarn_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>
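
The hadoop-cos dependency is what provides the cosn:// filesystem the job writes to. Below is a minimal sketch of passing COS credentials through SparkConf, using configuration keys from the hadoop-cos documentation; the secret id, key, and region values are placeholders you would supply yourself:

import org.apache.spark.SparkConf

// Sketch only: wire Tencent COS credentials into Hadoop via SparkConf.
// Key names follow the hadoop-cos docs; all values below are placeholders.
val cosConf = new SparkConf()
	.set("spark.hadoop.fs.cosn.impl", "org.apache.hadoop.fs.CosFileSystem")
	.set("spark.hadoop.fs.cosn.userinfo.secretId", "<secret-id>")
	.set("spark.hadoop.fs.cosn.userinfo.secretKey", "<secret-key>")
	.set("spark.hadoop.fs.cosn.bucket.region", "<region>")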


# 1.2. SparkStreamingTest
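
The class below subscribes to the Spark-Test topic, concatenates the values for each key within the hourly batch, and writes the result to COS as one directory per hour.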

import java.text.SimpleDateFormat

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

object SparkStreamingTest {

	val batchSeconds = 3600

	def main(args: Array[String]): Unit = {
		val sparkConf = new SparkConf()
			.setAppName("SparkStreamingTest")
			.setMaster("yarn")
			.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // use Kryo serialization
			.set("spark.default.parallelism", "126") // default task parallelism
			.set("spark.scheduler.mode", "FAIR") // fair scheduling for balanced resource allocation

		val ssc = new StreamingContext(sparkConf, Seconds(batchSeconds))

		// auto.offset.reset: latest or earliest
		val kafkaParams = Map[String, Object](
			ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "kafkaHost:9092",
			ConsumerConfig.GROUP_ID_CONFIG -> "spark-streaming-consumer-group",
			ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
			ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
			ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
		)
		val topics = Array("Spark-Test")
		val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
			ssc,
			PreferConsistent,
			Subscribe[String, String](topics, kafkaParams)
		)

		kafkaDataDS.persist(StorageLevel.MEMORY_AND_DISK)

		kafkaDataDS.foreachRDD((rdd, bTime) => {
			if (!rdd.isEmpty()) {
				println("-------------------------------------------")
				println(s"Time: $bTime")
				println("-------------------------------------------")
				processRDD(rdd, bTime)
			}
		})

		ssc.start()
		ssc.awaitTermination()
	}

	def processRDD(rdd: RDD[ConsumerRecord[String, String]], bTime: Time): Unit = {
		// batch end time minus one batch interval = batch start time
		val bTimeMs: Long = bTime.milliseconds - (batchSeconds * 1000)

		val rest: RDD[String] = rdd.map(record => {
			val key = record.key
			val value = record.value
			(key, value)
		}).reduceByKey((x, y) => {
			x + y
		}).map(line => {
			line._1 + "|" + line._2
		})
		val bTimeStr: String = new SimpleDateFormat("yyyy-MM-dd/HH/").format(bTimeMs)
		rest.saveAsTextFile(s"cosn://bucket/sparktest/$bTimeStr")
	}
}
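
As written, the job does not manage Kafka offsets explicitly, so a failure can replay or skip a batch depending on when offsets happen to be committed. One common hardening step, sketched below against the stream above, is to disable auto-commit in kafkaParams and commit offsets back to Kafka only after the batch output succeeds, using the HasOffsetRanges and CanCommitOffsets handles from spark-streaming-kafka-0-10:

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

// Sketch: assumes kafkaParams additionally sets
// ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
kafkaDataDS.foreachRDD((rdd, bTime) => {
	// capture offset ranges before any shuffle breaks the 1:1 Kafka-partition mapping
	val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
	if (!rdd.isEmpty()) {
		processRDD(rdd, bTime)
	}
	// commitAsync queues the commit on the driver; it is executed on the next poll
	kafkaDataDS.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
})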

# 1.3. Packaging Plugin

    <build>
       <resources>
           <resource>
               <directory>src/main/resources</directory>
               <!-- keep config files out of the jar; supply them on the cluster instead -->
               <excludes>
                   <exclude>*</exclude>
               </excludes>
           </resource>
       </resources>

        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>4.8.1</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                    <args>
                        <!-- Scala 2.12 requires a JVM 1.8 bytecode target -->
                        <arg>-target:jvm-1.8</arg>
                    </args>
                </configuration>
            </plugin>
        </plugins>
    </build>
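
Note that this build packages only the compiled classes. For cluster submission it is common to mark the spark-* artifacts as provided, since the cluster already ships them, and to bundle the remaining dependencies, notably spark-streaming-kafka-0-10 and hadoop-cos, which are not part of the Spark distribution, into a fat jar with the maven-shade-plugin, or hand them to spark-submit via --jars/--packages; otherwise those classes will be missing at runtime.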

# 1.4. Package and Submit

mvn clean package

  • Upload the jar to the cluster
$ spark-submit \
--master yarn \
--deploy-mode cluster \
--class com.spark.SparkStreamingTest \
--executor-memory 7g \
--executor-cores 14 \
--num-executors 9 spark-extractor-1.0-SNAPSHOT.jar > ../logs/SparkStreamingTest-2024-03-08.log
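
With --deploy-mode cluster the driver runs inside a YARN container, so the println output above lands in the YARN container logs (retrievable with yarn logs -applicationId <appId>) rather than in the redirected file, which only captures the spark-submit client's output.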
Last updated: 2024/06/28, 14:46:16
