Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
4939e5d
refactor(service): remove uses of tokio::spawn
AadamZ5 Feb 28, 2026
3b79366
refactor(operation-processor): remove uses of tokio::spawn
AadamZ5 Feb 28, 2026
8d6655b
refactor(progress): remove need for spawning on drop
AadamZ5 Mar 1, 2026
dc4204c
refactor(child-process): wip experiment with new child process transport
AadamZ5 Mar 1, 2026
1db16c8
refactor(child-process): implement tokio child process and use in test
AadamZ5 Mar 1, 2026
d0bd6ca
refactor(child-process): continue to build command abstraction
AadamZ5 Mar 1, 2026
02b53cd
refactor(child-process): add env to command, move builder to separate…
AadamZ5 Mar 1, 2026
86977b6
refactor(example): fix example compilation
AadamZ5 Mar 1, 2026
b746384
refactor(child-process): rename module back to "child-process"
AadamZ5 Mar 1, 2026
0adab25
refactor(test): re-introduce tests for child process dropping
AadamZ5 Mar 1, 2026
007dd92
refactor: revert some unnecessary module visibility changes
AadamZ5 Mar 1, 2026
319a77a
refactor(tests): update calls to serve in all unit tests
AadamZ5 Mar 3, 2026
aee0d4f
refactor(test,examples): update remaining calls to `serve(...)`
AadamZ5 Mar 3, 2026
5482545
refactor(docs): update docs to new call convention for serving
AadamZ5 Mar 3, 2026
a35067a
refactor(http): change to futures unordered
AadamZ5 Mar 4, 2026
a0724d3
refactor(worker): remove spawn from worker
AadamZ5 Mar 4, 2026
d85b6df
refactor(http): explicitly bubble work task up to be spawned
AadamZ5 Mar 5, 2026
7fa7a3f
fix(docs): fix doc examples so they compile
AadamZ5 Mar 5, 2026
d526a9b
refactor(http): use different timeout API for futures
AadamZ5 Mar 5, 2026
595cb6f
Merge branch 'main' into dev/remove-tokio-rt
AadamZ5 Mar 5, 2026
4732493
chore(examples): cleanup examples so they compile
AadamZ5 Mar 5, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,32 @@ Json Schema generation (version 2020-12):
<summary>Start a client</summary>

```rust, ignore
use rmcp::{ServiceExt, transport::{TokioChildProcess, ConfigureCommandExt}};
use rmcp::{
ServiceExt,
transport::{CommandBuilder, ChildProcessTransport, tokio::TokioChildProcessRunner}
};
use tokio::process::Command;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = ().serve(TokioChildProcess::new(Command::new("npx").configure(|cmd| {
cmd.arg("-y").arg("@modelcontextprotocol/server-everything");
}))?).await?;

// Build and spawn a child process
let command = CommandBuilder::<TokioChildProcessRunner>::new("npx")
.arg("-y")
.arg("@modelcontextprotocol/server-everything")
.spawn_dyn()?

// Create a transport via the child process's STDIN and STDOUT streams
let transport = ChildProcessTransport::new(command)?

let (client, work) = ().serve(transport).await?;
// Spawn the async work loop on the background
tokio::spawn(work);

// Use the client ...

// Finish using the client
client.cancel().await;
Ok(())
}
```
Expand Down Expand Up @@ -99,7 +117,7 @@ let service = common::counter::Counter::new();

```rust, ignore
// this call will finish the initialization process
let server = service.serve(transport).await?;
let (server, work) = service.serve(transport).await?;
```
</details>

Expand Down
97 changes: 63 additions & 34 deletions conformance/src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,12 +320,16 @@ async fn perform_oauth_flow_preregistered(
async fn run_auth_client(server_url: &str, ctx: &ConformanceContext) -> anyhow::Result<()> {
let auth_client = perform_oauth_flow(server_url, ctx).await?;

let transport = StreamableHttpClientTransport::with_client(
let (transport, http_work) = StreamableHttpClientTransport::with_client(
auth_client,
StreamableHttpClientTransportConfig::with_uri(server_url),
);
tokio::spawn(http_work);

let (client, work) = BasicClientHandler.serve(transport).await?;
// Run the client work loop in the background while we interact with it
tokio::spawn(work);

let client = BasicClientHandler.serve(transport).await?;
tracing::debug!("Connected (authenticated)");

let tools = client.list_tools(Default::default()).await?;
Expand All @@ -344,7 +348,7 @@ async fn run_auth_client(server_url: &str, ctx: &ConformanceContext) -> anyhow::
.await;
}

client.cancel().await?;
client.cancel().await;
Ok(())
}

Expand Down Expand Up @@ -374,12 +378,15 @@ async fn run_auth_scope_step_up_client(
.ok_or_else(|| anyhow::anyhow!("No AM"))?;
let auth_client = AuthClient::new(reqwest::Client::default(), am);

let transport = StreamableHttpClientTransport::with_client(
let (transport, http_work) = StreamableHttpClientTransport::with_client(
auth_client.clone(),
StreamableHttpClientTransportConfig::with_uri(server_url),
);
tokio::spawn(http_work);

let client = BasicClientHandler.serve(transport).await?;
let (client, work) = BasicClientHandler.serve(transport).await?;
// Run the client work loop in the background while we interact with it
tokio::spawn(work);

let tools = client.list_tools(Default::default()).await?;
tracing::debug!("Listed {} tools", tools.tools.len());
Expand All @@ -402,7 +409,7 @@ async fn run_auth_scope_step_up_client(
Err(_) => {
tracing::debug!("Tool call failed (likely 403), attempting scope upgrade...");
// Drop old client, re-auth with upgraded scopes
client.cancel().await.ok();
client.cancel().await;

// Re-do the full flow; the server will give us the right scopes
// on the second authorization request.
Expand All @@ -422,11 +429,13 @@ async fn run_auth_scope_step_up_client(

let am2 = oauth2.into_authorization_manager().unwrap();
let auth_client2 = AuthClient::new(reqwest::Client::default(), am2);
let transport2 = StreamableHttpClientTransport::with_client(
let (transport2, http_work2) = StreamableHttpClientTransport::with_client(
auth_client2,
StreamableHttpClientTransportConfig::with_uri(server_url),
);
let client2 = BasicClientHandler.serve(transport2).await?;
tokio::spawn(http_work2);
let (client2, work2) = BasicClientHandler.serve(transport2).await?;
tokio::spawn(work2);
let _ = client2
.call_tool(CallToolRequestParams {
meta: None,
Expand All @@ -435,13 +444,13 @@ async fn run_auth_scope_step_up_client(
task: None,
})
.await;
client2.cancel().await.ok();
client2.cancel().await;
return Ok(());
}
}
}

client.cancel().await?;
client.cancel().await;
Ok(())
}

Expand Down Expand Up @@ -469,12 +478,15 @@ async fn run_auth_scope_retry_limit_client(

let am = oauth.into_authorization_manager().unwrap();
let auth_client = AuthClient::new(reqwest::Client::default(), am);
let transport = StreamableHttpClientTransport::with_client(
let (transport, http_work) = StreamableHttpClientTransport::with_client(
auth_client,
StreamableHttpClientTransportConfig::with_uri(server_url),
);
tokio::spawn(http_work);

let (client, work) = BasicClientHandler.serve(transport).await?;
tokio::spawn(work);

let client = BasicClientHandler.serve(transport).await?;
let tools = client.list_tools(Default::default()).await?;

let mut got_403 = false;
Expand All @@ -496,7 +508,7 @@ async fn run_auth_scope_retry_limit_client(
}
}
}
client.cancel().await.ok();
client.cancel().await;

if !got_403 {
break;
Expand Down Expand Up @@ -527,12 +539,15 @@ async fn run_auth_preregistered_client(
let auth_client =
perform_oauth_flow_preregistered(server_url, client_id, client_secret).await?;

let transport = StreamableHttpClientTransport::with_client(
let (transport, http_work) = StreamableHttpClientTransport::with_client(
auth_client,
StreamableHttpClientTransportConfig::with_uri(server_url),
);
tokio::spawn(http_work);

let (client, work) = BasicClientHandler.serve(transport).await?;
tokio::spawn(work);

let client = BasicClientHandler.serve(transport).await?;
let tools = client.list_tools(Default::default()).await?;
tracing::debug!("Listed {} tools", tools.tools.len());

Expand All @@ -547,7 +562,7 @@ async fn run_auth_preregistered_client(
})
.await;
}
client.cancel().await?;
client.cancel().await;
Ok(())
}

Expand Down Expand Up @@ -585,13 +600,16 @@ async fn run_client_credentials_basic(
.ok_or_else(|| anyhow::anyhow!("No access_token in response"))?;

// Use static token
let transport = StreamableHttpClientTransport::with_client(
let (transport, http_work) = StreamableHttpClientTransport::with_client(
reqwest::Client::default(),
StreamableHttpClientTransportConfig::with_uri(server_url)
.auth_header(access_token.to_string()),
);
tokio::spawn(http_work);

let (client, work) = BasicClientHandler.serve(transport).await?;
tokio::spawn(work);

let client = BasicClientHandler.serve(transport).await?;
let tools = client.list_tools(Default::default()).await?;
tracing::debug!("Listed {} tools", tools.tools.len());
for tool in &tools.tools {
Expand All @@ -605,7 +623,7 @@ async fn run_client_credentials_basic(
})
.await;
}
client.cancel().await?;
client.cancel().await;
Ok(())
}

Expand Down Expand Up @@ -655,13 +673,16 @@ async fn run_client_credentials_jwt(
.as_str()
.ok_or_else(|| anyhow::anyhow!("No access_token: {}", token_resp))?;

let transport = StreamableHttpClientTransport::with_client(
let (transport, http_work) = StreamableHttpClientTransport::with_client(
reqwest::Client::default(),
StreamableHttpClientTransportConfig::with_uri(server_url)
.auth_header(access_token.to_string()),
);
tokio::spawn(http_work);

let (client, work) = BasicClientHandler.serve(transport).await?;
tokio::spawn(work);

let client = BasicClientHandler.serve(transport).await?;
let tools = client.list_tools(Default::default()).await?;
tracing::debug!("Listed {} tools", tools.tools.len());
for tool in &tools.tools {
Expand All @@ -675,7 +696,7 @@ async fn run_client_credentials_jwt(
})
.await;
}
client.cancel().await?;
client.cancel().await;
Ok(())
}

Expand Down Expand Up @@ -825,17 +846,21 @@ fn build_tool_arguments(tool: &Tool) -> Option<serde_json::Map<String, Value>> {
// ─── Non-auth scenarios ─────────────────────────────────────────────────────

async fn run_basic_client(server_url: &str) -> anyhow::Result<()> {
let transport = StreamableHttpClientTransport::from_uri(server_url);
let client = BasicClientHandler.serve(transport).await?;
let (transport, http_work) = StreamableHttpClientTransport::from_uri(server_url);
tokio::spawn(http_work);
let (client, work) = BasicClientHandler.serve(transport).await?;
tokio::spawn(work);
let tools = client.list_tools(Default::default()).await?;
tracing::debug!("Listed {} tools", tools.tools.len());
client.cancel().await?;
client.cancel().await;
Ok(())
}

async fn run_tools_call_client(server_url: &str) -> anyhow::Result<()> {
let transport = StreamableHttpClientTransport::from_uri(server_url);
let client = FullClientHandler.serve(transport).await?;
let (transport, http_work) = StreamableHttpClientTransport::from_uri(server_url);
tokio::spawn(http_work);
let (client, work) = FullClientHandler.serve(transport).await?;
tokio::spawn(work);
let tools = client.list_tools(Default::default()).await?;
for tool in &tools.tools {
let args = build_tool_arguments(tool);
Expand All @@ -848,13 +873,15 @@ async fn run_tools_call_client(server_url: &str) -> anyhow::Result<()> {
})
.await?;
}
client.cancel().await?;
client.cancel().await;
Ok(())
}

async fn run_elicitation_defaults_client(server_url: &str) -> anyhow::Result<()> {
let transport = StreamableHttpClientTransport::from_uri(server_url);
let client = ElicitationDefaultsClientHandler.serve(transport).await?;
let (transport, http_work) = StreamableHttpClientTransport::from_uri(server_url);
tokio::spawn(http_work);
let (client, work) = ElicitationDefaultsClientHandler.serve(transport).await?;
tokio::spawn(work);
let tools = client.list_tools(Default::default()).await?;
let test_tool = tools.tools.iter().find(|t| {
let n = t.name.as_ref();
Expand All @@ -870,13 +897,15 @@ async fn run_elicitation_defaults_client(server_url: &str) -> anyhow::Result<()>
})
.await?;
}
client.cancel().await?;
client.cancel().await;
Ok(())
}

async fn run_sse_retry_client(server_url: &str) -> anyhow::Result<()> {
let transport = StreamableHttpClientTransport::from_uri(server_url);
let client = BasicClientHandler.serve(transport).await?;
let (transport, http_work) = StreamableHttpClientTransport::from_uri(server_url);
tokio::spawn(http_work);
let (client, work) = BasicClientHandler.serve(transport).await?;
tokio::spawn(work);
let tools = client.list_tools(Default::default()).await?;
if let Some(tool) = tools
.tools
Expand All @@ -892,7 +921,7 @@ async fn run_sse_retry_client(server_url: &str) -> anyhow::Result<()> {
})
.await?;
}
client.cancel().await?;
client.cancel().await;
Ok(())
}

Expand Down
3 changes: 2 additions & 1 deletion conformance/src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -943,11 +943,12 @@ async fn main() -> anyhow::Result<()> {
stateful_mode: true,
..Default::default()
};
let service = StreamableHttpService::new(
let (service, http_work) = StreamableHttpService::new(
move || Ok(server.clone()),
LocalSessionManager::default().into(),
config,
);
tokio::spawn(http_work);

let router = axum::Router::new().nest_service("/mcp", service);

Expand Down
Loading