Spark Streaming Development Example
# 1. Spark Development
# 1.1. Configure pom.xml
```xml
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <scala.version>2.12.18</scala.version>
    <spark.version>3.5.0</spark.version>
    <hadoop.version>3.3.3</hadoop.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-cos</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- support for submitting to an external YARN cluster -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-yarn_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>
```
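The `hadoop-cos` dependency provides the `cosn://` filesystem the job writes to, but the cluster also needs COS credentials configured. A minimal sketch of the relevant `core-site.xml` entries, using the standard hadoop-cos configuration keys; the region and credential values below are placeholders, not part of the original setup:

```xml
<!-- Sketch: hadoop-cos settings, typically placed in core-site.xml.
     secretId/secretKey/region values are placeholders. -->
<property>
    <name>fs.cosn.impl</name>
    <value>org.apache.hadoop.fs.CosFileSystem</value>
</property>
<property>
    <name>fs.AbstractFileSystem.cosn.impl</name>
    <value>org.apache.hadoop.fs.CosN</value>
</property>
<property>
    <name>fs.cosn.userinfo.secretId</name>
    <value>YOUR_SECRET_ID</value>
</property>
<property>
    <name>fs.cosn.userinfo.secretKey</name>
    <value>YOUR_SECRET_KEY</value>
</property>
<property>
    <name>fs.cosn.bucket.region</name>
    <value>ap-guangzhou</value>
</property>
```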
# 1.2. SparkStreamingTest
```scala
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object SparkStreamingTest {
  val batchSeconds = 3600

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("SparkStreamingTest")
      .setMaster("yarn")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // use Kryo serialization
      .set("spark.default.parallelism", "126") // default task parallelism
      .set("spark.scheduler.mode", "FAIR") // fair scheduling for balanced resource allocation
    val ssc = new StreamingContext(sparkConf, Seconds(batchSeconds))

    // auto.offset.reset: "latest" or "earliest"
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "kafkaHost:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "spark-streaming-consumer-group",
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )
    val topics = Array("Spark-Test")
    val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    // note: ConsumerRecord is not serializable; if persisting causes problems,
    // map the stream to plain key/value tuples before caching
    kafkaDataDS.persist(StorageLevel.MEMORY_AND_DISK)
    kafkaDataDS.foreachRDD((rdd, bTime) => {
      if (!rdd.isEmpty()) {
        println("-------------------------------------------")
        println(s"Time: $bTime")
        println("-------------------------------------------")
        processRDD(rdd, bTime)
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }

  def processRDD(rdd: RDD[ConsumerRecord[String, String]], bTime: Time): Unit = {
    // shift the batch timestamp back one interval so output lands in the hour being processed
    val bTimeMs: Long = bTime.milliseconds - (batchSeconds * 1000L)
    val rest: RDD[String] = rdd.map(record => {
      val key = record.key
      val value = record.value
      (key, value)
    }).reduceByKey((x, y) => {
      x + y
    }).map(line => {
      line._1 + "|" + line._2
    })
    val bTimeStr: String = new SimpleDateFormat("yyyy-MM-dd/HH/").format(new Date(bTimeMs))
    rest.saveAsTextFile(s"cosn://bucket/sparktest/$bTimeStr")
  }
}
```
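The stream above leaves offset management to the consumer defaults. If output must never get ahead of committed offsets, the Kafka 0-10 integration supports committing offsets back to Kafka only after a batch succeeds. A minimal sketch of how the `foreachRDD` body could be adapted, assuming `ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)` is added to `kafkaParams` (not part of the original code):

```scala
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

// Sketch: commit Kafka offsets only after the batch output has been written.
kafkaDataDS.foreachRDD((rdd, bTime) => {
  // offset ranges must be read before any shuffle re-partitions the RDD
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  if (!rdd.isEmpty()) {
    processRDD(rdd, bTime)
    // asynchronous commit against the stream's own consumer group
    kafkaDataDS.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }
})
```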
# 1.3. Packaging Plugin
```xml
<build>
    <resources>
        <resource>
            <directory>src/main/resources</directory>
            <excludes>
                <exclude>*</exclude>
            </excludes>
        </resource>
    </resources>
    <plugins>
        <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <scalaVersion>${scala.version}</scalaVersion>
                <args>
                    <!-- Scala 2.12 only targets Java 8+; jvm-1.5 is not a valid -target value -->
                    <arg>-target:jvm-1.8</arg>
                </args>
            </configuration>
        </plugin>
    </plugins>
</build>
```
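Note that this plugin only compiles the Scala sources; the resulting jar will not include dependencies such as `spark-streaming-kafka-0-10`, which are not on the cluster classpath by default. One option (an assumption, not part of the original setup) is to add `maven-shade-plugin` to build a fat jar, while marking the Spark and Hadoop artifacts as `provided` so they are not bundled:

```xml
<!-- Sketch: bundle non-provided dependencies (e.g. the Kafka integration) into the job jar.
     Assumes spark-core/sql/streaming/yarn and hadoop-cos carry <scope>provided</scope>. -->
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.5.1</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <filters>
                    <filter>
                        <!-- strip signature files that break shaded jars -->
                        <artifact>*:*</artifact>
                        <excludes>
                            <exclude>META-INF/*.SF</exclude>
                            <exclude>META-INF/*.DSA</exclude>
                            <exclude>META-INF/*.RSA</exclude>
                        </excludes>
                    </filter>
                </filters>
            </configuration>
        </execution>
    </executions>
</plugin>
```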
# 1.4. Package and Submit

```bash
mvn clean package
```

- Upload the jar to the cluster, then submit:

```bash
$ spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --class com.spark.SparkStreamingTest \
    --executor-memory 7g \
    --executor-cores 14 \
    --num-executors 9 \
    spark-extractor-1.0-SNAPSHOT.jar > ../logs/SparkStreamingTest-2024-03-08.log
```
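In `cluster` deploy mode the driver runs inside a YARN container, so the per-batch `println` output ends up in the YARN container logs rather than in the locally redirected file. The aggregated logs can be fetched with `yarn logs`; the application id below is a placeholder:

```bash
# Fetch the aggregated container logs for the application (id is a placeholder)
yarn logs -applicationId application_1700000000000_0001
```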