从数据处理到并发编程
- 协程和生成器的关系
- 你可以合理的编写一些组件来连接协程和生成器
- 你可以自己做一个数据流,工作流的方式来处理这种场景下的数据
- 你可以自己编写一个事件驱动系统
一个常见的主题
- 将数据交由协程处理
- 将数据交由线程处理(通过queue)
- 将数据交由进程处理(通过消息)
线程和协程共同处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# -*- coding: utf-8 -*- from threading import Thread from queue import Queue def coroutine(func): def wrapper(*args, **kwargs): # start the func cr = func(*args, **kwargs) cr.next() return cr return wrapper def do_print(): while True: item = yield print item @coroutine def printer(): print 'coroutine printer has start ' try: while True: line = yield print line except GeneratorExit: print 'coroutine printer has stoped' @coroutine def threaded(target): messages = Queue() def run_target(): while True: item = messages.get() if item is GeneratorExit: target.close() return else: target.send(item) Thread(target=run_target).start() try: while True: item = yield messages.put(item) except GeneratorExit: messages.put(GeneratorExit) def do_thread(): s = threaded(printer()) s.send(1) for p in xrange(1000): s.send(p) do_thread() |
输出结果
1 2 3 4 5 6 7 8 9 10 11 12 13 |
... 990 991 992 993 994 995 996 997 998 999 coroutine printer has stoped |
我们首先实例化一个协程,并在func里面启动线程处理,并且不停的往message 里面写入要处理的数据,协程接收数据,线程处理数据
反思分析
- 我们不是写的while true的形式吗?为什么程序会自己结束?
- 如果不想结束,应该怎么写
我们知道,函数自己有一个结束的标志位,当整个的函数运行完时,会自己触发退出时,这样会默认的给里面的协程发送GeneratorExit,如果我们换成这种写法,不去定义一个do_thread的函数
1 2 3 4 5 |
s = threaded(printer()) s.send(1) for p in xrange(1000): s.send(p) |
当前的运行时就会一直存在,那如何退出呢,简单
1 2 3 4 5 6 7 |
s = threaded(printer()) s.send(1) for p in xrange(1000): s.send(p) s.send(GeneratorExit) |
拓展与环境
- 通过协程,你可以很轻松的将各个任务的执行切换到线程,进程或者其它的环境之内
- 协程可以直接当作执行环境