Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadFactory
import java.util.concurrent.ThreadPoolExecutor
Expand Down Expand Up @@ -43,6 +45,8 @@ object OneSignalDispatchers {
"$BASE_THREAD_NAME-IO" // Thread name prefix for I/O operations
private const val DEFAULT_THREAD_NAME_PREFIX =
"$BASE_THREAD_NAME-Default" // Thread name prefix for CPU operations
private const val SERIAL_IO_THREAD_NAME =
"$BASE_THREAD_NAME-SerialIO" // Single, named thread for order-sensitive work

private class OptimizedThreadFactory(
private val namePrefix: String,
Expand Down Expand Up @@ -80,6 +84,27 @@ object OneSignalDispatchers {
}
}

/**
* Single-thread executor for order-sensitive work (e.g. lifecycle event handlers that
* must run in the order the events fired on the main thread). Submission order on the
* main thread equals execution order on this thread, so callers don't need their own
* synchronization to preserve sequence — they only need to capture any time-sensitive
* state (timestamps, current state snapshots) on the caller before dispatching.
*/
private val serialIOExecutor: ExecutorService by lazy {
try {
Executors.newSingleThreadExecutor(
OptimizedThreadFactory(
namePrefix = SERIAL_IO_THREAD_NAME,
priority = Thread.NORM_PRIORITY - 1,
),
)
} catch (e: Exception) {
Logging.error("OneSignalDispatchers: Failed to create SerialIO executor: ${e.message}")
throw e
}
}

private val defaultExecutor: ThreadPoolExecutor by lazy {
try {
ThreadPoolExecutor(
Expand Down Expand Up @@ -117,6 +142,18 @@ object OneSignalDispatchers {
}
}

val SerialIO: CoroutineDispatcher by lazy {
try {
serialIOExecutor.asCoroutineDispatcher()
} catch (e: Exception) {
// Fall back to a limited-parallelism view of Dispatchers.IO. A single shared view
// serializes our submissions even when our own executor failed to start.
Logging.error("OneSignalDispatchers: Using fallback serialized Dispatchers.IO: ${e.message}")
@Suppress("OPT_IN_USAGE")
Dispatchers.IO.limitedParallelism(1)
}
}

private val IOScope: CoroutineScope by lazy {
CoroutineScope(SupervisorJob() + IO)
}
Expand All @@ -125,6 +162,10 @@ object OneSignalDispatchers {
CoroutineScope(SupervisorJob() + Default)
}

private val SerialIOScope: CoroutineScope by lazy {
CoroutineScope(SupervisorJob() + SerialIO)
}

fun launchOnIO(block: suspend () -> Unit): Job {
return IOScope.launch { block() }
}
Expand All @@ -133,57 +174,136 @@ object OneSignalDispatchers {
return DefaultScope.launch { block() }
}

/**
* Launches [block] on the dedicated serial IO thread. Tasks submitted from any thread
* run one-at-a-time in submission order, which makes this the right entry point for
* order-sensitive lifecycle work (e.g. focus/unfocus handlers).
*/
fun launchOnSerialIO(block: suspend () -> Unit): Job {
return SerialIOScope.launch { block() }
}

@Volatile
private var prewarmStarted = false
private val prewarmLock = Any()

/**
* Triggers the lazy initialization of [IO], [Default], and [SerialIO] (and their backing
* executors, dispatchers, and scopes) on a short-lived background thread.
*
* Background:
* The lazy `by lazy` properties below construct `ThreadPoolExecutor` instances and wrap them
* in `asCoroutineDispatcher() + SupervisorJob() + CoroutineScope(...)`. Production OTel
* shows that when the **first** caller of [launchOnIO] / [launchOnSerialIO] is on the main
* thread (Activity-lifecycle handler, `JobService.onStartJob`, etc.), the construction cost
* — which includes a `kotlinx.coroutines.BuildersKt.launch` that hits
* `ThreadPoolExecutor.execute` and `LinkedBlockingQueue.offer` synchronously — is paid on
* the calling thread, blocking the main thread for many seconds on cold start. See SDK-4507.
*
* Calling [prewarm] from a non-time-sensitive spot in [com.onesignal.OneSignal.initWithContext]
* shifts that cost to a dedicated `OneSignal-prewarm` daemon thread, so the first
* production caller — including main-thread lifecycle handlers — only pays the much cheaper
* "submit work to an already-constructed executor" cost.
*
* Idempotent and fire-and-forget: a no-op on second and subsequent calls. Safe to invoke
* from any thread; the heavy lifting always happens on the daemon thread we spawn here, not
* on the caller. Failures are logged and swallowed because the executors retain their
* existing fallback paths (e.g. `Dispatchers.IO.limitedParallelism(1)` for [SerialIO]) and a
* failed prewarm will simply mean the first production caller pays the original cost.
*/
fun prewarm() {
if (prewarmStarted) return
synchronized(prewarmLock) {
if (prewarmStarted) return
prewarmStarted = true
}
val prewarmThread = Thread(
{
try {
// Each launch* call below triggers the corresponding lazy chain
// (executor -> dispatcher -> scope) and submits an empty coroutine,
// which forces the worker thread(s) to start as well.
launchOnIO { /* warm IOScope + ioExecutor */ }
launchOnDefault { /* warm DefaultScope + defaultExecutor */ }
launchOnSerialIO { /* warm SerialIOScope + serialIOExecutor */ }
} catch (e: Exception) {
Logging.warn("OneSignalDispatchers.prewarm failed: ${e.message}")
}
},
"$BASE_THREAD_NAME-prewarm",
)
prewarmThread.isDaemon = true
prewarmThread.priority = Thread.NORM_PRIORITY - 2
prewarmThread.start()
}

/**
* Test-only hook to reset [prewarmStarted] so different specs can exercise the
* "first call wins" branch independently. Not part of any public contract.
*/
internal fun resetPrewarmForTest() {
synchronized(prewarmLock) {
prewarmStarted = false
}
}

internal fun getPerformanceMetrics(): String {
return try {
val serialQueueSize =
(serialIOExecutor as? ThreadPoolExecutor)?.queue?.size?.toString() ?: "n/a"
val serialCompleted =
(serialIOExecutor as? ThreadPoolExecutor)?.completedTaskCount ?: 0L
"""
OneSignalDispatchers Performance Metrics:
- IO Pool: ${ioExecutor.activeCount}/${ioExecutor.corePoolSize} active/core threads
- IO Queue: ${ioExecutor.queue.size} pending tasks
- Default Pool: ${defaultExecutor.activeCount}/${defaultExecutor.corePoolSize} active/core threads
- Default Queue: ${defaultExecutor.queue.size} pending tasks
- Total completed tasks: ${ioExecutor.completedTaskCount + defaultExecutor.completedTaskCount}
- Memory usage: ~${(ioExecutor.activeCount + defaultExecutor.activeCount) * 1024}KB (thread stacks, ~1MB each)
- SerialIO Queue: $serialQueueSize pending tasks
- Total completed tasks: ${ioExecutor.completedTaskCount + defaultExecutor.completedTaskCount + serialCompleted}
- Memory usage: ~${(ioExecutor.activeCount + defaultExecutor.activeCount + 1) * 1024}KB (thread stacks, ~1MB each)
""".trimIndent()
} catch (e: Exception) {
"OneSignalDispatchers not initialized or using fallback dispatchers ${e.message}"
}
}

internal fun getStatus(): String {
val ioExecutorStatus =
try {
if (ioExecutor.isShutdown) "Shutdown" else "Active"
} catch (e: Exception) {
"ioExecutor Not initialized ${e.message ?: "Unknown error"}"
}

val defaultExecutorStatus =
try {
if (defaultExecutor.isShutdown) "Shutdown" else "Active"
} catch (e: Exception) {
"defaultExecutor Not initialized ${e.message ?: "Unknown error"}"
}

val ioScopeStatus =
try {
if (IOScope.isActive) "Active" else "Cancelled"
} catch (e: Exception) {
"IOScope Not initialized ${e.message ?: "Unknown error"}"
}

val defaultScopeStatus =
try {
if (DefaultScope.isActive) "Active" else "Cancelled"
} catch (e: Exception) {
"DefaultScope Not initialized ${e.message ?: "Unknown error"}"
}

return """
OneSignalDispatchers Status:
- IO Executor: $ioExecutorStatus
- Default Executor: $defaultExecutorStatus
- IO Scope: $ioScopeStatus
- Default Scope: $defaultScopeStatus
- IO Executor: ${executorStatus("ioExecutor") { ioExecutor.isShutdown }}
- Default Executor: ${executorStatus("defaultExecutor") { defaultExecutor.isShutdown }}
- SerialIO Executor: ${executorStatus("serialIOExecutor") { serialIOExecutor.isShutdown }}
- IO Scope: ${scopeStatus("IOScope") { IOScope.isActive }}
- Default Scope: ${scopeStatus("DefaultScope") { DefaultScope.isActive }}
- SerialIO Scope: ${scopeStatus("SerialIOScope") { SerialIOScope.isActive }}
""".trimIndent()
}

// Visible to tests so the failure branch (where `isShutdown()` itself throws — happens
// when the underlying executor's lazy initializer threw and re-throws on every access)
// can be exercised without forcing a real ThreadPoolExecutor to fail to construct, which
// is impractical to trigger in a unit test.
internal fun executorStatus(
name: String,
isShutdown: () -> Boolean,
): String =
try {
if (isShutdown()) "Shutdown" else "Active"
} catch (e: Exception) {
"$name $NOT_INITIALIZED ${e.message ?: UNKNOWN_ERROR}"
}

internal fun scopeStatus(
name: String,
isActive: () -> Boolean,
): String =
try {
if (isActive()) "Active" else "Cancelled"
} catch (e: Exception) {
"$name $NOT_INITIALIZED ${e.message ?: UNKNOWN_ERROR}"
}

private const val NOT_INITIALIZED = "Not initialized"
private const val UNKNOWN_ERROR = "Unknown error"
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,61 @@ fun suspendifyOnDefault(block: suspend () -> Unit) {
suspendifyWithCompletion(useIO = false, block = block, onComplete = null)
}

/**
* Allows a non suspending function to create a scope that runs on the dedicated
* single-thread serial IO dispatcher. Tasks submitted from any thread run one-at-a-time
* in submission order, so callers do not need their own synchronization to preserve event
* ordering. This is the right entry point for lifecycle event handlers that do per-event
* work (timing, analytics, scheduler state) and must observe events in the order the
* Android framework fired them.
*
* Always routes through [OneSignalDispatchers.launchOnSerialIO] regardless of
* [ThreadingMode.useBackgroundThreading]: the serial ordering guarantee is the whole
* point of this helper, and the dedicated single thread carries none of the resource
* concerns that motivated the background-threading FF in the first place.
*
* Callers should capture any time-sensitive state (timestamps, "current" snapshots) on
* the caller's thread before invoking this — the work itself may run a few hundred ms
* later under load.
*
* @param block The suspending code to execute
*/
fun suspendifyOnSerialIO(block: suspend () -> Unit) {
OneSignalDispatchers.launchOnSerialIO {
try {
block()
} catch (e: Exception) {
Logging.error("Exception in suspendifyOnSerialIO", e)
}
}
}

/**
* Conditionally dispatches [block] to the serial IO thread when the SDK_BACKGROUND_THREADING
* remote feature flag is enabled, otherwise runs [block] inline on the calling thread.
*
* Used to gate incremental rollouts of off-main lifecycle work where we want to A/B compare
* the offloaded behavior against the previous inline behavior in production. Customers without
* the flag see the original code path; customers with the flag see the offloaded one.
*
* Differs from [suspendifyOnSerialIO] (which always routes through the serial dispatcher
* because its ordering guarantee is the whole point) and from [suspendifyOnIO] (which always
* dispatches off-main, just to a different pool depending on the flag).
*
* The block is non-suspending because the FF-off branch executes on the caller's thread —
* any suspending work would force a [kotlinx.coroutines.runBlocking] there, defeating the
* purpose of the comparison.
*
* @param block Non-suspending work to run inline (FF off) or on the serial IO thread (FF on).
*/
fun runOnSerialIOIfBackgroundThreading(block: () -> Unit) {
if (ThreadingMode.useBackgroundThreading) {
suspendifyOnSerialIO { block() }
} else {
block()
}
}

/**
* Modern utility for executing suspending code with completion callback.
* Uses OneSignal's centralized thread management for better resource control.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import android.content.ComponentName
import android.content.Context
import android.content.pm.PackageManager
import androidx.core.content.ContextCompat
import com.onesignal.common.threading.ThreadingMode
import com.onesignal.common.threading.suspendifyOnSerialIO
import com.onesignal.core.internal.application.IApplicationLifecycleHandler
import com.onesignal.core.internal.application.IApplicationService
import com.onesignal.core.internal.background.IBackgroundManager
Expand Down Expand Up @@ -70,12 +72,42 @@ internal class BackgroundManager(
_applicationService.addApplicationLifecycleHandler(this)
}

// JobScheduler.cancel / .schedule are synchronous Binder transactions to
// system_server that can block the calling thread for many seconds on some
// devices (notably Xiaomi/MIUI under power-save). Lifecycle callbacks fire
// on the main thread, so running these inline produces the SDK-4505 ANR.
//
// The offload is gated on ThreadingMode.useBackgroundThreading (remote FF
// `sdk_background_threading`, latched at app startup) so the fix can be
// rolled out gradually and an A/B comparison against the legacy inline
// path is possible:
//
// FF on -> suspendifyOnSerialIO { ... }
// Routes through OneSignalDispatchers.SerialIO, a dedicated
// single-thread executor. Submission order on the main thread
// equals execution order, so a rapid `unfocused -> focused`
// burst can't reorder a stale `schedule` past a fresh
// `cancel` (which a multi-threaded pool can). Required as
// soon as these handlers grow per-event work (session
// timing, analytics, focus counters) that can't be
// reconciled from a single state snapshot.
//
// FF off -> legacy inline path. Retains the ANR-prone behavior for the
// control cohort.
override fun onFocus(firedOnSubscribe: Boolean) {
cancelSyncTask()
if (ThreadingMode.useBackgroundThreading) {
suspendifyOnSerialIO { cancelSyncTask() }
} else {
cancelSyncTask()
}
}

override fun onUnfocused() {
scheduleBackground()
if (ThreadingMode.useBackgroundThreading) {
suspendifyOnSerialIO { scheduleBackground() }
} else {
scheduleBackground()
}
}

private fun scheduleBackground() {
Expand Down
Loading
Loading