001/*
002 * Copyright (c) 2009 The openGion Project.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
013 * either express or implied. See the License for the specific language
014 * governing permissions and limitations under the License.
015 */
016package org.opengion.fukurou.process;
017
018import org.opengion.fukurou.util.Argument;
019import org.opengion.fukurou.util.SystemParameter;
020import org.opengion.fukurou.util.LogWriter;
021
022import org.opengion.fukurou.util.HybsEntry ;
023import org.opengion.fukurou.util.Closer;
024import org.opengion.fukurou.db.ConnectionFactory;
025
026import java.util.Set ;
027import java.util.HashSet ;
028import java.util.Map ;
029import java.util.LinkedHashMap ;
030
031import java.sql.Connection;
032import java.sql.Statement;
033import java.sql.ResultSet;
034import java.sql.SQLException;
035
036/**
037 * Process_BulkQueryは、データベースから読み取った内容を、一括処理するために、
038 * ParamProcess のサブクラス(Process_DBParam)にセットしたり、加工したりする
039 * FirstProcess と、ChainProcess のインターフェースを両方持った、実装クラスです。
040 *
041 * このクラスは、上流から、下流への処理は、1度しか実行されません。
042 * FirstProcess の検索結果は、Set オブジェクトとして、Process_DBParam に渡します。
043 * ChainProcess は、その結果を取り出し、自分自身の処理結果と合せて加工します。
044 *
045 * FirstProcess では、-action は、query のみです。
046 *   query は、指定のSQL文を実行し、結果のSetをParamProcessに設定します。
047 * ChainProcess では、-action は、query、bulkSet、minus、intersect が指定できます。
048 *   query     は、上記と同じです。
049 *   minus     は、先のSetから、SQL文の実行結果を引き算し、結果Setを再設定します。
050 *   intersect は、先のSetから、SQL文の実行結果と重複する結果Setを再設定します。
051 *   bulkSet   は、先のSetを取り出し、SQL文に加味して処理します。
052 * 流れ的には、query で検索し、minusまたはintersect でSetオブジェクトを加工し、bulkSet で
053 * 利用します。例えば、ORACLEから、ユニークキーのSetを作成し、SQLServerのユニークキーを
054 * minusした結果を、ORACLEからDELETEすれば、不要なデータを削除するなどの処理が実行可能になります。
055 * また、単純に、query だけを、チェインすれば、単発のUPDATE文を実行することが可能です。
056 *
057 * データベース接続先等は、ParamProcess のサブクラス(Process_DBParam)に
058 * 設定された接続(Connection)を使用します。
059 * DBID は、Process_DBParam の -configFile で指定する DBConfig.xml ファイルを使用します。
060 *
061 * 引数文字列中にスペースを含む場合は、ダブルコーテーション("") で括って下さい。
062 * 引数文字列の 『=』の前後には、スペースは挟めません。必ず、-key=value の様に
063 * 繋げてください。
064 *
065 * SQL文には、{@DATE.YMDH}等のシステム変数が使用できます。
066 *
067 * @og.formSample
068 *  Process_BulkQuery -action=query -dbid=DBGE -sql="select KEY from TABLE_X"
069 *
070 *     -action=処理方法(必須)      : 実行する処理方法を指定します
071 *                                       -action=query     単なるSQL文を実行します。
072 *                                       -action=bulkSet   実行したSQL文の結果を、Set<String> オブジェクトに設定します。
073 *                                       -action=minus     Set<String> オブジェクトと、ここでの実行結果の差分をとります。
074 *                                       -action=intersect Set<String> オブジェクトと、ここでの実行結果の積分をとります。
075 *   [ -dbid=DB接続ID             ] : -dbid=DBGE (例: Process_DBParam の -configFile で指定する DBConfig.xml ファイルで規定)
076 *   [ -sql=検索SQL文             ] : -sql="select * from GEA08"
077 *   [ -sqlFile=検索SQLファイル   ] : -sqlFile=select.sql
078 *                                       -sql= を指定しない場合は、ファイルで必ず指定してください。
079 *   [ -sql_XXXX=固定値           ] : -sql_SYSTEM_ID=GE
080 *                                       SQL文中の{@XXXX}文字列を指定の固定値で置き換えます。
081 *                                       WHERE SYSTEM_ID='{@SYSTEM_ID}' ⇒ WHERE SYSTEM_ID='GE'
082 *   [ -bulkKey=XXXX              ] : -bulkKey=XXXX
083 *                                       SQL文中の{@XXXX}文字列をProcess_BulkQuery等で取得した値で置き換えます。
084 *                                       WHERE SYSTEM_ID IN ( {@XXXX} ) ⇒ WHERE SYSTEM_ID IN ( 'AA','BB','CC' )
085 *   [ -bulkType=NUM|STR          ] : -bulType=STR
086 *                                     Bulkの値を文字列に変換する場合に、数字型か、文字型を指定します。
087 *                                     数字型では、AA,BB,CC とし、文字型では、'AA','BB','CC' に変換します(初期値:STR)。
088 *   [ -fetchSize=100             ] :フェッチする行数(初期値:100)
089 *   [ -display=[false/true]      ] :結果を標準出力に表示する(true)かしない(false)か(初期値:false[表示しない])
090 *   [ -debug=[false/true]        ] :デバッグ情報を標準出力に表示する(true)かしない(false)か(初期値:false[表示しない])
091 *
092 * @og.rev 5.3.4.0 (2011/04/01) 新規追加
093 * @version  4.0
094 * @author   Kazuhiko Hasegawa
095 * @since    JDK5.0,
096 */
097public class Process_BulkQuery extends AbstractProcess implements FirstProcess , ChainProcess {
098        private static final int    MAX_BULK_SET        = 500 ;         // ORACLE の制約が 1000 なので。
099
100        private static final String ACT_QUERY           = "query" ;
101        private static final String ACT_BULKSET         = "bulkSet" ;
102        private static final String ACT_MINUS           = "minus" ;
103        private static final String ACT_INTERSECT       = "intersect" ;
104
105        private static final String[] ACTION_LST = new String[] { ACT_QUERY,ACT_BULKSET,ACT_MINUS,ACT_INTERSECT };
106
107        private String          actionCmd       = null;         // SQL結果を加工(query:実行、minus:引き算、intersect:重複分)
108        private String          dbid            = null;         // メインDB接続ID
109
110        private String          bulkKey         = null;
111        private boolean         bulkType        = true;         // true:STR , false:NUM
112
113        private int                     sqlCount        = 0;            // SQL文の処理件数
114        private int                     setCount        = 0;            // 取り出したSetの件数
115        private int                     outCount        = 0;            // マージ後のSetの件数
116
117        private int                     fetchSize       = 100;
118        private boolean         display         = false;        // 表示しない
119        private boolean         debug           = false;        // デバッグ情報
120        private boolean         firstTime       = true;         // 最初の一回目
121
122        private static final Map<String,String> mustProparty   ;                // [プロパティ]必須チェック用 Map
123        private static final Map<String,String> usableProparty ;                // [プロパティ]整合性チェック Map
124
125        static {
126                mustProparty = new LinkedHashMap<String,String>();
127                mustProparty.put( "action",     "実行する処理方法を指定します。(query|minus|intersect)" );
128
129                usableProparty = new LinkedHashMap<String,String>();
130                usableProparty.put( "dbid",     "Process_DBParam の -configFile で指定する DBConfig.xml ファイルで規定" );
131                usableProparty.put( "sql",              "検索SQL文(sql or sqlFile 必須)例: \"select * from GEA08\"" );
132                usableProparty.put( "sqlFile",  "検索SQLファイル(sql or sqlFile 必須)例: select.sql" );
133                usableProparty.put( "sql_",             "SQL文中の{&#064;XXXX}文字列を指定の固定値で置き換えます。" +
134                                                                        CR + "WHERE SYSTEM_ID='{&#064;SYSTEM_ID}' ⇒ WHERE SYSTEM_ID='GE'" );
135                usableProparty.put( "dbid2",    "DB接続ID2 例: Process_DBParam の -configFile で指定する DBConfig.xml ファイルで規定" );
136                usableProparty.put( "sql2",             "検索SQL文2(sql or sqlFile 必須)例: \"select * from GEA08\"" );
137                usableProparty.put( "sqlFile2", "検索SQLファイル2(sql or sqlFile 必須)例: select.sql" );
138                usableProparty.put( "sql2_",    "SQL文2中の{&#064;XXXX}文字列を指定の固定値で置き換えます。" +
139                                                                        CR + "WHERE SYSTEM_ID='{&#064;SYSTEM_ID}' ⇒ WHERE SYSTEM_ID='GE'" );
140                usableProparty.put( "bulkKey",          "SQL文中の{&#064;XXXX}文字列をProcess_BulkQuery等で取得した値で置き換えます。" +
141                                                                        CR + "WHERE SYSTEM_ID IN ( {&#064;XXXX} ) ⇒ WHERE SYSTEM_ID IN ( 'AA','BB','CC' )" );
142                usableProparty.put( "bulkType",         "Bulkの値を文字列に変換する場合に、文字型か、数字型を指定します。" +
143                                                                        CR + "数字型では、AA,BB,CC とし、文字型では、'AA','BB','CC' に変換します。(初期値:STR)" );
144                usableProparty.put( "fetchSize","フェッチする行数 (初期値:100)" );
145                usableProparty.put( "display",  "結果を標準出力に表示する(true)かしない(false)か" +
146                                                                                CR + "(初期値:false:表示しない)" );
147                usableProparty.put( "debug",    "デバッグ情報を標準出力に表示する(true)かしない(false)か" +
148                                                                                CR + "(初期値:false:表示しない)" );
149        }
150
151        /**
152         * デフォルトコンストラクター。
153         * このクラスは、動的作成されます。デフォルトコンストラクターで、
154         * super クラスに対して、必要な初期化を行っておきます。
155         *
156         */
157        public Process_BulkQuery() {
158                super( "org.opengion.fukurou.process.Process_BulkQuery",mustProparty,usableProparty );
159        }
160
161        /**
162         * プロセスの初期化を行います。初めに一度だけ、呼び出されます。
163         * 初期処理(ファイルオープン、DBオープン等)に使用します。
164         *
165         * @og.rev 5.3.9.0 (2011/09/01) 1000件を超えた場合の処理を追加
166         *
167         * @param   paramProcess データベースの接続先情報などを持っているオブジェクト
168         */
169        public void init( final ParamProcess paramProcess ) {
170                Argument arg = getArgument();
171
172                actionCmd       = arg.getProparty("action" , null , ACTION_LST );
173
174                fetchSize       = arg.getProparty("fetchSize",fetchSize);
175                display         = arg.getProparty("display",display);
176                debug           = arg.getProparty("debug",debug);
177
178                dbid            = arg.getProparty("dbid");
179                String sql      = arg.getFileProparty("sql","sqlFile",true);
180                if( debug ) { println( "入力SQL:" + sql ); }
181
182                HybsEntry[] entry       =arg.getEntrys( "sql_" );               //配列
183                SystemParameter sysParam = new SystemParameter( sql );
184                sql = sysParam.replace( entry );
185                if( debug ) { println( "変換SQL:" + sql ); }
186
187                if( ACT_BULKSET.equalsIgnoreCase( actionCmd ) ) {
188                        bulkKey         = arg.getProparty("bulkKey");
189                        String bkType = arg.getProparty("bulkType");
190                        if( bkType != null ) { bulkType = "STR".equalsIgnoreCase( bkType ); }   // 初期値が true なので、null チャックは外せません。
191
192                        Set<String> setData = paramProcess.getBulkData();
193                        if( debug ) { println( setData.toString() ); }
194                        setCount = setData.size();
195
196                        if( setCount > 0 ) {
197                                // 5.3.9.0 (2011/09/01) 1000件を超えた場合の処理を追加
198                                String[] sqls = makeBulkQuery( sql,bulkKey,bulkType,setData );
199                                for( int i=0; i<sqls.length; i++ ) {
200                                        if( debug ) { println( "BulkSQL:" + sqls[i] ); }
201                                        createSetData( paramProcess, dbid, sqls[i] );
202                                }
203                        }
204                }
205                else if( ACT_QUERY.equalsIgnoreCase( actionCmd ) ) {
206                        Set<String> setData2 = createSetData( paramProcess, dbid, sql );
207                        if( debug ) { println( setData2.toString() ); }
208                        setCount = setData2.size();
209                        outCount = setCount;
210                        paramProcess.setBulkData( setData2 );
211                }
212                else {
213                        Set<String> setData = paramProcess.getBulkData();
214                        Set<String> setData2 = createSetData( paramProcess, dbid, sql );
215                        setCount = setData2.size();
216
217                        if( ACT_MINUS.equalsIgnoreCase( actionCmd ) ) {
218                                setData.removeAll( setData2 );
219                        }
220                        else if( ACT_INTERSECT.equalsIgnoreCase( actionCmd ) ) {
221                                setData.retainAll( setData2 );
222                        }
223                        outCount = setData.size();
224                        if( debug ) { println( setData.toString() ); }
225                        paramProcess.setBulkData( setData );
226                }
227        }
228
229        /**
230         * プロセスの終了を行います。最後に一度だけ、呼び出されます。
231         * 終了処理(ファイルクローズ、DBクローズ等)に使用します。
232         *
233         * @param   isOK トータルで、OKだったかどうか [true:成功/false:失敗]
234         */
235        public void end( final boolean isOK ) {
236                // 何もありません。
237        }
238
239        /**
240         * このデータの処理において、次の処理が出来るかどうかを問い合わせます。
241         * この呼び出し1回毎に、次のデータを取得する準備を行います。
242         *
243         * @return      処理できる:true / 処理できない:false
244         */
245        public boolean next() {
246                return firstTime;
247        }
248
249        /**
250         * 引数の LineModel を処理するメソッドです。
251         * 変換処理後の LineModel を返します。
252         * 後続処理を行わない場合(データのフィルタリングを行う場合)は、
253         * null データを返します。つまり、null データは、後続処理を行わない
254         * フラグの代わりにも使用しています。
255         * なお、変換処理後の LineModel と、オリジナルの LineModel が、
256         * 同一か、コピー(クローン)かは、各処理メソッド内で決めています。
257         * ドキュメントに明記されていない場合は、副作用が問題になる場合は、
258         * 各処理ごとに自分でコピー(クローン)して下さい。
259         *
260         * @param       data    オリジナルのLineModel
261         *
262         * @return      処理変換後のLineModel
263         */
264        @SuppressWarnings(value={"unchecked"})
265        public LineModel action( final LineModel data ) {
266                return data ;
267        }
268
269        /**
270         * 最初に、 行データである LineModel を作成します
271         * FirstProcess は、次々と処理をチェインしていく最初の行データを
272         * 作成して、後続の ChainProcess クラスに処理データを渡します。
273         *
274         * @param       rowNo   処理中の行番号
275         *
276         * @return      処理変換後のLineModel
277         */
278        public LineModel makeLineModel( final int rowNo ) {
279                firstTime = false;              // 一度しか処理しないため、false を設定する。
280
281                LineModel model = new LineModel();
282
283                model.setRowNo( rowNo );
284
285                return model;
286        }
287
288        /**
289         * 内部で使用する Set オブジェクトを作成します。
290         * Exception 以外では、必ず Set<String> オブジェクトを返します。
291         *
292         * @og.rev 5.3.9.0 (2011/09/01) 1000件を超えた場合の処理を追加
293         *
294         * @param   paramProcess        データベースの接続先情報などを持っているオブジェクト
295         * @param   dbid                        接続先ID
296         * @param   sql                         実行するSQL文(検索系)
297         *
298         * @return      実行結果から取り出した、最初のカラムのみを集めた Setオブジェクト
299         * @throws RuntimeException データベース処理ができなかった場合。
300         */
301        private Set<String> createSetData( final ParamProcess paramProcess, final String dbid, final String sql ) {
302                Set<String> data = new HashSet<String>();
303
304                Connection connection   = null;
305                Statement  stmt                 = null;
306                ResultSet  resultSet    = null;
307
308                try {
309                        connection = paramProcess.getConnection( dbid );
310                        stmt = connection.createStatement();
311                        if( fetchSize > 0 ) { stmt.setFetchSize( fetchSize ); }
312                        if( stmt.execute( sql ) ) {                     // true:検索系 , false:更新系
313                                resultSet = stmt.getResultSet();
314                                while( resultSet.next() ) {
315                                        sqlCount++ ;
316                                        String str = resultSet.getString(1);
317                                        if( display ) { println( str ); }
318                                        data.add( str );
319                                }
320                        }
321                        else {
322                                sqlCount += stmt.getUpdateCount();
323                        }
324                }
325                catch (SQLException ex) {
326                        String errMsg = "SQL を実行できませんでした。" + CR
327                                        + "errMsg=[" + ex.getMessage() + "]" + CR
328                                        + "errorCode=[" + ex.getErrorCode() + "] State=[" + ex.getSQLState() + "]" + CR
329                                        + "DBID=" + dbid + CR
330                                        + "SQL =" + sql ;
331
332                        throw new RuntimeException( errMsg,ex );
333                }
334                finally {
335                        Closer.resultClose( resultSet );
336                        Closer.stmtClose( stmt );
337
338                        ConnectionFactory.remove( connection,dbid );
339                }
340                return data;
341        }
342
343        /**
344         * 内部で使用する Set オブジェクトを作成します。
345         * Exception 以外では、必ず Set<String[]> オブジェクトを返します。
346         *
347         * @og.rev 5.3.9.0 (2011/09/01) 1000件を超えた場合の処理を追加
348         *
349         * @param       sql                     オリジナルのSQL文
350         * @param       bulkKey         一括処理で置き換えるキー文字列
351         * @param       bulkType        文字型(true)か、数字型(false)を指定
352         * @param   setData             一括処理の元となるSetオブジェクト
353         *
354         * @return      オリジナルのSQL文 に 一括処理の文字列と置換したSQL文の配列
355         */
356        private String[] makeBulkQuery( final String sql, final String bulkKey, final boolean bulkType,final Set<String> setData ) {
357                String[] sqls = new String[ setData.size()/MAX_BULK_SET + 1 ];
358                int idx = 0;
359                int cnt = 0;
360
361                StringBuilder buf = new StringBuilder();
362                String bulkVal = null;
363                if( bulkType ) {                        // 文字列の場合
364                        for( String key : setData ) {
365                                cnt++;
366                                buf.append( ",'" ).append( key ).append( "'" );
367                                if( cnt >= MAX_BULK_SET ) {
368                                        bulkVal = buf.substring( 1 );   // 先頭のコロンをはずす
369                                        sqls[idx++] = sql.replace( "{@" + bulkKey + "}" ,bulkVal );
370                                        cnt = 0;
371                                        buf = new StringBuilder();
372                                }
373                        }
374                        if( cnt > 0 ) {         // きっちりで終わらない場合
375                                bulkVal = buf.substring( 1 );   // 先頭のコロンをはずす
376                                sqls[idx] = sql.replace( "{@" + bulkKey + "}" ,bulkVal );
377                        }
378                }
379                else {                                          // 数字の場合
380                        for( String key : setData ) {
381                                cnt++;
382                                buf.append( "," ).append( key );
383                                if( cnt >= MAX_BULK_SET ) {
384                                        bulkVal = buf.substring( 1 );   // 先頭のコロンをはずす
385                                        sqls[idx++] = sql.replace( "{@" + bulkKey + "}" ,bulkVal );
386                                        cnt = 0;
387                                        buf = new StringBuilder();
388                                }
389                        }
390                        if( cnt > 0 ) {         // きっちりで終わらない場合
391                                bulkVal = buf.substring( 1 );   // 先頭のコロンをはずす
392                                sqls[idx] = sql.replace( "{@" + bulkKey + "}" ,bulkVal );
393                        }
394                }
395
396                return sqls;
397        }
398
399        /**
400         * プロセスの処理結果のレポート表現を返します。
401         * 処理プログラム名、入力件数、出力件数などの情報です。
402         * この文字列をそのまま、標準出力に出すことで、結果レポートと出来るような
403         * 形式で出してください。
404         *
405         * @return   処理結果のレポート
406         */
407        public String report() {
408                String report = "[" + getClass().getName() + "]" + CR
409                                + TAB + "Action    : " + actionCmd      + CR
410                                + TAB + "DBID      : " + dbid           + CR
411                                + TAB + "sqlCount  : " + sqlCount       + CR
412                                + TAB + "setCount  : " + setCount       + CR
413                                + TAB + "outCount  : " + outCount ;
414
415                return report ;
416        }
417
418        /**
419         * このクラスの使用方法を返します。
420         *
421         * @return      このクラスの使用方法
422         */
423        public String usage() {
424                StringBuilder buf = new StringBuilder();
425
426                buf.append( "Process_BulkQueryは、データベースから読み取った内容を、一括処理するために、"            ).append( CR );
427                buf.append( "ParamProcess のサブクラス(Process_DBParam)にセットしたり、加工したりする"                       ).append( CR );
428                buf.append( "FirstProcess と、ChainProcess のインターフェースを両方持った、実装クラスです。"      ).append( CR );
429                buf.append( CR );
430                buf.append( "このクラスは、上流から、下流への処理は、1度しか実行されません。"                                  ).append( CR );
431                buf.append( "FirstProcess の検索結果は、Set オブジェクトとして、Process_DBParam に渡します。"  ).append( CR );
432                buf.append( "ChainProcess は、その結果を取り出し、自分自身の処理結果と合せて加工します。"              ).append( CR );
433                buf.append( CR );
434                buf.append( "FirstProcess では、-action は、query のみです。"                                                                     ).append( CR );
435                buf.append( "  query は、指定のSQL文を実行し、結果のSetをParamProcessに設定します。"                  ).append( CR );
436                buf.append( "ChainProcess では、-action は、query、bulkSet、minus、intersect が指定できます。"  ).append( CR );
437                buf.append( "  query     は、上記と同じです。"                                                                                                    ).append( CR );
438                buf.append( "  minus     は、先のSetから、SQL文の実行結果を引き算し、結果Setを再設定します。"        ).append( CR );
439                buf.append( "  intersect は、先のSetから、SQL文の実行結果と重複する結果Setを再設定します。"         ).append( CR );
440                buf.append( "  bulkSet   は、先のSetを取り出し、SQL文に加味して処理します。"                                  ).append( CR );
441                buf.append( CR );
442                buf.append( "流れ的には、query で検索し、minusまたはintersect でSetオブジェクトを加工し、"                ).append( CR );
443                buf.append( "bulkSet で利用します。例えば、ORACLEから、ユニークキーのSetを作成し、"                               ).append( CR );
444                buf.append( "SQLServerのユニークキーをminusした結果を、ORACLEからDELETEすれば、不要な"                 ).append( CR );
445                buf.append( "データを削除するなどの処理が実行可能になります。また、単純に、query だけを、"         ).append( CR );
446                buf.append( "チェインすれば、単発のUPDATE文を実行することが可能です。"                                                   ).append( CR );
447                buf.append( CR );
448                buf.append( "データベース接続先等は、ParamProcess のサブクラス(Process_DBParam)に"                 ).append( CR );
449                buf.append( "設定された接続(Connection)を使用します。"                                                                                ).append( CR );
450                buf.append( CR );
451                buf.append( "引数文字列中に空白を含む場合は、ダブルコーテーション(\"\") で括って下さい。" ).append( CR );
452                buf.append( "引数文字列の 『=』の前後には、空白は挟めません。必ず、-key=value の様に"                ).append( CR );
453                buf.append( "繋げてください。"                                                                                                                          ).append( CR );
454                buf.append( CR );
455                buf.append( "SQL文には、{@DATE.YMDH}等のシステム変数が使用できます。"                                               ).append( CR );
456                buf.append( CR ).append( CR );
457
458                buf.append( getArgument().usage() ).append( CR );
459
460                return buf.toString();
461        }
462
463        /**
464         * このクラスは、main メソッドから実行できません。
465         *
466         * @param       args    コマンド引数配列
467         */
468        public static void main( final String[] args ) {
469                LogWriter.log( new Process_BulkQuery().usage() );
470        }
471}