/*
 * Decompiled with CFR 0.152.
 */
package me.prettyprint.cassandra.connection;

import java.util.HashSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import me.prettyprint.cassandra.connection.HThriftClient;
import me.prettyprint.cassandra.connection.PoolMetric;
import me.prettyprint.cassandra.service.CassandraHost;
import me.prettyprint.hector.api.exceptions.HectorException;
import me.prettyprint.hector.api.exceptions.PoolExhaustedException;
import org.cliffc.high_scale_lib.NonBlockingHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Pool of {@link HThriftClient} connections to a single {@link CassandraHost}.
 *
 * <p>Idle clients are kept in a bounded, fair {@link ArrayBlockingQueue} whose
 * capacity is the host's {@code maxActive}. Clients that have been handed out
 * are tracked in {@link #activeClients}. Bookkeeping is done with atomic
 * counters:
 * <ul>
 *   <li>{@code numActive} - callers that are currently inside
 *       {@link #borrowClient()} or that hold a borrowed client;</li>
 *   <li>{@code numBlocked} - callers that are currently inside
 *       {@link #borrowClient()} and have not yet been given a client.</li>
 * </ul>
 * Both counters are restored on every exit path of {@link #borrowClient()},
 * including failures, so that the metrics exposed through {@link PoolMetric}
 * do not drift.
 */
public class ConcurrentHClientPool implements PoolMetric {
    private static final Logger log = LoggerFactory.getLogger(ConcurrentHClientPool.class);

    /** Poll interval used when waiting without a deadline, so shutdown is noticed. */
    private static final long RETRY_POLL_INTERVAL_MS = 100L;

    private final ArrayBlockingQueue<HThriftClient> availableClientQueue;
    private final NonBlockingHashSet<HThriftClient> activeClients;
    private final CassandraHost cassandraHost;
    private final AtomicInteger numActive;
    private final AtomicInteger numBlocked;
    private final AtomicBoolean active;
    /** Maximum wait in milliseconds for a client; {@code 0} means wait until one arrives. */
    private final long maxWaitTimeWhenExhausted;

    /**
     * Creates the pool and eagerly opens {@code maxActive / 3} clients.
     *
     * <p>A negative {@code maxWaitTimeWhenExhausted} on the host is treated as
     * {@code 0} (wait without a deadline). If opening one of the initial clients
     * fails, the clients opened so far are closed before the failure is rethrown.
     *
     * @param host the host this pool connects to
     */
    public ConcurrentHClientPool(CassandraHost host) {
        this.cassandraHost = host;
        this.availableClientQueue = new ArrayBlockingQueue<HThriftClient>(host.getMaxActive(), true);
        this.activeClients = new NonBlockingHashSet<HThriftClient>();
        this.numActive = new AtomicInteger();
        this.numBlocked = new AtomicInteger();
        this.active = new AtomicBoolean(true);
        this.maxWaitTimeWhenExhausted = Math.max(0L, host.getMaxWaitTimeWhenExhausted());

        int initialClients = host.getMaxActive() / 3;
        try {
            for (int i = 0; i < initialClients; ++i) {
                this.availableClientQueue.add(new HThriftClient(host).open());
            }
        } catch (RuntimeException e) {
            // Do not leave already-opened connections behind when construction fails.
            closeIdleClients();
            throw e;
        }
        if (log.isDebugEnabled()) {
            log.debug("Concurrent Host pool started with {} active clients; max: {} exhausted wait: {}", new Object[]{this.getNumIdle(), host.getMaxActive(), this.maxWaitTimeWhenExhausted});
        }
    }

    /**
     * Borrows a client from the pool, opening a new one if the pool is below
     * {@code maxActive}, and otherwise waiting for one to be released.
     *
     * <p>With {@code maxWaitTimeWhenExhausted == 0} the call waits until a client
     * arrives, the pool is shut down, or the thread is interrupted. Otherwise it
     * waits at most {@code maxWaitTimeWhenExhausted} milliseconds.
     *
     * @return a client that must be handed back via {@link #releaseClient(HThriftClient)}
     * @throws PoolExhaustedException if the configured wait elapsed without a client
     * @throws HectorException if the pool is inactive, or no client could be
     *         obtained because of shutdown or thread interruption (the interrupt
     *         status of the thread is preserved in that case)
     */
    public HThriftClient borrowClient() throws HectorException {
        if (!this.active.get()) {
            throw new HectorException("Attempt to borrow on in-active pool: " + this.getName());
        }
        int currentActive = this.numActive.incrementAndGet();
        int tillExhausted = this.cassandraHost.getMaxActive() - currentActive;
        this.numBlocked.incrementAndGet();
        boolean borrowed = false;
        try {
            HThriftClient cassandraClient = this.availableClientQueue.poll();
            if (cassandraClient == null) {
                // ">= 0" so that the caller occupying the last permitted slot may
                // still grow the pool; "> 0" would cap the pool at maxActive - 1.
                if (tillExhausted >= 0) {
                    this.addClientToPoolGently(new HThriftClient(this.cassandraHost).open());
                    log.debug("created new client. NumActive:{} untilExhausted: {}", currentActive, tillExhausted);
                }
                if (log.isDebugEnabled()) {
                    log.debug("blocking on queue - current block count {}", this.numBlocked.get());
                }
                cassandraClient = this.awaitClient();
            }
            if (cassandraClient == null) {
                throw new HectorException("HConnectionManager returned a null client after aquisition - are we shutting down?");
            }
            this.activeClients.add(cassandraClient);
            borrowed = true;
            return cassandraClient;
        } finally {
            // Single place where the counters are settled, whatever the exit path.
            this.numBlocked.decrementAndGet();
            if (!borrowed) {
                this.numActive.decrementAndGet();
            }
        }
    }

    /**
     * Waits on the idle queue according to {@code maxWaitTimeWhenExhausted}.
     *
     * @return a client, or {@code null} if the pool became inactive or the
     *         thread was interrupted while waiting
     * @throws PoolExhaustedException if the timed wait elapsed without a client
     */
    private HThriftClient awaitClient() throws HectorException {
        if (this.maxWaitTimeWhenExhausted == 0L) {
            HThriftClient polled = null;
            // Poll in short slices so that a shutdown is noticed promptly.
            while (polled == null && this.active.get()) {
                try {
                    polled = this.availableClientQueue.poll(RETRY_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
                } catch (InterruptedException ie) {
                    log.error("InterruptedException poll operation on retry forever", ie);
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            return polled;
        }
        try {
            HThriftClient polled = this.availableClientQueue.poll(this.maxWaitTimeWhenExhausted, TimeUnit.MILLISECONDS);
            if (polled == null) {
                throw new PoolExhaustedException(String.format("maxWaitTimeWhenExhausted exceeded for thread %s on host %s", Thread.currentThread().getName(), this.cassandraHost.getName()));
            }
            return polled;
        } catch (InterruptedException ie) {
            // Restore the flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /**
     * Marks the pool inactive and closes every idle client. Clients that are
     * currently borrowed are closed when they are released.
     *
     * @throws IllegalArgumentException if the pool has already been shut down
     *         (exception type kept for compatibility with existing callers)
     */
    void shutdown() {
        if (!this.active.compareAndSet(true, false)) {
            throw new IllegalArgumentException("shutdown() called for inactive pool: " + this.getName());
        }
        log.info("Shutdown triggered on {}", this.getName());
        closeIdleClients();
        log.info("Shutdown complete on {}", this.getName());
    }

    /**
     * Removes and closes every client currently in the idle queue. A failure to
     * close one client is logged and does not prevent closing the others.
     */
    private void closeIdleClients() {
        HThriftClient idle;
        while ((idle = this.availableClientQueue.poll()) != null) {
            try {
                idle.close();
            } catch (RuntimeException e) {
                log.warn("Failed to close idle client while draining pool", e);
            }
        }
    }

    /** @return the host this pool connects to */
    public CassandraHost getCassandraHost() {
        return this.cassandraHost;
    }

    /** @return a display name for this pool that includes the host name */
    @Override
    public String getName() {
        return String.format("<ConcurrentCassandraClientPoolByHost>:{%s}", this.cassandraHost.getName());
    }

    /** @return the current value of the active counter */
    @Override
    public int getNumActive() {
        return this.numActive.intValue();
    }

    /** @return {@code maxActive} minus the current active counter */
    public int getNumBeforeExhausted() {
        return this.cassandraHost.getMaxActive() - this.numActive.intValue();
    }

    /** @return the number of callers currently waiting inside {@link #borrowClient()} */
    @Override
    public int getNumBlockedThreads() {
        return this.numBlocked.intValue();
    }

    /** @return the number of clients currently sitting in the idle queue */
    @Override
    public int getNumIdle() {
        return this.availableClientQueue.size();
    }

    /** @return {@code true} when {@link #getNumBeforeExhausted()} is exactly zero */
    public boolean isExhausted() {
        return this.getNumBeforeExhausted() == 0;
    }

    /** @return the host's configured {@code maxActive} */
    public int getMaxActive() {
        return this.cassandraHost.getMaxActive();
    }

    /** @return {@code true} until {@link #shutdown()} has been called */
    @Override
    public boolean getIsActive() {
        return this.active.get();
    }

    /** @return a one-line summary of the pool name, state and counters */
    public String getStatusAsString() {
        return String.format("%s; IsActive?: %s; Active: %d; Blocked: %d; Idle: %d; NumBeforeExhausted: %d", this.getName(), this.getIsActive(), this.getNumActive(), this.getNumBlockedThreads(), this.getNumIdle(), this.getNumBeforeExhausted());
    }

    /**
     * Returns a previously borrowed client to the pool.
     *
     * <p>An open client goes back to the idle queue, or is closed if the pool has
     * been shut down. A client that is no longer open is dropped; a replacement
     * is opened only while the pool is still active, has room, and has callers
     * waiting, so that no connection is opened on a pool nobody will drain.
     *
     * @param client the client obtained from {@link #borrowClient()}
     */
    public void releaseClient(HThriftClient client) throws HectorException {
        this.activeClients.remove(client);
        this.numActive.decrementAndGet();
        boolean open = client.isOpen();
        if (open) {
            if (this.active.get()) {
                this.addClientToPoolGently(client);
            } else {
                log.info("Open client {} released to in-active pool for host {}. Closing.", client, this.cassandraHost);
                client.close();
            }
        } else if (this.active.get() && this.activeClients.size() < this.getMaxActive() && this.numBlocked.get() > 0) {
            this.addClientToPoolGently(new HThriftClient(this.cassandraHost).open());
        }
        if (log.isDebugEnabled()) {
            log.debug("Status of releaseClient {} to queue: {}", client.toString(), open);
        }
    }

    /**
     * Adds a client to the idle queue, closing it instead if the queue is
     * already at capacity.
     */
    private void addClientToPoolGently(HThriftClient client) {
        try {
            this.availableClientQueue.add(client);
        } catch (IllegalStateException ise) {
            log.error("Capacity hit adding client back to queue. Closing extra.");
            client.close();
        }
    }
}

