Skip to content

Commit

Permalink
Very initial work on issue stompgem#121 analysis.
Browse files Browse the repository at this point in the history
  • Loading branch information
gmallard committed May 16, 2016
1 parent 148993c commit ed6df82
Show file tree
Hide file tree
Showing 2 changed files with 208 additions and 0 deletions.
113 changes: 113 additions & 0 deletions adhoc/issue121_01.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# -*- encoding: utf-8 -*-

require 'rubygems' if RUBY_VERSION < "1.9"
require 'stomp'

# Focus on this gem's capabilities.
require 'memory_profiler'

if Kernel.respond_to?(:require_relative)
require_relative("stomp_adhoc_common")
else
$LOAD_PATH << File.dirname(__FILE__)
require "stomp_adhoc_common"
end
include Stomp11Common

# Initial testing around issue #121.

class Issue121Examp01

attr_reader :client, :session

# Initialize.
def initialize(topic = false)
@client, @session, @topic = nil, nil, topic
@nmsgs = nmsgs()
@queue = make_destination("issue121/test_01")
@id = "issue121_01"
@block = cli_block()
end # initialize

# Startup
def start
#
client_hdrs = {"accept-version" => "1.1,1.2",
"host" => virt_host,
}
#
client_hash = { :hosts => [
{:login => login(), :passcode => passcode(), :host => host(), :port => port()},
],
:connect_headers => client_hdrs,
}
#
@client = Stomp::Client.new(client_hash)
puts "START: Client Connect complete"
raise "START: Connection failed!!" unless @client.open?
raise "START: Unexpected protocol level!!" if @client.protocol() == Stomp::SPL_10
cf = @client.connection_frame()
puts "START: Connection frame: #{cf}"
raise "START: Connect error!!: #{cf.body}" if @client.connection_frame().command == Stomp::CMD_ERROR
@session = @client.connection_frame().headers['session']
puts "START: Queue/Topic Name: #{@queue}"
puts "START: Session: #{@session}"
end # start

#
def shutdown
@client.close
puts "SHUT: Shutdown complete"
end # shutdown

def publish
m = "Message: "
@nmsgs.times do |n|
mo = "#{m} #{n}"
puts mo
hs = {:session => @session}
if @block
@client.publish(@queue,
mo,
hs) {|m|
ip = m
puts "PUB: HAVE_RECEIPT: #{ip}"
}
else
@client.publish(@queue, mo, hs)
end
end # do @nmsgs
end # publish

def subscribe
puts "SUB: Subscribe starts For: #{@queue}"
rmc, done = 0, false
sh = {:id => "#{@id}", :ack => "auto"}
@client.subscribe(@queue, sh) {|m|
rmc += 1
rm = m
puts "SUB: HAVE_MESSAGE: #{rm}"
if rmc >= @nmsgs
done = true
Thread.done
puts "SUB: Subscribe is ending for #{@queue}"
end
}
while !done do
ts = rand(6)
ts = 1 if ts == 0
break if done
end
puts "SUB: Receives Done For: #{@queue}"
end # subscribe

end # class

#
puts "BEG: Memory Profiler Version is: #{MemoryProfiler::VERSION}"
#
e = Issue121Examp01.new
e.start
e.publish
e.subscribe
e.shutdown
95 changes: 95 additions & 0 deletions adhoc/stomp_adhoc_common.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# -*- encoding: utf-8 -*-

#
# Common Stomp 1.1 code.
#
require "rubygems" if RUBY_VERSION < "1.9"
require "stomp"
#
module Stomp11Common
# Port Constants locally
STOMP_AMQ_PORT = ENV['STOMP_AMQ_PORT'] ? ENV['STOMP_AMQ_PORT'].to_i : 61613
STOMP_APOLLO_PORT = ENV['STOMP_APOLLO_PORT'] ? ENV['STOMP_APOLLO_PORT'].to_i : 62613
STOMP_ARTEMIS_PORT = ENV['STOMP_ARTEMIS_PORT'] ? ENV['STOMP_ARTEMIS_PORT'].to_i : 31613
STOMP_SSNG_PORT = ENV['STOMP_SSNG_PORT'] ? ENV['STOMP_SSNG_PORT'].to_i : 51613
STOMP_RMQ_PORT = ENV['STOMP_RMQ_PORT'] ? ENV['STOMP_RMQ_PORT'].to_i : 41613

# Vhost Constants
STOMP_RMQ_VHOST = ENV['STOMP_RMQ_VHOST'] || '/'
STOMP_VHOST = ENV['STOMP_VHOST'] || 'localhost'

# Client Protocol List
STOMP_PROTOCOL = ENV['STOMP_PROTOCOL'] || "1.2"

# User id
def login()
ENV['STOMP_USER'] || 'guest'
end
# Password
def passcode()
ENV['STOMP_PASSCODE'] || 'guest'
end
# Server host
def host()
ENV['STOMP_HOST'] || "localhost" # The connect host name
end
# Server port
def port()
if ENV['STOMP_AMQ']
STOMP_AMQ_PORT
elsif ENV['STOMP_APOLLO']
STOMP_APOLLO_PORT
elsif ENV['STOMP_RMQ']
STOMP_RMQ_PORT
elsif ENV['STOMP_SSNG']
STOMP_SSNG_PORT
elsif ENV['STOMP_PORT']
ENV['STOMP_PORT'].to_i
else
61613 # The default ActiveMQ stomp listener port
end
end
# Required vhost name
def virt_host()
if ENV['STOMP_RMQ']
STOMP_RMQ_VHOST
else
STOMP_VHOST
end
end
# Create a 1.1 commection
def get_connection()
conn_hdrs = {"accept-version" => STOMP_PROTOCOL,
"host" => virt_host(), # the vhost
}
conn_hash = { :hosts => [
{:login => login(), :passcode => passcode(), :host => host(), :port => port()},
],
:connect_headers => conn_hdrs,
}
conn = Stomp::Connection.new(conn_hash)
end

# Number of messages
def nmsgs()
(ENV['STOMP_NMSGS'] || 1).to_i # Number of messages
end

# Queue / Topic Name
def make_destination(right_part = nil, topic = false)
if ENV['STOMP_DOTQUEUE']
right_part.gsub!('/', '.')
end
if topic
"/topic/#{right_part}"
else
"/queue/#{right_part}"
end
end

# True if client should supply a receipt block on 'publish'
def cli_block()
ENV['STOMP_CLI_BLOCK']
end

end # module

0 comments on commit ed6df82

Please sign in to comment.