
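Hey all, today I will show a way to dynamically generate multiple consumer groups with Spring Kafka. Before getting to that approach, let's first do it with annotations. We just create a configuration class containing a Spring @Bean that produces our KafkaListenerContainerFactory.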


Sample consumer properties file:

bootstrap.servers=127.0.0.1:9093
key.deserializer=org.apache.kafka.common.serialization.LongDeserializer
value.deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.json.trusted.packages=com.sample.pojo,java.util,java.lang
group.id=CONSUMER-1-GROUP
auto.offset.reset=earliest

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class GeneralUtils {

    // Loads a Kafka properties file into the Map<String, Object> that the consumer factory takes.
    public static Map<String, Object> provideKafkaConfig(String propertyFilePath) {
        Map<String, Object> configProps = new HashMap<>();
        Properties prop = new Properties();
        // try-with-resources closes the stream; FileInputStream throws instead of returning null,
        // so no null check is needed.
        try (InputStream input = new FileInputStream(propertyFilePath)) {
            prop.load(input);
            for (String key : prop.stringPropertyNames()) {
                configProps.put(key, prop.getProperty(key));
            }
        } catch (IOException e) {
            // On failure the map stays empty and the error is logged with its stack trace.
            log.error("Unable to read Kafka properties from {}", propertyFilePath, e);
        }
        return configProps;
    }
}
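
As a quick check of what the helper above produces, here is a minimal sketch, using only the JDK, that parses properties text into the same Map<String, Object> shape and reports which keys are absent. ConsumerConfigCheck, toConfigMap, missingKeys and the list of required keys are illustrative names and assumptions of mine, not part of the original code or of Spring Kafka.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

class ConsumerConfigCheck {

    // Assumption: the keys from the sample file that a consumer needs before it can subscribe.
    static final List<String> REQUIRED_KEYS = Arrays.asList(
            "bootstrap.servers", "group.id", "key.deserializer", "value.deserializer");

    // Parses properties text into a Map<String, Object>, the shape the consumer factory accepts.
    public static Map<String, Object> toConfigMap(Reader source) {
        Properties props = new Properties();
        try {
            props.load(source);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, Object> config = new HashMap<>();
        for (String key : props.stringPropertyNames()) {
            config.put(key, props.getProperty(key));
        }
        return config;
    }

    // Returns the required keys that the parsed configuration does not contain.
    public static List<String> missingKeys(Map<String, Object> config) {
        return REQUIRED_KEYS.stream()
                .filter(key -> !config.containsKey(key))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        String sample = String.join("\n",
                "bootstrap.servers=127.0.0.1:9093",
                "group.id=CONSUMER-1-GROUP",
                "auto.offset.reset=earliest");
        Map<String, Object> config = toConfigMap(new StringReader(sample));
        // prints: missing: [key.deserializer, value.deserializer]
        System.out.println("missing: " + missingKeys(config));
    }
}
```

Running a check like this before building a consumer factory makes an incomplete file fail early, instead of surfacing later as an error when the consumer is created.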

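import java.util.HashMap;
import java.util.Map;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import lombok.extern.slf4j.Slf4j;

@ComponentScan({"com.sample.config.*"})
@EnableKafka
@Slf4j
@Configuration
public class KafkaConsumerConfig {

    // Reads the consumer properties file whose path is given as -DkafkaConsumerPropFilePath=...
    public Map<String, Object> configMap() {
        Map<String, Object> configProps = new HashMap<>();
        String property = System.getProperty("kafkaConsumerPropFilePath");
        if (null != property)
            configProps = GeneralUtils.provideKafkaConfig(property);
        return configProps;
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Long, LogDay>> onlineKafkaListenerContainerFactory() {

        Map<String, Object> propMap = configMap();
        ConcurrentKafkaListenerContainerFactory<Long, LogDay> factory = new ConcurrentKafkaListenerContainerFactory<>();
        // Five consumer threads per listener container
        factory.setConcurrency(5);
        factory.getContainerProperties().setPollTimeout(1000L);
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(propMap));
        return factory;
    }

}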

Then we write our listener method, which uses our KafkaListenerContainerFactory …

So, how do we read our consumer groups from a file or a database and generate them dynamically?

Just look at the example below. We do not use any Spring Kafka annotations, and we do not define a KafkaListenerContainerFactory as a @Bean.

We create a DefaultKafkaConsumerFactory object from the Kafka consumer property map. Then we create a ContainerProperties object for the related topic and set the message listener on it.

Finally, we create a ConcurrentMessageListenerContainer and start it.

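import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
// In Spring Kafka versions before 2.2 this class lives in org.springframework.kafka.listener.config
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class ListenerBean {

    // Resolves the properties file path from the named system property and loads it.
    public Map<String, Object> configMap(String propName) {
        Map<String, Object> configProps = new HashMap<>();
        String property = System.getProperty(propName);
        if (null != property)
            configProps = GeneralUtils.provideKafkaConfig(property);
        return configProps;
    }

    @EventListener
    public void handleEvent(ContextRefreshedEvent event) {
        // Hardcoded here for brevity; this list could come from a file or a database.
        List<TargetSystemDto> targetSystemDtoList = new ArrayList<>();
        TargetSystemDto targetSystemDto = new TargetSystemDto();
        targetSystemDto.setConsumerPropName("targetSystem1PropFilePath");
        targetSystemDto.setSystemName("TARGET-SYSTEM-1");
        targetSystemDto.setWsUrl("https://sample.com.tr:443/sampleRestApi");
        // The original listing never sets the topic; this name is only a placeholder.
        targetSystemDto.setTopicName("target-system-1-topic");
        targetSystemDtoList.add(targetSystemDto);
        targetSystemDto = new TargetSystemDto();
        targetSystemDto.setSystemName("TARGET-SYSTEM-2");
        targetSystemDto.setConsumerPropName("targetSystem2PropFilePath");
        targetSystemDto.setWsUrl("https://sample.com.tr:443/sampleRestApi2");
        targetSystemDto.setTopicName("target-system-2-topic"); // placeholder as well
        targetSystemDtoList.add(targetSystemDto);
        targetSystemDtoList.forEach(this::generateAndStartConsumerGroup);
    }

    private void generateAndStartConsumerGroup(TargetSystemDto dto) {
        Map<String, Object> propMap = configMap(dto.getConsumerPropName());
        DefaultKafkaConsumerFactory<Long, PaymentPojo> factory = new DefaultKafkaConsumerFactory<>(propMap);
        ContainerProperties containerProperties = new ContainerProperties(dto.getTopicName());
        // The key type is Long to match the factory and the LongDeserializer in the properties file.
        containerProperties.setMessageListener(
                (MessageListener<Long, PaymentPojo>) messageObject -> {
                    PaymentPojo paymentPojo = messageObject.value();
                    /* For instance, do some condition check for the related consumer group
                       and call a REST API that has standard parameters,
                       or transfer the message to a different topic
                       ......
                     */
                });
        ConcurrentMessageListenerContainer<Long, PaymentPojo> container =
                new ConcurrentMessageListenerContainer<>(factory, containerProperties);
        container.start();
        log.info("{} consumer is configured and started", dto.getSystemName());
    }
}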
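
The listing above hardcodes the two target systems. To read them from a file instead, as the question earlier asks, something like the following could work. This is only a sketch under assumptions: the file layout (a systems key plus <id>.name, <id>.consumerPropName, <id>.wsUrl and <id>.topic keys), the TargetSystemLoader class and the topic names are all made up for illustration, and TargetSystem is a plain stand-in for TargetSystemDto.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

class TargetSystemLoader {

    // Hypothetical stand-in for TargetSystemDto; the fields mirror its setters.
    static final class TargetSystem {
        final String systemName;
        final String consumerPropName;
        final String wsUrl;
        final String topicName;

        TargetSystem(String systemName, String consumerPropName, String wsUrl, String topicName) {
            this.systemName = systemName;
            this.consumerPropName = consumerPropName;
            this.wsUrl = wsUrl;
            this.topicName = topicName;
        }
    }

    // Reads the comma-separated ids under "systems", then the "<id>.*" keys of each id.
    public static List<TargetSystem> load(Reader source) {
        Properties props = new Properties();
        try {
            props.load(source);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<TargetSystem> result = new ArrayList<>();
        for (String rawId : props.getProperty("systems", "").split(",")) {
            String id = rawId.trim();
            if (id.isEmpty()) {
                continue; // tolerate an empty list or a trailing comma
            }
            result.add(new TargetSystem(
                    require(props, id + ".name"),
                    require(props, id + ".consumerPropName"),
                    require(props, id + ".wsUrl"),
                    require(props, id + ".topic")));
        }
        return result;
    }

    // Fails fast with the offending key when an entry is incomplete.
    private static String require(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing property: " + key);
        }
        return value.trim();
    }

    public static void main(String[] args) {
        String sample = String.join("\n",
                "systems=system1, system2",
                "system1.name=TARGET-SYSTEM-1",
                "system1.consumerPropName=targetSystem1PropFilePath",
                "system1.wsUrl=https://sample.com.tr:443/sampleRestApi",
                "system1.topic=target-system-1-topic",
                "system2.name=TARGET-SYSTEM-2",
                "system2.consumerPropName=targetSystem2PropFilePath",
                "system2.wsUrl=https://sample.com.tr:443/sampleRestApi2",
                "system2.topic=target-system-2-topic");
        for (TargetSystem system : load(new StringReader(sample))) {
            System.out.println(system.systemName + " -> " + system.topicName);
        }
    }
}
```

Each loaded entry could then be handed to generateAndStartConsumerGroup in place of the hardcoded list, so adding a consumer group becomes a change to the file rather than to the code.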
