diff --git a/httpclient5-sse/src/main/java/org/apache/hc/client5/http/sse/impl/DefaultEventSource.java b/httpclient5-sse/src/main/java/org/apache/hc/client5/http/sse/impl/DefaultEventSource.java
index 0211212642..41169a7c3a 100644
--- a/httpclient5-sse/src/main/java/org/apache/hc/client5/http/sse/impl/DefaultEventSource.java
+++ b/httpclient5-sse/src/main/java/org/apache/hc/client5/http/sse/impl/DefaultEventSource.java
@@ -183,11 +183,10 @@ public DefaultEventSource(final CloseableHttpAsyncClient client,
if (scheduler != null) {
this.scheduler = scheduler;
- this.ownScheduler = scheduler != SHARED_SCHED;
} else {
this.scheduler = SHARED_SCHED;
- this.ownScheduler = false;
}
+ this.ownScheduler = false;
this.callbackExecutor = callbackExecutor != null ? callbackExecutor : Runnable::run;
@@ -352,6 +351,11 @@ public void completed(final Void v) {
@Override
public void failed(final Exception ex) {
connected.set(false);
+ if (ex instanceof SseResponseConsumer.StopReconnectException) {
+ dispatch(() -> listener.onFailure(ex, false));
+ notifyClosedOnce();
+ return;
+ }
if (cancelled.get() || isBenignCancel(ex)) {
notifyClosedOnce();
return;
diff --git a/httpclient5-sse/src/test/java/org/apache/hc/client5/http/sse/impl/DefaultEventSourceTest.java b/httpclient5-sse/src/test/java/org/apache/hc/client5/http/sse/impl/DefaultEventSourceTest.java
new file mode 100644
index 0000000000..894d6d35e9
--- /dev/null
+++ b/httpclient5-sse/src/test/java/org/apache/hc/client5/http/sse/impl/DefaultEventSourceTest.java
@@ -0,0 +1,268 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.sse.impl;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.net.URI;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import org.apache.hc.client5.http.sse.EventSourceListener;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.function.Supplier;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
+import org.apache.hc.core5.http.nio.AsyncRequestProducer;
+import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
+import org.apache.hc.core5.http.nio.HandlerFactory;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.io.CloseMode;
+import org.apache.hc.core5.reactor.IOReactorStatus;
+import org.apache.hc.core5.util.TimeValue;
+import org.junit.jupiter.api.Test;
+
+class DefaultEventSourceTest {
+
+ @Test
+ void stopReconnectExceptionDoesNotScheduleReconnect() throws Exception {
+ final RecordingScheduler scheduler = new RecordingScheduler();
+ final CapturingClient client = new CapturingClient();
+ final CountDownLatch failure = new CountDownLatch(1);
+ final CountDownLatch closed = new CountDownLatch(1);
+
+ final EventSourceListener listener = new EventSourceListener() {
+ @Override
+ public void onFailure(final Throwable t, final boolean willReconnect) {
+ if (!willReconnect) {
+ failure.countDown();
+ }
+ }
+
+ @Override
+ public void onClosed() {
+ closed.countDown();
+ }
+
+ @Override
+ public void onEvent(final String id, final String type, final String data) {
+ }
+ };
+
+ final DefaultEventSource es = new DefaultEventSource(
+ client,
+ URI.create("http://example.org/sse"),
+ Collections.emptyMap(),
+ listener,
+ scheduler,
+ null,
+ null,
+ SseParser.CHAR);
+
+ es.start();
+ client.lastCallback.failed(new SseResponseConsumer.StopReconnectException("Server closed stream (204)"));
+
+ assertTrue(failure.await(1, TimeUnit.SECONDS), "failure observed");
+ assertTrue(closed.await(1, TimeUnit.SECONDS), "closed observed");
+ assertTrue(scheduler.scheduledCount.get() == 0, "no reconnect scheduled");
+ }
+
+ @Test
+ void callerSchedulerIsNotShutdownOnCancel() {
+ final RecordingScheduler scheduler = new RecordingScheduler();
+ final CapturingClient client = new CapturingClient();
+
+ final DefaultEventSource es = new DefaultEventSource(
+ client,
+ URI.create("http://example.org/sse"),
+ Collections.emptyMap(),
+ (id, type, data) -> { },
+ scheduler,
+ null,
+ null,
+ SseParser.CHAR);
+
+ es.start();
+ es.cancel();
+
+ assertFalse(scheduler.shutdownCalled.get(), "caller scheduler not shutdown");
+ }
+
+ static final class CapturingClient extends CloseableHttpAsyncClient {
+ volatile FutureCallback lastCallback;
+
+ @Override
+ public void start() { }
+
+ @Override
+ public IOReactorStatus getStatus() {
+ return IOReactorStatus.ACTIVE;
+ }
+
+ @Override
+ public void awaitShutdown(final TimeValue waitTime) throws InterruptedException { }
+
+ @Override
+ public void initiateShutdown() { }
+
+ @Override
+ public void close(final CloseMode closeMode) { }
+
+ @Override
+ public void close() { }
+
+ @Override
+ protected Future doExecute(
+ final HttpHost target,
+ final AsyncRequestProducer requestProducer,
+ final AsyncResponseConsumer responseConsumer,
+ final HandlerFactory pushHandlerFactory,
+ final HttpContext context,
+ final FutureCallback callback) {
+ @SuppressWarnings("unchecked") final FutureCallback cb = (FutureCallback) callback;
+ this.lastCallback = cb;
+ return new CompletableFuture<>();
+ }
+
+ @Override
+ @Deprecated
+ public void register(final String hostname, final String uriPattern, final Supplier supplier) {
+ }
+ }
+
+ static final class RecordingScheduler extends AbstractExecutorService implements ScheduledExecutorService {
+ final AtomicBoolean shutdownCalled = new AtomicBoolean(false);
+ final AtomicBoolean shutdown = new AtomicBoolean(false);
+ final AtomicInteger scheduledCount = new AtomicInteger(0);
+
+ @Override
+ public void shutdown() {
+ shutdown.set(true);
+ }
+
+ @Override
+ public List shutdownNow() {
+ shutdownCalled.set(true);
+ shutdown.set(true);
+ return Collections.emptyList();
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return shutdown.get();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return shutdown.get();
+ }
+
+ @Override
+ public boolean awaitTermination(final long timeout, final TimeUnit unit) {
+ return true;
+ }
+
+ @Override
+ public void execute(final Runnable command) {
+ command.run();
+ }
+
+ @Override
+ public ScheduledFuture> schedule(final Runnable command, final long delay, final TimeUnit unit) {
+ scheduledCount.incrementAndGet();
+ return new DummyScheduledFuture<>();
+ }
+
+ @Override
+ public ScheduledFuture schedule(final Callable callable, final long delay, final TimeUnit unit) {
+ scheduledCount.incrementAndGet();
+ return new DummyScheduledFuture<>();
+ }
+
+ @Override
+ public ScheduledFuture> scheduleAtFixedRate(
+ final Runnable command, final long initialDelay, final long period, final TimeUnit unit) {
+ throw new UnsupportedOperationException("not used");
+ }
+
+ @Override
+ public ScheduledFuture> scheduleWithFixedDelay(
+ final Runnable command, final long initialDelay, final long delay, final TimeUnit unit) {
+ throw new UnsupportedOperationException("not used");
+ }
+ }
+
+ static final class DummyScheduledFuture implements ScheduledFuture {
+ @Override
+ public long getDelay(final TimeUnit unit) {
+ return 0;
+ }
+
+ @Override
+ public int compareTo(final Delayed o) {
+ return 0;
+ }
+
+ @Override
+ public boolean cancel(final boolean mayInterruptIfRunning) {
+ return false;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return false;
+ }
+
+ @Override
+ public boolean isDone() {
+ return false;
+ }
+
+ @Override
+ public V get() {
+ return null;
+ }
+
+ @Override
+ public V get(final long timeout, final TimeUnit unit) {
+ return null;
+ }
+ }
+}