diff --git a/android/proguard.txt b/android/proguard.txt index 71c100707..de4affb05 100644 --- a/android/proguard.txt +++ b/android/proguard.txt @@ -1,8 +1,28 @@ -keep public class io.ably.lib.transport.WebSocketTransport$Factory {*;} -keep class io.ably.lib.types.** {*;} --keep class io.ably.lib.objects.*LiveObjectsPlugin {*;} --keep class io.ably.lib.objects.serialization.*Serializer {*;} --keep class io.ably.lib.objects.ObjectsJsonSerializer {*;} + +# LiveObjects implementations are resolved at runtime via Class.forName(...) + +# getDeclaredConstructor(...).newInstance(...) against hard-coded class-name strings +# (see LiveObjectsPlugin.Factory, ObjectSerializer.Holder, LiveCounter#create and +# LiveMap#create). R8 must not rename or strip these classes or their reflectively +# invoked constructors. +-keep class io.ably.lib.liveobjects.DefaultLiveObjectsPlugin { (...); } +-keep class io.ably.lib.liveobjects.serialization.DefaultObjectsSerializer { (...); } +-keep class io.ably.lib.liveobjects.value.livecounter.DefaultLiveCounter { (...); } +-keep class io.ably.lib.liveobjects.value.livemap.DefaultLiveMap { (...); } + +# ObjectJsonSerializer and WireObjectDataJsonSerializer are instantiated reflectively +# by Gson via @JsonAdapter annotations (on ProtocolMessage and WireObjectData +# respectively); keep their no-arg constructors. +-keep class io.ably.lib.liveobjects.serialization.ObjectJsonSerializer { (...); } +-keep class io.ably.lib.liveobjects.serialization.WireObjectDataJsonSerializer { (...); } + +# The Wire* object model is (de)serialized to/from JSON by Gson via field-name +# reflection (DefaultObjectsSerializer -> JsonSerialization.gson.fromJson/toJsonTree). +# R8 must not rename or strip these fields or JSON transport silently breaks. Mirrors +# the io.ably.lib.types.** rule above. (The MessagePack path uses explicit string keys +# and is unaffected.) +-keep class io.ably.lib.liveobjects.message.** { *; } -keep class org.msgpack.core.** {*;} -keepclasseswithmembers class io.ably.lib.rest.Auth** {*;} diff --git a/lib/src/main/java/io/ably/lib/liveobjects/message/ObjectsMapEntry.java b/lib/src/main/java/io/ably/lib/liveobjects/message/ObjectsMapEntry.java index 09df91fbb..3252042d1 100644 --- a/lib/src/main/java/io/ably/lib/liveobjects/message/ObjectsMapEntry.java +++ b/lib/src/main/java/io/ably/lib/liveobjects/message/ObjectsMapEntry.java @@ -5,7 +5,7 @@ /** * Represents the value at a given key in a {@code LiveMap} object. * - *

Spec: ME1 + *

Spec: OME1 */ public interface ObjectsMapEntry { diff --git a/lib/src/main/java/io/ably/lib/liveobjects/value/LiveCounter.java b/lib/src/main/java/io/ably/lib/liveobjects/value/LiveCounter.java index 484c1be15..1581d2d78 100644 --- a/lib/src/main/java/io/ably/lib/liveobjects/value/LiveCounter.java +++ b/lib/src/main/java/io/ably/lib/liveobjects/value/LiveCounter.java @@ -23,7 +23,7 @@ */ public abstract class LiveCounter { - private static final String IMPLEMENTATION_CLASS = "io.ably.lib.liveobjects.value.DefaultLiveCounter"; + private static final String IMPLEMENTATION_CLASS = "io.ably.lib.liveobjects.value.livecounter.DefaultLiveCounter"; /** * Extended by the LiveObjects implementation; not intended for diff --git a/lib/src/main/java/io/ably/lib/liveobjects/value/LiveMap.java b/lib/src/main/java/io/ably/lib/liveobjects/value/LiveMap.java index 022622d8b..35fed8631 100644 --- a/lib/src/main/java/io/ably/lib/liveobjects/value/LiveMap.java +++ b/lib/src/main/java/io/ably/lib/liveobjects/value/LiveMap.java @@ -26,7 +26,7 @@ */ public abstract class LiveMap { - private static final String IMPLEMENTATION_CLASS = "io.ably.lib.liveobjects.value.DefaultLiveMap"; + private static final String IMPLEMENTATION_CLASS = "io.ably.lib.liveobjects.value.livemap.DefaultLiveMap"; /** * Extended by the LiveObjects implementation; not intended for diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/DefaultLiveObjectsPlugin.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/DefaultLiveObjectsPlugin.kt new file mode 100644 index 000000000..dbdc103ed --- /dev/null +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/DefaultLiveObjectsPlugin.kt @@ -0,0 +1,36 @@ +package io.ably.lib.liveobjects + +import io.ably.lib.liveobjects.adapter.AblyClientAdapter +import io.ably.lib.realtime.ChannelState +import io.ably.lib.types.ProtocolMessage +import java.util.concurrent.ConcurrentHashMap + +public class DefaultLiveObjectsPlugin(private val adapter: AblyClientAdapter) : LiveObjectsPlugin { + + private val objects = ConcurrentHashMap() + + override fun getInstance(channelName: String): RealtimeObject { + return objects.getOrPut(channelName) { DefaultRealtimeObject(channelName, adapter) } + } + + override fun handle(msg: ProtocolMessage) { + val channelName = msg.channel + objects[channelName]?.handle(msg) + } + + override fun handleStateChange(channelName: String, state: ChannelState, hasObjects: Boolean) { + objects[channelName]?.handleStateChange(state, hasObjects) + } + + override fun dispose(channelName: String) { + objects.remove(channelName) + ?.dispose(clientError("Channel has been released using channels.release()")) + } + + override fun dispose() { + objects.values.forEach { + it.dispose(clientError("AblyClient has been closed using client.close()")) + } + objects.clear() + } +} diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/DefaultRealtimeObject.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/DefaultRealtimeObject.kt index ff6dbb070..03e32332a 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/DefaultRealtimeObject.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/DefaultRealtimeObject.kt @@ -1,9 +1,32 @@ package io.ably.lib.liveobjects import io.ably.lib.liveobjects.adapter.AblyClientAdapter +import io.ably.lib.liveobjects.message.WireCounterCreateWithObjectId +import io.ably.lib.liveobjects.message.WireMapCreateWithObjectId +import io.ably.lib.liveobjects.message.WireObjectMessage +import io.ably.lib.liveobjects.message.WireObjectOperation +import io.ably.lib.liveobjects.message.WireObjectOperationAction import io.ably.lib.liveobjects.path.types.LiveMapPathObject +import io.ably.lib.liveobjects.serialization.gson import io.ably.lib.liveobjects.state.ObjectStateChange import io.ably.lib.liveobjects.state.ObjectStateEvent +import io.ably.lib.liveobjects.value.LiveCounter +import io.ably.lib.liveobjects.value.LiveMap +import io.ably.lib.liveobjects.value.LiveMapValue +import io.ably.lib.liveobjects.value.ObjectType +import io.ably.lib.liveobjects.value.livecounter.InternalLiveCounter +import io.ably.lib.liveobjects.value.livemap.InternalLiveMap +import io.ably.lib.realtime.ChannelState +import io.ably.lib.types.AblyException +import io.ably.lib.types.ProtocolMessage +import io.ably.lib.types.PublishResult +import io.ably.lib.util.Clock +import io.ably.lib.util.Log +import io.ably.lib.util.SystemClock +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED +import kotlinx.coroutines.flow.MutableSharedFlow +import java.util.concurrent.CancellationException import java.util.concurrent.CompletableFuture /** @@ -20,22 +43,333 @@ internal class DefaultRealtimeObject( internal val adapter: AblyClientAdapter, ) : RealtimeObject { - override fun get(): CompletableFuture = TODO("Not yet implemented") + private val tag = "DefaultRealtimeObjects" + /** + * @spec RTO3 - Objects pool storing all objects by object ID + */ + internal val objectsPool = ObjectsPool(this) + + internal var state = ObjectsState.Initialized + + /** + * Set of serials for operations applied locally upon ACK, awaiting deduplication of the server echo. + * @spec RTO7b, RTO7b1 + */ + internal val appliedOnAckSerials = mutableSetOf() + + /** + * @spec RTO4 - Used for handling object messages and object sync messages + */ + private val objectsManager = ObjectsManager(this) + + /** + * Coroutine scope for running sequential operations on a single thread, used to avoid concurrency issues. + */ + private val sequentialScope = + CoroutineScope(Dispatchers.Default.limitedParallelism(1) + CoroutineName(channelName) + SupervisorJob()) + + /** + * Event bus for handling incoming object messages sequentially. + * Processes messages inside [incomingObjectsHandler] job created using [sequentialScope]. + */ + private val objectsEventBus = MutableSharedFlow(extraBufferCapacity = UNLIMITED) + private val incomingObjectsHandler: Job + + init { + incomingObjectsHandler = initializeHandlerForIncomingObjectMessages() + } + + override fun get(): CompletableFuture { + throwIfInvalidAccessApiConfiguration() + TODO("Not yet implemented, this should call getRootAsync") + } override fun on(event: ObjectStateEvent, listener: ObjectStateChange.Listener): Subscription { - // TODO - subscribe logic goes here - return onceSubscription { - // TODO - remove ObjectStateChange.Listener + throwIfInvalidAccessApiConfiguration() + return objectsManager.on(event, listener) + } + + override fun off(listener: ObjectStateChange.Listener) { + throwIfInvalidAccessApiConfiguration() + objectsManager.off(listener) + } + + override fun offAll() { + throwIfInvalidAccessApiConfiguration() + objectsManager.offAll() + } + + private suspend fun getRootAsync(): LiveMap = withContext(sequentialScope.coroutineContext) { + adapter.ensureAttached(channelName) + objectsManager.ensureSynced(state) + objectsPool.get(ROOT_OBJECT_ID) as LiveMap + } + + private suspend fun createMapAsync(entries: MutableMap): LiveMap { + throwIfInvalidWriteApiConfiguration() // RTO26 + + if (entries.keys.any { it.isEmpty() }) { // RTLMV4b + throw invalidInputError("Map keys should not be empty") + } + + // RTLMV4e - Create initial value operation + val initialMapValue = InternalLiveMap.initialValue(entries) + + // RTLMV4f - Create initial value JSON string + val initialValueJSONString = gson.toJson(initialMapValue) + + // RTO14 - Create object ID from initial value + val (objectId, nonce) = getObjectIdStringWithNonce(ObjectType.Map, initialValueJSONString) + + // Create ObjectMessage with the operation + val msg = WireObjectMessage( + operation = WireObjectOperation( + action = WireObjectOperationAction.MapCreate, + objectId = objectId, + mapCreateWithObjectId = WireMapCreateWithObjectId( + nonce = nonce, + initialValue = initialValueJSONString, + derivedFrom = initialMapValue, + ), + ) + ) + + // RTLMV3 - publish and apply locally on ACK + publishAndApply(arrayOf(msg)) + + // RTLMV3 - Return existing object if found after apply + return objectsPool.get(objectId) as? LiveMap + ?: throw serverError("createMap: MAP_CREATE was not applied as expected; objectId=$objectId") // RTLMV3 + } + + private suspend fun createCounterAsync(initialValue: Number): LiveCounter { + throwIfInvalidWriteApiConfiguration() // RTO26 + + // Validate input parameter + if (initialValue.toDouble().isNaN() || initialValue.toDouble().isInfinite()) { + throw invalidInputError("Counter value should be a valid number") + } + + // RTLCV4b + val initialCounterValue = InternalLiveCounter.initialValue(initialValue) + // RTLCV4c - Create initial value operation + val initialValueJSONString = gson.toJson(initialCounterValue) + + // RTO14 - Create object ID from initial value + val (objectId, nonce) = getObjectIdStringWithNonce(ObjectType.Counter, initialValueJSONString) + + // Create ObjectMessage with the operation + val msg = WireObjectMessage( + operation = WireObjectOperation( + action = WireObjectOperationAction.CounterCreate, + objectId = objectId, + counterCreateWithObjectId = WireCounterCreateWithObjectId( + nonce = nonce, + initialValue = initialValueJSONString, + derivedFrom = initialCounterValue, + ), + ) + ) + + // RTLCV3 - publish and apply locally on ACK + publishAndApply(arrayOf(msg)) + + // RTLCV3 - Return existing object if found after apply + return objectsPool.get(objectId) as? LiveCounter + ?: throw serverError("createCounter: COUNTER_CREATE was not applied as expected; objectId=$objectId") // RTLCV3 + } + + /** + * Spec: RTO14 + */ + private suspend fun getObjectIdStringWithNonce(objectType: ObjectType, initialValue: String): Pair { + val nonce = generateNonce() + val msTimestamp = ServerTime.getCurrentTime(adapter) // RTO16 - Get server time for nonce generation + return Pair(ObjectId.fromInitialValue(objectType, initialValue, nonce, msTimestamp).toString(), nonce) + } + + /** + * Spec: RTO15 + */ + internal suspend fun publish(wireObjectMessages: Array): PublishResult { + // RTO15b, RTL6c - Ensure that the channel is in a valid state for publishing + adapter.throwIfUnpublishableState(channelName) + adapter.ensureMessageSizeWithinLimit(wireObjectMessages) + // RTO15e - Must construct the ProtocolMessage as per RTO15e1, RTO15e2, RTO15e3 + val protocolMessage = ProtocolMessage(ProtocolMessage.Action.`object`, channelName) + protocolMessage.state = wireObjectMessages + // RTO15f, RTO15g - Send the ProtocolMessage using the adapter and capture success/failure + return adapter.sendAsync(protocolMessage) // RTO15h + } + + /** + * Publishes the given object messages and, upon receiving the ACK, immediately applies them + * locally as synthetic inbound messages using the assigned serial and connection's siteCode. + * + * Spec: RTO20 + */ + internal suspend fun publishAndApply(wireObjectMessages: Array) { + // RTO20b - publish, propagate failure + val publishResult = publish(wireObjectMessages) + + // RTO20c - validate required info + val siteCode = adapter.connectionManager.siteCode + if (siteCode == null) { + Log.e(tag, "RTO20c1: siteCode not available; operations will be applied when echoed") + return + } + val serials = publishResult.serials + if (serials == null || serials.size != wireObjectMessages.size) { + Log.e(tag, "RTO20c2: PublishResult.serials unavailable or wrong length; operations will be applied when echoed") + return + } + + // RTO20d - create synthetic inbound ObjectMessages + val syntheticMessages = mutableListOf() + wireObjectMessages.forEachIndexed { i, msg -> + val serial = serials[i] + if (serial == null) { + Log.d(tag, "RTO20d1: serial null at index $i (conflated), skipping") + return@forEachIndexed + } + syntheticMessages.add(msg.copy(serial = serial, siteCode = siteCode)) // RTO20d2a, RTO20d2b, RTO20d3 + } + if (syntheticMessages.isEmpty()) return + + // RTO20e, RTO20f - dispatch to sequential scope for ordering + withContext(sequentialScope.coroutineContext) { + objectsManager.applyAckResult(syntheticMessages) // suspends if SYNCING (RTO20e), applies on SYNCED (RTO20f) } } - override fun off(listener: ObjectStateChange.Listener): Unit = TODO("Not yet implemented") + /** + * Handles a ProtocolMessage containing proto action as `object` or `object_sync`. + * @spec RTL1 - Processes incoming object messages and object sync messages + */ + internal fun handle(protocolMessage: ProtocolMessage) { + // RTL15b - Set channel serial for OBJECT messages + adapter.setChannelSerial(channelName, protocolMessage) - override fun offAll(): Unit = TODO("Not yet implemented") + if (protocolMessage.state == null || protocolMessage.state.isEmpty()) { + Log.w(tag, "Received ProtocolMessage with null or empty objects, ignoring") + return + } + + objectsEventBus.tryEmit(protocolMessage) + } + + /** + * Initializes the handler for incoming object messages and object sync messages. + * Processes the messages sequentially to ensure thread safety and correct order of operations. + * + * @spec OM2 - Populates missing fields from parent protocol message + */ + private fun initializeHandlerForIncomingObjectMessages(): Job { + return sequentialScope.launch { + objectsEventBus.collect { protocolMessage -> + // OM2 - Populate missing fields from parent + val objects = protocolMessage.state.filterIsInstance() + .mapIndexed { index, objMsg -> + objMsg.copy( + connectionId = objMsg.connectionId ?: protocolMessage.connectionId, // OM2c + timestamp = objMsg.timestamp ?: protocolMessage.timestamp, // OM2e + id = objMsg.id ?: (protocolMessage.id + ':' + index) // OM2a + ) + } + + try { + when (protocolMessage.action) { + ProtocolMessage.Action.`object` -> objectsManager.handleObjectMessages(objects) + ProtocolMessage.Action.object_sync -> objectsManager.handleObjectSyncMessages( + objects, + protocolMessage.channelSerial + ) + else -> Log.w(tag, "Ignoring protocol message with unhandled action: ${protocolMessage.action}") + } + } catch (exception: Exception) { + // Skip current message if an error occurs, don't rethrow to avoid crashing the collector + Log.e(tag, "Error handling objects message with protocolMsg id ${protocolMessage.id}", exception) + } + } + } + } + + internal fun handleStateChange(state: ChannelState, hasObjects: Boolean) { + sequentialScope.launch { + when (state) { + ChannelState.attached -> { + Log.v(tag, "Objects.onAttached() channel=$channelName, hasObjects=$hasObjects") + + objectsManager.clearBufferedObjectOperations() // RTO4d - clear unconditionally on ATTACHED + + // RTO4a + val fromInitializedState = this@DefaultRealtimeObject.state == ObjectsState.Initialized + if (hasObjects || fromInitializedState) { + // should always start a new sync sequence if we're in the initialized state, no matter the HAS_OBJECTS flag value. + // this guarantees we emit both "syncing" -> "synced" events in that order. + objectsManager.startNewSync(null) + } + + // RTO4b + if (!hasObjects) { + // if no HAS_OBJECTS flag received on attach, we can end sync sequence immediately and treat it as no objects on a channel. + // reset the objects pool to its initial state, and emit update events so subscribers to root object get notified about changes. + objectsPool.resetToInitialPool(true) // RTO4b1, RTO4b2 + objectsManager.clearSyncObjectsPool() // RTO4b3 + // RTO4b5 removed — buffer already cleared by RTO4d above + // defer the state change event until the next tick if we started a new sequence just now due to being in initialized state. + // this allows any event listeners to process the start of the new sequence event that was emitted earlier during this event loop. + objectsManager.endSync() // RTO4b4 + } + } + ChannelState.detached, + ChannelState.suspended, + ChannelState.failed -> { + val errorReason = try { + adapter.getChannel(channelName).reason + } catch (e: Exception) { + null + } + val error = ablyException( + "publishAndApply could not be applied locally: channel entered $state whilst waiting for objects sync", + ObjectErrorCode.PublishAndApplyFailedDueToChannelState, + ObjectHttpStatusCode.BadRequest, + cause = errorReason?.let { AblyException.fromErrorInfo(it) } + ) + objectsManager.failBufferedAcks(error) // RTO20e1 + if (state != ChannelState.suspended) { + // do not emit data update events as the actual current state of Objects data is unknown when we're in these channel states + objectsPool.clearObjectsData(false) + objectsManager.clearSyncObjectsPool() + } + } + else -> { + // No action needed for other states + } + } + } + } + + // Dispose of any resources associated with this RealtimeObjects instance + fun dispose(cause: AblyException) { + val disposeReason = CancellationException().apply { initCause(cause) } + incomingObjectsHandler.cancel(disposeReason) // objectsEventBus automatically garbage collected when collector is cancelled + objectsPool.dispose() + objectsManager.dispose() + // Don't cancel sequentialScope (needed in getRoot method), just cancel ongoing coroutines + sequentialScope.coroutineContext.cancelChildren(disposeReason) + } /** Validates the channel is configured for access (read/subscribe) operations. Spec: RTO25 */ internal fun throwIfInvalidAccessApiConfiguration() = adapter.throwIfInvalidAccessApiConfiguration(channelName) /** Validates the channel is configured for write (mutation) operations. Spec: RTO26 */ internal fun throwIfInvalidWriteApiConfiguration() = adapter.throwIfInvalidWriteApiConfiguration(channelName) + + /** + * Provides the default Clock instance for the DefaultRealtimeObject. + * This Clock is derived from the system clock, utilizing the client options + * from the adapter configuration. + */ + internal val clock: Clock get() = SystemClock.clockFrom(adapter.clientOptions) } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/Errors.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/Errors.kt index 98bd89691..e1931d72a 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/Errors.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/Errors.kt @@ -16,7 +16,7 @@ internal enum class ObjectErrorCode(val code: Int) { InvalidInputParams(40_003), MapValueDataTypeUnsupported(40_013), PathNotResolved(92_005), // RTPO3c2 - write operation on a path that does not resolve - ObjectsTypeMismatch(92_007), // RTTS5d2/RTTS9d2 - operation on a cast wrapper with mismatched resolved type + ObjectsTypeMismatch(92_007), // RTTS5d2/RTTS9d - operation on a cast wrapper with mismatched resolved type // Channel mode and state validation error codes ChannelModeRequired(40_024), ChannelStateError(90_001), @@ -46,7 +46,7 @@ internal fun invalidInputError(message: String) = internal fun pathNotResolvedError(path: String) = objectException("Path could not be resolved: \"$path\"", ObjectErrorCode.PathNotResolved) -/** ErrorInfo 400 / 92007 - resolved/wrapped type does not match the typed wrapper (RTTS5d2/RTTS9d2). */ +/** ErrorInfo 400 / 92007 - resolved/wrapped type does not match the typed wrapper (RTTS5d2/RTTS9d). */ internal fun typeMismatchError(message: String) = objectException(message, ObjectErrorCode.ObjectsTypeMismatch) diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/ObjectId.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/ObjectId.kt new file mode 100644 index 000000000..66ff9798a --- /dev/null +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/ObjectId.kt @@ -0,0 +1,79 @@ +package io.ably.lib.liveobjects + +import io.ably.lib.liveobjects.value.ObjectType +import java.nio.charset.StandardCharsets +import java.security.MessageDigest +import java.util.Base64 + +internal class ObjectId private constructor( + internal val type: ObjectType, + private val hash: String, + private val timestampMs: Long +) { + /** + * Converts ObjectId to string representation. + * Spec: RTO6b1 + */ + override fun toString(): String { + return "${type.value}:$hash@$timestampMs" + } + + companion object { + + /** + * Spec: RTO14 + */ + internal fun fromInitialValue(objectType: ObjectType, initialValue: String, nonce: String, msTimeStamp: Long): ObjectId { + val valueForHash = "$initialValue:$nonce".toByteArray(StandardCharsets.UTF_8) + // RTO14b - Hash the initial value and nonce to create a unique identifier + val hashBytes = MessageDigest.getInstance("SHA-256").digest(valueForHash) + val urlSafeHash = Base64.getUrlEncoder().withoutPadding().encodeToString(hashBytes) + + return ObjectId(objectType, urlSafeHash, msTimeStamp) + } + + /** + * Creates ObjectId instance from hashed object id string. + */ + internal fun fromString(objectId: String): ObjectId { + if (objectId.isEmpty()) { + throw objectError("Invalid object id: $objectId") + } + + // Parse format: type:hash@msTimestamp + val parts = objectId.split(':') + if (parts.size != 2) { + throw objectError("Invalid object id: $objectId") + } + + val (typeStr, rest) = parts + + val type = when (typeStr) { + "map" -> ObjectType.Map + "counter" -> ObjectType.Counter + else -> throw objectError("Invalid object type in object id: $objectId") + } + + val hashAndTimestamp = rest.split('@') + if (hashAndTimestamp.size != 2) { + throw objectError("Invalid object id: $objectId") + } + + val hash = hashAndTimestamp[0] + + if (hash.isEmpty()) { + throw objectError("Invalid object id: $objectId") + } + + val msTimestampStr = hashAndTimestamp[1] + + val msTimestamp = try { + msTimestampStr.toLong() + } catch (e: NumberFormatException) { + throw objectError("Invalid object id: $objectId", e) + } + + return ObjectId(type, hash, msTimestamp) + } + } +} diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/ObjectsManager.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/ObjectsManager.kt new file mode 100644 index 000000000..a2aaaaeb7 --- /dev/null +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/ObjectsManager.kt @@ -0,0 +1,333 @@ +package io.ably.lib.liveobjects + +import io.ably.lib.liveobjects.message.WireObjectMessage +import io.ably.lib.liveobjects.message.WireObjectOperation +import io.ably.lib.liveobjects.message.WireObjectOperationAction +import io.ably.lib.liveobjects.message.WireObjectState +import io.ably.lib.liveobjects.message.WireObjectsMap +import io.ably.lib.liveobjects.value.BaseRealtimeObject +import io.ably.lib.liveobjects.value.ObjectUpdate +import io.ably.lib.liveobjects.value.livecounter.InternalLiveCounter +import io.ably.lib.liveobjects.value.livemap.InternalLiveMap +import io.ably.lib.types.AblyException +import io.ably.lib.util.Log +import kotlinx.coroutines.CompletableDeferred + +/** + * @spec RTO5 - Processes OBJECT and OBJECT_SYNC messages during sync sequences + * @spec RTO6 - Creates zero-value objects when needed + */ +internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject): ObjectsStateCoordinator() { + private val tag = "ObjectsManager" + /** + * @spec RTO5 - Sync objects pool for collecting sync messages + */ + private val syncObjectsPool = mutableMapOf() + private var currentSyncId: String? = null + /** + * @spec RTO7 - Buffered object operations during sync + */ + private val bufferedObjectOperations = mutableListOf() // RTO7a + private var syncCompletionWaiter: CompletableDeferred? = null + + /** + * Handles object messages (non-sync messages). + * + * @spec RTO8 - Buffers messages if not synced, applies immediately if synced + */ + internal fun handleObjectMessages(wireObjectMessages: List) { + if (realtimeObjects.state != ObjectsState.Synced) { + // RTO7 - The client receives object messages in realtime over the channel concurrently with the sync sequence. + // Some of the incoming object messages may have already been applied to the objects described in + // the sync sequence, but others may not; therefore we must buffer these messages so that we can apply + // them to the objects once the sync is complete. + Log.v(tag, "Buffering ${wireObjectMessages.size} object messages, state: ${realtimeObjects.state}") + bufferedObjectOperations.addAll(wireObjectMessages) // RTO8a + return + } + + // Apply messages immediately if synced + applyObjectMessages(wireObjectMessages, ObjectsOperationSource.CHANNEL) // RTO8b + } + + /** + * Handles object sync messages. + * + * @spec RTO5 - Parses sync channel serial and manages sync sequences + */ + internal fun handleObjectSyncMessages(wireObjectMessages: List, syncChannelSerial: String?) { + val syncTracker = ObjectsSyncTracker(syncChannelSerial) + val isNewSync = syncTracker.hasSyncStarted(currentSyncId) + if (isNewSync) { + // RTO5a2 - new sync sequence started + startNewSync(syncTracker.syncId) + } + + // RTO5a3 - continue current sync sequence + applyObjectSyncMessages(wireObjectMessages) // RTO5f + + // RTO5a4 - if this is the last (or only) message in a sequence of sync updates, end the sync + if (syncTracker.hasSyncEnded()) { + // defer the state change event until the next tick if this was a new sync sequence + // to allow any event listeners to process the start of the new sequence event that was emitted earlier during this event loop. + endSync() + } + } + + /** + * Starts a new sync sequence. + * + * @spec RTO5 - Sync sequence initialization + */ + internal fun startNewSync(syncId: String?) { + Log.v(tag, "Starting new sync sequence: syncId=$syncId") + + syncObjectsPool.clear() // RTO5a2a + currentSyncId = syncId + syncCompletionWaiter = CompletableDeferred() + stateChange(ObjectsState.Syncing) + } + + /** + * Ends the current sync sequence. + * + * @spec RTO5c - Applies sync data and buffered operations + */ + internal fun endSync() { + Log.v(tag, "Ending sync sequence") + applySync() // RTO5c1/2/7 + applyObjectMessages(bufferedObjectOperations, ObjectsOperationSource.CHANNEL) // RTO5c6 + bufferedObjectOperations.clear() // RTO5c5 + syncObjectsPool.clear() // RTO5c4 + currentSyncId = null // RTO5c3 + realtimeObjects.appliedOnAckSerials.clear() // RTO5c9 + stateChange(ObjectsState.Synced) // RTO5c8 + syncCompletionWaiter?.complete(Unit) + syncCompletionWaiter = null + } + + /** + * Called from publishAndApply (via withContext sequentialScope). + * If SYNCED: apply immediately with LOCAL source. + * If not SYNCED: suspend until endSync transitions to SYNCED (RTO20e), then apply. + */ + internal suspend fun applyAckResult(messages: List) { + if (realtimeObjects.state != ObjectsState.Synced) { + if (syncCompletionWaiter == null) syncCompletionWaiter = CompletableDeferred() + syncCompletionWaiter?.await() // suspends; resumes after endSync transitions to SYNCED (RTO20e1) + } + applyObjectMessages(messages, ObjectsOperationSource.LOCAL) // RTO20f + } + + /** + * Fails all pending apply waiters. + * Called when the channel enters DETACHED/SUSPENDED/FAILED (RTO20e1). + */ + internal fun failBufferedAcks(error: AblyException) { + syncCompletionWaiter?.completeExceptionally(error) + syncCompletionWaiter = null + } + + /** + * Clears the sync objects pool. + * Used by DefaultRealtimeObjects.handleStateChange. + */ + internal fun clearSyncObjectsPool() { + syncObjectsPool.clear() + } + + /** + * Clears the buffered object operations. + * Used by DefaultRealtimeObjects.handleStateChange. + */ + internal fun clearBufferedObjectOperations() { + bufferedObjectOperations.clear() + } + + /** + * Applies sync data to objects pool. + * + * @spec RTO5c - Processes sync data and updates objects pool + */ + private fun applySync() { + if (syncObjectsPool.isEmpty()) { + return + } + + val receivedObjectIds = mutableSetOf() + // RTO5c1a2 - List to collect updates for existing objects + val existingObjectUpdates = mutableListOf>() + + // RTO5c1 + for ((objectId, objectMessage) in syncObjectsPool) { + val wireObjectState = objectMessage.objectState as WireObjectState // we have non-null objectState here due to RTO5f + receivedObjectIds.add(objectId) + val existingObject = realtimeObjects.objectsPool.get(objectId) + + // RTO5c1a + if (existingObject != null) { + // Update existing object + val update = existingObject.applyObjectSync(objectMessage) // RTO5c1a1 + existingObjectUpdates.add(Pair(existingObject, update)) + } else { // RTO5c1b + // RTO5c1b1, RTO5c1b1a, RTO5c1b1b - Create new object and add it to the pool + val newObject = createObjectFromState(wireObjectState) ?: continue // RTO5c1b1c - skip unsupported + newObject.applyObjectSync(objectMessage) + realtimeObjects.objectsPool.set(objectId, newObject) + } + } + + // RTO5c2 - need to remove realtimeObject instances from the ObjectsPool for which objectIds were not received during the sync sequence + realtimeObjects.objectsPool.deleteExtraObjectIds(receivedObjectIds) + + // RTO5c7 - call subscription callbacks for all updated existing objects + existingObjectUpdates.forEach { (obj, update) -> + obj.notifyUpdated(update) + } + } + + /** + * Applies object messages to objects. + * + * @spec RTO9 - Creates zero-value objects if they don't exist + */ + private fun applyObjectMessages( + wireObjectMessages: List, + source: ObjectsOperationSource = ObjectsOperationSource.CHANNEL, + ) { + // RTO9a + for (objectMessage in wireObjectMessages) { + if (objectMessage.operation == null) { + // RTO9a1 + Log.w(tag, "Object message received without operation field, skipping message: ${objectMessage.id}") + continue + } + + val wireObjectOperation: WireObjectOperation = objectMessage.operation // RTO9a2 + if (wireObjectOperation.action == WireObjectOperationAction.Unknown) { + // RTO9a2b - object operation action is unknown, skip the message + Log.w(tag, "Object operation action is unknown, skipping message: ${objectMessage.id}") + continue + } + + // RTO9a3 - skip operations already applied on ACK (discard without taking any further action). + // This check comes before zero-value object creation (RTO9a2a1) so that no zero-value object is + // created for an objectId not yet in the pool when the echo is being discarded. + // Note: siteTimeserials is NOT updated here intentionally — updating it to the echo's serial would + // incorrectly reject older-but-unprocessed operations from the same site that arrive after the echo. + if (objectMessage.serial != null && + realtimeObjects.appliedOnAckSerials.contains(objectMessage.serial)) { + Log.d(tag, "RTO9a3: serial ${objectMessage.serial} already applied on ACK; discarding echo") + realtimeObjects.appliedOnAckSerials.remove(objectMessage.serial) + continue // discard without taking any further action + } + + // RTO9a2a - we can receive an op for an object id we don't have yet in the pool. instead of buffering such operations, + // we can create a zero-value object for the provided object id and apply the operation to that zero-value object. + // this also means that all objects are capable of applying the corresponding *_CREATE ops on themselves, + // since they need to be able to eventually initialize themselves from that *_CREATE op. + // so to simplify operations handling, we always try to create a zero-value object in the pool first, + // and then we can always apply the operation on the existing object in the pool. + val obj = realtimeObjects.objectsPool.createZeroValueObjectIfNotExists(wireObjectOperation.objectId) // RTO9a2a1 + val applied = obj.applyObject(objectMessage, source) // RTO9a2a2, RTO9a2a3 + if (source == ObjectsOperationSource.LOCAL && applied && objectMessage.serial != null) { + realtimeObjects.appliedOnAckSerials.add(objectMessage.serial) // RTO9a2a4 + } + } + } + + /** + * Applies sync messages to sync data pool, merging partial sync messages for the same objectId. + * + * @spec RTO5f - Collects and merges object states during sync sequence + */ + private fun applyObjectSyncMessages(wireObjectMessages: List) { + for (objectMessage in wireObjectMessages) { + if (objectMessage.objectState == null) { + Log.w(tag, "Object message received during OBJECT_SYNC without object field, skipping message: ${objectMessage.id}") + continue + } + + val wireObjectState: WireObjectState = objectMessage.objectState + val objectId = wireObjectState.objectId + val existingEntry = syncObjectsPool[objectId] + + if (existingEntry == null) { + // RTO5f1 - objectId not in pool, store directly + if (wireObjectState.counter != null || wireObjectState.map != null) { + syncObjectsPool[objectId] = objectMessage + } else { + // RTO5c1b1c - object state must contain either counter or map data + Log.w(tag, "Object state received without counter or map data, skipping message: ${objectMessage.id}") + } + continue + } + + // RTO5f2 - objectId already in pool; this is a partial sync message, merge based on type + when { + wireObjectState.map != null -> { + // RTO5f2a - map object: merge entries + if (wireObjectState.tombstone) { + // RTO5f2a1 - tombstone: replace pool entry entirely + syncObjectsPool[objectId] = objectMessage + } else { + // RTO5f2a2 - merge map entries; server guarantees no duplicate keys across partials + val existingState = existingEntry.objectState!! // non-null for existing entry + val mergedEntries = existingState.map?.entries.orEmpty() + wireObjectState.map.entries.orEmpty() + val mergedMap = (existingState.map ?: WireObjectsMap()).copy(entries = mergedEntries) + val mergedState = existingState.copy(map = mergedMap) + syncObjectsPool[objectId] = existingEntry.copy(objectState = mergedState) + } + } + wireObjectState.counter != null -> { + // RTO5f2b - counter objects must never be split across messages + Log.e(tag, "Received partial sync message for a counter object, skipping: ${objectMessage.id}") + } + else -> { + // RTO5f3 - unsupported type, log warning and skip + Log.w(tag, "Received partial sync message for an unsupported object type, skipping: ${objectMessage.id}") + } + } + } + } + + /** + * Creates an object from object state. + * + * @spec RTO5c1b - Creates objects from object state based on type + */ + private fun createObjectFromState(wireObjectState: WireObjectState): BaseRealtimeObject? { + return when { + wireObjectState.counter != null -> InternalLiveCounter.zeroValue(wireObjectState.objectId, realtimeObjects) // RTO5c1b1a + wireObjectState.map != null -> InternalLiveMap.zeroValue(wireObjectState.objectId, realtimeObjects) // RTO5c1b1b + else -> { + // RTO5c1b1c - unsupported object type, skip gracefully + Log.w(tag, "Received unsupported object state during OBJECT_SYNC (no counter or map), skipping objectId: ${wireObjectState.objectId}") + null + } + } + } + + /** + * Changes the state and emits events. + * + * @spec RTO2 - Emits state change events for syncing and synced states + */ + private fun stateChange(newState: ObjectsState) { + if (realtimeObjects.state == newState) { + return + } + Log.v(tag, "Objects state changed to: $newState from ${realtimeObjects.state}") + realtimeObjects.state = newState + + // deferEvent not needed since objectsStateChanged processes events in a sequential coroutine scope + objectsStateChanged(newState) + } + + internal fun dispose() { + syncCompletionWaiter?.cancel() + syncObjectsPool.clear() + bufferedObjectOperations.clear() + disposeObjectsStateListeners() + } +} diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/ObjectsOperationSource.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/ObjectsOperationSource.kt new file mode 100644 index 000000000..ac988fb96 --- /dev/null +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/ObjectsOperationSource.kt @@ -0,0 +1,7 @@ +package io.ably.lib.liveobjects + +/** @spec RTO22 */ +internal enum class ObjectsOperationSource { + LOCAL, // RTO22a - applied upon receipt of ACK + CHANNEL // RTO22b - received over a Realtime channel +} diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/ObjectsPool.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/ObjectsPool.kt new file mode 100644 index 000000000..afb460d13 --- /dev/null +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/ObjectsPool.kt @@ -0,0 +1,172 @@ +package io.ably.lib.liveobjects + +import io.ably.lib.liveobjects.value.BaseRealtimeObject +import io.ably.lib.liveobjects.value.ObjectType +import io.ably.lib.liveobjects.value.livecounter.InternalLiveCounter +import io.ably.lib.liveobjects.value.livemap.InternalLiveMap +import io.ably.lib.util.Log +import kotlinx.coroutines.* +import java.util.concurrent.ConcurrentHashMap + +/** + * Constants for ObjectsPool configuration + */ +internal object ObjectsPoolDefaults { + const val GC_INTERVAL_MS = 1000L * 60 * 5 // 5 minutes + /** + * The SDK will attempt to use the `objectsGCGracePeriod` value provided by the server in the `connectionDetails` + * object of the `CONNECTED` event. + * If the server does not provide this value, the SDK will fall back to this default value. + * Must be > 2 minutes to ensure we keep tombstones long enough to avoid the possibility of receiving an operation + * with an earlier serial that would not have been applied if the tombstone still existed. + * + * Applies both for map entries tombstones and object tombstones. + */ + const val GC_GRACE_PERIOD_MS = 1000L * 60 * 60 * 24 // 24 hours +} + +/** + * Root object ID constant + */ +internal const val ROOT_OBJECT_ID = "root" + +/** + * ObjectsPool manages a pool of objects for a channel. + * + * @spec RTO3 - Maintains an objects pool for all objects on the channel + */ +internal class ObjectsPool( + private val realtimeObjects: DefaultRealtimeObject +) { + private val tag = "ObjectsPool" + + /** + * ConcurrentHashMap for thread-safe access from public APIs in LiveMap and LiveCounter. + * @spec RTO3a - Pool storing all ably objects by object ID + */ + private val pool = ConcurrentHashMap() + + /** + * Coroutine scope for garbage collection + */ + private val gcScope = CoroutineScope(Dispatchers.Default + SupervisorJob()) + private var gcJob: Job // Job for the garbage collection coroutine + + @Volatile private var gcGracePeriod = ObjectsPoolDefaults.GC_GRACE_PERIOD_MS + private var gcPeriodSubscription: Subscription + + init { + // RTO3b - Initialize pool with root object + pool[ROOT_OBJECT_ID] = InternalLiveMap.zeroValue(ROOT_OBJECT_ID, realtimeObjects) + // Start garbage collection coroutine with server-provided grace period if available + gcPeriodSubscription = realtimeObjects.adapter.onGCGracePeriodUpdated { period -> + period?.let { + gcGracePeriod = it + Log.i(tag, "Using objectsGCGracePeriod from server: $gcGracePeriod ms") + } ?: Log.i(tag, "Server did not provide objectsGCGracePeriod, using default: $gcGracePeriod ms") + } + gcJob = startGCJob() + } + + /** + * Gets an object from the pool by object ID. + */ + internal fun get(objectId: String): BaseRealtimeObject? { + return pool[objectId] + } + + /** + * Sets a realtime object in the pool. + */ + internal fun set(objectId: String, realtimeObject: BaseRealtimeObject) { + pool[objectId] = realtimeObject + } + + /** + * Removes all objects but root from the pool and clears the data for root. + * Does not create a new root object, so the reference to the root object remains the same. + */ + internal fun resetToInitialPool(emitUpdateEvents: Boolean) { + pool.entries.removeIf { (key, _) -> key != ROOT_OBJECT_ID } // only keep the root object + clearObjectsData(emitUpdateEvents) // RTO4b2a - clear the root object and emit update events + } + + + /** + * Deletes objects from the pool for which object ids are not found in the provided array of ids. + * Spec: RTO5c2 + */ + internal fun deleteExtraObjectIds(objectIds: MutableSet) { + pool.entries.removeIf { (key, _) -> key !in objectIds && key != ROOT_OBJECT_ID } // RTO5c2a - Keep root object + } + + /** + * Clears the data stored for all objects in the pool. + */ + internal fun clearObjectsData(emitUpdateEvents: Boolean) { + for (obj in pool.values) { + val update = obj.clearData() + if (emitUpdateEvents) obj.notifyUpdated(update) + } + } + + /** + * Creates a zero-value object if it doesn't exist in the pool. + * + * @spec RTO6 - Creates zero-value objects when needed + */ + internal fun createZeroValueObjectIfNotExists(objectId: String): BaseRealtimeObject { + val existingObject = get(objectId) + if (existingObject != null) { + return existingObject // RTO6a + } + + val parsedObjectId = ObjectId.fromString(objectId) // RTO6b + return when (parsedObjectId.type) { + ObjectType.Map -> InternalLiveMap.zeroValue(objectId, realtimeObjects) // RTO6b2 + ObjectType.Counter -> InternalLiveCounter.zeroValue(objectId, realtimeObjects) // RTO6b3 + }.apply { + set(objectId, this) // RTO6b2, RTO6b3 - Add the zero-value object to the pool + } + } + + /** + * Garbage collection interval handler. + */ + private fun onGCInterval() { + pool.entries.removeIf { (_, obj) -> + if (obj.isEligibleForGc(gcGracePeriod)) { true } // Remove from pool + else { + obj.onGCInterval(gcGracePeriod) + false // Keep in pool + } + } + } + + /** + * Starts the garbage collection coroutine. + */ + private fun startGCJob() : Job { + return gcScope.launch { + while (isActive) { + try { + onGCInterval() + } catch (e: Exception) { + Log.e(tag, "Error during garbage collection", e) + } + delay(ObjectsPoolDefaults.GC_INTERVAL_MS) + } + } + } + + /** + * Disposes of the ObjectsPool, cleaning up resources. + * Should be called when the pool is no longer needed. + */ + fun dispose() { + gcPeriodSubscription.unsubscribe() + gcJob.cancel() + gcScope.cancel() + pool.clear() + } +} diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/ObjectsState.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/ObjectsState.kt new file mode 100644 index 000000000..d20eb76ed --- /dev/null +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/ObjectsState.kt @@ -0,0 +1,108 @@ +package io.ably.lib.liveobjects + +import io.ably.lib.liveobjects.state.ObjectStateChange +import io.ably.lib.liveobjects.state.ObjectStateEvent +import io.ably.lib.util.EventEmitter +import io.ably.lib.util.Log +import kotlinx.coroutines.* + +/** + * @spec RTO2 - enum representing objects state + */ +internal enum class ObjectsState { + Initialized, + Syncing, + Synced +} + +/** + * Maps internal ObjectsState values to their corresponding public ObjectsStateEvent values. + * Used to determine which events should be emitted when state changes occur. + * INITIALIZED maps to null (no event), while SYNCING and SYNCED map to their respective events. + */ +private val objectsStateToEventMap = mapOf( + ObjectsState.Initialized to null, + ObjectsState.Syncing to ObjectStateEvent.SYNCING, + ObjectsState.Synced to ObjectStateEvent.SYNCED +) + +/** + * An interface for managing and communicating changes in the synchronization state of objects. + * + * Implementations should ensure thread-safe event emission and proper synchronization + * between state change notifications. + */ +internal interface HandlesObjectsStateChange { + /** + * Handles changes in the state of objects by notifying all registered listeners. + * Implementations should ensure thread-safe event emission to both internal and public listeners. + * Makes sure every event is processed in the order they were received. + * @param newState The new state of the objects, SYNCING or SYNCED. + */ + fun objectsStateChanged(newState: ObjectsState) + + /** + * Suspends the current coroutine until objects are synchronized. + * Returns immediately if state is already SYNCED, otherwise waits for the SYNCED event. + * + * @param currentState The current state of objects to determine if waiting is necessary + */ + suspend fun ensureSynced(currentState: ObjectsState) + + /** + * Disposes all registered state change listeners and cancels any pending operations. + * Should be called when the associated RealtimeObjects instance is no longer needed. + */ + fun disposeObjectsStateListeners() +} + + +internal abstract class ObjectsStateCoordinator : ObjectStateChange, HandlesObjectsStateChange { + private val tag = "ObjectsStateCoordinator" + private val internalObjectStateEmitter = ObjectsStateEmitter() + // related to RTC10, should have a separate EventEmitter for users of the library + private val externalObjectStateEmitter = ObjectsStateEmitter() + + override fun on(event: ObjectStateEvent, listener: ObjectStateChange.Listener): Subscription { + externalObjectStateEmitter.on(event, listener) + return onceSubscription { + externalObjectStateEmitter.off(event, listener) + } + } + + override fun off(listener: ObjectStateChange.Listener) = externalObjectStateEmitter.off(listener) + + override fun offAll() = externalObjectStateEmitter.off() + + override fun objectsStateChanged(newState: ObjectsState) { + objectsStateToEventMap[newState]?.let { objectsStateEvent -> + internalObjectStateEmitter.emit(objectsStateEvent) + externalObjectStateEmitter.emit(objectsStateEvent) + } + } + + override suspend fun ensureSynced(currentState: ObjectsState) { + if (currentState != ObjectsState.Synced) { + val deferred = CompletableDeferred() + internalObjectStateEmitter.once(ObjectStateEvent.SYNCED) { + Log.v(tag, "Objects state changed to SYNCED, resuming ensureSynced") + deferred.complete(Unit) + } + deferred.await() + } + } + + override fun disposeObjectsStateListeners() = offAll() +} + +private class ObjectsStateEmitter : EventEmitter() { + private val tag = "ObjectsStateEmitter" + override fun apply(listener: ObjectStateChange.Listener?, event: ObjectStateEvent?, vararg args: Any?) { + try { + event?.let { listener?.onStateChanged(it) } + ?: Log.w(tag, "Null event passed to ObjectsStateChange Listener callback") + } catch (t: Throwable) { + Log.e(tag, "Error occurred while executing listener callback for event: $event", t) + } + } +} diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/ObjectsSyncTracker.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/ObjectsSyncTracker.kt new file mode 100644 index 000000000..14ec73586 --- /dev/null +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/ObjectsSyncTracker.kt @@ -0,0 +1,63 @@ +package io.ably.lib.liveobjects + +/** + * @spec RTO5 - SyncTracker class for tracking objects sync status + */ +internal class ObjectsSyncTracker(syncChannelSerial: String?) { + private val syncSerial: String? = syncChannelSerial + internal val syncId: String? + internal val syncCursor: String? + + init { + val parsed = parseSyncChannelSerial(syncChannelSerial) + syncId = parsed.first + syncCursor = parsed.second + } + + /** + * Checks if a new sync sequence has started. + * + * @param prevSyncId The previously stored sync ID + * @return true if a new sync sequence has started, false otherwise + * + * Spec: RTO5a5, RTO5a2 + */ + internal fun hasSyncStarted(prevSyncId: String?): Boolean { + return syncSerial.isNullOrEmpty() || prevSyncId != syncId + } + + /** + * Checks if the current sync sequence has ended. + * + * @return true if the sync sequence has ended, false otherwise + * + * Spec: RTO5a5, RTO5a4 + */ + internal fun hasSyncEnded(): Boolean { + return syncSerial.isNullOrEmpty() || syncCursor.isNullOrEmpty() + } + + companion object { + /** + * Parses sync channel serial to extract syncId and syncCursor. + * + * @param syncChannelSerial The sync channel serial to parse + * @return Pair of syncId and syncCursor, both null if parsing fails + */ + private fun parseSyncChannelSerial(syncChannelSerial: String?): Pair { + if (syncChannelSerial.isNullOrEmpty()) { + return Pair(null, null) + } + + // RTO5a1 - syncChannelSerial is a two-part identifier: : + val match = Regex("^([\\w-]+):(.*)$").find(syncChannelSerial) + return if (match != null) { + val syncId = match.groupValues[1] + val syncCursor = match.groupValues[2] + Pair(syncId, syncCursor) + } else { + Pair(null, null) + } + } + } +} diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/ServerTime.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/ServerTime.kt new file mode 100644 index 000000000..2fbeb1a28 --- /dev/null +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/ServerTime.kt @@ -0,0 +1,38 @@ +package io.ably.lib.liveobjects + +import io.ably.lib.liveobjects.adapter.AblyClientAdapter +import io.ably.lib.types.AblyException +import io.ably.lib.util.SystemClock +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import kotlinx.coroutines.withContext +import kotlin.concurrent.Volatile + +/** + * ServerTime is a utility object that provides the current server time + * Spec: RTO16 + */ +internal object ServerTime { + @Volatile + private var serverTimeOffset: Long? = null + private val mutex = Mutex() + + /** + * Spec: RTO16a + */ + @Throws(AblyException::class) + internal suspend fun getCurrentTime(adapter: AblyClientAdapter): Long { + val clock = SystemClock.clockFrom(adapter.clientOptions) + if (serverTimeOffset == null) { + mutex.withLock { + if (serverTimeOffset == null) { // Double-checked locking to ensure thread safety + val serverTime: Long = withContext(Dispatchers.IO) { adapter.time } + serverTimeOffset = serverTime - clock.currentTimeMillis() + return serverTime + } + } + } + return clock.currentTimeMillis() + serverTimeOffset!! + } +} diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/message/WireObjectMessage.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/message/WireObjectMessage.kt index e29b73533..dcdd23eed 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/message/WireObjectMessage.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/message/WireObjectMessage.kt @@ -288,3 +288,12 @@ private fun WireObjectData.size(): Int { json?.let { return it.toString().byteSize } // Spec: OD3e return 0 } + +internal fun WireObjectData?.isInvalid(): Boolean { + return this?.objectId.isNullOrEmpty() && + this?.string == null && + this?.number == null && + this?.boolean == null && + this?.bytes == null && + this?.json == null +} diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/BaseRealtimeLiveObject.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/BaseRealtimeLiveObject.kt new file mode 100644 index 000000000..bb1847308 --- /dev/null +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/BaseRealtimeLiveObject.kt @@ -0,0 +1,233 @@ +package io.ably.lib.liveobjects.value + +import io.ably.lib.liveobjects.ObjectsOperationSource +import io.ably.lib.liveobjects.message.WireObjectMessage +import io.ably.lib.liveobjects.message.WireObjectOperation +import io.ably.lib.liveobjects.message.WireObjectState +import io.ably.lib.liveobjects.objectError +import io.ably.lib.liveobjects.value.livecounter.noOpCounterUpdate +import io.ably.lib.liveobjects.value.livemap.noOpMapUpdate +import io.ably.lib.util.Clock +import io.ably.lib.util.Log +import io.ably.lib.util.SystemClock + +internal enum class ObjectType(val value: String) { + Map("map"), + Counter("counter") +} + +// Spec: RTLO4b4b +// TODO - Check what to do about `ObjectUpdate` field, whether we really need to keep it or not +internal data class ObjectUpdate(val data: Any?) + +internal val ObjectUpdate.noOp get() = this.data == null + +/** + * Provides common functionality and base implementation for LiveMap and LiveCounter. + * + * @spec RTLO1/RTLO2 - Base class for LiveMap/LiveCounter object + * + * This should also be included in logging + */ +internal abstract class BaseRealtimeObject( + internal val objectId: String, // // RTLO3a + internal val objectType: ObjectType, + internal val clock: Clock = SystemClock.INSTANCE, +) { + + protected open val tag = "BaseRealtimeObject" + + internal val siteTimeserials = mutableMapOf() // RTLO3b + + internal var createOperationIsMerged = false // RTLO3c + + @Volatile + internal var isTombstoned = false // Accessed from public API for LiveMap/LiveCounter + + private var tombstonedAt: Long? = null + + /** + * This is invoked by ObjectMessage having updated data with parent `ProtocolMessageAction` as `object_sync` + * @return an update describing the changes + * + * @spec RTLM6/RTLC6 - Overrides ObjectMessage with object data state from sync to LiveMap/LiveCounter + */ + internal fun applyObjectSync(wireObjectMessage: WireObjectMessage): ObjectUpdate { + val wireObjectState = wireObjectMessage.objectState as WireObjectState // we have non-null objectState here due to RTO5f + validate(wireObjectState) + // object's site serials are still updated even if it is tombstoned, so always use the site serials received from the operation. + // should default to empty map if site serials do not exist on the object state, so that any future operation may be applied to this object. + siteTimeserials.clear() + siteTimeserials.putAll(wireObjectState.siteTimeserials) // RTLC6a, RTLM6a + + if (isTombstoned) { + // this object is tombstoned. this is a terminal state which can't be overridden. skip the rest of object state message processing + if (objectType == ObjectType.Map) { + return noOpMapUpdate + } + return noOpCounterUpdate + } + return applyObjectState(wireObjectState, wireObjectMessage) // RTLM6, RTLC6 + } + + /** + * This is invoked by ObjectMessage having updated data with parent `ProtocolMessageAction` as `object` + * @return true if the operation was meaningfully applied, false otherwise + * + * @spec RTLM15/RTLC7 - Applies ObjectMessage with object data operations to LiveMap/LiveCounter + */ + internal fun applyObject(wireObjectMessage: WireObjectMessage, source: ObjectsOperationSource): Boolean { + validateObjectId(wireObjectMessage.operation?.objectId) + + val msgTimeSerial = wireObjectMessage.serial + val msgSiteCode = wireObjectMessage.siteCode + val wireObjectOperation = wireObjectMessage.operation as WireObjectOperation + + if (!canApplyOperation(msgSiteCode, msgTimeSerial)) { + // RTLC7b, RTLM15b + Log.v( + tag, + "Skipping ${wireObjectOperation.action} op: op serial $msgTimeSerial <= site serial ${siteTimeserials[msgSiteCode]}; " + + "objectId=$objectId" + ) + return false // RTLC7b / RTLM15b + } + // RTLC7c / RTLM15c - only update siteTimeserials for CHANNEL source + if (source == ObjectsOperationSource.CHANNEL) { + siteTimeserials[msgSiteCode!!] = msgTimeSerial!! // RTLC7c, RTLM15c + } + + if (isTombstoned) { + // this object is tombstoned so the operation cannot be applied + return false // RTLC7e / RTLM15e + } + return applyObjectOperation(wireObjectOperation, wireObjectMessage) // RTLC7d + } + + /** + * Checks if an operation can be applied based on serial comparison. + * + * @spec RTLO4a - Serial comparison logic for LiveMap/LiveCounter operations + */ + internal fun canApplyOperation(siteCode: String?, timeSerial: String?): Boolean { + if (timeSerial.isNullOrEmpty()) { + throw objectError("Invalid serial: $timeSerial") // RTLO4a3 + } + if (siteCode.isNullOrEmpty()) { + throw objectError("Invalid site code: $siteCode") // RTLO4a3 + } + val existingSiteSerial = siteTimeserials[siteCode] // RTLO4a4 + return existingSiteSerial == null || timeSerial > existingSiteSerial // RTLO4a5, RTLO4a6 + } + + internal fun validateObjectId(objectId: String?) { + if (this.objectId != objectId) { + throw objectError("Invalid object: incoming objectId=$objectId; $objectType objectId=${this.objectId}") + } + } + + /** + * Marks the object as tombstoned. + */ + internal fun tombstone(serialTimestamp: Long?): ObjectUpdate { + if (serialTimestamp == null) { + Log.w(tag, "Tombstoning object $objectId without serial timestamp, using local timestamp instead") + } + isTombstoned = true + tombstonedAt = serialTimestamp?: clock.currentTimeMillis() + val update = clearData() + // TODO - Emit object lifecycle event for deletion + return update + } + + /** + * Checks if the object is eligible for garbage collection. + * + * An object is eligible for garbage collection if it has been tombstoned and + * the time since tombstoning exceeds the specified grace period. + * + * @param gcGracePeriod The grace period in milliseconds that tombstoned objects + * should be kept before being eligible for collection. + * This value is retrieved from the server's connection details + * or defaults to 24 hours if not provided by the server. + * @return true if the object is tombstoned and the grace period has elapsed, + * false otherwise + */ + internal fun isEligibleForGc(gcGracePeriod: Long): Boolean { + val currentTime = clock.currentTimeMillis() + return isTombstoned && tombstonedAt?.let { currentTime - it >= gcGracePeriod } == true + } + + /** + * Validates that the provided object state is compatible with this object. + * Checks object ID, type-specific validations, and any included create operations. + */ + abstract fun validate(state: WireObjectState) + + /** + * Applies an object state received during synchronization to this object. + * This method should update the internal data structure with the complete state + * received from the server. + * + * @param wireObjectState The complete state to apply to this object + * @return A map describing the changes made to the object's data + * + */ + abstract fun applyObjectState(wireObjectState: WireObjectState, message: WireObjectMessage): ObjectUpdate + + /** + * Applies an operation to this object. + * This method handles the specific operation actions (e.g., update, remove) + * by modifying the underlying data structure accordingly. + * + * @param operation The operation containing the action and data to apply + * @param message The complete object message containing the operation + * @return true if the operation was meaningfully applied, false otherwise + * + */ + abstract fun applyObjectOperation(operation: WireObjectOperation, message: WireObjectMessage): Boolean + + /** + * Clears the object's data and returns an update describing the changes. + * This is called during tombstoning and explicit clear operations. + * + * This method: + * 1. Calculates a diff between the current state and an empty state + * 2. Clears all entries from the underlying data structure + * 3. Returns a map containing metadata about what was cleared + * + * The returned map is used to notifying other components about what entries were removed. + * + * @return A map representing the diff of changes made + */ + abstract fun clearData(): ObjectUpdate + + /** + * Notifies subscribers about changes made to this object. Propagates updates through the + * appropriate manager after converting the generic update map to type-specific update objects. + * Spec: RTLO4b4c + */ + abstract fun notifyUpdated(update: ObjectUpdate) + + /** + * Called during garbage collection intervals to clean up expired entries. + * + * This method is invoked periodically (every 5 minutes) by the ObjectsPool + * to perform cleanup of tombstoned data that has exceeded the grace period. + * + * This method should identify and remove entries that: + * - Have been marked as tombstoned + * - Have a tombstone timestamp older than the specified grace period + * + * @param gcGracePeriod The grace period in milliseconds that tombstoned entries + * should be kept before being eligible for removal. + * This value is retrieved from the server's connection details + * or defaults to 24 hours if not provided by the server. + * Must be greater than 2 minutes to ensure proper operation + * ordering and avoid issues with delayed operations. + * + * Implementations typically use single-pass removal techniques to + * efficiently clean up expired data without creating temporary collections. + */ + abstract fun onGCInterval(gcGracePeriod: Long) +} diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/DefaultLiveCounter.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livecounter/DefaultLiveCounter.kt similarity index 88% rename from liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/DefaultLiveCounter.kt rename to liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livecounter/DefaultLiveCounter.kt index bbeeb60ea..4ec0d3edf 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/DefaultLiveCounter.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livecounter/DefaultLiveCounter.kt @@ -1,4 +1,6 @@ -package io.ably.lib.liveobjects.value +package io.ably.lib.liveobjects.value.livecounter + +import io.ably.lib.liveobjects.value.LiveCounter /** * Default implementation of the [LiveCounter] value type - an immutable holder for @@ -18,5 +20,5 @@ internal class DefaultLiveCounter( internal val initialCount: Number, ) : LiveCounter() { // TODO - build the COUNTER_CREATE ObjectMessage from `initialCount`, mirroring - // ably-js LiveCounterValueType.createCounterCreateMessage. Spec: RTO12f + // ably-js LiveCounterValueType.createCounterCreateMessage. Spec: RTLCV4 } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livecounter/InternalLiveCounter.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livecounter/InternalLiveCounter.kt new file mode 100644 index 000000000..65de1eef8 --- /dev/null +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livecounter/InternalLiveCounter.kt @@ -0,0 +1,123 @@ +package io.ably.lib.liveobjects.value.livecounter + +import io.ably.lib.liveobjects.* +import io.ably.lib.liveobjects.DefaultRealtimeObject +import io.ably.lib.liveobjects.adapter.AblyClientAdapter +import io.ably.lib.liveobjects.invalidInputError +import io.ably.lib.liveobjects.message.* +import io.ably.lib.liveobjects.message.WireCounterInc +import io.ably.lib.liveobjects.message.WireObjectMessage +import io.ably.lib.liveobjects.message.WireObjectOperation +import io.ably.lib.liveobjects.message.WireObjectOperationAction +import io.ably.lib.liveobjects.message.WireObjectState +import io.ably.lib.liveobjects.value.BaseRealtimeObject +import io.ably.lib.liveobjects.value.ObjectType +import io.ably.lib.liveobjects.value.ObjectUpdate +import io.ably.lib.liveobjects.value.noOp +import java.util.concurrent.atomic.AtomicReference +import io.ably.lib.util.Log + +/** + * @spec RTLC1/RTLC2 - LiveCounter implementation extends BaseRealtimeObject + */ +internal class InternalLiveCounter private constructor( + objectId: String, + private val realtimeObject: DefaultRealtimeObject, +) : BaseRealtimeObject(objectId, ObjectType.Counter, realtimeObject.clock) { + + override val tag = "LiveCounter" + + /** + * Thread-safe reference to hold the counter data value. + * Accessed from public API for LiveCounter and updated by LiveCounterManager. + */ + internal val data = AtomicReference(0.0) // RTLC3 + + /** + * liveCounterManager instance for managing LiveCounter operations + */ + private val liveCounterManager = LiveCounterManager(this) + + private val channelName = realtimeObject.channelName + private val adapter: AblyClientAdapter get() = realtimeObject.adapter + + internal suspend fun increment(amount: Number) = incrementAsync(amount.toDouble()) + + internal suspend fun decrement(amount: Number) = incrementAsync(-amount.toDouble()) + + internal fun value(): Double { + return data.get() + } + + internal fun subscribe(listener: LiveCounterChangeListener): Subscription { + return liveCounterManager.subscribe(listener) + } + + override fun validate(state: WireObjectState) = liveCounterManager.validate(state) + + private suspend fun incrementAsync(amount: Double) { + // RTLC12e1 - Validate input parameter + if (amount.isNaN() || amount.isInfinite()) { + throw invalidInputError("Counter value increment should be a valid number") + } + + // RTLC12e2, RTLC12e3, RTLC12e5 - Create ObjectMessage with the COUNTER_INC operation + val msg = WireObjectMessage( + operation = WireObjectOperation( + action = WireObjectOperationAction.CounterInc, + objectId = objectId, + counterInc = WireCounterInc(number = amount) + ) + ) + + // RTLC12g - publish and apply locally on ACK + realtimeObject.publishAndApply(arrayOf(msg)) + } + + override fun applyObjectState(wireObjectState: WireObjectState, message: WireObjectMessage): ObjectUpdate { + return liveCounterManager.applyState(wireObjectState, message.serialTimestamp) + } + + override fun applyObjectOperation(operation: WireObjectOperation, message: WireObjectMessage): Boolean { + return liveCounterManager.applyOperation(operation, message.serialTimestamp) + } + + override fun clearData(): ObjectUpdate { + return liveCounterManager.calculateUpdateFromDataDiff(data.get(), 0.0).apply { this@InternalLiveCounter.data.set(0.0) } + } + + override fun notifyUpdated(update: ObjectUpdate) { + if (update.noOp) { + return + } + Log.v(tag, "Object $objectId updated: $update") + + // TODO - Emit a proper LiveCounterChangeEvent once the Instance/ObjectMessage subscription + // pipeline is wired up. ObjectUpdate is not a LiveCounterChangeEvent, so casting it (as was + // done previously) always throws ClassCastException; emission is deferred until then. + liveCounterManager.notify(update as LiveCounterChangeEvent) + } + + override fun onGCInterval(gcGracePeriod: Long) { + // Nothing to GC for a counter object + return + } + + companion object { + /** + * Creates a zero-value counter object. + * @spec RTLC4 - Returns LiveCounter with 0 value + */ + internal fun zeroValue(objectId: String, realtimeObjects: DefaultRealtimeObject): InternalLiveCounter { + return InternalLiveCounter(objectId, realtimeObjects) + } + + /** + * Creates initial value payload for counter creation. + * Spec: RTLCV4b + */ + internal fun initialValue(count: Number): WireCounterCreate { + return WireCounterCreate(count = count.toDouble()) + } + } +} diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livecounter/LiveCounterChangeCoordinator.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livecounter/LiveCounterChangeCoordinator.kt new file mode 100644 index 000000000..e148bb55b --- /dev/null +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livecounter/LiveCounterChangeCoordinator.kt @@ -0,0 +1,53 @@ +package io.ably.lib.liveobjects.value.livecounter + +import io.ably.lib.liveobjects.Subscription +import io.ably.lib.liveobjects.instance.InstanceListener +import io.ably.lib.liveobjects.instance.InstanceSubscriptionEvent +import io.ably.lib.liveobjects.onceSubscription +import io.ably.lib.liveobjects.value.ObjectUpdate +import io.ably.lib.util.EventEmitter +import io.ably.lib.util.Log + +internal val noOpCounterUpdate = ObjectUpdate(null) + +/** + * Interface for handling live counter changes by notifying subscribers of updates. + * Implementations typically propagate updates through event emission to registered listeners. + */ +internal interface HandlesLiveCounterChange { + /** + * Notifies all registered listeners about a counter update by propagating the change through the event system. + * This method is called when counter data changes and triggers the emission of update events to subscribers. + */ + fun notify(update: LiveCounterChangeEvent) +} + +internal interface LiveCounterChangeListener : InstanceListener + +internal interface LiveCounterChangeEvent : InstanceSubscriptionEvent + +internal abstract class LiveCounterChangeCoordinator: HandlesLiveCounterChange { + private val counterChangeEmitter = LiveCounterChangeEmitter() + + fun subscribe(listener: LiveCounterChangeListener): Subscription { + counterChangeEmitter.on(listener) + return onceSubscription { + counterChangeEmitter.off(listener) + } + } + + override fun notify(update: LiveCounterChangeEvent) = counterChangeEmitter.emit(update) +} + +private class LiveCounterChangeEmitter : EventEmitter() { + private val tag = "LiveCounterChangeEmitter" + + override fun apply(listener: LiveCounterChangeListener?, event: LiveCounterChangeEvent, vararg args: Any?) { + try { + event?.let { listener?.onUpdated(it) } + ?: Log.w(tag, "Null event passed to LiveCounterChange listener callback") + } catch (t: Throwable) { + Log.e(tag, "Error occurred while executing listener callback for event: $event", t) + } + } +} diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livecounter/LiveCounterManager.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livecounter/LiveCounterManager.kt new file mode 100644 index 000000000..a08bb5844 --- /dev/null +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livecounter/LiveCounterManager.kt @@ -0,0 +1,135 @@ +package io.ably.lib.liveobjects.value.livecounter + +import io.ably.lib.liveobjects.message.WireCounterInc +import io.ably.lib.liveobjects.message.WireObjectOperation +import io.ably.lib.liveobjects.message.WireObjectOperationAction +import io.ably.lib.liveobjects.message.WireObjectState +import io.ably.lib.liveobjects.objectError +import io.ably.lib.liveobjects.value.ObjectUpdate +import io.ably.lib.util.Log + +internal class LiveCounterManager(private val liveCounter: InternalLiveCounter): LiveCounterChangeCoordinator() { + + private val objectId = liveCounter.objectId + + private val tag = "LiveCounterManager" + + /** + * @spec RTLC6 - Overrides counter data with state from sync + */ + internal fun applyState(wireObjectState: WireObjectState, serialTimestamp: Long?): ObjectUpdate { + val previousData = liveCounter.data.get() + + if (wireObjectState.tombstone) { + liveCounter.tombstone(serialTimestamp) + } else { + // override data for this object with data from the object state + liveCounter.createOperationIsMerged = false // RTLC6b + liveCounter.data.set(wireObjectState.counter?.count ?: 0.0) // RTLC6c + + // RTLC6d + wireObjectState.createOp?.let { createOp -> + mergeInitialDataFromCreateOperation(createOp) + } + } + + return calculateUpdateFromDataDiff(previousData, liveCounter.data.get()) + } + + /** + * @spec RTLC7 - Applies operations to LiveCounter + */ + internal fun applyOperation(operation: WireObjectOperation, serialTimestamp: Long?): Boolean { + return when (operation.action) { + WireObjectOperationAction.CounterCreate -> { + val update = applyCounterCreate(operation) // RTLC7d1 + liveCounter.notifyUpdated(update) // RTLC7d1a + true // RTLC7d1b + } + WireObjectOperationAction.CounterInc -> { + if (operation.counterInc != null) { + val update = applyCounterInc(operation.counterInc) // RTLC7d5 + liveCounter.notifyUpdated(update) // RTLC7d5a + true // RTLC7d5b + } else { + throw objectError("No payload found for ${operation.action} op for LiveCounter objectId=${objectId}") + } + } + WireObjectOperationAction.ObjectDelete -> { + val update = liveCounter.tombstone(serialTimestamp) + liveCounter.notifyUpdated(update) + true // RTLC7d4b + } + else -> { + Log.w(tag, "Invalid ${operation.action} op for LiveCounter objectId=${objectId}") // RTLC7d3 + false + } + } + } + + /** + * @spec RTLC8 - Applies counter create operation + */ + private fun applyCounterCreate(operation: WireObjectOperation): ObjectUpdate { + if (liveCounter.createOperationIsMerged) { + // RTLC8b + // There can't be two different create operation for the same object id, because the object id + // fully encodes that operation. This means we can safely ignore any new incoming create operations + // if we already merged it once. + Log.v( + tag, + "Skipping applying COUNTER_CREATE op on a counter instance as it was already applied before; objectId=$objectId" + ) + return noOpCounterUpdate // RTLC8c + } + + return mergeInitialDataFromCreateOperation(operation) // RTLC8c + } + + /** + * @spec RTLC9 - Applies counter increment operation + */ + private fun applyCounterInc(wireCounterInc: WireCounterInc): ObjectUpdate { + val amount = wireCounterInc.number + val previousValue = liveCounter.data.get() + liveCounter.data.set(previousValue + amount) // RTLC9f + return ObjectUpdate(amount) + } + + internal fun calculateUpdateFromDataDiff(prevData: Double, newData: Double): ObjectUpdate { + // A zero delta means the value did not change (e.g. clearing an already-zero counter). + // Return the no-op update so notifyUpdated() short-circuits and no event is emitted. + return if (newData == prevData) noOpCounterUpdate else ObjectUpdate(newData - prevData) + } + + /** + * @spec RTLC16 - Merges initial data from create operation + */ + private fun mergeInitialDataFromCreateOperation(operation: WireObjectOperation): ObjectUpdate { + // if a counter object is missing for the COUNTER_CREATE op, the initial value is implicitly 0 in this case. + // note that it is intentional to SUM the incoming count from the create op. + // if we got here, it means that current counter instance is missing the initial value in its data reference, + // which we're going to add now. + val count = operation.counterCreateWithObjectId?.derivedFrom?.count + ?: operation.counterCreate?.count + ?: 0.0 + val previousValue = liveCounter.data.get() + liveCounter.data.set(previousValue + count) // RTLC16 + liveCounter.createOperationIsMerged = true // RTLC16 + return ObjectUpdate(count) + } + + internal fun validate(state: WireObjectState) { + liveCounter.validateObjectId(state.objectId) + state.createOp?.let { createOp -> + liveCounter.validateObjectId(createOp.objectId) + validateCounterCreateAction(createOp.action) + } + } + + private fun validateCounterCreateAction(action: WireObjectOperationAction) { + if (action != WireObjectOperationAction.CounterCreate) { + throw objectError("Invalid create operation action $action for LiveCounter objectId=${objectId}") + } + } +} diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/DefaultLiveMap.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/DefaultLiveMap.kt similarity index 80% rename from liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/DefaultLiveMap.kt rename to liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/DefaultLiveMap.kt index b0fcd9abb..dd90a6f7b 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/DefaultLiveMap.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/DefaultLiveMap.kt @@ -1,4 +1,7 @@ -package io.ably.lib.liveobjects.value +package io.ably.lib.liveobjects.value.livemap + +import io.ably.lib.liveobjects.value.LiveMap +import io.ably.lib.liveobjects.value.LiveMapValue /** * Default implementation of the [LiveMap] value type - an immutable holder for the @@ -16,9 +19,9 @@ package io.ably.lib.liveobjects.value * Spec: RTLMV1, RTLMV2, RTLMV3 */ internal class DefaultLiveMap( - internal val entries: Map, + internal val entries: Map, ) : LiveMap() { // TODO - build the MAP_CREATE ObjectMessage (plus nested object create messages) // from `entries`, mirroring ably-js LiveMapValueType.createMapCreateMessage. - // Spec: RTO11f + // Spec: RTLMV4 } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/InternalLiveMap.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/InternalLiveMap.kt new file mode 100644 index 000000000..0cf81043f --- /dev/null +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/InternalLiveMap.kt @@ -0,0 +1,223 @@ +package io.ably.lib.liveobjects.value.livemap + +import io.ably.lib.liveobjects.* +import io.ably.lib.liveobjects.ObjectsPool +import io.ably.lib.liveobjects.adapter.AblyClientAdapter +import io.ably.lib.liveobjects.message.* +import io.ably.lib.liveobjects.message.WireObjectMessage +import io.ably.lib.liveobjects.message.WireObjectOperation +import io.ably.lib.liveobjects.message.WireObjectOperationAction +import io.ably.lib.liveobjects.message.WireObjectState +import io.ably.lib.liveobjects.message.WireObjectsMapSemantics +import io.ably.lib.liveobjects.value.* +import io.ably.lib.liveobjects.value.BaseRealtimeObject +import io.ably.lib.liveobjects.value.ObjectType +import io.ably.lib.liveobjects.value.ObjectUpdate +import io.ably.lib.liveobjects.value.noOp +import io.ably.lib.util.Log +import java.util.Base64 +import java.util.concurrent.ConcurrentHashMap +import java.util.AbstractMap + +/** + * @spec RTLM1/RTLM2 - LiveMap implementation extends BaseRealtimeObject + */ +internal class InternalLiveMap private constructor( + objectId: String, + private val realtimeObject: DefaultRealtimeObject, + internal val semantics: WireObjectsMapSemantics = WireObjectsMapSemantics.LWW +) : BaseRealtimeObject(objectId, ObjectType.Map, realtimeObject.clock) { + + override val tag = "LiveMap" + + /** + * ConcurrentHashMap for thread-safe access from public APIs in LiveMap and LiveMapManager. + */ + internal val data = ConcurrentHashMap() + + /** @spec RTLM25 */ + internal var clearTimeserial: String? = null + + /** + * LiveMapManager instance for managing LiveMap operations + */ + private val liveMapManager = LiveMapManager(this) + + private val channelName = realtimeObject.channelName + private val adapter: AblyClientAdapter get() = realtimeObject.adapter + internal val objectsPool: ObjectsPool get() = realtimeObject.objectsPool + + internal fun get(keyName: String): LiveMapValue? { + if (isTombstoned) { + return null + } + data[keyName]?.let { liveMapEntry -> + return liveMapEntry.getResolvedValue(objectsPool) + } + return null // RTLM5d1 + } + + internal fun entries(): Iterable> { + return sequence> { + for ((key, entry) in data.entries) { + val value = entry.getResolvedValue(objectsPool) // RTLM11d, RTLM11d2 + value?.let { + yield(AbstractMap.SimpleImmutableEntry(key, it)) + } + } + }.asIterable() + } + + internal fun keys(): Iterable { + val iterableEntries = entries() + return sequence { + for (entry in iterableEntries) { + yield(entry.key) // RTLM12b + } + }.asIterable() + } + + internal fun values(): Iterable { + val iterableEntries = entries() + return sequence { + for (entry in iterableEntries) { + yield(entry.value) // RTLM13b + } + }.asIterable() + } + + internal fun size(): Long { + return data.values.count { !it.isEntryOrRefTombstoned(objectsPool) }.toLong() // RTLM10d + } + + internal suspend fun set(keyName: String, value: LiveMapValue) = setAsync(keyName, value) + + internal suspend fun remove(keyName: String) = removeAsync(keyName) + + override fun validate(state: WireObjectState) = liveMapManager.validate(state) + + internal fun subscribe(listener: LiveMapChangeListener): Subscription { + return liveMapManager.subscribe(listener) + } + + private suspend fun setAsync(keyName: String, value: LiveMapValue) { + // Validate input parameters + if (keyName.isEmpty()) { + throw invalidInputError("Map key should not be empty") + } + + // RTLM20e - Create ObjectMessage with the MAP_SET operation + val msg = WireObjectMessage( + operation = WireObjectOperation( + action = WireObjectOperationAction.MapSet, + objectId = objectId, + mapSet = WireMapSet( + key = keyName, + value = fromLiveMapValue(value) + ) + ) + ) + + // RTLM20h - publish and apply locally on ACK + realtimeObject.publishAndApply(arrayOf(msg)) + } + + private suspend fun removeAsync(keyName: String) { + // Validate input parameter + if (keyName.isEmpty()) { + throw invalidInputError("Map key should not be empty") + } + + // RTLM21e - Create ObjectMessage with the MAP_REMOVE operation + val msg = WireObjectMessage( + operation = WireObjectOperation( + action = WireObjectOperationAction.MapRemove, + objectId = objectId, + mapRemove = WireMapRemove(key = keyName) + ) + ) + + // RTLM21g - publish and apply locally on ACK + realtimeObject.publishAndApply(arrayOf(msg)) + } + + override fun applyObjectState(wireObjectState: WireObjectState, message: WireObjectMessage): ObjectUpdate { + return liveMapManager.applyState(wireObjectState, message.serialTimestamp) + } + + override fun applyObjectOperation(operation: WireObjectOperation, message: WireObjectMessage): Boolean { + return liveMapManager.applyOperation(operation, message.serial, message.serialTimestamp) + } + + override fun clearData(): ObjectUpdate { + clearTimeserial = null // RTLM4 + return liveMapManager.calculateUpdateFromDataDiff(data.toMap(), emptyMap()) + .apply { this@InternalLiveMap.data.clear() } + } + + override fun notifyUpdated(update: ObjectUpdate) { + if (update.noOp) { + return + } + Log.v(tag, "Object $objectId updated: $update") + + // TODO - Emit a proper LiveMapChangeEvent once the Instance/ObjectMessage subscription + // pipeline is wired up. ObjectUpdate is not a LiveMapChangeEvent, so casting it (as was + // done previously) always throws ClassCastException; emission is deferred until then. + liveMapManager.notify(update as LiveMapChangeEvent) + } + + override fun onGCInterval(gcGracePeriod: Long) { + data.entries.removeIf { (_, entry) -> entry.isEligibleForGc(gcGracePeriod, clock) } + } + + companion object { + /** + * Creates a zero-value map object. + * @spec RTLM4 - Returns LiveMap with empty map data + */ + internal fun zeroValue(objectId: String, objects: DefaultRealtimeObject): InternalLiveMap { + return InternalLiveMap(objectId, objects) + } + + /** + * Creates a MapCreate payload from map entries. + * Spec: RTLMV4e + */ + internal fun initialValue(entries: MutableMap): WireMapCreate { + return WireMapCreate( + semantics = WireObjectsMapSemantics.LWW, + entries = entries.mapValues { (_, value) -> + WireObjectsMapEntry( + tombstone = false, + data = fromLiveMapValue(value) + ) + } + ) + } + + /** + * Spec: RTLM20e7 + */ + private fun fromLiveMapValue(value: LiveMapValue): WireObjectData { + return when { + value.isLiveMap || value.isLiveCounter -> + WireObjectData(objectId = (value.value as BaseRealtimeObject).objectId) + value.isBoolean -> + WireObjectData(boolean = value.asBoolean) + value.isBinary -> + WireObjectData(bytes = Base64.getEncoder().encodeToString(value.asBinary)) + value.isNumber -> + WireObjectData(number = value.asNumber.toDouble()) + value.isString -> + WireObjectData(string = value.asString) + value.isJsonObject -> + WireObjectData(json = value.asJsonObject) + value.isJsonArray -> + WireObjectData(json = value.asJsonArray) + else -> + throw IllegalArgumentException("Unsupported value type") + } + } + } +} diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/LiveMapChangeCoordinator.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/LiveMapChangeCoordinator.kt new file mode 100644 index 000000000..4848f37b8 --- /dev/null +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/LiveMapChangeCoordinator.kt @@ -0,0 +1,53 @@ +package io.ably.lib.liveobjects.value.livemap + +import io.ably.lib.liveobjects.Subscription +import io.ably.lib.liveobjects.instance.InstanceListener +import io.ably.lib.liveobjects.instance.InstanceSubscriptionEvent +import io.ably.lib.liveobjects.onceSubscription +import io.ably.lib.liveobjects.value.ObjectUpdate +import io.ably.lib.util.EventEmitter +import io.ably.lib.util.Log + +internal val noOpMapUpdate = ObjectUpdate(null) + +/** + * Interface for handling live map changes by notifying subscribers of updates. + * Implementations typically propagate updates through event emission to registered listeners. + */ +internal interface HandlesLiveMapChange { + /** + * Notifies all registered listeners about a map update by propagating the change through the event system. + * This method is called when map data changes and triggers the emission of update events to subscribers. + */ + fun notify(update: LiveMapChangeEvent) +} + +internal interface LiveMapChangeListener : InstanceListener + +internal interface LiveMapChangeEvent : InstanceSubscriptionEvent + +internal abstract class LiveMapChangeCoordinator: HandlesLiveMapChange { + private val mapChangeEmitter = LiveMapChangeEmitter() + + fun subscribe(listener: LiveMapChangeListener): Subscription { + mapChangeEmitter.on(listener) + return onceSubscription { + mapChangeEmitter.off(listener) + } + } + + override fun notify(update: LiveMapChangeEvent) = mapChangeEmitter.emit(update) +} + +private class LiveMapChangeEmitter : EventEmitter() { + private val tag = "LiveMapChangeEmitter" + + override fun apply(listener: LiveMapChangeListener?, event: LiveMapChangeEvent?, vararg args: Any?) { + try { + event?.let { listener?.onUpdated(it) } + ?: Log.w(tag, "Null event passed to LiveMapChange listener callback") + } catch (t: Throwable) { + Log.e(tag, "Error occurred while executing listener callback for event: $event", t) + } + } +} diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/LiveMapEntry.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/LiveMapEntry.kt new file mode 100644 index 000000000..15f388a12 --- /dev/null +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/LiveMapEntry.kt @@ -0,0 +1,83 @@ +package io.ably.lib.liveobjects.value.livemap + +import io.ably.lib.liveobjects.ObjectsPool +import io.ably.lib.liveobjects.message.WireObjectData +import io.ably.lib.liveobjects.value.* +import io.ably.lib.liveobjects.value.BaseRealtimeObject +import io.ably.lib.liveobjects.value.ObjectType +import io.ably.lib.util.Clock +import java.util.Base64 + +/** + * @spec RTLM3 - Map data structure storing entries + */ +internal data class LiveMapEntry( + val isTombstoned: Boolean = false, + val tombstonedAt: Long? = null, + val timeserial: String? = null, + val data: WireObjectData? = null +) + +/** + * Checks if entry is directly tombstoned or references a tombstoned object. Spec: RTLM14 + * @param objectsPool The object pool containing referenced DefaultRealtimeObjects + */ +internal fun LiveMapEntry.isEntryOrRefTombstoned(objectsPool: ObjectsPool): Boolean { + if (isTombstoned) { + return true // RTLM14a + } + data?.objectId?.let { refId -> // RTLM5d2f -has an objectId reference + objectsPool.get(refId)?.let { refObject -> + if (refObject.isTombstoned) { + return true + } + } + } + return false // RTLM14b +} + +/** + * Returns value as is if object data stores a primitive type or + * a reference to another RealtimeObject from the pool if it stores an objectId. + */ +internal fun LiveMapEntry.getResolvedValue(objectsPool: ObjectsPool): LiveMapValue? { + if (isTombstoned) { return null } // RTLM5d2h + + data?.let { d -> // RTLM5d2b, RTLM5d2c, RTLM5d2d, RTLM5d2e + d.string?.let { return LiveMapValue.of(it) } + d.number?.let { return LiveMapValue.of(it) } + d.boolean?.let { return LiveMapValue.of(it) } + d.bytes?.let { return LiveMapValue.of(Base64.getDecoder().decode(it)) } + d.json?.let { parsed -> + return when { + parsed.isJsonObject -> LiveMapValue.of(parsed.asJsonObject) + parsed.isJsonArray -> LiveMapValue.of(parsed.asJsonArray) + else -> null + } + } + d.objectId?.let { refId -> // RTLM5d2f - has an objectId reference + objectsPool.get(refId)?.let { refObject -> + if (refObject.isTombstoned) { + return null // tombstoned objects must not be surfaced to the end users + } + return fromRealtimeObject(refObject) // RTLM5d2f2 + } + } + } + return null // RTLM5d2g, RTLM5d2f1 +} + +/** + * Extension function to check if a LiveMapEntry is expired and ready for garbage collection + */ +internal fun LiveMapEntry.isEligibleForGc(gcGracePeriod: Long, clock: Clock): Boolean { + val currentTime = clock.currentTimeMillis() + return isTombstoned && tombstonedAt?.let { currentTime - it >= gcGracePeriod } == true +} + +private fun fromRealtimeObject(realtimeObject: BaseRealtimeObject): LiveMapValue { + return when (realtimeObject.objectType) { + ObjectType.Map -> LiveMapValue.of(realtimeObject as LiveMap) + ObjectType.Counter -> LiveMapValue.of(realtimeObject as LiveCounter) + } +} diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/LiveMapManager.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/LiveMapManager.kt new file mode 100644 index 000000000..d35e535a4 --- /dev/null +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/LiveMapManager.kt @@ -0,0 +1,415 @@ +package io.ably.lib.liveobjects.value.livemap + +import io.ably.lib.liveobjects.message.* +import io.ably.lib.liveobjects.message.WireMapSet +import io.ably.lib.liveobjects.message.WireObjectOperation +import io.ably.lib.liveobjects.message.WireObjectOperationAction +import io.ably.lib.liveobjects.message.WireObjectState +import io.ably.lib.liveobjects.objectError +import io.ably.lib.liveobjects.value.ObjectUpdate +import io.ably.lib.liveobjects.value.noOp +import io.ably.lib.util.Log +import kotlin.collections.iterator + +internal class LiveMapManager(private val liveMap: InternalLiveMap): LiveMapChangeCoordinator() { + + private val objectId = liveMap.objectId + + private val tag = "LiveMapManager" + + /** + * @spec RTLM6 - Overrides object data with state from sync + */ + internal fun applyState(wireObjectState: WireObjectState, serialTimestamp: Long?): ObjectUpdate { + val previousData = liveMap.data.toMap() + + if (wireObjectState.tombstone) { + liveMap.tombstone(serialTimestamp) + } else { + // override data for this object with data from the object state + liveMap.createOperationIsMerged = false // RTLM6b + liveMap.data.clear() + + liveMap.clearTimeserial = wireObjectState.map?.clearTimeserial // RTLM6i + + wireObjectState.map?.entries?.forEach { (key, entry) -> + liveMap.data[key] = LiveMapEntry( + isTombstoned = entry.tombstone ?: false, + tombstonedAt = if (entry.tombstone == true) entry.serialTimestamp + ?: liveMap.clock.currentTimeMillis() else null, + timeserial = entry.timeserial, + data = entry.data + ) + } // RTLM6c + + // RTLM6d + wireObjectState.createOp?.let { createOp -> + mergeInitialDataFromCreateOperation(createOp) + } + } + + return calculateUpdateFromDataDiff(previousData, liveMap.data.toMap()) + } + + /** + * @spec RTLM15 - Applies operations to LiveMap + */ + internal fun applyOperation(operation: WireObjectOperation, serial: String?, serialTimestamp: Long?): Boolean { + return when (operation.action) { + WireObjectOperationAction.MapCreate -> { + val update = applyMapCreate(operation) // RTLM15d1 + liveMap.notifyUpdated(update) // RTLM15d1a + true // RTLM15d1b + } + WireObjectOperationAction.MapSet -> { + if (operation.mapSet != null) { + val update = applyMapSet(operation.mapSet, serial) // RTLM15d6 + liveMap.notifyUpdated(update) // RTLM15d6a + true // RTLM15d6b + } else { + throw objectError("No payload found for ${operation.action} op for LiveMap objectId=${objectId}") + } + } + WireObjectOperationAction.MapRemove -> { + if (operation.mapRemove != null) { + val update = applyMapRemove(operation.mapRemove, serial, serialTimestamp) // RTLM15d7 + liveMap.notifyUpdated(update) // RTLM15d7a + true // RTLM15d7b + } else { + throw objectError("No payload found for ${operation.action} op for LiveMap objectId=${objectId}") + } + } + WireObjectOperationAction.ObjectDelete -> { + val update = liveMap.tombstone(serialTimestamp) + liveMap.notifyUpdated(update) + true // RTLM15d5b + } + WireObjectOperationAction.MapClear -> { + val update = applyMapClear(serial) // RTLM15d8 + liveMap.notifyUpdated(update) // RTLM15d8a + true // RTLM15d8b + } + else -> { + Log.w(tag, "Invalid ${operation.action} op for LiveMap objectId=${objectId}") // RTLM15d4 + false + } + } + } + + /** + * @spec RTLM16 - Applies map create operation + */ + private fun applyMapCreate(operation: WireObjectOperation): ObjectUpdate { + if (liveMap.createOperationIsMerged) { + // RTLM16b + // There can't be two different create operation for the same object id, because the object id + // fully encodes that operation. This means we can safely ignore any new incoming create operations + // if we already merged it once. + Log.v( + tag, + "Skipping applying MAP_CREATE op on a map instance as it was already applied before; objectId=${objectId}" + ) + return noOpMapUpdate + } + + validateMapSemantics(getEffectiveMapCreate(operation)?.semantics) // RTLM16c + + return mergeInitialDataFromCreateOperation(operation) // RTLM16d + } + + /** + * @spec RTLM7 - Applies MAP_SET operation to LiveMap + */ + private fun applyMapSet( + wireMapSet: WireMapSet, // RTLM7d3 + timeSerial: String?, // RTLM7d2 + ): ObjectUpdate { + // RTLM7h - skip if operation is older than the last MAP_CLEAR + val clearSerial = liveMap.clearTimeserial + if (clearSerial != null && (timeSerial == null || clearSerial >= timeSerial)) { + Log.v(tag, + "Skipping MAP_SET for key=\"${wireMapSet.key}\": op serial $timeSerial <= clear serial $clearSerial; objectId=$objectId") + return noOpMapUpdate + } + + val existingEntry = liveMap.data[wireMapSet.key] + + // RTLM7a + if (existingEntry != null && !canApplyMapOperation(existingEntry.timeserial, timeSerial)) { + // RTLM7a1 - the operation's serial <= the entry's serial, ignore the operation + Log.v(tag, + "Skipping update for key=\"${wireMapSet.key}\": op serial $timeSerial <= entry serial ${existingEntry.timeserial};" + + " objectId=${objectId}" + ) + return noOpMapUpdate + } + + if (wireMapSet.value.isInvalid()) { + throw objectError("Invalid object data for MAP_SET op on objectId=${objectId} on key=${wireMapSet.key}") + } + + // RTLM7g + wireMapSet.value.objectId?.let { + // this MAP_SET op is setting a key to point to another object via its object id, + // but it is possible that we don't have the corresponding object in the pool yet (for example, we haven't seen the *_CREATE op for it). + // we don't want to return undefined from this map's .get() method even if we don't have the object, + // so instead we create a zero-value object for that object id if it not exists. + liveMap.objectsPool.createZeroValueObjectIfNotExists(it) // RTLM7g1 + } + + if (existingEntry != null) { + // RTLM7a2 - Replace existing entry with new one instead of mutating + liveMap.data[wireMapSet.key] = LiveMapEntry( + isTombstoned = false, // RTLM7a2c + timeserial = timeSerial, // RTLM7a2b + data = wireMapSet.value // RTLM7a2e + ) + } else { + // RTLM7b, RTLM7b4 + liveMap.data[wireMapSet.key] = LiveMapEntry( + isTombstoned = false, // RTLM7b2 + timeserial = timeSerial, + data = wireMapSet.value + ) + } + + return ObjectUpdate(mapOf(wireMapSet.key to "updated")) + } + + /** + * @spec RTLM8 - Applies MAP_REMOVE operation to LiveMap + */ + private fun applyMapRemove( + wireMapRemove: WireMapRemove, // RTLM8c4 + timeSerial: String?, // RTLM8c2 + timeStamp: Long?, // RTLM8c3 + ): ObjectUpdate { + // RTLM8g - skip if operation is older than the last MAP_CLEAR + val clearSerial = liveMap.clearTimeserial + if (clearSerial != null && (timeSerial == null || clearSerial >= timeSerial)) { + Log.v(tag, + "Skipping MAP_REMOVE for key=\"${wireMapRemove.key}\": op serial $timeSerial <= clear serial $clearSerial; objectId=$objectId") + return noOpMapUpdate + } + + val existingEntry = liveMap.data[wireMapRemove.key] + + // RTLM8a + if (existingEntry != null && !canApplyMapOperation(existingEntry.timeserial, timeSerial)) { + // RTLM8a1 - the operation's serial <= the entry's serial, ignore the operation + Log.v( + tag, + "Skipping remove for key=\"${wireMapRemove.key}\": op serial $timeSerial <= entry serial ${existingEntry.timeserial}; " + + "objectId=${objectId}" + ) + return noOpMapUpdate + } + + val tombstonedAt = if (timeStamp != null) timeStamp else { + Log.w( + tag, + "No timestamp provided for MAP_REMOVE op on key=\"${wireMapRemove.key}\"; using current time as tombstone time; " + + "objectId=${objectId}" + ) + liveMap.clock.currentTimeMillis() + } + + if (existingEntry != null) { + // RTLM8a2 - Replace existing entry with new one instead of mutating + liveMap.data[wireMapRemove.key] = LiveMapEntry( + isTombstoned = true, // RTLM8a2c + tombstonedAt = tombstonedAt, + timeserial = timeSerial, // RTLM8a2b + data = null // RTLM8a2a + ) + } else { + // RTLM8b, RTLM8b1 + liveMap.data[wireMapRemove.key] = LiveMapEntry( + isTombstoned = true, // RTLM8b2 + tombstonedAt = tombstonedAt, + timeserial = timeSerial + ) + } + + return ObjectUpdate(mapOf(wireMapRemove.key to "removed")) + } + + /** + * @spec RTLM24 - Applies MAP_CLEAR operation to LiveMap + */ + private fun applyMapClear(timeSerial: String?): ObjectUpdate { + val clearSerial = liveMap.clearTimeserial + + // RTLM24c - skip if existing clear serial is strictly newer than incoming op serial + if (clearSerial != null && (timeSerial == null || clearSerial > timeSerial)) { + Log.v(tag, + "Skipping MAP_CLEAR: op serial $timeSerial <= current clear serial $clearSerial; objectId=$objectId") + return noOpMapUpdate + } + + Log.v(tag, + "Updating clearTimeserial; previous=$clearSerial, new=$timeSerial; objectId=$objectId") + liveMap.clearTimeserial = timeSerial // RTLM24d + + val update = mutableMapOf() + + // RTLM24e - remove all entries whose serial is older than (or equal to missing) the clear serial + liveMap.data.entries.removeIf { + val (key, entry) = it + val entrySerial = entry.timeserial + if (entrySerial == null || (timeSerial != null && timeSerial > entrySerial)) { + update[key] = "removed" + true + } else { + false + } + } + + return ObjectUpdate(update) + } + + /** + * For Lww CRDT semantics (the only supported LiveMap semantic) an operation + * Should only be applied if incoming serial is strictly greater than existing entry's serial. + * @spec RTLM9 - Serial comparison logic for map operations + */ + private fun canApplyMapOperation(existingMapEntrySerial: String?, timeSerial: String?): Boolean { + if (existingMapEntrySerial.isNullOrEmpty() && timeSerial.isNullOrEmpty()) { // RTLM9b + return false + } + if (existingMapEntrySerial.isNullOrEmpty()) { // RTLM9d - If true, means timeSerial is not empty based on previous checks + return true + } + if (timeSerial.isNullOrEmpty()) { // RTLM9c - Check reached here means existingMapEntrySerial is not empty + return false + } + return timeSerial > existingMapEntrySerial // RTLM9e - both are not empty + } + + /** + * @spec RTLM23 - Merges initial data from create operation + */ + private fun getEffectiveMapCreate(operation: WireObjectOperation): WireMapCreate? = + operation.mapCreateWithObjectId?.derivedFrom ?: operation.mapCreate + + private fun mergeInitialDataFromCreateOperation(operation: WireObjectOperation): ObjectUpdate { + val effectiveMapCreate = getEffectiveMapCreate(operation) + if (effectiveMapCreate?.entries.isNullOrEmpty()) { // no map entries in MAP_CREATE op + return noOpMapUpdate + } + + val aggregatedUpdate = mutableListOf() + + // RTLM23a + // in order to apply MAP_CREATE op for an existing map, we should merge their underlying entries keys. + // we can do this by iterating over entries from MAP_CREATE op and apply changes on per-key basis as if we had MAP_SET, MAP_REMOVE operations. + effectiveMapCreate?.entries?.forEach { (key, entry) -> + // for a MAP_CREATE operation we must use the serial value available on an entry, instead of a serial on a message + val opTimeserial = entry.timeserial + val update = if (entry.tombstone == true) { + // RTLM23a2 - entry in MAP_CREATE op is removed, try to apply MAP_REMOVE op + applyMapRemove(WireMapRemove(key), opTimeserial, entry.serialTimestamp) + } else { + // RTLM23a1 - entry in MAP_CREATE op is not removed, try to set it via MAP_SET op + applyMapSet(WireMapSet(key, entry.data ?: throw objectError("MAP_SET operation without data")), opTimeserial) + } + + // skip noop updates + if (update.noOp) { + return@forEach + } + + aggregatedUpdate.add(update) + } + + liveMap.createOperationIsMerged = true // RTLM23b + + // TODO - This will need some rework as per new spec, we have commented out old code. + // Maybe we no more calculate the difference, so we might get rid of the code. + return ObjectUpdate(aggregatedUpdate) +// return ObjectUpdate( +// aggregatedUpdate.map { it.update }.fold(emptyMap()) { acc, map -> acc + map } +// ) + } + + internal fun calculateUpdateFromDataDiff( + prevData: Map, + newData: Map + ): ObjectUpdate { + val update = mutableMapOf() + + // Check for removed entries + for ((key, prevEntry) in prevData) { + if (!prevEntry.isTombstoned && !newData.containsKey(key)) { + update[key] = "removed" + } + } + + // Check for added/updated entries + for ((key, newEntry) in newData) { + if (!prevData.containsKey(key)) { + // if property does not exist in current map, but new data has it as non-tombstoned property - got updated + if (!newEntry.isTombstoned) { + update[key] = "updated" + } + // otherwise, if new data has this prop tombstoned - do nothing, as property didn't exist anyway + continue + } + + // properties that exist both in current and new map data need to have their values compared to decide on update type + val prevEntry = prevData[key]!! + + // compare tombstones first + if (prevEntry.isTombstoned && !newEntry.isTombstoned) { + // prev prop is tombstoned, but new is not. it means prop was updated to a meaningful value + update[key] = "updated" + continue + } + if (!prevEntry.isTombstoned && newEntry.isTombstoned) { + // prev prop is not tombstoned, but new is. it means prop was removed + update[key] = "removed" + continue + } + if (prevEntry.isTombstoned && newEntry.isTombstoned) { + // props are tombstoned - treat as noop, as there is no data to compare + continue + } + + // both props exist and are not tombstoned, need to compare values to see if it was changed + val valueChanged = prevEntry.data != newEntry.data + if (valueChanged) { + update[key] = "updated" + continue + } + } + + // An empty diff means nothing actually changed (e.g. clearing an already-empty root + // map on a channel with no objects). Return the no-op update so notifyUpdated() + // short-circuits and no change event is emitted. Spec: RTLM/RTO4b. + return if (update.isEmpty()) noOpMapUpdate else ObjectUpdate(update) + } + + internal fun validate(state: WireObjectState) { + liveMap.validateObjectId(state.objectId) + validateMapSemantics(state.map?.semantics) + state.createOp?.let { createOp -> + liveMap.validateObjectId(createOp.objectId) + validateMapCreateAction(createOp.action) + validateMapSemantics(getEffectiveMapCreate(createOp)?.semantics) + } + } + + private fun validateMapCreateAction(action: WireObjectOperationAction) { + if (action != WireObjectOperationAction.MapCreate) { + throw objectError("Invalid create operation action $action for LiveMap objectId=${objectId}") + } + } + + private fun validateMapSemantics(semantics: WireObjectsMapSemantics?) { + if (semantics != liveMap.semantics) { + throw objectError( + "Invalid object: incoming object map semantics=$semantics; current map semantics=${WireObjectsMapSemantics.LWW}" + ) + } + } +}