diff --git a/packages/apps/src/plugins/http/stream.spec.ts b/packages/apps/src/plugins/http/stream.spec.ts new file mode 100644 index 000000000..4ead48637 --- /dev/null +++ b/packages/apps/src/plugins/http/stream.spec.ts @@ -0,0 +1,161 @@ +import { HttpStream } from './stream'; + +describe('HttpStream', () => { + let client: any; + let ref: any; + let logger: any; + + beforeEach(() => { + client = { + conversations: { + activities: jest.fn().mockReturnValue({ + create: jest.fn(), + update: jest.fn(), + }), + }, + }; + + ref = { + bot: { id: 'bot', name: 'Bot' }, + conversation: { id: 'conversation-id' }, + }; + + logger = { + debug: jest.fn(), + warn: jest.fn(), + error: jest.fn(), + child: () => logger, + }; + }); + + jest.useFakeTimers(); + + function mockCreate(successAfter = 0) { + let calls = 0; + client.conversations.activities().create.mockImplementation( + async (_activity: any) => { + calls++; + if (calls <= successAfter) { + throw new Error('timeout'); + } + + return { _activity, id: `activity-${calls}` }; + } + ); + return () => calls; + } + + test('stream multiple emits with timer', async () => { + const stream = new HttpStream(client, ref, logger); + mockCreate(); + + for (let i = 0; i < 12; i++) { + stream.emit(`Message ${i + 1}`); + } + + // Initial emit triggers immediate flush + expect(client.conversations.activities().create).toHaveBeenCalledTimes(1); + + await jest.advanceTimersByTimeAsync(200); + // next flush will be after 500ms, so no new calls yet + expect(client.conversations.activities().create).toHaveBeenCalledTimes(1); + stream.emit('Message 13'); + + await jest.advanceTimersByTimeAsync(300); + // 500ms passed since first emit, second flush should happen + expect(client.conversations.activities().create).toHaveBeenCalledTimes(2); + stream.emit('Message 14'); + + await jest.advanceTimersByTimeAsync(500); + // another 500ms passed, third flush should happen + expect(client.conversations.activities().create).toHaveBeenCalledTimes(3); + + const calls = client.conversations.activities().create.mock.calls; + expect(calls[0][0].text).toBe('Message 1'); + expect(calls[1][0].text).toBe('Message 1Message 2Message 3Message 4Message 5Message 6Message 7Message 8Message 9Message 10Message 11'); + expect(calls[2][0].text).toBe('Message 1Message 2Message 3Message 4Message 5Message 6Message 7Message 8Message 9Message 10Message 11Message 12Message 13Message 14'); + }); + + + test('stream error handled gracefully', async () => { + mockCreate(1); + const stream = new HttpStream(client, ref, logger); + + stream.emit('Test message'); + expect(client.conversations.activities().create).toHaveBeenCalledTimes(1); + + // retry after 500ms + await jest.advanceTimersByTimeAsync(500); + + expect(client.conversations.activities().create).toHaveBeenCalledTimes(2); + const calls = client.conversations.activities().create.mock.calls; + expect(calls[0][0].text).toBe('Test message'); + expect(calls[1][0].text).toBe('Test message'); + const res = await stream.close(); + expect(res).toBeDefined(); + }); + + test('update sends typing activity', async () => { + + const stream = new HttpStream(client, ref, logger); + + stream.update('Thinking...'); + + // resolve promise microtask queue + await jest.runAllTicks(); + + const calls = client.conversations.activities().create.mock.calls; + expect(calls[0][0].type).toBe('typing'); + expect(calls[0][0].text).toBe('Thinking...'); + expect(calls[0][0].channelData?.streamType).toBe('informative'); + expect(stream['index']).toBe(0); + }); + + test('stream all timeouts fail handled gracefully', async () => { + const getCallCount = mockCreate(10); + + const stream = new HttpStream(client, ref, logger); + + stream.emit('Test message with all timeouts'); + + // run all timers to exhaust retries + await jest.runAllTimersAsync(); + expect(getCallCount()).toBe(5); + + const res = await stream.close(); + expect(res).toBeUndefined(); + }); + + test('sequence of update and emit', async () => { + + const stream = new HttpStream(client, ref, logger); + + stream.update('Preparing...'); + stream.emit('Final message'); + + await jest.advanceTimersByTimeAsync(500); + + const calls = client.conversations.activities().create.mock.calls; + expect(calls.length).toBe(2); + expect(calls[0][0].type).toBe('typing'); + expect(calls[1][0].text).toContain('Final message'); + + }); + + test('close times out if queue never flushes and id not set', async () => { + const stream = new HttpStream(client, ref, logger); + + stream.emit('Message that will not flush'); + + // promise not resolved yet, so no id set + const res = stream.close(); + + // Fast-forward timers to trigger timeout + await jest.runAllTimersAsync(); + expect(logger.warn).toHaveBeenCalledWith( + 'Timeout while waiting for id and queue to flush' + ); + const result = await res; + expect(result).toBeUndefined(); + }); +}); diff --git a/packages/apps/src/plugins/http/stream.ts b/packages/apps/src/plugins/http/stream.ts index bc8373c04..dd6d507b8 100644 --- a/packages/apps/src/plugins/http/stream.ts +++ b/packages/apps/src/plugins/http/stream.ts @@ -16,6 +16,21 @@ import { ConsoleLogger, EventEmitter, ILogger } from '@microsoft/teams.common'; import { IStreamer, IStreamerEvents } from '../../types'; import { promises } from '../../utils'; +/** + * HTTP-based streaming implementation for Microsoft Teams activities. + * + * Allows sending typing indicators and messages in chunks to Teams. + * Queues incoming activities and flushes them periodically to avoid + * rate limits. + * + * Flow: + * 1. `emit()` adds activities to the queue and starts a flush if none scheduled. + * 2. `_flush()` starts by cancelling any pending flush, then processes up to 10 queued activities under a lock. + * 3. Informative typing updates are sent immediately. + * 4. Message text is combined and sent as a typing activity. + * 5. `_flush()` schedules another flush if more items remain in queue. + * 6. `close()` waits for the queue to empty and sends the final message activity. + */ export class HttpStream implements IStreamer { readonly events = new EventEmitter(); @@ -33,6 +48,7 @@ export class HttpStream implements IStreamer { private _timeout?: NodeJS.Timeout; private _logger: ILogger; private _flushing: boolean = false; + private readonly _totalTimeout = 30000; // 30 seconds constructor(client: Client, ref: ConversationReference, logger?: ILogger) { this.client = client; @@ -40,11 +56,11 @@ export class HttpStream implements IStreamer { this._logger = logger?.child('stream') || new ConsoleLogger('@teams/http/stream'); } + /** + * Emit a new activity or text to the stream. + * @param activity Activity object or string message. + */ emit(activity: Partial | string) { - if (this._timeout) { - clearTimeout(this._timeout); - this._timeout = undefined; - } if (typeof activity === 'string') { activity = { @@ -54,9 +70,17 @@ export class HttpStream implements IStreamer { } this.queue.push(activity); - this._timeout = setTimeout(this.flush.bind(this), 500); + + // Start flush if not already scheduled + if (!this._timeout) { + this.flush(); + } } + /** + * Send a typing/status update without adding to the main text. + * @param text Status text (ex. "Thinking...") + */ update(text: string) { this.emit({ type: 'typing', @@ -65,6 +89,10 @@ export class HttpStream implements IStreamer { }); } + /** + * Close the stream by sending the final message. + * Waits for all queued activities to flush. + */ async close() { if (!this.index && !this.queue.length && !this._flushing) { this._logger.debug('closed with no content'); @@ -76,8 +104,16 @@ export class HttpStream implements IStreamer { return this._result; } - while (!this.id || this.queue.length) { - await new Promise((resolve) => setTimeout(resolve, 200)); + // Wait until all queued activities are flushed + const start = Date.now(); + + while (this.queue.length || !this.id) { + if (Date.now() - start > this._totalTimeout) { + this._logger.warn('Timeout while waiting for id and queue to flush'); + return; + } + this._logger.debug('waiting for id to be set or queue to be empty'); + await new Promise((resolve) => setTimeout(resolve, 100)); } if (this.text === '' && !this.attachments.length) { @@ -85,6 +121,7 @@ export class HttpStream implements IStreamer { return; } + // Build final message activity const activity = new MessageActivity(this.text) .withId(this.id) .addAttachments(...this.attachments) @@ -98,6 +135,7 @@ export class HttpStream implements IStreamer { this.events.emit('close', res); + // Reset internal state this.index = 0; this.id = undefined; this.text = ''; @@ -109,6 +147,10 @@ export class HttpStream implements IStreamer { return res; } + /** + * Flush queued activities. + * Processes up to 10 items at a time. + */ protected async flush() { // if locked or no queue, return early if (!this.queue.length || this._flushing) return; @@ -170,14 +212,21 @@ export class HttpStream implements IStreamer { await this.pushStreamChunk(activity); } + // Schedule another flush if queue is not empty if (this.queue.length) { this._timeout = setTimeout(this.flush.bind(this), 500); } + } catch (err) { + this._logger.error(err, 'flush failed'); } finally { this._flushing = false; } } + /** + * Push a new chunk to the stream. + * @param activity TypingActivity to send. + */ protected async pushStreamChunk(activity: TypingActivity) { if (this.id) { activity.id = this.id; @@ -194,6 +243,10 @@ export class HttpStream implements IStreamer { } } + /** + * Send or update a streaming activity + * @param activity ActivityParams to send. + */ protected async send(activity: ActivityParams) { activity = { ...activity, diff --git a/packages/apps/src/utils/promises/retry.ts b/packages/apps/src/utils/promises/retry.ts index 50a6a3887..be8aeaf2a 100644 --- a/packages/apps/src/utils/promises/retry.ts +++ b/packages/apps/src/utils/promises/retry.ts @@ -2,7 +2,7 @@ import { ILogger } from '@microsoft/teams.common'; export type RetryOptions = { /** - * the max number of retry attempts + * the max number of attempts * @default 5 */ readonly max?: number; @@ -27,7 +27,7 @@ export async function retry(factory: () => Promise, options?: RetryO try { return await factory(); } catch (err) { - if (max > 0) { + if (max > 1) { log?.debug(`delaying ${delay}ms...`); await new Promise((resolve) => setTimeout(resolve, delay)); log?.debug('retrying...');