当前位置: 首页 > news >正文

成都网站建设哪家好百度关键词指数

成都网站建设哪家好,百度关键词指数,进入百度app,教育培训类网站设计动态初始化Kafka消费者实例一.Kafka 环境搭建二.动态初始化消费者1.Topic定义2.方法处理器工厂3.参数解析器(Copy SpringBoot 源码)4.消费接口和消费实现5.动态初始化1.关键类简介2.动态初始化实现一.Kafka 环境搭建 参考:Kafka搭建和测试 …

动态初始化Kafka消费者实例

  • 一.Kafka 环境搭建
  • 二.动态初始化消费者
    • 1.Topic定义
    • 2.方法处理器工厂
    • 3.参数解析器(Copy SpringBoot 源码)
    • 4.消费接口和消费实现
    • 5.动态初始化
      • 1.关键类简介
      • 2.动态初始化实现

一.Kafka 环境搭建

参考:Kafka搭建和测试

二.动态初始化消费者

1.Topic定义

动态初始化,即不通过注解和配置文件实现消费者的初始化,定义一个Topic对象,用于设置消费者参数

package com.demo.entity;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/*** @author * @date 2023-02-08 15:06* @since 1.8*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Topic {private String id;private String topic;private Integer partitions;private String group = "test";private String clientPrefix;
}

2.方法处理器工厂

此类直接使用 SpringBoot 源码,原实现为私有类

package com.demo.manual;import org.springframework.context.ApplicationContext;
import org.springframework.core.convert.TypeDescriptor;
import org.springframework.core.convert.converter.ConditionalGenericConverter;
import org.springframework.core.convert.converter.Converter;
import org.springframework.format.support.DefaultFormattingConversionService;
import org.springframework.lang.Nullable;
import org.springframework.messaging.converter.GenericMessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
import org.springframework.util.Assert;
import org.springframework.validation.Validator;import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.*;/*** @author * @date 2023-02-08 14:18* @since 1.8*/
public class MessageHandlerMethodFactory implements org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory {private ApplicationContext applicationContext;private Validator validator;private List<HandlerMethodArgumentResolver> customMethodArgumentResolvers = new ArrayList<>();private final DefaultFormattingConversionService defaultFormattingConversionService = new DefaultFormattingConversionService();private org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory handlerMethodFactory;public MessageHandlerMethodFactory(Validator validator, ApplicationContext applicationContext) {this.validator = validator;this.applicationContext = applicationContext;}public void setHandlerMethodFactory(org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory kafkaHandlerMethodFactory1) {this.handlerMethodFactory = kafkaHandlerMethodFactory1;}private org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory getHandlerMethodFactory() {if (this.handlerMethodFactory == null) {this.handlerMethodFactory = createDefaultMessageHandlerMethodFactory();}return this.handlerMethodFactory;}private org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();if (this.validator != null) {defaultFactory.setValidator(this.validator);}defaultFactory.setBeanFactory(this.applicationContext);this.defaultFormattingConversionService.addConverter(new BytesToStringConverter(StandardCharsets.UTF_8));this.defaultFormattingConversionService.addConverter(new BytesToNumberConverter());defaultFactory.setConversionService(this.defaultFormattingConversionService);GenericMessageConverter messageConverter = new GenericMessageConverter(this.defaultFormattingConversionService);defaultFactory.setMessageConverter(messageConverter);List<HandlerMethodArgumentResolver> customArgumentsResolver =new ArrayList<>(Collections.unmodifiableList(this.customMethodArgumentResolvers));// Has to be at the end - look at PayloadMethodArgumentResolver documentationcustomArgumentsResolver.add(new NullAwarePayloadArgumentResolver(messageConverter, this.validator));defaultFactory.setCustomArgumentResolvers(customArgumentsResolver);defaultFactory.afterPropertiesSet();return defaultFactory;}@Overridepublic InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method) {return getHandlerMethodFactory().createInvocableHandlerMethod(bean, method);}private static class BytesToStringConverter implements Converter<byte[], String> {private final Charset charset;BytesToStringConverter(Charset charset) {this.charset = charset;}@Overridepublic String convert(byte[] source) {return new String(source, this.charset);}}private final class BytesToNumberConverter implements ConditionalGenericConverter {BytesToNumberConverter() {}@Override@Nullablepublic Set<ConvertiblePair> getConvertibleTypes() {HashSet<ConvertiblePair> pairs = new HashSet<>();pairs.add(new ConvertiblePair(byte[].class, long.class));pairs.add(new ConvertiblePair(byte[].class, int.class));pairs.add(new ConvertiblePair(byte[].class, short.class));pairs.add(new ConvertiblePair(byte[].class, byte.class));pairs.add(new ConvertiblePair(byte[].class, Long.class));pairs.add(new ConvertiblePair(byte[].class, Integer.class));pairs.add(new ConvertiblePair(byte[].class, Short.class));pairs.add(new ConvertiblePair(byte[].class, Byte.class));return pairs;}@Override@Nullablepublic Object convert(@Nullable Object source, TypeDescriptor sourceType, TypeDescriptor targetType) {byte[] bytes = (byte[]) source;if (targetType.getType().equals(long.class) || targetType.getType().equals(Long.class)) {Assert.state(bytes.length >= 8, "At least 8 bytes needed to convert a byte[] to a long"); // NOSONARreturn ByteBuffer.wrap(bytes).getLong();}else if (targetType.getType().equals(int.class) || targetType.getType().equals(Integer.class)) {Assert.state(bytes.length >= 4, "At least 4 bytes needed to convert a byte[] to an integer"); // NOSONARreturn ByteBuffer.wrap(bytes).getInt();}else if (targetType.getType().equals(short.class) || targetType.getType().equals(Short.class)) {Assert.state(bytes.length >= 2, "At least 2 bytes needed to convert a byte[] to a short"); // NOSONARreturn ByteBuffer.wrap(bytes).getShort();}else if (targetType.getType().equals(byte.class) || targetType.getType().equals(Byte.class)) {Assert.state(bytes.length >= 1, "At least 1 byte needed to convert a byte[] to a byte"); // NOSONARreturn ByteBuffer.wrap(bytes).get();}return null;}@Overridepublic boolean matches(TypeDescriptor sourceType, TypeDescriptor targetType) {if (sourceType.getType().equals(byte[].class)) {Class<?> target = targetType.getType();return target.equals(long.class) || target.equals(int.class) || target.equals(short.class) // NOSONAR|| target.equals(byte.class) || target.equals(Long.class) || target.equals(Integer.class)|| target.equals(Short.class) || target.equals(Byte.class);}else {return false;}}}
}

3.参数解析器(Copy SpringBoot 源码)

此类直接使用 SpringBoot 源码,原实现为私有类

package com.demo.manual;import org.springframework.core.MethodParameter;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver;
import org.springframework.validation.Validator;import java.util.List;/*** @author * @date 2023-02-08 14:36* @since 1.8*/
public class NullAwarePayloadArgumentResolver extends PayloadMethodArgumentResolver {NullAwarePayloadArgumentResolver(MessageConverter messageConverter, Validator validator) {super(messageConverter, validator);}@Overridepublic Object resolveArgument(MethodParameter parameter, Message<?> message) throws Exception { // NOSONARObject resolved = super.resolveArgument(parameter, message);/** Replace KafkaNull list elements with null.*/if (resolved instanceof List) {List<?> list = ((List<?>) resolved);for (int i = 0; i < list.size(); i++) {if (list.get(i) instanceof KafkaNull) {list.set(i, null);}}}return resolved;}@Overrideprotected boolean isEmptyPayload(Object payload) {return payload == null || payload instanceof KafkaNull;}}

4.消费接口和消费实现

当前接口和实现为了用于做统一的数据处理,可以在实现类内再根据Topic去调用对应的数据解析方法

接口:

package com.demo.manual;import org.apache.kafka.clients.consumer.ConsumerRecord;/*** @author * @date 2023-02-08 13:46* @since 1.8*/
public interface Handler {void deal(ConsumerRecord<String, String> cRecord);
}

实现:

package com.demo.manual;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;/*** @author * @date 2023-02-08 11:49* @since 1.8*/
@Slf4j
public class ManualHandler implements Handler{@Overridepublic void deal(ConsumerRecord<String, String> cRecord) {log.info("  Topic:{} Partition:{} Content:{}",cRecord.topic(),cRecord.partition(),cRecord.value());}
}

5.动态初始化

1.关键类简介

此处通过接口调用,实现创建、暂停和恢复消费;可根据实际应用场景进行设计

关键类说明
KafkaListenerEndpointRegistrySpring 的 Kafka 监听容器,可以通过 Id 获取 Listener 实例,从而暂停或恢复消费监听
ConcurrentKafkaListenerContainerFactoryListener 工厂,定义代码可参考上面链接的(2.3 节)
ConsumerAwareListenerErrorHandler消费异常处理器,定义代码可参考上面链接的(2.3 节)
ApplicationContextSpring 的上下文容器,MessageHandlerMethodFactory 初始化用
MethodKafkaListenerEndpointKafka 配置节点,详细逻辑可参考源码

SpringBoot 自动初始化 Kafka 消费者的主要实现类和方法

package org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor

	/*** 此处为相关源码,仅供参考 寻找带有 @KafkaListener 注解的类并初始化/@Overridepublic Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {if (!this.nonAnnotatedClasses.contains(bean.getClass())) {Class<?> targetClass = AopUtils.getTargetClass(bean);Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);final boolean hasClassLevelListeners = !classLevelListeners.isEmpty();final List<Method> multiMethods = new ArrayList<>();Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,(MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {Set<KafkaListener> listenerMethods = findListenerAnnotations(method);return (!listenerMethods.isEmpty() ? listenerMethods : null);});if (hasClassLevelListeners) {Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,(ReflectionUtils.MethodFilter) method ->AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);multiMethods.addAll(methodsWithHandler);}if (annotatedMethods.isEmpty() && !hasClassLevelListeners) {this.nonAnnotatedClasses.add(bean.getClass());this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());}else {// Non-empty set of methodsfor (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {Method method = entry.getKey();for (KafkaListener listener : entry.getValue()) {processKafkaListener(listener, method, bean, beanName);}}this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"+ beanName + "': " + annotatedMethods);}if (hasClassLevelListeners) {processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);}}return bean;}

2.动态初始化实现

package com.demo.controller;import com.demo.entity.Topic;
import com.demo.manual.MessageHandlerMethodFactory;
import com.demo.manual.ManualHandler;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.framework.Advised;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.util.ReflectionUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** @author * @date 2023-02-07 13:40* @since 1.8*/
@Slf4j
@RestController
@RequestMapping("/listener")
public class ListenerController {@AutowiredKafkaListenerEndpointRegistry registry;@Autowired@Qualifier("batchTestContainerFactory")ConcurrentKafkaListenerContainerFactory<String,String> batchTestContainerFactory;@AutowiredConsumerAwareListenerErrorHandler consumerAwareListenerErrorHandler;@AutowiredApplicationContext applicationContext;MessageHandlerMethodFactory factory;@PostConstructprivate void init(){factory = new MessageHandlerMethodFactory(null,applicationContext);}static Map<String, Topic> map = new ConcurrentHashMap<>();static {map.put("test_manual_1_id",new Topic("test_manual_1_id","test-topic-new.1",2,"mygroup","test_manual_1_batch"));map.put("test_manual_2_id",new Topic("test_manual_2_id","test-topic-new.2",1,"mygroup","test_manual_2_batch"));}/*** 停止消费 自行选择停止时是否需要从监听容器内移除实例,容器为 Map 实现* Map<String, MessageListenerContainer>* @param id*/@GetMapping("/close")public void close(String id){MessageListenerContainer container = registry.unregisterListenerContainer(id);container.destroy();}/*** 开始消费 若果是已注册的则判断是否暂停,暂停则恢复* 如果不存在,则定义一个消费者,注册到容器内并启动* @param id* @throws NoSuchMethodException*/@GetMapping("/open")public void open(String id) throws NoSuchMethodException {MessageListenerContainer container = registry.getListenerContainer(id);if (null!=container){if (!container.isRunning()){container.start();container.resume();}} else {//TODO 新建一个对应 Topic 的实例Topic topic = map.get(id);if (null==topic){return;}ManualHandler bean = new ManualHandler();MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>();endpoint.setMessageHandlerMethodFactory(factory);endpoint.setBean(bean);Method[] methods = bean.getClass().getDeclaredMethods();endpoint.setMethod(checkProxy(methods[0],bean));endpoint.setId(topic.getId());endpoint.setTopics(topic.getTopic());endpoint.setGroupId(topic.getGroup());endpoint.setClientIdPrefix(topic.getClientPrefix());endpoint.setConcurrency(topic.getPartitions());endpoint.setErrorHandler(consumerAwareListenerErrorHandler);registry.registerListenerContainer(endpoint,batchTestContainerFactory);container = registry.getListenerContainer(id);container.start();}}/*** Copy Spring 源码* @param methodArg* @param bean* @return*/private Method checkProxy(Method methodArg, Object bean) {Method method = methodArg;if (AopUtils.isJdkDynamicProxy(bean)) {try {// Found a @KafkaListener method on the target class for this JDK proxy ->// is it also present on the proxy itself?method = bean.getClass().getMethod(method.getName(), method.getParameterTypes());Class<?>[] proxiedInterfaces = ((Advised) bean).getProxiedInterfaces();for (Class<?> iface : proxiedInterfaces) {try {method = iface.getMethod(method.getName(), method.getParameterTypes());break;}catch (@SuppressWarnings("unused") NoSuchMethodException noMethod) {// NOSONAR}}}catch (SecurityException ex) {ReflectionUtils.handleReflectionException(ex);}catch (NoSuchMethodException ex) {throw new IllegalStateException(String.format("@KafkaListener method '%s' found on bean target class '%s', " +"but not found in any interface(s) for bean JDK proxy. Either " +"pull the method up to an interface or switch to subclass (CGLIB) " +"proxies by setting proxy-target-class/proxyTargetClass " +"attribute to 'true'", method.getName(),method.getDeclaringClass().getSimpleName()), ex);}}return method;}
}

文章转载自:
http://fuselage.nrwr.cn
http://uncatalogued.nrwr.cn
http://lexigraphy.nrwr.cn
http://delicately.nrwr.cn
http://periodical.nrwr.cn
http://breasthook.nrwr.cn
http://arroba.nrwr.cn
http://trammel.nrwr.cn
http://outside.nrwr.cn
http://abatement.nrwr.cn
http://gaga.nrwr.cn
http://zaratite.nrwr.cn
http://jumbuck.nrwr.cn
http://refundment.nrwr.cn
http://casbah.nrwr.cn
http://unruffled.nrwr.cn
http://sociometry.nrwr.cn
http://crust.nrwr.cn
http://minibike.nrwr.cn
http://pin.nrwr.cn
http://guilloche.nrwr.cn
http://hindquarter.nrwr.cn
http://barabbas.nrwr.cn
http://trellised.nrwr.cn
http://oversell.nrwr.cn
http://erupt.nrwr.cn
http://wingover.nrwr.cn
http://pejorate.nrwr.cn
http://undecorated.nrwr.cn
http://generable.nrwr.cn
http://topee.nrwr.cn
http://footfall.nrwr.cn
http://valve.nrwr.cn
http://mormon.nrwr.cn
http://isocyanine.nrwr.cn
http://pansy.nrwr.cn
http://jods.nrwr.cn
http://verbid.nrwr.cn
http://clockwork.nrwr.cn
http://fishable.nrwr.cn
http://faux.nrwr.cn
http://thyrosis.nrwr.cn
http://zek.nrwr.cn
http://tumbleweed.nrwr.cn
http://shanna.nrwr.cn
http://upheaval.nrwr.cn
http://carcinogenic.nrwr.cn
http://andean.nrwr.cn
http://waterline.nrwr.cn
http://astarte.nrwr.cn
http://tether.nrwr.cn
http://larmor.nrwr.cn
http://haemophile.nrwr.cn
http://chiroplasty.nrwr.cn
http://whenas.nrwr.cn
http://objectless.nrwr.cn
http://cooky.nrwr.cn
http://victimization.nrwr.cn
http://silicosis.nrwr.cn
http://gharri.nrwr.cn
http://epeeist.nrwr.cn
http://shun.nrwr.cn
http://banter.nrwr.cn
http://zymotic.nrwr.cn
http://alternate.nrwr.cn
http://autism.nrwr.cn
http://tutorial.nrwr.cn
http://carene.nrwr.cn
http://dyscrasia.nrwr.cn
http://railroading.nrwr.cn
http://circuitous.nrwr.cn
http://transpositive.nrwr.cn
http://snatch.nrwr.cn
http://stumble.nrwr.cn
http://hypnotically.nrwr.cn
http://taction.nrwr.cn
http://polydomous.nrwr.cn
http://moviegoer.nrwr.cn
http://landing.nrwr.cn
http://lymphangitis.nrwr.cn
http://claustrophilia.nrwr.cn
http://thickheaded.nrwr.cn
http://maxwell.nrwr.cn
http://alternative.nrwr.cn
http://cryptogam.nrwr.cn
http://superpipeline.nrwr.cn
http://lees.nrwr.cn
http://jungian.nrwr.cn
http://schizo.nrwr.cn
http://daltonism.nrwr.cn
http://reubenite.nrwr.cn
http://phenomena.nrwr.cn
http://highbred.nrwr.cn
http://impress.nrwr.cn
http://teucrian.nrwr.cn
http://lochial.nrwr.cn
http://rotunda.nrwr.cn
http://picowatt.nrwr.cn
http://brabble.nrwr.cn
http://paymaster.nrwr.cn
http://www.dt0577.cn/news/61127.html

相关文章:

  • 手机网站的优缺点seo推广网站
  • 专门做甜点的视频网站深圳做网站
  • 网站做销售是斤么工作网络广告的形式
  • 莱芜公安网站引流app推广软件
  • 网站规划与建设品牌网络营销策划方案
  • 做资讯网站需要哪些资质百度推广技巧方法
  • 做调查的网站‘营销管理
  • 做外贸比较好的网站有哪些自动seo网站源码
  • 安徽网站优化公司价格企业seo的措施有哪些
  • 网上找事做那个网站靠谱b站视频推广怎么买
  • 如何将自己做的网站变成中文百度关键词相关性优化软件
  • 济南营销型网站市场调研报告的基本框架
  • 深圳手机网站开发成都网站建设
  • 网站申请备案成功后怎么建设广东: 确保科学精准高效推进疫情
  • 泗县建设局网站链接提取视频的网站
  • 怎样免费做一个网站怎么制作网页广告
  • windows怎么做网站网络营销推广处点
  • 劫持别人网站做排名最好的营销策划公司
  • 摄影网站开发意义网站运营是做什么的
  • 做web网站前端百度指数的主要用户是
  • dreamweaver怎么创建网站写文章在哪里发表挣钱
  • 众筹网站怎么做推广百度手机助手app下载安装
  • 跨境电商无货源模式怎么做seo代码优化步骤
  • 海南网站建设设计湖南网站建站系统哪家好
  • 快速构建网站seo视频网页入口网站推广
  • 重庆住房城乡建设网站网上营销方法
  • 上海设计网站开发seo线上培训机构
  • b2c网站建设 模板seo全网营销公司
  • 虚拟机做网站前端优化
  • 北京网站设计优刻百度竞价入口