From d6ab65d7cf57e882bb2968bb9c00e26fb009518a Mon Sep 17 00:00:00 2001 From: htom Date: Fri, 15 Aug 2025 16:14:02 +0200 Subject: [PATCH] end of project --- calculator/main.py | 23 +-------- calculator/pkg/calculator.py | 2 +- call_function.py | 53 ++++++++++++++++++++ config.py | 15 ++++++ functions/get_file_content.py | 19 ++++++++ functions/get_files_info.py | 16 ++++++ functions/run_python.py | 57 ++++++++++++++++++++++ functions/write_file.py | 23 +++++++++ main.py | 92 +++++++++++++++++++++++++++++------ tests.py | 19 ++++++-- 10 files changed, 278 insertions(+), 41 deletions(-) create mode 100644 call_function.py create mode 100644 config.py create mode 100644 functions/run_python.py diff --git a/calculator/main.py b/calculator/main.py index a34e44a..b7b760a 100644 --- a/calculator/main.py +++ b/calculator/main.py @@ -1,22 +1 @@ -import sys -from pkg.calculator import Calculator -from pkg.render import render - -def main(): - calculator = Calculator() - if len(sys.argv) <= 1: - print("Calculator App") - print("Usage: python main.py \"\"") - print("Example: python main.py \"3 + 5\"") - return - - expression = " ".join(sys.argv[1:]) - try: - result = calculator.evaluate(expression) - to_print = render(expression, result) - print(to_print) - except Exception as e: - print(f"Error: {e}") - -if __name__ == "__main__": - main() +print(3 + 7 * 2) \ No newline at end of file diff --git a/calculator/pkg/calculator.py b/calculator/pkg/calculator.py index b78c7cd..903199e 100644 --- a/calculator/pkg/calculator.py +++ b/calculator/pkg/calculator.py @@ -9,7 +9,7 @@ class Calculator: "/": lambda a, b: a / b, } self.precedence = { - "+": 1, + "+": 3, "-": 1, "*": 2, "/": 2, diff --git a/call_function.py b/call_function.py new file mode 100644 index 0000000..b510996 --- /dev/null +++ b/call_function.py @@ -0,0 +1,53 @@ +from google.genai import types + +from functions.get_files_info import get_files_info, schema_get_files_info +from functions.get_file_content import get_file_content, schema_get_file_content +from functions.write_file import schema_write_file, write_file +from functions.run_python import run_python_file, schema_run_python_file +from config import WORKING_DIR + +available_functions = types.Tool( + function_declarations=[ + schema_get_files_info, + schema_get_file_content, + schema_run_python_file, + schema_write_file, + ] +) + +def call_function(function_call_part, verbose=False): + if verbose: + print( + f" - Calling function: {function_call_part.name}({function_call_part.args})" + ) + else: + print(f" - Calling function: {function_call_part.name}") + function_map = { + "get_files_info": get_files_info, + "get_file_content": get_file_content, + "run_python_file": run_python_file, + "write_file": write_file, + } + function_name = function_call_part.name + if function_name not in function_map: + return types.Content( + role="tool", + parts=[ + types.Part.from_function_response( + name=function_name, + response={"error": f"Unknown function: {function_name}"}, + ) + ], + ) + args = dict(function_call_part.args) + args["working_directory"] = WORKING_DIR + function_result = function_map[function_name](**args) + return types.Content( + role="tool", + parts=[ + types.Part.from_function_response( + name=function_name, + response={"result": function_result}, + ) + ], + ) diff --git a/config.py b/config.py new file mode 100644 index 0000000..f032be4 --- /dev/null +++ b/config.py @@ -0,0 +1,15 @@ +SYSTEM_PROMPT = """ +You are a helpful AI coding agent. + +When a user asks a question or makes a request, make a function call plan. You can perform the following operations: + +- List files and directories +- Read file contents +- Execute Python files with optional arguments +- Write or overwrite files + +All paths you provide should be relative to the working directory. You do not need to specify the working directory in your function calls as it is automatically injected for security reasons. +""" + +WORKING_DIR = "./calculator" +MAX_ITERS = 20 diff --git a/functions/get_file_content.py b/functions/get_file_content.py index 6d94692..321e2da 100644 --- a/functions/get_file_content.py +++ b/functions/get_file_content.py @@ -1,5 +1,6 @@ import os from functions.config import FILE_LENGTH_LIMIT +from google.genai import types def get_file_content(working_directory, file_path): abs_working_dir = os.path.abspath(working_directory) @@ -18,3 +19,21 @@ def get_file_content(working_directory, file_path): return content except Exception as e: return f'Error reading file "{file_path}": {e}' + +schema_get_file_content = types.FunctionDeclaration( + name="get_file_content", + description="Read file contents.", + parameters=types.Schema( + type=types.Type.OBJECT, + properties={ + "working_directory": types.Schema( + type=types.Type.STRING, + description="The directory to read the file from. If not provided, lists files in the working directory itself.", + ), + "file_path": types.Schema( + type=types.Type.STRING, + description="The file to read the content from. If not provided, the function will not work." + ) + }, + ), +) diff --git a/functions/get_files_info.py b/functions/get_files_info.py index 8533fd4..edb768d 100644 --- a/functions/get_files_info.py +++ b/functions/get_files_info.py @@ -1,4 +1,6 @@ import os +from google.genai import types + def get_files_info(working_directory, directory="."): abs_working_dir = os.path.abspath(working_directory) @@ -20,3 +22,17 @@ def get_files_info(working_directory, directory="."): return "\n".join(files_info) except Exception as e: return f"Error listing files: {e}" + +schema_get_files_info = types.FunctionDeclaration( + name="get_files_info", + description="Lists files in the specified directory along with their sizes, constrained to the working directory.", + parameters=types.Schema( + type=types.Type.OBJECT, + properties={ + "directory": types.Schema( + type=types.Type.STRING, + description="The directory to list files from, relative to the working directory. If not provided, lists files in the working directory itself.", + ), + }, + ), +) diff --git a/functions/run_python.py b/functions/run_python.py new file mode 100644 index 0000000..a425d87 --- /dev/null +++ b/functions/run_python.py @@ -0,0 +1,57 @@ +import os +import subprocess +from google.genai import types + +def run_python_file(working_directory, file_path, args=[]): + abs_working_dir = os.path.abspath(working_directory) + abs_file_path = os.path.abspath(os.path.join(working_directory, file_path)) + if not abs_file_path.startswith(abs_working_dir): + return f'Error: Cannot execute "{file_path}" as it is outside the permitted working directory' + if not os.path.isfile(abs_file_path): + return f'Error: File "{file_path}" not found' + if not abs_file_path.endswith(".py"): + return f'Error: "{file_path}" is not a Python file.' + + try: + completed = subprocess.run(args=["python3", abs_file_path]+args, capture_output=True, cwd=abs_working_dir, timeout=30) + + if completed == None: + raise Exception("Subprocess holder is empty") + + if completed.stdout == None or completed.stdout == "": + return f"No output produced." + + message = f"STDOUT: {completed.stdout.decode()}. STDERR: {completed.stderr.decode()}." + if completed.returncode != 0: + message += f" Error: Process exited with code {completed.returncode}" + + except Exception as e: + return f"Error: executing Python file: {e}" + + return message + +schema_run_python_file = types.FunctionDeclaration( + name="run_python_file", + description="Run the python file", + parameters=types.Schema( + type=types.Type.OBJECT, + properties={ + "working_directory": types.Schema( + type=types.Type.STRING, + description="The directory to read the file from. If not provided, lists files in the working directory itself.", + ), + "file_path": types.Schema( + type=types.Type.STRING, + description="The file to execute. If not provided, the function will not work." + ), + "args": types.Schema( + type=types.Type.ARRAY, + items=types.Schema( + type=types.Type.STRING, + description="Optional arguments to pass to the Python file.", + ), + description="Arguments to run the python file with." + ) + }, + ), +) diff --git a/functions/write_file.py b/functions/write_file.py index 3d21d5f..468e620 100644 --- a/functions/write_file.py +++ b/functions/write_file.py @@ -1,4 +1,5 @@ import os +from google.genai import types def write_file(working_directory, file_path, content): abs_working_dir = os.path.abspath(working_directory) @@ -15,3 +16,25 @@ def write_file(working_directory, file_path, content): return f'Successfully wrote to "{file_path}" ({len(content)} characters written)' except Exception as e: print(f"Error: {e}") + +schema_write_file = types.FunctionDeclaration( + name="write_file", + description="Write file.", + parameters=types.Schema( + type=types.Type.OBJECT, + properties={ + "working_directory": types.Schema( + type=types.Type.STRING, + description="The directory to read the file from. If not provided, lists files in the working directory itself.", + ), + "file_path": types.Schema( + type=types.Type.STRING, + description="The file to write the content to. If not provided, the function will not work." + ), + "content": types.Schema( + type=types.Type.STRING, + description="Content to write to the file." + ) + }, + ), +) diff --git a/main.py b/main.py index 9cf3820..9d61ca4 100644 --- a/main.py +++ b/main.py @@ -5,28 +5,92 @@ from dotenv import load_dotenv from google import genai from google.genai import types +from config import SYSTEM_PROMPT, MAX_ITERS +from call_function import available_functions, call_function + def main(): - print("Hello from python-ai-agent!") - print('argv', sys.argv) - if len(sys.argv) < 2 or str(sys.argv[1]) == "--verbose": + load_dotenv() + + verbose = "--verbose" in sys.argv + args = [] + for arg in sys.argv[1:]: + if not arg.startswith("--"): + args.append(arg) + + if not args: + print("AI Code Assistant") + print('\nUsage: python main.py "your prompt here" [--verbose]') + print('Example: python main.py "How do I fix the calculator?"') sys.exit(1) - load_dotenv() api_key = os.environ.get("GEMINI_API_KEY") - client = genai.Client(api_key=api_key) - + + user_prompt = " ".join(args) + + if verbose: + print(f"User prompt: {user_prompt}\n") + messages = [ - types.Content(role="user", parts=[types.Part(text=sys.argv[1])]), + types.Content(role="user", parts=[types.Part(text=user_prompt)]), ] - response = client.models.generate_content(model="gemini-2.0-flash-001", contents=messages) - print(f"Response: {response.text}") - - if "--verbose" in sys.argv: - print(f"User prompt: {sys.argv[1]}") - print(f"Prompt tokens: {response.usage_metadata.prompt_token_count}") - print(f"Response tokens: {response.usage_metadata.candidates_token_count}") + iters = 0 + while True: + iters += 1 + if iters > MAX_ITERS: + print(f"Maximum iterations ({MAX_ITERS}) reached.") + sys.exit(1) + + try: + final_response = generate_content(client, messages, verbose) + if final_response: + print("Final response:") + print(final_response) + break + except Exception as e: + print(f"Error in generate_content: {e}") + + generate_content(client, messages, verbose) + + +def generate_content(client, messages, verbose): + response = client.models.generate_content( + model="gemini-2.0-flash-001", + contents=messages, + config=types.GenerateContentConfig( + tools=[available_functions], system_instruction=SYSTEM_PROMPT + ), + ) + if verbose: + print("Prompt tokens:", response.usage_metadata.prompt_token_count) + print("Response tokens:", response.usage_metadata.candidates_token_count) + + + if response.candidates: + for candidate in response.candidates: + function_call_content = candidate.content + messages.append(function_call_content) + + if not response.function_calls: + return response.text + + function_responses = [] + for function_call_part in response.function_calls: + function_call_result = call_function(function_call_part, verbose) + if ( + not function_call_result.parts + or not function_call_result.parts[0].function_response + ): + raise Exception("empty function call result") + if verbose: + print(f"-> {function_call_result.parts[0].function_response.response}") + function_responses.append(function_call_result.parts[0]) + + if not function_responses: + raise Exception("no function responses generated, exiting.") + + messages.append(types.Content(role="user", parts=function_responses)) if __name__ == "__main__": main() diff --git a/tests.py b/tests.py index a51dabd..ed9a867 100644 --- a/tests.py +++ b/tests.py @@ -1,9 +1,20 @@ -from functions.write_file import write_file +from functions.run_python import run_python_file def test(): - print(write_file("calculator", "lorem.txt", "wait, this isn't lorem ipsum")) - print(write_file("calculator", "pkg/morelorem.txt", "lorem ipsum dolor sit amet")) - print(write_file("calculator", "/tmp/temp.txt", "this should not be allowed")) + res = run_python_file("calculator", "main.py") + print(res) + + res = run_python_file("calculator", "main.py", ["3 + 5"]) + print(res) + + res = run_python_file("calculator", "tests.py") + print(res) + + res = run_python_file("calculator", "../main.py") + print(res) + + res = run_python_file("calculator", "nonexistent.py") + print(res) if __name__ == "__main__": test()