/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.zookeeper;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.persistence.IntegerResourceVersion;
import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
import org.apache.flink.runtime.persistence.RetrievableStateStorageHelper;
import org.apache.flink.runtime.persistence.StateHandleStore;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.util.StateHandleStoreUtils;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.api.ACLPathAndBytesable;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.api.BackgroundPathAndBytesable;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.api.SetDataBuilder;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.api.transaction.CuratorTransactionBridge;
import org.apache.flink.shaded.curator5.org.apache.curator.utils.ZKPaths;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.CreateMode;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.data.Stat;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.FunctionWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZooKeeperStateHandleStore<T extends Serializable>
implements StateHandleStore<T, IntegerResourceVersion> {
    /** Logger shared by all instances of this store. */
    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperStateHandleStore.class);

    /**
     * ZooKeeper exception types that {@link #indicatesPossiblyInconsistentState(Exception)}
     * does NOT treat as a sign of possibly inconsistent state. Membership is checked against
     * the exact runtime class of the exception, not against its supertypes.
     */
    private static final Set<Class<? extends KeeperException>> SAFE_PRE_COMMIT_EXCEPTIONS =
            new HashSet<>(
                    // The explicit type witness keeps the element type at
                    // Class<? extends KeeperException>; a raw HashSet<Class> is not assignable
                    // to the declared field type.
                    Arrays.<Class<? extends KeeperException>>asList(
                            KeeperException.NodeExistsException.class,
                            KeeperException.BadArgumentsException.class,
                            KeeperException.NoNodeException.class,
                            KeeperException.NoAuthException.class,
                            KeeperException.BadVersionException.class,
                            KeeperException.AuthFailedException.class,
                            KeeperException.InvalidACLException.class,
                            KeeperException.SessionMovedException.class,
                            KeeperException.NotReadOnlyException.class));

    /** Curator client used for every ZooKeeper operation of this store. */
    private final CuratorFramework client;

    /** Helper that persists the actual state and returns a handle to it. */
    private final RetrievableStateStorageHelper<T> storage;

    /** Name of this instance's lock znode; a random UUID assigned in the constructor. */
    private final String lockNode;

    public ZooKeeperStateHandleStore(CuratorFramework client, RetrievableStateStorageHelper<T> storage) {
        this.client = Preconditions.checkNotNull(client, "Curator client");
        this.storage = Preconditions.checkNotNull(storage, "State storage");
        this.lockNode = UUID.randomUUID().toString();
    }

    /**
     * Stores {@code state} through the storage helper and writes the serialized handle to the
     * given ZooKeeper path, creating this instance's lock node in the same transaction.
     *
     * <p>If a node already exists at the path and still has children, the call fails. If it
     * exists without children it is treated as marked for deletion and removed first.
     *
     * @param pathInZooKeeper target path; a leading {@code /} is added when missing
     * @param state state to store
     * @return handle to the stored state
     * @throws StateHandleStore.AlreadyExistException if a live node already exists at the path
     * @throws PossibleInconsistentStateException if the write failed with an exception that is
     *     not in the safe set; the stored state is NOT discarded in that case
     * @throws Exception for any other failure; the stored state is discarded before rethrowing
     */
    @Override
    public RetrievableStateHandle<T> addAndLock(String pathInZooKeeper, T state) throws PossibleInconsistentStateException, Exception {
        Preconditions.checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
        Preconditions.checkNotNull(state, "State");
        final String path = normalizePath(pathInZooKeeper);

        final Stat existingStat = getStat(path).orElse(null);
        if (existingStat != null) {
            if (isNotMarkedForDeletion(existingStat)) {
                throw new StateHandleStore.AlreadyExistException(String.format("ZooKeeper node %s already exists.", path));
            }
            // Leftover of an unfinished deletion: finish it before re-creating the node.
            final boolean removed = releaseAndTryRemove(path);
            Preconditions.checkState(removed, "The state is marked for deletion and, therefore, should be deletable.");
        }

        final RetrievableStateHandle<T> handle = storage.store(state);
        final byte[] serializedHandle = StateHandleStoreUtils.serializeOrDiscard(handle);
        try {
            writeStoreHandleTransactionally(path, serializedHandle);
        } catch (KeeperException.NodeExistsException e) {
            // The node was absent (or removed) above, yet the create reports it exists.
            // NOTE(review): presumably a retried transaction that had already succeeded -
            // confirm against Curator's retry behaviour. The call is treated as a success.
            return handle;
        } catch (Exception e) {
            if (indicatesPossiblyInconsistentState(e)) {
                // Keep the stored state: the handle may have been written to ZooKeeper.
                throw new PossibleInconsistentStateException(e);
            }
            handle.discardState();
            throw e;
        }
        return handle;
    }

    /**
     * Creates three znodes in one Curator transaction: the persistent data node at {@code path}
     * holding the serialized handle, the persistent lock root below it, and this instance's
     * ephemeral lock node below the lock root.
     *
     * @param path normalized path of the data node
     * @param serializedStoreHandle bytes to store in the data node
     * @throws Exception if the transaction cannot be committed
     */
    @VisibleForTesting
    void writeStoreHandleTransactionally(String path, byte[] serializedStoreHandle) throws Exception {
        final String lockRootPath = getRootLockPath(path);
        final String instanceLockPath = getInstanceLockPath(path);

        client.inTransaction()
                .create().withMode(CreateMode.PERSISTENT).forPath(path, serializedStoreHandle)
                .and()
                .create().withMode(CreateMode.PERSISTENT).forPath(lockRootPath)
                .and()
                .create().withMode(CreateMode.EPHEMERAL).forPath(instanceLockPath)
                .and()
                .commit();
    }

    /**
     * Replaces the state stored at the given path with {@code state}, provided this instance
     * holds a lock on the node and the node's version matches {@code expectedVersion}.
     *
     * <p>On success the old state is discarded. On failure the newly stored state is discarded,
     * unless the failure may have left ZooKeeper in an inconsistent state, in which case
     * neither the old nor the new state is discarded.
     *
     * @param pathInZooKeeper path of the node to update; a leading {@code /} is added when missing
     * @param expectedVersion version the node must currently have
     * @param state new state to store
     * @throws IllegalStateException if this instance has no lock on the node
     * @throws PossibleInconsistentStateException if the write failed with an exception that is
     *     not in the safe set
     * @throws StateHandleStore.NotExistException if the failure was caused by a missing node
     * @throws Exception for any other failure
     */
    @Override
    public void replace(String pathInZooKeeper, IntegerResourceVersion expectedVersion, T state) throws Exception {
        Preconditions.checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
        Preconditions.checkNotNull(state, "State");
        String path = ZooKeeperStateHandleStore.normalizePath(pathInZooKeeper);
        // Flink's Preconditions substitutes %s placeholders (not SLF4J-style {}), so the
        // template must use %s for the path to show up inside the quotes.
        Preconditions.checkState(this.hasLock(path), "'%s' is only allowed to be replaced if the instance has a lock on this node.", path);
        RetrievableStateHandle<T> oldStateHandle = this.get(path, false);
        RetrievableStateHandle<T> newStateHandle = this.storage.store(state);
        byte[] serializedStateHandle = StateHandleStoreUtils.serializeOrDiscard(newStateHandle);
        // The flags start out in the "write failed" configuration.
        boolean discardOldState = false;
        boolean discardNewState = true;
        try {
            this.setStateHandle(path, serializedStateHandle, expectedVersion.getValue());
            // The write succeeded: the old state is now the one to discard.
            discardOldState = true;
            discardNewState = false;
        }
        catch (Exception e) {
            if (this.indicatesPossiblyInconsistentState(e)) {
                // The new handle may or may not be in ZooKeeper, so keep both states.
                discardNewState = false;
                throw new PossibleInconsistentStateException(e);
            }
            // Report a missing node as NotExistException; rethrow anything else unchanged.
            throw ExceptionUtils.findThrowable((Throwable)e, KeeperException.NoNodeException.class).map(nnee -> new StateHandleStore.NotExistException("ZooKeeper node " + path + " does not exist.", (Throwable)nnee)).orElseThrow(() -> e);
        }
        finally {
            if (discardOldState) {
                oldStateHandle.discardState();
            }
            if (discardNewState) {
                newStateHandle.discardState();
            }
        }
    }

    /**
     * Overwrites the data of the znode at {@code path}, conditional on the node's version, using
     * Curator's idempotent set-data mode.
     *
     * @param path normalized path of the node
     * @param serializedStateHandle new node content
     * @param expectedVersion version the node must currently have
     * @throws Exception if the update fails
     */
    @VisibleForTesting
    protected void setStateHandle(String path, byte[] serializedStateHandle, int expectedVersion) throws Exception {
        client.setData()
                .idempotent()
                .withVersion(expectedVersion)
                .forPath(path, serializedStateHandle);
    }

    /**
     * Tells whether a failure may have left ZooKeeper in an unknown state.
     *
     * @param e the failure to classify
     * @return {@code true} unless the exact class of {@code e} is listed in
     *     {@code SAFE_PRE_COMMIT_EXCEPTIONS}
     */
    private boolean indicatesPossiblyInconsistentState(Exception e) {
        final boolean knownSafe = SAFE_PRE_COMMIT_EXCEPTIONS.contains(e.getClass());
        return !knownSafe;
    }

    /**
     * Looks up the version of the node at the given path.
     *
     * @param pathInZooKeeper path to check; a leading {@code /} is added when missing
     * @return the node's data version, or {@link IntegerResourceVersion#notExisting()} when the
     *     node is absent or has no children (i.e. is marked for deletion)
     * @throws Exception if the ZooKeeper lookup fails
     */
    @Override
    public IntegerResourceVersion exists(String pathInZooKeeper) throws Exception {
        Preconditions.checkNotNull(pathInZooKeeper, "Path in ZooKeeper");

        final Optional<Stat> liveStat =
                getStat(pathInZooKeeper).filter(ZooKeeperStateHandleStore::isNotMarkedForDeletion);
        if (!liveStat.isPresent()) {
            return IntegerResourceVersion.notExisting();
        }
        return IntegerResourceVersion.valueOf(liveStat.get().getVersion());
    }

    /**
     * A node counts as live while it still has at least one child; a node without children is
     * considered marked for deletion.
     *
     * @param stat stat of the node to inspect
     * @return {@code true} if the node has one or more children
     */
    private static boolean isNotMarkedForDeletion(Stat stat) {
        final int childCount = stat.getNumChildren();
        return childCount > 0;
    }

    /**
     * Fetches the stat of the node at {@code path}.
     *
     * @param path node path; a leading {@code /} is added when missing
     * @return the stat, or an empty {@link Optional} when the existence check returns {@code null}
     * @throws Exception if the ZooKeeper lookup fails
     */
    private Optional<Stat> getStat(String path) throws Exception {
        final Stat stat = client.checkExists().forPath(normalizePath(path));
        return Optional.ofNullable(stat);
    }

    /**
     * Reads the state handle stored at the given path and locks the node for this instance.
     *
     * @param pathInZooKeeper path of the node
     * @return the deserialized state handle
     * @throws Exception if locking, reading or deserializing fails
     */
    @Override
    public RetrievableStateHandle<T> getAndLock(String pathInZooKeeper) throws Exception {
        final boolean acquireLock = true;
        return get(pathInZooKeeper, acquireLock);
    }

    /**
     * Lists the names of all child nodes directly below the root path {@code /}.
     *
     * @return the child names, or an empty list when the root node does not exist
     * @throws Exception if a ZooKeeper operation fails for a reason other than a missing node
     */
    @Override
    public Collection<String> getAllHandles() throws Exception {
        final String rootPath = "/";
        while (client.checkExists().forPath(rootPath) != null) {
            try {
                return client.getChildren().forPath(rootPath);
            } catch (KeeperException.NoNodeException ignored) {
                // The root disappeared between the existence check and the listing;
                // loop to re-check, which ends with the empty result once it is gone.
            }
        }
        return Collections.emptyList();
    }

    /**
     * Reads and locks every state handle below the root path, listing children through the
     * Curator client.
     *
     * @return pairs of state handle and node path
     * @throws Exception if a ZooKeeper operation fails
     */
    @Override
    public List<Tuple2<RetrievableStateHandle<T>, String>> getAllAndLock() throws Exception {
        final FunctionWithException<String, List<String>, Exception> listChildren =
                parentNodePath -> client.getChildren().forPath(parentNodePath);
        return getAllAndLock(listChildren);
    }

    /**
     * Reads and locks every state handle below the root path {@code /}.
     *
     * <p>The scan is repeated until the root's child version ({@code cversion}) is the same
     * before and after the scan. Children that no longer exist are skipped silently; children
     * whose data cannot be deserialized are skipped with a warning.
     *
     * @param getNodeChildren function that returns the child names of a given parent path
     * @return pairs of state handle and node path; empty when the root node does not exist
     * @throws Exception if a ZooKeeper operation fails
     */
    @VisibleForTesting
    List<Tuple2<RetrievableStateHandle<T>, String>> getAllAndLock(FunctionWithException<String, List<String>, Exception> getNodeChildren) throws Exception {
        final List<Tuple2<RetrievableStateHandle<T>, String>> stateHandles = new ArrayList<>();
        final String rootPath = "/";
        boolean success = false;
        while (!success) {
            // Start over on every attempt so results of an invalidated scan are dropped.
            stateHandles.clear();
            final Stat initialStat = client.checkExists().forPath(rootPath);
            if (initialStat == null) {
                // The root node does not exist: nothing to return.
                break;
            }
            final int initialCVersion = initialStat.getCversion();
            final List<String> children = getNodeChildren.apply(rootPath);
            for (String child : children) {
                final String path = rootPath + child;
                try {
                    final RetrievableStateHandle<T> stateHandle = getAndLock(path);
                    stateHandles.add(new Tuple2<>(stateHandle, path));
                } catch (StateHandleStore.NotExistException ignored) {
                    // The node vanished after the children were listed; skip it.
                } catch (IOException ioException) {
                    LOG.warn("Could not get all ZooKeeper children. Node {} contained corrupted data. Ignoring this node.", path, ioException);
                }
            }
            // The root may have been deleted while scanning, in which case the existence check
            // returns null. Treat that as a concurrent modification instead of dereferencing it;
            // the next iteration then ends the loop with an empty result.
            final Stat finalStat = client.checkExists().forPath(rootPath);
            success = finalStat != null && initialCVersion == finalStat.getCversion();
        }
        return stateHandles;
    }

    /**
     * Releases this instance's lock on the node and, if no lock remains, discards the stored
     * state and deletes the node.
     *
     * <p>The lock root is deleted before the node itself, so a node that is left without
     * children is one whose deletion has started but not finished.
     *
     * @param pathInZooKeeper path of the node; a leading {@code /} is added when missing
     * @return {@code true} if the node was removed, {@code false} if the lock root still has
     *     children
     * @throws Exception if releasing the lock, discarding the state or deleting a node fails
     */
    @Override
    public boolean releaseAndTryRemove(String pathInZooKeeper) throws Exception {
        Preconditions.checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
        final String nodePath = normalizePath(pathInZooKeeper);
        final String lockRootPath = getRootLockPath(nodePath);

        // Best effort: failing to read the handle must not prevent the node's removal.
        RetrievableStateHandle<T> handle;
        try {
            handle = get(nodePath, false);
        } catch (Exception e) {
            LOG.warn("Could not retrieve the state handle from node {}.", nodePath, e);
            handle = null;
        }

        release(pathInZooKeeper);

        try {
            deleteIfExists(lockRootPath);
        } catch (KeeperException.NotEmptyException ignored) {
            LOG.debug("Could not delete znode {} because it is still locked.", lockRootPath);
            return false;
        }

        if (handle != null) {
            handle.discardState();
        }
        deleteIfExists(nodePath);
        return true;
    }

    /**
     * Calls {@link #releaseAndTryRemove(String)} for every child of the root path. All children
     * are attempted even if some fail; failures are collected and reported together.
     *
     * @throws Exception wrapping the first failure (with later ones suppressed) if any child
     *     could not be processed
     */
    @Override
    public void releaseAndTryRemoveAll() throws Exception {
        Exception collectedFailure = null;
        for (String child : getAllHandles()) {
            try {
                releaseAndTryRemove("/" + child);
            } catch (Exception e) {
                collectedFailure = ExceptionUtils.firstOrSuppressed(e, collectedFailure);
            }
        }
        if (collectedFailure == null) {
            return;
        }
        throw new Exception("Could not properly release and try removing all state nodes.", collectedFailure);
    }

    /**
     * Removes this instance's lock node for the given path, if it exists.
     *
     * @param pathInZooKeeper path of the locked node; a leading {@code /} is added when missing
     * @throws Exception wrapping the cause if the lock node could not be deleted
     */
    @Override
    public void release(String pathInZooKeeper) throws Exception {
        final String instanceLock = getInstanceLockPath(normalizePath(pathInZooKeeper));
        try {
            deleteIfExists(instanceLock);
        } catch (Exception cause) {
            throw new Exception("Could not release the lock: " + instanceLock + '.', cause);
        }
    }

    /**
     * Deletes the znode at {@code path}; a missing node is logged at debug level and ignored.
     *
     * @param path node path; a leading {@code /} is added when missing
     * @throws Exception if the deletion fails for any reason other than a missing node
     */
    private void deleteIfExists(String path) throws Exception {
        final String target = normalizePath(path);
        try {
            client.delete().forPath(target);
        } catch (KeeperException.NoNodeException ignored) {
            LOG.debug("ZNode '{}' is already marked for deletion. Command is ignored.", target);
        }
    }

    /**
     * Calls {@link #release(String)} for every child of the root path. All children are
     * attempted even if some fail; failures are collected and reported together.
     *
     * @throws Exception wrapping the first failure (with later ones suppressed) if any lock
     *     could not be released
     */
    @Override
    public void releaseAll() throws Exception {
        Exception collectedFailure = null;
        for (String child : getAllHandles()) {
            try {
                release(child);
            } catch (Exception e) {
                collectedFailure = ExceptionUtils.firstOrSuppressed(e, collectedFailure);
            }
        }
        if (collectedFailure == null) {
            return;
        }
        throw new Exception("Could not properly release all state nodes.", collectedFailure);
    }

    /**
     * Deletes the node named after the Curator client's namespace together with all of its
     * children, using the underlying ZooKeeper client directly.
     *
     * @throws Exception if the ZooKeeper client cannot be obtained or the deletion fails
     */
    @Override
    public void clearEntries() throws Exception {
        final String namespacePath = "/" + client.getNamespace();
        LOG.info("Removing {} from ZooKeeper", namespacePath);

        final ZooKeeper zooKeeper = client.getZookeeperClient().getZooKeeper();
        // The final argument is true so that the namespace node itself is removed as well.
        ZKPaths.deleteChildren(zooKeeper, namespacePath, true);
    }

    /**
     * Returns the simple class name followed by the Curator client's namespace.
     *
     * @return a string of the form {@code SimpleName{namespace='...'}}
     */
    @Override
    public String toString() {
        return this.getClass().getSimpleName() + "{namespace='" + this.client.getNamespace() + "'}";
    }

    /**
     * Checks whether this instance's lock node exists for the given node.
     *
     * @param rootPath path of the node; a leading {@code /} is added when missing
     * @return {@code true} if the lock node exists; {@code false} if it does not or the lookup
     *     reports a missing node
     * @throws Exception if the ZooKeeper lookup fails for another reason
     */
    private boolean hasLock(String rootPath) throws Exception {
        final String instanceLock = getInstanceLockPath(normalizePath(rootPath));
        try {
            final Stat lockStat = client.checkExists().forPath(instanceLock);
            return lockStat != null;
        } catch (KeeperException.NoNodeException e) {
            return false;
        }
    }

    /**
     * Builds the path of this instance's lock node for the given node:
     * {@code <rootPath>/locks/<lockNode>}.
     *
     * @param rootPath path of the node being locked
     * @return path of this instance's lock node
     */
    @VisibleForTesting
    String getInstanceLockPath(String rootPath) {
        return String.join("/", getRootLockPath(rootPath), lockNode);
    }

    /**
     * Builds the path of the lock root of a node by appending {@code /locks}.
     *
     * @param rootPath path of the node
     * @return {@code rootPath} followed by {@code /locks}
     */
    @VisibleForTesting
    static String getRootLockPath(String rootPath) {
        final String lockRootSuffix = "/locks";
        return rootPath + lockRootSuffix;
    }

    /**
     * Reads and deserializes the state handle stored at the given path, optionally creating
     * this instance's ephemeral lock node first.
     *
     * <p>If the lock was requested and the read or deserialization fails, the lock is released
     * again before the exception propagates.
     *
     * @param pathInZooKeeper path of the node; a leading {@code /} is added when missing
     * @param lock whether to create this instance's lock node before reading
     * @return the deserialized state handle
     * @throws StateHandleStore.NotExistException if the node (or the lock's parent) is missing
     * @throws IOException if the node data cannot be deserialized
     * @throws Exception for any other ZooKeeper failure
     */
    private RetrievableStateHandle<T> get(String pathInZooKeeper, boolean lock) throws Exception {
        Preconditions.checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
        final String path = normalizePath(pathInZooKeeper);

        if (lock) {
            try {
                client.create().withMode(CreateMode.EPHEMERAL).forPath(getInstanceLockPath(path));
            } catch (KeeperException.NodeExistsException ignored) {
                // The lock node is already there, so there is nothing left to create.
            } catch (KeeperException.NoNodeException ex) {
                throw new StateHandleStore.NotExistException("ZooKeeper node " + path + " does not exist.", ex);
            }
        }

        boolean retrieved = false;
        try {
            final byte[] rawHandle = client.getData().forPath(path);
            final RetrievableStateHandle<T> handle = StateHandleStoreUtils.deserialize(rawHandle);
            retrieved = true;
            return handle;
        } catch (KeeperException.NoNodeException ex) {
            throw new StateHandleStore.NotExistException("ZooKeeper node " + path + " does not exist.", ex);
        } catch (IOException | ClassNotFoundException e) {
            throw new IOException("Failed to deserialize state handle from ZooKeeper data from " + path + '.', e);
        } finally {
            // Do not keep a lock on a node whose handle could not be obtained.
            if (lock && !retrieved) {
                release(path);
            }
        }
    }

    /**
     * Ensures the path starts with a slash.
     *
     * @param path path with or without a leading {@code /}
     * @return {@code path} unchanged if it already starts with {@code /}, otherwise {@code path}
     *     prefixed with {@code /}
     */
    private static String normalizePath(String path) {
        return path.startsWith("/") ? path : "/" + path;
    }
}

