はじめに
こんにちは、Python界の情弱です。後れを取っています。さてREQ/REP, PUB/SUBに続いてPUSH/PULLも実装してみました。
参考
- ØMQ - The Guide - ØMQ - The Guide
- 以下にある引用と画像はiMatix Corporationに帰属しています。(CC BY-SA 3.0)
PUSH/PULL その1 (分割統治法)
ガイドに載っていた例は分割統治法の例で、ventilatorが1-1000の数字を発行し、workerがその数字に応じた負荷の処理を行い、その結果をsinkでまとめるという流れ。
コード
- ventilator
import zmq from random import randint import time context = zmq.Context() sender = context.socket(zmq.PUSH) sender.bind("tcp://*:5557") sink = context.socket(zmq.PUSH) sink.connect("tcp://localhost:5558") _ = raw_input("Press enter when the workers are ready:") print "Sending tasks to workers..." sink.send("0") random.seed() total_msec = 0 for task_number in range(100): workload = randint(1, 100) total_msec += workload sender.send(str(workload)) print "Total expected cost: %d msec" % total_msec time.sleep(1.0)
- worker.py
import zmq import time context = zmq.Context() receiver = context.socket(zmq.PULL) receiver.connect("tcp://localhost:5557") sender = context.socket(zmq.PUSH) sender.connect("tcp://localhost:5558") while True: message = receiver.recv() print "%s." % message time.sleep(int(message) / 1000.0) sender.send("")
- sink.py
import zmq import time context = zmq.Context() receiver = context.socket(zmq.PULL) receiver.bind("tcp://*:5558") message = receiver.recv() start_time = time.time() for task_number in range(0, 100): message = receiver.recv() if task_number % 10 == 0: print ":" else: print "." print "Total elapsed time: %f msec" % ((time.time() - start_time) * 1000)
実行結果
実行はまずsink.pyを起動したあとにworker.pyを起動、そして最後にventilator.pyを起動して実行。結果はこうなった。
- worker 1つ
% python ventilator.py Press enter when the workers are ready: Sending tasks to workers... Total expected cost: 4773 msec
% python sink.py ... Total elapsed time: 4801.482916 msec
- worker 2つ
% python ventilator.py Press enter when the workers are ready: Sending tasks to workers... Total expected cost: 5131 msec
% python sink.py ... Total elapsed time: 2725.458145 msec
- worker 4つ
% python ventilator.py Press enter when the workers are ready: Sending tasks to workers... Total expected cost: 4865 msec
% python sink.py ... Total elapsed time: 1348.852158 msec
ちゃんと時間が半分ずつになってっているので面白い。
PUSH/PULL その2
もっと単純にpublisherとconsumerだけ立てたらどうだろうと思って適当にコード書きました。
コード
- calcpush.py
import zmq import time from random import randint cxt = zmq.Context() sender = cxt.socket(zmq.PUSH) sender.bind("tcp://127.0.0.1:5555") # wait for consumers connected to publisher time.sleep(10) sum = 0 for i in range(1000): value = randint(5000) sum += value print i, value sender.send(str(value)) print "sum is %d" % sum time.sleep(10)
- calcpull.py
import zmq import sys import time cxt = zmq.Context() receiver = cxt.socket(zmq.PULL) receiver.connect("tcp://127.0.0.1:5555") sum = 0 while True: message = receiver.recv() sum += int(message) print "worker %s: sum = %d" % (sys.argv[1], sum) time.sleep(0.1)
実行結果
まずconsumerを2つ立ち上げてからpublisherを立ち上げる。
% python calcpush.py 0 1901 1 1876 ... 999 66 sum is 2505225
% python calcpull.py 1 worker 1: sum = 1876 ... worker 1: sum = 1251792 worker 1: sum = 1254235 worker 1: sum = 1254301
% python calcpull.py 2 worker 2: sum = 1901 ... worker 2: sum = 1250625 worker 2: sum = 1250924
ちゃんとconsumer 2つの合計が元のリストの合計になってる。めでたい。
おまけ
最初calcpush.pyの方でconsumerが接続するのを待たずにどんどんメッセージ送っちゃうようにしたらなぜか1つのconsumerしか処理してなくて、原因がわからずに困ってた。ドキュメントを見たらまさにそのことが書いてあった。ちゃんとドキュメント読めよ、って話だった。元の実装の方はユーザからの入力待ちのところで接続するまでの時間を稼いでたってことだったのね。指摘してくれた id:moriyoshi & id:Voluntas に感謝。
We have to synchronize the start of the batch with all workers being up and running. This is a fairly common gotcha in 〓MQ and there is no easy solution. The 'connect' method takes a certain time. So when a set of workers connect to the ventilator, the first one to successfully connect will get a whole load of messages in that short time while the others are also connecting. If you don't synchronize the start of the batch somehow, the system won't run in parallel at all. Try removing the wait, and see.