How It Works
This page explains the key implementation patterns that enable ARES’s RL abstraction. Understanding these details isn’t required to use ARES, but can help when debugging, extending the framework, or implementing custom components.
Queue-Mediated Communication
The most critical implementation pattern in ARES is the queue-mediated LLM client. This pattern enables the RL abstraction by intercepting LLM calls from code agents transparently.
The Problem
How do you create an RL environment where:
Code agents are written in natural, linear code (reason → execute → repeat)
The environment exposes LLM interactions as observations and actions
Agents remain unaware of the RL loop
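To make the goal concrete, the sketch below shows the kind of agent this pattern targets. It is not ARES code: `linear_agent`, `make_scripted_client`, and the string-typed requests are hypothetical. The agent depends only on an awaitable `llm_client`, so it cannot tell whether that callable reaches a real API, a scripted fake, or an RL environment.

```python
import asyncio
from typing import Awaitable, Callable, List

# Hypothetical signature: requests and responses are plain strings in this sketch.
LLMCall = Callable[[str], Awaitable[str]]


async def linear_agent(llm_client: LLMCall, task: str, max_turns: int = 5) -> List[str]:
    """Toy code agent: reason -> execute -> repeat, with no RL concepts anywhere."""
    history: List[str] = []
    prompt = f"Task: {task}"
    for _ in range(max_turns):
        command = await llm_client(prompt)  # reason: ask the model what to do next
        if command == "DONE":
            break
        output = f"ran {command}"  # execute: a real agent would run it in a container
        history.append(output)
        prompt = f"Observation: {output}"  # feed the result back and repeat
    return history


def make_scripted_client(replies: List[str]) -> LLMCall:
    """Fake LLM client that returns canned replies in order."""
    remaining = iter(replies)

    async def client(prompt: str) -> str:
        return next(remaining)

    return client


if __name__ == "__main__":
    client = make_scripted_client(["ls", "pytest", "DONE"])
    print(asyncio.run(linear_agent(client, "fix the failing test")))
```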
How It Works
The QueueMediatedLLMClient implements the LLMClient protocol, but instead of making API calls, it:
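Puts requests into an async queue: When the code agent calls await llm_client(request), the request is placed on an asyncio.Queue together with a fresh Future
Waits on a Future: The call suspends the agent's coroutine until someone sets that Future's result
Returns the response: The code agent continues with that response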
Meanwhile, the environment:
Watches the queue: Extracts LLMRequest objects as they arrive
Exposes them as observations: Returns them from reset() and step()
Provides responses: When you call step(action), sets the Future’s result
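The handshake can be reproduced in a minimal, self-contained sketch, assuming plain strings in place of `LLMRequest`/`LLMResponse`. `ValueAndFuture` and `QueueMediatedClient` here are simplified local stand-ins, not ARES's classes, and the `demo` coroutine plays the environment's role by hand: it reads the pending request from the queue and answers it by resolving the Future.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field
from typing import Generic, TypeVar

V = TypeVar("V")
R = TypeVar("R")


@dataclass
class ValueAndFuture(Generic[V, R]):
    """Local stand-in: a request paired with the Future that will carry its answer."""
    value: V
    future: asyncio.Future[R]


@dataclass(frozen=True)
class QueueMediatedClient:
    """Hypothetical, simplified client: requests and responses are plain strings."""
    q: asyncio.Queue[ValueAndFuture[str, str]] = field(default_factory=asyncio.Queue)

    async def __call__(self, request: str) -> str:
        future: asyncio.Future[str] = asyncio.get_running_loop().create_future()
        await self.q.put(ValueAndFuture(value=request, future=future))
        return await future  # suspends this coroutine until set_result() is called


async def demo() -> str:
    client = QueueMediatedClient()
    # Agent side: an ordinary await on the client, running as a background task.
    agent = asyncio.create_task(client("What is 2 + 2?"))
    # Environment side: take the pending request off the queue ("observation")...
    pending = await client.q.get()
    print("observed request:", pending.value)
    # ...and answer it ("action") by resolving the Future, which resumes the agent.
    pending.future.set_result("4")
    return await agent


if __name__ == "__main__":
    print("agent received:", asyncio.run(demo()))
```

The sketch creates the Future with `loop.create_future()`, which the asyncio documentation recommends over calling the `Future` constructor directly.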
Implementation
The core implementation is simple:
import asyncio
from dataclasses import dataclass
# LLMClient, LLMRequest, LLMResponse and ValueAndFuture are ARES types (imports omitted)

@dataclass(frozen=True)
class QueueMediatedLLMClient(LLMClient):
    q: asyncio.Queue[ValueAndFuture[LLMRequest, LLMResponse]]

    async def __call__(self, request: LLMRequest) -> LLMResponse:
        future = asyncio.Future[LLMResponse]()
        await self.q.put(ValueAndFuture(value=request, future=future))
        return await future  # Suspends until the env provides a response
The environment side (abridged methods of CodeEnvironment):
async def _get_time_step(self) -> TimeStep:
    # Wait for the code agent to finish OR make an LLM request
    get_request = asyncio.create_task(self._llm_client.q.get())
    done, _ = await asyncio.wait(
        [self._code_agent_task, get_request],
        return_when=asyncio.FIRST_COMPLETED,
    )
    if get_request in done:
        value_and_future = get_request.result()
        self._llm_req_future = value_and_future.future
        return TimeStep(step_type="MID", observation=value_and_future.value, ...)
    # (Handling for when the code agent finishes first is not shown here.)

async def step(self, action: LLMResponse) -> TimeStep:
    # Unblock the code agent by providing the response
    self._llm_req_future.set_result(action)
    return await self._get_time_step()
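The snippet above leaves out the branch where the code agent finishes instead of issuing another request. The sketch below fills it in for a toy, self-contained environment so the whole reset/step loop can run end to end. `ToyCodeEnv`, `two_question_agent`, the `TimeStep` stand-in, string-typed requests, and using the agent's return value as the final reward are all assumptions for illustration, not ARES's actual behavior.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional

LLMCall = Callable[[str], Awaitable[str]]  # hypothetical: string requests/responses


@dataclass
class TimeStep:
    """Minimal stand-in for a dm_env-style time step."""
    step_type: str           # "FIRST", "MID" or "LAST"
    observation: Any = None  # the pending LLM request, if any
    reward: float = 0.0

    def last(self) -> bool:
        return self.step_type == "LAST"


class ToyCodeEnv:
    """Hypothetical environment that turns an agent's LLM calls into RL steps."""

    def __init__(self, agent: Callable[[LLMCall], Awaitable[float]]):
        self._agent = agent
        self._q: asyncio.Queue = asyncio.Queue()
        self._agent_task: Optional[asyncio.Task] = None
        self._pending: Optional[asyncio.Future] = None

    async def _llm_client(self, request: str) -> str:
        future = asyncio.get_running_loop().create_future()
        await self._q.put((request, future))
        return await future

    async def _get_time_step(self, step_type: str) -> TimeStep:
        get_request = asyncio.create_task(self._q.get())
        done, _ = await asyncio.wait(
            {self._agent_task, get_request}, return_when=asyncio.FIRST_COMPLETED
        )
        if get_request in done:
            request, self._pending = get_request.result()
            return TimeStep(step_type, observation=request)
        # The agent finished without a new request: stop waiting on the queue and
        # end the episode. .result() re-raises if the agent crashed.
        get_request.cancel()
        return TimeStep("LAST", reward=self._agent_task.result())

    async def reset(self) -> TimeStep:
        self._agent_task = asyncio.create_task(self._agent(self._llm_client))
        return await self._get_time_step("FIRST")

    async def step(self, action: str) -> TimeStep:
        self._pending.set_result(action)  # resumes the agent's pending await
        return await self._get_time_step("MID")


async def two_question_agent(llm: LLMCall) -> float:
    """Linear toy agent: asks twice; its return value becomes the final reward."""
    answers = [await llm("first question"), await llm("second question")]
    return 1.0 if answers == ["ok", "ok"] else 0.0


async def main() -> tuple[list[str], float]:
    env = ToyCodeEnv(two_question_agent)  # created inside the running loop
    seen = []
    ts = await env.reset()
    while not ts.last():
        seen.append(ts.observation)
        ts = await env.step("ok")  # trivial policy: always answer "ok"
    return seen, ts.reward


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Cancelling the leftover `get_request` task keeps a stray `q.get()` from lingering after the episode ends, and calling `.result()` on the finished agent task means an exception inside the agent surfaces from `step()` rather than being silently dropped.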
To dive into the code, see ares.llms.queue_mediated_client.QueueMediatedLLMClient and ares.environments.code_env.CodeEnvironment._get_time_step().
Multiple Environments
ARES environments are async and independent, enabling parallel data collection:
import asyncio

async def collect_episodes(num_parallel: int):
    # policy and create_env are placeholders for your own policy and env factory
    async def run_episode(env):
        async with env:
            ts = await env.reset()
            while not ts.last():
                action = await policy(ts.observation)
                ts = await env.step(action)
            return ts.reward

    envs = [create_env() for _ in range(num_parallel)]
    rewards = await asyncio.gather(*[run_episode(env) for env in envs])
    return rewards
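To see that `asyncio.gather` really interleaves episodes, the sketch below swaps in a hypothetical `FakeEnv` whose `reset()` and `step()` only sleep (standing in for container and LLM latency) plus a placeholder `policy`. Neither is ARES code. Each episode performs four 0.1 s waits, so five episodes run together should take roughly the time of one episode rather than five, because every `await asyncio.sleep` hands the event loop to another episode.

```python
from __future__ import annotations

import asyncio
import time
from dataclasses import dataclass


@dataclass
class Step:
    observation: str
    reward: float = 0.0
    is_last: bool = False

    def last(self) -> bool:
        return self.is_last


class FakeEnv:
    """Hypothetical env whose steps just sleep, standing in for container/LLM latency."""

    def __init__(self, env_id: int, num_steps: int = 3, latency: float = 0.1):
        self.env_id, self.num_steps, self.latency = env_id, num_steps, latency
        self._t = 0

    async def __aenter__(self) -> FakeEnv:
        return self  # a real env would start its container here

    async def __aexit__(self, *exc) -> None:
        return None  # ...and tear it down here

    async def reset(self) -> Step:
        self._t = 0
        await asyncio.sleep(self.latency)
        return Step(observation=f"env{self.env_id}: request 0")

    async def step(self, action: str) -> Step:
        self._t += 1
        await asyncio.sleep(self.latency)
        done = self._t >= self.num_steps
        return Step(observation=f"env{self.env_id}: request {self._t}",
                    reward=float(self.env_id) if done else 0.0, is_last=done)


async def policy(observation: str) -> str:
    return "ok"  # placeholder policy


async def collect_episodes(num_parallel: int) -> list[float]:
    async def run_episode(env: FakeEnv) -> float:
        async with env:
            ts = await env.reset()
            while not ts.last():
                action = await policy(ts.observation)
                ts = await env.step(action)
            return ts.reward

    envs = [FakeEnv(i) for i in range(num_parallel)]
    return await asyncio.gather(*[run_episode(env) for env in envs])


if __name__ == "__main__":
    start = time.perf_counter()
    rewards = asyncio.run(collect_episodes(5))
    print(rewards, f"{time.perf_counter() - start:.2f}s")
```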
Each environment runs independently with its own container, code agent, and queue. This scales naturally to hundreds of parallel episodes for distributed training.
Limitations and Trade-offs
Not Suitable For
Sub-millisecond latency requirements (the queue round-trip adds overhead)
Synchronous (non-async) agent code (the client must be awaited)
Design Trade-offs
Async Requirement: Both the agent and the environment must use async/await
Hidden Control Flow: The agent appears to block on its LLM call but actually yields control to the environment, which can be surprising when debugging
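The hidden-control-flow trade-off is easiest to see in a trace. In this stripped-down sketch (plain asyncio, no ARES types; `agent` and `main` are hypothetical), the environment resolves the agent's Future, yet the agent's next line runs only once the environment itself awaits. A breakpoint inside the agent therefore fires during the environment's next `await`, not inside `set_result()`.

```python
import asyncio


async def agent(q: asyncio.Queue, log: list) -> None:
    future = asyncio.get_running_loop().create_future()
    log.append("agent: send request")
    await q.put(("request", future))
    answer = await future  # reads like a blocking call, but yields to the event loop
    log.append(f"agent: resumed with {answer!r}")


async def main() -> list:
    log: list = []
    q: asyncio.Queue = asyncio.Queue()
    task = asyncio.create_task(agent(q, log))
    _, future = await q.get()
    log.append("env: got request")
    future.set_result("response")  # only schedules the agent; it does not run yet
    log.append("env: set_result returned")
    await task  # the agent's next line runs here, once the env awaits
    log.append("env: agent finished")
    return log


if __name__ == "__main__":
    for line in asyncio.run(main()):
        print(line)
```

In the printed trace, "agent: resumed with 'response'" appears after "env: set_result returned", confirming that resolving the Future merely schedules the agent.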
Further Reading
Core Concepts - Overview of ARES architecture
dm_env specification - The RL interface ARES implements