/*
 * Decompiled with CFR 0.152.
 */
package com.twosigma.beakerx.kernel.threads;

import com.twosigma.beakerx.jvm.object.SimpleEvaluationObject;
import com.twosigma.beakerx.kernel.KernelFunctionality;
import com.twosigma.beakerx.kernel.SocketEnum;
import com.twosigma.beakerx.kernel.msg.MessageCreator;
import com.twosigma.beakerx.kernel.msg.MessageHolder;
import com.twosigma.beakerx.kernel.threads.AbstractThread;
import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExecutionResultSender
implements Observer {
    public static Logger logger = LoggerFactory.getLogger(ExecutionResultSender.class);
    private final ConcurrentLinkedQueue<MessageHolder> messageQueue = new ConcurrentLinkedQueue();
    private AbstractThread workingThread;
    private KernelFunctionality kernel;

    public ExecutionResultSender(KernelFunctionality kernel) {
        this.kernel = kernel;
    }

    @Override
    public synchronized void update(Observable o, Object arg) {
        SimpleEvaluationObject seo = (SimpleEvaluationObject)o;
        if (seo != null) {
            this.messageQueue.addAll(MessageCreator.createMessage(seo));
            if (this.workingThread == null || !this.workingThread.isAlive()) {
                this.workingThread = new MessageRunnable();
                this.workingThread.start();
            }
        }
    }

    public void exit() {
        if (this.workingThread != null) {
            this.workingThread.halt();
        }
    }

    protected class MessageRunnable
    extends AbstractThread {
        protected MessageRunnable() {
        }

        @Override
        public boolean getRunning() {
            return this.running && !ExecutionResultSender.this.messageQueue.isEmpty();
        }

        @Override
        public void run() {
            while (this.getRunning()) {
                MessageHolder job = (MessageHolder)ExecutionResultSender.this.messageQueue.poll();
                if (job == null) continue;
                if (SocketEnum.IOPUB_SOCKET.equals((Object)job.getSocketType())) {
                    ExecutionResultSender.this.kernel.publish(job.getMessage());
                    continue;
                }
                if (!SocketEnum.SHELL_SOCKET.equals((Object)job.getSocketType())) continue;
                ExecutionResultSender.this.kernel.send(job.getMessage());
            }
            logger.debug("MessageRunnable shutdown.");
        }
    }
}

