Is it possible to aggregate an object instad of a string with spring cloud stream api?
up vote
0
down vote
favorite
I want to use the spring cloud stream api to aggreate events from a topic.
Therefore i use as input a KStream.
KStream<Object, LoggerCreatedMessage>
Now i want to use an aggregator to store my new Object in a KeyValue Store, so i use following code:
input
.map((key, value) -> {
return new KeyValue<>(value.logger_id,value);
})
/*.groupBy(
(s, loggerEvent) -> loggerEvent.logger_id,
Serialized.with(null, loggerEventSerde))*/
.groupByKey()
.aggregate(
String::new,
(s, loggerEvent, vr) -> {
return vr;
},
Materialized.<String, String, KeyValueStore<Bytes, byte>>as(STORE_NAME).withKeySerde(Serdes.String()).
withValueSerde(Serdes.String())
);
Why can i only use a String as an Initializer is it not possible to use any Object?
Instead of String::new i wanted to use LoggerDomain::new, but i only get this error message:
Bad return type in method reference: cannot convert LoggerDomain to VR
Do i miss something?
apache-kafka-streams spring-cloud-stream spring-kafka
add a comment |
up vote
0
down vote
favorite
I want to use the spring cloud stream api to aggreate events from a topic.
Therefore i use as input a KStream.
KStream<Object, LoggerCreatedMessage>
Now i want to use an aggregator to store my new Object in a KeyValue Store, so i use following code:
input
.map((key, value) -> {
return new KeyValue<>(value.logger_id,value);
})
/*.groupBy(
(s, loggerEvent) -> loggerEvent.logger_id,
Serialized.with(null, loggerEventSerde))*/
.groupByKey()
.aggregate(
String::new,
(s, loggerEvent, vr) -> {
return vr;
},
Materialized.<String, String, KeyValueStore<Bytes, byte>>as(STORE_NAME).withKeySerde(Serdes.String()).
withValueSerde(Serdes.String())
);
Why can i only use a String as an Initializer is it not possible to use any Object?
Instead of String::new i wanted to use LoggerDomain::new, but i only get this error message:
Bad return type in method reference: cannot convert LoggerDomain to VR
Do i miss something?
apache-kafka-streams spring-cloud-stream spring-kafka
add a comment |
up vote
0
down vote
favorite
up vote
0
down vote
favorite
I want to use the spring cloud stream api to aggreate events from a topic.
Therefore i use as input a KStream.
KStream<Object, LoggerCreatedMessage>
Now i want to use an aggregator to store my new Object in a KeyValue Store, so i use following code:
input
.map((key, value) -> {
return new KeyValue<>(value.logger_id,value);
})
/*.groupBy(
(s, loggerEvent) -> loggerEvent.logger_id,
Serialized.with(null, loggerEventSerde))*/
.groupByKey()
.aggregate(
String::new,
(s, loggerEvent, vr) -> {
return vr;
},
Materialized.<String, String, KeyValueStore<Bytes, byte>>as(STORE_NAME).withKeySerde(Serdes.String()).
withValueSerde(Serdes.String())
);
Why can i only use a String as an Initializer is it not possible to use any Object?
Instead of String::new i wanted to use LoggerDomain::new, but i only get this error message:
Bad return type in method reference: cannot convert LoggerDomain to VR
Do i miss something?
apache-kafka-streams spring-cloud-stream spring-kafka
I want to use the spring cloud stream api to aggreate events from a topic.
Therefore i use as input a KStream.
KStream<Object, LoggerCreatedMessage>
Now i want to use an aggregator to store my new Object in a KeyValue Store, so i use following code:
input
.map((key, value) -> {
return new KeyValue<>(value.logger_id,value);
})
/*.groupBy(
(s, loggerEvent) -> loggerEvent.logger_id,
Serialized.with(null, loggerEventSerde))*/
.groupByKey()
.aggregate(
String::new,
(s, loggerEvent, vr) -> {
return vr;
},
Materialized.<String, String, KeyValueStore<Bytes, byte>>as(STORE_NAME).withKeySerde(Serdes.String()).
withValueSerde(Serdes.String())
);
Why can i only use a String as an Initializer is it not possible to use any Object?
Instead of String::new i wanted to use LoggerDomain::new, but i only get this error message:
Bad return type in method reference: cannot convert LoggerDomain to VR
Do i miss something?
apache-kafka-streams spring-cloud-stream spring-kafka
apache-kafka-streams spring-cloud-stream spring-kafka
asked Nov 7 at 6:04
axelDiterta
84
84
add a comment |
add a comment |
1 Answer
1
active
oldest
votes
up vote
0
down vote
You define <key,value> as <String, String> via Materialized.<String, String, KeyValueStore<Bytes, byte>> -- if you value type should be LoggerDomain, it should be Materialized.<KeyType, LoggerDomain, KeyValueStore<Bytes, byte>>().
Note that you need to provide a custom Serde for LoggerDomain for this case to Materialized, too.
add a comment |
1 Answer
1
active
oldest
votes
1 Answer
1
active
oldest
votes
active
oldest
votes
active
oldest
votes
up vote
0
down vote
You define <key,value> as <String, String> via Materialized.<String, String, KeyValueStore<Bytes, byte>> -- if you value type should be LoggerDomain, it should be Materialized.<KeyType, LoggerDomain, KeyValueStore<Bytes, byte>>().
Note that you need to provide a custom Serde for LoggerDomain for this case to Materialized, too.
add a comment |
up vote
0
down vote
You define <key,value> as <String, String> via Materialized.<String, String, KeyValueStore<Bytes, byte>> -- if you value type should be LoggerDomain, it should be Materialized.<KeyType, LoggerDomain, KeyValueStore<Bytes, byte>>().
Note that you need to provide a custom Serde for LoggerDomain for this case to Materialized, too.
add a comment |
up vote
0
down vote
up vote
0
down vote
You define <key,value> as <String, String> via Materialized.<String, String, KeyValueStore<Bytes, byte>> -- if you value type should be LoggerDomain, it should be Materialized.<KeyType, LoggerDomain, KeyValueStore<Bytes, byte>>().
Note that you need to provide a custom Serde for LoggerDomain for this case to Materialized, too.
You define <key,value> as <String, String> via Materialized.<String, String, KeyValueStore<Bytes, byte>> -- if you value type should be LoggerDomain, it should be Materialized.<KeyType, LoggerDomain, KeyValueStore<Bytes, byte>>().
Note that you need to provide a custom Serde for LoggerDomain for this case to Materialized, too.
answered Nov 7 at 7:36
Matthias J. Sax
26.8k34471
26.8k34471
add a comment |
add a comment |
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53184314%2fis-it-possible-to-aggregate-an-object-instad-of-a-string-with-spring-cloud-strea%23new-answer', 'question_page');
}
);
Post as a guest
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password