
Target logging, Streaming API cleanup, Improved Log Message Component #227

Merged
merged 25 commits, Nov 8, 2022
Changes from 11 commits
Commits
25 commits
1de2059  data reduction updates in work (ryanmelt, Sep 29, 2022)
f375a1e  Start target logging (ryanmelt, Oct 13, 2022)
3ec2c99  merge master (ryanmelt, Oct 20, 2022)
36fd796  Merge branch 'reduction_updates' into target_logging (ryanmelt, Oct 21, 2022)
e15e328  continue updating stream_api, and log formats (ryanmelt, Oct 28, 2022)
5729749  merge master (ryanmelt, Oct 28, 2022)
7a6651d  Backend architecture complete (ryanmelt, Oct 31, 2022)
b4113e4  fixing unit tests (ryanmelt, Oct 31, 2022)
beef6a6  unit tests passing (ryanmelt, Nov 1, 2022)
08b0134  Unit tests passing (ryanmelt, Nov 2, 2022)
fe9b0ed  Remove debugging puts (ryanmelt, Nov 2, 2022)
2b278f2  create buckets only in init container (ryanmelt, Nov 3, 2022)
b983a68  fix bucket init (ryanmelt, Nov 3, 2022)
7579060  fix specs (ryanmelt, Nov 3, 2022)
b25cfd2  Control compose logging. Existing UI streams working without reduced (ryanmelt, Nov 5, 2022)
0912c8d  Merge branch 'master' into target_logging (jmthomas, Nov 6, 2022)
86cb05b  Remove AstroBadge. Full Reduced support in DataExtractor. Fix Bugs (ryanmelt, Nov 6, 2022)
6a9a7c0  Merge branch 'target_logging' of https://github.com/OpenC3/openc3 int… (ryanmelt, Nov 6, 2022)
e8239d0  Add keymap to log file format. (ryanmelt, Nov 7, 2022)
8de6baa  cbor encode logs. Gzip logged data (ryanmelt, Nov 7, 2022)
01ffc29  Fix duplicate detection for playwright (ryanmelt, Nov 7, 2022)
7ec89b5  give plw more time to shutdown in spec (ryanmelt, Nov 7, 2022)
844389d  Reduce only tlm files. Format Microservice timestamps. Prevent reproc… (ryanmelt, Nov 7, 2022)
06b266c  Merge branch 'master' into target_logging (ryanmelt, Nov 7, 2022)
e701979  fix tests and operator_process finalize (ryanmelt, Nov 8, 2022)
255 changes: 132 additions & 123 deletions openc3-cmd-tlm-api/app/models/logged_streaming_thread.rb
@@ -20,157 +20,166 @@
# TODO : Handoff to realtime thread

require_relative 'streaming_thread'
require_relative 'streaming_object_file_reader'
OpenC3.require_file 'openc3/utilities/bucket_file_cache'

class LoggedStreamingThread < StreamingThread
ALLOWABLE_START_TIME_OFFSET_NSEC = 60 * Time::NSEC_PER_SECOND

def initialize(thread_id, channel, collection, stream_mode, max_batch_size = 100, scope:)
super(channel, collection, stream_mode, max_batch_size)
@thread_id = thread_id
def initialize(streaming_api, collection, max_batch_size = 100, scope:)
super(streaming_api, collection, max_batch_size)
@thread_mode = :SETUP
# Reduced has no Redis streams so go direct to file
@thread_mode = :FILE if stream_mode.to_s.upcase.include?("REDUCED")
@scope = scope
end

def thread_body
objects = @collection.objects_by_thread_id[@thread_id]
objects = @collection.objects
# Cancel if we don't have any objects ... this can happen as things are processed
# or if someone calls remove() from the StreamingApi
@cancel_thread = true unless objects and objects.length > 0
return if @cancel_thread

first_object = objects[0]
if @thread_mode == :SETUP
# The goal of this mode is to determine if we are starting with files or from
# realtime

# start_time can be at most 1 minute in the future to prevent
# spinning up threads that just block forever
if (first_object.start_time - ALLOWABLE_START_TIME_OFFSET_NSEC) > Time.now.to_nsec_from_epoch
OpenC3::Logger.info "Finishing stream start_time too far in future"
finish(objects)
@cancel_thread = true
return
end
setup_thread_body(objects)
elsif @thread_mode == :STREAM
redis_thread_body()
@cancel_thread = attempt_handoff_to_realtime()
else # @thread_mode == :FILE
file_thread_body(objects)
end
end

# Check the topic to figure out what we have in Redis
oldest_msg_id, oldest_msg_hash = OpenC3::Topic.get_oldest_message(first_object.topic)
def setup_thread_body(objects)
first_object = objects[0]

if oldest_msg_id
# We have data in Redis
# Determine oldest timestamp in stream to determine if we need to go to file
oldest_time = oldest_msg_hash['time'].to_i
# The goal of this mode is to determine if we are starting with files or from
# realtime

# OpenC3::Logger.debug "first start time:#{first_object.start_time} oldest:#{oldest_time}"
if first_object.start_time < oldest_time
# Stream from Files
@thread_mode = :FILE
else
if first_object.end_time and first_object.end_time < oldest_time
# Bad times - just end
OpenC3::Logger.info "Finishing stream - start_time after end_time"
finish(objects)
@cancel_thread = true
return
else
# Stream from Redis
# Guesstimate start offset in stream based on first packet time and redis time
redis_time = oldest_msg_id.split('-')[0].to_i * 1_000_000
delta = redis_time - oldest_time
# Start streaming from calculated redis time
offset = ((first_object.start_time + delta) / 1_000_000).to_s + '-0'
# OpenC3::Logger.debug "stream from Redis offset:#{offset} redis_time:#{redis_time} delta:#{delta}"
objects.each {|object| object.offset = offset}
@thread_mode = :STREAM
end
end
else
# Might still have data in files
@thread_mode = :FILE
end
elsif @thread_mode == :STREAM
objects_by_topic = { objects[0].topic => objects }
redis_thread_body([first_object.topic], [first_object.offset], objects_by_topic)
else # @thread_mode == :FILE
# Get next file from file cache
file_end_time = first_object.end_time
file_end_time = Time.now.to_nsec_from_epoch unless file_end_time
file_path = BucketFileCache.instance.reserve_file(first_object.cmd_or_tlm, first_object.target_name, first_object.packet_name,
first_object.start_time, file_end_time, @stream_mode, scope: @scope) # TODO: look at how @stream_mode is being used
if file_path
file_path_split = File.basename(file_path).split("__")
file_end_time = DateTime.strptime(file_path_split[1], BucketFileCache::TIMESTAMP_FORMAT).to_f * Time::NSEC_PER_SECOND # TODO: get format from different class' constant?

# Scan forward to find first packet needed
# Stream forward until packet > end_time or no more packets
results = []
plr = OpenC3::PacketLogReader.new()
topic_without_hashtag = first_object.topic.gsub(/{|}/, '') # This removes all curly braces, and we don't allow curly braces in our keys
done = plr.each(file_path, false, Time.from_nsec_from_epoch(first_object.start_time), Time.from_nsec_from_epoch(first_object.end_time)) do |packet|
time = packet.received_time if packet.respond_to? :received_time
time ||= packet.packet_time
result = nil
if @stream_mode == :RAW
result = handle_raw_packet(packet.buffer, objects, time.to_nsec_from_epoch, topic_without_hashtag)
else # @stream_mode == :DECOM
result = handle_json_packet(packet, objects, topic_without_hashtag)
end
if result
results << result
else
break
end
if results.length > @max_batch_size
transmit_results(results)
results.clear
end
break if @cancel_thread
end
transmit_results(results)
@last_file_redis_offset = plr.redis_offset
# start_time can be at most 1 minute in the future to prevent
# spinning up threads that just block forever
if (first_object.start_time - ALLOWABLE_START_TIME_OFFSET_NSEC) > Time.now.to_nsec_from_epoch
OpenC3::Logger.info "Finishing stream start_time too far in future"
finish(objects)
@cancel_thread = true
return
end

# Move to the next file
BucketFileCache.instance.unreserve_file(file_path)
objects.each {|object| object.start_time = file_end_time}
# Check the topic to figure out what we have in Redis
oldest_msg_id, oldest_msg_hash = OpenC3::Topic.get_oldest_message(first_object.topic)

if done # We reached the end time
OpenC3::Logger.info "Finishing stream for topic: #{first_object.topic} - End of files"
finish(objects)
end
if oldest_msg_id
# We have data in Redis
# Determine oldest timestamp in stream to determine if we need to go to file
oldest_time = oldest_msg_hash['time'].to_i

# OpenC3::Logger.debug "first start time:#{first_object.start_time} oldest:#{oldest_time}"
if first_object.start_time < oldest_time
# Stream from Files
@thread_mode = :FILE
else
# Switch to stream from Redis
# Determine oldest timestamp in stream
msg_id, msg_hash = OpenC3::Topic.get_oldest_message(first_object.topic)
if msg_hash
OpenC3::Logger.info "Switch stream from file to Redis"
oldest_time = msg_hash['time'].to_i
if first_object.end_time and first_object.end_time < oldest_time
# Bad times - just end
OpenC3::Logger.info "Finishing stream - start_time after end_time"
finish(objects)
@cancel_thread = true
return
else
# Stream from Redis
offset = @last_file_redis_offset if @last_file_redis_offset
if !offset
# Guesstimate start offset in stream based on first packet time and redis time
redis_time = msg_id.split('-')[0].to_i * 1000000
delta = redis_time - oldest_time
# Start streaming from calculated redis time
offset = ((first_object.start_time + delta) / 1_000_000).to_s + '-0'
end
OpenC3::Logger.debug "Oldest Redis id:#{msg_id} msg time:#{oldest_time} last object time:#{first_object.start_time} offset:#{offset}"
# Guesstimate start offset in stream based on first packet time and redis time
redis_time = oldest_msg_id.split('-')[0].to_i * 1_000_000
delta = redis_time - oldest_time
# Start streaming from calculated redis time
offset = ((first_object.start_time + delta) / 1_000_000).to_s + '-0'
# OpenC3::Logger.debug "stream from Redis offset:#{offset} redis_time:#{redis_time} delta:#{delta}"
objects.each {|object| object.offset = offset}
@thread_mode = :STREAM
else
OpenC3::Logger.info "Finishing stream for topic: #{first_object.topic} - No data in Redis"
finish(objects)
end
end
else
# Might still have data in files
@thread_mode = :FILE
end
end
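The Redis offset guesstimate in setup_thread_body above can be sketched as a standalone helper. This is a minimal illustration, not part of the PR: the helper name and the sample stream ID are hypothetical, but the arithmetic mirrors the code above — Redis stream IDs are `"<milliseconds>-<sequence>"` while packet times are nanoseconds from epoch, so the delta between the ID clock and the packet clock is added before converting back to milliseconds.

```ruby
# Hypothetical sketch of the offset calculation in setup_thread_body.
# oldest_msg_id:          Redis stream ID of the oldest entry ("ms-seq")
# oldest_packet_time_nsec: packet timestamp of that entry, ns from epoch
# start_time_nsec:         requested stream start, ns from epoch
def estimate_redis_offset(oldest_msg_id, oldest_packet_time_nsec, start_time_nsec)
  # Convert the ID's millisecond clock to nanoseconds
  redis_time_nsec = oldest_msg_id.split('-')[0].to_i * 1_000_000
  # Offset between the Redis ID clock and the packet timestamps
  delta = redis_time_nsec - oldest_packet_time_nsec
  # Map the requested start time onto the ID clock, back to milliseconds
  ((start_time_nsec + delta) / 1_000_000).to_s + '-0'
end
```

For example, if the oldest entry has ID 1667800000000-0 but its packet was timestamped one second earlier, a start time equal to that packet time maps back onto the ID's own millisecond clock.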

def file_thread_body(objects)
topics, offsets, item_objects_by_topic, packet_objects_by_topic = @collection.topics_offsets_and_objects
results = []

# This will read out packets until nothing is left
file_reader = StreamingObjectFileReader.new(@collection, scope: @scope)
done = file_reader.each do |packet, topic|
break if @cancel_thread

# Transfers item to realtime thread when complete (if continued)
# Needs to mutex transfer
# checks if equal offset if packet already exists in realtime
# if doesn't exist adds with item offset
# if does exist and equal - transfer
# if does exist and less than - add item with less offset
# if does exist and greater than - catch up and try again
# Get the item objects that need this topic
objects = item_objects_by_topic[topic]

break if @cancel_thread
if objects and objects.length > 0
result_entry = handle_packet(packet, objects)
results << result_entry if result_entry
end
break if @cancel_thread

# Transmit if we have a full batch or more
if results.length >= @max_batch_size
@streaming_api.transmit_results(results)
results.clear
end

# Get the packet objects that need this topic
objects = packet_objects_by_topic[topic]

if objects
objects.each do |object|
break if @cancel_thread
result_entry = handle_packet(packet, [object])
results << result_entry if result_entry
# Transmit if we have a full batch or more
if results.length >= @max_batch_size
@streaming_api.transmit_results(results)
results.clear
end
end
end

break if @cancel_thread
end
return if @cancel_thread

# Transmit less than a batch if we have that
@streaming_api.transmit_results(results)
results.clear

if done # We reached the end time
OpenC3::Logger.info "Finishing LoggedStreamingThread for #{@collection.length} objects - Reached End Time"
finish(@collection.objects)
return
end

# Switch to Redis
@thread_mode = :STREAM
end
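file_thread_body above follows a batch-and-flush pattern: results accumulate until they reach @max_batch_size, are transmitted, and a final partial batch is transmitted after the loop. A hedged standalone sketch (hypothetical names; a plain Array stands in for StreamingApi#transmit_results):

```ruby
# Hypothetical sketch of the batching used by file_thread_body.
# entries: enumerable of result entries; sink: receives each flushed batch.
def stream_in_batches(entries, max_batch_size, sink)
  results = []
  entries.each do |entry|
    results << entry
    # Flush a full batch as soon as it is reached
    if results.length >= max_batch_size
      sink << results.dup
      results.clear
    end
  end
  # Flush any remaining partial batch
  sink << results.dup unless results.empty?
  sink
end
```

Flushing inside the loop bounds memory for long file playback, while the final flush guarantees a short tail of results is never dropped.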

def handle_packet(packet, objects)
first_object = objects[0]
if first_object.stream_mode == :RAW
return handle_raw_packet(packet.buffer(false), objects, packet.packet_time.to_nsec_from_epoch)
else # @stream_mode == :DECOM or :REDUCED_X
return handle_json_packet(packet, objects)
end
end

# Transfers item to realtime thread when complete (if continued)
# Needs to mutex transfer
# checks if equal offset if packet already exists in realtime
# if doesn't exist adds with item offset
# if does exist and equal - transfer
# if does exist and less than - add item with less offset
# if does exist and greater than - catch up and try again
def attempt_handoff_to_realtime
if @collection.includes_realtime
return @streaming_api.handoff_to_realtime(@collection)
end
return false
end
end
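The SETUP branch in setup_thread_body reduces to a small decision: given the requested start/end times and the oldest timestamp still held in Redis, pick which mode the thread enters next. A hedged sketch of that decision (hypothetical function name; times are nanoseconds from epoch; a nil oldest_time models an empty Redis stream):

```ruby
# Hypothetical sketch of the file-vs-Redis decision in setup_thread_body.
# Returns :FILE, :STREAM, or :FINISH (finish + cancel in the real code).
def choose_thread_mode(start_time, end_time, oldest_time)
  # Redis stream empty: data may still exist in logged files
  return :FILE if oldest_time.nil?
  # Requested start predates what Redis retains: read files first
  return :FILE if start_time < oldest_time
  # Requested range ended before the oldest Redis data: nothing to stream
  return :FINISH if end_time && end_time < oldest_time
  # Otherwise stream directly from Redis
  :STREAM
end
```

The order matters: the file check runs before the bad-times check, matching the code above, so a range that starts before Redis retention falls through to file playback rather than finishing early.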
3 changes: 1 addition & 2 deletions openc3-cmd-tlm-api/app/models/realtime_streaming_thread.rb
@@ -21,7 +21,6 @@

class RealtimeStreamingThread < StreamingThread
def thread_body
topics, offsets, objects_by_topic = @collection.realtime_topics_offsets_and_objects
redis_thread_body(topics, offsets, objects_by_topic)
redis_thread_body()
end
end