Create and test function in AWS Lambda
Our addition function
import json
def lambda_handler(event, context):
json_input = event
if 'body' in event: # it can be an external request via API gateway
json_input = json.loads(event['body'])
if 'a' in json_input and 'b' in json_input:
a = int(json_input['a'])
b = int(json_input['b'])
return a + b
else:
return "No input found"
Code explanation
def lambda_handler(event, context):
The lambda_handler is the entry point for any AWS Lambda function.
event:
This parameter contains the input data for the function.
This data is sent to the function when it's invoked.
context:
This parameter provides information about the invocation, function,
and execution environment.
if 'body' in event:
This checks if the event dictionary contains a key named body.
This is crucial because AWS API Gateway
(a common way to trigger Lambda functions)
often sends data within a body key.
Add trigger
Check API endpoint
Invoke function via API
curl --header "Content-Type: application/json" --request POST --data "{\"a\": \"2\", \"b\": \"3\"}" <API endpoint>
5
{"message":"Internal Server Error"}
import json
def lambda_handler(event, context):
# Initialize response
response = {
'statusCode': 200,
'body': ''
}
json_input = event
if 'body' in event: # it can be an external request via API gateway
json_input = json.loads(event['body'])
if 'a' in json_input and 'b' in json_input:
a = int(json_input['a'])
b = int(json_input['b'])
result = a + b
response['body'] = json.dumps({'result': result})
return response
else:
response['body'] = json.dumps({'error': 'No input found'})
return response
Locust for load testing
目标:
- 实践实验:入门无服务器计算
议程:
- 部署您的第一个无服务器应用程序
- 作业介绍
创建和测试 AWS Lambda 函数
- 让我们使用 AWS Lambda 创建一个加法函数。
要创建加法函数:
- 导航到 AWS Lambda 控制台并点击“创建函数”。
- 选择“从头开始创建”,为您的函数命名,选择 Python 3.12 作为运行时,并从现有角色列表中选择 LabRole 执行角色。
- 在提供的 lambda_function.py 模板中实现加法函数。
- 部署函数。
- 加法函数代码
-
import json def lambda_handler(event, context): json_input = event if 'body' in event: # 通过 API 网关的外部请求可能带有 body json_input = json.loads(event['body']) if 'a' in json_input and 'b' in json_input: a = int(json_input['a']) b = int(json_input['b']) return a + b else: return "未找到输入"
lambda_handler
是任何 AWS Lambda 函数的入口。
event
: 包含函数的输入数据。此数据是在函数被调用时发送的。context
: 提供关于调用、函数和执行环境的信息。
if 'body' in event:
此处检查 event
字典中是否包含名为 body
的键。
这很重要,因为 AWS API Gateway(触发 Lambda 函数的常见方式)通常会在 body
键中发送数据。
添加触发器
- 添加 API Gateway 触发器:
- 转到函数的概览页面,点击“添加触发器”。
- 选择“API Gateway”作为触发源。
- 创建一个新 API:选择“HTTP API”作为 API 类型,并选择“开放”作为安全设置。
- 点击“添加”。
检查 API 端点
通过 API 调用函数
- 发送 POST 请求
curl --header "Content-Type: application/json" --request POST --data "{\"a\": \"2\", \"b\": \"3\"}" <API 端点>
我直接用curl会报错:
PS C:\Users\Eric1> curl --header "Content-Type: application/json" --request POST --data "{\"a\": \"2\", \"b\": \"3\"}" https://qgb4uu66o3.execute-api.us-east-1.amazonaws.com/default/myFirstLambda
Invoke-WebRequest : 找不到接受实际参数“Content-Type: application/json”的位置形式参数。
所在位置 行:1 字符: 1
+ curl --header "Content-Type: application/json" --request POST --data ...
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ CategoryInfo : InvalidArgument: (:) [Invoke-WebRequest],ParameterBindingException
+ FullyQualifiedErrorId : PositionalParameterNotFound,Microsoft.PowerShell.Commands.InvokeWebRequestCommand
PS C:\Users\Eric1> Invoke-RestMethod -Uri "https://qgb4uu66o3.execute-api.us-east-1.amazonaws.com/default/myFirstLambda" -Method Post -Body '{"a": "2", "b": "3"}' -ContentType "application/json"
result
------
5
-
预期响应:
- 5
-
实际得到:
{"message":"Internal Server Error"}
很神奇,后来又好了:
C:\Users\Eric1>curl --header "Content-Type: application/json" --request POST --data "{\"a\": \"2\", \"b\": \"3\"}" https://qgb4uu66o3.execute-api.us-east-1.amazonaws.com/default/MyAddition
{"result": 5}
为什么会这样? 通过 API Gateway 触发的 AWS Lambda 函数必须返回特定的结构,包括 statusCode
和 body
。如果没有这种结构,API Gateway 可能会返回 500 内部服务器错误。
我们来修改我们的响应格式!
import json
def lambda_handler(event, context):
# 初始化响应
response = {
'statusCode': 200,
'body': ''
}
json_input = event
if 'body' in event: # 通过 API 网关的外部请求可能带有 body
json_input = json.loads(event['body'])
if 'a' in json_input and 'b' in json_input:
a = int(json_input['a'])
b = int(json_input['b'])
result = a + b
response['body'] = json.dumps({'result': result})
return response
else:
response['body'] = json.dumps({'error': '未找到输入'})
return response
- 更全面的错误处理: 引入了
try-except
块来捕获和处理不同的错误(JSON 解析、缺少参数和意外错误)。 - 更具信息性的错误响应: 针对不同的失败场景提供了特定的错误消息。
我们接下来会做什么:
- 开发用于简单机器学习模型推理的 Lambda 函数。
- 使用 AWS Lambda 和 API Gateway 部署机器学习模型作为无服务器应用程序。
- 分析与无服务器函数相关的冷启动现象。
使用的环境:
- 本地机器
- AWS 控制台
- Google Colab
使用 Locust 进行负载测试