feat(liveobjects): LiveObjects implementation to new path-based API (Kotlin)#1223
Conversation
…alue types
Fills in the Kotlin implementation behind the LiveObjects public interfaces
(lib/.../liveobjects), replacing the TODO stubs in DefaultRealtimeObject with
a working objects sync engine and the LiveCounter / LiveMap value types.
Sync engine (io.ably.lib.liveobjects):
- ObjectsManager - processes OBJECT / OBJECT_SYNC messages, buffers ops
during sync, creates zero-value objects (RTO5-RTO7).
- ObjectsPool - object store keyed by objectId with tombstone GC
(grace period from connectionDetails, RTO3).
- ObjectsSyncTracker/ObjectsState - track sync sequence + state transitions
and emit SYNCING/SYNCED events (RTO2, RTO5).
- ObjectId - SHA-256 based object identity from initialValue+nonce
(RTO6b1, RTO14).
- ServerTime - server-time offset with double-checked locking (RTO16).
- ObjectsOperationSource - LOCAL vs CHANNEL operation origin (RTO22).
- DefaultRealtimeObject - wires the above into the RealtimeObject entry point.
Value types (io.ably.lib.liveobjects.value):
- LiveCounter and LiveMap are each split into an Internal* value holder, a
*Manager (applies state/operations), and a *ChangeCoordinator (subscriber
notification), over a shared BaseRealtimeLiveObject. DefaultLiveCounter /
DefaultLiveMap move into livecounter/ and livemap/ subpackages.
Reflection / R8 consistency (see prior review):
- LiveCounter.java / LiveMap.java IMPLEMENTATION_CLASS constants updated to the
new value.livecounter / value.livemap package locations (previously pointed
at the old value.* path and would have thrown ClassNotFoundException).
- android/proguard.txt keep-rules migrated to io.ably.lib.liveobjects.*, aligned
with the actual class locations and reflective constructor signatures, plus
added keeps for WireObjectDataJsonSerializer and the Wire* message model that
Gson (de)serializes by field-name reflection on the JSON transport path.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
WalkthroughThis PR implements the full LiveObjects feature (LiveMap and LiveCounter) for the Kotlin ChangesLiveObjects Feature Implementation
Estimated code review effort: 5 (Critical) | ~120 minutes Sequence Diagram(s)sequenceDiagram
participant Client
participant InternalLiveMap
participant DefaultRealtimeObject
participant Adapter
participant ObjectsManager
Client->>InternalLiveMap: set(key, value)
InternalLiveMap->>DefaultRealtimeObject: publishAndApply(MapSet message)
DefaultRealtimeObject->>Adapter: send ProtocolMessage
Adapter-->>DefaultRealtimeObject: ack (serial, siteCode)
DefaultRealtimeObject->>ObjectsManager: applyAckResult(synthetic message)
ObjectsManager->>InternalLiveMap: applyObjectOperation
InternalLiveMap-->>Client: updated value
sequenceDiagram
participant Channel
participant DefaultRealtimeObject
participant ObjectsManager
participant ObjectsPool
Channel->>DefaultRealtimeObject: handle(protocol message)
DefaultRealtimeObject->>DefaultRealtimeObject: objectsEventBus.emit(message)
DefaultRealtimeObject->>ObjectsManager: handleObjectSyncMessages / handleObjectMessages
ObjectsManager->>ObjectsPool: createZeroValueObjectIfNotExists / get
ObjectsPool-->>ObjectsManager: BaseRealtimeObject
ObjectsManager-->>DefaultRealtimeObject: sync state updated
Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
LiveObjects implementation to new path-based API (Kotlin)
There was a problem hiding this comment.
Actionable comments posted: 13
🧹 Nitpick comments (1)
liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/BaseRealtimeLiveObject.kt (1)
63-69: 📐 Maintainability & Code Quality | 🔵 Trivial | 💤 Low valueEnum-based no-op selection is a minor OCP smell.
Branching on
objectTypeto picknoOpMapUpdate/noOpCounterUpdatecouples the base class to the concrete set of subtypes. Consider exposing an abstractnoOpUpdate: ObjectUpdateproperty/hook overridden byInternalLiveMap/InternalLiveCounterinstead, consistent with the other abstract hooks (applyObjectState,clearData, etc.) already used for type-specific behavior.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/BaseRealtimeLiveObject.kt` around lines 63 - 69, The tombstoned-state handling in BaseRealtimeLiveObject is branching on objectType to choose between noOpMapUpdate and noOpCounterUpdate, which tightly couples the base class to concrete subtypes. Replace that enum-based selection with an abstract noOpUpdate: ObjectUpdate hook on BaseRealtimeLiveObject, and override it in InternalLiveMap and InternalLiveCounter alongside the existing type-specific hooks like applyObjectState and clearData so tombstoned processing can just return noOpUpdate.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In
`@liveobjects/src/main/kotlin/io/ably/lib/liveobjects/DefaultLiveObjectsPlugin.kt`:
- Around line 10-14: The getInstance implementation in DefaultLiveObjectsPlugin
uses ConcurrentHashMap.getOrPut, which can create extra DefaultRealtimeObject
instances during concurrent first access and leave losing instances undisposed.
Replace the getOrPut usage with ConcurrentMap.computeIfAbsent on objects so only
one DefaultRealtimeObject is constructed per channel, and keep the caching
behavior inside getInstance unchanged.
In
`@liveobjects/src/main/kotlin/io/ably/lib/liveobjects/DefaultRealtimeObject.kt`:
- Around line 315-322: The exception handling in DefaultRealtimeObject while
deriving the channel error reason is swallowing unexpected failures from
adapter.getChannel(channelName). Update the try/catch in the channel state
handling block to distinguish the expected missing-channel case from real
errors, and log or surface unexpected exceptions before returning null. Use the
surrounding getChannel(channelName) / errorReason logic to locate the fix and
keep the fallback behavior only for the anticipated case.
In `@liveobjects/src/main/kotlin/io/ably/lib/liveobjects/ObjectsManager.kt`:
- Around line 82-88: The startNewSync() flow is replacing syncCompletionWaiter
and can orphan an ACK waiter already created by applyAckResult() when the
manager is not yet Synced. Update ObjectsManager.startNewSync() so it preserves
any existing waiter and only creates a new CompletableDeferred when none exists,
allowing endSync() to resume the original suspended caller instead of leaving it
hanging.
- Around line 152-155: The early return in ObjectsManager.applySync() skips
stale-object cleanup when syncObjectsPool is empty, so previous objects can
remain in the pool. Update applySync() to always run deleteExtraObjectIds() even
for empty syncs, and only guard the later sync-processing logic behind the empty
check. Use the applySync() and deleteExtraObjectIds() symbols to keep the
cleanup path reachable regardless of syncObjectsPool contents.
In `@liveobjects/src/main/kotlin/io/ably/lib/liveobjects/ObjectsPool.kt`:
- Around line 136-143: The GC cleanup in ObjectsPool.onGCInterval can remove
ROOT_OBJECT_ID even though other cleanup paths preserve it. Update the removeIf
logic so the root entry is never eligible for removal, and keep only non-root
objects subject to isEligibleForGc(gcGracePeriod) while still calling
obj.onGCInterval(gcGracePeriod) for retained entries. Make the fix in
ObjectsPool and ensure getRootAsync can continue to rely on the root LiveMap
always being present.
In `@liveobjects/src/main/kotlin/io/ably/lib/liveobjects/ServerTime.kt`:
- Around line 17-36: Scope the cached server-time offset to the adapter clock
instead of keeping it in the process-global ServerTime.serverTimeOffset. Update
ServerTime.getCurrentTime to store and read the offset per SystemClock derived
from adapter.clientOptions, or move the cache into the owning
realtime/liveobjects instance so each client lifecycle has its own value. Keep
the mutex-based lazy initialization, but ensure the cached offset cannot be
reused across different clocks/adapters.
In
`@liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/BaseRealtimeLiveObject.kt`:
- Around line 44-47: The tombstone state in BaseRealtimeLiveObject is published
in the wrong order: tombstone() currently sets the volatile isTombstoned flag
before updating tombstonedAt, which can let isEligibleForGc observe a stale null
timestamp. Update the write order in BaseRealtimeLiveObject.tombstone() so
tombstonedAt is assigned first and isTombstoned is set last, preserving the
volatile visibility guarantee for the GC coroutine. Also verify any other
tombstone-related writes in BaseRealtimeLiveObject use the same
guarded-state-then-flag pattern.
- Around line 79-121: `applyObject()` is throwing on malformed per-object
fields, but the current handling is only at the broader protocol-message level,
so one bad object can stop later items in the same batch. Move the error
isolation to the per-item loop in the protocol message handler that calls
`applyObject()` and catch/report exceptions around each object entry
individually. Keep `applyObject()` and `canApplyOperation()` as the validation
points, but ensure a failure for one `WireObjectMessage` does not prevent
processing the remaining `protocolMessage.state` items.
In
`@liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livecounter/InternalLiveCounter.kt`:
- Around line 96-104: In InternalLiveCounter.notifyUpdated, the current cast
from ObjectUpdate to LiveCounterChangeEvent is invalid and will break on real
updates. Replace the cast with creation/emission of the appropriate
LiveCounterChangeEvent based on the received ObjectUpdate, and pass that to
liveCounterManager.notify so subscribers continue receiving counter updates.
In
`@liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livecounter/LiveCounterManager.kt`:
- Around line 44-48: The CounterCreate path in LiveCounterManager.applyOperation
is always reporting success even when applyCounterCreate returns a no-op update
for an already-merged create. Update the CounterCreate branch to base its return
value on whether applyCounterCreate produced a real change (for example by
checking the returned update against the no-op case) so
BaseRealtimeLiveObject.applyObject can correctly treat duplicate creates as not
meaningfully applied.
In
`@liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/InternalLiveMap.kt`:
- Around line 63-97: Add the same tombstone short-circuit used by
InternalLiveMap.get() to entries(), keys(), values(), and size() so tombstoned
maps do not expose stale data; check isTombstoned early in each method and
return an empty iterable or zero size before touching data or calling
getResolvedValue()/count. Keep the fix localized in InternalLiveMap and reuse
the existing tombstone behavior pattern already present in get().
- Around line 171-179: The notifyUpdated method in InternalLiveMap is casting
ObjectUpdate to LiveMapChangeEvent, which will fail at runtime. Update this path
so liveMapManager.notify receives the actual change event object already
available in this flow instead of the ObjectUpdate instance; keep the noOp guard
and logging intact, but replace the invalid cast with the correct event payload
used by the live map update pipeline.
In
`@liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/LiveMapManager.kt`:
- Around line 389-397: The validate(state: WireObjectState) method in
LiveMapManager is validating map semantics even for tombstoned states, which can
fail when state.map is null. Update the semantics check in validate() to skip
tombstoned payloads by gating validateMapSemantics on !state.tombstone or
state.map != null, while keeping the existing objectId and createOp validation
intact.
---
Nitpick comments:
In
`@liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/BaseRealtimeLiveObject.kt`:
- Around line 63-69: The tombstoned-state handling in BaseRealtimeLiveObject is
branching on objectType to choose between noOpMapUpdate and noOpCounterUpdate,
which tightly couples the base class to concrete subtypes. Replace that
enum-based selection with an abstract noOpUpdate: ObjectUpdate hook on
BaseRealtimeLiveObject, and override it in InternalLiveMap and
InternalLiveCounter alongside the existing type-specific hooks like
applyObjectState and clearData so tombstoned processing can just return
noOpUpdate.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: bb581655-c962-49c0-b607-c041a53ae951
📒 Files selected for processing (23)
android/proguard.txtlib/src/main/java/io/ably/lib/liveobjects/value/LiveCounter.javalib/src/main/java/io/ably/lib/liveobjects/value/LiveMap.javaliveobjects/src/main/kotlin/io/ably/lib/liveobjects/DefaultLiveObjectsPlugin.ktliveobjects/src/main/kotlin/io/ably/lib/liveobjects/DefaultRealtimeObject.ktliveobjects/src/main/kotlin/io/ably/lib/liveobjects/ObjectId.ktliveobjects/src/main/kotlin/io/ably/lib/liveobjects/ObjectsManager.ktliveobjects/src/main/kotlin/io/ably/lib/liveobjects/ObjectsOperationSource.ktliveobjects/src/main/kotlin/io/ably/lib/liveobjects/ObjectsPool.ktliveobjects/src/main/kotlin/io/ably/lib/liveobjects/ObjectsState.ktliveobjects/src/main/kotlin/io/ably/lib/liveobjects/ObjectsSyncTracker.ktliveobjects/src/main/kotlin/io/ably/lib/liveobjects/ServerTime.ktliveobjects/src/main/kotlin/io/ably/lib/liveobjects/message/WireObjectMessage.ktliveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/BaseRealtimeLiveObject.ktliveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livecounter/DefaultLiveCounter.ktliveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livecounter/InternalLiveCounter.ktliveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livecounter/LiveCounterChangeCoordinator.ktliveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livecounter/LiveCounterManager.ktliveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/DefaultLiveMap.ktliveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/InternalLiveMap.ktliveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/LiveMapChangeCoordinator.ktliveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/LiveMapEntry.ktliveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/LiveMapManager.kt
| private val objects = ConcurrentHashMap<String, DefaultRealtimeObject>() | ||
|
|
||
| override fun getInstance(channelName: String): RealtimeObject { | ||
| return objects.getOrPut(channelName) { DefaultRealtimeObject(channelName, adapter) } | ||
| } |
There was a problem hiding this comment.
🩺 Stability & Availability | 🟠 Major | ⚡ Quick win
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Inspect the plugin and object lifecycle code referenced in the comment.
git ls-files 'liveobjects/src/main/kotlin/io/ably/lib/liveobjects/*.kt'
echo '---'
sed -n '1,120p' liveobjects/src/main/kotlin/io/ably/lib/liveobjects/DefaultLiveObjectsPlugin.kt
echo '---'
sed -n '1,220p' liveobjects/src/main/kotlin/io/ably/lib/liveobjects/DefaultRealtimeObject.kt
echo '---'
sed -n '1,220p' liveobjects/src/main/kotlin/io/ably/lib/liveobjects/RealtimeObject.kt
echo '---'
rg -n "getOrPut|computeIfAbsent|dispose\(|incomingObjectsHandler|launch\(" liveobjects/src/main/kotlin/io/ably/lib/liveobjectsRepository: ably/ably-java
Length of output: 14405
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Read the remaining lifecycle methods and locate the RealtimeObject interface.
sed -n '220,420p' liveobjects/src/main/kotlin/io/ably/lib/liveobjects/DefaultRealtimeObject.kt
echo '---'
fd -a 'RealtimeObject.kt' .
echo '---'
rg -n "interface RealtimeObject|class DefaultRealtimeObject|dispose\(" liveobjects/src/main/kotlin/io/ably/lib/liveobjectsRepository: ably/ably-java
Length of output: 8199
🌐 Web query:
Kotlin stdlib ConcurrentMap getOrPut defaultValue may be invoked even if key is already in the map documentation
💡 Result:
Yes, it is explicitly documented that the defaultValue function for ConcurrentMap.getOrPut in the Kotlin standard library may be invoked even if the key is already present in the map [1][2][3]. The official Kotlin documentation states: "This method guarantees not to put the value into the map if the key is already there, but the defaultValue function may be invoked even if the key is already in the map" [2][4][3]. This behavior occurs because of the implementation of the function [1][3]: public inline fun <K, V> ConcurrentMap<K, V>.getOrPut(key: K, defaultValue: -> V): V { // Do not use computeIfAbsent on JVM8 as it would change locking behavior return this.get(key)?: defaultValue.let { default -> this.putIfAbsent(key, default)?: default } } As shown above, the function first attempts to retrieve the value using get(key) [1][3]. If the key is missing or mapped to null, it proceeds to invoke the defaultValue function before attempting to put the result into the map using putIfAbsent [1][3]. Because this check and the subsequent invocation are not performed within a single atomic operation, the defaultValue function may be called unnecessarily if another thread inserts the key into the map after the initial get(key) call but before the defaultValue function is executed [5].
Citations:
- 1: https://github.com/JetBrains/kotlin/blob/master/libraries/stdlib/jvm/src/kotlin/collections/MapsJVM.kt
- 2: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.collections/get-or-put.html
- 3: https://github.com/JetBrains/kotlin/blob/2.3.0/libraries/stdlib/jvm/src/kotlin/collections/MapsJVM.kt
- 4: https://kotlinlang.org/api/core/kotlin-stdlib/kotlin.collections/get-or-put.html
- 5: https://stackoverflow.com/questions/76514680/should-we-use-computeifabsent-instead-of-getorput
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Inspect build configuration for Java/Kotlin target compatibility.
git ls-files '*build.gradle*' '*settings.gradle*' 'gradle.properties' 'gradle/libs.versions.toml' 'pom.xml'
echo '---'
rg -n "sourceCompatibility|targetCompatibility|jvmTarget|kotlinOptions|toolchain|javaVersion|JavaVersion|kotlin.*jvm|org.jetbrains.kotlin.jvm|jvmToolchain" \
-g '!*build*' -g '!**/build/**' \
.Repository: ably/ably-java
Length of output: 1287
🏁 Script executed:
#!/bin/bash
set -euo pipefail
sed -n '1,220p' liveobjects/build.gradle.kts
echo '---'
sed -n '1,220p' build.gradle.kts
echo '---'
sed -n '1,220p' gradle.propertiesRepository: ably/ably-java
Length of output: 3505
🏁 Script executed:
#!/bin/bash
set -euo pipefail
rg -n "JavaVersion\.VERSION_|sourceCompatibility|targetCompatibility|jvmTarget|jvmToolchain|toolchain|VERSION_1_8|VERSION_11|VERSION_17" \
liveobjects build.gradle.kts gradle.properties gradle/libs.versions.toml java build.gradle.ktsRepository: ably/ably-java
Length of output: 302
Avoid getOrPut here; concurrent first-time calls can start and drop extra DefaultRealtimeObject instances.
ConcurrentMap.getOrPut may invoke the factory even when another thread wins the race, and DefaultRealtimeObject starts a long-lived coroutine in init. That leaves losing instances undisposed. Use computeIfAbsent so only one object is constructed per channel.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@liveobjects/src/main/kotlin/io/ably/lib/liveobjects/DefaultLiveObjectsPlugin.kt`
around lines 10 - 14, The getInstance implementation in DefaultLiveObjectsPlugin
uses ConcurrentHashMap.getOrPut, which can create extra DefaultRealtimeObject
instances during concurrent first access and leave losing instances undisposed.
Replace the getOrPut usage with ConcurrentMap.computeIfAbsent on objects so only
one DefaultRealtimeObject is constructed per channel, and keep the caching
behavior inside getInstance unchanged.
| ChannelState.detached, | ||
| ChannelState.suspended, | ||
| ChannelState.failed -> { | ||
| val errorReason = try { | ||
| adapter.getChannel(channelName).reason | ||
| } catch (e: Exception) { | ||
| null | ||
| } |
There was a problem hiding this comment.
🩺 Stability & Availability | 🟡 Minor | ⚡ Quick win
Swallowed exception when deriving the channel's error reason.
The caught exception is discarded without logging, hiding potential unexpected failures from adapter.getChannel(channelName) beyond the expected "channel missing" case.
🔧 Suggested fix
val errorReason = try {
adapter.getChannel(channelName).reason
- } catch (e: Exception) {
+ } catch (e: Exception) {
+ Log.v(tag, "Could not retrieve channel reason while handling state change for channel=$channelName", e)
null
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| ChannelState.detached, | |
| ChannelState.suspended, | |
| ChannelState.failed -> { | |
| val errorReason = try { | |
| adapter.getChannel(channelName).reason | |
| } catch (e: Exception) { | |
| null | |
| } | |
| ChannelState.detached, | |
| ChannelState.suspended, | |
| ChannelState.failed -> { | |
| val errorReason = try { | |
| adapter.getChannel(channelName).reason | |
| } catch (e: Exception) { | |
| Log.v(tag, "Could not retrieve channel reason while handling state change for channel=$channelName", e) | |
| null | |
| } |
🧰 Tools
🪛 detekt (1.23.8)
[warning] 320-320: The caught exception is swallowed. The original exception could be lost.
(detekt.exceptions.SwallowedException)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@liveobjects/src/main/kotlin/io/ably/lib/liveobjects/DefaultRealtimeObject.kt`
around lines 315 - 322, The exception handling in DefaultRealtimeObject while
deriving the channel error reason is swallowing unexpected failures from
adapter.getChannel(channelName). Update the try/catch in the channel state
handling block to distinguish the expected missing-channel case from real
errors, and log or surface unexpected exceptions before returning null. Use the
surrounding getChannel(channelName) / errorReason logic to locate the fix and
keep the fallback behavior only for the anticipated case.
Source: Linters/SAST tools
| internal fun startNewSync(syncId: String?) { | ||
| Log.v(tag, "Starting new sync sequence: syncId=$syncId") | ||
|
|
||
| syncObjectsPool.clear() // RTO5a2a | ||
| currentSyncId = syncId | ||
| syncCompletionWaiter = CompletableDeferred() | ||
| stateChange(ObjectsState.Syncing) |
There was a problem hiding this comment.
🩺 Stability & Availability | 🟠 Major | ⚡ Quick win
Preserve existing ACK waiters when sync starts.
Line 87 replaces syncCompletionWaiter. If applyAckResult() already created a waiter while state was not Synced, that suspended caller is orphaned and never resumed by endSync().
Proposed fix
- syncCompletionWaiter = CompletableDeferred()
+ if (syncCompletionWaiter == null) {
+ syncCompletionWaiter = CompletableDeferred()
+ }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| internal fun startNewSync(syncId: String?) { | |
| Log.v(tag, "Starting new sync sequence: syncId=$syncId") | |
| syncObjectsPool.clear() // RTO5a2a | |
| currentSyncId = syncId | |
| syncCompletionWaiter = CompletableDeferred() | |
| stateChange(ObjectsState.Syncing) | |
| internal fun startNewSync(syncId: String?) { | |
| Log.v(tag, "Starting new sync sequence: syncId=$syncId") | |
| syncObjectsPool.clear() // RTO5a2a | |
| currentSyncId = syncId | |
| if (syncCompletionWaiter == null) { | |
| syncCompletionWaiter = CompletableDeferred() | |
| } | |
| stateChange(ObjectsState.Syncing) |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@liveobjects/src/main/kotlin/io/ably/lib/liveobjects/ObjectsManager.kt` around
lines 82 - 88, The startNewSync() flow is replacing syncCompletionWaiter and can
orphan an ACK waiter already created by applyAckResult() when the manager is not
yet Synced. Update ObjectsManager.startNewSync() so it preserves any existing
waiter and only creates a new CompletableDeferred when none exists, allowing
endSync() to resume the original suspended caller instead of leaving it hanging.
| private fun applySync() { | ||
| if (syncObjectsPool.isEmpty()) { | ||
| return | ||
| } |
There was a problem hiding this comment.
🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick win
Do not skip stale-object cleanup for empty syncs.
An empty syncObjectsPool currently returns before deleteExtraObjectIds(), leaving objects from the previous state in the pool even though they were not received in the sync sequence.
Proposed fix
private fun applySync() {
+ val receivedObjectIds = mutableSetOf<String>()
if (syncObjectsPool.isEmpty()) {
+ realtimeObjects.objectsPool.deleteExtraObjectIds(receivedObjectIds)
return
}
- val receivedObjectIds = mutableSetOf<String>()📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| private fun applySync() { | |
| if (syncObjectsPool.isEmpty()) { | |
| return | |
| } | |
| private fun applySync() { | |
| val receivedObjectIds = mutableSetOf<String>() | |
| if (syncObjectsPool.isEmpty()) { | |
| realtimeObjects.objectsPool.deleteExtraObjectIds(receivedObjectIds) | |
| return | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@liveobjects/src/main/kotlin/io/ably/lib/liveobjects/ObjectsManager.kt` around
lines 152 - 155, The early return in ObjectsManager.applySync() skips
stale-object cleanup when syncObjectsPool is empty, so previous objects can
remain in the pool. Update applySync() to always run deleteExtraObjectIds() even
for empty syncs, and only guard the later sync-processing logic behind the empty
check. Use the applySync() and deleteExtraObjectIds() symbols to keep the
cleanup path reachable regardless of syncObjectsPool contents.
| private fun onGCInterval() { | ||
| pool.entries.removeIf { (_, obj) -> | ||
| if (obj.isEligibleForGc(gcGracePeriod)) { true } // Remove from pool | ||
| else { | ||
| obj.onGCInterval(gcGracePeriod) | ||
| false // Keep in pool | ||
| } | ||
| } |
There was a problem hiding this comment.
🩺 Stability & Availability | 🟠 Major | ⚡ Quick win
Keep the root object in the pool during GC.
Other pool cleanup paths preserve ROOT_OBJECT_ID, but GC can remove it if isEligibleForGc() returns true. getRootAsync() later assumes root exists and casts it to LiveMap.
Proposed fix
private fun onGCInterval() {
- pool.entries.removeIf { (_, obj) ->
+ pool.entries.removeIf { (key, obj) ->
+ if (key == ROOT_OBJECT_ID) {
+ obj.onGCInterval(gcGracePeriod)
+ return@removeIf false
+ }
if (obj.isEligibleForGc(gcGracePeriod)) { true } // Remove from pool
else {
obj.onGCInterval(gcGracePeriod)
false // Keep in pool📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| private fun onGCInterval() { | |
| pool.entries.removeIf { (_, obj) -> | |
| if (obj.isEligibleForGc(gcGracePeriod)) { true } // Remove from pool | |
| else { | |
| obj.onGCInterval(gcGracePeriod) | |
| false // Keep in pool | |
| } | |
| } | |
| private fun onGCInterval() { | |
| pool.entries.removeIf { (key, obj) -> | |
| if (key == ROOT_OBJECT_ID) { | |
| obj.onGCInterval(gcGracePeriod) | |
| return@removeIf false | |
| } | |
| if (obj.isEligibleForGc(gcGracePeriod)) { true } // Remove from pool | |
| else { | |
| obj.onGCInterval(gcGracePeriod) | |
| false // Keep in pool | |
| } | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@liveobjects/src/main/kotlin/io/ably/lib/liveobjects/ObjectsPool.kt` around
lines 136 - 143, The GC cleanup in ObjectsPool.onGCInterval can remove
ROOT_OBJECT_ID even though other cleanup paths preserve it. Update the removeIf
logic so the root entry is never eligible for removal, and keep only non-root
objects subject to isEligibleForGc(gcGracePeriod) while still calling
obj.onGCInterval(gcGracePeriod) for retained entries. Make the fix in
ObjectsPool and ensure getRootAsync can continue to rely on the root LiveMap
always being present.
| override fun notifyUpdated(update: ObjectUpdate) { | ||
| if (update.noOp) { | ||
| return | ||
| } | ||
| Log.v(tag, "Object $objectId updated: $update") | ||
|
|
||
| // TODO - Current cast for emitting event is wrong, need to fix the same. | ||
| liveCounterManager.notify(update as LiveCounterChangeEvent) | ||
| } |
There was a problem hiding this comment.
🩺 Stability & Availability | 🔴 Critical | ⚡ Quick win
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
rg -n "class ObjectUpdate" -A 10 liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/BaseRealtimeLiveObject.kt
rg -n "interface InstanceSubscriptionEvent" -A 10 liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instanceRepository: ably/ably-java
Length of output: 501
Replace the ObjectUpdate cast before emitting counter updates
notifyUpdated() receives an ObjectUpdate, not a LiveCounterChangeEvent, so update as LiveCounterChangeEvent will fail on the first non-no-op update and stop subscriber notifications. Emit a real counter event instead of casting.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livecounter/InternalLiveCounter.kt`
around lines 96 - 104, In InternalLiveCounter.notifyUpdated, the current cast
from ObjectUpdate to LiveCounterChangeEvent is invalid and will break on real
updates. Replace the cast with creation/emission of the appropriate
LiveCounterChangeEvent based on the received ObjectUpdate, and pass that to
liveCounterManager.notify so subscribers continue receiving counter updates.
Source: Learnings
| WireObjectOperationAction.CounterCreate -> { | ||
| val update = applyCounterCreate(operation) // RTLC7d1 | ||
| liveCounter.notifyUpdated(update) // RTLC7d1a | ||
| true // RTLC7d1b | ||
| } |
There was a problem hiding this comment.
🎯 Functional Correctness | 🟠 Major | ⚡ Quick win
CounterCreate branch always returns true, even for already-merged (no-op) creates.
applyCounterCreate returns noOpCounterUpdate when the create op was already merged (Lines 74-84), but applyOperation unconditionally returns true for this branch. This breaks the "true if the operation was meaningfully applied, false otherwise" contract that BaseRealtimeLiveObject.applyObject relies on for dedup/ack tracking, potentially causing duplicate creates to be counted as meaningful applications downstream.
🐛 Proposed fix
WireObjectOperationAction.CounterCreate -> {
val update = applyCounterCreate(operation) // RTLC7d1
liveCounter.notifyUpdated(update) // RTLC7d1a
- true // RTLC7d1b
+ !update.noOp // RTLC7d1b - false when the create op was already merged
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| WireObjectOperationAction.CounterCreate -> { | |
| val update = applyCounterCreate(operation) // RTLC7d1 | |
| liveCounter.notifyUpdated(update) // RTLC7d1a | |
| true // RTLC7d1b | |
| } | |
| WireObjectOperationAction.CounterCreate -> { | |
| val update = applyCounterCreate(operation) // RTLC7d1 | |
| liveCounter.notifyUpdated(update) // RTLC7d1a | |
| !update.noOp // RTLC7d1b - false when the create op was already merged | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livecounter/LiveCounterManager.kt`
around lines 44 - 48, The CounterCreate path in
LiveCounterManager.applyOperation is always reporting success even when
applyCounterCreate returns a no-op update for an already-merged create. Update
the CounterCreate branch to base its return value on whether applyCounterCreate
produced a real change (for example by checking the returned update against the
no-op case) so BaseRealtimeLiveObject.applyObject can correctly treat duplicate
creates as not meaningfully applied.
| fun entries(): Iterable<Map.Entry<String, LiveMapValue>> { | ||
| adapter.throwIfInvalidAccessApiConfiguration(channelName) // RTLM11b, RTLM11c | ||
|
|
||
| return sequence<Map.Entry<String, LiveMapValue>> { | ||
| for ((key, entry) in data.entries) { | ||
| val value = entry.getResolvedValue(objectsPool) // RTLM11d, RTLM11d2 | ||
| value?.let { | ||
| yield(AbstractMap.SimpleImmutableEntry(key, it)) | ||
| } | ||
| } | ||
| }.asIterable() | ||
| } | ||
|
|
||
| fun keys(): Iterable<String> { | ||
| val iterableEntries = entries() | ||
| return sequence { | ||
| for (entry in iterableEntries) { | ||
| yield(entry.key) // RTLM12b | ||
| } | ||
| }.asIterable() | ||
| } | ||
|
|
||
| fun values(): Iterable<LiveMapValue> { | ||
| val iterableEntries = entries() | ||
| return sequence { | ||
| for (entry in iterableEntries) { | ||
| yield(entry.value) // RTLM13b | ||
| } | ||
| }.asIterable() | ||
| } | ||
|
|
||
| fun size(): Long { | ||
| adapter.throwIfInvalidAccessApiConfiguration(channelName) | ||
| return data.values.count { !it.isEntryOrRefTombstoned(objectsPool) }.toLong() // RTLM10d | ||
| } |
There was a problem hiding this comment.
🎯 Functional Correctness | 🟠 Major | ⚡ Quick win
Missing tombstone guard on entries()/keys()/values()/size().
get() explicitly short-circuits when isTombstoned (RTLM5b/c), but entries(), keys(), values(), and size() do not perform the same check before reading/counting data. If the map object is tombstoned while data isn't guaranteed empty, these methods can surface stale entries or a non-zero size for a deleted object.
🐛 Proposed fix
fun entries(): Iterable<Map.Entry<String, LiveMapValue>> {
adapter.throwIfInvalidAccessApiConfiguration(channelName) // RTLM11b, RTLM11c
+ if (isTombstoned) {
+ return emptyList()
+ }
return sequence<Map.Entry<String, LiveMapValue>> {Apply an analogous guard to size() as well.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/InternalLiveMap.kt`
around lines 63 - 97, Add the same tombstone short-circuit used by
InternalLiveMap.get() to entries(), keys(), values(), and size() so tombstoned
maps do not expose stale data; check isTombstoned early in each method and
return an empty iterable or zero size before touching data or calling
getResolvedValue()/count. Keep the fix localized in InternalLiveMap and reuse
the existing tombstone behavior pattern already present in get().
| override fun notifyUpdated(update: ObjectUpdate) { | ||
| if (update.noOp) { | ||
| return | ||
| } | ||
| Log.v(tag, "Object $objectId updated: $update") | ||
|
|
||
| // TODO - Current cast for emitting event is wrong, need to fix the same. | ||
| liveMapManager.notify(update as LiveMapChangeEvent) | ||
| } |
There was a problem hiding this comment.
🩺 Stability & Availability | 🔴 Critical | 🏗️ Heavy lift
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
ast-grep run --pattern 'class ObjectUpdate $$$' --lang kotlin liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/BaseRealtimeLiveObject.kt
rg -n 'class ObjectUpdate|interface LiveMapChangeEvent|interface InstanceSubscriptionEvent' -A5 liveobjects/src/main/kotlinRepository: ably/ably-java
Length of output: 2033
Replace the invalid cast before emitting updates. ObjectUpdate is a separate data class, so liveMapManager.notify(update as LiveMapChangeEvent) will throw a ClassCastException on real map updates. Pass the actual change event here instead.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/InternalLiveMap.kt`
around lines 171 - 179, The notifyUpdated method in InternalLiveMap is casting
ObjectUpdate to LiveMapChangeEvent, which will fail at runtime. Update this path
so liveMapManager.notify receives the actual change event object already
available in this flow instead of the ObjectUpdate instance; keep the noOp guard
and logging intact, but replace the invalid cast with the correct event payload
used by the live map update pipeline.
| internal fun validate(state: WireObjectState) { | ||
| liveMap.validateObjectId(state.objectId) | ||
| validateMapSemantics(state.map?.semantics) | ||
| state.createOp?.let { createOp -> | ||
| liveMap.validateObjectId(createOp.objectId) | ||
| validateMapCreateAction(createOp.action) | ||
| validateMapSemantics(getEffectiveMapCreate(createOp)?.semantics) | ||
| } | ||
| } |
There was a problem hiding this comment.
🎯 Functional Correctness | 🟠 Major | ⚡ Quick win
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
rg -n 'fun validate\(|state.tombstone|\.map\s*!=\s*null' -A5 liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/BaseRealtimeLiveObject.ktRepository: ably/ably-java
Length of output: 422
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo '--- LiveMapManager.kt outline ---'
ast-grep outline liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/LiveMapManager.kt --view expanded
echo
echo '--- BaseRealtimeLiveObject.kt outline ---'
ast-grep outline liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/BaseRealtimeLiveObject.kt --view expanded
echo
echo '--- Validate-related context in LiveMapManager.kt ---'
rg -n 'internal fun validate|validateMapSemantics|state\.tombstone|createOp|tombstone|WireObjectState' liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/LiveMapManager.kt -A6 -B6
echo
echo '--- WireObjectState definition ---'
rg -n 'data class WireObjectState|class WireObjectState|interface WireObjectState|tombstone|map:' liveobjects/src/main/kotlin -A8 -B4
echo
echo '--- Calls to validate\(state\) ---'
rg -n '\.validate\(state\)|validate\(state\)' liveobjects/src/main/kotlin -A4 -B4Repository: ably/ably-java
Length of output: 50370
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Inspect the specific implementation details with line numbers.
sed -n '360,430p' liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/LiveMapManager.kt
echo
sed -n '1,260p' liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/BaseRealtimeLiveObject.kt
echo
# Search for the wire state type and tombstone handling.
rg -n 'WireObjectState|tombstone|map\?' liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value -A3 -B3Repository: ably/ably-java
Length of output: 50370
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Narrow line-numbered reads to avoid large output.
nl -ba liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/LiveMapManager.kt | sed -n '370,420p'
echo
nl -ba liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/BaseRealtimeLiveObject.kt | sed -n '140,210p'
echo
# Find the concrete WireObjectState declaration, if any.
rg -n 'data class WireObjectState|class WireObjectState|interface WireObjectState' liveobjects/src/main/kotlin -A12 -B4Repository: ably/ably-java
Length of output: 192
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Show the exact implementation around applyState and validate.
nl -ba liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/LiveMapManager.kt | sed -n '1,140p'
echo
# Find tests or fixtures that construct tombstoned WireObjectState.
rg -n 'tombstone|WireObjectState\(' liveobjects/src/test liveobjects/src/main/test liveobjects/src -A4 -B4Repository: ably/ably-java
Length of output: 192
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo '--- WireObjectState declaration ---'
rg -n 'data class WireObjectState|class WireObjectState|interface WireObjectState' liveobjects/src/main/kotlin/io/ably/lib/liveobjects/message -A30 -B8 --max-count 20
echo
echo '--- map / tombstone fields in message models ---'
rg -n 'val map:|val tombstone:|WireObjectsMap|WireMapCreate|WireMapEntry' liveobjects/src/main/kotlin/io/ably/lib/liveobjects/message -A8 -B4 --max-count 40
echo
echo '--- LiveMapManager.validate lines only ---'
sed -n '389,410p' liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/LiveMapManager.ktRepository: ably/ably-java
Length of output: 27091
Skip map semantics validation for tombstoned states. BaseRealtimeObject.applyObjectSync() calls validate() before the tombstone short-circuit, and WireObjectState.map is nullable, so a valid tombstoned payload without map will fail here. Gate the semantics check on !state.tombstone or state.map != null.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/LiveMapManager.kt`
around lines 389 - 397, The validate(state: WireObjectState) method in
LiveMapManager is validating map semantics even for tombstoned states, which can
fail when state.map is null. Update the semantics check in validate() to skip
tombstoned payloads by gating validateMapSemantics on !state.tombstone or
state.map != null, while keeping the existing objectId and createOp validation
intact.
Intent
The LiveObjects public interfaces (
lib/src/main/java/io/ably/lib/liveobjects) were already in place, but theliveobjectsKotlin module behind them was mostly TODO stubs —DefaultRealtimeObjectreturnedTODO("Not yet implemented")for its core entry points, and there was no engine to process object sync messages or maintain object state.This PR fills in that implementation: a working objects sync engine plus the
LiveCounter/LiveMapvalue types, following theRTO*/RTLC*/RTLM*spec points (referenced inline in the code as@spec/Spec:tags).What's included
Sync engine —
io.ably.lib.liveobjectsObjectsManagerOBJECT/OBJECT_SYNCmessages, buffers operations during a sync sequence, creates zero-value objectsObjectsPoolobjectId, with tombstone GC (grace period sourced fromconnectionDetails, default fallback 24h)ObjectsSyncTrackerObjectsStateSYNCING/SYNCEDeventsObjectIdinitialValue+nonceServerTimeObjectsOperationSourceLOCAL(on ACK) vsCHANNELoperation originDefaultRealtimeObjectRealtimeObjectentry point (was stubs)Value types —
io.ably.lib.liveobjects.valueBoth
LiveCounterandLiveMapfollow the same three-part split over a sharedBaseRealtimeLiveObject:Internal*— the value holder / state.*Manager— applies sync state and operations to the value.*ChangeCoordinator— notifies subscribers of updates.The concrete
DefaultLiveCounter/DefaultLiveMap(reflectively instantiated by the publicLiveCounter#create/LiveMap#createfactories) move intovalue/livecounter/andvalue/livemap/subpackages alongside their managers.Reflection / R8 consistency
A companion review of the runtime-reflection surface (
Class.forName, Gson@JsonAdapter) surfaced two issues, fixed here:LiveCounter.java/LiveMap.javaIMPLEMENTATION_CLASSconstants still pointed at the pre-refactorvalue.DefaultLive*path —Class.forNamewould have thrownClassNotFoundExceptionon every build. Updated to the newvalue.livecounter.*/value.livemap.*locations.android/proguard.txt: keep-rules migrated from the oldio.ably.lib.objects.*package toio.ably.lib.liveobjects.*, aligned with the real class locations and reflective constructor signatures. Added keeps forWireObjectDataJsonSerializerand the wholeWire*message model, which Gson (de)serializes by field-name reflection on the JSON transport path (the MessagePack path uses explicit keys and is unaffected). Mirrors the existingio.ably.lib.types.**rule.Verification
./gradlew checkWithCodenarc checkstyleMain checkstyleTest— passing../gradlew :liveobjects:compileKotlin— passing.consumerProguardFilesrules are only exercised by a downstream minified build, so they are not validated by this repo's CI; an end-to-end check would need a minified consumer round-tripping a LiveObjects JSON message.Notes for reviewers
feature/uts-liveobjects-unit-tests(notmain) — this stacks under the UTS unit-test work.settings.gradle.ktstweak are intentionally not part of this PR.🤖 Generated with Claude Code
Summary by CodeRabbit
New Features
Bug Fixes