From 37fbbe600cb65ae9c5d5c7dfafdc55bf275da349 Mon Sep 17 00:00:00 2001 From: Mehak Bindra Date: Thu, 16 Oct 2025 15:01:01 -0700 Subject: [PATCH 1/5] fix streaming:no reset of timeout for each chunk --- packages/apps/src/plugins/http/stream.ts | 53 +++++++++++++++++++++--- 1 file changed, 48 insertions(+), 5 deletions(-) diff --git a/packages/apps/src/plugins/http/stream.ts b/packages/apps/src/plugins/http/stream.ts index bc8373c04..d531b4fd2 100644 --- a/packages/apps/src/plugins/http/stream.ts +++ b/packages/apps/src/plugins/http/stream.ts @@ -16,6 +16,21 @@ import { ConsoleLogger, EventEmitter, ILogger } from '@microsoft/teams.common'; import { IStreamer, IStreamerEvents } from '../../types'; import { promises } from '../../utils'; +/** + * HTTP-based streaming implementation for Microsoft Teams activities. + * + * Allows sending typing indicators and messages in chunks to Teams. + * Queues incoming activities and flushes them periodically to avoid + * rate limits. + * + * Flow: + * 1. `emit()` adds activities to the queue and starts a flush if none scheduled. + * 2. `_flush()` starts by cancelling any pending flush, then processes up to 10 queued activities under a lock. + * 3. Informative typing updates are sent immediately. + * 4. Message text is combined and sent as a typing activity. + * 5. `_flush()` schedules another flush if more items remain in queue. + * 6. `close()` waits for the queue to empty and sends the final message activity. + */ export class HttpStream implements IStreamer { readonly events = new EventEmitter(); @@ -40,11 +55,11 @@ export class HttpStream implements IStreamer { this._logger = logger?.child('stream') || new ConsoleLogger('@teams/http/stream'); } + /** + * Emit a new activity or text to the stream. + * @param activity Activity object or string message. + */ emit(activity: Partial | string) { - if (this._timeout) { - clearTimeout(this._timeout); - this._timeout = undefined; - } if (typeof activity === 'string') { activity = { @@ -54,9 +69,17 @@ export class HttpStream implements IStreamer { } this.queue.push(activity); - this._timeout = setTimeout(this.flush.bind(this), 500); + + // Start flush if not already scheduled + if (!this._timeout) { + this.flush(); + } } + /** + * Send a typing/status update without adding to the main text. + * @param text Status text (ex. "Thinking...") + */ update(text: string) { this.emit({ type: 'typing', @@ -65,6 +88,10 @@ export class HttpStream implements IStreamer { }); } + /** + * Close the stream by sending the final message. + * Waits for all queued activities to flush. + */ async close() { if (!this.index && !this.queue.length && !this._flushing) { this._logger.debug('closed with no content'); @@ -76,6 +103,7 @@ export class HttpStream implements IStreamer { return this._result; } + // Wait until all queued activities are flushed while (!this.id || this.queue.length) { await new Promise((resolve) => setTimeout(resolve, 200)); } @@ -85,6 +113,7 @@ export class HttpStream implements IStreamer { return; } + // Build final message activity const activity = new MessageActivity(this.text) .withId(this.id) .addAttachments(...this.attachments) @@ -98,6 +127,7 @@ export class HttpStream implements IStreamer { this.events.emit('close', res); + // Reset internal state this.index = 0; this.id = undefined; this.text = ''; @@ -109,6 +139,10 @@ export class HttpStream implements IStreamer { return res; } + /** + * Flush queued activities. + * Processes up to 10 items at a time. + */ protected async flush() { // if locked or no queue, return early if (!this.queue.length || this._flushing) return; @@ -170,6 +204,7 @@ export class HttpStream implements IStreamer { await this.pushStreamChunk(activity); } + // Schedule another flush if queue is not empty if (this.queue.length) { this._timeout = setTimeout(this.flush.bind(this), 500); } @@ -178,6 +213,10 @@ export class HttpStream implements IStreamer { } } + /** + * Push a new chunk to the stream. + * @param activity TypingActivity to send. + */ protected async pushStreamChunk(activity: TypingActivity) { if (this.id) { activity.id = this.id; @@ -194,6 +233,10 @@ export class HttpStream implements IStreamer { } } + /** + * Send or update a streaming activity + * @param activity ActivityParams to send. + */ protected async send(activity: ActivityParams) { activity = { ...activity, From 38d3ee297e54cf331e3974b377a05ad83bf70478 Mon Sep 17 00:00:00 2001 From: Mehak Bindra Date: Wed, 17 Dec 2025 11:09:12 -0800 Subject: [PATCH 2/5] tests for streaming --- packages/apps/src/plugins/http/stream.spec.ts | 111 ++++++++++++++++++ packages/apps/src/plugins/http/stream.ts | 9 +- 2 files changed, 119 insertions(+), 1 deletion(-) create mode 100644 packages/apps/src/plugins/http/stream.spec.ts diff --git a/packages/apps/src/plugins/http/stream.spec.ts b/packages/apps/src/plugins/http/stream.spec.ts new file mode 100644 index 000000000..bb9087dcd --- /dev/null +++ b/packages/apps/src/plugins/http/stream.spec.ts @@ -0,0 +1,111 @@ +import { HttpStream } from './stream'; + +describe('HttpStream', () => { + let client: any; + let ref: any; + let logger: any; + + beforeEach(() => { + client = { + conversations: { + activities: jest.fn().mockReturnValue({ + create: jest.fn(), + update: jest.fn(), + }), + }, + }; + + ref = { + bot: { id: 'bot', name: 'Bot' }, + conversation: { id: 'conversation-id' }, + }; + + logger = { + debug: jest.fn(), + warn: jest.fn(), + error: jest.fn(), + child: () => logger, + }; + }); + + jest.useFakeTimers(); + + function mockCreate(successAfter = 0) { + let calls = 0; + client.conversations.activities().create.mockImplementation( + async (_activity: any) => { + calls++; + if (calls <= successAfter) { + throw new Error('timeout'); + } + return { id: `activity-${calls}` }; + } + ); + return () => calls; + } + + test('stream multiple emits with timer', async () => { + const stream = new HttpStream(client, ref, logger); + mockCreate(); + console.log('Starting test stream multiple emits with timer'); + + for (let i = 0; i < 12; i++) { + stream.emit(`Message ${i + 1}`); + } + + expect(client.conversations.activities().create).toHaveBeenCalledTimes(1); + await jest.runAllTimersAsync(); + expect(client.conversations.activities().create.mock.calls.length).toBeGreaterThanOrEqual(2); + }); + + test('stream error handled gracefully', async () => { + mockCreate(1); + const stream = new HttpStream(client, ref, logger); + + stream.emit('Test message'); + await jest.runAllTimersAsync(); + + expect(client.conversations.activities().create).toHaveBeenCalledTimes(2); + const res = await stream.close(); + expect(res).toBeDefined(); + }); + + test('update sends typing activity', async () => { + const sent: any[] = []; + client.conversations.activities().create.mockImplementation( + async (activity: any) => { + sent.push(activity); + return { id: `activity-${sent.length}` }; + } + ); + + const stream = new HttpStream(client, ref, logger); + + stream.update('Thinking...'); + + expect(sent[0].type).toBe('typing'); + expect(sent[0].text).toBe('Thinking...'); + expect(sent[0].channelData?.streamType).toBe('informative'); + expect(stream['index']).toBeGreaterThanOrEqual(0); + }); + + test('sequence of update and emit', async () => { + const sent: any[] = []; + client.conversations.activities().create.mockImplementation( + async (activity: any) => { + sent.push(activity); + return { id: `activity-${sent.length}` }; + } + ); + + const stream = new HttpStream(client, ref, logger); + + stream.update('Preparing...'); + stream.emit('Final message'); + + await jest.runAllTimersAsync(); + expect(sent[0].type).toBe('typing'); + expect(sent[1].text).toContain('Final message'); + + }); +}); diff --git a/packages/apps/src/plugins/http/stream.ts b/packages/apps/src/plugins/http/stream.ts index d531b4fd2..4e367044a 100644 --- a/packages/apps/src/plugins/http/stream.ts +++ b/packages/apps/src/plugins/http/stream.ts @@ -48,6 +48,7 @@ export class HttpStream implements IStreamer { private _timeout?: NodeJS.Timeout; private _logger: ILogger; private _flushing: boolean = false; + private readonly _totalTimeout = 30000; // 30 seconds constructor(client: Client, ref: ConversationReference, logger?: ILogger) { this.client = client; @@ -104,8 +105,14 @@ export class HttpStream implements IStreamer { } // Wait until all queued activities are flushed + const start = Date.now(); + while (!this.id || this.queue.length) { - await new Promise((resolve) => setTimeout(resolve, 200)); + if (Date.now() - start > this._totalTimeout) { + this._logger.warn('Timeout while waiting for id and queue to flush'); + } + this._logger.debug('waiting for id to be set or queue to be empty'); + await new Promise((resolve) => setTimeout(resolve, 100)); } if (this.text === '' && !this.attachments.length) { From aa17054695c0bafdc05385ad833f7048c6d2bb04 Mon Sep 17 00:00:00 2001 From: Mehak Bindra Date: Thu, 18 Dec 2025 08:28:41 -0800 Subject: [PATCH 3/5] add all retries failed test --- packages/apps/src/plugins/http/stream.spec.ts | 21 +++++++++++++++++++ packages/apps/src/plugins/http/stream.ts | 4 +++- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/packages/apps/src/plugins/http/stream.spec.ts b/packages/apps/src/plugins/http/stream.spec.ts index bb9087dcd..3225ad1b1 100644 --- a/packages/apps/src/plugins/http/stream.spec.ts +++ b/packages/apps/src/plugins/http/stream.spec.ts @@ -89,6 +89,27 @@ describe('HttpStream', () => { expect(stream['index']).toBeGreaterThanOrEqual(0); }); + test('stream all timeouts fail handled gracefully', async () => { + const unhandled = jest.fn(); + process.on('unhandledRejection', unhandled); + let callCount = 0; + + client.conversations.activities().create.mockImplementation( + async () => { + callCount++; + throw new Error('All operations timed out'); + } + ); + + const stream = new HttpStream(client, ref, logger); + + stream.emit('Test message with all timeouts'); + + await jest.runAllTimersAsync(); + expect(callCount).toBe(6); + process.off('unhandledRejection', unhandled); + }); + test('sequence of update and emit', async () => { const sent: any[] = []; client.conversations.activities().create.mockImplementation( diff --git a/packages/apps/src/plugins/http/stream.ts b/packages/apps/src/plugins/http/stream.ts index 4e367044a..caddf8e3d 100644 --- a/packages/apps/src/plugins/http/stream.ts +++ b/packages/apps/src/plugins/http/stream.ts @@ -107,7 +107,7 @@ export class HttpStream implements IStreamer { // Wait until all queued activities are flushed const start = Date.now(); - while (!this.id || this.queue.length) { + while (this.queue.length || !this.id) { if (Date.now() - start > this._totalTimeout) { this._logger.warn('Timeout while waiting for id and queue to flush'); } @@ -215,6 +215,8 @@ export class HttpStream implements IStreamer { if (this.queue.length) { this._timeout = setTimeout(this.flush.bind(this), 500); } + } catch (err) { + this._logger.error(err, 'flush failed'); } finally { this._flushing = false; } From aa270ff2bc251dc697e61fdec512dfee12db7218 Mon Sep 17 00:00:00 2001 From: Mehak Bindra Date: Thu, 18 Dec 2025 08:32:58 -0800 Subject: [PATCH 4/5] tests nit --- packages/apps/src/plugins/http/stream.spec.ts | 5 +---- packages/apps/src/utils/promises/retry.ts | 4 ++-- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/packages/apps/src/plugins/http/stream.spec.ts b/packages/apps/src/plugins/http/stream.spec.ts index 3225ad1b1..c14d7c5a0 100644 --- a/packages/apps/src/plugins/http/stream.spec.ts +++ b/packages/apps/src/plugins/http/stream.spec.ts @@ -90,8 +90,6 @@ describe('HttpStream', () => { }); test('stream all timeouts fail handled gracefully', async () => { - const unhandled = jest.fn(); - process.on('unhandledRejection', unhandled); let callCount = 0; client.conversations.activities().create.mockImplementation( @@ -106,8 +104,7 @@ describe('HttpStream', () => { stream.emit('Test message with all timeouts'); await jest.runAllTimersAsync(); - expect(callCount).toBe(6); - process.off('unhandledRejection', unhandled); + expect(callCount).toBe(5); }); test('sequence of update and emit', async () => { diff --git a/packages/apps/src/utils/promises/retry.ts b/packages/apps/src/utils/promises/retry.ts index 50a6a3887..be8aeaf2a 100644 --- a/packages/apps/src/utils/promises/retry.ts +++ b/packages/apps/src/utils/promises/retry.ts @@ -2,7 +2,7 @@ import { ILogger } from '@microsoft/teams.common'; export type RetryOptions = { /** - * the max number of retry attempts + * the max number of attempts * @default 5 */ readonly max?: number; @@ -27,7 +27,7 @@ export async function retry(factory: () => Promise, options?: RetryO try { return await factory(); } catch (err) { - if (max > 0) { + if (max > 1) { log?.debug(`delaying ${delay}ms...`); await new Promise((resolve) => setTimeout(resolve, delay)); log?.debug('retrying...'); From 3948cc03a80ed306bff94639b434e80c0d54b9c7 Mon Sep 17 00:00:00 2001 From: Mehak Bindra Date: Mon, 22 Dec 2025 13:42:14 -0800 Subject: [PATCH 5/5] rewrite some tests --- packages/apps/src/plugins/http/stream.spec.ts | 102 ++++++++++++------ packages/apps/src/plugins/http/stream.ts | 1 + 2 files changed, 68 insertions(+), 35 deletions(-) diff --git a/packages/apps/src/plugins/http/stream.spec.ts b/packages/apps/src/plugins/http/stream.spec.ts index c14d7c5a0..4ead48637 100644 --- a/packages/apps/src/plugins/http/stream.spec.ts +++ b/packages/apps/src/plugins/http/stream.spec.ts @@ -38,7 +38,8 @@ describe('HttpStream', () => { if (calls <= successAfter) { throw new Error('timeout'); } - return { id: `activity-${calls}` }; + + return { _activity, id: `activity-${calls}` }; } ); return () => calls; @@ -47,83 +48,114 @@ describe('HttpStream', () => { test('stream multiple emits with timer', async () => { const stream = new HttpStream(client, ref, logger); mockCreate(); - console.log('Starting test stream multiple emits with timer'); for (let i = 0; i < 12; i++) { stream.emit(`Message ${i + 1}`); } + // Initial emit triggers immediate flush expect(client.conversations.activities().create).toHaveBeenCalledTimes(1); - await jest.runAllTimersAsync(); - expect(client.conversations.activities().create.mock.calls.length).toBeGreaterThanOrEqual(2); + + await jest.advanceTimersByTimeAsync(200); + // next flush will be after 500ms, so no new calls yet + expect(client.conversations.activities().create).toHaveBeenCalledTimes(1); + stream.emit('Message 13'); + + await jest.advanceTimersByTimeAsync(300); + // 500ms passed since first emit, second flush should happen + expect(client.conversations.activities().create).toHaveBeenCalledTimes(2); + stream.emit('Message 14'); + + await jest.advanceTimersByTimeAsync(500); + // another 500ms passed, third flush should happen + expect(client.conversations.activities().create).toHaveBeenCalledTimes(3); + + const calls = client.conversations.activities().create.mock.calls; + expect(calls[0][0].text).toBe('Message 1'); + expect(calls[1][0].text).toBe('Message 1Message 2Message 3Message 4Message 5Message 6Message 7Message 8Message 9Message 10Message 11'); + expect(calls[2][0].text).toBe('Message 1Message 2Message 3Message 4Message 5Message 6Message 7Message 8Message 9Message 10Message 11Message 12Message 13Message 14'); }); + test('stream error handled gracefully', async () => { mockCreate(1); const stream = new HttpStream(client, ref, logger); stream.emit('Test message'); - await jest.runAllTimersAsync(); + expect(client.conversations.activities().create).toHaveBeenCalledTimes(1); + + // retry after 500ms + await jest.advanceTimersByTimeAsync(500); expect(client.conversations.activities().create).toHaveBeenCalledTimes(2); + const calls = client.conversations.activities().create.mock.calls; + expect(calls[0][0].text).toBe('Test message'); + expect(calls[1][0].text).toBe('Test message'); const res = await stream.close(); expect(res).toBeDefined(); }); test('update sends typing activity', async () => { - const sent: any[] = []; - client.conversations.activities().create.mockImplementation( - async (activity: any) => { - sent.push(activity); - return { id: `activity-${sent.length}` }; - } - ); const stream = new HttpStream(client, ref, logger); stream.update('Thinking...'); - expect(sent[0].type).toBe('typing'); - expect(sent[0].text).toBe('Thinking...'); - expect(sent[0].channelData?.streamType).toBe('informative'); - expect(stream['index']).toBeGreaterThanOrEqual(0); + // resolve promise microtask queue + await jest.runAllTicks(); + + const calls = client.conversations.activities().create.mock.calls; + expect(calls[0][0].type).toBe('typing'); + expect(calls[0][0].text).toBe('Thinking...'); + expect(calls[0][0].channelData?.streamType).toBe('informative'); + expect(stream['index']).toBe(0); }); test('stream all timeouts fail handled gracefully', async () => { - let callCount = 0; - - client.conversations.activities().create.mockImplementation( - async () => { - callCount++; - throw new Error('All operations timed out'); - } - ); + const getCallCount = mockCreate(10); const stream = new HttpStream(client, ref, logger); stream.emit('Test message with all timeouts'); + // run all timers to exhaust retries await jest.runAllTimersAsync(); - expect(callCount).toBe(5); + expect(getCallCount()).toBe(5); + + const res = await stream.close(); + expect(res).toBeUndefined(); }); test('sequence of update and emit', async () => { - const sent: any[] = []; - client.conversations.activities().create.mockImplementation( - async (activity: any) => { - sent.push(activity); - return { id: `activity-${sent.length}` }; - } - ); const stream = new HttpStream(client, ref, logger); stream.update('Preparing...'); stream.emit('Final message'); - await jest.runAllTimersAsync(); - expect(sent[0].type).toBe('typing'); - expect(sent[1].text).toContain('Final message'); + await jest.advanceTimersByTimeAsync(500); + + const calls = client.conversations.activities().create.mock.calls; + expect(calls.length).toBe(2); + expect(calls[0][0].type).toBe('typing'); + expect(calls[1][0].text).toContain('Final message'); }); + + test('close times out if queue never flushes and id not set', async () => { + const stream = new HttpStream(client, ref, logger); + + stream.emit('Message that will not flush'); + + // promise not resolved yet, so no id set + const res = stream.close(); + + // Fast-forward timers to trigger timeout + await jest.runAllTimersAsync(); + expect(logger.warn).toHaveBeenCalledWith( + 'Timeout while waiting for id and queue to flush' + ); + const result = await res; + expect(result).toBeUndefined(); + }); }); diff --git a/packages/apps/src/plugins/http/stream.ts b/packages/apps/src/plugins/http/stream.ts index caddf8e3d..dd6d507b8 100644 --- a/packages/apps/src/plugins/http/stream.ts +++ b/packages/apps/src/plugins/http/stream.ts @@ -110,6 +110,7 @@ export class HttpStream implements IStreamer { while (this.queue.length || !this.id) { if (Date.now() - start > this._totalTimeout) { this._logger.warn('Timeout while waiting for id and queue to flush'); + return; } this._logger.debug('waiting for id to be set or queue to be empty'); await new Promise((resolve) => setTimeout(resolve, 100));