This post covers the Spark Streaming integration for Kafka 0.10, which uses the Direct Stream approach. It provides simple parallelism, a 1:1 correspondence between Kafka partitions and Spark partitions, and access to offsets and metadata. However, because the newer integration uses the new Kafka consumer API instead of the simple API, there are notable differences in usage.
Each item in the stream is a ConsumerRecord.
For the possible kafkaParams, see the Kafka consumer config docs. If your Spark batch duration is larger than the default Kafka heartbeat session timeout (30 seconds), increase heartbeat.interval.ms and session.timeout.ms appropriately. For batches larger than 5 minutes, this requires changing group.max.session.timeout.ms on the broker. Note that the example sets enable.auto.commit to false; for discussion, see Storing Offsets below.
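A minimal sketch of the kafkaParams and topics referenced in the code below; the broker host, group id, topic name, and timeout values are placeholders for your environment:

import org.apache.kafka.common.serialization.StringDeserializer

val topicsset = Set("topicname")
val kafkaparams = Map[String, Object](
  "bootstrap.servers" -> "hostxxxx.test.com:6667",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "media-consumer-group",
  "auto.offset.reset" -> "latest",
  "security.protocol" -> "SASL_PLAINTEXT",
  // Increase these if your batch duration is longer than the defaults allow (see above).
  "session.timeout.ms" -> "60000",
  "heartbeat.interval.ms" -> "20000",
  // Offsets are stored by the application, not auto-committed by the consumer.
  "enable.auto.commit" -> (false: java.lang.Boolean)
)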
Environment we were using:
Spark: 2.3
spark-streaming-kafka: 0.10
Scala: 2.11
export SPARK_MAJOR_VERSION=2
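For reference, the matching build dependencies might look like the following sketch (assuming sbt and the standard Maven coordinates for Spark 2.3 on Scala 2.11):

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                 % "2.3.0" % "provided",
  "org.apache.spark" %% "spark-sql"                  % "2.3.0" % "provided",
  "org.apache.spark" %% "spark-streaming"            % "2.3.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.0"
)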
1) Get the latest committed offsets from the data store or the HDFS offset checkpoint file.
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// Offset checkpoint columns: "topic,partition,fromOffset,untilOffset,count"
val lastcommitedoffsetdf = spark.read.option("delimiter", ",").schema(SubscriberMediaSchema.buildoffsetschema).option("header", "true").csv(appconf.getString("output.offsetcheckpoint"))
val fromOffsets = scala.collection.mutable.Map[TopicPartition, Long]()
// Resume from untilOffset (column 3), i.e. where the previous run stopped.
lastcommitedoffsetdf.collect().foreach(row => fromOffsets.put(new TopicPartition(row.getString(0), row.getInt(1)), row.getLong(3)))
val messagesDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topicsset, kafkaparams, fromOffsets))
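After each batch is processed, the ending offsets can be written back to the same checkpoint location so the next run resumes from them. A minimal sketch, assuming the same CSV layout as above and that offsets are written from the driver once the batch succeeds:

import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}

messagesDStream.foreachRDD { rdd =>
  // Offset ranges must be taken from the RDD produced directly by the direct stream.
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch here ...
  // Persist "topic,partition,fromOffset,untilOffset,count" for the next restart.
  spark.createDataFrame(offsetRanges.map(o => (o.topic, o.partition, o.fromOffset, o.untilOffset, o.count())).toSeq)
    .toDF("topic", "partition", "fromOffset", "untilOffset", "count")
    .write.mode("overwrite").option("header", "true")
    .csv(appconf.getString("output.offsetcheckpoint"))
}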
To test the topic from the command line on a Kerberized cluster, first obtain a Kerberos ticket and point the Kafka CLI tools at the JAAS config:
kinit -kt user.keytab user@REALM    (for example, user@BDA.NETSCOUT.COM)
export KAFKA_OPTS="-Djava.security.auth.login.config=/tmp/kafka_jaas.conf"
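The kafka_jaas.conf referenced above typically contains a KafkaClient login section like the following sketch (keytab path and principal are placeholders for your environment):

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/tmp/user.keytab"
  principal="user@BDA.NETSCOUT.COM"
  serviceName="kafka";
};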
Default producer setup
Start the producer
./bin/kafka-console-producer.sh --broker-list "hostxxxx.test.com:6667,hostxxxx.test.com:6667,tnblfxxxx.test.com:6667" --topic topicname --producer.config=/usr/hdp/2.6.3.13-5/kafka/config/producer.properties --security-protocol SASL_PLAINTEXT
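The producer.properties file passed via --producer.config mainly needs the security settings in this setup; a minimal sketch:

security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka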
Default Consumer setup
Start the consumer
./bin/kafka-console-consumer.sh --bootstrap-server tnblfxxxx.test.com:6667 --topic topicname --from-beginning --security-protocol SASL_PLAINTEXT
Spark-submit:
Local mode
spark-submit \
--master local[2] \
--driver-memory 2g \
--jars "$ROOT/bin/myappp-assembly-0.1-deps.jar" \
--conf spark.driver.extraJavaOptions="-Dconfig.file=/data/kafkaapp/resources/application.conf -Djava.security.auth.login.config=/data/kafkaapp/kafka_local_jaas.conf" \
--class "NimsConsumerStream" $ROOT/bin/myappp-assembly-0.1.jar "arguments"
Yarn client mode
spark-submit \
--master yarn \
--deploy-mode client \
--driver-memory 2g \
--executor-memory 2g \
--executor-cores 2 \
--jars "/data/kafkaapp/bin/myapp-assembly-0.1-deps.jar" \
--files log4j.properties,application.conf,kafka_jaas.conf,app.truststore.jks,remote.keytab,krb5.conf \
--conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=file://log4j.properties -Dlog4j.debug=true -Dconfig.file=application.conf -Djava.security.auth.login.config=kafka_jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file://log4j.properties -Dlog4j.debug=true -Dconfig.file=application.conf -Djava.security.auth.login.config=kafka_jaas.conf" \
--class "NimsConsumerStream" \
$ROOT/bin/myapp-assembly-0.1.jar "arguments"
Yarn cluster mode
spark-submit \
--name "MediaApp" \
--master yarn \
--deploy-mode cluster \
--queue queue-name-exe \
--principal user@DNS.DOMAIN.COM \
--keytab /etc/user.keytab \
--num-executors 10 \
--driver-memory 10g \
--executor-memory 5g \
--executor-cores 5 \
--jars "$ROOT/bin/myappp-assembly-0.1-deps.jar" \
--files ${ROOT}/resources/log4j.properties,${ROOT}/resources/application.conf,${ROOT}/keys/kafka_jaas.conf,${ROOT}/keys/app.truststore.jks,${ROOT}/keys/kafka_cluster.keytab,${ROOT}/keys/krb5.conf \
--conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=./log4j.properties -Dlog4j.debug=true -Dconfig.file=./application.conf -Djava.security.auth.login.config=./kafka_jaas.conf -Djava.security.krb5.conf=./krb5.conf" \
--conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=./log4j.properties -Dlog4j.debug=true -Dconfig.file=./application.conf -Djava.security.auth.login.config=./kafka_jaas.conf -Djava.security.krb5.conf=./krb5.conf" \
--conf "spark.hadoop.fs.hdfs.impl.disable.cache=true" \
--conf "spark.streaming.backpressure.enabled=false" \
--conf "spark.streaming.backpressure.initialRate=30000" \
--conf "spark.streaming.receiver.maxRate=80000" \
--conf "spark.driver.memoryOverhead=5000" \
--conf "spark.executor.memoryOverhead=5000" \
--class "com.kafka.mypackage.myConsumer" \
$ROOT/bin/myapp-assembly-0.1.jar >myappsubmit_stdout_`date +%Y%m%d`.log 2>&1 &
A newer and better approach is Spark Structured Streaming with the Kafka connector, which treats the stream as an unbounded in-memory table built on DataFrames. We will discuss this in another post.