趣味のログ

自分用の作業ログ。。

Cloud Dataflowを使ってGrovePi+のセンサデータをウィンドウ集計(Java編)

↓の記事でRaspberryPi3からCloud IoTへ送信したGrovePi+のセンサデータを、Cloud Dataflowでウィンドウ集計してみた。
kmth23.hatenablog.com

Cloud Dataflow のJavaのクイックスタートを参考に進める。

Java と Apache Maven を使用したクイックスタート  |  Cloud Dataflow のドキュメント  |  Google Cloud

まず、mavenでプロジェクトを作成。

mvn archetype:generate \
      -DgroupId=com.example \
      -DartifactId=dev-dataflow \
      -Dversion="0.1" \
      -DinteractiveMode=false \
      -Dpackage=com.example

サンプルを参考にしつつ、pom.xmlは下記のようにした。

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example</groupId>
  <artifactId>dev-dataflow</artifactId>
  <packaging>jar</packaging>
  <version>0.1</version>
  <name>dev-dataflow</name>
  <url>http://maven.apache.org</url>

  <properties>
    <gson.version>2.8.2</gson.version>
    <dataflow.version>2.2.0</dataflow.version>
    <maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
    <guava.version>20.0</guava.version>
    <pubsub.version>v1-rev10-1.22.0</pubsub.version>
  </properties>

  <build>
    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.codehaus.mojo</groupId>
          <artifactId>exec-maven-plugin</artifactId>
          <version>${maven-exec-plugin.version}</version>
          <configuration>
            <cleanupDaemonThreads>false</cleanupDaemonThreads>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>

  <dependencies>
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>${gson.version}</version>
    </dependency>

    <dependency>
      <groupId>com.google.cloud.dataflow</groupId>
      <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
      <version>${dataflow.version}</version>
    </dependency>

    <dependency>
      <groupId>com.google.apis</groupId>
      <artifactId>google-api-services-pubsub</artifactId>
      <version>${pubsub.version}</version>
      <exclusions>
        <!-- Exclude an old version of guava that is being pulled
             in by a transitive dependency of google-api-client -->
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava-jdk5</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>${guava.version}</version>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>

Javaのコードは下記。
Cloud IoTへ送信し、Cloud Pub/Subへ入ったセンサデータをDataflowから取得し、そこから気温データを抜き出す。
この気温データについて、2分毎の合計値を、2分毎にログ出力させた。(特に目的はないが。。お試しで)

package com.example;

import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

import com.google.gson.Gson;

import org.joda.time.Duration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class App 
{
    public static void main( String[] args )
    {
        DataflowPipelineOptions options = PipelineOptionsFactory
        .fromArgs(args)
        .withValidation()
        .create()
                .as(DataflowPipelineOptions.class);
        options.setStreaming(true);
        options.setJobName("test");

        Pipeline p = Pipeline.create(options);
        PCollection<KV<String, Double>> scores = p.apply(PubsubIO.readStrings().fromTopic("projects/project-id/topics/raspi3"))
            .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(2))))
            .apply(ParDo.of(new PubSubToNumFn()))
            .apply(Sum.<String>doublesPerKey())
            .apply(ParDo.of(new LoggingFn()));

        p.run();
    }

    public static class LoggingFn extends DoFn<KV<String, Double>, KV<String, Double>> {

        private static final Logger LOG = LoggerFactory.getLogger(LoggingFn.class);

        @ProcessElement
        public void processElement(ProcessContext c) {
            KV<String, Double> kv = c.element();
            double sum = kv.getValue();

            LOG.info(String.valueOf(sum));
            c.output(kv);
        }
    }
}
package com.example;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.values.KV;
import com.google.gson.Gson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubSubToNumFn extends DoFn<String, KV<String, Double>> {

    private final Logger LOG = LoggerFactory.getLogger(PubSubToNumFn.class);

    @ProcessElement
    public void processElement(ProcessContext c) {
        String json = c.element();
        Gson gson = new Gson();
        LOG.info(json);
        SensorData data = gson.fromJson(json, SensorData.class);
        if (data == null) {
            LOG.error("data is null");
        } else {
            LOG.info(data.getTemperature());
        }
        c.output(KV.of("temperature", Double.valueOf(data.getTemperature())));
    }
}
package com.example;

public class SensorData {
    public String temperature;
    public SensorData(String temperature) {
        this.temperature = temperature;
    }
    public String getTemperature() {
        return this.temperature;
    }
}

実行は↓

mvn compile exec:java -Dexec.mainClass=com.example.App -Dexec.args="--project=<<project-id>> --tempLocation=gs://path/to/temp/ --stagingLocation=gs://path/to/staging/ --runner=DataflowRunner"

maven exec:javaコマンドライン引数で、「--project=」のような形でクラウドの情報を入れて実行すると、コードの中でPipelineOptionsFactoryに引き渡され、Pipelineをrun()するとCloud Dataflowに自動でデプロイしてくれる。 Cloud Dataflowへデプロイし、コンソールで実行が確認出来たら、センサーデータを送信する。
センサデータの送信は↓でやった通りの手順。30秒単位で送信するため、setIntervalの第二引数は30000として実行した。 kmth23.hatenablog.com

Cloud Dataflowのコンソール上で、気温の合計値が、2分毎にログ出力されれば成功。

過去のQiitaのBeamの記事などを見ながらだと、SDKのバージョンが違うため仕様が変わっており、コンパイルエラーになることが多かった。 公式のドキュメントを見ながら進めるのが、結局最短で学習できるかもしれない。

RaspberryPi3からGoogle Cloud IoTへGrovePi+のセンサデータを送信

RaspberryPi3につないだGrovePi+のセンサデータをGCP Cloud IoTへ送信した。
以下、Cloud IoTのQuickStart(https://cloud.google.com/iot/docs/quickstart?hl=ja)に従って進める。
※Rasberry Pi3はModel B(+ではない)。OSはraspbian。

cat /etc/debian_version
=>8.0

事前準備

プロジェクト作成(既存でもOK)、課金を有効、Cloud IoT Core and Cloud Pub/Sub API(複数)を有効にする。

バイス側の設定

Google Cloud SDKインストール

手順:https://cloud.google.com/sdk/docs/?hl=ja#deb
DebianUbuntu」タブの内容に従いインストールする。
※毎回exportするのは面倒なので、今回は、~/.profileに追記した。また、追加コンポーネントのインストールはなし。

echo "export CLOUD_SDK_REPO=\"cloud-sdk-$(lsb_release -c -s)\"" >> ~/.profile
exec bash -l
echo "deb http://packages.cloud.google.com/apt $CLOUD_SDK_REPO main" | sudo tee -a /etc/apt/sources.list.d/google-cloud-sdk.list
curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add -
sudo apt-get update && sudo apt-get install google-cloud-sdk
gcloud init

gcloud initを実行すると、ブラウザが開いてGoogleへのログインを求められる。
事前準備で作成したプロジェクトのアカウントでログインすると、Google Cloud SDKが許可を求める画面に遷移するので、「許可」ボタンを押すと認証完了。

コンソールに戻ると、対話形式で、使用するプロジェクト、リージョン/ゾーンを聞かれる。
今回は、事前準備で作成したプロジェクト、asia-northeast1-b(東京リージョンのbゾーン)、を選択した。

※リージョン/ゾーンによって、マシンのスペックも変わるので注意(選択可能なリージョン/ゾーン一覧:https://cloud.google.com/compute/docs/regions-zones/regions-zones#available)。

Node.jsインストール

インストールは下記記事を参照。2018/03/22現在、最新のLTSはv8.10.0 kmth23.hatenablog.com

node -v
=>v8.10.0
npm -v
=>5.6.0

バイスの登録

Cloud IoTのコンソール(https://console.cloud.google.com/iot?hl=ja)で作業。
まず「レジストリ」を作成し、「レジストリ」にデバイスを追加する。

「Create device registry.」ボタンを押して下記のように設定。

「作成」ボタンを押す。(これでレジストリの作成が完了)
遷移したページで「端末を追加」ボタンを押して下記のように設定。

  • 端末 ID: 任意のデバイスID
  • 端末の通信: 許可
  • 認証: 設定しない(デフォルトのままにする)
  • 端末メタデータ: 設定しない(デフォルトのままにする)

「追加」ボタンを押す。(これでデバイスの登録が完了) コンソールはこのまま開いておく。

バイスに公開鍵を追加

opensslがない場合は、事前にインストールする。 下記のコマンドで、rsa_cert.pem(公開鍵)、rsa_private.pem(秘密鍵)を作成する。

cd /tmp
openssl req -x509 -newkey rsa:2048 -keyout rsa_private.pem -nodes -out rsa_cert.pem -subj "/CN=unused"

開いたままにしておいたコンソールで「公開鍵を追加」ボタンを押し、rsa_cert.pemの内容をコピーして張り付ける。

  • 入力方法: 手動で入力
  • 公開鍵の形式: RS256_X509
  • 公開鍵の有効期限: 設定しない

「追加」ボタンを押す。

サンプルで動作確認

サンプルをgit cloneで取得する。gitがない場合は、事前にインストールする。
下記コマンドについては、これまでに作成した情報に従い、<<PROJECT_ID>>をプロジェクトID、<<TOPIC_NAME>>をトピック名、<<REGISTRY_ID>>をレジストリID、<<DEVICE_ID>>をデバイスIDに置き換えること。 <<任意のサブスクリプション名>>は、任意のサブスクリプション名に置き換えること。

また、jsファイルの実行時には、--cloudRegionの指定を忘れないこと(デフォルトはus-central1。今回はasia-east1にしたので指定が必要)。
--numMessagesで送信するデータ数を指定できる。今回はテストのため1メッセージだけ送信する。

cd 任意の作業dir
git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples
cd nodejs-docs-samples/iot/mqtt_example
cp /tmp/rsa_private.pem .
npm install
gcloud pubsub subscriptions create \
    projects/<<PROJECT_ID>>/subscriptions/<<任意のサブスクリプション名>> \
    --topic=projects/<<PROJECT_ID>>/topics/<<TOPIC_NAME>>
node cloudiot_mqtt_example_nodejs.js \
    --projectId=<<PROJECT_ID>> \
    --registryId=<<REGISTRY_ID>> \
    --deviceId=<<DEVICE_ID>> \
    --privateKeyFile=rsa_private.pem \
    --numMessages=1 \
    --algorithm=RS256 \
    --cloudRegion=asia-east1
gcloud pubsub subscriptions pull --auto-ack \
    projects/<<PROJECT_ID>>/subscriptions/<<任意のサブスクリプション名>>

jsファイルの実行で、1つのデータがpublishされる。gcloud pubsub subscriptions pullコマンドでsubscribeできれば成功。

センサー情報の取得

AWS IoTにデータ送信したときと同じ仕組みを使う。 kmth23.hatenablog.com

まず、pythonでGrovePi+につないだセンサー情報を取得する。
次に、動作確認で使ったJavaScriptコードを参考にして、mqttでCloud IoTへデータを送信するコードを書く。
pythonコードの実行は、AWS IoTの時と同様、child_processのexecファンクションを使用する。

まずは、package.jsonを作成し、必要なライブラリをインストール。

{
  "name": "grovepi-test",
  "version": "0.0.1",
  "description": "",
  "main": "index.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "author": "",
  "license": "ISC",
  "dependencies": {
    "jsonwebtoken": "7.4.1",
    "mqtt": "2.15.0",
    "yargs": "8.0.2"
  },
  "devDependencies": {}
}
npm install

次に、実行ファイルを、index.jsとして作成。
5分間隔でセンサーデータを取得し、Cloud IoTへ送信する。
pythonコードは、/path/to/python/script.py。

// This software includes the work that is distributed in the Apache License 2.0

'use strict';

const fs = require('fs');
const jwt = require('jsonwebtoken');
const mqtt = require('mqtt');
const exec = require('child_process').exec;

var argv = require(`yargs`)
  .options({
    projectId: {
      default: process.env.GCLOUD_PROJECT || process.env.GOOGLE_CLOUD_PROJECT,
      description: 'The Project ID to use. Defaults to the value of the GCLOUD_PROJECT or GOOGLE_CLOUD_PROJECT environment variables.',
      requiresArg: true,
      type: 'string'
    },
    cloudRegion: {
      default: 'us-central1',
      description: 'GCP cloud region.',
      requiresArg: true,
      type: 'string'
    },
    registryId: {
      description: 'Cloud IoT registry ID.',
      requiresArg: true,
      demandOption: true,
      type: 'string'
    },
    deviceId: {
      description: 'Cloud IoT device ID.',
      requiresArg: true,
      demandOption: true,
      type: 'string'
    },
    privateKeyFile: {
      description: 'Path to private key file.',
      requiresArg: true,
      demandOption: true,
      type: 'string'
    },
    algorithm: {
      description: 'Encryption algorithm to generate the JWT.',
      requiresArg: true,
      demandOption: true,
      choices: ['RS256', 'ES256'],
      type: 'string'
    },
    mqttBridgeHostname: {
      default: 'mqtt.googleapis.com',
      description: 'MQTT bridge hostname.',
      requiresArg: true,
      type: 'string'
    },
    mqttBridgePort: {
      default: 8883,
      description: 'MQTT bridge port.',
      requiresArg: true,
      type: 'number'
    },
    messageType: {
      default: 'events',
      description: 'Message type to publish.',
      requiresArg: true,
      choices: ['events', 'state'],
      type: 'string'
    }
  })
  .example(`node $0 cloudiot_mqtt_example_nodejs.js --projectId=blue-jet-123 \\\n\t--registryId=my-registry --deviceId=my-node-device \\\n\t--privateKeyFile=../rsa_private.pem --algorithm=RS256 \\\n\t --cloudRegion=us-central1`)
  .wrap(120)
  .recommendCommands()
  .epilogue(`For more information, see https://cloud.google.com/iot-core/docs`)
  .help()
  .strict()
  .argv;

function createJwt (projectId, privateKeyFile, algorithm) {
  const token = {
    'iat': parseInt(Date.now() / 1000),
    'exp': parseInt(Date.now() / 1000) + 20 * 60, // 20 minutes
    'aud': projectId
  };
  const privateKey = fs.readFileSync(privateKeyFile);
  return jwt.sign(token, privateKey, { algorithm: algorithm });
}

const mqttClientId = `projects/${argv.projectId}/locations/${argv.cloudRegion}/registries/${argv.registryId}/devices/${argv.deviceId}`;
const mqttTopic = `/devices/${argv.deviceId}/${argv.messageType}`;

let connectionArgs = {
  host: argv.mqttBridgeHostname,
  port: argv.mqttBridgePort,
  clientId: mqttClientId,
  username: 'unused',
  password: createJwt(argv.projectId, argv.privateKeyFile, argv.algorithm),
  protocol: 'mqtts',
  secureProtocol: 'TLSv1_2_method'
};

let client = mqtt.connect(connectionArgs);
client.subscribe(`/devices/${argv.deviceId}/config`);

client.on('connect', (success) => {
  console.log('connect');
  if (!success) {
    console.log('Client not connected...');
  } else {
    setInterval(() => {
      exec('python /path/to/python/script.py', (error, stdout, stderr) => {
        if (error !== null) {
          console.log('exec error: ' + error);
          return
        }
        var data = stdout.replace(/\r?\n/g,"");
        var datas = data.split(",")
        var record = {
          registryid: argv.registryId,
          deviceid: argv.deviceId,
          timestamp: datas[0],
          temperature: Number(datas[1]),
          humidity: Number(datas[2]),
          moisture: Number(datas[3]),
          light: Number(datas[4]),
          location: datas[5] + "," + datas[6]
        };
        const payload = JSON.stringify(record);
        console.log("Publish: " + payload);
        client.publish(mqttTopic, payload, { qos: 1 });
      });
      return;
    }, 300000);
  }
});

client.on('close', () => {
  console.log('close');
});

client.on('error', (err) => {
  console.log('error', err);
});

client.on('message', (topic, message, packet) => {
  console.log('message received: ', Buffer.from(message, 'base64').toString('ascii'));
});

client.on('packetsend', () => {
  // Note: logging packet send is very verbose
});

実行コマンドは下記。rsa_private.pemはあらかじめコピーしておくこと。

cp /tmp/rsa_private.pem .
npm start \
    --projectId=<<PROJECT_ID>> \
    --registryId=<<REGISTRY_ID>> \
    --deviceId=<<DEVICE_ID>> \
    --privateKeyFile=rsa_private.pem \
    --algorithm=RS256 \
    --cloudRegion=asia-east1

subscribeすると、下記のようなデータが取得できるはず。

gcloud pubsub subscriptions pull --auto-ack \
    projects/<<PROJECT_ID>>/subscriptions/<<任意のサブスクリプション名>>
=>{
    "registryid":"<<REGISTRY_ID>>",
    "deviceid":"<<DEVICE_ID>>",
    "timestamp":"2018-03-23T16:19:48+09:00",
    "temperature":25.2,
    "humidity":30.4,
    "moisture":0,
    "light":206
    }

ストリーム処理を勉強してみた

素人ながら興味があったので少し勉強した。
以下はその際のメモ。

基本的には、↓のqiitaとslideshareを最初のインプットにさせていただいて、自分なりに調べていった。
https://qiita.com/kimutansk/items/60e48ec15e954fa95e1c
https://www.slideshare.net/SotaroKimura/ss-72769963

ストリーム処理の歴史

slideshareより抜粋。
2011年~ Lambdaアーキテクチャ
2013年~ Kappaアーキテクチャ
2015年~ Dataflowモデル

Lambdaアーキテクチャについては、オライリー本が日本語で出ている。

スケーラブルリアルタイムデータ分析入門 ―ラムダアーキテクチャによるビッグデータ処理

スケーラブルリアルタイムデータ分析入門 ―ラムダアーキテクチャによるビッグデータ処理

これは近年ではあまり使われていないのかもしれないが、素人が勉強するには、知っておいて損はない情報が詰まっていると思う。
ツールとしてはApache Stormが使われており、古い感じもするが、Spark streamingやWindow集計についても、(コラム程度だが)既に触れられていた。
日本語情報で体系的に学べる数少ない資料だと思う。

Kappaアーキテクチャについては、正直よく分からない。(日本語の情報が少ない)
公式サイトが存在しており、URLは↓。
http://milinda.pathirage.org/kappa-architecture.com/
「KappaアーキテクチャはLambdaアーキテクチャからバッチ処理を除去したようなもの」と説明されている。
基本的には、Apache Kafkaのような複数サブスクライブができるキューにデータを集約し、ストリーミング処理はキューからデータを取得して実施するらしい。
ストリーム処理システムとしては、Apache Storm、Apache Spark、Kafka Stream、Apache Flinkなどが挙げられている。
Kafkaに集約して、(分散バッチ処理は行わず)ストリーム処理を行う形は、よく見かける気がするけど、もともとはKappaアーキテクチャからきているのだろうか?

また、Flinkがツールとして挙げられているので、後述のDataflowモデルのOSSであるApache Beamと連携させても良いのかもしれない。
slideshareでも、DataflowモデルはLambdaアーキテクチャやKappaアーキテクチャを置き換えるものではないと説明されている)

Dataflowモデルについても、同じ方がqiitaにまとめてくれている。
https://qiita.com/kimutansk/items/d6daca473440462634a0
また、(情報が古いが)Cloud DataflowとApache Sparkとの比較記事も分かりやすいと思う。
https://cloud.google.com/dataflow/blog/dataflow-beam-and-spark-comparison?hl=ja

どちらの記事にもリンクがある、
The World Beyond Batch: Streaming 101
The World Beyond Batch: Streaming 102
や、
VLDB 2015 の Dataflow モデル の論文
の内容が基本的な考え方になっているよう。

Dataflowモデルを使うには、GCPのCloud Dataflowを使うのが一番早いと思う。
コードの実装には、Apache BeamがOSSとして提供されている。
Beamはエンジンとして、別のOSSApache FlinkやApache Apexなどが利用でき、GCPのCloud Dataflowもエンジンとして使える。
そのため、Apache Beamでコードを書けば、GCPでも、オンプレミスでもストリーム処理を行うことができる。
GCPで使うなら、Cloud Dataflow SDKを使う方が、機能が豊富になる。
https://cloud.google.com/dataflow/docs/installing-dataflow-sdk?hl=ja

Apache Beamを使った実装などの例は、情報がある程度多くあり、Window集計についての説明も色々な方がまとめてくれている印象。
実装はバージョンごとに差異もありそうなので、注意が必要そうだが、Raspberry Piのセンサデータを、Apache Beamでストリーム処理するようなことをやってみたいと思う。

RaspberryPi3にmavenをインストール

環境変数JAVA_HOMEを設定する必要がある。
Javaはインストール済みのため、JAVA_HOMEの場所を調べる。

which java
=>/usr/bin/java

ls -l /usr/bin/java
=>/usr/bin/java -> /etc/alternatives/java

ls -l /etc/alternatives/java
=>/etc/alternatives/java -> /usr/lib/jvm/jdk-8-oracle-arm32-vfp-hflt/jre/bin/java

jdkの場所(/usr/lib/jvm/jdk-8-oracle-arm32-vfp-hflt)をJAVA_HOMEとして環境変数に設定する。

echo "export JAVA_HOME=/usr/lib/jvm/jdk-8-oracle-arm32-vfp-hflt" >> ~/.profile
source ~/.profile

mavenをインストールする。2018/3/25現在の最新は、v3.5.3。

cd /tmp
wget http://ftp.kddilabs.jp/infosystems/apache/maven/maven-3/3.5.3/binaries/apache-maven-3.5.3-bin.tar.gz
tar xzvf apache-maven-3.5.3-bin.tar.gz
sudo mv apache-maven-3.5.3 /opt
echo "export PATH=/opt/apache-maven-3.5.3/bin:$PATH" >> ~/.profile
source ~/.profile

確認

mvn -v
=>Apache Maven 3.5.3 (3383c37e1f9e9b3bc3df5050c29c8aff9f295297; 2018-02-25T04:49:05+09:00)
Maven home: /opt/apache-maven-3.5.3
Java version: 1.8.0_65, vendor: Oracle Corporation
Java home: /usr/lib/jvm/jdk-8-oracle-arm32-vfp-hflt/jre
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "4.4.8-v7+", arch: "arm", family: "unix"

Sigfox Shield for Arduino (UnaShield) を使ってセンサー情報をSORACOMへ送信

京セラコミュニケーションシステム株式会社(KCCS)がSigfoxをArduinoで使用できるシールドを発売した。
加速度センサ(MMA8451Q)、温湿度・気圧センサ(BME280)を搭載している。

www.kccs.co.jp

ソラコムと、スイッチサイエンスで販売しているが、今回はソラコムから購入した。(ソラコムの方が400円ほど高いので注意)
いずれから購入した場合も、1年間のSigfox回線利用料が含まれている。
シールドのみの販売で、Arduino本体は別売りなので注意。また2年目以降は、別途通信料が発生する。

スイッチサイエンスから購入すると、センサー情報をSigfox社のクラウドに送信・蓄積ができる。クラウドWebは日本語対応してない?が、データは1年保存してくれる(Sigfoxクラウドについて参照)。2年目以降は別途Sigfox契約が必要らしいが、どのように契約したらいいのかはよくわからない。。(KCCSの取扱説明書と、スイッチサイエンスの商品紹介を参照)

一方、ソラコムから購入すると、ソラコムAirなどと同じように、ユーザーコンソールで利用ができ、2年目以降も継続利用可能(料金は発生する)。ただしデータ蓄積はSORACOM Harvestの利用が必要で、データ保存期間は40日のみ。SORACOM Beamなどを使えば、AWSなどにデータ転送することは可能。

安く済むのはスイッチサイエンス版かもしれないが、ソラコムのアカウントは既に持っており、コンソールも使いやすかったので、今回はソラコムにした。

品薄だったのか、発注から手元に届くまで9日かかった。ただ4日目に、発送予定日と「遅れて申し訳ない」との旨をメールで連絡はいただけた。

ソラコムから購入するには、ソラコムアカウントの作成が必要。詳しくは、SORACOM Air for Sigfox の利用方法を参照。

Sigfoxとは

フランスで設立された通信事業者のSIGFOX社が提供する、低価格・低消費電力・長距離伝送を特長とした、グローバルIoTネットワーク。日本ではKCCSが展開している。
LPWA(Low Power=省電力、Wide Area=広域エリア)ネットワークの一つ。
アンライセンス系 LPWAと言われ、類似の通信規格にはLoRaWANがある。
ライセンス系 LPWAには、LTE Cat0, M1, NB-IoTがあるそう。 このあたりはソラコムの解説(LPWA とは? Low Power Wide Area)が詳しい。

Sigfoxについてはこちらも参照。

設定方法

※シールドを取り付けるArduino本体は別途必要。今回は、Arduino Uno R3を別途購入した。

ソラコムのSigfox Shield for Arduino をセットアップするに従えば設定が完了する。

ただ、↑のページで「Sigfox デバイスの受取確認をする」とあるが、「受け取り確認」ボタンを押すとエラーが発生して出来なかった※。
そのため、「Sigfoxデバイス管理」メニューから、「SigfoxデバイスID」と「PAC(Porting Authorization Code)」を自分で登録した。
バイスIDと、PACは、Arduinoシールド本体に貼ってあるQRコードを読み込むと分かる(KCCSの取扱説明書参照)。 ただQRコードが小さく、読み取りに苦戦。。自分のスマホでは読み取れず、手持ちの別スマホで無事読み取れた。スマホが古いのが悪かったのか読み取りアプリが悪かったのか。。

あと↑のページを見る限り、「受け取り確認」を押すと、「Sigfox-Harvest」という「Sigfoxグループ」が作られ、デバイスに自動で紐づけしてくれている?ようなのだが、これも自分で設定した。(コンソールから勘でやった)

※エラーメッセージを残すの忘れてた。。手動登録後は「サーバ内部でエラーが発生しました。メッセージ:400 Bad Request」と出る。初めは、「nullがどうたらこうたら。。」みたいなメッセージだった気がする)

シールドのセットアップは簡単。アンテナを付けて、Arduinoに差せば終わり。

Arduino 開発環境は、Arduino Desktop IDEの、Windowsインストーラーをダウンロードし、全てデフォルト設定のまま、Windows10にインストールした。
バージョンは、ARDUINO 1.8.5。

あとは、Sigfox Shield for Arduino をセットアップするのページ通りサンプルスケッチを実行すれば、気温データの送信に成功し、SORACOM Harvestで確認ができた。
データ収集・蓄積が不要であれば、SORACOM HarvestはOFFのままでよいと思う。
データの転送をしたいならば、SORACOM Beam などを使う必要がある。

RaspberryPi3にCloudPiをインストールしてNAT越え

スイッチサイエンスここを参考にした。

CloudPiの購入

CloudPiはここに購入リンクがあり、スイッチサイエンス経由Amazon経由で購入できる。
購入すると、UIDが割り振られたカードが送付される。
このUIDを、RaspberryPi3にインストールするCloudPiモジュールの設定ファイルに記入することで、CloudPiの機能を使うことが出来る。

RaspberryPi3にインストール

$ mkdir /home/pi/cloudpi
$ cd /home/pi/cloudpi

参考サイトでは、CloudPi用のサーバモジュールをwgetで取得しているが、リンク切れしているため、ここから、Raspberry Pi版のモジュール(p2ptunnel_v100.tar.gz)をダウンロードする。
ダウンロードしたモジュールは、WinSCPなどを使ってRaspberryPiに配置し、解凍する。

$ tar zxvf p2ptunnel_v100.tar.gz

解凍してできたディレクトリに移動し、次のコマンドを実行

$ cd p2ptunnel
$ sudo cp cloudpi /etc/init.d/cloudpi
$ sudo chmod 755 /etc/init.d/cloudpi

UIDを確認し、cloudpi.confファイルを編集してUIDとパスワードを記述します。

$ vi cloudpi.conf

以下の記述をcloudpi.confに記入する。

uid=割り振られたUID
password=任意のパスワード

記入後、以下のコマンドを実行し、RaspberryPi3起動時に自動でCloudPiが起動するようにする。

$ sudo insserv cloudpi

以下のコマンドで、CloudPiを起動する。

$ service cloudpi start

クライアント側の設定

Windows PCからRaspberryPi3へ、NAT越えでssh接続する。
ここから、Windows版クライアントモジュールをダウンロードし、解凍する。
解凍したフォルダ内の「P2PTunnel.exe」を実行する。 「追加」ボタンを押すと、 任意の名前、割り振られたUID、サーバモジュールで決めたパスワード、を記述し、「ポート設定」を押す。
今回は、sshで接続を行うため、ローカルの2222番ポートに、RaspberryPi3の22番ポートを割り当てる。
設定後、「接続」ボタンを押すと、localhostの2222番が、RaspberryPi3の22番ポートとして使えるようになっている。(NAT越えも可能)
teratermなどで、localhost:2222でログインできれば成功。

RaspberryPi3上でWEBサーバなどを起動している場合も、ポートを適切に設定しておけば、localhostでWindowsPCからアクセスできるようになる。

RaspberryPi3からAWS IoTへセンサデータの送信

AWS IoT SDK for JavaScriptのインストール

githubのREADMEを参考に、AWS IoT SDKをインストール
nodejsのバージョンは

node -v
v4.4.4

AWS IoT SDKインストール

npm install aws-iot-device-sdk
cd /home/pi (任意のディレクトリでOK)
git clone https://github.com/aws/aws-iot-device-sdk-js.git
cd aws-iot-device-sdk-js
npm install

センサー情報の取得

センサー情報はGrove+を使って取得する
AWS IoT SDKはnode.jsを使っているけれど、センサーデータはpythonで取得する
まずはpythonのコードを作成する

#!/usr/bin/env python
import sys
sys.path.append('/home/pi/Desktop/GrovePi/Software/Python/')
import time
import datetime
from grovepi import *
import serial

# for GPS
ser = serial.Serial('/dev/ttyAMA0',  9600, timeout = 0) #Open the serial port at 9600 baud
ser.flush()

class GPS:
        inp=[]
        GGA=[]
        def read(self):
                while True:
                        GPS.inp=ser.readline()
                        if GPS.inp[:6] =='$GPGGA': # GGA data , packet 1, has all the data we need
                                break
                        time.sleep(0.1)
                try:
                        ind=GPS.inp.index('$GPGGA',5,len(GPS.inp))
                        GPS.inp=GPS.inp[ind:]
                except ValueError:
                        hoge="fuga"
                GPS.GGA=GPS.inp.split(",")
                return [GPS.GGA]

        def vals(self):
                time=GPS.GGA[1]
                lat=GPS.GGA[2]
                lat_ns=GPS.GGA[3]
                long=GPS.GGA[4]
                long_ew=GPS.GGA[5]
                fix=GPS.GGA[6]
                sats=GPS.GGA[7]
                alt=GPS.GGA[9]
                return [time,fix,sats,alt,lat,lat_ns,long,long_ew]

# for Japan Timestamp
class JapanTZ(datetime.tzinfo):
    def tzname(self, dt):
        return "JST"
    def utcoffset(self, dt):
        return datetime.timedelta(hours=9)
    def dst(self, dt):
        return datetime.timedelta(0)


ptemperature = 3
pmoisture = 0
plight = 1

gps = GPS()

pinMode(ptemperature,"INPUT")
pinMode(pmoisture,"INPUT")
pinMode(plight,"INPUT")
time.sleep(1)

[temperature, humidity] = dht(ptemperature, 1)
moisture = analogRead(pmoisture)

light = analogRead(plight)

gps.read()

[t,fix,sats,alt,lat,lat_ns,long,long_ew] = gps.vals()

try:
  latitude = str(float(lat)/100)
  longitude = str(float(long)/100)
except ValueError:
  latitude = 0
  longitude = 0

time.sleep(.5)

print("{0},{1},{2},{3},{4},{5},{6}".format(datetime.datetime.now(JapanTZ()).strftime('%Y-%m-%dT%H:%M:%S+09:00'), temperature, humidity, moisture, light, latitude, longitude))

ser.close()

次にnode.jsのコード
5分間隔でセンサーデータを取得し、AWS IoTクラウドへ送信する
private.pem.key、certificate.pem.crt、root-CA.crtは、AWS IoTのクラウドコンソールからダウンロードしておき、任意のディレクトリに配置しておく
pythonコードの実行は、child_processのexecファンクションを使用する(pythonの標準出力を、execファンクションに設定したコールバック関数の引数として受け取れる)

var awsIot = require('aws-iot-device-sdk');
var sys = require('sys')
var exec = require('child_process').exec;

var device = awsIot.device({
    privateKey: './certs/private.pem.key',
    clientCert: './certs/certificate.pem.crt',
    caCert: './certs/root-CA.crt',
    clientId: 'test_client',
    region: 'ap-northeast-1'
});

device.on('connect', function() {
  console.log('Connect!!');
  setInterval(function() {
    exec("python /path to python code", function(error, stdout, stderr) {
      if (error !== null) {
        console.log('exec error: ' + error);
        return
      }
      var data = stdout.replace(/\r?\n/g,"");
      var datas = data.split(",")
      var record = {
        deviceid: "pi_01",
        timestamp: datas[0],
        temperature: Number(datas[1]),
        humidity: Number(datas[2]),
        moisture: Number(datas[3]),
        light: Number(datas[4]),
        location: datas[5] + "," + datas[6]
      };
      var message = JSON.stringify(record);
      console.log("Publish: " + message);
      device.publish('pi_test', message);
    });
    return;
  }, 300000);
});