/*
 * Decompiled with CFR 0.152.
 */
package com.twosigma.beakerx.widget;

import com.twosigma.beakerx.TryResult;
import com.twosigma.beakerx.jvm.object.SimpleEvaluationObject;
import com.twosigma.beakerx.kernel.KernelFunctionality;
import com.twosigma.beakerx.kernel.KernelManager;
import com.twosigma.beakerx.kernel.PlainCode;
import com.twosigma.beakerx.message.Message;
import com.twosigma.beakerx.widget.SparkManager;
import com.twosigma.beakerx.widget.SparkUIManager;
import com.twosigma.beakerx.widget.SparkVariable;
import java.lang.reflect.Field;
import java.util.UUID;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerInterface;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerStageCompleted;
import org.apache.spark.scheduler.SparkListenerStageSubmitted;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
import org.apache.spark.scheduler.SparkListenerTaskStart;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import scala.collection.mutable.HashMap;

public class SparkManagerImpl
implements SparkManager {
    public static final String SPARK_APP_NAME = "spark.app.name";
    public static final String SPARK_CORES_MAX = "spark.cores.max";
    private SparkUIManager sparkContextManager;
    private SparkSession.Builder sparkSessionBuilder;

    private SparkManagerImpl(SparkSession.Builder sparkSessionBuilder) {
        this.sparkSessionBuilder = sparkSessionBuilder;
    }

    @Override
    public TryResult configure(KernelFunctionality kernel, SparkUIManager sparkContextManager) {
        this.sparkContextManager = sparkContextManager;
        SparkConf sparkConf = this.configureSparkConf(this.getSparkConf());
        this.sparkSessionBuilder.config(sparkConf);
        SparkSession sparkSession = this.getOrCreate();
        this.addListener(this.getOrCreate().sparkContext());
        SparkVariable.putSparkContext(this.getOrCreate().sparkContext());
        SparkVariable.putSparkSession(sparkSession);
        TryResult tryResultSparkContext = this.initSparkContextInShell(kernel);
        if (!tryResultSparkContext.isError()) {
            kernel.registerCancelHook(SparkVariable::cancelAllJobs);
        }
        return tryResultSparkContext;
    }

    @Override
    public SparkSession getOrCreate() {
        return this.sparkSessionBuilder.getOrCreate();
    }

    @Override
    public SparkContext sparkContext() {
        return this.getOrCreate().sparkContext();
    }

    @Override
    public SparkSession.Builder getBuilder() {
        return this.sparkSessionBuilder;
    }

    private TryResult initSparkContextInShell(KernelFunctionality kernel) {
        String addSc = String.format("import com.twosigma.beakerx.widget.SparkVariable\nval %s = SparkVariable.getSparkSession()\nimport org.apache.spark.SparkContext._\nimport %s.implicits._\nimport %s.sql\nimport org.apache.spark.sql.functions._\n", "spark", "spark", "spark");
        SimpleEvaluationObject seo = PlainCode.createSimpleEvaluationObject((String)addSc, (KernelFunctionality)kernel, (Message)new Message(), (int)1);
        return kernel.executeCode(addSc, seo);
    }

    private SparkConf createSparkConf() {
        SparkConf sparkConf = new SparkConf();
        try {
            Field options = this.sparkSessionBuilder.getClass().getDeclaredField("org$apache$spark$sql$SparkSession$Builder$$options");
            options.setAccessible(true);
            for (Tuple2 x : (HashMap)options.get(this.sparkSessionBuilder)) {
                sparkConf.set((String)x._1, (String)x._2);
            }
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        return sparkConf;
    }

    @Override
    public SparkConf getSparkConf() {
        return this.createSparkConf();
    }

    private SparkConf configureSparkConf(SparkConf sparkConf) {
        if (!sparkConf.contains(SPARK_APP_NAME)) {
            sparkConf.setAppName("beaker_" + UUID.randomUUID().toString());
        }
        if (this.sparkContextManager.getMasterURL().getValue() != null && !((String)((Object)this.sparkContextManager.getMasterURL().getValue())).isEmpty()) {
            sparkConf.set("spark.master", (String)((Object)this.sparkContextManager.getMasterURL().getValue()));
        }
        if (!SparkManagerImpl.isLocalSpark(sparkConf)) {
            sparkConf.set("spark.repl.class.outputDir", KernelManager.get().getOutDir());
        }
        if (this.sparkContextManager.getExecutorMemory().getValue() != null && !((String)((Object)this.sparkContextManager.getExecutorMemory().getValue())).isEmpty()) {
            sparkConf.set("spark.executor.memory", (String)((Object)this.sparkContextManager.getExecutorMemory().getValue()));
        }
        if (this.sparkContextManager.getExecutorCores().getValue() != null && !((String)((Object)this.sparkContextManager.getExecutorCores().getValue())).isEmpty()) {
            sparkConf.set("spark.executor.cores", (String)((Object)this.sparkContextManager.getExecutorCores().getValue()));
        }
        if (!sparkConf.contains(SPARK_CORES_MAX)) {
            sparkConf.set(SPARK_CORES_MAX, "100");
        }
        return sparkConf;
    }

    private SparkContext addListener(SparkContext sc) {
        sc.addSparkListener((SparkListenerInterface)new SparkListener(){

            public void onJobStart(SparkListenerJobStart jobStart) {
                super.onJobStart(jobStart);
            }

            public void onJobEnd(SparkListenerJobEnd jobEnd) {
                super.onJobEnd(jobEnd);
            }

            public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
                super.onStageSubmitted(stageSubmitted);
                SparkManagerImpl.this.sparkContextManager.startStage(stageSubmitted.stageInfo().stageId(), stageSubmitted.stageInfo().numTasks());
            }

            public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
                super.onStageCompleted(stageCompleted);
                SparkManagerImpl.this.sparkContextManager.endStage(stageCompleted.stageInfo().stageId());
            }

            public void onTaskStart(SparkListenerTaskStart taskStart) {
                super.onTaskStart(taskStart);
                SparkManagerImpl.this.sparkContextManager.taskStart(taskStart.stageId(), taskStart.taskInfo().taskId());
            }

            public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
                super.onTaskEnd(taskEnd);
                if (taskEnd.reason().toString().equals("Success")) {
                    SparkManagerImpl.this.sparkContextManager.taskEnd(taskEnd.stageId(), taskEnd.taskInfo().taskId());
                }
            }
        });
        return sc;
    }

    private static boolean isLocalSpark(SparkConf sparkConf) {
        return sparkConf.contains("spark.master") && sparkConf.get("spark.master") != null && sparkConf.get("spark.master").startsWith("local");
    }

    public static class SparkManagerFactoryImpl
    implements SparkManager.SparkManagerFactory {
        @Override
        public SparkManager create(SparkSession.Builder sparkSessionBuilder) {
            return new SparkManagerImpl(sparkSessionBuilder);
        }
    }
}

