L’API Stream Gatherers dans Java 24

Clément de Tastes

Tech Lead


Publié le 03/04/2025Temps de lecture : 8 minutes
Description

L’API Stream Gatherers dans Java 24

Alors que Java 8 sortait il y a un peu plus de onze ans en mars 2014, il apportait une fonctionnalité qui allait significativement impacter la façon dont nous écrivons notre code grâce à l’API Stream et aux expressions lambdas. Avec son approche fonctionnelle, l’API Stream offre une manière élégante et expressive de réaliser des traitements sur des flux de données.

Le JDK 24 vient enrichir en standard l’API Stream avec le concept de "gatherer".

Quelques rappels

Avant de plonger dans le vif du sujet, rappelons qu’un Stream repose sur 3 éléments clés :

  • Une source de données

  • De 0 à n opérations intermédiaires

  • Une unique opération terminale

La source du Stream peut être construite de diverses manières, par exemple :

  • À partir d’une java.util.Collection grâce aux méthodes stream() ou parallelStream()

  • Depuis un tableau avec la méthode Arrays.stream(Object[])

  • Via l’une des fabriques Stream::of, IntStream::range, Stream::iterate

  • Avec BufferedReader.lines() pour manipuler les lignes d’un fichier

  • Depuis diverses méthodes de la classe java.util.Random comme ints() ou doubles()

  • …​

Les opérations intermédiaires sont toujours lazy, c’est-à-dire que leur exécution en tant que telles ne réalisent pas directement le traitement, mais renvoient plutôt une nouvelle instance de Stream qui appliquera les traitements lorsque celui-ci sera parcouru. Le parcours du stream ne débute que lorsque l’opération terminale est invoquée. Ces opérations peuvent être stateless (map(), filter()) ou stateful (distinct(), sorted()).

L'opération terminale peut traverser le stream pour produire un résultat (ex. reduce()) ou un effet (ex. forEach()).

Les opérations intermédiaires et terminales peuvent être greedy (tous les éléments seront parcourus, comme pour IntStream::sum) ou court-circuit (un sous-ensemble seulement est parcouru avant de produire un résultat, comme pour limit() ou anyMatch()).

Exemple de stream basique
var list = Stream.of("Paris", "Lyon", "Marseille", "Cheverny") (1)
  .filter(str -> str.contains("a")) (2)
  .map(String::toUpperCase) (2)
  .toList(); (3)

System.out.println(list); // Affiche [PARIS, MARSEILLE]
1 Source du stream
2 Opérations intermédiaires
3 Opération terminale

Un des atouts de l’API Stream est sa capacité à s’exécuter aussi bien de manière séquentielle que parallèle sans modifier en profondeur le code. Les méthodes Stream::parallel et Stream::sequential permettent de basculer entre ces modes, offrant ainsi une flexibilité d’exécution adaptée aux besoins de performance. Un stream parallèle exploite le ForkJoinPool commun pour répartir le traitement sur plusieurs threads, ce qui peut accélérer les opérations sur de grandes collections. L’exécution en parallèle nécessitant toutefois certaines attentions (ordre, effets de bord…​).

Bascule en stream parallèle
var list = Stream.of("Paris", "Lyon", "Marseille", "Cheverny")
  .parallel() (1)
  .filter(str -> str.contains("a"))
  .map(String::toUpperCase)
  .toList();
1 Appel à Stream::parallel pour basculer en stream parallèle

Bien que nombreuses, les opérations intermédiaires proposées par l’API ne répondent pas à tous les cas d’usage. Supposons que nous souhaitions traiter séquentiellement un flux composé de paires successives de String (prénom, nom) pour reconstruire une liste de personnages en mappant sur le type record Hero(String firstName, String lastName) {} :

record Hero(String firstName, String lastName) {}
Stream.of("Duke", "Skydancer", "Hank", "Solau")
  .xxx // Besoin d'une opération intermédiaire personnalisée

L’utilisation d’un stream pour un tel traitement devient inadaptée : une itération classique et des manipulations sur les index seraient plus propices.

La pièce manquante du puzzle Stream

L’API Stream propose de nombreuses opérations terminales telles que forEach(), findFirst(), count(), …​ répondant chacune à un besoin courant précis. Il est également possible de définir ses propres opérations terminales en définissant sa propre implémentation de Collector<? super T, A, R>, et en la fournissant comme paramètre de l’opération terminale collect(). La classe Collectors vient aussi, à mi-chemin, enrichir l’API en fournissant diverses fabriques pour créer des instances personnalisées de Collector avec par exemple Collectors::groupingBy ou Collectors::joining.

Néanmoins, l’API ne proposait jusqu’alors pas de richesse équivalente pour ce qui est des opérations intermédiaires, en ne fournissant pas de possibilité de créer ses implémentations personnalisées. Bien que l’API ait été enrichie au cours du temps (par exemple, ajout des opérations dropWhile() et takeWhile() en Java 9 et mapMulti() en JDK 16), il n’y avait toujours pas l’équivalent d’un collect() pour les opérations intermédiaires.
La JEP 485: Stream Gatherers vient apporter un outil supplémentaire en ajoutant l’opération intermédiaire Stream::gather qui accepte une instance de java.util.stream.Gatherer. Après une période de maturation de 2 previews (JEP 461 en Java 22 et JEP 473 en Java 23), la fonctionnalité est standard en Java 24.

Table 1. Analogies entre opérations intermédiaires et terminales

Opérations spécifiques

Opération générique

Fabrique

Opérations
intermédiaires

map()
filter()
flatMap()
peek()
…​

gather()

Gatherers

Opérations
terminales

forEach()
toList()
findAny()
reduce()
…​

collect()

Collectors

L’interface java.util.stream.Gatherer

Un Gatherer a pour but de représenter quasiment tout type d’opération intermédiaire, et il peut être :

  • exécuté en séquentiel ou en parallèle

  • stateless ou stateful

  • court-circuit (peut s’arrêter avant la fin) ou greedy (traite forcément tous les éléments)

L’interface Gatherer définit 4 opérations qui fonctionnent de concert, selon les besoins :

  • default Supplier<A> initializer() : optionnelle, permet de fournir un objet pour stocker un état privé qui pourra être utilisé lors de la consommation des éléments du flux.

  • Integrator<A, T, R> integrator() : fournit une instance d'Integrator, interface fonctionnelle qui définit la façon dont sont intégrés les éléments du flux entrant vers le flux de sortie, en tenant éventuellement compte de l’état privé.

  • default BinaryOperator<A> combiner() : optionnelle, combine deux états dans le cas d’un Stream parallèle.

  • default BiConsumer<A, Downstream<? super R>> finisher() : optionnelle, invoquée lorsqu’il n’y a plus d’éléments à traiter. Elle peut utiliser l’état privé pour éventuellement, émettre des éléments supplémentaires vers le flux de sortie.

Les fabriques de Gatherer

L’interface Gatherer fournit plusieurs fabriques permettant d’obtenir une instance de Gatherer à partir d’une implémentation de tout ou partie des quatre opérations. La fourniture d’une implémentation d’un Integrator est le minimum requis, les autres opérations étant quant à elles optionnelles.

Cette instance de Gatherer peut être :

  • parallélisable via les surcharges de Gatherer::of

  • séquentielle via les surcharges de Gatherer::ofSequential

ofSequential() ne propose pas de surcharge faisant intervenir de combiner car cela est réservé aux Gatherer parallélisables.

La définition d’un Integrator

Il est possible d’émettre ou non un ou plusieurs éléments vers le flux de sortie, tout comme d’interrompre prématurément le traitement avant d’avoir atteint la fin des éléments. La signature de la méthode est la suivante : boolean integrate(A state, T element, Downstream<? super R> downstream)

  • A state état optionnel

  • T element élément provenant de l'upstream Stream<T>

  • Downstream<? super R> downstream flux de sortie, dont le type générique peut être différent du flux d’entrée

Le retour de type booléen indique s’il faut continuer à traiter de nouveaux éléments ou court-circuiter.

La ré-implémentation d’une opération existante

Armé de cet outil "à tout faire", un bon exercice pour se familiariser avec l’API peut être de réimplémenter une opération intermédiaire existante, par exemple le cas de map(). Pour chaque élément de l'upstream, map() applique la Function passée en paramètre de la méthode puis transmet l’élément au downstream. Pour cela, nous n’avons besoin que de définir un Integrator.

Par exemple pour transformer un flux de String en leurs versions en lettres capitales :

Définition d’un gatherer qui map les éléments en lettres capitales
Integrator<Void, String, String> integrator = (_, element, downstream) -> { (1)
  downstream.push(element.toUpperCase()); (2)
  return true; (3)
};
Gatherer<String, Void, String> mapper = Gatherer.of(integrator); (4)

Stream.of("this", "is", "the", "way")
  .gather(mapper) (5)
  .forEach(System.out::println);
1 Définition de l'integrator, stateless donc on utilise Void et on n’utilise pas l’état
2 Transmission de l’élément transformé en lettres capitales au flux descendant
3 On traite tous les éléments du flux
4 Utilisation de la fabrique of(Integrator<Void, T, R> integrator) pour obtenir une instance de Gatherer
5 On passe l’instance du gatherer à l’opération intermédiaire gather()
Affichage dans la console
THIS
IS
THE
WAY

L’implémentation d’une opération avancée

Tâchons d’aller plus loin cette fois-ci en créant un gatherer séquentiel qui répond au besoin énoncé précédemment : traiter un flux d’entrée composé de paires de String (nom, prénom) pour reconstruire une liste de record Hero(String firstName, String lastName) {}.

Ce gatherer est stateful car nous devons conserver l’état d’avancement dans le flux. Nous allons donc devoir gérer cet état et fournir un initializer.

Il s’agit simplement d’un Supplier<A> qui permet de préciser le type A de l’état et qui fournit un moyen de l’initialiser.

class State { (1)
  String firstName;
}

record Hero(String firstName, String lastName) {}

Gatherer<String, State, Hero> heroGatherer = Gatherer.ofSequential( (2)
  State::new, (3)
  (state, element, downstream) -> {
    if (state.firstName == null) {
      state.firstName = element; (4)
    } else {
      downstream.push(new Hero(state.firstName, element)); (5)
      state.firstName = null;
    }
    return true;
  }
);

Stream.of("Duke", "Skydancer", "Hank", "Solau")
  .gather(heroGatherer)
  .forEach(System.out::println);
1 Définition d’un type mutable pour conserver l’état
2 Utilisation de la fabrique ofSequential(initializer, integrator) pour définir un gatherer séquentiel
3 Initialisation de l’état
4 L’état est vide, on conserve l’élément courant qui correspond au prénom
5 L’état est présent, on crée une instance de Hero complète à partir de l’état (prénom) et de l’élément courant (nom) que l’on passe au downstream avant de réinitialiser l’état
Affichage dans la console
Hero[firstName=Duke, lastName=Skydancer]
Hero[firstName=Hank, lastName=Solau]

L’utilisation d’un finisher

Le finisher permet de réaliser des traitements une fois tous les éléments du flux d’entrée consommés, pouvant impliquer l’état privé ainsi que le downstream fournis en paramètres.

Avec notre exemple précédent, supposons que nous souhaitions quand même obtenir une instance de Hero avec une quantité de données impaire. Nous pouvons définir un finisher qui transmet au downstream un Hero contenant le seul prénom.

Il s’agit d’un BiConsumer<A, Downstream<? super R>> qui permet l’utilisation optionnelle de l’état A et du downstream.

Gatherer<String, List<String>, Hero> heroGatherer = Gatherer.ofSequential(
  ArrayList::new,
  (state, element, downstream) -> {
    if (state.isEmpty()) {
      state.add(element);
    } else {
      downstream.push(new Hero(state.getFirst(), element));
      state.clear();
    }
    return true;
  },
  (state, downstream) -> { (1)
    if (state.firstName != null) {
      downstream.push(new Hero(state.firstName, null)); (2)
    }
  }
);
1 Définition du finisher
2 Utilisation de l’état courant pour transmettre un élément au downstream
Affichage dans la console
Hero[firstName=Duke, lastName=Skydancer]
Hero[firstName=Hank, lastName=Solau]
Hero[firstName=Rando, lastName=null]

L’utilisation d’un combiner

Une des marques de fabrique de la trilogie Star Gatherers est la parallélisation des événements. Pendant que Duke Skydancer suit les enseignements de Maître Yoga sur la planète Gadobah, Hank Solau et ses compagnons fuient l’empire et recherchent de l’aide auprès de Rando Galrissian. Il est désormais temps de rassembler nos héros avant d’affronter Dork Vapor, et c’est bien d’un combiner dont ils vont avoir besoin.

En étoffant notre type Hero d’un attribut enum Strength, utilisons un gatherer pour les regrouper par Strength afin de générer des Category.

Jeu de données
enum Strength { LOW, MID, HIGH }

record Hero(String firstName, String lastName, Strength strength) {}

record Category(Strength strength, List<String> firstNames) {}

Stream<Hero> heroes = Stream.of(
  new Hero("Duke", "Skydancer", Strength.HIGH),
  new Hero("Léa", "Origami", Strength.HIGH),
  new Hero("Hank", "Solau", Strength.MID),
  new Hero("Obi-Two", "Kanobi", Strength.HIGH),
  new Hero("Yoga", "", Strength.HIGH),
  new Hero("Chewbarka", "", Strength.MID),
  new Hero("Rando", "Galrissian", Strength.MID),
  new Hero("Vedge", "Antillus", Strength.MID),
  new Hero("C-4PA", "", Strength.LOW),
  new Hero("R3-D3", "", Strength.LOW)
);

On utilise une Map<Strength, List<String>> pour conserver l’état courant et le combiner aura pour rôle de fusionner deux jeux de données dans une même Map.

Gatherer<Hero, Map<Strength, List<String>>, Category> rebellionGatherer =
  Gatherer.of( (1)
    // Initializer
    HashMap::new,

    // Integrator
    (state, hero, _) -> {
      state.computeIfAbsent(hero.strength, _ -> new ArrayList<>()).add(hero.firstName); (2)
      return true;
    },

    // Combiner
    (left, right) -> {
      right.forEach((key, value) ->
        left.computeIfAbsent(key, _ -> new ArrayList<>()).addAll(value)); (3)
      return left;
    },

    // Finisher
    (state, downstream) -> state.forEach((strength, names) -> {
      Category category = new Category(strength, names);
      downstream.push(category); (4)
    })
  );
1 Utilisation de la fabrique of() qui accepte les 4 familles d’opérations : initializer, integrator, combiner et finisher
2 Catégorisation de l’élément parcouru en le stockant dans l’état interne
3 Fusion des deux Map
4 Emission des catégories vers le downstream
Exécution du stream en parallèle
heroes
  .parallel()
  .gather(rebellionGatherer)
  .forEach(System.out::println);
Affichage dans la console
Category[strength=HIGH, firstNames=[Duke, Léa, Obi-Two, Yoga]]
Category[strength=LOW, firstNames=[C-4PA, R3-D3]]
Category[strength=MID, firstNames=[Hank, Chewbarka, Rando, Vedge]]

Les méthodes Gatherer::defaultInitializer, Gatherer::defaultCombiner et Gatherer::defaultFinisher

L’opération integrator est requise pour définir un gatherer mais les initializer, combiner et finisher sont optionnelles. Les différentes fabriques de Gatherer of() et ofSequential() offrent diverses combinaisons logiques de ces opérations. Cependant, pour un stream parallèle par exemple, il n’est parfois pas nécessaire d’avoir de traitement particulier dans le finisher. Or la seule fabrique permettant de construire un gatherer parallèle impose de fournir les 4 opérations, on pourra alors utiliser Gatherer::defaultFinisher qui évite de redéfinir une coquille vide et apporte une plus-value sémantique.

Exemple d’un gatherer parallèle qui renvoie le plus grand élément rencontré, mais qui s’arrête si cette valeur dépasse 100
class State {
  Integer max = null;
}

Gatherer<Integer, ?, Integer> gatherer = Gatherer.of(
  State::new,
  (state, element, downstream) -> {
    if (state.max == null || element > state.max) {
      state.max = element;
    }
    if (state.max > 100) {
      downstream.push(state.max);
      return false;
    }
    return true;
  },
  (e1, e2) -> (e1.max > e2.max) ? e1 : e2,
  Gatherer.defaultFinisher() (1)
);
1 Utilisation de defaultFinisher() car il n’y a pas de traitement particulier à réaliser à la fin

Quelques optimisations

L’API propose quelques outils pour optimiser le traitement des streams utilisant des gatherers.

La méthode Downstream::isRejecting

L’interface Downstream fournit la méthode boolean isRejecting() qui indique si le downstream continue d’accepter de nouveaux éléments ou non. Comme son nom l’indique, si l’invocation de la méthode renvoie true, le downstream n’accepte plus de nouvel élément.

Cette information peut être exploitée par un gatherer pour s’éviter de réaliser des traitements qui s’avéreraient inutiles, puisque le downstream rejette tout nouvel élément qui lui serait transmis.

Utilisation de isRejecting()
(state, element, downstream) -> {
  if (downstream.isRejecting()) {
    // Le downstream n'accepte plus de nouveaux éléments
    return false;
  }
  Object result = process(element);
  downstream.push(result);
  return true;
}

Le retour de la méthode Downstream::push

La méthode Downstream::push renvoie un booléen : si sa valeur est false, alors le downstream n’accepte plus de nouveaux éléments. On pourra l’utiliser de manière analogue à isRejecting() et ainsi améliorer le code précédent :

Utilisation de isRejecting()
(state, element, downstream) -> {
  if (downstream.isRejecting()) {
    // Le downstream n'accepte plus de nouveaux éléments
    return false;
  }
  Object result = process(element);
  return downstream.push(result); (1)
}
1 Utilisation du retour de la méthode push()

On peut retenir le fonctionnement suivant :

  • un nouveau downstream est toujours initialisé dans un état qui accepte un nouvel élément

  • un downstream peut passer de l’état "non-rejecting" à "rejecting", une seule fois, et uniquement dans ce sens

  • un downstream ne peut changer d’état que lorsqu’un élément lui est envoyé via la méthode push()

La fabrique Integrator::ofGreedy

L’interface Integrator fournit la fabrique ofGreedy() permettant d’obtenir une instance de type Integrator conçue pour consommer l’intégralité de ses données d’entrée (si l’en est que le downstream continue d’accepter des éléments).
Elle accepte une instance de type Greedy qui étend simplement Integrator : interface Greedy<A, T, R> extends Integrator<A, T, R> {}.

On peut donc l’utiliser en lui fournissant une expression lambda de la même manière que pour définir notre integrator :

Utilisation d'ofGreedy() appliqué à un précédent exemple
...
Integrator.ofGreedy((state, hero, _) -> {
  state.computeIfAbsent(hero.strength, _ -> new ArrayList<>()).add(hero.firstName);
  return true;
}),
...

Outre la sémantique explicite qu’apporte cette fabrique (l'integrator n’est pas court-circuit), l’API peut utiliser cette information pour réaliser des optimisations lors de l’exécution du stream.
En effet, les streams utilisent des java.util.Spliterator pour parcourir les éléments de la source de données. Leur nom vient de split (découper) et iterator (itérateur), car ils permettent non seulement d’itérer sur les éléments, mais aussi de diviser la source en plusieurs sous-parties pour le traitement parallèle.
Lorsque l'integrator est greedy, on sait que l’on doit traiter tous les éléments donc le stream peut utiliser Spliterator::forEachRemaining qui sera plus optimisé pour un parcours complet. Dans l’autre cas, le stream utilisera Spliterator::tryAdvance car l’on ne sait pas si et quand le parcours se termine prématurément.

La classe Gatherers

Un certain nombre de fabriques pour des implémentations de Gatherer répondant à des usages courants sont disponibles dans la classe java.util.stream.Gatherers.

La fabrique Gatherers::windowFixed

windowFixed(int windowSize) renvoie un Gatherer séquentiel de type "many-to-many" qui regroupe les éléments d’entrée dans des listes de la taille fournie et transmet les listes en sortie lorsqu’elles sont pleines ou qu’il n’y a plus d’éléments. Cette fabrique peut être utilisée pour définir notre Gatherer qui traite les éléments deux par deux (prénom, nom) pour reconstituer Hero :

Utilisation du gatherer Gatherers::windowFixed
Stream.of("Duke", "Skydancer", "Hank", "Solau")
  .gather(Gatherers.windowFixed(2))
  .map(list -> new Hero(list.getFirst(), list.getLast()))
  .forEach(System.out::println);
Affichage dans la console
Hero[firstName=Duke, lastName=Skydancer]
Hero[firstName=Hank, lastName=Solau]

La fabrique Gatherers::windowSliding

windowSliding(int windowSize) renvoie un Gatherer du même type qui regroupe les éléments d’entrée dans des listes de la taille fournie. Après la première fenêtre, chaque liste suivante est créée à partir d’une copie de la précédente en supprimant le premier élément et en ajoutant l’élément suivant à partir du flux d’entrée.

Utilisation du gatherer Gatherers::windowSliding
Stream.of(1, 2, 3, 4, 5, 6, 7)
  .gather(Gatherers.windowSliding(3))
  .forEach(System.out::println);
Affichage dans la console
[1, 2, 3]
[2, 3, 4]
[3, 4, 5]
[4, 5, 6]
[5, 6, 7]

La fabrique Gatherers::scan

scan(Supplier<R> initial, BiFunction<? super R, ? super T, ? extends R> scanner) renvoie un Gatherer séquentiel de type "one-to-one" qui applique la fonction fournie à l’état actuel et à l’élément courant pour produire l’élément suivant, qu’il transmet en sortie.

Stream.of(1, 2, 3, 4, 5, 6, 7)
  .gather(Gatherers.scan(
    () -> "0",
    (state, element) -> state + element
  )).forEach(System.out::println);

La fabrique Gatherers::fold

fold(Supplier<R> initial, BiFunction<? super R, ? super T, ? extends R> folder) renvoie un Gatherer séquentiel de type "many-to-one" qui agrège les données du flux de manière incrémentale et renvoie le résultat une fois tous les éléments du flux entrant consommés.

Les paramètres attendus sont les suivants :

  • Supplier<R> initial : fournit la valeur initiale, du même type que le type de sortie du stream (R)

  • BiFunction<? super R, ? super T, ? extends R> folder : bi-fonction qui implémente la logique du traitement à opérer avec l’état consolidé de type R et l’élément courant de type T.

Utilisation du gatherer Gatherers::fold
Stream.of(1, 2, 3, 4, 5, 6, 7)
  .gather(Gatherers.fold(
    () -> "0",
    (state, element) -> state + element
  )).forEach(System.out::println);
Affichage dans la console
01234567

La fabrique Gatherers::mapConcurrent

mapConcurrent(final int maxConcurrency, final Function<? super T, ? extends R> mapper) renvoie un Gatherer "one-to-one" qui invoque la fonction fournie sur chaque élément du flux en parallèle avec des threads virtuels, dont le nombre maximal est défini par maxConcurrency.

Utilisation du gatherer Gatherers::mapConcurrent
class Service {
  AtomicInteger concurrentAccessCount = new AtomicInteger(0); (1)

  int process(int i) {
    int count = concurrentAccessCount.incrementAndGet();
    System.out.println(Thread.currentThread() + " - Accès concurrent(s) : " + count); (2)
    try {
      Thread.sleep(100);
    } catch (InterruptedException _) {
    }
    concurrentAccessCount.decrementAndGet();
    return i;
  }
}

Service service = new Service();
int sum = Stream.of(1, 2, 3, 4, 5, 6, 7)
  .gather(Gatherers.mapConcurrent(2, service::process)) (3)
  .mapToInt(Integer::intValue)
  .sum();
System.out.println(sum);
1 On trace le nombre d’accès concurrents à l’instant t
2 Affichage du thread courant ayant accédé au service
3 Création d’un gatherer via mapConcurrent() avec 2 accès simultanés au maximum
Affichage dans la console
VirtualThread[#43]/runnable@ForkJoinPool-1-worker-1 - Accès concurrent(s) : 1
VirtualThread[#45]/runnable@ForkJoinPool-1-worker-2 - Accès concurrent(s) : 2
VirtualThread[#51]/runnable@ForkJoinPool-1-worker-1 - Accès concurrent(s) : 1
VirtualThread[#50]/runnable@ForkJoinPool-1-worker-3 - Accès concurrent(s) : 2
VirtualThread[#55]/runnable@ForkJoinPool-1-worker-5 - Accès concurrent(s) : 2
VirtualThread[#56]/runnable@ForkJoinPool-1-worker-3 - Accès concurrent(s) : 1
VirtualThread[#58]/runnable@ForkJoinPool-1-worker-5 - Accès concurrent(s) : 1
28

La composition de Gatherer

Les gatherers supportent la composition via la méthode andThen(Gatherer) qui joint deux gatherers, où le premier produit des éléments que le second peut consommer.

Ainsi sémantiquement :

source.gather(a).gather(b).gather(c).collect(…​)

Est équivalent à :

source.gather(a.andThen(b).andThen(c)).collect(…​)

Composition de gatherers
// Multiplie les éléments du stream par 2
Gatherer<Integer, Void, Integer> multiplier = Gatherer.of(
  (_, element, downstream) -> downstream.push(element * 2)
);

// Limite le traitement à 3 éléments
Gatherer<Integer, AtomicInteger, Integer> limiter = Gatherer.ofSequential(
  AtomicInteger::new,
  (state, element, downstream) -> {
    if (state.getAndIncrement() >= 3) {
      return false;
    }
    return downstream.push(element);
  }
);

// Composition
var composed = limiter.andThen(multiplier); (1)
Stream.of(1, 2, 3, 4, 5, 6, 7)
  .gather(composed)
  .forEach(System.out::println);
1 Création d’un gatherer composé de multiplier et limiter via andThen()
Affichage dans la console
2
4
6

Conclusion

Java 24 vient de sortir et fournit maintenant en standard l’API Stream Gatherers. Celle-ci offre les outils nécessaires aux développeurs pour créer des opérations intermédiaires personnalisées stateless ou stateful, greedy ou court-circuit, séquentielles ou parallèles…​
Elle fournit également quelques fabriques utiles grâce à la classe Gatherers et l’on voit déjà fleurir des bibliothèques qui proposent différents gatherers, comme gatherers4j ou encore packrat.

À votre tour de télécharger un JDK 24 et créer les vôtres.