AWS in Practice: Syncing Data from DynamoDB to Redshift


AWS DynamoDB Overview

  • Amazon DynamoDB is a fully managed, serverless NoSQL key-value database designed to run high-performance applications at any scale.
  • DynamoDB delivers consistent response times under 10 milliseconds at any scale, and its storage is effectively unlimited, providing reliable performance however large the workload grows.
  • DynamoDB offers built-in security, continuous backups, automated multi-Region replication, in-memory caching, and data export tools.

Redshift Overview

  • Amazon Redshift is a fast, powerful, fully managed, petabyte-scale data warehouse service. You can start with a few hundred gigabytes of data and later scale up to petabytes.
  • Redshift is an OLAP (Online Analytical Processing) system: it supports complex analytical queries, focuses on decision support, and returns results in an easy-to-understand form.

Resource Preparation

VPC

  • vpc
    • CIDR block: 10.10.0.0/16
  • internet gateway
  • elastic ip address
  • nat gateway: uses the elastic IP address as its public IP
  • public subnet
    • one per Availability Zone, across three Availability Zones
  • private subnet
    • one per Availability Zone, across three Availability Zones
  • public route table: the route table associated with the public subnets
    • destination: 0.0.0.0/0, target: internet-gateway-id (allows communication with the outside world)
    • destination: 10.10.0.0/16, target: local (internal traffic)
  • private route table: the route table associated with the private subnets
    • destination: 10.10.0.0/16, target: local (internal traffic)
    • destination: 0.0.0.0/0, target: nat-gateway-id (allows internal resources to reach the outside world)
  • web server security group
    • allows any IP to access port 443
    • allows your own IP to access port 22, so you can SSH into the server and insert data into the database
  • glue redshift connection security group
    • contains only a single self-referencing rule that allows the same security group to access all TCP ports
    • this security group is required when creating the Glue connection:
    • Reference: the glue connection security group must have a self-referencing rule to allow AWS Glue components to communicate. Specifically, add or confirm that there is a rule of Type All TCP, Protocol TCP, Port Range including all ports, and whose Source is the same security group name as the Group ID.
  • private redshift security group
    • allows access to port 5439 from inside the VPC (10.10.0.0/16)
    • allows the glue connection security group to access port 5439
  • public redshift security group
    • allows access to port 5439 from inside the VPC (10.10.0.0/16)
    • allows the public CIDR block of the region where Kinesis Data Firehose runs to access port 5439 (see the lookup sketch after this list):
      • 13.58.135.96/27 for US East (Ohio)
      • 52.70.63.192/27 for US East (N. Virginia)
      • 13.57.135.192/27 for US West (N. California)
      • 52.89.255.224/27 for US West (Oregon)
      • 18.253.138.96/27 for AWS GovCloud (US-East)
      • 52.61.204.160/27 for AWS GovCloud (US-West)
      • 35.183.92.128/27 for Canada (Central)
      • 18.162.221.32/27 for Asia Pacific (Hong Kong)
      • 13.232.67.32/27 for Asia Pacific (Mumbai)
      • 13.209.1.64/27 for Asia Pacific (Seoul)
      • 13.228.64.192/27 for Asia Pacific (Singapore)
      • 13.210.67.224/27 for Asia Pacific (Sydney)
      • 13.113.196.224/27 for Asia Pacific (Tokyo)
      • 52.81.151.32/27 for China (Beijing)
      • 161.189.23.64/27 for China (Ningxia)
      • 35.158.127.160/27 for Europe (Frankfurt)
      • 52.19.239.192/27 for Europe (Ireland)
      • 18.130.1.96/27 for Europe (London)
      • 35.180.1.96/27 for Europe (Paris)
      • 13.53.63.224/27 for Europe (Stockholm)
      • 15.185.91.0/27 for Middle East (Bahrain)
      • 18.228.1.128/27 for South America (São Paulo)
      • 15.161.135.128/27 for Europe (Milan)
      • 13.244.121.224/27 for Africa (Cape Town)
      • 13.208.177.192/27 for Asia Pacific (Osaka)
      • 108.136.221.64/27 for Asia Pacific (Jakarta)
      • 3.28.159.32/27 for Middle East (UAE)
      • 18.100.71.96/27 for Europe (Spain)
      • 16.62.183.32/27 for Europe (Zurich)
      • 18.60.192.128/27 for Asia Pacific (Hyderabad)
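
To avoid picking the wrong block by hand, the per-region CIDRs above can be kept in a small lookup table. A minimal sketch (only a few regions shown; the mapping from region code to CIDR is copied from the list above):

```python
# Minimal sketch: resolve the Kinesis Data Firehose CIDR block to whitelist
# for a given region. Only a subset of the regions listed above is included.
FIREHOSE_CIDR_BY_REGION = {
    "us-east-1": "52.70.63.192/27",       # US East (N. Virginia)
    "ap-southeast-1": "13.228.64.192/27", # Asia Pacific (Singapore)
    "eu-west-1": "52.19.239.192/27",      # Europe (Ireland)
}

def firehose_cidr(region: str) -> str:
    """Return the Firehose CIDR block for `region`, or raise if unknown."""
    try:
        return FIREHOSE_CIDR_BY_REGION[region]
    except KeyError:
        raise ValueError(f"no Firehose CIDR on file for region {region!r}") from None

print(firehose_cidr("ap-southeast-1"))  # 13.228.64.192/27
```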

Serverless file for all the VPC resources:

  • Replace custom:bucketNamePrefix with the name of your own bucket
  • service: dynamodb-to-redshift-vpc
    
    custom:
      bucketNamePrefix: "jessica"
    
    provider:
      name: aws
      region: ${opt:region, "ap-southeast-1"}
      stackName: ${self:service}
      deploymentBucket:
        name: com.${self:custom.bucketNamePrefix}.deploy-bucket
        serverSideEncryption: AES256
    
    resources:
      Parameters:
        VpcName:
          Type: String
          Default: "test-vpc"
    
      Resources:
        VPC:
          Type: "AWS::EC2::VPC"
          Properties:
            CidrBlock: "10.10.0.0/16"
            EnableDnsSupport: true
            EnableDnsHostnames: true
            InstanceTenancy: default
            Tags:
              - Key: Name
                Value: !Sub "VPC_${VpcName}"
        # Internet Gateway
        InternetGateway:
          Type: "AWS::EC2::InternetGateway"
          Properties:
            Tags:
              - Key: Name
                Value: !Sub "VPC_${VpcName}_InternetGateway"
        VPCGatewayAttachment:
          Type: "AWS::EC2::VPCGatewayAttachment"
          Properties:
            VpcId: !Ref VPC
            InternetGatewayId: !Ref InternetGateway
    
        # web server security group
        WebServerSecurityGroup:
          Type: AWS::EC2::SecurityGroup
          Properties:
            GroupDescription: Allow access from public
            VpcId: !Ref VPC
            SecurityGroupIngress:
              - IpProtocol: tcp
                FromPort: 443
                ToPort: 443
                CidrIp: "0.0.0.0/0"
            Tags:
              - Key: Name
                Value: !Sub "VPC_${VpcName}_WebServerSecurityGroup"
    
        # public route table
        RouteTablePublic:
          Type: "AWS::EC2::RouteTable"
          Properties:
            VpcId: !Ref VPC
            Tags:
              - Key: Name
                Value: !Sub "VPC_${VpcName}_RouteTablePublic"
        RouteTablePublicInternetRoute:
          Type: "AWS::EC2::Route"
          DependsOn: VPCGatewayAttachment
          Properties:
            RouteTableId: !Ref RouteTablePublic
            DestinationCidrBlock: "0.0.0.0/0"
            GatewayId: !Ref InternetGateway
    
        # public subnet
        SubnetAPublic:
          Type: "AWS::EC2::Subnet"
          Properties:
            AvailabilityZone: !Select [0, !GetAZs ""]
            CidrBlock: "10.10.0.0/24"
            MapPublicIpOnLaunch: true
            VpcId: !Ref VPC
            Tags:
              - Key: Name
                Value: !Sub "VPC_${VpcName}_SubnetAPublic"
        RouteTableAssociationAPublic:
          Type: "AWS::EC2::SubnetRouteTableAssociation"
          Properties:
            SubnetId: !Ref SubnetAPublic
            RouteTableId: !Ref RouteTablePublic
    
        SubnetBPublic:
          Type: "AWS::EC2::Subnet"
          Properties:
            AvailabilityZone: !Select [1, !GetAZs ""]
            CidrBlock: "10.10.32.0/24"
            MapPublicIpOnLaunch: true
            VpcId: !Ref VPC
            Tags:
              - Key: Name
                Value: !Sub "VPC_${VpcName}_SubnetBPublic"
        RouteTableAssociationBPublic:
          Type: "AWS::EC2::SubnetRouteTableAssociation"
          Properties:
            SubnetId: !Ref SubnetBPublic
            RouteTableId: !Ref RouteTablePublic
    
        SubnetCPublic:
          Type: "AWS::EC2::Subnet"
          Properties:
            AvailabilityZone: !Select [2, !GetAZs ""]
            CidrBlock: "10.10.64.0/24"
            MapPublicIpOnLaunch: true
            VpcId: !Ref VPC
            Tags:
              - Key: Name
                Value: !Sub "VPC_${VpcName}_SubnetCPublic"
        RouteTableAssociationCPublic:
          Type: "AWS::EC2::SubnetRouteTableAssociation"
          Properties:
            SubnetId: !Ref SubnetCPublic
            RouteTableId: !Ref RouteTablePublic
    
        # redshift security group
        PrivateRedshiftSecurityGroup:
          Type: AWS::EC2::SecurityGroup
          Properties:
            GroupDescription: Allow access from inside vpc
            VpcId: !Ref VPC
            SecurityGroupIngress:
              - IpProtocol: tcp
                FromPort: 5439
                ToPort: 5439
                CidrIp: 10.10.0.0/16
              - IpProtocol: tcp
                FromPort: 5439
                ToPort: 5439
                SourceSecurityGroupId: !GetAtt GlueRedshiftConnectionSecurityGroup.GroupId
            Tags:
              - Key: Name
                Value: !Sub "VPC_${VpcName}_PrivateRedshiftSecurityGroup"
        # redshift security group
        PublicRedshiftSecurityGroup:
          Type: AWS::EC2::SecurityGroup
          Properties:
            GroupDescription: Allow access from inside vpc and Kinesis Data Firehose CIDR block
            VpcId: !Ref VPC
            SecurityGroupIngress:
              - IpProtocol: tcp
                FromPort: 5439
                ToPort: 5439
                CidrIp: 10.10.0.0/16
              - IpProtocol: tcp
                FromPort: 5439
                ToPort: 5439
                CidrIp: 13.228.64.192/27 # Kinesis Data Firehose CIDR for ap-southeast-1 (Singapore)
            Tags:
              - Key: Name
                Value: !Sub "VPC_${VpcName}_PublicRedshiftSecurityGroup"
        GlueRedshiftConnectionSecurityGroup:
          Type: AWS::EC2::SecurityGroup
          Properties:
            GroupDescription: Allow self-referencing access on all TCP ports
            VpcId: !Ref VPC
            Tags:
              - Key: Name
                Value: !Sub "VPC_${VpcName}_GlueRedshiftConnectionSecurityGroup"
        GlueRedshiftConnectionSecurityGroupSelfReferringInboundRule:
          Type: "AWS::EC2::SecurityGroupIngress"
          Properties:
            GroupId: !GetAtt GlueRedshiftConnectionSecurityGroup.GroupId
            IpProtocol: tcp
            FromPort: 0
            ToPort: 65535
            SourceSecurityGroupId: !GetAtt GlueRedshiftConnectionSecurityGroup.GroupId
            SourceSecurityGroupOwnerId: !Sub "${aws:accountId}"
        # nat gateway
        EIP:
          Type: "AWS::EC2::EIP"
          Properties:
            Domain: vpc
        NatGateway:
          Type: "AWS::EC2::NatGateway"
          Properties:
            AllocationId: !GetAtt "EIP.AllocationId"
            SubnetId: !Ref SubnetAPublic
    
        # private route table
        RouteTablePrivate:
          Type: "AWS::EC2::RouteTable"
          Properties:
            VpcId: !Ref VPC
            Tags:
              - Key: Name
                Value: !Sub "VPC_${VpcName}_RouteTablePrivate"
        RouteTablePrivateRoute:
          Type: "AWS::EC2::Route"
          Properties:
            RouteTableId: !Ref RouteTablePrivate
            DestinationCidrBlock: "0.0.0.0/0"
            NatGatewayId: !Ref NatGateway
    
        # private subnet
        SubnetAPrivate:
          Type: "AWS::EC2::Subnet"
          Properties:
            AvailabilityZone: !Select [0, !GetAZs ""]
            CidrBlock: "10.10.16.0/24"
            VpcId: !Ref VPC
            Tags:
              - Key: Name
                Value: !Sub "VPC_${VpcName}_SubnetAPrivate"
        RouteTableAssociationAPrivate:
          Type: "AWS::EC2::SubnetRouteTableAssociation"
          Properties:
            SubnetId: !Ref SubnetAPrivate
            RouteTableId: !Ref RouteTablePrivate
        SubnetBPrivate:
          Type: "AWS::EC2::Subnet"
          Properties:
            AvailabilityZone: !Select [1, !GetAZs ""]
            CidrBlock: "10.10.48.0/24"
            VpcId: !Ref VPC
            Tags:
              - Key: Name
                Value: !Sub "VPC_${VpcName}_SubnetBPrivate"
        RouteTableAssociationBPrivate:
          Type: "AWS::EC2::SubnetRouteTableAssociation"
          Properties:
            SubnetId: !Ref SubnetBPrivate
            RouteTableId: !Ref RouteTablePrivate
        SubnetCPrivate:
          Type: "AWS::EC2::Subnet"
          Properties:
            AvailabilityZone: !Select [2, !GetAZs ""]
            CidrBlock: "10.10.80.0/24"
            VpcId: !Ref VPC
            Tags:
              - Key: Name
                Value: !Sub "VPC_${VpcName}_SubnetCPrivate"
        RouteTableAssociationCPrivate:
          Type: "AWS::EC2::SubnetRouteTableAssociation"
          Properties:
            SubnetId: !Ref SubnetCPrivate
            RouteTableId: !Ref RouteTablePrivate
    
      Outputs:
        VPC:
          Description: "VPC."
          Value: !Ref VPC
          Export:
            Name: !Sub "${self:provider.stackName}"
        SubnetsPublic:
          Description: "Subnets public."
          Value:
            !Join [
              ",",
              [!Ref SubnetAPublic, !Ref SubnetBPublic, !Ref SubnetCPublic],
            ]
          Export:
            Name: !Sub "${self:provider.stackName}-PublicSubnets"
        SubnetsPrivate:
          Description: "Subnets private."
          Value:
            !Join [
              ",",
              [!Ref SubnetAPrivate, !Ref SubnetBPrivate, !Ref SubnetCPrivate],
            ]
          Export:
            Name: !Sub "${self:provider.stackName}-PrivateSubnets"
        DefaultSecurityGroup:
          Description: "VPC Default Security Group"
          Value: !GetAtt VPC.DefaultSecurityGroup
          Export:
            Name: !Sub "${self:provider.stackName}-DefaultSecurityGroup"
        WebServerSecurityGroup:
          Description: "VPC Web Server Security Group"
          Value: !Ref WebServerSecurityGroup
          Export:
            Name: !Sub "${self:provider.stackName}-WebServerSecurityGroup"
        PrivateRedshiftSecurityGroup:
          Description: "The id of the RedshiftSecurityGroup"
          Value: !Ref PrivateRedshiftSecurityGroup
          Export:
            Name: !Sub "${self:provider.stackName}-PrivateRedshiftSecurityGroup"
        PublicRedshiftSecurityGroup:
          Description: "The id of the RedshiftSecurityGroup"
          Value: !Ref PublicRedshiftSecurityGroup
          Export:
            Name: !Sub "${self:provider.stackName}-PublicRedshiftSecurityGroup"
        GlueRedshiftConnectionSecurityGroup:
          Description: "The id of the self referring security group"
          Value: !Ref GlueRedshiftConnectionSecurityGroup
          Export:
            Name: !Sub "${self:provider.stackName}-GlueSelfRefringSecurityGroup"
    
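After deploying the stack, you can sanity-check that the Glue connection security group really carries the self-referencing all-TCP rule that Glue requires. A minimal boto3 sketch, assuming the default VpcName of "test-vpc" and the ap-southeast-1 default region from the template above:

```python
# Minimal sketch: verify the Glue connection security group has the
# self-referencing all-TCP inbound rule required by AWS Glue components.
import boto3

ec2 = boto3.client("ec2", region_name="ap-southeast-1")

resp = ec2.describe_security_groups(
    Filters=[{"Name": "tag:Name",
              "Values": ["VPC_test-vpc_GlueRedshiftConnectionSecurityGroup"]}]
)
sg = resp["SecurityGroups"][0]
self_referencing = any(
    perm.get("IpProtocol") == "tcp"
    and perm.get("FromPort") == 0
    and perm.get("ToPort") == 65535
    and any(pair["GroupId"] == sg["GroupId"] for pair in perm.get("UserIdGroupPairs", []))
    for perm in sg["IpPermissions"]
)
print("self-referencing all-TCP rule present:", self_referencing)
```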

Redshift Cluster

  • Private Cluster subnet group
    • create a private subnet group that contains the private subnets
  • Private Cluster: used to test syncing data to Redshift with a Glue job. PubliclyAccessible must be set to false, otherwise the Glue job cannot connect
    • ClusterSubnetGroupName
      • use the private subnet group
    • VpcSecurityGroupIds
      • use the private redshift security group
    • NodeType: dc2.large
    • ClusterType: single-node
    • PubliclyAccessible: false

  • Public Cluster subnet group
    • create a public subnet group that contains the public subnets
  • Public Cluster: used to test syncing data to Redshift with Kinesis Data Firehose. PubliclyAccessible must be set to true, and the security group must allow the Kinesis Data Firehose public CIDR block to access port 5439; otherwise Firehose cannot connect to Redshift
    • ClusterSubnetGroupName
      • use the public subnet group
    • VpcSecurityGroupIds
      • use the public redshift security group
    • NodeType: dc2.large
    • ClusterType: single-node
    • PubliclyAccessible: true

Serverless file for all the Redshift resources:

  • Replace custom:bucketNamePrefix with the name of your own bucket
  • service: dynamodb-to-redshift-redshift
    
    custom:
      bucketNamePrefix: "jessica"
    
    provider:
      name: aws
      region: ${opt:region, "ap-southeast-1"}
      stackName: ${self:service}
      deploymentBucket:
        name: com.${self:custom.bucketNamePrefix}.deploy-bucket
        serverSideEncryption: AES256
    
    resources:
      Parameters:
        ServiceName:
          Type: String
          Default: dynamodb-to-redshift
    
      Resources:
        PrivateClusterSubnetGroup:
          Type: "AWS::Redshift::ClusterSubnetGroup"
          Properties:
            Description: Private Cluster Subnet Group
            SubnetIds:
              Fn::Split:
                - ","
                - Fn::ImportValue: !Sub ${ServiceName}-vpc-PrivateSubnets
            Tags:
              - Key: Name
                Value: private-subnet
        PrivateCluster:
          Type: "AWS::Redshift::Cluster"
          Properties:
            ClusterIdentifier: test-data-sync-redshift
            ClusterSubnetGroupName: !Ref PrivateClusterSubnetGroup
            VpcSecurityGroupIds:
              - Fn::ImportValue: !Sub ${ServiceName}-vpc-PrivateRedshiftSecurityGroup
            DBName: dev
            MasterUsername: admin
            MasterUserPassword: Redshift_admin_2022
            NodeType: dc2.large
            ClusterType: single-node
            PubliclyAccessible: false
        PublicClusterSubnetGroup:
          Type: "AWS::Redshift::ClusterSubnetGroup"
          Properties:
            Description: Public Cluster Subnet Group
            SubnetIds:
              Fn::Split:
                - ","
                - Fn::ImportValue: !Sub ${ServiceName}-vpc-PublicSubnets
            Tags:
              - Key: Name
                Value: public-subnet
        PublicCluster:
          Type: "AWS::Redshift::Cluster"
          Properties:
            ClusterIdentifier: test-data-sync-redshift-public
            ClusterSubnetGroupName: !Ref PublicClusterSubnetGroup
            VpcSecurityGroupIds:
              - Fn::ImportValue: !Sub ${ServiceName}-vpc-PublicRedshiftSecurityGroup
            DBName: dev
            MasterUsername: admin
            MasterUserPassword: Redshift_admin_2022
            NodeType: dc2.large
            ClusterType: single-node
            PubliclyAccessible: true
      Outputs:
        PrivateRedshiftEndpoint:
          Description: "Redshift endpoint"
          Value: !GetAtt PrivateCluster.Endpoint.Address
          Export:
            Name: !Sub "${self:provider.stackName}-PrivateRedshiftEndpoint"
        PrivateRedshiftPort:
          Description: "Redshift port"
          Value: !GetAtt PrivateCluster.Endpoint.Port
          Export:
            Name: !Sub "${self:provider.stackName}-PrivateRedshiftPort"
        PublicRedshiftEndpoint:
          Description: "Public Redshift endpoint"
          Value: !GetAtt PublicCluster.Endpoint.Address
          Export:
            Name: !Sub "${self:provider.stackName}-PublicRedshiftEndpoint"
        PublicRedshiftPort:
          Description: "Public Redshift port"
          Value: !GetAtt PublicCluster.Endpoint.Port
          Export:
            Name: !Sub "${self:provider.stackName}-PublicRedshiftPort"
    
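Once the stack is deployed, the endpoints can be read back and sanity-checked with boto3. A minimal sketch (the cluster identifiers and the ap-southeast-1 default region are taken from the template above):

```python
# Minimal sketch: read back the endpoints of the two clusters created above
# and confirm which one is publicly accessible.
import boto3

redshift = boto3.client("redshift", region_name="ap-southeast-1")

for cluster_id in ("test-data-sync-redshift", "test-data-sync-redshift-public"):
    cluster = redshift.describe_clusters(ClusterIdentifier=cluster_id)["Clusters"][0]
    endpoint = cluster["Endpoint"]
    print(f'{cluster_id}: {endpoint["Address"]}:{endpoint["Port"]} '
          f'(publicly accessible: {cluster["PubliclyAccessible"]})')
```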

Syncing with an AWS Glue ETL Job

Applicable scenarios

  • One-off full-table sync.
  • For typical time-series data (current data is written and read frequently, while older data is accessed less and less), a common pattern is to create one table per time period (e.g., per day) so that WCU and RCU can be allocated sensibly (see the naming sketch after this list).
  • If, after a time period ends, you need to run complex analytical queries over all the data from that period, you sync the whole DynamoDB table for that period into Redshift.
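With one table per day, the table name can simply encode the period. A hypothetical naming sketch (the convention itself is an assumption, not something the templates below rely on):

```python
# Hypothetical sketch: one DynamoDB table per day for time-series data,
# so WCU/RCU can be tuned per period. The naming convention is an assumption.
from datetime import date, timedelta

def daily_table_name(day: date) -> str:
    return f"TestSyncToRedshift_{day:%Y_%m_%d}"

print(daily_table_name(date(2022, 1, 1)))             # TestSyncToRedshift_2022_01_01
print(daily_table_name(date.today() - timedelta(1)))  # yesterday's table
```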

Architecture

Advantages

  • AWS Glue Crawlers manage the schemas of the source and target tables automatically, so the mapping step can be omitted from the Glue job script, which keeps the script easy to maintain.

Resources to deploy

  • Dynamodb table: the source data table
  • IAM role for glue crawler: the crawler needs permissions to connect to DynamoDB and Redshift in order to read the table schemas
  • Dynamodb glue catalog database: stores the DynamoDB table schema generated by the crawler
  • Redshift glue catalog database: stores the Redshift table schema generated by the crawler
  • Dynamodb glue crawler: reads the DynamoDB table and generates the corresponding table schema
  • Redshift glue crawler: reads the Redshift table and generates the corresponding table schema
  • Glue connection: the connection the Glue job uses to reach Redshift
  • IAM role for glue job: the role granting the Glue job access to DynamoDB, Redshift, S3, and CloudWatch Logs
  • S3 bucket for glue job
  • glue job

How to deploy:

  • sls deploy -c glue-etl.yml
    # replace ${bucketNamePrefix} with your own glue bucket name created in glue-etl.yml
    aws s3 cp dynamodb-to-redshift.py s3://com.${bucketNamePrefix}.glue-etl-temp-bucket/script/

Deployment file: glue-etl.yml

  • service: dynamodb-to-redshift-glue-etl
    
    custom:
      bucketNamePrefix: "jessica"
    
    provider:
      name: aws
      region: ${opt:region, "ap-southeast-1"}
      stackName: ${self:service}
      deploymentBucket:
        name: com.${self:custom.bucketNamePrefix}.deploy-bucket
        serverSideEncryption: AES256
    
    resources:
      Parameters:
        DynamodbTableName:
          Type: String
          Default: "TestSyncToRedshift"
        ServiceName:
          Type: String
          Default: dynamodb-to-redshift
        GlueBucketName:
          Type: String
          Default: com.${self:custom.bucketNamePrefix}.glue-etl-temp-bucket
    
      Resources:
        TestTable:
          Type: AWS::DynamoDB::Table
          Properties:
            TableName: !Sub ${DynamodbTableName}
            BillingMode: PAY_PER_REQUEST
            AttributeDefinitions:
              - AttributeName: pk
                AttributeType: S
              - AttributeName: sk
                AttributeType: S
            KeySchema:
              - AttributeName: pk
                KeyType: HASH
              - AttributeName: sk
                KeyType: RANGE
        CrawlerRole:
          Type: AWS::IAM::Role
          Properties:
            RoleName: CrawlerRole
            AssumeRolePolicyDocument:
              Version: "2012-10-17"
              Statement:
                - Effect: "Allow"
                  Principal:
                    Service:
                      - "glue.amazonaws.com"
                  Action:
                    - "sts:AssumeRole"
            ManagedPolicyArns:
              - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
              - arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess
              - arn:aws:iam::aws:policy/AmazonRedshiftFullAccess
              - arn:aws:iam::aws:policy/AmazonS3FullAccess
        DynamodbDatabase:
          Type: AWS::Glue::Database
          Properties:
            CatalogId: !Ref AWS::AccountId
            DatabaseInput:
              Name: "dynamodb-database"
        DynamodbCrawler:
          Type: AWS::Glue::Crawler
          Properties:
            Name: "dynamodb-crawler"
            Role: !GetAtt CrawlerRole.Arn
            DatabaseName: !Ref DynamodbDatabase
            Targets:
              DynamoDBTargets:
                - Path: !Sub ${DynamodbTableName}
            SchemaChangePolicy:
              UpdateBehavior: "UPDATE_IN_DATABASE"
              DeleteBehavior: "LOG"
            Schedule:
              ScheduleExpression: cron(0/10 * * * ? *) # run every 10 minutes
        GlueRedshiftConnection:
          Type: AWS::Glue::Connection
          Properties:
            CatalogId: !Sub "${aws:accountId}"
            ConnectionInput:
              Name: ${self:service}-redshift-connection
              ConnectionType: JDBC
              MatchCriteria: []
              PhysicalConnectionRequirements:
                SecurityGroupIdList:
                  - Fn::ImportValue: !Sub ${ServiceName}-vpc-GlueSelfReferencingSecurityGroup
                SubnetId:
                  Fn::Select:
                    - 1
                    - Fn::Split:
                        - ","
                        - Fn::ImportValue: !Sub "${ServiceName}-vpc-PrivateSubnets"
              ConnectionProperties:
                JDBC_CONNECTION_URL:
                  Fn::Join:
                    - ""
                    - - "jdbc:redshift://"
                      - Fn::ImportValue: !Sub ${ServiceName}-redshift-PrivateRedshiftEndpoint
                      - ":"
                      - Fn::ImportValue: !Sub ${ServiceName}-redshift-PrivateRedshiftPort
                      - "/dev"
                JDBC_ENFORCE_SSL: false
                USERNAME: admin
                PASSWORD: Redshift_admin_2022
        RedshiftDatabase:
          Type: AWS::Glue::Database
          Properties:
            CatalogId: !Ref AWS::AccountId
            DatabaseInput:
              Name: "redshift-database"
        RedshiftCrawler:
          Type: AWS::Glue::Crawler
          Properties:
            Name: "redshift-crawler"
            Role: !GetAtt CrawlerRole.Arn
            DatabaseName: !Ref RedshiftDatabase
            Targets:
              JdbcTargets:
                - ConnectionName: !Ref GlueRedshiftConnection
                  Path: dev/public/test_sync_to_redshift
            SchemaChangePolicy:
              UpdateBehavior: "UPDATE_IN_DATABASE"
              DeleteBehavior: "LOG"
        RedshiftGlueJobRole:
          Type: AWS::IAM::Role
          Properties:
            RoleName: RedshiftGlueJobRole
            AssumeRolePolicyDocument:
              Version: "2012-10-17"
              Statement:
                - Effect: Allow
                  Principal:
                    Service:
                      - glue.amazonaws.com
                  Action: sts:AssumeRole
            ManagedPolicyArns:
              - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
              - arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess
              - arn:aws:iam::aws:policy/AmazonRedshiftFullAccess
              - arn:aws:iam::aws:policy/AmazonS3FullAccess
              - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
    
        GlueTempBucket:
          Type: AWS::S3::Bucket
          Properties:
            BucketName: !Sub ${GlueBucketName}
    
        GlueJob:
          Type: AWS::Glue::Job
          Properties:
            Name: dynamodb-to-redshift-glue-etl-job
            Role: !GetAtt RedshiftGlueJobRole.Arn
            Command:
              Name: glueetl
              ScriptLocation: !Sub "s3://${GlueBucketName}/script/dynamodb-to-redshift.py"
              PythonVersion: 3
            DefaultArguments:
              --TempDir: !Sub "s3://${GlueBucketName}/tmp/dynamodb-to-redshift/"
            WorkerType: G.1X
            NumberOfWorkers: 2
            GlueVersion: "3.0"
            Connections:
              Connections:
                - !Ref GlueRedshiftConnection
    

Glue job script: dynamodb-to-redshift.py

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

params = [
    'JOB_NAME',
    'TempDir',
]

args = getResolvedOptions(sys.argv, params)
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Read the source table from the Glue catalog (populated by dynamodb-crawler)
DynamoDBtable_node1 = glueContext.create_dynamic_frame.from_catalog(
    database="dynamodb-database",
    table_name="testsynctoredshift",
    transformation_ctx="DynamoDBtable_node1",
)

# Write into the Redshift table from the Glue catalog (populated by
# redshift-crawler); TempDir is the S3 staging location for the COPY
RedshiftCluster_node2 = glueContext.write_dynamic_frame.from_catalog(
    frame=DynamoDBtable_node1,
    database="redshift-database",
    table_name="dev_public_test_sync_to_redshift",
    redshift_tmp_dir=args["TempDir"],
    transformation_ctx="RedshiftCluster_node2",
)

job.commit()
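
To trigger this job outside the Glue console, a minimal boto3 sketch (the job name comes from the glue-etl.yml template above; the region is the provider default):

```python
# Minimal sketch: start the Glue ETL job defined above and print the run id.
import boto3

glue = boto3.client("glue", region_name="ap-southeast-1")

run = glue.start_job_run(JobName="dynamodb-to-redshift-glue-etl-job")
print("run id:", run["JobRunId"])
```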

Testing

  1. Insert some data into the DynamoDB table first, either with the AWS web console or with a few lines of boto3 (see the sketch after this list); otherwise the crawler cannot detect the table schema.

  2. Run dynamodb-crawler. After it succeeds, you can see the database and table in the [glue console](https://ap-southeast-1.console.aws.amazon.com/glue/home?region=ap-southeast-1#catalog:tab=tables).

  3. Create the Redshift table with [Redshift query editor v2](https://ap-southeast-1.console.aws.amazon.com/sqlworkbench/home?region=ap-southeast-1#/client):

     ```
     CREATE TABLE "public"."test_sync_to_redshift"(pk varchar(200) NOT NULL, sk varchar(200) NOT NULL, PRIMARY KEY(pk, sk));
     ```

  4. Run redshift-crawler. If you encounter a "no valid connection" error, update the password in the redshift-connection manually in the AWS console; it is unclear why the password is wrong when deployed with CloudFormation. After the crawler succeeds, you can see the database and table in the [glue console](https://ap-southeast-1.console.aws.amazon.com/glue/home?region=ap-southeast-1#catalog:tab=tables).

  5. Run the Glue ETL job. After it succeeds, you can check the data in the Redshift table with [Redshift query editor v2](https://ap-southeast-1.console.aws.amazon.com/sqlworkbench/home?region=ap-southeast-1#/client).

  This Glue ETL job inserts all the data from the DynamoDB table directly into the Redshift table. In Redshift, [primary key and foreign key constraints are informational only; they are not enforced by Amazon Redshift](https://docs.aws.amazon.com/redshift/latest/dg/t_Defining_constraints.html), so if you run the job several times you will see duplicate rows with the same primary key in the Redshift table.
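
Instead of clicking through the console in step 1, the test rows can be seeded with boto3. A minimal sketch against the table defined above (the pk/sk values are arbitrary examples):

```python
# Minimal sketch: seed TestSyncToRedshift with a few items so the crawler
# can infer the schema. The pk/sk values are arbitrary examples.
import boto3

table = boto3.resource("dynamodb", region_name="ap-southeast-1").Table("TestSyncToRedshift")

# batch_writer buffers and flushes BatchWriteItem calls automatically
with table.batch_writer() as batch:
    for i in range(10):
        batch.put_item(Item={"pk": f"user#{i}", "sk": f"event#{i:04d}"})
```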

Syncing with an AWS Glue Streaming Job

Applicable scenarios

  • Continuous incremental sync
  • Insert, update, and delete operations on the table are all supported

Architecture diagram

Resources to deploy

  • Dynamodb table
  • VPC
  • Redshift Cluster
  • Glue Crawler
  • GlueJob

Advantages

  • Can sync insert, update, and delete operations on the table

Disadvantages

Syncing with AWS Kinesis Data Firehose

Applicable scenarios

  • Continuous incremental sync
  • Only insert operations are fully supported; updates are partially supported and deletes are not supported. Typical examples: recording data collected by sensors every second, or recording user behavior events on a website.
    • Kinesis Data Firehose integrates with Redshift through the Redshift COPY command, and Redshift does not guarantee primary key uniqueness: to Redshift, the primary key is informational only. If the source data of a COPY contains multiple rows with the same primary key (for example, several modifications of the same record), the Redshift table will end up with multiple rows sharing that primary key.
    • "Partially supported updates" means: if multiple rows with the same primary key do not affect your business logic, you can still use Kinesis Data Firehose for the sync; if they do, you cannot. A detection sketch follows this list.
    • Because Kinesis Data Firehose integrates with Redshift through the COPY command, deletes are not supported.
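
Since duplicates cannot be prevented on ingest, they can at least be detected (or periodically cleaned up) on the Redshift side. A minimal sketch using the Redshift Data API, reusing the public cluster, database, user, and table names from the templates above:

```python
# Minimal sketch: list primary keys that repeated COPYs have duplicated in
# the target table. Cluster, database, user, and table reuse the templates above.
import boto3

client = boto3.client("redshift-data", region_name="ap-southeast-1")

resp = client.execute_statement(
    ClusterIdentifier="test-data-sync-redshift-public",
    Database="dev",
    DbUser="admin",
    Sql="""
        SELECT pk, sk, COUNT(*) AS copies
        FROM public.test_sync_to_redshift
        GROUP BY pk, sk
        HAVING COUNT(*) > 1
    """,
)
# The Data API is asynchronous: poll describe_statement, then fetch rows
# with get_statement_result using the returned statement id.
print("statement id:", resp["Id"])
```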

Architecture diagram

Resources to deploy

Reference

Setting up networking for development for AWS Glue - AWS Glue
