DynamoDBとAWS LambdaでアクセスランキングAPIをつくる!

プラコレのアドベントカレンダー本日から始まります!!会社として初めてのアドベントカレンダーなのでお手柔らかにお願いします!

こんにちは!沖縄に住んでプラコレのCTOをやっている森です。12月になって沖縄も寒くなってきましたがまだまだ20度くらいで過ごしやすいです。今回弊社のメディアDressyのバックエンドのリニューアルをするために汎用的に使える一週間ごとに集計したアクセスランキングAPIを作成したのでそれについて書こうと思います!

今回作成したアクセスランキングAPIは汎用的に運用できるので、静的ページでも使えたりなにかのアプリケーションに組み込むなども簡単で、アクセス数が増えてもDynamoDBのオートスケーリングで柔軟に対応できます。

アクセスランキングAPIは一定期間(今回は一週間)のURLごとのアクセスランキングを返すAPIです。記事ではテーブルの定義とLambda関数のNode.jsでの実装までを載せていますので、LambdaやAPI Gatewayへのデプロイなどはserverless frameworkやAWS SAMなどを使用して設定することをおすすめします。

目次

  • APIの詳細
  • DynamoDBのテーブル定義
  • DynamoDBのインデックス定義
  • Lambda関数の作成
  • アクセスを日毎に記録する関数
  • 日毎のデータを1週間のデータに集計する関数
  • 結果を返す関数
  • 最後に

APIの詳細

アクセスの記録

ページにアクセスがあったときに呼び出されるAPIです。アクセスがあったときにLambdaでmodule.exports.accessが呼び出されるようにします。

リクエスト内容はURLが/contents/123にアクセスがあった場合このような感じのリクエストを送ります。

dev環境でaccessにPOSTで紐付けた場合のリクエスト例

curl -X POST https://xxxxx.execute-api.ap-northeast-1.amazonaws.com/dev/access  -H 'Content-Type: application/json' -d '{"url": "/contents/123"}'

レスポンスはこのようになります。

{
  "message": "success"
}

アクセスランキングデータの取得

アクセスランキングのデータを取得したい時に呼び出すAPIです。module.exports.rankingが呼び出されるようにします。

dev環境でrankingにGETで紐付けた場合のリクエスト例

curl -X GET https://xxxxx.execute-api.ap-northeast-1.amazonaws.com/dev/ranking -H 'Content-Type: application/json'

レスポンスはこのようになります。

{
  "message": "success",
  "data": {
    "Items": [
      {
        "date": "20191209",
        "interval": 7,
        "url": "/contents/123",
        "num": 1024
      },
      {
        "date": "20191209",
        "interval": 7,
        "url": "/contents/aaa",
        "num": 256
      }
    ],
    "Count": 2,
    "ScannedCount": 2,
    "LastEvaluatedKey": {
      "url": "/contents/aaa",
      "date": "20191209",
      "num": 256
    }
  }
}

Lambda関数とDynamoDB

APIの作成にあたり2つのDynamoDBのテーブルと3つのLambda関数を作成しました。

DynamoDBテーブル

  • 日毎のデータを記録するテーブル
  • 集計したデータを記録するテーブル

Lambda関数

  • アクセスを日毎に記録する関数
  • 日毎のデータを1週間のデータに集計する関数
  • 結果を返す関数

DynamoDBのテーブル定義

まずはDynamoDBのテーブルを設定します。日毎のアクセス数記録用と集計用で2つ作成しました。

RankingItemテーブル

アクセスがあった時にurlとdateで引いてnumに1を加算する必要があるのでHASHをurl、RANGEをdateにしています。

KeyType AttributeName AttributeType
HASH url 文字列 URLを正規化した値
RANGE date 文字列 アクセス日をYYYYMMDDの形式
- num 数値 アクセス数

RankingSummaryテーブル

KeyType AttributeName AttributeType
HASH date 文字列 アクセス日をYYYYMMDDの形式
RANGE url 文字列 URLを正規化した値
- num 数値 アクセス数の合計
- interval 数値 何日分を集計したのか

DynamoDBのインデックス定義

GlobalSecondaryIndexを以下のようにそれぞれのテーブルに定義しました。

itemGSI

日毎のデータを集計してRankingSummaryテーブルに入れるため、itemGSIという名前でRankingItemに以下のように定義しました。

特定の日付のURLを全部取得する必要があるのでdateをHASHキーにしています(RANGEキーにnumを指定しているのはRankingSummaryテーブルにデータがなかった場合1日分のアクセスランキングを表示できるようにアクセス数で並び替えができるようにしています)

KeyType AttributeName
HASH date
RANGE num

summaryGSI

アクセスランキングAPIでは集計したデータをもとに上位何件かを返す必要があるため、以下のように日付で引いてnumで並び替えできるようにしています。

KeyType AttributeName
HASH date
RANGE num

Lambda関数の作成

関数は別ファイルにしても良いですが、今回は簡略化のためにすべて同じファイルに作成することとしています。

まずは共通で読み込むライブラリの読み込みをします。日付の処理でdayjsを使用しているため事前にnpm install dayjsをしておいてください。

コードの途中にコメントを入れているような感じで書いているので以下現れるコードをつなぎ合わせるとすべてのLambda関数が完成します。

'use strict';

const AWS = require('aws-sdk')
const docClient = new AWS.DynamoDB.DocumentClient({region: 'ap-northeast-1'})
const dayjs = require('dayjs')

アクセスを日毎に記録する関数

アクセスを記録するためにaccessという関数を作成しますが、その前にURLを正規化するメソッドを作成します。例えばcontent/123?hoge=fugaというURLが来た場合に/content/123というキーにするようにしてアクセス数が意図せずバラけないようにします(適宜用途によって書き換えてください)

const normalizeUrl = (path) => {
  const removedParams = (path || '').split('?')[0] || ''
  const removedTailSlash = removedParams.replace(/\/$/, '')
  if(!!removedTailSlash.match(/^\//)){
    return removedTailSlash
  }else{
    return `/${removedTailSlash}`
  }
}

リクエストがapplication/jsonでurlというキーで送られることを想定しています。

先程のメソッドで正規化されたURLと年月日で引いた項目のnumの値をインクリメントするのですが、項目がない場合にもエラーなく対応する必要があるため、UpdateExpressionでif_not_exists(num, :default)として項目がない場合(numがない場合)の対応をしています。

module.exports.access = async (event, context, callback) => {
  return new Promise((resolve, reject) => {
    const url = JSON.parse(event.body).url

    const params = {
      TableName: 'RankingItems',
      Key: {
        url: normalizeUrl(url),
        date: dayjs().format('YYYYMMDD')
      },
      UpdateExpression: "set num = if_not_exists(num, :default) + :increment",
      ExpressionAttributeValues:{
        ":default": 0,
        ":increment": 1
      },
      ReturnValues:"UPDATED_NEW"
    }
    docClient.update(params, (err, data) => {
      if (err) {
        reject(err)
      } else {
        resolve({
          statusCode: 200,
          body: JSON.stringify({
            message: 'success'
          })
        })
      }
    })
  })
}

日毎のデータを1週間のデータに集計する関数

rankingItemsメソッドではrankingItemsテーブルから指定した日にちのデータを再帰的に取得して返します。

queryでは1MB分一度に取得できますが、もしそれを超えた場合にLastEvaluatedKeyにキー(この場合はHASHとRANGEキー)が入ってくるのでキーが入っていた場合はそのキーを渡して残りのデータを取得するようにしています。

const rankingItems = (date, items=[], lastEvaluatedKey=null) => {
  const params = {
    TableName: 'rankingItems',
    IndexName: 'itemGSI',
    KeyConditionExpression: '#dt = :dt',
    ExpressionAttributeNames : {
      '#dt': 'date'
    },
    ExpressionAttributeValues:{
      ':dt': date
    },
    ScanIndexForward: false
  }

  if(lastEvaluatedKey) params.ExclusiveStartKey = lastEvaluatedKey

  return new Promise((resolve, reject) => {
    docClient.query(params, (err, data) => {
      if(err){
        reject(err)
      }

      data.Items.forEach((item)=>{
        items.push(item)
      })

      if(typeof data.LastEvaluatedKey !== "undefined"){
        (async ()=>{
          const rankingItems = await rankingItems(date, items, data.LastEvaluatedKey)
          resolve(items)
        })()
      }else{
        resolve(items)
      }
    })
  })
}

aggrigationItemsメソッドではintervalで指定された日にち分のデータをrankingItemsメソッドを使ってrankingItemsテーブルからアクセス数を取得して集計します。また、集計したデータをdictionary形式で返します。

const aggrigationItems = async (interval) => {
  const ranking = {}
  for(let i=1; i<=interval; i++){
    const daysAgo = i * -1
    const items = await rankingItems(dayjs().add(daysAgo, 'day').format('YYYYMMDD'))
    items.forEach((item)=>{
      if(ranking[item.url]){
        ranking[item.url] += item.num
      }else{
        ranking[item.url] = item.num
      }
    })
  }

  return ranking
}

上記でrankingItemsテーブルから取得して集計したデータを受け取りrankingSummariesテーブルに保存します。

const updateSummary = async (rankingData, interval)=>{
  const updateList = []

  Object.keys(rankingData).forEach((url)=>{
    const num = rankingData[url]
    const params = {
      TableName: 'rankingSummaries',
      Key: {
        url: url,
        date: dayjs().add(-1, 'day').format('YYYYMMDD')
      },
      UpdateExpression: "set num = :value, #interval = :interval",
      ExpressionAttributeNames: {
        "#interval": "interval"
      },
      ExpressionAttributeValues:{
        ":value": num,
        ":interval": interval
      },
      ReturnValues:"UPDATED_NEW"
    }
    const updateItem = new Promise((resolve, reject)=>{
      docClient.update(params, (err, data) => {
        console.log(`update ${url}`)
        if (err) {
          console.log(err)
          reject(err)
        } else {
          resolve()
        }
      })
    })

    updateList.push(updateItem)
  })

  return new Promise((resolve, reject)=>{
    Promise.all(updateList).then(res=>{
      resolve()
    }).catch(rej=>{
      reject()
    })
  })
}

最後に、上記で作成したメソッドをまとめてLambda関数とします。アクセスランキングを更新するため毎日この関数を実行するようにします。

module.exports.summaryTask = async (event, context, callback) => {
  const intervalDate = 7

  const ranking = await aggrigationItems(intervalDate)
  await updateSummary(ranking, intervalDate)

  return ranking
}

結果を返す関数

アクセスランキングの結果を返す関数は基本的にはrankingSummariesからデータを取得して返すだけなのですが、取得データがない場合(日付が切り替わった直後などで集計中の場合など)に空データはなるべく返したくないので、rankingItemsテーブルにある前日のデータを取得して返すようにしています。

この機能を実装するためgetAccessRankingItemsメソッドではGlobalSecondaryインデックスを使用してrankingItemsから上位のデータを取得するようにしています。

const topNum = 10

const getAccessRankingItems = (date)=>{
  return new Promise((resolve, reject) => {
    const params = {
      TableName: 'rankingItems',
      IndexName: 'itemGSI',
      KeyConditionExpression: '#dt = :dt',
      ExpressionAttributeNames : {
        '#dt': 'date'
      },
      ExpressionAttributeValues:{
        ':dt': date
      },
      ScanIndexForward: false,
      Limit: topNum
    }

    docClient.query(params, (err, data) => {
      if (err) {
        reject(err)
      } else {
        resolve(data)
      }
    })
  })
}

getAccessRankingSummariesメソッドは集計したデータをrankingSummariesテーブルから取得して返します。

const getAccessRankingSummaries = (date)=>{
  return new Promise((resolve, reject) => {
    const params = {
      TableName: 'rankingSummaries',
      IndexName: 'summaryGSI',
      KeyConditionExpression: '#dt = :dt',
      ExpressionAttributeNames : {
        '#dt': 'date'
      },
      ExpressionAttributeValues:{
        ':dt': date
      },
      ScanIndexForward: false,
      Limit: topNum
    }
    docClient.query(params, (err, data) => {
      if (err) {
        reject(err)
      } else {
        resolve(data)
      }
    })
  })
}

基本的にはrankingSummariesからデータを取得してもしデータがなかった時にバックアップとして昨日のアクセスランキングを取得するようにしています。

module.exports.ranking = async (event, context, callback) => {
  return new Promise((resolve, reject) => {
    const date = dayjs().add(-1, 'day').format('YYYYMMDD')

    getAccessRankingSummaries(date).then((result)=>{
      if(result.Items.length>0){
        resolve({
          statusCode: 200,
          body: JSON.stringify({
            message: 'success',
            data: result
          })
        })
      }else{
        getAccessRankingItems(date).then((result)=>{
          resolve({
            statusCode: 200,
            body: JSON.stringify({
              message: 'empty summaries',
              data: result
            })
          })
        }).catch((err)=>{
          reject(err)
        })
      }
    }).catch((err)=>{
      reject(err)
    })
  })
}

最後に

明日は最近看護師から転職して成長著しい森下さんです!CSSアニメーションについてとのこと!楽しみにしています!!

プラコレではDynamoDBを触りたいエンジニアも!もっと違うことをやりたいエンジニアも!デザイナーも!募集しています!!

プラコレはプラコレWedding以外にもウェディングのメディアDressyfarnyなども運営しています。ぜひ見ていってもらえると嬉しいです!