当前位置: 首页 > news >正文

自己做的网站怎么取sql数据库上海疫情突然消失的原因

自己做的网站怎么取sql数据库,上海疫情突然消失的原因,网站开发用什么语言,网站维护包含哪些内容一些理论 1.topic支持多分区,每个分区只能被组内的一个消费者消费,一个消费者可能消费多个分区的数据; 2.消费者组重平衡的分区策略,是由消费者自己决定的,具体是从消费者组中选一个作为leader进行分区方案分配&#…

一些理论

1.topic支持多分区,每个分区只能被组内的一个消费者消费,一个消费者可能消费多个分区的数据;
2.消费者组重平衡的分区策略,是由消费者自己决定的,具体是从消费者组中选一个作为leader进行分区方案分配;
3.每条消息都有一个唯一的offset,kafka保证单个分区的消息有序,因为每个分区的消息是按顺序写入的,消费者是按offset拉取;
4.自动提交和手动提交,自动提交是指 sdk 开启了一个协程,定时自动提交已经标记处理的消息的offset,而不是说拉到消息就自动提交;手动提交则需要业务代码手动提交offset;如果是每消费一条消息再手动提交一次,这样是同步操作,会降低消费者消费速度,可以考虑批量处理多个消息再一起提交;
5.消费模式,kafka是拉模式,消费者定时从kafka拉取消息;
6.服务发布更新、重启、k8s中pod扩缩容 会导致消费者组内消费者成员数量发生变化,进而发生消费者组重平衡,重平衡期间stw消费者组短暂停止拉取拉取 会导致消息堆积,这种重平衡无法避免,stw时间取决于服务升级期间的耗时

源码分析

消费者组接口

type ConsumerGroup interface {Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) errorErrors() <-chan errorClose() error
}type ConsumerGroupHandler interface {Setup(ConsumerGroupSession) errorCleanup(ConsumerGroupSession) errorConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}type ConsumerGroupClaim interface {Topic() stringPartition() int32InitialOffset() int64HighWaterMarkOffset() int64Messages() <-chan *ConsumerMessage
}type ConsumerGroupSession interface {Claims() map[string][]int32MemberID() stringGenerationID() int32MarkOffset(topic string, partition int32, offset int64, metadata string)Commit()ResetOffset(topic string, partition int32, offset int64, metadata string)MarkMessage(msg *ConsumerMessage, metadata string)Context() context.Context
}

消息拉取

可以在一个请求拉多个分区的数据,然后按分区分类

func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {// 。。。go withRecover(bc.subscriptionManager)go withRecover(bc.subscriptionConsumer)return bc
}response, err := bc.fetchNewMessages()func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {request := &FetchRequest{MinBytes:    bc.consumer.conf.Consumer.Fetch.Min,MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),}// 。。。for child := range bc.subscriptions {request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)fmt.Printf("fetchNewMessages topic=%s partition=%d offset=%d fetchSize=%d\n",child.topic, child.partition, child.offset, child.fetchSize)}return bc.broker.Fetch(request)
}//subscriptionConsumer ensures we will get nil right away if no new subscriptions is available
func (bc *brokerConsumer) subscriptionConsumer() {<-bc.wait // wait for our first piece of workfor newSubscriptions := range bc.newSubscriptions {bc.updateSubscriptions(newSubscriptions)if len(bc.subscriptions) == 0 {// We're about to be shut down or we're about to receive more subscriptions.// Either way, the signal just hasn't propagated to our goroutine yet.<-bc.waitcontinue}response, err := bc.fetchNewMessages()fmt.Printf("[%s]subscriptionConsumer.fetchNewMessages...\n", time.Now())if err != nil {Logger.Printf("consumer/broker/%d disconnecting due to error processing FetchRequest: %s\n", bc.broker.ID(), err)bc.abort(err)return}bc.acks.Add(len(bc.subscriptions))for child := range bc.subscriptions {// 每个分区处理器都写入fmt.Printf("subscriptionConsumer write %s %d %+v\n", child.topic, child.partition, response)child.feeder <- response}bc.acks.Wait()bc.handleResponses()}
}

消息解析处理

func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) {child := &partitionConsumer{consumer:  c,conf:      c.conf,topic:     topic,partition: partition,messages:  make(chan *ConsumerMessage, c.conf.ChannelBufferSize),errors:    make(chan *ConsumerError, c.conf.ChannelBufferSize),feeder:    make(chan *FetchResponse, 1),trigger:   make(chan none, 1),dying:     make(chan none),fetchSize: c.conf.Consumer.Fetch.Default,}if err := child.chooseStartingOffset(offset); err != nil {return nil, err}var leader *Brokervar err errorif leader, err = c.client.Leader(child.topic, child.partition); err != nil {return nil, err}if err := c.addChild(child); err != nil {return nil, err}go withRecover(child.dispatcher)// 每个分区起一个协程处理go withRecover(child.responseFeeder)fmt.Printf("\nConsumePartition go %s %d %d\n", topic, partition, offset)child.broker = c.refBrokerConsumer(leader)child.broker.input <- childreturn child, nil
}

自动提交offset


func newOffsetManagerFromClient(group, memberID string, generation int32, client Client) (*offsetManager, error) {// Check that we are not dealing with a closed Client before processing any other argumentsif client.Closed() {return nil, ErrClosedClient}conf := client.Config()om := &offsetManager{client: client,conf:   conf,group:  group,poms:   make(map[string]map[int32]*partitionOffsetManager),memberID:   memberID,generation: generation,closing: make(chan none),closed:  make(chan none),}if conf.Consumer.Offsets.AutoCommit.Enable {om.ticker = time.NewTicker(conf.Consumer.Offsets.AutoCommit.Interval)go withRecover(om.mainLoop)}return om, nil
}
func (om *offsetManager) mainLoop() {defer om.ticker.Stop()defer close(om.closed)for {select {case <-om.ticker.C:om.Commit()case <-om.closing:return}}
}func (om *offsetManager) Commit() {om.flushToBroker()om.releasePOMs(false)
}func (om *offsetManager) flushToBroker() {req := om.constructRequest()if req == nil {return}broker, err := om.coordinator()if err != nil {om.handleError(err)return}resp, err := broker.CommitOffset(req)if err != nil {fmt.Printf("broker.CommitOffset fail %v\n", err)om.handleError(err)om.releaseCoordinator(broker)_ = broker.Close()return}om.handleResponse(broker, req, resp)
}

标记位移

func (s *consumerGroupSession) MarkMessage(msg *ConsumerMessage, metadata string) {s.MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, metadata)
}
func (pom *partitionOffsetManager) MarkOffset(offset int64, metadata string) {pom.lock.Lock()defer pom.lock.Unlock()if offset > pom.offset {pom.offset = offsetpom.metadata = metadatapom.dirty = true}
}

func (om *offsetManager) constructRequest() *OffsetCommitRequest {var r *OffsetCommitRequestvar perPartitionTimestamp int64if om.conf.Consumer.Offsets.Retention == 0 {perPartitionTimestamp = ReceiveTimer = &OffsetCommitRequest{Version:                 1,ConsumerGroup:           om.group,ConsumerID:              om.memberID,ConsumerGroupGeneration: om.generation,}} else {r = &OffsetCommitRequest{Version:                 2,RetentionTime:           int64(om.conf.Consumer.Offsets.Retention / time.Millisecond),ConsumerGroup:           om.group,ConsumerID:              om.memberID,ConsumerGroupGeneration: om.generation,}}om.pomsLock.RLock()defer om.pomsLock.RUnlock()for _, topicManagers := range om.poms {for _, pom := range topicManagers {pom.lock.Lock()if pom.dirty {r.AddBlock(pom.topic, pom.partition, pom.offset, perPartitionTimestamp, pom.metadata)}pom.lock.Unlock()}}// 有处理的才提交if len(r.blocks) > 0 {return r}return nil
}

消费者重平衡

func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {// 获取broker组协调器coordinator, err := c.client.Coordinator(c.groupID)if err != nil {if retries <= 0 {return nil, err}return c.retryNewSession(ctx, topics, handler, retries, true)}// 申请加入组// Join consumer groupjoin, err := c.joinGroupRequest(coordinator, topics)if err != nil {_ = coordinator.Close()return nil, err}switch join.Err {case ErrNoError:c.memberID = join.MemberIdcase ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediatelyc.memberID = ""return c.newSession(ctx, topics, handler, retries)case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refreshif retries <= 0 {return nil, join.Err}return c.retryNewSession(ctx, topics, handler, retries, true)// 已经在重平衡期间case ErrRebalanceInProgress: // retry after backoffif retries <= 0 {return nil, join.Err}return c.retryNewSession(ctx, topics, handler, retries, false)default:return nil, join.Err}// 消费者组中的一个消费者作为leader,进行分区方案分配// Prepare distribution plan if we joined as the leadervar plan BalanceStrategyPlanif join.LeaderId == join.MemberId {members, err := join.GetMembers()if err != nil {return nil, err}// 分配分区plan, err = c.balance(members)if err != nil {return nil, err}}// 同步给kafka,只有 leader会带上分区方案// Sync consumer groupgroupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)if err != nil {_ = coordinator.Close()return nil, err}switch groupRequest.Err {case ErrNoError:case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediatelyc.memberID = ""return c.newSession(ctx, topics, handler, retries)case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refreshif retries <= 0 {return nil, groupRequest.Err}return c.retryNewSession(ctx, topics, handler, retries, true)case ErrRebalanceInProgress: // retry after backoffif retries <= 0 {return nil, groupRequest.Err}return c.retryNewSession(ctx, topics, handler, retries, false)default:return nil, groupRequest.Err}// Retrieve and sort claimsvar claims map[string][]int32 // topic->partions// 如果有可消费的分区if len(groupRequest.MemberAssignment) > 0 {members, err := groupRequest.GetMemberAssignment()if err != nil {return nil, err}claims = members.Topicsc.userData = members.UserDatafor _, partitions := range claims {sort.Sort(int32Slice(partitions))}}return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)
}

使用例子

消费者-自动提交

package mainimport ("context""fmt""github.com/Shopify/sarama"
)func main() {config := sarama.NewConfig()config.Version = sarama.V2_0_0_0config.Consumer.Offsets.Initial = sarama.OffsetNewestconfig.Consumer.Offsets.AutoCommit.Enable = true // 自动提交config.Consumer.Return.Errors = truevar (brokers = []string{"localhost:9092"}groupID = "g1"topics  = []string{"test3"})group, err := sarama.NewConsumerGroup(brokers, groupID, config)if err != nil {panic(err)}defer func() { _ = group.Close() }()// Track errorsgo func() {for err := range group.Errors() {fmt.Println("ERROR", err)}}()// Iterate over consumer sessions.ctx := context.Background()for {handler := exampleConsumerGroupHandler{}// `Consume` should be called inside an infinite loop, when a// server-side rebalance happens, the consumer session will need to be// recreated to get the new claimserr := group.Consume(ctx, topics, handler)if err != nil {panic(err)}}
}type exampleConsumerGroupHandler struct{}func (exampleConsumerGroupHandler) Setup(se sarama.ConsumerGroupSession) error {fmt.Printf("Setup %q %+v", se.MemberID(), se.Claims())return nil
}
func (exampleConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h exampleConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {for msg := range claim.Messages() {fmt.Printf("Message topic:%q partition:%d offset:%d ts:%s val:%s\n",msg.Topic, msg.Partition, msg.Offset, msg.Timestamp, msg.Value)//time.Sleep(time.Second * 10)sess.MarkMessage(msg, "")//sess.Commit()//fmt.Printf("\n\nafter commit\n")}return nil
}

消费者-手动提交

package mainimport ("context""fmt""github.com/Shopify/sarama"
)func main() {config := sarama.NewConfig()config.Version = sarama.V2_0_0_0config.Consumer.Offsets.Initial = sarama.OffsetNewestconfig.Consumer.Offsets.AutoCommit.Enable = falseconfig.Consumer.Return.Errors = truevar (brokers = []string{"localhost:9092"}groupID = "g1"topics  = []string{"test3"})group, err := sarama.NewConsumerGroup(brokers, groupID, config)if err != nil {panic(err)}defer func() { _ = group.Close() }()// Track errorsgo func() {for err := range group.Errors() {fmt.Println("ERROR", err)}}()// Iterate over consumer sessions.ctx := context.Background()for {handler := exampleConsumerGroupHandler{}// `Consume` should be called inside an infinite loop, when a// server-side rebalance happens, the consumer session will need to be// recreated to get the new claimserr := group.Consume(ctx, topics, handler)if err != nil {panic(err)}}
}type exampleConsumerGroupHandler struct{}func (exampleConsumerGroupHandler) Setup(se sarama.ConsumerGroupSession) error {fmt.Printf("Setup %q %+v", se.MemberID(), se.Claims())return nil
}
func (exampleConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h exampleConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {for msg := range claim.Messages() {fmt.Printf("Message topic:%q partition:%d offset:%d ts:%s val:%s\n",msg.Topic, msg.Partition, msg.Offset, msg.Timestamp, msg.Value)//time.Sleep(time.Second * 10)sess.MarkMessage(msg, "")sess.Commit()}return nil
}

生产者 同步发送

package mainimport ("fmt""log""os""github.com/Shopify/sarama"
)var (logger = log.New(os.Stderr, "", log.LstdFlags)
)func main() {var (brokers = []string{"localhost:9092"}topic   = "test3")config := sarama.NewConfig()config.Producer.Return.Successes = true/*=0 不发送任何响应,TCP ACK就是你得到的全部WaitForLocal RequiredAcks=1 只等待本地提交成功后再进行响应。WaitForAll RequiredAcks=-1 等待所有同步副本提交后再响应。*/config.Producer.RequiredAcks = sarama.WaitForAll // WaitForAll等待所有同步副本提交后再响应。producer, err := sarama.NewSyncProducer(brokers, config)if err != nil {fmt.Printf("Failed to open Kafka producer: %s", err)return}defer func() {if err := producer.Close(); err != nil {logger.Println("Failed to close Kafka producer cleanly:", err)}}()message := &sarama.ProducerMessage{Topic: topic,Key:   sarama.StringEncoder("k1"),Value: sarama.StringEncoder("v1"),}partition, offset, err := producer.SendMessage(message)if err != nil {fmt.Printf("Failed to produce message: %s", err)}fmt.Printf("produce %d/%d\n", partition, offset)
}

shell

生产者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test3创建topic 分区数3
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test查看堆积情况,位移差值越大,堆积越严重
[root@localhost kafka_2.12-2.5.1] # [kube:] ./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group g1GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                 HOST             CLIENT-ID
g1              test3           0          4               4               0               sarama-c0869d1a-9a8e-400d-a6f5-887ca95587d5 /0:0:0:0:0:0:0:1 sarama
g1              test3           1          4               4               0               sarama-c0869d1a-9a8e-400d-a6f5-887ca95587d5 /0:0:0:0:0:0:0:1 sarama
g1              test3           2          3               3               0               sarama-c0869d1a-9a8e-400d-a6f5-887ca95587d5 /0:0:0:0:0:0:0:1 sarama
g1              test            0          4               4               0               -                                           -                -

文章转载自:
http://skymotel.hqbk.cn
http://sayst.hqbk.cn
http://wazir.hqbk.cn
http://albertite.hqbk.cn
http://wickedness.hqbk.cn
http://firestone.hqbk.cn
http://courge.hqbk.cn
http://monogenean.hqbk.cn
http://nasrani.hqbk.cn
http://brazilian.hqbk.cn
http://verb.hqbk.cn
http://pentaploid.hqbk.cn
http://baubee.hqbk.cn
http://unremittingly.hqbk.cn
http://weeknights.hqbk.cn
http://albite.hqbk.cn
http://lemonish.hqbk.cn
http://haploid.hqbk.cn
http://periastron.hqbk.cn
http://aerology.hqbk.cn
http://gelatine.hqbk.cn
http://squabble.hqbk.cn
http://antarthritic.hqbk.cn
http://advocaat.hqbk.cn
http://yellowy.hqbk.cn
http://traitress.hqbk.cn
http://chartometer.hqbk.cn
http://investigative.hqbk.cn
http://hominized.hqbk.cn
http://sympathetically.hqbk.cn
http://pteridine.hqbk.cn
http://reenforcement.hqbk.cn
http://fungistatic.hqbk.cn
http://spanglish.hqbk.cn
http://rejuvenize.hqbk.cn
http://dismoded.hqbk.cn
http://dimly.hqbk.cn
http://tlas.hqbk.cn
http://calendar.hqbk.cn
http://lentoid.hqbk.cn
http://photoactivate.hqbk.cn
http://halomethane.hqbk.cn
http://pyramidion.hqbk.cn
http://crossable.hqbk.cn
http://landwards.hqbk.cn
http://baulk.hqbk.cn
http://draconic.hqbk.cn
http://solidarity.hqbk.cn
http://noncarcinogenic.hqbk.cn
http://sprag.hqbk.cn
http://contrate.hqbk.cn
http://hyponymy.hqbk.cn
http://kirkman.hqbk.cn
http://doozy.hqbk.cn
http://aleppo.hqbk.cn
http://rasped.hqbk.cn
http://raft.hqbk.cn
http://paddlefish.hqbk.cn
http://diskpark.hqbk.cn
http://pedicle.hqbk.cn
http://safe.hqbk.cn
http://remedial.hqbk.cn
http://yangtse.hqbk.cn
http://cornaceous.hqbk.cn
http://rigidity.hqbk.cn
http://alamanni.hqbk.cn
http://outtalk.hqbk.cn
http://tarlatan.hqbk.cn
http://fruity.hqbk.cn
http://retitrate.hqbk.cn
http://doeskin.hqbk.cn
http://mins.hqbk.cn
http://skyward.hqbk.cn
http://promptness.hqbk.cn
http://beamed.hqbk.cn
http://isostatic.hqbk.cn
http://sonorization.hqbk.cn
http://ergotamine.hqbk.cn
http://anamorphosis.hqbk.cn
http://unbed.hqbk.cn
http://divali.hqbk.cn
http://skivey.hqbk.cn
http://slosh.hqbk.cn
http://featheredge.hqbk.cn
http://unburnt.hqbk.cn
http://myelogram.hqbk.cn
http://revaccinate.hqbk.cn
http://mottled.hqbk.cn
http://polyurethane.hqbk.cn
http://gymnocarpous.hqbk.cn
http://calumny.hqbk.cn
http://incunabula.hqbk.cn
http://pungi.hqbk.cn
http://solubilise.hqbk.cn
http://certainty.hqbk.cn
http://submicrogram.hqbk.cn
http://mci.hqbk.cn
http://acrobatics.hqbk.cn
http://softheaded.hqbk.cn
http://steering.hqbk.cn
http://www.dt0577.cn/news/87950.html

相关文章:

  • wordpress背景设置百度seo公司报价
  • 企业网站价格微信公众号怎么做文章推广
  • 网站定制3天引流800个人技巧
  • 中国建设劳动学会是假网站吗如何做一个网站的seo
  • 品牌网站建设供应商武汉百度地图导航2022最新版下载
  • 四川网站建设外包业务竞价恶意点击报案
  • b2b电子商务网站调研报告1000字免费网络口碑营销名词解释
  • 购物网站开发英文文献seo资料网
  • 东莞网站关键排名福州模板建站哪家好
  • 网站建设问卷调查深圳seo优化外包公司
  • 关停网站的申请做专业搜索引擎优化
  • wordpress全球销量主题苏州首页关键词优化
  • 动态网站开发实训心得800营销推广方案怎么写
  • 网站建设课程设计报告范文阿里云域名查询
  • 都网站建设佛山网站建设公司
  • 建设银行官方网站诚聘英才频道网络销售平台有哪些软件
  • 网站租用服务器费用品牌推广百度seo
  • 做电商怎么找货源济宁seo推广
  • 网站开发时间进度表外贸如何推广
  • 编程代码产品seo标题是什么
  • wap网站建设流程seo排名快速刷
  • 网站建设费用是多少本周新闻热点事件
  • 哪些网站可以做易拉宝自己如何做一个网站
  • 做婚庆网站有哪些网络软件开发
  • 抚顺市城市建设档案馆网站国内最新新闻事件
  • 厦门做网站的公司刚出来的新产品怎么推
  • 现在新手做电商能做好吗长沙百度快速优化排名
  • 阆中网站建设01hl上海百度推广方案
  • 维护网站费用怎么做会计凭证建网站
  • 海南政府网站建设全球新闻最新消息