Add logs and some bug fixes
This commit is contained in:
parent
3c0fbb4969
commit
966e610fee
147
src/gpt.py
147
src/gpt.py
@ -12,20 +12,65 @@ from langchain_core.prompt_values import StringPromptValue
|
||||
from langchain_core.prompts import ChatPromptTemplate
|
||||
from langchain_openai import ChatOpenAI
|
||||
from Levenshtein import distance
|
||||
import time
|
||||
from functools import wraps
|
||||
from openai import RateLimitError, OpenAIError, APIError
|
||||
|
||||
|
||||
import src.strings as strings
|
||||
from src.utils import logger
|
||||
|
||||
load_dotenv()
|
||||
|
||||
# Global timestamp for rate limiting
|
||||
last_call_time = 0
|
||||
|
||||
|
||||
def global_rate_limiter(min_interval):
    """Build a decorator that spaces out calls across all decorated functions.

    Every decorated callable shares the module-level ``last_call_time``
    timestamp, so the minimum gap applies globally, not per function.

    Args:
        min_interval: Minimum number of seconds between two consecutive calls.

    Returns:
        A decorator whose wrapper sleeps when needed, then invokes the wrapped
        callable and returns its result.
    """

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            global last_call_time
            since_last = time.time() - last_call_time
            remaining = min_interval - since_last
            if since_last < min_interval:
                logger.debug("Rate limit hit, sleeping for %s seconds", remaining)
                time.sleep(remaining)
            # Stamp before the call, so the gap is measured start-to-start.
            last_call_time = time.time()
            return func(*args, **kwargs)

        return wrapper

    return decorator
|
||||
|
||||
def parse_wait_time_from_error_message(error_message: str) -> int:
    """Extract the retry delay, in whole seconds, from a rate-limit message.

    The message is searched for ``Please try again in <duration>``, where the
    duration is one or more ``<number><unit>`` parts written back to back
    (for example ``20s``, ``1.5s``, ``6m0s`` or ``250ms``). Supported units
    are ``ms``, ``s``, ``m``, ``h`` and ``d``.

    Args:
        error_message: Text of the error raised by the API client.

    Returns:
        The delay rounded up to a whole number of seconds, or 30 when no
        duration can be found in the message.
    """
    import math  # local import: the module's import block is only partly visible here

    logger.debug("Parsing wait time from error message: %s", error_message)
    seconds_per_unit = {"ms": 0.001, "s": 1, "m": 60, "h": 3600, "d": 86400}
    # "ms" is listed before the single-letter units so "86ms" is never read as
    # 86 minutes followed by a stray "s".
    match = re.search(
        r"Please try again in ((?:\d+(?:\.\d+)?(?:ms|[smhd]))+)", error_message
    )
    if match:
        parts = re.findall(r"(\d+(?:\.\d+)?)(ms|[smhd])", match.group(1))
        total = sum(float(value) * seconds_per_unit[unit] for value, unit in parts)
        # Round first so float noise (e.g. 138.00000000000003) does not add a second.
        wait_seconds = math.ceil(round(total, 3))
        logger.debug("Extracted wait time: %d %s", wait_seconds, "s")
        return wait_seconds
    logger.debug("Default wait time applied: 30 seconds")
    return 30  # Wait 30 seconds by default when the delay cannot be parsed.
|
||||
|
||||
|
||||
class LLMLogger:
|
||||
|
||||
def __init__(self, llm: ChatOpenAI):
    """Keep a reference to the chat model and log the initialization.

    Args:
        llm: The chat model instance stored on ``self.llm``.
    """
    logger.debug("LLMLogger initialized with LLM: %s", llm)
    self.llm = llm
|
||||
|
||||
@staticmethod
|
||||
def log_request(prompts, parsed_reply: Dict[str, Dict]):
|
||||
logger.debug("Logging request with prompts: %s", prompts)
|
||||
calls_log = os.path.join(Path("data_folder/output"), "open_ai_calls.json")
|
||||
|
||||
if isinstance(prompts, StringPromptValue):
|
||||
prompts = prompts.text
|
||||
elif isinstance(prompts, Dict):
|
||||
@ -41,6 +86,7 @@ class LLMLogger:
|
||||
}
|
||||
|
||||
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
logger.debug("Current time: %s", current_time)
|
||||
|
||||
# Extract token usage details from the response
|
||||
token_usage = parsed_reply["usage_metadata"]
|
||||
@ -48,7 +94,8 @@ class LLMLogger:
|
||||
input_tokens = token_usage["input_tokens"]
|
||||
total_tokens = token_usage["total_tokens"]
|
||||
|
||||
# Extract model details from the response
|
||||
logger.debug("Token usage - Input: %d, Output: %d, Total: %d", input_tokens, output_tokens, total_tokens)
|
||||
|
||||
model_name = parsed_reply["response_metadata"]["model_name"]
|
||||
prompt_price_per_token = 0.00000015
|
||||
completion_price_per_token = 0.0000006
|
||||
@ -58,7 +105,8 @@ class LLMLogger:
|
||||
output_tokens * completion_price_per_token
|
||||
)
|
||||
|
||||
# Create a log entry with all relevant information
|
||||
logger.debug("Total cost calculated: %f", total_cost)
|
||||
|
||||
log_entry = {
|
||||
"model": model_name,
|
||||
"time": current_time,
|
||||
@ -70,26 +118,41 @@ class LLMLogger:
|
||||
"total_cost": total_cost,
|
||||
}
|
||||
|
||||
# Write the log entry to the log file in JSON format
|
||||
logger.debug("Log entry created: %s", log_entry)
|
||||
|
||||
with open(calls_log, "a", encoding="utf-8") as f:
|
||||
json_string = json.dumps(log_entry, ensure_ascii=False, indent=4)
|
||||
f.write(json_string + "\n")
|
||||
logger.debug("Log entry written to file: %s", calls_log)
|
||||
|
||||
|
||||
class LoggerChatModel:
|
||||
|
||||
def __init__(self, llm: ChatOpenAI):
    """Wrap a chat model so that each call through this object can be logged.

    Args:
        llm: The chat model instance stored on ``self.llm``.
    """
    logger.debug("LoggerChatModel initialized with LLM: %s", llm)
    self.llm = llm
|
||||
|
||||
def __call__(self, messages: List[Dict[str, str]]) -> str:
    """Call the wrapped LLM, log the exchange, and retry on rate limits.

    The call is repeated for as long as the client raises ``RateLimitError``;
    before each retry the method sleeps for the delay parsed from the error
    text. Any other exception is logged and re-raised unchanged.

    Args:
        messages: The prompt passed straight through to ``self.llm``.

    Returns:
        The reply object produced by ``self.llm(messages)``, unmodified.
        NOTE(review): the ``-> str`` annotation is kept for compatibility, but
        ``parse_llmresult`` reads ``.content`` from this reply, so it does not
        look like a plain string - confirm.

    Raises:
        Exception: whatever the model call, the reply parsing or the request
            logging raises, other than ``RateLimitError``.
    """
    logger.debug("Calling LoggerChatModel with messages: %s", messages)
    while True:
        try:
            # Attempt the model call; everything up to the return is guarded
            # so failures in parsing/logging are reported as well.
            reply = self.llm(messages)
            logger.debug("Model reply received: %s", reply)
            parsed_reply = self.parse_llmresult(reply)
            LLMLogger.log_request(prompts=messages, parsed_reply=parsed_reply)
            return reply
        except RateLimitError as err:
            # Back off for the server-suggested delay, then loop and retry.
            wait_time = self.parse_wait_time_from_error_message(str(err))
            logger.warning("Rate limit exceeded. Waiting for %d seconds before retrying...", wait_time)
            time.sleep(wait_time)
        except Exception as e:
            logger.error("Unexpected error occurred: %s", str(e))
            raise
|
||||
|
||||
def parse_llmresult(self, llmresult: AIMessage) -> Dict[str, Dict]:
|
||||
# Parse the LLM result into a structured format.
|
||||
logger.debug("Parsing LLM result: %s", llmresult)
|
||||
content = llmresult.content
|
||||
response_metadata = llmresult.response_metadata
|
||||
id_ = llmresult.id
|
||||
@ -109,61 +172,93 @@ class LoggerChatModel:
|
||||
"total_tokens": usage_metadata.get("total_tokens", 0),
|
||||
},
|
||||
}
|
||||
logger.debug("Parsed LLM result: %s", parsed_result)
|
||||
return parsed_result
|
||||
|
||||
def parse_wait_time_from_error_message(self, error_message: str) -> int:
    """Extract the retry delay, in whole seconds, from a rate-limit message.

    The message is searched for ``Please try again in <duration>``, where the
    duration is one or more ``<number><unit>`` parts written back to back
    (for example ``20s``, ``1.5s``, ``6m0s`` or ``250ms``). Supported units
    are ``ms``, ``s``, ``m``, ``h`` and ``d``.

    Args:
        error_message: Text of the error raised by the API client.

    Returns:
        The delay rounded up to a whole number of seconds, or 30 when no
        duration can be found in the message.
    """
    import math  # local import: the module's import block is only partly visible here

    logger.debug("Parsing wait time from error message: %s", error_message)
    seconds_per_unit = {"ms": 0.001, "s": 1, "m": 60, "h": 3600, "d": 86400}
    # "ms" is listed before the single-letter units so "86ms" is never read as
    # 86 minutes followed by a stray "s".
    match = re.search(
        r"Please try again in ((?:\d+(?:\.\d+)?(?:ms|[smhd]))+)", error_message
    )
    if match:
        parts = re.findall(r"(\d+(?:\.\d+)?)(ms|[smhd])", match.group(1))
        total = sum(float(value) * seconds_per_unit[unit] for value, unit in parts)
        # Round first so float noise (e.g. 138.00000000000003) does not add a second.
        wait_seconds = math.ceil(round(total, 3))
        logger.debug("Extracted wait time: %d %s", wait_seconds, "s")
        return wait_seconds
    logger.debug("Default wait time applied: 30 seconds")
    return 30
|
||||
|
||||
|
||||
class GPTAnswerer:
|
||||
def __init__(self, openai_api_key):
    """Create the answerer backed by a logged ``gpt-4o-mini`` chat model.

    Args:
        openai_api_key: API key handed to ``ChatOpenAI``.
    """
    chat_model = ChatOpenAI(model_name="gpt-4o-mini", openai_api_key=openai_api_key, temperature=0.4)
    self.llm_cheap = LoggerChatModel(chat_model)
    logger.debug("GPTAnswerer initialized with API key")
|
||||
|
||||
@property
def job_description(self):
    """Return the ``description`` attribute of the job stored on ``self.job``."""
    current_job = self.job
    return current_job.description
|
||||
|
||||
@staticmethod
def find_best_match(text: str, options: list[str]) -> str:
    """Pick the option closest to ``text`` by case-insensitive edit distance.

    Args:
        text: The string to match (typically the raw model answer).
        options: Candidate strings; the first one with the smallest
            Levenshtein distance wins.

    Returns:
        The best-matching element of ``options``, unchanged.

    Raises:
        ValueError: if ``options`` is empty (raised by ``min``).
    """
    logger.debug("Finding best match for text: '%s' in options: %s", text, options)
    # Score every option up front, exactly once each, in list order.
    scores = [distance(text.lower(), candidate.lower()) for candidate in options]
    ranked = zip(scores, options)
    # ``min`` keeps the earliest entry on ties, so list order breaks ties.
    best_option = min(ranked, key=lambda pair: pair[0])[1]
    logger.debug("Best match found: %s", best_option)
    return best_option
|
||||
|
||||
@staticmethod
def _remove_placeholders(text: str) -> str:
    """Drop every ``PLACEHOLDER`` marker from ``text`` and trim the result.

    Args:
        text: Input string, logged before it is cleaned.

    Returns:
        The text without ``PLACEHOLDER`` occurrences and without leading or
        trailing whitespace.
    """
    logger.debug("Removing placeholders from text: %s", text)
    return text.replace("PLACEHOLDER", "").strip()
|
||||
|
||||
@staticmethod
def _preprocess_template_string(template: str) -> str:
    """Strip the common leading indentation from a prompt template.

    Args:
        template: Multi-line template text.

    Returns:
        The template as returned by ``textwrap.dedent``.
    """
    logger.debug("Preprocessing template string")
    dedented = textwrap.dedent(template)
    return dedented
|
||||
|
||||
def set_resume(self, resume):
    """Store ``resume`` on the answerer after logging it.

    Args:
        resume: The resume object later read by the answer_* methods.
    """
    logger.debug("Setting resume: %s", resume)
    setattr(self, "resume", resume)
|
||||
|
||||
def set_job(self, job):
    """Store ``job`` and attach a generated summary of its description.

    Args:
        job: Object exposing ``description`` and
            ``set_summarize_job_description``.
    """
    logger.debug("Setting job: %s", job)
    self.job = job
    # Resolve the setter before summarizing, so a job without it fails
    # before any model call is made.
    store_summary = self.job.set_summarize_job_description
    store_summary(self.summarize_job_description(self.job.description))
|
||||
|
||||
def set_job_application_profile(self, job_application_profile):
    """Store the job application profile on the answerer after logging it.

    Args:
        job_application_profile: Profile object kept on
            ``self.job_application_profile``.
    """
    logger.debug("Setting job application profile: %s", job_application_profile)
    setattr(self, "job_application_profile", job_application_profile)
|
||||
|
||||
|
||||
@global_rate_limiter(25)
def summarize_job_description(self, text: str) -> str:
    """Summarize a job description with the cheap chat model.

    Args:
        text: The job description, injected into the prompt as ``text``.

    Returns:
        The chain output after ``StrOutputParser``.
    """
    logger.debug("Summarizing job description: %s", text)
    # The dedented template is written back to the ``strings`` module, so the
    # module attribute holds the preprocessed text after the first call.
    dedented = self._preprocess_template_string(strings.summarize_prompt_template)
    strings.summarize_prompt_template = dedented
    prompt = ChatPromptTemplate.from_template(strings.summarize_prompt_template)
    pipeline = prompt | self.llm_cheap | StrOutputParser()
    summary = pipeline.invoke({"text": text})
    logger.debug("Summary generated: %s", summary)
    return summary
|
||||
|
||||
def _create_chain(self, template: str):
    """Compose prompt, cheap chat model and string parser into one chain.

    Args:
        template: Prompt template text for ``ChatPromptTemplate``.

    Returns:
        The chain ``prompt | self.llm_cheap | StrOutputParser()``.
    """
    logger.debug("Creating chain with template: %s", template)
    chain = ChatPromptTemplate.from_template(template) | self.llm_cheap | StrOutputParser()
    return chain
|
||||
|
||||
|
||||
@global_rate_limiter(25)
|
||||
def answer_question_textual_wide_range(self, question: str) -> str:
|
||||
# Define chains for each section of the resume
|
||||
logger.debug("Answering textual question: %s", question)
|
||||
chains = {
|
||||
"personal_information": self._create_chain(strings.personal_information_template),
|
||||
"self_identification": self._create_chain(strings.self_identification_template),
|
||||
@ -270,47 +365,66 @@ class GPTAnswerer:
|
||||
prompt = ChatPromptTemplate.from_template(section_prompt)
|
||||
chain = prompt | self.llm_cheap | StrOutputParser()
|
||||
output = chain.invoke({"question": question})
|
||||
logger.debug("Section determined from question: %s", output)
|
||||
section_name = output.lower().replace(" ", "_")
|
||||
if section_name == "cover_letter":
|
||||
chain = chains.get(section_name)
|
||||
output = chain.invoke({"resume": self.resume, "job_description": self.job_description})
|
||||
logger.debug("Cover letter generated: %s", output)
|
||||
return output
|
||||
resume_section = getattr(self.resume, section_name, None) or getattr(self.job_application_profile, section_name, None)
|
||||
if resume_section is None:
|
||||
logger.error("Section '%s' not found in either resume or job_application_profile.", section_name)
|
||||
raise ValueError(f"Section '{section_name}' not found in either resume or job_application_profile.")
|
||||
chain = chains.get(section_name)
|
||||
if chain is None:
|
||||
logger.error("Chain not defined for section '%s'", section_name)
|
||||
raise ValueError(f"Chain not defined for section '{section_name}'")
|
||||
return chain.invoke({"resume_section": resume_section, "question": question})
|
||||
output = chain.invoke({"resume_section": resume_section, "question": question})
|
||||
logger.debug("Question answered: %s", output)
|
||||
return output
|
||||
|
||||
@global_rate_limiter(25)
def answer_question_numeric(self, question: str, default_experience: int = 3) -> int:
    """Answer a numeric question from the resume, with a fallback value.

    Args:
        question: The question text sent to the model.
        default_experience: Value returned when the model output contains
            no digits.

    Returns:
        The first integer found in the model output, or
        ``default_experience`` if none is found.
    """
    logger.debug("Answering numeric question: %s", question)
    func_template = self._preprocess_template_string(strings.numeric_question_template)
    chain = ChatPromptTemplate.from_template(func_template) | self.llm_cheap | StrOutputParser()
    chain_inputs = {
        "resume_educations": self.resume.education_details,
        "resume_jobs": self.resume.experience_details,
        "resume_projects": self.resume.projects,
        "question": question,
    }
    output_str = chain.invoke(chain_inputs)
    logger.debug("Raw output for numeric question: %s", output_str)
    try:
        number = self.extract_number_from_string(output_str)
        logger.debug("Extracted number: %d", number)
        return number
    except ValueError:
        logger.warning("Failed to extract number, using default experience: %d", default_experience)
        return default_experience
|
||||
|
||||
def extract_number_from_string(self, output_str):
    """Return the first run of digits in ``output_str`` as an integer.

    Args:
        output_str: Text to scan with the pattern ``\\d+``.

    Returns:
        The first match converted with ``int``.

    Raises:
        ValueError: if the text contains no digits.
    """
    logger.debug("Extracting number from string: %s", output_str)
    numbers = re.findall(r"\d+", output_str)
    if not numbers:
        logger.error("No numbers found in the string")
        raise ValueError("No numbers found in the string")
    logger.debug("Numbers found: %s", numbers)
    return int(numbers[0])
|
||||
|
||||
@global_rate_limiter(25)
def answer_question_from_options(self, question: str, options: list[str]) -> str:
    """Ask the model to choose among ``options`` and snap to a valid one.

    Args:
        question: The question text sent to the model.
        options: The allowed answers.

    Returns:
        The element of ``options`` that ``find_best_match`` selects for the
        raw model output.
    """
    logger.debug("Answering question from options: %s", question)
    func_template = self._preprocess_template_string(strings.options_template)
    chain = ChatPromptTemplate.from_template(func_template) | self.llm_cheap | StrOutputParser()
    chain_inputs = {"resume": self.resume, "question": question, "options": options}
    output_str = chain.invoke(chain_inputs)
    logger.debug("Raw output for options question: %s", output_str)
    # The raw answer may not equal any option verbatim, so map it to one.
    best_option = self.find_best_match(output_str, options)
    logger.debug("Best option determined: %s", best_option)
    return best_option
|
||||
|
||||
|
||||
@global_rate_limiter(25)
|
||||
def resume_or_cover(self, phrase: str) -> str:
|
||||
# Define the prompt template
|
||||
logger.debug("Determining if phrase refers to resume or cover letter: %s", phrase)
|
||||
prompt_template = """
|
||||
Given the following phrase, respond with only 'resume' if the phrase is about a resume, or 'cover' if it's about a cover letter. Do not provide any additional information or explanations.
|
||||
|
||||
@ -319,6 +433,7 @@ class GPTAnswerer:
|
||||
prompt = ChatPromptTemplate.from_template(prompt_template)
|
||||
chain = prompt | self.llm_cheap | StrOutputParser()
|
||||
response = chain.invoke({"phrase": phrase})
|
||||
logger.debug("Response for resume_or_cover: %s", response)
|
||||
if "resume" in response:
|
||||
return "resume"
|
||||
elif "cover" in response:
|
||||
|
11
src/job.py
11
src/job.py
@ -1,5 +1,8 @@
|
||||
from dataclasses import dataclass
|
||||
|
||||
from src.utils import logger
|
||||
|
||||
|
||||
@dataclass
|
||||
class Job:
|
||||
title: str
|
||||
@ -13,18 +16,22 @@ class Job:
|
||||
recruiter_link: str = ""
|
||||
|
||||
def set_summarize_job_description(self, summarize_job_description):
    """Store the summarized job description on this job after logging it.

    Args:
        summarize_job_description: Summary kept on
            ``self.summarize_job_description``.
    """
    logger.debug("Setting summarized job description: %s", summarize_job_description)
    setattr(self, "summarize_job_description", summarize_job_description)
|
||||
|
||||
def set_job_description(self, description):
    """Store the job description on this job after logging it.

    Args:
        description: Text kept on ``self.description``.
    """
    logger.debug("Setting job description: %s", description)
    setattr(self, "description", description)
|
||||
|
||||
def set_recruiter_link(self, recruiter_link):
    """Store the recruiter link on this job after logging it.

    Args:
        recruiter_link: Value kept on ``self.recruiter_link``.
    """
    logger.debug("Setting recruiter link: %s", recruiter_link)
    setattr(self, "recruiter_link", recruiter_link)
|
||||
|
||||
def formatted_job_information(self):
|
||||
"""
|
||||
Formats the job information as a markdown string.
|
||||
"""
|
||||
logger.debug("Formatting job information for job: %s at %s", self.title, self.company)
|
||||
job_information = f"""
|
||||
# Job Description
|
||||
## Job Information
|
||||
@ -36,4 +43,6 @@ class Job:
|
||||
## Description
|
||||
{self.description or 'No description provided.'}
|
||||
"""
|
||||
return job_information.strip()
|
||||
formatted_information = job_information.strip()
|
||||
logger.debug("Formatted job information: %s", formatted_information)
|
||||
return formatted_information
|
||||
|
@ -2,6 +2,9 @@ from dataclasses import dataclass
|
||||
from typing import Dict, List
|
||||
import yaml
|
||||
|
||||
from src.utils import logger
|
||||
|
||||
|
||||
@dataclass
|
||||
class SelfIdentification:
|
||||
gender: str
|
||||
@ -47,86 +50,122 @@ class JobApplicationProfile:
|
||||
salary_expectations: SalaryExpectations
|
||||
|
||||
def __init__(self, yaml_str: str):
    """Parse ``yaml_str`` and populate every profile section.

    Args:
        yaml_str: YAML text whose top level is a mapping with the keys
            ``self_identification``, ``legal_authorization``,
            ``work_preferences``, ``availability`` and
            ``salary_expectations``.

    Raises:
        ValueError: the text is not valid YAML.
        TypeError: the parsed document is not a dictionary, or a section's
            data does not fit its dataclass.
        KeyError: a section is missing from the document.
        AttributeError: attribute failure while building a section.
        RuntimeError: any other failure while parsing or building a section.
    """
    logger.debug("Initializing JobApplicationProfile with provided YAML string")
    try:
        data = yaml.safe_load(yaml_str)
        logger.debug("YAML data successfully parsed: %s", data)
    except yaml.YAMLError as e:
        logger.error("Error parsing YAML file: %s", e)
        raise ValueError("Error parsing YAML file.") from e
    except Exception as e:
        logger.error("Unexpected error occurred while parsing the YAML file: %s", e)
        raise RuntimeError("An unexpected error occurred while parsing the YAML file.") from e

    if not isinstance(data, dict):
        logger.error("YAML data must be a dictionary, received: %s", type(data))
        raise TypeError("YAML data must be a dictionary.")

    def load_section(name, section_cls):
        """Build ``section_cls`` from ``data[name]`` and store it as ``self.<name>``."""
        try:
            logger.debug("Processing %s", name)
            setattr(self, name, section_cls(**data[name]))
            logger.debug("%s processed: %s", name, getattr(self, name))
        except KeyError as e:
            logger.error("Required field %s is missing in %s data.", e, name)
            raise KeyError(f"Required field {e} is missing in {name} data.") from e
        except TypeError as e:
            logger.error("Error in %s data: %s", name, e)
            raise TypeError(f"Error in {name} data: {e}") from e
        except AttributeError as e:
            logger.error("Attribute error in %s processing: %s", name, e)
            raise AttributeError(f"Attribute error in {name} processing.") from e
        except Exception as e:
            logger.error("An unexpected error occurred while processing %s: %s", name, e)
            raise RuntimeError(f"An unexpected error occurred while processing {name}.") from e

    # Sections are loaded in this fixed order; the first failure aborts.
    load_section("self_identification", SelfIdentification)
    load_section("legal_authorization", LegalAuthorization)
    load_section("work_preferences", WorkPreferences)
    load_section("availability", Availability)
    load_section("salary_expectations", SalaryExpectations)

    logger.debug("JobApplicationProfile initialization completed successfully.")
|
||||
|
||||
def __str__(self):
    """Render the profile as a human-readable, multi-line string.

    Returns:
        Text listing every field of the self-identification,
        legal-authorization and work-preferences sections, followed by the
        notice period and the USD salary range.
    """
    logger.debug("Generating string representation of JobApplicationProfile")

    def format_dataclass(obj):
        """Format each dataclass field of ``obj`` as a ``name: value`` line."""
        return "\n".join(f"{field.name}: {getattr(obj, field.name)}" for field in obj.__dataclass_fields__.values())

    formatted_str = (f"Self Identification:\n{format_dataclass(self.self_identification)}\n\n"
                     f"Legal Authorization:\n{format_dataclass(self.legal_authorization)}\n\n"
                     f"Work Preferences:\n{format_dataclass(self.work_preferences)}\n\n"
                     f"Availability: {self.availability.notice_period}\n\n"
                     f"Salary Expectations: {self.salary_expectations.salary_range_usd}\n\n")
    logger.debug("String representation generated: %s", formatted_str)
    return formatted_str
|
||||
|
@ -1,77 +1,142 @@
|
||||
import random
|
||||
import time
|
||||
from selenium.common.exceptions import NoSuchElementException, TimeoutException
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.support.ui import WebDriverWait
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
|
||||
from src.utils import logger
|
||||
|
||||
|
||||
class LinkedInAuthenticator:
    """Log a Selenium-driven browser session in to LinkedIn.

    The driver is supplied by the caller; credentials are set with
    ``set_secrets`` and the flow is started with ``start``.
    """

    def __init__(self, driver=None):
        """Store the WebDriver and start with empty credentials.

        Args:
            driver: Selenium WebDriver used for every page interaction.
        """
        self.driver = driver
        self.email = ""
        self.password = ""
        logger.debug("LinkedInAuthenticator initialized with driver: %s", driver)

    def set_secrets(self, email, password):
        """Store the credentials typed into the login form later on."""
        self.email = email
        self.password = password
        logger.debug("Secrets set with email: %s", email)

    def start(self):
        """Open the LinkedIn feed and log in if no session is active."""
        logger.info("Starting Chrome browser to log in to LinkedIn.")
        self.driver.get('https://www.linkedin.com/feed')
        self.wait_for_page_load()
        if not self.is_logged_in():
            self.handle_login()

    def handle_login(self):
        """Submit the login form, then wait for a possible security check."""
        logger.info("Navigating to the LinkedIn login page...")
        self.driver.get("https://www.linkedin.com/login")
        try:
            self.enter_credentials()
            self.submit_login_form()
        except NoSuchElementException as e:
            logger.error("Could not log in to LinkedIn. Element not found: %s", e)
        # NOTE(review): the source lost its indentation; the pause and the
        # security check are placed after the try block so they also run on a
        # successful submit - confirm against the repository.
        time.sleep(random.uniform(3, 5))
        self.handle_security_check()

    def enter_credentials(self):
        """Type the stored email and password into the login form."""
        try:
            logger.debug("Entering credentials...")
            email_field = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.ID, "username"))
            )
            email_field.send_keys(self.email)
            logger.debug("Email entered: %s", self.email)
            password_field = self.driver.find_element(By.ID, "password")
            password_field.send_keys(self.password)
            logger.debug("Password entered.")
        except TimeoutException:
            logger.error("Login form not found. Aborting login.")
            print("Login form not found. Aborting login.")

    def submit_login_form(self):
        """Click the submit button of the login form."""
        try:
            logger.debug("Submitting login form...")
            login_button = self.driver.find_element(By.XPATH, '//button[@type="submit"]')
            login_button.click()
            logger.debug("Login form submitted.")
        except NoSuchElementException:
            logger.error("Login button not found. Please verify the page structure.")
            print("Login button not found. Please verify the page structure.")

    def handle_security_check(self):
        """Wait for the user to pass a checkpoint challenge, if one appears.

        Waits up to 10 seconds for the checkpoint URL, then up to 300 seconds
        for the feed URL. A timeout in either wait is logged and printed, not
        raised.
        """
        try:
            logger.debug("Handling security check...")
            WebDriverWait(self.driver, 10).until(
                EC.url_contains('https://www.linkedin.com/checkpoint/challengesV2/')
            )
            logger.warning("Security checkpoint detected. Please complete the challenge.")
            print("Security checkpoint detected. Please complete the challenge.")
            WebDriverWait(self.driver, 300).until(
                EC.url_contains('https://www.linkedin.com/feed/')
            )
            logger.info("Security check completed")
            print("Security check completed")
        except TimeoutException:
            logger.error("Security check not completed within the timeout.")
            print("Security check not completed. Please try again later.")

    def is_logged_in(self):
        """Report whether the feed page shows a logged-in user.

        Returns:
            True only when the "Start a post" button, a profile image and a
            profile name are all found on the feed page; False otherwise,
            including when any wait times out.
        """
        target_url = 'https://www.linkedin.com/feed'

        # Navigate only when not already on the feed. The trailing slash is
        # ignored because this class waits for the '/feed/' form elsewhere.
        if self.driver.current_url.rstrip('/') != target_url:
            logger.debug("Navigating to target URL: %s", target_url)
            self.driver.get(target_url)

        try:
            logger.debug("Checking if user is logged in...")
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.CLASS_NAME, 'share-box-feed-entry__trigger'))
            )

            # Check for the presence of the "Start a post" button.
            buttons = self.driver.find_elements(By.CLASS_NAME, 'share-box-feed-entry__trigger')
            if any(button.text.strip() == 'Start a post' for button in buttons):
                logger.info("User is already logged in.")

                try:
                    # Wait for the profile picture and name to load.
                    profile_img = WebDriverWait(self.driver, 10).until(
                        EC.presence_of_element_located((By.XPATH, "//img[contains(@alt, 'Photo of')]"))
                    )
                    profile_name = WebDriverWait(self.driver, 10).until(
                        EC.presence_of_element_located((By.XPATH, "//div[@class='t-16 t-black t-bold']"))
                    )

                    if profile_img and profile_name:
                        logger.info("Profile picture found for user: %s", profile_name.text)
                        return True
                except NoSuchElementException:
                    logger.warning("Profile picture or name not found.")
                    print("Profile picture or name not found.")
                    return False
                except TimeoutException:
                    logger.warning("Profile picture or name took too long to load.")
                    print("Profile picture or name took too long to load.")
                    return False

        except TimeoutException:
            logger.error("Page elements took too long to load or were not found.")
            print("Page elements took too long to load or were not found.")
            return False

        return False

    def wait_for_page_load(self, timeout=10):
        """Block until ``document.readyState`` is ``complete`` or time runs out.

        Args:
            timeout: Maximum number of seconds to wait; a timeout is logged
                and printed, not raised.
        """
        try:
            logger.debug("Waiting for page to load with timeout: %s seconds", timeout)
            WebDriverWait(self.driver, timeout).until(
                lambda d: d.execute_script('return document.readyState') == 'complete'
            )
            logger.debug("Page load completed.")
        except TimeoutException:
            logger.error("Page load timed out.")
            print("Page load timed out.")
|
||||
|
@ -1,8 +1,13 @@
|
||||
from src.utils import logger
|
||||
|
||||
|
||||
class LinkedInBotState:
|
||||
def __init__(self):
    """Create the state object with every flag in its reset value."""
    logger.debug("Initializing LinkedInBotState")
    # ``reset`` defines the flags, so construction simply delegates to it.
    type(self).reset(self)
|
||||
|
||||
def reset(self):
|
||||
logger.debug("Resetting LinkedInBotState")
|
||||
self.credentials_set = False
|
||||
self.api_key_set = False
|
||||
self.job_application_profile_set = False
|
||||
@ -11,12 +16,16 @@ class LinkedInBotState:
|
||||
self.logged_in = False
|
||||
|
||||
def validate_state(self, required_keys):
    """Check that every named state flag is truthy.

    Args:
        required_keys: Attribute names on this object to check, in order.

    Raises:
        ValueError: for the first falsy flag; the message is the flag name
            with underscores replaced by spaces and capitalized, followed by
            " must be set before proceeding.".
    """
    logger.debug("Validating LinkedInBotState with required keys: %s", required_keys)
    for key in required_keys:
        if getattr(self, key):
            continue
        logger.error("State validation failed: %s is not set", key)
        raise ValueError(f"{key.replace('_', ' ').capitalize()} must be set before proceeding.")
    logger.debug("State validation passed")
|
||||
|
||||
class LinkedInBotFacade:
|
||||
def __init__(self, login_component, apply_component):
|
||||
logger.debug("Initializing LinkedInBotFacade")
|
||||
self.login_component = login_component
|
||||
self.apply_component = apply_component
|
||||
self.state = LinkedInBotState()
|
||||
@ -27,47 +36,65 @@ class LinkedInBotFacade:
|
||||
self.parameters = None
|
||||
|
||||
def set_job_application_profile_and_resume(self, job_application_profile, resume):
    """Validate and store the job application profile and the resume.

    Args:
        job_application_profile: Profile object; must be truthy.
        resume: Resume object; must be truthy.

    Raises:
        ValueError: if either argument is empty (from ``_validate_non_empty``).
    """
    logger.debug("Setting job application profile and resume")
    # Validate both values first, so nothing is stored when either is empty.
    checks = (
        (job_application_profile, "Job application profile"),
        (resume, "Resume"),
    )
    for value, label in checks:
        self._validate_non_empty(value, label)
    self.job_application_profile = job_application_profile
    self.resume = resume
    self.state.job_application_profile_set = True
    logger.debug("Job application profile and resume set successfully")
|
||||
|
||||
def set_secrets(self, email, password):
    """Validate and store the login credentials.

    Args:
        email: Account email; must be truthy.
        password: Account password; must be truthy.

    Raises:
        ValueError: if either argument is empty (from ``_validate_non_empty``).
    """
    logger.debug("Setting secrets: email and password")
    # Validate both values first, so nothing is stored when either is empty.
    for value, label in ((email, "Email"), (password, "Password")):
        self._validate_non_empty(value, label)
    self.email, self.password = email, password
    self.state.credentials_set = True
    logger.debug("Secrets set successfully")
|
||||
|
||||
def set_gpt_answerer_and_resume_generator(self, gpt_answerer_component, resume_generator_manager):
    """Configure the GPT answerer and pass both helpers to the apply component.

    Args:
        gpt_answerer_component: Answerer that receives the stored profile and
            resume before being handed to the apply component.
        resume_generator_manager: Passed to the apply component unchanged.

    Raises:
        ValueError: if the profile and resume have not been set yet.
    """
    logger.debug("Setting GPT answerer and resume generator")
    self._ensure_job_profile_and_resume_set()

    # Prepare the answerer first, then register both helpers for applying.
    answerer = gpt_answerer_component
    answerer.set_job_application_profile(self.job_application_profile)
    answerer.set_resume(self.resume)

    applier = self.apply_component
    applier.set_gpt_answerer(answerer)
    applier.set_resume_generator_manager(resume_generator_manager)

    self.state.gpt_answerer_set = True
    logger.debug("GPT answerer and resume generator set successfully")
|
||||
|
||||
def set_parameters(self, parameters):
    """Validate the run parameters, keep a reference, and forward them to the apply component.

    Raises:
        ValueError: if ``parameters`` is falsy.
    """
    logger.debug("Setting parameters")
    self._validate_non_empty(parameters, "Parameters")

    self.parameters = parameters
    applier = self.apply_component
    applier.set_parameters(parameters)

    self.state.parameters_set = True
    logger.debug("Parameters set successfully")
|
||||
|
||||
def start_login(self):
    """Run the login component with the stored credentials.

    Requires the ``credentials_set`` state flag; on return the
    ``logged_in`` flag is set.
    """
    logger.debug("Starting login process")
    self.state.validate_state(['credentials_set'])

    login = self.login_component
    login.set_secrets(self.email, self.password)
    login.start()

    self.state.logged_in = True
    logger.debug("Login process completed successfully")
|
||||
|
||||
def start_apply(self):
    """Start the apply component once every prerequisite state flag is set."""
    logger.debug("Starting apply process")
    prerequisites = [
        'logged_in',
        'job_application_profile_set',
        'gpt_answerer_set',
        'parameters_set',
    ]
    self.state.validate_state(prerequisites)
    self.apply_component.start_applying()
    logger.debug("Apply process started successfully")
|
||||
|
||||
def _validate_non_empty(self, value, name):
    """Raise ``ValueError`` when ``value`` is falsy; ``name`` labels it in logs and the error."""
    logger.debug("Validating that %s is not empty", name)
    if value:
        logger.debug("Validation passed for %s", name)
        return
    logger.error("Validation failed: %s is empty", name)
    raise ValueError(f"{name} cannot be empty.")
|
||||
|
||||
def _ensure_job_profile_and_resume_set(self):
    """Raise ``ValueError`` unless the ``job_application_profile_set`` state flag is truthy."""
    logger.debug("Ensuring job profile and resume are set")
    if self.state.job_application_profile_set:
        logger.debug("Job profile and resume are set")
        return
    logger.error("Job application profile and resume are not set")
    raise ValueError("Job application profile and resume must be set before proceeding.")
|
||||
|
@ -10,7 +10,7 @@ from datetime import date
|
||||
from typing import List, Optional, Any, Tuple
|
||||
from reportlab.lib.pagesizes import letter
|
||||
from reportlab.pdfgen import canvas
|
||||
from selenium.common.exceptions import NoSuchElementException
|
||||
from selenium.common.exceptions import NoSuchElementException, TimeoutException
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.common.keys import Keys
|
||||
from selenium.webdriver.remote.webelement import WebElement
|
||||
@ -18,9 +18,10 @@ from selenium.webdriver.support import expected_conditions as EC
|
||||
from selenium.webdriver.support.ui import Select, WebDriverWait
|
||||
from selenium.webdriver import ActionChains
|
||||
import src.utils as utils
|
||||
|
||||
from src.utils import logger
|
||||
class LinkedInEasyApplier:
|
||||
def __init__(self, driver: Any, resume_dir: Optional[str], set_old_answers: List[Tuple[str, str, str]], gpt_answerer: Any, resume_generator_manager):
|
||||
logger.debug("Initializing LinkedInEasyApplier")
|
||||
if resume_dir is None or not os.path.exists(resume_dir):
|
||||
resume_dir = None
|
||||
self.driver = driver
|
||||
@ -29,28 +30,33 @@ class LinkedInEasyApplier:
|
||||
self.gpt_answerer = gpt_answerer
|
||||
self.resume_generator_manager = resume_generator_manager
|
||||
self.all_data = self._load_questions_from_json()
|
||||
|
||||
logger.debug("LinkedInEasyApplier initialized successfully")
|
||||
|
||||
def _load_questions_from_json(self) -> List[dict]:
    """Load previously saved question/answer records from ``answers.json``.

    Returns:
        The list stored in the file, or an empty list when the file is
        missing or does not contain valid JSON.

    Raises:
        Exception: when the file holds valid JSON that is not a list, or on
            any other unexpected failure. The message embeds the formatted
            traceback and the original error is chained as ``__cause__``.
    """
    output_file = 'answers.json'
    logger.debug("Loading questions from JSON file: %s", output_file)
    try:
        with open(output_file, 'r') as f:
            try:
                data = json.load(f)
            except json.JSONDecodeError:
                # A corrupt file is treated as "no saved answers"; return here
                # so the success message below is not logged for it.
                logger.error("JSON decoding failed")
                return []
        if not isinstance(data, list):
            raise ValueError("JSON file format is incorrect. Expected a list of questions.")
        logger.debug("Questions loaded successfully from JSON")
        return data
    except FileNotFoundError:
        logger.warning("JSON file not found, returning empty list")
        return []
    except Exception as exc:
        tb_str = traceback.format_exc()
        logger.error("Error loading questions data from JSON file: %s", tb_str)
        raise Exception(f"Error loading questions data from JSON file: \nTraceback:\n{tb_str}") from exc
|
||||
|
||||
|
||||
def job_apply(self, job: Any):
|
||||
logger.debug("Starting job application for job: %s", job)
|
||||
self.driver.get(job.link)
|
||||
time.sleep(random.uniform(3, 5))
|
||||
try:
|
||||
@ -61,79 +67,103 @@ class LinkedInEasyApplier:
|
||||
actions.move_to_element(easy_apply_button).click().perform()
|
||||
self.gpt_answerer.set_job(job)
|
||||
self._fill_application_form(job)
|
||||
logger.debug("Job application process completed for job: %s", job)
|
||||
except Exception:
|
||||
tb_str = traceback.format_exc()
|
||||
logger.error("Failed to apply to job: %s", tb_str)
|
||||
self._discard_application()
|
||||
raise Exception(f"Failed to apply to job! Original exception: \nTraceback:\n{tb_str}")
|
||||
|
||||
def _find_easy_apply_button(self) -> WebElement:
    """Locate a clickable 'Easy Apply' button on the current job page.

    Makes up to two attempts. Each attempt scrolls the page, waits for the
    candidate buttons to be present, then waits for each one in turn to
    become clickable. After a failed first attempt the page is refreshed
    and a short random pause is taken before retrying.

    Returns:
        The first button that becomes clickable.

    Raises:
        Exception: if no clickable button is found after both attempts.
    """
    logger.debug("Searching for 'Easy Apply' button")
    button_xpath = '//button[contains(@class, "jobs-apply-button") and contains(., "Easy Apply")]'

    for attempt in range(2):
        self._scroll_page()
        try:
            buttons = WebDriverWait(self.driver, 10).until(
                EC.presence_of_all_elements_located((By.XPATH, button_xpath))
            )
            # XPath positions are 1-based; re-locate each candidate by
            # position so the clickability wait targets a fresh element.
            for position in range(1, len(buttons) + 1):
                try:
                    button = WebDriverWait(self.driver, 10).until(
                        EC.element_to_be_clickable((By.XPATH, f'({button_xpath})[{position}]'))
                    )
                    logger.debug("Found and clicking 'Easy Apply' button")
                    return button
                except Exception as e:
                    logger.warning("Failed to click 'Easy Apply' button on attempt %d: %s", attempt + 1, e)
        except TimeoutException:
            logger.warning("Timeout while searching for 'Easy Apply' button")

        if attempt == 0:
            logger.debug("Refreshing page to retry finding 'Easy Apply' button")
            self.driver.refresh()
            time.sleep(random.randint(3, 5))

    logger.error("No clickable 'Easy Apply' button found after 2 attempts")
    raise Exception("No clickable 'Easy Apply' button found")
|
||||
|
||||
|
||||
def _get_job_description(self) -> str:
    """Return the text of the job description on the current page.

    If a "see more" button is present it is clicked first and a two second
    pause follows; its absence is not an error.

    Raises:
        Exception: if the description element is missing or any other error
            occurs. The message embeds the formatted traceback and the
            original error is chained as ``__cause__``.
    """
    logger.debug("Getting job description")
    try:
        try:
            see_more_button = self.driver.find_element(
                By.XPATH, '//button[@aria-label="Click to see more description"]'
            )
            ActionChains(self.driver).move_to_element(see_more_button).click().perform()
            time.sleep(2)
        except NoSuchElementException:
            # The button is optional; carry on with the description lookup.
            logger.debug("See more button not found, skipping")

        description = self.driver.find_element(By.CLASS_NAME, 'jobs-description-content__text').text
        logger.debug("Job description retrieved successfully")
        return description
    except NoSuchElementException as exc:
        tb_str = traceback.format_exc()
        logger.error("Job description not found: %s", tb_str)
        raise Exception(f"Job description not found: \nTraceback:\n{tb_str}") from exc
    except Exception as exc:
        tb_str = traceback.format_exc()
        logger.error("Error getting Job description: %s", tb_str)
        raise Exception(f"Error getting Job description: \nTraceback:\n{tb_str}") from exc
|
||||
|
||||
|
||||
def _get_job_recruiter(self):
    """Return the recruiter's profile link from the hiring-team section.

    Best effort: any failure is logged as a warning and an empty string is
    returned instead.
    """
    logger.debug("Getting job recruiter information")
    heading_locator = (By.XPATH, '//h2[text()="Meet the hiring team"]')
    profile_xpath = './/following::a[contains(@href, "linkedin.com/in/")]'
    try:
        heading = WebDriverWait(self.driver, 10).until(
            EC.presence_of_element_located(heading_locator)
        )
        profile_anchor = heading.find_element(By.XPATH, profile_xpath)
        recruiter_link = profile_anchor.get_attribute('href')
        logger.debug("Job recruiter link retrieved successfully")
        return recruiter_link
    except Exception as e:
        logger.warning("Failed to retrieve recruiter information: %s", e)
        return ""
|
||||
|
||||
def _scroll_page(self) -> None:
    """Slowly scroll the ``html`` element forward, then back in reverse."""
    logger.debug("Scrolling the page")
    page_root = self.driver.find_element(By.TAG_NAME, 'html')
    for backwards in (False, True):
        utils.scroll_slow(self.driver, page_root, step=300, reverse=backwards)
|
||||
|
||||
def _fill_application_form(self, job):
    """Fill form pages one after another until ``_next_or_submit`` returns a truthy value."""
    logger.debug("Filling out application form for job: %s", job)
    submitted = False
    while not submitted:
        self.fill_up(job)
        submitted = self._next_or_submit()
    logger.debug("Application form submitted")
|
||||
|
||||
def _next_or_submit(self):
|
||||
logger.debug("Clicking 'Next' or 'Submit' button")
|
||||
next_button = self.driver.find_element(By.CLASS_NAME, "artdeco-button--primary")
|
||||
button_text = next_button.text.lower()
|
||||
if 'submit application' in button_text:
|
||||
logger.debug("Submit button found, submitting application")
|
||||
self._unfollow_company()
|
||||
time.sleep(random.uniform(1.5, 2.5))
|
||||
next_button.click()
|
||||
@ -146,70 +176,88 @@ class LinkedInEasyApplier:
|
||||
|
||||
def _unfollow_company(self) -> None:
    """Click the "stay up to date with their page" label; failures are only logged."""
    follow_label_xpath = "//label[contains(.,'to stay up to date with their page.')]"
    try:
        logger.debug("Unfollowing company")
        self.driver.find_element(By.XPATH, follow_label_xpath).click()
    except Exception as e:
        logger.warning("Failed to unfollow company: %s", e)
|
||||
|
||||
def _check_for_errors(self) -> None:
    """Raise if the form currently shows any inline error feedback.

    Raises:
        Exception: listing the text of every error element found.
    """
    logger.debug("Checking for form errors")
    error_elements = self.driver.find_elements(By.CLASS_NAME, 'artdeco-inline-feedback--error')
    if not error_elements:
        return
    messages = [e.text for e in error_elements]
    logger.error("Form submission failed with errors: %s", messages)
    raise Exception(f"Failed answering or file upload. {str(messages)}")
|
||||
|
||||
def _discard_application(self) -> None:
    """Dismiss the application modal and click the first confirm-dialog button.

    Best effort: any failure is logged as a warning and swallowed.
    """
    logger.debug("Discarding application")
    try:
        dismiss_button = self.driver.find_element(By.CLASS_NAME, 'artdeco-modal__dismiss')
        dismiss_button.click()
        time.sleep(random.uniform(3, 5))

        confirm_buttons = self.driver.find_elements(By.CLASS_NAME, 'artdeco-modal__confirm-dialog-btn')
        confirm_buttons[0].click()
        time.sleep(random.uniform(3, 5))
    except Exception as e:
        logger.warning("Failed to discard application: %s", e)
|
||||
|
||||
def fill_up(self, job) -> None:
    """Process every ``pb4`` element inside the Easy Apply content container."""
    logger.debug("Filling up form sections for job: %s", job)
    container = self.driver.find_element(By.CLASS_NAME, 'jobs-easy-apply-content')
    for form_part in container.find_elements(By.CLASS_NAME, 'pb4'):
        self._process_form_element(form_part, job)
|
||||
|
||||
def _process_form_element(self, element: WebElement, job) -> None:
    """Dispatch a form element: upload fields to the upload handler, everything else to the question filler."""
    logger.debug("Processing form element")
    if not self._is_upload_field(element):
        self._fill_additional_questions()
        return
    self._handle_upload_fields(element, job)
|
||||
|
||||
def _is_upload_field(self, element: WebElement) -> bool:
    """Return True when ``element`` contains at least one ``<input type="file">`` descendant."""
    # Compute once and log before returning; a stale early return previously
    # made the logging unreachable.
    is_upload = bool(element.find_elements(By.XPATH, ".//input[@type='file']"))
    logger.debug("Element is upload field: %s", is_upload)
    return is_upload
|
||||
|
||||
def _handle_upload_fields(self, element: WebElement, job) -> None:
    """Upload a resume or cover letter into every file input found on the page.

    The file inputs are searched page-wide through the driver, not inside
    ``element``. For each input, the lower-cased text of its parent is passed
    to ``gpt_answerer.resume_or_cover`` and the result decides what to upload.
    """
    logger.debug("Handling upload fields")
    for upload_input in self.driver.find_elements(By.XPATH, "//input[@type='file']"):
        container = upload_input.find_element(By.XPATH, "..")
        # Strip the 'hidden' class from the input before sending keys to it.
        self.driver.execute_script("arguments[0].classList.remove('hidden')", upload_input)
        decision = self.gpt_answerer.resume_or_cover(container.text.lower())

        if 'resume' in decision:
            logger.debug("Uploading resume")
            has_resume_file = self.resume_path is not None and self.resume_path.resolve().is_file()
            if has_resume_file:
                upload_input.send_keys(str(self.resume_path.resolve()))
            else:
                self._create_and_upload_resume(upload_input, job)
        elif 'cover' in decision:
            logger.debug("Uploading cover letter")
            self._create_and_upload_cover_letter(upload_input)
|
||||
|
||||
def _create_and_upload_resume(self, element, job):
    """Generate a tailored PDF resume, save it under ``generated_cv`` and upload it.

    The PDF comes base64-encoded from
    ``resume_generator_manager.pdf_base64`` for ``job.description``. It is
    written to ``generated_cv/CV_<unix timestamp>.pdf``, its absolute path is
    sent to ``element`` and stored on ``job.pdf_path``.

    Raises:
        Exception: on any failure; the message embeds the formatted traceback
            and the original error is chained as ``__cause__``.
    """
    logger.debug("Creating and uploading resume")
    folder_path = 'generated_cv'
    os.makedirs(folder_path, exist_ok=True)
    try:
        # Render the PDF before creating the file, so a generation failure
        # does not leave an empty CV_*.pdf behind.
        pdf_bytes = base64.b64decode(
            self.resume_generator_manager.pdf_base64(job_description_text=job.description)
        )

        timestamp = int(time.time())
        file_path_pdf = os.path.join(folder_path, f"CV_{timestamp}.pdf")
        # "xb" refuses to overwrite an existing file of the same name.
        with open(file_path_pdf, "xb") as f:
            f.write(pdf_bytes)

        absolute_path = os.path.abspath(file_path_pdf)
        element.send_keys(absolute_path)
        job.pdf_path = absolute_path
        time.sleep(2)
        logger.debug("Resume created and uploaded successfully: %s", file_path_pdf)
    except Exception as exc:
        tb_str = traceback.format_exc()
        logger.error("Resume upload failed: %s", tb_str)
        raise Exception(f"Upload failed: \nTraceback:\n{tb_str}") from exc
|
||||
|
||||
def _create_and_upload_cover_letter(self, element: WebElement) -> None:
|
||||
logger.debug("Creating and uploading cover letter")
|
||||
cover_letter = self.gpt_answerer.answer_question_textual_wide_range("Write a cover letter")
|
||||
with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as temp_pdf_file:
|
||||
letter_path = temp_pdf_file.name
|
||||
@ -221,29 +269,38 @@ class LinkedInEasyApplier:
|
||||
c.drawText(text_object)
|
||||
c.save()
|
||||
element.send_keys(letter_path)
|
||||
logger.debug("Cover letter created and uploaded successfully: %s", letter_path)
|
||||
|
||||
def _fill_additional_questions(self) -> None:
    """Process every form-section grouping currently present on the page."""
    logger.debug("Filling additional questions")
    groupings = self.driver.find_elements(By.CLASS_NAME, 'jobs-easy-apply-form-section__grouping')
    for grouping in groupings:
        self._process_form_section(grouping)
|
||||
|
||||
|
||||
def _process_form_section(self, section: WebElement) -> None:
    """Try each question handler in a fixed order and stop at the first that handles ``section``.

    Order: terms of service, radio, textbox, date, dropdown. Nothing happens
    if no handler accepts the section.
    """
    logger.debug("Processing form section")
    handlers = (
        (self._handle_terms_of_service, "Handled terms of service"),
        (self._find_and_handle_radio_question, "Handled radio question"),
        (self._find_and_handle_textbox_question, "Handled textbox question"),
        (self._find_and_handle_date_question, "Handled date question"),
        (self._find_and_handle_dropdown_question, "Handled dropdown question"),
    )
    for handler, handled_message in handlers:
        if handler(section):
            logger.debug(handled_message)
            return
|
||||
|
||||
def _handle_terms_of_service(self, element: WebElement) -> bool:
    """Click the first label of ``element`` if it mentions terms or privacy wording.

    Returns:
        True when the label was clicked, False otherwise.
    """
    labels = element.find_elements(By.TAG_NAME, 'label')
    if not labels:
        return False

    first_label = labels[0]
    label_text = first_label.text.lower()
    legal_terms = ['terms of service', 'privacy policy', 'terms of use']
    if not any(term in label_text for term in legal_terms):
        return False

    first_label.click()
    logger.debug("Clicked terms of service checkbox")
    return True
|
||||
|
||||
@ -261,11 +318,13 @@ class LinkedInEasyApplier:
|
||||
break
|
||||
if existing_answer:
|
||||
self._select_radio(radios, existing_answer['answer'])
|
||||
logger.debug("Selected existing radio answer")
|
||||
return True
|
||||
|
||||
answer = self.gpt_answerer.answer_question_from_options(question_text, options)
|
||||
self._save_questions_to_json({'type': 'radio', 'question': question_text, 'answer': answer})
|
||||
self._select_radio(radios, answer)
|
||||
logger.debug("Selected new radio answer")
|
||||
return True
|
||||
return False
|
||||
|
||||
@ -288,9 +347,11 @@ class LinkedInEasyApplier:
|
||||
break
|
||||
if existing_answer:
|
||||
self._enter_text(text_field, existing_answer['answer'])
|
||||
logger.debug("Entered existing textbox answer")
|
||||
return True
|
||||
self._save_questions_to_json({'type': question_type, 'question': question_text, 'answer': answer})
|
||||
self._enter_text(text_field, answer)
|
||||
logger.debug("Entered new textbox answer")
|
||||
return True
|
||||
return False
|
||||
|
||||
@ -305,15 +366,17 @@ class LinkedInEasyApplier:
|
||||
|
||||
existing_answer = None
|
||||
for item in self.all_data:
|
||||
if self._sanitize_text(question_text) in item['question'] and item['type'] == 'date':
|
||||
if self._sanitize_text(question_text) in item['question'] and item['type'] == 'date':
|
||||
existing_answer = item
|
||||
break
|
||||
if existing_answer:
|
||||
self._enter_text(date_field, existing_answer['answer'])
|
||||
logger.debug("Entered existing date answer")
|
||||
return True
|
||||
|
||||
self._save_questions_to_json({'type': 'date', 'question': question_text, 'answer': answer_text})
|
||||
self._enter_text(date_field, answer_text)
|
||||
logger.debug("Entered new date answer")
|
||||
return True
|
||||
return False
|
||||
|
||||
@ -328,32 +391,36 @@ class LinkedInEasyApplier:
|
||||
|
||||
existing_answer = None
|
||||
for item in self.all_data:
|
||||
if self._sanitize_text(question_text) in item['question'] and item['type'] == 'dropdown':
|
||||
if self._sanitize_text(question_text) in item['question'] and item['type'] == 'dropdown':
|
||||
existing_answer = item
|
||||
break
|
||||
if existing_answer:
|
||||
self._select_dropdown_option(dropdown, existing_answer['answer'])
|
||||
logger.debug("Selected existing dropdown answer")
|
||||
return True
|
||||
|
||||
answer = self.gpt_answerer.answer_question_from_options(question_text, options)
|
||||
self._save_questions_to_json({'type': 'dropdown', 'question': question_text, 'answer': answer})
|
||||
self._select_dropdown_option(dropdown, answer)
|
||||
logger.debug("Selected new dropdown answer")
|
||||
return True
|
||||
except Exception:
|
||||
except Exception as e:
|
||||
logger.warning("Failed to handle dropdown question: %s", e)
|
||||
return False
|
||||
|
||||
def _is_numeric_field(self, field: WebElement) -> bool:
    """Return True when the field's ``type`` or ``id`` attribute contains ``'numeric'``.

    A missing attribute (``get_attribute`` returning ``None``) is treated as
    an empty string instead of raising.
    """
    field_type = (field.get_attribute('type') or '').lower()
    field_id = field.get_attribute("id") or ''
    is_numeric = 'numeric' in field_type or 'numeric' in field_id
    logger.debug("Field is numeric: %s", is_numeric)
    return is_numeric
|
||||
|
||||
def _enter_text(self, element: WebElement, text: str) -> None:
    """Clear ``element`` and type ``text`` into it."""
    logger.debug("Entering text: %s", text)
    # Clear first so the new text replaces the existing content.
    element.clear()
    element.send_keys(text)
|
||||
|
||||
def _select_radio(self, radios: List[WebElement], answer: str) -> None:
|
||||
logger.debug("Selecting radio option: %s", answer)
|
||||
for radio in radios:
|
||||
if answer in radio.text.lower():
|
||||
radio.find_element(By.TAG_NAME, 'label').click()
|
||||
@ -361,12 +428,14 @@ class LinkedInEasyApplier:
|
||||
radios[-1].find_element(By.TAG_NAME, 'label').click()
|
||||
|
||||
def _select_dropdown_option(self, element: WebElement, text: str) -> None:
    """Select the option of ``element`` whose visible text equals ``text``."""
    logger.debug("Selecting dropdown option: %s", text)
    Select(element).select_by_visible_text(text)
|
||||
|
||||
def _save_questions_to_json(self, question_data: dict) -> None:
|
||||
output_file = 'answers.json'
|
||||
question_data['question'] = self._sanitize_text(question_data['question'])
|
||||
logger.debug("Saving question data to JSON: %s", question_data)
|
||||
try:
|
||||
try:
|
||||
with open(output_file, 'r') as f:
|
||||
@ -375,23 +444,23 @@ class LinkedInEasyApplier:
|
||||
if not isinstance(data, list):
|
||||
raise ValueError("JSON file format is incorrect. Expected a list of questions.")
|
||||
except json.JSONDecodeError:
|
||||
logger.error("JSON decoding failed")
|
||||
data = []
|
||||
except FileNotFoundError:
|
||||
logger.warning("JSON file not found, creating new file")
|
||||
data = []
|
||||
data.append(question_data)
|
||||
with open(output_file, 'w') as f:
|
||||
json.dump(data, f, indent=4)
|
||||
logger.debug("Question data saved successfully to JSON")
|
||||
except Exception:
|
||||
tb_str = traceback.format_exc()
|
||||
logger.error("Error saving questions data to JSON file: %s", tb_str)
|
||||
raise Exception(f"Error saving questions data to JSON file: \nTraceback:\n{tb_str}")
|
||||
|
||||
|
||||
def _sanitize_text(self, text: str) -> str:
    """Normalise question text so it can be stored and compared.

    Steps: lower-case, strip surrounding whitespace, drop double quotes and
    backslashes, turn line feeds into spaces, remove carriage returns and
    remaining control characters, and strip trailing commas.
    """
    sanitized_text = text.lower().strip().replace('"', '').replace('\\', '')
    # Line breaks must be handled BEFORE the control-character regex: that
    # regex also matches \n and \r, so running it first deletes newlines
    # outright and glues the neighbouring words together.
    sanitized_text = sanitized_text.replace('\r', '').replace('\n', ' ')
    sanitized_text = re.sub(r'[\x00-\x1F\x7F]', '', sanitized_text).rstrip(',')
    logger.debug("Sanitized text: %s", sanitized_text)
    return sanitized_text
|
||||
|
@ -10,28 +10,39 @@ import src.utils as utils
|
||||
from src.job import Job
|
||||
from src.linkedIn_easy_applier import LinkedInEasyApplier
|
||||
import json
|
||||
from src.utils import logger
|
||||
|
||||
|
||||
class EnvironmentKeys:
    """Feature switches read from environment variables at construction time."""

    def __init__(self):
        logger.debug("Initializing EnvironmentKeys")
        # Both flags are True only when the variable is exactly "True".
        self.skip_apply = self._read_env_key_bool("SKIP_APPLY")
        self.disable_description_filter = self._read_env_key_bool("DISABLE_DESCRIPTION_FILTER")
        logger.debug("EnvironmentKeys initialized: skip_apply=%s, disable_description_filter=%s",
                     self.skip_apply, self.disable_description_filter)

    @staticmethod
    def _read_env_key(key: str) -> str:
        """Return the value of environment variable ``key``, or ``""`` when unset."""
        value = os.getenv(key, "")
        # NOTE(review): the raw value is written to the debug log; do not use
        # this reader for secrets.
        logger.debug("Read environment key %s: %s", key, value)
        return value

    @staticmethod
    def _read_env_key_bool(key: str) -> bool:
        """Return True only when environment variable ``key`` equals the string ``"True"``."""
        value = os.getenv(key) == "True"
        logger.debug("Read environment key %s as bool: %s", key, value)
        return value
|
||||
|
||||
class LinkedInJobManager:
|
||||
def __init__(self, driver):
    """Keep the driver and start with no saved answers and no easy-applier component."""
    logger.debug("Initializing LinkedInJobManager")
    self.driver = driver
    # Assigned later by start_applying.
    self.easy_applier_component = None
    self.set_old_answers = set()
    logger.debug("LinkedInJobManager initialized successfully")
|
||||
|
||||
def set_parameters(self, parameters):
|
||||
logger.debug("Setting parameters for LinkedInJobManager")
|
||||
self.company_blacklist = parameters.get('companyBlacklist', []) or []
|
||||
self.title_blacklist = parameters.get('titleBlacklist', []) or []
|
||||
self.positions = parameters.get('positions', [])
|
||||
@ -39,33 +50,21 @@ class LinkedInJobManager:
|
||||
self.base_search_url = self.get_base_search_url(parameters)
|
||||
self.seen_jobs = []
|
||||
resume_path = parameters.get('uploads', {}).get('resume', None)
|
||||
if resume_path is not None and Path(resume_path).exists():
|
||||
self.resume_path = Path(resume_path)
|
||||
else:
|
||||
self.resume_path = None
|
||||
self.resume_path = Path(resume_path) if resume_path and Path(resume_path).exists() else None
|
||||
self.output_file_directory = Path(parameters['outputFileDirectory'])
|
||||
self.env_config = EnvironmentKeys()
|
||||
#self.old_question()
|
||||
logger.debug("Parameters set successfully")
|
||||
|
||||
def set_gpt_answerer(self, gpt_answerer):
    """Store the GPT answerer that is later passed to the easy-applier component."""
    logger.debug("Setting GPT answerer")
    setattr(self, "gpt_answerer", gpt_answerer)
|
||||
|
||||
def set_resume_generator_manager(self, resume_generator_manager):
    """Store the resume generator manager that is later passed to the easy-applier component."""
    logger.debug("Setting resume generator manager")
    setattr(self, "resume_generator_manager", resume_generator_manager)
|
||||
|
||||
""" def old_question(self):
|
||||
self.set_old_answers = {}
|
||||
file_path = 'data_folder/output/old_Questions.csv'
|
||||
if os.path.exists(file_path):
|
||||
with open(file_path, 'r', newline='', encoding='utf-8', errors='ignore') as file:
|
||||
csv_reader = csv.reader(file, delimiter=',', quotechar='"')
|
||||
for row in csv_reader:
|
||||
if len(row) == 3:
|
||||
answer_type, question_text, answer = row
|
||||
self.set_old_answers[(answer_type.lower(), question_text.lower())] = answer"""
|
||||
|
||||
|
||||
def start_applying(self):
|
||||
logger.debug("Starting job application process")
|
||||
self.easy_applier_component = LinkedInEasyApplier(self.driver, self.resume_path, self.set_old_answers, self.gpt_answerer, self.resume_generator_manager)
|
||||
searches = list(product(self.positions, self.locations))
|
||||
random.shuffle(searches)
|
||||
@ -86,30 +85,40 @@ class LinkedInJobManager:
|
||||
self.next_job_page(position, location_url, job_page_number)
|
||||
time.sleep(random.uniform(1.5, 3.5))
|
||||
utils.printyellow("Starting the application process for this page...")
|
||||
self.apply_jobs()
|
||||
try:
|
||||
self.apply_jobs()
|
||||
except Exception as e:
|
||||
logger.error("Error during job application: %s", e)
|
||||
utils.printred(f"Error during job application: {e}")
|
||||
continue
|
||||
utils.printyellow("Applying to jobs on this page has been completed!")
|
||||
|
||||
time_left = minimum_page_time - time.time()
|
||||
if time_left > 0:
|
||||
utils.printyellow(f"Sleeping for {time_left} seconds.")
|
||||
logger.debug("Sleeping for %d seconds", time_left)
|
||||
time.sleep(time_left)
|
||||
minimum_page_time = time.time() + minimum_time
|
||||
if page_sleep % 5 == 0:
|
||||
sleep_time = random.randint(5, 34)
|
||||
utils.printyellow(f"Sleeping for {sleep_time / 60} minutes.")
|
||||
logger.debug("Sleeping for %d seconds", sleep_time)
|
||||
time.sleep(sleep_time)
|
||||
page_sleep += 1
|
||||
except Exception:
|
||||
traceback.format_exc()
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.error("Unexpected error during job search: %s", e)
|
||||
utils.printred(f"Unexpected error: {e}")
|
||||
continue
|
||||
time_left = minimum_page_time - time.time()
|
||||
if time_left > 0:
|
||||
utils.printyellow(f"Sleeping for {time_left} seconds.")
|
||||
logger.debug("Sleeping for %d seconds", time_left)
|
||||
time.sleep(time_left)
|
||||
minimum_page_time = time.time() + minimum_time
|
||||
if page_sleep % 5 == 0:
|
||||
sleep_time = random.randint(50, 90)
|
||||
utils.printyellow(f"Sleeping for {sleep_time / 60} minutes.")
|
||||
logger.debug("Sleeping for %d seconds", sleep_time)
|
||||
time.sleep(sleep_time)
|
||||
page_sleep += 1
|
||||
|
||||
@ -117,32 +126,40 @@ class LinkedInJobManager:
|
||||
try:
|
||||
no_jobs_element = self.driver.find_element(By.CLASS_NAME, 'jobs-search-two-pane__no-results-banner--expand')
|
||||
if 'No matching jobs found' in no_jobs_element.text or 'unfortunately, things aren' in self.driver.page_source.lower():
|
||||
raise Exception("No more jobs on this page")
|
||||
utils.printyellow("No matching jobs found on this page, moving to next.")
|
||||
logger.debug("No matching jobs found on this page, skipping")
|
||||
return # Выход из метода, если нет больше подходящих вакансий
|
||||
except NoSuchElementException:
|
||||
pass
|
||||
|
||||
pass # Если элемент не найден, просто продолжаем
|
||||
|
||||
job_results = self.driver.find_element(By.CLASS_NAME, "jobs-search-results-list")
|
||||
utils.scroll_slow(self.driver, job_results)
|
||||
utils.scroll_slow(self.driver, job_results, step=300, reverse=True)
|
||||
job_list_elements = self.driver.find_elements(By.CLASS_NAME, 'scaffold-layout__list-container')[0].find_elements(By.CLASS_NAME, 'jobs-search-results__list-item')
|
||||
if not job_list_elements:
|
||||
raise Exception("No job class elements found on page")
|
||||
utils.printyellow("No job class elements found on page, moving to next page.")
|
||||
logger.debug("No job class elements found on page, skipping")
|
||||
return # Выход из метода, если нет вакансий на странице
|
||||
job_list = [Job(*self.extract_job_information_from_tile(job_element)) for job_element in job_list_elements]
|
||||
for job in job_list:
|
||||
if self.is_blacklisted(job.title, job.company, job.link):
|
||||
utils.printyellow(f"Blacklisted {job.title} at {job.company}, skipping...")
|
||||
logger.debug("Job blacklisted: %s at %s", job.title, job.company)
|
||||
self.write_to_file(job, "skipped")
|
||||
continue
|
||||
try:
|
||||
if job.apply_method not in {"Continue", "Applied", "Apply"}:
|
||||
self.easy_applier_component.job_apply(job)
|
||||
self.write_to_file(job, "success")
|
||||
logger.debug("Applied to job: %s at %s", job.title, job.company)
|
||||
except Exception as e:
|
||||
utils.printred(traceback.format_exc())
|
||||
logger.error("Failed to apply for %s at %s: %s", job.title, job.company, e)
|
||||
utils.printred(f"Failed to apply for {job.title} at {job.company}: {e}")
|
||||
self.write_to_file(job, "failed")
|
||||
continue
|
||||
|
||||
def write_to_file(self, job, file_name):
|
||||
logger.debug("Writing job application result to file: %s", file_name)
|
||||
pdf_path = Path(job.pdf_path).resolve()
|
||||
pdf_path = pdf_path.as_uri()
|
||||
data = {
|
||||
@ -157,18 +174,22 @@ class LinkedInJobManager:
|
||||
if not file_path.exists():
|
||||
with open(file_path, 'w', encoding='utf-8') as f:
|
||||
json.dump([data], f, indent=4)
|
||||
logger.debug("Job data written to new file: %s", file_path)
|
||||
else:
|
||||
with open(file_path, 'r+', encoding='utf-8') as f:
|
||||
try:
|
||||
existing_data = json.load(f)
|
||||
except json.JSONDecodeError:
|
||||
logger.error("JSON decode error in file: %s", file_path)
|
||||
existing_data = []
|
||||
existing_data.append(data)
|
||||
f.seek(0)
|
||||
json.dump(existing_data, f, indent=4)
|
||||
f.truncate()
|
||||
logger.debug("Job data appended to existing file: %s", file_path)
|
||||
|
||||
def get_base_search_url(self, parameters):
|
||||
logger.debug("Constructing base search URL")
|
||||
url_parts = []
|
||||
if parameters['remote']:
|
||||
url_parts.append("f_CF=f_WRA")
|
||||
@ -188,33 +209,45 @@ class LinkedInJobManager:
|
||||
date_param = next((v for k, v in date_mapping.items() if parameters.get('date', {}).get(k)), "")
|
||||
url_parts.append("f_LF=f_AL") # Easy Apply
|
||||
base_url = "&".join(url_parts)
|
||||
return f"?{base_url}{date_param}"
|
||||
full_url = f"?{base_url}{date_param}"
|
||||
logger.debug("Base search URL constructed: %s", full_url)
|
||||
return full_url
|
||||
|
||||
def next_job_page(self, position, location, job_page):
    """Open one page of LinkedIn job search results.

    Args:
        position: search keywords; percent-encoded before being placed in
            the URL.
        location: pre-built query fragment appended verbatim after the
            keywords.
        job_page: zero-based page index; the ``start`` offset is
            ``job_page * 25``.
    """
    from urllib.parse import quote

    logger.debug("Navigating to next job page: %s in %s, page %d", position, location, job_page)
    # Encode the keywords so characters such as '#', '&' or '+' in a job
    # title (e.g. "C#") cannot truncate or corrupt the query string.
    # NOTE(review): assumes `position` arrives unencoded - confirm against
    # the configuration loader.
    encoded_position = quote(str(position), safe="")
    self.driver.get(
        f"https://www.linkedin.com/jobs/search/{self.base_search_url}"
        f"&keywords={encoded_position}{location}&start={job_page * 25}"
    )
|
||||
|
||||
def extract_job_information_from_tile(self, job_tile):
    """Read title, company, location, link and apply method from a job tile.

    Each group of fields is looked up independently, so a missing element
    only leaves its own fields at their defaults instead of aborting the
    whole extraction.

    Args:
        job_tile: element exposing ``find_element`` for the job card.

    Returns:
        A tuple ``(job_title, company, job_location, link, apply_method)``.
        Fields that could not be found are empty strings, except
        ``apply_method`` which falls back to ``"Applied"``.
    """
    logger.debug("Extracting job information from tile")
    job_title, company, job_location, apply_method, link = "", "", "", "", ""

    try:
        # Look the title element up once; it provides both the text and the link.
        title_element = job_tile.find_element(By.CLASS_NAME, 'job-card-list__title')
        job_title = title_element.text
        href = title_element.get_attribute('href')
        # Drop the query string; a missing href leaves the link empty instead of crashing.
        link = href.split('?')[0] if href else ""
        company = job_tile.find_element(By.CLASS_NAME, 'job-card-container__primary-description').text
        logger.debug("Job information extracted: %s at %s", job_title, company)
    except NoSuchElementException:
        utils.printyellow("Some job information (title, link, or company) is missing.")
        logger.warning("Some job information (title, link, or company) is missing.")

    try:
        job_location = job_tile.find_element(By.CLASS_NAME, 'job-card-container__metadata-item').text
    except NoSuchElementException:
        utils.printyellow("Job location is missing.")
        logger.warning("Job location is missing.")

    try:
        apply_method = job_tile.find_element(By.CLASS_NAME, 'job-card-container__apply-method').text
    except NoSuchElementException:
        # No apply-method element: assume the application was already submitted.
        apply_method = "Applied"
        utils.printyellow("Apply method not found, assuming 'Applied'.")
        logger.warning("Apply method not found, assuming 'Applied'.")

    return job_title, company, job_location, link, apply_method
|
||||
|
||||
def is_blacklisted(self, job_title, company, link):
    """Return True when the job should be skipped.

    A job is skipped when any whitespace-separated word of its title matches
    an entry of ``self.title_blacklist``, when its company matches an entry
    of ``self.company_blacklist``, or when ``link`` is already in
    ``self.seen_jobs``. Title and company comparisons ignore case and
    surrounding whitespace.
    """
    logger.debug("Checking if job is blacklisted: %s at %s", job_title, company)

    # Normalise both sides so a capitalised blacklist entry still matches
    # the lowercased title, as the company check already does.
    title_words = set(job_title.lower().split())
    title_blacklisted = any(
        word.strip().lower() in title_words for word in self.title_blacklist
    )

    company_name = company.strip().lower()
    company_blacklisted = any(
        company_name == entry.strip().lower() for entry in self.company_blacklist
    )

    link_seen = link in self.seen_jobs

    # Compute once, log, then return: an early return would skip the log line.
    blacklisted = title_blacklisted or company_blacklisted or link_seen
    logger.debug("Job blacklisted status: %s", blacklisted)
    return blacklisted
|
||||
|
80
src/utils.py
80
src/utils.py
@ -4,76 +4,97 @@ import time
|
||||
|
||||
from selenium import webdriver
|
||||
|
||||
import logging
|
||||
|
||||
# Logging configuration
|
||||
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
chromeProfilePath = os.path.join(os.getcwd(), "chrome_profile", "linkedin_profile")
|
||||
|
||||
def ensure_chrome_profile():
    """Create the Chrome profile directory (and its parent) if missing.

    Returns:
        The module-level ``chromeProfilePath``.
    """
    logger.debug("Ensuring Chrome profile exists at path: %s", chromeProfilePath)
    parent_dir = os.path.dirname(chromeProfilePath)
    # Parent first, then the profile directory itself, each with its own log line.
    targets = (
        (parent_dir, "Created directory for Chrome profile: %s"),
        (chromeProfilePath, "Created Chrome profile directory: %s"),
    )
    for directory, created_message in targets:
        if os.path.exists(directory):
            continue
        os.makedirs(directory)
        logger.debug(created_message, directory)
    return chromeProfilePath
|
||||
|
||||
def is_scrollable(element):
    """Return True when the element's content is taller than its visible box.

    Compares the element's ``scrollHeight`` and ``clientHeight`` attributes
    (read through ``get_attribute``) as integers.
    """
    scroll_height = element.get_attribute("scrollHeight")
    client_height = element.get_attribute("clientHeight")
    # Compute once, log, then return: an early return would skip the log line.
    scrollable = int(scroll_height) > int(client_height)
    logger.debug(
        "Element scrollable check: scrollHeight=%s, clientHeight=%s, scrollable=%s",
        scroll_height, client_height, scrollable,
    )
    return scrollable
|
||||
|
||||
def scroll_slow(driver, scrollable_element, start=0, end=3600, step=100, reverse=False):
    """Scroll an element gradually by setting its ``scrollTop`` in increments.

    Args:
        driver: object exposing ``execute_script`` used to run the scroll script.
        scrollable_element: element to scroll; nothing happens unless it is
            displayed and scrollable.
        start: first ``scrollTop`` position.
        end: final ``scrollTop`` position, always applied after the loop.
        step: distance between consecutive positions; must be non-zero.
        reverse: when true, ``start`` and ``end`` are swapped and ``step`` is
            negated so the element scrolls in the opposite direction.

    Raises:
        ValueError: if ``step`` is zero.

    Any other exception raised while scrolling is logged and printed, not
    propagated.
    """
    logger.debug("Starting slow scroll: start=%d, end=%d, step=%d, reverse=%s", start, end, step, reverse)
    if reverse:
        start, end = end, start
        step = -step
    if step == 0:
        logger.error("Step value cannot be zero.")
        raise ValueError("Step cannot be zero.")

    script_scroll_to = "arguments[0].scrollTop = arguments[1];"
    try:
        if not scrollable_element.is_displayed():
            logger.warning("The element is not visible.")
            print("The element is not visible.")
            return
        if not is_scrollable(scrollable_element):
            logger.warning("The element is not scrollable.")
            print("The element is not scrollable.")
            return
        if (step > 0 and start >= end) or (step < 0 and start <= end):
            logger.warning("No scrolling will occur due to incorrect start/end values.")
            print("No scrolling will occur due to incorrect start/end values.")
            return

        for position in range(start, end, step):
            try:
                driver.execute_script(script_scroll_to, scrollable_element, position)
                logger.debug("Scrolled to position: %d", position)
            except Exception as e:
                # A failed step is reported but does not stop the remaining steps.
                logger.error("Error during scrolling: %s", e)
                print(f"Error during scrolling: {e}")
            # Single randomised pause per step (previously two sleeps ran back to back).
            time.sleep(random.uniform(1.0, 1.6))

        # range() excludes `end`, so apply the final position explicitly.
        driver.execute_script(script_scroll_to, scrollable_element, end)
        logger.debug("Scrolled to final position: %d", end)
        time.sleep(1)
    except Exception as e:
        logger.error("Exception occurred during scrolling: %s", e)
        print(f"Exception occurred: {e}")
|
||||
|
||||
def chromeBrowserOptions():
|
||||
logger.debug("Setting Chrome browser options")
|
||||
ensure_chrome_profile()
|
||||
options = webdriver.ChromeOptions()
|
||||
options.add_argument("--start-maximized") # Avvia il browser a schermo intero
|
||||
options.add_argument("--no-sandbox") # Disabilita la sandboxing per migliorare le prestazioni
|
||||
options.add_argument("--disable-dev-shm-usage") # Utilizza una directory temporanea per la memoria condivisa
|
||||
options.add_argument("--ignore-certificate-errors") # Ignora gli errori dei certificati SSL
|
||||
options.add_argument("--disable-extensions") # Disabilita le estensioni del browser
|
||||
options.add_argument("--disable-gpu") # Disabilita l'accelerazione GPU
|
||||
options.add_argument("window-size=1200x800") # Imposta la dimensione della finestra del browser
|
||||
options.add_argument("--disable-background-timer-throttling") # Disabilita il throttling dei timer in background
|
||||
options.add_argument("--disable-backgrounding-occluded-windows") # Disabilita la sospensione delle finestre occluse
|
||||
options.add_argument("--disable-translate") # Disabilita il traduttore automatico
|
||||
options.add_argument("--disable-popup-blocking") # Disabilita il blocco dei popup
|
||||
options.add_argument("--no-first-run") # Disabilita la configurazione iniziale del browser
|
||||
options.add_argument("--no-default-browser-check") # Disabilita il controllo del browser predefinito
|
||||
options.add_argument("--disable-logging") # Disabilita il logging
|
||||
options.add_argument("--disable-autofill") # Disabilita l'autocompletamento dei moduli
|
||||
options.add_argument("--disable-plugins") # Disabilita i plugin del browser
|
||||
options.add_argument("--disable-animations") # Disabilita le animazioni
|
||||
options.add_argument("--disable-cache") # Disabilita la cache
|
||||
options.add_experimental_option("excludeSwitches", ["enable-automation", "enable-logging"]) # Esclude switch della modalità automatica e logging
|
||||
options.add_argument("--start-maximized")
|
||||
options.add_argument("--no-sandbox")
|
||||
options.add_argument("--disable-dev-shm-usage")
|
||||
options.add_argument("--ignore-certificate-errors")
|
||||
options.add_argument("--disable-extensions")
|
||||
options.add_argument("--disable-gpu")
|
||||
options.add_argument("window-size=1200x800")
|
||||
options.add_argument("--disable-background-timer-throttling")
|
||||
options.add_argument("--disable-backgrounding-occluded-windows")
|
||||
options.add_argument("--disable-translate")
|
||||
options.add_argument("--disable-popup-blocking")
|
||||
options.add_argument("--no-first-run")
|
||||
options.add_argument("--no-default-browser-check")
|
||||
options.add_argument("--disable-logging")
|
||||
options.add_argument("--disable-autofill")
|
||||
options.add_argument("--disable-plugins")
|
||||
options.add_argument("--disable-animations")
|
||||
options.add_argument("--disable-cache")
|
||||
options.add_experimental_option("excludeSwitches", ["enable-automation", "enable-logging"])
|
||||
|
||||
# Preferenze per contenuti
|
||||
prefs = {
|
||||
"profile.default_content_setting_values.images": 2, # Disabilita il caricamento delle immagini
|
||||
"profile.managed_default_content_settings.stylesheets": 2, # Disabilita il caricamento dei fogli di stile
|
||||
"profile.default_content_setting_values.images": 2,
|
||||
"profile.managed_default_content_settings.stylesheets": 2,
|
||||
}
|
||||
options.add_experimental_option("prefs", prefs)
|
||||
|
||||
@ -82,22 +103,21 @@ def chromeBrowserOptions():
|
||||
profileDir = os.path.basename(chromeProfilePath)
|
||||
options.add_argument('--user-data-dir=' + initialPath)
|
||||
options.add_argument("--profile-directory=" + profileDir)
|
||||
logger.debug("Using Chrome profile directory: %s", chromeProfilePath)
|
||||
else:
|
||||
options.add_argument("--incognito")
|
||||
logger.debug("Using Chrome in incognito mode")
|
||||
|
||||
return options
|
||||
|
||||
|
||||
def printred(text):
    """Log ``text`` at debug level and print it to stdout in red."""
    red_code, reset_code = "\033[91m", "\033[0m"  # ANSI bright red / reset
    logger.debug("Printing text in red: %s", text)
    coloured = f"{red_code}{text}{reset_code}"
    print(coloured)
|
||||
|
||||
def printyellow(text):
    """Log ``text`` at debug level and print it to stdout once, in yellow."""
    # ANSI colour codes: bright yellow, then reset to the terminal default.
    YELLOW = "\033[93m"
    RESET = "\033[0m"
    logger.debug("Printing text in yellow: %s", text)
    # Print exactly once; a duplicated print statement emitted every message twice.
    print(f"{YELLOW}{text}{RESET}")
|
||||
|
Loading…
Reference in New Issue
Block a user