Skip to content

Commit ec7999b

Browse files
feat(history-sync): emit messaging-history.set event on sync completion and fix race condition
Reorder webhook emissions (CHATS_SET, MESSAGES_SET) to fire after database persistence, fixing a race condition where consumers received the event before data was queryable. Emit a new MESSAGING_HISTORY_SET event when progress reaches 100%, allowing consumers to know exactly when history sync is complete and messages are available in the database. Register the new event across all transport types (Webhook, WebSocket, RabbitMQ, NATS, SQS, Kafka, Pusher) and validation schemas.
1 parent cd800f2 commit ec7999b

File tree

4 files changed

+29
-6
lines changed

4 files changed

+29
-6
lines changed

src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -989,12 +989,12 @@ export class BaileysStartupService extends ChannelStartupService {
989989
chatsRaw.push({ remoteJid: chat.id, instanceId: this.instanceId, name: chat.name });
990990
}
991991

992-
this.sendDataWebhook(Events.CHATS_SET, chatsRaw);
993-
994992
if (this.configService.get<Database>('DATABASE').SAVE_DATA.HISTORIC) {
995993
await this.prismaRepository.chat.createMany({ data: chatsRaw, skipDuplicates: true });
996994
}
997995

996+
this.sendDataWebhook(Events.CHATS_SET, chatsRaw);
997+
998998
const messagesRaw: any[] = [];
999999

10001000
const messagesRepository: Set<string> = new Set(
@@ -1046,15 +1046,15 @@ export class BaileysStartupService extends ChannelStartupService {
10461046
messagesRaw.push(this.prepareMessage(m));
10471047
}
10481048

1049+
if (this.configService.get<Database>('DATABASE').SAVE_DATA.HISTORIC) {
1050+
await this.prismaRepository.message.createMany({ data: messagesRaw, skipDuplicates: true });
1051+
}
1052+
10491053
this.sendDataWebhook(Events.MESSAGES_SET, [...messagesRaw], true, undefined, {
10501054
isLatest,
10511055
progress,
10521056
});
10531057

1054-
if (this.configService.get<Database>('DATABASE').SAVE_DATA.HISTORIC) {
1055-
await this.prismaRepository.message.createMany({ data: messagesRaw, skipDuplicates: true });
1056-
}
1057-
10581058
if (
10591059
this.configService.get<Chatwoot>('CHATWOOT').ENABLED &&
10601060
this.localChatwoot?.enabled &&
@@ -1071,6 +1071,14 @@ export class BaileysStartupService extends ChannelStartupService {
10711071
contacts.filter((c) => !!c.notify || !!c.name).map((c) => ({ id: c.id, name: c.name ?? c.notify })),
10721072
);
10731073

1074+
if (progress === 100) {
1075+
this.sendDataWebhook(Events.MESSAGING_HISTORY_SET, {
1076+
messageCount: messagesRaw.length,
1077+
chatCount: chatsRaw.length,
1078+
contactCount: contacts?.length ?? 0,
1079+
});
1080+
}
1081+
10741082
contacts = undefined;
10751083
messages = undefined;
10761084
chats = undefined;

src/api/integrations/event/event.controller.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ export class EventController {
162162
'CALL',
163163
'TYPEBOT_START',
164164
'TYPEBOT_CHANGE_STATUS',
165+
'MESSAGING_HISTORY_SET',
165166
'REMOVE_INSTANCE',
166167
'LOGOUT_INSTANCE',
167168
'INSTANCE_CREATE',

src/config/env.config.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ export type EventsRabbitmq = {
9191
CALL: boolean;
9292
TYPEBOT_START: boolean;
9393
TYPEBOT_CHANGE_STATUS: boolean;
94+
MESSAGING_HISTORY_SET: boolean;
9495
};
9596

9697
export type Rabbitmq = {
@@ -150,6 +151,7 @@ export type Sqs = {
150151
SEND_MESSAGE: boolean;
151152
TYPEBOT_CHANGE_STATUS: boolean;
152153
TYPEBOT_START: boolean;
154+
MESSAGING_HISTORY_SET: boolean;
153155
};
154156
};
155157

@@ -223,6 +225,7 @@ export type EventsWebhook = {
223225
CALL: boolean;
224226
TYPEBOT_START: boolean;
225227
TYPEBOT_CHANGE_STATUS: boolean;
228+
MESSAGING_HISTORY_SET: boolean;
226229
ERRORS: boolean;
227230
ERRORS_WEBHOOK: string;
228231
};
@@ -256,6 +259,7 @@ export type EventsPusher = {
256259
CALL: boolean;
257260
TYPEBOT_START: boolean;
258261
TYPEBOT_CHANGE_STATUS: boolean;
262+
MESSAGING_HISTORY_SET: boolean;
259263
};
260264

261265
export type ApiKey = { KEY: string };
@@ -537,6 +541,7 @@ export class ConfigService {
537541
CALL: process.env?.RABBITMQ_EVENTS_CALL === 'true',
538542
TYPEBOT_START: process.env?.RABBITMQ_EVENTS_TYPEBOT_START === 'true',
539543
TYPEBOT_CHANGE_STATUS: process.env?.RABBITMQ_EVENTS_TYPEBOT_CHANGE_STATUS === 'true',
544+
MESSAGING_HISTORY_SET: process.env?.RABBITMQ_EVENTS_MESSAGING_HISTORY_SET === 'true',
540545
},
541546
},
542547
NATS: {
@@ -574,6 +579,7 @@ export class ConfigService {
574579
CALL: process.env?.NATS_EVENTS_CALL === 'true',
575580
TYPEBOT_START: process.env?.NATS_EVENTS_TYPEBOT_START === 'true',
576581
TYPEBOT_CHANGE_STATUS: process.env?.NATS_EVENTS_TYPEBOT_CHANGE_STATUS === 'true',
582+
MESSAGING_HISTORY_SET: process.env?.NATS_EVENTS_MESSAGING_HISTORY_SET === 'true',
577583
},
578584
},
579585
SQS: {
@@ -614,6 +620,7 @@ export class ConfigService {
614620
SEND_MESSAGE: process.env?.SQS_GLOBAL_SEND_MESSAGE === 'true',
615621
TYPEBOT_CHANGE_STATUS: process.env?.SQS_GLOBAL_TYPEBOT_CHANGE_STATUS === 'true',
616622
TYPEBOT_START: process.env?.SQS_GLOBAL_TYPEBOT_START === 'true',
623+
MESSAGING_HISTORY_SET: process.env?.SQS_GLOBAL_MESSAGING_HISTORY_SET === 'true',
617624
},
618625
},
619626
KAFKA: {
@@ -657,6 +664,7 @@ export class ConfigService {
657664
CALL: process.env?.KAFKA_EVENTS_CALL === 'true',
658665
TYPEBOT_START: process.env?.KAFKA_EVENTS_TYPEBOT_START === 'true',
659666
TYPEBOT_CHANGE_STATUS: process.env?.KAFKA_EVENTS_TYPEBOT_CHANGE_STATUS === 'true',
667+
MESSAGING_HISTORY_SET: process.env?.KAFKA_EVENTS_MESSAGING_HISTORY_SET === 'true',
660668
},
661669
SASL:
662670
process.env?.KAFKA_SASL_ENABLED === 'true'
@@ -722,6 +730,7 @@ export class ConfigService {
722730
CALL: process.env?.PUSHER_EVENTS_CALL === 'true',
723731
TYPEBOT_START: process.env?.PUSHER_EVENTS_TYPEBOT_START === 'true',
724732
TYPEBOT_CHANGE_STATUS: process.env?.PUSHER_EVENTS_TYPEBOT_CHANGE_STATUS === 'true',
733+
MESSAGING_HISTORY_SET: process.env?.PUSHER_EVENTS_MESSAGING_HISTORY_SET === 'true',
725734
},
726735
},
727736
WA_BUSINESS: {
@@ -779,6 +788,7 @@ export class ConfigService {
779788
CALL: process.env?.WEBHOOK_EVENTS_CALL === 'true',
780789
TYPEBOT_START: process.env?.WEBHOOK_EVENTS_TYPEBOT_START === 'true',
781790
TYPEBOT_CHANGE_STATUS: process.env?.WEBHOOK_EVENTS_TYPEBOT_CHANGE_STATUS === 'true',
791+
MESSAGING_HISTORY_SET: process.env?.WEBHOOK_EVENTS_MESSAGING_HISTORY_SET === 'true',
782792
ERRORS: process.env?.WEBHOOK_EVENTS_ERRORS === 'true',
783793
ERRORS_WEBHOOK: process.env?.WEBHOOK_EVENTS_ERRORS_WEBHOOK || '',
784794
},

src/validate/instance.schema.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ export const instanceSchema: JSONSchema7 = {
8686
'CALL',
8787
'TYPEBOT_START',
8888
'TYPEBOT_CHANGE_STATUS',
89+
'MESSAGING_HISTORY_SET',
8990
],
9091
},
9192
},
@@ -123,6 +124,7 @@ export const instanceSchema: JSONSchema7 = {
123124
'CALL',
124125
'TYPEBOT_START',
125126
'TYPEBOT_CHANGE_STATUS',
127+
'MESSAGING_HISTORY_SET',
126128
],
127129
},
128130
},
@@ -160,6 +162,7 @@ export const instanceSchema: JSONSchema7 = {
160162
'CALL',
161163
'TYPEBOT_START',
162164
'TYPEBOT_CHANGE_STATUS',
165+
'MESSAGING_HISTORY_SET',
163166
],
164167
},
165168
},
@@ -197,6 +200,7 @@ export const instanceSchema: JSONSchema7 = {
197200
'CALL',
198201
'TYPEBOT_START',
199202
'TYPEBOT_CHANGE_STATUS',
203+
'MESSAGING_HISTORY_SET',
200204
],
201205
},
202206
},

0 commit comments

Comments
 (0)