001/*
002 * Copyright (c) 2009 The openGion Project.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
013 * either express or implied. See the License for the specific language
014 * governing permissions and limitations under the License.
015 */
016package org.opengion.fukurou.process;
017
018import java.util.HashMap;
019import java.util.Map;
020
021import org.opengion.fukurou.queue.QueueInfo;
022import org.opengion.fukurou.queue.QueueSend;
023import org.opengion.fukurou.queue.QueueSendFactory;
024import org.opengion.fukurou.util.Argument;
025import org.opengion.fukurou.util.LogWriter;
026
027/**
028 *Process_QueueSendは、MQ or SQSにメッセージキューを送信する、
029 *FirstProcessインタフェースの実装クラスです。
030 *
031 *@og.formSample
032 * 1)MQにメッセージを送信する場合
033 *   java -DmqUserId=[mqユーザid] -DmqPassword=[mqパスワード] -cp [クラスパス] org.opengion.fukurou.process.MainProcess org.opengion.fukurou.process.Process_Logger -logFile=System.out org.opengion.fukurou.process.Process_QueueSend -queueType=MQ -jmsServer=[mqサーバー] -groupId=[グループID] -message=[メッセージ]
034 * 2)SQSにメッセージを送信する場合
035 *   java -cp [クラスパス] org.opengion.fukurou.process.MainProcess org.opengion.fukurou.process.Process_Logger -logFile=System.out org.opengion.fukurou.process.Process_QueueSend -accessKey=[awsアクセスキー] -secretKey=[awsシークレットキー] -queueType=SQS -jmsServer=[sqsサーバー] -groupId=[グループID] -message=[メッセージ]
036 *
037 *※proxy環境から、外部のMQやSQSサーバにはプロキシ情報を渡して、実行する必要があります。
038 *-Dhttp.proxyHost=[proxyホスト] -Dhttp.proxyPort=[proxyポート] -Dhttps.proxyHost=[proxyホスト] -Dhttps.proxyPort=[proxyポート]
039 *
040 * -queueType=キュータイプ       :MQ or SQS
041 * -jmsServer=キューサーバー     :キューサーバーのURLを指定
042 * -groupId=グループID           :キュー格納先のグループID
043 * -message=送信メッセージ       :キューに格納するメッセージ
044 * [-sccessKey=アクセスキー]     :SQSに接続用のアクセスキーです(aws上で取得)
045 * [-secretKey=シークレットキー] :SQSに接続用のシークレットキーです(aws上で取得)
046 * 
047 * コマンド例
048 * java -Dhttp.proxyHost=proxyhost -Dhttp.proxyPort=8080 -Dhttps.proxyHost=proxyhost -Dhttps.proxyPort=8080 -cp H:\sample\* ^
049 * org.opengion.fukurou.process.MainProcess ^
050 * org.opengion.fukurou.process.Process_Logger -logFile=System.out ^
051 * org.opengion.fukurou.process.Process_QueueSend -accessKey=[アクセスキー] -secretKey=[シークレットキー] -queueType=SQS ^
052 * -jmsServer=https://sqs.ap-northeast-1.amazonaws.com/000000000000/otfifo01.fifo -groupId=sample -message=sendMsg
053 *
054 * @og.rev 5.10.17.1 (2019/11/15) 新規追加
055 *
056 * @verion 5
057 * @since JDK7
058 */
059public class Process_QueueSend extends AbstractProcess implements FirstProcess{
060        private static final String name = "";
061        private static final Map<String, String> mustProperty;
062        private static final Map<String, String> usableProperty;
063
064        QueueSend queueSend;
065
066        private String queueType;
067        private String jmsServer;
068        private String groupId;
069        private String message;
070
071        static {
072                mustProperty = new HashMap<String,String>();
073                mustProperty.put("queueType", "キュータイプ");
074                mustProperty.put("jmsServer", "jms接続先");
075                mustProperty.put("groupId", "グループID");
076                mustProperty.put("message", "メッセージ");
077
078                usableProperty = new HashMap<String,String>();
079                // SQS用
080                usableProperty.put("accessKey", "アクセスキ");
081                usableProperty.put("secretKey",  "シークレットキー");
082        }
083
084        /**
085         * コンストラクター
086         */
087        public Process_QueueSend() {
088                super(name, mustProperty, usableProperty);
089        }
090
091        /**
092         * プロセスの初期化を行います。初めに一度だけ、呼び出されます。
093         * 初期処理(ファイルオープン、DBオープン等)に使用します。
094         *
095         * @param   paramProcess データベースの接続先情報などを持っているオブジェクト
096         */
097        @Override
098        public void init(ParamProcess paramProcess) {
099                Argument arg = getArgument();
100
101                queueType = arg.getProparty("queueType");
102                jmsServer = arg.getProparty("jmsServer");
103                groupId = arg.getProparty("groupId");
104                message = arg.getProparty("message");
105                final String accessKey = arg.getProparty("accessKey");
106                final String secretKey = arg.getProparty("secretKey");
107
108                queueSend = QueueSendFactory.newQueueSend(queueType);
109
110                // バッチ実行
111                queueSend.setBatchFlg(true);
112
113                // 接続処理
114                queueSend.connect(jmsServer, accessKey, secretKey);
115        }
116
117        /**
118         * プロセスの終了を行います。最後に一度だけ、呼び出されます。
119         * 終了処理(ファイルクローズ、DBクローズ等)に使用します。
120         *
121         * @param   isOK トータルで、OKだったかどうか[true:成功/false:失敗]
122         */
123        @Override
124        public void end(boolean isOK) {
125                queueType = "";
126                jmsServer = "";
127                groupId = "";
128                message = "";
129
130                if(queueSend != null) {
131                        queueSend.close();
132                }
133                queueSend = null;
134        }
135
136        /**
137         * このデータの処理において、次の処理が出来るかどうかを問い合わせます。
138         * この呼び出し1回毎に、次のデータを取得する準備を行います。
139         *
140         * @return 処理できる:true / 処理できない:false
141         **/
142        @Override
143        public boolean next() {
144                QueueInfo queueInfo = new QueueInfo();
145                queueInfo.setJmsServer(jmsServer);
146                queueInfo.setMqQueueName(groupId);
147                queueInfo.setSqsFifoGroupId(groupId);
148                queueInfo.setMessage(message);
149
150                queueSend.sendMessage(queueInfo);
151
152                return false;
153        }
154
155        /**
156         * 最初に、 行データである LineModel を作成します
157         * FirstProcess は、次々と処理をチェインしていく最初の行データを
158         * 作成して、後続の ChainProcess クラスに処理データを渡します。
159         *
160         * @param rowNo 処理中の行番号
161         *
162         * @return 処理変換後のLineModel
163         * */
164        @Override
165        public LineModel makeLineModel(int rowNo) {
166                // 後続のChainProcessは実行しません。
167                return null;
168        }
169
170        /**
171         * プロセスの処理結果のレポート表現を返します。
172         * 処理プログラム名、入力件数、出力件数などの情報です。
173         * この文字列をそのまま、標準出力に出すことで、結果レポートと出来るような
174         * 形式で出してください。
175         *
176         * @return   処理結果のレポート
177         */
178        @Override
179        public String report() {
180                final String report = "[" + getClass().getName() + "]" + CR
181                                + TAB + "queueType:" + queueType + CR
182                                + TAB + "jmsServer:" + jmsServer + CR
183                                + TAB + "gropuId:" + groupId + CR
184                                + TAB + "message:" + message;
185                return report;
186        }
187
188        /**
189         * このクラスの使用方法を返します。
190         *
191         * @return      このクラスの使用方法
192         */
193        @Override
194        public String usage() {
195                StringBuilder buf = new StringBuilder();
196
197                buf.append("Process_QueueSendは、MQ or SQSにメッセージキューを送信する、").append( CR );
198                buf.append("FirstProcessインタフェースの実装クラスです。").append(CR);
199                buf.append(CR);
200                buf.append("-queueType=キュータイプ   :MQ or SQS").append(CR);
201                buf.append("-jmsServer=キューサーバー :キューサーバーのURLを指定").append(CR);
202                buf.append("-groupId=グループID       :キュー格納先のグループID").append(CR);
203                buf.append("-message=送信メッセージ   :キューに格納するメッセージ").append(CR);
204                buf.append("[-sccessKey=アクセスキー]").append(CR);
205                buf.append("[-secretKey=シークレットキー]").append(CR);
206                buf.append( CR ).append( CR );
207                buf.append( getArgument().usage() ).append( CR );
208
209                return buf.toString();
210        }
211
212        /**
213         * このクラスは、main メソッドから実行できません。
214         *
215         * @param       args    コマンド引数配列
216         */
217        public static void main(final String[] args) {
218                LogWriter.log(new Process_QueueSend().usage());
219        }
220
221}