
How to read a CSV file from an S3 bucket using Java?

I have seen many AWS (Amazon Web Services) articles that use Python, Node.js, and other languages, but very few that use Java. I ran into this problem in Java myself, so I thought I should write a blog post about it.

I assume that whoever is reading this has some knowledge of Amazon Web Services: Lambda functions, S3 buckets, CloudWatch logs, and so on.

I have written a Lambda function that reads data from a CSV file in an S3 bucket, loads it into a DynamoDB table, and writes its progress to the CloudWatch log.

LambdaFunctionHandler.java

package learnandgrow.projects;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.List;

import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.S3Event;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.event.S3EventNotification.S3EventNotificationRecord;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.google.common.collect.Lists;

import au.com.bytecode.opencsv.CSVReader;

public class LambdaFunctionHandler implements RequestHandler<S3Event, Report> {

    /** The AWS region in which your DynamoDB table is hosted. */
    Region AWS_REGION = Region.getRegion(Regions.US_EAST_1);

    /** The DynamoDB table name. */
    String DYNAMO_TABLE_NAME = "def_specification";

    public Report handleRequest(S3Event s3event, Context context) {
        long startTime = System.currentTimeMillis();
        Report statusReport = new Report();

        LambdaLogger logger = context.getLogger();
        logger.log("Lambda Function Started");

        Helper helper = new Helper();
        try {
            // Pull the bucket name and object key out of the S3 event notification.
            S3EventNotificationRecord record = s3event.getRecords().get(0);
            String srcBucket = record.getS3().getBucket().getName();
            // Object keys arrive URL-encoded, with '+' standing in for spaces.
            String srcKey = record.getS3().getObject().getKey().replace('+', ' ');
            srcKey = URLDecoder.decode(srcKey, "UTF-8");

            // Fetch the CSV object from S3.
            AmazonS3 s3Client = new AmazonS3Client();
            S3Object s3Object = s3Client.getObject(new GetObjectRequest(srcBucket, srcKey));
            statusReport.setFileSize(s3Object.getObjectMetadata().getContentLength());
            logger.log("S3 Event Received: " + srcBucket + "/" + srcKey);

            BufferedReader br = new BufferedReader(new InputStreamReader(s3Object.getObjectContent()));
            CSVReader reader = new CSVReader(br);

            AmazonDynamoDB dynamoDBClient = new AmazonDynamoDBClient();
            dynamoDBClient.setRegion(AWS_REGION);
            DynamoDB dynamoDB = new DynamoDB(dynamoDBClient);
            TableWriteItems energyDataTableWriteItems = new TableWriteItems(DYNAMO_TABLE_NAME);

            List<Item> itemList = new ArrayList<Item>();
            String[] nextLine;

            // Log the header row; reading it here also skips it, so the
            // loop below only sees data rows.
            logger.log("CSV header: " + br.readLine());

            // Parse every remaining row into a DynamoDB Item.
            while ((nextLine = reader.readNext()) != null) {
                Item newItem = helper.parseIt(nextLine);
                itemList.add(newItem);
            }

            // BatchWriteItem accepts at most 25 items per request, so
            // write the items in partitions of 25.
            for (List<Item> partition : Lists.partition(itemList, 25)) {
                energyDataTableWriteItems.withItemsToPut(partition);
                BatchWriteItemOutcome outcome = dynamoDB.batchWriteItem(energyDataTableWriteItems);
                logger.log("Wrote a batch of " + partition.size() + " items");
            }

            logger.log("Load finished in " + String.valueOf(System.currentTimeMillis() - startTime) + "ms");

            reader.close();
            br.close();
            s3Object.close();
            statusReport.setStatus(true);
        } catch (Exception ex) {
            logger.log(ex.getMessage());
        }

        statusReport.setExecutiongTime(System.currentTimeMillis() - startTime);
        return statusReport;
    }
}
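Note that the handler returns a Report object that the original listing does not show. Below is a minimal sketch of what it could look like; the fields and setter names are inferred from the calls in handleRequest, not taken from the real project.

package learnandgrow.projects;

// Minimal sketch of the Report class referenced by the handler above.
// The original post does not show this class; these fields and setter
// names are inferred from how handleRequest uses it.
public class Report {

    private boolean status;
    private long fileSize;
    private long executionTime;

    public void setStatus(boolean status) {
        this.status = status;
    }

    public void setFileSize(long fileSize) {
        this.fileSize = fileSize;
    }

    // The name (including the spelling) matches the call in handleRequest.
    public void setExecutiongTime(long executionTime) {
        this.executionTime = executionTime;
    }
}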

The class above is responsible for reading the CSV file from the S3 bucket. The while loop collects each row and passes it to the parseIt method of the Helper class. In the Helper class we can keep any number of variables, depending on how many columns our CSV file has. My CSV file has only 3 columns, i.e. specification_id, specification_category, and specification_name.
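For example, such a file could look like this (the values below are made up purely for illustration):

specification_id,specification_category,specification_name
101,electrical,voltage_rating
102,mechanical,frame_size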

The line LambdaLogger logger = context.getLogger(); obtains the logger I used to print messages to the CloudWatch log, so the function is easy to track in AWS.

Now we can move on to the Helper class.

Helper.java

package learnandgrow.projects;

import java.text.ParseException;

import com.amazonaws.services.dynamodbv2.document.Item;

public class Helper {

    public Item parseIt(String[] nextLine) throws ParseException {
        Item newItem = new Item();

        // Column 0 is the primary key and is assumed to always be present.
        String specification_id = nextLine[0];
        String specification_category;
        String specification_name;

        // Substitute a single space when a column is empty, since DynamoDB
        // has historically rejected empty string attribute values.
        if (nextLine[1] != null && !nextLine[1].isEmpty()) {
            specification_category = nextLine[1];
        } else {
            specification_category = " ";
        }

        if (nextLine[2] != null && !nextLine[2].isEmpty()) {
            specification_name = nextLine[2];
        } else {
            specification_name = " ";
        }

        newItem.withPrimaryKey("specification_id", specification_id);
        newItem.withString("specification_category", specification_category);
        newItem.withString("specification_name", specification_name);
        return newItem;
    }
}
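If you want to sanity-check the parsing logic before deploying, you can call parseIt locally with a sample row. The class below is a hypothetical quick test, not part of the original project, and the row values are made up:

package learnandgrow.projects;

import com.amazonaws.services.dynamodbv2.document.Item;

// Hypothetical local check of Helper.parseIt with a made-up CSV row.
public class HelperTest {

    public static void main(String[] args) throws Exception {
        Helper helper = new Helper();
        String[] row = {"101", "electrical", "voltage_rating"};
        Item item = helper.parseIt(row);
        // Prints the item as JSON, e.g. {"specification_id":"101",...}
        System.out.println(item.toJSON());
    }
}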

In this table, specification_id is the primary key.

With the code above, we can easily read a CSV file from an S3 bucket and see the output in the CloudWatch log.
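One practical note: judging by the imports, the code relies on the AWS SDK for Java (the S3, DynamoDB, and Lambda runtime modules), Google Guava (for Lists.partition), and the older OpenCSV library (the au.com.bytecode package), so all of these need to be packaged with the Lambda deployment artifact.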

Thank you