diff --git a/.agent-docs/2026-01-17-unarchive-chat-on-receive.md b/.agent-docs/2026-01-17-unarchive-chat-on-receive.md new file mode 100644 index 00000000..8d96f7c9 --- /dev/null +++ b/.agent-docs/2026-01-17-unarchive-chat-on-receive.md @@ -0,0 +1,7 @@ +# Unarchive Chat On Incoming Message + +## Plan +- [x] Inspect current dialog/archive handling across server, protocol, and clients. +- [x] Add server-side unarchiveIfNeeded module and call it during message send with user-bucket persistence. +- [x] Add dialog-archive update to protos, regenerate code, and wire server sync + client update application. +- [x] Remove client-side unarchive-on-receive logic and add tests for server unarchive behavior. diff --git a/apple/InlineKit/Sources/InlineKit/Models/Message.swift b/apple/InlineKit/Sources/InlineKit/Models/Message.swift index 40d909eb..3a60aaae 100644 --- a/apple/InlineKit/Sources/InlineKit/Models/Message.swift +++ b/apple/InlineKit/Sources/InlineKit/Models/Message.swift @@ -399,12 +399,6 @@ public extension Message { // Save the message let message = try saveAndFetch(db, onConflict: .ignore) - // Handle unarchiving for incoming messages - if !isExisting, out != true { - // TODO: move this out of here - try unarchiveIncomingMessagesChat(db, peerId: peerId) - } - // Publish changes if needed if publishChanges { let message = self // Create an immutable copy @@ -425,30 +419,6 @@ public extension Message { return message } - func unarchiveIncomingMessagesChat( - _ db: Database, - peerId: Peer - ) throws { - if let dialog = try Dialog.fetchOne(db, id: Dialog.getDialogId(peerId: peerId)), - dialog.archived == true - { - var updatedDialog = dialog - updatedDialog.archived = false - try updatedDialog.save(db, onConflict: .replace) - - // Schedule API update after transaction - let peer = peerId - db.afterNextTransaction { _ in - Task { - try? await ApiClient.shared.updateDialog( - peerId: peer, - pinned: nil, - archived: false - ) - } - } - } - } } public extension ApiMessage { diff --git a/apple/InlineKit/Sources/InlineKit/RealtimeAPI/Updates.swift b/apple/InlineKit/Sources/InlineKit/RealtimeAPI/Updates.swift index adc65476..bda67e50 100644 --- a/apple/InlineKit/Sources/InlineKit/RealtimeAPI/Updates.swift +++ b/apple/InlineKit/Sources/InlineKit/RealtimeAPI/Updates.swift @@ -87,6 +87,9 @@ public actor UpdatesEngine: Sendable { case let .updateReadMaxID(updateReadMaxID): try updateReadMaxID.apply(db) + case let .dialogArchived(dialogArchived): + try dialogArchived.apply(db) + default: break } @@ -751,6 +754,20 @@ extension InlineProtocol.UpdateMarkAsUnread { } } +extension InlineProtocol.UpdateDialogArchived { + func apply(_ db: Database) throws { + Log.shared.debug("update dialog archived for peer \(peerID.toPeer()) archived: \(archived)") + + if var dialog = try Dialog.get(peerId: peerID.toPeer()).fetchOne(db) { + dialog.archived = archived + try dialog.update(db) + Log.shared.debug("Updated dialog archived to \(archived)") + } else { + Log.shared.warning("Could not find dialog for peer \(peerID.toPeer()) to update archived state") + } + } +} + extension InlineProtocol.UpdateReadMaxId { func apply(_ db: Database) throws { Log.shared.debug( diff --git a/apple/InlineKit/Sources/InlineProtocol/client.pb.swift b/apple/InlineKit/Sources/InlineProtocol/client.pb.swift index 811f49ea..37f2b571 100644 --- a/apple/InlineKit/Sources/InlineProtocol/client.pb.swift +++ b/apple/InlineKit/Sources/InlineProtocol/client.pb.swift @@ -8,7 +8,6 @@ // For information on using the generated types, please see the documentation: // https://github.com/apple/swift-protobuf/ -import Foundation import SwiftProtobuf // If the compiler emits an error on this type, it is because this file diff --git a/apple/InlineKit/Sources/InlineProtocol/core.pb.swift b/apple/InlineKit/Sources/InlineProtocol/core.pb.swift index 1e1f1985..2300d2ca 100644 --- a/apple/InlineKit/Sources/InlineProtocol/core.pb.swift +++ b/apple/InlineKit/Sources/InlineProtocol/core.pb.swift @@ -4510,6 +4510,14 @@ public struct Update: @unchecked Sendable { set {_uniqueStorage()._update = .chatVisibility(newValue)} } + public var dialogArchived: UpdateDialogArchived { + get { + if case .dialogArchived(let v)? = _storage._update {return v} + return UpdateDialogArchived() + } + set {_uniqueStorage()._update = .dialogArchived(newValue)} + } + public var unknownFields = SwiftProtobuf.UnknownStorage() public enum OneOf_Update: Equatable, Sendable { @@ -4548,6 +4556,7 @@ public struct Update: @unchecked Sendable { case spaceHasNewUpdates(UpdateSpaceHasNewUpdates) case spaceMemberUpdate(UpdateSpaceMemberUpdate) case chatVisibility(UpdateChatVisibility) + case dialogArchived(UpdateDialogArchived) } @@ -4869,6 +4878,32 @@ public struct UpdateMarkAsUnread: Sendable { fileprivate var _peerID: Peer? = nil } +/// Update when a dialog is archived or unarchived +public struct UpdateDialogArchived: Sendable { + // SwiftProtobuf.Message conformance is added in an extension below. See the + // `Message` and `Message+*Additions` files in the SwiftProtobuf library for + // methods supported on all messages. + + /// Peer ID of the dialog that changed + public var peerID: Peer { + get {return _peerID ?? Peer()} + set {_peerID = newValue} + } + /// Returns true if `peerID` has been explicitly set. + public var hasPeerID: Bool {return self._peerID != nil} + /// Clears the value of `peerID`. Subsequent reads from it will return its default value. + public mutating func clearPeerID() {self._peerID = nil} + + /// Whether it's archived (true) or unarchived (false) + public var archived: Bool = false + + public var unknownFields = SwiftProtobuf.UnknownStorage() + + public init() {} + + fileprivate var _peerID: Peer? = nil +} + /// Update when a new chat is created either in space or a private chat public struct UpdateNewChat: @unchecked Sendable { // SwiftProtobuf.Message conformance is added in an extension below. See the @@ -12259,6 +12294,7 @@ extension Update: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBas 26: .standard(proto: "space_has_new_updates"), 27: .standard(proto: "space_member_update"), 28: .standard(proto: "chat_visibility"), + 29: .standard(proto: "dialog_archived"), ] fileprivate class _StorageClass { @@ -12627,6 +12663,19 @@ extension Update: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBas _storage._update = .chatVisibility(v) } }() + case 29: try { + var v: UpdateDialogArchived? + var hadOneofValue = false + if let current = _storage._update { + hadOneofValue = true + if case .dialogArchived(let m) = current {v = m} + } + try decoder.decodeSingularMessageField(value: &v) + if let v = v { + if hadOneofValue {try decoder.handleConflictingOneOf()} + _storage._update = .dialogArchived(v) + } + }() default: break } } @@ -12746,6 +12795,10 @@ extension Update: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBas guard case .chatVisibility(let v)? = _storage._update else { preconditionFailure() } try visitor.visitSingularMessageField(value: v, fieldNumber: 28) }() + case .dialogArchived?: try { + guard case .dialogArchived(let v)? = _storage._update else { preconditionFailure() } + try visitor.visitSingularMessageField(value: v, fieldNumber: 29) + }() case nil: break } } @@ -13301,6 +13354,48 @@ extension UpdateMarkAsUnread: SwiftProtobuf.Message, SwiftProtobuf._MessageImple } } +extension UpdateDialogArchived: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding { + public static let protoMessageName: String = "UpdateDialogArchived" + public static let _protobuf_nameMap: SwiftProtobuf._NameMap = [ + 1: .standard(proto: "peer_id"), + 2: .same(proto: "archived"), + ] + + public mutating func decodeMessage(decoder: inout D) throws { + while let fieldNumber = try decoder.nextFieldNumber() { + // The use of inline closures is to circumvent an issue where the compiler + // allocates stack space for every case branch when no optimizations are + // enabled. https://github.com/apple/swift-protobuf/issues/1034 + switch fieldNumber { + case 1: try { try decoder.decodeSingularMessageField(value: &self._peerID) }() + case 2: try { try decoder.decodeSingularBoolField(value: &self.archived) }() + default: break + } + } + } + + public func traverse(visitor: inout V) throws { + // The use of inline closures is to circumvent an issue where the compiler + // allocates stack space for every if/case branch local when no optimizations + // are enabled. https://github.com/apple/swift-protobuf/issues/1034 and + // https://github.com/apple/swift-protobuf/issues/1182 + try { if let v = self._peerID { + try visitor.visitSingularMessageField(value: v, fieldNumber: 1) + } }() + if self.archived != false { + try visitor.visitSingularBoolField(value: self.archived, fieldNumber: 2) + } + try unknownFields.traverse(visitor: &visitor) + } + + public static func ==(lhs: UpdateDialogArchived, rhs: UpdateDialogArchived) -> Bool { + if lhs._peerID != rhs._peerID {return false} + if lhs.archived != rhs.archived {return false} + if lhs.unknownFields != rhs.unknownFields {return false} + return true + } +} + extension UpdateNewChat: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding { public static let protoMessageName: String = "UpdateNewChat" public static let _protobuf_nameMap: SwiftProtobuf._NameMap = [ diff --git a/apple/InlineKit/Sources/RealtimeV2/Sync/Sync.swift b/apple/InlineKit/Sources/RealtimeV2/Sync/Sync.swift index 8ec8b4b1..b03f19b6 100644 --- a/apple/InlineKit/Sources/RealtimeV2/Sync/Sync.swift +++ b/apple/InlineKit/Sources/RealtimeV2/Sync/Sync.swift @@ -432,7 +432,7 @@ actor Sync { .space(id: payload.member.spaceID) case let .joinSpace(payload): .space(id: payload.space.id) - case .updateUserStatus, .updateUserSettings, .newChat: + case .updateUserStatus, .updateUserSettings, .newChat, .dialogArchived: .user case let .participantAdd(payload): .chat(peer: .with { $0.chat = .with { $0.chatID = payload.chatID } }) @@ -506,6 +506,8 @@ actor BucketActor { true case .spaceMemberAdd: true + case .dialogArchived: + true case .newMessage, .editMessage, .messageAttachment: enableMessageUpdates default: diff --git a/proto/core.proto b/proto/core.proto index 2baefe80..f645ba12 100644 --- a/proto/core.proto +++ b/proto/core.proto @@ -1111,6 +1111,7 @@ message Update { UpdateSpaceHasNewUpdates space_has_new_updates = 26; UpdateSpaceMemberUpdate space_member_update = 27; UpdateChatVisibility chat_visibility = 28; + UpdateDialogArchived dialog_archived = 29; } } @@ -1206,6 +1207,15 @@ message UpdateMarkAsUnread { bool unread_mark = 2; } +// Update when a dialog is archived or unarchived +message UpdateDialogArchived { + // Peer ID of the dialog that changed + Peer peer_id = 1; + + // Whether it's archived (true) or unarchived (false) + bool archived = 2; +} + // Update when a new chat is created either in space or a private chat message UpdateNewChat { // Chat diff --git a/proto/server.proto b/proto/server.proto index d07a9ebb..56d67ed6 100644 --- a/proto/server.proto +++ b/proto/server.proto @@ -26,6 +26,7 @@ message ServerUpdate { // User bucket updates ServerUserUpdateSpaceMemberDelete user_space_member_delete = 10; ServerUserUpdateChatParticipantDelete user_chat_participant_delete = 11; + ServerUserUpdateDialogArchived user_dialog_archived = 14; } } @@ -88,3 +89,9 @@ message ServerUserUpdateSpaceMemberDelete { int64 space_id = 1; } // Update for a user when they were removed from a chat message ServerUserUpdateChatParticipantDelete { int64 chat_id = 1; } + +// Update for a user when a dialog is archived or unarchived +message ServerUserUpdateDialogArchived { + Peer peer_id = 1; + bool archived = 2; +} diff --git a/server/packages/protocol/src/core.ts b/server/packages/protocol/src/core.ts index a009e357..5ab6898a 100644 --- a/server/packages/protocol/src/core.ts +++ b/server/packages/protocol/src/core.ts @@ -2954,6 +2954,12 @@ export interface Update { * @generated from protobuf field: UpdateChatVisibility chat_visibility = 28; */ chatVisibility: UpdateChatVisibility; + } | { + oneofKind: "dialogArchived"; + /** + * @generated from protobuf field: UpdateDialogArchived dialog_archived = 29; + */ + dialogArchived: UpdateDialogArchived; } | { oneofKind: undefined; }; @@ -3174,6 +3180,25 @@ export interface UpdateMarkAsUnread { */ unreadMark: boolean; } +/** + * Update when a dialog is archived or unarchived + * + * @generated from protobuf message UpdateDialogArchived + */ +export interface UpdateDialogArchived { + /** + * Peer ID of the dialog that changed + * + * @generated from protobuf field: Peer peer_id = 1; + */ + peerId?: Peer; + /** + * Whether it's archived (true) or unarchived (false) + * + * @generated from protobuf field: bool archived = 2; + */ + archived: boolean; +} /** * Update when a new chat is created either in space or a private chat * @@ -10839,7 +10864,8 @@ class Update$Type extends MessageType { { no: 25, name: "chat_has_new_updates", kind: "message", oneof: "update", T: () => UpdateChatHasNewUpdates }, { no: 26, name: "space_has_new_updates", kind: "message", oneof: "update", T: () => UpdateSpaceHasNewUpdates }, { no: 27, name: "space_member_update", kind: "message", oneof: "update", T: () => UpdateSpaceMemberUpdate }, - { no: 28, name: "chat_visibility", kind: "message", oneof: "update", T: () => UpdateChatVisibility } + { no: 28, name: "chat_visibility", kind: "message", oneof: "update", T: () => UpdateChatVisibility }, + { no: 29, name: "dialog_archived", kind: "message", oneof: "update", T: () => UpdateDialogArchived } ]); } create(value?: PartialMessage): Update { @@ -11010,6 +11036,12 @@ class Update$Type extends MessageType { chatVisibility: UpdateChatVisibility.internalBinaryRead(reader, reader.uint32(), options, (message.update as any).chatVisibility) }; break; + case /* UpdateDialogArchived dialog_archived */ 29: + message.update = { + oneofKind: "dialogArchived", + dialogArchived: UpdateDialogArchived.internalBinaryRead(reader, reader.uint32(), options, (message.update as any).dialogArchived) + }; + break; default: let u = options.readUnknownField; if (u === "throw") @@ -11103,6 +11135,9 @@ class Update$Type extends MessageType { /* UpdateChatVisibility chat_visibility = 28; */ if (message.update.oneofKind === "chatVisibility") UpdateChatVisibility.internalBinaryWrite(message.update.chatVisibility, writer.tag(28, WireType.LengthDelimited).fork(), options).join(); + /* UpdateDialogArchived dialog_archived = 29; */ + if (message.update.oneofKind === "dialogArchived") + UpdateDialogArchived.internalBinaryWrite(message.update.dialogArchived, writer.tag(29, WireType.LengthDelimited).fork(), options).join(); let u = options.writeUnknownFields; if (u !== false) (u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer); @@ -11756,6 +11791,60 @@ class UpdateMarkAsUnread$Type extends MessageType { */ export const UpdateMarkAsUnread = new UpdateMarkAsUnread$Type(); // @generated message type with reflection information, may provide speed optimized methods +class UpdateDialogArchived$Type extends MessageType { + constructor() { + super("UpdateDialogArchived", [ + { no: 1, name: "peer_id", kind: "message", T: () => Peer }, + { no: 2, name: "archived", kind: "scalar", T: 8 /*ScalarType.BOOL*/ } + ]); + } + create(value?: PartialMessage): UpdateDialogArchived { + const message = globalThis.Object.create((this.messagePrototype!)); + message.archived = false; + if (value !== undefined) + reflectionMergePartial(this, message, value); + return message; + } + internalBinaryRead(reader: IBinaryReader, length: number, options: BinaryReadOptions, target?: UpdateDialogArchived): UpdateDialogArchived { + let message = target ?? this.create(), end = reader.pos + length; + while (reader.pos < end) { + let [fieldNo, wireType] = reader.tag(); + switch (fieldNo) { + case /* Peer peer_id */ 1: + message.peerId = Peer.internalBinaryRead(reader, reader.uint32(), options, message.peerId); + break; + case /* bool archived */ 2: + message.archived = reader.bool(); + break; + default: + let u = options.readUnknownField; + if (u === "throw") + throw new globalThis.Error(`Unknown field ${fieldNo} (wire type ${wireType}) for ${this.typeName}`); + let d = reader.skip(wireType); + if (u !== false) + (u === true ? UnknownFieldHandler.onRead : u)(this.typeName, message, fieldNo, wireType, d); + } + } + return message; + } + internalBinaryWrite(message: UpdateDialogArchived, writer: IBinaryWriter, options: BinaryWriteOptions): IBinaryWriter { + /* Peer peer_id = 1; */ + if (message.peerId) + Peer.internalBinaryWrite(message.peerId, writer.tag(1, WireType.LengthDelimited).fork(), options).join(); + /* bool archived = 2; */ + if (message.archived !== false) + writer.tag(2, WireType.Varint).bool(message.archived); + let u = options.writeUnknownFields; + if (u !== false) + (u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer); + return writer; + } +} +/** + * @generated MessageType for protobuf message UpdateDialogArchived + */ +export const UpdateDialogArchived = new UpdateDialogArchived$Type(); +// @generated message type with reflection information, may provide speed optimized methods class UpdateNewChat$Type extends MessageType { constructor() { super("UpdateNewChat", [ diff --git a/server/packages/protocol/src/server.ts b/server/packages/protocol/src/server.ts index 35c5290a..a40d8e75 100644 --- a/server/packages/protocol/src/server.ts +++ b/server/packages/protocol/src/server.ts @@ -10,6 +10,7 @@ import { UnknownFieldHandler } from "@protobuf-ts/runtime"; import type { PartialMessage } from "@protobuf-ts/runtime"; import { reflectionMergePartial } from "@protobuf-ts/runtime"; import { MessageType } from "@protobuf-ts/runtime"; +import { Peer } from "./core"; import { Member } from "./core"; /** * @generated from protobuf message server.ServerUpdate @@ -90,6 +91,12 @@ export interface ServerUpdate { * @generated from protobuf field: server.ServerUserUpdateChatParticipantDelete user_chat_participant_delete = 11; */ userChatParticipantDelete: ServerUserUpdateChatParticipantDelete; + } | { + oneofKind: "userDialogArchived"; + /** + * @generated from protobuf field: server.ServerUserUpdateDialogArchived user_dialog_archived = 14; + */ + userDialogArchived: ServerUserUpdateDialogArchived; } | { oneofKind: undefined; }; @@ -240,6 +247,21 @@ export interface ServerUserUpdateChatParticipantDelete { */ chatId: bigint; } +/** + * Update for a user when a dialog is archived or unarchived + * + * @generated from protobuf message server.ServerUserUpdateDialogArchived + */ +export interface ServerUserUpdateDialogArchived { + /** + * @generated from protobuf field: Peer peer_id = 1; + */ + peerId?: Peer; + /** + * @generated from protobuf field: bool archived = 2; + */ + archived: boolean; +} // @generated message type with reflection information, may provide speed optimized methods class ServerUpdate$Type extends MessageType { constructor() { @@ -255,7 +277,8 @@ class ServerUpdate$Type extends MessageType { { no: 9, name: "space_remove_member", kind: "message", oneof: "update", T: () => ServerSpaceUpdateRemoveMember }, { no: 12, name: "space_member_update", kind: "message", oneof: "update", T: () => ServerSpaceUpdateMemberUpdate }, { no: 10, name: "user_space_member_delete", kind: "message", oneof: "update", T: () => ServerUserUpdateSpaceMemberDelete }, - { no: 11, name: "user_chat_participant_delete", kind: "message", oneof: "update", T: () => ServerUserUpdateChatParticipantDelete } + { no: 11, name: "user_chat_participant_delete", kind: "message", oneof: "update", T: () => ServerUserUpdateChatParticipantDelete }, + { no: 14, name: "user_dialog_archived", kind: "message", oneof: "update", T: () => ServerUserUpdateDialogArchived } ]); } create(value?: PartialMessage): ServerUpdate { @@ -338,6 +361,12 @@ class ServerUpdate$Type extends MessageType { userChatParticipantDelete: ServerUserUpdateChatParticipantDelete.internalBinaryRead(reader, reader.uint32(), options, (message.update as any).userChatParticipantDelete) }; break; + case /* server.ServerUserUpdateDialogArchived user_dialog_archived */ 14: + message.update = { + oneofKind: "userDialogArchived", + userDialogArchived: ServerUserUpdateDialogArchived.internalBinaryRead(reader, reader.uint32(), options, (message.update as any).userDialogArchived) + }; + break; default: let u = options.readUnknownField; if (u === "throw") @@ -386,6 +415,9 @@ class ServerUpdate$Type extends MessageType { /* server.ServerUserUpdateChatParticipantDelete user_chat_participant_delete = 11; */ if (message.update.oneofKind === "userChatParticipantDelete") ServerUserUpdateChatParticipantDelete.internalBinaryWrite(message.update.userChatParticipantDelete, writer.tag(11, WireType.LengthDelimited).fork(), options).join(); + /* server.ServerUserUpdateDialogArchived user_dialog_archived = 14; */ + if (message.update.oneofKind === "userDialogArchived") + ServerUserUpdateDialogArchived.internalBinaryWrite(message.update.userDialogArchived, writer.tag(14, WireType.LengthDelimited).fork(), options).join(); let u = options.writeUnknownFields; if (u !== false) (u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer); @@ -921,3 +953,57 @@ class ServerUserUpdateChatParticipantDelete$Type extends MessageType { + constructor() { + super("server.ServerUserUpdateDialogArchived", [ + { no: 1, name: "peer_id", kind: "message", T: () => Peer }, + { no: 2, name: "archived", kind: "scalar", T: 8 /*ScalarType.BOOL*/ } + ]); + } + create(value?: PartialMessage): ServerUserUpdateDialogArchived { + const message = globalThis.Object.create((this.messagePrototype!)); + message.archived = false; + if (value !== undefined) + reflectionMergePartial(this, message, value); + return message; + } + internalBinaryRead(reader: IBinaryReader, length: number, options: BinaryReadOptions, target?: ServerUserUpdateDialogArchived): ServerUserUpdateDialogArchived { + let message = target ?? this.create(), end = reader.pos + length; + while (reader.pos < end) { + let [fieldNo, wireType] = reader.tag(); + switch (fieldNo) { + case /* Peer peer_id */ 1: + message.peerId = Peer.internalBinaryRead(reader, reader.uint32(), options, message.peerId); + break; + case /* bool archived */ 2: + message.archived = reader.bool(); + break; + default: + let u = options.readUnknownField; + if (u === "throw") + throw new globalThis.Error(`Unknown field ${fieldNo} (wire type ${wireType}) for ${this.typeName}`); + let d = reader.skip(wireType); + if (u !== false) + (u === true ? UnknownFieldHandler.onRead : u)(this.typeName, message, fieldNo, wireType, d); + } + } + return message; + } + internalBinaryWrite(message: ServerUserUpdateDialogArchived, writer: IBinaryWriter, options: BinaryWriteOptions): IBinaryWriter { + /* Peer peer_id = 1; */ + if (message.peerId) + Peer.internalBinaryWrite(message.peerId, writer.tag(1, WireType.LengthDelimited).fork(), options).join(); + /* bool archived = 2; */ + if (message.archived !== false) + writer.tag(2, WireType.Varint).bool(message.archived); + let u = options.writeUnknownFields; + if (u !== false) + (u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer); + return writer; + } +} +/** + * @generated MessageType for protobuf message server.ServerUserUpdateDialogArchived + */ +export const ServerUserUpdateDialogArchived = new ServerUserUpdateDialogArchived$Type(); diff --git a/server/src/__tests__/functions/sendMessage.test.ts b/server/src/__tests__/functions/sendMessage.test.ts index 66860506..75e41235 100644 --- a/server/src/__tests__/functions/sendMessage.test.ts +++ b/server/src/__tests__/functions/sendMessage.test.ts @@ -4,12 +4,21 @@ import { setupTestDatabase, testUtils } from "../setup" import { sendMessage } from "@in/server/functions/messages.sendMessage" import type { DbChat, DbUser } from "@in/server/db/schema" import type { FunctionContext } from "@in/server/functions/_types" +import { db } from "@in/server/db" +import { dialogs } from "@in/server/db/schema" +import { and, eq } from "drizzle-orm" +import { UpdateBucket } from "@in/server/db/schema/updates" +import { UpdatesModel } from "@in/server/db/models/updates" // Test state let currentUser: DbUser let privateChat: DbChat let privateChatPeerId: InputPeer let context: FunctionContext +let userIndex = 0 + +const runId = Date.now() +const nextEmail = (label: string) => `${label}-${runId}-${userIndex++}@example.com` // Helpers function extractMessage(result: SendMessageResult): Message | null { @@ -23,7 +32,7 @@ function extractMessage(result: SendMessageResult): Message | null { describe("sendMessage", () => { beforeAll(async () => { await setupTestDatabase() - currentUser = (await testUtils.createUser("test@example.com"))! + currentUser = (await testUtils.createUser(nextEmail("test-user")))! privateChat = (await testUtils.createPrivateChat(currentUser, currentUser))! privateChatPeerId = { type: { oneofKind: "chat" as const, chat: { chatId: BigInt(privateChat.id) } }, @@ -104,4 +113,53 @@ describe("sendMessage", () => { expect(message!.entities!.entities[0]!.offset).toBe(0n) expect(message!.entities!.entities[0]!.length).toBe(3n) }) + + test("unarchives recipient dialog and enqueues a user update", async () => { + const sender = (await testUtils.createUser(nextEmail("sender")))! + const recipient = (await testUtils.createUser(nextEmail("recipient")))! + const { chat } = await testUtils.createPrivateChatWithOptionalDialog({ + userA: sender, + userB: recipient, + createDialogForUserA: true, + createDialogForUserB: true, + }) + + await db + .update(dialogs) + .set({ archived: true }) + .where(and(eq(dialogs.chatId, chat.id), eq(dialogs.userId, recipient.id))) + .execute() + + const peerId: InputPeer = { + type: { oneofKind: "chat" as const, chat: { chatId: BigInt(chat.id) } }, + } + const senderContext = testUtils.functionContext({ userId: sender.id, sessionId: 1 }) + + await sendMessage({ peerId, message: "hello" }, senderContext) + + const [updatedDialog] = await db + .select() + .from(dialogs) + .where(and(eq(dialogs.chatId, chat.id), eq(dialogs.userId, recipient.id))) + .limit(1) + + expect(updatedDialog?.archived).toBe(false) + + const userUpdates = await db.query.updates.findMany({ + where: { + bucket: UpdateBucket.User, + entityId: recipient.id, + }, + }) + + const hasDialogArchivedUpdate = userUpdates + .map((update) => UpdatesModel.decrypt(update)) + .some( + (update) => + update.payload.update.oneofKind === "userDialogArchived" && + update.payload.update.userDialogArchived.archived === false, + ) + + expect(hasDialogArchivedUpdate).toBe(true) + }) }) diff --git a/server/src/functions/messages.sendMessage.ts b/server/src/functions/messages.sendMessage.ts index 3217b0bc..28cd9fe5 100644 --- a/server/src/functions/messages.sendMessage.ts +++ b/server/src/functions/messages.sendMessage.ts @@ -38,6 +38,7 @@ import { AccessGuards } from "@in/server/modules/authorization/accessGuards" import { getCachedUserProfilePhotoUrl } from "@in/server/modules/cache/userPhotos" import { processAttachments } from "@in/server/db/models/messages" import { eq } from "drizzle-orm" +import { unarchiveIfNeeded } from "@in/server/modules/message/unarchiveIfNeeded" type Input = { peerId: InputPeer @@ -229,13 +230,25 @@ export const sendMessage = async (input: Input, context: FunctionContext): Promi // we can also separate the sequence caching. this will speed up and // remove the need to lock the chat row. then we should deliver the update // with sequence number so we can ensure gap-free delivery. - let { selfUpdates, updateGroup } = await pushUpdates({ + const updateGroup = await getUpdateGroupFromInputPeer(inputPeer, { currentUserId }) + const { updates: unarchiveUpdates } = await unarchiveIfNeeded({ + chat, + updateGroup, + senderUserId: currentUserId, + }) + + unarchiveUpdates.forEach(({ userId, update }) => { + RealtimeUpdates.pushToUser(userId, [update]) + }) + + let { selfUpdates } = await pushUpdates({ inputPeer, messageInfo, currentUserId, update, currentSessionId: context.currentSessionId, publishToSelfSession: currentUserLayer < 2 || hasAttachments, + updateGroup, }) // send notification @@ -328,6 +341,7 @@ const pushUpdates = async ({ update, publishToSelfSession, currentSessionId, + updateGroup, }: { inputPeer: InputPeer messageInfo: MessageInfo @@ -335,8 +349,9 @@ const pushUpdates = async ({ update: UpdateSeqAndDate currentSessionId: number publishToSelfSession: boolean + updateGroup?: UpdateGroup }): Promise<{ selfUpdates: Update[]; updateGroup: UpdateGroup }> => { - const updateGroup = await getUpdateGroupFromInputPeer(inputPeer, { currentUserId }) + const resolvedUpdateGroup = updateGroup ?? (await getUpdateGroupFromInputPeer(inputPeer, { currentUserId })) const skipSessionId = publishToSelfSession ? undefined : currentSessionId let messageIdUpdate: Update = { @@ -351,8 +366,8 @@ const pushUpdates = async ({ let selfUpdates: Update[] = [] - if (updateGroup.type === "dmUsers") { - updateGroup.userIds.forEach((userId) => { + if (resolvedUpdateGroup.type === "dmUsers") { + resolvedUpdateGroup.userIds.forEach((userId) => { const encodingForUserId = userId const encodingForInputPeer: InputPeer = userId === currentUserId ? inputPeer : { type: { oneofKind: "user", user: { userId: BigInt(currentUserId) } } } @@ -363,7 +378,7 @@ const pushUpdates = async ({ newMessage: { message: encodeMessageForUser({ messageInfo, - updateGroup, + updateGroup: resolvedUpdateGroup, inputPeer, currentUserId, targetUserId: userId, @@ -396,8 +411,8 @@ const pushUpdates = async ({ RealtimeUpdates.pushToUser(userId, [newMessageUpdate]) } }) - } else if (updateGroup.type === "threadUsers") { - updateGroup.userIds.forEach((userId) => { + } else if (resolvedUpdateGroup.type === "threadUsers") { + resolvedUpdateGroup.userIds.forEach((userId) => { // New updates let newMessageUpdate: Update = { update: { @@ -405,7 +420,7 @@ const pushUpdates = async ({ newMessage: { message: encodeMessageForUser({ messageInfo, - updateGroup, + updateGroup: resolvedUpdateGroup, inputPeer, currentUserId, targetUserId: userId, @@ -440,7 +455,7 @@ const pushUpdates = async ({ }) } - return { selfUpdates, updateGroup } + return { selfUpdates, updateGroup: resolvedUpdateGroup } } const buildAttachmentUpdates = async ({ diff --git a/server/src/modules/message/unarchiveIfNeeded.ts b/server/src/modules/message/unarchiveIfNeeded.ts new file mode 100644 index 00000000..23756cbd --- /dev/null +++ b/server/src/modules/message/unarchiveIfNeeded.ts @@ -0,0 +1,96 @@ +import type { Update } from "@in/protocol/core" +import type { ServerUpdate } from "@in/protocol/server" +import { db } from "@in/server/db" +import { dialogs } from "@in/server/db/schema" +import type { DbChat } from "@in/server/db/schema" +import type { UpdateGroup } from "@in/server/modules/updates" +import { UserBucketUpdates } from "@in/server/modules/updates/userBucketUpdates" +import { encodeOutputPeerFromChat } from "@in/server/realtime/encoders/encodePeer" +import { and, eq, inArray } from "drizzle-orm" + +type UnarchiveIfNeededInput = { + chat: DbChat + updateGroup: UpdateGroup + senderUserId: number +} + +type UnarchiveIfNeededOutput = { + updates: { userId: number; update: Update }[] +} + +export const unarchiveIfNeeded = async (input: UnarchiveIfNeededInput): Promise => { + const { chat, updateGroup, senderUserId } = input + const candidateUserIds = updateGroup.userIds.filter((userId) => userId !== senderUserId) + if (candidateUserIds.length === 0) { + return { updates: [] } + } + + const archivedRows = await db + .select({ userId: dialogs.userId }) + .from(dialogs) + .where( + and( + eq(dialogs.chatId, chat.id), + inArray(dialogs.userId, candidateUserIds), + eq(dialogs.archived, true), + ), + ) + + if (archivedRows.length === 0) { + return { updates: [] } + } + + const targets = archivedRows.map((row) => ({ + userId: row.userId, + peerId: encodeOutputPeerFromChat(chat, { currentUserId: row.userId }), + })) + + await db.transaction(async (tx) => { + await tx + .update(dialogs) + .set({ archived: false }) + .where( + and( + eq(dialogs.chatId, chat.id), + inArray( + dialogs.userId, + targets.map((target) => target.userId), + ), + eq(dialogs.archived, true), + ), + ) + + for (const target of targets) { + const userUpdate: ServerUpdate["update"] = { + oneofKind: "userDialogArchived", + userDialogArchived: { + peerId: target.peerId, + archived: false, + }, + } + + await UserBucketUpdates.enqueue( + { + userId: target.userId, + update: userUpdate, + }, + { tx }, + ) + } + }) + + const updates = targets.map((target) => ({ + userId: target.userId, + update: { + update: { + oneofKind: "dialogArchived", + dialogArchived: { + peerId: target.peerId, + archived: false, + }, + }, + }, + })) + + return { updates } +} diff --git a/server/src/modules/updates/sync.ts b/server/src/modules/updates/sync.ts index 1563ee93..bccfe7d7 100644 --- a/server/src/modules/updates/sync.ts +++ b/server/src/modules/updates/sync.ts @@ -303,6 +303,19 @@ function convertUserUpdate(decrypted: DecryptedUpdate, userId: number): Update | }, } + case "userDialogArchived": + return { + seq, + date, + update: { + oneofKind: "dialogArchived", + dialogArchived: { + peerId: payload.userDialogArchived.peerId, + archived: payload.userDialogArchived.archived, + }, + }, + } + default: log.warn("Unhandled user update", { type: payload.oneofKind }) return null diff --git a/web/packages/client/src/realtime/updates/apply-updates.ts b/web/packages/client/src/realtime/updates/apply-updates.ts index b80cb18d..52ab5d4c 100644 --- a/web/packages/client/src/realtime/updates/apply-updates.ts +++ b/web/packages/client/src/realtime/updates/apply-updates.ts @@ -19,11 +19,16 @@ const getPeerUserId = (peer: Peer | undefined) => { return toNumber(peer.type.user.userId) } -const updateDialogForPeer = (db: Db, peer: Peer | undefined, changes: { - unreadCount?: number - unreadMark?: boolean - readMaxId?: number -}) => { +const updateDialogForPeer = ( + db: Db, + peer: Peer | undefined, + changes: { + unreadCount?: number + unreadMark?: boolean + readMaxId?: number + archived?: boolean + }, +) => { if (!peer || peer.type.oneofKind === undefined) return const dialogId = peer.type.oneofKind === "chat" ? getPeerChatId(peer) : getPeerUserId(peer) if (dialogId == null) return @@ -37,6 +42,7 @@ const updateDialogForPeer = (db: Db, peer: Peer | undefined, changes: { unreadCount: changes.unreadCount ?? existing.unreadCount, unreadMark: changes.unreadMark ?? existing.unreadMark, readMaxId: changes.readMaxId ?? existing.readMaxId, + archived: changes.archived ?? existing.archived, }) } @@ -131,6 +137,12 @@ export const applyUpdates = (db: Db, updates: Update[]) => { }) break + case "dialogArchived": + updateDialogForPeer(db, update.update.dialogArchived.peerId, { + archived: update.update.dialogArchived.archived, + }) + break + case "participantAdd": case "participantDelete": case "messageAttachment": diff --git a/web/packages/protocol/src/core.ts b/web/packages/protocol/src/core.ts index a009e357..5ab6898a 100644 --- a/web/packages/protocol/src/core.ts +++ b/web/packages/protocol/src/core.ts @@ -2954,6 +2954,12 @@ export interface Update { * @generated from protobuf field: UpdateChatVisibility chat_visibility = 28; */ chatVisibility: UpdateChatVisibility; + } | { + oneofKind: "dialogArchived"; + /** + * @generated from protobuf field: UpdateDialogArchived dialog_archived = 29; + */ + dialogArchived: UpdateDialogArchived; } | { oneofKind: undefined; }; @@ -3174,6 +3180,25 @@ export interface UpdateMarkAsUnread { */ unreadMark: boolean; } +/** + * Update when a dialog is archived or unarchived + * + * @generated from protobuf message UpdateDialogArchived + */ +export interface UpdateDialogArchived { + /** + * Peer ID of the dialog that changed + * + * @generated from protobuf field: Peer peer_id = 1; + */ + peerId?: Peer; + /** + * Whether it's archived (true) or unarchived (false) + * + * @generated from protobuf field: bool archived = 2; + */ + archived: boolean; +} /** * Update when a new chat is created either in space or a private chat * @@ -10839,7 +10864,8 @@ class Update$Type extends MessageType { { no: 25, name: "chat_has_new_updates", kind: "message", oneof: "update", T: () => UpdateChatHasNewUpdates }, { no: 26, name: "space_has_new_updates", kind: "message", oneof: "update", T: () => UpdateSpaceHasNewUpdates }, { no: 27, name: "space_member_update", kind: "message", oneof: "update", T: () => UpdateSpaceMemberUpdate }, - { no: 28, name: "chat_visibility", kind: "message", oneof: "update", T: () => UpdateChatVisibility } + { no: 28, name: "chat_visibility", kind: "message", oneof: "update", T: () => UpdateChatVisibility }, + { no: 29, name: "dialog_archived", kind: "message", oneof: "update", T: () => UpdateDialogArchived } ]); } create(value?: PartialMessage): Update { @@ -11010,6 +11036,12 @@ class Update$Type extends MessageType { chatVisibility: UpdateChatVisibility.internalBinaryRead(reader, reader.uint32(), options, (message.update as any).chatVisibility) }; break; + case /* UpdateDialogArchived dialog_archived */ 29: + message.update = { + oneofKind: "dialogArchived", + dialogArchived: UpdateDialogArchived.internalBinaryRead(reader, reader.uint32(), options, (message.update as any).dialogArchived) + }; + break; default: let u = options.readUnknownField; if (u === "throw") @@ -11103,6 +11135,9 @@ class Update$Type extends MessageType { /* UpdateChatVisibility chat_visibility = 28; */ if (message.update.oneofKind === "chatVisibility") UpdateChatVisibility.internalBinaryWrite(message.update.chatVisibility, writer.tag(28, WireType.LengthDelimited).fork(), options).join(); + /* UpdateDialogArchived dialog_archived = 29; */ + if (message.update.oneofKind === "dialogArchived") + UpdateDialogArchived.internalBinaryWrite(message.update.dialogArchived, writer.tag(29, WireType.LengthDelimited).fork(), options).join(); let u = options.writeUnknownFields; if (u !== false) (u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer); @@ -11756,6 +11791,60 @@ class UpdateMarkAsUnread$Type extends MessageType { */ export const UpdateMarkAsUnread = new UpdateMarkAsUnread$Type(); // @generated message type with reflection information, may provide speed optimized methods +class UpdateDialogArchived$Type extends MessageType { + constructor() { + super("UpdateDialogArchived", [ + { no: 1, name: "peer_id", kind: "message", T: () => Peer }, + { no: 2, name: "archived", kind: "scalar", T: 8 /*ScalarType.BOOL*/ } + ]); + } + create(value?: PartialMessage): UpdateDialogArchived { + const message = globalThis.Object.create((this.messagePrototype!)); + message.archived = false; + if (value !== undefined) + reflectionMergePartial(this, message, value); + return message; + } + internalBinaryRead(reader: IBinaryReader, length: number, options: BinaryReadOptions, target?: UpdateDialogArchived): UpdateDialogArchived { + let message = target ?? this.create(), end = reader.pos + length; + while (reader.pos < end) { + let [fieldNo, wireType] = reader.tag(); + switch (fieldNo) { + case /* Peer peer_id */ 1: + message.peerId = Peer.internalBinaryRead(reader, reader.uint32(), options, message.peerId); + break; + case /* bool archived */ 2: + message.archived = reader.bool(); + break; + default: + let u = options.readUnknownField; + if (u === "throw") + throw new globalThis.Error(`Unknown field ${fieldNo} (wire type ${wireType}) for ${this.typeName}`); + let d = reader.skip(wireType); + if (u !== false) + (u === true ? UnknownFieldHandler.onRead : u)(this.typeName, message, fieldNo, wireType, d); + } + } + return message; + } + internalBinaryWrite(message: UpdateDialogArchived, writer: IBinaryWriter, options: BinaryWriteOptions): IBinaryWriter { + /* Peer peer_id = 1; */ + if (message.peerId) + Peer.internalBinaryWrite(message.peerId, writer.tag(1, WireType.LengthDelimited).fork(), options).join(); + /* bool archived = 2; */ + if (message.archived !== false) + writer.tag(2, WireType.Varint).bool(message.archived); + let u = options.writeUnknownFields; + if (u !== false) + (u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer); + return writer; + } +} +/** + * @generated MessageType for protobuf message UpdateDialogArchived + */ +export const UpdateDialogArchived = new UpdateDialogArchived$Type(); +// @generated message type with reflection information, may provide speed optimized methods class UpdateNewChat$Type extends MessageType { constructor() { super("UpdateNewChat", [