WARNING: THIS SITE IS A MIRROR OF GITHUB.COM / IT CANNOT LOGIN OR REGISTER ACCOUNTS / THE CONTENTS ARE PROVIDED AS-IS / THIS SITE ASSUMES NO RESPONSIBILITY FOR ANY DISPLAYED CONTENT OR LINKS / IF YOU FOUND SOMETHING MAY NOT GOOD FOR EVERYONE, CONTACT ADMIN AT ilovescratch@foxmail.com
Skip to content

Commit 3fc685e

Browse files
committed
SSE/Java9: implementation
1 parent 35c6dbd commit 3fc685e

File tree

7 files changed

+247
-0
lines changed

7 files changed

+247
-0
lines changed
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
[//]: # (TODO)
2+
[//]: # (TODO: test)
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
apply plugin: 'java-library'
2+
apply plugin: 'org.jetbrains.kotlin.jvm'
3+
apply plugin: 'com.vanniktech.maven.publish'
4+
apply plugin: 'org.jetbrains.dokka'
5+
6+
dependencies {
7+
api projects.retrofit
8+
api libs.okhttp.sse
9+
compileOnly libs.findBugsAnnotations
10+
11+
// testImplementation libs.junit
12+
// testImplementation libs.truth
13+
// testImplementation libs.guava
14+
testImplementation libs.okhttp.mockwebserver
15+
}
16+
17+
jar {
18+
manifest {
19+
attributes 'Automatic-Module-Name': 'retrofit2.adapter.sse-java9'
20+
}
21+
}
22+
23+
kotlin {
24+
jvmToolchain 9
25+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
POM_ARTIFACT_ID=adapter-sse-java9
2+
POM_NAME=Adapter: SSE Java 9
3+
POM_DESCRIPTION=A Retrofit CallAdapter for server-sent event (SSE) with Java 9's Flow.
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright (C) 2017 Square, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package retrofit2.adapter.sse
17+
18+
/**
19+
* A server-sent event.
20+
*/
21+
data class ServerSentEvent<ID : Any, TYPE : Any, DATA : Any>(
22+
@get:JvmName("id")
23+
val id: ID?,
24+
25+
@get:JvmName("type")
26+
val type: TYPE?,
27+
28+
@get:JvmName("data")
29+
val data: DATA,
30+
)
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Copyright (C) 2017 Square, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package retrofit2.adapter.sse.java9
17+
18+
import java.lang.reflect.ParameterizedType
19+
import java.lang.reflect.Type
20+
import java.util.concurrent.Executor
21+
import java.util.concurrent.Executors
22+
import java.util.concurrent.Flow
23+
import java.util.concurrent.ForkJoinPool
24+
import java.util.concurrent.SubmissionPublisher
25+
import okhttp3.sse.EventSource
26+
import okhttp3.sse.EventSourceListener
27+
import okhttp3.sse.EventSources
28+
import retrofit2.Call
29+
import retrofit2.CallAdapter
30+
import retrofit2.Callback
31+
import retrofit2.Converter
32+
import retrofit2.Response
33+
import retrofit2.Retrofit
34+
import retrofit2.adapter.sse.ServerSentEvent
35+
36+
internal class SseJucFlowCallAdapter<ID : Any, TYPE : Any, DATA : Any>(
37+
private val idType: Type,
38+
private val typeType: Type,
39+
private val dataType: Type,
40+
private val idConverter: Converter<String?, ID?>,
41+
private val typeConverter: Converter<String?, TYPE?>,
42+
private val dataConverter: Converter<String, DATA>,
43+
private val retrofit: Retrofit,
44+
) : CallAdapter<Any, Flow.Publisher<ServerSentEvent<ID, TYPE, DATA>>> {
45+
46+
private val executor: Executor =
47+
retrofit.callbackExecutor()
48+
?: ForkJoinPool.commonPool().takeIf { ForkJoinPool.getCommonPoolParallelism() > 1 }
49+
?: Executors.newCachedThreadPool()
50+
51+
private val responseType = object : ParameterizedType {
52+
override fun getActualTypeArguments(): Array<Type> {
53+
return arrayOf(idType, typeType, dataType)
54+
}
55+
56+
override fun getOwnerType(): Type? {
57+
return null
58+
}
59+
60+
override fun getRawType(): Type {
61+
return Flow.Publisher::class.java
62+
}
63+
}
64+
65+
override fun responseType(): Type = responseType
66+
67+
override fun adapt(call: Call<in Any>): Flow.Publisher<ServerSentEvent<ID, TYPE, DATA>> = SsePublisher(call)
68+
69+
inner class SsePublisher(
70+
private val call: Call<Any>,
71+
) : SubmissionPublisher<ServerSentEvent<ID, TYPE, DATA>>(executor, Flow.defaultBufferSize()) {
72+
override fun subscribe(subscriber: Flow.Subscriber<in ServerSentEvent<ID, TYPE, DATA>>?) {
73+
super.subscribe(subscriber)
74+
call.enqueue(PublisherCallback(this))
75+
}
76+
77+
override fun close() {
78+
call.cancel()
79+
super.close()
80+
}
81+
}
82+
83+
inner class PublisherCallback(private val publisher: SubmissionPublisher<ServerSentEvent<ID, TYPE, DATA>>) :
84+
Callback<Any> {
85+
override fun onResponse(call: Call<Any>, response: Response<Any>) {
86+
EventSources.processResponse(
87+
response.raw(),
88+
object : EventSourceListener() {
89+
override fun onEvent(eventSource: EventSource, id: String?, type: String?, data: String) {
90+
val convertedId =
91+
if (id != null) idConverter.convert(id) ?: error("Failed to convert $id to $idType") else null
92+
val convertedType =
93+
if (type != null) typeConverter.convert(type) ?: error("Failed to convert $type to $typeType") else null
94+
val convertedData = dataConverter.convert(data) ?: error("Failed to convert $data to $dataType")
95+
publisher.submit(ServerSentEvent(convertedId, convertedType, convertedData))
96+
}
97+
98+
override fun onClosed(eventSource: EventSource) {
99+
publisher.close()
100+
}
101+
102+
override fun onFailure(eventSource: EventSource, t: Throwable?, response: okhttp3.Response?) {
103+
publisher.closeExceptionally(t ?: RuntimeException()) // TODO
104+
}
105+
},
106+
)
107+
}
108+
109+
override fun onFailure(call: Call<Any>, t: Throwable) {
110+
publisher.closeExceptionally(t)
111+
}
112+
}
113+
114+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright (C) 2017 Square, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package retrofit2.adapter.sse.java9
17+
18+
import java.lang.reflect.ParameterizedType
19+
import java.lang.reflect.Type
20+
import java.util.concurrent.Flow
21+
import retrofit2.CallAdapter
22+
import retrofit2.Retrofit
23+
import retrofit2.adapter.sse.ServerSentEvent
24+
25+
class SseJucFlowCallAdapterFactory : CallAdapter.Factory() {
26+
companion object {
27+
private val EMPTY_ARRAY = emptyArray<Annotation>()
28+
}
29+
30+
override fun get(
31+
returnType: Type,
32+
annotations: Array<out Annotation?>,
33+
retrofit: Retrofit,
34+
): CallAdapter<*, *>? {
35+
if (getRawType(returnType) != Flow.Publisher::class.java) {
36+
return null
37+
}
38+
39+
if (returnType !is ParameterizedType) {
40+
error(
41+
"Flow.Publisher return type must be parameterized as Flow.Publisher<Foo> or Flow.Publisher<? extends Foo>",
42+
)
43+
}
44+
45+
val innerType = getParameterUpperBound(0, returnType)
46+
47+
if (getRawType(innerType) != ServerSentEvent::class.java) {
48+
return null
49+
}
50+
51+
if (innerType !is ParameterizedType) {
52+
error(
53+
"ServerSentEvent must be parameterized as ServerSentEvent<ID, TYPE, DATA>" +
54+
" or ServerSentEvent<? extends ID, ? extends TYPE, ? extends DATA>",
55+
)
56+
}
57+
58+
val idType = getParameterUpperBound(0, innerType)
59+
val typeType = getParameterUpperBound(1, innerType)
60+
val dataType = getParameterUpperBound(2, innerType)
61+
62+
return SseJucFlowCallAdapter(
63+
idType,
64+
typeType,
65+
dataType,
66+
retrofit.stringConverter(idType, EMPTY_ARRAY),
67+
retrofit.stringConverter(typeType, EMPTY_ARRAY),
68+
retrofit.stringConverter(dataType, EMPTY_ARRAY),
69+
retrofit,
70+
)
71+
}
72+
}

settings.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ include ':retrofit-adapters:rxjava'
3030
include ':retrofit-adapters:rxjava2'
3131
include ':retrofit-adapters:rxjava3'
3232
include ':retrofit-adapters:scala'
33+
include ':retrofit-adapters:sse-java9'
3334

3435
include ':retrofit-converters:gson'
3536
include ':retrofit-converters:guava'

0 commit comments

Comments
 (0)