001/* 002 * Copyright (c) 2009 The openGion Project. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 013 * either express or implied. See the License for the specific language 014 * governing permissions and limitations under the License. 015 */ 016package org.opengion.fukurou.process; 017 018import java.io.File; 019import java.util.HashMap; 020import java.util.Map; 021 022import org.opengion.fukurou.business.BizUtil; 023import org.opengion.fukurou.queue.QueueInfo; 024import org.opengion.fukurou.queue.QueueReceive; 025import org.opengion.fukurou.queue.QueueReceiveFactory; 026import org.opengion.fukurou.util.Argument; 027import org.opengion.fukurou.util.LogWriter; 028 029/** 030 *Process_QueueReceiveは、MQ or SQSにメッセージキューを受信する、 031 *FirstProcessインタフェースの実装クラスです。 032 * 033 *受信したメッセージを指定したbizlogicファイルで処理を行います。 034 *bizlogicファイルでデータベース処理を行うため、 035 *Process_DBParamを前処理として実行します。 036 * 037 *@og.formSample 038 * 1)MQからメッセージを受信する場合 039 * java -DmqUserId=[mqユーザid] -DmqPassword=[mqパスワード] -cp [クラスパス] org.opengion.fukurou.process.MainProcess org.opengion.fukurou.process.Process_Logger -logFile=System.out org.opengion.fukurou.process.Process_DBParam -infoUSER=[ユーザーID] -infoPGID=process -configFile=[DBConfigパス] org.opengion.fukurou.process.Process_QueueReceive -accessKey=[アクセスキー] -secretKey=[シークレットキー] -webinfDir=[WEB-INFパス] -bizsrcDir=[bizディレクトリパス] -queueType=MQ -jmsServer=[mqサーバのurl] -groupId=[グループID] -systemId=[システムID] -logicName=[bizlogicファイル名] 040 * 2)SQSからメッセージを受信する場合 041 * java -cp [クラスパス] org.opengion.fukurou.process.MainProcess org.opengion.fukurou.process.Process_Logger -logFile=System.out org.opengion.fukurou.process.Process_DBParam -infoUSER=[ユーザーID] -infoPGID=process -configFile=[DBConfigパス] org.opengion.fukurou.process.Process_QueueReceive -accessKey=[アクセスキー] -secretKey=[シークレットキー] -webinfDir=[WEB-INFパス] -bizsrcDir=[bizディレクトリパス] -queueType=SQS -jmsServer=[sqsサーバのurl] -groupId=[グループID] -systemId=[システムID] -logicName=[bizlogicファイル名] 042 * 043 *※proxy環境から、外部のMQやSQSサーバにはプロキシ情報を渡して、実行する必要があります。 044 *-Dhttp.proxyHost=[proxyホスト] -Dhttp.proxyPort=[proxyポート] -Dhttps.proxyHost=[proxyホスト] -Dhttps.proxyPort=[proxyポート] 045 * 046 * -queueType=キュータイプ :MQ or SQS 047 * -jmsServer=キューサーバー :キューサーバーのURLを指定 048 * -groupId=グループID :キュー格納先のグループID 049 * -webinfDir=WEB-INFパス :WEB-INFのディレクトリパス(bizlogic用) 050 * -bizsrcDir=bizファイルパス :bizファイルディレクトリパス(bizlogic用) 051 * -systemId=システムID :システムID(bizlogic用) 052 * -logicName=ロジックファイル名 :bizLogicのファイル名(bizlogic用) 053 * [-sccessKey=アクセスキー] :SQSに接続用のアクセスキーです(aws上で取得) 054 * [-secretKey=シークレットキー] :SQSに接続用のシークレットキーです(aws上で取得) 055 * 056 * コマンド例 057 * java -DmqUserId=admin -DmqPassword=admin -Dhttps.proxyHost=xxx-px^ 058 * -Dhttps.proxyPort=8081 -cp H:\sample\* ^ 059 * org.opengion.fukurou.process.MainProcess ^ 060 * org.opengion.fukurou.process.Process_Logger -logFile=System.out org.opengion.fukurou.process.Process_DBParam ^ 061 * -infoUSER=username -infoPGID=process -configFile=H:\sample\DBConfig.xml ^ 062 * org.opengion.fukurou.process.Process_QueueReceive ^ 063 * -webinfDir=H:\sample\gf\WEB-INF -bizsrcDir=H:\sample\gf\src\biz -queueType=MQ ^ 064 * -jmsServer=tcp://localhost:61616 -groupId=sample002 -systemId=GF -logicName=gf.TEST03g.opengion.fukurou.process.Process_QueueReceive ^ 065 * -webinfDir=H:\sample\gf\WEB-INF -bizsrcDir=H:\sample\gf\src\biz -queueType=MQ -jmsServer=tcp://localhost:61616 -groupId=sample002 ^ 066 * -systemId=GF -logicName=gf.TEST03 067 * 068 * @og.rev 5.10.17.1 (2019/11/15) 新規追加 069 * 070 * @verion 5 071 * @since JDK7 072 */ 073public class Process_QueueReceive extends AbstractProcess implements FirstProcess{ 074 private static String name; 075 private static Map<String, String> mustProperty; 076 private static Map<String, String> usableProperty; 077 078 QueueReceive queueReceive; 079 080 private String queueType; 081 private String jmsServer; 082 private String groupId; 083 private String systemId; 084 private String logicName; 085 private String receiveMessage; 086 087 static { 088 mustProperty = new HashMap<String, String>(); 089 mustProperty.put("queueType", "キュータイプ"); 090 mustProperty.put("jmsServer", "jms接続先"); 091 mustProperty.put("groupId", "グループID"); 092 mustProperty.put("webinfDir", "WEB-INFディレクトリ"); 093 mustProperty.put("bizsrcDir", "bizのソースファイルディレクトリ"); 094 mustProperty.put("systemId", "システムID"); 095 mustProperty.put("logicName", "ロジック名"); 096 097 usableProperty = new HashMap<String, String>(); 098 // SQS用 099 usableProperty.put("accessKey", "アクセスキ"); 100 usableProperty.put("secretKey", "シークレットキー"); 101 } 102 103 /** 104 * コンストラクター 105 */ 106 public Process_QueueReceive() { 107 super(name, mustProperty, usableProperty); 108 } 109 110 /** 111 * このクラスの使用方法を返します。 112 * 113 * @return このクラスの使用方法 114 */ 115 @Override 116 public String usage() { 117 StringBuilder buf = new StringBuilder(); 118 119 buf.append("Process_QueueReceiveは、MQ or SQSにメッセージキューを受信する、").append(CR); 120 buf.append("FirstProcessインタフェースの実装クラスです。").append(CR); 121 buf.append(CR); 122 buf.append("-queueType=キュータイプ :MQ or SQS").append(CR); 123 buf.append("-jmsServer=キューサーバー :キューサーバーのURLを指定").append(CR); 124 buf.append("-groupId=グループID :キュー格納先のグループID").append(CR); 125 buf.append("-webinfDir=WEB-INFパス :WEB-INFのディレクトリパス(bizlogic用)").append(CR); 126 buf.append("-bizsrcDir=bizファイルパス :bizファイルディレクトリパス(bizlogic用)").append(CR); 127 buf.append("-systemId=システムID :システムID(bizlogic用)").append(CR); 128 buf.append("-logicName=ロジックファイル名 :bizLogicのファイル名(bizlogic用)").append(CR); 129 buf.append("[-sccessKey=アクセスキー] :SQSに接続用のアクセスキーです(aws上で取得)").append(CR); 130 buf.append("[-secretKey=シークレットキー] :SQSに接続用のシークレットキーです(aws上で取得)").append(CR); 131 buf.append( CR ).append( CR ); 132 buf.append( getArgument().usage() ).append( CR ); 133 134 return null; 135 } 136 137 /** 138 * プロセスの初期化を行います。初めに一度だけ、呼び出されます。 139 * 初期処理(ファイルオープン、DBオープン等)に使用します。 140 * 141 * @param paramProcess データベースの接続先情報などを持っているオブジェクト 142 */ 143 @Override 144 public void init(ParamProcess paramProcess) { 145 Argument arg = getArgument(); 146 147 queueType = arg.getProparty("queueType"); 148 jmsServer = arg.getProparty("jmsServer"); 149 groupId = arg.getProparty("groupId"); 150 systemId = arg.getProparty("systemId"); 151 logicName = arg.getProparty("logicName"); 152 final String accessKey = arg.getProparty("accessKey"); 153 final String secretKey = arg.getProparty("secretKey"); 154 155 queueReceive = QueueReceiveFactory.newQueueReceive(queueType); 156 157 queueReceive.setBatchFlg(true); 158 159 queueReceive.connect(jmsServer, accessKey, secretKey); 160 } 161 162 /** 163 * プロセスの終了を行います。最後に一度だけ、呼び出されます。 164 * 終了処理(ファイルクローズ、DBクローズ等)に使用します。 165 * 166 * @param isOK トータルで、OKだったかどうか[true:成功/false:失敗] 167 */ 168 @Override 169 public void end(boolean isOK) { 170 queueType = ""; 171 jmsServer = ""; 172 groupId = ""; 173 systemId = ""; 174 logicName = ""; 175 receiveMessage = ""; 176 177 if(queueReceive != null) { 178 queueReceive.close(); 179 } 180 } 181 182 /** 183 * このデータの処理において、次の処理が出来るかどうかを問い合わせます。 184 * この呼び出し1回毎に、次のデータを取得する準備を行います。 185 * 186 * @return 処理できる:true / 処理できない:false 187 **/ 188 @Override 189 public boolean next() { 190 191 QueueInfo queueInfo = queueReceive.receive(groupId); 192 193 if(queueInfo != null) { 194 receiveMessage = queueInfo.getMessage(); 195 196 try { 197 // bizlogic処理 198 callActBizLogic(systemId, logicName, receiveMessage); 199 } catch (Throwable te) { 200 String errMsg = "bizlogicの実行に失敗しました。" + te.getMessage(); 201 throw new RuntimeException(errMsg); 202 } 203 }else { 204 String errMsg = "メッセージキューが登録されていません。"; 205 throw new RuntimeException(errMsg); 206 } 207 return false; 208 } 209 210 /** 211 * bizLogic処理の呼び出し 212 * 必要なパス情報をリソースから取得して、 213 * BizUtil.actBizLogicにパス情報を渡すことで、 214 * bizLogicの処理を行います。 215 * 216 * @param systemId システムID 217 * @param logicName ロジックファイル名 218 * @param msgText メッセージ 219 * @throws Throwable エラー情報 220 */ 221 private void callActBizLogic(String systemId, String logicName, String msgText) throws Throwable { 222 Argument arg = getArgument(); 223 final String webinfDir = arg.getProparty("webinfDir"); 224 final String srcDir = arg.getProparty("bizsrcDir"); 225 226 // 対象 クラスパスの生成 227 // HotDeploy機能を使用する場合に、Javaクラスをコンパイルするためのクラスパスを設定します。 228 // 対象となるクラスパスは、WEB-INF/classes 及び WEB-INF/lib/*.jar です。 229 // bizLogicTag.javaのコードを移植 230 StringBuilder sb = new StringBuilder(); 231 sb.append('.').append(File.pathSeparatorChar); 232 233 File lib = new File(webinfDir); 234 File[] libFiles = lib.listFiles(); 235 for (int i = 0; i < libFiles.length; i++) { 236 sb.append(libFiles[i].getAbsolutePath()).append(File.pathSeparatorChar); 237 } 238 final String classDir = webinfDir + File.pathSeparator + "classes"; 239 sb.append(classDir).append(File.pathSeparatorChar); 240 String classPath = sb.toString(); 241 242 boolean isAutoCompile = true; 243 boolean isHotDeploy = true; 244 245 // bizLogicに渡すパラメータ 246 String[] keys = new String[] { "message" }; 247 String[] vals = new String[] { msgText }; 248 249 // bizLogic処理の実行 250 BizUtil.actBizLogic(srcDir, classDir, isAutoCompile, isHotDeploy, classPath, systemId, logicName, keys, vals); 251 } 252 253 /** 254 * 最初に、 行データである LineModel を作成します 255 * FirstProcess は、次々と処理をチェインしていく最初の行データを 256 * 作成して、後続の ChainProcess クラスに処理データを渡します。 257 * 258 * @param rowNo 処理中の行番号 259 * 260 * @return 処理変換後のLineModel 261 * */ 262 @Override 263 public LineModel makeLineModel(int rowNo) { 264 // 後続のChainProcessは実行しません。 265 return null; 266 } 267 268 /** 269 * プロセスの処理結果のレポート表現を返します。 270 * 処理プログラム名、入力件数、出力件数などの情報です。 271 * この文字列をそのまま、標準出力に出すことで、結果レポートと出来るような 272 * 形式で出してください。 273 * 274 * @return 処理結果のレポート 275 */ 276 @Override 277 public String report() { 278 final String report = "[" + getClass().getName() + "]" + CR 279 + TAB + "queueType:" + queueType + CR 280 + TAB + "jmsServer:" + jmsServer + CR 281 + TAB + "gropuId:" + groupId + CR 282 + TAB + "systemId:" + systemId + CR 283 + TAB + "logicName" + logicName + CR 284 + TAB + "receiveMessage:" + receiveMessage + CR 285 ; 286 return report; 287 } 288 289 /** 290 * このクラスは、main メソッドから実行できません。 291 * 292 * @param args コマンド引数配列 293 */ 294 public static void main(final String[] args) { 295 LogWriter.log(new Process_QueueReceive().usage()); 296 } 297}