回顾
- 上一篇我们就简单的介绍了yield的两种场景
- yield生产数据 (生成器)
- yield消费数据 (协程)
Coroutines,Pipelines 以及Dataflow
数据流处理的pipeline [串行方式]
- 我们使用coroutine 可以将数据以pipeline的方式进行处理
send() -> coroutine -> send() -> coroutine -> send() -> coroutine
-
我们将整个的coroutines串行起来,使用.send()方法,将数据一层一层的处理就可以完成整个数的操作流了
数据流的源头
- 整个的数据流的源头应该是一个生产者
- 由数据源头来驱动整个数据流
1 2 3 4 5 6 7 8 |
def source(target): while not done: item = produce_item() .... .... target.send(item) target.close() |
- 但是从技术层面上来讲,这并不是一个coroutine
pipeline slink (数据终端管道)
- 所有的pipeline 必须有一个终点(slink)
send() -> coroutine() -> send() -> slink
- 收集数据所有向它传入的数据并且 处理它们
1 2 3 4 5 6 7 8 9 |
@coroutine def slink(): try: while True: item = yield except GeneratorExit: # Done print 'the slink has ended now' |
一个实际的例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
import time def coroutine(func): def wrapper(*args, **kwargs): # start the func cr = func(*args, **kwargs) cr.next() return cr return wrapper def follow(thefile, target): while True: line = thefile.readline() if not line: time.sleep(0.1) continue target.send(line) @coroutine def grep(partten, target): while True: line = yield if partten in line: target.send(line) @coroutine def printer(): print 'coroutine printer has start ' try: while True: line = yield print line except GeneratorExit: print 'coroutine printer has stoped' thefile = open('test.txt', 'r') follow( thefile, grep('python', printer()) ) |
- 上面的代码示意
follow() -> grep() -> printer()
生产数据,传递数据,消费数据
以广播的形式来处理数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
import time def coroutine(func): def wrapper(*args, **kwargs): # start the func cr = func(*args, **kwargs) cr.next() return cr return wrapper @coroutine def grep(partten, target): while True: line = yield if partten in line: target.send(line) @coroutine def printer(): print 'coroutine printer has start ' try: while True: line = yield print line except GeneratorExit: print 'coroutine printer has stoped' @coroutine def broadcast(targets): while True: line = yield if not line: time.sleep(0.1) continue for target in targets: target.send(line) def follow(thefile, broadcast): while True: line = thefile.readline() if not line: time.sleep(0.1) continue broadcast.send(line) thefile = open('test.txt', 'r') follow( thefile, broadcast([ grep('python', printer()), grep('ssssss', printer()), grep('good', printer()) ]) ) |
图解
分析
- 协程提供了很多强大的数据流的功能,这可比单纯的迭代器强多了
- 如果你要编写一些数据处理组件,你可以依靠上面的这些例子,来做分支,数据管道,或者合流等操作