001 /* 002 * Copyright (c) 2009 The openGion Project. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 013 * either express or implied. See the License for the specific language 014 * governing permissions and limitations under the License. 015 */ 016 package org.opengion.fukurou.process; 017 018 import org.opengion.fukurou.util.Argument; 019 import org.opengion.fukurou.util.SystemParameter; 020 import org.opengion.fukurou.util.StringUtil; 021 import org.opengion.fukurou.util.LogWriter; 022 import org.opengion.fukurou.util.HybsEntry ; 023 import org.opengion.fukurou.util.Closer; 024 import org.opengion.fukurou.model.Formatter; 025 import org.opengion.fukurou.db.ConnectionFactory; 026 027 import java.util.Map ; 028 import java.util.LinkedHashMap ; 029 import java.util.Set ; 030 import java.util.HashSet ; 031 032 import java.sql.Connection; 033 import java.sql.PreparedStatement; 034 import java.sql.ParameterMetaData; 035 import java.sql.SQLException; 036 037 /** 038 * Process_DBWriter は、上流から受け取ったデータをデータベ?スに書き込? 039 * CainProcess インターフェースの実?ラスです? 040 * 041 * 上?プロセスチェインの??タは上流から下流へと渡されます?)から受け取っ? 042 * LineModel を?に、データベ?スへの書き込みを行います? 043 * 044 * ??タベ?ス接続?等?、ParamProcess のサブクラス(Process_DBParam)に 045 * 設定された接?Connection)を使用します? 046 * 047 * 引数??中にスペ?スを含??合?、ダブルコー??ション("") で括って下さ?? 048 * 引数??の ?』?前後には、スペ?スは挟めません。??key=value の様に 049 * 繋げてください? 050 * 051 * SQL?は、{@DATE.YMDH}等?シス?変数が使用できます? 052 * 053 * @og.formSample 054 * Process_DBWriter -dbid=DBGE -table=GE41 055 * 056 * [ -dbid=DB接続ID ] ??-dbid=DBGE (? Process_DBParam の -configFile で?す?DBConfig.xml ファイルで規? 057 * [ -table=登録??ブルID ] ???????する?合?不要?INSERT する場合???ブルID 058 * [ -sql=検索SQL? ] ??-sql="UPDATE GE41 SET NAME_JA = [NAME_JA],LABEL_NAME = [LABEL_NAME] 059 * WHERE SYSTEM_ID = [SYSTEM_ID] AND CLM = [CLM]" 060 * [ -sqlFile=登録SQLファイル ] ??-sqlFile=update.sql 061 * ?? -sql ?-sqlFile が指定されな??合??table で????ブルに全カラ?insert です? 062 * [ -sql_XXXX=固定? ] ??-sql_SYSTEM_ID=GE 063 * SQL?の{@XXXX}??を指定?固定?で置き換えます? 064 * WHERE SYSTEM_ID='{@SYSTEM_ID}' ?WHERE SYSTEM_ID='GE' 065 * [ -const_XXXX=固定? ] ??-const_FGJ=1 066 * LineModel のキー(const_ に続く??)の値に、固定?を設定します? 067 * キーが異なれ?、?のカラ?を指定できます? 068 * [ -omitClms=AAA,BBB,… ] ??-omitClms=UNIQ,FGJ,DYSET 069 * -table 属?でINSERT?自動作?する場合?取り除くカラ?? 070 * カンマ区?で??できます? 071 * [ -commitCnt=commit処?定] ???数毎にコミットを発行します?0 の場合?、終?でコミットしません? 072 * [ -display=false|true ] ??結果を標準?力に表示する(true)かしな?false)?初期値:false[表示しない]) 073 * 074 * @version 4.0 075 * @author Kazuhiko Hasegawa 076 * @since JDK5.0, 077 */ 078 public class Process_DBWriter extends AbstractProcess implements ChainProcess { 079 private static final String CNST_KEY = "const_" ; 080 private static final String SQL_KEY = "sql_" ; 081 082 private Connection connection = null; 083 private PreparedStatement pstmt = null; 084 private ParameterMetaData pMeta = null; // 5.1.1.0 (2009/11/11) setObject に、Type を渡す?(PostgreSQL対? 085 private boolean useParamMetaData = false; // 5.1.1.0 (2009/11/11) setObject に、Type を渡す?(PostgreSQL対? 086 087 private String dbid = null; 088 private String sql = null; 089 private String table = null; 090 private int[] clmNos = null; // ファイルのヘッ??のカラ?号 091 private int commitCnt = 0; // コミットするまとめ件数 092 private boolean display = false; // 表示しな? 093 094 private String[] cnstClm = null; // 固定?を設定するカラ? 095 private int[] cnstClmNos = null; // 固定?を設定するカラ?号 096 private String[] constVal = null; // カラ?号に対応した固定? 097 098 private boolean firstRow = true; // ??の?目 099 private int count = 0; 100 private String[] omitClms = null; // 4.0.0.0 (2007/09/21) table ?時に取り除くカラ? 101 102 private static final Map<String,String> mustProparty ; // ?プロパティ???チェ?用 Map 103 private static final Map<String,String> usableProparty ; // ?プロパティ?整合?チェ? Map 104 105 static { 106 mustProparty = new LinkedHashMap<String,String>(); 107 108 usableProparty = new LinkedHashMap<String,String>(); 109 usableProparty.put( "dbid", "Process_DBParam の -configFile で?す?DBConfig.xml ファイルで規? ); 110 usableProparty.put( "table", "INSERT する場合???ブルID SQL??する?合?不要?" ); 111 usableProparty.put( "sql", "更新SQL?sql or sqlFile ??)" + 112 CR + "? \"UPDATE GE41 " + 113 CR + "SET NAME_JA = [NAME_JA],LABEL_NAME = [LABEL_NAME] " + 114 CR + "WHERE SYSTEM_ID = [SYSTEM_ID] AND CLM = [CLM]\"" ); 115 usableProparty.put( "sqlFile", "登録SQLファイル(sql or sqlFile ??)? update.sql" ); 116 usableProparty.put( "sql_", "SQL?の{@XXXX}??を指定?固定?で置き換えます?" + 117 CR + "WHERE SYSTEM_ID='{@SYSTEM_ID}' ?WHERE SYSTEM_ID='GE'" ); 118 usableProparty.put( "const_", "LineModel のキー(const_ に続く??)の値に、固定?? + 119 CR + "設定します?キーが異なれ?、?のカラ?を指定できます?" + 120 CR + "? -sql_SYSTEM_ID=GE" ); 121 // 4.0.0.0 (2007/09/21) 属?を追? 122 usableProparty.put( "omitClms", "-table 属?でINSERT?自動作?する場合?取り除くカラ?? + 123 CR + "カンマ区?で??できます?" + 124 CR + "? -omitClms=UNIQ,FGJ,DYSET" ); 125 usableProparty.put( "commitCnt", "?数毎にコミットを発行します?" + 126 CR + "0 の場合?、終?でコミットしません(初期値:0)" ); 127 usableProparty.put( "display", "結果を標準?力に表示する(true)かしな?false)? + 128 CR + "(初期値:false:表示しな?" ); 129 } 130 131 /** 132 * ?ォルトコンストラクター? 133 * こ?クラスは、動??されます??ォルトコンストラクターで? 134 * super クラスに対して、?な初期化を行っておきます? 135 * 136 */ 137 public Process_DBWriter() { 138 super( "org.opengion.fukurou.process.Process_DBWriter",mustProparty,usableProparty ); 139 } 140 141 /** 142 * プロセスの初期化を行います?初めに??、呼び出されます? 143 * 初期処?ファイルオープン??オープン?に使用します? 144 * 145 * @og.rev 4.0.0.0 (2007/09/21) omitClms 属?を追? 146 * @og.rev 5.1.1.0 (2009/11/11) setObject に ParameterMetaData の getParameterType を渡す?(PostgreSQL対? 147 * @og.rev 5.3.8.0 (2011/08/01) useParamMetaData ?ConnectionFactory経由で取得?(PostgreSQL対? 148 * 149 * @param paramProcess ??タベ?スの接続???などを持って?オブジェク? 150 */ 151 public void init( final ParamProcess paramProcess ) { 152 Argument arg = getArgument(); 153 154 table = arg.getProparty("table"); 155 sql = arg.getFileProparty("sql","sqlFile",false); 156 commitCnt = arg.getProparty("commitCnt",commitCnt); 157 display = arg.getProparty("display",display); 158 159 dbid = arg.getProparty("dbid"); 160 connection = paramProcess.getConnection( dbid ); 161 // 5.1.1.0 (2009/11/11) setObject に ParameterMetaData の getParameterType を渡す?(PostgreSQL対? 162 // useParamMetaData = ApplicationInfo.useParameterMetaData( connection ); 163 useParamMetaData = ConnectionFactory.useParameterMetaData( dbid ); // 5.3.8.0 (2011/08/01) 164 165 // 取り除くカラ?リストを配?に変換します? 166 String tempClms = arg.getProparty("omitClms",null); 167 if( tempClms != null ) { 168 omitClms = StringUtil.csv2Array( tempClms ); 169 } 170 171 if( sql == null && table == null ) { 172 String errMsg = "sql を指定しな??合?、table を??してください?; 173 throw new RuntimeException( errMsg ); 174 } 175 176 // 3.8.0.1 (2005/06/17) {@DATE.XXXX} 変換処??追? 177 // {@DATE.YMDH} などの??を?yyyyMMddHHmmss 型?日付に置き換えます? 178 // SQL?? {@XXXX} ??の固定?への置き換? 179 HybsEntry[] entry =arg.getEntrys(SQL_KEY); // 配? 180 SystemParameter sysParam = new SystemParameter( sql ); 181 sql = sysParam.replace( entry ); 182 183 HybsEntry[] cnstKey = arg.getEntrys( CNST_KEY ); // 配? 184 int csize = cnstKey.length; 185 cnstClm = new String[csize]; 186 constVal = new String[csize]; 187 for( int i=0; i<csize; i++ ) { 188 cnstClm[i] = cnstKey[i].getKey(); 189 constVal[i] = cnstKey[i].getValue(); 190 } 191 } 192 193 /** 194 * プロセスの終?行います??に??、呼び出されます? 195 * 終???ファイルクローズ??クローズ?に使用します? 196 * 197 * @og.rev 4.0.0.0 (2007/11/27) commit,rollback,remove 処?追? 198 * @og.rev 5.1.1.0 (2009/11/11) pMeta のクリア 199 * 200 * @param isOK ト?タルで、OK?たかど?[true:成功/false:失敗] 201 */ 202 public void end( final boolean isOK ) { 203 boolean flag = Closer.stmtClose( pstmt ); 204 pstmt = null; 205 pMeta = null; // 5.1.1.0 (2009/11/11) 206 207 if( isOK ) { 208 Closer.commit( connection ); 209 } 210 else { 211 Closer.rollback( connection ); 212 } 213 ConnectionFactory.remove( connection,dbid ); 214 215 if( !flag ) { 216 String errMsg = "ス??トメントをクローズ出来ません?; 217 throw new RuntimeException( errMsg ); 218 } 219 } 220 221 /** 222 * 引数の LineModel を??るメソ?です? 223 * 変換処?? LineModel を返します? 224 * 後続??行わな?????タのフィルタリングを行う場?は? 225 * null ??タを返します?つまり?null ??タは、後続??行わな? 226 * フラグの代わりにも使用して?す? 227 * なお?変換処?? LineModel と、オリジナルの LineModel が? 228 * 同?、コピ?(クローン)か?、各処?ソ??決めて?す? 229 * ドキュメントに明記されて???合?、副作用が問題になる?合?? 230 * ???とに自?コピ?(クローン)して下さ?? 231 * 232 * @og.rev 5.1.1.0 (2009/11/11) setObject に ParameterMetaData の getParameterType を渡す?(PostgreSQL対? 233 * @og.rev 5.3.8.0 (2011/08/01) useParamMetaData setNull 対?PostgreSQL対? 234 * 235 * @param data オリジナルのLineModel 236 * 237 * @return 処?換後?LineModel 238 */ 239 public LineModel action( final LineModel data ) { 240 count++ ; 241 // if( display ) { println( data.dataLine() ); } 242 try { 243 if( firstRow ) { 244 pstmt = makePrepareStatement( table,data ); 245 // 5.1.1.0 (2009/11/11) setObject に ParameterMetaData の getParameterType を渡す?(PostgreSQL対? 246 if( useParamMetaData ) { 247 pMeta = pstmt.getParameterMetaData(); 248 } 249 250 int size = cnstClm.length; 251 cnstClmNos = new int[size]; 252 for( int i=0; i<size; i++ ) { 253 cnstClmNos[i] = data.getColumnNo( cnstClm[i] ); 254 } 255 256 firstRow = false; 257 } 258 259 // 固定?置き換え?? 260 for( int j=0; j<cnstClmNos.length; j++ ) { 261 data.setValue( cnstClmNos[j],constVal[j] ); 262 } 263 264 // 5.1.1.0 (2009/11/11) setObject に ParameterMetaData の getParameterType を渡す?(PostgreSQL対? 265 if( useParamMetaData ) { 266 for( int i=0; i<clmNos.length; i++ ) { 267 int type = pMeta.getParameterType( i+1 ); 268 // 5.3.8.0 (2011/08/01) setNull 対? 269 // pstmt.setObject( i+1,data.getValue(clmNos[i]),type ); 270 Object val = data.getValue(clmNos[i]); 271 if( val == null || ( val instanceof String && ((String)val).isEmpty() ) ) { 272 pstmt.setNull( i+1, type ); 273 } 274 else { 275 pstmt.setObject( i+1, val, type ); 276 } 277 } 278 } 279 else { 280 for( int i=0; i<clmNos.length; i++ ) { 281 pstmt.setObject( i+1,data.getValue(clmNos[i]) ); 282 } 283 } 284 285 pstmt.execute(); 286 if( commitCnt > 0 && ( count%commitCnt == 0 ) ) { 287 Closer.commit( connection ); 288 } 289 } 290 catch (SQLException ex) { 291 String errMsg = "sql=[" + sql + "]" + CR + 292 "errorCode=[" + ex.getErrorCode() + "] State=[" + 293 ex.getSQLState() + "]" + CR ; 294 throw new RuntimeException( errMsg,ex ); 295 } 296 if( display ) { println( data.dataLine() ); } // 5.1.2.0 (2010/01/01) display の条件変更 297 return data; 298 } 299 300 /** 301 * ?で使用する PreparedStatement を作?します? 302 * 引数?? SQL また?、LineModel から作?した SQL より構築します? 303 * 304 * @og.rev 4.0.0.0 (2007/09/21) omitClms 属?を追? 305 * 306 * @param table 処?象の??ブルID 307 * @param data 処?象のLineModel 308 * 309 * @return PreparedStatementオブジェク? 310 */ 311 private PreparedStatement makePrepareStatement( final String table,final LineModel data ) { 312 if( sql == null ) { 313 StringBuilder buf = new StringBuilder(); 314 String[] names = data.getNames(); 315 316 // カラ?取り除く?? 317 if( omitClms != null ) { 318 Set<String> set = new HashSet<String>(); 319 for( int i=0; i<names.length; i++ ) { 320 set.add( names[i] ); 321 } 322 for( int i=0; i<omitClms.length; i++ ) { 323 set.remove( omitClms[i] ); 324 } 325 names = set.toArray( new String[set.size()] ); 326 } 327 int size = names.length; 328 329 buf.append( "INSERT INTO " ).append( table ).append( " (" ); 330 buf.append( names[0] ); 331 for( int i=1; i<size; i++ ) { 332 buf.append( "," ).append( names[i] ); 333 } 334 buf.append( " ) VALUES ( ?" ); 335 for( int i=1; i<size; i++ ) { 336 buf.append( ",?" ); 337 } 338 buf.append( " )" ); 339 sql = buf.toString(); 340 341 // カラ?号を設定します? 342 clmNos = new int[size]; 343 for( int i=0; i<size; i++ ) { 344 clmNos[i] = data.getColumnNo( names[i] ); // 4.0.0.0 (2007/09/21) 345 } 346 } 347 else { 348 Formatter format = new Formatter( data ); 349 format.setFormat( sql ); 350 sql = format.getQueryFormatString(); 351 clmNos = format.getClmNos(); 352 } 353 354 final PreparedStatement ps ; 355 try { 356 ps = connection.prepareStatement( sql ); 357 } 358 catch (SQLException ex) { 359 String errMsg = "PreparedStatement を取得できませんでした? + CR 360 + "sql=[" + sql + "]" + CR 361 + "table=[" + table + "]" + CR 362 + "nameLine=[" + data.nameLine() + "]" ; 363 throw new RuntimeException( errMsg,ex ); 364 } 365 366 return ps; 367 } 368 369 /** 370 * プロセスの処?果のレポ?ト表現を返します? 371 * 処??ログラ?、?力件数、?力件数などの??です? 372 * こ???をそのまま、標準?力に出すことで、結果レポ?トと出来るよ? 373 * 形式で出してください? 374 * 375 * @return 処?果のレポ?? 376 */ 377 public String report() { 378 String report = "[" + getClass().getName() + "]" + CR 379 + TAB + "DBID : " + dbid + CR 380 + TAB + "Output Count : " + count ; 381 382 return report ; 383 } 384 385 /** 386 * こ?クラスの使用方法を返します? 387 * 388 * @return こ?クラスの使用方? 389 */ 390 public String usage() { 391 StringBuilder buf = new StringBuilder(); 392 393 buf.append( "Process_DBWriter は、上流から受け取ったデータをデータベ?スに書き込? ).append( CR ); 394 buf.append( "CainProcess インターフェースの実?ラスです?" ).append( CR ); 395 buf.append( CR ); 396 buf.append( "上?プロセスチェインの??タは上流から下流へと渡されます?)から" ).append( CR ); 397 buf.append( "受け取っ?LineModel を?に、データベ?スへの書き込みを行います?" ).append( CR ); 398 buf.append( CR ); 399 buf.append( "??タベ?ス接続?等?、ParamProcess のサブクラス(Process_DBParam)に" ).append( CR ); 400 buf.append( "設定された接?Connection)を使用します?" ).append( CR ); 401 buf.append( CR ); 402 buf.append( "引数??中に空白を含??合?、ダブルコー??ション(\"\") で括って下さ??" ).append( CR ); 403 buf.append( "引数??の ?』?前後には、空白は挟めません。??key=value の様に" ).append( CR ); 404 buf.append( "繋げてください? ).append( CR ); 405 buf.append( CR ); 406 buf.append( "SQL?は、{@DATE.YMDH}等?シス?変数が使用できます?" ).append( CR ); 407 buf.append( CR ).append( CR ); 408 buf.append( getArgument().usage() ).append( CR ); 409 410 return buf.toString(); 411 } 412 413 /** 414 * こ?クラスは、main メソ?から実行できません? 415 * 416 * @param args コマンド引数配? 417 */ 418 public static void main( final String[] args ) { 419 LogWriter.log( new Process_DBWriter().usage() ); 420 } 421 }