consumer
代碼如下,其他還有一部分為mvc的代碼
package com.settlement.kafka;
import org.springframework.kafka.annotation.KafkaListener;
public class Consumer {
@KafkaListener(topics = {"${kafka.topic}"})
public void consumer(String msg){
System.out.println(msg);
}
}
package com.settlement.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${kafka.bootstrap-server}")
private String bootstrapServers;
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String,String>> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(5000);
return factory;
}
@Bean
public ConsumerFactory<String,String> consumerFactory(){
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String,Object> consumerConfigs(){
Map<String,Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,this.bootstrapServers);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);//自行控制提交offset
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"100");//提交延遲毫秒數
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"15000");//執(zhí)行超時時間
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG,"dsafas");
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");//開始消費位置
return propsMap;
}
@Bean
public Consumer listener(){
return new Consumer();
}
}
Error starting ApplicationContext. To display the auto-configuration report re-run your application with 'debug' enabled.
2018-02-26 20:47:26.256 ERROR 47792 --- [ main] o.s.b.SpringApplication : Application startup failed
java.lang.NoSuchMethodError: org.springframework.util.Assert.state(ZLjava/util/function/Supplier;)V
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.determineInferredType(MessagingMessageListenerAdapter.java:463) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.<init>(MessagingMessageListenerAdapter.java:106) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.<init>(RecordMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.config.MethodKafkaListenerEndpoint.createMessageListenerInstance(MethodKafkaListenerEndpoint.java:172) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.config.MethodKafkaListenerEndpoint.createMessageListener(MethodKafkaListenerEndpoint.java:132) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.config.AbstractKafkaListenerEndpoint.setupMessageListener(AbstractKafkaListenerEndpoint.java:374) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.config.AbstractKafkaListenerEndpoint.setupListenerContainer(AbstractKafkaListenerEndpoint.java:359) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.config.AbstractKafkaListenerContainerFactory.createListenerContainer(AbstractKafkaListenerContainerFactory.java:246) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.config.AbstractKafkaListenerContainerFactory.createListenerContainer(AbstractKafkaListenerContainerFactory.java:49) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.createListenerContainer(KafkaListenerEndpointRegistry.java:183) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.registerListenerContainer(KafkaListenerEndpointRegistry.java:155) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.registerListenerContainer(KafkaListenerEndpointRegistry.java:129) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.config.KafkaListenerEndpointRegistrar.registerAllEndpoints(KafkaListenerEndpointRegistrar.java:138) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.config.KafkaListenerEndpointRegistrar.afterPropertiesSet(KafkaListenerEndpointRegistrar.java:132) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(KafkaListenerAnnotationBeanPostProcessor.java:243) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:781) ~[spring-beans-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:867) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:543) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:122) ~[spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:693) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:360) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:303) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1118) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1107) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at
settlement.Application.main(Application.java:12) [classes/:?]
2018-02-26 20:47:26.261 INFO 47792 --- [ main] ationConfigEmbeddedWebApplicationContext : Closing org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@7d4f9aae: startup date [Mon Feb 26 20:47:20 CST 2018]; root of context hierarchy
2018-02-26 20:47:26.263 WARN 47792 --- [ main] ationConfigEmbeddedWebApplicationContext : Exception thrown from LifecycleProcessor on context close
java.lang.IllegalStateException: LifecycleProcessor not initialized - call 'refresh' before invoking lifecycle methods via the context: org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@7d4f9aae: startup date [Mon Feb 26 20:47:20 CST 2018]; root of context hierarchy
at org.springframework.context.support.AbstractApplicationContext.getLifecycleProcessor(AbstractApplicationContext.java:427) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.doClose(AbstractApplicationContext.java:999) [spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.close(AbstractApplicationContext.java:958) [spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.boot.SpringApplication.handleRunFailure(SpringApplication.java:750) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:314) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1118) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1107) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at settlement.Application.main(Application.java:12) [classes/:?]
2018-02-26 20:47:26.264 INFO 47792 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Unregistering JMX-exposed beans on shutdown
Process finished with exit code 1
北大青鳥APTECH成立于1999年。依托北京大學優(yōu)質雄厚的教育資源和背景,秉承“教育改變生活”的發(fā)展理念,致力于培養(yǎng)中國IT技能型緊缺人才,是大數據專業(yè)的國家
達內教育集團成立于2002年,是一家由留學海歸創(chuàng)辦的高端職業(yè)教育培訓機構,是中國一站式人才培養(yǎng)平臺、一站式人才輸送平臺。2014年4月3日在美國成功上市,融資1
北大課工場是北京大學校辦產業(yè)為響應國家深化產教融合/校企合作的政策,積極推進“中國制造2025”,實現中華民族偉大復興的升級產業(yè)鏈。利用北京大學優(yōu)質教育資源及背
博為峰,中國職業(yè)人才培訓領域的先行者
曾工作于聯(lián)想擔任系統(tǒng)開發(fā)工程師,曾在博彥科技股份有限公司擔任項目經理從事移動互聯(lián)網管理及研發(fā)工作,曾創(chuàng)辦藍懿科技有限責任公司從事總經理職務負責iOS教學及管理工作。
浪潮集團項目經理。精通Java與.NET 技術, 熟練的跨平臺面向對象開發(fā)經驗,技術功底深厚。 授課風格 授課風格清新自然、條理清晰、主次分明、重點難點突出、引人入勝。
精通HTML5和CSS3;Javascript及主流js庫,具有快速界面開發(fā)的能力,對瀏覽器兼容性、前端性能優(yōu)化等有深入理解。精通網頁制作和網頁游戲開發(fā)。
具有10 年的Java 企業(yè)應用開發(fā)經驗。曾經歷任德國Software AG 技術顧問,美國Dachieve 系統(tǒng)架構師,美國AngelEngineers Inc. 系統(tǒng)架構師。