Thursday, June 11, 2020

Spark - Kafka Direct Streaming



The Spark Streaming integration for Kafka 0.10 uses the Direct Stream approach. It provides simple parallelism, a 1:1 correspondence between Kafka partitions and Spark partitions, and access to offsets and metadata. However, because the newer integration uses the new Kafka consumer API instead of the simple API, there are notable differences in usage.

Each item in the stream is a ConsumerRecord
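
A minimal sketch of reading the record fields, assuming the direct stream (messagesDStream) created in step 3 below:

// key, value and Kafka metadata accessors on each ConsumerRecord
val keyedValues = messagesDStream.map(record => (record.key, record.value))
val withMetadata = messagesDStream.map(record => (record.topic, record.partition, record.offset))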

For the possible kafkaParams, see the Kafka consumer config docs. If your Spark batch duration is larger than the default Kafka heartbeat session timeout (30 seconds), increase heartbeat.interval.ms and session.timeout.ms appropriately. For batches larger than 5 minutes, this also requires changing group.max.session.timeout.ms on the broker. Note that the example sets enable.auto.commit to false; offsets are instead stored in an HDFS checkpoint file, as described in the steps that follow.
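
A minimal kafkaParams sketch matching the settings above; the broker address, group id and timeout values are placeholders to adjust for your cluster and batch duration.

import org.apache.kafka.common.serialization.StringDeserializer

val kafkaparams = Map[String, Object](
  "bootstrap.servers" -> "hostxxxx.test.com:6667",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "media-consumer-group",
  "auto.offset.reset" -> "latest",
  // keep offset commits under application control; offsets are checkpointed to HDFS below
  "enable.auto.commit" -> (false: java.lang.Boolean),
  // keep the consumer session alive across long batch durations
  "session.timeout.ms" -> "60000",
  "heartbeat.interval.ms" -> "20000"
)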


Environment we were using:

Spark: 2.3

spark-streaming-kafka-0-10

Scala: 2.11

export SPARK_MAJOR_VERSION=2
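
The matching build dependencies, as an sbt sketch (coordinates assume Spark 2.3.0 built for Scala 2.11):

// build.sbt -- versions inferred from the environment above
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "2.3.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.0"
)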



1) Read the last committed offset information from the data store or the HDFS checkpoint file.

val lastcommitedoffsetdf = spark.read
  .option("delimiter", ",")
  .option("header", "true")
  .schema(SubscriberMediaSchema.buildoffsetschema)
  .csv(appconf.getString("output.offsetcheckpoint"))

2) Build the TopicPartition --> offset map from the checkpoint rows.

// Checkpoint columns: topic, partition, fromOffset, untilOffset, count
var fromOffsets = scala.collection.mutable.Map[TopicPartition, Long]()
val rows = lastcommitedoffsetdf.collect()
// Resume each partition from the untilOffset of the last completed batch
rows.foreach(row => fromOffsets.put(new TopicPartition(row.getString(0), row.getInt(1)), row.getLong(3)))

3) Construct the direct stream starting from the offsets built above.

var messagesDStream: InputDStream[ConsumerRecord[String, String]] =
  KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](topicsset, kafkaparams, fromOffsets))
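
After each batch is processed, the offset ranges can be written back to the same checkpoint path so that the next run resumes where the last one stopped. A minimal sketch, assuming the CSV layout above; writeOffsetCsv is a hypothetical helper that overwrites the checkpoint file.

import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}

messagesDStream.foreachRDD { rdd =>
  // Offset ranges are only available on the RDD produced directly by createDirectStream
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process the batch here ...

  // Persist "topic,partition,fromOffset,untilOffset,count" per partition for the next run
  val lines = offsetRanges.map(o => s"${o.topic},${o.partition},${o.fromOffset},${o.untilOffset},${o.count}")
  writeOffsetCsv(appconf.getString("output.offsetcheckpoint"), lines)  // hypothetical helper
}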

Execute the steps below if the cluster is a Kerberized environment:

kinit -kt user.keytab user@DNS   [e.g. user@BDA.NETSCOUT.COM]

export KAFKA_OPTS="-Djava.security.auth.login.config=/tmp/kafka_jaas.conf" 
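
The kafka_jaas.conf referenced above typically contains a keytab-based KafkaClient login section; a sketch with placeholder keytab path and principal, assuming the brokers run under the kafka service principal:

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  useTicketCache=false
  keyTab="/etc/user.keytab"
  principal="user@BDA.NETSCOUT.COM"
  serviceName="kafka";
};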

 

Default producer setup 

Start the producer


./bin/kafka-console-producer.sh --broker-list "hostxxxx.test.com:6667,hostxxxx.test.com:6667,tnblfxxxx.test.com:6667" --topic topicname --producer.config=/usr/hdp/2.6.3.13-5/kafka/config/producer.properties --security-protocol SASL_PLAINTEXT 
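
The producer.properties file passed above usually carries the security settings; a sketch, assuming SASL_PLAINTEXT with the kafka service principal:

security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka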

  

Default Consumer setup 

Start the consumer 


./bin/kafka-console-consumer.sh --bootstrap-server tnblfxxxx.test.com:6667 --topic topicname --from-beginning --security-protocol SASL_PLAINTEXT 
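
The Spark application needs the same security settings in its consumer configuration; a sketch that extends the kafkaparams map shown earlier (securekafkaparams is an illustrative name):

// additional entries when the brokers require Kerberos over SASL_PLAINTEXT
val securekafkaparams = kafkaparams ++ Map[String, Object](
  "security.protocol" -> "SASL_PLAINTEXT",
  "sasl.kerberos.service.name" -> "kafka"
)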

  

 

Spark-submit: 

 

Local mode 


spark-submit \ 
--master local[2] \ 
--driver-memory 2g \ 
--jars "$ROOT/bin/myappp-assembly-0.1-deps.jar" \ 
--conf spark.driver.extraJavaOptions="-Dconfig.file=/data/kafkaapp/resources/application.conf -Djava.security.auth.login.config=/data/kafkaapp/kafka_local_jaas.conf" \ 
--class "NimsConsumerStream"   $ROOT/bin/myappp-assembly-0.1.jar "arguments" 

 

Yarn client mode 


spark-submit \ 
--master yarn  \ 
--deploy-mode client \ 
--driver-memory 2g \ 
--executor-memory 2g \ 
--executor-cores 2 \ 
--jars "/data/kafkaapp/bin/myapp-assembly-0.1-deps.jar" \ 
--files log4j.properties,application.conf,kafka_jaas.conf,app.truststore.jks,remote.keytab,krb5.conf \ 
--conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=file://log4j.properties -Dlog4j.debug=true -Dconfig.file=application.conf -Djava.security.auth.login.config=kafka_jaas.conf" \ 
--conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file://log4j.properties -Dlog4j.debug=true -Dconfig.file=application.conf -Djava.security.auth.login.config=kafka_jaas.conf" \ 
--class "NimsConsumerStream" \ 
$ROOT/bin/myappp-assembly-0.1.jar "arguments" 

 

Yarn Cluster mode 


spark-submit \
        --name "MediaApp" \
        --master yarn  \
        --deploy-mode cluster \
        --queue queue-name-exe \
        --principal user@DNS.DOMAIN.COM \
        --keytab /etc/user.keytab \
        --num-executors 10 \
        --driver-memory 10g \
        --executor-memory 5g \
        --executor-cores 5 \
        --jars "$ROOT/bin/myappp-assembly-0.1-deps.jar" \
        --files ${ROOT}/resources/log4j.properties,${ROOT}/resources/application.conf,${ROOT}/keys/kafka_jaas.conf,${ROOT}/keys/app.truststore.jks,${ROOT}/keys/kafka_cluster.keytab,${ROOT}/keys/krb5.conf \
        --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=./log4j.properties -Dlog4j.debug=true -Dconfig.file=./application.conf -Djava.security.auth.login.config=./kafka_jaas.conf -Djava.security.krb5.conf=./krb5.conf" \
        --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=./log4j.properties -Dlog4j.debug=true -Dconfig.file=./application.conf -Djava.security.auth.login.config=./kafka_jaas.conf -Djava.security.krb5.conf=./krb5.conf" \
        --conf "spark.hadoop.fs.hdfs.impl.disable.cache=true" \
        --conf "spark.streaming.backpressure.enabled=false" \
        --conf "spark.streaming.backpressure.initialRate=30000" \
        --conf "spark.streaming.receiver.maxRate=80000" \
        --conf "spark.driver.memoryOverhead=5000" \
        --conf "spark.executor.memoryOverhead=5000" \
        --class "com.kafka.mypackage.myConsumer" \
        $ROOT/bin/myapp-assembly-0.1.jar >myappsubmit_stdout_`date +%Y%m%d`.log 2>&1 &


A newer and better approach is Spark Structured Streaming with the Kafka connector, which treats the stream as an unbounded in-memory table queried through DataFrames. We will discuss this in another post.


