diff --git a/app/Jobs/ProcessAtmTransactionJob.php b/app/Jobs/ProcessAtmTransactionJob.php
index a3d068f..e007f57 100644
--- a/app/Jobs/ProcessAtmTransactionJob.php
+++ b/app/Jobs/ProcessAtmTransactionJob.php
@@ -20,6 +20,7 @@
     private const MAX_EXECUTION_TIME = 86400; // 24 hours in seconds
     private const FILENAME = 'ST.ATM.TRANSACTION.csv';
     private const DISK_NAME = 'sftpStatement';
+    private const CHUNK_SIZE = 1000; // Process data in chunks to reduce memory usage
     private const HEADER_MAP = [
         'id' => 'transaction_id',
         'card_acc_id' => 'card_acc_id',
@@ -43,6 +44,7 @@
     private string $period = '';
     private int $processedCount = 0;
     private int $errorCount = 0;
+    private array $atmTransactionBatch = [];
 
     /**
      * Create a new job instance.
@@ -80,6 +82,7 @@
         set_time_limit(self::MAX_EXECUTION_TIME);
         $this->processedCount = 0;
         $this->errorCount = 0;
+        $this->atmTransactionBatch = [];
     }
 
     private function processPeriod()
@@ -136,9 +139,23 @@
         }
 
         $rowCount = 0;
+        $chunkCount = 0;
+
         while (($row = fgetcsv($handle, 0, self::CSV_DELIMITER)) !== false) {
             $rowCount++;
             $this->processRow($headerRow, $row, $rowCount, $filePath);
+
+            // Process in chunks to avoid memory issues
+            if (count($this->atmTransactionBatch) >= self::CHUNK_SIZE) {
+                $this->saveBatch();
+                $chunkCount++;
+                Log::info("Processed chunk $chunkCount ({$this->processedCount} records so far)");
+            }
+        }
+
+        // Process any remaining records
+        if (!empty($this->atmTransactionBatch)) {
+            $this->saveBatch();
         }
 
         fclose($handle);
@@ -151,15 +168,16 @@
         if (count($headerRow) !== count($row)) {
             Log::warning("Row $rowCount in $filePath has incorrect column count. Expected: " . count($headerRow) . ", Got: " . count($row));
+            $this->errorCount++;
             return;
         }
 
         // Combine the header row with the data row
         $rawData = array_combine($headerRow, $row);
 
-        $this->mapAndSaveRecord($rawData, $rowCount, $filePath);
+        $this->mapAndAddToBatch($rawData, $rowCount, $filePath);
     }
 
-    private function mapAndSaveRecord(array $rawData, int $rowCount, string $filePath)
+    private function mapAndAddToBatch(array $rawData, int $rowCount, string $filePath)
     : void
     {
         // Map the raw data to our model fields
@@ -173,24 +191,53 @@
             return;
         }
 
-        $this->saveRecord($data, $rowCount, $filePath);
+        $this->addToBatch($data, $rowCount, $filePath);
     }
 
-    private function saveRecord(array $data, int $rowCount, string $filePath)
+    /**
+     * Add record to batch instead of saving immediately
+     */
+    private function addToBatch(array $data, int $rowCount, string $filePath)
     : void
     {
         try {
+            // Add timestamp fields
+            $now = now();
+            $data['created_at'] = $now;
+            $data['updated_at'] = $now;
 
-            // Create or update the record
-            AtmTransaction::updateOrCreate(
-                ['transaction_id' => $data['transaction_id']],
-                $data
-            );
-
+            // Add to batch
+            $this->atmTransactionBatch[] = $data;
             $this->processedCount++;
         } catch (Exception $e) {
             $this->errorCount++;
-            Log::error("Error processing row $rowCount in $filePath: " . $e->getMessage());
+            Log::error("Error processing ATM Transaction at row $rowCount in $filePath: " . $e->getMessage());
         }
     }
+
+    /**
+     * Save batched records to the database
+     */
+    private function saveBatch()
+    : void
+    {
+        try {
+            if (!empty($this->atmTransactionBatch)) {
+                // Bulk insert/update ATM transactions
+                AtmTransaction::upsert(
+                    $this->atmTransactionBatch,
+                    ['transaction_id'], // Unique key
+                    array_diff(array_values(self::HEADER_MAP), ['transaction_id']) // Update columns (all except transaction_id)
+                );
+
+                // Reset batch after processing
+                $this->atmTransactionBatch = [];
+            }
+        } catch (Exception $e) {
+            Log::error("Error in saveBatch: " . $e->getMessage());
+            $this->errorCount += count($this->atmTransactionBatch);
+            // Reset batch even if there's an error to prevent reprocessing the same failed records
+            $this->atmTransactionBatch = [];
+        }
+    }
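A note on the `upsert()` call introduced above (not part of the diff): Eloquent's `upsert()` (Laravel 8+) compiles each chunk into a single `INSERT ... ON DUPLICATE KEY UPDATE` statement on MySQL, so the key column must be backed by a unique index for conflict detection to work, and the update list must name real table columns. That is why it is built from `HEADER_MAP`'s values (the model columns), not its keys (the CSV header names) — the fix applied in the hunk above. A minimal sketch of the call shape, with `$rows`, `$updateColumns`, and the model namespace as illustrative assumptions:

```php
use App\Models\AtmTransaction; // assumed namespace, not shown in the diff

// $rows: up to CHUNK_SIZE arrays of column => value, as built by addToBatch().
// Assumes a unique index on atm_transactions.transaction_id; MySQL resolves
// conflicts via INSERT ... ON DUPLICATE KEY UPDATE in one round trip per chunk.
AtmTransaction::upsert(
    $rows,
    ['transaction_id'], // unique-by column(s); MySQL actually checks every unique index
    $updateColumns      // table columns rewritten when the row already exists
);
```

Recent Laravel versions also maintain `created_at`/`updated_at` inside `upsert()` when the model's timestamps are enabled, so the manual timestamp assignment in `addToBatch()` is defensive rather than strictly required.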
diff --git a/app/Jobs/ProcessDataCaptureDataJob.php b/app/Jobs/ProcessDataCaptureDataJob.php
index 4ed1429..09f57a7 100644
--- a/app/Jobs/ProcessDataCaptureDataJob.php
+++ b/app/Jobs/ProcessDataCaptureDataJob.php
@@ -20,6 +20,7 @@
     private const MAX_EXECUTION_TIME = 86400; // 24 hours in seconds
     private const FILENAME = 'ST.DATA.CAPTURE.csv';
     private const DISK_NAME = 'sftpStatement';
+    private const CHUNK_SIZE = 1000; // Process data in chunks to reduce memory usage
     private const CSV_HEADERS = [
         'id',
         'account_number',
@@ -66,6 +67,7 @@
     private string $period = '';
     private int $processedCount = 0;
     private int $errorCount = 0;
+    private array $captureBatch = [];
 
     /**
      * Create a new job instance.
@@ -103,6 +105,7 @@
         set_time_limit(self::MAX_EXECUTION_TIME);
         $this->processedCount = 0;
         $this->errorCount = 0;
+        $this->captureBatch = [];
     }
 
     private function processPeriod()
@@ -152,6 +155,8 @@
         }
 
         $rowCount = 0;
+        $chunkCount = 0;
+
         while (($row = fgetcsv($handle, 0, self::CSV_DELIMITER)) !== false) {
             $rowCount++;
 
@@ -161,6 +166,18 @@
             }
 
             $this->processRow($row, $rowCount, $filePath);
+
+            // Process in chunks to avoid memory issues
+            if (count($this->captureBatch) >= self::CHUNK_SIZE) {
+                $this->saveBatch();
+                $chunkCount++;
+                Log::info("Processed chunk $chunkCount ({$this->processedCount} records so far)");
+            }
+        }
+
+        // Process any remaining records
+        if (!empty($this->captureBatch)) {
+            $this->saveBatch();
         }
 
         fclose($handle);
@@ -178,7 +195,7 @@
         $data = array_combine(self::CSV_HEADERS, $row);
         $this->formatDates($data);
 
-        $this->saveRecord($data, $rowCount, $filePath);
+        $this->addToBatch($data, $rowCount, $filePath);
     }
 
     private function formatDates(array &$data)
@@ -207,15 +224,21 @@
         }
     }
 
-    private function saveRecord(array $data, int $rowCount, string $filePath)
+    /**
+     * Add record to batch instead of saving immediately
+     */
+    private function addToBatch(array $data, int $rowCount, string $filePath)
     : void
     {
         try {
             if (!empty($data['id'])) {
-                DataCapture::updateOrCreate(
-                    ['id' => $data['id']],
-                    $data
-                );
+                // Add timestamp fields
+                $now = now();
+                $data['created_at'] = $now;
+                $data['updated_at'] = $now;
+
+                // Add to capture batch
+                $this->captureBatch[] = $data;
                 $this->processedCount++;
             }
         } catch (Exception $e) {
@@ -224,6 +247,32 @@
         }
     }
 
+    /**
+     * Save batched records to the database
+     */
+    private function saveBatch()
+    : void
+    {
+        try {
+            if (!empty($this->captureBatch)) {
+                // Bulk insert/update data captures
+                DataCapture::upsert(
+                    $this->captureBatch,
+                    ['id'], // Unique key
+                    array_diff(self::CSV_HEADERS, ['id']) // Update columns
+                );
+
+                // Reset capture batch after processing
+                $this->captureBatch = [];
+            }
+        } catch (Exception $e) {
+            Log::error("Error in saveBatch: " . $e->getMessage());
+            $this->errorCount += count($this->captureBatch);
+            // Reset batch even if there's an error to prevent reprocessing the same failed records
+            $this->captureBatch = [];
+        }
+    }
+
     private function cleanup(string $tempFilePath)
     : void
     {
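One sizing consideration for `CHUNK_SIZE` (a reviewer note, not from the diff): a bulk upsert binds one placeholder per column per row, and MySQL's native prepared statements cap out at 65,535 placeholders (PDO's emulated prepares avoid that cap but still grow the packet toward `max_allowed_packet`). A wide table like the data-capture one multiplies up quickly, so deriving the chunk size from the column count is safer than a fixed constant. An illustrative guard, assuming it runs inside this job:

```php
// Illustrative only: keep rows-per-statement under the 65,535-placeholder
// limit of MySQL prepared statements; +2 covers created_at / updated_at.
$columnsPerRow = count(self::CSV_HEADERS) + 2;
$chunkSize     = min(self::CHUNK_SIZE, intdiv(65000, $columnsPerRow));
```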
diff --git a/app/Jobs/ProcessFundsTransferDataJob.php b/app/Jobs/ProcessFundsTransferDataJob.php
index 05d6220..91c1beb 100644
--- a/app/Jobs/ProcessFundsTransferDataJob.php
+++ b/app/Jobs/ProcessFundsTransferDataJob.php
@@ -20,10 +20,12 @@
     private const MAX_EXECUTION_TIME = 86400; // 24 hours in seconds
     private const FILENAME = 'ST.FUNDS.TRANSFER.csv';
     private const DISK_NAME = 'sftpStatement';
+    private const CHUNK_SIZE = 1000; // Process data in chunks to reduce memory usage
 
     private string $period = '';
     private int $processedCount = 0;
     private int $errorCount = 0;
+    private array $transferBatch = [];
 
     /**
      * Create a new job instance.
@@ -61,6 +63,7 @@
         set_time_limit(self::MAX_EXECUTION_TIME);
         $this->processedCount = 0;
         $this->errorCount = 0;
+        $this->transferBatch = [];
     }
 
     private function processPeriod()
@@ -111,10 +114,23 @@
         $headers = (new TempFundsTransfer())->getFillable();
 
         $rowCount = 0;
+        $chunkCount = 0;
 
         while (($row = fgetcsv($handle, 0, self::CSV_DELIMITER)) !== false) {
             $rowCount++;
             $this->processRow($row, $headers, $rowCount, $filePath);
+
+            // Process in chunks to avoid memory issues
+            if (count($this->transferBatch) >= self::CHUNK_SIZE) {
+                $this->saveBatch();
+                $chunkCount++;
+                Log::info("Processed chunk $chunkCount ({$this->processedCount} records so far)");
+            }
+        }
+
+        // Process any remaining records
+        if (!empty($this->transferBatch)) {
+            $this->saveBatch();
         }
 
         fclose($handle);
@@ -136,18 +152,24 @@
         }
 
         $data = array_combine($headers, $row);
-        $this->saveRecord($data, $rowCount, $filePath);
+        $this->addToBatch($data, $rowCount, $filePath);
     }
 
-    private function saveRecord(array $data, int $rowCount, string $filePath)
+    /**
+     * Add record to batch instead of saving immediately
+     */
+    private function addToBatch(array $data, int $rowCount, string $filePath)
     : void
     {
         try {
             if (isset($data['_id']) && $data['_id'] !== '_id') {
-                TempFundsTransfer::updateOrCreate(
-                    ['_id' => $data['_id']],
-                    $data
-                );
+                // Add timestamp fields
+                $now = now();
+                $data['created_at'] = $now;
+                $data['updated_at'] = $now;
+
+                // Add to transfer batch
+                $this->transferBatch[] = $data;
                 $this->processedCount++;
             }
         } catch (Exception $e) {
@@ -156,6 +178,32 @@
         }
     }
 
+    /**
+     * Save batched records to the database
+     */
+    private function saveBatch()
+    : void
+    {
+        try {
+            if (!empty($this->transferBatch)) {
+                // Bulk insert/update funds transfers
+                TempFundsTransfer::upsert(
+                    $this->transferBatch,
+                    ['_id'], // Unique key
+                    array_diff((new TempFundsTransfer())->getFillable(), ['_id']) // Update columns
+                );
+
+                // Reset transfer batch after processing
+                $this->transferBatch = [];
+            }
+        } catch (Exception $e) {
+            Log::error("Error in saveBatch: " . $e->getMessage());
+            $this->errorCount += count($this->transferBatch);
+            // Reset batch even if there's an error to prevent reprocessing the same failed records
+            $this->transferBatch = [];
+        }
+    }
+
     private function cleanup(string $tempFilePath)
     : void
    {
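A caveat that applies to all four `saveBatch()` implementations (again a reviewer note, not part of the diff): if one chunk contains the same key twice — for example a repeated `_id` in the CSV — MySQL's `ON DUPLICATE KEY UPDATE` applies the rows in order, but PostgreSQL-style `ON CONFLICT` rejects the whole statement ("cannot affect row a second time"). Keying the batch by the unique column instead of appending makes a chunk safe on either driver, with last-occurrence-wins semantics. A hypothetical variant, where `$updateColumns` stands in for the `array_diff(...)` expression above:

```php
// Hypothetical variant of addToBatch(): index by _id so a single chunk can
// never hold the same key twice (the last occurrence in the file wins).
$this->transferBatch[$data['_id']] = $data;

// ...and in saveBatch(), restore a plain list before upserting:
TempFundsTransfer::upsert(array_values($this->transferBatch), ['_id'], $updateColumns);
```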
diff --git a/app/Jobs/ProcessTellerDataJob.php b/app/Jobs/ProcessTellerDataJob.php
index 50d34e0..da2ed81 100644
--- a/app/Jobs/ProcessTellerDataJob.php
+++ b/app/Jobs/ProcessTellerDataJob.php
@@ -20,6 +20,7 @@
     private const MAX_EXECUTION_TIME = 86400; // 24 hours in seconds
     private const FILENAME = 'ST.TELLER.csv';
     private const DISK_NAME = 'sftpStatement';
+    private const CHUNK_SIZE = 1000; // Process data in chunks to reduce memory usage
     private const HEADER_MAP = [
         'id' => 'id_teller',
         'account_1' => 'account_1',
@@ -129,6 +130,7 @@
     private string $period = '';
     private int $processedCount = 0;
     private int $errorCount = 0;
+    private array $tellerBatch = [];
 
     /**
      * Create a new job instance.
@@ -166,6 +168,7 @@
         set_time_limit(self::MAX_EXECUTION_TIME);
         $this->processedCount = 0;
         $this->errorCount = 0;
+        $this->tellerBatch = [];
     }
 
     private function processPeriod()
@@ -222,9 +225,23 @@
         }
 
         $rowCount = 0;
+        $chunkCount = 0;
+
         while (($row = fgetcsv($handle, 0, self::CSV_DELIMITER)) !== false) {
             $rowCount++;
             $this->processRow($headerRow, $row, $rowCount, $filePath);
+
+            // Process in chunks to avoid memory issues
+            if (count($this->tellerBatch) >= self::CHUNK_SIZE) {
+                $this->saveBatch();
+                $chunkCount++;
+                Log::info("Processed chunk $chunkCount ({$this->processedCount} records so far)");
+            }
+        }
+
+        // Process any remaining records
+        if (!empty($this->tellerBatch)) {
+            $this->saveBatch();
         }
 
         fclose($handle);
@@ -234,6 +251,14 @@
     private function processRow(array $headerRow, array $row, int $rowCount, string $filePath)
     : void
     {
+        // Skip if row doesn't have enough columns
+        if (count($headerRow) !== count($row)) {
+            Log::warning("Row $rowCount in $filePath has incorrect column count. Expected: " .
+                count($headerRow) . ", Got: " . count($row));
+            $this->errorCount++;
+            return;
+        }
+
         // Combine the header row with the data row
         $rawData = array_combine($headerRow, $row);
 
@@ -249,10 +274,13 @@
         }
 
         try {
-            $teller = Teller::firstOrNew(['id_teller' => $data['id_teller']]);
-            $teller->fill($data);
-            $teller->save();
+            // Add timestamps
+            $now = now();
+            $data['created_at'] = $now;
+            $data['updated_at'] = $now;
 
+            // Add to batch for bulk processing
+            $this->tellerBatch[] = $data;
             $this->processedCount++;
         } catch (Exception $e) {
             Log::error("Error processing Teller at row $rowCount in $filePath: " . $e->getMessage());
@@ -260,6 +288,28 @@
         }
     }
 
+    private function saveBatch(): void
+    {
+        try {
+            if (!empty($this->tellerBatch)) {
+                // Bulk insert/update teller records
+                Teller::upsert(
+                    $this->tellerBatch,
+                    ['id_teller'], // Unique key
+                    array_diff(array_values(self::HEADER_MAP), ['id_teller']) // Update columns
+                );
+
+                // Reset batch after processing
+                $this->tellerBatch = [];
+            }
+        } catch (Exception $e) {
+            Log::error("Error in saveBatch: " . $e->getMessage());
+            $this->errorCount += count($this->tellerBatch);
+            // Reset batch even if there's an error to prevent reprocessing the same failed records
+            $this->tellerBatch = [];
+        }
+    }
+
     private function logJobCompletion()
     : void
     {
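After this change the four jobs carry four near-identical copies of the batching machinery, differing only in model class, unique key, and update-column list. A follow-up consolidation could move it into one trait. A minimal sketch under stated assumptions — the trait name `BatchesUpserts`, the `$batch` property, and `flushBatch()` are hypothetical, and it relies on the using job already defining `$errorCount`, as all four do:

```php
<?php

namespace App\Jobs\Concerns; // assumed namespace

use Exception;
use Illuminate\Support\Facades\Log;

// Hypothetical follow-up refactor, not part of this diff: shared batching logic
// currently duplicated by ProcessAtmTransactionJob, ProcessDataCaptureDataJob,
// ProcessFundsTransferDataJob, and ProcessTellerDataJob.
trait BatchesUpserts
{
    /** @var array<int, array<string, mixed>> */
    private array $batch = [];

    private function addToBatch(array $data): void
    {
        // Stamp both timestamp columns explicitly, mirroring the jobs above;
        // upsert() bypasses Eloquent model events and mutators.
        $now = now();
        $data['created_at'] = $now;
        $data['updated_at'] = $now;
        $this->batch[] = $data;
    }

    /**
     * @param class-string<\Illuminate\Database\Eloquent\Model> $model
     * @param list<string> $uniqueBy
     * @param list<string> $updateColumns
     */
    private function flushBatch(string $model, array $uniqueBy, array $updateColumns): void
    {
        if (empty($this->batch)) {
            return;
        }

        try {
            $model::upsert($this->batch, $uniqueBy, $updateColumns);
        } catch (Exception $e) {
            Log::error("Error flushing batch for {$model}: " . $e->getMessage());
            $this->errorCount += count($this->batch);
        } finally {
            // Always clear, so a failed chunk is never retried on the next flush
            $this->batch = [];
        }
    }
}
```

Each job would then call `$this->flushBatch(AtmTransaction::class, ['transaction_id'], $updateColumns)` (and the equivalents for the other models) both inside the read loop and once after it, exactly as the per-job `saveBatch()` methods do today.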