package com.tterrag.chatmux.websocket;

import com.tterrag.chatmux.api.websocket.IFrameParser;
import com.tterrag.chatmux.api.websocket.WebSocketClient;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.UnicastProcessor;
import reactor.netty.http.client.HttpClient;
import reactor.util.annotation.NonNull;

/* loaded from: input_file:com/tterrag/chatmux/websocket/SimpleWebSocketClient.class */
public class SimpleWebSocketClient<I, O> implements WebSocketClient<I, O> {
    private static final Logger log = LoggerFactory.getLogger(SimpleWebSocketClient.class);

    @NonNull
    private final EmitterProcessor<I> receiver = EmitterProcessor.create(false);

    @NonNull
    private final EmitterProcessor<O> sender = EmitterProcessor.create(false);

    @NonNull
    private final FluxSink<I> receiverSink = this.receiver.sink(FluxSink.OverflowStrategy.LATEST);

    @NonNull
    private final FluxSink<O> senderSink = this.sender.sink(FluxSink.OverflowStrategy.LATEST);

    public Mono<Void> connect(String str, IFrameParser<I, O> iFrameParser) {
        return Mono.defer(() -> {
            Flux doOnComplete = iFrameParser.inbound().doOnError(th -> {
                log.debug("Inbound encountered an error", th);
            }).doOnCancel(() -> {
                log.debug("Inbound cancelled");
            }).doOnComplete(() -> {
                log.debug("Inbound completed");
            });
            FluxSink<I> fluxSink = this.receiverSink;
            fluxSink.getClass();
            Publisher doOnNext = doOnComplete.doOnNext(fluxSink::next);
            Publisher doOnError = this.receiver.log(log.getName()).doOnError(th2 -> {
                log.error("Exception receiving websocket data", th2);
            });
            Flux log2 = this.sender.log(log.getName());
            UnicastProcessor outbound = iFrameParser.outbound();
            outbound.getClass();
            Flux doOnError2 = log2.doOnNext(outbound::onNext).doOnError(th3 -> {
                iFrameParser.close();
            });
            iFrameParser.getClass();
            Publisher doOnComplete2 = doOnError2.doOnComplete(iFrameParser::close);
            HttpClient.WebsocketSender uri = HttpClient.create().observe((connection, state) -> {
                log.debug("{} {}", state, connection);
            }).wiretap(true).websocket().uri(str);
            iFrameParser.getClass();
            return Mono.when(new Publisher[]{doOnNext, doOnError, doOnComplete2, uri.handle(iFrameParser::handle).doOnError(th4 -> {
                log.error("Exception handling websocket data", th4);
            }).doOnTerminate(() -> {
                log.debug("Terminating websocket client, disposing subscriptions");
            }).then()});
        });
    }

    public Flux<I> inbound() {
        return this.receiver;
    }

    public FluxSink<O> outbound() {
        return this.senderSink;
    }
}
