从数据处理到并发编程

  • 协程和生成器的关系
  • 你可以合理的编写一些组件来连接协程和生成器
  • 你可以自己做一个数据流,工作流的方式来处理这种场景下的数据
  • 你可以自己编写一个事件驱动系统

一个常见的主题

  • 将数据交由协程处理
  • 将数据交由线程处理(通过queue)
  • 将数据交由进程处理(通过消息)

线程和协程共同处理

    # -*- coding: utf-8 -*-
    
    from threading import Thread
    from queue import Queue
    
    
    def coroutine(func):
        def wrapper(*args, **kwargs):
            # start the func
            cr = func(*args, **kwargs)
            cr.next()
            return cr
    
        return wrapper
    
    
    def do_print():
        while True:
            item = yield
            print item
    
    
    @coroutine
    def printer():
        print 'coroutine printer has start '
        try:
            while True:
                line = yield
                print line
        except GeneratorExit:
            print 'coroutine printer has stoped'
    
    
    @coroutine
    def threaded(target):
        messages = Queue()
    
        def run_target():
            while True:
                item = messages.get()
                if item is GeneratorExit:
                    target.close()
                    return
                else:
                    target.send(item)
    
        Thread(target=run_target).start()
    
        try:
            while True:
                item = yield
                messages.put(item)
        except GeneratorExit:
            messages.put(GeneratorExit)
    
    
    def do_thread():
        s = threaded(printer())
        s.send(1)
        for p in xrange(1000):
            s.send(p)
    
    
    do_thread()

输出结果

    ...
    990
    991
    992
    993
    994
    995
    996
    997
    998
    999
    coroutine printer has stoped

我们首先实例化一个协程,并在func里面启动线程处理,并且不停的往message 里面写入要处理的数据,协程接收数据,线程处理数据

反思分析

  • 我们不是写的while true的形式吗?为什么程序会自己结束?
  • 如果不想结束,应该怎么写

我们知道,函数自己有一个结束的标志位,当整个的函数运行完时,会自己触发退出时,这样会默认的给里面的协程发送GeneratorExit,如果我们换成这种写法,不去定义一个do_thread的函数

    s = threaded(printer())
    s.send(1)
    for p in xrange(1000):
        s.send(p)

当前的运行时就会一直存在,那如何退出呢,简单

    s = threaded(printer())
    s.send(1)
    for p in xrange(1000):
        s.send(p)
    
    s.send(GeneratorExit)

拓展与环境

  • 通过协程,你可以很轻松的将各个任务的执行切换到线程,进程或者其它的环境之内
  • 协程可以直接当作执行环境