/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.streaming;

import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.CompactionManager;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.streaming.PendingFile;
import org.apache.cassandra.streaming.StreamReply;
import org.apache.cassandra.utils.Pair;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Receiving side of a streaming session with a single remote host.
 *
 * <p>A session is identified by the pair (remote host, session id) and is kept in a static
 * registry until {@link #closeIfFinished()} observes that no pending files remain. While it is
 * open the session tracks the files still expected, the file currently being received, and the
 * futures of the SSTable builds submitted for files that have already arrived.
 *
 * <p>NOTE(review): only the static registry is a concurrent map; the per-session state
 * ({@code files}, {@code buildFutures}, {@code current}, {@code cfs}) is not synchronized here.
 * Confirm the callers' threading model before relying on concurrent access.
 */
public class StreamInSession {
    private static final Logger logger = LoggerFactory.getLogger(StreamInSession.class);

    /** Registry of open sessions, keyed by (remote host, session id). */
    private static final ConcurrentMap<Pair<InetAddress, Long>, StreamInSession> sessions =
            new NonBlockingHashMap<Pair<InetAddress, Long>, StreamInSession>();

    /** Files announced by the remote host that have not been acknowledged as finished yet. */
    private final List<PendingFile> files = new ArrayList<PendingFile>();
    /** (remote host, session id) identifying this session. */
    private final Pair<InetAddress, Long> context;
    /** Optional hook run once the session has completed; may be {@code null}. */
    private final Runnable callback;
    /** Set through {@link #setTable(String)}; not read anywhere in this class. */
    private String table;
    /** Pending SSTable builds, one per file passed to {@link #finished}. */
    private final List<Future<SSTableReader>> buildFutures = new ArrayList<Future<SSTableReader>>();
    /** Column family store resolved from the first file passed to {@link #addFiles}. */
    private ColumnFamilyStore cfs;
    /** File currently being received, or {@code null} when none is in flight. */
    private PendingFile current;

    private StreamInSession(Pair<InetAddress, Long> context, Runnable callback) {
        this.context = context;
        this.callback = callback;
    }

    /**
     * Creates and registers a new session for {@code host}, using {@link System#nanoTime()} as
     * the session id.
     *
     * @param host     remote host the data will be streamed from
     * @param callback run when the session completes; may be {@code null}
     * @return the newly registered session
     */
    public static StreamInSession create(InetAddress host, Runnable callback) {
        Pair<InetAddress, Long> context = new Pair<InetAddress, Long>(host, System.nanoTime());
        StreamInSession session = new StreamInSession(context, callback);
        sessions.put(context, session);
        return session;
    }

    /**
     * Returns the session registered for (host, sessionId), registering a new callback-less
     * session if none exists yet.
     *
     * @param host      remote host of the session
     * @param sessionId id of the session
     * @return the existing or newly registered session, never {@code null}
     */
    public static StreamInSession get(InetAddress host, long sessionId) {
        Pair<InetAddress, Long> context = new Pair<InetAddress, Long>(host, sessionId);
        StreamInSession session = sessions.get(context);
        if (session == null) {
            StreamInSession possibleNew = new StreamInSession(context, null);
            // putIfAbsent returns the previous mapping, so a concurrent registration wins.
            session = sessions.putIfAbsent(context, possibleNew);
            if (session == null) {
                session = possibleNew;
            }
        }
        return session;
    }

    /**
     * Records the file that is currently being received.
     *
     * @param file the in-flight file
     */
    public void setCurrentFile(PendingFile file) {
        this.current = file;
    }

    /**
     * Stores the table name associated with this session.
     *
     * @param table table name
     */
    public void setTable(String table) {
        this.table = table;
    }

    /**
     * Appends {@code files} to the list of files expected in this session. The column family
     * store is resolved from the first file seen and is not changed afterwards.
     *
     * @param files files announced by the remote host
     */
    public void addFiles(Collection<PendingFile> files) {
        for (PendingFile file : files) {
            if (logger.isDebugEnabled()) {
                logger.debug("Adding file {} to Stream Request queue", file.getFilename());
            }
            this.files.add(file);
            if (this.cfs == null) {
                this.cfs = Table.open(file.desc.ksname).getColumnFamilyStore(file.desc.cfname);
            }
        }
    }

    /**
     * Marks {@code remoteFile} as received: submits an SSTable build for {@code localFile},
     * removes the remote file from the pending list and sends a FILE_FINISHED reply to the
     * remote host.
     *
     * @param remoteFile the file as named by the sender
     * @param localFile  the local copy whose descriptor is submitted for building
     * @throws IOException declared by the signature; presumably raised while creating the reply
     *                     message
     */
    public void finished(PendingFile remoteFile, PendingFile localFile) throws IOException {
        // Log the peer address: this class defines no toString(), so logging `this` would only
        // print an identity hash.
        logger.debug("Finished {}. Sending ack to {}", remoteFile, this.getHost());

        Future<SSTableReader> future = CompactionManager.instance.submitSSTableBuild(localFile.desc);
        this.buildFutures.add(future);
        this.files.remove(remoteFile);
        if (remoteFile.equals(this.current)) {
            this.current = null;
        }
        StreamReply reply = new StreamReply(remoteFile.getFilename(), this.getSessionId(), StreamReply.Status.FILE_FINISHED);
        MessagingService.instance.sendOneWay(reply.createMessage(), this.getHost());
    }

    /**
     * Asks the remote host to resend {@code remoteFile} by sending a FILE_RETRY reply.
     *
     * @param remoteFile the file whose transfer failed
     * @throws IOException declared by the signature; presumably raised while creating the reply
     *                     message
     */
    public void retry(PendingFile remoteFile) throws IOException {
        StreamReply reply = new StreamReply(remoteFile.getFilename(), this.getSessionId(), StreamReply.Status.FILE_RETRY);
        logger.info("Streaming of file {} from {} failed: requesting a retry.", remoteFile, this.getHost());
        MessagingService.instance.sendOneWay(reply.createMessage(), this.getHost());
    }

    /**
     * Completes the session if no files are pending; otherwise does nothing.
     *
     * <p>Completion waits for every submitted SSTable build, adds the resulting SSTables to the
     * column family store, builds secondary indexes when the store has indexed columns, sends a
     * SESSION_FINISHED reply, runs the callback (if any) and unregisters the session.
     *
     * @throws IOException      declared by the signature; presumably raised while creating the
     *                          reply message
     * @throws RuntimeException wrapping the {@link ExecutionException} of a failed build
     * @throws AssertionError   if the thread is interrupted while waiting for a build; the
     *                          interrupt flag is restored before throwing
     */
    public void closeIfFinished() throws IOException {
        if (!this.files.isEmpty()) {
            return;
        }

        // NOTE(review): every SSTable is added to the store of the first file passed to
        // addFiles(); verify that a session never spans more than one column family.
        List<SSTableReader> sstables = new ArrayList<SSTableReader>(this.buildFutures.size());
        for (Future<SSTableReader> future : this.buildFutures) {
            try {
                SSTableReader sstable = future.get();
                if (sstable == null) {
                    continue;
                }
                this.cfs.addSSTable(sstable);
                sstables.add(sstable);
            }
            catch (InterruptedException e) {
                // Preserve the interrupt status for code further up the stack.
                Thread.currentThread().interrupt();
                throw new AssertionError(e);
            }
            catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        }

        if (this.cfs != null && !this.cfs.getIndexedColumns().isEmpty()) {
            this.cfs.buildSecondaryIndexes(sstables, this.cfs.getIndexedColumns());
        }

        StreamReply reply = new StreamReply("", this.getSessionId(), StreamReply.Status.SESSION_FINISHED);
        logger.info("Finished streaming session {} from {}", this.getSessionId(), this.getHost());
        MessagingService.instance.sendOneWay(reply.createMessage(), this.getHost());

        try {
            if (this.callback != null) {
                this.callback.run();
            }
        }
        finally {
            // Unregister even when the callback throws, so the static registry does not keep a
            // completed session forever.
            sessions.remove(this.context);
        }
    }

    /**
     * @return the id of this session
     */
    public long getSessionId() {
        return this.context.right;
    }

    /**
     * @return the remote host this session receives data from
     */
    public InetAddress getHost() {
        return this.context.left;
    }

    /**
     * @return the remote hosts of all currently registered sessions, as a new set
     */
    public static Set<InetAddress> getSources() {
        Set<InetAddress> sources = new HashSet<InetAddress>();
        for (StreamInSession session : sessions.values()) {
            sources.add(session.getHost());
        }
        return sources;
    }

    /**
     * Collects the files still expected from {@code host} across all of its registered
     * sessions, including each session's in-flight file when one is set.
     *
     * @param host remote host to look up
     * @return a new set of pending files; empty when the host has no registered session
     */
    public static Set<PendingFile> getIncomingFiles(InetAddress host) {
        Set<PendingFile> incoming = new HashSet<PendingFile>();
        for (Map.Entry<Pair<InetAddress, Long>, StreamInSession> entry : sessions.entrySet()) {
            if (!entry.getKey().left.equals(host)) {
                continue;
            }
            StreamInSession session = entry.getValue();
            incoming.addAll(session.files);
            if (session.current != null) {
                incoming.add(session.current);
            }
        }
        return incoming;
    }
}

