Wednesday, January 10, 2018

Spark basics


RDD (Resilient Distributed Dataset): this can be viewed as a logical representation of a dataset, formed by joining different partitions of data that live on different data nodes.

An RDD can be created in two ways, besides deriving a new RDD from an existing one with a transformation:

1. Parallelize a collection 

val wordsRDD = sc.parallelize(List("fish", "cat", "dog")) 

  

2. Read data from an external source (S3, HDFS, a stream, etc.)

var linesRDD = sc.textFile("/path/to/README.md") 

  

- with default partitioning, each data source is split into multiple blocks and each block is fed to its own partition

- on applying a transformation (e.g. .filter(f(x))), the data is filtered but the number of partitions remains the same, even though some of the partitions may now be empty

- the coalesce(N) function brings the total number of partitions down to N

- collect() is the point at which the DAG (Directed Acyclic Graph) plan is executed to run the transformations defined so far and gather the data from all the nodes back to the driver (see the sketch below)
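
A minimal sketch of the points above, assuming a running spark-shell (which provides the SparkContext sc) and the README path used earlier:

val linesRDD = sc.textFile("/path/to/README.md")      // partition count follows the input splits
println(linesRDD.getNumPartitions)

val filtered = linesRDD.filter(_.contains("Spark"))   // same partition count, some partitions may now be empty
println(filtered.getNumPartitions)

val squeezed = filtered.coalesce(1)                   // bring the partition count down to 1
println(squeezed.getNumPartitions)

squeezed.collect().foreach(println)                   // the DAG executes and the results are gathered on the driver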

  

 

Environment variable used to set the Spark version:

export SPARK_MAJOR_VERSION=2


There are four different ways to run Spark; they are explained below.

The core setting in Spark controls how many tasks can run concurrently inside each executor JVM on the worker nodes; these task slots within a single executor JVM are what Spark calls cores.


1) local mode

. static partitioning

./bin/spark-shell --master local[2] 
./bin/spark-submit --name "myfirstapp" --master local[2] myjar.jar 

  

when a node has 4 CPUs, set the core count to a multiple of the CPU count, for example 4 * 2 = 8, to get better performance


2) standalone scheduler 

. static partitioning 

spark-env.sh -- SPARK_LOCAL_DIRS 

spark.cores.max - maximum number of CPU cores to request for the application from across the cluster

spark.executor.memory - Memory for each executor 
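
A minimal sketch of setting these two properties from the application side; the master URL and values here are assumptions, not part of the original notes:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("spark://master-host:7077")   // hypothetical standalone master URL
  .setAppName("myfirstapp")
  .set("spark.cores.max", "8")             // cap on the total CPU cores requested across the cluster
  .set("spark.executor.memory", "4g")      // memory for each executor
val sc = new SparkContext(conf)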


3) yarn 

. dynamic partitioning 

. two ways to run spark in yarn 

    1. client mode 

    2. cluster mode 

     

    --num-executors: controls how many executor JVMs to allocate 

    --executor-memory: heap memory for each executor 

    --executor-cores: CPU cores for each executor 

     

    dynamic executor allocation 

     spark.dynamicAllocation.enabled 
    spark.dynamicAllocation.minExecutors 
    spark.dynamicAllocation.maxExecutors 
    spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 
    spark.dynamicAllocation.schedulerBacklogTimeout 
    spark.dynamicAllocation.executorIdleTimeout 
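
    A hedged sketch of enabling dynamic executor allocation from a SparkConf; the numeric values and app name are assumptions, and the same keys can equally be passed with --conf on spark-submit:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("dynamic-allocation-demo")                      // hypothetical app name
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.minExecutors", "2")
      .set("spark.dynamicAllocation.maxExecutors", "20")
      .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
      .set("spark.shuffle.service.enabled", "true")               // the external shuffle service is required for dynamic allocation on YARN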

     

    >>spark-submit --class org.apache.spark.examples.SparkPi --deploy-mode client --master yarn 

    /opt/cloud/cdh/jars/examples-cdh.jar 10 

     

    >>spark-submit --class org.apache.spark.examples.SparkPi --deploy-mode cluster --master yarn 

    /opt/cloud/cdh/jars/examples-cdh.jar 10 

     

4) Mesos 

. dynamic partitioning 

Mesos acts as an orchestration layer for the Hadoop platform similar to YARN, but unlike YARN it does not ship as part of the Cloudera distribution.



Persisting an RDD:

RDD.cache() === RDD.persist(StorageLevel.MEMORY_ONLY)
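
A small sketch, assuming the spark-shell's SparkContext sc, showing the equivalence and an alternative storage level:

import org.apache.spark.storage.StorageLevel

val linesRDD = sc.textFile("/path/to/README.md")
linesRDD.cache()                                   // same as linesRDD.persist(StorageLevel.MEMORY_ONLY)

val upper = linesRDD.map(_.toUpperCase)
upper.persist(StorageLevel.MEMORY_AND_DISK)        // spill partitions to disk when memory runs short

println(linesRDD.count())                          // the first action materializes the cache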


Default memory allocation in the executor JVM:

60% cached RDDs (spark.storage.memoryFraction) 

20% shuffle memory (spark.shuffle.memoryFraction) 

20% user program 
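
A sketch of overriding these (legacy, pre-Spark 1.6) fractions from a SparkConf; the values shown simply restate the defaults above, and the app name is a placeholder:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("memory-fractions-demo")             // hypothetical app name
  .set("spark.storage.memoryFraction", "0.6")      // fraction kept for cached RDDs
  .set("spark.shuffle.memoryFraction", "0.2")      // fraction kept for shuffle buffers
// the remaining ~20% of the executor heap is left for user code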



val conf = new SparkConf().setMaster(..).setAppName(..) 
conf.registerKryoClasses(Array(classOf[Myclass1], classOf[Myclass2]))   // takes an Array and also switches spark.serializer to KryoSerializer
val sc = new SparkContext(conf) 

  

If the objects are large, we may need to increase the spark.kryoserializer.buffer.mb config property (the default is 2).

  

 

Setting to measure the GC impact

use the JVM options below:

-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps 
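
A sketch of passing those flags to the executor JVMs through the standard spark.executor.extraJavaOptions property; the GC timings then show up in the executor logs:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.extraJavaOptions",
       "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")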

  

RDDname.toDebugString: used to see the lineage and the different stages of an RDD
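
For example, in the spark-shell (the path and filter here are placeholders):

val filtered = sc.textFile("/path/to/README.md").filter(_.nonEmpty)
println(filtered.toDebugString)     // prints the RDD lineage with its stage/shuffle boundaries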

  

Project Tungsten: used by DataFrame and Dataset 

 

Spark internals 

  

Broadcast variable: when we want to send read-only data to every task, it is better to make that variable a broadcast variable so that it is sent only once to each node and then reused by the different tasks running within that node's executor JVM

  

val broadcastVar = sc.broadcast(Array(1,2,3))   // shipped to each executor only once
broadcastVar.value                              // tasks read the value locally


Accumulators: these are used to count failed records, sum up events, or for other debugging purposes

  

val accum = sc.accumulator(0)                              // driver-defined accumulator (Spark 2.x also offers sc.longAccumulator)
sc.parallelize(Array(1,2,3,4)).foreach(x => accum += x)    // tasks add to it on the executors
accum.value                                                // only the driver can read the result



Ways to send large data across the cluster of nodes:

old technique: send 20MB files to the executors over HTTP 

new technique: use a BitTorrent-style protocol; the driver seeds the data to the executors in chunks, and the executors share the chunks among themselves to reconstruct the full data.

  

  


Off-heap storage for job processing: please read about Tachyon (now Alluxio)

Sample Spark Streaming Kafka application in a Kerberized cluster setup:

1) 
spark-submit \
        --name "MediaApp" \
        --master yarn  \
        --deploy-mode cluster \
        --queue quename-exe \
        --principal user@DNS.DOMAIN.COM \
        --keytab /etc/user.keytab \
        --num-executors 10 \
        --driver-memory 10g \
        --executor-memory 5g \
        --executor-cores 5 \
        --jars "$ROOT/bin/myappp-assembly-0.1-deps.jar" \
        --files ${ROOT}/resources/log4j.properties,${ROOT}/resources/application.conf,${ROOT}/keys/kafka_jaas.conf,${ROOT}/keys/app.truststore.jks,${ROOT}/keys/kafka_cluster.keytab,${ROOT}/keys/krb5.conf \
        --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=./log4j.properties -Dlog4j.debug=true -Dconfig.file=./application.conf -Djava.security.auth.login.config=./kafka_jaas.conf -Djava.security.krb5.conf=./krb5.conf" \
        --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=./log4j.properties -Dlog4j.debug=true -Dconfig.file=./application.conf -Djava.security.auth.login.config=./kafka_jaas.conf -Djava.security.krb5.conf=./krb5.conf" \
        --conf "spark.hadoop.fs.hdfs.impl.disable.cache=true" \
        --conf "spark.streaming.backpressure.enabled=false" \
        --conf "spark.streaming.backpressure.initialRate=30000" \
        --conf "spark.streaming.receiver.maxRate=80000" \
        --conf "spark.driver.memoryOverhead=5000" \
        --conf "spark.executor.memoryOverhead=5000" \
        --class "com.kafka.mypackage.myConsumer" \
        $ROOT/bin/myapp-assembly-0.1.jar >myappsubmit_stdout_`date +%Y%m%d`.log 2>&1 &
2) 
resources/log4j.properties
#log4j.rootLogger=INFO,stdout
log4j.rootLogger=INFO,rolling
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
#log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
log4j.appender.rolling=org.apache.log4j.RollingFileAppender
log4j.appender.rolling.layout=org.apache.log4j.PatternLayout
#log4j.appender.rolling.layout.conversionPattern=[%d] %p %m (%c)%n
log4j.appender.rolling.layout.conversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{3}:%L - %m%n
log4j.appender.rolling.maxFileSize=10MB
log4j.appender.rolling.maxBackupIndex=50
log4j.appender.rolling.file=${spark.yarn.app.container.log.dir}/myapp-spark-streaming.log
log4j.appender.rolling.encoding=UTF-8
# Turn on all our debugging info
log4j.logger.org.apache.kafka=DEBUG,rolling
log4j.logger.org.apache.spark=DEBUG,rolling
log4j.logger.com.sprint.netscout.kafka.consumer=DEBUG,rolling
log4j.additivity.org.apache.kafka=false
log4j.additivity.org.apache.spark=false
#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG,stdout
log4j.logger.kafka.consumer=TRACE,stdout
log4j.logger.kafka.request.logger=TRACE,fileAppender
log4j.additivity.kafka.request.logger=false
log4j.logger.kafka.network.Processor=TRACE,fileAppender
log4j.additivity.kafka.network.Processor=false
#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG

3)
keys/kafka_jaas.conf
KafkaClient{
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="./kafkacluster.keytab"
    storeKey=true
    serviceName="kafka"
    principal="kafkauser@DNS.KAFKA.COM";
    };


4) set up the keys/krb5.conf correctly.
[logging]
 default = FILE:/logs/krb5libs.log
[libdefaults]
 default_realm = DNS.KAFKA.COM
 dns_lookup_realm = false
 dns_lookup_kdc = false
 ticket_lifetime = 24h
 renew_lifetime = 7d
 forwardable = true
 udp_preference_limit = 1
[realms]
 DNS.KAFKA.COM = {
  kdc = kdc.server.dns:7088
  admin_server = kdc.server.dns
 }

[domain_realm]
 .dns.applocal.com = DNS.KAFKA.COM
 dns.applocal.com = DNS.KAFKA.COM


5) resources/application.conf
app.spark{
  spark.sql.orc.impl : "native"
  spark.executor.heartbeatInterval : 20
  spark.network.timeout = 120s
}
app.kafka{
  bootstrap.servers : "brk.host.com:9093,brk.host.com:9093"
  group.id : "groupname"
  topic.name : "topicname"
  enable.auto.commit : "false"
//possible values earliest , latest , none
  auto.offset.reset : "earliest"
  spark.streaming.kafka.consumer.cache.enabled : "false"
//request.timeout.ms  > session.timeout.ms
  request.timeout.ms : 660000
  session.timeout.ms : 600000
}
app.kerberose{
  security.protocol : "SASL_SSL"
  ssl.enabled : true
  java.security.auth.login.config : "./kafka_jaas.conf"
  java.security.krb5.conf : "./krb5.conf"
  sasl.mechanism : "GSSAPI"
  sasl.kerberos.service.name : "kafka"
  ssl.truststore.location : "./app.truststore.jks"
  ssl.truststore.password : "password"
}
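
The --class above points at the author's own consumer, which is not shown here. Below is only a minimal, hypothetical sketch of what such a Spark Streaming direct Kafka consumer could look like, using the spark-streaming-kafka-0-10 API and mirroring the settings from resources/application.conf; the object name, batch interval, and processing logic are assumptions:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object SampleKafkaConsumer {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MediaApp")
    val ssc = new StreamingContext(conf, Seconds(30))            // assumed batch interval

    // Kafka/Kerberos parameters mirroring resources/application.conf and kafka_jaas.conf
    val kafkaParams = Map[String, Object](
      "bootstrap.servers"          -> "brk.host.com:9093",
      "key.deserializer"           -> classOf[StringDeserializer],
      "value.deserializer"         -> classOf[StringDeserializer],
      "group.id"                   -> "groupname",
      "auto.offset.reset"          -> "earliest",
      "enable.auto.commit"         -> (false: java.lang.Boolean),
      "security.protocol"          -> "SASL_SSL",
      "sasl.kerberos.service.name" -> "kafka",
      "ssl.truststore.location"    -> "./app.truststore.jks",
      "ssl.truststore.password"    -> "password"
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("topicname"), kafkaParams))

    // Process each micro-batch, then commit the consumed offsets back to Kafka.
    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreach(record => println(record.value()))             // placeholder processing
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}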
