package com.tterrag.chatmux.websocket;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.ssl.SslCloseCompletionEvent;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.UnicastProcessor;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.NettyOutbound;
import reactor.netty.http.websocket.WebsocketInbound;
import reactor.netty.http.websocket.WebsocketOutbound;
import reactor.util.annotation.NonNull;

/* loaded from: input_file:com/tterrag/chatmux/websocket/FrameParser.class */
/**
 * Bridges a Reactor Netty websocket connection to a pair of typed exchanges.
 *
 * <p>Every frame received from the socket is decoded to a string, converted to {@code I} by the
 * deserializer and published on {@link #inbound()}. Every {@code O} pushed into
 * {@link #outbound()} is converted to a string by the serializer and sent as a text frame.
 *
 * @param <I> type of the messages read from the socket
 * @param <O> type of the messages written to the socket
 */
public class FrameParser<I, O> implements ConnectionObserver {
    private static final Logger log = LoggerFactory.getLogger(FrameParser.class);

    /** WebSocket close status code used here for abnormal terminations (failed TLS close, unexpected errors). */
    private static final int ABNORMAL_CLOSURE = 1006;

    private final Function<String, I> deserializer;
    private final Function<O, String> serializer;

    @NonNull
    private final UnicastProcessor<I> inboundExchange;

    @NonNull
    private final UnicastProcessor<O> outboundExchange;
    private final MonoProcessor<Void> completionNotifier;

    /**
     * Netty handler that records why the connection was closed into a shared reference, so the
     * receiving pipeline can inspect it once the inbound stream completes. Events are always
     * forwarded to the next handler in the pipeline.
     */
    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/tterrag/chatmux/websocket/FrameParser$CloseHandlerAdapter.class */
    public static class CloseHandlerAdapter extends ChannelInboundHandlerAdapter {
        private final AtomicReference<CloseStatus> closeStatus;

        private CloseHandlerAdapter(AtomicReference<CloseStatus> atomicReference) {
            this.closeStatus = atomicReference;
        }

        /**
         * Captures the status code and reason of a final close frame, then passes the message on.
         *
         * @param channelHandlerContext the handler context used to forward the message
         * @param obj the message read from the channel
         */
        @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
        public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
            if (obj instanceof CloseWebSocketFrame) {
                CloseWebSocketFrame closeFrame = (CloseWebSocketFrame) obj;
                if (closeFrame.isFinalFragment()) {
                    log.debug("Close status: {} {}", closeFrame.statusCode(), closeFrame.reasonText());
                    this.closeStatus.set(new CloseStatus(closeFrame.statusCode(), closeFrame.reasonText()));
                }
            }
            channelHandlerContext.fireChannelRead(obj);
        }

        /**
         * Records an abnormal close status when the TLS close handshake did not succeed, then
         * passes the event on.
         *
         * @param channelHandlerContext the handler context used to forward the event
         * @param obj the user event that was triggered
         */
        @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
        public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) {
            if (obj instanceof SslCloseCompletionEvent) {
                SslCloseCompletionEvent sslClose = (SslCloseCompletionEvent) obj;
                if (!sslClose.isSuccess()) {
                    String cause = sslClose.cause().toString();
                    log.debug("Abnormal close status: {}", cause);
                    this.closeStatus.set(new CloseStatus(ABNORMAL_CLOSURE, cause));
                }
            }
            channelHandlerContext.fireUserEventTriggered(obj);
        }
    }

    /**
     * Creates a parser that uses the given Jackson mapper for both directions.
     *
     * @param objectMapper mapper used to read inbound payloads and write outbound payloads
     * @param cls concrete class inbound payloads are read into
     */
    public FrameParser(ObjectMapper objectMapper, Class<? extends I> cls) {
        this(str -> {
            try {
                return objectMapper.readValue(str, cls);
            } catch (IOException e) {
                throw new RuntimeException("Failed to deserialize inbound frame", e);
            }
        }, objectMapper);
    }

    /**
     * Creates a parser with a custom deserializer and Jackson-based serialization.
     *
     * @param function converts the text of an inbound frame to {@code I}
     * @param objectMapper mapper used to write outbound payloads as JSON strings
     */
    public FrameParser(Function<String, I> function, ObjectMapper objectMapper) {
        this(function, obj -> {
            try {
                return objectMapper.writeValueAsString(obj);
            } catch (IOException e) {
                throw new RuntimeException("Failed to serialize outbound payload", e);
            }
        });
    }

    /**
     * Creates a parser with custom conversion functions for both directions.
     *
     * @param function converts the text of an inbound frame to {@code I}
     * @param function2 converts an outbound {@code O} to the text of a frame
     */
    public FrameParser(Function<String, I> function, Function<O, String> function2) {
        this.inboundExchange = UnicastProcessor.create();
        this.outboundExchange = UnicastProcessor.create();
        this.completionNotifier = MonoProcessor.create();
        this.deserializer = function;
        this.serializer = function2;
    }

    /** Logs every connection state change at debug level. */
    @Override // reactor.netty.ConnectionObserver
    public void onStateChange(@NonNull Connection connection, @NonNull ConnectionObserver.State state) {
        log.debug("{} {}", state, connection);
    }

    /**
     * Wires the websocket to the exchanges: subscribes a sender that writes everything from the
     * outbound exchange as text frames, and returns the receiving pipeline that feeds the
     * inbound exchange.
     *
     * @param websocketInbound source of received frames
     * @param websocketOutbound sink for frames to send
     * @return a {@code Mono} that terminates when the receiving pipeline terminates
     */
    public Mono<Void> handle(WebsocketInbound websocketInbound, WebsocketOutbound websocketOutbound) {
        AtomicReference<CloseStatus> closeStatusRef = new AtomicReference<>();
        websocketInbound.withConnection(connection ->
                connection.addHandlerLast("client.last.closeHandler", new CloseHandlerAdapter(closeStatusRef)));

        // The sender is subscribed eagerly and runs independently of the returned receiver.
        websocketOutbound.options(sendOptions -> sendOptions.flushOnEach())
                .sendObject(this.outboundExchange.log()
                        .map(this.serializer::apply)
                        .map(TextWebSocketFrame::new))
                .then()
                .log()
                .doOnError(th -> log.debug("Sender encountered an error", th))
                .doOnSuccess(ignored -> log.debug("Sender succeeded"))
                .doOnCancel(() -> log.debug("Sender cancelled"))
                .doOnTerminate(() -> log.debug("Sender terminated"))
                .subscribe();

        return websocketInbound.receiveFrames()
                .map(frame -> frame.content())
                .map(content -> {
                    byte[] bytes = new byte[content.readableBytes()];
                    content.readBytes(bytes);
                    // Decode explicitly as UTF-8 (the encoding WebSocket text frames use)
                    // rather than the platform default charset.
                    return new String(bytes, StandardCharsets.UTF_8);
                })
                .map(this.deserializer::apply)
                .doOnNext(this.inboundExchange::onNext)
                .doOnError(th -> log.error("Receiver encountered an error", th))
                .doOnComplete(() -> {
                    log.debug("Receiver completed");
                    CloseStatus status = closeStatusRef.get();
                    if (status != null) {
                        log.debug("Forwarding close reason: {}", status);
                        log.debug("Triggering error sequence ({})", new CloseException(status).toString());
                        // No element is emitted before completing: Reactive Streams forbids null
                        // items, and onNext(null) would throw before the exchange could complete.
                        log.debug("Preparing to complete outbound exchange after error");
                        this.outboundExchange.onComplete();
                    }
                })
                .then()
                .log();
    }

    /**
     * Terminates the parser because of a failure. The completion notifier, unless already
     * terminated, is failed with a {@code CloseException} (the given one, or a new one wrapping
     * {@code th} with the abnormal-closure code); then both exchanges are completed.
     *
     * @param th the failure that triggered the shutdown
     */
    public void error(Throwable th) {
        log.debug("Triggering error sequence ({})", th.toString());
        if (!this.completionNotifier.isTerminated()) {
            if (th instanceof CloseException) {
                log.debug("Signaling completion notifier as error with same CloseException");
                this.completionNotifier.onError(th);
            } else {
                log.debug("Signaling completion notifier as error with wrapping CloseException");
                this.completionNotifier.onError(
                        new CloseException(new CloseStatus(ABNORMAL_CLOSURE, th.toString()), th));
            }
        }
        // Complete directly; emitting a null element first would throw NullPointerException
        // and leave both exchanges open.
        log.debug("Preparing to complete outbound exchange after error");
        this.outboundExchange.onComplete();
        log.debug("Preparing to complete inbound exchange after error");
        this.inboundExchange.onComplete();
    }

    /** Terminates the parser normally: completes the completion notifier and both exchanges. */
    public void close() {
        log.debug("Triggering close sequence - signaling completion notifier");
        this.completionNotifier.onComplete();
        log.debug("Preparing to complete outbound exchange after close");
        this.outboundExchange.onComplete();
        log.debug("Preparing to complete inbound exchange after close");
        this.inboundExchange.onComplete();
    }

    /**
     * Returns the exchange on which deserialized inbound messages are published.
     *
     * @return the inbound exchange
     */
    public UnicastProcessor<I> inbound() {
        return this.inboundExchange;
    }

    /**
     * Returns the exchange into which messages to be sent are pushed.
     *
     * @return the outbound exchange
     */
    public UnicastProcessor<O> outbound() {
        return this.outboundExchange;
    }
}
