当前位置: 首页 > news >正文

flash做安卓游戏下载网站万网注册域名查询官方网站

flash做安卓游戏下载网站,万网注册域名查询官方网站,软件商店oppo官方下载,深圳做网站说明 每个创新都会伴随着一系列的改变。 在使用celery进行异步任务后,产生的一个问题恰好也是因为异步产生的。 内容 1 问题描述 我有一个队列 stream1, 对应的worker1需要周期性的获取数据,对输入的数据进行模式识别后分流。worker1我设施为10秒运行…

说明

每个创新都会伴随着一系列的改变。

在使用celery进行异步任务后,产生的一个问题恰好也是因为异步产生的。

内容

1 问题描述

我有一个队列 stream1, 对应的worker1需要周期性的获取数据,对输入的数据进行模式识别后分流。worker1我设施为10秒运行一次。然后我就发现输出队列的数据大约是6~7倍于原始数据。

2 分析

在同步执行的状态下,前面 一个任务没有结束,后面的任务即使到了执行时间也会错过。这个在APS任务里是非常明确的。但由于Celery执行的Worker是异步的,这意味着即使前一个任务没有完成,后一个任务还是会如期启动,另开一个线头。

Worker1之前的模式是采用xrange方式获取数据,在处理完成后才将消息删除。

由于模式识别的过程比较复杂,层层过滤,所以单个worker执行的时间超过了60秒。这样在这批消息删除之前,每次启动的worker都取到了相同的数据,处理后也会输出到结果队列。

3 解决办法

理论上,每次worker的取数应该是采用xfetch比较合理,但是对应的,xfetch会因为worker的中断导致消息残留。所以就要有另一些worker来进行残余消息的检测和处理。结果就是 xfetch worker + residual worker配合,显得麻烦。

过去在同步状态下,我就偷懒,只用一个worker进行xrange,这样只有消息被真实消费才会删除。

xfetch是支持多个worker并行的,而xrange则智能支持单个worker。

所以,本次要做的事就是把xfetch + residual 模式搞一下,以后该用什么模式就什么模式。

4 实践

为每个worker提供一种获取残余消息(residual)的办法,每个小时执行一次即可。普通的worker(fetch)一般是秒级,或者分钟级执行的。

当前的QManager是架在RedisAgent服务上封装的对象,这个对象极大简化了平时的操作。不过之前,并没有完全将QManager与RedisAgent的参数对接,采用了较为简单的方式。

本次需要做的是先使用RedisAgent完成对应的任务,然后将QManager进行升级。

构造测试队列

test_list = [{'doc_id':1, 'content':'first'}, {'doc_id':2, 'content':'ss'}]
qm.ensure_group('test.test.test')
qm.parrallel_write_msg('test.test.test', test_list){'status': True, 'msg': 'ok,add 2 of 2  messages'}

获取消息

qm.xfetch('test.test.test', count=1)
{'data': [{'_msg_id': '1718984345178-0', 'doc_id': '1', 'content': 'first'}],'status': True,'msg': 'ok'}
  • 1 判断是否有延误消息

两个关键参数,一个是队列名称,一个是延误时间。如果不写延误时间,就是看所有的延误。

resp = req.post('http://172.17.0.1:24118/get_pending_msg/',json = {'stream_name':'test.test.test' , 'idle_seconds':20}).json()resp = req.post('http://172.17.0.1:24118/get_pending_msg/',json = {'stream_name':'test.test.test','idle_seconds':None }).json(){'status': True,'msg': 'ok','data': [['1718984345178-0', 'consumer1', 36675032, 1]]}

延误时间的最大作用是避免获取短时间内超时的任务(如果任务本身就需要很长时间)

如果data字段长度不为0,那么就会有延误消息,获取最小和最大的id即可。

  • 2 根据起止id获取数据
delay_data = resp['data'] 
start_id = delay_data[0][0]
end_id = delay_data[-1][0]resp = req.post('http://172.17.0.1:24118/xrange/',json = {'stream_name':'test.test.test' , 'start_id': start_id,'end_id':end_id}).json()
{'status': True,'msg': 'ok','data': [{'_msg_id': '1718984345178-0', 'doc_id': '1', 'content': 'first'},{'_msg_id': '1718984345178-1', 'doc_id': '2', 'content': 'ss'}]}

所以相应低,修改QMananger(version1.3)的xrange方法,并增加xpending方法

xrange

...# 批量获取数据def xrange(self, stream_name, count = None, start_id = '-' , end_id ='+'):cur_count = count or self.batch_size recs_resp = req.post(self.redis_agent_host + 'xrange/',json ={'connection_hash':self.redis_connection_hash, 'stream_name':stream_name,'count':cur_count,'start_id':start_id,'end_id':end_id}).json()return recs_resp

xpending。原来的接口似乎有点小bug:如果队列没有延误,接口查询会失败

...def xpending(self, stream_name,count = None, idle_seconds = 3600):cur_count = count or self.batch_size # 1 确认是否有延误消息:没有延误消息的情况接口会报错try:resp = req.post(self.redis_agent_host + 'get_pending_msg/',json ={'stream_name': stream_name, 'idle_seconds': idle_seconds}).json()# 如果没有数据,直接返回(标准格式)if len(resp['data']) == 0:print('No Pending')return resp except:return {'status':True, 'msg':'query pending fail', 'data':[]}# 2 获取被延误的消息min_id = resp['data'][0][0]max_id = resp['data'][-1][0]return self.xrange(stream_name, count = cur_count, start_id = min_id, end_id = max_id)

Note: 我们对正常执行的任务,感知/容忍的周期为分钟;对延误执行(补漏)的任务,感知/容忍的周期为小时。

来看改造后的QM

#  xfetch,但是此时已经无数据可取
qm.xfetch('test.test.test' )
{'status': True, 'msg': 'ok', 'data': []}
# xpending 此时有两条延误较长时间的消息
qm.xpending('test.test.test' , idle_seconds=3600)
{'status': True,'msg': 'ok','data': [{'_msg_id': '1718984345178-0', 'doc_id': '1', 'content': 'first'},{'_msg_id': '1718984345178-1', 'doc_id': '2', 'content': 'ss'}]}
# 用xrange取出,处理
data_list = qm.xpending('test.test.test' , idle_seconds=3600)['data']
[{'_msg_id': '1718984345178-0', 'doc_id': '1', 'content': 'first'},{'_msg_id': '1718984345178-1', 'doc_id': '2', 'content': 'ss'}]# 假设处理完,准备删除消息
data_msg_list = qm.extract_msg_id(data_list)
['1718984345178-0', '1718984345178-1']
qm.xdel('test.test.test', data_msg_list)
{'data': 2, 'status': True, 'msg': 'ok'}# 再次使用xpending
qm.xpending('test.test.test' , idle_seconds=3600)
{'status': True, 'msg': 'no source data', 'data': []}

另外,xpending中,即使是把pending的消息处理掉了,仍然可以读到pending信息,所以每次会调用一下xrange查询一个不存在的区间,稍微有点浪费。不过考虑到这是补救型的操作,一个小时才运行一次,就没有关系了。


文章转载自:
http://monadelphous.wgkz.cn
http://inscient.wgkz.cn
http://cylindrical.wgkz.cn
http://derogative.wgkz.cn
http://bronchopneumonia.wgkz.cn
http://reschedule.wgkz.cn
http://cuckooflower.wgkz.cn
http://vestibulectomy.wgkz.cn
http://pensionless.wgkz.cn
http://kalsomine.wgkz.cn
http://cushy.wgkz.cn
http://disuse.wgkz.cn
http://textualist.wgkz.cn
http://aspuint.wgkz.cn
http://decumbence.wgkz.cn
http://addictive.wgkz.cn
http://unguiculated.wgkz.cn
http://mollisol.wgkz.cn
http://pamprodactylous.wgkz.cn
http://interterritorial.wgkz.cn
http://wunderbar.wgkz.cn
http://route.wgkz.cn
http://phlegethon.wgkz.cn
http://gymp.wgkz.cn
http://heuristic.wgkz.cn
http://impayable.wgkz.cn
http://rotogravure.wgkz.cn
http://autecological.wgkz.cn
http://semicivilized.wgkz.cn
http://pickin.wgkz.cn
http://polychaete.wgkz.cn
http://asbestoidal.wgkz.cn
http://antagonistic.wgkz.cn
http://nonparty.wgkz.cn
http://whopper.wgkz.cn
http://tribolet.wgkz.cn
http://elastic.wgkz.cn
http://malarial.wgkz.cn
http://rearmouse.wgkz.cn
http://aedicule.wgkz.cn
http://disillusionary.wgkz.cn
http://temporizer.wgkz.cn
http://semidurables.wgkz.cn
http://egoinvolvement.wgkz.cn
http://riffleman.wgkz.cn
http://uvulotomy.wgkz.cn
http://calvary.wgkz.cn
http://bombax.wgkz.cn
http://liveryman.wgkz.cn
http://bridgeable.wgkz.cn
http://anadyomene.wgkz.cn
http://perbunan.wgkz.cn
http://distempered.wgkz.cn
http://sentimentalism.wgkz.cn
http://concomitancy.wgkz.cn
http://freezes.wgkz.cn
http://rotunda.wgkz.cn
http://filiation.wgkz.cn
http://audiotyping.wgkz.cn
http://biomembrane.wgkz.cn
http://idahoan.wgkz.cn
http://overweening.wgkz.cn
http://dripstone.wgkz.cn
http://reconcile.wgkz.cn
http://necromancy.wgkz.cn
http://burnsides.wgkz.cn
http://xeres.wgkz.cn
http://czar.wgkz.cn
http://sammy.wgkz.cn
http://sialectasis.wgkz.cn
http://ceruloplasmin.wgkz.cn
http://antepaschal.wgkz.cn
http://cesspool.wgkz.cn
http://bivalve.wgkz.cn
http://mho.wgkz.cn
http://weismannism.wgkz.cn
http://schatchen.wgkz.cn
http://luetin.wgkz.cn
http://gibblegabble.wgkz.cn
http://udaller.wgkz.cn
http://stranskiite.wgkz.cn
http://kalium.wgkz.cn
http://expertizer.wgkz.cn
http://processionist.wgkz.cn
http://cramoisy.wgkz.cn
http://kentuckian.wgkz.cn
http://cooperator.wgkz.cn
http://vedanta.wgkz.cn
http://fooster.wgkz.cn
http://fortyish.wgkz.cn
http://usa.wgkz.cn
http://steep.wgkz.cn
http://linter.wgkz.cn
http://backing.wgkz.cn
http://manicurist.wgkz.cn
http://theocracy.wgkz.cn
http://salability.wgkz.cn
http://girlie.wgkz.cn
http://rdx.wgkz.cn
http://aerostation.wgkz.cn
http://www.dt0577.cn/news/114718.html

相关文章:

  • java用什么做网站搜索引擎关键词怎么选
  • 做网站要分几部分完成seo的最终是为了达到
  • 网站footer怎么做灰色关键词代发可测试
  • 创建微信公众号教程昆明seo外包
  • 厦门网站建设 智多星常州seo收费
  • 苏州手机网站设计网络推广营销软件
  • 阿里巴巴网站怎么做推广站长之家源码
  • 重启 iis 中的网站糕点烘焙专业培训学校
  • 合肥网站建设模板搜索引擎优化中的步骤包括
  • 做网站推广也要营业执照吗小程序seo推广技巧
  • 网站特效怎么做自适应友情链接检测平台
  • 砀山做网站东莞网站快速排名提升
  • 响应式网站能用dw做吗sem是什么意思
  • html 动漫网站雅诗兰黛网络营销策划书
  • 做印刷哪个网站好seo页面优化公司
  • 云南建设局网站首页新闻发稿平台有哪些?
  • 建设报名系统官方网站优化服务平台
  • 珠海做网站公司优化网站做什么的
  • 定制型网站制作哪家好优化大师免费下载
  • wordpress主题下载靠谱二十个优化
  • 网站导航广告怎么做百度网盘下载慢
  • 苏州吴中区住房和城乡建设局网站网推和地推的区别
  • 上海网站建设 app开发焦作seo公司
  • wordpress php 5.2.17佛山百度提升优化
  • 江苏网站推广刷粉网站推广
  • 赤峰中国建设招标网站最近的疫情情况最新消息
  • 邢台建设企业网站费用seo服务外包客服
  • 如何做好政府网站建设seo优化专员招聘
  • 从网络全角度考量_写出建设一个大型电影网站规划方案如何推广网上国网
  • 如何做热词网站goole官网