RDD (Resilient Distributed Dataset): the logical view of a dataset that is physically split into partitions spread across different data nodes.
An RDD can be created in two ways (besides deriving a new RDD from an existing RDD with a transformation):
1. Parallelize a collection
val wordsRDD = sc.parallelize(List("fish", "cat", "dog"))
2. Read data from an external source (S3, HDFS, a stream, etc.)
var linesRDD = sc.textFile("/path/to/README.md")
- with default partitioning, the input is split into blocks and each block becomes one partition of the RDD
- applying a transformation such as .filter(f(x)) filters the data, but the number of partitions stays the same, so some partitions may end up empty
- coalesce(N) reduces the total number of partitions to N
- collect() is an action: it executes the DAG (Directed Acyclic Graph) of transformations and brings the data from all the nodes back to the driver (see the sketch below)
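A minimal sketch of the points above, assuming a running SparkContext sc (values are illustrative):
// create an RDD from a local collection with an explicit number of partitions
val numbers = sc.parallelize(1 to 100, numSlices = 8)
println(numbers.getNumPartitions)      // 8
// a transformation keeps the partition count, even if some partitions end up empty
val evens = numbers.filter(_ % 2 == 0)
println(evens.getNumPartitions)        // still 8
// coalesce(N) shrinks the number of partitions (no shuffle by default)
val compacted = evens.coalesce(2)
println(compacted.getNumPartitions)    // 2
// collect() is an action: it executes the DAG and returns the data to the driver
val result: Array[Int] = compacted.collect()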
Environment variable used to select the Spark major version:
export SPARK_MAJOR_VERSION=2
There are four different ways to run Spark; they are explained below.
The core-count setting tells Spark how many tasks can run concurrently inside each executor JVM on a worker node; these task slots (threads within a single executor JVM) are what Spark calls cores.
1) local mode
static partitioning
./bin/spark-shell --master local[2]
./bin/spark-submit --name "myfirstapp" --master local[2] myjar.jar
when a node has 4 CPUs, consider setting the core count to a small multiple of that, e.g. 4*2 = 8, to get better throughput
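The same idea in Scala, as a hedged sketch (local[8] is an illustrative value, not a recommendation):
import org.apache.spark.{SparkConf, SparkContext}
// local[8]: run driver and executors in one JVM with 8 task slots ("cores"); local[*] uses all CPU threads
val conf = new SparkConf().setAppName("myfirstapp").setMaster("local[8]")
val sc = new SparkContext(conf)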
2) standalone scheduler
. static partitioning
spark-env.sh -- SPARK_LOCAL_DIRS
spark.cores.max - maximum amount of CPU cores to request for the application from across the cluster
spark.executor.memory - Memory for each executor
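A hedged sketch of setting these standalone-mode properties programmatically (the master URL and values are placeholders):
import org.apache.spark.{SparkConf, SparkContext}
val conf = new SparkConf()
  .setAppName("myfirstapp")
  .setMaster("spark://master-host:7077")   // standalone master URL (placeholder)
  .set("spark.cores.max", "8")             // total CPU cores requested across the cluster
  .set("spark.executor.memory", "4g")      // heap memory per executor
val sc = new SparkContext(conf)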
3) yarn
. dynamic partitioning
. two ways to run Spark on YARN
1. client mode (the driver runs on the machine that submitted the job)
2. cluster mode (the driver runs inside a YARN container on the cluster)
--num-executors: controls how many executor JVMs to allocate
--executor-memory: heap memory for each executor
--executor-cores: CPU cores for each executor
dynamic executor allocation
spark.dynamicAllocation.enabled
spark.dynamicAllocation.minExecutors
spark.dynamicAllocation.maxExecutors
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout
spark.dynamicAllocation.schedulerBacklogTimeout
spark.dynamicAllocation.executorIdleTimeout
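A hedged sketch of enabling the dynamic-allocation properties listed above via SparkConf (values are illustrative; dynamic allocation on YARN also needs the external shuffle service):
import org.apache.spark.{SparkConf, SparkContext}
val conf = new SparkConf()
  .setAppName("dynamic-alloc-demo")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")   // lets executors be released without losing shuffle files
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "20")
  .set("spark.dynamicAllocation.schedulerBacklogTimeout", "1s")
  .set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "5s")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
val sc = new SparkContext(conf)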
>>spark-submit --class org.apache.spark.examples.SparkPi --deploy-mode client --master yarn \
/opt/cloud/cdh/jars/examples-cdh.jar 10
>>spark-submit --class org.apache.spark.examples.SparkPi --deploy-mode cluster --master yarn \
/opt/cloud/cdh/jars/examples-cdh.jar 10
4) Mesos
. dynamic partitioning
Mesos plays a cluster-orchestration role similar to YARN, but it is a general-purpose Apache resource manager rather than part of a specific Hadoop distribution.
persist an RDD:
RDD.cache() === RDD.persist(StorageLevel.MEMORY_ONLY)
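A minimal sketch, assuming an existing SparkContext sc:
import org.apache.spark.storage.StorageLevel
val lines = sc.textFile("/path/to/README.md")
// cache() is shorthand for persist(StorageLevel.MEMORY_ONLY)
lines.cache()
// persist() lets you pick another storage level, e.g. spill to disk when memory is short
val words = lines.flatMap(_.split(" ")).persist(StorageLevel.MEMORY_AND_DISK)
words.count()       // the first action materializes and caches the RDD
words.count()       // later actions reuse the cached partitions
words.unpersist()   // release the cached blocks when no longer needed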
default memory allocation in the executor JVM (legacy settings, before unified memory management in Spark 1.6):
60% cached RDDs (spark.storage.memoryFraction)
20% shuffle memory (spark.shuffle.memoryFraction)
20% user program
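A hedged sketch of tuning these legacy fractions (values are illustrative; Spark 1.6+ uses unified memory management via spark.memory.fraction instead):
import org.apache.spark.{SparkConf, SparkContext}
val conf = new SparkConf()
  .setAppName("memory-fractions-demo")
  .set("spark.storage.memoryFraction", "0.6")   // share of the executor heap for cached RDDs
  .set("spark.shuffle.memoryFraction", "0.2")   // share of the executor heap for shuffle buffers
val sc = new SparkContext(conf)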
Registering application classes with the Kryo serializer:
val conf = new SparkConf().setMaster(..).setAppName(..)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
if the serialized objects are large, we may need to increase the
spark.kryoserializer.buffer.mb config property (default: 2 MB)
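A short sketch of raising the buffer (newer Spark versions spell this property spark.kryoserializer.buffer.max with a size suffix; the value is illustrative):
import org.apache.spark.SparkConf
val kryoConf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.max", "128m")   // raise if Kryo reports a buffer overflow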
To measure the GC impact, use the JVM options below:
-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
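A hedged sketch of shipping these options to the executors through spark.executor.extraJavaOptions (the same mechanism used in the spark-submit example further below):
import org.apache.spark.SparkConf
// GC details then show up in the executor logs, so GC time per task can be inspected
val gcConf = new SparkConf()
  .set("spark.executor.extraJavaOptions",
       "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")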
RDDname.toDebugString: shows the RDD's lineage, i.e. the chain of parent RDDs and the stages they form
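For example, reusing the wordsRDD created earlier:
val counts = wordsRDD.map((_, 1)).reduceByKey(_ + _)
println(counts.toDebugString)   // prints the lineage, with indentation marking stage boundaries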
Project Tungsten: Spark's optimized memory-management and code-generation layer, used by the DataFrame and Dataset APIs
spark internals
Broadcast variable: when we want to ship read-only data to every task, it is better to broadcast it; the data is then sent to each node only once and shared by all tasks running in that node's executor JVM
val broadcastVar = sc.broadcast(Array(1,2,3))
broadcastVar.value
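A minimal sketch of using the broadcast value inside a transformation (the lookup table is illustrative):
// ship a small lookup table once per executor instead of once per task
val countryNames = sc.broadcast(Map("US" -> "United States", "IN" -> "India"))
val codes = sc.parallelize(Seq("US", "IN", "US"))
val resolved = codes.map(code => countryNames.value.getOrElse(code, "unknown"))
resolved.collect()   // Array("United States", "India", "United States")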
Accumulators: used to count failed records, sum up events, or for other debugging and monitoring purposes
val accum = sc.accumulator(0)
sc.parallelize(Array(1,2,3,4)).foreach(x => accum += x)
accum.value
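A hedged sketch of the "count failed records" use case mentioned above (names are illustrative; accumulator updates are only guaranteed exactly-once when applied inside actions):
// count records that fail to parse while still producing the parsed output
val badRecords = sc.accumulator(0, "bad records")
val raw = sc.parallelize(Seq("1", "2", "oops", "4"))
val parsed = raw.flatMap { s =>
  try Some(s.toInt)
  catch { case _: NumberFormatException => badRecords += 1; None }
}
parsed.count()             // the action triggers the computation
println(badRecords.value)  // 1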
ways to send large data across the cluster of nodes
old technique: the driver sends the files (e.g. ~20 MB) to the executors over HTTP
new technique: a BitTorrent-like protocol; the driver seeds the data to the executors in chunks, and the executors exchange chunks among themselves to reconstruct the full dataset
spark-submit \
--name "MediaApp" \
--master yarn \
--deploy-mode cluster \
--queue quename-exe \
--principal user@DNS.DOMAIN.COM \
--keytab /etc/user.keytab \
--num-executors 10 \
--driver-memory 10g \
--executor-memory 5g \
--executor-cores 5 \
--jars "$ROOT/bin/myappp-assembly-0.1-deps.jar" \
--files ${ROOT}/resources/log4j.properties,${ROOT}/resources/application.conf,${ROOT}/keys/kafka_jaas.conf,${ROOT}/keys/app.truststore.jks,${ROOT}/keys/kafka_cluster.keytab,${ROOT}/keys/krb5.conf \
--conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=./log4j.properties -Dlog4j.debug=true -Dconfig.file=./application.conf -Djava.security.auth.login.config=./kafka_jaas.conf -Djava.security.krb5.conf=./krb5.conf" \
--conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=./log4j.properties -Dlog4j.debug=true -Dconfig.file=./application.conf -Djava.security.auth.login.config=./kafka_jaas.conf -Djava.security.krb5.conf=./krb5.conf" \
--conf "spark.hadoop.fs.hdfs.impl.disable.cache=true" \
--conf "spark.streaming.backpressure.enabled=false" \
--conf "spark.streaming.backpressure.initialRate=30000" \
--conf "spark.streaming.receiver.maxRate=80000" \
--conf "spark.driver.memoryOverhead=5000" \
--conf "spark.executor.memoryOverhead=5000" \
--class "com.kafka.mypackage.myConsumer" \
$ROOT/bin/myapp-assembly-0.1.jar >myappsubmit_stdout_`date +%Y%m%d`.log 2>&1 &
#log4j.rootLogger=INFO,stdout
log4j.rootLogger=INFO,rolling
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
#log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
log4j.appender.rolling=org.apache.log4j.RollingFileAppender
log4j.appender.rolling.layout=org.apache.log4j.PatternLayout
#log4j.appender.rolling.layout.conversionPattern=[%d] %p %m (%c)%n
log4j.appender.rolling.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{3}:%L - %m%n
log4j.appender.rolling.maxFileSize=10MB
log4j.appender.rolling.maxBackupIndex=50
log4j.appender.rolling.file=${spark.yarn.app.container.log.dir}/myapp-spark-streaming.log
log4j.appender.rolling.encoding=UTF-8
# Turn on all our debugging info
log4j.logger.org.apache.kafka=DEBUG,rolling
log4j.logger.org.apache.spark=DEBUG,rolling
log4j.logger.com.sprint.netscout.kafka.consumer=DEBUG,rolling
log4j.additivity.org.apache.kafka=false
log4j.additivity.org.apache.spark=false
#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG,stdout
log4j.logger.kafka.consumer=TRACE,stdout
log4j.logger.kafka.request.logger=TRACE,rolling
log4j.additivity.kafka.request.logger=false
log4j.logger.kafka.network.Processor=TRACE,rolling
log4j.additivity.kafka.network.Processor=false
#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="./kafkacluster.keytab"
storeKey=true
serviceName="kafka"
principal="kafkauser@DNS.KAFKA.COM";
};
[logging]
default = FILE:/logs/krb5libs.log
[libdefaults]
default_realm = DNS.KAFKA.COM
dns_lookup_realm = false
dns_lookup_kdc = false
ticket_lifetime = 24h
renew_lifetime = 7d
forwardable = true
udp_preference_limit = 1
[realms]
DNS.KAFKA.COM = {
kdc = kdc.server.dns:7088
admin_server = kdc.server.dns
}
[domain_realm]
.dns.applocal.com = DNS.KAFKA.COM
dns.applocal.com = DNS.KAFKA.COM
app.spark {
spark.sql.orc.impl : "native"
spark.executor.heartbeatInterval : 20
spark.network.timeout = 120s
}
app.kafka{
bootstrap.servers : "brk.host.com:9093,brk.host.com:9093"
group.id : "groupname"
topic.name : "topicname"
enable.auto.commit : "false"
//possible values earliest , latest , none
auto.offset.reset : "earliest"
spark.streaming.kafka.consumer.cache.enabled : "false"
//request.timeout.ms > session.timeout.ms
request.timeout.ms : 660000
session.timeout.ms : 600000
}
app.kerberose{
security.protocol : "SASL_SSL"
ssl.enabled : true
java.security.auth.login.config : "./kafka_jaas.conf"
java.security.krb5.conf : "./krb5.conf"
sasl.mechanism : "GSSAPI"
sasl.kerberos.service.name : "kafka"
ssl.truststore.location : "./app.truststore.jks"
ssl.truststore.password : "password"
}
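A hedged sketch of how a consumer like the one submitted above might load this application.conf and create a Kafka direct stream; it assumes Typesafe Config and spark-streaming-kafka-0-10, and the class and value names are illustrative, not the actual application code:
import com.typesafe.config.ConfigFactory
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object MyConsumerSketch {
  def main(args: Array[String]): Unit = {
    // -Dconfig.file=./application.conf (set via extraJavaOptions above) is picked up here
    val appConf  = ConfigFactory.load()
    val kafkaCfg = appConf.getConfig("app.kafka")
    val kerbCfg  = appConf.getConfig("app.kerberose")

    val kafkaParams = Map[String, Object](
      "bootstrap.servers"          -> kafkaCfg.getString("bootstrap.servers"),
      "group.id"                   -> kafkaCfg.getString("group.id"),
      "enable.auto.commit"         -> kafkaCfg.getString("enable.auto.commit"),
      "auto.offset.reset"          -> kafkaCfg.getString("auto.offset.reset"),
      "security.protocol"          -> kerbCfg.getString("security.protocol"),
      "sasl.kerberos.service.name" -> kerbCfg.getString("sasl.kerberos.service.name"),
      "key.deserializer"           -> classOf[StringDeserializer],
      "value.deserializer"         -> classOf[StringDeserializer]
    )

    val sparkConf = new SparkConf().setAppName("MediaApp")
    val ssc = new StreamingContext(sparkConf, Seconds(30))   // batch interval is illustrative

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](
        Seq(kafkaCfg.getString("topic.name")), kafkaParams))

    stream.foreachRDD(rdd => println(s"records in batch: ${rdd.count()}"))

    ssc.start()
    ssc.awaitTermination()
  }
}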