WARNING: THIS SITE IS A MIRROR OF GITHUB.COM / IT CANNOT LOGIN OR REGISTER ACCOUNTS / THE CONTENTS ARE PROVIDED AS-IS / THIS SITE ASSUMES NO RESPONSIBILITY FOR ANY DISPLAYED CONTENT OR LINKS / IF YOU FOUND SOMETHING MAY NOT GOOD FOR EVERYONE, CONTACT ADMIN AT ilovescratch@foxmail.com
Skip to content

Commit 9f73042

Browse files
committed
Enhance disposal handling in Dispatcher & CapProcessingServer Introduce `_disposed` flag in `Dispatcher` to track disposal state. Update `Start` and `Dispose` methods to manage `CancellationTokenSource` lifecycle, allowing safe restart after disposal. Modify `CapProcessingServer` to allow reassignment of `_cts` and implement similar disposal and recreation logic. Ensure proper resource management and restart capability.
1 parent 92b9b41 commit 9f73042

File tree

2 files changed

+20
-1
lines changed

2 files changed

+20
-1
lines changed

src/DotNetCore.CAP/Processor/IDispatcher.Default.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public class Dispatcher : IDispatcher
3131
private CancellationTokenSource? _tasksCts;
3232
private Channel<MediumMessage> _publishedChannel = default!;
3333
private Channel<(MediumMessage, ConsumerExecutorDescriptor?)> _receivedChannel = default!;
34+
private bool _disposed;
3435

3536
public Dispatcher(ILogger<Dispatcher> logger, IMessageSender sender, IOptions<CapOptions> options,
3637
ISubscribeExecutor executor, IDataStorage storage)
@@ -47,6 +48,14 @@ public Dispatcher(ILogger<Dispatcher> logger, IMessageSender sender, IOptions<Ca
4748

4849
public async Task Start(CancellationToken stoppingToken)
4950
{
51+
// If already disposed and restarting, recreate the CancellationTokenSource and reset state
52+
if (_disposed || (_tasksCts != null && _tasksCts.IsCancellationRequested))
53+
{
54+
_tasksCts?.Dispose();
55+
_tasksCts = null;
56+
_disposed = false;
57+
}
58+
5059
stoppingToken.ThrowIfCancellationRequested();
5160
_tasksCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, CancellationToken.None);
5261

@@ -226,6 +235,8 @@ public async ValueTask EnqueueToExecute(MediumMessage message, ConsumerExecutorD
226235

227236
public void Dispose()
228237
{
238+
if (_disposed) return;
239+
_disposed = true;
229240
_tasksCts?.Dispose();
230241
GC.SuppressFinalize(this);
231242
}

src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ namespace DotNetCore.CAP.Processor;
1414

1515
public class CapProcessingServer : IProcessingServer
1616
{
17-
private readonly CancellationTokenSource _cts;
17+
private CancellationTokenSource _cts;
1818
private readonly ILogger _logger;
1919
private readonly ILoggerFactory _loggerFactory;
2020
private readonly IServiceProvider _provider;
@@ -36,6 +36,14 @@ public CapProcessingServer(
3636

3737
public Task Start(CancellationToken stoppingToken)
3838
{
39+
// If already disposed and restarting, recreate the CancellationTokenSource
40+
if (_disposed || _cts.IsCancellationRequested)
41+
{
42+
_cts?.Dispose();
43+
_cts = new CancellationTokenSource();
44+
_disposed = false;
45+
}
46+
3947
stoppingToken.Register(() => _cts.Cancel());
4048

4149
_logger.ServerStarting();

0 commit comments

Comments
 (0)