doum
7 天以前 e46bfa3ff94a8a1b4daf37c7fcb79c2fab22a72c
server/visits/dmvisit_service/src/main/java/com/doumee/service/business/impl/YwElectricalBizServiceImpl.java
@@ -12,6 +12,7 @@
import com.doumee.core.device.model.request.*;
import com.doumee.core.device.model.response.ElectronicBaseResponse;
import com.doumee.core.device.model.response.ElectronicDataResponse;
import com.doumee.core.device.model.response.MeterDealResponse;
import com.doumee.core.device.model.response.QueryDataInfoResponse;
import com.doumee.core.device.model.response.QueryDataV2Response;
import com.doumee.core.exception.BusinessException;
@@ -51,11 +52,24 @@
    /** Action type recorded for an account-opening request. */
    public static final int ACTION_OPEN = 6;
    /** Action type recorded for a recharge request. */
    public static final int ACTION_RECHARGE = 7;
    /** Action type recorded for a meter-read request. */
    public static final int ACTION_READ = 8;
    /** Action type recorded for the "powerProtect" operation (sent as ele_control type 63). */
    public static final int ACTION_POWER_PROTECT = 9;
    /** Action type recorded for the "powerProtectRelease" operation (sent as ele_control type 220). */
    public static final int ACTION_POWER_PROTECT_RELEASE = 10;
    /** Minimum age of an action, in milliseconds, before the scheduled poller queries its status. */
    private static final long FIRST_STATUS_QUERY_DELAY_MS = 30_000L;
    /** Minimum time, in milliseconds, since an action's last edit before the poller queries it again. */
    private static final long STATUS_QUERY_MIN_INTERVAL_MS = 3_600_000L;
    /** Number of opr_ids sent to the platform in one status-query request. */
    private static final int STATUS_QUERY_BATCH_SIZE = 50;
    /** Maximum number of pending actions loaded per scheduled polling run. */
    private static final int STATUS_QUERY_MAX_PENDING = 100;
    /** Operation type for reading meter data via ele_read (distinct from queryData functionids=253). */
    private static final int ELE_READ_TYPE_METER = 3;
    /** queryData function ID for meter status details. */
    private static final String QUERY_DATA_FUNCTION_METER_STATUS = "253";
    /** Page size used per DataRequest call (the API's own limit upper bound is 1000). */
    private static final int QUERY_DATA_PAGE_SIZE = 500;
    /** Maximum time span, in days, for meter-reading / DataRequest queries. */
    private static final int MAX_METER_QUERY_DAYS = 7;
    /** Sanity upper bound for the paging total, to avoid mistaking a timestamp for a row count. */
    private static final int QUERY_DATA_TOTAL_SANITY_MAX = 100_000;
    @Autowired
    private YwElectricalMapper ywElectricalMapper;
@@ -158,6 +172,10 @@
                return doEleControl(e, 10, ACTION_TRIP, user);
            case "close":
                return doEleControl(e, 11, ACTION_CLOSE, user);
            case "powerProtect":
                return doEleControl(e, 63, ACTION_POWER_PROTECT, user);
            case "powerProtectRelease":
                return doEleControl(e, 220, ACTION_POWER_PROTECT_RELEASE, user);
            case "openAccount":
                return doOpenAccount(e, dto, user);
            case "recharge":
@@ -170,6 +188,7 @@
    }
    private String doSecurityReset(YwElectrical e, String paymentMode, int actionType, LoginUserInfo user) {
        assertNoPendingAsyncAction(e.getId(), actionType);
        String oprId = newOprId();
        SecurityResetRequest req = new SecurityResetRequest();
        req.setOpr_id(oprId);
@@ -188,6 +207,7 @@
    }
    private String doEleControl(YwElectrical e, int type, int actionType, LoginUserInfo user) {
        assertNoPendingAsyncAction(e.getId(), actionType);
        String oprId = newOprId();
        EleControlApiRequest req = new EleControlApiRequest();
        req.setOpr_id(oprId);
@@ -201,10 +221,11 @@
        List<OpenAccountRequest> list = new ArrayList<>();
        list.add(req);
        ElectronicBaseResponse resp = ElectronicToolUtil.eleControl(list);
        return finishAsync(e, actionType, oprId, "/Api_v2/ele_security/ele_control", reqJson, resp, user);
        return finishAsync(e, actionType, oprId, "/Api_v2/ele_control", reqJson, resp, user);
    }
    private String doOpenAccount(YwElectrical e, YwElectricalOperateDTO dto, LoginUserInfo user) {
        assertNoPendingAsyncAction(e.getId(), ACTION_OPEN);
        String oprId = newOprId();
        OpenAccountRequest req = buildOpenAccountRequest(e, oprId, dto.getMoney(), dto.getRemark());
        String reqJson = JSON.toJSONString(Collections.singletonList(req));
@@ -216,6 +237,7 @@
        if (!Objects.equals(e.getAccountStatus(), Constants.ONE)) {
            throw new BusinessException(ResponseStatus.BAD_REQUEST.getCode(), "电表未开户,请先开户");
        }
        assertNoPendingAsyncAction(e.getId(), ACTION_RECHARGE);
        String oprId = newOprId();
        OpenAccountRequest req = buildOpenAccountRequest(e, oprId, dto.getMoney(), dto.getRemark());
        String reqJson = JSON.toJSONString(Collections.singletonList(req));
@@ -228,6 +250,7 @@
    }
    private String doReadMeter(YwElectrical e, LoginUserInfo user) {
        assertNoPendingAsyncAction(e.getId(), ACTION_READ);
        String oprId = newOprId();
        EleReadRequest req = new EleReadRequest();
        req.setOpr_id(oprId);
@@ -244,7 +267,7 @@
            throw new BusinessException(ResponseStatus.SERVER_ERROR.getCode(), msg);
        }
        boolean synced = syncMeterDataForElectrical(e);
        refreshBalanceFromData(e);
        refreshBalanceFromData(e, null, true);
        return synced ? "抄表成功,用量余额已更新" : "抄表请求已提交,暂无新抄表数据,请稍后刷新查看";
    }
@@ -288,6 +311,21 @@
        }
        ywElectricalActionsMapper.insert(act);
        return isSuccess(resp) ? "提交成功,请稍后在操作记录或充值记录中查看结果" : act.getResultMsg();
    }
    /** 同一电表、相同操作类型存在处理中记录时,禁止重复提交异步任务 */
    private void assertNoPendingAsyncAction(Integer electricalId, int actionType) {
        if (electricalId == null) {
            return;
        }
        Long pending = ywElectricalActionsMapper.selectCount(new QueryWrapper<YwElectricalActions>().lambda()
                .eq(YwElectricalActions::getElectricalId, electricalId)
                .eq(YwElectricalActions::getActionType, actionType)
                .eq(YwElectricalActions::getStatus, Constants.ZERO)
                .eq(YwElectricalActions::getIsdeleted, Constants.ZERO));
        if (pending != null && pending > 0) {
            throw new BusinessException(ResponseStatus.BAD_REQUEST.getCode(), "需要等待上一次操作结果执行结束");
        }
    }
    private void saveLog(YwElectrical e, String apiName, String reqJson, ElectronicBaseResponse resp, LoginUserInfo user) {
@@ -336,26 +374,32 @@
    }
    private void applyNotifyItem(JSONObject item) {
        if (item == null) return;
        String oprId = item.getString("opr_id");
        if (StringUtils.isBlank(oprId)) return;
        YwElectricalActions act = ywElectricalActionsMapper.selectOne(new QueryWrapper<YwElectricalActions>().lambda()
                .eq(YwElectricalActions::getOprId, oprId)
                .eq(YwElectricalActions::getIsdeleted, Constants.ZERO)
                .orderByDesc(YwElectricalActions::getCreateDate)
                .last("limit 1"));
        if (act == null) {
            log.info("electricalNotify no action for opr_id={}", oprId);
        if (item == null) {
            return;
        }
        ElectronicNotifyStatus ns = ElectronicNotifyStatus.fromCode(item.getString("status"));
        String oprId = item.getString("opr_id");
        String status = item.getString("status");
        String errMsg = item.get("error_msg") != null ? String.valueOf(item.get("error_msg")) : null;
        applyActionStatusItem(oprId, status, errMsg, item.toJSONString(), true);
    }
    private void applyActionStatusItem(String oprId, String platformStatus, String errMsg, String responseBody,
                                       boolean fromNotify) {
        if (StringUtils.isBlank(oprId)) {
            return;
        }
        YwElectricalActions act = findActionByOprId(oprId);
        if (act == null) {
            log.info("electrical action not found for opr_id={}", oprId);
            return;
        }
        ElectronicNotifyStatus ns = ElectronicNotifyStatus.fromCode(platformStatus);
        String resultMsg = ns.getLabel() + (StringUtils.isNotBlank(errMsg) ? ":" + errMsg : "");
        YwElectricalActions upd = new YwElectricalActions();
        upd.setId(act.getId());
        upd.setEditDate(new Date());
        upd.setResponseBody(item.toJSONString());
        upd.setResponseBody(responseBody);
        upd.setResultMsg(resultMsg);
        if (ns.isTerminalSuccess()) {
            upd.setStatus(Constants.ONE);
@@ -367,8 +411,12 @@
        ywElectricalActionsMapper.updateById(upd);
        YwElectrical e = ywElectricalMapper.selectById(act.getElectricalId());
        if (e == null) return;
        saveNotifyLog(e, item, ns);
        if (e == null) {
            return;
        }
        if (fromNotify) {
            saveNotifyLog(e, JSON.parseObject(responseBody), ns);
        }
        if (ns.isTerminalSuccess()) {
            applyNotifySideEffect(e, act.getActionType());
            if (Objects.equals(act.getActionType(), ACTION_RECHARGE)) {
@@ -377,6 +425,183 @@
        } else if (ns.isTerminalFail() && Objects.equals(act.getActionType(), ACTION_RECHARGE)) {
            updateChargeByNotify(oprId, Constants.TWO, resultMsg);
        }
    }
    private YwElectricalActions findActionByOprId(String oprId) {
        return ywElectricalActionsMapper.selectOne(new QueryWrapper<YwElectricalActions>().lambda()
                .eq(YwElectricalActions::getOprId, oprId)
                .eq(YwElectricalActions::getIsdeleted, Constants.ZERO)
                .orderByDesc(YwElectricalActions::getCreateDate)
                .last("limit 1"));
    }
    @Override
    @Transactional(rollbackFor = Exception.class)
    public String syncAsyncActionStatus(String oprId) {
        if (StringUtils.isBlank(oprId)) {
            throw new BusinessException(ResponseStatus.BAD_REQUEST.getCode(), "缺少任务 ID");
        }
        String trimmed = oprId.trim();
        if (findActionByOprId(trimmed) == null) {
            throw new BusinessException(ResponseStatus.DATA_EMPTY.getCode(), "未找到对应操作记录");
        }
        ElectronicBaseResponse resp = ElectronicToolUtil.requestStatus(trimmed);
        return handleStatusQueryResponse(resp, trimmed);
    }
    @Override
    @Transactional(rollbackFor = Exception.class)
    public String syncChargeStatusById(Integer chargeId) {
        if (chargeId == null) {
            throw new BusinessException(ResponseStatus.BAD_REQUEST);
        }
        YwElectricalCharge charge = ywElectricalChargeMapper.selectById(chargeId);
        if (charge == null || !Objects.equals(charge.getIsdeleted(), Constants.ZERO)) {
            throw new BusinessException(ResponseStatus.DATA_EMPTY.getCode(), "充值记录不存在");
        }
        if (!Objects.equals(charge.getStatus(), Constants.ZERO)) {
            throw new BusinessException(ResponseStatus.BAD_REQUEST.getCode(), "仅充值中记录可同步");
        }
        if (!Objects.equals(charge.getType(), Constants.ZERO)) {
            throw new BusinessException(ResponseStatus.BAD_REQUEST.getCode(), "仅电表充值记录可同步");
        }
        return syncAsyncActionStatus(charge.getOprId());
    }
    @Override
    public void syncPendingAsyncActionsScheduled() {
        try {
            Date minCreateTime = new Date(System.currentTimeMillis() - FIRST_STATUS_QUERY_DELAY_MS);
            List<YwElectricalActions> pending = ywElectricalActionsMapper.selectList(
                    new QueryWrapper<YwElectricalActions>().lambda()
                            .eq(YwElectricalActions::getStatus, Constants.ZERO)
                            .eq(YwElectricalActions::getIsdeleted, Constants.ZERO)
                            .isNotNull(YwElectricalActions::getOprId)
                            .le(YwElectricalActions::getCreateDate, minCreateTime)
                            .orderByAsc(YwElectricalActions::getCreateDate)
                            .last("limit " + STATUS_QUERY_MAX_PENDING));
            if (CollectionUtils.isEmpty(pending)) {
                return;
            }
            long nowMs = System.currentTimeMillis();
            List<String> oprIds = pending.stream()
                    .filter(act -> shouldPollAction(act, nowMs))
                    .map(YwElectricalActions::getOprId)
                    .filter(StringUtils::isNotBlank)
                    .distinct()
                    .collect(Collectors.toList());
            if (oprIds.isEmpty()) {
                return;
            }
            int updated = 0;
            for (int i = 0; i < oprIds.size(); i += STATUS_QUERY_BATCH_SIZE) {
                List<String> batch = oprIds.subList(i, Math.min(i + STATUS_QUERY_BATCH_SIZE, oprIds.size()));
                ElectronicBaseResponse resp = ElectronicToolUtil.requestStatusByOprIds(batch);
                updated += applyStatusQueryBatch(resp);
            }
            log.info("syncPendingAsyncActionsScheduled queried={}, updated={}", oprIds.size(), updated);
        } catch (Exception e) {
            log.warn("syncPendingAsyncActionsScheduled failed", e);
        }
    }
    private int applyStatusQueryBatch(ElectronicBaseResponse resp) {
        if (resp == null || !"SUCCESS".equalsIgnoreCase(resp.getStatus())) {
            return 0;
        }
        List<MeterDealResponse> items = extractMeterDealList(resp);
        if (CollectionUtils.isEmpty(items)) {
            return 0;
        }
        int updated = 0;
        for (MeterDealResponse item : items) {
            if (item == null || StringUtils.isBlank(item.getOpr_id())) {
                continue;
            }
            YwElectricalActions before = findActionByOprId(item.getOpr_id());
            if (before == null || !Objects.equals(before.getStatus(), Constants.ZERO)) {
                continue;
            }
            applyActionStatusItem(item.getOpr_id(), item.getStatus(), item.getError_msg(),
                    JSON.toJSONString(item), false);
            YwElectricalActions after = findActionByOprId(item.getOpr_id());
            if (after != null && !Objects.equals(before.getStatus(), after.getStatus())) {
                updated++;
            }
        }
        return updated;
    }
    private String handleStatusQueryResponse(ElectronicBaseResponse resp, String oprId) {
        if (resp == null) {
            return "平台无响应,请稍后重试";
        }
        if (!"SUCCESS".equalsIgnoreCase(resp.getStatus())) {
            return "查询失败:" + StringUtils.defaultIfBlank(resp.getError_msg(), resp.getStatus());
        }
        List<MeterDealResponse> items = extractMeterDealList(resp);
        MeterDealResponse item = items.stream()
                .filter(i -> i != null && oprId.equals(i.getOpr_id()))
                .findFirst()
                .orElse(CollectionUtils.isEmpty(items) ? null : items.get(0));
        if (item == null) {
            return "未查询到任务结果,请稍后重试";
        }
        applyActionStatusItem(oprId, item.getStatus(), item.getError_msg(), JSON.toJSONString(item), false);
        YwElectricalActions act = findActionByOprId(oprId);
        if (act == null) {
            return "未找到对应操作记录";
        }
        if (Objects.equals(act.getStatus(), Constants.ONE)) {
            return Objects.equals(act.getActionType(), ACTION_RECHARGE) ? "已同步为充值成功" : "操作已成功";
        }
        if (Objects.equals(act.getStatus(), Constants.TWO)) {
            return Objects.equals(act.getActionType(), ACTION_RECHARGE)
                    ? "已同步为充值失败:" + StringUtils.defaultString(act.getResultMsg())
                    : "操作已失败:" + StringUtils.defaultString(act.getResultMsg());
        }
        if (ElectronicToolUtil.isAsyncStatusFinal(item.getStatus())) {
            return "平台状态:" + item.getStatus();
        }
        return "任务处理中,请稍后重试";
    }
    @SuppressWarnings("unchecked")
    private List<MeterDealResponse> extractMeterDealList(ElectronicBaseResponse resp) {
        Object content = resp.getResponse_content();
        if (content == null) {
            return Collections.emptyList();
        }
        if (content instanceof List) {
            List<?> list = (List<?>) content;
            List<MeterDealResponse> result = new ArrayList<>();
            for (Object o : list) {
                if (o instanceof MeterDealResponse) {
                    result.add((MeterDealResponse) o);
                } else {
                    result.add(JSON.parseObject(JSON.toJSONString(o), MeterDealResponse.class));
                }
            }
            return result;
        }
        return JSON.parseArray(JSON.toJSONString(content), MeterDealResponse.class);
    }
    private boolean shouldPollAction(YwElectricalActions act, long nowMs) {
        Date create = act.getCreateDate();
        if (create == null || StringUtils.isBlank(act.getOprId())) {
            return false;
        }
        long createMs = create.getTime();
        if (nowMs - createMs < FIRST_STATUS_QUERY_DELAY_MS) {
            return false;
        }
        long editMs = act.getEditDate() != null ? act.getEditDate().getTime() : createMs;
        if (editMs - createMs < 5_000L) {
            return true;
        }
        return nowMs - editMs >= STATUS_QUERY_MIN_INTERVAL_MS;
    }
    private void saveChargeRecord(YwElectrical e, String oprId, YwElectricalOperateDTO dto, LoginUserInfo user) {
@@ -400,6 +625,12 @@
        charge.setStatusInfo("充值中");
        charge.setBanlance(e.getBalance());
        charge.setRoomNames(e.getRoomNames());
        if (dto.getCustomerId() != null) {
            charge.setCustomerId(dto.getCustomerId());
        }
        if (StringUtils.isNotBlank(e.getAddress()) || StringUtils.isNotBlank(e.getName())) {
            charge.setDeviceInfo(StringUtils.defaultString(e.getAddress()) + " " + StringUtils.defaultString(e.getName()));
        }
        if (StringUtils.isNotBlank(e.getParamId())) {
            try {
                charge.setParamId(Integer.parseInt(e.getParamId()));
@@ -410,13 +641,29 @@
    }
    private void updateChargeByNotify(String oprId, int status, String statusInfo) {
        ywElectricalChargeMapper.update(null, new UpdateWrapper<YwElectricalCharge>().lambda()
        YwElectricalCharge charge = ywElectricalChargeMapper.selectOne(new QueryWrapper<YwElectricalCharge>().lambda()
                .eq(YwElectricalCharge::getOprId, oprId)
                .eq(YwElectricalCharge::getIsdeleted, Constants.ZERO)
                .last("limit 1"));
        BigDecimal balanceAfter = null;
        if (charge != null && status == Constants.ONE) {
            balanceAfter = calcBalanceAfterRecharge(charge);
            if (charge.getObjId() != null) {
                updateElectricalBalance(charge.getObjId(), balanceAfter);
            }
        }
        UpdateWrapper<YwElectricalCharge> uw = new UpdateWrapper<>();
        uw.lambda()
                .set(YwElectricalCharge::getStatus, status)
                .set(YwElectricalCharge::getStatusTime, new Date())
                .set(YwElectricalCharge::getStatusInfo, statusInfo)
                .set(YwElectricalCharge::getEditDate, new Date())
                .eq(YwElectricalCharge::getOprId, oprId)
                .eq(YwElectricalCharge::getIsdeleted, Constants.ZERO));
                .eq(YwElectricalCharge::getIsdeleted, Constants.ZERO);
        if (balanceAfter != null) {
            uw.lambda().set(YwElectricalCharge::getBalanceAfter, balanceAfter);
        }
        ywElectricalChargeMapper.update(null, uw);
    }
    private void saveNotifyLog(YwElectrical e, JSONObject item, ElectronicNotifyStatus ns) {
@@ -454,32 +701,126 @@
                ywElectricalMapper.update(null, uw);
                break;
            case ACTION_RECHARGE:
                // 充值成功后余额由 updateChargeByNotify 按「充值前+充值金额」累计,待定时抄表后再切回抄表余额
                break;
            case ACTION_READ:
                syncMeterDataForElectrical(e);
                refreshBalanceFromData(e);
                refreshBalanceFromData(e, null, true);
                break;
            default:
                break;
        }
    }
    /** 充值后累计余额 = 充值前余额 + 充值金额 */
    private BigDecimal calcBalanceAfterRecharge(YwElectricalCharge charge) {
        BigDecimal before = charge.getBanlance() != null ? charge.getBanlance() : BigDecimal.ZERO;
        BigDecimal money = charge.getMoney() != null ? charge.getMoney() : BigDecimal.ZERO;
        return before.add(money);
    }
    private void updateElectricalBalance(Integer electricalId, BigDecimal balance) {
        if (electricalId == null || balance == null) {
            return;
        }
        UpdateWrapper<YwElectrical> uw = new UpdateWrapper<>();
        uw.lambda()
                .eq(YwElectrical::getId, electricalId)
                .set(YwElectrical::getBalance, balance)
                .set(YwElectrical::getBalanceTime, new Date())
                .set(YwElectrical::getEditDate, new Date());
        ywElectricalMapper.update(null, uw);
    }
    private YwElectricalCharge findLastSuccessfulRecharge(Integer electricalId) {
        if (electricalId == null) {
            return null;
        }
        return ywElectricalChargeMapper.selectOne(new QueryWrapper<YwElectricalCharge>().lambda()
                .eq(YwElectricalCharge::getObjId, electricalId)
                .eq(YwElectricalCharge::getType, Constants.ZERO)
                .eq(YwElectricalCharge::getStatus, Constants.ONE)
                .eq(YwElectricalCharge::getIsdeleted, Constants.ZERO)
                .orderByDesc(YwElectricalCharge::getStatusTime)
                .orderByDesc(YwElectricalCharge::getId)
                .last("limit 1"));
    }
    private Date parseDataTime(YwElectricalData data) {
        if (data == null) {
            return null;
        }
        if (StringUtils.isNotBlank(data.getAddTime())) {
            try {
                return new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(data.getAddTime().trim());
            } catch (Exception ignored) {
            }
        }
        return data.getCreateDate();
    }
    /**
     * 最近一次充值成功后,若抄表时间仍早于充值成功时间,则暂不以抄表余额覆盖累计余额。
     */
    private boolean shouldUseMeterBalance(Integer electricalId, YwElectricalData data) {
        YwElectricalCharge lastRecharge = findLastSuccessfulRecharge(electricalId);
        if (lastRecharge == null || lastRecharge.getStatusTime() == null) {
            return true;
        }
        Date dataTime = parseDataTime(data);
        if (dataTime == null) {
            return true;
        }
        return !dataTime.before(lastRecharge.getStatusTime());
    }
    private void refreshBalanceFromData(YwElectrical e) {
        YwElectricalData data = findLatestData(e.getId(), e.getAddress());
        if (data == null) return;
        refreshBalanceFromData(e, null, false);
    }
    private void refreshBalanceFromData(YwElectrical e, YwElectricalData data, boolean forceFromMeter) {
        if (e == null) {
            return;
        }
        if (data == null) {
            data = findLatestData(e.getId(), e.getAddress());
        }
        if (data == null) {
            return;
        }
        UpdateWrapper<YwElectrical> uw = new UpdateWrapper<>();
        uw.lambda().eq(YwElectrical::getId, e.getId());
        if (StringUtils.isNotBlank(data.getZhygzdl())) {
            uw.lambda().set(YwElectrical::getBalanceBattery, data.getZhygzdl());
        }
        if (StringUtils.isNotBlank(data.getYe())) {
        boolean useMeterBalance = forceFromMeter || shouldUseMeterBalance(e.getId(), data);
        if (useMeterBalance && StringUtils.isNotBlank(data.getYe())) {
            try {
                uw.lambda().set(YwElectrical::getBalance, new BigDecimal(data.getYe()));
            } catch (Exception ignored) {
            }
        }
        uw.lambda().set(YwElectrical::getBalanceTime, new Date());
        if (isPurchaseCountPositive(data.getCountnum())) {
            uw.lambda().set(YwElectrical::getAccountStatus, Constants.ONE);
        }
        if (useMeterBalance) {
            Date dataTime = parseDataTime(data);
            uw.lambda().set(YwElectrical::getBalanceTime, dataTime != null ? dataTime : new Date());
        }
        uw.lambda().set(YwElectrical::getEditDate, new Date());
        ywElectricalMapper.update(null, uw);
    }
    /** 购买次数大于 0 视为已开户 */
    private static boolean isPurchaseCountPositive(String purchaseCount) {
        if (StringUtils.isBlank(purchaseCount)) {
            return false;
        }
        try {
            return new BigDecimal(purchaseCount.trim()).compareTo(BigDecimal.ZERO) > 0;
        } catch (NumberFormatException e) {
            return false;
        }
    }
    private YwElectricalData findLatestData(Integer electricalId, String address) {
@@ -494,7 +835,7 @@
    @Override
    public void syncMeterDataScheduled() {
        try {
            syncMeterDataInternal();
            syncMeterDataInternal(null, null);
        } catch (Exception e) {
            log.warn("syncMeterDataScheduled failed", e);
        }
@@ -502,8 +843,36 @@
    @Override
    public String syncMeterDataFromPlatform() {
        MeterDataSyncStats stats = syncMeterDataInternal();
        MeterDataSyncStats stats = syncMeterDataInternal(null, null);
        return "抄表同步完成:新增【" + stats.addCount + "】条,跳过重复【" + stats.skipCount + "】条";
    }
    @Override
    public String syncMeterDataFromPlatform(String readTimeBegin, String readTimeEnd) {
        if (StringUtils.isBlank(readTimeBegin) || StringUtils.isBlank(readTimeEnd)) {
            throw new BusinessException(ResponseStatus.NOT_ALLOWED.getCode(), "请选择抄表时间段");
        }
        String startTime = readTimeBegin.trim();
        String endTime = readTimeEnd.trim();
        validateManualSyncTimeRange(startTime, endTime);
        String expandedStart = expandStartByOneDay(startTime);
        MeterDataSyncStats stats = syncMeterDataInternal(expandedStart, endTime);
        return "抄表同步完成:新增【" + stats.addCount + "】条,跳过重复【" + stats.skipCount + "】条";
    }
    private String expandStartByOneDay(String startTime) {
        try {
            Date start = DateUtil.StringToDate(startTime, "yyyy-MM-dd HH:mm:ss");
            if (start == null) {
                return startTime;
            }
            Calendar cal = Calendar.getInstance();
            cal.setTime(start);
            cal.add(Calendar.DAY_OF_MONTH, -1);
            return DateUtil.formatDate(cal.getTime(), "yyyy-MM-dd HH:mm:ss");
        } catch (Exception e) {
            return startTime;
        }
    }
    private static class MeterDataSyncStats {
@@ -511,12 +880,14 @@
        private int skipCount;
    }
    private MeterDataSyncStats syncMeterDataInternal() {
    private MeterDataSyncStats syncMeterDataInternal(String startTime, String endTime) {
        MeterDataSyncStats stats = new MeterDataSyncStats();
        String startTime = resolveSyncStartTime();
        QueryDataRequest param = buildQueryDataRequest(startTime, DateUtil.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss"));
        log.info("sync meter data, start_time={}, end_time={}", startTime, param.getEnd_time());
        List<QueryDataInfoResponse> list = fetchQueryDataList(param);
        String resolvedStart = StringUtils.isNotBlank(startTime) ? startTime : resolveSyncStartTime();
        String resolvedEnd = StringUtils.isNotBlank(endTime) ? endTime : DateUtil.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss");
        resolvedStart = capQueryStartTime(resolvedStart, resolvedEnd, MAX_METER_QUERY_DAYS);
        QueryDataRequest param = buildQueryDataRequest(resolvedStart, resolvedEnd);
        log.info("sync meter data, start_time={}, end_time={}", resolvedStart, param.getEnd_time());
        List<QueryDataInfoResponse> list = fetchAllQueryDataList(param);
        if (CollectionUtils.isEmpty(list)) {
            return stats;
        }
@@ -540,6 +911,51 @@
        return stats;
    }
    private void validateManualSyncTimeRange(String startTime, String endTime) {
        if (!startTime.matches("\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}")
                || !endTime.matches("\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}")) {
            throw new BusinessException(ResponseStatus.NOT_ALLOWED.getCode(), "抄表时间格式不正确");
        }
        Date start;
        Date end;
        try {
            start = DateUtil.StringToDate(startTime, "yyyy-MM-dd HH:mm:ss");
            end = DateUtil.StringToDate(endTime, "yyyy-MM-dd HH:mm:ss");
        } catch (Exception e) {
            throw new BusinessException(ResponseStatus.NOT_ALLOWED.getCode(), "抄表时间格式不正确");
        }
        if (start == null || end == null || !start.before(end)) {
            throw new BusinessException(ResponseStatus.NOT_ALLOWED.getCode(), "抄表开始时间必须早于结束时间");
        }
        long diffMs = end.getTime() - start.getTime();
        if (diffMs > (long) MAX_METER_QUERY_DAYS * 24 * 60 * 60 * 1000) {
            throw new BusinessException(ResponseStatus.NOT_ALLOWED.getCode(), "抄表时间段不能超过" + MAX_METER_QUERY_DAYS + "天");
        }
    }
    private String capQueryStartTime(String startTime, String endTime, int maxDays) {
        if (StringUtils.isBlank(startTime) || StringUtils.isBlank(endTime)) {
            return startTime;
        }
        try {
            Date start = DateUtil.StringToDate(startTime, "yyyy-MM-dd HH:mm:ss");
            Date end = DateUtil.StringToDate(endTime, "yyyy-MM-dd HH:mm:ss");
            if (start == null || end == null || !start.before(end)) {
                return startTime;
            }
            long maxMs = (long) maxDays * 24 * 60 * 60 * 1000;
            if (end.getTime() - start.getTime() <= maxMs) {
                return startTime;
            }
            Calendar cal = Calendar.getInstance();
            cal.setTime(end);
            cal.add(Calendar.DAY_OF_MONTH, -maxDays);
            return DateUtil.formatDate(cal.getTime(), "yyyy-MM-dd HH:mm:ss");
        } catch (Exception e) {
            return startTime;
        }
    }
    /** 单表抄表后从第三方拉取最新数据入库,返回是否有新记录 */
    private boolean syncMeterDataForElectrical(YwElectrical e) {
        if (e == null || StringUtils.isBlank(e.getAddress())) {
@@ -552,7 +968,7 @@
                DateUtil.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss"));
        List<QueryDataInfoResponse> list;
        try {
            list = fetchQueryDataList(param);
            list = fetchAllQueryDataList(param);
        } catch (BusinessException ex) {
            log.warn("sync meter data for electricalId={} failed: {}", e.getId(), ex.getMessage());
            return false;
@@ -583,17 +999,66 @@
        param.setStart_time(startTime);
        param.setEnd_time(endTime);
        param.setOffset(0);
        param.setLimit(500);
        param.setLimit(QUERY_DATA_PAGE_SIZE);
        return param;
    }
    private List<QueryDataInfoResponse> fetchQueryDataList(QueryDataRequest param) {
        ElectronicDataResponse response = ElectronicToolUtil.queryDataRequest(param);
        if (!ElectronicToolUtil.isDataApiSuccess(response)) {
            throw new BusinessException(ResponseStatus.SERVER_ERROR.getCode(),
                    ElectronicToolUtil.dataApiErrorMessage(response, "抄表数据同步失败"));
    /** 按 DataRequest 分页 total 循环拉取全部抄表数据 */
    private List<QueryDataInfoResponse> fetchAllQueryDataList(QueryDataRequest param) {
        if (param == null) {
            return Collections.emptyList();
        }
        return parseQueryDataList(response);
        int limit = param.getLimit() > 0 ? param.getLimit() : QUERY_DATA_PAGE_SIZE;
        List<QueryDataInfoResponse> all = new ArrayList<>();
        int offset = 0;
        Integer total = null;
        int pageNo = 0;
        while (true) {
            pageNo++;
            param.setOffset(offset);
            param.setLimit(limit);
            ElectronicDataResponse response = ElectronicToolUtil.queryDataRequest(param);
            if (!ElectronicToolUtil.isDataApiSuccess(response)) {
                throw new BusinessException(ResponseStatus.SERVER_ERROR.getCode(),
                        ElectronicToolUtil.dataApiErrorMessage(response, "抄表数据同步失败"));
            }
            List<QueryDataInfoResponse> page = parseQueryDataList(response);
            if (!CollectionUtils.isEmpty(page)) {
                all.addAll(page);
            }
            if (total == null) {
                total = resolveQueryTotal(response);
            }
            log.info("sync meter data page={}, offset={}, pageSize={}, accumulated={}, total={}",
                    pageNo, offset, page.size(), all.size(), total);
            if (total != null && total > 0) {
                offset += limit;
                if (offset >= total) {
                    break;
                }
                continue;
            }
            if (CollectionUtils.isEmpty(page) || page.size() < limit) {
                break;
            }
            offset += limit;
            if (pageNo >= 200) {
                log.warn("sync meter data pagination exceeded safety page limit, accumulated={}", all.size());
                break;
            }
        }
        return all;
    }
    private Integer resolveQueryTotal(ElectronicDataResponse response) {
        if (response == null || response.getTotal() == null) {
            return null;
        }
        int total = response.getTotal();
        if (total <= 0 || total > QUERY_DATA_TOTAL_SANITY_MAX) {
            return null;
        }
        return total;
    }
    private List<QueryDataInfoResponse> parseQueryDataList(ElectronicDataResponse response) {