Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
90 commits
Select commit Hold shift + click to select a range
de43e2a
merge fallout
bluestreak01 Feb 14, 2026
dee93bd
tidy
bluestreak01 Feb 14, 2026
ab2f5b5
move client test
bluestreak01 Feb 14, 2026
73ee623
more tidy
bluestreak01 Feb 14, 2026
49ecfa7
ilpv4 -> QWP (QuestDB Wire Protocol)
bluestreak01 Feb 14, 2026
10c306a
tidy
bluestreak01 Feb 14, 2026
f12a6f9
wip
bluestreak01 Feb 15, 2026
d442198
wip2
bluestreak01 Feb 15, 2026
ea798b8
wip3
bluestreak01 Feb 15, 2026
3e444a0
wip4
bluestreak01 Feb 15, 2026
dd4bb62
wip 5
bluestreak01 Feb 16, 2026
f609b5a
wip 9
bluestreak01 Feb 22, 2026
eb9531d
wip 11
bluestreak01 Feb 22, 2026
7f16328
wip 12
bluestreak01 Feb 22, 2026
8795081
move test workloads/benchmarks to the client maven module
jerrinot Feb 23, 2026
992cd8e
wip 13
bluestreak01 Feb 23, 2026
eacff2d
Merge branch 'jh_experiment_new_ilp' of https://github.com/questdb/ja…
bluestreak01 Feb 23, 2026
332b66f
Fix pong frame clobbering in-progress send buffer
mtopolnik Feb 24, 2026
82955d0
Fix buffer overrun in WebSocket close frame
mtopolnik Feb 24, 2026
8439f45
Echo close frame on WebSocket close (RFC 6455)
mtopolnik Feb 24, 2026
c41aa58
Add WebSocket fragmentation support (RFC 6455)
mtopolnik Feb 24, 2026
8457d1b
Cap recv buffer growth to prevent OOM
mtopolnik Feb 24, 2026
1dc87d7
Use ephemeral ports in WebSocket builder tests
mtopolnik Feb 24, 2026
00c145b
Auto-cleanup test code
mtopolnik Feb 24, 2026
dbca590
Don't default QUESTDB_RUNNING to true
mtopolnik Feb 24, 2026
34cc154
Fix case-sensitive header check in WebSocket handshake
mtopolnik Feb 24, 2026
2834b03
Use bulk copyMemory for WebSocket I/O
mtopolnik Feb 24, 2026
6a94139
Fix delta dict corruption on send failure
mtopolnik Feb 24, 2026
6d7a104
Fix TYPE_CHAR validation in QwpColumnDef
mtopolnik Feb 24, 2026
bed5c32
Throw LineSenderException for token in ws/wss config
mtopolnik Feb 24, 2026
924e26a
Fix racy batch ID counter in MicrobatchBuffer
mtopolnik Feb 24, 2026
34e0cf7
Throw on buffer overflow in QwpBitWriter and Gorilla encoder
mtopolnik Feb 24, 2026
ba49b6b
Add NativeBufferWriter tests
mtopolnik Feb 24, 2026
21aa7bb
Fix stale array offsets after cancelRow truncation
mtopolnik Feb 24, 2026
1ef6d9e
Pass auto-flush config to sync-mode WebSocket sender
mtopolnik Feb 24, 2026
38d96c7
Add DECIMAL64/128/256 to isFixedWidthType and getFixedTypeSize
mtopolnik Feb 24, 2026
e65c882
Pool ArrayCapture to eliminate per-row allocations
mtopolnik Feb 24, 2026
4c0c2ff
Remove dead parseBuffer allocation in ResponseReader
mtopolnik Feb 24, 2026
f71a1ee
Auto-clean up QwpBitReader
mtopolnik Feb 24, 2026
b85d890
Sort members alphabetically, remove section headings
mtopolnik Feb 25, 2026
6e13959
Fix Gorilla encoder bucket boundaries
mtopolnik Feb 25, 2026
b8514c0
Remove unused opcode param from beginFrame()
mtopolnik Feb 25, 2026
7a87c76
Use correct default port for WebSocket protocol
mtopolnik Feb 25, 2026
a274be5
Fix inc() not adding key to list
mtopolnik Feb 25, 2026
7d003e0
Validate low surrogate in UTF-8 encoding
mtopolnik Feb 25, 2026
11326f1
Eliminate hot-path allocations in waitForAck
mtopolnik Feb 25, 2026
5f54bc3
Wait for server ACKs on close() in async mode
mtopolnik Feb 25, 2026
b34ba1e
Add GEOHASH support to QWP table buffers
mtopolnik Feb 25, 2026
cf64a41
Use ChaCha20 CSPRNG for WebSocket masking keys
mtopolnik Feb 25, 2026
b6609cd
Simplify QwpBitReader.alignToByte()
mtopolnik Feb 25, 2026
8d16e0f
Fix putUtf8() lone surrogate encoding mismatch
mtopolnik Feb 25, 2026
9254ebf
Fix addSymbol() null on non-nullable columns
mtopolnik Feb 25, 2026
391fce4
Fix addNull() for array types on non-nullable columns
mtopolnik Feb 25, 2026
a92e7b4
Fix sendCloseFrame/sendPing clobbering sendBuffer
mtopolnik Feb 25, 2026
c627823
Fix WebSocketSendBuffer.grow() integer overflow
mtopolnik Feb 25, 2026
4f4f28b
Fix readFrom() leaking stale error messages
mtopolnik Feb 25, 2026
c984919
Fix reset() to clear all table buffers
mtopolnik Feb 25, 2026
93dce20
Fix ensureBits() 64-bit buffer overflow
mtopolnik Feb 25, 2026
1180216
Add bounds check to NativeBufferWriter.skip()
mtopolnik Feb 25, 2026
c55d982
Invalidate cached timestamp columns on flush
mtopolnik Feb 25, 2026
ecb595f
Fix putBlockOfBytes() long-to-int overflow
mtopolnik Feb 25, 2026
542a667
Merge sender state tests into one class
mtopolnik Feb 25, 2026
243c03e
Remove unused sendQueueCapacity parameter
mtopolnik Feb 26, 2026
c58b7cd
Clarify wire vs buffer type size javadoc
mtopolnik Feb 26, 2026
408c889
Round out GEOHASH support
mtopolnik Feb 26, 2026
11a416c
Deduplicate column encoding in QwpWebSocketEncoder
mtopolnik Feb 26, 2026
bde1a51
Delete WebSocketChannel and ResponseReader
mtopolnik Feb 26, 2026
f304d31
Detect decimal overflow on rescale in QwpTableBuffer
mtopolnik Feb 26, 2026
1e452c6
Port QwpGorillaEncoder tests from core
mtopolnik Feb 26, 2026
ec56390
Explain precondition in gorilla encoder
mtopolnik Feb 26, 2026
c81c80f
Add bounds assertion to jumpTo()
mtopolnik Feb 26, 2026
99876bc
Avoid BigDecimal allocation in decimalColumn
mtopolnik Feb 26, 2026
a999a90
Remove redundant local variable
mtopolnik Feb 26, 2026
19ac1c6
Fix UUID storage allocation in QwpTableBuffer
mtopolnik Feb 26, 2026
c1500f8
Delete unused getSymbolsInRange()
mtopolnik Feb 26, 2026
2a11d37
Move GlobalSymbolDictionaryTest to client
mtopolnik Feb 26, 2026
8630ead
Move DeltaSymbolDictionaryTest to client
mtopolnik Feb 26, 2026
793f6f6
Move 5 pure client tests to client submodule
mtopolnik Feb 26, 2026
233f4ed
Port 5 QWP protocol tests from core
mtopolnik Feb 26, 2026
2ee3ac0
Use VarHandle for InFlightWindow statistics
mtopolnik Feb 26, 2026
8a6cbe8
Replace busy-wait with monitor wait in close()
mtopolnik Feb 26, 2026
e153806
Validate Upgrade and Connection headers in WS handshake
mtopolnik Feb 26, 2026
859c7cb
Use SecureRnd for WebSocket handshake key
mtopolnik Feb 26, 2026
9e91498
Clean up client QwpConstants
mtopolnik Feb 26, 2026
e6ef8e8
Remove serverMode from client WebSocketFrameParser
mtopolnik Feb 26, 2026
44f4db0
Style cleanup in Sender
mtopolnik Feb 26, 2026
6867c50
Delete unused code
mtopolnik Feb 26, 2026
099c183
Clean up WebSocketClientFactory
mtopolnik Feb 26, 2026
254c7c3
Fix copyright year
mtopolnik Feb 26, 2026
2a14356
Delete duplicate method utf8Length()
mtopolnik Feb 26, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
<maven.javadoc.skip>false</maven.javadoc.skip>
<outputPath>target</outputPath>
<runtime.assembly>none</runtime.assembly>
<javac.target>11</javac.target>
<javac.target>17</javac.target>
<argLine>-ea -Dfile.encoding=UTF-8 -XX:+UseParallelGC</argLine>
<test.exclude>None</test.exclude>
<test.include>%regex[.*[^o].class]</test.include><!-- exclude module-info.class-->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public BuildInformationHolder(Class<?> clazz) {
String swVersion;
try {
final Attributes manifestAttributes = getManifestAttributes(clazz);
swVersion = getAttr(manifestAttributes, "QuestDB-Client-Version", "[DEVELOPMENT]");
swVersion = getAttr(manifestAttributes, "[DEVELOPMENT]");
} catch (IOException e) {
swVersion = UNKNOWN;
}
Expand All @@ -57,8 +57,8 @@ public String getSwVersion() {
return swVersion;
}

private static String getAttr(final Attributes manifestAttributes, String attributeName, String defaultValue) {
final String value = manifestAttributes.getValue(attributeName);
private static String getAttr(final Attributes manifestAttributes, String defaultValue) {
final String value = manifestAttributes.getValue("QuestDB-Client-Version");
return value != null ? value : defaultValue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ default int getMaximumRequestBufferSize() {
return Integer.MAX_VALUE;
}

default int getMaximumResponseBufferSize() {
return Integer.MAX_VALUE;
}

default NetworkFacade getNetworkFacade() {
return NetworkFacadeImpl.INSTANCE;
}
Expand Down
172 changes: 164 additions & 8 deletions core/src/main/java/io/questdb/client/Sender.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.questdb.client.cutlass.line.http.AbstractLineHttpSender;
import io.questdb.client.cutlass.line.tcp.DelegatingTlsChannel;
import io.questdb.client.cutlass.line.tcp.PlainTcpLineChannel;
import io.questdb.client.cutlass.qwp.client.QwpWebSocketSender;
import io.questdb.client.impl.ConfStringParser;
import io.questdb.client.network.NetworkFacade;
import io.questdb.client.network.NetworkFacadeImpl;
Expand Down Expand Up @@ -95,7 +96,7 @@
* 2. Call {@link #reset()} to clear the internal buffers and start building a new row
* <br>
* Note: If the underlying error is permanent, retrying {@link #flush()} will fail again.
* Use {@link #reset()} to discard the problematic data and continue with new data. See {@link LineSenderException#isRetryable()}
* Use {@link #reset()} to discard the problematic data and continue with new data.
*
*/
public interface Sender extends Closeable, ArraySender<Sender> {
Expand All @@ -108,7 +109,7 @@ public interface Sender extends Closeable, ArraySender<Sender> {
/**
* Create a Sender builder instance from a configuration string.
* <br>
* This allows to use the configuration string as a template for creating a Sender builder instance and then
* This allows using the configuration string as a template for creating a Sender builder instance and then
* tune options which are not available in the configuration string. Configurations options specified in the
* configuration string cannot be overridden via the builder methods.
* <p>
Expand Down Expand Up @@ -144,7 +145,12 @@ static LineSenderBuilder builder(CharSequence configurationString) {
* @return Builder object to create a new Sender instance.
*/
static LineSenderBuilder builder(Transport transport) {
return new LineSenderBuilder(transport == Transport.HTTP ? LineSenderBuilder.PROTOCOL_HTTP : LineSenderBuilder.PROTOCOL_TCP);
int protocol = switch (transport) {
case HTTP -> LineSenderBuilder.PROTOCOL_HTTP;
case TCP -> LineSenderBuilder.PROTOCOL_TCP;
case WEBSOCKET -> LineSenderBuilder.PROTOCOL_WEBSOCKET;
};
return new LineSenderBuilder(protocol);
}

/**
Expand Down Expand Up @@ -461,7 +467,15 @@ enum Transport {
* and for use-cases where HTTP transport is not suitable, when communicating with a QuestDB server over a high-latency
* network
*/
TCP
TCP,

/**
* Use WebSocket transport to communicate with a QuestDB server.
* <p>
* WebSocket transport uses the ILP v4 binary protocol for efficient data ingestion.
* It supports both synchronous and asynchronous modes with flow control.
*/
WEBSOCKET
}

/**
Expand Down Expand Up @@ -508,12 +522,17 @@ final class LineSenderBuilder {
private static final int DEFAULT_BUFFER_CAPACITY = 64 * 1024;
private static final int DEFAULT_HTTP_PORT = 9000;
private static final int DEFAULT_HTTP_TIMEOUT = 30_000;
private static final int DEFAULT_IN_FLIGHT_WINDOW_SIZE = 8;
private static final int DEFAULT_MAXIMUM_BUFFER_CAPACITY = 100 * 1024 * 1024;
private static final int DEFAULT_MAX_BACKOFF_MILLIS = 1_000;
private static final int DEFAULT_MAX_NAME_LEN = 127;
private static final long DEFAULT_MAX_RETRY_NANOS = TimeUnit.SECONDS.toNanos(10); // keep sync with the contract of the configuration method
private static final long DEFAULT_MIN_REQUEST_THROUGHPUT = 100 * 1024; // 100KB/s, keep in sync with the contract of the configuration method
private static final int DEFAULT_TCP_PORT = 9009;
private static final int DEFAULT_WEBSOCKET_PORT = 9000;
private static final int DEFAULT_WS_AUTO_FLUSH_BYTES = 1024 * 1024; // 1MB
private static final long DEFAULT_WS_AUTO_FLUSH_INTERVAL_NANOS = 100_000_000L; // 100ms
private static final int DEFAULT_WS_AUTO_FLUSH_ROWS = 500;
private static final int MIN_BUFFER_SIZE = AuthUtils.CHALLENGE_LEN + 1; // challenge size + 1;
// The PARAMETER_NOT_SET_EXPLICITLY constant is used to detect if a parameter was set explicitly in configuration parameters
// where it matters. This is needed to detect invalid combinations of parameters. Why?
Expand All @@ -522,15 +541,19 @@ final class LineSenderBuilder {
private static final int PARAMETER_NOT_SET_EXPLICITLY = -1;
private static final int PROTOCOL_HTTP = 1;
private static final int PROTOCOL_TCP = 0;
private static final int PROTOCOL_WEBSOCKET = 2;
private final ObjList<String> hosts = new ObjList<>();
private final IntList ports = new IntList();
private boolean asyncMode = false;
private int autoFlushBytes = PARAMETER_NOT_SET_EXPLICITLY;
private int autoFlushIntervalMillis = PARAMETER_NOT_SET_EXPLICITLY;
private int autoFlushRows = PARAMETER_NOT_SET_EXPLICITLY;
private int bufferCapacity = PARAMETER_NOT_SET_EXPLICITLY;
private String httpPath;
private String httpSettingsPath;
private int httpTimeout = PARAMETER_NOT_SET_EXPLICITLY;
private String httpToken;
private int inFlightWindowSize = PARAMETER_NOT_SET_EXPLICITLY;
private String keyId;
private int maxBackoffMillis = PARAMETER_NOT_SET_EXPLICITLY;
private int maxNameLength = PARAMETER_NOT_SET_EXPLICITLY;
Expand Down Expand Up @@ -658,6 +681,47 @@ public AdvancedTlsSettings advancedTls() {
return new AdvancedTlsSettings();
}

/**
* Enable asynchronous mode for WebSocket transport.
* <br>
* In async mode, rows are batched and sent asynchronously with flow control.
* This provides higher throughput at the cost of more complex error handling.
* <br>
* This is only used when communicating over WebSocket transport.
* <br>
* Default is synchronous mode (false).
*
* @param enabled whether to enable async mode
* @return this instance for method chaining
*/
public LineSenderBuilder asyncMode(boolean enabled) {
this.asyncMode = enabled;
return this;
}

/**
* Set the maximum number of bytes per batch before auto-flushing.
* <br>
* This is only used when communicating over WebSocket transport.
* <br>
* Default value is 1MB.
*
* @param bytes maximum bytes per batch
* @return this instance for method chaining
*/
public LineSenderBuilder autoFlushBytes(int bytes) {
if (this.autoFlushBytes != PARAMETER_NOT_SET_EXPLICITLY) {
throw new LineSenderException("auto flush bytes was already configured")
.put("[bytes=").put(this.autoFlushBytes).put("]");
}
if (bytes < 0) {
throw new LineSenderException("auto flush bytes cannot be negative")
.put("[bytes=").put(bytes).put("]");
}
this.autoFlushBytes = bytes;
return this;
}

/**
* Set the interval in milliseconds at which the Sender automatically flushes its buffer.
* <br>
Expand Down Expand Up @@ -791,6 +855,40 @@ public Sender build() {
username, password, maxNameLength, actualMaxRetriesNanos, maxBackoffMillis, actualMinRequestThroughput, actualAutoFlushIntervalMillis, protocolVersion);
}

if (protocol == PROTOCOL_WEBSOCKET) {
if (hosts.size() != 1 || ports.size() != 1) {
throw new LineSenderException("only a single address (host:port) is supported for WebSocket transport");
}

int actualAutoFlushRows = autoFlushRows == PARAMETER_NOT_SET_EXPLICITLY ? DEFAULT_WS_AUTO_FLUSH_ROWS : autoFlushRows;
int actualAutoFlushBytes = autoFlushBytes == PARAMETER_NOT_SET_EXPLICITLY ? DEFAULT_WS_AUTO_FLUSH_BYTES : autoFlushBytes;
long actualAutoFlushIntervalNanos = autoFlushIntervalMillis == PARAMETER_NOT_SET_EXPLICITLY
? DEFAULT_WS_AUTO_FLUSH_INTERVAL_NANOS
: TimeUnit.MILLISECONDS.toNanos(autoFlushIntervalMillis);
int actualInFlightWindowSize = inFlightWindowSize == PARAMETER_NOT_SET_EXPLICITLY ? DEFAULT_IN_FLIGHT_WINDOW_SIZE : inFlightWindowSize;

if (asyncMode) {
return QwpWebSocketSender.connectAsync(
hosts.getQuick(0),
ports.getQuick(0),
tlsEnabled,
actualAutoFlushRows,
actualAutoFlushBytes,
actualAutoFlushIntervalNanos,
actualInFlightWindowSize
);
} else {
return QwpWebSocketSender.connect(
hosts.getQuick(0),
ports.getQuick(0),
tlsEnabled,
actualAutoFlushRows,
actualAutoFlushBytes,
actualAutoFlushIntervalNanos
);
}
}

assert protocol == PROTOCOL_TCP;

if (hosts.size() != 1 || ports.size() != 1) {
Expand Down Expand Up @@ -1048,6 +1146,29 @@ public LineSenderBuilder httpUsernamePassword(String username, String password)
return this;
}

/**
* Set the maximum number of batches that can be in-flight awaiting server acknowledgment.
* <br>
* This is only used when communicating over WebSocket transport with async mode enabled.
* <br>
* Default value is 8.
*
* @param size maximum number of in-flight batches
* @return this instance for method chaining
*/
public LineSenderBuilder inFlightWindowSize(int size) {
if (this.inFlightWindowSize != PARAMETER_NOT_SET_EXPLICITLY) {
throw new LineSenderException("in-flight window size was already configured")
.put("[size=").put(this.inFlightWindowSize).put("]");
}
if (size < 1) {
throw new LineSenderException("in-flight window size must be positive")
.put("[size=").put(size).put("]");
}
this.inFlightWindowSize = size;
return this;
}

/**
* Configures the maximum backoff time between retry attempts when the Sender encounters recoverable errors.
* <br>
Expand Down Expand Up @@ -1275,7 +1396,13 @@ private void configureDefaults() {
maximumBufferCapacity = protocol == PROTOCOL_HTTP ? DEFAULT_MAXIMUM_BUFFER_CAPACITY : bufferCapacity;
}
if (ports.size() == 0) {
ports.add(protocol == PROTOCOL_HTTP ? DEFAULT_HTTP_PORT : DEFAULT_TCP_PORT);
if (protocol == PROTOCOL_HTTP) {
ports.add(DEFAULT_HTTP_PORT);
} else if (protocol == PROTOCOL_WEBSOCKET) {
ports.add(DEFAULT_WEBSOCKET_PORT);
} else {
ports.add(DEFAULT_TCP_PORT);
}
}
if (tlsValidationMode == null) {
tlsValidationMode = TlsValidationMode.DEFAULT;
Expand Down Expand Up @@ -1334,8 +1461,16 @@ private LineSenderBuilder fromConfig(CharSequence configurationString) {
} else if (Chars.equals("tcps", sink)) {
tcp();
tlsEnabled = true;
} else if (Chars.equals("ws", sink)) {
if (tlsEnabled) {
throw new LineSenderException("cannot use ws protocol when TLS is enabled. use wss instead");
}
websocket();
} else if (Chars.equals("wss", sink)) {
websocket();
tlsEnabled = true;
} else {
throw new LineSenderException("invalid schema [schema=").put(sink).put(", supported-schemas=[http, https, tcp, tcps]]");
throw new LineSenderException("invalid schema [schema=").put(sink).put(", supported-schemas=[http, https, tcp, tcps, ws, wss]]");
}

String tcpToken = null;
Expand All @@ -1357,7 +1492,9 @@ private LineSenderBuilder fromConfig(CharSequence configurationString) {
address(sink);
if (ports.size() == hosts.size() - 1) {
// not set
port(protocol == PROTOCOL_TCP ? DEFAULT_TCP_PORT : DEFAULT_HTTP_PORT);
port(protocol == PROTOCOL_TCP ? DEFAULT_TCP_PORT
: protocol == PROTOCOL_WEBSOCKET ? DEFAULT_WEBSOCKET_PORT
: DEFAULT_HTTP_PORT);
}
} else if (Chars.equals("user", sink)) {
// deprecated key: user, new key: username
Expand Down Expand Up @@ -1414,7 +1551,7 @@ private LineSenderBuilder fromConfig(CharSequence configurationString) {
} else if (protocol == PROTOCOL_HTTP) {
httpToken(sink.toString());
} else {
throw new AssertionError();
throw new LineSenderException("token is not supported for WebSocket protocol");
}
} else if (Chars.equals("retry_timeout", sink)) {
pos = getValue(configurationString, pos, sink, "retry_timeout");
Expand Down Expand Up @@ -1617,12 +1754,31 @@ private void validateParameters() {
if (autoFlushIntervalMillis != PARAMETER_NOT_SET_EXPLICITLY) {
throw new LineSenderException("auto flush interval is not supported for TCP protocol");
}
} else if (protocol == PROTOCOL_WEBSOCKET) {
if (privateKey != null) {
throw new LineSenderException("TCP authentication is not supported for WebSocket protocol");
}
if (httpToken != null || username != null || password != null) {
// TODO: WebSocket auth not yet implemented
throw new LineSenderException("Authentication is not yet supported for WebSocket protocol");
}
if (inFlightWindowSize != PARAMETER_NOT_SET_EXPLICITLY && !asyncMode) {
throw new LineSenderException("in-flight window size requires async mode");
}
} else {
throw new LineSenderException("unsupported protocol ")
.put("[protocol=").put(protocol).put("]");
}
}

private void websocket() {
if (protocol != PARAMETER_NOT_SET_EXPLICITLY) {
throw new LineSenderException("protocol was already configured ")
.put("[protocol=").put(protocol).put("]");
}
protocol = PROTOCOL_WEBSOCKET;
}

public class AdvancedTlsSettings {
/**
* Configure a custom truststore. This is only needed when using {@link #enableTls()} when your default
Expand Down
Loading
Loading