ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Gevent / Celery 에 대하여
    programming 2019. 3. 13. 19:18

    Python framework 를 이용하여 개발을 하다보면 Gevent, Celery를 붙여서 개발할 경우가 있다. 보통 mailing or push 서버처럼 많은 메세지를 처리해야 하는경우 메세지큐를 이용하고 Celery통해 worker들을 생성하여 다량의 메세지를 빠르게 처리 하며 대량의 네트워크 요청을 처리하기 위해 Gevent를 사용하여 성능개선을 한다.
    그리고 이 둘을 결합하여 사용하는 케이스들이 있다.

    하지만 대략적인 개념만 알고 쓰다보니 어느날 Gevent on Celery 구조의 프로젝트를 수정하려다보니 헷갈리는 부분이 있어서 정리하고자 포스팅을 한다.

    What is gevent?

    gevent is a coroutine -based Python networking library that uses greenlet to provide a high-level synchronous API on top of the libev event loop.

    Gevent 공식 홈페이지에 나와있는 설명이다.
    libev 기반의 동시성 라이브러리라는 소개인데 동시성과 libev가 무엇일까?

    동시성이란 파이썬에서 중요한 개념으로 파이썬에서는 병렬처리가 간단하지 않기 때문이다. 많은곳에서 설명하듯이 파이썬 언어 구조적 문제로 GIL로 인한 IO가 동시에 불가능하기 때문에(쓰레드에서 IO를 발생하면 GIL를 들고가야한다. 따라서 1쓰레드에서 IO를 할때 다른 쓰레드에서 IO는 불가능하다) 보통 단일쓰레드 구조라고 많이들 이야기한다. 하지만 이런 문제를 해결하기 위해 Multiprocessing을 통해 퍼포먼스를 개선하고 IO부분은 동시성을 통해 처리를한다.
    따라서 병렬에 한계가 존재하기 때문에 퍼포먼스를 위해서 동시성으로 눈을 돌렸다.

    Concurrency / Coroutine ?

    동시성을 위해 Coroutine(코루틴)을 이용하며 코루틴을 예전 Unity를 잠깐 개발할 일이 있을때 처음 접했던것 같다.
    파이썬을 써본 사람이라면 yield를 써보거나 코드에서 본 적이 있을것이다.
    예를들어 함수 안에서 yield를 호출하게 되면 yield 지점에서 return 과 같이 함수를 호출한 지점으로 제어권이 넘어가고 그 상태로 계속 진행이된다. yield를 호출한 지점으로 돌아가고 싶다면 next를 호출하여 yield 이후 부터 다시 진행을 할 수 있다.
    이처럼 제어권을 넘기고 다시 제어권을 넘겨준 쪽으로 돌아가고 이러한 개념을 코루틴이라 한다.
    #link(python coroutine libs)

    libev

    libev는 동시에 많은 클라이언트의 요청을 처리하기 위한 고성능 어플리케이션에서 사용될 수 있다.
    libev는 이벤트 처리 방식을 eventloop 방식으로 성능개선을 하였으며 eventloop은 이벤트를 처리하는 중 IO 요청이 오면 커널에게 IO 작업 후 호출할 callback을 등록시키고 다른 요청을 처리하는 것 이다.
    libev는 libevent가 만들어진 이후에 생겼으며 file fd가 많아질 수록 libev가 퍼포먼스 측면에서 유리하다고 한다.
    참고 URL

    다시 Gevent..

    Gevent로 non-blocking socket IO를 이용하기 위한 코드는 간단하다.

    from gevent import monkey
    monkey.patch_all()

    보통 monkey patch_all을 통해서 간단하게 사용할 수 있기 때문에 이렇게 많이들 권장한다. 저 코드가 하는일은 non-blocking library로 교체를 해준다.

    def patch_all(socket=True, dns=True, time=True, select=True, thread=True, os=True, ssl=True, httplib=False,
    subprocess=True, sys=False, aggressive=True, Event=False,
    builtins=True, signal=True):
    """
    Do all of the default monkey patching (calls every other applicable
    function in this module).
    .. versionchanged:: 1.1
    Issue a :mod:`warning <warnings>` if this function is called multiple times
    with different arguments. The second and subsequent calls will only add more
    patches, they can never remove existing patches by setting an argument to ``False``.
    .. versionchanged:: 1.1
    Issue a :mod:`warning <warnings>` if this function is called with ``os=False``
    and ``signal=True``. This will cause SIGCHLD handlers to not be called. This may
    be an error in the future.
    """
    # Check to see if they're changing the patched list
    _warnings, first_time = _check_repatching(**locals())
    if not _warnings and not first_time:
    # Nothing to do, identical args to what we just
    # did
    return
    # order is important
    if os:
    patch_os()
    if time:
    patch_time()
    if thread:
    patch_thread(Event=Event)
    # sys must be patched after thread. in other cases threading._shutdown will be
    # initiated to _MainThread with real thread ident
    if sys:
    patch_sys()
    if socket:
    patch_socket(dns=dns, aggressive=aggressive)
    if select:
    patch_select(aggressive=aggressive)
    if ssl:
    patch_ssl()
    if httplib:
    raise ValueError('gevent.httplib is no longer provided, httplib must be False')
    if subprocess:
    patch_subprocess()
    if builtins:

    별도로 networking 관련 모듈을 안쓴다면 patch_all을 해도 상관없지만 예전에 smtp 모듈을 이용하여 메일 발송하는 부분이 있었는데 patch_all을 하니 smtp 프로토콜 모듈에 non-blocking 구현된 모듈이 없어서 patch_all을 쓰지 못했던 경험이 있으니 참고하면 좋을 것 같다.

    socket 통신 외에 동시성으로 처리를 해보자.

    import time
    import gevent
    from gevent import select

    start = time.time()
    tic = lambda: 'at %1.1f seconds' % (time.time() - start)

    def gr1():
    # Busy waits for a second, but we don't want to stick around...
    print('Started Polling: %s' % tic())
    select.select([], [], [], 2)
    print('Ended Polling: %s' % tic())

    def gr2():
    # Busy waits for a second, but we don't want to stick around...
    print('Started Polling: %s' % tic())
    select.select([], [], [], 2)
    print('Ended Polling: %s' % tic())

    def gr3():
    print("Hey lets do some stuff while the greenlets poll, %s" % tic())
    gevent.sleep(1)

    gevent.joinall([
    gevent.spawn(gr1),
    gevent.spawn(gr2),
    gevent.spawn(gr3),
    ])
    Result:
    Started Polling: at 0.0 seconds
    Started Polling: at 0.0 seconds
    Hey lets do some stuff while the greenlets poll, at 0.0 seconds
    Ended Polling: at 2.0 seconds
    Ended Polling: at 2.0 seconds

    이 처럼 spawn과 joinall을 이용하여 가능하니 참고하자.

    Celery

    Distributed Task Queue .. 설명을 잘 해놓았다.
    길게쓰다보니 지쳐서 간략하게 쓰겠다.

    Celery는 Task Queue이며 Worker들을 여러개 생성하며 Broker에서 Task를 꺼내와서 처리한다. 이때 동시성을 위해 Concurrency 셋팅과 Pool을 gevent를 사용할 수 있다.

    Celery option

    위에서 언급된 Broker의 Message queue로 default가 Rabbitmq로 알고있다. 하지만 많은 사람들이 Redis가 편해서 인지 Redis로 많이들 쓰고 나도 쓴적이 있다.
    그런데 pycon2014자료였나? 거기서 Celery 자료가 있는데 Rabbitmq를 써야 퍼포먼스가 더 좋다고 Celery 개발자도 권고하는 사항이라고 본거같다.
    따라서 별 이슈 없으면 Rabbitmq를 쓰도록 하자.

    이외에 알아야할 만한 부분은 크게 없으니 필요한 부분은 더 찾아보면 될거 같다.

    반응형

    댓글

Designed by Tistory.