Recap
- In the previous post we briefly introduced the two roles of yield:
- yield producing data (generators)
- yield consuming data (coroutines)
Coroutines, Pipelines, and Dataflow
A dataflow-processing pipeline (serial)
- Using coroutines, we can process data as a pipeline:
send() -> coroutine -> send() -> coroutine -> send() -> coroutine
- We chain the coroutines in series and push each item through one stage at a time with .send(), which gives us the whole processing flow.
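The chaining described above can be sketched minimally like this (the stage names `add_one` and `printer` are illustrative, not from the original text):

```python
# A minimal sketch of a two-stage send() pipeline.

def add_one(target):
    # middle stage: receives a number, transforms it, forwards it
    while True:
        n = yield
        target.send(n + 1)

def printer():
    # final stage: consumes whatever it receives
    while True:
        n = yield
        print(n)

p = printer()
next(p)          # prime the coroutine (advance it to the first yield)
a = add_one(p)
next(a)

for i in range(3):
    a.send(i)    # prints 1, 2, 3
```

Each `.send()` pushes one item through every stage synchronously before returning.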
The source of the dataflow
- The head of the whole dataflow should be a producer
- The source drives the entire stream
def source(target):
    while not done:
        item = produce_item()
        ...
        target.send(item)
    target.close()
- Technically speaking, though, this function is not itself a coroutine (it never yields); it only drives one.
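As a concrete sketch of the pattern above (the item stream and the sink here are invented for illustration):

```python
def source(items, target):
    # Drives the pipeline: a plain function, not a coroutine --
    # it only calls send() on others and never yields itself.
    for item in items:
        target.send(item)
    target.close()          # signal the end of the stream

def sink():
    received = []
    try:
        while True:
            received.append((yield))
    except GeneratorExit:   # raised inside the coroutine by close()
        print('got', len(received), 'items')

s = sink()
next(s)                          # prime the sink
source(['a', 'b', 'c'], s)       # prints: got 3 items
```

`target.close()` raises `GeneratorExit` at the sink's `yield`, which is how the end of the stream propagates.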
A pipeline sink (the terminal stage)
- Every pipeline must have an endpoint (a sink)
send() -> coroutine() -> send() -> sink
- It collects all the data sent to it and processes it
@coroutine
def sink():
    try:
        while True:
            item = yield
    except GeneratorExit:
        # Done
        print('the sink has ended now')
A practical example
import time

def coroutine(func):
    def wrapper(*args, **kwargs):
        # create and prime the coroutine
        cr = func(*args, **kwargs)
        next(cr)
        return cr
    return wrapper

def follow(thefile, target):
    # tail the file: forward each new line to the target coroutine
    while True:
        line = thefile.readline()
        if not line:
            time.sleep(0.1)
            continue
        target.send(line)

@coroutine
def grep(pattern, target):
    while True:
        line = yield
        if pattern in line:
            target.send(line)

@coroutine
def printer():
    print('coroutine printer has started')
    try:
        while True:
            line = yield
            print(line, end='')
    except GeneratorExit:
        print('coroutine printer has stopped')

thefile = open('test.txt', 'r')
follow(
    thefile, grep('python', printer())
)
- The code above sketches this flow:
follow() -> grep() -> printer()
produce data, pass it along, consume it
Processing data as a broadcast
import time

def coroutine(func):
    def wrapper(*args, **kwargs):
        # create and prime the coroutine
        cr = func(*args, **kwargs)
        next(cr)
        return cr
    return wrapper

@coroutine
def grep(pattern, target):
    while True:
        line = yield
        if pattern in line:
            target.send(line)

@coroutine
def printer():
    print('coroutine printer has started')
    try:
        while True:
            line = yield
            print(line, end='')
    except GeneratorExit:
        print('coroutine printer has stopped')

@coroutine
def broadcast(targets):
    # fan out: forward every received line to all targets
    # (follow() already filters out empty reads, so no sleep is needed here)
    while True:
        line = yield
        for target in targets:
            target.send(line)

def follow(thefile, target):
    while True:
        line = thefile.readline()
        if not line:
            time.sleep(0.1)
            continue
        target.send(line)

thefile = open('test.txt', 'r')
follow(
    thefile, broadcast([
        grep('python', printer()),
        grep('ssssss', printer()),
        grep('good', printer())
    ])
)
Diagram
- The example above effectively builds three pipelines to process the data
- What if you want only a single sink (one final stage where the data lands)?
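One way to answer that question (a sketch, not from the original text): create the printer once and pass the same instance to every grep branch, so all branches fan back into a single sink.

```python
# Sketch: fan out through several filters but fan back in to ONE sink.
# The same printer() instance is shared by every grep() branch.

def coroutine(func):
    def wrapper(*args, **kwargs):
        cr = func(*args, **kwargs)
        next(cr)          # prime the coroutine
        return cr
    return wrapper

@coroutine
def grep(pattern, target):
    while True:
        line = yield
        if pattern in line:
            target.send(line)

@coroutine
def printer():
    while True:
        line = yield
        print(line, end='')

@coroutine
def broadcast(targets):
    while True:
        line = yield
        for target in targets:
            target.send(line)

p = printer()             # one shared sink
pipe = broadcast([
    grep('python', p),
    grep('good', p),      # both branches send into the same p
])
pipe.send('python is good\n')   # printed twice, once per matching branch
```

Because coroutines are just objects, sharing one downstream instance is all it takes to merge the branches again.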
Analysis
- Coroutines give you powerful dataflow capabilities, far beyond what plain iterators offer
- If you write data-processing components, you can build on the examples above for branching, pipelines, or merging streams
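Merging, mentioned above, is just the mirror image of broadcasting: several upstream stages hold a reference to the same downstream coroutine and send into it. A sketch (the `tagger` stage and its labels are invented for illustration):

```python
def coroutine(func):
    def wrapper(*args, **kwargs):
        cr = func(*args, **kwargs)
        next(cr)          # prime the coroutine
        return cr
    return wrapper

@coroutine
def tagger(tag, target):
    # upstream stage: labels each item with its origin before forwarding
    while True:
        item = yield
        target.send((tag, item))

@coroutine
def merged_sink():
    while True:
        tag, item = yield
        print(tag, item)

sink = merged_sink()          # a single downstream coroutine
a = tagger('file-a', sink)    # two independent upstream branches...
b = tagger('file-b', sink)    # ...merging into the same sink

a.send('line 1')              # prints: file-a line 1
b.send('line 2')              # prints: file-b line 2
```

Tagging each item on the way in lets the shared sink tell the merged streams apart.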