package com.ibm.sifs.ecomm.util;

import com.ibm.sifs.ecomm.ECommConstants;
import com.ibm.sifs.ecomm.SIFSSolrConstants;
import com.ibm.sifs.services.client.SIFSRestClient;
import java.sql.Date;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.StringTokenizer;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.spark.JavaHBaseContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.phoenix.spark.DataFrameFunctions;
import org.apache.phoenix.spark.SparkSqlContextFunctions;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.json.JSONArray;
import org.json.JSONObject;
import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.Seq;

/* loaded from: input_file:com/ibm/sifs/ecomm/util/DataUtil.class */
public class DataUtil {
    static transient Logger sifsLogger = LogManager.getLogger(SparkUtil.class);

    /* loaded from: input_file:com/ibm/sifs/ecomm/util/DataUtil$GetFunction.class */
    public static class GetFunction implements Function<byte[], Get> {
        private static final long serialVersionUID = 1;

        public Get call(byte[] bArr) throws Exception {
            return new Get(bArr);
        }
    }

    /* loaded from: input_file:com/ibm/sifs/ecomm/util/DataUtil$ResultFunction.class */
    public static class ResultFunction implements Function<Result, Row> {
        private static final long serialVersionUID = 1;

        public Row call(Result result) throws Exception {
            String str = ECommConstants.EMPTY_STRING;
            String str2 = ECommConstants.EMPTY_STRING;
            for (Cell cell : result.rawCells()) {
                String bytes = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
                String bytes2 = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
                String bytes3 = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                if (ECommConstants.DEFAULT_COLUMN_FAMILY.equalsIgnoreCase(bytes) && ECommConstants.ATTACHMENT.equalsIgnoreCase(bytes2)) {
                    str = bytes3;
                }
                if (ECommConstants.DEFAULT_COLUMN_FAMILY.equalsIgnoreCase(bytes) && ECommConstants.COMM_INIT_NAME.equalsIgnoreCase(bytes2)) {
                    str2 = bytes3;
                }
            }
            return RowFactory.create(new Object[]{Bytes.toString(result.getRow()), str, str2});
        }
    }

    public static Map<String, String> createDBColumnMap() {
        HashMap hashMap = new HashMap();
        hashMap.put(ECommConstants.COMM_INIT_CONTACT, "INITIATORCONTACT");
        hashMap.put("participants", "PARTICIPANTSCONTACT");
        return hashMap;
    }

    public static void persistsData(Dataset<Row> dataset, final Properties properties) {
        try {
            sifsLogger.info("persistsData for Hbase");
            final Map<String, String> createDBColumnMap = createDBColumnMap();
            final String jWTToken = SparkUtil.getJWTToken(dataset.sparkSession());
            String[] columns = dataset.columns();
            List asList = Arrays.asList(ECommConstants.FEATURE_MAP, ECommConstants.COMM_TEXT, ECommConstants.COMM_CLEAN_TEXT, ECommConstants.ALTERNATE_UTTERANCE);
            final ArrayList arrayList = new ArrayList();
            final ArrayList arrayList2 = new ArrayList();
            for (int i = 0; i < columns.length; i++) {
                if (!asList.contains(columns[i])) {
                    arrayList2.add(columns[i]);
                }
                if (columns[i].contains(ECommConstants.FEATURE)) {
                    arrayList.add(columns[i]);
                }
            }
            JavaRDD javaRDD = dataset.toJavaRDD();
            JavaHBaseContext hbaseContextFromDataset = HbaseUtil.getHbaseContextFromDataset(dataset);
            final byte[] bytes = Bytes.toBytes(ECommConstants.DEFAULT_COLUMN_FAMILY);
            hbaseContextFromDataset.bulkPut(javaRDD, TableName.valueOf(ECommConstants.COMMUNICATION_TABLE), new Function<Row, Put>() { // from class: com.ibm.sifs.ecomm.util.DataUtil.1
                public Put call(Row row) throws Exception {
                    Put put = new Put(Bytes.toBytes((String) row.getAs(ECommConstants.COMM_ID)));
                    JSONObject jSONObject = new JSONObject();
                    JSONArray jSONArray = new JSONArray();
                    for (String str : arrayList2) {
                        if (((String) createDBColumnMap.get(str)) == null) {
                            str.toUpperCase();
                        }
                        String str2 = ECommConstants.EMPTY_STRING;
                        try {
                            str2 = row.getAs(str).toString();
                            put.addColumn(bytes, Bytes.toBytes(str.toUpperCase()), Bytes.toBytes(str2));
                        } catch (Exception e) {
                            DataUtil.sifsLogger.error("Error fetching column " + str + ", of Communication :" + row.getAs(ECommConstants.COMM_ID));
                        }
                        if (arrayList.contains(str) && Utility.isNotEmpty(str2)) {
                            JSONObject jSONObject2 = new JSONObject(str2);
                            if (jSONObject2.has("tags")) {
                                Utility.concatArray(jSONObject2.getJSONArray("tags"), jSONArray);
                            }
                        }
                    }
                    jSONObject.put("tags", jSONArray);
                    put.addColumn(bytes, Bytes.toBytes("tags".toUpperCase()), Bytes.toBytes(jSONObject.toString()));
                    return put;
                }
            });
            sifsLogger.info("Loaded communication data");
            JavaRDD flatMap = javaRDD.flatMap(new FlatMapFunction<Row, Row>() { // from class: com.ibm.sifs.ecomm.util.DataUtil.2
                public Iterator<Row> call(Row row) throws Exception {
                    ArrayList arrayList3 = new ArrayList();
                    if (arrayList.contains(ECommConstants.ENTITY_FEATURE)) {
                        String str = (String) row.getAs(ECommConstants.ENTITY_FEATURE);
                        String str2 = (String) row.getAs(ECommConstants.COMM_INIT_NAME);
                        if (Utility.isNotEmpty(str) && Utility.isNotEmpty(str2)) {
                            JSONObject jSONObject = new JSONObject(str);
                            String str3 = (String) row.getAs(ECommConstants.COMM_ID);
                            JSONArray optJSONArray = jSONObject.optJSONArray(ECommConstants.SERVICE_ENTITIES);
                            for (int i2 = 0; i2 < optJSONArray.length(); i2++) {
                                JSONObject jSONObject2 = optJSONArray.getJSONObject(i2);
                                arrayList3.add(RowFactory.create(new Object[]{str3, ECommConstants.ENTITY_REL_TYPE_MENTIONS, str2, ECommConstants.ENTITY_TYPE_PEOPLE, jSONObject2.getString(ECommConstants.SERVICE_KEY_NAME), jSONObject2.getString(ECommConstants.SERVICE_KEY_TYPE)}));
                            }
                        }
                    }
                    return arrayList3.iterator();
                }
            });
            ArrayList arrayList3 = new ArrayList();
            arrayList3.add(DataTypes.createStructField(ECommConstants.PDB_COMMUNICATION_ID, DataTypes.StringType, true));
            arrayList3.add(DataTypes.createStructField(ECommConstants.PDB_ENT_REL_TYPE, DataTypes.StringType, true));
            arrayList3.add(DataTypes.createStructField(ECommConstants.PDB_SOURCE_ENT, DataTypes.StringType, true));
            arrayList3.add(DataTypes.createStructField(ECommConstants.PDB_SOURCE_ENT_TYPE, DataTypes.StringType, true));
            arrayList3.add(DataTypes.createStructField(ECommConstants.PDB_TARGET_ENT, DataTypes.StringType, true));
            arrayList3.add(DataTypes.createStructField(ECommConstants.PDB_TARGET_ENT_TYPE, DataTypes.StringType, true));
            new DataFrameFunctions(dataset.sparkSession().createDataFrame(flatMap, DataTypes.createStructType(arrayList3))).saveToPhoenix(ECommConstants.PDB_COM_ENTITY_REL_TABLE, HbaseUtil.getHbaseConfiguration(), Option.apply(HbaseUtil.getZookeeperUrl(properties)), Option.apply(ECommConstants.EMPTY_STRING), false);
            sifsLogger.info("Loaded communication entities");
            dataset.foreach(new ForeachFunction<Row>() { // from class: com.ibm.sifs.ecomm.util.DataUtil.3
                public void call(Row row) throws Exception {
                    JSONArray jSONArray;
                    List asList2 = Arrays.asList(row.schema().fieldNames());
                    JSONObject jSONObject = new JSONObject();
                    jSONObject.put("commid", (String) row.getAs(ECommConstants.COMM_ID));
                    jSONObject.put("commtype", (String) row.getAs(ECommConstants.COMM_TYPE));
                    jSONObject.put(SIFSSolrConstants.COMM_CHANNEL, (String) row.getAs(ECommConstants.COMM_CHANNEL));
                    jSONObject.put("sourcereferenceid", (String) row.getAs(ECommConstants.COMM_SOURCE_ID));
                    jSONObject.put(SIFSSolrConstants.COMM_START_TIME, (String) row.getAs(ECommConstants.COMM_START_TIME));
                    jSONObject.put(SIFSSolrConstants.COMM_END_TIME, (String) row.getAs(ECommConstants.COMM_END_TIME));
                    jSONObject.put(SIFSSolrConstants.COMM_SUBJECT, (String) row.getAs(ECommConstants.COMM_SUBJECT));
                    jSONObject.put(SIFSSolrConstants.COMM_TEXT, (String) row.getAs(ECommConstants.COMM_TEXT));
                    jSONObject.put(SIFSSolrConstants.COMM_INITIATOR_NAME, (String) row.getAs(ECommConstants.COMM_INIT_NAME));
                    jSONObject.put(SIFSSolrConstants.COMM_INITIATOR_ID, (String) row.getAs(ECommConstants.COMM_INIT_ID));
                    jSONObject.put(SIFSSolrConstants.COMM_INITIATOR_CONTACT, (String) row.getAs(ECommConstants.COMM_INIT_CONTACT));
                    if (asList2.contains(ECommConstants.ALTERNATE_UTTERANCE)) {
                        jSONObject.put(SIFSSolrConstants.ALTERNATE_UTTERANCE, (String) row.getAs(ECommConstants.ALTERNATE_UTTERANCE));
                    }
                    StringTokenizer stringTokenizer = new StringTokenizer((String) row.getAs(ECommConstants.PARTICIPANTS_ID), ECommConstants.DELIM);
                    JSONArray jSONArray2 = new JSONArray();
                    if (stringTokenizer != null) {
                        while (stringTokenizer.hasMoreTokens()) {
                            jSONArray2.put(stringTokenizer.nextToken());
                        }
                    }
                    StringTokenizer stringTokenizer2 = new StringTokenizer((String) row.getAs(ECommConstants.PARTICIPANTS_NAME), ECommConstants.DELIM);
                    JSONArray jSONArray3 = new JSONArray();
                    if (stringTokenizer2 != null) {
                        while (stringTokenizer2.hasMoreTokens()) {
                            jSONArray3.put(stringTokenizer2.nextToken());
                        }
                    }
                    jSONObject.put(SIFSSolrConstants.COMM_PARTICIPANTS_ID, jSONArray2);
                    jSONObject.put(SIFSSolrConstants.COMM_PARTICIPANTS_NAME, jSONArray3);
                    StringTokenizer stringTokenizer3 = new StringTokenizer((String) row.getAs("participants"), ECommConstants.DELIM);
                    JSONArray jSONArray4 = new JSONArray();
                    if (stringTokenizer3 != null) {
                        while (stringTokenizer3.hasMoreTokens()) {
                            jSONArray4.put(stringTokenizer3.nextToken());
                        }
                    }
                    jSONObject.put(SIFSSolrConstants.COMM_PARTICIPANTS_CONTACT, jSONArray4);
                    JSONArray jSONArray5 = new JSONArray();
                    for (String str : arrayList) {
                        String str2 = (String) row.getAs(str);
                        if (str2 != null && str2.length() > 0) {
                            JSONObject jSONObject2 = new JSONObject();
                            jSONObject2.put(ECommConstants.SERVICE_KEY_NAME, str);
                            jSONObject2.put(ECommConstants.SERVICE_KEY_VALUE, str2);
                            jSONArray5.put(jSONObject2);
                        }
                    }
                    jSONObject.put("features", jSONArray5);
                    JSONArray jSONArray6 = new JSONArray();
                    String str3 = (String) row.getAs(ECommConstants.COMM_QUEUES);
                    if (Utility.isNotEmpty(str3) && (jSONArray = new JSONArray(str3)) != null) {
                        for (int i2 = 0; i2 < jSONArray.length(); i2++) {
                            jSONArray6.put(jSONArray.getJSONObject(i2).getString(ECommConstants.SERVICE_KEY_ID));
                        }
                    }
                    jSONObject.put(ECommConstants.SERVICE_QUEUEIDS, jSONArray6);
                    DataUtil.sifsLogger.debug("Comm Service input " + jSONObject.toString());
                    long nanoTime = System.nanoTime();
                    if (SIFSRestClient.invokePOST(jWTToken, properties.getProperty("CreateCommREST"), jSONObject.toString()) == null) {
                        DataUtil.sifsLogger.error("CDPJ0004E  Persisting data in DB and Solr  GCID " + jSONObject.get("sourcereferenceid"));
                    }
                    DataUtil.sifsLogger.info("Comm service processing time " + (System.nanoTime() - nanoTime) + " ns");
                }
            });
        } catch (Exception e) {
            sifsLogger.error("Error while persisting comm data through service ", e);
        }
    }

    public static List<String> getCommunicationFields(SparkSession sparkSession, Properties properties) {
        return (List) new SparkSqlContextFunctions(new SQLContext(sparkSession)).phoenixTableAsDataFrame(ECommConstants.PDB_CATALOG_TABLE, ((scala.collection.Iterator) JavaConverters.asScalaIteratorConverter(Arrays.asList("COLUMN_NAME").iterator()).asScala()).toSeq(), Option.apply("TABLE_SCHEM  = 'SIFS' AND TABLE_NAME = 'COMMUNICATION'"), Option.apply(HbaseUtil.getZookeeperUrl(properties)), Option.apply(ECommConstants.EMPTY_STRING), HbaseUtil.getHbaseConfiguration()).collectAsList().stream().filter(row -> {
            return row.getString(0) != null;
        }).map(row2 -> {
            return row2.getString(0);
        }).collect(Collectors.toList());
    }

    public static Dataset<Row> getCommDataByDate(SparkSession sparkSession, String str, Properties properties) {
        Dataset<Row> withColumnRenamed = new SparkSqlContextFunctions(new SQLContext(sparkSession)).phoenixTableAsDataFrame(ECommConstants.PDB_COMMUNICATION_TABLE, ((scala.collection.Iterator) JavaConverters.asScalaIteratorConverter(getCommunicationFields(sparkSession, properties).iterator()).asScala()).toSeq(), Option.apply("COMMSTARTTIME LIKE '" + str + "%'"), Option.apply(HbaseUtil.getZookeeperUrl(properties)), Option.apply(ECommConstants.EMPTY_STRING), HbaseUtil.getHbaseConfiguration()).withColumnRenamed(ECommConstants.COMM_ID.toUpperCase(), ECommConstants.COMM_ID).withColumnRenamed(ECommConstants.COMM_TYPE.toUpperCase(), ECommConstants.COMM_TYPE).withColumnRenamed(ECommConstants.COMM_CHANNEL.toUpperCase(), ECommConstants.COMM_CHANNEL).withColumnRenamed(ECommConstants.COMM_SOURCE_ID.toUpperCase(), ECommConstants.COMM_SOURCE_ID).withColumnRenamed(ECommConstants.COMM_START_TIME.toUpperCase(), ECommConstants.COMM_START_TIME).withColumnRenamed(ECommConstants.COMM_END_TIME.toUpperCase(), ECommConstants.COMM_END_TIME).withColumnRenamed(ECommConstants.COMM_SUBJECT.toUpperCase(), ECommConstants.COMM_SUBJECT).withColumnRenamed(ECommConstants.COMM_SUBJECT.toUpperCase(), ECommConstants.COMM_SUBJECT).withColumnRenamed("INITIATORCONTACT", ECommConstants.COMM_INIT_CONTACT).withColumnRenamed(ECommConstants.COMM_INIT_ID.toUpperCase(), ECommConstants.COMM_INIT_ID).withColumnRenamed(ECommConstants.COMM_INIT_NAME.toUpperCase(), ECommConstants.COMM_INIT_NAME).withColumnRenamed("PARTICIPANTSCONTACT", "participants").withColumnRenamed(ECommConstants.PARTICIPANTS_ID.toUpperCase(), ECommConstants.PARTICIPANTS_ID).withColumnRenamed(ECommConstants.PARTICIPANTS_NAME.toUpperCase(), ECommConstants.PARTICIPANTS_NAME).withColumnRenamed(ECommConstants.COMM_META_FEATURES.toUpperCase(), ECommConstants.COMM_META_FEATURES).withColumnRenamed(ECommConstants.CONCEPT_FEATURE.toUpperCase(), ECommConstants.CONCEPT_FEATURE).withColumnRenamed(ECommConstants.EMOTION_FETAURE.toUpperCase(), ECommConstants.EMOTION_FETAURE).withColumnRenamed(ECommConstants.CLASSIFICATION_FEATURE.toUpperCase(), ECommConstants.CLASSIFICATION_FEATURE).withColumnRenamed(ECommConstants.ATTACHMENT.toUpperCase(), ECommConstants.ATTACHMENT);
        withColumnRenamed.cache();
        return withColumnRenamed;
    }

    public static void persistBehaviorProfile(Dataset<Row> dataset, String str, Properties properties) {
        try {
            sifsLogger.info("Persisting Behavior Profile in DB!!");
            SparkSession sparkSession = dataset.sparkSession();
            dataset.createOrReplaceTempView("communication");
            JavaRDD map = sparkSession.sql("select INITIATORID, EMOTIONFEATURE,date(COMMSTARTTIME) as commDateTime from communication where INITIATORID is not null and EMOTIONFEATURE is not null and date(COMMSTARTTIME) = '" + str + "'").toJavaRDD().map(new Function<Row, Row>() { // from class: com.ibm.sifs.ecomm.util.DataUtil.4
                public Row call(Row row) throws Exception {
                    String str2;
                    Double valueOf = Double.valueOf(0.0d);
                    Double valueOf2 = Double.valueOf(0.0d);
                    Double valueOf3 = Double.valueOf(0.0d);
                    Double valueOf4 = Double.valueOf(0.0d);
                    Double valueOf5 = Double.valueOf(0.0d);
                    Double valueOf6 = Double.valueOf(0.0d);
                    String str3 = (String) row.getAs("EMOTIONFEATURE");
                    Long valueOf7 = Long.valueOf(Long.parseLong((String) row.getAs("INITIATORID")));
                    if (str3 != null && str3.length() > 0) {
                        JSONObject jSONObject = new JSONObject(str3);
                        if (jSONObject.has("emotions")) {
                            JSONObject jSONObject2 = jSONObject.getJSONObject("emotions");
                            if (jSONObject2 != null) {
                                valueOf = Double.valueOf(Double.parseDouble(jSONObject2.getString("sad")));
                                valueOf2 = Double.valueOf(Double.parseDouble(jSONObject2.getString("anger")));
                                valueOf4 = Double.valueOf(Double.parseDouble(jSONObject2.getString("disgust")));
                                valueOf5 = Double.valueOf(Double.parseDouble(jSONObject2.getString("fear")));
                                valueOf6 = Double.valueOf(Double.parseDouble(jSONObject2.getString("joy")));
                            }
                            JSONObject jSONObject3 = (JSONObject) jSONObject.get("sentiment");
                            if (jSONObject3 != null && (str2 = (String) jSONObject3.get(ECommConstants.SERVICE_KEY_TYPE)) != null && str2.equalsIgnoreCase(ECommConstants.NEGATIVE_SENTIMENT)) {
                                Object obj = jSONObject3.get("score");
                                valueOf3 = obj instanceof Long ? Double.valueOf(((Long) obj).doubleValue()) : obj instanceof Integer ? Double.valueOf(((Integer) obj).doubleValue()) : (Double) obj;
                            }
                        }
                    }
                    return RowFactory.create(new Object[]{valueOf7, row.getAs("commDateTime"), valueOf2, valueOf, valueOf4, valueOf5, valueOf6, valueOf3});
                }
            });
            ArrayList arrayList = new ArrayList();
            arrayList.add(DataTypes.createStructField(ECommConstants.DB_PARTY_ID, DataTypes.LongType, true));
            arrayList.add(DataTypes.createStructField(ECommConstants.DB_DATE, DataTypes.DateType, true));
            arrayList.add(DataTypes.createStructField(ECommConstants.DB_ANGER_SCORE, DataTypes.DoubleType, true));
            arrayList.add(DataTypes.createStructField(ECommConstants.DB_SAD_SCORE, DataTypes.DoubleType, true));
            arrayList.add(DataTypes.createStructField(ECommConstants.DB_DISGUST_SCORE, DataTypes.DoubleType, true));
            arrayList.add(DataTypes.createStructField(ECommConstants.DB_FEAR_SCORE, DataTypes.DoubleType, true));
            arrayList.add(DataTypes.createStructField(ECommConstants.DB_JOY_SCORE, DataTypes.DoubleType, true));
            arrayList.add(DataTypes.createStructField(ECommConstants.DB_NEG_SENT_SCORE, DataTypes.DoubleType, true));
            Dataset createDataFrame = sparkSession.createDataFrame(map, DataTypes.createStructType(arrayList));
            Dataset agg = createDataFrame.groupBy(new Column[]{createDataFrame.col(ECommConstants.DB_PARTY_ID), createDataFrame.col(ECommConstants.DB_DATE)}).agg(functions.max(ECommConstants.DB_ANGER_SCORE).alias(ECommConstants.DB_ANGER_SCORE), new Column[]{functions.max(ECommConstants.DB_SAD_SCORE).alias(ECommConstants.DB_SAD_SCORE), functions.max(ECommConstants.DB_DISGUST_SCORE).alias(ECommConstants.DB_DISGUST_SCORE), functions.max(ECommConstants.DB_FEAR_SCORE).alias(ECommConstants.DB_FEAR_SCORE), functions.max(ECommConstants.DB_JOY_SCORE).alias(ECommConstants.DB_JOY_SCORE), functions.min(ECommConstants.DB_NEG_SENT_SCORE).alias(ECommConstants.DB_NEG_SENT_SCORE), functions.count(ECommConstants.DB_PARTY_ID).alias("COMMUNICATION_COUNT")});
            new DataFrameFunctions(agg).saveToPhoenix(ECommConstants.PDB_PARTY_BEHAVIOR_TABLE, HbaseUtil.getHbaseConfiguration(), Option.apply(HbaseUtil.getZookeeperUrl(properties)), Option.apply(ECommConstants.EMPTY_STRING), false);
            sifsLogger.info("Persisted Behavior Profile in DB!!");
        } catch (Exception e) {
            sifsLogger.error("Error in persist behavior profile ", e);
            throw e;
        }
    }

    public static Dataset<Row> getEnterpriseProfile(SparkSession sparkSession, Properties properties) {
        return new SparkSqlContextFunctions(new SQLContext(sparkSession)).phoenixTableAsDataFrame(ECommConstants.PDB_ENTERPRISE_PROFILE_TABLE, ((scala.collection.Iterator) JavaConverters.asScalaIteratorConverter(Arrays.asList(ECommConstants.DB_RISK_INDICATOR_ID, ECommConstants.DB_MEAN, ECommConstants.DB_STD).iterator()).asScala()).toSeq(), Option.apply("DATE = (SELECT MAX(DATE) FROM SIFS.ENTERPRISE_PROFILE)"), Option.apply(HbaseUtil.getZookeeperUrl(properties)), Option.apply(ECommConstants.EMPTY_STRING), HbaseUtil.getHbaseConfiguration());
    }

    public static Dataset<Row> getPartyProfile(SparkSession sparkSession, Properties properties) {
        return new SparkSqlContextFunctions(new SQLContext(sparkSession)).phoenixTableAsDataFrame(ECommConstants.PDB_PARTY_PROFILE_TABLE, ((scala.collection.Iterator) JavaConverters.asScalaIteratorConverter(Arrays.asList(ECommConstants.DB_RISK_INDICATOR_ID, ECommConstants.DB_PARTY_ID, ECommConstants.DB_MEAN, ECommConstants.DB_STD).iterator()).asScala()).toSeq(), Option.apply("DATE = (SELECT MAX(DATE) FROM SIFS.PARTY_PROFILE WHERE MEAN IS NOT NULL)"), Option.apply(HbaseUtil.getZookeeperUrl(properties)), Option.apply(ECommConstants.EMPTY_STRING), HbaseUtil.getHbaseConfiguration());
    }

    public static void persistPartyProfile(Dataset<Row> dataset, final String str, Properties properties) {
        try {
            sifsLogger.info("Persisting Party Profile in DB!!");
            dataset.cache();
            SparkSession sparkSession = dataset.sparkSession();
            String[] columns = dataset.columns();
            ArrayList arrayList = new ArrayList();
            boolean z = false;
            for (int i = 0; i < columns.length; i++) {
                if (columns[i].contains(ECommConstants.SCORE_NAME)) {
                    arrayList.add(dataset.col(columns[i]));
                    z = true;
                }
                if (ECommConstants.COMM_INIT_ID.equals(columns[i])) {
                    arrayList.add(dataset.col(columns[i]));
                }
                if (ECommConstants.COMM_START_TIME.equals(columns[i])) {
                    arrayList.add(dataset.col(columns[i]));
                }
            }
            if (z) {
                Dataset select = dataset.select((Column[]) arrayList.toArray(new Column[arrayList.size()]));
                select.createOrReplaceTempView("profile");
                StringBuffer stringBuffer = new StringBuffer("select ");
                stringBuffer.append(ECommConstants.COMM_INIT_ID);
                stringBuffer.append(" as ");
                stringBuffer.append(ECommConstants.DB_PARTY_ID);
                stringBuffer.append(ECommConstants.COMMA);
                stringBuffer.append("date(");
                stringBuffer.append(ECommConstants.COMM_START_TIME);
                stringBuffer.append(") as ");
                stringBuffer.append(ECommConstants.DB_DATE);
                stringBuffer.append(ECommConstants.COMMA);
                String[] columns2 = select.columns();
                for (int i2 = 0; i2 < columns2.length; i2++) {
                    if (columns2[i2].contains(ECommConstants.SCORE_NAME)) {
                        String substring = columns2[i2].substring(0, columns2[i2].indexOf("_"));
                        stringBuffer.append("avg(");
                        stringBuffer.append(columns2[i2]);
                        stringBuffer.append(") as ");
                        stringBuffer.append(substring);
                        if (i2 < columns2.length - 1) {
                            stringBuffer.append(ECommConstants.COMMA);
                        }
                    }
                }
                stringBuffer.append(" from profile where ");
                stringBuffer.append(ECommConstants.COMM_INIT_ID);
                stringBuffer.append(" is not null");
                stringBuffer.append(" group by ");
                stringBuffer.append(ECommConstants.COMM_INIT_ID);
                stringBuffer.append(ECommConstants.COMMA);
                stringBuffer.append("date(");
                stringBuffer.append(ECommConstants.COMM_START_TIME);
                stringBuffer.append(")");
                Dataset sql = sparkSession.sql(stringBuffer.toString());
                sparkSession.catalog().dropTempView("profile");
                String[] columns3 = sql.columns();
                final ArrayList arrayList2 = new ArrayList();
                final HashMap hashMap = new HashMap();
                for (int i3 = 0; i3 < columns3.length; i3++) {
                    if (columns3[i3].contains(ECommConstants.RISK_INDICATOR_NAME)) {
                        arrayList2.add(columns3[i3]);
                        hashMap.put(columns3[i3], Long.valueOf(((Row) sparkSession.sql("SELECT RISK_INDICATOR_ID FROM global_temp.RISK_INDICATOR WHERE RISK_INDICATOR_LOOKUP_CODE = '" + columns3[i3] + "'").first()).getLong(0)));
                    }
                }
                JavaRDD flatMap = sql.toJavaRDD().flatMap(new FlatMapFunction<Row, Row>() { // from class: com.ibm.sifs.ecomm.util.DataUtil.5
                    public Iterator<Row> call(Row row) throws Exception {
                        ArrayList arrayList3 = new ArrayList();
                        for (String str2 : arrayList2) {
                            arrayList3.add(RowFactory.create(new Object[]{Long.valueOf(Long.parseLong((String) row.getAs(ECommConstants.DB_PARTY_ID))), Date.valueOf(str), hashMap.get(str2), null, null, (Double) row.getAs(str2)}));
                        }
                        return arrayList3.iterator();
                    }
                });
                ArrayList arrayList3 = new ArrayList();
                arrayList3.add(DataTypes.createStructField(ECommConstants.DB_PARTY_ID, DataTypes.LongType, true));
                arrayList3.add(DataTypes.createStructField(ECommConstants.DB_DATE, DataTypes.DateType, true));
                arrayList3.add(DataTypes.createStructField(ECommConstants.DB_RISK_INDICATOR_ID, DataTypes.LongType, true));
                arrayList3.add(DataTypes.createStructField(ECommConstants.DB_MEAN, DataTypes.DoubleType, true));
                arrayList3.add(DataTypes.createStructField(ECommConstants.DB_STD, DataTypes.DoubleType, true));
                arrayList3.add(DataTypes.createStructField(ECommConstants.DB_SCORE, DataTypes.DoubleType, true));
                Dataset createDataFrame = sparkSession.createDataFrame(flatMap, DataTypes.createStructType(arrayList3));
                Configuration hbaseConfiguration = HbaseUtil.getHbaseConfiguration();
                String zookeeperUrl = HbaseUtil.getZookeeperUrl(properties);
                new DataFrameFunctions(createDataFrame).saveToPhoenix(ECommConstants.PDB_PARTY_PROFILE_TABLE, hbaseConfiguration, Option.apply(zookeeperUrl), Option.apply(ECommConstants.EMPTY_STRING), false);
                sifsLogger.info("Persisted Party Profile in DB!!");
            } else {
                sifsLogger.info("No updates for Party Profile !!");
            }
        } catch (Exception e) {
            sifsLogger.error("Error in persistProfile ", e);
        }
    }

    public static void persistPartyProfileForCount(Dataset<Row> dataset, String str, Properties properties) {
        try {
            sifsLogger.info("Persisting Party Profile for Count in DB!!");
            SparkSession sparkSession = dataset.sparkSession();
            String[] columns = dataset.columns();
            final ArrayList arrayList = new ArrayList();
            boolean z = false;
            final HashMap hashMap = new HashMap();
            for (int i = 0; i < columns.length; i++) {
                if (columns[i].contains(ECommConstants.COUNT_NAME)) {
                    String substring = columns[i].substring(0, columns[i].indexOf("_"));
                    arrayList.add(columns[i]);
                    hashMap.put(substring, Long.valueOf(((Row) sparkSession.sql("SELECT RISK_INDICATOR_ID FROM global_temp.RISK_INDICATOR WHERE RISK_INDICATOR_LOOKUP_CODE = '" + substring + "'").first()).getLong(0)));
                    z = true;
                }
            }
            if (z) {
                JavaRDD flatMap = dataset.toJavaRDD().flatMap(new FlatMapFunction<Row, Row>() { // from class: com.ibm.sifs.ecomm.util.DataUtil.6
                    public Iterator<Row> call(Row row) throws Exception {
                        ArrayList arrayList2 = new ArrayList();
                        for (String str2 : arrayList) {
                            arrayList2.add(RowFactory.create(new Object[]{(Long) row.getAs(ECommConstants.DB_PARTY_ID), row.getAs(ECommConstants.DB_DATE), hashMap.get(str2.substring(0, str2.indexOf("_"))), null, null, (Double) row.getAs(str2)}));
                        }
                        return arrayList2.iterator();
                    }
                });
                ArrayList arrayList2 = new ArrayList();
                arrayList2.add(DataTypes.createStructField(ECommConstants.DB_PARTY_ID, DataTypes.LongType, true));
                arrayList2.add(DataTypes.createStructField(ECommConstants.DB_DATE, DataTypes.DateType, true));
                arrayList2.add(DataTypes.createStructField(ECommConstants.DB_RISK_INDICATOR_ID, DataTypes.LongType, true));
                arrayList2.add(DataTypes.createStructField(ECommConstants.DB_MEAN, DataTypes.DoubleType, true));
                arrayList2.add(DataTypes.createStructField(ECommConstants.DB_STD, DataTypes.DoubleType, true));
                arrayList2.add(DataTypes.createStructField(ECommConstants.DB_SCORE, DataTypes.DoubleType, true));
                Dataset createDataFrame = sparkSession.createDataFrame(flatMap, DataTypes.createStructType(arrayList2));
                Configuration hbaseConfiguration = HbaseUtil.getHbaseConfiguration();
                String zookeeperUrl = HbaseUtil.getZookeeperUrl(properties);
                new DataFrameFunctions(createDataFrame).saveToPhoenix(ECommConstants.PDB_PARTY_PROFILE_TABLE, hbaseConfiguration, Option.apply(zookeeperUrl), Option.apply(ECommConstants.EMPTY_STRING), false);
                sifsLogger.info("Persisted Party Profile Count in DB!!");
            } else {
                sifsLogger.info("No updates for Party Profile for Count !!");
            }
        } catch (Exception e) {
            sifsLogger.error("Error in persistProfile ", e);
        }
    }

    public static void persistProfileInDB(SparkSession sparkSession, Properties properties, String str, int i) throws Exception {
        try {
            Date valueOf = Date.valueOf(str);
            sifsLogger.info("Compute Profile for " + valueOf);
            persistPartyAndEntProfile(sparkSession, properties, valueOf, i);
            sifsLogger.info("Profile sucessfully computed!!!");
        } catch (Exception e) {
            sifsLogger.error("Error while computing profile in DB " + e);
            throw e;
        }
    }

    private static void persistPartyAndEntProfile(SparkSession sparkSession, Properties properties, Date date, int i) throws Exception {
        if (date != null) {
            Calendar gregorianCalendar = GregorianCalendar.getInstance();
            gregorianCalendar.setTimeInMillis(date.getTime());
            gregorianCalendar.add(5, i * (-1));
            Date date2 = new Date(gregorianCalendar.getTimeInMillis());
            SparkSqlContextFunctions sparkSqlContextFunctions = new SparkSqlContextFunctions(new SQLContext(sparkSession));
            Configuration hbaseConfiguration = HbaseUtil.getHbaseConfiguration();
            String zookeeperUrl = HbaseUtil.getZookeeperUrl(properties);
            Seq seq = ((scala.collection.Iterator) JavaConverters.asScalaIteratorConverter(Arrays.asList(ECommConstants.DB_PARTY_ID, ECommConstants.DB_RISK_INDICATOR_ID, ECommConstants.DB_SCORE).iterator()).asScala()).toSeq();
            String str = "DATE BETWEEN TO_DATE('" + date2.toString() + "') AND TO_DATE('" + date.toString() + "')";
            Option apply = Option.apply(str);
            Option apply2 = Option.apply(ECommConstants.EMPTY_STRING);
            Option apply3 = Option.apply(zookeeperUrl);
            sifsLogger.debug("predicateStr == " + str);
            Dataset phoenixTableAsDataFrame = sparkSqlContextFunctions.phoenixTableAsDataFrame(ECommConstants.PDB_PARTY_PROFILE_TABLE, seq, apply, apply3, apply2, hbaseConfiguration);
            phoenixTableAsDataFrame.cache();
            Dataset agg = phoenixTableAsDataFrame.groupBy(ECommConstants.DB_PARTY_ID, new String[]{ECommConstants.DB_RISK_INDICATOR_ID}).agg(functions.avg(ECommConstants.DB_SCORE).as(ECommConstants.DB_MEAN), new Column[0]);
            Dataset agg2 = phoenixTableAsDataFrame.groupBy(ECommConstants.DB_PARTY_ID, new String[]{ECommConstants.DB_RISK_INDICATOR_ID}).agg(functions.stddev(ECommConstants.DB_SCORE).as(ECommConstants.DB_STD), new Column[0]);
            Seq seq2 = ((scala.collection.Iterator) JavaConverters.asScalaIteratorConverter(Arrays.asList(ECommConstants.DB_PARTY_ID, ECommConstants.DB_RISK_INDICATOR_ID).iterator()).asScala()).toSeq();
            Dataset join = agg.join(agg2, seq2);
            join.cache();
            gregorianCalendar.setTimeInMillis(date.getTime());
            gregorianCalendar.add(5, 1);
            Date date3 = new Date(gregorianCalendar.getTimeInMillis());
            Seq seq3 = ((scala.collection.Iterator) JavaConverters.asScalaIteratorConverter(Arrays.asList(ECommConstants.DB_PARTY_ID, ECommConstants.DB_DATE, ECommConstants.DB_RISK_INDICATOR_ID).iterator()).asScala()).toSeq();
            String str2 = "DATE BETWEEN TO_DATE('" + date.toString() + "') AND TO_DATE('" + date3.toString() + "')";
            Option apply4 = Option.apply(str2);
            sifsLogger.debug("predicateStr next == " + str2);
            Dataset phoenixTableAsDataFrame2 = sparkSqlContextFunctions.phoenixTableAsDataFrame(ECommConstants.PDB_PARTY_PROFILE_TABLE, seq3, apply4, apply3, apply2, hbaseConfiguration);
            phoenixTableAsDataFrame2.cache();
            Dataset withColumn = join.join(phoenixTableAsDataFrame2, seq2, "outer").withColumn(ECommConstants.DB_DATE, functions.lit(date));
            withColumn.cache();
            new DataFrameFunctions(withColumn).saveToPhoenix(ECommConstants.PDB_PARTY_PROFILE_TABLE, hbaseConfiguration, apply3, apply2, false);
            sifsLogger.info("Persisted Party Profile in DB!!");
            Dataset withColumn2 = phoenixTableAsDataFrame.groupBy(ECommConstants.DB_RISK_INDICATOR_ID, new String[0]).agg(functions.avg(ECommConstants.DB_SCORE).as(ECommConstants.DB_MEAN), new Column[0]).join(phoenixTableAsDataFrame.groupBy(ECommConstants.DB_RISK_INDICATOR_ID, new String[0]).agg(functions.stddev(ECommConstants.DB_SCORE).as(ECommConstants.DB_STD), new Column[0]), ECommConstants.DB_RISK_INDICATOR_ID).withColumn(ECommConstants.DB_DATE, functions.lit(date));
            withColumn2.cache();
            new DataFrameFunctions(withColumn2).saveToPhoenix(ECommConstants.PDB_ENTERPRISE_PROFILE_TABLE, hbaseConfiguration, apply3, apply2, false);
            sifsLogger.info("Persisted Enterprise Profile in DB!!");
        }
    }

    public static void persistsAttachmentData(Dataset<Row> dataset, final Properties properties) {
        try {
            sifsLogger.info("persistsData for attachment in Hbase");
            final String jWTToken = SparkUtil.getJWTToken(dataset.sparkSession());
            String[] columns = dataset.columns();
            List asList = Arrays.asList(ECommConstants.FEATURE_MAP, ECommConstants.COMM_SOURCE_ID);
            final ArrayList arrayList = new ArrayList();
            final ArrayList arrayList2 = new ArrayList();
            final ArrayList arrayList3 = new ArrayList();
            for (int i = 0; i < columns.length; i++) {
                if (!asList.contains(columns[i])) {
                    arrayList2.add(columns[i]);
                }
                if (columns[i].contains(ECommConstants.FEATURE)) {
                    arrayList.add(columns[i]);
                }
                if (!asList.contains(columns[i]) && columns[i].contains(ECommConstants.ATTACHMENT)) {
                    arrayList3.add(columns[i]);
                }
            }
            JavaRDD javaRDD = dataset.toJavaRDD();
            JavaRDD distinct = javaRDD.map(new Function<Row, byte[]>() { // from class: com.ibm.sifs.ecomm.util.DataUtil.7
                public byte[] call(Row row) throws Exception {
                    return Bytes.toBytes((String) row.getAs(ECommConstants.COMM_ID));
                }
            }).distinct();
            ArrayList arrayList4 = new ArrayList();
            arrayList4.add(DataTypes.createStructField(ECommConstants.COMM_ID, DataTypes.StringType, true));
            arrayList4.add(DataTypes.createStructField(ECommConstants.ATTACHMENT, DataTypes.StringType, true));
            StructType createStructType = DataTypes.createStructType(arrayList4);
            Dataset createDataFrame = dataset.sparkSession().createDataFrame(javaRDD.map(new Function<Row, Row>() { // from class: com.ibm.sifs.ecomm.util.DataUtil.8
                public Row call(Row row) throws Exception {
                    JSONObject jSONObject = new JSONObject();
                    JSONObject jSONObject2 = new JSONObject();
                    JSONArray jSONArray = new JSONArray();
                    for (String str : arrayList3) {
                        jSONObject.put(str, row.getAs(str));
                    }
                    for (String str2 : arrayList2) {
                        Object as = row.getAs(str2);
                        if (arrayList.contains(str2) && Utility.isNotEmpty(as.toString())) {
                            JSONObject jSONObject3 = new JSONObject(as.toString());
                            if (jSONObject3.has("tags")) {
                                Utility.concatArray(jSONObject3.getJSONArray("tags"), jSONArray);
                            }
                            jSONObject.put(str2, jSONObject3);
                        }
                    }
                    jSONObject.put("tags", jSONArray);
                    jSONObject2.put((String) row.getAs(ECommConstants.ATTACHMENT_ID), jSONObject);
                    return RowFactory.create(new Object[]{row.getAs(ECommConstants.COMM_ID), jSONObject2.toString()});
                }
            }), createStructType);
            Dataset agg = createDataFrame.groupBy(new Column[]{createDataFrame.col(ECommConstants.COMM_ID)}).agg(functions.collect_list(ECommConstants.ATTACHMENT).as("current_attachment"), new Column[0]);
            JavaHBaseContext hbaseContextFromDataset = HbaseUtil.getHbaseContextFromDataset(dataset);
            Dataset join = agg.join(dataset.sparkSession().createDataFrame(hbaseContextFromDataset.bulkGet(TableName.valueOf(ECommConstants.COMMUNICATION_TABLE), Integer.valueOf(distinct.count() > 2147483647L ? Integer.MAX_VALUE : (int) distinct.count()), distinct, new GetFunction(), new ResultFunction()), createStructType.add(ECommConstants.COMM_INIT_NAME, DataTypes.StringType)), ((scala.collection.Iterator) JavaConverters.asScalaIteratorConverter(Arrays.asList(ECommConstants.COMM_ID).iterator()).asScala()).toSeq(), "left_outer");
            JavaRDD javaRDD2 = join.toJavaRDD();
            final byte[] bytes = Bytes.toBytes(ECommConstants.DEFAULT_COLUMN_FAMILY);
            hbaseContextFromDataset.bulkPut(javaRDD2, TableName.valueOf(ECommConstants.COMMUNICATION_TABLE), new Function<Row, Put>() { // from class: com.ibm.sifs.ecomm.util.DataUtil.9
                public Put call(Row row) throws Exception {
                    Put put = new Put(Bytes.toBytes((String) row.getAs(ECommConstants.COMM_ID)));
                    List list = row.getList(row.fieldIndex("current_attachment"));
                    String str = (String) row.getAs(ECommConstants.ATTACHMENT);
                    JSONObject jSONObject = new JSONObject();
                    if (Utility.isNotEmpty(str)) {
                        jSONObject = new JSONObject(str);
                    }
                    Iterator it = list.iterator();
                    while (it.hasNext()) {
                        JSONObject jSONObject2 = new JSONObject((String) it.next());
                        String[] names = JSONObject.getNames(jSONObject2);
                        for (int i2 = 0; i2 < names.length; i2++) {
                            JSONObject jSONObject3 = jSONObject2.getJSONObject(names[i2]);
                            jSONObject3.remove(ECommConstants.ATTACHMENT_TEXT);
                            jSONObject.put(names[i2], jSONObject3);
                        }
                    }
                    DataUtil.sifsLogger.debug("finalAttachObject -- " + jSONObject.toString());
                    put.addColumn(bytes, Bytes.toBytes(ECommConstants.ATTACHMENT.toUpperCase()), Bytes.toBytes(jSONObject.toString()));
                    return put;
                }
            });
            sifsLogger.info("Loaded attachment data");
            JavaRDD flatMap = javaRDD2.flatMap(new FlatMapFunction<Row, Row>() { // from class: com.ibm.sifs.ecomm.util.DataUtil.10
                public Iterator<Row> call(Row row) throws Exception {
                    ArrayList arrayList5 = new ArrayList();
                    if (arrayList.contains(ECommConstants.ENTITY_FEATURE)) {
                        String str = (String) row.getAs(ECommConstants.COMM_INIT_NAME);
                        Iterator it = row.getList(row.fieldIndex("current_attachment")).iterator();
                        while (it.hasNext()) {
                            JSONObject jSONObject = new JSONObject((String) it.next());
                            for (String str2 : JSONObject.getNames(jSONObject)) {
                                JSONObject jSONObject2 = jSONObject.getJSONObject(str2);
                                if (jSONObject2.has(ECommConstants.ENTITY_FEATURE)) {
                                    String string = jSONObject2.getString(ECommConstants.ENTITY_FEATURE);
                                    if (Utility.isNotEmpty(string) && Utility.isNotEmpty(str)) {
                                        JSONObject jSONObject3 = new JSONObject(string);
                                        String str3 = (String) row.getAs(ECommConstants.COMM_ID);
                                        JSONArray optJSONArray = jSONObject3.optJSONArray(ECommConstants.SERVICE_ENTITIES);
                                        for (int i2 = 0; i2 < optJSONArray.length(); i2++) {
                                            JSONObject jSONObject4 = optJSONArray.getJSONObject(i2);
                                            arrayList5.add(RowFactory.create(new Object[]{str3, ECommConstants.ENTITY_REL_TYPE_MENTIONS, str, ECommConstants.ENTITY_TYPE_PEOPLE, jSONObject4.getString(ECommConstants.SERVICE_KEY_NAME), jSONObject4.getString(ECommConstants.SERVICE_KEY_TYPE)}));
                                        }
                                    }
                                }
                            }
                        }
                    }
                    return arrayList5.iterator();
                }
            });
            ArrayList arrayList5 = new ArrayList();
            arrayList5.add(DataTypes.createStructField(ECommConstants.PDB_COMMUNICATION_ID, DataTypes.StringType, true));
            arrayList5.add(DataTypes.createStructField(ECommConstants.PDB_ENT_REL_TYPE, DataTypes.StringType, true));
            arrayList5.add(DataTypes.createStructField(ECommConstants.PDB_SOURCE_ENT, DataTypes.StringType, true));
            arrayList5.add(DataTypes.createStructField(ECommConstants.PDB_SOURCE_ENT_TYPE, DataTypes.StringType, true));
            arrayList5.add(DataTypes.createStructField(ECommConstants.PDB_TARGET_ENT, DataTypes.StringType, true));
            arrayList5.add(DataTypes.createStructField(ECommConstants.PDB_TARGET_ENT_TYPE, DataTypes.StringType, true));
            new DataFrameFunctions(dataset.sparkSession().createDataFrame(flatMap, DataTypes.createStructType(arrayList5))).saveToPhoenix(ECommConstants.PDB_COM_ENTITY_REL_TABLE, HbaseUtil.getHbaseConfiguration(), Option.apply(HbaseUtil.getZookeeperUrl(properties)), Option.apply(ECommConstants.EMPTY_STRING), false);
            sifsLogger.info("Loaded attachment entities");
            join.foreach(new ForeachFunction<Row>() { // from class: com.ibm.sifs.ecomm.util.DataUtil.11
                public void call(Row row) throws Exception {
                    List list = row.getList(row.fieldIndex("current_attachment"));
                    JSONArray jSONArray = new JSONArray();
                    Iterator it = list.iterator();
                    while (it.hasNext()) {
                        jSONArray.put(new JSONObject((String) it.next()));
                    }
                    JSONObject jSONObject = new JSONObject();
                    jSONObject.put("commid", (String) row.getAs(ECommConstants.COMM_ID));
                    jSONObject.put(ECommConstants.ATTACHMENT, jSONArray);
                    DataUtil.sifsLogger.debug("Attachment Service input " + jSONObject.toString());
                    long nanoTime = System.nanoTime();
                    if (SIFSRestClient.invokePOST(jWTToken, properties.getProperty("CreateAttachmentREST"), jSONObject.toString()) == null) {
                        DataUtil.sifsLogger.error("CDPJ0004E  Persisting data in Solr  COMMID " + jSONObject.get("commid"));
                    }
                    DataUtil.sifsLogger.info("Attachment service processing time " + (System.nanoTime() - nanoTime) + " ns");
                }
            });
        } catch (Exception e) {
            sifsLogger.error("Error while persisting comm data through service ", e);
        }
    }
}
