# -*- coding: utf-8 -*-
"""
A more recent implementation of AWS S3 support, stolen from: https://gitlab.com/ced/s3_helpers/blob/master/luigi_s3_target.py,
but instead using modern boto3 commands.
"""
try:
    from urlparse import urlsplit
except ImportError:
    from urllib.parse import urlsplit
try:
    from ConfigParser import NoSectionError
except ImportError:
    from configparser import NoSectionError
import logging
import boto3
from botocore.exceptions import ClientError
from luigi import configuration
from luigi.format import get_default_format
from luigi.target import (FileAlreadyExists, FileSystem, FileSystemException,
                          FileSystemTarget, AtomicLocalFile)

logger = logging.getLogger('luigi-interface')

# Bytes in one MiB (2 ** 20); kept as a float so du() returns fractional MiB.
BYTES_IN_MiB = 1024.0 ** 2
S3_DIRECTORY_MARKER_SUFFIX = '/'


class InvalidDeleteException(FileSystemException):
    """Raised when asked to delete the root of a bucket."""


def merge_dicts(*dicts):
    """Merge dicts together, with later entries overriding earlier ones."""
    merged = {}
    for d in dicts:
        merged.update(d)
    return merged


def parse_s3_path(path):
    """For a given S3 path, return the bucket and key values."""
    parsed_path = urlsplit(path)
    s3_bucket = parsed_path.netloc
    s3_key = parsed_path.path.lstrip('/')
    return (s3_bucket, s3_key)
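# Illustrative only: parse_s3_path('s3://my-bucket/some/key.txt') returns
# ('my-bucket', 'some/key.txt').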


class S3FS(FileSystem):
    """Luigi ``FileSystem`` implementation backed by boto3."""

    def __init__(self, **kwargs):
        luigi_s3_config = self._get_s3_client_config()
        # Values from the config file take precedence over per-instance kwargs.
        s3_config = merge_dicts(kwargs, luigi_s3_config)
        self.s3 = boto3.resource('s3', **s3_config)
        self.s3_client = boto3.client('s3', **s3_config)

    def _get_s3_client_config(self):
        """Read the [s3] section of the Luigi config as boto3 keyword arguments."""
        defaults = dict(configuration.get_config().defaults())
        try:
            config = dict(configuration.get_config().items('s3'))
        except NoSectionError:
            return {}
        # Drop keys inherited from [DEFAULT] so unrelated settings are not
        # passed through to boto3.
        return {k: v for k, v in config.items() if k not in defaults or v != defaults[k]}
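    # Illustrative only: the keys of a luigi.cfg [s3] section are passed
    # straight to boto3.resource('s3', ...); the exact entries below are
    # assumptions about a typical deployment, not requirements of this module.
    #
    #   [s3]
    #   endpoint_url=https://s3.example.internal
    #   region_name=us-east-1
    #   aws_access_key_id=AKIA...
    #   aws_secret_access_key=...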

    def _is_root(self, path):
        """Return True if the path points at a bucket root rather than a key."""
        (s3_bucket, s3_key) = parse_s3_path(path)
        return not s3_key or s3_key == S3_DIRECTORY_MARKER_SUFFIX

    def _add_path_delimiter(self, key):
        """Append the directory marker suffix ('/') to a key if it is missing."""
        if not key or key.endswith(S3_DIRECTORY_MARKER_SUFFIX):
            return key
        return key + S3_DIRECTORY_MARKER_SUFFIX

    def exists(self, path):
        """Return True if the S3 key exists."""
        # The root of a bucket always exists.
        if self._is_root(path):
            return True
        (s3_bucket, s3_key) = parse_s3_path(path)
        s3_obj = self.s3.Object(s3_bucket, s3_key)
        try:
            # reload() issues a HEAD request for the object's metadata.
            s3_obj.reload()
        except ClientError as err:
            # A missing key surfaces as an HTTP 404 from the HEAD request.
            if err.response['Error']['Code'] == '404':
                return False
            raise
        return True

    def remove(self, path, recursive=True):
        """Remove a single key from S3.

        ``recursive`` is accepted for FileSystem API compatibility, but only
        the exact key is deleted here.
        """
        if self._is_root(path):
            raise InvalidDeleteException('Cannot delete root of bucket at path {0}'.format(path))
        if not self.exists(path):
            logger.debug('Could not delete %s; path does not exist', path)
            return False
        (s3_bucket, s3_key) = parse_s3_path(path)
        logger.debug('Deleting %s from bucket %s', s3_key, s3_bucket)
        self.s3.Object(s3_bucket, s3_key).delete()
        return True

    def mkdir(self, path, parents=True, raise_if_exists=False):
        """Create a zero-byte directory marker object (key ending in '/')."""
        if self._is_root(path):
            return
        if raise_if_exists and self.isdir(path):
            raise FileAlreadyExists('Path {0} already exists'.format(path))
        (s3_bucket, s3_key) = parse_s3_path(path)
        s3_obj = self.s3.Object(s3_bucket, self._add_path_delimiter(s3_key))
        return s3_obj.put(Body=b'')

    def isdir(self, path):
        """Return True for a bucket root, a directory marker, or a prefix with keys under it."""
        (s3_bucket, s3_key) = parse_s3_path(path)
        if not s3_key or s3_key.endswith(S3_DIRECTORY_MARKER_SUFFIX):
            return True
        # If there are keys under the prefix, treat it as a directory.
        prefix = self._add_path_delimiter(s3_key)
        response = self.s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=prefix, MaxKeys=1)
        return response.get('KeyCount', 0) > 0

    def listdir(self, path):
        """List keys under the given path, relative to it.

        Note: a single list_objects_v2 call returns at most 1000 keys; larger
        listings would need pagination.
        """
        (s3_bucket, s3_key) = parse_s3_path(path)
        key_path_len = len(s3_key)
        response = self.s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=s3_key)
        return [obj['Key'][key_path_len:] for obj in response.get('Contents', [])]
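    # Illustrative only (bucket and keys are made up): with objects
    # 'logs/2020/a.txt' and 'logs/2020/b.txt' in bucket 'my-bucket',
    # listdir('s3://my-bucket/logs/') returns ['2020/a.txt', '2020/b.txt'].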

    def copy(self, path, dest):
        """Copy a single object to another S3 location."""
        (source_s3_bucket, source_s3_key) = parse_s3_path(path)
        (dest_s3_bucket, dest_s3_key) = parse_s3_path(dest)
        dest_s3_obj = self.s3.Object(dest_s3_bucket, dest_s3_key)
        # Object.copy() is boto3's managed copy; it handles multipart copies
        # for large objects.
        return dest_s3_obj.copy({'Bucket': source_s3_bucket, 'Key': source_s3_key})

    def move(self, path, dest):
        """Move an object by copying it to ``dest`` and removing the source."""
        self.copy(path, dest)
        self.remove(path)

    def du(self, path):
        """Return the size of the object at ``path`` in MiB."""
        (s3_bucket, s3_key) = parse_s3_path(path)
        response = self.s3_client.head_object(Bucket=s3_bucket, Key=s3_key)
        return response['ContentLength'] / BYTES_IN_MiB
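    # Illustrative only: ContentLength is reported in bytes, so a
    # 10,485,760-byte object yields du() == 10.0 (10 * 1024 * 1024 / BYTES_IN_MiB).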


class S3Target(FileSystemTarget):
    """Luigi Target pointing at a single S3 object."""

    fs = None

    def __init__(self, path, s3_args=None, **kwargs):
        super(S3Target, self).__init__(path)
        self.path = path
        (self.s3_bucket, self.s3_key) = parse_s3_path(self.path)
        self.fs = S3FS(**(s3_args or {}))
        self.s3_obj = self.fs.s3.Object(self.s3_bucket, self.s3_key)
        self.s3_obj_options = kwargs

    def open(self, mode='rb'):
        if mode not in ('rb', 'wb'):
            raise ValueError("Unsupported open mode '{0}'".format(mode))
        if mode == 'rb':
            # Reads hand back the botocore StreamingBody of the object.
            return self.s3_obj.get()['Body']
        # Writes go through a local temporary file that is only uploaded to S3
        # on close, giving atomic-looking writes.
        return AtomicS3File(self.path, self.s3_obj, **self.s3_obj_options)
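    # Illustrative only (bucket/key are made up): reading a target directly.
    #
    #   target = S3Target('s3://my-bucket/data/output.csv')
    #   if target.exists():
    #       data = target.open('rb').read()   # bytes of the whole object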


class AtomicS3File(AtomicLocalFile):
    """Writable file that buffers to a local temporary file and uploads it to S3 when closed."""

    def __init__(self, path, s3_obj, **kwargs):
        self.s3_obj = s3_obj
        super(AtomicS3File, self).__init__(path)
        self.s3_obj_options = kwargs

    def move_to_final_destination(self):
        # Called by AtomicLocalFile.close() once writing has finished; pushes
        # the temporary file up to S3.
        self.s3_obj.upload_file(self.tmp_path, **self.s3_obj_options)