¡Acceso ilimitado 24/7 a todos nuestros libros y vídeos! Descubra la Biblioteca Online ENI. Pulse aquí
¡Acceso ilimitado 24/7 a todos nuestros libros y vídeos! Descubra la Biblioteca Online ENI. Pulse aquí
  1. Libros
  2. Java Spring
  3. Reactor Core
Extrait - Java Spring Construya aplicaciones reactivas con una arquitectura de microservicios en un entorno Jakarta EE
Extractos del libro
Java Spring Construya aplicaciones reactivas con una arquitectura de microservicios en un entorno Jakarta EE Volver a la página de compra del libro

Reactor Core

Introducción

El principal caso de utilización de Reactor Core es la creación de una aplicación web, intranet o extranet, con Spring WebFlux, el equivalente de Spring MVC en modo reactivo.

Sin embargo, en algunos casos puede ser útil utilizar directamente Reactor Core para:

  • programar un servidor como lo haríamos con Vert.x

  • programar varios servidores web en la misma aplicación

  • programar servidores que no sirvan páginas HTML a través de HTTP

  • disponer de servidores simulados para las pruebas de integración

En este capítulo veremos aspectos más teóricos, ya que la librería Spring Reactor Core rara vez se utiliza por sí sola, por varias razones:

  • Complejidad: Spring Reactor Core es una biblioteca potente pero compleja que implementa el paradigma de la programación reactiva. El uso de Reactor Core por sí solo puede suponer una curva de aprendizaje pronunciada, ya que requiere un conocimiento profundo de los conceptos de flujo, mono y el operador de composición funcional.

  • Falta de facilidad de integración: Spring Reactor Core proporciona principalmente una base para la programación reactiva y no proporciona directamente funciones específicas de la aplicación, como la gestión de solicitudes HTTP, la gestión de dependencias, la inyección de dependencias, etc. Para beneficiarse de una aplicación web reactiva...

Reactor Core

La programación reactiva es un concepto especial. Implica la propagación de eventos o señales, como bolas de pinball que interactúan entre sí. Utilizando un estilo de programación funcional, definimos posibles trayectorias para las bolas y luego las lanzamos en paralelo para que se muevan simultáneamente. Cada bola cambia su comportamiento cuando interactúa o choca con otros elementos, o cuando alcanza un punto específico en el tiempo. El motor gestiona las interacciones llamando al código adecuado para cada evento. Los pone en cola en un bucle de eventos y los ejecuta secuencialmente, dando la impresión de ejecución paralela desde el exterior.

Los bloques básicos de Reactive Streams son:

  • Subscriber: el suscriptor (subscriber) es el objeto que recibe los elementos enviados por el editor (publisher). Consume los elementos y los procesa según la lógica definida en sus métodos. Cuando un Publisher envía un nuevo elemento, éste es enviado al Subscriber a través del método onNext(). El Subscriber también puede gestionar errores mediante el método onError() e indicar el final del flujo de datos mediante el método onComplete().

  • Subscription: la suscripción (subscription) entre el Publisher y el Subscriber. Cuando un Subscriber se abona a un Publisher, el Publisher envia una Subscriptiona un Subscriber . Esta Subscription permite al Subscriber solicitar un determinado número de elementos al Publisher mediante el método request(long n). Este método especifica cuántos elementos el Subscriber desea recibir.

  • Processor: el procesador es una interfaz que combina las funciones de Publisher y de Subscriber. Puede considerarse como un elemento intermediario en el flujo de datos, en el que los elementos son enviados por un Publisher, procesados por el Processory luego consumidos por un Subscriber. Permite crear operaciones de transformación o de filtrado de datos entre el Publisher y el Subscriber.

Estos tres bloques -Publisher, Subscriber y Subscription- constituyen la base de Reactive Streams y permiten crear flujos de datos reactivos asíncronos y no bloqueantes. La interfaz Processor, por su parte, permite realizar operaciones más complejas y gestionar la transformación de datos entre las distintas...

Método Subscribe()

El método subscribe() es uno de los métodos fundamentales en la programación reactiva con Reactor. Se utiliza para desencadenar la ejecución del flujo reactivo suscribiéndose a este flujo. Cuando se crea un flujo, este permanecerá inactivo hasta que se realice una suscripción. La suscripción desencadena la emisión de los elementos del flujo y la ejecución de los tratamientos definidos en las diferentes etapas del flujo.

1. Sobrecarga

El método subscribe() tiene diferentes sobrecargas que permiten especificar cómo debe consumirse el flujo. Esto implica definir cómo procesar los elementos emitidos por el flujo, cómo manejar cualquier error y cómo reaccionar cuando el flujo finaliza.

  • subscribe() no toma ningún argumento y no define ningún procesamiento particular para los eventos emitidos por el flujo. Esto significa que el flujo simplemente se ejecutará, pero los eventos emitidos por el flujo no se procesarán.

Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5); 
flux.subscribe(); 
  • subscribe(Consumer<? super T> consumer) toma como argumento un consumidor (Consumer) que define el tratamiento a aplicar a cada elemento enviado por el flujo. El consumidor recibe los elementos del flujo uno a uno y puede realizar acciones específicas sobre cada elemento.

Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5); 
flux.subscribe(item -> System.out.println("Elemento: " + item)); 
  • subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer) permite especificar tanto un consumidor para procesar elementos del flujo como un consumidor para procesar cualquier error emitido por el flujo.

Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5); 
flux.subscribe( 
     item -> System.out.println("Elemento: " + item), 
     error -> System.err.println("Error: " + error.getMessage()) 
 ); 
  • subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer) permite especificar tanto un consumidor para procesar los elementos del flujo, un consumidor para procesar los errores y un consumidor para procesar la notificación de finalización del flujo.

Utilizando las diferentes formas de subscribe(), podemos...

Utilización de BaseSubscriber

En programación reactiva, generalmente utilizamos la programación funcional con los lambdas para definir nuestros tratamientos. Sin embargo, en ciertas situaciones, puede ser útil crear un Subscriber personalizado para gestionar los eventos de un flujo reactivo de una manera específica. Aquí es donde entra en juego la clase de utilidad BaseSubscriber.

BaseSubscriber es una clase de utilidad proporcionada por Reactor que simplifica la creación de suscriptores personalizados para flujos reactivos. Proporciona una implementación básica, así como métodos para manejar eventos clave, facilitando la personalización del comportamiento del suscriptor para adaptarlo a las necesidades específicas de la aplicación. Para crear un Subscriber personalizado, simplemente extendemos la clase BaseSubscriber y sustituimos los métodos apropiados según nuestras necesidades. Por ejemplo, implementando el método hookOnNext(), podemos especificar la acción a realizar cuando un nuevo elemento es enviado por el flujo. Del mismo modo, implementando el método hookOnError(), podemos manejar los errores que se producen durante el procesamiento del flujo.

Es importante señalar que las instancias de BaseSubscriber (o sus subclases) son de un solo uso. Esto significa que un BaseSubscriber cancelará automáticamente su suscripción al primer Publisher si se suscribe a un segundo Publisher. Esta gestión automática garantiza que el suscriptor se utilice adecuadamente en el contexto reactivo.

La BaseSubscriber de Reactor ofrece varios hooks que se pueden sustituir para personalizar el comportamiento del Subscriber en función...

Generar una secuencia manualmente

Las secuencias pueden generarse de forma sincrónica o asincrónica.

1. Modelo sincrónico

Los datos asíncronos de un flujo suelen proceder generalmente de fuentes externas, tales como conexiones de red o señales activadas por planificadores. Sin embargo, puede ser útil generar una secuencia de datos nosotros mismos. Para ello, Reactor ofrece el método generate(), que puede usarse para crear un flujo infinito de elementos usando una lógica de generación personalizada. El método generate()toma dos parámetros: un proveedor de estado inicial (stateSupplier) con una función generadora que toma el estado actual y un SynchronousSink como parámetros. El proveedor de estado inicial se utiliza para inicializar el estado de generación, y la función generadora se llama en cada iteración para generar un nuevo elemento que se enviará en el flujo. 

Por ejemplo, para generar una secuencia infinita de enteros, podemos utilizar generate() con un estado inicial de cero y una función generadora que emita el estado actual y actualice el estado añadiendo 1 en cada iteración.

Flux<Integer> infiniteSequence = Flux.generate( 
    () -> 0, // Estado inicial 
    (state, sink) -> { 
        sink.next(state); // Enviar el elemento en el flujo 
        return state + 1; // Actualizar el estado 
    } 
); 

El método generate() de Reactor permite generar flujos de datos personalizados de manera flexible. Podemos usarlo para generar secuencias numéricas, datos aleatorios o incluso recuperar datos de fuentes externas.

Para recuperar los tres primeros valores de infiniteSequence, es posible utilizar take(3):

    infiniteSequence 
      .take(3) // Toma los 3 primeros elementos 
      .subscribe( 
        item -> System.out.println("Received item: " + item), 
        error -> System.err.println("Error occurred: " + error), 
        () -> System.out.println("Flux...

Método handle()

El método handle() de la clase Flux se utiliza para gestionar de forma personalizada los elementos emitidos por el flujo. Puede utilizarse para transformar los elementos del flujo o para realizar un tratamiento específico sobre cada elemento. La firma del método handle() es la siguiente:

Flux<T> handle(BiConsumer<T, SynchronousSink<R><> handler) 

El parámetro handler es un BiConsumer que toma dos argumentos: el elemento emitido por el flujo (T) y un SynchronousSink<R> utilizado para emitir los elementos resultantes del procesamiento (R). El SynchronousSink es un objeto utilizado para generar de manera síncrona elementos en un flujo. Se utiliza para enviar los elementos resultantes del procesamiento desde el handler. En el handler se pueden realizar diferentes acciones sobre el elemento recibido, como transformarlo en un nuevo elemento (R) para enviarlo en el flujo, ignorarlo, filtrarlo o realizar cualquier otra lógica de procesamiento necesaria.

He aquí un ejemplo:

Flux<Integer> originalFlux = Flux.range(1, 10); 
 
Flux<String> processedFlux = originalFlux.handle((number, sink) -> { 
    // Comprueba si el número es par 
    if (number % 2 == 0) { 
        // Envía el número en forma de cadena de caractères 
        sink.next("Even:...

Scheduler

Spring Reactor ofrece a los desarrolladores varias formas de gestionar la programación de tareas, mediante el uso de planificadores (schedulers),. Éstos controlan la ejecución de tareas de forma asíncrona y eficiente. Se utilizan junto con los métodos publishOn() y subscribeOn()para controlar el contexto de ejecución en el que se realizan las operaciones de flujo. Estos métodos permiten especificar los schedulers que se utilizarán para diferentes partes del flujo, lo que puede tener un impacto significativo en la concurrencia y el rendimiento de la aplicación.

El método publishOn() se utiliza para especificar el scheduler en el cual se ejecutaran las siguientes operaciones. De esta manera, todas las operaciones que vienen después de la llamada a publishOn() se realizarán en el contexto del planificador especificado. Esto puede ser útil para delegar el procesamiento asíncrono a otro thread, en particular para las operaciones que consumen mucho tiempo o son bloqueantes.

Por ejemplo:

    Flux.range(1, 10)   
      .publishOn(Schedulers.boundedElastic()) 
// Utiliza el scheduler boundedElastic() para el tratamiento asíncrono 
      .map(value -> { 
        // Código de procesamiento asíncrono 
        System.out.println("Procesamiento...

Tratar correctamente los casos de errores

Con Spring Reactor Core, puede manejar los errores de diferentes maneras. Aquí hay algunos enfoques comunes para el manejo de errores en un flujo.

1. Operador onErrorResume

El operador onErrorResume() se utiliza para capturar un error y sustituir el flujo que lo generó por otro flujo. Se utiliza del siguiente modo:

Flux<Integer> flux = Flux.just(1, 2, 3) 
    .map(i -> { 
        if (i == 2) { 
            throw new RuntimeException("¡Error!"); 
        } 
        return i; 
    }) 
    .onErrorResume(e -> Flux.just(10, 20, 30)); 
 
flux.subscribe( 
    data -> System.out.println("Data: " + data), 
    error -> System.err.println(""Error: " + error), 
    () -> System.out.println("Terminado") 
); 

2. Operador onErrorReturn

El operador onErrorReturn() se utiliza para capturar un error y devolver un valor de sustitución especificado. Se utiliza del siguiente modo:

Flux<Integer> flux = Flux.just(1, 2, 3) 
    .map(i -> {   ...

Captura y reenvío de excepciones

1. Gestión de excepciones

Cuando se utiliza Reactor, el manejo de excepciones en operadores o funciones es el siguiente:

  • Todos los operadores pueden potencialmente desencadenar excepciones o llamar a funciones de retorno que pueden fallar, por lo tanto, todos ellos contienen algún tipo de gestión de errores.

  • Las excepciones no verificadas siempre se propagan a través de la señal onError. Por ejemplo, una RuntimeException lanzada dentro de una función map() se traduce por un evento onError.

  • Reactor define ciertas excepciones, como OutOfMemoryError, como siendo siempre consideradas fatales y son lanzadas en lugar de propagadas.

  • En algunos casos, una excepción no comprobada no puede propagarse debido a situaciones de concurrencia durante las fases de suscripción y solicitud, y en estos casos el error es abandonado.

Para manejar excepciones verificadas, usted puede usar try y catch, o puede encapsular la excepción comprobada dentro de una excepción no verificada utilizando Exceptions.propagate().

  • Si necesita devolver un fluxy manejar una excepción verificada, puede utilizar Flux.error(ExcepciónVerificada)para crear un flux que produzca un error.

  • La clase de utilidad Exceptions de Reactor puede ayudarle a encapsular y obtener la excepción original si es necesario.

Ejemplo de utilización de Exceptions.propagate() para tratar una IOException en una función map():

public String convert(int i) throws IOException { 
    if (i > 3) { 
        throw new IOException("boom " + i); 
    } 
    return "OK " + i; 
} 
 
Flux<String> converted = Flux 
    .range(1, 10) 
    .map(i -> { 
        try { return convert(i); } 
        catch (IOException e) { throw Exceptions.propagate(e); } 
    }); 
    
converted.subscribe( 
    v -> System.out.println("VALOR RECIBIDO: " + v), 
    e -> { 
        if (Exceptions.unwrap(e)...

Sinks

En Reactor, un sink ofrece la posibilidad de activar manualmente las señales de manera independiente. Crea así una estructura similar a la de un editor de publicación (Publisher) capaz de gestionar varias suscripciones (Subscriber), con la excepción de las variantes unicast (comunicación uno a uno).

Los sinks básicos expuestos por Reactor Core aseguran que el uso de multithread es detectado y no puede conducir a violaciones de la especificación o comportamiento indefinido desde el punto de vista de los suscriptores en aval. Cuando se utiliza la API tryEmit*, las llamadas paralelas fallan rápidamente. Cuando se utiliza la API emit*, el EmissionFailureHandler suministrado se puede utilizar para reintentar en caso de contención (por ejemplo, bucle ocupado), de lo contrario el sink terminará con un error. Esto supone una mejora con respecto a Processor.onNext(), que debe sincronizarse externamente o puede dar lugar a un comportamiento indefinido desde el punto de vista de los suscriptores en aval.

En general, se prefiere la clase Sinks a los objetos Processor, ya que ofrece un enfoque más seguro y sencillo de la gestión de señales. Los constructores Sinks proporcionan una API guiada para los principales tipos de productores soportados. Algunos comportamientos que se encuentran en Flux, como onBackpressureBuffer(), también están presentes en la clase Sinks.

Existen varias categorías de sinks:

  • Sinks.many().unicast(): un Sinks.Many.unicast...

Transformaciones personalizadas en el flujo

Hay una serie de operadores que se utilizan para realizar transformaciones personalizadas en un flujo:

  • transform(): este operador se utiliza para transformar un flujo mediante una función que toma el flujo de entrada como parámetro y devuelve un nuevo flujo. La transformación se realiza cuando se llama al operador, incluso antes de que el flujo sea suscrito.

  • transformDeferred(): este operador es similar a transform(), pero la transformación se difiere hasta que alguien se haya suscrito al flujo. Esto permite aplazar la transformación y aplicarla sólo cuando los suscriptores estén interesados en ella.

  • transformDeferredContextual(): este operador transforma cada elemento del flujo utilizando una función que toma como parámetros el elemento actual del flujo y el ContextView. La transformación se difiere hasta que el flujo tenga un suscriptor, lo que significa que se pueden aplicar transformaciones contextuales específicas a cada elemento del flujo.

He aquí un ejemplo sencillo en el que se utiliza el operador transform() para transformar un flujo de cadenas en un flujo de sus longitudes:

public class TransformExample { 
  public static void main(String[] args) { 
    // Crea un flujo de cadenas 
    Flux<String> stringFlux = Flux.just("rojo", "verde", "naranja"); 
   
    // Transforma el flujo en un flujo de longitudes de cadena 
    Flux<Integer> lengthFlux = 
stringFlux.transform(stringFluxTransformer()); 
   
    // Suscribirse al flujo de longitudes 
    lengthFlux.subscribe(length -> 
System.out.println("Longitud: " + longitud)); 
  } 
   
  private static Function<? super Flux<String>,? 
extends Publisher<Integer>> stringFluxTransformer() { 
    return flux -> flux.map(s -> s.length()); 
  }   
} 

En este ejemplo, creamos un flujo de cadena con tres elementos: "rojo", "verde" y "naranja". A continuación, utilizamos el operador transform() para aplicar nuestra función de transformación...

Global hooks

Los Global hooks (ganchos globales) son puntos de extensión que permiten a los desarrolladores modificar el comportamiento global de determinados aspectos clave del framework. En otras palabras, centralizan la personalización de funciones que se aplican a todo el flujo reactivo de la aplicación.

1. Hooks de supresión

Los hooks de supresión (dropping hooks) son llamados cuando un operador fuente no cumple con la especificación Reactive Streams. Estos errores se salen de la ruta de ejecución habitual, es decir, no pueden transmitirse a través de la señal onError. Esto ocurre generalmente cuando un Publisher llama a onNext() sobre el operador, cuando previamente ha llamado a onCompleted() sobre el operador. En esta situación, el valor devuelto por onNext() se descarta. Lo mismo ocurre con una señal onErrorin necesaria. Los hooks asociados, onNextDropped() y onErrorDropped(), dan la posibilidad de asignar un consumer global para estas supresiones. Por ejemplo, puede usarlo para registrar la supresión y liberar los recursos asociados a un valor si es necesario (ya que nunca llega al resto de la cadena reactiva). Si define los hooks sucesivamente, es un proceso aditivo: cada consumidor que asigne es invocado. Los hooks se pueden restablecer completamente a sus parámetros por defecto utilizando los métodos Hooks.resetOn*Dropped().

Por ejemplo:

  public void onNextDroppedFailReplaces() { 
    AtomicReference<Object> dropHook = new AtomicReference<>(); 
    Publisher<Integer> p = s -> { 
      s.onSubscribe(Operators.emptySubscription()); 
      s.onNext(1); 
      s.onNext(2); 
      s.onNext(3); 
    }; 
    List<Integer> seen = new ArrayList<>(); 
 
    try { 
      Hooks.onNextDropped(dropHook::set); 
      Hooks.onNextDroppedFail(); 
 
      assertThatExceptionOfType(RuntimeException.class) 
          .isThrownBy(() -> Flux.from(p).take(2).subscribe(seen::add)) ...

API Context

La API Context se utiliza para propagar información de manera reactiva a través de Flux y Monos. Permite compartir datos entre las diferentes etapas del procesamiento reactivo sin tener que utilizar variables globales o referencias compartidas. La clase Context representa un contexto asociado a un flujo o a un mono. Se utiliza para almacenar datos en forma de pares clave-valor. Cada flujo o mono puede tener su propio contexto, y cuando un flujo o mono emite un valor, también puede propagar su contexto a los operadores posteriores.

1. Estructura del contexto

Context es una interfaz similar a Map. Almacena pares clave-valor y permite recuperar un valor almacenado por su clave.

Algunas observaciones sobre el contexto:

  • Las claves y los valores son de tipo Object, por lo que una instancia de Context (y ContextView) puede contener cualquier número de valores muy diferentes de diferentes bibliotecas y fuentes.

  • Un Context es inmutable. No puede modificarse una vez creado. Los métodos de escritura como put() y putAll() devuelven una nueva instancia del contexto con los cambios realizados.

Desde la versión 3.4.0, la interfaz Context tiene una versión simplificada de sólo lectura, llamada ContextView. Esta no expone ningún método de escritura. En otras palabras, ContextView sólo se puede utilizar para leer de un contexto.

  • Puede comprobar si una clave está presente utilizando hasKey(Object key).

  • Utilice getOrDefault(Object key, T defaultValue) para recuperar un valor (convertido a T) o para utilizar un valor predeterminado si la instancia Context no contiene esta clave.

  • Utilice getOrEmpty(Object key) para obtener un Optional<T> (la instancia Context intenta convertir el valor almacenado en T).

  • Utilice put(Object key, Object value) para almacenar un par clave-valor, que devuelve una nueva instancia de contexto. También puedes fusionar dos contextos en uno nuevo utilizando putAll(ContextView).

  • Utilice delete(Object key) para eliminar el valor asociado a una clave, que devuelve un nuevo contexto.

Una vez que hayas rellenado un Context, puede que quieras echarle un vistazo en tiempo de ejecución. La mayoría de las veces, es el usuario final el que introduce la información en el contexto, mientras que la explotación de esta información es responsabilidad de la biblioteca de terceros, ya que estas bibliotecas generalmente están...

Pruebas

El enfoque TDD (Test Driven Development, desarrollo guiado por pruebas) requiere pruebas unitarias y pruebas de integración de Spring.

Comenzamos añadiendo la dependencia reactor-test:

<dependency> 
    <groupId>io.projectreactor</groupId> 
    <artifactId>reactor-test</artifactId> 
    <scope>test</scope> 
    (1) 
</dependency> 

Esta dependencia proporciona una funcionalidad adicional para facilitar las pruebas de aplicaciones basadas en Spring Reactor. Las principales contribuciones de esta dependencia son las siguientes:

  • Configuración del scheduler. La dependencia permite simular diferentes tipos de schedulers (como Schedulers.parallel() o Schedulers.single()) para realizar pruebas. Esto facilita la comprobación del comportamiento asíncrono sin tener que esperar realmente a los tiempos de ejecución.

  • La clase StepVerifier, que facilita la comprobación de las secuencias de eventos emitidas por Flux y de Mono. Con StepVerifier, puede comprobar si Flux o Mono emiten los elementos esperados, si terminan correctamente y si generan errores cuando se espera.

  • La clase VirtualTimeScheduler, que ofrece la posibilidad de controlar el tiempo virtual durante las pruebas para las operaciones de retardo y programación. Esto permite simular retrasos sin tener que esperar realmente a que transcurra el tiempo.

  • La clase TestSubscriber, que permite probar un Flux o Mono suscribiéndose con un TestSubscriber, facilitando la inspección de eventos, errores y señales de finalización.

  • La clase StepVerifierOptions, que se utiliza para configurar las opciones de StepVerifier, como activar o desactivar el modo de depuración, que muestra información de depuración adicional cuando se ejecuta StepVerifier.

La dependencia reactor-test proporciona potentes herramientas para probar y validar el comportamiento de Flux y Mono en el contexto de pruebas unitarias o de integración. Facilita la configuración de escenarios de prueba complejos y el control del tiempo y las operaciones asíncronas para garantizar pruebas más fiables y reproducibles.

1. Pruebas con StepVerifier

La clase StepVerifier es una clase de utilidad proporcionada por Reactor Core que se puede utilizar para probar declarativamente...

Reactor de depuración

El modo de depuración de Reactor (Reactor Debug) es una funcionalidad de Spring Reactor Core que permite facilitar la depuración y la comprensión del flujo de eventos cuando se desarrollan aplicaciones reactivas. Proporciona herramientas para visualizar interacciones entre diferentes operadores y suscriptores, así como para monitorizar el estado del flujo en diferentes momentos de su ejecución.

Para activar el modo de depuración, puede utilizar los métodos doOnSubscribe(), doOnNext(), doOnError(), doOnComplete() y doOnTerminate() con el operador log(), que muestra información de depuración en cada etapa del flujo.

Aquí hay un ejemplo de cómo usar las operaciones doOnSubscribe(), doOnNext(), doOnError(), doOnComplete(), doOnTerminate() y log() para habilitar la depuración en un proyecto Reactor.

Flux.just("A", "B", "C", "D") 
  .log() // Mostrar información de registro 
  // Se activa al suscribirse 
  .doOnSubscribe(subscription -> System.out.println("Flux 
subscribed!")) 
  // Se activa con cada nuevo elemento 
  .doOnNext(element -> System.out.println("Next element: " + 
element)) 
  // Se activa en caso de error 
  .doOnError(error -> System.out.println("An error has occurred: " 
+ error.getMessage())) 
  // Se activa cuando el flujo termina normalmente 
  .doOnComplete(() -> System.out.println("¡Flujo completo!")) 
  // Se activa cuando el flujo termina por cualquier otra razón 
  .doOnTerminate(() -> System.out.println("Flujo terminado 
(either onComplete...

Conclusión

En conclusión, el uso de Reactor Core ofrece un enfoque potente y flexible de la programación reactiva en Java. Gracias a su implementación de las especificaciones Reactor y Reactive Streams, permite gestionar flujos de datos asíncronos de forma eficiente, a la vez que proporciona un completo conjunto de operadores para transformar, combinar y gestionar flujos.

Con Reactor Core, los desarrolladores pueden crear pipelines de procesamiento de datos reactivas en las que el procesamiento se realiza de forma asíncrona y sin bloqueos, lo que permite un mejor uso de los recursos del sistema y una mayor extensibilidad. Se proporcionan operadores para manejar errores, gestionar la contrapresión y controlar la programación de tareas, lo que facilita el desarrollo de aplicaciones reactivas robustas y de alto rendimiento.

Los schedulers también ofrecen un control preciso sobre la ejecución de las tareas, permitiendo especificar en qué thread o scheduler deben realizarse las operaciones, lo que facilita la gestión de la concurrencia y la paralelización.

Además, Reactor Core proporciona mecanismos para gestionar las suscripciones y la vida útil de los flujos, en particular con el uso de Disposable y sinks, ofreciendo así una gestión precisa de los recursos y evitando problemas de fugas de memoria.

Reactor Core es, por tanto, una potente herramienta para la programación...