diff --git a/plugins/radar-android-phone-telephony/src/main/java/org/radarbase/passive/phone/telephony/PhoneLogManager.kt b/plugins/radar-android-phone-telephony/src/main/java/org/radarbase/passive/phone/telephony/PhoneLogManager.kt index 562caf8b8..4fb2409bb 100644 --- a/plugins/radar-android-phone-telephony/src/main/java/org/radarbase/passive/phone/telephony/PhoneLogManager.kt +++ b/plugins/radar-android-phone-telephony/src/main/java/org/radarbase/passive/phone/telephony/PhoneLogManager.kt @@ -39,6 +39,7 @@ import java.util.* import java.util.concurrent.TimeUnit import java.util.regex.Pattern import android.os.Bundle +import org.radarbase.android.util.CoroutineTaskExecutor class PhoneLogManager(context: PhoneLogService) : AbstractSourceManager(context) { private val callTopic: DataCache = createCache( @@ -60,6 +61,8 @@ class PhoneLogManager(context: PhoneLogService) : AbstractSourceManager @@ -225,16 +229,20 @@ class PhoneLogManager(context: PhoneLogService) : AbstractSourceManager PhoneCallType.MISSED else -> PhoneCallType.UNKNOWN } - - send(callTopic, PhoneCall( - eventTimestamp, - currentTime, - duration, - targetKey, - type, - targetIsContact, - phoneNumber == null, - target.length)) + logsExecutor.execute { + send( + callTopic, PhoneCall( + eventTimestamp, + currentTime, + duration, + targetKey, + type, + targetIsContact, + phoneNumber == null, + target.length + ) + ) + } } private fun sendPhoneSms(eventTimestamp: Double, target: String, typeCode: Int, message: String, targetIsContact: Boolean) { @@ -255,21 +263,27 @@ class PhoneLogManager(context: PhoneLogService) : AbstractSourceManager val mac = bd.address - val hash = hashGenerator.createHashByteBuffer(mac + "$hashSaltReference") + val hash = + hashGenerator.createHashByteBuffer(mac + "$hashSaltReference") - send(bluetoothScannedTopic, scannedTopicBuilder.apply { + bluetoothTaskExecutor.execute { + send(bluetoothScannedTopic, scannedTopicBuilder.apply { this.macAddressHash = hash this.pairedState = bd.bondState.toPairedState() this.hashSaltReference = hashSaltReference }.build()) } - - send(bluetoothScannedTopic, scannedTopicBuilder.apply { - this.macAddressHash = macAddressHash - this.pairedState = device.bondState.toPairedState() - this.hashSaltReference = hashSaltReference - }.build()) - + } + bluetoothTaskExecutor.execute { + send(bluetoothScannedTopic, scannedTopicBuilder.apply { + this.macAddressHash = macAddressHash + this.pairedState = device.bondState.toPairedState() + this.hashSaltReference = hashSaltReference + }.build()) + } } BluetoothAdapter.ACTION_DISCOVERY_FINISHED -> { @@ -164,8 +172,13 @@ class PhoneBluetoothManager(service: PhoneBluetoothService) : AbstractSourceMana if (!isClosed) { val time = currentTime - send(bluetoothDevicesTopic, PhoneBluetoothDevices( - time, time, bondedDevices, numberOfDevices, true)) + bluetoothTaskExecutor.execute { + send( + bluetoothDevicesTopic, PhoneBluetoothDevices( + time, time, bondedDevices, numberOfDevices, true + ) + ) + } } } } @@ -182,7 +195,9 @@ class PhoneBluetoothManager(service: PhoneBluetoothService) : AbstractSourceMana } override fun onClose() { - processor.stop() + bluetoothTaskExecutor.stop { + processor.stop() + } bluetoothBroadcastReceiver?.let { try { service.unregisterReceiver(it) diff --git a/plugins/radar-android-phone/src/main/java/org/radarbase/passive/phone/PhoneContactListManager.kt b/plugins/radar-android-phone/src/main/java/org/radarbase/passive/phone/PhoneContactListManager.kt index 635e23164..d638130fe 100644 --- a/plugins/radar-android-phone/src/main/java/org/radarbase/passive/phone/PhoneContactListManager.kt +++ b/plugins/radar-android-phone/src/main/java/org/radarbase/passive/phone/PhoneContactListManager.kt @@ -27,6 +27,7 @@ import org.radarbase.android.data.DataCache import org.radarbase.android.source.AbstractSourceManager import org.radarbase.android.source.BaseSourceState import org.radarbase.android.source.SourceStatusListener +import org.radarbase.android.util.CoroutineTaskExecutor import org.radarbase.android.util.OfflineProcessor import org.radarcns.kafka.ObservationKey import org.radarcns.passive.phone.PhoneContactList @@ -43,6 +44,8 @@ class PhoneContactListManager(service: PhoneContactsListService) : AbstractSourc private val db: ContentResolver = service.contentResolver private var savedContactLookups: Set = emptySet() + private val contactsTaskExecutor = CoroutineTaskExecutor(this::class.simpleName!!) + init { name = service.getString(R.string.contact_list) processor = OfflineProcessor(service) { @@ -51,6 +54,7 @@ class PhoneContactListManager(service: PhoneContactsListService) : AbstractSourc requestName = ACTION_UPDATE_CONTACTS_LIST wake = false } + contactsTaskExecutor.start() } override fun start(acceptableIds: Set) { @@ -112,7 +116,9 @@ class PhoneContactListManager(service: PhoneContactsListService) : AbstractSourc } override fun onClose() { - processor.stop() + contactsTaskExecutor.stop { + processor.stop() + } } private fun makeQuery( @@ -136,7 +142,7 @@ class PhoneContactListManager(service: PhoneContactsListService) : AbstractSourc } } - private fun processContacts() { + private suspend fun processContacts() { val newContactLookups = queryContacts() ?: return var added: Int? = null diff --git a/plugins/radar-android-weather/src/main/java/net/aksingh/owmjapis/OpenWeatherMap.kt b/plugins/radar-android-weather/src/main/java/net/aksingh/owmjapis/OpenWeatherMap.kt index 8dfb149ca..6b17d0adf 100755 --- a/plugins/radar-android-weather/src/main/java/net/aksingh/owmjapis/OpenWeatherMap.kt +++ b/plugins/radar-android-weather/src/main/java/net/aksingh/owmjapis/OpenWeatherMap.kt @@ -21,9 +21,14 @@ */ package net.aksingh.owmjapis -import okhttp3.CacheControl -import okhttp3.OkHttpClient -import okhttp3.Request +import io.ktor.client.HttpClient +import io.ktor.client.call.body +import io.ktor.client.request.header +import io.ktor.client.request.prepareRequest +import io.ktor.client.request.url +import io.ktor.http.HttpHeaders +import io.ktor.http.isSuccess +import kotlinx.coroutines.runBlocking import org.json.JSONException import org.json.JSONObject import org.slf4j.LoggerFactory @@ -66,7 +71,7 @@ import java.net.URLEncoder * @since 2.5.0.1 */ @Suppress("unused") -class OpenWeatherMap(units: String, lang: String, apiKey: String, client: OkHttpClient) { +class OpenWeatherMap(units: String, lang: String, apiKey: String, client: HttpClient) { private val owmAddressInstance: OWMAddress = OWMAddress(units, lang, apiKey) private val owmResponse: OWMResponse = OWMResponse(client, owmAddressInstance) @@ -327,7 +332,7 @@ class OpenWeatherMap(units: String, lang: String, apiKey: String, client: OkHttp * @since 2.5.0.3 */ private class OWMResponse( - private val client: OkHttpClient, + private val client: HttpClient, private val owmAddress: OWMAddress, ) { /* @@ -412,29 +417,31 @@ class OpenWeatherMap(units: String, lang: String, apiKey: String, client: OkHttp * @return Response if successful, else `null` * @see [HTTP - ](http://www.w3.org/Protocols/rfc2616/rfc2616-sec9.html) */ - private fun httpGet(requestAddress: String): String? { - val request: Request = Request.Builder() - .get() - .url(requestAddress) - .cacheControl(CacheControl.FORCE_NETWORK) - .header("Accept-Encoding", "gzip, deflate") - .build() - return try { - client.newCall(request).execute().use { response -> - val responseString = response.body?.string() - if (response.isSuccessful && responseString != null) { + private fun httpGet(requestAddress: String): String? = runBlocking{ + + val request = client.prepareRequest { + url(requestAddress) + header(HttpHeaders.AcceptEncoding, "gzip, deflate") + header(HttpHeaders.CacheControl, "no-cache") + header(HttpHeaders.Pragma, "no-cache") + } + + try { + request.execute { response -> + val responseString = response.body() + if (response.status.isSuccess() && responseString.isNotEmpty()) { responseString } else { logger.error( "Failed to request body (HTTP code {}): {}", - response.code, + response.status.value, responseString, ) null } } - } catch (e: IOException) { - logger.error("Failed to call OpenWeatherMap API", e) + } catch (e: Exception) { + logger.error("Failed to call API", e) null } } diff --git a/plugins/radar-android-weather/src/main/java/org/radarbase/passive/weather/OpenWeatherMapApi.kt b/plugins/radar-android-weather/src/main/java/org/radarbase/passive/weather/OpenWeatherMapApi.kt index dc88a0bd9..ad25cc227 100644 --- a/plugins/radar-android-weather/src/main/java/org/radarbase/passive/weather/OpenWeatherMapApi.kt +++ b/plugins/radar-android-weather/src/main/java/org/radarbase/passive/weather/OpenWeatherMapApi.kt @@ -16,16 +16,16 @@ package org.radarbase.passive.weather +import io.ktor.client.HttpClient import net.aksingh.owmjapis.CurrentWeather import net.aksingh.owmjapis.OpenWeatherMap -import okhttp3.OkHttpClient import org.json.JSONException import org.radarcns.passive.weather.WeatherCondition import java.io.IOException import java.math.BigDecimal import java.util.* -internal class OpenWeatherMapApi(apiKey: String, client: OkHttpClient) : WeatherApi { +internal class OpenWeatherMapApi(apiKey: String, client: HttpClient) : WeatherApi { private val owm: OpenWeatherMap = OpenWeatherMap(OpenWeatherMap.UNITS_METRIC, OpenWeatherMap.LANGUAGE_ENGLISH, apiKey, client) diff --git a/plugins/radar-android-weather/src/main/java/org/radarbase/passive/weather/WeatherApiManager.kt b/plugins/radar-android-weather/src/main/java/org/radarbase/passive/weather/WeatherApiManager.kt index db4101b74..6294a194b 100755 --- a/plugins/radar-android-weather/src/main/java/org/radarbase/passive/weather/WeatherApiManager.kt +++ b/plugins/radar-android-weather/src/main/java/org/radarbase/passive/weather/WeatherApiManager.kt @@ -21,10 +21,12 @@ import android.location.Location import android.location.LocationManager import android.location.LocationManager.GPS_PROVIDER import android.location.LocationManager.NETWORK_PROVIDER -import okhttp3.OkHttpClient +import io.ktor.client.HttpClient +import kotlinx.coroutines.SupervisorJob import org.radarbase.android.source.AbstractSourceManager import org.radarbase.android.source.BaseSourceState import org.radarbase.android.source.SourceStatusListener +import org.radarbase.android.util.CoroutineTaskExecutor import org.radarbase.android.util.NetworkConnectedReceiver import org.radarbase.android.util.OfflineProcessor import org.radarbase.passive.weather.WeatherApiService.Companion.WEATHER_QUERY_INTERVAL_DEFAULT @@ -34,12 +36,14 @@ import org.slf4j.LoggerFactory import java.io.IOException import java.util.concurrent.TimeUnit -class WeatherApiManager(service: WeatherApiService, private val client: OkHttpClient) : AbstractSourceManager(service) { +class WeatherApiManager(service: WeatherApiService, private val client: HttpClient) : AbstractSourceManager(service) { private val processor: OfflineProcessor private val weatherTopic = createCache("android_local_weather", LocalWeather()) private val networkReceiver: NetworkConnectedReceiver + private val weatherTaskExecutor: CoroutineTaskExecutor = CoroutineTaskExecutor(this::class.simpleName!!) + private val locationManager: LocationManager? = service.getSystemService(Context.LOCATION_SERVICE) as LocationManager? @get:Synchronized @@ -79,14 +83,16 @@ class WeatherApiManager(service: WeatherApiService, private val client: OkHttpCl } logger.info("Starting WeatherApiManager") - networkReceiver.register() + weatherTaskExecutor.execute { + networkReceiver.monitor() + } processor.start() - + weatherTaskExecutor.start(SupervisorJob()) status = SourceStatusListener.Status.CONNECTED } - private fun processWeather() { - if (!networkReceiver.state.isConnected) { + private suspend fun processWeather() { + if (networkReceiver.latestState !is NetworkConnectedReceiver.NetworkState.Connected) { logger.warn("No internet connection. Skipping weather query.") } @@ -158,8 +164,9 @@ class WeatherApiManager(service: WeatherApiService, private val client: OkHttpCl } override fun onClose() { - networkReceiver.unregister() - processor.stop() + weatherTaskExecutor.stop { + processor.stop() + } } companion object { diff --git a/plugins/radar-android-weather/src/main/java/org/radarbase/passive/weather/WeatherApiService.kt b/plugins/radar-android-weather/src/main/java/org/radarbase/passive/weather/WeatherApiService.kt index 8a72d598c..0e1532b3a 100644 --- a/plugins/radar-android-weather/src/main/java/org/radarbase/passive/weather/WeatherApiService.kt +++ b/plugins/radar-android-weather/src/main/java/org/radarbase/passive/weather/WeatherApiService.kt @@ -16,28 +16,25 @@ package org.radarbase.passive.weather -import okhttp3.OkHttpClient +import io.ktor.client.HttpClient +import io.ktor.client.engine.cio.CIO import org.radarbase.android.config.SingleRadarConfiguration import org.radarbase.android.source.BaseSourceState import org.radarbase.android.source.SourceManager import org.radarbase.android.source.SourceService import org.radarbase.config.ServerConfig import org.radarbase.passive.weather.WeatherApiManager.Companion.SOURCE_OPENWEATHERMAP -import org.radarbase.producer.rest.RestClient import java.util.concurrent.TimeUnit class WeatherApiService : SourceService() { - private lateinit var client: OkHttpClient + private lateinit var client: HttpClient override val defaultState: BaseSourceState get() = BaseSourceState() override fun onCreate() { super.onCreate() - client = RestClient.global() - .server(ServerConfig()) - .build() - .httpClient // global OkHttpClient + client = HttpClient(CIO) } override fun createSourceManager() = WeatherApiManager(this, client) @@ -51,6 +48,11 @@ class WeatherApiService : SourceService() { ) } + override fun onDestroy() { + super.onDestroy() + client.close() + } + companion object { private const val WEATHER_QUERY_INTERVAL = "weather_query_interval_seconds" private const val WEATHER_API_SOURCE = "weather_api_source"