/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.service;

import com.google.common.base.Charsets;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import java.io.IOError;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.concurrent.RetryingScheduledThreadPoolExecutor;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.config.RawColumnDefinition;
import org.apache.cassandra.config.RawColumnFamily;
import org.apache.cassandra.config.RawKeyspace;
import org.apache.cassandra.db.BinaryVerbHandler;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.DefinitionsAnnounceVerbHandler;
import org.apache.cassandra.db.DefinitionsUpdateResponseVerbHandler;
import org.apache.cassandra.db.DefsTable;
import org.apache.cassandra.db.HintedHandOffManager;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadRepairVerbHandler;
import org.apache.cassandra.db.ReadVerbHandler;
import org.apache.cassandra.db.Row;
import org.apache.cassandra.db.RowMutationVerbHandler;
import org.apache.cassandra.db.SchemaCheckVerbHandler;
import org.apache.cassandra.db.SystemTable;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.TruncateVerbHandler;
import org.apache.cassandra.db.migration.AddKeyspace;
import org.apache.cassandra.db.migration.Migration;
import org.apache.cassandra.dht.BootStrapper;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.GossipDigestAck2VerbHandler;
import org.apache.cassandra.gms.GossipDigestAckVerbHandler;
import org.apache.cassandra.gms.GossipDigestSynVerbHandler;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
import org.apache.cassandra.gms.IFailureDetector;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.io.DeletionService;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.net.IAsyncResult;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.ResponseVerbHandler;
import org.apache.cassandra.service.AntiEntropyService;
import org.apache.cassandra.service.ConsistencyChecker;
import org.apache.cassandra.service.GCInspector;
import org.apache.cassandra.service.IndexScanVerbHandler;
import org.apache.cassandra.service.MigrationManager;
import org.apache.cassandra.service.RangeSliceVerbHandler;
import org.apache.cassandra.service.StorageLoadBalancer;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageServiceMBean;
import org.apache.cassandra.streaming.ReplicationFinishedVerbHandler;
import org.apache.cassandra.streaming.StreamIn;
import org.apache.cassandra.streaming.StreamOut;
import org.apache.cassandra.streaming.StreamReplyVerbHandler;
import org.apache.cassandra.streaming.StreamRequestVerbHandler;
import org.apache.cassandra.streaming.StreamingService;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.SkipNullRepresenter;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Dumper;
import org.yaml.snakeyaml.DumperOptions;
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.nodes.Tag;
import org.yaml.snakeyaml.representer.Representer;

public class StorageService
implements IEndpointStateChangeSubscriber,
StorageServiceMBean {
    private static Logger logger_ = LoggerFactory.getLogger(StorageService.class);
    // Same value (30000) that bootstrap() sleeps for; presumably milliseconds -- TODO confirm against other users.
    public static final int RING_DELAY = 30000;
    // Snapshot of every Verb constant, in declaration order (Verb.values()).
    public static final Verb[] VERBS = Verb.values();
    // Associates each messaging verb with the Stage it is assigned to.
    // NOTE(review): presumably the stage on which the verb's handler is executed -- confirm against MessagingService.
    public static final EnumMap<Verb, Stage> verbStages = new EnumMap<Verb, Stage>(Verb.class){
        {
            this.put(Verb.MUTATION, Stage.MUTATION);
            this.put(Verb.BINARY, Stage.MUTATION);
            this.put(Verb.READ_REPAIR, Stage.MUTATION);
            this.put(Verb.READ, Stage.READ);
            this.put(Verb.REQUEST_RESPONSE, Stage.REQUEST_RESPONSE);
            this.put(Verb.STREAM_REPLY, Stage.MISC);
            this.put(Verb.STREAM_REQUEST, Stage.STREAM);
            this.put(Verb.RANGE_SLICE, Stage.READ);
            this.put(Verb.BOOTSTRAP_TOKEN, Stage.MISC);
            this.put(Verb.TREE_REQUEST, Stage.ANTI_ENTROPY);
            this.put(Verb.TREE_RESPONSE, Stage.ANTI_ENTROPY);
            this.put(Verb.GOSSIP_DIGEST_ACK, Stage.GOSSIP);
            this.put(Verb.GOSSIP_DIGEST_ACK2, Stage.GOSSIP);
            this.put(Verb.GOSSIP_DIGEST_SYN, Stage.GOSSIP);
            this.put(Verb.DEFINITIONS_ANNOUNCE, Stage.READ);
            this.put(Verb.DEFINITIONS_UPDATE_RESPONSE, Stage.READ);
            this.put(Verb.TRUNCATE, Stage.MUTATION);
            this.put(Verb.SCHEMA_CHECK, Stage.MIGRATION);
            this.put(Verb.INDEX_SCAN, Stage.READ);
            this.put(Verb.REPLICATION_FINISHED, Stage.MISC);
            this.put(Verb.INTERNAL_RESPONSE, Stage.INTERNAL_RESPONSE);
        }
    };
    public static final RetryingScheduledThreadPoolExecutor scheduledTasks = new RetryingScheduledThreadPoolExecutor("ScheduledTasks");
    // Partitioner taken from the configuration once, at class-initialisation time.
    private static IPartitioner partitioner_ = DatabaseDescriptor.getPartitioner();
    // Builds the VersionedValue payloads published through gossip (see setToken() and bootstrap()).
    public static VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner_);
    // Singleton. Static initialisers run in textual order, so partitioner_ and valueFactory
    // are already assigned when the constructor executes; keep this declaration below them.
    public static final StorageService instance = new StorageService();
    // This node's view of which endpoint owns which token.
    private TokenMetadata tokenMetadata_ = new TokenMetadata();
    // Executor that doConsistencyCheck() submits ConsistencyChecker tasks to.
    private ExecutorService consistencyManager_ = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getConsistencyThreads(), DatabaseDescriptor.getConsistencyThreads(), 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("ReadRepair"), "request");
    // NOTE(review): replicatingNodes and removingNode are not referenced in the visible part of the
    // file; presumably they track an in-progress token removal -- confirm.
    private Set<InetAddress> replicatingNodes;
    private InetAddress removingNode;
    // Set by bootstrap(), cleared by finishBootstrapping().
    private boolean isBootstrapMode;
    // true after initClient(), false after initServer(); switching between the two is rejected.
    private boolean isClientMode;
    // Set by the first successful entry into initClient() or initServer().
    private boolean initialized;
    // Human-readable state string last passed to setMode().
    private String operationMode;
    // Registered with the Gossiper in initServer() and unregistered in stopClient().
    private MigrationManager migrationManager = new MigrationManager();
    // NOTE(review): not referenced in the visible part of the file; purpose to be confirmed.
    private volatile int totalCFs;
    private volatile int remainingCFs;

    /**
     * Returns the partitioner that was read from the configuration when this class was initialised.
     *
     * @return the shared partitioner instance
     */
    public static IPartitioner getPartitioner() {
        return StorageService.partitioner_;
    }

    /**
     * Looks up the ranges of the given table for the local node.
     *
     * @param table name of the table (keyspace) to query
     * @return whatever {@code getRangesForEndpoint} reports for the local address
     */
    public Collection<Range> getLocalRanges(String table) {
        InetAddress self = FBUtilities.getLocalAddress();
        return this.getRangesForEndpoint(table, self);
    }

    /**
     * Looks up the primary range of the local node.
     *
     * @return whatever {@code getPrimaryRangeForEndpoint} reports for the local address
     */
    public Range getLocalPrimaryRange() {
        InetAddress self = FBUtilities.getLocalAddress();
        return this.getPrimaryRangeForEndpoint(self);
    }

    /**
     * Leaves bootstrap mode: clears the bootstrap flag, re-publishes the local token
     * through {@link #setToken(Token)} and logs completion.
     */
    public void finishBootstrapping() {
        this.isBootstrapMode = false;
        Token localToken = this.getLocalToken();
        this.setToken(localToken);
        logger_.info("Bootstrap/move completed! Now serving reads.");
    }

    /**
     * Makes {@code token} this node's token: persists it in the system table, records it in
     * the token metadata for the local address, gossips a "normal" status built from the
     * local token, and switches the operation mode to "Normal" (without logging the mode).
     *
     * @param token the token to adopt
     */
    public void setToken(Token token) {
        boolean debugOn = logger_.isDebugEnabled();
        if (debugOn) {
            logger_.debug("Setting token to {}", (Object)token);
        }
        SystemTable.updateToken(token);
        InetAddress self = FBUtilities.getLocalAddress();
        this.tokenMetadata_.updateNormalToken(token, self);
        Gossiper.instance.addLocalApplicationState(
                ApplicationState.STATUS,
                valueFactory.normal(this.getLocalToken()));
        this.setMode("Normal", false);
    }

    /**
     * Registers this service as a JMX MBean and installs one handler per messaging verb.
     *
     * @throws RuntimeException if the MBean cannot be registered, or if the streaming
     *         service singleton is {@code null}
     */
    public StorageService() {
        MBeanServer platformServer = ManagementFactory.getPlatformMBeanServer();
        try {
            ObjectName mbeanName = new ObjectName("org.apache.cassandra.db:type=StorageService");
            platformServer.registerMBean(this, mbeanName);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }

        // Handlers are registered in the same order as before; only the receiver is aliased.
        MessagingService messaging = MessagingService.instance;
        messaging.registerVerbHandlers(Verb.BINARY, new BinaryVerbHandler());
        messaging.registerVerbHandlers(Verb.MUTATION, new RowMutationVerbHandler());
        messaging.registerVerbHandlers(Verb.READ_REPAIR, new ReadRepairVerbHandler());
        messaging.registerVerbHandlers(Verb.READ, new ReadVerbHandler());
        messaging.registerVerbHandlers(Verb.RANGE_SLICE, new RangeSliceVerbHandler());
        messaging.registerVerbHandlers(Verb.INDEX_SCAN, new IndexScanVerbHandler());
        messaging.registerVerbHandlers(Verb.BOOTSTRAP_TOKEN, new BootStrapper.BootstrapTokenVerbHandler());
        messaging.registerVerbHandlers(Verb.STREAM_REQUEST, new StreamRequestVerbHandler());
        messaging.registerVerbHandlers(Verb.STREAM_REPLY, new StreamReplyVerbHandler());
        messaging.registerVerbHandlers(Verb.REPLICATION_FINISHED, new ReplicationFinishedVerbHandler());
        messaging.registerVerbHandlers(Verb.REQUEST_RESPONSE, new ResponseVerbHandler());
        messaging.registerVerbHandlers(Verb.INTERNAL_RESPONSE, new ResponseVerbHandler());
        messaging.registerVerbHandlers(Verb.TREE_REQUEST, new AntiEntropyService.TreeRequestVerbHandler());
        messaging.registerVerbHandlers(Verb.TREE_RESPONSE, new AntiEntropyService.TreeResponseVerbHandler());
        messaging.registerVerbHandlers(Verb.GOSSIP_DIGEST_SYN, new GossipDigestSynVerbHandler());
        messaging.registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK, new GossipDigestAckVerbHandler());
        messaging.registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK2, new GossipDigestAck2VerbHandler());
        messaging.registerVerbHandlers(Verb.DEFINITIONS_ANNOUNCE, new DefinitionsAnnounceVerbHandler());
        messaging.registerVerbHandlers(Verb.DEFINITIONS_UPDATE_RESPONSE, new DefinitionsUpdateResponseVerbHandler());
        messaging.registerVerbHandlers(Verb.TRUNCATE, new TruncateVerbHandler());
        messaging.registerVerbHandlers(Verb.SCHEMA_CHECK, new SchemaCheckVerbHandler());

        if (StreamingService.instance == null) {
            throw new RuntimeException("Streaming service is unavailable.");
        }
    }

    /**
     * Shuts down a client-mode node: unregisters the migration manager and this service
     * from the gossiper, stops gossip, then shuts down messaging and all stages.
     */
    public void stopClient() {
        Gossiper gossiper = Gossiper.instance;
        gossiper.unregister(this.migrationManager);
        gossiper.unregister(this);
        gossiper.stop();

        MessagingService.shutdown();
        StageManager.shutdownNow();
    }

    /**
     * @return {@code true} once {@code initClient()} or {@code initServer()} has marked
     *         this service as initialised
     */
    public boolean isInitialized() {
        boolean started = this.initialized;
        return started;
    }

    /**
     * Starts this node in client mode: registers with gossip, starts gossiping and
     * listening on the local address, waits five seconds, then announces the local
     * schema definitions version to the seeds.
     *
     * <p>Calling it again after a successful client initialisation is a no-op.
     *
     * @throws UnsupportedOperationException if the service was already initialised in server mode
     * @throws IOError if the thread is interrupted during the five-second wait; the
     *         thread's interrupt status is restored before the error is thrown
     * @throws IOException declared for callers; propagated from the calls made here
     */
    public synchronized void initClient() throws IOException {
        if (this.initialized) {
            if (!this.isClientMode) {
                throw new UnsupportedOperationException("StorageService does not support switching modes.");
            }
            return;
        }
        this.initialized = true;
        this.isClientMode = true;
        logger_.info("Starting up client gossip");
        this.setMode("Client", false);
        Gossiper.instance.register(this);
        // The gossip generation is the current wall-clock time in seconds.
        Gossiper.instance.start(FBUtilities.getLocalAddress(), (int)(System.currentTimeMillis() / 1000L));
        MessagingService.instance.listen(FBUtilities.getLocalAddress());
        try {
            // NOTE(review): presumably gives gossip time to settle before announcing -- confirm.
            Thread.sleep(5000L);
        }
        catch (InterruptedException ex) {
            // Thread.sleep can only fail with an interrupt here; keep the flag visible
            // to callers instead of losing it inside the wrapping IOError.
            Thread.currentThread().interrupt();
            throw new IOError(ex);
        }
        MigrationManager.announce(DatabaseDescriptor.getDefsVersion(), DatabaseDescriptor.getSeeds());
    }

    /**
     * Starts this node in server mode.
     *
     * <p>Sequence: log version information, start the GC inspector (best effort), optionally
     * reload the persisted ring state, start gossip / messaging / load broadcasting, announce
     * the schema version to the seeds, then decide on a token either by bootstrapping or by
     * using a saved / configured / random token, and finally publish that token via
     * {@link #setToken(Token)}.
     *
     * <p>Calling it again after a successful server initialisation only repeats the two
     * version log lines and returns.
     *
     * @throws UnsupportedOperationException if the service was already initialised in client
     *         mode, or if auto-bootstrap is attempted while this node is already a ring member
     * @throws IOException declared for callers; propagated from the calls made here
     * @throws ConfigurationException declared for callers; propagated from the calls made here
     */
    public synchronized void initServer() throws IOException, ConfigurationException {
        Token token;
        // Note: these two lines are logged even when the method returns early below.
        logger_.info("Cassandra version: " + FBUtilities.getReleaseVersionString());
        logger_.info("Thrift API version: 19.4.0");
        if (this.initialized) {
            if (this.isClientMode) {
                throw new UnsupportedOperationException("StorageService does not support switching modes.");
            }
            return;
        }
        this.initialized = true;
        this.isClientMode = false;
        try {
            GCInspector.instance.start();
        }
        catch (Throwable t) {
            // Deliberately best effort: any failure is reduced to a warning and start-up continues.
            logger_.warn("Unable to start GCInspector (currently only supported on the Sun JVM)");
        }
        // System property cassandra.load_ring_state (default "true") controls whether the
        // tokens saved in the system table are replayed into the token metadata and gossiper.
        if (Boolean.parseBoolean(System.getProperty("cassandra.load_ring_state", "true"))) {
            logger_.info("Loading persisted ring state");
            for (Map.Entry<Token, InetAddress> entry : SystemTable.loadTokens().entrySet()) {
                this.tokenMetadata_.updateNormalToken(entry.getKey(), entry.getValue());
                Gossiper.instance.addSavedEndpoint(entry.getValue());
            }
        }
        logger_.info("Starting up server gossip");
        Gossiper.instance.register(this);
        Gossiper.instance.register(this.migrationManager);
        Gossiper.instance.start(FBUtilities.getLocalAddress(), SystemTable.incrementAndGetGeneration());
        MessagingService.instance.listen(FBUtilities.getLocalAddress());
        StorageLoadBalancer.instance.startBroadcasting();
        MigrationManager.announce(DatabaseDescriptor.getDefsVersion(), DatabaseDescriptor.getSeeds());
        // Informational only: a seed that has never bootstrapped falls through to the else-branch below.
        if (DatabaseDescriptor.isAutoBootstrap() && DatabaseDescriptor.getSeeds().contains(FBUtilities.getLocalAddress()) && !SystemTable.isBootstrapped()) {
            logger_.info("This node will not auto bootstrap because it is configured to be a seed node.");
        }
        // Bootstrap path: auto-bootstrap enabled, not a seed, and never bootstrapped before.
        if (DatabaseDescriptor.isAutoBootstrap() && !DatabaseDescriptor.getSeeds().contains(FBUtilities.getLocalAddress()) && !SystemTable.isBootstrapped()) {
            this.setMode("Joining: getting load information", true);
            StorageLoadBalancer.instance.waitForLoadInfo();
            if (logger_.isDebugEnabled()) {
                logger_.debug("... got load info");
            }
            if (this.tokenMetadata_.isMember(FBUtilities.getLocalAddress())) {
                String s = "This node is already a member of the token ring; bootstrap aborted. (If replacing a dead node, remove the old one from the ring first.)";
                throw new UnsupportedOperationException(s);
            }
            this.setMode("Joining: getting bootstrap token", true);
            token = BootStrapper.getBootstrapToken(this.tokenMetadata_, StorageLoadBalancer.instance.getLoadInfo());
            // With no non-system tables, bootstrap(token) is skipped and the token is simply adopted below.
            if (DatabaseDescriptor.getNonSystemTables().size() > 0) {
                this.bootstrap(token);
                // The assertion requires the bootstrap flag to be cleared by the time bootstrap() returns.
                assert (!this.isBootstrapMode);
            }
        } else {
            // Non-bootstrap path: saved token first, then the configured initial token, then a random one.
            token = SystemTable.getSavedToken();
            if (token == null) {
                String initialToken = DatabaseDescriptor.getInitialToken();
                if (initialToken == null) {
                    token = partitioner_.getRandomToken();
                    logger_.warn("Generated random token " + token + ". Random tokens will result in an unbalanced ring; see http://wiki.apache.org/cassandra/Operations");
                } else {
                    token = partitioner_.getTokenFactory().fromString(initialToken);
                    logger_.info("Saved token not found. Using " + token + " from configuration");
                }
            } else {
                logger_.info("Using saved token " + token);
            }
        }
        // Both paths end by marking the node bootstrapped and publishing the chosen token.
        SystemTable.setBootstrapped(true);
        this.setToken(token);
        assert (this.tokenMetadata_.sortedTokens().size() > 0);
    }

    /**
     * Records the current operation mode and optionally logs it at info level.
     *
     * @param m   the new operation mode text
     * @param log whether to also write {@code m} to the log
     */
    private void setMode(String m, boolean log) {
        this.operationMode = m;
        if (!log) {
            return;
        }
        logger_.info(m);
    }

    /**
     * Bootstraps this node at {@code token}: sets the bootstrap flag, persists the token,
     * gossips a "bootstrapping" status, waits {@link #RING_DELAY} before continuing, and
     * then runs a {@link BootStrapper} for the local address.
     *
     * @param token the token this node is bootstrapping to
     * @throws AssertionError if the thread is interrupted during the wait; the thread's
     *         interrupt status is restored before the error is thrown
     * @throws IOException declared for callers; propagated from the calls made here
     */
    private void bootstrap(Token token) throws IOException {
        this.isBootstrapMode = true;
        SystemTable.updateToken(token);
        Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.bootstrapping(token));
        // Message and sleep both derive from RING_DELAY so they cannot drift apart;
        // with RING_DELAY == 30000 the logged text is unchanged.
        this.setMode("Joining: sleeping " + RING_DELAY + " ms for pending range setup", true);
        try {
            Thread.sleep(RING_DELAY);
        }
        catch (InterruptedException e) {
            // Preserve the interrupt for whoever handles the error.
            Thread.currentThread().interrupt();
            throw new AssertionError((Object)e);
        }
        this.setMode("Bootstrapping", true);
        new BootStrapper(FBUtilities.getLocalAddress(), token, this.tokenMetadata_).bootstrap();
    }

    /**
     * @return {@code true} between the start of {@code bootstrap()} and the call to
     *         {@code finishBootstrapping()}
     */
    public boolean isBootstrapMode() {
        boolean bootstrapping = this.isBootstrapMode;
        return bootstrapping;
    }

    /**
     * @return the live token metadata held by this service (not a copy)
     */
    public TokenMetadata getTokenMetadata() {
        TokenMetadata metadata = this.tokenMetadata_;
        return metadata;
    }

    /**
     * Submits a {@link ConsistencyChecker} for the given read to the consistency executor,
     * but only when more than one live natural endpoint exists for the command's key.
     *
     * @param row        the row that was read
     * @param command    the read command that produced {@code row}
     * @param dataSource the endpoint the data was read from
     */
    public void doConsistencyCheck(Row row, ReadCommand command, InetAddress dataSource) {
        List<InetAddress> liveReplicas = instance.getLiveNaturalEndpoints(command.table, command.key);
        if (liveReplicas.size() <= 1) {
            // A single replica has nothing to be compared against.
            return;
        }
        ConsistencyChecker checker = new ConsistencyChecker(command, row, liveReplicas, dataSource);
        this.consistencyManager_.submit(checker);
    }

    /**
     * Builds a map from each ring range to the stringified endpoints responsible for it.
     *
     * @param keyspace keyspace to inspect; when {@code null}, the first non-system table is used
     * @return a new map from range to the string form of its endpoints
     */
    @Override
    public Map<Range, List<String>> getRangeToEndpointMap(String keyspace) {
        String target = keyspace;
        if (target == null) {
            target = DatabaseDescriptor.getNonSystemTables().get(0);
        }
        HashMap<Range, List<String>> result = new HashMap<Range, List<String>>();
        Map<Range, List<InetAddress>> byAddress = this.getRangeToAddressMap(target);
        for (Map.Entry<Range, List<InetAddress>> rangeEntry : byAddress.entrySet()) {
            Range range = rangeEntry.getKey();
            result.put(range, this.stringify((Iterable<InetAddress>)rangeEntry.getValue()));
        }
        return result;
    }

    /**
     * Builds a map from each pending range to the stringified endpoints that are pending for it.
     *
     * @param keyspace keyspace to inspect; when {@code null}, the first non-system table is used
     * @return a new map from pending range to the string form of its endpoints
     */
    @Override
    public Map<Range, List<String>> getPendingRangeToEndpointMap(String keyspace) {
        String target = keyspace;
        if (target == null) {
            target = DatabaseDescriptor.getNonSystemTables().get(0);
        }
        HashMap<Range, List<String>> result = new HashMap<Range, List<String>>();
        for (Map.Entry<Range, Collection<InetAddress>> pending : this.tokenMetadata_.getPendingRanges(target).entrySet()) {
            // Copy into a list first, as the pending endpoints arrive as a plain Collection.
            ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>(pending.getValue());
            List<String> names = this.stringify(endpoints);
            result.put(pending.getKey(), names);
        }
        return result;
    }

    /**
     * Maps every range derived from the current sorted tokens to its natural endpoints
     * in the given keyspace.
     *
     * @param keyspace keyspace whose replication strategy decides the endpoints
     * @return a map from range to endpoint addresses
     */
    public Map<Range, List<InetAddress>> getRangeToAddressMap(String keyspace) {
        return this.constructRangeToEndpointMap(
                keyspace,
                this.getAllRanges(this.tokenMetadata_.sortedTokens()));
    }

    /**
     * Returns the token-to-endpoint mapping with each endpoint rendered as its host address string.
     *
     * @return a new map from token to the owning endpoint's host address
     */
    @Override
    public Map<Token, String> getTokenToEndpointMap() {
        Map<Token, InetAddress> owners = this.tokenMetadata_.getTokenToEndpointMap();
        HashMap<Token, String> rendered = new HashMap<Token, String>(owners.size());
        for (Map.Entry<Token, InetAddress> owner : owners.entrySet()) {
            String hostAddress = owner.getValue().getHostAddress();
            rendered.put(owner.getKey(), hostAddress);
        }
        return rendered;
    }

    /**
     * For each given range, asks the keyspace's replication strategy for the natural
     * endpoints of the range's right token.
     *
     * @param keyspace keyspace whose replication strategy is consulted
     * @param ranges   the ranges to resolve
     * @return a new map from each range to its natural endpoints
     */
    private Map<Range, List<InetAddress>> constructRangeToEndpointMap(String keyspace, List<Range> ranges) {
        HashMap<Range, List<InetAddress>> endpointsByRange = new HashMap<Range, List<InetAddress>>();
        for (Range current : ranges) {
            // The table is opened per range, exactly as before, so an empty list never opens it.
            AbstractReplicationStrategy strategy = Table.open(keyspace).getReplicationStrategy();
            endpointsByRange.put(current, strategy.getNaturalEndpoints(current.right));
        }
        return endpointsByRange;
    }

    /**
     * Gossip callback. Only {@code STATUS} changes are handled: the value is split on the
     * VersionedValue delimiter and dispatched on its first piece (BOOT, NORMAL, LEAVING or
     * LEFT). Any other first piece is ignored.
     *
     * @param endpoint the endpoint whose state changed
     * @param state    which application state changed
     * @param value    the new value of that state
     */
    @Override
    public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {
        if (state != ApplicationState.STATUS) {
            return;
        }
        String[] pieces = value.value.split(VersionedValue.DELIMITER_STR, -1);
        assert (pieces.length > 0);
        String moveName = pieces[0];
        if (moveName.equals("BOOT")) {
            this.handleStateBootstrap(endpoint, pieces);
            return;
        }
        if (moveName.equals("NORMAL")) {
            this.handleStateNormal(endpoint, pieces);
            return;
        }
        if (moveName.equals("LEAVING")) {
            this.handleStateLeaving(endpoint, pieces);
            return;
        }
        if (moveName.equals("LEFT")) {
            this.handleStateLeft(endpoint, pieces);
        }
    }

    /**
     * Handles a BOOT status: the endpoint is removed from the ring if it is currently a
     * member, its token is recorded as a bootstrap token, and pending ranges are recalculated.
     *
     * @param endpoint the bootstrapping endpoint
     * @param pieces   the split status value; exactly two pieces, the second being the token
     */
    private void handleStateBootstrap(InetAddress endpoint, String[] pieces) {
        assert (pieces.length == 2);
        Token bootToken = StorageService.getPartitioner().getTokenFactory().fromString(pieces[1]);
        if (logger_.isDebugEnabled()) {
            logger_.debug("Node " + endpoint + " state bootstrapping, token " + bootToken);
        }

        boolean alreadyMember = this.tokenMetadata_.isMember(endpoint);
        if (alreadyMember) {
            boolean leaving = this.tokenMetadata_.isLeaving(endpoint);
            if (!leaving) {
                logger_.info("Node " + endpoint + " state jump to bootstrap");
            }
            this.tokenMetadata_.removeEndpoint(endpoint);
        }

        this.tokenMetadata_.addBootstrapToken(bootToken, endpoint);
        this.calculatePendingRanges();
    }

    /**
     * Handles a NORMAL status. The endpoint takes ownership of the token when nobody owns
     * it yet, or when it differs from the current owner and
     * {@code Gossiper.compareEndpointStartup(endpoint, currentOwner)} is positive. Ownership
     * changes are persisted to the system table unless running in client mode. If the status
     * carries removal information (four pieces), it is forwarded to
     * {@code handleStateRemoving}. Pending ranges are recalculated at the end.
     *
     * @param endpoint the endpoint reporting NORMAL
     * @param pieces   the split status value: at least two pieces (the second is the token);
     *                 when more are present there must be exactly four
     */
    private void handleStateNormal(InetAddress endpoint, String[] pieces) {
        assert (pieces.length >= 2);
        Token token = StorageService.getPartitioner().getTokenFactory().fromString(pieces[1]);
        if (logger_.isDebugEnabled()) {
            logger_.debug("Node " + endpoint + " state normal, token " + token);
        }
        if (this.tokenMetadata_.isMember(endpoint)) {
            logger_.info("Node " + endpoint + " state jump to normal");
        }

        // Decide first (logging as we go), then apply the ownership change in one place.
        InetAddress currentOwner = this.tokenMetadata_.getEndpoint(token);
        boolean takeOwnership = false;
        if (currentOwner == null) {
            logger_.debug("New node " + endpoint + " at token " + token);
            takeOwnership = true;
        } else if (!endpoint.equals(currentOwner)) {
            boolean startedLater = Gossiper.instance.compareEndpointStartup(endpoint, currentOwner) > 0;
            if (startedLater) {
                logger_.info(String.format("Nodes %s and %s have the same token %s.  %s is the new owner", endpoint, currentOwner, token, endpoint));
                takeOwnership = true;
            } else {
                logger_.info(String.format("Nodes %s and %s have the same token %s.  Ignoring %s", endpoint, currentOwner, token, endpoint));
            }
        }
        if (takeOwnership) {
            this.tokenMetadata_.updateNormalToken(token, endpoint);
            if (!this.isClientMode) {
                SystemTable.updateToken(endpoint, token);
            }
        }

        if (pieces.length > 2) {
            assert (pieces.length == 4);
            Token removeToken = StorageService.getPartitioner().getTokenFactory().fromString(pieces[3]);
            this.handleStateRemoving(endpoint, removeToken, pieces[2]);
        }
        this.calculatePendingRanges();
    }

    /**
     * Handles a LEAVING status. If the endpoint is not yet a ring member, or is a member
     * under a different token, its token is (re)recorded first. The endpoint is then marked
     * as leaving and pending ranges are recalculated.
     *
     * @param endpoint the leaving endpoint
     * @param pieces   the split status value; exactly two pieces, the second being the token
     */
    private void handleStateLeaving(InetAddress endpoint, String[] pieces) {
        assert (pieces.length == 2);
        Token token = StorageService.getPartitioner().getTokenFactory().fromString(pieces[1]);
        if (logger_.isDebugEnabled()) {
            logger_.debug("Node " + endpoint + " state leaving, token " + token);
        }

        boolean resetToken;
        if (!this.tokenMetadata_.isMember(endpoint)) {
            logger_.info("Node " + endpoint + " state jump to leaving");
            resetToken = true;
        } else if (!this.tokenMetadata_.getToken(endpoint).equals(token)) {
            logger_.warn("Node " + endpoint + " 'leaving' token mismatch. Long network partition?");
            resetToken = true;
        } else {
            resetToken = false;
        }
        if (resetToken) {
            this.tokenMetadata_.updateNormalToken(token, endpoint);
        }

        this.tokenMetadata_.addLeavingEndpoint(endpoint);
        this.calculatePendingRanges();
    }

    /**
     * Handles a LEFT status by excising the endpoint and its token from local state.
     *
     * @param endpoint the endpoint that left
     * @param pieces   the split status value; exactly two pieces, the second being the token
     */
    private void handleStateLeft(InetAddress endpoint, String[] pieces) {
        assert (pieces.length == 2);
        String tokenText = pieces[1];
        Token leftToken = StorageService.getPartitioner().getTokenFactory().fromString(tokenText);
        if (logger_.isDebugEnabled()) {
            logger_.debug("Node " + endpoint + " state left, token " + leftToken);
        }
        this.excise(leftToken, endpoint);
    }

    /**
     * Handles removal information attached to a NORMAL status.
     *
     * <p>Nothing happens if the token has no known owner, or if the owner is the local node
     * (only an info line is logged). Otherwise state "removed" excises the owner, and state
     * "removing" marks it as leaving, recalculates pending ranges and starts restoring the
     * replica count. Any other state is ignored.
     *
     * @param endpoint    the endpoint that gossiped the removal
     * @param removeToken the token being removed
     * @param state       removal state text, compared against "removed" and "removing"
     */
    private void handleStateRemoving(InetAddress endpoint, Token removeToken, String state) {
        InetAddress victim = this.tokenMetadata_.getEndpoint(removeToken);
        if (victim == null) {
            return;
        }
        if (victim.equals(FBUtilities.getLocalAddress())) {
            logger_.info("Received removeToken gossip about myself. Is this node a replacement for a removed one?");
            return;
        }
        if ("removed".equals(state)) {
            this.excise(removeToken, victim);
            return;
        }
        if (!"removing".equals(state)) {
            return;
        }
        if (logger_.isDebugEnabled()) {
            logger_.debug("Token " + removeToken + " removed manually (endpoint was " + victim + ")");
        }
        this.tokenMetadata_.addLeavingEndpoint(victim);
        this.calculatePendingRanges();
        this.restoreReplicaCount(victim, endpoint);
    }

    /**
     * Removes every local trace of an endpoint: gossip state, ring membership, stored hints
     * and bootstrap token. Pending ranges are recalculated, and outside client mode the
     * token is also deleted from the system table.
     *
     * @param token    the token owned by the endpoint
     * @param endpoint the endpoint to remove
     */
    private void excise(Token token, InetAddress endpoint) {
        Gossiper.instance.removeEndpoint(endpoint);
        this.tokenMetadata_.removeEndpoint(endpoint);
        HintedHandOffManager.deleteHintsForEndPoint(endpoint);
        this.tokenMetadata_.removeBootstrapToken(token);
        this.calculatePendingRanges();

        if (this.isClientMode) {
            // Clients keep no system table, so there is nothing to delete.
            return;
        }
        logger_.info("Removing token " + token + " for " + endpoint);
        SystemTable.removeToken(token);
    }

    /**
     * Recalculates pending ranges for every non-system table, using each table's own
     * replication strategy.
     */
    private void calculatePendingRanges() {
        for (String tableName : DatabaseDescriptor.getNonSystemTables()) {
            AbstractReplicationStrategy strategy = Table.open(tableName).getReplicationStrategy();
            StorageService.calculatePendingRanges(strategy, tableName);
        }
    }

    /**
     * Recomputes the pending ranges of one table and stores them in the token metadata.
     *
     * <p>With no bootstrapping and no leaving endpoints the pending ranges are set to empty.
     * Otherwise:
     * <ol>
     *   <li>for every range currently owned by a leaving endpoint, the endpoints that are
     *       natural in the "after all left" ring but not in the current ring become pending;</li>
     *   <li>each bootstrapping endpoint is added, one at a time, to the "after all left" ring
     *       and every range it would own there becomes pending for it.</li>
     * </ol>
     *
     * @param strategy replication strategy of {@code table}
     * @param table    the table whose pending ranges are recalculated
     */
    public static void calculatePendingRanges(AbstractReplicationStrategy strategy, String table) {
        TokenMetadata tm = instance.getTokenMetadata();
        Multimap<Range, InetAddress> pendingRanges = HashMultimap.create();
        Map<Token, InetAddress> bootstrapTokens = tm.getBootstrapTokens();
        Set<InetAddress> leavingEndpoints = tm.getLeavingEndpoints();

        if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty()) {
            if (logger_.isDebugEnabled()) {
                logger_.debug("No bootstrapping or leaving nodes -> empty pending ranges for {}", (Object)table);
            }
            tm.setPendingRanges(table, pendingRanges);
            return;
        }

        Multimap<InetAddress, Range> addressRanges = strategy.getAddressRanges();
        // Ring as it will look once every leaving endpoint is gone.
        TokenMetadata allLeftMetadata = tm.cloneAfterAllLeft();

        // Ranges touched by any leaving endpoint.
        Set<Range> affectedRanges = new HashSet<Range>();
        for (InetAddress leaving : leavingEndpoints) {
            affectedRanges.addAll(addressRanges.get(leaving));
        }

        // Endpoints that gain an affected range once the leavers are gone.
        for (Range range : affectedRanges) {
            List<InetAddress> currentEndpoints = strategy.calculateNaturalEndpoints(range.right, tm);
            List<InetAddress> newEndpoints = strategy.calculateNaturalEndpoints(range.right, allLeftMetadata);
            newEndpoints.removeAll(currentEndpoints);
            pendingRanges.putAll(range, newEndpoints);
        }

        // Each bootstrapping endpoint is evaluated in isolation: add it, read its ranges, remove it.
        for (Map.Entry<Token, InetAddress> entry : bootstrapTokens.entrySet()) {
            InetAddress endpoint = entry.getValue();
            allLeftMetadata.updateNormalToken(entry.getKey(), endpoint);
            for (Range range : strategy.getAddressRanges(allLeftMetadata).get(endpoint)) {
                pendingRanges.put(range, endpoint);
            }
            allLeftMetadata.removeEndpoint(endpoint);
        }

        tm.setPendingRanges(table, pendingRanges);
        if (logger_.isDebugEnabled()) {
            logger_.debug("Pending ranges:\n" + (pendingRanges.isEmpty() ? "<empty>" : tm.printPendingRanges()));
        }
    }

    /**
     * Picks, for each requested range, one live endpoint to fetch it from.
     *
     * <p>Candidates are the endpoints the table's replication strategy currently associates
     * with the range, ordered by the endpoint snitch's proximity to the local node; the first
     * one the failure detector reports alive is chosen. Ranges with no live candidate are
     * absent from the result.
     *
     * @param table  the table the ranges belong to
     * @param ranges the ranges that need a source
     * @return a multimap from chosen source endpoint to the ranges to fetch from it
     */
    private Multimap<InetAddress, Range> getNewSourceRanges(String table, Set<Range> ranges) {
        InetAddress myAddress = FBUtilities.getLocalAddress();
        Multimap<Range, InetAddress> rangeAddresses = Table.open(table).getReplicationStrategy().getRangeAddresses(this.tokenMetadata_);
        Multimap<InetAddress, Range> sourceRanges = HashMultimap.create();
        IFailureDetector failureDetector = FailureDetector.instance;
        // The snitch lookup does not depend on the range, so it is done once.
        IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();

        for (Range range : ranges) {
            Collection<InetAddress> candidates = rangeAddresses.get(range);
            List<InetAddress> sources = snitch.getSortedListByProximity(myAddress, candidates);
            assert (!sources.contains(myAddress));
            for (InetAddress source : sources) {
                if (failureDetector.isAlive(source)) {
                    sourceRanges.put(source, range);
                    // Closest live source found; move on to the next range.
                    break;
                }
            }
        }
        return sourceRanges;
    }

    /**
     * Sends a REPLICATION_FINISHED message to {@code remote} and waits for the reply,
     * retrying after each RPC timeout for as long as the failure detector reports
     * {@code remote} alive. Returns once a reply arrives or {@code remote} is no longer alive.
     *
     * @param local  address used as the sender of the message
     * @param remote endpoint to notify
     */
    private void sendReplicationNotification(InetAddress local, InetAddress remote) {
        Message msg = new Message(local, Verb.REPLICATION_FINISHED, new byte[0]);
        IFailureDetector failureDetector = FailureDetector.instance;
        while (failureDetector.isAlive(remote)) {
            IAsyncResult iar = MessagingService.instance.sendRR(msg, remote);
            try {
                iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
                return;
            }
            catch (TimeoutException e) {
                // Deliberately not propagated: a timeout just means "try again" while the
                // remote is still alive. Log it so repeated retries are visible.
                if (logger_.isDebugEnabled()) {
                    logger_.debug("Timed out waiting for replication notification reply from {}; retrying", (Object)remote);
                }
            }
        }
    }

    /**
     * Requests the data this node becomes responsible for when {@code endpoint} leaves the
     * ring, and notifies {@code notifyEndpoint} once every requested stream has completed.
     *
     * <p>For each non-system table, the ranges that change hands to the local node are
     * determined, a live source is chosen for each, and the ranges are requested from that
     * source. Every (source, table) pair is tracked in {@code fetchSources}; each stream's
     * completion callback removes its pair, and the callback that empties the multimap sends
     * the replication notification.
     *
     * <p>NOTE(review): when there is nothing to fetch, no callback is created and therefore
     * no notification is sent from here -- confirm this is what the coordinator expects.
     *
     * @param endpoint       the endpoint that is leaving
     * @param notifyEndpoint the endpoint to notify when replication has finished
     */
    private void restoreReplicaCount(InetAddress endpoint, final InetAddress notifyEndpoint) {
        final Multimap<InetAddress, String> fetchSources = HashMultimap.create();
        Multimap<String, Map.Entry<InetAddress, Collection<Range>>> rangesToFetch = HashMultimap.create();
        final InetAddress myAddress = FBUtilities.getLocalAddress();

        // Phase 1: work out what to fetch, and from whom, before any stream is requested.
        for (String table : DatabaseDescriptor.getNonSystemTables()) {
            Multimap<Range, InetAddress> changedRanges = this.getChangedRangesForLeaving(table, endpoint);
            Set<Range> myNewRanges = new HashSet<Range>();
            for (Map.Entry<Range, InetAddress> entry : changedRanges.entries()) {
                if (entry.getValue().equals(myAddress)) {
                    myNewRanges.add(entry.getKey());
                }
            }
            Multimap<InetAddress, Range> sourceRanges = this.getNewSourceRanges(table, myNewRanges);
            for (Map.Entry<InetAddress, Collection<Range>> entry : sourceRanges.asMap().entrySet()) {
                fetchSources.put(entry.getKey(), table);
                rangesToFetch.put(table, entry);
            }
        }

        // Phase 2: request the streams; each completion ticks its pair off fetchSources.
        for (final String table : rangesToFetch.keySet()) {
            for (Map.Entry<InetAddress, Collection<Range>> entry : rangesToFetch.get(table)) {
                final InetAddress source = entry.getKey();
                Collection<Range> ranges = entry.getValue();
                Runnable callback = new Runnable() {
                    @Override
                    public void run() {
                        // Callbacks may run concurrently; the multimap is the shared lock.
                        synchronized (fetchSources) {
                            fetchSources.remove(source, table);
                            if (fetchSources.isEmpty()) {
                                StorageService.this.sendReplicationNotification(myAddress, notifyEndpoint);
                            }
                        }
                    }
                };
                if (logger_.isDebugEnabled()) {
                    logger_.debug("Requesting from " + source + " ranges " + StringUtils.join(ranges, ", "));
                }
                StreamIn.requestRanges(source, table, ranges, callback);
            }
        }
    }

    /**
     * Computes, for every range currently held by {@code endpoint} in {@code table}, the
     * endpoints that become replicas of that range once {@code endpoint} is gone.
     * <p>
     * Replica sets are calculated twice per range: once against the current token metadata and
     * once against a clone produced by {@code cloneAfterAllLeft()} from which {@code endpoint}
     * has additionally been removed. The endpoints present only in the second set are reported.
     *
     * @param table    keyspace whose replication strategy is consulted
     * @param endpoint endpoint that is leaving or being removed
     * @return multimap from range to the endpoints that newly replicate it; a range with no new
     *         replicas contributes no entries
     */
    private Multimap<Range, InetAddress> getChangedRangesForLeaving(String table, InetAddress endpoint) {
        Collection<Range> endpointRanges = this.getRangesForEndpoint(table, endpoint);
        if (logger_.isDebugEnabled()) {
            logger_.debug("Node " + endpoint + " ranges [" + StringUtils.join(endpointRanges, ", ") + "]");
        }

        // Replica sets as they are right now.
        Map<Range, List<InetAddress>> replicasBefore = new HashMap<Range, List<InetAddress>>();
        for (Range range : endpointRanges) {
            replicasBefore.put(range, Table.open(table).getReplicationStrategy().calculateNaturalEndpoints(range.right, this.tokenMetadata_));
        }

        // Ring view after all leaving nodes, and this endpoint in particular, are gone.
        TokenMetadata ringAfter = this.tokenMetadata_.cloneAfterAllLeft();
        if (ringAfter.isMember(endpoint)) {
            ringAfter.removeEndpoint(endpoint);
        }

        Multimap<Range, InetAddress> gained = HashMultimap.create();
        for (Range range : endpointRanges) {
            List<InetAddress> newcomers = Table.open(table).getReplicationStrategy().calculateNaturalEndpoints(range.right, ringAfter);
            newcomers.removeAll(replicasBefore.get(range));
            if (logger_.isDebugEnabled()) {
                if (!newcomers.isEmpty()) {
                    logger_.debug("Range " + range + " will be responsibility of " + StringUtils.join(newcomers, ", "));
                } else {
                    logger_.debug("Range " + range + " already in all replicas");
                }
            }
            gained.putAll(range, newcomers);
        }
        return gained;
    }

    /**
     * Replays every application state carried by {@code epState} through
     * {@code onChange}, one call per state entry.
     *
     * @param endpoint endpoint that joined
     * @param epState  endpoint state whose application-state map is replayed
     */
    @Override
    public void onJoin(InetAddress endpoint, EndpointState epState) {
        Map<ApplicationState, VersionedValue> states = epState.getApplicationStateMap();
        for (Map.Entry<ApplicationState, VersionedValue> stateEntry : states.entrySet()) {
            ApplicationState state = stateEntry.getKey();
            VersionedValue value = stateEntry.getValue();
            this.onChange(endpoint, state, value);
        }
    }

    /**
     * Triggers hint delivery to {@code endpoint}, unless this instance runs in client mode.
     *
     * @param endpoint endpoint reported alive
     * @param state    endpoint state passed by the caller; not used here
     */
    @Override
    public void onAlive(InetAddress endpoint, EndpointState state) {
        if (this.isClientMode) {
            return;
        }
        this.deliverHints(endpoint);
    }

    /**
     * Drops {@code endpoint} from the token metadata and recalculates pending ranges.
     *
     * @param endpoint endpoint that was removed
     */
    @Override
    public void onRemove(InetAddress endpoint) {
        tokenMetadata_.removeEndpoint(endpoint);
        calculatePendingRanges();
    }

    /**
     * Forwards the death of {@code endpoint} to the messaging service by calling
     * {@code convict} on it.
     *
     * @param endpoint endpoint reported dead
     * @param state    endpoint state passed by the caller; not used here
     */
    @Override
    public void onDead(InetAddress endpoint, EndpointState state) {
        MessagingService.instance.convict(endpoint);
    }

    /**
     * Sums the live disk space used by every column family store of every table known to
     * {@code DatabaseDescriptor}.
     *
     * @return the total as a {@code double}
     */
    @Override
    public double getLoad() {
        double totalBytes = 0.0;
        for (String name : DatabaseDescriptor.getTables()) {
            for (ColumnFamilyStore store : Table.open(name).getColumnFamilyStores()) {
                totalBytes += (double) store.getLiveDiskSpaceUsed();
            }
        }
        return totalBytes;
    }

    /**
     * Formats the value of {@link #getLoad()} with {@code FileUtils.stringifyFileSize}.
     *
     * @return the formatted local load
     */
    @Override
    public String getLoadString() {
        double load = getLoad();
        return FileUtils.stringifyFileSize(load);
    }

    /**
     * Builds a map from host address to formatted load.
     * <p>
     * Entries come from the load information held by {@code StorageLoadBalancer}; the local
     * node's entry is written last using {@link #getLoadString()}, so it overrides any entry
     * already present under the local address.
     *
     * @return a new mutable map of host address to formatted load
     */
    @Override
    public Map<String, String> getLoadMap() {
        Map<String, String> loadByHost = new HashMap<String, String>();
        Map<InetAddress, Double> reported = StorageLoadBalancer.instance.getLoadInfo();
        for (Map.Entry<InetAddress, Double> load : reported.entrySet()) {
            String host = load.getKey().getHostAddress();
            loadByHost.put(host, FileUtils.stringifyFileSize(load.getValue()));
        }
        loadByHost.put(FBUtilities.getLocalAddress().getHostAddress(), getLoadString());
        return loadByHost;
    }

    /**
     * Delegates hint delivery for {@code endpoint} to {@code HintedHandOffManager}.
     *
     * @param endpoint endpoint to deliver hints to
     */
    public final void deliverHints(InetAddress endpoint) {
        HintedHandOffManager.instance.deliverHints(endpoint);
    }

    /**
     * Delegates hint delivery for the host named {@code host} to {@code HintedHandOffManager}.
     *
     * @param host host to deliver hints to, as a string
     * @throws UnknownHostException propagated from the delegate
     */
    @Override
    public final void deliverHints(String host) throws UnknownHostException {
        HintedHandOffManager.instance.deliverHints(host);
    }

    /**
     * Returns the token stored in the system table for this node.
     * With assertions enabled, a missing token triggers an {@code AssertionError}.
     *
     * @return the saved token
     */
    public Token getLocalToken() {
        Token saved = SystemTable.getSavedToken();
        assert saved != null;
        return saved;
    }

    /**
     * Returns the string form of {@link #getLocalToken()}.
     *
     * @return the local token as a string
     */
    @Override
    public String getToken() {
        Token local = getLocalToken();
        return local.toString();
    }

    /**
     * Returns the release version string reported by {@code FBUtilities}.
     *
     * @return the release version string
     */
    @Override
    public String getReleaseVersion() {
        String version = FBUtilities.getReleaseVersionString();
        return version;
    }

    /**
     * Lists the host addresses of the endpoints that the token metadata marks as leaving.
     *
     * @return host addresses of leaving endpoints
     */
    @Override
    public List<String> getLeavingNodes() {
        return stringify(tokenMetadata_.getLeavingEndpoints());
    }

    /**
     * Lists the host addresses of the endpoints found as values of the token metadata's
     * bootstrap-token map.
     *
     * @return host addresses of bootstrapping endpoints
     */
    @Override
    public List<String> getJoiningNodes() {
        return stringify(tokenMetadata_.getBootstrapTokens().values());
    }

    /**
     * Lists the host addresses of the members the gossiper reports as live.
     *
     * @return host addresses of live members
     */
    @Override
    public List<String> getLiveNodes() {
        return stringify(Gossiper.instance.getLiveMembers());
    }

    /**
     * Lists the host addresses of the members the gossiper reports as unreachable.
     *
     * @return host addresses of unreachable members
     */
    @Override
    public List<String> getUnreachableNodes() {
        return stringify(Gossiper.instance.getUnreachableMembers());
    }

    /**
     * Converts endpoints to their host-address strings, preserving iteration order.
     *
     * @param endpoints endpoints to convert
     * @return a new list holding {@code getHostAddress()} of each endpoint
     */
    private List<String> stringify(Iterable<InetAddress> endpoints) {
        List<String> hostAddresses = new ArrayList<String>();
        for (InetAddress endpoint : endpoints) {
            String hostAddress = endpoint.getHostAddress();
            hostAddresses.add(hostAddress);
        }
        return hostAddresses;
    }

    /**
     * Asks the gossiper for the current generation number of the local address.
     *
     * @return the generation number reported for this node
     */
    @Override
    public int getCurrentGenerationNumber() {
        InetAddress localAddress = FBUtilities.getLocalAddress();
        return Gossiper.instance.getCurrentGenerationNumber(localAddress);
    }

    /**
     * Runs {@code forceCleanup()} on the selected column families of a table.
     * The table named {@code "system"} is rejected.
     *
     * @param tableName      table to clean up
     * @param columnFamilies column families to clean; when empty, all of the table's are used
     * @throws RuntimeException if {@code tableName} is {@code "system"}
     * @throws IOException      if the table does not exist, or propagated from the cleanup
     */
    @Override
    public void forceTableCleanup(String tableName, String ... columnFamilies) throws IOException, ExecutionException, InterruptedException {
        boolean isSystemTable = tableName.equals("system");
        if (isSystemTable) {
            throw new RuntimeException("Cleanup of the system table is neither necessary nor wise");
        }
        Iterable<ColumnFamilyStore> targets = this.getValidColumnFamilies(tableName, columnFamilies);
        for (ColumnFamilyStore target : targets) {
            target.forceCleanup();
        }
    }

    /**
     * Runs {@code forceMajorCompaction()} on the selected column families of a table.
     *
     * @param tableName      table to compact
     * @param columnFamilies column families to compact; when empty, all of the table's are used
     * @throws IOException if the table does not exist, or propagated from the compaction
     */
    @Override
    public void forceTableCompaction(String tableName, String ... columnFamilies) throws IOException, ExecutionException, InterruptedException {
        Iterable<ColumnFamilyStore> targets = this.getValidColumnFamilies(tableName, columnFamilies);
        for (ColumnFamilyStore target : targets) {
            target.forceMajorCompaction();
        }
    }

    /**
     * Takes a snapshot of one table under the given tag.
     *
     * @param tableName table to snapshot
     * @param tag       tag passed to {@code Table.snapshot}
     * @throws IOException if the table does not exist, or propagated from the snapshot
     */
    @Override
    public void takeSnapshot(String tableName, String tag) throws IOException {
        getValidTable(tableName).snapshot(tag);
    }

    /**
     * Opens a table after checking that it is known to {@code DatabaseDescriptor}.
     *
     * @param tableName name of the table to open
     * @return the opened table
     * @throws IOException if no table with that name is defined
     */
    private Table getValidTable(String tableName) throws IOException {
        if (!DatabaseDescriptor.getTables().contains(tableName)) {
            // The message previously ran the table name into "does not exist" without a space.
            throw new IOException("Table " + tableName + " does not exist");
        }
        return Table.open(tableName);
    }

    /**
     * Takes a snapshot of every table returned by {@code Table.all()} under the given tag.
     *
     * @param tag tag passed to each {@code Table.snapshot} call
     * @throws IOException propagated from a snapshot
     */
    @Override
    public void takeAllSnapshot(String tag) throws IOException {
        for (Table each : Table.all()) {
            each.snapshot(tag);
        }
    }

    /**
     * Calls {@code clearSnapshot()} on every table returned by {@code Table.all()} and then
     * logs a debug message.
     *
     * @throws IOException propagated from a table's {@code clearSnapshot()}
     */
    @Override
    public void clearSnapshot() throws IOException {
        for (Table each : Table.all()) {
            each.clearSnapshot();
        }
        if (!logger_.isDebugEnabled()) {
            return;
        }
        logger_.debug("Cleared out all snapshot directories");
    }

    /**
     * Resolves column family names of a table to their stores.
     * <p>
     * With no names given, all stores of the table are returned. Otherwise each name is looked
     * up; names without a store are logged at warn level and skipped.
     *
     * @param tableName table that owns the column families
     * @param cfNames   column family names; may be empty
     * @return the table's stores, or the set of stores found for the given names
     * @throws IOException if the table does not exist
     */
    public Iterable<ColumnFamilyStore> getValidColumnFamilies(String tableName, String ... cfNames) throws IOException {
        Table owner = this.getValidTable(tableName);
        if (cfNames.length == 0) {
            return owner.getColumnFamilyStores();
        }
        HashSet<ColumnFamilyStore> found = new HashSet<ColumnFamilyStore>();
        for (String requested : cfNames) {
            ColumnFamilyStore store = owner.getColumnFamilyStore(requested);
            if (store != null) {
                found.add(store);
            } else {
                logger_.warn(String.format("Invalid column family specified: %s. Proceeding with others.", requested));
            }
        }
        return found;
    }

    /**
     * Flushes the selected column families of a table: for each store a binary flush is
     * forced first, followed by a blocking flush. Both steps are logged at debug level.
     *
     * @param tableName      table to flush
     * @param columnFamilies column families to flush; when empty, all of the table's are used
     * @throws IOException if the table does not exist, or propagated from a flush
     */
    @Override
    public void forceTableFlush(String tableName, String ... columnFamilies) throws IOException, ExecutionException, InterruptedException {
        Iterable<ColumnFamilyStore> targets = this.getValidColumnFamilies(tableName, columnFamilies);
        for (ColumnFamilyStore target : targets) {
            logger_.debug("Forcing binary flush on keyspace " + tableName + ", CF " + target.getColumnFamilyName());
            target.forceFlushBinary();

            logger_.debug("Forcing flush on keyspace " + tableName + ", CF " + target.getColumnFamilyName());
            target.forceBlockingFlush();
        }
    }

    /**
     * Starts a repair session for a table and waits for it to finish.
     * <p>
     * When no column families are named, the names of all of the table's column family stores
     * are used instead.
     *
     * @param tableName      table to repair
     * @param columnFamilies column families to repair; may be empty
     * @throws IOException if the table does not exist, or if the wait for the session is
     *                     interrupted (the {@code InterruptedException} is attached as cause)
     */
    @Override
    public void forceTableRepair(String tableName, String ... columnFamilies) throws IOException {
        String[] families = columnFamilies;
        if (columnFamilies.length == 0) {
            ArrayList<String> allNames = new ArrayList<String>();
            for (ColumnFamilyStore store : this.getValidColumnFamilies(tableName, new String[0])) {
                allNames.add(store.getColumnFamilyName());
            }
            families = allNames.toArray(new String[0]);
        }
        AntiEntropyService.RepairSession session = AntiEntropyService.instance.getRepairSession(tableName, families);
        try {
            session.start();
            session.join();
        }
        catch (InterruptedException interrupted) {
            throw new IOException("Repair session " + session + " failed.", interrupted);
        }
    }

    /**
     * Finds the endpoint that owns the token preceding {@code ep}'s token, as resolved by the
     * token metadata.
     *
     * @param ep endpoint whose predecessor is wanted
     * @return the endpoint mapped to the predecessor token
     */
    InetAddress getPredecessor(InetAddress ep) {
        Token previous = this.tokenMetadata_.getPredecessor(this.tokenMetadata_.getToken(ep));
        return this.tokenMetadata_.getEndpoint(previous);
    }

    /**
     * Finds the endpoint that owns the token following {@code ep}'s token, as resolved by the
     * token metadata.
     *
     * @param ep endpoint whose successor is wanted
     * @return the endpoint mapped to the successor token
     */
    public InetAddress getSuccessor(InetAddress ep) {
        Token next = this.tokenMetadata_.getSuccessor(this.tokenMetadata_.getToken(ep));
        return this.tokenMetadata_.getEndpoint(next);
    }

    /**
     * Returns the primary range that the token metadata associates with {@code ep}'s token.
     *
     * @param ep endpoint whose primary range is wanted
     * @return the primary range for the endpoint's token
     */
    public Range getPrimaryRangeForEndpoint(InetAddress ep) {
        Token owned = this.tokenMetadata_.getToken(ep);
        return this.tokenMetadata_.getPrimaryRangeFor(owned);
    }

    /**
     * Looks up the ranges that the table's replication strategy assigns to {@code ep}.
     *
     * @param table keyspace whose replication strategy is consulted
     * @param ep    endpoint to look up
     * @return the ranges mapped to the endpoint in the strategy's address-range multimap
     */
    Collection<Range> getRangesForEndpoint(String table, InetAddress ep) {
        Collection<Range> owned = Table.open(table).getReplicationStrategy().getAddressRanges().get(ep);
        return owned;
    }

    /**
     * Builds the ranges between consecutive tokens of {@code sortedTokens}, closing the ring
     * with a final range from the last token back to the first.
     * <p>
     * A single token therefore yields one range from that token to itself; an empty list
     * yields an empty list.
     *
     * @param sortedTokens tokens in the order in which ranges should be formed
     * @return one range per token, in token order
     */
    public List<Range> getAllRanges(List<Token> sortedTokens) {
        if (logger_.isDebugEnabled()) {
            logger_.debug("computing ranges for " + StringUtils.join(sortedTokens, ", "));
        }
        if (sortedTokens.isEmpty()) {
            return Collections.emptyList();
        }
        int count = sortedTokens.size();
        List<Range> ring = new ArrayList<Range>();
        for (int start = 0; start < count; start++) {
            // The modulo wraps the last range around to the first token.
            int end = (start + 1) % count;
            ring.add(new Range(sortedTokens.get(start), sortedTokens.get(end)));
        }
        return ring;
    }

    /**
     * Resolves the natural endpoints for a key by converting it to a token with the partitioner
     * and delegating to the token-based overload.
     *
     * @param table keyspace whose replication strategy is consulted
     * @param key   row key
     * @return the natural endpoints for the key's token
     */
    public List<InetAddress> getNaturalEndpoints(String table, ByteBuffer key) {
        Token keyToken = (Token) partitioner_.getToken(key);
        return this.getNaturalEndpoints(table, keyToken);
    }

    /**
     * Resolves the natural endpoints for a key given as a byte array, by wrapping it in a
     * {@link ByteBuffer} and delegating to the buffer-based overload.
     *
     * @param table keyspace whose replication strategy is consulted
     * @param key   row key bytes
     * @return the natural endpoints for the key
     */
    @Override
    public List<InetAddress> getNaturalEndpoints(String table, byte[] key) {
        ByteBuffer wrappedKey = ByteBuffer.wrap(key);
        return this.getNaturalEndpoints(table, wrappedKey);
    }

    /**
     * Asks the table's replication strategy for the natural endpoints of a token.
     *
     * @param table keyspace whose replication strategy is consulted
     * @param token token to resolve
     * @return the endpoints returned by the strategy
     */
    public List<InetAddress> getNaturalEndpoints(String table, Token token) {
        List<InetAddress> natural = Table.open(table).getReplicationStrategy().getNaturalEndpoints(token);
        return natural;
    }

    /**
     * Resolves the live natural endpoints for a key by converting it to a token with the
     * partitioner and delegating to the token-based overload.
     *
     * @param table keyspace whose replication strategy is consulted
     * @param key   row key
     * @return the natural endpoints for the key that the failure detector reports alive
     */
    public List<InetAddress> getLiveNaturalEndpoints(String table, ByteBuffer key) {
        Token keyToken = (Token) partitioner_.getToken(key);
        return this.getLiveNaturalEndpoints(table, keyToken);
    }

    /**
     * Filters the natural endpoints of a token down to those the failure detector reports
     * alive, keeping the strategy's order.
     *
     * @param table keyspace whose replication strategy is consulted
     * @param token token to resolve
     * @return a new list with the live natural endpoints
     */
    public List<InetAddress> getLiveNaturalEndpoints(String table, Token token) {
        List<InetAddress> candidates = Table.open(table).getReplicationStrategy().getNaturalEndpoints(token);
        List<InetAddress> alive = new ArrayList<InetAddress>();
        for (InetAddress candidate : candidates) {
            if (FailureDetector.instance.isAlive(candidate)) {
                alive.add(candidate);
            }
        }
        return alive;
    }

    /**
     * Picks the first live natural endpoint for a key after the endpoint snitch has sorted the
     * candidates by proximity to the local address.
     *
     * @param table keyspace whose replication strategy is consulted
     * @param key   row key
     * @return the first endpoint, in sorted order, that the failure detector reports alive
     * @throws UnavailableException if none of the natural endpoints is alive
     */
    public InetAddress findSuitableEndpoint(String table, ByteBuffer key) throws IOException, UnavailableException {
        List<InetAddress> candidates = this.getNaturalEndpoints(table, key);
        DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(), candidates);
        if (logger_.isDebugEnabled()) {
            logger_.debug("Sorted endpoints are " + StringUtils.join(candidates, ","));
        }
        for (InetAddress candidate : candidates) {
            if (FailureDetector.instance.isAlive(candidate)) {
                return candidate;
            }
        }
        throw new UnavailableException();
    }

    /**
     * Sets the log4j level of the logger named {@code classQualifier} and logs the outcome.
     * <p>
     * The level is parsed with {@code Level.toLevel}; the info message prints both the parsed
     * level and the raw input so that a parse mismatch is visible.
     *
     * @param classQualifier logger name
     * @param rawLevel       level name to parse
     */
    @Override
    public void setLog4jLevel(String classQualifier, String rawLevel) {
        Level parsed = Level.toLevel(rawLevel);
        org.apache.log4j.Logger target = org.apache.log4j.Logger.getLogger(classQualifier);
        target.setLevel(parsed);
        logger_.info("set log level to " + parsed + " for classes under '" + classQualifier + "' (if the level doesn't look like '" + rawLevel + "' then log4j couldn't parse '" + rawLevel + "')");
    }

    /**
     * Produces split-point tokens for a range of one column family, based on its key samples.
     * <p>
     * The result always starts with {@code range.left} and ends with {@code range.right}.
     * In between, tokens of evenly spaced sampled keys are inserted. The number of splits is
     * {@code samples * indexInterval / keysPerSplit}; intermediate tokens are added only when
     * there are at least as many samples as splits.
     *
     * @param table        keyspace name
     * @param cfName       column family name
     * @param range        range to split
     * @param keysPerSplit desired number of keys per split
     * @return the boundary tokens, in order
     */
    public List<Token> getSplits(String table, String cfName, Range range, int keysPerSplit) {
        List<Token> boundaries = new ArrayList<Token>();
        boundaries.add(range.left);

        // Collect the sampled keys that fall inside the requested range.
        ArrayList<DecoratedKey> samples = new ArrayList<DecoratedKey>();
        ColumnFamilyStore store = Table.open(table).getColumnFamilyStore(cfName);
        for (DecoratedKey candidate : store.allKeySamples()) {
            if (range.contains((Token) candidate.token)) {
                samples.add(candidate);
            }
        }
        FBUtilities.sortSampledKeys(samples, range);

        int splits = samples.size() * DatabaseDescriptor.getIndexInterval() / keysPerSplit;
        // With fewer than two splits there is no intermediate boundary to add.
        if (splits > 1 && samples.size() >= splits) {
            int stride = samples.size() / splits;
            for (int split = 1; split < splits; split++) {
                DecoratedKey boundaryKey = samples.get(split * stride);
                boundaries.add((Token) boundaryKey.token);
            }
        }
        boundaries.add(range.right);
        return boundaries;
    }

    /**
     * Chooses a token inside the local primary range.
     * <p>
     * Key samples from all column family stores that fall in the range are sorted; the token
     * of the middle sample is returned. With fewer than three samples the partitioner's
     * midpoint of the range is returned instead.
     *
     * @return a token within the local primary range
     */
    public Token getBootstrapToken() {
        Range primary = this.getLocalPrimaryRange();
        ArrayList<DecoratedKey> samples = new ArrayList<DecoratedKey>();
        for (ColumnFamilyStore store : ColumnFamilyStore.all()) {
            for (DecoratedKey candidate : store.allKeySamples()) {
                if (primary.contains((Token) candidate.token)) {
                    samples.add(candidate);
                }
            }
        }
        FBUtilities.sortSampledKeys(samples, primary);
        if (samples.size() >= 3) {
            DecoratedKey median = samples.get(samples.size() / 2);
            return median.token;
        }
        return partitioner_.midpoint(primary.left, primary.right);
    }

    /**
     * Announces via gossip that this node is leaving, marks the local address as a leaving
     * endpoint in the token metadata, and recalculates pending ranges.
     */
    private void startLeaving() {
        Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.leaving(getLocalToken()));
        tokenMetadata_.addLeavingEndpoint(FBUtilities.getLocalAddress());
        calculatePendingRanges();
    }

    /**
     * Removes this node from the ring after handing its data to other nodes.
     * <p>
     * Preconditions are checked first: the node must be a ring member, at least two tokens must
     * remain in the ring view produced by {@code cloneAfterAllLeft()}, and no non-system table
     * may have pending ranges for the local address. The node then announces that it is
     * leaving, sleeps 30 seconds, and runs {@code unbootstrap} with a finishing step that stops
     * gossip, shuts down messaging and the stages, and sets the mode to "Decommissioned".
     *
     * @throws UnsupportedOperationException if a precondition fails
     * @throws InterruptedException          if the 30 second sleep is interrupted
     */
    @Override
    public void decommission() throws InterruptedException {
        if (!this.tokenMetadata_.isMember(FBUtilities.getLocalAddress())) {
            throw new UnsupportedOperationException("local node is not a member of the token ring yet");
        }
        if (this.tokenMetadata_.cloneAfterAllLeft().sortedTokens().size() < 2) {
            throw new UnsupportedOperationException("no other normal nodes in the ring; decommission would be pointless");
        }
        for (String table : DatabaseDescriptor.getNonSystemTables()) {
            if (this.tokenMetadata_.getPendingRanges(table, FBUtilities.getLocalAddress()).size() > 0) {
                throw new UnsupportedOperationException("data is currently moving to this node; unable to leave the ring");
            }
        }
        if (logger_.isDebugEnabled()) {
            logger_.debug("DECOMMISSIONING");
        }
        this.startLeaving();
        this.setMode("Leaving: sleeping 30000 ms for pending range setup", true);
        Thread.sleep(30000L);

        Runnable shutDownServices = new Runnable() {
            @Override
            public void run() {
                Gossiper.instance.stop();
                MessagingService.shutdown();
                StageManager.shutdownNow();
                StorageService.this.setMode("Decommissioned", true);
            }
        };
        this.unbootstrap(shutDownServices);
    }

    /**
     * Takes the local node out of the ring: clears the bootstrapped flag in the system table,
     * removes the local address from the token metadata, recalculates pending ranges, gossips
     * the "left" status, and then sleeps for two seconds.
     *
     * @throws AssertionError if the sleep is interrupted
     */
    private void leaveRing() {
        SystemTable.setBootstrapped(false);
        tokenMetadata_.removeEndpoint(FBUtilities.getLocalAddress());
        calculatePendingRanges();
        Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.left(getLocalToken()));
        try {
            Thread.sleep(2000L);
        }
        catch (InterruptedException interrupted) {
            throw new AssertionError(interrupted);
        }
    }

    /**
     * Transfers this node's ranges to their new owners, leaves the ring, and then runs
     * {@code onFinish}.
     * <p>
     * A latch is initialised with the number of non-system tables. A table with no ranges to
     * transfer counts down immediately. Otherwise one transfer per (range, new endpoint) entry
     * is submitted to the stream stage, and the table counts down when the callback of its
     * last outstanding entry runs. The calling thread blocks until the latch reaches zero.
     *
     * @param onFinish action run on the calling thread after the node has left the ring
     * @throws RuntimeException if the wait on the latch is interrupted
     */
    private void unbootstrap(Runnable onFinish) {
        final CountDownLatch tablesRemaining = new CountDownLatch(DatabaseDescriptor.getNonSystemTables().size());
        for (final String table : DatabaseDescriptor.getNonSystemTables()) {
            Multimap<Range, InetAddress> transfers = this.getChangedRangesForLeaving(table, FBUtilities.getLocalAddress());
            if (logger_.isDebugEnabled()) {
                logger_.debug("Ranges needing transfer are [" + StringUtils.join(transfers.keySet(), ",") + "]");
            }
            if (transfers.isEmpty()) {
                tablesRemaining.countDown();
                continue;
            }
            this.setMode("Leaving: streaming data to other nodes", true);
            // Entries of this table that have not called back yet; also serves as the lock.
            final Set<Map.Entry<Range, InetAddress>> outstanding = new HashSet<Map.Entry<Range, InetAddress>>(transfers.entries());
            for (final Map.Entry<Range, InetAddress> transfer : transfers.entries()) {
                final Range range = transfer.getKey();
                final InetAddress target = transfer.getValue();
                final Runnable onTransferred = new Runnable() {
                    @Override
                    public void run() {
                        synchronized (outstanding) {
                            outstanding.remove(transfer);
                            if (outstanding.isEmpty()) {
                                tablesRemaining.countDown();
                            }
                        }
                    }
                };
                StageManager.getStage(Stage.STREAM).execute(new Runnable() {
                    @Override
                    public void run() {
                        StreamOut.transferRanges(target, table, Arrays.asList(range), onTransferred);
                    }
                });
            }
        }
        logger_.debug("waiting for stream aks.");
        try {
            tablesRemaining.await();
        }
        catch (InterruptedException interrupted) {
            throw new RuntimeException(interrupted);
        }
        logger_.debug("stream acks all received.");
        this.leaveRing();
        onFinish.run();
    }

    /**
     * Moves this node to the token parsed from {@code newToken} by the partitioner's token
     * factory.
     *
     * @param newToken string form of the target token
     */
    @Override
    public void move(String newToken) throws IOException, InterruptedException {
        Token target = partitioner_.getTokenFactory().fromString(newToken);
        this.move(target);
    }

    /**
     * Moves this node without a caller-chosen token: the token-based {@code move} is invoked
     * with {@code null}, which makes it pick a balanced token itself.
     */
    @Override
    public void loadBalance() throws IOException, InterruptedException {
        Token noExplicitTarget = null;
        this.move(noExplicitTarget);
    }

    /**
     * Moves this node to a new token by leaving the ring and bootstrapping again.
     * <p>
     * The move is refused while any table has pending ranges for the local address, or when
     * the requested token is already present in the ring. After announcing that it is leaving
     * and sleeping 30 seconds, the node runs {@code unbootstrap} with a finishing step that
     * bootstraps to {@code token}, or, when {@code token} is {@code null}, to a balanced token
     * obtained from {@code BootStrapper} once load information is available.
     *
     * @param token target token, or {@code null} to have one chosen from load information
     * @throws UnsupportedOperationException if data is currently moving to this node
     * @throws IOException                   if the target token is already in the ring
     * @throws InterruptedException          if the 30 second sleep is interrupted
     */
    private void move(final Token token) throws IOException, InterruptedException {
        for (String table : DatabaseDescriptor.getTables()) {
            if (this.tokenMetadata_.getPendingRanges(table, FBUtilities.getLocalAddress()).size() > 0) {
                throw new UnsupportedOperationException("data is currently moving to this node; unable to leave the ring");
            }
        }
        boolean tokenTaken = token != null && this.tokenMetadata_.sortedTokens().contains(token);
        if (tokenTaken) {
            throw new IOException("target token " + token + " is already owned by another node");
        }
        if (logger_.isDebugEnabled()) {
            logger_.debug("Leaving: old token was " + this.getLocalToken());
        }
        this.startLeaving();
        this.setMode("Leaving: sleeping 30000 ms for pending range setup", true);
        Thread.sleep(30000L);

        WrappedRunnable rebootstrap = new WrappedRunnable() {
            @Override
            public void runMayThrow() throws IOException {
                Token target = token;
                if (target == null) {
                    // No explicit token: derive one from the cluster's load information.
                    StorageLoadBalancer.instance.waitForLoadInfo();
                    target = BootStrapper.getBalancedToken(StorageService.this.tokenMetadata_, StorageLoadBalancer.instance.getLoadInfo());
                }
                logger_.info("re-bootstrapping to new token {}", (Object) target);
                StorageService.this.bootstrap(target);
            }
        };
        this.unbootstrap(rebootstrap);
    }

    /**
     * Describes the token removal in progress, if any.
     *
     * @return a fixed message when no node is being removed; otherwise a message naming the
     *         token of the node being removed and the nodes still expected to confirm
     */
    @Override
    public String getRemovalStatus() {
        if (this.removingNode != null) {
            return String.format("Removing token (%s). Waiting for replication confirmation from [%s].", this.tokenMetadata_.getToken(this.removingNode), StringUtils.join(this.replicatingNodes, ","));
        }
        return "No token removals in process.";
    }

    /**
     * Forces a token removal in progress to complete by clearing the set of nodes that
     * {@code removeToken} is still waiting on. A warning is logged if confirmations were
     * outstanding.
     * <p>
     * When no removal is running, {@code replicatingNodes} is {@code null} (it is reset to
     * {@code null} at the end of {@code removeToken}); the call is then a no-op instead of
     * failing with a {@code NullPointerException}.
     */
    @Override
    public void forceRemoveCompletion() {
        // Read the field once: removeToken may reset it to null concurrently.
        Set<InetAddress> pending = this.replicatingNodes;
        if (pending == null) {
            return;
        }
        if (!pending.isEmpty()) {
            logger_.warn("Removal not confirmed for " + StringUtils.join(pending, ","));
        }
        pending.clear();
    }

    /**
     * Removes a dead node's token from the ring and waits until the surviving replicas have
     * confirmed re-replication.
     * <p>
     * The request is rejected when the token is unknown, belongs to this node, belongs to a
     * live node, belongs to a node already leaving, or when another removal is already being
     * processed here. Otherwise the live endpoints that gain ranges (for tables whose
     * replication factor is not 1) are recorded in {@code replicatingNodes}, the removal is
     * gossiped, this node restores its own replica count, and the thread polls every 100 ms
     * until {@code replicatingNodes} is empty. Finally the token is excised, the completed
     * removal is gossiped, and the removal state is reset to {@code null}.
     *
     * @param tokenString string form of the token to remove
     * @throws UnsupportedOperationException if the request is rejected
     * @throws AssertionError                if the polling sleep is interrupted
     */
    @Override
    public void removeToken(String tokenString) {
        InetAddress localAddress = FBUtilities.getLocalAddress();
        Token localToken = this.tokenMetadata_.getToken(localAddress);
        Token token = partitioner_.getTokenFactory().fromString(tokenString);
        InetAddress endpoint = this.tokenMetadata_.getEndpoint(token);

        if (endpoint == null) {
            throw new UnsupportedOperationException("Token not found.");
        }
        if (endpoint.equals(localAddress)) {
            throw new UnsupportedOperationException("Cannot remove node's own token");
        }
        if (Gossiper.instance.getLiveMembers().contains(endpoint)) {
            throw new UnsupportedOperationException("Node " + endpoint + " is alive and owns this token. Use decommission command to remove it from the ring");
        }
        if (this.tokenMetadata_.isLeaving(endpoint)) {
            throw new UnsupportedOperationException("Node " + endpoint + " is already being removed.");
        }
        if (this.replicatingNodes != null) {
            throw new UnsupportedOperationException("This node is already processing a removal. Wait for it to complete.");
        }

        // Record every live endpoint that will receive data for the removed node's ranges.
        this.replicatingNodes = Collections.synchronizedSet(new HashSet<InetAddress>());
        for (String table : DatabaseDescriptor.getNonSystemTables()) {
            if (Table.open(table).getReplicationStrategy().getReplicationFactor() == 1) {
                continue;
            }
            Multimap<Range, InetAddress> changedRanges = this.getChangedRangesForLeaving(table, endpoint);
            IFailureDetector detector = FailureDetector.instance;
            for (InetAddress newReplica : changedRanges.values()) {
                if (!detector.isAlive(newReplica)) {
                    logger_.warn("Endpoint " + newReplica + " is down and will not receive data for re-replication of " + endpoint);
                    continue;
                }
                this.replicatingNodes.add(newReplica);
            }
        }

        this.removingNode = endpoint;
        this.tokenMetadata_.addLeavingEndpoint(endpoint);
        this.calculatePendingRanges();
        Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.removingNonlocal(localToken, token));
        this.restoreReplicaCount(endpoint, localAddress);

        // Poll until every recorded endpoint has confirmed (or the set was cleared).
        while (!this.replicatingNodes.isEmpty()) {
            try {
                Thread.sleep(100L);
            }
            catch (InterruptedException interrupted) {
                throw new AssertionError(interrupted);
            }
        }

        this.excise(token, endpoint);
        Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.removedNonlocal(localToken, token));
        this.replicatingNodes = null;
        this.removingNode = null;
    }

    /**
     * Records that {@code node} has finished re-replicating by removing it from
     * {@code replicatingNodes}. With assertions enabled, the set must be non-null.
     *
     * @param node endpoint that confirmed replication
     */
    public void confirmReplication(InetAddress node) {
        assert replicatingNodes != null;
        replicatingNodes.remove(node);
    }

    /**
     * Reports the value of the {@code isClientMode} field.
     *
     * @return {@code true} when this instance runs in client mode
     */
    public boolean isClientMode() {
        return isClientMode;
    }

    /**
     * Requests a JVM garbage collection when any column family store reports unreclaimed
     * space, then sleeps for one second. Does nothing otherwise.
     *
     * @throws AssertionError if the sleep is interrupted
     */
    public synchronized void requestGC() {
        if (!this.hasUnreclaimedSpace()) {
            return;
        }
        logger_.info("requesting GC to free disk space");
        System.gc();
        try {
            Thread.sleep(1000L);
        }
        catch (InterruptedException interrupted) {
            throw new AssertionError(interrupted);
        }
    }

    /**
     * Checks whether any column family store reports unreclaimed space.
     *
     * @return {@code true} as soon as one store reports unreclaimed space, else {@code false}
     */
    private boolean hasUnreclaimedSpace() {
        for (ColumnFamilyStore store : ColumnFamilyStore.all()) {
            if (store.hasUnreclaimedSpace()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Returns the value of the {@code operationMode} field.
     *
     * @return the current operation mode string
     */
    @Override
    public String getOperationMode() {
        return operationMode;
    }

    /**
     * Reports drain progress as "Drained d/t ColumnFamilies".
     * <p>
     * {@code drain()} sets {@code totalCFs} and {@code remainingCFs} to the number of column
     * families to flush and decrements {@code remainingCFs} after each flush, so the number
     * already drained is {@code totalCFs - remainingCFs}. Printing {@code remainingCFs}
     * directly would count down from the total instead of up from zero.
     *
     * @return the progress message
     */
    @Override
    public String getDrainProgress() {
        return String.format("Drained %s/%s ColumnFamilies", this.totalCFs - this.remainingCFs, this.totalCFs);
    }

    /**
     * Shuts the node down for writes and flushes all non-system column families.
     * <p>
     * Steps, in order: stop gossip, shut down messaging and wait for it, shut down the
     * mutation stage and wait up to an hour for it to terminate, flush every column family
     * store of every non-system table (updating {@code totalCFs} / {@code remainingCFs} as it
     * goes), shut down the post-flush executor and wait up to a minute, and wait for the
     * deletion service. If the mutation stage is already terminated, a warning is logged and
     * nothing else happens.
     */
    @Override
    public synchronized void drain() throws IOException, InterruptedException, ExecutionException {
        ThreadPoolExecutor mutations = StageManager.getStage(Stage.MUTATION);
        if (mutations.isTerminated()) {
            logger_.warn("Cannot drain node (did it already happen?)");
            return;
        }

        this.setMode("Starting drain process", true);
        Gossiper.instance.stop();

        this.setMode("Draining: shutting down MessageService", false);
        MessagingService.shutdown();

        this.setMode("Draining: emptying MessageService pools", false);
        MessagingService.waitFor();

        this.setMode("Draining: clearing mutation stage", false);
        mutations.shutdown();
        mutations.awaitTermination(3600L, TimeUnit.SECONDS);

        this.setMode("Draining: flushing column families", false);
        ArrayList<ColumnFamilyStore> stores = new ArrayList<ColumnFamilyStore>();
        for (String tableName : DatabaseDescriptor.getNonSystemTables()) {
            stores.addAll(Table.open(tableName).getColumnFamilyStores());
        }
        int storeCount = stores.size();
        this.remainingCFs = storeCount;
        this.totalCFs = storeCount;
        for (ColumnFamilyStore store : stores) {
            store.forceBlockingFlush();
            this.remainingCFs--;
        }

        ColumnFamilyStore.postFlushExecutor.shutdown();
        ColumnFamilyStore.postFlushExecutor.awaitTermination(60L, TimeUnit.SECONDS);
        DeletionService.waitFor();
        this.setMode("Node is drained", true);
    }

    /**
     * Imports the keyspace definitions found in the YAML configuration as schema migrations.
     * <p>
     * Table names must match {@code \w+} and column family names must be legal according to
     * {@code Migration.isLegalName}. The import itself runs on the migration stage and is
     * refused when a schema already exists. One {@code AddKeyspace} migration is applied per
     * keyspace. Afterwards the definitions are dumped to storage, the "system" table is
     * flushed and the flushes are awaited, and the last applied migration (if any) is
     * announced.
     *
     * @throws ConfigurationException on an invalid name, when a schema already exists, when
     *                                the migration task fails with another checked exception,
     *                                or when a flush fails
     * @throws IOException            when the migration task fails with an {@code IOException}
     * @throws RuntimeException       when the wait for the migration task is interrupted, or
     *                                the task fails with a non-{@code Exception} throwable
     */
    @Override
    public void loadSchemaFromYAML() throws ConfigurationException, IOException {
        Migration migration;
        final Collection<KSMetaData> tables = DatabaseDescriptor.readTablesFromYaml();
        for (KSMetaData table : tables) {
            if (!table.name.matches("\\w+")) {
                throw new ConfigurationException("Invalid table name: " + table.name);
            }
            for (CFMetaData cfm : table.cfMetaData().values()) {
                if (Migration.isLegalName(cfm.cfName)) continue;
                throw new ConfigurationException("Invalid column family name: " + cfm.cfName);
            }
        }
        Callable<Migration> call = new Callable<Migration>(){

            @Override
            public Migration call() throws Exception {
                if (DatabaseDescriptor.getDefsVersion().timestamp() > 0L || Migration.getLastMigrationId() != null) {
                    throw new ConfigurationException("Cannot import schema when one already exists");
                }
                // Only the last migration is returned; it is the one announced below.
                AddKeyspace migration = null;
                for (KSMetaData table : tables) {
                    migration = new AddKeyspace(table);
                    migration.apply();
                }
                return migration;
            }
        };
        try {
            migration = StageManager.getStage(Stage.MIGRATION).submit(call).get();
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        catch (ExecutionException e) {
            // Unwrap the task's failure into the exception types this method declares.
            if (e.getCause() instanceof ConfigurationException) {
                throw (ConfigurationException)e.getCause();
            }
            if (e.getCause() instanceof IOException) {
                throw (IOException)e.getCause();
            }
            if (e.getCause() instanceof Exception) {
                throw new ConfigurationException(e.getCause().getMessage(), (Exception)e.getCause());
            }
            throw new RuntimeException(e);
        }
        assert (DatabaseDescriptor.getDefsVersion().timestamp() > 0L);
        DefsTable.dumpToStorage(DatabaseDescriptor.getDefsVersion());
        // Typed as Future so the loop below type-checks; a raw list only yields Object.
        List<Future> flushers = new ArrayList<Future>();
        flushers.addAll(Table.open("system").flush());
        for (Future future : flushers) {
            try {
                future.get();
            }
            catch (Exception e) {
                ConfigurationException ce = new ConfigurationException(e.getMessage());
                ce.initCause(e);
                throw ce;
            }
        }
        if (migration != null) {
            migration.announce();
        }
    }

    /**
     * Serialises the definitions of all non-system keyspaces to a YAML string.
     * <p>
     * Each keyspace, column family and column definition is copied into its {@code Raw*}
     * counterpart. A column family without column definitions gets a {@code null}
     * {@code column_metadata} instead of an empty array. The result is dumped in block flow
     * style through a {@code SkipNullRepresenter}.
     *
     * @return the YAML document
     * @throws IOException propagated from decoding a column name
     */
    @Override
    public String exportSchema() throws IOException {
        ArrayList<RawKeyspace> rawKeyspaces = new ArrayList<RawKeyspace>();
        for (String keyspaceName : DatabaseDescriptor.getNonSystemTables()) {
            KSMetaData keyspaceMeta = DatabaseDescriptor.getTableDefinition(keyspaceName);
            RawKeyspace rawKeyspace = new RawKeyspace();
            rawKeyspace.name = keyspaceMeta.name;
            rawKeyspace.replica_placement_strategy = keyspaceMeta.strategyClass.getName();
            rawKeyspace.replication_factor = keyspaceMeta.replicationFactor;
            rawKeyspace.column_families = new RawColumnFamily[keyspaceMeta.cfMetaData().size()];

            int familyIndex = 0;
            for (CFMetaData familyMeta : keyspaceMeta.cfMetaData().values()) {
                RawColumnFamily rawFamily = new RawColumnFamily();
                rawFamily.name = familyMeta.cfName;
                rawFamily.compare_with = familyMeta.comparator.getClass().getName();
                rawFamily.default_validation_class = familyMeta.getDefaultValidator().getClass().getName();
                rawFamily.compare_subcolumns_with = familyMeta.subcolumnComparator == null ? null : familyMeta.subcolumnComparator.getClass().getName();
                rawFamily.column_type = familyMeta.cfType;
                rawFamily.comment = familyMeta.getComment();
                rawFamily.keys_cached = familyMeta.getKeyCacheSize();
                rawFamily.read_repair_chance = familyMeta.getReadRepairChance();
                rawFamily.gc_grace_seconds = familyMeta.getGcGraceSeconds();
                rawFamily.rows_cached = familyMeta.getRowCacheSize();
                rawFamily.column_metadata = new RawColumnDefinition[familyMeta.getColumn_metadata().size()];

                int columnIndex = 0;
                for (ColumnDefinition columnDef : familyMeta.getColumn_metadata().values()) {
                    RawColumnDefinition rawColumn = new RawColumnDefinition();
                    rawColumn.index_name = columnDef.getIndexName();
                    rawColumn.index_type = columnDef.getIndexType();
                    rawColumn.name = ByteBufferUtil.string(columnDef.name, Charsets.UTF_8);
                    rawColumn.validator_class = columnDef.validator.getClass().getName();
                    rawFamily.column_metadata[columnIndex] = rawColumn;
                    columnIndex++;
                }
                // No columns were copied: emit null so the representer can skip the field.
                if (columnIndex == 0) {
                    rawFamily.column_metadata = null;
                }
                rawKeyspace.column_families[familyIndex] = rawFamily;
                familyIndex++;
            }
            rawKeyspaces.add(rawKeyspace);
        }

        DumperOptions dumperOptions = new DumperOptions();
        dumperOptions.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK);
        SkipNullRepresenter nullSkipping = new SkipNullRepresenter();
        nullSkipping.addClassTag(RawColumnFamily.class, Tag.MAP);
        nullSkipping.addClassTag(Keyspaces.class, Tag.MAP);
        nullSkipping.addClassTag(ColumnDefinition.class, Tag.MAP);
        Yaml yaml = new Yaml(new Dumper((Representer) nullSkipping, dumperOptions));

        Keyspaces root = new Keyspaces();
        root.keyspaces = rawKeyspaces;
        return yaml.dump(root);
    }

    /**
     * Swaps in a new partitioner and rebuilds {@code valueFactory} around it.
     *
     * <p>NOTE(review): the "Unsafe" suffix suggests this is meant for tests or other
     * controlled callers; no synchronization is performed here - confirm intended use.
     *
     * @param newPartitioner the partitioner to install
     * @return the partitioner that was installed before this call
     */
    IPartitioner setPartitionerUnsafe(IPartitioner newPartitioner) {
        final IPartitioner previous = partitioner_;
        partitioner_ = newPartitioner;
        // The value factory is bound to a partitioner, so it is recreated for the new one.
        valueFactory = new VersionedValue.VersionedValueFactory(partitioner_);
        return previous;
    }

    /**
     * Replaces this instance's token metadata.
     *
     * <p>NOTE(review): the "Unsafe" suffix suggests this is meant for tests or other
     * controlled callers; no synchronization is performed here - confirm intended use.
     *
     * @param tmd the token metadata to install
     * @return the token metadata that was installed before this call
     */
    TokenMetadata setTokenMetadataUnsafe(TokenMetadata tmd) {
        final TokenMetadata previous = this.tokenMetadata_;
        this.tokenMetadata_ = tmd;
        return previous;
    }

    /**
     * Delegates to {@code StorageProxy.truncateBlocking} for the given keyspace and
     * column family; this method adds no logic of its own.
     *
     * @param keyspace     keyspace name, passed through unchanged
     * @param columnFamily column family name, passed through unchanged
     * @throws UnavailableException as declared; can only originate from the delegate call
     * @throws TimeoutException     as declared; can only originate from the delegate call
     * @throws IOException          as declared; can only originate from the delegate call
     */
    @Override
    public void truncate(String keyspace, String columnFamily) throws UnavailableException, TimeoutException, IOException {
        StorageProxy.truncateBlocking(keyspace, columnFamily);
    }

    /**
     * Submits a key-cache write and a row-cache write for every column family store
     * returned by {@code ColumnFamilyStore.all()}, then waits on all of the resulting
     * futures via {@code FBUtilities.waitOnFutures}.
     *
     * @throws ExecutionException   as declared; presumably raised while waiting when a
     *                              submitted cache write failed - confirm against FBUtilities
     * @throws InterruptedException as declared; presumably raised when the wait is
     *                              interrupted - confirm against FBUtilities
     */
    @Override
    public void saveCaches() throws ExecutionException, InterruptedException {
        // Parameterized instead of a raw ArrayList so the compiler checks that only
        // futures are collected; the erased runtime behavior is unchanged.
        List<Future<?>> futures = new ArrayList<Future<?>>();
        logger_.debug("submitting cache saves");
        for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) {
            futures.add(cfs.submitKeyCacheWrite());
            futures.add(cfs.submitRowCacheWrite());
        }
        FBUtilities.waitOnFutures(futures);
        logger_.debug("cache saves completed");
    }

    /**
     * Asks the partitioner to describe ownership for the tokens currently present in
     * the token-to-endpoint map.
     *
     * @return whatever {@code partitioner_.describeOwnership} returns for the sorted
     *         token list (a map from token to a float value)
     */
    @Override
    public Map<Token, Float> getOwnership() {
        // Copy the key set so sorting does not touch the underlying map.
        List<Token> ordered = new ArrayList<Token>();
        ordered.addAll(this.getTokenToEndpointMap().keySet());
        Collections.sort(ordered);
        return partitioner_.describeOwnership(ordered);
    }

    /**
     * Lists the table (keyspace) names known to {@code DatabaseDescriptor}.
     *
     * @return an unmodifiable list holding a copy of {@code DatabaseDescriptor.getTables()}
     */
    @Override
    public List<String> getKeyspaces() {
        // Copy first, then wrap, so callers can neither mutate nor observe later changes
        // to the collection handed out by DatabaseDescriptor.
        return Collections.unmodifiableList(new ArrayList<String>(DatabaseDescriptor.getTables()));
    }

    /**
     * Root holder for a schema export: {@code exportSchema()} creates an instance,
     * assigns the collected keyspaces to {@link #keyspaces} and hands the object to
     * the YAML dumper.
     *
     * <p>NOTE(review): this is a non-static inner class, so each instance keeps a
     * reference to its enclosing object; making it static would change how external
     * code must instantiate it - confirm callers before altering.
     */
    public class Keyspaces {
        // Raw keyspace definitions to be serialized; public so it can be set directly.
        public List<RawKeyspace> keyspaces;
    }

    /**
     * Enumeration of verb identifiers declared by this service.
     *
     * <p>NOTE(review): presumably these name the message types exchanged between
     * nodes and each is bound to a verb handler elsewhere in the file - confirm.
     * NOTE(review): if any code relies on {@code ordinal()} (for example when
     * serializing a verb), the declaration order below is significant; verify before
     * reordering or inserting constants anywhere but at the end.
     */
    public static enum Verb {
        MUTATION,
        BINARY,
        READ_REPAIR,
        READ,
        REQUEST_RESPONSE,
        STREAM_INITIATE,
        STREAM_INITIATE_DONE,
        STREAM_REPLY,
        STREAM_REQUEST,
        RANGE_SLICE,
        BOOTSTRAP_TOKEN,
        TREE_REQUEST,
        TREE_RESPONSE,
        JOIN,
        GOSSIP_DIGEST_SYN,
        GOSSIP_DIGEST_ACK,
        GOSSIP_DIGEST_ACK2,
        DEFINITIONS_ANNOUNCE,
        DEFINITIONS_UPDATE_RESPONSE,
        TRUNCATE,
        SCHEMA_CHECK,
        INDEX_SCAN,
        REPLICATION_FINISHED,
        INTERNAL_RESPONSE;

    }
}

