AutoGPT/classic/original_autogpt/autogpt/app/main.py

775 lines
26 KiB
Python

"""
The application entry point. Can be invoked by a CLI or any other front end application.
"""
import enum
import logging
import math
import os
import re
import signal
import sys
from pathlib import Path
from types import FrameType
from typing import TYPE_CHECKING, Optional
from colorama import Fore, Style
from forge.agent_protocol.database import AgentDB
from forge.components.code_executor.code_executor import (
is_docker_available,
we_are_running_in_a_docker_container,
)
from forge.config.ai_directives import AIDirectives
from forge.config.ai_profile import AIProfile
from forge.file_storage import FileStorageBackendName, get_storage
from forge.llm.providers import MultiProvider
from forge.logging.config import configure_logging
from forge.logging.utils import print_attribute, speak
from forge.models.action import ActionInterruptedByHuman, ActionProposal
from forge.models.utils import ModelWithSummary
from forge.utils.const import FINISH_COMMAND
from forge.utils.exceptions import AgentTerminated, InvalidAgentResponseError
from autogpt.agent_factory.configurators import configure_agent_with_state, create_agent
from autogpt.agents.agent_manager import AgentManager
from autogpt.agents.prompt_strategies.one_shot import AssistantThoughts
from autogpt.app.config import (
AppConfig,
ConfigBuilder,
assert_config_has_required_llm_api_keys,
)
if TYPE_CHECKING:
from autogpt.agents.agent import Agent
from .configurator import apply_overrides_to_config
from .input import clean_input
from .setup import apply_overrides_to_ai_settings, interactively_revise_ai_settings
from .spinner import Spinner
from .utils import (
coroutine,
get_legal_warning,
markdown_to_ansi_style,
print_git_branch_info,
print_motd,
print_python_version_info,
)
@coroutine
async def run_auto_gpt(
continuous: bool = False,
continuous_limit: Optional[int] = None,
skip_reprompt: bool = False,
speak: bool = False,
debug: bool = False,
log_level: Optional[str] = None,
log_format: Optional[str] = None,
log_file_format: Optional[str] = None,
skip_news: bool = False,
install_plugin_deps: bool = False,
override_ai_name: Optional[str] = None,
override_ai_role: Optional[str] = None,
resources: Optional[list[str]] = None,
constraints: Optional[list[str]] = None,
best_practices: Optional[list[str]] = None,
override_directives: bool = False,
component_config_file: Optional[Path] = None,
):
# Set up configuration
config = ConfigBuilder.build_config_from_env()
# Storage
local = config.file_storage_backend == FileStorageBackendName.LOCAL
restrict_to_root = not local or config.restrict_to_workspace
file_storage = get_storage(
config.file_storage_backend,
root_path=Path("data"),
restrict_to_root=restrict_to_root,
)
file_storage.initialize()
# Set up logging module
if speak:
config.tts_config.speak_mode = True
configure_logging(
debug=debug,
level=log_level,
log_format=log_format,
log_file_format=log_file_format,
config=config.logging,
tts_config=config.tts_config,
)
await assert_config_has_required_llm_api_keys(config)
await apply_overrides_to_config(
config=config,
continuous=continuous,
continuous_limit=continuous_limit,
skip_reprompt=skip_reprompt,
skip_news=skip_news,
)
llm_provider = _configure_llm_provider(config)
logger = logging.getLogger(__name__)
if config.continuous_mode:
for line in get_legal_warning().split("\n"):
logger.warning(
extra={
"title": "LEGAL:",
"title_color": Fore.RED,
"preserve_color": True,
},
msg=markdown_to_ansi_style(line),
)
if not config.skip_news:
print_motd(logger)
print_git_branch_info(logger)
print_python_version_info(logger)
print_attribute("Smart LLM", config.smart_llm)
print_attribute("Fast LLM", config.fast_llm)
if config.continuous_mode:
print_attribute("Continuous Mode", "ENABLED", title_color=Fore.YELLOW)
if continuous_limit:
print_attribute("Continuous Limit", config.continuous_limit)
if config.tts_config.speak_mode:
print_attribute("Speak Mode", "ENABLED")
if we_are_running_in_a_docker_container() or is_docker_available():
print_attribute("Code Execution", "ENABLED")
else:
print_attribute(
"Code Execution",
"DISABLED (Docker unavailable)",
title_color=Fore.YELLOW,
)
# Let user choose an existing agent to run
agent_manager = AgentManager(file_storage)
existing_agents = agent_manager.list_agents()
load_existing_agent = ""
if existing_agents:
print(
"Existing agents\n---------------\n"
+ "\n".join(f"{i} - {id}" for i, id in enumerate(existing_agents, 1))
)
load_existing_agent = clean_input(
"Enter the number or name of the agent to run,"
" or hit enter to create a new one:",
)
if re.match(r"^\d+$", load_existing_agent.strip()) and 0 < int(
load_existing_agent
) <= len(existing_agents):
load_existing_agent = existing_agents[int(load_existing_agent) - 1]
if load_existing_agent != "" and load_existing_agent not in existing_agents:
logger.info(
f"Unknown agent '{load_existing_agent}', "
f"creating a new one instead.",
extra={"color": Fore.YELLOW},
)
load_existing_agent = ""
# Either load existing or set up new agent state
agent = None
agent_state = None
############################
# Resume an Existing Agent #
############################
if load_existing_agent:
agent_state = None
while True:
answer = clean_input("Resume? [Y/n]")
if answer == "" or answer.lower() == "y":
agent_state = agent_manager.load_agent_state(load_existing_agent)
break
elif answer.lower() == "n":
break
if agent_state:
agent = configure_agent_with_state(
state=agent_state,
app_config=config,
file_storage=file_storage,
llm_provider=llm_provider,
)
apply_overrides_to_ai_settings(
ai_profile=agent.state.ai_profile,
directives=agent.state.directives,
override_name=override_ai_name,
override_role=override_ai_role,
resources=resources,
constraints=constraints,
best_practices=best_practices,
replace_directives=override_directives,
)
if (
(current_episode := agent.event_history.current_episode)
and current_episode.action.use_tool.name == FINISH_COMMAND
and not current_episode.result
):
# Agent was resumed after `finish` -> rewrite result of `finish` action
finish_reason = current_episode.action.use_tool.arguments["reason"]
print(f"Agent previously self-terminated; reason: '{finish_reason}'")
new_assignment = clean_input(
"Please give a follow-up question or assignment:"
)
agent.event_history.register_result(
ActionInterruptedByHuman(feedback=new_assignment)
)
# If any of these are specified as arguments,
# assume the user doesn't want to revise them
if not any(
[
override_ai_name,
override_ai_role,
resources,
constraints,
best_practices,
]
):
ai_profile, ai_directives = await interactively_revise_ai_settings(
ai_profile=agent.state.ai_profile,
directives=agent.state.directives,
app_config=config,
)
else:
logger.info("AI config overrides specified through CLI; skipping revision")
######################
# Set up a new Agent #
######################
if not agent:
task = ""
while task.strip() == "":
task = clean_input(
"Enter the task that you want AutoGPT to execute,"
" with as much detail as possible:",
)
ai_profile = AIProfile()
additional_ai_directives = AIDirectives()
apply_overrides_to_ai_settings(
ai_profile=ai_profile,
directives=additional_ai_directives,
override_name=override_ai_name,
override_role=override_ai_role,
resources=resources,
constraints=constraints,
best_practices=best_practices,
replace_directives=override_directives,
)
# If any of these are specified as arguments,
# assume the user doesn't want to revise them
if not any(
[
override_ai_name,
override_ai_role,
resources,
constraints,
best_practices,
]
):
(
ai_profile,
additional_ai_directives,
) = await interactively_revise_ai_settings(
ai_profile=ai_profile,
directives=additional_ai_directives,
app_config=config,
)
else:
logger.info("AI config overrides specified through CLI; skipping revision")
agent = create_agent(
agent_id=agent_manager.generate_id(ai_profile.ai_name),
task=task,
ai_profile=ai_profile,
directives=additional_ai_directives,
app_config=config,
file_storage=file_storage,
llm_provider=llm_provider,
)
file_manager = agent.file_manager
if file_manager and not agent.config.allow_fs_access:
logger.info(
f"{Fore.YELLOW}"
"NOTE: All files/directories created by this agent can be found "
f"inside its workspace at:{Fore.RESET} {file_manager.workspace.root}",
extra={"preserve_color": True},
)
# TODO: re-evaluate performance benefit of task-oriented profiles
# # Concurrently generate a custom profile for the agent and apply it once done
# def update_agent_directives(
# task: asyncio.Task[tuple[AIProfile, AIDirectives]]
# ):
# logger.debug(f"Updating AIProfile: {task.result()[0]}")
# logger.debug(f"Adding AIDirectives: {task.result()[1]}")
# agent.state.ai_profile = task.result()[0]
# agent.state.directives = agent.state.directives + task.result()[1]
# asyncio.create_task(
# generate_agent_profile_for_task(
# task, app_config=config, llm_provider=llm_provider
# )
# ).add_done_callback(update_agent_directives)
# Load component configuration from file
if _config_file := component_config_file or config.component_config_file:
try:
logger.info(f"Loading component configuration from {_config_file}")
agent.load_component_configs(_config_file.read_text())
except Exception as e:
logger.error(f"Could not load component configuration: {e}")
#################
# Run the Agent #
#################
try:
await run_interaction_loop(agent)
except AgentTerminated:
agent_id = agent.state.agent_id
logger.info(f"Saving state of {agent_id}...")
# Allow user to Save As other ID
save_as_id = clean_input(
f"Press enter to save as '{agent_id}',"
" or enter a different ID to save to:",
)
# TODO: allow many-to-one relations of agents and workspaces
await agent.file_manager.save_state(
save_as_id.strip() if not save_as_id.isspace() else None
)
@coroutine
async def run_auto_gpt_server(
debug: bool = False,
log_level: Optional[str] = None,
log_format: Optional[str] = None,
log_file_format: Optional[str] = None,
install_plugin_deps: bool = False,
):
from .agent_protocol_server import AgentProtocolServer
config = ConfigBuilder.build_config_from_env()
# Storage
local = config.file_storage_backend == FileStorageBackendName.LOCAL
restrict_to_root = not local or config.restrict_to_workspace
file_storage = get_storage(
config.file_storage_backend,
root_path=Path("data"),
restrict_to_root=restrict_to_root,
)
file_storage.initialize()
# Set up logging module
configure_logging(
debug=debug,
level=log_level,
log_format=log_format,
log_file_format=log_file_format,
config=config.logging,
tts_config=config.tts_config,
)
await assert_config_has_required_llm_api_keys(config)
await apply_overrides_to_config(
config=config,
)
llm_provider = _configure_llm_provider(config)
# Set up & start server
database = AgentDB(
database_string=os.getenv("AP_SERVER_DB_URL", "sqlite:///data/ap_server.db"),
debug_enabled=debug,
)
port: int = int(os.getenv("AP_SERVER_PORT", default=8000))
server = AgentProtocolServer(
app_config=config,
database=database,
file_storage=file_storage,
llm_provider=llm_provider,
)
await server.start(port=port)
logging.getLogger().info(
f"Total OpenAI session cost: "
f"${round(sum(b.total_cost for b in server._task_budgets.values()), 2)}"
)
def _configure_llm_provider(config: AppConfig) -> MultiProvider:
multi_provider = MultiProvider()
for model in [config.smart_llm, config.fast_llm]:
# Ensure model providers for configured LLMs are available
multi_provider.get_model_provider(model)
return multi_provider
def _get_cycle_budget(continuous_mode: bool, continuous_limit: int) -> int | float:
# Translate from the continuous_mode/continuous_limit config
# to a cycle_budget (maximum number of cycles to run without checking in with the
# user) and a count of cycles_remaining before we check in..
if continuous_mode:
cycle_budget = continuous_limit if continuous_limit else math.inf
else:
cycle_budget = 1
return cycle_budget
class UserFeedback(str, enum.Enum):
"""Enum for user feedback."""
AUTHORIZE = "GENERATE NEXT COMMAND JSON"
EXIT = "EXIT"
TEXT = "TEXT"
async def run_interaction_loop(
agent: "Agent",
) -> None:
"""Run the main interaction loop for the agent.
Args:
agent: The agent to run the interaction loop for.
Returns:
None
"""
# These contain both application config and agent config, so grab them here.
app_config = agent.app_config
ai_profile = agent.state.ai_profile
logger = logging.getLogger(__name__)
cycle_budget = cycles_remaining = _get_cycle_budget(
app_config.continuous_mode, app_config.continuous_limit
)
spinner = Spinner(
"Thinking...", plain_output=app_config.logging.plain_console_output
)
stop_reason = None
def graceful_agent_interrupt(signum: int, frame: Optional[FrameType]) -> None:
nonlocal cycle_budget, cycles_remaining, spinner, stop_reason
if stop_reason:
logger.error("Quitting immediately...")
sys.exit()
if cycles_remaining in [0, 1]:
logger.warning("Interrupt signal received: shutting down gracefully.")
logger.warning(
"Press Ctrl+C again if you want to stop AutoGPT immediately."
)
stop_reason = AgentTerminated("Interrupt signal received")
else:
restart_spinner = spinner.running
if spinner.running:
spinner.stop()
logger.error(
"Interrupt signal received: stopping continuous command execution."
)
cycles_remaining = 1
if restart_spinner:
spinner.start()
def handle_stop_signal() -> None:
if stop_reason:
raise stop_reason
# Set up an interrupt signal for the agent.
signal.signal(signal.SIGINT, graceful_agent_interrupt)
#########################
# Application Main Loop #
#########################
# Keep track of consecutive failures of the agent
consecutive_failures = 0
while cycles_remaining > 0:
logger.debug(f"Cycle budget: {cycle_budget}; remaining: {cycles_remaining}")
########
# Plan #
########
handle_stop_signal()
# Have the agent determine the next action to take.
if not (_ep := agent.event_history.current_episode) or _ep.result:
with spinner:
try:
action_proposal = await agent.propose_action()
except InvalidAgentResponseError as e:
logger.warning(f"The agent's thoughts could not be parsed: {e}")
consecutive_failures += 1
if consecutive_failures >= 3:
logger.error(
"The agent failed to output valid thoughts"
f" {consecutive_failures} times in a row. Terminating..."
)
raise AgentTerminated(
"The agent failed to output valid thoughts"
f" {consecutive_failures} times in a row."
)
continue
else:
action_proposal = _ep.action
consecutive_failures = 0
###############
# Update User #
###############
# Print the assistant's thoughts and the next command to the user.
update_user(
ai_profile,
action_proposal,
speak_mode=app_config.tts_config.speak_mode,
)
##################
# Get user input #
##################
handle_stop_signal()
if cycles_remaining == 1: # Last cycle
feedback_type, feedback, new_cycles_remaining = await get_user_feedback(
app_config,
ai_profile,
)
if feedback_type == UserFeedback.AUTHORIZE:
if new_cycles_remaining is not None:
# Case 1: User is altering the cycle budget.
if cycle_budget > 1:
cycle_budget = new_cycles_remaining + 1
# Case 2: User is running iteratively and
# has initiated a one-time continuous cycle
cycles_remaining = new_cycles_remaining + 1
else:
# Case 1: Continuous iteration was interrupted -> resume
if cycle_budget > 1:
logger.info(
f"The cycle budget is {cycle_budget}.",
extra={
"title": "RESUMING CONTINUOUS EXECUTION",
"title_color": Fore.MAGENTA,
},
)
# Case 2: The agent used up its cycle budget -> reset
cycles_remaining = cycle_budget + 1
logger.info(
"-=-=-=-=-=-=-= COMMAND AUTHORISED BY USER -=-=-=-=-=-=-=",
extra={"color": Fore.MAGENTA},
)
elif feedback_type == UserFeedback.EXIT:
logger.warning("Exiting...")
exit()
else: # user_feedback == UserFeedback.TEXT
pass
else:
feedback = ""
# First log new-line so user can differentiate sections better in console
print()
if cycles_remaining != math.inf:
# Print authorized commands left value
print_attribute(
"AUTHORIZED_COMMANDS_LEFT", cycles_remaining, title_color=Fore.CYAN
)
###################
# Execute Command #
###################
# Decrement the cycle counter first to reduce the likelihood of a SIGINT
# happening during command execution, setting the cycles remaining to 1,
# and then having the decrement set it to 0, exiting the application.
if not feedback:
cycles_remaining -= 1
if not action_proposal.use_tool:
continue
handle_stop_signal()
if not feedback:
result = await agent.execute(action_proposal)
else:
result = await agent.do_not_execute(action_proposal, feedback)
if result.status == "success":
logger.info(result, extra={"title": "SYSTEM:", "title_color": Fore.YELLOW})
elif result.status == "error":
logger.warning(
f"Command {action_proposal.use_tool.name} returned an error: "
f"{result.error or result.reason}"
)
def update_user(
ai_profile: AIProfile,
action_proposal: "ActionProposal",
speak_mode: bool = False,
) -> None:
"""Prints the assistant's thoughts and the next command to the user.
Args:
config: The program's configuration.
ai_profile: The AI's personality/profile
command_name: The name of the command to execute.
command_args: The arguments for the command.
assistant_reply_dict: The assistant's reply.
"""
logger = logging.getLogger(__name__)
print_assistant_thoughts(
ai_name=ai_profile.ai_name,
thoughts=action_proposal.thoughts,
speak_mode=speak_mode,
)
if speak_mode:
speak(f"I want to execute {action_proposal.use_tool.name}")
# First log new-line so user can differentiate sections better in console
print()
safe_tool_name = remove_ansi_escape(action_proposal.use_tool.name)
logger.info(
f"COMMAND = {Fore.CYAN}{safe_tool_name}{Style.RESET_ALL} "
f"ARGUMENTS = {Fore.CYAN}{action_proposal.use_tool.arguments}{Style.RESET_ALL}",
extra={
"title": "NEXT ACTION:",
"title_color": Fore.CYAN,
"preserve_color": True,
},
)
async def get_user_feedback(
config: AppConfig,
ai_profile: AIProfile,
) -> tuple[UserFeedback, str, int | None]:
"""Gets the user's feedback on the assistant's reply.
Args:
config: The program's configuration.
ai_profile: The AI's configuration.
Returns:
A tuple of the user's feedback, the user's input, and the number of
cycles remaining if the user has initiated a continuous cycle.
"""
logger = logging.getLogger(__name__)
# ### GET USER AUTHORIZATION TO EXECUTE COMMAND ###
# Get key press: Prompt the user to press enter to continue or escape
# to exit
logger.info(
f"Enter '{config.authorise_key}' to authorise command, "
f"'{config.authorise_key} -N' to run N continuous commands, "
f"'{config.exit_key}' to exit program, or enter feedback for "
f"{ai_profile.ai_name}..."
)
user_feedback = None
user_input = ""
new_cycles_remaining = None
while user_feedback is None:
# Get input from user
console_input = clean_input(Fore.MAGENTA + "Input:" + Style.RESET_ALL)
# Parse user input
if console_input.lower().strip() == config.authorise_key:
user_feedback = UserFeedback.AUTHORIZE
elif console_input.lower().strip() == "":
logger.warning("Invalid input format.")
elif console_input.lower().startswith(f"{config.authorise_key} -"):
try:
user_feedback = UserFeedback.AUTHORIZE
new_cycles_remaining = abs(int(console_input.split(" ")[1]))
except ValueError:
logger.warning(
f"Invalid input format. "
f"Please enter '{config.authorise_key} -N'"
" where N is the number of continuous tasks."
)
elif console_input.lower() in [config.exit_key, "exit"]:
user_feedback = UserFeedback.EXIT
else:
user_feedback = UserFeedback.TEXT
user_input = console_input
return user_feedback, user_input, new_cycles_remaining
def print_assistant_thoughts(
ai_name: str,
thoughts: str | ModelWithSummary | AssistantThoughts,
speak_mode: bool = False,
) -> None:
logger = logging.getLogger(__name__)
thoughts_text = remove_ansi_escape(
thoughts.text
if isinstance(thoughts, AssistantThoughts)
else thoughts.summary()
if isinstance(thoughts, ModelWithSummary)
else thoughts
)
print_attribute(
f"{ai_name.upper()} THOUGHTS", thoughts_text, title_color=Fore.YELLOW
)
if isinstance(thoughts, AssistantThoughts):
print_attribute(
"REASONING", remove_ansi_escape(thoughts.reasoning), title_color=Fore.YELLOW
)
if assistant_thoughts_plan := remove_ansi_escape(
"\n".join(f"- {p}" for p in thoughts.plan)
):
print_attribute("PLAN", "", title_color=Fore.YELLOW)
# If it's a list, join it into a string
if isinstance(assistant_thoughts_plan, list):
assistant_thoughts_plan = "\n".join(assistant_thoughts_plan)
elif isinstance(assistant_thoughts_plan, dict):
assistant_thoughts_plan = str(assistant_thoughts_plan)
# Split the input_string using the newline character and dashes
lines = assistant_thoughts_plan.split("\n")
for line in lines:
line = line.lstrip("- ")
logger.info(
line.strip(), extra={"title": "- ", "title_color": Fore.GREEN}
)
print_attribute(
"CRITICISM",
remove_ansi_escape(thoughts.self_criticism),
title_color=Fore.YELLOW,
)
# Speak the assistant's thoughts
if assistant_thoughts_speak := remove_ansi_escape(thoughts.speak):
if speak_mode:
speak(assistant_thoughts_speak)
else:
print_attribute(
"SPEAK", assistant_thoughts_speak, title_color=Fore.YELLOW
)
else:
speak(thoughts_text)
def remove_ansi_escape(s: str) -> str:
return s.replace("\x1B", "")