Handling Data in a Separated Network Environment with AWS S3 & Lambda
We operate services in Japan, Taiwan, Korea, and North America. Due to the specific characteristics of each country and culture, we needed to separate service data and operate a separate service for each region. Recently, however, we received a content development requirement for a new feature that involves conducting a vote among users from all countries. The biggest challenge is that each region's network is separated, with no direct communication between them.
Ideally, all the network environments would be unified, but due to certain constraints, network integration is not feasible. Given this, we designed the system to operate with a push-based approach rather than pulling data from each region.
With a push-based approach, each region independently sends the required data to a centralized location, where it is processed and aggregated for the voting feature. Each region keeps its network separation while still contributing to the overall data collection and computation.
Service Architecture
The diagram below illustrates the service architecture, with the network divided into JP (Japan) and the other countries.
The service flow is as follows:
- Jenkins periodically executes a Spring Batch job.
- The batch job reads data from Redis and uploads it to S3 as a CSV file.
- An S3 upload event triggers a Lambda function, which reads the data for each country and normalizes it.
- The Lambda stores the per-country and aggregated global results back in S3.
- The batch job periodically reads the aggregated data from S3 and loads the global results into Redis.
- The API server queries the required data from Redis.
Spring Batch Implementation
The Spring Batch upload task converts the voting results to CSV and uploads them to S3. It is organized as follows:
- Create a Spring Batch job configuration class. This class will define the steps and job configuration for the upload task.
- Define a step for the upload task. This step will handle the conversion and upload process.
- Implement a custom item reader to retrieve the voting results from Redis for each country. This reader will fetch the data in a normalized format, taking into account the differences in user count and preferences between countries. You can use the RedisTemplate or Jedis library to interact with Redis.
- Implement a custom item processor to perform any necessary data normalization or transformations based on the specific requirements of each country. This processor will ensure fair and consistent voting across different user counts and preferences.
- Implement a custom item writer that converts the processed voting results into CSV format and uploads them to the S3 bucket. You can use the AWS SDK for Java (AWS SDK for S3) to interact with S3 and perform the file upload.
- Configure the Spring Batch job to include the defined step and its dependencies, and set up any additional configuration, such as the cron expression that controls how often the job runs (a minimal wiring sketch follows this list).
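For reference, the sketch below shows minimal job and step wiring around the upload Tasklet that appears later in this section, assuming Spring Batch 4.x-style builder factories; the class and bean names are illustrative, not our production code.

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class VoteUploadJobConfig {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    public VoteUploadJobConfig(JobBuilderFactory jobBuilderFactory,
                               StepBuilderFactory stepBuilderFactory) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
    }

    // The job that Jenkins launches on its cron schedule; RunIdIncrementer allows repeated runs.
    @Bean
    public Job voteUploadJob(Step voteUploadStep) {
        return jobBuilderFactory.get("voteUploadJob")
                .incrementer(new RunIdIncrementer())
                .start(voteUploadStep)
                .build();
    }

    // A single step that delegates to the upload Tasklet.
    @Bean
    public Step voteUploadStep(Tasklet uploadTask) {
        return stepBuilderFactory.get("voteUploadStep")
                .tasklet(uploadTask)
                .build();
    }
}

In our implementation, reading the keyword scores, converting them to CSV, and uploading to S3 are all handled inside the single uploadTask Tasklet shown below, rather than in a separate reader, processor, and writer.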
By executing the Spring Batch job from Jenkins with an appropriate cron expression, we can control how often the upload task runs and match it to the characteristics of each region, such as user count variations and voting trends, while keeping Lambda invocation and update costs minimal.
Remember to handle error scenarios and implement appropriate logging and monitoring to ensure the stability and reliability of the upload task.
The details below are specific to our use case, but the core of the upload task is a Tasklet that reads the keyword scores for an event from Redis, converts them to CSV, and uploads the file to S3:
@Bean
@StepScope
public Tasklet uploadTask(@Value("#{jobParameters['evno']}") Integer evno) {
    return (stepContribution, chunkContext) -> {
        // Read the raw keyword scores for this event
        Map<Triple, Integer> data = getKeywordScores(evno);

        // Convert the scores to CSV in memory
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        try (CSVWriter csvWriter = new CSVWriter(new OutputStreamWriter(outputStream))) {
            String[] header = {"kseq", "score"};
            csvWriter.writeNext(header);
            for (Map.Entry<Triple, Integer> entry : data.entrySet()) {
                String[] row = {String.valueOf(entry.getKey()), String.valueOf(entry.getValue())};
                csvWriter.writeNext(row);
            }
        }

        // Build a time-stamped, per-country object key
        String formattedDate = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmm"));
        String dataFileKey = String.format("/data/%d/%s-%s.csv", evno,
                COUNTRY.toLowerCase(),
                formattedDate);

        // Upload the CSV to S3 with the async client
        PutObjectRequest putObjectRequest = PutObjectRequest.builder()
                .bucket(bucket)
                .key(dataFileKey)
                .build();
        PutObjectResponse resp = Mono.fromFuture(
                        s3AsyncClient.putObject(putObjectRequest, AsyncRequestBody.fromBytes(outputStream.toByteArray())))
                .subscribeOn(Schedulers.boundedElastic())
                .block();

        return resp.sdkHttpResponse().isSuccessful() ? RepeatStatus.FINISHED : RepeatStatus.CONTINUABLE;
    };
}
AWS Lambda Implementation
When the Spring Batch job uploads a CSV file to S3, an S3 event triggers a Lambda function. Because the Lambda writes its results back to the same bucket, it is essential to set an appropriate prefix filter on the event notification to avoid infinite triggering between S3 and Lambda. The handler below parses the key of the uploaded object, normalizes the data for that country, and writes both the per-country result and the aggregated global result back to S3.
import urllib.parse


def lambda_handler(event, context):
    # Retrieve the S3 bucket and key information from the event
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')

    # Parse the key to extract event number, locale, and stage information
    evno, locale, stage = parse_path(key)

    # Retrieve the latest file from S3 for the specific event number and locale
    latest_file = get_latest_file(bucket, evno, locale)

    # Perform data normalization on the retrieved file
    result_df = get_normalized_result(bucket, latest_file)

    # Upload the latest result data to S3
    upload_s3_result(bucket, result_df, evno, locale)

    # Upload the aggregated global result
    upload_global_result(bucket, evno)

    # Return the response with the processed information
    return {
        'statusCode': 200,
        'body': {'bucket': bucket, 'evno': evno, 'locale': locale, 'stage': stage}
    }
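The trigger itself is typically configured in the AWS console or through infrastructure as code, but for concreteness, here is a sketch using the AWS SDK for Java v2. The wrapper class, the configuration ID, and the "data/" prefix are assumptions; the prefix should match the key layout produced by the batch job so that the Lambda's own output does not re-trigger it.

import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.Event;
import software.amazon.awssdk.services.s3.model.FilterRule;
import software.amazon.awssdk.services.s3.model.FilterRuleName;
import software.amazon.awssdk.services.s3.model.LambdaFunctionConfiguration;
import software.amazon.awssdk.services.s3.model.NotificationConfiguration;
import software.amazon.awssdk.services.s3.model.NotificationConfigurationFilter;
import software.amazon.awssdk.services.s3.model.PutBucketNotificationConfigurationRequest;
import software.amazon.awssdk.services.s3.model.S3KeyFilter;

public class NotificationSetup {

    public static void configure(S3Client s3Client, String bucket, String lambdaArn) {
        // Only objects created under the upload prefix fire the Lambda, so the result
        // files the Lambda writes back to the same bucket do not re-trigger it.
        FilterRule prefixRule = FilterRule.builder()
                .name(FilterRuleName.PREFIX)
                .value("data/")   // placeholder prefix; align with the batch job's key format
                .build();

        LambdaFunctionConfiguration lambdaTrigger = LambdaFunctionConfiguration.builder()
                .id("vote-csv-uploaded")
                .lambdaFunctionArn(lambdaArn)
                .events(Event.S3_OBJECT_CREATED)
                .filter(NotificationConfigurationFilter.builder()
                        .key(S3KeyFilter.builder().filterRules(prefixRule).build())
                        .build())
                .build();

        s3Client.putBucketNotificationConfiguration(PutBucketNotificationConfigurationRequest.builder()
                .bucket(bucket)
                .notificationConfiguration(NotificationConfiguration.builder()
                        .lambdaFunctionConfigurations(lambdaTrigger)
                        .build())
                .build());
    }
}

If the result files end up sharing the same prefix, a suffix filter or a separate output prefix can be used instead; the important part is that the objects the Lambda writes never match the trigger filter.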
Review
I developed a service that handles writing and reading data in a separated network environment using an event-driven approach. Leveraging AWS made it easier to solve the infrastructure-related issues. However, it also increased the number of management points, so it is important to define and manage settings such as the S3 expiration time and the Lambda cost in advance to minimize the management overhead.
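As one example, the S3 expiration can be pinned down as a lifecycle rule on the upload prefix. The sketch below uses the AWS SDK for Java v2; the wrapper class, the "data/" prefix, and the 7-day retention are placeholder assumptions, not our actual settings.

import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.BucketLifecycleConfiguration;
import software.amazon.awssdk.services.s3.model.ExpirationStatus;
import software.amazon.awssdk.services.s3.model.LifecycleExpiration;
import software.amazon.awssdk.services.s3.model.LifecycleRule;
import software.amazon.awssdk.services.s3.model.LifecycleRuleFilter;
import software.amazon.awssdk.services.s3.model.PutBucketLifecycleConfigurationRequest;

public class LifecycleSetup {

    public static void configure(S3Client s3Client, String bucket) {
        // Expire the intermediate CSV files a few days after upload so the bucket
        // does not keep accumulating data that is no longer referenced.
        LifecycleRule expireCsv = LifecycleRule.builder()
                .id("expire-vote-csv")
                .filter(LifecycleRuleFilter.builder().prefix("data/").build())  // placeholder prefix
                .expiration(LifecycleExpiration.builder().days(7).build())      // placeholder retention
                .status(ExpirationStatus.ENABLED)
                .build();

        s3Client.putBucketLifecycleConfiguration(PutBucketLifecycleConfigurationRequest.builder()
                .bucket(bucket)
                .lifecycleConfiguration(BucketLifecycleConfiguration.builder()
                        .rules(expireCsv)
                        .build())
                .build());
    }
}

Defining this kind of rule up front keeps the intermediate files from becoming yet another thing to manage by hand.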