001package org.opengion.fukurou.queue; 002 003import java.util.ArrayList; 004import java.util.List; 005 006import javax.jms.JMSException; 007// import javax.jms.Message; 008import javax.jms.MessageListener; 009import javax.jms.Queue; 010import javax.jms.QueueConnection; 011import javax.jms.QueueConnectionFactory; 012import javax.jms.QueueReceiver; 013import javax.jms.QueueSession; 014import javax.jms.TextMessage; 015import javax.naming.Context; 016import javax.naming.InitialContext; 017 018import org.apache.activemq.ActiveMQConnectionFactory; 019 020/** 021 * MQメッセージ受信用クラス。 022 * 023 * @og.group メッセージ連携 024 * 025 * @og.rev 5.10.15.2 (2019/09/20) 新規作成 026 * 027 * @version 5 028 * @author oota 029 * @since JDK7 030 */ 031public class QueueReceive_MQ implements QueueReceive{ 032 033 private QueueConnection connection ; 034 private QueueSession session ; 035 private QueueReceiver receiver ; 036// List<QueueReceiver> listReceiver ; 037 private List<QueueReceiver> listReceiver ; // 7.2.9.4 (2020/11/20) 038 private boolean batch ; 039 040 /** 041 * 接続処理 042 * メッセージキューサーバに接続します。 043 * 044 * @param jmsServer jsmサーバ 045 * @param sqsAccessKey sqs用awsアクセスキー(MQでは利用しません) 046 * @param sqsSecretKey sqs用awsシークレットキー(MQでは利用しません) 047 */ 048 public void connect(final String jmsServer, final String sqsAccessKey, final String sqsSecretKey) { 049 connect(jmsServer); 050 } 051 052 /** 053 * 接続処理 054 * jmsServerに接続します。 055 * MQの場合は、受信リスナーを設定して、随時メッセージ受信処理を行います。 056 * SQSの場合は最大受信件数の10件の処理を行います。 057 * 058 * @param jmsServer 接続先情報 MQ:jndi接続先 SQS:URL 059 */ 060 private void connect(final String jmsServer) { 061 try { 062 if(batch) { 063 // バッチ用 064 final String mqUserId = System.getProperty("mqUserId"); 065 final String mqPassword = System.getProperty("mqPassword"); 066 final QueueConnectionFactory factory = new ActiveMQConnectionFactory(jmsServer); 067 connection = factory.createQueueConnection(mqUserId, mqPassword); 068 }else { 069 // jndi接続用 070 final Context ctx = new InitialContext(); 071 final QueueConnectionFactory factory = (QueueConnectionFactory)ctx.lookup("java:comp/env/" + jmsServer); 072 connection = factory.createQueueConnection(); 073 } 074 075 connection.start(); 076 077 // Receiveの作成 078 session = connection.createQueueSession(false, QueueSession.CLIENT_ACKNOWLEDGE); 079 080 // 初期化 081 listReceiver = new ArrayList<QueueReceiver>(); 082 }catch(Exception e) { 083 throw new RuntimeException("MQサーバの接続に失敗しました。:" + e.getMessage()); 084 } 085 } 086 087 /** 088 * 受信処理 089 * メッセージキューの受信の処理を行います。 090 * 091 * @param queueName キューの名前 092 * @return キュー情報格納クラス 093 */ 094 @Override 095 public QueueInfo receive(final String queueName) { 096 QueueInfo queueInfo = null; 097 098 try { 099 final Queue queue = session.createQueue(queueName); 100 receiver = session.createReceiver(queue); 101 102 final TextMessage msg = (TextMessage)receiver.receive(1000); 103 104 if(msg != null) { 105 // メッセージ受信の確認応答 106 msg.acknowledge(); 107 108 // メッセージの設定 109 queueInfo = new QueueInfo(); 110 queueInfo.setMessage(msg.getText()); 111 } 112 }catch(Exception e) { 113 throw new RuntimeException(e.getMessage()); 114 }finally { 115 try { 116 receiver.close(); 117 }catch(Exception e) { ; } 118 } 119 120 return queueInfo; 121 } 122 123 /** 124 * リスナーの起動 125 * 指定したキュー名に対して、 126 * MessageListenerのリスナーを設定します。 127 * 128 * @param queueName キュー名 129 * @param listener MessageListerを実装したクラス 130 */ 131 @Override 132 public void setListener(final String queueName, final MessageListener listener) { 133 QueueReceiver receiver = null; 134 try { 135 final Queue queue = session.createQueue(queueName); 136 receiver = session.createReceiver(queue); 137 receiver.setMessageListener(listener); 138 139 // リスナーの起動 140 listReceiver.add(receiver); 141 }catch(JMSException e) { 142 throw new RuntimeException("リスナーの起動に失敗しました。" + e.getMessage()); 143 } 144 } 145 146 /** 147 * クローズリスナー 148 * レシーバーをクローズすることで、 149 * リスナーの処理を終了します。 150 */ 151 public void closeListener() { 152 for(final QueueReceiver receiver: listReceiver) { 153 try { 154 receiver.close(); 155 }catch(Exception e) { ; } 156 } 157 158 // 初期化 159 listReceiver = null; 160 listReceiver = new ArrayList<QueueReceiver>(); 161 } 162 163 /** 164 * クローズ処理 165 * クローズ処理を行います。 166 */ 167 @Override 168 public void close() { 169 if(receiver != null) { 170 try { 171 receiver.close(); 172 }catch(Exception e) { ; } 173 } 174 if(session != null) { 175 try { 176 session.close(); 177 }catch(Exception e) { ; } 178 } 179 if(connection != null) { 180 try { 181 connection.close(); 182 }catch(Exception e) { ; } 183 } 184 } 185 186 /** 187 * バッチ処理判定フラグを設定します。 188 * 189 * @param batchFlg バッチ処理判定フラグ 190 */ 191 public void setBatchFlg(final Boolean batchFlg) { 192 batch = batchFlg; 193 } 194 195 /** 196 * 検証用メソッド 197 * テスト用のメソッドです。 198 * 199 * @param args 引数 200 */ 201 public static void main(final String[] args) { 202 final QueueReceive receive = new QueueReceive_MQ(); 203 final String jmsServer = "tcp://localhost:61616"; 204 205 // バッチフラグにtrueを設定 206 // 未設定の場合は、tomcatのjndi接続処理が実行されます。 207 receive.setBatchFlg(true); 208 209 // 認証情報の設定 210 System.setProperty("mqUserId", "admin"); 211 System.setProperty("mqPassword", "admin"); 212 213 // 接続 214 receive.connect(jmsServer, null, null); 215 216 // 処理対象のキュー名 217 final String queueName = "queue01"; 218 219 220 // ** 1件受信する場合 221 final QueueInfo queueInfo = receive.receive(queueName); 222 if(queueInfo != null) { 223 System.out.println("message:" + queueInfo.getMessage()); 224 }else { 225 System.out.println("キューが登録されていません。"); 226 } 227 228// // ** リスナーを設定して、受信を検知すると処理を実行します。(MQのみ) 229// // MessageListerを実装した、QueueReceiveListenerクラスを作成します。 230// MessageListener listener = new QueueReceiveListener(); 231// receive.setListener(queueName, listener); 232// // 複数のキューにリスナーを設定することも可能です。 233// receive.setListener("queue02", listener); 234// 235// try { 236// // 1分間リスナーを起動しておく場合の、プロセス待機処理 237// Thread.sleep(60 * 1000); 238// }catch(InterruptedException e) { 239// throw new RuntimeException(e.getMessage()); 240// } 241 242 // リスナー利用時は、closeListenerを実行して、解放してください。 243 receive.closeListener(); 244 245 // 終了処理 246 receive.close(); 247 } 248 249// /** 250// * QueueReceiveリスナークラス 251// * リスナー用のクラスです。 252// * MQに設定することで、メッセージが受信されると、 253// * 自動的にonMessageメソッドが実行されます。 254// * 255// */ 256// static class QueueReceiveListener implements MessageListener { 257// /** 258// * メッセージ受信処理 259// * MQサーバにメッセージが受信されると、 260// * メソッドの処理が行われます。 261// * 262// * @param message 受信メッセージ 263// */ 264// @Override 265// public void onMessage(final Message message) { 266// 267// // メッセージ受信 268// TextMessage msg = (TextMessage) message; 269// String msgText = ""; 270// 271// try { 272// // キューサーバのメッセージを取得 273// msgText = msg.getText(); 274// // メーッセージの受信応答を返します。 275// msg.acknowledge(); 276// 277// System.out.println("message:" + msgText); 278// 279// } catch (JMSException e) { 280// throw new RuntimeException(e.getMessage()); 281// } 282// } 283// } 284 285}