How to Build a Stateless, Secure, and Asynchronous MCP-Style Protocol for a Degradable Agent Workflow

In this tutorial, we build a clean, advanced demonstration of modern MCP design by focusing on three core ideas: stateless communication, strong validation at the SDK level, and asynchronous, long-running operations. We implement a lightweight protocol similar to MCP using structured envelopes, signed requests, and Pydantic’s proven tools to demonstrate how agents and services can interact securely without relying on persistent sessions. Check out FULL CODES here.
import asyncio, time, json, uuid, hmac, hashlib
from dataclasses import dataclass
from typing import Any, Dict, Optional, Literal, List
from pydantic import BaseModel, Field, ValidationError, ConfigDict
def _now_ms():
return int(time.time() * 1000)
def _uuid():
return str(uuid.uuid4())
def _canonical_json(obj):
return json.dumps(obj, separators=(",", ":"), sort_keys=True).encode()
def _hmac_hex(secret, payload):
return hmac.new(secret, _canonical_json(payload), hashlib.sha256).hexdigest()
We set up key resources needed for the entire system, including timers, UUID generation, JSON canonical generation, and cryptographic signing. We ensure that all requests and responses can be signed and verified using HMAC. Check out FULL CODES here.
class MCPEnvelope(BaseModel):
model_config = ConfigDict(extra="forbid")
v: Literal["mcp/0.1"] = "mcp/0.1"
request_id: str = Field(default_factory=_uuid)
ts_ms: int = Field(default_factory=_now_ms)
client_id: str
server_id: str
tool: str
args: Dict[str, Any] = Field(default_factory=dict)
nonce: str = Field(default_factory=_uuid)
signature: str
class MCPResponse(BaseModel):
model_config = ConfigDict(extra="forbid")
v: Literal["mcp/0.1"] = "mcp/0.1"
request_id: str
ts_ms: int = Field(default_factory=_now_ms)
ok: bool
server_id: str
status: Literal["ok", "accepted", "running", "done", "error"]
result: Optional[Dict[str, Any]] = None
error: Optional[str] = None
signature: str
We describe the formal MCP envelope and response formats that all interactions follow. We implement robust schemas using Pydantic to ensure that invalid or unexpected fields are rejected early. It ensures consistent contracts between clients and servers, which is essential for SDK setup. Check out FULL CODES here.
class ServerIdentityOut(BaseModel):
model_config = ConfigDict(extra="forbid")
server_id: str
fingerprint: str
capabilities: Dict[str, Any]
class BatchSumIn(BaseModel):
model_config = ConfigDict(extra="forbid")
numbers: List[float] = Field(min_length=1)
class BatchSumOut(BaseModel):
model_config = ConfigDict(extra="forbid")
count: int
total: float
class StartLongTaskIn(BaseModel):
model_config = ConfigDict(extra="forbid")
seconds: int = Field(ge=1, le=20)
payload: Dict[str, Any] = Field(default_factory=dict)
class PollJobIn(BaseModel):
model_config = ConfigDict(extra="forbid")
job_id: str
We declare the validated input and output models for each tool exposed by the server. We use Pydantic constraints to clearly define what each tool accepts and returns. It makes the behavior of tools predictable and safe, even when invoked by LLM-driven agents. Check out FULL CODES here.
@dataclass
class JobState:
job_id: str
status: str
result: Optional[Dict[str, Any]] = None
error: Optional[str] = None
class MCPServer:
def __init__(self, server_id, secret):
self.server_id = server_id
self.secret = secret
self.jobs = {}
self.tasks = {}
def _fingerprint(self):
return hashlib.sha256(self.secret).hexdigest()[:16]
async def handle(self, env_dict, client_secret):
env = MCPEnvelope(**env_dict)
payload = env.model_dump()
sig = payload.pop("signature")
if _hmac_hex(client_secret, payload) != sig:
return {"error": "bad signature"}
if env.tool == "server_identity":
out = ServerIdentityOut(
server_id=self.server_id,
fingerprint=self._fingerprint(),
capabilities={"async": True, "stateless": True},
)
resp = MCPResponse(
request_id=env.request_id,
ok=True,
server_id=self.server_id,
status="ok",
result=out.model_dump(),
signature="",
)
elif env.tool == "batch_sum":
args = BatchSumIn(**env.args)
out = BatchSumOut(count=len(args.numbers), total=sum(args.numbers))
resp = MCPResponse(
request_id=env.request_id,
ok=True,
server_id=self.server_id,
status="ok",
result=out.model_dump(),
signature="",
)
elif env.tool == "start_long_task":
args = StartLongTaskIn(**env.args)
jid = _uuid()
self.jobs[jid] = JobState(jid, "running")
async def run():
await asyncio.sleep(args.seconds)
self.jobs[jid].status = "done"
self.jobs[jid].result = args.payload
self.tasks[jid] = asyncio.create_task(run())
resp = MCPResponse(
request_id=env.request_id,
ok=True,
server_id=self.server_id,
status="accepted",
result={"job_id": jid},
signature="",
)
elif env.tool == "poll_job":
args = PollJobIn(**env.args)
job = self.jobs[args.job_id]
resp = MCPResponse(
request_id=env.request_id,
ok=True,
server_id=self.server_id,
status=job.status,
result=job.result,
signature="",
)
payload = resp.model_dump()
resp.signature = _hmac_hex(self.secret, payload)
return resp.model_dump()
We use a stateless and async MCP server for its task management. We handle application authentication, tool deployment, and long-running execution without relying on session state. By returning job identifiers and allowing polling, we demonstrate non-blocking, multitasking. Check out FULL CODES here.
class MCPClient:
def __init__(self, client_id, secret, server):
self.client_id = client_id
self.secret = secret
self.server = server
async def call(self, tool, args=None):
env = MCPEnvelope(
client_id=self.client_id,
server_id=self.server.server_id,
tool=tool,
args=args or {},
signature="",
).model_dump()
env["signature"] = _hmac_hex(self.secret, {k: v for k, v in env.items() if k != "signature"})
return await self.server.handle(env, self.secret)
async def demo():
server_secret = b"server_secret"
client_secret = b"client_secret"
server = MCPServer("mcp-server-001", server_secret)
client = MCPClient("client-001", client_secret, server)
print(await client.call("server_identity"))
print(await client.call("batch_sum", {"numbers": [1, 2, 3]}))
start = await client.call("start_long_task", {"seconds": 2, "payload": {"task": "demo"}})
jid = start["result"]["job_id"]
while True:
poll = await client.call("poll_job", {"job_id": jid})
if poll["status"] == "done":
print(poll)
break
await asyncio.sleep(0.5)
await demo()
We build a lightweight stateless client that signs each request and interacts with the server through structured envelopes. We demonstrate synchronous calls, input validation failures, and asynchronous task polling in a single flow. It shows how clients can reliably use MCP-style services in real agent pipelines.
In conclusion, we have shown how MCP evolves from a simple interface for calling a tool into a robust protocol suitable for real-world systems. We started operations asynchronously and polled results without blocking execution, used transparent contracts with schema validation, and relied on stateless, signed messages to maintain security and flexibility. Together, these patterns show how modern MCP-style systems support reliable, enterprise-ready agent workflows while remaining simple, transparent, and easy to extend.
Check out FULL CODES here. Also, feel free to follow us Twitter and don’t forget to join our 100k+ ML SubReddit and Subscribe to Our newspaper. Wait! are you on telegram? now you can join us on telegram too.
Asif Razzaq is the CEO of Marktechpost Media Inc. As a visionary entrepreneur and engineer, Asif is committed to harnessing the power of Artificial Intelligence for the benefit of society. His latest endeavor is the launch of Artificial Intelligence Media Platform, Marktechpost, which stands out for its extensive coverage of machine learning and deep learning stories that sound technically sound and easily understood by a wide audience. The platform boasts of more than 2 million monthly views, which shows its popularity among viewers.



