Interface BranchedKStream<K,V>
Type Parameters:
- K - the key type of this stream
- V - the value type of this stream
BranchedKStream is an abstraction of a branched record stream of key-value pairs. It is an intermediate representation of a KStream, used to split the original KStream into multiple sub-streams (called branches). The process of routing the records to different branches is a stateless record-by-record operation.
Branches are defined via the branch(Predicate, Branched) or defaultBranch(Branched) methods. Each input record is evaluated against the predicates supplied to the branch(...) calls, in the order in which the branches were defined, and is routed to the first branch whose predicate evaluates to true; the record is included in that branch only. If a record does not match any predicate, it is routed to the default branch, or dropped if no default branch is created. For details about multicasting/broadcasting records into more than one KStream, see KStream.split().
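To make the first-match routing rule concrete, here is a minimal plain-Java sketch that models it outside Kafka Streams. It is not the Kafka API: the class BranchRouter and its methods are hypothetical, and java.util.function.BiPredicate stands in for Kafka's Predicate<K, V>. Branch positions are reported as 1-based numbers, with 0 for the default branch and -1 for a dropped record.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical plain-Java model of the routing rule; NOT part of Kafka Streams.
// java.util.function.BiPredicate stands in for Kafka's Predicate<K, V>.
final class BranchRouter<K, V> {
    // Predicates in the order in which the branches were defined.
    private final List<BiPredicate<K, V>> predicates = new ArrayList<>();
    private final boolean hasDefaultBranch;

    BranchRouter(boolean hasDefaultBranch) {
        this.hasDefaultBranch = hasDefaultBranch;
    }

    // Adds a branch and returns this, so calls can be chained.
    BranchRouter<K, V> branch(BiPredicate<K, V> predicate) {
        predicates.add(predicate);
        return this;
    }

    // Returns the 1-based position of the first branch whose predicate matches,
    // 0 if the record goes to the default branch, or -1 if it is dropped.
    int route(K key, V value) {
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(key, value)) {
                return i + 1; // first match wins; remaining predicates are not evaluated
            }
        }
        return hasDefaultBranch ? 0 : -1;
    }
}
```

With branches `v > 10` and `v > 5`, a record with value 20 satisfies both predicates but is routed only to branch 1, because evaluation stops at the first match.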
Each branch can be processed either by a Function or a Consumer provided via a Branched parameter. If certain conditions are met (see below), all created branches can be accessed from the Map returned by an optional defaultBranch(Branched) or noDefaultBranch() method call.
Rules of forming the resulting Map
The keys of the Map<String, KStream<K, V>> entries returned by defaultBranch(Branched) or noDefaultBranch() are defined by the following rules:
- If a Named parameter was provided for KStream.split(Named), its value is used as a prefix for each key. By default, no prefix is used.
- If a branch name is provided in branch(Predicate, Branched) via the Branched parameter, its value is appended to the prefix to form the Map key.
- If a name is not provided for the branch, then the key defaults to prefix + position of the branch as a decimal number, starting from "1".
- If a name is not provided for the defaultBranch(), then the key defaults to prefix + "0".
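The key rules above reduce to a small string computation. The following sketch is a hypothetical helper (BranchKeys is not a Kafka Streams class); it assumes the caller already knows the split prefix, the optional branch name, and the branch position, with 0 used for the default branch.

```java
// Hypothetical helper that applies the key rules above; NOT a Kafka Streams API.
final class BranchKeys {
    private BranchKeys() {}

    /**
     * @param prefix   the name given to split(Named), or null if split() had no Named
     * @param name     the branch name given via Branched, or null if none was given
     * @param position the 1-based branch position, or 0 for the default branch
     * @return the key under which the branch would appear in the resulting map
     */
    static String key(String prefix, String name, int position) {
        String p = (prefix == null) ? "" : prefix; // no prefix by default
        // An explicit name wins; otherwise the decimal position is used.
        return p + (name != null ? name : Integer.toString(position));
    }
}
```

For example, an unnamed fifth branch of `split(Named.as("foo-"))` maps to `"foo-5"`, and an unnamed default branch maps to `"foo-0"`.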
The values of the Map<String, KStream<K, V>> entries are formed as follows:
- If no chain function or consumer is provided in branch(Predicate, Branched) via the Branched parameter, then the branch itself is added to the Map.
- If a chain function is provided and it returns a non-null value for a given branch, then the value is the result returned by this function.
- If a chain function returns null for a given branch, then no entry is added to the Map.
- If a consumer is provided for a given branch, then no entry is added to the Map.
Map<String, KStream<..., ...>> result =
source.split(Named.as("foo-"))
.branch(predicate1, Branched.as("bar")) // "foo-bar"
.branch(predicate2, Branched.withConsumer(ks->ks.to("A"))) // no entry: a Consumer is provided
.branch(predicate3, Branched.withFunction(ks->null)) // no entry: chain function returns null
.branch(predicate4, Branched.withFunction(ks->ks)) // "foo-4": chain function returns non-null value
.branch(predicate5) // "foo-5": name defaults to the branch position
.defaultBranch(); // "foo-0": "0" is the default name for the default branch
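The combined key and value rules can be checked against the example above with a small plain-Java model. Everything here is hypothetical: BranchSpec stands in for a Branched configuration, ResultMapModel is an illustrative helper, and plain strings stand in for KStream instances. The model assumes each spec carries at most one of a chain function or a consumer.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for a Branched configuration; NOT the Kafka class.
// S is a placeholder for the branch type (KStream<K, V> in Kafka Streams).
// This model assumes at most one of chainFunction / consumer is set.
final class BranchSpec<S> {
    final String name;                  // null: key falls back to the position
    final Function<S, S> chainFunction; // null: no chain function
    final Consumer<S> consumer;         // null: no consumer

    BranchSpec(String name, Function<S, S> chainFunction, Consumer<S> consumer) {
        this.name = name;
        this.chainFunction = chainFunction;
        this.consumer = consumer;
    }
}

final class ResultMapModel {
    private ResultMapModel() {}

    // Applies the key and value rules to one branch and records the entry, if any.
    static <S> void addEntry(Map<String, S> result, String prefix, int position,
                             BranchSpec<S> spec, S branch) {
        String key = prefix + (spec.name != null ? spec.name : Integer.toString(position));
        if (spec.consumer != null) {
            spec.consumer.accept(branch); // consumer handles the branch: no entry
        } else if (spec.chainFunction != null) {
            S mapped = spec.chainFunction.apply(branch);
            if (mapped != null) {
                result.put(key, mapped); // non-null chain result becomes the value
            }                            // null chain result: no entry
        } else {
            result.put(key, branch);     // neither given: the branch itself is the value
        }
    }

    // Replays the example above, with strings standing in for KStreams.
    static Map<String, String> replayExample() {
        Map<String, String> result = new LinkedHashMap<>();
        String prefix = "foo-";
        addEntry(result, prefix, 1, new BranchSpec<String>("bar", null, null), "branch1");
        addEntry(result, prefix, 2, new BranchSpec<String>(null, null, s -> { }), "branch2");
        addEntry(result, prefix, 3, new BranchSpec<String>(null, s -> null, null), "branch3");
        addEntry(result, prefix, 4, new BranchSpec<String>(null, s -> s, null), "branch4");
        addEntry(result, prefix, 5, new BranchSpec<String>(null, null, null), "branch5");
        addEntry(result, prefix, 0, new BranchSpec<String>(null, null, null), "default");
        return result;
    }
}
```

Replaying the example yields the keys "foo-bar", "foo-4", "foo-5" and "foo-0", matching the comments in the Kafka snippet; branches 2 and 3 produce no entry.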
Usage examples
Direct branch processing
If no single scope for all the branches is required, and each branch can be processed completely independently of the others, 'consuming' lambdas or method references in the Branched parameter can be used:
source.split()
.branch(predicate1, Branched.withConsumer(ks -> ks.to("A")))
.branch(predicate2, Branched.withConsumer(ks -> ks.to("B")))
.defaultBranch(Branched.withConsumer(ks->ks.to("C")));
Collecting branches in a single scope
If multiple branches need to be processed in the same scope, for example for merging or joining branches again after splitting, the Map returned by the defaultBranch() or noDefaultBranch() methods provides access to all the branches in the same scope:
Map<String, KStream<String, String>> branches = source.split(Named.as("split-"))
.branch((key, value) -> value == null, Branched.withFunction(s -> s.mapValues(v->"NULL"), "null"))
.defaultBranch(Branched.as("non-null"));
KStream<String, String> merged = branches.get("split-non-null").merge(branches.get("split-null"));
Dynamic branching
Branches can also be created dynamically, e.g., one branch per enum value:
BranchedKStream branched = stream.split();
for (RecordType recordType : RecordType.values()) {
branched.branch((k, v) -> v.getRecType() == recordType, Branched.withConsumer(recordType::processRecords));
}
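The dynamic-branching loop relies on each predicate and consumer capturing its own enum constant. The sketch below models that pattern in plain Java, without Kafka. RecordType's constants, Rec, and DynamicBranches are all hypothetical stand-ins invented for illustration; a per-type list plays the role of recordType::processRecords.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical types modelling the dynamic-branching loop above;
// none of these are Kafka Streams classes.
enum RecordType { ORDER, PAYMENT, REFUND }

final class Rec {
    private final RecordType recType;
    final String payload;

    Rec(RecordType recType, String payload) {
        this.recType = recType;
        this.payload = payload;
    }

    RecordType getRecType() {
        return recType;
    }
}

final class DynamicBranches {
    private final List<Predicate<Rec>> predicates = new ArrayList<>();
    private final List<Consumer<Rec>> consumers = new ArrayList<>();

    void branch(Predicate<Rec> predicate, Consumer<Rec> consumer) {
        predicates.add(predicate);
        consumers.add(consumer);
    }

    // Routes one record to the first matching branch; drops it if none match.
    void process(Rec rec) {
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(rec)) {
                consumers.get(i).accept(rec);
                return;
            }
        }
    }

    // One branch per enum value, created in a loop. Each lambda captures its own
    // recordType, since the enhanced-for variable is effectively final per iteration.
    static Map<RecordType, List<String>> route(List<Rec> input) {
        Map<RecordType, List<String>> received = new EnumMap<>(RecordType.class);
        DynamicBranches branched = new DynamicBranches();
        for (RecordType recordType : RecordType.values()) {
            List<String> sink = new ArrayList<>();
            received.put(recordType, sink);
            branched.branch(r -> r.getRecType() == recordType, r -> sink.add(r.payload));
        }
        for (Rec rec : input) {
            branched.process(rec);
        }
        return received;
    }
}
```

Because the branches are disjoint (one per enum value), the order in which they are created does not affect where a record ends up.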
Method Summary
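Modifier and Type | Method | Description
BranchedKStream<K,V> | branch(Predicate<? super K,? super V> predicate) | Define a branch for records that match the predicate.
BranchedKStream<K,V> | branch(Predicate<? super K,? super V> predicate, Branched<K,V> branched) | Define a branch for records that match the predicate.
Map<String,KStream<K,V>> | defaultBranch() | Finalize the construction of branches and define the default branch for the messages not intercepted by other branches.
Map<String,KStream<K,V>> | defaultBranch(Branched<K,V> branched) | Finalize the construction of branches and define the default branch for the messages not intercepted by other branches.
Map<String,KStream<K,V>> | noDefaultBranch() | Finalize the construction of branches without forming a default branch.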
Method Details
branch
Define a branch for records that match the predicate.
Parameters:
predicate - A Predicate instance, against which each record will be evaluated. If this predicate returns true for a given record, the record will be routed to the current branch and will not be evaluated against the predicates for the remaining branches.
Returns:
this to facilitate method chaining
branch
Define a branch for records that match the predicate.
Parameters:
predicate - A Predicate instance, against which each record will be evaluated. If this predicate returns true for a given record, the record will be routed to the current branch and will not be evaluated against the predicates for the remaining branches.
branched - A Branched parameter that allows defining a branch name, an in-place branch consumer, or a branch mapper (see the code examples for BranchedKStream)
Returns:
this to facilitate method chaining
defaultBranch
Finalize the construction of branches and define the default branch for the messages not intercepted by other branches. Calling defaultBranch or noDefaultBranch() is optional.
Returns:
Map of named branches. For rules of forming the resulting map, see the BranchedKStream description.
defaultBranch
Finalize the construction of branches and define the default branch for the messages not intercepted by other branches. Calling defaultBranch or noDefaultBranch() is optional.
Parameters:
branched - A Branched parameter that allows defining a branch name, an in-place branch consumer, or a branch mapper (see the code examples for BranchedKStream)
Returns:
Map of named branches. For rules of forming the resulting map, see the BranchedKStream description.
noDefaultBranch
Finalize the construction of branches without forming a default branch. Calling noDefaultBranch() or defaultBranch() is optional.
Returns:
Map of named branches. For rules of forming the resulting map, see the BranchedKStream description.