package com.doumee.service.business.impl; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.doumee.core.constants.ResponseStatus; import com.doumee.core.device.ElectronicToolUtil; import com.doumee.core.device.model.ElectronicConstant; import com.doumee.core.device.model.ElectronicNotifyStatus; import com.doumee.core.device.model.request.*; import com.doumee.core.device.model.response.ElectronicBaseResponse; import com.doumee.core.device.model.response.ElectronicDataResponse; import com.doumee.core.device.model.response.MeterDealResponse; import com.doumee.core.device.model.response.QueryDataInfoResponse; import com.doumee.core.device.model.response.QueryDataV2Response; import com.doumee.core.exception.BusinessException; import com.doumee.core.model.LoginUserInfo; import com.doumee.core.utils.Constants; import com.doumee.core.utils.DateUtil; import com.doumee.dao.business.*; import com.doumee.dao.business.dto.YwElectricalEditDTO; import com.doumee.dao.business.dto.YwElectricalOperateDTO; import com.doumee.dao.business.model.*; import com.doumee.service.business.YwElectricalBizService; import com.github.yulichang.wrapper.MPJLambdaWrapper; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.CollectionUtils; import java.lang.reflect.Field; import java.math.BigDecimal; import java.util.*; import java.util.stream.Collectors; @Service public class YwElectricalBizServiceImpl implements YwElectricalBizService { private static final Logger log = 
LoggerFactory.getLogger(YwElectricalBizServiceImpl.class);

    // Action type codes stored on YwElectricalActions.actionType.
    public static final int ACTION_RESET_PREPAY = 1;
    public static final int ACTION_RESET_POSTPAY = 2;
    public static final int ACTION_REMOTE_CLOSE = 3;
    public static final int ACTION_TRIP = 4;
    public static final int ACTION_CLOSE = 5;
    public static final int ACTION_OPEN = 6;
    public static final int ACTION_RECHARGE = 7;
    public static final int ACTION_READ = 8;

    /** Minimum age (ms) of an action before its status is polled for the first time. */
    private static final long FIRST_STATUS_QUERY_DELAY_MS = 30_000L;
    /** Minimum gap (ms) between two polls of an action that has already been touched. */
    private static final long STATUS_QUERY_MIN_INTERVAL_MS = 3_600_000L;
    /** Number of opr_ids sent to the platform per status query. */
    private static final int STATUS_QUERY_BATCH_SIZE = 50;
    /** Upper bound on pending actions loaded per scheduled run. */
    private static final int STATUS_QUERY_MAX_PENDING = 100;
    /** Operation type for ele_read meter-data reads (not the same thing as queryData functionids=253). */
    private static final int ELE_READ_TYPE_METER = 3;
    /** queryData function ID for meter status details. */
    private static final String QUERY_DATA_FUNCTION_METER_STATUS = "253";

    @Autowired
    private YwElectricalMapper ywElectricalMapper;
    @Autowired
    private YwElectricalRoomMapper ywElectricalRoomMapper;
    @Autowired
    private YwElectricalActionsMapper ywElectricalActionsMapper;
    @Autowired
    private YwElectricalLogMapper ywElectricalLogMapper;
    @Autowired
    private YwElectricalDataMapper ywElectricalDataMapper;
    @Autowired
    private YwElectricalChargeMapper ywElectricalChargeMapper;

    /**
     * Loads one meter as an edit DTO, including the ids of its non-deleted type-0 room bindings.
     *
     * @param id primary key of the meter
     * @return DTO copied from the meter row, with room ids set and room names filled by fillRoomNames
     * @throws BusinessException DATA_EMPTY when the row is missing or flagged as deleted
     */
    @Override
    public YwElectricalEditDTO getDetail(Integer id) {
        YwElectrical e = ywElectricalMapper.selectById(id);
        if (e == null || Objects.equals(e.getIsdeleted(), Constants.ONE)) {
            throw new BusinessException(ResponseStatus.DATA_EMPTY);
        }
        YwElectricalEditDTO dto = new YwElectricalEditDTO();
        BeanUtils.copyProperties(e, dto);
        // Explicit type arguments are required: the lambda wrapper cannot resolve
        // YwElectricalRoom::getXxx method references against a raw QueryWrapper.
        List<YwElectricalRoom> rooms = ywElectricalRoomMapper.selectList(new QueryWrapper<YwElectricalRoom>().lambda()
                .eq(YwElectricalRoom::getIsdeleted, Constants.ZERO)
                .eq(YwElectricalRoom::getType, Constants.ZERO)
                .eq(YwElectricalRoom::getObjId, id));
        dto.setRoomIds(rooms.stream()
                .map(YwElectricalRoom::getRoomId)
                .filter(Objects::nonNull)
                .collect(Collectors.toList()));
        fillRoomNames(Collections.singletonList(dto));
        return dto;
    }

    @Override
@Transactional(rollbackFor = Exception.class)
    // Updates the editable meter fields (rate, parameter id, editor, edit date) and then
    // replaces the meter's room bindings. A missing rate is stored as BigDecimal.ONE.
    // Throws BusinessException(BAD_REQUEST) when dto or dto.getId() is null.
    public void updateDetail(YwElectricalEditDTO dto, LoginUserInfo user) {
        if (dto == null || dto.getId() == null) {
            throw new BusinessException(ResponseStatus.BAD_REQUEST);
        }
        ywElectricalMapper.update(null, new UpdateWrapper<YwElectrical>().lambda()
                .eq(YwElectrical::getId, dto.getId())
                .set(YwElectrical::getRate, dto.getRate() != null ? dto.getRate() : BigDecimal.ONE)
                .set(YwElectrical::getElectricalParamId, dto.getElectricalParamId())
                .set(YwElectrical::getEdirot, user.getId())
                .set(YwElectrical::getEditDate, new Date()));
        saveRooms(dto.getId(), dto.getRoomIds(), user);
    }

    /**
     * Replaces the type-0 room bindings of a meter: flags all current bindings as deleted,
     * then inserts one row per non-null room id with a 1-based sort number in list order.
     *
     * @param electricalId meter id the bindings belong to
     * @param roomIds      new room ids; null or empty leaves the meter with no active bindings
     * @param user         operator recorded as creator/editor
     */
    private void saveRooms(Integer electricalId, List<Integer> roomIds, LoginUserInfo user) {
        ywElectricalRoomMapper.update(null, new UpdateWrapper<YwElectricalRoom>().lambda()
                .set(YwElectricalRoom::getIsdeleted, Constants.ONE)
                .set(YwElectricalRoom::getEditDate, new Date())
                .set(YwElectricalRoom::getEditor, user.getId())
                .eq(YwElectricalRoom::getObjId, electricalId)
                .eq(YwElectricalRoom::getType, Constants.ZERO));
        if (CollectionUtils.isEmpty(roomIds)) {
            return;
        }
        int sort = 0;
        for (Integer roomId : roomIds) {
            if (roomId == null) {
                continue;
            }
            YwElectricalRoom r = new YwElectricalRoom();
            r.setCreator(user.getId());
            r.setCreateDate(new Date());
            r.setEditor(user.getId());
            r.setEditDate(new Date());
            r.setIsdeleted(Constants.ZERO);
            r.setType(Constants.ZERO);
            r.setObjId(electricalId);
            r.setRoomId(roomId);
            r.setSortnum(++sort);
            ywElectricalRoomMapper.insert(r);
        }
    }

    /**
     * Collects the data shown on the remote-control page of a meter.
     *
     * @param electricalId meter id
     * @return ordered map with keys "electrical", "latestData" and "purchaseCount"
     *         ("0" when no reading or no purchase count is available)
     */
    @Override
    public Map<String, Object> getRemoteInfo(Integer electricalId) {
        YwElectrical e = requireElectrical(electricalId);
        fillRoomNames(Collections.singletonList(e));
        Map<String, Object> map = new LinkedHashMap<>();
        map.put("electrical", e);
        YwElectricalData latest = findLatestData(e.getId(), e.getAddress());
        map.put("latestData", latest);
        map.put("purchaseCount", latest != null && latest.getCountnum() != null ?
latest.getCountnum() : "0"); return map; } @Override @Transactional(rollbackFor = Exception.class) public String operate(YwElectricalOperateDTO dto, LoginUserInfo user) { YwElectrical e = requireElectrical(dto.getElectricalId()); String action = dto.getAction(); if (StringUtils.isBlank(action)) { throw new BusinessException(ResponseStatus.BAD_REQUEST); } switch (action) { case "resetPrepay": return doSecurityReset(e, "default", ACTION_RESET_PREPAY, user); case "resetPostpay": return doSecurityReset(e, "noprepay", ACTION_RESET_POSTPAY, user); case "trip": return doEleControl(e, 10, ACTION_TRIP, user); case "close": return doEleControl(e, 11, ACTION_CLOSE, user); case "openAccount": return doOpenAccount(e, dto, user); case "recharge": return doRecharge(e, dto, user); case "readMeter": return doReadMeter(e, user); default: throw new BusinessException(ResponseStatus.BAD_REQUEST); } } private String doSecurityReset(YwElectrical e, String paymentMode, int actionType, LoginUserInfo user) { assertNoPendingAsyncAction(e.getId(), actionType); String oprId = newOprId(); SecurityResetRequest req = new SecurityResetRequest(); req.setOpr_id(oprId); req.setCid(e.getCollectorId()); req.setAddress(e.getAddress()); req.setTime_out(86400); req.setMust_online("false"); req.setRetry_times(1); SecurityResetParamRequest p = new SecurityResetParamRequest(); p.setPaymentmode(paymentMode); p.setAccount_id(StringUtils.defaultIfBlank(e.getAccountId(), "1")); req.setParams(p); String reqJson = JSON.toJSONString(Collections.singletonList(req)); ElectronicBaseResponse resp = ElectronicToolUtil.eleSecurityReset(Collections.singletonList(req)); return finishAsync(e, actionType, oprId, "/Api_v2/ele_security/reset", reqJson, resp, user); } private String doEleControl(YwElectrical e, int type, int actionType, LoginUserInfo user) { assertNoPendingAsyncAction(e.getId(), actionType); String oprId = newOprId(); EleControlApiRequest req = new EleControlApiRequest(); req.setOpr_id(oprId); 
req.setCid(e.getCollectorId()); req.setAddress(e.getAddress()); req.setTime_out(86400); req.setMust_online("false"); req.setRetry_times(1); req.setType(type); String reqJson = JSON.toJSONString(Collections.singletonList(req)); List list = new ArrayList<>(); list.add(req); ElectronicBaseResponse resp = ElectronicToolUtil.eleControl(list); return finishAsync(e, actionType, oprId, "/Api_v2/ele_security/ele_control", reqJson, resp, user); } private String doOpenAccount(YwElectrical e, YwElectricalOperateDTO dto, LoginUserInfo user) { assertNoPendingAsyncAction(e.getId(), ACTION_OPEN); String oprId = newOprId(); OpenAccountRequest req = buildOpenAccountRequest(e, oprId, dto.getMoney(), dto.getRemark()); String reqJson = JSON.toJSONString(Collections.singletonList(req)); ElectronicBaseResponse resp = ElectronicToolUtil.openAcount(Collections.singletonList(req)); return finishAsync(e, ACTION_OPEN, oprId, "/Api_v2/ele_security/open_acount", reqJson, resp, user); } private String doRecharge(YwElectrical e, YwElectricalOperateDTO dto, LoginUserInfo user) { if (!Objects.equals(e.getAccountStatus(), Constants.ONE)) { throw new BusinessException(ResponseStatus.BAD_REQUEST.getCode(), "电表未开户,请先开户"); } assertNoPendingAsyncAction(e.getId(), ACTION_RECHARGE); String oprId = newOprId(); OpenAccountRequest req = buildOpenAccountRequest(e, oprId, dto.getMoney(), dto.getRemark()); String reqJson = JSON.toJSONString(Collections.singletonList(req)); ElectronicBaseResponse resp = ElectronicToolUtil.recharger(Collections.singletonList(req)); String msg = finishAsync(e, ACTION_RECHARGE, oprId, "/Api_v2/ele_security/recharge", reqJson, resp, user); if (isSuccess(resp)) { saveChargeRecord(e, oprId, dto, user); } return msg; } private String doReadMeter(YwElectrical e, LoginUserInfo user) { assertNoPendingAsyncAction(e.getId(), ACTION_READ); String oprId = newOprId(); EleReadRequest req = new EleReadRequest(); req.setOpr_id(oprId); req.setCid(e.getCollectorId()); req.setAddress(e.getAddress()); 
req.setTime_out(86400); req.setMust_online("false"); req.setType(ELE_READ_TYPE_METER); req.setRetry_times(1); String reqJson = JSON.toJSONString(Collections.singletonList(req)); ElectronicBaseResponse resp = ElectronicToolUtil.eleRead(Collections.singletonList(req)); String msg = finishAsync(e, ACTION_READ, oprId, "/Api_v2/ele_read", reqJson, resp, user); if (!isSuccess(resp)) { throw new BusinessException(ResponseStatus.SERVER_ERROR.getCode(), msg); } boolean synced = syncMeterDataForElectrical(e); refreshBalanceFromData(e, null, true); return synced ? "抄表成功,用量余额已更新" : "抄表请求已提交,暂无新抄表数据,请稍后刷新查看"; } private OpenAccountRequest buildOpenAccountRequest(YwElectrical e, String oprId, BigDecimal money, String remark) { OpenAccountRequest req = new OpenAccountRequest(); req.setOpr_id(oprId); req.setCid(e.getCollectorId()); req.setAddress(e.getAddress()); req.setTime_out(86400); req.setMust_online("false"); req.setRetry_times(1); OpenAccountParamRequest p = new OpenAccountParamRequest(); p.setMoney(money != null ? money.toPlainString() : "0"); p.setAccount_id(StringUtils.defaultIfBlank(e.getAccountId(), "1")); p.setCount("1"); p.setRate(e.getRate() != null ? e.getRate().toPlainString() : "1"); req.setParams(p); return req; } private String finishAsync(YwElectrical e, int actionType, String oprId, String apiName, String reqJson, ElectronicBaseResponse resp, LoginUserInfo user) { saveLog(e, apiName, reqJson, resp, user); YwElectricalActions act = new YwElectricalActions(); act.setCreator(user.getId()); act.setCreateDate(new Date()); act.setEditor(user.getId()); act.setEditDate(new Date()); act.setIsdeleted(Constants.ZERO); act.setElectricalId(e.getId()); act.setActionType(actionType); act.setOprId(oprId); act.setRequestBody(reqJson); act.setResponseBody(resp != null ? JSON.toJSONString(resp) : null); if (isSuccess(resp)) { act.setStatus(Constants.ZERO); act.setResultMsg("已提交,等待平台执行"); } else { act.setStatus(Constants.TWO); act.setResultMsg(resp != null ? 
resp.getError_msg() : "调用失败");
        }
        ywElectricalActionsMapper.insert(act);
        return isSuccess(resp) ? "提交成功,请稍后在操作记录或充值记录中查看结果" : act.getResultMsg();
    }

    /**
     * Rejects a new async task while the same meter still has an in-progress (status ZERO),
     * non-deleted action of the same type. A null meter id skips the check.
     *
     * @throws BusinessException BAD_REQUEST when such an action exists
     */
    private void assertNoPendingAsyncAction(Integer electricalId, int actionType) {
        if (electricalId == null) {
            return;
        }
        // Explicit type argument: method references cannot be resolved against a raw wrapper.
        Long pending = ywElectricalActionsMapper.selectCount(new QueryWrapper<YwElectricalActions>().lambda()
                .eq(YwElectricalActions::getElectricalId, electricalId)
                .eq(YwElectricalActions::getActionType, actionType)
                .eq(YwElectricalActions::getStatus, Constants.ZERO)
                .eq(YwElectricalActions::getIsdeleted, Constants.ZERO));
        if (pending != null && pending > 0) {
            throw new BusinessException(ResponseStatus.BAD_REQUEST.getCode(), "需要等待上一次操作结果执行结束");
        }
    }

    /**
     * Stores one outbound platform call in the log table (device type ZERO, type ZERO).
     * The success column is ZERO for an accepted call and ONE otherwise.
     */
    private void saveLog(YwElectrical e, String apiName, String reqJson, ElectronicBaseResponse resp, LoginUserInfo user) {
        YwElectricalLog logRow = new YwElectricalLog();
        logRow.setCreator(user.getId());
        logRow.setCreateDate(new Date());
        logRow.setEditor(user.getId());
        logRow.setEditDate(new Date());
        logRow.setIsdeleted(Constants.ZERO);
        logRow.setDeviceType(Constants.ZERO);
        logRow.setType(Constants.ZERO);
        logRow.setName(apiName);
        logRow.setUrl(apiName);
        logRow.setRequest(reqJson);
        logRow.setRepose(resp != null ? JSON.toJSONString(resp) : null);
        logRow.setSuccess(isSuccess(resp) ?
Constants.ZERO : Constants.ONE); logRow.setObjId(String.valueOf(e.getId())); ywElectricalLogMapper.insert(logRow); } private boolean isSuccess(ElectronicBaseResponse resp) { return resp != null && "SUCCESS".equalsIgnoreCase(resp.getStatus()); } @Override @Transactional(rollbackFor = Exception.class) public boolean handleElectricalNotify(String responseContent, String timestamp, String sign) { if (!ElectronicToolUtil.verifyNotifySign(responseContent, timestamp, sign)) { log.warn("electricalNotify sign check failed, timestamp={}", timestamp); return false; } JSONArray items; try { items = JSON.parseArray(responseContent); } catch (Exception e) { log.warn("electricalNotify invalid response_content", e); return false; } if (items == null || items.isEmpty()) { return true; } for (int i = 0; i < items.size(); i++) { applyNotifyItem(items.getJSONObject(i)); } return true; } private void applyNotifyItem(JSONObject item) { if (item == null) { return; } String oprId = item.getString("opr_id"); String status = item.getString("status"); String errMsg = item.get("error_msg") != null ? String.valueOf(item.get("error_msg")) : null; applyActionStatusItem(oprId, status, errMsg, item.toJSONString(), true); } private void applyActionStatusItem(String oprId, String platformStatus, String errMsg, String responseBody, boolean fromNotify) { if (StringUtils.isBlank(oprId)) { return; } YwElectricalActions act = findActionByOprId(oprId); if (act == null) { log.info("electrical action not found for opr_id={}", oprId); return; } ElectronicNotifyStatus ns = ElectronicNotifyStatus.fromCode(platformStatus); String resultMsg = ns.getLabel() + (StringUtils.isNotBlank(errMsg) ? 
":" + errMsg : ""); YwElectricalActions upd = new YwElectricalActions(); upd.setId(act.getId()); upd.setEditDate(new Date()); upd.setResponseBody(responseBody); upd.setResultMsg(resultMsg); if (ns.isTerminalSuccess()) { upd.setStatus(Constants.ONE); } else if (ns.isTerminalFail()) { upd.setStatus(Constants.TWO); } else if (ns.isInProgress()) { upd.setStatus(Constants.ZERO); } ywElectricalActionsMapper.updateById(upd); YwElectrical e = ywElectricalMapper.selectById(act.getElectricalId()); if (e == null) { return; } if (fromNotify) { saveNotifyLog(e, JSON.parseObject(responseBody), ns); } if (ns.isTerminalSuccess()) { applyNotifySideEffect(e, act.getActionType()); if (Objects.equals(act.getActionType(), ACTION_RECHARGE)) { updateChargeByNotify(oprId, Constants.ONE, resultMsg); } } else if (ns.isTerminalFail() && Objects.equals(act.getActionType(), ACTION_RECHARGE)) { updateChargeByNotify(oprId, Constants.TWO, resultMsg); } } private YwElectricalActions findActionByOprId(String oprId) { return ywElectricalActionsMapper.selectOne(new QueryWrapper().lambda() .eq(YwElectricalActions::getOprId, oprId) .eq(YwElectricalActions::getIsdeleted, Constants.ZERO) .orderByDesc(YwElectricalActions::getCreateDate) .last("limit 1")); } @Override @Transactional(rollbackFor = Exception.class) public String syncAsyncActionStatus(String oprId) { if (StringUtils.isBlank(oprId)) { throw new BusinessException(ResponseStatus.BAD_REQUEST.getCode(), "缺少任务 ID"); } String trimmed = oprId.trim(); if (findActionByOprId(trimmed) == null) { throw new BusinessException(ResponseStatus.DATA_EMPTY.getCode(), "未找到对应操作记录"); } ElectronicBaseResponse resp = ElectronicToolUtil.requestStatus(trimmed); return handleStatusQueryResponse(resp, trimmed); } @Override @Transactional(rollbackFor = Exception.class) public String syncChargeStatusById(Integer chargeId) { if (chargeId == null) { throw new BusinessException(ResponseStatus.BAD_REQUEST); } YwElectricalCharge charge = 
ywElectricalChargeMapper.selectById(chargeId); if (charge == null || !Objects.equals(charge.getIsdeleted(), Constants.ZERO)) { throw new BusinessException(ResponseStatus.DATA_EMPTY.getCode(), "充值记录不存在"); } if (!Objects.equals(charge.getStatus(), Constants.ZERO)) { throw new BusinessException(ResponseStatus.BAD_REQUEST.getCode(), "仅充值中记录可同步"); } if (!Objects.equals(charge.getType(), Constants.ZERO)) { throw new BusinessException(ResponseStatus.BAD_REQUEST.getCode(), "仅电表充值记录可同步"); } return syncAsyncActionStatus(charge.getOprId()); } @Override public void syncPendingAsyncActionsScheduled() { try { Date minCreateTime = new Date(System.currentTimeMillis() - FIRST_STATUS_QUERY_DELAY_MS); List pending = ywElectricalActionsMapper.selectList( new QueryWrapper().lambda() .eq(YwElectricalActions::getStatus, Constants.ZERO) .eq(YwElectricalActions::getIsdeleted, Constants.ZERO) .isNotNull(YwElectricalActions::getOprId) .le(YwElectricalActions::getCreateDate, minCreateTime) .orderByAsc(YwElectricalActions::getCreateDate) .last("limit " + STATUS_QUERY_MAX_PENDING)); if (CollectionUtils.isEmpty(pending)) { return; } long nowMs = System.currentTimeMillis(); List oprIds = pending.stream() .filter(act -> shouldPollAction(act, nowMs)) .map(YwElectricalActions::getOprId) .filter(StringUtils::isNotBlank) .distinct() .collect(Collectors.toList()); if (oprIds.isEmpty()) { return; } int updated = 0; for (int i = 0; i < oprIds.size(); i += STATUS_QUERY_BATCH_SIZE) { List batch = oprIds.subList(i, Math.min(i + STATUS_QUERY_BATCH_SIZE, oprIds.size())); ElectronicBaseResponse resp = ElectronicToolUtil.requestStatusByOprIds(batch); updated += applyStatusQueryBatch(resp); } log.info("syncPendingAsyncActionsScheduled queried={}, updated={}", oprIds.size(), updated); } catch (Exception e) { log.warn("syncPendingAsyncActionsScheduled failed", e); } } private int applyStatusQueryBatch(ElectronicBaseResponse resp) { if (resp == null || !"SUCCESS".equalsIgnoreCase(resp.getStatus())) { return 0; } 
List items = extractMeterDealList(resp); if (CollectionUtils.isEmpty(items)) { return 0; } int updated = 0; for (MeterDealResponse item : items) { if (item == null || StringUtils.isBlank(item.getOpr_id())) { continue; } YwElectricalActions before = findActionByOprId(item.getOpr_id()); if (before == null || !Objects.equals(before.getStatus(), Constants.ZERO)) { continue; } applyActionStatusItem(item.getOpr_id(), item.getStatus(), item.getError_msg(), JSON.toJSONString(item), false); YwElectricalActions after = findActionByOprId(item.getOpr_id()); if (after != null && !Objects.equals(before.getStatus(), after.getStatus())) { updated++; } } return updated; } private String handleStatusQueryResponse(ElectronicBaseResponse resp, String oprId) { if (resp == null) { return "平台无响应,请稍后重试"; } if (!"SUCCESS".equalsIgnoreCase(resp.getStatus())) { return "查询失败:" + StringUtils.defaultIfBlank(resp.getError_msg(), resp.getStatus()); } List items = extractMeterDealList(resp); MeterDealResponse item = items.stream() .filter(i -> i != null && oprId.equals(i.getOpr_id())) .findFirst() .orElse(CollectionUtils.isEmpty(items) ? null : items.get(0)); if (item == null) { return "未查询到任务结果,请稍后重试"; } applyActionStatusItem(oprId, item.getStatus(), item.getError_msg(), JSON.toJSONString(item), false); YwElectricalActions act = findActionByOprId(oprId); if (act == null) { return "未找到对应操作记录"; } if (Objects.equals(act.getStatus(), Constants.ONE)) { return Objects.equals(act.getActionType(), ACTION_RECHARGE) ? "已同步为充值成功" : "操作已成功"; } if (Objects.equals(act.getStatus(), Constants.TWO)) { return Objects.equals(act.getActionType(), ACTION_RECHARGE) ? 
"已同步为充值失败:" + StringUtils.defaultString(act.getResultMsg()) : "操作已失败:" + StringUtils.defaultString(act.getResultMsg()); } if (ElectronicToolUtil.isAsyncStatusFinal(item.getStatus())) { return "平台状态:" + item.getStatus(); } return "任务处理中,请稍后重试"; } @SuppressWarnings("unchecked") private List extractMeterDealList(ElectronicBaseResponse resp) { Object content = resp.getResponse_content(); if (content == null) { return Collections.emptyList(); } if (content instanceof List) { List list = (List) content; List result = new ArrayList<>(); for (Object o : list) { if (o instanceof MeterDealResponse) { result.add((MeterDealResponse) o); } else { result.add(JSON.parseObject(JSON.toJSONString(o), MeterDealResponse.class)); } } return result; } return JSON.parseArray(JSON.toJSONString(content), MeterDealResponse.class); } private boolean shouldPollAction(YwElectricalActions act, long nowMs) { Date create = act.getCreateDate(); if (create == null || StringUtils.isBlank(act.getOprId())) { return false; } long createMs = create.getTime(); if (nowMs - createMs < FIRST_STATUS_QUERY_DELAY_MS) { return false; } long editMs = act.getEditDate() != null ? act.getEditDate().getTime() : createMs; if (editMs - createMs < 5_000L) { return true; } return nowMs - editMs >= STATUS_QUERY_MIN_INTERVAL_MS; } private void saveChargeRecord(YwElectrical e, String oprId, YwElectricalOperateDTO dto, LoginUserInfo user) { fillRoomNames(Collections.singletonList(e)); YwElectricalCharge charge = new YwElectricalCharge(); charge.setCreator(user.getId()); charge.setCreateDate(new Date()); charge.setEditor(user.getId()); charge.setEditDate(new Date()); charge.setIsdeleted(Constants.ZERO); charge.setType(Constants.ZERO); charge.setObjId(e.getId()); charge.setAddress(e.getAddress()); charge.setName(e.getName()); charge.setCId(e.getCollectorId()); charge.setMoney(dto.getMoney() != null ? 
dto.getMoney() : BigDecimal.ZERO); charge.setRemark(dto.getRemark()); charge.setOprId(oprId); charge.setStatus(Constants.ZERO); charge.setStatusTime(new Date()); charge.setStatusInfo("充值中"); charge.setBanlance(e.getBalance()); charge.setRoomNames(e.getRoomNames()); if (dto.getCustomerId() != null) { charge.setCustomerId(dto.getCustomerId()); } if (StringUtils.isNotBlank(e.getAddress()) || StringUtils.isNotBlank(e.getName())) { charge.setDeviceInfo(StringUtils.defaultString(e.getAddress()) + " " + StringUtils.defaultString(e.getName())); } if (StringUtils.isNotBlank(e.getParamId())) { try { charge.setParamId(Integer.parseInt(e.getParamId())); } catch (NumberFormatException ignored) { } } ywElectricalChargeMapper.insert(charge); } private void updateChargeByNotify(String oprId, int status, String statusInfo) { YwElectricalCharge charge = ywElectricalChargeMapper.selectOne(new QueryWrapper().lambda() .eq(YwElectricalCharge::getOprId, oprId) .eq(YwElectricalCharge::getIsdeleted, Constants.ZERO) .last("limit 1")); BigDecimal balanceAfter = null; if (charge != null && status == Constants.ONE) { balanceAfter = calcBalanceAfterRecharge(charge); if (charge.getObjId() != null) { updateElectricalBalance(charge.getObjId(), balanceAfter); } } UpdateWrapper uw = new UpdateWrapper<>(); uw.lambda() .set(YwElectricalCharge::getStatus, status) .set(YwElectricalCharge::getStatusTime, new Date()) .set(YwElectricalCharge::getStatusInfo, statusInfo) .set(YwElectricalCharge::getEditDate, new Date()) .eq(YwElectricalCharge::getOprId, oprId) .eq(YwElectricalCharge::getIsdeleted, Constants.ZERO); if (balanceAfter != null) { uw.lambda().set(YwElectricalCharge::getBalanceAfter, balanceAfter); } ywElectricalChargeMapper.update(null, uw); } private void saveNotifyLog(YwElectrical e, JSONObject item, ElectronicNotifyStatus ns) { YwElectricalLog logRow = new YwElectricalLog(); logRow.setCreateDate(new Date()); logRow.setEditDate(new Date()); logRow.setIsdeleted(Constants.ZERO); 
logRow.setDeviceType(Constants.ZERO); logRow.setType(Constants.ONE); logRow.setName("electricalNotify"); logRow.setUrl("/electronic/electricalNotify"); logRow.setRequest(null); logRow.setRepose(item.toJSONString()); logRow.setSuccess(ns.isTerminalFail() ? Constants.ONE : Constants.ZERO); logRow.setObjId(String.valueOf(e.getId())); ywElectricalLogMapper.insert(logRow); } private void applyNotifySideEffect(YwElectrical e, Integer actionType) { if (actionType == null) return; UpdateWrapper uw = new UpdateWrapper<>(); uw.lambda().eq(YwElectrical::getId, e.getId()).set(YwElectrical::getEditDate, new Date()); switch (actionType) { case ACTION_TRIP: uw.lambda().set(YwElectrical::getRelayStatus, "0"); ywElectricalMapper.update(null, uw); break; case ACTION_CLOSE: uw.lambda().set(YwElectrical::getRelayStatus, "1"); ywElectricalMapper.update(null, uw); break; case ACTION_OPEN: uw.lambda().set(YwElectrical::getAccountStatus, Constants.ONE) .set(YwElectrical::getLastOpenDate, new Date()); ywElectricalMapper.update(null, uw); break; case ACTION_RECHARGE: // 充值成功后余额由 updateChargeByNotify 按「充值前+充值金额」累计,待定时抄表后再切回抄表余额 break; case ACTION_READ: syncMeterDataForElectrical(e); refreshBalanceFromData(e, null, true); break; default: break; } } /** 充值后累计余额 = 充值前余额 + 充值金额 */ private BigDecimal calcBalanceAfterRecharge(YwElectricalCharge charge) { BigDecimal before = charge.getBanlance() != null ? charge.getBanlance() : BigDecimal.ZERO; BigDecimal money = charge.getMoney() != null ? 
charge.getMoney() : BigDecimal.ZERO;
        return before.add(money);
    }

    /** Writes a new balance, balance time and edit date to the meter; does nothing when either argument is null. */
    private void updateElectricalBalance(Integer electricalId, BigDecimal balance) {
        if (electricalId == null || balance == null) {
            return;
        }
        UpdateWrapper<YwElectrical> uw = new UpdateWrapper<>();
        uw.lambda()
                .eq(YwElectrical::getId, electricalId)
                .set(YwElectrical::getBalance, balance)
                .set(YwElectrical::getBalanceTime, new Date())
                .set(YwElectrical::getEditDate, new Date());
        ywElectricalMapper.update(null, uw);
    }

    /**
     * Returns the most recent successful (status ONE) type-0 charge of the meter, ordered by
     * status time and then id, or null when there is none or the id is null.
     */
    private YwElectricalCharge findLastSuccessfulRecharge(Integer electricalId) {
        if (electricalId == null) {
            return null;
        }
        return ywElectricalChargeMapper.selectOne(new QueryWrapper<YwElectricalCharge>().lambda()
                .eq(YwElectricalCharge::getObjId, electricalId)
                .eq(YwElectricalCharge::getType, Constants.ZERO)
                .eq(YwElectricalCharge::getStatus, Constants.ONE)
                .eq(YwElectricalCharge::getIsdeleted, Constants.ZERO)
                .orderByDesc(YwElectricalCharge::getStatusTime)
                .orderByDesc(YwElectricalCharge::getId)
                .last("limit 1"));
    }

    /**
     * Resolves the time of a reading: the parsed add time ("yyyy-MM-dd HH:mm:ss") when present
     * and valid, otherwise the row's create date. Returns null for a null row.
     */
    private Date parseDataTime(YwElectricalData data) {
        if (data == null) {
            return null;
        }
        if (StringUtils.isNotBlank(data.getAddTime())) {
            try {
                // A new formatter per call: SimpleDateFormat is not thread-safe.
                return new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(data.getAddTime().trim());
            } catch (java.text.ParseException ignored) {
                // Unparseable add time: fall back to the create date below.
            }
        }
        return data.getCreateDate();
    }

    /**
     * After the latest successful recharge, a reading taken before that recharge's status time
     * must not overwrite the accumulated balance.
     *
     * @return true when the reading may be used as the balance (no recharge on record, no
     *         reading time available, or the reading is not older than the recharge)
     */
    private boolean shouldUseMeterBalance(Integer electricalId, YwElectricalData data) {
        YwElectricalCharge lastRecharge = findLastSuccessfulRecharge(electricalId);
        if (lastRecharge == null || lastRecharge.getStatusTime() == null) {
            return true;
        }
        Date dataTime = parseDataTime(data);
        if (dataTime == null) {
            return true;
        }
        return !dataTime.before(lastRecharge.getStatusTime());
    }

    /** Refreshes the meter from its latest stored reading without forcing the meter balance. */
    private void refreshBalanceFromData(YwElectrical e) {
        refreshBalanceFromData(e, null, false);
    }

    /**
     * Copies values from a reading onto the meter row.
     *
     * @param e              meter to update; null is ignored
     * @param data           reading to use; when null the latest stored reading is looked up
     * @param forceFromMeter when true the reading's balance is used regardless of recharge timing
     */
    private void refreshBalanceFromData(YwElectrical e, YwElectricalData data, boolean forceFromMeter) {
        if (e == null) {
            return;
        }
        if (data == null) {
            data = findLatestData(e.getId(), e.getAddress());
        }
        if
(data == null) { return; } UpdateWrapper uw = new UpdateWrapper<>(); uw.lambda().eq(YwElectrical::getId, e.getId()); if (StringUtils.isNotBlank(data.getZhygzdl())) { uw.lambda().set(YwElectrical::getBalanceBattery, data.getZhygzdl()); } boolean useMeterBalance = forceFromMeter || shouldUseMeterBalance(e.getId(), data); if (useMeterBalance && StringUtils.isNotBlank(data.getYe())) { try { uw.lambda().set(YwElectrical::getBalance, new BigDecimal(data.getYe())); } catch (Exception ignored) { } } if (isPurchaseCountPositive(data.getCountnum())) { uw.lambda().set(YwElectrical::getAccountStatus, Constants.ONE); } if (useMeterBalance) { Date dataTime = parseDataTime(data); uw.lambda().set(YwElectrical::getBalanceTime, dataTime != null ? dataTime : new Date()); } uw.lambda().set(YwElectrical::getEditDate, new Date()); ywElectricalMapper.update(null, uw); } /** 购买次数大于 0 视为已开户 */ private static boolean isPurchaseCountPositive(String purchaseCount) { if (StringUtils.isBlank(purchaseCount)) { return false; } try { return new BigDecimal(purchaseCount.trim()).compareTo(BigDecimal.ZERO) > 0; } catch (NumberFormatException e) { return false; } } private YwElectricalData findLatestData(Integer electricalId, String address) { QueryWrapper q = new QueryWrapper<>(); q.lambda().eq(YwElectricalData::getIsdeleted, Constants.ZERO) .and(w -> w.eq(YwElectricalData::getDeviceId, String.valueOf(electricalId)) .or().eq(StringUtils.isNotBlank(address), YwElectricalData::getAddress, address)) .orderByDesc(YwElectricalData::getCreateDate).last("limit 1"); return ywElectricalDataMapper.selectOne(q); } @Override public void syncMeterDataScheduled() { try { syncMeterDataInternal(); } catch (Exception e) { log.warn("syncMeterDataScheduled failed", e); } } @Override public String syncMeterDataFromPlatform() { MeterDataSyncStats stats = syncMeterDataInternal(); return "抄表同步完成:新增【" + stats.addCount + "】条,跳过重复【" + stats.skipCount + "】条"; } private static class MeterDataSyncStats { private int addCount; 
private int skipCount; } private MeterDataSyncStats syncMeterDataInternal() { MeterDataSyncStats stats = new MeterDataSyncStats(); String startTime = resolveSyncStartTime(); QueryDataRequest param = buildQueryDataRequest(startTime, DateUtil.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss")); log.info("sync meter data, start_time={}, end_time={}", startTime, param.getEnd_time()); List list = fetchQueryDataList(param); if (CollectionUtils.isEmpty(list)) { return stats; } for (QueryDataInfoResponse item : list) { if (item == null || StringUtils.isBlank(item.getAddress())) { continue; } YwElectrical meter = ywElectricalMapper.selectOne(new QueryWrapper().lambda() .eq(YwElectrical::getIsdeleted, Constants.ZERO) .eq(YwElectrical::getAddress, item.getAddress()) .last("limit 1")); if (saveDataRow(item, meter != null ? meter.getId() : null)) { stats.addCount++; if (meter != null) { refreshBalanceFromData(meter); } } else { stats.skipCount++; } } return stats; } /** 单表抄表后从第三方拉取最新数据入库,返回是否有新记录 */ private boolean syncMeterDataForElectrical(YwElectrical e) { if (e == null || StringUtils.isBlank(e.getAddress())) { return false; } Calendar cal = Calendar.getInstance(); cal.add(Calendar.HOUR_OF_DAY, -24); QueryDataRequest param = buildQueryDataRequest( DateUtil.formatDate(cal.getTime(), "yyyy-MM-dd HH:mm:ss"), DateUtil.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss")); List list; try { list = fetchQueryDataList(param); } catch (BusinessException ex) { log.warn("sync meter data for electricalId={} failed: {}", e.getId(), ex.getMessage()); return false; } if (CollectionUtils.isEmpty(list)) { return false; } String address = e.getAddress().trim(); boolean updated = false; for (QueryDataInfoResponse item : list) { if (item == null || StringUtils.isBlank(item.getAddress())) { continue; } if (!address.equalsIgnoreCase(item.getAddress().trim())) { continue; } if (saveDataRow(item, e.getId())) { updated = true; } } return updated; } private QueryDataRequest buildQueryDataRequest(String 
startTime, String endTime) { QueryDataRequest param = new QueryDataRequest(); param.setType("json"); param.setFunctionids(QUERY_DATA_FUNCTION_METER_STATUS); param.setStart_time(startTime); param.setEnd_time(endTime); param.setOffset(0); param.setLimit(500); return param; } private List fetchQueryDataList(QueryDataRequest param) { ElectronicDataResponse response = ElectronicToolUtil.queryDataRequest(param); if (!ElectronicToolUtil.isDataApiSuccess(response)) { throw new BusinessException(ResponseStatus.SERVER_ERROR.getCode(), ElectronicToolUtil.dataApiErrorMessage(response, "抄表数据同步失败")); } return parseQueryDataList(response); } private List parseQueryDataList(ElectronicDataResponse response) { if (response == null || response.getData() == null) { return Collections.emptyList(); } String json = JSON.toJSONString(response.getData()); if (StringUtils.isBlank(json) || "null".equals(json)) { return Collections.emptyList(); } try { String trimmed = json.trim(); if (trimmed.startsWith("[")) { return JSON.parseArray(trimmed, QueryDataInfoResponse.class); } if (trimmed.startsWith("{")) { QueryDataInfoResponse one = JSON.parseObject(trimmed, QueryDataInfoResponse.class); return one != null ? 
Collections.singletonList(one) : Collections.emptyList(); } } catch (Exception e) { log.warn("parse queryDataRequest failed", e); throw new BusinessException(ResponseStatus.SERVER_ERROR.getCode(), "抄表数据解析失败"); } return Collections.emptyList(); } /** 取本地已同步的最大抄表时间作为下次 start_time;无记录时默认近24小时 */ private String resolveSyncStartTime() { YwElectricalData latest = ywElectricalDataMapper.selectOne(new QueryWrapper().lambda() .eq(YwElectricalData::getIsdeleted, Constants.ZERO) .isNotNull(YwElectricalData::getAddTime) .ne(YwElectricalData::getAddTime, "") .orderByDesc(YwElectricalData::getAddTime) .last("limit 1")); if (latest != null && StringUtils.isNotBlank(latest.getAddTime())) { String addTime = latest.getAddTime().trim(); if (addTime.matches("\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}")) { return addTime; } log.warn("invalid addTime in yw_electrical_data, fallback to 24h: {}", addTime); } Calendar cal = Calendar.getInstance(); cal.add(Calendar.HOUR_OF_DAY, -24); return DateUtil.formatDate(cal.getTime(), "yyyy-MM-dd HH:mm:ss"); } private boolean existsByDataId(String dataId) { if (StringUtils.isBlank(dataId)) { return false; } return ywElectricalDataMapper.selectCount(new QueryWrapper().lambda() .eq(YwElectricalData::getIsdeleted, Constants.ZERO) .eq(YwElectricalData::getDataId, dataId.trim())) > 0; } /** 保存抄表记录;dataId 已存在则跳过,返回 false */ private boolean saveDataRow(QueryDataInfoResponse item, Integer electricalId) { String dataId = item.getId(); if (StringUtils.isNotBlank(dataId) && existsByDataId(dataId)) { return false; } YwElectricalData row = new YwElectricalData(); row.setCreateDate(new Date()); row.setEditDate(new Date()); row.setIsdeleted(Constants.ZERO); row.setDeviceId(electricalId != null ? 
String.valueOf(electricalId) : null); applyItemToRow(row, item); ywElectricalDataMapper.insert(row); return true; } /** 第三方 item / data_v2 字段名与本地实体差异映射 */ private static final Map SYNC_FIELD_ALIASES; private static final Set ITEM_SKIP_KEYS; static { Map aliases = new HashMap<>(); aliases.put("id", "dataId"); aliases.put("add_time", "addTime"); aliases.put("count", "countnum"); SYNC_FIELD_ALIASES = Collections.unmodifiableMap(aliases); ITEM_SKIP_KEYS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList("data_v2", "idnum"))); } /** 解析 data 数组中的 item 属性映射到 yw_electrical_data */ private void applyItemToRow(YwElectricalData row, QueryDataInfoResponse item) { JSONObject itemJson = (JSONObject) JSON.toJSON(item); for (String key : itemJson.keySet()) { if (ITEM_SKIP_KEYS.contains(key)) { continue; } Object val = itemJson.get(key); if (val == null) { continue; } String fieldName = SYNC_FIELD_ALIASES.getOrDefault(key, key); setElectricalDataField(row, fieldName, val); } JSONObject dataV2 = extractDataV2Json(item, itemJson); if (dataV2 != null && !dataV2.isEmpty()) { applyDataV2Json(row, dataV2); } applyDspFallback(row, item.getDsp()); } private JSONObject extractDataV2Json(QueryDataInfoResponse item, JSONObject itemJson) { if (!CollectionUtils.isEmpty(item.getData_v2())) { return mergeDataV2Json(item.getData_v2()); } Object dataV2 = itemJson.get("data_v2"); if (dataV2 == null) { return null; } if (dataV2 instanceof JSONObject) { return (JSONObject) dataV2; } if (dataV2 instanceof JSONArray) { JSONArray arr = (JSONArray) dataV2; JSONObject merged = new JSONObject(); for (int i = 0; i < arr.size(); i++) { Object element = arr.get(i); if (element instanceof JSONObject) { mergeJsonObject(merged, (JSONObject) element); } else if (element != null) { QueryDataV2Response part = arr.getObject(i, QueryDataV2Response.class); if (part != null) { mergeJsonObject(merged, (JSONObject) JSON.toJSON(part)); } } } return merged; } return null; } private void mergeJsonObject(JSONObject 
target, JSONObject source) { for (String key : source.keySet()) { Object val = source.get(key); if (val != null) { target.put(key, val); } } } private void applyDspFallback(YwElectricalData row, String dsp) { if (StringUtils.isNotBlank(row.getZhygzdl()) || StringUtils.isBlank(dsp)) { return; } if (dsp.contains("kWh") || dsp.contains("kwh")) { String num = dsp.replaceAll("[^0-9.]", " ").trim().split("\\s+")[0]; if (StringUtils.isNotBlank(num)) { row.setZhygzdl(num); } } } /** 合并 data_v2 列表中各条记录的属性(后值覆盖前值) */ private JSONObject mergeDataV2Json(List dataV2List) { JSONObject merged = new JSONObject(); for (QueryDataV2Response part : dataV2List) { if (part == null) { continue; } JSONObject obj = (JSONObject) JSON.toJSON(part); for (String key : obj.keySet()) { Object val = obj.get(key); if (val != null) { merged.put(key, val); } } } return merged; } /** 按属性名将 data_v2 解析写入 yw_electrical_data 同名字段(覆盖 item 层同名字段) */ private void applyDataV2Json(YwElectricalData row, JSONObject dataV2) { if (dataV2 == null || dataV2.isEmpty()) { return; } for (String key : dataV2.keySet()) { Object val = dataV2.get(key); if (val == null) { continue; } String fieldName = SYNC_FIELD_ALIASES.getOrDefault(key, key); setElectricalDataField(row, fieldName, val); } } private void setElectricalDataField(YwElectricalData row, String fieldName, Object val) { if (val == null || StringUtils.isBlank(fieldName)) { return; } try { Field field = YwElectricalData.class.getDeclaredField(fieldName); field.setAccessible(true); Class type = field.getType(); if (String.class.equals(type)) { String strVal = convertFieldString(val); if (StringUtils.isNotBlank(strVal)) { field.set(row, strVal); } } else if (Integer.class.equals(type) || int.class.equals(type)) { if (val instanceof Number) { field.set(row, ((Number) val).intValue()); } else { String text = String.valueOf(val).trim(); if (StringUtils.isNotBlank(text)) { field.set(row, Integer.parseInt(text)); } } } else if (BigDecimal.class.equals(type)) { String 
text = String.valueOf(val).trim(); if (StringUtils.isNotBlank(text)) { field.set(row, new BigDecimal(text)); } } } catch (NoSuchFieldException | IllegalAccessException | NumberFormatException ignored) { // 忽略第三方扩展字段或无法转换的值 } } private String convertFieldString(Object val) { if (val instanceof Boolean) { return String.valueOf(val); } if (val instanceof JSONArray || val instanceof JSONObject || val instanceof Collection) { return JSON.toJSONString(val); } return String.valueOf(val).trim(); } @Override public void cleanLogBeforeThreeMonths() { Calendar cal = Calendar.getInstance(); cal.add(Calendar.MONTH, -3); ywElectricalLogMapper.delete(new QueryWrapper().lambda() .lt(YwElectricalLog::getCreateDate, cal.getTime())); } @Override public void enrichList(List list) { fillRoomNames(list); if (CollectionUtils.isEmpty(list)) return; for (YwElectrical row : list) { if (StringUtils.isNotBlank(row.getWarnType())) { row.setWarnTypeName(resolveWarnTypeNames(row.getWarnType())); } } } private String resolveWarnTypeNames(String warnType) { String[] codes = warnType.split(","); List names = new ArrayList<>(); for (String codeStr : codes) { if (StringUtils.isBlank(codeStr)) { continue; } String trimmed = codeStr.trim(); try { int code = Integer.parseInt(trimmed); ElectronicConstant.warningDefId def = ElectronicConstant.warningDefId.getByKey(code); names.add(def != null ? def.getName() : trimmed); } catch (NumberFormatException ex) { names.add(trimmed); } } return names.isEmpty() ? 
warnType : String.join(",", names); } @Override public void enrichDataList(List list) { if (CollectionUtils.isEmpty(list)) { return; } List temp = new ArrayList<>(); for (YwElectricalData row : list) { if (StringUtils.isBlank(row.getElectricalName()) && StringUtils.isNotBlank(row.getName())) { row.setElectricalName(row.getName()); } if (row.getElectricalId() == null) { continue; } YwElectrical e = new YwElectrical(); e.setId(row.getElectricalId()); temp.add(e); } fillRoomNames(temp); Map roomMap = temp.stream() .filter(e -> StringUtils.isNotBlank(e.getRoomNames())) .collect(Collectors.toMap(YwElectrical::getId, YwElectrical::getRoomNames, (a, b) -> a)); for (YwElectricalData row : list) { if (row.getElectricalId() != null) { row.setRoomNames(roomMap.get(row.getElectricalId())); } } } private void fillRoomNames(List list) { if (CollectionUtils.isEmpty(list)) return; List ids = list.stream().map(YwElectrical::getId).filter(Objects::nonNull).collect(Collectors.toList()); if (ids.isEmpty()) return; MPJLambdaWrapper w = new MPJLambdaWrapper<>(); w.selectAll(YwElectricalRoom.class) .selectAs(YwRoom::getRoomNum, YwElectricalRoom::getRoomName) .selectAs(YwBuilding::getName, YwElectricalRoom::getBuildingName) .selectAs(YwFloor::getName, YwElectricalRoom::getFloorName) .leftJoin(YwRoom.class, YwRoom::getId, YwElectricalRoom::getRoomId) .leftJoin(YwFloor.class, YwFloor::getId, YwRoom::getFloor) .leftJoin(YwBuilding.class, YwBuilding::getId, YwRoom::getBuildingId) .eq(YwElectricalRoom::getIsdeleted, Constants.ZERO) .eq(YwElectricalRoom::getType, Constants.ZERO) .in(YwElectricalRoom::getObjId, ids); List rooms = ywElectricalRoomMapper.selectJoinList(YwElectricalRoom.class, w); Map> grouped = rooms.stream().collect(Collectors.groupingBy(YwElectricalRoom::getObjId)); for (YwElectrical row : list) { List rs = grouped.get(row.getId()); if (CollectionUtils.isEmpty(rs)) continue; row.setRoomNames(rs.stream().map(this::formatRoomPath).filter(StringUtils::isNotBlank) 
.collect(Collectors.joining("、"))); } } private String formatRoomPath(YwElectricalRoom r) { List parts = new ArrayList<>(); if (StringUtils.isNotBlank(r.getBuildingName())) parts.add(r.getBuildingName()); if (StringUtils.isNotBlank(r.getFloorName())) parts.add(r.getFloorName()); if (StringUtils.isNotBlank(r.getRoomName())) parts.add(r.getRoomName()); return String.join("/", parts); } private YwElectrical requireElectrical(Integer id) { YwElectrical e = ywElectricalMapper.selectById(id); if (e == null || Objects.equals(e.getIsdeleted(), Constants.ONE)) { throw new BusinessException(ResponseStatus.DATA_EMPTY); } return e; } private String newOprId() { return UUID.randomUUID().toString().replace("-", ""); } }