package com.ibm.sifs.ecomm.util;

import com.ibm.fci.jwt.FciJwt;
import com.ibm.sifs.act.util.XMLParser;
import com.ibm.sifs.common.persistence.EvidencePersistence;
import com.ibm.sifs.ecomm.ECommConstants;
import com.ibm.sifs.ecomm.pojo.Party;
import com.ibm.sifs.ecomm.pojo.RiskIndicator;
import com.ibm.sifs.services.client.SIFSRestClient;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.json.JSONArray;
import org.json.JSONObject;

/* loaded from: input_file:com/ibm/sifs/ecomm/util/SparkUtil.class */
public class SparkUtil {
    // Shared log4j logger for this class. It is package-visible (no access modifier); the anonymous
    // Spark function inside persistsRiskEvidence refers to it as SparkUtil.sifsLogger.
    // NOTE(review): 'transient' has no effect on a static field — presumably a leftover; confirm before removing.
    static transient Logger sifsLogger = LogManager.getLogger(SparkUtil.class);

    /**
     * Derives the Spark SQL schema for the {@link RiskIndicator} bean.
     *
     * @return the schema reported by the bean encoder of {@code RiskIndicator}
     */
    public static StructType getRiskIndicatorStructType() {
        final StructType beanSchema = Encoders.bean(RiskIndicator.class).schema();
        return beanSchema;
    }

    /**
     * Builds the credential properties passed to Spark's JDBC reader.
     *
     * <p>The {@code user} entry is copied from {@code db2user}; the {@code password} entry is the
     * result of running {@code db2password} through {@link Utility#decode}.
     *
     * @param properties application configuration holding {@code db2user} and {@code db2password}
     * @return a new {@link Properties} instance containing only {@code user} and {@code password}
     */
    public static Properties getDBProperties(Properties properties) {
        final String dbUser = properties.getProperty("db2user");
        final String encodedPassword = properties.getProperty("db2password");

        Properties jdbcCredentials = new Properties();
        jdbcCredentials.put("user", dbUser);
        jdbcCredentials.put("password", Utility.decode(encodedPassword));
        return jdbcCredentials;
    }

    /**
     * Copies every string property into the runtime configuration of the given Spark session.
     *
     * <p>Values whose key contains the text {@code password} are passed through
     * {@link Utility#decode} before being set; all other values are set unchanged.
     *
     * @param sparkSession session whose runtime configuration is updated
     * @param properties   properties to copy; when {@code null} nothing happens
     */
    public static void createSparkProperties(SparkSession sparkSession, Properties properties) {
        if (properties == null) {
            return;
        }
        for (String key : properties.stringPropertyNames()) {
            String rawValue = properties.getProperty(key);
            if (!key.contains("password")) {
                sparkSession.conf().set(key, rawValue);
                continue;
            }
            // Password-like entries are stored encoded in the configuration.
            sparkSession.conf().set(key, Utility.decode(rawValue));
        }
    }

    /**
     * Fetches the entity types and returns the published ones as a JSON array string.
     *
     * <p>Each element of the returned array carries the keys {@code entityName},
     * {@code entityColorCode}, {@code entityTypeCategory}, {@code entityTypeId} (as text) and
     * {@code dictPath}. Any exception is logged and swallowed; whatever was collected up to that
     * point (possibly nothing) is still returned.
     *
     * @param sparkSession session used by {@link #getEnityTypes} to obtain the JWT token
     * @param properties   application configuration forwarded to {@link #getEnityTypes}
     * @return the serialized JSON array; never {@code null}
     */
    public static String getEntities(SparkSession sparkSession, Properties properties) {
        JSONArray publishedEntities = new JSONArray();
        try {
            String response = getEnityTypes(sparkSession, properties);
            JSONArray models = null;
            if (response != null) {
                models = new JSONObject(response).optJSONArray("models");
            }
            int modelCount = (models == null) ? 0 : models.length();
            for (int idx = 0; idx < modelCount; idx++) {
                JSONObject model = models.getJSONObject(idx);
                // Only models whose status is PUBLISHED are exposed.
                if (!ECommConstants.PUBLISHED.equalsIgnoreCase(model.optString("status"))) {
                    continue;
                }
                long typeId = model.optLong(ECommConstants.SERVICE_ENTITY_TYPE_ID);
                String typeName = model.optString(ECommConstants.SERVICE_ENTITY_TYPE_NAME);
                String colorCode = model.optString(ECommConstants.SERVICE_ENTITY_TYPE_COLOR);
                String category = model.optString(ECommConstants.SERVICE_ENTITY_TYPE_CATEGORY);
                String lexiconLocation = model.optString(ECommConstants.SERVICE_LEXICON_LOCATION);

                JSONObject entity = new JSONObject();
                entity.put("entityName", typeName);
                entity.put("entityColorCode", colorCode);
                entity.put("entityTypeCategory", category);
                entity.put("entityTypeId", typeId + ECommConstants.EMPTY_STRING);
                entity.put("dictPath", lexiconLocation);
                publishedEntities.put(entity);
            }
        } catch (Exception e) {
            sifsLogger.error("Error while fetching entities ", e);
        }
        return publishedEntities.toString();
    }

    /**
     * Loads all active parties over JDBC and groups them by communication type.
     *
     * <p>The result maps each of the five communication-type constants (email, chat, phone,
     * IP address, login name) to a map from primary contact point to {@link Party}. Rows whose
     * {@code COMM_TYPE} matches none of the constants (case-insensitively) are ignored. The whole
     * query result is collected on the driver.
     *
     * @param sparkSession session used to run the JDBC read
     * @param properties   configuration providing {@code db2jdbcurl} and the DB credentials
     * @return communication type to (contact point to party) lookup
     * @throws Exception if reading from the database fails
     */
    public static Map<String, Object> loadInitialData(SparkSession sparkSession, Properties properties) throws Exception {
        final String partyQuery = "(SELECT A.PARTY_ID,A.PARTY_FIRST_NAME,A.PARTY_LAST_NAME,B.PRIMARY_CONTACT_POINT, B.ALTERNATE_CONTACT_POINT, C.COMM_TYPE as COMM_TYPE, D.PARTY_JOB_ROLE_NAME as PARTY_JOB_ROLE_NAME FROM PARTY_MASTER A, PARTY_CONTACT_POINT B, COMM_TYPE_MASTER C, PARTY_JOB_ROLE_MASTER D WHERE A.PARTY_ID = B.PARTY_ID AND B.COMM_TYPE_ID = C.COMM_TYPE_ID AND A.PARTY_JOB_ROLE_ID = D.PARTY_JOB_ROLE_ID AND A.PARTY_STATUS = 'active')";
        Dataset<Row> partyRows = sparkSession.read().jdbc(properties.getProperty("db2jdbcurl"), partyQuery, getDBProperties(properties));

        // Bucket order matters: the first matching type wins for a row.
        final String[] commTypes = {
            ECommConstants.EMAIL_COMM_TYPE,
            ECommConstants.CHAT_COMM_TYPE,
            ECommConstants.PHONE_COMM_TYPE,
            ECommConstants.IPADDRESS_COMM_TYPE,
            ECommConstants.LOGINNAME_COMM_TYPE
        };
        final List<Map<String, Party>> partiesPerType = new ArrayList<Map<String, Party>>();
        for (int slot = 0; slot < commTypes.length; slot++) {
            partiesPerType.add(new HashMap<String, Party>());
        }

        for (Row row : partyRows.collectAsList()) {
            Party party = new Party();
            party.setPartyId((Long) row.getAs(ECommConstants.DB_PARTY_ID));
            party.setFirstName((String) row.getAs("PARTY_FIRST_NAME"));
            party.setLastName((String) row.getAs("PARTY_LAST_NAME"));
            party.setPrimaryContactPoint((String) row.getAs("PRIMARY_CONTACT_POINT"));
            party.setAlternateContactPoint((String) row.getAs("ALTERNATE_CONTACT_POINT"));
            party.setCommmType((String) row.getAs("COMM_TYPE"));
            party.setJobRole((String) row.getAs("PARTY_JOB_ROLE_NAME"));

            String rowCommType = (String) row.getAs("COMM_TYPE");
            for (int slot = 0; slot < commTypes.length; slot++) {
                if (commTypes[slot].equalsIgnoreCase(rowCommType)) {
                    partiesPerType.get(slot).put(party.getPrimaryContactPoint(), party);
                    break;
                }
            }
        }

        Map<String, Object> lookup = new HashMap<String, Object>();
        for (int slot = 0; slot < commTypes.length; slot++) {
            lookup.put(commTypes[slot], partiesPerType.get(slot));
        }
        return lookup;
    }

    /**
     * Collects the names of all known features into a map whose values are always {@code 1}.
     *
     * <p>A fresh JWT token is generated first. Feature names are then read from
     * {@code FEATURE_MASTER} over JDBC and merged with the {@code entityName} of every entity
     * returned by {@link #getEntities}.
     *
     * @param sparkSession session used for the JDBC read and the token view
     * @param properties   configuration providing {@code db2jdbcurl}, DB credentials and service settings
     * @return feature or entity name mapped to {@code 1}
     * @throws Exception if token generation, the JDBC read or JSON parsing fails
     */
    public static Map<String, Integer> loadFeaturesData(SparkSession sparkSession, Properties properties) throws Exception {
        generateJWTToken(sparkSession, properties);

        Map<String, Integer> featureFlags = new HashMap<String, Integer>();

        Dataset<Row> featureRows = sparkSession.read().jdbc(properties.getProperty("db2jdbcurl"), "(SELECT A.FEATURE_NAME AS FEATURE_NAME FROM FEATURE_MASTER A)", getDBProperties(properties));
        for (Row featureRow : featureRows.collectAsList()) {
            featureFlags.put(featureRow.<String>getAs(ECommConstants.DB_FEATURE), 1);
        }

        String entitiesJson = getEntities(sparkSession, properties);
        if (entitiesJson != null) {
            JSONArray entities = new JSONArray(entitiesJson);
            int entityCount = entities.length();
            for (int idx = 0; idx < entityCount; idx++) {
                String entityName = entities.getJSONObject(idx).getString("entityName");
                featureFlags.put(entityName, 1);
            }
        }
        return featureFlags;
    }

    /**
     * Prepares the views used for risk scoring.
     *
     * <p>Steps, in order: generate the JWT token, load the risk indicators, register the enterprise
     * profile as temp view {@code DB_ENTERPRISE_PROFILE} and join it with
     * {@code global_temp.RISK_INDICATOR} into global temp view {@code ENTERPRISE_PROFILE}; then do
     * the same for the party profile ({@code DB_PARTY_PROFILE} into {@code PARTY_PROFILE}).
     *
     * @param sparkSession session in which the views are registered
     * @param properties   application configuration forwarded to the loaders
     * @throws Exception if any of the loading steps fails
     */
    public static void loadMasterData(SparkSession sparkSession, Properties properties) throws Exception {
        generateJWTToken(sparkSession, properties);
        getRiskIndicators(sparkSession, properties);

        // Enterprise-wide profile keyed by risk indicator lookup code.
        DataUtil.getEnterpriseProfile(sparkSession, properties).createOrReplaceTempView("DB_ENTERPRISE_PROFILE");
        final String enterpriseJoin = "SELECT A.RISK_INDICATOR_LOOKUP_CODE,B.MEAN,B.STD FROM global_temp.RISK_INDICATOR A, DB_ENTERPRISE_PROFILE B WHERE A.RISK_INDICATOR_ID = B.RISK_INDICATOR_ID";
        Dataset<Row> enterpriseProfile = sparkSession.sql(enterpriseJoin);
        enterpriseProfile.createGlobalTempView("ENTERPRISE_PROFILE");

        // Per-party profile keyed by risk indicator lookup code and party id.
        Dataset<Row> partyProfileDb = DataUtil.getPartyProfile(sparkSession, properties);
        sifsLogger.info("partyProfileDBData is ");
        partyProfileDb.createOrReplaceTempView("DB_PARTY_PROFILE");
        final String partyJoin = "SELECT A.RISK_INDICATOR_LOOKUP_CODE,B.PARTY_ID,B.MEAN,B.STD FROM global_temp.RISK_INDICATOR A, DB_PARTY_PROFILE B WHERE A.RISK_INDICATOR_ID = B.RISK_INDICATOR_ID ORDER BY B.PARTY_ID";
        Dataset<Row> partyProfile = sparkSession.sql(partyJoin);
        partyProfile.createGlobalTempView("PARTY_PROFILE");
    }

    /**
     * Sends the risk-indicator evidence of every row to the evidence persistence service.
     *
     * <p>Columns whose name contains the risk-indicator marker but neither the score nor the count
     * marker are treated as evidence columns. For each row the non-empty values of those columns are
     * parsed as JSON objects, wrapped as {@code {"riskIndicators": [...]}} and handed to
     * {@link EvidencePersistence#persistRiskEvidence}. Rows without any evidence are skipped.
     * Any exception is logged and swallowed.
     *
     * @param dataset    rows carrying the risk-indicator JSON columns
     * @param properties configuration providing {@code CreateEvidenceREST}
     */
    public static void persistsRiskEvidence(Dataset<Row> dataset, final Properties properties) {
        try {
            sifsLogger.info("Persisting Risk Evidence!!");
            final String jwtToken = getJWTToken(dataset.sparkSession());

            final List<String> evidenceColumns = new ArrayList<String>();
            for (String column : dataset.columns()) {
                if (!column.contains(ECommConstants.RISK_INDICATOR_NAME)) {
                    continue;
                }
                if (column.contains(ECommConstants.SCORE_NAME) || column.contains(ECommConstants.COUNT_NAME)) {
                    continue;
                }
                evidenceColumns.add(column);
            }

            dataset.foreach(new ForeachFunction<Row>() {
                @Override
                public void call(Row row) throws Exception {
                    JSONArray indicators = new JSONArray();
                    for (String column : evidenceColumns) {
                        String evidenceJson = (String) row.getAs(column);
                        if (Utility.isNotEmpty(evidenceJson)) {
                            indicators.put(new JSONObject(evidenceJson));
                        }
                    }
                    if (indicators.length() <= 0) {
                        return;
                    }

                    JSONObject envelope = new JSONObject();
                    envelope.put("riskIndicators", indicators);
                    String payload = envelope.toString();
                    SparkUtil.sifsLogger.debug("Risk JSON == " + payload);

                    // Raw map kept on purpose: the constructor signature of EvidencePersistence is not visible here.
                    HashMap settings = new HashMap();
                    settings.put("CreateEvidenceREST", properties.getProperty("CreateEvidenceREST"));
                    settings.put(ECommConstants.JWT_TOKEN, jwtToken);
                    new EvidencePersistence(settings).persistRiskEvidence(payload);
                }
            });
            sifsLogger.info("Persisted Risk Evidence!!");
        } catch (Exception e) {
            sifsLogger.error("Error while persisting risk evidence", e);
        }
    }

    /**
     * Looks up id and description of a risk indicator by its lookup code.
     *
     * <p>The lookup code is concatenated into the SQL text as-is, so it must not contain a single quote.
     *
     * @param sparkSession session holding the {@code global_temp.RISK_INDICATOR} view
     * @param str          risk indicator lookup code
     * @return a two-element list: the indicator id rendered as text, then its description
     */
    public static List<String> getRiskIndDetails(SparkSession sparkSession, String str) {
        String query = "SELECT RISK_INDICATOR_ID,RISK_INDICATOR_DESCRIPTION FROM global_temp.RISK_INDICATOR WHERE RISK_INDICATOR_LOOKUP_CODE = '" + str + "'";
        Row match = sparkSession.sql(query).first();

        List<String> details = new ArrayList<String>();
        details.add(Long.toString(match.getLong(0)));
        details.add(match.getString(1));
        return details;
    }

    /**
     * Reads the enterprise-wide mean and standard deviation for a risk indicator.
     *
     * <p>The first matching row is fetched exactly once and both columns are read from it, so the
     * underlying Spark query is executed a single time.
     *
     * <p>The lookup code is concatenated into the SQL text as-is, so it must not contain a single quote.
     *
     * @param sparkSession session holding the {@code global_temp.ENTERPRISE_PROFILE} view
     * @param str          risk indicator lookup code
     * @return a two-element array: {@code MEAN} at index 0 and {@code STD} at index 1
     */
    public static Double[] getEnterpriseProfile(SparkSession sparkSession, String str) {
        sifsLogger.info("getEnterpriseProfile rdcode " + str);
        String query = "SELECT MEAN,STD FROM global_temp.ENTERPRISE_PROFILE WHERE RISK_INDICATOR_LOOKUP_CODE = '" + str + "'";
        // first() is a Spark action; call it once and reuse the row.
        // NOTE(review): presumably throws when no row matches, as before — confirm against callers.
        Row profile = sparkSession.sql(query).first();
        return new Double[]{Double.valueOf(profile.getDouble(0)), Double.valueOf(profile.getDouble(1))};
    }

    /**
     * Builds a per-party lookup of mean and standard deviation for a risk indicator.
     *
     * <p>All matching rows of {@code global_temp.PARTY_PROFILE} are collected on the driver. Each
     * party id maps to a small map with the keys {@code ECommConstants.DB_MEAN} and
     * {@code ECommConstants.DB_STD}.
     *
     * <p>The lookup code is concatenated into the SQL text as-is, so it must not contain a single quote.
     *
     * @param sparkSession session holding the {@code global_temp.PARTY_PROFILE} view
     * @param str          risk indicator lookup code
     * @return party id to statistics map; empty when no row matches
     */
    public static Map<Long, Object> getProfileMap(SparkSession sparkSession, String str) {
        String query = "SELECT PARTY_ID,MEAN,STD FROM global_temp.PARTY_PROFILE WHERE RISK_INDICATOR_LOOKUP_CODE = '" + str + "'";
        List<Row> profileRows = sparkSession.sql(query).collectAsList();

        Map<Long, Object> profilesByParty = new HashMap<Long, Object>();
        for (Row profileRow : profileRows) {
            Long partyId = profileRow.getAs(ECommConstants.DB_PARTY_ID);
            Double mean = profileRow.getAs(ECommConstants.DB_MEAN);
            Double std = profileRow.getAs(ECommConstants.DB_STD);

            Map<String, Double> stats = new HashMap<String, Double>();
            stats.put(ECommConstants.DB_MEAN, mean);
            stats.put(ECommConstants.DB_STD, std);
            profilesByParty.put(partyId, stats);
        }
        return profilesByParty;
    }

    /**
     * Reads communication files as text and turns each line into a JSON-backed dataset.
     *
     * <p>Depending on the message type each line is converted with
     * {@link XMLParser#parseEmailData}, {@link XMLParser#parseChatData}, or used unchanged, and the
     * resulting strings are parsed by Spark's JSON reader. For any other message type, or when an
     * exception occurs (it is logged and swallowed), {@code null} is returned.
     *
     * @param sparkSession session used for reading
     * @param str          path handed to the text reader
     * @param str2         message type, compared case-insensitively with the email, chat and comm constants
     * @return the parsed dataset, or {@code null} as described above
     */
    public static Dataset<Row> parseCommunicationFromHDFS(SparkSession sparkSession, String str, String str2) {
        Dataset<Row> parsed = null;
        try {
            long readStart = System.nanoTime();
            Dataset<Row> rawLines = sparkSession.read().option("header", "false").text(str);
            long readEnd = System.nanoTime();
            JavaRDD<Row> lineRdd = rawLines.toJavaRDD();
            long parseStart = System.nanoTime();

            // Pick the per-line converter for the message type; null means "unsupported type".
            Function<Row, String> toJson = null;
            if (ECommConstants.EMAIL_MESSAGE_TYPE.equalsIgnoreCase(str2)) {
                toJson = new Function<Row, String>() {
                    @Override
                    public String call(Row row) throws Exception {
                        return XMLParser.parseEmailData((String) row.getAs(ECommConstants.SERVICE_KEY_VALUE));
                    }
                };
            } else if (ECommConstants.CHAT_MESSAGE_TYPE.equalsIgnoreCase(str2)) {
                toJson = new Function<Row, String>() {
                    @Override
                    public String call(Row row) throws Exception {
                        return XMLParser.parseChatData((String) row.getAs(ECommConstants.SERVICE_KEY_VALUE));
                    }
                };
            } else if (ECommConstants.COMM_MESSAGE_TYPE.equalsIgnoreCase(str2)) {
                toJson = new Function<Row, String>() {
                    @Override
                    public String call(Row row) throws Exception {
                        return (String) row.getAs(ECommConstants.SERVICE_KEY_VALUE);
                    }
                };
            }
            if (toJson != null) {
                parsed = sparkSession.read().json(lineRdd.map(toJson));
            }

            long parseEnd = System.nanoTime();
            sifsLogger.info("Total Time to Read from HDFS " + (readEnd - readStart) + " ns");
            sifsLogger.info("Total Time to Parse data " + (parseEnd - parseStart) + " ns");
        } catch (Exception e) {
            sifsLogger.error("Error in parseCommunicationFromHDFS ", e);
        }
        return parsed;
    }

    /**
     * Writes communications together with their extracted features as CSV.
     *
     * <p>Every column whose name contains {@code ECommConstants.FEATURE} is treated as a feature
     * column. For each row the non-empty feature values are collected into
     * {@code {"features": [{name, value}, ...]}} and written next to the communication id, source id
     * and text. Output is CSV with a header row, written in {@code overwrite} mode.
     *
     * <p>Failures are logged and swallowed. The error handler also logs the number of rows in
     * {@code dataset}; counting is itself guarded so that a second failure (for example a
     * {@code null} dataset or a failing Spark job) cannot escape from this method.
     *
     * @param dataset communications with feature columns
     * @param str     output path handed to the CSV writer
     */
    public static void persistCommWithFeaturestoHDFS(Dataset<Row> dataset, String str) {
        try {
            final List<String> featureColumns = new ArrayList<String>();
            for (String column : dataset.columns()) {
                if (column.contains(ECommConstants.FEATURE)) {
                    featureColumns.add(column);
                }
            }

            JavaRDD<Row> outputRows = dataset.toJavaRDD().map(new Function<Row, Row>() {
                @Override
                public Row call(Row row) throws Exception {
                    JSONArray features = new JSONArray();
                    for (String column : featureColumns) {
                        String featureValue = (String) row.getAs(column);
                        if (featureValue != null && featureValue.length() > 0) {
                            JSONObject feature = new JSONObject();
                            feature.put(ECommConstants.SERVICE_KEY_NAME, column);
                            feature.put(ECommConstants.SERVICE_KEY_VALUE, featureValue);
                            features.put(feature);
                        }
                    }
                    JSONObject envelope = new JSONObject();
                    envelope.put("features", features);
                    return RowFactory.create(new Object[]{row.getAs(ECommConstants.COMM_ID), row.getAs(ECommConstants.COMM_SOURCE_ID), row.getAs(ECommConstants.COMM_TEXT), envelope.toString()});
                }
            });

            List<StructField> outputFields = new ArrayList<StructField>();
            outputFields.add(DataTypes.createStructField(ECommConstants.COMM_ID, DataTypes.StringType, true));
            outputFields.add(DataTypes.createStructField(ECommConstants.COMM_SOURCE_ID, DataTypes.StringType, true));
            outputFields.add(DataTypes.createStructField(ECommConstants.COMM_TEXT, DataTypes.StringType, true));
            outputFields.add(DataTypes.createStructField("features", DataTypes.StringType, true));

            dataset.sparkSession().createDataFrame(outputRows, DataTypes.createStructType(outputFields)).write().mode("overwrite").option("header", "true").csv(str);
            sifsLogger.info("Data persisted to HDFS!!");
        } catch (Exception e) {
            sifsLogger.error("Error while persisting to HDFS ", e);
            // Counting re-runs the dataset and can fail for the same reason the write did;
            // never let that second failure propagate out of the error handler.
            String affectedRows;
            try {
                affectedRows = String.valueOf(dataset.count());
            } catch (Exception countFailure) {
                affectedRows = "unknown";
                sifsLogger.error("Unable to count rows after failed HDFS persistence", countFailure);
            }
            sifsLogger.error("CDPJ0004E  HDFS Persistence " + affectedRows);
        }
    }

    /**
     * Retrieves the published NLC models from the NLC service.
     *
     * <p>The service at {@code nlcServiceUrl} is called with GET and an empty payload. From the
     * {@code models} array of the response, every entry whose {@code status} equals
     * {@code ECommConstants.STATUS_PUBLISHED} (case-insensitively) contributes one id-to-name mapping.
     * A {@code null} response, or a response without a {@code models} array, yields an empty map.
     *
     * @param properties configuration providing {@code nlcServiceUrl}
     * @return model id to model name for all published models; never {@code null}
     */
    public static Map<String, String> getAllNLCModels(Properties properties) {
        String response = Utility.invokeRESTService(properties.getProperty("nlcServiceUrl"), ECommConstants.EMPTY_STRING, ECommConstants.METHOD_GET);
        Map<String, String> publishedModels = new HashMap<String, String>();
        if (response == null) {
            return publishedModels;
        }

        JSONArray models = new JSONObject(response).optJSONArray("models");
        if (models == null) {
            // optJSONArray returns null when "models" is absent or not an array.
            return publishedModels;
        }

        for (int idx = 0; idx < models.length(); idx++) {
            JSONObject model = models.getJSONObject(idx);
            if (ECommConstants.STATUS_PUBLISHED.equalsIgnoreCase(model.getString("status"))) {
                publishedModels.put(model.getString(ECommConstants.SERVICE_KEY_ID), model.getString(ECommConstants.SERVICE_KEY_NAME));
            }
        }
        return publishedModels;
    }

    /**
     * Reads the JWT token from the {@code global_temp.SIFS_SECURITY} view.
     *
     * @param sparkSession session in which the view was registered
     * @return the value of the {@code JWT_TOKEN} column of the first row
     */
    public static String getJWTToken(SparkSession sparkSession) {
        Dataset<Row> security = sparkSession.sql("SELECT JWT_TOKEN FROM global_temp.SIFS_SECURITY");
        Row tokenRow = security.first();
        return tokenRow.getString(0);
    }

    /**
     * Fetches the non-derived risk indicators from the RMS service and registers them as a global temp view.
     *
     * <p>The response object under {@code ECommConstants.RMS_RD_SOURCE} is expected to map names to
     * arrays of indicator objects. Each indicator becomes one row with id, lookup code, name,
     * description and its {@code properties} array serialized as text (empty string when absent).
     * Members that are not arrays and array elements that are not objects are skipped instead of
     * causing a {@code NullPointerException}.
     *
     * <p>When at least one row was built, the rows are registered as global temp view
     * {@code ECommConstants.RISK_INDICATOR} and printed with {@code show()}.
     * NOTE(review): the view is created without dropping an existing one first (unlike
     * {@code generateJWTToken}); presumably a second call in the same application fails — confirm.
     *
     * @param sparkSession session used to read the JWT token and register the view
     * @param properties   configuration providing the RMS host URL and URI
     * @throws Exception if the service call, JSON parsing or view creation fails
     */
    public static void getRiskIndicators(SparkSession sparkSession, Properties properties) throws Exception {
        String response = SIFSRestClient.invokeGET(getJWTToken(sparkSession), properties.getProperty(ECommConstants.RMS_HOST_URL) + properties.getProperty(ECommConstants.RMS_ALL_NONDERIVED_URI), (Map) null);
        sifsLogger.info("RD response " + response);

        List<Row> indicatorRows = new ArrayList<Row>();
        if (response != null) {
            JSONObject source = new JSONObject(response).getJSONObject(ECommConstants.RMS_RD_SOURCE);
            JSONArray groupNames = source.names();
            int groupCount = (groupNames == null) ? 0 : groupNames.length();
            for (int g = 0; g < groupCount; g++) {
                JSONArray indicators = source.optJSONArray((String) groupNames.get(g));
                if (indicators == null) {
                    // Member is not an array; nothing to read from it.
                    continue;
                }
                for (int i = 0; i < indicators.length(); i++) {
                    JSONObject indicator = indicators.optJSONObject(i);
                    if (indicator == null) {
                        // Element is not a JSON object; skip it.
                        continue;
                    }
                    String propertiesJson = ECommConstants.EMPTY_STRING;
                    JSONArray indicatorProperties = indicator.optJSONArray("properties");
                    if (indicatorProperties != null) {
                        propertiesJson = indicatorProperties.toString();
                    }
                    indicatorRows.add(RowFactory.create(new Object[]{Long.valueOf(indicator.optLong(ECommConstants.SERVICE_KEY_ID)), indicator.optString("lookupcode"), indicator.optString(ECommConstants.SERVICE_KEY_NAME), indicator.optString("desc"), propertiesJson}));
                }
            }
        }

        if (indicatorRows.isEmpty()) {
            return;
        }

        List<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField(ECommConstants.DB_RISK_INDICATOR_ID, DataTypes.LongType, true));
        fields.add(DataTypes.createStructField(ECommConstants.DB_RISK_INDICATOR_LOOKUP_CODE, DataTypes.StringType, true));
        fields.add(DataTypes.createStructField(ECommConstants.DB_RISK_INDICATOR_NAME, DataTypes.StringType, true));
        fields.add(DataTypes.createStructField(ECommConstants.DB_RISK_INDICATOR_DESCRIPTION, DataTypes.StringType, true));
        fields.add(DataTypes.createStructField(ECommConstants.DB_RISK_INDICATOR_PROPERTY, DataTypes.StringType, true));

        Dataset<Row> indicatorsFrame = sparkSession.createDataFrame(indicatorRows, DataTypes.createStructType(fields));
        indicatorsFrame.createGlobalTempView(ECommConstants.RISK_INDICATOR);
        indicatorsFrame.show();
    }

    /**
     * Generates a JWT token and publishes it as the single-row global temp view {@code SIFS_SECURITY}.
     *
     * <p>The token is created by {@link FciJwt} from the configured JWT user and role. Any existing
     * {@code SIFS_SECURITY} view is dropped before the new one is created, so this method can be
     * called repeatedly. The token value itself is deliberately not written to the log because it
     * is a credential.
     *
     * @param sparkSession session in which the view is registered
     * @param properties   configuration providing the JWT user and role
     * @throws Exception if token generation or view creation fails
     */
    public static void generateJWTToken(SparkSession sparkSession, Properties properties) throws Exception {
        String token = new FciJwt(properties.getProperty(ECommConstants.JWT_USER), properties.getProperty(ECommConstants.JWT_ROLE)).getToken();

        List<Row> tokenRows = new ArrayList<Row>();
        tokenRows.add(RowFactory.create(new Object[]{token}));

        List<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField(ECommConstants.JWT_TOKEN, DataTypes.StringType, true));

        Dataset<Row> tokenFrame = sparkSession.createDataFrame(tokenRows, DataTypes.createStructType(fields));
        // Drop first so that a refresh does not fail on an already existing view.
        sparkSession.catalog().dropGlobalTempView("SIFS_SECURITY");
        tokenFrame.createGlobalTempView("SIFS_SECURITY");
        // Do not log the token value: it is a bearer credential.
        sifsLogger.debug("JWT token generated and stored in global temp view SIFS_SECURITY");
    }

    /**
     * Calls the RMS entity-types endpoint and returns the raw response.
     *
     * <p>The URL is the concatenation of the configured RMS host URL and entity-types URI; the JWT
     * token is read from the session via {@link #getJWTToken}. The response is logged at debug level.
     *
     * @param sparkSession session holding the JWT token view
     * @param properties   configuration providing the RMS host URL and entity-types URI
     * @return the response returned by the REST client
     * @throws Exception if reading the token or the service call fails
     */
    public static String getEnityTypes(SparkSession sparkSession, Properties properties) throws Exception {
        String jwtToken = getJWTToken(sparkSession);
        String endpoint = properties.getProperty(ECommConstants.RMS_HOST_URL) + properties.getProperty(ECommConstants.RMS_GET_ENTITYTYPES);
        String response = SIFSRestClient.invokeGET(jwtToken, endpoint, (Map) null);
        sifsLogger.debug("Entity Type response " + response);
        return response;
    }

    /**
     * Calls the policy service with GET.
     *
     * @param properties configuration providing {@code policyServiceUrl}
     * @param str        value passed as the first argument of {@code SIFSRestClient.invokeGET}
     *                   (the JWT token position used by the other callers in this class)
     * @return the response returned by the REST client
     */
    public static String invokePolicyService(Properties properties, String str) {
        String policyUrl = properties.getProperty("policyServiceUrl");
        String response = SIFSRestClient.invokeGET(str, policyUrl, (Map) null);
        return response;
    }
}
