Spark & MongoDB Integration for LSST Workshop
This presentation explores the use of Spark and MongoDB for LSST processing workflows: parallelism and distribution, handling of intermediate data, data management, and data distribution methods. It covers converting data formats (FITS to Avro/Parquet), using GeoSpark for 2D indexing, and comparing features with QServ. It also walks through a simplified chain of image simulation, observation, calibration, object detection and cataloguing, along with the typical numbers and algorithms involved in LSST data processing, including image creation, data partitioning, and indexing.
Spark & MongoDB for LSST
Christian Arnault (LAL), Réza Ansari (LAL), Fabrice Jammes (LPC Clermont), Osman Aidel (CCIN2P3), César Richard (U-PSud)
June 15, 2015, LSST Workshop - CCIN2P3
Topics
Spark:
- How to handle parallelism & distribution in the processing workflows
- How to cope with intermediate data: manage the steps in the workflow and produce the final data (catalogues)
- How to distribute data (data formats): Avro/Parquet, converting from the FITS format (see the sketch after this list)
MongoDB:
- Understand whether MongoDB might offer features similar to QServ
Spark (again):
- Same question, but using the Spark DataFrame technology, combined with the GeoSpark module for 2D indexing
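As a minimal sketch of the FITS-to-Avro conversion mentioned above (assuming astropy and the spark-avro connector used elsewhere in this talk are available; the file name, field names and values are hypothetical):

from astropy.io import fits
from pyspark.sql import SparkSession, Row

spark = SparkSession.builder.appName("fits2avro").getOrCreate()

# Read one CCD image from a FITS file (hypothetical path)
hdu = fits.open("ccd_r_0001.fits")[0]
pixels = hdu.data.astype(float).tolist()   # 2D pixel array as nested lists

# Wrap it in a Row carrying the indexing keys used in the rest of the talk
row = Row(run=1, filter="r", ra=10.5, dec=-3.2, image=pixels)
df = spark.createDataFrame([row])

# Serialize with the Avro connector, as in the ingestion examples below
df.write.format("com.databricks.spark.avro").mode("overwrite").save("./fits_as_avro")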
Spark: the simplified process
Simulation -> Images -> Observation -> Calibration (sky background) -> Object detection (with reference catalogues) -> Objects {x, y, flux} -> Photometry, photo-z, astrometry -> Measured objects {RA, DEC, flux, magnitude, z} -> Catalogues
Typical numbers
Camera: 3.2 Gpixels, 189 CCDs, 6 filters, 3 GB/s, 15 TB per night (x 10 years)
Image: 3.5° field of view on the sky (64 cm on the focal plane), i.e. 9.6 square degrees (the Moon is 0.5° across); ~300,000 x 6 CCD images overall
CCD: 16 Mpixels (= 1 FITS file), pixels of 10 µm (0.2 arcsec), 2 bytes per pixel
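A quick back-of-the-envelope check of these numbers (a sketch; the number of exposures per night is an assumption for illustration only):

ccd_pixels = 16e6            # pixels per CCD
bytes_per_px = 2
ccd_size = ccd_pixels * bytes_per_px       # ~32 MB per CCD / per FITS file
focal_plane = 189 * ccd_size               # ~6 GB per full-camera exposure
print(ccd_size / 1e6, "MB per CCD")
print(focal_plane / 1e9, "GB per exposure")
# An assumed ~2000 exposures per night gives ~12 TB/night,
# of the same order as the ~15 TB quoted above
print(2000 * focal_plane / 1e12, "TB per (assumed) 2000-exposure night")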
Algorithms
Simulation: apply a Gaussian pattern with a common width (i.e. we only consider atmosphere and optical aberrations), plus some noise
Detection: convolution with a Gaussian pattern for the PSF; handle an overlap margin for objects close to the image border
Identification: search for geo-2D coordinates in reference catalogues
Handling a large number of data files: based on multiple indexing keys (run, filter, ra, dec, ...), aka the data butler
Studying the transfer mechanisms: throughput, serialization
(A sketch of the simulation and detection steps follows.)
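A minimal numpy/scipy sketch of the simulation and detection steps described above (the image size, PSF width, source count and detection threshold are illustrative assumptions, not the values used in the actual study):

import numpy as np
from scipy.ndimage import gaussian_filter, label

rng = np.random.default_rng(0)

# Simulation: point sources smeared by a common Gaussian PSF, plus noise
image = np.zeros((512, 512))
for _ in range(20):                        # 20 fake objects
    x, y = rng.integers(0, 512, size=2)
    image[x, y] = rng.uniform(100, 1000)   # flux
image = gaussian_filter(image, sigma=2.0) + rng.normal(0.0, 1.0, image.shape)

# Detection: convolve with the same Gaussian (matched filter) and threshold
smoothed = gaussian_filter(image, sigma=2.0)
mask = smoothed > 5 * smoothed.std()
labels, n_objects = label(mask)
print(n_objects, "objects detected")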
Images creation (Spark)
Declare a schema, used both for the serialization of images and for data partitioning & indexing:

def make_schema():
    schema = StructType([
        StructField("id", IntegerType(), True),
        StructField("run", IntegerType(), True),
        StructField("ra", DoubleType(), True),
        StructField("dec", DoubleType(), True),
        StructField("image", ArrayType(ArrayType(DoubleType()), True))])
    return schema

def create_image(spark):
    runs = ...
    rows = 3; cols = 3; region_size = 4000
    images = []; image_id = 0
    # Initialize the image descriptors
    for run in range(runs):
        for r in range(rows):
            for c in range(cols):
                ra = ...; dec = ...
                images.append((image_id, run, ra, dec))
                image_id += 1

    def fill_image(image):
        # Fill the pixel array for one image descriptor
        filled = ...
        return filled

    # Build the images in parallel as an RDD, then wrap them in a DataFrame
    rdd = sc.parallelize(images).map(lambda x: fill_image(x))
    df = spark.createDataFrame(rdd, make_schema())
    # Serialize to Avro
    df.write.format("com.databricks.spark.avro") \
        .mode("overwrite") \
        .save("./images")
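Once written, the Avro dataset can be read back and inspected; a short usage sketch (assuming the same spark-avro package is on the classpath):

df = spark.read.format("com.databricks.spark.avro").load("./images")
df.printSchema()                      # id, run, ra, dec, image
print(df.count(), "images stored")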
Working on images using RDDs
Structured data; selection via map and filter operations. The user-defined functions (UDFs) may be written in any language, e.g. in C++ and interfaced using PyBind.

def analyze(x):
    return 'analyze image', x[0]

def read_images(spark):
    df = spark.read.format("com.databricks.spark.avro").load("./images")
    # Select a data subset (run == 3) and apply the analysis
    rdd = (df.rdd
           .filter(lambda x: x[1] == 3)
           .map(lambda x: analyze(x)))
    result = rdd.collect()
    print(result)
Working on images using DataFrames
The data appear as rows and columns; images are indexed by run/patch/ra/dec/filter.

def analyze(x):
    return 'analyze image', x[0]

def read_images(spark):
    # Wrap analyze as a Spark UDF; <type> is the declared return type of the UDF
    analyze_udf = functions.udf(lambda m: analyze(m), <type>)
    df = spark.read.load("./images")
    # Select a subset (run == 3) and apply the UDF to the image column
    df = (df.filter(df.run == 3)
            .select(df.run, analyze_udf(df.image).alias('image')))
    df.show()
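For concreteness, a working variant with an explicit return type; the flux-summing function is only an illustrative assumption, not the analysis actually used:

from pyspark.sql import functions
from pyspark.sql.types import DoubleType

# UDF computing the total flux of an image (a nested list of doubles)
total_flux = functions.udf(lambda m: float(sum(sum(row) for row in m)), DoubleType())

df = spark.read.format("com.databricks.spark.avro").load("./images")
df.filter(df.run == 3).select(df.run, total_flux(df.image).alias("flux")).show()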
Using MongoDB for the reference catalogue

Object ingestion:

client = pymongo.MongoClient(MONGO_URL)
lsst = client.lsst
stars = lsst.stars

for o_id in objects:
    o = objects[o_id]
    # Conversion to a BSON document
    object = o.to_db()
    object['center'] = {'type': 'Point', 'coordinates': [o.ra, o.dec]}
    id = stars.insert_one(object)

# Add the 2D index
stars.create_index([('center', '2dsphere')])

Object finding:

center = [cluster.ra(), cluster.dec()]
for o in stars.find({'center': {'$geoWithin': {'$centerSphere': [center, radius]}}},
                    {'_id': 0, 'where': 1, 'center': 1}):
    print('identified object')
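One detail worth noting: $centerSphere expects its radius in radians, so an angular search radius has to be converted first. A small sketch (the reference position and the 1 arcmin radius are illustrative assumptions):

import math

ra0, dec0 = 10.5, -3.2                    # reference position (assumed)
radius_deg = 1.0 / 60.0                   # 1 arcmin search radius (assumed)
radius_rad = math.radians(radius_deg)     # $centerSphere wants radians

query = {'center': {'$geoWithin': {'$centerSphere': [[ra0, dec0], radius_rad]}}}
matches = list(stars.find(query, {'_id': 0, 'center': 1}))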
The Spark cluster @ LAL
Operated in the context of VirtualData and the mutualisation project ERM/MRM (Université Paris-Sud). This project groups several research teams at U-PSud (genomics, bio-informatics, LSST), all studying the Spark technology. We held a Spark school in March 2017, with the help of an expert from Databricks.
U-PSud cluster: OpenStack, CentOS 7
Totals: 108 cores, 192 GB RAM, 12 TB of disk (2.5 TB HDFS), plus 4 TB for MongoDB
Software: Hadoop 2.6.5, Spark 2.1.0, Java 1.8, Python 3.5, Mongo 3.4
LSST master: 18 cores, 32 GB RAM
6 workers: 18 cores, 32 GB RAM, 2 TB each
MongoDB
Several functional characteristics of the QServ system seem achievable with MongoDB, among which we may quote:
- the ability to distribute both the database and the server through the intrinsic sharding mechanism;
- indexing on the 2D coordinates of the objects;
- indexing on a splitting of the sky into chunks (so as to drive the sharding).
Thus, the purpose of the study is to evaluate whether:
- MongoDB natively offers comparable or equivalent functionality;
- the performances are comparable.
(A sketch of chunk-driven sharding follows.)
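A minimal sketch of how chunk-driven sharding could be set up with pymongo; the database, collection and key names follow the catalogue described below, and this is an assumption for illustration, not the configuration actually deployed:

import pymongo

# Requires connecting to the mongos of a sharded cluster
client = pymongo.MongoClient(MONGO_URL)

# Enable sharding on the database, then shard the Object collection on chunkId,
# so that each sky chunk is kept together on one shard
client.admin.command('enableSharding', 'lsst')
client.admin.command('shardCollection', 'lsst.Object', key={'chunkId': 1})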
MongoDB in the Galactica cluster
A single server: name MongoServer_1, flavour C1.large, 4 GB RAM, 8 VCPUs, 40 GB disk.
The tests are run on a dataset of 1.9 TB:
- Object (79,226,537 documents)
- Source (1,426,096,034 documents)
- ForcedSource (7,151,796,541 documents)
- ObjectFullOverlap (32,367,384 documents)
These catalogues are organized by sky region (identified by a chunkId); 324 sky regions are available for each of the 4 catalogue types.
Operations
Ingestion:
- translating the SQL schema into a MongoDB schema (i.e. selecting the data types);
- ingesting the CSV lines;
- automatic creation of the indexes from the keys described in the SQL schema.
Testing simple queries:
- SELECT count(*) FROM Object
- SELECT count(*) FROM ForcedSource
- SELECT ra, decl FROM Object WHERE deepSourceId = 2322374716295173;  -> 0.014 s
- SELECT ra, decl FROM Object WHERE qserv_areaspec_box( );
- SELECT count(*) FROM Object WHERE y_instFlux > 5;
- SELECT min(ra), max(ra), min(decl), max(decl) FROM Object;  -> 0.432 s
- SELECT count(*) FROM Source WHERE flux_sinc BETWEEN 1 AND 2;  -> 0.354 s
- SELECT count(*) FROM Source WHERE flux_sinc BETWEEN 2 AND 3;  -> 0.076 s
(remaining measured times: 0.002 s, 0.000 s, 0.343 s, 0.008 s)
But these measurements rely on indexes on the queried quantities, and we do not want to index all ~300 parameters; it is better to structure the parameter space and index over groups of parameters. (A sketch of the SQL-to-MongoDB translation is shown below.)
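As an illustration of what such queries become on the MongoDB side, a sketch of two of them with pymongo (the collection and field names follow the catalogue above; count() with a filter is the call available in the pymongo 3.x drivers contemporary with Mongo 3.4):

object_coll = client.lsst.Object

# SELECT count(*) FROM Object WHERE y_instFlux > 5
n = object_coll.count({'y_instFlux': {'$gt': 5}})

# SELECT ra, decl FROM Object WHERE deepSourceId = 2322374716295173
doc = object_coll.find_one({'deepSourceId': 2322374716295173},
                           {'_id': 0, 'ra': 1, 'decl': 1})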
Joins, aggregations
MongoDB handles complex queries through an aggregation pipeline of map-reduce-like operations (based on iterators).
Example: finding all neighbours closer than a distance Dmax within a region:
- select a sky region around a reference point;
- build a self-join so as to obtain a list of object pairs;
- compute the distance between the objects of every pair;
- select all computed distances lower than the maximum value.
Aggregation

result = lsst.Object.aggregate([
    # Select objects in a region
    {'$geoNear': {
        'near': [ra0, dec0],
        'query': {'loc': {'$geoWithin': {'$box': [bottomleft, topright]}}},
        'distanceField': 'dist',
    }},
    # Construct all pairs within the region (self-join)
    {'$lookup': {'from': 'Object',
                 'localField': 'Object.loc',
                 'foreignField': 'Object.loc',
                 'as': 'neighbours'}},
    # Flatten the list of pairs
    {'$unwind': '$neighbours'},
    # Remove the self-pairing duplication
    {'$redact': {'$cond': [{'$eq': ["$_id", "$neighbours._id"]}, "$$PRUNE", "$$KEEP"]}},
    # Compute the distance between the two objects of a pair
    {'$addFields': {'dist': dist}},
    # Filter on the maximum distance
    {'$match': {'dist': {'$lt': 1}}},
    # Final projection
    {'$project': {'_id': 0, 'loc': 1, 'neighbours.loc': 1, 'dist': 1}},
])
Spark/DataFrames
Context: same dataset, same objective, on the VirtualData cluster @ LAL.
- Ingest the dataset using the CSV connector to DataFrames
- Use the SQL-like API to query
- Use GeoSpark for 2D navigation, filtering and indexing: http://geospark.datasyslab.org/
GeoSpark features:
- Objects: Point, Rectangle, Polygon, LineString
- Spatial indexes: R-Tree and Quad-Tree
- Geometrical operations: Minimum Bounding Rectangle, PolygonUnion, Overlap/Inside (self-join)
- Spatial query operations: spatial range query, spatial join query and spatial KNN query
Reference: Jia Yu, Jinxuan Wu, Mohamed Sarwat. In Proceedings of the IEEE International Conference on Data Engineering (IEEE ICDE 2016), Helsinki, Finland, May 2016.
CSV ingestion into Spark

# Get the SQL schema & produce the Spark representation of this schema
catalog.read_schema()
set_schema_structures()

spark = SparkSession.builder.appName("StoreCatalog").getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)

# Get the CSV files from HDFS
cat = subprocess.Popen(["hadoop", "fs", "-ls", "/user/christian.arnault/swift"],
                       stdout=subprocess.PIPE)

for line in cat.stdout:
    file_name = line.split('/')[-1].strip()
    # Get the Spark schema for this catalogue
    schema = read_data(file_name)
    # Read the CSV file
    df = sqlContext.read.format('com.databricks.spark.csv') \
        .options(header='true', delimiter=';') \
        .load('swift/' + file_name, schema=schema.structure)
    # Append the data to the stored dataframe, partitioned by chunkId
    df.write.format("com.databricks.spark.avro") \
        .mode(write_mode).partitionBy('chunkId').save("./lsstdb")
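The partitionBy('chunkId') layout makes it cheap to read back a single sky chunk later; a short usage sketch (the chunk number is an arbitrary example):

df = sqlContext.read.format("com.databricks.spark.avro").load("./lsstdb")
one_chunk = df.where(df.chunkId == 42)   # partition pruning: only that chunk is read
print(one_chunk.count(), "objects in chunk 42")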
Read the dataframe and query (Scala)

val conf = new SparkConf().setAppName("DF")
val sc = new SparkContext(conf)

val spark = SparkSession
  .builder()
  .appName("Read Dataset")
  .getOrCreate()

val sqlContext = new SQLContext(sc)

// Read the dataframe from HDFS using the Avro serializer
var df = time("Load db", sqlContext.
  read.
  format("com.databricks.spark.avro").
  load("./lsstdb"))

// Perform queries
val sorted = time("sort", df.select("ra", "decl", "chunkId").sort("ra"))
val seq = time("collect", sorted.rdd.take(10))
println(seq)
Conclusion
Spark is a rich and promising eco-system, but it requires understanding the configuration: memory (RAM), data partitioning (throughput), building the pipeline (as a DAG of processes), and the monitoring tools (e.g. Ganglia).
MongoDB: powerful, but based on a paradigm very different from SQL (map-reduce based). I observed strange performance results that still need to be understood.
Spark for catalogues: migrating to Spark DataFrames looks really encouraging and should not show the same limitations. Preliminary results are at least better than with MongoDB (especially at the ingestion step). GeoSpark is powerful and meant to support very large datasets.