import java.util.Arrays;

import org.apache.kafka.streams.kstream.KStream;

// Assume the record values represent lines of text. For the sake of this example, you can ignore
// whatever may be stored in the record keys.
KStream<String, String> textLines = ...;

KStream<String, Long> wordCounts = textLines
    // Lowercase each text line and split it into words on runs of non-word characters
    // (the regex "\\W+" matches whitespace and punctuation). The text lines are the record
    // values, i.e. you can ignore whatever data is in the record keys and thus invoke
    // `flatMapValues` instead of the more generic `flatMap`.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    // Group the stream by word so that the key of each record is the word.
    .groupBy((key, word) -> word)
    // Count the occurrences of each word (record key).
    // This changes the type from `KGroupedStream<String, String>` to
    // `KTable<String, Long>` (word -> count).
    .count()
    // Convert the `KTable<String, Long>` into a `KStream<String, Long>`.
    .toStream();
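To make the per-step semantics concrete, here is a minimal sketch that computes the same split, re-key and count over a finite in-memory `List<String>` using plain `java.util.stream`. It is an analogy only and does not use the Kafka Streams API. The class name `WordCountSketch` and the method `countWords` are hypothetical. Unlike the topology, which keeps updating its `KTable` as new records arrive, the sketch returns final counts for a bounded input. It also drops the empty token that `split("\\W+")` produces when a line starts with a non-word character. The topology does not filter that token, so it would count `""` as a word.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical helper: a bounded, in-memory analogue of the word-count topology.
public class WordCountSketch {

    static Map<String, Long> countWords(List<String> lines) {
        return lines.stream()
                // Analogue of flatMapValues: one line in, zero or more lowercase words out.
                .flatMap(line -> Arrays.stream(line.toLowerCase().split("\\W+")))
                // split() yields a leading "" when a line starts with a non-word
                // character; this sketch discards it (the topology above does not).
                .filter(word -> !word.isEmpty())
                // Analogue of groupBy((key, word) -> word).count(): group by the word
                // itself and count each group. TreeMap keeps the output sorted.
                .collect(Collectors.groupingBy(
                        Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> lines = List.of("Hello Kafka Streams", "hello, world!", " streams");
        // Prints {hello=2, kafka=1, streams=2, world=1}
        System.out.println(countWords(lines));
    }
}
```

In the streaming version, the same counts would be emitted incrementally: each new occurrence of a word produces an updated `(word, count)` record on `wordCounts`.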