package com.tterrag.chatmux.twitch;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.tterrag.chatmux.api.bridge.ChatMessage;
import com.tterrag.chatmux.api.bridge.ChatSource;
import com.tterrag.chatmux.twitch.irc.IRCEvent;
import com.tterrag.chatmux.websocket.FrameParser;
import com.tterrag.chatmux.websocket.SimpleWebSocketClient;
import com.tterrag.chatmux.websocket.WebSocketClient;
import java.util.Locale;
import java.util.Set;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.NonNull;

/* loaded from: input_file:com/tterrag/chatmux/twitch/TwitchSource.class */
public class TwitchSource implements ChatSource<TwitchMessage> {
    private static final Logger log = LoggerFactory.getLogger(TwitchSource.class);
    private final TwitchRequestHelper helper;
    private boolean connected;

    @NonNull
    private final WebSocketClient<IRCEvent, String> send = new SimpleWebSocketClient();

    @NonNull
    private final WebSocketClient<IRCEvent, String> receive = new SimpleWebSocketClient();
    private final Set<String> sentMessages = Sets.newConcurrentHashSet();

    /* renamed from: getType, reason: merged with bridge method [inline-methods] */
    public TwitchService m3getType() {
        return TwitchService.getInstance();
    }

    public Flux<TwitchMessage> connect(String str) {
        if (!this.connected) {
            this.send.connect("wss://irc-ws.chat.twitch.tv:443", new FrameParser(IRCEvent::parse, Function.identity())).subscribe(r1 -> {
            }, th -> {
                log.error("Twitch websocket completed with error", th);
            }, () -> {
                log.error("Twitch websocket completed");
            });
            this.send.outbound().next("PASS oauth:" + TwitchService.getInstance().getData().getTokenSend()).next("NICK " + TwitchService.getInstance().getData().getNickSend()).next("CAP REQ :twitch.tv/tags").next("CAP REQ :twitch.tv/commands");
            this.receive.connect("wss://irc-ws.chat.twitch.tv:443", new FrameParser(IRCEvent::parse, Function.identity())).subscribe(r12 -> {
            }, th2 -> {
                log.error("Twitch websocket completed with error", th2);
            }, () -> {
                log.error("Twitch websocket completed");
            });
            this.receive.outbound().next("PASS oauth:" + TwitchService.getInstance().getData().getTokenReceive()).next("NICK " + TwitchService.getInstance().getData().getNickReceive()).next("CAP REQ :twitch.tv/tags").next("CAP REQ :twitch.tv/commands");
            this.connected = true;
        }
        String lowerCase = str.toLowerCase(Locale.ROOT);
        this.send.outbound().next("JOIN #" + lowerCase);
        this.receive.outbound().next("JOIN #" + lowerCase);
        return Flux.merge(new Publisher[]{this.send.inbound().ofType(IRCEvent.Ping.class).doOnNext(ping -> {
            this.send.outbound().next("PONG :tmi.twitch.tv");
        }), this.receive.inbound().ofType(IRCEvent.Ping.class).doOnNext(ping2 -> {
            this.receive.outbound().next("PONG :tmi.twitch.tv");
        }), this.receive.inbound().ofType(IRCEvent.Message.class).filter(message -> {
            return message.getChannel().equals(lowerCase);
        }).filter(message2 -> {
            return !this.sentMessages.remove(message2.getContent());
        }).flatMap(message3 -> {
            return this.helper.getUser(message3.getUser()).zipWith(this.helper.getUser(message3.getChannel()), (userResponse, userResponse2) -> {
                return new TwitchMessage(this.receive, message3, userResponse2.displayName, userResponse.displayName, userResponse.avatarUrl);
            });
        })}).ofType(TwitchMessage.class);
    }

    public Mono<TwitchMessage> send(String str, ChatMessage<?> chatMessage, boolean z) {
        String content = z ? chatMessage.getContent() : chatMessage.toString();
        String nickSend = TwitchService.getInstance().getData().getNickSend();
        return Mono.just(this.send.outbound()).doOnNext(fluxSink -> {
            this.sentMessages.add(content);
        }).doOnNext(fluxSink2 -> {
            fluxSink2.next("PRIVMSG #" + str.toLowerCase(Locale.ROOT) + " :" + content);
        }).thenReturn(new TwitchMessage(this.send, new IRCEvent.Message(ImmutableMap.of(), nickSend, str, content), str, nickSend, null));
    }

    public void disconnect(String str) {
        this.send.outbound().next("PART #" + str);
        this.receive.outbound().next("PART #" + str);
    }

    public TwitchSource(TwitchRequestHelper twitchRequestHelper) {
        this.helper = twitchRequestHelper;
    }
}
