本文
DynamoDB Localというローカル環境で動くDynamoDBがあり *1,これはDynamoDBの動作検証やテストに利用するためのミドルウェアなんですが (もちろん本番環境で使うものではない),これがDynamoDB Streamsをサポートしていることはあまり知られていません.僕も一昨日知りました.
Yes, the latest version of DynamoDB Local supports DynamoDB Streams on the same port configured for the DynamoDB service (by default 8000).
https://forums.aws.amazon.com/thread.jspa?threadID=231696
つまりこれはDynamoDB Streamsを使っているようなソフトウェアのテストや動作の検証をDynamoDB Localを使って行えるということです.
以下はgoの場合のサンプルコードですが,内容はAWS上で動いているDynamoDBでDynamoDB Streamsを利用する場合と大差はありません.
dynamoSession, err := session.NewSession(&aws.Config{
Region: aws.String("ap-northeast-1"),
Endpoint: aws.String("http://localhost:8000"),
})
if err != nil {
panic (err)
}
dynamo := dynamodb.New(dynamoSession)
dynamoStream := dynamodbstreams.New(session)
DynamoDB Localに向けたDynamoDB及びDynamoDB Streamsのクライアントを作ります.
_, err = dynamo.CreateTable(&dynamodb.CreateTableInput{
AttributeDefinitions: []*dynamodb.AttributeDefinition{
{
AttributeName: aws.String("ID"),
AttributeType: aws.String("S"),
},
{
AttributeName: aws.String("Name"),
AttributeType: aws.String("S"),
},
},
KeySchema: []*dynamodb.KeySchemaElement{
{
AttributeName: aws.String("ID"),
KeyType: aws.String("HASH"),
},
{
AttributeName: aws.String("Name"),
KeyType: aws.String("RANGE"),
},
},
ProvisionedThroughput: &dynamodb.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(1),
WriteCapacityUnits: aws.Int64(1),
},
TableName: aws.String("User"),
StreamSpecification: &dynamodb.StreamSpecification{
StreamEnabled: aws.Bool(true),
StreamViewType: aws.String("NEW_AND_OLD_IMAGES"),
},
})
StreamSpecification
を用いてStreamsを有効にしたテーブルを作成します.
_, err = dynamo.PutItem(&dynamodb.PutItemInput{
TableName: aws.String("User"),
Item: map[string]*dynamodb.AttributeValue{
"ID": {
S: aws.String("ID-1"),
},
"Name": {
S: aws.String("moznion"),
},
},
ReturnConsumedCapacity: aws.String("none"),
})
作成したテーブルに対してitemをputしてみます.これでUser
tableにレコードが入っているはず.
table, _ := dynamo.DescribeTable(&dynamodb.DescribeTableInput{
TableName: aws.String("User"),
})
streamArn := table.Table.LatestStreamArn
stream, _ := dynamoStream.DescribeStream(&dynamodbstreams.DescribeStreamInput{
StreamArn: streamArn,
})
shards := stream.StreamDescription.Shards
DescribeTable
でtable情報を引いて,DynamoDB StreamsのstreamArnを取得します.そのstreamArnを利用してStreamのshardsを引っ張ってきます.
Shard:
for _, shard := range shards {
out, err := dynamoStream.GetShardIterator(&dynamodbstreams.GetShardIteratorInput{
StreamArn: streamArn,
ShardId: shard.ShardId,
ShardIteratorType: aws.String(dynamodbstreams.ShardIteratorTypeTrimHorizon),
})
if err != nil {
logrus.Error(err)
continue
}
nextItr := out.ShardIterator
for nextItr != nil {
record, err := dynamoStream.GetRecords(&dynamodbstreams.GetRecordsInput{
ShardIterator: nextItr,
})
if err != nil {
continue Shard
}
records := record.Records
nextItr = record.NextShardIterator
}
}
shards
を用いて各shardのShardIteratorを取得し,そのshard iteratorを使うことでstreamからレコードを取ってくることが可能となります.後は煮るなり焼くなりできるでしょう.
なおこのコードはあくまでサンプルなので,実際にはshards周りの処理は状況に応じて書き換える必要があるでしょう.