Skip to content

Commit

Permalink
implement PayKonnect 360 (#53)
Browse files Browse the repository at this point in the history
* implement PayKonnect 360

* add tests

* fix lint

* remove comments

* remove dob error in paykonnect if empty

* version update

* initial pr review fixes

* split 180 and 360 into different files

* fix lint and tests 1

* fix lint and tests 2

* fix lint and tests 3

* fix lint and tests 4

* fix lint

* updates based on pr review

* unsorted import lint fix

* remove return

* return types
  • Loading branch information
kshitesh authored Apr 2, 2024
1 parent 8b5dfbf commit eac0ba7
Show file tree
Hide file tree
Showing 11 changed files with 395 additions and 167 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@
from datetime import datetime
from decimal import ROUND_HALF_UP, Decimal
from enum import Enum
from io import IOBase, StringIO
from typing import Any, Optional, Union
from io import StringIO

from flux_sdk.flux_core.data_models import (
ContributionType,
DeductionType,
Employee,
File,
)
Expand All @@ -18,9 +16,6 @@
PayrollRunContribution,
PayrollUploadSettings,
)
from flux_sdk.pension.capabilities.update_deduction_elections.data_models import (
EmployeeDeductionSetting,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -62,23 +57,6 @@
"EMPLOYEE WORK EMAIL",
]

COLUMNS_360 = [
"RecordType",
"PlanId",
"EmployeeLastName",
"EmployeeFirstName",
"EmployeeMiddleInitial",
"EmployeeSSN",
"EffectiveDate",
"ContributionCode",
"DeferralPercent",
"DeferralAmount",
"EmployeeEligibilityDate",
"LoanNumber",
"LoanPaymentAmount",
"TotalLoanAmount",
]

STANDARD_DATE_FORMAT = "%m/%d/%Y"
TWO_PLACES = Decimal(".01")

Expand Down Expand Up @@ -403,141 +381,3 @@ def format_contributions_for_ascensus_vendor(

file.content = ReportPayrollContributionsAscensusUtil.to_bytes(header + output.getvalue())
return file


class UpdateDeductionElectionsAscensusUtil:
"""
This class represents the "update deduction elections" capability for vendors utilizing
the Ascensus. The developer is supposed to implement
parse_deductions_for_ascensus method in their implementation. For further details regarding their
implementation details, check their documentation.
"""

@staticmethod
def _create_eds_for_value(
deduction_type: DeductionType,
value: Union[str, Decimal],
percentage: bool,
ssn: str,
effective_date: datetime,
) -> EmployeeDeductionSetting:
eds = EmployeeDeductionSetting()
eds.ssn = ssn
eds.effective_date = effective_date
eds.deduction_type = deduction_type
eds.value = Decimal(value) # type: ignore
eds.is_percentage = percentage
return eds

@staticmethod
def _is_valid_amount(value) -> bool:
try:
Decimal(value)
return True
except Exception:
return False

@staticmethod
def get_deduction_type(given_ded_type) -> Optional[DeductionType]:
ded_match_map = {
"4ROTH": DeductionType.ROTH_401K,
"4ROTC": DeductionType.ROTH_401K,
"401K": DeductionType._401K,
"401KC": DeductionType._401K,
"401L": DeductionType._401K_LOAN_PAYMENT,
"403B": DeductionType._403B,
"401A": DeductionType.AFTER_TAX_401K,
"401O": DeductionType._401K,
}
return ded_match_map.get(given_ded_type, None)

@staticmethod
def _parse_deduction_rows(
row: dict[str, Any], result: list[EmployeeDeductionSetting]
) -> list[EmployeeDeductionSetting]:
ssn = row["EmployeeSSN"]
deduction_type = UpdateDeductionElectionsAscensusUtil.get_deduction_type(row["ContributionCode"])
eligibility_date = (
datetime.strptime(row["EmployeeEligibilityDate"], "%m%d%Y")
if row["EmployeeEligibilityDate"]
else datetime.now()
)

if (
UpdateDeductionElectionsAscensusUtil._is_valid_amount(row["DeferralAmount"])
and UpdateDeductionElectionsAscensusUtil._is_valid_amount(row["DeferralPercent"])
and deduction_type
):
result.append(
UpdateDeductionElectionsAscensusUtil._create_eds_for_value(
deduction_type=deduction_type,
value=row["DeferralAmount"]
if row["DeferralAmount"] > row["DeferralPercent"]
else row["DeferralPercent"],
percentage=row["DeferralPercent"] > row["DeferralAmount"],
ssn=ssn,
effective_date=eligibility_date,
)
)

return result

@staticmethod
def _parse_loan_rows(row: dict[str, Any], ssn_to_loan_sum_map: dict[str, Decimal]) -> dict[str, Decimal]:
ssn = row["EmployeeSSN"]
if UpdateDeductionElectionsAscensusUtil._is_valid_amount(row["LoanPaymentAmount"]):
loan_value = Decimal(row["LoanPaymentAmount"])
if ssn in ssn_to_loan_sum_map:
ssn_to_loan_sum_map[ssn] += loan_value
else:
ssn_to_loan_sum_map[ssn] = loan_value

return ssn_to_loan_sum_map

@staticmethod
def parse_deductions_for_ascensus(uri: str, stream: IOBase) -> list[EmployeeDeductionSetting]:
"""
This method receives a stream from which the developer is expected to return a list of EmployeeDeductionSetting
for each employee identifier (SSN).
:param uri: Contains the path of file
:param stream: Contains the stream
:return: list[EmployeeDeductionSetting]
"""
result: list[EmployeeDeductionSetting] = []

try:
reader = csv.DictReader(stream) # type: ignore
except Exception as e:
logger.error(f"[UpdateDeductionElectionsImpl.parse_deductions] Parse deductions failed due to message {e}")
return result

ssn_to_loan_sum_map: dict[str, Decimal] = {}

for row in reader:
try:
ssn = row["EmployeeSSN"]
record_type = row["RecordType"]

if record_type == "D":
UpdateDeductionElectionsAscensusUtil._parse_deduction_rows(row, result)
elif record_type == "L":
UpdateDeductionElectionsAscensusUtil._parse_loan_rows(row, ssn_to_loan_sum_map)
else:
logger.error(f"Unknown transaction type in row: {row}")

except Exception as e:
logger.error(f"[UpdateDeductionElectionsImpl.parse_deductions] Parse row failed due to error {e}")

for ssn in ssn_to_loan_sum_map:
loan_sum = ssn_to_loan_sum_map[ssn]
result.append(
UpdateDeductionElectionsAscensusUtil._create_eds_for_value(
deduction_type=DeductionType._401K_LOAN_PAYMENT,
value=Decimal(loan_sum),
percentage=False,
ssn=ssn,
effective_date=datetime.now(),
)
)

return result
154 changes: 154 additions & 0 deletions flux_sdk/pension/utils/ascensus_update_deduction_elections.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import csv
import logging
from datetime import datetime
from decimal import Decimal
from io import IOBase
from typing import Any, Union

from flux_sdk.flux_core.data_models import (
DeductionType,
)
from flux_sdk.pension.capabilities.update_deduction_elections.data_models import (
EmployeeDeductionSetting,
)
from flux_sdk.pension.utils.common import (
RecordTypeKeys,
get_deduction_type,
)

logger = logging.getLogger(__name__)

COLUMNS_360 = [
"RecordType", ## 'D' represents Contribution Change, 'L' represents Loan
"PlanId", ## Plan ID or Contract number
"EmployeeLastName",
"EmployeeFirstName",
"EmployeeMiddleInitial",
"EmployeeSSN",
"EffectiveDate", ## The date that the change is effective
"ContributionCode",
"DeferralPercent",
"DeferralAmount",
"EmployeeEligibilityDate", ## The date the employee became eligible
"LoanNumber",
"LoanPaymentAmount",
"TotalLoanAmount",
]


class UpdateDeductionElectionsAscensusUtil:
"""
This class represents the "update deduction elections" capability for vendors utilizing
the Ascensus. The developer is supposed to implement
parse_deductions_for_ascensus method in their implementation. For further details regarding their
implementation details, check their documentation.
"""

@staticmethod
def _create_eds_for_value(
deduction_type: DeductionType,
value: Union[str, Decimal],
is_percentage: bool,
ssn: str,
effective_date: datetime,
) -> EmployeeDeductionSetting:
eds = EmployeeDeductionSetting()
eds.ssn = ssn
eds.effective_date = effective_date
eds.deduction_type = deduction_type
eds.value = Decimal(value) # type: ignore
eds.is_percentage = is_percentage
return eds

@staticmethod
def _is_valid_amount(value) -> bool:
try:
Decimal(value)
return True
except Exception:
return False

@staticmethod
def _parse_deduction_rows(row: dict[str, Any], result: list[EmployeeDeductionSetting]) -> None:
ssn = row["EmployeeSSN"]
deduction_type = get_deduction_type(row["ContributionCode"])
eligibility_date = (
datetime.strptime(row["EmployeeEligibilityDate"], "%m%d%Y")
if row["EmployeeEligibilityDate"]
else datetime.now()
)

if (
UpdateDeductionElectionsAscensusUtil._is_valid_amount(row["DeferralAmount"])
and UpdateDeductionElectionsAscensusUtil._is_valid_amount(row["DeferralPercent"])
and deduction_type
):
result.append(
UpdateDeductionElectionsAscensusUtil._create_eds_for_value(
deduction_type=deduction_type,
value=row["DeferralAmount"]
if row["DeferralAmount"] > row["DeferralPercent"]
else row["DeferralPercent"],
is_percentage=row["DeferralPercent"] > row["DeferralAmount"],
ssn=ssn,
effective_date=eligibility_date,
)
)

@staticmethod
def _parse_loan_rows(row: dict[str, Any], ssn_to_loan_sum_map: dict[str, Decimal]) -> None:
ssn = row["EmployeeSSN"]
if UpdateDeductionElectionsAscensusUtil._is_valid_amount(row["LoanPaymentAmount"]):
loan_value = Decimal(row["LoanPaymentAmount"])
if ssn in ssn_to_loan_sum_map:
ssn_to_loan_sum_map[ssn] += loan_value
else:
ssn_to_loan_sum_map[ssn] = loan_value

@staticmethod
def parse_deductions_for_ascensus(uri: str, stream: IOBase) -> list[EmployeeDeductionSetting]:
"""
This method receives a stream from which the developer is expected to return a list of EmployeeDeductionSetting
for each employee identifier (SSN).
:param uri: Contains the path of file
:param stream: Contains the stream
:return: list[EmployeeDeductionSetting]
"""
result: list[EmployeeDeductionSetting] = []

try:
reader = csv.DictReader(stream) # type: ignore
except Exception as e:
logger.error(f"[UpdateDeductionElectionsImpl.parse_deductions] Parse deductions failed due to message {e}")
return result

ssn_to_loan_sum_map: dict[str, Decimal] = {}

for row in reader:
try:
ssn = row["EmployeeSSN"]
record_type = row["RecordType"]

if record_type == RecordTypeKeys.DeductionType.value:
UpdateDeductionElectionsAscensusUtil._parse_deduction_rows(row, result)
elif record_type == RecordTypeKeys.LoanType.value:
UpdateDeductionElectionsAscensusUtil._parse_loan_rows(row, ssn_to_loan_sum_map)
else:
logger.error(f"Unknown transaction type in row: {row}")

except Exception as e:
logger.error(f"[UpdateDeductionElectionsImpl.parse_deductions] Parse row failed due to error {e}")

for ssn in ssn_to_loan_sum_map:
loan_sum = ssn_to_loan_sum_map[ssn]
result.append(
UpdateDeductionElectionsAscensusUtil._create_eds_for_value(
deduction_type=DeductionType._401K_LOAN_PAYMENT,
value=Decimal(loan_sum),
is_percentage=False,
ssn=ssn,
effective_date=datetime.now(),
)
)

return result
23 changes: 23 additions & 0 deletions flux_sdk/pension/utils/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from enum import Enum
from typing import Optional

from flux_sdk.flux_core.data_models import DeductionType


class RecordTypeKeys(Enum):
DeductionType = "D"
LoanType = "L"


def get_deduction_type(given_ded_type: str) -> Optional[DeductionType]:
ded_match_map = {
"4ROTH": DeductionType.ROTH_401K,
"4ROTC": DeductionType.ROTH_401K,
"401K": DeductionType._401K,
"401KC": DeductionType._401K,
"401L": DeductionType._401K_LOAN_PAYMENT,
"403B": DeductionType._403B,
"401A": DeductionType.AFTER_TAX_401K,
"401O": DeductionType._401K,
}
return ded_match_map.get(given_ded_type, None)
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ def format_contributions_for_pay_konnect_vendor(
pay_type = ReportPayrollContributionsPayKonnectUtil._get_employee_pay_type(employee)
termination_date = getattr(employee, "termination_date", None)
termination_date = termination_date.strftime(STANDARD_DATE_FORMAT) if termination_date else ""
birth_day = employee.dob.strftime(STANDARD_DATE_FORMAT)
birth_day = employee.dob.strftime(STANDARD_DATE_FORMAT) if employee.dob else ""
phone_number = employee.phone_number if employee.phone_number else ""
rehire_date = employee.start_date.strftime(STANDARD_DATE_FORMAT)
hire_date = employee.original_hire_date.strftime(STANDARD_DATE_FORMAT)
Expand Down
Loading

0 comments on commit eac0ba7

Please sign in to comment.