-
Notifications
You must be signed in to change notification settings - Fork 89
2.1 用法例子说明
ydf0509 edited this page Apr 20, 2021
·
1 revision
2.1为简单例子,介绍了入参意义,这里是介绍的是手动实例化生成消费者的方式,这个是本质。
下面的2.2 是装饰器生成消费者的使用方式,这个是简洁一丝丝。
"""
运行后悔自动在你的当前项目根目录下生成一个 distributed_frame_config.py 的文件,在里面修改写配置就好了,框架会自动读取这个文件的配置。
"""
import time
from function_scheduling_distributed_framework import get_consumer
# 主要的消费函数,演示做加法,假设需要花10秒钟。
# 为什么消费举例大多数是time.sleep(n秒钟),主要是用框架用来验证并发和控频。
# 消费函数适合是io密集型和cpu密集型。
# 如果消费函数是个简单地两数求和或者简单print一下hello world,那不需要使用这种分布式任务框架
# 消费函数如果能在1us之内结束运行,那就直接使用调用函数的行方式不香吗,对运行时间很短暂的函数引入分布式任务框架是多此一举。
def f2(a, b):
print(f'消费此消息 {a} + {b} 中。。。。。')
time.sleep(10) # 模拟做某事需要阻塞10秒种,必须用并发绕过此阻塞。
print(f'计算 {a} + {b} 得到的结果是 {a + b}')
# 把消费的函数名传给consuming_function,就这么简单。
# 通过设置broker_kind,一键切换中间件为mq或redis等7种中间件或包。
# 额外参数支持超过10种控制功能,celery支持的控制方式,都全部支持。
# 这里演示使用本地持久化队列,本机多个脚本之间可以相互通信共享任务,无需安装任何中间件,降低初次使用门槛。
# 框架使用很简单,全部源码的函数和类都不需要深入了解,只需要看懂get_consumer这一个函数的参数就可以就可以。
"""
使用工厂模式再包一层,通过设置数字来生成基于不同中间件或包的consumer。
:param queue_name: 队列名字。
:param consuming_function: 处理消息的函数。 指定队列名字和指定消费函数这两个参数是必传,必须指定,
这2个是这个消费框架的本质核心参数,其他参数都是可选的。
:param function_timeout : 超时秒数,函数运行超过这个时间,则自动杀死函数。为0是不限制。
:param concurrent_num:并发数量,
:param specify_concurrent_pool:使用指定的线程池(协程池),可以多个消费者共使用一个线程池,不为None时候。threads_num失效
:param concurrent_mode:并发模式,1线程 2gevent 3eventlet 4 asyncio
:param max_retry_times: 最大自动重试次数,当函数发生错误,立即自动重试运行n次,对一些特殊不稳定情况会有效果。
可以在函数中主动抛出重试的异常ExceptionForRetry,框架也会立即自动重试。
主动抛出ExceptionForRequeue异常,则当前消息会重返中间件。
:param log_level:框架的日志级别,默认是debug级别,可以看到详细的执行信息,如果不想看到太多详细的日志,可以设置为logging.INFO常量(20) 或者数字20。
:param is_print_detail_exception:是否打印详细的堆栈错误。为0则打印简略的错误占用控制台屏幕行数少。
:param msg_schedule_time_intercal:消息调度的时间间隔,用于控频的关键。
:param qps:指定1秒内的函数执行次数,qps会覆盖msg_schedule_time_intercal,以后废弃msg_schedule_time_intercal这个参数。
:param msg_expire_senconds:消息过期时间,为0永不过期,为10则代表,10秒之前发布的任务如果现在才轮到消费则丢弃任务。
:param is_using_distributed_frequency_control: 是否使用分布式空频(依赖redis计数),默认只对当前实例化的消费者空频有效。假如实例化了2个qps为10的使用同一队列名的消费者,
并且都启动,则每秒运行次数会达到20。如果使用分布式空频则所有消费者加起来的总运行次数是10。
:param is_send_consumer_hearbeat_to_redis 时候将发布者的心跳发送到redis,有些功能的实现需要统计活跃消费者。因为有的中间件不是真mq。
:param logger_prefix: 日志前缀,可使不同的消费者生成不同的日志
:param create_logger_file : 是否创建文件日志
:param do_task_filtering :是否执行基于函数参数的任务过滤
:param task_filtering_expire_seconds:任务过滤的失效期,为0则永久性过滤任务。例如设置过滤过期时间是1800秒 ,
30分钟前发布过1 + 2 的任务,现在仍然执行,
如果是30分钟以内发布过这个任务,则不执行1 + 2,现在把这个逻辑集成到框架,一般用于接口价格缓存。
:param is_consuming_function_use_multi_params 函数的参数是否是传统的多参数,不为单个body字典表示多个参数。
:param is_do_not_run_by_specify_time_effect :是否使不运行的时间段生效
:param do_not_run_by_specify_time :不运行的时间段
:param schedule_tasks_on_main_thread :直接在主线程调度任务,意味着不能直接在当前主线程同时开启两个消费者。
:param function_result_status_persistance_conf :配置。是否保存函数的入参,运行结果和运行状态到mongodb。
这一步用于后续的参数追溯,任务统计和web展示,需要安装mongo。
:param is_using_rpc_mode 是否使用rpc模式,可以在发布端获取消费端的结果回调,但消耗一定性能,使用async_result.result时候会等待阻塞住当前线程。。
:param broker_kind:中间件种类,。 0 使用pika链接rabbitmqmq,1使用rabbitpy包实现的操作rabbitmnq,2使用redis,
3使用python内置Queue,4使用amqpstorm包实现的操作rabbitmq,5使用mongo,6使用本机磁盘持久化。
7使用nsq,8使用kafka,9也是使用redis但支持消费确认。10为sqlachemy,支持mysql sqlite postgre oracel sqlserver
11使用rocketmq. 12使用redis的 stream 数据结作为中间件,这个也支持消费确认 。13 zeromq
:return AbstractConsumer
'''
有人会抱怨入参超多很复杂,是因为要实现一切控制方式,实现的运行控制手段非常丰富,所以参数就会多。
看这个里面的参数解释非常重要,几乎能想到的控制功能全部都有。比如有人说日志太多,不想看那么详细的提示日志
,早就通过参数提供实现了,自己抱怨参数多又以为没提供这个功能,简直是自相矛盾。
想入参参数少那就看readme的6.5 “新增一个10行代码的函数的最精简乞丐版实现的分布式函数执行框架,演示最本质实现原理。“
这个例子的框架啥控制手段都没有,参数自然就很少。
'''
"""
consumer = get_consumer('queue_test2', consuming_function=f2, broker_kind=6)
# 推送需要消费的任务,可以变消费边推送。发布的内容字典需要和函数所能接收的参数一一对应,
# 并且函数参数需要能被json序列化,不要把自定义的类型作为消费函数的参数。
consumer.publisher_of_same_queue.clear()
[consumer.publisher_of_same_queue.publish(dict(a=i,b=i * 2)) for i in range(100)]
# 开始从中间件循环取出任务,使用指定的函数消费中间件里面的消息。
consumer.start_consuming_message()