Spark、Spark streaming、Kafka和ES整合

JerryXia 发表于 , 阅读 (34)

前言

业务需求,记录一下实现。

代码

spark 读写es

spark读es的数据

import org.apache.log4j.Loggerimport org.apache.spark.{SparkConf, SparkContext}import org.elasticsearch.spark._object GetES {  private[this] val LOG = Logger.getLogger(getClass().getName());  def main(args: Array[String]) {    val conf = new SparkConf().setAppName("test-es").setMaster("spark://sparkip:7077")      .set("es.index.auto.create", "false")      .set("es.nodes", "esip")    val sc = new SparkContext(conf)    val data = sc.esRDD("seventest/test").collect()    for (tmp <- data) {      LOG.info(tmp)