package discord4j.gateway;

import discord4j.common.close.CloseException;
import discord4j.common.close.CloseHandlerAdapter;
import discord4j.common.close.CloseStatus;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.SignalType;
import reactor.netty.http.websocket.WebsocketInbound;
import reactor.netty.http.websocket.WebsocketOutbound;
import reactor.util.Logger;
import reactor.util.Loggers;

/* loaded from: input_file:discord4j/gateway/DiscordWebSocketHandler.class */
public class DiscordWebSocketHandler {
    private final FluxSink<ByteBuf> inbound;
    private final Flux<ByteBuf> outbound;
    private final MonoProcessor<CloseStatus> closeTrigger;
    private final MonoProcessor<Void> completionNotifier = MonoProcessor.create();
    private final ZlibDecompressor decompressor = new ZlibDecompressor();
    private final Logger log = shardLogger("");
    private final int shardIndex;
    private static final String HANDLER = "client.last.closeHandler";

    public DiscordWebSocketHandler(FluxSink<ByteBuf> fluxSink, Flux<ByteBuf> flux, MonoProcessor<CloseStatus> monoProcessor, int i) {
        this.inbound = fluxSink;
        this.outbound = flux;
        this.closeTrigger = monoProcessor;
        this.shardIndex = i;
    }

    public Mono<Void> handle(WebsocketInbound websocketInbound, WebsocketOutbound websocketOutbound) {
        AtomicReference atomicReference = new AtomicReference();
        websocketInbound.withConnection(connection -> {
            connection.addHandlerLast(HANDLER, new CloseHandlerAdapter(atomicReference, this.log));
        });
        Mono<Void> log = this.completionNotifier.log(shardLogger(".zip"), Level.FINEST, false, new SignalType[0]);
        Mono<Void> log2 = websocketOutbound.options((v0) -> {
            v0.flushOnEach();
        }).sendObject((Publisher<?>) Flux.merge(this.closeTrigger.map(closeStatus -> {
            return new CloseWebSocketFrame(closeStatus.getCode(), closeStatus.getReason());
        }), this.outbound.map(TextWebSocketFrame::new))).then().doOnTerminate(() -> {
            shardLogger(".outbound").info("Sender completed on shard {}", Integer.valueOf(this.shardIndex));
            if (this.closeTrigger.isTerminated()) {
                return;
            }
            CloseStatus closeStatus2 = (CloseStatus) atomicReference.get();
            if (closeStatus2 == null) {
                error(new RuntimeException("Sender completed"));
            } else {
                shardLogger(".outbound").info("Forwarding close reason: {}", closeStatus2);
                error(new CloseException(closeStatus2));
            }
        }).log(shardLogger(".zip.out"), Level.FINEST, false, new SignalType[0]);
        Flux<V> map = websocketInbound.aggregateFrames().receiveFrames().map((v0) -> {
            return v0.content();
        });
        ZlibDecompressor zlibDecompressor = this.decompressor;
        zlibDecompressor.getClass();
        Flux compose = map.compose(zlibDecompressor::completeMessages);
        FluxSink<ByteBuf> fluxSink = this.inbound;
        fluxSink.getClass();
        return Mono.zip(log, log2, compose.doOnNext((v1) -> {
            r1.next(v1);
        }).doOnError(this::error).then(Mono.defer(() -> {
            shardLogger(".inbound").info("Receiver completed on shard {}", Integer.valueOf(this.shardIndex));
            CloseStatus closeStatus2 = (CloseStatus) atomicReference.get();
            if (closeStatus2 == null || this.closeTrigger.isTerminated()) {
                return Mono.empty();
            }
            shardLogger(".inbound").info("Forwarding close reason: {}", closeStatus2);
            return Mono.error(new CloseException(closeStatus2));
        })).log(shardLogger(".zip.in"), Level.FINEST, false, new SignalType[0])).doOnError(th -> {
            this.log.debug("WebSocket session threw an error: {}", th.toString());
        }).then();
    }

    public void close() {
        this.log.info("Triggering close sequence");
        this.closeTrigger.onNext(CloseStatus.NORMAL_CLOSE);
        this.completionNotifier.onComplete();
    }

    public void error(Throwable th) {
        this.log.warn("Triggering error sequence ({})", th.toString());
        if (this.completionNotifier.isTerminated()) {
            return;
        }
        if (th instanceof CloseException) {
            this.completionNotifier.onError(th);
        } else {
            this.completionNotifier.onError(new CloseException(new CloseStatus(1006, th.toString()), th));
        }
    }

    private Logger shardLogger(String str) {
        return Loggers.getLogger("discord4j.gateway" + str + "." + this.shardIndex);
    }
}
