Artificial intelligence

A Coding Implementation Demonstration of ClawTeam’s Multi-Agent Swarm Orchestration with OpenAI Function Calling

SWARM_TOOLS = [
   {
       "type": "function",
       "function": {
           "name": "task_update",
           "description": "Update the status of a task. Use 'in_progress' when starting, 'completed' when done.",
           "parameters": {
               "type": "object",
               "properties": {
                   "task_id": {"type": "string", "description": "The task ID"},
                   "status": {"type": "string", "enum": ["in_progress", "completed", "failed"]},
                   "result": {"type": "string", "description": "Result or output of the task"},
               },
               "required": ["task_id", "status"],
           },
       },
   },
   {
       "type": "function",
       "function": {
           "name": "inbox_send",
           "description": "Send a message to another agent (e.g., 'leader' or a worker name).",
           "parameters": {
               "type": "object",
               "properties": {
                   "to": {"type": "string", "description": "Recipient agent name"},
                   "message": {"type": "string", "description": "Message content"},
               },
               "required": ["to", "message"],
           },
       },
   },
   {
       "type": "function",
       "function": {
           "name": "inbox_receive",
           "description": "Check and consume all messages in your inbox.",
           "parameters": {
               "type": "object",
               "properties": {},
           },
       },
   },
   {
       "type": "function",
       "function": {
           "name": "task_list",
           "description": "List tasks assigned to you or all team tasks.",
           "parameters": {
               "type": "object",
               "properties": {
                   "owner": {"type": "string", "description": "Filter by owner name (optional)"},
               },
           },
       },
   },
]




class SwarmAgent:


   def __init__(
       self,
       name: str,
       role: str,
       system_prompt: str,
       task_board: TaskBoard,
       inbox: InboxSystem,
       registry: TeamRegistry,
   ):
       self.name = name
       self.role = role
       self.system_prompt = system_prompt
       self.task_board = task_board
       self.inbox = inbox
       self.registry = registry
       self.conversation_history: list[dict] = []
       self.inbox.register(name)
       self.registry.register(name, role)


   def _build_system_prompt(self) -> str:
       coord_protocol = f"""
## Coordination Protocol (auto-injected — you are agent '{self.name}')


You are part of an AI agent swarm. Your role: {self.role}
Your name: {self.name}


Available tools (equivalent to ClawTeam CLI):
- task_list: Check your assigned tasks (like `clawteam task list`)
- task_update: Update task status to in_progress/completed/failed (like `clawteam task update`)
- inbox_send: Send messages to other agents (like `clawteam inbox send`)
- inbox_receive: Check your inbox for messages (like `clawteam inbox receive`)


WORKFLOW:
1. Check your tasks with task_list
2. Mark a task as in_progress when you start
3. Do the work (think, analyze, produce output)
4. Mark the task as completed with your result
5. Send a summary message to 'leader' when done
"""
       return self.system_prompt + "n" + coord_protocol


   def _handle_tool_call(self, tool_name: str, args: dict) -> str:
       if tool_name == "task_update":
           status = TaskStatus(args["status"])
           result = args.get("result", "")
           self.task_board.update_status(args["task_id"], status, result)
           if status == TaskStatus.COMPLETED:
               self.registry.increment_completed(self.name)
           return json.dumps({"ok": True, "task_id": args["task_id"], "new_status": args["status"]})


       elif tool_name == "inbox_send":
           self.inbox.send(self.name, args["to"], args["message"])
           return json.dumps({"ok": True, "sent_to": args["to"]})


       elif tool_name == "inbox_receive":
           msgs = self.inbox.receive(self.name)
           if not msgs:
               return json.dumps({"messages": [], "note": "No new messages"})
           return json.dumps({
               "messages": [
                   {"from": m.sender, "content": m.content, "time": m.timestamp}
                   for m in msgs
               ]
           })


       elif tool_name == "task_list":
           owner = args.get("owner", self.name)
           tasks = self.task_board.get_tasks(owner=owner)
           return json.dumps({"tasks": [t.to_dict() for t in tasks]})


       return json.dumps({"error": f"Unknown tool: {tool_name}"})


   def run(self, user_message: str, max_iterations: int = 6) -> str:
       self.conversation_history.append({"role": "user", "content": user_message})


       for iteration in range(max_iterations):
           try:
               response = client.chat.completions.create(
                   model=MODEL,
                   messages=[
                       {"role": "system", "content": self._build_system_prompt()},
                       *self.conversation_history,
                   ],
                   tools=SWARM_TOOLS,
                   tool_choice="auto",
                   temperature=0.4,
               )
           except Exception as e:
               return f"[API Error] {e}"


           choice = response.choices[0]
           msg = choice.message


           assistant_msg = {"role": "assistant", "content": msg.content or ""}
           if msg.tool_calls:
               assistant_msg["tool_calls"] = [
                   {
                       "id": tc.id,
                       "type": "function",
                       "function": {"name": tc.function.name, "arguments": tc.function.arguments},
                   }
                   for tc in msg.tool_calls
               ]
           self.conversation_history.append(assistant_msg)


           if not msg.tool_calls:
               return msg.content or "(No response)"


           for tc in msg.tool_calls:
               fn_name = tc.function.name
               fn_args = json.loads(tc.function.arguments)
               result = self._handle_tool_call(fn_name, fn_args)
               self.conversation_history.append({
                   "role": "tool",
                   "tool_call_id": tc.id,
                   "content": result,
               })


       return "(Agent reached max iterations)"

Related Articles

Leave a Reply

Your email address will not be published. Required fields are marked *

Back to top button