LoginSignup
44
33

More than 5 years have passed since last update.

Google Cloud Functions [Alpha] を使って GCP の課金データを BigQuery に投入してみた

Last updated at Posted at 2016-05-17

Google I/O 2016  wezardnet 
 Google Cloud Functions 使 Google Cloud PlatformGCP  BigQuery 

1. Cloud Functions 


Cloud Functions  Node.js  AWS Labmda  AWS 

Cloud Functions 3 Node.js 


Cloud Pub/Sub topic

Cloud Storage bucket = Object Change Notification

HTTP trigger (HTTP POST only)


 Cloud Functions  Google Container EngineGKE cluster ...
5/4   cluster  m(__)m

2. Cloud Functions 使


Cloud Functions 

 Google Cloud Functions - Alpha Sign Up
 image01.png
 Google  API Manager  Cloud Functions API Enable

 image02.png
 Cloud Functions 使

3. 


 GCP  BigQuery  Google Developer Console  Billing export  Google Cloud StorageGCS

 image03.png
 GCS 

 image04.png

4. Function 

4.1. 


Cloud Functions  Node.js  JavaScript  Node.js  :sweat:

 GCP  gcloud-node.js module 使 package.json 
package.json
{
    "name": "bqbilling", 
    "version": "1.0.0", 
    "main": "bqbilling.js", 
    "dependencies": {
        "gcloud": "^0.28.0", 
        "node-uuid": "^1.4.7", 
        "moment": "^2.10.6", 
        "moment-timezone": "^0.4.0"
    }
}

次に GCS のバケットに出力された課金データを読み込んで BigQuery に投入するメインな処理を書きます。今回は GCS と BigQuery を利用するので、最初に次のように記述しておきます。keyFilename は特に指定しなくても API へのアクセスは許可されるようです。手間が省けますね。

bqbilling.js
var gcloud = require('gcloud')({
    projectId: '********'
});
var gcs = gcloud.storage() ;
var bq = gcloud.bigquery() ;

projectId の値は対象の GCP プロジェクトの ID を記述します。これは Google Developer Console の Home 画面のダッシュボードからコピペします。

 image06.png

Cloud Functions の Cloud Storage bucket トリガーで呼び出される部分は次のように実装しています。このトリガーは GCS が備える Object Chage Notification(OCN) と同じ機能になります。GCS にオブジェクトが 追加(アップロード)/更新(変更)/削除 された時に呼び出される(通知される)仕掛けになります。

bqbilling.js
exports.main = function(context, data){
    var bucket = gcs.bucket(data.bucket) ;
    var file = bucket.file(data.name) ;
    var buffer = new Buffer('') ;
    file.createReadStream()
        .on('data', function(chunk){
            buffer = Buffer.concat([buffer, chunk]) ;
        })
        .on('finish', function(){
            var jsonBilling = JSON.parse(buffer) ;
            execute(data.name, jsonBilling) ;
        });
    context.success() ;
};

前述した課金データは毎日 GCS の指定バケットに追加されるので、そのタイミングで上記の処理が実行されます。Function パラメータの data パラメータにトリガーに関連付けられたデータが格納されているので createReadStream で追加された課金データ(JSON)を読み込みます。

それでは BigQuery 側の処理をみていきましょう。BigQuery のテーブルは月単位(yyyyMM)で保持するようにしました。少しショボイですが、毎回テーブルが存在するかどうかチェックして、なければ作成するようにしています :cold_sweat:

bqbilling.js
function execute(fileName, jsonBilling){
    var fileDate = fileName.match(/\d{4}-(0[1-9]|1[0-2])/g) ;
    var tableName = fileDate.toString().replace(/-/g, '') ;

    var dataset = bq.dataset('{データセット名}') ;
    var targetTable = dataset.table(tableName) ;

    targetTable.exists(function(err, exists){
        if ( !exists ) {
            // TODO: テーブルがなければ新規に作る
            var options = {schema: {
                "fields": [
                    {
                        "type": "timestamp",
                        "mode": "required", 
                        "name": "StartTime"
                    }, 
                    {
                        "type": "timestamp",
                        "mode": "required", 
                        "name": "EndTime"
                    }, 
                    {
                        "type": "STRING", 
                        "mode": "required", 
                        "name": "ProjectId"
                    }, 
                    {
                        "type": "STRING", 
                        "mode": "required", 
                        "name": "Description"
                    }, 
                    {
                        "type": "FLOAT", 
                        "mode": "nullable", 
                        "name": "Debits"
                    }, 
                    {
                        "type": "STRING", 
                        "mode": "required", 
                        "name": "Currency"
                    }, 
                    {
                        "type": "FLOAT", 
                        "mode": "nullable", 
                        "name": "Amount"
                    }, 
                    {
                        "type": "STRING", 
                        "mode": "nullable", 
                        "name": "Unit"
                    }, 
                    {
                        "type": "STRING", 
                        "mode": "required", 
                        "name": "FileName"
                    }
                ]
            }};
            dataset.createTable(tableName, options, function(err, table, apiResponse){
                if ( err ) {
                    console.log('err: ', err) ;
                    console.log('apiResponse: ', apiResponse) ;
                    return ;
                }
                exportBigQuery(table, fileName, jsonBilling) ;
            });
        }
        else exportBigQuery(targetTable, fileName, jsonBilling) ;
    }) ;
}

最後に課金データ(JSON)を BigQuery に投入します。実装例を以下に示します。
どういうわけか Cloud Functions の Cloud Storage bucket トリガーでは同じ課金データ(JSON)が 2 回連続して通知されるので、重複して投入されないようにクエリーを投げてチェックするようにしています。

bqbilling.js
function exportBigQuery(table, fileName, jsonBilling){
    var query = util.format('SELECT COUNT(*) AS count FROM [%s] WHERE FileName = "%s"', table.metadata.id, fileName) ;
    bq.query(query, function(err, rows, nextQuery){
        if ( err ) {
            console.log('err: ', err) ;
            return ;
        }
        else if ( rows[0].count > 0 ) return ;

        for ( var i = 0; i < jsonBilling.length; i++ ) {
            var item = jsonBilling[i] ;

            var stime = moment(item.startTime).utc().format('YYYY-MM-DDTHH:mm:ssZ') ;
            var etime = moment(item.endTime).utc().format('YYYY-MM-DDTHH:mm:ssZ') ;

            var row = {
                insertId: uuid.v4(), 
                json: {
                    StartTime: stime, 
                    EndTime: etime, 
                    ProjectId: item.projectId, 
                    Description: item.description, 
                    Debits: item.cost.amount, 
                    Currency: item.cost.currency, 
                    Amount: item.measurements[0].sum, 
                    Unit: item.measurements[0].unit, 
                    FileName: fileName
                }
            };
            var options = {
                raw: true, 
                skipInvalidRows: true
            };
            table.insert(row, options, function(err, insertErrors, apiResponse){
                if ( err ) {
                    console.log('err: ', err) ;
                    console.log('apiResponse: ', apiResponse) ;
                }
            }) ;
        }
    });
}


4.2. 


Google Developer Console  Cloud Functions CREATE FUNCTION

 image05.png
 GCS Cloud Storage bucket zip  package.json zip  


 image07.png
JSON GCS  BigQuery  :grin:

5. BigQuery 


BigQuery  JSON

 image08.png
 :smiley:

image09.png

6. 


 GCP   BigQuery  Google App Engine + GCS Object Change Notification  Cloud Functions 

Cloud Functions 使
44
33
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
44
33