package org.springframework.integration.util;

import java.time.Duration;
import org.reactivestreams.Publisher;
import org.springframework.integration.StaticMessageHeaderAccessor;
import org.springframework.integration.acks.AckUtils;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.SubscribableChannel;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:datasets/datasets-service.jar:BOOT-INF/lib/spring-integration-core-5.3.2.RELEASE.jar:org/springframework/integration/util/IntegrationReactiveUtils.class */
/**
 * Static helpers that adapt {@link MessageSource} and {@link MessageChannel}
 * instances to Reactor {@link Flux} streams of {@link Message}s.
 *
 * <p>This class is not instantiable.
 */
public final class IntegrationReactiveUtils {

    /**
     * Subscriber-context key under which a {@link Duration} can be supplied to
     * control how long {@link #messageSourceToFlux(MessageSource)} waits before
     * polling again after an empty result.
     */
    public static final String DELAY_WHEN_EMPTY_KEY = "DELAY_WHEN_EMPTY_KEY";

    /**
     * Delay (one second) used when the subscriber context has no value for
     * {@link #DELAY_WHEN_EMPTY_KEY}.
     */
    public static final Duration DEFAULT_DELAY_WHEN_EMPTY = Duration.ofSeconds(1);

    private IntegrationReactiveUtils() {
    }

    /**
     * Wraps a {@link MessageSource} into a {@link Flux} that polls the source on demand.
     *
     * <p>Each poll calls {@link MessageSource#receive()} on a
     * {@link Schedulers#boundedElastic()} worker. A received message is passed to
     * {@link AckUtils#autoAck} via its acknowledgment callback header; when the poll
     * fails with a {@link MessagingException} carrying a failed message, that message's
     * callback is passed to {@link AckUtils#autoNack}. An empty poll is repeated after
     * the delay found under {@link #DELAY_WHEN_EMPTY_KEY} in the subscriber context
     * (falling back to {@link #DEFAULT_DELAY_WHEN_EMPTY}). The sequence is repeated
     * after every successful poll and re-subscribed after every error.
     *
     * @param messageSource the source to poll
     * @param <T> the payload type of the produced messages
     * @return a flux of the messages received from the source
     */
    public static <T> Flux<Message<T>> messageSourceToFlux(MessageSource<T> messageSource) {
        return Mono
                // Explicit type witness: without it the chain is inferred as Mono<Object>.
                .<Message<T>>create(monoSink ->
                        monoSink.onRequest(requested -> monoSink.success(messageSource.receive())))
                .doOnSuccess(message -> {
                    // Reactor invokes doOnSuccess with null when the Mono completes without
                    // a value (receive() returned null). There is nothing to acknowledge in
                    // that case, so do not hand null to the header accessor.
                    if (message != null) {
                        AckUtils.autoAck(StaticMessageHeaderAccessor.getAcknowledgmentCallback(message));
                    }
                })
                .doOnError(MessagingException.class, messagingException -> {
                    Message<?> failedMessage = messagingException.getFailedMessage();
                    if (failedMessage != null) {
                        AckUtils.autoNack(StaticMessageHeaderAccessor.getAcknowledgmentCallback(failedMessage));
                    }
                })
                .subscribeOn(Schedulers.boundedElastic())
                // On an empty poll, wait for the configured delay before polling again.
                .repeatWhenEmpty(repeat ->
                        repeat.flatMap(iteration ->
                                Mono.subscriberContext()
                                        .flatMap(context ->
                                                Mono.delay(context.getOrDefault(
                                                        DELAY_WHEN_EMPTY_KEY, DEFAULT_DELAY_WHEN_EMPTY)))))
                .repeat()
                .retry();
    }

    /**
     * Adapts a {@link MessageChannel} to a {@link Flux} of its messages.
     *
     * <ul>
     * <li>a channel that is itself a {@link Publisher} is wrapped with {@link Flux#from};</li>
     * <li>a {@link SubscribableChannel} gets a handler subscribed for each flux subscriber;</li>
     * <li>a {@link PollableChannel} is polled with {@code receive(0)} through
     * {@link #messageSourceToFlux(MessageSource)}.</li>
     * </ul>
     *
     * <p>The payload type is not checked at runtime; the caller chooses {@code T}.
     *
     * @param messageChannel the channel to adapt
     * @param <T> the expected payload type
     * @return a flux of the messages flowing through the channel
     * @throws IllegalArgumentException if the channel is none of the supported types
     */
    @SuppressWarnings("unchecked") // payload type T is asserted by the caller, not verifiable here
    public static <T> Flux<Message<T>> messageChannelToFlux(MessageChannel messageChannel) {
        if (messageChannel instanceof Publisher) {
            return Flux.from((Publisher<Message<T>>) messageChannel);
        }
        if (messageChannel instanceof SubscribableChannel) {
            return adaptSubscribableChannelToPublisher((SubscribableChannel) messageChannel);
        }
        if (messageChannel instanceof PollableChannel) {
            return messageSourceToFlux(() -> (Message<T>) ((PollableChannel) messageChannel).receive(0L));
        }
        throw new IllegalArgumentException("The 'messageChannel' must be an instance of Publisher, SubscribableChannel or PollableChannel, not: " + messageChannel);
    }

    /**
     * Builds a deferred {@link Flux} over a {@link SubscribableChannel}.
     *
     * <p>For every subscriber a new {@link EmitterProcessor} (created with a buffer size
     * of 1) is set up and a {@link MessageHandler} forwarding each message into it is
     * subscribed to the channel. The handler is unsubscribed from the channel when the
     * flux is cancelled.
     *
     * @param subscribableChannel the channel to listen on
     * @param <T> the expected payload type
     * @return a flux emitting the messages sent to the channel
     */
    private static <T> Flux<Message<T>> adaptSubscribableChannelToPublisher(SubscribableChannel subscribableChannel) {
        return Flux.defer(() -> {
            EmitterProcessor<Message<T>> processor = EmitterProcessor.create(1);
            @SuppressWarnings("unchecked") // payload type T is asserted by the caller
            MessageHandler messageHandler = message -> processor.onNext((Message<T>) message);
            subscribableChannel.subscribe(messageHandler);
            return processor.doOnCancel(() -> subscribableChannel.unsubscribe(messageHandler));
        });
    }
}
