AWS Study Notes, Lesson 32: Using cdk in Depth (API Gateway + EventBridge)
Use cdk to generate AWS API Gateway + lambda, EventBridge, and related resources.
Learning content:
- Use AWS API Gateway + lambda
- Use EventBridge to practice a producer and consumers
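1. Using AWS API Gateway + lambda
1.1. The previous exercise
Previous example
That example already practiced AWS API Gateway + lambda with resources created by hand. Here, the same API Gateway + lambda setup is created with cdk instead.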
1.2. Creating API Gateway + lambda with cdk
Overall architecture

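Code walkthrough
Create the lambda function

    from aws_cdk import (
        aws_apigateway as _apigw,
        aws_lambda as _lambda,
    )

    # Lambda function whose code is loaded from the local 'lambda' directory
    base_lambda = _lambda.Function(self, 'ApiCorsLambda',
        handler='lambda-handler.handler',
        runtime=_lambda.Runtime.PYTHON_3_12,
        code=_lambda.Code.from_asset('lambda'))

Note that no VPC is created here, because this example does not need one to be created explicitly.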
Create the API and add a resource

    base_api = _apigw.RestApi(self, 'ApiGatewayWithCors',
        rest_api_name='ApiGatewayWithCors')

    # The /example resource answers CORS preflight (OPTIONS) requests itself
    example_entity = base_api.root.add_resource(
        'example',
        default_cors_preflight_options=_apigw.CorsOptions(
            allow_methods=['GET', 'OPTIONS'],
            allow_origins=_apigw.Cors.ALL_ORIGINS)
    )
Create a LambdaIntegration to bind the API to the lambda

    # Non-proxy integration: the CORS header is set on the integration response
    example_entity_lambda_integration = _apigw.LambdaIntegration(
        base_lambda,
        proxy=False,
        integration_responses=[
            _apigw.IntegrationResponse(
                status_code="200",
                response_parameters={
                    'method.response.header.Access-Control-Allow-Origin': "'*'"
                }
            )
        ]
    )
Add a method to the API

    # The method response declares the header that the integration response fills in
    example_entity.add_method(
        'GET', example_entity_lambda_integration,
        method_responses=[
            _apigw.MethodResponse(
                status_code="200",
                response_parameters={
                    'method.response.header.Access-Control-Allow-Origin': True
                }
            )
        ]
    )
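The walkthrough above does not show the handler that 'lambda-handler.handler' refers to. The following is only a minimal sketch of what such a handler could look like, assuming a non-proxy integration with no mapping templates, in which case the value returned by the lambda is passed through as the JSON response body. The message text is a made-up placeholder, not the content of the real file.

```python
def handler(event, context):
    # Hypothetical handler body: with proxy=False the return value is
    # serialized to JSON and used directly as the response body. The status
    # code and the Access-Control-Allow-Origin header come from the
    # integration response and method response configured in the stack,
    # not from this function.
    return {"message": "Hello from ApiCorsLambda"}


if __name__ == "__main__":
    # Local smoke test with an empty event and no context object.
    print(handler({}, None))
```

With a proxy integration the function would instead have to return statusCode, headers, and body itself, which is the shape the producer lambda in section 2 uses.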
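1.3. Verifying the API Gateway + lambda created by cdk
Deploy the cdk app

    cdk --require-approval never deploy

Check the created resources
- lambda creation result
- API creation result
Confirm the API's invoke URL
Access the API's invoke URL
Afterwards, do not forget to run cdk destroy.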
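To make the invoke URL check concrete, here is a small sketch that composes the URL of the example resource and checks a set of response headers for the CORS header configured in the stack. The API id "abc123" and the region are placeholders, and "prod" is assumed as the stage name because that is the default stage cdk creates for a RestApi; the real values are printed by cdk deploy.

```python
def build_invoke_url(api_id: str, region: str, resource: str, stage: str = "prod") -> str:
    """Compose the invoke URL of a REST API resource."""
    path = resource.lstrip("/")
    return f"https://{api_id}.execute-api.{region}.amazonaws.com/{stage}/{path}"


def allows_any_origin(headers: dict) -> bool:
    """Return True if the headers carry Access-Control-Allow-Origin: *."""
    # Header names are case-insensitive, so normalize before the lookup.
    normalized = {name.lower(): value for name, value in headers.items()}
    return normalized.get("access-control-allow-origin") == "*"


if __name__ == "__main__":
    # Placeholder API id and region; replace them with the deployed values.
    print(build_invoke_url("abc123", "ap-northeast-1", "example"))
```

The headers passed to allows_any_origin can come from any HTTP client, for example the response headers returned by a GET request to the printed URL.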
2. Using EventBridge to practice a producer and consumers
2.1. Code link
This section practices using EventBridge with a producer and several consumers.
2.2. Starting the exercise
Overall architecture

Code walkthrough
Create the producer

    from aws_cdk import aws_iam as iam, aws_lambda as _lambda

    #
    # Producer Lambda
    #
    event_producer_lambda = _lambda.Function(self, "eventProducerLambda",
        runtime=_lambda.Runtime.PYTHON_3_12,
        handler="event_producer_lambda.lambda_handler",
        code=_lambda.Code.from_asset("lambda")
    )

    # Allow the producer to put events on EventBridge
    event_policy = iam.PolicyStatement(effect=iam.Effect.ALLOW,
        resources=['*'],
        actions=['events:PutEvents'])
    event_producer_lambda.add_to_role_policy(event_policy)

Here the producer is granted the events:PutEvents permission, because it will later call PutEvents on EventBridge.
The producer's handler code

    import datetime
    import json
    import logging

    import boto3

    logger = logging.getLogger()
    logger.setLevel(logging.INFO)


    def lambda_handler(event, context):
        eventbridge_client = boto3.client('events')
        request_body = event["body"]
        if request_body is None:
            request_body = ""

        # Structure of EventBridge Event
        eventbridge_event = {
            'Time': datetime.datetime.now(),
            'Source': 'com.mycompany.myapp',
            'Detail': request_body,
            'DetailType': 'service_status'
        }
        logger.info(eventbridge_event)

        # Send event to EventBridge
        response = eventbridge_client.put_events(
            Entries=[
                eventbridge_event
            ]
        )
        logger.info(response)

        # Returns success response to API Gateway
        return {
            "statusCode": 200,
            "body": json.dumps({
                "result": "from Producer"
            }),
        }

This code uses the boto3 Python package. AWS Boto3 is the official Python SDK provided by Amazon Web Services (AWS), used mainly to interact with and manage AWS services from code. Here boto3 is used to call PutEvents on EventBridge.
consumer1 and consumer2

    from aws_cdk import aws_events as events, aws_events_targets as targets

    #
    # Approved Consumer1
    #
    event_consumer1_lambda = _lambda.Function(self, "eventConsumer1Lambda",
        runtime=_lambda.Runtime.PYTHON_3_8,
        handler="event_consumer_lambda.lambda_handler",
        code=_lambda.Code.from_asset("lambda")
    )

    event_consumer1_rule = events.Rule(self, 'eventConsumer1LambdaRule',
        description='Approved Transactions',
        event_pattern=events.EventPattern(source=['com.mycompany.myapp'])
    )
    event_consumer1_rule.add_target(targets.LambdaFunction(handler=event_consumer1_lambda))

    #
    # Approved Consumer2
    #
    event_consumer2_lambda = _lambda.Function(self, "eventConsumer2Lambda",
        runtime=_lambda.Runtime.PYTHON_3_8,
        handler="event_consumer_lambda.lambda_handler",
        code=_lambda.Code.from_asset("lambda")
    )

    event_consumer2_rule = events.Rule(self, 'eventConsumer2LambdaRule',
        description='Approved Transactions',
        event_pattern=events.EventPattern(source=['com.mycompany.myapp'])
    )
    event_consumer2_rule.add_target(targets.LambdaFunction(handler=event_consumer2_lambda))

consumer1 and consumer2 are alike: each one receives the event from EventBridge and writes it to the log.

    import json
    import logging

    logger = logging.getLogger()
    logger.setLevel(logging.INFO)


    def lambda_handler(event, context):
        logger.info(event)
        return {
            "statusCode": 200,
            "body": json.dumps({
                "result": "testing..."
            }),
        }
consumer3 uses Kinesis Firehose to receive the event and save it to an S3 bucket

    from aws_cdk import aws_kinesisfirehose as _firehose, aws_s3 as s3

    #
    # Approved Consumer3
    #

    # Create S3 bucket for KinesisFirehose destination
    ingest_bucket = s3.Bucket(self, 'test-ngest-bucket')

    # Create a Role for KinesisFirehose
    firehose_role = iam.Role(
        self, 'myRole',
        assumed_by=iam.ServicePrincipal('firehose.amazonaws.com'))

    # Create and attach policy that gives permissions to write in to the S3 bucket.
    iam.Policy(
        self, 's3_attr',
        policy_name='s3kinesis',
        statements=[iam.PolicyStatement(
            actions=['s3:*'],
            resources=['arn:aws:s3:::' + ingest_bucket.bucket_name + '/*'])],
            # resources=['*'])],
        roles=[firehose_role],
    )

    event_consumer3_kinesisfirehose = _firehose.CfnDeliveryStream(self, "consumer3-firehose",
        s3_destination_configuration=_firehose.CfnDeliveryStream.S3DestinationConfigurationProperty(
            bucket_arn=ingest_bucket.bucket_arn,
            buffering_hints=_firehose.CfnDeliveryStream.BufferingHintsProperty(
                interval_in_seconds=60
            ),
            compression_format="UNCOMPRESSED",
            role_arn=firehose_role.role_arn
        ))

    event_consumer3_rule = events.Rule(self, 'eventConsumer3KinesisRule',
        description='Approved Transactions',
        event_pattern=events.EventPattern(source=['com.mycompany.myapp'])
    )
    event_consumer3_rule.add_target(targets.KinesisFirehoseStream(stream=event_consumer3_kinesisfirehose))
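All three rules use the same event pattern, which matches on source only, so one event from the producer is delivered to all three consumers. The sketch below illustrates that fan-out with a deliberately simplified matcher that only handles exact-value lists on top-level fields. It is an illustration of the idea, not the real EventBridge matching engine, and the rule names are simply the construct ids from the stack.

```python
def matches(pattern: dict, event: dict) -> bool:
    """Simplified pattern check: every field in the pattern must hold one of the listed values."""
    for field, allowed_values in pattern.items():
        if event.get(field) not in allowed_values:
            return False
    return True


# The same source-only pattern is attached to all three rules.
SOURCE_PATTERN = {"source": ["com.mycompany.myapp"]}
RULES = {
    "eventConsumer1LambdaRule": SOURCE_PATTERN,
    "eventConsumer2LambdaRule": SOURCE_PATTERN,
    "eventConsumer3KinesisRule": SOURCE_PATTERN,
}


def matching_rules(event: dict) -> list:
    """Return the names of all rules whose pattern matches the event."""
    return [name for name, pattern in RULES.items() if matches(pattern, event)]


if __name__ == "__main__":
    event = {"source": "com.mycompany.myapp", "detail-type": "service_status"}
    print(matching_rules(event))
```

An event with a different source matches none of the rules, so none of the consumers would be triggered by it.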
Expose the producer through API Gateway

    from aws_cdk import aws_apigateway as api_gw

    # defines an API Gateway REST API resource backed by our "atm_producer_lambda" function.
    api = api_gw.LambdaRestApi(self, 'SampleAPI-EventBridge-Multi-Consumer',
        handler=event_producer_lambda,
        proxy=False
    )
    items = api.root.add_resource("items")
    items.add_method("POST")  # POST /items
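2.3. Verifying the deployment
Next, run cdk deploy and check the result.
- One producer and two consumers, all lambda functions
- The third consumer, a Kinesis Firehose delivery stream
- The API in front of the producer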
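2.4. Testing the deployment
Test the API
The test call returns a normal result. Note that the request body must be set for this call. The following value is used here.

    {"item1":"123","item2":"234"}

Calling the API starts the producer lambda, which calls PutEvents on EventBridge.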
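The reason the request body has to be set is, as far as I understand it, that PutEvents expects Detail to be a string holding a valid JSON object, so the empty string the producer falls back to would not be accepted. The sketch below shows one way to build the entry more defensively. build_entry is a hypothetical helper that is not part of the original producer, and falling back to "{}" is an assumption about what a sensible default would be.

```python
import datetime
import json


def build_entry(request_body):
    """Build a PutEvents entry, making sure Detail is a JSON object string."""
    # Assumed fallback: an empty JSON object instead of an empty string.
    detail = request_body if request_body else "{}"

    # json.loads raises ValueError (JSONDecodeError) on malformed input.
    parsed = json.loads(detail)
    if not isinstance(parsed, dict):
        raise ValueError("Detail must be a JSON object")

    return {
        "Time": datetime.datetime.now(datetime.timezone.utc),
        "Source": "com.mycompany.myapp",
        "Detail": detail,
        "DetailType": "service_status",
    }


if __name__ == "__main__":
    print(build_entry('{"item1":"123","item2":"234"}'))
```

The returned dictionary has the same keys as the eventbridge_event built in the producer, so it could be passed in the Entries list of put_events in the same way.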
Check consumer1 and consumer2
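When reading the consumer logs it helps to know the shape of the event that EventBridge hands to a lambda target. The sample below is a hand-written illustration of that envelope: the id, account, time, and region values are placeholders, and the detail corresponds to the request body used in the test call. summarize is a hypothetical helper for picking out the interesting fields.

```python
# Hand-written sample of the envelope a lambda target receives from EventBridge.
# All identifying values are placeholders.
sample_event = {
    "version": "0",
    "id": "00000000-0000-0000-0000-000000000000",
    "detail-type": "service_status",
    "source": "com.mycompany.myapp",
    "account": "123456789012",
    "time": "2024-01-01T00:00:00Z",
    "region": "ap-northeast-1",
    "resources": [],
    # Detail arrives as an already parsed JSON object, not as a string.
    "detail": {"item1": "123", "item2": "234"},
}


def summarize(event: dict) -> tuple:
    """Return the (source, detail-type, detail) triple of an EventBridge event."""
    return event["source"], event["detail-type"], event["detail"]


if __name__ == "__main__":
    print(summarize(sample_event))
```

The DetailType and Source set by the producer show up under the keys detail-type and source, which is also why the rules can match on source.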
Check consumer3
This mainly means checking the S3 bucket. The objects stored there show the flow producer -> event -> EventBridge -> consumer3 -> Kinesis Firehose -> S3 bucket.
Finally, run cdk destroy.
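For the consumer3 check, an object downloaded from the S3 bucket before the stack is destroyed can hold several events. The sketch below assumes the events are written back to back as JSON objects with no delimiter between them, which is how records delivered this way usually appear, and splits such a string into individual events with the standard library. split_events is a hypothetical helper and the sample string is made up.

```python
import json


def split_events(raw: str) -> list:
    """Split a string of concatenated JSON objects into a list of parsed events."""
    decoder = json.JSONDecoder()
    events = []
    pos = 0
    while pos < len(raw):
        # Skip any whitespace or newlines between objects.
        if raw[pos].isspace():
            pos += 1
            continue
        # raw_decode parses one JSON value and returns the index just after it.
        obj, pos = decoder.raw_decode(raw, pos)
        events.append(obj)
    return events


if __name__ == "__main__":
    # Made-up object content with two events and no separator between them.
    raw = (
        '{"source":"com.mycompany.myapp","detail":{"item1":"123"}}'
        '{"source":"com.mycompany.myapp","detail":{"item1":"456"}}'
    )
    for event in split_events(raw):
        print(event["detail"])
```

The same function also works if the objects happen to be separated by newlines, since whitespace between them is skipped.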


















