回顾

  • 上一篇我们就简单的介绍了yield的两种场景
  • yield生产数据 (生成器)
  • yield消费数据 (协程)

Coroutines,Pipelines 以及Dataflow

数据流处理的pipeline [串行方式]

  • 我们使用coroutine 可以将数据以pipeline的方式进行处理 send() -> coroutine -> send() -> coroutine -> send() -> coroutine
  • 我们将整个的coroutines串行起来,使用.send()方法,将数据一层一层的处理就可以完成整个数的操作流了

数据流的源头

  • 整个的数据流的源头应该是一个生产者
  • 由数据源头来驱动整个数据流
    def source(target):
        while not done:
            item = produce_item()
            ....
            ....
            target.send(item)
        target.close()
  • 但是从技术层面上来讲,这并不是一个coroutine
  • 所有的pipeline 必须有一个终点(slink) send() -> coroutine() -> send() -> slink
  • 收集数据所有向它传入的数据并且 处理它们
    @coroutine
    def slink():
        try:
            while True:
                item = yield
        except GeneratorExit:
            # Done
            print 'the slink has ended now'

一个实际的例子

    import time
    
    
    def coroutine(func):
        def wrapper(*args, **kwargs):
            # start the func
            cr = func(*args, **kwargs)
            cr.next()
            return cr
        return wrapper
    
    
    def follow(thefile, target):
        while True:
            line = thefile.readline()
            if not line:
                time.sleep(0.1)
                continue
            target.send(line)
    
    
    @coroutine
    def grep(partten, target):
        while True:
            line = yield
            if partten in line:
                target.send(line)
    
    
    @coroutine
    def printer():
        print 'coroutine printer has start '
        try:
            while True:
                line = yield
                print line
        except GeneratorExit:
            print 'coroutine printer has stoped'
    
    
    thefile = open('test.txt', 'r')
    follow(
        thefile, grep('python', printer())
    )
    
  • 上面的代码示意 follow() -> grep() -> printer() 生产数据,传递数据,消费数据

以广播的形式来处理数据

    import time
    
    
    def coroutine(func):
        def wrapper(*args, **kwargs):
            # start the func
            cr = func(*args, **kwargs)
            cr.next()
            return cr
        return wrapper
    
    
    @coroutine
    def grep(partten, target):
        while True:
            line = yield
            if partten in line:
                target.send(line)
    
    
    @coroutine
    def printer():
        print 'coroutine printer has start '
        try:
            while True:
                line = yield
                print line
        except GeneratorExit:
            print 'coroutine printer has stoped'
    
    @coroutine
    def broadcast(targets):
        while True:
            line = yield
            if not line:
                time.sleep(0.1)
                continue
    
            for target in targets:
                target.send(line)
    
    def follow(thefile, broadcast):
        while True:
            line = thefile.readline()
    
            if not line:
                time.sleep(0.1)
                continue
    
            broadcast.send(line)
    
    
    thefile = open('test.txt', 'r')
    
    follow(
        thefile, broadcast([
            grep('python', printer()),
            grep('ssssss', printer()),
            grep('good', printer())
        ])
    )

图解

  • 上面的例子相当于生成了3个管道再处理数据
  • 如果只需要一个slink(最后落地的数据管道来处理呢)

分析

  • 协程提供了很多强大的数据流的功能,这可比单纯的迭代器强多了
  • 如果你要编写一些数据处理组件,你可以依靠上面的这些例子,来做分支,数据管道,或者合流等操作