Skip to content

Commit

Permalink
Merge pull request #1404 from OpenC3/calendar_settings
Browse files Browse the repository at this point in the history
Calendar settings
  • Loading branch information
jmthomas authored Jul 23, 2024
2 parents b240aac + fcbbcd8 commit 649dbe5
Show file tree
Hide file tree
Showing 4 changed files with 490 additions and 248 deletions.
115 changes: 70 additions & 45 deletions openc3/lib/openc3/microservices/timeline_microservice.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
require 'openc3/microservices/microservice'
require 'openc3/models/activity_model'
require 'openc3/models/timeline_model'
require 'openc3/models/tool_config_model'
require 'openc3/topics/timeline_topic'

require 'openc3/script'
Expand Down Expand Up @@ -82,45 +83,66 @@ def run_activity(activity)
end
end

def get_exec_setting()
json = ToolConfigModel.load_config('calendar-settings', 'default', scope: @scope)
if json
settings = JSON.parse(json)
return settings['execEnabled']
else
# Default is execute
return true
end
end

def run_command(activity)
@logger.info "#{@timeline_name} run_command > #{activity.as_json(:allow_nan => true)}"
begin
username = activity.data['username']
token = get_token(username)
raise "No token available for username: #{username}" unless token
cmd_no_hazardous_check(activity.data['command'], scope: @scope, token: token)
activity.commit(status: 'completed', fulfillment: true)
if get_exec_setting()
username = activity.data['username']
token = get_token(username)
raise "No token available for username: #{username}" unless token
cmd_no_hazardous_check(activity.data['command'], scope: @scope, token: token)
activity.commit(status: 'completed', fulfillment: true)
else
activity.commit(status: 'disabled', message: 'Execution is disabled')
@logger.warn "#{@timeline_name} run_command disabled > #{activity.as_json(:allow_nan => true)}"
end
rescue StandardError => e
activity.commit(status: 'failed', message: e.message)
@logger.error "#{@timeline_name} run_cmd failed > #{activity.as_json(:allow_nan => true)}, #{e.formatted}"
@logger.error "#{@timeline_name} run_command failed > #{activity.as_json(:allow_nan => true)}, #{e.formatted}"
end
end

def run_script(activity)
@logger.info "#{@timeline_name} run_script > #{activity.as_json(:allow_nan => true)}"
begin
username = activity.data['username']
token = get_token(username)
raise "No token available for username: #{username}" unless token
request = Net::HTTP::Post.new(
"/script-api/scripts/#{activity.data['script']}/run?scope=#{@scope}",
'Content-Type' => 'application/json',
'Authorization' => token
)
request.body = JSON.generate({
'scope' => @scope,
'environment' => activity.data['environment'],
'timeline' => @timeline_name,
'id' => activity.start
})
hostname = ENV['OPENC3_SCRIPT_HOSTNAME'] || 'openc3-cosmos-script-runner-api'
response = Net::HTTP.new(hostname, 2902).request(request)
raise "failed to call #{hostname}, for script: #{activity.data['script']}, response code: #{response.code}" if response.code != '200'

activity.commit(status: 'completed', message: "#{activity.data['script']} => #{response.body}", fulfillment: true)
if get_exec_setting()
username = activity.data['username']
token = get_token(username)
raise "No token available for username: #{username}" unless token
request = Net::HTTP::Post.new(
"/script-api/scripts/#{activity.data['script']}/run?scope=#{@scope}",
'Content-Type' => 'application/json',
'Authorization' => token
)
request.body = JSON.generate({
'scope' => @scope,
'environment' => activity.data['environment'],
'timeline' => @timeline_name,
'id' => activity.start
})
hostname = ENV['OPENC3_SCRIPT_HOSTNAME'] || 'openc3-cosmos-script-runner-api'
response = Net::HTTP.new(hostname, 2902).request(request)
raise "failed to call #{hostname}, for script: #{activity.data['script']}, response code: #{response.code}" if response.code != '200'

activity.commit(status: 'completed', message: "#{activity.data['script']} => #{response.body}", fulfillment: true)
else
activity.commit(status: 'disabled', message: 'Execution is disabled')
@logger.warn "#{@timeline_name} run_script disabled > #{activity.as_json(:allow_nan => true)}"
end
rescue StandardError => e
activity.commit(status: 'failed', message: e.message)
@logger.error "#{@timeline_name} run_script failed > #{activity.as_json(:allow_nan => true).to_s}, #{e.message}"
@logger.error "#{@timeline_name} run_script failed > #{activity.as_json(:allow_nan => true)}, #{e.message}"
end
end

Expand Down Expand Up @@ -212,7 +234,7 @@ def generate_thread_pool
def run
@logger.info "#{@timeline_name} timeline manager running"
loop do
start = Time.now.to_i
start = Time.now.to_f
@schedule.activities.each do |activity|
start_difference = activity.start - start
if start_difference <= 0 && @schedule.not_queued?(activity.start)
Expand All @@ -230,18 +252,18 @@ def run
sleep(1)
break if @cancel_thread
end
@logger.info "#{@timeline_name} timeine manager exiting"
@logger.info "#{@timeline_name} timeline manager exiting"
end

# Add task to remove events older than 7 days
def add_expire_activity
now = Time.now.to_i
@expire = now + 3_000
now = Time.now.to_f
@expire = now + 3540 # Needs to be less than 3600 which is the hour we store in memory
activity = ActivityModel.new(
name: @timeline_name,
scope: @scope,
start: 0,
stop: (now - 86_400 * 7),
stop: (now - (86_400 * 7)),
kind: 'expire',
data: {}
)
Expand Down Expand Up @@ -283,13 +305,13 @@ def initialize(name)
super(name)
@timeline_name = name.split('__')[2]
@schedule = Schedule.new(@timeline_name)
@manager = TimelineManager.new(name: @timeline_name, logger: @logger, scope: scope, schedule: @schedule)
@manager = TimelineManager.new(name: @timeline_name, logger: @logger, scope: @scope, schedule: @schedule)
@manager_thread = nil
@read_topic = true
end

def run
@logger.info "#{@name} timeine running"
@logger.info "#{@name} timeline running"
@manager_thread = Thread.new { @manager.run }
loop do
current_activities = ActivityModel.activities(name: @timeline_name, scope: @scope)
Expand All @@ -299,19 +321,19 @@ def run
block_for_updates()
break if @cancel_thread
end
@logger.info "#{@name} timeine exitting"
@logger.info "#{@name} timeline exiting"
end

def topic_lookup_functions
{
'timeline' => {
'created' => :timeline_nop,
'created' => :timeline_noop,
'refresh' => :schedule_refresh,
'updated' => :timeline_nop,
'deleted' => :timeline_nop
'updated' => :timeline_noop,
'deleted' => :timeline_noop
},
'activity' => {
'event' => :timeline_nop,
'event' => :timeline_noop,
'created' => :create_activity_from_event,
'updated' => :schedule_refresh,
'deleted' => :remove_activity_from_event
Expand All @@ -335,7 +357,7 @@ def block_for_updates
end
end

def timeline_nop(data)
def timeline_noop(data)
@logger.debug "#{@name} timeline web socket event: #{data}"
end

Expand All @@ -347,8 +369,8 @@ def schedule_refresh(data)
# Add the activity to the schedule. We don't need to hold the job in memory
# if it is longer than an hour away. A refresh task will update that.
def create_activity_from_event(data)
diff = data['start'] - Time.now.to_i
return unless (2..3600).include? diff
diff = data['start'] - Time.now.to_f
return if diff < 0 or diff > 3600

activity = ActivityModel.from_json(data, name: @timeline_name, scope: @scope)
@schedule.add_activity(activity)
Expand All @@ -357,17 +379,20 @@ def create_activity_from_event(data)
# Remove the activity from the schedule. We don't need to remove the activity
# if it is longer than an hour away. It will be removed from the data.
def remove_activity_from_event(data)
diff = data['start'] - Time.now.to_i
return unless (2..3600).include? diff
diff = data['start'] - Time.now.to_f
return if diff < 0 or diff > 3600

activity = ActivityModel.from_json(data, name: @timeline_name, scope: @scope)
@schedule.remove_activity(activity)
end

def shutdown
@read_topic = false
@manager.shutdown
super
# super also sets @cancel_thread = true but we want to set it first
# so when we set @read_topic = false the run loop stops
@cancel_thread = true
@read_topic = false
super()
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion openc3/python/openc3/script/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ async def _listen(self, endpoint, sub_msg, callback):
except Exception as e:
logging.exception(e)
finally:
logging.debug(f"exitting task: {endpoint}")
logging.debug(f"exiting task: {endpoint}")
self._queues.pop(endpoint, None)
self._events.pop(endpoint, None)

Expand Down
Loading

0 comments on commit 649dbe5

Please sign in to comment.