001package org.opengion.plugin.daemon;
002
003import java.util.Date;
004import java.util.Locale;                                                                                        // 7.2.9.4 (2020/11/20)
005
006import javax.jms.QueueSession;
007
008// import org.hsqldb.lib.StringUtil;
009import org.opengion.fukurou.util.StringUtil;                                            // 7.0.6.0 (2019/10/07)
010import org.opengion.fukurou.queue.QueueInfo;
011import org.opengion.fukurou.queue.QueueSend;
012import org.opengion.fukurou.queue.QueueSendFactory;
013import org.opengion.fukurou.util.HybsTimerTask;
014import org.opengion.hayabusa.common.HybsSystem;
015import org.opengion.hayabusa.queue.DBAccessQueue;
016
017/**
018 * メッセージキュー送信
019 * メッセージキュー送信テーブルを監視して、
020 * 送信処理を行います。
021 *
022 * @og.group メッセージ連携
023 *
024 * @og.rev 5.10.15.0 (2019/08/30) 新規作成
025 * @og.rev 5.10.15.2 (2019/09/20) DB登録の実装をhayabusa.queueに移動
026 *
027 * @version 5.0
028 * @author oota
029 * @since JDK7
030 *
031 */
032public class Daemon_QueueSend extends HybsTimerTask {
033        /** このプログラムのVERSION文字列を設定します。   {@value} */
034        private static final String VERSION = "7.2.9.4 (2020/11/20)" ;
035
036        private static final String CLOUD_SQS_ACCESS_KEY = HybsSystem.sys( "CLOUD_SQS_ACCESS_KEY" );
037        private static final String CLOUD_SQS_SECRET_KEY = HybsSystem.sys( "CLOUD_SQS_SECRET_KEY" );
038        private static final int    LOOP_COUNTER         = 24;
039
040//      private String SYSTEM_ID = HybsSystem.sys("SYSTEM_ID");
041        private final String SYSTEM_ID  = HybsSystem.sys("SYSTEM_ID");
042        private final String USER_ID    = "CYYYYY";
043        private final String PG_ID              = "DMN_QueSnd";
044        private final String DMN_NAME   = "QueueReceiveDMN";
045        private final DBAccessQueue dbAccessQueue;
046
047        private int loopCnt ;
048        private QueueSend queueSend;
049
050        /**
051         * コンストラクター
052         * 初期処理を行います。
053         */
054        public Daemon_QueueSend(){
055                super();                                                                // 7.2.9.4 (2020/11/20) PMD:It is a good practice to call super() in a constructor
056                dbAccessQueue = new DBAccessQueue(SYSTEM_ID, USER_ID, PG_ID, DMN_NAME);
057        }
058        /**
059         * 開始処理
060         * タイマータスクのデーモン処理の開始ポイントです。
061         *
062         * @og.rev 7.2.9.4 (2020/11/20) spotbugs:呼び出したメソッドの Locale パラメータの使用を検討する
063         */
064        @Override
065        protected void startDaemon() {
066                if (loopCnt % LOOP_COUNTER == 0) {
067                        loopCnt = 1;
068                        System.out.println();
069//                      System.out.println(toString() + " " + new Date() + "");
070                        System.out.println(toString() + " " + new Date() );                             // 7.2.9.4 (2020/11/20) PMD:Do not add empty strings
071                } else {
072                        // メッセージキュー送信管理テーブルから、送信対象のレコードを取得
073                        final String[][] vals = dbAccessQueue.selectGE65();
074
075                        // 取得データ分の繰り返し処理を実行する
076                        for(int i = 0; i  < vals.length; i++) {
077                                final String[] record = vals[i];
078
079                                // GE65から取得した値を変数に格納
080                                final String ykno =  record[0];
081                                final String queueId = record[1];
082                                final String message = record[2];
083                                final String dedupliId = record[3];
084                                final String queSyu = record[4];
085                                final String jmsUrl = record[5];
086
087//                              final String queueType = queSyu.toUpperCase();
088                                final String queueType = queSyu.toUpperCase( Locale.JAPAN );    // 7.2.9.4 (2020/11/20)
089                                queueSend = QueueSendFactory.newQueueSend(queueType);
090
091                                // 接続処理
092                                queueSend.connect(jmsUrl, CLOUD_SQS_ACCESS_KEY, CLOUD_SQS_SECRET_KEY);
093
094                                // メッセージ送信管理テーブルから取得したデータを送信実装予定
095                                final QueueInfo queueInfo = new QueueInfo();
096
097                                // 応答確認種別
098                                if("MQ".equals(queueType)){
099                                        // MQメッセージサーバ指定時
100                                        queueInfo.setMqTransacted(false);
101                                        queueInfo.setMqAcknowledgeMode(QueueSession.AUTO_ACKNOWLEDGE);
102                                        // キュー名
103                                        queueInfo.setMqQueueName(queueId);
104                                }else if("SQS".equals(queueType)){
105                                        // SQSメッセージサーバ指定時
106                                        // グループID
107                                        queueInfo.setSqsFifoGroupId(queueId);
108                                        if(!StringUtil.isEmpty(dedupliId)) {
109                                                // 重複排除ID
110                                                // コンテンツに基づく重複排除が有効時は、未設定でも可(メッセージによる重複判定が行われる)
111                                                queueInfo.setSqsFifoDedupliId(dedupliId);
112                                        }
113                                }
114
115                                // メッセージ
116                                queueInfo.setMessage(message);
117
118                                // 完了フラグを処理中:2に更新
119                                dbAccessQueue.updateGE66(ykno, DBAccessQueue.FGKAN_PROCESS);
120
121                                // メッセージ送信処理
122                                try{
123                                        queueSend.sendMessage(queueInfo);
124
125                                        // 完了フラグを完了:3に更新
126                                        dbAccessQueue.updateGE66(ykno, DBAccessQueue.FGKAN_END);
127
128                                }catch(Exception e) {
129                                        // 完了フラグをエラー:4に更新して、エラー情報を登録
130                                        dbAccessQueue.updateGE66Error(ykno, e.getMessage());
131                                }
132                        }
133
134                        // クローズ処理
135                        queueSend.close();
136
137                        loopCnt++;
138                }
139        }
140}