diff --git a/src/main/java/software/amazon/awssdk/crt/iot/SubscriptionStatusEvent.java b/src/main/java/software/amazon/awssdk/crt/iot/SubscriptionStatusEvent.java index fe6867648..8db378cc6 100644 --- a/src/main/java/software/amazon/awssdk/crt/iot/SubscriptionStatusEvent.java +++ b/src/main/java/software/amazon/awssdk/crt/iot/SubscriptionStatusEvent.java @@ -11,17 +11,16 @@ * An event that describes a change in subscription status for a streaming operation. */ public class SubscriptionStatusEvent { - private SubscriptionStatusEventType type; - private Optional error; - - private SubscriptionStatusEvent(SubscriptionStatusEventType type) { - this.type = type; - this.error = Optional.empty(); - } + private final SubscriptionStatusEventType type; + private final Optional error; private SubscriptionStatusEvent(SubscriptionStatusEventType type, int errorCode) { this.type = type; - this.error = Optional.of(errorCode); + if (errorCode != 0) { + this.error = Optional.of(errorCode); + } else { + this.error = Optional.empty(); + } } /** diff --git a/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json b/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json index 0fa76f5ad..cae30b9a0 100644 --- a/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json +++ b/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json @@ -186,6 +186,17 @@ } ] }, + { + "name": "java.util.function.Consumer", + "methods": [ + { + "name": "accept", + "parameterTypes": [ + "java.lang.Object" + ] + } + ] + }, { "name": "java.util.function.Predicate", "methods": [ diff --git a/src/native/java_class_ids.c b/src/native/java_class_ids.c index 005bc1c03..2f4b5c934 100644 --- a/src/native/java_class_ids.c +++ b/src/native/java_class_ids.c @@ -2430,19 +2430,12 @@ static void s_cache_subscription_status_event_properties(JNIEnv *env) { subscription_status_event_properties.subscription_status_event_class = (*env)->NewGlobalRef(env, cls); - subscription_status_event_properties.constructor_method_id1 = (*env)->GetMethodID( - env, - subscription_status_event_properties.subscription_status_event_class, - "", - "(Lsoftware/amazon/awssdk/crt/iot/SubscriptionStatusEventType;)V"); - AWS_FATAL_ASSERT(subscription_status_event_properties.constructor_method_id1); - - subscription_status_event_properties.constructor_method_id2 = (*env)->GetMethodID( + subscription_status_event_properties.constructor_method_id = (*env)->GetMethodID( env, subscription_status_event_properties.subscription_status_event_class, "", "(Lsoftware/amazon/awssdk/crt/iot/SubscriptionStatusEventType;I)V"); - AWS_FATAL_ASSERT(subscription_status_event_properties.constructor_method_id2); + AWS_FATAL_ASSERT(subscription_status_event_properties.constructor_method_id); } struct java_streaming_operation_options_properties streaming_operation_options_properties; @@ -2465,6 +2458,19 @@ static void s_cache_streaming_operation_options_properties(JNIEnv *env) { AWS_FATAL_ASSERT(streaming_operation_options_properties.subscription_status_event_callback_field_id); } +struct java_consumer_properties consumer_properties; + +static void s_cache_consumer_properties(JNIEnv *env) { + jclass cls = (*env)->FindClass(env, "java/util/function/Consumer"); + AWS_FATAL_ASSERT(cls); + + consumer_properties.consumer_class = (*env)->NewGlobalRef(env, cls); + + consumer_properties.accept_method_id = + (*env)->GetMethodID(env, consumer_properties.consumer_class, "accept", "(Ljava/lang/Object;)V"); + AWS_FATAL_ASSERT(consumer_properties.accept_method_id); +} + static void s_cache_java_class_ids(void *user_data) { JNIEnv *env = user_data; s_cache_http_request_body_stream(env); @@ -2577,6 +2583,7 @@ static void s_cache_java_class_ids(void *user_data) { s_cache_subscription_status_event_type_properties(env); s_cache_subscription_status_event_properties(env); s_cache_streaming_operation_options_properties(env); + s_cache_consumer_properties(env); } static aws_thread_once s_cache_once_init = AWS_THREAD_ONCE_STATIC_INIT; diff --git a/src/native/java_class_ids.h b/src/native/java_class_ids.h index 708d74b7b..2398f113a 100644 --- a/src/native/java_class_ids.h +++ b/src/native/java_class_ids.h @@ -1017,8 +1017,7 @@ extern struct java_subscription_status_event_type_properties subscription_status /* SubscriptionStatusEvent */ struct java_subscription_status_event_properties { jclass subscription_status_event_class; - jmethodID constructor_method_id1; // SubscriptionStatusEvent(SubscriptionStatusEventType type) - jmethodID constructor_method_id2; // SubscriptionStatusEvent(SubscriptionStatusEventType type, int errorCode) + jmethodID constructor_method_id; }; extern struct java_subscription_status_event_properties subscription_status_event_properties; @@ -1031,6 +1030,13 @@ struct java_streaming_operation_options_properties { }; extern struct java_streaming_operation_options_properties streaming_operation_options_properties; +/* Consumer */ +struct java_consumer_properties { + jclass consumer_class; + jmethodID accept_method_id; +}; +extern struct java_consumer_properties consumer_properties; + /** * All functions bound to JNI MUST call this before doing anything else. * This caches all JNI IDs the first time it is called. Any further calls are no-op; it is thread-safe. diff --git a/src/native/mqtt_request_response.c b/src/native/mqtt_request_response.c index 8bbc79096..f8c42b551 100644 --- a/src/native/mqtt_request_response.c +++ b/src/native/mqtt_request_response.c @@ -43,7 +43,7 @@ static void s_destroy_mqtt_request_response_client_binding(void *context) { JNIEnv *env = aws_jni_acquire_thread_env(jvm); if (env == NULL) { /* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */ - AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "JNI env no longer resolvable; JVM likely shutting down"); + AWS_LOGF_ERROR(AWS_LS_JAVA_CRT_GENERAL, "JNI env no longer resolvable; JVM likely shutting down"); goto done; } @@ -426,24 +426,25 @@ static void s_on_request_response_operation_completion( int error_code, void *user_data) { + jobject java_result = NULL; + jstring java_topic = NULL; + jbyteArray java_payload = NULL; struct aws_request_response_operation_binding *binding = user_data; JNIEnv *env = aws_jni_acquire_thread_env(binding->jvm); if (!env) { goto done; } - jobject java_result = NULL; - if (error_code == AWS_ERROR_SUCCESS) { java_result = (*env)->NewObject( env, mqtt_request_response_properties.mqtt_request_response_class, mqtt_request_response_properties.constructor_method_id); if (java_result != NULL) { - jstring java_topic = aws_jni_string_from_cursor(env, response_topic); + java_topic = aws_jni_string_from_cursor(env, response_topic); (*env)->SetObjectField(env, java_result, mqtt_request_response_properties.topic_field_id, java_topic); - jbyteArray java_payload = aws_jni_byte_array_from_cursor(env, payload); + java_payload = aws_jni_byte_array_from_cursor(env, payload); (*env)->SetObjectField(env, java_result, mqtt_request_response_properties.payload_field_id, java_payload); } } @@ -454,16 +455,33 @@ static void s_on_request_response_operation_completion( } else { int final_error_code = (error_code == AWS_ERROR_SUCCESS) ? AWS_ERROR_UNKNOWN : error_code; jobject crt_exception = aws_jni_new_crt_exception_from_error_code(env, final_error_code); - - (*env)->CallBooleanMethod( - env, - binding->operation_future, - completable_future_properties.complete_exceptionally_method_id, - crt_exception); + if (crt_exception != NULL) { + (*env)->CallBooleanMethod( + env, + binding->operation_future, + completable_future_properties.complete_exceptionally_method_id, + crt_exception); + + (*env)->DeleteLocalRef(env, crt_exception); + } } + aws_jni_check_and_clear_exception(env); + done: + if (java_result != NULL) { + (*env)->DeleteLocalRef(env, java_result); + } + + if (java_topic != NULL) { + (*env)->DeleteLocalRef(env, java_topic); + } + + if (java_payload != NULL) { + (*env)->DeleteLocalRef(env, java_payload); + } + aws_jni_release_thread_env(binding->jvm, env); s_aws_request_response_operation_binding_destroy(binding); @@ -582,7 +600,7 @@ static void s_aws_streaming_operation_binding_destroy(struct aws_streaming_opera JNIEnv *env = aws_jni_acquire_thread_env(jvm); if (env == NULL) { /* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */ - AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "JNI env no longer resolvable; JVM likely shutting down"); + AWS_LOGF_ERROR(AWS_LS_JAVA_CRT_GENERAL, "JNI env no longer resolvable; JVM likely shutting down"); goto done; } @@ -596,7 +614,7 @@ static void s_aws_streaming_operation_binding_destroy(struct aws_streaming_opera done: - aws_mem_release(binding->allocator, binding); + aws_mem_release(binding->allocator, binding); } static void s_aws_mqtt_streaming_operation_subscription_status_callback( @@ -605,11 +623,69 @@ static void s_aws_mqtt_streaming_operation_subscription_status_callback( void *user_data) { struct aws_streaming_operation_binding *binding = user_data; - if (!binding->java_incoming_publish_event_callback) { + if (!binding->java_subscription_status_event_callback) { return; } - ??; + /********** JNI ENV ACQUIRE **********/ + JavaVM *jvm = binding->jvm; + JNIEnv *env = aws_jni_acquire_thread_env(jvm); + if (env == NULL) { + /* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */ + AWS_LOGF_ERROR(AWS_LS_JAVA_CRT_GENERAL, "JNI env no longer resolvable; JVM likely shutting down"); + return; + } + + jobject java_subscription_status_event_type = NULL; + jobject java_subscription_status_event = NULL; + + java_subscription_status_event_type = (*env)->CallStaticObjectMethod( + env, + subscription_status_event_type_properties.subscription_status_event_type_class, + subscription_status_event_type_properties.get_enum_value_from_integer_method_id, + (jint)status); + if (java_subscription_status_event_type == NULL) { + AWS_LOGF_ERROR( + AWS_LS_JAVA_CRT_GENERAL, + "s_aws_mqtt_streaming_operation_subscription_status_callback - could not create subscription status event " + "type"); + goto done; + } + + java_subscription_status_event = (*env)->NewObject( + env, + subscription_status_event_properties.subscription_status_event_class, + subscription_status_event_properties.constructor_method_id, + java_subscription_status_event_type, + (jint)error_code); + + if (java_subscription_status_event == NULL) { + AWS_LOGF_ERROR( + AWS_LS_JAVA_CRT_GENERAL, + "s_aws_mqtt_streaming_operation_subscription_status_callback - could not create subscription status event"); + goto done; + } + + (*env)->CallVoidMethod( + env, + binding->java_subscription_status_event_callback, + consumer_properties.accept_method_id, + java_subscription_status_event); + + aws_jni_check_and_clear_exception(env); + +done: + + if (java_subscription_status_event_type != NULL) { + (*env)->DeleteLocalRef(env, java_subscription_status_event_type); + } + + if (java_subscription_status_event != NULL) { + (*env)->DeleteLocalRef(env, java_subscription_status_event); + } + + /********** JNI ENV RELEASE **********/ + aws_jni_release_thread_env(jvm, env); } static void s_aws_mqtt_streaming_operation_incoming_publish_callback(struct aws_byte_cursor payload, void *user_data) { @@ -619,7 +695,53 @@ static void s_aws_mqtt_streaming_operation_incoming_publish_callback(struct aws_ return; } - ??; + /********** JNI ENV ACQUIRE **********/ + JavaVM *jvm = binding->jvm; + JNIEnv *env = aws_jni_acquire_thread_env(jvm); + if (env == NULL) { + /* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */ + AWS_LOGF_ERROR(AWS_LS_JAVA_CRT_GENERAL, "JNI env no longer resolvable; JVM likely shutting down"); + return; + } + + jobject java_payload = NULL; + jobject java_incoming_publish_event = NULL; + + java_payload = aws_jni_byte_array_from_cursor(env, &payload); + + java_incoming_publish_event = (*env)->NewObject( + env, + incoming_publish_event_properties.incoming_publish_event_class, + incoming_publish_event_properties.constructor_method_id, + java_payload); + + if (java_incoming_publish_event == NULL) { + AWS_LOGF_ERROR( + AWS_LS_JAVA_CRT_GENERAL, + "s_aws_mqtt_streaming_operation_incoming_publish_callback - could not create incoming publish event"); + goto done; + } + + (*env)->CallVoidMethod( + env, + binding->java_incoming_publish_event_callback, + consumer_properties.accept_method_id, + java_incoming_publish_event); + + aws_jni_check_and_clear_exception(env); + +done: + + if (java_payload != NULL) { + (*env)->DeleteLocalRef(env, java_payload); + } + + if (java_incoming_publish_event != NULL) { + (*env)->DeleteLocalRef(env, java_incoming_publish_event); + } + + /********** JNI ENV RELEASE **********/ + aws_jni_release_thread_env(jvm, env); } static void s_aws_mqtt_streaming_operation_terminated_callback(void *user_data) { @@ -638,14 +760,16 @@ JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_iot_StreamingOperationBa aws_cache_jni_ids(env); - struct aws_crt_mqtt_request_response_client_binding *rr_client_binding = (struct aws_crt_mqtt_request_response_client_binding *)jni_mqtt_request_response_client_handle; + struct aws_crt_mqtt_request_response_client_binding *rr_client_binding = + (struct aws_crt_mqtt_request_response_client_binding *)jni_mqtt_request_response_client_handle; if (rr_client_binding == NULL) { aws_jni_throw_runtime_exception(env, "streamingOperationNew: null request-response client binding"); return (jlong)NULL; } struct aws_allocator *allocator = aws_jni_get_allocator(); - struct aws_streaming_operation_binding *binding = aws_mem_calloc(allocator, 1, sizeof(struct aws_streaming_operation_binding)); + struct aws_streaming_operation_binding *binding = + aws_mem_calloc(allocator, 1, sizeof(struct aws_streaming_operation_binding)); binding->allocator = allocator; jint jvmresult = (*env)->GetJavaVM(env, &binding->jvm); @@ -656,13 +780,13 @@ JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_iot_StreamingOperationBa struct aws_mqtt_streaming_operation_options stream_options; AWS_ZERO_STRUCT(stream_options); - stream_options.subscription_status_callback = ??; - stream_options.incoming_publish_callback = ??; - stream_options.terminated_callback = ??; + stream_options.subscription_status_callback = s_aws_mqtt_streaming_operation_subscription_status_callback; + stream_options.incoming_publish_callback = s_aws_mqtt_streaming_operation_incoming_publish_callback; + stream_options.terminated_callback = s_aws_mqtt_streaming_operation_terminated_callback; stream_options.user_data = binding; - jstring java_topic = (jstring)(*env)->GetObjectField( - env, java_options, streaming_operation_options_properties.topic_field_id); + jstring java_topic = + (jstring)(*env)->GetObjectField(env, java_options, streaming_operation_options_properties.topic_field_id); if (!java_topic) { aws_jni_throw_runtime_exception(env, "streamingOperationNew - topic is null"); goto error; @@ -670,18 +794,20 @@ JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_iot_StreamingOperationBa stream_options.topic_filter = aws_jni_byte_cursor_from_jstring_acquire(env, java_topic); jobject java_incoming_publish_event_callback = (*env)->GetObjectField( - env, java_options, streaming_operation_options_properties.incoming_publish_event_callback_field_id); + env, java_options, streaming_operation_options_properties.incoming_publish_event_callback_field_id); if (java_incoming_publish_event_callback) { binding->java_incoming_publish_event_callback = (*env)->NewGlobalRef(env, java_incoming_publish_event_callback); } jobject java_subscription_status_event_callback = (*env)->GetObjectField( - env, java_options, streaming_operation_options_properties.subscription_status_event_callback_field_id); + env, java_options, streaming_operation_options_properties.subscription_status_event_callback_field_id); if (java_subscription_status_event_callback) { - binding->java_subscription_status_event_callback = (*env)->NewGlobalRef(env, java_subscription_status_event_callback); + binding->java_subscription_status_event_callback = + (*env)->NewGlobalRef(env, java_subscription_status_event_callback); } - binding->stream = aws_mqtt_request_response_client_create_streaming_operation(rr_client_binding->client, &stream_options); + binding->stream = + aws_mqtt_request_response_client_create_streaming_operation(rr_client_binding->client, &stream_options); aws_jni_byte_cursor_from_jstring_release(env, java_topic, stream_options.topic_filter); @@ -708,7 +834,8 @@ JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_iot_StreamingOperationBas (void)jni_class; - struct aws_streaming_operation_binding *binding = (struct aws_streaming_operation_binding *)jni_streaming_operation_handle; + struct aws_streaming_operation_binding *binding = + (struct aws_streaming_operation_binding *)jni_streaming_operation_handle; struct aws_mqtt_rr_client_operation *stream = binding->stream; if (aws_mqtt_rr_client_operation_activate(stream)) { @@ -724,7 +851,8 @@ JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_iot_StreamingOperationBas (void)env; (void)jni_class; - struct aws_streaming_operation_binding *binding = (struct aws_streaming_operation_binding *)jni_streaming_operation_handle; + struct aws_streaming_operation_binding *binding = + (struct aws_streaming_operation_binding *)jni_streaming_operation_handle; struct aws_mqtt_rr_client_operation *stream = binding->stream; binding->stream = NULL;