Is it possible to aggregate an object instead of a String with the Spring Cloud Stream API?























I want to use the Spring Cloud Stream API to aggregate events from a topic.
As input I use a KStream:



KStream<Object, LoggerCreatedMessage>


Now I want to use an aggregator to store my new object in a key-value store, so I use the following code:



    input
        // Re-key each event by its logger id
        .map((key, value) -> new KeyValue<>(value.logger_id, value))
        /*.groupBy(
            (s, loggerEvent) -> loggerEvent.logger_id,
            Serialized.with(null, loggerEventSerde))*/
        .groupByKey()
        .aggregate(
            String::new,
            (s, loggerEvent, vr) -> vr,
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(STORE_NAME)
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String())
        );


Why can I only use a String as the initializer? Is it not possible to use any object?



Instead of String::new I wanted to use LoggerDomain::new, but I only get this error message:




Bad return type in method reference: cannot convert LoggerDomain to VR




Am I missing something?










      apache-kafka-streams spring-cloud-stream spring-kafka
















asked Nov 7 at 6:04 by axelDiterta
























1 Answer













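You define <key,value> as <String, String> via Materialized.<String, String, KeyValueStore<Bytes, byte[]>>. If your value type should be LoggerDomain, it should be Materialized.<KeyType, LoggerDomain, KeyValueStore<Bytes, byte[]>>as(STORE_NAME). The value type in Materialized fixes the aggregate type VR, so the initializer and the aggregator must return that same type.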
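The typing rule can be reproduced without Kafka on the classpath. The sketch below is only a model: StoreSpec, Aggregator, aggregate, and the LoggerDomain field are hypothetical stand-ins that copy the generic shape of the Kafka Streams signature, not the library's classes. It shows that once the store is declared with LoggerDomain as its value type, LoggerDomain::new is accepted as the initializer.

```java
import java.util.List;
import java.util.function.Supplier;

class AggregateTypingSketch {

    /** Hypothetical aggregate value type (stand-in for LoggerDomain). */
    static final class LoggerDomain {
        int eventCount;
    }

    /** Hypothetical stand-in for Materialized: it only carries the K and VR type parameters. */
    static final class StoreSpec<K, VR> {
        final String name;

        private StoreSpec(String name) {
            this.name = name;
        }

        static <K, VR> StoreSpec<K, VR> as(String name) {
            return new StoreSpec<>(name);
        }
    }

    /** Same shape as an aggregator: (key, value, aggregate) -> new aggregate. */
    interface Aggregator<K, V, VR> {
        VR apply(K key, V value, VR aggregate);
    }

    /** Simplified aggregate(): VR appears in all three arguments, so they must agree. */
    static <K, V, VR> VR aggregate(K key, List<V> values,
                                   Supplier<VR> initializer,
                                   Aggregator<K, V, VR> aggregator,
                                   StoreSpec<K, VR> store) {
        VR result = initializer.get();
        for (V value : values) {
            result = aggregator.apply(key, value, result);
        }
        return result;
    }

    /** Counts events into a LoggerDomain; compiles because the store's value type is LoggerDomain. */
    static int countEvents(List<String> events) {
        LoggerDomain domain = aggregate(
                "logger-1", events,
                LoggerDomain::new,
                (key, event, agg) -> { agg.eventCount++; return agg; },
                StoreSpec.<String, LoggerDomain>as("logger-store"));
        return domain.eventCount;
    }

    public static void main(String[] args) {
        System.out.println(countEvents(List.of("created", "updated", "deleted"))); // prints 3

        // Does not compile: the store pins VR to String, so LoggerDomain::new has a bad return type.
        // aggregate("logger-1", List.of("created"), LoggerDomain::new, (k, e, agg) -> agg,
        //         StoreSpec.<String, String>as("logger-store"));
    }
}
```

Uncommenting the last call reproduces the kind of "bad return type in method reference" error from the question, because the String value type of the store and the LoggerDomain initializer disagree on VR.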



Note that in this case you also need to provide a custom Serde for LoggerDomain to Materialized.
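What such a Serde has to do is turn a LoggerDomain into bytes and back. The sketch below shows only that round trip in plain Java. The fields loggerId and eventCount are assumptions, since the question does not show the real class. In a Kafka Streams application the two methods would typically sit behind Serializer and Deserializer implementations, be combined with Serdes.serdeFrom(serializer, deserializer), and be passed to withValueSerde(...).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class LoggerDomainCodecSketch {

    /** Hypothetical aggregate type; the real LoggerDomain fields are not shown in the question. */
    static final class LoggerDomain {
        final String loggerId;
        final int eventCount;

        LoggerDomain(String loggerId, int eventCount) {
            this.loggerId = loggerId;
            this.eventCount = eventCount;
        }
    }

    /** Object to bytes: the logic a serializer for the store would carry. */
    static byte[] serialize(LoggerDomain domain) {
        if (domain == null) {
            return null; // a null value maps to null bytes
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(domain.loggerId);
            out.writeInt(domain.eventCount);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Bytes to object: reads the fields in the same order they were written. */
    static LoggerDomain deserialize(byte[] data) {
        if (data == null) {
            return null;
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return new LoggerDomain(in.readUTF(), in.readInt());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        LoggerDomain restored = deserialize(serialize(new LoggerDomain("logger-1", 3)));
        System.out.println(restored.loggerId + " " + restored.eventCount); // prints "logger-1 3"
    }
}
```

A hand-written binary layout keeps the example dependency-free. A JSON-based Serde is a common alternative when the aggregate has many fields or evolves over time.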






answered Nov 7 at 7:36 by Matthias J. Sax






























                     
