/*
 * Decompiled with CFR 0.152.
 */
package com.vmware.vapi.internal.bindings;

import com.vmware.vapi.bindings.client.InvocationConfig;
import com.vmware.vapi.bindings.type.StructType;
import com.vmware.vapi.bindings.type.Type;
import com.vmware.vapi.core.AsyncHandle;
import com.vmware.vapi.core.Consumer;
import com.vmware.vapi.core.ExecutionContext;
import com.vmware.vapi.core.MethodIdentifier;
import com.vmware.vapi.core.MethodResult;
import com.vmware.vapi.data.DataValue;
import com.vmware.vapi.data.StructValue;
import com.vmware.vapi.diagnostics.LogDiagnosticUtil;
import com.vmware.vapi.diagnostics.Slf4jMDCLogConfigurator;
import com.vmware.vapi.internal.bindings.ResultTranslatingHandle;
import com.vmware.vapi.internal.bindings.RetryingHandle;
import com.vmware.vapi.internal.bindings.Stub;
import com.vmware.vapi.internal.core.abort.AbortHandle;
import com.vmware.vapi.internal.core.abort.AbortHandleImpl;
import com.vmware.vapi.internal.core.abort.AbortableAsyncHandle;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamPublisher<T>
implements Publisher<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamPublisher.class);
    private final AbortHandle abortHandle = new AbortHandleImpl();
    private final AtomicBoolean subscribedTo = new AtomicBoolean();
    private volatile Subscriber<? super T> subscriber;
    private volatile Consumer<AsyncHandle<MethodResult>> nextChunkConsumer;
    private final Stub stub;
    private final String serviceId;
    private final String operationId;
    private final ExecutionContext execCtx;
    private final Type outputType;
    private final Collection<Type> errorTypes;
    private StructValue inputValue;
    final AtomicBoolean finished = new AtomicBoolean();
    final AtomicLong demandCount = new AtomicLong();

    public StreamPublisher(Stub stub, MethodIdentifier methodId, StructValue inputValue, StructType inputType, Type outputType, Collection<Type> errorTypes, InvocationConfig invocationConfig) {
        this.stub = stub;
        this.serviceId = methodId.getInterfaceIdentifier().getName();
        this.operationId = methodId.getName();
        this.inputValue = inputValue;
        this.execCtx = stub.getExecutionContext(invocationConfig);
        this.outputType = outputType;
        this.errorTypes = errorTypes;
    }

    public void subscribe(Subscriber<? super T> s) {
        Objects.requireNonNull(s);
        if (!this.subscribedTo.compareAndSet(false, true)) {
            this.rejectSubscription(s);
            return;
        }
        this.subscriber = s;
        s.onSubscribe((Subscription)new AsyncSubscription());
    }

    private void rejectSubscription(Subscriber<? super T> s) {
        s.onSubscribe(new Subscription(){

            public void request(long n) {
            }

            public void cancel() {
            }
        });
        s.onError((Throwable)new IllegalStateException("This instance has already been subscribed to"));
    }

    public ResultTranslatingHandle<T> createHandle(final Subscriber s) {
        ResultTranslatingHandle ah = new ResultTranslatingHandle<T>(this.stub, this.outputType, this.errorTypes){

            @Override
            public void updateProgress(DataValue progress) {
            }

            @Override
            protected void postProcessResponse(MethodResult result) {
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            void onSuccess(T result, Consumer<AsyncHandle<MethodResult>> next) {
                long demand;
                if (StreamPublisher.this.finished.get()) {
                    LOGGER.debug("Finished safeguard triggered.");
                    return;
                }
                StreamPublisher.this.nextChunkConsumer = next;
                try {
                    if (result != null) {
                        LOGGER.trace("Executing subscriber#onNext with result - {}", result);
                        s.onNext(result);
                    }
                }
                finally {
                    demand = StreamPublisher.this.demandCount.decrementAndGet();
                }
                if (next == null) {
                    StreamPublisher.this.finished.set(true);
                    LOGGER.trace("Streaming complete - next handle not received.");
                    s.onComplete();
                    return;
                }
                LOGGER.trace("Demand is {} .", (Object)demand);
                if (demand > 0L) {
                    LOGGER.trace("Resume requesting.");
                    next.accept(StreamPublisher.this.createHandle(s));
                }
            }

            @Override
            void onFailure(RuntimeException error) {
                StreamPublisher.this.abort(error);
            }
        };
        return ah;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void invoke(final ResultTranslatingHandle<T> handle, ExecutionContext executionContext, final int invocationAttempt) {
        RetryingHandle ah = handle;
        if (this.stub.isRetryingConfigured()) {
            LOGGER.trace("Creating retrying handle.");
            ah = new RetryingHandle<T>(this.stub, this.outputType, this.errorTypes, handle, this.serviceId, this.operationId, executionContext, this.inputValue, invocationAttempt){

                @Override
                void onFailure(RuntimeException error) {
                    LOGGER.debug("Error during streaming occurred.", (Throwable)error);
                    if (StreamPublisher.this.finished.get()) {
                        LOGGER.debug("Streaming has finished prior receiving the error.", (Throwable)error);
                        return;
                    }
                    super.onFailure(error);
                }

                @Override
                void onRetry(ExecutionContext retryContext) {
                    LOGGER.debug("Retrying invocation of the streaming request {} {}.", (Object)StreamPublisher.this.serviceId, (Object)StreamPublisher.this.operationId);
                    StreamPublisher.this.invoke(handle, retryContext, invocationAttempt + 1);
                }
            };
        }
        ah = new AbortableAsyncHandle<MethodResult>(ah, this.abortHandle);
        Slf4jMDCLogConfigurator logConfig = new Slf4jMDCLogConfigurator();
        try {
            logConfig.configureContext(LogDiagnosticUtil.getDiagnosticContext(executionContext));
            LOGGER.trace("Starting streaming invocation request {} {}.", (Object)this.serviceId, (Object)this.operationId);
            this.stub.apiProvider.invoke(this.serviceId, this.operationId, this.inputValue, executionContext, ah);
            this.inputValue = null;
        }
        finally {
            logConfig.cleanUpContext(LogDiagnosticUtil.getDiagnosticKeys());
        }
    }

    private void abort(Throwable t) {
        LOGGER.debug("Stream processing failed.", t);
        if (!this.finished.compareAndSet(false, true)) {
            return;
        }
        this.abortHandle.abort();
        if (t != null) {
            try {
                this.subscriber.onError(t);
            }
            catch (RuntimeException e) {
                String message = String.format("Exception while invoking %s.onError for %s.%s", this.subscriber, this.serviceId, this.operationId);
                LOGGER.warn(message, (Throwable)e);
            }
        }
        this.subscriber = null;
    }

    private class AsyncSubscription
    implements Subscription {
        private AtomicBoolean isStreamingInitiated = new AtomicBoolean();

        private AsyncSubscription() {
        }

        public void request(long n) {
            LOGGER.debug("Requested demand - {} .", (Object)n);
            if (StreamPublisher.this.finished.get()) {
                LOGGER.debug("Streaming has finished, no more requests are processed.");
                return;
            }
            if (n <= 0L) {
                LOGGER.debug("Invalid request count received - {}.", (Object)n);
                StreamPublisher.this.abort(new IllegalArgumentException("non-positive subscription request signals are illegal"));
                return;
            }
            long newDemand = this.guardedAddAndGet(n);
            if (newDemand == n) {
                this.initiateStreaming();
            }
        }

        private void initiateStreaming() {
            LOGGER.debug("Stream initiation requested.");
            if (this.isStreamingInitiated.compareAndSet(false, true)) {
                LOGGER.trace("Publisher is initiating communication.");
                StreamPublisher.this.invoke(StreamPublisher.this.createHandle(StreamPublisher.this.subscriber), StreamPublisher.this.execCtx, 0);
            } else {
                Consumer next = StreamPublisher.this.nextChunkConsumer;
                if (next != null) {
                    LOGGER.trace(String.format("Publisher continues suspended communication via consumer - %h.", next));
                    next.accept(StreamPublisher.this.createHandle(StreamPublisher.this.subscriber));
                }
            }
        }

        private long guardedAddAndGet(long n) {
            long next;
            long prev;
            do {
                if ((next = (prev = StreamPublisher.this.demandCount.get()) + n) >= 0L) continue;
                next = Long.MAX_VALUE;
            } while (!StreamPublisher.this.demandCount.compareAndSet(prev, next));
            return next;
        }

        public void cancel() {
            StreamPublisher.this.abort(null);
        }
    }
}

