博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
在 MongoDB 中查找缺失的数据包
阅读量:7071 次
发布时间:2019-06-28

本文共 13158 字,大约阅读时间需要 43 分钟。

 

1 package syncPacker;  2   3 import java.io.BufferedReader;  4 import java.io.File;  5 import java.io.FileReader;  6 import java.io.FileWriter;  7 import java.io.IOException;  8 import java.net.UnknownHostException;  9 import java.util.ArrayList; 10 import java.util.Date; 11 import java.util.HashMap; 12 import java.util.List; 13 import java.util.Map; 14  15 import org.apache.http.message.BasicNameValuePair; 16 import org.apache.log4j.Logger; 17 import org.springframework.beans.factory.annotation.Autowired; 18 import org.springframework.beans.factory.annotation.Value; 19 import org.springframework.stereotype.Service; 20  21 import com.mongodb.BasicDBObject; 22 import com.mongodb.DB; 23 import com.mongodb.DBCollection; 24 import com.mongodb.DBObject; 25 import com.mongodb.Mongo; 26 import com.pro.framework.action.BaseController; 27  28 import logCenter.SendLog; 29 import net.sf.json.JSONObject; 30 import syncPacker.bean.PatentBibliographicChangeBean; 31 import syncPacker.bean.SyncDataPackageBean; 32 import utils.DateUtils; 33 import utils.DatetimeUtils; 34 import utils.HttpUtils; 35  36 /** 37  * 增量数据分块打包 全处理 38  *  39  * http://localhost:8080/PatentSearchExtend/syncPacker!pack.action 40  *  41  * http://10.78.2.21:8080/PatentSearchExtend/syncPacker!pack.action?bean. 42  * tableName=E_BIBLIOGRAPHIC_CHANGE_TEMP&bean.maxRowsPerSyncPackerPackage= 10000 43  */ 44 @Service 45 public class SyncPacker_201603 extends BaseController { 46  47     private static final long serialVersionUID = 1L; 48  49     // 初始化:数据库接口 50     @Autowired 51     private SyncPackerDao dao; 52  53     // 初始化:发送端地址 54     @Value("${url_syncSender}") 55     private String url_syncSender; 56  57     // 初始化:本地文件存储路径 58     @Value("${path_syncPacker_package}") 59     private String path_syncPacker_package; 60  61     // 初始化:读取最大数据包名称的地址 62     @Value("${url_selectMaxPackageNumber}") 63     private String url_selectMaxPackageNumber; 64  65     // 初始化:存储最大数据包名称的地址 66     @Value("${url_insertSyncDataPackage}") 67     private String url_insertSyncDataPackage; 68  69     // 初始化:查询条件Bean 70     private SyncDataPackageBean bean = new SyncDataPackageBean(); 71     // 初始化:查询结果List 72     private List
pbcList = new ArrayList
(); 73 // 初始化:形成的数据包名称 74 private List
pbcList_withPackageName = new ArrayList
(); 75 // 初始化:已打包的增量数据ID表单 76 private List
pdaIdList = new ArrayList
(); 77 // 初始化:用于删除数据的临时ID List 78 private List
idPartList = new ArrayList
(); 79 // 初始化:传输协议 80 private HttpUtils httpUtils = new HttpUtils(); 81 // 初始化:键值串 82 private List
paramList = new ArrayList
(); 83 // 初始化:历史最大包编号 84 private String maxPackageNumber; 85 // 初始化:记录打包完成后的数据版本 86 private Integer centerNodeDataVersion; 87 // 发送远程日志 88 private SendLog sendLog = new SendLog(); 89 // 记录本地日志 90 private Logger logger = Logger.getLogger(SyncPacker_201603.class); 91 // 初始化:判断程序是否正在运行 92 public static boolean isRunning = false; 93 94 // 本次处理完成后的最大包编号 95 private String packedPackageNumber; 96 // 用于返回json的成功信息 97 private String success = "success"; 98 99 100 /** 文件补发用:指定数据重新打包 */101 // http://localhost:8080/PatentSearchExtend/syncPacker!packByPackageNumber.action?bean.packageNumberStart=000101&bean.packageNumberEnd=000102102 public String packByPackageNumber() throws Exception {103 104 logMemory("本次请求处理开始。", bean.getPackageNumberStart() + " - " + bean.getPackageNumberEnd() );105 106 String job_start = DateUtils.getCurrentTimeString(0);107 108 try {109 110 111 for ( int i = Integer.valueOf(bean.getPackageNumberStart()); 112 i <= Integer.valueOf(bean.getPackageNumberEnd()); i++) { 113 114 // 开始时间115 String package_start = DateUtils.getCurrentTimeString(0);116 117 // 包编号118 String packageNumber = String.format( "%06d", i );119 120 logMemory("开始,包编号:", packageNumber );121 122 //(1)读历史表123 pbcList = selectList_changeHistory( packageNumber ); 124 125 if( null == pbcList ) {126 logMemory("
数据为空 !!!", "");127 128 } else{129 logMemory("数据查询完毕,数据量为", String.valueOf( pbcList.size() ));130 131 //(2)插入MongoDB132 insertMongoDB( pbcList );133 pbcList.clear();134 };135 136 logMemory("传输结束,包编号:" + packageNumber ," 用时: "137 + DatetimeUtils.getDistanceTimes_string(package_start, DateUtils.getCurrentTimeString(0)) );138 }139 } catch (Exception e) {140 // 日志输出141 logMemory("系统发生异常", e.getMessage());142 e.printStackTrace();143 }144 145 logMemory("本次请求处理完成,程序结束。", bean.getPackageNumberStart() + " - " + bean.getPackageNumberEnd()146 + " 用时: " + DatetimeUtils.getDistanceTimes_string( job_start, DateUtils.getCurrentTimeString(0)) );147 148 return SUCCESS;149 }150 151 152 /**153 * 读历史表154 * @param packageNumber155 * @return156 */157 private List
selectList_changeHistory( String packageNumber ){158 159 for ( int i = 0; i < 100; i++ ) {160 161 try {162 return dao.selectList_changeHistory( packageNumber ); 163 } catch (Exception e) {164 // TODO Auto-generated catch block165 // e.printStackTrace();166 logMemory("系统发生异常", e.getMessage());167 try {168 Thread.sleep(500);169 logMemory("暂停 0.5 秒", "" );170 } catch (InterruptedException e1) {171 // TODO Auto-generated catch block172 e1.printStackTrace();173 }174 } 175 }// loop end176 return null ;177 178 }179 180 private PatentBibliographicChangeBean select_changeHistory( String packageNumber ){181 182 for ( int i = 0; i < 100; i++ ) {183 184 try {185 return dao.select_changeHistory( packageNumber ); 186 } catch (Exception e) {187 // TODO Auto-generated catch block188 // e.printStackTrace();189 logMemory("系统发生异常", e.getMessage());190 try {191 Thread.sleep(500);192 logMemory("暂停 0.5 秒", "" );193 } catch (InterruptedException e1) {194 // TODO Auto-generated catch block195 e1.printStackTrace();196 }197 } 198 }// loop end199 return null ;200 201 }202 203 204 205 206 /**207 * 插入 MongoDB208 */209 public void insertMongoDB( List
pbcList ) { 210 211 //# MongoDB(数据加载目标)212 String syncLoadIntoMongoDbService = "10.78.2.23:27017";213 String syncLoadIntoMongoDbName = "patent_search_extend";214 String syncLoadIntoMongoTable = "patent_bibliographic_20160319";215 216 // 加载开始217 // logger.info(DateUtils.getNow() + " Load start: " + syncLoadIntoMongoDbService );218 219 Mongo m = null;220 try {221 m = new Mongo( syncLoadIntoMongoDbService );222 } catch (UnknownHostException e) {223 e.printStackTrace();224 logger.info(DateUtils.getNow() + " UnknownHostException:" + e.getMessage());225 }226 227 // 库名228 DB db = m.getDB( syncLoadIntoMongoDbName );229 // logger.info( DateUtils.getNow() + " Db:" + syncLoadIntoMongoDbName );230 231 // 表名232 DBCollection collection = db.getCollection( syncLoadIntoMongoTable );233 // logger.info(DateUtils.getNow() + " Table:" + syncLoadIntoMongoTable );234 235 // 循环列表,将每个元素插入数据库236 for( PatentBibliographicChangeBean pbcBean : pbcList ){237 238 239 240 //(1)读取一条著录数据241 // JSONObject json = packByPackageNumber_readBibl( pbcBean.getId() );242 243 // if( null == json ) return ;244 245 // 列,值246 BasicDBObject insDoc = new BasicDBObject();247 248 insDoc.put("abstract_No" , pbcBean.getAbstract_No() );249 insDoc.put("app_Addr" , pbcBean.getApp_Addr() );250 insDoc.put("app_Cn" , pbcBean.getApp_Cn() );251 insDoc.put("app_Country" , pbcBean.getApp_Country() );252 insDoc.put("app_Date" , pbcBean.getApp_Date() );253 insDoc.put("app_Name" , pbcBean.getApp_Name() );254 insDoc.put("app_Sn" , pbcBean.getApp_Sn() );255 insDoc.put("app_Type" , pbcBean.getApp_Type() );256 insDoc.put("app_Zip" , pbcBean.getApp_Zip() );257 insDoc.put("ecla" , pbcBean.getEcla() );258 insDoc.put("fi" , pbcBean.getFi() );259 insDoc.put("ft" , pbcBean.getFt() );260 insDoc.put("id" , pbcBean.getId() );261 insDoc.put("inv_Title" , pbcBean.getInv_Title() );262 insDoc.put("invent_Type" , pbcBean.getInvent_Type() );263 insDoc.put("inventor" , pbcBean.getInventor() );264 insDoc.put("ipc_Standard" , pbcBean.getIpc_Standard() );265 insDoc.put("locarno" , pbcBean.getLocarno() );266 insDoc.put("operation_Time" , pbcBean.getOperation_Time() );267 insDoc.put("operation_Type" , pbcBean.getOperation_Type() );268 insDoc.put("package_Number" , pbcBean.getPackage_Number() );269 insDoc.put("pct_App_Cn" , pbcBean.getPct_App_Cn() );270 insDoc.put("pct_App_Date" , pbcBean.getPct_App_Date() );271 insDoc.put("pct_App_Sn" , pbcBean.getPct_App_Sn() );272 insDoc.put("pct_Date" , pbcBean.getPct_Date() );273 insDoc.put("pct_Pub_Cn" , pbcBean.getPct_Pub_Cn() );274 insDoc.put("pct_Pub_Date" , pbcBean.getPct_Pub_Date() );275 insDoc.put("pct_Pub_Lang" , pbcBean.getPct_Pub_Lang() );276 insDoc.put("pct_Pub_Sn" , pbcBean.getPct_Pub_Sn() );277 insDoc.put("prn" , pbcBean.getPrn() );278 insDoc.put("prn_Cn" , pbcBean.getPrn_Cn() );279 insDoc.put("prn_Date" , pbcBean.getPrn_Date() );280 insDoc.put("prn_Sn" , pbcBean.getPrn_Sn() );281 insDoc.put("prn_Type" , pbcBean.getPrn_Type() );282 insDoc.put("pub_Cn" , pbcBean.getPub_Cn() );283 insDoc.put("pub_Date" , pbcBean.getPub_Date() );284 insDoc.put("pub_Sn" , pbcBean.getPub_Sn() );285 insDoc.put("pub_Type" , pbcBean.getPub_Type() );286 insDoc.put("uc" , pbcBean.getUc() );287 288 collection.insert(insDoc);289 insDoc.clear();290 291 }292 293 // 循环遍历pdaBeanList294 // System.out.println("loading ...");295 // logger.info(DateUtils.getNow() + " rows:" + pdaBeanList.size());296 297 // 当前记录编号298 // int currentRowNumber = 0;299 300 if ( m != null) m.close();301 302 // System.out.println("Load finished.");303 }304 305 306 307 /**308 * 在 MongoDB 中查找缺失的数据包309 * @return310 * @throws Exception311 */312 public String findLostPackages_from_mongoDB() throws Exception {313 314 315 //# MongoDB(数据加载目标)316 String syncLoadIntoMongoDbService = "10.78.2.23:27017";317 String syncLoadIntoMongoDbName = "patent_search_extend";318 String syncLoadIntoMongoTable = "patent_bibliographic_20160319";319 320 // 加载开始321 // logger.info(DateUtils.getNow() + " Load start: " + syncLoadIntoMongoDbService );322 323 Mongo m = null;324 try {325 m = new Mongo( syncLoadIntoMongoDbService );326 } catch (UnknownHostException e) {327 e.printStackTrace();328 logger.info(DateUtils.getNow() + " UnknownHostException:" + e.getMessage());329 }330 331 // 库名332 DB db = m.getDB( syncLoadIntoMongoDbName );333 // logger.info( DateUtils.getNow() + " Db:" + syncLoadIntoMongoDbName );334 335 // 表名336 DBCollection collection = db.getCollection( syncLoadIntoMongoTable );337 // logger.info(DateUtils.getNow() + " Table:" + syncLoadIntoMongoTable ); 338 339 PatentBibliographicChangeBean pbcBean = new PatentBibliographicChangeBean();340 341 342 for ( int i = Integer.valueOf(bean.getPackageNumberStart()); 343 i <= Integer.valueOf(bean.getPackageNumberEnd()); i++) { 344 345 //(1)从 Oracle 取一个著录ID346 pbcBean = select_changeHistory( String.format( "%06d", i ));347 348 if( null == pbcBean ) {349 logMemory("数据为空! ", "包编号:"+i+",著录ID:" + pbcBean.getId() ); continue ;350 }351 352 // logMemory("包编号:"+i+",著录ID:", pbcBean.getId() );353 354 //(2)查询 MongoDB 中是否存在 355 BasicDBObject docFind = new BasicDBObject( "id", pbcBean.getId() );356 DBObject findResult = collection.findOne( docFind );357 358 if( null == findResult || "".equals( findResult )){359 logMemory("缺失 !!! ", "包编号:"+i+",著录ID:" + pbcBean.getId() );360 }361 }362 363 // retrieve364 365 366 if ( m != null) m.close();367 368 369 return SUCCESS ;370 }371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 /** 记录日志 */387 private void logMemory(String behavior, String content) {388 // 向服务器发送日志389 // sendLog.send("syncPacker", behavior, content);390 // 记录本地日志391 logger.info(DateUtils.getNow() + " " + behavior + " :" + content);392 // 控制台输出日志393 // System.out.println("syncPacker : " + DateUtils.getNow() + " " + behavior + " :" + content);394 }395 396 @Override397 public String insert() throws Exception {398 return null;399 }400 401 @Override402 public String update() throws Exception {403 return null;404 }405 406 @Override407 public String selectList() throws Exception {408 return null;409 }410 411 @Override412 public String delete() throws Exception {413 return null;414 }415 416 public static boolean isRunning() {417 return isRunning;418 }419 420 public static void setRunning(boolean isRunning) {421 SyncPacker_201603.isRunning = isRunning;422 }423 424 public Integer getCenterNodeDataVersion() {425 return centerNodeDataVersion;426 }427 428 public void setCenterNodeDataVersion(Integer centerNodeDataVersion) {429 this.centerNodeDataVersion = centerNodeDataVersion;430 }431 432 public String getSuccess() {433 return success;434 }435 436 public void setSuccess(String success) {437 this.success = success;438 }439 440 public String getMaxPackageNumber() {441 return maxPackageNumber;442 }443 444 public void setMaxPackageNumber(String maxPackageNumber) {445 this.maxPackageNumber = maxPackageNumber;446 }447 448 public String getPackedPackageNumber() {449 return packedPackageNumber;450 }451 452 public void setPackedPackageNumber(String packedPackageNumber) {453 this.packedPackageNumber = packedPackageNumber;454 }455 456 public SyncDataPackageBean getBean() {457 return bean;458 }459 460 public void setBean(SyncDataPackageBean bean) {461 this.bean = bean;462 }463 }

 

转载于:https://www.cnblogs.com/livon/p/5302164.html

你可能感兴趣的文章
Selenium学习之==>Switch与SelectApi接口详解
查看>>
Js判断是否是直接进入本页面的
查看>>
GOlang eclipse install
查看>>
Centos7 hostname重启失效
查看>>
Postgre SQL连接服务器失败
查看>>
小程序之短信验证
查看>>
App Store优化推广 如何提高海外搜索排名
查看>>
POJ3580 SuperMemo(Splay的区间操作)
查看>>
Android Service解析
查看>>
CentOS7-部署kubernetes
查看>>
hdu 4001 To Miss Our Children Time( sort + DP )
查看>>
日常note
查看>>
Leetcode 727. Minimum Window Subsequence
查看>>
java切换jdk版本
查看>>
hdu 1005 Number Sequence zoj 1105
查看>>
VLAN
查看>>
Oracle12c 性能优化攻略:攻略1-2:创建具有最优性能的表空间
查看>>
yum install 报错[Errno 14] curl#37 - Couldn't open file /mnt/repodata/repomd.xml
查看>>
box-sizeing
查看>>
bzoj 3669 [Noi2014]魔法森林
查看>>