package com.ibm.sifs.ecomm.util;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

/* loaded from: input_file:com/ibm/sifs/ecomm/util/KafkaUtil.class */
/**
 * Helper that wires Spark to Kafka: it builds the Kafka client parameter map from the
 * Spark session configuration, creates direct input streams for a topic, and writes
 * datasets to a topic through the {@code kafka} data source.
 *
 * <p>An instance created with the no-argument constructor carries no Spark context,
 * Kafka parameters or batch duration; only {@link #publishToKafka} is usable on it.
 */
public class KafkaUtil {
    /** Name of the property that holds the streaming batch duration, in seconds. */
    private static final String DURATION_PROPERTY = "kafkaDuration";

    private SparkContext sparkContext;
    Logger sifsLogger = LogManager.getLogger(KafkaUtil.class);
    private Map<String, Object> kafkaParams;
    private Long duration;

    /**
     * Creates an uninitialized helper: no Spark context, Kafka parameters or duration are set.
     *
     * @throws Exception declared for compatibility with existing callers
     */
    public KafkaUtil() throws Exception {
    }

    /**
     * Creates a helper bound to the given Spark session.
     *
     * @param properties   must contain {@code kafkaDuration}, the batch duration in seconds
     * @param sparkSession session whose Spark context and runtime configuration are used
     * @throws NumberFormatException if {@code kafkaDuration} is missing, blank or not a whole number
     * @throws Exception             if reading the session configuration fails
     */
    public KafkaUtil(Properties properties, SparkSession sparkSession) throws Exception {
        this.sparkContext = sparkSession.sparkContext();
        this.sifsLogger.debug("Initializing KafkaUtil");
        initialize(sparkSession);
        this.duration = parseDuration(properties.getProperty(DURATION_PROPERTY));
    }

    /**
     * Parses the batch duration, reporting the offending property by name on failure.
     *
     * @param rawDuration raw property value, possibly {@code null}
     * @return the parsed duration
     * @throws NumberFormatException if the value is missing, blank or not a valid long
     */
    private static Long parseDuration(String rawDuration) {
        if (rawDuration == null || rawDuration.trim().isEmpty()) {
            throw new NumberFormatException(
                    "Required property '" + DURATION_PROPERTY + "' is missing or empty");
        }
        try {
            return Long.valueOf(rawDuration.trim());
        } catch (NumberFormatException e) {
            // Same exception type as before, but the message now says which property was bad.
            NumberFormatException detailed = new NumberFormatException(
                    "Property '" + DURATION_PROPERTY + "' must be a whole number of seconds but was '"
                            + rawDuration + "'");
            detailed.initCause(e);
            throw detailed;
        }
    }

    /**
     * Builds the Kafka client parameter map from the Spark session configuration.
     *
     * <p>Serializer and deserializer class names are fixed (String keys, byte-array values);
     * connection settings are copied from the session configuration. The SSL settings are
     * copied only when the {@code KafkaSSLEnabled} configuration value parses as {@code true}.
     *
     * @param sparkSession session whose runtime configuration supplies the values
     * @throws Exception if a configuration value cannot be read
     */
    private synchronized void initialize(SparkSession sparkSession) throws Exception {
        // Populate a local map and publish it once complete, so the field never holds a partial map.
        Map<String, Object> params = new HashMap<>();
        copyFromConf(sparkSession, params, "bootstrap.servers");
        params.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        params.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        copyFromConf(sparkSession, params, "group.id");
        copyFromConf(sparkSession, params, "auto.offset.reset");
        params.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        params.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        copyFromConf(sparkSession, params, "max.partition.fetch.bytes");
        if (Boolean.parseBoolean(sparkSession.conf().get("KafkaSSLEnabled"))) {
            // NOTE(review): publishToKafka passes the SSL passwords through Utility.decode, while
            // here they are copied verbatim - confirm the session configuration holds decoded values.
            copyFromConf(sparkSession, params, "security.protocol");
            copyFromConf(sparkSession, params, "ssl.truststore.location");
            copyFromConf(sparkSession, params, "ssl.truststore.password");
            copyFromConf(sparkSession, params, "ssl.keystore.location");
            copyFromConf(sparkSession, params, "ssl.keystore.password");
            copyFromConf(sparkSession, params, "ssl.key.password");
        }
        this.kafkaParams = params;
    }

    /** Copies one session configuration value into {@code target} under the same key. */
    private static void copyFromConf(SparkSession sparkSession, Map<String, Object> target, String key) {
        target.put(key, sparkSession.conf().get(key));
    }

    /**
     * Creates a direct Kafka input stream subscribed to a single topic.
     *
     * <p>Every call wraps the Spark context in a new {@code StreamingContext} whose batch
     * interval is the configured duration in seconds.
     * NOTE(review): the new streaming context is not stored or returned separately;
     * presumably callers obtain it from the returned stream in order to start it - confirm.
     *
     * @param str name of the topic to subscribe to
     * @return input stream of records with String keys and byte-array values
     * @throws IllegalStateException if this instance was created with the no-argument constructor
     * @throws Exception             if the stream cannot be created
     */
    public JavaInputDStream<ConsumerRecord<String, byte[]>> getInputStream(String str) throws Exception {
        if (this.sparkContext == null || this.kafkaParams == null || this.duration == null) {
            // Previously this surfaced as a bare NullPointerException from duration.longValue().
            throw new IllegalStateException(
                    "KafkaUtil is not initialized; construct it with KafkaUtil(Properties, SparkSession)"
                            + " before requesting an input stream");
        }
        StreamingContext streamingContext =
                new StreamingContext(this.sparkContext, Durations.seconds(this.duration.longValue()));
        return KafkaUtils.createDirectStream(
                new JavaStreamingContext(streamingContext),
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(Arrays.asList(str), this.kafkaParams));
    }

    /**
     * Writes a dataset to a Kafka topic using the {@code kafka} data source.
     *
     * <p>Connection, SSL and request-size options are taken from {@code properties}; the three
     * SSL passwords are passed through {@code Utility.decode} before being handed to the writer.
     * Failures are logged at ERROR level and not rethrown, so this call is best-effort.
     *
     * @param dataset    rows to publish
     * @param str        destination topic
     * @param properties source of {@code bootstrap.servers}, {@code security.protocol},
     *                   the {@code ssl.*} settings and {@code max.request.size}
     */
    public void publishToKafka(Dataset<Row> dataset, String str, Properties properties) {
        try {
            dataset.write()
                    .format("kafka")
                    .option("kafka.bootstrap.servers", properties.getProperty("bootstrap.servers"))
                    .option("topic", str)
                    .option("kafka.security.protocol", properties.getProperty("security.protocol"))
                    .option("kafka.ssl.truststore.location", properties.getProperty("ssl.truststore.location"))
                    .option("kafka.ssl.truststore.password", Utility.decode(properties.getProperty("ssl.truststore.password")))
                    .option("kafka.ssl.keystore.location", properties.getProperty("ssl.keystore.location"))
                    .option("kafka.ssl.keystore.password", Utility.decode(properties.getProperty("ssl.keystore.password")))
                    .option("kafka.ssl.key.password", Utility.decode(properties.getProperty("ssl.key.password")))
                    .option("kafka.max.request.size", properties.getProperty("max.request.size"))
                    .save();
            // count() is evaluated separately from the write and is needed only for this message,
            // so skip it entirely when INFO logging is disabled.
            if (this.sifsLogger.isInfoEnabled()) {
                this.sifsLogger.info("Records published to Kafka are " + dataset.count());
            }
        } catch (Exception e) {
            this.sifsLogger.error("Error while publishing to kafka ", e);
        }
    }
}
