Spark、Spark streaming、Kafka和ES整合
前言
业务需求,记录一下实现。
代码
spark 读写es
spark读es的数据
import org.apache.log4j.Loggerimport org.apache.spark.{SparkConf, SparkContext}import org.elasticsearch.spark._object GetES { private[this] val LOG = Logger.getLogger(getClass().getName()); def main(args: Array[String]) { val conf = new SparkConf().setAppName("test-es").setMaster("spark://sparkip:7077") .set("es.index.auto.create", "false") .set("es.nodes", "esip") val sc = new SparkContext(conf) val data = sc.esRDD("seventest/test").collect() for (tmp <- data) { LOG.info(tmp)