Interface BranchedKStream<K,V>

Type Parameters:
K - the key type of this stream
V - the value type of this stream

public interface BranchedKStream<K,V>
BranchedKStream is an abstraction of a branched record stream of key-value pairs. It is an intermediate representation of a KStream, used to split the original KStream into multiple sub-streams (called branches). Routing records to the different branches is a stateless, record-by-record operation.

Branches are defined via the branch(Predicate, Branched) or defaultBranch(Branched) methods. Each input record is evaluated against the predicates passed to branch(Predicate, Branched), in the order the branches were defined. It is routed to the first branch whose predicate evaluates to true, and is included in that branch only. If a record does not match any predicate, it is routed to the default branch, or dropped if no default branch is created. For details about multicasting/broadcasting records into more than one KStream, see KStream.split().
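
The routing rule above can be modeled in a few lines of plain Java. This is only an illustrative sketch of the first-match semantics, not the Kafka Streams implementation. The class FirstMatchRoutingSketch, the route method, and its return convention (1-based branch position, 0 for the default branch, -1 for a dropped record) are hypothetical names chosen for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

// Plain-Java model of first-match routing (hypothetical, for illustration only).
final class FirstMatchRoutingSketch {

    /**
     * Returns the 1-based position of the first branch whose predicate matches,
     * 0 if no predicate matches and a default branch exists,
     * or -1 if no predicate matches and the record is dropped.
     */
    static <K, V> int route(K key, V value,
                            List<BiPredicate<K, V>> predicates,
                            boolean hasDefaultBranch) {
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(key, value)) {
                return i + 1; // stop here: remaining predicates are not evaluated
            }
        }
        return hasDefaultBranch ? 0 : -1;
    }

    public static void main(String[] args) {
        List<BiPredicate<String, Integer>> predicates = new ArrayList<>();
        predicates.add((k, v) -> v < 10);   // branch 1
        predicates.add((k, v) -> v < 100);  // branch 2 (also true for v < 10, but never reached then)

        System.out.println(route("a", 5, predicates, true));    // prints 1
        System.out.println(route("a", 50, predicates, true));   // prints 2
        System.out.println(route("a", 500, predicates, true));  // prints 0
        System.out.println(route("a", 500, predicates, false)); // prints -1
    }
}
```

Note that the value 5 satisfies both predicates but lands only in branch 1, which mirrors the "first matching branch only" behavior described above.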

Each branch can be processed either by a Function or a Consumer provided via a Branched parameter. If certain conditions are met (see below), all created branches can be accessed from the Map returned by an optional defaultBranch(Branched) or noDefaultBranch() method call.

Rules of forming the resulting Map
The keys of the Map<String, KStream<K, V>> entries returned by defaultBranch(Branched) or noDefaultBranch() are defined by the following rules:
  • If a Named parameter was provided for KStream.split(Named), its value is used as a prefix for each key. By default, no prefix is used.
  • If a branch name is provided in branch(Predicate, Branched) via the Branched parameter, its value is appended to the prefix to form the Map key.
  • If a name is not provided for the branch, then the key defaults to prefix + position of the branch as a decimal number, starting from "1".
  • If a name is not provided for the defaultBranch(), then the key defaults to prefix + "0".
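The values of the respective Map<String, KStream<K, V>> entries are formed as follows:
  • If no chain function or consumer is provided via the Branched parameter, the branch itself is added to the Map.
  • If a chain function is provided and returns a non-null value for a given branch, that value is added to the Map.
  • If a chain function returns null for a given branch, no entry is added to the Map.
  • If a consumer is provided for a given branch, no entry is added to the Map.
For example: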

 Map<String, KStream<..., ...>> result =
   source.split(Named.as("foo-"))
     .branch(predicate1, Branched.as("bar"))                    // "foo-bar"
     .branch(predicate2, Branched.withConsumer(ks->ks.to("A"))) // no entry: a Consumer is provided
     .branch(predicate3, Branched.withFunction(ks->null))       // no entry: chain function returns null
     .branch(predicate4, Branched.withFunction(ks->ks))         // "foo-4": chain function returns non-null value
     .branch(predicate5)                                        // "foo-5": name defaults to the branch position
     .defaultBranch();                                          // "foo-0": "0" is the default name for the default branch
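
The key-naming rules can be restated as a small plain-Java helper. This is a sketch under stated assumptions, not code from Kafka Streams: the class BranchKeySketch and its two methods are hypothetical, a null name stands for "no name provided", and an empty prefix stands for "no Named parameter". It models only how a key is formed, not whether an entry ends up in the Map.

```java
// Plain-Java model of the Map key rules (hypothetical, for illustration only).
final class BranchKeySketch {

    /** Key for a regular branch: prefix + explicit name, or prefix + 1-based position. */
    static String branchKey(String prefix, String name, int position) {
        return prefix + (name != null ? name : Integer.toString(position));
    }

    /** Key for the default branch: prefix + explicit name, or prefix + "0". */
    static String defaultBranchKey(String prefix, String name) {
        return prefix + (name != null ? name : "0");
    }

    public static void main(String[] args) {
        System.out.println(branchKey("foo-", "bar", 1));     // prints foo-bar
        System.out.println(branchKey("foo-", null, 5));      // prints foo-5
        System.out.println(defaultBranchKey("foo-", null));  // prints foo-0
        System.out.println(branchKey("", null, 1));          // prints 1
    }
}
```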
 

Usage examples

Direct branch processing
If no single scope for all the branches is required, and each branch can be processed completely independently of the others, 'consuming' lambdas or method references can be passed in the Branched parameter:

 source.split()
   .branch(predicate1, Branched.withConsumer(ks -> ks.to("A")))
   .branch(predicate2, Branched.withConsumer(ks -> ks.to("B")))
   .defaultBranch(Branched.withConsumer(ks->ks.to("C")));
 
Collecting branches in a single scope
If multiple branches need to be processed in the same scope, for example for merging or joining branches again after splitting, the Map returned by defaultBranch() or noDefaultBranch() methods provides access to all the branches in the same scope:

 Map<String, KStream<String, String>> branches = source.split(Named.as("split-"))
   .branch((key, value) -> value == null, Branched.withFunction(s -> s.mapValues(v->"NULL"), "null"))
   .defaultBranch(Branched.as("non-null"));

 KStream<String, String> merged = branches.get("split-non-null").merge(branches.get("split-null"));
 
Dynamic branching
Branches can also be created dynamically, e.g., one branch per enum value:

 BranchedKStream<K, V> branched = stream.split(); // K, V: the key and value types of stream
 for (RecordType recordType : RecordType.values()) {
   branched.branch((k, v) -> v.getRecType() == recordType, Branched.withConsumer(recordType::processRecords));
 }
 
  • Method Details

    • branch

      BranchedKStream<K,V> branch(Predicate<? super K,? super V> predicate)
      Define a branch for records that match the predicate.
      Parameters:
      predicate - A Predicate instance, against which each record will be evaluated. If this predicate returns true for a given record, the record will be routed to the current branch and will not be evaluated against the predicates for the remaining branches.
      Returns:
      this to facilitate method chaining
    • branch

      BranchedKStream<K,V> branch(Predicate<? super K,? super V> predicate, Branched<K,V> branched)
      Define a branch for records that match the predicate.
      Parameters:
      predicate - A Predicate instance, against which each record will be evaluated. If this predicate returns true for a given record, the record will be routed to the current branch and will not be evaluated against the predicates for the remaining branches.
      branched - A Branched parameter that allows defining a branch name, an in-place branch consumer, or a branch mapper (see code examples for BranchedKStream)
      Returns:
      this to facilitate method chaining
    • defaultBranch

      Map<String,KStream<K,V>> defaultBranch()
      Finalize the construction of branches and define the default branch for the records not intercepted by other branches. Calling defaultBranch() or noDefaultBranch() is optional.
      Returns:
      Map of named branches. For rules of forming the resulting map, see BranchedKStream description.
    • defaultBranch

      Map<String,KStream<K,V>> defaultBranch(Branched<K,V> branched)
      Finalize the construction of branches and define the default branch for the records not intercepted by other branches. Calling defaultBranch() or noDefaultBranch() is optional.
      Parameters:
      branched - A Branched parameter that allows defining a branch name, an in-place branch consumer, or a branch mapper (see code examples for BranchedKStream)
      Returns:
      Map of named branches. For rules of forming the resulting map, see BranchedKStream description.
    • noDefaultBranch

      Map<String,KStream<K,V>> noDefaultBranch()
      Finalize the construction of branches without forming a default branch. Calling noDefaultBranch() or defaultBranch() is optional.
      Returns:
      Map of named branches. For rules of forming the resulting map, see BranchedKStream description.