package com.tterrag.chatmux.websocket;

import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.UnicastProcessor;
import reactor.netty.http.client.HttpClient;
import reactor.util.annotation.NonNull;

/* loaded from: input_file:com/tterrag/chatmux/websocket/SimpleWebSocketClient.class */
/**
 * {@link WebSocketClient} implementation backed by Reactor Netty's {@link HttpClient}.
 *
 * <p>The client owns two processors that outlive any single connection: values emitted by a
 * {@link FrameParser}'s inbound stream are pushed into {@link #inbound()}, and values written to
 * the {@link #outbound()} sink are forwarded to the {@link FrameParser}'s outbound processor.
 * The bridging subscriptions are created per {@link #connect(String, FrameParser)} subscription
 * and disposed when that connection ends.
 *
 * @param <I> type of the messages received from the remote endpoint
 * @param <O> type of the messages sent to the remote endpoint
 */
public class SimpleWebSocketClient<I, O> implements WebSocketClient<I, O> {
    private static final Logger log = LoggerFactory.getLogger(SimpleWebSocketClient.class);

    /** Receives parsed inbound messages; created with {@code autoCancel = false}. */
    @NonNull
    private final EmitterProcessor<I> receiver = EmitterProcessor.create(false);

    /** Buffers messages queued for sending; created with {@code autoCancel = false}. */
    @NonNull
    private final EmitterProcessor<O> sender = EmitterProcessor.create(false);

    /** Sink feeding {@link #receiver}, using the {@code LATEST} overflow strategy. */
    @NonNull
    private final FluxSink<I> receiverSink = this.receiver.sink(FluxSink.OverflowStrategy.LATEST);

    /** Sink feeding {@link #sender}, using the {@code LATEST} overflow strategy. */
    @NonNull
    private final FluxSink<O> senderSink = this.sender.sink(FluxSink.OverflowStrategy.LATEST);

    /**
     * Opens a websocket connection to the given URI and bridges it to this client's processors.
     *
     * <p>Nothing happens until the returned {@link Mono} is subscribed. On subscription the
     * parser's inbound stream is piped into {@link #inbound()}, the {@link #outbound()} sink is
     * piped into the parser's outbound processor, and the websocket is handed to
     * {@code frameParser.handle}.
     *
     * @param str         URI of the websocket endpoint to connect to
     * @param frameParser parser that translates between websocket frames and {@code I}/{@code O}
     * @return a {@link Mono} that terminates when the websocket handler terminates
     */
    @Override // com.tterrag.chatmux.websocket.WebSocketClient
    public Mono<Void> connect(@NonNull String str, FrameParser<I, O> frameParser) {
        return Mono.defer(() -> {
            // Parser -> receiver: every parsed inbound message is pushed to the receiver sink.
            Disposable inboundBridge = frameParser.inbound()
                    .doOnError(t -> log.debug("Inbound encountered an error", t))
                    .doOnCancel(() -> log.debug("Inbound cancelled"))
                    .doOnComplete(() -> log.debug("Inbound completed"))
                    .subscribe(this.receiverSink::next);

            // Subscribe to the receiver for the lifetime of the connection so its signals are logged.
            Disposable receiverLogging = this.receiver.log()
                    .doOnError(t -> log.error("Receiver encountered an error", t))
                    .subscribe();

            // Sender -> parser: queued outbound messages are handed to the parser's outbound
            // processor; the parser is closed when the sender errors or completes.
            UnicastProcessor<O> parserOutbound = frameParser.outbound();
            Disposable outboundBridge = this.sender.log().subscribe(
                    parserOutbound::onNext,
                    t -> {
                        log.error("Sender encountered an error, closing frame parser", t);
                        frameParser.close();
                    },
                    frameParser::close);

            return HttpClient.create()
                    .observe((connection, state) -> log.debug("{} {}", state, connection))
                    .wiretap(true)
                    .websocket()
                    .uri(str)
                    .handle(frameParser::handle)
                    .doOnError(t -> log.error("Websocket handler encountered an error", t))
                    // doFinally (rather than doOnTerminate) so the bridges are also released
                    // when the connection is cancelled, not only on complete/error.
                    .doFinally(signal -> {
                        log.debug("Terminating websocket client, disposing subscriptions");
                        inboundBridge.dispose();
                        receiverLogging.dispose();
                        outboundBridge.dispose();
                    })
                    .then();
        });
    }

    /**
     * Returns the stream of messages received from the remote endpoint.
     *
     * @return the receiver processor as a {@link Flux}
     */
    @Override // com.tterrag.chatmux.websocket.WebSocketClient
    public Flux<I> inbound() {
        return this.receiver;
    }

    /**
     * Returns the sink used to queue messages for sending.
     *
     * @return the sink feeding the sender processor
     */
    @Override // com.tterrag.chatmux.websocket.WebSocketClient
    public FluxSink<O> outbound() {
        return this.senderSink;
    }
}
