YAMAGUCHI::weblog

噛み付き地蔵に憧れて、この神の世界にやってきました。マドンナみたいな男の子、コッペです。

pyzmqでZeroMQを触ってみる (PUSH/PULL)

はじめに

こんにちは、Python界の情弱です。後れを取っています。さてREQ/REP, PUB/SUBに続いてPUSH/PULLも実装してみました。

参考

PUSH/PULL その1 (分割統治法)

ガイドに載っていた例は分割統治法の例で、ventilatorが1-1000の数字を発行し、workerがその数字に応じた負荷の処理を行い、その結果をsinkでまとめるという流れ。
f:id:ymotongpoo:20120329114034p:image

コード
  • ventilator
import zmq
from random import randint
import time

context = zmq.Context()
sender = context.socket(zmq.PUSH)
sender.bind("tcp://*:5557")

sink = context.socket(zmq.PUSH)
sink.connect("tcp://localhost:5558")

_ = raw_input("Press enter when the workers are ready:")
print "Sending tasks to workers..."

sink.send("0")

random.seed()
total_msec = 0
for task_number in range(100):
  workload = randint(1, 100)
  total_msec += workload
  sender.send(str(workload))

print "Total expected cost: %d msec" % total_msec
time.sleep(1.0)
  • worker.py
import zmq
import time

context = zmq.Context()
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")

sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:5558")

while True:
  message = receiver.recv()
  print "%s." % message

  time.sleep(int(message) / 1000.0)
  sender.send("")
  • sink.py
import zmq
import time

context = zmq.Context()
receiver = context.socket(zmq.PULL)
receiver.bind("tcp://*:5558")

message = receiver.recv()

start_time = time.time()
for task_number in range(0, 100):
  message = receiver.recv()
  if task_number % 10 == 0:
    print ":"
  else:
    print "."

print "Total elapsed time: %f msec" % ((time.time() - start_time) * 1000)
実行結果

実行はまずsink.pyを起動したあとにworker.pyを起動、そして最後にventilator.pyを起動して実行。結果はこうなった。

  • worker 1つ
% python ventilator.py
Press enter when the workers are ready:
Sending tasks to workers...
Total expected cost: 4773 msec
% python sink.py
...
Total elapsed time: 4801.482916 msec
  • worker 2つ
% python ventilator.py
Press enter when the workers are ready:
Sending tasks to workers...
Total expected cost: 5131 msec
% python sink.py
...
Total elapsed time: 2725.458145 msec
  • worker 4つ
% python ventilator.py
Press enter when the workers are ready:
Sending tasks to workers...
Total expected cost: 4865 msec
% python sink.py
...
Total elapsed time: 1348.852158 msec

ちゃんと時間が半分ずつになってっているので面白い。

PUSH/PULL その2

もっと単純にpublisherとconsumerだけ立てたらどうだろうと思って適当にコード書きました。

コード
  • calcpush.py
import zmq
import time
from random import randint

cxt = zmq.Context()
sender = cxt.socket(zmq.PUSH)
sender.bind("tcp://127.0.0.1:5555")

# wait for consumers connected to publisher
time.sleep(10)

sum = 0
for i in range(1000):
  value = randint(5000)
  sum += value
  print i, value
  sender.send(str(value))

print "sum is %d" % sum
time.sleep(10)
  • calcpull.py
import zmq
import sys
import time

cxt = zmq.Context()
receiver = cxt.socket(zmq.PULL)
receiver.connect("tcp://127.0.0.1:5555")

sum = 0
while True:
  message = receiver.recv()
  sum += int(message)
  print "worker %s: sum = %d" % (sys.argv[1], sum)
  time.sleep(0.1)
実行結果

まずconsumerを2つ立ち上げてからpublisherを立ち上げる。

% python calcpush.py
0 1901
1 1876
...
999 66
sum is 2505225
% python calcpull.py 1
worker 1: sum = 1876
...
worker 1: sum = 1251792
worker 1: sum = 1254235
worker 1: sum = 1254301
% python calcpull.py 2
worker 2: sum = 1901
...
worker 2: sum = 1250625
worker 2: sum = 1250924

ちゃんとconsumer 2つの合計が元のリストの合計になってる。めでたい。

おまけ

最初calcpush.pyの方でconsumerが接続するのを待たずにどんどんメッセージ送っちゃうようにしたらなぜか1つのconsumerしか処理してなくて、原因がわからずに困ってた。ドキュメントを見たらまさにそのことが書いてあった。ちゃんとドキュメント読めよ、って話だった。元の実装の方はユーザからの入力待ちのところで接続するまでの時間を稼いでたってことだったのね。指摘してくれた id:moriyoshi & id:Voluntas に感謝。

We have to synchronize the start of the batch with all workers being up and running. This is a fairly common gotcha in 〓MQ and there is no easy solution. The 'connect' method takes a certain time. So when a set of workers connect to the ventilator, the first one to successfully connect will get a whole load of messages in that short time while the others are also connecting. If you don't synchronize the start of the batch somehow, the system won't run in parallel at all. Try removing the wait, and see.

pyzmqでZeroMQを触ってみる (PUB/SUB)

はじめに

こんにちは、Python界の情弱です。前回に引き続きpyzmqでZeroMQを触ってみます。The Guideの流れに従ってREQ/REPの次はPUB/SUBをやってみました。

PUB/SUB

ZeroMQのガイドに載っているサンプルは天気予報を通知するPublisherと、それを受け取って特定の都市のデータだけフィルタして平均気温を表示するSubscriberがいるというやつだった。

コード
  • weathersrv(天気予報)
import zmq
import time
from random import randrange

context = zmq.Context()
publisher = context.socket(zmq.PUB)
publisher.bind("tcp://*:5556")

message_id = 0
while True:
  zipcode     = randrange(10001, 10010)
  temprature  = randrange(0, 215) - 80
  relhumidity = randrange(0, 50) + 10

  update = "%05d %d %d %d" % (zipcode, temprature, relhumidity, message_id)
  message_id += 1
  print update
  time.sleep(1.0)
  publisher.send(update)
  • weathercli
import zmq
import sys

context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5556")

# receive only message with zipcode being 10001
zipfilter = sys.argv if len(sys.argv) > 1 else "10001 "
subscriber.setsockopt(zmq.SUBSCRIBE, zipfilter)

update_samples = 10
for updates in range(update_samples):
  message = subscriber.recv()
  zipcode, temprature, relhumidity, message_id = message.split()
  print ("zip:%s, temp:%s, relh:%s, id:%s" % 
         (zipcode, temprature, relhumidity, message_id))
  total_temp = float(temprature)

print ("average temprature for zipcode '%s' was '%f'" % 
       (zipfilter, total_temp / update_samples))
実行結果

まずSubscriberを2つ起動してからPublisherを起動する。

% python weathercli.py
% python weathercli.py

この状態でPublisher起動。

% python weathersrv.py

すると

  • weathersrv.py
% python weathersrv.py
10005 -23 33 0
10001 11 56 1
10006 124 33 2
10006 7 49 3
10007 115 57 4
10009 76 10 5
10001 106 20 6
10009 -69 43 7
10007 14 24 8
10009 -24 51 9
10006 -32 58 10
10007 62 59 11
10004 2 40 12
10006 110 30 13
10006 53 31 14
...

と表示されて、Subsriberの方は

  • weathercli.py (#1)
% python weathercli.py
zip:10001, temp:11, relh:56, id:1
zip:10001, temp:106, relh:20, id:6
zip:10001, temp:68, relh:57, id:21
zip:10001, temp:89, relh:34, id:34
zip:10001, temp:-73, relh:54, id:35
zip:10001, temp:45, relh:31, id:45
zip:10001, temp:-25, relh:12, id:48
zip:10001, temp:125, relh:27, id:62
zip:10001, temp:-29, relh:35, id:83
zip:10001, temp:48, relh:33, id:88
average temprature for zipcode '10001 ' was '4.800000'
  • weathercli.py (#2)
% python weathercli.py
zip:10001, temp:11, relh:56, id:1
zip:10001, temp:106, relh:20, id:6
zip:10001, temp:68, relh:57, id:21
zip:10001, temp:89, relh:34, id:34
zip:10001, temp:-73, relh:54, id:35
zip:10001, temp:45, relh:31, id:45
zip:10001, temp:-25, relh:12, id:48
zip:10001, temp:125, relh:27, id:62
zip:10001, temp:-29, relh:35, id:83
zip:10001, temp:48, relh:33, id:88
average temprature for zipcode '10001 ' was '4.800000'

と同じ値が表示されました。めでたし。

おまけ

最初Publisherのほうのコードをビジーループで実装してたらSubscriberが受信するメッセージが全然違ってしまってすごく焦った。

pyzmqでZeroMQを触ってみる (REQ/REP)

はじめに

こんにちは、Python界の情弱です。ZeroMQを使って色々やろうと思うことがあったので、ZeroMQのガイドをPython版でやってみることにした。

REQ/REP

コード

Hello World Server/Client

import zmq
import time

context = zmq.Context()
responder = context.socket(zmq.REP)
responder.bind("tcp://localhost:5555")

while True:
  request = responder.recv()
  print "recieved request: [%s]" % request
  time.sleep(1.0)
  responder.send("World at %s" % time.strftime("%Y-%m-%dT%H:%M:%S"))
import zmq
import sys

context = zmq.Context()

# Socket to talk to server
print "connecting to hello world server"
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")

for request in range(1, 100):
  print "sending request %d ..." % request
  socket.send("Hello from %s" % sys.argv[1]) # show client id

  message = socket.recv()
  print "received reply %d [ %s ]" % (request, message)
実行結果

まずhelloworldsrv.pyを起動

% python helloworldsrv.py

次に2つのクライアントをそれぞれ起動

% python helloworldcli.py 1
% python helloworldcli.py 2

すると

  • helloworldsrv.py
% python helloworldsrv.py
recieved request: [Hello from 1]
recieved request: [Hello from 1]
recieved request: [Hello from 1]
recieved request: [Hello from 2]
recieved request: [Hello from 1]
recieved request: [Hello from 2]
recieved request: [Hello from 1]
recieved request: [Hello from 2]
  • helloworldcli.py
% python helloworldcli.py 1
connecting to hello world server
sending request 1 ...
received reply 1 [ World at 2012-03-27T22:15:56 ]
sending request 2 ...
received reply 2 [ World at 2012-03-27T22:15:57 ]
sending request 3 ...
received reply 3 [ World at 2012-03-27T22:15:58 ]
sending request 4 ...
received reply 4 [ World at 2012-03-27T22:16:00 ]
sending request 5 ...
received reply 5 [ World at 2012-03-27T22:16:02 ]
% python helloworldcli.py 2
connecting to hello world server
sending request 1 ...
received reply 1 [ World at 2012-03-27T22:15:59 ]
sending request 2 ...
received reply 2 [ World at 2012-03-27T22:16:01 ]
sending request 3 ...
received reply 3 [ World at 2012-03-27T22:16:03 ]

しかし、Amazonで色々探してみたけど、メッセージキューを使った設計に関する本とかって和書は出てないんだね。企業とかで結構使ってると思うんだけど、みんなどうやってんのかね。

Enterprise Integration Patterns: Designing, Building, and Deploying Messaging Solutions (Addison-Wesley Signature Series (Fowler))

Enterprise Integration Patterns: Designing, Building, and Deploying Messaging Solutions (Addison-Wesley Signature Series (Fowler))