Skip to content

Reasoning

Reasoning Engine

director.core.reasoning.ReasoningEngine

ReasoningEngine(input_message, session)

The Reasoning Engine is the core class that directly interfaces with the user. It interprets natural language input in any conversation and orchestrates agents to fulfill the user's requests. The primary functions of the Reasoning Engine are:

  • Maintain Context of Conversational History: Manage memory, context limits, input, and output experiences to ensure coherent and context-aware interactions.
  • Natural Language Understanding (NLU): Uses an LLM of your choice to understand the task.
  • Intelligent Reference Deduction: Intelligently deduce references to previous messages, outputs, files, agents, etc., to provide relevant and accurate responses.
  • Agent Orchestration: Decide on agents and their workflows to fulfill requests. Multiple strategies can be employed to create agent workflows, such as step-by-step processes or chaining of agents provided by default.
  • Final Control Over Conversation Flow: Maintain ultimate control over the flow of conversation with the user, ensuring coherence and goal alignment.

Initialize the ReasoningEngine with the input message and session.

Parameters:

Name Type Description Default
input_message InputMessage

The input message to the reasoning engine.

required
session Session

The session instance.

required
Source code in backend/director/core/reasoning.py
def __init__(
    self,
    input_message: InputMessage,
    session: Session,
):
    """Set up a reasoning engine for one input message within a session.

    :param input_message: The input message to the reasoning engine.
    :param session: The session instance.
    """
    # Conversation inputs.
    self.session = session
    self.input_message = input_message

    # Reasoning configuration: prompt, iteration budget and the LLM client.
    self.system_prompt = REASONING_SYSTEM_PROMPT
    self.max_iterations = 10
    self.llm = OpenAI()

    # The engine reports progress and results through the session's
    # output message.
    self.output_message: OutputMessage = session.output_message

    # Per-run bookkeeping, filled in while the engine runs.
    self.agents: List[BaseAgent] = []
    self.failed_agents = []
    self.summary_content = None
    self.stop_flag = False

register_agents

register_agents(agents)

Register a list of agents.

Parameters:

Name Type Description Default
agents List[BaseAgent]

The list of agents to register.

required
Source code in backend/director/core/reasoning.py
def register_agents(self, agents: List[BaseAgent]):
    """Add the given agents to the engine's list of registered agents.

    :param agents: The list of agents to register.
    """
    # In-place concatenation: the existing list object is extended.
    self.agents += agents

build_context

build_context()

Build the context for the reasoning engine. It adds information about the video or collection to the reasoning context.

Source code in backend/director/core/reasoning.py
def build_context(self):
    """Append the user's message to the session's reasoning context.

    When the reasoning context is still empty, a leading message is added
    first: the system prompt followed by a description of the session's
    video or, when the session has no video_id, of every video and image
    in the session's collection.
    """
    user_message = ContextMessage(
        content=self.input_message.content, role=RoleTypes.user
    )

    if not self.session.reasoning_context:
        state = self.session.state
        if self.session.video_id:
            # Single-video session: describe that video and its collection.
            video = state["video"]
            collection = state["collection"]
            media_details = f"""\nThis is a video in the collection titled {collection.name} collection_id is {collection.id} \nHere is the video refer to this for search, summary and editing \n- title: {video.name}, video_id: {video.id}, media_description: {video.description}, length: {video.length}"""
        else:
            # Collection session: list every video, then every image.
            collection = state["collection"]
            video_titles = "\n".join(
                [
                    f"\n- title: {video.name}, video_id: {video.id}, media_description: {video.description}, length: {video.length}, video_stream: {video.stream_url}"
                    for video in collection.get_videos()
                ]
            )
            image_titles = "\n".join(
                [
                    f"\n- title: {image.name}, image_id: {image.id}, url: {image.url}"
                    for image in collection.get_images()
                ]
            )
            media_details = f"""\nThis is a collection of videos and the collection description is {collection.description} and collection_id is {collection.id} \n\nHere are the videos in this collection user may refer to them for search, summary and editing {video_titles}\n\nHere are the images in this collection {image_titles}"""

        self.session.reasoning_context.append(
            ContextMessage(content=self.system_prompt + media_details)
        )

    # The user's message always goes last, whichever branch ran above.
    self.session.reasoning_context.append(user_message)

run_agent

run_agent(agent_name, *args, **kwargs)

Run an agent with the given name and arguments.

Parameters:

Name Type Description Default
agent_name str

The name of the agent to run

required
args

The arguments to pass to the agent

()
kwargs

The keyword arguments to pass to the agent

{}

Returns:

Type Description
AgentResponse

The response from the agent

Source code in backend/director/core/reasoning.py
def run_agent(self, agent_name: str, *args, **kwargs) -> AgentResponse:
    """Run the registered agent called ``agent_name`` and return its response.

    The run is announced on the output message (an action entry plus the
    agent name) and pushed as an update before the agent is invoked through
    ``safe_call``.

    If no registered agent has that name (for example, the LLM asked for a
    tool that does not exist), an error response is returned instead of
    failing with ``AttributeError`` on ``None``. Nothing is announced on the
    output message in that case.

    :param str agent_name: The name of the agent to run
    :param args: The arguments to pass to the agent
    :param kwargs: The keyword arguments to pass to the agent
    :return: The response from the agent, or an error response when the
        agent is not registered
    """
    print("-" * 40, f"Running {agent_name} Agent", "-" * 40)
    print(kwargs, "\n\n")

    agent = next(
        (
            candidate
            for candidate in self.agents
            if candidate.agent_name == agent_name
        ),
        None,
    )
    if agent is None:
        # NOTE(review): assumes AgentResponse accepts ``status`` and
        # ``message`` keyword fields -- confirm against its definition.
        return AgentResponse(
            status=AgentStatus.ERROR,
            message=f"Agent '{agent_name}' is not registered.",
        )

    self.output_message.actions.append(f"Running @{agent_name} agent")
    self.output_message.agents.append(agent_name)
    self.output_message.push_update()
    return agent.safe_call(*args, **kwargs)

stop

stop()

Flag the tool to stop processing and exit the run() thread.

Source code in backend/director/core/reasoning.py
def stop(self):
    """Request that the engine halt.

    Raises the stop flag, which run() and step() check before doing
    further work.
    """
    self.stop_flag = True

step

step()

Run a single step of the reasoning engine.

Source code in backend/director/core/reasoning.py
def step(self):
    """Run a single step of the reasoning engine.

    One step sends the session's reasoning context to the LLM, with every
    registered agent offered as a tool, and then:

    * on an LLM failure, records an error on the output message, publishes
      it and stops the engine;
    * on tool calls, runs each requested agent and appends its response to
      the reasoning context as a tool message;
    * when the LLM finishes ("stop" / "end_turn") or no iterations remain,
      writes the final text into the summary content, publishes the output
      message and stops the engine.
    """
    status = AgentStatus.ERROR
    # Never appended to in this method, so it contributes no extra messages.
    temp_messages = []
    # With max_tries = 1 the loop body below executes at most once per step.
    max_tries = 1
    tries = 0

    while status != AgentStatus.SUCCESS:
        if self.stop_flag:
            break

        tries += 1
        if tries > max_tries:
            break
        print("-" * 40, "Context", "-" * 40)
        print(
            [message.to_llm_msg() for message in self.session.reasoning_context],
            "\n\n",
        )
        # Ask the LLM what to do next; the registered agents are the tools.
        llm_response: LLMResponse = self.llm.chat_completions(
            messages=[
                message.to_llm_msg() for message in self.session.reasoning_context
            ]
            + temp_messages,
            tools=[agent.to_llm_format() for agent in self.agents],
        )
        logger.info(f"LLM Response: {llm_response}")

        # LLM call failed: surface the error to the user and stop the engine.
        if not llm_response.status:
            self.output_message.content.append(
                TextContent(
                    text=llm_response.content,
                    status=MsgStatus.error,
                    status_message="Error in reasoning",
                    agent_name="assistant",
                )
            )
            self.output_message.actions.append("Failed to reason the message")
            self.output_message.status = MsgStatus.error
            self.output_message.publish()
            self.stop()
            break

        if llm_response.tool_calls:
            # NOTE(review): remove_summary_content() is defined outside this
            # view; presumably it drops the summary placeholder so agent
            # output precedes it -- confirm.
            if self.summary_content:
                self.remove_summary_content()

            # Record the assistant turn that requested the tools before the
            # tool results that answer it.
            self.session.reasoning_context.append(
                ContextMessage(
                    content=llm_response.content,
                    tool_calls=llm_response.tool_calls,
                    role=RoleTypes.assistant,
                )
            )
            for tool_call in llm_response.tool_calls:
                agent_response: AgentResponse = self.run_agent(
                    tool_call["tool"]["name"],
                    **tool_call["tool"]["arguments"],
                )
                # Failed agents are remembered so the final summary can be
                # marked as an error.
                if agent_response.status == AgentStatus.ERROR:
                    self.failed_agents.append(tool_call["tool"]["name"])
                # Every tool call gets a tool message carrying its id,
                # whether the agent succeeded or failed.
                self.session.reasoning_context.append(
                    ContextMessage(
                        content=agent_response.__str__(),
                        tool_call_id=tool_call["id"],
                        role=RoleTypes.tool,
                    )
                )
                print("-" * 40, "Agent Response", "-" * 40)
                print(agent_response, "\n\n")
                # The loop condition only sees the status of the last agent run.
                status = agent_response.status

        # NOTE(review): add_summary_content() is defined outside this view;
        # presumably it creates self.summary_content, which the finishing
        # branch below writes to -- confirm.
        if not self.summary_content:
            self.add_summary_content()

        # Finish when the LLM signals completion or when no iterations are
        # left (run() decrements self.iterations before calling step()).
        if (
            llm_response.finish_reason == "stop"
            or llm_response.finish_reason == "end_turn"
            or self.iterations == 0
        ):
            self.session.reasoning_context.append(
                ContextMessage(
                    content=llm_response.content,
                    role=RoleTypes.assistant,
                )
            )
            # NOTE(review): this identifies the first iteration only when the
            # countdown in self.iterations started from self.max_iterations.
            if self.iterations == self.max_iterations - 1:
                # Direct response case
                self.summary_content.status_message = "Here is the response"
                self.summary_content.text = llm_response.content
                self.summary_content.status = MsgStatus.success
            else:
                # Agents ran during this run: ask the LLM (without tools) to
                # summarize the run with respect to the user's query.
                self.session.reasoning_context.append(
                    ContextMessage(
                        content=SUMMARIZATION_PROMPT.format(
                            query=self.input_message.content
                        ),
                        role=RoleTypes.system,
                    )
                )
                summary_response = self.llm.chat_completions(
                    messages=[
                        message.to_llm_msg()
                        for message in self.get_current_run_context()
                    ]
                )
                # NOTE(review): summary_response.status is not checked here,
                # unlike llm_response.status above.
                self.summary_content.text = summary_response.content
                if self.failed_agents:
                    self.summary_content.status = MsgStatus.error
                else:
                    self.summary_content.status = MsgStatus.success
                self.summary_content.status_message = "Final Cut"
            # The overall message status is set to success even when some
            # agents failed; only the summary content carries the error.
            self.output_message.status = MsgStatus.success
            self.output_message.publish()
            print("-" * 40, "Stopping", "-" * 40)
            self.stop()
            break

run

run(max_iterations=None)

Run the reasoning engine.

Parameters:

Name Type Description Default
max_iterations int

The maximum number of iterations to run the reasoning engine for.

None
Source code in backend/director/core/reasoning.py
def run(self, max_iterations: int = None):
    """Run the reasoning engine.

    Builds the reasoning context, announces the run on the output message,
    then calls step() until the iteration budget is used up or the stop
    flag is raised. The session's context messages are saved afterwards.

    :param int max_iterations: The maximum number of iterations to run the
        reasoning engine for. When omitted (or 0), the engine's configured
        ``self.max_iterations`` is used.
    """
    if max_iterations:
        # Keep the configured budget in sync with the countdown: step()
        # recognises the first iteration (its direct-response case) by
        # comparing self.iterations with self.max_iterations - 1, which is
        # wrong if only the countdown honours the override.
        self.max_iterations = max_iterations
    self.iterations = self.max_iterations

    self.build_context()
    self.output_message.actions.append("Reasoning the message..")
    self.output_message.push_update()

    iteration = 0
    while self.iterations > 0:
        # Decrement before stepping: step() reads self.iterations as the
        # number of iterations left after the current one.
        self.iterations -= 1
        print("-" * 40, "Reasoning Engine Iteration", iteration, "-" * 40)
        if self.stop_flag:
            break

        self.step()
        iteration += 1

    self.session.save_context_messages()
    print("-" * 40, "Reasoning Engine Finished", "-" * 40)