001package org.opengion.fukurou.queue; 002 003import java.util.ArrayList; 004import java.util.List; 005 006import javax.jms.JMSException; 007import javax.jms.Message; 008import javax.jms.MessageListener; 009import javax.jms.Queue; 010import javax.jms.QueueConnection; 011import javax.jms.QueueConnectionFactory; 012import javax.jms.QueueReceiver; 013import javax.jms.QueueSession; 014import javax.jms.TextMessage; 015import javax.naming.Context; 016import javax.naming.InitialContext; 017 018import org.apache.activemq.ActiveMQConnectionFactory; 019 020/** 021 * MQメッセージ受信用クラス。 022 * 023 * @og.group メッセージ連携 024 * 025 * @og.rev 5.10.15.2 (2019/09/20) 新規作成 026 * 027 * @version 5 028 * @author oota 029 * @since JDK7 030 */ 031public class QueueReceive_MQ implements QueueReceive{ 032 033 private QueueConnection connection = null; 034 private QueueSession session = null; 035 private QueueReceiver receiver = null; 036 List<QueueReceiver> listReceiver = null; 037 private boolean batch = false; 038 039 /** 040 * 接続処理 041 * メッセージキューサーバに接続します。 042 * 043 * @param jmsServer jsmサーバ 044 * @param sqsAccessKey sqs用awsアクセスキー(MQでは利用しません) 045 * @param sqsSecretKey sqs用awsシークレットキー(MQでは利用しません) 046 */ 047 public void connect(final String jmsServer, final String sqsAccessKey, final String sqsSecretKey) { 048 connect(jmsServer); 049 } 050 051 /** 052 * 接続処理 053 * jmsServerに接続します。 054 * MQの場合は、受信リスナーを設定して、随時メッセージ受信処理を行います。 055 * SQSの場合は最大受信件数の10件の処理を行います。 056 * 057 * @param jmsServer 接続先情報 MQ:jndi接続先 SQS:URL 058 */ 059 private void connect(final String jmsServer) { 060 try { 061 if(batch) { 062 // バッチ用 063 String mqUserId = System.getProperty("mqUserId"); 064 String mqPassword = System.getProperty("mqPassword"); 065 QueueConnectionFactory factory = new ActiveMQConnectionFactory(jmsServer); 066 connection = factory.createQueueConnection(mqUserId, mqPassword); 067 }else { 068 // jndi接続用 069 Context ctx = new InitialContext(); 070 QueueConnectionFactory factory = (QueueConnectionFactory)ctx.lookup("java:comp/env/" + jmsServer); 071 connection = factory.createQueueConnection(); 072 } 073 074 connection.start(); 075 076 // Receiveの作成 077 session = connection.createQueueSession(false, QueueSession.CLIENT_ACKNOWLEDGE); 078 079 // 初期化 080 listReceiver = new ArrayList<QueueReceiver>(); 081 }catch(Exception e) { 082 throw new RuntimeException("MQサーバの接続に失敗しました。:" + e.getMessage()); 083 } 084 } 085 086 /** 087 * 受信処理 088 * メッセージキューの受信の処理を行います。 089 * 090 */ 091 @Override 092 public QueueInfo receive(final String queueName) { 093 QueueInfo queueInfo = null; 094 095 try { 096 Queue queue = session.createQueue(queueName); 097 receiver = session.createReceiver(queue); 098 099 TextMessage msg = (TextMessage)receiver.receive(1000); 100 101 if(msg != null) { 102 // メッセージ受信の確認応答 103 msg.acknowledge(); 104 105 // メッセージの設定 106 queueInfo = new QueueInfo(); 107 queueInfo.setMessage(msg.getText()); 108 } 109 }catch(Exception e) { 110 throw new RuntimeException(e.getMessage()); 111 }finally { 112 try { 113 receiver.close(); 114 }catch(Exception e) {} 115 } 116 117 return queueInfo; 118 } 119 120 /** 121 * リスナーの起動 122 * 指定したキュー名に対して、 123 * MessageListenerのリスナーを設定します。 124 * 125 * @param queueName キュー名 126 * @param listener MessageListerを実装したクラス 127 */ 128 @Override 129 public void setListener(final String queueName, MessageListener listener) { 130 QueueReceiver receiver = null; 131 try { 132 Queue queue = session.createQueue(queueName); 133 receiver = session.createReceiver(queue); 134 receiver.setMessageListener(listener); 135 136 // リスナーの起動 137 listReceiver.add(receiver); 138 }catch(JMSException e) { 139 throw new RuntimeException("リスナーの起動に失敗しました。" + e.getMessage()); 140 } 141 } 142 143 /** 144 * クローズリスナー 145 * レシーバーをクローズすることで、 146 * リスナーの処理を終了します。 147 */ 148 public void closeListener() { 149 for(QueueReceiver receiver: listReceiver) { 150 try { 151 receiver.close(); 152 }catch(Exception e) { 153 154 } 155 } 156 157 // 初期化 158 listReceiver = null; 159 listReceiver = new ArrayList<QueueReceiver>(); 160 } 161 162 /** 163 * クローズ処理 164 * クローズ処理を行います。 165 */ 166 @Override 167 public void close() { 168 if(receiver != null) { 169 try { 170 receiver.close(); 171 }catch(Exception e) { 172 173 } 174 } 175 if(session != null) { 176 try { 177 session.close(); 178 }catch(Exception e) { 179 180 } 181 } 182 if(connection != null) { 183 try { 184 connection.close(); 185 }catch(Exception e) { 186 187 } 188 } 189 } 190 191 /** 192 * バッチ処理判定フラグを設定します。 193 * 194 * @param batchFlg バッチ処理判定フラグ 195 */ 196 public void setBatchFlg(final Boolean batchFlg) { 197 batch = batchFlg; 198 } 199 200 /** 201 * 検証用メソッド 202 * テスト用のメソッドです。 203 * 204 * @param args 引数 205 */ 206 public static void main(String[] args) { 207 QueueReceive receive = new QueueReceive_MQ(); 208 final String jmsServer = "tcp://localhost:61616"; 209 210 // バッチフラグにtrueを設定 211 // 未設定の場合は、tomcatのjndi接続処理が実行されます。 212 receive.setBatchFlg(true); 213 214 // 認証情報の設定 215 System.setProperty("mqUserId", "admin"); 216 System.setProperty("mqPassword", "admin"); 217 218 // 接続 219 receive.connect(jmsServer, null, null); 220 221 // 処理対象のキュー名 222 String queueName = "queue01"; 223 224 225 // ** 1件受信する場合 226 QueueInfo queueInfo = receive.receive(queueName); 227 if(queueInfo != null) { 228 System.out.println("message:" + queueInfo.getMessage()); 229 }else { 230 System.out.println("キューが登録されていません。"); 231 } 232 233// // ** リスナーを設定して、受信を検知すると処理を実行します。(MQのみ) 234// // MessageListerを実装した、QueueReceiveListenerクラスを作成します。 235// MessageListener listener = new QueueReceiveListener(); 236// receive.setListener(queueName, listener); 237// // 複数のキューにリスナーを設定することも可能です。 238// receive.setListener("queue02", listener); 239// 240// try { 241// // 1分間リスナーを起動しておく場合の、プロセス待機処理 242// Thread.sleep(60 * 1000); 243// }catch(InterruptedException e) { 244// throw new RuntimeException(e.getMessage()); 245// } 246 247 // リスナー利用時は、closeListenerを実行して、解放してください。 248 receive.closeListener(); 249 250 // 終了処理 251 receive.close(); 252 } 253 254 /** 255 * QueueReceiveリスナークラス 256 * リスナー用のクラスです。 257 * MQに設定することで、メッセージが受信されると、 258 * 自動的にonMessageメソッドが実行されます。 259 * 260 */ 261 static class QueueReceiveListener implements MessageListener { 262 /** 263 * メッセージ受信処理 264 * MQサーバにメッセージが受信されると、 265 * メソッドの処理が行われます。 266 * 267 * @param Message 受信メッセージ 268 */ 269 @Override 270 public void onMessage(final Message message) { 271 272 // メッセージ受信 273 TextMessage msg = (TextMessage) message; 274 String msgText = ""; 275 276 try { 277 // キューサーバのメッセージを取得 278 msgText = msg.getText(); 279 // メーッセージの受信応答を返します。 280 msg.acknowledge(); 281 282 System.out.println("message:" + msgText); 283 284 } catch (JMSException e) { 285 throw new RuntimeException(e.getMessage()); 286 } 287 } 288 } 289 290}