
Commit

Getting there...
Former-commit-id: 05c8031bca9f3bfa06940bd0e2dc0346cd5d725e
opoudjis committed Jan 23, 2016
1 parent 7d9b545 commit 90121e6
Showing 11 changed files with 729 additions and 7 deletions.
10 changes: 10 additions & 0 deletions launch_core.rb
@@ -95,6 +95,16 @@ def launch
'naplan.sifxmlout',
'naplan.csv',
'naplan.csvstudents',
'naplan.csv_staff',
'naplan.csvstaff',
'naplan.sifxml_staff',
'naplan.sifxml_staff.none',
'naplan.sifxml_staff.low',
'naplan.sifxml_staff.medium',
'naplan.sifxml_staff.high',
'naplan.sifxml_staff.extreme',
'naplan.sifxmlout_staff',
'naplan.csvstaff_out',
'test.test1',
'json.test',
'rspec.test'
4 changes: 3 additions & 1 deletion launch_nias.rb
@@ -64,7 +64,9 @@ def launch
'cons-oneroster-sms-storage.rb',
'cons-prod-oneroster-parser.rb',
'cons-prod-sif2scv-studentpersonal-naplanreg-parser.rb',
'cons-prod-csv2sif-studentpersonal-naplanreg-parser.rb'
'cons-prod-csv2sif-studentpersonal-naplanreg-parser.rb',
'cons-prod-csv2sif-staffpersonal-naplanreg-parser.rb',
'cons-prod-sif2csv-staffpersonal-naplanreg-parser.rb',
]

sms_services.each_with_index do | service, i |
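The launch loop itself is truncated in this hunk. Purely as a hedged sketch (not the actual launch_nias.rb code), the sms_services list could drive a loop like the following, assuming each listed script is started as its own Ruby process from sms/services/:

# Hedged sketch only: the real loop body is not shown in this diff.
sms_services.each_with_index do |service, i|
  pid = Process.spawn("ruby", "sms/services/#{service}")   # assumed path layout
  Process.detach(pid)
  puts "launched #{service} (#{i + 1}/#{sms_services.length}, pid #{pid})"
end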
115 changes: 115 additions & 0 deletions sms/services/cons-prod-csv2sif-staffpersonal-naplanreg-parser.rb
@@ -0,0 +1,115 @@
# cons-prod-csv2sif-staffpersonal-naplanreg-parser.rb

# consumer that reads in staffpersonal records from the naplan.csv_staff stream,
# and generates SIF XML equivalent records in the naplan.sifxmlout_staff stream


require 'nokogiri'
require 'json'
require 'poseidon'
require 'hashids'
require 'csv'
require 'securerandom'
require_relative 'cvsheaders-naplan'

@inbound = 'naplan.csv_staff'
@outbound = 'naplan.sifxmlout_staff'

@idgen = Hashids.new( 'nsip random temp uid' )

@servicename = 'cons-prod-csv2sif-staffpersonal-naplanreg-parser'

# create consumer
consumer = Poseidon::PartitionConsumer.new(@servicename, "localhost", 9092,
@inbound, 0, :latest_offset)


# set up producer pool - busier the broker the better for speed
producers = []
(1..10).each do | i |
p = Poseidon::Producer.new(["localhost:9092"], @servicename, {:partitioner => Proc.new { |key, partition_count| 0 } })
producers << p
end
@pool = producers.cycle

loop do

begin
messages = []
outbound_messages = []
messages = consumer.fetch

messages.each do |m|
row = JSON.parse(m.value)
# embedded newlines are unacceptable in field values; collapse them to a single space
row.each_key do |key|
row[key].gsub!(/[ ]*\n[ ]*/, " ") unless row[key].nil?
end

classcodes = row['ClassCode'].split(/,/)
classcodes_xml = ''
classcodes.each { |x| classcodes_xml << " <ClassCode>#{x}</ClassCode>\n" }

xml = <<XML
<StaffPersonal RefId="#{SecureRandom.uuid}">
<LocalId>#{row['LocalId']}</LocalId>
<PersonInfo>
<Name Type="LGL">
<FamilyName>#{row['FamilyName']}</FamilyName>
<GivenName>#{row['GivenName']}</GivenName>
</Name>
<EmailList>
<Email Type="01">#{row['EmailAddress']}</Email>
</EmailList>
</PersonInfo>
<MostRecent>
<SchoolLocalId>#{row['SchoolLocalId']}</SchoolLocalId>
<SchoolACARAId>#{row['ASLSchoolId']}</SchoolACARAId>
<LocalCampusId>#{row['LocalCampusId']}</LocalCampusId>
<NAPLANClassList>
#{classcodes_xml}
</NAPLANClassList>
<HomeGroup>#{row['Homegroup']}</HomeGroup>
</MostRecent>
</StaffPersonal>
XML

nodes = Nokogiri::XML( xml ) do |config|
config.nonet.noblanks
end
nodes.xpath('//StaffPersonal//child::*[not(node())]').each do |node|
node.remove
end
outbound_messages << Poseidon::MessageToSend.new( "#{@outbound}", nodes.root.to_s, "indexed" )
end

# send results to indexer to create sms data graph
outbound_messages.each_slice(20) do | batch |
@pool.next.send_messages( batch )
end


# puts "cons-prod-oneroster-parser: Resuming message consumption from: #{consumer.next_offset}"

rescue Poseidon::Errors::UnknownTopicOrPartition
puts "Topic #{@inbound} does not exist yet, will retry in 30 seconds"
sleep 30
end

# puts "Resuming message consumption from: #{consumer.next_offset}"

# trap to allow console interrupt
trap("INT") {
puts "\n#{@servicename} service shutting down...\n\n"
exit 130
}

sleep 1

end
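For orientation, a hedged sketch of how a staff record flows into this parser: the upstream CSV reader is assumed to publish each row as a JSON object on naplan.csv_staff, and the field names below match the keys the parser reads, with hypothetical values.

require 'json'
require 'poseidon'

# Hypothetical staff row, keyed the way this parser expects (row['LocalId'], etc.)
row = {
  'LocalId'       => 'STAFF001',
  'GivenName'     => 'Jane',
  'FamilyName'    => 'Citizen',
  'Homegroup'     => '7A',
  'ClassCode'     => '7A,7B',   # split on commas into separate <ClassCode> elements
  'ASLSchoolId'   => '12345',
  'SchoolLocalId' => 'SCH-01',
  'LocalCampusId' => '01',
  'EmailAddress'  => 'jane.citizen@example.edu.au'
}

feeder = Poseidon::Producer.new(["localhost:9092"], "staff-csv-feeder-example")
feeder.send_messages([Poseidon::MessageToSend.new('naplan.csv_staff', row.to_json)])
# The consumer above turns this row into a <StaffPersonal> element (one <ClassCode>
# per comma-separated value), drops empty child elements via the XPath pass, and
# republishes the XML on naplan.sifxmlout_staff.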





@@ -81,6 +81,7 @@ def Postcode2State( postcodestr )

messages.each do |m|
row = JSON.parse(m.value)

xml = <<XML
<StudentPersonal RefId="#{SecureRandom.uuid}">
<LocalId>#{row['LocalId']}</LocalId>
@@ -193,7 +194,7 @@ def Postcode2State( postcodestr )

# trap to allow console interrupt
trap("INT") {
puts "\ncons-prod-oneroster-parser service shutting down...\n\n"
puts "\n#{@servicename} service shutting down...\n\n"
exit 130
}

2 changes: 1 addition & 1 deletion sms/services/cons-prod-oneroster-parser.rb
@@ -157,7 +157,7 @@

# trap to allow console interrupt
trap("INT") {
puts "\ncons-prod-oneroster-parser service shutting down...\n\n"
puts "\n#{@servicename} service shutting down...\n\n"
exit 130
}

120 changes: 120 additions & 0 deletions sms/services/cons-prod-sif2csv-staffpersonal-naplanreg-parser.rb
@@ -0,0 +1,120 @@
# cons-prod-sif2csv-staffpersonal-naplanreg-parser.rb

# consumer that reads in staffpersonal records from the naplan.sifxml_staff.none stream,
# and generates csv equivalent records in the naplan.csvstaff_out stream


require 'json'
require 'nokogiri'
require 'poseidon'
require 'poseidon_cluster' # to track offset, which seems to get lost for bulk data
require 'hashids'
require 'csv'
require_relative 'cvsheaders-naplan'

@inbound = 'naplan.sifxml_staff.none'
@outbound = 'naplan.csvstaff_out'

@servicename = 'cons-prod-sif2csv-staffpersonal-naplanreg-parser'

@idgen = Hashids.new( 'nsip random temp uid' )

# create consumer
consumer = Poseidon::PartitionConsumer.new(@servicename, "localhost", 9092, @inbound, 0, :latest_offset)
#consumer = Poseidon::ConsumerGroup.new(@servicename, ["localhost:9092"], ["localhost:2181"], @inbound)

#puts "#{@servicename} fetching offset #{ consumer.offset(0) } "

# set up producer pool - busier the broker the better for speed
producers = []
(1..10).each do | i |
p = Poseidon::Producer.new(["localhost:9092"], @servicename, {:partitioner => Proc.new { |key, partition_count| 0 } })
producers << p
end
@pool = producers.cycle

def lookup_xpath(nodes, xpath)
@ret = nodes.at_xpath(xpath)
return "" if @ret.nil?
return @ret.child
end

def lookup_xpath_multi(nodes, xpath)
@ret = nodes.xpath(xpath)
return "" if @ret.nil?
return @ret.map { |x| x.child.to_s }.join(",")
# we don't want the native ruby csv encoding of nested arrays, which is "[""x", ""y""]", but just "x,y"
end

def csv_object2array(csv)
@ret = Array.new(@csvheaders_staff.length)
@csvheaders_staff.each_with_index do |key, i|
@ret[i] = csv[key]
end
return @ret
end

loop do

begin
messages = []
outbound_messages = []
messages = consumer.fetch

#puts "#{@servicename} fetching offset #{ consumer.offset(n) } "
#puts messages[0].value.lines[0..10].join("\n") + "\n\n" unless messages.empty?
messages.each do |m|

# create csv object
csv = { }
payload = m.value

# read xml message
nodes = Nokogiri::XML( payload ) do |config|
config.nonet.noblanks
end

type = nodes.root.name
next unless type == 'StaffPersonal'

csv['LocalId'] = lookup_xpath(nodes, "//xmlns:LocalId")
csv['FamilyName'] = lookup_xpath(nodes, "//xmlns:PersonInfo/xmlns:Name/xmlns:FamilyName")
csv['GivenName'] = lookup_xpath(nodes, "//xmlns:PersonInfo/xmlns:Name/xmlns:GivenName")
csv['Homegroup'] = lookup_xpath(nodes, "//xmlns:MostRecent/xmlns:HomeGroup")
csv['ClassCode'] = lookup_xpath_multi(nodes, "//xmlns:MostRecent/xmlns:NAPLANClassList/xmlns:ClassCode")
csv['ASLSchoolId'] = lookup_xpath(nodes, "//xmlns:MostRecent/xmlns:SchoolACARAId")
csv['SchoolLocalId'] = lookup_xpath(nodes, "//xmlns:MostRecent/xmlns:SchoolLocalId")
csv['LocalCampusId'] = lookup_xpath(nodes, "//xmlns:MostRecent/xmlns:LocalCampusId")
csv['EmailAddress'] = lookup_xpath(nodes, "//xmlns:PersonInfo/xmlns:EmailList/xmlns:Email")

# puts "\nParser Index = #{idx.to_json}\n\n"
outbound_messages << Poseidon::MessageToSend.new( "#{@outbound}", csv_object2array(csv).to_csv.chomp.gsub(/\s+/, " ") + "\n", "indexed" )

end
# send results to indexer to create sms data graph
outbound_messages.each_slice(20) do | batch |
#puts batch[0].value.lines[0..10].join("\n") + "\n\n" unless batch.empty?
@pool.next.send_messages( batch )
end
#end


# puts "cons-prod-sif-parser: Resuming message consumption from: #{consumer.next_offset}"

rescue Poseidon::Errors::UnknownTopicOrPartition
puts "Topic #{@inbound} does not exist yet, will retry in 30 seconds"
sleep 30
end

# puts "Resuming message consumption from: #{consumer.next_offset}"

# trap to allow console interrupt
trap("INT") {
puts "\n#{@servicename} service shutting down...\n\n"
exit 130
}

sleep 1

end
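The output column order comes entirely from @csvheaders_staff, defined in cvsheaders-naplan.rb further down in this commit. A small sketch of that ordering with hypothetical values; ReceiveAdditionalInformation and StaffSchoolRole are never populated by this parser, so they come through as empty trailing cells.

require 'csv'
require_relative 'cvsheaders-naplan'   # defines @csvheaders_staff

# Hypothetical extracted fields, shaped the way csv_object2array receives them
csv = {
  'LocalId'       => 'STAFF001',
  'GivenName'     => 'Jane',
  'FamilyName'    => 'Citizen',
  'Homegroup'     => '7A',
  'ClassCode'     => '7A,7B',
  'ASLSchoolId'   => '12345',
  'SchoolLocalId' => 'SCH-01',
  'LocalCampusId' => '01',
  'EmailAddress'  => 'jane.citizen@example.edu.au'
}

puts @csvheaders_staff.map { |key| csv[key] }.to_csv
# => STAFF001,Jane,Citizen,7A,"7A,7B",12345,SCH-01,01,jane.citizen@example.edu.au,,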

@@ -41,8 +41,8 @@ def lookup_xpath(nodes, xpath)
end

def csv_object2array(csv)
@ret = Array.new(@csvheaders.length)
@csvheaders.each_with_index do |key, i|
@ret = Array.new(@csvheaders_students.length)
@csvheaders_students.each_with_index do |key, i|
@ret[i] = csv[key]
end
return @ret
@@ -130,8 +130,9 @@ def csv_object2array(csv)
csv['StateTerritory'] = lookup_xpath(nodes, "//xmlns:PersonInfo/xmlns:AddressList/xmlns:Address[@Role = '012B']/xmlns:StateProvince")

# puts "\nParser Index = #{idx.to_json}\n\n"


outbound_messages << Poseidon::MessageToSend.new( "#{@outbound}", csv_object2array(csv).to_csv, "indexed" )
outbound_messages << Poseidon::MessageToSend.new( "#{@outbound}", csv_object2array(csv).to_csv.chomp.gsub(/\s+/, " ") + "\n", "indexed" )

end
# send results to indexer to create sms data graph
16 changes: 15 additions & 1 deletion sms/services/cvsheaders-naplan.rb
@@ -1,4 +1,4 @@
@csvheaders = [
@csvheaders_students = [
'LocalId',
'SectorId',
'DiocesanId',
@@ -56,3 +56,17 @@
'Postcode',
'StateTerritory',
]

@csvheaders_staff = [
'LocalId',
'GivenName',
'FamilyName',
'Homegroup',
'ClassCode',
'ASLSchoolId',
'SchoolLocalId',
'LocalCampusId',
'EmailAddress',
'ReceiveAdditionalInformation',
'StaffSchoolRole',
]
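These two arrays double as the canonical column order for the student and staff CSV streams. As a usage sketch (not code from this commit), a matching header row for the staff output can be emitted in one line:

require 'csv'
require_relative 'cvsheaders-naplan'

puts @csvheaders_staff.to_csv
# => LocalId,GivenName,FamilyName,Homegroup,ClassCode,ASLSchoolId,SchoolLocalId,LocalCampusId,EmailAddress,ReceiveAdditionalInformation,StaffSchoolRole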

0 comments on commit 90121e6
