読者です 読者をやめる 読者になる 読者になる

YAMAGUCHI::weblog

土足で窓から失礼いたします。今日からあなたの息子になります。 当年とって92歳、下町の発明王、エジソンです。

pyzmqでZeroMQを触ってみる (PUSH/PULL)

Python ZeroMQ

はじめに

こんにちは、Python界の情弱です。後れを取っています。さてREQ/REP, PUB/SUBに続いてPUSH/PULLも実装してみました。

参考

PUSH/PULL その1 (分割統治法)

ガイドに載っていた例は分割統治法の例で、ventilatorが1-1000の数字を発行し、workerがその数字に応じた負荷の処理を行い、その結果をsinkでまとめるという流れ。
f:id:ymotongpoo:20120329114034p:image

コード
  • ventilator
import zmq
from random import randint
import time

context = zmq.Context()
sender = context.socket(zmq.PUSH)
sender.bind("tcp://*:5557")

sink = context.socket(zmq.PUSH)
sink.connect("tcp://localhost:5558")

_ = raw_input("Press enter when the workers are ready:")
print "Sending tasks to workers..."

sink.send("0")

random.seed()
total_msec = 0
for task_number in range(100):
  workload = randint(1, 100)
  total_msec += workload
  sender.send(str(workload))

print "Total expected cost: %d msec" % total_msec
time.sleep(1.0)
  • worker.py
import zmq
import time

context = zmq.Context()
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")

sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:5558")

while True:
  message = receiver.recv()
  print "%s." % message

  time.sleep(int(message) / 1000.0)
  sender.send("")
  • sink.py
import zmq
import time

context = zmq.Context()
receiver = context.socket(zmq.PULL)
receiver.bind("tcp://*:5558")

message = receiver.recv()

start_time = time.time()
for task_number in range(0, 100):
  message = receiver.recv()
  if task_number % 10 == 0:
    print ":"
  else:
    print "."

print "Total elapsed time: %f msec" % ((time.time() - start_time) * 1000)
実行結果

実行はまずsink.pyを起動したあとにworker.pyを起動、そして最後にventilator.pyを起動して実行。結果はこうなった。

  • worker 1つ
% python ventilator.py
Press enter when the workers are ready:
Sending tasks to workers...
Total expected cost: 4773 msec
% python sink.py
...
Total elapsed time: 4801.482916 msec
  • worker 2つ
% python ventilator.py
Press enter when the workers are ready:
Sending tasks to workers...
Total expected cost: 5131 msec
% python sink.py
...
Total elapsed time: 2725.458145 msec
  • worker 4つ
% python ventilator.py
Press enter when the workers are ready:
Sending tasks to workers...
Total expected cost: 4865 msec
% python sink.py
...
Total elapsed time: 1348.852158 msec

ちゃんと時間が半分ずつになってっているので面白い。

PUSH/PULL その2

もっと単純にpublisherとconsumerだけ立てたらどうだろうと思って適当にコード書きました。

コード
  • calcpush.py
import zmq
import time
from random import randint

cxt = zmq.Context()
sender = cxt.socket(zmq.PUSH)
sender.bind("tcp://127.0.0.1:5555")

# wait for consumers connected to publisher
time.sleep(10)

sum = 0
for i in range(1000):
  value = randint(5000)
  sum += value
  print i, value
  sender.send(str(value))

print "sum is %d" % sum
time.sleep(10)
  • calcpull.py
import zmq
import sys
import time

cxt = zmq.Context()
receiver = cxt.socket(zmq.PULL)
receiver.connect("tcp://127.0.0.1:5555")

sum = 0
while True:
  message = receiver.recv()
  sum += int(message)
  print "worker %s: sum = %d" % (sys.argv[1], sum)
  time.sleep(0.1)
実行結果

まずconsumerを2つ立ち上げてからpublisherを立ち上げる。

% python calcpush.py
0 1901
1 1876
...
999 66
sum is 2505225
% python calcpull.py 1
worker 1: sum = 1876
...
worker 1: sum = 1251792
worker 1: sum = 1254235
worker 1: sum = 1254301
% python calcpull.py 2
worker 2: sum = 1901
...
worker 2: sum = 1250625
worker 2: sum = 1250924

ちゃんとconsumer 2つの合計が元のリストの合計になってる。めでたい。

おまけ

最初calcpush.pyの方でconsumerが接続するのを待たずにどんどんメッセージ送っちゃうようにしたらなぜか1つのconsumerしか処理してなくて、原因がわからずに困ってた。ドキュメントを見たらまさにそのことが書いてあった。ちゃんとドキュメント読めよ、って話だった。元の実装の方はユーザからの入力待ちのところで接続するまでの時間を稼いでたってことだったのね。指摘してくれた id:moriyoshi & id:Voluntas に感謝。

We have to synchronize the start of the batch with all workers being up and running. This is a fairly common gotcha in 〓MQ and there is no easy solution. The 'connect' method takes a certain time. So when a set of workers connect to the ventilator, the first one to successfully connect will get a whole load of messages in that short time while the others are also connecting. If you don't synchronize the start of the batch somehow, the system won't run in parallel at all. Try removing the wait, and see.